glam/scripts/derive_custodian_name.py
2025-12-01 16:06:34 +01:00

377 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Derive CustodianName from verified web_claims with XPath provenance.
This script selects the best org_name claim from the web_claims section
and stores it as custodian_name, following the emic name protocol.
Priority order for org_name selection:
1. og:site_name meta tag (usually clean organization name)
2. schema.org Organization name (structured data)
3. h1 tag (main heading, often institution name)
4. title tag (may have tagline/separator)
The selected name becomes the official CustodianName used for GHCID generation.
Usage:
python scripts/derive_custodian_name.py [--limit N] [--entry ENTRY_NUM] [--dry-run]
"""
import argparse
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, List, Dict, Any

import yaml
# Directories
# Default points at the enriched NDE data tree on the original author's
# machine; set GLAM_ENTRIES_DIR to run against a different checkout.
ENTRIES_DIR = Path(os.environ.get(
    'GLAM_ENTRIES_DIR',
    '/Users/kempersc/apps/glam/data/nde/enriched/entries',
))
# Priority order for extraction methods (higher = better).
# Used by select_best_org_name() to rank competing org_name claims;
# methods not listed here fall back to priority 0.
EXTRACTION_METHOD_PRIORITY: Dict[str, int] = {
    'og_site_name': 100,    # Most reliable - explicitly the site/org name
    'schema_org_name': 90,  # Structured data from JSON-LD
    'h1_tag': 70,           # Main heading, often institution name
    'title_tag': 60,        # May have tagline attached
}
# Generic/invalid names that should be rejected (compared lowercase,
# whitespace-stripped). Mix of Dutch and English terms: page labels,
# social-media link text, and navigation/section names.
INVALID_ORG_NAMES = {
    'home', 'home-nl', 'welkom', 'welcome', 'startpagina', 'homepage',
    'default', 'default icon', 'untitled', 'index', 'main',
    'facebook', 'instagram', 'linkedin', 'twitter', 'youtube',
    'externe-link-icoon', 'verplicht', 'website',
    # Navigation/page section names
    'adresgegevens', 'contact', 'contactgegevens', 'over ons', 'about us',
    'nieuws', 'news', 'nieuwsberichten', 'agenda', 'kalender',
    'activiteiten', 'events', 'evenementen', 'programma',
    'nieuwe berichten', 'actueel', 'contact extranet', 'jaarprogramma',
    'archief', 'archieven', 'publicaties', 'documenten',
    'informatiepunt', 'informatie', 'bezoek', 'collectie', 'collecties',
}
def is_valid_org_name(name: str) -> bool:
    """Return True when *name* plausibly names a real organization.

    Rejects empty/too-short strings, known generic website terms,
    navigation labels, social-media link text, and strings that open
    with greeting/action phrases or publication titles.
    """
    if not name:
        return False
    normalized = name.strip().lower()
    # Known-bad generic names (navigation, social media, page labels).
    if normalized in INVALID_ORG_NAMES:
        return False
    # Anything under three characters is almost certainly button text.
    if len(normalized) < 3:
        return False
    # "Website X" / "Startpagina X" are page labels, not org names.
    if normalized.startswith(('website ', 'startpagina ')):
        return False
    # Greeting phrases, calls to action, and publication titles.
    generic_starts = (
        'wil jij', 'click', 'klik', 'meer info', 'lees meer',
        'welkom bij', 'welkom in', 'welkom op', 'welkom',  # Welcome messages
        'onderzoeksagenda', 'jaarverslag', 'nieuwsbrief',  # Publication titles
    )
    return not normalized.startswith(generic_starts)
def has_tagline(name: str) -> bool:
    """Detect whether *name* appears to carry an appended tagline.

    Indicators checked, in order: a spaced pipe separator, a comma
    followed by a Dutch article, or a spaced hyphen whose right-hand
    segment reads like a slogan (starts with an article/possessive, or
    is much longer than the left-hand segment).
    """
    if not name:
        return False
    # " | " is an explicit, intentional separator.
    if ' | ' in name:
        return True
    lowered = name.lower()
    # Comma + article usually introduces a descriptive clause.
    if any(marker in lowered for marker in (', de ', ', het ', ', een ')):
        return True
    if ' - ' in name:
        segments = name.split(' - ')
        if len(segments) >= 2:
            after_dash = segments[1].strip().lower()
            # Article/possessive opener => descriptive tagline.
            if after_dash.startswith(('de ', 'het ', 'een ', 'jouw ', 'your ', 'the ')):
                return True
            # Disproportionately long second segment => likely tagline.
            if len(segments[1]) > len(segments[0]) * 1.5:
                return True
    return False
def extract_name_without_tagline(name: str) -> str:
    """Return the main organization name with any tagline stripped.

    Separators are tried in order of explicitness: " | ", then a comma
    followed by an article/possessive, then a spaced hyphen (only when
    the right-hand part looks like a tagline), then spaced en/em dashes.
    Returns *name* unchanged when no separator matches, "" for falsy input.
    """
    if not name:
        return ""
    # Try pipe separator first (most explicit)
    if ' | ' in name:
        return name.split(' | ')[0].strip()
    # Try comma with article (e.g., "Museum, de beste plek...")
    lower = name.lower()
    for pattern in [', de ', ', het ', ', een ', ', jouw ', ', your ', ', the ']:
        if pattern in lower:
            idx = lower.find(pattern)
            return name[:idx].strip()
    # Try dash with spaces
    if ' - ' in name:
        parts = name.split(' - ')
        second_lower = parts[1].strip().lower() if len(parts) > 1 else ''
        # Only strip if second part looks like tagline
        if second_lower.startswith(('de ', 'het ', 'een ', 'jouw ', 'your ', 'the ')):
            return parts[0].strip()
        if len(parts) > 1 and len(parts[1]) > len(parts[0]) * 1.5:
            return parts[0].strip()
    # Try other dash variants: spaced en dash / em dash.
    # BUGFIX: the separators here had been corrupted to [' ', ''] — an
    # empty separator makes str.split() raise ValueError. Restored to the
    # en/em dashes also seen in clean_org_name's strip set.
    for sep in (' \u2013 ', ' \u2014 '):
        if sep in name:
            return name.split(sep)[0].strip()
    return name
def select_best_org_name(claims: List[Dict]) -> Optional[Dict]:
    """Pick the most trustworthy valid org_name claim, or None.

    Only claims with claim_type == 'org_name' whose value passes
    is_valid_org_name() are considered. Candidates are ranked by
    extraction-method priority first, xpath_match_score second.
    """
    candidates = [
        c for c in claims
        if c.get('claim_type') == 'org_name'
        and is_valid_org_name(c.get('claim_value', ''))
    ]
    if not candidates:
        return None
    # max() with a tuple key is equivalent to the original stable
    # reverse sort followed by taking the first element.
    return max(
        candidates,
        key=lambda c: (
            EXTRACTION_METHOD_PRIORITY.get(c.get('extraction_method', ''), 0),
            c.get('xpath_match_score', 0),
        ),
    )
def clean_org_name(name: str) -> str:
    """Normalize an organization name for use as CustodianName.

    Collapses internal whitespace to single spaces and strips leading
    and trailing separator punctuation (spaces, hyphen/en/em dashes,
    pipes, colons, periods). Returns "" for falsy input.
    """
    if not name:
        return ""
    collapsed = ' '.join(name.split())
    return collapsed.strip(' -–—|:.')
def extract_entry_number(filename: str) -> str:
    """Return the leading digits of *filename*, or its name sans '.yaml'."""
    leading_digits = re.match(r'\d+', filename)
    return leading_digits[0] if leading_digits else filename.replace('.yaml', '')
def process_entry(filepath: Path, dry_run: bool = False) -> tuple[bool, Optional[str], str]:
    """
    Process a single entry file to derive CustodianName.

    Tries three sources in decreasing order of quality:
      1. web_claims — verified claims with full XPath provenance
      2. wikidata_enrichment.wikidata_label_nl — authoritative fallback
      3. original_entry.organisatie — CSV source of last resort
    The winning value is cleaned, wrapped in a provenance dict, and
    (unless dry_run) written back into the YAML file under the
    'custodian_name' key.

    Args:
        filepath: Path to the entry YAML file.
        dry_run: When True, compute the result but do not write the file.

    Returns: (success, custodian_name, source_description)
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)
    if not data:
        return False, None, "Empty file"
    custodian_name = None
    source_desc = ""
    # Try 1: web_claims (highest quality if valid)
    web_claims = data.get('web_claims', {})
    claims = web_claims.get('claims', [])
    if claims:
        best_claim = select_best_org_name(claims)
        if best_claim:
            claim_value = best_claim['claim_value']
            # Handle taglines - strip them for cleaner name
            if has_tagline(claim_value):
                claim_value = extract_name_without_tagline(claim_value)
            # Full provenance record: where the claim came from, how it
            # was extracted, and why it won the selection.
            custodian_name = {
                'claim_type': 'custodian_name',
                'claim_value': clean_org_name(claim_value),
                'raw_value': best_claim.get('claim_value'),  # Preserve original
                'source_url': best_claim.get('source_url', ''),
                'retrieved_on': best_claim.get('retrieved_on', ''),
                'xpath': best_claim.get('xpath', ''),
                'html_file': best_claim.get('html_file', ''),
                'xpath_match_score': best_claim.get('xpath_match_score', 1.0),
                'extraction_method': best_claim.get('extraction_method', ''),
                'selection_method': 'priority_ranking',
                'selection_priority': EXTRACTION_METHOD_PRIORITY.get(best_claim.get('extraction_method', ''), 0),
                'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            }
            source_desc = f"web:{best_claim.get('extraction_method', 'unknown')}"
    # Try 2: wikidata_label_nl (authoritative fallback)
    if not custodian_name:
        wikidata = data.get('wikidata_enrichment', {})
        name = wikidata.get('wikidata_label_nl')
        # Wikidata labels still pass through the validity filter so
        # generic values cannot slip in via this path.
        if name and is_valid_org_name(name):
            custodian_name = {
                'claim_type': 'custodian_name',
                'claim_value': clean_org_name(name),
                'source': 'wikidata',
                'wikidata_id': wikidata.get('wikidata_id', ''),
                'provenance_note': 'Derived from wikidata_label_nl (web_claims had no valid org_name)',
                'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            }
            source_desc = "wikidata"
    # Try 3: original_entry.organisatie (CSV source fallback)
    if not custodian_name:
        original = data.get('original_entry', {})
        name = original.get('organisatie')
        if name and is_valid_org_name(name):
            custodian_name = {
                'claim_type': 'custodian_name',
                'claim_value': clean_org_name(name),
                'source': 'original_entry',
                'provenance_note': 'Derived from original_entry.organisatie (no valid web_claims or wikidata)',
                'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            }
            source_desc = "original_entry"
    if not custodian_name:
        return False, None, "No valid org_name from any source"
    if not dry_run:
        # Write the whole document back; key order is preserved
        # (sort_keys=False) so existing sections keep their layout.
        data['custodian_name'] = custodian_name
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    return True, custodian_name.get('claim_value'), source_desc
def main() -> int:
    """CLI entry point: derive custodian_name for entry YAML files.

    Selects files from ENTRIES_DIR (optionally narrowed by --entry or
    --limit), skips entries that already have a custodian_name unless
    --force is given, processes each via process_entry(), and prints a
    per-source summary. Returns 0 as the process exit code.
    """
    parser = argparse.ArgumentParser(description='Derive CustodianName from verified web_claims')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without writing')
    parser.add_argument('--force', action='store_true', help='Re-derive even if custodian_name exists')
    args = parser.parse_args()
    # Find entry files. NOTE(review): '--entry 12' also matches files
    # starting with '120', '121', ... — confirm entry numbers are
    # zero-padded or otherwise unambiguous.
    if args.entry:
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.is_file() and not f.name.startswith('.')])
    if args.limit:
        files = files[:args.limit]
    # Track statistics by source (keys match process_entry's source_desc)
    stats = {
        'web:og_site_name': 0,
        'web:schema_org_name': 0,
        'web:h1_tag': 0,
        'web:title_tag': 0,
        'wikidata': 0,
        'original_entry': 0,
        'skipped': 0,
        'failed': 0,
    }
    print(f"Processing {len(files)} entries...")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print()
    for filepath in files:
        if filepath.is_dir():
            continue
        # Skip if already has custodian_name (unless --force). This
        # pre-check re-reads the file; process_entry reads it again.
        if not args.force:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if data and data.get('custodian_name', {}).get('claim_value'):
                stats['skipped'] += 1
                continue
        success, name, source = process_entry(filepath, dry_run=args.dry_run)
        if success:
            stats[source] = stats.get(source, 0) + 1
            print(f"{filepath.name}: {name} [{source}]")
        else:
            stats['failed'] += 1
            print(f"{filepath.name}: {source}")
    print()
    print(f"{'DRY RUN - ' if args.dry_run else ''}Summary:")
    print(f" From web og:site_name: {stats.get('web:og_site_name', 0)}")
    print(f" From web schema.org: {stats.get('web:schema_org_name', 0)}")
    print(f" From web h1 tag: {stats.get('web:h1_tag', 0)}")
    print(f" From web title tag: {stats.get('web:title_tag', 0)}")
    print(f" From Wikidata: {stats.get('wikidata', 0)}")
    print(f" From original entry: {stats.get('original_entry', 0)}")
    print(f" Skipped (already have name): {stats.get('skipped', 0)}")
    print(f" Failed (no sources): {stats.get('failed', 0)}")
    # Everything except skipped/failed counts as a successful derivation.
    total_derived = sum(v for k, v in stats.items() if k not in ('skipped', 'failed'))
    print(f"\n TOTAL DERIVED: {total_derived}")
    return 0
# Standard script entry guard: propagate main()'s return as exit code.
if __name__ == '__main__':
    sys.exit(main())