feat(enrichment): add emic name enrichment and update CustodianName schema

- Add emic_name, name_language, standardized_name to CustodianName
- Add scripts for enriching custodian emic names from Wikidata
- Add YouTube and Google Maps enrichment scripts
- Update DuckLake loader for new schema fields
This commit is contained in:
kempersc 2025-12-08 14:58:50 +01:00
parent 35066eb5eb
commit 6a6557bbe8
5 changed files with 2160 additions and 11 deletions

View file

@ -114,6 +114,46 @@ classes:
See: .opencode/ABBREVIATION_SPECIAL_CHAR_RULE.md for complete documentation
===========================================================================
MANDATORY RULE: Diacritics MUST Be Normalized to ASCII in Abbreviations
===========================================================================
When generating abbreviations for GHCID, diacritics (accented characters)
MUST be normalized to their ASCII base letter equivalents. Only ASCII
uppercase letters (A-Z) are permitted in the abbreviation component.
RATIONALE:
1. URI/URL safety - Non-ASCII requires percent-encoding
2. Cross-system compatibility - ASCII is universally supported
3. Parsing consistency - No special character handling needed
4. Human readability - Easier to type and communicate
DIACRITICS TO NORMALIZE (examples by language):
- Czech: Č→C, Ř→R, Š→S, Ž→Z, Ě→E, Ů→U
- Polish: Ł→L, Ń→N, Ó→O, Ś→S, Ź→Z, Ż→Z, Ą→A, Ę→E
- German: Ä→A, Ö→O, Ü→U, ß→SS
- French: É→E, È→E, Ê→E, Ç→C, Ô→O
- Spanish: Ñ→N, Á→A, É→E, Í→I, Ó→O, Ú→U
- Nordic: Å→A, Ä→A, Ö→O, Ø→O, Æ→AE
EXAMPLES:
- "Vlastivědné muzeum" (Czech) → "VM" (the ě never reaches the abbreviation here; when an initial does carry a diacritic, e.g. Č, it must be folded to its ASCII base)
- "Österreichische Nationalbibliothek" (German) → "ON"
- "Bibliothèque nationale" (French) → "BN"
REAL-WORLD EXAMPLE:
- ❌ WRONG: CZ-VY-TEL-L-VHSPAOČRZS (contains Č)
- ✅ CORRECT: CZ-VY-TEL-L-VHSPAOCRZS (ASCII only)
IMPLEMENTATION:
```python
import unicodedata
normalized = unicodedata.normalize('NFD', text)
ascii_text = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
```
See: .opencode/ABBREVIATION_SPECIAL_CHAR_RULE.md for complete documentation
Can be generated by:
1. ReconstructionActivity (formal entity resolution) - was_generated_by link
2. Direct extraction (simple standardization) - no was_generated_by link

View file

@ -0,0 +1,557 @@
#!/usr/bin/env python3
"""
Enrich UNESCO MoW custodian files with proper CustodianName data.
This script:
1. Loads multilingual labels from Wikidata cache
2. Determines the appropriate emic (local language) name for each custodian
3. Updates custodian YAML files with:
- custodian_name.emic_name (local language name)
- custodian_name.name_language (ISO 639-1 code)
- custodian_name.standardized_name (same as emic_name for now)
4. Regenerates abbreviations from local language names if different
5. Updates GHCIDs and maintains history for changed abbreviations
Per AGENTS.md: The abbreviation and optional snake_case name suffix should be
derived from the emic name in the institution's official local/national language.
"""
import json
import yaml
import unicodedata
import re
from pathlib import Path
from datetime import datetime, timezone
from typing import Optional, Dict, List, Tuple
# Country to primary official language(s) mapping
# Format: country_name -> (primary_lang, fallback_langs)
# For multilingual countries, we'll check if the institution has a label in any official language
# Language codes are ISO 639-1 (two-letter) where one exists; a few entries
# use longer codes (e.g. 'tet', 'tpi', 'gil', 'pau', 'pap') where no
# two-letter code is available.
COUNTRY_LANGUAGE_MAP: Dict[str, Tuple[str, List[str]]] = {
    # Europe - Western
    'Germany': ('de', []),
    'Austria': ('de', []),
    'France': ('fr', []),
    'Netherlands': ('nl', []),
    'Belgium': ('nl', ['fr', 'de']),  # Check all three official languages
    'Luxembourg': ('lb', ['fr', 'de']),
    'Switzerland': ('de', ['fr', 'it', 'rm']),  # Check all four national languages
    'United Kingdom': ('en', []),
    'Ireland': ('en', ['ga']),
    # Europe - Northern
    'Norway': ('nb', ['nn', 'no']),  # Bokmål preferred, Nynorsk fallback
    'Sweden': ('sv', []),
    'Denmark': ('da', []),
    'Finland': ('fi', ['sv']),  # Swedish is also official
    'Iceland': ('is', []),
    # Europe - Southern
    'Spain': ('es', ['ca', 'eu', 'gl']),  # Regional languages
    'Portugal': ('pt', []),
    'Italy': ('it', []),
    'Greece': ('el', []),
    'Malta': ('mt', ['en']),
    'Cyprus': ('el', ['tr']),
    # Europe - Central/Eastern
    'Poland': ('pl', []),
    # Both spellings appear in upstream data, so both are mapped.
    'Czech Republic': ('cs', []),
    'Czechia': ('cs', []),
    'Slovakia': ('sk', []),
    'Hungary': ('hu', []),
    'Slovenia': ('sl', []),
    'Croatia': ('hr', []),
    'Serbia': ('sr', []),
    'Bosnia and Herzegovina': ('bs', ['hr', 'sr']),
    'North Macedonia': ('mk', []),
    'Albania': ('sq', []),
    'Bulgaria': ('bg', []),
    'Romania': ('ro', []),
    'Moldova': ('ro', []),
    'Ukraine': ('uk', []),
    'Belarus': ('be', ['ru']),
    'Russia': ('ru', []),
    'Estonia': ('et', []),
    'Latvia': ('lv', []),
    'Lithuania': ('lt', []),
    # Americas
    'United States': ('en', []),
    'Canada': ('en', ['fr']),
    'Mexico': ('es', []),
    'Brazil': ('pt', []),
    'Argentina': ('es', []),
    'Chile': ('es', []),
    'Colombia': ('es', []),
    'Peru': ('es', []),
    'Venezuela': ('es', []),
    'Ecuador': ('es', []),
    'Bolivia': ('es', []),
    'Paraguay': ('es', ['gn']),
    'Uruguay': ('es', []),
    'Cuba': ('es', []),
    'Dominican Republic': ('es', []),
    'Puerto Rico': ('es', ['en']),
    'Costa Rica': ('es', []),
    'Panama': ('es', []),
    'Guatemala': ('es', []),
    'Honduras': ('es', []),
    'El Salvador': ('es', []),
    'Nicaragua': ('es', []),
    'Jamaica': ('en', []),
    'Trinidad and Tobago': ('en', []),
    'Barbados': ('en', []),
    'Suriname': ('nl', []),
    'Guyana': ('en', []),
    # Asia - East
    'Japan': ('ja', []),
    "People's Republic of China": ('zh', []),
    'China': ('zh', []),
    'Taiwan': ('zh', []),
    'South Korea': ('ko', []),
    'North Korea': ('ko', []),
    'Mongolia': ('mn', []),
    # Asia - Southeast
    'Vietnam': ('vi', []),
    'Thailand': ('th', []),
    'Cambodia': ('km', []),
    'Laos': ('lo', []),
    'Myanmar': ('my', []),
    'Malaysia': ('ms', []),
    'Singapore': ('en', ['zh', 'ms', 'ta']),
    'Indonesia': ('id', []),
    'Philippines': ('tl', ['en']),
    'Brunei': ('ms', []),
    'East Timor': ('pt', ['tet']),
    'Timor-Leste': ('pt', ['tet']),
    # Asia - South
    'India': ('hi', ['en', 'bn', 'ta', 'te', 'mr', 'gu', 'kn', 'ml', 'pa', 'or']),
    'Pakistan': ('ur', ['en']),
    'Bangladesh': ('bn', []),
    'Sri Lanka': ('si', ['ta']),
    'Nepal': ('ne', []),
    'Bhutan': ('dz', []),
    'Maldives': ('dv', []),
    # Asia - Central
    'Kazakhstan': ('kk', ['ru']),
    'Uzbekistan': ('uz', []),
    'Turkmenistan': ('tk', []),
    'Kyrgyzstan': ('ky', ['ru']),
    'Tajikistan': ('tg', []),
    'Afghanistan': ('ps', ['fa']),
    # Asia - West / Middle East
    'Turkey': ('tr', []),
    'Iran': ('fa', []),
    'Iraq': ('ar', ['ku']),
    'Syria': ('ar', []),
    'Lebanon': ('ar', []),
    'Jordan': ('ar', []),
    'Israel': ('he', ['ar']),
    'Palestine': ('ar', []),
    'Saudi Arabia': ('ar', []),
    'United Arab Emirates': ('ar', []),
    'Kuwait': ('ar', []),
    'Qatar': ('ar', []),
    'Bahrain': ('ar', []),
    'Oman': ('ar', []),
    'Yemen': ('ar', []),
    'Georgia': ('ka', []),
    'Armenia': ('hy', []),
    'Azerbaijan': ('az', []),
    # Africa - North
    'Egypt': ('ar', []),
    'Libya': ('ar', []),
    'Tunisia': ('ar', ['fr']),
    'Algeria': ('ar', ['fr']),
    'Morocco': ('ar', ['fr']),
    # Africa - West
    'Nigeria': ('en', []),
    'Ghana': ('en', []),
    'Senegal': ('fr', []),
    'Ivory Coast': ('fr', []),
    "Côte d'Ivoire": ('fr', []),
    'Mali': ('fr', []),
    'Burkina Faso': ('fr', []),
    'Niger': ('fr', []),
    'Benin': ('fr', []),
    'Togo': ('fr', []),
    'Guinea': ('fr', []),
    'Sierra Leone': ('en', []),
    'Liberia': ('en', []),
    'Mauritania': ('ar', ['fr']),
    'Cape Verde': ('pt', []),
    'Gambia': ('en', []),
    # Africa - East
    'Kenya': ('sw', ['en']),
    'Tanzania': ('sw', ['en']),
    'Uganda': ('en', ['sw']),
    'Rwanda': ('rw', ['fr', 'en']),
    'Burundi': ('rn', ['fr']),
    'Ethiopia': ('am', []),
    'Eritrea': ('ti', ['ar']),
    'Somalia': ('so', ['ar']),
    'Djibouti': ('fr', ['ar']),
    'Madagascar': ('mg', ['fr']),
    'Mauritius': ('en', ['fr']),
    'Seychelles': ('en', ['fr']),
    # Africa - Central
    'Democratic Republic of the Congo': ('fr', []),
    'Republic of the Congo': ('fr', []),
    'Central African Republic': ('fr', []),
    'Chad': ('fr', ['ar']),
    'Cameroon': ('fr', ['en']),
    'Gabon': ('fr', []),
    'Equatorial Guinea': ('es', ['fr', 'pt']),
    # Africa - Southern
    'South Africa': ('en', ['af', 'zu', 'xh']),
    'Namibia': ('en', ['de', 'af']),
    'Botswana': ('en', ['tn']),
    'Zimbabwe': ('en', ['sn', 'nd']),
    'Zambia': ('en', []),
    'Malawi': ('en', []),
    'Mozambique': ('pt', []),
    'Angola': ('pt', []),
    'Lesotho': ('en', ['st']),
    'Eswatini': ('en', ['ss']),
    # Oceania
    'Australia': ('en', []),
    'New Zealand': ('en', ['mi']),
    'Papua New Guinea': ('en', ['tpi', 'ho']),
    'Fiji': ('en', ['fj', 'hi']),
    'Vanuatu': ('en', ['fr', 'bi']),
    'Samoa': ('sm', ['en']),
    'Tonga': ('to', ['en']),
    'Solomon Islands': ('en', []),
    'Kiribati': ('en', ['gil']),
    'Micronesia': ('en', []),
    'Palau': ('en', ['pau']),
    # Caribbean
    'Haiti': ('ht', ['fr']),
    'Bahamas': ('en', []),
    'Curaçao': ('nl', ['pap']),
    'Aruba': ('nl', ['pap']),
    # Default fallback
    'Unknown': ('en', []),
}
def normalize_diacritics(text: str) -> str:
    """Fold accented characters to their ASCII base letters.

    Decomposes the string (NFD) so each accented character splits into a
    base letter plus combining marks, then drops every combining mark
    (Unicode category 'Mn'). Characters without a decomposition (e.g. ß)
    pass through unchanged.
    """
    decomposed = unicodedata.normalize('NFD', text)
    return ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')
def get_significant_words(text: str) -> List[str]:
    """Return the words of *text* worth keeping for an abbreviation.

    Articles, prepositions and conjunctions (for several European
    languages) are dropped, as are pure numbers and numeric ranges such
    as "1900-1950". Returned words keep their original casing and
    punctuation; only the comparison form is cleaned.
    """
    # Stop words by language; membership is tested on a lowercased,
    # punctuation-stripped form of each word.
    SKIP_WORDS = {
        # Dutch
        'de', 'het', 'een', 'van', 'voor', 'in', 'op', 'te', 'den', 'der', 'des', "'s",
        'aan', 'bij', 'met', 'naar', 'om', 'tot', 'uit', 'over', 'onder', 'door', 'en', 'of',
        # English
        'a', 'an', 'the', 'of', 'at', 'on', 'to', 'for', 'with', 'from', 'by', 'as', 'under',
        'and', 'or', 'but',
        # French
        'le', 'la', 'les', 'un', 'une', 'des', 'du', 'à', 'au', 'aux', 'dans', 'sur', 'sous',
        'pour', 'par', 'avec', "l'", "d'", 'et', 'ou',
        # German
        'der', 'die', 'das', 'dem', 'den', 'ein', 'eine', 'einer', 'einem', 'einen',
        'von', 'zu', 'für', 'bei', 'nach', 'aus', 'vor', 'über', 'unter', 'durch', 'und', 'oder',
        # Spanish
        'el', 'los', 'las', 'unos', 'unas', 'del', 'al', 'con', 'por', 'para', 'sobre', 'bajo',
        'y', 'o', 'e', 'u',
        # Portuguese
        'o', 'os', 'as', 'um', 'uma', 'uns', 'umas', 'do', 'da', 'dos', 'das', 'em', 'no', 'na',
        'nos', 'nas', 'com', 'sob',
        # Italian
        'il', 'lo', 'gli', 'uno', 'di', 'dello', 'della', 'dei', 'degli', 'delle',
        'allo', 'alla', 'ai', 'agli', 'alle', 'dal', 'dallo', 'dalla', 'dai', 'dagli', 'dalle',
        'nel', 'nello', 'nella', 'nei', 'negli', 'nelle', 'sul', 'sullo', 'sulla', 'sui', 'sugli',
        'sulle', 'per', 'tra', 'fra', 'ed', 'od',
        # Russian (transliterated)
        'i', 'v', 'na', 'pri',
    }

    def is_significant(raw: str) -> bool:
        # Normalize for comparison: lowercase, strip quote/punctuation chars.
        stripped = re.sub(r"[''`\",.:;!?()[\]{}]", '', raw.lower())
        if not stripped or stripped in SKIP_WORDS:
            return False
        # Reject plain numbers ("1900") and ranges ("1900-1950").
        return not stripped.isdigit() and not re.match(r'^\d+-\d+$', stripped)

    return [word for word in text.split() if is_significant(word)]
def generate_abbreviation(name: str, max_length: int = 10) -> str:
    """Build an ASCII uppercase abbreviation from an institution name.

    Takes the first letter of each significant word, folding diacritics to
    their ASCII base. If fewer than two letters result, falls back to the
    first three letters of the whole (ASCII-folded) name. The result is
    truncated to *max_length* characters.
    """
    # Prefer significant words; otherwise use up to the first three words.
    words = get_significant_words(name) or name.split()[:3]

    letters = []
    for word in words:
        # Drop punctuation/symbols before picking the initial.
        stripped = re.sub(r"[''`\",.:;!?()[\]{}&/\\+@#$%*|=<>~^_-]", '', word)
        if not stripped:
            continue
        initial = normalize_diacritics(stripped[0]).upper()
        if initial.isalpha():
            letters.append(initial)
    abbrev = ''.join(letters)

    # Guarantee at least two characters by sampling the whole name.
    if len(abbrev) < 2:
        fallback = re.sub(r'[^A-Za-z]', '', normalize_diacritics(name))
        abbrev = fallback[:3].upper()

    return abbrev[:max_length]
def get_emic_name(custodian: Dict, country: str, labels: Dict[str, str]) -> Tuple[str, str]:
    """Pick the emic (local language) name for a custodian.

    Preference order: the country's primary official language, then its
    fallback languages, then English, then any available label, and
    finally the custodian's stored English name.

    Returns:
        (emic_name, language_code)
    """
    # Unknown countries default to English only.
    primary_lang, fallback_langs = COUNTRY_LANGUAGE_MAP.get(country, ('en', []))
    candidate_langs = [primary_lang, *fallback_langs]

    for lang in candidate_langs:
        if lang in labels:
            return labels[lang], lang

    # No official-language label: prefer English if present.
    if 'en' in labels:
        return labels['en'], 'en'

    # Otherwise take whichever label comes first.
    if labels:
        lang = next(iter(labels))
        return labels[lang], lang

    # No labels at all - fall back to the stored English name.
    return custodian.get('name_en', 'Unknown'), 'en'
def load_custodian_file(filepath: Path) -> Optional[Dict]:
    """Read one custodian YAML file; log and return None on any failure."""
    try:
        with filepath.open('r', encoding='utf-8') as handle:
            return yaml.safe_load(handle)
    except Exception as e:
        print(f"Error loading {filepath}: {e}")
        return None
def save_custodian_file(filepath: Path, data: Dict):
    """Write *data* to *filepath* as block-style YAML, keeping key order and unicode."""
    with open(filepath, 'w', encoding='utf-8') as handle:
        yaml.dump(
            data,
            handle,
            default_flow_style=False,
            allow_unicode=True,
            sort_keys=False,
        )
def main():
    """Enrich UNESCO MoW custodian YAML files with emic (local language) names.

    Loads cached Wikidata multilingual labels and the original custodian
    dump, resolves each institution's local-language name, updates the
    custodian YAML files in place, and writes a JSON change log to the
    cache directory.
    """
    # Paths
    project_root = Path(__file__).parent.parent
    custodian_dir = project_root / 'data' / 'custodian'
    cache_dir = project_root / 'data' / 'cache'

    # Load data
    print("Loading multilingual labels...")
    with open(cache_dir / 'unesco_mow_multilingual_labels.json', 'r') as f:
        labels_data = json.load(f)
    wikidata_labels = labels_data['custodians']

    print("Loading original custodian info...")
    with open(cache_dir / 'unesco_mow_custodians.json', 'r') as f:
        custodians_data = json.load(f)
    original_custodians = {c['wikidata_id']: c for c in custodians_data['custodians']}

    print(f"Processing {len(original_custodians)} UNESCO MoW custodians...\n")

    # Build a Wikidata-ID -> file index ONCE. The previous implementation
    # re-globbed and re-parsed every YAML file for every custodian, i.e.
    # O(custodians x files) full-file loads; this single pass keeps the
    # same "first matching file wins" selection.
    wikidata_to_file: Dict[str, Path] = {}
    for filepath in custodian_dir.glob('*.yaml'):
        indexed = load_custodian_file(filepath)
        if not indexed:
            continue
        # A file may record its Wikidata ID in either section.
        wd_id = indexed.get('original_entry', {}).get('wikidata_id') or \
                indexed.get('wikidata_enrichment', {}).get('wikidata_entity_id')
        if wd_id and wd_id not in wikidata_to_file:
            wikidata_to_file[wd_id] = filepath

    # Statistics
    stats = {
        'total': 0,
        'updated': 0,
        'abbreviation_changed': 0,
        'ghcid_changed': 0,
        'not_found': 0,
        'errors': 0,
        'already_enriched': 0,
    }

    # Track changes for reporting
    changes = []
    timestamp = datetime.now(timezone.utc).isoformat()

    for qid, custodian in original_custodians.items():
        stats['total'] += 1

        # Find the custodian file via the prebuilt index
        custodian_file = wikidata_to_file.get(qid)
        if not custodian_file:
            stats['not_found'] += 1
            continue

        # Load full custodian data
        data = load_custodian_file(custodian_file)
        if not data:
            stats['errors'] += 1
            continue

        # Get labels for this custodian
        labels_info = wikidata_labels.get(qid, {})
        labels = labels_info.get('labels', {})
        if not labels:
            print(f" No labels found for {qid}")
            continue

        # Determine emic name
        country = custodian.get('country', 'Unknown')
        emic_name, lang_code = get_emic_name(custodian, country, labels)

        # Current values
        current_name = data.get('custodian_name', {}).get('claim_value', '')
        current_emic = data.get('custodian_name', {}).get('emic_name', '')

        # Skip files already carrying exactly this emic name
        if current_emic and current_emic == emic_name:
            stats['already_enriched'] += 1
            continue

        # Generate abbreviation from emic name
        new_abbrev = generate_abbreviation(emic_name)

        # Current abbreviation is the last dash-separated GHCID component
        current_ghcid = data.get('ghcid', {}).get('ghcid_current', '')
        current_abbrev = current_ghcid.split('-')[-1] if current_ghcid else ''

        # Update custodian_name
        if 'custodian_name' not in data:
            data['custodian_name'] = {}
        data['custodian_name']['emic_name'] = emic_name
        data['custodian_name']['name_language'] = lang_code
        data['custodian_name']['standardized_name'] = emic_name

        # Keep the original English name as an alternative if different
        if current_name and current_name != emic_name:
            if 'alternative_names' not in data['custodian_name']:
                data['custodian_name']['alternative_names'] = []
            existing_names = [n.get('name') if isinstance(n, dict) else n
                              for n in data['custodian_name']['alternative_names']]
            if current_name not in existing_names:
                data['custodian_name']['alternative_names'].append({
                    'name': current_name,
                    'language': 'en',
                    'source': 'wikidata'
                })

        # Track change
        change_info = {
            'wikidata_id': qid,
            'file': custodian_file.name,
            'country': country,
            'old_name': current_name,
            'new_emic_name': emic_name,
            'language': lang_code,
            'old_abbrev': current_abbrev,
            'new_abbrev': new_abbrev,
        }

        # Check if abbreviation changed
        if new_abbrev != current_abbrev and current_abbrev:
            stats['abbreviation_changed'] += 1
            change_info['abbrev_changed'] = True
            # TODO: For now, we don't update GHCID - that requires more careful handling
            # with collision detection. Just log the change.
            print(f" ABBREV CHANGE: {custodian_file.name}")
            print(f" {country}: {current_name}")
            print(f" Emic ({lang_code}): {emic_name}")
            print(f" Abbrev: {current_abbrev} → {new_abbrev}")

        changes.append(change_info)

        # Save updated file
        save_custodian_file(custodian_file, data)
        stats['updated'] += 1

    # Print summary
    print("\n" + "=" * 60)
    print("ENRICHMENT SUMMARY")
    print("=" * 60)
    print(f"Total custodians processed: {stats['total']}")
    print(f"Files updated: {stats['updated']}")
    print(f"Already enriched: {stats['already_enriched']}")
    print(f"Abbreviation changes detected: {stats['abbreviation_changed']}")
    print(f"Files not found: {stats['not_found']}")
    print(f"Errors: {stats['errors']}")

    # Save changes log
    changes_log = {
        'timestamp': timestamp,
        'stats': stats,
        'changes': changes
    }
    log_file = cache_dir / 'emic_name_enrichment_log.json'
    with open(log_file, 'w', encoding='utf-8') as f:
        json.dump(changes_log, f, indent=2, ensure_ascii=False)
    print(f"\nChanges log saved to: {log_file}")

    # Show sample of abbreviation changes
    abbrev_changes = [c for c in changes if c.get('abbrev_changed')]
    if abbrev_changes:
        print("\n" + "-" * 60)
        print("Sample abbreviation changes (not yet applied to GHCID):")
        print("-" * 60)
        for c in abbrev_changes[:10]:
            print(f" {c['country']}: {c['old_abbrev']} → {c['new_abbrev']}")
            print(f" EN: {c['old_name']}")
            print(f" {c['language'].upper()}: {c['new_emic_name']}")
            print()
# Script entry point: run the enrichment only when executed directly.
if __name__ == '__main__':
    main()

View file

@ -0,0 +1,912 @@
#!/usr/bin/env python3
"""
Enrich Heritage Custodian YAML files with YouTube and Google Maps data.
This script enriches custodian files in data/custodian/ with:
1. YouTube channel/video data (if channel can be found)
2. Google Maps/Places API data (address, ratings, reviews, photos)
3. GLM-4.6 verification of matches (CH-Annotator convention)
Usage:
python scripts/enrich_custodian_youtube_maps.py [--dry-run] [--limit N] [--force]
python scripts/enrich_custodian_youtube_maps.py --files FILE1.yaml FILE2.yaml
python scripts/enrich_custodian_youtube_maps.py --pattern "ZA-*.yaml"
Environment Variables:
GOOGLE_PLACES_TOKEN - Required for Google Maps enrichment
GOOGLE_YOUTUBE_TOKEN - Required for YouTube enrichment
ZAI_API_TOKEN - Required for GLM-4.6 verification (optional but recommended)
Author: GLAM Data Extraction Project
Date: December 2025
"""
import argparse
import asyncio
import fnmatch
import json
import logging
import os
import re
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import httpx
import yaml
# Add project src to path
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT / "src"))
# Load environment variables
from dotenv import load_dotenv
load_dotenv(PROJECT_ROOT / ".env")
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# ============================================================================
# Configuration
# ============================================================================
# Directory of custodian YAML files to enrich.
CUSTODIAN_DIR = PROJECT_ROOT / "data/custodian"
# API Keys — read from the environment (.env); an empty string causes the
# corresponding enrichment step to be skipped with a warning.
GOOGLE_PLACES_TOKEN = os.getenv("GOOGLE_PLACES_TOKEN", "")
GOOGLE_YOUTUBE_TOKEN = os.getenv("GOOGLE_YOUTUBE_TOKEN", "")
# Z.AI GLM 4.6 API for CH-Annotator verification (NOT Anthropic Claude)
ZAI_API_TOKEN = os.getenv("ZAI_API_TOKEN", "")
# API Endpoints
YOUTUBE_API_BASE = "https://www.googleapis.com/youtube/v3"
TEXT_SEARCH_URL = "https://places.googleapis.com/v1/places:searchText"
# Z.AI GLM 4.6 API endpoint (Anthropic-compatible interface)
ZAI_API_BASE = "https://api.z.ai/api/anthropic/v1"
ZAI_MODEL = "glm-4.6"
# Rate limiting
REQUEST_DELAY = 0.3  # seconds between API calls
# CH-Annotator convention version
CH_ANNOTATOR_VERSION = "ch_annotator-v1_7_0"
# Google Places fields to request (sent via the X-Goog-FieldMask header)
PLACE_FIELDS = [
    "id", "displayName", "formattedAddress", "addressComponents",
    "location", "types", "businessStatus", "internationalPhoneNumber",
    "nationalPhoneNumber", "regularOpeningHours", "currentOpeningHours",
    "websiteUri", "rating", "userRatingCount", "reviews", "priceLevel",
    "photos", "googleMapsUri", "utcOffsetMinutes", "primaryType",
    "primaryTypeDisplayName", "shortFormattedAddress", "editorialSummary",
]
# ============================================================================
# Utility Functions
# ============================================================================
def get_institution_name(entry: Dict[str, Any]) -> str:
    """Best-available institution name for a custodian entry.

    Tries, in order: custodian_name.claim_value, the Wikidata English
    label, then the original entry's name. Returns '' when none is set.
    """
    for section, key in (
        ("custodian_name", "claim_value"),
        ("wikidata_enrichment", "wikidata_label_en"),
        ("original_entry", "name"),
    ):
        value = entry.get(section, {}).get(key)
        if value:
            return value
    return ""
def get_country_code(entry: Dict[str, Any]) -> str:
    """Country code from location resolution, else the GHCID prefix, else ''."""
    ghcid_block = entry.get("ghcid", {})
    code = ghcid_block.get("location_resolution", {}).get("country_code")
    if code:
        return code
    # GHCIDs start with the ISO country code, e.g. "ZA-...".
    ghcid = ghcid_block.get("ghcid_current", "")
    if ghcid and "-" in ghcid:
        return ghcid.split("-")[0]
    return ""
def get_coordinates(entry: Dict[str, Any]) -> Optional[Tuple[float, float]]:
    """Return (latitude, longitude) from the entry's location resolution.

    Uses explicit None checks so legitimate 0.0 coordinates (points on
    the equator or prime meridian) are kept — the previous truthiness
    test silently discarded them. Returns None when either value is
    absent.
    """
    loc = entry.get("ghcid", {}).get("location_resolution", {})
    src = loc.get("source_coordinates", {})
    lat = src.get("latitude")
    lng = src.get("longitude")
    if lat is not None and lng is not None:
        return (lat, lng)
    return None
def get_city_name(entry: Dict[str, Any]) -> str:
    """City name recorded in the GHCID location resolution ('' if absent)."""
    return (
        entry.get("ghcid", {})
        .get("location_resolution", {})
        .get("city_name", "")
    )
def get_wikidata_id(entry: Dict[str, Any]) -> str:
    """Wikidata QID: enrichment section first, then the original entry, else ''."""
    enrichment_id = entry.get("wikidata_enrichment", {}).get("wikidata_entity_id")
    if enrichment_id:
        return enrichment_id
    original_id = entry.get("original_entry", {}).get("wikidata_id")
    if original_id:
        return original_id
    return ""
# ============================================================================
# Google Maps Enrichment
# ============================================================================
def build_maps_search_query(entry: Dict[str, Any]) -> str:
    """Compose 'name, city, country' search text, omitting empty pieces."""
    loc = entry.get("ghcid", {}).get("location_resolution", {})
    pieces = [
        get_institution_name(entry),
        get_city_name(entry),
        loc.get("country_label", ""),
    ]
    return ", ".join(piece for piece in pieces if piece)
def search_google_place(
    query: str,
    client: httpx.Client,
    country_code: str = "",
    location_bias: Optional[Tuple[float, float]] = None,
) -> Optional[Dict[str, Any]]:
    """Text-search the Google Places API (New) and return the top hit.

    Returns the first place object from places:searchText, or None when
    the API key is missing, nothing matched, or the request failed.
    """
    if not GOOGLE_PLACES_TOKEN:
        logger.warning("GOOGLE_PLACES_TOKEN not set, skipping Maps enrichment")
        return None

    # The field mask restricts the response to the fields we store.
    field_mask = ",".join(f"places.{f}" for f in PLACE_FIELDS)
    headers = {
        "Content-Type": "application/json",
        "X-Goog-Api-Key": GOOGLE_PLACES_TOKEN,
        "X-Goog-FieldMask": field_mask,
    }
    body: Dict[str, Any] = {
        "textQuery": query,
        "maxResultCount": 1,
    }

    # Pin language/region for the countries this pipeline targets.
    if country_code == "ZA":
        body["languageCode"] = "en"
        body["regionCode"] = "ZA"
    elif country_code == "ZW":
        body["languageCode"] = "en"
        body["regionCode"] = "ZW"

    # Bias results towards known coordinates when we have them.
    if location_bias:
        latitude, longitude = location_bias
        body["locationBias"] = {
            "circle": {
                "center": {"latitude": latitude, "longitude": longitude},
                "radius": 50000.0  # 50km radius
            }
        }

    try:
        response = client.post(TEXT_SEARCH_URL, headers=headers, json=body)
        response.raise_for_status()
        places = response.json().get("places", [])
        if places:
            return places[0]
        logger.warning(f"No place found for: {query}")
        return None
    except httpx.HTTPStatusError as e:
        # Surface the API's own error message when the body is JSON.
        try:
            error_data = e.response.json()
        except Exception:
            error_data = {}
        error_msg = error_data.get("error", {}).get("message", str(e))
        logger.error(f"Google Places API error: {error_msg}")
        return None
    except Exception as e:
        logger.error(f"Error searching for '{query}': {e}")
        return None
def parse_google_place(place: Dict[str, Any]) -> Dict[str, Any]:
    """Parse a Google Places API (New) place resource into the enrichment dict.

    Optional fields are copied only when present in the response, never
    set to None/empty placeholders.

    Args:
        place: Raw place object from places:searchText.

    Returns:
        Flat dict for the custodian schema; always contains place_id,
        name, fetch_timestamp and api_status.
    """
    result = {
        "place_id": place.get("id", ""),
        "name": place.get("displayName", {}).get("text", ""),
        "fetch_timestamp": datetime.now(timezone.utc).isoformat(),
        "api_status": "OK",
    }
    # Location — compare against None explicitly so a legitimate 0.0
    # latitude/longitude (equator / prime meridian) is not dropped, and
    # to stay consistent with the rating checks below.
    location = place.get("location", {})
    if location.get("latitude") is not None and location.get("longitude") is not None:
        result["coordinates"] = {
            "latitude": location["latitude"],
            "longitude": location["longitude"],
        }
    if place.get("formattedAddress"):
        result["formatted_address"] = place["formattedAddress"]
    if place.get("shortFormattedAddress"):
        result["short_address"] = place["shortFormattedAddress"]
    # Contact
    if place.get("nationalPhoneNumber"):
        result["phone_local"] = place["nationalPhoneNumber"]
    if place.get("internationalPhoneNumber"):
        result["phone_international"] = place["internationalPhoneNumber"]
    if place.get("websiteUri"):
        result["website"] = place["websiteUri"]
    # Business info
    if place.get("types"):
        result["google_place_types"] = place["types"]
    if place.get("primaryType"):
        result["primary_type"] = place["primaryType"]
    if place.get("businessStatus"):
        result["business_status"] = place["businessStatus"]
    # Ratings and reviews — None checks keep legitimate zero values
    if place.get("rating") is not None:
        result["rating"] = place["rating"]
    if place.get("userRatingCount") is not None:
        result["total_ratings"] = place["userRatingCount"]
    # Parse reviews
    reviews = place.get("reviews", [])
    if reviews:
        result["reviews"] = [
            {
                "author_name": r.get("authorAttribution", {}).get("displayName"),
                "author_uri": r.get("authorAttribution", {}).get("uri"),
                "rating": r.get("rating"),
                "relative_time_description": r.get("relativePublishTimeDescription"),
                "text": r.get("text", {}).get("text"),
                "publish_time": r.get("publishTime"),
            }
            for r in reviews
        ]
    # Opening hours
    if place.get("regularOpeningHours"):
        result["opening_hours"] = {
            "open_now": place.get("currentOpeningHours", {}).get("openNow"),
            "weekday_text": place["regularOpeningHours"].get("weekdayDescriptions"),
        }
    # Editorial summary
    if place.get("editorialSummary"):
        result["editorial_summary"] = place["editorialSummary"].get("text")
    # Photos (just references, not downloading)
    photos = place.get("photos", [])
    if photos:
        result["photo_count"] = len(photos)
        result["photos_metadata"] = [
            {
                "name": p.get("name"),
                "height": p.get("heightPx"),
                "width": p.get("widthPx"),
            }
            for p in photos[:5]  # First 5 only
        ]
    # Links
    if place.get("googleMapsUri"):
        result["google_maps_url"] = place["googleMapsUri"]
    return result
# ============================================================================
# YouTube Enrichment
# ============================================================================
def search_youtube_channel(
    query: str,
    client: httpx.Client,
) -> Optional[Dict[str, Any]]:
    """Search YouTube for channels matching *query*.

    Returns {"candidates": [...], "query": query} holding up to three raw
    search items for downstream LLM verification, or None when the API
    key is missing, nothing matched, or the request failed.
    """
    if not GOOGLE_YOUTUBE_TOKEN:
        logger.warning("GOOGLE_YOUTUBE_TOKEN not set, skipping YouTube enrichment")
        return None

    params = {
        "part": "snippet",
        "type": "channel",
        "q": query,
        "maxResults": 3,  # Get top 3 for verification
        "key": GOOGLE_YOUTUBE_TOKEN,
    }
    try:
        response = client.get(
            f"{YOUTUBE_API_BASE}/search",
            params=params,
            timeout=30.0
        )
        response.raise_for_status()
        items = response.json().get("items", [])
        if not items:
            return None
        # Return all candidates for LLM verification
        return {"candidates": items, "query": query}
    except httpx.HTTPStatusError as e:
        # Quota exhaustion gets its own message so it is easy to spot.
        if "quotaExceeded" in str(e):
            logger.error("YouTube API quota exceeded")
        else:
            logger.error(f"YouTube API error: {e}")
        return None
    except Exception as e:
        logger.error(f"Error searching YouTube for '{query}': {e}")
        return None
def get_youtube_channel_details(
    channel_id: str,
    client: httpx.Client,
) -> Optional[Dict[str, Any]]:
    """Fetch full channel metadata (snippet, statistics, branding, content).

    Returns the raw channels.list item for *channel_id*, or None when the
    API key is missing, the channel is unknown, or the request failed.
    """
    if not GOOGLE_YOUTUBE_TOKEN:
        return None

    params = {
        "part": "snippet,statistics,brandingSettings,contentDetails",
        "id": channel_id,
        "key": GOOGLE_YOUTUBE_TOKEN,
    }
    try:
        response = client.get(
            f"{YOUTUBE_API_BASE}/channels",
            params=params,
            timeout=30.0
        )
        response.raise_for_status()
        items = response.json().get("items", [])
        return items[0] if items else None
    except Exception as e:
        logger.error(f"Error getting channel details for '{channel_id}': {e}")
        return None
def parse_youtube_channel(channel: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten a YouTube channels.list item into the enrichment schema.

    String counters from the API are converted to ints; optional fields
    are included only when present.
    """
    snippet = channel.get("snippet", {})
    stats = channel.get("statistics", {})
    branding = channel.get("brandingSettings", {})  # retained for parity; not currently read
    channel_id = channel.get("id", "")

    result = {
        "channel_id": channel_id,
        "channel_url": f"https://www.youtube.com/channel/{channel_id}",
        "title": snippet.get("title", ""),
        "description": snippet.get("description", ""),
        "custom_url": snippet.get("customUrl", ""),
        "published_at": snippet.get("publishedAt", ""),
        "country": snippet.get("country", ""),
        "fetch_timestamp": datetime.now(timezone.utc).isoformat(),
    }

    # Statistics — the API returns these as strings
    for api_key, out_key in (
        ("subscriberCount", "subscriber_count"),
        ("videoCount", "video_count"),
        ("viewCount", "view_count"),
    ):
        if stats.get(api_key):
            result[out_key] = int(stats[api_key])

    # Thumbnails
    high_url = snippet.get("thumbnails", {}).get("high", {}).get("url")
    if high_url:
        result["thumbnail_url"] = high_url

    return result
# ============================================================================
# Z.AI GLM 4.6 Verification with Exponential Backoff (CH-Annotator)
# ============================================================================
# Retry policy for Z.AI GLM calls: exponential backoff (BASE_DELAY * 2**attempt),
# capped at MAX_DELAY; only HTTP 429 responses are retried.
MAX_RETRIES = 3
BASE_DELAY = 1.0  # seconds
MAX_DELAY = 30.0  # seconds
async def call_glm_with_retry(
    prompt: str,
    max_retries: int = MAX_RETRIES,
) -> Optional[str]:
    """
    Call Z.AI GLM 4.6 API with exponential backoff retry.
    Uses Anthropic-compatible interface at api.z.ai.

    Args:
        prompt: Single user message sent to the model.
        max_retries: Attempts before giving up; only HTTP 429 is retried.

    Returns:
        Response content string or None if all retries fail
    """
    headers = {
        "x-api-key": ZAI_API_TOKEN,
        "anthropic-version": "2023-06-01",
        "Content-Type": "application/json",
    }
    body = {
        "model": ZAI_MODEL,
        "max_tokens": 500,
        "messages": [
            {"role": "user", "content": prompt}
        ],
    }
    for attempt in range(max_retries):
        try:
            # A fresh client per attempt keeps each retry independent.
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{ZAI_API_BASE}/messages",
                    headers=headers,
                    json=body,
                    timeout=60.0
                )
                response.raise_for_status()
                data = response.json()
                # Anthropic-compatible response format
                content_blocks = data.get("content", [])
                if content_blocks and content_blocks[0].get("type") == "text":
                    return content_blocks[0].get("text", "")
                # Non-text first block: treat as an empty (but successful) reply.
                return ""
        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                # Rate limited - exponential backoff
                delay = min(BASE_DELAY * (2 ** attempt), MAX_DELAY)
                logger.warning(f"Rate limited, waiting {delay:.1f}s (attempt {attempt + 1}/{max_retries})")
                await asyncio.sleep(delay)
            else:
                # Any other HTTP error aborts immediately — no retry.
                logger.error(f"GLM 4.6 API error: {e}")
                return None
        except Exception as e:
            # Network/parse failures also abort immediately.
            logger.error(f"GLM 4.6 API call failed: {e}")
            return None
    logger.error(f"All {max_retries} GLM 4.6 API retries exhausted")
    return None
async def verify_match_with_llm(
    institution_name: str,
    institution_info: Dict[str, Any],
    candidate_name: str,
    candidate_info: Dict[str, Any],
    match_type: str,  # "google_maps" or "youtube"
) -> Dict[str, Any]:
    """
    Use Z.AI GLM 4.6 to verify if a candidate match is correct.

    Args:
        institution_name: Canonical name of the heritage institution.
        institution_info: Context dict (wikidata_id, city, country, type).
        candidate_name: Name/title of the candidate place or channel.
        candidate_info: Raw candidate data used to build the prompt.
        match_type: "google_maps" or "youtube"; selects the prompt template.

    Returns:
        Dict with keys:
        - is_match: bool (or None when verification was skipped/failed)
        - confidence: float (0.0-1.0)
        - reasoning: str
        - agent: str (model version)
        - verified: bool
    """
    if not ZAI_API_TOKEN:
        logger.warning("ZAI_API_TOKEN not set, skipping LLM verification")
        return {
            "is_match": None,
            "confidence": 0.5,
            "reasoning": "LLM verification skipped - no API key",
            "agent": "none",
            "verified": False,
        }
    # Build verification prompt
    if match_type == "google_maps":
        prompt = f"""You are verifying if a Google Maps place matches a heritage institution.
INSTITUTION:
- Name: {institution_name}
- Wikidata: {institution_info.get('wikidata_id', 'N/A')}
- City: {institution_info.get('city', 'N/A')}
- Country: {institution_info.get('country', 'N/A')}
- Type: {institution_info.get('type', 'N/A')}
GOOGLE MAPS CANDIDATE:
- Name: {candidate_name}
- Address: {candidate_info.get('formatted_address', 'N/A')}
- Types: {candidate_info.get('google_place_types', 'N/A')}
- Website: {candidate_info.get('website', 'N/A')}
Is this Google Maps place the same institution? Consider:
1. Name similarity (allowing for translations/abbreviations)
2. Location consistency
3. Type consistency (archive, museum, library, etc.)
Respond in JSON format:
{{"is_match": true/false, "confidence": 0.0-1.0, "reasoning": "..."}}
"""
    else:  # youtube
        prompt = f"""You are verifying if a YouTube channel belongs to a heritage institution.
INSTITUTION:
- Name: {institution_name}
- Wikidata: {institution_info.get('wikidata_id', 'N/A')}
- City: {institution_info.get('city', 'N/A')}
- Country: {institution_info.get('country', 'N/A')}
- Type: {institution_info.get('type', 'N/A')}
YOUTUBE CHANNEL CANDIDATE:
- Title: {candidate_name}
- Description: {candidate_info.get('description', 'N/A')[:500]}
- Country: {candidate_info.get('country', 'N/A')}
- Subscribers: {candidate_info.get('subscriber_count', 'N/A')}
Is this YouTube channel the official channel of this institution? Consider:
1. Name similarity
2. Description relevance to heritage/archives/museums
3. Location consistency
Respond in JSON format:
{{"is_match": true/false, "confidence": 0.0-1.0, "reasoning": "..."}}
"""
    # Call GLM 4.6 API with retry
    content = await call_glm_with_retry(prompt)
    if content is None:
        return {
            "is_match": None,
            "confidence": 0.5,
            "reasoning": "LLM verification failed - API error",
            "agent": ZAI_MODEL,
            "verified": False,
        }
    # Parse JSON response.
    # FIX: the previous pattern r'\{[^}]+\}' stopped at the FIRST '}', so any
    # '}' inside the model's "reasoning" text truncated the JSON, made
    # json.loads fail, and silently pushed us onto the crude keyword fallback.
    # Match greedily from the first '{' to the last '}' instead (the expected
    # payload is a single JSON object) and verify we actually got a dict.
    try:
        json_match = re.search(r'\{.*\}', content, re.DOTALL)
        if json_match:
            result = json.loads(json_match.group())
            if isinstance(result, dict):
                result["agent"] = ZAI_MODEL
                result["verified"] = True
                result["ch_annotator_version"] = CH_ANNOTATOR_VERSION
                return result
    except json.JSONDecodeError:
        pass
    # Fallback if JSON parsing fails: crude keyword heuristic (low fidelity —
    # misfires when the reasoning mentions both words, hence the low confidence).
    is_match = "true" in content.lower() and "false" not in content.lower()
    return {
        "is_match": is_match,
        "confidence": 0.7 if is_match else 0.3,
        "reasoning": content[:200],
        "agent": ZAI_MODEL,
        "verified": True,
        "ch_annotator_version": CH_ANNOTATOR_VERSION,
    }
# ============================================================================
# Main Enrichment Pipeline
# ============================================================================
async def enrich_custodian_file(
    filepath: Path,
    client: httpx.Client,
    force: bool = False,
    dry_run: bool = False,
) -> Tuple[bool, str]:
    """
    Enrich a single custodian YAML file with YouTube and Google Maps data.

    Looks up the institution on Google Places and YouTube, asks the LLM to
    verify each candidate, records accepted/rejected/unverified outcomes back
    into the entry, appends a provenance note, and saves the file in place.

    Args:
        filepath: Path to the custodian YAML file.
        client: Shared synchronous HTTP client used for the Google API calls.
        force: Re-enrich even when enrichment data is already present.
        dry_run: When True, compute everything but do not write the file.

    Returns:
        Tuple of (modified: bool, status: str)
    """
    logger.info(f"Processing: {filepath.name}")
    # Load YAML
    with open(filepath, 'r', encoding='utf-8') as f:
        entry = yaml.safe_load(f)
    if not entry:
        return False, "Empty file"
    modified = False
    statuses = []
    # Check if already enriched; None means the key is absent entirely.
    has_maps = entry.get("google_maps_enrichment") is not None
    has_youtube = entry.get("youtube_enrichment") is not None
    if not force and has_maps and has_youtube:
        return False, "Already enriched (use --force to re-enrich)"
    # Extract info for matching
    institution_name = get_institution_name(entry)
    if not institution_name:
        return False, "No institution name found"
    country_code = get_country_code(entry)
    city_name = get_city_name(entry)
    coords = get_coordinates(entry)
    wikidata_id = get_wikidata_id(entry)
    # Context handed to the LLM verifier alongside each candidate.
    institution_info = {
        "wikidata_id": wikidata_id,
        "city": city_name,
        "country": country_code,
        "type": entry.get("wikidata_enrichment", {}).get("instance_of", ""),
    }
    logger.info(f" Institution: {institution_name}")
    logger.info(f" Location: {city_name}, {country_code}")
    # -------------------------------------------------------------------------
    # Google Maps Enrichment
    # -------------------------------------------------------------------------
    if not has_maps or force:
        query = build_maps_search_query(entry)
        logger.info(f" Maps query: {query}")
        # NOTE(review): time.sleep blocks the event loop inside this coroutine.
        # Harmless while files are processed sequentially (as main() does),
        # but should become asyncio.sleep if this ever runs concurrently.
        time.sleep(REQUEST_DELAY)
        place = search_google_place(query, client, country_code, coords)
        if place:
            maps_data = parse_google_place(place)
            candidate_name = maps_data.get("name", "")
            logger.info(f" Maps found: {candidate_name}")
            # LLM verification
            verification = await verify_match_with_llm(
                institution_name,
                institution_info,
                candidate_name,
                maps_data,
                "google_maps"
            )
            if verification.get("is_match") is True:
                # Accepted: store enrichment together with the LLM verdict.
                maps_data["llm_verification"] = verification
                entry["google_maps_enrichment"] = maps_data
                entry["google_maps_status"] = "SUCCESS"
                modified = True
                statuses.append(f"Maps: {candidate_name} (conf: {verification.get('confidence', 0):.2f})")
                logger.info(f" ✓ Maps verified: {verification.get('reasoning', '')[:60]}")
            elif verification.get("is_match") is False:
                # Rejected: keep the candidate and the reason for auditability.
                entry["google_maps_status"] = "NO_MATCH"
                entry["google_maps_rejected"] = {
                    "candidate_name": candidate_name,
                    "rejection_reason": verification.get("reasoning", ""),
                    "timestamp": datetime.now(timezone.utc).isoformat(),
                }
                modified = True
                statuses.append("Maps: rejected by LLM")
                logger.info(f" ✗ Maps rejected: {verification.get('reasoning', '')[:60]}")
            else:
                # Verification skipped or failed - include with warning
                # (is_match is None: no API key or API error upstream).
                maps_data["llm_verification"] = verification
                entry["google_maps_enrichment"] = maps_data
                entry["google_maps_status"] = "UNVERIFIED"
                modified = True
                statuses.append(f"Maps: {candidate_name} (unverified)")
        else:
            # No Places result at all; record the query so it can be retried.
            entry["google_maps_status"] = "NOT_FOUND"
            entry["google_maps_search_query"] = query
            entry["google_maps_search_timestamp"] = datetime.now(timezone.utc).isoformat()
            modified = True
            statuses.append("Maps: not found")
    # -------------------------------------------------------------------------
    # YouTube Enrichment
    # -------------------------------------------------------------------------
    if not has_youtube or force:
        # Build YouTube search query
        youtube_query = f"{institution_name} official"
        logger.info(f" YouTube query: {youtube_query}")
        time.sleep(REQUEST_DELAY)
        search_result = search_youtube_channel(youtube_query, client)
        if search_result and search_result.get("candidates"):
            candidates = search_result["candidates"]
            logger.info(f" YouTube candidates: {len(candidates)}")
            # Try each candidate
            best_match = None
            best_verification = None
            for candidate in candidates[:3]:  # Top 3 candidates
                channel_id = candidate.get("id", {}).get("channelId")
                if not channel_id:
                    continue
                # Get full channel details
                time.sleep(REQUEST_DELAY)
                channel_details = get_youtube_channel_details(channel_id, client)
                if not channel_details:
                    continue
                youtube_data = parse_youtube_channel(channel_details)
                candidate_name = youtube_data.get("title", "")
                # LLM verification
                verification = await verify_match_with_llm(
                    institution_name,
                    institution_info,
                    candidate_name,
                    youtube_data,
                    "youtube"
                )
                if verification.get("is_match") is True:
                    # Keep the highest-confidence verified candidate.
                    if best_verification is None or verification.get("confidence", 0) > best_verification.get("confidence", 0):
                        best_match = youtube_data
                        best_verification = verification
                    logger.info(f" YouTube match: {candidate_name} (conf: {verification.get('confidence', 0):.2f})")
            if best_match:
                best_match["llm_verification"] = best_verification
                entry["youtube_enrichment"] = best_match
                entry["youtube_status"] = "SUCCESS"
                modified = True
                statuses.append(f"YouTube: {best_match.get('title', '')} ({best_match.get('subscriber_count', 0)} subs)")
            else:
                # Candidates existed but none passed LLM verification.
                entry["youtube_status"] = "NO_MATCH"
                entry["youtube_search_query"] = youtube_query
                entry["youtube_candidates_rejected"] = len(candidates)
                entry["youtube_search_timestamp"] = datetime.now(timezone.utc).isoformat()
                modified = True
                statuses.append("YouTube: no verified match")
        else:
            entry["youtube_status"] = "NOT_FOUND"
            entry["youtube_search_query"] = youtube_query
            entry["youtube_search_timestamp"] = datetime.now(timezone.utc).isoformat()
            modified = True
            statuses.append("YouTube: not found")
    # -------------------------------------------------------------------------
    # Add provenance note
    # -------------------------------------------------------------------------
    if modified:
        if "provenance" not in entry:
            entry["provenance"] = {}
        if "notes" not in entry["provenance"]:
            entry["provenance"]["notes"] = []
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        entry["provenance"]["notes"].append(
            f"YouTube/Google Maps enrichment {timestamp}: {'; '.join(statuses)}"
        )
    # -------------------------------------------------------------------------
    # Save file
    # -------------------------------------------------------------------------
    if modified and not dry_run:
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(entry, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
        logger.info(f" Saved: {filepath.name}")
    status = "; ".join(statuses) if statuses else "No changes"
    return modified, status
async def main():
    """Main entry point."""
    parser = argparse.ArgumentParser(
        description="Enrich custodian files with YouTube and Google Maps data"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Don't save changes, just show what would be done"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Re-enrich even if already enriched"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit number of files to process"
    )
    parser.add_argument(
        "--files",
        nargs="+",
        help="Specific files to process (just filenames)"
    )
    parser.add_argument(
        "--pattern",
        type=str,
        default=None,
        help="Glob pattern for files (e.g., 'ZA-*.yaml')"
    )
    args = parser.parse_args()
    # Check for required API keys
    if not GOOGLE_PLACES_TOKEN and not GOOGLE_YOUTUBE_TOKEN:
        logger.error("No API keys found! Set GOOGLE_PLACES_TOKEN or GOOGLE_YOUTUBE_TOKEN")
        sys.exit(1)
    # Work out the target file list: explicit names > glob pattern > all YAML.
    if args.files:
        candidates = (CUSTODIAN_DIR / name for name in args.files)
        files = [path for path in candidates if path.exists()]
    elif args.pattern:
        files = sorted(CUSTODIAN_DIR.glob(args.pattern))
    else:
        files = sorted(CUSTODIAN_DIR.glob("*.yaml"))
    if args.limit:
        files = files[:args.limit]
    logger.info(f"Found {len(files)} files to process")
    if args.dry_run:
        logger.info("DRY RUN - no files will be modified")
    # Process every file sequentially, sharing one HTTP client.
    results = {"modified": 0, "skipped": 0, "errors": 0}
    with httpx.Client(timeout=60.0) as client:
        for filepath in files:
            try:
                modified, status = await enrich_custodian_file(
                    filepath, client, args.force, args.dry_run
                )
                results["modified" if modified else "skipped"] += 1
                logger.info(f" Status: {status}")
            except Exception as e:
                logger.error(f"Error processing {filepath.name}: {e}")
                results["errors"] += 1
            # Rate limiting between files
            time.sleep(REQUEST_DELAY)
    # Summary
    logger.info("=" * 60)
    logger.info(f"SUMMARY: {results['modified']} modified, {results['skipped']} skipped, {results['errors']} errors")


if __name__ == "__main__":
    asyncio.run(main())

View file

@ -0,0 +1,386 @@
#!/usr/bin/env python3
"""
Enrich custodian descriptions using available data sources and GLM-4.6.
This script:
1. Finds custodian files with placeholder descriptions
2. Gathers available data (Wikidata, Google Maps, UNESCO MoW, etc.)
3. Uses GLM-4.6 to generate a rich description
4. Updates the file with the new description
Usage:
python enrich_descriptions.py --limit 10 # Process 10 files
python enrich_descriptions.py --dry-run # Show what would be done
python enrich_descriptions.py --all # Process all files
"""
import asyncio
import argparse
import os
import re
import json
from pathlib import Path
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
import httpx
from ruamel.yaml import YAML
# Load environment
from dotenv import load_dotenv
load_dotenv()
# Constants
DATA_DIR = Path(__file__).parent.parent / "data" / "custodian"
PLACEHOLDER_DESCRIPTION = "Heritage institution holding UNESCO Memory of the World inscribed documents"
# Z.AI GLM API configuration
ZAI_API_URL = "https://api.z.ai/api/coding/paas/v4/chat/completions"
class DescriptionEnricher:
    """Enrich custodian descriptions using GLM-4.6.

    Finds custodian YAML files whose Wikidata description is still the
    generic placeholder, gathers whatever context the entry already holds
    (Wikidata, Google Maps, UNESCO MoW, location), asks GLM-4.6 for a short
    encyclopedic description, and writes it back into the file in place.
    """

    # System prompt sent with every request; constrains tone and length and
    # forbids the model from inventing facts not present in the context.
    SYSTEM_PROMPT = """You are a cultural heritage expert writing descriptions for heritage institutions.
Your task is to create a concise, informative description (2-4 sentences) for a heritage institution based on the available data.
## Guidelines
- Focus on what makes the institution significant
- Include the type of collections if known (manuscripts, archives, art, etc.)
- Mention UNESCO Memory of the World inscriptions if present
- Include location context when relevant
- Use formal, encyclopedic tone
- Do NOT invent information not present in the data
- Keep descriptions under 100 words
## Output Format
Provide ONLY the description text, no quotes or formatting.
"""

    def __init__(self, model: str = "glm-4.6", dry_run: bool = False):
        """Set up the API client and a round-trip YAML handler.

        Raises:
            ValueError: if ZAI_API_TOKEN is not present in the environment.
        """
        self.api_key = os.environ.get("ZAI_API_TOKEN")
        if not self.api_key:
            raise ValueError("ZAI_API_TOKEN not found in environment. See docs/GLM_API_SETUP.md")
        self.model = model
        self.dry_run = dry_run
        # ruamel round-trip loader: preserves quoting/ordering of the
        # hand-maintained YAML files when writing back.
        self.yaml = YAML()
        self.yaml.preserve_quotes = True
        self.yaml.default_flow_style = False
        self.yaml.width = 4096  # Prevent line wrapping
        self.client = httpx.AsyncClient(
            timeout=60.0,
            headers={
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json",
            }
        )
        # Run counters reported by run()'s summary.
        self.stats = {
            "processed": 0,
            "enriched": 0,
            "skipped": 0,
            "errors": 0,
        }

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()

    def find_files_with_placeholder(self, limit: Optional[int] = None) -> List[Path]:
        """Find custodian files with placeholder descriptions.

        Scans DATA_DIR for YAML files whose English Wikidata description
        still contains PLACEHOLDER_DESCRIPTION; stops early once `limit`
        files are collected. Unreadable files are reported and skipped.
        """
        files = []
        for yaml_file in DATA_DIR.glob("*.yaml"):
            try:
                with open(yaml_file, 'r', encoding='utf-8') as f:
                    data = self.yaml.load(f)
                if not data:
                    continue
                # Check for placeholder in wikidata_enrichment.wikidata_description_en
                wd_desc = data.get('wikidata_enrichment', {}).get('wikidata_description_en', '')
                if PLACEHOLDER_DESCRIPTION in str(wd_desc):
                    files.append(yaml_file)
                    if limit and len(files) >= limit:
                        break
            except Exception as e:
                print(f"Error reading {yaml_file}: {e}")
        return files

    def gather_context(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Gather all available context from the entry.

        Collapses the heterogeneous enrichment sections of a custodian entry
        into one flat dict consumed by build_prompt(). Missing sections
        simply leave their slot empty.
        """
        context = {
            "name": None,
            "type": None,
            "location": {},
            "wikidata": {},
            "google_maps": {},
            "unesco_mow": {},
            "collections": [],
        }
        # Name from various sources (first hit wins, in priority order).
        if 'custodian_name' in data:
            context['name'] = data['custodian_name'].get('claim_value')
        elif 'wikidata_enrichment' in data:
            context['name'] = data['wikidata_enrichment'].get('wikidata_label_en')
        elif 'original_entry' in data:
            context['name'] = data['original_entry'].get('name') or data['original_entry'].get('organisatie')
        # Institution type
        if 'wikidata_enrichment' in data:
            context['type'] = data['wikidata_enrichment'].get('instance_of')
        # Location from GHCID
        if 'ghcid' in data:
            loc_res = data['ghcid'].get('location_resolution', {})
            context['location'] = {
                "city": loc_res.get('city_label'),
                "country": loc_res.get('country_label'),
                "region": loc_res.get('region_code'),
            }
        # Wikidata data
        if 'wikidata_enrichment' in data:
            wd = data['wikidata_enrichment']
            context['wikidata'] = {
                "qid": wd.get('wikidata_entity_id'),
                "instance_of": wd.get('instance_of'),
            }
        # Google Maps data
        if 'google_maps_enrichment' in data:
            gm = data['google_maps_enrichment']
            context['google_maps'] = {
                "name": gm.get('name'),
                "types": gm.get('google_place_types', []),
                "address": gm.get('formatted_address'),
                "primary_type": gm.get('primary_type'),
            }
        # UNESCO Memory of the World
        if 'unesco_mow_enrichment' in data:
            mow = data['unesco_mow_enrichment']
            context['unesco_mow'] = {
                "is_custodian": mow.get('is_mow_custodian', False),
                "inscription_count": mow.get('inscription_count', 0),
                "inscriptions": [
                    {"name": i.get('name'), "country": i.get('inscription_country')}
                    for i in mow.get('inscriptions', [])
                ],
            }
        return context

    def build_prompt(self, context: Dict[str, Any]) -> str:
        """Build a prompt for GLM based on available context.

        Emits one "Key: value" line per known fact; absent facts produce
        no line at all, so the model only sees what we actually know.
        """
        parts = [f"Institution: {context['name']}"]
        if context['type']:
            parts.append(f"Type: {context['type']}")
        if context['location'].get('city'):
            loc = context['location']
            loc_str = f"Location: {loc['city']}"
            if loc.get('country'):
                loc_str += f", {loc['country']}"
            parts.append(loc_str)
        if context['google_maps'].get('types'):
            parts.append(f"Google Maps Types: {', '.join(context['google_maps']['types'])}")
        if context['unesco_mow'].get('is_custodian'):
            mow = context['unesco_mow']
            inscriptions = mow.get('inscriptions', [])
            if inscriptions:
                inscription_names = [i['name'] for i in inscriptions[:3]]  # Limit to 3
                parts.append(f"UNESCO Memory of the World inscriptions held: {', '.join(inscription_names)}")
                if mow['inscription_count'] > 3:
                    parts.append(f"(Total: {mow['inscription_count']} inscriptions)")
        if context['wikidata'].get('qid'):
            parts.append(f"Wikidata ID: {context['wikidata']['qid']}")
        return "\n".join(parts)

    async def generate_description(self, context: Dict[str, Any]) -> Optional[str]:
        """Generate a description using GLM-4.6.

        Returns the cleaned description text, or None on any API error,
        malformed response, or empty content.
        """
        prompt = self.build_prompt(context)
        try:
            response = await self.client.post(
                ZAI_API_URL,
                json={
                    "model": self.model,
                    "messages": [
                        {"role": "system", "content": self.SYSTEM_PROMPT},
                        {"role": "user", "content": prompt}
                    ],
                    "temperature": 0.3,
                    "max_tokens": 1024,  # GLM-4.6 needs room for reasoning + content
                }
            )
            if response.status_code != 200:
                print(f" API Error: {response.status_code}")
                print(f" Response: {response.text[:500]}")
                return None
            result = response.json()
            if "choices" not in result or len(result["choices"]) == 0:
                print(f" No choices in response")
                return None
            content = result["choices"][0]["message"]["content"]
            if not content or content.strip() == "":
                # GLM-4.6 sometimes puts content in reasoning_content
                reasoning = result["choices"][0]["message"].get("reasoning_content", "")
                if reasoning:
                    print(f" Warning: Content was empty, model only provided reasoning")
                return None
            # Clean up the response: strip whitespace and surrounding quotes.
            content = content.strip().strip('"').strip("'")
            return content
        except httpx.HTTPStatusError as e:
            print(f" HTTP Error: {e.response.status_code}")
            return None
        except Exception as e:
            print(f" Error calling GLM API: {type(e).__name__}: {e}")
            return None

    async def enrich_file(self, file_path: Path) -> bool:
        """Enrich a single file with a better description.

        Returns True when the file was updated (or would be, in dry-run
        mode); False on skip or failure. Updates self.stats accordingly.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = self.yaml.load(f)
            if not data:
                return False
            # Gather context
            context = self.gather_context(data)
            if not context['name']:
                print(f" Skipping {file_path.name}: No name found")
                self.stats['skipped'] += 1
                return False
            print(f" Processing: {context['name']}")
            if self.dry_run:
                # Show the context we would feed the model, touch nothing.
                print(f" [DRY RUN] Would generate description from context:")
                print(f" - Type: {context['type']}")
                print(f" - Location: {context['location'].get('city')}, {context['location'].get('country')}")
                if context['unesco_mow'].get('is_custodian'):
                    print(f" - UNESCO MoW inscriptions: {context['unesco_mow']['inscription_count']}")
                return True
            # Generate new description
            new_description = await self.generate_description(context)
            if not new_description:
                print(f" Failed to generate description")
                self.stats['errors'] += 1
                return False
            print(f" Generated: {new_description[:80]}...")
            # Update the file: overwrite the placeholder description and
            # record how/when it was generated.
            if 'wikidata_enrichment' not in data:
                data['wikidata_enrichment'] = {}
            data['wikidata_enrichment']['wikidata_description_en'] = new_description
            data['wikidata_enrichment']['description_enrichment'] = {
                'method': 'glm-4.6',
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'source_data': ['wikidata', 'google_maps', 'unesco_mow'],
            }
            # Write back
            with open(file_path, 'w', encoding='utf-8') as f:
                self.yaml.dump(data, f)
            self.stats['enriched'] += 1
            return True
        except Exception as e:
            print(f" Error processing {file_path.name}: {e}")
            self.stats['errors'] += 1
            return False

    async def run(self, limit: Optional[int] = None):
        """Run the enrichment process.

        Finds candidate files, enriches each in turn (with a short pause
        between real API calls), closes the client, and prints a summary.
        """
        print(f"Finding files with placeholder descriptions...")
        files = self.find_files_with_placeholder(limit)
        print(f"Found {len(files)} files to process")
        if not files:
            print("No files need enrichment.")
            return
        for i, file_path in enumerate(files, 1):
            print(f"\n[{i}/{len(files)}] {file_path.name}")
            await self.enrich_file(file_path)
            self.stats['processed'] += 1
            # Small delay between API calls
            if not self.dry_run:
                await asyncio.sleep(0.5)
        await self.close()
        # Print summary
        print("\n" + "=" * 50)
        print("SUMMARY")
        print("=" * 50)
        print(f"Processed: {self.stats['processed']}")
        print(f"Enriched: {self.stats['enriched']}")
        print(f"Skipped: {self.stats['skipped']}")
        print(f"Errors: {self.stats['errors']}")
async def main():
    """CLI entry point: parse arguments and run the description enricher."""
    cli = argparse.ArgumentParser(
        description="Enrich custodian descriptions using GLM-4.6"
    )
    cli.add_argument(
        "--limit", "-n", type=int, default=10,
        help="Maximum number of files to process (default: 10)"
    )
    cli.add_argument(
        "--dry-run", "-d", action="store_true",
        help="Show what would be done without making changes"
    )
    cli.add_argument(
        "--all", "-a", action="store_true",
        help="Process all files (ignores --limit)"
    )
    cli.add_argument(
        "--model", "-m", type=str, default="glm-4.6",
        help="GLM model to use (default: glm-4.6)"
    )
    options = cli.parse_args()
    # --all disables the file cap entirely.
    file_cap = None if options.all else options.limit
    enricher = DescriptionEnricher(model=options.model, dry_run=options.dry_run)
    await enricher.run(limit=file_cap)


if __name__ == "__main__":
    asyncio.run(main())

View file

@ -76,6 +76,8 @@ def extract_top_level_fields(data: dict) -> dict:
# Custodian name consensus
"custodian_name": "",
"custodian_name_confidence": None,
"emic_name": "", # Official name in native/local language
"name_language": "", # ISO 639-1 language code for emic_name
# Ratings
"google_rating": None,
@ -87,10 +89,44 @@ def extract_top_level_fields(data: dict) -> dict:
"timespan_notes": "",
"timespan_json": "",
# Conflict-related temporal data (Palestinian heritage, etc.)
"time_of_destruction_json": "",
"conflict_status_json": "",
"destruction_date": None, # From time_of_destruction.date or conflict_status.date
# Temporal extent (founding/dissolution dates)
"founding_date": None,
"dissolution_date": None,
"temporal_extent_json": "",
# Wikidata inception (P571)
"wikidata_inception": None,
# YouTube enrichment fields (extracted for querying)
"youtube_channel_id": "",
"youtube_channel_title": "",
"youtube_channel_url": "",
"youtube_subscriber_count": None,
"youtube_video_count": None,
"youtube_view_count": None,
"youtube_published_at": None,
"youtube_description": "",
# Google Maps extended fields (in addition to rating/total_ratings)
"google_place_id": "",
"google_business_status": "",
"google_website": "",
"google_phone_international": "",
"google_primary_type": "",
"google_opening_hours_json": "",
"google_reviews_json": "",
"google_photo_count": None,
# Complex nested objects as JSON strings
"original_entry_json": "",
"wikidata_enrichment_json": "",
"google_maps_enrichment_json": "",
"youtube_enrichment_json": "",
"web_enrichment_json": "",
"web_claims_json": "",
"ghcid_json": "",
@ -98,6 +134,7 @@ def extract_top_level_fields(data: dict) -> dict:
"provenance_json": "",
"genealogiewerkbalk_json": "",
"digital_platforms_json": "",
"service_area_json": "",
}
# Extract GHCID
@ -172,12 +209,49 @@ def extract_top_level_fields(data: dict) -> dict:
}
record["org_type"] = type_map.get(type_code, type_code)
# Extract Google Maps data
# ==========================================================================
# COORDINATE EXTRACTION - Priority order (first valid wins)
# ==========================================================================
# 1a. google_maps_enrichment.coordinates.latitude/longitude (nested)
# 1b. google_maps_enrichment.latitude/longitude (flat - Argentine files)
# 2. ghcid.location_resolution.source_coordinates.latitude/longitude
# 3. wikidata_enrichment.wikidata_coordinates.latitude/longitude
# 4. locations[0].latitude/longitude OR locations[0].lat/lon
# 5. original_entry.locations[0].latitude/longitude
# 6. root-level latitude/longitude
# ==========================================================================
# Helper to check if coordinates are valid
def is_valid_coord(lat, lon):
if lat is None or lon is None:
return False
try:
lat_f = float(lat)
lon_f = float(lon)
return -90 <= lat_f <= 90 and -180 <= lon_f <= 180
except (ValueError, TypeError):
return False
# 1. Extract Google Maps data (highest priority for coordinates)
gm = data.get("google_maps_enrichment", {})
if gm:
# 1a. Try nested structure first: google_maps_enrichment.coordinates.latitude
coords = gm.get("coordinates", {})
record["latitude"] = coords.get("latitude")
record["longitude"] = coords.get("longitude")
lat = coords.get("latitude")
lon = coords.get("longitude")
if is_valid_coord(lat, lon):
record["latitude"] = lat
record["longitude"] = lon
# 1b. Fallback to flat structure: google_maps_enrichment.latitude
# (used by Argentine and other recent enrichments)
if record["latitude"] is None:
lat = gm.get("latitude")
lon = gm.get("longitude")
if is_valid_coord(lat, lon):
record["latitude"] = lat
record["longitude"] = lon
record["formatted_address"] = gm.get("formatted_address", "")
record["google_rating"] = gm.get("rating")
record["google_total_ratings"] = gm.get("total_ratings")
@ -193,8 +267,68 @@ def extract_top_level_fields(data: dict) -> dict:
record["postal_code"] = comp.get("long_name", "")
record["google_maps_enrichment_json"] = json.dumps(gm, ensure_ascii=False, default=str)
# Extract extended Google Maps fields
record["google_place_id"] = gm.get("place_id", "")
record["google_business_status"] = gm.get("business_status", "")
record["google_website"] = gm.get("website", "")
record["google_phone_international"] = gm.get("phone_international", "")
record["google_primary_type"] = gm.get("primary_type", "")
record["google_photo_count"] = gm.get("photo_count")
# Opening hours as JSON (complex nested structure)
if gm.get("opening_hours"):
record["google_opening_hours_json"] = json.dumps(
gm["opening_hours"], ensure_ascii=False, default=str
)
# Reviews as JSON array
if gm.get("reviews"):
record["google_reviews_json"] = json.dumps(
gm["reviews"], ensure_ascii=False, default=str
)
# Fallback: Extract location from locations array if not set from Google Maps
# ==========================================================================
# YOUTUBE ENRICHMENT EXTRACTION
# ==========================================================================
yt = data.get("youtube_enrichment", {})
if yt:
record["youtube_enrichment_json"] = json.dumps(yt, ensure_ascii=False, default=str)
# Extract channel data
channel = yt.get("channel", {})
if channel:
record["youtube_channel_id"] = channel.get("channel_id", "")
record["youtube_channel_title"] = channel.get("title", "")
record["youtube_channel_url"] = channel.get("channel_url", "")
record["youtube_subscriber_count"] = channel.get("subscriber_count")
record["youtube_video_count"] = channel.get("video_count")
record["youtube_view_count"] = channel.get("view_count")
record["youtube_published_at"] = channel.get("published_at")
record["youtube_description"] = channel.get("description", "")
# 2. Fallback: GHCID location_resolution.source_coordinates
ghcid = data.get("ghcid", {})
if ghcid and record["latitude"] is None:
loc_res = ghcid.get("location_resolution", {})
src_coords = loc_res.get("source_coordinates", {})
lat = src_coords.get("latitude")
lon = src_coords.get("longitude")
if is_valid_coord(lat, lon):
record["latitude"] = lat
record["longitude"] = lon
# 3. Fallback: Wikidata coordinates
wd = data.get("wikidata_enrichment", {})
if wd and record["latitude"] is None:
wd_coords = wd.get("wikidata_coordinates", {})
lat = wd_coords.get("latitude")
lon = wd_coords.get("longitude")
if is_valid_coord(lat, lon):
record["latitude"] = lat
record["longitude"] = lon
# 4. Fallback: locations array
locations = data.get("locations", [])
if locations and isinstance(locations, list) and len(locations) > 0:
loc = locations[0] # Use first location
@ -202,13 +336,44 @@ def extract_top_level_fields(data: dict) -> dict:
record["city"] = loc.get("city", "")
if not record["country"] and loc.get("country"):
record["country"] = loc.get("country", "")
if record["latitude"] is None and loc.get("latitude"):
record["latitude"] = loc.get("latitude")
if record["longitude"] is None and loc.get("longitude"):
record["longitude"] = loc.get("longitude")
if record["latitude"] is None:
# Try latitude/longitude first, then lat/lon
lat = loc.get("latitude") or loc.get("lat")
lon = loc.get("longitude") or loc.get("lon")
if is_valid_coord(lat, lon):
record["latitude"] = lat
record["longitude"] = lon
# 5. Fallback: original_entry.locations array (Japanese files, etc.)
orig_locations = original.get("locations", []) if original else []
if orig_locations and isinstance(orig_locations, list) and len(orig_locations) > 0:
orig_loc = orig_locations[0]
if record["latitude"] is None:
lat = orig_loc.get("latitude") or orig_loc.get("lat")
lon = orig_loc.get("longitude") or orig_loc.get("lon")
if is_valid_coord(lat, lon):
record["latitude"] = lat
record["longitude"] = lon
# Also try to get city/country from original_entry.locations if not set
if not record["city"] and orig_loc.get("city"):
record["city"] = orig_loc.get("city", "")
if not record["country"] and orig_loc.get("country"):
record["country"] = orig_loc.get("country", "")
# 6. Fallback: Root-level coordinates
if record["latitude"] is None:
lat = data.get("latitude") or data.get("lat")
lon = data.get("longitude") or data.get("lon")
if is_valid_coord(lat, lon):
record["latitude"] = lat
record["longitude"] = lon
# ==========================================================================
# COUNTRY/CITY EXTRACTION - Fallbacks from GHCID
# ==========================================================================
# Fallback: Extract country from GHCID location_resolution
ghcid = data.get("ghcid", {})
if ghcid and not record["country"]:
loc_res = ghcid.get("location_resolution", {})
if loc_res.get("country_code"):
@ -264,6 +429,11 @@ def extract_top_level_fields(data: dict) -> dict:
data["digital_platforms"], ensure_ascii=False, default=str
)
if data.get("service_area"):
record["service_area_json"] = json.dumps(
data["service_area"], ensure_ascii=False, default=str
)
# Extract TimeSpan (CIDOC-CRM E52_Time-Span)
timespan = data.get("timespan", {})
if timespan:
@ -273,6 +443,77 @@ def extract_top_level_fields(data: dict) -> dict:
record["timespan_notes"] = timespan.get("notes", "")
record["timespan_json"] = json.dumps(timespan, ensure_ascii=False, default=str)
# ==========================================================================
# TEMPORAL DATA EXTRACTION - Multiple paths
# ==========================================================================
# Populates destruction_date, founding_date, dissolution_date and
# wikidata_inception from several alternative source locations, in priority
# order; later fallback steps only fill fields that are still unset.
# Extract time_of_destruction (conflict-related: PS-GZ-*, PS-GZA-* files)
time_of_destruction = data.get("time_of_destruction", {})
if time_of_destruction:
record["time_of_destruction_json"] = json.dumps(time_of_destruction, ensure_ascii=False, default=str)
# Extract destruction date
if time_of_destruction.get("date"):
record["destruction_date"] = time_of_destruction.get("date")
# Extract conflict_status (current operational status)
conflict_status = data.get("conflict_status", {})
if conflict_status:
record["conflict_status_json"] = json.dumps(conflict_status, ensure_ascii=False, default=str)
# If status is 'destroyed' and we don't have destruction_date yet, use this
if conflict_status.get("status") == "destroyed" and not record.get("destruction_date"):
record["destruction_date"] = conflict_status.get("date")
# Extract temporal_extent (founding/dissolution dates)
temporal_extent = data.get("temporal_extent", {})
if temporal_extent:
record["temporal_extent_json"] = json.dumps(temporal_extent, ensure_ascii=False, default=str)
# NOTE(review): unlike the fallbacks below, these two assignments are
# unconditional and may set the fields to None when the keys are absent.
record["founding_date"] = temporal_extent.get("founding_date")
record["dissolution_date"] = temporal_extent.get("dissolution_date") or temporal_extent.get("end_date")
# Fallback: Check identifiers for temporal_extent
identifiers = data.get("identifiers", {})
if identifiers and isinstance(identifiers, dict):
id_temporal = identifiers.get("temporal_extent", {})
if id_temporal and not record.get("founding_date"):
record["founding_date"] = id_temporal.get("founding_date")
if id_temporal and not record.get("dissolution_date"):
record["dissolution_date"] = id_temporal.get("dissolution_date") or id_temporal.get("end_date")
# Also check for founding_year in identifiers
if identifiers.get("founding_year") and not record.get("founding_date"):
# Convert year to date format
record["founding_date"] = f"{identifiers['founding_year']}-01-01"
# Extract wikidata_inception from wikidata_enrichment
wd = data.get("wikidata_enrichment", {})
if wd:
# Direct wikidata_inception field
if wd.get("wikidata_inception"):
record["wikidata_inception"] = wd.get("wikidata_inception")
# Or from wikidata_claims.inception
elif wd.get("wikidata_claims", {}).get("inception"):
record["wikidata_inception"] = wd.get("wikidata_claims", {}).get("inception")
# Fallback: Check web_enrichment claims for inception or founding_date
web_enrichment = data.get("web_enrichment", {})
if web_enrichment and web_enrichment.get("claims"):
for claim in web_enrichment.get("claims", []):
claim_type = claim.get("claim_type", "")
if claim_type in ("inception", "founding_date") and not record.get("founding_date"):
record["founding_date"] = claim.get("claim_value")
# NOTE(review): indentation was lost in this view, so it is ambiguous
# whether this break sits inside the `if` (stop after first matching
# claim) or at loop level (only ever inspect the first claim) — verify
# against the original file.
break
# Final consolidation: If we have timespan_begin but no founding_date, use it
if record.get("timespan_begin") and not record.get("founding_date"):
record["founding_date"] = record["timespan_begin"]
# If we have timespan_end but no dissolution_date, use it
if record.get("timespan_end") and not record.get("dissolution_date"):
record["dissolution_date"] = record["timespan_end"]
# If we have destruction_date but no dissolution_date, use it
if record.get("destruction_date") and not record.get("dissolution_date"):
record["dissolution_date"] = record["destruction_date"]
return record
@ -403,8 +644,21 @@ def main():
# Show sample record
print("\nSample record (first):")
# NOTE(review): records[0] raises IndexError on an empty list — presumably
# an earlier guard ensures records is non-empty; confirm upstream.
sample = records[0]
# NOTE(review): the two `for key in [...]` loops below appear to be the
# removed and added halves of a diff hunk rendered together; only the
# second (extended) variant should exist in the actual source file.
for key in ["file_name", "ghcid_current", "custodian_name", "city", "country"]:
print(f" {key}: {sample.get(key, 'N/A')}")
# Extended field list including the new enrichment columns; empty/None
# values are rendered as "(empty)" for readability.
for key in ["file_name", "ghcid_current", "custodian_name", "city", "country",
"google_rating", "youtube_channel_id"]:
value = sample.get(key, 'N/A')
if value == "" or value is None:
value = "(empty)"
print(f" {key}: {value}")
# Count non-empty enrichment fields
yt_count = sum(1 for r in records if r.get("youtube_channel_id"))
gm_count = sum(1 for r in records if r.get("google_place_id"))
# Coordinates are checked with `is not None` so that 0.0 still counts.
coord_count = sum(1 for r in records if r.get("latitude") is not None)
print(f"\nEnrichment summary:")
print(f" With coordinates: {coord_count}/{len(records)}")
print(f" With Google Maps: {gm_count}/{len(records)}")
print(f" With YouTube: {yt_count}/{len(records)}")
# Dry-run mode reports what would happen and skips the actual upload.
if args.dry_run:
print("\n[DRY RUN] Would upload to DuckLake. Exiting without upload.")