#!/usr/bin/env python3
"""
VIAF Enrichment Script for Latin American Institutions

Purpose:
    Fetch VIAF records for institutions with VIAF IDs and extract additional
    identifiers including ISIL codes (if present), Wikidata QIDs, and national
    authority file IDs.

Strategy:
    1. Load documented Latin American institutions dataset
    2. Find all institutions with VIAF identifiers (currently 19)
    3. Fetch full VIAF record for each VIAF ID
    4. Parse XML to extract:
       - ISIL codes (if present in organizational identifiers)
       - Wikidata QIDs (cross-references)
       - National authority file IDs (LC, BNF, DNB, etc.)
       - Alternative names
       - Related organizations
    5. Update institution records with new identifiers
    6. Generate enrichment report

Author: Global GLAM Dataset Project
Date: 2025-11-06
"""

import re
import sys
import time
import xml.etree.ElementTree as ET
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

import requests
import yaml

# VIAF API Configuration
VIAF_BASE_URL = "https://viaf.org/viaf"
VIAF_TIMEOUT = 10  # seconds
RATE_LIMIT_DELAY = 1.0  # seconds between requests (be nice to VIAF)

# Namespaces used in VIAF XML
VIAF_NAMESPACES = {
    'viaf': 'http://viaf.org/viaf/terms#',
    'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
    'foaf': 'http://xmlns.com/foaf/0.1/',
    'void': 'http://rdfs.org/ns/void#',
    'dcterms': 'http://purl.org/dc/terms/',
    'owl': 'http://www.w3.org/2002/07/owl#',
    'skos': 'http://www.w3.org/2004/02/skos/core#'
}

# Fully qualified name of the rdf:resource attribute, as ElementTree exposes it.
RDF_RESOURCE_ATTR = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}resource'

# ISIL format: XX-XXXXX (2-letter country code + dash + identifier).
# Compiled once at import time instead of on every extraction call.
ISIL_PATTERN = re.compile(r'\b([A-Z]{2}-[A-Za-z0-9]+)\b')

# Only these country prefixes are accepted as ISIL codes; the bare pattern
# alone would match any "XX-YYY" token.
ISIL_COUNTRY_PREFIXES = ('BR-', 'MX-', 'CL-', 'US-', 'NL-', 'FR-', 'DE-')

WIKIDATA_QID_PATTERN = re.compile(r'Q\d+')

# Upper bound on the number of alternative names taken from one VIAF record.
MAX_ALTERNATIVE_NAMES = 5


class VIAFEnricher:
    """Enriches heritage institution records using VIAF API"""

    def __init__(self, input_file: Path, output_file: Path):
        """
        Args:
            input_file: YAML file holding the list of institution records.
            output_file: Destination YAML file for the enriched records.
        """
        self.input_file = input_file
        self.output_file = output_file
        self.institutions = []
        # Counters reported by generate_report() and written into the
        # header of the output file.
        self.enrichment_stats = {
            'total_institutions': 0,
            'viaf_ids_found': 0,
            'viaf_records_fetched': 0,
            'viaf_fetch_errors': 0,
            'new_isil_codes': 0,
            'new_wikidata_ids': 0,
            'new_authority_ids': 0,
            'alternative_names_added': 0,
            'institutions_enriched': 0
        }
        # One log entry per institution that was actually enriched.
        self.enrichment_details = []

    def load_institutions(self):
        """
        Load institutions from YAML file

        Raises:
            ValueError: If the YAML document is not a list of records.
        """
        print(f"Loading institutions from {self.input_file}")

        with open(self.input_file, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f)

        # safe_load returns None for an empty document; treat it as "no records"
        # rather than crashing on len(None).
        if loaded is None:
            loaded = []
        if not isinstance(loaded, list):
            raise ValueError(
                f"Expected a list of institutions in {self.input_file}, "
                f"got {type(loaded).__name__}"
            )

        self.institutions = loaded
        self.enrichment_stats['total_institutions'] = len(self.institutions)
        print(f"Loaded {len(self.institutions)} institutions")

    def fetch_viaf_record(self, viaf_id: str) -> Optional[ET.Element]:
        """
        Fetch VIAF record as XML

        Args:
            viaf_id: VIAF identifier

        Returns:
            XML element tree root or None if fetch failed
        """
        # Stray whitespace around the identifier would produce a broken URL.
        url = f"{VIAF_BASE_URL}/{str(viaf_id).strip()}/viaf.xml"

        print(f" Fetching VIAF record: {url}")
        try:
            response = requests.get(url, timeout=VIAF_TIMEOUT)
        except requests.RequestException as e:
            print(f" ❌ VIAF fetch error: {e}")
            return None

        if response.status_code != 200:
            print(f" ⚠️ VIAF fetch failed: HTTP {response.status_code}")
            return None

        try:
            return ET.fromstring(response.content)
        except ET.ParseError as e:
            print(f" ❌ XML parse error: {e}")
            return None

    def extract_isil_from_viaf(self, root: ET.Element) -> Optional[str]:
        """
        Extract ISIL code from VIAF XML if present

        VIAF may include ISIL codes in various fields. This is exploratory:
        the text of every element is scanned for tokens shaped like an ISIL
        code, and the first one with an accepted country prefix is returned.

        Args:
            root: Root element of the parsed VIAF record.

        Returns:
            The first matching ISIL code, or None if there is none.
        """
        for elem in root.iter():
            if not elem.text:
                continue
            # Inspect every candidate in the text, not just the first one:
            # an unrelated "XX-YYY" token may precede the real ISIL code.
            for match in ISIL_PATTERN.finditer(elem.text):
                candidate = match.group(1)
                if candidate.startswith(ISIL_COUNTRY_PREFIXES):
                    return candidate

        return None

    def extract_wikidata_from_viaf(self, root: ET.Element) -> Optional[str]:
        """
        Extract Wikidata QID from VIAF record

        Two places are searched, in this order:
        1. Element text that mentions wikidata.org.
        2. The rdf:resource attribute of owl:sameAs / skos:exactMatch links.

        Args:
            root: Root element of the parsed VIAF record.

        Returns:
            The QID (e.g. "Q12345"), or None if no Wikidata reference exists.
        """
        # 1. Check for wikidata.org URLs in element text
        for elem in root.iter():
            if elem.text and 'wikidata.org' in elem.text:
                match = WIKIDATA_QID_PATTERN.search(elem.text)
                if match:
                    return match.group(0)

        # 2. Check for owl:sameAs or skos:exactMatch to Wikidata
        for tag in ('owl:sameAs', 'skos:exactMatch'):
            for elem in root.findall(f'.//{tag}', VIAF_NAMESPACES):
                resource = elem.get(RDF_RESOURCE_ATTR, '')
                if 'wikidata.org' in resource:
                    match = WIKIDATA_QID_PATTERN.search(resource)
                    if match:
                        return match.group(0)

        return None

    def extract_alternative_names(self, root: ET.Element) -> List[str]:
        """
        Extract alternative name variants from VIAF

        Collects skos:altLabel values first, then foaf:name values, dropping
        blanks and duplicates while keeping document order.

        Args:
            root: Root element of the parsed VIAF record.

        Returns:
            At most MAX_ALTERNATIVE_NAMES distinct names.
        """
        names: List[str] = []
        seen = set()

        for tag in ('skos:altLabel', 'foaf:name'):
            for elem in root.findall(f'.//{tag}', VIAF_NAMESPACES):
                name = (elem.text or '').strip()
                if name and name not in seen:
                    seen.add(name)
                    names.append(name)

        return names[:MAX_ALTERNATIVE_NAMES]

    def extract_authority_ids(self, root: ET.Element) -> Dict[str, str]:
        """
        Extract national authority file IDs from VIAF

        Source references are expected as "SCHEME|identifier" text, for
        example "LC|n 79021164" or "BNF|11865344r". Text in any other shape
        is ignored. If a scheme occurs more than once, the last one wins.

        Args:
            root: Root element of the parsed VIAF record.

        Returns:
            Dictionary mapping authority scheme to ID
            e.g., {'LC': 'n79021164', 'BNF': '11865344r', 'DNB': '1047974'}
        """
        authority_ids: Dict[str, str] = {}

        # The original comment named both dcterms:source and the VIAF sources
        # list, but only the former was queried; look in both.
        # NOTE(review): confirm against a live record which of the two VIAF
        # actually emits.
        for tag in ('dcterms:source', 'viaf:source'):
            for elem in root.findall(f'.//{tag}', VIAF_NAMESPACES):
                parts = (elem.text or '').split('|')
                if len(parts) != 2:
                    continue
                scheme, identifier = parts[0].strip(), parts[1].strip()
                # A reference with an empty side carries no usable identifier.
                if scheme and identifier:
                    authority_ids[scheme] = identifier

        return authority_ids

    @staticmethod
    def _find_viaf_id(identifiers: List[Any]) -> Optional[Any]:
        """Return the value of the first VIAF identifier entry that has one."""
        for entry in identifiers:
            if not isinstance(entry, dict):
                continue
            if entry.get('identifier_scheme') == 'VIAF' and entry.get('identifier_value'):
                return entry['identifier_value']
        return None

    @staticmethod
    def _has_identifier(identifiers: List[Any], scheme: str, value: Any = None) -> bool:
        """Tell whether an identifier of this scheme (and value, if given) exists."""
        for entry in identifiers:
            if not isinstance(entry, dict) or entry.get('identifier_scheme') != scheme:
                continue
            if value is None or entry.get('identifier_value') == value:
                return True
        return False

    def enrich_institution(self, institution: Dict[str, Any]) -> bool:
        """
        Enrich a single institution with VIAF data

        The record is modified in place: new identifiers are appended to its
        'identifiers' list, new names to 'alternative_names', and a note is
        appended to 'provenance' -> 'notes' when a provenance mapping exists.

        Args:
            institution: One institution record from the dataset.

        Returns:
            True if enrichment occurred, False otherwise
        """
        if not isinstance(institution, dict):
            return False

        # A missing or null identifier list means there is no VIAF id to use.
        identifiers = institution.get('identifiers')
        if not isinstance(identifiers, list) or not identifiers:
            return False

        viaf_id = self._find_viaf_id(identifiers)
        if not viaf_id:
            return False

        self.enrichment_stats['viaf_ids_found'] += 1
        print(f"\n🔍 Enriching: {institution.get('name')} (VIAF {viaf_id})")

        # Fetch VIAF record
        root = self.fetch_viaf_record(viaf_id)
        if root is None:
            self.enrichment_stats['viaf_fetch_errors'] += 1
            return False

        self.enrichment_stats['viaf_records_fetched'] += 1

        enriched = False
        enrichment_log = {
            'institution_name': institution.get('name'),
            'viaf_id': viaf_id,
            'new_identifiers': [],
            'alternative_names': []
        }

        # Extract ISIL code. An institution that already carries any ISIL
        # identifier keeps it; the exploratory match never adds a second one.
        isil_code = self.extract_isil_from_viaf(root)
        if isil_code and not self._has_identifier(identifiers, 'ISIL'):
            print(f" ✅ Found ISIL code: {isil_code}")
            identifiers.append({
                'identifier_scheme': 'ISIL',
                'identifier_value': isil_code,
                # ISIL codes don't have a universal URL
            })
            self.enrichment_stats['new_isil_codes'] += 1
            enrichment_log['new_identifiers'].append(f"ISIL: {isil_code}")
            enriched = True

        # Extract Wikidata QID
        wikidata_qid = self.extract_wikidata_from_viaf(root)
        if wikidata_qid and not self._has_identifier(identifiers, 'Wikidata', wikidata_qid):
            print(f" ✅ Found Wikidata: {wikidata_qid}")
            identifiers.append({
                'identifier_scheme': 'Wikidata',
                'identifier_value': wikidata_qid,
                'identifier_url': f"https://www.wikidata.org/wiki/{wikidata_qid}"
            })
            self.enrichment_stats['new_wikidata_ids'] += 1
            enrichment_log['new_identifiers'].append(f"Wikidata: {wikidata_qid}")
            enriched = True

        # Extract alternative names
        alt_names = self.extract_alternative_names(root)
        if alt_names:
            # The key may be present but null in the YAML source.
            existing_alt_names = institution.get('alternative_names') or []
            new_names = [name for name in alt_names if name not in existing_alt_names]
            if new_names:
                print(f" ✅ Found {len(new_names)} alternative names")
                institution['alternative_names'] = list(existing_alt_names) + new_names
                self.enrichment_stats['alternative_names_added'] += len(new_names)
                enrichment_log['alternative_names'] = new_names
                enriched = True

        # Extract authority IDs
        authority_ids = self.extract_authority_ids(root)
        for scheme, auth_id in authority_ids.items():
            if self._has_identifier(identifiers, scheme, auth_id):
                continue
            print(f" ✅ Found authority ID: {scheme} = {auth_id}")
            identifiers.append({
                'identifier_scheme': scheme,
                'identifier_value': auth_id,
                'identifier_url': None  # URLs vary by scheme
            })
            self.enrichment_stats['new_authority_ids'] += 1
            enrichment_log['new_identifiers'].append(f"{scheme}: {auth_id}")
            enriched = True

        if enriched:
            self.enrichment_stats['institutions_enriched'] += 1
            self.enrichment_details.append(enrichment_log)

            # Update provenance
            provenance = institution.get('provenance')
            if isinstance(provenance, dict):
                existing_notes = provenance.get('notes') or ''
                # Stamp the note with the day the enrichment really ran
                # rather than a fixed date.
                run_date = datetime.now(timezone.utc).date().isoformat()
                viaf_note = (
                    f"\nVIAF enrichment ({run_date}): Fetched full VIAF record {viaf_id}. "
                    f"Added {len(enrichment_log['new_identifiers'])} new identifiers."
                )
                provenance['notes'] = (str(existing_notes) + viaf_note).strip()

        return enriched

    def process_all_institutions(self):
        """Process all institutions and enrich from VIAF"""
        print(f"\n{'='*70}")
        print("VIAF Enrichment Process")
        print(f"{'='*70}\n")

        total = len(self.institutions)
        for idx, institution in enumerate(self.institutions, 1):
            lookups_before = self.enrichment_stats['viaf_ids_found']

            if self.enrich_institution(institution):
                print(" ✅ Enrichment successful")

            # Rate limiting: pause only when this institution actually caused
            # a VIAF request, and never after the last record.
            made_request = self.enrichment_stats['viaf_ids_found'] > lookups_before
            if made_request and idx < total:
                time.sleep(RATE_LIMIT_DELAY)

        print(f"\n{'='*70}")
        print("VIAF Enrichment Complete")
        print(f"{'='*70}\n")

    def save_enriched_dataset(self):
        """
        Save enriched institutions to output file

        The file starts with a YAML document marker and a comment header
        listing every enrichment counter, followed by the dumped records.
        """
        print(f"Saving enriched dataset to {self.output_file}")

        generated_at = datetime.now(timezone.utc).isoformat()

        with open(self.output_file, 'w', encoding='utf-8') as f:
            f.write("---\n")
            f.write("# Latin American GLAM Institutions - VIAF Enriched\n")
            f.write(f"# Generated: {generated_at}\n")
            f.write("#\n")
            f.write("# VIAF Enrichment Summary:\n")
            for key, value in self.enrichment_stats.items():
                f.write(f"# - {key}: {value}\n")
            f.write("\n")

            yaml.dump(self.institutions, f, allow_unicode=True, sort_keys=False)

        print(f"✅ Saved {len(self.institutions)} institutions")

    def generate_report(self):
        """Generate enrichment report"""
        stats = self.enrichment_stats

        print("\n" + "="*70)
        print("VIAF ENRICHMENT REPORT")
        print("="*70 + "\n")

        print(f"Total institutions processed: {stats['total_institutions']}")
        print(f"Institutions with VIAF IDs: {stats['viaf_ids_found']}")
        print(f"VIAF records successfully fetched: {stats['viaf_records_fetched']}")
        print(f"VIAF fetch errors: {stats['viaf_fetch_errors']}")

        print("\nEnrichment Results:")
        print(f" New ISIL codes added: {stats['new_isil_codes']}")
        print(f" New Wikidata IDs added: {stats['new_wikidata_ids']}")
        print(f" New authority IDs added: {stats['new_authority_ids']}")
        print(f" Alternative names added: {stats['alternative_names_added']}")
        print(f" Institutions enriched: {stats['institutions_enriched']}")

        if self.enrichment_details:
            print("\nDetailed Enrichment Log:")
            for detail in self.enrichment_details:
                print(f"\n {detail['institution_name']} (VIAF {detail['viaf_id']})")
                for identifier in detail['new_identifiers']:
                    print(f" + {identifier}")
                if detail['alternative_names']:
                    # Only the first three names are shown to keep the log short.
                    print(f" + Alternative names: {', '.join(detail['alternative_names'][:3])}")

        print("\n" + "="*70 + "\n")


def main():
    """
    Main execution

    Returns:
        Process exit status: 0 on success, 1 if the input file is missing.
    """
    # File paths
    base_dir = Path(__file__).parent.parent
    input_file = base_dir / "data" / "instances" / "latin_american_institutions_documented.yaml"
    output_file = base_dir / "data" / "instances" / "latin_american_institutions_viaf_enriched.yaml"

    # Validate input file exists
    if not input_file.exists():
        print(f"❌ Error: Input file not found: {input_file}")
        print(" Please ensure the documented dataset exists.")
        return 1

    # Create enricher
    enricher = VIAFEnricher(input_file, output_file)

    # Load institutions
    enricher.load_institutions()

    # Process all institutions
    enricher.process_all_institutions()

    # Save enriched dataset
    enricher.save_enriched_dataset()

    # Generate report
    enricher.generate_report()

    print("✅ VIAF enrichment complete!")
    print(f" Input: {input_file}")
    print(f" Output: {output_file}")

    return 0


if __name__ == '__main__':
    # sys.exit is always available; the bare exit() helper is injected by the
    # site module and is not guaranteed to exist.
    sys.exit(main())