Skip to content

Cue Architecture

ActionHandler (action_handler.py)

Owns all ActionCue processing — validation, dispatch, hooks, and result delivery.

  • Hook phases: before_dispatch, after_dispatch, wrap_dispatch
  • Registration layers: cue_layer (from CueHandler), node_layer (from NodeEngine)
  • Result sink: injectable callable; defaults to NNG NodeOperation.STATUS via NodeCommunications.send_operation

See the action-handler-extensibility contract for integration details.

Dedicated action-cue execution, extension hooks, and optional result sink.

ActionHandler

Owns ActionCue validation, default handlers, hooks, and result delivery.

Source code in src/cuemsengine/cues/ActionHandler.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
class ActionHandler:
    """Owns ActionCue validation, default handlers, hooks, and result delivery.

    Extension points (all registry state is guarded by one internal lock):

    * ``before_dispatch`` / ``after_dispatch`` hooks receive an
      ``ActionHookContext``; for a given action the ``cue_layer`` hooks run
      before the ``node_layer`` hooks.
    * ``wrap_dispatch`` hooks wrap the default handler. At most one wrap per
      layer applies, and the ``cue_layer`` wrap is the outermost one.
    * A result sink receives every outcome dict unless emission is disabled.
    """

    def __init__(self) -> None:
        self._cue_handler: Any = None
        self._lock = threading.Lock()
        # (phase, source layer, action-type filter) -> hook callable.
        self._hooks: dict[
            tuple[str, str, frozenset[str]], Callable[[ActionHookContext], Any]
        ] = {}
        # None means "use the default sink" (see _default_result_sink).
        self._result_sink: Callable[[dict], None] | None = None
        self._emit_enabled: bool = True

    # ---- binding ----

    def bind_cue_handler(self, cue_handler: Any) -> None:
        """Bind the singleton cue orchestrator (arm, go, armed lookups)."""
        self._cue_handler = cue_handler

    def set_result_sink(self, sink: Callable[[dict], None] | None) -> None:
        """Replace result delivery; None restores default (NNG via comms thread)."""
        with self._lock:
            self._result_sink = sink

    def set_emit_enabled(self, enabled: bool) -> None:
        """When False, suppress outcome emission (useful in tests)."""
        with self._lock:
            self._emit_enabled = enabled

    def clear_action_extensions(self) -> None:
        """Remove all hooks and custom sink (for isolated tests).

        Emission is switched back on as part of the reset.
        """
        with self._lock:
            self._hooks.clear()
            self._result_sink = None
            self._emit_enabled = True

    # ---- registration ----

    def register_action_hook(
        self,
        phase: HookPhase,
        fn: Callable[[ActionHookContext], Any],
        *,
        source: RegistrationLayer = "cue_layer",
        action_types: frozenset[str] | None = None,
    ) -> None:
        """Register a hook; last registration wins for the same (phase, source, filter).

        Args:
            phase: Hook phase the callable is attached to.
            fn: The hook callable.
            source: Registration layer that owns the hook.
            action_types: Action types the hook applies to; ``None`` selects
                the module's catch-all filter.
        """
        filter_key = action_types if action_types is not None else _ALL_ACTIONS
        key = (phase, source, filter_key)
        with self._lock:
            self._hooks[key] = fn

    def unregister_action_hook(
        self,
        phase: HookPhase,
        *,
        source: RegistrationLayer,
        action_types: frozenset[str] | None = None,
    ) -> None:
        """Remove the hook stored under (phase, source, filter); missing keys are ignored."""
        filter_key = action_types if action_types is not None else _ALL_ACTIONS
        key = (phase, source, filter_key)
        with self._lock:
            self._hooks.pop(key, None)

    def finalize_node_layer_bindings(self) -> None:
        """Call from NodeEngine after comms are ready (extension point; default no-op)."""
        return

    # ---- hook resolution ----

    def _matching_hooks(
        self, phase: HookPhase, action_type: str
    ) -> list[tuple[str, Callable[[ActionHookContext], Any]]]:
        """Return (layer, fn) pairs: cue_layer first, then node_layer.

        The registry is snapshotted under the lock and filtered outside it,
        so hooks never run while the lock is held.
        """
        with self._lock:
            items = list(self._hooks.items())
        cue_hooks: list[tuple[str, Callable[[ActionHookContext], Any]]] = []
        node_hooks: list[tuple[str, Callable[[ActionHookContext], Any]]] = []
        for (ph, layer, filter_key), fn in items:
            if ph != phase or not _filter_matches(action_type, filter_key):
                continue
            if layer == "cue_layer":
                cue_hooks.append((layer, fn))
            else:
                node_hooks.append((layer, fn))
        return cue_hooks + node_hooks

    def _wrap_for_action(
        self, layer: RegistrationLayer, action_type: str
    ) -> Callable[..., Any] | None:
        """Pick the single ``wrap_dispatch`` hook of ``layer`` for ``action_type``.

        A registration whose filter names the action beats one with an empty
        (catch-all) filter. Within each category the entry seen last while
        iterating the registry wins. Returns ``None`` when nothing applies.
        """
        with self._lock:
            best_specific: Callable[..., Any] | None = None
            best_all: Callable[..., Any] | None = None
            for (ph, src, filter_key), fn in self._hooks.items():
                if ph != "wrap_dispatch" or src != layer:
                    continue
                if not filter_key:
                    best_all = fn
                elif action_type in filter_key:
                    best_specific = fn
            return best_specific if best_specific is not None else best_all

    @staticmethod
    def _bind_wrap(
        wrap: Callable[..., Any], ctx: ActionHookContext, inner: Callable[[], dict]
    ) -> Callable[[], dict]:
        """Return a zero-argument callable that runs ``wrap(ctx, inner)``."""

        def wrapped() -> dict:
            return wrap(ctx, inner)

        return wrapped

    def _compose_dispatch(
        self,
        action_type: str,
        ctx: ActionHookContext,
        run_default: Callable[[], dict],
    ) -> Callable[[], dict]:
        """Layer the applicable ``wrap_dispatch`` hooks around ``run_default``.

        The node_layer wrap (if any) is applied first and the cue_layer wrap
        (if any) around it, so the cue_layer wrap is outermost. With no wraps
        registered, ``run_default`` is returned unchanged. Each layer is
        resolved exactly once.
        """
        chain = run_default
        for layer in ("node_layer", "cue_layer"):
            wrap = self._wrap_for_action(layer, action_type)
            if wrap is not None:
                chain = self._bind_wrap(wrap, ctx, chain)
        return chain

    # ---- result delivery ----

    def _emit_outcome(self, outcome: dict) -> None:
        """Deliver ``outcome`` to the custom sink, or to the default sink if none is set.

        Does nothing when emission is disabled. An exception raised by a
        custom sink is logged and swallowed; the default sink is not used as
        a fallback in that case.
        """
        with self._lock:
            sink = self._result_sink
            emit = self._emit_enabled
        if not emit:
            return
        if sink is not None:
            try:
                sink(outcome)
            except Exception as exc:
                Logger.error(f"Custom action result sink raised: {exc}")
            return
        self._default_result_sink(outcome)

    def _default_result_sink(self, outcome: dict) -> None:
        """Send ``outcome`` as a STATUS/UPDATE operation via the cue handler's comms thread.

        Silently does nothing when no cue handler is bound or it has no
        ``communications_thread``. Send failures are logged at debug level
        (best-effort delivery).
        """
        ch = self._cue_handler
        if ch is None:
            return
        ct: NodeCommunications | None = getattr(ch, "communications_thread", None)
        if ct is None:
            return
        try:
            op = NodeOperation(
                type=OperationType.STATUS,
                action=ActionType.UPDATE,
                sender=ct.node_id,
                target="action_cue_outcome",
                data=dict(outcome),  # copy so the caller's dict is not shared
            )
            ct.send_operation(op, timeout=0.1)
        except Exception as exc:
            Logger.debug(f"Default action outcome emit skipped: {exc}")

    def _emit_result(
        self,
        status: str,
        action_type: str,
        target_id: str | None,
        reason: str | None = None,
    ) -> dict:
        """Build an outcome dict, emit it, and return it (shared early-exit path)."""
        out = self._action_result(status, action_type, target_id, reason)
        self._emit_outcome(out)
        return out

    # ---- main dispatch ----

    def execute_action(
        self,
        cue: ActionCue,
        mtc: MtcListener,
        frozen_mtc_ms: float | None = None,
    ) -> dict:
        """Validate and run one ActionCue, then emit and return its outcome.

        Steps: validate the action type and target, run ``before_dispatch``
        hooks, run the registered handler (through any ``wrap_dispatch``
        hooks), run ``after_dispatch`` hooks, emit the outcome.

        Args:
            cue: The action cue to execute.
            mtc: MTC listener handed through to the handler and hooks.
            frozen_mtc_ms: Optional frozen MTC timestamp handed through as-is.

        Returns:
            The outcome dict. Status is ``"rejected"`` for an unsupported
            action type or a missing target, and ``"failed"`` when a hook or
            the handler raises or no handler is registered. Otherwise it is
            whatever the handler (or outermost wrap) returned.
        """
        action_type = cue.action_type
        target = cue._action_target_object

        if action_type not in SUPPORTED_CUE_ACTIONS:
            reason = f"Unsupported action_type: {action_type!r}"
            Logger.warning(reason)
            return self._emit_result("rejected", action_type, None, reason)

        if target is None:
            reason = (
                f"Missing target for {action_type} "
                f"(action_target={cue.action_target!r})"
            )
            Logger.warning(reason)
            return self._emit_result("rejected", action_type, None, reason)

        target_id = getattr(target, "id", None)
        ctx = ActionHookContext(
            cue=cue,
            target=target,
            mtc=mtc,
            action_type=action_type,
            target_id=target_id,
            outcome=None,
            cue_handler=self._cue_handler,
            frozen_mtc_ms=frozen_mtc_ms,
        )

        # before_dispatch hooks: the first one that raises aborts the action.
        for _layer, hook_fn in self._matching_hooks("before_dispatch", action_type):
            try:
                hook_fn(ctx)
            except Exception as exc:
                reason = f"before_dispatch hook raised {type(exc).__name__}: {exc}"
                Logger.error(reason)
                return self._emit_result("failed", action_type, target_id, reason)

        handler = _ACTION_HANDLERS.get(action_type)
        if handler is None:
            reason = f"No handler registered for {action_type}"
            Logger.error(reason)
            return self._emit_result("failed", action_type, target_id, reason)

        ch = self._cue_handler

        def run_default() -> dict:
            return handler(ch, cue, target, mtc, frozen_mtc_ms)

        dispatch_failed = False
        try:
            # Wraps are resolved once here; with none registered this simply
            # calls run_default.
            result = self._compose_dispatch(action_type, ctx, run_default)()
        except Exception as exc:
            dispatch_failed = True
            reason = f"{action_type} on {target_id} raised {type(exc).__name__}: {exc}"
            Logger.error(reason)
            result = self._action_result("failed", action_type, target_id, reason)

        ctx.outcome = result

        # after_dispatch hooks (skipped if the handler or a wrap raised).
        if not dispatch_failed:
            for _layer, hook_fn in self._matching_hooks("after_dispatch", action_type):
                try:
                    hook_fn(ctx)
                except Exception as exc:
                    reason = f"after_dispatch hook raised {type(exc).__name__}: {exc}"
                    Logger.error(reason)
                    result = self._action_result(
                        "failed", action_type, target_id, reason
                    )
                    ctx.outcome = result
                    break
            Logger.info(
                f'Action {action_type} on {target_id}: {result["status"]}'
                + (f' ({result["reason"]})' if result.get("reason") else "")
            )

        self._emit_outcome(result)
        return result

    @staticmethod
    def _action_result(
        status: str,
        action_type: str,
        target_id: str | None,
        reason: str | None = None,
    ) -> dict:
        """Build the outcome dict (status, action_type, target_id, reason)."""
        return {
            "status": status,
            "action_type": action_type,
            "target_id": target_id,
            "reason": reason,
        }

bind_cue_handler(cue_handler)

Bind the singleton cue orchestrator (arm, go, armed lookups).

Source code in src/cuemsengine/cues/ActionHandler.py
81
82
83
def bind_cue_handler(self, cue_handler: Any) -> None:
    """Attach the singleton cue orchestrator used for arm, go, and armed lookups."""
    self._cue_handler = cue_handler

clear_action_extensions()

Remove all hooks and custom sink (for isolated tests).

Source code in src/cuemsengine/cues/ActionHandler.py
 95
 96
 97
 98
 99
100
def clear_action_extensions(self) -> None:
    """Drop every registered hook and any custom sink, and re-enable emission.

    Intended for test isolation.
    """
    with self._lock:
        self._emit_enabled = True
        self._result_sink = None
        self._hooks.clear()

finalize_node_layer_bindings()

Call from NodeEngine after comms are ready (extension point; default no-op).

Source code in src/cuemsengine/cues/ActionHandler.py
130
131
132
def finalize_node_layer_bindings(self) -> None:
    """Extension point for NodeEngine to call once comms are ready; does nothing by default."""
    return None

register_action_hook(phase, fn, *, source='cue_layer', action_types=None)

Register a hook; last registration wins for the same (phase, source, filter).

Source code in src/cuemsengine/cues/ActionHandler.py
104
105
106
107
108
109
110
111
112
113
114
115
116
def register_action_hook(
    self,
    phase: HookPhase,
    fn: Callable[[ActionHookContext], Any],
    *,
    source: RegistrationLayer = "cue_layer",
    action_types: frozenset[str] | None = None,
) -> None:
    """Store ``fn`` under (phase, source, filter), replacing any previous entry.

    When ``action_types`` is ``None`` the module's catch-all filter is used
    as the filter part of the key.
    """
    if action_types is None:
        filter_key = _ALL_ACTIONS
    else:
        filter_key = action_types
    with self._lock:
        self._hooks[(phase, source, filter_key)] = fn

set_emit_enabled(enabled)

When False, suppress outcome emission (useful in tests).

Source code in src/cuemsengine/cues/ActionHandler.py
90
91
92
93
def set_emit_enabled(self, enabled: bool) -> None:
    """Turn outcome emission on or off; off is handy in tests."""
    with self._lock:
        self._emit_enabled = enabled

set_result_sink(sink)

Replace result delivery; None restores default (NNG via comms thread).

Source code in src/cuemsengine/cues/ActionHandler.py
85
86
87
88
def set_result_sink(self, sink: Callable[[dict], None] | None) -> None:
    """Install a custom result sink; passing None goes back to the default (NNG via comms thread)."""
    with self._lock:
        self._result_sink = sink

ActionHookContext dataclass

Context passed to extension hooks (stable field names for integrators).

Source code in src/cuemsengine/cues/ActionHandler.py
53
54
55
56
57
58
59
60
61
62
63
64
@dataclass
class ActionHookContext:
    """Context passed to extension hooks (stable field names for integrators).

    One instance is built per action execution and handed to the
    before_dispatch, wrap_dispatch and after_dispatch hooks.
    """

    # The action cue being executed.
    cue: ActionCue
    # The cue the action operates on (the action cue's resolved target object).
    target: Cue | None
    # MTC listener supplied by the caller of the action execution.
    mtc: MtcListener
    # The action cue's action type.
    action_type: str
    # The target's ``id`` attribute, or None when the target has none.
    target_id: str | None
    # None until dispatch has produced a result; then the outcome dict.
    outcome: dict | None = None
    # The cue handler bound to the ActionHandler at execution time, if any.
    cue_handler: Any = None
    # Optional frozen MTC timestamp passed through from the caller
    # (presumably milliseconds, per the name; confirm against the caller).
    frozen_mtc_ms: float | None = None

CueHandler

Singleton class responsible for handling Cue objects.

Holds a list of armed cues and manages video players. Thread-safe: internal state mutations are guarded by a Lock.

Source code in src/cuemsengine/cues/CueHandler.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
class CueHandler:
    """
    Singleton class responsible for handling Cue objects.

    Holds a list of armed cues and manages video players.
    Thread-safe: internal state mutations are guarded by a Lock.
    """

    _instance: "CueHandler | None" = None

    # Instance attributes (declared for IDE/type checker support)
    _armed_cues: list[Cue]
    _armed_cues_set: set[str]
    _video_players: dict
    _front_video_player: VideoPlayer | None
    _lock: Lock
    communications_thread: NodeCommunications

    def __new__(cls, *args, **kwargs):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            # Initialize instance attributes
            cls._instance._armed_cues = []
            cls._instance._armed_cues_set = set()
            cls._instance._video_players = {}
            cls._instance._front_video_player = None
            cls._instance._lock = Lock()
        return cls._instance


    # ---------------------------
    # Communications To Controller
    # ---------------------------
    def set_nng_comms(self, hub_address: str, node_id: str):
        """Create and start the NNG communications thread, then wait for it to come up.

        The thread counts as ready once it is alive and its ``event_loop``
        is set. Polling gives up after about five seconds and logs a
        warning; no exception is raised in that case.
        """
        from time import sleep

        Logger.info(f"Starting communications for Node {node_id}")
        Logger.info(f"NNG Hub address: {hub_address}")
        comms = NodeCommunications(hub_address=hub_address, node_id=node_id)
        self.communications_thread = comms
        comms.start()

        # Give the NNG thread time to initialize (prevents race condition in nni_random)
        timeout_s = 5.0
        poll_s = 0.1
        elapsed = 0.0
        while elapsed < timeout_s:
            ready = comms.is_alive() and comms.event_loop is not None
            if ready:
                Logger.info(f"NNG communications thread ready after {elapsed:.1f}s")
                break
            sleep(poll_s)
            elapsed += poll_s
        else:
            Logger.warning(f"NNG communications thread not ready after {timeout_s}s")

    # ---------------------------
    # Armed Cues List Methods
    # ---------------------------

    def add_armed_cue(self, cue: Cue) -> None:
        """Adds an armed cue to the list."""
        with self._lock:
            self._armed_cues.append(cue)
            self._armed_cues_set.add(cue.id)

    def get_armed_cues(self) -> list[Cue]:
        """Returns the list of armed cues."""
        with self._lock:
            return self._armed_cues

    def get_armed_cue(self, cue: Cue) -> Cue | None:
        """Returns the armed cue with the given uuid."""
        try:
            return self.get_armed_cues().index(cue)
        except ValueError:
            return None

    def find_armed_cue(self, cue: Cue) -> Cue | None:
        """Finds an armed cue with the given uuid."""
        with self._lock:
            return cue.id in self._armed_cues_set

    def remove_armed_cue(self, cue: Cue) -> bool:
        """Removes an armed cue from the list."""
        with self._lock:
            if cue.id in self._armed_cues_set:
                self._armed_cues.remove(cue)
                self._armed_cues_set.remove(cue.id)
                return True
        return False

    def reset_armed_cues(self) -> None:
        """Resets the list of armed cues."""
        with self._lock:
            self._armed_cues = []
            self._armed_cues_set.clear()


    # ---------------------------
    # Cue Management
    # ---------------------------

    # Minimum effective duration (ms) for a cue to "count" as providing
    # enough time to arm subsequent cues during its playback.
    # Configurable per deployment. Default 1000ms covers 4K video decode.
    _ARM_WINDOW_THRESHOLD_MS = 1000

    # Maximum cues to walk ahead. Prevents runaway on pathological chains.
    _MAX_LOOKAHEAD_DEPTH = 15

    @staticmethod
    def _effective_duration_ms(cue: Cue) -> float:
        """Time a cue occupies in milliseconds: prewait + body + postwait.

        prewait/postwait are always CTimecode (format_timecode returns
        CTimecode() for None/empty). CTimecode(0) is truthy but
        .milliseconds_exact returns 0.0.

        The body part depends on the cue kind: media duration for audio and
        video cues, fade-in plus fade-out for DMX cues, and zero for
        everything else (cue lists, action cues, unknown kinds).
        """
        lead_in = cue.prewait.milliseconds_exact
        lead_out = cue.postwait.milliseconds_exact

        # Default covers CueList (container, duration is its contents),
        # ActionCue (play/stop/enable/disable/go_to = instant) and any other kind.
        # TODO: use fade duration once fade_in/fade_out implemented
        body = 0
        if not isinstance(cue, CueList):
            if isinstance(cue, (AudioCue, VideoCue)):
                try:
                    if cue.media:
                        body = CTimecode(cue.media.duration).milliseconds_exact
                except Exception:
                    body = 0
            elif isinstance(cue, DmxCue):
                # fadein_time/fadeout_time stored as float seconds.
                # fadeout_time exists in model but not yet implemented (always 0.0).
                fade_in_s = getattr(cue, 'fadein_time', 0) or 0
                fade_out_s = getattr(cue, 'fadeout_time', 0) or 0
                body = (fade_in_s + fade_out_s) * 1000  # seconds → ms

        return lead_in + body + lead_out

    def _arm_ahead(self, start_cue: Cue) -> None:
        """Walk the target chain from ``start_cue`` and pre-arm upcoming cues.

        Walking stops once two cues with an effective duration of at least
        ``_ARM_WINDOW_THRESHOLD_MS`` have been seen, the chain ends, or
        ``_MAX_LOOKAHEAD_DEPTH`` links have been followed. Shorter cues are
        armed but not counted. CueList targets and disabled cues are stepped
        over without arming (CueLists are handled by initial_cuelist_process).
        """
        node = getattr(start_cue, '_target_object', None)
        real_duration_cues = 0
        steps = 0

        while (isinstance(node, Cue)
               and real_duration_cues < 2
               and steps < self._MAX_LOOKAHEAD_DEPTH):
            # Containers and disabled cues are skipped, never armed or counted
            skip = isinstance(node, CueList) or not node.enabled
            if not skip:
                if not getattr(node, 'loaded', False):
                    self.arm(node, init=True)
                if self._effective_duration_ms(node) >= self._ARM_WINDOW_THRESHOLD_MS:
                    real_duration_cues += 1
            # Follow the link only after arming, as arming comes first
            node = getattr(node, '_target_object', None)
            steps += 1

        depth_exhausted = steps >= self._MAX_LOOKAHEAD_DEPTH
        if depth_exhausted and real_duration_cues < 2:
            Logger.warning(
                f'_arm_ahead hit depth limit ({self._MAX_LOOKAHEAD_DEPTH}) '
                f'from cue {start_cue.id} with only {real_duration_cues}/2 real-duration '
                f'cues found. Remaining cues will rely on safety-net re-arm.')

    def arm(self, cue: Cue, init: bool = False) -> bool:
        """Arm a cue: register it as armed and, when ``init`` is set, load it.

        The decision is taken under the handler lock; the slow work
        (``arm_cue``, waiting, disarming) happens outside it.

        Outcomes, in the order they are checked:

        * ``cue`` is None: returns False.
        * Cue already loaded: a disabled cue is disarmed and False is
          returned; an enabled one is left alone and True is returned.
        * Another arm is in progress (``cue._loading`` is an Event): with
          ``init`` the call waits up to 5 s and returns the cue's ``loaded``
          state; without ``init`` it returns False immediately.
        * ``init`` is false: the cue is only added to the armed list/set
          (if not already there); returns True.
        * ``init`` is true and the cue is local and enabled: the cue is
          loaded via ``arm_cue``, marked loaded, registered, and its
          ``post_go == 'go'`` target and (for play / fade_action ActionCues)
          its action target are armed recursively; returns True.
        * ``init`` is true but the cue is not local or not enabled: nothing
          is done and True is returned.

        An exception raised by ``arm_cue`` propagates to the caller after
        the in-progress marker has been cleared and waiters released.
        """
        if cue is None:
            return False

        needs_disarm = False
        do_arm = False
        pending_event = None

        with self._lock:
            found = cue.id in self._armed_cues_set  # O(1) set lookup
            if hasattr(cue, 'loaded') and cue.loaded:
                if not cue.enabled:
                    needs_disarm = True
            elif isinstance(getattr(cue, '_loading', None), Event):
                if init:
                    # Another thread is arming — wait for it outside the lock
                    pending_event = cue._loading
                else:
                    # Non-init callers just register; no need to wait
                    return False
            elif not init:
                if not found:
                    self._armed_cues.append(cue)
                    self._armed_cues_set.add(cue.id)
            elif cue._local and cue.enabled:
                # Mark as loading inside the lock to block concurrent arm
                # attempts. Cleared in finally below (outside lock —
                # intentional: avoids holding lock during arm_cue(). The
                # Event is set atomically here, so no other thread can
                # enter this branch for the same cue until _loading is
                # cleared. Waiting threads block on the Event.)
                cue._loading = Event()
                do_arm = True

        # Another thread is arming this cue — wait for it to finish
        if pending_event is not None:
            Logger.debug(f'Waiting for in-progress arm of {type(cue).__name__} {cue.id}')
            armed = pending_event.wait(timeout=5.0)
            if not armed:
                Logger.warning(f'Timed out waiting for arm of {cue.id}')
            # Report the cue's actual state, whether or not the wait timed out
            return getattr(cue, 'loaded', False)

        # Disarm disabled-but-loaded cues outside lock (disarm acquires lock)
        if needs_disarm:
            self.disarm(cue)
            return False

        if not do_arm:
            # needs_disarm is always False here (handled above), so this is True
            return not needs_disarm

        try:
            Logger.info(f"Arming {type(cue).__name__} {cue.id}")
            arm_cue(cue)
            with self._lock:
                cue.loaded = True
                # `found` was sampled in the first locked section above
                if not found:
                    self._armed_cues.append(cue)
                    self._armed_cues_set.add(cue.id)
            if isinstance(cue, AudioCue):
                # Best-effort notification; failures are deliberately ignored
                try:
                    self.communications_thread.add_player(
                        f'audioplayer_{cue.id}', None, timeout=0.1)
                except Exception:
                    pass
        finally:
            # Clear the in-progress marker first, then release any waiters
            loading_event = cue._loading
            cue._loading = None
            if isinstance(loading_event, Event):
                loading_event.set()

        # Recursive arms — only reached if cue was actually armed.
        # _loading sentinel prevents cycles; loaded guard prevents re-arm.
        if cue.post_go == 'go' and cue._target_object:
            if cue._target_object.enabled:
                self.arm(cue._target_object, init)

        # ActionCue(play) and FadeCue(fade_action) + target = 1 unit. Arm target
        # so it's ready when the action fires (ActionCue has zero duration; FadeCue
        # expects target_cue already armed before reading its OSC cache).
        if isinstance(cue, ActionCue) and cue._action_target_object:
            if cue.action_type in ('play', 'fade_action'):
                self.arm(cue._action_target_object, init)

        return True

    def disarm(self, cue: Cue) -> bool:
        """Disarms a cue by removing it from the armed_cues list."""
        if hasattr(cue, 'loaded') and cue.loaded:
            self.remove_armed_cue(cue)
            cue.loaded = False
            try:
                if isinstance(cue, AudioCue):
                    self.communications_thread.remove_player(f'audioplayer_{cue.id}', timeout=0.1)
                self.communications_thread.remove_cue(cue.id, timeout=0.1)
            except Exception:
                pass

            if isinstance(cue, VideoCue):
                layer_ids = getattr(cue, '_layer_ids', [])
                client = getattr(cue, '_osc', None)
                if client and layer_ids:
                    for layer_id in layer_ids:
                        try:
                            client.set_value(f'/videocomposer/layer/{layer_id}/visible', 0)
                            client.set_value('/videocomposer/layer/unload', layer_id)
                            client.remove_layer_endpoints(layer_id)
                            PLAYER_HANDLER.deregister_layer(layer_id)
                        except Exception as e:
                            Logger.debug(f'Error disarming video layer {layer_id}: {e}')
                cue._layer_ids = []

            PLAYER_HANDLER.remove_cue_player(cue)
            return True

        return False

    def stop_all_cues(self) -> None:
        """Signal all armed cues to stop their playback loops.

        Also bumps each cue's generation counter so that any still-running
        go_threaded threads will see a mismatch and skip post-loop cleanup
        (disarm), which would otherwise undo the re-arm that follows.
        """
        with self._lock:
            for cue in self._armed_cues:
                cue._stop_requested = True
                cue._go_generation = getattr(cue, '_go_generation', 0) + 1

    def disarm_all(self) -> None:
        """Disarms all cues."""
        self.stop_all_cues()
        with self._lock:
            cues_snapshot = list(self._armed_cues)
        for cue in cues_snapshot:
            self.disarm(cue)
        self.reset_armed_cues()

    def get_next_cue(self, cue: Cue) -> Cue | None:
        """Returns the next cue to be played."""
        return cue._target_object if cue._target_object else None

    # ---------------------------
    # Cue Execution
    # ---------------------------

    @logged
    def go(self, cue: Cue, mtc: MtcListener, frozen_mtc_ms: float = None) -> Thread | None:
        """Launch a cue on its own daemon thread.

        Args:
            cue: The cue to start
            mtc: The MTC listener
            frozen_mtc_ms: Optional frozen MTC timestamp for sync with chained cues

        Returns:
            The thread running the cue. None when the cue is disabled or is
            not local to this node (the node that owns it runs it through
            its own GO/post_go dispatch).

        Raises:
            Exception: the cue was not loaded and the fallback re-arm did
                not load it either.
        """
        if not cue.enabled:
            Logger.info(f'Cue {cue.id} is disabled, skipping execution')
            return None

        is_local = getattr(cue, '_local', True)
        if not is_local:
            # A non-local target is run by the node where it IS local, via that
            # node's own go_threaded → post_go chain. Arming/running it here
            # would fail at re-arm (no local player), raise inside the caller's
            # thread, and kill chained playback (master videocomposer froze
            # after loop 1 when post_go='go' targeted an audio cue local to
            # slave only).
            Logger.info(f'Cue {cue.id} is not local to this node, skipping execution')
            return None

        Logger.info(f'GO command received. Starting cue {cue.id}')
        if not getattr(cue, 'loaded', False):
            Logger.warning(f'Cue {cue.id} not loaded at go() time — this should not happen, '
                           f'pre-arm may have failed. Re-arming as fallback.')
            self.arm(cue, init=True)
            if not getattr(cue, 'loaded', False):
                raise Exception(f'{cue.__class__.__name__} {cue.id} not loaded to go (re-arm failed)')

        # New run: clear any stop request and stamp a fresh generation
        cue._stop_requested = False
        generation = getattr(cue, '_go_generation', 0) + 1
        cue._go_generation = generation

        worker = Thread(
            target=self.go_threaded,
            name=f'GO:{cue.__class__.__name__}:{cue.id}',
            args=[cue, mtc, frozen_mtc_ms, generation],
            daemon=True
        )
        worker.start()

        # Duration-aware lookahead: keep arming until 2 cues with
        # meaningful playback duration are ready.
        self._arm_ahead(cue)
        return worker

    def go_threaded(self, cue: Cue, mtc: MtcListener, frozen_mtc_ms: float | None = None, go_gen: int = 0):
        """Runs a cue based on its properties.

        Sequence: optional pre-wait, MTC snapshot, ``run_cue`` (local cues
        only), optional post-wait, ``post_go == 'go'`` chaining, ``loop_cue``,
        then cleanup (controller notification, ``go_at_end`` chaining, disarm
        and waiting for any chained thread).

        Args:
            cue: The cue to run
            mtc: The MTC listener (for live MTC)
            frozen_mtc_ms: Optional frozen MTC timestamp in milliseconds.
                When None, it is captured from ``mtc.main_tc`` after pre-wait.
            go_gen: Generation counter captured at go() time. If the cue's
                    generation has changed by the time the loop ends, another
                    go/stop cycle occurred and this thread must not touch the cue.
        """
        # Bound up front: the chaining branch below is conditional, and the
        # final wait must never read an unbound local if that branch was skipped.
        post_go_thread = None

        if cue.prewait > 0:
            # Notify controller before pre-wait so UI shows "playing" immediately
            if cue._local and not cue._stop_requested:
                try:
                    offset = frozen_mtc_ms if frozen_mtc_ms is not None else 0
                    self.communications_thread.add_cue(cue.id, str(offset), timeout=0.1)
                except Exception:
                    # Best-effort notification; playback must not depend on it.
                    pass
            sleep(cue.prewait.milliseconds_rounded / 1000)
            # Bail out if stop arrived during pre-wait
            if cue._stop_requested:
                return

        if frozen_mtc_ms is None:
            # GO-time MTC capture is used by BaseEngine.timecode = mtc - go_offset
            # for drift measurement; _exact preserves sub-ms precision at NTSC.
            frozen_mtc_ms = mtc.main_tc.milliseconds_exact
            Logger.debug(f'Captured MTC snapshot for cue {cue.id}: {frozen_mtc_ms}ms')

        if cue._local:
            try:
                self.communications_thread.add_cue(cue.id, str(frozen_mtc_ms), timeout=0.1)
            except Exception:
                # Best-effort notification; playback must not depend on it.
                pass

            run_cue(cue, mtc, frozen_mtc_ms)

        if cue.postwait > 0:
            sleep(cue.postwait.milliseconds_rounded / 1000)

        if cue.post_go == 'go' and cue._target_object and not cue._stop_requested:
            Logger.info(f'Running post go for next cue:{cue.target}')
            # The chained cue shares this cue's frozen MTC timestamp.
            post_go_thread = self.go(cue._target_object, mtc, frozen_mtc_ms)

        # Pre-arm go_at_end targets during playback. Runs after
        # run_cue() so current cue is already playing. The arm happens
        # in parallel with the media. go() also calls _arm_ahead but
        # that fires before run_cue — this call catches cues that were
        # disarmed between go() and here (loop passes).
        if cue.post_go == 'go_at_end':
            self._arm_ahead(cue)

        Logger.info(f'Going to loop for {cue.__class__.__name__}:{cue.id}')
        loop_cue(cue, mtc)

        if getattr(cue, '_go_generation', 0) != go_gen:
            Logger.info(f'Cue {cue.id} generation changed ({go_gen} -> {cue._go_generation}), skipping cleanup')
            return

        # Notify the controller that the cue finished playing (status → 100).
        # Done here (after loop_cue) so the status only changes to 100 when the
        # cue has actually completed its full duration, not just when playback started.
        # Skipped if the cue was stopped (controller's stop_script already resets to 0).
        if cue._local and not getattr(cue, '_stop_requested', False):
            try:
                self.communications_thread.remove_cue(cue.id, timeout=0.1)
            except Exception:
                # Best-effort notification; cleanup continues regardless.
                pass

        go_at_end_thread = None
        if cue.post_go == 'go_at_end' and cue._target_object and not cue._stop_requested:
            Logger.info(f'Running go at end for {cue.__class__.__name__}:{cue.id}')
            # No frozen timestamp is passed: the target captures its own MTC snapshot.
            go_at_end_thread = self.go(cue._target_object, mtc)

        self.disarm(cue)

        if cue.post_go == 'go_at_end' and go_at_end_thread:
            self.wait_for_cue(go_at_end_thread)

        if cue.post_go == 'go' and cue._target_object and not cue._stop_requested:
            if post_go_thread:
                self.wait_for_cue(post_go_thread)

    def wait_for_cue(self, thread: Thread) -> None:
        """Block until the given cue thread has finished.

        Args:
            thread: The thread to wait for (as returned by ``go()``).
        """
        Logger.info(f'Waiting for {thread.name} to finish')
        # join() blocks until the thread exits, so no is_alive() polling is
        # needed and the caller resumes as soon as the thread ends.
        thread.join()
        Logger.info(f"{thread.name} finished")

    # ---------------------------
    # ---------------------------
    # Action Cue Execution (delegates to ActionHandler)
    # ---------------------------

    def execute_action(
        self,
        cue: ActionCue,
        mtc: MtcListener,
        frozen_mtc_ms: float | None = None,
    ) -> dict:
        """Run an ActionCue by handing it to the shared ``ACTION_HANDLER``.

        Args:
            cue: The action cue to execute.
            mtc: The MTC listener, forwarded unchanged.
            frozen_mtc_ms: Optional frozen MTC timestamp, forwarded unchanged.

        Returns:
            Whatever ``ACTION_HANDLER.execute_action`` returns for this cue.
        """
        # Resolved at call time rather than at module import.
        from .ActionHandler import ACTION_HANDLER as handler

        outcome = handler.execute_action(cue, mtc, frozen_mtc_ms)
        return outcome

    def register_action_hook(
        self,
        phase: str,
        fn,
        *,
        action_types: frozenset | None = None,
    ) -> None:
        """Forward a hook registration to ``ACTION_HANDLER`` tagged as cue-layer.

        Args:
            phase: Hook phase name, passed through unchanged.
            fn: The hook callable, passed through unchanged.
            action_types: Optional set of action types, passed through unchanged.
        """
        # Resolved at call time rather than at module import.
        from .ActionHandler import ACTION_HANDLER as handler

        # Registrations made through this method always carry the cue-layer source.
        hook_options = {"source": "cue_layer", "action_types": action_types}
        handler.register_action_hook(phase, fn, **hook_options)

    # ---------------------------
    # OSCQuery Message Routing
    # ---------------------------

    def route_audio_message(self, path_parts: list[str], value) -> None:
        """Dispatch an audio OSCQuery message to the mixer or to an armed cue.

        Args:
            path_parts: Path parts after 'audio' (e.g., ['mixer', '0', 'master', 'volume']
                        or ['cue', '<uuid>', '0', 'volume'])
            value: The OSC value to set (converted with ``float``)
        """
        if not path_parts:
            Logger.warning("Empty audio path parts")
            return

        target_kind = path_parts[0]
        if target_kind not in ('mixer', 'cue'):
            Logger.warning(f"Unknown audio path type: {path_parts[0]}")
            return

        # Both routes need at least <kind>/<id>/<channel>.
        has_address = len(path_parts) >= 3

        if target_kind == 'mixer':
            # ['mixer', '<output_index>', '<channel>', 'volume']
            # → /audiomixer/<output_index>_mixer/<channel>
            if not has_address:
                Logger.warning(f"Invalid mixer path: {path_parts}")
                return
            output_index, channel = path_parts[1], path_parts[2]
            mixer_cmd = f'/audiomixer/{output_index}_mixer/{channel}'
            mixer_client = PLAYER_HANDLER.get_audio_mixer_client()
            if not mixer_client:
                Logger.warning("Audio mixer client not available")
                return
            Logger.debug(f"Routing audio mixer: {mixer_cmd} = {value}")
            mixer_client.set_value(mixer_cmd, float(value))
            return

        # ['cue', '<uuid>', '<channel>', 'volume']
        # → /vol<channel> on the armed cue's OSC client
        if not has_address:
            Logger.warning(f"Invalid cue audio path: {path_parts}")
            return
        cue_uuid, channel = path_parts[1], path_parts[2]
        audio_cmd = f'/vol{channel}'
        armed = self.get_armed_cue_by_id(cue_uuid)
        if not (armed and hasattr(armed, '_osc') and armed._osc):
            Logger.warning(f"Cue {cue_uuid} not found or has no OSC client")
            return
        # UI already sends 0.0-1.0 via sliderToFloat(); just clamp
        vol_value = max(0.0, min(1.0, float(value)))
        Logger.debug(f"Routing audio cue {cue_uuid}: {audio_cmd} = {vol_value}")
        armed._osc.set_value(audio_cmd, vol_value)

    def route_dmx_message(self, path_parts: list[str], value) -> None:
        """Forward a DMX OSCQuery message to the DMX player client.

        The OSC address is built from every path part that follows the first
        'mixer' element.

        Args:
            path_parts: Path parts after 'dmx' (e.g., ['mixer', '0', 'channel', '1'])
            value: The OSC value to set (passed through unchanged)
        """
        if not path_parts:
            Logger.warning("Empty DMX path parts")
            return

        if 'mixer' not in path_parts:
            Logger.warning(f"Invalid DMX path (no 'mixer' keyword): {path_parts}")
            return

        # Skip the 'mixer' keyword itself; the remainder is the address.
        tail_start = path_parts.index('mixer') + 1
        dmx_cmd = '/' + '/'.join(path_parts[tail_start:])
        dmx_client = PLAYER_HANDLER.get_dmx_player_client()
        if not dmx_client:
            Logger.warning("DMX player client not available")
            return

        Logger.debug(f"Routing DMX: {dmx_cmd} = {value}")
        dmx_client.set_value(dmx_cmd, value)

    def get_armed_cue_by_id(self, cue_id: str) -> Cue | None:
        """Look up an armed cue by its id.

        Args:
            cue_id: The id to compare against each armed cue's ``id``.

        Returns:
            The first armed cue whose ``id`` equals ``cue_id``, or None.
        """
        with self._lock:
            return next(
                (armed for armed in self._armed_cues if armed.id == cue_id),
                None,
            )

add_armed_cue(cue)

Adds an armed cue to the list.

Source code in src/cuemsengine/cues/CueHandler.py
90
91
92
93
94
def add_armed_cue(self, cue: Cue) -> None:
    """Record a cue as armed, keeping the list and the id set in step.

    A cue whose id is already tracked is ignored, so the list never holds
    more than one entry per id (the same guard ``arm()`` applies).

    Args:
        cue: The cue to record; its ``id`` is the tracking key.
    """
    with self._lock:
        if cue.id in self._armed_cues_set:
            return
        self._armed_cues.append(cue)
        self._armed_cues_set.add(cue.id)

arm(cue, init=False)

Arms a cue by appending it to the armed_cues list.

Source code in src/cuemsengine/cues/CueHandler.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
def arm(self, cue: Cue, init: bool = False) -> bool:
    """Register a cue as armed and, when ``init`` is true, load it.

    With ``init=False`` an unloaded cue is only added to the armed list.
    With ``init=True`` a local, enabled, unloaded cue is loaded through
    ``arm_cue``; if another thread is already loading it, this call waits
    for that load instead. After a successful load, the ``post_go == 'go'``
    target and (for play/fade_action ActionCues) the action target are
    armed recursively with the same ``init`` value.

    Args:
        cue: The cue to arm; None is accepted and rejected.
        init: Whether to actually load the cue rather than just register it.

    Returns:
        False when ``cue`` is None, when a loaded but disabled cue was
        disarmed, or when a non-init call finds a load in progress.
        The cue's ``loaded`` attribute (False if absent) after waiting on an
        in-progress load. True in every other case, including calls where
        nothing needed loading.

    Raises:
        Any exception raised by ``arm_cue``; the loading event is still
        released before it propagates.
    """
    if cue is None:
        return False

    # Decisions are taken under the lock and acted upon after releasing it.
    needs_disarm = False
    do_arm = False
    pending_event = None

    with self._lock:
        found = cue.id in self._armed_cues_set  # O(1) set lookup
        if hasattr(cue, 'loaded') and cue.loaded:
            # Already loaded: only a disabled cue needs further work.
            if not cue.enabled:
                needs_disarm = True
        elif isinstance(getattr(cue, '_loading', None), Event):
            if init:
                # Another thread is arming — wait for it outside the lock
                pending_event = cue._loading
            else:
                # Non-init callers just register; no need to wait
                return False
        elif not init:
            # Registration only: record the cue without loading it.
            if not found:
                self._armed_cues.append(cue)
                self._armed_cues_set.add(cue.id)
        elif cue._local and cue.enabled:
            # Mark as loading inside the lock to block concurrent arm
            # attempts. Cleared in finally below (outside lock —
            # intentional: avoids holding lock during arm_cue(). The
            # Event is set atomically here, so no other thread can
            # enter this branch for the same cue until _loading is
            # cleared. Waiting threads block on the Event.)
            cue._loading = Event()
            do_arm = True

    # Another thread is arming this cue — wait for it to finish
    if pending_event is not None:
        Logger.debug(f'Waiting for in-progress arm of {type(cue).__name__} {cue.id}')
        armed = pending_event.wait(timeout=5.0)
        if not armed:
            Logger.warning(f'Timed out waiting for arm of {cue.id}')
        # Report the cue's actual state, whether or not the wait timed out.
        return getattr(cue, 'loaded', False)

    # Disarm disabled-but-loaded cues outside lock (disarm acquires lock)
    if needs_disarm:
        self.disarm(cue)
        return False

    if not do_arm:
        # needs_disarm is always False here (handled above), so this is True.
        return not needs_disarm

    try:
        Logger.info(f"Arming {type(cue).__name__} {cue.id}")
        arm_cue(cue)
        with self._lock:
            cue.loaded = True
            # `found` was sampled in the first locked section above.
            if not found:
                self._armed_cues.append(cue)
                self._armed_cues_set.add(cue.id)
        if isinstance(cue, AudioCue):
            try:
                self.communications_thread.add_player(
                    f'audioplayer_{cue.id}', None, timeout=0.1)
            except Exception:
                # Best-effort notification; arming does not depend on it.
                pass
    finally:
        # Clear the marker and wake waiters whether arm_cue succeeded or raised.
        loading_event = cue._loading
        cue._loading = None
        if isinstance(loading_event, Event):
            loading_event.set()

    # Recursive arms — only reached if cue was actually armed.
    # _loading sentinel prevents cycles; loaded guard prevents re-arm.
    if cue.post_go == 'go' and cue._target_object:
        if cue._target_object.enabled:
            self.arm(cue._target_object, init)

    # ActionCue(play) and FadeCue(fade_action) + target = 1 unit. Arm target
    # so it's ready when the action fires (ActionCue has zero duration; FadeCue
    # expects target_cue already armed before reading its OSC cache).
    if isinstance(cue, ActionCue) and cue._action_target_object:
        if cue.action_type in ('play', 'fade_action'):
            self.arm(cue._action_target_object, init)

    return True

disarm(cue)

Disarms a cue by removing it from the armed_cues list.

Source code in src/cuemsengine/cues/CueHandler.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
def disarm(self, cue: Cue) -> bool:
    """Unload a loaded cue and drop it from the armed list.

    Args:
        cue: The cue to disarm.

    Returns:
        True if the cue was loaded and has been disarmed, False if it had no
        truthy ``loaded`` attribute (nothing is done in that case).
    """
    if not getattr(cue, 'loaded', False):
        return False

    self.remove_armed_cue(cue)
    cue.loaded = False

    # Controller notifications are best-effort; failures are ignored.
    try:
        if isinstance(cue, AudioCue):
            self.communications_thread.remove_player(f'audioplayer_{cue.id}', timeout=0.1)
        self.communications_thread.remove_cue(cue.id, timeout=0.1)
    except Exception:
        pass

    if isinstance(cue, VideoCue):
        video_layers = getattr(cue, '_layer_ids', [])
        osc_client = getattr(cue, '_osc', None)
        if osc_client and video_layers:
            for layer_id in video_layers:
                # One failing layer must not stop the remaining layers.
                try:
                    osc_client.set_value(f'/videocomposer/layer/{layer_id}/visible', 0)
                    osc_client.set_value('/videocomposer/layer/unload', layer_id)
                    osc_client.remove_layer_endpoints(layer_id)
                    PLAYER_HANDLER.deregister_layer(layer_id)
                except Exception as e:
                    Logger.debug(f'Error disarming video layer {layer_id}: {e}')
        cue._layer_ids = []

    PLAYER_HANDLER.remove_cue_player(cue)
    return True

disarm_all()

Disarms all cues.

Source code in src/cuemsengine/cues/CueHandler.py
337
338
339
340
341
342
343
344
def disarm_all(self) -> None:
    """Stop every armed cue, disarm each one, then reset the armed list."""
    self.stop_all_cues()
    # Snapshot under the lock; disarm() takes the lock itself via
    # remove_armed_cue(), so it has to run after the lock is released.
    with self._lock:
        pending = tuple(self._armed_cues)
    for armed in pending:
        self.disarm(armed)
    self.reset_armed_cues()

execute_action(cue, mtc, frozen_mtc_ms=None)

Execute an ActionCue against the running show (see ActionHandler).

Source code in src/cuemsengine/cues/CueHandler.py
502
503
504
505
506
507
508
509
510
511
def execute_action(
    self,
    cue: ActionCue,
    mtc: MtcListener,
    frozen_mtc_ms: float | None = None,
) -> dict:
    """Run an ActionCue by handing it to the shared ``ACTION_HANDLER``.

    Args:
        cue: The action cue to execute.
        mtc: The MTC listener, forwarded unchanged.
        frozen_mtc_ms: Optional frozen MTC timestamp, forwarded unchanged.

    Returns:
        Whatever ``ACTION_HANDLER.execute_action`` returns for this cue.
    """
    # Resolved at call time rather than at module import.
    from .ActionHandler import ACTION_HANDLER as handler

    outcome = handler.execute_action(cue, mtc, frozen_mtc_ms)
    return outcome

find_armed_cue(cue)

Reports whether a cue with the same id as the given cue is currently armed (returns a boolean, not the cue itself).

Source code in src/cuemsengine/cues/CueHandler.py
108
109
110
111
def find_armed_cue(self, cue: Cue) -> bool:
    """Report whether a cue with the same id as ``cue`` is currently armed.

    Args:
        cue: The cue whose ``id`` is looked up in the armed-id set.

    Returns:
        True if the id is tracked as armed, False otherwise. The result is a
        membership flag, not the cue object.
    """
    with self._lock:
        return cue.id in self._armed_cues_set

get_armed_cue(cue)

Returns the armed cue with the given uuid.

Source code in src/cuemsengine/cues/CueHandler.py
101
102
103
104
105
106
def get_armed_cue(self, cue: Cue) -> Cue | None:
    """Return the armed-list entry matching ``cue``.

    Args:
        cue: The cue to look for (matched by identity or equality, the same
            rule ``list.index`` applies).

    Returns:
        The matching cue object from the armed list, or None when there is
        no match. The object itself is returned rather than its position, so
        a match in the first slot is not mistaken for "not found".
    """
    for armed in self.get_armed_cues():
        if armed is cue or armed == cue:
            return armed
    return None

get_armed_cue_by_id(cue_id)

Returns the armed cue with the given uuid string.

Source code in src/cuemsengine/cues/CueHandler.py
603
604
605
606
607
608
609
def get_armed_cue_by_id(self, cue_id: str) -> Cue | None:
    """Look up an armed cue by its id.

    Args:
        cue_id: The id to compare against each armed cue's ``id``.

    Returns:
        The first armed cue whose ``id`` equals ``cue_id``, or None.
    """
    with self._lock:
        return next(
            (armed for armed in self._armed_cues if armed.id == cue_id),
            None,
        )

get_armed_cues()

Returns the list of armed cues.

Source code in src/cuemsengine/cues/CueHandler.py
96
97
98
99
def get_armed_cues(self) -> list[Cue]:
    """Return the handler's armed-cue list.

    Returns:
        The internal list object itself, not a copy; the lock is held only
        while the reference is read.
    """
    with self._lock:
        armed = self._armed_cues
    return armed

get_next_cue(cue)

Returns the next cue to be played.

Source code in src/cuemsengine/cues/CueHandler.py
346
347
348
def get_next_cue(self, cue: Cue) -> Cue | None:
    """Return the cue's target object, or None when it has no (truthy) target."""
    return cue._target_object or None

go(cue, mtc, frozen_mtc_ms=None)

Starts a cue in a thread.

Parameters:

Name Type Description Default
cue Cue

The cue to start

required
mtc MtcListener

The MTC listener

required
frozen_mtc_ms float

Optional frozen MTC timestamp for sync with chained cues

None

Returns:

Type Description
Thread | None

Thread running the cue, or None if the cue is disabled or not

Thread | None

local to this node (the node owning the target will run it via

Thread | None

its own GO/post_go dispatch).

Source code in src/cuemsengine/cues/CueHandler.py
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
@logged
def go(self, cue: Cue, mtc: MtcListener, frozen_mtc_ms: float = None) -> Thread | None:
    """Launch a cue on its own daemon thread.

    Args:
        cue: The cue to start
        mtc: The MTC listener
        frozen_mtc_ms: Optional frozen MTC timestamp for sync with chained cues

    Returns:
        The started thread, or None when the cue is disabled or not local to
        this node (the node owning the target will run it via its own
        GO/post_go dispatch).

    Raises:
        Exception: If the cue is still not loaded after the fallback re-arm.
    """
    if not cue.enabled:
        Logger.info(f'Cue {cue.id} is disabled, skipping execution')
        return None

    is_local = getattr(cue, '_local', True)
    if not is_local:
        # Non-local target: handled by the node where it IS local via that
        # node's own go_threaded → post_go chain. Trying to arm/run it
        # locally would fail at re-arm (no local player), raise inside the
        # caller's thread, and kill chained playback (master videocomposer
        # froze after loop 1 when post_go='go' targeted an audio cue local
        # to slave only).
        Logger.info(f'Cue {cue.id} is not local to this node, skipping execution')
        return None

    Logger.info(f'GO command received. Starting cue {cue.id}')
    if not getattr(cue, 'loaded', False):
        Logger.warning(f'Cue {cue.id} not loaded at go() time — this should not happen, '
                       f'pre-arm may have failed. Re-arming as fallback.')
        self.arm(cue, init=True)
        if not getattr(cue, 'loaded', False):
            raise Exception(f'{cue.__class__.__name__} {cue.id} not loaded to go (re-arm failed)')

    # Clear any earlier stop request and stamp this run with a new generation;
    # go_threaded compares against it before doing its post-loop cleanup.
    cue._stop_requested = False
    generation = getattr(cue, '_go_generation', 0) + 1
    cue._go_generation = generation

    worker = Thread(
        target=self.go_threaded,
        name=f'GO:{cue.__class__.__name__}:{cue.id}',
        args=[cue, mtc, frozen_mtc_ms, generation],
        daemon=True,
    )
    worker.start()

    # Duration-aware lookahead: arm ahead until 2 cues with
    # meaningful playback duration are ready.
    self._arm_ahead(cue)
    return worker

go_threaded(cue, mtc, frozen_mtc_ms=None, go_gen=0)

Runs a cue based on its properties.

Parameters:

Name Type Description Default
cue Cue

The cue to run

required
mtc MtcListener

The MTC listener (for live MTC)

required
frozen_mtc_ms float

Optional frozen MTC timestamp in milliseconds.

None
go_gen int

Generation counter captured at go() time. If the cue's generation has changed by the time the loop ends, another go/stop cycle occurred and this thread must not touch the cue.

0
Source code in src/cuemsengine/cues/CueHandler.py
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
def go_threaded(self, cue: Cue, mtc: MtcListener, frozen_mtc_ms: float | None = None, go_gen: int = 0):
    """Runs a cue based on its properties.

    Sequence: optional pre-wait, MTC snapshot, ``run_cue`` (local cues
    only), optional post-wait, ``post_go == 'go'`` chaining, ``loop_cue``,
    then cleanup (controller notification, ``go_at_end`` chaining, disarm
    and waiting for any chained thread).

    Args:
        cue: The cue to run
        mtc: The MTC listener (for live MTC)
        frozen_mtc_ms: Optional frozen MTC timestamp in milliseconds.
            When None, it is captured from ``mtc.main_tc`` after pre-wait.
        go_gen: Generation counter captured at go() time. If the cue's
                generation has changed by the time the loop ends, another
                go/stop cycle occurred and this thread must not touch the cue.
    """
    # Bound up front: the chaining branch below is conditional, and the
    # final wait must never read an unbound local if that branch was skipped.
    post_go_thread = None

    if cue.prewait > 0:
        # Notify controller before pre-wait so UI shows "playing" immediately
        if cue._local and not cue._stop_requested:
            try:
                offset = frozen_mtc_ms if frozen_mtc_ms is not None else 0
                self.communications_thread.add_cue(cue.id, str(offset), timeout=0.1)
            except Exception:
                # Best-effort notification; playback must not depend on it.
                pass
        sleep(cue.prewait.milliseconds_rounded / 1000)
        # Bail out if stop arrived during pre-wait
        if cue._stop_requested:
            return

    if frozen_mtc_ms is None:
        # GO-time MTC capture is used by BaseEngine.timecode = mtc - go_offset
        # for drift measurement; _exact preserves sub-ms precision at NTSC.
        frozen_mtc_ms = mtc.main_tc.milliseconds_exact
        Logger.debug(f'Captured MTC snapshot for cue {cue.id}: {frozen_mtc_ms}ms')

    if cue._local:
        try:
            self.communications_thread.add_cue(cue.id, str(frozen_mtc_ms), timeout=0.1)
        except Exception:
            # Best-effort notification; playback must not depend on it.
            pass

        run_cue(cue, mtc, frozen_mtc_ms)

    if cue.postwait > 0:
        sleep(cue.postwait.milliseconds_rounded / 1000)

    if cue.post_go == 'go' and cue._target_object and not cue._stop_requested:
        Logger.info(f'Running post go for next cue:{cue.target}')
        # The chained cue shares this cue's frozen MTC timestamp.
        post_go_thread = self.go(cue._target_object, mtc, frozen_mtc_ms)

    # Pre-arm go_at_end targets during playback. Runs after
    # run_cue() so current cue is already playing. The arm happens
    # in parallel with the media. go() also calls _arm_ahead but
    # that fires before run_cue — this call catches cues that were
    # disarmed between go() and here (loop passes).
    if cue.post_go == 'go_at_end':
        self._arm_ahead(cue)

    Logger.info(f'Going to loop for {cue.__class__.__name__}:{cue.id}')
    loop_cue(cue, mtc)

    if getattr(cue, '_go_generation', 0) != go_gen:
        Logger.info(f'Cue {cue.id} generation changed ({go_gen} -> {cue._go_generation}), skipping cleanup')
        return

    # Notify the controller that the cue finished playing (status → 100).
    # Done here (after loop_cue) so the status only changes to 100 when the
    # cue has actually completed its full duration, not just when playback started.
    # Skipped if the cue was stopped (controller's stop_script already resets to 0).
    if cue._local and not getattr(cue, '_stop_requested', False):
        try:
            self.communications_thread.remove_cue(cue.id, timeout=0.1)
        except Exception:
            # Best-effort notification; cleanup continues regardless.
            pass

    go_at_end_thread = None
    if cue.post_go == 'go_at_end' and cue._target_object and not cue._stop_requested:
        Logger.info(f'Running go at end for {cue.__class__.__name__}:{cue.id}')
        # No frozen timestamp is passed: the target captures its own MTC snapshot.
        go_at_end_thread = self.go(cue._target_object, mtc)

    self.disarm(cue)

    if cue.post_go == 'go_at_end' and go_at_end_thread:
        self.wait_for_cue(go_at_end_thread)

    if cue.post_go == 'go' and cue._target_object and not cue._stop_requested:
        if post_go_thread:
            self.wait_for_cue(post_go_thread)

register_action_hook(phase, fn, *, action_types=None)

Register a cue-layer extension hook; forwards to ACTION_HANDLER.

Source code in src/cuemsengine/cues/CueHandler.py
513
514
515
516
517
518
519
520
521
522
523
524
525
def register_action_hook(
    self,
    phase: str,
    fn,
    *,
    action_types: frozenset | None = None,
) -> None:
    """Forward a hook registration to ``ACTION_HANDLER`` tagged as cue-layer.

    Args:
        phase: Hook phase name, passed through unchanged.
        fn: The hook callable, passed through unchanged.
        action_types: Optional set of action types, passed through unchanged.
    """
    # Resolved at call time rather than at module import.
    from .ActionHandler import ACTION_HANDLER as handler

    # Registrations made through this method always carry the cue-layer source.
    hook_options = {"source": "cue_layer", "action_types": action_types}
    handler.register_action_hook(phase, fn, **hook_options)

remove_armed_cue(cue)

Removes an armed cue from the list.

Source code in src/cuemsengine/cues/CueHandler.py
113
114
115
116
117
118
119
120
def remove_armed_cue(self, cue: Cue) -> bool:
    """Remove a cue from the armed list and the armed-id set.

    Entries are matched by ``id``, the same key the id set uses, so the list
    and the set cannot drift apart when the stored object is not equal to
    the one passed in.

    Args:
        cue: The cue to remove; only its ``id`` is used for matching.

    Returns:
        True if the id was tracked and has been removed, False otherwise.
    """
    with self._lock:
        if cue.id not in self._armed_cues_set:
            return False
        # Edit the list in place so references to it handed out earlier
        # (see get_armed_cues) observe the removal.
        self._armed_cues[:] = [
            armed for armed in self._armed_cues if armed.id != cue.id
        ]
        self._armed_cues_set.discard(cue.id)
        return True

reset_armed_cues()

Resets the list of armed cues.

Source code in src/cuemsengine/cues/CueHandler.py
122
123
124
125
126
def reset_armed_cues(self) -> None:
    """Forget every armed cue.

    The id set is emptied in place; the list attribute is rebound to a new
    empty list, leaving the previous list object untouched.
    """
    with self._lock:
        self._armed_cues_set.clear()
        self._armed_cues = []

route_audio_message(path_parts, value)

Route audio OSCQuery message to the appropriate handler.

Parameters:

Name Type Description Default
path_parts list[str]

Path parts after 'audio' (e.g., ['mixer', '0', 'master', 'volume'] or ['cue', '<uuid>', '0', 'volume'])

required
value

The OSC value to set

required
Source code in src/cuemsengine/cues/CueHandler.py
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
def route_audio_message(self, path_parts: list[str], value) -> None:
    """Dispatch an audio OSCQuery message to the mixer or to an armed cue.

    Args:
        path_parts: Path parts after 'audio' (e.g., ['mixer', '0', 'master', 'volume']
                    or ['cue', '<uuid>', '0', 'volume'])
        value: The OSC value to set (converted with ``float``)
    """
    if not path_parts:
        Logger.warning("Empty audio path parts")
        return

    target_kind = path_parts[0]
    if target_kind not in ('mixer', 'cue'):
        Logger.warning(f"Unknown audio path type: {path_parts[0]}")
        return

    # Both routes need at least <kind>/<id>/<channel>.
    has_address = len(path_parts) >= 3

    if target_kind == 'mixer':
        # ['mixer', '<output_index>', '<channel>', 'volume']
        # → /audiomixer/<output_index>_mixer/<channel>
        if not has_address:
            Logger.warning(f"Invalid mixer path: {path_parts}")
            return
        output_index, channel = path_parts[1], path_parts[2]
        mixer_cmd = f'/audiomixer/{output_index}_mixer/{channel}'
        mixer_client = PLAYER_HANDLER.get_audio_mixer_client()
        if not mixer_client:
            Logger.warning("Audio mixer client not available")
            return
        Logger.debug(f"Routing audio mixer: {mixer_cmd} = {value}")
        mixer_client.set_value(mixer_cmd, float(value))
        return

    # ['cue', '<uuid>', '<channel>', 'volume']
    # → /vol<channel> on the armed cue's OSC client
    if not has_address:
        Logger.warning(f"Invalid cue audio path: {path_parts}")
        return
    cue_uuid, channel = path_parts[1], path_parts[2]
    audio_cmd = f'/vol{channel}'
    armed = self.get_armed_cue_by_id(cue_uuid)
    if not (armed and hasattr(armed, '_osc') and armed._osc):
        Logger.warning(f"Cue {cue_uuid} not found or has no OSC client")
        return
    # UI already sends 0.0-1.0 via sliderToFloat(); just clamp
    vol_value = max(0.0, min(1.0, float(value)))
    Logger.debug(f"Routing audio cue {cue_uuid}: {audio_cmd} = {vol_value}")
    armed._osc.set_value(audio_cmd, vol_value)

route_dmx_message(path_parts, value)

Route DMX OSCQuery message to the DMX player.

Parameters:

Name Type Description Default
path_parts list[str]

Path parts after 'dmx' (e.g., ['mixer', '0', 'channel', '1'])

required
value

The OSC value to set

required
Source code in src/cuemsengine/cues/CueHandler.py
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
def route_dmx_message(self, path_parts: list[str], value) -> None:
    """Forward a DMX OSCQuery message to the DMX player client.

    The OSC address is built from every path part that follows the first
    'mixer' element.

    Args:
        path_parts: Path parts after 'dmx' (e.g., ['mixer', '0', 'channel', '1'])
        value: The OSC value to set (passed through unchanged)
    """
    if not path_parts:
        Logger.warning("Empty DMX path parts")
        return

    if 'mixer' not in path_parts:
        Logger.warning(f"Invalid DMX path (no 'mixer' keyword): {path_parts}")
        return

    # Skip the 'mixer' keyword itself; the remainder is the address.
    tail_start = path_parts.index('mixer') + 1
    dmx_cmd = '/' + '/'.join(path_parts[tail_start:])
    dmx_client = PLAYER_HANDLER.get_dmx_player_client()
    if not dmx_client:
        Logger.warning("DMX player client not available")
        return

    Logger.debug(f"Routing DMX: {dmx_cmd} = {value}")
    dmx_client.set_value(dmx_cmd, value)

set_nng_comms(hub_address, node_id)

Set the communications infrastructure

Source code in src/cuemsengine/cues/CueHandler.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def set_nng_comms(self, hub_address: str, node_id: str):
    """Create and start the NNG communications thread, then wait for it.

    The thread counts as ready once it is alive and its ``event_loop``
    attribute is set. Readiness is polled for up to ``max_wait`` seconds; a
    warning is logged if it is never reached, and the method returns either way.

    Args:
        hub_address: Address of the NNG hub, passed to ``NodeCommunications``.
        node_id: Identifier of this node, passed to ``NodeCommunications``.
    """
    from time import sleep

    Logger.info(f"Starting communications for Node {node_id}")
    Logger.info(f"NNG Hub address: {hub_address}")
    self.communications_thread = NodeCommunications(
        hub_address=hub_address,
        node_id=node_id,
    )
    self.communications_thread.start()

    # Wait for NNG thread to initialize (prevents race condition in nni_random)
    max_wait = 5.0  # seconds
    wait_interval = 0.1
    waited = 0.0
    ready = False
    while waited < max_wait:
        comms_thread = self.communications_thread
        if comms_thread.is_alive() and comms_thread.event_loop is not None:
            ready = True
            break
        sleep(wait_interval)
        waited += wait_interval

    if ready:
        Logger.info(f"NNG communications thread ready after {waited:.1f}s")
    else:
        Logger.warning(f"NNG communications thread not ready after {max_wait}s")

stop_all_cues()

Signal all armed cues to stop their playback loops.

Also bumps each cue's generation counter so that any still-running go_threaded threads will see a mismatch and skip post-loop cleanup (disarm), which would otherwise undo the re-arm that follows.

Source code in src/cuemsengine/cues/CueHandler.py
325
326
327
328
329
330
331
332
333
334
335
def stop_all_cues(self) -> None:
    """Flag every armed cue as stop-requested and advance its generation.

    Advancing ``_go_generation`` makes any go_threaded thread still running
    for that cue see a mismatch and skip its post-loop cleanup (disarm),
    which would otherwise undo the re-arm that follows.
    """
    with self._lock:
        for armed in self._armed_cues:
            armed._stop_requested = True
            # Cues that were never started count from generation 0.
            previous_generation = getattr(armed, '_go_generation', 0)
            armed._go_generation = previous_generation + 1

wait_for_cue(thread)

Waits for a cue to finish.

Source code in src/cuemsengine/cues/CueHandler.py
489
490
491
492
493
494
495
def wait_for_cue(self, thread: Thread) -> None:
    """Block until the given cue thread has finished.

    Args:
        thread: The thread to wait for (as returned by ``go()``).
    """
    Logger.info(f'Waiting for {thread.name} to finish')
    # join() blocks until the thread exits, so no is_alive() polling is
    # needed and the caller resumes as soon as the thread ends.
    thread.join()
    Logger.info(f"{thread.name} finished")

arm_cue(cue)

Type-specific logic when arming a cue

Source code in src/cuemsengine/cues/arm_cue.py
15
16
17
18
19
20
@singledispatch
def arm_cue(cue: Cue):
    """Fallback arming step for cue types without a registered handler.

    Does nothing; type-specific handlers are attached with
    ``arm_cue.register``.
    """
    return None

arm_dmxCue(cue)

Arm a DMX cue by extracting DMX scene data.

The DMX scene data is already loaded in the cue object from the script XML. We extract the universe and channel data from cue.DmxScene and store it in a format suitable for sending as OSC bundles to the local DMX player.

Note: cue._local should be set by check_mappings() based on the output_name. For DMX cues, the output_name format is "{node_uuid}" (just the node UUID). A DMX cue can have multiple outputs (one per target node). check_mappings() should iterate through all outputs and set _local=True if ANY output_name matches the current node UUID. Other outputs are ignored. This function is only called for local cues (checked in CueHandler.arm()).

Source code in src/cuemsengine/cues/arm_cue.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
@arm_cue.register
def arm_dmxCue(cue: DmxCue):
    """Prepare a DMX cue for playback by caching its scene data on the cue.

    The scene is already present on the cue (loaded from the script XML).
    This handler binds the local DMX player client to ``cue._osc`` and stores
    the scene as ``cue._dmx_frames`` in the shape
    ``{universe_num: {channel: value}}`` for later sending to the player.

    ``cue._local`` is expected to have been set by check_mappings() from the
    output_name (for DMX cues the output_name is just the node UUID; a cue may
    have one output per target node and is local if ANY of them matches this
    node). Arming is only meant to happen for local cues (checked in
    CueHandler.arm()); a non-local cue is logged and left untouched.
    """
    def caller_tag():
        # Fresh "extra" mapping for each log call, as in every log statement here.
        return {"caller": cue.__class__.__name__}

    # A missing _local attribute is treated as local.
    local_flag = getattr(cue, '_local', True)
    if not local_flag:
        Logger.warning(
            f'DMX cue {cue.id} is not local but arm_dmxCue was called. '
            f'This should not happen - check_mappings() should set _local from output_name.',
            extra=caller_tag()
        )
        return

    # Without a local DMX player there is nothing to arm against.
    player = PLAYER_HANDLER.get_dmx_player_client()
    if player is None:
        Logger.error(
            f'No local DMX player available for cue {cue.id}',
            extra=caller_tag()
        )
        return

    cue._osc = player
    Logger.debug(
        f"DMX cue {cue.id} will use local DMX player (output_name inferred _local={local_flag})",
        extra=caller_tag()
    )

    try:
        frames_by_universe = {}

        if cue.DmxScene is None:
            Logger.warning(
                f"DMX cue {cue.id} has no DmxScene data",
                extra=caller_tag()
            )
            cue._dmx_frames = {}
            return

        universe = cue.DmxScene.DmxUniverse
        if universe is not None:
            universe_id = universe.universe_num
            levels = {}

            channel_entries = universe.dmx_channels
            if channel_entries:
                levels = {entry.channel: entry.value for entry in channel_entries}

            # A universe without any channel data is not recorded at all.
            if levels:
                frames_by_universe[universe_id] = levels

        # Cached for use when the cue is run.
        cue._dmx_frames = frames_by_universe

        if not frames_by_universe:
            Logger.warning(
                f"DMX cue {cue.id} armed but no channel data found in DmxScene",
                extra=caller_tag()
            )
        else:
            channel_total = sum(len(per_universe) for per_universe in frames_by_universe.values())
            Logger.info(
                f"DMX cue {cue.id} armed: {len(frames_by_universe)} universe(s), {channel_total} channel(s)",
                extra=caller_tag()
            )

    except Exception as exc:
        Logger.error(
            f'Error arming DMX cue {cue.id}: {exc}',
            extra=caller_tag()
        )
        Logger.exception(exc)
        # Leave an empty frame set behind so running the cue does not fail.
        cue._dmx_frames = {}

loop_actionCue(cue, mtc)

Loop an ActionCue

Source code in src/cuemsengine/cues/loop_cue.py
41
42
43
44
45
46
@loop_cue.register
def loop_actionCue(cue: ActionCue, mtc: MtcListener):
    """Loop handler for ActionCue: intentionally a no-op.

    Both arguments are accepted to match the ``loop_cue`` dispatch signature
    and are not used.
    """
    return None

loop_audioCue(cue, mtc)

Handle the audio media playback loop.

This method manages the playback loop for audio media, including handling looping behavior and OSC communication for timing control.

Parameters:

Name Type Description Default
cue AudioCue

The audio cue whose playback loop is run.

required
mtc MtcListener

The MIDI Time Code interface.

required
Source code in src/cuemsengine/cues/loop_cue.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@loop_cue.register
def loop_audioCue(cue: AudioCue, mtc: MtcListener):
    """Handle the audio media playback loop.

    Polls every 20 ms until the MTC reaches ``cue._end_mtc``. While further
    repetitions remain and the cue is local, ``_start_mtc``/``_end_mtc`` are
    moved forward by one media duration and the matching ``/offset`` is sent
    to the player over OSC. A ``cue.loop`` value below 1 repeats until a stop
    is requested. When the loop ends normally, ``/mtcfollow`` is set to 0 for
    local cues.

    Args:
        cue: The audio cue being played. Setting ``cue._stop_requested``
            cancels the loop, which then returns without disabling mtcfollow.
        mtc: The MIDI Time Code interface.
    """
    Logger.info(f'Running audio cue loop {cue.id}, cue.loop={cue.loop} (type={type(cue.loop).__name__})')

    try:
        loop_counter = 0
        duration = CTimecode(cue.media.duration).return_in_other_framerate(mtc.main_tc.framerate)
        Logger.info(f'Audio duration: {duration}, _end_mtc: {cue._end_mtc.milliseconds_rounded}ms, current MTC: {mtc.main_tc.milliseconds_rounded}ms')

        while cue.loop < 1 or loop_counter < cue.loop:
            if cue._stop_requested:
                Logger.info(f'Audio loop {cue.id} cancelled by stop request')
                return
            Logger.info(f'Audio loop iteration starting: loop_counter={loop_counter}, cue.loop={cue.loop}')

            # Only consumed by the commented-out progress reporting below.
            last_status_update = 0.0
            while mtc.main_tc.milliseconds_rounded < cue._end_mtc.milliseconds_rounded:
                if cue._stop_requested:
                    Logger.info(f'Audio loop {cue.id} cancelled by stop request (inner)')
                    return
                sleep(0.02)
                # Future: uncomment to enable percentage progress updates.
                # Throttled to CUE_STATUS_UPDATE_HZ (Tier 1 / node-side).
                # _now = time.monotonic()
                # if _now - last_status_update >= 1.0 / CUE_STATUS_UPDATE_HZ:
                #     last_status_update = _now
                #     _elapsed = mtc.main_tc.milliseconds_rounded - cue._start_mtc.milliseconds_rounded
                #     _total = cue._end_mtc.milliseconds_rounded - cue._start_mtc.milliseconds_rounded
                #     if _total > 0:
                #         _pct = max(1, min(99, int(100 * _elapsed / _total)))
                #         CUE_HANDLER.communications_thread.update_cue(cue.id, _pct, timeout=0.1)

            Logger.info(f'Audio iteration {loop_counter + 1} finished (MTC={mtc.main_tc.milliseconds_rounded}ms reached _end_mtc={cue._end_mtc.milliseconds_rounded}ms)')
            loop_counter += 1

            will_loop_again = cue.loop < 1 or loop_counter < cue.loop
            Logger.info(f'After increment: loop_counter={loop_counter}, will_loop_again={will_loop_again}')

            # NOTE(review): when cue._local is falsy, _end_mtc is never advanced,
            # so any further iterations complete immediately without sleeping.
            # Confirm this loop is only reached for local cues.
            if cue._local and will_loop_again:
                # The next repetition starts exactly where the previous one ended.
                cue._start_mtc = CTimecode(framerate=cue._end_mtc.framerate, frames=cue._end_mtc.frames)
                cue._end_mtc = cue._start_mtc + duration

                # Drift-sensitive: use _exact (float) rather than _rounded (int)
                # to preserve sub-ms precision at NTSC framerates (29.97/23.976).
                offset_to_go = -cue._start_mtc.milliseconds_exact

                Logger.info(f'Loop {loop_counter}: setting offset={offset_to_go} (MTC={mtc.main_tc.milliseconds_rounded}ms, _start_mtc={cue._start_mtc.milliseconds_rounded}ms, _end_mtc={cue._end_mtc.milliseconds_rounded}ms)')

                try:
                    cue._osc.set_value('/offset', offset_to_go)
                    Logger.info(f"Audio offset sent: {offset_to_go}", extra={"caller": cue.__class__.__name__})
                except Exception as e:
                    Logger.error(f'Audio offset send failed: {e}', extra={"caller": cue.__class__.__name__})

        Logger.info(f'Audio loop FINISHED: loop_counter={loop_counter}, cue.loop={cue.loop}')
        if cue._local:
            try:
                cue._osc.set_value('/mtcfollow', 0)
                Logger.info(f"Audio mtcfollow disabled", extra={"caller": cue.__class__.__name__})
            except Exception as e:
                Logger.warning(f'Error disabling mtcfollow: {e}', extra={"caller": cue.__class__.__name__})

    except AttributeError as e:
        # Still non-fatal, but no longer silent: an AttributeError here ends
        # the loop early, so report it instead of hiding it.
        Logger.warning(f'Audio loop {getattr(cue, "id", "?")} aborted by AttributeError: {e}')

loop_cue(cue, mtc)

Loop a cue based on its type

Source code in src/cuemsengine/cues/loop_cue.py
27
28
29
30
31
32
@singledispatch
def loop_cue(cue: Cue, mtc: MtcListener):
    """Fallback loop handler for cue types without a registered handler.

    Type-specific loop behaviour is attached through ``loop_cue.register``;
    this base implementation deliberately does nothing.
    """
    return None

loop_cueList(cue, mtc)

Loop a CueList

Source code in src/cuemsengine/cues/loop_cue.py
34
35
36
37
38
39
@loop_cue.register
def loop_cueList(cue: CueList, mtc: MtcListener):
    """Loop handler for CueList: intentionally a no-op.

    Both arguments are accepted to match the ``loop_cue`` dispatch signature
    and are not used.
    """
    return None

loop_dmxCue(cue, mtc)

Handle the DMX cue duration wait.

DMX scenes are fire-and-forget (sent once in run_dmxCue), so we only wait for the cue duration to elapse to maintain proper script timing. The cue._local guard is maintained for potential future looping implementation.

Parameters:

Name Type Description Default
cue DmxCue

The DmxCue

required
mtc MtcListener

The MIDI Time Code interface

required
Source code in src/cuemsengine/cues/loop_cue.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
@loop_cue.register
def loop_dmxCue(cue: DmxCue, mtc: MtcListener):
    """Handle the DMX cue duration wait.

    DMX scenes are fire-and-forget (sent once in run_dmxCue), so this only
    polls every 20 ms until the MTC reaches ``cue._end_mtc``, to maintain
    proper script timing. Setting ``cue._stop_requested`` cancels the wait.
    The cue._local guard is maintained for potential future looping
    implementation and currently does nothing.

    Args:
        cue: The DmxCue
        mtc: The MIDI Time Code interface
    """
    try:
        # Only consumed by the commented-out progress reporting below.
        last_status_update = 0.0
        while mtc.main_tc.milliseconds_rounded < cue._end_mtc.milliseconds_rounded:
            if cue._stop_requested:
                Logger.info(f'DMX loop {cue.id} cancelled by stop request')
                return
            sleep(0.02)
            # Future: uncomment to enable percentage progress updates.
            # Throttled to CUE_STATUS_UPDATE_HZ (Tier 1 / node-side).
            # _now = time.monotonic()
            # if _now - last_status_update >= 1.0 / CUE_STATUS_UPDATE_HZ:
            #     last_status_update = _now
            #     _elapsed = mtc.main_tc.milliseconds_rounded - cue._start_mtc.milliseconds_rounded
            #     _total = cue._end_mtc.milliseconds_rounded - cue._start_mtc.milliseconds_rounded
            #     if _total > 0:
            #         _pct = max(1, min(99, int(100 * _elapsed / _total)))
            #         CUE_HANDLER.communications_thread.update_cue(cue.id, _pct, timeout=0.1)

        if cue._local:
            pass

        Logger.debug(f'DMX cue {cue.id} duration elapsed')

    except AttributeError as e:
        # Still non-fatal, but no longer silent: an AttributeError here ends
        # the wait early, so report it instead of hiding it.
        Logger.warning(f'DMX loop {getattr(cue, "id", "?")} aborted by AttributeError: {e}')

loop_fadeCue(cue, mtc)

Hold a FadeCue in the cue runner for its full duration.

The actual fade is driven by gradient-motiond over OSC; this loop simply blocks until the FadeCue's _end_mtc is reached so general cue lifecycle (auto-disarm of the FadeCue itself in go_threaded's end-of-cue path) only fires after the fade has elapsed. _start_mtc / _end_mtc are set by ActionHandler._handle_fade_action at dispatch time.

Source code in src/cuemsengine/cues/loop_cue.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@loop_cue.register
def loop_fadeCue(cue: FadeCue, mtc: MtcListener):
    """Keep a FadeCue occupying the cue runner until its fade has elapsed.

    The fade itself is driven by gradient-motiond over OSC. This function
    only blocks, polling every 20 ms, until the MTC reaches the cue's
    ``_end_mtc``, so the general cue lifecycle (auto-disarm of the FadeCue in
    go_threaded's end-of-cue path) fires only after the fade is done.
    ``_start_mtc`` / ``_end_mtc`` are set by
    ActionHandler._handle_fade_action at dispatch time.

    Returns immediately with a warning when the cue has no ``_end_mtc``, and
    returns early when ``_stop_requested`` is set on the cue.
    """
    deadline = getattr(cue, '_end_mtc', None)
    if deadline is None:
        Logger.warning(f'FadeCue {cue.id} has no _end_mtc; loop_fadeCue exiting immediately')
        return

    while True:
        still_fading = mtc.main_tc.milliseconds_rounded < deadline.milliseconds_rounded
        if not still_fading:
            return
        if getattr(cue, '_stop_requested', False):
            Logger.info(f'FadeCue {cue.id} loop cancelled by stop request')
            return
        sleep(0.02)

loop_videoCue(cue, mtc)

Handle the video media playback loop.

Manages looping behavior for all layers in cue._layer_ids, updating offset via the single VideoClient in cue._osc.

Source code in src/cuemsengine/cues/loop_cue.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
@loop_cue.register
def loop_videoCue(cue: VideoCue, mtc: MtcListener):
    """Handle the video media playback loop.

    Manages looping behavior for all layers in cue._layer_ids,
    updating offset via the single VideoClient in cue._osc.

    Each layer first gets its ``/loop`` flag set to 1. The function then
    polls every 20 ms until the MTC reaches ``cue._end_mtc``. While further
    repetitions remain and the cue is local, ``_start_mtc``/``_end_mtc`` are
    moved forward by one media duration and the new frame offset is sent to
    every layer. A ``cue.loop`` value below 1 repeats until a stop is
    requested.

    Args:
        cue: The video cue being played. Setting ``cue._stop_requested``
            cancels the loop.
        mtc: The MIDI Time Code interface.
    """
    Logger.info(f'Running video cue loop {cue.id}, cue.loop={cue.loop} (type={type(cue.loop).__name__})')

    try:
        loop_counter = 0
        duration = CTimecode(cue.media.duration).return_in_other_framerate(mtc.main_tc.framerate)
        Logger.info(f'Video duration: {duration}, duration in frames: {duration.frame_number} {duration.framerate}')
        Logger.info(f'Initial _end_mtc: {cue._end_mtc.milliseconds_rounded}ms, current MTC: {mtc.main_tc.milliseconds_rounded}ms')

        layer_ids = getattr(cue, '_layer_ids', [])

        # Tell the videocomposer this is a looping cue so it wraps frames at the
        # loop boundary (instead of clamping to the last frame).
        # NOTE(review): this is sent regardless of cue.loop, i.e. also for a
        # cue that plays only once. Confirm that is intended.
        for layer_id in layer_ids:
            try:
                cue._osc.set_value(f'/videocomposer/layer/{layer_id}/loop', 1)
            except Exception as e:
                Logger.error(f'Loop enable failed for layer {layer_id}: {e}')

        while cue.loop < 1 or loop_counter < cue.loop:
            if cue._stop_requested:
                Logger.info(f'Video loop {cue.id} cancelled by stop request')
                return
            # Only consumed by the commented-out progress reporting below.
            last_status_update = 0.0
            while mtc.main_tc.milliseconds_rounded < cue._end_mtc.milliseconds_rounded:
                if cue._stop_requested:
                    Logger.info(f'Video loop {cue.id} cancelled by stop request (inner)')
                    return
                sleep(0.02)
                # Future: uncomment to enable percentage progress updates.
                # Throttled to CUE_STATUS_UPDATE_HZ (Tier 1 / node-side).
                # _now = time.monotonic()
                # if _now - last_status_update >= 1.0 / CUE_STATUS_UPDATE_HZ:
                #     last_status_update = _now
                #     _elapsed = mtc.main_tc.milliseconds_rounded - cue._start_mtc.milliseconds_rounded
                #     _total = cue._end_mtc.milliseconds_rounded - cue._start_mtc.milliseconds_rounded
                #     if _total > 0:
                #         _pct = max(1, min(99, int(100 * _elapsed / _total)))
                #         CUE_HANDLER.communications_thread.update_cue(cue.id, _pct, timeout=0.1)

            Logger.info(f'Video iteration {loop_counter + 1} finished (MTC={mtc.main_tc.milliseconds_rounded}ms reached _end_mtc={cue._end_mtc.milliseconds_rounded}ms)')
            loop_counter += 1

            will_loop_again = cue.loop < 1 or loop_counter < cue.loop

            if cue._local and will_loop_again:
                # The next repetition starts exactly where the previous one ended.
                cue._start_mtc = CTimecode(framerate=cue._end_mtc.framerate, frames=cue._end_mtc.frames)
                cue._end_mtc = cue._start_mtc + duration
                offset_change_frames = -cue._start_mtc.frame_number

                Logger.info(f'Loop {loop_counter}: setting offset={offset_change_frames}')

                for layer_id in layer_ids:
                    try:
                        cue._osc.set_value(f'/videocomposer/layer/{layer_id}/offset', int(offset_change_frames))
                    except Exception as e:
                        Logger.error(f'Offset send failed for layer {layer_id}: {e}')

        Logger.info(f'Loop FINISHED: loop_counter={loop_counter}, cue.loop={cue.loop}')

    except AttributeError as e:
        # Still non-fatal, but no longer silent: an AttributeError here ends
        # the loop early, so report it instead of hiding it.
        Logger.warning(f'Video loop {getattr(cue, "id", "?")} aborted by AttributeError: {e}')

run_actionCue(cue, mtc, frozen_mtc_ms=None)

Run an ActionCue by delegating to ActionHandler.execute_action.

Forwards frozen_mtc_ms so a chained 'play' action triggered inside a post_go='go' chain preserves the chain's MTC snapshot — without it, ActionCue-mediated chains capture live MTC inside CueHandler.go and drift relative to the chain's other cues.

Source code in src/cuemsengine/cues/run_cue.py
38
39
40
41
42
43
44
45
46
47
48
49
@run_cue.register
def run_actionCue(cue: ActionCue, mtc: MtcListener, frozen_mtc_ms: float = None):
    """Execute an ActionCue through ``ACTION_HANDLER.execute_action``.

    ``frozen_mtc_ms`` is passed along unchanged so that a chained 'play'
    action triggered inside a post_go='go' chain keeps the chain's MTC
    snapshot. Without it, ActionCue-mediated chains would capture live MTC
    inside CueHandler.go and drift relative to the chain's other cues.
    """
    # Resolved at call time rather than at module import
    # (presumably to avoid an import cycle - TODO confirm).
    from .ActionHandler import ACTION_HANDLER as action_handler

    action_handler.execute_action(cue, mtc, frozen_mtc_ms)

run_audioCue(cue, mtc, frozen_mtc_ms=None)

Run an AudioCue

Parameters:

Name Type Description Default
cue AudioCue

The audio cue to run

required
mtc

The MTC listener (for framerate info)

required
frozen_mtc_ms float

Optional frozen MTC timestamp for perfect sync with chained cues

None
Source code in src/cuemsengine/cues/run_cue.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@run_cue.register
def run_audioCue(cue: AudioCue, mtc, frozen_mtc_ms: float = None):
    """
    Run an AudioCue

    Steps, in order:
      1. Set ``cue._start_mtc`` (from the frozen snapshot if given, otherwise
         from the live MTC frame count) and ``cue._end_mtc`` (start + media
         duration converted to the MTC framerate).
      2. Check the player's JACK wiring against the cue's outputs and repair
         it only if it is not already correct. If the player's first output
         port does not exist, log an error and return without sending any
         OSC message.
      3. Send ``/offset``, enable ``/mtcfollow`` and, when the cue defines
         ``master_vol``, send ``/volmaster``. Each of these is best-effort:
         a failure is logged as a warning and the next step still runs.

    Args:
        cue: The audio cue to run
        mtc: The MTC listener (its main_tc supplies framerate and, on the
            live path, the current frame count)
        frozen_mtc_ms: Optional frozen MTC timestamp for perfect sync with chained cues
    """
    # CRITICAL FOR SYNC: Use frozen timestamp if provided (for post_go='go' chains)
    # Otherwise read live MTC. This ensures audio and video cues share the same reference.
    if frozen_mtc_ms is not None:
        mtc_ms = frozen_mtc_ms
        Logger.debug(f'AudioCue {cue.id} using frozen MTC: {mtc_ms}ms')
        # Frozen path: only have a float ms snapshot (CueHandler captured it
        # before this point); reconstruct via canonicalized __init__ which
        # routes through HMSF + tc_to_frames, drop-frame correct.
        cue._start_mtc = CTimecode(framerate=mtc.main_tc.framerate, start_seconds=mtc_ms/1000)
    else:
        # Live MTC path: frame-domain construction skips the lossy
        # ms→seconds→frames round-trip entirely (mirrors loop_cue.py:107,224).
        cue._start_mtc = CTimecode(framerate=mtc.main_tc.framerate, frames=mtc.main_tc.frames)

    # Convert duration to MTC framerate to prevent drift when looping
    duration = CTimecode(cue.media.duration).return_in_other_framerate(mtc.main_tc.framerate)
    cue._end_mtc = cue._start_mtc + duration

    # Audio player formula: file_position = MTC + offset
    # To play from position 0 when MTC = start_mtc, we need offset = -start_mtc
    offset_to_go = -cue._start_mtc.milliseconds_exact

    # Verify mixer graph; only repair if drifted. Arm already wired it; the
    # unconditional reconnect at GO costs ~21-28 ms (measured) without
    # touching the audio path.
    try:
        mixer = PLAYER_HANDLER.get_audio_mixer()
        if mixer:
            # The JACK client name uses the cue id with its dashes removed.
            uuid_slug = ''.join(str(cue.id).split('-'))
            # Actual JACK client name is Audio_Player-{uuid} with ports "outport 0", "outport 1"
            player_name = f'Audio_Player-{uuid_slug}'

            # Resolve JACK port names from cue output IDs via audio output lookup
            selected_outputs = []
            if hasattr(cue, 'outputs') and cue.outputs:
                for output in cue.outputs:
                    output_name = output.get('output_name', '')
                    # NOTE(review): 37 presumably skips a 36-character node UUID
                    # plus one separator character - confirm against the
                    # output_name format. Shorter names are ignored.
                    if len(output_name) > 37:
                        output_id = output_name[37:]
                        port_name = PLAYER_HANDLER.resolve_audio_port(output_id)
                        if port_name:
                            selected_outputs.append(port_name)
                        else:
                            # No port resolved: fall back to the raw output id.
                            selected_outputs.append(output_id)

            Logger.debug(f"Audio cue {cue.id} selected outputs: {selected_outputs}")

            # If the player's outport 0 is missing, the subprocess died between
            # arm and GO. connect_player_to_outputs would block 15 s in its
            # port-wait loop before failing; abort fast instead.
            channel_0 = f'{player_name}:outport 0'
            if not mixer.conn_man.port_exists(channel_0):
                Logger.error(
                    f"Audio cue {cue.id}: player JACK ports missing at GO "
                    f"({channel_0}); subprocess likely crashed between arm "
                    f"and GO. Aborting cue."
                )
                # Leaves the function entirely: no offset/mtcfollow/volume is sent.
                return

            if mixer.player_connections_correct(
                player_name=player_name,
                player_output_prefix='outport',
                selected_outputs=selected_outputs,
            ):
                Logger.debug(f"Audio cue {cue.id}: graph already wired, skipping connect")
            else:
                Logger.warning(
                    f"Audio cue {cue.id}: graph not wired correctly at GO; "
                    f"repairing via connect_player_to_outputs"
                )
                mixer.connect_player_to_outputs(
                    player_name=player_name,
                    player_output_prefix='outport',
                    selected_outputs=selected_outputs,
                )
    except Exception as e:
        # Best-effort: a mixer problem is logged and playback setup continues.
        Logger.warning(f"Could not validate/connect player to mixer: {e}")

    # Define the offset - use MTC framerate for consistent timing with video
    try:
        key = '/offset'

        cue._osc.set_value(key, offset_to_go)
        # The log reads the value back from the node after setting it.
        Logger.info(
            f"offset {offset_to_go} to {key}: {str(cue._osc.get_node(key).parameter.value)}",
            extra = {"caller": cue.__class__.__name__}
        )
    except Exception as e:
        Logger.warning(
            f'Error setting offset in run_audioCue: {e}',
            extra = {"caller": cue.__class__.__name__}
        )

    # Connect to mtc signal
    try:
        key = '/mtcfollow'
        cue._osc.set_value(key, 1)
    except Exception as e:
        Logger.warning(
            f'Error setting mtcfollow in run_audioCue: {e}',
            extra = {"caller": cue.__class__.__name__}
        )

    # Apply master volume from cue settings
    try:
        master_vol = getattr(cue, 'master_vol', None)
        if master_vol is not None:
            # UI uses 0-100 percentage, audioplayer expects 0.0-1.0 gain
            # Convert and clamp to valid range
            vol_value = max(0.0, min(1.0, float(master_vol) / 100.0))
            cue._osc.set_value('/volmaster', vol_value)
            Logger.info(
                f"master_vol {master_vol}% -> {vol_value} set on audio cue {cue.id}",
                extra = {"caller": cue.__class__.__name__}
            )
    except Exception as e:
        Logger.warning(
            f'Error setting master volume in run_audioCue: {e}',
            extra = {"caller": cue.__class__.__name__}
        )

run_cue(cue, mtc, frozen_mtc_ms=None)

Run a cue based on its type.

Parameters:

Name Type Description Default
cue Cue

The cue to run

required
mtc MtcListener

The MTC listener (for framerate info)

required
frozen_mtc_ms float

Optional frozen MTC timestamp in milliseconds. When provided (e.g., for chained cues with post_go='go'), this timestamp is used instead of reading live MTC. This ensures perfect sync between audio and video cues.

None
Source code in src/cuemsengine/cues/run_cue.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@singledispatch
def run_cue(cue: Cue, mtc: MtcListener, frozen_mtc_ms: float = None):
    """Fallback run handler for cue types without a registered handler.

    Type-specific behaviour is attached through ``run_cue.register``; this
    base implementation deliberately does nothing.

    Args:
        cue: The cue to run
        mtc: The MTC listener (for framerate info)
        frozen_mtc_ms: Optional frozen MTC timestamp in milliseconds.
                       When given (e.g., for chained cues with post_go='go'),
                       handlers use it instead of reading live MTC, so that
                       audio and video cues share the same reference.
    """
    return None

run_cueList(cue, mtc, frozen_mtc_ms=None)

Run a CueList by dispatching its first enabled child.

Source code in src/cuemsengine/cues/run_cue.py
30
31
32
33
34
35
36
@run_cue.register
def run_cueList(cue: CueList, mtc: MtcListener, frozen_mtc_ms: float = None):
    """Run a CueList by handing its first enabled child to ``run_cue``.

    Nothing happens when the list has no contents, when no child is enabled,
    or when the first enabled child is itself falsy.
    """
    for child in cue.contents or ():
        if not child.enabled:
            continue
        # Only the first enabled child is considered, and it must be truthy.
        if child:
            run_cue(child, mtc, frozen_mtc_ms)
        return

run_dmxCue(cue, mtc, frozen_mtc_ms=None)

Run a DmxCue

Sends the DMX scene bundle directly to the local DMX player. Synchronized with MTC. The scene contains frame data, timing, and fade info. DMX cues have no media duration - duration is inferred from fade times. Only fadein_time is used for now. fade_out defaults to 0.

Parameters:

Name Type Description Default
cue DmxCue

The DMX cue to run

required
mtc

The MTC listener (for framerate info)

required
frozen_mtc_ms float

Optional frozen MTC timestamp for perfect sync with chained cues

None
Source code in src/cuemsengine/cues/run_cue.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
@run_cue.register
def run_dmxCue(cue: DmxCue, mtc, frozen_mtc_ms: float = None):
    """
    Run a DmxCue

    Pushes the cue's cached DMX frames to the local DMX player as one scene
    bundle, stamped with the cue's absolute MTC start time. A DMX cue has no
    media, so its duration is taken from its fade times
    (fadein_time + fadeout_time, each defaulting to 0 when absent). Only the
    fade-in time is passed to the player for now.

    ``cue._start_mtc`` and ``cue._end_mtc`` are set before anything is sent.
    The cue is skipped with a log entry when it has no frame data or no OSC
    client. Any exception is logged and not re-raised.

    Args:
        cue: The DMX cue to run
        mtc: The MTC listener (for framerate info)
        frozen_mtc_ms: Optional frozen MTC timestamp for perfect sync with chained cues
    """
    def caller_tag():
        # Fresh "extra" mapping for each log call.
        return {"caller": cue.__class__.__name__}

    try:
        if frozen_mtc_ms is None:
            # Live MTC path: frame-domain construction (no round-trip loss).
            cue._start_mtc = CTimecode(framerate=mtc.main_tc.framerate, frames=mtc.main_tc.frames)
        else:
            # CRITICAL FOR SYNC: chained cues (post_go='go') share one frozen
            # snapshot. Only a float ms value is available here, so the
            # timecode is rebuilt from seconds.
            snapshot_ms = frozen_mtc_ms
            Logger.debug(f'DmxCue {cue.id} using frozen MTC: {snapshot_ms}ms')
            cue._start_mtc = CTimecode(framerate=mtc.main_tc.framerate, start_seconds=snapshot_ms/1000)

        # Duration = fade-in + fade-out, both in milliseconds.
        fade_in_ms = getattr(cue, 'fadein_time', 0)
        fade_out_ms = getattr(cue, 'fadeout_time', 0)
        total_ms = fade_in_ms + fade_out_ms

        span = CTimecode(framerate=mtc.main_tc.framerate, start_seconds=total_ms / 1000.0)
        cue._end_mtc = cue._start_mtc + span

        # The DMX player expects mtc_time as an absolute "0:0:S.sss" string so
        # it can schedule m_mtcStart = max(playHead, time).
        start_ms = cue._start_mtc.milliseconds_exact
        mtc_stamp = f"0:0:{start_ms / 1000.0}"

        frames_by_universe = getattr(cue, '_dmx_frames', {})
        if not frames_by_universe:
            Logger.warning(
                f"DMX cue {cue.id} has no frame data to send",
                extra=caller_tag()
            )
            return

        # The player takes the fade in seconds (only fade-in is used for now).
        fade_seconds = fade_in_ms / 1000.0

        if cue._osc is None:
            Logger.error(
                f"DMX cue {cue.id} has no OSC client available",
                extra=caller_tag()
            )
            return

        # With MTC following on, the dmxplayer tracks timecode and stops
        # advancing when MTC stops (e.g. on STOP command).
        cue._osc.enable_mtcfollow()

        # mtc_time is absolute, so scenes neither overlap nor get lost.
        cue._osc.send_dmx_scene(
            universe_frames=frames_by_universe,
            mtc_time=mtc_stamp,
            fade_time=fade_seconds
        )

        Logger.info(
            f"DMX scene sent to local player for cue {cue.id}: "
            f"mtc_time={mtc_stamp} ({start_ms}ms), universes={len(frames_by_universe)}, fade={fade_seconds}s",
            extra=caller_tag()
        )

    except Exception as exc:
        Logger.error(
            f'Error running DMX cue {cue.id}: {exc}',
            extra=caller_tag()
        )
        Logger.exception(exc)

run_videoCue(cue, mtc, frozen_mtc_ms=None)

Run a VideoCue.

Sends offset/visible/mtcfollow to all layers in cue._layer_ids via the single VideoClient in cue._osc.

Source code in src/cuemsengine/cues/run_cue.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
@run_cue.register
def run_videoCue(cue: VideoCue, mtc, frozen_mtc_ms: float = None):
    """Start a VideoCue on each of its videocomposer layers.

    The computed start and end timecodes are stored on ``cue._start_mtc`` and
    ``cue._end_mtc``. Every layer listed in ``cue._layer_ids`` then receives,
    through the client held in ``cue._osc``: its position (and scale, when it
    is not 1.0) if an output name exists at the same index, followed by
    ``offset``, ``mtcfollow`` and ``visible`` in that order.

    Args:
        cue: The VideoCue to run.
        mtc: MTC source; ``mtc.main_tc`` supplies the framerate and, on the
            live path, the current frame count.
        frozen_mtc_ms: Optional MTC snapshot in milliseconds. When given, it
            is used as the start time instead of the live ``mtc.main_tc``.

    Returns:
        None. Logs an error and returns early when the cue has no layers or
        no OSC client.
    """
    Logger.info(f'Running video cue {cue.id}')

    layer_ids = getattr(cue, '_layer_ids', [])
    # Guard clause: both a non-empty layer list and an OSC client are required.
    if not (layer_ids and cue._osc is not None):
        Logger.error(f'Video cue {cue.id} has no layers or no OSC client')
        return

    if frozen_mtc_ms is None:
        # Live path: build the start timecode straight from the frame count,
        # so there is no seconds <-> frames round-trip loss.
        cue._start_mtc = CTimecode(framerate=mtc.main_tc.framerate, frames=mtc.main_tc.frames)
    else:
        # Frozen path: start from the millisecond snapshot handed in by the caller.
        mtc_ms = frozen_mtc_ms
        Logger.debug(f'VideoCue {cue.id} using frozen MTC: {mtc_ms}ms')
        cue._start_mtc = CTimecode(framerate=mtc.main_tc.framerate, start_seconds=mtc_ms/1000)

    media_length = CTimecode(cue.media.duration).return_in_other_framerate(mtc.main_tc.framerate)
    cue._end_mtc = cue._start_mtc + media_length
    offset_to_go = -cue._start_mtc.frame_number

    video_client = cue._osc

    # The layer may not have been ready when its position was set during arm,
    # so placement is pushed again here, before the layer becomes visible.
    configured_outputs = PLAYER_HANDLER.get_all_cue_output_names(cue)

    def _reapply_placement(layer_id, layer_path, output_name):
        """Push position (and non-unit scale) from the output config to one layer."""
        try:
            target = PLAYER_HANDLER.resolve_video_output_for_cue(cue, output_name)
            pos_x, pos_y = target.get_layer_placement()
            video_client.set_value(f'{layer_path}/position', [pos_x, pos_y])
            scale_x, scale_y = target.get_layer_scale()
            # Scale is only sent when it differs from the 1.0 identity.
            if not (scale_x == 1.0 and scale_y == 1.0):
                video_client.set_value(f'{layer_path}/scale', [scale_x, scale_y])
        except (KeyError, RuntimeError, ValueError) as e:
            Logger.warning(f'Could not re-apply position for layer {layer_id}: {e}')
        except Exception:
            # Best effort: a placement failure must not stop the cue from starting.
            Logger.exception(f'Unexpected error re-applying position for layer {layer_id} (output "{output_name}")')

    for slot, layer_id in enumerate(layer_ids):
        layer_path = f'/videocomposer/layer/{layer_id}'

        # Layers beyond the list of output names get no placement update.
        if slot < len(configured_outputs):
            _reapply_placement(layer_id, layer_path, configured_outputs[slot])

        video_client.set_value(f'{layer_path}/offset', int(offset_to_go))
        # mtcfollow goes out before visible so the videocomposer can load the
        # correct frame (offset + MTC position) while the layer is still
        # hidden, which avoids rendering a stale frame.
        video_client.set_value(f'{layer_path}/mtcfollow', 1)
        video_client.set_value(f'{layer_path}/visible', 1)

    Logger.info(f"Video cue {cue.id} running: {len(layer_ids)} layer(s), offset={offset_to_go}")