diff --git a/schemas/20251121/linkml/modules/classes/CustodianName.yaml b/schemas/20251121/linkml/modules/classes/CustodianName.yaml index e7c25f1c0b..71d17e2c08 100644 --- a/schemas/20251121/linkml/modules/classes/CustodianName.yaml +++ b/schemas/20251121/linkml/modules/classes/CustodianName.yaml @@ -114,6 +114,46 @@ classes: See: .opencode/ABBREVIATION_SPECIAL_CHAR_RULE.md for complete documentation + =========================================================================== + MANDATORY RULE: Diacritics MUST Be Normalized to ASCII in Abbreviations + =========================================================================== + + When generating abbreviations for GHCID, diacritics (accented characters) + MUST be normalized to their ASCII base letter equivalents. Only ASCII + uppercase letters (A-Z) are permitted in the abbreviation component. + + RATIONALE: + 1. URI/URL safety - Non-ASCII requires percent-encoding + 2. Cross-system compatibility - ASCII is universally supported + 3. Parsing consistency - No special character handling needed + 4. 
Human readability - Easier to type and communicate + + DIACRITICS TO NORMALIZE (examples by language): + - Czech: Č→C, Ř→R, Š→S, Ž→Z, Ě→E, Ů→U + - Polish: Ł→L, Ń→N, Ó→O, Ś→S, Ź→Z, Ż→Z, Ą→A, Ę→E + - German: Ä→A, Ö→O, Ü→U, ß→SS + - French: É→E, È→E, Ê→E, Ç→C, Ô→O + - Spanish: Ñ→N, Á→A, É→E, Í→I, Ó→O, Ú→U + - Nordic: Å→A, Ä→A, Ö→O, Ø→O, Æ→AE + + EXAMPLES: + - "Vlastivědné muzeum" (Czech) → "VM" (not "VM" with háček) + - "Österreichische Nationalbibliothek" (German) → "ON" + - "Bibliothèque nationale" (French) → "BN" + + REAL-WORLD EXAMPLE: + - ❌ WRONG: CZ-VY-TEL-L-VHSPAOČRZS (contains Č) + - ✅ CORRECT: CZ-VY-TEL-L-VHSPAOCRZS (ASCII only) + + IMPLEMENTATION: + ```python + import unicodedata + normalized = unicodedata.normalize('NFD', text) + ascii_text = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn') + ``` + + See: .opencode/ABBREVIATION_SPECIAL_CHAR_RULE.md for complete documentation + Can be generated by: 1. ReconstructionActivity (formal entity resolution) - was_generated_by link 2. Direct extraction (simple standardization) - no was_generated_by link diff --git a/scripts/enrich_custodian_emic_names.py b/scripts/enrich_custodian_emic_names.py new file mode 100644 index 0000000000..40c9d8dba4 --- /dev/null +++ b/scripts/enrich_custodian_emic_names.py @@ -0,0 +1,557 @@ +#!/usr/bin/env python3 +""" +Enrich UNESCO MoW custodian files with proper CustodianName data. + +This script: +1. Loads multilingual labels from Wikidata cache +2. Determines the appropriate emic (local language) name for each custodian +3. Updates custodian YAML files with: + - custodian_name.emic_name (local language name) + - custodian_name.name_language (ISO 639-1 code) + - custodian_name.standardized_name (same as emic_name for now) +4. Regenerates abbreviations from local language names if different +5. 
"""Derive emic (local-language) names and abbreviations for custodian files.

Per AGENTS.md: the abbreviation and optional snake_case name suffix should be
derived from the emic name in the institution's official local/national
language.
"""

import json
import re
import unicodedata
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

# Country to primary official language(s) mapping.
# Format: country_name -> (primary_lang, fallback_langs), ISO 639-1 codes.
# For multilingual countries we check the institution's label in each
# official language, in order.
COUNTRY_LANGUAGE_MAP = {
    # Europe - Western
    'Germany': ('de', []),
    'Austria': ('de', []),
    'France': ('fr', []),
    'Netherlands': ('nl', []),
    'Belgium': ('nl', ['fr', 'de']),  # Check all three official languages
    'Luxembourg': ('lb', ['fr', 'de']),
    'Switzerland': ('de', ['fr', 'it', 'rm']),  # Check all four national languages
    'United Kingdom': ('en', []),
    'Ireland': ('en', ['ga']),

    # Europe - Northern
    'Norway': ('nb', ['nn', 'no']),  # Bokmål preferred, Nynorsk fallback
    'Sweden': ('sv', []),
    'Denmark': ('da', []),
    'Finland': ('fi', ['sv']),  # Swedish is also official
    'Iceland': ('is', []),

    # Europe - Southern
    'Spain': ('es', ['ca', 'eu', 'gl']),  # Regional languages
    'Portugal': ('pt', []),
    'Italy': ('it', []),
    'Greece': ('el', []),
    'Malta': ('mt', ['en']),
    'Cyprus': ('el', ['tr']),

    # Europe - Central/Eastern
    'Poland': ('pl', []),
    'Czech Republic': ('cs', []),
    'Czechia': ('cs', []),
    'Slovakia': ('sk', []),
    'Hungary': ('hu', []),
    'Slovenia': ('sl', []),
    'Croatia': ('hr', []),
    'Serbia': ('sr', []),
    'Bosnia and Herzegovina': ('bs', ['hr', 'sr']),
    'North Macedonia': ('mk', []),
    'Albania': ('sq', []),
    'Bulgaria': ('bg', []),
    'Romania': ('ro', []),
    'Moldova': ('ro', []),
    'Ukraine': ('uk', []),
    'Belarus': ('be', ['ru']),
    'Russia': ('ru', []),
    'Estonia': ('et', []),
    'Latvia': ('lv', []),
    'Lithuania': ('lt', []),

    # Americas
    'United States': ('en', []),
    'Canada': ('en', ['fr']),
    'Mexico': ('es', []),
    'Brazil': ('pt', []),
    'Argentina': ('es', []),
    'Chile': ('es', []),
    'Colombia': ('es', []),
    'Peru': ('es', []),
    'Venezuela': ('es', []),
    'Ecuador': ('es', []),
    'Bolivia': ('es', []),
    'Paraguay': ('es', ['gn']),
    'Uruguay': ('es', []),
    'Cuba': ('es', []),
    'Dominican Republic': ('es', []),
    'Puerto Rico': ('es', ['en']),
    'Costa Rica': ('es', []),
    'Panama': ('es', []),
    'Guatemala': ('es', []),
    'Honduras': ('es', []),
    'El Salvador': ('es', []),
    'Nicaragua': ('es', []),
    'Jamaica': ('en', []),
    'Trinidad and Tobago': ('en', []),
    'Barbados': ('en', []),
    'Suriname': ('nl', []),
    'Guyana': ('en', []),

    # Asia - East
    'Japan': ('ja', []),
    "People's Republic of China": ('zh', []),
    'China': ('zh', []),
    'Taiwan': ('zh', []),
    'South Korea': ('ko', []),
    'North Korea': ('ko', []),
    'Mongolia': ('mn', []),

    # Asia - Southeast
    'Vietnam': ('vi', []),
    'Thailand': ('th', []),
    'Cambodia': ('km', []),
    'Laos': ('lo', []),
    'Myanmar': ('my', []),
    'Malaysia': ('ms', []),
    'Singapore': ('en', ['zh', 'ms', 'ta']),
    'Indonesia': ('id', []),
    'Philippines': ('tl', ['en']),
    'Brunei': ('ms', []),
    'East Timor': ('pt', ['tet']),
    'Timor-Leste': ('pt', ['tet']),

    # Asia - South
    'India': ('hi', ['en', 'bn', 'ta', 'te', 'mr', 'gu', 'kn', 'ml', 'pa', 'or']),
    'Pakistan': ('ur', ['en']),
    'Bangladesh': ('bn', []),
    'Sri Lanka': ('si', ['ta']),
    'Nepal': ('ne', []),
    'Bhutan': ('dz', []),
    'Maldives': ('dv', []),

    # Asia - Central
    'Kazakhstan': ('kk', ['ru']),
    'Uzbekistan': ('uz', []),
    'Turkmenistan': ('tk', []),
    'Kyrgyzstan': ('ky', ['ru']),
    'Tajikistan': ('tg', []),
    'Afghanistan': ('ps', ['fa']),

    # Asia - West / Middle East
    'Turkey': ('tr', []),
    'Iran': ('fa', []),
    'Iraq': ('ar', ['ku']),
    'Syria': ('ar', []),
    'Lebanon': ('ar', []),
    'Jordan': ('ar', []),
    'Israel': ('he', ['ar']),
    'Palestine': ('ar', []),
    'Saudi Arabia': ('ar', []),
    'United Arab Emirates': ('ar', []),
    'Kuwait': ('ar', []),
    'Qatar': ('ar', []),
    'Bahrain': ('ar', []),
    'Oman': ('ar', []),
    'Yemen': ('ar', []),
    'Georgia': ('ka', []),
    'Armenia': ('hy', []),
    'Azerbaijan': ('az', []),

    # Africa - North
    'Egypt': ('ar', []),
    'Libya': ('ar', []),
    'Tunisia': ('ar', ['fr']),
    'Algeria': ('ar', ['fr']),
    'Morocco': ('ar', ['fr']),

    # Africa - West
    'Nigeria': ('en', []),
    'Ghana': ('en', []),
    'Senegal': ('fr', []),
    'Ivory Coast': ('fr', []),
    "Côte d'Ivoire": ('fr', []),
    'Mali': ('fr', []),
    'Burkina Faso': ('fr', []),
    'Niger': ('fr', []),
    'Benin': ('fr', []),
    'Togo': ('fr', []),
    'Guinea': ('fr', []),
    'Sierra Leone': ('en', []),
    'Liberia': ('en', []),
    'Mauritania': ('ar', ['fr']),
    'Cape Verde': ('pt', []),
    'Gambia': ('en', []),

    # Africa - East
    'Kenya': ('sw', ['en']),
    'Tanzania': ('sw', ['en']),
    'Uganda': ('en', ['sw']),
    'Rwanda': ('rw', ['fr', 'en']),
    'Burundi': ('rn', ['fr']),
    'Ethiopia': ('am', []),
    'Eritrea': ('ti', ['ar']),
    'Somalia': ('so', ['ar']),
    'Djibouti': ('fr', ['ar']),
    'Madagascar': ('mg', ['fr']),
    'Mauritius': ('en', ['fr']),
    'Seychelles': ('en', ['fr']),

    # Africa - Central
    'Democratic Republic of the Congo': ('fr', []),
    'Republic of the Congo': ('fr', []),
    'Central African Republic': ('fr', []),
    'Chad': ('fr', ['ar']),
    'Cameroon': ('fr', ['en']),
    'Gabon': ('fr', []),
    'Equatorial Guinea': ('es', ['fr', 'pt']),

    # Africa - Southern
    'South Africa': ('en', ['af', 'zu', 'xh']),
    'Namibia': ('en', ['de', 'af']),
    'Botswana': ('en', ['tn']),
    'Zimbabwe': ('en', ['sn', 'nd']),
    'Zambia': ('en', []),
    'Malawi': ('en', []),
    'Mozambique': ('pt', []),
    'Angola': ('pt', []),
    'Lesotho': ('en', ['st']),
    'Eswatini': ('en', ['ss']),

    # Oceania
    'Australia': ('en', []),
    'New Zealand': ('en', ['mi']),
    'Papua New Guinea': ('en', ['tpi', 'ho']),
    'Fiji': ('en', ['fj', 'hi']),
    'Vanuatu': ('en', ['fr', 'bi']),
    'Samoa': ('sm', ['en']),
    'Tonga': ('to', ['en']),
    'Solomon Islands': ('en', []),
    'Kiribati': ('en', ['gil']),
    'Micronesia': ('en', []),
    'Palau': ('en', ['pau']),

    # Caribbean
    'Haiti': ('ht', ['fr']),
    'Bahamas': ('en', []),
    'Curaçao': ('nl', ['pap']),
    'Aruba': ('nl', ['pap']),

    # Default fallback
    'Unknown': ('en', []),
}


def normalize_diacritics(text: str) -> str:
    """Normalize diacritics to ASCII equivalents.

    NFD decomposition splits accented characters into base letter plus
    combining marks (category 'Mn'), which are then dropped.
    NOTE: characters without an NFD decomposition (e.g. 'ß', 'Ø') are
    passed through unchanged.
    """
    normalized = unicodedata.normalize('NFD', text)
    return ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')


def get_significant_words(text: str) -> List[str]:
    """Extract significant words from a name, skipping articles/prepositions.

    Words are compared case-insensitively against a multilingual stop list;
    pure numbers and number ranges (e.g. "1914-1918") are also skipped.
    The ORIGINAL (uncleaned) word is returned for each kept token.
    """
    # Skip words by language
    SKIP_WORDS = {
        # Dutch
        'de', 'het', 'een', 'van', 'voor', 'in', 'op', 'te', 'den', 'der', 'des', "'s",
        'aan', 'bij', 'met', 'naar', 'om', 'tot', 'uit', 'over', 'onder', 'door', 'en', 'of',
        # English
        'a', 'an', 'the', 'of', 'at', 'on', 'to', 'for', 'with', 'from', 'by', 'as', 'under',
        'and', 'or', 'but',
        # French
        'le', 'la', 'les', 'un', 'une', 'des', 'du', 'à', 'au', 'aux', 'dans', 'sur', 'sous',
        'pour', 'par', 'avec', "l'", "d'", 'et', 'ou',
        # German
        'der', 'die', 'das', 'dem', 'den', 'ein', 'eine', 'einer', 'einem', 'einen',
        'von', 'zu', 'für', 'bei', 'nach', 'aus', 'vor', 'über', 'unter', 'durch', 'und', 'oder',
        # Spanish
        'el', 'los', 'las', 'unos', 'unas', 'del', 'al', 'con', 'por', 'para', 'sobre', 'bajo',
        'y', 'o', 'e', 'u',
        # Portuguese
        'o', 'os', 'as', 'um', 'uma', 'uns', 'umas', 'do', 'da', 'dos', 'das', 'em', 'no', 'na',
        'nos', 'nas', 'com', 'sob',
        # Italian
        'il', 'lo', 'gli', 'uno', 'di', 'dello', 'della', 'dei', 'degli', 'delle',
        'allo', 'alla', 'ai', 'agli', 'alle', 'dal', 'dallo', 'dalla', 'dai', 'dagli', 'dalle',
        'nel', 'nello', 'nella', 'nei', 'negli', 'nelle', 'sul', 'sullo', 'sulla', 'sui', 'sugli',
        'sulle', 'per', 'tra', 'fra', 'ed', 'od',
        # Russian (transliterated)
        'i', 'v', 'na', 'pri',
    }

    significant = []
    for word in text.split():
        # Strip punctuation before the stop-list comparison
        clean_word = re.sub(r"[''`\",.:;!?()[\]{}]", '', word.lower())
        if clean_word and clean_word not in SKIP_WORDS:
            # Skip pure numbers and numeric ranges
            if not clean_word.isdigit() and not re.match(r'^\d+-\d+$', clean_word):
                significant.append(word)

    return significant


def generate_abbreviation(name: str, max_length: int = 10) -> str:
    """Generate an ASCII abbreviation from first letters of significant words.

    Diacritics are normalized to their ASCII base letters per the GHCID
    abbreviation rule (ASCII uppercase A-Z only). If fewer than two letters
    result, the first letters of the normalized full name are used instead.
    """
    significant_words = get_significant_words(name)

    if not significant_words:
        # Fallback: use first letters of all words
        significant_words = name.split()[:3]

    abbrev = ''
    for word in significant_words:
        # Strip special characters, then normalize and take the first letter
        clean = re.sub(r"[''`\",.:;!?()[\]{}&/\\+@#$%*|=<>~^_-]", '', word)
        if clean:
            first_letter = normalize_diacritics(clean[0]).upper()
            if first_letter.isalpha():
                abbrev += first_letter

    # Ensure at least 2 characters
    if len(abbrev) < 2:
        clean_name = re.sub(r'[^A-Za-z]', '', normalize_diacritics(name))
        abbrev = clean_name[:3].upper()

    return abbrev[:max_length]


def get_emic_name(custodian: Dict, country: str, labels: Dict[str, str]) -> Tuple[str, str]:
    """
    Determine the appropriate emic (local language) name for a custodian.

    Tries the country's official language(s) in order, then English, then
    the first available label, then the custodian's ``name_en``.

    Returns: (emic_name, language_code)
    """
    if country in COUNTRY_LANGUAGE_MAP:
        primary_lang, fallback_langs = COUNTRY_LANGUAGE_MAP[country]
        all_langs = [primary_lang] + fallback_langs
    else:
        # Unknown country - default to English
        all_langs = ['en']

    # Try each official language in order
    for lang in all_langs:
        if lang in labels:
            return labels[lang], lang

    # Fallback to English if available
    if 'en' in labels:
        return labels['en'], 'en'

    # Ultimate fallback: first available label
    if labels:
        first_lang = next(iter(labels))
        return labels[first_lang], first_lang

    # No labels at all - use original name
    return custodian.get('name_en', 'Unknown'), 'en'


def load_custodian_file(filepath: Path) -> Optional[Dict]:
    """Load a custodian YAML file; return None (and log) on any error."""
    import yaml  # local import: keeps pure helpers importable without PyYAML

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    except Exception as e:
        print(f"Error loading {filepath}: {e}")
        return None


def save_custodian_file(filepath: Path, data: Dict):
    """Save a custodian YAML file preserving key order and Unicode text."""
    import yaml  # local import: keeps pure helpers importable without PyYAML

    with open(filepath, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)


def _extract_wikidata_id(data: Dict) -> Optional[str]:
    """Return the Wikidata QID recorded in a custodian file, if any."""
    return data.get('original_entry', {}).get('wikidata_id') or \
        data.get('wikidata_enrichment', {}).get('wikidata_entity_id')


def _build_file_index(custodian_dir: Path) -> Dict[str, Path]:
    """Scan custodian YAML files ONCE and index them by Wikidata QID.

    The previous implementation re-read every file for every custodian
    (O(files x custodians) disk reads); this hoists the scan out of the loop.
    """
    index: Dict[str, Path] = {}
    for filepath in sorted(custodian_dir.glob('*.yaml')):
        data = load_custodian_file(filepath)
        if not data:
            continue
        qid = _extract_wikidata_id(data)
        if qid and qid not in index:
            index[qid] = filepath
    return index


def main():
    """Enrich custodian files with emic names; log detected abbreviation changes."""
    # Paths
    project_root = Path(__file__).parent.parent
    custodian_dir = project_root / 'data' / 'custodian'
    cache_dir = project_root / 'data' / 'cache'

    # Load data (explicit UTF-8 for cross-platform consistency)
    print("Loading multilingual labels...")
    with open(cache_dir / 'unesco_mow_multilingual_labels.json', 'r', encoding='utf-8') as f:
        labels_data = json.load(f)
    wikidata_labels = labels_data['custodians']

    print("Loading original custodian info...")
    with open(cache_dir / 'unesco_mow_custodians.json', 'r', encoding='utf-8') as f:
        custodians_data = json.load(f)
    original_custodians = {c['wikidata_id']: c for c in custodians_data['custodians']}

    print(f"Processing {len(original_custodians)} UNESCO MoW custodians...\n")

    # Statistics
    stats = {
        'total': 0,
        'updated': 0,
        'abbreviation_changed': 0,
        'ghcid_changed': 0,  # reserved: GHCID updates not yet applied (see TODO below)
        'not_found': 0,
        'errors': 0,
        'already_enriched': 0,
    }

    # Track changes for reporting
    changes = []

    timestamp = datetime.now(timezone.utc).isoformat()

    # Index all custodian files once, instead of rescanning per custodian
    file_index = _build_file_index(custodian_dir)

    for qid, custodian in original_custodians.items():
        stats['total'] += 1

        custodian_file = file_index.get(qid)
        if not custodian_file:
            stats['not_found'] += 1
            continue

        # Load full custodian data (fresh read; the index only stored the path)
        data = load_custodian_file(custodian_file)
        if not data:
            stats['errors'] += 1
            continue

        # Get labels for this custodian
        labels = wikidata_labels.get(qid, {}).get('labels', {})
        if not labels:
            print(f"  No labels found for {qid}")
            continue

        # Determine emic name
        country = custodian.get('country', 'Unknown')
        emic_name, lang_code = get_emic_name(custodian, country, labels)

        # Current values
        current_name = data.get('custodian_name', {}).get('claim_value', '')
        current_emic = data.get('custodian_name', {}).get('emic_name', '')

        # Check if already enriched with emic_name
        if current_emic and current_emic == emic_name:
            stats['already_enriched'] += 1
            continue

        # Generate abbreviation from emic name
        new_abbrev = generate_abbreviation(emic_name)

        # Current abbreviation is the last GHCID component
        current_ghcid = data.get('ghcid', {}).get('ghcid_current', '')
        current_abbrev = current_ghcid.split('-')[-1] if current_ghcid else ''

        # Update custodian_name
        if 'custodian_name' not in data:
            data['custodian_name'] = {}

        data['custodian_name']['emic_name'] = emic_name
        data['custodian_name']['name_language'] = lang_code
        data['custodian_name']['standardized_name'] = emic_name

        # Keep original English name as an alternative if different
        if current_name and current_name != emic_name:
            if 'alternative_names' not in data['custodian_name']:
                data['custodian_name']['alternative_names'] = []
            existing = [n.get('name') if isinstance(n, dict) else n
                        for n in data['custodian_name']['alternative_names']]
            if current_name not in existing:
                data['custodian_name']['alternative_names'].append({
                    'name': current_name,
                    'language': 'en',
                    'source': 'wikidata'
                })

        # Track change
        change_info = {
            'wikidata_id': qid,
            'file': custodian_file.name,
            'country': country,
            'old_name': current_name,
            'new_emic_name': emic_name,
            'language': lang_code,
            'old_abbrev': current_abbrev,
            'new_abbrev': new_abbrev,
        }

        # Check if abbreviation changed
        if new_abbrev != current_abbrev and current_abbrev:
            stats['abbreviation_changed'] += 1
            change_info['abbrev_changed'] = True

            # TODO: For now, we don't update GHCID - that requires more careful
            # handling with collision detection. Just log the change.
            print(f"  ABBREV CHANGE: {custodian_file.name}")
            print(f"    {country}: {current_name}")
            print(f"    Emic ({lang_code}): {emic_name}")
            print(f"    Abbrev: {current_abbrev} → {new_abbrev}")

        changes.append(change_info)

        # Save updated file
        save_custodian_file(custodian_file, data)
        stats['updated'] += 1

    # Print summary
    print("\n" + "=" * 60)
    print("ENRICHMENT SUMMARY")
    print("=" * 60)
    print(f"Total custodians processed: {stats['total']}")
    print(f"Files updated: {stats['updated']}")
    print(f"Already enriched: {stats['already_enriched']}")
    print(f"Abbreviation changes detected: {stats['abbreviation_changed']}")
    print(f"Files not found: {stats['not_found']}")
    print(f"Errors: {stats['errors']}")

    # Save changes log
    changes_log = {
        'timestamp': timestamp,
        'stats': stats,
        'changes': changes
    }

    log_file = cache_dir / 'emic_name_enrichment_log.json'
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(changes_log, f, indent=2, ensure_ascii=False)
    print(f"\nChanges log saved to: {log_file}")

    # Show sample of abbreviation changes
    abbrev_changes = [c for c in changes if c.get('abbrev_changed')]
    if abbrev_changes:  # was `abbrev_changes[:10]` - equivalent but obscure
        print("\n" + "-" * 60)
        print("Sample abbreviation changes (not yet applied to GHCID):")
        print("-" * 60)
        for c in abbrev_changes[:10]:
            print(f"  {c['country']}: {c['old_abbrev']} → {c['new_abbrev']}")
            print(f"    EN: {c['old_name']}")
            print(f"    {c['language'].upper()}: {c['new_emic_name']}")
            print()


if __name__ == '__main__':
    main()
Google Maps/Places API data (address, ratings, reviews, photos) +3. GLM-4.6 verification of matches (CH-Annotator convention) + +Usage: + python scripts/enrich_custodian_youtube_maps.py [--dry-run] [--limit N] [--force] + python scripts/enrich_custodian_youtube_maps.py --files FILE1.yaml FILE2.yaml + python scripts/enrich_custodian_youtube_maps.py --pattern "ZA-*.yaml" + +Environment Variables: + GOOGLE_PLACES_TOKEN - Required for Google Maps enrichment + GOOGLE_YOUTUBE_TOKEN - Required for YouTube enrichment + ZAI_API_TOKEN - Required for GLM-4.6 verification (optional but recommended) + +Author: GLAM Data Extraction Project +Date: December 2025 +""" + +import argparse +import asyncio +import fnmatch +import json +import logging +import os +import re +import sys +import time +from dataclasses import dataclass, field +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import httpx +import yaml + +# Add project src to path +PROJECT_ROOT = Path(__file__).parent.parent +sys.path.insert(0, str(PROJECT_ROOT / "src")) + +# Load environment variables +from dotenv import load_dotenv +load_dotenv(PROJECT_ROOT / ".env") + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + +# ============================================================================ +# Configuration +# ============================================================================ + +CUSTODIAN_DIR = PROJECT_ROOT / "data/custodian" + +# API Keys +GOOGLE_PLACES_TOKEN = os.getenv("GOOGLE_PLACES_TOKEN", "") +GOOGLE_YOUTUBE_TOKEN = os.getenv("GOOGLE_YOUTUBE_TOKEN", "") +# Z.AI GLM 4.6 API for CH-Annotator verification (NOT Anthropic Claude) +ZAI_API_TOKEN = os.getenv("ZAI_API_TOKEN", "") + +# API Endpoints +YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3" +TEXT_SEARCH_URL = "https://places.googleapis.com/v1/places:searchText" 
+# Z.AI GLM 4.6 API endpoint (Anthropic-compatible interface) +ZAI_API_BASE = "https://api.z.ai/api/anthropic/v1" +ZAI_MODEL = "glm-4.6" + +# Rate limiting +REQUEST_DELAY = 0.3 # seconds between API calls + +# CH-Annotator convention version +CH_ANNOTATOR_VERSION = "ch_annotator-v1_7_0" + +# Google Places fields to request +PLACE_FIELDS = [ + "id", "displayName", "formattedAddress", "addressComponents", + "location", "types", "businessStatus", "internationalPhoneNumber", + "nationalPhoneNumber", "regularOpeningHours", "currentOpeningHours", + "websiteUri", "rating", "userRatingCount", "reviews", "priceLevel", + "photos", "googleMapsUri", "utcOffsetMinutes", "primaryType", + "primaryTypeDisplayName", "shortFormattedAddress", "editorialSummary", +] + +# ============================================================================ +# Utility Functions +# ============================================================================ + +def get_institution_name(entry: Dict[str, Any]) -> str: + """Extract institution name from custodian entry.""" + # Try custodian_name.claim_value first + if entry.get("custodian_name", {}).get("claim_value"): + return entry["custodian_name"]["claim_value"] + # Fall back to wikidata label + if entry.get("wikidata_enrichment", {}).get("wikidata_label_en"): + return entry["wikidata_enrichment"]["wikidata_label_en"] + # Fall back to original entry + if entry.get("original_entry", {}).get("name"): + return entry["original_entry"]["name"] + return "" + + +def get_country_code(entry: Dict[str, Any]) -> str: + """Extract country code from entry.""" + loc = entry.get("ghcid", {}).get("location_resolution", {}) + if loc.get("country_code"): + return loc["country_code"] + # Parse from GHCID + ghcid = entry.get("ghcid", {}).get("ghcid_current", "") + if ghcid and "-" in ghcid: + return ghcid.split("-")[0] + return "" + + +def get_coordinates(entry: Dict[str, Any]) -> Optional[Tuple[float, float]]: + """Extract coordinates from entry if available.""" + 
loc = entry.get("ghcid", {}).get("location_resolution", {}) + src = loc.get("source_coordinates", {}) + if src.get("latitude") and src.get("longitude"): + return (src["latitude"], src["longitude"]) + return None + + +def get_city_name(entry: Dict[str, Any]) -> str: + """Extract city name from entry.""" + loc = entry.get("ghcid", {}).get("location_resolution", {}) + return loc.get("city_name", "") + + +def get_wikidata_id(entry: Dict[str, Any]) -> str: + """Extract Wikidata ID from entry.""" + if entry.get("wikidata_enrichment", {}).get("wikidata_entity_id"): + return entry["wikidata_enrichment"]["wikidata_entity_id"] + if entry.get("original_entry", {}).get("wikidata_id"): + return entry["original_entry"]["wikidata_id"] + return "" + + +# ============================================================================ +# Google Maps Enrichment +# ============================================================================ + +def build_maps_search_query(entry: Dict[str, Any]) -> str: + """Build Google Maps search query from entry data.""" + parts = [] + + name = get_institution_name(entry) + if name: + parts.append(name) + + city = get_city_name(entry) + if city: + parts.append(city) + + # Get country name + loc = entry.get("ghcid", {}).get("location_resolution", {}) + country = loc.get("country_label", "") + if country: + parts.append(country) + + return ", ".join(parts) + + +def search_google_place( + query: str, + client: httpx.Client, + country_code: str = "", + location_bias: Optional[Tuple[float, float]] = None, +) -> Optional[Dict[str, Any]]: + """Search for a place using Google Places API (New).""" + if not GOOGLE_PLACES_TOKEN: + logger.warning("GOOGLE_PLACES_TOKEN not set, skipping Maps enrichment") + return None + + headers = { + "Content-Type": "application/json", + "X-Goog-Api-Key": GOOGLE_PLACES_TOKEN, + "X-Goog-FieldMask": ",".join([f"places.{f}" for f in PLACE_FIELDS]), + } + + body = { + "textQuery": query, + "maxResultCount": 1, + } + + # Set 
language/region based on country + if country_code == "ZA": + body["languageCode"] = "en" + body["regionCode"] = "ZA" + elif country_code == "ZW": + body["languageCode"] = "en" + body["regionCode"] = "ZW" + + # Add location bias if coordinates available + if location_bias: + lat, lng = location_bias + body["locationBias"] = { + "circle": { + "center": {"latitude": lat, "longitude": lng}, + "radius": 50000.0 # 50km radius + } + } + + try: + response = client.post(TEXT_SEARCH_URL, headers=headers, json=body) + response.raise_for_status() + data = response.json() + + places = data.get("places", []) + if places: + return places[0] + else: + logger.warning(f"No place found for: {query}") + return None + + except httpx.HTTPStatusError as e: + error_data = {} + try: + error_data = e.response.json() + except Exception: + pass + error_msg = error_data.get("error", {}).get("message", str(e)) + logger.error(f"Google Places API error: {error_msg}") + return None + except Exception as e: + logger.error(f"Error searching for '{query}': {e}") + return None + + +def parse_google_place(place: Dict[str, Any]) -> Dict[str, Any]: + """Parse Google Places API response into enrichment dict.""" + result = { + "place_id": place.get("id", ""), + "name": place.get("displayName", {}).get("text", ""), + "fetch_timestamp": datetime.now(timezone.utc).isoformat(), + "api_status": "OK", + } + + # Location + location = place.get("location", {}) + if location.get("latitude") and location.get("longitude"): + result["coordinates"] = { + "latitude": location["latitude"], + "longitude": location["longitude"], + } + + if place.get("formattedAddress"): + result["formatted_address"] = place["formattedAddress"] + if place.get("shortFormattedAddress"): + result["short_address"] = place["shortFormattedAddress"] + + # Contact + if place.get("nationalPhoneNumber"): + result["phone_local"] = place["nationalPhoneNumber"] + if place.get("internationalPhoneNumber"): + result["phone_international"] = 
place["internationalPhoneNumber"] + if place.get("websiteUri"): + result["website"] = place["websiteUri"] + + # Business info + if place.get("types"): + result["google_place_types"] = place["types"] + if place.get("primaryType"): + result["primary_type"] = place["primaryType"] + if place.get("businessStatus"): + result["business_status"] = place["businessStatus"] + + # Ratings and reviews + if place.get("rating") is not None: + result["rating"] = place["rating"] + if place.get("userRatingCount") is not None: + result["total_ratings"] = place["userRatingCount"] + + # Parse reviews + reviews = place.get("reviews", []) + if reviews: + result["reviews"] = [ + { + "author_name": r.get("authorAttribution", {}).get("displayName"), + "author_uri": r.get("authorAttribution", {}).get("uri"), + "rating": r.get("rating"), + "relative_time_description": r.get("relativePublishTimeDescription"), + "text": r.get("text", {}).get("text"), + "publish_time": r.get("publishTime"), + } + for r in reviews + ] + + # Opening hours + if place.get("regularOpeningHours"): + result["opening_hours"] = { + "open_now": place.get("currentOpeningHours", {}).get("openNow"), + "weekday_text": place["regularOpeningHours"].get("weekdayDescriptions"), + } + + # Editorial summary + if place.get("editorialSummary"): + result["editorial_summary"] = place["editorialSummary"].get("text") + + # Photos (just references, not downloading) + photos = place.get("photos", []) + if photos: + result["photo_count"] = len(photos) + result["photos_metadata"] = [ + { + "name": p.get("name"), + "height": p.get("heightPx"), + "width": p.get("widthPx"), + } + for p in photos[:5] # First 5 only + ] + + # Links + if place.get("googleMapsUri"): + result["google_maps_url"] = place["googleMapsUri"] + + return result + + +# ============================================================================ +# YouTube Enrichment +# ============================================================================ + +def search_youtube_channel( 
+ query: str, + client: httpx.Client, +) -> Optional[Dict[str, Any]]: + """Search for a YouTube channel.""" + if not GOOGLE_YOUTUBE_TOKEN: + logger.warning("GOOGLE_YOUTUBE_TOKEN not set, skipping YouTube enrichment") + return None + + params = { + "part": "snippet", + "type": "channel", + "q": query, + "maxResults": 3, # Get top 3 for verification + "key": GOOGLE_YOUTUBE_TOKEN, + } + + try: + response = client.get( + f"{YOUTUBE_API_BASE}/search", + params=params, + timeout=30.0 + ) + response.raise_for_status() + data = response.json() + + items = data.get("items", []) + if items: + # Return all candidates for LLM verification + return {"candidates": items, "query": query} + return None + + except httpx.HTTPStatusError as e: + if "quotaExceeded" in str(e): + logger.error("YouTube API quota exceeded") + else: + logger.error(f"YouTube API error: {e}") + return None + except Exception as e: + logger.error(f"Error searching YouTube for '{query}': {e}") + return None + + +def get_youtube_channel_details( + channel_id: str, + client: httpx.Client, +) -> Optional[Dict[str, Any]]: + """Get detailed channel information.""" + if not GOOGLE_YOUTUBE_TOKEN: + return None + + params = { + "part": "snippet,statistics,brandingSettings,contentDetails", + "id": channel_id, + "key": GOOGLE_YOUTUBE_TOKEN, + } + + try: + response = client.get( + f"{YOUTUBE_API_BASE}/channels", + params=params, + timeout=30.0 + ) + response.raise_for_status() + data = response.json() + + items = data.get("items", []) + if items: + return items[0] + return None + + except Exception as e: + logger.error(f"Error getting channel details for '{channel_id}': {e}") + return None + + +def parse_youtube_channel(channel: Dict[str, Any]) -> Dict[str, Any]: + """Parse YouTube channel API response.""" + snippet = channel.get("snippet", {}) + stats = channel.get("statistics", {}) + branding = channel.get("brandingSettings", {}) + + result = { + "channel_id": channel.get("id", ""), + "channel_url": 
f"https://www.youtube.com/channel/{channel.get('id', '')}", + "title": snippet.get("title", ""), + "description": snippet.get("description", ""), + "custom_url": snippet.get("customUrl", ""), + "published_at": snippet.get("publishedAt", ""), + "country": snippet.get("country", ""), + "fetch_timestamp": datetime.now(timezone.utc).isoformat(), + } + + # Statistics + if stats.get("subscriberCount"): + result["subscriber_count"] = int(stats["subscriberCount"]) + if stats.get("videoCount"): + result["video_count"] = int(stats["videoCount"]) + if stats.get("viewCount"): + result["view_count"] = int(stats["viewCount"]) + + # Thumbnails + thumbnails = snippet.get("thumbnails", {}) + if thumbnails.get("high", {}).get("url"): + result["thumbnail_url"] = thumbnails["high"]["url"] + + return result + + +# ============================================================================ +# Z.AI GLM 4.6 Verification with Exponential Backoff (CH-Annotator) +# ============================================================================ + +MAX_RETRIES = 3 +BASE_DELAY = 1.0 # seconds +MAX_DELAY = 30.0 # seconds + + +async def call_glm_with_retry( + prompt: str, + max_retries: int = MAX_RETRIES, +) -> Optional[str]: + """ + Call Z.AI GLM 4.6 API with exponential backoff retry. + + Uses Anthropic-compatible interface at api.z.ai. 
async def call_glm_with_retry(
    prompt: str,
    max_retries: int = MAX_RETRIES,
) -> Optional[str]:
    """
    Call Z.AI GLM 4.6 API with exponential backoff retry.

    Uses Anthropic-compatible interface at api.z.ai.

    Returns:
        Response content string or None if all retries fail
    """
    request_headers = {
        "x-api-key": ZAI_API_TOKEN,
        "anthropic-version": "2023-06-01",
        "Content-Type": "application/json",
    }
    payload = {
        "model": ZAI_MODEL,
        "max_tokens": 500,
        "messages": [{"role": "user", "content": prompt}],
    }

    # Attempts are 1-based here; only HTTP 429 triggers a retry, everything
    # else fails fast and returns None.
    for attempt in range(1, max_retries + 1):
        try:
            async with httpx.AsyncClient() as session:
                reply = await session.post(
                    f"{ZAI_API_BASE}/messages",
                    headers=request_headers,
                    json=payload,
                    timeout=60.0,
                )
                reply.raise_for_status()
                data = reply.json()

                # Anthropic-compatible response format: list of content blocks
                blocks = data.get("content", [])
                if blocks and blocks[0].get("type") == "text":
                    return blocks[0].get("text", "")
                return ""

        except httpx.HTTPStatusError as e:
            if e.response.status_code != 429:
                logger.error(f"GLM 4.6 API error: {e}")
                return None
            # Rate limited - exponential backoff (1s, 2s, 4s, ... capped)
            delay = min(BASE_DELAY * (2 ** (attempt - 1)), MAX_DELAY)
            logger.warning(f"Rate limited, waiting {delay:.1f}s (attempt {attempt}/{max_retries})")
            await asyncio.sleep(delay)
        except Exception as e:
            logger.error(f"GLM 4.6 API call failed: {e}")
            return None

    logger.error(f"All {max_retries} GLM 4.6 API retries exhausted")
    return None


async def verify_match_with_llm(
    institution_name: str,
    institution_info: Dict[str, Any],
    candidate_name: str,
    candidate_info: Dict[str, Any],
    match_type: str,  # "google_maps" or "youtube"
) -> Dict[str, Any]:
    """
    Use Z.AI GLM 4.6 to verify if a candidate match is correct.

    Returns:
        Dict with keys:
            - is_match: bool
            - confidence: float (0.0-1.0)
            - reasoning: str
            - agent: str (model version)
    """
    if not ZAI_API_TOKEN:
        logger.warning("ZAI_API_TOKEN not set, skipping LLM verification")
        return {
            "is_match": None,
            "confidence": 0.5,
            "reasoning": "LLM verification skipped - no API key",
            "agent": "none",
            "verified": False,
        }

    # Build the task-specific verification prompt.
    if match_type == "google_maps":
        prompt = f"""You are verifying if a Google Maps place matches a heritage institution.

INSTITUTION:
- Name: {institution_name}
- Wikidata: {institution_info.get('wikidata_id', 'N/A')}
- City: {institution_info.get('city', 'N/A')}
- Country: {institution_info.get('country', 'N/A')}
- Type: {institution_info.get('type', 'N/A')}

GOOGLE MAPS CANDIDATE:
- Name: {candidate_name}
- Address: {candidate_info.get('formatted_address', 'N/A')}
- Types: {candidate_info.get('google_place_types', 'N/A')}
- Website: {candidate_info.get('website', 'N/A')}

Is this Google Maps place the same institution? Consider:
1. Name similarity (allowing for translations/abbreviations)
2. Location consistency
3. Type consistency (archive, museum, library, etc.)

Respond in JSON format:
{{"is_match": true/false, "confidence": 0.0-1.0, "reasoning": "..."}}
"""
    else:  # youtube
        prompt = f"""You are verifying if a YouTube channel belongs to a heritage institution.

INSTITUTION:
- Name: {institution_name}
- Wikidata: {institution_info.get('wikidata_id', 'N/A')}
- City: {institution_info.get('city', 'N/A')}
- Country: {institution_info.get('country', 'N/A')}
- Type: {institution_info.get('type', 'N/A')}

YOUTUBE CHANNEL CANDIDATE:
- Title: {candidate_name}
- Description: {candidate_info.get('description', 'N/A')[:500]}
- Country: {candidate_info.get('country', 'N/A')}
- Subscribers: {candidate_info.get('subscriber_count', 'N/A')}

Is this YouTube channel the official channel of this institution? Consider:
1. Name similarity
2. Description relevance to heritage/archives/museums
3. Location consistency

Respond in JSON format:
{{"is_match": true/false, "confidence": 0.0-1.0, "reasoning": "..."}}
"""

    reply = await call_glm_with_retry(prompt)

    if reply is None:
        return {
            "is_match": None,
            "confidence": 0.5,
            "reasoning": "LLM verification failed - API error",
            "agent": ZAI_MODEL,
            "verified": False,
        }

    # Prefer the structured JSON the model was asked to produce.
    blob = re.search(r'\{[^}]+\}', reply, re.DOTALL)
    if blob is not None:
        try:
            verdict = json.loads(blob.group())
        except json.JSONDecodeError:
            verdict = None
        if verdict is not None:
            verdict["agent"] = ZAI_MODEL
            verdict["verified"] = True
            verdict["ch_annotator_version"] = CH_ANNOTATOR_VERSION
            return verdict

    # Fallback heuristic when no parseable JSON came back.
    lowered = reply.lower()
    matched = "true" in lowered and "false" not in lowered
    return {
        "is_match": matched,
        "confidence": 0.7 if matched else 0.3,
        "reasoning": reply[:200],
        "agent": ZAI_MODEL,
        "verified": True,
        "ch_annotator_version": CH_ANNOTATOR_VERSION,
    }
+ + Returns: + Tuple of (modified: bool, status: str) + """ + logger.info(f"Processing: {filepath.name}") + + # Load YAML + with open(filepath, 'r', encoding='utf-8') as f: + entry = yaml.safe_load(f) + + if not entry: + return False, "Empty file" + + modified = False + statuses = [] + + # Check if already enriched + has_maps = entry.get("google_maps_enrichment") is not None + has_youtube = entry.get("youtube_enrichment") is not None + + if not force and has_maps and has_youtube: + return False, "Already enriched (use --force to re-enrich)" + + # Extract info for matching + institution_name = get_institution_name(entry) + if not institution_name: + return False, "No institution name found" + + country_code = get_country_code(entry) + city_name = get_city_name(entry) + coords = get_coordinates(entry) + wikidata_id = get_wikidata_id(entry) + + institution_info = { + "wikidata_id": wikidata_id, + "city": city_name, + "country": country_code, + "type": entry.get("wikidata_enrichment", {}).get("instance_of", ""), + } + + logger.info(f" Institution: {institution_name}") + logger.info(f" Location: {city_name}, {country_code}") + + # ------------------------------------------------------------------------- + # Google Maps Enrichment + # ------------------------------------------------------------------------- + if not has_maps or force: + query = build_maps_search_query(entry) + logger.info(f" Maps query: {query}") + + time.sleep(REQUEST_DELAY) + place = search_google_place(query, client, country_code, coords) + + if place: + maps_data = parse_google_place(place) + candidate_name = maps_data.get("name", "") + logger.info(f" Maps found: {candidate_name}") + + # LLM verification + verification = await verify_match_with_llm( + institution_name, + institution_info, + candidate_name, + maps_data, + "google_maps" + ) + + if verification.get("is_match") is True: + maps_data["llm_verification"] = verification + entry["google_maps_enrichment"] = maps_data + 
entry["google_maps_status"] = "SUCCESS" + modified = True + statuses.append(f"Maps: {candidate_name} (conf: {verification.get('confidence', 0):.2f})") + logger.info(f" ✓ Maps verified: {verification.get('reasoning', '')[:60]}") + elif verification.get("is_match") is False: + entry["google_maps_status"] = "NO_MATCH" + entry["google_maps_rejected"] = { + "candidate_name": candidate_name, + "rejection_reason": verification.get("reasoning", ""), + "timestamp": datetime.now(timezone.utc).isoformat(), + } + modified = True + statuses.append("Maps: rejected by LLM") + logger.info(f" ✗ Maps rejected: {verification.get('reasoning', '')[:60]}") + else: + # Verification skipped or failed - include with warning + maps_data["llm_verification"] = verification + entry["google_maps_enrichment"] = maps_data + entry["google_maps_status"] = "UNVERIFIED" + modified = True + statuses.append(f"Maps: {candidate_name} (unverified)") + else: + entry["google_maps_status"] = "NOT_FOUND" + entry["google_maps_search_query"] = query + entry["google_maps_search_timestamp"] = datetime.now(timezone.utc).isoformat() + modified = True + statuses.append("Maps: not found") + + # ------------------------------------------------------------------------- + # YouTube Enrichment + # ------------------------------------------------------------------------- + if not has_youtube or force: + # Build YouTube search query + youtube_query = f"{institution_name} official" + logger.info(f" YouTube query: {youtube_query}") + + time.sleep(REQUEST_DELAY) + search_result = search_youtube_channel(youtube_query, client) + + if search_result and search_result.get("candidates"): + candidates = search_result["candidates"] + logger.info(f" YouTube candidates: {len(candidates)}") + + # Try each candidate + best_match = None + best_verification = None + + for candidate in candidates[:3]: # Top 3 candidates + channel_id = candidate.get("id", {}).get("channelId") + if not channel_id: + continue + + # Get full channel details + 
time.sleep(REQUEST_DELAY) + channel_details = get_youtube_channel_details(channel_id, client) + + if not channel_details: + continue + + youtube_data = parse_youtube_channel(channel_details) + candidate_name = youtube_data.get("title", "") + + # LLM verification + verification = await verify_match_with_llm( + institution_name, + institution_info, + candidate_name, + youtube_data, + "youtube" + ) + + if verification.get("is_match") is True: + if best_verification is None or verification.get("confidence", 0) > best_verification.get("confidence", 0): + best_match = youtube_data + best_verification = verification + logger.info(f" YouTube match: {candidate_name} (conf: {verification.get('confidence', 0):.2f})") + + if best_match: + best_match["llm_verification"] = best_verification + entry["youtube_enrichment"] = best_match + entry["youtube_status"] = "SUCCESS" + modified = True + statuses.append(f"YouTube: {best_match.get('title', '')} ({best_match.get('subscriber_count', 0)} subs)") + else: + entry["youtube_status"] = "NO_MATCH" + entry["youtube_search_query"] = youtube_query + entry["youtube_candidates_rejected"] = len(candidates) + entry["youtube_search_timestamp"] = datetime.now(timezone.utc).isoformat() + modified = True + statuses.append("YouTube: no verified match") + else: + entry["youtube_status"] = "NOT_FOUND" + entry["youtube_search_query"] = youtube_query + entry["youtube_search_timestamp"] = datetime.now(timezone.utc).isoformat() + modified = True + statuses.append("YouTube: not found") + + # ------------------------------------------------------------------------- + # Add provenance note + # ------------------------------------------------------------------------- + if modified: + if "provenance" not in entry: + entry["provenance"] = {} + if "notes" not in entry["provenance"]: + entry["provenance"]["notes"] = [] + + timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + entry["provenance"]["notes"].append( + f"YouTube/Google Maps 
enrichment {timestamp}: {'; '.join(statuses)}" + ) + + # ------------------------------------------------------------------------- + # Save file + # ------------------------------------------------------------------------- + if modified and not dry_run: + with open(filepath, 'w', encoding='utf-8') as f: + yaml.dump(entry, f, default_flow_style=False, allow_unicode=True, sort_keys=False) + logger.info(f" Saved: {filepath.name}") + + status = "; ".join(statuses) if statuses else "No changes" + return modified, status + + +async def main(): + """Main entry point.""" + parser = argparse.ArgumentParser( + description="Enrich custodian files with YouTube and Google Maps data" + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Don't save changes, just show what would be done" + ) + parser.add_argument( + "--force", + action="store_true", + help="Re-enrich even if already enriched" + ) + parser.add_argument( + "--limit", + type=int, + default=None, + help="Limit number of files to process" + ) + parser.add_argument( + "--files", + nargs="+", + help="Specific files to process (just filenames)" + ) + parser.add_argument( + "--pattern", + type=str, + default=None, + help="Glob pattern for files (e.g., 'ZA-*.yaml')" + ) + + args = parser.parse_args() + + # Check for required API keys + if not GOOGLE_PLACES_TOKEN and not GOOGLE_YOUTUBE_TOKEN: + logger.error("No API keys found! 
Set GOOGLE_PLACES_TOKEN or GOOGLE_YOUTUBE_TOKEN") + sys.exit(1) + + # Find files to process + if args.files: + files = [CUSTODIAN_DIR / f for f in args.files] + files = [f for f in files if f.exists()] + elif args.pattern: + files = sorted(CUSTODIAN_DIR.glob(args.pattern)) + else: + files = sorted(CUSTODIAN_DIR.glob("*.yaml")) + + if args.limit: + files = files[:args.limit] + + logger.info(f"Found {len(files)} files to process") + + if args.dry_run: + logger.info("DRY RUN - no files will be modified") + + # Process files + results = {"modified": 0, "skipped": 0, "errors": 0} + + with httpx.Client(timeout=60.0) as client: + for filepath in files: + try: + modified, status = await enrich_custodian_file( + filepath, client, args.force, args.dry_run + ) + if modified: + results["modified"] += 1 + else: + results["skipped"] += 1 + logger.info(f" Status: {status}") + except Exception as e: + logger.error(f"Error processing {filepath.name}: {e}") + results["errors"] += 1 + + # Rate limiting between files + time.sleep(REQUEST_DELAY) + + # Summary + logger.info("=" * 60) + logger.info(f"SUMMARY: {results['modified']} modified, {results['skipped']} skipped, {results['errors']} errors") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/scripts/enrich_descriptions.py b/scripts/enrich_descriptions.py new file mode 100644 index 0000000000..63f4f50f14 --- /dev/null +++ b/scripts/enrich_descriptions.py @@ -0,0 +1,386 @@ +#!/usr/bin/env python3 +""" +Enrich custodian descriptions using available data sources and GLM-4.6. + +This script: +1. Finds custodian files with placeholder descriptions +2. Gathers available data (Wikidata, Google Maps, UNESCO MoW, etc.) +3. Uses GLM-4.6 to generate a rich description +4. 
Updates the file with the new description + +Usage: + python enrich_descriptions.py --limit 10 # Process 10 files + python enrich_descriptions.py --dry-run # Show what would be done + python enrich_descriptions.py --all # Process all files +""" + +import asyncio +import argparse +import os +import re +import json +from pathlib import Path +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional +import httpx +from ruamel.yaml import YAML + +# Load environment +from dotenv import load_dotenv +load_dotenv() + +# Constants +DATA_DIR = Path(__file__).parent.parent / "data" / "custodian" +PLACEHOLDER_DESCRIPTION = "Heritage institution holding UNESCO Memory of the World inscribed documents" + +# Z.AI GLM API configuration +ZAI_API_URL = "https://api.z.ai/api/coding/paas/v4/chat/completions" + + +class DescriptionEnricher: + """Enrich custodian descriptions using GLM-4.6.""" + + SYSTEM_PROMPT = """You are a cultural heritage expert writing descriptions for heritage institutions. + +Your task is to create a concise, informative description (2-4 sentences) for a heritage institution based on the available data. + +## Guidelines +- Focus on what makes the institution significant +- Include the type of collections if known (manuscripts, archives, art, etc.) +- Mention UNESCO Memory of the World inscriptions if present +- Include location context when relevant +- Use formal, encyclopedic tone +- Do NOT invent information not present in the data +- Keep descriptions under 100 words + +## Output Format +Provide ONLY the description text, no quotes or formatting. +""" + + def __init__(self, model: str = "glm-4.6", dry_run: bool = False): + self.api_key = os.environ.get("ZAI_API_TOKEN") + if not self.api_key: + raise ValueError("ZAI_API_TOKEN not found in environment. 
See docs/GLM_API_SETUP.md") + + self.model = model + self.dry_run = dry_run + self.yaml = YAML() + self.yaml.preserve_quotes = True + self.yaml.default_flow_style = False + self.yaml.width = 4096 # Prevent line wrapping + + self.client = httpx.AsyncClient( + timeout=60.0, + headers={ + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + ) + + self.stats = { + "processed": 0, + "enriched": 0, + "skipped": 0, + "errors": 0, + } + + async def close(self): + """Close the HTTP client.""" + await self.client.aclose() + + def find_files_with_placeholder(self, limit: Optional[int] = None) -> List[Path]: + """Find custodian files with placeholder descriptions.""" + files = [] + + for yaml_file in DATA_DIR.glob("*.yaml"): + try: + with open(yaml_file, 'r', encoding='utf-8') as f: + data = self.yaml.load(f) + + if not data: + continue + + # Check for placeholder in wikidata_enrichment.wikidata_description_en + wd_desc = data.get('wikidata_enrichment', {}).get('wikidata_description_en', '') + if PLACEHOLDER_DESCRIPTION in str(wd_desc): + files.append(yaml_file) + if limit and len(files) >= limit: + break + + except Exception as e: + print(f"Error reading {yaml_file}: {e}") + + return files + + def gather_context(self, data: Dict[str, Any]) -> Dict[str, Any]: + """Gather all available context from the entry.""" + context = { + "name": None, + "type": None, + "location": {}, + "wikidata": {}, + "google_maps": {}, + "unesco_mow": {}, + "collections": [], + } + + # Name from various sources + if 'custodian_name' in data: + context['name'] = data['custodian_name'].get('claim_value') + elif 'wikidata_enrichment' in data: + context['name'] = data['wikidata_enrichment'].get('wikidata_label_en') + elif 'original_entry' in data: + context['name'] = data['original_entry'].get('name') or data['original_entry'].get('organisatie') + + # Institution type + if 'wikidata_enrichment' in data: + context['type'] = data['wikidata_enrichment'].get('instance_of') + 
+ # Location from GHCID + if 'ghcid' in data: + loc_res = data['ghcid'].get('location_resolution', {}) + context['location'] = { + "city": loc_res.get('city_label'), + "country": loc_res.get('country_label'), + "region": loc_res.get('region_code'), + } + + # Wikidata data + if 'wikidata_enrichment' in data: + wd = data['wikidata_enrichment'] + context['wikidata'] = { + "qid": wd.get('wikidata_entity_id'), + "instance_of": wd.get('instance_of'), + } + + # Google Maps data + if 'google_maps_enrichment' in data: + gm = data['google_maps_enrichment'] + context['google_maps'] = { + "name": gm.get('name'), + "types": gm.get('google_place_types', []), + "address": gm.get('formatted_address'), + "primary_type": gm.get('primary_type'), + } + + # UNESCO Memory of the World + if 'unesco_mow_enrichment' in data: + mow = data['unesco_mow_enrichment'] + context['unesco_mow'] = { + "is_custodian": mow.get('is_mow_custodian', False), + "inscription_count": mow.get('inscription_count', 0), + "inscriptions": [ + {"name": i.get('name'), "country": i.get('inscription_country')} + for i in mow.get('inscriptions', []) + ], + } + + return context + + def build_prompt(self, context: Dict[str, Any]) -> str: + """Build a prompt for GLM based on available context.""" + parts = [f"Institution: {context['name']}"] + + if context['type']: + parts.append(f"Type: {context['type']}") + + if context['location'].get('city'): + loc = context['location'] + loc_str = f"Location: {loc['city']}" + if loc.get('country'): + loc_str += f", {loc['country']}" + parts.append(loc_str) + + if context['google_maps'].get('types'): + parts.append(f"Google Maps Types: {', '.join(context['google_maps']['types'])}") + + if context['unesco_mow'].get('is_custodian'): + mow = context['unesco_mow'] + inscriptions = mow.get('inscriptions', []) + if inscriptions: + inscription_names = [i['name'] for i in inscriptions[:3]] # Limit to 3 + parts.append(f"UNESCO Memory of the World inscriptions held: {', 
'.join(inscription_names)}") + if mow['inscription_count'] > 3: + parts.append(f"(Total: {mow['inscription_count']} inscriptions)") + + if context['wikidata'].get('qid'): + parts.append(f"Wikidata ID: {context['wikidata']['qid']}") + + return "\n".join(parts) + + async def generate_description(self, context: Dict[str, Any]) -> Optional[str]: + """Generate a description using GLM-4.6.""" + prompt = self.build_prompt(context) + + try: + response = await self.client.post( + ZAI_API_URL, + json={ + "model": self.model, + "messages": [ + {"role": "system", "content": self.SYSTEM_PROMPT}, + {"role": "user", "content": prompt} + ], + "temperature": 0.3, + "max_tokens": 1024, # GLM-4.6 needs room for reasoning + content + } + ) + + if response.status_code != 200: + print(f" API Error: {response.status_code}") + print(f" Response: {response.text[:500]}") + return None + + result = response.json() + + if "choices" not in result or len(result["choices"]) == 0: + print(f" No choices in response") + return None + + content = result["choices"][0]["message"]["content"] + + if not content or content.strip() == "": + # GLM-4.6 sometimes puts content in reasoning_content + reasoning = result["choices"][0]["message"].get("reasoning_content", "") + if reasoning: + print(f" Warning: Content was empty, model only provided reasoning") + return None + + # Clean up the response + content = content.strip().strip('"').strip("'") + + return content + + except httpx.HTTPStatusError as e: + print(f" HTTP Error: {e.response.status_code}") + return None + except Exception as e: + print(f" Error calling GLM API: {type(e).__name__}: {e}") + return None + + async def enrich_file(self, file_path: Path) -> bool: + """Enrich a single file with a better description.""" + try: + with open(file_path, 'r', encoding='utf-8') as f: + data = self.yaml.load(f) + + if not data: + return False + + # Gather context + context = self.gather_context(data) + + if not context['name']: + print(f" Skipping 
{file_path.name}: No name found") + self.stats['skipped'] += 1 + return False + + print(f" Processing: {context['name']}") + + if self.dry_run: + print(f" [DRY RUN] Would generate description from context:") + print(f" - Type: {context['type']}") + print(f" - Location: {context['location'].get('city')}, {context['location'].get('country')}") + if context['unesco_mow'].get('is_custodian'): + print(f" - UNESCO MoW inscriptions: {context['unesco_mow']['inscription_count']}") + return True + + # Generate new description + new_description = await self.generate_description(context) + + if not new_description: + print(f" Failed to generate description") + self.stats['errors'] += 1 + return False + + print(f" Generated: {new_description[:80]}...") + + # Update the file + if 'wikidata_enrichment' not in data: + data['wikidata_enrichment'] = {} + + data['wikidata_enrichment']['wikidata_description_en'] = new_description + data['wikidata_enrichment']['description_enrichment'] = { + 'method': 'glm-4.6', + 'timestamp': datetime.now(timezone.utc).isoformat(), + 'source_data': ['wikidata', 'google_maps', 'unesco_mow'], + } + + # Write back + with open(file_path, 'w', encoding='utf-8') as f: + self.yaml.dump(data, f) + + self.stats['enriched'] += 1 + return True + + except Exception as e: + print(f" Error processing {file_path.name}: {e}") + self.stats['errors'] += 1 + return False + + async def run(self, limit: Optional[int] = None): + """Run the enrichment process.""" + print(f"Finding files with placeholder descriptions...") + files = self.find_files_with_placeholder(limit) + print(f"Found {len(files)} files to process") + + if not files: + print("No files need enrichment.") + return + + for i, file_path in enumerate(files, 1): + print(f"\n[{i}/{len(files)}] {file_path.name}") + await self.enrich_file(file_path) + self.stats['processed'] += 1 + + # Small delay between API calls + if not self.dry_run: + await asyncio.sleep(0.5) + + await self.close() + + # Print summary + 
print("\n" + "=" * 50) + print("SUMMARY") + print("=" * 50) + print(f"Processed: {self.stats['processed']}") + print(f"Enriched: {self.stats['enriched']}") + print(f"Skipped: {self.stats['skipped']}") + print(f"Errors: {self.stats['errors']}") + + +async def main(): + parser = argparse.ArgumentParser( + description="Enrich custodian descriptions using GLM-4.6" + ) + parser.add_argument( + "--limit", "-n", type=int, default=10, + help="Maximum number of files to process (default: 10)" + ) + parser.add_argument( + "--dry-run", "-d", action="store_true", + help="Show what would be done without making changes" + ) + parser.add_argument( + "--all", "-a", action="store_true", + help="Process all files (ignores --limit)" + ) + parser.add_argument( + "--model", "-m", type=str, default="glm-4.6", + help="GLM model to use (default: glm-4.6)" + ) + + args = parser.parse_args() + + limit = None if args.all else args.limit + + enricher = DescriptionEnricher( + model=args.model, + dry_run=args.dry_run, + ) + + await enricher.run(limit=limit) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/scripts/load_custodians_to_ducklake.py b/scripts/load_custodians_to_ducklake.py index 3900abfe50..7099f3f195 100644 --- a/scripts/load_custodians_to_ducklake.py +++ b/scripts/load_custodians_to_ducklake.py @@ -76,6 +76,8 @@ def extract_top_level_fields(data: dict) -> dict: # Custodian name consensus "custodian_name": "", "custodian_name_confidence": None, + "emic_name": "", # Official name in native/local language + "name_language": "", # ISO 639-1 language code for emic_name # Ratings "google_rating": None, @@ -87,10 +89,44 @@ def extract_top_level_fields(data: dict) -> dict: "timespan_notes": "", "timespan_json": "", + # Conflict-related temporal data (Palestinian heritage, etc.) 
+ "time_of_destruction_json": "", + "conflict_status_json": "", + "destruction_date": None, # From time_of_destruction.date or conflict_status.date + + # Temporal extent (founding/dissolution dates) + "founding_date": None, + "dissolution_date": None, + "temporal_extent_json": "", + + # Wikidata inception (P571) + "wikidata_inception": None, + + # YouTube enrichment fields (extracted for querying) + "youtube_channel_id": "", + "youtube_channel_title": "", + "youtube_channel_url": "", + "youtube_subscriber_count": None, + "youtube_video_count": None, + "youtube_view_count": None, + "youtube_published_at": None, + "youtube_description": "", + + # Google Maps extended fields (in addition to rating/total_ratings) + "google_place_id": "", + "google_business_status": "", + "google_website": "", + "google_phone_international": "", + "google_primary_type": "", + "google_opening_hours_json": "", + "google_reviews_json": "", + "google_photo_count": None, + # Complex nested objects as JSON strings "original_entry_json": "", "wikidata_enrichment_json": "", "google_maps_enrichment_json": "", + "youtube_enrichment_json": "", "web_enrichment_json": "", "web_claims_json": "", "ghcid_json": "", @@ -98,6 +134,7 @@ def extract_top_level_fields(data: dict) -> dict: "provenance_json": "", "genealogiewerkbalk_json": "", "digital_platforms_json": "", + "service_area_json": "", } # Extract GHCID @@ -172,12 +209,49 @@ def extract_top_level_fields(data: dict) -> dict: } record["org_type"] = type_map.get(type_code, type_code) - # Extract Google Maps data + # ========================================================================== + # COORDINATE EXTRACTION - Priority order (first valid wins) + # ========================================================================== + # 1a. google_maps_enrichment.coordinates.latitude/longitude (nested) + # 1b. google_maps_enrichment.latitude/longitude (flat - Argentine files) + # 2. ghcid.location_resolution.source_coordinates.latitude/longitude + # 3. 
wikidata_enrichment.wikidata_coordinates.latitude/longitude + # 4. locations[0].latitude/longitude OR locations[0].lat/lon + # 5. original_entry.locations[0].latitude/longitude + # 6. root-level latitude/longitude + # ========================================================================== + + # Helper to check if coordinates are valid + def is_valid_coord(lat, lon): + if lat is None or lon is None: + return False + try: + lat_f = float(lat) + lon_f = float(lon) + return -90 <= lat_f <= 90 and -180 <= lon_f <= 180 + except (ValueError, TypeError): + return False + + # 1. Extract Google Maps data (highest priority for coordinates) gm = data.get("google_maps_enrichment", {}) if gm: + # 1a. Try nested structure first: google_maps_enrichment.coordinates.latitude coords = gm.get("coordinates", {}) - record["latitude"] = coords.get("latitude") - record["longitude"] = coords.get("longitude") + lat = coords.get("latitude") + lon = coords.get("longitude") + if is_valid_coord(lat, lon): + record["latitude"] = lat + record["longitude"] = lon + + # 1b. 
Fallback to flat structure: google_maps_enrichment.latitude + # (used by Argentine and other recent enrichments) + if record["latitude"] is None: + lat = gm.get("latitude") + lon = gm.get("longitude") + if is_valid_coord(lat, lon): + record["latitude"] = lat + record["longitude"] = lon + record["formatted_address"] = gm.get("formatted_address", "") record["google_rating"] = gm.get("rating") record["google_total_ratings"] = gm.get("total_ratings") @@ -193,8 +267,68 @@ def extract_top_level_fields(data: dict) -> dict: record["postal_code"] = comp.get("long_name", "") record["google_maps_enrichment_json"] = json.dumps(gm, ensure_ascii=False, default=str) + + # Extract extended Google Maps fields + record["google_place_id"] = gm.get("place_id", "") + record["google_business_status"] = gm.get("business_status", "") + record["google_website"] = gm.get("website", "") + record["google_phone_international"] = gm.get("phone_international", "") + record["google_primary_type"] = gm.get("primary_type", "") + record["google_photo_count"] = gm.get("photo_count") + + # Opening hours as JSON (complex nested structure) + if gm.get("opening_hours"): + record["google_opening_hours_json"] = json.dumps( + gm["opening_hours"], ensure_ascii=False, default=str + ) + + # Reviews as JSON array + if gm.get("reviews"): + record["google_reviews_json"] = json.dumps( + gm["reviews"], ensure_ascii=False, default=str + ) - # Fallback: Extract location from locations array if not set from Google Maps + # ========================================================================== + # YOUTUBE ENRICHMENT EXTRACTION + # ========================================================================== + yt = data.get("youtube_enrichment", {}) + if yt: + record["youtube_enrichment_json"] = json.dumps(yt, ensure_ascii=False, default=str) + + # Extract channel data + channel = yt.get("channel", {}) + if channel: + record["youtube_channel_id"] = channel.get("channel_id", "") + record["youtube_channel_title"] = 
channel.get("title", "") + record["youtube_channel_url"] = channel.get("channel_url", "") + record["youtube_subscriber_count"] = channel.get("subscriber_count") + record["youtube_video_count"] = channel.get("video_count") + record["youtube_view_count"] = channel.get("view_count") + record["youtube_published_at"] = channel.get("published_at") + record["youtube_description"] = channel.get("description", "") + + # 2. Fallback: GHCID location_resolution.source_coordinates + ghcid = data.get("ghcid", {}) + if ghcid and record["latitude"] is None: + loc_res = ghcid.get("location_resolution", {}) + src_coords = loc_res.get("source_coordinates", {}) + lat = src_coords.get("latitude") + lon = src_coords.get("longitude") + if is_valid_coord(lat, lon): + record["latitude"] = lat + record["longitude"] = lon + + # 3. Fallback: Wikidata coordinates + wd = data.get("wikidata_enrichment", {}) + if wd and record["latitude"] is None: + wd_coords = wd.get("wikidata_coordinates", {}) + lat = wd_coords.get("latitude") + lon = wd_coords.get("longitude") + if is_valid_coord(lat, lon): + record["latitude"] = lat + record["longitude"] = lon + + # 4. Fallback: locations array locations = data.get("locations", []) if locations and isinstance(locations, list) and len(locations) > 0: loc = locations[0] # Use first location @@ -202,13 +336,44 @@ def extract_top_level_fields(data: dict) -> dict: record["city"] = loc.get("city", "") if not record["country"] and loc.get("country"): record["country"] = loc.get("country", "") - if record["latitude"] is None and loc.get("latitude"): - record["latitude"] = loc.get("latitude") - if record["longitude"] is None and loc.get("longitude"): - record["longitude"] = loc.get("longitude") + + if record["latitude"] is None: + # Try latitude/longitude first, then lat/lon + lat = loc.get("latitude") or loc.get("lat") + lon = loc.get("longitude") or loc.get("lon") + if is_valid_coord(lat, lon): + record["latitude"] = lat + record["longitude"] = lon + + # 5. 
Fallback: original_entry.locations array (Japanese files, etc.) + orig_locations = original.get("locations", []) if original else [] + if orig_locations and isinstance(orig_locations, list) and len(orig_locations) > 0: + orig_loc = orig_locations[0] + if record["latitude"] is None: + lat = orig_loc.get("latitude") or orig_loc.get("lat") + lon = orig_loc.get("longitude") or orig_loc.get("lon") + if is_valid_coord(lat, lon): + record["latitude"] = lat + record["longitude"] = lon + # Also try to get city/country from original_entry.locations if not set + if not record["city"] and orig_loc.get("city"): + record["city"] = orig_loc.get("city", "") + if not record["country"] and orig_loc.get("country"): + record["country"] = orig_loc.get("country", "") + + # 6. Fallback: Root-level coordinates + if record["latitude"] is None: + lat = data.get("latitude") or data.get("lat") + lon = data.get("longitude") or data.get("lon") + if is_valid_coord(lat, lon): + record["latitude"] = lat + record["longitude"] = lon + + # ========================================================================== + # COUNTRY/CITY EXTRACTION - Fallbacks from GHCID + # ========================================================================== # Fallback: Extract country from GHCID location_resolution - ghcid = data.get("ghcid", {}) if ghcid and not record["country"]: loc_res = ghcid.get("location_resolution", {}) if loc_res.get("country_code"): @@ -264,6 +429,11 @@ def extract_top_level_fields(data: dict) -> dict: data["digital_platforms"], ensure_ascii=False, default=str ) + if data.get("service_area"): + record["service_area_json"] = json.dumps( + data["service_area"], ensure_ascii=False, default=str + ) + # Extract TimeSpan (CIDOC-CRM E52_Time-Span) timespan = data.get("timespan", {}) if timespan: @@ -273,6 +443,77 @@ def extract_top_level_fields(data: dict) -> dict: record["timespan_notes"] = timespan.get("notes", "") record["timespan_json"] = json.dumps(timespan, ensure_ascii=False, default=str) + 
# ========================================================================== + # TEMPORAL DATA EXTRACTION - Multiple paths + # ========================================================================== + + # Extract time_of_destruction (conflict-related: PS-GZ-*, PS-GZA-* files) + time_of_destruction = data.get("time_of_destruction", {}) + if time_of_destruction: + record["time_of_destruction_json"] = json.dumps(time_of_destruction, ensure_ascii=False, default=str) + # Extract destruction date + if time_of_destruction.get("date"): + record["destruction_date"] = time_of_destruction.get("date") + + # Extract conflict_status (current operational status) + conflict_status = data.get("conflict_status", {}) + if conflict_status: + record["conflict_status_json"] = json.dumps(conflict_status, ensure_ascii=False, default=str) + # If status is 'destroyed' and we don't have destruction_date yet, use this + if conflict_status.get("status") == "destroyed" and not record.get("destruction_date"): + record["destruction_date"] = conflict_status.get("date") + + # Extract temporal_extent (founding/dissolution dates) + temporal_extent = data.get("temporal_extent", {}) + if temporal_extent: + record["temporal_extent_json"] = json.dumps(temporal_extent, ensure_ascii=False, default=str) + record["founding_date"] = temporal_extent.get("founding_date") + record["dissolution_date"] = temporal_extent.get("dissolution_date") or temporal_extent.get("end_date") + + # Fallback: Check identifiers for temporal_extent + identifiers = data.get("identifiers", {}) + if identifiers and isinstance(identifiers, dict): + id_temporal = identifiers.get("temporal_extent", {}) + if id_temporal and not record.get("founding_date"): + record["founding_date"] = id_temporal.get("founding_date") + if id_temporal and not record.get("dissolution_date"): + record["dissolution_date"] = id_temporal.get("dissolution_date") or id_temporal.get("end_date") + # Also check for founding_year in identifiers + if 
identifiers.get("founding_year") and not record.get("founding_date"): + # Convert year to date format + record["founding_date"] = f"{identifiers['founding_year']}-01-01" + + # Extract wikidata_inception from wikidata_enrichment + wd = data.get("wikidata_enrichment", {}) + if wd: + # Direct wikidata_inception field + if wd.get("wikidata_inception"): + record["wikidata_inception"] = wd.get("wikidata_inception") + # Or from wikidata_claims.inception + elif wd.get("wikidata_claims", {}).get("inception"): + record["wikidata_inception"] = wd.get("wikidata_claims", {}).get("inception") + + # Fallback: Check web_enrichment claims for inception or founding_date + web_enrichment = data.get("web_enrichment", {}) + if web_enrichment and web_enrichment.get("claims"): + for claim in web_enrichment.get("claims", []): + claim_type = claim.get("claim_type", "") + if claim_type in ("inception", "founding_date") and not record.get("founding_date"): + record["founding_date"] = claim.get("claim_value") + break + + # Final consolidation: If we have timespan_begin but no founding_date, use it + if record.get("timespan_begin") and not record.get("founding_date"): + record["founding_date"] = record["timespan_begin"] + + # If we have timespan_end but no dissolution_date, use it + if record.get("timespan_end") and not record.get("dissolution_date"): + record["dissolution_date"] = record["timespan_end"] + + # If we have destruction_date but no dissolution_date, use it + if record.get("destruction_date") and not record.get("dissolution_date"): + record["dissolution_date"] = record["destruction_date"] + return record @@ -403,8 +644,21 @@ def main(): # Show sample record print("\nSample record (first):") sample = records[0] - for key in ["file_name", "ghcid_current", "custodian_name", "city", "country"]: - print(f" {key}: {sample.get(key, 'N/A')}") + for key in ["file_name", "ghcid_current", "custodian_name", "city", "country", + "google_rating", "youtube_channel_id"]: + value = sample.get(key, 
'N/A') + if value == "" or value is None: + value = "(empty)" + print(f" {key}: {value}") + + # Count non-empty enrichment fields + yt_count = sum(1 for r in records if r.get("youtube_channel_id")) + gm_count = sum(1 for r in records if r.get("google_place_id")) + coord_count = sum(1 for r in records if r.get("latitude") is not None) + print(f"\nEnrichment summary:") + print(f" With coordinates: {coord_count}/{len(records)}") + print(f" With Google Maps: {gm_count}/{len(records)}") + print(f" With YouTube: {yt_count}/{len(records)}") if args.dry_run: print("\n[DRY RUN] Would upload to DuckLake. Exiting without upload.")