1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.fosstrak.epcis.repository.capture;
22
23 import java.io.BufferedReader;
24 import java.io.FileReader;
25 import java.io.IOException;
26 import java.sql.Connection;
27 import java.sql.PreparedStatement;
28 import java.sql.ResultSet;
29 import java.sql.SQLException;
30 import java.sql.Statement;
31 import java.sql.Timestamp;
32 import java.util.ArrayList;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36
37 import javax.sql.DataSource;
38
39 import org.fosstrak.epcis.model.BusinessTransactionType;
40 import org.fosstrak.epcis.repository.EpcisConstants;
41 import org.fosstrak.epcis.repository.model.EventFieldExtension;
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44
45
46
47
48
49
50
51
52
53
54 public class CaptureOperationsBackendSQL implements CaptureOperationsBackend {
55
56 private static final Log LOG = LogFactory.getLog(CaptureOperationsBackendSQL.class);
57
58 private static final String SQL_INSERT_AGGREGATIONEVENT = "INSERT INTO event_AggregationEvent (eventTime, recordTime, eventTimeZoneOffset, bizStep, disposition, readPoint, bizLocation, action, parentID) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
59 private static final String SQL_INSERT_OBJECTEVENT = "INSERT INTO event_ObjectEvent (eventTime, recordTime, eventTimeZoneOffset, bizStep, disposition, readPoint, bizLocation, action VALUES (?, ?, ?, ?, ?, ?, ?, ?)";
60 private static final String SQL_INSERT_QUANTITYEVENT = "INSERT INTO event_QuantityEvent (eventTime, recordTime, eventTimeZoneOffset, bizStep, disposition, readPoint, bizLocation, epcClass, quantity) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
61 private static final String SQL_INSERT_TRANSACTIONEVENT = "INSERT INTO event_TransactionEvent (eventTime, recordTime, eventTimeZoneOffset, bizStep, disposition, readPoint, bizLocation, action, parentID) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)";
62
63 private static Map<String, String> VOCABTYPE_TABLENAME_MAP;
64
65 static {
66 VOCABTYPE_TABLENAME_MAP = new HashMap<String, String>();
67 VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.DISPOSITION_ID, "voc_Disposition");
68 VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.READ_POINT_ID, "voc_ReadPoint");
69 VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.EPC_CLASS_ID, "voc_EPCClass");
70 VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.BUSINESS_LOCATION_ID, "voc_BizLoc");
71 VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.BUSINESS_STEP_ID, "voc_BizStep");
72 VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.BUSINESS_TRANSACTION_TYPE_ID, "voc_BizTransType");
73 VOCABTYPE_TABLENAME_MAP.put(EpcisConstants.BUSINESS_TRANSACTION_ID, "voc_BizTrans");
74 }
75
76
77
78
79 public void dbReset(final Connection dbconnection, final String dbResetScript) throws SQLException, IOException {
80 LOG.info("Running db reset script.");
81 Statement stmt = null;
82 try {
83 stmt = dbconnection.createStatement();
84 if (dbResetScript != null) {
85 BufferedReader reader = new BufferedReader(new FileReader(dbResetScript));
86 String line;
87 while ((line = reader.readLine()) != null) {
88 stmt.addBatch(line);
89 }
90 }
91 stmt.executeBatch();
92 } finally {
93 if (stmt != null) {
94 stmt.close();
95 }
96 }
97 }
98
99
100
101
102 public CaptureOperationsSession openSession(final DataSource dataSource) throws SQLException {
103 return new CaptureOperationsSession(dataSource.getConnection());
104 }
105
106
107
108
109 public Long insertObjectEvent(final CaptureOperationsSession session, final Timestamp eventTime,
110 final Timestamp recordTime, final String eventTimeZoneOffset, final Long bizStepId,
111 final Long dispositionId, final Long readPointId, final Long bizLocationId, final String action)
112 throws SQLException {
113 return insertEvent(session, eventTime, recordTime, eventTimeZoneOffset, bizStepId, dispositionId, readPointId,
114 bizLocationId, action, null, null, null, EpcisConstants.OBJECT_EVENT);
115 }
116
117
118
119
120 public Long insertTransactionEvent(final CaptureOperationsSession session, final Timestamp eventTime,
121 final Timestamp recordTime, final String eventTimeZoneOffset, final Long bizStepId,
122 final Long dispositionId, final Long readPointId, final Long bizLocationId, final String action,
123 final String parentId) throws SQLException {
124 return insertEvent(session, eventTime, recordTime, eventTimeZoneOffset, bizStepId, dispositionId, readPointId,
125 bizLocationId, action, parentId, null, null, EpcisConstants.TRANSACTION_EVENT);
126 }
127
128
129
130
131 public Long insertAggregationEvent(final CaptureOperationsSession session, final Timestamp eventTime,
132 final Timestamp recordTime, final String eventTimeZoneOffset, final Long bizStepId,
133 final Long dispositionId, final Long readPointId, final Long bizLocationId, final String action,
134 final String parentId) throws SQLException {
135 return insertEvent(session, eventTime, recordTime, eventTimeZoneOffset, bizStepId, dispositionId, readPointId,
136 bizLocationId, action, parentId, null, null, EpcisConstants.AGGREGATION_EVENT);
137 }
138
139
140
141
142 public Long insertQuantityEvent(final CaptureOperationsSession session, final Timestamp eventTime,
143 final Timestamp recordTime, final String eventTimeZoneOffset, final Long bizStepId,
144 final Long dispositionId, final Long readPointId, final Long bizLocationId, final Long epcClassId,
145 final Long quantity) throws SQLException {
146 return insertEvent(session, eventTime, recordTime, eventTimeZoneOffset, bizStepId, dispositionId, readPointId,
147 bizLocationId, null, null, epcClassId, quantity, EpcisConstants.QUANTITY_EVENT);
148 }
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184 private Long insertEvent(final CaptureOperationsSession session, final Timestamp eventTime,
185 final Timestamp recordTime, final String eventTimeZoneOffset, final Long bizStepId,
186 final Long dispositionId, final Long readPointId, final Long bizLocationId, final String action,
187 final String parentId, final Long epcClassId, final Long quantity, final String eventName)
188 throws SQLException {
189
190 PreparedStatement ps;
191 if (eventName.equals(EpcisConstants.AGGREGATION_EVENT)) {
192 ps = session.getInsert(SQL_INSERT_AGGREGATIONEVENT);
193 } else if (eventName.equals(EpcisConstants.OBJECT_EVENT)) {
194 ps = session.getInsert(SQL_INSERT_OBJECTEVENT);
195 } else if (eventName.equals(EpcisConstants.QUANTITY_EVENT)) {
196 ps = session.getInsert(SQL_INSERT_QUANTITYEVENT);
197 } else if (eventName.equals(EpcisConstants.TRANSACTION_EVENT)) {
198 ps = session.getInsert(SQL_INSERT_TRANSACTIONEVENT);
199 } else {
200 throw new SQLException("Encountered unknown event element '" + eventName + "'.");
201 }
202
203
204
205 ps.setTimestamp(1, eventTime);
206
207 ps.setTimestamp(2, recordTime != null ? recordTime : new Timestamp(System.currentTimeMillis()));
208
209
210 ps.setString(3, eventTimeZoneOffset);
211 if (bizStepId != null) {
212 ps.setLong(4, bizStepId.longValue());
213 } else {
214 ps.setNull(4, java.sql.Types.BIGINT);
215 }
216 if (dispositionId != null) {
217 ps.setLong(5, dispositionId.longValue());
218 } else {
219 ps.setNull(5, java.sql.Types.BIGINT);
220 }
221 if (readPointId != null) {
222 ps.setLong(6, readPointId.longValue());
223 } else {
224 ps.setNull(6, java.sql.Types.BIGINT);
225 }
226 if (bizLocationId != null) {
227 ps.setLong(7, bizLocationId.longValue());
228 } else {
229 ps.setNull(7, java.sql.Types.BIGINT);
230 }
231
232
233 if (eventName.equals("QuantityEvent")) {
234 if (epcClassId != null) {
235 ps.setLong(8, epcClassId.longValue());
236 } else {
237 ps.setNull(8, java.sql.Types.BIGINT);
238 }
239 if (quantity != null) {
240 ps.setLong(9, quantity.longValue());
241 } else {
242 ps.setNull(9, java.sql.Types.BIGINT);
243 }
244 } else {
245
246 ps.setString(8, action);
247
248
249 if (eventName.equals("AggregationEvent") || eventName.equals("TransactionEvent")) {
250 ps.setString(9, parentId);
251 }
252 }
253
254 ps.executeUpdate();
255 session.commit();
256
257 return getLastAutoIncrementedId(session, "event_" + eventName);
258 }
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273 private Long getLastAutoIncrementedId(final CaptureOperationsSession session, final String tableName)
274 throws SQLException {
275 String stmt = "SELECT LAST_INSERT_ID() as id FROM " + tableName;
276 PreparedStatement ps = session.getSelect(stmt);
277 ResultSet rs = null;
278 try {
279 rs = ps.executeQuery();
280 rs.next();
281 return Long.valueOf(rs.getLong("id"));
282 } finally {
283 if (rs != null) {
284 rs.close();
285 }
286 }
287 }
288
289
290
291
292 public void insertEpcsForEvent(final CaptureOperationsSession session, final long eventId, final String eventType,
293 final List<String> epcs) throws SQLException {
294
295 String insert = "INSERT INTO event_" + eventType + "_EPCs (event_id, epc) VALUES (?, ?)";
296 PreparedStatement ps = session.getBatchInsert(insert);
297 LOG.debug("INSERT: " + insert);
298
299
300 for (String epc : epcs) {
301 if (LOG.isDebugEnabled()) {
302 LOG.debug(" insert param 1: " + eventId);
303 LOG.debug(" insert param 2: " + epc.toString());
304 }
305 ps.setLong(1, eventId);
306 ps.setString(2, epc.toString());
307 ps.addBatch();
308 }
309 }
310
311
312
313
314 public Long getVocabularyElement(final CaptureOperationsSession session, final String vocabularyType,
315 final String vocabularyElement) throws SQLException {
316 String stmt = "SELECT id FROM " + VOCABTYPE_TABLENAME_MAP.get(vocabularyType) + " WHERE uri=?";
317 PreparedStatement ps = session.getSelect(stmt);
318 ps.setString(1, vocabularyElement.toString());
319 ResultSet rs = null;
320 try {
321 rs = ps.executeQuery();
322 if (rs.next()) {
323
324 return Long.valueOf(rs.getLong("id"));
325 } else {
326 return null;
327 }
328 } finally {
329 if (rs != null) {
330 rs.close();
331 }
332 }
333 }
334
335
336
337
338 public Long insertVocabularyElement(final CaptureOperationsSession session, final String vocabularyType,
339 final String vocabularyElement) throws SQLException {
340 String tableName = VOCABTYPE_TABLENAME_MAP.get(vocabularyType);
341 String stmt = "INSERT INTO " + tableName + " (uri) VALUES (?)";
342 if (LOG.isDebugEnabled()) {
343 LOG.debug("INSERT: " + stmt);
344 LOG.debug(" insert param 1: " + vocabularyElement.toString());
345 }
346
347 PreparedStatement ps = session.getInsert(stmt);
348 ps.setString(1, vocabularyElement.toString());
349 ps.executeUpdate();
350
351
352 return getLastAutoIncrementedId(session, tableName);
353 }
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370 private Long getBusinessTransaction(final CaptureOperationsSession session, final String bizTrans,
371 final String bizTransType) throws SQLException {
372 String stmt = "select id from BizTransaction where bizTrans = (select id from voc_BizTrans where uri = ?) and type = (select id from voc_BizTransType where uri = ?);";
373 PreparedStatement ps = session.getSelect(stmt);
374 ps.setString(1, bizTrans.toString());
375 ps.setString(2, bizTransType.toString());
376
377 ResultSet rs = null;
378 try {
379 rs = ps.executeQuery();
380 if (rs.next()) {
381
382 return Long.valueOf(rs.getLong("id"));
383 } else {
384 return null;
385 }
386 } finally {
387 if (rs != null) {
388 rs.close();
389 }
390 }
391 }
392
393
394
395
396 public Long insertBusinessTransaction(final CaptureOperationsSession session, final String bizTrans,
397 final String bizTransType) throws SQLException {
398
399 final Long id = getOrInsertVocabularyElement(session, EpcisConstants.BUSINESS_TRANSACTION_ID, bizTrans);
400 final Long type = getOrInsertVocabularyElement(session, EpcisConstants.BUSINESS_TRANSACTION_TYPE_ID,
401 bizTransType);
402
403 String stmt = "INSERT INTO BizTransaction (bizTrans, type) VALUES (?, ?)";
404 if (LOG.isDebugEnabled()) {
405 LOG.debug("INSERT: " + stmt);
406 LOG.debug(" insert param 1: " + bizTrans);
407 LOG.debug(" insert param 2: " + bizTransType);
408 }
409
410 PreparedStatement ps = session.getInsert(stmt);
411 ps.setLong(1, id.longValue());
412 ps.setLong(2, type.longValue());
413 ps.executeUpdate();
414
415 return getLastAutoIncrementedId(session, "BizTransaction");
416 }
417
418 private Long getOrInsertVocabularyElement(final CaptureOperationsSession session, final String vocabularyType,
419 final String vocabularyElement) throws SQLException {
420 Long vocabularyElementId = getVocabularyElement(session, vocabularyType, vocabularyElement);
421 if (vocabularyElementId != null) {
422 return vocabularyElementId;
423 } else {
424 return insertVocabularyElement(session, vocabularyType, vocabularyElement);
425 }
426 }
427
428
429
430
431 public void insertBusinessTransactionsForEvent(final CaptureOperationsSession session, final long eventId,
432 final String eventType, final List<BusinessTransactionType> btts) throws SQLException {
433
434
435 List<Long> btIds = new ArrayList<Long>();
436 for (BusinessTransactionType btt : btts) {
437 btIds.add(getOrInsertBizTransaction(session, btt.getValue(), btt.getType()));
438 }
439
440 String insert = "INSERT INTO event_" + eventType + "_bizTrans (event_id, bizTrans_id) VALUES (?, ?)";
441 if (LOG.isDebugEnabled()) {
442 LOG.debug("INSERT: " + insert);
443 }
444 PreparedStatement ps = session.getBatchInsert(insert);
445
446
447 for (long btId : btIds) {
448 if (LOG.isDebugEnabled()) {
449 LOG.debug(" insert param 1: " + eventId);
450 LOG.debug(" insert param 2: " + btId);
451 }
452 ps.setLong(1, eventId);
453 ps.setLong(2, btId);
454 ps.addBatch();
455 }
456 }
457
458 private Long getOrInsertBizTransaction(final CaptureOperationsSession session, final String bizTrans,
459 final String bizTransType) throws SQLException {
460 Long bizTransactionId = getBusinessTransaction(session, bizTrans, bizTransType);
461 if (bizTransactionId != null) {
462 return bizTransactionId;
463 } else {
464 return insertBusinessTransaction(session, bizTrans, bizTransType);
465 }
466 }
467
468
469
470
471 public void insertExtensionFieldsForEvent(final CaptureOperationsSession session, final long eventId,
472 final String eventType, final List<EventFieldExtension> exts) throws SQLException {
473 for (EventFieldExtension ext : exts) {
474 String insert = "INSERT INTO event_" + eventType + "_extensions " + "(event_id, fieldname, prefix, "
475 + ext.getValueColumnName() + ") VALUES (?, ? ,?, ?)";
476 PreparedStatement ps = session.getBatchInsert(insert);
477 if (LOG.isDebugEnabled()) {
478 LOG.debug("INSERT: " + insert);
479 LOG.debug(" insert param 1: " + eventId);
480 LOG.debug(" insert param 2: " + ext.getFieldname());
481 LOG.debug(" insert param 3: " + ext.getPrefix());
482 LOG.debug(" insert param 4: " + ext.getStrValue());
483 }
484 ps.setLong(1, eventId);
485 ps.setString(2, ext.getFieldname());
486 ps.setString(3, ext.getPrefix());
487 if (ext.getIntValue() != null) {
488 ps.setInt(4, ext.getIntValue().intValue());
489 } else if (ext.getFloatValue() != null) {
490 ps.setFloat(4, ext.getFloatValue().floatValue());
491 } else if (ext.getDateValue() != null) {
492 ps.setTimestamp(4, ext.getDateValue());
493 } else {
494 ps.setString(4, ext.getStrValue());
495 }
496 ps.addBatch();
497 }
498 }
499 }