robocop

Checks KYC attributes against sanction lists
Log | Files | Refs | Submodules | README | LICENSE

robocop-uk-to-json (6296B)


      1 #!/usr/bin/env python3
      2 # -*- coding: utf-8 -*-
      3 #
      4 # robocop-uk-to-json
      5 #
      6 # Copyright (C) 2025 Taler Systems SA
      7 #
      8 # This program is free software: you can redistribute it and/or modify
      9 # it under the terms of the GNU General Public License as published by
     10 # the Free Software Foundation, either version 3 of the License, or
     11 # (at your option) any later version.
     12 #
     13 # This program is distributed in the hope that it will be useful,
     14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     16 # GNU General Public License for more details.
     17 #
     18 # You should have received a copy of the GNU General Public License
     19 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
     20 """
     21 UK OFSI Consolidated List XML to robocop JSON converter.
     22 
     23 Converts the UK OFSI "ConList" (the <ArrayOfFinancialSanctionsTarget> schema,
     24 default namespace http://schemas.hmtreasury.gov.uk/ofsi/consolidatedlist) into
     25 robocop's internal JSON format. The OFSI list is FLAT: each
     26 <FinancialSanctionsTarget> is a single name/alias row, and rows that share a
     27 <GroupID> are the same designated target. This converter groups rows by GroupID
     28 so one robocop record (ssid "GB-<GroupID>") accumulates every name variation,
     29 address and attribute, using the same registry field names as robocop-ch-to-json.
     30 
     31 Usage:
     32     robocop-uk-to-json < ConList.xml | robocop-json-postprocess > uk.json
     33 """
     34 
     35 import xml.etree.ElementTree as ET
     36 import json
     37 import sys
     38 import argparse
     39 
     40 
     41 def strip_ns(root):
     42     for el in root.iter():
     43         if isinstance(el.tag, str) and "}" in el.tag:
     44             el.tag = el.tag.split("}", 1)[1]
     45     return root
     46 
     47 
     48 def text(el, tag):
     49     child = el.find(tag)
     50     if child is not None and child.text and child.text.strip():
     51         return child.text.strip()
     52     return None
     53 
     54 
     55 def add(rec, key, value):
     56     if value is None:
     57         return
     58     value = value.strip()
     59     if not value:
     60         return
     61     rec.setdefault(key, []).append(value)
     62 
     63 
     64 def dedupe(rec):
     65     for key, val in rec.items():
     66         if isinstance(val, list):
     67             seen = set()
     68             rec[key] = [x for x in val if not (x in seen or seen.add(x))]
     69     return rec
     70 
     71 
     72 def date_only(value):
     73     """OFSI dates look like 2022-12-09T00:00:00; keep the date part."""
     74     if value and "T" in value:
     75         return value.split("T", 1)[0]
     76     return value
     77 
     78 
     79 def convert(root):
     80     groups = {}
     81     order = []
     82     for t in root.findall("FinancialSanctionsTarget"):
     83         gid = text(t, "GroupID") or ""
     84         if gid not in groups:
     85             type_desc = (text(t, "GroupTypeDescription") or "").lower()
     86             target_type = "individual" if type_desc == "individual" else (
     87                 "entity" if type_desc == "entity" else "other")
     88             groups[gid] = {
     89                 "ssid": "GB-{}".format(gid),
     90                 "foreign_identifier": text(t, "UKSanctionsListRef"),
     91                 "target_type": target_type,
     92                 "justification": [],
     93                 "other_information": [],
     94             }
     95             order.append(gid)
     96         rec = groups[gid]
     97         is_entity = rec["target_type"] == "entity"
     98         addr_prefix = "REGISTERED_OFFICE_ADDRESS_" if is_entity else "ADDRESS_"
     99 
    100         # Name parts: name1..name5 are forenames, Name6 is the family name.
    101         forenames = [text(t, "name{}".format(i)) for i in range(1, 6)]
    102         forenames = [p for p in forenames if p]
    103         family = text(t, "Name6")
    104         whole = " ".join(p for p in (forenames + [family]) if p)
    105         if is_entity:
    106             add(rec, "COMPANY_NAME", whole)
    107             add(rec, "BUSINESS_DISPLAY_NAME", whole)
    108         else:
    109             for p in forenames:
    110                 add(rec, "PERSON_FIRST_NAMES", p)
    111             add(rec, "PERSON_LAST_NAME", family)
    112             add(rec, "FULL_NAME", whole)
    113         add(rec, "FULL_NAME", text(t, "NameNonLatinScript"))
    114 
    115         gender = text(t, "Individual_Gender")
    116         if gender and "sex" not in rec:
    117             rec["sex"] = gender.lower()
    118 
    119         # Address.
    120         line = ", ".join(p for p in (text(t, "Address1"), text(t, "Address2"),
    121                                      text(t, "Address3"), text(t, "Address4"),
    122                                      text(t, "Address5"), text(t, "Address6")) if p)
    123         add(rec, addr_prefix + "LINES", line)
    124         add(rec, addr_prefix + "ZIPCODE", text(t, "PostCode"))
    125         add(rec, addr_prefix + "COUNTRY", text(t, "Country"))
    126 
    127         # Individual attributes.
    128         add(rec, "DATE_OF_BIRTH", date_only(text(t, "Individual_DateOfBirth")))
    129         add(rec, "NATIONALITY", text(t, "Individual_Nationality"))
    130         add(rec, "PERSON_NATIONAL_ID", text(t, "Individual_PassportNumber"))
    131         add(rec, "PERSON_NATIONAL_ID", text(t, "Individual_NINumber"))
    132         cob = text(t, "Individual_CountryOfBirth")
    133         tob = text(t, "Individual_TownOfBirth")
    134         pob = ", ".join(p for p in (tob, cob) if p)
    135         if pob:
    136             add(rec, "other_information", "Place of birth: " + pob)
    137 
    138         # Entity attributes.
    139         add(rec, "COMMERCIAL_REGISTER_NUMBER", text(t, "Entity_BusinessRegNumber"))
    140 
    141         # Contact details and reasons.
    142         add(rec, "CONTACT_PHONE", text(t, "PhoneNumber"))
    143         add(rec, "CONTACT_EMAIL", text(t, "EmailAddress"))
    144         add(rec, "justification", text(t, "UKStatementOfReasons"))
    145         add(rec, "other_information", text(t, "OtherInformation"))
    146 
    147     return [dedupe(groups[gid]) for gid in order]
    148 
    149 
    150 def main():
    151     parser = argparse.ArgumentParser(
    152         description="Convert the UK OFSI consolidated list (XML) to robocop JSON")
    153     parser.add_argument("--input", help="Input XML file (default: stdin)")
    154     parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)")
    155     parser.add_argument("--indent", type=int, default=2)
    156     args = parser.parse_args()
    157 
    158     tree = ET.parse(args.input) if args.input else ET.parse(sys.stdin)
    159     root = strip_ns(tree.getroot())
    160     targets = convert(root)
    161 
    162     out = open(args.output, "w", encoding="utf-8") if args.output else sys.stdout
    163     json.dump(targets, out, indent=args.indent, ensure_ascii=False)
    164     if args.output:
    165         out.close()
    166     print("UK: converted {} targets".format(len(targets)), file=sys.stderr)
    167 
    168 
# Run the converter only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()