#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# robocop-ch-to-json
#
# Copyright (C) 2025 Taler Systems SA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Swiss Sanctions XML to JSON Converter

This program converts Swiss sanctions XML files (following the swiss-sanctions-list XSD)
to JSON format, mapping XML elements to registry identifiers and inlining all references.

Features:
- Converts targets (individuals, entities, objects) to self-contained JSON records
- Maps XML elements to registry identifiers from GANA registry
- Inlines place references (location, area, country)
- Flattens name parts and includes spelling variants
- Handles multiple identities and addresses
- Preserves all identification documents and relationships

Usage:
    robocop-ch-to-json < input.xml > output.json
"""

import xml.etree.ElementTree as ET
import json
import sys
from datetime import datetime
from typing import Dict, Iterable, List, Any, Optional, Tuple
import argparse
import re


# ---------------------------------------------------------------------------
# Free-text ("other-information") extraction rules
# ---------------------------------------------------------------------------

# Capture fragments shared by the rules below.
_ALNUM = r'([A-Za-z0-9]+)'
# A full address including dots, e.g. "john.doe@example.com"; the domain part
# never ends in a dot so a sentence-ending "." is not swallowed.
_EMAIL = r'([A-Za-z0-9._%+-]+@[A-Za-z0-9-]+(?:\.[A-Za-z0-9-]+)*)'
_PHONE = r'([A-Za-z0-9() +-]+)'
_REGNO = r'([A-Za-z0-9 -]+)'
_DOB = r'([A-Za-z0-9:\. -]+)'

# One rule: (target key, compiled pattern, capture groups to store).
_Rule = Tuple[str, 're.Pattern', Tuple[int, ...]]


def _rule(key: str, pattern: str, *groups: int) -> _Rule:
    """Build one case-insensitive extraction rule (default: store group 1)."""
    return (key, re.compile(pattern, re.IGNORECASE), groups or (1,))


# Rules are applied in order; every rule is tried on every text.  Overlapping
# rules may capture the same value twice, which is why the extracted lists are
# de-duplicated when the target record is finalized.
_INDIVIDUAL_RULES: Tuple[_Rule, ...] = (
    _rule('PERSON_NATIONAL_ID', r'Passport Number:\s*' + _ALNUM),
    _rule('PERSON_NATIONAL_ID', r'national number:\s*' + _ALNUM),
    _rule('PERSON_NATIONAL_ID', r'Personal ID:\s*' + _ALNUM),
    # Also covers the combined form "National ID: X; Passport: Y" together
    # with the "Passport:" rule below.
    _rule('PERSON_NATIONAL_ID', r'National ID:\s*' + _ALNUM),
    _rule('PERSON_NATIONAL_ID', r'National ID\.:\s*' + _ALNUM),
    _rule('PERSON_NATIONAL_ID', r'National identification number:\s*' + _ALNUM),
    _rule('PERSON_NATIONAL_ID', r'National identification no:\s*' + _ALNUM),
    _rule('PERSON_NATIONAL_ID', r'Personal identification:\s*' + _ALNUM),
    _rule('PERSON_NATIONAL_ID', r'Passport:\s*' + _ALNUM),
    # "Passport 12345" without a colon: the token must contain a digit so that
    # words such as "Number" or "or" are not mistaken for a document number.
    _rule('PERSON_NATIONAL_ID', r'Passport\s*(?=[A-Za-z0-9]*[0-9])' + _ALNUM),
    _rule('PERSON_NATIONAL_ID', r'ID Card Number:\s*' + _ALNUM),
    _rule('PERSON_NATIONAL_ID', r'Passport or ID number:\s*' + _ALNUM),
    _rule('PERSON_NATIONAL_ID',
          r'State Identification Number\s*[A-Za-z()]*\s*:\s*' + _ALNUM),
    _rule('CONTACT_EMAIL', r'e-mail:\s*' + _EMAIL),
    _rule('CONTACT_EMAIL', r'email:\s*' + _EMAIL),
    _rule('CONTACT_EMAIL', r'e-mail address:\s*' + _EMAIL),
    _rule('CONTACT_EMAIL', r'email address:\s*' + _EMAIL),
    _rule('CONTACT_PHONE', r'Tel\.:\s*' + _PHONE),
    _rule('CONTACT_PHONE', r'Phone:\s*' + _PHONE),
    _rule('CONTACT_PHONE', r'Tel\. \(office\):\s*' + _PHONE),
    _rule('DATE_OF_BIRTH', r'DOB:\s*' + _DOB),
    _rule('DATE_OF_BIRTH', r'Date range: DOB between\s*' + _DOB),
)

_ENTITY_RULES: Tuple[_Rule, ...] = (
    _rule('CONTACT_PHONE', r'Tel\.:\s*' + _PHONE),
    _rule('CONTACT_PHONE', r'Company phone:\s*' + _PHONE),
    _rule('CONTACT_PHONE', r'Phone:\s*' + _PHONE),
    _rule('CONTACT_EMAIL', r'e-mail:\s*' + _EMAIL),
    _rule('CONTACT_EMAIL', r'e-mail address:\s*' + _EMAIL),
    _rule('CONTACT_EMAIL', r'email address:\s*' + _EMAIL),
    _rule('CONTACT_EMAIL', r'company email:\s*' + _EMAIL),
    _rule('FOUNDING_DATE', r'Date of registration:\s*([A-Za-z0-9\/\.]+)'),
    # "<kind> Number(<qualifier>) : <value>" -- the value is what we want,
    # not the word in front of "Number".
    _rule('COMMERCIAL_REGISTER_NUMBER', r'Number[A-Za-z()]*\s:\s*' + _REGNO),
    _rule('COMMERCIAL_REGISTER_NUMBER', r'Registration no:\s*' + _REGNO),
    _rule('COMMERCIAL_REGISTER_NUMBER', r'Registration Number:\s*' + _REGNO),
)

_OBJECT_RULES: Tuple[_Rule, ...] = (
    _rule('COMMERCIAL_REGISTER_NUMBER', r'Registration no\.:\s*' + _REGNO),
)

# Target keys that the rules above may fill.
_EXTRACTED_KEYS = (
    'PERSON_NATIONAL_ID',
    'DATE_OF_BIRTH',
    'CONTACT_EMAIL',
    'CONTACT_PHONE',
    'COMMERCIAL_REGISTER_NUMBER',
    'FOUNDING_DATE',
)

# Attributes of a <modification>, in order of preference, that date it.
_MODIFICATION_DATE_KEYS = ('effective-date', 'enactment-date', 'publication-date')

_FIRST_NAME_PARTS = ('given-name', 'further-given-name')
_LAST_NAME_PARTS = ('family-name', 'maiden-name')

_INT_ATTRIBUTES = frozenset(
    ('ssid', 'day', 'month', 'year', 'place-id', 'target-id', 'order'))
_BOOL_ATTRIBUTES = frozenset(('main', 'current'))


def _unique(values: Iterable[Any]) -> List[Any]:
    """Return *values* as a list without duplicates, keeping first-seen order."""
    return list(dict.fromkeys(values))


def _stripped_text(elem: Optional[ET.Element]) -> Optional[str]:
    """Return the stripped text of *elem*, or None if it is missing or blank."""
    if elem is None or not elem.text:
        return None
    return elem.text.strip() or None


def _merge_lists(target: Dict[str, List[Any]], source: Dict[str, List[Any]]) -> None:
    """Append every list in *source* to the list under the same key in *target*."""
    for key, values in source.items():
        target.setdefault(key, []).extend(values)


class SwissSanctionsConverter:
    """Converts Swiss sanctions XML to JSON format with registry mapping."""

    def __init__(self):
        # Registry mapping from XML elements to standardized identifiers
        self.registry_mapping = {
            # Personal information
            'given-name': 'PERSON_FIRST_NAMES',
            'further-given-name': 'PERSON_FIRST_NAMES',
            'family-name': 'PERSON_LAST_NAME',
            'maiden-name': 'PERSON_LAST_NAME',
            'whole-name': 'FULL_NAME',
            'day-month-year': 'DATE_OF_BIRTH',
            'nationality': 'NATIONALITY',
            'identification-document': 'PERSON_NATIONAL_ID',

            # Business information
            'entity_name': 'COMPANY_NAME',
            'business_name': 'BUSINESS_DISPLAY_NAME',

            # Address information
            'address-details': 'ADDRESS_LINES',
            'zip-code': 'ADDRESS_ZIPCODE',
            'c-o': 'ADDRESS_LINES',
            'p-o-box': 'ADDRESS_LINES',

            # Contact information
            'contact-person': 'CONTACT_PERSON_NAME',
        }

    def parse_xml(self, xml_file: Optional[str]) -> ET.Element:
        """Parse the XML input and return its root element.

        Args:
            xml_file: Path of the XML file; a falsy value means "read stdin".

        Raises:
            ValueError: The input is not well-formed XML.
            FileNotFoundError: *xml_file* does not exist.
        """
        source = xml_file if xml_file else sys.stdin
        try:
            return ET.parse(source).getroot()
        except ET.ParseError as e:
            raise ValueError(f"Invalid XML file: {e}") from e
        except FileNotFoundError as e:
            raise FileNotFoundError(f"XML file not found: {xml_file}") from e

    def _parse_element(self, element: ET.Element) -> Any:
        """Recursively convert an XML element into plain Python data.

        A leaf element with text and no attributes becomes a bare string.
        Everything else becomes a dict holding the attributes (selected ones
        converted to int/bool), the text under ``'_text'`` if any, and one
        entry per child tag (a list when the tag repeats).
        """
        result: Dict[str, Any] = {}

        for key, value in element.attrib.items():
            if key in _INT_ATTRIBUTES:
                try:
                    result[key] = int(value)
                except ValueError:
                    result[key] = value  # Keep as string if conversion fails
            elif key in _BOOL_ATTRIBUTES:
                result[key] = value.lower() == 'true'
            else:
                result[key] = value

        text = (element.text or '').strip()
        if text:
            if len(element) == 0 and not result:
                return text  # Plain leaf: nothing but text
            # Mixed content, or a leaf whose attributes must not be lost
            result['_text'] = text

        children_by_tag: Dict[str, List[Any]] = {}
        for child in element:
            children_by_tag.setdefault(child.tag, []).append(self._parse_element(child))

        for tag, children in children_by_tag.items():
            result[tag] = children[0] if len(children) == 1 else children

        return result

    def build_place_lookup(self, root: ET.Element) -> Dict[str, Dict[str, Any]]:
        """Build a lookup of all <place> elements keyed by their ``ssid`` string."""
        places: Dict[str, Dict[str, Any]] = {}

        for place_elem in root.findall('place'):
            ssid = place_elem.get('ssid')
            if not ssid:
                continue

            country_elem = place_elem.find('country')
            places[ssid] = {
                'location': _stripped_text(place_elem.find('location')),
                'location_variants': self._collect_variants(place_elem, 'location-variant'),
                'area': _stripped_text(place_elem.find('area')),
                'area_variants': self._collect_variants(place_elem, 'area-variant'),
                'country': _stripped_text(country_elem),
                'country_code': None if country_elem is None else country_elem.get('iso-code'),
            }

        return places

    @staticmethod
    def _collect_variants(place_elem: ET.Element, tag: str) -> List[Dict[str, str]]:
        """Return the non-empty *tag* children of a place as value/type dicts."""
        variants = []
        for variant in place_elem.findall(tag):
            value = _stripped_text(variant)
            if value:
                variants.append({
                    'value': value,
                    'type': variant.get('variant-type', 'unknown'),
                })
        return variants

    def resolve_place(self, place_id: str, places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Resolve a place reference into flattened address components.

        Returns an empty dict when *place_id* is not in *places_lookup*.
        """
        place = places_lookup.get(place_id)
        if place is None:
            return {}

        result: Dict[str, List[str]] = {}

        if place['country_code']:
            result['ADDRESS_COUNTRY'] = [place['country_code']]

        # Town/city: primary spelling first, then its variants
        locations = [place['location']] if place['location'] else []
        locations.extend(variant['value'] for variant in place['location_variants'])
        if locations:
            result['ADDRESS_TOWN_LOCATION'] = locations

        # District/subdivision: primary spelling first, then its variants
        areas = [place['area']] if place['area'] else []
        areas.extend(variant['value'] for variant in place['area_variants'])
        if areas:
            result['ADDRESS_COUNTRY_SUBDIVISION'] = areas

        return result

    def extract_names(self, identity_elem: ET.Element) -> Dict[str, List[str]]:
        """Extract and flatten the names of an identity element.

        Returns a dict with the keys ``PERSON_FIRST_NAMES``,
        ``PERSON_LAST_NAME`` and ``FULL_NAME`` (each a de-duplicated list,
        spelling variants included).  When a <name> has parts but no
        ``whole-name`` part, the part values are joined into a full name.
        """
        result: Dict[str, List[str]] = {
            'PERSON_FIRST_NAMES': [],
            'PERSON_LAST_NAME': [],
            'FULL_NAME': [],
        }

        for name_elem in identity_elem.findall('name'):
            name_parts = []
            has_whole_name = False

            for name_part in name_elem.findall('name-part'):
                part_type = name_part.get('name-part-type', '')

                # Pick the list this part (and its variants) belongs to
                if part_type in _FIRST_NAME_PARTS:
                    bucket = result['PERSON_FIRST_NAMES']
                elif part_type in _LAST_NAME_PARTS:
                    bucket = result['PERSON_LAST_NAME']
                elif part_type == 'whole-name':
                    bucket = result['FULL_NAME']
                    has_whole_name = True
                else:
                    bucket = None

                value = _stripped_text(name_part.find('value'))
                if value:
                    name_parts.append(value)
                    if bucket is not None:
                        bucket.append(value)

                if bucket is not None:
                    for variant in name_part.findall('spelling-variant'):
                        variant_value = _stripped_text(variant)
                        if variant_value:
                            bucket.append(variant_value)

            # If we have separate parts but no whole name, combine them
            if name_parts and not has_whole_name:
                result['FULL_NAME'].append(' '.join(name_parts))

        return {key: _unique(values) for key, values in result.items()}

    @staticmethod
    def _two_digits(value: Optional[str]) -> Optional[str]:
        """Zero-pad a numeric day/month attribute; None if absent or not numeric."""
        if not value:
            return None
        try:
            return f"{int(value):02d}"
        except ValueError:
            return None

    def extract_birth_info(self, identity_elem: ET.Element) -> Dict[str, List[str]]:
        """Extract birth dates and nationalities of an identity element.

        Dates are rendered as ``YYYY-MM-DD``; incomplete dates keep only the
        known leading components (e.g. ``YYYY`` or ``YYYY-MM``).  A day is only
        emitted together with a month, because ``YYYY-DD`` would read as
        year-month.
        """
        result: Dict[str, List[str]] = {}

        birth_dates = []
        for dmy_elem in identity_elem.findall('day-month-year'):
            year = dmy_elem.get('year')
            month = self._two_digits(dmy_elem.get('month'))
            day = self._two_digits(dmy_elem.get('day'))

            date_parts = []
            if year:
                date_parts.append(year)
            if month:
                date_parts.append(month)
                if day:
                    date_parts.append(day)

            if date_parts:
                birth_dates.append('-'.join(date_parts))

        if birth_dates:
            result['DATE_OF_BIRTH'] = birth_dates

        nationalities = []
        for nat_elem in identity_elem.findall('nationality'):
            country_elem = nat_elem.find('country')
            if country_elem is not None:
                country_code = country_elem.get('iso-code')
                if country_code:
                    nationalities.append(country_code)

        if nationalities:
            result['NATIONALITY'] = nationalities

        return result

    def extract_addresses(self, identity_elem: ET.Element, places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Extract address information from an identity element.

        Place references are resolved through *places_lookup*; c/o and
        P.O. box values are stored as prefixed ``ADDRESS_LINES`` entries.
        """
        result: Dict[str, List[str]] = {}

        for addr_elem in identity_elem.findall('address'):
            place_id = addr_elem.get('place-id')
            if place_id:
                _merge_lists(result, self.resolve_place(place_id, places_lookup))

            details = _stripped_text(addr_elem.find('address-details'))
            if details:
                result.setdefault('ADDRESS_LINES', []).append(details)

            zip_code = _stripped_text(addr_elem.find('zip-code'))
            if zip_code:
                result.setdefault('ADDRESS_ZIPCODE', []).append(zip_code)

            care_of = _stripped_text(addr_elem.find('c-o'))
            if care_of:
                result.setdefault('ADDRESS_LINES', []).append(f"c/o {care_of}")

            po_box = _stripped_text(addr_elem.find('p-o-box'))
            if po_box:
                result.setdefault('ADDRESS_LINES', []).append(f"P.O. Box {po_box}")

        return result

    def extract_identification_documents(self, identity_elem: ET.Element, places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Extract identification documents as human-readable strings.

        Each document with a number yields one ``PERSON_NATIONAL_ID`` entry of
        the form ``"<type>: <number> (issued by <code>) issued: <date>
        expires: <date>"`` with the optional parts left out when unknown.
        *places_lookup* is accepted for signature parity with the other
        extractors and is not used.
        """
        result: Dict[str, List[str]] = {}

        for doc_elem in identity_elem.findall('identification-document'):
            number = _stripped_text(doc_elem.find('number'))
            if not number:
                continue

            doc_type = doc_elem.get('document-type', 'unknown')
            doc_info = f"{doc_type}: {number}"

            issuer_elem = doc_elem.find('issuer')
            if issuer_elem is not None:
                issuer_code = issuer_elem.get('code')
                if issuer_code:
                    doc_info += f" (issued by {issuer_code})"

            issue_date = _stripped_text(doc_elem.find('date-of-issue'))
            if issue_date:
                doc_info += f" issued: {issue_date}"
            expiry_date = _stripped_text(doc_elem.find('expiry-date'))
            if expiry_date:
                doc_info += f" expires: {expiry_date}"

            result.setdefault('PERSON_NATIONAL_ID', []).append(doc_info)

        return result

    def process_individual(self, individual_elem: ET.Element, places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Collect names, birth data, addresses and documents of an individual.

        The values of all identities are merged per key and de-duplicated.
        """
        result: Dict[str, List[str]] = {}

        for identity_elem in individual_elem.findall('identity'):
            _merge_lists(result, self.extract_names(identity_elem))
            _merge_lists(result, self.extract_birth_info(identity_elem))
            _merge_lists(result, self.extract_addresses(identity_elem, places_lookup))
            _merge_lists(result, self.extract_identification_documents(identity_elem, places_lookup))

        return {key: _unique(values) for key, values in result.items()}

    def process_entity(self, entity_elem: ET.Element, places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Collect names and registered-office addresses of an entity.

        Names of *all* identities (aliases included) are accumulated under
        ``COMPANY_NAME`` and ``BUSINESS_DISPLAY_NAME``; ``ADDRESS_*`` keys are
        renamed to ``REGISTERED_OFFICE_ADDRESS_*``.
        """
        result: Dict[str, List[str]] = {}

        for identity_elem in entity_elem.findall('identity'):
            full_names = self.extract_names(identity_elem).get('FULL_NAME')
            if full_names:
                result.setdefault('COMPANY_NAME', []).extend(full_names)
                result.setdefault('BUSINESS_DISPLAY_NAME', []).extend(full_names)

            addresses = self.extract_addresses(identity_elem, places_lookup)
            for key, values in addresses.items():
                if 'OFFICE' not in key:
                    key = key.replace('ADDRESS_', 'REGISTERED_OFFICE_ADDRESS_')
                result.setdefault(key, []).extend(values)

        return {key: _unique(values) for key, values in result.items()}

    def process_object(self, object_elem: ET.Element, places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Collect the names of an object target (vessel, aircraft, ...).

        Names of all identities are accumulated under ``FULL_NAME``; the
        display names carry the object type in parentheses.  Without any name
        a placeholder ``"Unknown <type>"`` is used.  *places_lookup* is
        accepted for signature parity and is not used.
        """
        result: Dict[str, List[str]] = {}
        object_type = object_elem.get('object-type', 'unknown')

        for identity_elem in object_elem.findall('identity'):
            full_names = self.extract_names(identity_elem).get('FULL_NAME')
            if full_names:
                result.setdefault('FULL_NAME', []).extend(full_names)
                result.setdefault('BUSINESS_DISPLAY_NAME', []).extend(
                    f"{name} ({object_type})" for name in full_names)

        if 'FULL_NAME' not in result:
            result['FULL_NAME'] = [f"Unknown {object_type}"]

        return {key: _unique(values) for key, values in result.items()}

    def _is_target_active(self, target: Dict[str, Any]) -> bool:
        """Check if a target is active (most recent modification is not 'de-listed').

        *target* is the ``_parse_element`` form of a <target>.  The most
        recent modification is the one with the latest effective date
        (falling back to enactment, then publication date).  Modifications
        with an unparseable date are ignored; if no modification is dated,
        the last one in document order decides.
        """
        if not isinstance(target, dict) or 'modification' not in target:
            return True  # No modifications, consider active

        modifications = target['modification']
        if not isinstance(modifications, list):
            modifications = [modifications]

        latest_dated = None
        latest_date = None
        last_undated = None

        for mod in modifications:
            if not isinstance(mod, dict):
                continue  # Text-only modification carries no type or date

            date_str = next(
                (mod[key] for key in _MODIFICATION_DATE_KEYS if key in mod), None)
            if not date_str:
                last_undated = mod
                continue

            try:
                mod_date = datetime.strptime(date_str, '%Y-%m-%d')
            except (TypeError, ValueError):
                continue  # Skip invalid dates

            if latest_date is None or mod_date > latest_date:
                latest_date = mod_date
                latest_dated = mod

        most_recent = latest_dated if latest_dated is not None else last_undated
        if most_recent is None:
            return True  # No valid modification found, consider active

        return most_recent.get('modification-type') != 'de-listed'

    @staticmethod
    def _extract_other_information(text: str, rules: Iterable[_Rule], target_data: Dict[str, Any]) -> bool:
        """Apply *rules* to one other-information text.

        Every captured, non-blank value is appended to the rule's list in
        *target_data*.  Returns True if at least one value was stored.
        """
        found = False
        for key, pattern, groups in rules:
            match = pattern.search(text)
            if match is None:
                continue
            for group in groups:
                value = (match.group(group) or '').strip()
                if value:
                    target_data[key].append(value)
                    found = True
        return found

    def _collect_subject_details(self, subject_elem: ET.Element, target_data: Dict[str, Any], rules: Iterable[_Rule]) -> None:
        """Fill justification, relations and other-information of a target.

        *subject_elem* is the <individual>, <entity> or <object> element.
        "other-information" is very messy: each text is matched against
        *rules* and only texts from which nothing could be extracted are kept
        verbatim in ``other_information``.
        """
        for just_elem in subject_elem.findall('justification'):
            if just_elem.text:
                target_data['justification'].append(just_elem.text.strip())

        for rel_elem in subject_elem.findall('relation'):
            target_data['relations'].append({
                'target_id': rel_elem.get('target-id'),
                'relation_type': rel_elem.get('relation-type'),
                'remark': _stripped_text(rel_elem.find('remark')),
            })

        for other_elem in subject_elem.findall('other-information'):
            if not other_elem.text:
                continue
            text = other_elem.text.strip()
            if not self._extract_other_information(text, rules, target_data):
                target_data['other_information'].append(text)

    def process_target(self, target_elem: ET.Element, places_lookup: Dict[str, Dict]) -> Optional[Dict[str, Any]]:
        """Process a single target element and return its JSON representation.

        Returns None when the target has no ``ssid``.  Structured registry
        data (names, documents, ...) is merged with the values extracted from
        free text; for keys filled by both, structured values come first and
        duplicates are removed.
        """
        ssid = target_elem.get('ssid')
        if not ssid:
            return None

        # Base target information
        target_data: Dict[str, Any] = {
            'ssid': ssid,
            'sanctions_set_ids': [],
            'foreign_identifier': None,
            'target_type': None,
            'justification': [],
            'relations': [],
            'other_information': [],
            'PERSON_NATIONAL_ID': [],
            'DATE_OF_BIRTH': [],
            'CONTACT_EMAIL': [],
            'CONTACT_PHONE': [],
            'COMMERCIAL_REGISTER_NUMBER': [],
            'FOUNDING_DATE': [],
            'generic_attributes': {},
        }

        for ss_id_elem in target_elem.findall('sanctions-set-id'):
            if ss_id_elem.text:
                target_data['sanctions_set_ids'].append(ss_id_elem.text.strip())

        foreign_id_elem = target_elem.find('foreign-identifier')
        if foreign_id_elem is not None and foreign_id_elem.text:
            target_data['foreign_identifier'] = foreign_id_elem.text.strip()

        # Exactly one of individual / entity / object describes the target
        registry_data: Dict[str, List[str]] = {}
        individual_elem = target_elem.find('individual')
        entity_elem = target_elem.find('entity')
        object_elem = target_elem.find('object')

        if individual_elem is not None:
            target_data['target_type'] = 'individual'
            target_data['sex'] = individual_elem.get('sex')
            registry_data = self.process_individual(individual_elem, places_lookup)
            self._collect_subject_details(individual_elem, target_data, _INDIVIDUAL_RULES)
        elif entity_elem is not None:
            target_data['target_type'] = 'entity'
            registry_data = self.process_entity(entity_elem, places_lookup)
            self._collect_subject_details(entity_elem, target_data, _ENTITY_RULES)
        elif object_elem is not None:
            target_data['target_type'] = 'other'
            target_data['object_type'] = object_elem.get('object-type')
            registry_data = self.process_object(object_elem, places_lookup)
            self._collect_subject_details(object_elem, target_data, _OBJECT_RULES)

        for attr_elem in target_elem.findall('generic-attribute'):
            attr_name = attr_elem.get('name')
            if attr_name and attr_elem.text:
                target_data['generic_attributes'][attr_name] = attr_elem.text.strip()

        # Merge registry data into target data.  Lists present on both sides
        # are combined so that free-text extractions are not overwritten.
        for key, values in registry_data.items():
            existing = target_data.get(key)
            if isinstance(existing, list) and isinstance(values, list):
                target_data[key] = values + existing
            else:
                target_data[key] = values

        # Overlapping rules may have captured the same value more than once
        for key in _EXTRACTED_KEYS:
            target_data[key] = _unique(target_data[key])

        return target_data

    def convert_xml_to_json(self, xml_file: Optional[str], active_only: bool = False) -> Dict[str, Any]:
        """Convert a Swiss sanctions XML file to the JSON structure.

        Args:
            xml_file: Path of the XML file; a falsy value means "read stdin".
            active_only: Skip targets whose latest modification is 'de-listed'.

        Returns:
            A dict with the keys ``metadata``, ``sanctions_programs``,
            ``targets`` and ``places``.
        """
        root = self.parse_xml(xml_file)

        places_lookup = self.build_place_lookup(root)

        metadata = {
            'list_type': root.get('list-type'),
            'date': root.get('date'),
            'conversion_timestamp': datetime.now().isoformat(),
            'total_targets': 0,
            'total_places': len(places_lookup),
        }

        programs = [self._process_program(program_elem)
                    for program_elem in root.findall('sanctions-program')]

        targets = []
        for target_elem in root.findall('target'):
            # The "_is_target_active" logic expects the dict form, convert first
            if active_only and not self._is_target_active(self._parse_element(target_elem)):
                continue
            target_data = self.process_target(target_elem, places_lookup)
            if target_data:
                targets.append(target_data)

        metadata['total_targets'] = len(targets)

        return {
            'metadata': metadata,
            'sanctions_programs': programs,
            'targets': targets,
            'places': places_lookup,
        }

    @staticmethod
    def _process_program(program_elem: ET.Element) -> Dict[str, Any]:
        """Convert one <sanctions-program> element into a dict."""
        program_data: Dict[str, Any] = {
            'ssid': program_elem.get('ssid'),
            'version_date': program_elem.get('version-date'),
            'predecessor_version_date': program_elem.get('predecessor-version-date'),
            'program_keys': {},
            'program_names': {},
            'sanctions_sets': {},
            'origin': None,
        }

        # Keys and names are stored per language
        for key_elem in program_elem.findall('program-key'):
            lang = key_elem.get('lang')
            if lang and key_elem.text:
                program_data['program_keys'][lang] = key_elem.text.strip()

        for name_elem in program_elem.findall('program-name'):
            lang = name_elem.get('lang')
            if lang and name_elem.text:
                program_data['program_names'][lang] = name_elem.text.strip()

        # Sanctions sets are stored per ssid, then per language
        for set_elem in program_elem.findall('sanctions-set'):
            lang = set_elem.get('lang')
            ssid = set_elem.get('ssid')
            if lang and ssid and set_elem.text:
                program_data['sanctions_sets'].setdefault(ssid, {})[lang] = set_elem.text.strip()

        origin_elem = program_elem.find('origin')
        if origin_elem is not None and origin_elem.text:
            program_data['origin'] = origin_elem.text.strip()

        return program_data


def main():
    """Main entry point for the converter.

    Writes the list of targets as JSON to ``--output`` or stdout and exits
    with status 1 (message on stderr) on any error.
    """
    parser = argparse.ArgumentParser(
        description='Convert Swiss sanction list from XML to JSON format',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  robocop-ch-to-json --active < sanctions.xml > sanctions.json
  robocop-ch-to-json --input sanctions.xml --output sanctions.json
        """
    )

    parser.add_argument('--active', action='store_true',
                        help='Only include active targets (exclude de-listed)')
    parser.add_argument('--input', help='Input XML file path')
    parser.add_argument('--output', '-o', help='Output JSON file path (default: stdout)')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
    parser.add_argument('--indent', type=int, default=2, help='JSON indentation level (default: 2)')

    args = parser.parse_args()

    try:
        converter = SwissSanctionsConverter()

        # Convert XML to JSON
        json_data = converter.convert_xml_to_json(args.input, args.active)

        # Only the targets are emitted; metadata is used for --verbose below
        json_result = json_data['targets']

        # Output to file or stdout
        try:
            if args.output:
                with open(args.output, 'w', encoding='utf-8') as f:
                    json.dump(json_result, f, indent=args.indent, ensure_ascii=False)
                print(f"Successfully converted XML to JSON: {args.output}", file=sys.stderr)
            else:
                json.dump(json_result, sys.stdout, indent=args.indent, ensure_ascii=False)
        except IOError as e:
            raise IOError(f"Failed to write JSON output: {e}") from e

        if args.verbose:
            print("Conversion completed successfully!", file=sys.stderr)
            print(f"Total targets: {json_data['metadata']['total_targets']}", file=sys.stderr)
            print(f"Total places: {json_data['metadata']['total_places']}", file=sys.stderr)
            print(f"Total programs: {len(json_data['sanctions_programs'])}", file=sys.stderr)

    except Exception as e:
        # Top-level boundary: report any failure and signal it via exit status
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()