robocop-ofac-to-json (6799B)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# robocop-ofac-to-json
#
# Copyright (C) 2025 Taler Systems SA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
OFAC (US Treasury) sanctions XML to robocop JSON converter.

Converts OFAC's legacy <sdnList>/<sdnEntry> XML schema into robocop's internal
JSON format. This schema is used by BOTH OFAC publications served from
sanctionslistservice.ofac.treas.gov:
  - the SDN list (.../exports/SDN.XML)
  - the Consolidated list (.../exports/CONS.XML) [non-SDN]
so a single converter handles both. Output is a flat JSON array of self-contained
target records keyed by a string "ssid" (namespaced "OFAC-<uid>"), using the same
registry field names as robocop-ch-to-json.

Usage:
    robocop-ofac-to-json < SDN.XML | robocop-json-postprocess > ofac-sdn.json
    robocop-ofac-to-json < CONS.XML | robocop-json-postprocess > ofac-cons.json
"""

import argparse
import json
import sys
import xml.etree.ElementTree as ET
from typing import Any, Dict, List, Optional


def strip_ns(root: ET.Element) -> ET.Element:
    """Remove the "{namespace}" prefix from every element tag under *root*.

    The tree is modified in place; *root* is returned for convenience so the
    call can be chained.
    """
    for el in root.iter():
        # Only string tags can carry a "{namespace}local" prefix.
        if isinstance(el.tag, str) and "}" in el.tag:
            el.tag = el.tag.split("}", 1)[1]
    return root


def text(el: ET.Element, tag: str) -> Optional[str]:
    """Return the stripped text of the first *tag* match under *el*.

    Returns None when there is no such child or its text is empty or
    whitespace-only.
    """
    child = el.find(tag)
    if child is not None and child.text and child.text.strip():
        return child.text.strip()
    return None


def add(rec: Dict[str, Any], key: str, value: Optional[str]) -> None:
    """Append the stripped *value* to the list stored at ``rec[key]``.

    None and blank values are ignored, so a key is only created once there is
    something to store under it.
    """
    if value is None:
        return
    value = value.strip()
    if not value:
        return
    rec.setdefault(key, []).append(value)


def dedupe(rec: Dict[str, Any]) -> Dict[str, Any]:
    """Drop repeated items from every list value of *rec*, keeping first-seen order.

    Non-list values are left untouched. *rec* is modified in place and also
    returned.
    """
    for key, val in rec.items():
        if isinstance(val, list):
            # dict.fromkeys keeps the first occurrence of each item, in order.
            rec[key] = list(dict.fromkeys(val))
    return rec


def whole_name(first: Optional[str], last: Optional[str]) -> str:
    """Join the non-empty name parts with a single space ("" if both are empty)."""
    return " ".join(p for p in (first, last) if p)


def add_name(rec: Dict[str, Any], first: Optional[str], last: Optional[str],
             is_entity: bool) -> None:
    """Record one name (primary or alias) on *rec*.

    Entities get the joined name under both COMPANY_NAME and
    BUSINESS_DISPLAY_NAME; individuals get the separate first/last fields plus
    the joined FULL_NAME.
    """
    if is_entity:
        name = whole_name(first, last)
        add(rec, "COMPANY_NAME", name)
        add(rec, "BUSINESS_DISPLAY_NAME", name)
    else:
        add(rec, "PERSON_FIRST_NAMES", first)
        add(rec, "PERSON_LAST_NAME", last)
        add(rec, "FULL_NAME", whole_name(first, last))


def _items(entry: ET.Element, list_tag: str, item_tag: str) -> List[ET.Element]:
    """Return the *item_tag* children of the first *list_tag* child of *entry*."""
    container = entry.find(list_tag)
    if container is None:
        return []
    return container.findall(item_tag)


def convert(root: ET.Element, prefix: str = "OFAC-") -> List[Dict[str, Any]]:
    """Convert a namespace-stripped <sdnList> tree into robocop target records.

    Args:
        root: Root element whose direct children are <sdnEntry> elements; tags
            must already be free of namespace prefixes (see strip_ns).
        prefix: String prepended to each entry's <uid> to form the "ssid".

    Returns:
        One dict per <sdnEntry>, in document order, with duplicate list items
        removed.
    """
    targets = []
    for entry in root.findall("sdnEntry"):
        uid = text(entry, "uid") or ""
        sdn_type = text(entry, "sdnType") or ""
        # Everything that is not an individual uses the company name and
        # registered-office address fields.
        is_entity = sdn_type not in ("Individual",)
        if sdn_type == "Individual":
            target_type = "individual"
        elif sdn_type == "Entity":
            target_type = "entity"
        else:
            target_type = "other"
        rec = {
            "ssid": "{}{}".format(prefix, uid),
            "target_type": target_type,
            "sdn_type": sdn_type or None,
            "justification": [],
            "other_information": [],
        }
        addr_prefix = "REGISTERED_OFFICE_ADDRESS_" if is_entity else "ADDRESS_"

        add_name(rec, text(entry, "firstName"), text(entry, "lastName"), is_entity)
        add(rec, "other_information", text(entry, "title"))

        # Aliases are recorded in the same name fields as the primary name.
        for aka in _items(entry, "akaList", "aka"):
            add_name(rec, text(aka, "firstName"), text(aka, "lastName"), is_entity)

        for item in _items(entry, "dateOfBirthList", "dateOfBirthItem"):
            add(rec, "DATE_OF_BIRTH", text(item, "dateOfBirth"))

        for item in _items(entry, "placeOfBirthList", "placeOfBirthItem"):
            pob = text(item, "placeOfBirth")
            if pob:
                add(rec, "other_information", "Place of birth: " + pob)

        for item in _items(entry, "nationalityList", "nationality"):
            add(rec, "NATIONALITY", text(item, "country"))

        for ad in _items(entry, "addressList", "address"):
            line = ", ".join(
                p for p in (text(ad, "address1"), text(ad, "address2"),
                            text(ad, "address3")) if p)
            add(rec, addr_prefix + "LINES", line)
            add(rec, addr_prefix + "TOWN_LOCATION", text(ad, "city"))
            add(rec, addr_prefix + "COUNTRY_SUBDIVISION", text(ad, "stateOrProvince"))
            add(rec, addr_prefix + "ZIPCODE", text(ad, "postalCode"))
            add(rec, addr_prefix + "COUNTRY", text(ad, "country"))

        for idel in _items(entry, "idList", "id"):
            number = text(idel, "idNumber")
            id_type = (text(idel, "idType") or "").lower()
            # Skip OFAC's non-identifier annotations carried in idList.
            if number and "secondary sanctions risk" not in id_type:
                add(rec, "PERSON_NATIONAL_ID", number)

        for rmk in entry.findall("remarks"):
            add(rec, "justification", rmk.text)

        targets.append(dedupe(rec))
    return targets


def main() -> None:
    """Command-line entry point: read OFAC XML, write robocop JSON.

    Reads from --input or stdin, writes to --output or stdout, and reports the
    number of converted targets on stderr.
    """
    parser = argparse.ArgumentParser(
        description="Convert an OFAC sanctions list (legacy sdnList XML) to robocop JSON")
    parser.add_argument("--input", help="Input XML file (default: stdin)")
    parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)")
    parser.add_argument("--prefix", default="OFAC-",
                        help="ssid prefix (default: OFAC-). Use a distinct value, "
                             "e.g. OFAC-CONS-, for the consolidated list so its uids "
                             "do not collide with the SDN list when the two are merged.")
    parser.add_argument("--indent", type=int, default=2)
    args = parser.parse_args()

    if args.input:
        source = args.input
    else:
        # Hand the parser raw bytes so it applies the encoding declared in the
        # XML document rather than whatever the locale used to decode stdin.
        # Fall back to the stream itself if it has no binary layer.
        source = getattr(sys.stdin, "buffer", sys.stdin)
    root = strip_ns(ET.parse(source).getroot())
    targets = convert(root, args.prefix)

    if args.output:
        # The context manager closes the file even if serialisation fails.
        with open(args.output, "w", encoding="utf-8") as out:
            json.dump(targets, out, indent=args.indent, ensure_ascii=False)
    else:
        json.dump(targets, sys.stdout, indent=args.indent, ensure_ascii=False)
    print("OFAC: converted {} targets".format(len(targets)), file=sys.stderr)


if __name__ == "__main__":
    main()