robocop-uk-to-json (6296B)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# robocop-uk-to-json
#
# Copyright (C) 2025 Taler Systems SA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
UK OFSI Consolidated List XML to robocop JSON converter.

Converts the UK OFSI "ConList" (the <ArrayOfFinancialSanctionsTarget> schema,
default namespace http://schemas.hmtreasury.gov.uk/ofsi/consolidatedlist) into
robocop's internal JSON format. The OFSI list is FLAT: each
<FinancialSanctionsTarget> is a single name/alias row, and rows that share a
<GroupID> are the same designated target. This converter groups rows by GroupID
so one robocop record (ssid "GB-<GroupID>") accumulates every name variation,
address and attribute, using the same registry field names as robocop-ch-to-json.

Usage:
    robocop-uk-to-json < ConList.xml | robocop-json-postprocess > uk.json
"""

import xml.etree.ElementTree as ET
import json
import sys
import argparse


def strip_ns(root):
    """Remove the "{namespace}" prefix from every element tag under *root*.

    The tree is modified in place so later lookups can use bare tag names.
    Returns *root* for convenient chaining.
    """
    for el in root.iter():
        # Non-string tags (comments, processing instructions) are left alone.
        if isinstance(el.tag, str) and "}" in el.tag:
            el.tag = el.tag.split("}", 1)[1]
    return root


def text(el, tag):
    """Return the stripped text of the first child *tag* of *el*.

    Returns None when the child is missing or holds only whitespace.
    """
    child = el.find(tag)
    if child is not None and child.text and child.text.strip():
        return child.text.strip()
    return None


def add(rec, key, value):
    """Append *value* to the list stored at ``rec[key]``, creating it if needed.

    None, empty and whitespace-only values are ignored; other values are
    stripped before being stored.
    """
    if value is None:
        return
    value = value.strip()
    if not value:
        return
    rec.setdefault(key, []).append(value)


def dedupe(rec):
    """Drop repeated entries from every list value of *rec*, keeping order.

    The first occurrence of each entry wins. Non-list values are untouched.
    *rec* is modified in place and returned.
    """
    for key, val in rec.items():
        if isinstance(val, list):
            seen = set()
            unique = []
            for item in val:
                if item not in seen:
                    seen.add(item)
                    unique.append(item)
            # Re-assigning an existing key does not resize the dict, so this
            # is safe while iterating over rec.items().
            rec[key] = unique
    return rec


def date_only(value):
    """OFSI dates look like 2022-12-09T00:00:00; keep the date part.

    Values without a "T" (including None) are returned unchanged.
    """
    if value and "T" in value:
        return value.split("T", 1)[0]
    return value


def convert(root):
    """Group the flat OFSI rows under *root* into one record per GroupID.

    *root* must already have its namespaces removed (see strip_ns()), because
    children are looked up by bare tag name.

    Returns a list of dicts in order of first appearance of each GroupID.
    Every record carries "ssid", "foreign_identifier", "target_type",
    "justification" and "other_information"; the remaining keys only appear
    when at least one row of the group supplied a value. List values are
    de-duplicated.
    """
    groups = {}
    order = []
    for t in root.findall("FinancialSanctionsTarget"):
        # Rows without a <GroupID> all share the key "" and therefore end up
        # merged into a single "GB-" record.
        gid = text(t, "GroupID") or ""
        if gid not in groups:
            # The first row seen for a group decides its type and identifier.
            type_desc = (text(t, "GroupTypeDescription") or "").lower()
            if type_desc in ("individual", "entity"):
                target_type = type_desc
            else:
                target_type = "other"
            groups[gid] = {
                "ssid": "GB-{}".format(gid),
                "foreign_identifier": text(t, "UKSanctionsListRef"),
                "target_type": target_type,
                "justification": [],
                "other_information": [],
            }
            order.append(gid)
        rec = groups[gid]
        is_entity = rec["target_type"] == "entity"
        addr_prefix = "REGISTERED_OFFICE_ADDRESS_" if is_entity else "ADDRESS_"

        # Name parts: name1..name5 are forenames, Name6 is the family name.
        forenames = [text(t, "name{}".format(i)) for i in range(1, 6)]
        forenames = [p for p in forenames if p]
        family = text(t, "Name6")
        whole = " ".join(p for p in (forenames + [family]) if p)
        if is_entity:
            add(rec, "COMPANY_NAME", whole)
            add(rec, "BUSINESS_DISPLAY_NAME", whole)
        else:
            for p in forenames:
                add(rec, "PERSON_FIRST_NAMES", p)
            add(rec, "PERSON_LAST_NAME", family)
            add(rec, "FULL_NAME", whole)
        # The non-Latin-script spelling is recorded for every target type.
        add(rec, "FULL_NAME", text(t, "NameNonLatinScript"))

        # Only the first gender value found within a group is kept.
        gender = text(t, "Individual_Gender")
        if gender and "sex" not in rec:
            rec["sex"] = gender.lower()

        # Address.
        address_parts = [text(t, "Address{}".format(i)) for i in range(1, 7)]
        line = ", ".join(p for p in address_parts if p)
        add(rec, addr_prefix + "LINES", line)
        add(rec, addr_prefix + "ZIPCODE", text(t, "PostCode"))
        add(rec, addr_prefix + "COUNTRY", text(t, "Country"))

        # Individual attributes.
        add(rec, "DATE_OF_BIRTH", date_only(text(t, "Individual_DateOfBirth")))
        add(rec, "NATIONALITY", text(t, "Individual_Nationality"))
        add(rec, "PERSON_NATIONAL_ID", text(t, "Individual_PassportNumber"))
        add(rec, "PERSON_NATIONAL_ID", text(t, "Individual_NINumber"))
        cob = text(t, "Individual_CountryOfBirth")
        tob = text(t, "Individual_TownOfBirth")
        pob = ", ".join(p for p in (tob, cob) if p)
        if pob:
            add(rec, "other_information", "Place of birth: " + pob)

        # Entity attributes.
        add(rec, "COMMERCIAL_REGISTER_NUMBER", text(t, "Entity_BusinessRegNumber"))

        # Contact details and reasons.
        add(rec, "CONTACT_PHONE", text(t, "PhoneNumber"))
        add(rec, "CONTACT_EMAIL", text(t, "EmailAddress"))
        add(rec, "justification", text(t, "UKStatementOfReasons"))
        add(rec, "other_information", text(t, "OtherInformation"))

    return [dedupe(groups[gid]) for gid in order]


def main():
    """Command-line entry point: read OFSI XML, write robocop JSON.

    Input comes from --input or stdin, output goes to --output or stdout.
    A one-line summary with the number of converted targets is printed to
    stderr.
    """
    parser = argparse.ArgumentParser(
        description="Convert the UK OFSI consolidated list (XML) to robocop JSON")
    parser.add_argument("--input", help="Input XML file (default: stdin)")
    parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)")
    parser.add_argument("--indent", type=int, default=2)
    args = parser.parse_args()

    if args.input:
        tree = ET.parse(args.input)
    else:
        # Hand the parser raw bytes so the XML encoding declaration decides
        # how the document is decoded, not the locale of the text-mode stdin.
        # Fall back to sys.stdin itself if it has been replaced by an object
        # without a binary buffer.
        tree = ET.parse(getattr(sys.stdin, "buffer", sys.stdin))
    root = strip_ns(tree.getroot())
    targets = convert(root)

    if args.output:
        # The context manager closes the file even if serialisation fails.
        with open(args.output, "w", encoding="utf-8") as out:
            json.dump(targets, out, indent=args.indent, ensure_ascii=False)
    else:
        json.dump(targets, sys.stdout, indent=args.indent, ensure_ascii=False)
    print("UK: converted {} targets".format(len(targets)), file=sys.stderr)


if __name__ == "__main__":
    main()