#!/usr/bin/env python3
|
|
"""
|
|
Fix bad enrichments in custodian YAML files using CH-Annotator entity type validation.
|
|
|
|
Detects and removes enrichment data that doesn't match the expected entity type:
|
|
|
|
1. Wikidata enrichments matching NON-INSTITUTION entity types:
|
|
- Family names, surnames, given names (APP.NAM in CH-Annotator)
|
|
- Geographic features: municipalities, streets, rivers (TOP in CH-Annotator)
|
|
- Biological entities: genes, proteins, species (not in heritage taxonomy)
|
|
- Works: songs, films, albums, video games (WRK in CH-Annotator)
|
|
- Chemicals, software, etc.
|
|
|
|
2. Google Maps enrichments with wrong country:
|
|
- Address country doesn't match GHCID country code
|
|
|
|
Following AGENTS.md Rule 5: NEVER Delete Enriched Data - we move bad enrichments
|
|
to a 'removed_bad_enrichments' section with reason, rather than deleting completely.
|
|
|
|
Uses CH-Annotator entity type system (ch_annotator-v1_7_0) for validation logic.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import yaml
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Optional, Tuple, List
|
|
|
|
# Add project root to path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
# =============================================================================
# CH-ANNOTATOR ENTITY TYPE VALIDATION
# =============================================================================
# Based on data/entity_annotation/ch_annotator-v1_7_0.yaml
# Heritage custodians should be GRP.HER (GROUP.HERITAGE_CUSTODIAN) type

# Patterns indicating WRONG entity types (NOT heritage custodians).
# Maps a CH-Annotator type code -> list of regexes that are searched
# (case-insensitively) against the lowercased Wikidata description in
# is_bad_wikidata_enrichment(). A match means the enrichment describes
# something other than a heritage institution. Negative lookaheads such as
# (?!\s+museum) keep legitimate institution names (e.g. "river museum")
# from being flagged.
BAD_WIKIDATA_PATTERNS = {
    # APPELLATION (APP) - Names as linguistic entities, not institutions
    'APP.NAM': [
        r'\bfamily name\b',
        r'\bsurname\b',
        r'\bgiven name\b',
        r'\bfirst name\b',
        r'\blast name\b',
        r'\bpersonal name\b',
        r'\bname\s+of\s+\w+\s+origin\b',
        # Both spellings ("patronymic" / "patronymik") occur in descriptions.
        r'\bpatronymi[kc]\b',
        r'\bmatronym\b',
    ],

    # TOPONYM (TOP) - Place references, not institutions
    'TOP.ADM': [  # Administrative units
        r'\bmunicipality\b(?!\s+(museum|library|archive|center|centre))',
        r'\bdistrict\b(?!\s+(museum|library|archive))',
        r'\bprovince\b(?!\s+(museum|library|archive))',
        r'\bstate\b(?!\s+(museum|library|archive))',
        r'\bregion\b(?!\s+(museum|library|archive))',
        r'\bcounty\b(?!\s+(museum|library|archive))',
    ],
    'TOP.GEO': [  # Geographic features
        r'\bstreet\b(?!\s+art)',  # "street art museum" is valid
        r'\briver\b(?!\s+museum)',
        r'\btributary\b',
        r'\bisland\b(?!\s+(museum|heritage))',
        r'\blake\b(?!\s+museum)',
        r'\bmountain\b(?!\s+museum)',
        r'\bforest\b(?!\s+museum)',
        r'\bvalley\b(?!\s+museum)',
        r'\bpeninsula\b',
        r'\bcanal\b(?!\s+museum)',
    ],

    # THING (THG) - Objects/entities that are NOT institutions
    'THG.BIO': [  # Biological
        r'\bgene\b',
        r'\bprotein\b',
        r'\bspecies\b',
        r'\btaxon\b',
        r'\bbacteria\b',
        r'\bvirus\b',
        r'\bfungus\b',
        r'\bplant\b(?!\s+(museum|garden|society))',
    ],
    'THG.CHM': [  # Chemical
        r'\bchemical\b(?!\s+heritage)',
        r'\bcompound\b',
        r'\bmolecule\b',
        # "element of" is common in descriptions of set membership, not chemistry.
        r'\belement\b(?!\s+of)',
    ],
    'THG.SFT': [  # Software/digital products
        r'\bsoftware\b',
        r'\bvideo game\b',
        r'\bmobile app\b',
        r'\boperating system\b',
    ],

    # WORK (WRK) - Creative works, not institutions
    'WRK.AUD': [  # Audio works
        r'\bsong\b(?!\s+museum)',
        r'\balbum\b(?!\s+museum)',
        r'\bmusical\b(?!\s+museum)',
    ],
    'WRK.VIS': [  # Visual works
        r'\bfilm\b(?!\s+(museum|archive|institute))',
        r'\bmovie\b(?!\s+(museum|archive))',
        r'\btelevision series\b',
        r'\bTV series\b',
        r'\bpainting\b(?!\s+(museum|collection))',
        r'\bsculpture\b(?!\s+(museum|garden))',
    ],

    # Wrong country/context indicators
    'WRONG_CONTEXT': [
        r'\bFlemish\b(?!.*\bmuseum\b)',  # Flemish (BE) for NL files
        r'\bWalloon\b',  # Belgian
    ],
}
|
|
|
|
# Wikidata entity types that are DEFINITELY wrong for heritage custodians.
# These are matched as lowercase substrings of the Wikidata description in
# is_bad_wikidata_enrichment() — no word boundaries needed because the
# phrases are long and unambiguous.
WRONG_WIKIDATA_TYPES = [
    'Wikimedia disambiguation page',
    'Wikimedia list article',
    'Wikimedia category',
    'Wikimedia template',
    'scientific article',
    'scholarly article',
    'human biblical figure',  # Not institutions
]
|
|
|
|
# Country name to ISO 3166-1 alpha-2 code mapping.
# Keys are lowercase (including common native-language spellings and
# unaccented variants) because extract_country_from_address() matches them
# against a lowercased address string.
COUNTRY_NAMES_TO_CODES = {
    # Netherlands
    'netherlands': 'NL', 'nederland': 'NL', 'the netherlands': 'NL', 'holland': 'NL',
    # Belgium
    'belgium': 'BE', 'belgie': 'BE', 'belgië': 'BE', 'belgique': 'BE', 'belgien': 'BE',
    # Germany
    'germany': 'DE', 'deutschland': 'DE', 'allemagne': 'DE',
    # France
    'france': 'FR',
    # United States
    'usa': 'US', 'united states': 'US', 'united states of america': 'US', 'u.s.a.': 'US',
    # Austria
    'austria': 'AT', 'osterreich': 'AT', 'österreich': 'AT',
    # Switzerland
    'switzerland': 'CH', 'schweiz': 'CH', 'suisse': 'CH', 'svizzera': 'CH',
    # Italy
    'italy': 'IT', 'italia': 'IT',
    # Spain
    'spain': 'ES', 'espana': 'ES', 'españa': 'ES',
    # Portugal
    'portugal': 'PT',
    # United Kingdom
    'united kingdom': 'GB', 'uk': 'GB', 'great britain': 'GB', 'england': 'GB',
    # Canada
    'canada': 'CA',
    # Australia
    'australia': 'AU',
    # Japan
    'japan': 'JP',
    # Brazil
    'brazil': 'BR', 'brasil': 'BR',
    # Palestine
    'palestine': 'PS', 'state of palestine': 'PS',
    # Israel
    'israel': 'IL',
}
|
|
|
|
|
|
def extract_country_from_ghcid(filename: str) -> Optional[str]:
    """Extract country code from GHCID filename (e.g., NL-NH-AMS-M-RM.yaml -> NL).

    The GHCID convention starts the basename with a two-letter uppercase
    ISO country code followed by a hyphen; returns None when the basename
    does not follow that convention.
    """
    prefix = re.match(r'^([A-Z]{2})-', os.path.basename(filename))
    return prefix.group(1) if prefix else None
|
|
|
|
|
|
def extract_country_from_address(address: str) -> Optional[str]:
    """Extract an ISO country code from a formatted address string.

    Strategy, in order of confidence:
    1. A trailing ", <country name>" (any spelling in COUNTRY_NAMES_TO_CODES).
    2. A US-style "<STATE ABBR> <5-digit ZIP>" tail, which implies US unless
       some other known country name appears anywhere in the address.

    Returns None when no country can be determined.
    """
    if not address:
        return None

    normalized = address.lower().strip()

    # Strongest signal: the address ends with a known country name.
    for name, code in COUNTRY_NAMES_TO_CODES.items():
        if re.search(rf',\s*{re.escape(name)}\s*$', normalized):
            return code

    # US-style "ST 12345" tail (uppercase state abbreviation + ZIP).
    if re.search(r',\s*[A-Z]{2}\s+\d{5}', address):
        if 'usa' in normalized or 'united states' in normalized:
            return 'US'
        # Assume US only if no other known country name occurs anywhere.
        if not any(name in normalized for name in COUNTRY_NAMES_TO_CODES):
            return 'US'

    return None
|
|
|
|
|
|
def is_bad_wikidata_enrichment(enrichment: dict, expected_country: Optional[str] = None) -> Tuple[bool, str, str]:
    """
    Check if wikidata enrichment is bad using CH-Annotator entity type validation.

    Three checks run in order against the 'wikidata_description' field:
    regex patterns per CH-Annotator type, known-wrong Wikidata entity
    types, and a Flemish-vs-Netherlands context check.

    Returns:
        Tuple of (is_bad, reason, ch_annotator_type); ("", "") parts are
        empty when the enrichment is acceptable or has no description.
    """
    if not enrichment:
        return False, "", ""

    description = enrichment.get('wikidata_description', '')
    if not description:
        return False, "", ""

    lowered = description.lower()

    # 1) Regexes grouped by CH-Annotator type; first matching type wins.
    for ch_type, patterns in BAD_WIKIDATA_PATTERNS.items():
        if any(re.search(pattern, lowered, re.IGNORECASE) for pattern in patterns):
            reason = f"Wikidata description '{description}' matches CH-Annotator type {ch_type} (not GRP.HER heritage custodian)"
            return True, reason, ch_type

    # 2) Wikidata meta/article types that can never be institutions.
    for wrong_type in WRONG_WIKIDATA_TYPES:
        if wrong_type.lower() in lowered:
            reason = f"Wikidata description '{description}' is type '{wrong_type}' (not heritage institution)"
            return True, reason, "WRONG_TYPE"

    # 3) "Flemish" in a description for an NL institution points at the
    #    Belgian namesake — unless the description is itself a heritage
    #    institution (museum/archive/library/collection).
    if expected_country == 'NL' and re.search(r'\bFlemish\b', description, re.IGNORECASE):
        if not re.search(r'\b(museum|archive|library|collection)\b', lowered):
            reason = f"Wikidata description '{description}' indicates Belgian (Flemish) entity for Netherlands institution"
            return True, reason, "WRONG_CONTEXT"

    return False, "", ""
|
|
|
|
|
|
def is_bad_google_maps_enrichment(enrichment: dict, expected_country: Optional[str]) -> Tuple[bool, str]:
    """Check if Google Maps enrichment is for wrong country.

    Compares the country inferred from 'formatted_address' against the
    country code expected from the GHCID filename. Returns (is_bad, reason);
    reason is "" when the enrichment passes or cannot be checked.
    """
    # Without an enrichment or an expected country there is nothing to compare.
    if not enrichment or not expected_country:
        return False, ""

    address = enrichment.get('formatted_address', '')
    if not address:
        return False, ""

    detected = extract_country_from_address(address)
    if detected and detected != expected_country:
        return True, f"Google Maps address '{address}' is in {detected}, expected {expected_country}"

    return False, ""
|
|
|
|
|
|
def fix_file(filepath: str, dry_run: bool = True) -> dict:
    """
    Fix bad enrichments in a single file.

    Validates the 'wikidata_enrichment' and 'google_maps_enrichment'
    sections of a custodian YAML file. Bad sections are moved into a
    'removed_bad_enrichments' list (with reason, CH-Annotator type and
    timestamp) rather than deleted, per AGENTS.md Rule 5, and a note is
    appended to the file's provenance.

    Args:
        filepath: Path to the custodian YAML file; its GHCID-style
            basename supplies the expected country code.
        dry_run: When True (default), nothing is written back to disk.

    Returns dict with:
    - fixed: bool - whether file was fixed
    - wikidata_bad: bool - had bad wikidata
    - google_bad: bool - had bad google maps
    - reasons: list of reasons
    - ch_types: list of CH-Annotator types detected
    """
    result = {
        'fixed': False,
        'wikidata_bad': False,
        'google_bad': False,
        'reasons': [],
        'ch_types': []
    }

    # Unreadable/unparsable files are reported but not fatal: the caller
    # keeps scanning the rest of the directory.
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        print(f" ERROR reading {filepath}: {e}")
        return result

    # Empty files or non-mapping YAML roots cannot carry enrichments.
    if not data or not isinstance(data, dict):
        return result

    expected_country = extract_country_from_ghcid(filepath)
    modified = False

    # Initialize removed_bad_enrichments section if needed
    if 'removed_bad_enrichments' not in data:
        data['removed_bad_enrichments'] = []

    # Check Wikidata enrichment
    wikidata_enrichment = data.get('wikidata_enrichment')
    if wikidata_enrichment:
        is_bad, reason, ch_type = is_bad_wikidata_enrichment(wikidata_enrichment, expected_country)
        if is_bad:
            result['wikidata_bad'] = True
            result['reasons'].append(reason)
            result['ch_types'].append(ch_type)

            # Move to removed section (following AGENTS.md Rule 5)
            data['removed_bad_enrichments'].append({
                'type': 'wikidata_enrichment',
                'reason': reason,
                'ch_annotator_type': ch_type,
                'validation_convention': 'ch_annotator-v1_7_0',
                'removal_timestamp': datetime.now(timezone.utc).isoformat(),
                'original_data': wikidata_enrichment
            })
            del data['wikidata_enrichment']
            modified = True
            print(f" Wikidata [{ch_type}]: {reason}")

    # Check Google Maps enrichment
    google_enrichment = data.get('google_maps_enrichment')
    if google_enrichment:
        is_bad, reason = is_bad_google_maps_enrichment(google_enrichment, expected_country)
        if is_bad:
            result['google_bad'] = True
            result['reasons'].append(reason)

            # Move to removed section
            data['removed_bad_enrichments'].append({
                'type': 'google_maps_enrichment',
                'reason': reason,
                'ch_annotator_type': 'WRONG_CONTEXT',
                'validation_convention': 'ch_annotator-v1_7_0',
                'removal_timestamp': datetime.now(timezone.utc).isoformat(),
                'original_data': google_enrichment
            })
            del data['google_maps_enrichment']
            modified = True
            print(f" Google Maps: {reason}")

    # Clean up empty removed_bad_enrichments section so clean files are
    # written back unchanged in structure.
    if not data['removed_bad_enrichments']:
        del data['removed_bad_enrichments']

    # Add provenance note
    if modified:
        if 'provenance' not in data:
            data['provenance'] = {}
        # NOTE(review): assumes data['provenance'], when present, is a
        # mapping — a scalar provenance value would raise here; confirm
        # against the custodian schema.
        if 'notes' not in data['provenance']:
            data['provenance']['notes'] = []
        elif isinstance(data['provenance']['notes'], str):
            # Convert string notes to list
            data['provenance']['notes'] = [data['provenance']['notes']]

        data['provenance']['notes'].append(
            f"Bad enrichment(s) removed {datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')} "
            f"via CH-Annotator validation (ch_annotator-v1_7_0): "
            + "; ".join(result['reasons'])
        )

        result['fixed'] = True

        # Only touch disk when explicitly applying; dry runs report only.
        if not dry_run:
            with open(filepath, 'w', encoding='utf-8') as f:
                yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return result
|
|
|
|
|
|
def main():
    """Main entry point.

    Scans a directory of custodian YAML files (or a single --file),
    runs fix_file() on each, and prints a summary grouped by
    CH-Annotator type. Dry-run by default; --apply writes fixes.
    Returns 0 (used as the process exit code).
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Fix bad enrichments using CH-Annotator entity type validation'
    )
    # NOTE(review): --dry-run is informational only — dry_run is derived
    # solely from --apply below, so this flag never changes behavior.
    parser.add_argument('--dry-run', action='store_true', default=True,
                        help='Show what would be fixed without making changes (default: True)')
    parser.add_argument('--apply', action='store_true',
                        help='Actually apply the fixes')
    parser.add_argument('--path', type=str, default='data/custodian',
                        help='Path to custodian files directory')
    parser.add_argument('--file', type=str,
                        help='Fix a single file instead of scanning directory')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Show all files being processed')

    args = parser.parse_args()

    # Only --apply turns dry-run off.
    dry_run = not args.apply

    print("=" * 70)
    print("CH-ANNOTATOR ENTITY TYPE VALIDATION")
    print("Convention: ch_annotator-v1_7_0")
    print("Expected type: GRP.HER (Heritage Custodian)")
    print("=" * 70)

    if dry_run:
        print("\n*** DRY RUN MODE (use --apply to make changes) ***\n")
    else:
        print("\n*** APPLYING FIXES ***\n")

    # Aggregate counters for the end-of-run summary.
    stats = {
        'total_scanned': 0,
        'wikidata_bad': 0,
        'google_bad': 0,
        'fixed': 0,
        'by_ch_type': {},          # CH-Annotator type -> occurrence count
        'files_with_issues': []    # per-file reasons for the detail listing
    }

    # Either a single explicit file, or every *.yaml in the directory
    # (non-recursive), in sorted order for stable output.
    if args.file:
        files = [args.file]
    else:
        custodian_dir = Path(args.path)
        files = sorted(custodian_dir.glob('*.yaml'))

    for filepath in files:
        filepath = str(filepath)
        stats['total_scanned'] += 1

        if args.verbose:
            print(f"Processing: {os.path.basename(filepath)}")

        result = fix_file(filepath, dry_run=dry_run)

        if result['wikidata_bad']:
            stats['wikidata_bad'] += 1
        if result['google_bad']:
            stats['google_bad'] += 1
        if result['fixed']:
            stats['fixed'] += 1
            stats['files_with_issues'].append({
                'file': filepath,
                'reasons': result['reasons'],
                'ch_types': result['ch_types']
            })
            for ch_type in result['ch_types']:
                stats['by_ch_type'][ch_type] = stats['by_ch_type'].get(ch_type, 0) + 1

    print("\n" + "=" * 70)
    print("SUMMARY")
    print("=" * 70)
    print(f"Total files scanned: {stats['total_scanned']}")
    print(f"Bad Wikidata enrichments: {stats['wikidata_bad']}")
    print(f"Bad Google Maps enrichments: {stats['google_bad']}")
    print(f"Files to fix: {stats['fixed']}")

    # Breakdown by CH-Annotator type, most frequent first.
    if stats['by_ch_type']:
        print(f"\nBy CH-Annotator type:")
        for ch_type, count in sorted(stats['by_ch_type'].items(), key=lambda x: -x[1]):
            print(f" {ch_type}: {count}")

    if stats['files_with_issues']:
        print(f"\nFiles with issues:")
        for item in stats['files_with_issues']:
            print(f"\n {os.path.basename(item['file'])}")
            for reason in item['reasons']:
                print(f" → {reason}")

    if dry_run and stats['fixed'] > 0:
        print(f"\n*** Run with --apply to fix these {stats['fixed']} files ***")

    return 0
|
|
|
|
|
|
# Script entry point: exit with main()'s return code (always 0 here).
if __name__ == '__main__':
    sys.exit(main())
|