1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.fosstrak.llrp.client.repository.sql.roaccess;
23
24 import java.math.BigInteger;
25 import java.sql.Connection;
26 import java.sql.DatabaseMetaData;
27 import java.sql.PreparedStatement;
28 import java.sql.ResultSet;
29 import java.sql.SQLException;
30 import java.sql.Statement;
31 import java.sql.Timestamp;
32 import java.sql.Types;
33 import java.util.LinkedList;
34 import java.util.List;
35
36 import org.apache.log4j.Logger;
37 import org.fosstrak.llrp.adaptor.exception.LLRPRuntimeException;
38 import org.fosstrak.llrp.client.ROAccessReportsRepository;
39 import org.fosstrak.llrp.client.Repository;
40 import org.fosstrak.llrp.client.RepositoryFactory;
41 import org.fosstrak.llrp.client.repository.sql.DerbyRepository;
42 import org.llrp.ltk.generated.messages.RO_ACCESS_REPORT;
43 import org.llrp.ltk.types.LLRPMessage;
44 import org.llrp.ltk.types.UnsignedLong;
45
46
47
48
49
50
51
52
53
54
55 public abstract class AbstractSQLROAccessReportsRepository implements ROAccessReportsRepository {
56
57
58 private Connection conn = null;
59
60
61 private static Logger log = Logger.getLogger(AbstractSQLROAccessReportsRepository.class);
62
63
64 private boolean tableOk = false;
65
66
67 private boolean reportErrorFirstTime = true;
68
69
70 public static final String TABLE_RO_ACCESS_REPORTS
71 = "table_ro_access_reports";
72
73
74
75 public static final int CINDEX_LOGTIME = 1;
76
77
78 public static final int CINDEX_ADAPTER = 2;
79
80
81 public static final int CINDEX_READER = 3;
82
83
84 public static final int CINDEX_EPC = 4;
85
86
87 public static final int CINDEX_ROSpecID = 5;
88
89
90 public static final int CINDEX_SpecIndex = 6;
91
92
93 public static final int CINDEX_InventoryParameterSpecID = 7;
94
95
96 public static final int CINDEX_AntennaID = 8;
97
98
99 public static final int CINDEX_PeakRSSI = 9;
100
101
102 public static final int CINDEX_ChannelIndex = 10;
103
104
105 public static final int CINDEX_FirstSeenTimestampUTC = 11;
106
107
108 public static final int CINDEX_FirstSeenTimestampUptime = 12;
109
110
111 public static final int CINDEX_LastSeenTimestampUTC = 13;
112
113
114 public static final int CINDEX_LastSeenTimestampUptime = 14;
115
116
117 public static final int CINDEX_TagSeenCount = 15;
118
119
120 public static final int CINDEX_C1G2_CRC = 16;
121
122
123 public static final int CINDEX_C1G2_PC = 17;
124
125
126 public static final int CINDEX_AccessSpecID = 18;
127
128
129 public static final int NUM_COLUMNS = 18;
130
131
132 protected Repository repository;
133
134
135 protected boolean wipe = false;
136
137
138
139
140
141 public AbstractSQLROAccessReportsRepository() {
142
143 }
144
145 public void setRepository(Repository repository) {
146 this.repository = repository;
147 }
148
149
150
151
152 protected abstract String sqlCreateTable();
153
154
155
156
157 protected abstract String sqlDropTable();
158
159
160
161
162 protected abstract String sqlInsert();
163
164
165 protected boolean initialized = false;
166
167 public void initialize(Repository repository)
168 throws LLRPRuntimeException {
169
170 if (initialized) return;
171
172 wipe = Boolean.parseBoolean(repository.getArgs().get(
173 RepositoryFactory.ARG_WIPE_RO_ACCESS_REPORTS_DB));
174 this.repository = repository;
175
176 conn = repository.getDBConnection();
177
178 try {
179 init();
180 } catch (Exception e) {
181 log.error("could not connect to database or database is corrupt: " +
182 e.getMessage());
183 conn = null;
184 }
185 initialized = true;
186 }
187
188
189
190
191
192
193 private void init() throws Exception {
194 boolean recreate = false;
195 if (!checkIfTableOk()) {
196 recreate = true;
197 log.error("table for RO_ACCESS_REPORT not ok, (re)create it.");
198 }
199
200 if (recreate || wipe) {
201 dropTable();
202 createTable();
203 }
204
205 tableOk = true;
206 }
207
208
209
210
211 protected boolean dropTable() {
212 try {
213 Statement drop = conn.createStatement();
214 drop.execute(sqlDropTable());
215
216 log.info(String.format("Removed table '%s'",
217 TABLE_RO_ACCESS_REPORTS));
218 } catch (Exception e) {
219 log.error(String.format("Could not remove table '%s': %s",
220 TABLE_RO_ACCESS_REPORTS, e.getMessage()));
221 return false;
222 }
223 return true;
224 }
225
226
227
228
229 protected boolean createTable() {
230 try {
231 String sqlCreate = sqlCreateTable();
232 log.debug(String.format("creating table with SQL %s", sqlCreate));
233 Statement create = conn.createStatement();
234 create.execute(sqlCreate);
235 create.close();
236
237 log.info(String.format("Created table '%s'",
238 TABLE_RO_ACCESS_REPORTS));
239 } catch (Exception e) {
240 log.info(String.format("Could not create table '%s': %s",
241 TABLE_RO_ACCESS_REPORTS, e.getMessage()));
242 return false;
243 }
244 return true;
245 }
246
247
248
249
250
251 protected boolean checkIfTableOk() {
252
253 try {
254 DatabaseMetaData dbMeta = conn.getMetaData();
255 ResultSet resultSet = dbMeta.getColumns(
256 null, null, TABLE_RO_ACCESS_REPORTS, null);
257 int n = 0;
258 while (resultSet.next()) {
259 n++;
260 }
261 final int len = NUM_COLUMNS;
262 if (n < len) {
263 throw new SQLException(
264 String.format("missing fields. %d instead of %d.",
265 n, len));
266 }
267
268 } catch (SQLException e) {
269 log.error("table erroneous or missing.");
270 return false;
271 }
272 return true;
273 }
274
275
276
277
278 public boolean isTableOK() {
279 return tableOk;
280 }
281
282 public void handle(String adapterName, String readerName,
283 LLRPMessage message) {
284
285 if ((null == conn) && reportErrorFirstTime) {
286 log.error("connection to the repository could not be established.");
287
288 return;
289 }
290
291 if (message instanceof RO_ACCESS_REPORT) {
292 handleROAccessReport(adapterName,
293 readerName, (RO_ACCESS_REPORT) message);
294 }
295 }
296
297
298
299
300
301
302
303 protected void handleROAccessReport(
304 String adapterName,
305 String readerName,
306 RO_ACCESS_REPORT message) {
307
308 log.debug("logging RO_ACCESS_REPORT to database.");
309
310 List<ROAccessItem> items = ROAccessItem.parse(
311 message, adapterName, readerName, System.currentTimeMillis());
312 int successfullyHandled = 0;
313 for (ROAccessItem item : items) {
314 try {
315
316 String sqlInsert = sqlInsert();
317
318 PreparedStatement insert = conn.prepareStatement(sqlInsert);
319
320
321 insert.setTimestamp(CINDEX_LOGTIME, item.getLogTime());
322
323
324 insert.setString(CINDEX_ADAPTER, item.getAdapterName());
325
326
327 insert.setString(CINDEX_READER, item.getReaderName());
328
329
330 if (null != item.getEpc()) {
331 insert.setString(CINDEX_EPC, item.getEpc());
332 } else {
333 insert.setNull(CINDEX_EPC, Types.VARCHAR);
334 }
335
336
337 if (null != item.getRoSpecID()) {
338 insert.setLong(CINDEX_ROSpecID, item.getRoSpecID());
339 } else {
340 insert.setNull(CINDEX_ROSpecID, Types.BIGINT);
341 }
342
343
344 if (null != item.getSpecIndex()) {
345 insert.setInt(CINDEX_SpecIndex, item.getSpecIndex());
346 } else {
347 insert.setNull(CINDEX_SpecIndex, Types.INTEGER);
348 }
349
350
351 if (null != item.getInventoryPrmSpecID()) {
352 insert.setInt(CINDEX_InventoryParameterSpecID,
353 item.getInventoryPrmSpecID());
354 } else {
355 insert.setNull(CINDEX_InventoryParameterSpecID, Types.INTEGER);
356 }
357
358
359 if (null != item.getAntennaID()) {
360 insert.setInt(CINDEX_AntennaID, item.getAntennaID());
361 } else {
362 insert.setNull(CINDEX_AntennaID, Types.INTEGER);
363 }
364
365
366 if (null != item.getPeakRSSI()) {
367 insert.setShort(CINDEX_PeakRSSI, item.getPeakRSSI());
368 } else {
369 insert.setNull(CINDEX_PeakRSSI, Types.SMALLINT);
370 }
371
372
373 if (null != item.getChannelIndex()) {
374 insert.setInt(CINDEX_ChannelIndex, item.getChannelIndex());
375 } else {
376 insert.setNull(CINDEX_ChannelIndex, Types.INTEGER);
377 }
378
379
380 if (null != item.getFirstSeenUTC()) {
381 insert.setTimestamp(CINDEX_FirstSeenTimestampUTC,
382 item.getFirstSeenUTC());
383 } else {
384 insert.setNull(CINDEX_FirstSeenTimestampUTC, Types.TIMESTAMP);
385 }
386
387
388 if (null != item.getFirstSeenUptime()) {
389 insert.setTimestamp(CINDEX_FirstSeenTimestampUptime,
390 item.getFirstSeenUptime());
391 } else {
392 insert.setNull(CINDEX_FirstSeenTimestampUptime, Types.TIMESTAMP);
393 }
394
395
396 if (null != item.getLastSeenUTC()) {
397 insert.setTimestamp(CINDEX_LastSeenTimestampUTC,
398 item.getLastSeenUTC());
399 }else {
400 insert.setNull(CINDEX_LastSeenTimestampUTC, Types.TIMESTAMP);
401 }
402
403
404 if (null != item.getLastSeenUptime()) {
405 insert.setTimestamp(CINDEX_LastSeenTimestampUptime,
406 item.getLastSeenUptime());
407 }else {
408 insert.setNull(CINDEX_LastSeenTimestampUptime, Types.TIMESTAMP);
409 }
410
411
412 if (null != item.getTagSeenCount()) {
413 insert.setInt(CINDEX_TagSeenCount, item.getTagSeenCount());
414 } else {
415 insert.setNull(CINDEX_TagSeenCount, Types.INTEGER);
416 }
417
418
419 if (null != item.getC1g2_CRC()) {
420 insert.setInt(CINDEX_C1G2_CRC, item.getC1g2_CRC());
421 } else {
422 insert.setNull(CINDEX_C1G2_CRC, Types.INTEGER);
423 }
424 if (null != item.getC1g2_CRC()) {
425 insert.setInt(CINDEX_C1G2_PC, item.getC1g2_PC());
426 } else {
427 insert.setNull(CINDEX_C1G2_PC, Types.INTEGER);
428 }
429
430
431 if (null != item.getAccessSpecID()) {
432 insert.setLong(CINDEX_AccessSpecID, item.getAccessSpecID());
433 } else {
434 insert.setNull(CINDEX_AccessSpecID, Types.BIGINT);
435 }
436
437 insert.executeUpdate();
438 insert.close();
439
440 successfullyHandled++;
441 } catch (Exception e) {
442 log.debug("Could not log entry of RO_ACCESS_REPORT to the " +
443 "database - ignoring the entry.");
444 }
445 log.debug(
446 String.format("Successfully stored %s row(s) into database.",
447 successfullyHandled));
448 }
449 }
450
451 public List<ROAccessItem> getAll() throws Exception {
452 List<ROAccessItem> items = new LinkedList<ROAccessItem> ();
453 Statement s = repository.getDBConnection().createStatement();
454 String sql = String.format("SELECT * FROM %s", TABLE_RO_ACCESS_REPORTS);
455 ResultSet res = s.executeQuery(sql);
456
457 while (res.next()) {
458 ROAccessItem item = new ROAccessItem();
459
460 item.setLogTime(res.getTimestamp(CINDEX_LOGTIME));
461 item.setAdapterName(res.getString(CINDEX_ADAPTER));
462 item.setReaderName(res.getString(CINDEX_READER));
463 item.setEpc(res.getString(CINDEX_EPC));
464 item.setRoSpecID(res.getLong(CINDEX_ROSpecID));
465 item.setSpecIndex(res.getInt(CINDEX_SpecIndex));
466 item.setInventoryPrmSpecID(
467 res.getInt(CINDEX_InventoryParameterSpecID));
468 item.setAntennaID(res.getInt(CINDEX_AntennaID));
469 item.setPeakRSSI(res.getShort(CINDEX_PeakRSSI));
470 item.setChannelIndex(res.getInt(CINDEX_ChannelIndex));
471 item.setFirstSeenUTC(res.getTimestamp(
472 CINDEX_FirstSeenTimestampUTC));
473 item.setFirstSeenUptime(
474 res.getTimestamp(CINDEX_FirstSeenTimestampUptime));
475 item.setLastSeenUTC(
476 res.getTimestamp(CINDEX_LastSeenTimestampUTC));
477 item.setLastSeenUptime(
478 res.getTimestamp(CINDEX_LastSeenTimestampUptime));
479 item.setTagSeenCount(res.getInt(CINDEX_TagSeenCount));
480 item.setC1g2_CRC(res.getInt(CINDEX_C1G2_CRC));
481 item.setC1g2_PC(res.getInt(CINDEX_C1G2_PC));
482 item.setAccessSpecID(res.getLong(CINDEX_AccessSpecID));
483 items.add(item);
484 }
485 res.close();
486 return items;
487 }
488
489 public void clear() throws Exception {
490 Statement s = repository.getDBConnection().createStatement();
491 String sql = String.format("DELETE FROM %s", TABLE_RO_ACCESS_REPORTS);
492 s.execute(sql);
493 }
494
495
496
497
498
499
500 public static Timestamp extractTimestamp(UnsignedLong ulong) {
501 try {
502
503 BigInteger value = ulong.toBigInteger();
504 final long tsMillis = value.divide(new BigInteger("1000")).longValue();
505
506
507 final int l = value.toString().length();
508
509
510 String fractSeconds = value.toString().substring(l-6, l);
511 final int nanoseconds = Integer.parseInt(fractSeconds) * 1000;
512
513
514
515
516 Timestamp ts = new Timestamp(tsMillis);
517 ts.setNanos(nanoseconds);
518
519 return ts;
520 } catch (Exception e) {
521 e.printStackTrace();
522 }
523 return null;
524 }
525 }