WIP161213

master
Harald Wolff 2016-12-14 10:50:49 +01:00
parent 984dd94fd1
commit 0309f98a3b
6 changed files with 99 additions and 50 deletions

View File

@ -43,7 +43,6 @@ public class AsynchronServiceLinkProvider {
if ((telegram.getOpcode() & ServiceLinkTelegram.REQ_READ) != 0){
int hash = cache.calcHash(telegram.getAddress(), ((telegram.getOpcode() & ServiceLinkTelegram.REQ_FLOAT) == ServiceLinkTelegram.REQ_FLOAT));
log(DEBUGDETAIL,"servicelink cache update %s = %s", telegram.getAddress(), telegram.getValue());
cache.updateValue(hash, telegram.getValue());

View File

@ -4,6 +4,7 @@ import static org.hwo.logging.Logging.log;
import java.awt.DisplayMode;
import java.io.IOException;
import java.nio.channels.ShutdownChannelGroupException;
import java.util.Hashtable;
import java.util.LinkedList;
import java.util.List;
@ -53,10 +54,12 @@ public class ServiceLink implements NewSerialPortListener {
int requestTimeOut;
boolean useV2 = true;
boolean forceSynchronousRequests;
LinkedList<ServiceLinkTelegram> txQueue;
Hashtable<ServiceLinkAddress, List<ServiceLinkTelegram>>
pendingRequests;
int pendingRequestsCount;
RxWorker rxWorker;
TxWorker txWorker;
@ -65,7 +68,7 @@ public class ServiceLink implements NewSerialPortListener {
public ServiceLink(NewSerialPort serialPort)
{
this.requestTimeOut = 50;
this.requestTimeOut = 25;
this.serviceLinkListeners = new LinkedList<ServiceLinkListener>();
this.retries = 3;
@ -82,6 +85,7 @@ public class ServiceLink implements NewSerialPortListener {
this.txQueue = new LinkedList<>();
this.pendingRequests = new Hashtable<>();
}
/** Handling der Seriellen Verbindung **/
@ -150,6 +154,13 @@ public class ServiceLink implements NewSerialPortListener {
this.useV2 = useV2;
}
public boolean isForceSynchronousRequests() {
return forceSynchronousRequests;
}
public void setForceSynchronousRequests(boolean forceSynchronousRequests) {
this.forceSynchronousRequests = forceSynchronousRequests;
}
private void addPendingRequest(ServiceLinkTelegram request){
synchronized (pendingRequests) {
if (!pendingRequests.containsKey(request.getAddress())){
@ -157,19 +168,22 @@ public class ServiceLink implements NewSerialPortListener {
}
List<ServiceLinkTelegram> ll = pendingRequests.get(request.getAddress());
ll.add(request);
pendingRequestsCount++;
}
}
private void removePendingRequest(ServiceLinkTelegram request){
synchronized (pendingRequests) {
if (pendingRequests.containsKey(request.getAddress())){
pendingRequests.get(request.getAddress()).remove(request);
pendingRequestsCount--;
}
}
}
public void appendRequest(ServiceLinkTelegram request){
addPendingRequest(request);
addPendingRequest(request);
synchronized (txQueue) {
txQueue.add(request);
txQueue.notify();
@ -192,8 +206,24 @@ public class ServiceLink implements NewSerialPortListener {
public void queueRequest(ServiceLinkTelegram request){
long ts_start = System.currentTimeMillis();
appendRequest(request);
log(DEBUG,"QUEUE: %s",request);
if (forceSynchronousRequests){
log(DEBUGDETAIL,"SYNCH.");
while (true){
synchronized (pendingRequests) {
if (pendingRequestsCount == 0){
appendRequest(request);
break;
}
}
}
} else {
appendRequest(request);
}
log(DEBUGDETAIL,"WAIT: %s",request);
synchronized (request) {
try {
request.wait(requestTimeOut);
@ -206,23 +236,19 @@ public class ServiceLink implements NewSerialPortListener {
requestTime.cycle( (int)(System.currentTimeMillis() - ts_start) );
log(DEBUG,"ServiceLink: queuedRequest: %s Value: %s",request.getAddress().toString(),request.value);
log(DEBUG,"DONE: %s",request);
}
public void received(ServiceLinkTelegram rx){
synchronized (pendingRequests) {
if (pendingRequests.containsKey(rx.getAddress())){
for (ServiceLinkTelegram pending: pendingRequests.get(rx.getAddress())){
log(DEBUGDETAIL,"Found Pending: %s", pending);
if ((pending.getOpcode() & (REQ_INT | REQ_FLOAT))==(rx.getOpcode() & (REQ_INT | REQ_FLOAT))){
synchronized (pending) {
pending.setValue(rx.getValue());
log(DEBUGDETAIL,"req.handling time: %d",rx.getTimestamp() - pending.getTimestamp());
pending.notifyAll();
}
}
@ -344,6 +370,9 @@ public class ServiceLink implements NewSerialPortListener {
private boolean exit;
public synchronized void exit(){
if (!isAlive())
return;
exit = true;
this.notifyAll();
try {
@ -371,17 +400,17 @@ public class ServiceLink implements NewSerialPortListener {
}
try {
rx = useV2 ?
new ServiceLinkV2Telegram(getSerialPort().getInputStream()) :
new ServiceLinkV1Telegram(getSerialPort().getInputStream()) ;
if (rx != null){
if (!rx.isNull()){
received(rx);
};
}
} catch (Exception io){
rx = useV2 ?
new ServiceLinkV2Telegram() :
new ServiceLinkV1Telegram() ;
if (rx.read(getSerialPort().getInputStream())){
received(rx);
};
} catch (Exception io){
log(io);
ServiceLink.this.close();
};
}
}
@ -395,6 +424,9 @@ public class ServiceLink implements NewSerialPortListener {
}
public synchronized void exit(){
if (!isAlive())
return;
exit = true;
try {
this.wait();
@ -430,16 +462,10 @@ public class ServiceLink implements NewSerialPortListener {
byte[] txbytes = tx.toByteArray();
try {
log(DEBUGDETAIL,"TX BYTES: %s",ByteArrayHexlifier.byteArrayToString(txbytes));
serialPort.getOutputStream().write(txbytes);
/* try {
sleep(10);
} catch (InterruptedException e) {
log(e);
}
*/
} catch (IOException e) {
log(e);
serialPort.getOutputStream().write(txbytes);
} catch (IOException io) {
log(io);
ServiceLink.this.close();
}
synchronized (txQueue) {

View File

@ -2,6 +2,7 @@ package org.hwo.servicelink;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.io.InputStream;
import java.lang.Thread.State;
import static org.hwo.logging.LogLevel.*;
import static org.hwo.logging.Logging.*;
@ -63,6 +64,7 @@ public abstract class ServiceLinkTelegram {
}
public abstract byte[] toByteArray();
public abstract boolean read(InputStream in);
public ServiceLinkAddress getAddress() {
return address;
@ -82,7 +84,6 @@ public abstract class ServiceLinkTelegram {
return value;
}
public void setValue(Object value) {
log(DEBUGDETAIL,"setValue( %s )",value);
this.value = value;
}
@ -93,5 +94,13 @@ public abstract class ServiceLinkTelegram {
public long getTimestamp() {
return timestamp;
}
@Override
public String toString() {
return String.format("%s %s %s",
getAddress().toString(),
((opcode & REQ_WRITE)==REQ_WRITE) ? "=" : "?",
value);
}
}

View File

@ -18,21 +18,16 @@ import static org.hwo.logging.LogLevel.*;
public class ServiceLinkV1Telegram extends ServiceLinkTelegram {
byte buffer[];
public ServiceLinkV1Telegram(int operation,int achse,int knoten,int register,Object value){
super(operation,achse,knoten,register,value);
}
public ServiceLinkV1Telegram(InputStream in) {
byte[] bytes = readTelegramBytes(in);
try {
fromBytes(bytes);
} catch (CRCFailedException e){
log(e);
throw new RuntimeException(e);
}
public ServiceLinkV1Telegram(){
}
protected byte[] readTelegramBytes(InputStream in){
public boolean read(InputStream in){
int l,
sync,
opcode;
@ -46,7 +41,7 @@ public class ServiceLinkV1Telegram extends ServiceLinkTelegram {
ch = in.read();
if (ch != SL_MAGIC){
if (ch == -1){
return null;
return false;
}
log(DEBUGDETAIL,"readTelegramBytes(): junk byte in the line: %d [ 0x%02x ]",ch,ch);
}
@ -63,12 +58,18 @@ public class ServiceLinkV1Telegram extends ServiceLinkTelegram {
l = in.read(rxbuffer,2,l);
log(DEBUGDETAIL,"RX BYTES: %2d: %s",rxbuffer.length,ByteArrayHexlifier.byteArrayToString(rxbuffer));
return rxbuffer;
try {
fromBytes(rxbuffer);
return true;
} catch (CRCFailedException e){
log(e);
throw new RuntimeException(e);
}
} catch (IOException e) {
log(e);
}
return null;
return false;
}
protected void fromBytes(byte[] buffer) throws CRCFailedException {

View File

@ -25,19 +25,31 @@ public class ServiceLinkV2Telegram extends ServiceLinkTelegram {
static int REQ_FLOAT = 0x20;
static int REQ_ACK = 0x40;
byte[] buffer = new byte[8];
int bufferUsed = 0;
public ServiceLinkV2Telegram(int operation,int achse,int knoten,int register,Object value){
super(operation,achse,knoten,register,value);
}
public ServiceLinkV2Telegram(InputStream in){
byte[] buffer = new byte[8];
public ServiceLinkV2Telegram(){
}
public boolean read(InputStream in){
try {
in.read(buffer, 0, 8);
int n = in.read(buffer, bufferUsed, 8 - bufferUsed);
if (n > 0){
bufferUsed += n;
}
} catch (IOException e) {
log(e);
throw new RuntimeException(e);
}
if (bufferUsed != 8)
return false;
log(DEBUGDETAIL,"RX BYTES: %s",ByteArrayHexlifier.byteArrayToString(buffer));
setAddress(new ServiceLinkAddress((buffer[1]>>4) & 0x0F, buffer[1] & 0x0F, (((int)buffer[2])&0xff) | (((int)buffer[3])<<8)&0xFF00 ));
@ -62,6 +74,8 @@ public class ServiceLinkV2Telegram extends ServiceLinkTelegram {
} else {
setValue(null);
}
return true;
}

View File

@ -72,7 +72,7 @@ public class ServiceRegisterCache {
public void setValue(Object value){
this.value = value;
this.lastReadTime = System.currentTimeMillis();
log(DEBUGDETAIL,"cached value update: [%02d:%02d:%04d] = %s", ax,node,register,value);
log(DEBUGDETAIL,"CACHE: SLA: [%d:%d:%d] = %s", ax,node,register,value);
}
public void invalidate(){