#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# robocop-eu-to-json
#
# Copyright (C) 2025 Taler Systems SA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
EU Consolidated Financial Sanctions List XML to robocop JSON converter.

Converts the EU's consolidated list (the <export>/<sanctionEntity> schema served
by the European Commission FISMA "FSD" download, default namespace
http://eu.europa.ec/fpi/fsd/export) into robocop's internal JSON format: a flat
JSON array of self-contained target records keyed by a string "ssid", with the
same registry field names emitted by robocop-ch-to-json (PERSON_FIRST_NAMES,
PERSON_LAST_NAME, FULL_NAME, DATE_OF_BIRTH, NATIONALITY, PERSON_NATIONAL_ID,
COMPANY_NAME, ADDRESS_* / REGISTERED_OFFICE_ADDRESS_*, ...).

Each record's ssid is namespaced "EU-<logicalId>" so records stay unique when
several authorities' lists are combined.

Usage:
    robocop-eu-to-json < eu.xml | robocop-json-postprocess > eu.json
"""

import xml.etree.ElementTree as ET
import json
import sys
import argparse


# <subjectType code="..."> values that are reported as target_type "entity".
_ENTITY_CODES = ("enterprise", "vessel", "ship", "aircraft")

# Mapping from the nameAlias "gender" attribute to the emitted "sex" value;
# any other non-empty gender value is passed through unchanged.
_GENDER_LABELS = {"M": "male", "F": "female"}


def strip_ns(root):
    """Drop XML namespaces so elements can be matched by their local name.

    Rewrites, in place, the tag of every element below (and including)
    *root* from "{namespace}local" to "local".  Attribute names are not
    touched.  Returns *root* to allow call chaining.
    """
    for element in root.iter():
        tag = element.tag
        # Comments and processing instructions carry a non-string tag.
        if isinstance(tag, str) and "}" in tag:
            element.tag = tag.split("}", 1)[1]
    return root


def add(rec, key, value):
    """Append a non-empty, stripped string value to a list field.

    *value* may be None (missing attribute); None and whitespace-only
    strings are ignored, so the list field is only created in *rec* once
    there is something to store in it.
    """
    text = value.strip() if value is not None else ""
    if text:
        rec.setdefault(key, []).append(text)


def dedupe(rec):
    """Remove duplicate values from every list field, preserving order.

    Non-list fields are left alone.  The record is modified in place and
    also returned.  Only existing keys are reassigned, so the key order of
    *rec* (and hence of the JSON output) does not change.
    """
    for key, val in rec.items():
        if isinstance(val, list):
            # dict.fromkeys keeps the first occurrence of each value in
            # insertion order.
            rec[key] = list(dict.fromkeys(val))
    return rec


def _add_names(rec, ent, is_entity):
    """Record every <nameAlias> (one per spelling / alias) of *ent*."""
    for alias in ent.findall("nameAlias"):
        first = alias.get("firstName") or ""
        middle = alias.get("middleName") or ""
        last = alias.get("lastName") or ""
        whole = alias.get("wholeName") or ""
        gender = alias.get("gender") or ""
        # Only the first alias that states a gender decides "sex".
        if gender and "sex" not in rec:
            rec["sex"] = _GENDER_LABELS.get(gender, gender)
        if not whole:
            # No wholeName given: assemble it from the available parts.
            whole = " ".join(part for part in (first, middle, last) if part)
        if is_entity:
            add(rec, "COMPANY_NAME", whole)
            add(rec, "BUSINESS_DISPLAY_NAME", whole)
        else:
            add(rec, "PERSON_FIRST_NAMES", first)
            add(rec, "PERSON_FIRST_NAMES", middle)
            add(rec, "PERSON_LAST_NAME", last)
            add(rec, "FULL_NAME", whole)
        add(rec, "other_information", alias.get("function"))


def _add_birthdates(rec, ent):
    """Record birth dates and places from the <birthdate> children of *ent*."""
    for birth in ent.findall("birthdate"):
        # Prefer the full "birthdate" attribute, fall back to the bare year.
        when = birth.get("birthdate") or birth.get("year")
        if when:
            add(rec, "DATE_OF_BIRTH", when)
        place = ", ".join(
            part
            for part in (birth.get("city"), birth.get("countryDescription"))
            if part
        )
        if place:
            add(rec, "other_information", "Place of birth: " + place)


def _add_addresses(rec, ent, addr_prefix):
    """Record the <address> children of *ent* under *addr_prefix* fields."""
    for address in ent.findall("address"):
        parts = (address.get("street"), address.get("poBox"),
                 address.get("place"))
        add(rec, addr_prefix + "LINES", ", ".join(p for p in parts if p))
        add(rec, addr_prefix + "ZIPCODE", address.get("zipCode"))
        add(rec, addr_prefix + "TOWN_LOCATION", address.get("city"))
        add(rec, addr_prefix + "COUNTRY_SUBDIVISION", address.get("region"))
        add(rec, addr_prefix + "COUNTRY", address.get("countryIso2Code"))


def _convert_entity(ent):
    """Build one de-duplicated target record from a <sanctionEntity>."""
    logical_id = ent.get("logicalId") or ent.get("euReferenceNumber") or ""
    # NOTE(review): an entry with neither logicalId nor euReferenceNumber
    # ends up with the ssid "EU-"; confirm whether such entries can occur.
    rec = {
        "ssid": "EU-{}".format(logical_id),
        "foreign_identifier": ent.get("euReferenceNumber") or None,
        "united_nation_id": ent.get("unitedNationId") or None,
        "justification": [],
        "other_information": [],
    }

    subject = ent.find("subjectType")
    code = subject.get("code") if subject is not None else None
    is_entity = code in _ENTITY_CODES
    if is_entity:
        rec["target_type"] = "entity"
    elif code == "person":
        rec["target_type"] = "individual"
    else:
        rec["target_type"] = "other"

    # Entities use the registered-office address fields, everything else
    # (individuals and "other") the plain address fields.
    addr_prefix = "REGISTERED_OFFICE_ADDRESS_" if is_entity else "ADDRESS_"

    # The order of the sections below determines the key order of the
    # emitted JSON objects; keep it stable.
    _add_names(rec, ent, is_entity)

    # Citizenship -> nationality.
    for citizenship in ent.findall("citizenship"):
        add(rec, "NATIONALITY", citizenship.get("countryIso2Code"))

    _add_birthdates(rec, ent)
    _add_addresses(rec, ent, addr_prefix)

    # Identification documents.
    for ident in ent.findall("identification"):
        add(rec, "PERSON_NATIONAL_ID",
            ident.get("number") or ident.get("latinNumber"))

    # Remarks / statement of reasons.
    for remark in ent.findall("remark"):
        add(rec, "justification", remark.text)

    return dedupe(rec)


def convert(root):
    """Convert a namespace-stripped <export> tree into robocop records.

    *root* is the document element; only its direct <sanctionEntity>
    children are converted (run strip_ns() on the tree first so the
    elements match by local name).  Returns a list with one dict per
    sanction entity, in document order.
    """
    return [_convert_entity(ent) for ent in root.findall("sanctionEntity")]


def main():
    """Command-line entry point: read the EU XML and write robocop JSON.

    Reads from --input (default: stdin), writes to --output (default:
    stdout) and reports the number of converted targets on stderr.
    """
    parser = argparse.ArgumentParser(
        description="Convert the EU consolidated sanctions list (XML) to robocop JSON")
    parser.add_argument("--input", help="Input XML file (default: stdin)")
    parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)")
    parser.add_argument("--indent", type=int, default=2)
    args = parser.parse_args()

    # Hand the parser raw bytes when reading stdin, so the encoding declared
    # in the XML prolog is honoured instead of the locale's text decoding.
    # Fall back to the stream itself if it has no binary buffer (e.g. when
    # stdin was replaced by a StringIO).
    source = args.input if args.input else getattr(sys.stdin, "buffer", sys.stdin)
    root = strip_ns(ET.parse(source).getroot())
    targets = convert(root)

    if args.output:
        # "with" guarantees the file is closed even if serialisation fails.
        with open(args.output, "w", encoding="utf-8") as out:
            json.dump(targets, out, indent=args.indent, ensure_ascii=False)
    else:
        json.dump(targets, sys.stdout, indent=args.indent, ensure_ascii=False)
    print("EU: converted {} targets".format(len(targets)), file=sys.stderr)


if __name__ == "__main__":
    main()