#!/usr/bin/env python3
"""Migrate geocoded coordinate provenance to Rule 35 format.

Updates ``coordinate_provenance`` fields to include:
- source_url (reconstructed from geocode_query)
- statement_created_at (from geocode_timestamp)
- source_archived_at (same as statement_created_at for API calls)
- retrieval_agent
"""

import argparse
import logging
from pathlib import Path
from urllib.parse import quote_plus

import yaml

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)


def migrate_file(filepath: Path, dry_run: bool = False) -> bool:
    """Migrate a single file's coordinate provenance to Rule 35 format.

    Args:
        filepath: Path to a custodian YAML file.
        dry_run: When True, log what would change but write nothing.

    Returns:
        True if the file was (or, in dry-run mode, would be) migrated;
        False if it was skipped (not a Nominatim geocode, already
        migrated, missing timestamp, or not a YAML mapping).
    """
    with open(filepath, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)

    # Guard: empty files parse to None, and a YAML document may be a
    # list/scalar — calling .get on those would raise AttributeError.
    if not isinstance(data, dict):
        return False

    location = data.get("location") or {}
    provenance = location.get("coordinate_provenance") or {}

    # Only Nominatim geocodes need migration.
    if provenance.get("source_type") != "NOMINATIM_GEOCODE":
        return False

    # Already migrated if the Rule 35 timestamp is present.
    if provenance.get("statement_created_at"):
        return False

    # Without the original geocode timestamp we cannot build the
    # dual timestamps, so skip.
    old_timestamp = provenance.get("geocode_timestamp")
    if not old_timestamp:
        return False

    # Reconstruct the Nominatim query URL from the stored query string.
    geocode_query = provenance.get("geocode_query", "")
    encoded_query = quote_plus(geocode_query)
    source_url = (
        f"https://nominatim.openstreetmap.org/search?q={encoded_query}&format=json"
    )

    logger.info("Migrating %s: %s", filepath.name, geocode_query)

    if not dry_run:
        # Create new provenance structure
        new_provenance = {
            # Core identification
            "source_type": "NOMINATIM_GEOCODE",
            "source_url": source_url,
            # Dual timestamps (Rule 35)
            "statement_created_at": old_timestamp,
            "source_archived_at": old_timestamp,  # API response is ephemeral
            # Retrieval agent
            "retrieval_agent": "geocode_missing_coordinates.py",
            # Geocoding specifics (preserve existing)
            "geocode_query": geocode_query,
            "osm_id": provenance.get("osm_id"),
            "osm_type": provenance.get("osm_type"),
            "display_name": provenance.get("display_name"),
            # Confidence and validation
            "geocode_confidence": provenance.get("geocode_confidence"),
            "city_match": provenance.get("city_match"),
            "result_city": provenance.get("result_city"),
        }

        # Drop keys whose source values were absent.
        new_provenance = {k: v for k, v in new_provenance.items() if v is not None}

        data["location"]["coordinate_provenance"] = new_provenance

        with open(filepath, "w", encoding="utf-8") as f:
            yaml.dump(
                data,
                f,
                default_flow_style=False,
                allow_unicode=True,
                sort_keys=False,
            )

    return True


def main():
    """Parse CLI arguments and migrate every matching file in the data dir."""
    parser = argparse.ArgumentParser(
        description="Migrate geocode provenance to Rule 35 format"
    )
    parser.add_argument(
        "--data-dir",
        type=Path,
        default=Path("data/custodian"),
        help="Directory containing custodian YAML files",
    )
    parser.add_argument(
        "--pattern",
        type=str,
        default="*.yaml",
        help="Glob pattern for files to process",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Don't write changes, just show what would be done",
    )
    args = parser.parse_args()

    files = list(args.data_dir.glob(args.pattern))
    logger.info("Found %d files matching %s", len(files), args.pattern)

    migrated = 0
    skipped = 0
    for filepath in files:
        try:
            if migrate_file(filepath, args.dry_run):
                migrated += 1
            else:
                skipped += 1
        except Exception as e:
            # Best-effort batch job: report the failure and keep going
            # so one bad file doesn't abort the whole migration.
            logger.warning("Error processing %s: %s", filepath.name, e)
            skipped += 1

    logger.info("=" * 60)
    logger.info("Migration complete:")
    logger.info("  Files migrated: %d", migrated)
    logger.info("  Files skipped: %d", skipped)


if __name__ == "__main__":
    main()