View Javadoc

1   /*
2    *  
3    *  Fosstrak LLRP Commander (www.fosstrak.org)
4    * 
5    *  Copyright (C) 2008 ETH Zurich
6    *
7    *  This program is free software: you can redistribute it and/or modify
8    *  it under the terms of the GNU General Public License as published by
9    *  the Free Software Foundation, either version 3 of the License, or
10   *  (at your option) any later version.
11   *
12   *  This program is distributed in the hope that it will be useful,
13   *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14   *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15   *  GNU General Public License for more details.
16   *
17   *  You should have received a copy of the GNU General Public License
18   *  along with this program.  If not, see <http://www.gnu.org/licenses/> 
19   *
20   */
21  
22  package org.fosstrak.llrp.adaptor;
23  
24  import java.util.LinkedList;
25  
26  import org.fosstrak.llrp.adaptor.queue.QueueEntry;
27  import org.fosstrak.llrp.client.LLRPExceptionHandlerTypeMap;
28  import org.fosstrak.llrp.adaptor.exception.LLRPRuntimeException;
29  import org.llrp.ltk.exceptions.InvalidLLRPMessageException;
30  
31  /**
32   * an LLRPAdaptorWorker holds an Adaptor and a callback. the worker
33   * enqueues messages and dispatches them to the corresponding reader.
34   * through the callback it can retrieve messages that then will be 
35   * dispatched to the MessageRepository.
36   * @author sawielan
37   *
38   */
39  public class AdaptorWorker implements Runnable {
40  	/** the worker does not accept more messages in the queue than this threshold. */
41  	public static final int QUEUE_THRESHOLD = 100;
42  	
43  	/** the callback for asynchronous message retrieval. */
44  	private AdaptorCallback callback = null;
45  	
46  	/** the adaptor holding the connection to the readers. */
47  	private Adaptor adaptor = null;	
48  	
49  	/** the queue holding messages to be sent to readers. */
50  	private LinkedList<QueueEntry> outQueue = new LinkedList<QueueEntry> ();
51  	
52  	/** as long as this value is set to true the worker will accept and process messages. */
53  	private boolean isRunning = true;
54  	
55  	/** the ip address of this adaptor. if its the local adaptor it returns null. */
56  	private String adaptorIpAddress = null;
57  	
58  	/** 
59  	 * the number of connection failures. The initial value is chosen in 
60  	 * such a way that upon startup erroneous adaptors get cleaned out. 
61  	 * */
62  	private int connFailures = 2;
63  	
64  	/** the number of allowed connection failures between adaptor and client. */
65  	public static final int MAX_CONN_FAILURES = 3;
66  	
67  	/**
68  	 * creates a new LLRPAdaptorWorker. 
69  	 * @param callback the callback for asynchronous message retrieval. 
70  	 * @param adaptor the adaptor holding the connection to the readers. 
71  	 */
72  	public AdaptorWorker(AdaptorCallback callback, Adaptor adaptor) {
73  		this.callback = callback;
74  		callback.setWorker(this);
75  		
76  		this.adaptor = adaptor;
77  	}
78  	public void run() {
79  		while (isRunning) {
80  			try {
81  				QueueEntry entry = null;
82  				synchronized (outQueue) {
83  					// if the outQueue is empty we put this thread to sleep. 
84  					// when someone posts a new message to the queue to 
85  					// poster has to call the notifyAll() method and by 
86  					// that awake this thread.
87  					while (outQueue.isEmpty()) outQueue.wait();
88  					// the thread has been awakened and is now able to 
89  					// process the first element in the queue.
90  					entry = outQueue.removeFirst();
91  					
92  					process(entry);
93  				}
94  			} catch (InterruptedException e) {
95  				e.printStackTrace();
96  			}
97  		}
98  		
99  	}
100 	
101 	/**
102 	 * call this method if you want to stop the worker thread.
103 	 */
104 	public void tearDown() {
105 		// set isRunning to false.
106 		isRunning = false;
107 		
108 		// wakeup the thread to let it stop.
109 		synchronized (outQueue) {
110 			outQueue.notifyAll();
111 		}
112 	}
113 	
114 	/**
115 	 * send a queued element through the adaptor to the reader.
116 	 * @param entry the queued element to be sent.
117 	 */
118 	private void process(QueueEntry entry) {
119 		try {
120 			getAdaptor().sendLLRPMessage(entry.getReaderName(), entry.getMessage().encodeBinary());
121 		} catch (LLRPRuntimeException e) {
122 			AdaptorManagement.getInstance().postException(new LLRPRuntimeException(e.getMessage()),
123 					LLRPExceptionHandlerTypeMap.EXCEPTION_MSG_SENDING_ERROR,
124 					entry.getAdaptorName(),
125 					entry.getReaderName());
126 		} catch (InvalidLLRPMessageException e) {
127 			AdaptorManagement.getInstance().postException(new LLRPRuntimeException(e.getMessage()),
128 					LLRPExceptionHandlerTypeMap.EXCEPTION_MSG_SYNTAX_ERROR,
129 					entry.getAdaptorName(),
130 					entry.getReaderName());
131 					
132 		} catch (Exception e) {
133 			AdaptorManagement.getInstance().postException(new LLRPRuntimeException(e.getMessage()),
134 					LLRPExceptionHandlerTypeMap.EXCEPTION_MSG_SENDING_ERROR,
135 					entry.getAdaptorName(),
136 					entry.getReaderName());
137 		}
138 	}
139 
140 	/**
141 	 * returns the callback for asynchronous message retrieval. 
142 	 * @return the callback for asynchronous message retrieval. 
143 	 */
144 	public AdaptorCallback getCallback() {
145 		return callback;
146 	}
147 
148 	/**
149 	 * sets the callback for asynchronous message retrieval. 
150 	 * @param callback the callback for asynchronous message retrieval. 
151 	 */
152 	public void setCallback(AdaptorCallback callback) {
153 		this.callback = callback;
154 	}
155 
156 	/**
157 	 * the adaptor holding the connection to the readers. 
158 	 * @return the adaptor holding the connection to the readers.
159 	 */
160 	public Adaptor getAdaptor() {
161 		return adaptor;
162 	}
163 
164 	/**
165 	 * sets the adaptor holding the connection to the readers.
166 	 * @param adaptor the adaptor holding the connection to the readers.
167 	 */
168 	public void setAdaptor(Adaptor adaptor) {
169 		this.adaptor = adaptor;
170 	}
171 	
172 	/**
173 	 * signals whether this worker is ready to accept messages.
174 	 * @return true if ok, else otherwise.
175 	 */
176 	public boolean isReady() {
177 		if (outQueue.size() >= QUEUE_THRESHOLD) {
178 			return false;
179 		}
180 		return true;
181 	}
182 	
183 	/**
184 	 * enqueues a message to be sent.
185 	 * @param e the queue element holding the message.
186 	 * @throws LLRPRuntimeException when worker is not ready or queue is full
187 	 */
188 	public void enqueue(QueueEntry e) throws LLRPRuntimeException {
189 		if (!isReady()) {
190 			throw new LLRPRuntimeException("Queue is full or worker not ready.");
191 		}
192 		synchronized (outQueue) {
193 			outQueue.add(e);
194 			outQueue.notifyAll();
195 		}
196 	}
197 
198 	/**
199 	 * returns the ip address of this adaptor. if its the local adaptor it returns null.
200 	 * @return null if local adaptor else the address of the adaptor.
201 	 */
202 	public String getAdaptorIpAddress() {
203 		return adaptorIpAddress;
204 	}
205 
206 	/**
207 	 * sets the address of this adaptor. if its the local adaptor set null.
208 	 * @param adaptorIpAddress the address of the adaptor.
209 	 */
210 	public void setAdaptorIpAddress(String adaptorIpAddress) {
211 		this.adaptorIpAddress = adaptorIpAddress;
212 	}
213 	
214 	/**
215 	 * increases the connection failure counter by one.
216 	 */
217 	public void reportConnFailure() {
218 		connFailures++;
219 	}
220 	
221 	/**
222 	 * resets the connection failure counter to zero.
223 	 */
224 	public void cleanConnFailure() {
225 		connFailures = 0;
226 	}
227 	
228 	/**
229 	 * @return true if connection to adaptor is alright, false otherwise.
230 	 */
231 	public boolean ok() {
232 		return (connFailures < MAX_CONN_FAILURES);
233 	}
234 }