#!/usr/bin/env python3
"""
Phase 1 Migration Script: ISIL/CSV Sources → Proper Dual Timestamp Provenance

Migrates custodian files from:
- agent: claude-conversation
- single timestamp field

To:
- agent: batch-script-create-custodian-from-ch-annotator
- source_archived_at: (from existing timestamp)
- statement_created_at: (from annotation_date)

This script handles Category 1 files (ISIL registries, CSV sources) which have
path patterns like `/files/*.yaml` or `/files/*.csv`.

Usage:
    # Dry run (default) - shows what would be changed
    python scripts/migrate_provenance_phase1.py

    # Actually apply changes
    python scripts/migrate_provenance_phase1.py --apply

    # Process specific files
    python scripts/migrate_provenance_phase1.py --apply --file data/custodian/JP-20-SHI-L-S-shiojiritankakan.yaml

    # Process only files from specific source
    python scripts/migrate_provenance_phase1.py --apply --source japan_complete

References:
- Rule 35: .opencode/PROVENANCE_TIMESTAMP_RULES.md
- Migration Spec: .opencode/CLAUDE_CONVERSATION_MIGRATION_SPEC.md
"""

import argparse
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

import yaml


# YAML configuration to preserve formatting
class PreservingDumper(yaml.SafeDumper):
    """Custom YAML dumper that preserves order and handles special types."""
    pass


def str_representer(dumper, data):
    """Handle multiline strings and special characters.

    Multiline strings are emitted in literal block style (``|``) so that
    embedded newlines survive a round-trip; everything else uses the default
    scalar style.
    """
    if '\n' in data:
        return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
    return dumper.represent_scalar('tag:yaml.org,2002:str', data)


PreservingDumper.add_representer(str, str_representer)

# Constants
# Agent identifiers that mark a provenance block as needing migration.
INVALID_AGENTS = {'claude-conversation', 'claude', 'ai', 'opencode', 'llm'}
# Category 1 files were extracted from archived registry files under /files/.
CATEGORY_1_PATH_PATTERN = re.compile(r'^/files/.*\.(yaml|csv)$')

# Source type mapping based on path (substring match against extraction path).
SOURCE_TYPE_MAP = {
    'japan_complete': 'isil_registry_csv',
    'czech_unified': 'library_registry_api',
    'switzerland_isil': 'isil_registry_csv',
    'austria_complete': 'isil_registry_csv',
    'belarus_complete': 'isil_registry_csv',
    'argentina_complete': 'isil_registry_csv',
    'bulgaria_complete': 'isil_registry_csv',
    'belgium_complete': 'isil_registry_csv',
}

# Agent mapping based on source type
AGENT_MAP = {
    'isil_registry_csv': 'batch-script-create-custodian-from-ch-annotator',
    'library_registry_api': 'batch-script-create-custodian-from-ch-annotator',
}


def load_yaml_file(filepath: Path) -> Optional[dict]:
    """Load a YAML file safely.

    Returns the parsed document, or None (with a message on stderr) if the
    file cannot be read or parsed — callers treat None as "skip this file".
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return yaml.safe_load(f)
    except Exception as e:
        print(f"  ERROR loading {filepath}: {e}", file=sys.stderr)
        return None


def save_yaml_file(filepath: Path, data: dict) -> bool:
    """Save data to YAML file with proper formatting.

    Uses PreservingDumper (insertion order kept, multiline strings in block
    style). Returns True on success, False (with a message on stderr) on
    failure.
    """
    try:
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, Dumper=PreservingDumper,
                      default_flow_style=False, allow_unicode=True,
                      sort_keys=False, width=120)
        return True
    except Exception as e:
        print(f"  ERROR saving {filepath}: {e}", file=sys.stderr)
        return False


def is_category_1_file(data: dict) -> bool:
    """Check if this file is a Category 1 (ISIL/CSV source) file.

    Category membership is decided solely by the extraction_provenance path
    matching CATEGORY_1_PATH_PATTERN (`/files/*.yaml` or `/files/*.csv`).
    """
    ch_annotator = data.get('ch_annotator', {})
    extraction_prov = ch_annotator.get('extraction_provenance', {})
    path = extraction_prov.get('path', '')
    return bool(CATEGORY_1_PATH_PATTERN.match(path))


def get_source_type(path: str) -> str:
    """Determine source type from path.

    First SOURCE_TYPE_MAP key found as a substring of the path wins; unknown
    paths fall back to 'unknown_csv_source'.
    """
    for key, source_type in SOURCE_TYPE_MAP.items():
        if key in path:
            return source_type
    return 'unknown_csv_source'


def get_new_agent(source_type: str) -> str:
    """Get the appropriate agent identifier for a source type."""
    return AGENT_MAP.get(source_type, 'batch-script-create-custodian-from-ch-annotator')


def migrate_provenance_block(prov: dict, annotation_date: Optional[str],
                             source_type: str, migration_timestamp: str) -> dict:
    """
    Migrate a single provenance block to dual timestamp format.

    Args:
        prov: The provenance dict to migrate
        annotation_date: The annotation_date from annotation_provenance
            (for statement_created_at)
        source_type: The determined source type
        migration_timestamp: When this migration is happening

    Returns:
        Updated provenance dict (a copy); the original dict is returned
        unchanged when no migration is needed, which lets callers detect
        "no change" via inequality.
    """
    # Get existing timestamp
    existing_timestamp = prov.get('timestamp')
    existing_agent = prov.get('agent', '')

    # Skip if already migrated (has dual timestamps)
    if 'source_archived_at' in prov and 'statement_created_at' in prov:
        return prov

    # Only blocks whose agent is a known-invalid placeholder need migration.
    if existing_agent not in INVALID_AGENTS:
        return prov

    # Create new provenance structure
    new_prov = dict(prov)  # Preserve existing fields

    # Add dual timestamps
    if existing_timestamp:
        new_prov['source_archived_at'] = existing_timestamp

    if annotation_date:
        new_prov['statement_created_at'] = annotation_date
    else:
        # Fall back to existing timestamp if no annotation_date
        new_prov['statement_created_at'] = existing_timestamp or migration_timestamp

    # Update agent
    new_prov['agent'] = get_new_agent(source_type)

    # Add source type
    new_prov['source_type'] = source_type

    # Add migration note.
    # FIX: record the actual prior agent rather than hard-coding
    # 'claude-conversation' — INVALID_AGENTS also contains 'claude', 'ai',
    # 'opencode' and 'llm', and the audit trail must not misreport them.
    new_prov['migration_note'] = (
        f'Migrated from agent:{existing_agent} on {migration_timestamp[:10]}'
    )

    # Remove old timestamp field (now split into dual timestamps)
    if 'timestamp' in new_prov and 'source_archived_at' in new_prov:
        del new_prov['timestamp']

    return new_prov


def migrate_file(filepath: Path, apply: bool = False,
                 migration_timestamp: Optional[str] = None) -> dict:
    """
    Migrate a single custodian file to dual timestamp provenance.

    Args:
        filepath: Path to the YAML file
        apply: If True, actually write changes; if False, just report
        migration_timestamp: ISO timestamp for migration tracking

    Returns:
        dict with migration results:
        {
            'migrated': bool,
            'reason': str,
            'changes': list of change descriptions
        }
    """
    if migration_timestamp is None:
        migration_timestamp = datetime.now(timezone.utc).isoformat()

    result = {
        'migrated': False,
        'reason': '',
        'changes': [],
        'filepath': str(filepath)
    }

    # Load file
    data = load_yaml_file(filepath)
    if data is None:
        result['reason'] = 'Failed to load YAML'
        return result

    # Check if Category 1
    if not is_category_1_file(data):
        result['reason'] = 'Not a Category 1 file (ISIL/CSV source)'
        return result

    ch_annotator = data.get('ch_annotator', {})
    extraction_prov = ch_annotator.get('extraction_provenance', {})
    annotation_prov = ch_annotator.get('annotation_provenance', {})
    entity_claims = ch_annotator.get('entity_claims', [])

    # Check if migration needed
    current_agent = extraction_prov.get('agent', '')
    if current_agent not in INVALID_AGENTS:
        result['reason'] = f'Agent already valid: {current_agent}'
        return result

    # Get metadata for migration
    path = extraction_prov.get('path', '')
    source_type = get_source_type(path)
    annotation_date = annotation_prov.get('annotation_date')

    changes = []

    # Migrate extraction_provenance
    new_extraction_prov = migrate_provenance_block(
        extraction_prov, annotation_date, source_type, migration_timestamp
    )
    if new_extraction_prov != extraction_prov:
        ch_annotator['extraction_provenance'] = new_extraction_prov
        changes.append(f'extraction_provenance: agent={new_extraction_prov.get("agent")}')

    # Migrate each entity_claim provenance
    for i, claim in enumerate(entity_claims):
        claim_prov = claim.get('provenance', {})
        if claim_prov:
            new_claim_prov = migrate_provenance_block(
                claim_prov, annotation_date, source_type, migration_timestamp
            )
            if new_claim_prov != claim_prov:
                claim['provenance'] = new_claim_prov
                changes.append(f'entity_claims[{i}].provenance migrated')

    if not changes:
        result['reason'] = 'No changes needed'
        return result

    result['changes'] = changes

    # Apply changes if requested
    if apply:
        if save_yaml_file(filepath, data):
            result['migrated'] = True
            result['reason'] = f'Successfully migrated ({len(changes)} changes)'
        else:
            result['reason'] = 'Failed to save file'
    else:
        result['migrated'] = False
        result['reason'] = f'Dry run - would apply {len(changes)} changes'

    return result


def find_custodian_files(base_dir: Path, source_filter: Optional[str] = None) -> list[Path]:
    """Find all custodian YAML files, optionally filtered by source.

    When ``source_filter`` is given, only files whose raw text contains the
    filter string are returned (a cheap pre-filter before full YAML parsing).
    """
    files = []
    for filepath in base_dir.glob('*.yaml'):
        if filepath.is_file():
            if source_filter:
                # Quick check if file contains the source filter string
                try:
                    content = filepath.read_text(encoding='utf-8')
                    if source_filter not in content:
                        continue
                # FIX: was a bare `except:` which also swallowed
                # KeyboardInterrupt/SystemExit; only read/decode errors
                # should cause a file to be skipped.
                except (OSError, UnicodeDecodeError):
                    continue
            files.append(filepath)
    return files


def main():
    """CLI entry point: parse args, scan files, run migration, print summary."""
    parser = argparse.ArgumentParser(
        description='Migrate custodian files to dual timestamp provenance (Phase 1: ISIL/CSV sources)'
    )
    parser.add_argument(
        '--apply', action='store_true',
        help='Actually apply changes (default is dry run)'
    )
    parser.add_argument(
        '--file', type=str,
        help='Migrate a specific file instead of all files'
    )
    parser.add_argument(
        '--source', type=str,
        help='Only process files from a specific source (e.g., japan_complete, czech_unified)'
    )
    parser.add_argument(
        '--limit', type=int, default=None,
        help='Limit number of files to process (for testing)'
    )
    parser.add_argument(
        '--verbose', '-v', action='store_true',
        help='Show detailed output for each file'
    )

    args = parser.parse_args()

    # Set up paths (script lives in scripts/, data in <root>/data/custodian)
    project_root = Path(__file__).parent.parent
    custodian_dir = project_root / 'data' / 'custodian'

    # One timestamp shared by every file in this run so the whole batch is
    # traceable to a single migration event.
    migration_timestamp = datetime.now(timezone.utc).isoformat()

    print("=== Phase 1 Migration: ISIL/CSV Sources ===")
    print(f"Mode: {'APPLY CHANGES' if args.apply else 'DRY RUN'}")
    print(f"Migration timestamp: {migration_timestamp}")
    print()

    # Determine files to process
    if args.file:
        files = [Path(args.file)]
        print(f"Processing single file: {args.file}")
    else:
        print(f"Scanning {custodian_dir}...")
        files = find_custodian_files(custodian_dir, args.source)
        print(f"Found {len(files)} files")
        if args.source:
            print(f"Filtered by source: {args.source}")

    if args.limit:
        files = files[:args.limit]
        print(f"Limited to first {args.limit} files")

    print()

    # Process files
    stats = {
        'processed': 0,
        'migrated': 0,
        'skipped_not_cat1': 0,
        'skipped_already_valid': 0,
        'errors': 0,
    }

    for i, filepath in enumerate(files):
        # Progress heartbeat every 1000 files even when not verbose.
        if args.verbose or (i % 1000 == 0 and i > 0):
            print(f"Processing {i}/{len(files)}...")

        result = migrate_file(filepath, apply=args.apply,
                              migration_timestamp=migration_timestamp)
        stats['processed'] += 1

        # Classify the outcome by the reason string migrate_file produced.
        if result['migrated']:
            stats['migrated'] += 1
            if args.verbose:
                print(f"  ✓ {filepath.name}: {result['reason']}")
        elif 'Not a Category 1' in result['reason']:
            stats['skipped_not_cat1'] += 1
            if args.verbose:
                print(f"  - {filepath.name}: {result['reason']}")
        elif 'already valid' in result['reason']:
            stats['skipped_already_valid'] += 1
            if args.verbose:
                print(f"  - {filepath.name}: {result['reason']}")
        elif 'would apply' in result['reason']:
            # Dry run - would migrate
            stats['migrated'] += 1
            if args.verbose:
                print(f"  ~ {filepath.name}: {result['reason']}")
                for change in result['changes']:
                    print(f"      {change}")
        elif 'Failed' in result['reason'] or 'ERROR' in result['reason']:
            stats['errors'] += 1
            print(f"  ✗ {filepath.name}: {result['reason']}", file=sys.stderr)

    # Summary
    print()
    print("=== Migration Summary ===")
    print(f"Total processed: {stats['processed']}")
    print(f"{'Would migrate:' if not args.apply else 'Migrated:'} {stats['migrated']}")
    print(f"Skipped (not Cat 1): {stats['skipped_not_cat1']}")
    print(f"Skipped (already OK): {stats['skipped_already_valid']}")
    print(f"Errors: {stats['errors']}")

    if not args.apply and stats['migrated'] > 0:
        print()
        print("This was a DRY RUN. To apply changes, run with --apply")


if __name__ == '__main__':
    main()