Commit 399839ea authored by Arnaud Blanchard's avatar Arnaud Blanchard
Browse files

Update the way semaphore are initialised while openned

parent 61027173
......@@ -44,7 +44,7 @@ Press 'q' to quit.
Details
=======
The information about the properties of the blc_channels (type, format, sizes) is stored in a special file: '/tmp/blc_channels.txt'. You are not suppose to use it but you can check at anytime the status of the blc_channels using `./run.sh i_channels` in your blaar directory.
The information about the properties of the blc_channels (type, format, sizes) is stored in a special file: '/tmp/blc_channels.txt'. You are not suppose to use it but you can check at anytime the status of the blc_channels using `./run.sh blc_channels` in your blaar directory.
You will see all the existing blc_channels, the process reading or writing and the possibility to destroy channels.
......@@ -61,13 +61,24 @@ Two semaphores associated to the shared memory are created:
Proccesses which do not consider synchronisation use starting char '/'
`[writer]/->[reader]` means no synchronization
`[writer].->[reader]` means the reader waits for new data from the writer
`[writer]^->[reader]` means the writer waits for the reader to read and acknowledge the data
`[writer]:->[reader]` means both direction synchronization. The writer waits for acknowledgement and the reader wait for new data from the writer.
While opening a channel
-----------------------
In synchronous mode, we assume there is only one reader and writter at a time
- **new 0, ack 1**: The data has been read, the receiver is ready
- **new 1, ack 0**: New data is ready for reading
- **new 0, ack 0**(1): The data is being read or written. This is a scenario where you do not know if you have to wait or if the channel was badly closed
- **new 1, ack 1**(2): The new data has not been read yet. This must not happen as the risk is that you can read and write at the same time
At the initialisation, when a channel is created, **ack** is set to 1
If you have a problem, case (1) you can use the tools blc_channels to manually unlock **ack** or **new**.
C/C++ documentation
===================
......
This diff is collapsed.
/*
Basic Library for C/C++ (blclib)
Copyright ETIS — ENSEA, Université de Cergy-Pontoise, CNRS (2011 - 2015)
Author: Arnaud Blanchard
This software is governed by the CeCILL v2.1 license under French law and abiding by the rules of distribution of free software.
You can use, modify and/ or redistribute the software under the terms of the CeCILL v2.1 license as circulated by CEA, CNRS and INRIA at the following URL "http://www.cecill.info".
As a counterpart to the access to the source code and rights to copy, modify and redistribute granted by the license,
 users are provided only with a limited warranty and the software's author, the holder of the economic rights, and the successive licensors have only limited liability.
 In this respect, the user's attention is drawn to the risks associated with loading, using, modifying and/or developing or reproducing the software by the user in light of its specific status of free software,
 that may mean that it is complicated to manipulate, and that also therefore means that it is reserved for developers and experienced professionals having in-depth computer knowledge.
Users are therefore encouraged to load and test the software's suitability as regards their requirements in conditions enabling the security of their systems and/or data to be ensured
 and, more generally, to use and operate it in the same conditions as regards security.
 The fact that you are presently reading this means that you have had knowledge of the CeCILL v2.1 license and that you accept its terms. *
*
*
* Created on: Oct 9, 2014
* Author: Arnaud Blanchard
*/
#include "blc_realtime.h"
#include "blc_net_channel.h"
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <limits.h>
#include <pthread.h>
#include <fcntl.h> // O_... constants
#include <semaphore.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/mman.h> //mmap
#include <sys/stat.h> // mode S_ ... constants
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/un.h>
#include "blc_core.h"
#include "blc_network.h"
#define BLC_CHANNEL_DEFAULT_PORT 3333
#define PAQUET_SIZE 1024 // Should be less than MTU - header_size. Usually between in [1280, 1500] - 28
#define TMP_BUFFER_SIZE 4096
#define BLC_CHANNELS_LIST_PATH "/tmp/blc_channels.txt"
blc_channel *channels=NULL;
int channels_nb=0;
typedef struct
{
uint16_t index, position; //index 0 it is control data
}type_channel_header;
struct changes_check
{
sem_t *sems[2];
void(*callback)(void*);
void *user_data;
};
enum {HEADER_MEM=0, DATA_MEM};
blc_net_channel **net_channels = NULL;
int net_channels_nb = 0;
int channel_server_socket = 0;
int blc_channel_port = BLC_CHANNEL_DEFAULT_PORT;
/* We suppose sizeof(int) >= 4 !!! */
void listen_data(blc_mem *mems, int mems_nb, void *arg)
{
int i;
blc_channel *channel = (blc_channel*)arg;
div_t div_result;
type_channel_header header;
int *destination, *source;
(void) mems_nb;
header = *(type_channel_header*)mems[HEADER_MEM].data;
/* We do not use memcpy to be sure that the copy are 4 by 4. The memory has also to be aligned. TODO check if we can do int by int*/
destination = (int*)channel->data + header.position*PAQUET_SIZE;
source = (int*)mems[DATA_MEM].data;
div_result = div(mems[DATA_MEM].size, sizeof(int));
FOR_INV(i, div_result.quot) *destination++ = *source++;
/* If sizeof(int) = 8, a chunk of 4 can be forgotten */
if (div_result.rem == 4) *(int32_t*)destination = *(int32_t*)source;
else EXIT_ON_ERROR("Data receive is not a multiple of 4 : %d", mems[DATA_MEM].size);
}
/*
void blc_net_channel::send()
{
struct iovec index_position_and_buffer[3];
struct msghdr message;
uint32_t position;
// type_channel_header header;
ssize_t data_sent;
CLEAR(message);
message.msg_iov = index_position_and_buffer;
message.msg_iovlen = BLC_SET_IOVEC2(index_position_and_buffer, index, position)+1;
// message.msg_name = &target_address;
// message.msg_namelen = sizeof(target_address);
FOR_INV(position, (size-1)/PAQUET_SIZE+1) // (size-1) for the case size=PAQUET_SIZE should still be 1
{
index_position_and_buffer[2].iov_base = chars+position*PAQUET_SIZE;
index_position_and_buffer[2].iov_len = MIN(PAQUET_SIZE, size-position*PAQUET_SIZE);
SYSTEM_ERROR_CHECK(data_sent = sendmsg(socket_fd, &message, 0), -1, "Error while sending data of size %ld position %d index %d.", size, position, index);
}
index++;
}*/
void server_recv_callback(blc_mem *mems, int mems_nb, void* user_data)
{
int i;
blc_net_channel *channel=NULL;
const char *channel_name;
//blc_server *server;
channel_name = mems[1].chars;
BLC_LOG("Receive channel name %s", mems[1].chars);
/* FOR_INV(i, net_channels_nb) if(strncmp(net_channels[i]->channel->name, channel_name, FILENAME_MAX) == 0)
{
channel = net_channels[i];
BLC_LOG( "The channel %s is known", channel_name);
break;
}
*/
if (channel == NULL){
/* channel = new blc_channel(mems[1].data, (uint64_t)mems[0].data);
if (channel->size <= PAQUET_SIZE) server = new blc_server(BLC_UDP4, 0, NULL, NULL, channel, 1);
else server = new blc_server(0, BLC_UDP4, listen_data, (void*)channel, 3, sizeof(uint32_t), sizeof(uint32_t), PAQUET_SIZE);
APPEND_ITEM(net_channels, &net_channels_nb, &channel)
*/
}
// send(channel->socket_fd, channel_name, strlen(channel_name), 0); //Envoi de son nom pour dire où on attend la reponse.
}
//void receive_cb(blc_server *server, )
int blc_net_channel::try_recv_update(int us_timeout){
int ret;
fd_set fdset;
struct timeval timeout={0, us_timeout}; //100ms
FD_ZERO(&fdset);
FD_SET(network.socket_fd, &fdset);
network.send_buffer(&refresh_period, sizeof(refresh_period));
SYSTEM_ERROR_CHECK(ret=select(network.socket_fd+1, &fdset, NULL, NULL, &timeout), -1, "Waiting server channel data"); //It would be nice to check errorfds
if (ret==0) return 0; //Timeout
SYSTEM_SUCCESS_CHECK(ret=read(network.socket_fd, data, size), size, "Reading blc_channel '%s' data: '%lu' bytes instead of '%lu'", name, ret, size);
return 1;
}
void blc_net_channel::recv_update(int us_timeout, int trials_nb){
int i;
FOR_INV(i, trials_nb) if (try_recv_update(us_timeout) == 1) return;
EXIT_ON_CHANNEL_ERROR(this, "Impossible to receive data in '%d' µs", us_timeout);
}
static void *net_channel_update_manager(void *user_data){
blc_net_channel *net_channel = (blc_net_channel*)user_data;
fd_set fdset;
int ret;
struct timeval timeout={0, 100000}; //100ms
//Wait for data to be ready
FD_ZERO(&fdset);
net_channel->network.send_buffer(&net_channel->refresh_period, sizeof(net_channel->refresh_period));
while(1){
FD_SET(net_channel->network.socket_fd, &fdset);
SYSTEM_ERROR_CHECK(ret=select(net_channel->network.socket_fd+1, &fdset, NULL, NULL, &timeout), -1, "Waiting server channel data"); //It would be nice to check errorfds
if (ret==0) printf("Timeout for server\n");
else{
SYSTEM_SUCCESS_CHECK(ret=read(net_channel->network.socket_fd, net_channel->data, net_channel->size), net_channel->size, "Reading blc_channel '%s' data: '%lu' bytes instead of '%lu'", net_channel->name, ret, net_channel->size);
}
}
}
void blc_net_channel::init(char const *channel_name, char const *address, int refresh_period){
blc_channel channel_info;
char buffer[LINE_MAX];
fd_set fdset;
int ret, error;
pthread_t thread;
struct timeval timeout={0, 100000}; //100ms
// 33333 default net_channel port. Ready to udp send request to address
network.init(address, BLC_NET_CHANNEL_PORT_NAME, BLC_UDP4);
//Send channel name request
network.send_buffer(channel_name, strlen(channel_name));
//Wait for data to be ready
FD_ZERO(&fdset);
FD_SET(network.socket_fd, &fdset);
SYSTEM_ERROR_CHECK(ret=select(network.socket_fd+1, &fdset, NULL, NULL, &timeout), -1, "Waiting server channel definition"); //It would be nice to check errorfds
if (ret==0) EXIT_ON_ERROR("Timeout of %.3fms. The server is probably down.", timeout.tv_sec*1000 + timeout.tv_usec/1000.f);
network.remote_address_length=sizeof(network.remote_address);
//Get the propoerties of the channel and the address (especilly the new port) of the sender
SYSTEM_ERROR_CHECK(ret=recvfrom(network.socket_fd, buffer, LINE_MAX, NO_FLAG, (struct sockaddr*)&network.remote_address, &network.remote_address_length), -1, "Reading blc_channel properties");
if (ret == LINE_MAX) EXIT_ON_ERROR("Buffer of '%d' is too small", ret);
//We interpret the data as blc_channel properties
channel_info.sscan_info(buffer);
//We create or reopen a similar local channel
create_or_open("/pipi", BLC_CHANNEL_WRITE, channel_info.type, channel_info.format, channel_info.dims_nb, channel_info.dims);
if (refresh_period >=0) PTHREAD_CHECK(pthread_create(&thread, NULL, net_channel_update_manager, this), error, NULL);
}
static void *refresh_server_cb(void *user_data){
// struct timespec time_left={0,0};
struct timeval timer={0, 0};
blc_net_channel *net_channel = (blc_net_channel*)user_data;
int current_duration;
int64_t time_left;
char buffer[1024];
int32_t refresh_period=-1;
char address_name[NAME_MAX], port_name[NAME_MAX];
net_channel->sprint_info(buffer, 1024);
net_channel->network.send_buffer(buffer, 1024);
read(net_channel->network.socket_fd, &refresh_period, sizeof(refresh_period));
while(1){
net_channel->network.get_remote_address(address_name, NAME_MAX, port_name, NAME_MAX);
printf("Answer from %s:%s\n", address_name, port_name);
net_channel->network.send_buffer(net_channel->data, net_channel->size);
if (refresh_period){
if (refresh_period==-1){
read(net_channel->network.socket_fd, &refresh_period, sizeof(refresh_period));
}else if (refresh_period==-2)
{
break;
}else{
current_duration=us_time_diff(&timer);
time_left = (net_channel->refresh_period - current_duration);
if (time_left < 0) color_fprintf(BLC_YELLOW, stderr, "\rMissing %.3fms to send the channel.", -time_left/1000.f);
else SYSTEM_SUCCESS_CHECK(usleep(time_left), 0, "Program loop interrupted");
current_duration=us_time_diff(&timer);
/*Warning usleep can be overtimed of 10ms !!! it is the same for usleep !!!*/
}
}
}
net_channel->close();
return NULL;
}
/** It is called each time the port 33333 receive a message. If it is a known channel, a new port is opened specifically for the channel*/
static void channel_request_cb(blc_server *server){
blc_channel channel_info, *new_channel;
blc_net_channel *new_net_channel;
int channel_id, error;
pthread_t thread;
struct sockaddr_storage address;
socklen_t address_size;
channel_id=blc_channel_get_info_with_name(&channel_info, server->chars);
SYSTEM_ERROR_CHECK(getsockname(server->socket_fd, (struct sockaddr*)&address, &address_size), -1, NULL);
if (channel_id==-1) {
snprintf(server->chars, server->size, "error: blc_channel does not exist.\n");
server->send_back();
}
else {
//create a new server for the new channel
/* new_channel=(blc_channel*)APPEND_ALLOCATION(&channels, &channels_nb, blc_channel);
new_channel->open(server->chars, BLC_CHANNEL_READ); //If refresh_period = 0 it may be BLC_CHANNEL_READ_SYNC (not yet working) to read as fast as it changes*/
new_net_channel=(blc_net_channel*)APPEND_ALLOCATION(&net_channels, &net_channels_nb, blc_net_channel);
new_net_channel->init(server->chars, "0", BLC_UDP4 | BLC_SERVER);
SYSTEM_ERROR_CHECK(getsockname(new_net_channel->socket_fd, (struct sockaddr*)&address, &address_size), -1, NULL);
new_net_channel->remote_address=server->remote_address;
new_net_channel->remote_address_length=server->remote_address_length;
new_net_channel->refresh_period=1000000;
PTHREAD_CHECK(pthread_create(&thread, NULL, refresh_server_cb, new_net_channel), error, "Creating refresh cb");
}
}
void blc_channel_start_server(char const* port_name){
blc_server server;
server.start(port_name, BLC_UDP4, channel_request_cb, NULL, LINE_MAX);
}
/*Send a request on 3333 based on the channel name. Wait for an answer with the caracteristics. Send a feedback asking for data with a refresh period.
- -1 for only one blocking update.
- 0 Start a server waiting for data as fast as possible.
- any positive values for any periodical return in µs
*/
//blc_channel *blc_net_channel::recv_update(char const *name, char const *address){
/* char buffer[1024];
blc_channel channel_info;
// type_channel_header channel_header;
int32_t refresh_period=-1;
fd_set fdset;
struct timeval timeout;
int ret=1;
char address_name[NAME_MAX], port_name[NAME_MAX];
timeout.tv_sec=0;
timeout.tv_usec=100000; //100ms
// this->channel=channel;
init(address, "33333", BLC_UDP4);
send_buffer(name, strlen(name));
get_remote_address(address_name, NAME_MAX, port_name, NAME_MAX);
printf("send to %s:%s\n", address_name, port_name);
//the max size that can store
FD_ZERO(&fdset);
FD_SET(socket_fd, &fdset);
SYSTEM_ERROR_CHECK(ret=select(socket_fd+1, &fdset, NULL, NULL, &timeout), -1, "Waiting server channel definition"); //It would be nice to check errorfds
if (ret==0) EXIT_ON_ERROR("Timeout of %.3fms for server", timeout.tv_sec*1000 + timeout.tv_usec/1000.f);
CLEAR(buffer);
remote_address_length=sizeof(remote_address);
SYSTEM_ERROR_CHECK(ret=recvfrom(socket_fd, buffer, 1024, NO_FLAG, (struct sockaddr*)&remote_address, &remote_address_length), -1, "Reading blc_channel properties");
printf("\nread '%d' bytes, \n%s\n\n", ret, buffer);
get_remote_address(address_name, NAME_MAX, port_name, NAME_MAX);
printf("Answer from %s:%s\n", address_name, port_name);
// if (buffer[0]!='/') PRINT_WARNING("Receiving error from server: '%s' requesting channel '%s'", buffer, name);
channel_info.sscan_info(buffer);
channel=(blc_channel*)APPEND_ALLOCATION(&channels, &channels_nb, blc_channel);
channel->create_or_open("/pipi", BLC_CHANNEL_WRITE, channel_info.type, channel_info.format, channel_info.dims_nb, channel_info.dims);
FD_ZERO(&fdset);
FD_SET(socket_fd, &fdset);
send_buffer(&refresh_period, sizeof(refresh_period));
strncpy(channel->chars, "Not receive", channel->size);
while(1){
// SYSTEM_ERROR_CHECK(ret=select(socket_fd+1, &fdset, NULL, NULL, &timeout), -1, "Waiting server channel data"); //It would be nice to check errorfds
if (ret==0) printf("Timeout for server\n");
else{
SYSTEM_ERROR_CHECK(ret=recvfrom(socket_fd, channel->chars, channel->size, NO_FLAG, (struct sockaddr*)&remote_address, &remote_address_length), -1, "Reading blc_channel properties");
printf("message '%s'\n", channel->chars);
}
}
if (size < PAQUET_SIZE) read(socket_fd, channel->data, channel->size);
else EXIT_ON_CHANNEL_ERROR(channel, "Data is bigger than PAQUET size (%d), it is not yet managed", PAQUET_SIZE);
printf("%s", channel->chars);
return channel;*/
//}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment