feat(scripts): Add batch crawling and data quality scripts

- batch_crawl4ai_recrawl.py: Retry failed URL crawls
- batch_firecrawl_recrawl.py: FireCrawl batch processing
- batch_httpx_scrape.py: HTTPX-based scraping
- detect_name_mismatch.py: Find name mismatches in data
- enrich_dutch_custodians_crawl4ai.py: Dutch custodian enrichment
- fix_collision_victims.py: GHCID collision resolution
- fix_generic_platform_names*.py: Platform name cleanup
- fix_ghcid_type.py: GHCID type corrections
- fix_simon_kemper_contamination.py: Data cleanup
- scan_dutch_data_quality.py: Data quality scanning
- transform_crawl4ai_to_digital_platform.py: Data transformation
This commit is contained in:
kempersc 2025-12-15 01:47:46 +01:00
parent 70c30a52d4
commit 0c36429257
15 changed files with 4881 additions and 11 deletions

View file

@ -0,0 +1,371 @@
#!/usr/bin/env python3
"""
Batch re-crawl failed URLs using crawl4ai (free, local) and transform to digital_platform_v2.
This script:
1. Reads the list of failed crawl URLs
2. Uses crawl4ai to fetch content (free, no API limits)
3. Transforms results to digital_platform_v2 format
4. Updates the custodian YAML files
Usage:
python scripts/batch_crawl4ai_recrawl.py --limit 100 --start 0
"""
import argparse
import asyncio
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import yaml
from crawl4ai import AsyncWebCrawler
# Configuration
# NOTE(review): both paths are absolute and machine-specific; the script only
# runs unchanged on a machine that has this exact directory layout.
# Directory holding the custodian YAML files that this script updates.
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
# Text file with one "<filename><TAB><url>" entry per line (see load_failed_urls).
FAILED_URLS_FILE = Path("/Users/kempersc/apps/glam/data/failed_crawl_urls.txt")
# Platform type detection patterns
PLATFORM_PATTERNS = {
'DISCOVERY_PORTAL': [
r'/collectie', r'/collection', r'/catalogus', r'/catalog',
r'/zoeken', r'/search', r'/archief', r'/archive',
r'/beeldbank', r'/images', r'/foto', r'/photo',
],
'DIGITAL_ARCHIVE': [
r'archieven\.nl', r'archief', r'archive',
r'/inventaris', r'/inventory', r'/toegang',
],
'EDUCATION': [
r'/educatie', r'/education', r'/onderwijs', r'/leren',
r'/scholen', r'/schools', r'/lesmateriaal',
],
'INSTITUTIONAL_WEBSITE': [
r'/over-ons', r'/about', r'/contact', r'/bezoek',
r'/visit', r'/openingstijden', r'/hours',
],
}
def detect_platform_type(url: str, links: list[str] | None = None) -> str:
"""Detect the platform type based on URL patterns and extracted links."""
url_lower = url.lower()
all_urls = [url_lower] + [l.lower() for l in (links or [])]
for platform_type, patterns in PLATFORM_PATTERNS.items():
for pattern in patterns:
for check_url in all_urls:
if re.search(pattern, check_url):
return platform_type
return 'INSTITUTIONAL_WEBSITE'
def extract_collection_urls(links: list[str], base_url: str) -> list[str]:
    """Return up to 10 same-site links that look like collection/catalog pages.

    A link is kept when its lower-cased URL matches one of the collection
    patterns and it belongs to the same site as ``base_url``: the host is
    identical (a leading ``www.`` is ignored), a subdomain of the base host,
    a parent domain of it, or the link is relative (no host at all).

    Hosts are compared on label boundaries rather than by substring, so
    ``notexample.nl`` is no longer treated as part of ``example.nl``.
    Links that cannot be parsed are skipped. Order of first appearance is
    preserved and duplicates are dropped.
    """
    collection_patterns = [
        r'/collectie', r'/collection', r'/catalogus', r'/catalog',
        r'/zoeken', r'/search', r'/beeldbank', r'/inventaris',
        r'/archief(?!en\.)', r'/archiefstukken', r'/toegangen',
    ]
    # One alternation is equivalent to trying each pattern in turn.
    matcher = re.compile('|'.join(collection_patterns))
    max_results = 10

    def host_of(target: str) -> str:
        return (urlparse(target).hostname or '').removeprefix('www.')

    base_host = host_of(base_url)
    collection_urls: list[str] = []
    seen: set[str] = set()
    for link in links:
        try:
            host = host_of(link)
            looks_like_collection = matcher.search(link.lower()) is not None
        except (ValueError, TypeError, AttributeError):
            # Malformed URL (e.g. broken IPv6 literal) or a non-string entry.
            continue
        same_site = (
            not host  # relative link: same site by definition
            or host == base_host
            or host.endswith('.' + base_host)
            or base_host.endswith('.' + host)
        )
        if not (same_site and looks_like_collection) or link in seen:
            continue
        seen.add(link)
        collection_urls.append(link)
        if len(collection_urls) == max_results:
            break
    return collection_urls
def extract_auxiliary_platforms(links: list[str], base_url: str) -> list[dict]:
    """Return up to 5 links that point at known external heritage platforms.

    Each link's host (leading ``www.`` ignored) is compared with a fixed
    table of known platform domains. A link matches when its host is that
    domain or a subdomain of it; plain substring matches such as
    ``fakearchieven.nl`` are rejected. Links on the same site as
    ``base_url``, relative links and unparsable links are skipped, and each
    distinct host is reported once (first link wins).

    Returns a list of dicts with the keys ``platform_name``,
    ``platform_url``, ``platform_type`` and ``integration_type``.
    """
    external_patterns = {
        'archieven.nl': {'name': 'Archieven.nl', 'type': 'AGGREGATOR'},
        'europeana.eu': {'name': 'Europeana', 'type': 'AGGREGATOR'},
        'collectienederland.nl': {'name': 'Collectie Nederland', 'type': 'AGGREGATOR'},
        'erfgoedthesaurus.nl': {'name': 'Erfgoedthesaurus', 'type': 'THESAURUS'},
        'delpher.nl': {'name': 'Delpher', 'type': 'DIGITAL_ARCHIVE'},
        'geheugen.nl': {'name': 'Geheugen van Nederland', 'type': 'AGGREGATOR'},
    }
    max_results = 5

    def host_of(target: str) -> str:
        return (urlparse(target).hostname or '').removeprefix('www.')

    base_host = host_of(base_url)
    auxiliary = []
    seen_domains = set()
    for link in links:
        try:
            domain = host_of(link)
        except (ValueError, TypeError, AttributeError):
            # Malformed URL or non-string entry.
            continue
        if not domain or domain in seen_domains:
            continue
        same_site = (
            domain == base_host
            or domain.endswith('.' + base_host)
            or base_host.endswith('.' + domain)
        )
        if same_site:
            continue
        for known_domain, info in external_patterns.items():
            if domain == known_domain or domain.endswith('.' + known_domain):
                seen_domains.add(domain)
                auxiliary.append({
                    'platform_name': info['name'],
                    'platform_url': link,
                    'platform_type': info['type'],
                    'integration_type': 'external_aggregator',
                })
                break
        if len(auxiliary) == max_results:
            break
    return auxiliary
def is_generic_title(title: str) -> bool:
    """Return True when ``title`` is too generic to serve as a platform name.

    A title is generic when it is empty/None, shorter than 3 characters
    after stripping whitespace, exactly one of the generic words, or one of
    those words followed by `` -`` or `` |`` (e.g. "Home - Museum X").
    Comparison is case-insensitive.
    """
    generic_patterns = [
        'home', 'homepage', 'welkom', 'welcome', 'startpagina',
        'index', 'main', 'website', 'webpagina', 'homepagina',
    ]
    normalized = title.strip().lower() if title else ''
    # Measure the stripped text so whitespace-only titles are rejected too.
    if len(normalized) < 3:
        return True
    return any(
        normalized == word or normalized.startswith((f"{word} -", f"{word} |"))
        for word in generic_patterns
    )
def transform_to_platform_v2(crawl_result, source_url: str, org_name: str) -> dict[str, Any]:
    """Build a ``digital_platform_v2`` dict from a crawl result.

    Args:
        crawl_result: Object exposing ``metadata`` (dict or None), ``links``
            (dict with 'internal'/'external' lists of dicts carrying 'href',
            or None) and ``status_code``. Presumably a crawl4ai CrawlResult
            - verify against the caller.
        source_url: URL that was crawled; used for the platform id, type
            detection and same-site decisions.
        org_name: Fallback name when the page offers no usable title.

    Returns:
        Dict with 'transformation_metadata' and 'primary_platform', plus
        'auxiliary_platforms' and 'navigation_links' when any were found.
    """
    metadata = crawl_result.metadata or {}
    link_groups = crawl_result.links or {}

    def hrefs(group: str) -> list[str]:
        return [entry.get('href', '') for entry in link_groups.get(group, []) if entry.get('href')]

    internal_links = hrefs('internal')
    external_links = hrefs('external')

    # The title key may be present with a None value; guard before splitting.
    page_title = metadata.get('title') or ''
    candidate_titles = [
        metadata.get('og:title'),
        page_title.split(' - ')[0].strip(),
        page_title.split(' | ')[0].strip(),
        metadata.get('og:site_name'),
    ]
    title = org_name  # Default fallback
    for candidate in candidate_titles:
        if candidate and not is_generic_title(candidate):
            title = candidate
            break
    source_netloc = urlparse(source_url).netloc
    if not title:
        # No usable page title and no organisation name: fall back to the host.
        title = source_netloc

    # Generate platform ID
    domain = source_netloc.replace('www.', '').replace('.', '_')
    platform_id = f"primary_website_{domain}"

    platform_type = detect_platform_type(source_url, internal_links)
    collection_urls = extract_collection_urls(internal_links, source_url)
    # Aggregators live on other domains, so the external links must be
    # searched; internal links are kept as a secondary source.
    auxiliary_platforms = extract_auxiliary_platforms(external_links + internal_links, source_url)

    platform_v2: dict[str, Any] = {
        'transformation_metadata': {
            'transformed_from': 'crawl4ai_recrawl',
            'transformation_date': datetime.now(timezone.utc).isoformat(),
            'transformation_version': '2.0',
            'source_status_code': crawl_result.status_code,
        },
        'primary_platform': {
            'platform_id': platform_id,
            'platform_name': f"{title} Website" if 'website' not in title.lower() else title,
            'platform_url': source_url,
            'platform_type': platform_type,
            'description': metadata.get('description') or metadata.get('og:description') or '',
            'language': metadata.get('language', 'nl'),
            'og_image': metadata.get('og:image'),
            'favicon': metadata.get('favicon'),
        },
    }
    if collection_urls:
        platform_v2['primary_platform']['collection_urls'] = collection_urls
    if auxiliary_platforms:
        platform_v2['auxiliary_platforms'] = auxiliary_platforms
    if internal_links:
        platform_v2['navigation_links'] = internal_links[:20]
    return platform_v2
def update_custodian_file(filepath: Path, platform_v2: dict) -> bool:
    """Write ``platform_v2`` into a custodian YAML file.

    Loads the YAML document, sets its ``digital_platform_v2`` key and, when a
    ``crawl4ai_enrichment`` mapping is present, stamps it with the recrawl
    tool and date. The document is serialised to a string before the file is
    opened for writing, so a serialisation error cannot leave a truncated
    file behind. Files are read and written as UTF-8.

    Returns:
        True on success, False (after printing the error) on any failure.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        if data is None:
            data = {}
        data['digital_platform_v2'] = platform_v2
        enrichment = data.get('crawl4ai_enrichment')
        if isinstance(enrichment, dict):
            enrichment['recrawled_with'] = 'crawl4ai_v2'
            enrichment['recrawl_date'] = datetime.now(timezone.utc).isoformat()
        # Serialise first: opening with 'w' truncates the file immediately.
        serialized = yaml.dump(data, default_flow_style=False, allow_unicode=True, sort_keys=False)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(serialized)
        return True
    except Exception as e:
        # Best-effort batch job: report and let the caller count the failure.
        print(f" Error updating {filepath}: {e}")
        return False
def load_failed_urls() -> list[tuple[str, str]]:
    """Read FAILED_URLS_FILE and return its (filename, url) pairs.

    Each line is stripped; lines without a tab are ignored, the others are
    split at the first tab into filename and URL.
    """
    with open(FAILED_URLS_FILE, 'r') as handle:
        stripped_lines = (raw.strip() for raw in handle)
        return [
            tuple(entry.split('\t', 1))
            for entry in stripped_lines
            if '\t' in entry
        ]
def get_org_name(filepath: Path) -> str:
    """Return the organisation name stored in a custodian YAML file.

    Tries, in order: ``original_entry.organisatie``,
    ``custodian_name.emic_name``, ``custodian_name.preferred_name`` and
    ``name``. The first non-blank string wins; an empty or missing value now
    falls through to the next field instead of being returned. When no field
    yields a name, the last '-'-separated part of the file stem is used.
    If the file cannot be read or parsed, the whole file stem is returned.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        if isinstance(data, dict):
            original_entry = data.get('original_entry')
            custodian_name = data.get('custodian_name')
            candidates = [
                original_entry.get('organisatie') if isinstance(original_entry, dict) else None,
                custodian_name.get('emic_name') if isinstance(custodian_name, dict) else None,
                custodian_name.get('preferred_name') if isinstance(custodian_name, dict) else None,
                data.get('name'),
            ]
            for candidate in candidates:
                if isinstance(candidate, str) and candidate.strip():
                    return candidate
        # Fallback: extract from filename
        return filepath.stem.split('-')[-1]
    except Exception:
        # Deliberately best-effort: an unreadable file must not stop the batch.
        return filepath.stem
async def scrape_single_url(crawler: AsyncWebCrawler, url: str) -> Any:
    """Crawl ``url`` with the given crawler.

    Returns the crawl result when it reports success. On an unsuccessful
    crawl or any exception, a message is printed and None is returned.
    """
    try:
        outcome = await crawler.arun(url, verbose=False)
        if not outcome.success:
            print(f" Crawl failed: {outcome.error_message}")
            return None
        return outcome
    except Exception as exc:
        print(f" Exception: {exc}")
        return None
async def main_async(args):
    """Re-crawl the selected failed URLs and update their custodian files.

    Args:
        args: Parsed CLI namespace with ``start``, ``limit`` (0 = all),
            ``dry_run`` and ``delay`` (seconds slept after each crawl).

    Files that do not exist or already contain a ``digital_platform_v2:``
    section are skipped. Progress is printed per URL, a summary every 50
    processed entries, and final totals at the end.
    """
    all_urls = load_failed_urls()
    print(f"Loaded {len(all_urls)} failed URLs")

    stop = args.start + args.limit if args.limit > 0 else None
    urls_to_process = all_urls[args.start:stop]
    total = len(urls_to_process)
    print(f"Processing {total} URLs (start={args.start}, limit={args.limit or 'all'})")

    if args.dry_run:
        print("\n[DRY RUN MODE - No changes will be made]")
        preview = urls_to_process[:10]
        for filename, url in preview:
            print(f" Would scrape: {filename} -> {url}")
        # Only mention a remainder when there is one (avoids "... and -7 more").
        remaining = total - len(preview)
        if remaining > 0:
            print(f" ... and {remaining} more")
        return

    success_count = 0
    fail_count = 0
    skip_count = 0

    async with AsyncWebCrawler(verbose=False) as crawler:
        for i, (filename, url) in enumerate(urls_to_process):
            filepath = CUSTODIAN_DIR / filename
            print(f"\n[{i+1}/{total}] {filename}")
            print(f" URL: {url}")

            if not filepath.exists():
                print(f" SKIP: File not found")
                skip_count += 1
                continue

            # Check if already has digital_platform_v2. This is a plain
            # substring test, so undecodable bytes are replaced, not fatal.
            content = filepath.read_text(encoding='utf-8', errors='replace')
            if 'digital_platform_v2:' in content:
                print(f" SKIP: Already has digital_platform_v2")
                skip_count += 1
                continue

            org_name = get_org_name(filepath)
            result = await scrape_single_url(crawler, url)
            if result:
                platform_v2 = transform_to_platform_v2(result, url, org_name)
                if update_custodian_file(filepath, platform_v2):
                    success_count += 1
                    print(f" SUCCESS: {platform_v2['primary_platform']['platform_name']}")
                else:
                    fail_count += 1
            else:
                fail_count += 1
                print(f" FAILED: Could not scrape URL")

            # Small delay to be polite
            await asyncio.sleep(args.delay)

            if (i + 1) % 50 == 0:
                print(f"\n=== Progress: {i+1}/{total} (success={success_count}, skip={skip_count}, fail={fail_count}) ===\n")

    print(f"\n=== Final Results ===")
    print(f"Success: {success_count}")
    print(f"Skipped: {skip_count}")
    print(f"Failed: {fail_count}")
    print(f"Total: {total}")
def main():
    """Parse the command line and run the asynchronous batch re-crawl."""
    cli = argparse.ArgumentParser(description='Batch re-crawl failed URLs with crawl4ai')
    option_specs = (
        ('--start', {'type': int, 'default': 0, 'help': 'Starting index'}),
        ('--limit', {'type': int, 'default': 0, 'help': 'Maximum URLs to process (0=all)'}),
        ('--dry-run', {'action': 'store_true', 'help': 'Print what would be done without making changes'}),
        ('--delay', {'type': float, 'default': 0.5, 'help': 'Delay between requests in seconds'}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    parsed = cli.parse_args()
    asyncio.run(main_async(parsed))


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

View file

@ -0,0 +1,434 @@
#!/usr/bin/env python3
"""
Batch re-crawl failed URLs using Firecrawl and transform to digital_platform_v2.
This script:
1. Reads the list of failed crawl URLs
2. Uses Firecrawl batch_scrape or individual scrape to fetch content
3. Transforms results to digital_platform_v2 format
4. Updates the custodian YAML files
Usage:
python scripts/batch_firecrawl_recrawl.py --batch-size 50 --start 0
Firecrawl API reference: https://docs.firecrawl.dev/api-reference/endpoint/scrape
"""
import argparse
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import httpx
import yaml
# Configuration
# NOTE(review): both paths are absolute and machine-specific; the script only
# runs unchanged on a machine that has this exact directory layout.
# Directory holding the custodian YAML files that this script updates.
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
# Text file with one "<filename><TAB><url>" entry per line (see load_failed_urls).
FAILED_URLS_FILE = Path("/Users/kempersc/apps/glam/data/failed_crawl_urls.txt")
# API key read from the environment; empty string when unset (main() exits then).
FIRECRAWL_API_KEY = os.environ.get("FIRECRAWL_API_KEY", "")
# Base URL to which "/scrape" is appended in scrape_single_url.
FIRECRAWL_BASE_URL = "https://api.firecrawl.dev/v1"
# Platform type detection patterns
PLATFORM_PATTERNS = {
'DISCOVERY_PORTAL': [
r'/collectie', r'/collection', r'/catalogus', r'/catalog',
r'/zoeken', r'/search', r'/archief', r'/archive',
r'/beeldbank', r'/images', r'/foto', r'/photo',
],
'DIGITAL_ARCHIVE': [
r'archieven\.nl', r'archief', r'archive',
r'/inventaris', r'/inventory', r'/toegang',
],
'EDUCATION': [
r'/educatie', r'/education', r'/onderwijs', r'/leren',
r'/scholen', r'/schools', r'/lesmateriaal',
],
'INSTITUTIONAL_WEBSITE': [
r'/over-ons', r'/about', r'/contact', r'/bezoek',
r'/visit', r'/openingstijden', r'/hours',
],
}
def detect_platform_type(url: str, links: list[str] | None = None) -> str:
"""Detect the platform type based on URL patterns and extracted links."""
url_lower = url.lower()
all_urls = [url_lower] + [l.lower() for l in (links or [])]
for platform_type, patterns in PLATFORM_PATTERNS.items():
for pattern in patterns:
for check_url in all_urls:
if re.search(pattern, check_url):
return platform_type
return 'INSTITUTIONAL_WEBSITE'
def extract_collection_urls(links: list[str], base_url: str) -> list[str]:
    """Return up to 10 same-site links that look like collection/catalog pages.

    A link is kept when its lower-cased URL matches one of the collection
    patterns and it belongs to the same site as ``base_url``: the host is
    identical (a leading ``www.`` is ignored), a subdomain of the base host,
    a parent domain of it, or the link is relative (no host at all).

    Hosts are compared on label boundaries rather than by substring, so
    ``notexample.nl`` is no longer treated as part of ``example.nl``.
    Links that cannot be parsed are skipped. Order of first appearance is
    preserved and duplicates are dropped.
    """
    collection_patterns = [
        r'/collectie', r'/collection', r'/catalogus', r'/catalog',
        r'/zoeken', r'/search', r'/beeldbank', r'/inventaris',
        r'/archief(?!en\.)', r'/archiefstukken', r'/toegangen',
    ]
    # One alternation is equivalent to trying each pattern in turn.
    matcher = re.compile('|'.join(collection_patterns))
    max_results = 10  # Limit to 10 collection URLs

    def host_of(target: str) -> str:
        return (urlparse(target).hostname or '').removeprefix('www.')

    base_host = host_of(base_url)
    collection_urls: list[str] = []
    seen: set[str] = set()
    for link in links:
        try:
            host = host_of(link)
            looks_like_collection = matcher.search(link.lower()) is not None
        except (ValueError, TypeError, AttributeError):
            # Malformed URL (e.g. broken IPv6 literal) or a non-string entry.
            continue
        # Only include links from the same domain or its sub/parent domains.
        same_site = (
            not host  # relative link: same site by definition
            or host == base_host
            or host.endswith('.' + base_host)
            or base_host.endswith('.' + host)
        )
        if not (same_site and looks_like_collection) or link in seen:
            continue
        seen.add(link)
        collection_urls.append(link)
        if len(collection_urls) == max_results:
            break
    return collection_urls
def extract_auxiliary_platforms(links: list[str], base_url: str) -> list[dict]:
    """Return up to 5 links that point at known external heritage platforms.

    Each link's host (leading ``www.`` ignored) is compared with a fixed
    table of known platform domains. A link matches when its host is that
    domain or a subdomain of it; plain substring matches such as
    ``fakearchieven.nl`` are rejected. Links on the same site as
    ``base_url``, relative links and unparsable links are skipped, and each
    distinct host is reported once (first link wins).

    Returns a list of dicts with the keys ``platform_name``,
    ``platform_url``, ``platform_type`` and ``integration_type``.
    """
    external_patterns = {
        'archieven.nl': {'name': 'Archieven.nl', 'type': 'AGGREGATOR'},
        'europeana.eu': {'name': 'Europeana', 'type': 'AGGREGATOR'},
        'collectienederland.nl': {'name': 'Collectie Nederland', 'type': 'AGGREGATOR'},
        'erfgoedthesaurus.nl': {'name': 'Erfgoedthesaurus', 'type': 'THESAURUS'},
        'delpher.nl': {'name': 'Delpher', 'type': 'DIGITAL_ARCHIVE'},
        'geheugen.nl': {'name': 'Geheugen van Nederland', 'type': 'AGGREGATOR'},
        'archiefweb.eu': {'name': 'Archiefweb', 'type': 'DIGITAL_ARCHIVE'},
    }
    max_results = 5  # Limit to 5 auxiliary platforms

    def host_of(target: str) -> str:
        return (urlparse(target).hostname or '').removeprefix('www.')

    base_host = host_of(base_url)
    auxiliary = []
    seen_domains = set()
    for link in links:
        try:
            domain = host_of(link)
        except (ValueError, TypeError, AttributeError):
            # Malformed URL or non-string entry.
            continue
        if not domain or domain in seen_domains:
            continue
        # Skip if same site as the base URL.
        same_site = (
            domain == base_host
            or domain.endswith('.' + base_host)
            or base_host.endswith('.' + domain)
        )
        if same_site:
            continue
        # Check for known external platforms.
        for known_domain, info in external_patterns.items():
            if domain == known_domain or domain.endswith('.' + known_domain):
                seen_domains.add(domain)
                auxiliary.append({
                    'platform_name': info['name'],
                    'platform_url': link,
                    'platform_type': info['type'],
                    'integration_type': 'external_aggregator',
                })
                break
        if len(auxiliary) == max_results:
            break
    return auxiliary
def is_generic_title(title: str) -> bool:
    """Return True when ``title`` is too generic to serve as a platform name.

    A title is generic when it is empty/None, shorter than 3 characters
    after stripping whitespace, exactly one of the generic words, or one of
    those words followed by `` -`` or `` |`` (e.g. "Home - Museum X").
    Both separators are treated the same way. Comparison is
    case-insensitive.
    """
    generic_patterns = [
        'home', 'homepage', 'welkom', 'welcome', 'startpagina',
        'index', 'main', 'website', 'webpagina', 'web page',
    ]
    normalized = title.strip().lower() if title else ''
    # Measure the stripped text so whitespace-only titles are rejected too.
    if len(normalized) < 3:
        return True
    return any(
        normalized == word or normalized.startswith((f"{word} -", f"{word} |"))
        for word in generic_patterns
    )
def transform_to_platform_v2(scrape_result: dict, source_url: str, org_name: str) -> dict[str, Any]:
    """Build a ``digital_platform_v2`` dict from a Firecrawl scrape result.

    Args:
        scrape_result: The ``data`` object of a scrape response; its
            'metadata' dict and 'links' list are read here.
        source_url: URL that was scraped; used for the platform id, type
            detection and same-site decisions.
        org_name: Fallback name when the page offers no usable title.

    Returns:
        Dict with 'transformation_metadata' and 'primary_platform', plus
        'auxiliary_platforms' and 'navigation_links' when any were found.
    """
    metadata = scrape_result.get('metadata') or {}
    links = scrape_result.get('links') or []

    # The title may be missing, None or not a string; guard before splitting.
    page_title = metadata.get('title')
    if not isinstance(page_title, str):
        page_title = ''
    candidate_titles = [
        metadata.get('ogTitle'),
        page_title.split(' - ')[0].strip(),
        page_title.split(' | ')[0].strip(),
        metadata.get('og:title'),
        metadata.get('ogSiteName'),
        metadata.get('og:site_name'),
    ]
    # Find first non-generic title
    title = org_name  # Default fallback
    for candidate in candidate_titles:
        if isinstance(candidate, str) and candidate and not is_generic_title(candidate):
            title = candidate
            break
    source_netloc = urlparse(source_url).netloc
    if not title:
        # No usable page title and no organisation name: fall back to the host.
        title = source_netloc

    # Generate platform ID
    domain = source_netloc.replace('www.', '').replace('.', '_')
    platform_id = f"primary_website_{domain}"

    platform_type = detect_platform_type(source_url, links)
    collection_urls = extract_collection_urls(links, source_url)
    auxiliary_platforms = extract_auxiliary_platforms(links, source_url)

    platform_v2 = {
        'transformation_metadata': {
            'transformed_from': 'firecrawl_scrape',
            'transformation_date': datetime.now(timezone.utc).isoformat(),
            'transformation_version': '2.0',
            'source_status_code': metadata.get('statusCode', 200),
        },
        'primary_platform': {
            'platform_id': platform_id,
            'platform_name': f"{title} Website" if 'website' not in title.lower() else title,
            'platform_url': source_url,
            'platform_type': platform_type,
            'description': metadata.get('description') or metadata.get('ogDescription') or '',
            'language': metadata.get('language', 'nl'),
            'og_image': metadata.get('ogImage') or metadata.get('og:image'),
            'favicon': metadata.get('favicon'),
        },
    }
    if collection_urls:
        platform_v2['primary_platform']['collection_urls'] = collection_urls
    if auxiliary_platforms:
        platform_v2['auxiliary_platforms'] = auxiliary_platforms

    # Add internal navigation links (sample of at most 20). The source URL is
    # parsed once, and a malformed link is skipped rather than aborting.
    internal_links = []
    for link in links:
        try:
            link_netloc = urlparse(link).netloc
        except (ValueError, TypeError, AttributeError):
            continue
        if link_netloc in source_netloc:
            internal_links.append(link)
            if len(internal_links) == 20:
                break
    if internal_links:
        platform_v2['navigation_links'] = internal_links
    return platform_v2
def scrape_single_url(url: str, client: httpx.Client, max_retries: int = 3) -> dict | None:
    """Scrape ``url`` through the Firecrawl ``/scrape`` endpoint.

    Args:
        url: Page to scrape.
        client: HTTP client used for the POST request; the caller supplies
            the authorization headers.
        max_retries: Maximum number of attempts.

    Returns:
        The ``data`` object of a successful response, or None when the API
        reports an error, the retries are exhausted, or the final attempt
        raises.

    HTTP 429 responses wait 15s x attempt number before retrying and
    exceptions wait 5s. No wait is performed after the final attempt,
    because nothing would follow it.
    """
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            response = client.post(
                f"{FIRECRAWL_BASE_URL}/scrape",
                json={
                    'url': url,
                    'formats': ['markdown', 'links'],
                    'onlyMainContent': True,
                },
                timeout=60.0,
            )
            if response.status_code == 200:
                data = response.json()
                if data.get('success'):
                    return data.get('data', {})
            # Handle rate limiting (429)
            if response.status_code == 429:
                if is_last_attempt:
                    break
                wait_time = 15 * (attempt + 1)  # 15s, 30s, ...
                print(f" Rate limited, waiting {wait_time}s (attempt {attempt + 1}/{max_retries})")
                time.sleep(wait_time)
                continue
            print(f" Error {response.status_code}: {response.text[:200]}")
            return None
        except Exception as e:
            print(f" Exception: {e}")
            if not is_last_attempt:
                time.sleep(5)
                continue
            return None
    print(f" Max retries exceeded")
    return None
def update_custodian_file(filepath: Path, platform_v2: dict) -> bool:
    """Write ``platform_v2`` into a custodian YAML file.

    Loads the YAML document, sets its ``digital_platform_v2`` key and, when a
    ``crawl4ai_enrichment`` mapping is present, stamps it with the recrawl
    tool and date. The document is serialised to a string before the file is
    opened for writing, so a serialisation error cannot leave a truncated
    file behind. Files are read and written as UTF-8.

    Returns:
        True on success, False (after printing the error) on any failure.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        if data is None:
            data = {}
        # Add digital_platform_v2 section
        data['digital_platform_v2'] = platform_v2
        # Update crawl4ai_enrichment status
        enrichment = data.get('crawl4ai_enrichment')
        if isinstance(enrichment, dict):
            enrichment['recrawled_with'] = 'firecrawl'
            enrichment['recrawl_date'] = datetime.now(timezone.utc).isoformat()
        # Serialise first: opening with 'w' truncates the file immediately.
        serialized = yaml.dump(data, default_flow_style=False, allow_unicode=True, sort_keys=False)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(serialized)
        return True
    except Exception as e:
        # Best-effort batch job: report and let the caller count the failure.
        print(f" Error updating {filepath}: {e}")
        return False
def load_failed_urls() -> list[tuple[str, str]]:
    """Read FAILED_URLS_FILE and return its (filename, url) pairs.

    Each line is stripped and split at its first tab; lines that contain no
    tab after stripping are ignored.
    """
    pairs = []
    with open(FAILED_URLS_FILE, 'r') as handle:
        for raw_line in handle:
            name, separator, target = raw_line.strip().partition('\t')
            if separator:
                pairs.append((name, target))
    return pairs
def get_org_name(filepath: Path) -> str:
    """Return the organisation name stored in a custodian YAML file.

    Tries, in order: ``original_entry.organisatie``,
    ``custodian_name.emic_name``, ``custodian_name.preferred_name`` and
    ``name``. The first non-blank string wins; an empty or missing value now
    falls through to the next field instead of being returned. When no field
    yields a name, the last '-'-separated part of the file stem is used.
    If the file cannot be read or parsed, the whole file stem is returned.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        # Try different name fields
        if isinstance(data, dict):
            original_entry = data.get('original_entry')
            custodian_name = data.get('custodian_name')
            candidates = [
                original_entry.get('organisatie') if isinstance(original_entry, dict) else None,
                custodian_name.get('emic_name') if isinstance(custodian_name, dict) else None,
                custodian_name.get('preferred_name') if isinstance(custodian_name, dict) else None,
                data.get('name'),
            ]
            for candidate in candidates:
                if isinstance(candidate, str) and candidate.strip():
                    return candidate
        # Fallback: extract from filename
        return filepath.stem.split('-')[-1]
    except Exception:
        # Deliberately best-effort: an unreadable file must not stop the batch.
        return filepath.stem
def main():
    """Parse the command line and re-crawl the selected failed URLs.

    Exits with status 1 when FIRECRAWL_API_KEY is not set. Files that do not
    exist or already contain a ``digital_platform_v2:`` section are skipped
    and counted as skipped. ``--delay`` seconds are slept after every scrape
    attempt. ``--batch-size`` is accepted but not used by this function.
    """
    parser = argparse.ArgumentParser(description='Batch re-crawl failed URLs with Firecrawl')
    parser.add_argument('--batch-size', type=int, default=50, help='Number of URLs per batch')
    parser.add_argument('--start', type=int, default=0, help='Starting index')
    parser.add_argument('--limit', type=int, default=0, help='Maximum URLs to process (0=all)')
    parser.add_argument('--dry-run', action='store_true', help='Print what would be done without making changes')
    parser.add_argument('--delay', type=float, default=6.0, help='Delay between requests in seconds (default 6 for rate limits)')
    args = parser.parse_args()

    if not FIRECRAWL_API_KEY:
        print("Error: FIRECRAWL_API_KEY environment variable not set")
        sys.exit(1)

    # Load URLs
    all_urls = load_failed_urls()
    print(f"Loaded {len(all_urls)} failed URLs")

    # Slice based on start and limit
    stop = args.start + args.limit if args.limit > 0 else None
    urls_to_process = all_urls[args.start:stop]
    total = len(urls_to_process)
    print(f"Processing {total} URLs (start={args.start}, limit={args.limit or 'all'})")

    if args.dry_run:
        print("\n[DRY RUN MODE - No changes will be made]")
        preview = urls_to_process[:10]
        for filename, url in preview:
            print(f" Would scrape: {filename} -> {url}")
        # Only mention a remainder when there is one (avoids "... and -7 more").
        remaining = total - len(preview)
        if remaining > 0:
            print(f" ... and {remaining} more")
        return

    success_count = 0
    fail_count = 0
    skip_count = 0

    # The context manager closes the client even when the loop raises.
    with httpx.Client(
        headers={
            'Authorization': f'Bearer {FIRECRAWL_API_KEY}',
            'Content-Type': 'application/json',
        }
    ) as client:
        for i, (filename, url) in enumerate(urls_to_process):
            filepath = CUSTODIAN_DIR / filename
            print(f"\n[{i+1}/{total}] {filename}")
            print(f" URL: {url}")

            if not filepath.exists():
                print(f" SKIP: File not found")
                skip_count += 1
                continue

            # Check if already has digital_platform_v2. This is a plain
            # substring test, so undecodable bytes are replaced, not fatal.
            content = filepath.read_text(encoding='utf-8', errors='replace')
            if 'digital_platform_v2:' in content:
                print(f" SKIP: Already has digital_platform_v2")
                skip_count += 1
                continue

            # Get org name for platform naming
            org_name = get_org_name(filepath)

            result = scrape_single_url(url, client)
            if result:
                platform_v2 = transform_to_platform_v2(result, url, org_name)
                if update_custodian_file(filepath, platform_v2):
                    success_count += 1
                    print(f" SUCCESS: {platform_v2['primary_platform']['platform_name']}")
                else:
                    fail_count += 1
            else:
                fail_count += 1
                print(f" FAILED: Could not scrape URL")

            # Rate limiting
            time.sleep(args.delay)

            # Progress update every 50 URLs
            if (i + 1) % 50 == 0:
                print(f"\n=== Progress: {i+1}/{total} (success={success_count}, skip={skip_count}, fail={fail_count}) ===\n")

    print(f"\n=== Final Results ===")
    print(f"Success: {success_count}")
    print(f"Skipped: {skip_count}")
    print(f"Failed: {fail_count}")
    print(f"Total: {total}")


# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

View file

@ -0,0 +1,488 @@
#!/usr/bin/env python3
"""
Batch web scraper using httpx + BeautifulSoup for digital_platform_v2 enrichment.
This script:
1. Reads the list of failed crawl URLs
2. Uses httpx to fetch HTML content directly (no browser, no external API)
3. Uses BeautifulSoup to parse and extract metadata
4. Transforms results to digital_platform_v2 format
5. Updates the custodian YAML files
Usage:
python scripts/batch_httpx_scrape.py --limit 10
python scripts/batch_httpx_scrape.py --start 100 --limit 50
python scripts/batch_httpx_scrape.py --dry-run
No API keys or external services required!
"""
from __future__ import annotations
import argparse
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urljoin, urlparse
import httpx
import yaml
from bs4 import BeautifulSoup
# Configuration
# NOTE(review): both paths are absolute and machine-specific; the script only
# runs unchanged on a machine that has this exact directory layout.
# Directory of custodian YAML files (per the module docstring, the files updated).
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
# List of URLs whose earlier crawl failed (per the module docstring).
FAILED_URLS_FILE = Path("/Users/kempersc/apps/glam/data/failed_crawl_urls.txt")
# User agent to mimic a real browser (desktop Chrome on macOS).
USER_AGENT = (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
# Platform type -> URL regexes. The dict order is significant:
# detect_platform_type returns the first type that has any matching pattern.
PLATFORM_PATTERNS: dict[str, list[str]] = {
    'DISCOVERY_PORTAL': [
        r'/collectie', r'/collection', r'/catalogus', r'/catalog',
        r'/zoeken', r'/search', r'/archief', r'/archive',
        r'/beeldbank', r'/images', r'/foto', r'/photo',
    ],
    'DIGITAL_ARCHIVE': [
        r'archieven\.nl', r'archief', r'archive',
        r'/inventaris', r'/inventory', r'/toegang',
    ],
    'EDUCATION': [
        r'/educatie', r'/education', r'/onderwijs', r'/leren',
        r'/scholen', r'/schools', r'/lesmateriaal',
    ],
    'INSTITUTIONAL_WEBSITE': [
        r'/over-ons', r'/about', r'/contact', r'/bezoek',
        r'/visit', r'/openingstijden', r'/hours',
    ],
}


def detect_platform_type(url: str, links: list[str] | None = None) -> str:
    """Classify a site from its URL and, optionally, the links found on it.

    The URL and every link are lower-cased and tested against
    PLATFORM_PATTERNS in dict order; the first platform type with at least
    one pattern matching any of those URLs is returned. When nothing matches
    the result is 'INSTITUTIONAL_WEBSITE'.
    """
    lowered = [text.lower() for text in [url, *(links or [])]]

    def matches_any(patterns: list[str]) -> bool:
        return any(re.search(regex, text) for regex in patterns for text in lowered)

    return next(
        (kind for kind, patterns in PLATFORM_PATTERNS.items() if matches_any(patterns)),
        'INSTITUTIONAL_WEBSITE',
    )
def extract_collection_urls(links: list[str], base_url: str) -> list[str]:
    """Return up to 10 same-site links that look like collection/catalog pages.

    A link is kept when its lower-cased URL matches one of the collection
    patterns and it belongs to the same site as ``base_url``: the host is
    identical (a leading ``www.`` is ignored), a subdomain of the base host,
    a parent domain of it, or the link is relative (no host at all).

    Hosts are compared on label boundaries rather than by substring, so
    ``notexample.nl`` is no longer treated as part of ``example.nl``.
    Links that cannot be parsed are skipped. Order of first appearance is
    preserved and duplicates are dropped.
    """
    collection_patterns = [
        r'/collectie', r'/collection', r'/catalogus', r'/catalog',
        r'/zoeken', r'/search', r'/beeldbank', r'/inventaris',
        r'/archief(?!en\.)', r'/archiefstukken', r'/toegangen',
    ]
    # One alternation is equivalent to trying each pattern in turn.
    matcher = re.compile('|'.join(collection_patterns))
    max_results = 10

    def host_of(target: str) -> str:
        return (urlparse(target).hostname or '').removeprefix('www.')

    base_host = host_of(base_url)
    collection_urls: list[str] = []
    seen: set[str] = set()
    for link in links:
        try:
            host = host_of(link)
            looks_like_collection = matcher.search(link.lower()) is not None
        except (ValueError, TypeError, AttributeError):
            # Malformed URL (e.g. broken IPv6 literal) or a non-string entry.
            continue
        same_site = (
            not host  # relative link: same site by definition
            or host == base_host
            or host.endswith('.' + base_host)
            or base_host.endswith('.' + host)
        )
        if not (same_site and looks_like_collection) or link in seen:
            continue
        seen.add(link)
        collection_urls.append(link)
        if len(collection_urls) == max_results:
            break
    return collection_urls
def extract_auxiliary_platforms(links: list[str], base_url: str) -> list[dict[str, str]]:
    """Return up to 5 links that point at known external heritage platforms.

    Each link's host (leading ``www.`` ignored) is compared with a fixed
    table of known platform domains. A link matches when its host is that
    domain or a subdomain of it; plain substring matches such as
    ``fakearchieven.nl`` are rejected. Links on the same site as
    ``base_url``, relative links and unparsable links are skipped, and each
    distinct host is reported once (first link wins).

    Returns a list of dicts with the keys ``platform_name``,
    ``platform_url``, ``platform_type`` and ``integration_type``.
    """
    external_patterns: dict[str, dict[str, str]] = {
        'archieven.nl': {'name': 'Archieven.nl', 'type': 'AGGREGATOR'},
        'europeana.eu': {'name': 'Europeana', 'type': 'AGGREGATOR'},
        'collectienederland.nl': {'name': 'Collectie Nederland', 'type': 'AGGREGATOR'},
        'erfgoedthesaurus.nl': {'name': 'Erfgoedthesaurus', 'type': 'THESAURUS'},
        'delpher.nl': {'name': 'Delpher', 'type': 'DIGITAL_ARCHIVE'},
        'geheugen.nl': {'name': 'Geheugen van Nederland', 'type': 'AGGREGATOR'},
        'archiefweb.eu': {'name': 'Archiefweb', 'type': 'DIGITAL_ARCHIVE'},
    }
    max_results = 5

    def host_of(target: str) -> str:
        return (urlparse(target).hostname or '').removeprefix('www.')

    base_host = host_of(base_url)
    auxiliary: list[dict[str, str]] = []
    seen_domains: set[str] = set()
    for link in links:
        try:
            domain = host_of(link)
        except (ValueError, TypeError, AttributeError):
            # Malformed URL or non-string entry.
            continue
        if not domain or domain in seen_domains:
            continue
        same_site = (
            domain == base_host
            or domain.endswith('.' + base_host)
            or base_host.endswith('.' + domain)
        )
        if same_site:
            continue
        for known_domain, info in external_patterns.items():
            if domain == known_domain or domain.endswith('.' + known_domain):
                seen_domains.add(domain)
                auxiliary.append({
                    'platform_name': info['name'],
                    'platform_url': link,
                    'platform_type': info['type'],
                    'integration_type': 'external_aggregator',
                })
                break
        if len(auxiliary) == max_results:
            break
    return auxiliary
def is_generic_title(title: str | None) -> bool:
    """Return True when ``title`` is too generic to serve as a platform name.

    A title is generic when it is empty/None, shorter than 3 characters
    after stripping whitespace, exactly one of the generic words, or one of
    those words followed by `` -`` or `` |`` (e.g. "Home - Museum X").
    Both separators are treated the same way. Comparison is
    case-insensitive.
    """
    generic_patterns = [
        'home', 'homepage', 'welkom', 'welcome', 'startpagina',
        'index', 'main', 'website', 'webpagina', 'web page',
    ]
    normalized = title.strip().lower() if title else ''
    # Measure the stripped text so whitespace-only titles are rejected too.
    if len(normalized) < 3:
        return True
    return any(
        normalized == word or normalized.startswith((f"{word} -", f"{word} |"))
        for word in generic_patterns
    )
def scrape_with_httpx(url: str, client: httpx.Client, timeout: float = 30.0) -> dict[str, Any] | None:
    """Fetch ``url`` and extract page metadata with BeautifulSoup.

    Args:
        url: Page to fetch; redirects are followed.
        client: HTTP client used for the GET request.
        timeout: Request timeout in seconds.

    Returns:
        On success a dict with 'status_code', 'final_url', 'title' and
        'links' (at most 100 unique absolute URLs), plus 'description',
        'og_title', 'og_image', 'og_site_name', 'language' and 'favicon'
        when the page provides them. On any failure (non-200 status,
        timeout, connection or other error) a dict with 'error' and
        'status_code' is returned instead.

    Relative URLs (og:image, favicon, links) are resolved against the final
    URL after redirects, which is the document's actual location.
    """
    try:
        response = client.get(url, timeout=timeout, follow_redirects=True)
        if response.status_code != 200:
            return {'error': f'HTTP {response.status_code}', 'status_code': response.status_code}

        final_url = str(response.url)
        # Parse HTML
        soup = BeautifulSoup(response.text, 'html.parser')

        metadata: dict[str, Any] = {
            'status_code': response.status_code,
            'final_url': final_url,
        }

        # Title
        title_tag = soup.find('title')
        metadata['title'] = title_tag.get_text(strip=True) if title_tag else None

        # Meta tags
        for meta in soup.find_all('meta'):
            name = str(meta.get('name', '')).lower()
            prop = str(meta.get('property', '')).lower()
            content = str(meta.get('content', ''))
            if name == 'description' or prop == 'og:description':
                # og:description always wins; a plain description only fills a gap.
                if 'description' not in metadata or prop == 'og:description':
                    metadata['description'] = content
            elif prop == 'og:title':
                metadata['og_title'] = content
            elif prop == 'og:image':
                metadata['og_image'] = urljoin(final_url, content) if content else None
            elif prop == 'og:site_name':
                metadata['og_site_name'] = content
            elif name == 'language' or str(meta.get('http-equiv', '')).lower() == 'content-language':
                # Keep only the primary subtag of the first listed language.
                metadata['language'] = content.split(',')[0].split('-')[0]

        # The <html lang> attribute overrides any meta-declared language.
        html_tag = soup.find('html')
        if html_tag:
            lang_attr = html_tag.get('lang')
            if lang_attr:
                lang_str = str(lang_attr) if not isinstance(lang_attr, list) else str(lang_attr[0])
                metadata['language'] = lang_str.split('-')[0]

        # Favicon: first <link> whose rel contains "icon" and has an href.
        for link in soup.find_all('link'):
            rel = link.get('rel')
            if rel is None:
                rel = []
            if isinstance(rel, list):
                rel_str = ' '.join(str(r) for r in rel)
            else:
                rel_str = str(rel)
            if 'icon' in rel_str.lower():
                href = link.get('href')
                if href:
                    metadata['favicon'] = urljoin(final_url, str(href))
                    break

        # Extract links: absolute and root-relative hrefs only, de-duplicated
        # in document order and limited to 100.
        max_links = 100
        links: list[str] = []
        seen_links: set[str] = set()
        for a in soup.find_all('a', href=True):
            href = str(a['href'])
            if not href.startswith(('http', '/')):
                continue
            full_url = urljoin(final_url, href)
            if full_url in seen_links:
                continue
            seen_links.add(full_url)
            links.append(full_url)
            if len(links) == max_links:
                break
        metadata['links'] = links
        return metadata
    except httpx.TimeoutException:
        return {'error': 'Timeout', 'status_code': None}
    except httpx.ConnectError as e:
        return {'error': f'Connection error: {e}', 'status_code': None}
    except httpx.HTTPError as e:
        return {'error': f'HTTP error: {e}', 'status_code': None}
    except Exception as e:
        return {'error': f'Exception: {e}', 'status_code': None}
def _same_site(link: str, site_host: str) -> bool:
    """Return True when *link* is hosted on *site_host* or one of its subdomains."""
    link_host = urlparse(link).netloc.lower().removeprefix('www.')
    if not site_host or not link_host:
        return False
    return link_host == site_host or link_host.endswith('.' + site_host)


def transform_to_platform_v2(scrape_result: dict[str, Any], source_url: str, org_name: str) -> dict[str, Any]:
    """Transform a scrape result into the digital_platform_v2 structure.

    Args:
        scrape_result: Metadata dict produced by the scraper; the keys read here
            are ``links``, ``title``, ``og_title``, ``og_site_name``,
            ``status_code``, ``final_url``, ``description``, ``language``,
            ``og_image`` and ``favicon``.
        source_url: URL that was requested; used for the platform id, type
            detection and as fallback platform URL.
        org_name: Organisation name used when no usable page title is found.

    Returns:
        Dict with ``transformation_metadata`` and ``primary_platform``, plus
        ``auxiliary_platforms`` and ``navigation_links`` when any were found.
    """
    links: list[str] = scrape_result.get('links', [])

    # Title preference: og:title, og:site_name, then the page title cut at the
    # first " - " or " | " separator; generic titles are rejected.
    raw_title = scrape_result.get('title', '') or ''
    candidate_titles: list[str | None] = [
        scrape_result.get('og_title'),
        scrape_result.get('og_site_name'),
        raw_title.split(' - ')[0].strip() if raw_title else None,
        raw_title.split(' | ')[0].strip() if raw_title else None,
    ]
    title = next(
        (candidate for candidate in candidate_titles
         if candidate and not is_generic_title(candidate)),
        org_name,  # Default fallback
    )

    # Generate platform ID (left unchanged so existing identifiers stay stable)
    source_netloc = urlparse(source_url).netloc
    domain = source_netloc.replace('www.', '').replace('.', '_')
    platform_id = f"primary_website_{domain}"

    platform_type = detect_platform_type(source_url, links)
    collection_urls = extract_collection_urls(links, source_url)
    auxiliary_platforms = extract_auxiliary_platforms(links, source_url)

    platform_v2: dict[str, Any] = {
        'transformation_metadata': {
            'transformed_from': 'httpx_beautifulsoup',
            'transformation_date': datetime.now(timezone.utc).isoformat(),
            'transformation_version': '2.1',
            'source_status_code': scrape_result.get('status_code', 200),
        },
        'primary_platform': {
            'platform_id': platform_id,
            'platform_name': f"{title} Website" if 'website' not in title.lower() else title,
            'platform_url': scrape_result.get('final_url', source_url),
            'platform_type': platform_type,
            'description': scrape_result.get('description', ''),
            'language': scrape_result.get('language', 'nl'),
            'og_image': scrape_result.get('og_image'),
            'favicon': scrape_result.get('favicon'),
        },
    }

    if collection_urls:
        platform_v2['primary_platform']['collection_urls'] = collection_urls
    if auxiliary_platforms:
        platform_v2['auxiliary_platforms'] = auxiliary_platforms

    # Sample of internal navigation links. Hosts are compared on a label
    # boundary: a plain substring test would treat "stadsarchief.nl" as part of
    # "archief.nl" and would accept every link when the source host is empty.
    site_host = source_netloc.lower().removeprefix('www.')
    internal_links = [link for link in links if _same_site(link, site_host)][:20]
    if internal_links:
        platform_v2['navigation_links'] = internal_links

    return platform_v2
def update_custodian_file(filepath: Path, platform_v2: dict[str, Any]) -> bool:
    """Store *platform_v2* under ``digital_platform_v2`` in a custodian YAML file.

    Args:
        filepath: Path of the YAML file to update in place.
        platform_v2: Structure to store under the ``digital_platform_v2`` key.

    Returns:
        True when the file was rewritten, False when reading, serializing or
        writing failed (the error is printed).
    """
    try:
        # Explicit UTF-8: the dump below uses allow_unicode=True, so the file
        # contains raw non-ASCII characters regardless of the locale encoding.
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
        if data is None:
            data = {}

        data['digital_platform_v2'] = platform_v2

        # Serialize before reopening for writing: mode 'w' truncates the file,
        # so a failure during dumping must not happen after that point.
        serialized = yaml.dump(data, default_flow_style=False, allow_unicode=True, sort_keys=False)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(serialized)
        return True
    except Exception as e:
        print(f" Error updating {filepath}: {e}")
        return False
def load_failed_urls() -> list[tuple[str, str]]:
    """Read FAILED_URLS_FILE and return its (filename, url) pairs.

    Each line is stripped and split at its first tab; lines that contain no
    tab after stripping are ignored.
    """
    pairs: list[tuple[str, str]] = []
    with open(FAILED_URLS_FILE, 'r') as handle:
        for raw_line in handle:
            record = raw_line.strip()
            custodian_file, tab, target_url = record.partition('\t')
            if tab:
                pairs.append((custodian_file, target_url))
    return pairs
def get_org_name(filepath: Path) -> str:
    """Extract the organization name from a custodian YAML file.

    Sources are tried in order: ``original_entry.organisatie``,
    ``custodian_name.emic_name`` / ``preferred_name``, the top-level ``name``,
    and finally the last non-empty dash-separated part of the filename stem.
    An empty or malformed source is skipped rather than returned, so callers
    do not receive an empty name while a later source could supply one.

    Args:
        filepath: Path of the custodian YAML file.

    Returns:
        The organization name; ``filepath.stem`` when the file cannot be read
        or parsed.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if isinstance(data, dict):
            original_entry = data.get('original_entry')
            if isinstance(original_entry, dict) and original_entry.get('organisatie'):
                return str(original_entry['organisatie'])

            cn = data.get('custodian_name')
            if isinstance(cn, dict):
                custodian_name = cn.get('emic_name') or cn.get('preferred_name')
                if custodian_name:
                    return str(custodian_name)

            if data.get('name'):
                return str(data['name'])

        # Fallback: extract from filename. Empty parts are skipped so a stem
        # ending in '-' does not produce an empty name.
        stem = filepath.stem
        return next((part for part in reversed(stem.split('-')) if part), stem)
    except Exception:
        # Best effort: any read/parse problem degrades to the bare filename stem.
        return filepath.stem
def main() -> None:
    """Scrape the failed-crawl URLs and write digital_platform_v2 data to custodian files.

    Command-line options: ``--start``/``--limit`` select a slice of the URL
    list, ``--dry-run`` only lists what would be scraped, ``--delay`` and
    ``--timeout`` control request pacing, and ``--skip-existing`` /
    ``--no-skip-existing`` decide whether files that already contain a
    ``digital_platform_v2`` section are left alone (default: skipped).
    """
    parser = argparse.ArgumentParser(description='Batch web scraper using httpx + BeautifulSoup')
    parser.add_argument('--start', type=int, default=0, help='Starting index')
    parser.add_argument('--limit', type=int, default=0, help='Maximum URLs to process (0=all)')
    parser.add_argument('--dry-run', action='store_true', help='Print what would be done without making changes')
    parser.add_argument('--delay', type=float, default=1.0, help='Delay between requests in seconds (default 1)')
    parser.add_argument('--timeout', type=float, default=30.0, help='Request timeout in seconds (default 30)')
    # A store_true flag that defaults to True can never be turned off;
    # BooleanOptionalAction keeps --skip-existing and adds --no-skip-existing.
    parser.add_argument(
        '--skip-existing',
        action=argparse.BooleanOptionalAction,
        default=True,
        help='Skip files that already have digital_platform_v2',
    )
    args = parser.parse_args()

    # Check for BeautifulSoup
    try:
        from bs4 import BeautifulSoup as _  # noqa: F401
    except ImportError:
        print("Error: BeautifulSoup not installed. Run: pip install beautifulsoup4")
        sys.exit(1)

    # Load URLs
    all_urls = load_failed_urls()
    print(f"Loaded {len(all_urls)} failed URLs from {FAILED_URLS_FILE}")

    # Slice based on start and limit
    if args.limit > 0:
        urls_to_process = all_urls[args.start:args.start + args.limit]
    else:
        urls_to_process = all_urls[args.start:]
    print(f"Processing {len(urls_to_process)} URLs (start={args.start}, limit={args.limit or 'all'})")

    if args.dry_run:
        print("\n[DRY RUN MODE - No changes will be made]")
        for filename, url in urls_to_process[:10]:
            print(f" Would scrape: {filename} -> {url}")
        if len(urls_to_process) > 10:
            print(f" ... and {len(urls_to_process) - 10} more")
        return

    # Create HTTP client with headers
    client = httpx.Client(
        headers={
            'User-Agent': USER_AGENT,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'nl,en-US;q=0.9,en;q=0.8',
        },
        follow_redirects=True,
        timeout=args.timeout,
    )

    success_count = 0
    skip_count = 0
    fail_count = 0

    try:
        for i, (filename, url) in enumerate(urls_to_process):
            filepath = CUSTODIAN_DIR / filename
            print(f"\n[{i+1}/{len(urls_to_process)}] {filename}")
            print(f" URL: {url}")

            if not filepath.exists():
                print(" SKIP: File not found")
                skip_count += 1
                continue

            # Check if already has digital_platform_v2 (plain text search of
            # the raw file, no YAML parsing)
            if args.skip_existing:
                with open(filepath, 'r') as f:
                    content = f.read()
                if 'digital_platform_v2:' in content:
                    print(" SKIP: Already has digital_platform_v2")
                    skip_count += 1
                    continue

            # Get org name for platform naming
            org_name = get_org_name(filepath)

            # Scrape URL
            result = scrape_with_httpx(url, client, timeout=args.timeout)

            if result and 'error' not in result:
                platform_v2 = transform_to_platform_v2(result, url, org_name)
                if update_custodian_file(filepath, platform_v2):
                    success_count += 1
                    platform_name = platform_v2['primary_platform']['platform_name']
                    print(f" SUCCESS: {platform_name}")
                else:
                    fail_count += 1
            else:
                fail_count += 1
                error_msg = result.get('error', 'Unknown error') if result else 'No result'
                print(f" FAILED: {error_msg}")

            # Rate limiting (only reached when a request was actually made)
            if args.delay > 0:
                time.sleep(args.delay)

            # Progress update every 50 URLs
            if (i + 1) % 50 == 0:
                print(f"\n=== Progress: {i+1}/{len(urls_to_process)} (success={success_count}, skip={skip_count}, fail={fail_count}) ===\n")
    finally:
        client.close()

    print("\n=== Final Results ===")
    print(f"Success: {success_count}")
    print(f"Skipped: {skip_count}")
    print(f"Failed: {fail_count}")
    print(f"Total: {len(urls_to_process)}")


if __name__ == '__main__':
    main()

View file

@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
Detect name mismatches in LinkedIn entity profiles.
Compares the LinkedIn URL slug with the assigned name to find:
1. Profiles where the name doesn't match the slug at all
2. Patterns of repeated wrong names (like "Simon Kemper")
3. Other potential filler/hallucinated names
"""
import json
import os
import re
from pathlib import Path
from collections import Counter, defaultdict
from urllib.parse import unquote
import unicodedata
def normalize_name(name: str) -> str:
    """Reduce a name to a lowercase, accent-free, space-separated form.

    The input is URL-decoded, stripped of combining marks, lowercased, trimmed
    of a trailing run of hyphens/underscores/digits, and its remaining
    hyphens/underscores and whitespace runs become single spaces. An empty
    input yields an empty string.
    """
    if not name:
        return ""

    decomposed = unicodedata.normalize('NFD', unquote(name))
    # Category 'Mn' is the combining marks left over after NFD decomposition.
    without_marks = ''.join(
        char for char in decomposed if unicodedata.category(char) != 'Mn'
    )
    lowered = without_marks.lower()
    without_tail = re.sub(r'[-_\d]+$', '', lowered)
    spaced = re.sub(r'[-_]+', ' ', without_tail)
    return ' '.join(spaced.split())
def extract_name_from_slug(slug: str) -> str:
    """Extract a human-readable name from a LinkedIn slug or entity filename.

    Args:
        slug: URL slug or entity filename, optionally URL-encoded and
            optionally ending in ``_<YYYYMMDD>T<HHMMSS>Z.json`` or ``.json``.

    Returns:
        The normalized name with extension, timestamp and trailing ID removed.
    """
    slug = unquote(slug)
    # Drop the .json extension together with an optional timestamp suffix like
    # _20251214T115050Z; the timestamp is optional so that files saved without
    # one do not leak ".json" into the extracted name.
    slug = re.sub(r'(?:_\d{8}T\d{6}Z)?\.json$', '', slug)
    # Remove trailing numbers/IDs (6+ hex characters, then plain digits)
    slug = re.sub(r'[-_][\da-f]{6,}$', '', slug)
    slug = re.sub(r'[-_]\d+$', '', slug)
    return normalize_name(slug)
def names_match(slug_name: str, profile_name: str) -> bool:
    """Decide whether a slug-derived name and a profile name plausibly agree.

    After normalization the names match when they are equal, when at least
    half of the distinct slug words (for slugs with two or more distinct
    words) also occur in the profile name, or when both start with the same
    word. Empty input never matches.
    """
    if not (slug_name and profile_name):
        return False

    slug_tokens = normalize_name(slug_name).split()
    profile_tokens = normalize_name(profile_name).split()

    # normalize_name output is already whitespace-collapsed, so comparing the
    # token lists is the same as comparing the normalized strings.
    if slug_tokens == profile_tokens:
        return True

    distinct_slug = set(slug_tokens)
    if len(distinct_slug) >= 2:
        shared = distinct_slug.intersection(profile_tokens)
        if 2 * len(shared) >= len(distinct_slug):
            return True

    # Last resort: identical first word.
    return bool(slug_tokens and profile_tokens and slug_tokens[0] == profile_tokens[0])
def _dict_section(record, key):
    """Return ``record[key]`` when both are dicts, otherwise an empty dict."""
    if not isinstance(record, dict):
        return {}
    section = record.get(key)
    return section if isinstance(section, dict) else {}


def analyze_entity_files(entity_dir: Path):
    """Analyze all ``*.json`` entity files in *entity_dir* for name mismatches.

    Args:
        entity_dir: Directory whose ``*.json`` files are inspected.

    Returns:
        Dict with ``total_files`` (files seen), ``fallback_files`` (named files
        whose extraction method is ``fallback_basic``), ``mismatches`` (one
        dict per file whose slug name does not match its profile name),
        ``name_counter`` (Counter of profile names) and ``files_by_name``
        (profile name -> filenames).
    """
    mismatches = []
    name_counter = Counter()
    files_by_name = defaultdict(list)
    total_files = 0
    fallback_files = 0

    # Sorted so reports are reproducible between runs.
    for filepath in sorted(entity_dir.glob("*.json")):
        total_files += 1
        filename = filepath.name
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (ValueError, OSError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            print(f"Error reading {filename}: {e}")
            continue

        # Get the profile name; null/non-dict sections are tolerated and an
        # empty profile_data name falls back to source_staff_info.
        profile_name = (
            _dict_section(data, 'profile_data').get('name')
            or _dict_section(data, 'source_staff_info').get('name')
        )
        if not profile_name or not isinstance(profile_name, str):
            continue

        # Track all names for frequency analysis
        name_counter[profile_name] += 1
        files_by_name[profile_name].append(filename)

        # Check if this is a fallback file
        extraction_metadata = _dict_section(data, 'extraction_metadata')
        extraction_method = extraction_metadata.get('extraction_method', '')
        if extraction_method == 'fallback_basic':
            fallback_files += 1

        slug_name = extract_name_from_slug(filename)
        if not names_match(slug_name, profile_name):
            mismatches.append({
                'filename': filename,
                'slug_name': slug_name,
                'profile_name': profile_name,
                'extraction_method': extraction_method,
                'linkedin_url': extraction_metadata.get('linkedin_url', ''),
            })

    return {
        'total_files': total_files,
        'fallback_files': fallback_files,
        'mismatches': mismatches,
        'name_counter': name_counter,
        'files_by_name': files_by_name,
    }
def main():
    """Run the mismatch analysis, print a summary and write CSV and JSON reports.

    The reports are written next to the entity directory as
    ``name_mismatch_report.csv`` and ``name_mismatch_report.json``.
    """
    import csv  # local import: only the CSV report below needs it

    entity_dir = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")

    print("=" * 80)
    print("LINKEDIN ENTITY NAME MISMATCH ANALYSIS")
    print("=" * 80)
    print()

    results = analyze_entity_files(entity_dir)
    mismatches = results['mismatches']

    # Group mismatches by profile_name once; used for both the per-name counts
    # and the pattern listing below.
    mismatch_by_name = defaultdict(list)
    for m in mismatches:
        mismatch_by_name[m['profile_name']].append(m)

    print(f"Total entity files analyzed: {results['total_files']}")
    print(f"Fallback (basic) files: {results['fallback_files']}")
    print(f"Total mismatches detected: {len(mismatches)}")
    print()

    # Find names that appear suspiciously often (potential filler names)
    print("=" * 80)
    print("NAMES APPEARING MORE THAN 5 TIMES (Potential Filler Names)")
    print("=" * 80)
    frequent_names = [(name, count) for name, count in results['name_counter'].most_common(50) if count > 5]
    for name, count in frequent_names:
        # .get() so the defaultdict does not grow empty entries that would
        # end up in the JSON report.
        mismatch_count = len(mismatch_by_name.get(name, ()))
        print(f" '{name}': {count} occurrences ({mismatch_count} are mismatches)")

    print()
    print("=" * 80)
    print("ALL MISMATCHED FILES (slug name != profile name)")
    print("=" * 80)

    # Sort by frequency of the mismatched name
    sorted_names = sorted(mismatch_by_name.items(), key=lambda x: -len(x[1]))
    for profile_name, items in sorted_names[:30]:  # Top 30 most frequent mismatch names
        print(f"\n--- '{profile_name}' assigned to {len(items)} different slugs ---")
        for item in items[:10]:  # Show first 10 examples
            print(f" Slug: {item['slug_name']}")
            print(f" File: {item['filename']}")
            print(f" Method: {item['extraction_method']}")
            print()

    # Output detailed CSV for further analysis. csv.writer escapes embedded
    # quotes, which the previous hand-built rows did not.
    csv_path = entity_dir.parent / "name_mismatch_report.csv"
    with open(csv_path, 'w', encoding='utf-8', newline='') as f:
        f.write("filename,slug_name,profile_name,extraction_method,linkedin_url\n")
        writer = csv.writer(f, quoting=csv.QUOTE_ALL, lineterminator='\n')
        for m in mismatches:
            writer.writerow([
                m["filename"],
                m["slug_name"],
                m["profile_name"],
                m["extraction_method"],
                m["linkedin_url"],
            ])
    print(f"\nDetailed report saved to: {csv_path}")

    # Also output JSON for programmatic use
    json_path = entity_dir.parent / "name_mismatch_report.json"
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump({
            'total_files': results['total_files'],
            'fallback_files': results['fallback_files'],
            'total_mismatches': len(mismatches),
            'mismatches_by_name': {name: len(items) for name, items in mismatch_by_name.items()},
            'frequent_names': [(name, count) for name, count in results['name_counter'].most_common(100)],
            'mismatches': mismatches,
        }, f, indent=2, ensure_ascii=False)
    print(f"JSON report saved to: {json_path}")


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,666 @@
#!/usr/bin/env python3
"""
Enrich Dutch custodian YAML files with web data using Crawl4AI (free, local).
This script replaces the Firecrawl-based enrichment with Crawl4AI which:
1. Runs locally using Playwright (no API costs)
2. Extracts links, metadata, and content with XPath provenance
3. Detects APIs, catalogs, and metadata standards
Usage:
python scripts/enrich_dutch_custodians_crawl4ai.py [options]
Options:
--dry-run Show what would be enriched without modifying files
--limit N Process only first N files (for testing)
--start-index N Start from index N (for resuming)
--resume Resume from last checkpoint
--force Re-enrich even if already has crawl4ai_enrichment
--file PATH Process a single specific file
"""
import argparse
import asyncio
import json
import logging
import os
import re
import sys
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse, urlunparse
import yaml
from dotenv import load_dotenv
from lxml import etree
# Crawl4AI imports
from crawl4ai import AsyncWebCrawler, CrawlerRunConfig, CacheMode
# Load environment variables from .env file
load_dotenv()
# Set up logging: INFO level, "timestamp - level - message" format.
# main() later attaches an additional per-run file handler to `logger`.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Configuration
# Custodian YAML files live in <parent of this script's directory>/data/custodian.
CUSTODIAN_DIR = Path(__file__).parent.parent / "data" / "custodian"
# JSON file written after each processed file in batch mode; read by --resume.
CHECKPOINT_FILE = CUSTODIAN_DIR / ".crawl4ai_enrichment_checkpoint.json"
# Rate limiting - be nice to websites even though we're local
REQUEST_DELAY = 2.0  # seconds slept between files in the batch loop
# Digital platform detection patterns
# Regexes searched (re.search) against lowercased link URLs; the first pattern
# that matches a URL is recorded as `pattern_matched` for that URL.
API_ENDPOINT_PATTERNS = [
    r'/oai[-_]?pmh',
    r'/api/',
    r'/rest/',
    r'/sparql',
    r'/graphql',
    r'/iiif/',
    r'/sru',
    r'/z39\.50',
    r'/opensearch',
]
# Regexes that mark a link as a catalog/search URL. Each one requires the path
# segment to be followed by '/', '?' or the end of the URL.
CATALOG_PATTERNS = [
    r'/catalogu[es]?(?:/|\?|$)',
    r'/collecti[eo]n?[s]?(?:/|\?|$)',
    r'/archie[fv](?:/|\?|$)',
    r'/beeldbank(?:/|\?|$)',
    r'/zoeken(?:/|\?|$)',
    r'/search(?:/|\?|$)',
    r'/discover(?:/|\?|$)',
    r'/browse(?:/|\?|$)',
]
# Dutch-specific catalog type detection
# Maps a type key to its URL regexes plus an English label and a Dutch
# description. detect_catalog_type() walks this dict in insertion order and
# returns the first type with a matching pattern, so earlier entries take
# precedence when a URL matches several types.
CATALOG_TYPE_PATTERNS = {
    'beeldbank': {
        'patterns': [r'/beeldbank', r'/beeld', r'/images', r'/foto'],
        'label': 'Image Collection',
        'description_nl': 'Beeldbank met gedigitaliseerde foto\'s, kaarten en afbeeldingen',
    },
    'genealogie': {
        'patterns': [r'/genealogie', r'/stamboom', r'/persons', r'/akten'],
        'label': 'Genealogy Records',
        'description_nl': 'Genealogische bronnen en persoonsgegevens',
    },
    'archieven': {
        'patterns': [r'/archie[fv]', r'/inventaris', r'/toegangen', r'/finding'],
        'label': 'Archive Finding Aids',
        'description_nl': 'Archiefinventarissen en toegangen',
    },
    'collectie': {
        'patterns': [r'/collectie', r'/collection', r'/object'],
        'label': 'Collection Portal',
        'description_nl': 'Collectieportaal met objecten en kunstwerken',
    },
    'kranten': {
        'patterns': [r'/kranten', r'/newspaper', r'/periodiek'],
        'label': 'Newspaper Archive',
        'description_nl': 'Gedigitaliseerde kranten en periodieken',
    },
    'kaarten': {
        'patterns': [r'/kaart', r'/map', r'/cartogra'],
        'label': 'Map Collection',
        'description_nl': 'Historische kaarten en cartografisch materiaal',
    },
    'bibliotheek': {
        'patterns': [r'/catalogu', r'/biblio', r'/library', r'/boek'],
        'label': 'Library Catalog',
        'description_nl': 'Bibliotheekcatalogus',
    },
    # Generic search interfaces come last, after the more specific types.
    'zoeken': {
        'patterns': [r'/zoeken', r'/search', r'/discover', r'/browse'],
        'label': 'Search Interface',
        'description_nl': 'Algemene zoekinterface',
    },
}
# CMS / collection-management-system name -> lowercase indicator strings.
# detect_cms() does a plain substring test of each indicator against the
# lowercased page content and returns the first system (in insertion order)
# with a hit.
# NOTE(review): short indicators such as 'atom' also match inside longer words
# because the test is a substring test - confirm that is acceptable.
CMS_INDICATORS = {
    'atlantis': ['atlantis', 'picturae'],
    'mais_flexis': ['mais-flexis', 'mais flexis', 'de ree'],
    'adlib': ['adlib', 'axiell'],
    'collective_access': ['collectiveaccess', 'collective access'],
    'archivematica': ['archivematica'],
    'archivesspace': ['archivesspace'],
    'atom': ['accesstomemory', 'atom'],
    'omeka': ['omeka'],
    'contentdm': ['contentdm'],
    'dspace': ['dspace'],
    'islandora': ['islandora'],
    'memorix': ['memorix'],
}
# Metadata standards detection patterns with regex word boundaries
# Each entry is (pattern, standard name, use_regex). detect_metadata_standards()
# applies regex entries with re.search(..., re.IGNORECASE) and non-regex entries
# as lowercase substring tests; every entry in this list currently sets
# use_regex to True. Several patterns map to the same standard name, and the
# detector reports each standard once.
METADATA_STANDARDS_PATTERNS = [
    (r'\bdublin\s+core\b', 'Dublin Core', True),
    (r'\bdc:', 'Dublin Core', True),
    (r'\bdcterms\b', 'Dublin Core', True),
    (r'\bmarc\s*21\b', 'MARC21', True),
    (r'\bmarc21\b', 'MARC21', True),
    (r'\bead\b', 'EAD', True),
    (r'encoded\s+archival\s+description', 'EAD', True),
    (r'\bead\s*2002\b', 'EAD', True),
    (r'\bead3\b', 'EAD', True),
    (r'\bmets\b', 'METS', True),
    (r'metadata\s+encoding\s+and\s+transmission', 'METS', True),
    (r'\bmods\b', 'MODS', True),
    (r'metadata\s+object\s+description', 'MODS', True),
    (r'\blido\b', 'LIDO', True),
    (r'lightweight\s+information\s+describing', 'LIDO', True),
    (r'\bcidoc[-\s]?crm\b', 'CIDOC-CRM', True),
    (r'\bschema\.org\b', 'Schema.org', True),
    (r'\bschema:', 'Schema.org', True),
    (r'\bric[-\s]?o\b', 'RiC-O', True),
    (r'records\s+in\s+contexts', 'RiC-O', True),
    (r'\bpremis\b', 'PREMIS', True),
    (r'preservation\s+metadata', 'PREMIS', True),
    (r'\bbibframe\b', 'BIBFRAME', True),
    (r'\biiif\b', 'IIIF', True),
    (r'image\s+interoperability\s+framework', 'IIIF', True),
]
# Dutch archive platform domains to detect
# analyze_crawl_results() tests each domain as a substring of the lowercased
# link URL and records the first domain that matches.
DUTCH_ARCHIVE_PLATFORMS = [
    'archieven.nl',
    'memorix.nl',
    'archiefweb.eu',
    'atlantisdigitaal.nl',
    'picturae.nl',
    'mais-flexis.nl',
    'delpher.nl',
    'geheugen.nl',
]
def get_xpath(element, tree) -> str:
    """Build an absolute XPath for *element* by walking up its parent chain.

    A step gets a 1-based ``[n]`` position unless the element is the only
    child of its parent carrying that tag; the root step is never indexed.
    *tree* is accepted but not used.
    """
    steps = []
    node = element
    while node is not None:
        container = node.getparent()
        step = node.tag
        if container is not None:
            same_tag = [child for child in container if child.tag == node.tag]
            if len(same_tag) != 1:
                step = f'{node.tag}[{same_tag.index(node) + 1}]'
        # Prepend so the list ends up ordered from root to element.
        steps.insert(0, step)
        node = container
    return '/' + '/'.join(steps)
def normalize_url(url: str) -> str:
    """Return *url* with noise query parameters dropped and the rest sorted.

    A parameter is dropped when its name starts with one of the noise
    prefixes (sorting/paging/session/tracking names). Surviving parameters
    are de-duplicated by name (last value wins), sorted by name, and written
    as ``name=value`` or bare ``name`` when the value is empty. URLs without
    a query string, and empty input, are returned unchanged.
    """
    if not url:
        return url

    pieces = urlparse(url)
    if not pieces.query:
        return url

    # Common tracking/session parameters, matched as name prefixes.
    noise_prefixes = (
        'sort', 'order', 'view', 'mode', 'ss', 'page', 'offset',
        'limit', 'random', 'session', 'sid', 'token', 'ref',
        'utm_', 'fbclid', 'gclid',
    )

    kept = {}
    for chunk in pieces.query.split('&'):
        key, _, value = chunk.partition('=')
        if not key.startswith(noise_prefixes):
            kept[key] = value

    rebuilt = '&'.join(
        f'{key}={kept[key]}' if kept[key] else key
        for key in sorted(kept)
    )
    return urlunparse(pieces._replace(query=rebuilt))
def detect_catalog_type(url: str) -> dict | None:
    """Classify a URL against CATALOG_TYPE_PATTERNS.

    Returns a dict with ``type``, ``label`` and ``description_nl`` for the
    first catalog type (in table order) that has a pattern matching the
    lowercased URL, or None when nothing matches.
    """
    lowered = url.lower()
    for key, spec in CATALOG_TYPE_PATTERNS.items():
        if any(re.search(regex, lowered) for regex in spec['patterns']):
            return {
                'type': key,
                'label': spec['label'],
                'description_nl': spec['description_nl'],
            }
    return None
def detect_metadata_standards(content: str) -> list[str]:
    """List the metadata standards mentioned in *content*, sorted by name.

    Every METADATA_STANDARDS_PATTERNS entry is tried against the lowercased
    content, as a case-insensitive regex search or as a substring test
    depending on the entry's flag. Empty content yields an empty list.
    """
    if not content:
        return []

    haystack = content.lower()

    def mentioned(pattern: str, as_regex: bool) -> bool:
        if as_regex:
            return re.search(pattern, haystack, re.IGNORECASE) is not None
        return pattern.lower() in haystack

    return sorted({
        standard
        for pattern, standard, as_regex in METADATA_STANDARDS_PATTERNS
        if mentioned(pattern, as_regex)
    })
def detect_cms(content: str) -> str | None:
    """Name the first CMS in CMS_INDICATORS with an indicator found in *content*.

    Indicators are tested as substrings of the lowercased content. Returns
    None for empty content or when no indicator occurs.
    """
    if not content:
        return None

    haystack = content.lower()
    return next(
        (
            cms
            for cms, markers in CMS_INDICATORS.items()
            if any(marker in haystack for marker in markers)
        ),
        None,
    )
def extract_website_url(entry: dict) -> str | None:
"""Extract website URL from custodian entry."""
# Check various possible locations for website
if 'website' in entry:
return entry['website']
# Check in enrichment data
for enrichment_key in ['zcbs_enrichment', 'google_maps_enrichment', 'wikidata_enrichment']:
if enrichment_key in entry:
enrichment = entry[enrichment_key]
if isinstance(enrichment, dict):
if 'website' in enrichment:
return enrichment['website']
if 'url' in enrichment:
return enrichment['url']
# Check identifiers
if 'identifiers' in entry:
for identifier in entry.get('identifiers', []):
if isinstance(identifier, dict):
if identifier.get('identifier_scheme') == 'Website':
return identifier.get('identifier_value')
return None
async def crawl_website(crawler: AsyncWebCrawler, url: str) -> dict:
    """
    Crawl a website and extract structured data with XPath provenance.

    Args:
        crawler: Open AsyncWebCrawler used to fetch the page.
        url: Page to crawl.

    Returns a dict with:
    - success: bool
    - status_code: crawl status code (absent when an exception was raised)
    - title: str
    - description: str
    - html: str (raw HTML for further processing)
    - markdown: str
    - links_with_xpath: list of dicts with href, text, xpath
    - internal_links / external_links: href lists reported by crawl4ai
    - metadata: dict of og/meta tags
    - error: str (only if failed)
    """
    config = CrawlerRunConfig(
        cache_mode=CacheMode.BYPASS,
        verbose=False,
        # Wait for page to fully load
        wait_until="networkidle",
        page_timeout=30000,
    )

    # Links that are not navigable targets.
    skipped_prefixes = ('javascript:', '#', 'mailto:', 'tel:')

    try:
        result = await crawler.arun(url=url, config=config)

        if not result.success:
            return {
                'success': False,
                'error': f'Crawl failed with status {result.status_code}',
                'status_code': result.status_code,
            }

        # Parse HTML with lxml to extract XPaths
        links_with_xpath = []
        if result.html:
            try:
                tree = etree.HTML(result.html)
                for link_el in tree.xpath('//a[@href]'):
                    href = link_el.get('href', '')
                    # Filter first so no text/XPath work is spent on links
                    # that are discarded anyway.
                    if not href or href.startswith(skipped_prefixes):
                        continue
                    text = ''.join(link_el.itertext()).strip()
                    links_with_xpath.append({
                        'href': href,
                        'text': text[:200] if text else '',  # Truncate long text
                        'xpath': get_xpath(link_el, tree),
                    })
            except Exception as e:
                # Best effort: keep whatever links were collected before the failure.
                logger.warning(f"Error parsing HTML for XPath extraction: {e}")

        # Also include crawl4ai's extracted links for completeness
        internal_links = result.links.get('internal', []) if result.links else []
        external_links = result.links.get('external', []) if result.links else []

        return {
            'success': True,
            'status_code': result.status_code,
            'title': result.metadata.get('title', '') if result.metadata else '',
            'description': result.metadata.get('description', '') if result.metadata else '',
            'html': result.html,
            'markdown': result.markdown.raw_markdown if result.markdown else '',
            'links_with_xpath': links_with_xpath,
            'internal_links': [item.get('href', '') for item in internal_links if isinstance(item, dict)],
            'external_links': [item.get('href', '') for item in external_links if isinstance(item, dict)],
            'metadata': result.metadata or {},
        }
    except Exception as e:
        logger.error(f"Error crawling {url}: {e}")
        return {
            'success': False,
            'error': str(e),
        }
def analyze_crawl_results(crawl_data: dict, base_url: str) -> dict:
    """
    Analyze crawl results to extract APIs, catalogs, and metadata standards.

    Args:
        crawl_data: Dict produced by crawl_website().
        base_url: URL that was crawled; relative hrefs are resolved against it.

    Returns:
        Enrichment dict ready to add to YAML. It always carries
        ``retrieval_timestamp``, ``retrieval_agent``, ``source_url`` and
        ``status_code``; a failed crawl adds only ``error``. On success it
        adds ``title``, ``description``, ``links_count`` and, when anything
        was found, ``detected_api_endpoints``, ``detected_catalog_urls``,
        ``external_archive_platforms``, ``detected_standards``,
        ``detected_cms`` and ``open_graph``.
    """
    from urllib.parse import urljoin  # not among this file's top-level imports

    enrichment = {
        'retrieval_timestamp': datetime.now(timezone.utc).isoformat(),
        'retrieval_agent': 'crawl4ai',
        'source_url': base_url,
        'status_code': crawl_data.get('status_code'),
    }

    if not crawl_data.get('success'):
        enrichment['error'] = crawl_data.get('error', 'Unknown error')
        return enrichment

    # Basic metadata
    links_with_xpath = crawl_data.get('links_with_xpath', [])
    enrichment['title'] = crawl_data.get('title', '')
    enrichment['description'] = crawl_data.get('description', '')
    enrichment['links_count'] = len(links_with_xpath)

    # Collect all URLs for analysis. Hrefs are resolved with urljoin so that
    # relative ("collectie/") and protocol-relative ("//host/x") links become
    # proper absolute URLs. The first link (document order) seen for each
    # resolved URL is remembered for XPath provenance.
    all_urls = set()
    link_by_url = {}
    for link in links_with_xpath:
        href = link.get('href', '')
        if not href:
            continue
        try:
            absolute = urljoin(base_url, href)
        except ValueError:
            # Malformed href (e.g. broken IPv6 literal): keep it as found.
            absolute = href
        all_urls.add(absolute)
        link_by_url.setdefault(absolute.rstrip('/'), link)

    # Add internal/external links from crawl4ai
    for key in ('internal_links', 'external_links'):
        all_urls.update(link for link in crawl_data.get(key, []) if link)

    # Sorted so the generated YAML is stable between runs (set order is not).
    ordered_urls = sorted(all_urls)

    # Detect API endpoints
    detected_apis = []
    for url in ordered_urls:
        url_lower = url.lower()
        for pattern in API_ENDPOINT_PATTERNS:
            if re.search(pattern, url_lower):
                detected_apis.append({
                    'url': normalize_url(url),
                    'pattern_matched': pattern,
                })
                break
    if detected_apis:
        enrichment['detected_api_endpoints'] = detected_apis

    # Detect catalog URLs with type classification
    detected_catalogs = []
    for url in ordered_urls:
        url_lower = url.lower()
        if not any(re.search(pattern, url_lower) for pattern in CATALOG_PATTERNS):
            continue
        catalog_entry = {
            'url': normalize_url(url),
        }
        catalog_type = detect_catalog_type(url)
        if catalog_type:
            catalog_entry['type'] = catalog_type['type']
            catalog_entry['label'] = catalog_type['label']
        # Find XPath for this link by exact resolved URL (ignoring a trailing
        # slash). A suffix comparison would let href="/" match every URL that
        # ends in a slash.
        source_link = link_by_url.get(url.rstrip('/'))
        if source_link is not None:
            catalog_entry['xpath'] = source_link.get('xpath')
            catalog_entry['link_text'] = source_link.get('text', '')
        detected_catalogs.append(catalog_entry)
    if detected_catalogs:
        enrichment['detected_catalog_urls'] = detected_catalogs

    # Detect external archive platforms
    external_platforms = []
    for url in ordered_urls:
        url_lower = url.lower()
        for platform in DUTCH_ARCHIVE_PLATFORMS:
            if platform in url_lower:
                external_platforms.append({
                    'url': normalize_url(url),
                    'platform': platform,
                })
                break
    if external_platforms:
        enrichment['external_archive_platforms'] = external_platforms

    # Detect metadata standards from content
    # Handle None values explicitly to avoid string concatenation errors
    markdown = crawl_data.get('markdown') or ''
    title = crawl_data.get('title') or ''
    description = crawl_data.get('description') or ''
    content = f"{markdown} {title} {description}"

    standards = detect_metadata_standards(content)
    if standards:
        enrichment['detected_standards'] = standards

    # Detect CMS
    cms = detect_cms(content)
    if cms:
        enrichment['detected_cms'] = cms

    # Extract OG/meta tags of interest
    metadata = crawl_data.get('metadata', {})
    og_data = {}
    for key in ['og:title', 'og:description', 'og:image', 'og:url', 'og:site_name']:
        if key in metadata:
            og_data[key.replace('og:', '')] = metadata[key]
    if og_data:
        enrichment['open_graph'] = og_data

    return enrichment
def load_checkpoint() -> dict:
    """Load the checkpoint dict from CHECKPOINT_FILE.

    Returns:
        The stored checkpoint, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object. An
        unusable checkpoint is logged and ignored so ``--resume`` falls back
        to the requested start index instead of crashing.
    """
    if not CHECKPOINT_FILE.exists():
        return {}
    try:
        with open(CHECKPOINT_FILE, 'r') as f:
            checkpoint = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError (e.g. a half-written file).
        logger.warning(f"Ignoring unreadable checkpoint {CHECKPOINT_FILE}: {e}")
        return {}
    return checkpoint if isinstance(checkpoint, dict) else {}
def save_checkpoint(checkpoint: dict):
    """Save *checkpoint* as JSON to CHECKPOINT_FILE.

    The data is written to a sibling ``.tmp`` file and then moved into place
    with os.replace, so an interruption mid-write cannot leave a truncated
    checkpoint behind.
    """
    tmp_path = CHECKPOINT_FILE.with_name(CHECKPOINT_FILE.name + '.tmp')
    with open(tmp_path, 'w') as f:
        json.dump(checkpoint, f, indent=2)
    os.replace(tmp_path, CHECKPOINT_FILE)
async def process_single_file(
    crawler: AsyncWebCrawler,
    filepath: Path,
    dry_run: bool = False,
    force: bool = False,
) -> bool:
    """Crawl the website of one custodian YAML file and store the enrichment.

    Args:
        crawler: Open AsyncWebCrawler shared across files.
        filepath: Custodian YAML file to enrich in place.
        dry_run: When True, only log what would be crawled.
        force: When True, re-crawl even if ``crawl4ai_enrichment`` exists.

    Returns:
        True when the file was enriched, was already enriched, or would be
        crawled in a dry run. False when the file is empty, has no website
        URL, the crawl failed, or an exception occurred. A failed crawl is
        still recorded in the file under ``crawl4ai_enrichment`` with its
        ``error``.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            entry = yaml.safe_load(f)

        if not entry:
            logger.warning(f"Empty file: {filepath}")
            return False

        # Check if already enriched
        if 'crawl4ai_enrichment' in entry and not force:
            logger.info(f"Skipping {filepath.name}: already has crawl4ai_enrichment")
            return True

        # Extract website URL
        website_url = extract_website_url(entry)
        if not website_url:
            logger.info(f"Skipping {filepath.name}: no website URL found")
            return False

        # Ensure URL has protocol
        if not website_url.startswith(('http://', 'https://')):
            website_url = 'https://' + website_url

        logger.info(f"Processing {filepath.name}: {website_url}")

        if dry_run:
            logger.info(f" -> DRY RUN: would crawl {website_url}")
            return True

        # Crawl the website and analyze results
        crawl_data = await crawl_website(crawler, website_url)
        enrichment = analyze_crawl_results(crawl_data, website_url)
        entry['crawl4ai_enrichment'] = enrichment

        # Serialize before opening for writing: mode 'w' truncates the file,
        # so a dump failure must not happen after that point.
        serialized = yaml.dump(entry, default_flow_style=False, allow_unicode=True, sort_keys=False)
        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(serialized)

        # analyze_crawl_results sets 'error' only for unsuccessful crawls;
        # report those as failures instead of logging them as success.
        if 'error' in enrichment:
            logger.warning(f" -> crawl failed: {enrichment['error']}")
            return False

        # Log summary
        apis_count = len(enrichment.get('detected_api_endpoints', []))
        catalogs_count = len(enrichment.get('detected_catalog_urls', []))
        platforms_count = len(enrichment.get('external_archive_platforms', []))
        logger.info(f" -> success: {apis_count} APIs, {catalogs_count} catalogs, {platforms_count} external platforms found")
        return True
    except Exception as e:
        logger.error(f"Error processing {filepath}: {e}")
        return False
async def main():
    """Parse command-line options and enrich one file or a batch of NL-*.yaml files.

    Single-file mode (``--file``) exits with status 0 or 1 depending on the
    result. Batch mode processes the sorted ``NL-*.yaml`` files in
    CUSTODIAN_DIR from the start index (or the checkpoint with ``--resume``),
    saves a checkpoint after every file unless ``--dry-run`` is set, and
    pauses REQUEST_DELAY seconds between files.
    """
    parser = argparse.ArgumentParser(description='Enrich Dutch custodians with Crawl4AI')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be enriched')
    parser.add_argument('--limit', type=int, help='Process only first N files')
    parser.add_argument('--start-index', type=int, default=0, help='Start from index N')
    parser.add_argument('--resume', action='store_true', help='Resume from last checkpoint')
    parser.add_argument('--force', action='store_true', help='Re-enrich even if already enriched')
    parser.add_argument('--file', type=str, help='Process a single specific file')
    args = parser.parse_args()

    # Create logs directory
    logs_dir = Path(__file__).parent.parent / "logs"
    logs_dir.mkdir(exist_ok=True)

    # Add file handler for logging (UTF-8: file names and URLs may be non-ASCII)
    log_file = logs_dir / f"crawl4ai_enrichment_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    file_handler = logging.FileHandler(log_file, encoding='utf-8')
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
    logger.addHandler(file_handler)

    # Single file mode
    if args.file:
        filepath = Path(args.file)
        if not filepath.exists():
            logger.error(f"File not found: {filepath}")
            sys.exit(1)
        async with AsyncWebCrawler() as crawler:
            success = await process_single_file(crawler, filepath, args.dry_run, args.force)
        sys.exit(0 if success else 1)

    # Batch mode
    files = sorted(CUSTODIAN_DIR.glob("NL-*.yaml"))
    logger.info(f"Found {len(files)} Dutch custodian files")

    # Handle resume
    start_index = args.start_index
    if args.resume:
        checkpoint = load_checkpoint()
        if 'last_processed_index' in checkpoint:
            start_index = checkpoint['last_processed_index'] + 1
            logger.info(f"Resuming from index {start_index}")

    # Apply limit
    end_index = len(files)
    if args.limit:
        end_index = min(start_index + args.limit, len(files))

    logger.info(f"Processing files {start_index} to {end_index - 1}")

    # Process files
    success_count = 0
    error_count = 0
    async with AsyncWebCrawler() as crawler:
        for i, filepath in enumerate(files[start_index:end_index], start=start_index):
            logger.info(f"[{i + 1}/{len(files)}] Processing {filepath.name}")
            success = await process_single_file(crawler, filepath, args.dry_run, args.force)
            if success:
                success_count += 1
            else:
                error_count += 1

            # Save checkpoint
            if not args.dry_run:
                save_checkpoint({
                    'last_processed_index': i,
                    'last_processed_file': str(filepath),
                    'last_processed_time': datetime.now(timezone.utc).isoformat(),
                    'success_count': success_count,
                    'error_count': error_count,
                })

            # Rate limiting. A dry run never issues a request, so pausing
            # there would only slow the listing down.
            if not args.dry_run and i < end_index - 1:
                await asyncio.sleep(REQUEST_DELAY)

    # Summary
    logger.info(f"\n{'='*50}")
    logger.info("Enrichment complete!")
    logger.info(f" Success: {success_count}")
    logger.info(f" Errors: {error_count}")
    logger.info(f" Log file: {log_file}")


if __name__ == '__main__':
    asyncio.run(main())

View file

@ -0,0 +1,281 @@
#!/usr/bin/env python3
"""
Fix GHCID collision victim files.
These files have a trailing dash in their filename (e.g., NL-DR-ASS-L-BD-.yaml)
indicating they were collision victims whose internal GHCID was incorrectly set
to their collision partner's GHCID instead of getting their own unique GHCID.
This script:
1. Reads the institution's real name from original_entry.organisatie
2. Generates a proper name suffix from that name
3. Creates a new unique GHCID with the proper suffix
4. Regenerates all GHCID-derived identifiers (UUID, numeric)
5. Updates the file with correct identifiers
6. Renames the file to match the new GHCID
"""
import hashlib
import re
import shutil
import unicodedata
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import yaml
# GHCID namespace for UUID generation
# NOTE(review): scripts/fix_ghcid_type.py builds its UUID v5 values from the DNS
# namespace and the bare GHCID string, while this script uses the URL namespace
# plus GHCID_URL_PREFIX.  The two schemes give different UUIDs for the same
# GHCID - confirm which one the project spec requires.
GHCID_NAMESPACE = uuid.NAMESPACE_URL
GHCID_URL_PREFIX = "https://glam.registry/"
# Skip words for abbreviation generation (Dutch and common)
# NOTE(review): 'of' and 'in' are listed twice (harmless in a set literal), and
# no function visible in this script reads SKIP_WORDS - confirm it is still needed.
SKIP_WORDS = {
    'de', 'het', 'een', 'van', 'voor', 'in', 'op', 'te', 'den', 'der', 'des',
    's', 'aan', 'bij', 'met', 'naar', 'om', 'tot', 'uit', 'over', 'onder',
    'door', 'en', 'of', 'stichting', 'vereniging', 'foundation', 'the', 'a',
    'an', 'of', 'in', 'at', 'on', 'to', 'for', 'with', 'from', 'by', 'as',
    'museum', 'bibliotheek', 'archief', 'collectie'
}
def normalize_diacritics(text: str) -> str:
    """Strip combining marks from *text*.

    The text is decomposed with NFD and every character whose Unicode category
    is ``Mn`` (non-spacing mark) is dropped.  Letters that have no canonical
    decomposition are returned unchanged.
    """
    decomposed = unicodedata.normalize('NFD', text)
    kept = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
    return ''.join(kept)
def generate_name_suffix(native_name: str) -> str:
    """Convert native language institution name to snake_case suffix.

    Examples:
        "Biblionet Drenthe POI" -> "biblionet_drenthe_poi"
        "Fries Verzetsmuseum" -> "fries_verzetsmuseum"
        "Musée d'Orsay" -> "musee_dorsay"
    """
    # Diacritics are removed and the text lower-cased before any substitution.
    suffix = normalize_diacritics(native_name).lower()

    # Applied strictly in this order:
    #   1. drop apostrophes, commas and other punctuation
    #   2. turn runs of whitespace/hyphens into a single underscore
    #   3. drop anything that is not a-z, 0-9 or underscore
    #   4. collapse repeated underscores
    substitutions = (
        (r"[''`\",.:;!?()[\]{}]", ''),
        (r'[\s\-]+', '_'),
        (r'[^a-z0-9_]', ''),
        (r'_+', '_'),
    )
    for pattern, replacement in substitutions:
        suffix = re.sub(pattern, replacement, suffix)

    # Leading/trailing underscores would produce malformed identifiers.
    return suffix.strip('_')
def generate_ghcid_uuid(ghcid: str) -> str:
    """Return the UUID v5 string for *ghcid*.

    The hashed name is ``GHCID_URL_PREFIX`` followed by the GHCID, within the
    ``GHCID_NAMESPACE`` namespace.
    """
    name = f"{GHCID_URL_PREFIX}{ghcid}"
    derived = uuid.uuid5(GHCID_NAMESPACE, name)
    return str(derived)
def generate_ghcid_uuid_sha256(ghcid: str) -> str:
    """Return the secondary ("sha256") UUID string for *ghcid*.

    Despite the name this is produced with ``uuid.uuid5`` over
    ``GHCID_URL_PREFIX + "sha256/" + ghcid``, so the result is a version-5
    UUID, not a UUID v8 built from a SHA-256 digest.

    NOTE(review): scripts/fix_ghcid_type.py computes ``ghcid_uuid_sha256`` as a
    true SHA-256-based UUID v8 - confirm which derivation is intended.
    """
    name = f"{GHCID_URL_PREFIX}sha256/{ghcid}"
    derived = uuid.uuid5(GHCID_NAMESPACE, name)
    return str(derived)
def generate_ghcid_numeric(ghcid: str) -> int:
    """Return the 64-bit numeric ID for *ghcid*.

    The value is the first 8 bytes of the SHA-256 digest of the encoded GHCID,
    read as an unsigned big-endian integer.
    """
    digest = hashlib.sha256(ghcid.encode()).digest()
    return int.from_bytes(digest[:8], byteorder='big')
def fix_collision_victim(file_path: Path, dry_run: bool = False) -> Optional[Path]:
    """Fix a single collision victim file.

    Builds a new GHCID from the filename stem (trailing dashes removed) plus a
    suffix derived from ``original_entry.organisatie``, regenerates the derived
    identifiers, rewrites the YAML and renames the file to ``<new GHCID>.yaml``.
    The rename target is checked *before* anything is written, so a refused
    rename never leaves new content under the old filename.

    Args:
        file_path: Path to the collision victim YAML file
        dry_run: If True, only print what would be done

    Returns:
        New file path after renaming, or None if skipped/failed
    """
    print(f"\n{'='*80}")
    print(f"Processing: {file_path.name}")
    print(f"{'='*80}")

    # Read file.  The encoding is explicit because the data holds non-ASCII
    # names and the platform default encoding is not guaranteed to be UTF-8.
    try:
        with open(file_path, encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
        print(f" ERROR: Could not read file: {e}")
        return None

    if not isinstance(data, dict):
        print(" SKIP: File is empty or invalid")
        return None

    # Get institution name (the section may be absent or null in the YAML)
    original_entry = data.get('original_entry')
    org_name = original_entry.get('organisatie') if isinstance(original_entry, dict) else None
    if not org_name:
        print(" ERROR: No organisatie found in original_entry")
        return None
    print(f" Institution: {org_name}")

    # Get current GHCID info
    ghcid_data = data.get('ghcid')
    if not isinstance(ghcid_data, dict):
        ghcid_data = {}
    old_ghcid = ghcid_data.get('ghcid_current', '')
    print(f" Old GHCID: {old_ghcid}")

    # Extract base GHCID from filename (remove trailing dash)
    base_ghcid = file_path.stem.rstrip('-')
    print(f" Base GHCID: {base_ghcid}")

    # Generate new name suffix from institution name
    name_suffix = generate_name_suffix(str(org_name))
    print(f" Name suffix: {name_suffix}")
    if not name_suffix:
        # An empty suffix would recreate the trailing-dash GHCID being fixed.
        print(" ERROR: Institution name yields an empty suffix; cannot build a unique GHCID")
        return None

    # Create new GHCID
    new_ghcid = f"{base_ghcid}-{name_suffix}"
    print(f" New GHCID: {new_ghcid}")

    new_file_path = file_path.parent / f"{new_ghcid}.yaml"
    needs_rename = new_file_path.name != file_path.name

    # Check if this would be the same as old (only filename is wrong)
    if new_ghcid == old_ghcid:
        if not needs_rename:
            print(" SKIP: GHCID and filename both correct")
            return None
        print(" GHCID correct, but filename wrong - needs rename only")
        if dry_run:
            print(f" DRY RUN: Would rename to {new_file_path.name}")
            return None
        if new_file_path.exists():
            print(f" ERROR: Target file already exists: {new_file_path.name}")
            return None
        shutil.move(str(file_path), str(new_file_path))
        print(f" Renamed: {file_path.name} -> {new_file_path.name}")
        return new_file_path

    # Generate new identifiers
    new_uuid = generate_ghcid_uuid(new_ghcid)
    new_uuid_sha256 = generate_ghcid_uuid_sha256(new_ghcid)
    new_numeric = generate_ghcid_numeric(new_ghcid)
    print(f" New UUID: {new_uuid}")
    print(f" New numeric: {new_numeric}")

    # Refuse up front when the rename cannot happen; writing first would leave
    # the new GHCID stored under the old (trailing-dash) filename.
    if needs_rename and new_file_path.exists():
        print(f" ERROR: Target file already exists: {new_file_path.name}")
        return None

    if dry_run:
        print(f" DRY RUN: Would update file and rename to {new_ghcid}.yaml")
        return None

    # Update GHCID section
    timestamp = datetime.now(timezone.utc).isoformat()

    # Preserve old GHCID in history (tolerate a missing or null history list)
    ghcid_history = list(ghcid_data.get('ghcid_history') or [])
    ghcid_history.append({
        'ghcid': old_ghcid,
        'ghcid_uuid': ghcid_data.get('ghcid_uuid', ''),
        'ghcid_numeric': ghcid_data.get('ghcid_numeric', 0),
        'valid_from': ghcid_data.get('generated_at', ''),
        'valid_to': timestamp,
        'reason': f"Collision fix: had partner's GHCID, corrected to institution's own GHCID based on name '{org_name}'"
    })

    # The section is replaced wholesale: its previous values belonged to the
    # collision partner and survive only through the history entry above.
    data['ghcid'] = {
        'ghcid_current': new_ghcid,
        'ghcid_uuid': new_uuid,
        'ghcid_uuid_sha256': new_uuid_sha256,
        'ghcid_numeric': new_numeric,
        'generated_at': timestamp,
        'ghcid_history': ghcid_history
    }

    # Update identifiers list
    updated_identifiers = []
    for ident in data.get('identifiers') or []:
        if isinstance(ident, dict):
            scheme = ident.get('identifier_scheme', '')
            if scheme == 'GHCID':
                ident['identifier_value'] = new_ghcid
                ident['identifier_url'] = f"https://w3id.org/heritage/custodian/{new_ghcid}"
            elif scheme == 'GHCID_UUID':
                ident['identifier_value'] = new_uuid
            elif scheme == 'GHCID_UUID_SHA256':
                # Keep the list entry in step with ghcid.ghcid_uuid_sha256.
                ident['identifier_value'] = new_uuid_sha256
            elif scheme == 'GHCID_NUMERIC':
                ident['identifier_value'] = str(new_numeric)
        updated_identifiers.append(ident)
    data['identifiers'] = updated_identifiers

    # Write updated data back to file
    with open(file_path, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    print(" Updated file content")

    # Rename file to match new GHCID
    if needs_rename:
        shutil.move(str(file_path), str(new_file_path))
        print(f" Renamed: {file_path.name} -> {new_file_path.name}")
    return new_file_path
def main():
    """Fix every collision victim file, or only the one named by ``--file``.

    Without ``--file`` the candidates are the ``NL-*-.yaml`` files (trailing
    dash) in ``data/custodian`` relative to the current working directory.
    Prints a per-file report followed by a summary of fixed, skipped and
    missing/empty files.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Fix GHCID collision victim files')
    parser.add_argument('--dry-run', action='store_true', help='Only show what would be done')
    parser.add_argument('--file', type=str, help='Process only this specific file')
    args = parser.parse_args()

    custodian_dir = Path('data/custodian')
    if args.file:
        files = [Path(args.file)]
    else:
        # Find all collision victim files (trailing dash pattern)
        files = sorted(custodian_dir.glob('NL-*-.yaml'))
    print(f"Found {len(files)} collision victim file(s)")

    fixed = 0
    skipped = 0
    errors = 0
    for f in files:
        result = fix_collision_victim(f, dry_run=args.dry_run)
        if result is not None:
            fixed += 1
        elif not f.exists():
            # e.g. a mistyped --file path: the read error has already been
            # reported, and stat() on the missing path would raise here.
            errors += 1
        elif f.stat().st_size == 0:
            print(f"\n EMPTY FILE: {f.name} - should be deleted")
            errors += 1
        else:
            skipped += 1

    print(f"\n{'='*80}")
    print("SUMMARY")
    print(f"{'='*80}")
    print(f" Fixed: {fixed}")
    print(f" Skipped: {skipped}")
    print(f" Errors/Empty: {errors}")


if __name__ == '__main__':
    main()

View file

@ -0,0 +1,140 @@
#!/usr/bin/env python3
"""
Fix generic platform names ('Home Website', 'Homepage Website') by using
the organisatie field from original_entry.
Also filters invalid platform types (ONLINEMARKETING, ONLINEBRANDING).
"""
import yaml
import os
import sys
from pathlib import Path
from datetime import datetime, timezone
# Custom YAML representer to preserve formatting
def str_representer(dumper, data):
    """Represent *data* as a YAML string, using block style for multi-line text.

    Strings containing a newline are emitted with the literal ``|`` style;
    everything else goes through the dumper's normal scalar representation.
    """
    str_tag = 'tag:yaml.org,2002:str'
    if '\n' not in data:
        return dumper.represent_scalar(str_tag, data)
    return dumper.represent_scalar(str_tag, data, style='|')


yaml.add_representer(str, str_representer)
# Placeholder platform names that fix_file() replaces with "<organisation> Website".
GENERIC_NAMES = {'Home Website', 'Homepage Website', 'Welkom Website'}
# platform_type values that fix_file() removes from a platform's type list.
INVALID_TYPES = {'ONLINEMARKETING', 'ONLINEBRANDING', 'ONLINEWEBSITE', 'ONLINE'}
def fix_file(filepath: Path, dry_run: bool = False) -> dict:
    """Fix a single file. Returns stats dict.

    Two repairs are applied to the ``digital_platform_v2`` section:

    1. A generic ``platform_name`` becomes ``"<organisation> Website"``, where
       the organisation name is the first non-empty value among
       ``original_entry.organisatie``,
       ``museum_register_enrichment.museum_name`` and
       ``wikidata_enrichment.wikidata_label_nl``.
    2. Invalid entries are removed from a ``platform_type`` list; an emptied
       list becomes ``['INSTITUTIONAL_WEBSITE']``.

    Args:
        filepath: Custodian YAML file to inspect.
        dry_run: If True, compute the stats but do not write the file.

    Returns:
        Dict with ``name_fixed``, ``types_fixed``, ``old_name``, ``new_name``
        and ``removed_types``.
    """
    stats = {
        'name_fixed': False,
        'types_fixed': False,
        'old_name': None,
        'new_name': None,
        'removed_types': []
    }
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    # Nothing to do for empty/non-mapping files or a missing/null section.
    if not isinstance(data, dict):
        return stats
    dpv2 = data.get('digital_platform_v2')
    if not isinstance(dpv2, dict):
        return stats

    modified = False
    name_source = None

    # Fix 1: Generic platform names
    current_name = dpv2.get('platform_name', '')
    if isinstance(current_name, str) and current_name in GENERIC_NAMES:
        # (section, field, provenance label), tried in priority order.  An
        # empty or null value falls through to the next source.
        candidates = (
            ('original_entry', 'organisatie', 'organisatie_field'),
            ('museum_register_enrichment', 'museum_name', 'museum_name_field'),
            ('wikidata_enrichment', 'wikidata_label_nl', 'wikidata_label_nl_field'),
        )
        org_name = None
        for section_key, field, label in candidates:
            section = data.get(section_key)
            value = section.get(field) if isinstance(section, dict) else None
            if value:
                org_name, name_source = value, label
                break

        if org_name:
            new_name = f"{org_name} Website"
            stats['old_name'] = current_name
            stats['new_name'] = new_name
            stats['name_fixed'] = True
            dpv2['platform_name'] = new_name
            modified = True

    # Fix 2: Invalid platform types
    if 'platform_type' in dpv2 and isinstance(dpv2['platform_type'], list):
        original_types = dpv2['platform_type'].copy()
        filtered_types = [t for t in original_types if t not in INVALID_TYPES]
        if len(filtered_types) < len(original_types):
            stats['removed_types'] = [t for t in original_types if t in INVALID_TYPES]
            stats['types_fixed'] = True
            dpv2['platform_type'] = filtered_types if filtered_types else ['INSTITUTIONAL_WEBSITE']
            modified = True

    # Add fix metadata
    if modified:
        metadata = dpv2.get('_transformation_metadata')
        if not isinstance(metadata, dict):
            metadata = {}
            dpv2['_transformation_metadata'] = metadata
        metadata['quality_fix_date'] = datetime.now(timezone.utc).isoformat()
        if stats['name_fixed']:
            # Record where the name really came from, not always 'organisatie'.
            metadata['name_source'] = name_source

        if not dry_run:
            with open(filepath, 'w', encoding='utf-8') as f:
                yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return stats
def main():
    """Scan the custodian directory and report or apply platform fixes.

    Runs ``fix_file`` on every ``NL-*.yaml`` file under ``--path``, prints one
    entry per changed file and a final summary.  With ``--dry-run`` nothing is
    written.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Fix generic platform names')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be changed without modifying files')
    parser.add_argument('--path', default='/Users/kempersc/apps/glam/data/custodian', help='Path to custodian files')
    args = parser.parse_args()

    custodian_path = Path(args.path)
    # The default is a machine-specific absolute path; without this check a
    # wrong path would look like a clean run with zero files.
    if not custodian_path.is_dir():
        parser.error(f"Custodian path is not a directory: {custodian_path}")

    files_fixed_names = 0
    files_fixed_types = 0
    files_changed = 0
    prefix = '[DRY RUN] ' if args.dry_run else ''

    print(f"{prefix}Scanning {custodian_path}...")
    print()

    for filepath in sorted(custodian_path.glob('NL-*.yaml')):
        stats = fix_file(filepath, dry_run=args.dry_run)
        if not (stats['name_fixed'] or stats['types_fixed']):
            continue
        files_changed += 1
        if stats['name_fixed']:
            files_fixed_names += 1
            print(f"{filepath.name}")
            print(f" Name: '{stats['old_name']}' -> '{stats['new_name']}'")
        if stats['types_fixed']:
            files_fixed_types += 1
            print(f" Removed types: {stats['removed_types']}")

    print()
    print("=" * 60)
    print(f"{prefix}Summary:")
    print(f" Files with name fixed: {files_fixed_names}")
    print(f" Files with types fixed: {files_fixed_types}")
    print(f" Total files modified: {files_changed}")
    if args.dry_run:
        print()
        print("Run without --dry-run to apply changes.")


if __name__ == '__main__':
    main()

View file

@ -0,0 +1,97 @@
#!/usr/bin/env python3
"""
Fast fix for generic platform names - processes only files from stdin or file list.
"""
import yaml
import sys
from pathlib import Path
from datetime import datetime, timezone
# Placeholder platform names that fix_file() replaces with "<organisation> Website".
GENERIC_NAMES = {'Home Website', 'Homepage Website', 'Welkom Website'}
# platform_type values that fix_file() removes from a platform's type list.
INVALID_TYPES = {'ONLINEMARKETING', 'ONLINEBRANDING', 'ONLINEWEBSITE', 'ONLINE'}
def fix_file(filepath: Path, dry_run: bool = False) -> dict:
    """Fix a single file.

    Replaces a generic ``digital_platform_v2.platform_name`` with
    ``"<organisation> Website"`` and removes invalid entries from a
    ``platform_type`` list (an emptied list becomes
    ``['INSTITUTIONAL_WEBSITE']``).  The file is rewritten only when something
    changed and *dry_run* is False.

    Returns:
        Dict with ``name_fixed``, ``types_fixed``, ``old_name``, ``new_name``
        and ``removed_types``.
    """
    stats = {'name_fixed': False, 'types_fixed': False, 'old_name': None, 'new_name': None, 'removed_types': []}
    with open(filepath, 'r', encoding='utf-8') as f:
        content = f.read()
    data = yaml.safe_load(content)

    # Sections can be present but null (or not a mapping) in hand-edited YAML;
    # treat those like a missing section instead of crashing mid-batch.
    if not isinstance(data, dict):
        return stats
    dpv2 = data.get('digital_platform_v2')
    if not isinstance(dpv2, dict):
        return stats
    modified = False

    # Fix generic names
    current_name = dpv2.get('platform_name', '')
    if isinstance(current_name, str) and current_name in GENERIC_NAMES:
        # First non-empty value wins, in priority order.
        sources = (
            ('original_entry', 'organisatie'),
            ('museum_register_enrichment', 'museum_name'),
            ('wikidata_enrichment', 'wikidata_label_nl'),
        )
        org_name = None
        for section_key, field in sources:
            section = data.get(section_key)
            value = section.get(field) if isinstance(section, dict) else None
            if value:
                org_name = value
                break

        if org_name:
            new_name = f"{org_name} Website"
            stats['old_name'] = current_name
            stats['new_name'] = new_name
            stats['name_fixed'] = True
            dpv2['platform_name'] = new_name
            modified = True

    # Fix invalid types
    if 'platform_type' in dpv2 and isinstance(dpv2['platform_type'], list):
        original_types = dpv2['platform_type'].copy()
        filtered_types = [t for t in original_types if t not in INVALID_TYPES]
        if len(filtered_types) < len(original_types):
            stats['removed_types'] = [t for t in original_types if t in INVALID_TYPES]
            stats['types_fixed'] = True
            dpv2['platform_type'] = filtered_types if filtered_types else ['INSTITUTIONAL_WEBSITE']
            modified = True

    if modified:
        metadata = dpv2.get('_transformation_metadata')
        if not isinstance(metadata, dict):
            metadata = {}
            dpv2['_transformation_metadata'] = metadata
        metadata['quality_fix_date'] = datetime.now(timezone.utc).isoformat()

        if not dry_run:
            with open(filepath, 'w', encoding='utf-8') as f:
                yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)

    return stats
def main():
    """Run ``fix_file`` over paths read from a list file or from stdin.

    Usage: ``script.py [--dry-run] [FILE_LIST]``.  The first argument that does
    not start with ``--`` names a text file with one path per line; without it
    the paths are read from stdin.  Flags may come before or after the file
    list.
    """
    cli_args = sys.argv[1:]
    dry_run = '--dry-run' in cli_args

    # Take the first non-flag argument wherever it sits, so that
    # "--dry-run files.txt" does not fall through to a blocking stdin read.
    positional = [arg for arg in cli_args if not arg.startswith('--')]
    file_list = positional[0] if positional else None

    if file_list:
        with open(file_list, encoding='utf-8') as f:
            files = [Path(line.strip()) for line in f if line.strip()]
    else:
        files = [Path(line.strip()) for line in sys.stdin if line.strip()]

    fixed_names = 0
    fixed_types = 0
    for filepath in files:
        if not filepath.exists():
            # Report on stderr so a stale file list is noticed while stdout
            # stays limited to the fix report.
            print(f"Skipping missing file: {filepath}", file=sys.stderr)
            continue
        stats = fix_file(filepath, dry_run=dry_run)
        if stats['name_fixed']:
            fixed_names += 1
            print(f"{filepath.name}: '{stats['old_name']}' -> '{stats['new_name']}'")
        if stats['types_fixed']:
            fixed_types += 1
            print(f" Removed: {stats['removed_types']}")

    print(f"\n{'[DRY RUN] ' if dry_run else ''}Fixed: {fixed_names} names, {fixed_types} type lists")


if __name__ == '__main__':
    main()

523
scripts/fix_ghcid_type.py Normal file
View file

@ -0,0 +1,523 @@
#!/usr/bin/env python3
"""
Fix GHCID type codes in Dutch custodian files.
This script corrects GHCID type codes (position 4) for files where the
type was incorrectly assigned. Common corrections:
- U→M: Unknown should be Museum
- U→I: Unknown should be Intangible Heritage
- U→T: Unknown should be Taste/Smell Heritage
- X→I: Mixed should be Intangible Heritage (single type)
Usage:
# Dry run (preview changes)
python scripts/fix_ghcid_type.py --dry-run
# Apply fixes
python scripts/fix_ghcid_type.py
# Process specific correction type only
python scripts/fix_ghcid_type.py --correction U-to-I --dry-run
python scripts/fix_ghcid_type.py --correction U-to-M
# Process a single file
python scripts/fix_ghcid_type.py --file data/custodian/NL-DR-FRE-U-FCFE.yaml --new-type I
Author: GLAM Data Quality Team
Date: 2025-12-14
"""
import argparse
import hashlib
import shutil
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import yaml
# GHCID namespace for UUID v5 generation (same as DNS namespace per project spec)
# The literal below is the value of uuid.NAMESPACE_DNS.
GHCID_NAMESPACE = uuid.UUID("6ba7b810-9dad-11d1-80b4-00c04fd430c8")
# Type code corrections: filename pattern -> new type code
# These are determined by analyzing original_entry.type_organisatie in each file
#
# Current U-type breakdown (173 files):
# - 143 files: type_organisatie: museum → should be M
# - 14 files: type_organisatie: intangible_heritage_custodian → should be I
# - 7 files: type_organisatie: unknown → keep as U (correct)
# NOTE(review): the three counts above add up to 164, not 173 - confirm the breakdown.
#
# Current X-type files (2 files):
# - Both are intangible_heritage_custodian → should be I (single type, not mixed)
#
# Each entry maps a correction key (also the value accepted by --correction) to:
#   files    - filenames, resolved against the custodian directory
#   old_type - type code expected at GHCID position 4 before the fix
#   new_type - type code written in its place
#   reason   - text stored in the new ghcid_history entry
TYPE_CORRECTIONS = {
    # U→I: Intangible heritage custodians incorrectly marked as Unknown (14 files)
    "U-to-I": {
        "files": [
            "NL-DR-FRE-U-FCFE.yaml",
            "NL-GE-TIE-U-BO.yaml",
            "NL-LI-VAL-U-C.yaml",
            "NL-NH-AMS-U-C.yaml",
            "NL-NH-ASS-U-HA.yaml",
            "NL-NH-SAN-U-HSO.yaml",
            "NL-OV-GEN-U-GB.yaml",
            "NL-OV-GEN-U-GMS.yaml",
            "NL-OV-OMM-U-EO.yaml",
            "NL-OV-SAA-U-BS.yaml",
            "NL-ZH-BOD-U-GB.yaml",
            "NL-ZH-GOU-U-BI.yaml",
            "NL-ZH-HIL-U-HHO.yaml",
            "NL-ZH-LIS-U-HLO.yaml",
        ],
        "old_type": "U",
        "new_type": "I",
        "reason": "Type corrected: intangible_heritage_custodian should use type I (Intangible Heritage), not U (Unknown)",
    },
    # X→I: Mixed type should be Intangible (single primary type) (2 files)
    "X-to-I": {
        "files": [
            "NL-OV-KAL-X-BW.yaml",
            "NL-GE-HAT-X-IGR.yaml",
        ],
        "old_type": "X",
        "new_type": "I",
        "reason": "Type corrected: intangible_heritage_custodian should use type I (Intangible Heritage), not X (Mixed)",
    },
    # U→M: Museums incorrectly marked as Unknown (143 files)
    # Use --auto-detect-museums flag to populate this list dynamically
    "U-to-M": {
        "files": [
            # Auto-detected by checking original_entry.type_organisatie == "museum"
            # Run with: python scripts/fix_ghcid_type.py --auto-detect-museums --dry-run
        ],
        "old_type": "U",
        "new_type": "M",
        "reason": "Type corrected: museum should use type M (Museum), not U (Unknown)",
    },
}
def generate_uuid_v5(ghcid_string: str) -> str:
    """Return the deterministic UUID v5 string for *ghcid_string*.

    The GHCID itself is the hashed name, within ``GHCID_NAMESPACE``.
    """
    derived = uuid.uuid5(GHCID_NAMESPACE, ghcid_string)
    return str(derived)
def generate_uuid_v8_sha256(ghcid_string: str) -> str:
    """Return a UUID v8 string built from the SHA-256 hash of *ghcid_string*.

    The first 16 digest bytes become the UUID, with the version nibble forced
    to 8 and the variant bits forced to the RFC 4122 pattern.
    """
    digest = hashlib.sha256(ghcid_string.encode('utf-8')).digest()
    raw = bytearray(digest[:16])
    raw[6] = 0x80 | (raw[6] & 0x0f)  # high nibble of byte 6 holds the version (8)
    raw[8] = 0x80 | (raw[8] & 0x3f)  # top two bits of byte 8 hold the variant (10)
    return str(uuid.UUID(bytes=bytes(raw)))
def generate_numeric_id(ghcid_string: str) -> int:
    """Return the 64-bit numeric ID derived from the SHA-256 hash.

    The first 8 digest bytes (16 hex characters) are read as an unsigned
    big-endian integer.
    """
    hex_digest = hashlib.sha256(ghcid_string.encode('utf-8')).hexdigest()
    return int(hex_digest[:16], 16)
def fix_ghcid_type(ghcid: str, old_type: str, new_type: str) -> str:
    """
    Replace the type code in a GHCID string.

    GHCID format: CC-RR-CCC-T-ABBREV[-suffix]
    Position 4 (0-indexed 3) is the type code.

    Examples:
        NL-DR-FRE-U-FCFE -> NL-DR-FRE-I-FCFE
        NL-OV-KAL-X-BW -> NL-OV-KAL-I-BW

    Raises:
        ValueError: if the GHCID has fewer than five dash-separated parts or
            its current type code is not *old_type*.
    """
    segments = ghcid.split('-')
    if len(segments) < 5:
        raise ValueError(f"Invalid GHCID format: {ghcid}")

    found_type = segments[3]
    if found_type != old_type:
        raise ValueError(f"Expected type '{old_type}' but found '{found_type}' in GHCID: {ghcid}")

    # Rebuild with only the type segment swapped; any suffix parts are kept.
    return '-'.join(segments[:3] + [new_type] + segments[4:])
def process_file(
    file_path: Path,
    old_type: str,
    new_type: str,
    reason: str,
    dry_run: bool = True
) -> Optional[dict]:
    """
    Process a single YAML file to fix GHCID type code.

    Replaces the type code in ``ghcid.ghcid_current``, regenerates the derived
    identifiers, prepends a history entry, updates the ``identifiers`` list and
    ``original_entry.type``, rewrites the file and renames it to
    ``<new GHCID>.yaml``.  If another file already owns the target filename the
    file is left untouched, so two files never claim the same GHCID.

    Args:
        file_path: Custodian YAML file to correct.
        old_type: Type code expected in the current GHCID.
        new_type: Type code to write in its place.
        reason: Text stored in the new history entry.
        dry_run: If True, return the planned change without modifying anything.

    Returns dict with change info, or None if no change needed or error.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        print(f" Error reading {file_path}: {e}")
        return None

    if not isinstance(data, dict) or not isinstance(data.get('ghcid'), dict):
        print(f" Warning: No ghcid section in {file_path}")
        return None
    ghcid_section = data['ghcid']

    old_ghcid = ghcid_section.get('ghcid_current', '')
    if not old_ghcid:
        print(f" Warning: No ghcid_current in {file_path}")
        return None

    # Check if the type matches what we expect to fix
    parts = old_ghcid.split('-')
    if len(parts) < 5:
        print(f" Warning: Invalid GHCID format in {file_path}: {old_ghcid}")
        return None
    current_type = parts[3]
    if current_type != old_type:
        print(f" Skipping {file_path}: type is '{current_type}', expected '{old_type}'")
        return None

    # Fix the GHCID
    try:
        new_ghcid = fix_ghcid_type(old_ghcid, old_type, new_type)
    except ValueError as e:
        print(f" Error: {e}")
        return None
    if new_ghcid == old_ghcid:
        return None

    # Decide the rename before touching anything.  Rewriting the content and
    # then finding the target taken would leave the new GHCID stored under the
    # old filename while another file already carries that GHCID.
    new_file_path = file_path.parent / f"{new_ghcid}.yaml"
    needs_rename = new_file_path.name != file_path.name
    if needs_rename and new_file_path.exists():
        print(f" Warning: Target file already exists: {new_file_path} - leaving {file_path.name} unchanged")
        return None

    # Generate new identifiers
    new_uuid_v5 = generate_uuid_v5(new_ghcid)
    new_uuid_v8 = generate_uuid_v8_sha256(new_ghcid)
    new_numeric = generate_numeric_id(new_ghcid)
    timestamp_now = datetime.now(timezone.utc).isoformat()

    change_info = {
        'file': str(file_path),
        'old_ghcid': old_ghcid,
        'new_ghcid': new_ghcid,
        'old_type': old_type,
        'new_type': new_type,
        'old_uuid': ghcid_section.get('ghcid_uuid', ''),
        'new_uuid': new_uuid_v5,
        'old_numeric': ghcid_section.get('ghcid_numeric', 0),
        'new_numeric': new_numeric,
    }
    if dry_run:
        return change_info

    # Update ghcid section
    ghcid_section['ghcid_current'] = new_ghcid
    ghcid_section['ghcid_uuid'] = new_uuid_v5
    ghcid_section['ghcid_uuid_sha256'] = new_uuid_v8
    ghcid_section['ghcid_numeric'] = new_numeric
    # Keep ghcid_original as-is (for historical reference)

    # Add history entry for the fix (newest first); tolerate a null history
    ghcid_history = ghcid_section.get('ghcid_history') or []
    new_history_entry = {
        'ghcid': new_ghcid,
        'ghcid_numeric': new_numeric,
        'valid_from': timestamp_now,
        'reason': reason,
    }
    # Mark previous entry as superseded, but only if it is still open
    if ghcid_history and isinstance(ghcid_history[0], dict):
        previous = ghcid_history[0]
        if previous.get('valid_to') is None:
            previous['valid_to'] = timestamp_now
            previous['superseded_by'] = new_ghcid
    ghcid_section['ghcid_history'] = [new_history_entry] + ghcid_history
    data['ghcid'] = ghcid_section

    # Update identifiers section
    identifiers = data.get('identifiers') or []
    for ident in identifiers:
        if not isinstance(ident, dict):
            continue
        scheme = ident.get('identifier_scheme')
        if scheme == 'GHCID':
            ident['identifier_value'] = new_ghcid
            # A URL that embeds the old GHCID would otherwise stay stale.
            url = ident.get('identifier_url')
            if isinstance(url, str) and old_ghcid in url:
                ident['identifier_url'] = url.replace(old_ghcid, new_ghcid)
        elif scheme == 'GHCID_UUID':
            ident['identifier_value'] = new_uuid_v5
            ident['identifier_url'] = f"urn:uuid:{new_uuid_v5}"
        elif scheme == 'GHCID_UUID_SHA256':
            ident['identifier_value'] = new_uuid_v8
            ident['identifier_url'] = f"urn:uuid:{new_uuid_v8}"
        elif scheme == 'GHCID_NUMERIC':
            ident['identifier_value'] = str(new_numeric)
    data['identifiers'] = identifiers

    # Also update original_entry.type if present (to keep consistency)
    original_entry = data.get('original_entry')
    if isinstance(original_entry, dict) and isinstance(original_entry.get('type'), list):
        original_entry['type'] = [
            new_type if t == old_type else t for t in original_entry['type']
        ]

    # Write updated file
    with open(file_path, 'w', encoding='utf-8') as f:
        yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    # Rename file to match new GHCID
    if needs_rename:
        shutil.move(str(file_path), str(new_file_path))
        change_info['new_file'] = str(new_file_path)

    return change_info
def find_files_for_correction(
    custodian_dir: Path,
    correction_key: str
) -> list[Path]:
    """Return the existing files listed for *correction_key*.

    Filenames come from ``TYPE_CORRECTIONS[correction_key]['files']`` and are
    resolved against *custodian_dir*.  Missing files are reported and left out.
    An unknown key, or an entry that is empty, yields an empty list.
    """
    correction = TYPE_CORRECTIONS.get(correction_key)
    if not correction:
        print(f"Unknown correction type: {correction_key}")
        return []

    found = []
    for candidate in (custodian_dir / name for name in correction['files']):
        if not candidate.exists():
            print(f" Warning: File not found: {candidate}")
            continue
        found.append(candidate)
    return found
def auto_detect_museum_files(custodian_dir: Path) -> list[Path]:
    """
    Auto-detect files where type should be M (Museum) based on:
    - original_entry.type_organisatie == "museum" (case-insensitive)
    - Filename matches NL-*-U-*.yaml (Dutch files with Unknown type)

    Files that cannot be read or parsed are reported and skipped.  Results are
    returned in sorted filename order so repeated runs process files in the
    same sequence.
    """
    museum_files = []
    # Find all NL-*-U-*.yaml files (Dutch files with Unknown type)
    for file_path in sorted(custodian_dir.glob("NL-*-U-*.yaml")):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)
        except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
            # Say which file was dropped instead of silently shrinking the batch.
            print(f" Warning: Could not read {file_path}: {e}")
            continue

        if not isinstance(data, dict):
            continue

        # Check if type_organisatie indicates this is a museum; the section or
        # the field may be missing or null.
        orig_entry = data.get('original_entry')
        type_org = orig_entry.get('type_organisatie') if isinstance(orig_entry, dict) else None
        if isinstance(type_org, str) and type_org.lower() == 'museum':
            museum_files.append(file_path)
    return museum_files
def main():
    """Command-line entry point for the GHCID type correction script.

    Runs in one of three modes: a single file (``--file`` with ``--new-type``),
    auto-detected museums (``--auto-detect-museums``), or the batch lists in
    ``TYPE_CORRECTIONS`` selected by ``--correction``.  Prints a summary grouped
    by type change.

    Returns:
        Process exit status: 0 on success, 1 on a usage or path error.
    """
    parser = argparse.ArgumentParser(
        description="Fix GHCID type codes in Dutch custodian files"
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Preview changes without modifying files'
    )
    parser.add_argument(
        '--correction',
        choices=['U-to-I', 'U-to-M', 'X-to-I', 'all'],
        default='all',
        help='Which correction type to apply (default: all)'
    )
    parser.add_argument(
        '--file',
        type=str,
        help='Process a single file instead of batch'
    )
    parser.add_argument(
        '--new-type',
        type=str,
        help='New type code when processing single file'
    )
    parser.add_argument(
        '--auto-detect-museums',
        action='store_true',
        help='Auto-detect museum files based on type_organisatie field'
    )
    parser.add_argument(
        '--custodian-dir',
        type=str,
        default='data/custodian',
        help='Path to custodian directory (default: data/custodian)'
    )
    args = parser.parse_args()

    # Find project root (where data/ directory is)
    script_dir = Path(__file__).parent
    project_root = script_dir.parent
    custodian_dir = project_root / args.custodian_dir
    if not custodian_dir.exists():
        print(f"Error: Custodian directory not found: {custodian_dir}")
        return 1

    print("GHCID Type Correction Script")
    print(f"{'=' * 50}")
    print(f"Mode: {'DRY RUN' if args.dry_run else 'APPLY CHANGES'}")
    print(f"Custodian directory: {custodian_dir}")
    print()

    all_changes = []

    # Single file mode
    if args.file:
        if not args.new_type:
            print("Error: --new-type is required when using --file")
            return 1
        file_path = Path(args.file)
        if not file_path.is_absolute():
            file_path = project_root / file_path
        if not file_path.exists():
            print(f"Error: File not found: {file_path}")
            return 1

        # Detect old type from filename
        parts = file_path.stem.split('-')
        if len(parts) < 4:
            print(f"Error: Cannot determine type from filename: {file_path}")
            return 1
        old_type = parts[3]

        # This text is stored in the file's ghcid_history, so keep it readable.
        reason = f"Type corrected: {old_type} -> {args.new_type} (manual correction)"
        print(f"Processing single file: {file_path}")
        change = process_file(file_path, old_type, args.new_type, reason, args.dry_run)
        if change:
            all_changes.append(change)

    # Auto-detect museum files
    elif args.auto_detect_museums:
        print("Auto-detecting museum files...")
        museum_files = auto_detect_museum_files(custodian_dir)
        print(f"Found {len(museum_files)} museum files with type U")

        # Update the U-to-M correction with detected files
        TYPE_CORRECTIONS['U-to-M']['files'] = [f.name for f in museum_files]

        # Process them
        correction = TYPE_CORRECTIONS['U-to-M']
        for file_path in museum_files:
            change = process_file(
                file_path,
                correction['old_type'],
                correction['new_type'],
                correction['reason'],
                args.dry_run
            )
            if change:
                all_changes.append(change)

    # Batch mode
    else:
        if args.correction == 'all':
            corrections_to_apply = list(TYPE_CORRECTIONS.keys())
        else:
            corrections_to_apply = [args.correction]

        for correction_key in corrections_to_apply:
            correction = TYPE_CORRECTIONS[correction_key]
            if not correction['files']:
                print(f"\nSkipping {correction_key}: no files specified")
                continue

            print(f"\nProcessing {correction_key}:")
            print(f" {correction['old_type']} -> {correction['new_type']}")
            print(f" Files: {len(correction['files'])}")

            files = find_files_for_correction(custodian_dir, correction_key)
            for file_path in files:
                change = process_file(
                    file_path,
                    correction['old_type'],
                    correction['new_type'],
                    correction['reason'],
                    args.dry_run
                )
                if change:
                    all_changes.append(change)

    # Summary
    print(f"\n{'=' * 50}")
    print("SUMMARY")
    print(f"{'=' * 50}")
    if not all_changes:
        print("No changes needed or no matching files found.")
        return 0

    print(f"Total changes: {len(all_changes)}")
    print()

    # Group by type change
    by_type_change = {}
    for change in all_changes:
        key = f"{change['old_type']} -> {change['new_type']}"
        by_type_change.setdefault(key, []).append(change)

    for key, changes in sorted(by_type_change.items()):
        print(f"\n{key}: {len(changes)} files")
        for change in changes:
            print(f" {change['old_ghcid']} -> {change['new_ghcid']}")
            if 'new_file' in change:
                print(f" Renamed to: {Path(change['new_file']).name}")

    if args.dry_run:
        print(f"\n{'=' * 50}")
        print("DRY RUN - No files were modified.")
        print("Run without --dry-run to apply changes.")
    else:
        print(f"\n{'=' * 50}")
        print(f"Successfully updated {len(all_changes)} files.")
    return 0


if __name__ == '__main__':
    # SystemExit carries main()'s status without relying on the site-provided
    # exit() helper, which is absent under "python -S".
    raise SystemExit(main())

View file

@ -0,0 +1,269 @@
#!/usr/bin/env python3
"""
Fix Simon Kemper contamination in entity profiles.
For entries where:
1. Name is "Simon Kemper"
2. But the LinkedIn slug clearly indicates a different person
We derive the correct name from the slug and update the profile.
IMPORTANT: Per Rule 21 (Data Fabrication Prohibition) - if we cannot reliably
derive the name from the slug, we mark it as "Unknown" rather than guessing.
Compound slugs without hyphens (like "jponjee") cannot be reliably parsed.
"""
import json
import os
import re
from pathlib import Path
from urllib.parse import unquote
from datetime import datetime, timezone
def is_compound_slug(slug: str) -> bool:
    """Check if slug is a compound name without separators.

    The slug is URL-decoded, a trailing ID (``-``/``_`` followed by 6+ hex
    characters, then ``-``/``_`` followed by 5+ digits) is stripped, and the
    remainder is compound when it contains no hyphen.

    Returns True for slugs like:
    - 'jponjee' (no hyphens, all lowercase)
    - 'sharellyemanuelson'
    - 'addieroelofsen'
    - 'adheliap'
    - 'olivi%C3%AB-7153658' (only the ID was hyphen-separated; 'olivië' remains)

    Returns False for slugs like:
    - 'willem-blok' (has hyphens)
    - 'jan-van-den-borre' (has hyphens)
    - 'miriam-h' (has hyphens, even if short)
    - 'willem-blok-b6a46648' (hyphen remains after the ID is stripped)

    NOTE(review): earlier documentation listed 'olivi%C3%AB-7153658' as not
    compound, which the implementation has never matched - confirm the intent.
    """
    # First decode URL encoding (e.g., %C3%AB -> ë)
    remainder = unquote(slug)

    # Strip the trailing ID: hex form first, then the shorter all-digit form.
    for id_pattern in (r'[-_][\da-f]{6,}$', r'[-_]\d{5,}$'):
        remainder = re.sub(id_pattern, '', remainder)

    # No hyphen left means there is no reliable boundary between name parts;
    # even a short one like "jponjee" could be "J. Ponjee" or "J Ponjee".
    return '-' not in remainder
def slug_to_name(slug: str) -> tuple[str, bool]:
"""Convert a LinkedIn slug to a human-readable name.
Returns:
tuple: (name, is_reliable) where:
- name: The derived name or "Unknown"
- is_reliable: True if we're confident in the derivation
Examples:
'willem-blok-b6a46648' -> ('Willem Blok', True)
'dave-van-den-nieuwenhof-4446b3146' -> ('Dave van den Nieuwenhof', True)
'olivi%C3%AB-7153658' -> ('Olivië', True)
'jponjee' -> ('Unknown', False) # Compound slug, cannot parse reliably
'sharellyemanuelson' -> ('Unknown', False) # Compound slug
"""
# Decode URL encoding
slug = unquote(slug)
# Remove trailing ID (hex or numeric)
clean_slug = re.sub(r'[-_][\da-f]{6,}$', '', slug)
clean_slug = re.sub(r'[-_]\d{5,}$', '', clean_slug)
# Check if this is a compound slug we can't reliably parse
if is_compound_slug(slug):
return ("Unknown", False)
# Split by hyphens
parts = clean_slug.split('-')
# Filter out empty parts
parts = [p for p in parts if p]
if not parts:
return ("Unknown", False)
# Capitalize appropriately
# Dutch particles that should stay lowercase: van, de, den, der, het, 't
dutch_particles = {'van', 'de', 'den', 'der', 'het', 't', "'t"}
name_parts = []
for i, part in enumerate(parts):
if part.lower() in dutch_particles and i > 0:
name_parts.append(part.lower())
else:
# Capitalize first letter, preserve rest
name_parts.append(part.capitalize())
name = ' '.join(name_parts)
# Additional validation - name should have at least 2 characters
if len(name) < 2:
return ("Unknown", False)
return (name, True)
def fix_contaminated_files(entity_dir: Path, dry_run: bool = True):
    """Find and fix Simon Kemper contaminated files.

    Only processes files where a name field is ACTUALLY "Simon Kemper" and the
    LinkedIn slug shows the profile belongs to someone else. Files that were
    already corrected, were never contaminated, or belong to the real
    Simon Kemper are skipped, as are contaminated files without a usable
    LinkedIn URL (there is no slug to derive a name from).

    Args:
        entity_dir: Directory holding the ``*.json`` entity profiles.
        dry_run: When True (default) nothing is written; the function only
            reports what it would change.

    Returns:
        tuple: (contaminated_list, fixed_list, unreliable_list) where the
        first and last are report entries (dicts) for reliably / unreliably
        parseable slugs and ``fixed_list`` holds the names of rewritten files.
    """
    contaminant = 'Simon Kemper'
    contaminated = []
    fixed = []
    unreliable = []  # Files where we couldn't reliably derive the name

    # sorted() keeps the report order reproducible between runs.
    for filepath in sorted(entity_dir.glob("*.json")):
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (ValueError, OSError):
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            continue
        if not isinstance(data, dict):
            continue

        # Treat null / non-mapping sections as empty instead of crashing.
        profile_data = data.get('profile_data')
        if not isinstance(profile_data, dict):
            profile_data = {}
        source_info = data.get('source_staff_info')
        if not isinstance(source_info, dict):
            source_info = {}

        profile_name = profile_data.get('name', '')
        source_name = source_info.get('name', '')

        # ONLY process files where the name is ACTUALLY "Simon Kemper"
        if profile_name != contaminant and source_name != contaminant:
            continue

        metadata = data.get('extraction_metadata')
        if not isinstance(metadata, dict):
            continue
        linkedin_url = metadata.get('linkedin_url') or ''
        if not isinstance(linkedin_url, str):
            continue

        # Extract slug from URL
        slug_match = re.search(r'/in/([^/]+)/?$', linkedin_url)
        if not slug_match:
            continue
        slug = slug_match.group(1)

        # The real Simon Kemper's own profile is not contamination.
        slug_lower = slug.lower().replace('%', '')
        if 'simonkemper' in slug_lower or 'simon-kemper' in slug_lower:
            continue

        # Derive correct name from slug
        correct_name, is_reliable = slug_to_name(slug)

        affiliations = data.get('affiliations')
        first_affiliation = affiliations[0] if isinstance(affiliations, list) and affiliations else {}
        custodian = first_affiliation.get('custodian_name', '') if isinstance(first_affiliation, dict) else ''

        entry = {
            'file': filepath.name,
            'slug': slug,
            'profile_name': profile_name,
            'source_name': source_name,
            'contaminated_field': 'profile_data.name' if profile_name == contaminant else 'source_staff_info.name',
            'correct_name': correct_name,
            'is_reliable': is_reliable,
            'headline': profile_data.get('headline', ''),
            'custodian': custodian,
        }

        if is_reliable:
            contaminated.append(entry)
        else:
            unreliable.append(entry)

        if dry_run:
            continue

        # Overwrite only the field(s) that actually hold the contaminant, so
        # a valid name in the other field is never replaced (least of all by
        # "Unknown").
        if profile_name == contaminant:
            profile_data['name'] = correct_name
        if source_name == contaminant:
            source_info['name'] = correct_name

        # Add fix metadata
        timestamp = datetime.now(timezone.utc).isoformat()
        if is_reliable:
            fix_note = f"Name corrected from 'Simon Kemper' (contamination) to '{correct_name}' (derived from slug) on {timestamp}"
        else:
            fix_note = f"Name set to 'Unknown' (was 'Simon Kemper' contamination). Original slug: {slug}. Compound slug cannot be reliably parsed. Fixed on {timestamp}"
            # Also preserve slug in a dedicated field for future reference
            metadata['original_slug'] = slug

        existing_notes = metadata.get('notes', '')
        if existing_notes:
            metadata['notes'] = f"{existing_notes} | {fix_note}"
        else:
            metadata['notes'] = fix_note

        # Write back
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

        fixed.append(filepath.name)

    return contaminated, fixed, unreliable
def main():
    """Command-line entry point: report (and with ``--fix`` repair) contamination.

    Without ``--fix`` the run is a dry run: the entity directory is scanned and
    the findings are printed, but no file is modified.
    """
    import argparse

    parser = argparse.ArgumentParser(description='Fix Simon Kemper contamination')
    parser.add_argument('--fix', action='store_true', help='Actually fix files (default: dry run)')
    dry_run = not parser.parse_args().fix

    entity_dir = Path("/Users/kempersc/apps/glam/data/custodian/person/entity")

    wide_rule = "=" * 80
    narrow_rule = '=' * 40

    def print_heading(title):
        # Section heading framed by two 40-character rules.
        print(f"\n{narrow_rule}")
        print(title)
        print(narrow_rule)

    def print_entry(entry, outcome_line):
        # One report entry; only the fourth line differs between sections.
        print(f" File: {entry['file']}")
        print(f" Slug: {entry['slug']}")
        print(f" Contaminated: {entry['contaminated_field']} = 'Simon Kemper'")
        print(outcome_line)
        headline = entry['headline']
        shown = f"{headline[:60]}..." if len(headline) > 60 else headline
        print(f" Headline: {shown}")
        print(f" Custodian: {entry['custodian']}")
        print()

    print(wide_rule)
    print(f"SIMON KEMPER CONTAMINATION FIX - {'DRY RUN' if dry_run else 'FIXING'}")
    print(wide_rule)

    contaminated, fixed, unreliable = fix_contaminated_files(entity_dir, dry_run=dry_run)

    print_heading(f"RELIABLY PARSEABLE ({len(contaminated)} files)")
    print("These slugs have hyphens and can be reliably converted to names:\n")
    for item in contaminated:
        print_entry(item, f" Correct name: '{item['correct_name']}'")

    if unreliable:
        print_heading(f"COMPOUND SLUGS - SET TO 'Unknown' ({len(unreliable)} files)")
        print("These slugs have no hyphens and cannot be reliably parsed.")
        print("Per Rule 21: Names will be set to 'Unknown' (no hallucination).\n")
        for item in unreliable:
            print_entry(item, " Will be set to: 'Unknown' (slug preserved in metadata)")

    print_heading("SUMMARY")
    print(f" Reliably fixable: {len(contaminated)}")
    print(f" Set to 'Unknown': {len(unreliable)}")
    print(f" Total: {len(contaminated) + len(unreliable)}")

    if dry_run:
        print(f"\n⚠️ DRY RUN - No files modified. Run with --fix to apply changes.")
    else:
        print(f"\n✅ Fixed {len(fixed)} files")
if __name__ == "__main__":
main()

View file

@ -99,6 +99,62 @@ NON_HERITAGE_KEYWORDS = [
'organiser', 'opruimhulp', 'verpleeg', 'nurse'
]
# Organizations that are explicitly NOT heritage institutions.
# A headline containing any of these entries is rejected as not
# heritage-relevant before any heritage keyword is considered.
#
# NOTE(review): detect_heritage_type() tests each entry with a plain substring
# check (`org.lower() in headline_lower`), not a whole-word match. Short
# entries therefore also hit unrelated words, e.g. 'meta' matches "metadata",
# 'ing ' matches "managing ", 'ns ' matches "collections ", 'hema' matches
# "schema" and 'ups' matches "groups". Entries that end in a space cannot match
# at the very end of a headline. Confirm whether word-boundary matching is
# wanted before extending this list.
NON_HERITAGE_ORGANIZATIONS = [
# Banks & Financial
'ing ', 'ing nederland', 'rabobank', 'abn amro', 'postbank', 'triodos',
# Security companies
'i-sec', 'g4s', 'securitas', 'trigion', 'chubb',
# Police/Government (non-cultural)
'politie', 'police', 'rijkswaterstaat', 'belastingdienst', 'douane', 'defensie',
# Political parties
'vvd', 'pvda', 'cda', 'd66', 'groenlinks', 'pvv', 'bbb', 'nsc', 'volt',
'sp ', 'forum voor democratie', 'ja21', 'bij1', 'denk', 'sgp', 'cu ',
# Tech companies (non-heritage)
'google', 'microsoft', 'amazon', 'meta', 'facebook', 'apple', 'netflix',
'uber', 'airbnb', 'booking.com', 'adyen', 'mollie', 'messagebird',
'coolblue', 'bol.com', 'picnic', 'takeaway', 'just eat',
# Telecom
'kpn', 'vodafone', 't-mobile', 'ziggo',
# Postal / Logistics
'postnl', 'postkantoren', 'dhl', 'ups', 'fedex',
# Healthcare
'ziekenhuis', 'hospital', 'ggz', 'ggd', 'thuiszorg',
# Retail
'albert heijn', 'jumbo', 'lidl', 'aldi', 'ikea', 'hema', 'action',
# Consulting / Professional services
'deloitte', 'kpmg', 'pwc', 'ey ', 'ernst & young', 'mckinsey', 'bcg',
'accenture', 'capgemini', 'ordina', 'atos', 'cgi ',
# Recruitment / HR
'randstad', 'tempo-team', 'manpower', 'hays', 'brunel',
# Energy / Utilities
'shell', 'bp ', 'eneco', 'vattenfall', 'essent', 'nuon',
# Transport
'ns ', 'prorail', 'schiphol', 'klm', 'transavia',
# Other
'freelance', 'zelfstandig', 'zzp', 'eigen bedrijf',
]
# Heritage organization keywords - organizations that ARE heritage institutions.
# Used to validate that 'D' (Digital) roles are actually at heritage orgs.
#
# NOTE(review): entries are matched as plain substrings of the lowercased
# headline, and one hit is enough. Longer entries that contain a shorter one
# (e.g. 'nationaal archief' vs 'archief', 'rijksmuseum' vs 'museum') therefore
# add no extra matches, and entries ending in a space ('kb ', 'eye ') cannot
# match at the very end of a headline.
HERITAGE_ORGANIZATION_KEYWORDS = [
# Archives
'archief', 'archive', 'nationaal archief', 'stadsarchief', 'regionaal archief',
'beeld en geluid', 'beeld & geluid', 'niod', 'iish', 'iisg',
# Museums
'museum', 'musea', 'rijksmuseum', 'van gogh', 'stedelijk', 'mauritshuis',
'tropenmuseum', 'allard pierson', 'kröller', 'boijmans',
# Libraries
'bibliotheek', 'library', 'koninklijke bibliotheek', 'kb ',
# Film/AV heritage
'eye film', 'filmmuseum', 'eye ', 'sound and vision',
# Heritage platforms
'erfgoed', 'heritage', 'cultural', 'cultureel',
# Research institutes (heritage-focused)
'knaw', 'humanities cluster', 'meertens', 'huygens',
]
# Lines that indicate LinkedIn UI noise (to skip entirely)
NOISE_EXACT = {
'0 notifications', 'Search', 'Home', 'My Network', 'Jobs', 'Messaging',
@ -276,16 +332,35 @@ def is_location_line(line: str) -> bool:
def detect_heritage_type(headline: str) -> tuple[bool, Optional[str]]:
"""
Detect if a headline is heritage-relevant and what type.
Two-stage classification:
1. Check if organization is explicitly non-heritage (blocklist)
2. Check if role/organization matches heritage patterns
For 'D' (Digital) type, require BOTH a tech role AND a heritage organization.
"""
headline_lower = headline.lower()
# Check for non-heritage indicators
# Stage 1: Check for non-heritage organizations (blocklist)
for org in NON_HERITAGE_ORGANIZATIONS:
if org.lower() in headline_lower:
return (False, None)
# Stage 2: Check for non-heritage role indicators
for keyword in NON_HERITAGE_KEYWORDS:
if keyword.lower() in headline_lower:
return (False, None)
# Stage 3: Check if this is a heritage organization
is_heritage_org = False
for org_keyword in HERITAGE_ORGANIZATION_KEYWORDS:
if org_keyword.lower() in headline_lower:
is_heritage_org = True
break
# Check heritage keywords by type (order matters - more specific first)
type_order = ['A', 'M', 'L', 'O', 'R', 'E', 'D', 'G', 'S', 'C']
# 'D' (Digital) is checked last and requires heritage org validation
type_order = ['A', 'M', 'L', 'G', 'S', 'C', 'O', 'R', 'E'] # D removed from here
for heritage_type in type_order:
keywords = HERITAGE_KEYWORDS.get(heritage_type, [])
@ -293,7 +368,15 @@ def detect_heritage_type(headline: str) -> tuple[bool, Optional[str]]:
if keyword.lower() in headline_lower:
return (True, heritage_type)
# Generic heritage terms
# Special handling for 'D' (Digital) - ONLY if at a heritage organization
# This prevents generic IT workers from being classified as heritage-relevant
if is_heritage_org:
digital_keywords = HERITAGE_KEYWORDS.get('D', [])
for keyword in digital_keywords:
if keyword.lower() in headline_lower:
return (True, 'D')
# Generic heritage terms (without specific type)
generic_heritage = [
'heritage', 'erfgoed', 'culture', 'cultuur', 'cultural',
'film', 'cinema', 'media', 'arts', 'kunst', 'creative',

View file

@ -66,6 +66,62 @@ NON_HERITAGE_KEYWORDS = [
'investment', 'e-commerce', 'organiser', 'opruimhulp', 'verpleeg', 'nurse'
]
# Organizations that are explicitly NOT heritage institutions.
# A headline containing any of these entries is rejected as not
# heritage-relevant before any heritage keyword is considered.
#
# NOTE(review): detect_heritage_type() tests each entry with a plain substring
# check (`org.lower() in headline_lower`), not a whole-word match. Short
# entries therefore also hit unrelated words, e.g. 'meta' matches "metadata",
# 'ing ' matches "managing ", 'ns ' matches "collections ", 'hema' matches
# "schema" and 'ups' matches "groups". Entries that end in a space cannot match
# at the very end of a headline. Confirm whether word-boundary matching is
# wanted before extending this list.
NON_HERITAGE_ORGANIZATIONS = [
# Banks & Financial
'ing ', 'ing nederland', 'rabobank', 'abn amro', 'postbank', 'triodos',
# Security companies
'i-sec', 'g4s', 'securitas', 'trigion', 'chubb',
# Police/Government (non-cultural)
'politie', 'police', 'rijkswaterstaat', 'belastingdienst', 'douane', 'defensie',
# Political parties
'vvd', 'pvda', 'cda', 'd66', 'groenlinks', 'pvv', 'bbb', 'nsc', 'volt',
'sp ', 'forum voor democratie', 'ja21', 'bij1', 'denk', 'sgp', 'cu ',
# Tech companies (non-heritage)
'google', 'microsoft', 'amazon', 'meta', 'facebook', 'apple', 'netflix',
'uber', 'airbnb', 'booking.com', 'adyen', 'mollie', 'messagebird',
'coolblue', 'bol.com', 'picnic', 'takeaway', 'just eat',
# Telecom
'kpn', 'vodafone', 't-mobile', 'ziggo',
# Postal / Logistics
'postnl', 'postkantoren', 'dhl', 'ups', 'fedex',
# Healthcare
'ziekenhuis', 'hospital', 'ggz', 'ggd', 'thuiszorg',
# Retail
'albert heijn', 'jumbo', 'lidl', 'aldi', 'ikea', 'hema', 'action',
# Consulting / Professional services
'deloitte', 'kpmg', 'pwc', 'ey ', 'ernst & young', 'mckinsey', 'bcg',
'accenture', 'capgemini', 'ordina', 'atos', 'cgi ',
# Recruitment / HR
'randstad', 'tempo-team', 'manpower', 'hays', 'brunel',
# Energy / Utilities
'shell', 'bp ', 'eneco', 'vattenfall', 'essent', 'nuon',
# Transport
'ns ', 'prorail', 'schiphol', 'klm', 'transavia',
# Other
'freelance', 'zelfstandig', 'zzp', 'eigen bedrijf',
]
# Heritage organization keywords - organizations that ARE heritage institutions.
# Used to validate that 'D' (Digital) roles are actually at heritage orgs.
#
# NOTE(review): entries are matched as plain substrings of the lowercased
# headline, and one hit is enough. Longer entries that contain a shorter one
# (e.g. 'nationaal archief' vs 'archief', 'rijksmuseum' vs 'museum') therefore
# add no extra matches, and entries ending in a space ('kb ', 'eye ') cannot
# match at the very end of a headline.
HERITAGE_ORGANIZATION_KEYWORDS = [
# Archives
'archief', 'archive', 'nationaal archief', 'stadsarchief', 'regionaal archief',
'beeld en geluid', 'beeld & geluid', 'niod', 'iish', 'iisg',
# Museums
'museum', 'musea', 'rijksmuseum', 'van gogh', 'stedelijk', 'mauritshuis',
'tropenmuseum', 'allard pierson', 'kröller', 'boijmans',
# Libraries
'bibliotheek', 'library', 'koninklijke bibliotheek', 'kb ',
# Film/AV heritage
'eye film', 'filmmuseum', 'eye ', 'sound and vision',
# Heritage platforms
'erfgoed', 'heritage', 'cultural', 'cultureel',
# Research institutes (heritage-focused)
'knaw', 'humanities cluster', 'meertens', 'huygens',
]
# LinkedIn status phrases that pollute name fields (extracted from img alt text)
# These should be removed from names and stored as metadata
LINKEDIN_STATUS_PHRASES = [
@ -168,8 +224,8 @@ class LinkedInProfileCardParser(HTMLParser):
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]) -> None:
attrs_dict = dict(attrs)
attr_id = attrs_dict.get('id', '')
attr_class = attrs_dict.get('class', '')
attr_id = attrs_dict.get('id') or ''
attr_class = attrs_dict.get('class') or ''
# Detect profile card start - can be on <a> tag (regular) OR <img> tag (anonymous)
if 'org-people-profile-card__profile-image' in attr_id:
@ -367,28 +423,58 @@ class LinkedInProfileCardParser(HTMLParser):
def detect_heritage_type(headline: str) -> tuple[bool, Optional[str]]:
"""Detect if a headline is heritage-relevant and what type."""
"""
Detect if a headline is heritage-relevant and what type.
Two-stage classification:
1. Check if organization is explicitly non-heritage (blocklist)
2. Check if role/organization matches heritage patterns
For 'D' (Digital) type, require BOTH a tech role AND a heritage organization.
This prevents generic IT workers at banks/police from being classified as heritage.
"""
if not headline:
return (False, None)
headline_lower = headline.lower()
# Check non-heritage first
# Stage 1: Check for non-heritage organizations (blocklist)
for org in NON_HERITAGE_ORGANIZATIONS:
if org.lower() in headline_lower:
return (False, None)
# Stage 2: Check for non-heritage role indicators
for keyword in NON_HERITAGE_KEYWORDS:
if keyword.lower() in headline_lower:
return (False, None)
# Check heritage keywords by type
type_order = ['A', 'M', 'L', 'O', 'R', 'E', 'D', 'G', 'S', 'C']
# Stage 3: Check if this is a heritage organization
is_heritage_org = False
for org_keyword in HERITAGE_ORGANIZATION_KEYWORDS:
if org_keyword.lower() in headline_lower:
is_heritage_org = True
break
# Check heritage keywords by type (order matters - more specific first)
# 'D' (Digital) is checked last and requires heritage org validation
type_order = ['A', 'M', 'L', 'G', 'S', 'C', 'O', 'R', 'E'] # D removed from main loop
for heritage_type in type_order:
keywords = HERITAGE_KEYWORDS.get(heritage_type, [])
for keyword in keywords:
if keyword.lower() in headline_lower:
return (True, heritage_type)
# Generic heritage terms
# Special handling for 'D' (Digital) - ONLY if at a heritage organization
if is_heritage_org:
digital_keywords = HERITAGE_KEYWORDS.get('D', [])
for keyword in digital_keywords:
if keyword.lower() in headline_lower:
return (True, 'D')
# Generic heritage terms (without specific type)
generic = ['heritage', 'erfgoed', 'culture', 'cultuur', 'cultural', 'film', 'cinema',
'media', 'arts', 'kunst', 'creative', 'preservation', 'conservation']
'media', 'arts', 'kunst', 'creative', 'preservation', 'conservation', 'collection']
for keyword in generic:
if keyword in headline_lower:
return (True, None)

View file

@ -0,0 +1,445 @@
#!/usr/bin/env python3
"""
Comprehensive data quality scan for Dutch custodian YAML files.
Identifies issues like wrong GHCID types, missing web claims, Google Maps mismatches, etc.
"""
import os
import re
import yaml
from pathlib import Path
from collections import defaultdict
from datetime import datetime
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
# Issue categories
issues = defaultdict(list)
def extract_ghcid_type(filename):
    """Return the one-letter type code of a Dutch GHCID filename, or None.

    The name must start with ``NL-``, two capitals, ``-``, three capitals,
    ``-``, one capital (the type code) and another ``-``;
    e.g. ``NL-ZH-ZOE-A-SAZS`` yields ``'A'``.
    """
    found = re.match(r'NL-[A-Z]{2}-[A-Z]{3}-([A-Z])-', filename)
    if found is None:
        return None
    return found.group(1)
def get_expected_type(data):
    """Determine the expected GHCID type code from ``original_entry``.

    Lookup order:
      1. ``original_entry.type`` - when it is a non-empty list, its first
         element is returned as-is.
      2. ``original_entry.type_organisatie`` - a free-text label mapped to a
         type code (archive/archief -> A, library/bibliotheek -> L,
         museum -> M, gallery/galerie -> G); unknown labels give None.

    Returns None when nothing usable is present. A null or non-mapping
    ``original_entry`` and a non-string ``type_organisatie`` are treated as
    absent instead of raising, so one malformed record cannot abort a scan.
    """
    original_entry = data.get('original_entry')
    if not isinstance(original_entry, dict):
        return None

    declared_types = original_entry.get('type')
    if isinstance(declared_types, list) and declared_types:
        return declared_types[0]

    type_org = original_entry.get('type_organisatie')
    if not isinstance(type_org, str) or not type_org:
        return None

    type_map = {
        'archive': 'A', 'archief': 'A',
        'library': 'L', 'bibliotheek': 'L',
        'museum': 'M',
        'gallery': 'G', 'galerie': 'G',
    }
    # Surrounding whitespace in the source label must not defeat the lookup.
    return type_map.get(type_org.strip().lower())
def check_google_maps_mismatch(data, filename):
    """Check if the Google Maps name doesn't match the organization name.

    The organization name is ``original_entry.organisatie``, overridden by
    ``custodian_name.claim_value`` when ``custodian_name`` is a mapping that
    has that key. Both names are lowercased, split on whitespace and stripped
    of common Dutch stopwords; similarity is the number of shared words
    divided by the size of the larger word set.

    Args:
        data: Parsed custodian record.
        filename: Unused; kept so all ``check_*`` functions share a signature.

    Returns:
        A dict with ``google_name``, ``org_name`` and ``similarity`` (rounded
        to two decimals) when similarity is below 0.3, otherwise None. None is
        also returned when either name is missing, is not a string, or
        consists only of stopwords, and when a section is null/non-mapping.
    """
    gm = data.get('google_maps_enrichment')
    if not isinstance(gm, dict):
        return None
    gm_name = gm.get('name', '')

    # Get original org name
    org_name = ''
    original_entry = data.get('original_entry')
    if isinstance(original_entry, dict):
        org_name = original_entry.get('organisatie', '')
    custodian_name = data.get('custodian_name')
    if isinstance(custodian_name, dict):
        org_name = custodian_name.get('claim_value', org_name)

    # Non-string names (numbers, lists, null) cannot be compared word by word.
    if not isinstance(gm_name, str) or not isinstance(org_name, str):
        return None
    if not gm_name or not org_name:
        return None

    # Remove common words before comparing
    stopwords = {'de', 'het', 'van', 'en', 'in', 'te', 'der', 'voor', 'stichting', 'vereniging'}
    gm_words = set(gm_name.lower().split()) - stopwords
    org_words = set(org_name.lower().split()) - stopwords
    if not gm_words or not org_words:
        return None

    similarity = len(gm_words & org_words) / max(len(gm_words), len(org_words))
    if similarity < 0.3:
        return {
            'google_name': gm_name,
            'org_name': org_name,
            'similarity': round(similarity, 2)
        }
    return None
def check_absolute_paths(data, filename):
    """Report machine-specific absolute path prefixes found in a record.

    The record is serialised back to YAML text and searched for a fixed set of
    path prefixes. Returns the matched prefixes (trailing slashes/backslashes
    removed) in pattern order, or None when none occur. ``filename`` is unused.
    """
    dumped = yaml.dump(data, default_flow_style=False)
    markers = (
        r'/Volumes/KINGSTON/',
        r'/Users/kempersc/',
        r'/mnt/',
        r'C:\\',
        r'D:\\',
    )
    hits = [marker.rstrip('/\\') for marker in markers if re.search(marker, dumped)]
    return hits or None
def check_web_claims(data, filename):
    """Check web claims quality.

    Args:
        data: Parsed custodian record.
        filename: Unused; kept so all ``check_*`` functions share a signature.

    Returns:
        None when nothing is wrong, otherwise a list with any of:
          - ``'no_web_claims'``: section missing (or null / not a mapping)
          - ``'empty_claims'``: ``web_claims.claims`` missing or empty
          - ``'no_verified_claims'``: no ``verified_claims`` key
          - ``'claims_missing_xpath:N'``: N verified claims (dict entries)
            lack an ``xpath`` provenance key
    """
    wc = data.get('web_claims')
    # A null section is as good as an absent one and must not crash the scan.
    if not isinstance(wc, dict):
        return ['no_web_claims']

    issues_found = []

    # Check if claims exist
    if not wc.get('claims'):
        issues_found.append('empty_claims')

    # Check for verified_claims
    if 'verified_claims' not in wc:
        issues_found.append('no_verified_claims')
    else:
        vc = wc['verified_claims']
        if isinstance(vc, dict):
            # `claims: null` in YAML yields None; treat it as an empty list.
            vc_claims = vc.get('claims') or []
            # Check for XPath provenance
            claims_without_xpath = sum(
                1 for claim in vc_claims
                if isinstance(claim, dict) and 'xpath' not in claim
            )
            if claims_without_xpath > 0:
                issues_found.append(f'claims_missing_xpath:{claims_without_xpath}')

    return issues_found if issues_found else None
def check_coordinates(data, filename):
    """Check for coordinate issues.

    Args:
        data: Parsed custodian record.
        filename: Unused; kept so all ``check_*`` functions share a signature.

    Returns:
        None when nothing is wrong, otherwise a list with any of:
          - ``'no_location'``: section missing (or null / not a mapping)
          - ``'missing_coordinates'``: latitude or longitude absent or not
            convertible to a number
          - ``'coordinates_outside_netherlands'``: outside the rough bounding
            box 50.5 < lat < 53.7, 3.3 < lon < 7.3
          - ``'has_coordinate_correction'``: ``coordinate_provenance`` records
            ``previous_coordinates``
    """
    loc = data.get('location')
    if not isinstance(loc, dict):
        return ['no_location']

    issues_found = []

    # float() accepts numbers and numeric strings alike; None and other junk
    # raise and are reported as missing rather than aborting the scan.
    try:
        lat = float(loc.get('latitude'))
        lon = float(loc.get('longitude'))
    except (TypeError, ValueError):
        issues_found.append('missing_coordinates')
    else:
        # Rough Netherlands bounding box
        if not (50.5 < lat < 53.7 and 3.3 < lon < 7.3):
            issues_found.append('coordinates_outside_netherlands')

    # A recorded previous value means the coordinates were corrected earlier.
    prov = loc.get('coordinate_provenance')
    if isinstance(prov, dict) and 'previous_coordinates' in prov:
        issues_found.append('has_coordinate_correction')

    return issues_found if issues_found else None
def check_digital_platforms(data, filename):
    """Check for missing digital platforms.

    Args:
        data: Parsed custodian record.
        filename: Unused; kept so all ``check_*`` functions share a signature.

    Returns:
        ``['no_digital_platforms']`` when the key is absent or null,
        ``['empty_digital_platforms']`` when it is present but holds nothing,
        otherwise None.
    """
    platforms = data.get('digital_platforms')
    if platforms is None:
        return ['no_digital_platforms']
    # Present but empty: previously indistinguishable from "absent" because
    # the emptiness test ran first and made this branch unreachable.
    if not platforms:
        return ['empty_digital_platforms']
    return None
def check_identifiers(data, filename):
    """Check identifier completeness.

    Args:
        data: Parsed custodian record.
        filename: Unused; kept so all ``check_*`` functions share a signature.

    Returns:
        None when both an ISIL and a GHCID identifier are present, otherwise a
        list with ``'no_identifiers'`` (key missing or null) or any of
        ``'no_isil'`` / ``'no_ghcid'``. Only dict entries are inspected; their
        ``identifier_scheme`` value names the scheme.
    """
    ids = data.get('identifiers')
    # `identifiers:` with no value parses to None; iterating it would raise.
    if ids is None:
        return ['no_identifiers']

    id_types = [i.get('identifier_scheme') for i in ids if isinstance(i, dict)]
    issues_found = [
        label
        for scheme, label in (('ISIL', 'no_isil'), ('GHCID', 'no_ghcid'))
        if scheme not in id_types
    ]
    return issues_found if issues_found else None
def check_wikidata(data, filename):
    """Check Wikidata enrichment status.

    Args:
        data: Parsed custodian record.
        filename: Unused; kept so all ``check_*`` functions share a signature.

    Returns:
        None when the status is ``SUCCESS`` or ``ENRICHED``; otherwise one of
        ``'no_wikidata_enrichment'`` (section missing, null or not a mapping),
        ``'wikidata_not_found'`` (status ``NOT_FOUND``) or
        ``'wikidata_status:<status>'`` for any other value, including a
        missing status (empty string).
    """
    wd = data.get('wikidata_enrichment')
    # A null section carries no enrichment and must not crash the scan.
    if not isinstance(wd, dict):
        return 'no_wikidata_enrichment'

    status = wd.get('status', '')
    if status == 'NOT_FOUND':
        return 'wikidata_not_found'
    if status in ['SUCCESS', 'ENRICHED']:
        return None
    return f'wikidata_status:{status}'
def check_url(data, filename):
    """Report problems with the record's top-level ``url``.

    Returns None when nothing is wrong, otherwise a list with ``'no_url'``
    (missing/empty) or ``'http_not_https'`` (starts with ``http://``), plus
    ``'has_url_correction'`` when the record has a ``url_correction`` key.
    ``filename`` is unused.
    """
    findings = []
    url = data.get('url', '')

    if url:
        if url.startswith('http://'):
            findings.append('http_not_https')
    else:
        findings.append('no_url')

    # A stored correction indicates the URL was wrong at some earlier point.
    if 'url_correction' in data:
        findings.append('has_url_correction')

    return findings or None
def scan_file(filepath):
    """Scan a single custodian YAML file for all issue types.

    Args:
        filepath: ``pathlib.Path`` of the file to scan.

    Returns:
        A dict mapping issue category to its details; empty when the file is
        clean. Files that cannot be read or parsed yield
        ``{'parse_error': <message>}``, empty documents yield
        ``{'empty_file': True}``, and documents whose top level is not a
        mapping are reported as a parse error because none of the checks can
        inspect them.
    """
    filename = filepath.name

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        # Boundary of the scan: any read/parse failure is reported, not raised.
        return {'parse_error': str(e)}

    if not data:
        return {'empty_file': True}
    if not isinstance(data, dict):
        return {'parse_error': f'top-level YAML node is {type(data).__name__}, expected a mapping'}

    file_issues = {}

    # 1. GHCID type. A type mismatch is only reported for files currently
    # typed 'U' (unknown); such files are additionally flagged 'unknown_type'.
    # NOTE(review): mismatches between two known types (e.g. M vs A) are not
    # reported here - confirm that this is intended.
    ghcid_type = extract_ghcid_type(filename)
    expected_type = get_expected_type(data)
    if ghcid_type == 'U':
        if expected_type and expected_type != 'U':
            file_issues['wrong_ghcid_type'] = {
                'current': ghcid_type,
                'expected': expected_type
            }
        file_issues['unknown_type'] = True

    # 2-9. Remaining checks share the (data, filename) signature and return a
    # falsy value when there is nothing to report.
    checks = (
        ('google_maps_mismatch', check_google_maps_mismatch),
        ('absolute_paths', check_absolute_paths),
        ('web_claims_issues', check_web_claims),
        ('coordinate_issues', check_coordinates),
        ('digital_platform_issues', check_digital_platforms),
        ('identifier_issues', check_identifiers),
        ('wikidata_issue', check_wikidata),
        ('url_issues', check_url),
    )
    for issue_key, check in checks:
        result = check(data, filename)
        if result:
            file_issues[issue_key] = result

    return file_issues
def main():
    """Scan every ``NL-*.yaml`` custodian file, print a report and save it.

    Prints progress, a per-issue-type breakdown, capped listings for the
    critical categories and counts for enrichment gaps, then writes the full
    findings to ``reports/dutch_data_quality_scan.yaml`` next to the custodian
    directory.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80
    short_rule = "-" * 40

    def with_issue(key):
        # (filename, issues) pairs of all files that carry the given issue key.
        return [(name, found) for name, found in all_issues.items() if key in found]

    def print_overflow(entries, limit):
        # Tell the reader how many entries were cut off by the listing cap.
        hidden = len(entries) - limit
        if hidden > 0:
            print(f" ... and {hidden} more")

    print(f"Scanning Dutch custodian files in {CUSTODIAN_DIR}")
    print(f"Scan started: {datetime.now().isoformat()}")
    print(heavy_rule)

    # Collect all issues
    all_issues = {}
    issue_counts = defaultdict(int)

    files = sorted(CUSTODIAN_DIR.glob("NL-*.yaml"))
    total_files = len(files)
    print(f"Found {total_files} Dutch custodian files\n")

    for position, filepath in enumerate(files, start=1):
        if position % 200 == 0:
            print(f"Progress: {position}/{total_files} files scanned...", flush=True)
        file_issues = scan_file(filepath)
        if not file_issues:
            continue
        all_issues[filepath.name] = file_issues
        for issue_type in file_issues:
            issue_counts[issue_type] += 1

    print(f"\nScan complete: {total_files} files analyzed")
    print(heavy_rule)

    # Summary report
    print("\n" + heavy_rule)
    print("SUMMARY REPORT: Data Quality Issues")
    print(heavy_rule)
    print(f"\nTotal files scanned: {total_files}")
    print(f"Files with issues: {len(all_issues)}")
    print(f"Files without issues: {total_files - len(all_issues)}")

    print("\n" + light_rule)
    print("ISSUE BREAKDOWN BY TYPE")
    print(light_rule)

    # Most frequent issue types first
    sorted_issues = sorted(issue_counts.items(), key=lambda pair: -pair[1])
    for issue_type, count in sorted_issues:
        pct = (count / total_files) * 100
        print(f"{issue_type:40} {count:5} files ({pct:5.1f}%)")

    # Detailed breakdown for critical issues
    print("\n" + heavy_rule)
    print("CRITICAL ISSUES - REQUIRE IMMEDIATE ATTENTION")
    print(heavy_rule)

    # 1. Wrong GHCID type
    wrong_type_files = with_issue('wrong_ghcid_type')
    print(f"\n1. WRONG GHCID TYPE ({len(wrong_type_files)} files)")
    print(short_rule)
    if not wrong_type_files:
        print(" None found")
    else:
        for filename, found in wrong_type_files[:20]:
            info = found['wrong_ghcid_type']
            print(f" {filename}: {info['current']} -> should be {info['expected']}")
        print_overflow(wrong_type_files, 20)

    # 2. Google Maps mismatches
    gm_mismatch_files = with_issue('google_maps_mismatch')
    print(f"\n2. GOOGLE MAPS MISMATCHES ({len(gm_mismatch_files)} files)")
    print(short_rule)
    if not gm_mismatch_files:
        print(" None found")
    else:
        for filename, found in gm_mismatch_files[:20]:
            info = found['google_maps_mismatch']
            print(f" {filename}")
            print(f" Google: {info['google_name']}")
            print(f" Org: {info['org_name']}")
            print(f" Similarity: {info['similarity']}")
        print_overflow(gm_mismatch_files, 20)

    # 3. Absolute paths
    abs_path_files = with_issue('absolute_paths')
    print(f"\n3. ABSOLUTE PATHS ({len(abs_path_files)} files)")
    print(short_rule)
    if not abs_path_files:
        print(" None found")
    else:
        for filename, found in abs_path_files[:10]:
            print(f" {filename}: {found['absolute_paths']}")
        print_overflow(abs_path_files, 10)

    # 4. Unknown type (U)
    unknown_type_files = [name for name, _ in with_issue('unknown_type')]
    print(f"\n4. UNKNOWN TYPE CODE 'U' ({len(unknown_type_files)} files)")
    print(short_rule)
    if not unknown_type_files:
        print(" None found")
    else:
        for filename in unknown_type_files[:30]:
            print(f" {filename}")
        print_overflow(unknown_type_files, 30)

    print("\n" + heavy_rule)
    print("ENRICHMENT GAPS")
    print(heavy_rule)

    # Web claims issues
    no_verified_claims = [
        name for name, found in with_issue('web_claims_issues')
        if 'no_verified_claims' in found['web_claims_issues']
    ]
    print(f"\n5. NO VERIFIED WEB CLAIMS ({len(no_verified_claims)} files)")

    # Digital platforms
    no_platforms = with_issue('digital_platform_issues')
    print(f"6. NO DIGITAL PLATFORMS ({len(no_platforms)} files)")

    # Wikidata
    no_wikidata = [
        name for name, found in all_issues.items()
        if found.get('wikidata_issue') in ['no_wikidata_enrichment', 'wikidata_not_found']
    ]
    print(f"7. NO WIKIDATA ENRICHMENT ({len(no_wikidata)} files)")

    # URLs
    no_url = [
        name for name, found in with_issue('url_issues')
        if 'no_url' in found['url_issues']
    ]
    print(f"8. NO URL ({len(no_url)} files)")

    # Save detailed report
    report_file = CUSTODIAN_DIR.parent / 'reports' / 'dutch_data_quality_scan.yaml'
    report_file.parent.mkdir(exist_ok=True)

    report = {
        'scan_timestamp': datetime.now().isoformat(),
        'total_files': total_files,
        'files_with_issues': len(all_issues),
        'issue_counts': dict(sorted_issues),
        'detailed_issues': all_issues
    }
    with open(report_file, 'w', encoding='utf-8') as f:
        yaml.dump(report, f, default_flow_style=False, allow_unicode=True)

    print(f"\n\nDetailed report saved to: {report_file}")
    print(f"Scan completed: {datetime.now().isoformat()}")
if __name__ == '__main__':
main()

199
scripts/scan_dutch_fast.py Normal file
View file

@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""Fast data quality scan - optimized for speed."""
import os
import re
import yaml
from pathlib import Path
from collections import defaultdict
from datetime import datetime
# Use C loader for speed
try:
from yaml import CSafeLoader as SafeLoader
except ImportError:
from yaml import SafeLoader
CUSTODIAN_DIR = Path("/Users/kempersc/apps/glam/data/custodian")
def extract_ghcid_type(filename):
    """Return the one-letter GHCID type code from a Dutch filename, or None.

    Matches names that begin ``NL-XX-XXX-T-`` (capital letters only) and
    returns ``T``.
    """
    hit = re.match(r'NL-[A-Z]{2}-[A-Z]{3}-([A-Z])-', filename)
    return hit[1] if hit is not None else None
def scan_file_fast(filepath):
    """Fast scan using string operations where possible.

    Cheap substring heuristics run on the raw text first; the YAML is parsed
    only for the checks that need structure (type mismatch, Google Maps name
    mismatch, coordinates).

    Args:
        filepath: ``pathlib.Path`` of the custodian file.

    Returns:
        A list of issue labels (possibly empty). ``['parse_error']`` means the
        file could not be read; ``'yaml_parse_error'`` means the text checks
        ran but the YAML could not be parsed.
    """
    filename = filepath.name
    issues = []

    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            content = f.read()
    except (OSError, ValueError):
        # ValueError covers UnicodeDecodeError for files that are not UTF-8.
        return ['parse_error']

    # --- Quick string-based checks (heuristics on the raw text) ---

    # Absolute paths
    if '/Volumes/KINGSTON/' in content or '/Users/kempersc/' in content:
        issues.append('absolute_paths')

    # No URL
    if '\nurl:' not in content and 'url: ' not in content[:500]:
        issues.append('no_url')

    # HTTP instead of HTTPS
    if 'url: http://' in content:
        issues.append('http_not_https')

    # No digital_platforms
    if 'digital_platforms:' not in content:
        issues.append('no_digital_platforms')
    elif 'digital_platforms: []\n' in content or 'digital_platforms:\n-' not in content:
        issues.append('empty_digital_platforms')

    # No verified_claims
    if 'verified_claims:' not in content:
        issues.append('no_verified_claims')

    # Wikidata NOT_FOUND
    if "status: NOT_FOUND" in content:
        issues.append('wikidata_not_found')
    elif 'wikidata_enrichment:' not in content:
        issues.append('no_wikidata_enrichment')

    # Unknown type in filename
    ghcid_type = extract_ghcid_type(filename)
    if ghcid_type == 'U':
        issues.append('unknown_type_U')

    # --- Structured checks: parse YAML only for these ---
    try:
        data = yaml.load(content, Loader=SafeLoader)
    except yaml.YAMLError:
        # Only YAML errors are expected here; a bare except would also
        # swallow KeyboardInterrupt and hide programming errors.
        issues.append('yaml_parse_error')
        return issues

    if not data:
        issues.append('empty_file')
        return issues
    if not isinstance(data, dict):
        # Nothing below can inspect a non-mapping document; treat it as a
        # record without any of the expected sections.
        data = {}

    # Check GHCID type mismatch
    original_entry = data.get('original_entry')
    if isinstance(original_entry, dict):
        expected = None
        declared_types = original_entry.get('type')
        if declared_types and isinstance(declared_types, list):
            expected = declared_types[0]
        else:
            type_org = original_entry.get('type_organisatie')
            if type_org and isinstance(type_org, str):
                type_map = {'archive': 'A', 'archief': 'A', 'library': 'L',
                            'bibliotheek': 'L', 'museum': 'M', 'gallery': 'G'}
                expected = type_map.get(type_org.lower())

        if expected and ghcid_type and ghcid_type != expected:
            issues.append(f'wrong_type:{ghcid_type}{expected}')

    # Check Google Maps mismatch
    gm = data.get('google_maps_enrichment')
    if isinstance(gm, dict) and isinstance(original_entry, dict):
        gm_name = gm.get('name', '')
        org_name = original_entry.get('organisatie', '')
        if isinstance(gm_name, str) and isinstance(org_name, str) and gm_name and org_name:
            stopwords = {'de', 'het', 'van', 'en', 'stichting'}
            gm_words = set(gm_name.lower().split()) - stopwords
            org_words = set(org_name.lower().split()) - stopwords
            if gm_words and org_words:
                overlap = len(gm_words & org_words)
                similarity = overlap / max(len(gm_words), len(org_words))
                if similarity < 0.25:
                    issues.append('google_maps_mismatch')

    # Check coordinates
    loc = data.get('location')
    if isinstance(loc, dict):
        # float() accepts numbers and numeric strings; None or junk raises and
        # is reported as missing instead of aborting the scan.
        try:
            lat = float(loc.get('latitude'))
            lon = float(loc.get('longitude'))
        except (TypeError, ValueError):
            issues.append('missing_coordinates')
        else:
            # Rough Netherlands bounding box
            if not (50.5 < lat < 53.7 and 3.3 < lon < 7.3):
                issues.append('coords_outside_NL')
    else:
        issues.append('no_location')

    return issues
def main():
    """Scan all Dutch custodian files, print a quality summary and save a YAML report.

    Every ``NL-*.yaml`` file in ``CUSTODIAN_DIR`` is passed to
    ``scan_file_fast``; the returned issue labels are tallied per issue and per
    file. A summary, an issue breakdown and a listing of the critical issues are
    printed, and the full result is written to
    ``<CUSTODIAN_DIR parent>/reports/dutch_data_quality_fast.yaml``.
    """
    print(f"Fast scan started: {datetime.now().isoformat()}")

    files = sorted(CUSTODIAN_DIR.glob("NL-*.yaml"))
    total = len(files)
    print(f"Scanning {total} Dutch custodian files...")

    issue_counts = defaultdict(int)
    files_with_issues = defaultdict(list)

    for fp in files:
        for issue in scan_file_fast(fp):
            issue_counts[issue] += 1
            files_with_issues[issue].append(fp.name)

    print(f"\nScan complete: {datetime.now().isoformat()}")
    print("\n" + "=" * 80)
    print("DATA QUALITY SUMMARY REPORT")
    print("=" * 80)
    print(f"\nTotal files: {total}")

    # Count files with any issue
    all_issue_files = set()
    for files_list in files_with_issues.values():
        all_issue_files.update(files_list)

    # An empty directory must not crash the report with a division by zero.
    issue_pct = 100 * len(all_issue_files) / total if total else 0.0
    print(f"Files with issues: {len(all_issue_files)} ({issue_pct:.1f}%)")
    print(f"Clean files: {total - len(all_issue_files)}")

    print("\n" + "-" * 80)
    print("ISSUE BREAKDOWN")
    print("-" * 80)

    # Sort by count, most frequent first. issue_counts is only non-empty when
    # at least one file was scanned, so dividing by total is safe here.
    for issue, count in sorted(issue_counts.items(), key=lambda x: -x[1]):
        pct = 100 * count / total
        # One bar character per 2 percentage points. The original multiplied
        # an empty string, which never rendered anything.
        bar = "#" * int(pct / 2)
        print(f"{issue:35} {count:5} ({pct:5.1f}%) {bar}")

    # Critical issues detail
    print("\n" + "=" * 80)
    print("CRITICAL ISSUES (require manual fix)")
    print("=" * 80)

    critical_issues = ['wrong_type:', 'google_maps_mismatch', 'absolute_paths', 'unknown_type_U']
    for critical in critical_issues:
        # Substring match so that the 'wrong_type:' prefix picks up every
        # 'wrong_type:<...>' variant; exact matches are covered as well.
        matching = [(k, v) for k, v in files_with_issues.items() if critical in k]
        for issue_key, file_list in matching:
            print(f"\n{issue_key} ({len(file_list)} files):")
            for f in file_list[:15]:
                print(f" - {f}")
            if len(file_list) > 15:
                print(f" ... and {len(file_list) - 15} more")

    # Save report
    report_path = CUSTODIAN_DIR.parent / 'reports' / 'dutch_data_quality_fast.yaml'
    report_path.parent.mkdir(parents=True, exist_ok=True)

    report = {
        'scan_timestamp': datetime.now().isoformat(),
        'total_files': total,
        'files_with_issues': len(all_issue_files),
        'issue_counts': dict(sorted(issue_counts.items(), key=lambda x: -x[1])),
        'files_by_issue': dict(files_with_issues),
    }

    # allow_unicode=True emits raw non-ASCII characters, so the encoding must
    # be explicit instead of depending on the platform default.
    with open(report_path, 'w', encoding='utf-8') as f:
        yaml.dump(report, f, default_flow_style=False, allow_unicode=True)

    print(f"\n\nFull report saved: {report_path}")


if __name__ == '__main__':
    main()

View file

@ -0,0 +1,575 @@
#!/usr/bin/env python3
"""
Transform crawl4ai_enrichment data into proper digital_platform YAML structure.
This script processes custodian YAML files that have crawl4ai_enrichment data
and creates/updates the digital_platform block conforming to the LinkML schema.
Schema Reference:
- DigitalPlatform: schemas/20251121/linkml/modules/classes/DigitalPlatform.yaml
- AuxiliaryDigitalPlatform: schemas/20251121/linkml/modules/classes/AuxiliaryDigitalPlatform.yaml
- DigitalPlatformTypeEnum: schemas/20251121/linkml/modules/enums/DigitalPlatformTypeEnum.yaml
Usage:
python scripts/transform_crawl4ai_to_digital_platform.py [--dry-run] [--file FILE] [--verbose]
"""
import argparse
import logging
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlparse
import yaml
# Configure logging.
# logging.FileHandler opens its target file as soon as it is constructed, so
# the logs/ directory has to exist before basicConfig() builds the handler
# list; otherwise importing this module fails with FileNotFoundError.
Path('logs').mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(f'logs/transform_digital_platform_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
    ]
)
logger = logging.getLogger(__name__)
# Lookup table keyed by the `type` of a crawl4ai detected_catalog_urls entry.
# Each value names the DigitalPlatformTypeEnum members the entry implies, the
# slot its URL is filed under (collection_web_addresses or
# inventory_web_addresses) and a short human-readable description.
CATALOG_TYPE_MAPPING = {
    catalog_type: {
        'platform_types': platform_types,
        'slot': slot,
        'description': description,
    }
    for catalog_type, platform_types, slot, description in (
        # Image collections
        ('beeldbank', ['PHOTOGRAPH_COLLECTION'], 'collection_web_addresses',
         'Image/photograph collection'),
        # Genealogy (specialized database)
        ('genealogie', ['GENEALOGY_DATABASE'], 'collection_web_addresses',
         'Genealogy records database'),
        # Archives and inventories are the only types filed as inventories
        ('archieven', ['ARCHIVES_PORTAL'], 'inventory_web_addresses',
         'Archival finding aids and inventories'),
        ('inventaris', ['ARCHIVES_PORTAL'], 'inventory_web_addresses',
         'Archival inventory'),
        # General collections
        ('collectie', ['ONLINE_DATABASE'], 'collection_web_addresses',
         'General collection access'),
        # Library catalogs
        ('bibliotheek', ['DIGITAL_LIBRARY'], 'collection_web_addresses',
         'Library catalog'),
        # Search interfaces
        ('zoeken', ['ONLINE_DATABASE'], 'collection_web_addresses',
         'Search interface'),
        # Kranten (newspapers)
        ('kranten', ['ONLINE_NEWS_ARCHIVE'], 'collection_web_addresses',
         'Historical newspapers'),
    )
}
# Mapping for external archive platforms to AuxiliaryDigitalPlatformTypeEnum.
#
# Keys are matched as substrings of a platform name or URL, and the first
# matching key in iteration order wins. A key that contains another key must
# therefore come BEFORE the shorter one: 'opendata.archieven.nl' precedes
# 'archieven.nl', otherwise the open-data entry could never be selected.
EXTERNAL_PLATFORM_MAPPING = {
    'opendata.archieven.nl': {
        'platform_name': 'Open Data Archieven.nl',
        'auxiliary_platform_type': 'OPEN_DATA_PORTAL',
        'description': 'Open data from Dutch archives'
    },
    'archieven.nl': {
        'platform_name': 'Archieven.nl',
        'auxiliary_platform_type': 'AGGREGATOR',
        'description': 'National Dutch archives aggregator'
    },
    'archiefweb.eu': {
        'platform_name': 'Archiefweb.eu',
        'auxiliary_platform_type': 'ARCHIVAL_REPOSITORY',
        'description': 'Web archiving service'
    },
    'memorix.nl': {
        'platform_name': 'Memorix',
        'auxiliary_platform_type': 'DIGITAL_ARCHIVE',
        'description': 'Heritage information management platform'
    },
    'regionaalarchief': {
        'platform_name': 'Regionaal Archief',
        'auxiliary_platform_type': 'ARCHIVES_PORTAL',
        'description': 'Regional archive portal'
    },
    'delpher.nl': {
        'platform_name': 'Delpher',
        'auxiliary_platform_type': 'DIGITAL_LIBRARY',
        'description': 'KB digitized newspapers, books, and periodicals'
    },
    'wiewaswie.nl': {
        'platform_name': 'WieWasWie',
        'auxiliary_platform_type': 'GENEALOGY_DATABASE',
        'description': 'Dutch genealogy database'
    },
}
def normalize_url(url: str) -> str:
    """Return *url* percent-decoded, stripped of query/fragment and trailing slash.

    The URL is decoded with ``unquote`` and then parsed. Params, query string
    and fragment are dropped, and one trailing slash is removed from the path
    unless the path is only the root ``/``.

    Input without a scheme or host (``example.nl/collectie/``, ``/collectie/``)
    is reassembled as-is instead of being given an empty ``://`` prefix.

    Args:
        url: URL to normalize. Falsy values are returned unchanged.

    Returns:
        The normalized URL.
    """
    if not url:
        return url

    parsed = urlparse(unquote(url))

    # Remove trailing slash for consistency (except root)
    path = parsed.path
    if len(path) > 1 and path.endswith('/'):
        path = path[:-1]

    # geturl() only emits "scheme:" and "//host" for components that are
    # present, so relative or scheme-less input is not mangled.
    return parsed._replace(path=path, params='', query='', fragment='').geturl()
def extract_base_path_key(url: str) -> str:
    """Build the deduplication key for *url*: host plus path, no trailing slashes.

    Scheme, query string and fragment are not part of the key.
    """
    parts = urlparse(url)
    host_and_path = parts.netloc + parts.path
    return host_and_path.rstrip('/')
def deduplicate_catalog_urls(catalog_urls: list[dict]) -> list[dict]:
    """
    Deduplicate catalog URLs, preferring entries with XPath provenance.

    Strategy:
    1. Group entries by (base path without query params, type).
    2. Within each group prefer an entry that has a truthy ``xpath``; break
       ties by the shortest URL, then by original order.
    3. Return a shallow copy of the chosen entry with its ``url`` normalized.

    Entries whose ``url`` is missing or ``None`` are treated as having an empty
    URL instead of raising.

    Args:
        catalog_urls: Entries with (at least) ``url`` and ``type`` keys.

    Returns:
        One representative entry per (base path, type) group, in
        first-seen group order. Empty list for empty or ``None`` input.
    """
    if not catalog_urls:
        return []

    def _url_of(entry: dict) -> str:
        # Single accessor so grouping, ranking and normalizing agree on how a
        # missing or null URL is handled.
        return entry.get('url') or ''

    # Group by (base_path, type)
    grouped = defaultdict(list)
    for entry in catalog_urls:
        group_key = (extract_base_path_key(_url_of(entry)), entry.get('type', 'unknown'))
        grouped[group_key].append(entry)

    # Select best entry from each group
    deduplicated = []
    for entries in grouped.values():
        # min() returns the first minimal element, matching a stable sort's head.
        best = min(
            entries,
            key=lambda e: (0 if e.get('xpath') else 1, len(_url_of(e)))
        )
        best_copy = best.copy()
        best_copy['url'] = normalize_url(_url_of(best))
        deduplicated.append(best_copy)

    return deduplicated
def generate_platform_id(ghcid: str) -> str:
    """Build the platform_id URI for a GHCID (lower-cased, underscores as hyphens)."""
    slug = ghcid.replace('_', '-').lower()
    return "https://nde.nl/ontology/hc/platform/" + slug + "-website"
def extract_ghcid_from_file(file_path: Path) -> str | None:
"""Extract GHCID from filename."""
stem = file_path.stem
# GHCID pattern: CC-RR-CCC-T-ABBREV (e.g., NL-DR-ASS-A-DA)
if re.match(r'^[A-Z]{2}-[A-Z]{2,3}-[A-Z]{3}-[A-Z]-', stem):
return stem
return None
def determine_platform_types(catalog_urls: list[dict]) -> list[str]:
    """
    Derive DigitalPlatformTypeEnum values from detected catalog URL entries.

    Each entry's ``type`` is looked up in CATALOG_TYPE_MAPPING and the mapped
    platform types are collected. The result is an alphabetically sorted list
    that always contains INSTITUTIONAL_WEBSITE.
    """
    detected = {
        platform_type
        for entry in catalog_urls
        for platform_type in CATALOG_TYPE_MAPPING.get(entry.get('type', ''), {}).get('platform_types', [])
    }

    # Catalog URLs of unrecognized types still indicate some online database
    if catalog_urls and not detected:
        detected.add('ONLINE_DATABASE')

    # The institutional website itself is always the base type
    detected.add('INSTITUTIONAL_WEBSITE')

    return sorted(detected)
def categorize_urls_by_slot(catalog_urls: list[dict]) -> dict[str, list[str]]:
    """
    Split catalog URLs into collection_web_addresses and inventory_web_addresses.

    The slot comes from CATALOG_TYPE_MAPPING for the entry's ``type``; entries
    of unmapped types go to collection_web_addresses. Empty URLs and URLs that
    were already filed are skipped.
    """
    by_slot = {
        'collection_web_addresses': [],
        'inventory_web_addresses': []
    }
    already_filed = set()

    for entry in catalog_urls:
        address = entry.get('url', '')
        if address and address not in already_filed:
            type_config = CATALOG_TYPE_MAPPING.get(entry.get('type', ''), {})
            target_slot = type_config.get('slot', 'collection_web_addresses')
            by_slot[target_slot].append(address)
            already_filed.add(address)

    return by_slot
def transform_external_platforms(external_platforms: list[dict]) -> list[dict]:
    """
    Transform external_archive_platforms to auxiliary_platforms structure.

    Each entry is matched against EXTERNAL_PLATFORM_MAPPING by substring, on
    either its ``platform`` value or its ``url``. When several keys match and
    one is contained in another (``archieven.nl`` inside
    ``opendata.archieven.nl``), the more specific key wins; otherwise the first
    matching key in mapping order is used. Unmatched entries become a generic
    WEB_PORTAL.

    Entries are deduplicated by ``platform``; entries without a platform are
    deduplicated by ``url`` so that distinct unnamed platforms are all kept.
    Entries without a URL are skipped.

    Args:
        external_platforms: Entries with ``url`` and optionally ``platform``.

    Returns:
        List of dicts with platform_name, platform_url,
        auxiliary_platform_type and platform_purpose.
    """
    if not external_platforms:
        return []

    auxiliary = []
    seen_keys = set()

    for entry in external_platforms:
        # "or ''" also covers keys that are present but null.
        url = entry.get('url') or ''
        platform_key = entry.get('platform') or ''

        # An empty platform name must not act as a shared dedup key.
        dedup_key = platform_key or url
        if not url or dedup_key in seen_keys:
            continue

        matches = [
            key for key in EXTERNAL_PLATFORM_MAPPING
            if key in platform_key or key in url
        ]
        # Drop keys that are merely a substring of another matching key.
        specific = [
            key for key in matches
            if not any(key != other and key in other for other in matches)
        ]

        if specific:
            mapping = EXTERNAL_PLATFORM_MAPPING[specific[0]]
        else:
            # Generic external platform
            mapping = {
                'platform_name': platform_key.replace('.', ' ').title() if platform_key else 'External Platform',
                'auxiliary_platform_type': 'WEB_PORTAL',
                'description': 'External heritage platform'
            }

        auxiliary.append({
            'platform_name': mapping['platform_name'],
            'platform_url': url,
            'auxiliary_platform_type': mapping['auxiliary_platform_type'],
            'platform_purpose': mapping.get('description', '')
        })
        seen_keys.add(dedup_key)

    return auxiliary
def get_platform_name(data: dict, ghcid: str) -> str:
    """
    Determine the best platform name from available data.

    Priority:
    1. custodian_name.emic_name or custodian_name.name
    2. top-level name
    3. crawl4ai_enrichment.title, with a trailing " | ..." or " - ..." suffix
       removed (used only when more than 3 characters remain)
    4. GHCID-based fallback

    Args:
        data: Full custodian YAML data.
        ghcid: Identifier used for the fallback name.

    Returns:
        The chosen name followed by " Website".
    """
    # Try custodian_name first
    custodian_name = data.get('custodian_name', {})
    if isinstance(custodian_name, dict):
        name = custodian_name.get('emic_name') or custodian_name.get('name')
        if name:
            return f"{name} Website"

    # Try top-level name
    if data.get('name'):
        return f"{data['name']} Website"

    # Try crawl4ai title; "or {}" also covers a key that is present but null
    crawl4ai = data.get('crawl4ai_enrichment') or {}
    title = crawl4ai.get('title', '')
    if title:
        # Cut at a pipe, or at a hyphen surrounded by whitespace. A bare hyphen
        # is not a separator: it is part of names such as
        # "Noord-Hollands Archief", which would otherwise be cut to "Noord".
        cleaned = re.sub(r'\s*(?:\||\s-\s)\s*.+$', '', title).strip()
        if len(cleaned) > 3:
            return f"{cleaned} Website"

    # Fallback to GHCID
    return f"{ghcid} Website"
def transform_crawl4ai_to_digital_platform(data: dict, ghcid: str) -> dict | None:
"""
Transform crawl4ai_enrichment into digital_platform structure.
Args:
data: Full custodian YAML data
ghcid: Global Heritage Custodian Identifier
Returns:
digital_platform dict or None if no crawl4ai_enrichment
"""
crawl4ai = data.get('crawl4ai_enrichment')
if not crawl4ai:
return None
# Skip failed fetches - accept 2xx and 3xx status codes
status_code = crawl4ai.get('status_code')
if status_code is None or status_code >= 400:
logger.debug(f"Skipping {ghcid}: HTTP status {status_code}")
return None
source_url = crawl4ai.get('source_url', '')
if not source_url:
return None
# Get and deduplicate catalog URLs
catalog_urls = crawl4ai.get('detected_catalog_urls', [])
deduped_catalogs = deduplicate_catalog_urls(catalog_urls)
# Determine platform types
platform_types = determine_platform_types(deduped_catalogs)
# Categorize URLs by slot
url_slots = categorize_urls_by_slot(deduped_catalogs)
# Transform external platforms
external_platforms = crawl4ai.get('external_archive_platforms', [])
auxiliary_platforms = transform_external_platforms(external_platforms)
# Build digital_platform structure
digital_platform = {
'platform_id': generate_platform_id(ghcid),
'platform_name': get_platform_name(data, ghcid),
'homepage_web_address': source_url,
'refers_to_custodian': f"https://nde.nl/ontology/hc/{ghcid.lower()}"
}
# Add platform types if we have more than just INSTITUTIONAL_WEBSITE
if platform_types and len(platform_types) > 1:
digital_platform['platform_type'] = platform_types
elif platform_types:
digital_platform['platform_type'] = platform_types
# Add collection URLs
if url_slots['collection_web_addresses']:
digital_platform['collection_web_addresses'] = url_slots['collection_web_addresses']
# Add inventory URLs
if url_slots['inventory_web_addresses']:
digital_platform['inventory_web_addresses'] = url_slots['inventory_web_addresses']
# Add auxiliary platforms
if auxiliary_platforms:
digital_platform['auxiliary_platforms'] = auxiliary_platforms
# Add transformation metadata
digital_platform['_transformation_metadata'] = {
'source': 'crawl4ai_enrichment',
'transformation_date': datetime.now(timezone.utc).isoformat(),
'catalog_urls_original': len(catalog_urls),
'catalog_urls_deduplicated': len(deduped_catalogs),
'external_platforms_count': len(external_platforms)
}
return digital_platform
def process_file(file_path: Path, dry_run: bool = False) -> dict:
    """
    Process a single custodian YAML file.

    Loads the file, builds a digital_platform block from its
    crawl4ai_enrichment data and stores it under ``digital_platform_v2``. An
    existing ``digital_platform_v2`` is never overwritten.

    Args:
        file_path: Custodian YAML file; its stem must look like a GHCID.
        dry_run: When True the file is not written.

    Returns:
        dict with processing statistics. ``status`` is one of: empty, no_ghcid,
        no_crawl4ai, already_transformed, transform_failed, transformed,
        would_transform, yaml_error, error.
    """
    stats = {
        'file': str(file_path.name),
        'status': 'skipped',
        'has_crawl4ai': False,
        'has_digital_platform': False,
        'catalog_urls': 0,
        'external_platforms': 0
    }

    try:
        # Read YAML file
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if not data:
            stats['status'] = 'empty'
            return stats

        # Extract GHCID
        ghcid = extract_ghcid_from_file(file_path)
        if not ghcid:
            stats['status'] = 'no_ghcid'
            return stats

        # Check for crawl4ai_enrichment
        crawl4ai = data.get('crawl4ai_enrichment')
        if not crawl4ai:
            stats['status'] = 'no_crawl4ai'
            return stats

        stats['has_crawl4ai'] = True
        # "or []" covers keys that are present but null in the YAML.
        stats['catalog_urls'] = len(crawl4ai.get('detected_catalog_urls') or [])
        stats['external_platforms'] = len(crawl4ai.get('external_archive_platforms') or [])

        # Check if digital_platform_v2 already exists (avoid overwriting)
        if 'digital_platform_v2' in data:
            stats['has_digital_platform'] = True
            stats['status'] = 'already_transformed'
            return stats

        # Transform to digital_platform
        digital_platform = transform_crawl4ai_to_digital_platform(data, ghcid)
        if not digital_platform:
            stats['status'] = 'transform_failed'
            return stats

        # Add to data as digital_platform_v2 (to distinguish from any existing digital_platform)
        data['digital_platform_v2'] = digital_platform

        if not dry_run:
            # Serialize first: opening the file for writing truncates it, so a
            # dump that fails halfway would destroy the original contents.
            serialized = yaml.dump(data, allow_unicode=True, default_flow_style=False, sort_keys=False)
            with open(file_path, 'w', encoding='utf-8') as f:
                f.write(serialized)
            stats['status'] = 'transformed'
        else:
            stats['status'] = 'would_transform'
            logger.info(f"[DRY-RUN] Would transform {file_path.name}")
            logger.debug(f" Platform types: {digital_platform.get('platform_type', [])}")
            logger.debug(f" Collection URLs: {len(digital_platform.get('collection_web_addresses', []))}")
            logger.debug(f" Inventory URLs: {len(digital_platform.get('inventory_web_addresses', []))}")
            logger.debug(f" Auxiliary platforms: {len(digital_platform.get('auxiliary_platforms', []))}")

        return stats

    except yaml.YAMLError as e:
        logger.error(f"YAML error in {file_path.name}: {e}")
        stats['status'] = 'yaml_error'
        return stats
    except Exception as e:
        # Best-effort batch run: one bad file is reported, not fatal.
        logger.error(f"Error processing {file_path.name}: {e}")
        stats['status'] = 'error'
        return stats
def main():
    """Command-line entry point.

    Transforms either the file given with --file or every NL-*.yaml file in
    data/custodian, then logs a per-status summary. With --dry-run nothing is
    written; --verbose/-v switches the root logger to DEBUG.
    """
    cli = argparse.ArgumentParser(
        description='Transform crawl4ai_enrichment to digital_platform structure'
    )
    cli.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without making changes'
    )
    cli.add_argument(
        '--file',
        type=Path,
        help='Process a single file instead of all NL-*.yaml files'
    )
    cli.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose logging'
    )
    args = cli.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Ensure logs directory exists
    Path('logs').mkdir(exist_ok=True)

    # Decide which files to work on
    if not args.file:
        files = sorted(Path('data/custodian').glob('NL-*.yaml'))
    elif args.file.exists():
        files = [args.file]
    else:
        logger.error(f"File not found: {args.file}")
        sys.exit(1)

    file_count = len(files)
    logger.info(f"Processing {file_count} files...")
    if args.dry_run:
        logger.info("DRY-RUN MODE - no files will be modified")

    # Tally outcomes while processing
    status_counts = defaultdict(int)
    catalog_url_total = 0
    external_platform_total = 0

    for position, file_path in enumerate(files, start=1):
        if position % 100 == 0:
            logger.info(f"Progress: {position}/{file_count} files processed")

        result = process_file(file_path, dry_run=args.dry_run)
        status_counts[result['status']] += 1
        catalog_url_total += result.get('catalog_urls', 0)
        external_platform_total += result.get('external_platforms', 0)

    # Print summary
    separator = "=" * 60
    logger.info("\n" + separator)
    logger.info("TRANSFORMATION SUMMARY")
    logger.info(separator)
    logger.info(f"Total files processed: {file_count}")
    for status in sorted(status_counts):
        logger.info(f" {status}: {status_counts[status]}")
    logger.info(f"\nTotal catalog URLs found: {catalog_url_total}")
    logger.info(f"Total external platforms found: {external_platform_total}")

    if args.dry_run:
        logger.info("\n[DRY-RUN] No files were modified. Run without --dry-run to apply changes.")


if __name__ == '__main__':
    main()