# glam/scripts/integrate_ch_annotator_to_custodian.py
# 2025-12-07 00:26:01 +01:00
#
# 515 lines
# 18 KiB
# Python
#!/usr/bin/env python3
"""
Integrate CH-Annotator data into custodian files.
This script:
1. Reads CH-Annotator enhanced files from data/instances/*_ch_annotator.yaml
2. Matches institutions to existing custodian files by ISIL, Wikidata ID, or name
3. Adds ch_annotator section to matching custodian files
4. Creates new custodian files for unmatched institutions
5. Generates an integration report
Usage:
python scripts/integrate_ch_annotator_to_custodian.py [--dry-run] [--report-only]
Options:
--dry-run Preview changes without writing files
--report-only Only generate matching report, no file changes
"""
import os
import sys
import yaml
import argparse
from datetime import datetime, timezone
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Any
import re
# Paths
# Repository root, derived from this script's location (scripts/ -> project root).
PROJECT_ROOT = Path(__file__).parent.parent
# Input: CH-Annotator enhanced files (*_ch_annotator.yaml) live here.
CH_ANNOTATOR_DIR = PROJECT_ROOT / "data" / "instances"
# Target: one YAML file per custodian institution.
CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian"
# Markdown integration reports are written here.
REPORTS_DIR = PROJECT_ROOT / "reports"
def load_yaml(path: Path) -> Any:
    """Parse the YAML document at *path* and return the deserialized object."""
    with open(path, 'r', encoding='utf-8') as handle:
        return yaml.safe_load(handle)
def save_yaml(path: Path, data: Any) -> None:
    """Serialize *data* as YAML to *path*, preserving key order and unicode."""
    dump_options = dict(allow_unicode=True, default_flow_style=False, sort_keys=False, width=120)
    with open(path, 'w', encoding='utf-8') as out:
        yaml.dump(data, out, **dump_options)
def normalize_isil(isil: str) -> str:
    """Canonicalize an ISIL code: trim, uppercase, drop embedded spaces.

    Falsy input (None, '') yields the empty string.
    """
    if not isil:
        return ""
    trimmed = isil.strip().upper()
    return trimmed.replace(" ", "")
def normalize_wikidata(qid: str) -> str:
    """Canonicalize a Wikidata identifier for comparison.

    Accepts either a bare Q-number or a full entity URL (the last path
    segment is used). Falsy input yields the empty string.
    """
    if not qid:
        return ""
    text = str(qid)
    if '/' in text:
        # Keep only the final URL segment, e.g. ".../entity/Q42" -> "Q42".
        text = text.rsplit('/', 1)[-1]
    return text.strip().upper()
def normalize_name(name: str) -> str:
    """Normalize an institution name for fuzzy matching.

    Lowercases, strips everything that is not a word character or
    whitespace, then collapses whitespace runs to single spaces.
    """
    if not name:
        return ""
    lowered = name.lower()
    no_punct = re.sub(r'[^\w\s]', '', lowered)
    return re.sub(r'\s+', ' ', no_punct).strip()
def extract_identifiers_from_ch_annotator(institution: Dict) -> Dict[str, str]:
    """Pull normalized ISIL / Wikidata / name identifiers out of a CH-Annotator record.

    Returns a dict with any of the keys 'isil', 'wikidata', 'name'
    (normalized) and 'name_original' (as-is) that could be extracted.
    """
    result: Dict[str, str] = {}
    for entry in institution.get('identifiers', []):
        scheme = entry.get('identifier_scheme', '').upper()
        value = entry.get('identifier_value', '')
        if scheme == 'ISIL':
            result['isil'] = normalize_isil(value)
        elif scheme == 'WIKIDATA':
            result['wikidata'] = normalize_wikidata(value)
    # The display name is kept both raw and normalized for fuzzy matching.
    if institution.get('name'):
        result['name'] = normalize_name(institution['name'])
        result['name_original'] = institution['name']
    return result
def build_custodian_index(custodian_dir: Path) -> Dict[str, Dict]:
    """
    Build an index of custodian files by various identifiers.
    Returns dict with keys: 'by_isil', 'by_wikidata', 'by_ghcid', 'by_name'
    Each maps to {identifier: file_path}

    Also populates 'all_files' mapping filename -> path for every file read.
    Malformed files are skipped with a warning rather than aborting the run.
    """
    index = {
        'by_isil': {},
        'by_wikidata': {},
        'by_ghcid': {},
        'by_name': {},
        'all_files': {}
    }
    # A missing directory is not fatal: return an empty index so the caller
    # simply finds no matches.
    if not custodian_dir.exists():
        print(f"Warning: Custodian directory does not exist: {custodian_dir}")
        return index
    yaml_files = list(custodian_dir.glob("*.yaml"))
    print(f"Indexing {len(yaml_files)} custodian files...")
    for file_path in yaml_files:
        try:
            data = load_yaml(file_path)
            if not data:
                continue
            # Store file path
            index['all_files'][file_path.name] = file_path
            # Extract GHCID from filename or identifiers
            # (the filename stem doubles as a GHCID key).
            ghcid = file_path.stem
            index['by_ghcid'][ghcid] = file_path
            # Extract identifiers
            for ident in data.get('identifiers', []):
                scheme = ident.get('identifier_scheme', '').upper()
                value = ident.get('identifier_value', '')
                if scheme == 'GHCID' and value:
                    index['by_ghcid'][value] = file_path
            # Extract ISIL from original_entry
            # (several legacy field spellings are accepted).
            original = data.get('original_entry', {})
            if original:
                isil = original.get('isil-code_na') or original.get('isil_code') or original.get('ISIL')
                if isil:
                    index['by_isil'][normalize_isil(isil)] = file_path
                # Also check for name
                # ('organisatie' appears to be a Dutch-language source column).
                name = original.get('organisatie') or original.get('name') or original.get('institution_name')
                if name:
                    index['by_name'][normalize_name(name)] = file_path
            # Extract Wikidata from enrichment
            wikidata = data.get('wikidata_enrichment', {})
            if wikidata:
                qid = wikidata.get('wikidata_entity_id')
                if qid:
                    index['by_wikidata'][normalize_wikidata(qid)] = file_path
            # Also check original entry for wikidata
            if original and original.get('wikidata_id'):
                index['by_wikidata'][normalize_wikidata(original['wikidata_id'])] = file_path
        except Exception as e:
            # Best-effort indexing: log and continue with the next file.
            print(f"Warning: Error processing {file_path.name}: {e}")
            continue
    print(f"Indexed: {len(index['by_isil'])} ISIL, {len(index['by_wikidata'])} Wikidata, "
          f"{len(index['by_ghcid'])} GHCID, {len(index['by_name'])} names")
    return index
def find_matching_custodian(
    institution: Dict,
    custodian_index: Dict
) -> Tuple[Optional[Path], str]:
    """
    Find matching custodian file for a CH-Annotator institution.
    Returns: (file_path, match_type) or (None, 'no_match')

    Identifier schemes are tried from most to least reliable:
    ISIL, then Wikidata, then exact normalized name.
    """
    idents = extract_identifiers_from_ch_annotator(institution)
    for key in ('isil', 'wikidata', 'name'):
        value = idents.get(key)
        if not value:
            continue
        bucket = custodian_index['by_' + key]
        if value in bucket:
            return bucket[value], key
    return None, 'no_match'
def extract_ch_annotator_section(institution: Dict) -> Dict:
    """Return the 'ch_annotator' sub-section of an institution record ({} when absent)."""
    empty: Dict = {}
    return institution.get('ch_annotator', empty)
def add_ch_annotator_to_custodian(
    custodian_data: Dict,
    ch_annotator_section: Dict,
    source_file: str
) -> Dict:
    """Attach a CH-Annotator section to custodian data (mutates and returns it).

    The incoming section always replaces any existing 'ch_annotator' entry.
    When a non-empty annotation was already present, that fact is recorded
    in the integration note so reviewers can spot overwritten data. (The
    original code assigned the existing section to an unused variable and
    never merged it; the replace-and-flag behavior is kept, now documented
    honestly.)

    Args:
        custodian_data: Parsed custodian YAML mapping; mutated in place.
        ch_annotator_section: Section to attach; mutated (gains 'integration_note').
        source_file: Name of the CH-Annotator file the section came from.

    Returns:
        The same custodian_data mapping, with 'ch_annotator' set.
    """
    note = {
        'integrated_from': source_file,
        'integration_date': datetime.now(timezone.utc).isoformat(),
    }
    # Flag (rather than silently clobber) a pre-existing non-empty annotation.
    if custodian_data.get('ch_annotator'):
        note['previous_annotation_present'] = True
    ch_annotator_section['integration_note'] = note
    custodian_data['ch_annotator'] = ch_annotator_section
    return custodian_data
def load_ch_annotator_files() -> List[Tuple[Path, List[Dict]]]:
    """Read every *_ch_annotator.yaml file; return a list of (path, institutions) pairs.

    Files that fail to parse, or whose top level is neither a list nor a
    mapping, are skipped with a message instead of aborting the run.
    """
    results: List[Tuple[Path, List[Dict]]] = []
    for file_path in CH_ANNOTATOR_DIR.glob("*_ch_annotator.yaml"):
        try:
            data = load_yaml(file_path)
            # Accept either a bare list of institutions or a mapping that
            # wraps them under an 'institutions' key.
            if isinstance(data, list):
                institutions = data
            elif isinstance(data, dict):
                institutions = data.get('institutions', [])
            else:
                print(f"Warning: Unexpected format in {file_path.name}")
                continue
            results.append((file_path, institutions))
            print(f"Loaded {len(institutions)} institutions from {file_path.name}")
        except Exception as e:
            print(f"Error loading {file_path.name}: {e}")
            continue
    return results
def generate_report(
    stats: Dict,
    matches: List[Dict],
    unmatched: List[Dict],
    report_path: Path
) -> None:
    """Generate integration report.

    Writes a markdown report (summary table, per-source match rates, and
    truncated match/unmatched detail tables) to *report_path*.

    Args:
        stats: Counters produced by main(), including a 'by_source' mapping.
        matches: Match records; only the first 50 are listed in detail.
        unmatched: Unmatched institution records; only the first 50 listed.
        report_path: Destination file for the markdown report.
    """
    # NOTE: the triple-quoted markdown bodies are runtime output and are
    # deliberately kept at column 0 so the emitted file is valid markdown.
    report = f"""# CH-Annotator to Custodian Integration Report
Generated: {datetime.now(timezone.utc).isoformat()}
## Summary
| Metric | Count |
|--------|-------|
| Total CH-Annotator files processed | {stats['files_processed']} |
| Total institutions in CH-Annotator files | {stats['total_institutions']} |
| Matched to existing custodian files | {stats['matched']} |
| - Matched by ISIL | {stats['match_by_isil']} |
| - Matched by Wikidata | {stats['match_by_wikidata']} |
| - Matched by Name | {stats['match_by_name']} |
| Unmatched (no custodian file) | {stats['unmatched']} |
| Custodian files updated | {stats['files_updated']} |
| New custodian files created | {stats['files_created']} |
| Errors | {stats['errors']} |
## Match Rate by Source File
| Source File | Institutions | Matched | Unmatched | Match Rate |
|-------------|--------------|---------|-----------|------------|
"""
    # One table row per CH-Annotator source file.
    for source, source_stats in stats.get('by_source', {}).items():
        total = source_stats['total']
        matched = source_stats['matched']
        unmatched_count = source_stats['unmatched']
        # Guard against division by zero for source files with no institutions.
        rate = f"{(matched/total*100):.1f}%" if total > 0 else "N/A"
        report += f"| {source} | {total} | {matched} | {unmatched_count} | {rate} |\n"
    report += f"""
## Match Details
### Matched Institutions (first 50)
| Institution Name | Match Type | Custodian File |
|------------------|------------|----------------|
"""
    # Detail tables are capped at 50 rows to keep the report readable.
    for match in matches[:50]:
        name = match.get('name', 'Unknown')[:50]
        match_type = match.get('match_type', 'unknown')
        file_name = match.get('custodian_file', 'N/A')
        # Matches carry a Path object; render just the filename.
        if isinstance(file_name, Path):
            file_name = file_name.name
        report += f"| {name} | {match_type} | {file_name} |\n"
    if len(matches) > 50:
        report += f"\n... and {len(matches) - 50} more matches\n"
    report += f"""
### Unmatched Institutions (first 50)
These institutions from CH-Annotator files have no matching custodian file:
| Institution Name | Source File | Identifiers |
|------------------|-------------|-------------|
"""
    for inst in unmatched[:50]:
        name = inst.get('name', 'Unknown')[:40]
        source = inst.get('source_file', 'Unknown')
        idents = []
        # Show at most two identifiers, each truncated, to keep cells short.
        for ident in inst.get('identifiers', [])[:2]:
            idents.append(f"{ident.get('identifier_scheme')}: {ident.get('identifier_value', '')[:20]}")
        ident_str = ", ".join(idents) if idents else "None"
        report += f"| {name} | {source} | {ident_str} |\n"
    if len(unmatched) > 50:
        report += f"\n... and {len(unmatched) - 50} more unmatched institutions\n"
    report += f"""
## Notes
- **ISIL match**: Most reliable, based on standard library/archive identifier
- **Wikidata match**: Reliable, based on unique Wikidata Q-number
- **Name match**: Less reliable, based on normalized name comparison
## Next Steps
1. Review unmatched institutions - may need manual matching or new custodian files
2. Validate integration by spot-checking updated custodian files
3. Run LinkML validation on updated files
"""
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write(report)
    print(f"\nReport saved to: {report_path}")
def main():
    """Command-line entry point.

    Indexes custodian files, matches CH-Annotator institutions against
    them, updates custodian files (unless --dry-run / --report-only),
    and always writes a markdown report.

    Returns:
        0 always (including when no input files are found), so
        ``sys.exit(main())`` reports success to the shell.
    """
    parser = argparse.ArgumentParser(description='Integrate CH-Annotator data into custodian files')
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without writing')
    parser.add_argument('--report-only', action='store_true', help='Only generate report, no changes')
    args = parser.parse_args()
    print("=" * 60)
    print("CH-Annotator to Custodian Integration")
    print("=" * 60)
    if args.dry_run:
        print("DRY RUN MODE - No files will be modified")
    if args.report_only:
        print("REPORT ONLY MODE - Only generating matching report")
    # Build index of existing custodian files
    print("\n1. Building custodian file index...")
    custodian_index = build_custodian_index(CUSTODIAN_DIR)
    # Load CH-Annotator files
    print("\n2. Loading CH-Annotator files...")
    ch_annotator_files = load_ch_annotator_files()
    if not ch_annotator_files:
        print("No CH-Annotator files found!")
        return 0  # Nothing to do is not an error for this script.
    # Process institutions
    print("\n3. Matching institutions to custodian files...")
    stats = {
        'files_processed': len(ch_annotator_files),
        'total_institutions': 0,
        'matched': 0,
        'unmatched': 0,
        'match_by_isil': 0,
        'match_by_wikidata': 0,
        'match_by_name': 0,
        'files_updated': 0,
        'files_created': 0,
        'errors': 0,
        'by_source': {}
    }
    all_matches = []
    all_unmatched = []
    # custodian_path -> list of queued {'institution', 'source_file'} updates.
    files_to_update = defaultdict(list)
    for source_path, institutions in ch_annotator_files:
        source_name = source_path.name
        source_stats = {'total': len(institutions), 'matched': 0, 'unmatched': 0}
        for institution in institutions:
            stats['total_institutions'] += 1
            # Find matching custodian file (ISIL > Wikidata > name priority).
            match_path, match_type = find_matching_custodian(institution, custodian_index)
            if match_path:
                stats['matched'] += 1
                source_stats['matched'] += 1
                if match_type == 'isil':
                    stats['match_by_isil'] += 1
                elif match_type == 'wikidata':
                    stats['match_by_wikidata'] += 1
                elif match_type == 'name':
                    stats['match_by_name'] += 1
                all_matches.append({
                    'name': institution.get('name'),
                    'match_type': match_type,
                    'custodian_file': match_path,
                    'source_file': source_name
                })
                # Queue for update
                files_to_update[match_path].append({
                    'institution': institution,
                    'source_file': source_name
                })
            else:
                stats['unmatched'] += 1
                source_stats['unmatched'] += 1
                all_unmatched.append({
                    'name': institution.get('name'),
                    'identifiers': institution.get('identifiers', []),
                    'source_file': source_name
                })
        stats['by_source'][source_name] = source_stats
    # Update custodian files
    if not args.report_only and not args.dry_run:
        print(f"\n4. Updating {len(files_to_update)} custodian files...")
        for custodian_path, updates in files_to_update.items():
            try:
                custodian_data = load_yaml(custodian_path)
                # Use the first matching institution's CH-Annotator section
                # (usually there's only one match per custodian file)
                first_update = updates[0]
                ch_annotator_section = extract_ch_annotator_section(first_update['institution'])
                if ch_annotator_section:
                    custodian_data = add_ch_annotator_to_custodian(
                        custodian_data,
                        ch_annotator_section,
                        first_update['source_file']
                    )
                    save_yaml(custodian_path, custodian_data)
                    stats['files_updated'] += 1
            except Exception as e:
                print(f"Error updating {custodian_path.name}: {e}")
                stats['errors'] += 1
    elif args.dry_run:
        print(f"\n4. Would update {len(files_to_update)} custodian files (dry run)")
        stats['files_updated'] = len(files_to_update)
    # Generate report
    print("\n5. Generating report...")
    REPORTS_DIR.mkdir(exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_path = REPORTS_DIR / f"CH_ANNOTATOR_CUSTODIAN_INTEGRATION_{timestamp}.md"
    generate_report(stats, all_matches, all_unmatched, report_path)
    # Print summary
    print("\n" + "=" * 60)
    print("INTEGRATION SUMMARY")
    print("=" * 60)
    print(f"Total institutions processed: {stats['total_institutions']}")
    # Guard the percentage: every loaded file may contain zero institutions,
    # which previously raised ZeroDivisionError here.
    total_institutions = stats['total_institutions']
    match_pct = stats['matched'] / total_institutions * 100 if total_institutions else 0.0
    print(f"Matched: {stats['matched']} ({match_pct:.1f}%)")
    print(f" - By ISIL: {stats['match_by_isil']}")
    print(f" - By Wikidata: {stats['match_by_wikidata']}")
    print(f" - By Name: {stats['match_by_name']}")
    print(f"Unmatched: {stats['unmatched']}")
    print(f"Custodian files updated: {stats['files_updated']}")
    print(f"Errors: {stats['errors']}")
    return 0
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == '__main__':
    sys.exit(main())