#!/usr/bin/env python3
"""Parser for Czech Republic ISIL Database (ADR).

Converts MARC21 XML format to LinkML-compliant HeritageCustodian records.

Source: National Library of the Czech Republic
Format: MARC21 XML with custom tags
License: CC0
"""

import os
import sys
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

# Add parent directory to path for imports
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))

# MARC21 XML namespace used throughout (callers pass it explicitly as `ns`).
_MARC_NS_URI = 'http://www.loc.gov/MARC21/slim'


def parse_subfields(field, ns: dict) -> Dict[str, Any]:
    """Extract subfields from a MARC datafield.

    Args:
        field: A ``marc:datafield`` XML element.
        ns: Namespace mapping, e.g. ``{'marc': 'http://www.loc.gov/MARC21/slim'}``.

    Returns:
        Mapping of subfield code -> text. A code repeated within the same
        datafield is collected into a list (in document order). Subfields
        with empty/None text are skipped.
    """
    subfields: Dict[str, Any] = {}
    for subfield in field.findall('marc:subfield', ns):
        code = subfield.get('code')
        value = subfield.text
        if not value:
            continue
        if code in subfields:
            existing = subfields[code]
            if isinstance(existing, list):
                existing.append(value)
            else:
                # Second occurrence: promote scalar to list.
                subfields[code] = [existing, value]
        else:
            subfields[code] = value
    return subfields


def map_institution_type(czech_type: str) -> str:
    """Map Czech ADR institution TYP codes to the project's GLAM taxonomy.

    Based on actual TYP codes found in the ADR database:
    - OK (61.5%): Obecní knihovna (Community Library)
    - SP (12.7%): Ostatní specializovaná knihovna (Other Specialized Library)
    - MK (6.3%): Městská knihovna (Municipal Library)
    - VK (4.6%): Knihovna výzkumného ústavu (Research Institute Library)
    - VŠ (3.5%): Vysokoškolská knihovna (Academic/University Library)
    - LK (2.8%): Lékařská knihovna (Medical Library)
    - KI-MU (2.2%): Knihovna kulturní instituce - muzeum (Museum Library)
    - AK (2.0%): Knihovna státní správy (State Administration Library)
    - ŠK (1.7%): Školní knihovna (School Library)
    - KI (1.3%): Knihovna kulturní instituce (Cultural Institution Library)
    - CK (0.7%): Církevní knihovna (Church Library)
    - KI-GA (0.4%): Knihovna kulturní instituce - galerie (Gallery Library)
    - KK (0.2%): Krajská knihovna (Regional Library)
    - NK (0.1%): Národní knihovna (National Library)

    Returns:
        The taxonomy enum name, or ``'UNKNOWN'`` for unrecognized codes.
    """
    type_mapping = {
        # Libraries (various types)
        'NK': 'LIBRARY',                 # National Library
        'VŠ': 'LIBRARY',                 # Academic/University Library
        'VK': 'LIBRARY',                 # Research Institute Library
        'MK': 'LIBRARY',                 # Municipal Library
        'OK': 'LIBRARY',                 # Community Library
        'KK': 'LIBRARY',                 # Regional Library
        'SP': 'LIBRARY',                 # Specialized Library
        'LK': 'LIBRARY',                 # Medical Library
        'ŠK': 'EDUCATION_PROVIDER',      # School Library (schools with collections)
        'CK': 'HOLY_SITES',              # Church Library (religious institutions with collections)
        'AK': 'OFFICIAL_INSTITUTION',    # State Administration Library
        'KI': 'LIBRARY',                 # Cultural Institution Library (generic)
        # Museum and Gallery Libraries
        'KI-MU': 'MUSEUM',               # Museum with library collection
        'KI-GA': 'GALLERY',              # Gallery with library collection
        # Archives (not yet seen in data, but included for completeness)
        'ARC': 'ARCHIVE',
    }
    return type_mapping.get(czech_type, 'UNKNOWN')


def _dms_to_decimal(dms: str, negative_direction: str) -> float:
    """Convert one DMS coordinate string (e.g. ``50°5'11.12"N``) to decimal degrees.

    Args:
        dms: Degrees/minutes/seconds string with a trailing direction letter.
        negative_direction: Direction letter ('S' or 'W') that negates the value.

    Raises:
        ValueError, IndexError: On malformed input (handled by the caller).
    """
    tokens = dms.replace('°', ' ').replace("'", ' ').replace('"', ' ').split()
    degrees = float(tokens[0])
    minutes = float(tokens[1])
    seconds = float(tokens[2])
    direction = tokens[3]
    decimal = degrees + (minutes / 60) + (seconds / 3600)
    if direction == negative_direction:
        decimal = -decimal
    return decimal


def parse_gps(gps_string: str) -> Optional[Dict[str, float]]:
    """Parse GPS coordinates from a string like: 50°5'11.12"N, 14°24'56.61"E

    Returns:
        ``{'latitude': ..., 'longitude': ...}`` rounded to 6 decimal places,
        or ``None`` if the string is empty or malformed.
    """
    if not gps_string:
        return None
    try:
        parts = gps_string.split(',')
        if len(parts) != 2:
            return None
        latitude = _dms_to_decimal(parts[0].strip(), 'S')
        longitude = _dms_to_decimal(parts[1].strip(), 'W')
    except (ValueError, IndexError):
        # Malformed coordinate string: report "no coordinates" rather than crash.
        return None
    return {
        'latitude': round(latitude, 6),
        'longitude': round(longitude, 6)
    }


def _collect_datafields(record, ns: dict) -> Dict[str, Any]:
    """Group a record's datafields by MARC tag.

    A tag appearing once maps to its subfield dict; a repeated tag maps to a
    list of subfield dicts (in document order).
    """
    fields: Dict[str, Any] = {}
    for field in record.findall('marc:datafield', ns):
        tag = field.get('tag')
        subfields = parse_subfields(field, ns)
        if tag in fields:
            if isinstance(fields[tag], list):
                fields[tag].append(subfields)
            else:
                fields[tag] = [fields[tag], subfields]
        else:
            fields[tag] = subfields
    return fields


def _first(value: Any) -> Dict[str, Any]:
    """Return the first occurrence of a possibly-repeated field.

    ``_collect_datafields`` yields a list when a tag repeats; single-valued
    access paths (SGL, NAZ, MES, ...) would otherwise crash on such records.
    """
    return value[0] if isinstance(value, list) else value


def _as_list(value: Any) -> List[Dict[str, Any]]:
    """Normalize a possibly-repeated field to a list of subfield dicts."""
    return value if isinstance(value, list) else [value]


def parse_record(record, ns: dict) -> Optional[Dict[str, Any]]:
    """Parse a single MARC21 record into HeritageCustodian format.

    Args:
        record: A ``marc:record`` XML element.
        ns: MARC namespace mapping.

    Returns:
        The institution dict, or ``None`` when the record lacks a sigla
        (library code) and therefore cannot be identified.
    """
    fields = _collect_datafields(record, ns)

    # Skip records without sigla (library code)
    if 'SGL' not in fields:
        return None
    sigla = _first(fields['SGL']).get('a')
    if not sigla:
        return None

    # Build institution record
    institution: Dict[str, Any] = {
        'id': f'https://w3id.org/heritage/custodian/cz/{sigla.lower()}',
        'ghcid': f'CZ-{sigla}',
        # NOTE: Using sigla as ISIL code
        # Czech siglas (e.g., ABA000) are library codes, NOT standard ISIL format
        # Standard ISIL should be CZ-XXXXX per ISO 15511
        # Investigation needed: mapping between siglas and official ISIL codes
    }

    # Name (NAZ field): join the present subfields a/b/c with " - "
    if 'NAZ' in fields:
        naz = _first(fields['NAZ'])
        name_parts = [naz[key] for key in ('a', 'b', 'c') if key in naz]
        institution['name'] = ' - '.join(name_parts)

    # Alternative names (VAR field, repeatable)
    alternative_names = []
    for var in _as_list(fields['VAR']) if 'VAR' in fields else []:
        if 'a' in var:
            alt_name = var['a']
            if 'b' in var:
                alt_name += ' - ' + var['b']
            if 'c' in var:
                alt_name += ' - ' + var['c']
            alternative_names.append(alt_name)
    if alternative_names:
        institution['alternative_names'] = alternative_names

    # Institution type (TYP field); first occurrence wins if repeated
    if 'TYP' in fields:
        typ = _first(fields['TYP'])
        czech_type = typ.get('a', '').upper()
        institution['institution_type'] = map_institution_type(czech_type)
        # Add description with Czech type
        if 'b' in typ:
            institution['description'] = f"Czech institution type: {typ['b']}"

    # Location (ADR field, repeatable)
    locations = []
    for adr in _as_list(fields['ADR']) if 'ADR' in fields else []:
        location: Dict[str, Any] = {}
        if 'u' in adr:
            location['street_address'] = adr['u']
        if 'c' in adr:
            location['postal_code'] = adr['c']
        if 'm' in adr:
            location['city'] = adr['m']
        # City from MES field if not in ADR
        if 'city' not in location and 'MES' in fields:
            location['city'] = _first(fields['MES']).get('a')
        # Region and country
        if 'KRJ' in fields:
            krj = _first(fields['KRJ'])
            if 'a' in krj:
                location['region'] = krj['a']
        location['country'] = 'CZ'
        # GPS coordinates
        if 'g' in adr:
            gps = parse_gps(adr['g'])
            if gps:
                location['latitude'] = gps['latitude']
                location['longitude'] = gps['longitude']
        if location:
            locations.append(location)
    if locations:
        institution['locations'] = locations

    # Identifiers: sigla is always present (we returned early otherwise)
    identifiers = [{
        'identifier_scheme': 'Sigla',
        'identifier_value': sigla
    }]
    # IČO (Czech company registration number); subfield b is the tax ID (DIČ)
    if 'ICO' in fields:
        ico = _first(fields['ICO'])
        if 'a' in ico:
            identifiers.append({
                'identifier_scheme': 'IČO',
                'identifier_value': ico['a']
            })
        if 'b' in ico:
            identifiers.append({
                'identifier_scheme': 'DIČ',
                'identifier_value': ico['b']
            })
    # URLs
    if 'URL' in fields:
        for url_field in _as_list(fields['URL']):
            if 'u' in url_field:
                identifiers.append({
                    'identifier_scheme': 'Website',
                    'identifier_value': url_field['u'],
                    'identifier_url': url_field['u']
                })
                break  # Just take the first URL as main website
    if identifiers:
        institution['identifiers'] = identifiers

    # Collection metadata (FND field: k=books, p=periodicals, r=reference year)
    if 'FND' in fields:
        fnd = _first(fields['FND'])
        collection: Dict[str, Any] = {
            'collection_name': 'Main Collection',
            'collection_type': 'library'
        }
        extent_parts = []
        if 'k' in fnd:
            extent_parts.append(f"{fnd['k']} books")
        if 'p' in fnd:
            extent_parts.append(f"{fnd['p']} periodicals")
        if extent_parts:
            collection['extent'] = ', '.join(extent_parts)
        if 'r' in fnd:
            collection['temporal_coverage'] = f"{fnd['r']}-01-01/{fnd['r']}-12-31"
        institution['collections'] = [collection]

    # Digital platform (library system)
    if 'KNS' in fields:
        kns = _first(fields['KNS'])
        if 'a' in kns:
            institution['digital_platforms'] = [{
                'platform_name': kns['a'],
                'platform_type': 'LIBRARY_SYSTEM'
            }]

    # Provenance
    institution['provenance'] = {
        'data_source': 'CSV_REGISTRY',
        'data_tier': 'TIER_1_AUTHORITATIVE',
        'extraction_date': datetime.now(timezone.utc).isoformat(),
        'extraction_method': 'Parsed from Czech ADR MARC21 XML database',
        'source_url': 'https://aleph.nkp.cz/data/adr.xml.gz',
        'confidence_score': 0.95
    }

    return institution


def parse_czech_isil(xml_path: str, output_path: str, limit: Optional[int] = None):
    """Parse Czech ISIL database and generate LinkML instances.

    Args:
        xml_path: Path to the ADR MARC21 XML dump.
        output_path: Destination YAML file.
        limit: Optional cap on the number of records to process (for testing).

    Returns:
        The list of parsed institution dicts.
    """
    # Imported lazily so the module stays importable (and testable)
    # without PyYAML installed; only serialization needs it.
    import yaml

    print(f"Parsing Czech ISIL database from: {xml_path}")

    # Parse XML
    tree = ET.parse(xml_path)
    root = tree.getroot()

    # MARC21 namespace
    ns = {'marc': _MARC_NS_URI}

    # Parse all records
    institutions = []
    total_records = 0
    skipped_records = 0

    for record in root.findall('marc:record', ns):
        # Check the limit BEFORE counting, so the reported total matches
        # the number of records actually processed (was off by one).
        if limit and total_records >= limit:
            break
        total_records += 1
        institution = parse_record(record, ns)
        if institution:
            institutions.append(institution)
        else:
            skipped_records += 1
        if total_records % 500 == 0:
            print(f"Processed {total_records} records... ({len(institutions)} valid)")

    print(f"\nParsing complete:")
    print(f"  Total records: {total_records}")
    print(f"  Valid institutions: {len(institutions)}")
    print(f"  Skipped: {skipped_records}")

    # Write to YAML
    print(f"\nWriting to: {output_path}")
    with open(output_path, 'w', encoding='utf-8') as f:
        f.write("---\n")
        f.write("# Czech Republic Heritage Institutions\n")
        f.write(f"# Source: National Library of the Czech Republic (ADR Database)\n")
        f.write(f"# Parsed: {datetime.now(timezone.utc).isoformat()}\n")
        f.write(f"# Total institutions: {len(institutions)}\n")
        f.write("# License: CC0 (Public Domain)\n\n")
        yaml.dump(institutions, f, default_flow_style=False,
                  allow_unicode=True, sort_keys=False)

    print(f"✅ Successfully wrote {len(institutions)} institutions to {output_path}")
    return institutions


if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser(description='Parse Czech ISIL database')
    parser.add_argument('--input', '-i',
                        default='data/isil/czech_republic/adr.xml',
                        help='Input XML file path')
    parser.add_argument('--output', '-o',
                        default='data/instances/czech_institutions.yaml',
                        help='Output YAML file path')
    parser.add_argument('--limit', '-l', type=int, default=None,
                        help='Limit number of records to parse (for testing)')
    args = parser.parse_args()
    parse_czech_isil(args.input, args.output, args.limit)