1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package org.fosstrak.llrp.adaptor;
23
24 import java.rmi.RemoteException;
25 import java.rmi.server.UnicastRemoteObject;
26 import java.util.LinkedList;
27
28 import org.apache.log4j.Logger;
29 import org.fosstrak.llrp.adaptor.exception.LLRPRuntimeException;
30 import org.fosstrak.llrp.adaptor.util.AsynchronousNotifiableList;
31 import org.fosstrak.llrp.client.LLRPExceptionHandlerTypeMap;
32 import org.llrp.ltk.exceptions.InvalidLLRPMessageException;
33 import org.llrp.ltk.generated.LLRPMessageFactory;
34 import org.llrp.ltk.generated.enumerations.KeepaliveTriggerType;
35 import org.llrp.ltk.generated.messages.KEEPALIVE;
36 import org.llrp.ltk.generated.messages.SET_READER_CONFIG;
37 import org.llrp.ltk.generated.parameters.KeepaliveSpec;
38 import org.llrp.ltk.net.LLRPAcceptor;
39 import org.llrp.ltk.net.LLRPConnection;
40 import org.llrp.ltk.net.LLRPConnectionAttemptFailedException;
41 import org.llrp.ltk.net.LLRPConnector;
42 import org.llrp.ltk.net.LLRPEndpoint;
43 import org.llrp.ltk.net.LLRPIoHandlerAdapter;
44 import org.llrp.ltk.net.LLRPIoHandlerAdapterImpl;
45 import org.llrp.ltk.types.Bit;
46 import org.llrp.ltk.types.LLRPMessage;
47 import org.llrp.ltk.types.UnsignedInteger;
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62 public class ReaderImpl extends UnicastRemoteObject implements LLRPEndpoint, Reader {
63
64
65
66
67 private static final long serialVersionUID = 1L;
68
69
70 private static Logger log = Logger.getLogger(ReaderImpl.class);
71
72
73 private LLRPConnection connector = null;
74
75
76 private Adaptor adaptor = null;
77
78
79 private AsynchronousNotifiableList toNotify = new AsynchronousNotifiableList();
80
81
82 public static final int DEFAULT_KEEPALIVE_PERIOD = 10000;
83
84
85 public static final int DEFAULT_MISS_KEEPALIVE = 3;
86
87
88 private boolean throwExceptionKeepAlive = true;
89
90
91 private ReaderMetaData metaData = new ReaderMetaData();
92
93
94 private LLRPIoHandlerAdapter handler = null;
95
96
97 private Thread wd = null;
98
99
100 private Thread outQueueWorker = null;
101
102
103 private Thread inQueueWorker = null;
104
105
106 private LinkedList<byte[]> inqueue = new LinkedList<byte[]> ();
107
108
109 private LinkedList<LLRPMessage> outqueue = new LinkedList<LLRPMessage> ();
110
111
112 public enum QueuePolicy {
113 DROP_QUEUE_ON_ERROR,
114 KEEP_QUEUE_ON_ERROR
115 };
116
117
118
119
120
121
122
123
124
125 public ReaderImpl(Adaptor adaptor, String readerName, String readerAddress) throws RemoteException {
126 this.adaptor = adaptor;
127 metaData._setAllowNKeepAliveMisses(DEFAULT_MISS_KEEPALIVE);
128 metaData._setKeepAlivePeriod(DEFAULT_KEEPALIVE_PERIOD);
129 metaData._setReaderName(readerName);
130 metaData._setReaderAddress(readerAddress);
131 }
132
133
134
135
136
137
138
139
140
141
142 public ReaderImpl(Adaptor adaptor, String readerName, String readerAddress, int port) throws RemoteException {
143 this.adaptor = adaptor;
144 metaData._setAllowNKeepAliveMisses(DEFAULT_MISS_KEEPALIVE);
145 metaData._setKeepAlivePeriod(DEFAULT_KEEPALIVE_PERIOD);
146 metaData._setReaderName(readerName);
147 metaData._setReaderAddress(readerAddress);
148 metaData._setPort(port);
149 }
150
151
152
153
154 public void connect(boolean clientInitiatedConnection) throws LLRPRuntimeException, RemoteException {
155
156 try {
157 String address = metaData.getReaderAddress();
158 metaData._setClientInitiated(clientInitiatedConnection);
159
160
161 metaData._newSession();
162
163 if (metaData.getPort() == -1) {
164 metaData._setPort(Constants.DEFAULT_LLRP_PORT);
165 log.warn("port for reader '" + metaData.getReaderName() + "' not specified. using default port " + metaData.getPort());
166 }
167
168 if (clientInitiatedConnection) {
169 if (address == null) {
170 log.error("address for reader '" + metaData.getReaderName() + "' is empty!");
171 reportException(new LLRPRuntimeException("address for reader '" + metaData.getReaderName() + "' is empty!"));
172 return;
173 }
174
175
176 LLRPConnector connector = new LLRPConnector(this, address, metaData.getPort());
177 connector.getHandler().setKeepAliveAck(true);
178 connector.getHandler().setKeepAliveForward(true);
179 try {
180 connector.connect();
181 } catch (LLRPConnectionAttemptFailedException e) {
182 log.error("connection attempt to reader " + metaData.getReaderName() + " failed");
183 reportException(new LLRPRuntimeException("connection attempt to reader " + metaData.getReaderName() + " failed"));
184 }
185
186 this.connector = connector;
187 } else {
188
189 LLRPAcceptor acceptor = new LLRPAcceptor(this, metaData.getPort());
190 handler = new LLRPIoHandlerAdapterImpl(acceptor);
191 handler.setKeepAliveAck(true);
192 acceptor.getHandler().setKeepAliveForward(true);
193 acceptor.setHandler(handler);
194 try {
195 acceptor.bind();
196 } catch (LLRPConnectionAttemptFailedException e) {
197 log.error("could not bind acceptor for reader " + metaData.getReaderName());
198 reportException(new LLRPRuntimeException("could not bind acceptor for reader " + metaData.getReaderName() ));
199 }
200
201 this.connector = acceptor;
202 }
203 metaData._setConnected(true);
204
205 outQueueWorker = new Thread(getOutQueueWorker());
206 outQueueWorker.start();
207
208 inQueueWorker = new Thread(getInQueueWorker());
209 inQueueWorker.start();
210
211
212 if (clientInitiatedConnection) {
213 enableHeartBeat();
214 }
215 log.info(String.format("reader %s connected.", metaData.getReaderName()));
216
217 } catch (Exception e) {
218
219 LLRPRuntimeException ex = new LLRPRuntimeException(
220 String.format("Could not connect to reader %s on adapter %s:\nException: %s",
221 getReaderName(), adaptor.getAdaptorName(), e.getMessage()));
222 reportException(ex);
223 throw ex;
224 }
225 }
226
227
228
229
230 public void disconnect() throws RemoteException {
231 log.debug("disconnecting the reader.");
232 setReportKeepAlive(false);
233
234 if (connector != null) {
235 try {
236 if (connector instanceof LLRPConnector) {
237
238 ((LLRPConnector)connector).disconnect();
239 } else if (connector instanceof LLRPAcceptor) {
240
241 ((LLRPAcceptor)connector).close();
242 }
243 } catch (Exception e) {
244 connector = null;
245 }
246 }
247
248 metaData._setConnected(false);
249
250
251 if (null != outQueueWorker) {
252 outQueueWorker.interrupt();
253 }
254
255
256 if (null != inQueueWorker) {
257 inQueueWorker.interrupt();
258 }
259
260
261 if (null != wd) {
262 wd.interrupt();
263 }
264 }
265
266
267
268
269 public void reconnect() throws LLRPRuntimeException, RemoteException {
270
271 disconnect();
272 connect(isClientInitiated());
273 }
274
275
276
277
278 public void send(byte[] message) throws RemoteException {
279 if (!metaData.isConnected() || (connector == null)) {
280 reportException(new LLRPRuntimeException(String.format("reader %s is not connected", metaData.getReaderName())));
281 return;
282 }
283
284
285 LLRPMessage llrpMessage = null;
286 try {
287 llrpMessage = LLRPMessageFactory.createLLRPMessage(message);
288 } catch (InvalidLLRPMessageException e) {
289 reportException(new LLRPRuntimeException(e.getMessage()));
290 }
291
292 if (llrpMessage == null) {
293 log.warn(String.format("do not send empty llrp message on reader %s", metaData.getReaderName()));
294 return;
295 }
296
297
298 synchronized (outqueue) {
299 outqueue.add(llrpMessage);
300 outqueue.notifyAll();
301 }
302 }
303
304
305
306
307
308
309 private void sendLLRPMessage(LLRPMessage llrpMessage) throws RemoteException {
310 try {
311
312 connector.send(llrpMessage);
313 metaData._packageSent();
314 } catch (NullPointerException npe) {
315
316
317 disconnect();
318 reportException(new LLRPRuntimeException(String.format("reader %s is not connected", metaData.getReaderName()),
319 LLRPExceptionHandlerTypeMap.EXCEPTION_READER_LOST));
320 } catch (Exception e) {
321
322 disconnect();
323 }
324 }
325
326
327
328
329 public boolean isConnected() throws RemoteException {
330 return metaData.isConnected();
331 }
332
333
334
335
336
337 public void errorOccured(String message) {
338 reportException(new LLRPRuntimeException(message));
339 }
340
341
342
343
344
345 public void messageReceived(LLRPMessage message) {
346 if (message == null) {
347 return;
348 }
349 byte[] binaryEncoded;
350 try {
351 binaryEncoded = message.encodeBinary();
352 } catch (InvalidLLRPMessageException e1) {
353 reportException(new LLRPRuntimeException(e1.getMessage()));
354 return;
355 }
356 metaData._packageReceived();
357 if (message instanceof KEEPALIVE) {
358
359 metaData._setAlive(true);
360 log.debug("received keepalive message from the reader:" + metaData.getReaderName());
361 if (!metaData.isReportKeepAlive()) {
362 return;
363 }
364 }
365
366
367 synchronized (inqueue) {
368 inqueue.add(binaryEncoded);
369 inqueue.notifyAll();
370 }
371 }
372
373
374
375
376
377 private void deliverMessage(byte[] binaryEncoded) {
378 try {
379 adaptor.messageReceivedCallback(binaryEncoded, metaData.getReaderName());
380 } catch (RemoteException e) {
381 reportException(new LLRPRuntimeException(e.getMessage()));
382 }
383
384
385 try {
386 toNotify.notify(binaryEncoded, metaData.getReaderName());
387 } catch (RemoteException e) {
388 reportException(new LLRPRuntimeException(e.getMessage()));
389 }
390 }
391
392
393
394
395 public String getReaderAddress() throws RemoteException {
396 return metaData.getReaderAddress();
397 }
398
399
400
401
402 public int getPort() throws RemoteException {
403 return metaData.getPort();
404 }
405
406
407
408
409 public boolean isClientInitiated() throws RemoteException {
410 return metaData.isClientInitiated();
411 }
412
413 public void setClientInitiated(boolean clientInitiated)
414 throws RemoteException {
415 metaData._setClientInitiated(clientInitiated);
416 }
417
418
419
420
421 public void registerForAsynchronous(AsynchronousNotifiable receiver) throws RemoteException {
422 toNotify.add(receiver);
423 }
424
425
426
427
428 public void deregisterFromAsynchronous(AsynchronousNotifiable receiver) throws RemoteException {
429 toNotify.remove(receiver);
430 }
431 public String getReaderName() throws RemoteException {
432 return metaData.getReaderName();
433 }
434
435 public boolean isConnectImmediate() throws RemoteException {
436 return metaData.isConnectImmediately();
437 }
438
439 public void setConnectImmediate(boolean value) throws RemoteException {
440 metaData._setConnectImmediately(value);
441 }
442
443
444
445
446
447
448
449 private void reportException(LLRPRuntimeException e) {
450 if (adaptor == null) {
451 log.error("no adaptor to report exception to on reader: " + metaData.getReaderName());
452 return;
453 }
454 try {
455 adaptor.errorCallback(e, metaData.getReaderName());
456 } catch (RemoteException e1) {
457
458 log.debug(e1.getStackTrace().toString());
459 }
460 }
461
462
463
464
465 private void enableHeartBeat() {
466
467 SET_READER_CONFIG sr = new SET_READER_CONFIG();
468 KeepaliveSpec ks = new KeepaliveSpec();
469 ks.setKeepaliveTriggerType(new KeepaliveTriggerType(KeepaliveTriggerType.Periodic));
470 ks.setPeriodicTriggerValue(new UnsignedInteger(metaData.getKeepAlivePeriod()));
471
472 sr.setKeepaliveSpec(ks);
473 sr.setResetToFactoryDefault(new Bit(0));
474
475 log.debug(String.format("using keepalive periode: %d", metaData.getKeepAlivePeriod()));
476 try {
477 send(sr.encodeBinary());
478 } catch (RemoteException e) {
479 if (throwExceptionKeepAlive) {
480 reportException(new LLRPRuntimeException("Could not install keepalive message: " + e.getMessage()));
481 } else {
482 e.printStackTrace();
483 }
484 } catch (InvalidLLRPMessageException e) {
485 e.printStackTrace();
486 }
487
488
489 wd = new Thread(new Runnable() {
490 public void run() {
491 log.debug("starting connection watchdog.");
492 try {
493 while (isConnected()) {
494 try {
495 Thread.sleep(metaData.getAllowNKeepAliveMisses() * metaData.getKeepAlivePeriod());
496 if (!metaData.isAlive()) {
497 log.debug("connection timed out...");
498 disconnect();
499 if (throwExceptionKeepAlive) {
500 reportException(new LLRPRuntimeException("Connection timed out",
501 LLRPExceptionHandlerTypeMap.EXCEPTION_READER_LOST));
502 }
503 }
504 metaData._setAlive(false);
505 } catch (InterruptedException e) {
506 log.debug("received interrupt - stopping watchdog.");
507 }
508
509 }
510 } catch (RemoteException e) {
511 e.printStackTrace();
512 }
513 log.debug("connection watchdog stopped.");
514 }
515 });
516 wd.start();
517 }
518
519
520
521
522 public int getKeepAlivePeriod() throws RemoteException {
523 return metaData.getKeepAlivePeriod();
524 }
525
526
527
528
529 public void setKeepAlivePeriod(int keepAlivePeriod, int times,
530 boolean report, boolean throwException) throws RemoteException {
531
532 metaData._setKeepAlivePeriod(keepAlivePeriod);
533 metaData._setAllowNKeepAliveMisses(times);
534 metaData._setReportKeepAlive(report);
535 this.throwExceptionKeepAlive = throwException;
536
537 }
538
539 public void setReportKeepAlive(boolean report) throws RemoteException {
540 metaData._setReportKeepAlive(report);
541 }
542
543 public boolean isReportKeepAlive() throws RemoteException {
544 return metaData.isReportKeepAlive();
545 }
546
547 public final ReaderMetaData getMetaData() throws RemoteException {
548 return new ReaderMetaData(metaData);
549 }
550
551
552
553
554
555
556 private Runnable getOutQueueWorker() {
557 final LinkedList<LLRPMessage> queue = outqueue;
558 return new Runnable() {
559 public void run() {
560 try {
561 while (true) {
562 synchronized (queue) {
563 while (queue.isEmpty()) queue.wait();
564
565 LLRPMessage msg = queue.removeFirst();
566 try {
567 sendLLRPMessage(msg);
568 } catch (RemoteException e) {
569 log.debug(String.format(
570 "Could not send message: %s",
571 e.getMessage()));
572 }
573 }
574 }
575 } catch (InterruptedException e) {
576 log.debug("stopping out queue worker.");
577 }
578 }
579
580 };
581 }
582
583
584
585
586
587
588 private Runnable getInQueueWorker() {
589 final LinkedList<byte[]> queue = inqueue;
590 return new Runnable() {
591 public void run() {
592 try {
593 while (true) {
594 synchronized (queue) {
595 while (queue.isEmpty()) queue.wait();
596
597 byte[] msg = queue.removeFirst();
598 deliverMessage(msg);
599 }
600 }
601 } catch (InterruptedException e) {
602 log.debug("stopping in queue worker.");
603 }
604 }
605 };
606 }
607
608 }