1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.fosstrak.epcis.repository.query;
22
23 import java.io.ByteArrayInputStream;
24 import java.io.ByteArrayOutputStream;
25 import java.io.IOException;
26 import java.io.InputStream;
27 import java.io.ObjectOutput;
28 import java.io.ObjectOutputStream;
29 import java.io.OutputStreamWriter;
30 import java.io.Serializable;
31 import java.io.StringWriter;
32 import java.io.Writer;
33 import java.math.BigDecimal;
34 import java.net.HttpURLConnection;
35 import java.net.URL;
36 import java.sql.Connection;
37 import java.sql.PreparedStatement;
38 import java.sql.SQLException;
39 import java.sql.Timestamp;
40 import java.util.Calendar;
41 import java.util.Date;
42 import java.util.GregorianCalendar;
43 import java.util.Properties;
44
45 import javax.naming.Context;
46 import javax.naming.InitialContext;
47 import javax.naming.NamingException;
48 import javax.net.ssl.HttpsURLConnection;
49 import javax.net.ssl.SSLContext;
50 import javax.net.ssl.TrustManager;
51 import javax.net.ssl.X509TrustManager;
52 import javax.sql.DataSource;
53 import javax.xml.bind.JAXBContext;
54 import javax.xml.bind.JAXBElement;
55 import javax.xml.bind.JAXBException;
56 import javax.xml.bind.Marshaller;
57 import javax.xml.datatype.DatatypeConfigurationException;
58 import javax.xml.datatype.DatatypeFactory;
59 import javax.xml.datatype.XMLGregorianCalendar;
60
61 import org.fosstrak.epcis.model.EPCISQueryBodyType;
62 import org.fosstrak.epcis.model.EPCISQueryDocumentType;
63 import org.fosstrak.epcis.model.EventListType;
64 import org.fosstrak.epcis.model.ImplementationException;
65 import org.fosstrak.epcis.model.ObjectFactory;
66 import org.fosstrak.epcis.model.Poll;
67 import org.fosstrak.epcis.model.QueryParam;
68 import org.fosstrak.epcis.model.QueryParams;
69 import org.fosstrak.epcis.model.QueryResults;
70 import org.fosstrak.epcis.model.QueryTooLargeException;
71 import org.fosstrak.epcis.repository.EpcisQueryCallbackInterface;
72 import org.fosstrak.epcis.soap.EPCISServicePortType;
73 import org.fosstrak.epcis.soap.ImplementationExceptionResponse;
74 import org.fosstrak.epcis.soap.NoSuchNameExceptionResponse;
75 import org.fosstrak.epcis.soap.QueryParameterExceptionResponse;
76 import org.fosstrak.epcis.soap.QueryTooComplexExceptionResponse;
77 import org.fosstrak.epcis.soap.QueryTooLargeExceptionResponse;
78 import org.fosstrak.epcis.soap.SecurityExceptionResponse;
79 import org.fosstrak.epcis.soap.ValidationExceptionResponse;
80 import org.apache.commons.logging.Log;
81 import org.apache.commons.logging.LogFactory;
82 import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
83
84
85
86
87
88
89
90
91
92 public class QuerySubscription implements EpcisQueryCallbackInterface, Serializable {
93
94 private static final long serialVersionUID = -3066828914403000033L;
95
96 private static final Log LOG = LogFactory.getLog(QuerySubscription.class);
97
98
99 protected String subscriptionID;
100 protected String dest;
101 protected Calendar initialRecordTime;
102 protected Boolean reportIfEmpty;
103 protected String queryName;
104 private QueryParams queryParams;
105 private Calendar lastTimeExecuted;
106
107 private Properties properties;
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128 public QuerySubscription(final String subscriptionID, final QueryParams queryParams, final String dest,
129 final Boolean reportIfEmpty, final Calendar initialRecordTime,
130 final Calendar lastTimeExecuted, final String queryName) {
131 LOG.debug("Constructing Query Subscription with ID '" + subscriptionID + "'");
132 this.queryParams = queryParams;
133 this.subscriptionID = subscriptionID;
134 this.dest = dest;
135 this.initialRecordTime = initialRecordTime;
136 this.reportIfEmpty = reportIfEmpty;
137 this.queryName = queryName;
138 this.lastTimeExecuted = lastTimeExecuted;
139
140
141
142 updateRecordTime(queryParams, initialRecordTime);
143 }
144
145
146
147
148
149
150
151
152
153
154
155
156
157 private void updateSubscription(final Calendar lastTimeExecuted) {
158 String jndiName = getProperties().getProperty("jndi.datasource.name", "java:comp/env/jdbc/EPCISDB");
159 try {
160
161 Context ctx = new InitialContext();
162 DataSource db = (DataSource) ctx.lookup(jndiName);
163 Connection dbconnection = db.getConnection();
164
165
166 String update = "UPDATE subscription SET lastexecuted=(?), params=(?)" + " WHERE subscriptionid=(?);";
167 PreparedStatement stmt = dbconnection.prepareStatement(update);
168 LOG.debug("SQL: " + update);
169 Timestamp ts = new Timestamp(lastTimeExecuted.getTimeInMillis());
170 String time = ts.toString();
171 stmt.setString(1, time);
172 LOG.debug(" query param 1: " + time);
173 ByteArrayOutputStream outStream = new ByteArrayOutputStream();
174 ObjectOutput out = new ObjectOutputStream(outStream);
175 out.writeObject(queryParams);
176 ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
177 stmt.setBinaryStream(2, inStream, inStream.available());
178 LOG.debug(" query param 2: [" + inStream.available() + " bytes]");
179 stmt.setString(3, subscriptionID);
180 LOG.debug(" query param 3: " + subscriptionID);
181 stmt.executeUpdate();
182 dbconnection.commit();
183
184
185 dbconnection.close();
186 } catch (SQLException e) {
187 String msg = "An SQL error occurred while updating the subscriptions in the database.";
188 LOG.error(msg, e);
189 } catch (IOException e) {
190 String msg = "Unable to update the subscription in the database: " + e.getMessage();
191 LOG.error(msg, e);
192 } catch (NamingException e) {
193 String msg = "Unable to find JNDI data source with name " + jndiName;
194 LOG.error(msg, e);
195 }
196 }
197
198
199
200
201
202
203
204
205
206
207
208 private void updateRecordTime(final QueryParams queryParams, final Calendar initialRecordTime) {
209
210 boolean foundRecordTime = false;
211 for (QueryParam p : this.queryParams.getParam()) {
212 if (p.getName().equalsIgnoreCase("GE_recordTime")) {
213 LOG.debug("Updating query parameter 'GE_recordTime' with value '" + initialRecordTime + "'.");
214 p.setValue(initialRecordTime.getTimeInMillis());
215 foundRecordTime = true;
216 break;
217 }
218 }
219 if (!foundRecordTime) {
220 LOG.debug("Adding query parameter 'GE_recordTime' with value '" + initialRecordTime + "'.");
221 QueryParam newParam = new QueryParam();
222 newParam.setName("GE_recordTime");
223 newParam.setValue(initialRecordTime);
224 this.queryParams.getParam().add(newParam);
225 }
226
227
228 updateSubscription(initialRecordTime);
229 }
230
231
232
233
234 public void executeQuery() {
235 if (LOG.isDebugEnabled()) {
236 LOG.debug("--------------------------------------------");
237 LOG.debug("Executing subscribed query '" + subscriptionID + "' with " + queryParams.getParam().size()
238 + " parameters:");
239 for (QueryParam p : queryParams.getParam()) {
240 LOG.debug(" param name: " + p.getName());
241 Object val = p.getValue();
242 if (val instanceof GregorianCalendar) {
243 LOG.debug(" param value: " + ((GregorianCalendar) val).getTime());
244 } else {
245 LOG.debug(" param value: " + val);
246 }
247 }
248 }
249
250
251 Poll poll = new Poll();
252 poll.setQueryName(queryName);
253 poll.setParams(queryParams);
254 QueryResults result = null;
255 try {
256
257 GregorianCalendar cal = new GregorianCalendar();
258 result = executePoll(poll);
259 LOG.debug("Subscribed query '" + subscriptionID + "' has been executed");
260
261
262
263
264 this.lastTimeExecuted = cal;
265 } catch (QueryTooLargeExceptionResponse e) {
266
267 QueryTooLargeException qtle = e.getFaultInfo();
268 if (qtle == null) {
269 qtle = new QueryTooLargeException();
270 qtle.setQueryName(queryName);
271 qtle.setSubscriptionID(subscriptionID);
272 qtle.setReason(e.getMessage());
273 LOG.info("USER ERROR: " + qtle.getReason());
274 }
275 callbackQueryTooLargeException(qtle);
276 return;
277 } catch (ImplementationExceptionResponse e) {
278
279 ImplementationException ie = e.getFaultInfo();
280 if (ie == null) {
281 ie = new ImplementationException();
282 ie.setQueryName(queryName);
283 ie.setReason(e.getMessage());
284 ie.setSubscriptionID(subscriptionID);
285 LOG.info("USER ERROR: " + ie.getReason());
286 }
287 callbackImplementationException(ie);
288 return;
289 } catch (Exception e) {
290 String msg = "An unexpected error occurred while executing a subscribed query";
291 LOG.error(msg + ": " + e.getMessage(), e);
292
293 ImplementationException ie = new ImplementationException();
294 ie.setQueryName(queryName);
295 ie.setReason(msg);
296 ie.setSubscriptionID(subscriptionID);
297 callbackImplementationException(ie);
298 return;
299 }
300 result.setSubscriptionID(subscriptionID);
301 EventListType eventList = result.getResultsBody().getEventList();
302
303
304 boolean isEmpty = false;
305 isEmpty = (eventList == null) ? true : eventList.getObjectEventOrAggregationEventOrQuantityEvent().isEmpty();
306 if (!reportIfEmpty.booleanValue() && isEmpty) {
307 LOG.debug("Subscribed query '" + subscriptionID + "' returned no results, nothing to report.");
308 return;
309 }
310
311 callbackResults(result);
312
313
314 updateRecordTime(queryParams, lastTimeExecuted);
315 }
316
317
318
319
320 protected QueryResults executePoll(Poll poll) throws ImplementationExceptionResponse,
321 QueryTooComplexExceptionResponse, QueryTooLargeExceptionResponse, SecurityExceptionResponse,
322 ValidationExceptionResponse, NoSuchNameExceptionResponse, QueryParameterExceptionResponse {
323
324
325
326
327
328
329
330 JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
331 factory.setAddress("local://query");
332 factory.setServiceClass(EPCISServicePortType.class);
333 EPCISServicePortType servicePort = (EPCISServicePortType)
334 factory.create();
335
336 return servicePort.poll(poll);
337 }
338
339
340
341
342 public void callbackResults(final QueryResults results) {
343 callbackObject(results);
344 }
345
346
347
348
349 public void callbackImplementationException(ImplementationException ie) {
350 callbackObject(ie);
351 }
352
353
354
355
356 public void callbackQueryTooLargeException(QueryTooLargeException qtle) {
357 callbackObject(qtle);
358 }
359
360
361
362
363
364
365
366
367
368
369
370 private void callbackObject(final Object o) {
371 if (LOG.isDebugEnabled()) {
372 LOG.debug("Callback " + o + " at " + new Date());
373 }
374
375 EPCISQueryDocumentType epcisDoc = new EPCISQueryDocumentType();
376 epcisDoc.setSchemaVersion(BigDecimal.valueOf(1.0));
377 try {
378 DatatypeFactory dataFactory = DatatypeFactory.newInstance();
379 XMLGregorianCalendar now = dataFactory.newXMLGregorianCalendar(new GregorianCalendar());
380 epcisDoc.setCreationDate(now);
381 } catch (DatatypeConfigurationException e) {
382
383 }
384 EPCISQueryBodyType epcisBody = new EPCISQueryBodyType();
385 if (o instanceof QueryResults) {
386 epcisBody.setQueryResults((QueryResults) o);
387 } else if (o instanceof QueryTooLargeException) {
388 epcisBody.setQueryTooLargeException((QueryTooLargeException) o);
389 } else if (o instanceof ImplementationException) {
390 epcisBody.setImplementationException((ImplementationException) o);
391 } else {
392 epcisBody = null;
393 }
394 epcisDoc.setEPCISBody(epcisBody);
395
396
397 String data;
398 try {
399 data = marshalQueryDoc(epcisDoc);
400 } catch (JAXBException e) {
401 String msg = "An error serializing contents occurred: " + e.getMessage();
402 LOG.error(msg, e);
403 return;
404 }
405
406
407 try {
408 URL serviceUrl = new URL(dest.toString());
409 if (LOG.isDebugEnabled()) {
410 LOG.debug("Sending results of subscribed query '" + subscriptionID + "' to '" + serviceUrl + "'");
411 if (data.length() < 10 * 1024) {
412 LOG.debug("Sending data:\n" + data);
413 } else {
414 LOG.debug("Sending data: [" + data.length() + " bytes]");
415 }
416 }
417 int responseCode;
418 try {
419 responseCode = sendData(serviceUrl, data);
420 } catch (Exception e) {
421 LOG.warn("Unable to send results of subscribed query '" + subscriptionID + "' to '" + serviceUrl
422 + "', retrying in 3 sec ...");
423
424 try {
425 Thread.sleep(3000);
426 } catch (InterruptedException e1) {
427
428 }
429 try {
430 responseCode = sendData(serviceUrl, data);
431 } catch (Exception e2) {
432 LOG.warn("Unable to send results of subscribed query '" + subscriptionID + "' to '" + serviceUrl
433 + "', retrying in 3 sec ...");
434
435 try {
436 Thread.sleep(3000);
437 } catch (InterruptedException e1) {
438
439 }
440 responseCode = sendData(serviceUrl, data);
441 }
442 }
443 LOG.debug("Response " + responseCode);
444 } catch (IOException e) {
445 String msg = "Unable to send results of subscribed query '" + subscriptionID + "' to '" + dest + "': "
446 + e.getMessage();
447 LOG.error(msg, e);
448 return;
449 }
450 }
451
452
453
454
455
456
457
458
459 private String marshalQueryDoc(EPCISQueryDocumentType epcisDoc) throws JAXBException {
460 ObjectFactory objectFactory = new ObjectFactory();
461 JAXBContext context = JAXBContext.newInstance("org.fosstrak.epcis.model");
462 JAXBElement<EPCISQueryDocumentType> item = objectFactory.createEPCISQueryDocument(epcisDoc);
463 LOG.debug("Serializing " + item + " into XML");
464 StringWriter writer = new StringWriter();
465 Marshaller marshaller = context.createMarshaller();
466 marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
467 marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
468 marshaller.marshal(item, writer);
469 return writer.toString();
470 }
471
472
473
474
475
476
477
478
479
480
481
482
483 private int sendData(final URL url, final String data) throws IOException {
484 HttpURLConnection connection;
485 if ("HTTPS".equalsIgnoreCase(url.getProtocol()) && trustAllCertificates()) {
486 connection = getAllTrustingConnection(url);
487 } else {
488 connection = getConnection(url);
489 }
490 connection.setRequestMethod("POST");
491 connection.setRequestProperty("content-type", "text/xml");
492 connection.setRequestProperty("content-length", "" + data.length());
493 connection.setDoOutput(true);
494 connection.setDoInput(true);
495
496
497 Writer out = new OutputStreamWriter(connection.getOutputStream());
498 out.write(data);
499 out.flush();
500 out.close();
501
502
503 int responseCode = connection.getResponseCode();
504
505
506 connection.disconnect();
507 return responseCode;
508 }
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532 private HttpURLConnection getConnection(URL url) throws IOException {
533 return (HttpURLConnection) url.openConnection();
534 }
535
536
537
538
539
540
541
542
543
544
545
546
547 private HttpURLConnection getAllTrustingConnection(URL url) throws IOException {
548
549 TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() {
550 public java.security.cert.X509Certificate[] getAcceptedIssuers() {
551 return null;
552 }
553
554 public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {
555 }
556
557 public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {
558 }
559 } };
560
561
562 try {
563 SSLContext sc = SSLContext.getInstance("SSL");
564 sc.init(null, trustAllCerts, new java.security.SecureRandom());
565 HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
566 } catch (Exception e) {
567 LOG.error("Unable to install the all-trusting trust manager", e);
568 }
569 return getConnection(url);
570 }
571
572
573
574
575
576 private boolean trustAllCertificates() {
577 Properties properties = getProperties();
578 return Boolean.parseBoolean(properties.getProperty("trustAllCertificates", "false"));
579 }
580
581
582
583
584
585
586
587 private Properties getProperties() {
588 if (properties == null) {
589
590 String resource = "/application.properties";
591 InputStream is = this.getClass().getResourceAsStream(resource);
592 properties = new Properties();
593 try {
594 properties.load(is);
595 is.close();
596 } catch (IOException e) {
597 LOG.error("Unable to load application properties from classpath:" + resource + " ("
598 + this.getClass().getResource(resource) + ")", e);
599 }
600 }
601 return properties;
602 }
603
604
605
606
607 public Calendar getInitialRecordTime() {
608 return initialRecordTime;
609 }
610
611
612
613
614 public String getSubscriptionID() {
615 return subscriptionID;
616 }
617 }