glam/scripts/resolve_country_codes.py
kempersc e45c1a3c85 feat(scripts): add city enrichment and location resolution utilities
Enrichment scripts for country-specific city data:
- enrich_austrian_cities.py, enrich_belgian_cities.py, enrich_belgian_v2.py
- enrich_bulgarian_cities.py, enrich_czech_cities.py, enrich_czech_cities_fast.py
- enrich_japanese_cities.py, enrich_swiss_isil_cities.py, enrich_cities_google.py

Location resolution utilities:
- resolve_cities_from_file_coords.py - Resolve cities using coordinates in filenames
- resolve_cities_wikidata.py - Use Wikidata P131 for city resolution
- resolve_country_codes.py - Standardize country codes
- resolve_cz_xx_regions.py - Fix Czech XX region codes
- resolve_locations_by_name.py - Name-based location lookup
- resolve_regions_from_city.py - Derive regions from city data
- update_ghcid_with_geonames.py - Update GHCIDs with GeoNames data

CH-Annotator integration:
- create_custodian_from_ch_annotator.py - Create custodians from annotations
- add_ch_annotator_location_claims.py - Add location claims
- extract_locations_ch_annotator.py - Extract locations from annotations

Migration and fixes:
- migrate_egyptian_from_ch.py - Migrate Egyptian data
- migrate_web_archives.py - Migrate web archive data
- fix_belgian_cities.py - Fix Belgian city data
2025-12-07 14:26:59 +01:00

472 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Resolve XX country codes using Wikidata P17 (country) lookup.
This script:
1. Finds files with XX country code
2. Extracts Wikidata IDs from the files
3. Queries Wikidata P17 to get country
4. Updates files with resolved country code
5. Renames files to match new GHCID
Following AGENTS.md Rules:
- Rule 5: Additive only - never delete existing data
"""
import os
import sys
import yaml
import json
import re
import urllib.request
import urllib.parse
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
# Wikidata entity ID to ISO 3166-1 alpha-2 country code mapping.
# Keys are the Q-ids of country entities (the objects of P17 claims returned
# by the SPARQL query below); values are the two-letter codes used in GHCIDs
# and custodian filenames. Countries not listed here are silently skipped by
# query_wikidata_countries(), so extend this table when new regions appear.
WIKIDATA_COUNTRY_TO_ISO = {
    'Q213': 'CZ',  # Czechia
    'Q40': 'AT',  # Austria
    'Q183': 'DE',  # Germany
    'Q36': 'PL',  # Poland
    'Q39': 'CH',  # Switzerland
    'Q31': 'BE',  # Belgium
    'Q142': 'FR',  # France
    'Q145': 'GB',  # United Kingdom
    'Q38': 'IT',  # Italy
    'Q29': 'ES',  # Spain
    'Q55': 'NL',  # Netherlands
    'Q30': 'US',  # United States
    'Q17': 'JP',  # Japan
    'Q884': 'KR',  # South Korea
    'Q148': 'CN',  # China
    'Q668': 'IN',  # India
    'Q155': 'BR',  # Brazil
    'Q96': 'MX',  # Mexico
    'Q414': 'AR',  # Argentina
    'Q298': 'CL',  # Chile
    'Q45': 'PT',  # Portugal
    'Q27': 'IE',  # Ireland
    'Q20': 'NO',  # Norway
    'Q35': 'DK',  # Denmark
    'Q34': 'SE',  # Sweden
    'Q33': 'FI',  # Finland
    'Q211': 'LV',  # Latvia
    'Q37': 'LT',  # Lithuania
    'Q191': 'EE',  # Estonia
    'Q159': 'RU',  # Russia
    'Q212': 'UA',  # Ukraine
    'Q184': 'BY',  # Belarus
    'Q219': 'BG',  # Bulgaria
    'Q218': 'RO',  # Romania
    'Q28': 'HU',  # Hungary
    'Q214': 'SK',  # Slovakia
    'Q215': 'SI',  # Slovenia
    'Q224': 'HR',  # Croatia
    'Q225': 'BA',  # Bosnia and Herzegovina
    'Q117': 'GH',  # Ghana
    'Q115': 'ET',  # Ethiopia
    'Q1033': 'NG',  # Nigeria
    'Q258': 'ZA',  # South Africa
    'Q916': 'AO',  # Angola
    'Q1008': 'CI',  # Ivory Coast
    'Q114': 'KE',  # Kenya
    'Q1044': 'SN',  # Senegal
    'Q262': 'DZ',  # Algeria
    'Q1028': 'MA',  # Morocco
    'Q948': 'TN',  # Tunisia
    'Q79': 'EG',  # Egypt
    'Q1030': 'LY',  # Libya
    'Q265': 'UZ',  # Uzbekistan
    'Q232': 'KZ',  # Kazakhstan
    'Q863': 'TJ',  # Tajikistan
    'Q874': 'TM',  # Turkmenistan
    'Q813': 'KG',  # Kyrgyzstan
    'Q889': 'AF',  # Afghanistan
    'Q794': 'IR',  # Iran
    'Q796': 'IQ',  # Iraq
    'Q858': 'SY',  # Syria
    'Q801': 'IL',  # Israel
    'Q810': 'JO',  # Jordan
    'Q822': 'LB',  # Lebanon
    'Q846': 'QA',  # Qatar
    'Q878': 'AE',  # United Arab Emirates
    'Q851': 'SA',  # Saudi Arabia
    'Q805': 'YE',  # Yemen
    'Q842': 'OM',  # Oman
    'Q398': 'BH',  # Bahrain
    'Q817': 'KW',  # Kuwait
    'Q16': 'CA',  # Canada
    'Q408': 'AU',  # Australia
    'Q664': 'NZ',  # New Zealand
    'Q869': 'TH',  # Thailand
    'Q881': 'VN',  # Vietnam
    'Q928': 'PH',  # Philippines
    'Q252': 'ID',  # Indonesia
    'Q833': 'MY',  # Malaysia
    'Q334': 'SG',  # Singapore
    'Q836': 'MM',  # Myanmar
    'Q424': 'KH',  # Cambodia
    'Q819': 'LA',  # Laos
    'Q865': 'TW',  # Taiwan
    'Q921': 'BN',  # Brunei
    'Q399': 'AM',  # Armenia
    'Q230': 'GE',  # Georgia
    'Q227': 'AZ',  # Azerbaijan
    'Q217': 'MD',  # Moldova
    'Q229': 'CY',  # Cyprus
    'Q41': 'GR',  # Greece
    'Q43': 'TR',  # Turkey
    'Q221': 'MK',  # North Macedonia
    'Q222': 'AL',  # Albania
    'Q403': 'RS',  # Serbia
    'Q236': 'ME',  # Montenegro
    'Q23635': 'XK',  # Kosovo
    'Q347': 'LI',  # Liechtenstein
    'Q32': 'LU',  # Luxembourg
    'Q235': 'MC',  # Monaco
    'Q238': 'SM',  # San Marino
    'Q237': 'VA',  # Vatican City
    'Q228': 'AD',  # Andorra
    'Q233': 'MT',  # Malta
    'Q189': 'IS',  # Iceland
    'Q219060': 'PS',  # Palestine
    # Add more as needed
}
def extract_wikidata_ids(data: Dict[str, Any]) -> List[str]:
    """Extract all Wikidata Q-ids referenced by a custodian record.

    Looks in three places, in order, de-duplicating while preserving
    first-seen order:
      1. the top-level ``identifiers`` list,
      2. ``original_entry.identifiers``,
      3. ``wikidata_enrichment.wikidata_entity_id``.

    Empty YAML keys parse to ``None`` (e.g. ``identifiers:`` with no
    items), so every container and value is guarded before use — the
    previous version raised ``TypeError``/``AttributeError`` on such
    records.

    Args:
        data: Parsed YAML custodian record (a mapping).

    Returns:
        List of Q-ids (strings starting with 'Q'); possibly empty.
    """
    wikidata_ids: List[str] = []

    def _collect(identifiers: Any) -> None:
        # Treat a None/missing identifiers key as an empty list.
        for ident in identifiers or []:
            if not isinstance(ident, dict):
                continue
            if ident.get('identifier_scheme') == 'Wikidata':
                # identifier_value may be None; coerce to '' before startswith.
                value = ident.get('identifier_value') or ''
                if value.startswith('Q') and value not in wikidata_ids:
                    wikidata_ids.append(value)

    _collect(data.get('identifiers'))
    original_entry = data.get('original_entry')
    if isinstance(original_entry, dict):
        _collect(original_entry.get('identifiers'))
    enrichment = data.get('wikidata_enrichment')
    if isinstance(enrichment, dict):
        wd_id = enrichment.get('wikidata_entity_id') or ''
        if wd_id.startswith('Q') and wd_id not in wikidata_ids:
            wikidata_ids.append(wd_id)
    return wikidata_ids
def query_wikidata_countries(wikidata_ids: List[str]) -> Dict[str, str]:
    """Look up P17 (country) for a batch of Wikidata entities via SPARQL.

    Sends a single VALUES-based query to the public Wikidata endpoint and
    maps each entity's country Q-id through WIKIDATA_COUNTRY_TO_ISO.

    Args:
        wikidata_ids: Q-ids to resolve in one request.

    Returns:
        Mapping of entity Q-id -> ISO 3166-1 alpha-2 code, limited to
        countries present in WIKIDATA_COUNTRY_TO_ISO. Empty dict when the
        input is empty or the request fails.
    """
    if not wikidata_ids:
        return {}

    value_list = ' '.join([f'wd:{qid}' for qid in wikidata_ids])
    query = f"""
SELECT ?item ?country WHERE {{
VALUES ?item {{ {value_list} }}
?item wdt:P17 ?country.
}}
"""
    url = "https://query.wikidata.org/sparql"
    headers = {
        'Accept': 'application/sparql-results+json',
        'User-Agent': 'GLAM-Data-Project/1.0 (heritage institution research)'
    }
    payload = urllib.parse.urlencode({'query': query}).encode('utf-8')

    try:
        req = urllib.request.Request(url, data=payload, headers=headers)
        with urllib.request.urlopen(req, timeout=60) as resp:
            parsed = json.loads(resp.read().decode('utf-8'))
    except Exception as e:
        print(f" Wikidata SPARQL error: {e}")
        return {}

    resolved: Dict[str, str] = {}
    for binding in parsed.get('results', {}).get('bindings', []):
        item_uri = binding.get('item', {}).get('value', '')
        country_uri = binding.get('country', {}).get('value', '')
        if not (item_uri and country_uri):
            continue
        # URIs look like http://www.wikidata.org/entity/Q213 — take the Q-id.
        entity_qid = item_uri.rsplit('/', 1)[-1]
        iso_code = WIKIDATA_COUNTRY_TO_ISO.get(country_uri.rsplit('/', 1)[-1])
        if iso_code is not None:
            resolved[entity_qid] = iso_code
    return resolved
def update_custodian_file(filepath: Path, country_code: str, dry_run: bool = True) -> Tuple[bool, Optional[Path]]:
    """Write a resolved country code into one custodian YAML file.

    Only records whose location_resolution.country_code is 'XX' are
    touched. Per AGENTS.md Rule 5 (additive only), GHCID history and
    provenance notes are appended, never replaced. The GHCID string and
    the filename have their 'XX-XX-' prefix rewritten to
    '<country_code>-XX-'.

    Fixes over the previous version:
      * yaml.safe_load returns None for an empty file (and 'ghcid:' may
        parse to None); both previously raised TypeError — now they are
        rejected cleanly.
      * When the rename target already exists, the function no longer
        reports the new path as if a rename happened.

    Args:
        filepath: Path to the custodian YAML file.
        country_code: Resolved ISO 3166-1 alpha-2 code.
        dry_run: When True, compute changes but write/rename nothing.

    Returns:
        (updated, new_path): updated is True if the record qualified and
        was (or would be) changed; new_path is the renamed file path, or
        None when the name is unchanged or the rename was skipped.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        print(f" Error reading {filepath}: {e}")
        return False, None
    # Empty/malformed documents parse to None or a non-mapping; skip them.
    if not isinstance(data, dict) or 'ghcid' not in data:
        return False, None
    ghcid = data['ghcid']
    if not isinstance(ghcid, dict):
        return False, None
    if 'location_resolution' not in ghcid:
        ghcid['location_resolution'] = {}
    loc_res = ghcid['location_resolution']
    # Only XX (unknown country) records are eligible.
    old_country = loc_res.get('country_code', 'XX')
    if old_country != 'XX':
        return False, None
    # One timestamp for all fields written in this update.
    now_iso = datetime.now(timezone.utc).isoformat()
    loc_res['country_code'] = country_code
    loc_res['method'] = 'WIKIDATA_P17'
    loc_res['resolution_timestamp'] = now_iso
    # Rewrite the GHCID string and append the change to history (additive).
    old_ghcid = ghcid.get('ghcid_current', '')
    new_ghcid = old_ghcid.replace('XX-XX-', f'{country_code}-XX-')
    if new_ghcid != old_ghcid:
        ghcid['ghcid_current'] = new_ghcid
        if 'ghcid_history' not in ghcid:
            ghcid['ghcid_history'] = []
        ghcid['ghcid_history'].append({
            'ghcid': new_ghcid,
            'valid_from': now_iso,
            'reason': f"Country resolved via Wikidata P17: XX→{country_code}"
        })
    # Append a provenance note, normalizing a scalar/None notes field first.
    if data.get('provenance') is None:
        data['provenance'] = {}
    if 'notes' not in data['provenance']:
        data['provenance']['notes'] = []
    elif isinstance(data['provenance']['notes'], str):
        data['provenance']['notes'] = [data['provenance']['notes']]
    data['provenance']['notes'].append(
        f"Country resolved {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}: "
        f"XX→{country_code} via Wikidata P17"
    )
    # The filename mirrors the GHCID prefix change.
    new_filepath = filepath.parent / filepath.name.replace('XX-XX-', f'{country_code}-XX-')
    renamed_to: Optional[Path] = new_filepath if new_filepath != filepath else None
    if not dry_run:
        # Write in place first, then rename so a crash leaves valid data.
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
        if renamed_to is not None:
            if renamed_to.exists():
                # Never clobber an existing file; report "no rename".
                renamed_to = None
            else:
                filepath.rename(renamed_to)
    return True, renamed_to
# Keyword patterns for source-based country inference. Order matters:
# the first matching entry wins, mirroring the original elif chain.
SOURCE_COUNTRY_PATTERNS: List[Tuple[str, Tuple[str, ...]]] = [
    ('CZ', ('czech', 'cz_')),
    ('AT', ('austria', 'at_')),
    ('DE', ('german', 'de_')),
    ('CH', ('swiss', 'switzerland', 'ch_')),
    ('BE', ('belgium', 'belgian', 'be_')),
    ('NL', ('dutch', 'netherlands', 'nl_')),
    ('JP', ('japan', 'jp_')),
]


def _infer_country_from_source(data: Dict[str, Any]) -> Optional[str]:
    """Infer a country code from the record's original_entry.source string.

    Returns None when the source is missing/None or matches no pattern.
    (The previous inline version crashed when original_entry was
    explicitly None, and recomputed source.lower() for every keyword.)
    """
    original_entry = data.get('original_entry')
    if not isinstance(original_entry, dict):
        return None
    source = (original_entry.get('source') or '').lower()
    if not source:
        return None
    for code, keywords in SOURCE_COUNTRY_PATTERNS:
        if any(keyword in source for keyword in keywords):
            return code
    return None


def main():
    """CLI entry point: resolve XX country codes in custodian YAML files.

    Pipeline: find XX-prefixed files, extract Wikidata Q-ids, batch-query
    P17 countries, update/rename the files, then fall back to
    source-string inference for files without Wikidata ids.
    """
    import argparse
    import time  # hoisted out of the batching loop
    parser = argparse.ArgumentParser(
        description='Resolve XX country codes using Wikidata P17 lookup'
    )
    parser.add_argument('--apply', action='store_true',
                        help='Actually apply the fixes (default: dry run)')
    parser.add_argument('--path', type=str, default='data/custodian',
                        help='Path to custodian files directory')
    parser.add_argument('--limit', type=int, default=100,
                        help='Limit number of files to process')
    args = parser.parse_args()

    custodian_dir = Path(args.path)
    if not custodian_dir.exists():
        print(f"Error: Directory {custodian_dir} does not exist")
        sys.exit(1)
    dry_run = not args.apply

    print("=" * 70)
    print("COUNTRY CODE RESOLUTION VIA WIKIDATA P17")
    print("=" * 70)
    print(f"Mode: {'DRY RUN' if dry_run else 'APPLYING CHANGES'}")
    print()

    # Find files with XX country code (filename convention: XX-...).
    files_to_process = list(custodian_dir.glob('XX-*.yaml'))[:args.limit]
    print(f"Found {len(files_to_process)} files with XX country code")
    print()

    # Load files and extract Wikidata IDs.
    file_data = []
    for filepath in files_to_process:
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if not isinstance(data, dict):
                # Empty YAML parses to None; skip rather than crash below.
                print(f"Error loading {filepath}: not a mapping")
                continue
            file_data.append({
                'filepath': filepath,
                'data': data,
                'wikidata_ids': extract_wikidata_ids(data)
            })
        except Exception as e:
            print(f"Error loading {filepath}: {e}")
    print(f"Loaded {len(file_data)} files")

    # Partition by presence of Wikidata ids.
    with_wikidata = [f for f in file_data if f['wikidata_ids']]
    without_wikidata = [f for f in file_data if not f['wikidata_ids']]
    print(f" With Wikidata IDs: {len(with_wikidata)}")
    print(f" Without Wikidata IDs: {len(without_wikidata)}")
    print()

    # Query Wikidata for countries, de-duplicating ids across files.
    all_wikidata_ids = sorted({wid for f in with_wikidata for wid in f['wikidata_ids']})
    print(f"Querying Wikidata for {len(all_wikidata_ids)} entities...")
    # Batch in groups of 50 to keep queries small.
    all_countries: Dict[str, str] = {}
    for i in range(0, len(all_wikidata_ids), 50):
        all_countries.update(query_wikidata_countries(all_wikidata_ids[i:i + 50]))
        if i + 50 < len(all_wikidata_ids):
            time.sleep(1)  # Rate limiting
    print(f" Retrieved country for {len(all_countries)} entities")
    print()

    # Process files.
    resolved = 0
    renamed = 0
    no_country = []

    # First pass: files with Wikidata ids — use the SPARQL results.
    for f in with_wikidata:
        filepath = f['filepath']
        # Take the first id with a resolved country.
        country_code = next(
            (all_countries[wid] for wid in f['wikidata_ids'] if wid in all_countries),
            None
        )
        if not country_code:
            no_country.append(filepath.name)
            continue
        success, new_path = update_custodian_file(filepath, country_code, dry_run=dry_run)
        if success:
            resolved += 1
            if new_path:
                renamed += 1
                print(f" {filepath.name}{new_path.name}")
            else:
                print(f" Updated: {filepath.name}")

    # Second pass: files without Wikidata ids — infer from the source string.
    source_resolved = 0
    for f in without_wikidata:
        filepath = f['filepath']
        country_code = _infer_country_from_source(f['data'])
        if country_code:
            success, new_path = update_custodian_file(filepath, country_code, dry_run=dry_run)
            if success:
                source_resolved += 1
                resolved += 1
                if new_path:
                    renamed += 1
                    print(f" [source-inferred] {filepath.name}{new_path.name}")
        else:
            no_country.append(filepath.name)

    print()
    print("=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"Files processed: {len(file_data)}")
    print(f"With Wikidata IDs: {len(with_wikidata)}")
    print(f"Source-inferred: {source_resolved}")
    print(f"Resolved: {resolved}")
    print(f"Renamed: {renamed}")
    print(f"No country found: {len(no_country)}")
    print(f"Without Wikidata IDs: {len(without_wikidata)}")
    if no_country and len(no_country) <= 20:
        print()
        print("Files without country resolution:")
        for name in no_country:
            print(f" - {name}")
    if dry_run:
        print()
        print("This was a DRY RUN. Use --apply to make changes.")


if __name__ == '__main__':
    main()