# Source code for ocx_schema_parser.transformer

#  Copyright (c) 2023-2025. OCX Consortium https://3docx.org. See the LICENSE
"""Schema transformer"""

# System imports

from collections import defaultdict
from pathlib import Path
from typing import Dict, Iterator, List, Union

# Third party imports
from loguru import logger
from lxml.etree import Element, QName

# Application imports
from ocx_schema_parser.data_classes import (
    OcxEnumerator,
    OcxSchemaAttribute,
    OcxSchemaChild,
    SchemaAttribute,
)
from ocx_schema_parser.elements import OcxGlobalElement
from ocx_schema_parser.helpers import SchemaHelper
from ocx_schema_parser.ocxdownloader.downloader import SchemaDownloader
from ocx_schema_parser.ocxparser import OcxParser
from ocx_schema_parser.xparse import LxmlElement


def resolve_source(source: str, recursive: bool) -> Iterator[str]:
    """Resolve a schema source into one or more URIs.

    Args:
        source: A remote url (anything containing ``://`` that is not a
            ``file://`` URI), a ``file://`` URI, or a local file or folder path.
        recursive: True to also search sub-folders when ``source`` is a folder,
            False to only look at the folder's direct content.

    Yields:
        ``source`` unchanged when it is a remote url. For a folder, the
        ``file://`` URI of every contained file with the extension ``wsdl``,
        ``xsd``, ``dtd``, ``xml`` or ``json``. Otherwise the ``file://`` URI of
        the resolved path.
    """
    # Function-scope imports: only needed for the ``file://`` branch.
    from urllib.parse import urlparse
    from urllib.request import url2pathname

    if "://" in source and not source.startswith("file://"):
        yield source
        return

    if source.startswith("file://"):
        # ``Path("file:///x")`` is interpreted as a *relative* path named
        # ``file:/x``; convert the URI to a real filesystem path first.
        source = url2pathname(urlparse(source).path)

    path = Path(source).resolve()
    pattern = "**/*" if recursive else "*"
    if path.is_dir():
        for ext in ("wsdl", "xsd", "dtd", "xml", "json"):
            yield from (x.as_uri() for x in path.glob(f"{pattern}.{ext}"))
    else:  # is file
        yield path.as_uri()


def download_schema_from_url(url: str, schema_folder: Path) -> List:
    """Download the schema at ``url`` into ``schema_folder`` before processing.

    Args:
        url: The location of the schema
        schema_folder: The download folder. Will be created if not existing. If existing,
                    any ``.xsd`` files will be deleted.

    Returns:
        The entries of the downloader's ``downloaded`` collection that are not
        ``None``. The list is empty when nothing was recorded.
    """
    schema_folder.mkdir(parents=True, exist_ok=True)
    logger.debug(f"Created download folder: {schema_folder.resolve()}")

    # Start from a clean slate: remove schemas left over from earlier runs
    for stale in schema_folder.glob("*.xsd"):
        logger.debug(f"Deleting file: {stale}")
        stale.unlink()

    fetcher = SchemaDownloader(schema_folder)
    fetcher.wget(url)

    uris = []
    for key in fetcher.downloaded:
        if key is None:
            continue
        uris.append(key)
        logger.debug(f"Downloading from uri: {key}")

    # Purely informational: list what ended up in the folder
    for xsd in schema_folder.glob("*.xsd"):
        logger.debug(f"Downloaded schema file: {xsd}")
    return uris


def filter_ocx(name: str, prefix: str, ocx: OcxGlobalElement) -> bool:
    """Tell whether ``ocx`` has both the given ``name`` and the given ``prefix``."""
    if not name == ocx.get_name():
        # The prefix is only consulted once the name matches
        return False
    return prefix == ocx.get_prefix()


class Transformer:
    """The OCX transformer class.

    Turns the schema elements collected by the ``OcxParser`` into python objects.

    Attributes:
        parser: The instance of the OcxParser
        _ocx_global_elements: Hash table as key-value pairs ``(tag, OcxGlobalElement)``
            for all parsed schema elements
        _schema_enumerators: All schema enumerators, keyed by enumerator name
        _simple_types: All schema simple type elements
        _global_attributes: All global schema attributes
        _is_transformed: True if schema classes are transformed, False otherwise
    """

    def __init__(self):
        self.parser: OcxParser = OcxParser()
        # Hash table with tag as key, value pairs (tag, OcxGlobalElement)
        self._ocx_global_elements: Dict = {}
        self._schema_enumerators: Dict = {}
        self._simple_types: List[SchemaAttribute] = []
        self._global_attributes: List[SchemaAttribute] = []
        self._is_transformed: bool = False

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def transform_schema_from_url(self, url: str, folder: Path) -> bool:
        """Transform the xsd schema with ``url`` into python objects.

        Args:
            url: The remote location of the xsd schema.
            folder: The folder the schemas are downloaded to.

        Returns:
            The transformation status, see ``is_transformed()``.
        """
        if self._transform_schema_from_url(url, folder):
            self._transform_objects()
            self._is_transformed = True
        return self.is_transformed()

    def transform_schema_from_folder(self, folder: Path) -> bool:
        """Transform the xsd schemas in ``folder``.

        Args:
            folder: The folder containing the xsd schemas.

        Returns:
            The transformation status, see ``is_transformed()``.
        """
        if self._transform_schema_in_folder(folder):
            self._transform_objects()
            self._is_transformed = True
        return self.is_transformed()

    def is_transformed(self) -> bool:
        """Return transformation status."""
        return self._is_transformed

    def get_ocx_elements(self) -> List:
        """Return all global OCX instances."""
        return list(self._ocx_global_elements.values())

    def get_ocx_element_with_name(self, name: str) -> Union[OcxGlobalElement, None]:
        """Return the first global OCX instance with name ``name``.

        Returns:
            The OCX instance with ``name``, or None if there is no such instance.
        """
        return next(
            (ocx for ocx in self.get_ocx_elements() if ocx.get_name() == name),
            None,
        )

    def get_ocx_element_from_type(
        self, schema_type: str
    ) -> Union[OcxGlobalElement, None]:
        """Retrieve the global OCX instance matching the schema type.

        Args:
            schema_type: the ocx type on the form ``prefix:name``

        Returns:
            The ``OcxGlobalElement`` instance, or None if no instance matches.
        """
        name = LxmlElement.strip_namespace_prefix(schema_type)
        prefix = LxmlElement.namespace_prefix(schema_type)
        found = None
        # All instances are scanned and the last match wins.
        for candidate in self.get_ocx_elements():
            if filter_ocx(name, prefix, candidate):
                found = candidate
        return found

    def ocx_iterator(self) -> Iterator:
        """Return an iterator of the OCX elements."""
        return iter(self._ocx_global_elements.values())

    def get_enumerators(self) -> Dict:
        """Return all enumeration instances."""
        return self._schema_enumerators

    def get_global_attributes(self) -> List[SchemaAttribute]:
        """Return all global attribute instances."""
        return self._global_attributes

    def get_simple_types(self) -> List:
        """Return all global simpleType instances."""
        return self._simple_types

    def get_enumerator_types(self) -> Dict:
        """Return the schema enumerator types.

        Returns:
            A table with the columns ``Name``, ``prefix`` and ``Tag``, holding
            one entry per enumerator.
        """
        tbl = defaultdict(list)
        for name, enum in self.get_enumerators().items():
            tbl["Name"].append(name)
            tbl["prefix"].append(enum.prefix)
            tbl["Tag"].append(enum.tag)
        return tbl

    # ------------------------------------------------------------------
    # Parsing of schema files
    # ------------------------------------------------------------------

    def _transform_schema_from_url(self, url: str, folder: Path) -> bool:
        """Transform from a schema location given by a remote url.

        The schemas are downloaded to ``folder`` before they are processed.

        Args:
            url: The remote location of the xsd schema.
            folder: The download folder.

        Returns:
            True if something was downloaded and at least one file in
            ``folder`` was processed, False otherwise.
        """
        if not download_schema_from_url(url, folder):
            return False
        # process downloaded schemas
        return self._transform_schema_in_folder(folder)

    def _transform_schema_in_folder(self, location: Path) -> bool:
        """Process all schema files found (recursively) in the folder ``location``.

        Args:
            location: The folder containing the xsd schemas.

        Returns:
            True if at least one file was handed to the parser, False otherwise.
        """
        files = resolve_source(str(location.resolve()), True)
        counter = 0
        for file in files:
            self.parser.process_xsd_from_file(file)
            counter += 1
        return counter > 0

    # ------------------------------------------------------------------
    # Look-up tables
    # ------------------------------------------------------------------

    def _add_global_ocx_element(self, tag: str, element: OcxGlobalElement):
        """Add a global OCX element to the hash table.

        Args:
            tag: The hash key
            element: The global OCX element to add
        """
        self._ocx_global_elements[tag] = element

    def _add_schema_enumerator(self, enum: OcxEnumerator):
        """Add a schema enumerator type, keyed by its name.

        Args:
            enum: Schema enumerator
        """
        self._schema_enumerators[enum.name] = enum

    def _add_global_attribute(self, attrib: SchemaAttribute):
        """Add a schema attribute type.

        Args:
            attrib: Schema attribute
        """
        self._global_attributes.append(attrib)

    # ------------------------------------------------------------------
    # Parent resolution
    # ------------------------------------------------------------------

    def _find_all_my_parents(self, ocx: OcxGlobalElement):
        """Find all the xsd schema parents of a global xsd element (parent, grandparent ...).

        The parents found are added to the ocx instance (child).

        Args:
            ocx: The global ocx instance to search from
        """
        # Start from the unique tag of the global element
        self._find_parents(ocx.get_tag(), ocx)

    def _find_parents(self, child_tag: str, ocx: OcxGlobalElement):
        """Find all ancestors of the global element ``OcxGlobalElement``.

        Walks up the type chain starting at ``child_tag``. Every parent found,
        and any assertion found on it, is added to ``ocx``. The walk stops at
        the first tag that cannot be resolved to an element, a type or a parent.

        Args:
            child_tag: The unique tag of a child
            ocx: The global element (the root to start the search from)
        """
        tag = child_tag
        while True:
            element = self.parser.get_element_from_tag(tag)
            if element is None:
                return
            # The element's type is the parent
            schema_type = SchemaHelper.get_type(element)
            if schema_type is None:
                return
            # Look up the parent element from its type
            parent_tag, parent_element = self.parser.get_element_from_type(
                schema_type
            )
            if parent_tag is None:
                return
            ocx.put_parent(parent_tag, parent_element)
            assertion = LxmlElement.find_assertion(parent_element)
            if assertion is not None:
                ocx.add_assertion(assertion)
            # Continue with the grandparent
            tag = parent_tag

    # ------------------------------------------------------------------
    # Transformation of parsed elements
    # ------------------------------------------------------------------

    def _transform_objects(self) -> None:
        """Transform all parsed elements to python objects."""
        self._transform_elements()
        self._transform_enumerations()
        # Simple types
        for tag in self.parser.get_schema_simple_types():
            self._simple_types.append(self._build_schema_attribute(tag))
        # Global attributes
        for tag in self.parser.get_schema_attribute_types():
            self._add_global_attribute(self._build_schema_attribute(tag))

    def _transform_elements(self) -> None:
        """Create an ``OcxGlobalElement`` for every schema element of type element."""
        for tag in self.parser.get_schema_element_types():
            element = self.parser.get_element_from_tag(tag)
            logger.debug(f"Adding global element {QName(tag).localname}")
            ocx = OcxGlobalElement(element, tag, self.parser._schema_namespaces)
            # store in look-up table
            self._add_global_ocx_element(tag, ocx)
            # Find all parents and add them to the instance
            self._find_all_my_parents(ocx)
            # Process all xs:attribute elements including all supertypes
            self._process_attributes(ocx)
            # Process all children including super type children
            self._process_children(ocx, self.parser.get_substitution_groups())

    def _transform_enumerations(self) -> None:
        """Create an ``OcxEnumerator`` for every schema enumeration."""
        for tag in self.parser.get_schema_enumerations():
            element = self.parser.get_element_from_tag(tag)
            name = LxmlElement.get_name(element)
            prefix = self.parser.get_prefix_from_namespace(QName(tag).namespace)
            enum = OcxEnumerator(name=name, prefix=prefix, tag=tag)
            values = []
            descriptions = []
            for enumeration in LxmlElement.iter(element, "{*}enumeration"):
                values.append(enumeration.get("value"))
                descriptions.append(LxmlElement.get_element_text(enumeration))
            enum.values = values
            enum.descriptions = descriptions
            self._add_schema_enumerator(enum)

    def _build_schema_attribute(self, tag: str) -> SchemaAttribute:
        """Build a ``SchemaAttribute`` from the schema element with the unique ``tag``.

        Args:
            tag: The unique tag of a simple type or global attribute

        Returns:
            The ``SchemaAttribute`` instance
        """
        element = self.parser.get_element_from_tag(tag)
        name = LxmlElement.get_name(element)
        schema_type = SchemaHelper.get_type(element)
        prefix = self.parser.get_prefix_from_namespace(QName(tag).namespace)
        restriction = LxmlElement.get_restriction(element)
        annotation = LxmlElement.get_element_text(element)
        return SchemaAttribute(
            name=name,
            prefix=prefix,
            type=schema_type,
            restriction=restriction,
            description=annotation,
        )

    # ------------------------------------------------------------------
    # Attributes
    # ------------------------------------------------------------------

    def _process_attributes(self, ocx: OcxGlobalElement) -> None:
        """Process all xs:attributes of the global element.

        Attributes are added in this order: the element's own attributes, the
        attributes of its parents, the attributes of the element's attribute
        groups and finally the attributes of the parents' attribute groups.

        Args:
            ocx: The parent OCX element
        """
        ns = ocx.get_namespace()
        # Process all xs:attribute elements including all supertypes
        self._add_attributes_of(ocx, ocx.get_schema_element(), ns)
        parents = ocx.get_parents()
        for t in parents:
            self._add_attributes_of(ocx, parents[t], ns)
        # Process all xs:attributeGroup elements including all supertypes
        self._add_attribute_groups_of(ocx, ocx.get_schema_element(), ns)
        parents = ocx.get_parents()
        for t in parents:
            self._add_attribute_groups_of(ocx, parents[t], ns)

    def _add_attributes_of(
        self, ocx: OcxGlobalElement, schema_element: Element, ns: str
    ) -> None:
        """Add every xs:attribute found in ``schema_element`` to ``ocx``."""
        for a in LxmlElement.find_attributes(schema_element):
            ocx.add_attribute(self._process_attribute(a, ns))

    def _add_attribute_groups_of(
        self, ocx: OcxGlobalElement, schema_element: Element, ns: str
    ) -> None:
        """Add the attributes of every attribute group referenced in ``schema_element`` to ``ocx``."""
        for group in LxmlElement.find_attribute_groups(schema_element):
            # Get the reference
            ref = LxmlElement.get_reference(group)
            if ref is None:
                continue
            _, at_group = self.parser.get_element_from_type(ref)
            if at_group is not None:
                self._add_attributes_of(ocx, at_group, ns)
            else:
                logger.error(
                    f"Attribute group {ref} is not found in the global look-up table"
                )

    def _process_attribute(
        self, xs_attribute: Element, target_ns: str
    ) -> OcxSchemaAttribute:
        """Process an xs:attribute element.

        Arguments:
            xs_attribute: The schema attribute
            target_ns: attribute target namespace

        Returns:
            An instance of the OcxSchemaAttribute
        """
        name = LxmlElement.get_name(xs_attribute)
        schema_type = SchemaHelper.get_type(xs_attribute)
        use = LxmlElement.get_use(xs_attribute)
        fixed = xs_attribute.get("fixed")
        default = xs_attribute.get("default")
        annotation = LxmlElement.get_element_text(xs_attribute)
        prefix = self.parser.get_prefix_from_namespace(target_ns)
        attribute = OcxSchemaAttribute(
            name=name,
            prefix=prefix,
            type=schema_type,
            fixed=fixed,
            use=use,
            default=default,
            description=annotation,
        )
        reference = LxmlElement.get_reference(xs_attribute)
        if reference is not None:
            # Get the referenced element
            _, referenced = self.parser.get_element_from_type(reference)
            if referenced is None:
                # Same handling as for unresolved attribute groups: report and carry on
                logger.error(
                    f"Attribute reference {reference} is not found in the global look-up table"
                )
                return attribute
            attribute.name = LxmlElement.get_name(referenced)
            if attribute.description == "":
                attribute.description = LxmlElement.get_element_text(referenced)
            attribute.type = SchemaHelper.get_type(referenced)
            return attribute
        if attribute.type is None:
            # No declared type: fall back to ``prefix:local-tag`` of the attribute itself
            qn = QName(xs_attribute)
            tag_prefix = self.parser.get_prefix_from_namespace(qn.namespace)
            local_tag = LxmlElement.strip_namespace_tag(xs_attribute.tag)
            attribute.type = f"{tag_prefix}:{local_tag}"
        return attribute

    # ------------------------------------------------------------------
    # Children
    # ------------------------------------------------------------------

    def _process_children(self, ocx: OcxGlobalElement, substitutions: Dict):
        """Process all xs:element of the global element.

        The element's own children are added first, then the children of its
        parents, walking the parents in reverse order.

        Args:
            ocx: The parent OCX element
            substitutions: Substitution groups; maps ``prefix:name`` of an
                element to the tags of the elements to add in its place
        """
        target_ns = ocx.get_namespace()
        # Process all xs:element elements including all supertypes
        self._add_children_of(ocx, ocx.get_schema_element(), target_ns, substitutions)
        # Iterate over parents
        parents = ocx.get_parents()
        for t in reversed(parents):
            self._add_children_of(ocx, parents[t], target_ns, substitutions)

    def _add_children_of(
        self,
        ocx: OcxGlobalElement,
        schema_element: Element,
        target_ns: str,
        substitutions: Dict,
    ) -> None:
        """Add every xs:element child of ``schema_element`` to ``ocx``.

        A child whose ``prefix:name`` is a key in ``substitutions`` is replaced
        by the members of its substitution group; they inherit the child's
        cardinality and choice flag.
        """
        for e in LxmlElement.find_all_children_with_name(schema_element, "element"):
            prefix = self.parser.get_prefix_from_namespace(target_ns)
            name = f"{prefix}:{LxmlElement.get_name(e)}"
            # _process_child already sets cardinality and is_choice from ``e``
            child = self._process_child(e, prefix)
            if name in substitutions:
                for tag in substitutions[name]:
                    element = self.parser.get_element_from_tag(tag)
                    subst = self._process_child(element, prefix)
                    subst.name = LxmlElement.get_name(element)
                    subst.type = SchemaHelper.get_type(element)
                    subst.description = LxmlElement.get_element_text(element)
                    subst.cardinality = child.cardinality
                    subst.is_choice = child.is_choice
                    ocx.add_child(subst)
                    logger.debug(f"{ocx.get_name()}: Adding child {subst.name}")
            else:
                ocx.add_child(child)
                logger.debug(f"{ocx.get_name()}: Adding child {child.name}")

    def _process_child(self, xs_element: Element, prefix: str) -> OcxSchemaChild:
        """Process an xs:element child element.

        Arguments:
            xs_element: The schema element
            prefix: The namespace prefix assigned to the child

        Returns:
            An instance of the child
        """
        name = LxmlElement.get_name(xs_element)
        schema_type = SchemaHelper.get_type(xs_element)
        annotation = LxmlElement.get_element_text(xs_element)
        cardinality = LxmlElement.cardinality_string(xs_element)
        lower, _ = LxmlElement.cardinality(xs_element)
        choice = LxmlElement.is_choice(xs_element)
        # A lower bound of zero makes the child optional
        use = "opt." if lower == 0 else "req."
        child = OcxSchemaChild(
            name=name,
            prefix=prefix,
            type=schema_type,
            use=use,
            description=annotation,
            cardinality=cardinality,
            is_choice=choice,
        )
        reference = LxmlElement.get_reference(xs_element)
        if reference is not None:
            # Get the referenced element
            _, referenced = self.parser.get_element_from_type(reference)
            if referenced is None:
                # Same handling as for unresolved attribute groups: report and carry on
                logger.error(
                    f"Element reference {reference} is not found in the global look-up table"
                )
                return child
            child.name = LxmlElement.get_name(referenced)
            if child.description == "":
                child.description = LxmlElement.get_element_text(referenced)
            child.type = SchemaHelper.get_type(referenced)
        return child