robocop

Checks KYC attributes against sanction lists
Log | Files | Refs | Submodules | README | LICENSE

commit 672fa4cef6382c7686c0270c9d78f3fccc5ddf3e
parent c00ae79a6b3be1a90e09ada0d30f8fb4d16234be
Author: Christian Grothoff <christian@grothoff.org>
Date:   Sat, 27 Jun 2026 20:17:45 +0200

Add EU/UN/OFAC/UK sanction-list converters to robocop's internal format:

- robocop-eu-to-json    EU consolidated  (export/sanctionEntity schema)
- robocop-un-to-json    UN consolidated  (CONSOLIDATED_LIST schema)
- robocop-ofac-to-json  OFAC SDN + Consolidated (legacy sdnList schema)
- robocop-uk-to-json    UK OFSI ConList  (ArrayOfFinancialSanctionsTarget)

ssids are namespaced per authority (EU-/UN-/OFAC-/GB-) so lists can be merged;
robocop-ofac-to-json takes --prefix (use OFAC-CONS-) because OFAC reuses a few
uids across SDN and the consolidated list.

Diffstat:
M.gitignore | 4++--
MMakefile | 4++++
MREADME.md | 45+++++++++++++++++++++++++++++++++++++++++++--
Mdebian/robocop.install | 4++++
Arobocop-eu-to-json | 171+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Arobocop-ofac-to-json | 181+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Arobocop-uk-to-json | 170+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Arobocop-un-to-json | 210+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
8 files changed, 785 insertions(+), 4 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -8,4 +8,5 @@ debian/robocop.substvars debian/tmp/ release/ target -*.mk -\ No newline at end of file +*.mk__pycache__/ +*.pyc diff --git a/Makefile b/Makefile @@ -21,6 +21,10 @@ build: install: build install -D -t $(bin_dir) target/release/robocop install -D -t $(bin_dir) robocop-ch-to-json + install -D -t $(bin_dir) robocop-eu-to-json + install -D -t $(bin_dir) robocop-un-to-json + install -D -t $(bin_dir) robocop-ofac-to-json + install -D -t $(bin_dir) robocop-uk-to-json install -D -t $(bin_dir) robocop-json-postprocess install -m 644 -D doc/prebuilt/man/robocop.1 $(man_dir)/man1 diff --git a/README.md b/README.md @@ -29,8 +29,49 @@ $ ~/.cargo/bin/robocop swiss.json ``` -Create the list from the Swiss XML file using: +## Converting official sanction lists to robocop's internal format + +`robocop` consumes its sanction list as a JSON **array of target records**. Each +record has a string `ssid` (its identifier) plus any number of registry fields +whose values are **arrays of strings** (e.g. `FULL_NAME`, `PERSON_FIRST_NAMES`, +`PERSON_LAST_NAME`, `DATE_OF_BIRTH`, `NATIONALITY`, `PERSON_NATIONAL_ID`, +`COMPANY_NAME`, `ADDRESS_*` / `REGISTERED_OFFICE_ADDRESS_*`). At match time +`robocop` compares each field of an incoming query against the same-named field +of every record (fuzzy, Levenshtein-based), so all converters emit the **same** +registry field names regardless of source list. + +One converter is provided per source-list format. Each reads the official XML on +stdin and writes the JSON array on stdout; pipe it through +`robocop-json-postprocess` (which drops empty/`null` fields) to get the final +list: + +| Converter | Source list | Official XML schema | +|-----------|-------------|---------------------| +| `robocop-ch-to-json` | Switzerland — SECO | `swiss-sanctions-list` | +| `robocop-eu-to-json` | EU — Consolidated Financial Sanctions List | `export` / `sanctionEntity` | +| `robocop-un-to-json` | UN — Security Council Consolidated List | `CONSOLIDATED_LIST` | +| `robocop-ofac-to-json` | US — OFAC SDN **and** Consolidated lists | legacy `sdnList` / `sdnEntry` | +| `robocop-uk-to-json` | UK — OFSI Consolidated List | `ArrayOfFinancialSanctionsTarget` | + +Each record's `ssid` is namespaced by authority (`EU-`, `UN-`, `OFAC-`, `GB-`, +and the bare numeric Swiss id) so records stay unique if several lists are +concatenated into one file. ``` -$ ./robocop-ch-to-json < swiss.xml | robocop-json-postprocess > swiss.json +$ ./robocop-ch-to-json < swiss.xml | robocop-json-postprocess > swiss.json +$ ./robocop-eu-to-json < eu.xml | robocop-json-postprocess > eu.json +$ ./robocop-un-to-json < un.xml | robocop-json-postprocess > un.json +$ ./robocop-ofac-to-json < SDN.XML | robocop-json-postprocess > ofac-sdn.json +$ ./robocop-ofac-to-json --prefix OFAC-CONS- < CONSOLIDATED.XML \ + | robocop-json-postprocess > ofac-cons.json +$ ./robocop-uk-to-json < ConList.xml | robocop-json-postprocess > uk.json ``` + +Note on the OFAC consolidated (non-SDN) list: OFAC serves it in the legacy +`sdnList` format at `.../exports/CONSOLIDATED.XML`, and in the newer "advanced" +format at `CONS_ADVANCED.XML`. `robocop-ofac-to-json` reads the **legacy** format, +so a single converter handles both the SDN list and the consolidated list; use +`CONSOLIDATED.XML` (not `CONS_ADVANCED.XML`, which robocop does not parse). Because +OFAC reuses a few `uid`s across the two lists, pass `--prefix OFAC-CONS-` when +converting the consolidated list if you intend to merge it with the SDN list into +one file, so the records keep distinct `ssid`s. diff --git a/debian/robocop.install b/debian/robocop.install @@ -1,4 +1,8 @@ robocop-ch-to-json /usr/bin +robocop-eu-to-json /usr/bin +robocop-un-to-json /usr/bin +robocop-ofac-to-json /usr/bin +robocop-uk-to-json /usr/bin robocop-json-postprocess /usr/bin target/release/robocop /usr/bin doc/prebuilt/man/robocop.1 /usr/share/man/man1/ \ No newline at end of file diff --git a/robocop-eu-to-json b/robocop-eu-to-json @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# robocop-eu-to-json +# +# Copyright (C) 2025 Taler Systems SA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +""" +EU Consolidated Financial Sanctions List XML to robocop JSON converter. + +Converts the EU's consolidated list (the <export>/<sanctionEntity> schema served +by the European Commission FISMA "FSD" download, default namespace +http://eu.europa.ec/fpi/fsd/export) into robocop's internal JSON format: a flat +JSON array of self-contained target records keyed by a string "ssid", with the +same registry field names emitted by robocop-ch-to-json (PERSON_FIRST_NAMES, +PERSON_LAST_NAME, FULL_NAME, DATE_OF_BIRTH, NATIONALITY, PERSON_NATIONAL_ID, +COMPANY_NAME, ADDRESS_* / REGISTERED_OFFICE_ADDRESS_*, ...). + +Each record's ssid is namespaced "EU-<logicalId>" so records stay unique when +several authorities' lists are combined. + +Usage: + robocop-eu-to-json < eu.xml | robocop-json-postprocess > eu.json +""" + +import xml.etree.ElementTree as ET +import json +import sys +import argparse + + +def strip_ns(root): + """Drop XML namespaces so elements can be matched by their local name.""" + for el in root.iter(): + if isinstance(el.tag, str) and "}" in el.tag: + el.tag = el.tag.split("}", 1)[1] + return root + + +def add(rec, key, value): + """Append a non-empty, stripped string value to a list field.""" + if value is None: + return + value = value.strip() + if not value: + return + rec.setdefault(key, []).append(value) + + +def dedupe(rec): + """Remove duplicate values from every list field, preserving order.""" + for key, val in rec.items(): + if isinstance(val, list): + seen = set() + rec[key] = [x for x in val if not (x in seen or seen.add(x))] + return rec + + +def convert(root): + targets = [] + for ent in root.findall("sanctionEntity"): + logical_id = ent.get("logicalId") or ent.get("euReferenceNumber") or "" + rec = { + "ssid": "EU-{}".format(logical_id), + "foreign_identifier": ent.get("euReferenceNumber") or None, + "united_nation_id": ent.get("unitedNationId") or None, + "justification": [], + "other_information": [], + } + + subject = ent.find("subjectType") + code = subject.get("code") if subject is not None else None + is_entity = code in ("enterprise", "vessel", "ship", "aircraft") + rec["target_type"] = "entity" if is_entity else ( + "individual" if code == "person" else "other") + + addr_prefix = "REGISTERED_OFFICE_ADDRESS_" if is_entity else "ADDRESS_" + + # Names (one <nameAlias> per spelling / alias). + for na in ent.findall("nameAlias"): + first = na.get("firstName") or "" + middle = na.get("middleName") or "" + last = na.get("lastName") or "" + whole = na.get("wholeName") or "" + gender = na.get("gender") or "" + if gender and "sex" not in rec: + rec["sex"] = {"M": "male", "F": "female"}.get(gender, gender) + if not whole: + whole = " ".join(p for p in (first, middle, last) if p) + if is_entity: + add(rec, "COMPANY_NAME", whole) + add(rec, "BUSINESS_DISPLAY_NAME", whole) + else: + add(rec, "PERSON_FIRST_NAMES", first) + add(rec, "PERSON_FIRST_NAMES", middle) + add(rec, "PERSON_LAST_NAME", last) + add(rec, "FULL_NAME", whole) + add(rec, "other_information", na.get("function")) + + # Citizenship -> nationality. + for cit in ent.findall("citizenship"): + add(rec, "NATIONALITY", cit.get("countryIso2Code")) + + # Birth dates and places. + for bd in ent.findall("birthdate"): + iso = bd.get("birthdate") + if iso: + add(rec, "DATE_OF_BIRTH", iso) + elif bd.get("year"): + add(rec, "DATE_OF_BIRTH", bd.get("year")) + pob = ", ".join(p for p in (bd.get("city"), bd.get("countryDescription")) if p) + if pob: + add(rec, "other_information", "Place of birth: " + pob) + + # Addresses. + for ad in ent.findall("address"): + line = ", ".join(p for p in (ad.get("street"), ad.get("poBox"), + ad.get("place")) if p) + add(rec, addr_prefix + "LINES", line) + add(rec, addr_prefix + "ZIPCODE", ad.get("zipCode")) + add(rec, addr_prefix + "TOWN_LOCATION", ad.get("city")) + add(rec, addr_prefix + "COUNTRY_SUBDIVISION", ad.get("region")) + add(rec, addr_prefix + "COUNTRY", ad.get("countryIso2Code")) + + # Identification documents. + for ident in ent.findall("identification"): + number = ident.get("number") or ident.get("latinNumber") + if number: + add(rec, "PERSON_NATIONAL_ID", number) + + # Remarks / statement of reasons. + for rmk in ent.findall("remark"): + add(rec, "justification", rmk.text) + + targets.append(dedupe(rec)) + return targets + + +def main(): + parser = argparse.ArgumentParser( + description="Convert the EU consolidated sanctions list (XML) to robocop JSON") + parser.add_argument("--input", help="Input XML file (default: stdin)") + parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)") + parser.add_argument("--indent", type=int, default=2) + args = parser.parse_args() + + tree = ET.parse(args.input) if args.input else ET.parse(sys.stdin) + root = strip_ns(tree.getroot()) + targets = convert(root) + + out = open(args.output, "w", encoding="utf-8") if args.output else sys.stdout + json.dump(targets, out, indent=args.indent, ensure_ascii=False) + if args.output: + out.close() + print("EU: converted {} targets".format(len(targets)), file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/robocop-ofac-to-json b/robocop-ofac-to-json @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# robocop-ofac-to-json +# +# Copyright (C) 2025 Taler Systems SA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +""" +OFAC (US Treasury) sanctions XML to robocop JSON converter. + +Converts OFAC's legacy <sdnList>/<sdnEntry> XML schema into robocop's internal +JSON format. This schema is used by BOTH OFAC publications served from +sanctionslistservice.ofac.treas.gov: + - the SDN list (.../exports/SDN.XML) + - the Consolidated list (.../exports/CONS.XML) [non-SDN] +so a single converter handles both. Output is a flat JSON array of self-contained +target records keyed by a string "ssid" (namespaced "OFAC-<uid>"), using the same +registry field names as robocop-ch-to-json. + +Usage: + robocop-ofac-to-json < SDN.XML | robocop-json-postprocess > ofac-sdn.json + robocop-ofac-to-json < CONS.XML | robocop-json-postprocess > ofac-cons.json +""" + +import xml.etree.ElementTree as ET +import json +import sys +import argparse + + +def strip_ns(root): + for el in root.iter(): + if isinstance(el.tag, str) and "}" in el.tag: + el.tag = el.tag.split("}", 1)[1] + return root + + +def text(el, tag): + child = el.find(tag) + if child is not None and child.text and child.text.strip(): + return child.text.strip() + return None + + +def add(rec, key, value): + if value is None: + return + value = value.strip() + if not value: + return + rec.setdefault(key, []).append(value) + + +def dedupe(rec): + for key, val in rec.items(): + if isinstance(val, list): + seen = set() + rec[key] = [x for x in val if not (x in seen or seen.add(x))] + return rec + + +def whole_name(first, last): + return " ".join(p for p in (first, last) if p) + + +def add_name(rec, first, last, is_entity): + if is_entity: + name = whole_name(first, last) + add(rec, "COMPANY_NAME", name) + add(rec, "BUSINESS_DISPLAY_NAME", name) + else: + add(rec, "PERSON_FIRST_NAMES", first) + add(rec, "PERSON_LAST_NAME", last) + add(rec, "FULL_NAME", whole_name(first, last)) + + +def convert(root, prefix="OFAC-"): + targets = [] + for entry in root.findall("sdnEntry"): + uid = text(entry, "uid") or "" + sdn_type = text(entry, "sdnType") or "" + is_entity = sdn_type not in ("Individual",) + rec = { + "ssid": "{}{}".format(prefix, uid), + "target_type": "individual" if sdn_type == "Individual" else ( + "entity" if sdn_type == "Entity" else "other"), + "sdn_type": sdn_type or None, + "justification": [], + "other_information": [], + } + addr_prefix = "REGISTERED_OFFICE_ADDRESS_" if is_entity else "ADDRESS_" + + add_name(rec, text(entry, "firstName"), text(entry, "lastName"), is_entity) + add(rec, "other_information", text(entry, "title")) + + aka_list = entry.find("akaList") + if aka_list is not None: + for aka in aka_list.findall("aka"): + add_name(rec, text(aka, "firstName"), text(aka, "lastName"), is_entity) + + dob_list = entry.find("dateOfBirthList") + if dob_list is not None: + for item in dob_list.findall("dateOfBirthItem"): + add(rec, "DATE_OF_BIRTH", text(item, "dateOfBirth")) + + pob_list = entry.find("placeOfBirthList") + if pob_list is not None: + for item in pob_list.findall("placeOfBirthItem"): + pob = text(item, "placeOfBirth") + if pob: + add(rec, "other_information", "Place of birth: " + pob) + + nat_list = entry.find("nationalityList") + if nat_list is not None: + for item in nat_list.findall("nationality"): + add(rec, "NATIONALITY", text(item, "country")) + + addr_list = entry.find("addressList") + if addr_list is not None: + for ad in addr_list.findall("address"): + line = ", ".join(p for p in (text(ad, "address1"), text(ad, "address2"), + text(ad, "address3")) if p) + add(rec, addr_prefix + "LINES", line) + add(rec, addr_prefix + "TOWN_LOCATION", text(ad, "city")) + add(rec, addr_prefix + "COUNTRY_SUBDIVISION", text(ad, "stateOrProvince")) + add(rec, addr_prefix + "ZIPCODE", text(ad, "postalCode")) + add(rec, addr_prefix + "COUNTRY", text(ad, "country")) + + id_list = entry.find("idList") + if id_list is not None: + for idel in id_list.findall("id"): + number = text(idel, "idNumber") + id_type = (text(idel, "idType") or "").lower() + # Skip OFAC's non-identifier annotations carried in idList. + if number and "secondary sanctions risk" not in id_type: + add(rec, "PERSON_NATIONAL_ID", number) + + for rmk in entry.findall("remarks"): + add(rec, "justification", rmk.text) + + targets.append(dedupe(rec)) + return targets + + +def main(): + parser = argparse.ArgumentParser( + description="Convert an OFAC sanctions list (legacy sdnList XML) to robocop JSON") + parser.add_argument("--input", help="Input XML file (default: stdin)") + parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)") + parser.add_argument("--prefix", default="OFAC-", + help="ssid prefix (default: OFAC-). Use a distinct value, " + "e.g. OFAC-CONS-, for the consolidated list so its uids " + "do not collide with the SDN list when the two are merged.") + parser.add_argument("--indent", type=int, default=2) + args = parser.parse_args() + + tree = ET.parse(args.input) if args.input else ET.parse(sys.stdin) + root = strip_ns(tree.getroot()) + targets = convert(root, args.prefix) + + out = open(args.output, "w", encoding="utf-8") if args.output else sys.stdout + json.dump(targets, out, indent=args.indent, ensure_ascii=False) + if args.output: + out.close() + print("OFAC: converted {} targets".format(len(targets)), file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/robocop-uk-to-json b/robocop-uk-to-json @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# robocop-uk-to-json +# +# Copyright (C) 2025 Taler Systems SA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +""" +UK OFSI Consolidated List XML to robocop JSON converter. + +Converts the UK OFSI "ConList" (the <ArrayOfFinancialSanctionsTarget> schema, +default namespace http://schemas.hmtreasury.gov.uk/ofsi/consolidatedlist) into +robocop's internal JSON format. The OFSI list is FLAT: each +<FinancialSanctionsTarget> is a single name/alias row, and rows that share a +<GroupID> are the same designated target. This converter groups rows by GroupID +so one robocop record (ssid "GB-<GroupID>") accumulates every name variation, +address and attribute, using the same registry field names as robocop-ch-to-json. + +Usage: + robocop-uk-to-json < ConList.xml | robocop-json-postprocess > uk.json +""" + +import xml.etree.ElementTree as ET +import json +import sys +import argparse + + +def strip_ns(root): + for el in root.iter(): + if isinstance(el.tag, str) and "}" in el.tag: + el.tag = el.tag.split("}", 1)[1] + return root + + +def text(el, tag): + child = el.find(tag) + if child is not None and child.text and child.text.strip(): + return child.text.strip() + return None + + +def add(rec, key, value): + if value is None: + return + value = value.strip() + if not value: + return + rec.setdefault(key, []).append(value) + + +def dedupe(rec): + for key, val in rec.items(): + if isinstance(val, list): + seen = set() + rec[key] = [x for x in val if not (x in seen or seen.add(x))] + return rec + + +def date_only(value): + """OFSI dates look like 2022-12-09T00:00:00; keep the date part.""" + if value and "T" in value: + return value.split("T", 1)[0] + return value + + +def convert(root): + groups = {} + order = [] + for t in root.findall("FinancialSanctionsTarget"): + gid = text(t, "GroupID") or "" + if gid not in groups: + type_desc = (text(t, "GroupTypeDescription") or "").lower() + target_type = "individual" if type_desc == "individual" else ( + "entity" if type_desc == "entity" else "other") + groups[gid] = { + "ssid": "GB-{}".format(gid), + "foreign_identifier": text(t, "UKSanctionsListRef"), + "target_type": target_type, + "justification": [], + "other_information": [], + } + order.append(gid) + rec = groups[gid] + is_entity = rec["target_type"] == "entity" + addr_prefix = "REGISTERED_OFFICE_ADDRESS_" if is_entity else "ADDRESS_" + + # Name parts: name1..name5 are forenames, Name6 is the family name. + forenames = [text(t, "name{}".format(i)) for i in range(1, 6)] + forenames = [p for p in forenames if p] + family = text(t, "Name6") + whole = " ".join(p for p in (forenames + [family]) if p) + if is_entity: + add(rec, "COMPANY_NAME", whole) + add(rec, "BUSINESS_DISPLAY_NAME", whole) + else: + for p in forenames: + add(rec, "PERSON_FIRST_NAMES", p) + add(rec, "PERSON_LAST_NAME", family) + add(rec, "FULL_NAME", whole) + add(rec, "FULL_NAME", text(t, "NameNonLatinScript")) + + gender = text(t, "Individual_Gender") + if gender and "sex" not in rec: + rec["sex"] = gender.lower() + + # Address. + line = ", ".join(p for p in (text(t, "Address1"), text(t, "Address2"), + text(t, "Address3"), text(t, "Address4"), + text(t, "Address5"), text(t, "Address6")) if p) + add(rec, addr_prefix + "LINES", line) + add(rec, addr_prefix + "ZIPCODE", text(t, "PostCode")) + add(rec, addr_prefix + "COUNTRY", text(t, "Country")) + + # Individual attributes. + add(rec, "DATE_OF_BIRTH", date_only(text(t, "Individual_DateOfBirth"))) + add(rec, "NATIONALITY", text(t, "Individual_Nationality")) + add(rec, "PERSON_NATIONAL_ID", text(t, "Individual_PassportNumber")) + add(rec, "PERSON_NATIONAL_ID", text(t, "Individual_NINumber")) + cob = text(t, "Individual_CountryOfBirth") + tob = text(t, "Individual_TownOfBirth") + pob = ", ".join(p for p in (tob, cob) if p) + if pob: + add(rec, "other_information", "Place of birth: " + pob) + + # Entity attributes. + add(rec, "COMMERCIAL_REGISTER_NUMBER", text(t, "Entity_BusinessRegNumber")) + + # Contact details and reasons. + add(rec, "CONTACT_PHONE", text(t, "PhoneNumber")) + add(rec, "CONTACT_EMAIL", text(t, "EmailAddress")) + add(rec, "justification", text(t, "UKStatementOfReasons")) + add(rec, "other_information", text(t, "OtherInformation")) + + return [dedupe(groups[gid]) for gid in order] + + +def main(): + parser = argparse.ArgumentParser( + description="Convert the UK OFSI consolidated list (XML) to robocop JSON") + parser.add_argument("--input", help="Input XML file (default: stdin)") + parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)") + parser.add_argument("--indent", type=int, default=2) + args = parser.parse_args() + + tree = ET.parse(args.input) if args.input else ET.parse(sys.stdin) + root = strip_ns(tree.getroot()) + targets = convert(root) + + out = open(args.output, "w", encoding="utf-8") if args.output else sys.stdout + json.dump(targets, out, indent=args.indent, ensure_ascii=False) + if args.output: + out.close() + print("UK: converted {} targets".format(len(targets)), file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/robocop-un-to-json b/robocop-un-to-json @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# robocop-un-to-json +# +# Copyright (C) 2025 Taler Systems SA +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. +""" +UN Security Council Consolidated List XML to robocop JSON converter. + +Converts the UN consolidated list (the <CONSOLIDATED_LIST> schema with +<INDIVIDUALS>/<INDIVIDUAL> and <ENTITIES>/<ENTITY> records published at +scsanctions.un.org) into robocop's internal JSON format: a flat JSON array of +self-contained target records keyed by a string "ssid", using the same registry +field names as robocop-ch-to-json. + +Each record's ssid is namespaced "UN-<DATAID>". + +Usage: + robocop-un-to-json < un.xml | robocop-json-postprocess > un.json +""" + +import xml.etree.ElementTree as ET +import json +import sys +import argparse + + +def text(el, tag): + """Return the stripped text of a direct child <tag>, or None.""" + child = el.find(tag) + if child is not None and child.text and child.text.strip(): + return child.text.strip() + return None + + +def add(rec, key, value): + if value is None: + return + value = value.strip() + if not value: + return + rec.setdefault(key, []).append(value) + + +def dedupe(rec): + for key, val in rec.items(): + if isinstance(val, list): + seen = set() + rec[key] = [x for x in val if not (x in seen or seen.add(x))] + return rec + + +def values(el, tag): + """Yield the <VALUE> texts under each child <tag> (UN wraps many fields).""" + for sub in el.findall(tag): + for v in sub.findall("VALUE"): + if v.text and v.text.strip(): + yield v.text.strip() + + +def dob(el): + """Build a date-of-birth string from an <*_DATE_OF_BIRTH> element.""" + for d in el.findall("INDIVIDUAL_DATE_OF_BIRTH"): + date = text(d, "DATE") + if date: + yield date + continue + year = text(d, "YEAR") + if year: + from_y = text(d, "FROM_YEAR") + to_y = text(d, "TO_YEAR") + yield "{}-{}".format(from_y, to_y) if from_y and to_y else year + else: + from_y = text(d, "FROM_YEAR") + to_y = text(d, "TO_YEAR") + if from_y or to_y: + yield "{}-{}".format(from_y or "?", to_y or "?") + + +def address(el, tag): + """Format an <*_ADDRESS> child into a single address line.""" + for a in el.findall(tag): + parts = [text(a, p) for p in ("STREET", "CITY", "STATE_PROVINCE", + "ZIP_CODE", "COUNTRY", "NOTE")] + line = ", ".join(p for p in parts if p) + country = text(a, "COUNTRY") + yield line, country + + +def convert_person(ind): + dataid = text(ind, "DATAID") or "" + rec = { + "ssid": "UN-{}".format(dataid), + "foreign_identifier": text(ind, "REFERENCE_NUMBER"), + "target_type": "individual", + "justification": [], + "other_information": [], + } + gender = text(ind, "GENDER") + if gender: + rec["sex"] = gender.lower() + + name_parts = [text(ind, t) for t in + ("FIRST_NAME", "SECOND_NAME", "THIRD_NAME", "FOURTH_NAME")] + name_parts = [p for p in name_parts if p] + for p in name_parts: + add(rec, "PERSON_FIRST_NAMES", p) + if name_parts: + add(rec, "FULL_NAME", " ".join(name_parts)) + add(rec, "FULL_NAME", text(ind, "NAME_ORIGINAL_SCRIPT")) + + for alias in ind.findall("INDIVIDUAL_ALIAS"): + add(rec, "FULL_NAME", text(alias, "ALIAS_NAME")) + + for nat in values(ind, "NATIONALITY"): + add(rec, "NATIONALITY", nat) + + for d in dob(ind): + add(rec, "DATE_OF_BIRTH", d) + + for pob in ind.findall("INDIVIDUAL_PLACE_OF_BIRTH"): + parts = [text(pob, p) for p in ("CITY", "STATE_PROVINCE", "COUNTRY")] + line = ", ".join(p for p in parts if p) + if line: + add(rec, "other_information", "Place of birth: " + line) + + for line, country in address(ind, "INDIVIDUAL_ADDRESS"): + add(rec, "ADDRESS_LINES", line) + add(rec, "ADDRESS_COUNTRY", country) + + for doc in ind.findall("INDIVIDUAL_DOCUMENT"): + number = text(doc, "NUMBER") + if number: + add(rec, "PERSON_NATIONAL_ID", number) + + add(rec, "justification", text(ind, "COMMENTS1")) + return dedupe(rec) + + +def convert_entity(ent): + dataid = text(ent, "DATAID") or "" + rec = { + "ssid": "UN-{}".format(dataid), + "foreign_identifier": text(ent, "REFERENCE_NUMBER"), + "target_type": "entity", + "justification": [], + "other_information": [], + } + name = text(ent, "FIRST_NAME") + add(rec, "COMPANY_NAME", name) + add(rec, "BUSINESS_DISPLAY_NAME", name) + add(rec, "FULL_NAME", text(ent, "NAME_ORIGINAL_SCRIPT")) + + for alias in ent.findall("ENTITY_ALIAS"): + alias_name = text(alias, "ALIAS_NAME") + add(rec, "COMPANY_NAME", alias_name) + add(rec, "BUSINESS_DISPLAY_NAME", alias_name) + + for line, country in address(ent, "ENTITY_ADDRESS"): + add(rec, "REGISTERED_OFFICE_ADDRESS_LINES", line) + add(rec, "REGISTERED_OFFICE_ADDRESS_COUNTRY", country) + + add(rec, "justification", text(ent, "COMMENTS1")) + return dedupe(rec) + + +def convert(root): + targets = [] + for group in root.findall("INDIVIDUALS"): + for ind in group.findall("INDIVIDUAL"): + targets.append(convert_person(ind)) + for group in root.findall("ENTITIES"): + for ent in group.findall("ENTITY"): + targets.append(convert_entity(ent)) + return targets + + +def main(): + parser = argparse.ArgumentParser( + description="Convert the UN consolidated sanctions list (XML) to robocop JSON") + parser.add_argument("--input", help="Input XML file (default: stdin)") + parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)") + parser.add_argument("--indent", type=int, default=2) + args = parser.parse_args() + + tree = ET.parse(args.input) if args.input else ET.parse(sys.stdin) + targets = convert(tree.getroot()) + + out = open(args.output, "w", encoding="utf-8") if args.output else sys.stdout + json.dump(targets, out, indent=args.indent, ensure_ascii=False) + if args.output: + out.close() + print("UN: converted {} targets".format(len(targets)), file=sys.stderr) + + +if __name__ == "__main__": + main()