515 lines
18 KiB
Python
515 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Integrate CH-Annotator data into custodian files.
|
|
|
|
This script:
|
|
1. Reads CH-Annotator enhanced files from data/instances/*_ch_annotator.yaml
|
|
2. Matches institutions to existing custodian files by ISIL, Wikidata ID, or name
|
|
3. Adds ch_annotator section to matching custodian files
|
|
4. Creates new custodian files for unmatched institutions
|
|
5. Generates an integration report
|
|
|
|
Usage:
|
|
python scripts/integrate_ch_annotator_to_custodian.py [--dry-run] [--report-only]
|
|
|
|
Options:
|
|
--dry-run Preview changes without writing files
|
|
--report-only Only generate matching report, no file changes
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import yaml
|
|
import argparse
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from collections import defaultdict
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
import re
|
|
|
|
|
|
# Paths
# PROJECT_ROOT is derived from this file's location — assumes the script
# lives one directory below the repo root (scripts/, per the usage docstring).
PROJECT_ROOT = Path(__file__).parent.parent
CH_ANNOTATOR_DIR = PROJECT_ROOT / "data" / "instances"   # CH-Annotator enhanced files (*_ch_annotator.yaml)
CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian"      # one YAML file per custodian institution
REPORTS_DIR = PROJECT_ROOT / "reports"                   # integration reports are written here
|
|
|
|
|
|
def load_yaml(path: Path) -> Any:
    """Parse a YAML file and return its contents (None for an empty file)."""
    with path.open('r', encoding='utf-8') as handle:
        return yaml.safe_load(handle)
|
|
|
|
|
|
def save_yaml(path: Path, data: Any) -> None:
    """Serialize *data* to a YAML file, keeping key order and Unicode intact."""
    with path.open('w', encoding='utf-8') as handle:
        yaml.dump(
            data,
            handle,
            allow_unicode=True,
            default_flow_style=False,
            sort_keys=False,
            width=120,
        )
|
|
|
|
|
|
def normalize_isil(isil: str) -> str:
    """Return an ISIL code normalized for comparison (uppercase, no spaces).

    Falsy input (None or empty string) normalizes to "".
    """
    return isil.strip().upper().replace(" ", "") if isil else ""
|
|
|
|
|
|
def normalize_wikidata(qid: str) -> str:
    """Return a Wikidata Q-number normalized for comparison.

    Accepts bare identifiers or full entity URLs; for a URL the Q-number
    is taken as the final path segment. Falsy input normalizes to "".
    """
    if not qid:
        return ""
    text = str(qid)
    if '/' in text:
        text = text.split('/')[-1]
    return text.strip().upper()
|
|
|
|
|
|
def normalize_name(name: str) -> str:
    """Normalize an institution name for fuzzy matching.

    Lowercases, strips punctuation, and collapses whitespace runs to a
    single space. Falsy input normalizes to "".
    """
    if not name:
        return ""
    lowered = name.lower()
    no_punct = re.sub(r'[^\w\s]', '', lowered)
    return re.sub(r'\s+', ' ', no_punct).strip()
|
|
|
|
|
|
def extract_identifiers_from_ch_annotator(institution: Dict) -> Dict[str, str]:
    """Extract normalized identifiers from a CH-Annotator institution record.

    Returns a dict that may contain 'isil', 'wikidata', 'name' (all
    normalized) and 'name_original' (the name as recorded in the source).
    """
    identifiers: Dict[str, str] = {}

    for ident in institution.get('identifiers', []):
        # `or ''` guards against explicit nulls in the YAML: .get() with a
        # default still returns None when the key is present but null,
        # which would crash .upper().
        scheme = (ident.get('identifier_scheme') or '').upper()
        value = ident.get('identifier_value') or ''

        if scheme == 'ISIL':
            identifiers['isil'] = normalize_isil(value)
        elif scheme == 'WIKIDATA':
            identifiers['wikidata'] = normalize_wikidata(value)

    # Also keep the institution name, both normalized and verbatim.
    if institution.get('name'):
        identifiers['name'] = normalize_name(institution['name'])
        identifiers['name_original'] = institution['name']

    return identifiers
|
|
|
|
|
|
def build_custodian_index(custodian_dir: Path) -> Dict[str, Dict]:
    """
    Build an index of custodian files by various identifiers.

    Returns dict with keys: 'by_isil', 'by_wikidata', 'by_ghcid', 'by_name',
    'all_files'. Each maps {identifier: file_path}. Later files silently win
    on identifier collisions.
    """
    index = {
        'by_isil': {},
        'by_wikidata': {},
        'by_ghcid': {},
        'by_name': {},
        'all_files': {}
    }

    if not custodian_dir.exists():
        print(f"Warning: Custodian directory does not exist: {custodian_dir}")
        return index

    yaml_files = list(custodian_dir.glob("*.yaml"))
    print(f"Indexing {len(yaml_files)} custodian files...")

    for file_path in yaml_files:
        try:
            data = load_yaml(file_path)
            if not data:
                continue

            # Store file path
            index['all_files'][file_path.name] = file_path

            # GHCID defaults to the filename stem; an explicit GHCID
            # identifier (below) may add a second key for the same file.
            ghcid = file_path.stem
            index['by_ghcid'][ghcid] = file_path

            for ident in data.get('identifiers', []):
                # `or ''` guards against explicit nulls in the YAML, which
                # .get() with a default would pass through to .upper().
                scheme = (ident.get('identifier_scheme') or '').upper()
                value = ident.get('identifier_value') or ''

                if scheme == 'GHCID' and value:
                    index['by_ghcid'][value] = file_path

            # ISIL and name live under original_entry; key names vary by
            # source, so try the known spellings in order.
            original = data.get('original_entry', {})
            if original:
                isil = original.get('isil-code_na') or original.get('isil_code') or original.get('ISIL')
                if isil:
                    index['by_isil'][normalize_isil(isil)] = file_path

                name = original.get('organisatie') or original.get('name') or original.get('institution_name')
                if name:
                    index['by_name'][normalize_name(name)] = file_path

            # Wikidata Q-number from the enrichment section, if any
            wikidata = data.get('wikidata_enrichment', {})
            if wikidata:
                qid = wikidata.get('wikidata_entity_id')
                if qid:
                    index['by_wikidata'][normalize_wikidata(qid)] = file_path

            # ...and from the original entry as a fallback/extra key
            if original and original.get('wikidata_id'):
                index['by_wikidata'][normalize_wikidata(original['wikidata_id'])] = file_path

        except Exception as e:
            # Best-effort indexing: one malformed file must not stop the run.
            print(f"Warning: Error processing {file_path.name}: {e}")
            continue

    print(f"Indexed: {len(index['by_isil'])} ISIL, {len(index['by_wikidata'])} Wikidata, "
          f"{len(index['by_ghcid'])} GHCID, {len(index['by_name'])} names")

    return index
|
|
|
|
|
|
def find_matching_custodian(
    institution: Dict,
    custodian_index: Dict
) -> Tuple[Optional[Path], str]:
    """
    Find the custodian file matching a CH-Annotator institution.

    Identifiers are tried from most to least reliable: ISIL, then
    Wikidata Q-number, then exact normalized name.

    Returns: (file_path, match_type) or (None, 'no_match')
    """
    identifiers = extract_identifiers_from_ch_annotator(institution)

    # Priority order: ISIL (most reliable) > Wikidata > name (least reliable).
    for key in ('isil', 'wikidata', 'name'):
        value = identifiers.get(key)
        if value:
            bucket = custodian_index['by_' + key]
            if value in bucket:
                return bucket[value], key

    return None, 'no_match'
|
|
|
|
|
|
def extract_ch_annotator_section(institution: Dict) -> Dict:
    """Return the institution's ch_annotator section, or {} when absent."""
    if 'ch_annotator' in institution:
        return institution['ch_annotator']
    return {}
|
|
|
|
|
|
def add_ch_annotator_to_custodian(
    custodian_data: Dict,
    ch_annotator_section: Dict,
    source_file: str
) -> Dict:
    """Attach a CH-Annotator section to custodian data.

    Mutates and returns *custodian_data*. Any existing ch_annotator section
    is replaced; the integration note records whether a previous (non-empty)
    annotation was present so the overwrite is traceable.

    Args:
        custodian_data: parsed custodian YAML mapping.
        ch_annotator_section: section to attach (also mutated: a note is added).
        source_file: name of the CH-Annotator file the section came from.
    """
    note = {
        'integrated_from': source_file,
        'integration_date': datetime.now(timezone.utc).isoformat(),
    }
    # Flag replacement of a previous non-empty annotation for provenance.
    # (The original code built two near-identical notes and left an unused
    # `existing` variable; this is the same behavior without duplication.)
    if custodian_data.get('ch_annotator'):
        note['previous_annotation_present'] = True

    ch_annotator_section['integration_note'] = note
    custodian_data['ch_annotator'] = ch_annotator_section
    return custodian_data
|
|
|
|
|
|
def load_ch_annotator_files() -> List[Tuple[Path, List[Dict]]]:
    """Load every CH-Annotator file and return (path, institutions) pairs.

    Files that fail to parse or have an unexpected top-level shape are
    reported and skipped.
    """
    results = []

    for file_path in list(CH_ANNOTATOR_DIR.glob("*_ch_annotator.yaml")):
        try:
            data = load_yaml(file_path)

            # Accept either a bare list of institutions or a mapping with
            # an 'institutions' key.
            if isinstance(data, list):
                institutions = data
            elif isinstance(data, dict):
                institutions = data.get('institutions', [])
            else:
                print(f"Warning: Unexpected format in {file_path.name}")
                continue

            results.append((file_path, institutions))
            print(f"Loaded {len(institutions)} institutions from {file_path.name}")

        except Exception as e:
            print(f"Error loading {file_path.name}: {e}")
            continue

    return results
|
|
|
|
|
|
def generate_report(
    stats: Dict,
    matches: List[Dict],
    unmatched: List[Dict],
    report_path: Path
) -> None:
    """Write a Markdown integration report to *report_path*.

    Args:
        stats: counters produced by main(); must contain the summary keys
            referenced below plus a 'by_source' mapping of per-file counts.
        matches: dicts with 'name', 'match_type', 'custodian_file'.
        unmatched: dicts with 'name', 'source_file', 'identifiers'.
        report_path: destination file; its parent directory must exist.
    """

    report = f"""# CH-Annotator to Custodian Integration Report

Generated: {datetime.now(timezone.utc).isoformat()}

## Summary

| Metric | Count |
|--------|-------|
| Total CH-Annotator files processed | {stats['files_processed']} |
| Total institutions in CH-Annotator files | {stats['total_institutions']} |
| Matched to existing custodian files | {stats['matched']} |
| - Matched by ISIL | {stats['match_by_isil']} |
| - Matched by Wikidata | {stats['match_by_wikidata']} |
| - Matched by Name | {stats['match_by_name']} |
| Unmatched (no custodian file) | {stats['unmatched']} |
| Custodian files updated | {stats['files_updated']} |
| New custodian files created | {stats['files_created']} |
| Errors | {stats['errors']} |

## Match Rate by Source File

| Source File | Institutions | Matched | Unmatched | Match Rate |
|-------------|--------------|---------|-----------|------------|
"""

    for source, source_stats in stats.get('by_source', {}).items():
        total = source_stats['total']
        matched = source_stats['matched']
        unmatched_count = source_stats['unmatched']
        rate = f"{(matched/total*100):.1f}%" if total > 0 else "N/A"
        report += f"| {source} | {total} | {matched} | {unmatched_count} | {rate} |\n"

    report += f"""
## Match Details

### Matched Institutions (first 50)

| Institution Name | Match Type | Custodian File |
|------------------|------------|----------------|
"""

    for match in matches[:50]:
        # `or 'Unknown'` also covers a present-but-None name (main() stores
        # institution.get('name') verbatim); the .get() default alone would
        # let None through and crash the slice.
        name = (match.get('name') or 'Unknown')[:50]
        match_type = match.get('match_type', 'unknown')
        file_name = match.get('custodian_file', 'N/A')
        if isinstance(file_name, Path):
            file_name = file_name.name
        report += f"| {name} | {match_type} | {file_name} |\n"

    if len(matches) > 50:
        report += f"\n... and {len(matches) - 50} more matches\n"

    report += f"""
### Unmatched Institutions (first 50)

These institutions from CH-Annotator files have no matching custodian file:

| Institution Name | Source File | Identifiers |
|------------------|-------------|-------------|
"""

    for inst in unmatched[:50]:
        name = (inst.get('name') or 'Unknown')[:40]
        source = inst.get('source_file', 'Unknown')
        idents = []
        for ident in inst.get('identifiers', [])[:2]:
            # Null identifier values occur in the data; coerce to '' before slicing.
            idents.append(f"{ident.get('identifier_scheme')}: {(ident.get('identifier_value') or '')[:20]}")
        ident_str = ", ".join(idents) if idents else "None"
        report += f"| {name} | {source} | {ident_str} |\n"

    if len(unmatched) > 50:
        report += f"\n... and {len(unmatched) - 50} more unmatched institutions\n"

    report += f"""
## Notes

- **ISIL match**: Most reliable, based on standard library/archive identifier
- **Wikidata match**: Reliable, based on unique Wikidata Q-number
- **Name match**: Less reliable, based on normalized name comparison

## Next Steps

1. Review unmatched institutions - may need manual matching or new custodian files
2. Validate integration by spot-checking updated custodian files
3. Run LinkML validation on updated files
"""

    with open(report_path, 'w', encoding='utf-8') as f:
        f.write(report)

    print(f"\nReport saved to: {report_path}")
|
|
|
|
|
|
def main():
    """CLI entry point.

    Returns a process exit code: 0 on success, 1 when no CH-Annotator
    input files are found.
    """
    parser = argparse.ArgumentParser(description='Integrate CH-Annotator data into custodian files')
    parser.add_argument('--dry-run', action='store_true', help='Preview changes without writing')
    parser.add_argument('--report-only', action='store_true', help='Only generate report, no changes')
    args = parser.parse_args()

    print("=" * 60)
    print("CH-Annotator to Custodian Integration")
    print("=" * 60)

    if args.dry_run:
        print("DRY RUN MODE - No files will be modified")
    if args.report_only:
        print("REPORT ONLY MODE - Only generating matching report")

    # Build index of existing custodian files
    print("\n1. Building custodian file index...")
    custodian_index = build_custodian_index(CUSTODIAN_DIR)

    # Load CH-Annotator files
    print("\n2. Loading CH-Annotator files...")
    ch_annotator_files = load_ch_annotator_files()

    if not ch_annotator_files:
        print("No CH-Annotator files found!")
        # Signal failure to the shell (the bare `return` here previously
        # produced exit code 0 for a condition that is an error).
        return 1

    # Process institutions
    print("\n3. Matching institutions to custodian files...")

    stats = {
        'files_processed': len(ch_annotator_files),
        'total_institutions': 0,
        'matched': 0,
        'unmatched': 0,
        'match_by_isil': 0,
        'match_by_wikidata': 0,
        'match_by_name': 0,
        'files_updated': 0,
        'files_created': 0,
        'errors': 0,
        'by_source': {}
    }

    all_matches = []
    all_unmatched = []
    files_to_update = {}  # custodian_path -> list of pending updates

    for source_path, institutions in ch_annotator_files:
        source_name = source_path.name
        source_stats = {'total': len(institutions), 'matched': 0, 'unmatched': 0}

        for institution in institutions:
            stats['total_institutions'] += 1

            # Find matching custodian file
            match_path, match_type = find_matching_custodian(institution, custodian_index)

            if match_path:
                stats['matched'] += 1
                source_stats['matched'] += 1

                if match_type == 'isil':
                    stats['match_by_isil'] += 1
                elif match_type == 'wikidata':
                    stats['match_by_wikidata'] += 1
                elif match_type == 'name':
                    stats['match_by_name'] += 1

                all_matches.append({
                    'name': institution.get('name'),
                    'match_type': match_type,
                    'custodian_file': match_path,
                    'source_file': source_name
                })

                # Queue for update (several institutions may map to one file)
                files_to_update.setdefault(match_path, []).append({
                    'institution': institution,
                    'source_file': source_name
                })
            else:
                stats['unmatched'] += 1
                source_stats['unmatched'] += 1
                all_unmatched.append({
                    'name': institution.get('name'),
                    'identifiers': institution.get('identifiers', []),
                    'source_file': source_name
                })

        stats['by_source'][source_name] = source_stats

    # Update custodian files (skipped in --report-only / --dry-run modes)
    if not args.report_only and not args.dry_run:
        print(f"\n4. Updating {len(files_to_update)} custodian files...")

        for custodian_path, updates in files_to_update.items():
            try:
                custodian_data = load_yaml(custodian_path)

                # Use the first matching institution's CH-Annotator section
                # (usually there's only one match per custodian file)
                first_update = updates[0]
                ch_annotator_section = extract_ch_annotator_section(first_update['institution'])

                if ch_annotator_section:
                    custodian_data = add_ch_annotator_to_custodian(
                        custodian_data,
                        ch_annotator_section,
                        first_update['source_file']
                    )

                    save_yaml(custodian_path, custodian_data)
                    stats['files_updated'] += 1

            except Exception as e:
                print(f"Error updating {custodian_path.name}: {e}")
                stats['errors'] += 1
    elif args.dry_run:
        print(f"\n4. Would update {len(files_to_update)} custodian files (dry run)")
        stats['files_updated'] = len(files_to_update)

    # Generate report
    print("\n5. Generating report...")
    REPORTS_DIR.mkdir(exist_ok=True)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    report_path = REPORTS_DIR / f"CH_ANNOTATOR_CUSTODIAN_INTEGRATION_{timestamp}.md"

    generate_report(stats, all_matches, all_unmatched, report_path)

    # Print summary
    print("\n" + "=" * 60)
    print("INTEGRATION SUMMARY")
    print("=" * 60)
    print(f"Total institutions processed: {stats['total_institutions']}")
    if stats['total_institutions']:
        pct = stats['matched'] / stats['total_institutions'] * 100
        print(f"Matched: {stats['matched']} ({pct:.1f}%)")
    else:
        # Guard: input files existed but held no institutions — the
        # unguarded percentage previously raised ZeroDivisionError here.
        print(f"Matched: {stats['matched']}")
    print(f"  - By ISIL: {stats['match_by_isil']}")
    print(f"  - By Wikidata: {stats['match_by_wikidata']}")
    print(f"  - By Name: {stats['match_by_name']}")
    print(f"Unmatched: {stats['unmatched']}")
    print(f"Custodian files updated: {stats['files_updated']}")
    print(f"Errors: {stats['errors']}")

    return 0
|
|
|
|
|
|
# Script entry point: the process exit code is main()'s return value.
if __name__ == '__main__':
    sys.exit(main())
|