377 lines
13 KiB
Python
377 lines
13 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Derive CustodianName from verified web_claims with XPath provenance.
|
||
|
||
This script selects the best org_name claim from the web_claims section
|
||
and stores it as custodian_name, following the emic name protocol.
|
||
|
||
Priority order for org_name selection:
|
||
1. og:site_name meta tag (usually clean organization name)
|
||
2. schema.org Organization name (structured data)
|
||
3. h1 tag (main heading, often institution name)
|
||
4. title tag (may have tagline/separator)
|
||
|
||
The selected name becomes the official CustodianName used for GHCID generation.
|
||
|
||
Usage:
|
||
python scripts/derive_custodian_name.py [--limit N] [--entry ENTRY_NUM] [--dry-run]
|
||
"""
|
||
|
||
import argparse
|
||
import re
|
||
import sys
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Optional, List, Dict, Any
|
||
|
||
import yaml
|
||
|
||
|
||
# Directories
# Absolute path to the enriched NDE entry YAML files this script rewrites in place.
ENTRIES_DIR = Path('/Users/kempersc/apps/glam/data/nde/enriched/entries')


# Priority order for extraction methods (higher = better)
# Used both to select the best org_name claim and to record the
# 'selection_priority' provenance field on the derived custodian_name.
EXTRACTION_METHOD_PRIORITY: Dict[str, int] = {
    'og_site_name': 100,    # Most reliable - explicitly the site/org name
    'schema_org_name': 90,  # Structured data from JSON-LD
    'h1_tag': 70,           # Main heading, often institution name
    'title_tag': 60,        # May have tagline attached
}
|
||
|
||
|
||
# Names that can never be a real organization: generic page titles,
# navigation labels, social-media link text, and similar boilerplate
# frequently scraped from Dutch GLAM-institution websites.
INVALID_ORG_NAMES = {
    'home', 'home-nl', 'welkom', 'welcome', 'startpagina', 'homepage',
    'default', 'default icon', 'untitled', 'index', 'main',
    'facebook', 'instagram', 'linkedin', 'twitter', 'youtube',
    'externe-link-icoon', 'verplicht', 'website',
    # Navigation/page section names
    'adresgegevens', 'contact', 'contactgegevens', 'over ons', 'about us',
    'nieuws', 'news', 'nieuwsberichten', 'agenda', 'kalender',
    'activiteiten', 'events', 'evenementen', 'programma',
    'nieuwe berichten', 'actueel', 'contact extranet', 'jaarprogramma',
    'archief', 'archieven', 'publicaties', 'documenten',
    'informatiepunt', 'informatie', 'bezoek', 'collectie', 'collecties',
}


def is_valid_org_name(name: str) -> bool:
    """Return True if *name* plausibly names a real organization.

    Rejects empty values, known generic/navigation terms, very short
    strings, page labels ("Website X" / "Startpagina X"), and names
    beginning with greeting phrases, calls to action, or publication
    titles.
    """
    if not name:
        return False

    candidate = name.strip().lower()

    # Exact match against the deny-list of known-bad values.
    if candidate in INVALID_ORG_NAMES:
        return False

    # Very short strings are almost always button/navigation text.
    if len(candidate) < 3:
        return False

    # Page labels of the form "Website X" / "Startpagina X".
    if candidate.startswith(('website ', 'startpagina ')):
        return False

    # Greeting phrases, calls to action, and publication titles.
    generic_prefixes = (
        'wil jij', 'click', 'klik', 'meer info', 'lees meer',
        'welkom bij', 'welkom in', 'welkom op', 'welkom',
        'onderzoeksagenda', 'jaarverslag', 'nieuwsbrief',
    )
    return not candidate.startswith(generic_prefixes)
|
||
|
||
|
||
def has_tagline(name: str) -> bool:
    """Return True when *name* appears to carry an appended tagline.

    Recognized indicators: a spaced pipe separator, a comma followed by
    a Dutch article, or a spaced dash whose second half either starts
    with an article/possessive or is much longer than the first half.
    """
    if not name:
        return False

    # A spaced pipe is always an intentional title/tagline separator.
    if ' | ' in name:
        return True

    # A comma followed by a Dutch article usually introduces a tagline
    # (e.g. "Museum, de mooiste plek ...").
    lowered = name.lower()
    if any(marker in lowered for marker in (', de ', ', het ', ', een ')):
        return True

    # A spaced dash counts only when the tail reads like a slogan.
    if ' - ' in name:
        pieces = name.split(' - ')
        tail = pieces[1]
        # Tail starting with an article/possessive is likely a tagline.
        if tail.strip().lower().startswith(
            ('de ', 'het ', 'een ', 'jouw ', 'your ', 'the ')
        ):
            return True
        # A tail much longer than the head is likely a tagline too.
        if len(tail) > 1.5 * len(pieces[0]):
            return True

    return False
|
||
|
||
|
||
def extract_name_without_tagline(name: str) -> str:
    """Strip any detected tagline and return the bare organization name.

    Separator handling, in order: spaced pipe, comma followed by an
    article/possessive, spaced plain dash (only when the tail looks
    like a tagline), then spaced en/em dashes. Returns the input
    unchanged when no separator matches.
    """
    if not name:
        return ""

    # A spaced pipe is the most explicit separator: keep the head.
    if ' | ' in name:
        head, _, _ = name.partition(' | ')
        return head.strip()

    # Comma followed by an article: cut at the comma.
    lowered = name.lower()
    for marker in (', de ', ', het ', ', een ', ', jouw ', ', your ', ', the '):
        pos = lowered.find(marker)
        if pos != -1:
            return name[:pos].strip()

    # Spaced plain dash: cut only when the tail reads like a tagline.
    if ' - ' in name:
        pieces = name.split(' - ')
        tail = pieces[1] if len(pieces) > 1 else ''
        if tail.strip().lower().startswith(
            ('de ', 'het ', 'een ', 'jouw ', 'your ', 'the ')
        ):
            return pieces[0].strip()
        if len(pieces) > 1 and len(tail) > 1.5 * len(pieces[0]):
            return pieces[0].strip()

    # Spaced en/em dashes are always treated as tagline separators.
    for dash in (' – ', ' — '):
        if dash in name:
            return name.split(dash)[0].strip()

    return name
|
||
|
||
|
||
def select_best_org_name(claims: List[Dict]) -> Optional[Dict]:
    """Pick the best valid org_name claim, or None if none qualifies.

    Filters to claims of type 'org_name' whose value passes
    is_valid_org_name(), then ranks by extraction-method priority
    (EXTRACTION_METHOD_PRIORITY) with xpath_match_score as tiebreaker,
    returning the highest-ranked claim.
    """
    candidates = [
        claim for claim in claims
        if claim.get('claim_type') == 'org_name'
        and is_valid_org_name(claim.get('claim_value', ''))
    ]
    if not candidates:
        return None

    def rank(claim):
        # (method priority, xpath score): unknown methods rank lowest.
        return (
            EXTRACTION_METHOD_PRIORITY.get(claim.get('extraction_method', ''), 0),
            claim.get('xpath_match_score', 0),
        )

    # max() returns the first maximal element, matching the original
    # stable sort-descending-then-take-first behavior.
    return max(candidates, key=rank)
|
||
|
||
|
||
def clean_org_name(name: str) -> str:
    """Normalize a raw organization name for use as a CustodianName.

    Collapses internal whitespace runs to single spaces and strips
    leading/trailing separator punctuation (spaces, dashes, pipes,
    colons, periods). Returns "" for empty input.
    """
    if not name:
        return ""

    # split()/join collapses tabs, newlines, and repeated spaces.
    collapsed = ' '.join(name.split())
    return collapsed.strip(' -–—|:.')
|
||
|
||
|
||
def extract_entry_number(filename: str) -> str:
    """Return the leading digit run of *filename*.

    Falls back to the filename with any '.yaml' substring removed when
    it does not start with digits.
    """
    digits = re.match(r'^(\d+)', filename)
    if digits:
        return digits.group(1)
    return filename.replace('.yaml', '')
|
||
|
||
|
||
def process_entry(filepath: Path, dry_run: bool = False) -> tuple[bool, Optional[str], str]:
    """
    Process a single entry file to derive CustodianName.

    Tries three sources in descending order of quality:
      1. web_claims - best valid org_name claim (tagline stripped),
         with full XPath provenance copied onto the derived record
      2. wikidata_enrichment.wikidata_label_nl - authoritative fallback
      3. original_entry.organisatie - CSV source of last resort

    Unless dry_run is set, the derived 'custodian_name' mapping is
    written back into the YAML file in place.

    Returns: (success, custodian_name, source_description)
    """
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    if not data:
        return False, None, "Empty file"

    custodian_name = None
    source_desc = ""

    # Try 1: web_claims (highest quality if valid)
    web_claims = data.get('web_claims', {})
    claims = web_claims.get('claims', [])

    if claims:
        best_claim = select_best_org_name(claims)

        if best_claim:
            # Safe to index: select_best_org_name only returns claims
            # with a non-empty 'claim_value'.
            claim_value = best_claim['claim_value']

            # Handle taglines - strip them for cleaner name
            if has_tagline(claim_value):
                claim_value = extract_name_without_tagline(claim_value)

            # Build the derived record, preserving the claim's XPath
            # provenance alongside the cleaned value.
            custodian_name = {
                'claim_type': 'custodian_name',
                'claim_value': clean_org_name(claim_value),
                'raw_value': best_claim.get('claim_value'),  # Preserve original
                'source_url': best_claim.get('source_url', ''),
                'retrieved_on': best_claim.get('retrieved_on', ''),
                'xpath': best_claim.get('xpath', ''),
                'html_file': best_claim.get('html_file', ''),
                'xpath_match_score': best_claim.get('xpath_match_score', 1.0),
                'extraction_method': best_claim.get('extraction_method', ''),
                'selection_method': 'priority_ranking',
                'selection_priority': EXTRACTION_METHOD_PRIORITY.get(best_claim.get('extraction_method', ''), 0),
                'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            }
            source_desc = f"web:{best_claim.get('extraction_method', 'unknown')}"

    # Try 2: wikidata_label_nl (authoritative fallback)
    if not custodian_name:
        wikidata = data.get('wikidata_enrichment', {})
        name = wikidata.get('wikidata_label_nl')

        if name and is_valid_org_name(name):
            custodian_name = {
                'claim_type': 'custodian_name',
                'claim_value': clean_org_name(name),
                'source': 'wikidata',
                'wikidata_id': wikidata.get('wikidata_id', ''),
                'provenance_note': 'Derived from wikidata_label_nl (web_claims had no valid org_name)',
                'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            }
            source_desc = "wikidata"

    # Try 3: original_entry.organisatie (CSV source fallback)
    if not custodian_name:
        original = data.get('original_entry', {})
        name = original.get('organisatie')

        if name and is_valid_org_name(name):
            custodian_name = {
                'claim_type': 'custodian_name',
                'claim_value': clean_org_name(name),
                'source': 'original_entry',
                'provenance_note': 'Derived from original_entry.organisatie (no valid web_claims or wikidata)',
                'extraction_timestamp': datetime.now(timezone.utc).isoformat(),
            }
            source_desc = "original_entry"

    if not custodian_name:
        return False, None, "No valid org_name from any source"

    if not dry_run:
        data['custodian_name'] = custodian_name

        # sort_keys=False preserves the document's existing key order.
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return True, custodian_name.get('claim_value'), source_desc
|
||
|
||
|
||
def main():
    """CLI entry point: derive custodian_name for entry YAML files.

    Selects files from ENTRIES_DIR (optionally filtered by --entry or
    capped by --limit), skips entries that already have a
    custodian_name unless --force is given, runs process_entry() on
    each, and prints a per-source summary. Returns 0 unconditionally.
    """
    parser = argparse.ArgumentParser(description='Derive CustodianName from verified web_claims')
    parser.add_argument('--limit', type=int, default=None, help='Limit number of entries')
    parser.add_argument('--entry', type=str, default=None, help='Process specific entry number')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without writing')
    parser.add_argument('--force', action='store_true', help='Re-derive even if custodian_name exists')
    args = parser.parse_args()

    # Find entry files
    if args.entry:
        # Prefix glob: '--entry 12' also matches '123*.yaml'.
        files = list(ENTRIES_DIR.glob(f'{args.entry}*.yaml'))
    else:
        files = sorted([f for f in ENTRIES_DIR.glob('*.yaml') if f.is_file() and not f.name.startswith('.')])

    # NOTE(review): truthiness check means '--limit 0' is treated as
    # "no limit" rather than "process nothing" — confirm intended.
    if args.limit:
        files = files[:args.limit]

    # Track statistics by source
    stats = {
        'web:og_site_name': 0,
        'web:schema_org_name': 0,
        'web:h1_tag': 0,
        'web:title_tag': 0,
        'wikidata': 0,
        'original_entry': 0,
        'skipped': 0,
        'failed': 0,
    }

    print(f"Processing {len(files)} entries...")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'LIVE'}")
    print()

    for filepath in files:
        if filepath.is_dir():
            continue

        # Skip if already has custodian_name (unless --force)
        # NOTE(review): this pre-check loads the YAML a first time;
        # process_entry() loads it again — acceptable for a batch
        # script, but each entry is parsed twice.
        if not args.force:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
            if data and data.get('custodian_name', {}).get('claim_value'):
                stats['skipped'] += 1
                continue

        success, name, source = process_entry(filepath, dry_run=args.dry_run)

        if success:
            # Sources outside the pre-seeded keys are counted too.
            stats[source] = stats.get(source, 0) + 1
            print(f" ✓ {filepath.name}: {name} [{source}]")
        else:
            stats['failed'] += 1
            print(f" ✗ {filepath.name}: {source}")

    print()
    print(f"{'DRY RUN - ' if args.dry_run else ''}Summary:")
    print(f" From web og:site_name: {stats.get('web:og_site_name', 0)}")
    print(f" From web schema.org: {stats.get('web:schema_org_name', 0)}")
    print(f" From web h1 tag: {stats.get('web:h1_tag', 0)}")
    print(f" From web title tag: {stats.get('web:title_tag', 0)}")
    print(f" From Wikidata: {stats.get('wikidata', 0)}")
    print(f" From original entry: {stats.get('original_entry', 0)}")
    print(f" Skipped (already have name): {stats.get('skipped', 0)}")
    print(f" Failed (no sources): {stats.get('failed', 0)}")

    # Everything except skips and failures counts as a derivation.
    total_derived = sum(v for k, v in stats.items() if k not in ('skipped', 'failed'))
    print(f"\n TOTAL DERIVED: {total_derived}")

    return 0


if __name__ == '__main__':
    sys.exit(main())
|