Skip to content

Osc

OssiaNodes

Bases: object

Manage a collection of OSC nodes.

Internal static methods allow to
  • add nodes
  • remove nodes
  • set node parameters
  • set node values
  • get node values
  • set endpoints (nodes with parameters)
Multiple endpoints can be set simultaneously with
  • list of paths.
  • dictionary of paths (k) and parameter arguments (v)
Parameter arguments must be lists containing
  • pyossia.ValueType
  • callback function (optional)
  • initial / default value (optional)
  • Note: to set a parameter value without a callback, pass None as the second argument
Source code in src/cuemsengine/osc/OssiaNodes.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class OssiaNodes(object):
    """Manage a collection of OSC nodes.

    The collection is kept in ``self.nodes`` (path -> node) and is backed by
    ``self.device``. The methods allow to:
        - add nodes
        - remove nodes
        - set node parameters
        - set node values
        - get node values
        - set endpoints (nodes with parameters)

    Multiple endpoints can be set simultaneously with:
        - list of paths.
        - dictionary of paths (k) and parameter arguments (v)

    Parameter arguments must be lists containing:
        - `pyossia.ValueType`
        - callback function (*optional*)
        - initial / default value (*optional*)
        - **Note**: to set a parameter value without a callback, pass None as the second argument

    """
    def __init__(self):
        # The device is bound later (subclasses assign it); until then most
        # operations on the collection are not possible.
        self.device = None
        # Maps a node path to the node object returned by the device.
        self.nodes = {}


    def iterate_on_children(self, node):
        """Print every descendant of ``node``, depth first (debug helper)."""
        for child in node.children():
            print(str(child))
            self.iterate_on_children(child)

    def set_node(self, path: str):
        """Add a new node to the device
            Node memory address is stored in self.nodes[path]
            and must be kept to access the node later

        Raises:
            - AttributeError: If no device is bound to the collection
        """
        if not self.device:
            raise AttributeError("No device found")
        try:
            self.nodes[path] = self.device.add_node(path)
        except AttributeError:
            # Fall back to the root node when the device object does not
            # provide add_node itself.
            self.nodes[path] = self.device.root_node.add_node(path)

    def get_node(self, path: str):
        """Get a node from the collection

        The lookup error of ``self.nodes`` (``KeyError`` for the dict created
        in ``__init__``) propagates when the path is unknown.
        """
        return self.nodes[path]

    def remove_node(self, path: str):
        """Remove a node from the collection and all its children

        Empty paths and paths made only of slashes are ignored. Only the node
        itself and the paths located below it (``path + '/...'``) are dropped
        from ``self.nodes``; a sibling that merely shares the same leading
        characters (``/foo`` vs ``/foobar``) is left untouched.
        """
        if not path or path.strip('/') == '':
            return
        self.device.root_node.remove_child(path)
        base = path.rstrip('/')
        subtree_prefix = base + '/'
        doomed = [
            key for key in self.nodes
            if str(key) == base or str(key).startswith(subtree_prefix)
        ]
        for key in doomed:
            del self.nodes[key]

    def remove_device(self) -> None:
        """Remove the device and all nodes from the collection
        """
        # Snapshot the keys: remove_node mutates self.nodes while we iterate.
        node_keys = list(self.nodes.keys())
        for node in node_keys:
            self.remove_node(node)
        self.nodes = {}
        # Drop our reference by rebinding instead of deleting the attribute,
        # so `self.device` stays defined while we wait (and if the wait is
        # interrupted).
        self.device = None
        sleep(CLEANUP_DELAY)

    @staticmethod
    def set_parameter(node: Node, value_type, callback: Callable = None, value = None, repetition_filter = True):
        """Set a parameter to a node

        Parameters:
            - node: The node that receives the new parameter
            - value_type: A `pyossia.ValueType`
            - callback: Optional callable taking 1 or 2 parameters
            - value: Optional initial value; anything other than None is
              applied, including falsy values such as 0, False or ""
            - repetition_filter: Whether the repetition filter is enabled
              (always disabled for Impulse parameters)

        Raises:
            - ValueError: If value_type is not a pyossia.ValueType
            - ValueError: If the callback does not take 1 or 2 parameters
        """
        if not isinstance(value_type, ValueType):
            raise ValueError("value_type must be a pyossia.ValueType")
        parameter = node.create_parameter(value_type)
        # Impulse parameters are fire-and-forget triggers — RepetitionFilter
        # must always be OFF, otherwise ossia silently drops repeated sends.
        if value_type == ValueType.Impulse:
            repetition_filter = False
        parameter.repetition_filter = ossia.RepetitionFilter.On if repetition_filter else ossia.RepetitionFilter.Off
        parameter.access_mode = ossia.AccessMode.Bi
        if callback:
            arity = len(signature(callback).parameters)
            if arity == 1:
                parameter.add_callback(callback)
            elif arity == 2:
                parameter.add_callback_param(callback)
            else:
                raise ValueError("callback must have 1 or 2 parameters")
        # Compare against None so that legitimate falsy initial values
        # (0, 0.0, False, "") are not silently discarded.
        if value is not None:
            parameter.value = value

    def set_node_callback(self, node: Node, callback: Callable) -> None:
        """Set a callback to a node

        One-parameter callables are registered with ``add_callback``,
        two-parameter callables with ``add_callback_param``.

        Raises:
            - ValueError: If the callback does not take 1 or 2 parameters
        """
        Logger.debug(f"Setting callback for node {str(node)}")
        arity = len(signature(callback).parameters)
        if arity == 1:
            node.parameter.add_callback(callback)
        elif arity == 2:
            node.parameter.add_callback_param(callback)
        else:
            raise ValueError(f"callback must have 1 or 2 parameters, not {arity}")

    @logged
    def set_value(self, node: Union[Node, str], value) -> None:
        """Set a value to a node
        Parameters:
            - node: The node to set the value to
                - str: The path of the node
                - Node: The node object
            - value: The value to set to the node (ignored for Impulse
              parameters, which are simply triggered)

        Raises:
            - ValueError: If the node is not found
            - ValueError: If the value could not be set to the node
        """
        if isinstance(node, str):
            try:
                node = self.nodes[node]
            except KeyError:
                raise ValueError("Node not found")
        # Impulse parameters: pyossia rejects None — use True to trigger the send
        if node.parameter.value_type == ValueType.Impulse:
            node.parameter.push_value(True)
            return
        node.parameter.push_value(value)
        stored = node.parameter.value
        # Float parameters go through float32 (OSC wire format), so an exact
        # Python float64 equality check produces false negatives (e.g. 0.66).
        # float32 keeps roughly 7 significant digits, so the allowed error has
        # to grow with the magnitude: a fixed 1e-5 rejects correctly stored
        # values such as 1000.1. Strict equality is kept for all other types.
        if isinstance(value, float):
            tolerance = max(1e-5, 1e-6 * abs(value))
            try:
                mismatch = abs(stored - value) > tolerance
            except TypeError:
                # Stored value is not numeric: report it as the documented
                # ValueError instead of leaking a TypeError.
                mismatch = True
            if mismatch:
                raise ValueError(f"Could not set {str(node)} to {value} (got {stored})")
        elif stored != value:
            raise ValueError(f"Could not set {str(node)} to {value}")

    @logged
    def get_value(self, node: Union[Node, str]):
        """Get a value from a node
        Parameters:
            - node: The node to get the value from
                - str: The path of the node
                - Node: The node object

        Returns:
            - value: The value of the node

        Raises:
            - ValueError: If the node is not found
        """
        if isinstance(node, str):
            try:
                node = self.nodes[node]
            except KeyError:
                raise ValueError("Node not found")
        return node.parameter.value

    def create_endpoint(self, path: str, param_args: list | None = None):
        """Create an endpoint as a node with parameter

        The node is always created; a parameter is only added when
        ``param_args`` is a non-empty list, which is then unpacked into
        ``set_parameter``. Any failure is logged and re-raised.
        """
        try:
            self.set_node(path)
            if param_args and isinstance(param_args, list):
                self.set_parameter(self.nodes[path], *param_args)
            Logger.debug(f"Created endpoint: {path}")
        except Exception as e:
            Logger.error(f"Failed to create endpoint {path}: {type(e).__name__}: {e}")
            raise

    @logged
    def create_endpoints(self, paths: dict[str, Any] | list[str]):
        """Create multiple endpoints

        A list creates bare nodes; a dict maps each path to the parameter
        arguments handed to ``create_endpoint``. Other types are ignored.
        """
        if isinstance(paths, list):
            for path in paths:
                self.create_endpoint(path)
        elif isinstance(paths, dict):
            for path, params in paths.items():
                self.create_endpoint(path, params)

    def get_endpoints(self) -> dict[str, list[Any]]:
        """Get all endpoints (node paths with their parameter arguments)

        Returns:
            - dict mapping each path whose node has a parameter to
              ``[value_type, None, current value]``; the callback slot is
              always None
        """
        Logger.info(f"Getting endpoints from device: {self.device}")
        endpoints = {}
        for path, node in self.nodes.items():
            if node.parameter:
                endpoints[path] = [node.parameter.value_type, None, node.parameter.value]
        return endpoints

    def nodes_from_device(self, node: Node = None) -> dict[str, Node]:
        """Collect the childless nodes found below ``node``.

        Starts at the device root node when ``node`` is None. Only nodes
        without children are returned, keyed by ``str(node)``; the root node
        itself is never included.
        """
        nodes = {}
        is_root = node is None
        if is_root:
            node = self.device.root_node
        Logger.debug(f"{self.__class__.__name__} Node {node.name} has {len(node.children())} children")
        if len(node.children()) == 0:
            if not is_root:
                nodes[str(node)] = node
            return nodes
        for n, i in enumerate(node.children()):
            Logger.debug(f"Adding child {n} named {i.name}")
            nodes.update(self.nodes_from_device(i))
            # DEV: iteration raises RuntimeError at the end of the loop,
            # so leave explicitly once the last child has been handled.
            if n + 1 == len(node.children()):
                Logger.debug(f"All children from {node.name} added")
                break
        return nodes

    def __del__(self):
        # Release the device and the nodes when the collection is collected.
        self.remove_device()

create_endpoint(path, param_args=None)

Create an endpoint as a node with parameter

Source code in src/cuemsengine/osc/OssiaNodes.py
174
175
176
177
178
179
180
181
182
183
184
def create_endpoint(self, path: str, param_args: list | None = None):
    """Create an endpoint: a node at ``path``, optionally with a parameter.

    The node is always created through ``set_node``. A parameter is added
    only when ``param_args`` is a non-empty list; its items are unpacked as
    the arguments of ``set_parameter``. Every failure is logged and then
    re-raised unchanged.
    """
    try:
        self.set_node(path)
        wants_parameter = bool(param_args) and isinstance(param_args, list)
        if wants_parameter:
            new_node = self.nodes[path]
            self.set_parameter(new_node, *param_args)
        Logger.debug(f"Created endpoint: {path}")
    except Exception as e:
        Logger.error(f"Failed to create endpoint {path}: {type(e).__name__}: {e}")
        raise

create_endpoints(paths)

Create multiple endpoints

Source code in src/cuemsengine/osc/OssiaNodes.py
186
187
188
189
190
191
192
193
194
195
@logged
def create_endpoints(self, paths: dict[str, Any] | list[str]):
    """Create several endpoints in one call.

    A list yields one bare node per path. A dict maps each path to the
    parameter arguments forwarded to ``create_endpoint``. Any other type is
    silently ignored.
    """
    if isinstance(paths, list):
        for endpoint_path in paths:
            self.create_endpoint(endpoint_path)
        return
    if isinstance(paths, dict):
        for endpoint_path, endpoint_args in paths.items():
            self.create_endpoint(endpoint_path, endpoint_args)

get_endpoints()

Get all endpoints (node paths with their parameter arguments)

Source code in src/cuemsengine/osc/OssiaNodes.py
197
198
199
200
201
202
203
204
205
206
207
def get_endpoints(self) -> dict[str, list[Any]]:
    """Describe every stored node that carries a parameter.

    Returns:
        A dict mapping the node path to ``[value_type, None, value]``. The
        middle (callback) slot is always None.
    """
    Logger.info(f"Getting endpoints from device: {self.device}")
    return {
        path: [node.parameter.value_type, None, node.parameter.value]
        for path, node in self.nodes.items()
        if node.parameter
    }

get_node(path)

Get a node from the collection

Source code in src/cuemsengine/osc/OssiaNodes.py
58
59
60
61
def get_node(self, path: str):
    """Return the node stored under ``path``.

    The lookup error of ``self.nodes`` propagates when the path is unknown.
    """
    registry = self.nodes
    return registry[path]

get_value(node)

Get a value from a node Parameters: - node: The node to get the value from - str: The path of the node - Node: The node object

Returns:

Type Description
  • value: The value of the node

Raises:

Type Description
-ValueError

If the node is not found

Source code in src/cuemsengine/osc/OssiaNodes.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
@logged
def get_value(self, node: Union[Node, str]):
    """Read the current value of a node's parameter.

    Parameters:
        - node: Where to read from
            - str: The path of a node stored in the collection
            - Node: The node object itself

    Returns:
        - value: The value held by the node's parameter

    Raises:
        - ValueError: If a path is given and no node is stored under it
    """
    target = node
    if isinstance(target, str):
        # Resolve the path to the stored node object.
        try:
            target = self.nodes[target]
        except KeyError:
            raise ValueError("Node not found")
    return target.parameter.value

remove_device()

Remove the device and all nodes from the collection

Source code in src/cuemsengine/osc/OssiaNodes.py
73
74
75
76
77
78
79
80
81
82
def remove_device(self) -> None:
    """Remove the device and all nodes from the collection

    Every stored node is removed first, then the reference to the device is
    dropped and the call waits ``CLEANUP_DELAY`` before returning.
    """
    # Snapshot the keys: remove_node mutates self.nodes while we iterate.
    node_keys = list(self.nodes.keys())
    for node in node_keys:
        self.remove_node(node)
    self.nodes = {}
    # Rebind instead of `del self.device`: the reference is released just the
    # same, but the attribute stays defined during the wait and if the wait
    # is interrupted, so a later call does not fail on a missing attribute.
    self.device = None
    sleep(CLEANUP_DELAY)

remove_node(path)

Remove a node from the collection and all its children

Source code in src/cuemsengine/osc/OssiaNodes.py
63
64
65
66
67
68
69
70
71
def remove_node(self, path: str):
    """Remove a node from the collection and all its children

    Empty paths and paths made only of slashes are ignored. Only the node
    itself and the paths located below it (``path + '/...'``) are dropped
    from ``self.nodes``; a sibling that merely shares the same leading
    characters (``/foo`` vs ``/foobar``) is left untouched.
    """
    if not path or path.strip('/') == '':
        return
    self.device.root_node.remove_child(path)
    base = path.rstrip('/')
    subtree_prefix = base + '/'
    doomed = [
        key for key in self.nodes
        if str(key) == base or str(key).startswith(subtree_prefix)
    ]
    for key in doomed:
        del self.nodes[key]

set_node(path)

Add a new node to the device Node memory address is stored in self.nodes[path] and must be kept to access the node later

Source code in src/cuemsengine/osc/OssiaNodes.py
46
47
48
49
50
51
52
53
54
55
56
def set_node(self, path: str):
    """Create a node at ``path`` on the device and remember it.

    The object returned by the device is stored in ``self.nodes[path]`` and
    must be kept to access the node later.

    Raises:
        AttributeError: If no device is bound to the collection.
    """
    device = self.device
    if not device:
        raise AttributeError("No device found")
    try:
        self.nodes[path] = device.add_node(path)
    except AttributeError:
        # Fall back to the root node when add_node is not available directly.
        self.nodes[path] = device.root_node.add_node(path)

set_node_callback(node, callback)

Set a callback to a node

Source code in src/cuemsengine/osc/OssiaNodes.py
108
109
110
111
112
113
114
115
116
117
118
def set_node_callback(self, node: Node, callback: Callable) -> None:
    """Attach a callback to the parameter of a node.

    Callables with one parameter are registered through ``add_callback``,
    callables with two parameters through ``add_callback_param``.

    Raises:
        ValueError: If the callback takes neither 1 nor 2 parameters.
    """
    Logger.debug(f"Setting callback for node {str(node)}")
    arity = len(signature(callback).parameters)
    if arity not in (1, 2):
        raise ValueError(f"callback must have 1 or 2 parameters, not {arity}")
    parameter = node.parameter
    if arity == 1:
        parameter.add_callback(callback)
    else:
        parameter.add_callback_param(callback)

set_parameter(node, value_type, callback=None, value=None, repetition_filter=True) staticmethod

Set a parameter to a node

Source code in src/cuemsengine/osc/OssiaNodes.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
@staticmethod
def set_parameter(node: Node, value_type, callback: Callable = None, value = None, repetition_filter = True):
    """Set a parameter to a node

    Parameters:
        - node: The node that receives the new parameter
        - value_type: A `pyossia.ValueType`
        - callback: Optional callable taking 1 or 2 parameters
        - value: Optional initial value; anything other than None is
          applied, including falsy values such as 0, False or ""
        - repetition_filter: Whether the repetition filter is enabled
          (always disabled for Impulse parameters)

    Raises:
        - ValueError: If value_type is not a pyossia.ValueType
        - ValueError: If the callback does not take 1 or 2 parameters
    """
    if not isinstance(value_type, ValueType):
        raise ValueError("value_type must be a pyossia.ValueType")
    parameter = node.create_parameter(value_type)
    # Impulse parameters are fire-and-forget triggers — RepetitionFilter
    # must always be OFF, otherwise ossia silently drops repeated sends.
    if value_type == ValueType.Impulse:
        repetition_filter = False
    parameter.repetition_filter = ossia.RepetitionFilter.On if repetition_filter else ossia.RepetitionFilter.Off
    parameter.access_mode = ossia.AccessMode.Bi
    if callback:
        arity = len(signature(callback).parameters)
        if arity == 1:
            parameter.add_callback(callback)
        elif arity == 2:
            parameter.add_callback_param(callback)
        else:
            raise ValueError("callback must have 1 or 2 parameters")
    # Compare against None so that legitimate falsy initial values
    # (0, 0.0, False, "") are not silently discarded.
    if value is not None:
        parameter.value = value

set_value(node, value)

Set a value to a node Parameters: - node: The node to set the value to - str: The path of the node - Node: The node object - value: The value to set to the node

Raises:

Type Description
-ValueError

If the node is not found

-ValueError

If the value could not be set to the node

Source code in src/cuemsengine/osc/OssiaNodes.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
@logged
def set_value(self, node: Union[Node, str], value) -> None:
    """Set a value to a node
    Parameters:
        - node: The node to set the value to
            - str: The path of the node
            - Node: The node object
        - value: The value to set to the node (ignored for Impulse
          parameters, which are simply triggered)

    Raises:
        - ValueError: If the node is not found
        - ValueError: If the value could not be set to the node
    """
    if isinstance(node, str):
        try:
            node = self.nodes[node]
        except KeyError:
            raise ValueError("Node not found")
    # Impulse parameters: pyossia rejects None — use True to trigger the send
    if node.parameter.value_type == ValueType.Impulse:
        node.parameter.push_value(True)
        return
    node.parameter.push_value(value)
    stored = node.parameter.value
    # Float parameters go through float32 (OSC wire format), so an exact
    # Python float64 equality check produces false negatives (e.g. 0.66).
    # float32 keeps roughly 7 significant digits, so the allowed error has
    # to grow with the magnitude: a fixed 1e-5 rejects correctly stored
    # values such as 1000.1. Strict equality is kept for all other types.
    if isinstance(value, float):
        tolerance = max(1e-5, 1e-6 * abs(value))
        try:
            mismatch = abs(stored - value) > tolerance
        except TypeError:
            # Stored value is not numeric: report it as the documented
            # ValueError instead of leaking a TypeError.
            mismatch = True
        if mismatch:
            raise ValueError(f"Could not set {str(node)} to {value} (got {stored})")
    elif stored != value:
        raise ValueError(f"Could not set {str(node)} to {value}")

OssiaServer

Bases: OssiaNodes

Source code in src/cuemsengine/osc/OssiaServer.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class OssiaServer(OssiaNodes):
    """Node collection bound to a ``LocalDevice`` and exposed by a server.

    The constructor creates the device, runs the given server setup function
    on the instance and finally creates the requested endpoints.
    """

    def __init__(
            self,
            name: str | None = None,
            log: bool = False,
            host: str = "127.0.0.1",
            remote_port: int = OSCSERVER_REMOTE_PORT,
            local_port: int = OSCSERVER_LOCAL_PORT,
            server: ServerSetupFunction = ServerDevices.OSC,
            endpoints: Union[dict, list] | None = None
        ):
        super().__init__()
        # An empty or missing name falls back to the class name.
        device_name = name or self.__class__.__name__
        self.name = device_name
        self.host = host
        self.device = LocalDevice(device_name)
        self.logging = log
        self.remote_port = remote_port
        self.local_port = local_port
        self.setup_server(server)
        if endpoints:
            self.create_endpoints(endpoints)

    def setup_server(self, server: ServerSetupFunction) -> None:
        """Create a local OSC server

        Calls ``server(self)`` on the already bound device, waits
        ``STARTUP_DELAY`` and records the outcome in ``self.started``.

        Raises:
            RuntimeError: If no device is bound.
            Exception: If the setup function reports failure; the device is
                removed before raising.
        """
        if not self.device:
            raise RuntimeError("OssiaServer device not bound")
        setup_ok = server(self)
        sleep(STARTUP_DELAY)
        self.started = setup_ok
        if setup_ok:
            return
        # Setup failed: release the device before reporting the error.
        self.remove_device()
        raise Exception("Server setup failed")

    def add_endpoints(self, endpoints) -> None:
        """Create additional endpoints; thin wrapper around ``create_endpoints``."""
        self.create_endpoints(endpoints)

setup_server(server)

Create a local OSC server

Create a local device and set it up to handle oscquery or osc requests

Source code in src/cuemsengine/osc/OssiaServer.py
40
41
42
43
44
45
46
47
48
49
50
51
52
def setup_server(self, server: ServerSetupFunction) -> None:
    """Create a local OSC server

    Calls ``server(self)`` on the already bound device, waits
    ``STARTUP_DELAY`` and records the outcome in ``self.started``.

    Raises:
        RuntimeError: If no device is bound.
        Exception: If the setup function reports failure; the device is
            removed before raising.
    """
    if not self.device:
        raise RuntimeError("OssiaServer device not bound")
    setup_ok = server(self)
    sleep(STARTUP_DELAY)
    self.started = setup_ok
    if setup_ok:
        return
    # Setup failed: release the device before reporting the error.
    self.remove_device()
    raise Exception("Server setup failed")

PyOscServer

Bases: object

Source code in src/cuemsengine/osc/PyOsc.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
class PyOscServer(object):
    """OSC UDP server served from a background daemon thread.

    ``endpoints`` maps an OSC address to the function handling it; every
    entry is mapped on the dispatcher when the server object is created.
    """

    def __init__(self, host = PYOSC_HOST, port = PYOSC_PORT, endpoints = None):
        self.host = host
        self.port = port
        # Use a fresh dict per instance: a mutable default would be shared by
        # every server, and add_handlers needs a mapping (it calls .items()).
        self.endpoints = {} if endpoints is None else endpoints
        self.dispatcher = Dispatcher()
        self.handlers = {}
        # Set by start(); None means the serving thread was never launched.
        self.thread = None
        self.server = self.new_server()

    def start(self) -> None:
        """Run ``serve_forever`` in a daemon thread."""
        self.thread = Thread(
            target = self.server.serve_forever,
            daemon = True
        )
        self.thread.start()

    def stop(self) -> None:
        """Stop serving, close the server and wait for the serving thread.

        Safe to call when ``start()`` was never called: in that case only
        the server is closed.
        """
        serving_thread = self.thread
        if serving_thread is not None:
            # shutdown() waits for serve_forever to finish, so it must only be
            # called when a serving thread was launched (assuming the usual
            # socketserver semantics, it would otherwise block forever).
            self.server.shutdown()
        self.server.server_close()
        if serving_thread is not None:
            serving_thread.join()

    def new_server(self) -> ThreadingOSCUDPServer:
        """Register the handlers and build the server bound to host/port."""
        self.add_handlers()
        return ThreadingOSCUDPServer(
            (self.host, self.port),
            self.dispatcher
        )

    def add_handlers(self) -> None:
        """
        Add handlers to the dispatcher and store them in the handlers dict
        """
        if not self.endpoints:
            return
        for endpoint_, function_ in self.endpoints.items():
            self.handlers[endpoint_] = self.dispatcher.map(
                endpoint_, function_
            )

add_handlers()

Add handlers to the dispatcher and store them in the handlers dict

Source code in src/cuemsengine/osc/PyOsc.py
64
65
66
67
68
69
70
71
72
73
def add_handlers(self) -> None:
    """Map every configured endpoint on the dispatcher.

    The object returned by ``dispatcher.map`` is stored in ``self.handlers``
    under the endpoint address. Nothing happens when no endpoint is
    configured.
    """
    if len(self.endpoints) != 0:
        for address, callback in self.endpoints.items():
            mapped = self.dispatcher.map(address, callback)
            self.handlers[address] = mapped

WebSocket OSC Handler for receiving OSC messages via WebSocket.

This module provides an async WebSocket listener that receives and parses OSC messages sent over WebSocket connections (as used by OSCQuery protocol). It bypasses pyossia's unreliable WebSocket handling while keeping pyossia for OSCQuery discovery and metadata.

Usage

In an AsyncCommsThread subclass:

async def websocket_osc_task(self):
    await websocket_osc_listener(
        host="0.0.0.0",
        port=9190,
        message_handler=self.handle_osc_message,
        stop_check=lambda: self.stop_requested
    )

def create_all_tasks(self):
    return [
        asyncio.create_task(self.websocket_osc_task()),
        # ... other tasks
    ]

WebSocketOscRouter

Routes OSC messages to registered handlers based on address patterns.

This class provides a simple routing mechanism for OSC messages, allowing handlers to be registered for specific OSC addresses or address patterns.

Usage

router = WebSocketOscRouter()
router.register('/engine/command/go', handle_go_command)
router.register('/engine/command/*', handle_any_command)  # Wildcard

In the message handler:

def handle_osc_message(address, args):
    router.route(address, args)

Source code in src/cuemsengine/osc/WebSocketOscHandler.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
class WebSocketOscRouter:
    """Dispatch OSC messages to handlers registered per address.

    Handlers are registered either for one exact OSC address or, with a
    trailing ``/*``, for every address starting with a given prefix.

    Usage:
        router = WebSocketOscRouter()
        router.register('/engine/command/go', handle_go_command)
        router.register('/engine/command/*', handle_any_command)  # Wildcard

        # In the message handler:
        def handle_osc_message(address, args):
            router.route(address, args)
    """

    def __init__(self):
        # Exact address -> handler.
        self._handlers: dict[str, Callable[[str, list[Any]], None]] = {}
        # (prefix, handler) pairs, checked in registration order.
        self._wildcard_handlers: list[tuple[str, Callable[[str, list[Any]], None]]] = []

    def register(self, pattern: str, handler: Callable[[str, list[Any]], None]) -> None:
        """Register a handler for an OSC address or a wildcard pattern.

        Args:
            pattern: OSC address or pattern. A trailing '/*' requests prefix
                    matching.
                    e.g., '/engine/command/go' for exact match
                    e.g., '/engine/command/*' for prefix match
            handler: Callback invoked for matching messages.
                    Called with (address: str, args: list)
        """
        is_wildcard = pattern.endswith('/*')
        if not is_wildcard:
            self._handlers[pattern] = handler
            Logger.debug(f"Registered OSC handler: {pattern}")
            return
        # Strip only the '*': the stored prefix keeps its trailing '/'.
        self._wildcard_handlers.append((pattern[:-1], handler))
        Logger.debug(f"Registered wildcard OSC handler: {pattern}")

    def route(self, address: str, args: list[Any]) -> bool:
        """Deliver an OSC message to the first matching handler.

        An exact match wins over wildcards; wildcards are tried in
        registration order and only the first match is called.

        Args:
            address: OSC address (e.g., '/engine/command/go')
            args: List of OSC arguments

        Returns:
            True if a handler was found and ran without raising, False if no
            handler matched or the handler raised (the error is logged).
        """
        exact_handlers = self._handlers
        if address in exact_handlers:
            try:
                exact_handlers[address](address, args)
            except Exception as e:
                Logger.error(f"Error in OSC handler for {address}: {e}")
                return False
            return True

        for prefix, handler in self._wildcard_handlers:
            if not address.startswith(prefix):
                continue
            try:
                handler(address, args)
            except Exception as e:
                Logger.error(f"Error in wildcard OSC handler for {address}: {e}")
                return False
            return True

        Logger.debug(f"No handler registered for OSC address: {address}")
        return False

    def clear(self) -> None:
        """Forget every exact and wildcard handler."""
        for registry in (self._handlers, self._wildcard_handlers):
            registry.clear()

clear()

Remove all registered handlers.

Source code in src/cuemsengine/osc/WebSocketOscHandler.py
362
363
364
365
def clear(self) -> None:
    """Forget every exact and wildcard handler."""
    for registry in (self._handlers, self._wildcard_handlers):
        registry.clear()

register(pattern, handler)

Register a handler for an OSC address pattern.

Parameters:

Name Type Description Default
pattern str

OSC address or pattern. Use '*' at the end for wildcard matching. e.g., '/engine/command/go' for exact match; e.g., '/engine/command/*' for prefix match

required
handler Callable[[str, list[Any]], None]

Callback function to handle messages matching the pattern. Called with (address: str, args: list)

required
Source code in src/cuemsengine/osc/WebSocketOscHandler.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def register(self, pattern: str, handler: Callable[[str, list[Any]], None]) -> None:
    """Register a handler for an OSC address or a wildcard pattern.

    Args:
        pattern: OSC address or pattern. A trailing '/*' requests prefix
                matching.
                e.g., '/engine/command/go' for exact match
                e.g., '/engine/command/*' for prefix match
        handler: Callback invoked for matching messages.
                Called with (address: str, args: list)
    """
    is_wildcard = pattern.endswith('/*')
    if not is_wildcard:
        self._handlers[pattern] = handler
        Logger.debug(f"Registered OSC handler: {pattern}")
        return
    # Strip only the '*': the stored prefix keeps its trailing '/'.
    self._wildcard_handlers.append((pattern[:-1], handler))
    Logger.debug(f"Registered wildcard OSC handler: {pattern}")

route(address, args)

Route an OSC message to the appropriate handler.

Parameters:

Name Type Description Default
address str

OSC address (e.g., '/engine/command/go')

required
args list[Any]

List of OSC arguments

required

Returns:

Type Description
bool

True if a handler was found and called, False otherwise

Source code in src/cuemsengine/osc/WebSocketOscHandler.py
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
def route(self, address: str, args: list[Any]) -> bool:
    """Deliver an OSC message to the first matching handler.

    An exact match wins over wildcards; wildcards are tried in registration
    order and only the first match is called.

    Args:
        address: OSC address (e.g., '/engine/command/go')
        args: List of OSC arguments

    Returns:
        True if a handler was found and ran without raising, False if no
        handler matched or the handler raised (the error is logged).
    """
    exact_handlers = self._handlers
    if address in exact_handlers:
        try:
            exact_handlers[address](address, args)
        except Exception as e:
            Logger.error(f"Error in OSC handler for {address}: {e}")
            return False
        return True

    for prefix, handler in self._wildcard_handlers:
        if not address.startswith(prefix):
            continue
        try:
            handler(address, args)
        except Exception as e:
            Logger.error(f"Error in wildcard OSC handler for {address}: {e}")
            return False
        return True

    Logger.debug(f"No handler registered for OSC address: {address}")
    return False

build_osc_message(address, value)

Build a binary OSC message for the given address and value.

Parameters:

Name Type Description Default
address str

OSC address (e.g. '/engine/status/running')

required
value Any

Value to send. Type is inferred: str -> 's', int -> 'i', float -> 'f'.

required

Returns:

Type Description
Optional[bytes]

Bytes to send over WebSocket, or None if building failed.

Source code in src/cuemsengine/osc/WebSocketOscHandler.py
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def build_osc_message(address: str, value: Any) -> Optional[bytes]:
    """Build a binary OSC message carrying one argument.

    Args:
        address: OSC address (e.g. '/engine/status/running')
        value: Value to send. None is sent as an empty string; bool, str,
            int and float are passed to the builder unchanged; any other
            object is sent as its ``str()`` representation.

    Returns:
        Bytes to send over WebSocket, or None if the builder is not
        available or building failed.
    """
    if not OscMessageBuilder:
        Logger.warning("pythonosc not available - cannot build OSC message")
        return None
    try:
        builder = OscMessageBuilder(address)
        if value is None:
            payload = ''
        elif isinstance(value, (bool, str, int, float)):
            payload = value
        else:
            payload = str(value)
        builder.add_arg(payload)
        return builder.build().dgram
    except Exception as e:
        Logger.debug(f"Error building OSC message: {e}")
        return None

handle_websocket_connection(websocket, message_handler, stop_check, client_set=None, on_connect=None) async

Handle a single WebSocket connection.

Parameters:

Name Type Description Default
websocket

The WebSocket connection

required
message_handler Callable[[str, list[Any]], None]

Callback function to handle parsed OSC messages. Called with (address: str, args: list)

required
stop_check Callable[[], bool]

Function that returns True when the listener should stop

required
client_set Optional[set]

Optional set to track connected clients for broadcast. If provided, websocket is added on connect and removed on disconnect.

None
on_connect Optional[Callable]

Optional async callback called with the websocket after connection is established. Used for sending initial state to new clients.

None
Source code in src/cuemsengine/osc/WebSocketOscHandler.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
async def handle_websocket_connection(
    websocket,
    message_handler: Callable[[str, list[Any]], None],
    stop_check: Callable[[], bool],
    client_set: Optional[set] = None,
    on_connect: Optional[Callable] = None
) -> None:
    """Handle a single WebSocket connection.

    Binary frames are parsed with ``parse_osc_message`` and forwarded to
    ``message_handler``; text frames are only logged and otherwise ignored.

    Args:
        websocket: The WebSocket connection
        message_handler: Callback function to handle parsed OSC messages.
                        Called with (address: str, args: list). Exceptions it
                        raises are logged and do not close the connection.
        stop_check: Function that returns True when the listener should stop.
                    It is evaluated each time a message is received.
        client_set: Optional set to track connected clients for broadcast. If provided,
                    websocket is added on connect and removed on disconnect.
        on_connect: Optional async callback called with the websocket after connection
                    is established. Used for sending initial state to new clients.
                    Exceptions it raises are logged and the connection keeps being served.
    """
    client_info = f"{websocket.remote_address}" if hasattr(websocket, 'remote_address') else "unknown"
    if client_set is not None:
        client_set.add(websocket)

    # Everything after registration runs under the same ``finally`` so the
    # websocket is always removed from ``client_set``, even when the task is
    # cancelled while ``on_connect`` is still running.
    try:
        Logger.info(f"WebSocket OSC client connected: {client_info}")

        if on_connect is not None:
            try:
                await on_connect(websocket)
            except Exception as e:
                Logger.error(f"Error in on_connect callback: {e}")

        async for message in websocket:
            if stop_check():
                break

            # OSCQuery sends OSC messages as binary WebSocket frames
            if isinstance(message, bytes):
                parsed = parse_osc_message(message)
                if parsed:
                    address, args = parsed
                    Logger.debug(f"WebSocket OSC received: {address} = {args}")
                    try:
                        message_handler(address, args)
                    except Exception as e:
                        Logger.error(f"Error in OSC message handler for {address}: {e}")
            else:
                # Text message - might be JSON for OSCQuery protocol.
                # Slicing already caps the logged text at 100 characters.
                Logger.debug(f"WebSocket text message received (ignored): {message[:100]}")

    except ConnectionClosed:
        Logger.debug(f"WebSocket OSC client disconnected: {client_info}")
    except Exception as e:
        Logger.error(f"WebSocket OSC connection error: {e}")
    finally:
        if client_set is not None:
            client_set.discard(websocket)
        Logger.debug(f"WebSocket OSC connection closed: {client_info}")

parse_osc_message(data)

Parse a binary OSC message.

Parameters:

Name Type Description Default
data bytes

Raw binary OSC message data

required

Returns:

Type Description
tuple[str, list[Any]] | None

Tuple of (address, arguments) if successful, None if parsing fails

Source code in src/cuemsengine/osc/WebSocketOscHandler.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def parse_osc_message(data: bytes) -> tuple[str, list[Any]] | None:
    """Decode a raw binary OSC message into its address and argument list.

    Args:
        data: Raw binary OSC message data

    Returns:
        ``(address, arguments)`` on success. The argument list is empty when
        the message has no type tag string or the type tag string does not
        start with ``,``. ``None`` is returned when python-osc is unavailable
        or any parsing step raises.
    """
    if not osc_types:
        Logger.error("python-osc library not available")
        return None

    # Type tags whose value is read from the payload, mapped to the name of
    # the osc_types reader. Readers are looked up lazily, only when the tag
    # is actually encountered.
    payload_readers = {
        'i': 'get_int',
        'f': 'get_float',
        's': 'get_string',
        'b': 'get_blob',
        't': 'get_timetag',   # OSC timetag (8 bytes)
        'd': 'get_double',    # Double precision float
    }
    # Type tags that consume no payload bytes, mapped to the value appended.
    # 'I' (Impulse/Infinitum) has no value and is reported as None.
    literal_values = {'T': True, 'F': False, 'N': None, 'I': None}

    try:
        # Layout: address string, then type tag string, then the arguments.
        address, offset = osc_types.get_string(data, 0)
        if offset >= len(data):
            # Address-only message (like an impulse): no type tag string
            return (address, [])

        type_tags, offset = osc_types.get_string(data, offset)
        if not type_tags.startswith(','):
            Logger.warning(f"Invalid OSC type tag string: {type_tags}")
            return (address, [])

        arguments = []
        for tag in type_tags[1:]:  # the leading ',' is not a tag
            if tag in literal_values:
                arguments.append(literal_values[tag])
            elif tag in payload_readers:
                reader = getattr(osc_types, payload_readers[tag])
                item, offset = reader(data, offset)
                arguments.append(item)
            else:
                # Unknown tags are reported and skipped; the offset is left unchanged.
                Logger.warning(f"Unknown OSC type tag: {tag}")

        return (address, arguments)

    except Exception as e:
        Logger.debug(f"Error parsing OSC message: {e}")
        return None

websocket_osc_listener(host, port, message_handler, stop_check, existing_server_check=None, client_set=None, on_connect=None) async

Async WebSocket OSC listener.

Listens for WebSocket connections and parses incoming binary OSC messages. Routes parsed messages to the provided handler callback.

Parameters:

Name Type Description Default
host str

Host address to bind to (e.g., "0.0.0.0" or "127.0.0.1")

required
port int

Port to listen on (typically the OSCQuery WebSocket port)

required
message_handler Callable[[str, list[Any]], None]

Callback function to handle parsed OSC messages. Called with (address: str, args: list)

required
stop_check Callable[[], bool]

Function that returns True when the listener should stop

required
existing_server_check Optional[Callable[[], bool]]

Optional function that returns True if an existing server is already listening on the port. If True, the listener will not start its own server.

None
Note

The OSCQuery protocol uses the same WebSocket port for both discovery (JSON messages) and OSC value updates (binary messages). This listener only processes binary OSC messages and ignores JSON messages.

If pyossia's OSCQuery server is already using the port, you may need to either: 1. Disable pyossia's WebSocket handler and use this one exclusively 2. Run this on a different port and update the UI configuration 3. Intercept messages at a different layer

Source code in src/cuemsengine/osc/WebSocketOscHandler.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
async def websocket_osc_listener(
    host: str,
    port: int,
    message_handler: Callable[[str, list[Any]], None],
    stop_check: Callable[[], bool],
    existing_server_check: Optional[Callable[[], bool]] = None,
    client_set: Optional[set] = None,
    on_connect: Optional[Callable] = None
) -> None:
    """Async WebSocket OSC listener.

    Listens for WebSocket connections and parses incoming binary OSC messages.
    Routes parsed messages to the provided handler callback.

    Args:
        host: Host address to bind to (e.g., "0.0.0.0" or "127.0.0.1")
        port: Port to listen on (typically the OSCQuery WebSocket port)
        message_handler: Callback function to handle parsed OSC messages.
                        Called with (address: str, args: list)
        stop_check: Function that returns True when the listener should stop.
                    It is polled every 0.1 seconds while the server is running.
        existing_server_check: Optional function that returns True if an existing
                              server is already listening on the port. If True,
                              the listener will not start its own server.
        client_set: Optional set forwarded to ``handle_websocket_connection``
                    to track connected clients.
        on_connect: Optional async callback forwarded to
                    ``handle_websocket_connection``; called with each new websocket.

    Note:
        The OSCQuery protocol uses the same WebSocket port for both discovery
        (JSON messages) and OSC value updates (binary messages). This listener
        only processes binary OSC messages and ignores JSON messages.

        If pyossia's OSCQuery server is already using the port, you may need
        to either:
        1. Disable pyossia's WebSocket handler and use this one exclusively
        2. Run this on a different port and update the UI configuration
        3. Intercept messages at a different layer

        Errors while starting or running the server are logged, not raised.
    """
    # Local import keeps this block self-contained; errno is standard library.
    import errno

    if not websockets:
        Logger.error("websockets library not available - cannot start WebSocket OSC listener")
        return

    if existing_server_check and existing_server_check():
        Logger.info(f"Existing server detected on {host}:{port}, WebSocket OSC listener not starting own server")
        return

    Logger.info(f"Starting WebSocket OSC listener on ws://{host}:{port}")

    try:
        async with websocket_serve(
            lambda ws: handle_websocket_connection(ws, message_handler, stop_check, client_set, on_connect),
            host,
            port,
            # Allow concurrent connections
            max_size=2**20,  # 1 MB max message size
            # Ping/pong for keepalive
            ping_interval=20,
            ping_timeout=20,
        ):
            Logger.info(f"WebSocket OSC listener started on ws://{host}:{port}")
            # Keep running until stop is requested
            while not stop_check():
                await asyncio.sleep(0.1)

    except OSError as e:
        # errno.EADDRINUSE is the platform's own value for "address in use"
        # (98 on Linux), so the check is not tied to one operating system.
        if e.errno == errno.EADDRINUSE or "already in use" in str(e).lower():
            Logger.warning(f"WebSocket port {port} already in use (likely by pyossia OSCQuery server)")
            Logger.info("WebSocket OSC listener will not start - pyossia is handling WebSocket connections")
            Logger.info("Commands will be received via HTTP polling fallback")
        else:
            Logger.error(f"WebSocket OSC listener error: {e}")
    except Exception as e:
        Logger.error(f"WebSocket OSC listener error: {e}")
    finally:
        Logger.info("WebSocket OSC listener stopped")

add_callback_to_all(endpoints, func)

Attach the given function as the callback of every endpoint in the endpoints dictionary

Parameters:

Name Type Description Default
endpoints dict

the endpoints dictionary

required
func Callable

the function to include

required
Source code in src/cuemsengine/osc/helpers.py
138
139
140
141
142
143
144
145
def add_callback_to_all(endpoints: dict, func: Callable) -> dict:
    """Build a new endpoints dictionary where every entry uses ``func`` as callback.

    Args:
        endpoints (dict): the endpoints dictionary; each value must be indexable,
            and only its first item is kept
        func (Callable): the function to include

    Returns:
        dict: a new dictionary mapping each key to ``[first_item, func]``;
            the input dictionary is not modified
    """
    with_callback = {}
    for path, params in endpoints.items():
        with_callback[path] = [params[0], func]
    return with_callback

add_callbacks_from_dict(endpoints, cmd_dict)

Attach the callbacks from the command dictionary to the matching endpoints in the endpoints dictionary

Parameters:

Name Type Description Default
endpoints dict

the endpoints dictionary

required
cmd_dict dict

the command dictionary

required

Returns:

Name Type Description
dict dict

the endpoints dictionary with the function endpoints included

Source code in src/cuemsengine/osc/helpers.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def add_callbacks_from_dict(endpoints: dict, cmd_dict: dict[str, Callable]) -> dict:
    """Attach callbacks from ``cmd_dict`` to the matching endpoints, in place.

    An endpoint matches when the last ``/``-separated segment of its key is a
    key of ``cmd_dict`` with a truthy value. Matching entries are replaced by
    ``[first_item, callback]``; all other entries are left untouched.

    Args:
        endpoints (dict): the endpoints dictionary (modified in place)
        cmd_dict (dict): the command dictionary, keyed by last path segment

    Returns:
        dict: the same ``endpoints`` object, with the callbacks included
    """
    for path in endpoints:
        # Only existing keys are reassigned, so the dict size never changes
        # while it is being iterated.
        command_name = path.rsplit('/', 1)[-1]
        handler = cmd_dict.get(command_name)
        if not handler:
            continue
        endpoints[path] = [endpoints[path][0], handler]
    return endpoints

add_prefix_to_all(endpoints, prefix)

Add a prefix to the endpoints dictionary

Parameters:

Name Type Description Default
endpoints dict

the endpoints dictionary

required
prefix str

the prefix to add

required
Source code in src/cuemsengine/osc/helpers.py
147
148
149
150
151
152
153
154
def add_prefix_to_all(endpoints: dict, prefix: str) -> dict:
    """Build a new endpoints dictionary with ``prefix`` prepended to every key.

    Args:
        endpoints (dict): the endpoints dictionary
        prefix (str): the prefix to add

    Returns:
        dict: a new dictionary with keys ``prefix + key`` and the original
            values; the input dictionary is not modified
    """
    prefixed = {}
    for path, params in endpoints.items():
        prefixed[prefix + path] = params
    return prefixed

deserialize_node(node_data, parent_node=None)

Deserialize a dictionary structure into pyossia nodes.

Parameters: - node_data: The serialized node structure - parent_node: Optional parent node to attach to

Returns: - pyossia.ossia.Node: The reconstructed node

Source code in src/cuemsengine/osc/helpers.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
def deserialize_node(node_data: dict, parent_node: Optional[Node] = None) -> Node:
    """
    Deserialize a dictionary structure into pyossia nodes.

    Parameters:
    - node_data: The serialized node structure. Reads the "name" key
      (required) and the optional "parameter" and "children" keys.
    - parent_node: Parent node to attach to. Although it defaults to None,
      a parent is required.

    Returns:
    - pyossia.ossia.Node: The reconstructed node

    Raises:
    - ValueError: If parent_node is None
    """
    if parent_node is None:
        raise ValueError("Parent node required for deserialization")

    # Create the node
    node = parent_node.add_node(node_data["name"])

    # Recreate parameter if it existed
    param_dict = node_data.get("parameter")
    if param_dict:
        # The serialized "type" entry is not consulted: every parameter is
        # recreated as a String parameter.
        param = node.create_parameter(ValueType.String)  # Default type

        # Set parameter properties
        if param_dict.get("value") is not None:
            try:
                param.value = param_dict["value"]
            except Exception:
                # Best effort: a value that cannot be assigned is reported and
                # skipped. KeyboardInterrupt/SystemExit are no longer swallowed.
                Logger.warning(f"Could not set value for parameter at {node.name}")

    # Recursively create children
    for child_data in node_data.get("children", []):
        deserialize_node(child_data, node)

    return node

new_osc_device(cls)

An OSC device is required to deal with a remote application using OSC protocol

Parameters:

Name Type Description Default
name str

name of the device

required
host str

host ip address

required
remote_port int

port to which OSC messages must be sent so that they are caught by a remote client listening to the local device

required
local_port int

port where OSC requests have to be sent by any remote client to deal with the local device

required

Returns:

Name Type Description
OSCDevice OSCDevice

an OSC device

Source code in src/cuemsengine/osc/helpers.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def new_osc_device(cls) -> OSCDevice:
    """Create the OSC device needed to talk to a remote application over OSC.

    Args:
        cls: object providing the device settings as attributes:
            ``name`` (name of the device), ``host`` (host ip address),
            ``remote_port`` (port where OSC messages are sent so a remote
            client listening to the local device can catch them) and
            ``local_port`` (port where remote clients send OSC requests to
            the local device)

    Returns:
        OSCDevice: an OSC device built from those four attributes
    """
    device = OSCDevice(cls.name, cls.host, cls.remote_port, cls.local_port)
    Logger.debug(f"OSCDevice created: {device}, remote_port: {cls.remote_port}, local_port: {cls.local_port}")
    return device

serialize_node(node)

Serialize a pyossia node and its children to a dictionary structure.

Parameters: - node: The pyossia node to serialize

Returns: - dict: Serialized node structure

Source code in src/cuemsengine/osc/helpers.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
def serialize_node(node: Node) -> dict:
    """
    Serialize a pyossia node and its children to a dictionary structure.

    Parameters:
    - node: The pyossia node to serialize

    Returns:
    - dict: Serialized node structure with the keys "name", "children"
      (list of serialized child nodes) and "parameter" (None when the node
      has no parameter, otherwise a dict with "access", "bounding", "type",
      "value" and, when they could be read, "domain" and "unit")
    """
    node_dict = {
        "name": node.name,
        "children": [],
        "parameter": None
    }

    # Serialize parameter if exists
    param = node.parameter
    if param:
        param_dict = {
            "access": str(param.access_mode),
            "bounding": str(param.bounding_mode),
            "type": str(param.value_type) if hasattr(param, 'value_type') else None,
        }

        # Try to get current value
        try:
            value = param.value
            # Convert value to JSON-serializable format: any non-string
            # iterable is stored as a plain list
            if hasattr(value, '__iter__') and not isinstance(value, str):
                param_dict["value"] = list(value)
            else:
                param_dict["value"] = value
        except Exception:
            # Best effort: an unreadable value is stored as None.
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            param_dict["value"] = None

        # Get other parameter properties
        try:
            param_dict["domain"] = str(param.domain) if hasattr(param, 'domain') else None
            param_dict["unit"] = str(param.unit) if hasattr(param, 'unit') else None
        except Exception:
            # Best effort: properties that cannot be read are left out
            pass

        node_dict["parameter"] = param_dict

    # Recursively serialize children
    for child in node.children():
        node_dict["children"].append(serialize_node(child))

    return node_dict

set_osc_server(cls)

LocalDevice.create_osc_server

Make the local device able to handle osc request and emit osc message

Parameters:

Name Type Description Default
host str

host ip address

required
remote_port int

port to which OSC messages must be sent so that they are caught by a remote client listening to the local device

required
local_port int

port where OSC requests have to be sent by any remote client to deal with the local device

required
log bool

enable protocol logging

required

Returns:

Name Type Description
bool bool

True if the server has been created successfully

Source code in src/cuemsengine/osc/helpers.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def set_osc_server(cls) -> bool:
    """LocalDevice.create_osc_server

    Make the local device able to handle osc request and emit osc message

    Args:
        cls: object providing ``device`` (whose ``create_osc_server`` is
            called) and the settings passed to it, in this order:
            ``host`` (host ip address), ``remote_port`` (port where OSC
            messages are sent so a remote client listening to the local
            device can catch them), ``local_port`` (port where remote clients
            send OSC requests to the local device) and ``logging`` (enable
            protocol logging). ``name`` is only used in the debug log.

    Returns:
        bool: True if the server has been created successfully
    """
    Logger.debug(f'creating osc server for {cls.name} on {cls.host}:{cls.local_port} -> {cls.remote_port}')
    create_server = cls.device.create_osc_server
    return create_server(cls.host, cls.remote_port, cls.local_port, cls.logging)

set_oscquery_server(cls)

LocalDevice.create_oscquery_server

Make the local device able to handle oscquery request

Parameters:

Name Type Description Default
osc_port int

port where OSC requests have to be sent by any remote client to deal with the local device

required
log bool

enable protocol logging

required

Returns:

Name Type Description
bool bool

True if the server has been created successfully

Source code in src/cuemsengine/osc/helpers.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def set_oscquery_server(cls) -> bool:
    """LocalDevice.create_oscquery_server

    Make the local device able to handle oscquery request

    Args:
        cls: object providing ``device`` (whose ``create_oscquery_server`` is
            called) and the settings passed to it, in this order:
            ``local_port``, ``remote_port`` and ``logging`` (enable protocol
            logging). ``host`` is only used in the debug log.
            NOTE(review): presumably ``local_port`` is the OSC port and
            ``remote_port`` the WebSocket port; confirm against pyossia.

    Returns:
        bool: True if the server has been created successfully

    Raises:
        Exception: any error raised while creating the server is logged and
            then re-raised
    """
    Logger.debug(f'creating oscquery server on {cls.host}:{cls.remote_port} -> {cls.local_port}')

    try:
        create_server = cls.device.create_oscquery_server
        created = create_server(cls.local_port, cls.remote_port, cls.logging)
    except Exception as err:
        Logger.error(f"{type(err).__name__} creating oscquery server: {err}")
        raise err
    return created