robocop

Checks KYC attributes against sanction lists
Log | Files | Refs | Submodules | README | LICENSE

robocop-eu-to-json (6570B)


      1 #!/usr/bin/env python3
      2 # -*- coding: utf-8 -*-
      3 #
      4 # robocop-eu-to-json
      5 #
      6 # Copyright (C) 2025 Taler Systems SA
      7 #
      8 # This program is free software: you can redistribute it and/or modify
      9 # it under the terms of the GNU General Public License as published by
     10 # the Free Software Foundation, either version 3 of the License, or
     11 # (at your option) any later version.
     12 #
     13 # This program is distributed in the hope that it will be useful,
     14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     16 # GNU General Public License for more details.
     17 #
     18 # You should have received a copy of the GNU General Public License
     19 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
     20 """
     21 EU Consolidated Financial Sanctions List XML to robocop JSON converter.
     22 
     23 Converts the EU's consolidated list (the <export>/<sanctionEntity> schema served
     24 by the European Commission FISMA "FSD" download, default namespace
     25 http://eu.europa.ec/fpi/fsd/export) into robocop's internal JSON format: a flat
     26 JSON array of self-contained target records keyed by a string "ssid", with the
     27 same registry field names emitted by robocop-ch-to-json (PERSON_FIRST_NAMES,
     28 PERSON_LAST_NAME, FULL_NAME, DATE_OF_BIRTH, NATIONALITY, PERSON_NATIONAL_ID,
     29 COMPANY_NAME, ADDRESS_* / REGISTERED_OFFICE_ADDRESS_*, ...).
     30 
     31 Each record's ssid is namespaced "EU-<logicalId>" so records stay unique when
     32 several authorities' lists are combined.
     33 
     34 Usage:
     35     robocop-eu-to-json < eu.xml | robocop-json-postprocess > eu.json
     36 """
     37 
     38 import xml.etree.ElementTree as ET
     39 import json
     40 import sys
     41 import argparse
     42 
     43 
     44 def strip_ns(root):
     45     """Drop XML namespaces so elements can be matched by their local name."""
     46     for el in root.iter():
     47         if isinstance(el.tag, str) and "}" in el.tag:
     48             el.tag = el.tag.split("}", 1)[1]
     49     return root
     50 
     51 
     52 def add(rec, key, value):
     53     """Append a non-empty, stripped string value to a list field."""
     54     if value is None:
     55         return
     56     value = value.strip()
     57     if not value:
     58         return
     59     rec.setdefault(key, []).append(value)
     60 
     61 
     62 def dedupe(rec):
     63     """Remove duplicate values from every list field, preserving order."""
     64     for key, val in rec.items():
     65         if isinstance(val, list):
     66             seen = set()
     67             rec[key] = [x for x in val if not (x in seen or seen.add(x))]
     68     return rec
     69 
     70 
     71 def convert(root):
     72     targets = []
     73     for ent in root.findall("sanctionEntity"):
     74         logical_id = ent.get("logicalId") or ent.get("euReferenceNumber") or ""
     75         rec = {
     76             "ssid": "EU-{}".format(logical_id),
     77             "foreign_identifier": ent.get("euReferenceNumber") or None,
     78             "united_nation_id": ent.get("unitedNationId") or None,
     79             "justification": [],
     80             "other_information": [],
     81         }
     82 
     83         subject = ent.find("subjectType")
     84         code = subject.get("code") if subject is not None else None
     85         is_entity = code in ("enterprise", "vessel", "ship", "aircraft")
     86         rec["target_type"] = "entity" if is_entity else (
     87             "individual" if code == "person" else "other")
     88 
     89         addr_prefix = "REGISTERED_OFFICE_ADDRESS_" if is_entity else "ADDRESS_"
     90 
     91         # Names (one <nameAlias> per spelling / alias).
     92         for na in ent.findall("nameAlias"):
     93             first = na.get("firstName") or ""
     94             middle = na.get("middleName") or ""
     95             last = na.get("lastName") or ""
     96             whole = na.get("wholeName") or ""
     97             gender = na.get("gender") or ""
     98             if gender and "sex" not in rec:
     99                 rec["sex"] = {"M": "male", "F": "female"}.get(gender, gender)
    100             if not whole:
    101                 whole = " ".join(p for p in (first, middle, last) if p)
    102             if is_entity:
    103                 add(rec, "COMPANY_NAME", whole)
    104                 add(rec, "BUSINESS_DISPLAY_NAME", whole)
    105             else:
    106                 add(rec, "PERSON_FIRST_NAMES", first)
    107                 add(rec, "PERSON_FIRST_NAMES", middle)
    108                 add(rec, "PERSON_LAST_NAME", last)
    109                 add(rec, "FULL_NAME", whole)
    110             add(rec, "other_information", na.get("function"))
    111 
    112         # Citizenship -> nationality.
    113         for cit in ent.findall("citizenship"):
    114             add(rec, "NATIONALITY", cit.get("countryIso2Code"))
    115 
    116         # Birth dates and places.
    117         for bd in ent.findall("birthdate"):
    118             iso = bd.get("birthdate")
    119             if iso:
    120                 add(rec, "DATE_OF_BIRTH", iso)
    121             elif bd.get("year"):
    122                 add(rec, "DATE_OF_BIRTH", bd.get("year"))
    123             pob = ", ".join(p for p in (bd.get("city"), bd.get("countryDescription")) if p)
    124             if pob:
    125                 add(rec, "other_information", "Place of birth: " + pob)
    126 
    127         # Addresses.
    128         for ad in ent.findall("address"):
    129             line = ", ".join(p for p in (ad.get("street"), ad.get("poBox"),
    130                                          ad.get("place")) if p)
    131             add(rec, addr_prefix + "LINES", line)
    132             add(rec, addr_prefix + "ZIPCODE", ad.get("zipCode"))
    133             add(rec, addr_prefix + "TOWN_LOCATION", ad.get("city"))
    134             add(rec, addr_prefix + "COUNTRY_SUBDIVISION", ad.get("region"))
    135             add(rec, addr_prefix + "COUNTRY", ad.get("countryIso2Code"))
    136 
    137         # Identification documents.
    138         for ident in ent.findall("identification"):
    139             number = ident.get("number") or ident.get("latinNumber")
    140             if number:
    141                 add(rec, "PERSON_NATIONAL_ID", number)
    142 
    143         # Remarks / statement of reasons.
    144         for rmk in ent.findall("remark"):
    145             add(rec, "justification", rmk.text)
    146 
    147         targets.append(dedupe(rec))
    148     return targets
    149 
    150 
    151 def main():
    152     parser = argparse.ArgumentParser(
    153         description="Convert the EU consolidated sanctions list (XML) to robocop JSON")
    154     parser.add_argument("--input", help="Input XML file (default: stdin)")
    155     parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)")
    156     parser.add_argument("--indent", type=int, default=2)
    157     args = parser.parse_args()
    158 
    159     tree = ET.parse(args.input) if args.input else ET.parse(sys.stdin)
    160     root = strip_ns(tree.getroot())
    161     targets = convert(root)
    162 
    163     out = open(args.output, "w", encoding="utf-8") if args.output else sys.stdout
    164     json.dump(targets, out, indent=args.indent, ensure_ascii=False)
    165     if args.output:
    166         out.close()
    167     print("EU: converted {} targets".format(len(targets)), file=sys.stderr)
    168 
    169 
# Run the converter only when this file is executed as a script.
if __name__ == "__main__":
    main()