View Javadoc

1   /*
2    * Copyright (C) 2007 ETH Zurich
3    *
4    * This file is part of Fosstrak (www.fosstrak.org).
5    *
6    * Fosstrak is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License version 2.1, as published by the Free Software Foundation.
9    *
10   * Fosstrak is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13   * Lesser General Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser General Public
16   * License along with Fosstrak; if not, write to the Free
17   * Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18   * Boston, MA  02110-1301  USA
19   */
20  
21  package org.fosstrak.epcis.repository.query;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.ObjectOutput;
28  import java.io.ObjectOutputStream;
29  import java.io.OutputStreamWriter;
30  import java.io.Serializable;
31  import java.io.StringWriter;
32  import java.io.Writer;
33  import java.math.BigDecimal;
34  import java.net.HttpURLConnection;
35  import java.net.URL;
36  import java.sql.Connection;
37  import java.sql.PreparedStatement;
38  import java.sql.SQLException;
39  import java.sql.Timestamp;
40  import java.util.Calendar;
41  import java.util.Date;
42  import java.util.GregorianCalendar;
43  import java.util.Properties;
44  
45  import javax.naming.Context;
46  import javax.naming.InitialContext;
47  import javax.naming.NamingException;
48  import javax.net.ssl.HttpsURLConnection;
49  import javax.net.ssl.SSLContext;
50  import javax.net.ssl.TrustManager;
51  import javax.net.ssl.X509TrustManager;
52  import javax.sql.DataSource;
53  import javax.xml.bind.JAXBContext;
54  import javax.xml.bind.JAXBElement;
55  import javax.xml.bind.JAXBException;
56  import javax.xml.bind.Marshaller;
57  import javax.xml.datatype.DatatypeConfigurationException;
58  import javax.xml.datatype.DatatypeFactory;
59  import javax.xml.datatype.XMLGregorianCalendar;
60  
61  import org.fosstrak.epcis.model.EPCISQueryBodyType;
62  import org.fosstrak.epcis.model.EPCISQueryDocumentType;
63  import org.fosstrak.epcis.model.EventListType;
64  import org.fosstrak.epcis.model.ImplementationException;
65  import org.fosstrak.epcis.model.ObjectFactory;
66  import org.fosstrak.epcis.model.Poll;
67  import org.fosstrak.epcis.model.QueryParam;
68  import org.fosstrak.epcis.model.QueryParams;
69  import org.fosstrak.epcis.model.QueryResults;
70  import org.fosstrak.epcis.model.QueryTooLargeException;
71  import org.fosstrak.epcis.repository.EpcisQueryCallbackInterface;
72  import org.fosstrak.epcis.soap.EPCISServicePortType;
73  import org.fosstrak.epcis.soap.ImplementationExceptionResponse;
74  import org.fosstrak.epcis.soap.NoSuchNameExceptionResponse;
75  import org.fosstrak.epcis.soap.QueryParameterExceptionResponse;
76  import org.fosstrak.epcis.soap.QueryTooComplexExceptionResponse;
77  import org.fosstrak.epcis.soap.QueryTooLargeExceptionResponse;
78  import org.fosstrak.epcis.soap.SecurityExceptionResponse;
79  import org.fosstrak.epcis.soap.ValidationExceptionResponse;
80  import org.apache.commons.logging.Log;
81  import org.apache.commons.logging.LogFactory;
82  import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
83  
84  /**
85   * Implements a subscription to a query. Created upon using subscribe() on the
86   * querying interface side.
87   * 
88   * @author Alain Remund
89   * @author Arthur van Dorp
90   * @author Marco Steybe
91   */
92  public class QuerySubscription implements EpcisQueryCallbackInterface, Serializable {
93  
94      private static final long serialVersionUID = -3066828914403000033L;
95  
96      private static final Log LOG = LogFactory.getLog(QuerySubscription.class);
97  
98      // the parameters from the subscribed query
99      protected String subscriptionID;
100     protected String dest;
101     protected Calendar initialRecordTime;
102     protected Boolean reportIfEmpty;
103     protected String queryName;
104     private QueryParams queryParams;
105     private Calendar lastTimeExecuted;
106 
107     private Properties properties;
108 
109     /**
110      * Constructor to be used when recreating from storage.
111      * 
112      * @param subscriptionID
113      *            subscriptionID.
114      * @param queryParams
115      *            Query parameters.
116      * @param dest
117      *            Destination URI.
118      * @param reportIfEmpty
119      *            Whether to report when nothing changed.
120      * @param initialRecordTime
121      *            Time from when on events should be reported on first
122      *            execution.
123      * @param lastTimeExecuted
124      *            Last time the query got executed.
125      * @param queryName
126      *            queryName.
127      */
128     public QuerySubscription(final String subscriptionID, final QueryParams queryParams, final String dest,
129             final Boolean reportIfEmpty, final Calendar initialRecordTime,
130             final Calendar lastTimeExecuted, final String queryName) {
131         LOG.debug("Constructing Query Subscription with ID '" + subscriptionID + "'");
132         this.queryParams = queryParams;
133         this.subscriptionID = subscriptionID;
134         this.dest = dest;
135         this.initialRecordTime = initialRecordTime;
136         this.reportIfEmpty = reportIfEmpty;
137         this.queryName = queryName;
138         this.lastTimeExecuted = lastTimeExecuted;
139 
140         // update/add GE_recordTime restriction to query params (we only need to
141         // return results not previously returned!)
142         updateRecordTime(queryParams, initialRecordTime);
143     }
144 
145     /**
146      * Updates the subscription in the database. This is required in order to
147      * correctly re-initialize the subscriptions, especially the
148      * lastTimeExecuted field, after a context restart.
149      * <p>
150      * TODO: This is a back-end method: move this method to the
151      * QueryOperationsBackend and delegate to it (thus we would need a reference
152      * to the QueryOperationsBackend in this class).
153      * 
154      * @param lastTimeExecuted
155      *            The new lastTimeExecuted.
156      */
157     private void updateSubscription(final Calendar lastTimeExecuted) {
158         String jndiName = getProperties().getProperty("jndi.datasource.name", "java:comp/env/jdbc/EPCISDB");
159         try {
160             // open a database connection
161             Context ctx = new InitialContext();
162             DataSource db = (DataSource) ctx.lookup(jndiName);
163             Connection dbconnection = db.getConnection();
164 
165             // update the subscription in the database
166             String update = "UPDATE subscription SET lastexecuted=(?), params=(?)" + " WHERE subscriptionid=(?);";
167             PreparedStatement stmt = dbconnection.prepareStatement(update);
168             LOG.debug("SQL: " + update);
169             Timestamp ts = new Timestamp(lastTimeExecuted.getTimeInMillis());
170             String time = ts.toString();
171             stmt.setString(1, time);
172             LOG.debug("       query param 1: " + time);
173             ByteArrayOutputStream outStream = new ByteArrayOutputStream();
174             ObjectOutput out = new ObjectOutputStream(outStream);
175             out.writeObject(queryParams);
176             ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
177             stmt.setBinaryStream(2, inStream, inStream.available());
178             LOG.debug("       query param 2: [" + inStream.available() + " bytes]");
179             stmt.setString(3, subscriptionID);
180             LOG.debug("       query param 3: " + subscriptionID);
181             stmt.executeUpdate();
182             dbconnection.commit();
183 
184             // close the database connection
185             dbconnection.close();
186         } catch (SQLException e) {
187             String msg = "An SQL error occurred while updating the subscriptions in the database.";
188             LOG.error(msg, e);
189         } catch (IOException e) {
190             String msg = "Unable to update the subscription in the database: " + e.getMessage();
191             LOG.error(msg, e);
192         } catch (NamingException e) {
193             String msg = "Unable to find JNDI data source with name " + jndiName;
194             LOG.error(msg, e);
195         }
196     }
197 
198     /**
199      * Updates or adds the 'GE_recordTime' query parameter in the given query
200      * parameter array and sets its value to the given time.
201      * 
202      * @param queryParams
203      *            The (old) query parameter array.
204      * @param initialRecordTime
205      *            The time to which the 'GE_recordTime' parameter will be
206      *            updated.
207      */
208     private void updateRecordTime(final QueryParams queryParams, final Calendar initialRecordTime) {
209         // update or add GE_recordTime restriction
210         boolean foundRecordTime = false;
211         for (QueryParam p : this.queryParams.getParam()) {
212             if (p.getName().equalsIgnoreCase("GE_recordTime")) {
213                 LOG.debug("Updating query parameter 'GE_recordTime' with value '" + initialRecordTime + "'.");
214                 p.setValue(initialRecordTime.getTimeInMillis());
215                 foundRecordTime = true;
216                 break;
217             }
218         }
219         if (!foundRecordTime) {
220             LOG.debug("Adding query parameter 'GE_recordTime' with value '" + initialRecordTime + "'.");
221             QueryParam newParam = new QueryParam();
222             newParam.setName("GE_recordTime");
223             newParam.setValue(initialRecordTime);
224             this.queryParams.getParam().add(newParam);
225         }
226 
227         // update the subscription in the db
228         updateSubscription(initialRecordTime);
229     }
230 
231     /**
232      * Runs the query assigned to this subscription. Advances lastTimeExecuted.
233      */
234     public void executeQuery() {
235         if (LOG.isDebugEnabled()) {
236             LOG.debug("--------------------------------------------");
237             LOG.debug("Executing subscribed query '" + subscriptionID + "' with " + queryParams.getParam().size()
238                     + " parameters:");
239             for (QueryParam p : queryParams.getParam()) {
240                 LOG.debug(" param name:  " + p.getName());
241                 Object val = p.getValue();
242                 if (val instanceof GregorianCalendar) {
243                     LOG.debug(" param value: " + ((GregorianCalendar) val).getTime());
244                 } else {
245                     LOG.debug(" param value: " + val);
246                 }
247             }
248         }
249 
250         // poll the query
251         Poll poll = new Poll();
252         poll.setQueryName(queryName);
253         poll.setParams(queryParams);
254         QueryResults result = null;
255         try {
256             // get current time and send the query
257             GregorianCalendar cal = new GregorianCalendar();
258             result = executePoll(poll);
259             LOG.debug("Subscribed query '" + subscriptionID + "' has been executed");
260 
261             // set new lastTimeExecuted (must be <= to time when query is
262             // executed, otherwise we loose results)
263             // cal.add(Calendar.SECOND, 1);
264             this.lastTimeExecuted = cal;
265         } catch (QueryTooLargeExceptionResponse e) {
266             // send exception back to client
267             QueryTooLargeException qtle = e.getFaultInfo();
268             if (qtle == null) {
269                 qtle = new QueryTooLargeException();
270                 qtle.setQueryName(queryName);
271                 qtle.setSubscriptionID(subscriptionID);
272                 qtle.setReason(e.getMessage());
273                 LOG.info("USER ERROR: " + qtle.getReason());
274             }
275             callbackQueryTooLargeException(qtle);
276             return;
277         } catch (ImplementationExceptionResponse e) {
278             // send exception back to client
279             ImplementationException ie = e.getFaultInfo();
280             if (ie == null) {
281                 ie = new ImplementationException();
282                 ie.setQueryName(queryName);
283                 ie.setReason(e.getMessage());
284                 ie.setSubscriptionID(subscriptionID);
285                 LOG.info("USER ERROR: " + ie.getReason());
286             }
287             callbackImplementationException(ie);
288             return;
289         } catch (Exception e) {
290             String msg = "An unexpected error occurred while executing a subscribed query";
291             LOG.error(msg + ": " + e.getMessage(), e);
292             // send exception back to client
293             ImplementationException ie = new ImplementationException();
294             ie.setQueryName(queryName);
295             ie.setReason(msg);
296             ie.setSubscriptionID(subscriptionID);
297             callbackImplementationException(ie);
298             return;
299         }
300         result.setSubscriptionID(subscriptionID);
301         EventListType eventList = result.getResultsBody().getEventList();
302 
303         // check if we have an empty result list
304         boolean isEmpty = false;
305         isEmpty = (eventList == null) ? true : eventList.getObjectEventOrAggregationEventOrQuantityEvent().isEmpty();
306         if (!reportIfEmpty.booleanValue() && isEmpty) {
307             LOG.debug("Subscribed query '" + subscriptionID + "' returned no results, nothing to report.");
308             return;
309         }
310 
311         callbackResults(result);
312 
313         // update query params with new lastTimeExecuted
314         updateRecordTime(queryParams, lastTimeExecuted);
315     }
316 
317     /**
318      * Poll a query using local transport.
319      */
320     protected QueryResults executePoll(Poll poll) throws ImplementationExceptionResponse,
321             QueryTooComplexExceptionResponse, QueryTooLargeExceptionResponse, SecurityExceptionResponse,
322             ValidationExceptionResponse, NoSuchNameExceptionResponse, QueryParameterExceptionResponse {
323         // we use CXF's local transport feature here
324 //        EPCglobalEPCISService service = new EPCglobalEPCISService();
325 //        QName portName = new QName("urn:epcglobal:epcis:wsdl:1", "EPCglobalEPCISServicePortLocal");
326 //        service.addPort(portName, "http://schemas.xmlsoap.org/soap/", "local://query");
327 //        EPCISServicePortType servicePort = service.getPort(portName, EPCISServicePortType.class);
328 
329         // the same using CXF API
330          JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
331          factory.setAddress("local://query");
332          factory.setServiceClass(EPCISServicePortType.class);
333          EPCISServicePortType servicePort = (EPCISServicePortType)
334          factory.create();
335 
336         return servicePort.poll(poll);
337     }
338 
339     /**
340      * {@inheritDoc}
341      */
342     public void callbackResults(final QueryResults results) {
343         callbackObject(results);
344     }
345 
346     /**
347      * {@inheritDoc}
348      */
349     public void callbackImplementationException(ImplementationException ie) {
350         callbackObject(ie);
351     }
352 
353     /**
354      * {@inheritDoc}
355      */
356     public void callbackQueryTooLargeException(QueryTooLargeException qtle) {
357         callbackObject(qtle);
358     }
359 
360     /**
361      * Serializes and sends the given object back to the client. The Object must
362      * be an instance of QueryResults, QueryTooLargeException, or
363      * ImplementationException.
364      * 
365      * @param o
366      *            The object to be sent back to the client. An instance of
367      *            QueryResults, QueryTooLargeException, or
368      *            ImplementationException.
369      */
370     private void callbackObject(final Object o) {
371         if (LOG.isDebugEnabled()) {
372             LOG.debug("Callback " + o + " at " + new Date());
373         }
374         // create the EPCIS document
375         EPCISQueryDocumentType epcisDoc = new EPCISQueryDocumentType();
376         epcisDoc.setSchemaVersion(BigDecimal.valueOf(1.0));
377         try {
378             DatatypeFactory dataFactory = DatatypeFactory.newInstance();
379             XMLGregorianCalendar now = dataFactory.newXMLGregorianCalendar(new GregorianCalendar());
380             epcisDoc.setCreationDate(now);
381         } catch (DatatypeConfigurationException e) {
382             // oh well - don't care about setting the creation date
383         }
384         EPCISQueryBodyType epcisBody = new EPCISQueryBodyType();
385         if (o instanceof QueryResults) {
386             epcisBody.setQueryResults((QueryResults) o);
387         } else if (o instanceof QueryTooLargeException) {
388             epcisBody.setQueryTooLargeException((QueryTooLargeException) o);
389         } else if (o instanceof ImplementationException) {
390             epcisBody.setImplementationException((ImplementationException) o);
391         } else {
392             epcisBody = null;
393         }
394         epcisDoc.setEPCISBody(epcisBody);
395 
396         // serialize the response
397         String data;
398         try {
399             data = marshalQueryDoc(epcisDoc);
400         } catch (JAXBException e) {
401             String msg = "An error serializing contents occurred: " + e.getMessage();
402             LOG.error(msg, e);
403             return;
404         }
405 
406         // set up connection and send data to given destination
407         try {
408             URL serviceUrl = new URL(dest.toString());
409             if (LOG.isDebugEnabled()) {
410                 LOG.debug("Sending results of subscribed query '" + subscriptionID + "' to '" + serviceUrl + "'");
411                 if (data.length() < 10 * 1024) {
412                     LOG.debug("Sending data:\n" + data);
413                 } else {
414                     LOG.debug("Sending data: [" + data.length() + " bytes]");
415                 }
416             }
417             int responseCode;
418             try {
419                 responseCode = sendData(serviceUrl, data);
420             } catch (Exception e) {
421                 LOG.warn("Unable to send results of subscribed query '" + subscriptionID + "' to '" + serviceUrl
422                         + "', retrying in 3 sec ...");
423                 // wait 3 seconds and try again
424                 try {
425                     Thread.sleep(3000);
426                 } catch (InterruptedException e1) {
427                     // never mind
428                 }
429                 try {
430                     responseCode = sendData(serviceUrl, data);
431                 } catch (Exception e2) {
432                     LOG.warn("Unable to send results of subscribed query '" + subscriptionID + "' to '" + serviceUrl
433                             + "', retrying in 3 sec ...");
434                     // wait 3 seconds and try again
435                     try {
436                         Thread.sleep(3000);
437                     } catch (InterruptedException e1) {
438                         // never mind
439                     }
440                     responseCode = sendData(serviceUrl, data);
441                 }
442             }
443             LOG.debug("Response " + responseCode);
444         } catch (IOException e) {
445             String msg = "Unable to send results of subscribed query '" + subscriptionID + "' to '" + dest + "': "
446                     + e.getMessage();
447             LOG.error(msg, e);
448             return;
449         }
450     }
451 
452     /**
453      * Marshals the given EPCIS query document into it's XML representation.
454      * 
455      * @param epcisDoc
456      *            The EPCISQueryDocumentType to marshal.
457      * @return The marshaled EPCISQueryDocumentType XML String.
458      */
459     private String marshalQueryDoc(EPCISQueryDocumentType epcisDoc) throws JAXBException {
460         ObjectFactory objectFactory = new ObjectFactory();
461         JAXBContext context = JAXBContext.newInstance("org.fosstrak.epcis.model");
462         JAXBElement<EPCISQueryDocumentType> item = objectFactory.createEPCISQueryDocument(epcisDoc);
463         LOG.debug("Serializing " + item + " into XML");
464         StringWriter writer = new StringWriter();
465         Marshaller marshaller = context.createMarshaller();
466         marshaller.setProperty(Marshaller.JAXB_ENCODING, "UTF-8");
467         marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
468         marshaller.marshal(item, writer);
469         return writer.toString();
470     }
471 
472     /**
473      * Sends the given data String to the specified URL.
474      * 
475      * @param url
476      *            The URL to send the data to.
477      * @param data
478      *            The data to send.
479      * @return The HTTP response code.
480      * @throws IOException
481      *             If a communication error occurred.
482      */
483     private int sendData(final URL url, final String data) throws IOException {
484         HttpURLConnection connection;
485         if ("HTTPS".equalsIgnoreCase(url.getProtocol()) && trustAllCertificates()) {
486             connection = getAllTrustingConnection(url);
487         } else {
488             connection = getConnection(url);
489         }
490         connection.setRequestMethod("POST");
491         connection.setRequestProperty("content-type", "text/xml");
492         connection.setRequestProperty("content-length", "" + data.length());
493         connection.setDoOutput(true);
494         connection.setDoInput(true);
495 
496         // send data
497         Writer out = new OutputStreamWriter(connection.getOutputStream());
498         out.write(data);
499         out.flush();
500         out.close();
501 
502         // get response code
503         int responseCode = connection.getResponseCode();
504 
505         // disconnect and return
506         connection.disconnect();
507         return responseCode;
508     }
509 
510     /**
511      * Opens a connection to the given URL.
512      * <p>
513      * The URL.openConnection() method returns an instance of
514      * javax.net.ssl.HttpsURLConnection, which extends
515      * java.net.HttpURLConnection, if the HTTPS protocol is used in the URL.
516      * Thus, we support both the HTTP and HTTPS binding of the query callback
517      * interface.
518      * <p>
519      * Note: By default, accessing an HTTPS URL using the URL class results in
520      * an exception if the destination's certificate chain cannot be validated.
521      * In this case you can manually import the destination's certificate into
522      * the Java runtime's trust store, or, if you want to disable the validation
523      * of certificates for testing purposes, use
524      * {@link getAllTrustingConnection(URL)}.
525      * 
526      * @param url
527      *            The URL on which a connection will be opened.
528      * @return A HttpURLConnection connection object.
529      * @throws IOException
530      *             If an I/O error occurred.
531      */
532     private HttpURLConnection getConnection(URL url) throws IOException {
533         return (HttpURLConnection) url.openConnection();
534     }
535 
536     /**
537      * Retrieves an "all-trusting" HTTP URL connection object, by disabling the
538      * validation of certificates and overriding the default trust manager with
539      * one that trusts all certificates.
540      * 
541      * @param url
542      *            The URL on which a connection will be opened.
543      * @return A HttpURLConnection connection object.
544      * @throws IOException
545      *             If an I/O error occurred.
546      */
547     private HttpURLConnection getAllTrustingConnection(URL url) throws IOException {
548         // Create a trust manager that does not validate certificate chains
549         TrustManager[] trustAllCerts = new TrustManager[] { new X509TrustManager() {
550             public java.security.cert.X509Certificate[] getAcceptedIssuers() {
551                 return null;
552             }
553 
554             public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType) {
555             }
556 
557             public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType) {
558             }
559         } };
560 
561         // Install the all-trusting trust manager
562         try {
563             SSLContext sc = SSLContext.getInstance("SSL");
564             sc.init(null, trustAllCerts, new java.security.SecureRandom());
565             HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
566         } catch (Exception e) {
567             LOG.error("Unable to install the all-trusting trust manager", e);
568         }
569         return getConnection(url);
570     }
571 
572     /**
573      * @return Whether to trust a certificate whose certificate chain cannot be
574      *         validated when delivering results via Query Callback Interface.
575      */
576     private boolean trustAllCertificates() {
577         Properties properties = getProperties();
578         return Boolean.parseBoolean(properties.getProperty("trustAllCertificates", "false"));
579     }
580 
581     /**
582      * Loads the application's properties file from the class path if it has not
583      * already done so.
584      * 
585      * @return A populated Properties instance.
586      */
587     private Properties getProperties() {
588         if (properties == null) {
589             // read application properties from classpath
590             String resource = "/application.properties";
591             InputStream is = this.getClass().getResourceAsStream(resource);
592             properties = new Properties();
593             try {
594                 properties.load(is);
595                 is.close();
596             } catch (IOException e) {
597                 LOG.error("Unable to load application properties from classpath:" + resource + " ("
598                         + this.getClass().getResource(resource) + ")", e);
599             }
600         }
601         return properties;
602     }
603 
604     /**
605      * @return The initial record time.
606      */
607     public Calendar getInitialRecordTime() {
608         return initialRecordTime;
609     }
610 
611     /**
612      * @return the subscriptionID
613      */
614     public String getSubscriptionID() {
615         return subscriptionID;
616     }
617 }