Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
blc_channel.py 5.14 KiB
import os
import ctypes
from ctypes import *
from ctypes.util import find_library


class BlcDim(Structure):
    """ctypes layout of one dimension record.

    Two ``c_size_t`` members, in declaration order: ``length`` then ``step``.
    """

    _fields_ = [
        ("length", c_size_t),
        ("step", c_size_t),
    ]

class BlcChannel:
    """
        ctypes wrapper around the native ``blc_channel`` shared library.

        The library is located and loaded once, when the class body is
        executed, and the prototypes of the functions used below are
        declared on the shared ``lib`` handle.

        Instance attributes set by ``__init__``:
            name, mode    -- the values given by the caller
            type, format  -- four-character codes (str)
            dims          -- dimension lengths
            total_length  -- product of all dimension lengths
            ctype         -- ctypes element type looked up in ``types``
            array         -- ctypes pointer to the channel data, typed as ``ctype``
            channel       -- opaque native handle returned by blc_channel_new
    """
    # Bind blc_channel methods
    lib_path = find_library("blc_channel")
    lib = CDLL(lib_path)
    lib.blc_channel_new.restype = c_void_p
    lib.blc_channel_delete.argtypes = [c_void_p]
    lib.blc_channel_get_data.argtypes = [c_void_p]
    lib.blc_channel_get_data.restype = c_void_p
    lib.blc_channel_get_type.argtypes = [c_void_p]
    lib.blc_channel_get_type.restype = c_uint32
    lib.blc_channel_get_format.argtypes = [c_void_p]
    lib.blc_channel_get_format.restype = c_uint32
    lib.blc_channel_get_dims_nb.argtypes = [c_void_p]
    lib.blc_channel_get_dims_nb.restype = c_int
    lib.blc_channel_get_dims.argtypes = [c_void_p]
    # Declared once here instead of being re-assigned on every open: a plain
    # pointer to the first BlcDim can be indexed for any number of dimensions.
    lib.blc_channel_get_dims.restype = POINTER(BlcDim)
    lib.blc_channel_wait_new_data.argtypes = [c_void_p]
    lib.blc_channel_wait_ack_data.argtypes = [c_void_p]
    lib.blc_channel_post_new_data.argtypes = [c_void_p]
    lib.blc_channel_post_ack_data.argtypes = [c_void_p]
    lib.blc_channel_remove.argtypes = [c_void_p]

    # Four-character type code -> ctypes element type.
    types = {
        'UIN8': c_uint8,
        'INT8': c_int8,
        'UI16': c_uint16,
        # NOTE(review): 'IN16' follows the UI16/IN32/IN64 naming pattern;
        # confirm the exact spelling against the blc headers.
        'IN16': c_int16,
        # Legacy five-character key, kept so existing lookups keep working.
        # It cannot be carried in a uint32, so __init__ maps it to 'IN16'.
        'INT16': c_int16,
        'UI32': c_uint32,
        'IN32': c_int32,
        'UI64': c_uint64,
        'IN64': c_int64,
        'FL32': c_float,
        'FL64': c_double,
    }

    # Accepted-but-invalid spellings and the code actually sent to the library.
    _legacy_types = {'INT16': 'IN16'}

    @staticmethod
    def _code_to_uint32(code):
        """Pack a text code of at most four bytes into a big-endian integer.

        :raises ValueError: if the encoded code is longer than four bytes
            (ctypes would otherwise silently truncate it to 32 bits).
        """
        raw = code.encode()
        if len(raw) > 4:
            raise ValueError(
                "blc code %r does not fit in 4 bytes" % (code,)
            )
        return int.from_bytes(raw, "big")

    @staticmethod
    def _uint32_to_code(value):
        """Unpack a uint32 returned by the library into its 4-character text."""
        return value.to_bytes(4, "big").decode()

    def __init__(
        self, name, mode, data_type=None, format_type=None, dims=None
    ):
        """
            Creates or reopens a given blc_channel.

            When ``dims`` is non-empty a channel is created with the given
            type, format and dimensions. Otherwise the channel is opened and
            its type, format and dimensions are read back from the library.

            :param name: Name of the blc_channel (with sync/async identifier). Example: "/data"
            :param mode: RW mode. Example: os.O_RDWR
            :param data_type: Key of ``BlcChannel.types`` (e.g. 'UIN8'). Required when ``dims`` is given.
            :param format_type: Four-character format code (e.g. 'RGB3'). Required when ``dims`` is given.
            :param dims: Sequence of dimension lengths (passed to the library as C ints).
            :raises ValueError: if ``dims`` is given without ``data_type``/``format_type``,
                or a code is longer than four bytes.
            :raises KeyError: if the channel type is not a key of ``BlcChannel.types``.
            :raises OSError: if the library returns a NULL channel handle.
        """
        self.name = name
        self.mode = mode
        self.type = data_type
        self.format = format_type
        self.dims = dims
        # Assigned before anything can fail so that __del__ always finds it.
        self.channel = None

        lib = BlcChannel.lib
        creating = bool(dims)

        if creating:
            if data_type is None or format_type is None:
                raise ValueError(
                    "data_type and format_type are required when dims is given"
                )
            # Map legacy spellings onto the code that fits in a uint32.
            data_type = BlcChannel._legacy_types.get(data_type, data_type)
            self.type = data_type
            type_code = BlcChannel._code_to_uint32(data_type)
            format_code = BlcChannel._code_to_uint32(format_type)
            dims_nb = len(dims)
            lengths = list(dims)
        else:
            # Open: zero type/format/dims_nb followed by a single zero length.
            type_code = 0
            format_code = 0
            dims_nb = 0
            lengths = [0]

        # The number of trailing length arguments varies per call, so the
        # prototype is rebuilt for each call.
        lib.blc_channel_new.argtypes = (
            [c_char_p, c_int, c_uint32, c_uint32, c_int]
            + [c_int] * len(lengths)
        )
        self.channel = lib.blc_channel_new(
            name.encode(), mode, type_code, format_code, dims_nb, *lengths
        )
        # A c_void_p restype yields None for a NULL pointer.
        if not self.channel:
            self.channel = None
            raise OSError("blc_channel_new returned NULL for %r" % (name,))

        if not creating:
            self.type = BlcChannel._uint32_to_code(
                lib.blc_channel_get_type(self.channel)
            )
            self.format = BlcChannel._uint32_to_code(
                lib.blc_channel_get_format(self.channel)
            )
            dims_nb = lib.blc_channel_get_dims_nb(self.channel)
            blc_dims = lib.blc_channel_get_dims(self.channel)
            dims = [blc_dims[i].length for i in range(dims_nb)]
            # Expose the dimensions read from the library to the caller.
            self.dims = dims

        # Specify types for channel
        self.total_length = 1
        for length in dims:
            self.total_length *= length

        self.ctype = BlcChannel.types[self.type]
        self.array = cast(
            lib.blc_channel_get_data(self.channel), POINTER(self.ctype)
        )

    def wait_new_data(self):
        """Call the library's blc_channel_wait_new_data on this channel."""
        BlcChannel.lib.blc_channel_wait_new_data(self.channel)

    def wait_ack_data(self):
        """Call the library's blc_channel_wait_ack_data on this channel."""
        BlcChannel.lib.blc_channel_wait_ack_data(self.channel)

    def post_new_data(self):
        """Call the library's blc_channel_post_new_data on this channel."""
        BlcChannel.lib.blc_channel_post_new_data(self.channel)

    def post_ack_data(self):
        """Call the library's blc_channel_post_ack_data on this channel."""
        BlcChannel.lib.blc_channel_post_ack_data(self.channel)

    def remove(self):
        """Call the library's blc_channel_remove on this channel."""
        BlcChannel.lib.blc_channel_remove(self.channel)

    def __del__(self):
        # Free the local object but do not destroy the system shared memory.
        # The handle may be missing or None if __init__ failed part-way.
        channel = getattr(self, "channel", None)
        if channel is not None:
            type(self).lib.blc_channel_delete(channel)

    @property
    def list(self):
        """Return the first ``total_length`` elements of ``array`` as a new list."""
        data_list = list(self.array[:self.total_length])
        return data_list

if __name__ == "__main__":
    # Demo: create a channel, write one element, reopen the same channel by
    # name, print what the second handle sees, then remove the channel.
    shm_name = "/python_test"

    print()
    print("EXEMPLE: blc_channel")
    print("We create a blc_channel of type unsigned char and size 3x2 on the POSIX system (shared memory)")
    writer = BlcChannel(shm_name, os.O_RDWR, 'UIN8', 'RGB3', [3, 2])

    print("We write 7 in column 2")
    writer.array[2] = 7

    print("We open the shared blc_channel to read it")
    reader = BlcChannel(shm_name, os.O_RDONLY)

    print("We check that we have the same values")
    print(f"type {reader.type} format {reader.format} length {reader.total_length}")
    # The slice length comes from the writer handle.
    element_count = writer.total_length
    print(reader.array[:element_count])

    print("We remove our shared memory from the system")
    writer.remove()