524 lines
20 KiB
Python
524 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
Normalize Custodian Files to Canonical Schema v1.0.0

This script adds a canonical `location` object to all custodian files,
extracting coordinates and address data from various enrichment sources
using a priority-based cascade.

The canonical `location` object is THE authoritative source for coordinates
and location data. After normalization, downstream tools should ONLY use
`location.latitude` and `location.longitude`.

Coordinate Source Priority (highest to lowest):
1. manual_location_override.coordinates - Human-verified overrides
2. google_maps_enrichment.coordinates - Google Maps API
3. wikidata_enrichment.wikidata_coordinates - Wikidata
4. wikidata_enrichment.coordinates - Legacy Wikidata format
5. ghcid.location_resolution.source_coordinates - GHCID resolution
6. ghcid.location_resolution (direct lat/lon) - GHCID resolution alt
7. original_entry.locations[0] - CH-Annotator source
8. locations[0] (root level) - Root level locations array
9. location (root level) - Existing location object

Usage:
    python normalize_custodian_files.py --dry-run # Preview changes
    python normalize_custodian_files.py # Apply changes
    python normalize_custodian_files.py --file NL-ZH-SCH-A-GAS.yaml # Single file
"""
|
|
|
|
import argparse
|
|
import os
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Optional
|
|
|
|
import yaml
|
|
|
|
# Preserve YAML formatting
|
|
class QuotedString(str):
    """Marker ``str`` subclass for values that should be dumped single-quoted.

    It adds no behaviour of its own; the module registers
    ``quoted_str_representer`` for this type so the YAML dumper can tell
    these strings apart from plain ``str`` values.
    """
|
|
|
|
def quoted_str_representer(dumper, data):
    """Represent *data* as a YAML string scalar using single-quote style.

    Args:
        dumper: The YAML dumper; only its ``represent_scalar`` method is used.
        data: The string value to emit.

    Returns:
        Whatever ``dumper.represent_scalar`` returns for the string tag.
    """
    yaml_string_tag = 'tag:yaml.org,2002:str'
    return dumper.represent_scalar(yaml_string_tag, data, style="'")
|
|
|
|
# Register the representer so QuotedString values are emitted single-quoted
# whenever they are serialized with yaml.dump.
yaml.add_representer(QuotedString, quoted_str_representer)
|
|
|
|
# Configuration
|
|
# Directory holding the custodian YAML files. The historical default is kept,
# but it can be overridden with the GLAM_CUSTODIAN_DIR environment variable so
# the script is usable outside the original author's machine.
CUSTODIAN_DIR = Path(
    os.environ.get("GLAM_CUSTODIAN_DIR", "/Users/kempersc/apps/glam/data/custodian")
)
|
|
|
|
# Coordinate extraction priority (highest to lowest)
|
|
# Each entry is (path, label): `path` is a dotted path resolved with
# get_nested(), `label` is the identifier recorded as the provenance
# source_type. List order is the lookup order: the first usable entry wins.
COORDINATE_SOURCES = [
    # 1. Human-verified override
    ("manual_location_override.coordinates", "MANUAL_OVERRIDE"),
    # 2. Google Maps enrichment
    ("google_maps_enrichment.coordinates", "GOOGLE_MAPS"),
    # 3. Wikidata enrichment (current key)
    ("wikidata_enrichment.wikidata_coordinates", "WIKIDATA"),
    # 4. Wikidata enrichment (legacy key)
    ("wikidata_enrichment.coordinates", "WIKIDATA_LEGACY"),
    # 5. GHCID resolution, nested source coordinates
    ("ghcid.location_resolution.source_coordinates", "GHCID_RESOLUTION"),
    # 6. GHCID resolution, lat/lon directly on the resolution object
    ("ghcid.location_resolution", "GHCID_RESOLUTION_DIRECT"),
    # 7. First location of the original entry
    ("original_entry.locations[0]", "ORIGINAL_ENTRY"),
    # 8. First root-level location
    ("locations[0]", "ROOT_LOCATIONS"),
    # 9. Pre-existing root-level location object
    ("location", "EXISTING_LOCATION"),
]
|
|
|
|
|
|
def get_nested(data: dict, path: str) -> Optional[Any]:
    """Resolve a dotted *path* inside nested dicts/lists.

    Path segments are separated by ``.``. A ``[0]`` suffix (rewritten
    internally to a ``0`` segment) selects the first element of a list; no
    other index is supported.

    Args:
        data: The root mapping to walk.
        path: Dotted path such as ``"a.b"`` or ``"a.items[0].b"``.

    Returns:
        The value found at *path*, or ``None`` when any step is missing, the
        list is empty, or an intermediate value has the wrong container type.
    """
    node = data
    for segment in path.replace("[0]", ".0").split("."):
        if node is None:
            return None
        if segment == "0":
            # A "0" segment always means "first list element", never a dict key.
            if not (isinstance(node, list) and node):
                return None
            node = node[0]
            continue
        if not isinstance(node, dict):
            return None
        node = node.get(segment)
    return node
|
|
|
|
|
|
def extract_coordinates(data: dict) -> tuple[Optional[float], Optional[float], Optional[dict]]:
    """
    Extract coordinates from data using the priority-based cascade.

    Sources are tried in ``COORDINATE_SOURCES`` order. A source is used when
    it resolves to a dict holding a latitude (``latitude``/``lat``) and a
    longitude (``longitude``/``lon``/``lng``) that convert to floats within
    [-90, 90] and [-180, 180]. Sources that are missing, malformed, or out of
    range are skipped and the next one is tried.

    Args:
        data: Parsed custodian document.

    Returns:
        Tuple of (latitude, longitude, provenance_dict), or
        (None, None, None) if no source yields valid coordinates.

        provenance_dict is the result of ``build_coordinate_provenance`` for
        the winning source and may include:
        - source_type: e.g., "GOOGLE_MAPS", "WIKIDATA", "MANUAL_OVERRIDE"
        - source_path: The data path where coordinates were found
        - original_timestamp: When the source data was fetched (if available)
        - api_endpoint: The API endpoint used (if applicable)
        - entity_id: Wikidata ID, Google place_id, etc. (if applicable)
    """

    def is_usable(candidate: Any) -> bool:
        # Numeric zero is a real coordinate (equator / prime meridian) and
        # must not be treated as "missing"; every other falsy value
        # (None, "", False, empty containers) still falls through to the
        # next alias, exactly as before.
        if isinstance(candidate, (int, float)) and not isinstance(candidate, bool):
            return True
        return bool(candidate)

    def first_usable(mapping: dict, *keys: str) -> Any:
        for key in keys:
            candidate = mapping.get(key)
            if is_usable(candidate):
                return candidate
        return None

    for path, source_name in COORDINATE_SOURCES:
        value = get_nested(data, path)
        if not isinstance(value, dict):
            continue

        lat = first_usable(value, "latitude", "lat")
        lon = first_usable(value, "longitude", "lon", "lng")
        if lat is None or lon is None:
            continue

        try:
            lat_float = float(lat)
            lon_float = float(lon)
        except (ValueError, TypeError):
            # Unparseable values: try the next source.
            continue

        # Validate coordinates are in valid ranges (NaN fails both checks).
        if -90 <= lat_float <= 90 and -180 <= lon_float <= 180:
            provenance = build_coordinate_provenance(data, path, source_name)
            return lat_float, lon_float, provenance

    return None, None, None
|
|
|
|
|
|
def build_coordinate_provenance(data: dict, path: str, source_name: str) -> dict:
    """
    Assemble provenance metadata for the coordinates found at *path*.

    Args:
        data: Parsed custodian document.
        path: Data path where the coordinates were found.
        source_name: Label of the winning source (see COORDINATE_SOURCES).

    Returns:
        Dictionary that starts with ``source_type`` and ``source_path`` and,
        depending on the source, may add ``original_timestamp``,
        ``api_endpoint``, ``entity_id``, ``override_reason``, ``override_by``,
        ``resolution_method`` or ``data_tier``. Keys whose value is ``None``
        are omitted.
    """
    record = {"source_type": source_name, "source_path": path}

    if source_name == "GOOGLE_MAPS":
        enrichment = data.get("google_maps_enrichment", {})
        # Prefer the timestamp and place_id stored on the enrichment block.
        record["original_timestamp"] = enrichment.get("fetch_timestamp")
        record["entity_id"] = enrichment.get("place_id")
        listed = get_nested(data, "provenance.sources.google_maps")
        if isinstance(listed, list) and listed:
            first = listed[0]
            record["api_endpoint"] = first.get("api_endpoint")
            # Backfill whatever the enrichment block did not provide.
            for target, source_key in (
                ("original_timestamp", "fetch_timestamp"),
                ("entity_id", "place_id"),
            ):
                if not record.get(target):
                    record[target] = first.get(source_key)

    elif source_name in ("WIKIDATA", "WIKIDATA_LEGACY"):
        enrichment = data.get("wikidata_enrichment", {})
        # Newer files carry fetch details under api_metadata.
        api_meta = enrichment.get("api_metadata", {})
        if api_meta:
            record["original_timestamp"] = api_meta.get("fetch_timestamp")
            record["api_endpoint"] = api_meta.get("api_endpoint")
        entity = enrichment.get("wikidata_entity_id") or enrichment.get("wikidata_id")
        if not entity:
            entity = get_nested(data, "original_entry.wikidata_id")
        record["entity_id"] = entity
        # Without an api_metadata timestamp, fall back to provenance.sources.wikidata.
        if not record.get("original_timestamp"):
            listed = get_nested(data, "provenance.sources.wikidata")
            if isinstance(listed, list) and listed:
                first = listed[0]
                record["original_timestamp"] = first.get("fetch_timestamp")
                record["api_endpoint"] = first.get("api_endpoint")
                if not record.get("entity_id"):
                    record["entity_id"] = first.get("entity_id")

    elif source_name == "MANUAL_OVERRIDE":
        override = data.get("manual_location_override", {})
        record.update(
            original_timestamp=override.get("override_timestamp"),
            override_reason=override.get("reason"),
            override_by=override.get("verified_by"),
        )

    elif source_name in ("GHCID_RESOLUTION", "GHCID_RESOLUTION_DIRECT"):
        resolution = get_nested(data, "ghcid.location_resolution")
        if isinstance(resolution, dict):
            record.update(
                original_timestamp=resolution.get("resolution_timestamp"),
                entity_id=resolution.get("geonames_id"),
                resolution_method=resolution.get("method"),
            )

    elif source_name == "ORIGINAL_ENTRY":
        # The first provenance.sources.original_entry item may refine the source type.
        listed = get_nested(data, "provenance.sources.original_entry")
        if isinstance(listed, list) and listed:
            first = listed[0]
            record["source_type"] = first.get("source_type", source_name)
            record["data_tier"] = first.get("data_tier")

    # Drop keys that ended up without a value.
    return {key: value for key, value in record.items() if value is not None}
|
|
|
|
|
|
def extract_address_components(data: dict) -> dict:
    """
    Extract address components from various sources.

    Sources are consulted in this order, each one only filling fields that
    are still empty: ``google_maps_enrichment``, ``ghcid.location_resolution``,
    ``original_entry.locations[0]``, root-level ``locations[0]`` and finally
    ``original_entry.plaatsnaam_bezoekadres`` (city only).

    Blocks that are present but null, or not of the expected container type,
    are skipped instead of raising, so one malformed enrichment block does
    not fail the whole file.

    Args:
        data: Parsed custodian document.

    Returns:
        Dictionary with the address components that were found (city, region,
        region_code, country, postal_code, street_address, formatted_address,
        geonames_id, geonames_name, feature_code). Keys without a value are
        omitted.
    """
    address: dict = {}

    def fill_missing(source: dict, field_map: dict) -> None:
        # Copy source[source_key] into address[target] unless a
        # higher-priority source already supplied a truthy value.
        for target, source_key in field_map.items():
            if not address.get(target):
                address[target] = source.get(source_key)

    # Google Maps first (most detailed).
    gm = data.get("google_maps_enrichment")
    if isinstance(gm, dict) and gm:
        address["formatted_address"] = gm.get("formatted_address")
        address["street_address"] = gm.get("short_address")

        # `or []` guards against an explicit null in the YAML.
        for comp in gm.get("address_components") or []:
            if not isinstance(comp, dict):
                continue
            types = comp.get("types") or []
            if "locality" in types:
                address["city"] = comp.get("long_name")
            elif "administrative_area_level_1" in types:
                address["region"] = comp.get("long_name")
                address["region_code"] = comp.get("short_name")
            elif "country" in types:
                address["country"] = comp.get("short_name")
            elif "postal_code" in types:
                address["postal_code"] = comp.get("long_name")

    # Fallback to GHCID location resolution.
    ghcid_loc = get_nested(data, "ghcid.location_resolution")
    if isinstance(ghcid_loc, dict) and ghcid_loc:
        if not address.get("city"):
            address["city"] = (
                ghcid_loc.get("city_name")
                or ghcid_loc.get("geonames_name")
                or ghcid_loc.get("google_maps_locality")
            )
        fill_missing(
            ghcid_loc,
            {
                "region": "region_name",
                "region_code": "region_code",
                "country": "country_code",
                "geonames_id": "geonames_id",
                "geonames_name": "geonames_name",
                "feature_code": "feature_code",
            },
        )

    # Fallback to original_entry.locations.
    orig_loc = get_nested(data, "original_entry.locations[0]")
    if isinstance(orig_loc, dict) and orig_loc:
        fill_missing(
            orig_loc,
            {
                "city": "city",
                "region": "region",
                "country": "country",
                "postal_code": "postal_code",
                "street_address": "street_address",
            },
        )

    # Fallback to root-level locations.
    root_loc = get_nested(data, "locations[0]")
    if isinstance(root_loc, dict) and root_loc:
        fill_missing(root_loc, {"city": "city", "region": "region", "country": "country"})

    # Fallback to original_entry plaatsnaam (NDE files).
    orig_entry = data.get("original_entry")
    if isinstance(orig_entry, dict) and not address.get("city"):
        address["city"] = orig_entry.get("plaatsnaam_bezoekadres")

    # Clean up None values.
    return {key: value for key, value in address.items() if value is not None}
|
|
|
|
|
|
def create_canonical_location(data: dict) -> Optional[dict]:
    """
    Build the canonical ``location`` object from every available source.

    Args:
        data: Parsed custodian document.

    Returns:
        The canonical location dictionary, or ``None`` when neither
        coordinates nor any address component could be extracted.

        Keys are added in this order, each only when a value exists:
        - latitude / longitude: The coordinates
        - coordinate_provenance: Provenance returned alongside the coordinates
        - city, region, region_code, country, postal_code, street_address,
          formatted_address: Address components
        - geonames_id, geonames_name, feature_code: GeoNames reference
        - normalization_timestamp: UTC ISO-8601 time this object was built
    """
    latitude, longitude, provenance = extract_coordinates(data)
    components = extract_address_components(data)

    canonical: dict = {}

    # Coordinates plus their provenance, when a source supplied both values.
    if not (latitude is None or longitude is None):
        canonical.update(latitude=latitude, longitude=longitude)
        if provenance:
            canonical["coordinate_provenance"] = provenance

    # Address components first, then the GeoNames reference fields.
    ordered_fields = (
        "city",
        "region",
        "region_code",
        "country",
        "postal_code",
        "street_address",
        "formatted_address",
        "geonames_id",
        "geonames_name",
        "feature_code",
    )
    for field_name in ordered_fields:
        field_value = components.get(field_name)
        if field_value:
            canonical[field_name] = field_value

    if not canonical:
        return None

    # Stamp only objects that actually carry some location data.
    canonical["normalization_timestamp"] = datetime.now(timezone.utc).isoformat()
    return canonical
|
|
|
|
|
|
def _location_payload(location: Any) -> Any:
    """Return *location* without its volatile ``normalization_timestamp`` key."""
    if not isinstance(location, dict):
        return location
    return {key: value for key, value in location.items() if key != "normalization_timestamp"}


def normalize_file(filepath: Path, dry_run: bool = False) -> dict:
    """
    Normalize a single custodian file.

    Loads the YAML document, builds the canonical location and, when it
    differs from what the file already holds, stores it under ``location``,
    appends a note to ``provenance.notes`` and (unless *dry_run*) rewrites
    the file.

    A file counts as unchanged when its existing ``location`` already has a
    ``normalization_timestamp`` and is otherwise equal to the newly built
    one. The timestamp is ignored in that comparison because it is
    regenerated on every run; comparing it would make every run rewrite
    every file.

    Args:
        filepath: Path of the YAML file to process.
        dry_run: When True, compute the result but do not write the file.

    Returns:
        Dictionary with normalization results:
        - success: bool
        - changed: bool
        - has_coordinates: bool
        - coordinate_source: str or None (source_type from provenance)
        - error: str or None (message of any exception raised while processing)
    """
    result = {
        "success": False,
        "changed": False,
        "has_coordinates": False,
        "coordinate_source": None,
        "error": None,
    }

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if not isinstance(data, dict):
            result["error"] = "File is not a valid YAML dictionary"
            return result

        # Create canonical location
        canonical_location = create_canonical_location(data)

        if canonical_location:
            result["has_coordinates"] = "latitude" in canonical_location
            # Extract source_type from coordinate_provenance for stats
            coord_prov = canonical_location.get("coordinate_provenance", {})
            result["coordinate_source"] = coord_prov.get("source_type")

            # Already normalized with the same content: leave the file alone.
            existing_location = data.get("location")
            if (
                isinstance(existing_location, dict)
                and "normalization_timestamp" in existing_location
                and _location_payload(existing_location) == _location_payload(canonical_location)
            ):
                result["success"] = True
                result["changed"] = False
                return result

            # Update the data
            data["location"] = canonical_location
            result["changed"] = True

            # Add normalization provenance; tolerate a missing or null block.
            if data.get("provenance") is None:
                data["provenance"] = {}
            notes = data["provenance"].get("notes")
            if notes is None:
                notes = []
            elif not isinstance(notes, list):
                # Keep a pre-existing scalar note instead of failing on append.
                notes = [notes]
            data["provenance"]["notes"] = notes

            normalization_note = f"Canonical location added via normalize_custodian_files.py on {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')}"
            if normalization_note not in notes:
                notes.append(normalization_note)

            if not dry_run:
                # Serialize first so a dump failure cannot leave a truncated file.
                serialized = yaml.dump(
                    data,
                    allow_unicode=True,
                    default_flow_style=False,
                    sort_keys=False,
                    width=120,
                )
                with open(filepath, 'w', encoding='utf-8') as f:
                    f.write(serialized)

        result["success"] = True
        return result

    except Exception as e:
        # Per-file boundary: report the failure to the caller so a batch run
        # can continue with the remaining files.
        result["error"] = str(e)
        return result
|
|
|
|
|
|
def main():
    """Command-line entry point: normalize one or all custodian files.

    Parses ``--dry-run``, ``--file``, ``--limit`` and ``--verbose``, runs
    ``normalize_file`` on each selected file under ``CUSTODIAN_DIR`` and
    prints a summary. Exits with status 1 when ``--file`` names a file that
    does not exist.
    """
    parser = argparse.ArgumentParser(
        description="Normalize custodian files to canonical schema v1.0.0"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview changes without writing to files"
    )
    parser.add_argument(
        "--file",
        type=str,
        help="Process a single file (filename only, e.g., NL-ZH-SCH-A-GAS.yaml)"
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Limit processing to N files (0 = no limit)"
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Show detailed output for each file"
    )

    args = parser.parse_args()

    if args.dry_run:
        print("DRY RUN - No files will be modified\n")

    # Get list of files to process
    if args.file:
        files = [CUSTODIAN_DIR / args.file]
        if not files[0].exists():
            print(f"Error: File not found: {files[0]}")
            sys.exit(1)
    else:
        files = sorted(CUSTODIAN_DIR.glob("*.yaml"))
        if args.limit > 0:
            files = files[:args.limit]

    total = len(files)
    print(f"Processing {total} files...\n")

    # Statistics
    stats = {
        "total": total,
        "success": 0,
        "changed": 0,
        "with_coordinates": 0,
        "without_coordinates": 0,
        "errors": 0,
        "by_source": {},
    }

    errors = []

    def percent_of_total(count: int) -> float:
        # An empty directory (or no matching files) must not crash the summary.
        return 100 * count / total if total else 0.0

    for position, filepath in enumerate(files, start=1):
        result = normalize_file(filepath, dry_run=args.dry_run)

        if result["success"]:
            stats["success"] += 1
            if result["changed"]:
                stats["changed"] += 1
            if result["has_coordinates"]:
                stats["with_coordinates"] += 1
                source = result["coordinate_source"]
                stats["by_source"][source] = stats["by_source"].get(source, 0) + 1
            else:
                stats["without_coordinates"] += 1
        else:
            stats["errors"] += 1
            errors.append((filepath.name, result["error"]))

        if args.verbose:
            status = "OK" if result["success"] else "ERROR"
            changed = " [CHANGED]" if result["changed"] else ""
            coords = f" coords={result['coordinate_source']}" if result["has_coordinates"] else " NO_COORDS"
            print(f"[{position}/{total}] {filepath.name}: {status}{changed}{coords}")
        elif position % 1000 == 0:
            # Lightweight progress indicator for large batches.
            print(f"Processed {position}/{total} files...")

    # Print summary
    print("\n" + "=" * 60)
    print("NORMALIZATION SUMMARY")
    print("=" * 60)
    print(f"Total files: {stats['total']}")
    print(f"Successfully processed: {stats['success']}")
    print(f"Files changed: {stats['changed']}")
    print(f"With coordinates: {stats['with_coordinates']} ({percent_of_total(stats['with_coordinates']):.1f}%)")
    print(f"Without coordinates: {stats['without_coordinates']} ({percent_of_total(stats['without_coordinates']):.1f}%)")
    print(f"Errors: {stats['errors']}")

    if stats["by_source"]:
        print("\nCoordinate sources:")
        for source, count in sorted(stats["by_source"].items(), key=lambda x: -x[1]):
            print(f" {source}: {count}")

    if errors:
        print(f"\nFirst {min(10, len(errors))} errors:")
        for filename, error in errors[:10]:
            print(f" {filename}: {error}")

    if args.dry_run:
        print("\n(DRY RUN - No files were modified)")
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|