View Javadoc

1   /*
2    * Copyright (C) 2007 ETH Zurich
3    *
4    * This file is part of Fosstrak (www.fosstrak.org).
5    *
6    * Fosstrak is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License version 2.1, as published by the Free Software Foundation.
9    *
10   * Fosstrak is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13   * Lesser General Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser General Public
16   * License along with Fosstrak; if not, write to the Free
17   * Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18   * Boston, MA  02110-1301  USA
19   */
20  
21  package org.fosstrak.epcis.repository.query;
22  
23  import java.io.ByteArrayInputStream;
24  import java.io.ByteArrayOutputStream;
25  import java.io.IOException;
26  import java.io.ObjectInput;
27  import java.io.ObjectInputStream;
28  import java.io.ObjectOutput;
29  import java.io.ObjectOutputStream;
30  import java.sql.Connection;
31  import java.sql.PreparedStatement;
32  import java.sql.ResultSet;
33  import java.sql.SQLException;
34  import java.sql.Statement;
35  import java.sql.Timestamp;
36  import java.util.ArrayList;
37  import java.util.Calendar;
38  import java.util.GregorianCalendar;
39  import java.util.HashMap;
40  import java.util.List;
41  import java.util.Map;
42  
43  import javax.sql.DataSource;
44  import javax.xml.bind.JAXBElement;
45  import javax.xml.datatype.DatatypeConfigurationException;
46  import javax.xml.datatype.DatatypeFactory;
47  import javax.xml.datatype.XMLGregorianCalendar;
48  import javax.xml.namespace.QName;
49  
50  import org.apache.commons.logging.Log;
51  import org.apache.commons.logging.LogFactory;
52  import org.fosstrak.epcis.model.ActionType;
53  import org.fosstrak.epcis.model.AggregationEventType;
54  import org.fosstrak.epcis.model.AttributeType;
55  import org.fosstrak.epcis.model.BusinessLocationType;
56  import org.fosstrak.epcis.model.BusinessTransactionListType;
57  import org.fosstrak.epcis.model.BusinessTransactionType;
58  import org.fosstrak.epcis.model.EPC;
59  import org.fosstrak.epcis.model.EPCISEventType;
60  import org.fosstrak.epcis.model.EPCListType;
61  import org.fosstrak.epcis.model.IDListType;
62  import org.fosstrak.epcis.model.ImplementationException;
63  import org.fosstrak.epcis.model.ImplementationExceptionSeverity;
64  import org.fosstrak.epcis.model.ObjectEventType;
65  import org.fosstrak.epcis.model.QuantityEventType;
66  import org.fosstrak.epcis.model.QueryParams;
67  import org.fosstrak.epcis.model.QueryTooLargeException;
68  import org.fosstrak.epcis.model.ReadPointType;
69  import org.fosstrak.epcis.model.SubscriptionControls;
70  import org.fosstrak.epcis.model.TransactionEventType;
71  import org.fosstrak.epcis.model.VocabularyElementListType;
72  import org.fosstrak.epcis.model.VocabularyElementType;
73  import org.fosstrak.epcis.model.VocabularyType;
74  import org.fosstrak.epcis.repository.EpcisConstants;
75  import org.fosstrak.epcis.repository.query.SimpleEventQueryDTO.EventQueryParam;
76  import org.fosstrak.epcis.repository.query.SimpleEventQueryDTO.Operation;
77  import org.fosstrak.epcis.soap.ImplementationExceptionResponse;
78  import org.fosstrak.epcis.soap.QueryTooLargeExceptionResponse;
79  
80  /**
81   * The QueryOperationsBackendSQL uses basic SQL statements (actually
82   * <code>PreparedStatement</code>s) to implement the QueryOperationsBackend
83   * interface.
84   * 
85   * @author Marco Steybe
86   */
87  public class QueryOperationsBackendSQL implements QueryOperationsBackend {
88  
89      private static final Log LOG = LogFactory.getLog(QueryOperationsBackendSQL.class);
90  
91      private static final String SQL_SELECT_FROM_AGGREGATIONEVENT = "SELECT DISTINCT event_AggregationEvent.id, eventTime, eventTimeMs, recordTime, recordTimeMs, eventTimeZoneOffset, readPoint.uri AS readPoint, bizLocation.uri AS bizLocation, bizStep.uri AS bizStep, disposition.uri AS disposition, action, parentID FROM event_AggregationEvent LEFT JOIN voc_BizStep AS bizStep ON event_AggregationEvent.bizStep=bizStep.id LEFT JOIN voc_Disposition AS disposition ON event_AggregationEvent.disposition=disposition.id LEFT JOIN voc_ReadPoint AS readPoint ON event_AggregationEvent.readPoint=readPoint.id LEFT JOIN voc_BizLoc AS bizLocation ON event_AggregationEvent.bizLocation=bizLocation.id";
92      private static final String SQL_SELECT_FROM_OBJECTEVENT = "SELECT DISTINCT event_ObjectEvent.id, eventTime, eventTimeMs, recordTime, recordTimeMs, eventTimeZoneOffset, readPoint.uri AS readPoint, bizLocation.uri AS bizLocation, bizStep.uri AS bizStep, disposition.uri AS disposition, action FROM event_ObjectEvent LEFT JOIN voc_BizStep AS bizStep ON event_ObjectEvent.bizStep=bizStep.id LEFT JOIN voc_Disposition AS disposition ON event_ObjectEvent.disposition=disposition.id LEFT JOIN voc_ReadPoint AS readPoint ON event_ObjectEvent.readPoint=readPoint.id LEFT JOIN voc_BizLoc AS bizLocation ON event_ObjectEvent.bizLocation=bizLocation.id";
93      private static final String SQL_SELECT_FROM_QUANTITYEVENT = "SELECT DISTINCT event_QuantityEvent.id, eventTime, eventTimeMs, recordTime, recordTimeMs, eventTimeZoneOffset, readPoint.uri AS readPoint, bizLocation.uri AS bizLocation, bizStep.uri AS bizStep, disposition.uri AS disposition, epcClass.uri AS epcClass, quantity FROM event_QuantityEvent LEFT JOIN voc_BizStep AS bizStep ON event_QuantityEvent.bizStep=bizStep.id LEFT JOIN voc_Disposition AS disposition ON event_QuantityEvent.disposition=disposition.id LEFT JOIN voc_ReadPoint AS readPoint ON event_QuantityEvent.readPoint=readPoint.id LEFT JOIN voc_BizLoc AS bizLocation ON event_QuantityEvent.bizLocation=bizLocation.id LEFT JOIN voc_EPCClass AS epcClass ON event_QuantityEvent.epcClass=epcClass.id";
94      private static final String SQL_SELECT_FROM_TRANSACTIONEVENT = "SELECT DISTINCT event_TransactionEvent.id, eventTime, eventTimeMs, recordTime, recordTimeMs, eventTimeZoneOffset, readPoint.uri AS readPoint, bizLocation.uri AS bizLocation, bizStep.uri AS bizStep, disposition.uri AS disposition, action, parentID FROM event_TransactionEvent LEFT JOIN voc_BizStep AS bizStep ON event_TransactionEvent.bizStep=bizStep.id LEFT JOIN voc_Disposition AS disposition ON event_TransactionEvent.disposition=disposition.id LEFT JOIN voc_ReadPoint AS readPoint ON event_TransactionEvent.readPoint=readPoint.id LEFT JOIN voc_BizLoc AS bizLocation ON event_TransactionEvent.bizLocation=bizLocation.id";
95  
96      private static final String SQL_SELECT_AGGREGATIONEVENT_EXTENSIONS = "SELECT ext.fieldname, ext.prefix, ext.intValue, ext.floatValue, ext.dateValue, ext.strValue FROM event_AggregationEvent_extensions AS ext WHERE ext.event_id=?";
97      private static final String SQL_SELECT_OBJECTEVENT_EXTENSIONS = "SELECT ext.fieldname, ext.prefix, ext.intValue, ext.floatValue, ext.dateValue, ext.strValue FROM event_ObjectEvent_extensions AS ext WHERE event_id=?";
98      private static final String SQL_SELECT_QUANTITYEVENT_EXTENSIONS = "SELECT ext.fieldname, ext.prefix, ext.intValue, ext.floatValue, ext.dateValue, ext.strValue FROM event_QuantityEvent_extensions AS ext WHERE event_id=?";
99      private static final String SQL_SELECT_TRANSACTIONEVENT_EXTENSIONS = "SELECT ext.fieldname, ext.prefix, ext.intValue, ext.floatValue, ext.dateValue, ext.strValue FROM event_TransactionEvent_extensions AS ext WHERE event_id=?";
100 
101     private static final String SQL_SELECT_AGGREGATIONEVENT_BIZTRANS = "SELECT bizTrans.uri AS bizTrans, bizTransType.uri AS bizTransType FROM event_AggregationEvent_bizTrans AS eventBizTrans JOIN BizTransaction ON eventBizTrans.bizTrans_id=BizTransaction.id JOIN voc_BizTrans AS bizTrans ON BizTransaction.bizTrans=bizTrans.id JOIN voc_BizTransType AS bizTransType ON BizTransaction.type=bizTransType.id WHERE eventBizTrans.event_id=?";
102     private static final String SQL_SELECT_OBJECTEVENT_BIZTRANS = "SELECT bizTrans.uri AS bizTrans, bizTransType.uri AS bizTransType FROM event_ObjectEvent_bizTrans AS eventBizTrans JOIN BizTransaction ON eventBizTrans.bizTrans_id=BizTransaction.id JOIN voc_BizTrans AS bizTrans ON BizTransaction.bizTrans=bizTrans.id JOIN voc_BizTransType AS bizTransType ON BizTransaction.type=bizTransType.id WHERE eventBizTrans.event_id=?";
103     private static final String SQL_SELECT_QUANTITYEVENT_BIZTRANS = "SELECT bizTrans.uri AS bizTrans, bizTransType.uri AS bizTransType FROM event_QuantityEvent_bizTrans AS eventBizTrans JOIN BizTransaction ON eventBizTrans.bizTrans_id=BizTransaction.id JOIN voc_BizTrans AS bizTrans ON BizTransaction.bizTrans=bizTrans.id JOIN voc_BizTransType AS bizTransType ON BizTransaction.type=bizTransType.id WHERE eventBizTrans.event_id=?";
104     private static final String SQL_SELECT_TRANSACTIONEVENT_BIZTRANS = "SELECT bizTrans.uri AS bizTrans, bizTransType.uri AS bizTransType FROM event_TransactionEvent_bizTrans AS eventBizTrans JOIN BizTransaction ON eventBizTrans.bizTrans_id=BizTransaction.id JOIN voc_BizTrans AS bizTrans ON BizTransaction.bizTrans=bizTrans.id JOIN voc_BizTransType AS bizTransType ON BizTransaction.type=bizTransType.id WHERE eventBizTrans.event_id=?";
105 
106     private static final String SQL_SELECT_AGGREGATIONEVENT_EPCS = "SELECT epc FROM event_AggregationEvent_EPCs WHERE event_id=?";
107     private static final String SQL_SELECT_OBJECTEVENT_EPCS = "SELECT epc FROM event_ObjectEvent_EPCs WHERE event_id=?";
108     private static final String SQL_SELECT_QUANTITYEVENT_EPCS = "SELECT epc FROM event_QuantityEvent_EPCs WHERE event_id=?";
109     private static final String SQL_SELECT_TRANSACTIONEVENT_EPCS = "SELECT epc FROM event_TransactionEvent_EPCs WHERE event_id=?";
110 
111     private static final String SQL_EXISTS_SUBSCRIPTION = "SELECT EXISTS (SELECT subscriptionid FROM subscription WHERE subscriptionid=?)";
112 
113     private static Map<String, String> attributeTablenameMap;
114     private static Map<String, String> vocabularyTablenameMap;
115     private static Map<String, String> vocabularyTypeMap;
116 
117     private static Map<Operation, String> operationMap;
118 
119     static {
120         attributeTablenameMap = new HashMap<String, String>(7);
121         attributeTablenameMap.put("bizLocation.attribute", "voc_BizLoc_attr");
122         attributeTablenameMap.put("bizStep.attribute", "voc_BizStep_attr");
123         attributeTablenameMap.put("bizTransType.attribute", "voc_BizTransType_attr");
124         attributeTablenameMap.put("bizTrans.attribute", "voc_BizTrans_attr");
125         attributeTablenameMap.put("disposition.attribute", "voc_Disposition_attr");
126         attributeTablenameMap.put("readPoint.attribute", "voc_ReadPoint_attr");
127         attributeTablenameMap.put("epcClass.attribute", "voc_EPCClass_attr");
128 
129         vocabularyTablenameMap = new HashMap<String, String>(5);
130         vocabularyTablenameMap.put(EpcisConstants.BUSINESS_STEP_ID, "voc_BizStep");
131         vocabularyTablenameMap.put(EpcisConstants.BUSINESS_LOCATION_ID, "voc_BizLoc");
132         vocabularyTablenameMap.put(EpcisConstants.BUSINESS_TRANSACTION_ID, "voc_BizTrans");
133         vocabularyTablenameMap.put(EpcisConstants.BUSINESS_TRANSACTION_TYPE_ID, "voc_BizTransType");
134         vocabularyTablenameMap.put(EpcisConstants.DISPOSITION_ID, "voc_Disposition");
135         vocabularyTablenameMap.put(EpcisConstants.EPC_CLASS_ID, "voc_EPCClass");
136         vocabularyTablenameMap.put(EpcisConstants.READ_POINT_ID, "voc_ReadPoint");
137 
138         vocabularyTypeMap = new HashMap<String, String>(7);
139         vocabularyTypeMap.put("bizLocation", "bizLocation.uri");
140         vocabularyTypeMap.put("bizStep", "bizStep.uri");
141         vocabularyTypeMap.put("bizTransType", "bizTransType.uri");
142         vocabularyTypeMap.put("bizTrans", "bizTrans.uri");
143         vocabularyTypeMap.put("disposition", "disposition.uri");
144         vocabularyTypeMap.put("readPoint", "readPoint.uri");
145         vocabularyTypeMap.put("epcClass", "epcClass.uri");
146 
147         operationMap = new HashMap<Operation, String>(9);
148         operationMap.put(Operation.EQ, "=");
149         operationMap.put(Operation.GE, ">=");
150         operationMap.put(Operation.LE, "<=");
151         operationMap.put(Operation.GT, ">");
152         operationMap.put(Operation.LT, "<");
153         operationMap.put(Operation.MATCH, "LIKE");
154         operationMap.put(Operation.WD, "LIKE");
155         operationMap.put(Operation.EQATTR, "=");
156         operationMap.put(Operation.HASATTR, "=");
157     }
158 
159     private PreparedStatement prepareSimpleEventQuery(final QueryOperationsSession session, SimpleEventQueryDTO seQuery)
160             throws SQLException, ImplementationExceptionResponse {
161 
162         StringBuilder sqlSelectFrom;
163         StringBuilder sqlWhereClause = new StringBuilder(" WHERE 1");
164         List<Object> sqlParams = new ArrayList<Object>();
165 
166         String eventType = seQuery.getEventType();
167         if (EpcisConstants.AGGREGATION_EVENT.equals(eventType)) {
168             sqlSelectFrom = new StringBuilder(SQL_SELECT_FROM_AGGREGATIONEVENT);
169         } else if (EpcisConstants.OBJECT_EVENT.equals(eventType)) {
170             sqlSelectFrom = new StringBuilder(SQL_SELECT_FROM_OBJECTEVENT);
171         } else if (EpcisConstants.QUANTITY_EVENT.equals(eventType)) {
172             sqlSelectFrom = new StringBuilder(SQL_SELECT_FROM_QUANTITYEVENT);
173         } else if (EpcisConstants.TRANSACTION_EVENT.equals(eventType)) {
174             sqlSelectFrom = new StringBuilder(SQL_SELECT_FROM_TRANSACTIONEVENT);
175         } else {
176             String msg = "Unknown event type: " + eventType;
177             LOG.error(msg);
178             ImplementationException ie = new ImplementationException();
179             ie.setReason(msg);
180             throw new ImplementationExceptionResponse(msg, ie);
181         }
182 
183         boolean joinedEpcs = false;
184         boolean joinedBizTransacitions = false;
185 
186         // construct the SQL query dynamically
187         List<EventQueryParam> eventQueryParams = seQuery.getEventQueryParams();
188         int nofEventFieldExtensions = 0;
189         for (EventQueryParam queryParam : eventQueryParams) {
190             String eventField = queryParam.getEventField();
191             Operation op = queryParam.getOp();
192             Object value = queryParam.getValue();
193 
194             // check if we need to do any JOINs
195             if ("epcList".equals(eventField) || "childEPCs".equals(eventField) || "anyEPC".equals(eventField)) {
196                 // we have a query on EPCs, so we need to join the appropriate
197                 // "_EPCs" table
198                 if (!joinedEpcs) {
199                     sqlSelectFrom.append(" JOIN event_").append(eventType).append("_EPCs AS epc");
200                     sqlSelectFrom.append(" ON event_").append(eventType).append(".id=epc.event_id");
201                     joinedEpcs = true;
202                 }
203                 // update the event field to search in
204                 eventField = "epc.epc";
205             } else if (eventField.startsWith("extension")) {
206                 // we have a query on an extension field, so we need to join the
207                 // appropriate "_extensions" table
208 
209                 /*
210                  * For every extension condition there are two EventQueryParams,
211                  * one for the name of the parameter and another one for the
212                  * value. Example: extension.intValue extension.fieldname
213                  * Therefore, the JOINs will be created once from every two
214                  * extension conditions (the odd ones)
215                  */
216                 nofEventFieldExtensions++;
217                 if (nofEventFieldExtensions % 2 == 1) {
218                     sqlSelectFrom.append(" JOIN event_").append(eventType).append("_extensions AS extension").append(
219                             (nofEventFieldExtensions / 2)+1);
220                     sqlSelectFrom.append(" ON event_").append(eventType).append(".id=extension").append(
221                             (nofEventFieldExtensions / 2)+1).append(".event_id");
222                 }
223             } else if (eventField.startsWith("bizTrans")) {
224                 // we have a query on business transactions, so we need to join
225                 // the appropriate "_bizTrans" and "bizTransList" tables
226                 if (!joinedBizTransacitions) {
227                     sqlSelectFrom.append(" JOIN event_").append(eventType).append("_bizTrans AS bizTransList");
228                     sqlSelectFrom.append(" ON event_").append(eventType).append(".id=bizTransList.event_id");
229                     sqlSelectFrom.append(" JOIN BizTransaction ON bizTransList.bizTrans_id=BizTransaction.id");
230                     sqlSelectFrom.append(" JOIN voc_BizTrans AS bizTrans ON BizTransaction.bizTrans=bizTrans.id");
231                     sqlSelectFrom.append(" JOIN voc_BizTransType AS bizTransType ON BizTransaction.type=bizTransType.id");
232                     joinedBizTransacitions = true;
233                 }
234                 if ("bizTransList.bizTrans".equals(eventField)) {
235                     eventField = "bizTrans";
236                 } else if ("bizTransList.type".equals(eventField)) {
237                     eventField = "bizTransType";
238                 }
239             } else if (eventField.endsWith(".attribute")) {
240                 String attrTable = attributeTablenameMap.get(eventField);
241                 if (attrTable != null) {
242                     String vocAlias = eventField.substring(0, eventField.indexOf("."));
243                     sqlSelectFrom.append(" JOIN ").append(attrTable);
244                     sqlSelectFrom.append(" ON ").append(attrTable).append(".id=").append(vocAlias).append(".id");
245                     eventField = attrTable + ".attribute";
246                 }
247             } else if (eventField.endsWith(".attribute.value")) {
248                 String attrTable = attributeTablenameMap.get(eventField.substring(0, eventField.length() - 6));
249                 eventField = attrTable + ".value";
250             }
251             String vocField = vocabularyTypeMap.get(eventField);
252             if (vocField != null) {
253                 eventField = vocField;
254             }
255 
256             // now check the provided event field, operation, and value and
257             // update the SQL strings accordingly
258             if (value == null && op == Operation.EXISTS) {
259                 if (eventField.startsWith("epc") || eventField.startsWith("bizTransList")) {
260                     // EXISTS-query already coped with by JOIN - nothing to do
261                 } else {
262                     // check if the given event field exists
263                     sqlWhereClause.append(" AND ?");
264                     sqlParams.add(eventField);
265                 }
266             } else if (value != null) {
267                 if (value instanceof List<?>) {
268                     // we have a multi-value query parameter, e.g. action, EPCs,
269                     // vocabulary types
270                     List<?> paramValues = (List<?>) value;
271                     if (!paramValues.isEmpty()) {
272                         if (op == Operation.MATCH || op == Operation.WD) {
273                             // this results in a SQL "LIKE" query
274                             sqlWhereClause.append(" AND (0");
275                             for (Object paramValue : paramValues) {
276                                 String strValue = (String) paramValue;
277 
278                                 // MATCH-params might be 'pure identity' EPC
279                                 // patterns
280                                 if (op == Operation.MATCH && !eventField.startsWith("epcClass")) {
281                                     if (strValue.startsWith("urn:epc:idpat:")) {
282                                         strValue = strValue.replace("urn:epc:idpat:", "urn:epc:id:");
283                                     }
284                                 }
285                                 strValue = strValue.replaceAll("\\*", "%");
286 
287                                 sqlWhereClause.append(" OR ").append(eventField).append(" LIKE ?");
288                                 sqlParams.add(strValue);
289                                 if (seQuery.isAnyEpc() && "epc.epc".equals(eventField)) {
290                                     sqlWhereClause.append(" OR parentID LIKE ?");
291                                     sqlParams.add(strValue);
292                                 }
293                             }
294                             sqlWhereClause.append(")");
295                         } else {
296                             // this results in a SQL "IN" query
297                             sqlWhereClause.append(" AND ").append(eventField).append(" IN (?");
298                             sqlParams.add(paramValues.get(0));
299                             for (int i = 1; i < paramValues.size(); i++) {
300                                 sqlWhereClause.append(",?");
301                                 sqlParams.add(paramValues.get(i));
302                             }
303                             sqlWhereClause.append(")");
304                         }
305                     }
306                 } else {
307                     // we have a single-value parameter, e.g. eventTime,
308                     // recordTime, parentID
309                     String sqlOp = operationMap.get(op);
310                     sqlWhereClause.append(" AND ").append(eventField).append(" ").append(sqlOp).append(" ?");
311                     sqlParams.add(value);
312                 }
313             }
314         }
315 
316         // construct the final SQL query string
317         StringBuilder sql = sqlSelectFrom.append(sqlWhereClause);
318         if (seQuery.getOrderBy() != null) {
319             sql.append(" ORDER BY ").append(seQuery.getOrderBy());
320             if (seQuery.getOrderDirection() != null) {
321                 sql.append(" ").append(seQuery.getOrderDirection().name());
322             }
323         }
324         if (seQuery.getLimit() != -1) {
325             sql.append(" LIMIT ").append(seQuery.getLimit());
326         } else if (seQuery.getMaxEventCount() != -1) {
327             sql.append(" LIMIT ").append(seQuery.getMaxEventCount() + 1);
328         }
329         String sqlSelect = sql.toString();
330 
331         PreparedStatement selectEventsStmt = session.getConnection().prepareStatement(sqlSelect);
332         LOG.debug("SQL: " + sqlSelect);
333         for (int i = 0; i < sqlParams.size(); i++) {
334             selectEventsStmt.setObject(i + 1, sqlParams.get(i));
335             if (LOG.isDebugEnabled()) {
336                 LOG.debug("     param" + i + " = " + sqlParams.get(i));
337             }
338         }
339         return selectEventsStmt;
340     }
341 
342     /**
343      * {@inheritDoc}
344      */
345     public void runSimpleEventQuery(final QueryOperationsSession session, final SimpleEventQueryDTO seQuery,
346             final List<Object> eventList) throws SQLException, ImplementationExceptionResponse,
347             QueryTooLargeExceptionResponse {
348         PreparedStatement selectEventsStmt = prepareSimpleEventQuery(session, seQuery);
349         ResultSet rs = selectEventsStmt.executeQuery();
350 
351         String eventType = seQuery.getEventType();
352 
353         // prepare the required remaining SQL queries
354         String selectExtensions = null;
355         String selectEpcs = null;
356         String selectBizTrans = null;
357         if (EpcisConstants.AGGREGATION_EVENT.equals(eventType)) {
358             selectExtensions = SQL_SELECT_AGGREGATIONEVENT_EXTENSIONS;
359             selectEpcs = SQL_SELECT_AGGREGATIONEVENT_EPCS;
360             selectBizTrans = SQL_SELECT_AGGREGATIONEVENT_BIZTRANS;
361         } else if (EpcisConstants.OBJECT_EVENT.equals(eventType)) {
362             selectExtensions = SQL_SELECT_OBJECTEVENT_EXTENSIONS;
363             selectEpcs = SQL_SELECT_OBJECTEVENT_EPCS;
364             selectBizTrans = SQL_SELECT_OBJECTEVENT_BIZTRANS;
365         } else if (EpcisConstants.QUANTITY_EVENT.equals(eventType)) {
366             selectExtensions = SQL_SELECT_QUANTITYEVENT_EXTENSIONS;
367             selectEpcs = SQL_SELECT_QUANTITYEVENT_EPCS;
368             selectBizTrans = SQL_SELECT_QUANTITYEVENT_BIZTRANS;
369         } else if (EpcisConstants.TRANSACTION_EVENT.equals(eventType)) {
370             selectExtensions = SQL_SELECT_TRANSACTIONEVENT_EXTENSIONS;
371             selectEpcs = SQL_SELECT_TRANSACTIONEVENT_EPCS;
372             selectBizTrans = SQL_SELECT_TRANSACTIONEVENT_BIZTRANS;
373         }
374         PreparedStatement selectExtensionsStmt = session.getPreparedStatement(selectExtensions);
375         PreparedStatement selectEpcsStmt = session.getPreparedStatement(selectEpcs);
376         PreparedStatement selectBizTransStmt = session.getPreparedStatement(selectBizTrans);
377 
378         // cycle through result set and fill an event list
379         int actEventCount = 0;
380         while (rs.next()) {
381             actEventCount++;
382             int eventId = rs.getInt(1);
383             // Timestamp eventTime = rs.getTimestamp(2);
384             long eventTimeMs = rs.getLong(3);
385             // Timestamp recordTime = rs.getTimestamp(4);
386             long recordTimeMs = rs.getLong(5);
387             String eventTimeZoneOffset = rs.getString(6);
388             String readPointId = rs.getString(7);
389             ReadPointType readPoint = null;
390             if (readPointId != null) {
391                 readPoint = new ReadPointType();
392                 readPoint.setId(readPointId);
393             }
394             String bizLocationId = rs.getString(8);
395             BusinessLocationType bizLocation = null;
396             if (bizLocationId != null) {
397                 bizLocation = new BusinessLocationType();
398                 bizLocation.setId(bizLocationId);
399             }
400             String bizStep = rs.getString(9);
401             String disposition = rs.getString(10);
402             // fetch biz transactions
403             if (LOG.isDebugEnabled()) {
404                 LOG.debug("SQL: " + selectBizTrans);
405                 LOG.debug("     param1 = " + eventId);
406             }
407             selectBizTransStmt.setInt(1, eventId);
408             BusinessTransactionListType bizTransList = readBizTransactionsFromResult(selectBizTransStmt.executeQuery());
409 
410             EPCISEventType event = null;
411             if (EpcisConstants.AGGREGATION_EVENT.equals(eventType)) {
412                 AggregationEventType aggrEvent = new AggregationEventType();
413                 aggrEvent.setReadPoint(readPoint);
414                 aggrEvent.setBizLocation(bizLocation);
415                 aggrEvent.setBizStep(bizStep);
416                 aggrEvent.setDisposition(disposition);
417                 aggrEvent.setAction(ActionType.valueOf(rs.getString(11)));
418                 aggrEvent.setParentID(rs.getString(12));
419                 aggrEvent.setBizTransactionList(bizTransList);
420                 // fetch EPCs
421                 if (LOG.isDebugEnabled()) {
422                     LOG.debug("SQL: " + selectEpcs);
423                     LOG.debug("     param1 = " + eventId);
424                 }
425                 selectEpcsStmt.setInt(1, eventId);
426                 aggrEvent.setChildEPCs(readEpcsFromResult(selectEpcsStmt.executeQuery()));
427                 // fetch and fill extensions
428                 if (LOG.isDebugEnabled()) {
429                     LOG.debug("SQL: " + selectExtensions);
430                     LOG.debug("     param1 = " + eventId);
431                 }
432                 selectExtensionsStmt.setInt(1, eventId);
433                 readExtensionsFromResult(selectExtensionsStmt.executeQuery(), aggrEvent.getAny());
434                 event = aggrEvent;
435             } else if (EpcisConstants.OBJECT_EVENT.equals(eventType)) {
436                 ObjectEventType objEvent = new ObjectEventType();
437                 objEvent.setReadPoint(readPoint);
438                 objEvent.setBizLocation(bizLocation);
439                 objEvent.setBizStep(bizStep);
440                 objEvent.setDisposition(disposition);
441                 objEvent.setAction(ActionType.valueOf(rs.getString(11)));
442                 objEvent.setBizTransactionList(bizTransList);
443                 // fetch EPCs
444                 if (LOG.isDebugEnabled()) {
445                     LOG.debug("SQL: " + selectEpcs);
446                     LOG.debug("     param1 = " + eventId);
447                 }
448                 selectEpcsStmt.setInt(1, eventId);
449                 objEvent.setEpcList(readEpcsFromResult(selectEpcsStmt.executeQuery()));
450                 // fetch and fill extensions
451                 if (LOG.isDebugEnabled()) {
452                     LOG.debug("SQL: " + selectExtensions);
453                     LOG.debug("     param1 = " + eventId);
454                 }
455                 selectExtensionsStmt.setInt(1, eventId);
456                 readExtensionsFromResult(selectExtensionsStmt.executeQuery(), objEvent.getAny());
457                 event = objEvent;
458             } else if (EpcisConstants.QUANTITY_EVENT.equals(eventType)) {
459                 QuantityEventType quantEvent = new QuantityEventType();
460                 quantEvent.setReadPoint(readPoint);
461                 quantEvent.setBizLocation(bizLocation);
462                 quantEvent.setBizStep(bizStep);
463                 quantEvent.setDisposition(disposition);
464                 quantEvent.setEpcClass(rs.getString(11));
465                 quantEvent.setQuantity(rs.getInt(12));
466                 quantEvent.setBizTransactionList(bizTransList);
467                 // fetch and fill extensions
468                 if (LOG.isDebugEnabled()) {
469                     LOG.debug("SQL: " + selectExtensions);
470                     LOG.debug("     param1 = " + eventId);
471                 }
472                 selectExtensionsStmt.setInt(1, eventId);
473                 readExtensionsFromResult(selectExtensionsStmt.executeQuery(), quantEvent.getAny());
474                 event = quantEvent;
475             } else if (EpcisConstants.TRANSACTION_EVENT.equals(eventType)) {
476                 TransactionEventType transEvent = new TransactionEventType();
477                 transEvent.setReadPoint(readPoint);
478                 transEvent.setBizLocation(bizLocation);
479                 transEvent.setBizStep(bizStep);
480                 transEvent.setDisposition(disposition);
481                 transEvent.setAction(ActionType.valueOf(rs.getString(11)));
482                 transEvent.setParentID(rs.getString(12));
483                 transEvent.setBizTransactionList(bizTransList);
484                 // fetch EPCs
485                 if (LOG.isDebugEnabled()) {
486                     LOG.debug("SQL: " + selectEpcs);
487                     LOG.debug("     param1 = " + eventId);
488                 }
489                 selectEpcsStmt.setInt(1, eventId);
490                 transEvent.setEpcList(readEpcsFromResult(selectEpcsStmt.executeQuery()));
491                 // fetch and fill extensions
492                 if (LOG.isDebugEnabled()) {
493                     LOG.debug("SQL: " + selectExtensions);
494                     LOG.debug("     param1 = " + eventId);
495                 }
496                 selectExtensionsStmt.setInt(1, eventId);
497                 readExtensionsFromResult(selectExtensionsStmt.executeQuery(), transEvent.getAny());
498                 event = transEvent;
499             } else {
500                 String msg = "Unknown event type: " + eventType;
501                 LOG.error(msg);
502                 ImplementationException ie = new ImplementationException();
503                 ie.setReason(msg);
504                 throw new ImplementationExceptionResponse(msg, ie);
505             }
506             event.setEventTime(timeToXmlCalendar(eventTimeMs));
507             event.setRecordTime(timeToXmlCalendar(recordTimeMs));
508             event.setEventTimeZoneOffset(eventTimeZoneOffset);
509             eventList.add(event);
510         }
511         int maxEventCount = seQuery.getMaxEventCount();
512         if (LOG.isDebugEnabled()) {
513             LOG.debug("Event query returned " + actEventCount + " events (maxEventCount is " + maxEventCount + ")");
514         }
515         if (maxEventCount > -1 && actEventCount > maxEventCount) {
516             // according to spec, this must result in a QueryTooLargeException
517             String msg = "The query returned more results than specified by 'maxEventCount'";
518             LOG.info("USER ERROR: " + msg);
519             QueryTooLargeException e = new QueryTooLargeException();
520             e.setReason(msg);
521             throw new QueryTooLargeExceptionResponse(msg, e);
522         }
523     }
524 
525     private PreparedStatement prepareMasterDataQuery(final QueryOperationsSession session, String vocType,
526             MasterDataQueryDTO mdQuery) throws SQLException {
527 
528         StringBuilder sqlSelectFrom = new StringBuilder("SELECT uri FROM");
529         StringBuilder sqlWhereClause = new StringBuilder(" WHERE 1");
530         List<Object> sqlParams = new ArrayList<Object>();
531 
532         // get the values from the query DTO
533         List<String> attributeNames = mdQuery.getAttributeNames();
534         Map<String, List<String>> attributeNameAndValues = mdQuery.getAttributeNameAndValues();
535         List<String> vocabularyEqNames = mdQuery.getVocabularyEqNames();
536         List<String> vocabularyWdNames = mdQuery.getVocabularyWdNames();
537 
538         boolean joinedAttribute = false;
539         String vocTablename = getVocabularyTablename(vocType);
540         sqlSelectFrom.append(" ").append(vocTablename).append(",");
541         if ("voc_Any".equals(vocTablename)) {
542             // this is not a standard vocabulary, we need to restrict by vtype
543             // in the voc_Any table
544             sqlWhereClause.append(" AND voc_Any.vtype=?");
545             sqlParams.add(vocType);
546         }
547 
548         // filter by attribute names
549         if (attributeNames != null && !attributeNames.isEmpty()) {
550             if (!joinedAttribute) {
551                 sqlSelectFrom.append(" ").append(vocTablename).append("_attr,");
552                 sqlWhereClause.append(" AND ").append(vocTablename).append(".id=");
553                 sqlWhereClause.append(vocTablename).append("_attr.id");
554             }
555 
556             sqlWhereClause.append(" AND ").append(vocTablename).append("_attr.attribute IN (?");
557             sqlParams.add(attributeNames.get(0));
558             for (int i = 1; i < attributeNames.size(); i++) {
559                 sqlWhereClause.append(",?");
560                 sqlParams.add(attributeNames.get(i));
561             }
562             sqlWhereClause.append(")");
563         }
564 
565         // filter by attribute names and values
566         if (attributeNameAndValues != null && !attributeNameAndValues.isEmpty()) {
567             if (!joinedAttribute) {
568                 sqlSelectFrom.append(" ").append(vocTablename).append("_attr,");
569                 sqlWhereClause.append(" AND ").append(vocTablename).append(".id=");
570                 sqlWhereClause.append(vocTablename).append("_attr.id");
571             }
572             for (String attrName : attributeNameAndValues.keySet()) {
573                 sqlWhereClause.append(" AND ").append(vocTablename).append("_attr.attribute=?");
574                 sqlParams.add(attrName);
575                 sqlWhereClause.append(" AND ").append(vocTablename).append("_attr.value IN (?");
576                 List<String> attrValues = attributeNameAndValues.get(attrName);
577                 sqlParams.add(attrValues.get(0));
578                 for (int i = 1; i < attrValues.size(); i++) {
579                     sqlWhereClause.append(",?");
580                     sqlParams.add(attrValues.get(i));
581                 }
582                 sqlWhereClause.append(")");
583             }
584         }
585 
586         // filter by vocabulary names
587         if (vocabularyEqNames != null && !vocabularyEqNames.isEmpty()) {
588             sqlWhereClause.append(" AND ").append(vocTablename).append(".uri IN (?");
589             sqlParams.add(vocabularyEqNames.get(0));
590             for (int i = 1; i < vocabularyEqNames.size(); i++) {
591                 sqlWhereClause.append(",?");
592                 sqlParams.add(vocabularyEqNames.get(i));
593             }
594             sqlWhereClause.append(")");
595         }
596         if (vocabularyWdNames != null && !vocabularyWdNames.isEmpty()) {
597             sqlWhereClause.append(" AND (0");
598             for (String vocWdName : vocabularyWdNames) {
599                 sqlWhereClause.append(" OR ").append(vocTablename).append(".uri LIKE ?");
600                 sqlParams.add(vocWdName + "%");
601             }
602             sqlWhereClause.append(")");
603         }
604 
605         // remove last comma
606         sqlSelectFrom.delete(sqlSelectFrom.length() - 1, sqlSelectFrom.length());
607 
608         // set the complete query and pass it back to the caller
609         String sqlSelect = sqlSelectFrom.append(sqlWhereClause).toString();
610 
611         PreparedStatement ps = session.getConnection().prepareStatement(sqlSelect);
612         LOG.debug("SQL: " + sqlSelect);
613         for (int i = 0; i < sqlParams.size(); i++) {
614             ps.setObject(i + 1, sqlParams.get(i));
615             if (LOG.isDebugEnabled()) {
616                 LOG.debug("     param" + i + " = " + sqlParams.get(i));
617             }
618         }
619         return ps;
620     }
621 
622     /**
623      * {@inheritDoc}
624      */
625     public void runMasterDataQuery(final QueryOperationsSession session, final MasterDataQueryDTO mdQuery,
626             final List<VocabularyType> vocList) throws SQLException, ImplementationExceptionResponse,
627             QueryTooLargeExceptionResponse {
628         // create and run a separate query for each vocabulary
629         List<String> vocabularyTypes = mdQuery.getVocabularyTypes();
630         for (String vocType : vocabularyTypes) {
631             PreparedStatement ps = prepareMasterDataQuery(session, vocType, mdQuery);
632             ResultSet rs = ps.executeQuery();
633 
634             int maxElementCount = mdQuery.getMaxElementCount();
635             boolean includeAttributes = mdQuery.getIncludeAttributes();
636             boolean includeChildren = mdQuery.getIncludeChildren();
637 
638             // fetch matching vocabulary element uris
639             List<String> vocElemUris = new ArrayList<String>();
640             int actVocElemCount = 0;
641             while (rs.next()) {
642                 actVocElemCount++;
643                 if (maxElementCount > -1 && actVocElemCount > maxElementCount) {
644                     // according to spec, this must result in a
645                     // QueryTooLargeException
646                     String msg = "The query returned more results than specified by 'maxElementCount'";
647                     LOG.info("USER ERROR: " + msg);
648                     QueryTooLargeException e = new QueryTooLargeException();
649                     e.setReason(msg);
650                     throw new QueryTooLargeExceptionResponse(msg, e);
651                 }
652                 vocElemUris.add(rs.getString(1));
653             }
654             rs.close();
655             if (LOG.isDebugEnabled()) {
656                 LOG.debug("Masterdata query returned " + actVocElemCount + " vocabularies (maxElementCount is "
657                         + maxElementCount + ")");
658             }
659 
660             // populate the VocabularyElementList
661             VocabularyElementListType vocElems = new VocabularyElementListType();
662             for (String vocElemUri : vocElemUris) {
663                 VocabularyElementType vocElem = new VocabularyElementType();
664                 vocElem.setId(vocElemUri);
665                 if (includeAttributes) {
666                     fetchAttributes(session, vocType, vocElemUri, mdQuery.getIncludedAttributeNames(),
667                             vocElem.getAttribute());
668                 }
669                 if (includeChildren) {
670                     IDListType children = fetchChildren(session, vocType, vocElemUri);
671                     vocElem.setChildren(children);
672                 }
673                 vocElems.getVocabularyElement().add(vocElem);
674             }
675 
676             // add the vocabulary element to the vocabulary list
677             if (!vocElems.getVocabularyElement().isEmpty()) {
678                 VocabularyType voc = new VocabularyType();
679                 voc.setType(vocType);
680                 voc.setVocabularyElementList(vocElems);
681                 vocList.add(voc);
682             }
683         }
684     }
685 
686     /**
687      * {@inheritDoc}
688      */
689     public boolean fetchExistsSubscriptionId(final QueryOperationsSession session, final String subscriptionID)
690             throws SQLException {
691         PreparedStatement stmt = session.getPreparedStatement(SQL_EXISTS_SUBSCRIPTION);
692         stmt.setString(1, subscriptionID);
693         if (LOG.isDebugEnabled()) {
694             LOG.debug("SQL: " + SQL_EXISTS_SUBSCRIPTION);
695             LOG.debug("     param1 = " + subscriptionID);
696         }
697         ResultSet rs = stmt.executeQuery();
698         rs.first();
699         return rs.getBoolean(1);
700     }
701 
702     /**
703      * @param session
704      * @param vocElemUri
705      * @param attribute
706      * @throws SQLException
707      */
708     private void fetchAttributes(final QueryOperationsSession session, final String vocType, final String vocUri,
709             final List<String> filterAttrNames, final List<AttributeType> attributes) throws SQLException {
710         String vocTablename = getVocabularyTablename(vocType);
711         StringBuilder sql = new StringBuilder();
712         List<Object> sqlParams = new ArrayList<Object>();
713         sql.append("SELECT attribute, value FROM ").append(vocTablename).append(" AS voc, ");
714         sql.append(vocTablename).append("_attr AS attr WHERE voc.id=attr.id AND voc.uri=?");
715         sqlParams.add(vocUri);
716         if ("voc_Any".equals(vocTablename)) {
717             sql.append(" AND voc.vtype=?");
718             sqlParams.add(vocType);
719         }
720         if (filterAttrNames != null && !filterAttrNames.isEmpty()) {
721             // filter by attribute names
722             sql.append(" AND attribute IN (?");
723             sqlParams.add(filterAttrNames.get(0));
724             for (int i = 1; i < filterAttrNames.size(); i++) {
725                 sql.append(",?");
726                 sqlParams.add(filterAttrNames.get(i));
727             }
728             sql.append(")");
729         }
730         PreparedStatement ps = session.getPreparedStatement(sql.toString());
731         LOG.debug("SQL: " + sql.toString());
732         for (int i = 0; i < sqlParams.size(); i++) {
733             ps.setObject(i + 1, sqlParams.get(i));
734             if (LOG.isDebugEnabled()) {
735                 LOG.debug("     param" + i + " = " + sqlParams.get(i));
736             }
737         }
738 
739         ResultSet rs = ps.executeQuery();
740         while (rs.next()) {
741             AttributeType attr = new AttributeType();
742             attr.setId(rs.getString(1));
743             
744 			//replaced by nkef of "attr.getContent().add(rs.getString(2));" with
745 			attr.getOtherAttributes().put(new QName("value"), rs.getString(2));
746 
747             attributes.add(attr);
748         }
749         rs.close();
750     }
751 
752     /**
753      * Retrieves all children URI for the given vocabulary uri in the given
754      * vocabulary table.
755      * 
756 	 *(nkef) The paragraphs below are taken from from the EPCIS Specs
757 	 * 
758 	 * "A parent identifier carries, in addition to its master data attributes, a
759 	 * list of its children identifiers."
760 	 * 
761 	 * "The term "direct or indirect descendant" is used to refer to the set of
762 	 * vocabulary elements including the children of a given vocabulary element,
763 	 * the children of those children, etc. That is, the "direct or indirect
764 	 * descendants" of a vocabulary element are the set of vocabulary elements
765 	 * obtained by taking the transitive closure of the "children" relation
766 	 * starting with the given vocabulary element."
767      * 
768 	 * "A given element MAY be the child of more than one parent. This allows for
769 	 * more than one way of grouping vocabulary elements;"
770 	 * 
771      * @param vocTableName
772      *            The name of the vocabulary table in which to look for the
773      *            children uris.
774      * @param vocUri
775      *            The vocabulary uri string for which the children should be
776      *            retrieved.
777      * @throws SQLException
778      *             If a DB access error occurred.
779      * @throws ImplementationException
780      *             If a String could not be converted into an URI.
781      */
782     private IDListType fetchChildren(final QueryOperationsSession session, final String vocType, final String vocUri)
783             throws SQLException, ImplementationExceptionResponse {
784         IDListType children = new IDListType();
785         String vocTablename = getVocabularyTablename(vocType);
786         StringBuilder sql = new StringBuilder();
787         sql.append("SELECT uri FROM ").append(vocTablename).append(" AS voc WHERE voc.uri LIKE ?");
788         PreparedStatement ps = session.getPreparedStatement(sql.toString());
789 		// (nkef) changed "_%" to ",%"
790 		String uri = vocUri + ",%";
791         if (LOG.isDebugEnabled()) {
792             LOG.debug("SQL: " + sql.toString());
793             LOG.debug("     param1 = " + uri);
794         }
795         ps.setString(1, uri);
796         ResultSet rs = ps.executeQuery();
797         while (rs.next()) {
798             children.getId().add(rs.getString("uri"));
799         }
800         return (children.getId().isEmpty()) ? null : children;
801     }
802 
803     /**
804      * Retrieves a list of business transactions (an instance of
805      * BusinessTransactionListType) from the given result set.
806      * 
807      * @param rs
808      *            The result of the SQL query.
809      * @return A List of qualified XML elements
810      * @throws SQLException
811      *             If a database access error occurred.
812      */
813     private BusinessTransactionListType readBizTransactionsFromResult(final ResultSet rs) throws SQLException,
814             ImplementationExceptionResponse {
815         BusinessTransactionListType list = new BusinessTransactionListType();
816         while (rs.next()) {
817             BusinessTransactionType btrans = new BusinessTransactionType();
818             btrans.setValue(rs.getString(1));
819             btrans.setType(rs.getString(2));
820             list.getBizTransaction().add(btrans);
821         }
822         return list.getBizTransaction().isEmpty() ? null : list;
823     }
824 
825     /**
826      * Retrieves a list of EPCs (an instance of EPCListType) from the given
827      * result set.
828      * 
829      * @param rs
830      *            The result of the SQL query.
831      * @return A List of qualified XML elements
832      * @throws SQLException
833      *             If a database access error occurred.
834      */
835     private EPCListType readEpcsFromResult(final ResultSet rs) throws SQLException {
836         EPCListType epcs = new EPCListType();
837         while (rs.next()) {
838             EPC epc = new EPC();
839             epc.setValue(rs.getString(1));
840             epcs.getEpc().add(epc);
841         }
842         return epcs.getEpc().isEmpty() ? null : epcs;
843     }
844 
845     /**
846      * Fetches the qualified XML elements representing extensions for event
847      * fields from the given result set and populates the given List.
848      * 
849      * @param rs
850      *            The result of the SQL query.
851      * @throws SQLException
852      *             If a database access error occurred.
853      */
854     private void readExtensionsFromResult(final ResultSet rs, final List<Object> extensions) throws SQLException {
855         while (rs.next()) {
856             String fieldname = rs.getString(1);
857             String[] parts = fieldname.split("#");
858             if (parts.length != 2) {
859                 throw new SQLException(
860                         "Fieldname extension has invalid format: required 'namespace#localname' but was " + fieldname);
861             }
862             String namespace = parts[0];
863             String localPart = parts[1];
864             String prefix = rs.getString(2);
865             String value = rs.getString(3);
866             if (value == null) {
867                 value = rs.getString(4);
868                 if (value == null) {
869                     value = rs.getString(5);
870                     if (value == null) {
871                         value = rs.getString(6);
872                         if (value == null) {
873                             throw new SQLException("No valid extension value found");
874                         }
875                     }
876                 }
877             }
878             JAXBElement<String> elem = new JAXBElement<String>(new QName(namespace, localPart, prefix), String.class,
879                     value);
880             extensions.add(elem);
881         }
882     }
883 
884     /**
885      * {@inheritDoc}
886      */
887     public Map<String, QuerySubscriptionScheduled> fetchSubscriptions(final QueryOperationsSession session)
888             throws SQLException, ImplementationExceptionResponse {
889         String query = "SELECT * FROM subscription";
890         LOG.debug("SQL: " + query);
891         Statement stmt = session.getConnection().createStatement();
892         QuerySubscriptionScheduled storedSubscription;
893         GregorianCalendar initrectime = new GregorianCalendar();
894 
895         ResultSet rs = stmt.executeQuery(query);
896         Map<String, QuerySubscriptionScheduled> subscribedMap = new HashMap<String, QuerySubscriptionScheduled>();
897         while (rs.next()) {
898             try {
899                 String subscrId = rs.getString("subscriptionid");
900 
901                 ObjectInput in = new ObjectInputStream(rs.getBinaryStream("params"));
902                 QueryParams params = (QueryParams) in.readObject();
903 
904                 String dest = rs.getString("dest");
905 
906                 in = new ObjectInputStream(rs.getBinaryStream("sched"));
907                 Schedule sched = (Schedule) in.readObject();
908 
909                 initrectime.setTime(rs.getTimestamp("initialrecordingtime"));
910 
911                 boolean exportifempty = rs.getBoolean("exportifempty");
912 
913                 String queryName = rs.getString("queryname");
914                 String trigger = rs.getString("trigg");
915 
916                 if (trigger == null || trigger.length() == 0) {
917                     storedSubscription = new QuerySubscriptionScheduled(subscrId, params, dest,
918                             Boolean.valueOf(exportifempty), initrectime, new GregorianCalendar(), sched, queryName);
919                 } else {
920                     storedSubscription = new QuerySubscriptionTriggered(subscrId, params, dest,
921                             Boolean.valueOf(exportifempty), initrectime, new GregorianCalendar(), queryName, trigger,
922                             sched);
923                 }
924                 subscribedMap.put(subscrId, storedSubscription);
925             } catch (SQLException e) {
926                 // sql exceptions are passed on
927                 throw e;
928             } catch (Exception e) {
929                 // all other exceptions are caught
930                 String msg = "Unable to restore subscribed queries from the database.";
931                 LOG.error(msg, e);
932                 ImplementationException iex = new ImplementationException();
933                 iex.setReason(msg);
934                 iex.setSeverity(ImplementationExceptionSeverity.ERROR);
935                 throw new ImplementationExceptionResponse(msg, iex, e);
936             }
937         }
938         return subscribedMap;
939     }
940 
941     /**
942      * {@inheritDoc}
943      */
944     public void storeSupscriptions(final QueryOperationsSession session, QueryParams queryParams, String dest,
945             String subscrId, SubscriptionControls controls, String trigger, QuerySubscriptionScheduled newSubscription,
946             String queryName, Schedule schedule) throws SQLException, ImplementationExceptionResponse {
947         String insert = "INSERT INTO subscription (subscriptionid, "
948                 + "params, dest, sched, trigg, initialrecordingtime, "
949                 + "exportifempty, queryname, lastexecuted) VALUES " + "((?), (?), (?), (?), (?), (?), (?), (?), (?))";
950         PreparedStatement stmt = session.getConnection().prepareStatement(insert);
951         LOG.debug("QUERY: " + insert);
952         try {
953             stmt.setString(1, subscrId);
954             LOG.debug("       query param 1: " + subscrId);
955 
956             ByteArrayOutputStream outStream = new ByteArrayOutputStream();
957             ObjectOutput out = new ObjectOutputStream(outStream);
958             out.writeObject(queryParams);
959             ByteArrayInputStream inStream = new ByteArrayInputStream(outStream.toByteArray());
960             stmt.setBinaryStream(2, inStream, inStream.available());
961             LOG.debug("       query param 2: [" + inStream.available() + " bytes]");
962 
963             stmt.setString(3, dest.toString());
964             LOG.debug("       query param 3: " + dest);
965 
966             outStream = new ByteArrayOutputStream();
967             out = new ObjectOutputStream(outStream);
968             out.writeObject(schedule);
969             inStream = new ByteArrayInputStream(outStream.toByteArray());
970             stmt.setBinaryStream(4, inStream, inStream.available());
971             LOG.debug("       query param 4: [" + inStream.available() + " bytes]");
972 
973             stmt.setString(5, trigger);
974             LOG.debug("       query param 5: " + trigger);
975 
976             Calendar cal = newSubscription.getInitialRecordTime();
977             Timestamp ts = new Timestamp(cal.getTimeInMillis());
978             String time = ts.toString();
979             stmt.setString(6, time);
980             LOG.debug("       query param 6: " + time);
981 
982             stmt.setBoolean(7, controls.isReportIfEmpty());
983             LOG.debug("       query param 7: " + controls.isReportIfEmpty());
984 
985             stmt.setString(8, queryName);
986             LOG.debug("       query param 8: " + queryName);
987 
988             stmt.setString(9, time);
989             LOG.debug("       query param 9: " + time);
990 
991             stmt.executeUpdate();
992             session.commit();
993         } catch (IOException e) {
994             String msg = "Unable to store the subscription to the database: " + e.getMessage();
995             LOG.error(msg);
996             ImplementationException iex = new ImplementationException();
997             iex.setReason(msg);
998             iex.setSeverity(ImplementationExceptionSeverity.ERROR);
999             throw new ImplementationExceptionResponse(msg, iex, e);
1000         }
1001     }
1002 
1003     /**
1004      * {@inheritDoc}
1005      */
1006     public void deleteSubscription(final QueryOperationsSession session, String subscrId) throws SQLException {
1007         String delete = "DELETE FROM subscription WHERE subscriptionid=?";
1008         PreparedStatement ps = session.getConnection().prepareStatement(delete);
1009         ps.setString(1, subscrId);
1010         if (LOG.isDebugEnabled()) {
1011             LOG.debug("SQL: " + delete);
1012             LOG.debug("     param1 = " + subscrId);
1013         }
1014         ps.executeUpdate();
1015         session.commit();
1016     }
1017 
1018     /**
1019      * Creates a new XMLGregorianCalendar from the given milliseconds time.
1020      * 
1021      * @param time
1022      *            The time in ms to convert.
1023      * @return The XML calendar object representing the given timestamp.
1024      * @throws ImplementationExceptionResponse
1025      *             If an error occurred when parsing the given timestamp into a
1026      *             calendar instance.
1027      */
1028     private XMLGregorianCalendar timeToXmlCalendar(long time) throws ImplementationExceptionResponse {
1029         try {
1030             DatatypeFactory factory = DatatypeFactory.newInstance();
1031             Calendar cal = GregorianCalendar.getInstance();
1032             cal.setTimeInMillis(time);
1033             return factory.newXMLGregorianCalendar((GregorianCalendar) cal);
1034         } catch (DatatypeConfigurationException e) {
1035             String msg = "Unable to instantiate an XML representation for a date/time datatype.";
1036             ImplementationException iex = new ImplementationException();
1037             iex.setReason(msg);
1038             iex.setSeverity(ImplementationExceptionSeverity.SEVERE);
1039             throw new ImplementationExceptionResponse(msg, iex, e);
1040         }
1041     }
1042 
1043     /**
1044      * {@inheritDoc}
1045      */
1046     public QueryOperationsSession openSession(final DataSource dataSource) throws SQLException {
1047         Connection connection = dataSource.getConnection();
1048         LOG.debug("Database connection for session established");
1049         return new QueryOperationsSession(connection);
1050     }
1051 
1052     protected String getVocabularyTablename(String vocTypeId) {
1053         if (vocTypeId == null || "".equals(vocTypeId)) {
1054             return null;
1055         }
1056         String tablename = vocabularyTablenameMap.get(vocTypeId);
1057         if (tablename == null) {
1058             return "voc_Any";
1059         }
1060         return tablename;
1061     }
1062 }