-
Arnaud Blanchard authoredArnaud Blanchard authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
blc_channel.py 5.14 KiB
import os
import ctypes
from ctypes import *
from ctypes.util import find_library
class BlcDim(Structure):
_fields_ = [("length", c_size_t), ("step", c_size_t)]
class BlcChannel:
"""
blc_channel wrapper.
"""
# Bind blc_channel methods
lib_path = find_library("blc_channel")
lib = CDLL(lib_path)
lib.blc_channel_new.restype = c_void_p
lib.blc_channel_delete.argtypes = [ c_void_p ]
lib.blc_channel_get_data.argtypes = [ c_void_p ]
lib.blc_channel_get_data.restype = c_void_p
lib.blc_channel_get_type.argtypes = [ c_void_p ]
lib.blc_channel_get_type.restype = c_uint32
lib.blc_channel_get_format.argtypes = [ c_void_p ]
lib.blc_channel_get_format.restype = c_uint32
lib.blc_channel_get_dims_nb.argtypes= [ c_void_p ]
lib.blc_channel_get_dims_nb.restype = c_int
lib.blc_channel_get_dims.argtypes = [ c_void_p ]
lib.blc_channel_wait_new_data.argtypes = [ c_void_p ]
lib.blc_channel_wait_ack_data.argtypes = [ c_void_p ]
lib.blc_channel_post_new_data.argtypes = [ c_void_p ]
lib.blc_channel_post_ack_data.argtypes = [ c_void_p ]
lib.blc_channel_remove.argtypes = [ c_void_p ]
types={ 'UIN8': c_uint8, 'INT8': c_int8, 'UI16': c_uint16, 'INT16': c_int16, 'UI32': c_uint32, 'IN32': c_int32, 'UI64': c_uint64, 'IN64': c_int64, 'FL32': c_float, 'FL64': c_double}
def __init__(
self, name, mode, data_type=None,format_type=None, dims=None
):
"""
Creates or reopens a given blc_channel.
:param name: Name of the blc_channel (with sync/async identifier). Example: "/data"
:param mode: RW mode. Example: os.O_RDWR
TODO
"""
self.name = name
self.mode = mode
self.type = data_type
self.format = format_type
self.dims = dims
if (dims): #Creates
dim_types = [c_int] * len(dims)
BlcChannel.lib.blc_channel_new.argtypes = [
c_char_p,
c_int,
c_uint32,
c_uint32,
c_int,
*dim_types
]
# Create a new channel
self.channel = BlcChannel.lib.blc_channel_new(
name.encode(),
mode,
int.from_bytes(data_type.encode(), "big"),
int.from_bytes(format_type.encode(), "big"),
len(dims),
*dims
)
else: #Open
BlcChannel.lib.blc_channel_new.argtypes = [
c_char_p,
c_int,
c_uint32,
c_uint32,
c_int,
c_int
]
self.channel = BlcChannel.lib.blc_channel_new(
name.encode(),
mode,
0,
0,
0,
0
)
self.type=BlcChannel.lib.blc_channel_get_type(self.channel).to_bytes(4, "big").decode()
self.format=BlcChannel.lib.blc_channel_get_format(self.channel).to_bytes(4, "big").decode()
dims_nb=BlcChannel.lib.blc_channel_get_dims_nb(self.channel)
BlcChannel.lib.blc_channel_get_dims.restype = POINTER(BlcDim * dims_nb)
blc_dims=BlcChannel.lib.blc_channel_get_dims(self.channel)
dims=[]
for i in range(dims_nb):
dims.append(blc_dims[0][i].length) #dims[i]=
# Specify types for channel
self.total_length=1
for length in dims:
self.total_length *= length
self.ctype = BlcChannel.types[self.type]
self.array = cast(BlcChannel.lib.blc_channel_get_data(self.channel), POINTER(self.ctype))
def wait_new_data(self):
BlcChannel.lib.blc_channel_wait_new_data(self.channel)
def wait_ack_data(self):
BlcChannel.lib.blc_channel_wait_ack_data(self.channel)
def post_new_data(self):
BlcChannel.lib.blc_channel_post_new_data(self.channel)
def post_ack_data(self):
BlcChannel.lib.blc_channel_post_ack_data(self.channel)
def remove(self):
BlcChannel.lib.blc_channel_remove(self.channel)
def __del__(self):
BlcChannel.lib.blc_channel_delete(self.channel) #Fre the local object but do not destroy the system shared memory
@property
def list(self):
#cast it from
data_list = list(self.array[:self.total_length])
return data_list
if __name__ == "__main__":
print()
print("EXEMPLE: blc_channel")
print("We create a blc_channel of type unsigned char and size 3x2 on the POSIX system (shared memory)")
channel = BlcChannel("/python_test", os.O_RDWR, 'UIN8', 'RGB3', [3, 2])
print("We write 7 in column 2")
channel.array[2]=7
print("We open the shared blc_channel to read it")
channel_read = BlcChannel("/python_test", os.O_RDONLY)
print("We check that we have the same values")
print("type",channel_read.type, "format", channel_read.format, "length", channel_read.total_length)
print(channel_read.array[:channel.total_length])
print("We remove our shared memory from the system")
channel.remove()