View Javadoc

1   /*
2    * Copyright (C) 2007 ETH Zurich
3    *
4    * This file is part of Fosstrak (www.fosstrak.org).
5    *
6    * Fosstrak is free software; you can redistribute it and/or
7    * modify it under the terms of the GNU Lesser General Public
8    * License version 2.1, as published by the Free Software Foundation.
9    *
10   * Fosstrak is distributed in the hope that it will be useful,
11   * but WITHOUT ANY WARRANTY; without even the implied warranty of
12   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13   * Lesser General Public License for more details.
14   *
15   * You should have received a copy of the GNU Lesser General Public
16   * License along with Fosstrak; if not, write to the Free
17   * Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
18   * Boston, MA  02110-1301  USA
19   */
20  
21  package org.fosstrak.epcis.repository.capture;
22  
23  import java.io.BufferedReader;
24  import java.io.FileReader;
25  import java.io.IOException;
26  import java.sql.Connection;
27  import java.sql.PreparedStatement;
28  import java.sql.ResultSet;
29  import java.sql.SQLException;
30  import java.sql.Statement;
31  import java.sql.Timestamp;
32  import java.util.ArrayList;
33  import java.util.HashMap;
34  import java.util.List;
35  import java.util.Map;
36  
37  import javax.sql.DataSource;
38  
39  import org.fosstrak.epcis.model.BusinessTransactionType;
40  import org.fosstrak.epcis.repository.EpcisConstants;
41  import org.fosstrak.epcis.repository.model.EventFieldExtension;
42  import org.apache.commons.logging.Log;
43  import org.apache.commons.logging.LogFactory;
44  
45  /**
46   * The CaptureOperationsBackendSQL uses basic SQL statements (actually
47   * <code>PreparedStatement</code>s) to implement the CaptureOperationsBackend
48   * interface.
49   * 
50   * @author Alain Remund
51   * @author Marco Steybe
52   * @author Sean Wellington
53   */
54  public class CaptureOperationsBackendSQL implements CaptureOperationsBackend {
55  
56      private static final Log LOG = LogFactory.getLog(CaptureOperationsBackendSQL.class);
57  
58      private static final String SQL_INSERT_AGGREGATIONEVENT = "INSERT INTO event_AggregationEvent (eventTime, recordTime, eventTimeZoneOffset, bizStep, disposition, readPoint, bizLocation, action, parentID) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
59      private static final String SQL_INSERT_OBJECTEVENT = "INSERT INTO event_ObjectEvent (eventTime, recordTime, eventTimeZoneOffset, bizStep, disposition, readPoint, bizLocation, action VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
60      private static final String SQL_INSERT_QUANTITYEVENT = "INSERT INTO event_QuantityEvent (eventTime, recordTime, eventTimeZoneOffset, bizStep, disposition, readPoint, bizLocation, epcClass, quantity) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
61      private static final String SQL_INSERT_TRANSACTIONEVENT = "INSERT INTO event_TransactionEvent (eventTime, recordTime, eventTimeZoneOffset, bizStep, disposition, readPoint, bizLocation, action, parentID) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
62  
63      private static Map<String, String> VOCABTYPE_TABLENAME_MAP;
64  
65      static {
66          VOCABTYPE_TABLENAME_MAP = new HashMap<String, String>();
67          VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.DISPOSITION_ID, "voc_Disposition");
68          VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.READ_POINT_ID, "voc_ReadPoint");
69          VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.EPC_CLASS_ID, "voc_EPCClass");
70          VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.BUSINESS_LOCATION_ID, "voc_BizLoc");
71          VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.BUSINESS_STEP_ID, "voc_BizStep");
72          VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.BUSINESS_TRANSACTION_TYPE_ID, "voc_BizTransType");
73          VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.BUSINESS_TRANSACTION_ID, "voc_BizTrans");
74      }
75  
76      /**
77       * {@inheritDoc}
78       */
79      public void dbReset(final Connection dbconnection, final String dbResetScript) throws SQLException, IOException {
80          LOG.info("Running db reset script.");
81          Statement stmt = null;
82          try {
83              stmt = dbconnection.createStatement();
84              if (dbResetScript != null) {
85                  BufferedReader reader = new BufferedReader(new FileReader(dbResetScript));
86                  String line;
87                  while ((line = reader.readLine()) != null) {
88                      stmt.addBatch(line);
89                  }
90              }
91              stmt.executeBatch();
92          } finally {
93              if (stmt != null) {
94                  stmt.close();
95              }
96          }
97      }
98  
99      /**
100      * {@inheritDoc}
101      */
102     public CaptureOperationsSession openSession(final DataSource dataSource) throws SQLException {
103         return new CaptureOperationsSession(dataSource.getConnection());
104     }
105 
106     /**
107      * {@inheritDoc}
108      */
109     public Long insertObjectEvent(final CaptureOperationsSession session, final Timestamp eventTime,
110             final Timestamp recordTime, final String eventTimeZoneOffset, final Long bizStepId,
111             final Long dispositionId, final Long readPointId, final Long bizLocationId, final String action)
112             throws SQLException {
113         return insertEvent(session, eventTime, recordTime, eventTimeZoneOffset, bizStepId, dispositionId, readPointId,
114                 bizLocationId, action, null, null, null, EpcisConstants.OBJECT_EVENT);
115     }
116 
117     /**
118      * {@inheritDoc}
119      */
120     public Long insertTransactionEvent(final CaptureOperationsSession session, final Timestamp eventTime,
121             final Timestamp recordTime, final String eventTimeZoneOffset, final Long bizStepId,
122             final Long dispositionId, final Long readPointId, final Long bizLocationId, final String action,
123             final String parentId) throws SQLException {
124         return insertEvent(session, eventTime, recordTime, eventTimeZoneOffset, bizStepId, dispositionId, readPointId,
125                 bizLocationId, action, parentId, null, null, EpcisConstants.TRANSACTION_EVENT);
126     }
127 
128     /**
129      * {@inheritDoc}
130      */
131     public Long insertAggregationEvent(final CaptureOperationsSession session, final Timestamp eventTime,
132             final Timestamp recordTime, final String eventTimeZoneOffset, final Long bizStepId,
133             final Long dispositionId, final Long readPointId, final Long bizLocationId, final String action,
134             final String parentId) throws SQLException {
135         return insertEvent(session, eventTime, recordTime, eventTimeZoneOffset, bizStepId, dispositionId, readPointId,
136                 bizLocationId, action, parentId, null, null, EpcisConstants.AGGREGATION_EVENT);
137     }
138 
139     /**
140      * {@inheritDoc}
141      */
142     public Long insertQuantityEvent(final CaptureOperationsSession session, final Timestamp eventTime,
143             final Timestamp recordTime, final String eventTimeZoneOffset, final Long bizStepId,
144             final Long dispositionId, final Long readPointId, final Long bizLocationId, final Long epcClassId,
145             final Long quantity) throws SQLException {
146         return insertEvent(session, eventTime, recordTime, eventTimeZoneOffset, bizStepId, dispositionId, readPointId,
147                 bizLocationId, null, null, epcClassId, quantity, EpcisConstants.QUANTITY_EVENT);
148     }
149 
150     /**
151      * Inserts a new EPCIS event into the database by supplying a
152      * PreparedStatement with the given parameters.
153      * 
154      * @param session
155      *            The database session.
156      * @param eventTime
157      *            The event's 'eventTime' parameter.
158      * @param recordTime
159      *            The event's 'recordTime' parameter.
160      * @param eventTimeZoneOffset
161      *            The event's 'eventTimeZoneOffset' parameter.
162      * @param bizStepId
163      *            The event's 'BusinessStepID' parameter.
164      * @param dispositionId
165      *            The event's 'DispositionID' parameter.
166      * @param readPointId
167      *            The event's 'ReadPointID' parameter.
168      * @param bizLocationId
169      *            The event's 'BusinessLocationID' parameter.
170      * @param action
171      *            The event's 'action' parameter.
172      * @param parentId
173      *            The event's 'ParentID' parameter.
174      * @param epcClassId
175      *            The event's 'EpcClassID' parameter.
176      * @param quantity
177      *            The event's 'quantity' parameter.
178      * @param eventName
179      *            The name of the event.
180      * @return The database primary key of the inserted EPCIS event.
181      * @throws SQLException
182      *             If an SQL exception occurred.
183      */
184     private Long insertEvent(final CaptureOperationsSession session, final Timestamp eventTime,
185             final Timestamp recordTime, final String eventTimeZoneOffset, final Long bizStepId,
186             final Long dispositionId, final Long readPointId, final Long bizLocationId, final String action,
187             final String parentId, final Long epcClassId, final Long quantity, final String eventName)
188             throws SQLException {
189 
190         PreparedStatement ps;
191         if (eventName.equals(EpcisConstants.AGGREGATION_EVENT)) {
192             ps = session.getInsert(SQL_INSERT_AGGREGATIONEVENT);
193         } else if (eventName.equals(EpcisConstants.OBJECT_EVENT)) {
194             ps = session.getInsert(SQL_INSERT_OBJECTEVENT);
195         } else if (eventName.equals(EpcisConstants.QUANTITY_EVENT)) {
196             ps = session.getInsert(SQL_INSERT_QUANTITYEVENT);
197         } else if (eventName.equals(EpcisConstants.TRANSACTION_EVENT)) {
198             ps = session.getInsert(SQL_INSERT_TRANSACTIONEVENT);
199         } else {
200             throw new SQLException("Encountered unknown event element '" + eventName + "'.");
201         }
202 
203         // parameters 1-7 of the sql query are shared by all events
204 
205         ps.setTimestamp(1, eventTime);
206         // according to the specification: recordTime is the time of capture
207         ps.setTimestamp(2, recordTime != null ? recordTime : new Timestamp(System.currentTimeMillis()));
208         // note: for testing it is handy to set recordTime=eventTime
209         // ps.setTimestamp(2, eventTime);
210         ps.setString(3, eventTimeZoneOffset);
211         if (bizStepId != null) {
212             ps.setLong(4, bizStepId.longValue());
213         } else {
214             ps.setNull(4, java.sql.Types.BIGINT);
215         }
216         if (dispositionId != null) {
217             ps.setLong(5, dispositionId.longValue());
218         } else {
219             ps.setNull(5, java.sql.Types.BIGINT);
220         }
221         if (readPointId != null) {
222             ps.setLong(6, readPointId.longValue());
223         } else {
224             ps.setNull(6, java.sql.Types.BIGINT);
225         }
226         if (bizLocationId != null) {
227             ps.setLong(7, bizLocationId.longValue());
228         } else {
229             ps.setNull(7, java.sql.Types.BIGINT);
230         }
231 
232         // special handling for QuantityEvent
233         if (eventName.equals("QuantityEvent")) {
234             if (epcClassId != null) {
235                 ps.setLong(8, epcClassId.longValue());
236             } else {
237                 ps.setNull(8, java.sql.Types.BIGINT);
238             }
239             if (quantity != null) {
240                 ps.setLong(9, quantity.longValue());
241             } else {
242                 ps.setNull(9, java.sql.Types.BIGINT);
243             }
244         } else {
245             // all other events have action
246             ps.setString(8, action);
247 
248             // AggregationEvent and TransactionEvent have a parentID field
249             if (eventName.equals("AggregationEvent") || eventName.equals("TransactionEvent")) {
250                 ps.setString(9, parentId);
251             }
252         }
253 
254         ps.executeUpdate();
255         session.commit();
256 
257         return getLastAutoIncrementedId(session, "event_" + eventName);
258     }
259 
260     /**
261      * Retrieves the last inserted ID chosen by the autoIncrement functionality
262      * in the table with the given name.
263      * 
264      * @param session
265      *            The database session.
266      * @param tableName
267      *            The name of the table for which the last inserted ID should be
268      *            retrieved.
269      * @return The last auto incremented ID.
270      * @throws SQLException
271      *             If an SQL problem with the database occurred.
272      */
273     private Long getLastAutoIncrementedId(final CaptureOperationsSession session, final String tableName)
274             throws SQLException {
275         String stmt = "SELECT LAST_INSERT_ID() as id FROM " + tableName;
276         PreparedStatement ps = session.getSelect(stmt);
277         ResultSet rs = null;
278         try {
279             rs = ps.executeQuery();
280             rs.next();
281             return Long.valueOf(rs.getLong("id"));
282         } finally {
283             if (rs != null) {
284                 rs.close();
285             }
286         }
287     }
288 
289     /**
290      * {@inheritDoc}
291      */
292     public void insertEpcsForEvent(final CaptureOperationsSession session, final long eventId, final String eventType,
293             final List<String> epcs) throws SQLException {
294         // preparing statement for insertion of associated EPCs
295         String insert = "INSERT INTO event_" + eventType + "_EPCs (event_id, epc) VALUES (?, ?)";
296         PreparedStatement ps = session.getBatchInsert(insert);
297         LOG.debug("INSERT: " + insert);
298 
299         // insert all EPCs in the EPCs array
300         for (String epc : epcs) {
301             if (LOG.isDebugEnabled()) {
302                 LOG.debug("       insert param 1: " + eventId);
303                 LOG.debug("       insert param 2: " + epc.toString());
304             }
305             ps.setLong(1, eventId);
306             ps.setString(2, epc.toString());
307             ps.addBatch();
308         }
309     }
310 
311     /**
312      * {@inheritDoc}
313      */
314     public Long getVocabularyElement(final CaptureOperationsSession session, final String vocabularyType,
315             final String vocabularyElement) throws SQLException {
316         String stmt = "SELECT id FROM " + VOCABTYPE_TABLENAME_MAP.get(vocabularyType) + " WHERE uri=?";
317         PreparedStatement ps = session.getSelect(stmt);
318         ps.setString(1, vocabularyElement.toString());
319         ResultSet rs = null;
320         try {
321             rs = ps.executeQuery();
322             if (rs.next()) {
323                 // the uri already exists
324                 return Long.valueOf(rs.getLong("id"));
325             } else {
326                 return null;
327             }
328         } finally {
329             if (rs != null) {
330                 rs.close();
331             }
332         }
333     }
334 
335     /**
336      * {@inheritDoc}
337      */
338     public Long insertVocabularyElement(final CaptureOperationsSession session, final String vocabularyType,
339             final String vocabularyElement) throws SQLException {
340         String tableName = VOCABTYPE_TABLENAME_MAP.get(vocabularyType);
341         String stmt = "INSERT INTO " + tableName + " (uri) VALUES (?)";
342         if (LOG.isDebugEnabled()) {
343             LOG.debug("INSERT: " + stmt);
344             LOG.debug("       insert param 1: " + vocabularyElement.toString());
345         }
346 
347         PreparedStatement ps = session.getInsert(stmt);
348         ps.setString(1, vocabularyElement.toString());
349         ps.executeUpdate();
350 
351         // get last auto_increment value and return it
352         return getLastAutoIncrementedId(session, tableName);
353     }
354 
355     /**
356      * Retrieves the business transaction with the given type and the given URI
357      * from the database.
358      * 
359      * @param session
360      *            The database session.
361      * @param bizTrans
362      *            The business transaction URI to insert.
363      * @param bizTransType
364      *            The type of the business transaction to insert.
365      * @return The ID (primary key) of the matching business transaction, or
366      *         <code>null</code> if none was found.
367      * @throws SQLException
368      *             If an SQL error occurred.
369      */
370     private Long getBusinessTransaction(final CaptureOperationsSession session, final String bizTrans,
371             final String bizTransType) throws SQLException {
372         String stmt = "select id from BizTransaction where bizTrans = (select id from voc_BizTrans where uri = ?) and type = (select id from voc_BizTransType where uri = ?);";
373         PreparedStatement ps = session.getSelect(stmt);
374         ps.setString(1, bizTrans.toString());
375         ps.setString(2, bizTransType.toString());
376 
377         ResultSet rs = null;
378         try {
379             rs = ps.executeQuery();
380             if (rs.next()) {
381                 // the BusinessTransaction already exists
382                 return Long.valueOf(rs.getLong("id"));
383             } else {
384                 return null;
385             }
386         } finally {
387             if (rs != null) {
388                 rs.close();
389             }
390         }
391     }
392 
393     /**
394      * {@inheritDoc}
395      */
396     public Long insertBusinessTransaction(final CaptureOperationsSession session, final String bizTrans,
397             final String bizTransType) throws SQLException {
398 
399         final Long id = getOrInsertVocabularyElement(session, EpcisConstants.BUSINESS_TRANSACTION_ID, bizTrans);
400         final Long type = getOrInsertVocabularyElement(session, EpcisConstants.BUSINESS_TRANSACTION_TYPE_ID,
401                 bizTransType);
402 
403         String stmt = "INSERT INTO BizTransaction (bizTrans, type) VALUES (?, ?)";
404         if (LOG.isDebugEnabled()) {
405             LOG.debug("INSERT: " + stmt);
406             LOG.debug("       insert param 1: " + bizTrans);
407             LOG.debug("       insert param 2: " + bizTransType);
408         }
409 
410         PreparedStatement ps = session.getInsert(stmt);
411         ps.setLong(1, id.longValue());
412         ps.setLong(2, type.longValue());
413         ps.executeUpdate();
414 
415         return getLastAutoIncrementedId(session, "BizTransaction");
416     }
417 
418     private Long getOrInsertVocabularyElement(final CaptureOperationsSession session, final String vocabularyType,
419             final String vocabularyElement) throws SQLException {
420         Long vocabularyElementId = getVocabularyElement(session, vocabularyType, vocabularyElement);
421         if (vocabularyElementId != null) {
422             return vocabularyElementId;
423         } else {
424             return insertVocabularyElement(session, vocabularyType, vocabularyElement);
425         }
426     }
427 
428     /**
429      * {@inheritDoc}
430      */
431     public void insertBusinessTransactionsForEvent(final CaptureOperationsSession session, final long eventId,
432             final String eventType, final List<BusinessTransactionType> btts) throws SQLException {
433         // preparing statement for insertion of associated EPCs
434 
435         List<Long> btIds = new ArrayList<Long>();
436         for (BusinessTransactionType btt : btts) {
437             btIds.add(getOrInsertBizTransaction(session, btt.getValue(), btt.getType()));
438         }
439 
440         String insert = "INSERT INTO event_" + eventType + "_bizTrans (event_id, bizTrans_id) VALUES (?, ?)";
441         if (LOG.isDebugEnabled()) {
442             LOG.debug("INSERT: " + insert);
443         }
444         PreparedStatement ps = session.getBatchInsert(insert);
445         // insert all BizTransactions into the BusinessTransaction-Table
446         // and connect it with the "event_<event-name>_bizTrans"-Table
447         for (long btId : btIds) {
448             if (LOG.isDebugEnabled()) {
449                 LOG.debug("       insert param 1: " + eventId);
450                 LOG.debug("       insert param 2: " + btId);
451             }
452             ps.setLong(1, eventId);
453             ps.setLong(2, btId);
454             ps.addBatch();
455         }
456     }
457 
458     private Long getOrInsertBizTransaction(final CaptureOperationsSession session, final String bizTrans,
459             final String bizTransType) throws SQLException {
460         Long bizTransactionId = getBusinessTransaction(session, bizTrans, bizTransType);
461         if (bizTransactionId != null) {
462             return bizTransactionId;
463         } else {
464             return insertBusinessTransaction(session, bizTrans, bizTransType);
465         }
466     }
467 
468     /**
469      * {@inheritDoc}
470      */
471     public void insertExtensionFieldsForEvent(final CaptureOperationsSession session, final long eventId,
472             final String eventType, final List<EventFieldExtension> exts) throws SQLException {
473         for (EventFieldExtension ext : exts) {
474             String insert = "INSERT INTO event_" + eventType + "_extensions " + "(event_id, fieldname, prefix, "
475                     + ext.getValueColumnName() + ") VALUES (?, ? ,?, ?)";
476             PreparedStatement ps = session.getBatchInsert(insert);
477             if (LOG.isDebugEnabled()) {
478                 LOG.debug("INSERT: " + insert);
479                 LOG.debug("       insert param 1: " + eventId);
480                 LOG.debug("       insert param 2: " + ext.getFieldname());
481                 LOG.debug("       insert param 3: " + ext.getPrefix());
482                 LOG.debug("       insert param 4: " + ext.getStrValue());
483             }
484             ps.setLong(1, eventId);
485             ps.setString(2, ext.getFieldname());
486             ps.setString(3, ext.getPrefix());
487             if (ext.getIntValue() != null) {
488                 ps.setInt(4, ext.getIntValue().intValue());
489             } else if (ext.getFloatValue() != null) {
490                 ps.setFloat(4, ext.getFloatValue().floatValue());
491             } else if (ext.getDateValue() != null) {
492                 ps.setTimestamp(4, ext.getDateValue());
493             } else {
494                 ps.setString(4, ext.getStrValue());
495             }
496             ps.addBatch();
497         }
498     }
499 }