Enrichment scripts for country-specific city data:
- enrich_austrian_cities.py, enrich_belgian_cities.py, enrich_belgian_v2.py
- enrich_bulgarian_cities.py, enrich_czech_cities.py, enrich_czech_cities_fast.py
- enrich_japanese_cities.py, enrich_swiss_isil_cities.py, enrich_cities_google.py

Location-resolution utilities:
- resolve_cities_from_file_coords.py — resolve cities from coordinates embedded in filenames
- resolve_cities_wikidata.py — use Wikidata P131 for city resolution
- resolve_country_codes.py — standardize country codes
- resolve_cz_xx_regions.py — fix Czech "XX" region codes
- resolve_locations_by_name.py — name-based location lookup
- resolve_regions_from_city.py — derive regions from city data
- update_ghcid_with_geonames.py — update GHCIDs with GeoNames data

CH-Annotator integration:
- create_custodian_from_ch_annotator.py — create custodians from annotations
- add_ch_annotator_location_claims.py — add location claims
- extract_locations_ch_annotator.py — extract locations from annotations

Migration and fixes:
- migrate_egyptian_from_ch.py — migrate Egyptian data
- migrate_web_archives.py — migrate web-archive data
- fix_belgian_cities.py — fix Belgian city data
547 lines · 19 KiB · Python
#!/usr/bin/env python3
|
|
"""
|
|
Create custodian files from CH-Annotator data for unmatched institutions.
|
|
|
|
This script:
|
|
1. Loads CH-Annotator files from data/instances/*_ch_annotator.yaml
|
|
2. Checks which institutions don't have custodian files yet
|
|
3. Generates GHCID for each new institution
|
|
4. Creates custodian files in data/custodian/
|
|
|
|
Usage:
|
|
python scripts/create_custodian_from_ch_annotator.py [--dry-run] [--limit N]
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import yaml
|
|
import json
|
|
import re
|
|
import uuid
|
|
import hashlib
|
|
import argparse
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
|
|
# --- Paths ---------------------------------------------------------------
# Repo root is derived from this script's location (scripts/ -> project root).
PROJECT_ROOT = Path(__file__).parent.parent
CH_ANNOTATOR_DIR = PROJECT_ROOT / "data" / "instances"  # input *_ch_annotator.yaml files
CUSTODIAN_DIR = PROJECT_ROOT / "data" / "custodian"  # output custodian YAML files
REPORTS_DIR = PROJECT_ROOT / "reports"  # markdown run reports
# Cached custodian index lives outside the repo; delete it to force a rebuild.
INDEX_FILE = Path("/tmp/custodian_index.json")

# Namespace for deterministic UUIDv5 generation from GHCID strings.
# NOTE(review): this value is the RFC 4122 DNS namespace (uuid.NAMESPACE_DNS),
# not the URL namespace as previously commented. Do not "correct" the value:
# every existing GHCID UUID was derived from it.
GHCID_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')

# Institution type -> single-letter GHCID type code.
# generate_ghcid() falls back to 'U' (UNKNOWN) for unmapped types.
TYPE_TO_CODE = {
    'GALLERY': 'G',
    'LIBRARY': 'L',
    'ARCHIVE': 'A',
    'MUSEUM': 'M',
    'OFFICIAL_INSTITUTION': 'O',
    'RESEARCH_CENTER': 'R',
    'CORPORATION': 'C',
    'UNKNOWN': 'U',
    'BOTANICAL_ZOO': 'B',
    'EDUCATION_PROVIDER': 'E',
    'COLLECTING_SOCIETY': 'S',
    'FEATURES': 'F',
    'INTANGIBLE_HERITAGE_GROUP': 'I',
    'MIXED': 'X',
    'PERSONAL_COLLECTION': 'P',
    'HOLY_SITES': 'H',
    'DIGITAL_PLATFORM': 'D',
    'NGO': 'N',
    'TASTE_SMELL': 'T',
}

# Articles, prepositions, and conjunctions skipped when building
# abbreviations (Dutch, English, French, German, Spanish, Portuguese,
# Italian). Duplicates across languages collapse harmlessly in the set.
SKIP_WORDS = {
    'de', 'het', 'een', 'van', 'voor', 'in', 'op', 'te', 'den', 'der', 'des',
    'a', 'an', 'the', 'of', 'at', 'on', 'to', 'for', 'with', 'from', 'by',
    'le', 'la', 'les', 'un', 'une', 'des', 'du', 'à', 'au', 'aux', 'en',
    'der', 'die', 'das', 'dem', 'ein', 'eine', 'von', 'zu', 'für', 'mit',
    'el', 'la', 'los', 'las', 'un', 'una', 'del', 'al', 'con', 'por', 'para',
    'o', 'os', 'as', 'um', 'uma', 'do', 'da', 'dos', 'das', 'em', 'no', 'na',
    'il', 'lo', 'i', 'gli', 'di', 'del', 'dello', 'della', 'nel', 'nella',
    'and', 'or', 'but', 'und', 'oder', 'et', 'ou', 'e', 'y', 'o',
}
|
|
|
|
|
|
def normalize_name(name: str) -> str:
    """Canonicalize a name for lookup: lowercase, no punctuation, single spaces."""
    if not name:
        return ""
    lowered = name.lower()
    # Keep only word characters and whitespace, then collapse runs of spaces.
    without_punct = re.sub(r'[^\w\s]', '', lowered)
    return re.sub(r'\s+', ' ', without_punct).strip()
|
|
|
|
|
def normalize_wikidata(qid: str) -> str:
    """Reduce a Wikidata reference (bare QID or entity URL) to an uppercase QID."""
    if not qid:
        return ""
    text = str(qid)
    # URLs like http://www.wikidata.org/entity/Q42 -> keep the last path segment.
    if '/' in text:
        text = text.rsplit('/', 1)[-1]
    return text.strip().upper()
|
|
|
|
|
|
def generate_abbreviation(name: str, max_len: int = 10) -> str:
    """Build an initialism from the significant words of *name*, capped at *max_len*."""
    if not name:
        return "UNK"

    # Replace punctuation with spaces so word boundaries survive, then tokenize.
    tokens = re.sub(r'[^\w\s]', ' ', name).split()

    # Drop articles/prepositions (SKIP_WORDS) and purely numeric tokens.
    keep = [t for t in tokens if t.lower() not in SKIP_WORDS and not t.isdigit()]
    if not keep:
        # Everything was skippable — fall back to the first three raw words.
        keep = tokens[:3]

    initials = ''.join(t[0].upper() for t in keep if t)
    return initials[:max_len] if initials else "UNK"
|
|
|
|
|
|
def name_to_snake_case(name: str) -> str:
    """Convert *name* to an ASCII snake_case suffix (at most 50 characters)."""
    import unicodedata

    # Strip diacritics: NFD-decompose, then drop combining marks.
    decomposed = unicodedata.normalize('NFD', name)
    ascii_name = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')

    # Lowercase, remove quote/bracket punctuation, join word runs with '_',
    # drop anything that is not [a-z0-9_], and collapse repeated underscores.
    lowered = ascii_name.lower()
    stripped = re.sub(r"[''`\",.:;!?()[\]{}]", '', lowered)
    joined = re.sub(r'[\s\-]+', '_', stripped)
    alnum = re.sub(r'[^a-z0-9_]', '', joined)
    result = re.sub(r'_+', '_', alnum).strip('_')

    return result[:50]
|
|
|
|
|
|
def generate_ghcid(
    country_code: str,
    region_code: str,
    city_code: str,
    institution_type: str,
    abbreviation: str,
    name_suffix: Optional[str] = None
) -> str:
    """Assemble a GHCID: COUNTRY-REGION-CITY-TYPE-ABBREV[-name_suffix].

    The type letter comes from TYPE_TO_CODE, defaulting to 'U' (unknown).
    """
    segments = [
        country_code,
        region_code,
        city_code,
        TYPE_TO_CODE.get(institution_type, 'U'),
        abbreviation,
    ]
    if name_suffix:
        segments.append(name_suffix)
    return '-'.join(segments)
|
|
|
|
|
|
def generate_ghcid_uuid(ghcid: str) -> str:
    """Return the deterministic UUIDv5 of *ghcid* under GHCID_NAMESPACE, as a string."""
    deterministic = uuid.uuid5(GHCID_NAMESPACE, ghcid)
    return str(deterministic)
|
|
|
|
|
|
def generate_ghcid_uuid_sha256(ghcid: str) -> str:
    """Derive a UUID-shaped identifier from the SHA-256 of *ghcid*.

    The hex digest is grouped 8-4-4-4-12 and the version nibble is forced to
    '8' (UUIDv8 style), discarding hex digit 12 of the digest.
    NOTE(review): the variant nibble is NOT forced into the RFC 4122 range,
    so the result is not strictly conformant — left as-is on purpose, since
    existing stored identifiers depend on this exact derivation.
    """
    digest = hashlib.sha256(ghcid.encode()).hexdigest()
    groups = (
        digest[:8],
        digest[8:12],
        '8' + digest[13:16],
        digest[16:20],
        digest[20:32],
    )
    return '-'.join(groups)
|
|
|
|
|
|
def generate_ghcid_numeric(ghcid: str) -> int:
    """Return a deterministic 64-bit integer ID: first 8 SHA-256 bytes, big-endian."""
    digest = hashlib.sha256(ghcid.encode()).digest()
    return int.from_bytes(digest[:8], byteorder='big')
|
|
|
|
|
|
def load_custodian_index() -> Dict:
    """Load the cached custodian index, or rebuild it by scanning CUSTODIAN_DIR.

    The index maps existing custodian files four ways (value is the file path):
    'by_ghcid' (filename stem), 'by_wikidata' (QID scraped from the YAML text),
    'by_name' (normalized 'organisatie:' value), and 'by_isil' (reserved —
    nothing populates it here). A fresh build is cached in INDEX_FILE.

    Returns:
        The index dict described above.
    """
    if INDEX_FILE.exists():
        with open(INDEX_FILE, 'r') as f:
            return json.load(f)

    # Build index from scratch by scanning every custodian YAML.
    print("Building custodian index...")
    index = {'by_wikidata': {}, 'by_name': {}, 'by_isil': {}, 'by_ghcid': {}}

    # Compile the scraping patterns once instead of on every file.
    wikidata_re = re.compile(r'wikidata_entity_id:\s*["\']?(Q\d+)')
    name_re = re.compile(r'organisatie:\s*(.+?)$', re.MULTILINE)

    for f in CUSTODIAN_DIR.glob("*.yaml"):
        try:
            # Custodian files are written with utf-8 (see create_custodian_file),
            # so read them back the same way instead of the platform default.
            with open(f, 'r', encoding='utf-8') as fh:
                content = fh.read()

            # By convention the filename stem is the GHCID.
            ghcid = f.stem
            index['by_ghcid'][ghcid] = str(f)

            match = wikidata_re.search(content)
            if match:
                index['by_wikidata'][match.group(1).upper()] = str(f)

            match = name_re.search(content)
            if match:
                name = match.group(1).strip().strip('"\'')
                index['by_name'][normalize_name(name)] = str(f)

        except Exception:
            # Best-effort indexing: skip unreadable/garbled files, but do not
            # swallow KeyboardInterrupt/SystemExit like the previous bare
            # `except:` did.
            continue

    with open(INDEX_FILE, 'w') as f:
        json.dump(index, f)

    return index
|
|
|
|
|
|
def institution_exists(inst: Dict, index: Dict) -> bool:
    """Return True if *inst* matches an indexed custodian by Wikidata QID or name."""
    # First pass: any WIKIDATA identifier whose normalized QID is indexed.
    known_qids = index['by_wikidata']
    for ident in inst.get('identifiers', []):
        if ident.get('identifier_scheme', '').upper() != 'WIKIDATA':
            continue
        qid = normalize_wikidata(ident.get('identifier_value', ''))
        if qid and qid in known_qids:
            return True

    # Second pass: normalized name lookup.
    normalized = normalize_name(inst.get('name', ''))
    return bool(normalized and normalized in index['by_name'])
|
|
|
|
|
|
def sanitize_code(code: str, max_len: int = 2) -> str:
    """Sanitize a code for use in filenames and GHCIDs.

    Removes diacritics, keeps only alphanumerics, uppercases, and truncates
    to *max_len*. Empty or fully non-alphanumeric input yields a placeholder
    ('XX' for max_len == 2, otherwise 'XXX').
    """
    import unicodedata

    placeholder = "XX" if max_len == 2 else "XXX"
    if not code:
        return placeholder

    # NFD-decompose and drop combining marks to fold away diacritics.
    decomposed = unicodedata.normalize('NFD', str(code))
    folded = ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')

    alnum = re.sub(r'[^a-zA-Z0-9]', '', folded)
    if not alnum:
        return placeholder

    return alnum[:max_len].upper()
|
|
|
|
|
|
def extract_location_info(inst: Dict) -> Tuple[str, str, str]:
    """Extract (country_code, region_code, city_code) from an institution.

    Only the first entry of inst['locations'] is consulted. Missing values
    fall back to the placeholders 'XX'/'XX'/'XXX'.

    Fix: the country code is now stripped and uppercased like the region and
    city codes; previously a lowercase or padded source value (e.g. 'nl')
    leaked into the GHCID unchanged.
    """
    locations = inst.get('locations', [])

    country_code = "XX"
    region_code = "XX"
    city_code = "XXX"

    if locations:
        loc = locations[0]

        # Country: normalize case/whitespace, keep the placeholder for blanks.
        country_raw = loc.get('country', 'XX') or 'XX'
        country_code = str(country_raw).strip().upper() or 'XX'

        # Region: a 2-letter alphabetic value is assumed to already be a code;
        # anything else is treated as a full name and reduced to 2 characters.
        region_raw = loc.get('region', 'XX') or 'XX'
        if len(region_raw) == 2 and region_raw.isalpha():
            region_code = region_raw.upper()
        else:
            region_code = sanitize_code(region_raw, 2)

        # City: always reduced to a sanitized 3-letter code when present.
        city = loc.get('city', '')
        if city:
            city_code = sanitize_code(city, 3)

    return country_code, region_code, city_code
|
|
|
|
|
|
def create_custodian_file(inst: Dict, source_file: str, index: Dict) -> Tuple[Optional[Path], str]:
    """
    Create a custodian YAML file for an institution.

    Args:
        inst: Institution dict from a CH-Annotator file (name, institution_type,
            identifiers, locations, provenance, ch_annotator).
        source_file: Name of the originating CH-Annotator file; recorded in
            provenance and history entries.
        index: Custodian index; 'by_ghcid' is consulted for collision checks
            and updated in place (with 'by_name') after a successful write.

    Returns: (file_path, status) where status is 'created' or 'error: <msg>'.
        NOTE(review): an 'exists' status was documented originally but no code
        path returns it — duplicate detection happens in the caller via
        institution_exists().
    """
    try:
        name = inst.get('name', 'Unknown Institution')
        institution_type = inst.get('institution_type', 'UNKNOWN')

        # Derive GHCID components from the institution's first location.
        country_code, region_code, city_code = extract_location_info(inst)

        # Initialism of the significant words in the name.
        abbreviation = generate_abbreviation(name)

        # Generate base GHCID
        base_ghcid = generate_ghcid(country_code, region_code, city_code, institution_type, abbreviation)

        # On collision with an already-indexed GHCID, append a snake_case name
        # suffix. NOTE(review): if the suffixed GHCID also collides, the
        # existing file is silently overwritten — confirm this is acceptable.
        ghcid = base_ghcid
        if ghcid in index['by_ghcid']:
            # Add name suffix to resolve collision
            name_suffix = name_to_snake_case(name)
            ghcid = generate_ghcid(country_code, region_code, city_code, institution_type, abbreviation, name_suffix)

        # Derived identifiers: all deterministic from the GHCID except
        # record_id, which is a fresh random UUID per run.
        ghcid_uuid = generate_ghcid_uuid(ghcid)
        ghcid_uuid_sha256 = generate_ghcid_uuid_sha256(ghcid)
        ghcid_numeric = generate_ghcid_numeric(ghcid)
        record_id = str(uuid.uuid4())

        # Single UTC timestamp reused for every dated field in this record.
        timestamp = datetime.now(timezone.utc).isoformat()

        # Build custodian data structure
        custodian_data = {
            # Verbatim copy of the source entry for traceability.
            'original_entry': {
                'name': name,
                'institution_type': institution_type,
                'source': f'CH-Annotator ({source_file})',
                'identifiers': inst.get('identifiers', []),
                'locations': inst.get('locations', []),
            },
            'processing_timestamp': timestamp,
            'ghcid': {
                'ghcid_current': ghcid,
                'ghcid_original': ghcid,
                'ghcid_uuid': ghcid_uuid,
                'ghcid_uuid_sha256': ghcid_uuid_sha256,
                'ghcid_numeric': ghcid_numeric,
                'record_id': record_id,
                'generation_timestamp': timestamp,
                'location_resolution': {
                    'country_code': country_code,
                    'region_code': region_code,
                    'city_code': city_code,
                    'method': 'CH_ANNOTATOR_SOURCE',
                },
                # History starts with a single entry for the initial GHCID.
                'ghcid_history': [{
                    'ghcid': ghcid,
                    'ghcid_numeric': ghcid_numeric,
                    'valid_from': timestamp,
                    'reason': f'Initial GHCID from CH-Annotator ({source_file})',
                }],
            },
            'custodian_name': {
                'claim_type': 'custodian_name',
                'claim_value': name,
                'source_type': 'ch_annotator',
            },
            # Identifiers generated by this script; source identifiers are
            # appended below.
            'identifiers': [
                {'identifier_scheme': 'GHCID', 'identifier_value': ghcid},
                {'identifier_scheme': 'GHCID_UUID', 'identifier_value': ghcid_uuid},
                {'identifier_scheme': 'GHCID_UUID_SHA256', 'identifier_value': ghcid_uuid_sha256},
                {'identifier_scheme': 'GHCID_NUMERIC', 'identifier_value': str(ghcid_numeric)},
                {'identifier_scheme': 'RECORD_ID', 'identifier_value': record_id},
            ],
            # Provenance defaults apply when the source entry carries none.
            'provenance': {
                'data_source': inst.get('provenance', {}).get('data_source', 'CH_ANNOTATOR'),
                'data_tier': inst.get('provenance', {}).get('data_tier', 'TIER_3_CROWD_SOURCED'),
                'extraction_date': inst.get('provenance', {}).get('extraction_date', timestamp),
                'extraction_method': f'Created from CH-Annotator file: {source_file}',
                'confidence_score': inst.get('provenance', {}).get('confidence_score', 0.8),
            },
            'ch_annotator': inst.get('ch_annotator', {}),
        }

        # Carry over source identifiers, excluding schemes this script
        # generates itself (those were added above).
        for ident in inst.get('identifiers', []):
            scheme = ident.get('identifier_scheme', '').upper()
            if scheme not in ['GHCID', 'GHCID_UUID', 'GHCID_UUID_SHA256', 'GHCID_NUMERIC', 'RECORD_ID']:
                custodian_data['identifiers'].append(ident)

        # If a Wikidata identifier is present, seed a minimal enrichment stub
        # from the first one found. NOTE(review): wikidata_label_en is set to
        # the source name, not an English label fetched from Wikidata.
        for ident in inst.get('identifiers', []):
            if ident.get('identifier_scheme', '').upper() == 'WIKIDATA':
                custodian_data['wikidata_enrichment'] = {
                    'wikidata_entity_id': ident.get('identifier_value', '').split('/')[-1],
                    'wikidata_label_en': name,
                }
                break

        # Record how/when this file was created inside the ch_annotator block
        # (only when the source actually carried ch_annotator data).
        if 'ch_annotator' in custodian_data and custodian_data['ch_annotator']:
            custodian_data['ch_annotator']['integration_note'] = {
                'created_from': source_file,
                'creation_date': timestamp,
                'creation_method': 'create_custodian_from_ch_annotator.py',
            }

        # Write <GHCID>.yaml into the custodian directory.
        file_path = CUSTODIAN_DIR / f"{ghcid}.yaml"

        with open(file_path, 'w', encoding='utf-8') as f:
            yaml.dump(custodian_data, f, allow_unicode=True, default_flow_style=False, sort_keys=False, width=120)

        # Keep the in-memory index current so later institutions in the same
        # run see this GHCID/name and don't create duplicates.
        index['by_ghcid'][ghcid] = str(file_path)
        if normalize_name(name):
            index['by_name'][normalize_name(name)] = str(file_path)

        return file_path, 'created'

    except Exception as e:
        # Per-institution failures are reported to the caller, not raised,
        # so one bad record doesn't abort a whole batch.
        return None, f'error: {e}'
|
|
|
|
|
|
def load_ch_annotator_file(path: Path) -> List[Dict]:
    """Parse a CH-Annotator YAML file and return its institution dicts.

    Accepts either a top-level list or a mapping with an 'institutions' key;
    any other shape (including empty files) yields an empty list.
    """
    with open(path, 'r', encoding='utf-8') as handle:
        parsed = yaml.safe_load(handle)

    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        return parsed.get('institutions', [])
    return []
|
|
|
|
|
|
def main():
    """CLI driver: scan CH-Annotator files, create missing custodian files, report.

    Returns 0 (process exit code). Writes a markdown report under REPORTS_DIR
    unless --dry-run is given.
    """
    parser = argparse.ArgumentParser(description='Create custodian files from CH-Annotator data')
    parser.add_argument('--dry-run', action='store_true', help='Preview without creating files')
    parser.add_argument('--limit', type=int, default=0, help='Limit institutions per file (0=unlimited)')
    parser.add_argument('--skip-large', action='store_true', help='Skip files with >5000 institutions')
    args = parser.parse_args()

    print("=" * 60)
    print("Create Custodian Files from CH-Annotator Data")
    print("=" * 60)

    if args.dry_run:
        print("DRY RUN MODE - No files will be created")

    # Load (or rebuild) the index of existing custodian files.
    print("\n1. Loading custodian index...")
    index = load_custodian_index()
    print(f" Indexed: {len(index.get('by_ghcid', {}))} GHCIDs, "
          f"{len(index.get('by_wikidata', {}))} Wikidata, "
          f"{len(index.get('by_name', {}))} names")

    # Discover input files by naming convention.
    ch_files = sorted(CH_ANNOTATOR_DIR.glob("*_ch_annotator.yaml"))
    print(f"\n2. Found {len(ch_files)} CH-Annotator files")

    # Aggregate counters across all input files; per-file stats nest under
    # 'by_source' for the report.
    total_stats = {
        'processed': 0,
        'created': 0,
        'skipped_exists': 0,
        'errors': 0,
        'by_source': {},
    }

    for ch_file in ch_files:
        print(f"\n--- {ch_file.name} ---")

        try:
            institutions = load_ch_annotator_file(ch_file)
            print(f" Loaded {len(institutions)} institutions")

            if args.skip_large and len(institutions) > 5000:
                print(f" SKIPPING (>5000 institutions)")
                continue

            file_stats = {'processed': 0, 'created': 0, 'skipped': 0, 'errors': 0}

            for i, inst in enumerate(institutions):
                # --limit caps attempts per input file, not per run.
                if args.limit and file_stats['processed'] >= args.limit:
                    print(f" Reached limit of {args.limit}")
                    break

                # Progress heartbeat every 500 institutions.
                if i % 500 == 0 and i > 0:
                    print(f" Progress: {i}/{len(institutions)}, created: {file_stats['created']}")

                file_stats['processed'] += 1
                total_stats['processed'] += 1

                # Skip institutions that already have a custodian file
                # (matched by Wikidata QID or normalized name).
                if institution_exists(inst, index):
                    file_stats['skipped'] += 1
                    total_stats['skipped_exists'] += 1
                    continue

                # Create the custodian file (index is updated in place so
                # later duplicates within this run are caught).
                if not args.dry_run:
                    path, status = create_custodian_file(inst, ch_file.name, index)

                    if status == 'created':
                        file_stats['created'] += 1
                        total_stats['created'] += 1
                    # Matches the 'error: <msg>' status strings.
                    elif 'error' in status:
                        file_stats['errors'] += 1
                        total_stats['errors'] += 1
                else:
                    # Dry run: count what WOULD have been created.
                    file_stats['created'] += 1
                    total_stats['created'] += 1

            print(f" Processed: {file_stats['processed']}, Created: {file_stats['created']}, "
                  f"Skipped: {file_stats['skipped']}, Errors: {file_stats['errors']}")

            total_stats['by_source'][ch_file.name] = file_stats

        except Exception as e:
            # A broken input file counts as one error and processing moves on.
            print(f" ERROR: {e}")
            total_stats['errors'] += 1

    # Print summary
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f"Total processed: {total_stats['processed']}")
    print(f"Files created: {total_stats['created']}")
    print(f"Skipped (already exist): {total_stats['skipped_exists']}")
    print(f"Errors: {total_stats['errors']}")

    # Save a timestamped markdown report (skipped on dry runs).
    if not args.dry_run:
        REPORTS_DIR.mkdir(exist_ok=True)
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_path = REPORTS_DIR / f"CUSTODIAN_CREATION_REPORT_{timestamp}.md"

        report = f"""# Custodian File Creation Report

Generated: {datetime.now(timezone.utc).isoformat()}

## Summary

| Metric | Count |
|--------|-------|
| Institutions processed | {total_stats['processed']} |
| Custodian files created | {total_stats['created']} |
| Skipped (already exist) | {total_stats['skipped_exists']} |
| Errors | {total_stats['errors']} |

## By Source File

| Source File | Processed | Created | Skipped | Errors |
|-------------|-----------|---------|---------|--------|
"""
        for source, stats in total_stats['by_source'].items():
            report += f"| {source} | {stats['processed']} | {stats['created']} | {stats['skipped']} | {stats['errors']} |\n"

        with open(report_path, 'w') as f:
            f.write(report)

        print(f"\nReport saved to: {report_path}")

    return 0
|
|
|
|
|
|
if __name__ == '__main__':
    # Propagate main()'s return value (0 on success) as the process exit code.
    sys.exit(main())
|