1 /*
2 *
3 * Fosstrak LLRP Commander (www.fosstrak.org)
4 *
5 * Copyright (C) 2008 ETH Zurich
6 *
7 * This program is free software: you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation, either version 3 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program. If not, see <http://www.gnu.org/licenses/>
19 *
20 */
21
22 package org.fosstrak.llrp.adaptor;
23
24 import java.util.LinkedList;
25
26 import org.fosstrak.llrp.adaptor.queue.QueueEntry;
27 import org.fosstrak.llrp.client.LLRPExceptionHandlerTypeMap;
28 import org.fosstrak.llrp.adaptor.exception.LLRPRuntimeException;
29 import org.llrp.ltk.exceptions.InvalidLLRPMessageException;
30
31 /**
32 * an LLRPAdaptorWorker holds an Adaptor and a callback. the worker
33 * enqueues messages and dispatches them to the corresponding reader.
34 * through the callback it can retrieve messages that then will be
35 * dispatched to the MessageRepository.
36 * @author sawielan
37 *
38 */
39 public class AdaptorWorker implements Runnable {
40 /** the worker does not accept more messages in the queue than this threshold. */
41 public static final int QUEUE_THRESHOLD = 100;
42
43 /** the callback for asynchronous message retrieval. */
44 private AdaptorCallback callback = null;
45
46 /** the adaptor holding the connection to the readers. */
47 private Adaptor adaptor = null;
48
49 /** the queue holding messages to be sent to readers. */
50 private LinkedList<QueueEntry> outQueue = new LinkedList<QueueEntry> ();
51
52 /** as long as this value is set to true the worker will accept and process messages. */
53 private boolean isRunning = true;
54
55 /** the ip address of this adaptor. if its the local adaptor it returns null. */
56 private String adaptorIpAddress = null;
57
58 /**
59 * the number of connection failures. The initial value is chosen in
60 * such a way that upon startup erroneous adaptors get cleaned out.
61 * */
62 private int connFailures = 2;
63
64 /** the number of allowed connection failures between adaptor and client. */
65 public static final int MAX_CONN_FAILURES = 3;
66
67 /**
68 * creates a new LLRPAdaptorWorker.
69 * @param callback the callback for asynchronous message retrieval.
70 * @param adaptor the adaptor holding the connection to the readers.
71 */
72 public AdaptorWorker(AdaptorCallback callback, Adaptor adaptor) {
73 this.callback = callback;
74 callback.setWorker(this);
75
76 this.adaptor = adaptor;
77 }
78 public void run() {
79 while (isRunning) {
80 try {
81 QueueEntry entry = null;
82 synchronized (outQueue) {
83 // if the outQueue is empty we put this thread to sleep.
84 // when someone posts a new message to the queue to
85 // poster has to call the notifyAll() method and by
86 // that awake this thread.
87 while (outQueue.isEmpty()) outQueue.wait();
88 // the thread has been awakened and is now able to
89 // process the first element in the queue.
90 entry = outQueue.removeFirst();
91
92 process(entry);
93 }
94 } catch (InterruptedException e) {
95 e.printStackTrace();
96 }
97 }
98
99 }
100
101 /**
102 * call this method if you want to stop the worker thread.
103 */
104 public void tearDown() {
105 // set isRunning to false.
106 isRunning = false;
107
108 // wakeup the thread to let it stop.
109 synchronized (outQueue) {
110 outQueue.notifyAll();
111 }
112 }
113
114 /**
115 * send a queued element through the adaptor to the reader.
116 * @param entry the queued element to be sent.
117 */
118 private void process(QueueEntry entry) {
119 try {
120 getAdaptor().sendLLRPMessage(entry.getReaderName(), entry.getMessage().encodeBinary());
121 } catch (LLRPRuntimeException e) {
122 AdaptorManagement.getInstance().postException(new LLRPRuntimeException(e.getMessage()),
123 LLRPExceptionHandlerTypeMap.EXCEPTION_MSG_SENDING_ERROR,
124 entry.getAdaptorName(),
125 entry.getReaderName());
126 } catch (InvalidLLRPMessageException e) {
127 AdaptorManagement.getInstance().postException(new LLRPRuntimeException(e.getMessage()),
128 LLRPExceptionHandlerTypeMap.EXCEPTION_MSG_SYNTAX_ERROR,
129 entry.getAdaptorName(),
130 entry.getReaderName());
131
132 } catch (Exception e) {
133 AdaptorManagement.getInstance().postException(new LLRPRuntimeException(e.getMessage()),
134 LLRPExceptionHandlerTypeMap.EXCEPTION_MSG_SENDING_ERROR,
135 entry.getAdaptorName(),
136 entry.getReaderName());
137 }
138 }
139
140 /**
141 * returns the callback for asynchronous message retrieval.
142 * @return the callback for asynchronous message retrieval.
143 */
144 public AdaptorCallback getCallback() {
145 return callback;
146 }
147
148 /**
149 * sets the callback for asynchronous message retrieval.
150 * @param callback the callback for asynchronous message retrieval.
151 */
152 public void setCallback(AdaptorCallback callback) {
153 this.callback = callback;
154 }
155
156 /**
157 * the adaptor holding the connection to the readers.
158 * @return the adaptor holding the connection to the readers.
159 */
160 public Adaptor getAdaptor() {
161 return adaptor;
162 }
163
164 /**
165 * sets the adaptor holding the connection to the readers.
166 * @param adaptor the adaptor holding the connection to the readers.
167 */
168 public void setAdaptor(Adaptor adaptor) {
169 this.adaptor = adaptor;
170 }
171
172 /**
173 * signals whether this worker is ready to accept messages.
174 * @return true if ok, else otherwise.
175 */
176 public boolean isReady() {
177 if (outQueue.size() >= QUEUE_THRESHOLD) {
178 return false;
179 }
180 return true;
181 }
182
183 /**
184 * enqueues a message to be sent.
185 * @param e the queue element holding the message.
186 * @throws LLRPRuntimeException when worker is not ready or queue is full
187 */
188 public void enqueue(QueueEntry e) throws LLRPRuntimeException {
189 if (!isReady()) {
190 throw new LLRPRuntimeException("Queue is full or worker not ready.");
191 }
192 synchronized (outQueue) {
193 outQueue.add(e);
194 outQueue.notifyAll();
195 }
196 }
197
198 /**
199 * returns the ip address of this adaptor. if its the local adaptor it returns null.
200 * @return null if local adaptor else the address of the adaptor.
201 */
202 public String getAdaptorIpAddress() {
203 return adaptorIpAddress;
204 }
205
206 /**
207 * sets the address of this adaptor. if its the local adaptor set null.
208 * @param adaptorIpAddress the address of the adaptor.
209 */
210 public void setAdaptorIpAddress(String adaptorIpAddress) {
211 this.adaptorIpAddress = adaptorIpAddress;
212 }
213
214 /**
215 * increases the connection failure counter by one.
216 */
217 public void reportConnFailure() {
218 connFailures++;
219 }
220
221 /**
222 * resets the connection failure counter to zero.
223 */
224 public void cleanConnFailure() {
225 connFailures = 0;
226 }
227
228 /**
229 * @return true if connection to adaptor is alright, false otherwise.
230 */
231 public boolean ok() {
232 return (connFailures < MAX_CONN_FAILURES);
233 }
234 }