Enrichment scripts for country-specific city data: - enrich_austrian_cities.py, enrich_belgian_cities.py, enrich_belgian_v2.py - enrich_bulgarian_cities.py, enrich_czech_cities.py, enrich_czech_cities_fast.py - enrich_japanese_cities.py, enrich_swiss_isil_cities.py, enrich_cities_google.py Location resolution utilities: - resolve_cities_from_file_coords.py - Resolve cities using coordinates in filenames - resolve_cities_wikidata.py - Use Wikidata P131 for city resolution - resolve_country_codes.py - Standardize country codes - resolve_cz_xx_regions.py - Fix Czech XX region codes - resolve_locations_by_name.py - Name-based location lookup - resolve_regions_from_city.py - Derive regions from city data - update_ghcid_with_geonames.py - Update GHCIDs with GeoNames data CH-Annotator integration: - create_custodian_from_ch_annotator.py - Create custodians from annotations - add_ch_annotator_location_claims.py - Add location claims - extract_locations_ch_annotator.py - Extract locations from annotations Migration and fixes: - migrate_egyptian_from_ch.py - Migrate Egyptian data - migrate_web_archives.py - Migrate web archive data - fix_belgian_cities.py - Fix Belgian city data
203 lines
7 KiB
Python
203 lines
7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Add CH-Annotator compliant location claims to recently resolved Czech institution files.
|
|
|
|
This script adds location claims (city, region, country, geonames_id) to the
|
|
ch_annotator.entity_claims array with proper 5-component provenance:
|
|
1. namespace (geonames)
|
|
2. path (xpath-style path to GeoNames resource)
|
|
3. timestamp (ISO 8601)
|
|
4. agent (glm4.6)
|
|
5. context_convention (ch_annotator-v1_7_0)
|
|
|
|
Per AGENTS.md Rule 5: Additive only - never delete existing data.
|
|
Per AGENTS.md Rule 10: CH-Annotator is the entity annotation convention.
|
|
"""
|
|
|
|
import os
|
|
import yaml
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
# Configuration
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
RESEARCH_DATE = "2025-12-07"


def find_resolved_files():
    """Collect the CZ-* custodian files whose text records RESEARCH_DATE.

    Performs a plain substring scan for the quoted research_date marker
    rather than parsing YAML, so it is fast but depends on the exact
    single-quoted date format. Unreadable files are reported and skipped.
    The result is sorted for deterministic processing order.
    """
    marker = f"research_date: '{RESEARCH_DATE}'"
    matches = []

    for candidate in CUSTODIAN_DIR.glob("CZ-*.yaml"):
        try:
            text = candidate.read_text(encoding='utf-8')
        except Exception as e:
            print(f"Error reading {candidate}: {e}")
            continue
        if marker in text:
            matches.append(candidate)

    return sorted(matches)
def add_location_claims(yaml_file: Path) -> "bool | None":
    """
    Add CH-Annotator location claims to a custodian file.

    Builds up to four claims (location_city, location_region,
    location_country, geonames_id) from ghcid.location_resolution —
    falling back to the top-level location block — and appends them to
    ch_annotator.entity_claims. Each claim carries the 5-component
    provenance required by ch_annotator-v1_7_0. Additive only per
    AGENTS.md Rule 5: existing data is never removed.

    Returns True if claims were added, False if the file was skipped
    (empty, missing data, or claims already present), and None on error
    so callers can count errors separately from skips.
    """
    try:
        with open(yaml_file, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if not data:
            print(f" SKIP: Empty file {yaml_file.name}")
            return False

        # Resolved values live under ghcid.location_resolution; the
        # top-level location block is the fallback for each field.
        location_resolution = data.get('ghcid', {}).get('location_resolution', {})
        location = data.get('location', {})

        if not location_resolution.get('geonames_id'):
            print(f" SKIP: No GeoNames ID in {yaml_file.name}")
            return False

        # Extract location values
        city_name = location_resolution.get('city_name') or location.get('city')
        region_name = location_resolution.get('region_name') or location.get('region')
        country_code = location_resolution.get('country_code') or location.get('country')
        geonames_id = location_resolution.get('geonames_id') or location.get('geonames_id')
        resolution_timestamp = location_resolution.get('resolution_timestamp')

        # City, country and GeoNames ID are mandatory; region is optional.
        if not all([city_name, country_code, geonames_id]):
            print(f" SKIP: Missing required location data in {yaml_file.name}")
            return False

        # Ensure ch_annotator.entity_claims exists (create empty, never replace).
        entity_claims = data.setdefault('ch_annotator', {}).setdefault('entity_claims', [])

        # Idempotency guard: a location_city claim means this script already ran.
        existing_claim_types = {c.get('claim_type') for c in entity_claims if c}
        if 'location_city' in existing_claim_types:
            print(f" SKIP: Location claims already exist in {yaml_file.name}")
            return False

        # Prefer the original resolution timestamp for provenance; fall
        # back to "now" in UTC (ISO 8601).
        timestamp = resolution_timestamp or datetime.now(timezone.utc).isoformat()

        def make_claim(claim_type, claim_value, property_uri, path_suffix, confidence):
            """Build one claim dict with the 5-component provenance structure."""
            return {
                'claim_type': claim_type,
                'claim_value': claim_value,
                'property_uri': property_uri,
                'provenance': {
                    'namespace': 'geonames',
                    'path': f'/cities/{geonames_id}{path_suffix}',
                    'timestamp': timestamp,
                    'agent': 'glm4.6',  # Z.AI GLM 4.6 - preferred model
                    'context_convention': 'ch_annotator-v1_7_0',
                },
                'confidence': confidence,
                'resolution_method': 'GEONAMES_RESEARCH',
            }

        # Assemble claims in the documented order: city, region (if any),
        # country, geonames_id.
        new_claims = [make_claim('location_city', city_name,
                                 'schema:addressLocality', '/name', 0.95)]
        if region_name:
            new_claims.append(make_claim('location_region', region_name,
                                         'schema:addressRegion', '/admin1', 0.95))
        new_claims.append(make_claim('location_country', country_code,
                                     'schema:addressCountry', '/country', 0.98))
        new_claims.append(make_claim('geonames_id', str(geonames_id),
                                     'gn:geonamesId', '', 0.98))
        entity_claims.extend(new_claims)

        # Write back to file
        with open(yaml_file, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

        # BUGFIX: report the actual number of claims added (3 when no region,
        # 4 otherwise) instead of a hard-coded "4".
        print(f" ADDED: {len(new_claims)} location claims to {yaml_file.name}")
        return True

    except Exception as e:
        # Broad catch is deliberate: one bad file must not halt the batch.
        print(f" ERROR: {yaml_file.name}: {e}")
        # BUGFIX: return None (not False) so errors are distinguishable
        # from skips; main()'s error counter was unreachable before.
        return None
def main():
    """Find the resolved Czech custodian files and add location claims."""
    banner = "=" * 70
    print(banner)
    print("CH-Annotator Location Claims Addition Script")
    print(banner)
    print(f"Looking for files resolved on: {RESEARCH_DATE}")
    print()

    # Find resolved files
    targets = find_resolved_files()
    print(f"Found {len(targets)} resolved files")
    print()

    # Tally outcomes: True -> added, False -> skipped, anything else -> error.
    added_count = 0
    skipped_count = 0
    error_count = 0

    for target in targets:
        outcome = add_location_claims(target)
        if outcome:
            added_count += 1
        elif outcome is False:
            skipped_count += 1
        else:
            error_count += 1

    # Summary
    print()
    print(banner)
    print("SUMMARY")
    print(banner)
    print(f"Files processed: {len(targets)}")
    print(f"Claims added: {added_count}")
    print(f"Skipped: {skipped_count}")
    print(f"Errors: {error_count}")
    print()

    if added_count > 0:
        print("CH-Annotator location claims added successfully!")
        print("Each file now has 4 new claims:")
        print(" - location_city (schema:addressLocality)")
        print(" - location_region (schema:addressRegion)")
        print(" - location_country (schema:addressCountry)")
        print(" - geonames_id (gn:geonamesId)")
# Entry-point guard: run the batch only when executed as a script,
# so the module can be imported without side effects.
if __name__ == "__main__":
    main()