glam/scripts/enrich_hyponyms_with_wikidata.py.bak
2025-11-19 23:25:22 +01:00

606 lines
22 KiB
Python

#!/usr/bin/env python3
"""
Enrich hyponyms_curated.yaml with full Wikidata metadata.
This script:
1. Reads hyponyms_curated.yaml
2. Fetches ALL Wikidata properties for each Q-number (except 'exclude' section)
3. Maintains a register of fetched entities to avoid re-fetching
4. Preserves existing curation data (country, time, hypernym, duplicate, type, etc.)
5. Outputs to hyponyms_curated_full.yaml
Usage:
python scripts/enrich_hyponyms_with_wikidata.py
# Force refresh specific entities:
python scripts/enrich_hyponyms_with_wikidata.py --refresh Q12345,Q67890
# Refresh all entities:
python scripts/enrich_hyponyms_with_wikidata.py --refresh-all
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
import requests
import yaml
# Add project root to path so project-local modules resolve when the script
# is run directly (e.g. `python scripts/enrich_hyponyms_with_wikidata.py`).
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
# Constants: curated input, enriched output, and the on-disk fetch cache.
INPUT_FILE = PROJECT_ROOT / "data/wikidata/GLAMORCUBEPSXHFN/hyponyms_curated.yaml"
OUTPUT_FILE = PROJECT_ROOT / "data/wikidata/GLAMORCUBEPSXHFN/hyponyms_curated_full.yaml"
REGISTER_FILE = PROJECT_ROOT / "data/wikidata/GLAMORCUBEPSXHFN/.fetch_register.json"
WIKIDATA_API_BASE = "https://www.wikidata.org/w/api.php"
RATE_LIMIT_DELAY = 0.1  # 100ms between requests (10 req/sec)
# Sections to enrich; the 'exclude' section is deliberately NOT processed
# (it is copied through untouched by HyponymEnricher.enrich_all).
SECTIONS_TO_PROCESS = ['hypernym', 'entity', 'entity_list', 'standard', 'collection']
class WikidataFetcher:
    """Fetch Wikidata entity records over the API, backed by an on-disk cache.

    The cache ("register") maps identifiers to their raw API payload plus
    fetch bookkeeping, so repeated runs do not re-download unchanged entities.
    """

    def __init__(self, register_file: Path):
        self.register_file = register_file
        self.register = self._load_register()
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'GLAM-Enrichment-Bot/1.0 (https://github.com/glam-project)'
        })

    def _load_register(self) -> Dict[str, Any]:
        """Read the fetch register from disk, or start a fresh one."""
        if not self.register_file.exists():
            return {
                'entities': {},
                'last_updated': None,
                'total_fetches': 0
            }
        with open(self.register_file, 'r', encoding='utf-8') as fh:
            return json.load(fh)

    def _save_register(self):
        """Persist the fetch register, stamping the update time (UTC)."""
        self.register['last_updated'] = datetime.now(timezone.utc).isoformat()
        self.register_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.register_file, 'w', encoding='utf-8') as fh:
            json.dump(self.register, fh, indent=2, ensure_ascii=False)

    def get_entity_data(self, identifier: str, force_refresh: bool = False) -> Optional[Dict[str, Any]]:
        """
        Return the complete Wikidata record for *identifier*.

        Args:
            identifier: Wikidata identifier (Q12345, P31, Category:..., List_of_...)
            force_refresh: Force re-fetch even if cached

        Returns:
            Dictionary with all Wikidata properties, or None on any failure
            (API error, missing entity, network problem).
        """
        # Serve from the register unless the caller demands a re-fetch.
        cached = None if force_refresh else self.register['entities'].get(identifier)
        if cached is not None:
            print(f"{identifier} (cached from {cached.get('fetch_date', 'unknown')})")
            return cached.get('data')
        print(f" ⬇ Fetching {identifier} from Wikidata...")
        try:
            query = {
                'action': 'wbgetentities',
                'ids': identifier,
                'format': 'json',
                'props': 'info|sitelinks|aliases|labels|descriptions|claims|datatype'
            }
            resp = self.session.get(WIKIDATA_API_BASE, params=query, timeout=30)
            resp.raise_for_status()
            payload = resp.json()
            if 'error' in payload:
                print(f" ✗ Error fetching {identifier}: {payload['error'].get('info', 'Unknown error')}")
                return None
            record = payload.get('entities', {}).get(identifier)
            if record is None:
                print(f" ✗ Entity {identifier} not found in Wikidata")
                return None
            # The API reports deleted/unknown IDs via a 'missing' marker.
            if 'missing' in record:
                print(f" ✗ Entity {identifier} is missing or deleted")
                return None
            # Cache the payload; read the previous entry first so the
            # per-entity fetch counter survives the overwrite.
            previous = self.register['entities'].get(identifier, {})
            self.register['entities'][identifier] = {
                'data': record,
                'fetch_date': datetime.now(timezone.utc).isoformat(),
                'fetch_count': previous.get('fetch_count', 0) + 1
            }
            self.register['total_fetches'] += 1
            # Throttle so we stay politely under the API rate limit.
            time.sleep(RATE_LIMIT_DELAY)
            print(f"{identifier} fetched successfully")
            return record
        except requests.RequestException as exc:
            print(f" ✗ Network error fetching {identifier}: {exc}")
            return None
        except Exception as exc:
            print(f" ✗ Unexpected error fetching {identifier}: {exc}")
            return None
class HyponymEnricher:
    """Enrich curated hyponym entries with full Wikidata metadata.

    Reads hyponyms_curated.yaml, fetches each entry's Wikidata record through
    a WikidataFetcher, and produces an enriched structure that preserves the
    original curation fields untouched under a 'curated' key.
    """

    def __init__(self, input_file: Path, output_file: Path, fetcher: "WikidataFetcher"):
        self.input_file = input_file
        self.output_file = output_file
        self.fetcher = fetcher
        # Running counters, reported at the end by print_stats().
        self.stats = {
            'total_entities': 0,
            'enriched': 0,
            'cached': 0,
            'failed': 0,
            'skipped': 0
        }

    def load_input(self) -> Dict[str, Any]:
        """Load and parse hyponyms_curated.yaml."""
        print(f"\n📂 Loading {self.input_file.name}...")
        with open(self.input_file, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)

    def extract_identifier(self, label: Any) -> Optional[str]:
        """
        Extract a Wikidata/Wikipedia identifier from a curated label.

        Handles multiple identifier types:
        - Q-numbers: Q12345, 'Q12345 - Museum'
        - P-numbers: P31, P2671
        - Category identifiers: Category:Virtual_museums
        - List identifiers: List_of_museums

        Returns:
            Identifier string (Q12345, P31, Category:..., List_of_...) or
            None when the label carries no recognizable identifier.
        """
        if not label:
            return None
        # Labels may arrive as ints (e.g. bare years) — normalize to str.
        label_str = str(label).strip()
        # Wikipedia category pages.
        if label_str.startswith('Category:'):
            return label_str
        # Wikipedia "List of ..." pages.
        if label_str.startswith('List_of_'):
            return label_str
        # Q-numbers (entities) and P-numbers (properties): the identifier is
        # the first whitespace-separated token, e.g. 'Q12345 - Museum'.
        parts = label_str.split()
        if parts:
            first_part = parts[0]
            if first_part.startswith('Q') and first_part[1:].isdigit():
                return first_part
            if first_part.startswith('P') and first_part[1:].isdigit():
                return first_part
        # Anything else (plain years, free text) is not an identifier.
        return None

    def enrich_entity(self, entity: Dict[str, Any], force_refresh: bool = False) -> Dict[str, Any]:
        """
        Enrich a single entity with Wikidata metadata.

        Args:
            entity: Entity dict from hyponyms_curated.yaml
            force_refresh: Force re-fetch from Wikidata

        Returns:
            Dict with 'curated' (the original entry, untouched), 'wikidata'
            (flattened entity data or None), 'enrichment_status', and
            'identifier' keys.
        """
        label = entity.get('label')
        identifier = self.extract_identifier(label)
        if not identifier:
            self.stats['skipped'] += 1
            return {
                'curated': entity,
                'wikidata': None,
                'enrichment_status': 'no_identifier',
                'identifier': None
            }
        self.stats['total_entities'] += 1
        # Category:/List_of_ pages are Wikipedia constructs with no Wikidata
        # entity record — record them but skip the API round-trip.
        if identifier.startswith('Category:') or identifier.startswith('List_of_'):
            self.stats['skipped'] += 1
            return {
                'curated': entity,
                'wikidata': None,
                'enrichment_status': 'category_or_list',
                'identifier': identifier,
                'enrichment_note': 'Wikipedia category or list page (no Wikidata entity)'
            }
        # Fetch Wikidata data (for Q and P identifiers).
        wd_data = self.fetcher.get_entity_data(identifier, force_refresh)
        if wd_data is None:
            self.stats['failed'] += 1
            return {
                'curated': entity,
                'wikidata': None,
                'enrichment_status': 'fetch_failed',
                'identifier': identifier
            }
        # Classify the hit: fetch_count == 1 is treated as a first fetch.
        # NOTE(review): entities first fetched in an *earlier* run also carry
        # fetch_count == 1, so this split is approximate — acceptable for a
        # rough stats breakdown.
        if identifier in self.fetcher.register['entities']:
            cached_entry = self.fetcher.register['entities'][identifier]
            if cached_entry.get('fetch_count', 0) == 1:
                self.stats['enriched'] += 1
            else:
                self.stats['cached'] += 1
        return {
            'curated': entity,
            'wikidata': self._flatten_wikidata(wd_data),
            'enrichment_status': 'success',
            'identifier': identifier,
            'enrichment_date': datetime.now(timezone.utc).isoformat()
        }

    def _flatten_wikidata(self, wd_data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Flatten a raw Wikidata entity payload for YAML readability.

        Extracts:
        - Labels, descriptions, aliases (all languages, value-only)
        - Claims/statements (all properties, simplified)
        - Sitelinks (title/url/badges per wiki)
        - Entity metadata (id, type, modified date, page info)
        """
        flattened = {
            'id': wd_data.get('id'),
            'type': wd_data.get('type'),
            'modified': wd_data.get('modified'),
            'labels': {},
            'descriptions': {},
            'aliases': {},
            'claims': {},
            'sitelinks': {},
            'metadata': {}
        }
        # Labels (all languages): {'en': {'value': 'x', ...}} -> {'en': 'x'}.
        for lang, label_data in wd_data.get('labels', {}).items():
            flattened['labels'][lang] = label_data.get('value')
        # Descriptions (all languages).
        for lang, desc_data in wd_data.get('descriptions', {}).items():
            flattened['descriptions'][lang] = desc_data.get('value')
        # Aliases (all languages): list of value dicts -> list of strings.
        for lang, alias_list in wd_data.get('aliases', {}).items():
            flattened['aliases'][lang] = [a.get('value') for a in alias_list]
        # Claims (all properties with values).
        for prop_id, claim_list in wd_data.get('claims', {}).items():
            flattened['claims'][prop_id] = self._extract_claims(claim_list)
        # Sitelinks (Wikipedia and other wikis).
        for site, sitelink_data in wd_data.get('sitelinks', {}).items():
            flattened['sitelinks'][site] = {
                'title': sitelink_data.get('title'),
                'url': sitelink_data.get('url'),
                'badges': sitelink_data.get('badges', [])
            }
        # Additional page-level metadata.
        flattened['metadata'] = {
            'pageid': wd_data.get('pageid'),
            'ns': wd_data.get('ns'),
            'title': wd_data.get('title'),
            'lastrevid': wd_data.get('lastrevid')
        }
        return flattened

    def _extract_claims(self, claim_list: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Simplify a list of raw claim/statement dicts (value, qualifiers, refs)."""
        simplified = []
        for claim in claim_list:
            mainsnak = claim.get('mainsnak', {})
            datavalue = mainsnak.get('datavalue', {})
            claim_data = {
                'property': mainsnak.get('property'),
                'rank': claim.get('rank'),
                'datatype': mainsnak.get('datatype'),
                'value': self._extract_value(datavalue)
            }
            # Qualifiers: property -> list of extracted values.
            if 'qualifiers' in claim:
                claim_data['qualifiers'] = {}
                for qual_prop, qual_list in claim['qualifiers'].items():
                    claim_data['qualifiers'][qual_prop] = [
                        self._extract_value(q.get('datavalue', {}))
                        for q in qual_list
                    ]
            # References: keep the hash plus extracted snak values.
            if 'references' in claim:
                claim_data['references'] = [
                    {
                        'hash': ref.get('hash'),
                        'snaks': {
                            prop: [self._extract_value(s.get('datavalue', {})) for s in snak_list]
                            for prop, snak_list in ref.get('snaks', {}).items()
                        }
                    }
                    for ref in claim['references']
                ]
            simplified.append(claim_data)
        return simplified

    def _extract_value(self, datavalue: Dict[str, Any]) -> Any:
        """Extract a plain Python value from a Wikidata datavalue object.

        Returns None for absent datavalues (e.g. 'somevalue'/'novalue' snaks).
        """
        if not datavalue:
            return None
        value_type = datavalue.get('type')
        value = datavalue.get('value')
        # FIX: compare against None instead of truthiness so falsy-but-valid
        # values (e.g. an empty string) are not silently dropped.
        if value is None:
            return None
        if value_type == 'wikibase-entityid':
            # Entity reference. FIX: prefer the full 'id' field, which is
            # correct for items (Q...), properties (P...) and lexemes (L...);
            # the old f"Q{numeric-id}" form mislabeled non-item references.
            if isinstance(value, dict):
                entity_id = value.get('id')
                if entity_id:
                    return entity_id
                numeric_id = value.get('numeric-id')
                return f"Q{numeric_id}" if numeric_id is not None else None
            return None
        elif value_type == 'string':
            return value
        elif value_type == 'time' and isinstance(value, dict):
            # Time value with calendar metadata.
            return {
                'time': value.get('time'),
                'precision': value.get('precision'),
                'timezone': value.get('timezone'),
                'calendarmodel': value.get('calendarmodel')
            }
        elif value_type == 'quantity' and isinstance(value, dict):
            # Quantity with unit and optional bounds.
            return {
                'amount': value.get('amount'),
                'unit': value.get('unit'),
                'upperBound': value.get('upperBound'),
                'lowerBound': value.get('lowerBound')
            }
        elif value_type == 'monolingualtext' and isinstance(value, dict):
            # Text tagged with a language code.
            return {
                'text': value.get('text'),
                'language': value.get('language')
            }
        elif value_type == 'globecoordinate' and isinstance(value, dict):
            # Geographic coordinates.
            return {
                'latitude': value.get('latitude'),
                'longitude': value.get('longitude'),
                'precision': value.get('precision'),
                'globe': value.get('globe')
            }
        else:
            # Other/unknown datatypes — pass the raw value through.
            return value

    def enrich_section(self, section_name: str, section_data: List[Dict[str, Any]],
                       force_refresh: Optional[Set[str]] = None) -> List[Dict[str, Any]]:
        """Enrich every entity in one section, with periodic progress output."""
        if force_refresh is None:
            force_refresh = set()
        print(f"\n📊 Enriching section: {section_name}")
        print(f" Entities: {len(section_data)}")
        enriched = []
        for i, entity in enumerate(section_data, 1):
            if i % 50 == 0:
                print(f" Progress: {i}/{len(section_data)} entities")
            identifier = self.extract_identifier(entity.get('label'))
            should_refresh = identifier in force_refresh if identifier else False
            enriched_entity = self.enrich_entity(entity, force_refresh=should_refresh)
            enriched.append(enriched_entity)
        return enriched

    def enrich_all(self, force_refresh: Optional[Set[str]] = None) -> Dict[str, Any]:
        """Enrich all configured sections; copy 'exclude' through untouched."""
        data = self.load_input()
        enriched_data = {
            'metadata': {
                'source_file': str(self.input_file),
                'enrichment_date': datetime.now(timezone.utc).isoformat(),
                'enrichment_script': __file__,
                'wikidata_api': WIKIDATA_API_BASE
            },
            'sources': data.get('sources', [])
        }
        # Process each section (except 'exclude').
        for section_name in SECTIONS_TO_PROCESS:
            if section_name in data:
                enriched_data[section_name] = self.enrich_section(
                    section_name,
                    data[section_name],
                    force_refresh=force_refresh
                )
            else:
                print(f"⚠ Section '{section_name}' not found in input file")
        # The 'exclude' section is deliberately never enriched.
        if 'exclude' in data:
            enriched_data['exclude'] = data['exclude']
            print(f"\n📋 Copied 'exclude' section ({len(data['exclude'])} entries) without enrichment")
        return enriched_data

    def save_output(self, enriched_data: Dict[str, Any]):
        """Write the enriched structure to the output YAML file."""
        print(f"\n💾 Saving to {self.output_file.name}...")
        self.output_file.parent.mkdir(parents=True, exist_ok=True)
        with open(self.output_file, 'w', encoding='utf-8') as f:
            yaml.dump(enriched_data, f,
                      allow_unicode=True,
                      default_flow_style=False,
                      sort_keys=False,
                      width=120)
        print(f"✓ Saved {self.output_file}")

    def print_stats(self):
        """Print a summary of the enrichment counters."""
        print("\n" + "="*60)
        print("📈 ENRICHMENT STATISTICS")
        print("="*60)
        print(f"Total entities processed: {self.stats['total_entities']}")
        print(f" ✓ Newly enriched: {self.stats['enriched']}")
        print(f" ✓ From cache: {self.stats['cached']}")
        print(f" ✗ Failed to fetch: {self.stats['failed']}")
        print(f" ⊘ Skipped (no Q-ID): {self.stats['skipped']}")
        print("="*60)
def main():
    """CLI entry point: parse arguments, run the enrichment, report stats."""
    parser = argparse.ArgumentParser(
        description='Enrich hyponyms_curated.yaml with Wikidata metadata',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument(
        '--refresh',
        type=str,
        help='Comma-separated list of Q-numbers to force refresh (e.g., Q12345,Q67890)'
    )
    parser.add_argument(
        '--refresh-all',
        action='store_true',
        help='Force refresh all entities (re-fetch from Wikidata)'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Perform enrichment but do not save output'
    )
    args = parser.parse_args()

    # Work out which identifiers need a forced re-fetch.
    if args.refresh_all:
        force_refresh = {'*'}  # placeholder; expanded to concrete IDs below
    elif args.refresh:
        force_refresh = {token.strip() for token in args.refresh.split(',')}
        print(f"🔄 Force refresh: {', '.join(force_refresh)}")
    else:
        force_refresh = set()

    banner = "="*60
    print(banner)
    print("🚀 WIKIDATA ENRICHMENT SCRIPT")
    print(banner)
    fetcher = WikidataFetcher(REGISTER_FILE)
    enricher = HyponymEnricher(INPUT_FILE, OUTPUT_FILE, fetcher)

    if args.refresh_all:
        # Re-fetching everything is expensive — confirm interactively first.
        print("⚠ REFRESH ALL mode enabled - will re-fetch all entities")
        if input("Continue? (y/n): ").lower() != 'y':
            print("Aborted.")
            return
        # Replace the '*' placeholder with every identifier in the input.
        data = enricher.load_input()
        all_ids = set()
        for section in SECTIONS_TO_PROCESS:
            for entry in data.get(section, []):
                ident = enricher.extract_identifier(entry.get('label'))
                if ident:
                    all_ids.add(ident)
        force_refresh = all_ids
        print(f"📋 Will refresh {len(force_refresh)} entities")

    try:
        enriched_data = enricher.enrich_all(force_refresh=force_refresh)
        if args.dry_run:
            print("\n🔍 DRY RUN - Output not saved")
        else:
            enricher.save_output(enriched_data)
            fetcher._save_register()
            print(f"✓ Fetch register saved to {REGISTER_FILE}")
        enricher.print_stats()
        if not args.dry_run:
            print("\n✅ SUCCESS - Enriched data saved to:")
            print(f" {OUTPUT_FILE}")
    except KeyboardInterrupt:
        # Persist the register so partial progress survives an interrupt.
        print("\n\n⚠ Interrupted by user")
        fetcher._save_register()
        print(f"✓ Fetch register saved to {REGISTER_FILE}")
        sys.exit(1)
    except Exception as e:
        print(f"\n\n❌ ERROR: {e}")
        import traceback
        traceback.print_exc()
        fetcher._save_register()
        print(f"✓ Fetch register saved to {REGISTER_FILE}")
        sys.exit(1)


if __name__ == '__main__':
    main()