437 lines
17 KiB
Python
Executable file
437 lines
17 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
VIAF Enrichment Script for Latin American Institutions
|
|
|
|
Purpose: Fetch VIAF records for institutions with VIAF IDs and extract additional identifiers
|
|
including ISIL codes (if present), Wikidata QIDs, and national authority file IDs.
|
|
|
|
Strategy:
|
|
1. Load documented Latin American institutions dataset
|
|
2. Find all institutions with VIAF identifiers (currently 19)
|
|
3. Fetch full VIAF record for each VIAF ID
|
|
4. Parse XML to extract:
|
|
- ISIL codes (if present in organizational identifiers)
|
|
- Wikidata QIDs (cross-references)
|
|
- National authority file IDs (LC, BNF, DNB, etc.)
|
|
- Alternative names
|
|
- Related organizations
|
|
5. Update institution records with new identifiers
|
|
6. Generate enrichment report
|
|
|
|
Author: Global GLAM Dataset Project
|
|
Date: 2025-11-06
|
|
"""
|
|
|
|
import re
import time
import xml.etree.ElementTree as ET
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Any

import requests
import yaml
|
|
|
|
# VIAF API configuration.
VIAF_BASE_URL = "https://viaf.org/viaf"
VIAF_TIMEOUT = 10  # seconds
RATE_LIMIT_DELAY = 1.0  # seconds between requests (be nice to VIAF)

# Prefix -> namespace URI map handed to ElementTree's find/findall so that
# prefixed paths such as './/skos:altLabel' can be resolved in VIAF XML.
VIAF_NAMESPACES = dict(
    viaf="http://viaf.org/viaf/terms#",
    rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#",
    foaf="http://xmlns.com/foaf/0.1/",
    void="http://rdfs.org/ns/void#",
    dcterms="http://purl.org/dc/terms/",
    owl="http://www.w3.org/2002/07/owl#",
    skos="http://www.w3.org/2004/02/skos/core#",
)
|
|
|
|
|
|
class VIAFEnricher:
    """Enrich heritage institution records using the VIAF API.

    Intended call order: ``load_institutions()``,
    ``process_all_institutions()``, ``save_enriched_dataset()`` and finally
    ``generate_report()``.
    """

    # Prefixes a candidate must start with to be accepted as an ISIL code
    # (guards against arbitrary "XX-YYY" strings in the record).
    ISIL_COUNTRY_PREFIXES = ('BR-', 'MX-', 'CL-', 'US-', 'NL-', 'FR-', 'DE-')

    # Two-letter prefix, a dash, then alphanumeric groups optionally joined by
    # ':', '/' or '-', so multi-part codes such as "DE-MUS-815314" are
    # captured whole instead of being cut at the second dash.
    _ISIL_PATTERN = re.compile(
        r'\b([A-Z]{2}-[A-Za-z0-9]+(?:[:/-][A-Za-z0-9]+)*)\b'
    )

    # Wikidata entity identifier ("Q" followed by digits).
    _QID_PATTERN = re.compile(r'Q\d+')

    # Upper bound on the number of name variants taken from one VIAF record.
    MAX_ALTERNATIVE_NAMES = 5

    def __init__(self, input_file: Path, output_file: Path):
        """Store the file locations and initialise empty state.

        Args:
            input_file: YAML file holding the list of institution records.
            output_file: Destination for the enriched YAML dataset.
        """
        self.input_file = input_file
        self.output_file = output_file
        # Institution records (dicts); filled by load_institutions().
        self.institutions = []
        # Running counters, written to the output header and the report.
        self.enrichment_stats = {
            'total_institutions': 0,
            'viaf_ids_found': 0,
            'viaf_records_fetched': 0,
            'viaf_fetch_errors': 0,
            'new_isil_codes': 0,
            'new_wikidata_ids': 0,
            'new_authority_ids': 0,
            'alternative_names_added': 0,
            'institutions_enriched': 0
        }
        # One log entry per institution that was actually enriched.
        self.enrichment_details = []

    def load_institutions(self):
        """Load institutions from the YAML input file into ``self.institutions``."""
        print(f"Loading institutions from {self.input_file}")
        with open(self.input_file, 'r', encoding='utf-8') as f:
            # An empty YAML document loads as None; treat it as an empty
            # dataset rather than crashing on len(None) below.
            self.institutions = yaml.safe_load(f) or []

        self.enrichment_stats['total_institutions'] = len(self.institutions)
        print(f"Loaded {len(self.institutions)} institutions")

    def fetch_viaf_record(self, viaf_id: str) -> Optional[ET.Element]:
        """Fetch a VIAF record as XML.

        Args:
            viaf_id: VIAF identifier.

        Returns:
            The parsed XML root element, or None when the request fails,
            returns a non-200 status, or the body is not well-formed XML.
        """
        url = f"{VIAF_BASE_URL}/{viaf_id}/viaf.xml"

        try:
            print(f" Fetching VIAF record: {url}")
            response = requests.get(url, timeout=VIAF_TIMEOUT)

            if response.status_code != 200:
                print(f" ⚠️ VIAF fetch failed: HTTP {response.status_code}")
                return None

            return ET.fromstring(response.content)

        except requests.RequestException as e:
            print(f" ❌ VIAF fetch error: {e}")
            return None
        except ET.ParseError as e:
            print(f" ❌ XML parse error: {e}")
            return None

    def extract_isil_from_viaf(self, root: ET.Element) -> Optional[str]:
        """Extract an ISIL code from VIAF XML if one is present.

        This is exploratory: the text of every element is scanned for
        ISIL-shaped tokens, and the first token that starts with one of
        ``ISIL_COUNTRY_PREFIXES`` is returned.

        Args:
            root: Root element of a VIAF record.

        Returns:
            The first accepted ISIL-shaped string, or None.
        """
        for elem in root.iter():
            if not elem.text:
                continue
            # Check every candidate in the element, not only the first, so a
            # valid code following an unrelated "XX-YYY" token is still found.
            for match in self._ISIL_PATTERN.finditer(elem.text):
                candidate = match.group(1)
                if candidate.startswith(self.ISIL_COUNTRY_PREFIXES):
                    return candidate

        return None

    def extract_wikidata_from_viaf(self, root: ET.Element) -> Optional[str]:
        """Extract a Wikidata QID from a VIAF record.

        Looks first at element text mentioning ``wikidata.org``, then at the
        ``rdf:resource`` attribute of ``owl:sameAs`` / ``skos:exactMatch``
        elements.

        Args:
            root: Root element of a VIAF record.

        Returns:
            The QID (e.g. ``"Q12345"``) or None when nothing is found.
        """
        # 1. Element text containing a wikidata.org URL.
        for elem in root.iter():
            if elem.text and 'wikidata.org' in elem.text:
                match = self._QID_PATTERN.search(elem.text)
                if match:
                    return match.group(0)

        # 2. owl:sameAs or skos:exactMatch links pointing at Wikidata.
        resource_attr = f"{{{VIAF_NAMESPACES['rdf']}}}resource"
        for tag in ('owl:sameAs', 'skos:exactMatch'):
            for elem in root.findall(f'.//{tag}', VIAF_NAMESPACES):
                resource = elem.get(resource_attr, '')
                if 'wikidata.org' in resource:
                    match = self._QID_PATTERN.search(resource)
                    if match:
                        return match.group(0)

        return None

    def extract_alternative_names(self, root: ET.Element) -> List[str]:
        """Extract alternative name variants from a VIAF record.

        Collects ``skos:altLabel`` values followed by ``foaf:name`` values,
        stripped, de-duplicated, in document order.

        Args:
            root: Root element of a VIAF record.

        Returns:
            At most ``MAX_ALTERNATIVE_NAMES`` distinct names.
        """
        names: List[str] = []

        for tag in ('skos:altLabel', 'foaf:name'):
            for elem in root.findall(f'.//{tag}', VIAF_NAMESPACES):
                name = (elem.text or '').strip()
                if name and name not in names:
                    names.append(name)

        return names[:self.MAX_ALTERNATIVE_NAMES]

    def extract_authority_ids(self, root: ET.Element) -> Dict[str, str]:
        """Extract national authority file IDs from a VIAF record.

        Reads ``dcterms:source`` elements whose text has the form
        ``"SCHEME|identifier"`` (exactly one ``|``).

        Args:
            root: Root element of a VIAF record.

        Returns:
            Dictionary mapping authority scheme to ID, e.g.
            ``{'LC': 'n79021164', 'BNF': '11865344r', 'DNB': '1047974'}``.
            When a scheme occurs more than once, the last value wins.
        """
        authority_ids: Dict[str, str] = {}

        for elem in root.findall('.//dcterms:source', VIAF_NAMESPACES):
            source_text = elem.text or ''
            # Only the unambiguous two-part form is accepted.
            if source_text.count('|') != 1:
                continue
            scheme, _, identifier = source_text.partition('|')
            scheme, identifier = scheme.strip(), identifier.strip()
            # Skip half-empty pairs; they would produce unusable identifiers.
            if scheme and identifier:
                authority_ids[scheme] = identifier

        return authority_ids

    @staticmethod
    def _has_identifier(identifiers: List[Dict[str, Any]], scheme: str,
                        value: Optional[str] = None) -> bool:
        """Return True if *identifiers* holds *scheme* (and *value*, if given)."""
        return any(
            entry.get('identifier_scheme') == scheme
            and (value is None or entry.get('identifier_value') == value)
            for entry in identifiers
        )

    def enrich_institution(self, institution: Dict[str, Any]) -> bool:
        """Enrich a single institution record in place with VIAF data.

        Args:
            institution: Institution record; its ``identifiers``,
                ``alternative_names`` and ``provenance.notes`` entries may be
                modified.

        Returns:
            True if at least one identifier or name was added, False
            otherwise (no VIAF ID, fetch failure, or nothing new found).
        """
        # A YAML null ("identifiers:") arrives as None; treat it as empty.
        identifiers = institution.get('identifiers') or []

        # Value of the first VIAF entry, if any.
        viaf_id = next(
            (entry.get('identifier_value') for entry in identifiers
             if entry.get('identifier_scheme') == 'VIAF'),
            None,
        )
        if not viaf_id:
            return False

        self.enrichment_stats['viaf_ids_found'] += 1

        print(f"\n🔍 Enriching: {institution.get('name')} (VIAF {viaf_id})")

        root = self.fetch_viaf_record(viaf_id)
        if root is None:
            self.enrichment_stats['viaf_fetch_errors'] += 1
            return False

        self.enrichment_stats['viaf_records_fetched'] += 1

        enriched = False
        enrichment_log = {
            'institution_name': institution.get('name'),
            'viaf_id': viaf_id,
            'new_identifiers': [],
            'alternative_names': []
        }

        # ISIL: added only when the record has no ISIL at all yet.
        isil_code = self.extract_isil_from_viaf(root)
        if isil_code and not self._has_identifier(identifiers, 'ISIL'):
            print(f" ✅ Found ISIL code: {isil_code}")
            identifiers.append({
                'identifier_scheme': 'ISIL',
                'identifier_value': isil_code,
                # ISIL codes don't have a universal URL
            })
            self.enrichment_stats['new_isil_codes'] += 1
            enrichment_log['new_identifiers'].append(f"ISIL: {isil_code}")
            enriched = True

        # Wikidata: added unless this exact QID is already recorded.
        wikidata_qid = self.extract_wikidata_from_viaf(root)
        if wikidata_qid and not self._has_identifier(
                identifiers, 'Wikidata', wikidata_qid):
            print(f" ✅ Found Wikidata: {wikidata_qid}")
            identifiers.append({
                'identifier_scheme': 'Wikidata',
                'identifier_value': wikidata_qid,
                'identifier_url': f"https://www.wikidata.org/wiki/{wikidata_qid}"
            })
            self.enrichment_stats['new_wikidata_ids'] += 1
            enrichment_log['new_identifiers'].append(f"Wikidata: {wikidata_qid}")
            enriched = True

        # Alternative names not already on the record.
        alt_names = self.extract_alternative_names(root)
        if alt_names:
            existing_alt_names = institution.get('alternative_names') or []
            new_names = [name for name in alt_names
                         if name not in existing_alt_names]
            if new_names:
                print(f" ✅ Found {len(new_names)} alternative names")
                institution['alternative_names'] = existing_alt_names + new_names
                self.enrichment_stats['alternative_names_added'] += len(new_names)
                enrichment_log['alternative_names'] = new_names
                enriched = True

        # National authority file IDs, one identifier entry per scheme.
        for scheme, auth_id in self.extract_authority_ids(root).items():
            if self._has_identifier(identifiers, scheme, auth_id):
                continue
            print(f" ✅ Found authority ID: {scheme} = {auth_id}")
            identifiers.append({
                'identifier_scheme': scheme,
                'identifier_value': auth_id,
                'identifier_url': None  # URLs vary by scheme
            })
            self.enrichment_stats['new_authority_ids'] += 1
            enrichment_log['new_identifiers'].append(f"{scheme}: {auth_id}")
            enriched = True

        if enriched:
            self.enrichment_stats['institutions_enriched'] += 1
            self.enrichment_details.append(enrichment_log)

            # Append a provenance note stamped with the actual run date (UTC).
            provenance = institution.get('provenance')
            if isinstance(provenance, dict):
                existing_notes = provenance.get('notes') or ''
                run_date = datetime.now(timezone.utc).date().isoformat()
                viaf_note = (
                    f"\nVIAF enrichment ({run_date}): "
                    f"Fetched full VIAF record {viaf_id}. "
                    f"Added {len(enrichment_log['new_identifiers'])} new identifiers."
                )
                provenance['notes'] = (existing_notes + viaf_note).strip()

        return enriched

    def process_all_institutions(self):
        """Enrich every loaded institution, pausing between VIAF requests."""
        print(f"\n{'='*70}")
        print("VIAF Enrichment Process")
        print(f"{'='*70}\n")

        total = len(self.institutions)
        for idx, institution in enumerate(self.institutions, 1):
            lookups_before = self.enrichment_stats['viaf_ids_found']

            if self.enrich_institution(institution):
                print(" ✅ Enrichment successful")

            # Rate-limit only after an actual VIAF lookup; records without a
            # VIAF ID trigger no request and need no delay.
            lookup_attempted = (
                self.enrichment_stats['viaf_ids_found'] != lookups_before
            )
            if lookup_attempted and idx < total:
                time.sleep(RATE_LIMIT_DELAY)

        print(f"\n{'='*70}")
        print("VIAF Enrichment Complete")
        print(f"{'='*70}\n")

    def save_enriched_dataset(self):
        """Write the institutions to the output file with a summary header.

        The header is a YAML document start marker followed by comment lines
        carrying the generation timestamp (UTC) and every enrichment counter.
        """
        print(f"Saving enriched dataset to {self.output_file}")

        generated_at = datetime.now(timezone.utc).isoformat()

        with open(self.output_file, 'w', encoding='utf-8') as f:
            f.write("---\n")
            f.write("# Latin American GLAM Institutions - VIAF Enriched\n")
            f.write(f"# Generated: {generated_at}\n")
            f.write("#\n")
            f.write("# VIAF Enrichment Summary:\n")
            for key, value in self.enrichment_stats.items():
                f.write(f"# - {key}: {value}\n")
            f.write("\n")

            yaml.dump(self.institutions, f, allow_unicode=True, sort_keys=False)

        print(f"✅ Saved {len(self.institutions)} institutions")

    def generate_report(self):
        """Print the enrichment counters and a per-institution log to stdout."""
        stats = self.enrichment_stats

        print("\n" + "="*70)
        print("VIAF ENRICHMENT REPORT")
        print("="*70 + "\n")

        print(f"Total institutions processed: {stats['total_institutions']}")
        print(f"Institutions with VIAF IDs: {stats['viaf_ids_found']}")
        print(f"VIAF records successfully fetched: {stats['viaf_records_fetched']}")
        print(f"VIAF fetch errors: {stats['viaf_fetch_errors']}")
        print("\nEnrichment Results:")
        print(f" New ISIL codes added: {stats['new_isil_codes']}")
        print(f" New Wikidata IDs added: {stats['new_wikidata_ids']}")
        print(f" New authority IDs added: {stats['new_authority_ids']}")
        print(f" Alternative names added: {stats['alternative_names_added']}")
        print(f" Institutions enriched: {stats['institutions_enriched']}")

        if self.enrichment_details:
            print("\nDetailed Enrichment Log:")
            for detail in self.enrichment_details:
                print(f"\n {detail['institution_name']} (VIAF {detail['viaf_id']})")
                for identifier in detail['new_identifiers']:
                    print(f" + {identifier}")
                if detail['alternative_names']:
                    # Only the first three names are shown to keep lines short.
                    print(f" + Alternative names: {', '.join(detail['alternative_names'][:3])}")

        print("\n" + "="*70 + "\n")
|
|
|
|
|
|
def main():
    """Run the VIAF enrichment pipeline end to end.

    Resolves the input and output YAML paths relative to the directory two
    levels above this file, then loads, enriches, saves and reports.

    Returns:
        0 on success, 1 when the input dataset file does not exist.
    """
    instances_dir = Path(__file__).parent.parent / "data" / "instances"
    input_file = instances_dir / "latin_american_institutions_documented.yaml"
    output_file = instances_dir / "latin_american_institutions_viaf_enriched.yaml"

    # Bail out early when there is nothing to enrich.
    if not input_file.exists():
        print(f"❌ Error: Input file not found: {input_file}")
        print(" Please ensure the documented dataset exists.")
        return 1

    enricher = VIAFEnricher(input_file, output_file)

    # Pipeline stages, executed strictly in this order.
    pipeline = (
        enricher.load_institutions,
        enricher.process_all_institutions,
        enricher.save_enriched_dataset,
        enricher.generate_report,
    )
    for stage in pipeline:
        stage()

    print("✅ VIAF enrichment complete!")
    print(f" Input: {input_file}")
    print(f" Output: {output_file}")

    return 0
|
|
|
|
|
|
if __name__ == '__main__':
    # Propagate main()'s return value as the process exit status. SystemExit
    # is raised directly because the bare `exit()` helper is installed by the
    # `site` module for interactive use and is absent under `python -S`.
    raise SystemExit(main())
|