#!/usr/bin/env python3
"""
Migrate web claims from 1,456 ad-hoc types to canonical types.

This script:
1. Reads existing web_enrichment claims from NDE entry YAML files
2. Maps claim types to canonical types (from CanonicalClaimTypes.yaml)
3. Drops metadata fields that aren't institution claims
4. Converts nested claims (branches_0_name) to structured arrays
5. Validates that TIER 3 claims have XPath provenance
6. Writes updated entries with standardized claims

Usage:
    # Analyze without writing changes
    python scripts/migrate_claims_to_canonical.py --dry-run

    # Migrate all entries
    python scripts/migrate_claims_to_canonical.py

    # Migrate single entry
    python scripts/migrate_claims_to_canonical.py --entry 0001
"""

import argparse
import logging
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

import yaml

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# ========================================
# CANONICAL TYPE MAPPINGS
# ========================================

# Direct mappings from old (ad-hoc) claim types to canonical types.
# Keys are compared case-insensitively (lowered before lookup).
CANONICAL_MAPPINGS = {
    # Identity claims -> full_name or short_name
    'org_name': 'full_name',
    'custodian_name': 'full_name',
    'full_name': 'full_name',
    'organization_name': 'full_name',
    'museum_name': 'full_name',
    'operating_name': 'full_name',
    'legal_name': 'full_name',
    'name': 'full_name',
    'brand_name': 'short_name',
    'short_name': 'short_name',
    'abbreviation': 'short_name',
    # Description claims
    'description': 'description',
    'description_short': 'description',
    'mission': 'description',
    'slogan': 'description',
    'tagline': 'description',
    'about': 'description',
    'history': 'description',
    # Contact claims
    'email': 'email',
    'phone': 'phone',
    'telephone': 'phone',
    'address': 'address',
    'postal_code': 'postal_code',
    'city': 'city',
    'street_address': 'address',
    # Social media normalization
    'facebook': 'social_facebook',
    'social_facebook': 'social_facebook',
    'social_media_facebook': 'social_facebook',
    'instagram': 'social_instagram',
    'social_instagram': 'social_instagram',
    'social_media_instagram': 'social_instagram',
    'twitter': 'social_twitter',
    'social_twitter': 'social_twitter',
    'social_media_twitter': 'social_twitter',
    'x': 'social_twitter',
    'linkedin': 'social_linkedin',
    'social_linkedin': 'social_linkedin',
    'social_media_linkedin': 'social_linkedin',
    'youtube': 'social_youtube',
    'social_youtube': 'social_youtube',
    'social_media_youtube': 'social_youtube',
    # Website
    'website': 'website',
    'social_media_website': 'website',
    'source_urls': 'website',
    'url': 'website',
    # Identifiers
    'isil_code': 'isil_code',
    'isil': 'isil_code',
    'kvk_number': 'kvk_number',
    'kvk': 'kvk_number',
    'wikidata_id': 'wikidata_id',
    'wikidata': 'wikidata_id',
    'rsin': 'kvk_number',
    # Organization metadata
    'organization_type': 'organization_type',
    'institution_type': 'organization_type',
    'type': 'organization_type',
    'legal_form': 'legal_form',
    'rechtsvorm': 'legal_form',
    'founding_date': 'founding_date',
    'founded': 'founding_date',
    'founding_year': 'founding_date',
    'established': 'founding_date',
    'opening_hours': 'opening_hours',
    'hours': 'opening_hours',
    # Structural (TIER 1) - from docling
    'page_title': 'page_title',
    'page_count': 'page_count',
    'image_count': 'image_count',
    'table_count': 'table_count',
    'markdown_length': 'markdown_length',
    # Pattern-based (TIER 2)
    'main_h1': 'main_h1',
    'nav_items': 'nav_items',
    'has_contact_section': 'has_contact_section',
    'has_footer': 'has_footer',
    'language_detected': 'language_detected',
}

# Types to DROP (pipeline metadata, not institution claims)
DROP_TYPES = {
    'extraction_timestamp',
    'extraction_method',
    'confidence_score',
    'enrichment_method',
    'name_verified',
    'needs_verification',
    'verification_notes',
    'note',
    'notes',
    'claim_notes',
    'source',
    'xpath_match_score',  # Keep as claim metadata, not claim type
    'retrieved_on',  # Keep as claim metadata
    'html_file',  # Keep as claim metadata
}

# Generic UI text to filter out (navigation labels, share buttons, etc.)
INVALID_CLAIMS_PATTERNS = [
    r'^Home$', r'^Contact$', r'^Over ons$', r'^Collectie$', r'^Bezoek$',
    r'^Menu$', r'^Search$', r'^Zoeken$', r'^Nederlands$', r'^English$',
    r'^Skip to', r'^Cookie',
    r'share.*facebook', r'share.*twitter', r'intent/tweet', r'sharer\.php',
]

# Compiled once at import time; patterns are applied case-insensitively.
INVALID_CLAIMS_RE = [re.compile(p, re.IGNORECASE) for p in INVALID_CLAIMS_PATTERNS]

# Nested claim patterns: (regex, parent key the claim nests under)
NESTED_PATTERNS = [
    (re.compile(r'^branches_(\d+)_(.+)$'), 'branches'),
    (re.compile(r'^programs_(\d+)_(.+)$'), 'programs'),
    (re.compile(r'^collections_(\d+)_(.+)$'), 'collections'),
    (re.compile(r'^digital_platforms_(\d+)_(.+)$'), 'digital_platforms'),
    (re.compile(r'^organization_details_(.+)$'), 'organization_details'),
    (re.compile(r'^location_details_(.+)$'), 'location'),
    (re.compile(r'^contact_(.+)$'), 'contact'),
]

# TIER 3 claims that MUST have XPath provenance
TIER_3_CLAIMS = {
    'full_name', 'short_name', 'description', 'email', 'phone',
    'address', 'postal_code', 'city', 'organization_type',
    'legal_form', 'founding_date', 'opening_hours'
}


class ClaimMigrator:
    """Migrate web claims in NDE entry YAML files to canonical types."""

    def __init__(self, entries_dir: Path, dry_run: bool = False):
        """
        Args:
            entries_dir: Directory containing per-entry ``*.yaml`` files.
            dry_run: When True, analyze and report but never write files.
        """
        self.entries_dir = entries_dir
        self.dry_run = dry_run
        # Running counters used by report(); keys are stable output labels.
        self.stats = {
            'entries_processed': 0,
            'claims_migrated': 0,
            'claims_dropped': 0,
            'claims_invalid': 0,
            'claims_nested': 0,
            'claims_unmapped': 0,
            'tier3_missing_xpath': 0,
        }
        self.type_counts = Counter()      # canonical type -> count
        self.unmapped_types = Counter()   # unknown type -> count

    def find_entry_files(self) -> list[Path]:
        """Find all entry YAML files, sorted for deterministic processing order."""
        return sorted(self.entries_dir.glob('*.yaml'))

    def is_invalid_claim_value(self, value: Any) -> bool:
        """Check if a claim value is invalid (empty, too short, or generic UI text).

        Non-string values are never flagged here; this filter targets
        scraped text only.
        """
        if not isinstance(value, str):
            return False
        value = value.strip()
        # Empty or too short to be meaningful.
        # (Fix: the empty string previously slipped through as "valid"
        # while 1-2 character strings were rejected.)
        if len(value) < 3:
            return True
        return any(pattern.search(value) for pattern in INVALID_CLAIMS_RE)

    def map_claim_type(self, claim_type: str) -> tuple[str | None, str]:
        """
        Map a claim type to its canonical type.

        Returns:
            (canonical_type, action) where action is:
            - 'map': Direct mapping (canonical_type is the new type)
            - 'drop': Should be dropped (canonical_type is None)
            - 'nest': Should be converted to nested structure
              (canonical_type is the parent key, e.g. 'branches')
            - 'unknown': Unknown type (canonical_type is None)
        """
        # Normalize type name; all lookups below are case-insensitive.
        claim_type_lower = claim_type.lower().strip()

        # Metadata fields are dropped outright.
        if claim_type_lower in DROP_TYPES:
            return None, 'drop'

        # Nested patterns (e.g. branches_0_name) are matched on the
        # normalized name, consistent with the other lookups.
        for pattern, parent in NESTED_PATTERNS:
            if pattern.match(claim_type_lower):
                return parent, 'nest'

        # Direct mapping.
        if claim_type_lower in CANONICAL_MAPPINGS:
            return CANONICAL_MAPPINGS[claim_type_lower], 'map'

        # Fuzzy fallback: compare with underscores removed
        # (e.g. 'orgname' matches 'org_name').
        claim_type_clean = claim_type_lower.replace('_', '')
        for old_type, new_type in CANONICAL_MAPPINGS.items():
            if old_type.replace('_', '') == claim_type_clean:
                return new_type, 'map'

        return None, 'unknown'

    def migrate_claim(self, claim: dict) -> dict | None:
        """
        Migrate a single claim to its canonical type.

        The input dict is never mutated; a new dict is returned.

        Returns:
            The migrated claim, or None if it should be dropped
            (metadata type or invalid/generic value).
        """
        claim_type = claim.get('claim_type', '')
        claim_value = claim.get('claim_value', '')

        # Filter out generic UI text and empty values.
        if self.is_invalid_claim_value(claim_value):
            self.stats['claims_invalid'] += 1
            return None

        canonical_type, action = self.map_claim_type(claim_type)

        if action == 'drop':
            self.stats['claims_dropped'] += 1
            return None

        if action == 'nest':
            self.stats['claims_nested'] += 1
            # For now, keep nested claims but mark them; a separate pass
            # will convert them to structured arrays.
            return {
                **claim,
                'claim_type': claim_type,  # Keep original for later processing
                '_nested_parent': canonical_type,
            }

        if action == 'unknown':
            self.stats['claims_unmapped'] += 1
            self.unmapped_types[claim_type] += 1
            # Keep unknown claims but mark them for review.
            return {
                **claim,
                '_unmapped': True,
            }

        # Successfully mapped.
        self.stats['claims_migrated'] += 1
        self.type_counts[canonical_type] += 1

        migrated = {
            **claim,
            'claim_type': canonical_type,
            '_original_type': claim_type if claim_type != canonical_type else None,
        }

        # TIER 3 claims require XPath provenance; flag violations on the
        # returned copy only (do not mutate the caller's dict).
        if canonical_type in TIER_3_CLAIMS and not claim.get('xpath'):
            self.stats['tier3_missing_xpath'] += 1
            migrated['_missing_xpath'] = True

        return migrated

    def migrate_entry(self, entry: dict) -> dict:
        """Migrate all claims in an entry (in place) and stamp it as migrated."""
        # Sections may exist with a null value in hand-edited YAML;
        # normalize to {} so .get() below is safe.
        web_claims = entry.get('web_claims') or {}
        web_enrichment = entry.get('web_enrichment') or {}

        # Prefer web_claims (newer structure), then web_enrichment.
        claims = web_claims.get('claims') or web_enrichment.get('claims') or []
        if not claims:
            return entry

        migrated_claims = []
        for claim in claims:
            migrated = self.migrate_claim(claim)
            if migrated:
                # Strip internal markers for final output (kept in dry-run
                # so the report can still inspect them).
                if not self.dry_run:
                    migrated.pop('_unmapped', None)
                    migrated.pop('_nested_parent', None)
                    orig_type = migrated.pop('_original_type', None)
                    # Optionally store original type for audit.
                    if orig_type:
                        migrated['original_claim_type'] = orig_type
                migrated_claims.append(migrated)

        # Write results back into whichever section the entry carries,
        # preferring web_claims. NOTE(review): claims sourced from
        # web_enrichment land in web_claims when both sections exist —
        # preserved from the original logic.
        timestamp = datetime.now(timezone.utc).isoformat()
        if 'web_claims' in entry:
            web_claims['claims'] = migrated_claims
            web_claims['claims_migrated'] = True
            web_claims['migration_timestamp'] = timestamp
            entry['web_claims'] = web_claims
        elif 'web_enrichment' in entry:
            web_enrichment['claims'] = migrated_claims
            web_enrichment['claims_migrated'] = True
            web_enrichment['migration_timestamp'] = timestamp
            entry['web_enrichment'] = web_enrichment

        return entry

    def process_entry_file(self, path: Path) -> bool:
        """Process a single entry file.

        Returns:
            True if the entry was migrated (and written, unless dry-run);
            False if it was empty, already migrated, or errored.
        """
        try:
            with open(path, 'r', encoding='utf-8') as f:
                entry = yaml.safe_load(f)

            if not entry:
                return False

            # Skip entries that were already migrated (null-safe sections).
            web_claims = entry.get('web_claims') or {}
            web_enrichment = entry.get('web_enrichment') or {}
            if web_claims.get('claims_migrated') or web_enrichment.get('claims_migrated'):
                logger.debug(f"Skipping {path.name} - already migrated")
                return False

            migrated = self.migrate_entry(entry)
            self.stats['entries_processed'] += 1

            if not self.dry_run:
                with open(path, 'w', encoding='utf-8') as f:
                    yaml.dump(migrated, f, default_flow_style=False,
                              allow_unicode=True, sort_keys=False)

            return True

        except Exception as e:
            # Best-effort batch tool: log and keep going with the next file.
            logger.error(f"Error processing {path}: {e}")
            return False

    def run(self, entry_filter: str | None = None):
        """Run migration on all entries, then print the report.

        Args:
            entry_filter: When given, only process files whose name
                contains this substring (e.g. an entry ID like '0001').
        """
        files = self.find_entry_files()
        if entry_filter:
            files = [f for f in files if entry_filter in f.name]

        logger.info(f"Found {len(files)} entry files")

        for path in files:
            # Log progress only when a file was actually processed
            # (fix: previously the same message repeated for every
            # skipped file while the count sat at a multiple of 100).
            if self.process_entry_file(path):
                if self.stats['entries_processed'] % 100 == 0:
                    logger.info(f"Processed {self.stats['entries_processed']} entries...")

        self.report()

    def report(self):
        """Print migration report to stdout."""
        print("\n" + "=" * 60)
        print("CLAIM MIGRATION REPORT")
        print("=" * 60)
        print(f"\nEntries processed: {self.stats['entries_processed']}")
        print(f"\nClaims:")
        print(f"  - Migrated to canonical: {self.stats['claims_migrated']}")
        print(f"  - Dropped (metadata): {self.stats['claims_dropped']}")
        print(f"  - Invalid (UI text): {self.stats['claims_invalid']}")
        print(f"  - Nested (to convert): {self.stats['claims_nested']}")
        print(f"  - Unmapped (unknown): {self.stats['claims_unmapped']}")
        print(f"\nTIER 3 claims missing XPath: {self.stats['tier3_missing_xpath']}")

        if self.type_counts:
            print("\nCanonical type distribution:")
            for claim_type, count in self.type_counts.most_common(20):
                print(f"  {claim_type}: {count}")

        if self.unmapped_types:
            print(f"\nUnmapped types ({len(self.unmapped_types)} unique):")
            for claim_type, count in self.unmapped_types.most_common(30):
                print(f"  {claim_type}: {count}")


def main():
    """CLI entry point: parse arguments and run the migrator."""
    parser = argparse.ArgumentParser(description='Migrate web claims to canonical types')
    parser.add_argument('--entries-dir', type=Path,
                        default=Path('data/nde/enriched/entries'),
                        help='Path to entries directory')
    parser.add_argument('--entry', type=str,
                        help='Filter to specific entry ID (e.g., 0001)')
    parser.add_argument('--dry-run', action='store_true',
                        help='Analyze without writing changes')
    args = parser.parse_args()

    if not args.entries_dir.exists():
        logger.error(f"Entries directory not found: {args.entries_dir}")
        sys.exit(1)

    mode = "DRY RUN" if args.dry_run else "MIGRATION"
    logger.info(f"Starting {mode}...")

    migrator = ClaimMigrator(args.entries_dir, dry_run=args.dry_run)
    migrator.run(entry_filter=args.entry)


if __name__ == '__main__':
    main()