Commit 441dcc53 authored by Tommy Olofsson's avatar Tommy Olofsson
Browse files

Pingpong works with new labcomm and auto restr.

parent fdd9817f
......@@ -116,7 +116,7 @@ int firefly_channel_open_auto_restrict_event(void *event_arg)
chan_req.source_chan_id = chan->local_id;
chan_req.dest_chan_id = chan->remote_id;
chan_req.auto_restrict = true;
firefly_channel_set_types(chan, types);
chan->types = types;
labcomm_encoder_ioctl(conn->transport_encoder,
FIREFLY_LABCOMM_IOCTL_TRANS_SET_IMPORTANT_ID,
&chan->important_id);
......@@ -128,7 +128,7 @@ int firefly_channel_open_auto_restrict_event(void *event_arg)
return 0;
}
void firefly_channel_open_auto_restrict(struct firefly_connection *conn,
void firefly_channel_open_auto_restrict( struct firefly_connection *conn,
struct firefly_channel_types types)
{
int64_t ret;
......@@ -356,12 +356,13 @@ int handle_channel_response_event(void *event_arg)
if (chan == NULL) {
firefly_unknown_dest(fecrr->conn, fecrr->chan_res.source_chan_id,
fecrr->chan_res.dest_chan_id, "channel_response");
fecrr->chan_res.dest_chan_id, "channel_response");
} else if (fecrr->chan_res.ack) {
if (chan->remote_id == CHANNEL_ID_NOT_SET) {
chan->remote_id = fecrr->chan_res.source_chan_id;
firefly_channel_ack(chan);
firefly_channel_internal_opened(chan);
firefly_channel_set_types(chan, chan->types);
}
firefly_channel_send_channel_ack(fecrr->conn, chan,
fecrr->chan_res.source_chan_id);
......@@ -480,9 +481,11 @@ int handle_data_sample_event(void *event_arg)
struct firefly_channel *chan;
fers = event_arg;
chan = find_channel_by_local_id(fers->conn, fers->data.dest_chan_id);
if (chan != NULL) {
/* FIXME: Undefined wrapping behaviour. */
int expected_seqno = chan->remote_seqno + 1;
if (expected_seqno <= 0) {
......@@ -873,22 +876,5 @@ void firefly_channel_types_add_encoder_type(
void firefly_channel_set_types(struct firefly_channel *chan,
struct firefly_channel_types types)
{
while (types.decoder_types) {
struct firefly_channel_decoder_type *f;
f = types.decoder_types;
f->register_func(chan->proto_decoder, f->handler, f->context);
types.decoder_types = f->next;
FIREFLY_FREE(f);
chan->n_decoder_types++;
}
chan->seen_decoder_ids = calloc(chan->n_decoder_types,
sizeof(*chan->seen_decoder_ids));
if (chan->seen_decoder_ids) {
for (size_t i = 0; i < chan->n_decoder_types; i++)
chan->seen_decoder_ids[i] = -1;
} else {
FFL(FIREFLY_ERROR_ALLOC);
}
chan->enc_types = types.encoder_types;
chan->types = types;
}
......@@ -8,10 +8,6 @@
struct firefly_channel *firefly_channel_new(struct firefly_connection *conn)
{
struct firefly_channel *chan;
struct labcomm_decoder *proto_decoder;
struct labcomm_encoder *proto_encoder;
struct labcomm_reader *reader;
struct labcomm_writer *writer;
chan = FIREFLY_MALLOC(sizeof(*chan));
if (!chan) {
......@@ -32,31 +28,8 @@ struct firefly_channel *firefly_channel_new(struct firefly_connection *conn)
chan->enc_types = NULL;
chan->seen_decoder_ids = NULL;
chan->n_decoder_types = 0;
reader = protocol_labcomm_reader_new(conn, conn->lc_memory);
writer = protocol_labcomm_writer_new(chan, conn->lc_memory);
if (!reader || !writer) {
FFL(FIREFLY_ERROR_ALLOC);
protocol_labcomm_reader_free(reader);
protocol_labcomm_writer_free(writer);
FIREFLY_FREE(chan);
return NULL;
}
proto_decoder = labcomm_decoder_new(reader, NULL, conn->lc_memory, NULL);
proto_encoder = labcomm_encoder_new(writer, NULL, conn->lc_memory, NULL);
if (!proto_decoder || !proto_encoder) {
FFL(FIREFLY_ERROR_ALLOC);
if (chan->proto_decoder)
labcomm_decoder_free(chan->proto_decoder);
if (chan->proto_encoder)
labcomm_encoder_free(chan->proto_encoder);
protocol_labcomm_reader_free(reader);
protocol_labcomm_writer_free(writer);
FIREFLY_FREE(chan);
return NULL;
}
chan->proto_decoder = proto_decoder;
chan->proto_encoder = proto_encoder;
chan->proto_decoder = NULL;
chan->proto_encoder = NULL;
// TODO: Fix this once Labcomm re-gets error handling
/* labcomm_register_error_handler_encoder(proto_encoder,*/
......@@ -215,11 +188,69 @@ int firefly_channel_unrestrict_event(void *earg)
void firefly_channel_internal_opened(struct firefly_channel *chan)
{
struct labcomm_decoder *proto_decoder;
struct labcomm_encoder *proto_encoder;
struct labcomm_reader *reader;
struct labcomm_writer *writer;
struct firefly_connection *conn;
struct firefly_channel_types types;
if (chan->state == FIREFLY_CHANNEL_OPEN)
return;
conn = chan->conn;
chan->state = FIREFLY_CHANNEL_OPEN;
reader = protocol_labcomm_reader_new(conn, conn->lc_memory);
writer = protocol_labcomm_writer_new(chan, conn->lc_memory);
if (!reader || !writer) {
FFL(FIREFLY_ERROR_ALLOC);
protocol_labcomm_reader_free(reader);
protocol_labcomm_writer_free(writer);
FIREFLY_FREE(chan);
return; /* FIXME: call ff_err()... */
}
proto_decoder = labcomm_decoder_new(reader, NULL, conn->lc_memory, NULL);
proto_encoder = labcomm_encoder_new(writer, NULL, conn->lc_memory, NULL);
if (!proto_decoder || !proto_encoder) {
FFL(FIREFLY_ERROR_ALLOC);
if (chan->proto_decoder)
labcomm_decoder_free(chan->proto_decoder);
if (chan->proto_encoder)
labcomm_encoder_free(chan->proto_encoder);
protocol_labcomm_reader_free(reader);
protocol_labcomm_writer_free(writer);
FIREFLY_FREE(chan);
return; /* FIXME: call ff_err()... */
}
chan->proto_decoder = proto_decoder;
chan->proto_encoder = proto_encoder;
chan->conn->actions->channel_opened(chan);
/* Reg */
types = chan->types;
while (types.decoder_types) {
struct firefly_channel_decoder_type *f;
f = types.decoder_types;
f->register_func(chan->proto_decoder, f->handler, f->context);
types.decoder_types = f->next;
FIREFLY_FREE(f);
chan->n_decoder_types++;
}
chan->seen_decoder_ids = calloc(chan->n_decoder_types,
sizeof(*chan->seen_decoder_ids));
if (chan->seen_decoder_ids) {
for (size_t i = 0; i < chan->n_decoder_types; i++)
chan->seen_decoder_ids[i] = -1;
} else {
FFL(FIREFLY_ERROR_ALLOC);
}
chan->enc_types = types.encoder_types;
/* /Reg */
if (chan->auto_restrict) {
struct firefly_channel_encoder_type *t;
......
......@@ -54,8 +54,6 @@ struct firefly_connection *firefly_connection_new(
{
struct firefly_connection *conn;
struct labcomm_memory *lc_mem;
struct labcomm_encoder *transport_encoder;
struct labcomm_decoder *transport_decoder;
struct labcomm_reader *reader;
struct labcomm_writer *writer;
......@@ -106,9 +104,11 @@ struct firefly_connection *firefly_connection_new(
init_firefly_protocol__signatures();
transport_decoder = labcomm_decoder_new(reader, NULL, lc_mem, NULL);
transport_encoder = labcomm_encoder_new(writer, NULL, lc_mem, NULL);
conn->transport_decoder = labcomm_decoder_new(reader, NULL, lc_mem, NULL);
conn->transport_encoder = labcomm_encoder_new(writer, NULL, lc_mem, NULL);
/* TODO: Error handling. */
#if 0
if (transport_encoder == NULL || transport_decoder == NULL) {
firefly_error(FIREFLY_ERROR_ALLOC, 3,
"memory allocation failed %s:%d",
......@@ -125,8 +125,7 @@ struct firefly_connection *firefly_connection_new(
FIREFLY_FREE(conn);
return NULL;
}
conn->transport_encoder = transport_encoder;
conn->transport_decoder = transport_decoder;
#endif
labcomm_decoder_register_firefly_protocol_data_sample(conn->transport_decoder,
handle_data_sample, conn);
......
......@@ -408,6 +408,7 @@ struct labcomm_reader *transport_labcomm_reader_new(
FIREFLY_FREE(reader);
FIREFLY_FREE(reader_context);
FIREFLY_FREE(action_context);
reader = NULL;
}
return reader;
......@@ -575,6 +576,8 @@ static struct labcomm_writer *labcomm_writer_new(void *context,
action_context->next = NULL;
result->memory = mem;
result->action_context = action_context;
} else {
result = NULL;
}
return result;
......
......@@ -311,6 +311,7 @@ struct firefly_channel {
struct firefly_channel_encoder_type *enc_types;
size_t n_decoder_types;
int *seen_decoder_ids;
struct firefly_channel_types types; /**< Holds types until after channel handshake. */
};
/**
......
......@@ -27,7 +27,6 @@ enum ping_test_id {
CHAN_RESTRICTED,
DATA_SEND,
DATA_RECEIVE,
CHAN_UNRESTRICTED,
CHAN_CLOSE,
TEST_DONE,
PING_NBR_TESTS
......@@ -39,7 +38,6 @@ static char *ping_test_names[] = {
"Restrict channel (requesting party)",
"Send data",
"Receive data",
"Unrestrict channel (requesting party)",
"Close channel",
"Ping done"
};
......@@ -62,11 +60,26 @@ static struct firefly_event_queue *event_queue;
static pthread_mutex_t ping_done_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t ping_done_signal = PTHREAD_COND_INITIALIZER;
static bool ping_done;
static struct firefly_channel *ping_chan;
void ping_connection_opened(struct firefly_connection *conn)
{
struct firefly_channel_types types = FIREFLY_CHANNEL_TYPES_INITIALIZER;
ping_pass_test(CONNECTION_OPEN);
firefly_channel_open(conn);
firefly_channel_types_add_decoder_type(
&types,
(firefly_labcomm_decoder_register_function)
labcomm_decoder_register_pingpong_data,
(firefly_labcomm_handler_function) ping_handle_pingpong_data,
NULL);
firefly_channel_types_add_encoder_type(
&types,
labcomm_encoder_register_pingpong_data);
firefly_channel_open_auto_restrict(conn, types);
}
/* Restr. state change. */
......@@ -77,9 +90,8 @@ void ping_chan_restr_info(struct firefly_channel *chan,
switch (restr) {
case UNRESTRICTED:
/* Done */
ping_pass_test(CHAN_UNRESTRICTED);
firefly_channel_close(chan);
/* Can not happen anymore... */
warnx("ping unrestrict");
break;
case RESTRICTED:
ping_pass_test(CHAN_RESTRICTED);
......@@ -95,16 +107,8 @@ void ping_chan_restr_info(struct firefly_channel *chan,
void ping_chan_opened(struct firefly_channel *chan)
{
struct labcomm_encoder *enc;
struct labcomm_decoder *dec;
ping_chan = chan;
ping_pass_test(CHAN_OPEN);
enc = firefly_protocol_get_output_stream(chan);
dec = firefly_protocol_get_input_stream(chan);
labcomm_decoder_register_pingpong_data(dec, ping_handle_pingpong_data,
chan);
labcomm_encoder_register_pingpong_data(enc);
firefly_channel_restrict(chan);
}
void ping_chan_closed(struct firefly_channel *chan)
......@@ -146,8 +150,7 @@ void ping_handle_pingpong_data(pingpong_data *data, void *ctx)
if (*data == PONG_DATA)
ping_pass_test(DATA_RECEIVE);
firefly_channel_unrestrict(ctx);
firefly_channel_close(ping_chan);
}
struct firefly_connection_actions ping_actions = {
......
......@@ -24,7 +24,6 @@ enum pong_test_id {
CHAN_RESTRICTED,
DATA_RECEIVE,
DATA_SEND,
CHAN_UNRESTRICTED,
CHAN_CLOSE,
TEST_DONE,
PONG_NBR_TESTS
......@@ -37,7 +36,6 @@ static char *pong_test_names[] = {
"Restricted channel (responding party)",
"Receive data",
"Send data",
"Unrestrict channel (responding party)",
"Close channel",
"Pong done"
};
......@@ -72,13 +70,14 @@ void pong_chan_restr_info(struct firefly_channel *chan,
enum restriction_transition restr)
{
UNUSED_VAR(chan);
switch (restr) {
case UNRESTRICTED:
pong_pass_test(CHAN_UNRESTRICTED);
/* Can not happen anymore... */
warnx("pong unrestrict");
break;
case RESTRICTED: {
warnx("pong restricted. Should no happen "
"this way, pong is passive!");
pong_pass_test(CHAN_RESTRICTED);
break;
}
case RESTRICTION_DENIED:
......@@ -135,14 +134,7 @@ int64_t pong_connection_received(
void pong_chan_opened(struct firefly_channel *chan)
{
struct labcomm_encoder *enc;
struct labcomm_decoder *dec;
enc = firefly_protocol_get_output_stream(chan);
dec = firefly_protocol_get_input_stream(chan);
labcomm_decoder_register_pingpong_data(dec, pong_handle_pingpong_data,
chan);
labcomm_encoder_register_pingpong_data(enc);
UNUSED_VAR(chan);
pong_pass_test(CHAN_OPENED);
}
......@@ -158,7 +150,20 @@ void pong_chan_closed(struct firefly_channel *chan)
bool pong_chan_received(struct firefly_channel *chan)
{
UNUSED_VAR(chan);
struct firefly_channel_types types = FIREFLY_CHANNEL_TYPES_INITIALIZER;
firefly_channel_types_add_decoder_type(
&types,
(firefly_labcomm_decoder_register_function)
labcomm_decoder_register_pingpong_data,
(firefly_labcomm_handler_function) pong_handle_pingpong_data, chan);
firefly_channel_types_add_encoder_type(
&types,
labcomm_encoder_register_pingpong_data);
firefly_channel_set_types(chan, types);
pong_pass_test(CHAN_RECEIVE);
return true;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment