Skip to content

API Documentation

Communicator

Bases: CommunicatorService

Source code in src/cuemsutils/tools/CommunicatorServices.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
class Communicator(CommunicatorService):
    def __init__(
        self,
        address:str,
        communicator_service:CommunicatorService = NngRequestResponse,
        requester_dials:bool = True
    ):
        """
        Initialize Communicator instance with address and communicator service.

        Parameters:
        - address (str): The address to connect or listen for connections.
        - communicator_service (Callable[[str, bool], CommunicatorService]): The communicator service to use.
        - requester_dials (bool): If True, the instance will dial the address. If False, it will listen for connections.
        """
        try:
            check_path(address)
        except PermissionError as e:
            Logger.error(e)
            sys.exit(1)
        except FileNotFoundError:
            try:
                check_path(address, dir_only = True)
            except (NotADirectoryError, PermissionError) as e:
                Logger.error(e)
                sys.exit(1)

        if address[0:6] != 'ipc://':
            address = "ipc://" + address
        self.address = address
        self.requester_dials = requester_dials
        self.communicator_service:CommunicatorService = communicator_service(self.address, requester_dials=self.requester_dials)

    async def send_request(self, request):
        response = await self.communicator_service.send_request(request)
        return response

    async def reply(self, request_processor):
       await self.communicator_service.reply(request_processor)

    async def responder_connect(self):
        self._has_callable('responder_connect')
        await self.communicator_service.responder_connect()

    async def responder_get_request(self, callback):
        self._has_callable('responder_get_request')
        await self.communicator_service.responder_get_request(callback)

    async def responder_post_reply(self, response, context): 
        self._has_callable('responder_post_reply')
        await self.communicator_service.responder_post_reply(response, context)

    def _has_callable(self, name):
        """Check if communicator_service has callable attribute of the given name"""
        if not hasattr(self.communicator_service, name):
            raise AttributeError(f"{name} is not an attribute of {type(self.communicator_service)}")
        if not callable(getattr(self.communicator_service, name)):
            raise AttributeError(f"{name} is not a callable attribute of {type(self.communicator_service)}")

__init__(address, communicator_service=NngRequestResponse, requester_dials=True)

Initialize Communicator instance with address and communicator service.

Parameters: - address (str): The address to connect or listen for connections. - communicator_service (Callable[[str, bool], CommunicatorService]): The communicator service to use. - requester_dials (bool): If True, the instance will dial the address. If False, it will listen for connections.

Source code in src/cuemsutils/tools/CommunicatorServices.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def __init__(
    self,
    address:str,
    communicator_service:CommunicatorService = NngRequestResponse,
    requester_dials:bool = True
):
    """
    Initialize Communicator instance with address and communicator service.

    Parameters:
    - address (str): The address to connect or listen for connections.
    - communicator_service (Callable[[str, bool], CommunicatorService]): The communicator service to use.
    - requester_dials (bool): If True, the instance will dial the address. If False, it will listen for connections.
    """
    try:
        check_path(address)
    except PermissionError as e:
        Logger.error(e)
        sys.exit(1)
    except FileNotFoundError:
        try:
            check_path(address, dir_only = True)
        except (NotADirectoryError, PermissionError) as e:
            Logger.error(e)
            sys.exit(1)

    if address[0:6] != 'ipc://':
        address = "ipc://" + address
    self.address = address
    self.requester_dials = requester_dials
    self.communicator_service:CommunicatorService = communicator_service(self.address, requester_dials=self.requester_dials)

CommunicatorService

Bases: ABC

Source code in src/cuemsutils/tools/CommunicatorServices.py
20
21
22
23
24
25
26
27
28
29
30
31
class CommunicatorService(ABC):
    @abstractmethod
    def __init__(self, address:str):
        self.address = address

    @abstractmethod
    def send_request(self, request:dict) -> dict:
        """ Send request dic and return response dict  """

    @abstractmethod
    def reply(self, request_processor:Callable[[dict], dict]) -> dict:
        """ Get request, give it to request processor, and return the response from it  """

reply(request_processor) abstractmethod

Get request, give it to request processor, and return the response from it

Source code in src/cuemsutils/tools/CommunicatorServices.py
29
30
31
@abstractmethod
def reply(self, request_processor:Callable[[dict], dict]) -> dict:
    """ Get request, give it to request processor, and return the response from it  """

send_request(request) abstractmethod

Send request dic and return response dict

Source code in src/cuemsutils/tools/CommunicatorServices.py
25
26
27
@abstractmethod
def send_request(self, request:dict) -> dict:
    """ Send request dic and return response dict  """

IpcAddress

Bases: Enum

IPC addresses for the different services

Source code in src/cuemsutils/tools/CommunicatorServices.py
14
15
16
17
18
class IpcAddress(Enum):
    """ IPC addresses for the different services """
    HWDISCOVERY = '/tmp/hwdiscovery.ipc'
    NODECONF = '/tmp/nodeconf.ipc'
    EDITOR = '/tmp/editor.ipc'

NngRequestResponse

Bases: CommunicatorService

Communicates over NNG (nanomsg) using a Request-Response protocol

Source code in src/cuemsutils/tools/CommunicatorServices.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class NngRequestResponse(CommunicatorService):
    """ Communicates over NNG (nanomsg) using a Request-Response protocol"""
    def __init__(self, address, requester_dials=True):
        """
        Initialize NngRequestResponse instance with address and dialing/listening mode.

        Parameters:
        - address (str): The address to connect or listen for connections.
        - requester_dials (bool, optional): If True, the instance requester will dial the address and replier will listen. If False, it will be the opposite way, requester listens and replier dials. Default is True.

        The instance will set up the parameters for request and reply sockets based on the requester_dials value.
        """
        self.address = address
        if requester_dials:
            self.params_request = {'dial': self.address}
            self.params_reply = {'listen': self.address}
        else: 
            self.params_request = {'listen': self.address}
            self.params_reply = {'dial': self.address}

    async def send_request(self, request):
        """
        Send a request to the specified address and return the response.

        Parameters:
        - request (dict): The request to be sent. It should be a dictionary.

        Returns:
        - dict: The response received from the address. It will be a dictionary.
        """
        try:
            with Req0(**self.params_request) as socket:
                while await asyncio.sleep(0, result=True):
                    Logger.debug(f"Sending: {request}")
                    encoded_request = json.dumps(request).encode()
                    await socket.asend(encoded_request)
                    response = await self._get_response(socket)
                    decoded_response = json.loads(response.decode())
                    Logger.debug(f"receiving: {decoded_response}")
                    return decoded_response
        except Exception as e:
            Logger.error(f"Error occurred while sending request: {e}")
            return None

    async def _get_response(self, socket: Req0):
        """
        Get the response from the socket.

        Parameters:
        - socket (Req0): The socket to get the response from.

        Returns:
        - bytes: The response from the socket.
        """
        response = await socket.arecv()
        return response

    async def reply(self, request_processor):
        """
        Asynchronously handle incoming requests and respond using the provided request processor.

        This function sets up a Rep0 socket with parameters based on the instance's configuration.
        It then enters a loop where it listens for incoming requests, processes them using the provided
        request processor, and sends the response back to the requester.
        Parameters:
        - request_processor (Callable[[dict], dict]): A function that takes a request dictionary as input and returns a response dictionary.

        Returns:
        - None: This function is designed to run indefinitely, handling incoming requests and responses.
        """
        try:
            with Rep0(**self.params_reply) as socket:
                while await asyncio.sleep(0, result=True):
                    request = await socket.arecv()
                    decoded_request = json.loads(request.decode())  # Parse the JSON request
                    Logger.debug(f"Received: {decoded_request}")
                    if asyncio.iscoroutinefunction(request_processor):
                        response = await request_processor(decoded_request)
                    else:
                        loop = asyncio.get_event_loop()
                        response = await loop.run_in_executor(None, partial(request_processor, decoded_request))
                    await self._respond(socket, response)
        except Exception as e:
            Logger.error(f"Error occurred while handling request: {e}")

    async def _respond(self, socket, response):
        try:
            encoded_response = json.dumps(response).encode()
            Logger.debug(f"Sending: {encoded_response}")
            await socket.asend(encoded_response)
        except Exception as e:
            Logger.error(f"Error occurred while sending response: {e}")

    async def responder_connect(self):
        self.responder = Rep0(**self.params_reply)

    async def responder_get_request(self, callback):
        try:
            context = self.responder.new_context()
            request = await context.arecv()
            decoded_request = json.loads(request.decode())  # Parse the JSON request
            Logger.debug(f"Received: {decoded_request}")
            if asyncio.iscoroutinefunction(callback):
                Logger.debug(f"Calling callback function async")
                await callback(decoded_request, context)
            else:
                loop = asyncio.get_event_loop()
                Logger.debug(f"Calling sync callback function in executor")
                await loop.run_in_executor(None, partial(callback, decoded_request, context))
        except Exception as e:
            Logger.error(f"Error occurred while handling request: {e}")

    async def responder_post_reply(self, response, context):
        try:
            await self._respond(context, response)
        except Exception as e:
            Logger.error(f"Error occurred while sending response: {e}")
        finally:
            context.close()

__init__(address, requester_dials=True)

Initialize NngRequestResponse instance with address and dialing/listening mode.

Parameters: - address (str): The address to connect or listen for connections. - requester_dials (bool, optional): If True, the instance requester will dial the address and replier will listen. If False, it will be the opposite way, requester listens and replier dials. Default is True.

The instance will set up the parameters for request and reply sockets based on the requester_dials value.

Source code in src/cuemsutils/tools/CommunicatorServices.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(self, address, requester_dials=True):
    """
    Initialize NngRequestResponse instance with address and dialing/listening mode.

    Parameters:
    - address (str): The address to connect or listen for connections.
    - requester_dials (bool, optional): If True, the instance requester will dial the address and replier will listen. If False, it will be the opposite way, requester listens and replier dials. Default is True.

    The instance will set up the parameters for request and reply sockets based on the requester_dials value.
    """
    self.address = address
    if requester_dials:
        self.params_request = {'dial': self.address}
        self.params_reply = {'listen': self.address}
    else: 
        self.params_request = {'listen': self.address}
        self.params_reply = {'dial': self.address}

reply(request_processor) async

Asynchronously handle incoming requests and respond using the provided request processor.

This function sets up a Rep0 socket with parameters based on the instance's configuration. It then enters a loop where it listens for incoming requests, processes them using the provided request processor, and sends the response back to the requester. Parameters: - request_processor (Callable[[dict], dict]): A function that takes a request dictionary as input and returns a response dictionary.

Returns: - None: This function is designed to run indefinitely, handling incoming requests and responses.

Source code in src/cuemsutils/tools/CommunicatorServices.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
async def reply(self, request_processor):
    """
    Asynchronously handle incoming requests and respond using the provided request processor.

    This function sets up a Rep0 socket with parameters based on the instance's configuration.
    It then enters a loop where it listens for incoming requests, processes them using the provided
    request processor, and sends the response back to the requester.
    Parameters:
    - request_processor (Callable[[dict], dict]): A function that takes a request dictionary as input and returns a response dictionary.

    Returns:
    - None: This function is designed to run indefinitely, handling incoming requests and responses.
    """
    try:
        with Rep0(**self.params_reply) as socket:
            while await asyncio.sleep(0, result=True):
                request = await socket.arecv()
                decoded_request = json.loads(request.decode())  # Parse the JSON request
                Logger.debug(f"Received: {decoded_request}")
                if asyncio.iscoroutinefunction(request_processor):
                    response = await request_processor(decoded_request)
                else:
                    loop = asyncio.get_event_loop()
                    response = await loop.run_in_executor(None, partial(request_processor, decoded_request))
                await self._respond(socket, response)
    except Exception as e:
        Logger.error(f"Error occurred while handling request: {e}")

send_request(request) async

Send a request to the specified address and return the response.

Parameters: - request (dict): The request to be sent. It should be a dictionary.

Returns: - dict: The response received from the address. It will be a dictionary.

Source code in src/cuemsutils/tools/CommunicatorServices.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
async def send_request(self, request):
    """
    Send a request to the specified address and return the response.

    Parameters:
    - request (dict): The request to be sent. It should be a dictionary.

    Returns:
    - dict: The response received from the address. It will be a dictionary.
    """
    try:
        with Req0(**self.params_request) as socket:
            while await asyncio.sleep(0, result=True):
                Logger.debug(f"Sending: {request}")
                encoded_request = json.dumps(request).encode()
                await socket.asend(encoded_request)
                response = await self._get_response(socket)
                decoded_response = json.loads(response.decode())
                Logger.debug(f"receiving: {decoded_response}")
                return decoded_response
    except Exception as e:
        Logger.error(f"Error occurred while sending request: {e}")
        return None

ConfigBase

Source code in src/cuemsutils/tools/ConfigBase.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class ConfigBase():
    def __init__(self, config_dir: str):
        self.load_base_settings(config_dir)

    @logged
    def load_base_settings(self, base_dir: str):
        try:
            dir = environ['CUEMS_CONF_PATH']
        except KeyError:
            dir = base_dir
        self.config_dir = dir

        try:
            settings = Settings(self.conf_path('settings.xml'))
            self.settings = settings.get_dict()
        except Exception as e:
            Logger.exception(f'Exception catched while loading settings: {e}')
            raise e

    # HELPER FUNCTIONS #
    def conf_path(self, file_name: str) -> str:
        """
        Returns the path to the configuration file.

        Args:
            file_name (str): The name of the file to be checked.

        Returns:
            str: The path to the configuration file.

        Raises:
            FileNotFoundError: If the configuration file does not exist.
        """
        conf_path = path.join(self.config_dir, file_name)
        if not path.exists(conf_path):
            raise FileNotFoundError(f'Configuration file {conf_path} not found')
        return conf_path

    def set_dir_hierarchy(self) -> None:
        """
        Sets the directory hierarchy for the library path.
        """      
        dirs = [
            'projects',
            'media',
            path.join('media', 'waveforms'),
            path.join('media', 'thumbnails')
        ]
        trash = [path.join('trash', i) for i in dirs]
        dirs.extend(trash)

        paths_to_check = [path.join(self.library_path, i) for i in dirs]
        paths_to_check.append(self.tmp_path)

        try:
            for each_path in paths_to_check:
                mkdir_recursive(each_path)
        except Exception as e:
            Logger.error("error: {} {}".format(type(e), e))

    # CLASS PROPERTIES #
    @property
    def config_dir(self):
        return self._config_dir

    @config_dir.setter
    def config_dir(self, value: str):
        if not path.exists(value):
            raise FileNotFoundError(f'Configuration directory {value} not found')
        self._config_dir = value

    @property
    def library_path(self):
        return self.settings['library_path']

    @property
    def tmp_path(self):
        return self.settings['tmp_path']

    @property
    def database_name(self):
        return self.settings['database_name']

    @property
    def show_lock_file(self):
        return self.settings['show_lock_file']

    @property
    def editor_url(self):
        return self.settings['editor_url']

    @property
    def controller_url(self):
        return self.settings['controller_url']

    @property
    def templates_path(self):
        return self.settings['templates_path']

    @property
    def controller_interfaces_template(self):
        return self.settings['controller_interfaces_template']

    @property
    def node_interfaces_template(self):
        return self.settings['node_interfaces_template']

    @property
    def controller_lock_file(self):
        return self.settings['controller_lock_file']

    @property
    def node_conf(self):
        return self.settings['node']

    @property
    def node_uuid(self):
        return self.node_conf['uuid']

    @property
    def host_name(self):
        return f"{self.node_uuid.split('-')[-1]}.local"

    @property
    def node_url(self):
        return f'http://{self.host_name}'

    @property
    def osc_initial_port(self):
        return self.node_conf['osc_in_port_base']

conf_path(file_name)

Returns the path to the configuration file.

Parameters:

Name Type Description Default
file_name str

The name of the file to be checked.

required

Returns:

Name Type Description
str str

The path to the configuration file.

Raises:

Type Description
FileNotFoundError

If the configuration file does not exist.

Source code in src/cuemsutils/tools/ConfigBase.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def conf_path(self, file_name: str) -> str:
    """
    Returns the path to the configuration file.

    Args:
        file_name (str): The name of the file to be checked.

    Returns:
        str: The path to the configuration file.

    Raises:
        FileNotFoundError: If the configuration file does not exist.
    """
    conf_path = path.join(self.config_dir, file_name)
    if not path.exists(conf_path):
        raise FileNotFoundError(f'Configuration file {conf_path} not found')
    return conf_path

set_dir_hierarchy()

Sets the directory hierarchy for the library path.

Source code in src/cuemsutils/tools/ConfigBase.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def set_dir_hierarchy(self) -> None:
    """
    Sets the directory hierarchy for the library path.
    """      
    dirs = [
        'projects',
        'media',
        path.join('media', 'waveforms'),
        path.join('media', 'thumbnails')
    ]
    trash = [path.join('trash', i) for i in dirs]
    dirs.extend(trash)

    paths_to_check = [path.join(self.library_path, i) for i in dirs]
    paths_to_check.append(self.tmp_path)

    try:
        for each_path in paths_to_check:
            mkdir_recursive(each_path)
    except Exception as e:
        Logger.error("error: {} {}".format(type(e), e))

ConfigManager

Bases: ConfigBase

Source code in src/cuemsutils/tools/ConfigManager.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
class ConfigManager(ConfigBase):
    def __init__(self, config_dir: str = CUEMS_CONF_PATH, load_all: bool = True):
        """
        ConfigManager constructor.
        This class is responsible for loading the configuration files and providing
        the configuration data to the rest of the application.

        It also provides methods to check the project files and to load them on demand.

        If load_all is True, the configuration files will be loaded and the configuration
        will be available for the rest of the application on object initialization.
        If load_all is False, the configuration will be loaded on demand.

        Base configuration directory is set to /etc/cuems/ by default.
        If the environment variable CUEMS_CONF_PATH is set, it will be used instead.
        If config_dir parameter is set, it will override the default value.

        Specifically, base configuration directory precedence is:
        - Environment variable CUEMS_CONF_PATH
        - config_dir parameter
        - /etc/cuems/ (i.e. CUEMS_CONF_PATH constant value) (default value)

        Args:
            config_dir (str): The directory containing the configuration files.
            load_all (bool): Whether to load all the configuration files.

        Raises:
            Exception: If the configuration files are not found.
        """
        # Initialize with default values
        self.project_name = ''
        self.using_default_mappings = False
        self.network_map = {}
        self.network_mappings = {}
        self.node_mappings = {}
        self.node_hw_outputs = {
            'audio_inputs':[],
            'audio_outputs':[],
            'video_inputs':[],
            'video_outputs':[],
            'dmx_inputs':[],
            'dmx_outputs':[]
        }
        super().__init__(config_dir)

        if load_all:
            self.load_config()

    @property
    def network_map(self):
        return self._network_map

    @network_map.setter
    def network_map(self, value: dict[str, Any]):
        self._network_map = value

    @property
    def node_network_map(self):
        return self._node_network_map

    @node_network_map.setter
    def node_network_map(self, value: NetworkMap | dict):
        if isinstance(value, NetworkMap):
            self._node_network_map = value.get_node(self.node_uuid)
        else:
            self._node_network_map = value

    @property
    def mappings(self):
        return self._mappings

    @mappings.setter
    def mappings(self, value: dict[str, Any]):
        self._mappings = value

    @property
    def node_mappings(self):
        return self._node_mappings

    @node_mappings.setter
    def node_mappings(self, value: ProjectMappings | dict[str, Any]):
        if isinstance(value, ProjectMappings):
            self._node_mappings = value.get_node(self.node_uuid)
        else:
            self._node_mappings = value

    @logged
    def load_config(self) -> None:
        """
        Loads the system configuration.
        """
        # Initialize with empty values
        self.network_map = {}
        self.network_mappings = {}
        self.node_mappings = {}
        self.node_hw_outputs = {
            'audio_inputs':[],
            'audio_outputs':[],
            'video_inputs':[],
            'video_outputs':[],
            'dmx_inputs':[],
            'dmx_outputs':[]
        }

        self.set_dir_hierarchy()
        self.load_network_map()
        self.load_net_and_node_mappings()

    def load_network_map(self):
        """
        Loads the network map from the base configuration file.
        """
        try:
            netmap = NetworkMap(self.conf_path('network_map.xml'))
            self.network_map = netmap.get_dict()
            self.node_network_map = netmap
        except Exception as e:
            Logger.exception(f'Exception catched while loading network map: {e}')
            raise e

    def load_net_and_node_mappings(self):
        """
        Loads the network and node mappings.
        """
        try:
            mappings_file = self.project_path(self.project_name, 'mappings.xml')
        except FileNotFoundError as e:
            mappings_file = self.conf_path('default_mappings.xml')

        try:
            project_mappings = ProjectMappings(mappings_file)
            self.mappings = project_mappings.processed # type: ignore[attr-defined]
        except Exception as e:
            Logger.exception(f'Exception catched while loading mappings file: {e}')
            raise e

        self.node_mappings = project_mappings.get_node(self.node_conf['uuid'])
        Logger.debug(f"Node uuid is: {self.node_conf['uuid']}")
        # Build node_hw_outputs: the physical port name (mapped_to) is what the
        # engine needs (e.g. the JACK port for audio, DRM connector for video).
        # <name> is now a human-readable label; <mapped_to> is the real target.
        # Fall back to <name> for legacy entries that have no mappings.
        # e.g: node_hw_outputs["audio_outputs"] = ["system:playback_1", "system:playback_2"]
        for section, content in self.node_mappings.items():
            if isinstance(content, list):
                for port_type_dict in content:
                    for port_types, port_type_list in port_type_dict.items():
                        for port in port_type_list:
                            for port_type, port_type_content in port.items():
                                mappings = port_type_content.get('mappings', [])
                                if mappings:
                                    hw_name = mappings[0]['mapped_to']
                                else:
                                    hw_name = port_type_content['name']
                                self.node_hw_outputs[section+'_'+port_types].append(hw_name)

        Logger.debug(f"Node hardware outputs are: {self.node_hw_outputs}")

    @logged
    def load_project_config(self, project_uname: str) -> None:
        """
        Loads the project configuration.

        Args:
            project_uname (str): The name of the project.
        """
        ## Initialize with empty values
        self.project_conf = {}
        self.project_mappings = {}
        self.project_node_mappings = {}
        self.project_default_outputs = {}

        self.project_name = project_uname

        self.load_project_settings(project_uname)
        self.load_project_mappings(project_uname)

    def load_project_settings(self, project_uname: str):
        """
        Loads the project settings from the project file.
        """
        try:
            settings_path = self.project_path(project_uname, 'settings.xml')
            conf = ProjectSettings(
                schema='project_settings',
                xmlfile=settings_path
            )
        except FileNotFoundError as e:
            Logger.info(
                f'Project {project_uname} settings not found. Keeping default settings.'
            )
            return
        except Exception as e:
            Logger.exception(f'Exception in load_project_settings: {e}')
            raise e

        self.project_conf = conf.get_dict()
        for key, value in self.project_conf.items():
            corrected_dict = {}
            if value:
                for item in value:
                    corrected_dict.update(item)
                self.project_conf[key] = corrected_dict

        Logger.info(f'Project {project_uname} settings loaded')

    def load_project_mappings(self, project_uname: str):
        """
        Loads the project mappings from the project file.
        """
        try:
            mappings_path = self.project_path(project_uname, 'mappings.xml')
            project_mappings = ProjectMappings(mappings_path)
            self.project_mappings = project_mappings.processed
            try:
                self.project_node_mappings = project_mappings.get_node(self.node_uuid)
            except ValueError:
                Logger.warning(
                    f'No mappings assigned for this node in project {project_uname}'
                )
        except FileNotFoundError as e:
            Logger.info(f'Project mappings not found. Adopting default mappings.')
            self.project_mappings = self.mappings
            self.project_node_mappings = self.node_mappings
        except Exception as e:
            Logger.exception(f'Exception in load_project_mappings: {e}')
            raise e

        self.number_of_nodes = int(self.mappings['number_of_nodes']) # type: ignore[index]
        Logger.info(f'Project {project_uname} mappings loaded')

    def get_video_output_id(self, mapping_name: str):
        """
        Returns the video output id for the given mapping name.
        """
        if mapping_name == 'default':
            return self.node_conf['default_video_output']
        else:
            if 'outputs' in self.project_node_mappings['video'].keys():
                for each_out in self.project_node_mappings['video']['outputs']:
                    for each_map in each_out['mappings']:
                        if mapping_name == each_map['mapped_to']:
                            return each_out['name']

        raise Exception(f'Video output wrongly mapped')

    def get_audio_output_id(self, mapping_name: str):
        """
        Returns the audio output id for the given mapping name.
        """
        if mapping_name == 'default':
            return self.node_conf['default_audio_output']
        else:
            for each_out in self.project_mappings['audio']['outputs']: # type: ignore[index]
                for each_map in each_out[0]['mappings']:
                    if mapping_name == each_map['mapped_to']:
                        return each_out[0]['name']

        raise Exception(f'Audio output wrongly mapped')

    def check_project_mappings(self) -> bool:
        """
        Checks if the project mappings are correct.
        """
        if self.using_default_mappings:
            return True

        nodes_to_check = [self.project_node_mappings]
        for node in nodes_to_check:
            for area, contents in node.items():
                if isinstance(contents, dict):
                    for section, elements in contents.items():
                        for element in elements:
                            if element['name'] not in self.node_hw_outputs[f'{area}_{section}']:
                                err_str = f'Project {area} {section} mapping incorrect: {element["name"]} not present in node: {self.node_conf["uuid"]}'
                                Logger.error(err_str)
                                raise Exception(err_str)
        return True

    ## helper functions
    def project_path(self, project_uname: str, file_name: str) -> str:
        """
        Returns the path to the project file if it exists.

        Args:
            project_uname (str): The name of the project.
            file_name (str): The name of the file to be checked.

        Returns:
            str: The path to the project file.

        Raises:
            FileNotFoundError: If the project file does not exist.
        """
        project_path = path.join(self.library_path, 'projects', project_uname, file_name)
        if not path.exists(project_path):
            raise FileNotFoundError(f'Project file {project_path} not found')
        return project_path

__init__(config_dir=CUEMS_CONF_PATH, load_all=True)

ConfigManager constructor. This class is responsible for loading the configuration files and providing the configuration data to the rest of the application.

It also provides methods to check the project files and to load them on demand.

If load_all is True, the configuration files will be loaded and the configuration will be available for the rest of the application on object initialization. If load_all is False, the configuration will be loaded on demand.

Base configuration directory is set to /etc/cuems/ by default. If the environment variable CUEMS_CONF_PATH is set, it will be used instead. If config_dir parameter is set, it will override the default value.

Specifically, base configuration directory precedence is: - Environment variable CUEMS_CONF_PATH - config_dir parameter - /etc/cuems/ (i.e. CUEMS_CONF_PATH constant value) (default value)

Parameters:

Name Type Description Default
config_dir str

The directory containing the configuration files.

CUEMS_CONF_PATH
load_all bool

Whether to load all the configuration files.

True

Raises:

Type Description
Exception

If the configuration files are not found.

Source code in src/cuemsutils/tools/ConfigManager.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def __init__(self, config_dir: str = CUEMS_CONF_PATH, load_all: bool = True):
    """
    ConfigManager constructor.
    This class is responsible for loading the configuration files and providing
    the configuration data to the rest of the application.

    It also provides methods to check the project files and to load them on demand.

    If load_all is True, the configuration files will be loaded and the configuration
    will be available for the rest of the application on object initialization.
    If load_all is False, the configuration will be loaded on demand.

    Base configuration directory is set to /etc/cuems/ by default.
    If the environment variable CUEMS_CONF_PATH is set, it will be used instead.
    If config_dir parameter is set, it will override the default value.

    Specifically, base configuration directory precedence is:
    - Environment variable CUEMS_CONF_PATH
    - config_dir parameter
    - /etc/cuems/ (i.e. CUEMS_CONF_PATH constant value) (default value)

    Args:
        config_dir (str): The directory containing the configuration files.
        load_all (bool): Whether to load all the configuration files.

    Raises:
        Exception: If the configuration files are not found.
    """
    # Initialize with default values
    self.project_name = ''
    self.using_default_mappings = False
    self.network_map = {}
    self.network_mappings = {}
    self.node_mappings = {}
    self.node_hw_outputs = {
        'audio_inputs':[],
        'audio_outputs':[],
        'video_inputs':[],
        'video_outputs':[],
        'dmx_inputs':[],
        'dmx_outputs':[]
    }
    super().__init__(config_dir)

    if load_all:
        self.load_config()

check_project_mappings()

Checks if the project mappings are correct.

Source code in src/cuemsutils/tools/ConfigManager.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
def check_project_mappings(self) -> bool:
    """
    Checks if the project mappings are correct.
    """
    if self.using_default_mappings:
        return True

    nodes_to_check = [self.project_node_mappings]
    for node in nodes_to_check:
        for area, contents in node.items():
            if isinstance(contents, dict):
                for section, elements in contents.items():
                    for element in elements:
                        if element['name'] not in self.node_hw_outputs[f'{area}_{section}']:
                            err_str = f'Project {area} {section} mapping incorrect: {element["name"]} not present in node: {self.node_conf["uuid"]}'
                            Logger.error(err_str)
                            raise Exception(err_str)
    return True

get_audio_output_id(mapping_name)

Returns the audio output id for the given mapping name.

Source code in src/cuemsutils/tools/ConfigManager.py
256
257
258
259
260
261
262
263
264
265
266
267
268
def get_audio_output_id(self, mapping_name: str):
    """
    Returns the audio output id for the given mapping name.
    """
    if mapping_name == 'default':
        return self.node_conf['default_audio_output']
    else:
        for each_out in self.project_mappings['audio']['outputs']: # type: ignore[index]
            for each_map in each_out[0]['mappings']:
                if mapping_name == each_map['mapped_to']:
                    return each_out[0]['name']

    raise Exception(f'Audio output wrongly mapped')

get_video_output_id(mapping_name)

Returns the video output id for the given mapping name.

Source code in src/cuemsutils/tools/ConfigManager.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
def get_video_output_id(self, mapping_name: str):
    """
    Returns the video output id for the given mapping name.
    """
    if mapping_name == 'default':
        return self.node_conf['default_video_output']
    else:
        if 'outputs' in self.project_node_mappings['video'].keys():
            for each_out in self.project_node_mappings['video']['outputs']:
                for each_map in each_out['mappings']:
                    if mapping_name == each_map['mapped_to']:
                        return each_out['name']

    raise Exception(f'Video output wrongly mapped')

load_config()

Loads the system configuration.

Source code in src/cuemsutils/tools/ConfigManager.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
@logged
def load_config(self) -> None:
    """
    Loads the system configuration.
    """
    # Initialize with empty values
    self.network_map = {}
    self.network_mappings = {}
    self.node_mappings = {}
    self.node_hw_outputs = {
        'audio_inputs':[],
        'audio_outputs':[],
        'video_inputs':[],
        'video_outputs':[],
        'dmx_inputs':[],
        'dmx_outputs':[]
    }

    self.set_dir_hierarchy()
    self.load_network_map()
    self.load_net_and_node_mappings()

load_net_and_node_mappings()

Loads the network and node mappings.

Source code in src/cuemsutils/tools/ConfigManager.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def load_net_and_node_mappings(self):
    """
    Loads the network and node mappings.
    """
    try:
        mappings_file = self.project_path(self.project_name, 'mappings.xml')
    except FileNotFoundError as e:
        mappings_file = self.conf_path('default_mappings.xml')

    try:
        project_mappings = ProjectMappings(mappings_file)
        self.mappings = project_mappings.processed # type: ignore[attr-defined]
    except Exception as e:
        Logger.exception(f'Exception catched while loading mappings file: {e}')
        raise e

    self.node_mappings = project_mappings.get_node(self.node_conf['uuid'])
    Logger.debug(f"Node uuid is: {self.node_conf['uuid']}")
    # Build node_hw_outputs: the physical port name (mapped_to) is what the
    # engine needs (e.g. the JACK port for audio, DRM connector for video).
    # <name> is now a human-readable label; <mapped_to> is the real target.
    # Fall back to <name> for legacy entries that have no mappings.
    # e.g: node_hw_outputs["audio_outputs"] = ["system:playback_1", "system:playback_2"]
    for section, content in self.node_mappings.items():
        if isinstance(content, list):
            for port_type_dict in content:
                for port_types, port_type_list in port_type_dict.items():
                    for port in port_type_list:
                        for port_type, port_type_content in port.items():
                            mappings = port_type_content.get('mappings', [])
                            if mappings:
                                hw_name = mappings[0]['mapped_to']
                            else:
                                hw_name = port_type_content['name']
                            self.node_hw_outputs[section+'_'+port_types].append(hw_name)

    Logger.debug(f"Node hardware outputs are: {self.node_hw_outputs}")

load_network_map()

Loads the network map from the base configuration file.

Source code in src/cuemsutils/tools/ConfigManager.py
118
119
120
121
122
123
124
125
126
127
128
def load_network_map(self):
    """
    Loads the network map from the base configuration file.
    """
    try:
        netmap = NetworkMap(self.conf_path('network_map.xml'))
        self.network_map = netmap.get_dict()
        self.node_network_map = netmap
    except Exception as e:
        Logger.exception(f'Exception catched while loading network map: {e}')
        raise e

load_project_config(project_uname)

Loads the project configuration.

Parameters:

Name Type Description Default
project_uname str

The name of the project.

required
Source code in src/cuemsutils/tools/ConfigManager.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
@logged
def load_project_config(self, project_uname: str) -> None:
    """
    Loads the project configuration.

    Args:
        project_uname (str): The name of the project.
    """
    ## Initialize with empty values
    self.project_conf = {}
    self.project_mappings = {}
    self.project_node_mappings = {}
    self.project_default_outputs = {}

    self.project_name = project_uname

    self.load_project_settings(project_uname)
    self.load_project_mappings(project_uname)

load_project_mappings(project_uname)

Loads the project mappings from the project file.

Source code in src/cuemsutils/tools/ConfigManager.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def load_project_mappings(self, project_uname: str):
    """
    Loads the project mappings from the project file.
    """
    try:
        mappings_path = self.project_path(project_uname, 'mappings.xml')
        project_mappings = ProjectMappings(mappings_path)
        self.project_mappings = project_mappings.processed
        try:
            self.project_node_mappings = project_mappings.get_node(self.node_uuid)
        except ValueError:
            Logger.warning(
                f'No mappings assigned for this node in project {project_uname}'
            )
    except FileNotFoundError as e:
        Logger.info(f'Project mappings not found. Adopting default mappings.')
        self.project_mappings = self.mappings
        self.project_node_mappings = self.node_mappings
    except Exception as e:
        Logger.exception(f'Exception in load_project_mappings: {e}')
        raise e

    self.number_of_nodes = int(self.mappings['number_of_nodes']) # type: ignore[index]
    Logger.info(f'Project {project_uname} mappings loaded')

load_project_settings(project_uname)

Loads the project settings from the project file.

Source code in src/cuemsutils/tools/ConfigManager.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def load_project_settings(self, project_uname: str):
    """
    Loads the project settings from the project file.
    """
    try:
        settings_path = self.project_path(project_uname, 'settings.xml')
        conf = ProjectSettings(
            schema='project_settings',
            xmlfile=settings_path
        )
    except FileNotFoundError as e:
        Logger.info(
            f'Project {project_uname} settings not found. Keeping default settings.'
        )
        return
    except Exception as e:
        Logger.exception(f'Exception in load_project_settings: {e}')
        raise e

    self.project_conf = conf.get_dict()
    for key, value in self.project_conf.items():
        corrected_dict = {}
        if value:
            for item in value:
                corrected_dict.update(item)
            self.project_conf[key] = corrected_dict

    Logger.info(f'Project {project_uname} settings loaded')

project_path(project_uname, file_name)

Returns the path to the project file if it exists.

Parameters:

Name Type Description Default
project_uname str

The name of the project.

required
file_name str

The name of the file to be checked.

required

Returns:

Name Type Description
str str

The path to the project file.

Raises:

Type Description
FileNotFoundError

If the project file does not exist.

Source code in src/cuemsutils/tools/ConfigManager.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def project_path(self, project_uname: str, file_name: str) -> str:
    """
    Returns the path to the project file if it exists.

    Args:
        project_uname (str): The name of the project.
        file_name (str): The name of the file to be checked.

    Returns:
        str: The path to the project file.

    Raises:
        FileNotFoundError: If the project file does not exist.
    """
    project_path = path.join(self.library_path, 'projects', project_uname, file_name)
    if not path.exists(project_path):
        raise FileNotFoundError(f'Project file {project_path} not found')
    return project_path

CTimecode — cuems wrapper around upstream timecode 1.5.1.

Type contract — three accessors for "milliseconds of the position":

  • .milliseconds_exact: float Mathematical answer (frame_number * 1000 / _int_framerate). Use for precision-sensitive math (offset calc, scheduler) where sub-ms accuracy matters at fractional framerates (29.97/23.976).

  • .milliseconds_rounded: int round(.milliseconds_exact). Use for sleep durations, integer CLI args, polling comparisons, dict/set keys, arithmetic with other ints.

  • .milliseconds: int (DEPRECATED — alias of .milliseconds_rounded) Emits DeprecationWarning. Will be removed at the first stable release. Migrate every call-site to one of the explicit names above.

Semantics: cuems uses playhead semantics — at MTC position T, .milliseconds_exact == T*1000. Upstream timecode 1.5.1 ships exposure-window semantics (a frame represents 1/FPS of elapsed exposure; 00:00:00:00 is the END of frame 1). The wrapper canonicalizes by routing all start_seconds= construction through upstream's tc_to_frames (HMSF string), which handles drop-frame correction at 29.97/59.94 DF correctly.

CTimecode

Bases: Timecode

SMPTE timecode with playhead semantics over upstream timecode 1.5.1.

See module docstring for the .milliseconds / .milliseconds_rounded / .milliseconds_exact precision split contract.

Source code in src/cuemsutils/tools/CTimecode.py
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
class CTimecode(Timecode):
    """SMPTE timecode with playhead semantics over upstream `timecode` 1.5.1.

    See module docstring for the .milliseconds / .milliseconds_rounded /
    .milliseconds_exact precision split contract.
    """

    def __init__(
        self,
        init_dict=None,
        start_timecode=None,
        start_seconds=None,
        frames=None,
        framerate: str | int = "ms",
    ):
        if init_dict is not None:
            super().__init__(framerate, init_dict, start_seconds, frames)
            return

        # Detect the start_seconds-only path BEFORE neutralizing 0, so we
        # know whether to apply playhead-semantics canonicalization.
        # Upstream's float_to_tc returns int(s * _int_framerate)
        # (exposure-window); tc_to_frames returns frame_number+1 and
        # applies drop-frame correction at 29.97/59.94 DF. cuems needs
        # playhead semantics — at MTC position T, .milliseconds_exact ==
        # T*1000 — and must agree with the start_timecode path at every
        # framerate including DF.
        canonicalize = (
            start_seconds is not None
            and start_seconds != 0
            and start_timecode is None
            and frames is None
        )
        original_start_seconds = start_seconds

        if canonicalize:
            # Skip upstream's start_seconds path entirely — it runs
            # `self.frames = float_to_tc(s) = int(s * _int_framerate)` which
            # raises if `int(s*ifps) == 0` (e.g., s=0.03125 at 24fps gives
            # int(0.75)=0, hitting the frames>0 setter guard). We're going to
            # overwrite frames ourselves anyway, so initialize to default
            # ('00:00:00:00' → frames=1) just to set up framerate machinery.
            super().__init__(framerate)
        else:
            # Upstream raises ValueError on start_seconds=0; let the default
            # '00:00:00:00' path handle it (frames=1 == playhead at t=0).
            if start_seconds == 0:
                start_seconds = None
                frames = None
            super().__init__(framerate, start_timecode, start_seconds, frames)

        if canonicalize:
            # Convert seconds to HMSF and delegate to tc_to_frames — correct
            # at DF, non-DF, and ms framerates uniformly. Verified empirically
            # that `round(s * _int_framerate) + 1` is wrong at 29.97 DF
            # (gives 1801 at 1min vs upstream's 1799; off by 18 frames at
            # 10min). The HMSF route inherits upstream's drop-frame logic.
            total_frames = round(original_start_seconds * self._int_framerate)
            ifps = self._int_framerate
            h, rem = divmod(total_frames, ifps * 3600)
            m, rem = divmod(rem, ifps * 60)
            s, f = divmod(rem, ifps)
            # tc_to_frames inserts the drop-frame `;` separator itself when
            # self.drop_frame is True (upstream timecode.py line 277-278), so
            # always pass `:` here.
            tc_str = f"{h:02d}:{m:02d}:{s:02d}:{f:02d}"
            self.frames = self.tc_to_frames(tc_str)

    @classmethod
    def from_dict(cls, init_dict):
        return cls(init_dict=init_dict)

    # ------------------------------------------------------------------
    # framerate getter override (Option D)
    # ------------------------------------------------------------------
    @property
    def framerate(self):
        """Canonical framerate: int for SMPTE, float for fractional, int 1000 for ms.

        Upstream stores integer SMPTE framerates as strings (``'25'``, ``'30'``)
        but ``'ms'``/``'1000'`` as int 1000 — making ``tc.framerate == 25``
        silently False. This getter normalizes the return type so callers can
        compare against numeric literals reliably.
        """
        fr = self._framerate
        if isinstance(fr, str):
            try:
                return int(fr)
            except ValueError:
                return float(fr)
        return fr

    @framerate.setter
    def framerate(self, value):
        # Delegate to upstream's setter for all the framerate normalization
        # (NTSC detection, ms_frame flag, _int_framerate computation, drop_frame).
        Timecode.framerate.fset(self, value)

    # ------------------------------------------------------------------
    # milliseconds — precision split (V2 deprecation cycle)
    # ------------------------------------------------------------------
    @property
    def milliseconds_exact(self) -> float:
        """Time as exact-precision milliseconds (float).

        The mathematical answer: ``frame_number * 1000 / _int_framerate``.
        At integer framerates (24, 25, 30, ms) this is always a whole number
        as a float (e.g., 25fps frame 31 → 1200.0). At fractional framerates
        (29.97, 23.976) the float carries the true sub-ms value (29.97fps
        frame 31 → 1001.001001...) — this is the precision the old
        int-truncating ``.milliseconds`` was throwing away.

        **Use for:**

        - precision-sensitive math (BaseEngine.go_offset calc, scheduler
          drift compensation, MTC bias measurement)
        - any place where adding/subtracting milliseconds across many frames
          at fractional framerates would accumulate truncation drift
        - comparison against expected exact values in tests
          (``pytest.approx(expected, abs=1e-9)``)

        **Do NOT use for:**

        - sleep durations (``time.sleep`` accepts float but ``_rounded``
          better signals integer-ms intent)
        - CLI args expecting integer ms
        - dict/set keys (float equality is fragile)
        - arithmetic with other ints (float contamination)

        Resolves the ``# TODO: float math for other framerates`` from the
        pre-869cyndtv code.

        Note on framerate divisor: uses ``float(self.framerate)`` (the actual
        SMPTE rate, e.g., 29.97) rather than ``_int_framerate`` (the rounded
        label rate, 30 for 29.97). For NTSC fractional rates this is the
        difference between getting real-time ms (1001.001 at frame 31) vs
        label-rate ms (1000.0). cuems consumers want real-time semantics.
        """
        return self.frame_number * 1000 / float(self.framerate)

    @property
    def milliseconds_rounded(self) -> int:
        """Time as milliseconds, rounded to the nearest int (banker's rounding).

        Equivalent to ``round(self.milliseconds_exact)``. Rounding (not
        int-truncation) halves the per-call max error at fractional
        framerates from ~1ms to ~0.5ms, and over long sums averages to
        zero drift instead of monotonic loss.

        **Use for:**

        - sleep durations passed to ``time.sleep()``
        - CLI args expecting integer ms (e.g., ``audiowaveform -e``)
        - polling comparisons (``while mtc.main_tc.milliseconds_rounded < target_ms``)
        - dict/set keys
        - arithmetic with other ints (no float contamination)

        **Migration note:** This is the spiritual successor of the old
        ``.milliseconds``, but with ``round()`` instead of ``int()``
        truncation. At integer framerates, behavior is identical to the old
        ``.milliseconds``. At fractional framerates, values may differ by
        ±1 from the old truncation — review every call-site that compared
        against an exact expected ms value at 29.97/23.976.
        """
        return round(self.milliseconds_exact)

    @property
    @deprecated(
        reason=(
            "Renamed to .milliseconds_rounded (int, rounded) — or use "
            ".milliseconds_exact (float, precise) for precision-sensitive code. "
            "The old .milliseconds will be removed at the first stable release."
        ),
        version="0.1.0rc6",
    )
    def milliseconds(self) -> int:
        """DEPRECATED — alias of .milliseconds_rounded.

        The original ``.milliseconds`` returned ``int(...)`` (truncation).
        The new ``.milliseconds_rounded`` returns ``round(...)`` (nearest
        int) — at integer framerates the values are identical; at
        fractional framerates they may differ by ±1ms. Migrate every
        call-site to the explicit name to clarify rounding intent and
        silence this warning.

        Will be removed in the first stable release after 0.1.0rc6.
        """
        return self.milliseconds_rounded

    # ------------------------------------------------------------------
    # frame-domain conversions
    # ------------------------------------------------------------------
    def return_in_other_framerate(self, framerate):
        """Return a copy of this CTimecode at a different framerate.

        Frame-domain conversion via frame_number (0-indexed elapsed-frames
        count) avoids the lossy time-domain round-trip the previous
        implementation used (which constructed the new instance via
        ``start_seconds=self.milliseconds/1000``, losing one frame to
        upstream's int(s*fr) → frame_number+1 round-trip).

        NOTE — throwaway object cost (deferred 869cyndtv PR #6, 2026-04-27):
            The ``CTimecode(framerate=framerate)`` construction below is
            used only to read ``_int_framerate`` from the resulting object.
            It runs upstream's framerate setter (correct, what we want) but
            ALSO calls ``tc_to_frames("00:00:00:00")`` to populate
            ``_frames=1`` (pure waste). Cost is ~5μs per call. Call-sites
            (run_cue.py:62/252, helpers.py:28/31, loop_cue.py:74/183) are
            all construction-time, not hot-loop — total project-load impact
            ~2ms for 100 cues. Deferred because the cost is unmeasurable in
            production.

            If profiling ever flags this, the recommended fix is a
            class-level memoization cache keyed on the framerate input
            (Option D in the 869cyndtv PR #6 plan)::

                _INT_FR_CACHE: dict = {}

                @classmethod
                def _int_framerate_for(cls, fr):
                    key = (type(fr), fr) if isinstance(fr, (int, float, str)) else repr(fr)
                    if key not in cls._INT_FR_CACHE:
                        cls._INT_FR_CACHE[key] = CTimecode(framerate=fr)._int_framerate
                    return cls._INT_FR_CACHE[key]

            Then replace the ``target_int_fr = ...`` line below with
            ``target_int_fr = self._int_framerate_for(framerate)``.

            Why a cache (Option D) over inlining upstream's NTSC-detection
            logic (Option F): inlining duplicates upstream's algorithm and
            silently diverges if upstream changes tolerance or adds new
            NTSC-like rates. The cache delegates to upstream on the first
            lookup per unique framerate and pays zero on subsequent calls.

            Revisit triggers: (1) profiling shows this method on a hot path,
            (2) a feature introduces dynamic framerate changes during
            playback, (3) the same cache pattern is needed elsewhere in
            CTimecode (unify at that point).
        """
        target_int_fr = CTimecode(framerate=framerate)._int_framerate
        new_frame_number = round(self.frame_number * target_int_fr / self._int_framerate)
        return CTimecode(framerate=framerate, frames=new_frame_number + 1)

    # ------------------------------------------------------------------
    # hashing + comparison (use _rounded directly to avoid DeprecationWarning storm)
    # ------------------------------------------------------------------
    def __hash__(self):
        # Use _rounded explicitly: the deprecated .milliseconds emits a
        # DeprecationWarning on every access, which would flood every
        # dict/set/sorted/min/max usage. Hash compatibility with __eq__ is
        # preserved (both sides use _rounded).
        return hash((self.milliseconds_rounded,))

    def __eq__(self, other):
        if isinstance(other, CTimecode):
            return self.milliseconds_rounded == other.milliseconds_rounded
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, CTimecode):
            return self.milliseconds_rounded != other.milliseconds_rounded
        return NotImplemented

    def __lt__(self, other):
        if isinstance(other, CTimecode):
            return self.milliseconds_rounded < other.milliseconds_rounded
        elif isinstance(other, int):
            return self.milliseconds_rounded < other
        return NotImplemented

    def __le__(self, other):
        if isinstance(other, CTimecode):
            return self.milliseconds_rounded <= other.milliseconds_rounded
        return NotImplemented

    def __gt__(self, other):
        if isinstance(other, CTimecode):
            return self.milliseconds_rounded > other.milliseconds_rounded
        elif isinstance(other, int):
            return self.milliseconds_rounded > other
        return NotImplemented

    def __ge__(self, other):
        if isinstance(other, CTimecode):
            return self.milliseconds_rounded >= other.milliseconds_rounded
        return NotImplemented

    # ------------------------------------------------------------------
    # arithmetic dunders — playhead-correct + same-framerate assertion
    # ------------------------------------------------------------------
    def __add__(self, other):
        """Return a new CTimecode with `other` added.

        For CTimecode operands, both operands' `frames` are 1-indexed counts;
        naively summing them double-applies the +1 offset. Subtract 1 to get
        the playhead-correct sum.
        """
        if isinstance(other, CTimecode):
            if other._int_framerate != self._int_framerate:
                raise CTimecodeError(
                    f"Arithmetic between CTimecodes of different framerates "
                    f"({self.framerate} vs {other.framerate}); use "
                    f".return_in_other_framerate() first."
                )
            result_frames = self.frames + other.frames - 1
        elif isinstance(other, int):
            # int operand: caller passes a frame count to add (e.g.,
            # MtcListener.py does `self.main_tc + 1` to advance one MTC
            # frame). No 1-indexing adjustment needed.
            result_frames = self.frames + other
        else:
            raise CTimecodeError(
                f"Type {other.__class__.__name__} not supported for arithmetic."
            )
        return CTimecode(framerate=self.framerate, frames=result_frames)

    def __sub__(self, other):
        """Return a new CTimecode with `other` subtracted.

        Symmetric to __add__: subtracting two 1-indexed frame counts yields
        a 0-indexed delta, so add +1 to land back in the 1-indexed
        convention. Subtracting a larger duration from a smaller position
        produces frames<=0, which upstream's frames setter rejects with
        ValueError — that's the intended contract (no silent wrap).
        """
        if isinstance(other, CTimecode):
            if other._int_framerate != self._int_framerate:
                raise CTimecodeError(
                    f"Arithmetic between CTimecodes of different framerates "
                    f"({self.framerate} vs {other.framerate}); use "
                    f".return_in_other_framerate() first."
                )
            result_frames = self.frames - other.frames + 1
        elif isinstance(other, int):
            result_frames = self.frames - other
        else:
            raise CTimecodeError(
                f"Type {other.__class__.__name__} not supported for arithmetic."
            )
        return CTimecode(framerate=self.framerate, frames=result_frames)

    def __mul__(self, other):
        """Return a new CTimecode with frames multiplied by `other`."""
        if isinstance(other, CTimecode):
            if other._int_framerate != self._int_framerate:
                raise CTimecodeError(
                    f"Arithmetic between CTimecodes of different framerates "
                    f"({self.framerate} vs {other.framerate}); use "
                    f".return_in_other_framerate() first."
                )
            multiplied_frames = self.frames * other.frames
        elif isinstance(other, int):
            multiplied_frames = self.frames * other
        else:
            raise CTimecodeError(
                f"Type {other.__class__.__name__} not supported for arithmetic."
            )
        return CTimecode(framerate=self.framerate, frames=multiplied_frames)

    def __truediv__(self, other):
        """Return a new CTimecode with frames divided by `other`.

        Rounds the float result to int (upstream's frames setter requires
        positive int). Rejects zero/negative divisors explicitly to avoid
        silent ``max(1, negative)`` clamp paths.
        """
        if isinstance(other, CTimecode):
            if other._int_framerate != self._int_framerate:
                raise CTimecodeError(
                    f"Arithmetic between CTimecodes of different framerates "
                    f"({self.framerate} vs {other.framerate}); use "
                    f".return_in_other_framerate() first."
                )
            if other.frames == 0:  # defense-in-depth; upstream ensures frames>=1
                raise CTimecodeError("Division by CTimecode with zero frames")
            div_frames = round(self.frames / other.frames)
        elif isinstance(other, int):
            if other <= 0:
                raise CTimecodeError(
                    f"Division by non-positive int ({other}); CTimecode "
                    f"division requires a positive divisor (frames must "
                    f"stay >= 1)."
                )
            div_frames = round(self.frames / other)
        else:
            raise CTimecodeError(
                f"Type {other.__class__.__name__} not supported for arithmetic."
            )
        # round() can yield 0 only when self.frames is small and divisor is
        # large; clamp to 1 (upstream frames setter requires > 0). Negative
        # results are ruled out by the int>0 / frames>0 guards above.
        return CTimecode(framerate=self.framerate, frames=max(1, div_frames))

    # ------------------------------------------------------------------
    # serialization
    # ------------------------------------------------------------------
    def __json__(self):
        return {"CTimecode": self.__str__()}

    def __str__(self):
        # skip_rollover=True keeps __str__ monotonic past 24h. Without this,
        # frames=2_160_002 at 25fps would render as "00:00:00:01" (wrapped)
        # instead of "24:00:00:01" — bug 869cpdbzy reported by Sergio: long-
        # running install (>24h MTC) with audio/sequence stops while video
        # keeps looping. Note the underlying .frames, .milliseconds_exact, and
        # .milliseconds_rounded accessors are already monotonic post-PR-#6;
        # this fix is for the string display + log readability + any consumer
        # that round-trips through str. (869cyndtv PR #10)
        return self.tc_to_string(*self.frames_to_tc(self.frames, skip_rollover=True))

    def __iter__(self):
        yield ("timecode", self.__str__())
        yield ("framerate", self.framerate)

    def items(self):
        return list(self)

framerate property writable

Canonical framerate: int for SMPTE, float for fractional, int 1000 for ms.

Upstream stores integer SMPTE framerates as strings ('25', '30') but 'ms'/'1000' as int 1000 — making tc.framerate == 25 silently False. This getter normalizes the return type so callers can compare against numeric literals reliably.

milliseconds property

DEPRECATED — alias of .milliseconds_rounded.

The original .milliseconds returned int(...) (truncation). The new .milliseconds_rounded returns round(...) (nearest int) — at integer framerates the values are identical; at fractional framerates they may differ by ±1ms. Migrate every call-site to the explicit name to clarify rounding intent and silence this warning.

Will be removed in the first stable release after 0.1.0rc6.

milliseconds_exact property

Time as exact-precision milliseconds (float).

The mathematical answer: frame_number * 1000 / _int_framerate. At integer framerates (24, 25, 30, ms) this is always a whole number as a float (e.g., 25fps frame 31 → 1200.0). At fractional framerates (29.97, 23.976) the float carries the true sub-ms value (29.97fps frame 31 → 1001.001001...) — this is the precision the old int-truncating .milliseconds was throwing away.

Use for:

  • precision-sensitive math (BaseEngine.go_offset calc, scheduler drift compensation, MTC bias measurement)
  • any place where adding/subtracting milliseconds across many frames at fractional framerates would accumulate truncation drift
  • comparison against expected exact values in tests (pytest.approx(expected, abs=1e-9))

Do NOT use for:

  • sleep durations (time.sleep accepts float but _rounded better signals integer-ms intent)
  • CLI args expecting integer ms
  • dict/set keys (float equality is fragile)
  • arithmetic with other ints (float contamination)

Resolves the # TODO: float math for other framerates from the pre-869cyndtv code.

Note on framerate divisor: uses float(self.framerate) (the actual SMPTE rate, e.g., 29.97) rather than _int_framerate (the rounded label rate, 30 for 29.97). For NTSC fractional rates this is the difference between getting real-time ms (1001.001 at frame 31) vs label-rate ms (1000.0). cuems consumers want real-time semantics.

milliseconds_rounded property

Time as milliseconds, rounded to the nearest int (banker's rounding).

Equivalent to round(self.milliseconds_exact). Rounding (not int-truncation) halves the per-call max error at fractional framerates from ~1ms to ~0.5ms, and over long sums averages to zero drift instead of monotonic loss.

Use for:

  • sleep durations passed to time.sleep()
  • CLI args expecting integer ms (e.g., audiowaveform -e)
  • polling comparisons (while mtc.main_tc.milliseconds_rounded < target_ms)
  • dict/set keys
  • arithmetic with other ints (no float contamination)

Migration note: This is the spiritual successor of the old .milliseconds, but with round() instead of int() truncation. At integer framerates, behavior is identical to the old .milliseconds. At fractional framerates, values may differ by ±1 from the old truncation — review every call-site that compared against an exact expected ms value at 29.97/23.976.

__add__(other)

Return a new CTimecode with other added.

For CTimecode operands, both operands' frames are 1-indexed counts; naively summing them double-applies the +1 offset. Subtract 1 to get the playhead-correct sum.

Source code in src/cuemsutils/tools/CTimecode.py
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
def __add__(self, other):
    """Return a new CTimecode with `other` added.

    For CTimecode operands, both operands' `frames` are 1-indexed counts;
    naively summing them double-applies the +1 offset. Subtract 1 to get
    the playhead-correct sum.
    """
    if isinstance(other, CTimecode):
        if other._int_framerate != self._int_framerate:
            raise CTimecodeError(
                f"Arithmetic between CTimecodes of different framerates "
                f"({self.framerate} vs {other.framerate}); use "
                f".return_in_other_framerate() first."
            )
        result_frames = self.frames + other.frames - 1
    elif isinstance(other, int):
        # int operand: caller passes a frame count to add (e.g.,
        # MtcListener.py does `self.main_tc + 1` to advance one MTC
        # frame). No 1-indexing adjustment needed.
        result_frames = self.frames + other
    else:
        raise CTimecodeError(
            f"Type {other.__class__.__name__} not supported for arithmetic."
        )
    return CTimecode(framerate=self.framerate, frames=result_frames)

__mul__(other)

Return a new CTimecode with frames multiplied by other.

Source code in src/cuemsutils/tools/CTimecode.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def __mul__(self, other):
    """Return a new CTimecode with frames multiplied by `other`."""
    if isinstance(other, CTimecode):
        if other._int_framerate != self._int_framerate:
            raise CTimecodeError(
                f"Arithmetic between CTimecodes of different framerates "
                f"({self.framerate} vs {other.framerate}); use "
                f".return_in_other_framerate() first."
            )
        multiplied_frames = self.frames * other.frames
    elif isinstance(other, int):
        multiplied_frames = self.frames * other
    else:
        raise CTimecodeError(
            f"Type {other.__class__.__name__} not supported for arithmetic."
        )
    return CTimecode(framerate=self.framerate, frames=multiplied_frames)

__sub__(other)

Return a new CTimecode with other subtracted.

Symmetric to add: subtracting two 1-indexed frame counts yields a 0-indexed delta, so add +1 to land back in the 1-indexed convention. Subtracting a larger duration from a smaller position produces frames<=0, which upstream's frames setter rejects with ValueError — that's the intended contract (no silent wrap).

Source code in src/cuemsutils/tools/CTimecode.py
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
def __sub__(self, other):
    """Return a new CTimecode with `other` subtracted.

    Symmetric to __add__: subtracting two 1-indexed frame counts yields
    a 0-indexed delta, so add +1 to land back in the 1-indexed
    convention. Subtracting a larger duration from a smaller position
    produces frames<=0, which upstream's frames setter rejects with
    ValueError — that's the intended contract (no silent wrap).
    """
    if isinstance(other, CTimecode):
        if other._int_framerate != self._int_framerate:
            raise CTimecodeError(
                f"Arithmetic between CTimecodes of different framerates "
                f"({self.framerate} vs {other.framerate}); use "
                f".return_in_other_framerate() first."
            )
        result_frames = self.frames - other.frames + 1
    elif isinstance(other, int):
        result_frames = self.frames - other
    else:
        raise CTimecodeError(
            f"Type {other.__class__.__name__} not supported for arithmetic."
        )
    return CTimecode(framerate=self.framerate, frames=result_frames)

__truediv__(other)

Return a new CTimecode with frames divided by other.

Rounds the float result to int (upstream's frames setter requires positive int). Rejects zero/negative divisors explicitly to avoid silent max(1, negative) clamp paths.

Source code in src/cuemsutils/tools/CTimecode.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
def __truediv__(self, other):
    """Return a new CTimecode with frames divided by `other`.

    Rounds the float result to int (upstream's frames setter requires
    positive int). Rejects zero/negative divisors explicitly to avoid
    silent ``max(1, negative)`` clamp paths.
    """
    if isinstance(other, CTimecode):
        if other._int_framerate != self._int_framerate:
            raise CTimecodeError(
                f"Arithmetic between CTimecodes of different framerates "
                f"({self.framerate} vs {other.framerate}); use "
                f".return_in_other_framerate() first."
            )
        if other.frames == 0:  # defense-in-depth; upstream ensures frames>=1
            raise CTimecodeError("Division by CTimecode with zero frames")
        div_frames = round(self.frames / other.frames)
    elif isinstance(other, int):
        if other <= 0:
            raise CTimecodeError(
                f"Division by non-positive int ({other}); CTimecode "
                f"division requires a positive divisor (frames must "
                f"stay >= 1)."
            )
        div_frames = round(self.frames / other)
    else:
        raise CTimecodeError(
            f"Type {other.__class__.__name__} not supported for arithmetic."
        )
    # round() can yield 0 only when self.frames is small and divisor is
    # large; clamp to 1 (upstream frames setter requires > 0). Negative
    # results are ruled out by the int>0 / frames>0 guards above.
    return CTimecode(framerate=self.framerate, frames=max(1, div_frames))

return_in_other_framerate(framerate)

Return a copy of this CTimecode at a different framerate.

Frame-domain conversion via frame_number (0-indexed elapsed-frames count) avoids the lossy time-domain round-trip the previous implementation used (which constructed the new instance via start_seconds=self.milliseconds/1000, losing one frame to upstream's int(s*fr) → frame_number+1 round-trip).

NOTE — throwaway object cost (deferred 869cyndtv PR #6, 2026-04-27): The CTimecode(framerate=framerate) construction below is used only to read _int_framerate from the resulting object. It runs upstream's framerate setter (correct, what we want) but ALSO calls tc_to_frames("00:00:00:00") to populate _frames=1 (pure waste). Cost is ~5μs per call. Call-sites (run_cue.py:62/252, helpers.py:28/31, loop_cue.py:74/183) are all construction-time, not hot-loop — total project-load impact ~2ms for 100 cues. Deferred because the cost is unmeasurable in production.

If profiling ever flags this, the recommended fix is a
class-level memoization cache keyed on the framerate input
(Option D in the 869cyndtv PR #6 plan)::

    _INT_FR_CACHE: dict = {}

    @classmethod
    def _int_framerate_for(cls, fr):
        key = (type(fr), fr) if isinstance(fr, (int, float, str)) else repr(fr)
        if key not in cls._INT_FR_CACHE:
            cls._INT_FR_CACHE[key] = CTimecode(framerate=fr)._int_framerate
        return cls._INT_FR_CACHE[key]

Then replace the ``target_int_fr = ...`` line below with
``target_int_fr = self._int_framerate_for(framerate)``.

Why a cache (Option D) over inlining upstream's NTSC-detection
logic (Option F): inlining duplicates upstream's algorithm and
silently diverges if upstream changes tolerance or adds new
NTSC-like rates. The cache delegates to upstream on the first
lookup per unique framerate and pays zero on subsequent calls.

Revisit triggers: (1) profiling shows this method on a hot path,
(2) a feature introduces dynamic framerate changes during
playback, (3) the same cache pattern is needed elsewhere in
CTimecode (unify at that point).
Source code in src/cuemsutils/tools/CTimecode.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def return_in_other_framerate(self, framerate):
    """Return a copy of this CTimecode at a different framerate.

    Frame-domain conversion via frame_number (0-indexed elapsed-frames
    count) avoids the lossy time-domain round-trip the previous
    implementation used (which constructed the new instance via
    ``start_seconds=self.milliseconds/1000``, losing one frame to
    upstream's int(s*fr) → frame_number+1 round-trip).

    NOTE — throwaway object cost (deferred 869cyndtv PR #6, 2026-04-27):
        The ``CTimecode(framerate=framerate)`` construction below is
        used only to read ``_int_framerate`` from the resulting object.
        It runs upstream's framerate setter (correct, what we want) but
        ALSO calls ``tc_to_frames("00:00:00:00")`` to populate
        ``_frames=1`` (pure waste). Cost is ~5μs per call. Call-sites
        (run_cue.py:62/252, helpers.py:28/31, loop_cue.py:74/183) are
        all construction-time, not hot-loop — total project-load impact
        ~2ms for 100 cues. Deferred because the cost is unmeasurable in
        production.

        If profiling ever flags this, the recommended fix is a
        class-level memoization cache keyed on the framerate input
        (Option D in the 869cyndtv PR #6 plan)::

            _INT_FR_CACHE: dict = {}

            @classmethod
            def _int_framerate_for(cls, fr):
                key = (type(fr), fr) if isinstance(fr, (int, float, str)) else repr(fr)
                if key not in cls._INT_FR_CACHE:
                    cls._INT_FR_CACHE[key] = CTimecode(framerate=fr)._int_framerate
                return cls._INT_FR_CACHE[key]

        Then replace the ``target_int_fr = ...`` line below with
        ``target_int_fr = self._int_framerate_for(framerate)``.

        Why a cache (Option D) over inlining upstream's NTSC-detection
        logic (Option F): inlining duplicates upstream's algorithm and
        silently diverges if upstream changes tolerance or adds new
        NTSC-like rates. The cache delegates to upstream on the first
        lookup per unique framerate and pays zero on subsequent calls.

        Revisit triggers: (1) profiling shows this method on a hot path,
        (2) a feature introduces dynamic framerate changes during
        playback, (3) the same cache pattern is needed elsewhere in
        CTimecode (unify at that point).
    """
    target_int_fr = CTimecode(framerate=framerate)._int_framerate
    new_frame_number = round(self.frame_number * target_int_fr / self._int_framerate)
    return CTimecode(framerate=framerate, frames=new_frame_number + 1)

CTimecodeError

Bases: Exception

Raised when an error occurred in timecode calculation.

Source code in src/cuemsutils/tools/CTimecode.py
455
456
457
class CTimecodeError(Exception):
    """Raised when an error occurred in timecode calculation."""
    pass

Quarter-frame timer that hooks to a CTimecode object.

Fires a registered callback at every quarter-frame boundary. Optionally accepts an immutable list of parameter tuples at construction time; each tuple's contents are unpacked as callback arguments at the corresponding quarter-frame. When a tuple list is provided the timer stops automatically after the last tuple is consumed; without one it runs until explicitly stopped.

CTimecodeTimer

Timer that fires a callback at every quarter-frame boundary.

Source code in src/cuemsutils/tools/CTimecodeTimer.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class CTimecodeTimer:
    """Timer that fires a callback at every quarter-frame boundary."""

    def __init__(
        self,
        timecode: CTimecode,
        params: list[tuple] | None = None,
    ) -> None:
        if timecode is None:
            raise ValueError("CTimecodeTimer: timecode must not be None")

        self._timecode = timecode

        # QF interval: handle 'ms' pseudo-framerate (= 1000 fps)
        fr = timecode.framerate
        qf_fr = 1000.0 if fr == "ms" else float(fr)
        self._qf_interval: float = 1.0 / (4.0 * qf_fr)

        # Freeze params; empty list → bare-signal mode
        if params is not None and len(params) > 0:
            self._params: tuple[tuple, ...] | None = tuple(
                tuple(p) for p in params
            )
        else:
            self._params = None

        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread: threading.Thread | None = None
        self._callback: Callable | None = None
        self._state: _State = _State.IDLE
        self._index: int = 0

    # ── Public interface ────────────────────────────────────────────────

    @property
    def callback(self) -> Callable | None:
        with self._lock:
            return self._callback

    @callback.setter
    def callback(self, fn: Callable | None) -> None:
        with self._lock:
            self._callback = fn

    def start(self) -> None:
        """Start the timer loop.

        IDLE or STOPPED → RUNNING.  No-op if already RUNNING.
        EXHAUSTED → no-op with a warning.
        """
        with self._lock:
            if self._state == _State.EXHAUSTED:
                Logger.warning(
                    "CTimecodeTimer: cannot restart exhausted timer"
                    " — create a new instance"
                )
                return
            if self._state == _State.RUNNING:
                return
            self._stop_event.clear()
            self._state = _State.RUNNING
            prev = self._thread

        # Join any previous thread outside the lock
        if prev is not None:
            prev.join(timeout=self._qf_interval * 4)

        t = threading.Thread(
            target=self._run_loop, daemon=True, name="CTimecodeTimer"
        )
        self._thread = t
        t.start()

    def stop(self) -> None:
        """Stop the timer loop.  RUNNING → STOPPED.  No-op otherwise."""
        with self._lock:
            if self._state != _State.RUNNING:
                return
            self._state = _State.STOPPED

        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=self._qf_interval * 4)

    # ── Private loop ────────────────────────────────────────────────────

    def _run_loop(self) -> None:
        prev_tc_ms: float = self._timecode.milliseconds_exact
        qf_ms: float = self._qf_interval * 1000.0

        while True:
            # Wait for one QF interval or early exit on stop
            if self._stop_event.wait(timeout=self._qf_interval):
                break

            # Read current timecode position (for seek detection)
            current_tc_ms = self._timecode.milliseconds_exact
            delta_ms = current_tc_ms - prev_tc_ms
            prev_tc_ms = current_tc_ms

            # Seek detection — parameterised mode only
            if self._params is not None:
                if delta_ms > 1.5 * qf_ms:
                    extra = round(delta_ms / qf_ms) - 1
                    self._index = min(
                        self._index + extra, len(self._params)
                    )
                elif delta_ms < 0:
                    self._index = 0

                if self._index >= len(self._params):
                    with self._lock:
                        self._state = _State.EXHAUSTED
                    break

            # Dispatch callback
            with self._lock:
                cb = self._callback

            if cb is not None:
                try:
                    if self._params is not None:
                        cb(*self._params[self._index])
                    else:
                        cb()
                except Exception:
                    Logger.error(
                        "CTimecodeTimer: callback raised an exception"
                    )

            # Post-dispatch index advance
            if self._params is not None:
                self._index += 1
                if self._index >= len(self._params):
                    with self._lock:
                        self._state = _State.EXHAUSTED
                    break

start()

Start the timer loop.

IDLE or STOPPED → RUNNING. No-op if already RUNNING. EXHAUSTED → no-op with a warning.

Source code in src/cuemsutils/tools/CTimecodeTimer.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def start(self) -> None:
    """Start the timer loop.

    IDLE or STOPPED → RUNNING.  No-op if already RUNNING.
    EXHAUSTED → no-op with a warning.
    """
    with self._lock:
        if self._state == _State.EXHAUSTED:
            Logger.warning(
                "CTimecodeTimer: cannot restart exhausted timer"
                " — create a new instance"
            )
            return
        if self._state == _State.RUNNING:
            return
        self._stop_event.clear()
        self._state = _State.RUNNING
        prev = self._thread

    # Join any previous thread outside the lock
    if prev is not None:
        prev.join(timeout=self._qf_interval * 4)

    t = threading.Thread(
        target=self._run_loop, daemon=True, name="CTimecodeTimer"
    )
    self._thread = t
    t.start()

stop()

Stop the timer loop. RUNNING → STOPPED. No-op otherwise.

Source code in src/cuemsutils/tools/CTimecodeTimer.py
101
102
103
104
105
106
107
108
109
110
def stop(self) -> None:
    """Stop the timer loop.  RUNNING → STOPPED.  No-op otherwise."""
    with self._lock:
        if self._state != _State.RUNNING:
            return
        self._state = _State.STOPPED

    self._stop_event.set()
    if self._thread is not None:
        self._thread.join(timeout=self._qf_interval * 4)

FadeCalculator

Source code in src/cuemsutils/tools/FadeCalculator.py
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class FadeCalculator:
    TIMELINE_PARAMS = [
        'start_time',
        'end_time',
        'framerate',
        'drop_frame'
    ]

    TRANSITION_DURATION_MILLISECONDS = 20

    @staticmethod
    def calculate(fade_function: Callable | str, **kwargs) -> dict[str, float]:
        """
        Calculate a timeline of timecodes between start_time and end_time.

        Args:
            fade_function: The fade function to apply to the timeline.
            **kwargs: Additional keyword arguments to pass to the timeline calculation and the fade function.

        Returns:
            A list of tuples, each containing a timecode and the value of the fade function at that timecode.

        Raises:
            ValueError: If fade_function is a string and not a valid function name of FadeCalculator.
        """
        timeline_args = {k: v for k,v in kwargs.items() if k in FadeCalculator.TIMELINE_PARAMS}
        for k in timeline_args.keys():
            kwargs.pop(k)
        if isinstance(fade_function, str):
            if not hasattr(FadeCalculator, fade_function):
                raise ValueError(f"Invalid fade function name: {fade_function}")
            fade_function = getattr(FadeCalculator, fade_function)
        if not callable(fade_function):
            raise ValueError(f"Invalid fade function: {fade_function}")
        timeline = FadeCalculator.calculate_timeline(**timeline_args)
        return zip(timeline, fade_function(**kwargs), strict=True)

    @staticmethod
    def calculate_timeline(start_time: CTimecode, end_time: CTimecode, **kwargs) -> list[str]:
        """
        Calculate a timeline of timecodes between start_time and end_time. It does not allow for a zero duration resulting timeline.

        Args:
            start_time: The start timecode.
            end_time: The end timecode.
            **kwargs: Additional keyword arguments to pass to the CTimecode constructor.

        Returns:
            A list of timecodes as strings.

        Raises:
            ValueError: If start_time or end_time are not of type CTimecode.
            ValueError: If the duration is less than or equal to 0.
        """
        if not (isinstance(start_time, CTimecode) and isinstance(end_time, CTimecode)):
            raise ValueError("start_time and end_time must be of type CTimecode")
        if start_time.milliseconds_rounded >= end_time.milliseconds_rounded:
            raise ValueError("start_time must be before end_time")
        duration_ms = end_time.milliseconds_rounded - start_time.milliseconds_rounded
        steps = int(duration_ms // FadeCalculator.TRANSITION_DURATION_MILLISECONDS)
        # `start_seconds` expects SECONDS, not milliseconds. Pre-869cyndtv this
        # site passed a ms value into start_seconds; the unit error happened to
        # cancel at framerate='ms' (where 1 ms-frame == 1 ms) but produced wildly
        # wrong values at any other framerate. The /1000 here converts to seconds.
        out = [
            str(
                CTimecode(
                    start_seconds=(
                        (start_time.milliseconds_rounded + i * FadeCalculator.TRANSITION_DURATION_MILLISECONDS)
                        / 1000
                    ),
                    **kwargs,
                )
            )
            for i in range(steps)
        ]
        out[-1] = str(end_time)
        return out

    # --- Fade curve functions ---
    @staticmethod
    def linear(length: int, start_value: float, end_value: float) -> dict[str, float]:
        slope = (end_value - start_value) / (length - 1)

        def fn(x: float) -> float:
            return start_value + slope * x

        return FadeCalculator._apply_function_to_range(length, fn)

    @staticmethod
    def sigmoid(length: int, start_value: float, end_value: float, inflec: float, growth: float) -> float:

        def fn(x: float) -> float:
            return (start_value - end_value) / (1 + (x / inflec) ** growth) + end_value

        return FadeCalculator._apply_function_to_range(length, fn)

    # --- Internal helper methods ---
    @staticmethod
    def _apply_to_100(function: Callable, **kwargs) -> list[float]:
        return [function(i, **kwargs) for i in range(100)]

    @staticmethod
    def _apply_to_list(x: list[float|int], function: Callable, **kwargs) -> list[float]:
        return [function(i, **kwargs) for i in x]

    @staticmethod
    def _apply_function_to_range(length: int, function: Callable, **kwargs) -> list[float]:
        return [function(i, **kwargs) for i in range(length)]

    @staticmethod
    def _rescale(x: list[float|int], in_min: float | int, in_max: float | int, out_min: float | int, out_max: float | int) -> list[float]:

        def fn(xv: float | int) -> float:
            return (out_max - out_min) * (xv - in_min) / (in_max - in_min) + out_min

        return FadeCalculator._apply_to_list(x, fn)

    @staticmethod
    def _sample_values(x: list[float|int], n: int) -> list:
        if n < 0:
            raise ValueError("n must be >= 0")
        if n == 0:
            return []
        if len(x) == 0:
            return []

        # Map n evenly-spaced indices over [0, len(x)-1]
        idx = FadeCalculator._rescale(
            list(range(n)),
            0,
            max(1, n - 1),
            0,
            len(x) - 1,
        )
        return [x[round(i)] for i in idx]

calculate(fade_function, **kwargs) staticmethod

Calculate a timeline of timecodes between start_time and end_time.

Parameters:

Name Type Description Default
fade_function Callable | str

The fade function to apply to the timeline.

required
**kwargs

Additional keyword arguments to pass to the timeline calculation and the fade function.

{}

Returns:

Type Description
dict[str, float]

A list of tuples, each containing a timecode and the value of the fade function at that timecode.

Raises:

Type Description
ValueError

If fade_function is a string and not a valid function name of FadeCalculator.

Source code in src/cuemsutils/tools/FadeCalculator.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@staticmethod
def calculate(fade_function: Callable | str, **kwargs) -> dict[str, float]:
    """
    Calculate a timeline of timecodes between start_time and end_time.

    Args:
        fade_function: The fade function to apply to the timeline.
        **kwargs: Additional keyword arguments to pass to the timeline calculation and the fade function.

    Returns:
        A list of tuples, each containing a timecode and the value of the fade function at that timecode.

    Raises:
        ValueError: If fade_function is a string and not a valid function name of FadeCalculator.
    """
    timeline_args = {k: v for k,v in kwargs.items() if k in FadeCalculator.TIMELINE_PARAMS}
    for k in timeline_args.keys():
        kwargs.pop(k)
    if isinstance(fade_function, str):
        if not hasattr(FadeCalculator, fade_function):
            raise ValueError(f"Invalid fade function name: {fade_function}")
        fade_function = getattr(FadeCalculator, fade_function)
    if not callable(fade_function):
        raise ValueError(f"Invalid fade function: {fade_function}")
    timeline = FadeCalculator.calculate_timeline(**timeline_args)
    return zip(timeline, fade_function(**kwargs), strict=True)

calculate_timeline(start_time, end_time, **kwargs) staticmethod

Calculate a timeline of timecodes between start_time and end_time. It does not allow for a zero duration resulting timeline.

Parameters:

Name Type Description Default
start_time CTimecode

The start timecode.

required
end_time CTimecode

The end timecode.

required
**kwargs

Additional keyword arguments to pass to the CTimecode constructor.

{}

Returns:

Type Description
list[str]

A list of timecodes as strings.

Raises:

Type Description
ValueError

If start_time or end_time are not of type CTimecode.

ValueError

If the duration is less than or equal to 0.

Source code in src/cuemsutils/tools/FadeCalculator.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@staticmethod
def calculate_timeline(start_time: CTimecode, end_time: CTimecode, **kwargs) -> list[str]:
    """
    Calculate a timeline of timecodes between start_time and end_time. It does not allow for a zero duration resulting timeline.

    Args:
        start_time: The start timecode.
        end_time: The end timecode.
        **kwargs: Additional keyword arguments to pass to the CTimecode constructor.

    Returns:
        A list of timecodes as strings.

    Raises:
        ValueError: If start_time or end_time are not of type CTimecode.
        ValueError: If the duration is less than or equal to 0.
    """
    if not (isinstance(start_time, CTimecode) and isinstance(end_time, CTimecode)):
        raise ValueError("start_time and end_time must be of type CTimecode")
    if start_time.milliseconds_rounded >= end_time.milliseconds_rounded:
        raise ValueError("start_time must be before end_time")
    duration_ms = end_time.milliseconds_rounded - start_time.milliseconds_rounded
    steps = int(duration_ms // FadeCalculator.TRANSITION_DURATION_MILLISECONDS)
    # `start_seconds` expects SECONDS, not milliseconds. Pre-869cyndtv this
    # site passed a ms value into start_seconds; the unit error happened to
    # cancel at framerate='ms' (where 1 ms-frame == 1 ms) but produced wildly
    # wrong values at any other framerate. The /1000 here converts to seconds.
    out = [
        str(
            CTimecode(
                start_seconds=(
                    (start_time.milliseconds_rounded + i * FadeCalculator.TRANSITION_DURATION_MILLISECONDS)
                    / 1000
                ),
                **kwargs,
            )
        )
        for i in range(steps)
    ]
    out[-1] = str(end_time)
    return out

ConnectionInfo dataclass

Information about an active bus connection.

Source code in src/cuemsutils/tools/HubServices.py
23
24
25
26
27
28
@dataclass
class ConnectionInfo:
    """Information about an active bus connection."""
    pipe_id: int
    sender: str
    connected_at: datetime

HubService

Bases: ABC

Source code in src/cuemsutils/tools/HubServices.py
31
32
33
34
35
36
37
38
39
40
41
42
class HubService(ABC):
    @abstractmethod
    def __init__(self, address:str):
        self.address = address

    @abstractmethod
    def send_message(self, message: dict | Message) -> None:
        """ Add message (dict or Message) to the queue to be sent to hub """

    @abstractmethod
    def get_message(self) -> Message:
        """ Get message from the queue. Message.data is already JSON-decoded as dict """

get_message() abstractmethod

Get message from the queue. Message.data is already JSON-decoded as dict

Source code in src/cuemsutils/tools/HubServices.py
40
41
42
@abstractmethod
def get_message(self) -> Message:
    """ Get message from the queue. Message.data is already JSON-decoded as dict """

send_message(message) abstractmethod

Add message (dict or Message) to the queue to be sent to hub

Source code in src/cuemsutils/tools/HubServices.py
36
37
38
@abstractmethod
def send_message(self, message: dict | Message) -> None:
    """ Add message (dict or Message) to the queue to be sent to hub """

NngBusHub

Bases: HubService

Communicates over NNG (nanomsg) using Bus topology for many-to-one, one-to-many messaging

Source code in src/cuemsutils/tools/HubServices.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
class NngBusHub(HubService):
    """ Communicates over NNG (nanomsg) using Bus topology for many-to-one, one-to-many messaging """

    class Mode(Enum):
        LISTENER = "listen"
        DIALER = "dial"

    def __init__(self, hub_address:str, mode:Mode):
        """
        Initialize Nng_bus_hub instance with address and operational mode.

        Parameters:
        - hub_address (str): The address to connect or listen for bus connections.
        - mode (Mode): The operational mode - LISTENER (listens) or DIALER (dials)

        The instance will set up incoming and outgoing message queues for asynchronous message handling.
        """
        self.active_connections: dict[int, ConnectionInfo] = {}  # key -> ConnectionInfo
        self._next_conn_key: int = 0  # monotonic key; avoids calling pipe.id in callbacks
        self.address = hub_address
        self.mode = mode
        self.incoming = asyncio.Queue()
        self.outgoing = asyncio.Queue()
        self.connection = None

        # Set to True at the very start of teardown so pipe callbacks become no-ops.
        # NNG fires post_pipe_remove callbacks from its internal C threads during socket
        # close; if those callbacks call any pynng C function (e.g. pipe.id) while NNG
        # internals are being freed, a SIGFPE or SIGABRT results.
        self._closing: bool = False

        # Connection health tracking
        self._last_message_received: Optional[datetime] = None
        self._last_message_sent: Optional[datetime] = None
        self._messages_received_count: int = 0
        self._messages_sent_count: int = 0

        # Ping/pong configuration
        self._auto_ping_enabled: bool = False
        self._auto_ping_interval: float = 10.0
        self._auto_ping_task: Optional[asyncio.Task] = None
        self._auto_pong_enabled: bool = True  # Nodes auto-respond by default
        self._ping_count: int = 0
        self._pong_count: int = 0

    async def start(self):
        """
        Start the bus communication by initializing the connection and launching message handlers.

        This method starts the bus connection based on the mode (LISTENER or DIALER),
        then launches infinite receiver and sender loops. It monitors both tasks and
        exits when the first exception occurs or connection is broken, properly cleaning
        up all running tasks.

        Raises:
        - ValueError: If an unknown mode is set.
        - Exception: Re-raises any exception that occurs during task execution after cleanup.
        """
        match self.mode:
            case self.Mode.LISTENER:
                await self.start_listener()
            case self.Mode.DIALER:
                await self.start_dialer()
            case _:
                raise ValueError(f"Unknown mode: {self.mode}")

        sender_task = None
        receiver_task = None
        ping_task = None
        try:
            sender_task = asyncio.create_task(self._send_handler(), name="sender")
            receiver_task = asyncio.create_task(self._receiver_handler(), name="receiver")
            ping_task = asyncio.create_task(self._auto_ping_handler(), name="ping")

            tasks = [sender_task, receiver_task, ping_task]
            task_names = {sender_task: "sender", receiver_task: "receiver", ping_task: "ping"}

            Logger.info(f"NNG {self.mode.value} tasks started: sender, receiver, ping")

            done_tasks, pending_tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

            # Check if completed task had an exception
            for task in done_tasks:
                task_name = task_names.get(task, "unknown")
                if task.exception() is not None:
                    Logger.error(f"NNG {self.mode.value} {task_name} handler failed: {task.exception()}")
                    Logger.error(f"Exception type: {type(task.exception()).__name__}")
                else:
                    Logger.warning(f"NNG {self.mode.value} {task_name} handler exited unexpectedly (no exception)")

            # Log pending task cancellation
            pending_names = [task_names.get(t, "unknown") for t in pending_tasks]
            if pending_names:
                Logger.info(f"Cancelling pending tasks: {pending_names}")

            # Cancel and await pending tasks
            for task in pending_tasks:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

            Logger.warning(f"NNG {self.mode.value} start() exiting - all tasks terminated")

        except Exception as e:
            Logger.error(f"Error occurred while starting tasks: {e} type: {type(e)}")
            # Cancel any running tasks
            for task in [sender_task, receiver_task, ping_task]:
                if task is not None and not task.done():
                    task.cancel()
                    try:
                        await task
                    except (asyncio.CancelledError, Exception):
                        pass
            raise
        finally:
            # Set _closing=True FIRST so any pipe callbacks that fire during
            # task cancellation or socket close become immediate no-ops.
            # Accessing pipe.id inside a NNG pipe callback during teardown can
            # trigger SIGFPE/SIGABRT in NNG's C code — _closing prevents that.
            self._closing = True

            # Cancel all inner tasks before closing the socket.
            for task in [sender_task, receiver_task, ping_task]:
                if task is not None and not task.done():
                    task.cancel()
                    try:
                        await task
                    except (asyncio.CancelledError, Exception):
                        pass
            if hasattr(self, 'connection') and self.connection is not None:
                try:
                    self.connection.close()
                except Exception:
                    pass
                self.connection = None

    async def start_listener(self):
        """
        Initialize the bus connection in LISTENER mode (listening for connections).

        Creates a Bus0 socket that listens on the configured address with a 100ms receive timeout.
        TCP keepalive is enabled BEFORE listening to ensure it applies to all connections.
        Callbacks are registered BEFORE listening to ensure no connection events are missed.
        """
        # Create socket first, set options and callbacks, then listen
        self.connection = Bus0(recv_timeout=RECV_TIMEOUT)
        self.connection.tcp_keepalive = True
        self._add_callbacks()  # Register callbacks BEFORE listen to catch all events
        self.connection.listen(self.address)
        Logger.debug(f"LISTENER started on {self.address}, tcp_keepalive={self.connection.tcp_keepalive}")

    async def start_dialer(self):
        """
        Initialize the bus connection in DIALER mode (dialing to controller).

        Creates a Bus0 socket that dials to the configured address with a 100ms receive timeout.
        TCP keepalive is enabled BEFORE dialing to ensure it applies to the connection.
        Non-blocking dial is used to allow reconnection if controller is not yet available.
        Callbacks are registered BEFORE dialing to ensure no connection events are missed.
        """
        # Create socket first, set options and callbacks, then dial (non-blocking for reconnection support)
        self.connection = Bus0(recv_timeout=RECV_TIMEOUT)
        self.connection.tcp_keepalive = True
        # Set explicit reconnection parameters
        self.connection.reconnect_time_min = 1000  # 1 second minimum
        self.connection.reconnect_time_max = 30000  # 30 seconds maximum
        self._add_callbacks()  # Register callbacks BEFORE dial to catch all events
        Logger.debug(f"DIALER options set: tcp_keepalive={self.connection.tcp_keepalive}, " +
                     f"reconnect_time_min={self.connection.reconnect_time_min}ms, " +
                     f"reconnect_time_max={self.connection.reconnect_time_max}ms")
        self.connection.dial(self.address, block=False)
        Logger.debug(f"DIALER dialing {self.address} (non-blocking)")

    async def send_message(self, message: dict | Message):
        """
        Queue a message to be sent to the bus.

        Parameters:
        - message (dict | Message): The message to be sent. Must be a dict or Message object with dict data.

        Raises:
        - TypeError: If message is not a dict or Message object, or if Message.data is not a dict.

        The message is JSON-encoded and placed in the outgoing queue to be sent by the sender handler.
        """
        # Extract data from Message object or use raw data
        if isinstance(message, Message):
            data = message.data
            if not isinstance(data, dict):
                raise TypeError(f"Message.data must be a dict, got {type(data).__name__}")
        elif isinstance(message, dict):
            data = message
        else:
            raise TypeError(f"send_message requires dict or Message, got {type(message).__name__}")

        # JSON-encode the dict
        json_data = json.dumps(data)

        await self.outgoing.put(json_data)

    async def get_message(self) -> Message:
        """
        Retrieve a message from the incoming queue.

        Returns:
        - Message: The next message received from the bus. The `data` field is already JSON-decoded as a dict.
                   This method blocks until a message is available.
        """
        return await self.incoming.get()

    def get_active_connections(self) -> List[ConnectionInfo]:
        """
        Get a list of all active connections.

        Returns:
        - List[ConnectionInfo]: List of all active connection information.
        """
        return list(self.active_connections.values())

    def get_connection_count(self) -> int:
        """
        Get the number of active connections.

        Returns:
        - int: Number of currently active connections.
        """
        return len(self.active_connections)

    def get_connection_health_info(self, activity_timeout: float = 30.0) -> Dict:
        """
        Get connection health information based on message activity.

        This is particularly useful for DIALERs to check if they're still 
        connected to the controller, since nodes don't track the controller
        connection in active_connections.

        Parameters:
        - activity_timeout: Seconds of inactivity before considering unhealthy (default: 30.0)

        Returns:
        - dict: Health information containing:
            - is_healthy: bool - True if recent activity detected
            - last_received: datetime or None - Last message received time
            - last_sent: datetime or None - Last message sent time
            - seconds_since_activity: float or None - Seconds since last activity
            - messages_received: int - Total messages received
            - messages_sent: int - Total messages sent
        """
        now = datetime.now()
        last_activity = None

        # Determine most recent activity
        if self._last_message_received and self._last_message_sent:
            last_activity = max(self._last_message_received, self._last_message_sent)
        elif self._last_message_received:
            last_activity = self._last_message_received
        elif self._last_message_sent:
            last_activity = self._last_message_sent

        # Calculate seconds since last activity
        seconds_since_activity = None
        if last_activity:
            seconds_since_activity = (now - last_activity).total_seconds()

        # Determine health status
        is_healthy = False
        if seconds_since_activity is not None:
            is_healthy = seconds_since_activity <= activity_timeout

        return {
            'is_healthy': is_healthy,
            'last_received': self._last_message_received,
            'last_sent': self._last_message_sent,
            'seconds_since_activity': seconds_since_activity,
            'messages_received': self._messages_received_count,
            'messages_sent': self._messages_sent_count
        }

    def is_connection_healthy(self, activity_timeout: float = 30.0) -> bool:
        """
        Check if the connection is healthy based on recent activity.

        Parameters:
        - activity_timeout: Seconds of inactivity before considering unhealthy (default: 30.0)

        Returns:
        - bool: True if connection appears healthy
        """
        return self.get_connection_health_info(activity_timeout)['is_healthy']

    def enable_auto_ping(self, interval: float = 10.0, inactivity_threshold: float = 5.0):
        """
        Enable automatic ping mechanism for the controller.

        The controller will automatically send ping messages to all nodes if there's
        been no activity for the specified threshold period. Nodes will automatically
        respond with pong messages.

        Parameters:
        - interval: How often to check for inactive connections (default: 10.0 seconds)
        - inactivity_threshold: Send ping if no activity for this many seconds (default: 5.0)

        Note: This is primarily useful for LISTENER mode to verify node connections.
        """
        self._auto_ping_enabled = True
        self._auto_ping_interval = interval
        self._inactivity_threshold = inactivity_threshold
        Logger.info(f"Auto-ping enabled: check every {interval}s, ping after {inactivity_threshold}s inactivity")

    def disable_auto_ping(self):
        """Disable automatic ping mechanism."""
        self._auto_ping_enabled = False
        Logger.info("Auto-ping disabled")

    def enable_auto_pong(self):
        """Enable automatic pong responses (enabled by default)."""
        self._auto_pong_enabled = True
        Logger.debug("Auto-pong enabled")

    def disable_auto_pong(self):
        """Disable automatic pong responses."""
        self._auto_pong_enabled = False
        Logger.debug("Auto-pong disabled")

    async def send_ping(self):
        """
        Manually send a ping message to all connected nodes.

        Returns:
        - int: Number of pings sent
        """
        ping_message = {"__type__": "ping", "timestamp": datetime.now().isoformat()}
        await self.send_message(ping_message)
        self._ping_count += 1
        Logger.debug(f"Ping sent (total: {self._ping_count})")
        return 1

    async def _auto_ping_handler(self):
        """
        Internal handler for automatic ping mechanism.

        Periodically checks for inactive connections and sends pings if needed.
        Only runs in LISTENER mode when auto_ping is enabled.
        """
        if not hasattr(self, '_inactivity_threshold'):
            self._inactivity_threshold = 5.0

        while await asyncio.sleep(0, result=True):
            try:
                await asyncio.sleep(self._auto_ping_interval)

                if not self._auto_ping_enabled:
                    continue

                # Check if we need to send a ping
                now = datetime.now()
                should_ping = False

                if self._last_message_sent:
                    seconds_since_last_sent = (now - self._last_message_sent).total_seconds()
                    if seconds_since_last_sent >= self._inactivity_threshold:
                        should_ping = True
                else:
                    should_ping = True  # Never sent anything yet

                if should_ping:
                    Logger.debug(f"Sending ping due to inactivity")
                    await self.send_ping()

            except asyncio.CancelledError:
                break
            except Exception as e:
                Logger.error(f"Error in auto-ping handler: {e}")

    async def _receiver_handler(self):
        """
        Internal handler that continuously receives messages from the bus.

        This infinite loop listens for incoming messages on the bus connection and
        places them in the incoming queue. Timeout exceptions are silently ignored
        to allow continuous polling. Other exceptions are logged.

        The loop runs until cancelled or an unhandled exception occurs.
        """
        while await asyncio.sleep(0, result=True):
            try:
                pynng_message = await self.connection.arecv_msg()

                # Extract sender information from the message pipe
                sender = self._extract_sender_info(pynng_message.pipe)

                # Decode bytes to string, then parse JSON to dict
                decoded_string = pynng_message.bytes.decode()
                try:
                    data_dict = json.loads(decoded_string)
                except json.JSONDecodeError:
                    Logger.warning(f"Received non-JSON message from {sender}: {decoded_string}")
                    data_dict = {"raw_data": decoded_string}

                message = Message(data=data_dict, sender=sender)

                # Track message receipt for connection health
                self._last_message_received = datetime.now()
                self._messages_received_count += 1

                # DEBUG: Log every received message with count
                msg_type = data_dict.get("__type__", data_dict.get("type", "unknown"))
                Logger.debug(f"[MSG #{self._messages_received_count}] {self.mode.value} received from {sender}: type={msg_type}")

                # Handle ping/pong messages
                handled = await self._handle_ping_pong(message, sender)

                if not handled:
                    # Normal message - put in queue for user
                    Logger.debug(f"Received message from {sender}: {message.data}")
                    await self.incoming.put(message)

            except pynng_exceptions.Timeout:
                pass  # Timeout is expected during polling
            except Exception as e:
                Logger.error(f"Error in receiver handler: {e}, type: {type(e)}")

    async def _handle_ping_pong(self, message: Message, sender) -> bool:
        """
        Handle ping/pong messages internally.

        Parameters:
        - message: The received message
        - sender: The sender information

        Returns:
        - bool: True if message was handled (ping/pong), False if it's a normal message
        """
        try:
            # Data is already parsed as dict
            data = message.data
            message_type = data.get("__type__")

            if message_type == "ping":
                # Received ping - send pong if auto-pong enabled
                if self._auto_pong_enabled:
                    pong_message = {
                        "__type__": "pong",
                        "timestamp": datetime.now().isoformat(),
                        "ping_timestamp": data.get("timestamp")
                    }
                    await self.send_message(pong_message)
                    self._pong_count += 1
                    Logger.debug(f"Received ping from {sender}, sent pong (total pongs: {self._pong_count})")
                else:
                    Logger.debug(f"Received ping from {sender} (auto-pong disabled)")
                return True

            elif message_type == "pong":
                # Received pong response
                Logger.debug(f"Received pong from {sender}")
                return True

        except (AttributeError, TypeError, KeyError):
            # Doesn't have __type__ or invalid structure - treat as normal message
            pass

        return False

    async def _send_handler(self):
        """
        Internal handler that continuously sends messages to the bus.

        This infinite loop retrieves messages from the outgoing queue and sends
        them to the bus connection. Any exceptions during sending are logged.

        The loop runs until cancelled or an unhandled exception occurs.

        Note: In LISTENER mode, waits for at least one connection before sending
        to avoid losing messages due to BUS protocol's best-effort delivery.
        """
        # Wait for at least one connection in LISTENER mode before sending
        if self.mode == self.Mode.LISTENER:
            Logger.info("LISTENER mode: waiting for node connections before sending...")
            while len(self.active_connections) == 0:
                await asyncio.sleep(0.1)
            Logger.info(f"Node connected, sender ready ({len(self.active_connections)} connections)")

        while await asyncio.sleep(0, result=True):
            try:
                message = await self.outgoing.get()

                # Track message send for connection health
                self._last_message_sent = datetime.now()
                self._messages_sent_count += 1

                # DEBUG: Log message type before sending
                try:
                    data = json.loads(message)
                    msg_type = data.get("__type__", data.get("type", "unknown"))
                except:
                    msg_type = "non-json"
                Logger.debug(f"[SEND #{self._messages_sent_count}] {self.mode.value} sending: type={msg_type}")

                await self.connection.asend(message.encode())
            except Exception as e:
                Logger.error(f"Error in send handler: {e}, type: {type(e)}")

    def _add_callbacks(self):
        self.connection.add_post_pipe_connect_cb(self._post_connect_callback)
        self.connection.add_post_pipe_remove_cb(self._post_remove_callback)
        Logger.debug(f"Pipe callbacks registered for connection on {self.address}")

    def _post_connect_callback(self, pipe):
        """Internal callback when a new pipe connects."""
        if self._closing:
            return
        # Use a monotonic counter as key — avoids calling pipe.id (a pynng C
        # call) which can trigger SIGFPE/SIGABRT during NNG teardown.
        try:
            key = self._next_conn_key
            self._next_conn_key += 1
            conn_info = ConnectionInfo(
                pipe_id=key,
                sender="unknown",
                connected_at=datetime.now()
            )
            self.active_connections[key] = conn_info
            Logger.info(f"Connection established (key={key}), "
                        f"total={len(self.active_connections)}")
        except Exception as e:
            Logger.error(f"Error in connect callback: {e}")

    def _post_remove_callback(self, pipe):
        """Internal callback when a pipe disconnects."""
        if self._closing:
            return
        # Pop the oldest tracked connection (FIFO) — we cannot call pipe.id
        # here because that is a pynng C call that triggers SIGFPE/SIGABRT
        # when NNG is tearing down the pipe concurrently.
        try:
            if self.active_connections:
                key = next(iter(self.active_connections))
                conn_info = self.active_connections.pop(key, None)
                if conn_info:
                    duration = datetime.now() - conn_info.connected_at
                    Logger.info(f"Connection closed (key={key}), "
                                f"duration={duration.total_seconds():.2f}s, "
                                f"remaining={len(self.active_connections)}")

            if len(self.active_connections) == 0 and self.mode == self.Mode.DIALER:
                Logger.warning("DIALER has no active connections - waiting for reconnection")
        except Exception as e:
            Logger.error(f"Error in disconnect callback: {e}")

    def _extract_sender_info(self, pipe):
        """
        Extract sender information from the message pipe.

        Handles both TCP and IPC addresses. For TCP, extracts the IP address.
        For IPC, returns the pipe URL.

        Parameters:
        - pipe: The pynng pipe object containing remote address information.

        Returns:
        - str: The sender identifier (IP address for TCP, URL for IPC).
        """
        try:
            remote_addr = pipe.remote_address
            # Check if it's a TCP address (has 'addr' attribute)
            if hasattr(remote_addr, 'addr'):
                as_bytes = struct.pack("I", remote_addr.addr)
                ip = socket.inet_ntop(socket.AF_INET, as_bytes)
                port = socket.ntohs(remote_addr.port)
                return (ip, port)
            else:
                # For IPC or other address types, use the pipe URL
                return str(pipe.url) if hasattr(pipe, 'url') else "unknown"
        except Exception as e:
            Logger.debug(f"Error extracting sender info: {e}")
            return "unknown"

__init__(hub_address, mode)

Initialize Nng_bus_hub instance with address and operational mode.

Parameters: - hub_address (str): The address to connect or listen for bus connections. - mode (Mode): The operational mode - LISTENER (listens) or DIALER (dials)

The instance will set up incoming and outgoing message queues for asynchronous message handling.

Source code in src/cuemsutils/tools/HubServices.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def __init__(self, hub_address:str, mode:Mode):
    """
    Initialize Nng_bus_hub instance with address and operational mode.

    Parameters:
    - hub_address (str): The address to connect or listen for bus connections.
    - mode (Mode): The operational mode - LISTENER (listens) or DIALER (dials)

    The instance will set up incoming and outgoing message queues for asynchronous message handling.
    """
    self.active_connections: dict[int, ConnectionInfo] = {}  # key -> ConnectionInfo
    self._next_conn_key: int = 0  # monotonic key; avoids calling pipe.id in callbacks
    self.address = hub_address
    self.mode = mode
    self.incoming = asyncio.Queue()
    self.outgoing = asyncio.Queue()
    self.connection = None

    # Set to True at the very start of teardown so pipe callbacks become no-ops.
    # NNG fires post_pipe_remove callbacks from its internal C threads during socket
    # close; if those callbacks call any pynng C function (e.g. pipe.id) while NNG
    # internals are being freed, a SIGFPE or SIGABRT results.
    self._closing: bool = False

    # Connection health tracking
    self._last_message_received: Optional[datetime] = None
    self._last_message_sent: Optional[datetime] = None
    self._messages_received_count: int = 0
    self._messages_sent_count: int = 0

    # Ping/pong configuration
    self._auto_ping_enabled: bool = False
    self._auto_ping_interval: float = 10.0
    self._auto_ping_task: Optional[asyncio.Task] = None
    self._auto_pong_enabled: bool = True  # Nodes auto-respond by default
    self._ping_count: int = 0
    self._pong_count: int = 0

disable_auto_ping()

Disable automatic ping mechanism.

Source code in src/cuemsutils/tools/HubServices.py
356
357
358
359
def disable_auto_ping(self):
    """Disable automatic ping mechanism."""
    self._auto_ping_enabled = False
    Logger.info("Auto-ping disabled")

disable_auto_pong()

Disable automatic pong responses.

Source code in src/cuemsutils/tools/HubServices.py
366
367
368
369
def disable_auto_pong(self):
    """Disable automatic pong responses."""
    self._auto_pong_enabled = False
    Logger.debug("Auto-pong disabled")

enable_auto_ping(interval=10.0, inactivity_threshold=5.0)

Enable automatic ping mechanism for the controller.

The controller will automatically send ping messages to all nodes if there's been no activity for the specified threshold period. Nodes will automatically respond with pong messages.

Parameters: - interval: How often to check for inactive connections (default: 10.0 seconds) - inactivity_threshold: Send ping if no activity for this many seconds (default: 5.0)

Note: This is primarily useful for LISTENER mode to verify node connections.

Source code in src/cuemsutils/tools/HubServices.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def enable_auto_ping(self, interval: float = 10.0, inactivity_threshold: float = 5.0):
    """
    Enable automatic ping mechanism for the controller.

    The controller will automatically send ping messages to all nodes if there's
    been no activity for the specified threshold period. Nodes will automatically
    respond with pong messages.

    Parameters:
    - interval: How often to check for inactive connections (default: 10.0 seconds)
    - inactivity_threshold: Send ping if no activity for this many seconds (default: 5.0)

    Note: This is primarily useful for LISTENER mode to verify node connections.
    """
    self._auto_ping_enabled = True
    self._auto_ping_interval = interval
    self._inactivity_threshold = inactivity_threshold
    Logger.info(f"Auto-ping enabled: check every {interval}s, ping after {inactivity_threshold}s inactivity")

enable_auto_pong()

Enable automatic pong responses (enabled by default).

Source code in src/cuemsutils/tools/HubServices.py
361
362
363
364
def enable_auto_pong(self):
    """Enable automatic pong responses (enabled by default)."""
    self._auto_pong_enabled = True
    Logger.debug("Auto-pong enabled")

get_active_connections()

Get a list of all active connections.

Returns: - List[ConnectionInfo]: List of all active connection information.

Source code in src/cuemsutils/tools/HubServices.py
257
258
259
260
261
262
263
264
def get_active_connections(self) -> List[ConnectionInfo]:
    """
    Get a list of all active connections.

    Returns:
    - List[ConnectionInfo]: List of all active connection information.
    """
    return list(self.active_connections.values())

get_connection_count()

Get the number of active connections.

Returns: - int: Number of currently active connections.

Source code in src/cuemsutils/tools/HubServices.py
266
267
268
269
270
271
272
273
def get_connection_count(self) -> int:
    """
    Get the number of active connections.

    Returns:
    - int: Number of currently active connections.
    """
    return len(self.active_connections)

get_connection_health_info(activity_timeout=30.0)

Get connection health information based on message activity.

This is particularly useful for DIALERs to check if they're still connected to the controller, since nodes don't track the controller connection in active_connections.

Parameters: - activity_timeout: Seconds of inactivity before considering unhealthy (default: 30.0)

  • dict: Health information containing:
    • is_healthy: bool - True if recent activity detected
    • last_received: datetime or None - Last message received time
    • last_sent: datetime or None - Last message sent time
    • seconds_since_activity: float or None - Seconds since last activity
    • messages_received: int - Total messages received
    • messages_sent: int - Total messages sent
Source code in src/cuemsutils/tools/HubServices.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def get_connection_health_info(self, activity_timeout: float = 30.0) -> Dict:
    """
    Get connection health information based on message activity.

    This is particularly useful for DIALERs to check if they're still 
    connected to the controller, since nodes don't track the controller
    connection in active_connections.

    Parameters:
    - activity_timeout: Seconds of inactivity before considering unhealthy (default: 30.0)

    Returns:
    - dict: Health information containing:
        - is_healthy: bool - True if recent activity detected
        - last_received: datetime or None - Last message received time
        - last_sent: datetime or None - Last message sent time
        - seconds_since_activity: float or None - Seconds since last activity
        - messages_received: int - Total messages received
        - messages_sent: int - Total messages sent
    """
    now = datetime.now()
    last_activity = None

    # Determine most recent activity
    if self._last_message_received and self._last_message_sent:
        last_activity = max(self._last_message_received, self._last_message_sent)
    elif self._last_message_received:
        last_activity = self._last_message_received
    elif self._last_message_sent:
        last_activity = self._last_message_sent

    # Calculate seconds since last activity
    seconds_since_activity = None
    if last_activity:
        seconds_since_activity = (now - last_activity).total_seconds()

    # Determine health status
    is_healthy = False
    if seconds_since_activity is not None:
        is_healthy = seconds_since_activity <= activity_timeout

    return {
        'is_healthy': is_healthy,
        'last_received': self._last_message_received,
        'last_sent': self._last_message_sent,
        'seconds_since_activity': seconds_since_activity,
        'messages_received': self._messages_received_count,
        'messages_sent': self._messages_sent_count
    }

get_message() async

Retrieve a message from the incoming queue.

  • Message: The next message received from the bus. The data field is already JSON-decoded as a dict. This method blocks until a message is available.
Source code in src/cuemsutils/tools/HubServices.py
247
248
249
250
251
252
253
254
255
async def get_message(self) -> Message:
    """
    Retrieve a message from the incoming queue.

    Returns:
    - Message: The next message received from the bus. The `data` field is already JSON-decoded as a dict.
               This method blocks until a message is available.
    """
    return await self.incoming.get()

is_connection_healthy(activity_timeout=30.0)

Check if the connection is healthy based on recent activity.

Parameters: - activity_timeout: Seconds of inactivity before considering unhealthy (default: 30.0)

Returns: - bool: True if connection appears healthy

Source code in src/cuemsutils/tools/HubServices.py
325
326
327
328
329
330
331
332
333
334
335
def is_connection_healthy(self, activity_timeout: float = 30.0) -> bool:
    """
    Check if the connection is healthy based on recent activity.

    Parameters:
    - activity_timeout: Seconds of inactivity before considering unhealthy (default: 30.0)

    Returns:
    - bool: True if connection appears healthy
    """
    return self.get_connection_health_info(activity_timeout)['is_healthy']

send_message(message) async

Queue a message to be sent to the bus.

Parameters: - message (dict | Message): The message to be sent. Must be a dict or Message object with dict data.

Raises: - TypeError: If message is not a dict or Message object, or if Message.data is not a dict.

The message is JSON-encoded and placed in the outgoing queue to be sent by the sender handler.

Source code in src/cuemsutils/tools/HubServices.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
async def send_message(self, message: dict | Message):
    """
    Queue a message to be sent to the bus.

    Parameters:
    - message (dict | Message): The message to be sent. Must be a dict or Message object with dict data.

    Raises:
    - TypeError: If message is not a dict or Message object, or if Message.data is not a dict.

    The message is JSON-encoded and placed in the outgoing queue to be sent by the sender handler.
    """
    # Extract data from Message object or use raw data
    if isinstance(message, Message):
        data = message.data
        if not isinstance(data, dict):
            raise TypeError(f"Message.data must be a dict, got {type(data).__name__}")
    elif isinstance(message, dict):
        data = message
    else:
        raise TypeError(f"send_message requires dict or Message, got {type(message).__name__}")

    # JSON-encode the dict
    json_data = json.dumps(data)

    await self.outgoing.put(json_data)

send_ping() async

Manually send a ping message to all connected nodes.

Returns: - int: Number of pings sent

Source code in src/cuemsutils/tools/HubServices.py
371
372
373
374
375
376
377
378
379
380
381
382
async def send_ping(self):
    """
    Manually send a ping message to all connected nodes.

    Returns:
    - int: Number of pings sent
    """
    ping_message = {"__type__": "ping", "timestamp": datetime.now().isoformat()}
    await self.send_message(ping_message)
    self._ping_count += 1
    Logger.debug(f"Ping sent (total: {self._ping_count})")
    return 1

start() async

Start the bus communication by initializing the connection and launching message handlers.

This method starts the bus connection based on the mode (LISTENER or DIALER), then launches infinite receiver and sender loops. It monitors both tasks and exits when the first exception occurs or connection is broken, properly cleaning up all running tasks.

Raises: - ValueError: If an unknown mode is set. - Exception: Re-raises any exception that occurs during task execution after cleanup.

Source code in src/cuemsutils/tools/HubServices.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
async def start(self):
    """
    Start the bus communication by initializing the connection and launching message handlers.

    This method starts the bus connection based on the mode (LISTENER or DIALER),
    then launches infinite receiver and sender loops. It monitors both tasks and
    exits when the first exception occurs or connection is broken, properly cleaning
    up all running tasks.

    Raises:
    - ValueError: If an unknown mode is set.
    - Exception: Re-raises any exception that occurs during task execution after cleanup.
    """
    match self.mode:
        case self.Mode.LISTENER:
            await self.start_listener()
        case self.Mode.DIALER:
            await self.start_dialer()
        case _:
            raise ValueError(f"Unknown mode: {self.mode}")

    sender_task = None
    receiver_task = None
    ping_task = None
    try:
        sender_task = asyncio.create_task(self._send_handler(), name="sender")
        receiver_task = asyncio.create_task(self._receiver_handler(), name="receiver")
        ping_task = asyncio.create_task(self._auto_ping_handler(), name="ping")

        tasks = [sender_task, receiver_task, ping_task]
        task_names = {sender_task: "sender", receiver_task: "receiver", ping_task: "ping"}

        Logger.info(f"NNG {self.mode.value} tasks started: sender, receiver, ping")

        done_tasks, pending_tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)

        # Check if completed task had an exception
        for task in done_tasks:
            task_name = task_names.get(task, "unknown")
            if task.exception() is not None:
                Logger.error(f"NNG {self.mode.value} {task_name} handler failed: {task.exception()}")
                Logger.error(f"Exception type: {type(task.exception()).__name__}")
            else:
                Logger.warning(f"NNG {self.mode.value} {task_name} handler exited unexpectedly (no exception)")

        # Log pending task cancellation
        pending_names = [task_names.get(t, "unknown") for t in pending_tasks]
        if pending_names:
            Logger.info(f"Cancelling pending tasks: {pending_names}")

        # Cancel and await pending tasks
        for task in pending_tasks:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        Logger.warning(f"NNG {self.mode.value} start() exiting - all tasks terminated")

    except Exception as e:
        Logger.error(f"Error occurred while starting tasks: {e} type: {type(e)}")
        # Cancel any running tasks
        for task in [sender_task, receiver_task, ping_task]:
            if task is not None and not task.done():
                task.cancel()
                try:
                    await task
                except (asyncio.CancelledError, Exception):
                    pass
        raise
    finally:
        # Set _closing=True FIRST so any pipe callbacks that fire during
        # task cancellation or socket close become immediate no-ops.
        # Accessing pipe.id inside a NNG pipe callback during teardown can
        # trigger SIGFPE/SIGABRT in NNG's C code — _closing prevents that.
        self._closing = True

        # Cancel all inner tasks before closing the socket.
        for task in [sender_task, receiver_task, ping_task]:
            if task is not None and not task.done():
                task.cancel()
                try:
                    await task
                except (asyncio.CancelledError, Exception):
                    pass
        if hasattr(self, 'connection') and self.connection is not None:
            try:
                self.connection.close()
            except Exception:
                pass
            self.connection = None

start_dialer() async

Initialize the bus connection in DIALER mode (dialing to controller).

Creates a Bus0 socket that dials to the configured address with a 100ms receive timeout. TCP keepalive is enabled BEFORE dialing to ensure it applies to the connection. Non-blocking dial is used to allow reconnection if controller is not yet available. Callbacks are registered BEFORE dialing to ensure no connection events are missed.

Source code in src/cuemsutils/tools/HubServices.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
async def start_dialer(self):
    """
    Initialize the bus connection in DIALER mode (dialing to controller).

    Creates a Bus0 socket that dials to the configured address with a 100ms receive timeout.
    TCP keepalive is enabled BEFORE dialing to ensure it applies to the connection.
    Non-blocking dial is used to allow reconnection if controller is not yet available.
    Callbacks are registered BEFORE dialing to ensure no connection events are missed.
    """
    # Create socket first, set options and callbacks, then dial (non-blocking for reconnection support)
    self.connection = Bus0(recv_timeout=RECV_TIMEOUT)
    self.connection.tcp_keepalive = True
    # Set explicit reconnection parameters
    self.connection.reconnect_time_min = 1000  # 1 second minimum
    self.connection.reconnect_time_max = 30000  # 30 seconds maximum
    self._add_callbacks()  # Register callbacks BEFORE dial to catch all events
    Logger.debug(f"DIALER options set: tcp_keepalive={self.connection.tcp_keepalive}, " +
                 f"reconnect_time_min={self.connection.reconnect_time_min}ms, " +
                 f"reconnect_time_max={self.connection.reconnect_time_max}ms")
    self.connection.dial(self.address, block=False)
    Logger.debug(f"DIALER dialing {self.address} (non-blocking)")

start_listener() async

Initialize the bus connection in LISTENER mode (listening for connections).

Creates a Bus0 socket that listens on the configured address with a 100ms receive timeout. TCP keepalive is enabled BEFORE listening to ensure it applies to all connections. Callbacks are registered BEFORE listening to ensure no connection events are missed.

Source code in src/cuemsutils/tools/HubServices.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
async def start_listener(self):
    """
    Initialize the bus connection in LISTENER mode (listening for connections).

    Creates a Bus0 socket that listens on the configured address with a 100ms receive timeout.
    TCP keepalive is enabled BEFORE listening to ensure it applies to all connections.
    Callbacks are registered BEFORE listening to ensure no connection events are missed.
    """
    # Create socket first, set options and callbacks, then listen
    self.connection = Bus0(recv_timeout=RECV_TIMEOUT)
    self.connection.tcp_keepalive = True
    self._add_callbacks()  # Register callbacks BEFORE listen to catch all events
    self.connection.listen(self.address)
    Logger.debug(f"LISTENER started on {self.address}, tcp_keepalive={self.connection.tcp_keepalive}")

SignalEngine

A class that handles system signals and status tracking.

Source code in src/cuemsutils/tools/SignalEngine.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class SignalEngine:
    """
    A class that handles system signals and status tracking.
    """
    def __init__(self, with_signals: bool = True):
        self.pid = getpid()
        Logger.info(f"Starting {self.__class__.__name__} with PID {self.pid}")
        self.running = False

        if with_signals:
            self.register_signals()

    ### RUNNING LOGIC ###
    @logged
    def start(self) -> None:
        self.running = True
        Logger.info(f"{self.__class__.__name__} started")
        self.run()

    def restart(self) -> None:
        pass

    def reload(self) -> None:
        pass

    @logged
    def run(self, tick: float = 3, max_tick: float | None = None) -> None:
        while self.running:
            sleep(tick)
            if max_tick is not None:
                if tick < max_tick:
                    tick += 0.01
                else:
                    self.stop()

    @logged
    def stop(self) -> None:
        self.stop_requested = True
        try:
            if hasattr(self, 'stop_all'):
                self.stop_all()  # type: ignore[attr-defined]
        except:
            Logger.warning('Exception when calling stop_all')
        self.running = False

    ### COMMUNICATE WITH SYSTEMD ###
    def notify_systemd(self, status: str = 'READY'):
        Logger.debug('Startup complete, notifying systemd')
        notify_systemd_daemon(f'{status.upper()}=1')

    ### SIGNALS HANDLERS ###
    def register_signals(self) -> None:
        signal.signal(signal.SIGINT, self.handle_interrupt)
        signal.signal(signal.SIGTERM, self.handle_terminate)
        signal.signal(signal.SIGUSR1, self.handle_print_running)
        signal.signal(signal.SIGUSR2, self.handle_print_all)
        signal.signal(signal.SIGCHLD, self.handle_child_signal)

    def handle_interrupt(self, sigNum, frame) -> None:
        string = f'SIGINT received! Exiting with result code: {sigNum}'
        print('\n\n' + string + '\n\n')
        Logger.info(string)

        self.stop()
        sleep(0.1)
        exit()

    def handle_terminate(self, sigNum, frame) -> None:
        string = f'SIGTERM received! Exiting with result code: {sigNum}'
        print('\n\n' + string + '\n\n')
        Logger.info(string)

        self.stop()
        sleep(0.1)
        exit()

    def handle_print_all(self, sigNum, frame) -> None:
        Logger.info(f"STATUS REQUEST BY SIGUSR2 SIGNAL {sigNum}")
        if hasattr(self, 'print_all_status'):
            self.print_all_status()  # type: ignore[attr-defined]

    def handle_print_running(self, sigNum, frame) -> None:
        run_str = "" if self.running else " NOT"
        string = f"SIGNAL {sigNum} recieved: {self.__class__.__name__} is{run_str} running"
        Logger.info(string)
        print(string)

    def handle_child_signal(self, sigNum, frame):
        pass

StringSanitizer

Ensure that the string is sanitized and safe for use in the system

Source code in src/cuemsutils/tools/StringSanitizer.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class StringSanitizer():
    """Ensure that the string is sanitized and safe for use in the system"""
    @staticmethod
    def sanitize_text_size(_string):

        if _string and (len(_string) > 65535):
            _string = _string[0:65534] # return frist 255 characters
        return _string

    @staticmethod
    def sanitize_name(_string): #TODO: scape characters?
        if len(_string) > 255 :
            _string = _string[0:254] # return frist 255 characters
        return _string

    @staticmethod
    def sanitize_file_name(_string):
        if len(_string) >= 240 :
            _string = _string[0:236] + _string[-4:] # return frist 236 characters + last 4 chars = total 240 of max 255. Leave room for versioning and .tmp

        _string = _string.replace(' ', '_')
        _string = _string.replace('-', '_')
        keepcharacters = ('.','_')
        return "".join(c for c in _string if c.isalnum() or c in keepcharacters).rstrip().lower()

    @staticmethod
    def sanitize_dir_name(_string):
        if len(_string) >= 240 :
            _string = _string[0:236] + _string[-4:] # return frist 236 characters + last 4 chars = total 240 of max 255. Leave room for versioning and .tmp

        _string = _string.replace(' ', '_')
        _string = _string.replace('-', '_')
        keepcharacters = ('_')
        return "".join(c for c in _string if c.isalnum() or c in keepcharacters).rstrip().lower()

    @staticmethod
    def sanitize_dir_permit_increment(_string):
        if len(_string) >= 240 :
            _string = _string[0:236] + _string[-4:] # return frist 236 characters + last 4 chars = total 240 of max 255. Leave room for versioning and .tmp

        _string = _string.replace(' ', '_')
        keepcharacters = ('_', '-')
        return "".join(c for c in _string if c.isalnum() or c in keepcharacters).rstrip().lower()

Uuid

A class to interact with unique identifiers.

Comparisions should be made based on memory allocation.

Calling or printing the instance will return the uuid4 string.

Source code in src/cuemsutils/tools/Uuid.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class Uuid():
    """A class to interact with unique identifiers.

        Comparisions should be made based on memory allocation.

        Calling or printing the instance will return the uuid4 string.
    """
    def __init__(self, uuid: str | None = None):
        if not uuid:
            self.uuid = str(uuid4())
        else:
            self.uuid = str(uuid)
        if not self.check():
            raise ValueError(f'uuid {uuid} is not valid')

    def __str__(self):
        return self.uuid

    def __repr__(self):
        return self.uuid

    def __call__(self):
        return self.uuid

    def __hash__(self):
        return hash(self.uuid)

    def __eq__(self, other):
        if isinstance(other, Uuid):
            return self.uuid == other.uuid
        elif isinstance(other, str):
            return self.uuid == other
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __json__(self):
        return self.uuid

    def items(self):
        return [("uuid", self.uuid)]

    def check(self):
        m = match(UUID4_REGEX, self.uuid)
        if m:
            return m.span() == (0, 36)
        return False