robocop

Checks KYC attributes against sanction lists
Log | Files | Refs | Submodules | README | LICENSE

robocop-ofac-to-json (6799B)


      1 #!/usr/bin/env python3
      2 # -*- coding: utf-8 -*-
      3 #
      4 # robocop-ofac-to-json
      5 #
      6 # Copyright (C) 2025 Taler Systems SA
      7 #
      8 # This program is free software: you can redistribute it and/or modify
      9 # it under the terms of the GNU General Public License as published by
     10 # the Free Software Foundation, either version 3 of the License, or
     11 # (at your option) any later version.
     12 #
     13 # This program is distributed in the hope that it will be useful,
     14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     16 # GNU General Public License for more details.
     17 #
     18 # You should have received a copy of the GNU General Public License
     19 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
     20 """
     21 OFAC (US Treasury) sanctions XML to robocop JSON converter.
     22 
     23 Converts OFAC's legacy <sdnList>/<sdnEntry> XML schema into robocop's internal
     24 JSON format. This schema is used by BOTH OFAC publications served from
     25 sanctionslistservice.ofac.treas.gov:
     26   - the SDN list           (.../exports/SDN.XML)
     27   - the Consolidated list  (.../exports/CONS.XML)   [non-SDN]
     28 so a single converter handles both. Output is a flat JSON array of self-contained
     29 target records keyed by a string "ssid" (namespaced "OFAC-<uid>"), using the same
     30 registry field names as robocop-ch-to-json.
     31 
     32 Usage:
     33     robocop-ofac-to-json < SDN.XML  | robocop-json-postprocess > ofac-sdn.json
     34     robocop-ofac-to-json < CONS.XML | robocop-json-postprocess > ofac-cons.json
     35 """
     36 
     37 import xml.etree.ElementTree as ET
     38 import json
     39 import sys
     40 import argparse
     41 
     42 
     43 def strip_ns(root):
     44     for el in root.iter():
     45         if isinstance(el.tag, str) and "}" in el.tag:
     46             el.tag = el.tag.split("}", 1)[1]
     47     return root
     48 
     49 
     50 def text(el, tag):
     51     child = el.find(tag)
     52     if child is not None and child.text and child.text.strip():
     53         return child.text.strip()
     54     return None
     55 
     56 
     57 def add(rec, key, value):
     58     if value is None:
     59         return
     60     value = value.strip()
     61     if not value:
     62         return
     63     rec.setdefault(key, []).append(value)
     64 
     65 
     66 def dedupe(rec):
     67     for key, val in rec.items():
     68         if isinstance(val, list):
     69             seen = set()
     70             rec[key] = [x for x in val if not (x in seen or seen.add(x))]
     71     return rec
     72 
     73 
     74 def whole_name(first, last):
     75     return " ".join(p for p in (first, last) if p)
     76 
     77 
     78 def add_name(rec, first, last, is_entity):
     79     if is_entity:
     80         name = whole_name(first, last)
     81         add(rec, "COMPANY_NAME", name)
     82         add(rec, "BUSINESS_DISPLAY_NAME", name)
     83     else:
     84         add(rec, "PERSON_FIRST_NAMES", first)
     85         add(rec, "PERSON_LAST_NAME", last)
     86         add(rec, "FULL_NAME", whole_name(first, last))
     87 
     88 
     89 def convert(root, prefix="OFAC-"):
     90     targets = []
     91     for entry in root.findall("sdnEntry"):
     92         uid = text(entry, "uid") or ""
     93         sdn_type = text(entry, "sdnType") or ""
     94         is_entity = sdn_type not in ("Individual",)
     95         rec = {
     96             "ssid": "{}{}".format(prefix, uid),
     97             "target_type": "individual" if sdn_type == "Individual" else (
     98                 "entity" if sdn_type == "Entity" else "other"),
     99             "sdn_type": sdn_type or None,
    100             "justification": [],
    101             "other_information": [],
    102         }
    103         addr_prefix = "REGISTERED_OFFICE_ADDRESS_" if is_entity else "ADDRESS_"
    104 
    105         add_name(rec, text(entry, "firstName"), text(entry, "lastName"), is_entity)
    106         add(rec, "other_information", text(entry, "title"))
    107 
    108         aka_list = entry.find("akaList")
    109         if aka_list is not None:
    110             for aka in aka_list.findall("aka"):
    111                 add_name(rec, text(aka, "firstName"), text(aka, "lastName"), is_entity)
    112 
    113         dob_list = entry.find("dateOfBirthList")
    114         if dob_list is not None:
    115             for item in dob_list.findall("dateOfBirthItem"):
    116                 add(rec, "DATE_OF_BIRTH", text(item, "dateOfBirth"))
    117 
    118         pob_list = entry.find("placeOfBirthList")
    119         if pob_list is not None:
    120             for item in pob_list.findall("placeOfBirthItem"):
    121                 pob = text(item, "placeOfBirth")
    122                 if pob:
    123                     add(rec, "other_information", "Place of birth: " + pob)
    124 
    125         nat_list = entry.find("nationalityList")
    126         if nat_list is not None:
    127             for item in nat_list.findall("nationality"):
    128                 add(rec, "NATIONALITY", text(item, "country"))
    129 
    130         addr_list = entry.find("addressList")
    131         if addr_list is not None:
    132             for ad in addr_list.findall("address"):
    133                 line = ", ".join(p for p in (text(ad, "address1"), text(ad, "address2"),
    134                                              text(ad, "address3")) if p)
    135                 add(rec, addr_prefix + "LINES", line)
    136                 add(rec, addr_prefix + "TOWN_LOCATION", text(ad, "city"))
    137                 add(rec, addr_prefix + "COUNTRY_SUBDIVISION", text(ad, "stateOrProvince"))
    138                 add(rec, addr_prefix + "ZIPCODE", text(ad, "postalCode"))
    139                 add(rec, addr_prefix + "COUNTRY", text(ad, "country"))
    140 
    141         id_list = entry.find("idList")
    142         if id_list is not None:
    143             for idel in id_list.findall("id"):
    144                 number = text(idel, "idNumber")
    145                 id_type = (text(idel, "idType") or "").lower()
    146                 # Skip OFAC's non-identifier annotations carried in idList.
    147                 if number and "secondary sanctions risk" not in id_type:
    148                     add(rec, "PERSON_NATIONAL_ID", number)
    149 
    150         for rmk in entry.findall("remarks"):
    151             add(rec, "justification", rmk.text)
    152 
    153         targets.append(dedupe(rec))
    154     return targets
    155 
    156 
    157 def main():
    158     parser = argparse.ArgumentParser(
    159         description="Convert an OFAC sanctions list (legacy sdnList XML) to robocop JSON")
    160     parser.add_argument("--input", help="Input XML file (default: stdin)")
    161     parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)")
    162     parser.add_argument("--prefix", default="OFAC-",
    163                         help="ssid prefix (default: OFAC-). Use a distinct value, "
    164                              "e.g. OFAC-CONS-, for the consolidated list so its uids "
    165                              "do not collide with the SDN list when the two are merged.")
    166     parser.add_argument("--indent", type=int, default=2)
    167     args = parser.parse_args()
    168 
    169     tree = ET.parse(args.input) if args.input else ET.parse(sys.stdin)
    170     root = strip_ns(tree.getroot())
    171     targets = convert(root, args.prefix)
    172 
    173     out = open(args.output, "w", encoding="utf-8") if args.output else sys.stdout
    174     json.dump(targets, out, indent=args.indent, ensure_ascii=False)
    175     if args.output:
    176         out.close()
    177     print("OFAC: converted {} targets".format(len(targets)), file=sys.stderr)
    178 
    179 
# Run the converter only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()