Comms
AsyncCommsThread
Bases: Thread
Base class for asynchronous communication threads.
This class extends Thread to run an asyncio event loop in a separate daemon
thread. Subclasses must implement create_all_tasks() to define the async
tasks that will be executed concurrently.
The event loop runs in the background thread and can be safely accessed from
other threads using run_coroutine().
Attributes:
| Name | Type | Description |
|---|---|---|
thread_name |
str
|
Base name for the thread. |
name |
str
|
Full thread name with 'AsyncComms-' prefix. |
timeout |
float
|
Default timeout in seconds for coroutine execution. |
stop_requested |
bool
|
Flag indicating whether thread should stop. |
send_contexts |
List
|
List of send contexts (subclass-specific). |
event_loop |
AbstractEventLoop
|
The asyncio event loop running
in this thread. None until |
Example
Subclass implementation:
class MyAsyncComms(AsyncCommsThread):
async def my_task(self):
# Do async work
pass
def create_all_tasks(self):
return [asyncio.create_task(self.my_task())]
Source code in src/cuemsengine/comms/AsyncCommsThread.py
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 | |
__init__(**kwargs)
Initialize the AsyncCommsThread.
Creates a daemon thread that will run an asyncio event loop. The thread is configured with a name and optional timeout for coroutine execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
**kwargs
|
Keyword arguments. - thread_name (str, optional): Base name for the thread. Defaults to the name of the subclass. - timeout (float, optional): Timeout in seconds for coroutine execution. Defaults to TIMEOUT (15 seconds). |
{}
|
Note
The thread is created as a daemon thread, so it will automatically terminate when the main program exits.
Source code in src/cuemsengine/comms/AsyncCommsThread.py
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 | |
create_all_tasks()
Create all async tasks to run concurrently.
Subclasses must implement this method to return a list of asyncio tasks that should run concurrently in the event loop. These tasks typically handle various communication channels or services.
Returns:
| Type | Description |
|---|---|
List[Task]
|
List[asyncio.Task]: List of asyncio tasks to run concurrently. |
Raises:
| Type | Description |
|---|---|
NotImplementedError
|
If not implemented by subclass. |
Example
def create_all_tasks(self):
return [
asyncio.create_task(self.listener_task()),
asyncio.create_task(self.sender_task()),
]
Source code in src/cuemsengine/comms/AsyncCommsThread.py
154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 | |
run()
Thread entry point.
Creates a new asyncio event loop, schedules the async communications task, and runs the event loop forever. This method is called automatically when the thread is started.
The event loop will continue running until stop() is called, which
will cause the loop to stop and the thread to terminate.
Source code in src/cuemsengine/comms/AsyncCommsThread.py
70 71 72 73 74 75 76 77 78 79 80 81 82 83 | |
run_asyncio_comms()
async
Run all async communication tasks.
Creates all tasks from create_all_tasks() and waits for them to
complete. Tasks run concurrently and exceptions are captured rather
than immediately raised (via return_exceptions=True).
This method runs until all tasks complete or until stop_async() is
called.
Note
Subclasses should implement create_all_tasks() to return a list
of asyncio tasks that need to run concurrently.
Source code in src/cuemsengine/comms/AsyncCommsThread.py
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | |
run_coroutine(coroutine, message, timeout=None)
Run a coroutine in the event loop from another thread.
Thread-safe method to execute a coroutine function in this thread's event loop. The coroutine is called with the provided message and the result is returned synchronously, with a timeout.
This is the primary way to interact with the async event loop from other threads (e.g., the main thread).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
coroutine
|
Callable
|
A coroutine function to execute. Must be a coroutine function (not a regular function). |
required |
message
|
dict
|
Dictionary to pass as argument to the coroutine. |
required |
timeout
|
Optional[float]
|
Optional timeout in seconds (defaults to self.timeout). -1 means no timeout. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
Any |
Any
|
The return value from the coroutine. |
Raises:
| Type | Description |
|---|---|
AttributeError
|
If the event loop has not been initialized (thread not started). |
TypeError
|
If |
TimeoutError
|
If the coroutine does not complete within |
Exception
|
If the coroutine raises an exception, it is re-raised here. |
Example
async def send_message(msg: dict) -> dict:
# Async operation
return {'status': 'ok'}
# From another thread:
result = comms_thread.run_coroutine(send_message, {'data': 'test'})
Source code in src/cuemsengine/comms/AsyncCommsThread.py
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 | |
stop()
Stop the thread and event loop.
Thread-safe method that signals the thread to stop and schedules the async stop coroutine to run in the event loop. This will cause the event loop to stop and the thread to terminate.
Note
This method can be called from any thread. It does not wait for the thread to fully terminate.
Source code in src/cuemsengine/comms/AsyncCommsThread.py
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 | |
stop_async()
async
Async stop handler.
Cancels all running tasks, waits for cleanup, then stops the event loop.
This is called internally by stop() and should not be called directly.
Note
This coroutine must run in the same event loop that it stops.
Source code in src/cuemsengine/comms/AsyncCommsThread.py
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 | |
Utilites for communications from ControllerEngine and NodeEngine.
ControllerCommunications
Bases: AsyncCommsThread
Communications class for ControllerEngine.
Handles: - Editor messages - Player operation messages - Nodeconf messages - HWDiscovery messages - WebSocket OSC messages (commands from UI)
Source code in src/cuemsengine/comms/ControllerCommunications.py
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 | |
__init__(nng_hub_address, editor_callback, node_operation_callback, websocket_osc_config=None)
Initialize AsyncCommsThread for ControllerEngine.
Parameters: - nng_hub_address: TCP/IPC address for NNG hub (e.g., "tcp://127.0.0.1:5555") - editor_callback: Callback for editor messages - node_operation_callback: Callback dictionary for received node operations - websocket_osc_config: Optional dict with WebSocket OSC listener config: - host: Host to bind to (default: "0.0.0.0") - port: Port to listen on (default: 9190) - node_id: Node identifier for NNG operations
Source code in src/cuemsengine/comms/ControllerCommunications.py
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | |
broadcast_osc(address, value)
Send an OSC status message to all connected WebSocket clients.
Call from ControllerEngine when status changes (running, armed, load, timecode). Thread-safe: schedules send on the comms event loop.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
address
|
str
|
OSC address (e.g. '/engine/status/armed') |
required |
value
|
Any
|
Value to send (str, int, or float) |
required |
Source code in src/cuemsengine/comms/ControllerCommunications.py
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 | |
editor_listener()
async
Editor listener (thread-safe).
Source code in src/cuemsengine/comms/ControllerCommunications.py
234 235 236 237 238 239 240 | |
register_command_handler(osc_path, handler, forward_to_nodes=True)
Register a handler for an OSC command path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
osc_path
|
str
|
The OSC address to handle (e.g., '/engine/command/go') |
required |
handler
|
Callable[[Any], None]
|
Callback function to handle the command value |
required |
forward_to_nodes
|
bool
|
If True, also forward the command to NodeEngine via NNG |
True
|
Source code in src/cuemsengine/comms/ControllerCommunications.py
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 | |
register_osc_handler(osc_pattern, handler)
Register a generic OSC handler for a pattern (non-command messages).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
osc_pattern
|
str
|
OSC address pattern (e.g., '/engine/players/*') |
required |
handler
|
Callable[[str, list], None]
|
Callback function receiving (address, args) |
required |
Source code in src/cuemsengine/comms/ControllerCommunications.py
123 124 125 126 127 128 129 130 131 | |
request_to_hwdiscovery(message, timeout=None)
Send a request to hardware discovery and get response (thread-safe).
Parameters:
- message: Dictionary containing the request message
- timeout: Optional timeout in seconds (defaults to self.timeout)
Returns:
- dict: Response from hwdiscovery.send_request via run_coroutine method
Raises:
- AttributeError: If hwdiscovery is not initialized
Source code in src/cuemsengine/comms/ControllerCommunications.py
289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 | |
request_to_nodeconf(message, timeout=None)
Send a request to nodeconf and get response (thread-safe).
Parameters:
- message: Dictionary containing the request message
- timeout: Optional timeout in seconds (defaults to self.timeout)
Returns:
- dict: Response from nodeconf.send_request via run_coroutine method
Raises:
- AttributeError: If nodeconf is not initialized
Source code in src/cuemsengine/comms/ControllerCommunications.py
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 | |
respond_to_editor(message, context)
async
Respond to editor (thread-safe).
Source code in src/cuemsengine/comms/ControllerCommunications.py
242 243 244 245 | |
set_on_client_connect(callback)
Set callback for new WebSocket client connections.
The callback receives the websocket object and is awaited inside the connection handler (runs on the comms event loop).
Source code in src/cuemsengine/comms/ControllerCommunications.py
187 188 189 190 191 192 193 | |
NodeCommunications
Bases: AsyncCommsThread
Source code in src/cuemsengine/comms/NodeCommunications.py
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 | |
__init__(hub_address, node_id, command_callback=None)
Initialize AsyncCommsThread for NodeEngine.
- Runs
OscNodesHubinDIALERmode - Sends players to
ControllerEngine - Receives COMMAND operations from ControllerEngine via NNG
- Routes commands to NodeEngine handlers
Parameters: - hub_address: TCP/IPC address for OSC hub (e.g., "tcp://127.0.0.1:5555") - node_id: Unique identifier for this node - command_callback: Optional callback for handling received commands. Called with (command_name: str, value: Any)
Source code in src/cuemsengine/comms/NodeCommunications.py
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 | |
add_cue(cue_id, offset, timeout=None)
Add a cue to the OSC hub (thread-safe).
Parameters:
- cue_id: Unique identifier of the cue to add
- data: Data to send
- timeout: Optional timeout in seconds (defaults to self.timeout)
Source code in src/cuemsengine/comms/NodeCommunications.py
167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | |
add_player(player_id, data, timeout=None)
Add a player to the OSC hub (thread-safe).
Parameters:
- player_id: Unique identifier for the player
- data: Player data to send
- timeout: Optional timeout in seconds (defaults to self.timeout)
Source code in src/cuemsengine/comms/NodeCommunications.py
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 | |
create_all_tasks()
Create async tasks for node communications.
Source code in src/cuemsengine/comms/NodeCommunications.py
52 53 54 55 56 57 58 59 60 61 | |
remove_cue(cue_id, timeout=None)
Remove a cue from the OSC hub (thread-safe).
Parameters:
- cue_id: Unique identifier of the cue to remove
- timeout: Optional timeout in seconds (defaults to self.timeout)
Source code in src/cuemsengine/comms/NodeCommunications.py
188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 | |
remove_player(player_id, timeout=None)
Remove a player from the OSC hub (thread-safe).
Parameters:
- player_id: Unique identifier of the player to remove
- timeout: Optional timeout in seconds (defaults to self.timeout)
Source code in src/cuemsengine/comms/NodeCommunications.py
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 | |
send_operation(operation, timeout=None)
Send a NodeOperation to the controller (thread-safe).
Parameters:
- operation: NodeOperation to send
- timeout: Optional timeout in seconds (defaults to self.timeout)
Source code in src/cuemsengine/comms/NodeCommunications.py
122 123 124 125 126 127 128 129 130 | |
set_command_callback(callback)
Set the callback for handling received commands.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
callback
|
Callable[[str, Any], None]
|
Function to call when a command is received. Called with (command_name: str, value: Any) |
required |
Source code in src/cuemsengine/comms/NodeCommunications.py
42 43 44 45 46 47 48 49 50 | |
update_cue(cue_id, percentage, timeout=None)
Send a cue percentage progress update to the controller (thread-safe).
Used during playback to report in-progress status (values 1-99).
Callers MUST throttle calls to CUE_STATUS_UPDATE_HZ (defined in loop_cue.py) before invoking this method to limit NNG traffic over the network in multi-node deployments (Tier 1 of the two-tier throttle strategy). The controller applies a second throttle (CUE_BROADCAST_MIN_INTERVAL) before forwarding to the UI via WebSocket (Tier 2).
Parameters:
- cue_id: Unique identifier of the cue being played
- percentage: Playback progress (1-99); 1 = started, 99 = almost done
- timeout: Optional timeout in seconds (defaults to self.timeout)
Source code in src/cuemsengine/comms/NodeCommunications.py
221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 | |
update_nextcue(cue_id, timeout=None)
Send a nextcue status update to the controller (thread-safe).
Parameters:
- cue_id: UUID of the next cue (or empty string when no next cue)
- timeout: Optional timeout in seconds (defaults to self.timeout)
Source code in src/cuemsengine/comms/NodeCommunications.py
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 | |
ActionType
Bases: Enum
The type of action to be performed.
Source code in src/cuemsengine/comms/NodesHub.py
14 15 16 17 18 | |
NodeOperation
dataclass
Represents an operation to be performed from/to a node.
Source code in src/cuemsengine/comms/NodesHub.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | |
from_message(message)
staticmethod
Create a NodeOperation from a message. Uses sender from message data (node_id) rather than NNG address.
Source code in src/cuemsengine/comms/NodesHub.py
45 46 47 48 49 50 51 52 53 54 55 56 57 | |
NodesHub
Bases: NngBusHub
Extension of NngBusHub for transmitting pyossia player node structures.
Nodes send player structures (player_id + root_node) to the controller. Players are transmitted one by one as they become available. This class handles transmission only - storage is left to the user.
Source code in src/cuemsengine/comms/NodesHub.py
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | |
__init__(hub_address, mode=NngBusHub.Mode.LISTENER)
Initialize NodesHub.
Parameters: - hub_address: The address for the bus communication - mode: LISTENER or DIALER mode
Note: We use the base class queues (self.outgoing and self.incoming) to send and receive Message objects that are translated into NodeOperations.
Source code in src/cuemsengine/comms/NodesHub.py
80 81 82 83 84 85 86 87 88 89 90 91 92 93 | |
get_operation()
async
Get the next operation from the queue and return it as a NodeOperation object.
Source code in src/cuemsengine/comms/NodesHub.py
98 99 100 101 102 103 104 105 | |
send_operation(operation)
async
Send an operation to the send queue.
Source code in src/cuemsengine/comms/NodesHub.py
107 108 109 110 111 112 113 | |
set_receive_callbacks(callback_dict)
Set the callbacks to be invoked when nodes send operations.
The keys of the dictionary are the operation types to perform, and the values are the callbacks. The callbacks must take the following argument: (operation: NodeOperation)
Source code in src/cuemsengine/comms/NodesHub.py
115 116 117 118 119 120 121 122 | |
start_message_receiver()
async
Continuously receive messages and invoke callback (controller side).
This runs in a loop, receiving messages and invoking the callback if set. Should be run as a background task.
The callback receives: (sender, message)
Source code in src/cuemsengine/comms/NodesHub.py
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | |
OperationType
Bases: Enum
The type of operation to be performed.
Source code in src/cuemsengine/comms/NodesHub.py
20 21 22 23 24 25 | |