Commit 237d144e authored by Oscar Olsson's avatar Oscar Olsson
Browse files

Merge branch 'tcp_transport'

parents 66ede2a7 f962aeb1
{
<pthread_create_leak>
Memcheck:Leak
fun:calloc
fun:_dl_allocate_tls
fun:pthread_create@@GLIBC_2.2.5
}
{
<pthread_cancel_leak>
Memcheck:Leak
fun:malloc
fun:_dl_map_object
fun:dl_open_worker
fun:_dl_catch_error
fun:_dl_open
fun:do_dlopen
fun:_dl_catch_error
fun:__libc_dlopen_mode
fun:pthread_cancel_init
fun:pthread_cancel
}
{
<pthread_cancel_leak_2>
Memcheck:Leak
fun:malloc
fun:_dl_new_object
fun:_dl_map_object_from_fd
fun:_dl_map_object
fun:dl_open_worker
fun:_dl_catch_error
fun:_dl_open
fun:do_dlopen
fun:_dl_catch_error
fun:__libc_dlopen_mode
fun:pthread_cancel_init
fun:pthread_cancel
}
{
<pthread_cancel_leak_3>
Memcheck:Leak
fun:calloc
fun:_dl_new_object
fun:_dl_map_object_from_fd
fun:_dl_map_object
fun:dl_open_worker
fun:_dl_catch_error
fun:_dl_open
fun:do_dlopen
fun:_dl_catch_error
fun:__libc_dlopen_mode
fun:pthread_cancel_init
fun:pthread_cancel
}
{
<pthread_cancel_leak_4>
Memcheck:Leak
fun:calloc
fun:_dl_check_map_versions
fun:dl_open_worker
fun:_dl_catch_error
fun:_dl_open
fun:do_dlopen
fun:_dl_catch_error
fun:__libc_dlopen_mode
fun:pthread_cancel_init
fun:pthread_cancel
fun:firefly_transport_tcp_posix_stop
fun:ping_main_thread
}
......@@ -18,7 +18,7 @@ UNIT_TEST_PROGS="../build/test/test_event_main
UNIT_TEST_ROOT_PROGS="../build/test/test_transport_eth_posix_main
../build/test/test_transport_eth_posix_main"
SYSTEM_TEST_PROGS="../build/test/pingpong_main ../build/test/udp_posix"
SYSTEM_TEST_PROGS="../build/test/pingpong_main ../build/test/pingpong_main_tcp ../build/test/udp_posix"
# TODO: Should these (below) run at all on a single computer?
SYSTEM_TEST_ROOT_PROGS="../build/test/pong_eth_main ../build/test/ping_eth_main
../build/test/pingpong_multi_main"
......@@ -45,7 +45,7 @@ fi
if [[ "${RUN_SYSTEM_TESTS}" -eq 1 ]]; then
for prog in ${SYSTEM_TEST_PROGS}; do
echo "=========================>BEGIN TEST: ${prog}";
valgrind --quiet --error-exitcode=1 --leak-check=full --show-reachable=yes $prog;
valgrind --gen-suppressions=all --suppressions=./pthreads.supp --quiet --error-exitcode=1 --leak-check=full --show-reachable=yes $prog;
test "$?" -ne 0 && echo "FAILURE: Program exited with failure status." >&2 && break;
echo "=========================>END TEST: ${prog}";
done
......
/**
* @file
* @brief The public API of the transport TCP POSIX with specific structures and
* functions.
*/
#ifndef FIREFLY_TRANSPORT_TCP_POSIX_H
#define FIREFLY_TRANSPORT_TCP_POSIX_H
#include <pthread.h>
#include <stdbool.h>
#include <netinet/in.h>
#include <protocol/firefly_protocol.h>
#include <transport/firefly_transport.h>
#include <utils/firefly_event_queue.h>
/**
* @brief This callback will be called when a new connection is received.
*
* This function is implemented by the application layer. It will be called
* when the transport layer receives a new connection. It will be called with
* address of the remote node of the connection as argument. The application
* layer may open a new connection to this remote node on its own.
* If a connection is opened, the id of the event as returned by
* #firefly_connection_open must be returned. If no new connection is
* opened 0 must be returned.
*
* @param llp The \a llp the incomming connection is associated with.
* @param ip_addr The IP addr of the remote node.
* @param port The port number of the remote node.
* @return Event id or 0.
* @retval >0 A new connection was opened and the read data will propagate as
* soon as the connection is completely open.
* @retval 0 The new connection was refused and the read data is discarded.
*/
typedef int64_t (*firefly_on_conn_recv_ptcp)(
struct firefly_transport_llp *llp, int socket,
const char *ip_addr, unsigned short port);
/**
* @brief Allocates and initializes a new \c #firefly_transport_llp with TCP
* specific data and open an TCP socket bound to the specified \a local_port.
*
* @param local_port The port to bind the new socket to.
* @param on_conn_recv The callback to call when a new connection is received.
* If it is NULL no received connections will be accepted.
* @param event_queue The event queue to push spawned events to.
* @return A pointer to the created \c firefly_transport_llp.
* @retval NULL on error.
*/
struct firefly_transport_llp *firefly_transport_llp_tcp_posix_new(
unsigned short local_port,
firefly_on_conn_recv_ptcp on_conn_recv,
struct firefly_event_queue *event_queue);
/**
* @brief Through events, close the socket and free any resources associated
* with this firefly_transport_llp.
*
* The resources freed include all connections and resources freed due to
* freeing a connection.
*
* @param llp The firefly_transport_llp to free.
*/
void firefly_transport_llp_tcp_posix_free(struct firefly_transport_llp *llp);
/**
* @brief Allocates and initializes the transport layer specific data of a
* #firefly_connection. It shall be supplied as parameter to
* #firefly_connection_open().
*
* @param llp The \c #firefly_transport_llp to associate the data with.
* @param existing_socket An existing socket to use, should only be used when
* called from the context of the #firefly_on_conn_recv_ptcp callback (where the
* socket is received as a parameter from the transport layer).
* @param remote_ipaddr The IP address to connect to.
* @param remote_port The port to connect to.
* @return The transport specific data ready to be supplied as argument to
* #firefly_connection_open().
* @retval NULL upon failure.
* @see #firefly_connection_open()
*/
struct firefly_transport_connection *firefly_transport_connection_tcp_posix_new(
struct firefly_transport_llp *llp,
int existing_socket,
const char *remote_ipaddr,
unsigned short remote_port);
/**
* @brief Start reader and resend thread. It will run until stopped with
* firefly_transport_tcp_posix_stop().
*
* @param llp The LLP to run.
* @return Integer indicating success or failure. If it failed, errno contains
* the error code (same as pthread_create's).
* @retval 0 if successfull.
* @retval != 0 upon error.
* @see #firefly_transport_tcp_posix_stop()
*/
int firefly_transport_tcp_posix_run(struct firefly_transport_llp *llp);
/**
* @brief Stop reader thread.
*
* #firefly_transport_tcp_posix_run() must have been run before calling this
* function, if not the result is undefined.
*
* @param llp The LLP to stop.
* @return Integer indicating success or failure.
* @retval 0 if successfull.
* @retval != 0 upon error.
* @see #firefly_transport_tcp_posix_run()
*/
int firefly_transport_tcp_posix_stop(struct firefly_transport_llp *llp);
/**
* @brief Read data from the #firefly_transport_llp. Any read data will be
* included in an event pushed to the #firefly_event_queue.
*
* The read data will be distributed to the connection opened to the remote
* address the data is sent from.
*
* If no such connection exists the #firefly_on_conn_recv_ptcp will be called,
* if it is NULL the data will be discarded.
*
* This function is blocking.
*
* @param llp The Link Layer Port to read data from.
* @see firefly_on_conn_recv_ptcp
*/
void firefly_transport_tcp_posix_read(struct firefly_transport_llp *llp);
#endif
sample int test_var;
sample int test_var_2;
sample int test_var_3;
sample struct {
int data[10];
} test_var_large;
sample struct {
long sec;
long nsec;
......
......@@ -24,7 +24,9 @@ static void signature_trans_write(unsigned char *data, size_t size,
{
UNUSED_VAR(important);
UNUSED_VAR(id);
protocol_data_received(conn, data, size);
unsigned char *cpy_data = FIREFLY_RUNTIME_MALLOC(conn, size);
memcpy(cpy_data, data, size);
protocol_data_received(conn, cpy_data, size);
}
static struct firefly_transport_connection sig_transport = {
......@@ -203,7 +205,9 @@ void protocol_data_received(struct firefly_connection *conn,
labcomm_decoder_ioctl(conn->transport_decoder,
FIREFLY_LABCOMM_IOCTL_READER_SET_BUFFER,
data, size);
labcomm_decoder_decode_one(conn->transport_decoder);
int res = 0;
while (res >= 0)
res = labcomm_decoder_decode_one(conn->transport_decoder);
}
}
......
......@@ -47,6 +47,15 @@ struct firefly_connection *firefly_connection_new(
return NULL;
}
conn->actions = actions;
if (memory_replacements) {
conn->memory_replacements.alloc_replacement =
memory_replacements->alloc_replacement;
conn->memory_replacements.free_replacement =
memory_replacements->free_replacement;
} else {
conn->memory_replacements.alloc_replacement = NULL;
conn->memory_replacements.free_replacement = NULL;
}
reader = transport_labcomm_reader_new(conn, lc_mem);
writer = transport_labcomm_writer_new(conn, lc_mem);
if (reader == NULL || writer == NULL || lc_mem == NULL) {
......@@ -92,15 +101,6 @@ struct firefly_connection *firefly_connection_new(
/* labcomm_register_error_handler_decoder(conn->transport_decoder,*/
/* labcomm_error_to_ff_error);*/
if (memory_replacements) {
conn->memory_replacements.alloc_replacement =
memory_replacements->alloc_replacement;
conn->memory_replacements.free_replacement =
memory_replacements->free_replacement;
} else {
conn->memory_replacements.alloc_replacement = NULL;
conn->memory_replacements.free_replacement = NULL;
}
conn->transport = tc;
conn->open = FIREFLY_CONNECTION_OPEN;
......
......@@ -24,6 +24,19 @@ struct transport_writer_context {
unsigned char *important_id;
};
struct transport_reader_list {
unsigned char *data;
size_t len;
struct transport_reader_list *next;
};
struct transport_reader_context {
struct firefly_connection *conn;
struct transport_reader_list *read;
struct transport_reader_list *to_read;
int last_end_pos;
};
static int proto_reader_alloc(struct labcomm_reader *r,
struct labcomm_reader_action_context *context,
char *version)
......@@ -126,7 +139,7 @@ struct labcomm_reader *protocol_labcomm_reader_new(
struct labcomm_reader_action_context *action_context;
result = FIREFLY_MALLOC(sizeof(*result));
action_context = FIREFLY_MALLOC(sizeof(*result));
action_context = FIREFLY_MALLOC(sizeof(*action_context));
if (result != NULL && action_context != NULL) {
action_context->context = conn;
action_context->action = &proto_reader_action;
......@@ -146,16 +159,271 @@ void protocol_labcomm_reader_free(struct labcomm_reader *r)
FIREFLY_FREE(r);
}
static int trans_reader_append_buffer(struct labcomm_reader *r,
unsigned char *data, size_t len)
{
struct transport_reader_context *ctx;
struct transport_reader_list *le;
struct transport_reader_list **next;
ctx = r->action_context->context;
le = FIREFLY_RUNTIME_MALLOC(ctx->conn, sizeof(*le));
if (le == NULL)
return -1;
le->data = data;
le->len = len;
le->next = NULL;
for (next = &ctx->to_read; *next != NULL; next = &(*next)->next) {}
*next = le;
return 0;
}
static int trans_reader_next_buffer(struct labcomm_reader *r)
{
struct transport_reader_context *ctx;
struct transport_reader_list *le;
struct transport_reader_list **next;
unsigned char *tmp_data;
size_t tmp_len;
ctx = r->action_context->context;
if (ctx->to_read == NULL)
return -1;
// Pop buffer from to_read list
le = ctx->to_read;
ctx->to_read = le->next;
// temporarily save buffer
tmp_data = le->data;
tmp_len = le->len;
// Reuse buffer struct to save current buffer in backstack
le->data = r->data;
le->len = r->count;
le->next = NULL;
// Save current buffer in backstack
for (next = &ctx->read; *next != NULL; next = &(*next)->next) {}
*next = le;
// Set popped buffer as current buffer
r->data = tmp_data;
r->count = tmp_len;
r->pos = 0;
return 0;
}
/*
* This function pops the backstack back in the to_read list. It is done by
* first replacing the current buffer with the bottom (first in list) buffer.
* The replaced buffer is the pushed to the beginning of the to_read list and
* then the entire backstack is pushed as is to the beginning of the to_read
* list.
*/
static void trans_reader_pop_backstack(struct labcomm_reader *r)
{
struct transport_reader_context *ctx;
struct transport_reader_list *le;
struct transport_reader_list **next;
unsigned char *tmp_data;
size_t tmp_len;
ctx = r->action_context->context;
if (ctx->read == NULL)
return;
le = ctx->read;
// Switch current buffer and first buffer in backstack
ctx->read = le->next;
tmp_data = r->data;
tmp_len = r->count;
r->data = le->data;
r->count = le->len;
le->data = tmp_data;
le->len = tmp_len;
// Get previous buffer in backstack, the last one in the list
for (next = &ctx->read; *next != NULL; next = &(*next)->next) {}
// Let the previous buffer point to the current
*next = le;
// Let the current buffer point to the next in to_read;
le->next = ctx->to_read;
// Push the backstack to beginning of to_read
ctx->to_read = ctx->read;
ctx->read = NULL;
}
static void trans_reader_free_backstack(struct labcomm_reader *r)
{
struct transport_reader_context *ctx;
struct transport_reader_list *le;
ctx = r->action_context->context;
while (ctx->read != NULL) {
le = ctx->read;
ctx->read = le->next;
FIREFLY_RUNTIME_FREE(ctx->conn, le->data);
FIREFLY_RUNTIME_FREE(ctx->conn, le);
}
}
static int trans_reader_alloc(struct labcomm_reader *r,
struct labcomm_reader_action_context *context,
char *version)
{
UNUSED_VAR(context);
UNUSED_VAR(version);
UNUSED_VAR(r);
return 0;
}
static int trans_reader_free(struct labcomm_reader *r,
struct labcomm_reader_action_context *context)
{
UNUSED_VAR(context);
transport_labcomm_reader_free(r);
return 0;
}
static int trans_reader_fill(struct labcomm_reader *r,
struct labcomm_reader_action_context *context)
{
UNUSED_VAR(context);
int result;
if (r->error < 0) {
return r->error;
}
result = r->count - r->pos;
if (result <= 0) {
r->error = trans_reader_next_buffer(r);
}
if (r->error < 0) {
trans_reader_pop_backstack(r);
}
return result;
}
static int trans_reader_start(struct labcomm_reader *r,
struct labcomm_reader_action_context *context,
int local_index, int remote_index,
struct labcomm_signature *signature,
void *value)
{
UNUSED_VAR(context);
UNUSED_VAR(local_index);
UNUSED_VAR(remote_index);
UNUSED_VAR(signature);
UNUSED_VAR(value);
return r->count - r->pos;
}
static int trans_reader_end(struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context)
{
struct transport_reader_context *ctx;
ctx = action_context->context;
if (r->pos >= r->count) {
if (trans_reader_next_buffer(r) < 0 && r->data != NULL) {
struct firefly_connection *conn = ctx->conn;
FIREFLY_RUNTIME_FREE(conn, r->data);
r->data = NULL;
r->count = 0;
r->pos = 0;
}
ctx->last_end_pos = 0;
} else {
ctx->last_end_pos = r->pos;
}
trans_reader_free_backstack(r);
return 0;
}
static int trans_reader_ioctl(struct labcomm_reader *r,
struct labcomm_reader_action_context *action_context,
int local_index, int remote_index,
struct labcomm_signature *signature,
uint32_t ioctl_action, va_list args)
{
UNUSED_VAR(local_index);
UNUSED_VAR(remote_index);
UNUSED_VAR(signature);
int result;
struct transport_reader_context *ctx;
ctx = action_context->context;
switch (ioctl_action) {
case FIREFLY_LABCOMM_IOCTL_READER_SET_BUFFER: {
void *buffer;
size_t size;
buffer = va_arg(args, void*);
size = va_arg(args, size_t);
if (r->data == NULL) {
r->data = buffer;
r->data_size = size;
r->count = size;
r->pos = 0;
} else {
trans_reader_append_buffer(r, buffer, size);
}
if (r->error) {
r->error = 0;
r->pos = ctx->last_end_pos;
}
result = 0;
} break;
default:
result = -ENOTSUP;
break;
}
return result;
}
static const struct labcomm_reader_action trans_reader_action = {
.alloc = trans_reader_alloc,
.free = trans_reader_free,
.start = trans_reader_start,
.fill = trans_reader_fill,
.end = trans_reader_end,
.ioctl = trans_reader_ioctl,
};
struct labcomm_reader *transport_labcomm_reader_new(
struct firefly_connection *conn, struct labcomm_memory *mem)
struct firefly_connection *conn,
struct labcomm_memory *mem)
{
return protocol_labcomm_reader_new(conn, mem);
struct labcomm_reader *reader;
struct labcomm_reader_action_context *action_context;
struct transport_reader_context *reader_context;
reader = FIREFLY_MALLOC(sizeof(*reader));
action_context = FIREFLY_MALLOC(sizeof(*action_context));
reader_context = FIREFLY_MALLOC(sizeof(*reader_context));
if (reader != NULL && action_context != NULL && reader_context != NULL)
{
reader_context->read = NULL;
reader_context->to_read = NULL;
reader_context->last_end_pos = 0;
reader_context->conn = conn;
action_context->context = reader_context;
action_context->action = &trans_reader_action;
action_context->next = NULL;
reader->action_context = action_context;
reader->memory = mem;
} else {
FIREFLY_FREE(reader);
FIREFLY_FREE(reader_context);
FIREFLY_FREE(action_context);
}
return reader;
}
void transport_labcomm_reader_free(struct labcomm_reader *r)
{
FIREFLY_FREE(r->action_context->context);
FIREFLY_FREE(r->action_context);
FIREFLY_FREE(r);
}
static int comm_writer_alloc(struct labcomm_writer *w,
struct labcomm_writer_action_context *action_context,
char *labcomm_version)
......@@ -429,6 +697,7 @@ void transport_labcomm_writer_free(struct labcomm_writer *w)
comm_writer_free(w, w->action_context);
}
int send_data_sample_event(void *event_arg)
{
struct firefly_event_send_sample *fess;
......
......@@ -109,6 +109,23 @@ if(NOT CMAKE_CROSSCOMPILING AND CUNIT_FOUND)
)
## }}}
## PINGPONG_MAIN {{{
add_executable(pingpong_main_tcp
${Firefly_SOURCE_DIR}/test/pingpong/pingpong_main_tcp.c
${Firefly_SOURCE_DIR}/test/pingpong/pingpong_ptcp.c
${Firefly_SOURCE_DIR}/test/pingpong/pingpong.c
${Firefly_SOURCE_DIR}/test/pingpong/pong_ptcp.c
${Firefly_SOURCE_DIR}/test/pingpong/ping_ptcp.c
${Firefly_SOURCE_DIR}/test/test_labcomm_utils.c
${Firefly_SOURCE_DIR}/utils/firefly_event_queue_posix.c
${Firefly_SOURCE_DIR}/utils/firefly_resend_posix.c
)
target_link_libraries(pingpong_main_tcp