Skip to content

Tools

CuemsDeploy: async rsync-based media and project deployer for CUEMS nodes.

Async model: _sync(), _check_mandatory_sources(), _kill(), and _pump() are coroutines scheduled on the event loop injected via NodeEngine.start() late-bind. sync_files() remains a synchronous blocking API; it submits _deploy_all_async() via run_coroutine_threadsafe() and blocks until completion. Watchdogs inside the coroutine bound all wait times.

Late-bind protocol

NodeEngine.init() creates CuemsDeploy(loop=None). NodeEngine.start() calls CUE_HANDLER.set_nng_comms(...), which starts AsyncCommsThread (creates the event_loop), then: self.deploy_manager.loop = CUE_HANDLER.communications_thread.event_loop Any sync_files() before that bind returns False immediately.

Why _avahi_resolve stays synchronous: it is called from init() before any asyncio loop exists, so subprocess.run() (short timeout) is correct.

CuemsDeploy

Source code in src/cuemsengine/tools/CuemsDeploy.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
class CuemsDeploy():
    _RSYNC_PASSWORD: ClassVar[str] = 'f48t5eL2kLHw2Wfw'

    def __init__(
            self,
            library_path = '/opt/cuems_library/',
            tmp_path = '/tmp/cuems_library/',
            controller_ip: str | None = None,
            hostname: str | None = None,
            log_file: str = '/run/cuems/rsync.log',
            on_progress: Callable[[dict], None] | None = None,
            loop: asyncio.AbstractEventLoop | None = None,
        ):
        """Construct a deploy manager.

        Args:
            controller_ip: IP of the controller's rsync daemon (preferred).
                Pass BaseEngine.controller_ip. If falsy, manager runs disabled.
            hostname: Legacy fallback resolved via avahi when controller_ip is
                not provided. Kept for backwards compatibility.
            log_file: Where rsync writes its log.
            on_progress: Optional callback fired for each rsync progress update
                (parsed from --info=progress2). Receives a dict with keys
                bytes, pct, rate, eta, and optionally xfr, remaining, total.
                Must be non-blocking — invoked from the asyncio loop.
            loop: The asyncio event loop for run_coroutine_threadsafe(). Defaults
                to None; late-bind via NodeEngine.start().
        """
        self.library_path = library_path
        self.tmp_path = tmp_path
        self.log_file = log_file
        self.errors = []
        self.encoding = sys.getfilesystemencoding()
        self._on_progress = on_progress or (lambda parsed: None)
        self.loop = loop

        # TODO: rebuild on network_map reload to pick up IP changes without restarting.
        if controller_ip:
            self.main_ip = controller_ip
        elif hostname:
            self.main_ip = self._avahi_resolve(hostname)
        else:
            self.main_ip = None

        if self.main_ip:
            self.address = f'rsync://cuems_library_rsync@{self.main_ip}/cuems'
            self.enabled = True
        else:
            self.address = None
            self.enabled = False
            Logger.warning(
                f'CuemsDeploy disabled: no valid controller IP '
                f'(controller_ip={controller_ip!r}, hostname={hostname!r}, '
                f'resolved={self.main_ip!r}). Project deploys will be skipped.'
            )

    def sync_files(self, project: str, tag: str, file_names: list[str] | None = None) -> bool:
        """Sync files from the controller to the node.

        Submits _deploy_all_async() to self.loop via run_coroutine_threadsafe()
        and blocks until the coroutine completes. Watchdogs inside the coroutine
        handle all time bounds; no external timeout is needed here.

        Args:
            project: Project identifier used to build paths and log file names.
            tag: Transfer type — ``'project'`` or ``'media'``. Controls which
                mandatory-path precheck runs and which default file list is used
                when file_names is empty.
            file_names: Explicit list of rsync-relative paths to transfer. When
                omitted (or empty) and tag is ``'project'``, defaults to the
                standard project file set (script, mappings, settings).

        Returns:
            True on success; False if disabled, loop unbound, precheck failed,
            rsync exited non-zero, or any unexpected exception occurred. On
            failure, self.errors contains one or more diagnostic strings.
        """
        if not self.enabled:
            Logger.error(
                f'CuemsDeploy is disabled (no controller IP) — '
                f'skipping {tag} sync for project {project!r}'
            )
            return False

        if self.loop is None:
            Logger.error(
                f'CuemsDeploy event loop not bound (NodeEngine.start() not '
                f'called yet) — skipping {tag} sync for project {project!r}'
            )
            self.errors = ['event loop not bound']
            return False

        file_names = list(file_names or [])
        if tag == 'project' and len(file_names) == 0:
            file_names = self._project_files(project)
        elif tag == 'media' and len(file_names) > 0:
            file_names = self._media_files(file_names)

        mandatory_paths = self._mandatory_paths(project, tag)
        log_file = self._deploy_log_path(project, tag)

        try:
            coro = self._deploy_all_async(log_file, file_names, mandatory_paths)
            synced = asyncio.run_coroutine_threadsafe(coro, self.loop).result()
        except Exception as e:
            Logger.error(f'Unexpected error during deploy of {project!r}: {e}')
            self.errors = [str(e)]
            return False

        if synced:
            self._reset_deploy_log(log_file)
            self.errors = []
        else:
            Logger.error(
                f'Failed to sync {tag} files for project {project!r} '
                f'from {self.address} (log: {log_file})'
            )
            for error in self.errors:
                Logger.error(error)
        return synced

    def _mandatory_paths(self, project: str, tag: str) -> list[str]:
        if tag != 'project':
            return []
        return [f'/projects/{project}/script.xml']

    async def _check_mandatory_sources(self, mandatory_paths: list[str]) -> tuple[bool, list[str]]:
        """Verify mandatory remote paths exist before bulk transfer.

        Uses one rsync --list-only probe with all mandatory paths. Output is
        consumed via proc.communicate() — the probe is short-lived and has
        bounded output, so streaming is unnecessary (proc.communicate()
        rationale: see research.md §2).
        """
        mandatory_paths = [p.strip() for p in mandatory_paths if p.strip()]
        if not mandatory_paths:
            return True, []

        env = dict(os.environ, RSYNC_PASSWORD=self._RSYNC_PASSWORD)
        with tempfile.NamedTemporaryFile(
            mode='w',
            encoding=self.encoding,
            delete=False,
            prefix='rsync_mandatory_',
            suffix='.lst',
        ) as probe_list:
            for source_path in mandatory_paths:
                probe_list.write(f'{source_path}\n')
            probe_list_path = probe_list.name

        try:
            proc = await asyncio.create_subprocess_exec(
                'rsync',
                '-r',
                '--list-only',
                '--contimeout=2',
                '--timeout=5',
                f'--files-from={probe_list_path}',
                self.address,
                self.library_path,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                env=env,
            )
            _, stderr_bytes = await proc.communicate()
        finally:
            try:
                os.remove(probe_list_path)
            except OSError:
                pass

        if proc.returncode == 0:
            return True, []

        stderr = stderr_bytes.decode(errors='replace').strip()
        missing = []
        for source_path in mandatory_paths:
            if source_path in stderr and (
                'No such file or directory' in stderr
                or 'link_stat' in stderr
                or 'failed to stat' in stderr
            ):
                missing.append(source_path)

        if missing:
            return False, missing

        self.errors = [
            f'rsync mandatory precheck failed: '
            f'{stderr or f"exit code {proc.returncode}"}'
        ]
        return False, []

    def _avahi_resolve(self, hostname: str) -> str | None:
        """Resolve a hostname via avahi-resolve-host-name.

        Stays synchronous: called from __init__() before any asyncio loop
        exists. Returns the IP string on success, or None on failure.
        """
        import subprocess
        try:
            result = subprocess.run(
                ['avahi-resolve-host-name', '-n', hostname],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=5,
            )
            result.check_returncode()
            ip = result.stdout.decode(self.encoding).replace(hostname, '').strip()
            return ip if ip else None
        except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
            return None

    async def _pump(self, stream: asyncio.StreamReader, tag: str, queue: asyncio.Queue) -> None:
        """Read 4096-byte chunks until EOF; push (tag, chunk) then (tag, None).

        (tag, None) is the EOF sentinel consumed by _sync's driver loop.
        """
        while True:
            chunk = await stream.read(4096)
            if not chunk:
                await queue.put((tag, None))
                return
            await queue.put((tag, chunk))

    async def _sync(self, path: str) -> bool:
        """Async rsync transfer with two concurrent reader tasks and watchdogs.

        Watchdog state machine (data-model.md):
          STARTUP → empty asyncio.wait result before first queue item → KILLED
          ACTIVE  → empty asyncio.wait result after receiving data    → KILLED
          DONE    → both pipes closed, proc exits rc=0                → True
          DONE    → both pipes closed, proc exits rc≠0                → False

        Queue pattern: two _pump tasks push to a shared asyncio.Queue; the
        main driver loop drains it after each asyncio.wait() call. The
        watchdog deadline resets on every received chunk.
        """
        try:
            os.makedirs(os.path.dirname(self.log_file), exist_ok=True)
        except OSError as e:
            Logger.warning(f'Could not create rsync log directory: {e}')

        # -t: rsync shows >f..T...... without it; also breaks .idx mtime cache (2026-05-19).
        cmd = [
            'rsync', '-rt',
            '--delete',
            '--delete-delay',
            '--info=progress2,name0',
            '--stats',
            '--contimeout=2',
            '--timeout=5',
            '--ignore-missing-args',
            f'--files-from={path}',
            f'--log-file={self.log_file}',
            self.address,
            self.library_path,
        ]
        env = dict(os.environ, RSYNC_PASSWORD=self._RSYNC_PASSWORD)

        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
        )

        queue: asyncio.Queue = asyncio.Queue()
        t_out = asyncio.create_task(self._pump(proc.stdout, 'out', queue))
        t_err = asyncio.create_task(self._pump(proc.stderr, 'err', queue))

        bufs = {'out': '', 'err': ''}
        stderr_lines: list[str] = []
        started = False
        deadline = asyncio.get_event_loop().time() + _STARTUP_DEADLINE_S
        pipes_done = 0
        rc: int | None = None
        # Only pending tasks: avoids busy-loop once one pump finishes before the other.
        pending = {t_out, t_err}

        try:
            while pipes_done < 2:
                budget = deadline - asyncio.get_event_loop().time()
                done, pending = await asyncio.wait(
                    pending, timeout=max(budget, 0.1)
                )
                # Drain before watchdog: pump can push data without completing its task.
                got_data = False
                while not queue.empty():
                    tag, chunk = queue.get_nowait()
                    if chunk is None:
                        if bufs[tag]:
                            self._dispatch_line(tag, bufs[tag], stderr_lines)
                            bufs[tag] = ''
                        pipes_done += 1
                        continue
                    got_data = True
                    started = True
                    deadline = asyncio.get_event_loop().time() + _INACTIVITY_S
                    bufs[tag] += chunk.decode(errors='replace')
                    *parts, bufs[tag] = re.split(r'[\r\n]', bufs[tag])
                    for p in parts:
                        if p:
                            self._dispatch_line(tag, p, stderr_lines)
                if not done and not got_data:
                    reason = (
                        'no output within startup deadline' if not started
                        else 'no output within inactivity threshold'
                    )
                    t_out.cancel()
                    t_err.cancel()
                    await asyncio.gather(t_out, t_err, return_exceptions=True)
                    await self._kill(proc)
                    self.errors = [f'rsync {reason} (target: {self.address})']
                    return False

            try:
                rc = await asyncio.wait_for(proc.wait(), timeout=_INACTIVITY_S)
            except asyncio.TimeoutError:
                await self._kill(proc)
                self.errors = [
                    f'rsync closed pipes but did not exit within '
                    f'{_INACTIVITY_S}s (target: {self.address})'
                ]
                return False
        finally:
            if proc.returncode is None:
                await self._kill(proc)

        if rc == 0:
            self.errors = []
            return True
        # Drop the positional "rsync error: ... at main.c(NNN)" trailer if present.
        self.errors = (
            stderr_lines[:-1]
            if stderr_lines and 'rsync error:' in stderr_lines[-1]
            else stderr_lines
        )
        return False

    def _dispatch_line(self, tag: str, line: str, stderr_lines: list[str]) -> None:
        if tag == 'out':
            Logger.debug(f'rsync: {line}')
            parsed = self._parse_progress(line)
            if parsed:
                self._on_progress(parsed)
        else:
            Logger.warning(f'rsync: {line}')
            stderr_lines.append(line)

    async def _kill(self, proc: asyncio.subprocess.Process) -> None:
        """Terminate proc gracefully, escalating to SIGKILL after 2 s."""
        proc.terminate()
        try:
            await asyncio.wait_for(proc.wait(), timeout=2.0)
        except asyncio.TimeoutError:
            proc.kill()
            try:
                await asyncio.wait_for(proc.wait(), timeout=2.0)
            except asyncio.TimeoutError:
                pass

    async def _deploy_all_async(self, log_file: str, file_names: list[str], mandatory_paths: list[str]) -> bool:
        """Full deploy flow: precheck → log creation → rsync transfer.

        Early-fails on precheck failure; the log file is only created when
        precheck passes (preserving the pre-refactor invariant).
        """
        if mandatory_paths:
            mandatory_ok, missing = await self._check_mandatory_sources(
                mandatory_paths
            )
            if not mandatory_ok:
                if missing:
                    self.errors = [
                        f'mandatory project files are missing at source '
                        f'{self.address}: {", ".join(missing)}'
                    ]
                Logger.error('Failed mandatory precheck for project files')
                return False
        self._create_deploy_log(log_file, file_names)
        return await self._sync(log_file)

    def _parse_progress(self, line: str) -> dict[str, int | str]:
        """Parse a rsync --info=progress2 line.

        Returns {} for non-progress lines (stats block, file names, blank).
        Returns a structured dict with keys bytes, pct, rate, eta, and
        optionally xfr, remaining, total. Keep keys stable — UI consumers
        depend on them.
        """
        m = _PROGRESS2_RE.match(line)
        if not m:
            return {}
        bytes_str, pct, rate, eta, xfr, done, total = m.groups()
        out = {
            'bytes': int(bytes_str.replace(',', '')),
            'pct': int(pct),
            'rate': rate,
            'eta': eta,
        }
        if xfr is not None:
            out.update({
                'xfr': int(xfr),
                'remaining': int(done),
                'total': int(total),
            })
        return out

    def _deploy_log_path(self, project: str, tag: str = 'project') -> str:
        return os.path.join(
            self.tmp_path, f'rsync_request_{project}_{tag}.log'
        )

    def _create_deploy_log(self, log_file: str, file_names: list[str] = []) -> bool:
        """Create the rsync --files-from list file for a deploy request."""
        try:
            os.makedirs(os.path.dirname(log_file), exist_ok=True)
            with open(log_file, 'w') as f:
                # Normalize to one-path-per-line; callers may omit the trailing newline.
                for name in file_names:
                    if not name.endswith('\n'):
                        name = name + '\n'
                    f.write(name)
        except Exception as e:
            Logger.error(f'Exception raised when writing rsync request log file: {e}')
            return False
        return True

    def _reset_deploy_log(self, log_file: str) -> None:
        with open(log_file, 'w'):
            pass
        Logger.info(f'rsync Deploy log file {log_file} emptied')

    def _project_files(self, project: str) -> list[str]:
        return [
            '/projects/' + project + '/script.xml\n',
            '/projects/' + project + '/mappings.xml\n',
            '/projects/' + project + '/settings.xml\n'
        ]

    def _media_files(self, bare_names: list[str]) -> list[str]:
        """Expand bare media filenames to rsync-relative paths for --files-from.

        Every file gets a media/<name> entry. Video files (.mp4 .mov .avi
        .mkv .mpg) also get a media/indexes/<name>.idx sidecar entry.
        """
        _VIDEO_EXTS = {'.mp4', '.mov', '.avi', '.mkv', '.mpg'}
        result = []
        for name in bare_names:
            result.append(f'media/{name}')
            if os.path.splitext(name)[1].lower() in _VIDEO_EXTS:
                result.append(f'media/indexes/{name}.idx')
        return result

__init__(library_path='/opt/cuems_library/', tmp_path='/tmp/cuems_library/', controller_ip=None, hostname=None, log_file='/run/cuems/rsync.log', on_progress=None, loop=None)

Construct a deploy manager.

Parameters:

Name Type Description Default
controller_ip str | None

IP of the controller's rsync daemon (preferred). Pass BaseEngine.controller_ip. If falsy, manager runs disabled.

None
hostname str | None

Legacy fallback resolved via avahi when controller_ip is not provided. Kept for backwards compatibility.

None
log_file str

Where rsync writes its log.

'/run/cuems/rsync.log'
on_progress Callable[[dict], None] | None

Optional callback fired for each rsync progress update (parsed from --info=progress2). Receives a dict with keys bytes, pct, rate, eta, and optionally xfr, remaining, total. Must be non-blocking — invoked from the asyncio loop.

None
loop AbstractEventLoop | None

The asyncio event loop for run_coroutine_threadsafe(). Defaults to None; late-bind via NodeEngine.start().

None
Source code in src/cuemsengine/tools/CuemsDeploy.py
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def __init__(
        self,
        library_path = '/opt/cuems_library/',
        tmp_path = '/tmp/cuems_library/',
        controller_ip: str | None = None,
        hostname: str | None = None,
        log_file: str = '/run/cuems/rsync.log',
        on_progress: Callable[[dict], None] | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ):
    """Construct a deploy manager.

    Args:
        controller_ip: IP of the controller's rsync daemon (preferred).
            Pass BaseEngine.controller_ip. If falsy, manager runs disabled.
        hostname: Legacy fallback resolved via avahi when controller_ip is
            not provided. Kept for backwards compatibility.
        log_file: Where rsync writes its log.
        on_progress: Optional callback fired for each rsync progress update
            (parsed from --info=progress2). Receives a dict with keys
            bytes, pct, rate, eta, and optionally xfr, remaining, total.
            Must be non-blocking — invoked from the asyncio loop.
        loop: The asyncio event loop for run_coroutine_threadsafe(). Defaults
            to None; late-bind via NodeEngine.start().
    """
    self.library_path = library_path
    self.tmp_path = tmp_path
    self.log_file = log_file
    self.errors = []
    self.encoding = sys.getfilesystemencoding()
    self._on_progress = on_progress or (lambda parsed: None)
    self.loop = loop

    # TODO: rebuild on network_map reload to pick up IP changes without restarting.
    if controller_ip:
        self.main_ip = controller_ip
    elif hostname:
        self.main_ip = self._avahi_resolve(hostname)
    else:
        self.main_ip = None

    if self.main_ip:
        self.address = f'rsync://cuems_library_rsync@{self.main_ip}/cuems'
        self.enabled = True
    else:
        self.address = None
        self.enabled = False
        Logger.warning(
            f'CuemsDeploy disabled: no valid controller IP '
            f'(controller_ip={controller_ip!r}, hostname={hostname!r}, '
            f'resolved={self.main_ip!r}). Project deploys will be skipped.'
        )

sync_files(project, tag, file_names=None)

Sync files from the controller to the node.

Submits _deploy_all_async() to self.loop via run_coroutine_threadsafe() and blocks until the coroutine completes. Watchdogs inside the coroutine handle all time bounds; no external timeout is needed here.

Parameters:

Name Type Description Default
project str

Project identifier used to build paths and log file names.

required
tag str

Transfer type — 'project' or 'media'. Controls which mandatory-path precheck runs and which default file list is used when file_names is empty.

required
file_names list[str] | None

Explicit list of rsync-relative paths to transfer. When omitted (or empty) and tag is 'project', defaults to the standard project file set (script, mappings, settings).

None

Returns:

Type Description
bool

True on success; False if disabled, loop unbound, precheck failed,

bool

rsync exited non-zero, or any unexpected exception occurred. On

bool

failure, self.errors contains one or more diagnostic strings.

Source code in src/cuemsengine/tools/CuemsDeploy.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def sync_files(self, project: str, tag: str, file_names: list[str] | None = None) -> bool:
    """Sync files from the controller to the node.

    Submits _deploy_all_async() to self.loop via run_coroutine_threadsafe()
    and blocks until the coroutine completes. Watchdogs inside the coroutine
    handle all time bounds; no external timeout is needed here.

    Args:
        project: Project identifier used to build paths and log file names.
        tag: Transfer type — ``'project'`` or ``'media'``. Controls which
            mandatory-path precheck runs and which default file list is used
            when file_names is empty.
        file_names: Explicit list of rsync-relative paths to transfer. When
            omitted (or empty) and tag is ``'project'``, defaults to the
            standard project file set (script, mappings, settings).

    Returns:
        True on success; False if disabled, loop unbound, precheck failed,
        rsync exited non-zero, or any unexpected exception occurred. On
        failure, self.errors contains one or more diagnostic strings.
    """
    if not self.enabled:
        Logger.error(
            f'CuemsDeploy is disabled (no controller IP) — '
            f'skipping {tag} sync for project {project!r}'
        )
        return False

    if self.loop is None:
        Logger.error(
            f'CuemsDeploy event loop not bound (NodeEngine.start() not '
            f'called yet) — skipping {tag} sync for project {project!r}'
        )
        self.errors = ['event loop not bound']
        return False

    file_names = list(file_names or [])
    if tag == 'project' and len(file_names) == 0:
        file_names = self._project_files(project)
    elif tag == 'media' and len(file_names) > 0:
        file_names = self._media_files(file_names)

    mandatory_paths = self._mandatory_paths(project, tag)
    log_file = self._deploy_log_path(project, tag)

    try:
        coro = self._deploy_all_async(log_file, file_names, mandatory_paths)
        synced = asyncio.run_coroutine_threadsafe(coro, self.loop).result()
    except Exception as e:
        Logger.error(f'Unexpected error during deploy of {project!r}: {e}')
        self.errors = [str(e)]
        return False

    if synced:
        self._reset_deploy_log(log_file)
        self.errors = []
    else:
        Logger.error(
            f'Failed to sync {tag} files for project {project!r} '
            f'from {self.address} (log: {log_file})'
        )
        for error in self.errors:
            Logger.error(error)
    return synced

MtcListener

Bases: Thread

Source code in src/cuemsengine/tools/MtcListener.py
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
class MtcListener(Thread):
    def __init__(self, step_callback: Callable | None = None, reset_callback: Callable | None = None, port: str | None = None):
        # self.main_tc = CTimecode('0:0:0:0')
        self.main_tc = CTimecode()
        self.main_tc.set_fractional(True)

        self.__quarter_frames = [0,0,0,0,0,0,0,0]
        self.port = None
        self.port_name = None
        self.__open_port(port)

        self.step_callback = step_callback
        self.reset_callback = reset_callback

        # 24h MTC rollover state (closes 869cpdbzy):
        # MIDI MTC encodes hours in a 5-bit field (0-23) and real SMPTE senders
        # reset to 00:00:00:00 after 24h. This listener detects that wrap by
        # comparing decoded TC frames against the previous one (a backward jump
        # of more than 1 hour indicates a rollover, not a manual seek) and
        # accumulates a 24h offset that is added to every subsequent decoded
        # TC. CTimecode itself is monotonic past 24h post-cuemsutils 0.1.0rc7
        # (PR #10 layer 1), but without this listener-side accumulation the
        # received MTC would reset to ~frames=1 every 24h regardless.
        self._24h_offset_frames: int = 0
        self._last_decoded_frames: int | None = None

        super().__init__(name = 'mtclistener')
        self.daemon = True

    def _apply_24h_offset(self, decoded: CTimecode) -> CTimecode:
        """Detect 24h MTC rollover and apply accumulated offset.

        Heuristic: a real 24h wrap goes from 23:59:59:F (frames ≈ 24h - 1f)
        to 00:00:00:00 (frames ≈ 0). We treat a backward jump as a 24h wrap
        only when both:
        - delta < -1h (large backward jump, not a small seek), AND
        - prev_frames was within the last hour of the 24h boundary.

        The second condition is critical: without it, a manual seek back to
        00:00:00:00 from ANY high-watermark MTC time (e.g. after the engine
        has been running for 4h and the user reloads a project, which sends
        a Full-Frame SYSEX SEEK to frame 0) is mistakenly treated as a 24h
        wrap, adding a phantom 2,160,000-frame offset that corrupts every
        downstream timestamp (cue offsets become -2160k, video layers try
        to seek to frame -2.5M of a 300-frame clip, layers stay in
        awaitingFrame forever, monitor goes black).

        For environments with manual seeking, deltas under 1h are treated
        as seeks (no offset accumulated; existing reset detection still
        fires when main_tc.milliseconds_rounded reaches 0).

        Returns the offset-adjusted CTimecode (or the original if no offset
        is active).
        """
        if self._last_decoded_frames is not None:
            delta = decoded.frames - self._last_decoded_frames
            frames_per_hour = decoded._int_framerate * 3600
            frames_per_24h = decoded._int_framerate * 86400
            near_24h_boundary = (
                self._last_decoded_frames > frames_per_24h - frames_per_hour
            )
            if delta < -frames_per_hour and near_24h_boundary:
                self._24h_offset_frames += frames_per_24h
                Logger.info(
                    f'MtcListener: detected 24h MTC rollover '
                    f'(prev frames={self._last_decoded_frames}, '
                    f'new={decoded.frames}, delta={delta}); '
                    f'accumulated offset = {self._24h_offset_frames} frames '
                    f'({self._24h_offset_frames / decoded._int_framerate / 3600:.1f}h)'
                )
            elif delta < -frames_per_hour:
                Logger.info(
                    f'MtcListener: large backward MTC jump ignored as '
                    f'manual seek (prev frames={self._last_decoded_frames}, '
                    f'new={decoded.frames}, delta={delta}); not a 24h wrap '
                    f'(prev < {frames_per_24h - frames_per_hour})'
                )
        self._last_decoded_frames = decoded.frames

        if self._24h_offset_frames > 0:
            return CTimecode(
                framerate=decoded.framerate,
                frames=decoded.frames + self._24h_offset_frames,
            )
        return decoded


    def timecode(self):
        return self.main_tc

    def milliseconds(self):
        return int(self.main_tc.frames * (1000 / float(self.main_tc._framerate))) # type: ignore[attr-defined]

    def __update_timecode(self, timecode):
        self.main_tc = timecode
        if (self.main_tc.milliseconds_rounded == 0):
            if self.step_callback != None and self.reset_callback != None:
                self.reset_callback()
        if self.step_callback != None:
            self.step_callback(self.main_tc)

    def __open_port(self, port):
        # HEADLESS/CLOUD: get_input_names() can throw when no MIDI subsystem is
        # present; catch and treat as empty list so the engine keeps running.
        # port_name is left as None and re-detected later in ControllerEngine.start()
        # once the timecode sender has created the virtual MIDI port.
        try:
            ports = mido.get_input_names() # type: ignore[attr-defined]
        except Exception as e:
            Logger.warning(f'Could not list MIDI input ports: {e}')
            ports = []

        if port is not None:
            # Exact match first; fall back to substring match because ALSA/JACK
            # port names include the client name and ID suffix
            # e.g. "Midi Through Port-0" → "Midi Through:Midi Through Port-0 14:0"
            if port in ports:
                self.port_name = port
            else:
                matches = [p for p in ports if port in p]
                if matches:
                    self.port_name = matches[0]
                    Logger.info(f'MIDI port "{port}" matched as "{self.port_name}"')
                else:
                    Logger.warning(f'MIDI port "{port}" not found, auto-detecting...')
                    port = None  # fall through to auto-detect

        if port is None:
            # Prefer ports whose name contains "mtc" (e.g. MtcMaster:MTCPort)
            mtc_ports = [s for s in ports if "mtc" in s.lower()]
            if mtc_ports:
                self.port_name = mtc_ports[-1]
            elif ports:
                self.port_name = ports[-1]
            else:
                # HEADLESS/CLOUD: no ports yet; caller must retry after the
                # virtual MIDI sender port has been created.
                self.port_name = None
                Logger.warning('No MIDI input ports available')
        if self.port_name:
            Logger.info(f'MtcListener will use MIDI port: {self.port_name}')

    def run(self):
        Logger.debug('Starting MTC listener')
        self.port = mido.open_input( # type: ignore[attr-defined]
            self.port_name,
            callback = self.__handle_message
        )
        Logger.info('Listening to MIDI messages on > {} <'.format(self.port_name))

    def stop(self):
        if self.port is not None:
            self.port.close()

    def __handle_message(self, message):
        if message.type == 'quarter_frame':        
            self.__quarter_frames[message.frame_type] = message.frame_value
            if (message.frame_type == 3) or (message.frame_type == 7):
                self.__update_timecode(self.main_tc + 1)
            #    print('QF+:',self.main_tc)
            if message.frame_type == 7:
                tc = self.__mtc_decode_quarter_frames(self.__quarter_frames)
            #    print('QFC:',tc)
                self.__update_timecode(tc)
        elif message.type == 'sysex':
            # check to see if this is a timecode frame
            if len(message.data) == 8 and message.data[0:4] == (127,127,1,1):
                data = message.data[4:]
                tc = self.__mtc_decode(data)
                Logger.debug('FF:' + tc.__str__())
                self.__update_timecode(tc)
        else:
            Logger.debug(message)
            raise(NotImplementedError)

    def __mtc_decode(self, mtc_bytes):
        #print(mtc_bytes)
        rhh, mins, secs, frs = mtc_bytes
        rateflag = rhh >> 5
        hrs      = rhh & 31
        fps = ['24','25','29.97','30'][rateflag]
        # total_frames = frs + float(fps) * (secs + mins * 60 + hrs * 60 * 60) //  TODO: goes to frame 0 in tc, non existent frame, changed to tc 0:0:0:0 = frame 1
        decoded = CTimecode('{}:{}:{}:{}'.format(hrs, mins, secs, frs), framerate=fps)
        # Route through 24h-wrap detection so main_tc stays monotonic past 24h.
        # See _apply_24h_offset docstring for heuristic details (closes 869cpdbzy).
        return self._apply_24h_offset(decoded)

    def __mtc_decode_full_frame(self, full_frame_bytes):
        mtc_bytes = full_frame_bytes[5:-1]
        return self.__mtc_decode(mtc_bytes)

    def __mtc_decode_quarter_frames(self, frame_pieces):
        mtc_bytes = bytearray(4)
        if len(frame_pieces) < 8:
            return None
        for piece in range(8):
            mtc_index = 3 - piece//2    # quarter frame pieces are in reverse order of mtc_encode
            this_frame = frame_pieces[piece]
            if this_frame is bytearray or this_frame is list:
                this_frame = this_frame[1] # type: ignore[index]
            # ignore the frame_piece marker bits
            data = this_frame & 15      # type: ignore[operator]
            if piece % 2 == 0:
                # 'even' pieces came from the low nibble
                # and the first piece is 0, so it's even
                mtc_bytes[mtc_index] += data
            else:
                # 'odd' pieces came from the high nibble
                mtc_bytes[mtc_index] += data * 16
        return self.__mtc_decode(mtc_bytes)

PortHandler

Bases: object

Source code in src/cuemsengine/tools/PortHandler.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
class PortHandler(object):
    def __new__(cls):
        """
        Singleton class responsible for handling port objects.

        Holds a list of used ports and manages the assignment of new ports.
        The ports are assigned to a cue
        Config ports are ports that are ports assigned with None as key
        Thread-safe: internal state mutations are guarded by a Lock.
        """
        if not hasattr(cls, '_instance'):
            cls._instance = super(PortHandler, cls).__new__(cls)
            cls._instance._lock = RLock()
            cls._instance._ports = {None: {}}
            cls._instance._all_used_ports = []
            cls._instance._all_available_ports = set(range(INITIAL_PORT, MAX_PORT))
            cls._instance._random_ports = []
        return cls._instance

    def assign_ports(self, names: list[str], cue: CuemsDict = None) -> dict:
        """Assign free ports to a list of names

        This method is thread-safe and should be the preferred way to assign ports to a list of names for a cue or config.

        Args:
            names: The names to assign ports to
            cue: The cue to assign ports to
        """
        with self._lock:
            new_ports = self.get_free_ports(len(names))
        out = {k: new_ports[i] for i,k in enumerate(names)}
        if cue is None:
            self.add_config_ports(out)
        else:
            self.set_ports(cue, out)
        return out

    def last_port(self) -> int:
        """
        Get the last port
        """
        with self._lock:
            return self._ports[-1]

    def get_ports(self, cue: CuemsDict) -> dict | None:
        """
        Get the ports for a cue
        """
        with self._lock:
            return self._ports.get(cue, None)

    def set_ports(self, cue: CuemsDict, ports: list | dict, check_range: bool = True) -> None:
        """
        Set the ports for a cue
        """
        previous_ports = self.get_ports(cue)
        if previous_ports == ports:
            return
        ports_list = self.check_ports(ports, check_range)
        self._all_used_ports.extend(ports_list)
        if previous_ports is not None:
            ports.update(previous_ports)
        self._ports[cue] = ports

    def remove_ports(self, cue: CuemsDict):
        """
        Remove the ports for a cue
        """
        if self.get_ports(cue) is not None:
            with self._lock:
                p = self._ports.pop(cue)
                new_ports = set(self._all_used_ports) - set(p.values())
                self._all_used_ports = list(new_ports)

    def get_all_used_ports(self) -> set:
        """
        Get the set of all used ports (assigned ports + random ports combined)
        """
        with self._lock:
            Logger.debug(f"All used ports: {self._all_used_ports}")
            Logger.debug(f'Random ports: {self._random_ports}')
            return set(self._all_used_ports) | set(self._random_ports)

    def check_ports(self, ports: list | dict, check_range: bool = True) -> list:
        """
        Check the ports for a cue and return the list of ports if they are valid

        Args:
            ports: The ports to check
            check_range: Whether to check the port range

        Returns:
            The ports list if they are valid

        Raises:
            ValueError:
            - If duplicate ports are found
            - If ports are already in use
            - If check_range is True and the port range is invalid
        """
        if isinstance(ports, dict):
            ports = [i for i in ports.values()]
        if len(ports) > len(set(ports)):
            raise ValueError(f"Duplicate ports found")
        all_used_ports = set(self.get_all_used_ports())
        if all_used_ports & set(ports):
            raise ValueError(f"Ports already in use: {all_used_ports & set(ports)}")
        if check_range:
            self.check_port_range(ports)
        return ports

    @staticmethod
    def check_port_range(ports: list) -> None:
        """
        Check the port range
        """
        for port in ports:
            if port > MAX_PORT:
                raise ValueError(f"Port {port} is too high")
            if port < INITIAL_PORT:
                raise ValueError(f"Port {port} is too low")

    def get_free_port(self) -> int:
        """
        Get a free port

        Thread-safe: internal state mutations are guarded by a Lock.

        Returns:
            The free port
        Raises:
            ValueError: If no free ports are found
        """
        available_ports = self._all_available_ports - set(self.get_all_used_ports())
        if not available_ports:
            raise ValueError(f"No free ports found")
        return choice(list(available_ports))

    def get_free_ports(self, n: int) -> list:
        """
        Get n free ports
        """
        return [self.get_free_port() for _ in range(n)]

    def find_system_ports(self) -> list:
        """
        Find all system ports used on the system
        """
        return get_used_ports_with_pid()

    def add_system_ports(self):
        """
        Add all system ports to the configuration dictionary
        """
        self.add_config_ports(self.find_system_ports())

    def add_config_ports(self, ports: list | dict):
        """
        Add new ports to the configuration dictionary
        """
        with self._lock:
            config_ports = self.get_ports(None)
            config_ports.update(ports)
            self.set_ports(None, config_ports, check_range=False)

    def new_random_port(self) -> int:
        """
        Get a new random port and store it
        """
        port = self.get_free_port()
        self.store_random_port(port)
        return port

    def store_random_port(self, port: int):
        """
        Store a random port to the random ports set
        """
        with self._lock:
            self._random_ports.append(port)

    def remove_random_port(self, port: int):
        """
        Remove a specific port from the random ports list, freeing it for reuse.
        Called when an OSC client that owned the port is closed.
        """
        with self._lock:
            try:
                self._random_ports.remove(port)
            except ValueError:
                pass

    def clean_random_ports(self):
        """
        Clean the random ports set by keeping only ports that are in use by the system
        """
        sys_ports = [i for i in self.find_system_ports().values() if i in self._random_ports]
        with self._lock:
            self._random_ports = [i for i in self._random_ports if i in sys_ports]

__new__()

Singleton class responsible for handling port objects.

Holds a list of used ports and manages the assignment of new ports. The ports are assigned to a cue Config ports are ports that are ports assigned with None as key Thread-safe: internal state mutations are guarded by a Lock.

Source code in src/cuemsengine/tools/PortHandler.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def __new__(cls):
    """
    Singleton class responsible for handling port objects.

    Holds a list of used ports and manages the assignment of new ports.
    The ports are assigned to a cue
    Config ports are ports that are ports assigned with None as key
    Thread-safe: internal state mutations are guarded by a Lock.
    """
    if not hasattr(cls, '_instance'):
        cls._instance = super(PortHandler, cls).__new__(cls)
        cls._instance._lock = RLock()
        cls._instance._ports = {None: {}}
        cls._instance._all_used_ports = []
        cls._instance._all_available_ports = set(range(INITIAL_PORT, MAX_PORT))
        cls._instance._random_ports = []
    return cls._instance

add_config_ports(ports)

Add new ports to the configuration dictionary

Source code in src/cuemsengine/tools/PortHandler.py
171
172
173
174
175
176
177
178
def add_config_ports(self, ports: list | dict):
    """
    Add new ports to the configuration dictionary
    """
    with self._lock:
        config_ports = self.get_ports(None)
        config_ports.update(ports)
        self.set_ports(None, config_ports, check_range=False)

add_system_ports()

Add all system ports to the configuration dictionary

Source code in src/cuemsengine/tools/PortHandler.py
165
166
167
168
169
def add_system_ports(self):
    """
    Add all system ports to the configuration dictionary
    """
    self.add_config_ports(self.find_system_ports())

assign_ports(names, cue=None)

Assign free ports to a list of names

This method is thread-safe and should be the preferred way to assign ports to a list of names for a cue or config.

Parameters:

Name Type Description Default
names list[str]

The names to assign ports to

required
cue CuemsDict

The cue to assign ports to

None
Source code in src/cuemsengine/tools/PortHandler.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def assign_ports(self, names: list[str], cue: CuemsDict = None) -> dict:
    """Assign free ports to a list of names

    This method is thread-safe and should be the preferred way to assign ports to a list of names for a cue or config.

    Args:
        names: The names to assign ports to
        cue: The cue to assign ports to
    """
    with self._lock:
        new_ports = self.get_free_ports(len(names))
    out = {k: new_ports[i] for i,k in enumerate(names)}
    if cue is None:
        self.add_config_ports(out)
    else:
        self.set_ports(cue, out)
    return out

check_port_range(ports) staticmethod

Check the port range

Source code in src/cuemsengine/tools/PortHandler.py
126
127
128
129
130
131
132
133
134
135
@staticmethod
def check_port_range(ports: list) -> None:
    """
    Check the port range
    """
    for port in ports:
        if port > MAX_PORT:
            raise ValueError(f"Port {port} is too high")
        if port < INITIAL_PORT:
            raise ValueError(f"Port {port} is too low")

check_ports(ports, check_range=True)

Check the ports for a cue and return the list of ports if they are valid

Parameters:

Name Type Description Default
ports list | dict

The ports to check

required
check_range bool

Whether to check the port range

True

Returns:

Type Description
list

The ports list if they are valid

Raises:

Type Description
ValueError
Source code in src/cuemsengine/tools/PortHandler.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def check_ports(self, ports: list | dict, check_range: bool = True) -> list:
    """
    Check the ports for a cue and return the list of ports if they are valid

    Args:
        ports: The ports to check
        check_range: Whether to check the port range

    Returns:
        The ports list if they are valid

    Raises:
        ValueError:
        - If duplicate ports are found
        - If ports are already in use
        - If check_range is True and the port range is invalid
    """
    if isinstance(ports, dict):
        ports = [i for i in ports.values()]
    if len(ports) > len(set(ports)):
        raise ValueError(f"Duplicate ports found")
    all_used_ports = set(self.get_all_used_ports())
    if all_used_ports & set(ports):
        raise ValueError(f"Ports already in use: {all_used_ports & set(ports)}")
    if check_range:
        self.check_port_range(ports)
    return ports

clean_random_ports()

Clean the random ports set by keeping only ports that are in use by the system

Source code in src/cuemsengine/tools/PortHandler.py
206
207
208
209
210
211
212
def clean_random_ports(self):
    """
    Clean the random ports set by keeping only ports that are in use by the system
    """
    sys_ports = [i for i in self.find_system_ports().values() if i in self._random_ports]
    with self._lock:
        self._random_ports = [i for i in self._random_ports if i in sys_ports]

find_system_ports()

Find all system ports used on the system

Source code in src/cuemsengine/tools/PortHandler.py
159
160
161
162
163
def find_system_ports(self) -> list:
    """
    Find all system ports used on the system
    """
    return get_used_ports_with_pid()

get_all_used_ports()

Get the set of all used ports (assigned ports + random ports combined)

Source code in src/cuemsengine/tools/PortHandler.py
89
90
91
92
93
94
95
96
def get_all_used_ports(self) -> set:
    """
    Get the set of all used ports (assigned ports + random ports combined)
    """
    with self._lock:
        Logger.debug(f"All used ports: {self._all_used_ports}")
        Logger.debug(f'Random ports: {self._random_ports}')
        return set(self._all_used_ports) | set(self._random_ports)

get_free_port()

Get a free port

Thread-safe: internal state mutations are guarded by a Lock.

Returns:

Type Description
int

The free port

Raises: ValueError: If no free ports are found

Source code in src/cuemsengine/tools/PortHandler.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def get_free_port(self) -> int:
    """
    Get a free port

    Thread-safe: internal state mutations are guarded by a Lock.

    Returns:
        The free port
    Raises:
        ValueError: If no free ports are found
    """
    available_ports = self._all_available_ports - set(self.get_all_used_ports())
    if not available_ports:
        raise ValueError(f"No free ports found")
    return choice(list(available_ports))

get_free_ports(n)

Get n free ports

Source code in src/cuemsengine/tools/PortHandler.py
153
154
155
156
157
def get_free_ports(self, n: int) -> list:
    """
    Get n free ports
    """
    return [self.get_free_port() for _ in range(n)]

get_ports(cue)

Get the ports for a cue

Source code in src/cuemsengine/tools/PortHandler.py
59
60
61
62
63
64
def get_ports(self, cue: CuemsDict) -> dict | None:
    """
    Get the ports for a cue
    """
    with self._lock:
        return self._ports.get(cue, None)

last_port()

Get the last port

Source code in src/cuemsengine/tools/PortHandler.py
52
53
54
55
56
57
def last_port(self) -> int:
    """
    Get the last port
    """
    with self._lock:
        return self._ports[-1]

new_random_port()

Get a new random port and store it

Source code in src/cuemsengine/tools/PortHandler.py
180
181
182
183
184
185
186
def new_random_port(self) -> int:
    """
    Get a new random port and store it
    """
    port = self.get_free_port()
    self.store_random_port(port)
    return port

remove_ports(cue)

Remove the ports for a cue

Source code in src/cuemsengine/tools/PortHandler.py
79
80
81
82
83
84
85
86
87
def remove_ports(self, cue: CuemsDict):
    """
    Remove the ports for a cue
    """
    if self.get_ports(cue) is not None:
        with self._lock:
            p = self._ports.pop(cue)
            new_ports = set(self._all_used_ports) - set(p.values())
            self._all_used_ports = list(new_ports)

remove_random_port(port)

Remove a specific port from the random ports list, freeing it for reuse. Called when an OSC client that owned the port is closed.

Source code in src/cuemsengine/tools/PortHandler.py
195
196
197
198
199
200
201
202
203
204
def remove_random_port(self, port: int):
    """
    Remove a specific port from the random ports list, freeing it for reuse.
    Called when an OSC client that owned the port is closed.
    """
    with self._lock:
        try:
            self._random_ports.remove(port)
        except ValueError:
            pass

set_ports(cue, ports, check_range=True)

Set the ports for a cue

Source code in src/cuemsengine/tools/PortHandler.py
66
67
68
69
70
71
72
73
74
75
76
77
def set_ports(self, cue: CuemsDict, ports: list | dict, check_range: bool = True) -> None:
    """
    Set the ports for a cue
    """
    previous_ports = self.get_ports(cue)
    if previous_ports == ports:
        return
    ports_list = self.check_ports(ports, check_range)
    self._all_used_ports.extend(ports_list)
    if previous_ports is not None:
        ports.update(previous_ports)
    self._ports[cue] = ports

store_random_port(port)

Store a random port to the random ports set

Source code in src/cuemsengine/tools/PortHandler.py
188
189
190
191
192
193
def store_random_port(self, port: int):
    """
    Store a random port to the random ports set
    """
    with self._lock:
        self._random_ports.append(port)

Reader for /run/cuems/display.conf.

display.conf is written by cuems-generate-display-conf (ExecStartPre of cuems-videocomposer.service) and read by both the videocomposer (for DRM modeset + canvas layout) and the engine (for canvas geometry + per-output canvas regions, replacing the broken x = index * 1920 heuristic that used to live in NodeEngine.set_video_outputs).

File format is INI-like:

canvas_layout=custom
canvas_size=5760x1080        # optional, overrides bbox

[output:HDMI-A-1]
canvas_region=0,0,1920,1080
resolution=1920x1080         # optional
refresh=60.0                 # optional

The engine consumes canvas_region and the optional global canvas_size override; resolution + refresh are read by the videocomposer directly.

DisplayConfNotFoundError

Bases: RuntimeError

display.conf is missing, unreadable, or has no [output:*] sections.

Source code in src/cuemsengine/tools/display_conf.py
36
37
class DisplayConfNotFoundError(RuntimeError):
    """display.conf is missing, unreadable, or has no [output:*] sections."""

DisplayConfValueError

Bases: RuntimeError

display.conf is present but contains an invalid value (e.g. a canvas_size override that is malformed, non-positive, or smaller than the per-output region bounding box).

Source code in src/cuemsengine/tools/display_conf.py
40
41
42
43
class DisplayConfValueError(RuntimeError):
    """display.conf is present but contains an invalid value (e.g. a
    canvas_size override that is malformed, non-positive, or smaller than
    the per-output region bounding box)."""

read_display_conf(path=DEFAULT_DISPLAY_CONF)

Parse display.conf and return (regions, canvas_size).

Returns a 2-tuple:

  • regions: {connector_name: {'x', 'y', 'width', 'height'}} with pixel-int values.
  • canvas_size: (canvas_width, canvas_height). If the global canvas_size=WIDTHxHEIGHT key is present in the file's preamble, it is used (after validating it is >= the per-region bounding box). Otherwise, computed as max(x + width, y + height) over all regions.

Raises:

  • DisplayConfNotFoundError if the file is missing or has no [output:*] sections.
  • DisplayConfValueError if canvas_size= is malformed, has non-positive values, or is smaller than the per-region bbox.
Source code in src/cuemsengine/tools/display_conf.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def read_display_conf(path: str = DEFAULT_DISPLAY_CONF) -> Tuple[dict, Tuple[int, int]]:
    """Parse display.conf and return ``(regions, canvas_size)``.

    Returns a 2-tuple:

    - ``regions``: ``{connector_name: {'x', 'y', 'width', 'height'}}`` with
      pixel-int values.
    - ``canvas_size``: ``(canvas_width, canvas_height)``. If the global
      ``canvas_size=WIDTHxHEIGHT`` key is present in the file's preamble,
      it is used (after validating it is >= the per-region bounding box).
      Otherwise, computed as ``max(x + width, y + height)`` over all regions.

    Raises:

    - ``DisplayConfNotFoundError`` if the file is missing or has no
      ``[output:*]`` sections.
    - ``DisplayConfValueError`` if ``canvas_size=`` is malformed, has
      non-positive values, or is smaller than the per-region bbox.
    """
    if not os.path.isfile(path):
        raise DisplayConfNotFoundError(
            f"display.conf not found at {path}; "
            "videocomposer must run first (its ExecStartPre generates it)"
        )

    with open(path) as f:
        body = f.read()

    # Pre-pass: extract global preamble (everything before the first
    # [section] header) so we can scan it for `canvas_size=`. The
    # ConfigParser path below DISCARDS the preamble — without this
    # pre-pass the override is silently lost.
    if body.lstrip().startswith("["):
        preamble = ""
        sectioned = body.lstrip()
    elif "\n[" in body:
        split_at = body.find("\n[") + 1
        preamble = body[:split_at]
        sectioned = body[split_at:]
    else:
        preamble = body
        sectioned = ""

    canvas_override = _parse_canvas_size_override(preamble)

    parser = configparser.ConfigParser()
    parser.optionxform = str  # preserve key case for forward-compat keys
    if sectioned:
        parser.read_string(sectioned)

    regions: dict = {}
    for section in parser.sections():
        if not section.startswith("output:"):
            continue
        connector = section[len("output:"):]
        raw = parser.get(section, "canvas_region", fallback=None)
        if raw is None:
            continue
        parts = [p.strip() for p in raw.split(",")]
        if len(parts) != 4:
            continue
        try:
            x, y, w, h = (int(p) for p in parts)
        except ValueError:
            continue
        regions[connector] = {"x": x, "y": y, "width": w, "height": h}

    if not regions:
        raise DisplayConfNotFoundError(
            f"display.conf at {path} has no [output:*] sections with a "
            "valid canvas_region"
        )

    bbox_w = max(r["x"] + r["width"] for r in regions.values())
    bbox_h = max(r["y"] + r["height"] for r in regions.values())

    if canvas_override is not None:
        cw, ch = canvas_override
        if cw < bbox_w or ch < bbox_h:
            raise DisplayConfValueError(
                f"canvas_size={cw}x{ch} is smaller than the per-output "
                f"bounding box {bbox_w}x{bbox_h}; monitors would be cropped"
            )
        return regions, (cw, ch)

    return regions, (bbox_w, bbox_h)

get_pid_by_port(target_port)

Get the PID using a specific port.

Parameters:

Name Type Description Default
target_port int

The port number to look up

required

Returns:

Type Description
Optional[int]

Optional[int]: The process ID if found, None otherwise

Example

pid = get_pid_by_port(8080) print(pid) 1234

Source code in src/cuemsengine/tools/system_ports.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def get_pid_by_port(target_port: int) -> Optional[int]:
    """
    Get the PID using a specific port.

    Args:
        target_port (int): The port number to look up

    Returns:
        Optional[int]: The process ID if found, None otherwise

    Example:
        >>> pid = get_pid_by_port(8080)
        >>> print(pid)
        1234
    """
    ports = get_used_ports_with_pid()
    # Reverse lookup: find PID by port
    for pid, port in ports.items():
        if port == target_port:
            return pid
    return None

get_port_by_pid(target_pid)

Get the port used by a specific PID.

Parameters:

Name Type Description Default
target_pid int

The process ID to look up

required

Returns:

Type Description
Optional[int]

Optional[int]: The port number if found, None otherwise

Example

port = get_port_by_pid(1234) print(port) 8080

Source code in src/cuemsengine/tools/system_ports.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def get_port_by_pid(target_pid: int) -> Optional[int]:
    """
    Get the port used by a specific PID.

    Args:
        target_pid (int): The process ID to look up

    Returns:
        Optional[int]: The port number if found, None otherwise

    Example:
        >>> port = get_port_by_pid(1234)
        >>> print(port)
        8080
    """
    ports = get_used_ports_with_pid()
    return ports.get(target_pid)

get_used_ports_with_pid(user=None)

Recover all used ports using the 'ss' command. Returns a dictionary with PID as key and port as value.

Parameters:

Name Type Description Default
user str

The user to filter ports by

None

Returns:

Type Description
Dict[str, int]

Dict[str, int]: Dictionary mapping PID to port

Example

ports = get_used_ports_with_pid() print(ports) {'1234': 8080, '5678': 9090}

Source code in src/cuemsengine/tools/system_ports.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
def get_used_ports_with_pid(user: str = None) -> Dict[str, int]:
    """
    Recover all used ports using the 'ss' command.
    Returns a dictionary with PID as key and port as value.

    Args:
        user (str): The user to filter ports by
        If no user is provided, all used ports will be returned.

    Returns:
        Dict[str, int]: Dictionary mapping PID to port

    Example:
        >>> ports = get_used_ports_with_pid()
        >>> print(ports)
        {'1234': 8080, '5678': 9090}
    """
    try:
        # Run 'ss -tulnp' to get all listening ports with process info
        result = subprocess.run(
            ['ss', '-tulnp'], 
            capture_output=True, 
            text=True, 
            check=True
        )

        # Parse the output to extract PIDs and ports
        pid_port_dict = {}
        pid = None
        port = None

        for line in result.stdout.strip().split('\n')[1:]:  # Skip header line
            if line.strip():
                if user and user not in line:
                    continue
                # Parse the ss output format
                parts = line.split()
                for part in parts:
                    if user and user not in part:
                        continue
                    if "pid=" in part:
                        pid_match = re.search(r'pid=(\d+)', part)
                        if pid_match:
                            pid = int(pid_match.group(1))
                            pid_port_dict[pid] = port
                    elif ":" in part:
                        try:
                            port = int(part.split(':')[-1])
                        except (ValueError, IndexError):
                            continue
                    else:
                        continue
                if pid and port:
                    pid_port_dict[str(pid)] = port
                pid = None
                port = None

        return pid_port_dict

    except subprocess.CalledProcessError as e:
        # Handle case where 'ss' command is not available or fails
        print(f"Warning: Could not execute 'ss' command: {e}")
        return {}
    except Exception as e:
        print(f"Error getting used ports: {e}")
        return {}

is_port_in_use(port)

Check if a specific port is in use.

Parameters:

Name Type Description Default
port int

The port number to check

required

Returns:

Name Type Description
bool bool

True if port is in use, False otherwise

Example

if is_port_in_use(8080): ... print("Port 8080 is in use") ... else: ... print("Port 8080 is available")

Source code in src/cuemsengine/tools/system_ports.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def is_port_in_use(port: int) -> bool:
    """
    Check if a specific port is in use.

    Args:
        port (int): The port number to check

    Returns:
        bool: True if port is in use, False otherwise

    Example:
        >>> if is_port_in_use(8080):
        ...     print("Port 8080 is in use")
        ... else:
        ...     print("Port 8080 is available")
    """
    ports = get_used_ports_with_pid()
    return port in ports.values()