Commit 9c925cde authored by Tommy Olofsson's avatar Tommy Olofsson
Browse files

udp_vx *linking*.

parent c6b919a7
build/
*build/
doc/gen
lib/
gen/
......@@ -10,3 +10,6 @@ tags
data.enc
sig.enc
.*.swp
*~
\#*
.\#*
#ifndef FIREFLY_EVENT_QUEUE_VX_H
#define FIREFLY_EVENT_QUEUE_VX_H
#include <utils/firefly_event_queue.h>
/**
* @brief Construct a new struct firefly_event_queue with with a context
* specific for this utility.
*
* @param pool_size The number of preallocated events.
* @return The newly contructed event queue.
*/
struct firefly_event_queue *firefly_event_queue_vx_new(size_t pool_size);
/**
* @brief Free the specified event queue and the posix specific context. Stop
* the event loop if it is running.
*
* @param eq The event queue to free
*/
void firefly_event_queue_vx_free(struct firefly_event_queue **eq);
/**
* @brief Start the event loop.
*
* @param eq The event queue to loop execute events from. It must have been
* constructed with firefly_event_queue_vx_new().
* @param attr The attributes to use when starting the event loop posix thread.
* If NULL the default is used.
* @return Integer indicating result.
* @retval 0 on success.
* @retval <0 on failure.
*/
int firefly_event_queue_vx_run(struct firefly_event_queue *eq);
/**
* @brief Stop the event loop. Will block untill the event loop is stopped.
*
* @param eq The event queue of the event loop to stop. It must have been
* constructed with firefly_event_queue_vx_new().
* @return Integer indicating result.
* @retval 0 on success.
* @retval <0 on failure.
*/
int firefly_event_queue_vx_stop(struct firefly_event_queue *eq);
#endif
......@@ -22,7 +22,8 @@ set_source_files_properties(${Firefly_PROJECT_DIR}/gen/firefly_protocol.c PROPER
add_library(firefly ${firefly_src_files})
# If not cross compiling, add firefly with error lib target:
if (NOT CMAKE_CROSSCOMPILING)
#if (NOT CMAKE_CROSSCOMPILING)
if (NOT ARM_COMPILING)
add_library(firefly-werr
${firefly_src_files}
${Firefly_SOURCE_DIR}/utils/firefly_errors.c
......@@ -32,7 +33,8 @@ if (NOT CMAKE_CROSSCOMPILING)
set(firefly_install_werr
firefly-werr
)
endif (NOT CMAKE_CROSSCOMPILING)
#endif (NOT CMAKE_CROSSCOMPILING)
endif (NOT ARM_COMPILING)
# Add libs to install target:
install(TARGETS ${firefly_install_werr} firefly
......
......@@ -39,7 +39,7 @@ if(NOT CMAKE_CROSSCOMPILING OR VXWORKS_COMPILING)
)
endif (XENOMAI_FOUND AND RTNET_FOUND)
# if (NOT VXWORKS_COMPILING)
if (NOT VXWORKS_COMPILING)
# UDP POSIX
add_library(transport-udp-posix
${Firefly_SOURCE_DIR}/transport/firefly_transport.c
......@@ -52,7 +52,22 @@ if(NOT CMAKE_CROSSCOMPILING OR VXWORKS_COMPILING)
${transport_install_libs}
transport-udp-posix
)
# endif (NOT VXWORKS_COMPILING)
endif (NOT VXWORKS_COMPILING)
if (VXWORKS_COMPILING)
# UDP VX
add_library(transport-udp-vx
${Firefly_SOURCE_DIR}/transport/firefly_transport.c
${Firefly_SOURCE_DIR}/transport/firefly_transport_udp_posix.c
${Firefly_SOURCE_DIR}/utils/firefly_resend_vx.c
${Firefly_SOURCE_DIR}/utils/firefly_event_queue_vx.c
)
target_link_libraries(transport-udp-vx gen-files)
set(transport_install_libs
${transport_install_libs}
transport-udp-vx
)
endif (VXWORKS_COMPILING)
if (NOT VXWORKS_COMPILING)
# TCP POSIX
......
......@@ -18,12 +18,19 @@
#include <sys/ioctl.h>
#ifdef LABCOMM_COMPAT
#include <selectLib.h>
#include <sockLib.h>
#include <inetLib.h>
#include <ioLib.h>
#include <taskLib.h>
#include <utils/firefly_resend_vx.h>
#else
#include <sys/select.h>
#include <utils/firefly_resend_posix.h>
#endif
#include <signal.h>
......@@ -32,7 +39,6 @@
#include <utils/firefly_errors.h>
#include <utils/firefly_event_queue.h>
#include <utils/firefly_resend_posix.h>
#include "utils/firefly_event_queue_private.h"
#include "protocol/firefly_protocol_private.h"
#include "transport/firefly_transport_private.h"
......@@ -303,24 +309,47 @@ int firefly_transport_udp_posix_run(struct firefly_transport_llp *llp)
struct firefly_resend_loop_args *largs;
llp_udp = llp->llp_platspec;
res = pthread_create(&llp_udp->read_thread, NULL,
firefly_transport_udp_posix_read_run, llp);
if (res < 0)
return res;
largs = malloc(sizeof(*largs));
if (!largs) {
pthread_cancel(llp_udp->read_thread);
if (!largs)
return -1;
}
largs->rq = llp_udp->resend_queue;
largs->on_no_ack = resend_on_no_ack;
/* TODO: Clean this up. */
#ifndef LABCOMM_COMPAT
res = pthread_create(&llp_udp->read_thread, NULL,
firefly_transport_udp_posix_read_run, llp);
if (res < 0)
goto fail;
res = pthread_create(&llp_udp->resend_thread, NULL,
firefly_resend_run, largs);
if (res < 0) {
pthread_cancel(llp_udp->read_thread);
return res;
}
if (res < 0)
goto ptfail;
return 0;
ptfail:
pthread_cancel(llp_udp->read_thread);
goto fail;
#else
res = taskSpawn("ff_read_thread", 254, 0, 20000,
firefly_transport_udp_posix_read_run,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0); /* TODO: arg */
if (res == ERROR)
goto fail;
llp_udp->tid_read = res;
res = taskSpawn("ff_resend_thread", 254, 0, 20000,
firefly_resend_run,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0); /* TODO: arg */
if (res == ERROR)
goto vxfail;
llp_udp->tid_resend = res;
return 0;
vxfail:
taskDelete(llp_udp->tid_read);
goto fail;
#endif
fail:
free(largs);
return res;
}
int firefly_transport_udp_posix_stop(struct firefly_transport_llp *llp)
......@@ -328,10 +357,16 @@ int firefly_transport_udp_posix_stop(struct firefly_transport_llp *llp)
struct transport_llp_udp_posix *llp_udp;
llp_udp = llp->llp_platspec;
#ifndef LABCOMM_COMPAT
pthread_cancel(llp_udp->read_thread);
pthread_cancel(llp_udp->resend_thread);
pthread_join(llp_udp->resend_thread, NULL);
pthread_join(llp_udp->read_thread, NULL);
#else
taskDelete(llp_udp->tid_read);
taskDelete(llp_udp->tid_resend);
#endif
return 0;
}
......
......@@ -10,7 +10,11 @@
#include <signal.h>
#include <utils/firefly_event_queue.h>
#ifndef LABCOMM_COMPAT
#include <utils/firefly_resend_posix.h>
#else
#include <utils/firefly_resend_vx.h>
#endif
#include "transport/firefly_transport_private.h"
......@@ -26,9 +30,14 @@ struct transport_llp_udp_posix {
events on. */
struct resend_queue *resend_queue; /**< The resend queue managing important
packets. */
#ifndef LABCOMM_COMPAT
pthread_t read_thread; /**< The handle to the thread running the read loop. */
pthread_t resend_thread; /**< The handle to the thread running the resend
loop. */
#else
int tid_read;
int tid_resend;
#endif
};
/**
......
#define _VX_C_SOURCE (200112L)
#include <semLib.h>
#include <taskLib.h>
#include <stdio.h>
#include <utils/firefly_event_queue_vx.h>
#include <utils/firefly_event_queue.h>
struct firefly_event_queue_vx_context {
SEM_ID lock;
SEM_ID signal;
int tid_event_loop;
bool event_loop_stop;
};
int64_t firefly_event_queue_vx_add(struct firefly_event_queue *eq,
unsigned char prio, firefly_event_execute_f execute, void *context,
unsigned int nbr_deps, const int64_t *deps);
struct firefly_event_queue *firefly_event_queue_vx_new(size_t pool_size)
{
struct firefly_event_queue_vx_context *ctx;
struct firefly_event_queue *eq = NULL;
if ((ctx = malloc(sizeof(*ctx)))) {
ctx->lock = semMCreate(SEM_INVERSION_SAFE);
ctx->signal = semCCreate(0, 0);
if (!ctx->lock || !ctx->signal)
printf("fail to create sem\n");
ctx->event_loop_stop = 0;
eq = firefly_event_queue_new(firefly_event_queue_vx_add, pool_size, ctx);
if (!eq) {
semDelete(ctx->lock);
semDelete(ctx->signal);
free(ctx);
}
}
return eq;
}
void firefly_event_queue_vx_free(struct firefly_event_queue **eq)
{
struct firefly_event_queue_vx_context *ctx;
ctx = firefly_event_queue_get_context(*eq);
/* firefly_event_queue_vx_stop(*eq); */
semDelete(ctx->lock);
semDelete(ctx->signal);
free(ctx);
firefly_event_queue_free(eq);
}
int64_t firefly_event_queue_vx_add(struct firefly_event_queue *eq,
unsigned char prio, firefly_event_execute_f execute, void *context,
unsigned int nbr_deps, const int64_t *deps)
{
int res = 0;
struct firefly_event_queue_vx_context *ctx;
ctx = firefly_event_queue_get_context(eq);
semTake(ctx->lock, WAIT_FOREVER);
res = firefly_event_add(eq, prio, execute, context, nbr_deps, deps);
if (res > 0) {
semGive(ctx->signal);
}
semGive(ctx->lock);
return res;
}
void *firefly_event_vx_thread_main(void *args)
{
struct firefly_event_queue *eq;
struct firefly_event_queue_vx_context *ctx;
struct firefly_event *ev = NULL;
int event_left;
bool finish;
eq = args;
ctx = firefly_event_queue_get_context(eq);
semTake(ctx->lock, WAIT_FOREVER);
event_left = firefly_event_queue_length(eq);
finish = ctx->event_loop_stop;
semGive(ctx->lock);
while (!finish || event_left > 0) {
semTake(ctx->lock, WAIT_FOREVER);
event_left = firefly_event_queue_length(eq);
finish = ctx->event_loop_stop;
while (event_left <= 0 && !finish) {
semTake(ctx->signal, WAIT_FOREVER);
finish = ctx->event_loop_stop;
event_left = firefly_event_queue_length(eq);
}
ev = firefly_event_pop(eq);
semGive(ctx->lock);
if (ev) {
firefly_event_execute(ev);
semTake(ctx->lock, WAIT_FOREVER);
firefly_event_return(eq, &ev);
semGive(ctx->lock);
}
}
return NULL;
}
int firefly_event_queue_vx_run(struct firefly_event_queue *eq)
{
int res;
struct firefly_event_queue_vx_context *ctx;
ctx = firefly_event_queue_get_context(eq);
res = taskSpawn("ff_event_task", 254, 0, 20000,
firefly_event_vx_thread_main,
(int) eq, 0, 0, 0, 0, 0, 0, 0, 0, 0);
ctx->tid_event_loop = res;
return 0; /* TODO: Do properly. */
}
int firefly_event_queue_vx_stop(struct firefly_event_queue *eq)
{
struct firefly_event_queue_vx_context *ctx;
ctx = firefly_event_queue_get_context(eq);
semTake(ctx->lock, WAIT_FOREVER);
ctx->event_loop_stop = true;
semGive(ctx->signal);
semGive(ctx->lock);
taskDelete(ctx->tid_event_loop);
return 0;
}
#define _POSIX_C_SOURCE (200112L) /* TODO: Why? */
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <semLib.h>
#include <taskLib.h>
#include <sysLib.h>
#include <protocol/firefly_protocol.h>
#include "utils/firefly_resend_vx.h"
#include "protocol/firefly_protocol_private.h"
struct resend_queue *firefly_resend_queue_new()
{
struct resend_queue *rq;
rq = malloc(sizeof(*rq));
if (rq) {
rq->next_id = 1;
rq->first = NULL;
rq->last = NULL;
rq->lock = semMCreate(SEM_Q_PRIORITY | SEM_INVERSION_SAFE);
rq->sig = semCCreate(0, 0);
}
return rq;
}
void firefly_resend_queue_free(struct resend_queue *rq)
{
struct resend_elem *re;
re = rq->first;
while (re) {
struct resend_elem *tmp;
tmp = re->prev;
firefly_resend_elem_free(re);
re = tmp;
}
semDelete(rq->lock);
semDelete(rq->sig);
free(rq);
}
static inline void timespec_add_ms(struct timespec *t, long d)
{
long long tmp;
tmp = t->tv_nsec + d * 1000000LL;
if (tmp >= 1000000000) {
t->tv_sec += tmp / 1000000000;
tmp %= 1000000000;
}
t->tv_nsec = tmp;
}
unsigned char firefly_resend_add(struct resend_queue *rq,
unsigned char *data, size_t size, long timeout_ms,
unsigned char retries, struct firefly_connection *conn)
{
struct resend_elem *re;
re = malloc(sizeof(*re));
if (!re)
return 0;
re->data = data;
re->size = size;
clock_gettime(CLOCK_REALTIME, &re->resend_at);
timespec_add_ms(&re->resend_at, timeout_ms);
re->num_retries = retries;
re->conn = conn;
re->timeout = timeout_ms;
re->prev = NULL;
semTake(rq->lock, WAIT_FOREVER);
re->id = rq->next_id++;
if (rq->next_id == 0) {
rq->next_id = 1;
}
if (rq->last == NULL) {
rq->first = re;
} else {
rq->last->prev = re;
}
rq->last = re;
semGive(rq->lock);
semGive(rq->sig);
return re->id;
}
static inline struct resend_elem *firefly_resend_pop(
struct resend_queue *rq, unsigned char id)
{
struct resend_elem *re;
re = rq->first;
if (!re)
return NULL;
// If first elem remove it and return it
if (re->id == id) {
rq->first = re->prev;
if (rq->last == re)
rq->last = NULL;
return re;
}
while (re->prev) {
if (re->prev->id == id) {
struct resend_elem *tmp;
tmp = re->prev;
re->prev = re->prev->prev;
if (rq->last == tmp) {
rq->last = re;
}
return tmp;
} else {
re = re->prev;
}
}
return NULL;
}
void firefly_resend_readd(struct resend_queue *rq, unsigned char id)
{
struct resend_elem *re;
semTake(rq->lock, WAIT_FOREVER);
re = firefly_resend_pop(rq, id);
semGive(rq->lock);
if (!re)
return;
re->num_retries--;
timespec_add_ms(&re->resend_at, re->timeout);
re->prev = NULL;
semTake(rq->lock, WAIT_FOREVER);
if (rq->last == NULL) {
rq->first = re;
} else {
rq->last->prev = re;
}
rq->last = re;
semGive(rq->lock);
semGive(rq->sig);
}
void firefly_resend_remove(struct resend_queue *rq, unsigned char id)
{
struct resend_elem *re;
semTake(rq->lock, WAIT_FOREVER);
re = firefly_resend_pop(rq, id);
if (re != NULL)
firefly_resend_elem_free(re);
semGive(rq->lock);
semGive(rq->sig);
}
void firefly_resend_elem_free(struct resend_elem *re)
{
free(re->data);
free(re);
}
/* TODO: Looks wrong... */
struct resend_elem *firefly_resend_top(struct resend_queue *rq)
{
struct resend_elem *re = NULL;
semTake(rq->lock, WAIT_FOREVER);
re = rq->first;
semGive(rq->lock);
return re;
}
static inline bool timespec_past(struct timespec *fixed, struct timespec *var)
{
return fixed->tv_sec == var->tv_sec ?
var->tv_nsec <= fixed->tv_nsec : var->tv_sec < fixed->tv_sec;
}
int firefly_resend_wait(struct resend_queue *rq,
unsigned char **data, size_t *size,
struct firefly_connection **conn,
unsigned char *id)
{
int result;
struct resend_elem *res = NULL;
struct timespec now;
semTake(rq->lock, WAIT_FOREVER);
clock_gettime(CLOCK_REALTIME, &now);
res = rq->first;
while (!res || !timespec_past(&now, &res->resend_at)) {
if (!res) {
semTake(rq->sig, WAIT_FOREVER);
} else {
struct timespec at;
long timeout;
at = res->resend_at;
timeout = ((at.tv_sec - now.tv_sec) * 1000000 +
(at.tv_nsec - now.tv_nsec) / 1000) / sysClkRateGet();
semTake(rq->sig, timeout);
}
clock_gettime(CLOCK_REALTIME, &now);
res = rq->first;
}
*conn = res->conn;
if (res->num_retries <= 0) {
firefly_resend_pop(rq, res->id);
firefly_resend_elem_free(res);
*data = NULL;
*id = 0;
*size = 0;
result = -1;
} else {
*data = malloc(res->size);
memcpy(*data, res->data, res->size);
*size = res->size;
*id = res->id;
result = 0;