#!/usr/bin/env python3 """ Enrich custodian files with Wikidata Q-numbers using location-based matching. Uses coordinates from custodian files to find nearby Wikidata heritage institutions, then applies fuzzy name matching for verification. Process: 1. Find custodian files with coordinates but no wikidata_enrichment 2. For each file, query Wikidata for heritage institutions within radius 3. Fuzzy match by name (higher threshold since we have location proximity) 4. Add Wikidata identifiers to matched files Usage: python scripts/enrich_by_location.py --country AT [--limit N] [--dry-run] [--radius 2.0] [--threshold 80] """ import yaml import requests import argparse import sys import time import re from pathlib import Path from typing import Dict, List, Optional, Tuple from datetime import datetime, timezone from rapidfuzz import fuzz # Wikidata SPARQL endpoint WIKIDATA_SPARQL = "https://query.wikidata.org/sparql" # Country configurations COUNTRY_CONFIG = { "AT": { "qid": "Q40", "name": "Austria", "languages": "de,en", }, "BE": { "qid": "Q31", "name": "Belgium", "languages": "nl,fr,de,en", }, "BG": { "qid": "Q219", "name": "Bulgaria", "languages": "bg,en", }, "BR": { "qid": "Q155", "name": "Brazil", "languages": "pt,en", }, "BY": { "qid": "Q184", "name": "Belarus", "languages": "be,ru,en", }, "CH": { "qid": "Q39", "name": "Switzerland", "languages": "de,fr,it,rm,en", }, "CZ": { "qid": "Q213", "name": "Czech Republic", "languages": "cs,en", }, "DE": { "qid": "Q183", "name": "Germany", "languages": "de,en", }, "EG": { "qid": "Q79", "name": "Egypt", "languages": "ar,en", }, "FR": { "qid": "Q142", "name": "France", "languages": "fr,en", }, "GB": { "qid": "Q145", "name": "United Kingdom", "languages": "en", }, "IT": { "qid": "Q38", "name": "Italy", "languages": "it,en", }, "JP": { "qid": "Q17", "name": "Japan", "languages": "ja,en", }, "MX": { "qid": "Q96", "name": "Mexico", "languages": "es,en", }, "NL": { "qid": "Q55", "name": "Netherlands", "languages": "nl,en", }, "PL": { "qid": "Q36", "name": "Poland", "languages": "pl,en", }, "AR": { "qid": "Q414", "name": "Argentina", "languages": "es,en", }, } # Heritage institution types to search HERITAGE_TYPES = [ "wd:Q33506", # museum "wd:Q7075", # library "wd:Q166118", # archive "wd:Q1007870", # art gallery "wd:Q28564", # public library "wd:Q207694", # art museum "wd:Q17431399", # natural history museum "wd:Q856584", # research library "wd:Q15243209", # historical archive "wd:Q2668072", # cantonal/state library "wd:Q3329412", # cantonal/state archive "wd:Q928830", # metro station (sometimes misclassified) "wd:Q11315", # building (general) "wd:Q3152824", # cultural institution "wd:Q210272", # cultural property "wd:Q18918145", # museum building "wd:Q1030034", # special library "wd:Q1970365", # community archive "wd:Q2151232", # documentation center ] # Institution type mapping from GHCID filename patterns # Pattern: XX-R-CCC-T-... where T is the single-letter type code INSTITUTION_TYPE_MAP = { 'A': 'archive', # Archive 'L': 'library', # Library 'M': 'museum', # Museum 'G': 'gallery', # Gallery 'H': 'heritage', # Holy sites / Heritage 'O': 'official', # Official institution 'R': 'research', # Research center 'C': 'corporate', # Corporation 'U': 'unknown', # Unknown 'B': 'botanical', # Botanical garden / Zoo 'E': 'education', # Education provider 'S': 'society', # Collecting society 'F': 'feature', # Physical feature 'I': 'intangible',# Intangible heritage 'X': 'mixed', # Mixed types 'P': 'personal', # Personal collection 'D': 'digital', # Digital platform 'N': 'ngo', # NGO 'T': 'taste', # Taste/smell heritage } # Keywords to detect institution type from Wikidata type labels # Maps our institution type to keywords that indicate compatibility TYPE_KEYWORDS = { 'archive': ['archiv', 'archive', 'records', 'akten', 'stadtarchiv', 'landesarchiv', 'staatsarchiv', 'kreisarchiv', 'gemeindearchiv', 'bezirksarchiv'], 'library': ['bibliothek', 'library', 'bücherei', 'mediathek', 'stadtbibliothek', 'landesbibliothek', 'universitätsbibliothek', 'bibliothèque', 'biblioteca'], 'museum': ['museum', 'musée', 'museo', 'galerie', 'gallery', 'ausstellung', 'sammlung', 'collection', 'kunsthalle'], 'gallery': ['galerie', 'gallery', 'kunsthalle', 'art museum', 'kunstmuseum'], 'heritage': ['heritage', 'cultural', 'denkmal', 'monument', 'kirche', 'church', 'cathedral', 'temple', 'shrine'], 'research': ['research', 'forschung', 'institut', 'institute', 'zentrum', 'center', 'documentation', 'dokumentation'], 'education': ['universität', 'university', 'hochschule', 'college', 'school', 'akademie', 'academy'], } # Default settings DEFAULT_RADIUS_KM = 2.0 DEFAULT_THRESHOLD = 80.0 def normalize_name(name: str) -> str: """Normalize institution name for matching.""" if not name: return "" # Lowercase name = name.lower() # Remove common prefixes/suffixes remove_patterns = [ r'^(die|das|der|the|het|de|le|la|les|il|lo|la)\s+', r'\s+(gmbh|ag|e\.v\.|ev|vzw|asbl|stiftung|foundation|verein)$', ] for pattern in remove_patterns: name = re.sub(pattern, '', name, flags=re.IGNORECASE) # Normalize whitespace name = ' '.join(name.split()) return name.strip() def extract_name_variants(name: str) -> List[str]: """Extract multiple name variants for matching. Handles pipe-separated names like "University | Library" Returns all meaningful variants (excludes too-short/generic parts). """ variants = [name] # If name contains pipe separator, add parts if '|' in name: parts = [p.strip() for p in name.split('|')] # Only add parts that are meaningful (> 15 chars or multi-word) for part in parts: if len(part) > 15 or len(part.split()) > 1: variants.append(part) # Also try combinations if len(parts) >= 2: # "University Library" instead of "University | Library" variants.append(' '.join(parts)) # If name contains comma, add parts if ',' in name: parts = [p.strip() for p in name.split(',')] # Only add meaningful parts for part in parts: if len(part) > 15 or len(part.split()) > 1: variants.append(part) return variants def is_generic_match(name1: str, name2: str, score: float) -> bool: """Check if a match is too generic (e.g., matching on just 'Bibliothek').""" # Generic terms that shouldn't count as full matches generic_terms = { 'bibliothek', 'library', 'archiv', 'archive', 'museum', 'gallery', 'galerie', 'stadtbibliothek', 'stadtarchiv', 'universitätsbibliothek', 'landesbibliothek', 'landesarchiv' } n1_lower = name1.lower().strip() n2_lower = name2.lower().strip() # If score is very high but one name is just a generic term, it's suspicious if score > 95: if n1_lower in generic_terms or n2_lower in generic_terms: return True return False def get_institution_type_from_filename(filename: str) -> Optional[str]: """ Extract institution type from GHCID filename pattern. Pattern: XX-RR-CCC-T-ABBREV.yaml where T is the single-letter type code. Example: AT-6-LEO-A-MLUA.yaml -> 'archive' (A = Archive) Args: filename: The filename (not full path) Returns: Institution type string (e.g., 'archive', 'library', 'museum') or None """ # Pattern matches: country-region-city-TYPE-abbreviation # The TYPE is a single letter after the third hyphen match = re.search(r'^[A-Z]{2}-[A-Z0-9]+-[A-Z]{3}-([A-Z])-', filename) if match: type_code = match.group(1) return INSTITUTION_TYPE_MAP.get(type_code) return None def is_combined_institution(custodian_name: str) -> set: """ Check if institution name suggests multiple types (library+archive, etc.). Handles combined institutions like "Universitätsbibliothek und Archiv" which are legitimately both a library AND an archive. Args: custodian_name: The institution name to check Returns: Set of institution types found in the name (e.g., {'library', 'archive'}) """ if not custodian_name: return set() name_lower = custodian_name.lower() types_found = set() # Only check the core types that can be combined combinable_types = ['archive', 'library', 'museum', 'gallery', 'research'] for inst_type in combinable_types: keywords = TYPE_KEYWORDS.get(inst_type, []) for keyword in keywords: if keyword in name_lower: types_found.add(inst_type) break # Found this type, move to next type return types_found def check_type_compatibility(custodian_type: Optional[str], wikidata_type_label: str, custodian_name: Optional[str] = None) -> Tuple[bool, float]: """ Check if institution types are compatible. Args: custodian_type: Type extracted from filename (e.g., 'archive', 'library') wikidata_type_label: Type label from Wikidata (e.g., 'public library') custodian_name: Optional custodian name to check for combined institutions Returns: Tuple of (is_compatible, penalty_factor) - is_compatible: True if types match or are unknown - penalty_factor: 1.0 for match, 0.7 for unknown, 0.3 for mismatch """ # If we don't know the custodian type, allow the match with slight penalty if not custodian_type or custodian_type in ('unknown', 'mixed'): return (True, 0.85) # If no Wikidata type label, allow with penalty if not wikidata_type_label: return (True, 0.85) wd_lower = wikidata_type_label.lower() # Get keywords for this institution type keywords = TYPE_KEYWORDS.get(custodian_type, []) # Check if any keyword matches for keyword in keywords: if keyword in wd_lower: return (True, 1.0) # Perfect type match # Check for cross-type compatibility (some types are related) # Museum and gallery are often interchangeable if custodian_type == 'museum' and any(k in wd_lower for k in TYPE_KEYWORDS.get('gallery', [])): return (True, 0.95) if custodian_type == 'gallery' and any(k in wd_lower for k in TYPE_KEYWORDS.get('museum', [])): return (True, 0.95) # Research centers can also be museums/libraries/archives if custodian_type == 'research': for related_type in ['museum', 'library', 'archive']: if any(k in wd_lower for k in TYPE_KEYWORDS.get(related_type, [])): return (True, 0.9) # Check for combined institutions (e.g., "Bibliothek und Archiv") # If the custodian name indicates multiple types, allow cross-type matches if custodian_name: combined_types = is_combined_institution(custodian_name) if len(combined_types) > 1: # This is a combined institution - check if Wikidata type matches ANY of the combined types for combined_type in combined_types: combined_keywords = TYPE_KEYWORDS.get(combined_type, []) for keyword in combined_keywords: if keyword in wd_lower: # Wikidata matches one of the combined types - allow with small penalty return (True, 0.92) # If we have keywords defined but none matched, it's a mismatch if keywords: # Check if Wikidata type matches a DIFFERENT institution category for other_type, other_keywords in TYPE_KEYWORDS.items(): if other_type != custodian_type: for keyword in other_keywords: if keyword in wd_lower: # Clear mismatch - e.g., custodian is archive but Wikidata says library return (False, 0.0) # No clear match or mismatch - allow with penalty return (True, 0.75) def query_nearby_institutions(lat: float, lon: float, country_qid: str, languages: str, radius_km: float = 2.0) -> List[Dict]: """ Query Wikidata for heritage institutions near given coordinates. Args: lat: Latitude lon: Longitude country_qid: Wikidata Q-number for country (e.g., "Q40" for Austria) languages: Language codes for labels radius_km: Search radius in kilometers Returns: List of dicts with: qid, label, description, type, distance_km, isil, viaf, website """ types_str = " ".join(HERITAGE_TYPES) query = f""" SELECT DISTINCT ?item ?itemLabel ?itemDescription ?typeLabel ?coord ?isil ?viaf ?website WHERE {{ # Geographic filter - institutions near coordinates SERVICE wikibase:around {{ ?item wdt:P625 ?coord . bd:serviceParam wikibase:center "Point({lon} {lat})"^^geo:wktLiteral . bd:serviceParam wikibase:radius "{radius_km}" . }} # Filter to heritage institution types ?item wdt:P31 ?type . VALUES ?type {{ {types_str} }} # In the target country ?item wdt:P17 wd:{country_qid} . # Optional: ISIL code OPTIONAL {{ ?item wdt:P791 ?isil }} # Optional: VIAF ID OPTIONAL {{ ?item wdt:P214 ?viaf }} # Optional: official website OPTIONAL {{ ?item wdt:P856 ?website }} SERVICE wikibase:label {{ bd:serviceParam wikibase:language "{languages}" }} }} LIMIT 50 """ headers = { 'User-Agent': 'GLAM-Data-Extraction/0.2.1 (heritage institution location matching)', 'Accept': 'application/sparql-results+json' } try: response = requests.get( WIKIDATA_SPARQL, params={'query': query, 'format': 'json'}, headers=headers, timeout=30 ) response.raise_for_status() data = response.json() results = [] seen_qids = set() for binding in data.get('results', {}).get('bindings', []): qid = binding.get('item', {}).get('value', '').split('/')[-1] # Skip duplicates if qid in seen_qids: continue seen_qids.add(qid) label = binding.get('itemLabel', {}).get('value', '') description = binding.get('itemDescription', {}).get('value', '') type_label = binding.get('typeLabel', {}).get('value', '') # Skip if label is just the Q-number if label.startswith('Q') and label[1:].isdigit(): continue result = { 'qid': qid, 'label': label, 'description': description, 'type': type_label, 'isil': binding.get('isil', {}).get('value'), 'viaf': binding.get('viaf', {}).get('value'), 'website': binding.get('website', {}).get('value'), } # Parse coordinates and calculate approximate distance coord_str = binding.get('coord', {}).get('value', '') if coord_str: # Format: Point(lon lat) match = re.search(r'Point\(([0-9.-]+)\s+([0-9.-]+)\)', coord_str) if match: wd_lon, wd_lat = float(match.group(1)), float(match.group(2)) # Simple distance approximation (Euclidean, not great circle) result['distance_km'] = ((lat - wd_lat)**2 + (lon - wd_lon)**2)**0.5 * 111 results.append(result) return results except requests.exceptions.RequestException as e: print(f" Warning: Wikidata query failed: {e}") return [] def find_best_match(custodian_name: str, candidates: List[Dict], threshold: float = 80.0, custodian_type: Optional[str] = None, verbose: bool = False) -> Optional[Tuple[Dict, float]]: """ Find the best matching Wikidata entity for a custodian name. Args: custodian_name: Name from custodian file candidates: List of nearby Wikidata institutions threshold: Minimum similarity score (0-100) custodian_type: Institution type from filename (e.g., 'archive', 'library') verbose: Print detailed matching info Returns: Tuple of (best_match, score) or None if no match above threshold """ if not candidates: return None # Get all name variants name_variants = extract_name_variants(custodian_name) best_match = None best_score = 0.0 best_name_pair = ("", "") rejected_for_type = [] # Track type mismatches for verbose output for candidate in candidates: wd_label = candidate.get('label', '') wd_type = candidate.get('type', '') wd_variants = extract_name_variants(wd_label) # Check type compatibility FIRST (include custodian name for combined institution detection) is_compatible, type_penalty = check_type_compatibility(custodian_type, wd_type, custodian_name) if not is_compatible: # Skip this candidate entirely - type mismatch rejected_for_type.append((wd_label, wd_type)) continue # Try all combinations of name variants for name_var in name_variants: normalized_name = normalize_name(name_var) if len(normalized_name) < 5: # Skip too short names continue for wd_var in wd_variants: normalized_wd = normalize_name(wd_var) if len(normalized_wd) < 5: # Skip too short names continue # Try multiple fuzzy matching strategies scores = [ fuzz.ratio(normalized_name, normalized_wd), fuzz.partial_ratio(normalized_name, normalized_wd) * 0.9, # Discount partial fuzz.token_sort_ratio(normalized_name, normalized_wd), fuzz.token_set_ratio(normalized_name, normalized_wd) * 0.85, # Discount set ] max_score = max(scores) # Check for generic matches if is_generic_match(name_var, wd_var, max_score): max_score *= 0.5 # Heavy penalty for generic matches # Apply type compatibility penalty max_score *= type_penalty if max_score > best_score: best_score = max_score best_match = candidate best_name_pair = (name_var, wd_var) # Verbose output for rejected candidates if verbose and rejected_for_type: print(f" Type mismatches rejected (custodian type: {custodian_type}):") for wd_label, wd_type in rejected_for_type[:3]: # Show up to 3 print(f" - {wd_label} ({wd_type})") if best_match and best_score >= threshold: return (best_match, best_score) return None def load_custodian_file(filepath: Path) -> Optional[Dict]: """Load a custodian YAML file.""" try: with open(filepath, 'r', encoding='utf-8') as f: return yaml.safe_load(f) except Exception as e: print(f" Error loading {filepath}: {e}") return None def save_custodian_file(filepath: Path, data: Dict) -> bool: """Save a custodian YAML file.""" try: with open(filepath, 'w', encoding='utf-8') as f: yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False) return True except Exception as e: print(f" Error saving {filepath}: {e}") return False def get_custodian_name(data: Dict) -> Optional[str]: """Extract custodian name from data.""" # Try multiple locations if 'custodian_name' in data and 'claim_value' in data['custodian_name']: return data['custodian_name']['claim_value'] if 'original_entry' in data and 'name' in data['original_entry']: return data['original_entry']['name'] return None def get_coordinates(data: Dict) -> Optional[Tuple[float, float]]: """Extract coordinates from custodian data.""" # Try location block first if 'location' in data: loc = data['location'] if 'latitude' in loc and 'longitude' in loc: try: return (float(loc['latitude']), float(loc['longitude'])) except (ValueError, TypeError): pass # Try ghcid.location_resolution if 'ghcid' in data and 'location_resolution' in data['ghcid']: loc = data['ghcid']['location_resolution'] if 'latitude' in loc and 'longitude' in loc: try: return (float(loc['latitude']), float(loc['longitude'])) except (ValueError, TypeError): pass return None def add_wikidata_enrichment(data: Dict, match: Dict, score: float) -> Dict: """Add Wikidata enrichment to custodian data.""" qid = match['qid'] enrichment = { 'wikidata_id': qid, 'wikidata_url': f"http://www.wikidata.org/entity/{qid}", 'matched_by': 'location_name_match', 'match_score': round(score / 100.0, 3), # Convert to 0-1 scale 'matched_name': match.get('label', ''), 'enrichment_date': datetime.now(timezone.utc).isoformat(), 'enrichment_version': '2.2.0_location', 'wikidata_label': match.get('label', ''), } # Add optional fields if match.get('description'): enrichment['wikidata_description'] = match['description'] if match.get('website'): enrichment['official_website'] = match['website'] if match.get('type'): enrichment['instance_of_label'] = match['type'] if match.get('isil'): enrichment['isil_from_wikidata'] = match['isil'] if match.get('viaf'): enrichment['viaf_from_wikidata'] = match['viaf'] if match.get('distance_km'): enrichment['distance_km'] = round(match['distance_km'], 2) data['wikidata_enrichment'] = enrichment # Add provenance note if 'provenance' not in data: data['provenance'] = {} if 'notes' not in data['provenance']: data['provenance']['notes'] = [] elif isinstance(data['provenance']['notes'], str): # Convert string to list if needed data['provenance']['notes'] = [data['provenance']['notes']] note = f"Wikidata enrichment via location+name match {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}: {qid} ({match.get('label', '')}) - score: {score:.1f}%" data['provenance']['notes'].append(note) return data def find_candidates(country: str, data_dir: Path) -> List[Path]: """Find custodian files that need location-based enrichment.""" pattern = f"{country}-*.yaml" candidates = [] for filepath in data_dir.glob(pattern): data = load_custodian_file(filepath) if not data: continue # Skip if already has wikidata_enrichment if 'wikidata_enrichment' in data: continue # Skip if no coordinates coords = get_coordinates(data) if not coords: continue # Skip if no name name = get_custodian_name(data) if not name: continue candidates.append(filepath) return candidates def main(): parser = argparse.ArgumentParser( description='Enrich custodian files using location-based Wikidata matching' ) parser.add_argument('--country', required=True, choices=list(COUNTRY_CONFIG.keys()), help='Country code (AT, BE, CH, NL, DE)') parser.add_argument('--limit', type=int, default=0, help='Limit number of files to process (0 = all)') parser.add_argument('--dry-run', action='store_true', help='Show matches without saving') parser.add_argument('--radius', type=float, default=DEFAULT_RADIUS_KM, help=f'Search radius in km (default: {DEFAULT_RADIUS_KM})') parser.add_argument('--threshold', type=float, default=DEFAULT_THRESHOLD, help=f'Name similarity threshold 0-100 (default: {DEFAULT_THRESHOLD})') parser.add_argument('--verbose', '-v', action='store_true', help='Show detailed matching info') args = parser.parse_args() country = args.country config = COUNTRY_CONFIG[country] print(f"\n{'='*60}") print(f"Location-Based Wikidata Enrichment for {config['name']} ({country})") print(f"{'='*60}") print(f" Wikidata: {config['qid']}") print(f" Languages: {config['languages']}") print(f" Search radius: {args.radius} km") print(f" Name threshold: {args.threshold}%") print(f" Dry run: {args.dry_run}") print() data_dir = Path('data/custodian') # Find candidates print("Finding candidate files...") candidates = find_candidates(country, data_dir) print(f" Found {len(candidates)} files with coordinates but no Wikidata enrichment") if args.limit > 0: candidates = candidates[:args.limit] print(f" Limited to {len(candidates)} files") if not candidates: print("No candidates to process.") return print() # Process each candidate enriched_count = 0 no_match_count = 0 error_count = 0 for i, filepath in enumerate(candidates, 1): print(f"[{i}/{len(candidates)}] {filepath.name}") data = load_custodian_file(filepath) if not data: error_count += 1 continue name = get_custodian_name(data) coords = get_coordinates(data) if not name or not coords: error_count += 1 continue lat, lon = coords # Extract institution type from filename institution_type = get_institution_type_from_filename(filepath.name) if args.verbose: print(f" Name: {name}") print(f" Coords: {lat}, {lon}") print(f" Type: {institution_type or 'unknown'}") # Query Wikidata for nearby institutions nearby = query_nearby_institutions( lat, lon, config['qid'], config['languages'], args.radius ) if args.verbose: print(f" Found {len(nearby)} nearby institutions") for n in nearby[:5]: print(f" - {n['qid']}: {n['label']} ({n.get('type', '?')}) [{n.get('distance_km', '?'):.2f} km]") if not nearby: no_match_count += 1 print(f" -> No nearby heritage institutions found") # Rate limit time.sleep(0.5) continue # Find best match with type checking result = find_best_match( name, nearby, args.threshold, custodian_type=institution_type, verbose=args.verbose ) if not result: no_match_count += 1 print(f" -> No name match above {args.threshold}% threshold") if args.verbose and nearby: # Show what we did find best_candidate = nearby[0] normalized_name = normalize_name(name) normalized_wd = normalize_name(best_candidate['label']) score = fuzz.token_set_ratio(normalized_name, normalized_wd) print(f" Best candidate: {best_candidate['label']} ({best_candidate.get('type', '?')}) - {score}%") # Rate limit time.sleep(0.5) continue match, score = result print(f" -> MATCH: {match['qid']} - {match['label']} ({match.get('type', '?')}) [score: {score:.1f}%]") if not args.dry_run: data = add_wikidata_enrichment(data, match, score) if save_custodian_file(filepath, data): enriched_count += 1 else: error_count += 1 else: enriched_count += 1 # Rate limit to avoid overloading Wikidata time.sleep(0.5) # Summary print() print(f"{'='*60}") print("Summary") print(f"{'='*60}") print(f" Processed: {len(candidates)}") print(f" Enriched: {enriched_count}") print(f" No match: {no_match_count}") print(f" Errors: {error_count}") if args.dry_run: print("\n (Dry run - no files were modified)") # Show updated stats if not args.dry_run and enriched_count > 0: print() total = len(list(data_dir.glob(f"{country}-*.yaml"))) enriched = 0 for f in data_dir.glob(f"{country}-*.yaml"): d = load_custodian_file(f) if d and 'wikidata_enrichment' in d: enriched += 1 print(f" {country} enrichment: {enriched}/{total} ({100*enriched/total:.1f}%)") if __name__ == '__main__': main()