
Comms

AsyncCommsThread

Bases: Thread

Base class for asynchronous communication threads.

This class extends Thread to run an asyncio event loop in a separate daemon thread. Subclasses must implement create_all_tasks() to define the async tasks that will be executed concurrently.

The event loop runs in the background thread and can be safely accessed from other threads using run_coroutine().
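
As a rough illustration of this pattern, the sketch below uses only the standard library. `LoopThread`, `double`, and the `ready` event are hypothetical names, not part of cuemsengine. The `ready` event is an added assumption that covers the window in which `event_loop` is still None just after `start()`.

```python
import asyncio
import threading


class LoopThread(threading.Thread):
    """Hypothetical stand-in for AsyncCommsThread: owns an event loop."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self.event_loop = None
        self.ready = threading.Event()  # set once the loop is running

    def run(self) -> None:
        self.event_loop = asyncio.new_event_loop()
        # Signal readiness from inside the loop, so callers know it is live.
        self.event_loop.call_soon(self.ready.set)
        self.event_loop.run_forever()
        self.event_loop.close()


async def double(message: dict) -> int:
    await asyncio.sleep(0)  # yield to the loop once
    return message["value"] * 2


worker = LoopThread()
worker.start()
worker.ready.wait(timeout=5)

# Submit the coroutine from the main thread and block for its result.
future = asyncio.run_coroutine_threadsafe(double({"value": 21}), worker.event_loop)
result = future.result(timeout=5)

# Stop the loop from outside its thread, then wait for the thread to exit.
worker.event_loop.call_soon_threadsafe(worker.event_loop.stop)
worker.join(timeout=5)
```

The coroutine body runs on the worker thread; only the result crosses back to the caller.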

Attributes:

thread_name (str): Base name for the thread.
name (str): Full thread name with 'AsyncComms-' prefix.
timeout (float): Default timeout in seconds for coroutine execution.
stop_requested (bool): Flag indicating whether thread should stop.
send_contexts (List): List of send contexts (subclass-specific).
event_loop (AbstractEventLoop): The asyncio event loop running in this thread. None until run() is called.

Example

Subclass implementation:

import asyncio

class MyAsyncComms(AsyncCommsThread):
    async def my_task(self):
        # Do async work
        pass

    def create_all_tasks(self):
        # Called from inside the running event loop, so create_task() is valid here
        return [asyncio.create_task(self.my_task())]
Source code in src/cuemsengine/comms/AsyncCommsThread.py
class AsyncCommsThread(Thread):
    """Base class for asynchronous communication threads.

    This class extends Thread to run an asyncio event loop in a separate daemon
    thread. Subclasses must implement `create_all_tasks()` to define the async
    tasks that will be executed concurrently.

    The event loop runs in the background thread and can be safely accessed from
    other threads using `run_coroutine()`.

    Attributes:
        thread_name (str): Base name for the thread.
        name (str): Full thread name with 'AsyncComms-' prefix.
        timeout (float): Default timeout in seconds for coroutine execution.
        stop_requested (bool): Flag indicating whether thread should stop.
        send_contexts (List): List of send contexts (subclass-specific).
        event_loop (asyncio.AbstractEventLoop): The asyncio event loop running
            in this thread. None until `run()` is called.

    Example:
        Subclass implementation:

        ```python
        class MyAsyncComms(AsyncCommsThread):
            async def my_task(self):
                # Do async work
                pass

            def create_all_tasks(self):
                return [asyncio.create_task(self.my_task())]
        ```
    """
    def __init__(self, **kwargs):
        """Initialize the AsyncCommsThread.

        Creates a daemon thread that will run an asyncio event loop. The thread
        is configured with a name and optional timeout for coroutine execution.

        Args:
            **kwargs: Keyword arguments.
                - thread_name (str, optional): Base name for the thread.
                    Defaults to the name of the subclass.
                - timeout (float, optional): Timeout in seconds for coroutine
                    execution. Defaults to TIMEOUT (15 seconds).

        Note:
            The thread is created as a daemon thread, so it will automatically
            terminate when the main program exits.
        """
        self.thread_name = kwargs.get('thread_name', type(self).__name__)
        Logger.info(f'Initializing AsyncCommsThread: {self.thread_name}')
        super().__init__(name=self.thread_name, daemon=True)
        self.name = f'AsyncComms-{self.thread_name}'
        self.timeout = kwargs.get('timeout', TIMEOUT)
        self.stop_requested = False
        self.send_contexts: List[Any] = []
        self.event_loop: asyncio.AbstractEventLoop | None = None

    def run(self) -> None:
        """Thread entry point.

        Creates a new asyncio event loop, schedules the async communications
        task, and runs the event loop forever. This method is called
        automatically when the thread is started.

        The event loop will continue running until `stop()` is called, which
        will cause the loop to stop and the thread to terminate.
        """
        Logger.info(f'Running {self.name}')
        self.event_loop = asyncio.new_event_loop()
        self.event_loop.create_task(self.run_asyncio_comms())
        self.event_loop.run_forever()

    def stop(self) -> None:
        """Stop the thread and event loop.

        Thread-safe method that signals the thread to stop and schedules the
        async stop coroutine to run in the event loop. This will cause the
        event loop to stop and the thread to terminate.

        Note:
            This method can be called from any thread. It does not wait for
            the thread to fully terminate.
        """
        self.stop_requested = True
        if self.event_loop and self.is_alive():
            try:
                asyncio.run_coroutine_threadsafe(self.stop_async(), self.event_loop)
            except Exception as e:
                Logger.debug(f'Error stopping {self.name}: {e}')

    async def stop_async(self) -> None:
        """Async stop handler.

        Cancels all running tasks, waits for cleanup, then stops the event loop.
        This is called internally by `stop()` and should not be called directly.

        Note:
            This coroutine must run in the same event loop that it stops.
        """
        # Get all tasks except the current one
        current_task = asyncio.current_task()
        pending_tasks = [
            task for task in asyncio.all_tasks(self.event_loop)
            if task is not current_task and not task.done()
        ]

        # Cancel all pending tasks
        for task in pending_tasks:
            task.cancel()

        # Wait for all tasks to complete cancellation
        if pending_tasks:
            await asyncio.gather(*pending_tasks, return_exceptions=True)
            Logger.debug(f'{self.name} cancelled {len(pending_tasks)} pending tasks')

        # Now stop the event loop
        self.event_loop.call_soon_threadsafe(self.event_loop.stop)
        Logger.info(f'{self.name} event loop stopped')

    async def run_asyncio_comms(self) -> None:
        """Run all async communication tasks.

        Creates all tasks from `create_all_tasks()` and waits for them to
        complete. Tasks run concurrently and exceptions are captured rather
        than immediately raised (via `return_exceptions=True`).

        This method runs until all tasks complete or until `stop_async()` is
        called.

        Note:
            Subclasses should implement `create_all_tasks()` to return a list
            of asyncio tasks that need to run concurrently.
        """
        Logger.info(f'Starting asyncio communications in {self.name}')
        tasks = self.create_all_tasks()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                Logger.error(f'{self.name} task {i} failed with {type(result).__name__}: {result}')
        Logger.info(f'{self.name} asyncio communications finished')

    def create_all_tasks(self) -> List[asyncio.Task]:
        """Create all async tasks to run concurrently.

        Subclasses must implement this method to return a list of asyncio
        tasks that should run concurrently in the event loop. These tasks
        typically handle various communication channels or services.

        Returns:
            List[asyncio.Task]: List of asyncio tasks to run concurrently.

        Raises:
            NotImplementedError: If not implemented by subclass.

        Example:
            ```python
            def create_all_tasks(self):
                return [
                    asyncio.create_task(self.listener_task()),
                    asyncio.create_task(self.sender_task()),
                ]
            ```
        """
        raise NotImplementedError('create_all_tasks is not implemented')

    def run_coroutine(self, coroutine: Callable, message: dict, timeout: Optional[float] = None) -> Any:
        """Run a coroutine in the event loop from another thread.

        Thread-safe method to execute a coroutine function in this thread's
        event loop. The coroutine is called with the provided message and
        the result is returned synchronously, with a timeout.

        This is the primary way to interact with the async event loop from
        other threads (e.g., the main thread).

        Args:
            coroutine: A coroutine function to execute. Must be a coroutine
                function (not a regular function).
            message: Dictionary to pass as argument to the coroutine.
            timeout: Optional timeout in seconds (defaults to self.timeout). -1 means no timeout.

        Returns:
            Any: The return value from the coroutine.

        Raises:
            AttributeError: If the event loop has not been initialized (thread
                not started).
            TypeError: If `coroutine` is not a coroutine function.
            TimeoutError: If the coroutine does not complete within `timeout`
                seconds.
            Exception: If the coroutine raises an exception, it is re-raised
                here.

        Example:
            ```python
            async def send_message(msg: dict) -> dict:
                # Async operation
                return {'status': 'ok'}

            # From another thread:
            result = comms_thread.run_coroutine(send_message, {'data': 'test'})
            ```
        """
        if not self.event_loop:
            raise AttributeError(f'{self.name} event loop is not initialized')

        if not asyncio.iscoroutinefunction(coroutine):
            raise TypeError(f'{self.name} parameter coroutine is not a coroutine function')

        function_name = coroutine.__name__
        Logger.debug(f'{self.name} running coroutine: {function_name}')

        if timeout is None:
            timeout = self.timeout

        if timeout == -1:
            timeout = None

        send_task = asyncio.run_coroutine_threadsafe(
            coroutine(message), self.event_loop
        )
        try:
            result = send_task.result(timeout=timeout)
            Logger.debug(f'{self.name} {function_name} returned: {result!r}')
            return result
        except TimeoutError:
            Logger.error(f'{self.name} {function_name} timed out after {timeout}s')
            send_task.cancel()
            raise
        except Exception as exc:
            Logger.error(f'{self.name} {function_name} raised an exception: {exc!r}')
            send_task.cancel()
            raise

__init__(**kwargs)

Initialize the AsyncCommsThread.

Creates a daemon thread that will run an asyncio event loop. The thread is configured with a name and optional timeout for coroutine execution.

Parameters:

**kwargs (default: {}): Keyword arguments.
    - thread_name (str, optional): Base name for the thread. Defaults to the name of the subclass.
    - timeout (float, optional): Timeout in seconds for coroutine execution. Defaults to TIMEOUT (15 seconds).

Note

The thread is created as a daemon thread, so it will automatically terminate when the main program exits.
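
The naming and default handling described above can be sketched in isolation as follows. `NamedWorker` and `EditorLink` are hypothetical classes, and the `TIMEOUT` constant is assumed to mirror the documented 15-second default.

```python
import threading

TIMEOUT = 15  # assumption: mirrors the documented default of 15 seconds


class NamedWorker(threading.Thread):
    """Hypothetical reduction of the constructor logic shown in the source."""

    def __init__(self, **kwargs):
        # Fall back to the subclass name when no thread_name is given.
        self.thread_name = kwargs.get('thread_name', type(self).__name__)
        super().__init__(name=self.thread_name, daemon=True)
        # Thread.name can only be reassigned after Thread.__init__ has run.
        self.name = f'AsyncComms-{self.thread_name}'
        self.timeout = kwargs.get('timeout', TIMEOUT)


class EditorLink(NamedWorker):
    pass


default = EditorLink()
custom = EditorLink(thread_name='editor', timeout=2.5)
```

Here `default.name` becomes 'AsyncComms-EditorLink', while `custom` carries the explicit name and timeout.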

Source code in src/cuemsengine/comms/AsyncCommsThread.py
def __init__(self, **kwargs):
    """Initialize the AsyncCommsThread.

    Creates a daemon thread that will run an asyncio event loop. The thread
    is configured with a name and optional timeout for coroutine execution.

    Args:
        **kwargs: Keyword arguments.
            - thread_name (str, optional): Base name for the thread.
                Defaults to the name of the subclass.
            - timeout (float, optional): Timeout in seconds for coroutine
                execution. Defaults to TIMEOUT (15 seconds).

    Note:
        The thread is created as a daemon thread, so it will automatically
        terminate when the main program exits.
    """
    self.thread_name = kwargs.get('thread_name', type(self).__name__)
    Logger.info(f'Initializing AsyncCommsThread: {self.thread_name}')
    super().__init__(name=self.thread_name, daemon=True)
    self.name = f'AsyncComms-{self.thread_name}'
    self.timeout = kwargs.get('timeout', TIMEOUT)
    self.stop_requested = False
    self.send_contexts: List[Any] = []
    self.event_loop: asyncio.AbstractEventLoop | None = None

create_all_tasks()

Create all async tasks to run concurrently.

Subclasses must implement this method to return a list of asyncio tasks that should run concurrently in the event loop. These tasks typically handle various communication channels or services.

Returns:

List[Task]: List of asyncio tasks to run concurrently.

Raises:

NotImplementedError: If not implemented by subclass.

Example
def create_all_tasks(self):
    return [
        asyncio.create_task(self.listener_task()),
        asyncio.create_task(self.sender_task()),
    ]
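
One detail worth keeping in mind: `asyncio.create_task()` needs a running event loop, so an implementation like the one above only works when it is called from inside a coroutine (as `run_asyncio_comms()` does). The sketch below demonstrates this with standard-library calls only; `listener_task` and the module-level `create_all_tasks` are illustrative stand-ins.

```python
import asyncio


async def listener_task() -> str:
    return "listening"


def create_all_tasks() -> list:
    # Same shape as the method above, minus `self`.
    return [asyncio.create_task(listener_task())]


# Outside a running loop, asyncio.create_task() raises RuntimeError.
outside_error = None
coro = listener_task()
try:
    asyncio.create_task(coro)
except RuntimeError as exc:
    outside_error = exc
finally:
    coro.close()  # avoid a "coroutine was never awaited" warning


async def main() -> list:
    # Inside a coroutine the loop is running, so task creation succeeds.
    return await asyncio.gather(*create_all_tasks())


results = asyncio.run(main())
```
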
Source code in src/cuemsengine/comms/AsyncCommsThread.py
def create_all_tasks(self) -> List[asyncio.Task]:
    """Create all async tasks to run concurrently.

    Subclasses must implement this method to return a list of asyncio
    tasks that should run concurrently in the event loop. These tasks
    typically handle various communication channels or services.

    Returns:
        List[asyncio.Task]: List of asyncio tasks to run concurrently.

    Raises:
        NotImplementedError: If not implemented by subclass.

    Example:
        ```python
        def create_all_tasks(self):
            return [
                asyncio.create_task(self.listener_task()),
                asyncio.create_task(self.sender_task()),
            ]
        ```
    """
    raise NotImplementedError('create_all_tasks is not implemented')

run()

Thread entry point.

Creates a new asyncio event loop, schedules the async communications task, and runs the event loop forever. This method is called automatically when the thread is started.

The event loop will continue running until stop() is called, which will cause the loop to stop and the thread to terminate.
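
The ordering implied here (schedule first, then block in `run_forever()` until something stops the loop) can be observed with a small standard-library sketch. The names `comms`, `thread_main`, `events`, `state`, and `finished` are all hypothetical and exist only for this illustration.

```python
import asyncio
import threading

events = []                   # appended to only from the worker thread
state = {}                    # lets the main thread reach the worker's loop
finished = threading.Event()  # set when the scheduled coroutine is done


async def comms() -> None:
    events.append("comms started")
    await asyncio.sleep(0.01)
    events.append("comms finished")
    finished.set()


def thread_main() -> None:
    loop = asyncio.new_event_loop()
    state["loop"] = loop
    # loop.create_task() only schedules; nothing runs until run_forever().
    loop.create_task(comms())
    events.append("task scheduled")
    loop.run_forever()        # blocks here until loop.stop() is called
    events.append("loop stopped")
    loop.close()


thread = threading.Thread(target=thread_main, daemon=True)
thread.start()

# Wait for comms() to finish, then stop the loop from the main thread.
finished.wait(timeout=5)
state["loop"].call_soon_threadsafe(state["loop"].stop)
thread.join(timeout=5)
```

Note that the loop keeps running after `comms()` finishes; it only exits once `stop` is scheduled on it.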

Source code in src/cuemsengine/comms/AsyncCommsThread.py
def run(self) -> None:
    """Thread entry point.

    Creates a new asyncio event loop, schedules the async communications
    task, and runs the event loop forever. This method is called
    automatically when the thread is started.

    The event loop will continue running until `stop()` is called, which
    will cause the loop to stop and the thread to terminate.
    """
    Logger.info(f'Running {self.name}')
    self.event_loop = asyncio.new_event_loop()
    self.event_loop.create_task(self.run_asyncio_comms())
    self.event_loop.run_forever()

run_asyncio_comms() async

Run all async communication tasks.

Creates all tasks from create_all_tasks() and waits for them to complete. Tasks run concurrently and exceptions are captured rather than immediately raised (via return_exceptions=True).

This method runs until all tasks complete or until stop_async() is called.

Note

Subclasses should implement create_all_tasks() to return a list of asyncio tasks that need to run concurrently.
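
To make the effect of `return_exceptions=True` concrete, here is a minimal sketch with two illustrative tasks (`ok_task` and `failing_task` are hypothetical). The failure is returned as a value in the results list, and the sibling task still completes.

```python
import asyncio


async def ok_task() -> str:
    return "ok"


async def failing_task() -> None:
    raise ValueError("socket closed")


async def run_all() -> list:
    tasks = [asyncio.create_task(ok_task()), asyncio.create_task(failing_task())]
    # Exceptions come back as values in `results`, in task order.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    failures = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            failures.append(f"task {i} failed with {type(result).__name__}: {result}")
    return failures


failures = asyncio.run(run_all())
```

Since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, so an `isinstance(result, Exception)` check like this one skips tasks that were merely cancelled.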

Source code in src/cuemsengine/comms/AsyncCommsThread.py
async def run_asyncio_comms(self) -> None:
    """Run all async communication tasks.

    Creates all tasks from `create_all_tasks()` and waits for them to
    complete. Tasks run concurrently and exceptions are captured rather
    than immediately raised (via `return_exceptions=True`).

    This method runs until all tasks complete or until `stop_async()` is
    called.

    Note:
        Subclasses should implement `create_all_tasks()` to return a list
        of asyncio tasks that need to run concurrently.
    """
    Logger.info(f'Starting asyncio communications in {self.name}')
    tasks = self.create_all_tasks()
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            Logger.error(f'{self.name} task {i} failed with {type(result).__name__}: {result}')
    Logger.info(f'{self.name} asyncio communications finished')

run_coroutine(coroutine, message, timeout=None)

Run a coroutine in the event loop from another thread.

Thread-safe method to execute a coroutine function in this thread's event loop. The coroutine is called with the provided message and the result is returned synchronously, with a timeout.

This is the primary way to interact with the async event loop from other threads (e.g., the main thread).

Parameters:

coroutine (Callable, required): A coroutine function to execute. Must be a coroutine function (not a regular function).
message (dict, required): Dictionary to pass as argument to the coroutine.
timeout (Optional[float], default: None): Optional timeout in seconds (defaults to self.timeout). -1 means no timeout.

Returns:

Any: The return value from the coroutine.

Raises:

AttributeError: If the event loop has not been initialized (thread not started).
TypeError: If coroutine is not a coroutine function.
TimeoutError: If the coroutine does not complete within timeout seconds.
Exception: If the coroutine raises an exception, it is re-raised here.

Example
async def send_message(msg: dict) -> dict:
    # Async operation
    return {'status': 'ok'}

# From another thread:
result = comms_thread.run_coroutine(send_message, {'data': 'test'})
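
The timeout rules (None falls back to the default, -1 disables the timeout) and the timeout path can be sketched separately from the class. `resolve_timeout`, `DEFAULT_TIMEOUT`, and `slow` are hypothetical names introduced for this example. The example catches `concurrent.futures.TimeoutError`, which became an alias of the built-in `TimeoutError` in Python 3.11; on earlier versions the two are distinct, so code that catches only the built-in name would handle a timeout in its generic `except Exception` branch instead.

```python
import asyncio
import concurrent.futures
import threading

DEFAULT_TIMEOUT = 15.0  # assumption: stands in for self.timeout


def resolve_timeout(timeout, default=DEFAULT_TIMEOUT):
    """None -> default, -1 -> wait forever (None), anything else unchanged."""
    if timeout is None:
        timeout = default
    if timeout == -1:
        timeout = None
    return timeout


async def slow(message: dict) -> dict:
    await asyncio.sleep(10)
    return message


loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

future = asyncio.run_coroutine_threadsafe(slow({"data": "test"}), loop)
timed_out = False
try:
    future.result(timeout=resolve_timeout(0.05))
except concurrent.futures.TimeoutError:
    timed_out = True
    future.cancel()  # ask the loop to cancel the still-running coroutine

# Let the loop process the cancellation, then shut it down.
asyncio.run_coroutine_threadsafe(asyncio.sleep(0.01), loop).result(timeout=5)
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5)
loop.close()
```
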
Source code in src/cuemsengine/comms/AsyncCommsThread.py
def run_coroutine(self, coroutine: Callable, message: dict, timeout: Optional[float] = None) -> Any:
    """Run a coroutine in the event loop from another thread.

    Thread-safe method to execute a coroutine function in this thread's
    event loop. The coroutine is called with the provided message and
    the result is returned synchronously, with a timeout.

    This is the primary way to interact with the async event loop from
    other threads (e.g., the main thread).

    Args:
        coroutine: A coroutine function to execute. Must be a coroutine
            function (not a regular function).
        message: Dictionary to pass as argument to the coroutine.
        timeout: Optional timeout in seconds (defaults to self.timeout). -1 means no timeout.

    Returns:
        Any: The return value from the coroutine.

    Raises:
        AttributeError: If the event loop has not been initialized (thread
            not started).
        TypeError: If `coroutine` is not a coroutine function.
        TimeoutError: If the coroutine does not complete within `timeout`
            seconds.
        Exception: If the coroutine raises an exception, it is re-raised
            here.

    Example:
        ```python
        async def send_message(msg: dict) -> dict:
            # Async operation
            return {'status': 'ok'}

        # From another thread:
        result = comms_thread.run_coroutine(send_message, {'data': 'test'})
        ```
    """
    if not self.event_loop:
        raise AttributeError(f'{self.name} event loop is not initialized')

    if not asyncio.iscoroutinefunction(coroutine):
        raise TypeError(f'{self.name} parameter coroutine is not a coroutine function')

    function_name = coroutine.__name__
    Logger.debug(f'{self.name} running coroutine: {function_name}')

    if timeout is None:
        timeout = self.timeout

    if timeout == -1:
        timeout = None

    send_task = asyncio.run_coroutine_threadsafe(
        coroutine(message), self.event_loop
    )
    try:
        result = send_task.result(timeout=timeout)
        Logger.debug(f'{self.name} {function_name} returned: {result!r}')
        return result
    except TimeoutError:
        Logger.error(f'{self.name} {function_name} timed out after {timeout}s')
        send_task.cancel()
        raise
    except Exception as exc:
        Logger.error(f'{self.name} {function_name} raised an exception: {exc!r}')
        send_task.cancel()
        raise

stop()

Stop the thread and event loop.

Thread-safe method that signals the thread to stop and schedules the async stop coroutine to run in the event loop. This will cause the event loop to stop and the thread to terminate.

Note

This method can be called from any thread. It does not wait for the thread to fully terminate.
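
Because `stop()` returns before the thread has exited, a caller that needs a clean shutdown would typically follow it with `join()`. The sketch below is a hypothetical miniature (`Worker` and its `started` event are not part of cuemsengine) that keeps only the stop path.

```python
import asyncio
import threading


class Worker(threading.Thread):
    """Hypothetical miniature of the stop()/stop_async() pair."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self.stop_requested = False
        self.event_loop = None
        self.started = threading.Event()

    def run(self) -> None:
        self.event_loop = asyncio.new_event_loop()
        self.event_loop.call_soon(self.started.set)
        self.event_loop.run_forever()
        self.event_loop.close()

    async def stop_async(self) -> None:
        # Runs inside the loop it stops.
        self.event_loop.stop()

    def stop(self) -> None:
        # Returns immediately; the loop stops some time later.
        self.stop_requested = True
        if self.event_loop and self.is_alive():
            asyncio.run_coroutine_threadsafe(self.stop_async(), self.event_loop)


worker = Worker()
worker.start()
worker.started.wait(timeout=5)

worker.stop()
worker.join(timeout=5)  # the caller decides how long to wait for shutdown
stopped_cleanly = not worker.is_alive()
```
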

Source code in src/cuemsengine/comms/AsyncCommsThread.py
def stop(self) -> None:
    """Stop the thread and event loop.

    Thread-safe method that signals the thread to stop and schedules the
    async stop coroutine to run in the event loop. This will cause the
    event loop to stop and the thread to terminate.

    Note:
        This method can be called from any thread. It does not wait for
        the thread to fully terminate.
    """
    self.stop_requested = True
    if self.event_loop and self.is_alive():
        try:
            asyncio.run_coroutine_threadsafe(self.stop_async(), self.event_loop)
        except Exception as e:
            Logger.debug(f'Error stopping {self.name}: {e}')

stop_async() async

Async stop handler.

Cancels all running tasks, waits for cleanup, then stops the event loop. This is called internally by stop() and should not be called directly.

Note

This coroutine must run in the same event loop that it stops.
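
The cancel-then-gather sequence can be tried on its own with `asyncio.run()`. In this sketch, `forever`, `shutdown`, and `log` are hypothetical; the point is that each cancelled task gets a chance to run its cleanup before `gather()` returns.

```python
import asyncio

log = []


async def forever(name: str) -> None:
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append(f"{name} cleaned up")  # cleanup runs before the task ends
        raise


async def shutdown() -> int:
    current = asyncio.current_task()
    pending = [t for t in asyncio.all_tasks() if t is not current and not t.done()]
    for task in pending:
        task.cancel()
    # return_exceptions=True keeps CancelledError from propagating here.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)


async def main() -> int:
    listener = asyncio.create_task(forever("listener"))
    sender = asyncio.create_task(forever("sender"))
    await asyncio.sleep(0)  # let both tasks start
    # Awaited directly, so the current task is main() and it is not cancelled.
    return await shutdown()


cancelled = asyncio.run(main())
```

Excluding the current task matters: a coroutine that cancelled its own task would interrupt itself at its next `await`.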

Source code in src/cuemsengine/comms/AsyncCommsThread.py
async def stop_async(self) -> None:
    """Async stop handler.

    Cancels all running tasks, waits for cleanup, then stops the event loop.
    This is called internally by `stop()` and should not be called directly.

    Note:
        This coroutine must run in the same event loop that it stops.
    """
    # Get all tasks except the current one
    current_task = asyncio.current_task()
    pending_tasks = [
        task for task in asyncio.all_tasks(self.event_loop)
        if task is not current_task and not task.done()
    ]

    # Cancel all pending tasks
    for task in pending_tasks:
        task.cancel()

    # Wait for all tasks to complete cancellation
    if pending_tasks:
        await asyncio.gather(*pending_tasks, return_exceptions=True)
        Logger.debug(f'{self.name} cancelled {len(pending_tasks)} pending tasks')

    # Now stop the event loop
    self.event_loop.call_soon_threadsafe(self.event_loop.stop)
    Logger.info(f'{self.name} event loop stopped')

Utilities for communications from ControllerEngine and NodeEngine.

ControllerCommunications

Bases: AsyncCommsThread

Communications class for ControllerEngine.

Handles:

- Editor messages
- Player operation messages
- Nodeconf messages
- HWDiscovery messages
- WebSocket OSC messages (commands from UI)

Source code in src/cuemsengine/comms/ControllerCommunications.py
class ControllerCommunications(AsyncCommsThread):
    """
    Communications class for ControllerEngine.

    Handles:
    - Editor messages
    - Player operation messages
    - Nodeconf messages
    - HWDiscovery messages
    - WebSocket OSC messages (commands from UI)
    """
    def __init__(self, 
                 nng_hub_address: str,
                 editor_callback: Callable,
                 node_operation_callback: dict[OperationType, Callable],
                 websocket_osc_config: Optional[dict] = None):
        """
        Initialize ControllerCommunications for ControllerEngine.

        Args:
            nng_hub_address: TCP/IPC address for the NNG hub
                (e.g., "tcp://127.0.0.1:5555").
            editor_callback: Callback for editor messages.
            node_operation_callback: Dictionary mapping each OperationType
                to the callback for received node operations of that type.
            websocket_osc_config: Optional dict with WebSocket OSC listener
                config:
                - host: Host to bind to (default: "0.0.0.0")
                - port: Port to listen on (default: 9190)
                - node_id: Node identifier for NNG operations
                    (default: "controller")
        """
        super().__init__()

        # Initialize communicators
        Logger.debug('Initializing ControllerCommunications')
        self.editor_callback = editor_callback
        self.editor = Communicator(IpcAddress.EDITOR.value)
        self.hw_discovery = Communicator(IpcAddress.HWDISCOVERY.value)
        self.nodeconf = Communicator(IpcAddress.NODECONF.value)

        # Initialize the NNG hub in listener mode
        Logger.info(f'Initializing NNG hub: {nng_hub_address} in {NodesHub.Mode.LISTENER.value} mode')
        self.nng_hub = NodesHub(
            hub_address=nng_hub_address, mode=NodesHub.Mode.LISTENER
        )

        # Set operation callbacks
        self.nng_hub.set_receive_callbacks(node_operation_callback)

        # WebSocket OSC configuration
        self._ws_osc_config = websocket_osc_config or {}
        self._ws_osc_host = self._ws_osc_config.get('host', '0.0.0.0')
        self._ws_osc_port = self._ws_osc_config.get('port', 9190)
        self._node_id = self._ws_osc_config.get('node_id', 'controller')

        # WebSocket OSC router for message handling
        self._osc_router = WebSocketOscRouter()

        # Track connected WebSocket clients for status broadcast (bidirectional)
        self._ws_clients: set = set()

        # Command handlers (set by ControllerEngine)
        self._command_handlers: dict[str, dict[str, Any]] = {}

        # Optional callback for new WebSocket client connections (late-join state dump)
        self._on_client_connect: Optional[Callable] = None

    def create_all_tasks(self):
        Logger.info('Starting all tasks in ControllerCommunications')
        tasks = [
            asyncio.create_task(self.editor_listener()),
            asyncio.create_task(self.nng_hub.start()),
            asyncio.create_task(self.nng_hub.start_message_receiver())
        ]

        # Add WebSocket OSC listener if configured
        if self._ws_osc_port:
            tasks.append(asyncio.create_task(self._websocket_osc_task()))

        return tasks

    #########################
    # WebSocket OSC handling
    #########################

    def register_command_handler(self, osc_path: str, handler: Callable[[Any], None], 
                                  forward_to_nodes: bool = True) -> None:
        """Register a handler for an OSC command path.

        Args:
            osc_path: The OSC address to handle (e.g., '/engine/command/go')
            handler: Callback function to handle the command value
            forward_to_nodes: If True, also forward the command to NodeEngine via NNG
        """
        self._command_handlers[osc_path] = {
            'handler': handler,
            'forward': forward_to_nodes
        }

        # Register with the OSC router
        self._osc_router.register(osc_path, self._handle_osc_command)
        Logger.debug(f"Registered command handler for {osc_path} (forward={forward_to_nodes})")

    def register_osc_handler(self, osc_pattern: str, handler: Callable[[str, list], None]) -> None:
        """Register a generic OSC handler for a pattern (non-command messages).

        Args:
            osc_pattern: OSC address pattern (e.g., '/engine/players/*')
            handler: Callback function receiving (address, args)
        """
        self._osc_router.register(osc_pattern, handler)
        Logger.debug(f"Registered OSC handler for {osc_pattern}")

    def _handle_osc_command(self, address: str, args: list[Any]) -> None:
        """Handle an OSC command received via WebSocket.

        Calls the registered handler and optionally forwards to NodeEngine.
        """
        handler_info = self._command_handlers.get(address)
        if not handler_info:
            Logger.warning(f"No handler registered for OSC command: {address}")
            return

        # Get the value (first argument, or None for impulse)
        value = args[0] if args else None

        Logger.info(f"WebSocket OSC command received: {address} = {repr(value)}")

        # Call the handler
        try:
            handler_info['handler'](value)
        except Exception as e:
            Logger.error(f"Error executing command handler for {address}: {e}")

        # Forward to NodeEngine via NNG if configured
        if handler_info.get('forward', True):
            self._forward_command_to_nodes(address, value)

    def _forward_command_to_nodes(self, address: str, value: Any) -> None:
        """Forward a command to NodeEngine via NNG.

        Args:
            address: The OSC command address (e.g., '/engine/command/go')
            value: The command value
        """
        # Extract command name from address (e.g., '/engine/command/go' -> 'go')
        parts = address.strip('/').split('/')
        command_name = parts[-1] if parts else address

        operation = NodeOperation(
            type=OperationType.COMMAND,
            action=ActionType.UPDATE,
            sender=self._node_id,
            target=command_name,
            data={'value': value, 'address': address}
        )

        # Send via NNG (fire-and-forget)
        try:
            asyncio.run_coroutine_threadsafe(
                self.nng_hub.send_operation(operation),
                self.event_loop
            )
            Logger.debug(f"Forwarded command to nodes: {command_name} = {repr(value)}")
        except Exception as e:
            Logger.error(f"Error forwarding command to nodes: {e}")

    def set_on_client_connect(self, callback: Callable) -> None:
        """Set callback for new WebSocket client connections.

        The callback receives the websocket object and is awaited
        inside the connection handler (runs on the comms event loop).
        """
        self._on_client_connect = callback

    async def _websocket_osc_task(self) -> None:
        """Async task that runs the WebSocket OSC listener."""
        await websocket_osc_listener(
            host=self._ws_osc_host,
            port=self._ws_osc_port,
            message_handler=self._osc_router.route,
            stop_check=lambda: self.stop_requested,
            client_set=self._ws_clients,
            on_connect=self._on_client_connect
        )

    def broadcast_osc(self, address: str, value: Any) -> None:
        """Send an OSC status message to all connected WebSocket clients.

        Call from ControllerEngine when status changes (running, armed, load, timecode).
        Thread-safe: schedules send on the comms event loop.

        Args:
            address: OSC address (e.g. '/engine/status/armed')
            value: Value to send (str, int, or float)
        """
        data = build_osc_message(address, value)
        if not data or not self._ws_clients:
            return
        async def _send_all():
            for ws in list(self._ws_clients):
                try:
                    await ws.send(data)
                except Exception as e:
                    Logger.debug(f"WebSocket broadcast to client failed: {e}")
        try:
            asyncio.run_coroutine_threadsafe(_send_all(), self.event_loop)
        except Exception as e:
            Logger.debug(f"Could not schedule status broadcast: {e}")


    #########################
    # Editor messages
    #########################
    async def editor_listener(self):
        """Editor listener (thread-safe)."""
        Logger.info('Editor listener started')
        await self.editor.responder_connect()
        while not self.stop_requested:
            Logger.debug('Waiting for editor message')
            await self.editor.responder_get_request(self.editor_callback)

    async def respond_to_editor(self, message, context: Context):
        """Respond to editor (thread-safe)."""
        Logger.debug(f'Sending to editor: {message}')
        await context.asend(json.dumps(message).encode())

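    def reply_to_editor(self, message, context: Context):
        """Send a reply to the editor and wait for it to complete (thread-safe).

        Schedules `editor.responder_post_reply` on the comms event loop and
        blocks for up to `self.timeout` seconds. On a timeout or any other
        exception the task is cancelled and the exception is re-raised.
        """
        send_task = asyncio.run_coroutine_threadsafe(
            self.editor.responder_post_reply(message, context),
            self.event_loop
        )
        try:
            _ = send_task.result(timeout=self.timeout)
        except TimeoutError:
            # On Python < 3.11, Future.result() raises concurrent.futures.TimeoutError,
            # which is not the builtin TimeoutError and is caught by the generic
            # handler below instead.
            Logger.debug('The coroutine took too long, cancelling the task...')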
            send_task.cancel()
            raise
        except Exception as exc:
            Logger.debug(f'The coroutine raised an exception: {exc!r}')
            send_task.cancel()
            raise


    #########################
    # Nodeconf messages
    #########################
    def request_to_nodeconf(self, message: dict, timeout: Optional[float] = None) -> dict:
        """
        Send a request to nodeconf and get response (thread-safe).

        Parameters:
        - message: Dictionary containing the request message
        - timeout: Optional timeout in seconds (defaults to `self.timeout`)

        Returns:
        - dict: Response from `nodeconf.send_request` via `run_coroutine` method

        Raises:
        - AttributeError: If `nodeconf` is not initialized
        """
        if not self.nodeconf:
            raise AttributeError('nodeconf communicator is not initialized')

        return self.run_coroutine(self.nodeconf.send_request, message, timeout)

    #########################
    # HWDiscovery messages
    #########################
    def request_to_hwdiscovery(self, message: dict, timeout: Optional[float] = None) -> dict:
        """
        Send a request to hardware discovery and get response (thread-safe).

        Parameters:
        - message: Dictionary containing the request message
        - timeout: Optional timeout in seconds (defaults to `self.timeout`)

        Returns:
        - dict: Response from `hw_discovery.send_request` via `run_coroutine` method

        Raises:
        - AttributeError: If `hw_discovery` is not initialized
        """
        if not self.hw_discovery:
            raise AttributeError('hw_discovery communicator is not initialized')

        return self.run_coroutine(self.hw_discovery.send_request, message, timeout)

__init__(nng_hub_address, editor_callback, node_operation_callback, websocket_osc_config=None)

Initialize AsyncCommsThread for ControllerEngine.

Parameters:

- nng_hub_address: TCP/IPC address for NNG hub (e.g., "tcp://127.0.0.1:5555")
- editor_callback: Callback for editor messages
- node_operation_callback: Callback dictionary for received node operations
- websocket_osc_config: Optional dict with WebSocket OSC listener config:
    - host: Host to bind to (default: "0.0.0.0")
    - port: Port to listen on (default: 9190)
    - node_id: Node identifier for NNG operations

Source code in src/cuemsengine/comms/ControllerCommunications.py
def __init__(self, 
             nng_hub_address: str,
             editor_callback: Callable,
             node_operation_callback: dict[OperationType, Callable],
             websocket_osc_config: Optional[dict] = None):
    """
    Initialize AsyncCommsThread for ControllerEngine.

    Parameters:
    - nng_hub_address: TCP/IPC address for NNG hub (e.g., "tcp://127.0.0.1:5555")
    - editor_callback: Callback for editor messages
    - node_operation_callback: Callback dictionary for received node operations
    - websocket_osc_config: Optional dict with WebSocket OSC listener config:
        - host: Host to bind to (default: "0.0.0.0")
        - port: Port to listen on (default: 9190)
        - node_id: Node identifier for NNG operations (default: "controller")
    """
    super().__init__()

    # Initialize communicators
    Logger.debug('Initializing ControllerCommunications')
    self.editor_callback = editor_callback
    self.editor = Communicator(IpcAddress.EDITOR.value)
    self.hw_discovery = Communicator(IpcAddress.HWDISCOVERY.value)
    self.nodeconf = Communicator(IpcAddress.NODECONF.value)

    # Initialize NNG hub in listener mode
    Logger.info(f'Initializing NNG hub: {nng_hub_address} in {NodesHub.Mode.LISTENER.value} mode')
    self.nng_hub = NodesHub(
        hub_address=nng_hub_address, mode=NodesHub.Mode.LISTENER
    )

    # Set operation callbacks
    self.nng_hub.set_receive_callbacks(node_operation_callback)

    # WebSocket OSC configuration
    self._ws_osc_config = websocket_osc_config or {}
    self._ws_osc_host = self._ws_osc_config.get('host', '0.0.0.0')
    self._ws_osc_port = self._ws_osc_config.get('port', 9190)
    self._node_id = self._ws_osc_config.get('node_id', 'controller')

    # WebSocket OSC router for message handling
    self._osc_router = WebSocketOscRouter()

    # Track connected WebSocket clients for status broadcast (bidirectional)
    self._ws_clients: set = set()

    # Command handlers (set by ControllerEngine); each entry maps an OSC path
    # to {'handler': callable, 'forward': bool}
    self._command_handlers: dict[str, dict[str, Any]] = {}

    # Optional callback for new WebSocket client connections (late-join state dump)
    self._on_client_connect: Optional[Callable] = None

broadcast_osc(address, value)

Send an OSC status message to all connected WebSocket clients.

Call from ControllerEngine when status changes (running, armed, load, timecode). Thread-safe: schedules send on the comms event loop.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| address | str | OSC address (e.g. '/engine/status/armed') | required |
| value | Any | Value to send (str, int, or float) | required |

Source code in src/cuemsengine/comms/ControllerCommunications.py
def broadcast_osc(self, address: str, value: Any) -> None:
    """Send an OSC status message to all connected WebSocket clients.

    Call from ControllerEngine when status changes (running, armed, load, timecode).
    Thread-safe: schedules send on the comms event loop.

    Args:
        address: OSC address (e.g. '/engine/status/armed')
        value: Value to send (str, int, or float)
    """
    data = build_osc_message(address, value)
    if not data or not self._ws_clients:
        return
    async def _send_all():
        for ws in list(self._ws_clients):
            try:
                await ws.send(data)
            except Exception as e:
                Logger.debug(f"WebSocket broadcast to client failed: {e}")
    try:
        asyncio.run_coroutine_threadsafe(_send_all(), self.event_loop)
    except Exception as e:
        Logger.debug(f"Could not schedule status broadcast: {e}")
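
The scheduling pattern used by broadcast_osc can be tried in isolation. The following is a minimal sketch, not the library's code: `FakeClient`, `start_loop_thread` and `broadcast` are hypothetical names, and the payload is an arbitrary byte string rather than a real OSC message. It assumes only the standard `asyncio.run_coroutine_threadsafe` call that the method itself uses.

```python
import asyncio
import threading


class FakeClient:
    """Hypothetical stand-in for a WebSocket connection; records what it is sent."""

    def __init__(self, fail: bool = False):
        self.fail = fail
        self.received: list[bytes] = []

    async def send(self, data: bytes) -> None:
        if self.fail:
            raise ConnectionError("client went away")
        self.received.append(data)


def start_loop_thread() -> asyncio.AbstractEventLoop:
    """Run an event loop in a daemon thread, similar to what a comms thread does."""
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


def broadcast(loop, clients, data: bytes):
    """Schedule a send to every client on `loop`; safe to call from any thread."""

    async def _send_all() -> int:
        delivered = 0
        # Iterate over a snapshot so clients may join or leave mid-broadcast.
        for ws in list(clients):
            try:
                await ws.send(data)
                delivered += 1
            except Exception:
                # One failing client must not stop delivery to the others.
                pass
        return delivered

    return asyncio.run_coroutine_threadsafe(_send_all(), loop)


loop = start_loop_thread()
good, bad = FakeClient(), FakeClient(fail=True)
future = broadcast(loop, {good, bad}, b"/engine/status/armed")
delivered = future.result(timeout=2)  # the real method does not wait for the result
loop.call_soon_threadsafe(loop.stop)
```

The sketch waits on the returned future only so the outcome can be inspected; broadcast_osc itself discards it, which makes the call fire-and-forget for the caller.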

editor_listener() async

Editor listener (thread-safe).

Source code in src/cuemsengine/comms/ControllerCommunications.py
async def editor_listener(self):
    """Editor listener (thread-safe)."""
    Logger.info('Editor listener started')
    await self.editor.responder_connect()
    while not self.stop_requested:
        Logger.debug('Waiting for editor message')
        await self.editor.responder_get_request(self.editor_callback)

register_command_handler(osc_path, handler, forward_to_nodes=True)

Register a handler for an OSC command path.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| osc_path | str | The OSC address to handle (e.g., '/engine/command/go') | required |
| handler | Callable[[Any], None] | Callback function to handle the command value | required |
| forward_to_nodes | bool | If True, also forward the command to NodeEngine via NNG | True |

Source code in src/cuemsengine/comms/ControllerCommunications.py
def register_command_handler(self, osc_path: str, handler: Callable[[Any], None], 
                              forward_to_nodes: bool = True) -> None:
    """Register a handler for an OSC command path.

    Args:
        osc_path: The OSC address to handle (e.g., '/engine/command/go')
        handler: Callback function to handle the command value
        forward_to_nodes: If True, also forward the command to NodeEngine via NNG
    """
    self._command_handlers[osc_path] = {
        'handler': handler,
        'forward': forward_to_nodes
    }

    # Register with the OSC router
    self._osc_router.register(osc_path, lambda addr, args: self._handle_osc_command(addr, args))
    Logger.debug(f"Registered command handler for {osc_path} (forward={forward_to_nodes})")
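
To make the register-then-dispatch flow concrete, here is a simplified sketch of the behaviour described above: the handler receives only the first OSC argument (or None for an impulse), and forwarding uses the last path segment as the command name. The module-level `handlers` and `forwarded` containers and the `register`, `dispatch` and `command_name` functions are hypothetical stand-ins; the real class routes through WebSocketOscRouter and sends a NodeOperation over NNG instead of appending to a list.

```python
from typing import Any, Callable

# Registry: OSC path -> {'handler': callable, 'forward': bool}
handlers: dict[str, dict[str, Any]] = {}
# Stand-in for the NNG hub: records (command_name, value) pairs.
forwarded: list[tuple[str, Any]] = []


def register(osc_path: str, handler: Callable[[Any], None],
             forward_to_nodes: bool = True) -> None:
    handlers[osc_path] = {"handler": handler, "forward": forward_to_nodes}


def command_name(address: str) -> str:
    """'/engine/command/go' -> 'go'"""
    return address.strip("/").split("/")[-1]


def dispatch(address: str, args: list[Any]) -> bool:
    """Call the registered handler; return False when none is registered."""
    info = handlers.get(address)
    if not info:
        return False
    value = args[0] if args else None  # impulse commands carry no argument
    try:
        info["handler"](value)
    except Exception:
        pass  # a handler error is logged in the real code; forwarding still happens
    if info.get("forward", True):
        forwarded.append((command_name(address), value))
    return True


seen: list[Any] = []
register("/engine/command/go", seen.append)
register("/engine/command/local", seen.append, forward_to_nodes=False)
dispatch("/engine/command/go", [])         # impulse: no arguments
dispatch("/engine/command/local", [3, 4])  # only the first argument is used
```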

register_osc_handler(osc_pattern, handler)

Register a generic OSC handler for a pattern (non-command messages).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| osc_pattern | str | OSC address pattern (e.g., '/engine/players/*') | required |
| handler | Callable[[str, list], None] | Callback function receiving (address, args) | required |

Source code in src/cuemsengine/comms/ControllerCommunications.py
def register_osc_handler(self, osc_pattern: str, handler: Callable[[str, list], None]) -> None:
    """Register a generic OSC handler for a pattern (non-command messages).

    Args:
        osc_pattern: OSC address pattern (e.g., '/engine/players/*')
        handler: Callback function receiving (address, args)
    """
    self._osc_router.register(osc_pattern, handler)
    Logger.debug(f"Registered OSC handler for {osc_pattern}")

request_to_hwdiscovery(message, timeout=None)

Send a request to hardware discovery and get response (thread-safe).

Parameters:

- message: Dictionary containing the request message
- timeout: Optional timeout in seconds (defaults to self.timeout)

Returns:

- dict: Response from hw_discovery.send_request via the run_coroutine method

Raises:

- AttributeError: If hw_discovery is not initialized

Source code in src/cuemsengine/comms/ControllerCommunications.py
def request_to_hwdiscovery(self, message: dict, timeout: Optional[float] = None) -> dict:
    """
    Send a request to hardware discovery and get response (thread-safe).

    Parameters:
    - message: Dictionary containing the request message
    - timeout: Optional timeout in seconds (defaults to `self.timeout`)

    Returns:
    - dict: Response from `hw_discovery.send_request` via `run_coroutine` method

    Raises:
    - AttributeError: If `hw_discovery` is not initialized
    """
    if not self.hw_discovery:
        raise AttributeError('hw_discovery communicator is not initialized')

    return self.run_coroutine(self.hw_discovery.send_request, message, timeout)

request_to_nodeconf(message, timeout=None)

Send a request to nodeconf and get response (thread-safe).

Parameters:

- message: Dictionary containing the request message
- timeout: Optional timeout in seconds (defaults to self.timeout)

Returns:

- dict: Response from nodeconf.send_request via the run_coroutine method

Raises:

- AttributeError: If nodeconf is not initialized

Source code in src/cuemsengine/comms/ControllerCommunications.py
def request_to_nodeconf(self, message: dict, timeout: Optional[float] = None) -> dict:
    """
    Send a request to nodeconf and get response (thread-safe).

    Parameters:
    - message: Dictionary containing the request message
    - timeout: Optional timeout in seconds (defaults to `self.timeout`)

    Returns:
    - dict: Response from `nodeconf.send_request` via `run_coroutine` method

    Raises:
    - AttributeError: If `nodeconf` is not initialized
    """
    if not self.nodeconf:
        raise AttributeError('nodeconf communicator is not initialized')

    return self.run_coroutine(self.nodeconf.send_request, message, timeout)
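
Both request methods delegate to run_coroutine, whose source is not shown in this section. The sketch below illustrates the general blocking-with-timeout pattern such a helper could follow, under that assumption. `run_blocking` and `fake_send_request` are hypothetical names, and the request and reply dictionaries are invented for the demonstration.

```python
import asyncio
import threading
from concurrent.futures import TimeoutError as FutureTimeoutError


def run_blocking(loop, coro_func, message, timeout):
    """Run coro_func(message) on `loop` and block the calling thread for the result."""
    future = asyncio.run_coroutine_threadsafe(coro_func(message), loop)
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        # Cancel the pending coroutine so it does not linger on the loop.
        future.cancel()
        raise


async def fake_send_request(message: dict) -> dict:
    """Hypothetical communicator call: echoes the action after an optional delay."""
    await asyncio.sleep(message.get("delay", 0))
    return {"echo": message["action"]}


loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

reply = run_blocking(loop, fake_send_request, {"action": "list"}, timeout=2)

try:
    run_blocking(loop, fake_send_request, {"action": "list", "delay": 5}, timeout=0.05)
    timed_out = False
except FutureTimeoutError:
    timed_out = True

loop.call_soon_threadsafe(loop.stop)
```

Catching `concurrent.futures.TimeoutError` by its qualified name works on every Python 3 version; it only became an alias of the builtin `TimeoutError` in Python 3.11.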

respond_to_editor(message, context) async

Respond to editor (thread-safe).

Source code in src/cuemsengine/comms/ControllerCommunications.py
async def respond_to_editor(self, message, context: Context):
    """Respond to editor (thread-safe)."""
    Logger.debug(f'Sending to editor: {message}')
    await context.asend(json.dumps(message).encode())

set_on_client_connect(callback)

Set callback for new WebSocket client connections.

The callback receives the websocket object and is awaited inside the connection handler (runs on the comms event loop).

Source code in src/cuemsengine/comms/ControllerCommunications.py
def set_on_client_connect(self, callback: Callable) -> None:
    """Set callback for new WebSocket client connections.

    The callback receives the websocket object and is awaited
    inside the connection handler (runs on the comms event loop).
    """
    self._on_client_connect = callback
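
Because the callback is awaited with the websocket object, it has to be a coroutine function. A minimal sketch of a late-join state dump follows; `FakeSocket`, `make_state_dump` and the state dictionary are hypothetical, and the messages are plain tuples here rather than encoded OSC packets.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for the websocket object passed to the callback."""

    def __init__(self):
        self.sent = []

    async def send(self, data) -> None:
        self.sent.append(data)


def make_state_dump(get_state):
    """Build an async on-connect callback that replays the current state."""

    async def on_connect(websocket) -> None:
        # Send every known status value so a late joiner starts in sync.
        for address, value in get_state().items():
            await websocket.send((address, value))

    return on_connect


state = {"/engine/status/armed": 1, "/engine/status/running": 0}
on_connect = make_state_dump(lambda: dict(state))

socket = FakeSocket()
asyncio.run(on_connect(socket))
```

In real use the returned coroutine function would be handed to set_on_client_connect, and it should avoid long blocking work since it runs on the comms event loop.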

NodeCommunications

Bases: AsyncCommsThread

Source code in src/cuemsengine/comms/NodeCommunications.py
class NodeCommunications(AsyncCommsThread):
    def __init__(self, hub_address: str, node_id: str,
                 command_callback: Optional[Callable[[str, Any, str], None]] = None):
        """
        Initialize AsyncCommsThread for NodeEngine.

        - Runs `NodesHub` in `DIALER` mode
        - Sends players to `ControllerEngine`
        - Receives COMMAND operations from ControllerEngine via NNG
        - Routes commands to NodeEngine handlers

        Parameters:
        - hub_address: TCP/IPC address for the NNG hub (e.g., "tcp://127.0.0.1:5555")
        - node_id: Unique identifier for this node
        - command_callback: Optional callback for handling received commands.
                           Called with (command_name: str, value: Any, address: str)
        """
        super().__init__()
        self.nng_hub = NodesHub(
            hub_address, mode=NodesHub.Mode.DIALER
        )
        self.node_id = node_id
        self._command_callback = command_callback

        self.nng_hub.set_receive_callbacks({
            OperationType.COMMAND: self._handle_command_operation,
        })

    def set_command_callback(self, callback: Callable[[str, Any, str], None]) -> None:
        """Set the callback for handling received commands.

        Args:
            callback: Function to call when a command is received.
                     Called with (command_name: str, value: Any, address: str)
        """
        self._command_callback = callback
        Logger.debug("Command callback set in NodeCommunications")

    def create_all_tasks(self):
        """Create async tasks for node communications."""
        Logger.info('Starting all tasks in NodeCommunications')
        Logger.info(f'NNG hub mode: {self.nng_hub.mode}')
        Logger.info(f'NNG hub address: {self.nng_hub.address}')
        Logger.info(f'Command callbacks registered: {list(self.nng_hub._on_operation_received.keys()) if self.nng_hub._on_operation_received else "None"}')
        return [
            asyncio.create_task(self.nng_hub.start()),
            asyncio.create_task(self.nng_hub.start_message_receiver())
        ]

    def _handle_command_operation(self, operation: NodeOperation) -> None:
        """Handle a COMMAND operation received from ControllerEngine.

        IMPORTANT: Commands are executed in a separate thread to avoid blocking
        the NNG message receiver. Some commands like 'go' can block for the
        duration of cue playback, which would prevent receiving STOP/LOAD commands.

        Args:
            operation: The NodeOperation containing the command
        """
        if operation.type != OperationType.COMMAND:
            return

        # Cluster liveness probe — controller broadcasts ping, we reply pong.
        # Must be cheap and independent of project state. Reply is fire-and-
        # forget on the running event loop (we're already inside it via the
        # receiver coroutine).
        if operation.target == 'ping':
            pong = NodeOperation(
                type=OperationType.STATUS,
                action=ActionType.UPDATE,
                sender=self.node_id,
                target='pong',
                data={},
            )
            asyncio.create_task(self.nng_hub.send_operation(pong))
            Logger.debug(f"Replied pong to ping from {operation.sender}")
            return

        command_name = operation.target
        data = operation.data or {}
        value = data.get('value')
        address = data.get('address', f'/engine/command/{command_name}')

        Logger.info(f"Received command via NNG: {command_name} = {repr(value)}")

        if self._command_callback:
            # Execute command in a separate thread to avoid blocking the NNG receiver
            # This is critical because commands like 'go' block until cue playback completes
            import threading
            def run_command():
                try:
                    self._command_callback(command_name, value, address)
                except Exception as e:
                    Logger.error(f"Error executing command callback for {command_name}: {e}")

            thread = threading.Thread(
                target=run_command,
                name=f"NNG-Command-{command_name}",
                daemon=True
            )
            thread.start()
            Logger.debug(f"Started command thread: {thread.name}")
        else:
            Logger.warning(f"No command callback set for NodeCommunications")

    #########################
    # Nng comms to Controller
    #########################
    def send_operation(self, operation: NodeOperation, timeout: Optional[float] = None):
        """
        Send a NodeOperation to the controller (thread-safe).

        Parameters:
        - operation: NodeOperation to send
        - timeout: Optional timeout in seconds (defaults to `self.timeout`)
        """
        return self.run_coroutine(self.nng_hub.send_operation, operation, timeout)

    def add_player(self, player_id: str, data: dict, timeout: Optional[float] = None):
        """
        Add a player to the OSC hub (thread-safe).

        Parameters:
        - player_id: Unique identifier for the player
        - data: Player data to send
        - timeout: Optional timeout in seconds (defaults to `self.timeout`)
        """
        operation = NodeOperation(
            type=OperationType.PLAYER,
            action=ActionType.ADD,
            sender=self.node_id,
            target=player_id,
            data=data
        )
        return self.send_operation(operation, timeout)

    def remove_player(self, player_id: str, timeout: Optional[float] = None):
        """
        Remove a player from the OSC hub (thread-safe).

        Parameters:
        - player_id: Unique identifier of the player to remove
        - timeout: Optional timeout in seconds (defaults to `self.timeout`)
        """
        operation = NodeOperation(
            type=OperationType.PLAYER,
            action=ActionType.REMOVE,
            sender=self.node_id,
            target=player_id,
            data=None
        )
        return self.send_operation(operation, timeout)

    def add_cue(self, cue_id: str, offset: str, timeout: Optional[float] = None):
        """
        Add a cue to the OSC hub (thread-safe).

        Parameters:
        - cue_id: Unique identifier of the cue to add
        - offset: Offset of the cue, sent in the operation data
        - timeout: Optional timeout in seconds (defaults to `self.timeout`)
        """
        operation = NodeOperation(
            type=OperationType.CUE,
            action=ActionType.ADD,
            sender=self.node_id,
            target=cue_id,
            data={
                'id': cue_id,
                'offset': offset
            }
        )
        return self.send_operation(operation, timeout)

    def remove_cue(self, cue_id: str, timeout: Optional[float] = None):
        """
        Remove a cue from the OSC hub (thread-safe).

        Parameters:
        - cue_id: Unique identifier of the cue to remove
        - timeout: Optional timeout in seconds (defaults to `self.timeout`)
        """
        operation = NodeOperation(
            type=OperationType.CUE,
            action=ActionType.REMOVE,
            sender=self.node_id,
            target=cue_id,
            data={'id': cue_id}
        )
        return self.send_operation(operation, timeout)

    def update_nextcue(self, cue_id: str, timeout: Optional[float] = None):
        """Send a nextcue status update to the controller (thread-safe).

        Parameters:
        - cue_id: UUID of the next cue (or empty string when no next cue)
        - timeout: Optional timeout in seconds (defaults to `self.timeout`)
        """
        operation = NodeOperation(
            type=OperationType.STATUS,
            action=ActionType.UPDATE,
            sender=self.node_id,
            target='nextcue',
            data={'nextcue': cue_id}
        )
        return self.send_operation(operation, timeout)

    def update_cue(self, cue_id: str, percentage: int, timeout: Optional[float] = None):
        """Send a cue percentage progress update to the controller (thread-safe).

        Used during playback to report in-progress status (values 1-99).

        Callers MUST throttle calls to CUE_STATUS_UPDATE_HZ (defined in loop_cue.py)
        before invoking this method to limit NNG traffic over the network in
        multi-node deployments (Tier 1 of the two-tier throttle strategy).
        The controller applies a second throttle (CUE_BROADCAST_MIN_INTERVAL) before
        forwarding to the UI via WebSocket (Tier 2).

        Parameters:
        - cue_id: Unique identifier of the cue being played
        - percentage: Playback progress (1-99); 1 = started, 99 = almost done
        - timeout: Optional timeout in seconds (defaults to `self.timeout`)
        """
        operation = NodeOperation(
            type=OperationType.CUE,
            action=ActionType.UPDATE,
            sender=self.node_id,
            target=cue_id,
            data={'id': cue_id, 'percentage': percentage}
        )
        return self.send_operation(operation, timeout)
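
The docstring of _handle_command_operation explains why each command runs in its own thread: a blocking 'go' must not prevent a later 'stop' from being handled. The sketch below reproduces that idea with the standard library only. `dispatch_in_thread`, `callback` and the two events are hypothetical, and the "playback" is simulated by waiting on an event.

```python
import threading

events: list[str] = []
go_started = threading.Event()
playback_stopped = threading.Event()


def callback(command_name: str, value, address: str) -> None:
    """Hypothetical command callback: 'go' blocks until 'stop' arrives."""
    if command_name == "go":
        events.append("go started")
        go_started.set()
        playback_stopped.wait(timeout=2)  # simulated cue playback
        events.append("go finished")
    elif command_name == "stop":
        events.append("stop")
        playback_stopped.set()


def dispatch_in_thread(command_name: str, value, address: str) -> threading.Thread:
    """Run the callback in a daemon thread so the receiver is never blocked."""

    def run_command() -> None:
        try:
            callback(command_name, value, address)
        except Exception as exc:
            events.append(f"error: {exc}")

    thread = threading.Thread(
        target=run_command, name=f"NNG-Command-{command_name}", daemon=True
    )
    thread.start()
    return thread


go_thread = dispatch_in_thread("go", None, "/engine/command/go")
go_started.wait(timeout=2)  # make the ordering deterministic for the demo
stop_thread = dispatch_in_thread("stop", None, "/engine/command/stop")
go_thread.join(timeout=3)
stop_thread.join(timeout=3)
```

Had 'go' run inline in the receiver, 'stop' would only have been processed after the two-second wait expired.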

__init__(hub_address, node_id, command_callback=None)

Initialize AsyncCommsThread for NodeEngine.

  • Runs NodesHub in DIALER mode
  • Sends players to ControllerEngine
  • Receives COMMAND operations from ControllerEngine via NNG
  • Routes commands to NodeEngine handlers

Parameters:

- hub_address: TCP/IPC address for the NNG hub (e.g., "tcp://127.0.0.1:5555")
- node_id: Unique identifier for this node
- command_callback: Optional callback for handling received commands. Called with (command_name: str, value: Any, address: str)

Source code in src/cuemsengine/comms/NodeCommunications.py
def __init__(self, hub_address: str, node_id: str,
             command_callback: Optional[Callable[[str, Any, str], None]] = None):
    """
    Initialize AsyncCommsThread for NodeEngine.

    - Runs `NodesHub` in `DIALER` mode
    - Sends players to `ControllerEngine`
    - Receives COMMAND operations from ControllerEngine via NNG
    - Routes commands to NodeEngine handlers

    Parameters:
    - hub_address: TCP/IPC address for the NNG hub (e.g., "tcp://127.0.0.1:5555")
    - node_id: Unique identifier for this node
    - command_callback: Optional callback for handling received commands.
                       Called with (command_name: str, value: Any, address: str)
    """
    super().__init__()
    self.nng_hub = NodesHub(
        hub_address, mode=NodesHub.Mode.DIALER
    )
    self.node_id = node_id
    self._command_callback = command_callback

    self.nng_hub.set_receive_callbacks({
        OperationType.COMMAND: self._handle_command_operation,
    })

add_cue(cue_id, offset, timeout=None)

Add a cue to the OSC hub (thread-safe).

Parameters:

- cue_id: Unique identifier of the cue to add
- offset: Offset of the cue, sent in the operation data
- timeout: Optional timeout in seconds (defaults to self.timeout)

Source code in src/cuemsengine/comms/NodeCommunications.py
def add_cue(self, cue_id: str, offset: str, timeout: Optional[float] = None):
    """
    Add a cue to the OSC hub (thread-safe).

    Parameters:
    - cue_id: Unique identifier of the cue to add
    - offset: Offset of the cue, sent in the operation data
    - timeout: Optional timeout in seconds (defaults to `self.timeout`)
    """
    operation = NodeOperation(
        type=OperationType.CUE,
        action=ActionType.ADD,
        sender=self.node_id,
        target=cue_id,
        data={
            'id': cue_id,
            'offset': offset
        }
    )
    return self.send_operation(operation, timeout)
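
The shape of the operations built by add_cue and remove_cue can be inspected without a running hub. In this sketch `OperationType`, `ActionType` and `NodeOperation` are hypothetical stand-ins that copy only the field names visible in the calls above; the real classes may differ in values and behaviour. `build_add_cue` and `build_remove_cue` are likewise illustrative helpers.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Optional


class OperationType(Enum):  # hypothetical stand-in
    CUE = "cue"


class ActionType(Enum):  # hypothetical stand-in
    ADD = "add"
    REMOVE = "remove"


@dataclass
class NodeOperation:  # hypothetical stand-in with the fields used above
    type: OperationType
    action: ActionType
    sender: str
    target: str
    data: Optional[dict[str, Any]] = None


def build_add_cue(node_id: str, cue_id: str, offset: str) -> NodeOperation:
    """Mirror add_cue: the cue id is both the target and part of the payload."""
    return NodeOperation(
        type=OperationType.CUE,
        action=ActionType.ADD,
        sender=node_id,
        target=cue_id,
        data={"id": cue_id, "offset": offset},
    )


def build_remove_cue(node_id: str, cue_id: str) -> NodeOperation:
    """Mirror remove_cue: only the cue id is sent."""
    return NodeOperation(
        type=OperationType.CUE,
        action=ActionType.REMOVE,
        sender=node_id,
        target=cue_id,
        data={"id": cue_id},
    )


added = build_add_cue("node-1", "cue-42", "00:00:05.000")
removed = build_remove_cue("node-1", "cue-42")
```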

add_player(player_id, data, timeout=None)

Add a player to the OSC hub (thread-safe).

Parameters:

- player_id: Unique identifier for the player
- data: Player data to send
- timeout: Optional timeout in seconds (defaults to self.timeout)

Source code in src/cuemsengine/comms/NodeCommunications.py
def add_player(self, player_id: str, data: dict, timeout: Optional[float] = None):
    """
    Add a player to the OSC hub (thread-safe).

    Parameters:
    - player_id: Unique identifier for the player
    - data: Player data to send
    - timeout: Optional timeout in seconds (defaults to `self.timeout`)
    """
    operation = NodeOperation(
        type=OperationType.PLAYER,
        action=ActionType.ADD,
        sender=self.node_id,
        target=player_id,
        data=data
    )
    return self.send_operation(operation, timeout)

create_all_tasks()

Create async tasks for node communications.

Source code in src/cuemsengine/comms/NodeCommunications.py
def create_all_tasks(self):
    """Create async tasks for node communications."""
    Logger.info('Starting all tasks in NodeCommunications')
    Logger.info(f'NNG hub mode: {self.nng_hub.mode}')
    Logger.info(f'NNG hub address: {self.nng_hub.address}')
    Logger.info(f'Command callbacks registered: {list(self.nng_hub._on_operation_received.keys()) if self.nng_hub._on_operation_received else "None"}')
    return [
        asyncio.create_task(self.nng_hub.start()),
        asyncio.create_task(self.nng_hub.start_message_receiver())
    ]

remove_cue(cue_id, timeout=None)

Remove a cue from the OSC hub (thread-safe).

Parameters:

- cue_id: Unique identifier of the cue to remove
- timeout: Optional timeout in seconds (defaults to self.timeout)

Source code in src/cuemsengine/comms/NodeCommunications.py
def remove_cue(self, cue_id: str, timeout: Optional[float] = None):
    """
    Remove a cue from the OSC hub (thread-safe).

    Parameters:
    - cue_id: Unique identifier of the cue to remove
    - timeout: Optional timeout in seconds (defaults to `self.timeout`)
    """
    operation = NodeOperation(
        type=OperationType.CUE,
        action=ActionType.REMOVE,
        sender=self.node_id,
        target=cue_id,
        data={'id': cue_id}
    )
    return self.send_operation(operation, timeout)

remove_player(player_id, timeout=None)

Remove a player from the OSC hub (thread-safe).

Parameters:

- player_id: Unique identifier of the player to remove
- timeout: Optional timeout in seconds (defaults to self.timeout)

Source code in src/cuemsengine/comms/NodeCommunications.py
def remove_player(self, player_id: str, timeout: Optional[float] = None):
    """
    Remove a player from the OSC hub (thread-safe).

    Parameters:
    - player_id: Unique identifier of the player to remove
    - timeout: Optional timeout in seconds (defaults to `self.timeout`)
    """
    operation = NodeOperation(
        type=OperationType.PLAYER,
        action=ActionType.REMOVE,
        sender=self.node_id,
        target=player_id,
        data=None
    )
    return self.send_operation(operation, timeout)

send_operation(operation, timeout=None)

Send a NodeOperation to the controller (thread-safe).

Parameters:

- operation: NodeOperation to send
- timeout: Optional timeout in seconds (defaults to self.timeout)

Source code in src/cuemsengine/comms/NodeCommunications.py
def send_operation(self, operation: NodeOperation, timeout: Optional[float] = None):
    """
    Send a NodeOperation to the controller (thread-safe).

    Parameters:
    - operation: NodeOperation to send
    - timeout: Optional timeout in seconds (defaults to `self.timeout`)
    """
    return self.run_coroutine(self.nng_hub.send_operation, operation, timeout)

set_command_callback(callback)

Set the callback for handling received commands.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| callback | Callable[[str, Any, str], None] | Function to call when a command is received. Called with (command_name: str, value: Any, address: str) | required |

Source code in src/cuemsengine/comms/NodeCommunications.py
def set_command_callback(self, callback: Callable[[str, Any], None]) -> None:
    """Set the callback for handling received commands.

    Args:
        callback: Function to call when a command is received.
                 Called with (command_name: str, value: Any)
    """
    self._command_callback = callback
    Logger.debug("Command callback set in NodeCommunications")
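
As an illustration of the callback contract described above, the sketch below registers a function with the documented `(command_name: str, value: Any)` signature. `CommandReceiver`, its `dispatch` helper, and the `"go"` command name are hypothetical stand-ins so the example runs without the cuemsengine package; how NodeCommunications actually invokes the callback is not shown in this section.

```python
from typing import Any, Callable, List, Optional, Tuple


class CommandReceiver:
    """Hypothetical stand-in that stores a callback like set_command_callback() does."""

    def __init__(self) -> None:
        self._command_callback: Optional[Callable[[str, Any], None]] = None

    def set_command_callback(self, callback: Callable[[str, Any], None]) -> None:
        self._command_callback = callback

    def dispatch(self, command_name: str, value: Any) -> None:
        # Assumed delivery path: forward a received command to the callback, if any.
        if self._command_callback is not None:
            self._command_callback(command_name, value)


received: List[Tuple[str, Any]] = []


def on_command(command_name: str, value: Any) -> None:
    # Matches the documented signature: (command_name: str, value: Any)
    received.append((command_name, value))


comms = CommandReceiver()
comms.set_command_callback(on_command)
comms.dispatch("go", {"cue": "abc"})  # "go" is an invented command name
```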

update_cue(cue_id, percentage, timeout=None)

Send a cue percentage progress update to the controller (thread-safe).

Used during playback to report in-progress status (values 1-99).

Callers MUST throttle calls to CUE_STATUS_UPDATE_HZ (defined in loop_cue.py) before invoking this method to limit NNG traffic over the network in multi-node deployments (Tier 1 of the two-tier throttle strategy). The controller applies a second throttle (CUE_BROADCAST_MIN_INTERVAL) before forwarding to the UI via WebSocket (Tier 2).

Parameters:
- cue_id: Unique identifier of the cue being played
- percentage: Playback progress (1-99); 1 = started, 99 = almost done
- timeout: Optional timeout in seconds (defaults to self.timeout)

Source code in src/cuemsengine/comms/NodeCommunications.py
def update_cue(self, cue_id: str, percentage: int, timeout: Optional[float] = None):
    """Send a cue percentage progress update to the controller (thread-safe).

    Used during playback to report in-progress status (values 1-99).

    Callers MUST throttle calls to CUE_STATUS_UPDATE_HZ (defined in loop_cue.py)
    before invoking this method to limit NNG traffic over the network in
    multi-node deployments (Tier 1 of the two-tier throttle strategy).
    The controller applies a second throttle (CUE_BROADCAST_MIN_INTERVAL) before
    forwarding to the UI via WebSocket (Tier 2).

    Parameters:
    - cue_id: Unique identifier of the cue being played
    - percentage: Playback progress (1-99); 1 = started, 99 = almost done
    - timeout: Optional timeout in seconds (defaults to `self.timeout`)
    """
    operation = NodeOperation(
        type=OperationType.CUE,
        action=ActionType.UPDATE,
        sender=self.node_id,
        target=cue_id,
        data={'id': cue_id, 'percentage': percentage}
    )
    return self.send_operation(operation, timeout)
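
The caller-side (Tier 1) throttle that the docstring requires could look roughly like the sketch below. This is not the code from loop_cue.py: `ProgressThrottle` is a hypothetical helper, the value `10` for `CUE_STATUS_UPDATE_HZ` is an assumption, and the `send` callable stands in for `update_cue`. A fake clock is injected so the behaviour is deterministic.

```python
import time
from typing import Callable, List, Optional, Tuple

CUE_STATUS_UPDATE_HZ = 10  # assumed value; the real constant is defined in loop_cue.py


class ProgressThrottle:
    """Hypothetical helper that drops progress updates arriving faster than `hz`."""

    def __init__(self, send: Callable[[str, int], None], hz: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._send = send
        self._min_interval = 1.0 / hz
        self._clock = clock
        self._last_sent: Optional[float] = None

    def report(self, cue_id: str, percentage: int) -> bool:
        """Forward the update if enough time has passed; return True when sent."""
        now = self._clock()
        if self._last_sent is not None and now - self._last_sent < self._min_interval:
            return False  # too soon after the previous update, drop it
        self._last_sent = now
        self._send(cue_id, percentage)
        return True


sent: List[Tuple[str, int]] = []
fake_now = [0.0]  # mutable cell read by the injected clock
throttle = ProgressThrottle(lambda cue_id, pct: sent.append((cue_id, pct)),
                            CUE_STATUS_UPDATE_HZ, clock=lambda: fake_now[0])

# Five updates 40 ms apart; at 10 Hz only those at least 100 ms apart get through.
for step, timestamp in enumerate([0.0, 0.04, 0.08, 0.12, 0.16]):
    fake_now[0] = timestamp
    throttle.report("cue-1", 10 + step)
```

In real use the `send` argument would be something like `comms.update_cue`, and the default `time.monotonic` clock would be kept.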

update_nextcue(cue_id, timeout=None)

Send a nextcue status update to the controller (thread-safe).

Parameters:
- cue_id: UUID of the next cue (or empty string when no next cue)
- timeout: Optional timeout in seconds (defaults to self.timeout)

Source code in src/cuemsengine/comms/NodeCommunications.py
def update_nextcue(self, cue_id: str, timeout: Optional[float] = None):
    """Send a nextcue status update to the controller (thread-safe).

    Parameters:
    - cue_id: UUID of the next cue (or empty string when no next cue)
    - timeout: Optional timeout in seconds (defaults to `self.timeout`)
    """
    operation = NodeOperation(
        type=OperationType.STATUS,
        action=ActionType.UPDATE,
        sender=self.node_id,
        target='nextcue',
        data={'nextcue': cue_id}
    )
    return self.send_operation(operation, timeout)

ActionType

Bases: Enum

The type of action to be performed.

Source code in src/cuemsengine/comms/NodesHub.py
class ActionType(Enum):
    """The type of action to be performed."""
    ADD = "add"
    REMOVE = "remove"
    UPDATE = "update"

NodeOperation dataclass

Represents an operation to be performed from/to a node.

Source code in src/cuemsengine/comms/NodesHub.py
@dataclass
class NodeOperation:
    """Represents an operation to be performed from/to a node."""
    type: OperationType
    action: ActionType
    sender: str
    target: str
    data: Optional[dict]  # remove_player() passes None

    def duplicate(self):
        return self.__class__(
            type=self.type,
            action=self.action,
            sender=self.sender,
            target=self.target,
            data=self.data if self.data else {}
        )

    @staticmethod
    def from_message(message: Message):
        """
        Create a NodeOperation from a message.
        Uses sender from message data (node_id) rather than NNG address.
        """
        return NodeOperation(
            type=OperationType(message.data["type"]),
            action=ActionType(message.data["action"]),
            sender=message.data["sender"],
            target=message.data["target"],
            data=message.data["data"]
        )

    def __dict__(self):
        return {
            "type": self.type.value,
            "action": self.action.value,
            "sender": self.sender,
            "target": self.target,
            "data": self.data
        }

    def __str__(self):
        return f"{type(self).__name__} by {self.sender}: {self.action.value} on {self.type.value} {self.target} (with{'out' if not self.data else ''} data)"
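
To make the serialization round trip concrete, here is a minimal sketch of how an operation maps to a plain dict (enum members become their string values) and back (strings are looked up by value). `WireOperation`, `to_wire`, and `from_wire` are hypothetical names that mirror `NodeOperation.__dict__()` and `NodeOperation.from_message()` without depending on the `Message` class; the node and cue identifiers are invented.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class OperationType(Enum):
    CUE = "cue"
    PLAYER = "player"


class ActionType(Enum):
    REMOVE = "remove"
    UPDATE = "update"


@dataclass
class WireOperation:
    """Hypothetical stand-in for NodeOperation, without the Message dependency."""
    type: OperationType
    action: ActionType
    sender: str
    target: str
    data: Optional[dict]

    def to_wire(self) -> dict:
        # Enum members are flattened to their string values for transport.
        return {
            "type": self.type.value,
            "action": self.action.value,
            "sender": self.sender,
            "target": self.target,
            "data": self.data,
        }

    @staticmethod
    def from_wire(payload: dict) -> "WireOperation":
        # Enum(value) looks a member up by value and raises ValueError if unknown.
        return WireOperation(
            type=OperationType(payload["type"]),
            action=ActionType(payload["action"]),
            sender=payload["sender"],
            target=payload["target"],
            data=payload["data"],
        )


original = WireOperation(OperationType.CUE, ActionType.UPDATE, "node-1", "cue-42",
                         {"id": "cue-42", "percentage": 50})
wire = original.to_wire()
restored = WireOperation.from_wire(wire)
```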

from_message(message) staticmethod

Create a NodeOperation from a message. Uses sender from message data (node_id) rather than NNG address.

Source code in src/cuemsengine/comms/NodesHub.py
@staticmethod
def from_message(message: Message):
    """
    Create a NodeOperation from a message.
    Uses sender from message data (node_id) rather than NNG address.
    """
    return NodeOperation(
        type=OperationType(message.data["type"]),
        action=ActionType(message.data["action"]),
        sender=message.data["sender"],
        target=message.data["target"],
        data=message.data["data"]
    )

NodesHub

Bases: NngBusHub

Extension of NngBusHub for transmitting pyossia player node structures.

Nodes send player structures (player_id + root_node) to the controller. Players are transmitted one by one as they become available. This class handles transmission only - storage is left to the user.

Source code in src/cuemsengine/comms/NodesHub.py
class NodesHub(NngBusHub):
    """
    Extension of NngBusHub for transmitting pyossia player node structures.

    Nodes send player structures (player_id + root_node) to the controller.
    Players are transmitted one by one as they become available.
    This class handles transmission only - storage is left to the user.
    """

    def __init__(self, hub_address: str, mode=NngBusHub.Mode.LISTENER):
        """
        Initialize NodesHub.

        Parameters:
        - hub_address: The address for the bus communication
        - mode: LISTENER or DIALER mode

        Note: We use the base class queues (self.outgoing and self.incoming) to send and receive Message objects that are translated into NodeOperations.
        """
        super().__init__(hub_address, mode)

        # Callback for when operations are received
        self._on_operation_received: Optional[dict[OperationType, Callable]] = None

    #########################
    # Nodes communication
    #########################
    async def get_operation(self) -> NodeOperation | None:
        """
        Get the next operation from the queue and return it as a NodeOperation object.
        """
        message = await self.get_message()
        if not message:
            return None
        return NodeOperation.from_message(message)

    async def send_operation(self, operation: NodeOperation):
        """
        Send an operation to the send queue.
        """
        message = Message(sender=operation.sender, data=operation.__dict__())
        await self.send_message(message)
        Logger.debug(f"Queued {operation.action.value} operation for {operation.type.value} {operation.target}")

    def set_receive_callbacks(self, callback_dict: dict[OperationType, Callable]):
        """
        Set the callbacks to be invoked when nodes send operations.

        The keys of the dictionary are the operation types to perform, and the values are the callbacks.
        The callbacks must take the following argument: (operation: NodeOperation)
        """
        self._on_operation_received = callback_dict

    async def start_message_receiver(self):
        """
        Continuously receive messages and invoke callback (controller side).

        This runs in a loop, receiving messages and invoking the callback
        if set. Should be run as a background task.

        The callback receives: (operation: NodeOperation)
        """
        if not self._on_operation_received:
            Logger.warning("No operation callbacks set")
            return

        while True:
            operation = None  # Keep the name bound if get_operation() raises
            try:
                operation = await self.get_operation()

                if operation:
                    Logger.debug(f"Received {operation}")

                    # Invoke callback if set (lookup by enum, not string value)
                    message_function = self._on_operation_received.get(operation.type)
                    if message_function:
                        if asyncio.iscoroutinefunction(message_function):
                            await message_function(operation)
                        else:
                            message_function(operation)
                await asyncio.sleep(0.01)  # Prevent tight loop

            except Exception as e:
                Logger.error(f"{type(e)} handling {operation}: {e}")
                await asyncio.sleep(0.1)  # Back off on error

__init__(hub_address, mode=NngBusHub.Mode.LISTENER)

Initialize NodesHub.

Parameters:
- hub_address: The address for the bus communication
- mode: LISTENER or DIALER mode

Note: We use the base class queues (self.outgoing and self.incoming) to send and receive Message objects that are translated into NodeOperations.

Source code in src/cuemsengine/comms/NodesHub.py
def __init__(self, hub_address: str, mode=NngBusHub.Mode.LISTENER):
    """
    Initialize NodesHub.

    Parameters:
    - hub_address: The address for the bus communication
    - mode: LISTENER or DIALER mode

    Note: We use the base class queues (self.outgoing and self.incoming) to send and receive Message objects that are translated into NodeOperations.
    """
    super().__init__(hub_address, mode)

    # Callback for when operations are received
    self._on_operation_received: Optional[dict[OperationType, Callable]] = None

get_operation() async

Get the next operation from the queue and return it as a NodeOperation object.

Source code in src/cuemsengine/comms/NodesHub.py
async def get_operation(self) -> NodeOperation | None:
    """
    Get the next operation from the queue and return it as a NodeOperation object.
    """
    message = await self.get_message()
    if not message:
        return None
    return NodeOperation.from_message(message)

send_operation(operation) async

Send an operation to the send queue.

Source code in src/cuemsengine/comms/NodesHub.py
async def send_operation(self, operation: NodeOperation):
    """
    Send an operation to the send queue.
    """
    message = Message(sender=operation.sender, data=operation.__dict__())
    await self.send_message(message)
    Logger.debug(f"Queued {operation.action.value} operation for {operation.type.value} {operation.target}")

set_receive_callbacks(callback_dict)

Set the callbacks to be invoked when nodes send operations.

The keys of the dictionary are the operation types to perform, and the values are the callbacks. The callbacks must take the following argument: (operation: NodeOperation)

Source code in src/cuemsengine/comms/NodesHub.py
def set_receive_callbacks(self, callback_dict: dict[OperationType, Callable]):
    """
    Set the callbacks to be invoked when nodes send operations.

    The keys of the dictionary are the operation types to perform, and the values are the callbacks.
    The callbacks must take the following argument: (operation: NodeOperation)
    """
    self._on_operation_received = callback_dict

start_message_receiver() async

Continuously receive messages and invoke callback (controller side).

This runs in a loop, receiving messages and invoking the callback if set. Should be run as a background task.

The callback receives: (operation: NodeOperation)

Source code in src/cuemsengine/comms/NodesHub.py
async def start_message_receiver(self):
    """
    Continuously receive messages and invoke callback (controller side).

    This runs in a loop, receiving messages and invoking the callback
    if set. Should be run as a background task.

    The callback receives: (operation: NodeOperation)
    """
    if not self._on_operation_received:
        Logger.warning("No operation callbacks set")
        return

    while True:
        operation = None  # Keep the name bound if get_operation() raises
        try:
            operation = await self.get_operation()

            if operation:
                Logger.debug(f"Received {operation}")

                # Invoke callback if set (lookup by enum, not string value)
                message_function = self._on_operation_received.get(operation.type)
                if message_function:
                    if asyncio.iscoroutinefunction(message_function):
                        await message_function(operation)
                    else:
                        message_function(operation)
            await asyncio.sleep(0.01)  # Prevent tight loop

        except Exception as e:
            Logger.error(f"{type(e)} handling {operation}: {e}")
            await asyncio.sleep(0.1)  # Back off on error
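
The per-type dispatch in the receiver loop can be sketched in isolation as follows. This is a simplified illustration rather than the NodesHub implementation: operations are plain dicts, the callback names are hypothetical, and `inspect.iscoroutinefunction` is used in place of `asyncio.iscoroutinefunction` for the same check. It shows that sync and async callbacks can share one callback dictionary keyed by enum member, and that operation types with no registered callback are skipped.

```python
import asyncio
import inspect
from enum import Enum
from typing import Callable, Dict, List, Tuple


class OperationType(Enum):
    CUE = "cue"
    PLAYER = "player"
    STATUS = "status"


handled: List[Tuple[str, str]] = []


def on_cue(operation: dict) -> None:
    # Plain function: called directly by the dispatcher.
    handled.append(("sync", operation["target"]))


async def on_status(operation: dict) -> None:
    # Coroutine function: awaited by the dispatcher.
    await asyncio.sleep(0)
    handled.append(("async", operation["target"]))


# Keys are enum members, not their string values.
callbacks: Dict[OperationType, Callable] = {
    OperationType.CUE: on_cue,
    OperationType.STATUS: on_status,
}


async def dispatch(op_type: OperationType, operation: dict) -> bool:
    """Invoke the callback registered for op_type; return False if none is set."""
    callback = callbacks.get(op_type)
    if callback is None:
        return False
    if inspect.iscoroutinefunction(callback):
        await callback(operation)
    else:
        callback(operation)
    return True


async def main() -> List[bool]:
    return [
        await dispatch(OperationType.CUE, {"target": "cue-1"}),
        await dispatch(OperationType.STATUS, {"target": "nextcue"}),
        await dispatch(OperationType.PLAYER, {"target": "player-1"}),  # no callback
    ]


results = asyncio.run(main())
```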

OperationType

Bases: Enum

The type of operation to be performed.

Source code in src/cuemsengine/comms/NodesHub.py
class OperationType(Enum):
    """The type of operation to be performed."""
    CUE = "cue"
    PLAYER = "player"
    COMMAND = "command"  # For ControllerEngine → NodeEngine command forwarding
    STATUS = "status"    # For NodeEngine → ControllerEngine status updates