glam/scripts/enrich_by_location.py
2025-12-21 00:01:54 +01:00

887 lines
30 KiB
Python

#!/usr/bin/env python3
"""
Enrich custodian files with Wikidata Q-numbers using location-based matching.
Uses coordinates from custodian files to find nearby Wikidata heritage institutions,
then applies fuzzy name matching for verification.
Process:
1. Find custodian files with coordinates but no wikidata_enrichment
2. For each file, query Wikidata for heritage institutions within radius
3. Fuzzy match by name (higher threshold since we have location proximity)
4. Add Wikidata identifiers to matched files
Usage:
python scripts/enrich_by_location.py --country AT [--limit N] [--dry-run] [--radius 2.0] [--threshold 80]
"""
import yaml
import requests
import argparse
import sys
import time
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from datetime import datetime, timezone
from rapidfuzz import fuzz
# Wikidata SPARQL endpoint
WIKIDATA_SPARQL = "https://query.wikidata.org/sparql"
# Country configurations
COUNTRY_CONFIG = {
"AT": {
"qid": "Q40",
"name": "Austria",
"languages": "de,en",
},
"BE": {
"qid": "Q31",
"name": "Belgium",
"languages": "nl,fr,de,en",
},
"BG": {
"qid": "Q219",
"name": "Bulgaria",
"languages": "bg,en",
},
"BR": {
"qid": "Q155",
"name": "Brazil",
"languages": "pt,en",
},
"BY": {
"qid": "Q184",
"name": "Belarus",
"languages": "be,ru,en",
},
"CH": {
"qid": "Q39",
"name": "Switzerland",
"languages": "de,fr,it,rm,en",
},
"CZ": {
"qid": "Q213",
"name": "Czech Republic",
"languages": "cs,en",
},
"DE": {
"qid": "Q183",
"name": "Germany",
"languages": "de,en",
},
"EG": {
"qid": "Q79",
"name": "Egypt",
"languages": "ar,en",
},
"FR": {
"qid": "Q142",
"name": "France",
"languages": "fr,en",
},
"GB": {
"qid": "Q145",
"name": "United Kingdom",
"languages": "en",
},
"IT": {
"qid": "Q38",
"name": "Italy",
"languages": "it,en",
},
"JP": {
"qid": "Q17",
"name": "Japan",
"languages": "ja,en",
},
"MX": {
"qid": "Q96",
"name": "Mexico",
"languages": "es,en",
},
"NL": {
"qid": "Q55",
"name": "Netherlands",
"languages": "nl,en",
},
"PL": {
"qid": "Q36",
"name": "Poland",
"languages": "pl,en",
},
"AR": {
"qid": "Q414",
"name": "Argentina",
"languages": "es,en",
},
}
# Heritage institution types to search
HERITAGE_TYPES = [
"wd:Q33506", # museum
"wd:Q7075", # library
"wd:Q166118", # archive
"wd:Q1007870", # art gallery
"wd:Q28564", # public library
"wd:Q207694", # art museum
"wd:Q17431399", # natural history museum
"wd:Q856584", # research library
"wd:Q15243209", # historical archive
"wd:Q2668072", # cantonal/state library
"wd:Q3329412", # cantonal/state archive
"wd:Q928830", # metro station (sometimes misclassified)
"wd:Q11315", # building (general)
"wd:Q3152824", # cultural institution
"wd:Q210272", # cultural property
"wd:Q18918145", # museum building
"wd:Q1030034", # special library
"wd:Q1970365", # community archive
"wd:Q2151232", # documentation center
]
# Institution type mapping from GHCID filename patterns
# Pattern: XX-R-CCC-T-... where T is the single-letter type code
INSTITUTION_TYPE_MAP = {
'A': 'archive', # Archive
'L': 'library', # Library
'M': 'museum', # Museum
'G': 'gallery', # Gallery
'H': 'heritage', # Holy sites / Heritage
'O': 'official', # Official institution
'R': 'research', # Research center
'C': 'corporate', # Corporation
'U': 'unknown', # Unknown
'B': 'botanical', # Botanical garden / Zoo
'E': 'education', # Education provider
'S': 'society', # Collecting society
'F': 'feature', # Physical feature
'I': 'intangible',# Intangible heritage
'X': 'mixed', # Mixed types
'P': 'personal', # Personal collection
'D': 'digital', # Digital platform
'N': 'ngo', # NGO
'T': 'taste', # Taste/smell heritage
}
# Keywords to detect institution type from Wikidata type labels
# Maps our institution type to keywords that indicate compatibility
TYPE_KEYWORDS = {
'archive': ['archiv', 'archive', 'records', 'akten', 'stadtarchiv', 'landesarchiv',
'staatsarchiv', 'kreisarchiv', 'gemeindearchiv', 'bezirksarchiv'],
'library': ['bibliothek', 'library', 'bücherei', 'mediathek', 'stadtbibliothek',
'landesbibliothek', 'universitätsbibliothek', 'bibliothèque', 'biblioteca'],
'museum': ['museum', 'musée', 'museo', 'galerie', 'gallery', 'ausstellung',
'sammlung', 'collection', 'kunsthalle'],
'gallery': ['galerie', 'gallery', 'kunsthalle', 'art museum', 'kunstmuseum'],
'heritage': ['heritage', 'cultural', 'denkmal', 'monument', 'kirche', 'church',
'cathedral', 'temple', 'shrine'],
'research': ['research', 'forschung', 'institut', 'institute', 'zentrum', 'center',
'documentation', 'dokumentation'],
'education': ['universität', 'university', 'hochschule', 'college', 'school',
'akademie', 'academy'],
}
# Default settings
DEFAULT_RADIUS_KM = 2.0
DEFAULT_THRESHOLD = 80.0
def normalize_name(name: str) -> str:
    """Lowercase an institution name and strip noise for fuzzy matching.

    Removes one leading article (de/fr/nl/it/en), one trailing legal-form
    suffix (GmbH, e.V., Stiftung, ...), and collapses whitespace runs.
    Empty/None input yields "".
    """
    if not name:
        return ""
    result = name.lower()
    # Drop a leading article, then a trailing legal-form suffix.
    result = re.sub(r'^(die|das|der|the|het|de|le|la|les|il|lo|la)\s+',
                    '', result, flags=re.IGNORECASE)
    result = re.sub(r'\s+(gmbh|ag|e\.v\.|ev|vzw|asbl|stiftung|foundation|verein)$',
                    '', result, flags=re.IGNORECASE)
    # Collapse internal whitespace to single spaces.
    return ' '.join(result.split()).strip()
def extract_name_variants(name: str) -> List[str]:
    """Return the name plus meaningful sub-parts split on '|' and ','.

    A part is kept only when it is longer than 15 characters or spans
    multiple words (short single-word parts are too generic to match on).
    Pipe-separated names additionally yield the space-joined combination,
    e.g. "University | Library" -> "University Library".
    """
    def _substantial(part: str) -> bool:
        # Long enough, or multi-word, to be a meaningful match target.
        return len(part) > 15 or len(part.split()) > 1

    variants = [name]
    if '|' in name:
        pipe_parts = [piece.strip() for piece in name.split('|')]
        variants.extend(piece for piece in pipe_parts if _substantial(piece))
        if len(pipe_parts) >= 2:
            # "University Library" instead of "University | Library"
            variants.append(' '.join(pipe_parts))
    if ',' in name:
        comma_parts = [piece.strip() for piece in name.split(',')]
        variants.extend(piece for piece in comma_parts if _substantial(piece))
    return variants
def is_generic_match(name1: str, name2: str, score: float) -> bool:
    """Flag suspicious matches where either side is a bare generic term.

    A near-perfect fuzzy score (> 95) against a lone word like
    'Bibliothek' or 'museum' carries no real evidence, so such pairs
    are reported as generic and penalized by the caller.
    """
    GENERIC_TERMS = frozenset({
        'bibliothek', 'library', 'archiv', 'archive', 'museum',
        'gallery', 'galerie', 'stadtbibliothek', 'stadtarchiv',
        'universitätsbibliothek', 'landesbibliothek', 'landesarchiv',
    })
    if score <= 95:
        return False
    # Either side being exactly a generic term makes the match suspect.
    return (name1.lower().strip() in GENERIC_TERMS
            or name2.lower().strip() in GENERIC_TERMS)
def get_institution_type_from_filename(filename: str) -> Optional[str]:
    """Derive the institution type from a GHCID-style filename.

    Filenames follow XX-RR-CCC-T-ABBREV.yaml where T is a one-letter
    type code after the third hyphen, e.g. AT-6-LEO-A-MLUA.yaml -> 'archive'.

    Args:
        filename: Bare filename (no directory component).

    Returns:
        A type string from INSTITUTION_TYPE_MAP, or None when the name
        does not match the pattern or the code letter is unknown.
    """
    # re.match anchors at the start, equivalent to re.search with '^'.
    m = re.match(r'[A-Z]{2}-[A-Z0-9]+-[A-Z]{3}-([A-Z])-', filename)
    return INSTITUTION_TYPE_MAP.get(m.group(1)) if m else None
def is_combined_institution(custodian_name: str) -> set:
    """Detect names that signal a multi-type institution.

    A name like "Universitätsbibliothek und Archiv" legitimately denotes
    both a library AND an archive; such institutions should be allowed to
    match either Wikidata type.

    Args:
        custodian_name: Institution name (may be empty or None).

    Returns:
        Set of combinable type strings whose keywords occur in the name,
        e.g. {'library', 'archive'}; empty set for empty input.
    """
    if not custodian_name:
        return set()
    lowered = custodian_name.lower()
    # Only these core categories plausibly combine in one institution.
    return {
        inst_type
        for inst_type in ('archive', 'library', 'museum', 'gallery', 'research')
        if any(keyword in lowered for keyword in TYPE_KEYWORDS.get(inst_type, []))
    }
def check_type_compatibility(custodian_type: Optional[str], wikidata_type_label: str,
                             custodian_name: Optional[str] = None) -> Tuple[bool, float]:
    """
    Check if institution types are compatible.

    Args:
        custodian_type: Type extracted from filename (e.g., 'archive', 'library')
        wikidata_type_label: Type label from Wikidata (e.g., 'public library')
        custodian_name: Optional custodian name to check for combined institutions

    Returns:
        Tuple of (is_compatible, penalty_factor):
        - is_compatible: True if types match or are unknown
        - penalty_factor: multiplier later applied to the fuzzy score
          (1.0 exact type match, 0.95/0.9 related type, 0.92 combined
          institution, 0.85 unknown on either side, 0.75 undecided,
          0.0 clear mismatch). Checks run in priority order below.
    """
    # If we don't know the custodian type, allow the match with slight penalty
    if not custodian_type or custodian_type in ('unknown', 'mixed'):
        return (True, 0.85)
    # If no Wikidata type label, allow with penalty
    if not wikidata_type_label:
        return (True, 0.85)
    wd_lower = wikidata_type_label.lower()
    # Get keywords for this institution type
    keywords = TYPE_KEYWORDS.get(custodian_type, [])
    # Check if any keyword matches
    for keyword in keywords:
        if keyword in wd_lower:
            return (True, 1.0)  # Perfect type match
    # Check for cross-type compatibility (some types are related)
    # Museum and gallery are often interchangeable
    if custodian_type == 'museum' and any(k in wd_lower for k in TYPE_KEYWORDS.get('gallery', [])):
        return (True, 0.95)
    if custodian_type == 'gallery' and any(k in wd_lower for k in TYPE_KEYWORDS.get('museum', [])):
        return (True, 0.95)
    # Research centers can also be museums/libraries/archives
    if custodian_type == 'research':
        for related_type in ['museum', 'library', 'archive']:
            if any(k in wd_lower for k in TYPE_KEYWORDS.get(related_type, [])):
                return (True, 0.9)
    # Check for combined institutions (e.g., "Bibliothek und Archiv").
    # If the custodian name indicates multiple types, allow cross-type matches.
    if custodian_name:
        combined_types = is_combined_institution(custodian_name)
        if len(combined_types) > 1:
            # Combined institution: accept when the Wikidata type matches
            # ANY of the combined types.
            for combined_type in combined_types:
                combined_keywords = TYPE_KEYWORDS.get(combined_type, [])
                for keyword in combined_keywords:
                    if keyword in wd_lower:
                        # Wikidata matches one of the combined types - allow with small penalty
                        return (True, 0.92)
    # If we have keywords defined but none matched, it's a mismatch
    if keywords:
        # Check if Wikidata type matches a DIFFERENT institution category
        for other_type, other_keywords in TYPE_KEYWORDS.items():
            if other_type != custodian_type:
                for keyword in other_keywords:
                    if keyword in wd_lower:
                        # Clear mismatch - e.g., custodian is archive but Wikidata says library
                        return (False, 0.0)
    # No clear match or mismatch - allow with penalty
    return (True, 0.75)
def query_nearby_institutions(lat: float, lon: float, country_qid: str,
                              languages: str, radius_km: float = 2.0) -> List[Dict]:
    """
    Query Wikidata for heritage institutions near given coordinates.

    Args:
        lat: Latitude (decimal degrees)
        lon: Longitude (decimal degrees)
        country_qid: Wikidata Q-number for country (e.g., "Q40" for Austria)
        languages: Comma-separated language codes for the label service
        radius_km: Search radius in kilometers

    Returns:
        List of dicts with: qid, label, description, type, distance_km,
        isil, viaf, website. Empty list when the HTTP request fails.
    """
    types_str = " ".join(HERITAGE_TYPES)
    # SPARQL: wikibase:around does the geo filter; labels resolved via the
    # label service in the caller's preferred languages.
    query = f"""
    SELECT DISTINCT ?item ?itemLabel ?itemDescription ?typeLabel ?coord ?isil ?viaf ?website
    WHERE {{
    # Geographic filter - institutions near coordinates
    SERVICE wikibase:around {{
    ?item wdt:P625 ?coord .
    bd:serviceParam wikibase:center "Point({lon} {lat})"^^geo:wktLiteral .
    bd:serviceParam wikibase:radius "{radius_km}" .
    }}
    # Filter to heritage institution types
    ?item wdt:P31 ?type .
    VALUES ?type {{ {types_str} }}
    # In the target country
    ?item wdt:P17 wd:{country_qid} .
    # Optional: ISIL code
    OPTIONAL {{ ?item wdt:P791 ?isil }}
    # Optional: VIAF ID
    OPTIONAL {{ ?item wdt:P214 ?viaf }}
    # Optional: official website
    OPTIONAL {{ ?item wdt:P856 ?website }}
    SERVICE wikibase:label {{
    bd:serviceParam wikibase:language "{languages}"
    }}
    }}
    LIMIT 50
    """
    headers = {
        'User-Agent': 'GLAM-Data-Extraction/0.2.1 (heritage institution location matching)',
        'Accept': 'application/sparql-results+json'
    }
    try:
        response = requests.get(
            WIKIDATA_SPARQL,
            params={'query': query, 'format': 'json'},
            headers=headers,
            timeout=30
        )
        response.raise_for_status()
        data = response.json()
        results = []
        seen_qids = set()
        for binding in data.get('results', {}).get('bindings', []):
            # Entity value is a URI like http://www.wikidata.org/entity/Q42.
            qid = binding.get('item', {}).get('value', '').split('/')[-1]
            # Skip duplicates (one row per P31 value otherwise)
            if qid in seen_qids:
                continue
            seen_qids.add(qid)
            label = binding.get('itemLabel', {}).get('value', '')
            description = binding.get('itemDescription', {}).get('value', '')
            type_label = binding.get('typeLabel', {}).get('value', '')
            # Skip if label is just the Q-number (no label in requested languages)
            if label.startswith('Q') and label[1:].isdigit():
                continue
            result = {
                'qid': qid,
                'label': label,
                'description': description,
                'type': type_label,
                'isil': binding.get('isil', {}).get('value'),
                'viaf': binding.get('viaf', {}).get('value'),
                'website': binding.get('website', {}).get('value'),
            }
            # Parse coordinates and calculate approximate distance
            coord_str = binding.get('coord', {}).get('value', '')
            if coord_str:
                # WKT format: Point(lon lat)
                match = re.search(r'Point\(([0-9.-]+)\s+([0-9.-]+)\)', coord_str)
                if match:
                    wd_lon, wd_lat = float(match.group(1)), float(match.group(2))
                    # Simple distance approximation (Euclidean in degrees x 111,
                    # not great circle) - adequate at km-scale radii.
                    result['distance_km'] = ((lat - wd_lat)**2 + (lon - wd_lon)**2)**0.5 * 111
            results.append(result)
        return results
    except requests.exceptions.RequestException as e:
        print(f" Warning: Wikidata query failed: {e}")
        return []
def find_best_match(custodian_name: str, candidates: List[Dict],
                    threshold: float = 80.0,
                    custodian_type: Optional[str] = None,
                    verbose: bool = False) -> Optional[Tuple[Dict, float]]:
    """Pick the best-scoring Wikidata candidate for a custodian name.

    Candidates failing the type-compatibility check are discarded before
    any name comparison. Remaining candidates are scored across all name
    variant pairs with several fuzzy strategies (partial/set ratios are
    discounted), penalized for generic-term matches and type distance.

    Args:
        custodian_name: Name from the custodian file.
        candidates: Nearby Wikidata institutions.
        threshold: Minimum final score (0-100) to accept.
        custodian_type: Type derived from the filename, if any.
        verbose: Print rejected type mismatches.

    Returns:
        (candidate, score) for the winner, or None below threshold.
    """
    if not candidates:
        return None
    custodian_variants = extract_name_variants(custodian_name)
    winner: Optional[Dict] = None
    winner_score = 0.0
    winner_pair = ("", "")
    type_rejections = []  # (label, type) pairs dropped by the type gate
    for candidate in candidates:
        wd_label = candidate.get('label', '')
        wd_type = candidate.get('type', '')
        # Type gate runs first; the custodian name enables combined-institution
        # detection (e.g. "Bibliothek und Archiv").
        compatible, penalty = check_type_compatibility(
            custodian_type, wd_type, custodian_name)
        if not compatible:
            type_rejections.append((wd_label, wd_type))
            continue
        wd_variants = extract_name_variants(wd_label)
        for cust_variant in custodian_variants:
            cust_norm = normalize_name(cust_variant)
            if len(cust_norm) < 5:  # too short to be meaningful
                continue
            for wd_variant in wd_variants:
                wd_norm = normalize_name(wd_variant)
                if len(wd_norm) < 5:  # too short to be meaningful
                    continue
                # Best of several strategies; partial/set variants discounted.
                score = max(
                    fuzz.ratio(cust_norm, wd_norm),
                    fuzz.partial_ratio(cust_norm, wd_norm) * 0.9,
                    fuzz.token_sort_ratio(cust_norm, wd_norm),
                    fuzz.token_set_ratio(cust_norm, wd_norm) * 0.85,
                )
                if is_generic_match(cust_variant, wd_variant, score):
                    score *= 0.5  # heavy penalty for bare generic terms
                score *= penalty
                if score > winner_score:
                    winner_score = score
                    winner = candidate
                    winner_pair = (cust_variant, wd_variant)
    if verbose and type_rejections:
        print(f" Type mismatches rejected (custodian type: {custodian_type}):")
        for wd_label, wd_type in type_rejections[:3]:
            print(f" - {wd_label} ({wd_type})")
    if winner and winner_score >= threshold:
        return (winner, winner_score)
    return None
def load_custodian_file(filepath: Path) -> Optional[Dict]:
    """Read and parse one custodian YAML file; None on any failure."""
    try:
        with filepath.open('r', encoding='utf-8') as handle:
            return yaml.safe_load(handle)
    except Exception as exc:
        # Best-effort loader: report and let the caller skip this file.
        print(f" Error loading {filepath}: {exc}")
        return None
def save_custodian_file(filepath: Path, data: Dict) -> bool:
    """Serialize a custodian record back to YAML; True on success."""
    try:
        with filepath.open('w', encoding='utf-8') as handle:
            # Preserve key order and non-ASCII characters as-is.
            yaml.dump(data, handle, allow_unicode=True,
                      default_flow_style=False, sort_keys=False)
        return True
    except Exception as exc:
        print(f" Error saving {filepath}: {exc}")
        return False
def get_custodian_name(data: Dict) -> Optional[str]:
    """Extract the institution name from custodian data.

    Looks in custodian_name.claim_value first, then original_entry.name.

    Args:
        data: Parsed custodian YAML mapping.

    Returns:
        The name string, or None when neither location holds one.
    """
    # isinstance guards: real YAML files may carry null or a plain string
    # here, which would make a bare `'claim_value' in value` raise TypeError
    # (None) or do accidental substring matching (str).
    claim = data.get('custodian_name')
    if isinstance(claim, dict) and 'claim_value' in claim:
        return claim['claim_value']
    entry = data.get('original_entry')
    if isinstance(entry, dict) and 'name' in entry:
        return entry['name']
    return None
def get_coordinates(data: Dict) -> Optional[Tuple[float, float]]:
    """Extract (latitude, longitude) from custodian data.

    Tries the top-level 'location' block first, then
    'ghcid.location_resolution'. Values are coerced to float; a block
    whose values fail conversion is skipped in favor of the next one.

    Args:
        data: Parsed custodian YAML mapping.

    Returns:
        (lat, lon) tuple of floats, or None when no parseable pair exists.
    """
    ghcid = data.get('ghcid')
    candidate_blocks = (
        data.get('location'),
        ghcid.get('location_resolution') if isinstance(ghcid, dict) else None,
    )
    for loc in candidate_blocks:
        # isinstance guard: a null/string block would make `'latitude' in loc`
        # raise TypeError instead of falling through to the next source.
        if isinstance(loc, dict) and 'latitude' in loc and 'longitude' in loc:
            try:
                return (float(loc['latitude']), float(loc['longitude']))
            except (ValueError, TypeError):
                continue
    return None
def add_wikidata_enrichment(data: Dict, match: Dict, score: float) -> Dict:
    """Attach a 'wikidata_enrichment' block and a provenance note to data.

    Args:
        data: Custodian record to mutate (also returned).
        match: Matched Wikidata candidate (qid, label, optional
            description/website/type/isil/viaf/distance_km).
        score: Fuzzy-match score on the 0-100 scale.

    Returns:
        The same dict, mutated in place.
    """
    qid = match['qid']
    enrichment = {
        'wikidata_id': qid,
        # http:// is the canonical Wikidata entity URI scheme.
        'wikidata_url': f"http://www.wikidata.org/entity/{qid}",
        'matched_by': 'location_name_match',
        'match_score': round(score / 100.0, 3),  # stored on 0-1 scale
        'matched_name': match.get('label', ''),
        'enrichment_date': datetime.now(timezone.utc).isoformat(),
        'enrichment_version': '2.2.0_location',
        'wikidata_label': match.get('label', ''),
    }
    # Optional fields: skip empty strings / None.
    if match.get('description'):
        enrichment['wikidata_description'] = match['description']
    if match.get('website'):
        enrichment['official_website'] = match['website']
    if match.get('type'):
        enrichment['instance_of_label'] = match['type']
    if match.get('isil'):
        enrichment['isil_from_wikidata'] = match['isil']
    if match.get('viaf'):
        enrichment['viaf_from_wikidata'] = match['viaf']
    # Bug fix: the old truthiness test dropped a legitimate 0.0 km
    # (exact-location) distance; only None means "no distance".
    if match.get('distance_km') is not None:
        enrichment['distance_km'] = round(match['distance_km'], 2)
    data['wikidata_enrichment'] = enrichment
    # Append an audit note, normalizing provenance.notes to a list first
    # (it may be absent, None, or a single string in older files).
    provenance = data.setdefault('provenance', {})
    notes = provenance.get('notes')
    if notes is None:
        notes = provenance['notes'] = []
    elif isinstance(notes, str):
        notes = provenance['notes'] = [notes]
    note = f"Wikidata enrichment via location+name match {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}: {qid} ({match.get('label', '')}) - score: {score:.1f}%"
    notes.append(note)
    return data
def find_candidates(country: str, data_dir: Path) -> List[Path]:
    """List custodian files for `country` that are ready for enrichment.

    A file qualifies when it parses, has no wikidata_enrichment block yet,
    and carries both coordinates and a custodian name.
    """
    eligible: List[Path] = []
    for path in data_dir.glob(f"{country}-*.yaml"):
        record = load_custodian_file(path)
        if not record:
            continue  # unreadable or empty file
        if 'wikidata_enrichment' in record:
            continue  # already enriched
        if not get_coordinates(record):
            continue  # no location to search around
        if not get_custodian_name(record):
            continue  # nothing to fuzzy-match against
        eligible.append(path)
    return eligible
def main():
    """CLI entry point: find candidate files, query Wikidata around each
    file's coordinates, fuzzy-match names, and (unless --dry-run) write
    the enrichment back to disk."""
    parser = argparse.ArgumentParser(
        description='Enrich custodian files using location-based Wikidata matching'
    )
    # NOTE(review): the help text lists only a subset of codes; choices
    # actually come from COUNTRY_CONFIG.
    parser.add_argument('--country', required=True,
                        choices=list(COUNTRY_CONFIG.keys()),
                        help='Country code (AT, BE, CH, NL, DE)')
    parser.add_argument('--limit', type=int, default=0,
                        help='Limit number of files to process (0 = all)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show matches without saving')
    parser.add_argument('--radius', type=float, default=DEFAULT_RADIUS_KM,
                        help=f'Search radius in km (default: {DEFAULT_RADIUS_KM})')
    parser.add_argument('--threshold', type=float, default=DEFAULT_THRESHOLD,
                        help=f'Name similarity threshold 0-100 (default: {DEFAULT_THRESHOLD})')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Show detailed matching info')
    args = parser.parse_args()
    country = args.country
    config = COUNTRY_CONFIG[country]
    # Banner with the effective settings for this run.
    print(f"\n{'='*60}")
    print(f"Location-Based Wikidata Enrichment for {config['name']} ({country})")
    print(f"{'='*60}")
    print(f" Wikidata: {config['qid']}")
    print(f" Languages: {config['languages']}")
    print(f" Search radius: {args.radius} km")
    print(f" Name threshold: {args.threshold}%")
    print(f" Dry run: {args.dry_run}")
    print()
    data_dir = Path('data/custodian')
    # Find candidates (coords + name present, no enrichment yet).
    print("Finding candidate files...")
    candidates = find_candidates(country, data_dir)
    print(f" Found {len(candidates)} files with coordinates but no Wikidata enrichment")
    if args.limit > 0:
        candidates = candidates[:args.limit]
        print(f" Limited to {len(candidates)} files")
    if not candidates:
        print("No candidates to process.")
        return
    print()
    # Process each candidate
    enriched_count = 0
    no_match_count = 0
    error_count = 0
    for i, filepath in enumerate(candidates, 1):
        print(f"[{i}/{len(candidates)}] {filepath.name}")
        # Re-load: find_candidates already parsed the file, but we need the
        # data again here for mutation.
        data = load_custodian_file(filepath)
        if not data:
            error_count += 1
            continue
        name = get_custodian_name(data)
        coords = get_coordinates(data)
        if not name or not coords:
            error_count += 1
            continue
        lat, lon = coords
        # Extract institution type from filename (GHCID type letter)
        institution_type = get_institution_type_from_filename(filepath.name)
        if args.verbose:
            print(f" Name: {name}")
            print(f" Coords: {lat}, {lon}")
            print(f" Type: {institution_type or 'unknown'}")
        # Query Wikidata for nearby institutions
        nearby = query_nearby_institutions(
            lat, lon,
            config['qid'],
            config['languages'],
            args.radius
        )
        if args.verbose:
            print(f" Found {len(nearby)} nearby institutions")
            for n in nearby[:5]:
                # NOTE(review): if distance_km is absent, the '?' fallback
                # hits the :.2f format spec and would raise — confirm every
                # result row carries a distance.
                print(f" - {n['qid']}: {n['label']} ({n.get('type', '?')}) [{n.get('distance_km', '?'):.2f} km]")
        if not nearby:
            no_match_count += 1
            print(f" -> No nearby heritage institutions found")
            # Rate limit before the next Wikidata query
            time.sleep(0.5)
            continue
        # Find best match with type checking
        result = find_best_match(
            name,
            nearby,
            args.threshold,
            custodian_type=institution_type,
            verbose=args.verbose
        )
        if not result:
            no_match_count += 1
            print(f" -> No name match above {args.threshold}% threshold")
            if args.verbose and nearby:
                # Show what we did find (first candidate, token_set score only)
                best_candidate = nearby[0]
                normalized_name = normalize_name(name)
                normalized_wd = normalize_name(best_candidate['label'])
                score = fuzz.token_set_ratio(normalized_name, normalized_wd)
                print(f" Best candidate: {best_candidate['label']} ({best_candidate.get('type', '?')}) - {score}%")
            # Rate limit
            time.sleep(0.5)
            continue
        match, score = result
        print(f" -> MATCH: {match['qid']} - {match['label']} ({match.get('type', '?')}) [score: {score:.1f}%]")
        if not args.dry_run:
            data = add_wikidata_enrichment(data, match, score)
            if save_custodian_file(filepath, data):
                enriched_count += 1
            else:
                error_count += 1
        else:
            # Dry run still counts the would-be enrichment.
            enriched_count += 1
        # Rate limit to avoid overloading Wikidata
        time.sleep(0.5)
    # Summary
    print()
    print(f"{'='*60}")
    print("Summary")
    print(f"{'='*60}")
    print(f" Processed: {len(candidates)}")
    print(f" Enriched: {enriched_count}")
    print(f" No match: {no_match_count}")
    print(f" Errors: {error_count}")
    if args.dry_run:
        print("\n (Dry run - no files were modified)")
    # Show updated per-country stats (re-scans all files on disk).
    if not args.dry_run and enriched_count > 0:
        print()
        total = len(list(data_dir.glob(f"{country}-*.yaml")))
        enriched = 0
        for f in data_dir.glob(f"{country}-*.yaml"):
            d = load_custodian_file(f)
            if d and 'wikidata_enrichment' in d:
                enriched += 1
        print(f" {country} enrichment: {enriched}/{total} ({100*enriched/total:.1f}%)")

if __name__ == '__main__':
    main()