#!/usr/bin/env python3
"""
Phase 1 Migration Script: ISIL/CSV Sources → Proper Dual Timestamp Provenance

Migrates custodian files from:
- agent: claude-conversation
- single timestamp field

To:
- agent: batch-script-create-custodian-from-ch-annotator
- source_archived_at: (from existing timestamp)
- statement_created_at: (from annotation_date)

This script handles Category 1 files (ISIL registries, CSV sources) which
have path patterns like `/files/*.yaml` or `/files/*.csv`.

Usage:
    # Dry run (default) - shows what would be changed
    python scripts/migrate_provenance_phase1.py

    # Actually apply changes
    python scripts/migrate_provenance_phase1.py --apply

    # Process specific files
    python scripts/migrate_provenance_phase1.py --apply --file data/custodian/JP-20-SHI-L-S-shiojiritankakan.yaml

    # Process only files from specific source
    python scripts/migrate_provenance_phase1.py --apply --source japan_complete

References:
- Rule 35: .opencode/PROVENANCE_TIMESTAMP_RULES.md
- Migration Spec: .opencode/CLAUDE_CONVERSATION_MIGRATION_SPEC.md
"""
|
|
|
|
# Standard library
import argparse
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

# Third-party: PyYAML, used for safe load/dump of custodian files
import yaml
|
|
# YAML configuration to preserve formatting
class PreservingDumper(yaml.SafeDumper):
    """Custom YAML dumper that preserves order and handles special types."""


def str_representer(dumper, data):
    """Represent strings, using literal block style ('|') for multiline values.

    Single-line strings fall through to the default scalar style so that
    ordinary values stay compact.
    """
    style = '|' if '\n' in data else None
    return dumper.represent_scalar('tag:yaml.org,2002:str', data, style=style)


# Register the representer so every str dumped via PreservingDumper uses it.
PreservingDumper.add_representer(str, str_representer)
|
|
|
|
|
|
# Constants
# Placeholder agent identifiers that mark a provenance block as needing migration.
INVALID_AGENTS = {'claude-conversation', 'claude', 'ai', 'opencode', 'llm'}
# Category 1 files have extraction paths like /files/foo.yaml or /files/foo.csv.
CATEGORY_1_PATH_PATTERN = re.compile(r'^/files/.*\.(yaml|csv)$')

# Source type mapping based on path.
# Keys are matched as *substrings* of the extraction path (see get_source_type);
# first match in insertion order wins.
SOURCE_TYPE_MAP = {
    'japan_complete': 'isil_registry_csv',
    'czech_unified': 'library_registry_api',
    'switzerland_isil': 'isil_registry_csv',
    'austria_complete': 'isil_registry_csv',
    'belarus_complete': 'isil_registry_csv',
    'argentina_complete': 'isil_registry_csv',
    'bulgaria_complete': 'isil_registry_csv',
    'belgium_complete': 'isil_registry_csv',
}

# Agent mapping based on source type.
# Unknown source types fall back to the same batch-script agent (see get_new_agent).
AGENT_MAP = {
    'isil_registry_csv': 'batch-script-create-custodian-from-ch-annotator',
    'library_registry_api': 'batch-script-create-custodian-from-ch-annotator',
}
|
|
|
|
|
|
def load_yaml_file(filepath: Path) -> Optional[dict]:
    """Parse *filepath* as YAML.

    Returns the parsed document, or None (after logging to stderr) if the
    file cannot be read or parsed.
    """
    try:
        with filepath.open('r', encoding='utf-8') as handle:
            return yaml.safe_load(handle)
    except Exception as exc:
        # Boundary handler: report and signal failure to the caller.
        print(f"  ERROR loading {filepath}: {exc}", file=sys.stderr)
        return None
|
|
|
|
|
|
def save_yaml_file(filepath: Path, data: dict) -> bool:
    """Serialize *data* to *filepath* as YAML.

    Uses PreservingDumper so key order and multiline strings survive the
    round trip. Returns True on success, False (after logging) on failure.
    """
    dump_options = {
        'Dumper': PreservingDumper,
        'default_flow_style': False,
        'allow_unicode': True,
        'sort_keys': False,
        'width': 120,
    }
    try:
        with filepath.open('w', encoding='utf-8') as handle:
            yaml.dump(data, handle, **dump_options)
        return True
    except Exception as exc:
        print(f"  ERROR saving {filepath}: {exc}", file=sys.stderr)
        return False
|
|
|
|
|
|
def is_category_1_file(data: dict) -> bool:
    """Check if this file is a Category 1 (ISIL/CSV source) file.

    Category 1 files are identified by an extraction provenance path matching
    `/files/*.yaml` or `/files/*.csv` (see CATEGORY_1_PATH_PATTERN).

    Args:
        data: Parsed YAML document for a custodian file.

    Returns:
        True if the extraction provenance path matches the Category 1 pattern.
    """
    # BUG FIX: YAML parses a present-but-empty mapping key (e.g. "ch_annotator:")
    # as None; `.get(..., {})` does not guard against that and the chained
    # `.get` would raise AttributeError. Use `or {}` at each level instead.
    ch_annotator = data.get('ch_annotator') or {}
    extraction_prov = ch_annotator.get('extraction_provenance') or {}
    path = extraction_prov.get('path') or ''

    return bool(CATEGORY_1_PATH_PATTERN.match(path))
|
|
|
|
|
|
def get_source_type(path: str) -> str:
    """Determine source type from path.

    The first SOURCE_TYPE_MAP key (in insertion order) found as a substring
    of *path* decides the type; unmatched paths get 'unknown_csv_source'.
    """
    return next(
        (stype for marker, stype in SOURCE_TYPE_MAP.items() if marker in path),
        'unknown_csv_source',
    )
|
|
|
|
|
|
def get_new_agent(source_type: str) -> str:
    """Get the appropriate agent identifier for a source type.

    Falls back to the generic batch-script agent for unmapped types.
    """
    fallback = 'batch-script-create-custodian-from-ch-annotator'
    return AGENT_MAP.get(source_type, fallback)
|
|
|
|
|
|
def migrate_provenance_block(prov: dict, annotation_date: Optional[str],
                             source_type: str, migration_timestamp: str) -> dict:
    """
    Migrate a single provenance block to dual timestamp format.

    Args:
        prov: The provenance dict to migrate
        annotation_date: The annotation_date from annotation_provenance (for statement_created_at)
        source_type: The determined source type
        migration_timestamp: When this migration is happening (ISO 8601 string)

    Returns:
        Updated provenance dict. When no migration is needed, the *same*
        dict object ``prov`` is returned unchanged (callers rely on identity/
        equality to detect whether anything changed).
    """
    # Capture current state before rewriting anything
    existing_timestamp = prov.get('timestamp')
    existing_agent = prov.get('agent', '')

    # Skip if already migrated (has dual timestamps)
    if 'source_archived_at' in prov and 'statement_created_at' in prov:
        return prov

    # Skip if agent is not one of the invalid placeholder agents
    if existing_agent not in INVALID_AGENTS:
        return prov

    # Create new provenance structure (shallow copy preserves existing fields)
    new_prov = dict(prov)

    # Add dual timestamps
    if existing_timestamp:
        new_prov['source_archived_at'] = existing_timestamp

    if annotation_date:
        new_prov['statement_created_at'] = annotation_date
    else:
        # Fall back to existing timestamp if no annotation_date
        new_prov['statement_created_at'] = existing_timestamp or migration_timestamp

    # Update agent
    new_prov['agent'] = get_new_agent(source_type)

    # Add source type
    new_prov['source_type'] = source_type

    # Add migration note. BUG FIX: record the *actual* prior agent instead of
    # hard-coding 'claude-conversation' -- INVALID_AGENTS also matches
    # 'claude', 'ai', 'opencode' and 'llm', and the old note misreported them.
    new_prov['migration_note'] = (
        f'Migrated from agent:{existing_agent} on {migration_timestamp[:10]}'
    )

    # Remove old timestamp field (now split into dual timestamps)
    if 'timestamp' in new_prov and 'source_archived_at' in new_prov:
        del new_prov['timestamp']

    return new_prov
|
|
|
|
|
|
def migrate_file(filepath: Path, apply: bool = False,
                 migration_timestamp: Optional[str] = None) -> dict:
    """
    Migrate a single custodian file to dual timestamp provenance.

    Args:
        filepath: Path to the YAML file
        apply: If True, actually write changes; if False, just report
        migration_timestamp: ISO timestamp for migration tracking
            (defaults to "now" in UTC when omitted)

    Returns:
        dict with migration results: {
            'migrated': bool,
            'reason': str,
            'changes': list of change descriptions
        }

    Note:
        The 'reason' strings are matched by *substring* in main() to bucket
        results ('Not a Category 1', 'already valid', 'would apply',
        'Failed') -- keep them stable if editing.
    """
    if migration_timestamp is None:
        migration_timestamp = datetime.now(timezone.utc).isoformat()

    result = {
        'migrated': False,
        'reason': '',
        'changes': [],
        'filepath': str(filepath)
    }

    # Load file
    data = load_yaml_file(filepath)
    if data is None:
        result['reason'] = 'Failed to load YAML'
        return result

    # Check if Category 1 (ISIL/CSV extraction path pattern)
    if not is_category_1_file(data):
        result['reason'] = 'Not a Category 1 file (ISIL/CSV source)'
        return result

    ch_annotator = data.get('ch_annotator', {})
    extraction_prov = ch_annotator.get('extraction_provenance', {})
    annotation_prov = ch_annotator.get('annotation_provenance', {})
    entity_claims = ch_annotator.get('entity_claims', [])

    # Check if migration needed: only placeholder agents get migrated
    current_agent = extraction_prov.get('agent', '')
    if current_agent not in INVALID_AGENTS:
        result['reason'] = f'Agent already valid: {current_agent}'
        return result

    # Get metadata for migration
    path = extraction_prov.get('path', '')
    source_type = get_source_type(path)
    annotation_date = annotation_prov.get('annotation_date')

    changes = []

    # Migrate extraction_provenance
    new_extraction_prov = migrate_provenance_block(
        extraction_prov, annotation_date, source_type, migration_timestamp
    )
    # migrate_provenance_block returns the input dict itself when it skips,
    # so inequality here means a real change was produced.
    if new_extraction_prov != extraction_prov:
        ch_annotator['extraction_provenance'] = new_extraction_prov
        changes.append(f'extraction_provenance: agent={new_extraction_prov.get("agent")}')

    # Migrate each entity_claim provenance (claims without one are left alone)
    for i, claim in enumerate(entity_claims):
        claim_prov = claim.get('provenance', {})
        if claim_prov:
            new_claim_prov = migrate_provenance_block(
                claim_prov, annotation_date, source_type, migration_timestamp
            )
            if new_claim_prov != claim_prov:
                claim['provenance'] = new_claim_prov
                changes.append(f'entity_claims[{i}].provenance migrated')

    if not changes:
        result['reason'] = 'No changes needed'
        return result

    result['changes'] = changes

    # Apply changes if requested; otherwise report what a real run would do
    if apply:
        if save_yaml_file(filepath, data):
            result['migrated'] = True
            result['reason'] = f'Successfully migrated ({len(changes)} changes)'
        else:
            result['reason'] = 'Failed to save file'
    else:
        result['migrated'] = False
        result['reason'] = f'Dry run - would apply {len(changes)} changes'

    return result
|
|
|
|
|
|
def find_custodian_files(base_dir: Path, source_filter: Optional[str] = None) -> list[Path]:
    """Find all custodian YAML files, optionally filtered by source.

    Args:
        base_dir: Directory containing the custodian ``*.yaml`` files.
        source_filter: If given, only include files whose raw text contains
            this substring (cheap pre-filter before full YAML parsing).

    Returns:
        Sorted list of matching file paths (sorted for deterministic runs).
    """
    files = []
    for filepath in base_dir.glob('*.yaml'):
        if not filepath.is_file():
            continue
        if source_filter:
            # Quick check if file contains the source filter string
            try:
                content = filepath.read_text(encoding='utf-8')
            # BUG FIX: the original bare `except:` also swallowed
            # KeyboardInterrupt/SystemExit; only skip on read/decode errors.
            except (OSError, UnicodeDecodeError):
                continue
            if source_filter not in content:
                continue
        files.append(filepath)
    return sorted(files)
|
|
|
|
|
|
def main():
    """CLI entry point: scan custodian files and run the Phase 1 migration.

    Dry-run by default; pass --apply to write changes. Progress, per-file
    results (with --verbose) and a summary are printed to stdout; errors
    go to stderr.
    """
    parser = argparse.ArgumentParser(
        description='Migrate custodian files to dual timestamp provenance (Phase 1: ISIL/CSV sources)'
    )
    parser.add_argument(
        '--apply',
        action='store_true',
        help='Actually apply changes (default is dry run)'
    )
    parser.add_argument(
        '--file',
        type=str,
        help='Migrate a specific file instead of all files'
    )
    parser.add_argument(
        '--source',
        type=str,
        help='Only process files from a specific source (e.g., japan_complete, czech_unified)'
    )
    parser.add_argument(
        '--limit',
        type=int,
        default=None,
        help='Limit number of files to process (for testing)'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Show detailed output for each file'
    )

    args = parser.parse_args()

    # Set up paths: script lives in scripts/, data in <repo>/data/custodian
    project_root = Path(__file__).parent.parent
    custodian_dir = project_root / 'data' / 'custodian'

    # One timestamp for the whole run so all files share a migration marker
    migration_timestamp = datetime.now(timezone.utc).isoformat()

    print(f"=== Phase 1 Migration: ISIL/CSV Sources ===")
    print(f"Mode: {'APPLY CHANGES' if args.apply else 'DRY RUN'}")
    print(f"Migration timestamp: {migration_timestamp}")
    print()

    # Determine files to process
    if args.file:
        files = [Path(args.file)]
        print(f"Processing single file: {args.file}")
    else:
        print(f"Scanning {custodian_dir}...")
        files = find_custodian_files(custodian_dir, args.source)
        print(f"Found {len(files)} files")

        if args.source:
            print(f"Filtered by source: {args.source}")

    if args.limit:
        files = files[:args.limit]
        print(f"Limited to first {args.limit} files")

    print()

    # Process files, bucketing outcomes by substring of result['reason']
    stats = {
        'processed': 0,
        'migrated': 0,
        'skipped_not_cat1': 0,
        'skipped_already_valid': 0,
        'errors': 0,
    }

    for i, filepath in enumerate(files):
        # Heartbeat every 1000 files when not verbose
        if args.verbose or (i % 1000 == 0 and i > 0):
            print(f"Processing {i}/{len(files)}...")

        result = migrate_file(filepath, apply=args.apply,
                              migration_timestamp=migration_timestamp)

        stats['processed'] += 1

        if result['migrated']:
            stats['migrated'] += 1
            if args.verbose:
                print(f"  ✓ {filepath.name}: {result['reason']}")
        elif 'Not a Category 1' in result['reason']:
            stats['skipped_not_cat1'] += 1
            if args.verbose:
                print(f"  - {filepath.name}: {result['reason']}")
        elif 'already valid' in result['reason']:
            stats['skipped_already_valid'] += 1
            if args.verbose:
                print(f"  - {filepath.name}: {result['reason']}")
        elif 'would apply' in result['reason']:
            # Dry run - would migrate; counted under 'migrated' on purpose
            stats['migrated'] += 1
            if args.verbose:
                print(f"  ~ {filepath.name}: {result['reason']}")
                for change in result['changes']:
                    print(f"      {change}")
        elif 'Failed' in result['reason'] or 'ERROR' in result['reason']:
            stats['errors'] += 1
            print(f"  ✗ {filepath.name}: {result['reason']}", file=sys.stderr)
        # NOTE(review): 'No changes needed' falls through uncounted beyond
        # 'processed' -- presumably intentional; confirm if a bucket is wanted.

    # Summary
    print()
    print("=== Migration Summary ===")
    print(f"Total processed: {stats['processed']}")
    print(f"{'Would migrate:' if not args.apply else 'Migrated:'} {stats['migrated']}")
    print(f"Skipped (not Cat 1): {stats['skipped_not_cat1']}")
    print(f"Skipped (already OK): {stats['skipped_already_valid']}")
    print(f"Errors: {stats['errors']}")

    if not args.apply and stats['migrated'] > 0:
        print()
        print("This was a DRY RUN. To apply changes, run with --apply")
|
|
|
|
|
|
# Script entry point
if __name__ == '__main__':
    main()
|