#!/usr/bin/env python3
"""
Fix bad enrichments in custodian YAML files using CH-Annotator entity type validation.

Detects and removes enrichment data that doesn't match the expected entity type:
1. Wikidata enrichments matching NON-INSTITUTION entity types:
   - Family names, surnames, given names (APP.NAM in CH-Annotator)
   - Geographic features: municipalities, streets, rivers (TOP in CH-Annotator)
   - Biological entities: genes, proteins, species (not in heritage taxonomy)
   - Works: songs, films, albums, video games (WRK in CH-Annotator)
   - Chemicals, software, etc.
2. Google Maps enrichments with wrong country:
   - Address country doesn't match GHCID country code

Following AGENTS.md Rule 5: NEVER Delete Enriched Data - we move bad enrichments
to a 'removed_bad_enrichments' section with reason, rather than deleting completely.

Uses CH-Annotator entity type system (ch_annotator-v1_7_0) for validation logic.
"""

import os
import sys
import yaml
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional, Tuple, List

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))

# =============================================================================
# CH-ANNOTATOR ENTITY TYPE VALIDATION
# =============================================================================
# Based on data/entity_annotation/ch_annotator-v1_7_0.yaml
# Heritage custodians should be GRP.HER (GROUP.HERITAGE_CUSTODIAN) type

# Patterns indicating WRONG entity types (NOT heritage custodians).
# Keys are CH-Annotator type codes; values are regexes matched (case-insensitively)
# against the Wikidata description. Negative lookaheads keep legitimate
# institution names (e.g. "street art museum") from being flagged.
BAD_WIKIDATA_PATTERNS = {
    # APPELLATION (APP) - Names as linguistic entities, not institutions
    'APP.NAM': [
        r'\bfamily name\b',
        r'\bsurname\b',
        r'\bgiven name\b',
        r'\bfirst name\b',
        r'\blast name\b',
        r'\bpersonal name\b',
        r'\bname\s+of\s+\w+\s+origin\b',
        r'\bpatronymi[kc]\b',
        r'\bmatronym\b',
    ],
    # TOPONYM (TOP) - Place references, not institutions
    'TOP.ADM': [  # Administrative units
        r'\bmunicipality\b(?!\s+(museum|library|archive|center|centre))',
        r'\bdistrict\b(?!\s+(museum|library|archive))',
        r'\bprovince\b(?!\s+(museum|library|archive))',
        r'\bstate\b(?!\s+(museum|library|archive))',
        r'\bregion\b(?!\s+(museum|library|archive))',
        r'\bcounty\b(?!\s+(museum|library|archive))',
    ],
    'TOP.GEO': [  # Geographic features
        r'\bstreet\b(?!\s+art)',  # "street art museum" is valid
        r'\briver\b(?!\s+museum)',
        r'\btributary\b',
        r'\bisland\b(?!\s+(museum|heritage))',
        r'\blake\b(?!\s+museum)',
        r'\bmountain\b(?!\s+museum)',
        r'\bforest\b(?!\s+museum)',
        r'\bvalley\b(?!\s+museum)',
        r'\bpeninsula\b',
        r'\bcanal\b(?!\s+museum)',
    ],
    # THING (THG) - Objects/entities that are NOT institutions
    'THG.BIO': [  # Biological
        r'\bgene\b',
        r'\bprotein\b',
        r'\bspecies\b',
        r'\btaxon\b',
        r'\bbacteria\b',
        r'\bvirus\b',
        r'\bfungus\b',
        r'\bplant\b(?!\s+(museum|garden|society))',
    ],
    'THG.CHM': [  # Chemical
        r'\bchemical\b(?!\s+heritage)',
        r'\bcompound\b',
        r'\bmolecule\b',
        r'\belement\b(?!\s+of)',
    ],
    'THG.SFT': [  # Software/digital products
        r'\bsoftware\b',
        r'\bvideo game\b',
        r'\bmobile app\b',
        r'\boperating system\b',
    ],
    # WORK (WRK) - Creative works, not institutions
    'WRK.AUD': [  # Audio works
        r'\bsong\b(?!\s+museum)',
        r'\balbum\b(?!\s+museum)',
        r'\bmusical\b(?!\s+museum)',
    ],
    'WRK.VIS': [  # Visual works
        r'\bfilm\b(?!\s+(museum|archive|institute))',
        r'\bmovie\b(?!\s+(museum|archive))',
        r'\btelevision series\b',
        r'\bTV series\b',
        r'\bpainting\b(?!\s+(museum|collection))',
        r'\bsculpture\b(?!\s+(museum|garden))',
    ],
    # Wrong country/context indicators
    'WRONG_CONTEXT': [
        r'\bFlemish\b(?!.*\bmuseum\b)',  # Flemish (BE) for NL files
        r'\bWalloon\b',  # Belgian
    ],
}

# Wikidata entity types that are DEFINITELY wrong for heritage custodians
WRONG_WIKIDATA_TYPES = [
    'Wikimedia disambiguation page',
    'Wikimedia list article',
    'Wikimedia category',
    'Wikimedia template',
    'scientific article',
    'scholarly article',
    'human biblical figure',  # Not institutions
]

# Country name to ISO 3166-1 alpha-2 code mapping.
# Keys are lowercase; several local-language spellings per country.
COUNTRY_NAMES_TO_CODES = {
    # Netherlands
    'netherlands': 'NL',
    'nederland': 'NL',
    'the netherlands': 'NL',
    'holland': 'NL',
    # Belgium
    'belgium': 'BE',
    'belgie': 'BE',
    'belgië': 'BE',
    'belgique': 'BE',
    'belgien': 'BE',
    # Germany
    'germany': 'DE',
    'deutschland': 'DE',
    'allemagne': 'DE',
    # France
    'france': 'FR',
    # United States
    'usa': 'US',
    'united states': 'US',
    'united states of america': 'US',
    'u.s.a.': 'US',
    # Austria
    'austria': 'AT',
    'osterreich': 'AT',
    'österreich': 'AT',
    # Switzerland
    'switzerland': 'CH',
    'schweiz': 'CH',
    'suisse': 'CH',
    'svizzera': 'CH',
    # Italy
    'italy': 'IT',
    'italia': 'IT',
    # Spain
    'spain': 'ES',
    'espana': 'ES',
    'españa': 'ES',
    # Portugal
    'portugal': 'PT',
    # United Kingdom
    'united kingdom': 'GB',
    'uk': 'GB',
    'great britain': 'GB',
    'england': 'GB',
    # Canada
    'canada': 'CA',
    # Australia
    'australia': 'AU',
    # Japan
    'japan': 'JP',
    # Brazil
    'brazil': 'BR',
    'brasil': 'BR',
    # Palestine
    'palestine': 'PS',
    'state of palestine': 'PS',
    # Israel
    'israel': 'IL',
}


def extract_country_from_ghcid(filename: str) -> Optional[str]:
    """Extract country code from GHCID filename (e.g., NL-NH-AMS-M-RM.yaml -> NL)."""
    basename = os.path.basename(filename)
    # GHCID convention: filename starts with a two-letter uppercase country code
    # followed by a dash.
    match = re.match(r'^([A-Z]{2})-', basename)
    if match:
        return match.group(1)
    return None


def extract_country_from_address(address: str) -> Optional[str]:
    """Extract an ISO 3166-1 alpha-2 country code from a formatted address string.

    Returns None when no country can be determined.
    """
    if not address:
        return None

    address_lower = address.lower().strip()

    # Strongest signal: address ends with ", <country name>".
    for country_name, code in COUNTRY_NAMES_TO_CODES.items():
        pattern = rf',\s*{re.escape(country_name)}\s*$'
        if re.search(pattern, address_lower):
            return code

    # Check for US-style zip code pattern (state abbreviation + 5-digit zip)
    if re.search(r',\s*[A-Z]{2}\s+\d{5}', address):
        # Bug fix: use whole-word matching instead of bare substring tests.
        # Substring checks misfired on city names — 'uk' matched inside
        # "Milwaukee" and 'usa' inside "Thousand Oaks" — so valid US addresses
        # fell through and returned None. (?<!\w)/(?!\w) is used rather than
        # \b so the dotted entry 'u.s.a.' still matches at end of string.
        def _has_word(name: str) -> bool:
            return re.search(rf'(?<!\w){re.escape(name)}(?!\w)', address_lower) is not None

        if _has_word('usa') or _has_word('united states'):
            return 'US'
        # Assume US if it has state+zip format and no other country name appears.
        if not any(_has_word(name) for name in COUNTRY_NAMES_TO_CODES):
            return 'US'  # Default to US for state+zip pattern

    return None
def is_bad_wikidata_enrichment(enrichment: dict, expected_country: Optional[str] = None) -> Tuple[bool, str, str]:
    """
    Check if wikidata enrichment is bad using CH-Annotator entity type validation.

    Args:
        enrichment: The 'wikidata_enrichment' mapping from a custodian YAML file.
        expected_country: ISO 3166-1 alpha-2 code derived from the GHCID filename,
            used for country-context checks (currently only 'NL' vs Flemish).

    Returns:
        Tuple of (is_bad, reason, ch_annotator_type)
    """
    if not enrichment:
        return False, "", ""

    description = enrichment.get('wikidata_description', '')
    if not description:
        return False, "", ""

    description_lower = description.lower()

    # Check against all BAD patterns by CH-Annotator type
    for ch_type, patterns in BAD_WIKIDATA_PATTERNS.items():
        for pattern in patterns:
            if re.search(pattern, description_lower, re.IGNORECASE):
                reason = f"Wikidata description '{description}' matches CH-Annotator type {ch_type} (not GRP.HER heritage custodian)"
                return True, reason, ch_type

    # Check for wrong Wikidata entity types (substring match on lowercased text)
    for wrong_type in WRONG_WIKIDATA_TYPES:
        if wrong_type.lower() in description_lower:
            reason = f"Wikidata description '{description}' is type '{wrong_type}' (not heritage institution)"
            return True, reason, "WRONG_TYPE"

    # Check for country mismatch in description (e.g., "Flemish" for NL institution).
    # "Flemish museum"/"Flemish archive" etc. are tolerated as legitimate.
    if expected_country == 'NL':
        if re.search(r'\bFlemish\b', description, re.IGNORECASE):
            if not re.search(r'\b(museum|archive|library|collection)\b', description_lower):
                reason = f"Wikidata description '{description}' indicates Belgian (Flemish) entity for Netherlands institution"
                return True, reason, "WRONG_CONTEXT"

    return False, "", ""


def is_bad_google_maps_enrichment(enrichment: dict, expected_country: Optional[str]) -> Tuple[bool, str]:
    """Check if Google Maps enrichment is for wrong country.

    Returns:
        Tuple of (is_bad, reason). Always (False, "") when the country cannot
        be determined from the enrichment or the filename.
    """
    if not enrichment or not expected_country:
        return False, ""

    address = enrichment.get('formatted_address', '')
    if not address:
        return False, ""

    detected_country = extract_country_from_address(address)
    if detected_country and detected_country != expected_country:
        return True, f"Google Maps address '{address}' is in {detected_country}, expected {expected_country}"

    return False, ""


def _quarantine_enrichment(data: dict, key: str, reason: str, ch_type: str) -> None:
    """Move data[key] into 'removed_bad_enrichments' with an audit record
    (AGENTS.md Rule 5: never delete enriched data outright)."""
    data['removed_bad_enrichments'].append({
        'type': key,
        'reason': reason,
        'ch_annotator_type': ch_type,
        'validation_convention': 'ch_annotator-v1_7_0',
        'removal_timestamp': datetime.now(timezone.utc).isoformat(),
        'original_data': data[key]
    })
    del data[key]


def fix_file(filepath: str, dry_run: bool = True) -> dict:
    """
    Fix bad enrichments in a single file.

    Args:
        filepath: Path to the custodian YAML file.
        dry_run: When True (default), detect and report but do not write.

    Returns dict with:
    - fixed: bool - whether file was fixed
    - wikidata_bad: bool - had bad wikidata
    - google_bad: bool - had bad google maps
    - reasons: list of reasons
    - ch_types: list of CH-Annotator types detected
    """
    result = {
        'fixed': False,
        'wikidata_bad': False,
        'google_bad': False,
        'reasons': [],
        'ch_types': []
    }

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        # Best-effort scan: report unreadable/unparsable files and move on.
        print(f" ERROR reading {filepath}: {e}")
        return result

    if not data or not isinstance(data, dict):
        return result

    expected_country = extract_country_from_ghcid(filepath)
    modified = False

    # Initialize removed_bad_enrichments section if needed.
    # Robustness fix: a hand-edited file may carry the key with an explicit
    # null value, which would crash .append() below — treat None as missing.
    if data.get('removed_bad_enrichments') is None:
        data['removed_bad_enrichments'] = []

    # Check Wikidata enrichment
    wikidata_enrichment = data.get('wikidata_enrichment')
    if wikidata_enrichment:
        is_bad, reason, ch_type = is_bad_wikidata_enrichment(wikidata_enrichment, expected_country)
        if is_bad:
            result['wikidata_bad'] = True
            result['reasons'].append(reason)
            result['ch_types'].append(ch_type)
            # Move to removed section (following AGENTS.md Rule 5)
            _quarantine_enrichment(data, 'wikidata_enrichment', reason, ch_type)
            modified = True
            print(f" Wikidata [{ch_type}]: {reason}")

    # Check Google Maps enrichment
    google_enrichment = data.get('google_maps_enrichment')
    if google_enrichment:
        is_bad, reason = is_bad_google_maps_enrichment(google_enrichment, expected_country)
        if is_bad:
            result['google_bad'] = True
            result['reasons'].append(reason)
            # Move to removed section
            _quarantine_enrichment(data, 'google_maps_enrichment', reason, 'WRONG_CONTEXT')
            modified = True
            print(f" Google Maps: {reason}")

    # Clean up empty removed_bad_enrichments section
    if not data['removed_bad_enrichments']:
        del data['removed_bad_enrichments']

    # Add provenance note
    if modified:
        # Robustness fix: tolerate an explicit 'provenance: null' in the YAML;
        # the original key-membership test passed and then crashed on
        # "'notes' not in None".
        if data.get('provenance') is None:
            data['provenance'] = {}
        if 'notes' not in data['provenance']:
            data['provenance']['notes'] = []
        elif isinstance(data['provenance']['notes'], str):
            # Convert string notes to list
            data['provenance']['notes'] = [data['provenance']['notes']]

        data['provenance']['notes'].append(
            f"Bad enrichment(s) removed {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')} "
            f"via CH-Annotator validation (ch_annotator-v1_7_0): " +
            "; ".join(result['reasons'])
        )

        result['fixed'] = True

        if not dry_run:
            with open(filepath, 'w', encoding='utf-8') as f:
                yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return result


def main():
    """Main entry point: scan custodian files (or one file) and report/fix."""
    import argparse

    parser = argparse.ArgumentParser(
        description='Fix bad enrichments using CH-Annotator entity type validation'
    )
    parser.add_argument('--dry-run', action='store_true', default=True,
                        help='Show what would be fixed without making changes (default: True)')
    parser.add_argument('--apply', action='store_true',
                        help='Actually apply the fixes')
    parser.add_argument('--path', type=str, default='data/custodian',
                        help='Path to custodian files directory')
    parser.add_argument('--file', type=str,
                        help='Fix a single file instead of scanning directory')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Show all files being processed')
    args = parser.parse_args()

    # --apply is the sole switch; --dry-run is kept for CLI compatibility.
    dry_run = not args.apply

    print("=" * 70)
    print("CH-ANNOTATOR ENTITY TYPE VALIDATION")
    print("Convention: ch_annotator-v1_7_0")
    print("Expected type: GRP.HER (Heritage Custodian)")
    print("=" * 70)

    if dry_run:
        print("\n*** DRY RUN MODE (use --apply to make changes) ***\n")
    else:
        print("\n*** APPLYING FIXES ***\n")

    stats = {
        'total_scanned': 0,
        'wikidata_bad': 0,
        'google_bad': 0,
        'fixed': 0,
        'by_ch_type': {},
        'files_with_issues': []
    }

    if args.file:
        files = [args.file]
    else:
        custodian_dir = Path(args.path)
        files = sorted(custodian_dir.glob('*.yaml'))

    for filepath in files:
        filepath = str(filepath)
        stats['total_scanned'] += 1

        if args.verbose:
            print(f"Processing: {os.path.basename(filepath)}")

        result = fix_file(filepath, dry_run=dry_run)

        if result['wikidata_bad']:
            stats['wikidata_bad'] += 1
        if result['google_bad']:
            stats['google_bad'] += 1
        if result['fixed']:
            stats['fixed'] += 1
            stats['files_with_issues'].append({
                'file': filepath,
                'reasons': result['reasons'],
                'ch_types': result['ch_types']
            })
            for ch_type in result['ch_types']:
                stats['by_ch_type'][ch_type] = stats['by_ch_type'].get(ch_type, 0) + 1

    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"Total files scanned: {stats['total_scanned']}")
    print(f"Bad Wikidata enrichments: {stats['wikidata_bad']}")
    print(f"Bad Google Maps enrichments: {stats['google_bad']}")
    print(f"Files to fix: {stats['fixed']}")

    if stats['by_ch_type']:
        print(f"\nBy CH-Annotator type:")
        for ch_type, count in sorted(stats['by_ch_type'].items(), key=lambda x: -x[1]):
            print(f" {ch_type}: {count}")

    if stats['files_with_issues']:
        print(f"\nFiles with issues:")
        for item in stats['files_with_issues']:
            print(f"\n {os.path.basename(item['file'])}")
            for reason in item['reasons']:
                print(f" → {reason}")

    if dry_run and stats['fixed'] > 0:
        print(f"\n*** Run with --apply to fix these {stats['fixed']} files ***")

    return 0


if __name__ == '__main__':
    sys.exit(main())