summaryrefslogtreecommitdiff
path: root/codegen/schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'codegen/schema.py')
-rw-r--r--codegen/schema.py171
1 files changed, 171 insertions, 0 deletions
diff --git a/codegen/schema.py b/codegen/schema.py
new file mode 100644
index 00000000..6943e59c
--- /dev/null
+++ b/codegen/schema.py
@@ -0,0 +1,171 @@
+"""Manipulate ISO 20022 schemas"""
+
+import dataclasses
+from dataclasses import dataclass
+import xsd
+
+
+@dataclass
+class Primitive:
+ type: xsd.Kind
+ restriction: dict[str, str]
+
+
+@dataclass
+class Enumeration:
+ values: list[str] | None
+
+
+@dataclass
+class Attribute:
+ type: Primitive | Enumeration
+ required: bool
+
+
+@dataclass
+class Simple:
+ type: Primitive | Enumeration
+ attributes: dict[str, Attribute]
+
+
+@dataclass
+class Property:
+ type: "Type"
+ min: int
+ max: int
+
+
+@dataclass
+class Sequence:
+ properties: dict[str, Property]
+
+
+@dataclass
+class Choice:
+ choices: dict[str, Property]
+
+
+@dataclass
+class Any:
+ pass
+
+
+Type = Primitive | Simple | Enumeration | Sequence | Choice | Any
+
+
+def _json_attribute(attr: Attribute, name: str):
+ if not attr.required:
+ name = f"{name}?"
+
+ return _json(attr.type, name)
+
+
+def _json_property(property: Property, name: str):
+ if property.max != 1:
+ name = f"{name}[{property.min}:{property.max}]"
+ elif property.min == 0:
+ name = f"{name}?"
+
+ return _json(property.type, name)
+
+
+def _json(ty: Type, name: str):
+ """Generate a json object from any type"""
+ if isinstance(ty, Primitive):
+ return (name, dataclasses.asdict(ty))
+ elif isinstance(ty, Simple):
+ (name, type) = _json(ty.type, name)
+ return (
+ name,
+ {
+ "type": type,
+ "attribute": {
+ k: v
+ for k, v in [
+ _json_attribute(attr, name)
+ for name, attr in ty.attributes.items()
+ ]
+ },
+ },
+ )
+ elif isinstance(ty, Enumeration):
+ return (name, ty.values)
+ elif isinstance(ty, Sequence):
+ return (
+ name,
+ {
+ k: v
+ for k, v in [
+ _json_property(property, name)
+ for name, property in ty.properties.items()
+ ]
+ },
+ )
+ elif isinstance(ty, Choice):
+ return (
+ name,
+ {
+ k: v
+ for k, v in [
+ _json_property(property, name)
+ for name, property in ty.choices.items()
+ ]
+ },
+ )
+ elif isinstance(ty, Any):
+ return (name, "any")
+ else:
+ raise Exception(f"unexpected type {ty}")
+
+
+def _build(ty: xsd.Type, types: dict[str, xsd.Type]) -> Type:
+ def _inner_simple(name: str) -> Primitive | Enumeration:
+ inner = types[name]
+ assert isinstance(inner, xsd.Primitive | xsd.Enumeration), f"{ty}"
+ built = _build(inner, types)
+ assert isinstance(built, Primitive | Enumeration)
+ return built
+
+ def _inner(name: str) -> Type:
+ return _build(types[name], types)
+
+ if isinstance(ty, xsd.Primitive):
+ return Primitive(ty.type, ty.restriction)
+ elif isinstance(ty, xsd.Simple):
+ return Simple(
+ _inner_simple(ty.type),
+ {
+ attr.name: Attribute(_inner_simple(attr.type), attr.required)
+ for attr in ty.attributes
+ },
+ )
+ elif isinstance(ty, xsd.Enumeration):
+ return Enumeration(ty.values)
+ elif isinstance(ty, xsd.Sequence):
+ return Sequence(
+ {
+ property.name: Property(
+ _inner(property.type), property.min, property.max
+ )
+ for property in ty.properties
+ }
+ )
+ elif isinstance(ty, xsd.Choice):
+ return Choice(
+ {
+ choice.name: Property(_inner(choice.type), choice.min, choice.max)
+ for choice in ty.choices
+ }
+ )
+ elif isinstance(ty, xsd.Any):
+ return Any()
+ else:
+ raise Exception(f"unexpected type {ty}")
+
+
+def build(schema: str, types: dict[str, xsd.Type]) -> Sequence:
+ """Build a schema tree from a parsed XML schema"""
+ root = types[schema.replace(".", "_")]
+ root = _build(root, types)
+ assert isinstance(root, Sequence)
+ return root