View Javadoc

1   /*
2    * Copyright (C) 2007 ETH Zurich
3    *
4    * This file is part of Fosstrak (www.fosstrak.org).
5    *
6    * Fosstrak is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License version 2.1, as published by the Free Software Foundation.
9    *
10   * Fosstrak is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13   * Lesser General Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser General Public
16   * License along with Fosstrak; if not, write to the Free
17   * Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18   * Boston, MA  02110-1301  USA
19   */
20  
21  package org.fosstrak.capturingapp;
22  
23  import java.io.BufferedReader;
24  import java.io.ByteArrayInputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.InputStreamReader;
28  import java.net.ServerSocket;
29  import java.net.Socket;
30  import java.util.LinkedList;
31  import java.util.concurrent.ConcurrentLinkedQueue;
32  
33  import org.apache.log4j.Logger;
34  import org.apache.log4j.PropertyConfigurator;
35  import org.fosstrak.ale.util.DeserializerUtil;
36  import org.fosstrak.ale.xsd.ale.epcglobal.ECReports;
37  import org.fosstrak.epcis.captureclient.CaptureClient;
38  import org.fosstrak.epcis.model.EPCISDocumentType;
39  
40  /**
41   * the capture application retrieves an ECReports from a specified socket. Then 
42   * a set of registered handlers get invoked with the ECReports returning a 
43   * simple EPCIS document. The capture application relays the EPCIS document to 
44   * the EPCIS repository.
45   * @author sawielan
46   *
47   */
48  public class CaptureApp implements Runnable {
49  
50  	// the port where the capture application is listening.
51  	private int port = -1;
52  
53  	// the URL where the EPCIS can be called.
54  	private String epcisRepositoryURL = null;
55  
56  	// the EPCIS capture client.
57  	private CaptureClient client = null;
58  
59  	// execute the capture app.
60  	private boolean execute = true;
61  
62  	// logger
63  	private static final Logger log = Logger.getLogger(CaptureApp.class);
64  	
65  	// server socket accepting incoming reports.
66  	private ServerSocket ss = null;
67  	
68  	// a queue holding the received reports.
69  	private ConcurrentLinkedQueue<ECReports> reports = 
70  		new ConcurrentLinkedQueue<ECReports> ();
71  
72  	// the ECReport handlers.
73  	private ConcurrentLinkedQueue<ECReportsHandler> handlers =
74  		new ConcurrentLinkedQueue<ECReportsHandler> ();
75  	
76  	// the EPCIS documents.
77  	private ConcurrentLinkedQueue<EPCISDocumentType> epcisDocs = 
78  		new ConcurrentLinkedQueue<EPCISDocumentType> ();
79  	
80  	// the reports queue worker.
81  	private Thread reportsQueueWorker = null;
82  	
83  	// the EPCIS documents worker.
84  	private Thread epcisQueueWorker = null;
85  	
86  	// flag whether capture app is up and running.
87  	private boolean up = false;
88  	
89  	/**
90  	 * construct a new capture application.
91  	 * 
92  	 * @param port
93  	 *            the port where to listen for incoming ECReports.
94  	 * @param epcisRepositoryURL
95  	 *            the URL where to call the EPCIS.
96  	 */
97  	public CaptureApp(int port, String epcisRepositoryURL) {
98  		this.setPort(port);
99  		this.setEpcisRepositoryURL(epcisRepositoryURL);
100 	}
101 
102 	/**
103 	 * construct a new capture application.
104 	 * 
105 	 * @param port
106 	 *            the port where to listen for incoming ECReports.
107 	 * @param epcisClient
108 	 *            the EPCIS capture client.
109 	 */
110 	public CaptureApp(int port, CaptureClient epcisClient) {
111 		this.setPort(port);
112 		this.client = epcisClient;
113 	}
114 	
115 	/**
116 	 * stops the execution of the capture app.
117 	 */
118 	public void stopCaptureApp() throws IOException {
119 		this.execute = false;
120 		reportsQueueWorker.interrupt();
121 		epcisQueueWorker.interrupt();
122 		ss.close();
123 	}
124 	
125 	/**
126 	 * @return true if capture application is up.
127 	 */
128 	public boolean isUp() {
129 		return up;
130 	}
131 	
132 	/**
133 	 * @return true if capture application is executing.
134 	 */
135 	public boolean isExecuting() {
136 		return execute;
137 	}
138 
139 	/**
140 	 * handles incoming ECReports.
141 	 * @param reports the ECReports.
142 	 */
143 	private void handleReports(ECReports reports) {
144 		log.debug("Handling incoming reports");
145 		synchronized (this.reports) {
146 			this.reports.add(reports);
147 			this.reports.notifyAll();
148 		}
149 	}
150 	
151 	/**
152 	 * register a handler for ECReports.
153 	 * @param handler the handler for the ECReport.
154 	 */
155 	public void registerHandler(ECReportsHandler handler) {
156 		synchronized (handlers) {
157 			handlers.add(handler);
158 		}
159 	}
160 	
161 	/** 
162 	 * removes a handler.
163 	 * @param handler the handler
164 	 */
165 	public void deregisterHandler(ECReportsHandler handler) {
166 		synchronized (handlers) {
167 			handlers.remove(handler);
168 		}
169 	}
170 
171 	public void run() {
172 		if ((null == client) && (null == getEpcisRepositoryURL())) {
173 			log.error("parameters for EPCIS repository missing");
174 			throw new RuntimeException(
175 					"parameters for EPCIS repository missing");
176 		}
177 		if (null == client) {
178 			client = new CaptureClient(getEpcisRepositoryURL());
179 		}
180 		
181 		// queue worker...
182 		reportsQueueWorker = new Thread(new Runnable() {
183 			public void run() {
184 				
185 				while (execute) {
186 					try { 
187 						ECReports r = null;
188 						synchronized (reports) {
189 							while (0 == reports.size()) {
190 								reports.wait();
191 							}
192 							// remove the first report to work on.
193 							r = reports.remove();
194 						}
195 						synchronized (handlers) {
196 							for (ECReportsHandler handler : handlers) {
197 								try {
198 									// retrieve the EPCIS document
199 									LinkedList<EPCISDocumentType> docs = 
200 										handler.handle(
201 											r);
202 									
203 									if (null != docs) {
204 										// add it to the queue
205 										synchronized (epcisDocs) {
206 											for (EPCISDocumentType doc : docs) {
207 												if (null != doc) {
208 													epcisDocs.add(doc);
209 												}
210 											}
211 											epcisDocs.notifyAll();
212 										}
213 									}
214 								} catch (Exception ex) {
215 									log.debug("handler triggered exception." + 
216 											ex.getMessage());
217 								}
218 							}
219 						}
220 					} catch (InterruptedException e) {
221 						log.debug("received interrupt.");
222 					}
223 				}
224 				log.info("stopping queue worker.");
225 			}			
226 		});
227 		reportsQueueWorker.start();
228 		
229 		// EPCIS documents queue worker...
230 		epcisQueueWorker = new Thread(new Runnable() {
231 			public void run() {
232 				while (execute) {
233 					try { 
234 						EPCISDocumentType doc = null;
235 						synchronized (epcisDocs) {
236 							while (0 == epcisDocs.size()) {
237 								epcisDocs.wait();
238 							}
239 							// remove the first report to work on.
240 							doc = epcisDocs.remove();
241 						}
242 						try {
243 							int httpResponseCode = client.capture(doc);
244 							if (httpResponseCode != 200) {
245 							    log.error("The event could NOT be captured!");
246 							}
247 						} catch (Exception e) {
248 							e.printStackTrace();
249 						}
250 					} catch (InterruptedException e) {
251 						log.debug("received interrupt.");
252 					}
253 				}
254 				log.info("stopping queue worker.");
255 			}
256 		});
257 		epcisQueueWorker.start();
258 
259 		try {
260 			log.debug(String.format("Binding capture app to port %d", getPort()));
261 			ss = new ServerSocket(getPort());
262 			up = true;
263 			while (execute) {
264 				try {
265 					Socket s = ss.accept();
266 					BufferedReader in = new BufferedReader(
267 							new InputStreamReader(s.getInputStream()));
268 					
269 					String data = in.readLine();
270 					// ignore the HTTP header
271 					data = in.readLine();
272 					data = in.readLine();
273 					data = in.readLine();
274 					data = in.readLine();
275 
276 					StringBuffer buffer = new StringBuffer();
277 					while (null != data) {
278 						buffer.append(data);
279 						data = in.readLine();
280 					}
281 					log.debug(buffer.toString());
282 
283 					// create a stream from the buffer
284 					InputStream parseStream = new ByteArrayInputStream(
285 							buffer.toString().getBytes());
286 
287 					// parse the string
288 					ECReports reports = DeserializerUtil
289 							.deserializeECReports(parseStream);
290 					if (null != reports) {
291 						handleReports(reports);
292 					}
293 				} catch (Exception e) {
294 					log.error(String.format("Could not receive report: %s", 
295 							e.getMessage()));
296 				}
297 			}
298 		} catch (IOException bindException) {
299 			log.error(String.format("Could not bind capture app: %s", 
300 					bindException.getMessage()));
301 		}
302 		ss = null;
303 		execute = false;
304 		up = false;
305 	}
306 
307 	
308 	/**
309 	 * @param port the port to set
310 	 */
311 	public void setPort(int port) {
312 		this.port = port;
313 	}
314 
315 	/**
316 	 * @return the port
317 	 */
318 	public int getPort() {
319 		return port;
320 	}
321 
322 	/**
323 	 * @param epcisRepositoryURL the epcisRepositoryURL to set
324 	 */
325 	public void setEpcisRepositoryURL(String epcisRepositoryURL) {
326 		this.epcisRepositoryURL = epcisRepositoryURL;
327 	}
328 
329 	/**
330 	 * @return the epcisRepositoryURL
331 	 */
332 	public String getEpcisRepositoryURL() {
333 		return epcisRepositoryURL;
334 	}
335 
336 	/**
337 	 * starts the CaptureApp in event sink mode (means no relay to EPCIS).
338 	 * 
339 	 * @param args the first command line parameter is the TCP port. if omitted port 9999 is used.
340 	 */
341 	public static void main(String[] args) {
342 		CaptureApp client;
343 		int port;
344 		// check if args[0] is tcp-port
345 		if (args.length > 0){
346 			port = Integer.parseInt(args[0]);
347 			client = new CaptureApp(port, "dummy");
348 		} else {	 
349 			return;
350 		}
351 		
352 		// register the say hello handler 
353 		client.registerHandler(new DefaultECReportHandler());
354 		
355 		// configure Logger with properties file
356 		PropertyConfigurator.configure(
357 				CaptureApp.class.getResource("/log4j.properties"));
358 		
359 		new Thread(client).start();
360 		try {
361 			synchronized (CaptureApp.class) {
362 				CaptureApp.class.wait();
363 			}
364 		} catch (InterruptedException e) {
365 			e.printStackTrace();
366 		}
367 		System.out.println("Exiting");
368 	}
369 }