1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.fosstrak.capturingapp;
22
23 import java.io.BufferedReader;
24 import java.io.ByteArrayInputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.InputStreamReader;
28 import java.net.ServerSocket;
29 import java.net.Socket;
30 import java.util.LinkedList;
31 import java.util.concurrent.ConcurrentLinkedQueue;
32
33 import org.apache.log4j.Logger;
34 import org.apache.log4j.PropertyConfigurator;
35 import org.fosstrak.ale.util.DeserializerUtil;
36 import org.fosstrak.ale.xsd.ale.epcglobal.ECReports;
37 import org.fosstrak.epcis.captureclient.CaptureClient;
38 import org.fosstrak.epcis.model.EPCISDocumentType;
39
40
41
42
43
44
45
46
47
48 public class CaptureApp implements Runnable {
49
50
51 private int port = -1;
52
53
54 private String epcisRepositoryURL = null;
55
56
57 private CaptureClient client = null;
58
59
60 private boolean execute = true;
61
62
63 private static final Logger log = Logger.getLogger(CaptureApp.class);
64
65
66 private ServerSocket ss = null;
67
68
69 private ConcurrentLinkedQueue<ECReports> reports =
70 new ConcurrentLinkedQueue<ECReports> ();
71
72
73 private ConcurrentLinkedQueue<ECReportsHandler> handlers =
74 new ConcurrentLinkedQueue<ECReportsHandler> ();
75
76
77 private ConcurrentLinkedQueue<EPCISDocumentType> epcisDocs =
78 new ConcurrentLinkedQueue<EPCISDocumentType> ();
79
80
81 private Thread reportsQueueWorker = null;
82
83
84 private Thread epcisQueueWorker = null;
85
86
87 private boolean up = false;
88
89
90
91
92
93
94
95
96
97 public CaptureApp(int port, String epcisRepositoryURL) {
98 this.setPort(port);
99 this.setEpcisRepositoryURL(epcisRepositoryURL);
100 }
101
102
103
104
105
106
107
108
109
110 public CaptureApp(int port, CaptureClient epcisClient) {
111 this.setPort(port);
112 this.client = epcisClient;
113 }
114
115
116
117
118 public void stopCaptureApp() throws IOException {
119 this.execute = false;
120 reportsQueueWorker.interrupt();
121 epcisQueueWorker.interrupt();
122 ss.close();
123 }
124
125
126
127
128 public boolean isUp() {
129 return up;
130 }
131
132
133
134
135 public boolean isExecuting() {
136 return execute;
137 }
138
139
140
141
142
143 private void handleReports(ECReports reports) {
144 log.debug("Handling incoming reports");
145 synchronized (this.reports) {
146 this.reports.add(reports);
147 this.reports.notifyAll();
148 }
149 }
150
151
152
153
154
155 public void registerHandler(ECReportsHandler handler) {
156 synchronized (handlers) {
157 handlers.add(handler);
158 }
159 }
160
161
162
163
164
165 public void deregisterHandler(ECReportsHandler handler) {
166 synchronized (handlers) {
167 handlers.remove(handler);
168 }
169 }
170
171 public void run() {
172 if ((null == client) && (null == getEpcisRepositoryURL())) {
173 log.error("parameters for EPCIS repository missing");
174 throw new RuntimeException(
175 "parameters for EPCIS repository missing");
176 }
177 if (null == client) {
178 client = new CaptureClient(getEpcisRepositoryURL());
179 }
180
181
182 reportsQueueWorker = new Thread(new Runnable() {
183 public void run() {
184
185 while (execute) {
186 try {
187 ECReports r = null;
188 synchronized (reports) {
189 while (0 == reports.size()) {
190 reports.wait();
191 }
192
193 r = reports.remove();
194 }
195 synchronized (handlers) {
196 for (ECReportsHandler handler : handlers) {
197 try {
198
199 LinkedList<EPCISDocumentType> docs =
200 handler.handle(
201 r);
202
203 if (null != docs) {
204
205 synchronized (epcisDocs) {
206 for (EPCISDocumentType doc : docs) {
207 if (null != doc) {
208 epcisDocs.add(doc);
209 }
210 }
211 epcisDocs.notifyAll();
212 }
213 }
214 } catch (Exception ex) {
215 log.debug("handler triggered exception." +
216 ex.getMessage());
217 }
218 }
219 }
220 } catch (InterruptedException e) {
221 log.debug("received interrupt.");
222 }
223 }
224 log.info("stopping queue worker.");
225 }
226 });
227 reportsQueueWorker.start();
228
229
230 epcisQueueWorker = new Thread(new Runnable() {
231 public void run() {
232 while (execute) {
233 try {
234 EPCISDocumentType doc = null;
235 synchronized (epcisDocs) {
236 while (0 == epcisDocs.size()) {
237 epcisDocs.wait();
238 }
239
240 doc = epcisDocs.remove();
241 }
242 try {
243 int httpResponseCode = client.capture(doc);
244 if (httpResponseCode != 200) {
245 log.error("The event could NOT be captured!");
246 }
247 } catch (Exception e) {
248 e.printStackTrace();
249 }
250 } catch (InterruptedException e) {
251 log.debug("received interrupt.");
252 }
253 }
254 log.info("stopping queue worker.");
255 }
256 });
257 epcisQueueWorker.start();
258
259 try {
260 log.debug(String.format("Binding capture app to port %d", getPort()));
261 ss = new ServerSocket(getPort());
262 up = true;
263 while (execute) {
264 try {
265 Socket s = ss.accept();
266 BufferedReader in = new BufferedReader(
267 new InputStreamReader(s.getInputStream()));
268
269 String data = in.readLine();
270
271 data = in.readLine();
272 data = in.readLine();
273 data = in.readLine();
274 data = in.readLine();
275
276 StringBuffer buffer = new StringBuffer();
277 while (null != data) {
278 buffer.append(data);
279 data = in.readLine();
280 }
281 log.debug(buffer.toString());
282
283
284 InputStream parseStream = new ByteArrayInputStream(
285 buffer.toString().getBytes());
286
287
288 ECReports reports = DeserializerUtil
289 .deserializeECReports(parseStream);
290 if (null != reports) {
291 handleReports(reports);
292 }
293 } catch (Exception e) {
294 log.error(String.format("Could not receive report: %s",
295 e.getMessage()));
296 }
297 }
298 } catch (IOException bindException) {
299 log.error(String.format("Could not bind capture app: %s",
300 bindException.getMessage()));
301 }
302 ss = null;
303 execute = false;
304 up = false;
305 }
306
307
308
309
310
311 public void setPort(int port) {
312 this.port = port;
313 }
314
315
316
317
318 public int getPort() {
319 return port;
320 }
321
322
323
324
325 public void setEpcisRepositoryURL(String epcisRepositoryURL) {
326 this.epcisRepositoryURL = epcisRepositoryURL;
327 }
328
329
330
331
332 public String getEpcisRepositoryURL() {
333 return epcisRepositoryURL;
334 }
335
336
337
338
339
340
341 public static void main(String[] args) {
342 CaptureApp client;
343 int port;
344
345 if (args.length > 0){
346 port = Integer.parseInt(args[0]);
347 client = new CaptureApp(port, "dummy");
348 } else {
349 return;
350 }
351
352
353 client.registerHandler(new DefaultECReportHandler());
354
355
356 PropertyConfigurator.configure(
357 CaptureApp.class.getResource("/log4j.properties"));
358
359 new Thread(client).start();
360 try {
361 synchronized (CaptureApp.class) {
362 CaptureApp.class.wait();
363 }
364 } catch (InterruptedException e) {
365 e.printStackTrace();
366 }
367 System.out.println("Exiting");
368 }
369 }