Skip to content

Core

BaseEngine

Bases: SignalEngine

Source code in src/cuemsengine/core/BaseEngine.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
class BaseEngine(SignalEngine):
    def __init__(self, with_cm: bool = True, with_mtc: bool = True, with_signals: bool = True):
        """
        Initialize the BaseEngine.

        Args:
            with_cm (bool): Whether to initialize the ConfigManager. Default is True.
            with_mtc (bool): Whether to initialize the MTC listener. Default is True.
            with_signals (bool): Whether to initialize the SignalEngine. Default is True.
        """
        # Engine parameters
        self.with_cm = with_cm
        self.with_mtc = with_mtc
        self.with_signals = with_signals
        self.go_offset = None  # None = not computing timecode; 0 = raw MTC
        self.script: CuemsScript = None
        self.stop_requested = False
        self.node_name = None
        self.node_host = None
        self.mtc_port = MTC_PORT
        self.timecode = None
        self.status = EngineStatus()
        self.oscquery_client_list: list[OssiaClient] = []

        super().__init__(with_signals=with_signals)

        if self.with_cm:
            self.set_config_manager()
        if self.with_mtc:
            self.set_mtc_listener()

        ## dev: CUE "POINTERS":
        # here we use the "standard" point of view that there is an
        # ongoing cue already running (one or many, at least the last to be gone)
        # and a pointer indicating which is the next to be gone when go is pressed

        self.ongoing_cue = None
        self.next_cue_pointer = None
        self.show_locked = False

        Logger.info(f"{self.__class__.__name__}@{self.node_name} initialized, waiting start signal")

    @property
    def timecode(self) -> str | None:
        return self._timecode

    @timecode.setter
    def timecode(self, value: str | None) -> None:
        self._timecode = value
        if hasattr(self, 'on_timecode_change'):
            self.on_timecode_change(value) # type: ignore[attr-defined]

    def stop_all(self) -> None:
        if self.with_mtc:
            try:
                self.stop_mtc_listener()
            except Exception as e:
                Logger.error(f'Error stopping MTC listener: {e}')
                raise e
        try:
            self.remove_show_lock_file()
        except Exception as e:
            Logger.error(f'Error removing show lock file: {e}')
            raise e

    ### STATUS ###
    def set_status(self, property: str, value: str, strict: bool = False) -> None:
        """Set the status of the engine

        Args:
            property (str): The property to set
            value (str): The value to set
            strict (bool): If True, raise an AttributeError if the property is not found
        """
        if f"_{property}" in self.status.__dict__.keys():
            Logger.debug(f'Setting property {property} to {value}')
            self.status.__setattr__(property, value)
        else:
            Logger.error(f'Property {property} not found in EngineStatus')
            if strict:
                raise AttributeError(f'Property {property} not found in EngineStatus')

    def get_status(self, property: str, strict: bool = False) -> str:
        """Get the status of the engine

        Args:
            property (str): The property to get
            strict (bool): If True, raise an AttributeError if the property is not found
        """
        value = getattr(self.status, property, "NotFound")
        if value == "NotFound":
            Logger.error(f'Property {property} not found in EngineStatus')
            if strict:
                raise AttributeError(f'Property {property} not found in EngineStatus')
        return value

    def status_callback(self, endpoint: str, value: str) -> None:
        """Callback for the status endpoint"""
        Logger.debug(f'Status callback received: {endpoint} = {value}')
        parameter = str(endpoint).split('/')[-1]
        self.set_status(parameter, value)

    def get_all_status_names(self) -> list[str]:
        return [i[1:] for i in vars(self.status).keys()]

    def get_status_endpoints(self) -> dict[str, list[Any]]:
        endpoints = self.build_endpoints_from_status()
        Logger.debug(f"Status endpoints: {endpoints}")
        # remove unwanted callbacks from status nodes that are set programmatically
        # to avoid callback loops and threading issues when push_value() is called
        for i in ["currentcue", "running", "load", "timecode", "armed"]:
            if f"/engine/status/{i}" in endpoints:
                endpoints[f"/engine/status/{i}"][1] = None
        return endpoints

    def build_endpoints_from_status(self) -> dict[str, list[Any, Callable | None, Any]]:
        endpoints = {}
        Logger.debug(f"Building endpoints from status, vars: {list(vars(self.status).keys())}")
        for k, v in vars(self.status).items():
            if v is None:
                Logger.debug(f"Skipping {k} (value is None)")
                continue
            type_name = type(v).__name__
            # Map Python type names to pyossia type names
            if type_name == 'str':
                type_name = 'string'
            if type_name not in VALUE_TYPES_DICT:
                Logger.warning(f"Unknown value type {type_name} for status property {k}, skipping")
                continue
            endpoint_path = f"/engine/status/{k[1:]}"
            endpoints[endpoint_path] = [VALUE_TYPES_DICT[type_name], self.status_callback, v]
            Logger.debug(f"Added endpoint: {endpoint_path} with type {type_name} and value {v}")
        return endpoints   

    ### OSCQUERY ###
    def set_oscquery_server(self, endpoints: dict = None, host: str = None, port: int = None):
        if port is None:
            # Try to get port from config, fallback to default
            if hasattr(self, 'cm') and self.cm and hasattr(self.cm, 'node_conf') and self.cm.node_conf:
                port = self.cm.node_conf.get('oscquery_ws_port', 9001)
            else:
                port = 9001  # Default OSCQuery port
        if host is None:
            # For ControllerEngine, controller_ip might be None, use CONTROLLER_HOST as fallback
            host = getattr(self, 'controller_ip', None) or CONTROLLER_HOST
        local_port = PORT_HANDLER.new_random_port()
        if local_port is None:
            raise RuntimeError("Failed to get random port for OSCQuery server")
        self.oscquery_server = OssiaServer(
            host = host,
            local_port = local_port,
            remote_port = port,
            server = ServerDevices.OSCQUERY,
            endpoints = endpoints
        )

    def set_oscquery_client(self, host: str = None, port: int = None) -> OssiaClient:
        if port is None:
            port = self.cm.node_conf['oscquery_ws_port']
        if host is None:
            host = self.controller_ip
        oscquery_client = OssiaClient(
            host = host,
            local_port = PORT_HANDLER.new_random_port(),
            remote_port = port,
            remote_type = ClientDevices.OSCQUERY
        )
        Logger.debug(f"OscQueryClient created: {oscquery_client}")
        self.oscquery_client_list.append(oscquery_client)
        return oscquery_client

    ### MTC LISTENER ###
    def set_mtc_listener(self) -> None:
        """Set the MTC listener"""
        mtc_step = partial(BaseEngine.mtc_callback, self)
        mtc_reset = partial(BaseEngine.mtc_callback, self, CTimecode('00:00:00:00'))

        if not self.mtc_port:
            self.mtc_port = self.cm.node_conf['mtc_port']

        if self.mtc_port is not None:
            self.mtc_listener = MtcListener(
                port=self.mtc_port,
                step_callback = mtc_step,
                reset_callback = mtc_reset
            )
        else:
            Logger.error('MTC port not set, cannot create MtcListener')
            self.stop()
            exit(-1)

    def stop_mtc_listener(self) -> None:
        if self.mtc_listener is not None and self.mtc_listener.is_alive():
            try:
                self.mtc_listener.stop()
                self.mtc_listener.join()
                self.mtc_listener = None
            except Exception as e:
                Logger.error(f'Error stopping MTC listener: {e}')
                raise e

    def reset_script(self) -> None:
        if self.script:
            self.script = None
            self.ongoing_cue = None
            self.next_cue_pointer = None
            self.go_offset = None
            # Only set OSCQuery values if server exists and has the nodes
            if hasattr(self, 'oscquery_server') and self.oscquery_server:
                try:
                    self.oscquery_server.set_value('/engine/status/running', "no")
                    self.oscquery_server.set_value('/engine/status/gocue', "no")
                except ValueError as e:
                    Logger.warning(f"Could not reset OSCQuery status nodes: {e}. Server may not be fully initialized.")

    def mtc_callback(self, mtc: CTimecode) -> None:
        if self.go_offset is not None:
            # Drift = current MTC - GO-time MTC. Both _exact for sub-ms
            # precision at NTSC framerates (29.97/23.976).
            self.timecode = mtc.milliseconds_exact - self.go_offset

    ### CONFIG MANAGER ###
    def set_config_manager(self) -> None:
        """Set the ConfigManager"""
        from cuemsutils.xml import ProjectMappings
        try:
            self.cm = ConfigManager(load_all=True)
            self.node_host = f"http://{self.cm.node_conf['uuid'][-12:]}.local"
        except FileNotFoundError:
            Logger.error('Node config file could not be found. Exiting !!!!!')
            exit(-1)
        except Exception as e:
            Logger.error(f'Exception while loading config: {e}')
            exit(-1)
        Logger.info(f'Node conf: {self.cm.node_conf}')
        # Get node name from config as a check step
        try:
            self.node_name = str(self.cm.node_conf['uuid'])
        except KeyError:
            Logger.error('Node name not found in config. Exiting !!!!!')
            exit(-1)

        # Get tmp path from config as a check step
        try:
            self.tmp_path = str(self.cm.tmp_path)
        except KeyError:
            Logger.error('Tmp path not found in config. Exiting !!!!!')
            exit(-1)

        # Get controller IP from network map
        try:
            self.controller_ip = self.get_controller_ip()
            Logger.info(f'Controller IP: {self.controller_ip}')
        except Exception as e:
            Logger.error(f'{type(e)} while getting controller IP: {e}')
            exit(-1)

    def get_controller_ip(self) -> str:
        """Set the controller IP address"""
        if not hasattr(self, 'cm') or not self.cm.network_map:
            raise AttributeError('No network map found')
        nodes = self.cm.network_map['node_list']
        if not nodes:
            raise ValueError('No nodes found in network map')
        for node_item in nodes:
            node = node_item.get('node', {}) if isinstance(node_item, dict) else {}
            if node.get('node_type') == CONTROLLER_NETWORK_FLAG:
                return node.get('ip')
        raise ValueError('No controller node found in network map')

    def find_hosts(self) -> list[dict[str, str | bool]]:
        """
        Extract the list of adopted online hosts in the network map

        Returns:
        - list[dict[str, str | bool]]: List of hosts with their IP, uuid and controller flag

        Exceptions:
        - ValueError: No nodes found in network map
        - AttributeError: No controller found in network map
        """
        Logger.info(f'Looking for hosts in network map')
        network_dict = self.cm.network_map
        if not network_dict:
            raise ValueError('No network map not found')
        nodes, _ = self.cm.network_map.get_nodes_by_adoption(network_dict)
        if not nodes:
            raise ValueError('No adopted nodes found in network map')
        hosts = [
            {'ip': node.get('ip'), 'uuid': node.get('uuid'), 'controller': node.get('node_type') == CONTROLLER_NETWORK_FLAG}
            for node in nodes
            if node.get('online') == 'True'
        ]
        if not any(host.get('controller') for host in hosts):
            raise AttributeError('No controller found in network map')
        if len([host for host in hosts if host.get('controller')]) > 1:
            raise AttributeError('Multiple controllers found in network map')
        return hosts

    def print_all_status(self) -> None:
        Logger.info('STATUS REQUEST BY SIGUSR2 SIGNAL')
        if self.cm.is_alive():
            Logger.info(self.cm.getName() + ' is alive)')
        else:
            Logger.info(self.cm.getName() + ' is not alive, trying to restore it')
            self.cm.start()

        '''
        if self.ws_server.is_alive():
            Logger.info(self.ws_server.getName() + ' is alive')
            try:
                # os.kill(self.ws_pid, 0)
            except OSError:
                Logger.info('\tws child process is NOT running')
            else:
                Logger.info('\tws child process is running')
        else:
            Logger.info(self.ws_server.getName() + ' is not alive, trying to restore it')
            # self.ws_server.start()
        '''

        Logger.info(f'MTC: {self.mtc_listener.timecode()}')

    ### SHOW LOCK FILE ###
    def set_show_lock_file(self): # DEV: static
        if not path.isfile(SHOW_LOCK_PATH):
            try:
                with open(SHOW_LOCK_PATH, 'w') as file:
                    file.write(' ')
                Logger.info("/tmp/cuems.show.lock file written...")
                self.show_locked = True
            except:
                Logger.warning("Could not write show lock file")
        else:
            Logger.info(f'Show lock file {SHOW_LOCK_PATH} already exists')
            self.show_locked = True

    def remove_show_lock_file(self): # DEV: static
        if path.isfile(SHOW_LOCK_PATH):
            try:
                remove(SHOW_LOCK_PATH)
                Logger.info("/tmp/cuems.show.lock file removed...")
                self.show_locked = False
            except OSError:
                Logger.warning("Could not delete master lock file")
        else:
            Logger.info(f'Show lock file {SHOW_LOCK_PATH} does not exist')
            self.show_locked = False

    @logged
    def read_script(self, project_name: str) -> None:
        xml_file = path.join(self.cm.library_path, 'projects', project_name, 'script.xml')
        if not path.isfile(xml_file):
            raise FileNotFoundError(f'Script file {xml_file} not found')
        reader = XmlReaderWriter(
            schema_name = 'script',
            xmlfile = xml_file
        )
        self.script = reader.read_to_objects()

    @logged
    def initial_cuelist_process(self, cuelist: CueList = None):
        ''' 
        Review all the items recursively to update target uuids and objects
        and to load all the "loaded" flagged
        '''

        if not self.script:
            Logger.error('No script found, need to load a project first')
            raise ValueError('Script is not loaded')

        if cuelist is None:
            cuelist = self.script.cuelist
        Logger.info(f'Processing {type(cuelist).__name__}: {cuelist.id}')
        if not hasattr(cuelist, 'contents') or not cuelist.contents or len(cuelist.contents) == 0:
            Logger.warning('Cuelist contents is empty, nothing to process')
            return

        cuelist.localize_cue(self.cm.node_uuid)
        CUE_HANDLER.arm(cuelist, True)

        for index, item in enumerate(cuelist.contents):
            if item is None:
                Logger.warning(f'Skipping None item at index {index} in cuelist {cuelist.id}')
                continue

            try:
                if isinstance(item, CueList):
                    self.initial_cuelist_process(item)

                item.localize_cue(self.cm.node_uuid)

                if item.target is None or item.target == "":
                    if (index + 1) == len(cuelist.contents):
                        '''
                        If the item is the last in the cuelist we leave the
                        target fields as None
                        '''
                        item.target = None
                        item._target_object = None
                    else:
                        next_item = cuelist.contents[index + 1]
                        if next_item is not None:
                            item.target = next_item.id
                            item._target_object = next_item
                        else:
                            item.target = None
                            item._target_object = None
                else:
                    item._target_object = self.script.find(item.target)
                    if item._target_object is None:
                        Logger.warning(f'{type(item).__name__} {item.id} has target {item.target} that could not be found in the script (deleted?)')

                Logger.debug(f'Target object for {type(item)} {item.id} is {item._target_object}')
                if isinstance(item, ActionCue):
                    item._action_target_object = self.script.find(item.action_target)
                    if item._action_target_object is None and item.action_target:
                        Logger.warning(f'ActionCue {item.id} has action_target {item.action_target} that could not be found in the script (deleted?)')

            except Exception as e:
                Logger.error(f'Error processing item at index {index} in cuelist {cuelist.id}: {e}')
                continue

        # Arm first cue + duration-aware lookahead. The sliding window
        # (_arm_ahead in go/go_threaded) arms subsequent cues during
        # playback. For post_go='go' chains, arm() recursively arms the
        # entire chain. For go_at_end chains, only 2 cues with meaningful
        # duration are armed, saving resources for large projects.
        if cuelist.contents:
            first_cue = None
            for c in cuelist.contents:
                if c.enabled:
                    first_cue = c
                    break
            # If the cuelist's first cue isn't ours, walk the post_go='go' chain
            # to find our first local cue — same shape as NodeEngine.go_script.
            # Without this, slaves don't pre-arm anything at load time and the
            # /videocomposer/layer/load only fires when GO is hit, producing
            # staggered starts as the async loads complete in arrival order.
            first_local = first_cue
            walked = 0
            while first_local is not None and not getattr(first_local, '_local', False):
                if first_local.post_go != 'go':
                    first_local = None
                    break
                first_local = getattr(first_local, '_target_object', None)
                walked += 1
                if walked > 1024:
                    first_local = None
                    break
            if first_local is not None:
                if first_local is not first_cue:
                    Logger.info(
                        f'Pre-arm: skipped {walked} non-local cue(s); arming first '
                        f'local cue {first_local.id} + lookahead')
                else:
                    Logger.info(f'Arming first enabled cue + lookahead for {type(cuelist).__name__}: {cuelist.id}')
                CUE_HANDLER.arm(first_local, True)
                CUE_HANDLER._arm_ahead(first_local)

__init__(with_cm=True, with_mtc=True, with_signals=True)

Initialize the BaseEngine.

Parameters:

Name Type Description Default
with_cm bool

Whether to initialize the ConfigManager. Default is True.

True
with_mtc bool

Whether to initialize the MTC listener. Default is True.

True
with_signals bool

Whether to initialize the SignalEngine. Default is True.

True
Source code in src/cuemsengine/core/BaseEngine.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(self, with_cm: bool = True, with_mtc: bool = True, with_signals: bool = True):
    """
    Initialize the BaseEngine.

    Args:
        with_cm (bool): Whether to initialize the ConfigManager. Default is True.
        with_mtc (bool): Whether to initialize the MTC listener. Default is True.
        with_signals (bool): Whether to initialize the SignalEngine. Default is True.
    """
    # Engine parameters
    self.with_cm = with_cm
    self.with_mtc = with_mtc
    self.with_signals = with_signals
    self.go_offset = None  # None = not computing timecode; 0 = raw MTC
    self.script: CuemsScript = None
    self.stop_requested = False
    self.node_name = None
    self.node_host = None
    self.mtc_port = MTC_PORT
    self.timecode = None
    self.status = EngineStatus()
    self.oscquery_client_list: list[OssiaClient] = []

    super().__init__(with_signals=with_signals)

    if self.with_cm:
        self.set_config_manager()
    if self.with_mtc:
        self.set_mtc_listener()

    ## dev: CUE "POINTERS":
    # here we use the "standard" point of view that there is an
    # ongoing cue already running (one or many, at least the last to be gone)
    # and a pointer indicating which is the next to be gone when go is pressed

    self.ongoing_cue = None
    self.next_cue_pointer = None
    self.show_locked = False

    Logger.info(f"{self.__class__.__name__}@{self.node_name} initialized, waiting start signal")

find_hosts()

Extract the list of adopted online hosts in the network map

Returns: - list[dict[str, str | bool]]: List of hosts with their IP, uuid and controller flag

Exceptions: - ValueError: No nodes found in network map - AttributeError: No controller found in network map

Source code in src/cuemsengine/core/BaseEngine.py
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def find_hosts(self) -> list[dict[str, str | bool]]:
    """
    Extract the list of adopted online hosts in the network map

    Returns:
    - list[dict[str, str | bool]]: List of hosts with their IP, uuid and controller flag

    Exceptions:
    - ValueError: No nodes found in network map
    - AttributeError: No controller found in network map
    """
    Logger.info(f'Looking for hosts in network map')
    network_dict = self.cm.network_map
    if not network_dict:
        raise ValueError('No network map not found')
    nodes, _ = self.cm.network_map.get_nodes_by_adoption(network_dict)
    if not nodes:
        raise ValueError('No adopted nodes found in network map')
    hosts = [
        {'ip': node.get('ip'), 'uuid': node.get('uuid'), 'controller': node.get('node_type') == CONTROLLER_NETWORK_FLAG}
        for node in nodes
        if node.get('online') == 'True'
    ]
    if not any(host.get('controller') for host in hosts):
        raise AttributeError('No controller found in network map')
    if len([host for host in hosts if host.get('controller')]) > 1:
        raise AttributeError('Multiple controllers found in network map')
    return hosts

get_controller_ip()

Set the controller IP address

Source code in src/cuemsengine/core/BaseEngine.py
288
289
290
291
292
293
294
295
296
297
298
299
def get_controller_ip(self) -> str:
    """Set the controller IP address"""
    if not hasattr(self, 'cm') or not self.cm.network_map:
        raise AttributeError('No network map found')
    nodes = self.cm.network_map['node_list']
    if not nodes:
        raise ValueError('No nodes found in network map')
    for node_item in nodes:
        node = node_item.get('node', {}) if isinstance(node_item, dict) else {}
        if node.get('node_type') == CONTROLLER_NETWORK_FLAG:
            return node.get('ip')
    raise ValueError('No controller node found in network map')

get_status(property, strict=False)

Get the status of the engine

Parameters:

Name Type Description Default
property str

The property to get

required
strict bool

If True, raise an AttributeError if the property is not found

False
Source code in src/cuemsengine/core/BaseEngine.py
113
114
115
116
117
118
119
120
121
122
123
124
125
def get_status(self, property: str, strict: bool = False) -> str:
    """Get the status of the engine

    Args:
        property (str): The property to get
        strict (bool): If True, raise an AttributeError if the property is not found
    """
    value = getattr(self.status, property, "NotFound")
    if value == "NotFound":
        Logger.error(f'Property {property} not found in EngineStatus')
        if strict:
            raise AttributeError(f'Property {property} not found in EngineStatus')
    return value

initial_cuelist_process(cuelist=None)

Review all the items recursively to update target uuids and objects and to load all the "loaded" flagged

Source code in src/cuemsengine/core/BaseEngine.py
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
@logged
def initial_cuelist_process(self, cuelist: CueList = None):
    ''' 
    Review all the items recursively to update target uuids and objects
    and to load all the "loaded" flagged
    '''

    if not self.script:
        Logger.error('No script found, need to load a project first')
        raise ValueError('Script is not loaded')

    if cuelist is None:
        cuelist = self.script.cuelist
    Logger.info(f'Processing {type(cuelist).__name__}: {cuelist.id}')
    if not hasattr(cuelist, 'contents') or not cuelist.contents or len(cuelist.contents) == 0:
        Logger.warning('Cuelist contents is empty, nothing to process')
        return

    cuelist.localize_cue(self.cm.node_uuid)
    CUE_HANDLER.arm(cuelist, True)

    for index, item in enumerate(cuelist.contents):
        if item is None:
            Logger.warning(f'Skipping None item at index {index} in cuelist {cuelist.id}')
            continue

        try:
            if isinstance(item, CueList):
                self.initial_cuelist_process(item)

            item.localize_cue(self.cm.node_uuid)

            if item.target is None or item.target == "":
                if (index + 1) == len(cuelist.contents):
                    '''
                    If the item is the last in the cuelist we leave the
                    target fields as None
                    '''
                    item.target = None
                    item._target_object = None
                else:
                    next_item = cuelist.contents[index + 1]
                    if next_item is not None:
                        item.target = next_item.id
                        item._target_object = next_item
                    else:
                        item.target = None
                        item._target_object = None
            else:
                item._target_object = self.script.find(item.target)
                if item._target_object is None:
                    Logger.warning(f'{type(item).__name__} {item.id} has target {item.target} that could not be found in the script (deleted?)')

            Logger.debug(f'Target object for {type(item)} {item.id} is {item._target_object}')
            if isinstance(item, ActionCue):
                item._action_target_object = self.script.find(item.action_target)
                if item._action_target_object is None and item.action_target:
                    Logger.warning(f'ActionCue {item.id} has action_target {item.action_target} that could not be found in the script (deleted?)')

        except Exception as e:
            Logger.error(f'Error processing item at index {index} in cuelist {cuelist.id}: {e}')
            continue

    # Arm first cue + duration-aware lookahead. The sliding window
    # (_arm_ahead in go/go_threaded) arms subsequent cues during
    # playback. For post_go='go' chains, arm() recursively arms the
    # entire chain. For go_at_end chains, only 2 cues with meaningful
    # duration are armed, saving resources for large projects.
    if cuelist.contents:
        first_cue = None
        for c in cuelist.contents:
            if c.enabled:
                first_cue = c
                break
        # If the cuelist's first cue isn't ours, walk the post_go='go' chain
        # to find our first local cue — same shape as NodeEngine.go_script.
        # Without this, slaves don't pre-arm anything at load time and the
        # /videocomposer/layer/load only fires when GO is hit, producing
        # staggered starts as the async loads complete in arrival order.
        first_local = first_cue
        walked = 0
        while first_local is not None and not getattr(first_local, '_local', False):
            if first_local.post_go != 'go':
                first_local = None
                break
            first_local = getattr(first_local, '_target_object', None)
            walked += 1
            if walked > 1024:
                first_local = None
                break
        if first_local is not None:
            if first_local is not first_cue:
                Logger.info(
                    f'Pre-arm: skipped {walked} non-local cue(s); arming first '
                    f'local cue {first_local.id} + lookahead')
            else:
                Logger.info(f'Arming first enabled cue + lookahead for {type(cuelist).__name__}: {cuelist.id}')
            CUE_HANDLER.arm(first_local, True)
            CUE_HANDLER._arm_ahead(first_local)

set_config_manager()

Set the ConfigManager

Source code in src/cuemsengine/core/BaseEngine.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
def set_config_manager(self) -> None:
    """Set the ConfigManager"""
    from cuemsutils.xml import ProjectMappings
    try:
        self.cm = ConfigManager(load_all=True)
        self.node_host = f"http://{self.cm.node_conf['uuid'][-12:]}.local"
    except FileNotFoundError:
        Logger.error('Node config file could not be found. Exiting !!!!!')
        exit(-1)
    except Exception as e:
        Logger.error(f'Exception while loading config: {e}')
        exit(-1)
    Logger.info(f'Node conf: {self.cm.node_conf}')
    # Get node name from config as a check step
    try:
        self.node_name = str(self.cm.node_conf['uuid'])
    except KeyError:
        Logger.error('Node name not found in config. Exiting !!!!!')
        exit(-1)

    # Get tmp path from config as a check step
    try:
        self.tmp_path = str(self.cm.tmp_path)
    except KeyError:
        Logger.error('Tmp path not found in config. Exiting !!!!!')
        exit(-1)

    # Get controller IP from network map
    try:
        self.controller_ip = self.get_controller_ip()
        Logger.info(f'Controller IP: {self.controller_ip}')
    except Exception as e:
        Logger.error(f'{type(e)} while getting controller IP: {e}')
        exit(-1)

set_mtc_listener()

Set the MTC listener

Source code in src/cuemsengine/core/BaseEngine.py
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def set_mtc_listener(self) -> None:
    """Set the MTC listener"""
    mtc_step = partial(BaseEngine.mtc_callback, self)
    mtc_reset = partial(BaseEngine.mtc_callback, self, CTimecode('00:00:00:00'))

    if not self.mtc_port:
        self.mtc_port = self.cm.node_conf['mtc_port']

    if self.mtc_port is not None:
        self.mtc_listener = MtcListener(
            port=self.mtc_port,
            step_callback = mtc_step,
            reset_callback = mtc_reset
        )
    else:
        Logger.error('MTC port not set, cannot create MtcListener')
        self.stop()
        exit(-1)

set_status(property, value, strict=False)

Set the status of the engine

Parameters:

Name Type Description Default
property str

The property to set

required
value str

The value to set

required
strict bool

If True, raise an AttributeError if the property is not found

False
Source code in src/cuemsengine/core/BaseEngine.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def set_status(self, property: str, value: str, strict: bool = False) -> None:
    """Set the status of the engine

    Args:
        property (str): The property to set
        value (str): The value to set
        strict (bool): If True, raise an AttributeError if the property is not found
    """
    if f"_{property}" in self.status.__dict__.keys():
        Logger.debug(f'Setting property {property} to {value}')
        self.status.__setattr__(property, value)
    else:
        Logger.error(f'Property {property} not found in EngineStatus')
        if strict:
            raise AttributeError(f'Property {property} not found in EngineStatus')

status_callback(endpoint, value)

Callback for the status endpoint

Source code in src/cuemsengine/core/BaseEngine.py
127
128
129
130
131
def status_callback(self, endpoint: str, value: str) -> None:
    """Callback for the status endpoint"""
    Logger.debug(f'Status callback received: {endpoint} = {value}')
    parameter = str(endpoint).split('/')[-1]
    self.set_status(parameter, value)

EngineStatus

A class that represents the status of an engine.

Source code in src/cuemsengine/core/EngineStatus.py
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
class EngineStatus:
    """
    A class that represents the status of an engine.
    """
    def __init__(self):
        self.recieved = 0  # Initialize before test (test setter increments this)
        self.load = ""
        self.loadcue = ""
        self.go = ""
        self.gocue = ""
        self.pause = ""
        self.stop = ""
        self.resetall = ""
        self.preload = ""
        self.unload = ""
        self.hwdiscovery = ""
        self.deploy = ""
        self.test = ""
        self.timecode = 0
        self.nextcue = ""
        self.running = ""
        self.armed = ""

        del self.currentcue # start with empty array

    @property
    def load(self) -> str | None:
        return self._load

    @load.setter 
    def load(self, value: str | None) -> None:
        self._load = value

    @property
    def loadcue(self) -> str | None:
        return self._loadcue

    @loadcue.setter
    def loadcue(self, value: str | None) -> None:
        self._loadcue = value

    @property
    def go(self) -> str | None:
        return self._go

    @go.setter
    def go(self, value: str | None) -> None:
        self._go = value

    @property
    def gocue(self) -> str | None:
        return self._gocue

    @gocue.setter
    def gocue(self, value: str | None) -> None:
        self._gocue = value

    @property
    def pause(self) -> str | None:
        return self._pause

    @pause.setter
    def pause(self, value: str | None) -> None:
        self._pause = value

    @property
    def stop(self) -> str | None:
        return self._stop

    @stop.setter
    def stop(self, value: str | None) -> None:
        self._stop = value

    @property
    def resetall(self) -> str | None:
        return self._resetall

    @resetall.setter
    def resetall(self, value: str | None) -> None:
        self._resetall = value

    @property
    def preload(self) -> str | None:
        return self._preload

    @preload.setter
    def preload(self, value: str | None) -> None:
        self._preload = value

    @property
    def unload(self) -> str | None:
        return self._unload

    @unload.setter
    def unload(self, value: str | None) -> None:
        self._unload = value

    @property
    def hwdiscovery(self) -> str | None:
        return self._hwdiscovery

    @hwdiscovery.setter
    def hwdiscovery(self, value: str | None) -> None:
        self._hwdiscovery = value

    @property
    def deploy(self) -> str | None  :
        return self._deploy

    @deploy.setter
    def deploy(self, value: str | None) -> None:
        self._deploy = value

    @property
    def test(self) -> str | None:
        return self._test

    @test.setter
    def test(self, value: str | None) -> None:
        self._test = value
        if value is not None:
            self.recieved += 1

    @property
    def recieved(self) -> int:
        return self._recieved

    @recieved.setter
    def recieved(self, value: int) -> None:
        self._recieved = value

    @property
    def timecode(self) -> int | None:
        return self._timecode

    @timecode.setter
    def timecode(self, value: int | None) -> None:
        self._timecode = value

    @property
    def currentcue(self) -> list[list[str, str]]:
        return self._currentcue

    @currentcue.setter
    def currentcue(self, value: list[str, str] | tuple[str, str]) -> None:
        """Set a (cue, offset) pair to the current cue list

        Args:
            value: A list or tuple of two strings

        Raises:
            ValueError: If the value is not a list or tuple of two elements

        Note:
            Non-string values are converted to strings using str().
        """
        if not isinstance(value, (list, tuple)) or len(value) != 2:
            raise ValueError('Current cue must be a list or tuple of two strings')
        id, offset = str(value[0]), str(value[1])
        for item in self._currentcue:
            if item[0] == id:
                item[1] = offset
                return
        self._currentcue.append([id, offset])

    @currentcue.deleter
    def currentcue(self) -> None:
        """Clear all current cue entries."""
        self._currentcue = []

    def remove_currentcue(self, cue_id: str) -> None:
        """Remove a specific cue entry by its ID.

        Args:
            cue_id: The ID of the cue to remove
        """
        id = str(cue_id)
        for i, item in enumerate(self._currentcue):
            if item[0] == id:
                self._currentcue.pop(i)
                return

    @property
    def nextcue(self) -> str | None:
        return self._nextcue

    @nextcue.setter
    def nextcue(self, value: str | None) -> None:
        self._nextcue = value

    @property
    def running(self) -> int | None:
        return self._running

    @running.setter
    def running(self, value: int | None) -> None:
        self._running = value

    @property
    def armed(self) -> str | None:
        return self._armed

    @armed.setter
    def armed(self, value: str | None) -> None:
        self._armed = value

remove_currentcue(cue_id)

Remove a specific cue entry by its ID.

Parameters:

Name Type Description Default
cue_id str

The ID of the cue to remove

required
Source code in src/cuemsengine/core/EngineStatus.py
175
176
177
178
179
180
181
182
183
184
185
def remove_currentcue(self, cue_id: str) -> None:
    """Remove a specific cue entry by its ID.

    Args:
        cue_id: The ID of the cue to remove
    """
    id = str(cue_id)
    for i, item in enumerate(self._currentcue):
        if item[0] == id:
            self._currentcue.pop(i)
            return