robocop-un-to-json (6896B)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# robocop-un-to-json
#
# Copyright (C) 2025 Taler Systems SA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
UN Security Council Consolidated List XML to robocop JSON converter.

Converts the UN consolidated list (the <CONSOLIDATED_LIST> schema with
<INDIVIDUALS>/<INDIVIDUAL> and <ENTITIES>/<ENTITY> records published at
scsanctions.un.org) into robocop's internal JSON format: a flat JSON array of
self-contained target records keyed by a string "ssid", using the same registry
field names as robocop-ch-to-json.

Each record's ssid is namespaced "UN-<DATAID>".

Usage:
    robocop-un-to-json < un.xml | robocop-json-postprocess > un.json
"""

import xml.etree.ElementTree as ET
import json
import sys
import argparse


# Child tags of an address element, in the order they appear in the
# formatted single-line address.
_ADDRESS_PARTS = ("STREET", "CITY", "STATE_PROVINCE",
                  "ZIP_CODE", "COUNTRY", "NOTE")


def text(el, tag):
    """Return the stripped text of a direct child <tag>, or None.

    None is returned when the child is missing, empty, or whitespace-only.
    """
    child = el.find(tag)
    if child is None or not child.text:
        return None
    return child.text.strip() or None


def add(rec, key, value):
    """Append a non-empty string *value* to the list stored at rec[key].

    None and blank values are ignored; the list is created on first use.
    """
    if value is None:
        return
    value = value.strip()
    if not value:
        return
    rec.setdefault(key, []).append(value)


def dedupe(rec):
    """Drop duplicate entries from every list value of *rec*, keeping order.

    The record is modified in place and also returned for convenience.
    """
    for key, val in rec.items():
        if isinstance(val, list):
            # dict.fromkeys keeps the first occurrence of each entry, in order.
            rec[key] = list(dict.fromkeys(val))
    return rec


def values(el, tag):
    """Yield the <VALUE> texts under each child <tag> (UN wraps many fields)."""
    for sub in el.findall(tag):
        for v in sub.findall("VALUE"):
            if v.text and v.text.strip():
                yield v.text.strip()


def dob(el):
    """Yield one date-of-birth string per <INDIVIDUAL_DATE_OF_BIRTH> child.

    *el* is the enclosing record, not the date element itself.  For each
    child the most specific information wins:

    * <DATE> if present;
    * otherwise "<FROM_YEAR>-<TO_YEAR>" when both bounds are present;
    * otherwise <YEAR>;
    * otherwise a half-open range with "?" for the missing bound.

    Children carrying none of these fields yield nothing.
    """
    for d in el.findall("INDIVIDUAL_DATE_OF_BIRTH"):
        date = text(d, "DATE")
        if date:
            yield date
            continue
        year = text(d, "YEAR")
        from_y = text(d, "FROM_YEAR")
        to_y = text(d, "TO_YEAR")
        if from_y and to_y:
            yield "{}-{}".format(from_y, to_y)
        elif year:
            yield year
        elif from_y or to_y:
            yield "{}-{}".format(from_y or "?", to_y or "?")


def address(el, tag):
    """Yield (line, country) for each child <tag> address element of *el*.

    *line* joins the non-empty address parts with ", " and may be the empty
    string; *country* is the <COUNTRY> text or None.
    """
    for a in el.findall(tag):
        parts = [text(a, p) for p in _ADDRESS_PARTS]
        line = ", ".join(p for p in parts if p)
        yield line, text(a, "COUNTRY")


def _new_record(el, target_type):
    """Return the fields shared by person and entity records.

    A missing <DATAID> results in the bare ssid "UN-"; a missing
    <REFERENCE_NUMBER> results in a None foreign_identifier.
    """
    dataid = text(el, "DATAID") or ""
    return {
        "ssid": "UN-{}".format(dataid),
        "foreign_identifier": text(el, "REFERENCE_NUMBER"),
        "target_type": target_type,
        "justification": [],
        "other_information": [],
    }


def convert_person(ind):
    """Convert one <INDIVIDUAL> element into a robocop target record."""
    rec = _new_record(ind, "individual")
    gender = text(ind, "GENDER")
    if gender:
        rec["sex"] = gender.lower()

    name_parts = [text(ind, t) for t in
                  ("FIRST_NAME", "SECOND_NAME", "THIRD_NAME", "FOURTH_NAME")]
    name_parts = [p for p in name_parts if p]
    for p in name_parts:
        add(rec, "PERSON_FIRST_NAMES", p)
    if name_parts:
        add(rec, "FULL_NAME", " ".join(name_parts))
    add(rec, "FULL_NAME", text(ind, "NAME_ORIGINAL_SCRIPT"))

    # Aliases are recorded as additional full names.
    for alias in ind.findall("INDIVIDUAL_ALIAS"):
        add(rec, "FULL_NAME", text(alias, "ALIAS_NAME"))

    for nat in values(ind, "NATIONALITY"):
        add(rec, "NATIONALITY", nat)

    for d in dob(ind):
        add(rec, "DATE_OF_BIRTH", d)

    for pob in ind.findall("INDIVIDUAL_PLACE_OF_BIRTH"):
        parts = [text(pob, p) for p in ("CITY", "STATE_PROVINCE", "COUNTRY")]
        line = ", ".join(p for p in parts if p)
        if line:
            add(rec, "other_information", "Place of birth: " + line)

    for line, country in address(ind, "INDIVIDUAL_ADDRESS"):
        add(rec, "ADDRESS_LINES", line)
        add(rec, "ADDRESS_COUNTRY", country)

    # Only the document number is kept; add() skips missing numbers.
    for doc in ind.findall("INDIVIDUAL_DOCUMENT"):
        add(rec, "PERSON_NATIONAL_ID", text(doc, "NUMBER"))

    add(rec, "justification", text(ind, "COMMENTS1"))
    return dedupe(rec)


def convert_entity(ent):
    """Convert one <ENTITY> element into a robocop target record."""
    rec = _new_record(ent, "entity")
    name = text(ent, "FIRST_NAME")
    add(rec, "COMPANY_NAME", name)
    add(rec, "BUSINESS_DISPLAY_NAME", name)
    add(rec, "FULL_NAME", text(ent, "NAME_ORIGINAL_SCRIPT"))

    # Aliases are recorded as additional company/display names.
    for alias in ent.findall("ENTITY_ALIAS"):
        alias_name = text(alias, "ALIAS_NAME")
        add(rec, "COMPANY_NAME", alias_name)
        add(rec, "BUSINESS_DISPLAY_NAME", alias_name)

    for line, country in address(ent, "ENTITY_ADDRESS"):
        add(rec, "REGISTERED_OFFICE_ADDRESS_LINES", line)
        add(rec, "REGISTERED_OFFICE_ADDRESS_COUNTRY", country)

    add(rec, "justification", text(ent, "COMMENTS1"))
    return dedupe(rec)


def convert(root):
    """Convert a <CONSOLIDATED_LIST> root into a list of target records.

    Individuals come first, followed by entities, each in document order.
    """
    targets = []
    for group in root.findall("INDIVIDUALS"):
        for ind in group.findall("INDIVIDUAL"):
            targets.append(convert_person(ind))
    for group in root.findall("ENTITIES"):
        for ent in group.findall("ENTITY"):
            targets.append(convert_entity(ent))
    return targets


def main():
    """Command-line entry point: read the UN XML, write robocop JSON."""
    parser = argparse.ArgumentParser(
        description="Convert the UN consolidated sanctions list (XML) to robocop JSON")
    parser.add_argument("--input", help="Input XML file (default: stdin)")
    parser.add_argument("--output", "-o", help="Output JSON file (default: stdout)")
    parser.add_argument("--indent", type=int, default=2)
    args = parser.parse_args()

    # Hand the parser raw bytes so it decodes according to the XML
    # declaration instead of whatever the locale makes of text-mode stdin.
    source = args.input if args.input else getattr(sys.stdin, "buffer", sys.stdin)
    tree = ET.parse(source)
    targets = convert(tree.getroot())

    if args.output:
        # `with` closes the file even if serialisation fails.
        with open(args.output, "w", encoding="utf-8") as out:
            json.dump(targets, out, indent=args.indent, ensure_ascii=False)
    else:
        # The output contains non-ASCII names (ensure_ascii=False), so emit
        # UTF-8 on stdout too, matching the --output file encoding.
        reconfigure = getattr(sys.stdout, "reconfigure", None)
        if reconfigure is not None:
            reconfigure(encoding="utf-8")
        json.dump(targets, sys.stdout, indent=args.indent, ensure_ascii=False)
    print("UN: converted {} targets".format(len(targets)), file=sys.stderr)


if __name__ == "__main__":
    main()