robocop

Checks KYC attributes against sanction lists
Log | Files | Refs | Submodules | README | LICENSE

robocop-un-to-json (6896B)


      1 #!/usr/bin/env python3
      2 # -*- coding: utf-8 -*-
      3 #
      4 # robocop-un-to-json
      5 #
      6 # Copyright (C) 2025 Taler Systems SA
      7 #
      8 # This program is free software: you can redistribute it and/or modify
      9 # it under the terms of the GNU General Public License as published by
     10 # the Free Software Foundation, either version 3 of the License, or
     11 # (at your option) any later version.
     12 #
     13 # This program is distributed in the hope that it will be useful,
     14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     16 # GNU General Public License for more details.
     17 #
     18 # You should have received a copy of the GNU General Public License
     19 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
     20 """
     21 UN Security Council Consolidated List XML to robocop JSON converter.
     22 
     23 Converts the UN consolidated list (the <CONSOLIDATED_LIST> schema with
     24 <INDIVIDUALS>/<INDIVIDUAL> and <ENTITIES>/<ENTITY> records published at
     25 scsanctions.un.org) into robocop's internal JSON format: a flat JSON array of
     26 self-contained target records keyed by a string "ssid", using the same registry
     27 field names as robocop-ch-to-json.
     28 
     29 Each record's ssid is namespaced "UN-<DATAID>".
     30 
     31 Usage:
     32     robocop-un-to-json < un.xml | robocop-json-postprocess > un.json
     33 """
     34 
     35 import xml.etree.ElementTree as ET
     36 import json
     37 import sys
     38 import argparse
     39 
     40 
     41 def text(el, tag):
     42     """Return the stripped text of a direct child <tag>, or None."""
     43     child = el.find(tag)
     44     if child is not None and child.text and child.text.strip():
     45         return child.text.strip()
     46     return None
     47 
     48 
     49 def add(rec, key, value):
     50     if value is None:
     51         return
     52     value = value.strip()
     53     if not value:
     54         return
     55     rec.setdefault(key, []).append(value)
     56 
     57 
     58 def dedupe(rec):
     59     for key, val in rec.items():
     60         if isinstance(val, list):
     61             seen = set()
     62             rec[key] = [x for x in val if not (x in seen or seen.add(x))]
     63     return rec
     64 
     65 
     66 def values(el, tag):
     67     """Yield the <VALUE> texts under each child <tag> (UN wraps many fields)."""
     68     for sub in el.findall(tag):
     69         for v in sub.findall("VALUE"):
     70             if v.text and v.text.strip():
     71                 yield v.text.strip()
     72 
     73 
     74 def dob(el):
     75     """Build a date-of-birth string from an <*_DATE_OF_BIRTH> element."""
     76     for d in el.findall("INDIVIDUAL_DATE_OF_BIRTH"):
     77         date = text(d, "DATE")
     78         if date:
     79             yield date
     80             continue
     81         year = text(d, "YEAR")
     82         if year:
     83             from_y = text(d, "FROM_YEAR")
     84             to_y = text(d, "TO_YEAR")
     85             yield "{}-{}".format(from_y, to_y) if from_y and to_y else year
     86         else:
     87             from_y = text(d, "FROM_YEAR")
     88             to_y = text(d, "TO_YEAR")
     89             if from_y or to_y:
     90                 yield "{}-{}".format(from_y or "?", to_y or "?")
     91 
     92 
     93 def address(el, tag):
     94     """Format an <*_ADDRESS> child into a single address line."""
     95     for a in el.findall(tag):
     96         parts = [text(a, p) for p in ("STREET", "CITY", "STATE_PROVINCE",
     97                                       "ZIP_CODE", "COUNTRY", "NOTE")]
     98         line = ", ".join(p for p in parts if p)
     99         country = text(a, "COUNTRY")
    100         yield line, country
    101 
    102 
    103 def convert_person(ind):
    104     dataid = text(ind, "DATAID") or ""
    105     rec = {
    106         "ssid": "UN-{}".format(dataid),
    107         "foreign_identifier": text(ind, "REFERENCE_NUMBER"),
    108         "target_type": "individual",
    109         "justification": [],
    110         "other_information": [],
    111     }
    112     gender = text(ind, "GENDER")
    113     if gender:
    114         rec["sex"] = gender.lower()
    115 
    116     name_parts = [text(ind, t) for t in
    117                   ("FIRST_NAME", "SECOND_NAME", "THIRD_NAME", "FOURTH_NAME")]
    118     name_parts = [p for p in name_parts if p]
    119     for p in name_parts:
    120         add(rec, "PERSON_FIRST_NAMES", p)
    121     if name_parts:
    122         add(rec, "FULL_NAME", " ".join(name_parts))
    123     add(rec, "FULL_NAME", text(ind, "NAME_ORIGINAL_SCRIPT"))
    124 
    125     for alias in ind.findall("INDIVIDUAL_ALIAS"):
    126         add(rec, "FULL_NAME", text(alias, "ALIAS_NAME"))
    127 
    128     for nat in values(ind, "NATIONALITY"):
    129         add(rec, "NATIONALITY", nat)
    130 
    131     for d in dob(ind):
    132         add(rec, "DATE_OF_BIRTH", d)
    133 
    134     for pob in ind.findall("INDIVIDUAL_PLACE_OF_BIRTH"):
    135         parts = [text(pob, p) for p in ("CITY", "STATE_PROVINCE", "COUNTRY")]
    136         line = ", ".join(p for p in parts if p)
    137         if line:
    138             add(rec, "other_information", "Place of birth: " + line)
    139 
    140     for line, country in address(ind, "INDIVIDUAL_ADDRESS"):
    141         add(rec, "ADDRESS_LINES", line)
    142         add(rec, "ADDRESS_COUNTRY", country)
    143 
    144     for doc in ind.findall("INDIVIDUAL_DOCUMENT"):
    145         number = text(doc, "NUMBER")
    146         if number:
    147             add(rec, "PERSON_NATIONAL_ID", number)
    148 
    149     add(rec, "justification", text(ind, "COMMENTS1"))
    150     return dedupe(rec)
    151 
    152 
    153 def convert_entity(ent):
    154     dataid = text(ent, "DATAID") or ""
    155     rec = {
    156         "ssid": "UN-{}".format(dataid),
    157         "foreign_identifier": text(ent, "REFERENCE_NUMBER"),
    158         "target_type": "entity",
    159         "justification": [],
    160         "other_information": [],
    161     }
    162     name = text(ent, "FIRST_NAME")
    163     add(rec, "COMPANY_NAME", name)
    164     add(rec, "BUSINESS_DISPLAY_NAME", name)
    165     add(rec, "FULL_NAME", text(ent, "NAME_ORIGINAL_SCRIPT"))
    166 
    167     for alias in ent.findall("ENTITY_ALIAS"):
    168         alias_name = text(alias, "ALIAS_NAME")
    169         add(rec, "COMPANY_NAME", alias_name)
    170         add(rec, "BUSINESS_DISPLAY_NAME", alias_name)
    171 
    172     for line, country in address(ent, "ENTITY_ADDRESS"):
    173         add(rec, "REGISTERED_OFFICE_ADDRESS_LINES", line)
    174         add(rec, "REGISTERED_OFFICE_ADDRESS_COUNTRY", country)
    175 
    176     add(rec, "justification", text(ent, "COMMENTS1"))
    177     return dedupe(rec)
    178 
    179 
    180 def convert(root):
    181     targets = []
    182     for group in root.findall("INDIVIDUALS"):
    183         for ind in group.findall("INDIVIDUAL"):
    184             targets.append(convert_person(ind))
    185     for group in root.findall("ENTITIES"):
    186         for ent in group.findall("ENTITY"):
    187             targets.append(convert_entity(ent))
    188     return targets
    189 
    190 
    191 def main():
    192     parser = argparse.ArgumentParser(
    193         description="Convert the UN consolidated sanctions list (XML) to robocop JSON")
    194     parser.add_argument("--input", help="Input XML file (default: stdin)")
    195     parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)")
    196     parser.add_argument("--indent", type=int, default=2)
    197     args = parser.parse_args()
    198 
    199     tree = ET.parse(args.input) if args.input else ET.parse(sys.stdin)
    200     targets = convert(tree.getroot())
    201 
    202     out = open(args.output, "w", encoding="utf-8") if args.output else sys.stdout
    203     json.dump(targets, out, indent=args.indent, ensure_ascii=False)
    204     if args.output:
    205         out.close()
    206     print("UN: converted {} targets".format(len(targets)), file=sys.stderr)
    207 
    208 
# Run the converter only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()