Commit ba8db4c9 authored by Arnaud Blanchard's avatar Arnaud Blanchard
Browse files

Manage synchronized semaphores

parent d2b61816
......@@ -5,7 +5,7 @@ BLC channel
- Author : [Arnaud Blanchard](http://arnaudblanchard.thoughtsheet.com)
- Licence : [CeCILL v2.1](http://www.cecill.info/licences/Licence_CeCILL_V2-en.html)
Functions to easily share data through shared memory (shm_... functions). The advantage is that it is the fastest and the more econom in memory ( no copy in each process ) way to share informtation between processes. At this moment it only works in asynchronous mode.
Functions to easily share data through shared memory (shm_... functions). It is the fastest, and the more econom in memory ( no copy in each process ), way to share informtation between processes. At this moment it only works in asynchronous mode.
The idea is that you have a structure blc_channel which is like a blc_array ( https://framagit.org/blaar/blc_core/blob/master/t_array/main.cpp ) but has a name starting with '/' to identificate it on your system.
......@@ -50,6 +50,24 @@ You will see all the existing blc_channels, the process reading or writing and t
On Linux we can see and manipulate a virtual file containing this memory in /run/shm/<name of your shared memory>, on OSX you cannot but anyway it is only used for debug.
Synchronisation
===============
Two semaphores associated to the shared memory are created:
- `blc_channle<id>_new_data0` which indicates that new data is available on the shared memory
- `blc_channle<id>_ack_data0` which acknowledges that the data has been read
Proccesses which do not consider synchronisation use starting char '/'
`[writer]/->[reader]` means no synchronization
`[writer].->[reader]` means the reader waits for new data from the writer
`[writer]^->[reader]` means the writer waits for the reader to read and acknowledge the data
`[writer]:->[reader]` means both direction synchronization. The writer waits for acknowledgement and the reader wait for new data from the writer.
C/C++ documentation
===================
......
......@@ -44,24 +44,11 @@ Shared memory are opened with POSIX shm_open. The file /tmp/blc_channels.txt mai
///Defines channel in writting mode ( if memory can be written, we cannot disable reading )
#define BLC_CHANNEL_WRITE O_RDWR
///Not yet in use
#define BLC_CHANNEL_SYNC 4 // O_ACCMODE + 1 First flag after access mode
///Not yet in use
#define BLC_CHANNEL_BLOCK (BLC_CHANNEL_SYNC | 8) //BLOCK has to be SYNC
///Size of parameters for a channel description. Parameter is not used at this moment
#define BLC_PARAMETER_MAX 31
///Like EXIT_ON_ERROR but also display all debug informatin about the blc_channel
#define EXIT_ON_CHANNEL_ERROR(channel, ...) blc_channel_fatal_error(channel, __FILE__, __FUNCTION__, __LINE__, __VA_ARGS__)
///Not yet in use. Structure that will be used for synchronisation.
typedef struct shared_control{
/* pthread_mutex_t mutex;
pthread_cond_t writer_cond, reader_cond;*/
int writing, event_id, waitings_nb;
uint64_t new_value_flags, readers_flags, reading_flags; //bitfield max 64 blocking_readers;
struct timeval write_time;
}blc_share_control;
///Extends **blc_array** allowing it to be share among processes through shared memory
typedef struct blc_channel
#ifdef __cplusplus
......@@ -126,20 +113,37 @@ typedef struct blc_channel
///Map the shared memory with the blc_channel
void map_memory(int mode);
void open_semaphores(int create=0);
/**Post new data if the semaphore exist (otherwise return -1). Return 1, if the data has been used ( sem_new_data busy which is normal), 0 otherwise ( which means nobody is istening ).*/
int post_new_data();
/**Post acknoledged data the semaphore exist (otherwise return -1). Return 1, if the data has been used ( sem_new_data busy which is normal ), 0 otherwise ( which means nobody is checking that).*/
int post_ack_data();
#else
{
blc_array array;// Not beautiful but makes it easy to convert C++ heritage of "class" in C struct inclusion.
#endif
int id, access_mode, potential_new_data, potential_ack_data;
int id, access_mode;
char name[NAME_MAX+1];
char parameter[BLC_PARAMETER_MAX+1];
int fd, ctrl_fd ; ///< shared memory file descriptor
blc_share_control *control; //<Part that we be share in purpose of control
sem_t *sem_new_data, *sem_ack_data, *sem_events[2], *sem_var;
uint64_t reader_flag;
int fd ; ///< shared memory file descriptor
sem_t *sem_new_data, *sem_ack_data;
}blc_channel;
START_EXTERN_C
/** Wait until new data is available
We use void * to make it ablle to be use in callback*/
void blc_channel_wait_new_data(void *channel);
/** Wait until sommeone has read and acknoledged the data.
We use void * to make it ablle to be use in callback*/
void blc_channel_wait_ack_data(void *channel);
/**Used by EXIT_ON_CHANNEL_ERROR*/
void blc_channel_fatal_error(blc_channel *channel, const char *name_of_file, const char* name_of_function, int numero_of_line, const char *message, ...);
/**Create a blc_channel with the name, the mode is BLC_CHANNEL_READ or BLC_CHANNEL_WRITE, the type of data UIN8, INT8, UI16, IN16, FL32, FL64. ( default 0:UIN8),
......@@ -161,6 +165,11 @@ typedef struct blc_channel
int blc_channel_get_info_with_name(blc_channel *info, char const *name);
///Remove the blc_channel **name**. The other processes using it will still work but no new ones can use it.
void blc_remove_channel_with_name(char const *name);
void blc_channel_post_event();
void blc_channel_check_for_event(void (*callback)(void*user_data), void *user_data);
END_EXTERN_C
///@}
......
......@@ -39,13 +39,13 @@
static int blc_channel_id_max = 0;
blc_channel::blc_channel(): access_mode(0), potential_new_data(0), potential_ack_data(0), fd(-1),control(NULL), sem_new_data(NULL), sem_ack_data(NULL){}
blc_channel::blc_channel(): access_mode(0), fd(-1), sem_new_data(NULL), sem_ack_data(NULL){}
blc_channel::blc_channel(char const *new_name, int mode){
open(new_name, mode);
}
blc_channel::blc_channel(char const *new_name, int mode, uint32_t type, uint32_t format, int dims_nb, int length, ...):access_mode(0), potential_new_data(0), potential_ack_data(0), fd(-1),control(NULL), sem_new_data(NULL), sem_ack_data(NULL)
blc_channel::blc_channel(char const *new_name, int mode, uint32_t type, uint32_t format, int dims_nb, int length, ...):access_mode(0), fd(-1), sem_new_data(NULL), sem_ack_data(NULL)
{
va_list arguments;
......@@ -89,8 +89,7 @@ void blc_channel::fprint_info(FILE *file, int print_id){
width=fprint_dims(file);
fprintf(file, "%*c", 32-width, ' ');
if (strlen(name)==0) EXIT_ON_ERROR("name is empty");
if (potential_new_data) fprintf(file, " .%s\n", name+1); //Replace first '/' by @
else fprintf(file, " %s\n", name);
fprintf(file, " %s\n", name);
}
//Should use code in common with fscan_info
......@@ -125,7 +124,6 @@ int blc_channel::fscan_info(FILE *file, int scan_id){
data=NULL;
size=0;
if (scan_id){
SYSTEM_ERROR_CHECK(ret = fscanf(file, "%d ", &id),-1, "Reading channel id");
if (ret!=1) EXIT_ON_ERROR("impossible to read id");
......@@ -141,12 +139,6 @@ int blc_channel::fscan_info(FILE *file, int scan_id){
if (fscanf(file, " %"STRINGIFY_CONTENT(NAME_MAX)"[^\n]\n", name) != 1) EXIT_ON_ERROR("Impossible to read channel's name of channel id '%d'.", id);
if (name[0]=='.') {
potential_new_data=1;
name[0]='/';
}
else potential_new_data=0;
if (id > blc_channel_id_max) blc_channel_id_max=id;
return id;
}
......@@ -164,47 +156,94 @@ void blc_channel::map_memory(int access_mode){
SYSTEM_ERROR_CHECK(data = mmap(0, size, prot, MAP_SHARED, fd, 0), MAP_FAILED, "Mapping memory of %s (size %ld bytes), fd(%d), prot(%d).", name, size, fd, prot);
if (data==NULL) EXIT_ON_CHANNEL_ERROR(this, "Fail mapping blc_channel");
blc_channel_post_event();
}
void blc_channel::open_semaphores(int create){
char tmp_name[NAME_MAX];
sprintf(tmp_name, "blc_channel%d_sem_new_data0", id);
if (create) SYSTEM_ERROR_CHECK(sem_new_data = sem_open(tmp_name, O_CREAT | O_EXCL, S_IRWXU, 0), SEM_FAILED, "Creating named semaphore '%s' for blc_channel '%s'", tmp_name, name);
else SYSTEM_ERROR_CHECK(sem_new_data = sem_open(tmp_name, NO_FLAG), SEM_FAILED, "Opening named semaphore '%s' for blc_channel '%s'", tmp_name, name);
sprintf(tmp_name, "blc_channel%d_sem_ack_data0", id);
if (create) SYSTEM_ERROR_CHECK(sem_ack_data = sem_open(tmp_name, O_CREAT | O_EXCL, S_IRWXU, 0), SEM_FAILED, "Creating named semaphore '%s' for blc_channel '%s'", tmp_name, name);
else SYSTEM_ERROR_CHECK(sem_ack_data = sem_open(tmp_name, NO_FLAG), SEM_FAILED, "Opening named semaphore '%s' for blc_channel '%s'", tmp_name, name);
}
///Create a channel once the blc_array is already defined
void blc_channel::create(char const *new_name, int access_mode){
blc_channel info;
FILE *file;
char name_tmp[NAME_MAX];
int sync_new_data=0, sync_ack_data=0;
strcpy(parameter, "NDEF");
STRCPY(name, new_name);
if (name[0]=='.'){
potential_new_data=1;
name[0]='/'; //We remove @ which was just to indicate synchronisation.
switch (name[0]){
case '/':
break;
case '.':
sync_new_data=1;
break;
case '^':
sync_ack_data=1;
break;
case ':':
sync_new_data=1;
sync_ack_data=1;
break;
default:EXIT_ON_ERROR("Blc channel names must start with '/' for asynchrone mode or with '.',';',':' for synchrone mode. But it is '%s'", name);
break;
}
else if (new_name[0]!='/') EXIT_ON_ERROR("Blc channel names must start with '/' for asynchrone mode or with '.' for synchrone mode");
name[0]='/'; //In any case the name start with '/'
id = blc_channel_get_info_with_name(&info, name);
if (id !=-1 ) EXIT_ON_ERROR("The channel '%s' is already referenced in '" BLC_CHANNELS_LIST_PATH"'.\nYou may unlink it before:\n ./run.sh i_channels --unlink %s", info.name);
if (id !=-1 ) EXIT_ON_ERROR("The channel '%s' is already referenced in '" BLC_CHANNELS_LIST_PATH"'.\nYou may unlink it before:\n blc_channels --unlink %s", info.name);
if (blc_channel_id_max == BLC_CHANNELS_MAX) EXIT_ON_CHANNEL_ERROR(this, "creation impossible, channel id (%d) too big", blc_channel_id_max);
else blc_channel_id_max++;
id = blc_channel_id_max;
SYSTEM_ERROR_CHECK(file=fopen(BLC_CHANNELS_LIST_PATH, "a+"), NULL, "Openning the file '" BLC_CHANNELS_LIST_PATH"' in order to reference the channel '%s'.", name);
fd = shm_open(name, O_CREAT | O_EXCL | access_mode, S_IRWXU); // (O_EXCL first) is important in order to avoid a race condition. We create it at the same time we check it does not exist. Otherwise someone else could create it in between.
if (fd==-1){
if (fd==-1){//Creation impossible
fclose(file);
if (errno == EEXIST) EXIT_ON_ERROR("The shared memory '%s' already exists, you should destroy it before. e.g.:\n ./run.sh i_channels -u%s", name, name);
if (errno == EEXIST) EXIT_ON_ERROR("The shared memory '%s' already exists, you should destroy it before. e.g.:\n blc_channels --unlink%s", name, name);
else EXIT_ON_SYSTEM_ERROR("Creating shared memory '%s'.", name);
}
else{
fprint_info(file, 1);
fclose(file);
if (potential_new_data){
sprintf(name_tmp, "blc_channel%d_sem_new_data0", id);
SYSTEM_ERROR_CHECK(sem_new_data = sem_open(name_tmp, O_CREAT | O_EXCL, S_IRWXU, 0), NULL, "Creating named semaphore '%s' for blc_channel '%s'", name_tmp, name);
open_semaphores(1);
if (sync_new_data) {
if (access_mode==BLC_CHANNEL_READ){
while (sem_trywait(sem_new_data)==-1){ //If the semaphore is busy we post it until it is unlocked
if (errno==EAGAIN) SYSTEM_ERROR_CHECK(sem_post(sem_new_data), -1, "blc_channel '%s'", this->name);
else EXIT_ON_SYSTEM_ERROR("blc_channel '%s'", this->name);
}
SYSTEM_ERROR_CHECK(sem_post(sem_ack_data), -1, "blc_channel '%s'", this->name);
}
}else {
sem_close(sem_new_data);
sem_new_data=NULL;
}
if (sync_ack_data) {
if (access_mode==BLC_CHANNEL_READ){
while (sem_trywait(sem_new_data)==-1){ //If the semaphore is busy we post it until it is unlocked
if (errno==EAGAIN) SYSTEM_ERROR_CHECK(sem_post(sem_new_data), -1, "blc_channel '%s'", this->name);
else EXIT_ON_SYSTEM_ERROR("blc_channel '%s'", this->name);
}
SYSTEM_ERROR_CHECK(sem_post(sem_ack_data), -1, "blc_channel '%s'", this->name);
}
}else {
sem_close(sem_ack_data);
sem_ack_data=NULL;
}
SYSTEM_ERROR_CHECK(ftruncate(fd, size), -1, "fd:%d, size '%ld'.", fd, size);
......@@ -262,19 +301,54 @@ int blc_channel::conflict(const char *new_name){
///Open an existing channel
void blc_channel::open(const char *name, int access_mode){
char name_tmp[NAME_MAX];
int sync_new_data=0, sync_ack_data=0;
STRCPY(this->name, name);
switch (name[0]){
case '/':
break;
case '.':
sync_new_data=1;
break;
case '^':
sync_ack_data=1;
break;
case ':':
sync_new_data=1;
sync_ack_data=1;
break;
default:EXIT_ON_ERROR("Blc channel names must start with '/' for asynchrone mode or with '.','^',':' for synchrone mode. But it is '%s'", this->name);
break;
}
this->name[0]='/'; //In any case the nale start with '/'
id = blc_channel_get_info_with_name(this, name);
if (id==-1) EXIT_ON_ERROR("The blc_channel '%s' does not exist. Run blc_channels", name);
if (this->name[0]=='.') this->name[0]='/'; //We remove ! which was just to indicate synchronisation.
else if (this->name[0]!='/') EXIT_ON_ERROR("Blc channel names must start with '/' for asynchrone mode or with '.' for synchrone mode");
SYSTEM_ERROR_CHECK(fd = shm_open(this->name, access_mode, S_IRWXU), -1, "Impossible to open shared memory '%s'with mode %d.", name, access_mode);
if (name[0]=='.'){
if (sync_new_data) {
sprintf(name_tmp, "blc_channel%d_sem_new_data0", id);
SYSTEM_ERROR_CHECK(sem_new_data = sem_open(name_tmp, O_CREAT, S_IRWXU, 0), NULL, "Creating or opening named semaphore '%s' for blc_channel '%s'", name_tmp, name);
SYSTEM_ERROR_CHECK(sem_new_data = sem_open(name_tmp, NO_FLAG), SEM_FAILED, "Opening named semaphore '%s' for blc_channel '%s'", name_tmp, name);
/* if (access_mode==BLC_CHANNEL_READ){ //As we have just started. Any data is new
while (sem_trywait(sem_new_data)==-1){ //If the semaphore is busy we post it until it is unlocked
if (errno==EAGAIN) SYSTEM_ERROR_CHECK(sem_post(sem_new_data), -1, "blc_channel '%s'", this->name);
else EXIT_ON_SYSTEM_ERROR("blc_channel '%s'", this->name);
}
SYSTEM_ERROR_CHECK(sem_post(sem_new_data), -1, "blc_channel '%s'", this->name);
}*/
}
if (sync_ack_data){
sprintf(name_tmp, "blc_channel%d_sem_ack_data0", id);
SYSTEM_ERROR_CHECK(sem_ack_data = sem_open(name_tmp, NO_FLAG), SEM_FAILED, "Opening named semaphore '%s' for blc_channel '%s'", name_tmp, name);
if (access_mode==BLC_CHANNEL_READ){ //As we have just started. Any data is new
while (sem_trywait(sem_ack_data)==-1){ //If the semaphore is busy we post it until it is unlocked
if (errno==EAGAIN) SYSTEM_ERROR_CHECK(sem_post(sem_ack_data), -1, "blc_channel '%s'", this->name);
else EXIT_ON_SYSTEM_ERROR("blc_channel '%s'", this->name);
}
SYSTEM_ERROR_CHECK(sem_post(sem_ack_data), -1, "blc_channel '%s'", this->name);
}
}
map_memory(access_mode);
}
......@@ -291,12 +365,12 @@ int blc_channel::create_or_open(char const *new_name, int access_mode, uint32_t
created=1;
}
else{
if (info.dims_nb!=dims_nb) EXIT_ON_CHANNEL_ERROR(&info, "Reopening a blc_channel with different dims_nb '%d'. You may want to unlink it:\n ./run.sh i_channels --unlink %s", dims_nb, name);
if (info.type!=type) EXIT_ON_CHANNEL_ERROR(&info, "Reopening blc_channel with another type '%.4s'. You may want to unlink it:\n ./run.sh i_channels --unlink %s", UINT32_TO_STRING(new_type_str, type), name);
if (info.format!=format) EXIT_ON_CHANNEL_ERROR(&info, "Reopening blc_channel with another format '%.4s'. You may want to unlink it:\n ./run.sh i_channels --unlink %s", UINT32_TO_STRING(new_format_str, format), name);
if (info.dims_nb!=dims_nb) EXIT_ON_CHANNEL_ERROR(&info, "Reopening a blc_channel with different dims_nb '%d'. You may want to unlink it:\n blc_channels --unlink %s", dims_nb, info.name);
if (info.type!=type) EXIT_ON_CHANNEL_ERROR(&info, "Reopening blc_channel with another type '%.4s'. You may want to unlink it:\n blc_channels --unlink %s", UINT32_TO_STRING(new_type_str, type), info.name);
if (info.format!=format) EXIT_ON_CHANNEL_ERROR(&info, "Reopening blc_channel with another format '%.4s'. You may want to unlink it:\n blc_channels --unlink %s", UINT32_TO_STRING(new_format_str, format), info.name);
if (info.dims_nb!=0){
for(dim=0; dim != info.dims_nb; dim++) {
if (dims[dim].length!=info.dims[dim].length) EXIT_ON_ERROR("The reopening dimension '%d' length '%d' of '%s' differ from the existing one '%d'. You may want to unlink it:\n ./run.sh i_channels --unlink %s", dim, info.dims[dim].length, info.name, dims[dim].length, info.name);
if (dims[dim].length!=info.dims[dim].length) EXIT_ON_ERROR("The reopening dimension '%d' length '%d' of '%s' differ from the existing one '%d'. You may want to unlink the blc_channel before:\n blc_channels --unlink %s", dim, dims[dim].length, info.name, info.dims[dim].length, info.name);
}
}
open(new_name, access_mode);
......@@ -367,6 +441,7 @@ blc_channel::~blc_channel(){
if (fd!=-1){
close(fd);
blc_channel_post_event();
}
}
......@@ -376,8 +451,16 @@ void blc_channel::remove(){
}
void blc_channel::publish(){
if (sem_new_data) fprintf(stdout, ".%s\n", name+1); //Replace first '/' by '.'
else fprintf(stdout, "%s\n", name);
char sync_char;
if (sem_new_data && sem_ack_data) sync_char=':'; // <=>
else if (sem_new_data) sync_char='.'; // ->
else if (sem_ack_data) sync_char='\''; // <-
else sync_char='/'; //No synchronisation
fprintf(stdout, "%c%s\n", sync_char, name+1); //Replace first '/' by '.'
//This is needed with pipe. "\n" dos not flush whane the program is piped
fflush(stdout);
}
......@@ -390,3 +473,50 @@ void blc_channel::fprint_debug(FILE *file){
if (data==NULL) fprintf(file, "data is null\n");
}
int blc_channel::post_new_data(){
int listenner = -1;
if (this->sem_new_data) {
if (sem_trywait(this->sem_new_data)==0) listenner=0;
else if (errno==EAGAIN) listenner=1;
else EXIT_ON_SYSTEM_ERROR("blc_channel '%s'", this->name); // EAGAIN just means semaphore is busy
SYSTEM_ERROR_CHECK(sem_post(this->sem_new_data), -1, "Sem new_data of blc_channel '%s'", this->name); //We signal new_data
}
return listenner;
}
int blc_channel::post_ack_data(){
int listenner = -1;
if (this->sem_new_data) {
if (sem_trywait(this->sem_ack_data)==0) listenner=0;
else if (errno==EAGAIN) listenner=1;
else EXIT_ON_SYSTEM_ERROR("blc_channel '%s'", this->name); // EAGAIN just means semaphore is busy
SYSTEM_ERROR_CHECK(sem_post(this->sem_ack_data), -1, "Sem new_data of blc_channel '%s'", this->name); //We signal new_data
};
return listenner;
}
START_EXTERN_C
void blc_channel_wait_new_data(void *channel_pt){
blc_channel *channel=(blc_channel*)channel_pt;
SYSTEM_ERROR_CHECK(sem_wait(channel->sem_new_data), -1, "Waiting new data for channel '%s'", channel->name);
}
void blc_channel_wait_ack_data(void *channel_pt){
blc_channel *channel=(blc_channel*)channel_pt;
SYSTEM_ERROR_CHECK(sem_wait(channel->sem_new_data), -1, "Waiting ack data for channel '%s'", channel->name);
}
END_EXTERN_C
......@@ -38,7 +38,15 @@
#define BLC_CHANNELS_LIST_PATH "/tmp/blc_channels.txt"
static int blc_channel_id_max = 0;
static sem_t *blc_channel_event[2]={NULL, NULL};
static int blc_channel_event_id=0;
//NULL, *blc_channel_event=NULL;
;
static void (*blc_channel_event_callback)(void*)=NULL;
static void *blc_channel_event_user_data=NULL;
pthread_t thread;
START_EXTERN_C
......@@ -82,20 +90,10 @@ void blc_remove_channel_with_name(char const *name){
if (info.id==blc_channel_id_max) blc_channel_id_max--;
SYSTEM_ERROR_CHECK(shm_unlink(name), -1, "unlinking blc_channel '%s'", name);
if (info.potential_new_data){
sprintf(tmp_name, "blc_channel%d_sem_new_data0", info.id);
SYSTEM_ERROR_CHECK(sem_unlink(tmp_name), -1, "Unlinking sem_new_data '%s' for blc_channel '%s'", tmp_name, name);
}
/* SPRINTF(control_filename, "/blc_channel%d-var", info.id);
SYSTEM_ERROR_CHECK(sem_unlink(control_filename), -1, NULL);
SPRINTF(control_filename, "/blc_channel%d-event0", info.id);
SYSTEM_ERROR_CHECK(sem_unlink(control_filename), -1, NULL);
SPRINTF(control_filename, "/blc_channel%d-event1", info.id);
SYSTEM_ERROR_CHECK(sem_unlink(control_filename), -1, NULL);
if (shm_unlink(control_filename)==-1) if (errno!=ENOENT) EXIT_ON_SYSTEM_ERROR("unlinking blc_channel '%s'", name); // It does not matter if the file does not exist
*/
sprintf(tmp_name, "blc_channel%d_sem_ack_data0", info.id);
SYSTEM_ERROR_CHECK(sem_unlink(tmp_name), -1, "Unlinking sem_ack_data '%s' for blc_channel '%s'", tmp_name, name);
}
// Envoie un message d'erreur avec name_of_file, name_of_function, number_of_line et affiche le message formate avec les parametres variables. Puis exit le programme avec le parametre EXIT_FAILURE. To be used with EXIT_ON_ERROR.
......@@ -216,5 +214,57 @@ int blc_channel_remove(blc_channel * channel)
else channel->remove();
return 1;
}
static void init_blc_channel_sem_event(){
SYSTEM_ERROR_CHECK(blc_channel_event[0]=sem_open("blc_channel_event0", O_CREAT, S_IRWXU, 0), SEM_FAILED, NULL);
SYSTEM_ERROR_CHECK(blc_channel_event[1]=sem_open("blc_channel_event1", O_CREAT, S_IRWXU, 0), SEM_FAILED, NULL);
}
void blc_channel_post_event(){
if (blc_channel_event[0]==NULL || blc_channel_event[1]==NULL) init_blc_channel_sem_event();
//If the event is already free it is not the good one. We free the other one to trigger the event
if (sem_trywait(blc_channel_event[blc_channel_event_id])==0){
SYSTEM_ERROR_CHECK(sem_post(blc_channel_event[blc_channel_event_id]), -1, NULL);
SYSTEM_ERROR_CHECK(sem_post(blc_channel_event[1-blc_channel_event_id]), -1, NULL);
}
else { //If the event is not free, we free it to trigger the event
if (errno!=EAGAIN) EXIT_ON_SYSTEM_ERROR(NULL);
SYSTEM_ERROR_CHECK(sem_post(blc_channel_event[blc_channel_event_id]), -1, NULL);
}
blc_channel_event_id=1-blc_channel_event_id;
}
/**Block until event. It may produce false positive in connection */
static void *blc_channel_thread_manager(void*){
int i=0;
//If the event is already free it is not the good one. We will wait for the other one
if (sem_trywait(blc_channel_event[i])==0){
SYSTEM_ERROR_CHECK(sem_post(blc_channel_event[i]), -1, NULL);
i=1-i;
}
while(1){
//We wait for event
SYSTEM_ERROR_CHECK(sem_wait(blc_channel_event[i]), -1, NULL);
//We free event for the others
SYSTEM_ERROR_CHECK(sem_post(blc_channel_event[i]), -1, NULL);
i=1-i;
while(sem_trywait(blc_channel_event[i])==0);
if (errno!=EAGAIN) EXIT_ON_SYSTEM_ERROR(NULL);
blc_channel_event_callback(blc_channel_event_user_data);
}
}
void blc_channel_check_for_event(void (*callback)(void*), void *user_data) {
if (blc_channel_event[0]==NULL || blc_channel_event[1]==NULL) init_blc_channel_sem_event();
blc_channel_event_callback=callback;
blc_channel_event_user_data=user_data;
BLC_PTHREAD_CHECK(pthread_create(&thread, NULL, blc_channel_thread_manager, NULL), NULL);
}
END_EXTERN_C
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment