Commit 3ee1dac3 authored by Niklas Jonsson's avatar Niklas Jonsson
Browse files

Updated TODO and added some comments

parent 62d2276b
......@@ -4,14 +4,14 @@ Implement auto restrict functionality
Implement unrestrict
- Add method in Channel that unrestricts it
Create method restrict in channel and let it call the appropriate method in Connection
- Currently, you need to call a method in Connection, passing a channel to it.
- Currently you need to call a method in Connection, passing a channel to it.
A cleaner way to do this would be to cal a method in Channel which then passes
itself to the method in Connection, possibly doing something before and after,
e.g. state change.
More comments
Write unit tests
- JUnit tests for all non threading classes
- Tests for thread-related, as far as possible
- Tests for thread-related classes, as far as possible
- Tests for interaction between classes
Create manual (preferably automatic) test for java to c version compatibility
- I.e. starting up pong or ping c in and the corresponding in java
......@@ -19,7 +19,15 @@ Implement TCP support
- Finish the already started TCP implementation. Might have to change
overall program structure, or provide ResendQueue stub, since this is
not needed with TCP.
Integrate with client-server solution for camera
Implement package specific timers in the resend queue
Implement max number of retries for a package in resend queue
Implement better data seqno handling
- Right now the application depends on the seqno of the first important data
packet to be Connection.SEQNO_START, instead it should set the seqno of the
very first package reveived as the remote seqno. This could be done by e.g.
a first flag in channel, indicating whether or not we have received any
packages before.
Integrate with client-server solution for axis camera
- Use firefly java in the camera application and cross-compile it
with J2C.
......@@ -38,4 +46,4 @@ Remove ActionQueue
and could be removed. In the c version an event executor thread is
needed to limit the need platform specific synchronization in most
of the application. Since java provides platform agnostic synchronization,
this could be used to protect affected variables/data.
this could instead be used to protect affected variables/data.
......@@ -13,9 +13,10 @@ import se.lth.cs.firefly.util.Debug;
import se.lth.cs.firefly.util.ResendQueue;
import se.lth.cs.firefly.util.ActionQueue.Priority;
/*
/**
* Handles channel state and encoder/decoder access
*
* It also has a queue for important packets so that
* only one packet can be simultaneously tracked.
*/
public class Channel {
public static int CHANNEL_ID_NOT_SET = -1;
......@@ -160,6 +161,7 @@ public class Channel {
boolean important = ackOnData || data[0] == LabComm.SAMPLE
|| data[0] == LabComm.TYPEDEF;
if (important) {
// TODO Possible bug here, does this actually check whether we are currently tracking a packet?
if (importantQueue.isEmpty()) {
sendImportant(data);
} else {
......@@ -176,7 +178,9 @@ public class Channel {
currentSeqno = conn.sendDataSample(toSend, true, this);
}
}
/**
* Helper class to handle important flagging etc.
*/
public class ChannelWriter implements LabCommWriter {
@Override
......
......@@ -20,9 +20,7 @@ class ChannelDecoder extends LabCommDecoderChannel {
}
public void decodeData(byte[] data) throws Exception {
Debug.log("Channel decoder appending");
is.append(data);
Debug.log("Channel decoder appending done");
try {
runOne();
} catch (EOFException e) {
......@@ -33,6 +31,5 @@ class ChannelDecoder extends LabCommDecoderChannel {
// TODO Fix this, possibly add thread in channel as well or let
// Connection.Reader do everything.
}
Debug.log("Run one done");
}
}
......@@ -4,7 +4,7 @@ import se.lth.control.labcomm.*;
import java.io.*;
/*
/**
* Only exists to limit the constructors of the superclass and to provide consistency in decoder/encoder pairs.
*/
class ChannelEncoder extends LabCommEncoderChannel {
......
......@@ -504,11 +504,8 @@ public class Connection implements ack.Handler, channel_ack.Handler,
+ ds.seqno);
ack.encode(bottomEncoder, dataAck);
}
Debug.log("Imp: " + (ds.important) +" exp: " + expected_seqno + " act: " + ds.seqno );
if (!ds.important || expected_seqno == ds.seqno) {
Debug.log("Entered if with " + (!ds.important) + " " + (expected_seqno == ds.seqno));
chan.getDecoder().decodeData(ds.app_enc_data);
Debug.log("test");
if (ds.important) {
Debug.log("Data sample is important, seqno: " + ds.seqno);
chan.setRemoteSeqno(ds.seqno);
......
......@@ -5,100 +5,117 @@ import se.lth.control.labcomm.*;
import java.io.*;
public class ConnectionDecoder extends LabCommDecoderChannel {
private LabCommDecoderRegistry registry; //Since the superclass registry variable is private, we cannot access it and we have to create our own.
private LabCommDecoderRegistry registry; // Since the superclass registry
// variable is private, we
// cannot access it and we have
// to create our own.
public ConnectionDecoder(InputStream inputStream) throws IOException {
super(inputStream);
registry = new LabCommDecoderRegistry();
}
public void shortCircuit(int index, LabCommDispatcher d) throws IOException {
registry.add(index, d.getName(), d.getSignature());
}
/**
* A direct copy of the corresponding super class method from the labcomm-core repo at 2014-07-25. If it has been updated since then,
* this has to be updated as well. This copy was necessary for this method to access the subclass registry variable instead of
* the superclass version of it. This in turn was necessary for the short circuit of type registration to work.
*/
* A direct copy of the corresponding super class method from the
* labcomm-core repo at 2014-07-25. If it has been updated since then, this
* has to be updated as well. This copy was necessary for this method to
* access the subclass registry variable instead of the superclass version
* of it. This in turn was necessary for the short circuit of type
* registration to work. TODO Possibly fork labcomm and create pull request
* with needed changes, which could also include adding methods to the
* LabCommSample interface.
*/
@Override
public void register(LabCommDispatcher dispatcher,
LabCommHandler handler) throws IOException {
registry.add(dispatcher, handler);
}
public void register(LabCommDispatcher dispatcher, LabCommHandler handler)
throws IOException {
registry.add(dispatcher, handler);
}
/**
* Direct copy of the corresponding super class method, see above.
*
*/
* Direct copy of the corresponding super class method, see above.
*
*/
@Override
public void runOne() throws Exception {
boolean done = false;
while (!done) {
int tag = decodePacked32();
switch (tag) {
int tag = decodePacked32();
switch (tag) {
case LabComm.TYPEDEF:
case LabComm.SAMPLE: {
int index = decodePacked32();
String name = decodeString();
ByteArrayOutputStream signature = new ByteArrayOutputStream();
collectFlatSignature(new LabCommEncoderChannel(signature, false));
registry.add(index, name, signature.toByteArray());
} break;
int index = decodePacked32();
String name = decodeString();
ByteArrayOutputStream signature = new ByteArrayOutputStream();
collectFlatSignature(new LabCommEncoderChannel(signature, false));
registry.add(index, name, signature.toByteArray());
}
break;
default: {
LabCommDecoderRegistry.Entry e = registry.get(tag);
if (e == null) {
throw new IOException("Unhandled tag " + tag);
}
LabCommDispatcher d = e.getDispatcher();
if (d == null) {
throw new IOException("No dispatcher for '" + e.getName() + "'");
}
LabCommHandler h = e.getHandler();
if (h == null) {
throw new IOException("No handler for '" + e.getName() +"'");
}
d.decodeAndHandle(this, h);
done = true;
LabCommDecoderRegistry.Entry e = registry.get(tag);
if (e == null) {
throw new IOException("Unhandled tag " + tag);
}
LabCommDispatcher d = e.getDispatcher();
if (d == null) {
throw new IOException("No dispatcher for '" + e.getName()
+ "'");
}
LabCommHandler h = e.getHandler();
if (h == null) {
throw new IOException("No handler for '" + e.getName()
+ "'");
}
d.decodeAndHandle(this, h);
done = true;
}
}
}
}
}
}
/**
* Same as above.
*/
* Same as above.
*/
private void collectFlatSignature(LabCommEncoder out) throws IOException {
int type = decodePacked32();
out.encodePacked32(type);
switch (type) {
case LabComm.ARRAY: {
int dimensions = decodePacked32();
out.encodePacked32(dimensions);
for (int i = 0 ; i < dimensions ; i++) {
out.encodePacked32(decodePacked32());
case LabComm.ARRAY: {
int dimensions = decodePacked32();
out.encodePacked32(dimensions);
for (int i = 0; i < dimensions; i++) {
out.encodePacked32(decodePacked32());
}
collectFlatSignature(out);
}
break;
case LabComm.STRUCT: {
int fields = decodePacked32();
out.encodePacked32(fields);
for (int i = 0; i < fields; i++) {
out.encodeString(decodeString());
collectFlatSignature(out);
}
}
collectFlatSignature(out);
} break;
case LabComm.STRUCT: {
int fields = decodePacked32();
out.encodePacked32(fields);
for (int i = 0 ; i < fields ; i++) {
out.encodeString(decodeString());
collectFlatSignature(out);
break;
case LabComm.BOOLEAN:
case LabComm.BYTE:
case LabComm.SHORT:
case LabComm.INT:
case LabComm.LONG:
case LabComm.FLOAT:
case LabComm.DOUBLE:
case LabComm.STRING: {
}
break;
default: {
throw new IOException("Unimplemented type=" + type);
}
} break;
case LabComm.BOOLEAN:
case LabComm.BYTE:
case LabComm.SHORT:
case LabComm.INT:
case LabComm.LONG:
case LabComm.FLOAT:
case LabComm.DOUBLE:
case LabComm.STRING: {
} break;
default: {
throw new IOException("Unimplemented type=" + type);
}
}
out.end(null);
}
}
}
......@@ -54,10 +54,9 @@ public abstract class LinkLayerPort {
InetAddress remoteAddress, int remotePort) throws IOException;
/**
* Listening for new connections, is called be listener thread. Also handles
* packet to connection multiplexing if its not done by the udnerlying
* packet to connection multiplexing if its not done by the underlying
* protocol.
*
*
*
*/
protected abstract void listen() throws IOException;
......
......@@ -3,7 +3,11 @@ import java.io.*;
import java.net.*;
import se.lth.control.labcomm.LabCommWriter;
/**
* This interface abstracts the protocol specific stuff from Connection and Channel.
* It is essentially a wrapper for the network protocol.
*
*/
public interface TransportLayerAbstraction {
public InputStream getInputStream() throws IOException ;
......
......@@ -9,6 +9,11 @@ import se.lth.cs.firefly.protocol.*;
import se.lth.cs.firefly.util.ActionQueue;
import se.lth.cs.firefly.util.Debug;
/**
* Multiplexes TCP connection, since TCP is a connection protocol
* this class only receives connection or opens them. It then wraps
* the socket in an TransportLayerAbstraction and sends it to Connection.
*/
public class TCPConnectionMultiplexer extends LinkLayerPort {
private ServerSocket ssock;
......
......@@ -6,6 +6,9 @@ import java.net.*;
import se.lth.control.labcomm.*;
import se.lth.cs.firefly.protocol.TransportLayerAbstraction;
/**
* Wrapper for Socket, to make Connection protocol independent
*/
public class TCPLayer implements TransportLayerAbstraction {
private Socket socket;
......
......@@ -7,6 +7,13 @@ import java.util.HashMap;
import se.lth.cs.firefly.protocol.*;
import se.lth.cs.firefly.util.*;
/**
* Multiplexes UDP connections, since UDP is a connectionless protocol this
* class receives connection or opens them but also multiplexes incoming packets
* to the appropriate channel. This is done by writing to a stream that
* Connection reads from.
*
*/
public class UDPConnectionMultiplexer extends LinkLayerPort {
private DatagramSocket dsock;
private static final int UDP_BUFFER_SIZE = 64000;
......@@ -68,7 +75,7 @@ public class UDPConnectionMultiplexer extends LinkLayerPort {
remotePort)) {
Debug.log("New Connection created");
// null for version bypass
stream = new BlockingAppendableInputStream();
stream = new BlockingAppendableInputStream();
addStream(sa, stream);
Connection conn = new Connection(new UDPLayer(
dsock, remoteAddress, remotePort,
......
......@@ -5,6 +5,11 @@ import se.lth.control.labcomm.*;
import java.io.*;
import java.util.*;
/**
* Provides a stream for the difference labcomm decoders to read
* Provides LabComm version check bypass with a helper class.
* No locking on emprty stream, insteead
*/
public class AppendableInputStream extends InputStream{
protected ByteArrayInputStream is;
protected byte[] current;
......
......@@ -2,11 +2,16 @@ package se.lth.cs.firefly.util;
import java.io.*;
/**
* Enhances the functionality of AppendableInputStream by providing blocking on
* empty stream.
*/
public class BlockingAppendableInputStream extends AppendableInputStream {
public BlockingAppendableInputStream(byte[] data) {
super(data);
}
public BlockingAppendableInputStream() {
super();
}
......@@ -18,14 +23,14 @@ public class BlockingAppendableInputStream extends AppendableInputStream {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted while waiting for bytes in stream");
throw new InterruptedIOException(
"Interrupted while waiting for bytes in stream");
}
}
int a = super.read();
Debug.log("Bytesleft: " + bytesLeft +" just read " +a);
Debug.log("Bytesleft: " + bytesLeft + " just read " + a);
return a;
}
......
......@@ -11,7 +11,7 @@ import java.net.*;
* Responsible for the resending of packets. It consists of two queues, one for data, i.e. user defined types, and one for channel messages. These queues
* are synchronized on the same lock. It also houses the actual resender thread as a private class. The queues are fifo queues.
* It is meant to be specific to each connection.
*
* TODO This is not needed for e.g. tcp. Make stub for protocols handling resending by themselves?
*/
public class ResendQueue {
private LinkedHashMap<Integer, LabCommSample> channelQueue;
......
package se.lth.cs.firefly.util;
/**
* Possibly to be as parameter for open auto restrict method in connection.
*/
public interface UserType {
/**
*
* @return the number of types that should be restricted
*/
public int getNbrOfTypes();
/**
* Registers all the handlers,encoders and decoders.
*/
public void registerAll();
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment