#!/usr/bin/env python3
"""
Derive CustodianName from verified web_claims with XPath provenance.

This script selects the best org_name claim from the web_claims section
and stores it as custodian_name, following the emic name protocol.

Priority order for org_name selection:
1. og:site_name meta tag (usually clean organization name)
2. schema.org Organization name (structured data)
3. h1 tag (main heading, often institution name)
4. title tag (may have tagline/separator)

The selected name becomes the official CustodianName used for GHCID generation.

Usage:
    python scripts/derive_custodian_name.py [--limit N] [--entry ENTRY_NUM] [--dry-run]
"""

import argparse
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any

import yaml

# Directories
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')

# Priority order for extraction methods (higher = better)
EXTRACTION_METHOD_PRIORITY = {
    'og_site_name': 100,    # Most reliable - explicitly the site/org name
    'schema_org_name': 90,  # Structured data from JSON-LD
    'h1_tag': 70,           # Main heading, often institution name
    'title_tag': 60,        # May have tagline attached
}

# Generic/invalid names that should be rejected
INVALID_ORG_NAMES = {
    'home', 'home-nl', 'welkom', 'welcome', 'startpagina', 'homepage',
    'default', 'default icon', 'untitled', 'index', 'main',
    'facebook', 'instagram', 'linkedin', 'twitter', 'youtube',
    'externe-link-icoon', 'verplicht', 'website',
    # Navigation/page section names
    'adresgegevens', 'contact', 'contactgegevens', 'over ons', 'about us',
    'nieuws', 'news', 'nieuwsberichten', 'agenda', 'kalender',
    'activiteiten', 'events', 'evenementen', 'programma',
    'nieuwe berichten', 'actueel', 'contact extranet', 'jaarprogramma',
    'archief', 'archieven', 'publicaties', 'documenten',
    'informatiepunt', 'informatie', 'bezoek', 'collectie', 'collecties',
}


def is_valid_org_name(name: str) -> bool:
    """
    Check if an extracted org_name is actually valid.

    Rejects generic website terms, navigation elements, social media links.
    """
    if not name:
        return False

    # Normalize for comparison
    normalized = name.strip().lower()

    # Reject known invalid names
    if normalized in INVALID_ORG_NAMES:
        return False

    # Reject very short names (likely navigation/button text)
    if len(normalized) < 3:
        return False

    # Reject if it's just "Website X" or "Startpagina X"
    if normalized.startswith('website ') or normalized.startswith('startpagina '):
        return False

    # Reject if it starts with generic action/greeting words
    generic_starts = [
        'wil jij', 'click', 'klik', 'meer info', 'lees meer',
        'welkom bij', 'welkom in', 'welkom op', 'welkom',  # Welcome messages
        'onderzoeksagenda', 'jaarverslag', 'nieuwsbrief',  # Publication titles
    ]
    for gs in generic_starts:
        if normalized.startswith(gs):
            return False

    return True


def has_tagline(name: str) -> bool:
    """
    Check if name appears to have a tagline/subtitle appended.

    Tagline indicators: |, -, comma followed by descriptive text
    """
    if not name:
        return False

    # Check for separator characters with spaces (intentional separators)
    if ' | ' in name:
        return True

    # Check for comma followed by descriptive text (likely tagline)
    if ', de ' in name.lower() or ', het ' in name.lower() or ', een ' in name.lower():
        return True

    # Check for dash with spaces only if the second part looks like tagline
    if ' - ' in name:
        parts = name.split(' - ')
        if len(parts) >= 2:
            second = parts[1].strip().lower()
            # If second part starts with article/preposition, likely tagline
            if second.startswith(('de ', 'het ', 'een ', 'jouw ', 'your ', 'the ')):
                return True
            # If second part is much longer, likely tagline
            if len(parts[1]) > len(parts[0]) * 1.5:
                return True

    return False


def extract_name_without_tagline(name: str) -> str:
    """
    Extract the main organization name, stripping any tagline.
    """
    if not name:
        return ""

    # Try pipe separator first (most explicit)
    if ' | ' in name:
        return name.split(' | ')[0].strip()

    # Try comma with article (e.g., "Museum, de beste plek...")
    lower = name.lower()
    for pattern in [', de ', ', het ', ', een ', ', jouw ', ', your ', ', the ']:
        if pattern in lower:
            idx = lower.find(pattern)
            return name[:idx].strip()

    # Try dash with spaces
    if ' - ' in name:
        parts = name.split(' - ')
        second_lower = parts[1].strip().lower() if len(parts) > 1 else ''
        # Only strip if second part looks like tagline
        if second_lower.startswith(('de ', 'het ', 'een ', 'jouw ', 'your ', 'the ')):
            return parts[0].strip()
        if len(parts) > 1 and len(parts[1]) > len(parts[0]) * 1.5:
            return parts[0].strip()

    # Try other dash variants
    for sep in [' – ', ' — ']:
        if sep in name:
            return name.split(sep)[0].strip()

    return name


def select_best_org_name(claims: List[Dict]) -> Optional[Dict]:
    """
    Select the best VALID org_name claim from a list of claims.

    Returns the claim with highest priority extraction method,
    filtering out invalid/generic names.
    """
    org_name_claims = [c for c in claims if c.get('claim_type') == 'org_name']
    if not org_name_claims:
        return None

    # Filter to only valid org names
    valid_claims = [c for c in org_name_claims
                    if is_valid_org_name(c.get('claim_value', ''))]
    if not valid_claims:
        return None

    # Sort by priority (highest first), then by xpath_match_score
    def priority_key(claim):
        method = claim.get('extraction_method', '')
        priority = EXTRACTION_METHOD_PRIORITY.get(method, 0)
        score = claim.get('xpath_match_score', 0)
        return (priority, score)

    sorted_claims = sorted(valid_claims, key=priority_key, reverse=True)
    return sorted_claims[0]


def clean_org_name(name: str) -> str:
    """
    Clean organization name for use as CustodianName.

    Removes common suffixes, normalizes whitespace.
    """
    if not name:
        return ""

    # Normalize whitespace
    name = ' '.join(name.split())

    # Remove trailing punctuation
    name = name.strip(' -–—|:.')

    return name


def extract_entry_number(filename: str) -> str:
    """Extract entry number from filename."""
    match = re.match(r'^(\d+)', filename)
    return match.group(1) if match else filename.replace('.yaml', '')


def process_entry(filepath: Path, dry_run: bool = False) -> tuple[bool, Optional[str], str]:
    """
    Process a single entry file to derive CustodianName.

    Returns:
        (success, custodian_name, source_description)
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    if not data:
        return False, None, "Empty file"

    custodian_name = None
    source_desc = ""

    # Try 1: web_claims (highest quality if valid)
    web_claims = data.get('web_claims', {})
    claims = web_claims.get('claims', [])
    if claims:
        best_claim = select_best_org_name(claims)
        if best_claim:
            claim_value = best_claim['claim_value']
            # Handle taglines - strip them for cleaner name
            if has_tagline(claim_value):
                claim_value = extract_name_without_tagline(claim_value)
            custodian_name = {
                'claim_type': 'custodian_name',
                'claim_value': clean_org_name(claim_value),
                'raw_value': best_claim.get('claim_value'),  # Preserve original
                'source_url': best_claim.get('source_url', ''),
                'retrieved_on': best_claim.get('retrieved_on', ''),
                'xpath': best_claim.get('xpath', ''),
                'html_file': best_claim.get('html_file', ''),
                'xpath_match_score': best_claim.get('xpath_match_score', 1.0),
                'extraction_method': best_claim.get('extraction_method', ''),
                'selection_method': 'priority_ranking',
                'selection_priority': EXTRACTION_METHOD_PRIORITY.get(
                    best_claim.get('extraction_method', ''), 0),
                'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            }
            source_desc = f"web:{best_claim.get('extraction_method', 'unknown')}"

    # Try 2: wikidata_label_nl (authoritative fallback)
    if not custodian_name:
        wikidata = data.get('wikidata_enrichment', {})
        name = wikidata.get('wikidata_label_nl')
        if name and is_valid_org_name(name):
            custodian_name = {
                'claim_type': 'custodian_name',
                'claim_value': clean_org_name(name),
                'source': 'wikidata',
                'wikidata_id': wikidata.get('wikidata_id', ''),
                'provenance_note': 'Derived from wikidata_label_nl (web_claims had no valid org_name)',
                'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            }
            source_desc = "wikidata"

    # Try 3: original_entry.organisatie (CSV source fallback)
    if not custodian_name:
        original = data.get('original_entry', {})
        name = original.get('organisatie')
        if name and is_valid_org_name(name):
            custodian_name = {
                'claim_type': 'custodian_name',
                'claim_value': clean_org_name(name),
                'source': 'original_entry',
                'provenance_note': 'Derived from original_entry.organisatie (no valid web_claims or wikidata)',
                'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            }
            source_desc = "original_entry"

    if not custodian_name:
        return False, None, "No valid org_name from any source"

    if not dry_run:
        data['custodian_name'] = custodian_name
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return True, custodian_name.get('claim_value'), source_desc


def main():
    parser = argparse.ArgumentParser(description='Derive CustodianName from verified web_claims')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without writing')
    parser.add_argument('--force', action='store_true', help='Re-derive even if custodian_name exists')
    args = parser.parse_args()

    # Find entry files
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml')
                        if f.is_file() and not f.name.startswith('.')])

    if args.limit:
        files = files[:args.limit]

    # Track statistics by source
    stats = {
        'web:og_site_name': 0,
        'web:schema_org_name': 0,
        'web:h1_tag': 0,
        'web:title_tag': 0,
        'wikidata': 0,
        'original_entry': 0,
        'skipped': 0,
        'failed': 0,
    }

    print(f"Processing {len(files)} entries...")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print()

    for filepath in files:
        if filepath.is_dir():
            continue

        # Skip if already has custodian_name (unless --force)
        if not args.force:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            # BUGFIX: the original `data.get('custodian_name', {}).get(...)`
            # raised AttributeError when the key existed but held None or a
            # scalar (e.g. `custodian_name: null` in YAML) — the dict default
            # only applies when the key is absent entirely.
            existing = data.get('custodian_name') if data else None
            if isinstance(existing, dict) and existing.get('claim_value'):
                stats['skipped'] += 1
                continue

        success, name, source = process_entry(filepath, dry_run=args.dry_run)

        if success:
            stats[source] = stats.get(source, 0) + 1
            print(f"  ✓ {filepath.name}: {name} [{source}]")
        else:
            stats['failed'] += 1
            print(f"  ✗ {filepath.name}: {source}")

    print()
    print(f"{'DRY RUN - ' if args.dry_run else ''}Summary:")
    print(f"  From web og:site_name:      {stats.get('web:og_site_name', 0)}")
    print(f"  From web schema.org:        {stats.get('web:schema_org_name', 0)}")
    print(f"  From web h1 tag:            {stats.get('web:h1_tag', 0)}")
    print(f"  From web title tag:         {stats.get('web:title_tag', 0)}")
    print(f"  From Wikidata:              {stats.get('wikidata', 0)}")
    print(f"  From original entry:        {stats.get('original_entry', 0)}")
    print(f"  Skipped (already have name): {stats.get('skipped', 0)}")
    print(f"  Failed (no sources):        {stats.get('failed', 0)}")

    total_derived = sum(v for k, v in stats.items() if k not in ('skipped', 'failed'))
    print(f"\n  TOTAL DERIVED: {total_derived}")

    return 0


if __name__ == '__main__':
    sys.exit(main())