View Javadoc

1   /*
2    *  
3    *  Fosstrak LLRP Commander (www.fosstrak.org)
4    * 
5    *  Copyright (C) 2008 ETH Zurich
6    *
7    *  This program is free software: you can redistribute it and/or modify
8    *  it under the terms of the GNU General Public License as published by
9    *  the Free Software Foundation, either version 3 of the License, or
10   *  (at your option) any later version.
11   *
12   *  This program is distributed in the hope that it will be useful,
13   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15   *  GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License
18   *  along with this program.  If not, see <http://www.gnu.org/licenses/> 
19   *
20   */
21  
22  package org.fosstrak.llrp.client.repository.sql.roaccess;
23  
24  import java.math.BigInteger;
25  import java.sql.Connection;
26  import java.sql.DatabaseMetaData;
27  import java.sql.PreparedStatement;
28  import java.sql.ResultSet;
29  import java.sql.SQLException;
30  import java.sql.Statement;
31  import java.sql.Timestamp;
32  import java.sql.Types;
33  import java.util.LinkedList;
34  import java.util.List;
35  
36  import org.apache.log4j.Logger;
37  import org.fosstrak.llrp.adaptor.exception.LLRPRuntimeException;
38  import org.fosstrak.llrp.client.ROAccessReportsRepository;
39  import org.fosstrak.llrp.client.Repository;
40  import org.fosstrak.llrp.client.RepositoryFactory;
41  import org.fosstrak.llrp.client.repository.sql.DerbyRepository;
42  import org.llrp.ltk.generated.messages.RO_ACCESS_REPORT;
43  import org.llrp.ltk.types.LLRPMessage;
44  import org.llrp.ltk.types.UnsignedLong;
45  
46  /**
47   * Gives access to the database holding logged RO_ACCESS_REPORTS. There are 
48   * several constants helping you to simplify your SQL code. If your database 
49   * does not support the datatypes used in {@link DerbyROAccessReportsRepository} 
50   * you should subclass from this class and invoke the respective class from 
51   * your repository implementation {@link Repository}.
52   * @author sawielan
53   *
54   */
55  public abstract class AbstractSQLROAccessReportsRepository implements ROAccessReportsRepository {
56  	
57  	// the connection to the database.
58  	private Connection conn = null;
59  	
60  	// log4j logger.
61  	private static Logger log = Logger.getLogger(AbstractSQLROAccessReportsRepository.class);
62  	
63  	// flag whether the table is up and OK.
64  	private boolean tableOk = false;
65  	
66  	// only report connection error at the first time.
67  	private boolean reportErrorFirstTime = true;
68  	
69  	/** the name of the RO_ACCESS_REPORTS table. */
70  	public static final String TABLE_RO_ACCESS_REPORTS 
71  		= "table_ro_access_reports";
72  	
73  	// NOTICE: COLUMN INDEX IN DERBY BEGINS WITH 1
74  	/** column index of the log time.*/ 
75  	public static final int CINDEX_LOGTIME = 					1;
76  	
77  	/** column index of the adapter name.*/
78  	public static final int CINDEX_ADAPTER = 					2;
79  	
80  	/** column index of the reader name.*/
81  	public static final int CINDEX_READER = 					3;
82  	
83  	/** column index of the EPC value.*/
84  	public static final int CINDEX_EPC = 						4;
85  	
86  	/** column index of the RO spec ID.*/
87  	public static final int CINDEX_ROSpecID = 					5;
88  	
89  	/** column index of the spec index.*/
90  	public static final int CINDEX_SpecIndex = 					6;
91  	
92  	/** column index of the inventory parameter spec ID.*/
93  	public static final int CINDEX_InventoryParameterSpecID = 	7;
94  	
95  	/** column index of the antenna ID.*/
96  	public static final int CINDEX_AntennaID = 					8;
97  	
98  	/** column index of the peak RSSI.*/
99  	public static final int CINDEX_PeakRSSI = 					9;
100 	
101 	/** column index of the channel index.*/
102 	public static final int CINDEX_ChannelIndex = 				10;
103 	
104 	/** column index of the first seen time stamp in UTC.*/
105 	public static final int CINDEX_FirstSeenTimestampUTC = 		11;
106 	
107 	/** column index of the first seen time stamp since uptime.*/
108 	public static final int CINDEX_FirstSeenTimestampUptime = 	12;
109 	
110 	/** column index of the last seen time stamp in UTC.*/
111 	public static final int CINDEX_LastSeenTimestampUTC = 		13;
112 	
113 	/** column index of the last seen time stamp since uptime.*/
114 	public static final int CINDEX_LastSeenTimestampUptime = 	14;
115 	
116 	/** column index of the tag seen count.*/
117 	public static final int CINDEX_TagSeenCount = 				15;
118 	
119 	/** column index of the c1g2 crc.*/
120 	public static final int CINDEX_C1G2_CRC = 					16;
121 	
122 	/** column index of the c1g2 pc.*/
123 	public static final int CINDEX_C1G2_PC = 					17;
124 	
125 	/** column index of the access spec ID.*/
126 	public static final int CINDEX_AccessSpecID = 				18;
127 	
128 	/** the number of columns in the table. */
129 	public static final int NUM_COLUMNS = 18;
130 	
131 	/** the repository "owning" this item. */
132 	protected Repository repository;
133 	
134 	/** whether to wipe the database at startup or not. */
135 	protected boolean wipe = false;
136 	
137 	/**
138 	 * creates a new handle to the RO_ACCESS_REPORTS repository. The default 
139 	 * constructor uses the connection from the {@link DerbyRepository}.
140 	 */
141 	public AbstractSQLROAccessReportsRepository() {	
142 
143 	}
144 	
145 	public void setRepository(Repository repository) {
146 		this.repository = repository;
147 	}
148 	
149 	/**
150 	 * @return a SQL creating the necessary table.
151 	 */
152 	protected abstract String sqlCreateTable();
153 	
154 	/**
155 	 * @return a SQL dropping the table.
156 	 */
157 	protected abstract String sqlDropTable();
158 	
159 	/**
160 	 * @return a SQL allowing to insert a new log item.
161 	 */
162 	protected abstract String sqlInsert();
163 	
164 	/** flag, whether this repository is initialized or not. */
165 	protected boolean initialized = false;
166 	
167 	public void initialize(Repository repository) 
168 		throws LLRPRuntimeException {
169 		
170 		if (initialized) return; 
171 		
172 		wipe = Boolean.parseBoolean(repository.getArgs().get(
173 				RepositoryFactory.ARG_WIPE_RO_ACCESS_REPORTS_DB));
174 		this.repository = repository;
175 		
176 		conn = repository.getDBConnection();
177 		
178 		try {
179 			init();
180 		} catch (Exception e) {
181 			log.error("could not connect to database or database is corrupt: " + 
182 					e.getMessage());
183 			conn = null;
184 		}
185 		initialized = true;
186 	}
187 	
188 	/**
189 	 * initialize the table. (create it, if not existing yet).
190 	 * @throws Exception when the connection could not be established or if the 
191 	 * database is corrupted (and repair mode was not able to fix it).
192 	 */
193 	private void init() throws Exception {
194 		boolean recreate = false;
195 		if (!checkIfTableOk()) {
196 			recreate = true;
197 			log.error("table for RO_ACCESS_REPORT not ok, (re)create it.");
198 		}
199 		
200 		if (recreate || wipe) {
201 			dropTable();
202 			createTable();
203 		}
204 		
205 		tableOk = true;
206 	}
207 
208 	/**
209 	 * drop the log table.
210 	 */
211 	protected boolean dropTable() {
212 		try {
213 			Statement drop = conn.createStatement();
214 			drop.execute(sqlDropTable());
215 			
216 			log.info(String.format("Removed table '%s'", 
217 					TABLE_RO_ACCESS_REPORTS));			
218 		} catch (Exception e) {
219 			log.error(String.format("Could not remove table '%s': %s", 
220 					TABLE_RO_ACCESS_REPORTS, e.getMessage()));
221 			return false;
222 		}
223 		return true;
224 	}
225 
226 	/**
227 	 * create a new log table.
228 	 */
229 	protected boolean createTable() {
230 		try {
231 			String sqlCreate = sqlCreateTable();
232 			log.debug(String.format("creating table with SQL %s", sqlCreate));			
233 			Statement create = conn.createStatement();	
234 			create.execute(sqlCreate);
235 			create.close();
236 			
237 			log.info(String.format("Created table '%s'", 
238 					TABLE_RO_ACCESS_REPORTS));
239 		} catch (Exception e) {
240 			log.info(String.format("Could not create table '%s': %s",
241 					TABLE_RO_ACCESS_REPORTS, e.getMessage()));
242 			return false;
243 		}	
244 		return true;
245 	}
246 	
247 	/**
248 	 * checks whether the required tables exist or not.
249 	 * @return true if everything is OK, false otherwise.
250 	 */
251 	protected boolean checkIfTableOk() {		
252 		// we try to make a SQL query. if it fails, we assume the table to be dead...
253 		try {
254 			DatabaseMetaData dbMeta = conn.getMetaData();
255 			ResultSet resultSet = dbMeta.getColumns(
256 					null, null, TABLE_RO_ACCESS_REPORTS, null);
257 			int n = 0;
258 			while (resultSet.next()) {
259 				n++;
260 			}
261 			final int len = NUM_COLUMNS;
262 			if (n < len) {
263 				throw new SQLException(
264 						String.format("missing fields. %d instead of %d.",
265 							n, len));
266 			}
267 			 
268 		} catch (SQLException e) {
269 			log.error("table erroneous or missing.");
270 			return false;
271 		}
272 		return true;
273 	}
274 	
275 	/**
276 	 * @return true if the table is up and OK, false otherwise.
277 	 */
278 	public boolean isTableOK() {
279 		return tableOk;
280 	}
281 
282 	public void handle(String adapterName, String readerName,
283 			LLRPMessage message) {
284 		
285 		if ((null == conn) && reportErrorFirstTime) {
286 			log.error("connection to the repository could not be established.");
287 			
288 			return;
289 		}
290 		
291 		if (message instanceof RO_ACCESS_REPORT) {
292 			handleROAccessReport(adapterName, 
293 					readerName, (RO_ACCESS_REPORT) message);
294 		}
295 	}
296 
297 	/**
298 	 * write an RO_ACCESS_REPORT into the database.
299 	 * @param adapterName name of the adapter.
300 	 * @param readerName name of the reader.
301 	 * @param message the LLRP RO_ACCESS_REPORT to be logged.
302 	 */
303 	protected void handleROAccessReport(
304 			String adapterName, 
305 			String readerName,
306 			RO_ACCESS_REPORT message) {
307 		
308 		log.debug("logging RO_ACCESS_REPORT to database.");
309 		
310 		List<ROAccessItem> items = ROAccessItem.parse(
311 				message, adapterName, readerName, System.currentTimeMillis());
312 		int successfullyHandled = 0;
313 		for (ROAccessItem item : items) {
314 			try {
315 				
316 				String sqlInsert = sqlInsert();
317 				
318 				PreparedStatement insert = conn.prepareStatement(sqlInsert);
319 	
320 				// log time.
321 				insert.setTimestamp(CINDEX_LOGTIME, item.getLogTime());
322 				
323 				// adapter name.
324 				insert.setString(CINDEX_ADAPTER, item.getAdapterName());
325 				
326 				// reader name.
327 				insert.setString(CINDEX_READER, item.getReaderName());
328 				
329 				// store the EPC as EPC96 or EPCData
330 				if (null != item.getEpc()) {
331 					insert.setString(CINDEX_EPC, item.getEpc());
332 				} else {
333 					insert.setNull(CINDEX_EPC, Types.VARCHAR);
334 				}
335 				
336 				// RO Spec ID.
337 				if (null != item.getRoSpecID()) {
338 					insert.setLong(CINDEX_ROSpecID, item.getRoSpecID());
339 				} else {
340 					insert.setNull(CINDEX_ROSpecID, Types.BIGINT);
341 				}
342 				
343 				// spec index.
344 				if (null != item.getSpecIndex()) {
345 					insert.setInt(CINDEX_SpecIndex, item.getSpecIndex());
346 				} else {
347 					insert.setNull(CINDEX_SpecIndex, Types.INTEGER);
348 				}
349 				
350 				// inventory parameter spec ID.
351 				if (null != item.getInventoryPrmSpecID()) {
352 					insert.setInt(CINDEX_InventoryParameterSpecID, 
353 							item.getInventoryPrmSpecID());	
354 				} else {
355 					insert.setNull(CINDEX_InventoryParameterSpecID, Types.INTEGER);
356 				}
357 				
358 				// antenna ID.
359 				if (null != item.getAntennaID()) {
360 					insert.setInt(CINDEX_AntennaID, item.getAntennaID());
361 				} else {
362 					insert.setNull(CINDEX_AntennaID, Types.INTEGER);
363 				}
364 				
365 				// peak RSSI.
366 				if (null != item.getPeakRSSI()) {
367 					insert.setShort(CINDEX_PeakRSSI, item.getPeakRSSI());
368 				} else {
369 					insert.setNull(CINDEX_PeakRSSI, Types.SMALLINT);
370 				}
371 				
372 				// channel index.
373 				if (null != item.getChannelIndex()) {
374 					insert.setInt(CINDEX_ChannelIndex, item.getChannelIndex());
375 				} else {
376 					insert.setNull(CINDEX_ChannelIndex, Types.INTEGER);
377 				}
378 				
379 				// extract the first seen UTC time stamp.
380 				if (null != item.getFirstSeenUTC()) {
381 					insert.setTimestamp(CINDEX_FirstSeenTimestampUTC, 
382 							item.getFirstSeenUTC());
383 				} else {
384 					insert.setNull(CINDEX_FirstSeenTimestampUTC, Types.TIMESTAMP);
385 				}
386 				
387 				// extract the first seen since uptime time stamp.
388 				if (null != item.getFirstSeenUptime()) {
389 					insert.setTimestamp(CINDEX_FirstSeenTimestampUptime, 
390 							item.getFirstSeenUptime());
391 				} else {
392 					insert.setNull(CINDEX_FirstSeenTimestampUptime, Types.TIMESTAMP);
393 				}
394 				
395 				// extract the last seen time stamp UTC.
396 				if (null != item.getLastSeenUTC()) {
397 					insert.setTimestamp(CINDEX_LastSeenTimestampUTC, 
398 							item.getLastSeenUTC());
399 				}else {
400 					insert.setNull(CINDEX_LastSeenTimestampUTC, Types.TIMESTAMP);
401 				}
402 				
403 				// extract the last seen time stamp since uptime.
404 				if (null != item.getLastSeenUptime()) {
405 					insert.setTimestamp(CINDEX_LastSeenTimestampUptime, 
406 							item.getLastSeenUptime());
407 				}else {
408 					insert.setNull(CINDEX_LastSeenTimestampUptime, Types.TIMESTAMP);
409 				}
410 				
411 				// extract the tag count.
412 				if (null != item.getTagSeenCount()) {
413 					insert.setInt(CINDEX_TagSeenCount, item.getTagSeenCount());
414 				} else {
415 					insert.setNull(CINDEX_TagSeenCount, Types.INTEGER);
416 				}
417 				
418 				// crc
419 				if (null != item.getC1g2_CRC()) {
420 					insert.setInt(CINDEX_C1G2_CRC, item.getC1g2_CRC());
421 				} else {
422 					insert.setNull(CINDEX_C1G2_CRC, Types.INTEGER);
423 				}
424 				if (null != item.getC1g2_CRC()) {
425 					insert.setInt(CINDEX_C1G2_PC, item.getC1g2_PC());
426 				} else {
427 					insert.setNull(CINDEX_C1G2_PC, Types.INTEGER);
428 				}
429 				
430 				// extract the access spec ID.
431 				if (null != item.getAccessSpecID()) {
432 					insert.setLong(CINDEX_AccessSpecID, item.getAccessSpecID());
433 				} else {
434 					insert.setNull(CINDEX_AccessSpecID, Types.BIGINT);
435 				}
436 				
437 				insert.executeUpdate();
438 				insert.close();
439 				
440 				successfullyHandled++;
441 			} catch (Exception e) {
442 				log.debug("Could not log entry of RO_ACCESS_REPORT to the " +
443 						"database - ignoring the entry.");
444 			}
445 			log.debug(
446 					String.format("Successfully stored %s row(s) into database.",
447 							successfullyHandled));
448 		}
449 	}
450 	
451 	public List<ROAccessItem> getAll() throws Exception {
452 		List<ROAccessItem> items = new LinkedList<ROAccessItem> ();
453 		Statement s = repository.getDBConnection().createStatement();
454 		String sql = String.format("SELECT * FROM %s", TABLE_RO_ACCESS_REPORTS);
455 		ResultSet res = s.executeQuery(sql);
456 		
457 		while (res.next()) {
458 			ROAccessItem item = new ROAccessItem();
459 			
460 			item.setLogTime(res.getTimestamp(CINDEX_LOGTIME));
461 			item.setAdapterName(res.getString(CINDEX_ADAPTER));
462 			item.setReaderName(res.getString(CINDEX_READER));
463 			item.setEpc(res.getString(CINDEX_EPC));
464 			item.setRoSpecID(res.getLong(CINDEX_ROSpecID));
465 			item.setSpecIndex(res.getInt(CINDEX_SpecIndex));
466 			item.setInventoryPrmSpecID(
467 					res.getInt(CINDEX_InventoryParameterSpecID));	
468 			item.setAntennaID(res.getInt(CINDEX_AntennaID));
469 			item.setPeakRSSI(res.getShort(CINDEX_PeakRSSI));
470 			item.setChannelIndex(res.getInt(CINDEX_ChannelIndex));
471 			item.setFirstSeenUTC(res.getTimestamp(
472 					CINDEX_FirstSeenTimestampUTC));
473 			item.setFirstSeenUptime(
474 					res.getTimestamp(CINDEX_FirstSeenTimestampUptime));
475 			item.setLastSeenUTC(
476 					res.getTimestamp(CINDEX_LastSeenTimestampUTC));
477 			item.setLastSeenUptime(
478 					res.getTimestamp(CINDEX_LastSeenTimestampUptime));
479 			item.setTagSeenCount(res.getInt(CINDEX_TagSeenCount));
480 			item.setC1g2_CRC(res.getInt(CINDEX_C1G2_CRC));
481 			item.setC1g2_PC(res.getInt(CINDEX_C1G2_PC));
482 			item.setAccessSpecID(res.getLong(CINDEX_AccessSpecID));
483 			items.add(item);
484 		}
485 		res.close();
486 		return items;
487 	}
488 	
489 	public void clear() throws Exception {
490 		Statement s = repository.getDBConnection().createStatement();
491 		String sql = String.format("DELETE FROM %s", TABLE_RO_ACCESS_REPORTS);
492 		s.execute(sql);
493 	}
494 
495 	/**
496 	 * Creates a TimeStamp object from a {@link UnsignedLong} object.
497 	 * @param ulong the unsigned long TimeStamp object.
498 	 * @return a SQL {@link Timestamp} object.
499 	 */
500 	public static Timestamp extractTimestamp(UnsignedLong ulong) {
501 		try {
502 //			log.debug(String.format("Extracting timestamp '%s'", ulong.toString()));
503 			BigInteger value = ulong.toBigInteger();
504 			final long tsMillis = value.divide(new BigInteger("1000")).longValue();
505 //			log.debug(String.format("Timestamp in Milliseconds: %d", tsMillis));
506 			
507 			final int l = value.toString().length();
508 			// we need the milliseconds and the microseconds to assemble the 
509 			// fractional seconds part.
510 			String fractSeconds = value.toString().substring(l-6, l);
511 			final int nanoseconds = Integer.parseInt(fractSeconds) * 1000;
512 //			log.debug(String.format(
513 //					"Fractional Seconds Part: %s ms, Nanoseconds: %d ns",
514 //					fractSeconds, nanoseconds));
515 			
516 			Timestamp ts = new Timestamp(tsMillis);
517 			ts.setNanos(nanoseconds);
518 //			log.debug(String.format("Generated Timestamp: %s", ts.toString()));
519 			return ts;
520 		} catch (Exception e) {
521 			e.printStackTrace();
522 		}
523 		return null;
524 	}
525 }