Skip to content

Xml

CMLCuemsConverter

Bases: XMLSchemaConverter

Source code in src/cuemsutils/xml/CMLCuemsConverter.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class CMLCuemsConverter(XMLSchemaConverter):
    """
    `XMLSchemaConverter` subclass that overrides `element_decode` and
    `element_encode` to produce/consume a custom dict/list layout.

    Compared with a plain dict-per-element layout, decoding may return a list
    of single-key dicts for children that are declared as repeatable, and a
    bare text value for elements that carry only text.
    """

    def __init__(self, namespaces=None, dict_class=None, list_class=None,
                 etree_element_class=None, text_key='&', attr_prefix='',
                 cdata_prefix=None, indent=4, strip_namespaces=True,
                 preserve_root=False, force_dict=False, force_list=False, **kwargs):
        """
        Store the conversion options and initialise the base converter.

        :param namespaces: accepted for signature compatibility; see the review \
        note on the `super().__init__` call below.
        :param dict_class: class used for decoded mappings (defaults to `dict`).
        :param list_class: class used for decoded sequences (defaults to `list`).
        :param etree_element_class: `Element` (or None) or `lxml_etree_element`.
        :param text_key: key under which element text is stored in a dict.
        :param attr_prefix: prefix that marks attribute keys ('' means none).
        :param cdata_prefix: prefix that marks character-data keys.
        :param indent: stored on the instance; not used in the visible code.
        :param strip_namespaces: forwarded to the base converter.
        :param preserve_root: wrap/unwrap the root element under its tag name.
        :param force_dict: keep a dict for simple-content complex elements.
        :param force_list: wrap the first occurrence of a child in a list.
        :param kwargs: accepted and ignored by the visible code.
        :raises XMLSchemaTypeError: if `etree_element_class` is not supported.
        """

        # Pick the namespace-registration function matching the element class.
        if etree_element_class is None or etree_element_class is Element:
            register_namespace = etree_register_namespace
        elif etree_element_class is lxml_etree_element:
            register_namespace = lxml_etree_register_namespace
        else:
            raise XMLSchemaTypeError("unsupported element class {!r}".format(etree_element_class))

        # NOTE(review): the `namespaces` argument is not forwarded; None is
        # always passed to the base class -- confirm this is intentional.
        super().__init__(namespaces=None, register_namespace=register_namespace, strip_namespaces=strip_namespaces)

        self.dict = dict_class or dict
        self.list = list_class or list
        self.etree_element_class = etree_element_class or Element
        self.text_key = text_key
        self.attr_prefix = attr_prefix
        self.cdata_prefix = cdata_prefix
        self.indent = indent
        self.preserve_root = preserve_root
        self.force_dict = force_dict
        self.force_list = force_list


    def element_decode(self, data, xsd_element, xsd_type=None, level=0):
        """
        Converts a decoded element data to a data structure.
        :param data: ElementData instance decoded from an Element node.
        :param xsd_element: the `XsdElement` associated to decoded the data.
        :param xsd_type: optional `XsdType` for supporting dynamic type through \
        xsi:type or xs:alternative.
        :param level: the level related to the decoding process (0 means the root).
        :return: a data structure containing the decoded data: a dict, a list of \
        single-key dicts, a text value, or None when the result is empty.
        """
        xsd_type = xsd_type or xsd_element.type
        result_dict = self.dict()
        # At the root of a global element, optionally export the namespace
        # declarations that belong to the schema into the result.
        if level == 0 and xsd_element.is_global() and not self.strip_namespaces and self:
            schema_namespaces = set(xsd_element.namespaces.values())
            result_dict.update(
                ('%s:%s' % (self.ns_prefix, k) if k else self.ns_prefix, v)
                for k, v in self._namespaces.items()
                if v in schema_namespaces
            )

        if xsd_type.is_simple() or xsd_type.has_simple_content():
            # Precedence: attributes OR (force_dict AND not a simple type).
            if data.attributes or self.force_dict and not xsd_type.is_simple():
                result_dict.update(t for t in self.map_attributes(data.attributes))
                if data.text is not None and data.text != '':
                    result_dict[self.text_key] = data.text
                return result_dict
            else:
                # No attributes: return the bare text, or None for empty text.
                return data.text if data.text != '' else None
        else:
#            has_single_group = xsd_type.content_type.is_single()
            list_types = list if self.list is list else (self.list, list)
            dict_types = dict if self.dict is dict else (self.dict, dict)
            if data.content:
                for name, value, xsd_child in self.map_content(data.content):
                    # Wildcard (xs:any) children are stored directly by name.
                    if isinstance(xsd_child, Xsd11AnyElement):
                        result_dict.update({name:value})
                        continue
                    try:
                        # Once result_dict has been turned into a list (see the
                        # KeyError branch), every later child takes this path.
                        if isinstance(result_dict, list_types):
                            result = result_dict
                        else:
                            result = result_dict[name]
                    except KeyError:
                        # First occurrence of `name`.
                        if xsd_child is not None and not xsd_child.is_single():
                            # Repeatable child: the whole result becomes a list
                            # of single-key dicts; anything stored in the dict
                            # so far is discarded by this rebinding.
                            result_dict = [{name:value}]
                        else:
                            result_dict[name] = self.list([value]) if self.force_list else value
                    else:
                        if isinstance(result, dict_types):
                            # Second occurrence of a dict-valued child.
                            result_dict[name] = self.list([result, value])
                        elif isinstance(result, list_types) or not result:
                            # NOTE(review): `.append` assumes result_dict is a
                            # list here; if it is still a dict and `result` is
                            # merely falsy this would raise -- confirm.
                            result_dict.append({name:value})
                        else:
                            result.append(value)

            elif data.content is None and data.text is not None and data.text != '':
                # Text-only element: the result is the text itself.
                result_dict = data.text

            elif data.text is not None and data.text != '':
                result_dict[self.text_key] = data.text

            if data.attributes:
                # NOTE(review): `.update` assumes result_dict is still a dict;
                # it may have been rebound to a list or str above -- confirm.
                result_dict.update(t for t in self.map_attributes(data.attributes))

            if level == 0 and self.preserve_root:
                return self.dict(
                    [(self.map_qname(data.tag), result_dict if result_dict else None)]
                )
            return result_dict if result_dict else None

    def element_encode(self, obj, xsd_element, level=0):
        """
        Extracts XML decoded data from a data structure for encoding into an ElementTree.
        :param obj: the decoded object.
        :param xsd_element: the `XsdElement` associated to the decoded data structure.
        :param level: the level related to the encoding process (0 means the root).
        :return: an ElementData instance.
        """
        if level != 0:
            tag = xsd_element.name
        elif not self.preserve_root:
            tag = xsd_element.qualified_name
        else:
            tag = xsd_element.qualified_name
            try:
                # NOTE(review): the fallback passed to `.get` is the local
                # name string itself, not a lookup by local name -- confirm.
                obj = obj.get(tag, xsd_element.local_name)
            except (KeyError, AttributeError, TypeError):
                pass

        # Non-dict objects are emitted either as text or as raw content.
        if not isinstance(obj, (self.dict, dict)):
            if xsd_element.type.is_simple() or xsd_element.type.has_simple_content():
                return ElementData(tag, obj, None, {})
            elif xsd_element.type.mixed and not isinstance(obj, list):
                return ElementData(tag, obj, None, {})
            else:
                return ElementData(tag, None, obj, {})

        text = None
        content = []
        attributes = {}

        # Classify every key of the dict: text, character data, namespace
        # declaration, attribute, or child element content.
        for name, value in obj.items():
            if name == self.text_key and self.text_key:
                text = obj[self.text_key]
            # Precedence: (prefix set AND matches) OR (digit AND prefix == '').
            elif (self.cdata_prefix and name.startswith(self.cdata_prefix)) or \
                    name[0].isdigit() and self.cdata_prefix == '':
                index = int(name[len(self.cdata_prefix):])
                content.append((index, value))
            elif name == self.ns_prefix:
                # Default namespace declaration.
                self[''] = value
            elif name.startswith('%s:' % self.ns_prefix):
                if not self.strip_namespaces:
                    self[name[len(self.ns_prefix) + 1:]] = value
            elif self.attr_prefix and name.startswith(self.attr_prefix):
                attr_name = name[len(self.attr_prefix):]
                ns_name = self.unmap_qname(attr_name, xsd_element.attributes)
                attributes[ns_name] = value
            elif not isinstance(value, (self.list, list)) or not value:
                # Scalar, dict, or empty list: a single child entry.
                content.append((self.unmap_qname(name), value))
            elif isinstance(value[0], (self.dict, dict, self.list, list)):
                # List of containers: one child entry per item.
                ns_name = self.unmap_qname(name)
                content.extend((ns_name, item) for item in value)
            else:
                # List of scalars: ask the schema whether the matching child
                # is an xs:list type (keep the list whole) or a repeated
                # element (one entry per item).
                ns_name = self.unmap_qname(name)
                for xsd_child in xsd_element.type.content_type.iter_elements():
                    matched_element = xsd_child.match(ns_name, resolve=True)
                    if matched_element is not None:
                        if matched_element.type.is_list():
                            content.append((ns_name, value))
                        else:
                            content.extend((ns_name, item) for item in value)
                        break
                else:
                    # No child element matched: with an empty attr_prefix the
                    # key may name an attribute; otherwise keep it as content.
                    if self.attr_prefix == '' and ns_name not in attributes:
                        for key, xsd_attribute in xsd_element.attributes.items():
                            if xsd_attribute.is_matching(ns_name):
                                attributes[key] = value
                                break
                        else:
                            content.append((ns_name, value))
                    else:
                        content.append((ns_name, value))

        return ElementData(tag, text, content, attributes)

element_decode(data, xsd_element, xsd_type=None, level=0)

Converts a decoded element data to a data structure. :param data: ElementData instance decoded from an Element node. :param xsd_element: the XsdElement associated to decoded the data. :param xsd_type: optional XsdType for supporting dynamic type through xsi:type or xs:alternative. :param level: the level related to the decoding process (0 means the root). :return: a data structure containing the decoded data.

Source code in src/cuemsutils/xml/CMLCuemsConverter.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
    def element_decode(self, data, xsd_element, xsd_type=None, level=0):
        """
        Converts a decoded element data to a data structure.
        :param data: ElementData instance decoded from an Element node.
        :param xsd_element: the `XsdElement` associated to decoded the data.
        :param xsd_type: optional `XsdType` for supporting dynamic type through \
        xsi:type or xs:alternative.
        :param level: the level related to the decoding process (0 means the root).
        :return: a data structure containing the decoded data: a dict, a list of \
        single-key dicts, a text value, or None when the result is empty.
        """
        xsd_type = xsd_type or xsd_element.type
        result_dict = self.dict()
        # At the root of a global element, optionally export the namespace
        # declarations that belong to the schema into the result.
        if level == 0 and xsd_element.is_global() and not self.strip_namespaces and self:
            schema_namespaces = set(xsd_element.namespaces.values())
            result_dict.update(
                ('%s:%s' % (self.ns_prefix, k) if k else self.ns_prefix, v)
                for k, v in self._namespaces.items()
                if v in schema_namespaces
            )

        if xsd_type.is_simple() or xsd_type.has_simple_content():
            # Precedence: attributes OR (force_dict AND not a simple type).
            if data.attributes or self.force_dict and not xsd_type.is_simple():
                result_dict.update(t for t in self.map_attributes(data.attributes))
                if data.text is not None and data.text != '':
                    result_dict[self.text_key] = data.text
                return result_dict
            else:
                # No attributes: return the bare text, or None for empty text.
                return data.text if data.text != '' else None
        else:
#            has_single_group = xsd_type.content_type.is_single()
            list_types = list if self.list is list else (self.list, list)
            dict_types = dict if self.dict is dict else (self.dict, dict)
            if data.content:
                for name, value, xsd_child in self.map_content(data.content):
                    # Wildcard (xs:any) children are stored directly by name.
                    if isinstance(xsd_child, Xsd11AnyElement):
                        result_dict.update({name:value})
                        continue
                    try:
                        # Once result_dict has been turned into a list (see the
                        # KeyError branch), every later child takes this path.
                        if isinstance(result_dict, list_types):
                            result = result_dict
                        else:
                            result = result_dict[name]
                    except KeyError:
                        # First occurrence of `name`.
                        if xsd_child is not None and not xsd_child.is_single():
                            # Repeatable child: the whole result becomes a list
                            # of single-key dicts; anything stored in the dict
                            # so far is discarded by this rebinding.
                            result_dict = [{name:value}]
                        else:
                            result_dict[name] = self.list([value]) if self.force_list else value
                    else:
                        if isinstance(result, dict_types):
                            # Second occurrence of a dict-valued child.
                            result_dict[name] = self.list([result, value])
                        elif isinstance(result, list_types) or not result:
                            # NOTE(review): `.append` assumes result_dict is a
                            # list here; if it is still a dict and `result` is
                            # merely falsy this would raise -- confirm.
                            result_dict.append({name:value})
                        else:
                            result.append(value)

            elif data.content is None and data.text is not None and data.text != '':
                # Text-only element: the result is the text itself.
                result_dict = data.text

            elif data.text is not None and data.text != '':
                result_dict[self.text_key] = data.text

            if data.attributes:
                # NOTE(review): `.update` assumes result_dict is still a dict;
                # it may have been rebound to a list or str above -- confirm.
                result_dict.update(t for t in self.map_attributes(data.attributes))

            if level == 0 and self.preserve_root:
                return self.dict(
                    [(self.map_qname(data.tag), result_dict if result_dict else None)]
                )
            return result_dict if result_dict else None

element_encode(obj, xsd_element, level=0)

Extracts XML decoded data from a data structure for encoding into an ElementTree. :param obj: the decoded object. :param xsd_element: the XsdElement associated to the decoded data structure. :param level: the level related to the encoding process (0 means the root). :return: an ElementData instance.

Source code in src/cuemsutils/xml/CMLCuemsConverter.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
def element_encode(self, obj, xsd_element, level=0):
    """
    Extracts XML decoded data from a data structure for encoding into an ElementTree.
    :param obj: the decoded object.
    :param xsd_element: the `XsdElement` associated to the decoded data structure.
    :param level: the level related to the encoding process (0 means the root).
    :return: an ElementData instance.
    """
    if level != 0:
        tag = xsd_element.name
    elif not self.preserve_root:
        tag = xsd_element.qualified_name
    else:
        tag = xsd_element.qualified_name
        try:
            # NOTE(review): the fallback passed to `.get` is the local
            # name string itself, not a lookup by local name -- confirm.
            obj = obj.get(tag, xsd_element.local_name)
        except (KeyError, AttributeError, TypeError):
            pass

    # Non-dict objects are emitted either as text or as raw content.
    if not isinstance(obj, (self.dict, dict)):
        if xsd_element.type.is_simple() or xsd_element.type.has_simple_content():
            return ElementData(tag, obj, None, {})
        elif xsd_element.type.mixed and not isinstance(obj, list):
            return ElementData(tag, obj, None, {})
        else:
            return ElementData(tag, None, obj, {})

    text = None
    content = []
    attributes = {}

    # Classify every key of the dict: text, character data, namespace
    # declaration, attribute, or child element content.
    for name, value in obj.items():
        if name == self.text_key and self.text_key:
            text = obj[self.text_key]
        # Precedence: (prefix set AND matches) OR (digit AND prefix == '').
        elif (self.cdata_prefix and name.startswith(self.cdata_prefix)) or \
                name[0].isdigit() and self.cdata_prefix == '':
            index = int(name[len(self.cdata_prefix):])
            content.append((index, value))
        elif name == self.ns_prefix:
            # Default namespace declaration.
            self[''] = value
        elif name.startswith('%s:' % self.ns_prefix):
            if not self.strip_namespaces:
                self[name[len(self.ns_prefix) + 1:]] = value
        elif self.attr_prefix and name.startswith(self.attr_prefix):
            attr_name = name[len(self.attr_prefix):]
            ns_name = self.unmap_qname(attr_name, xsd_element.attributes)
            attributes[ns_name] = value
        elif not isinstance(value, (self.list, list)) or not value:
            # Scalar, dict, or empty list: a single child entry.
            content.append((self.unmap_qname(name), value))
        elif isinstance(value[0], (self.dict, dict, self.list, list)):
            # List of containers: one child entry per item.
            ns_name = self.unmap_qname(name)
            content.extend((ns_name, item) for item in value)
        else:
            # List of scalars: ask the schema whether the matching child
            # is an xs:list type (keep the list whole) or a repeated
            # element (one entry per item).
            ns_name = self.unmap_qname(name)
            for xsd_child in xsd_element.type.content_type.iter_elements():
                matched_element = xsd_child.match(ns_name, resolve=True)
                if matched_element is not None:
                    if matched_element.type.is_list():
                        content.append((ns_name, value))
                    else:
                        content.extend((ns_name, item) for item in value)
                    break
            else:
                # No child element matched: with an empty attr_prefix the
                # key may name an attribute; otherwise keep it as content.
                if self.attr_prefix == '' and ns_name not in attributes:
                    for key, xsd_attribute in xsd_element.attributes.items():
                        if xsd_attribute.is_matching(ns_name):
                            attributes[key] = value
                            break
                    else:
                        content.append((ns_name, value))
                else:
                    content.append((ns_name, value))

    return ElementData(tag, text, content, attributes)

fade_profileParser

Bases: GenericParser

Parse a single fade_profile element into a :class:FadeProfile.

Source code in src/cuemsutils/xml/Parsers.py
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
class fade_profileParser(GenericParser):
    """Build a :class:`FadeProfile` from the dict of one ``fade_profile`` element."""

    def parse(self):
        """Convert every entry of ``self.init_dict`` and wrap the result.

        ``parameters`` goes through ``_normalize_fade_parameters``; dict and
        list values are delegated to the parser class returned by
        ``get_parser_class``; everything else goes through ``str_to_value``.
        """
        parsed = {}
        for key, raw in self.init_dict.items():
            if key == 'parameters':
                parsed['parameters'] = _normalize_fade_parameters(raw)
                continue
            if not isinstance(raw, (dict, list)):
                # Plain scalar value.
                parsed[key] = self.str_to_value(raw)
                continue
            # Nested structure: resolve the sub-parser once, then apply it to
            # the single dict or to each list item.
            parser_cls, class_name = self.get_parser_class(key)
            if isinstance(raw, dict):
                parsed[key] = parser_cls(
                    init_dict=raw, class_string=class_name
                ).parse()
            else:
                children = []
                for entry in raw:
                    children.append(
                        parser_cls(init_dict=entry, class_string=class_name).parse()
                    )
                parsed[key] = children
        return FadeProfile(parsed)

fade_profilesParser

Bases: GenericParser

Parse fade_profiles wrapper content into a list of :class:FadeProfile.

Source code in src/cuemsutils/xml/Parsers.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
class fade_profilesParser(GenericParser):
    """Turn the content of a ``fade_profiles`` wrapper into :class:`FadeProfile` objects."""

    def parse(self):
        """Return a list of :class:`FadeProfile`, empty when nothing usable is present.

        A single ``fade_profile`` entry is treated as a one-item list; entries
        that already are ``FadeProfile`` instances are passed through as-is.
        """
        source = self.init_dict
        if not (source and isinstance(source, dict)):
            return []

        entries = source.get('fade_profile')
        if entries is None:
            return []
        if not isinstance(entries, list):
            entries = [entries]

        profiles = []
        for entry in entries:
            if not isinstance(entry, FadeProfile):
                entry = fade_profileParser(
                    init_dict=entry, class_string='fade_profile'
                ).parse()
            profiles.append(entry)
        return profiles

Note: for the moment, this code works with xmlschema 1.1.2 (install it with `pip3 install xmlschema==1.1.2`).

NetworkMap

Bases: Settings

NetworkMap class that extends Settings to handle network map operations.

Source code in src/cuemsutils/xml/Settings.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class NetworkMap(Settings):
    """
    NetworkMap class that extends Settings to handle network map operations.

    Uses the ``network_map`` schema by default and the ``CuemsNetworkMap``
    root tag. ``main_key`` defaults to ``''`` unless it is already set.
    """
    def __init__(self, xmlfile, schema_name = 'network_map', **kwargs):
        """
        :param xmlfile: path of the network map XML file.
        :param schema_name: name of the schema used to read the file.
        :param kwargs: forwarded to ``Settings.__init__``.
        """
        # Must be set before Settings.__init__ runs, which would otherwise
        # install its own default main_key.
        if not hasattr(self, 'main_key'):
            self.main_key = ''
        super().__init__(
            xmlfile,
            schema_name,
            xml_root_tag='CuemsNetworkMap',
            **kwargs
        )

    def get_node(self, uuid):
        """
        Return the ``node`` mapping whose ``uuid`` equals *uuid*.

        :param uuid: uuid value to look for.
        :return: the matching node dict.
        :raises ValueError: if no node matches. This includes the cases where
            the map has no ``node_list`` or an entry has no ``node`` mapping,
            which previously surfaced as TypeError/AttributeError.
        """
        # `or []` covers both a missing key and an explicit empty/None value,
        # mirroring the defensive lookup in get_nodes_by_adoption.
        nodes_list = self.get_dict().get('node_list') or []
        for node_item in nodes_list:
            node = node_item.get('node') if isinstance(node_item, dict) else None
            # Skip malformed or empty entries instead of crashing on them.
            if isinstance(node, dict) and node and node.get('uuid') == uuid:
                return node
        raise ValueError(f'Node with uuid {uuid} not found')

    @staticmethod
    def get_nodes_by_adoption(network_map_dict: dict[str, Any]) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
        """
        Split the entries of ``node_list`` into adopted and not-adopted nodes.

        The ``online`` and ``adopted`` fields of every node are converted with
        ``strtobool`` and written back in place, so the input dict is mutated.
        Entries without a ``node`` key are ignored.

        :param network_map_dict: network map dict holding a ``node_list``.
        :return: ``(adopted_entries, not_adopted_entries)``; each entry keeps
            its ``{'node': ...}`` wrapper.
        :raises ValueError: if the dict or its ``node_list`` is empty/missing.
        """
        nodes = []
        new_nodes = []

        if not network_map_dict:
            raise ValueError('No network map dictionary found')
        node_list = network_map_dict.get('node_list', [])
        if not node_list:
            raise ValueError('No node list found in network map dictionary')
        for node_item in node_list:
            if 'node' in node_item:
                node = node_item['node']
                # Convert boolean strings directly in the node_item structure.
                # NOTE(review): assumes strtobool accepts the stored values,
                # including on a second call when they are already converted.
                node['online'] = strtobool(node.get('online', 'False'))
                node['adopted'] = strtobool(node.get('adopted', 'False'))

                # Append the node_item directly (it already has the wrapper structure)
                if node['adopted']:
                    nodes.append(node_item)
                else:
                    new_nodes.append(node_item)

        return nodes, new_nodes

ProjectMappings

Bases: Settings

Mappings class that extends Settings to handle hardware mappings operations.

Source code in src/cuemsutils/xml/Settings.py
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
class ProjectMappings(Settings):
    """
    Settings subclass for the project hardware mappings file.

    Reads with the ``project_mappings`` schema by default, uses the
    ``CuemsProjectMappings`` root tag, and validates video output templates
    after each read.
    """

    # Tolerance for float round-trip noise; mirrors cues.CueOutput._CONTAINMENT_EPS.
    _CONTAINMENT_EPS = 1e-6

    def __init__(self, xmlfile, schema_name = 'project_mappings', **kwargs):
        """Set the default ``main_key`` and delegate to ``Settings.__init__``."""
        if not hasattr(self, 'main_key'):
            self.main_key = ''
        super().__init__(
            xmlfile, schema_name, xml_root_tag='CuemsProjectMappings', **kwargs
        )

    def get_node(self, uuid):
        """Return the node dict under ``processed['nodes']`` with this uuid.

        :raises ValueError: if no node carries the given uuid.
        """
        found = None
        for wrapper in self.processed['nodes']: # type: ignore[index]
            candidate = wrapper['node']
            if candidate['uuid'] == uuid:
                found = candidate
                break
        if found:
            return found
        raise ValueError(f'Node with uuid {uuid} not found')

    def process_xml_dict(self):
        """Post-read hook: cache the dict and run the semantic validation."""
        self.processed = self.get_dict()
        self._validate_custom_templates()

    def _validate_custom_templates(self):
        """Apply the video-output rules that the XSD cannot express.

        - canvas_region containment: x+width <= 1 and y+height <= 1.
        - No more than one custom template (output with a canvas_region)
          per node.

        The one-per-node limit is a V1 constraint; docs/canvas_region.md
        (Deferred -- Multiple customs per cue / per node) describes how it is
        meant to be lifted.

        Scope note: at the mappings level canvas_region is a UI-template
        hint. It offers the editor's output-picker a named custom slot with
        a default starting rectangle. It does NOT describe the physical
        monitor layout (that comes from videocomposer's DRM detection) and
        it does NOT describe per-cue output regions (those live in
        script.xsd's VideoCueOutput.canvas_region; see
        cuemsutils.cues.CueOutput).

        :raises ValueError: on a containment violation or more than one
            custom template in a node.
        """
        for section in ('nodes', 'new_nodes'):
            wrappers = self.processed.get(section, []) or []
            for node_wrap in wrappers:
                node = node_wrap.get('node') if isinstance(node_wrap, dict) else None
                video = node.get('video') if node else None
                if not video:
                    # Nothing to validate: malformed entry or no video section.
                    continue
                uuid = node.get('uuid', '<unknown>')
                template_count = 0
                for video_group in video:
                    if not isinstance(video_group, dict):
                        continue
                    for output_wrap in video_group.get('outputs', []) or []:
                        if isinstance(output_wrap, dict):
                            output = output_wrap.get('output')
                        else:
                            output = None
                        region = output.get('canvas_region') if output else None
                        if region is not None:
                            self._check_region_containment(region, uuid)
                            template_count += 1
                if template_count > 1:
                    raise ValueError(
                        f"Node {uuid} has {template_count} custom templates "
                        f"(canvas_region entries); at most 1 is allowed"
                    )

    def _check_region_containment(self, region, uuid):
        """Check only the sums; per-component ranges are left to the XSD.

        :raises ValueError: if x+width or y+height exceeds 1 beyond the
            ``_CONTAINMENT_EPS`` tolerance.
        """
        x, y, w, h = (
            float(region.get(field, 0))
            for field in ('x', 'y', 'width', 'height')
        )
        limit = 1.0 + self._CONTAINMENT_EPS
        if x + w > limit:
            raise ValueError(
                f"Node {uuid} canvas_region x+width must be <= 1, got {x + w}"
            )
        if y + h > limit:
            raise ValueError(
                f"Node {uuid} canvas_region y+height must be <= 1, got {y + h}"
            )

    def process_network_mappings(self, mappings):
        '''Temporary reshaping step, used instead of reworking the XML read and object conversion.'''
        # The converter output does not have the intended structure yet, so
        # for now it is corrected here: each list-valued section becomes a
        # dict of flat value lists.
        Logger.info(f'Processing network mappings: {mappings}')

        reshaped_nodes = []
        for wrapper in mappings['nodes']:
            reshaped = {}
            for section, contents in wrapper['node'].items():
                if isinstance(contents, list):
                    grouped = {}
                    for item in contents:
                        for key, values in item.items():
                            collected = []
                            # Empty/None values leave an empty list behind.
                            for elem in values or ():
                                collected.extend(elem.values())
                            grouped[key] = collected
                    reshaped[section] = grouped
                else:
                    reshaped[section] = contents
            reshaped_nodes.append(reshaped)

        mappings['nodes'] = reshaped_nodes
        return mappings

process_network_mappings(mappings)

Temporary process instead of reviewing xml read and convert to objects

Source code in src/cuemsutils/xml/Settings.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
def process_network_mappings(self, mappings):
    '''Temporary reshaping step, used instead of reworking the XML read and object conversion.'''
    # The converter output does not have the intended structure yet, so for
    # now it is corrected here: each list-valued section of a node becomes a
    # dict mapping every key to a flat list of the values found below it.
    Logger.info(f'Processing network mappings: {mappings}')

    reshaped_nodes = []
    for wrapper in mappings['nodes']:
        reshaped = {}
        for section, contents in wrapper['node'].items():
            if isinstance(contents, list):
                grouped = {}
                for item in contents:
                    for key, values in item.items():
                        collected = []
                        # Empty/None values leave an empty list behind.
                        for elem in values or ():
                            collected.extend(elem.values())
                        grouped[key] = collected
                reshaped[section] = grouped
            else:
                # Non-list sections are copied through unchanged.
                reshaped[section] = contents
        reshaped_nodes.append(reshaped)

    # The input dict is updated in place and also returned.
    mappings['nodes'] = reshaped_nodes
    return mappings

ProjectSettings

Bases: Settings

ProjectSettings class that extends Settings to handle project settings operations and override system-wide settings.

Source code in src/cuemsutils/xml/Settings.py
273
274
275
276
277
278
279
280
281
282
283
284
285
class ProjectSettings(Settings):
    """
    Settings subclass for per-project settings, which override the
    system-wide settings.

    Defaults: schema ``project_settings``, root tag and ``main_key`` both
    ``CuemsProjectSettings``.
    """
    def __init__(self, xmlfile, schema_name = 'project_settings', **kwargs):
        """Set the default ``main_key`` and delegate to ``Settings.__init__``."""
        # Must happen before Settings.__init__, which only installs its own
        # default when main_key is still missing.
        if not hasattr(self, 'main_key'):
            self.main_key = 'CuemsProjectSettings'
        super().__init__(
            xmlfile, schema_name, xml_root_tag='CuemsProjectSettings', **kwargs
        )

Settings

Bases: XmlReaderWriter

Settings class that extends XmlReaderWriter to handle configuration file operations.

Source code in src/cuemsutils/xml/Settings.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
class Settings(XmlReaderWriter):
    """
    Settings class that extends XmlReaderWriter to handle configuration file operations.

    Instances keep the parsed file content in ``xml_dict`` and expose the
    part stored under ``main_key`` through ``get_dict()``.
    """
    def __init__(self, xmlfile, schema_name = 'settings', **kwargs):
      """
      Set up the reader and load the file right away when possible.

      Args:
          xmlfile: Settings file, forwarded to the parent constructor.
          schema_name: Schema name forwarded to the parent constructor.
          **kwargs: Extra keyword arguments for the parent constructor.
              ``xml_root_tag`` defaults to "CuemsSettings" when absent.
      """
      # Only supply the default root tag when the caller did not pass one.
      if 'xml_root_tag' not in kwargs:
        kwargs['xml_root_tag'] = "CuemsSettings"
      super().__init__(
          schema_name = schema_name,
          xmlfile = xmlfile,
          **kwargs
      )
      # Keep a main_key that was already set on the instance before this
      # constructor ran; fall back to 'Settings' otherwise.
      if not hasattr(self, 'main_key'):
        self.main_key = 'Settings'
      self.xml_dict = {}
      self.processed = {}
      self.loaded = False

      # NOTE(review): self.schema and self.xmlfile are presumably set by the
      # parent constructor; the file is read only when both are available.
      if self.schema is not None and self.xmlfile is not None:
          self.read()

    def get_dict(self) -> dict[str, Any]:
        """
        Return the settings stored under ``main_key`` as a dict.

        Returns:
            - ``xml_dict`` itself when ``main_key`` is the empty string
              (or ``{}`` if ``xml_dict`` is not a dict);
            - the value under ``main_key`` when that value is a dict;
            - ``{main_key: value}`` when the value is any other non-None object;
            - ``{}`` when the key is missing or its value is None.
        """
        if self.main_key == '':
            return self.xml_dict if isinstance(self.xml_dict, dict) else {}
        value = self.xml_dict.get(self.main_key) # type: ignore[index]
        if isinstance(value, dict):
            return value
        # If main_key value is not a dict (e.g., list), wrap it in a dict
        return {self.main_key: value} if value is not None else {}

    def backup(self):
        """
        Rename the settings file to "<xmlfile>.back".

        The file is renamed, not copied, so the original path no longer
        exists after a successful call. Failures are logged, not raised.
        """
        if os.path.isfile(self.xmlfile):
            Logger.info("File exist")
            try:
                os.rename(self.xmlfile, "{}.back".format(self.xmlfile))
            except OSError:
                Logger.error("Cannot create settings backup")
        else:
            Logger.error("Settings file not found")

    def read(self) -> None:
        """
        Parse the settings file into ``self.xml_dict`` with strict validation.

        Namespaces are stripped and attributes carry no prefix. If the
        instance defines ``process_xml_dict`` it is called after parsing.
        Sets ``self.loaded`` to True at the end.
        """
        self.xml_dict = self.schema_object.to_dict(
            self.xmlfile,
            validation = 'strict',
            dict_class = dict,
            list_class = list,
            strip_namespaces = True,
            attr_prefix = ''
        )
        # Optional post-processing hook; called only if the instance has one.
        if (hasattr(self, 'process_xml_dict')):
            self.process_xml_dict() # type: ignore[attr-defined]
        self.loaded = True

    def data2xml(self, obj):
        """Build an element tree rooted at ``main_key`` from ``obj`` and store it in ``self.xmldata``."""
        xml_tree = ET.Element(self.main_key)
        self.xmldata = self.buildxml(xml_tree, obj)

    def buildxml(self, xml_tree, d): #TODO: clean variable names, simplify?
        """
        Recursively append the content of ``d`` to ``xml_tree``.

        Args:
            xml_tree: Element that receives the generated children/text.
            d: Value to serialise (dict, list, tuple, str, number or other).

        Returns:
            The same ``xml_tree`` element that was passed in.
        """
        if isinstance(d, dict):
            for k, v in d.items():
                # The child element is named according to the key's type.
                if isinstance(k, str):
                    s = ET.SubElement(xml_tree, k)

                # NOTE(review): dicts are unhashable and so cannot normally be
                # dict keys; this branch looks unreachable - confirm.
                elif isinstance(k, (dict)):
                    s = ET.SubElement(xml_tree, type(k).__name__)
                    s.text = str(k)
                # Numeric key: element is named after the value's type and
                # the key is kept in an "id" attribute.
                elif isinstance(k, (int, float)):
                    s = ET.SubElement(xml_tree, type(v).__name__, id=str(k))
                    if not isinstance(v, dict):
                        s.text =str(v)
                else:
                    s = ET.SubElement(xml_tree, type(k).__name__)

                # Values of any other type are not recursed into, so their
                # element gets no further content from this call.
                if isinstance(v, (type(None), CTimecode, dict, list, tuple, int, float, str)): #TODO: filter without using explicit classes (like CTimecode)
                    self.buildxml(s, v)
        elif isinstance(d, tuple) or isinstance(d, list):
            # Each sequence item gets a child named after the item's type.
            for v in d:
                s = ET.SubElement(xml_tree, type(v).__name__)
                self.buildxml(s, v)
        elif isinstance(d, str):
            xml_tree.text = d
        elif isinstance(d, (float, int)):
      #  elif type(d) is int:
            xml_tree.text = str(d)
        else:
            # Anything else (including None) becomes a child named after its
            # type whose text is str(d).
            s = ET.SubElement(xml_tree, type(d).__name__)
            self.buildxml(s, str(d))
        return xml_tree

For the moment, it works with xmlschema version 1.2.2 (install it with `pip3 install xmlschema==1.2.2`).

XmlReaderWriter

Bases: CuemsXml

Source code in src/cuemsutils/xml/XmlReaderWriter.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class XmlReaderWriter(CuemsXml):
    """Read, validate and write XML documents through the instance's schema object."""

    def write(self, xml_data: ElementTree):
        """Validate ``xml_data`` against the schema, then write it to ``self.xmlfile``."""
        self.schema_object.validate(xml_data)
        xml_data.write(self.xmlfile, encoding="utf-8", xml_declaration=True)

    def write_from_dict(self, project_dict):
        """Parse ``project_dict`` into a project object and write it to the XML file."""
        parsed_project = CuemsParser(project_dict).parse()
        self.write_from_object(parsed_project)

    def build_xml_from_object(self, project_object):
        """Return the XML data produced by XmlBuilder for ``project_object``."""
        builder = XmlBuilder(
            project_object,
            namespace=self.namespace,
            xsd_path=self.schema,
            xml_root_tag=self.xml_root_tag,
        )
        return builder.build()

    def write_from_object(self, project_object):
        """Build XML data from ``project_object`` and write it to the XML file."""
        built_tree = self.build_xml_from_object(project_object)
        self.write(built_tree)

    def validate_object(self, project_object):
        """Build XML data from ``project_object`` and return the schema validation result."""
        built_tree = self.build_xml_from_object(project_object)
        validator = self.schema_object
        return validator.validate(built_tree)

    def read(self, **kwargs):
        """
        Return the XML file decoded to a dict with strict validation.

        Namespaces are kept (``strip_namespaces=False``); any extra keyword
        arguments are forwarded to the schema object's ``to_dict``.
        """
        return self.schema_object.to_dict(
            self.xmlfile, validation='strict', strip_namespaces=False, **kwargs
        )

    def read_to_objects(self):
        """Read the XML file and return the result of parsing it with CuemsParser."""
        decoded = self.read()
        return CuemsParser(decoded).parse()

build_xml_from_object(project_object)

Build XML data from a project object

Source code in src/cuemsutils/xml/XmlReaderWriter.py
69
70
71
72
73
74
75
76
77
def build_xml_from_object(self, project_object):
    """Return the XML data produced by XmlBuilder for ``project_object``.

    The builder receives the instance's namespace, schema path and root tag.
    """
    builder = XmlBuilder(
        project_object,
        namespace=self.namespace,
        xsd_path=self.schema,
        xml_root_tag=self.xml_root_tag,
    )
    return builder.build()

validate_object(project_object)

Validate a project object against the schema

Source code in src/cuemsutils/xml/XmlReaderWriter.py
84
85
86
87
def validate_object(self, project_object):
    """Build XML data from ``project_object`` and return the schema validation result."""
    built_tree = self.build_xml_from_object(project_object)
    validator = self.schema_object
    return validator.validate(built_tree)

write_from_object(project_object)

Write a project object to an XML file

Source code in src/cuemsutils/xml/XmlReaderWriter.py
79
80
81
82
def write_from_object(self, project_object):
    """Build XML data from ``project_object`` and hand it to ``self.write``."""
    built_tree = self.build_xml_from_object(project_object)
    self.write(built_tree)

get_pkg_schema(schema_name)

Get the schema file from package resources

Source code in src/cuemsutils/xml/XmlReaderWriter.py
13
14
15
16
17
18
19
20
21
22
@logged
def get_pkg_schema(schema_name: str):
    """Return the path of a schema file shipped in the package's 'schemas' directory.

    The '.xsd' extension is appended to ``schema_name`` when it is missing.

    Raises:
        FileNotFoundError: If the resulting path is not an existing file.
    """
    if not schema_name.endswith('.xsd'):
        schema_name += '.xsd'
    schema = path.join(path.dirname(__file__), 'schemas', schema_name)
    if path.isfile(schema):
        return schema
    raise FileNotFoundError(f"Schema file {schema_name} not found")