robocop

Checks KYC attributes against sanction lists
Log | Files | Refs | Submodules | README | LICENSE

commit fb27b3edb21d058307ce9031670056ac29e6e698
parent db7110fec5bcfb2e279c346e111b2b20601ea67a
Author: Christian Grothoff <christian@grothoff.org>
Date:   Sat,  7 Jun 2025 23:30:28 +0200

add script to convert XML input into saner JSON

Diffstat:
Arobocop-ch-to-json | 1003+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Arobocop-json-postprocess | 4++++
2 files changed, 1007 insertions(+), 0 deletions(-)

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# robocop-ch-to-json
#
# Copyright (C) 2025 Taler Systems SA
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""
Swiss Sanctions XML to JSON Converter

This program converts Swiss sanctions XML files (following the
swiss-sanctions-list XSD) to JSON format, mapping XML elements to registry
identifiers and inlining all references.

Features:
- Converts targets (individuals, entities, objects) to self-contained JSON records
- Maps XML elements to registry identifiers from GANA registry
- Inlines place references (location, area, country)
- Flattens name parts and includes spelling variants
- Handles multiple identities and addresses
- Preserves all identification documents and relationships

Usage:
    robocop-ch-to-json < input.xml > output.json
"""

import xml.etree.ElementTree as ET
import json
import sys
from datetime import datetime
from typing import Dict, List, Any, Optional
import argparse
import re


class SwissSanctionsConverter:
    """Converts Swiss sanctions XML to JSON format with registry mapping."""

    # "other-information" free text is very messy.  We try our best to match
    # it against various regular expressions and extract bits.  Each entry is
    # (regex, capture group index, registry key): when the regex matches the
    # text, the given group is appended to target_data[key].  All patterns are
    # tried in order and several may fire on the same text.
    _INDIVIDUAL_OI_PATTERNS = [
        (r'Passport Number:\s*([A-Za-z0-9]+)', 1, 'PERSON_NATIONAL_ID'),
        # BUGFIX: the number is group 2; group 1 is an optional prefix letter.
        (r'([A-Za-z])*\s*national number:\s*([A-Za-z0-9]+)', 2, 'PERSON_NATIONAL_ID'),
        (r'Personal ID:\s*([A-Za-z0-9]+)', 1, 'PERSON_NATIONAL_ID'),
        (r'National ID:\s*([A-Za-z0-9]+)', 1, 'PERSON_NATIONAL_ID'),
        (r'National ID\.:\s*([A-Za-z0-9]+)', 1, 'PERSON_NATIONAL_ID'),
        (r'National identification number:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', 1, 'PERSON_NATIONAL_ID'),
        (r'National identification no:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', 1, 'PERSON_NATIONAL_ID'),
        (r'Personal identification:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', 1, 'PERSON_NATIONAL_ID'),
        (r'Passport:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', 1, 'PERSON_NATIONAL_ID'),
        (r'Passport\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', 1, 'PERSON_NATIONAL_ID'),
        (r'ID Card Number:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', 1, 'PERSON_NATIONAL_ID'),
        (r'Passport or ID number:\s*([A-Za-z0-9]+)\s*([A-Za-z0-9()]+)', 1, 'PERSON_NATIONAL_ID'),
        # Combined form: both the national ID and the passport number are kept.
        (r'National ID:\s*([A-Za-z0-9]+)\s*;\s*Passport:\s*([A-Za-z0-9()]+)', 1, 'PERSON_NATIONAL_ID'),
        (r'National ID:\s*([A-Za-z0-9]+)\s*;\s*Passport:\s*([A-Za-z0-9()]+)', 2, 'PERSON_NATIONAL_ID'),
        (r'State Identification Number\s*([A-Za-z()]*)\s*:\s*([A-Za-z0-9]+)', 2, 'PERSON_NATIONAL_ID'),
        (r'e-mail:\s*([A-Za-z0-9@]+)', 1, 'CONTACT_EMAIL'),
        (r'email:\s*([A-Za-z0-9@]+)', 1, 'CONTACT_EMAIL'),
        (r'e-mail address:\s*([A-Za-z0-9@]+)', 1, 'CONTACT_EMAIL'),
        (r'email address:\s*([A-Za-z0-9@]+)', 1, 'CONTACT_EMAIL'),
        # BUGFIX: '.' escaped so 'Tel.' only matches a literal dot.
        (r'Tel\.:\s*([A-Za-z0-9() +-]+)', 1, 'CONTACT_PHONE'),
        (r'Phone:\s*([A-Za-z0-9() +-]+)', 1, 'CONTACT_PHONE'),
        (r'Tel\. \(office\):\s*([A-Za-z0-9() +-]+)', 1, 'CONTACT_PHONE'),
        (r'DOB:\s*([A-Za-z0-9:\. -]+)', 1, 'DATE_OF_BIRTH'),
        (r'Date range: DOB between\s*([A-Za-z0-9:\. -]+)', 1, 'DATE_OF_BIRTH'),
    ]

    _ENTITY_OI_PATTERNS = [
        (r'Tel\.:\s*([A-Za-z0-9() +-]+)', 1, 'CONTACT_PHONE'),
        (r'Company phone:\s*([A-Za-z0-9() +-]+)', 1, 'CONTACT_PHONE'),
        (r'Phone:\s*([A-Za-z0-9() +-]+)', 1, 'CONTACT_PHONE'),
        (r'e-mail:\s*([A-Za-z0-9@]+)', 1, 'CONTACT_EMAIL'),
        (r'e-mail address:\s*([A-Za-z0-9@]+)', 1, 'CONTACT_EMAIL'),
        (r'email address:\s*([A-Za-z0-9@]+)', 1, 'CONTACT_EMAIL'),
        (r'company email:\s*([A-Za-z0-9@]+)', 1, 'CONTACT_EMAIL'),
        (r'Date of registration:\s*([A-Za-z0-9\/\.]+)', 1, 'FOUNDING_DATE'),
        # BUGFIX: group 3 is the register number; groups 1/2 are surrounding words.
        (r'([A-Za-z]*)\s*Number([A-Za-z()]*)\s:\s*([A-Za-z0-9 -]+)', 3, 'COMMERCIAL_REGISTER_NUMBER'),
        (r'Registration no:\s*([A-Za-z0-9 -]+)', 1, 'COMMERCIAL_REGISTER_NUMBER'),
        (r'Registration Number:\s*([A-Za-z0-9 -]+)', 1, 'COMMERCIAL_REGISTER_NUMBER'),
    ]

    _OBJECT_OI_PATTERNS = [
        (r'Registration no\.:\s*([A-Za-z0-9 -]+)', 1, 'COMMERCIAL_REGISTER_NUMBER'),
    ]

    def __init__(self):
        # Registry mapping from XML elements to standardized identifiers.
        # Currently informational; extraction methods emit the registry keys
        # directly.
        self.registry_mapping = {
            # Personal information
            'given-name': 'PERSON_FIRST_NAMES',
            'further-given-name': 'PERSON_FIRST_NAMES',
            'family-name': 'PERSON_LAST_NAME',
            'maiden-name': 'PERSON_LAST_NAME',
            'whole-name': 'FULL_NAME',
            'day-month-year': 'DATE_OF_BIRTH',
            'nationality': 'NATIONALITY',
            'identification-document': 'PERSON_NATIONAL_ID',

            # Business information
            'entity_name': 'COMPANY_NAME',
            'business_name': 'BUSINESS_DISPLAY_NAME',

            # Address information
            'address-details': 'ADDRESS_LINES',
            'zip-code': 'ADDRESS_ZIPCODE',
            'c-o': 'ADDRESS_LINES',
            'p-o-box': 'ADDRESS_LINES',

            # Contact information
            'contact-person': 'CONTACT_PERSON_NAME',
        }

    @staticmethod
    def _dedupe(values: List[Any]) -> List[Any]:
        """Remove duplicates from a list while preserving order."""
        seen = set()
        return [x for x in values if not (x in seen or seen.add(x))]

    @staticmethod
    def _merge_lists(dest: Dict[str, List[str]], src: Dict[str, List[str]]) -> None:
        """Extend list-valued entries of *dest* with those of *src* in place."""
        for key, values in src.items():
            dest.setdefault(key, []).extend(values)

    def parse_xml(self, xml_file: Optional[str]) -> ET.Element:
        """Parse the XML file (or stdin if *xml_file* is falsy); return the root.

        Raises ValueError on malformed XML and FileNotFoundError if the
        named file does not exist.
        """
        try:
            if xml_file:
                tree = ET.parse(xml_file)
            else:
                tree = ET.parse(sys.stdin)
            return tree.getroot()
        except ET.ParseError as e:
            raise ValueError(f"Invalid XML file: {e}")
        except FileNotFoundError:
            raise FileNotFoundError(f"XML file not found: {xml_file}")

    def _parse_element(self, element: ET.Element) -> Any:
        """Recursively convert an XML element into plain JSON-style data.

        Returns a bare string for text-only leaf elements, otherwise a dict
        of attributes (with selected ones coerced to int/bool) and children.
        """
        result: Dict[str, Any] = {}

        # Attributes first; coerce well-known numeric/boolean ones.
        if element.attrib:
            result.update(element.attrib)
            for key, value in element.attrib.items():
                if key in ['ssid', 'day', 'month', 'year', 'place-id', 'target-id', 'order']:
                    try:
                        result[key] = int(value)
                    except ValueError:
                        pass  # Keep as string if conversion fails
                elif key in ['main', 'current']:
                    result[key] = value.lower() == 'true'

        # Text content: a leaf with only text collapses to that string.
        if element.text and element.text.strip():
            if len(element) == 0:
                return element.text.strip()
            result['_text'] = element.text.strip()  # mixed content

        # Group children by tag; single child stays scalar, repeats become lists.
        children_by_tag: Dict[str, List[Any]] = {}
        for child in element:
            children_by_tag.setdefault(child.tag, []).append(self._parse_element(child))
        for tag, children in children_by_tag.items():
            result[tag] = children[0] if len(children) == 1 else children

        return result

    def build_place_lookup(self, root: ET.Element) -> Dict[str, Dict[str, Any]]:
        """Build a lookup dictionary (keyed by ssid) for <place> references."""
        places: Dict[str, Dict[str, Any]] = {}

        for place_elem in root.findall('place'):
            ssid = place_elem.get('ssid')
            if not ssid:
                continue
            place_data: Dict[str, Any] = {
                'location': None,
                'location_variants': [],
                'area': None,
                'area_variants': [],
                'country': None,
                'country_code': None,
            }

            location_elem = place_elem.find('location')
            if location_elem is not None and location_elem.text:
                place_data['location'] = location_elem.text.strip()

            for variant in place_elem.findall('location-variant'):
                if variant.text:
                    place_data['location_variants'].append({
                        'value': variant.text.strip(),
                        'type': variant.get('variant-type', 'unknown'),
                    })

            area_elem = place_elem.find('area')
            if area_elem is not None and area_elem.text:
                place_data['area'] = area_elem.text.strip()

            for variant in place_elem.findall('area-variant'):
                if variant.text:
                    place_data['area_variants'].append({
                        'value': variant.text.strip(),
                        'type': variant.get('variant-type', 'unknown'),
                    })

            country_elem = place_elem.find('country')
            if country_elem is not None:
                place_data['country'] = country_elem.text.strip() if country_elem.text else None
                place_data['country_code'] = country_elem.get('iso-code')

            places[ssid] = place_data

        return places

    def resolve_place(self, place_id: str, places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Resolve a place reference into flattened address components."""
        if place_id not in places_lookup:
            return {}

        place = places_lookup[place_id]
        result: Dict[str, List[str]] = {}

        if place['country_code']:
            result['ADDRESS_COUNTRY'] = [place['country_code']]

        # Town/city: primary location plus all spelling variants.
        locations = ([place['location']] if place['location'] else []) + \
            [variant['value'] for variant in place['location_variants']]
        if locations:
            result['ADDRESS_TOWN_LOCATION'] = locations

        # District/subdivision: primary area plus all variants.
        areas = ([place['area']] if place['area'] else []) + \
            [variant['value'] for variant in place['area_variants']]
        if areas:
            result['ADDRESS_COUNTRY_SUBDIVISION'] = areas

        return result

    def extract_names(self, identity_elem: ET.Element) -> Dict[str, List[str]]:
        """Extract and flatten name information from an <identity> element."""
        result: Dict[str, List[str]] = {
            'PERSON_FIRST_NAMES': [],
            'PERSON_LAST_NAME': [],
            'FULL_NAME': [],
        }

        for name_elem in identity_elem.findall('name'):
            name_parts: List[str] = []
            first_names: List[str] = []
            last_names: List[str] = []

            for name_part in name_elem.findall('name-part'):
                part_type = name_part.get('name-part-type', '')
                value_elem = name_part.find('value')

                if value_elem is not None and value_elem.text:
                    value = value_elem.text.strip()
                    name_parts.append(value)

                    # Categorize name parts by their declared type.
                    if part_type in ['given-name', 'further-given-name']:
                        first_names.append(value)
                    elif part_type in ['family-name', 'maiden-name']:
                        last_names.append(value)
                    elif part_type == 'whole-name':
                        result['FULL_NAME'].append(value)

                    # Spelling variants join the same category as the main value.
                    for variant in name_part.findall('spelling-variant'):
                        if variant.text:
                            variant_value = variant.text.strip()
                            if part_type in ['given-name', 'further-given-name']:
                                first_names.append(variant_value)
                            elif part_type in ['family-name', 'maiden-name']:
                                last_names.append(variant_value)
                            elif part_type == 'whole-name':
                                result['FULL_NAME'].append(variant_value)

            result['PERSON_FIRST_NAMES'].extend(first_names)
            result['PERSON_LAST_NAME'].extend(last_names)

            # If we have separate parts but no whole name, combine them.
            if name_parts and not any(part.get('name-part-type') == 'whole-name'
                                      for part in name_elem.findall('name-part')):
                result['FULL_NAME'].append(' '.join(name_parts))

        for key in result:
            result[key] = self._dedupe(result[key])

        return result

    def extract_birth_info(self, identity_elem: ET.Element) -> Dict[str, List[str]]:
        """Extract birth date and nationality information from an identity."""
        result: Dict[str, List[str]] = {}

        def pad(component: str) -> str:
            """Zero-pad a numeric day/month; keep raw text if non-numeric."""
            try:
                return f"{int(component):02d}"
            except ValueError:
                return component  # BUGFIX: malformed data no longer crashes

        birth_dates: List[str] = []
        for dmy_elem in identity_elem.findall('day-month-year'):
            date_parts: List[str] = []
            year = dmy_elem.get('year')
            month = dmy_elem.get('month')
            day = dmy_elem.get('day')
            if year:
                date_parts.append(year)
            if month:
                date_parts.append(pad(month))
            if day:
                date_parts.append(pad(day))
            if date_parts:
                # ISO-style YYYY-MM-DD when complete, partial join otherwise.
                birth_dates.append('-'.join(date_parts))

        if birth_dates:
            result['DATE_OF_BIRTH'] = birth_dates

        nationalities = []
        for nat_elem in identity_elem.findall('nationality'):
            country_elem = nat_elem.find('country')
            if country_elem is not None:
                country_code = country_elem.get('iso-code')
                if country_code:
                    nationalities.append(country_code)
        if nationalities:
            result['NATIONALITY'] = nationalities

        return result

    def extract_addresses(self, identity_elem: ET.Element,
                          places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Extract address information from an identity element."""
        result: Dict[str, List[str]] = {}

        for addr_elem in identity_elem.findall('address'):
            # Inline the referenced <place> (country/town/area).
            place_id = addr_elem.get('place-id')
            if place_id:
                self._merge_lists(result, self.resolve_place(place_id, places_lookup))

            details_elem = addr_elem.find('address-details')
            if details_elem is not None and details_elem.text:
                result.setdefault('ADDRESS_LINES', []).append(details_elem.text.strip())

            zip_elem = addr_elem.find('zip-code')
            if zip_elem is not None and zip_elem.text:
                result.setdefault('ADDRESS_ZIPCODE', []).append(zip_elem.text.strip())

            # c/o and P.O. Box both become extra address lines.
            co_elem = addr_elem.find('c-o')
            if co_elem is not None and co_elem.text:
                result.setdefault('ADDRESS_LINES', []).append(f"c/o {co_elem.text.strip()}")

            po_elem = addr_elem.find('p-o-box')
            if po_elem is not None and po_elem.text:
                result.setdefault('ADDRESS_LINES', []).append(f"P.O. Box {po_elem.text.strip()}")

        return result

    def extract_identification_documents(self, identity_elem: ET.Element,
                                         places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Extract identification documents as human-readable summary strings."""
        result: Dict[str, List[str]] = {}

        for doc_elem in identity_elem.findall('identification-document'):
            doc_type = doc_elem.get('document-type', 'unknown')

            number_elem = doc_elem.find('number')
            if number_elem is None or not number_elem.text:
                continue
            doc_info = f"{doc_type}: {number_elem.text.strip()}"

            issuer_elem = doc_elem.find('issuer')
            if issuer_elem is not None:
                issuer_code = issuer_elem.get('code')
                if issuer_code:
                    doc_info += f" (issued by {issuer_code})"

            issue_date = doc_elem.find('date-of-issue')
            expiry_date = doc_elem.find('expiry-date')
            if issue_date is not None and issue_date.text:
                doc_info += f" issued: {issue_date.text}"
            if expiry_date is not None and expiry_date.text:
                doc_info += f" expires: {expiry_date.text}"

            result.setdefault('PERSON_NATIONAL_ID', []).append(doc_info)

        return result

    def process_individual(self, individual_elem: ET.Element,
                           places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Process an <individual> target; returns registry-keyed lists."""
        result: Dict[str, List[str]] = {}

        for identity_elem in individual_elem.findall('identity'):
            self._merge_lists(result, self.extract_names(identity_elem))
            self._merge_lists(result, self.extract_birth_info(identity_elem))
            self._merge_lists(result, self.extract_addresses(identity_elem, places_lookup))
            self._merge_lists(result, self.extract_identification_documents(identity_elem, places_lookup))

        for key in result:
            result[key] = self._dedupe(result[key])

        return result

    def process_entity(self, entity_elem: ET.Element,
                       places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Process an <entity> target; maps names/addresses to business keys."""
        result: Dict[str, Any] = {}

        for identity_elem in entity_elem.findall('identity'):
            # Entity names are business names, not person names.
            names = self.extract_names(identity_elem)
            if names.get('FULL_NAME'):
                result['COMPANY_NAME'] = names['FULL_NAME']
                result['BUSINESS_DISPLAY_NAME'] = names['FULL_NAME'].copy()

            # Addresses become the registered-office address.
            addresses = self.extract_addresses(identity_elem, places_lookup)
            for key, values in addresses.items():
                if 'OFFICE' not in key:
                    new_key = key.replace('ADDRESS_', 'REGISTERED_OFFICE_ADDRESS_')
                else:
                    new_key = key
                result.setdefault(new_key, []).extend(values)

        for key in result:
            if isinstance(result[key], list):
                result[key] = self._dedupe(result[key])

        return result

    def process_object(self, object_elem: ET.Element,
                       places_lookup: Dict[str, Dict]) -> Dict[str, List[str]]:
        """Process an <object> target (ships, aircraft, ...)."""
        result: Dict[str, List[str]] = {}
        object_type = object_elem.get('object-type', 'unknown')

        for identity_elem in object_elem.findall('identity'):
            names = self.extract_names(identity_elem)
            if names.get('FULL_NAME'):
                # Use a generic name field for objects, with the type appended
                # for display purposes.
                result['FULL_NAME'] = names['FULL_NAME']
                result['BUSINESS_DISPLAY_NAME'] = [f"{name} ({object_type})"
                                                   for name in names['FULL_NAME']]

        if 'FULL_NAME' not in result:
            result['FULL_NAME'] = [f"Unknown {object_type}"]

        return result

    def _is_target_active(self, target: Dict[str, Any]) -> bool:
        """True unless the most recent modification of *target* is 'de-listed'.

        *target* is the dict form produced by _parse_element(), not an
        ET.Element.
        """
        if 'modification' not in target:
            return True  # No modifications, consider active

        modifications = target['modification']
        if not isinstance(modifications, list):
            modifications = [modifications]

        # Find the most recent modification, preferring effective-date, then
        # enactment-date, then publication-date.
        most_recent = None
        most_recent_date = None
        for mod in modifications:
            date_str = None
            for field in ('effective-date', 'enactment-date', 'publication-date'):
                if field in mod:
                    date_str = mod[field]
                    break

            if date_str:
                try:
                    mod_date = datetime.strptime(date_str, '%Y-%m-%d')
                except ValueError:
                    continue  # Skip invalid dates
                if most_recent_date is None or mod_date > most_recent_date:
                    most_recent_date = mod_date
                    most_recent = mod
            elif most_recent is None:
                # No dates available: fall back to list position.
                most_recent = mod

        if most_recent is None:
            return True  # No valid modification found, consider active

        return most_recent.get('modification-type') != 'de-listed'

    @staticmethod
    def _apply_oi_patterns(text: str, patterns: List[tuple],
                           target_data: Dict[str, Any]) -> bool:
        """Run the (regex, group, key) table against *text*; append matches.

        Returns True if at least one pattern extracted a value.
        """
        found = False
        for pattern, group_idx, key in patterns:
            match = re.search(pattern, text, re.IGNORECASE)
            value = match.group(group_idx) if match else None
            if value is not None:
                target_data[key].append(value)
                found = True
        return found

    def _extract_common_details(self, elem: ET.Element, target_data: Dict[str, Any],
                                oi_patterns: List[tuple]) -> None:
        """Collect justification/relation/other-information children of *elem*.

        The structure is identical for individual, entity and object targets;
        only the other-information pattern table differs.
        """
        for just_elem in elem.findall('justification'):
            if just_elem.text:
                target_data['justification'].append(just_elem.text.strip())

        for rel_elem in elem.findall('relation'):
            relation_info = {
                'target_id': rel_elem.get('target-id'),
                'relation_type': rel_elem.get('relation-type'),
                'remark': None,
            }
            remark_elem = rel_elem.find('remark')
            if remark_elem is not None and remark_elem.text:
                relation_info['remark'] = remark_elem.text.strip()
            target_data['relations'].append(relation_info)

        for other_elem in elem.findall('other-information'):
            if other_elem.text:
                oi = other_elem.text.strip()
                # Keep the raw text only when no pattern could make sense of it.
                if not self._apply_oi_patterns(oi, oi_patterns, target_data):
                    target_data['other_information'].append(oi)

    def process_target(self, target_elem: ET.Element,
                       places_lookup: Dict[str, Dict]) -> Optional[Dict[str, Any]]:
        """Process a single <target> element into its JSON representation.

        Returns None for targets without an ssid.
        """
        ssid = target_elem.get('ssid')
        if not ssid:
            return None

        # Base target information; list-valued registry keys start empty so
        # the pattern extraction below can append unconditionally.
        target_data: Dict[str, Any] = {
            'ssid': ssid,
            'sanctions_set_ids': [],
            'foreign_identifier': None,
            'target_type': None,
            'justification': [],
            'relations': [],
            'other_information': [],
            'PERSON_NATIONAL_ID': [],
            'DATE_OF_BIRTH': [],
            'CONTACT_EMAIL': [],
            'CONTACT_PHONE': [],
            'COMMERCIAL_REGISTER_NUMBER': [],
            'FOUNDING_DATE': [],
            'generic_attributes': {},
        }

        for ss_id_elem in target_elem.findall('sanctions-set-id'):
            if ss_id_elem.text:
                target_data['sanctions_set_ids'].append(ss_id_elem.text.strip())

        foreign_id_elem = target_elem.find('foreign-identifier')
        if foreign_id_elem is not None and foreign_id_elem.text:
            target_data['foreign_identifier'] = foreign_id_elem.text.strip()

        # A target is exactly one of individual / entity / object.
        registry_data: Dict[str, Any] = {}
        individual_elem = target_elem.find('individual')
        entity_elem = target_elem.find('entity')
        object_elem = target_elem.find('object')

        if individual_elem is not None:
            target_data['target_type'] = 'individual'
            target_data['sex'] = individual_elem.get('sex')
            registry_data = self.process_individual(individual_elem, places_lookup)
            self._extract_common_details(individual_elem, target_data,
                                         self._INDIVIDUAL_OI_PATTERNS)
        elif entity_elem is not None:
            target_data['target_type'] = 'entity'
            registry_data = self.process_entity(entity_elem, places_lookup)
            self._extract_common_details(entity_elem, target_data,
                                         self._ENTITY_OI_PATTERNS)
        elif object_elem is not None:
            target_data['target_type'] = 'other'
            target_data['object_type'] = object_elem.get('object-type')
            registry_data = self.process_object(object_elem, places_lookup)
            self._extract_common_details(object_elem, target_data,
                                         self._OBJECT_OI_PATTERNS)

        for attr_elem in target_elem.findall('generic-attribute'):
            attr_name = attr_elem.get('name')
            if attr_name and attr_elem.text:
                target_data['generic_attributes'][attr_name] = attr_elem.text.strip()

        # BUGFIX: a plain dict.update() clobbered list keys (e.g.
        # PERSON_NATIONAL_ID, DATE_OF_BIRTH) already populated from
        # "other-information"; merge lists instead.
        for key, values in registry_data.items():
            if isinstance(values, list) and isinstance(target_data.get(key), list):
                target_data[key].extend(v for v in values if v not in target_data[key])
            else:
                target_data[key] = values

        return target_data

    def convert_xml_to_json(self, xml_file: Optional[str],
                            active_only: bool = False) -> Dict[str, Any]:
        """Convert a Swiss sanctions XML file to the JSON output structure.

        When *active_only* is True, targets whose most recent modification is
        'de-listed' are skipped.
        """
        root = self.parse_xml(xml_file)
        places_lookup = self.build_place_lookup(root)

        metadata = {
            'list_type': root.get('list-type'),
            'date': root.get('date'),
            'conversion_timestamp': datetime.now().isoformat(),
            'total_targets': 0,
            'total_places': len(places_lookup),
        }

        # Sanctions programs (multilingual keys, names and set labels).
        programs = []
        for program_elem in root.findall('sanctions-program'):
            program_data: Dict[str, Any] = {
                'ssid': program_elem.get('ssid'),
                'version_date': program_elem.get('version-date'),
                'predecessor_version_date': program_elem.get('predecessor-version-date'),
                'program_keys': {},
                'program_names': {},
                'sanctions_sets': {},
                'origin': None,
            }

            for key_elem in program_elem.findall('program-key'):
                lang = key_elem.get('lang')
                if lang and key_elem.text:
                    program_data['program_keys'][lang] = key_elem.text.strip()

            for name_elem in program_elem.findall('program-name'):
                lang = name_elem.get('lang')
                if lang and name_elem.text:
                    program_data['program_names'][lang] = name_elem.text.strip()

            for set_elem in program_elem.findall('sanctions-set'):
                lang = set_elem.get('lang')
                set_ssid = set_elem.get('ssid')
                if lang and set_ssid and set_elem.text:
                    program_data['sanctions_sets'].setdefault(set_ssid, {})[lang] = \
                        set_elem.text.strip()

            origin_elem = program_elem.find('origin')
            if origin_elem is not None and origin_elem.text:
                program_data['origin'] = origin_elem.text.strip()

            programs.append(program_data)

        # Targets.  BUGFIX: a previous pre-filtering pass treated the
        # ET.Element root like a parsed dict ('target' in root, root['target'],
        # del root['target']) and could never work; the per-target check below
        # is the only filtering needed.
        if active_only:
            print("Filtering for active targets", file=sys.stderr)

        targets = []
        for target_elem in root.findall('target'):
            # "_is_target_active" expects the dict form, so convert first;
            # skip the conversion entirely when no filtering is requested.
            if active_only and not self._is_target_active(self._parse_element(target_elem)):
                continue
            target_data = self.process_target(target_elem, places_lookup)
            if target_data:
                targets.append(target_data)

        metadata['total_targets'] = len(targets)

        return {
            'metadata': metadata,
            'sanctions_programs': programs,
            'targets': targets,
            'places': places_lookup,
        }


def main():
    """Main entry point for the converter."""
    parser = argparse.ArgumentParser(
        description='Convert Swiss sanction list from XML to JSON format',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  robocop-ch-to-json --active < sanctions.xml > sanctions.json
  robocop-ch-to-json --input sanctions.xml --output sanctions.json
        """
    )

    parser.add_argument('--active', action='store_true',
                        help='Only include active targets (exclude de-listed)')
    parser.add_argument('--input', help='Input XML file path')
    parser.add_argument('--output', '-o', help='Output JSON file path (default: stdout)')
    parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose output')
    parser.add_argument('--indent', type=int, default=2,
                        help='JSON indentation level (default: 2)')

    args = parser.parse_args()

    try:
        converter = SwissSanctionsConverter()
        json_data = converter.convert_xml_to_json(args.input, args.active)

        # Only the targets array is emitted; metadata goes to stderr (verbose).
        json_result = json_data['targets']

        try:
            if args.output:
                with open(args.output, 'w', encoding='utf-8') as f:
                    json.dump(json_result, f, indent=args.indent, ensure_ascii=False)
                print(f"Successfully converted XML to JSON: {args.output}", file=sys.stderr)
            else:
                json.dump(json_result, sys.stdout, indent=args.indent, ensure_ascii=False)
        except IOError as e:
            raise IOError(f"Failed to write JSON output: {e}")

        if args.verbose:
            print(f"Conversion completed successfully!", file=sys.stderr)
            print(f"Total targets: {json_data['metadata']['total_targets']}", file=sys.stderr)
            print(f"Total places: {json_data['metadata']['total_places']}", file=sys.stderr)
            print(f"Total programs: {len(json_data['sanctions_programs'])}", file=sys.stderr)

    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()

# Companion file robocop-json-postprocess (POSIX sh, public domain) from the
# same commit -- removes empty arrays, objects and null values from the JSON
# data structure it is given:
#
#   #!/bin/sh
#   exec jq 'walk(if type == "object" then with_entries(select(.value != [] and .value != {} and .value != null)) else . end)'