- batch_crawl4ai_recrawl.py: Retry failed URL crawls - batch_firecrawl_recrawl.py: FireCrawl batch processing - batch_httpx_scrape.py: HTTPX-based scraping - detect_name_mismatch.py: Find name mismatches in data - enrich_dutch_custodians_crawl4ai.py: Dutch custodian enrichment - fix_collision_victims.py: GHCID collision resolution - fix_generic_platform_names*.py: Platform name cleanup - fix_ghcid_type.py: GHCID type corrections - fix_simon_kemper_contamination.py: Data cleanup - scan_dutch_data_quality.py: Data quality scanning - transform_crawl4ai_to_digital_platform.py: Data transformation
575 lines
19 KiB
Python
575 lines
19 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Transform crawl4ai_enrichment data into proper digital_platform YAML structure.
|
||
|
||
This script processes custodian YAML files that have crawl4ai_enrichment data
|
||
and creates/updates the digital_platform block conforming to the LinkML schema.
|
||
|
||
Schema Reference:
|
||
- DigitalPlatform: schemas/20251121/linkml/modules/classes/DigitalPlatform.yaml
|
||
- AuxiliaryDigitalPlatform: schemas/20251121/linkml/modules/classes/AuxiliaryDigitalPlatform.yaml
|
||
- DigitalPlatformTypeEnum: schemas/20251121/linkml/modules/enums/DigitalPlatformTypeEnum.yaml
|
||
|
||
Usage:
|
||
python scripts/transform_crawl4ai_to_digital_platform.py [--dry-run] [--file FILE]
|
||
"""
|
||
|
||
import argparse
|
||
import logging
|
||
import re
|
||
import sys
|
||
from collections import defaultdict
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any
|
||
from urllib.parse import unquote, urlparse
|
||
|
||
import yaml
|
||
|
||
# Configure logging.
# BUG FIX: the FileHandler below opens a file under logs/ at import time,
# but the original script only created that directory later in main() —
# so a fresh checkout crashed with FileNotFoundError before main() ran.
# Create the directory before wiring the handlers.
Path('logs').mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(f'logs/transform_digital_platform_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
    ]
)
logger = logging.getLogger(__name__)
|
||
|
||
|
||
# Mapping from crawl4ai detected_catalog_urls type to DigitalPlatformTypeEnum
# and to the appropriate slot (collection_web_addresses or inventory_web_addresses).
# Per entry: 'platform_types' feeds determine_platform_types(), 'slot' steers
# categorize_urls_by_slot(); 'description' is not read by the transform code.
# Keys are the Dutch catalog-type labels emitted by the crawler.
CATALOG_TYPE_MAPPING = {
    # Image collections → collection_web_addresses
    'beeldbank': {
        'platform_types': ['PHOTOGRAPH_COLLECTION'],
        'slot': 'collection_web_addresses',
        'description': 'Image/photograph collection'
    },
    # Genealogy → collection_web_addresses (specialized database)
    'genealogie': {
        'platform_types': ['GENEALOGY_DATABASE'],
        'slot': 'collection_web_addresses',
        'description': 'Genealogy records database'
    },
    # Archives/inventories → inventory_web_addresses
    'archieven': {
        'platform_types': ['ARCHIVES_PORTAL'],
        'slot': 'inventory_web_addresses',
        'description': 'Archival finding aids and inventories'
    },
    'inventaris': {
        'platform_types': ['ARCHIVES_PORTAL'],
        'slot': 'inventory_web_addresses',
        'description': 'Archival inventory'
    },
    # Collections → collection_web_addresses
    'collectie': {
        'platform_types': ['ONLINE_DATABASE'],
        'slot': 'collection_web_addresses',
        'description': 'General collection access'
    },
    # Library → collection_web_addresses
    'bibliotheek': {
        'platform_types': ['DIGITAL_LIBRARY'],
        'slot': 'collection_web_addresses',
        'description': 'Library catalog'
    },
    # Search interfaces → collection_web_addresses
    'zoeken': {
        'platform_types': ['ONLINE_DATABASE'],
        'slot': 'collection_web_addresses',
        'description': 'Search interface'
    },
    # Kranten (newspapers) → collection_web_addresses
    'kranten': {
        'platform_types': ['ONLINE_NEWS_ARCHIVE'],
        'slot': 'collection_web_addresses',
        'description': 'Historical newspapers'
    },
}
|
||
|
||
# Mapping for external archive platforms to AuxiliaryDigitalPlatformTypeEnum.
# Keys are substring-matched against the crawler's platform key or URL in
# transform_external_platforms(); 'description' becomes platform_purpose.
EXTERNAL_PLATFORM_MAPPING = {
    'archieven.nl': {
        'platform_name': 'Archieven.nl',
        'auxiliary_platform_type': 'AGGREGATOR',
        'description': 'National Dutch archives aggregator'
    },
    'archiefweb.eu': {
        'platform_name': 'Archiefweb.eu',
        'auxiliary_platform_type': 'ARCHIVAL_REPOSITORY',
        'description': 'Web archiving service'
    },
    'memorix.nl': {
        'platform_name': 'Memorix',
        'auxiliary_platform_type': 'DIGITAL_ARCHIVE',
        'description': 'Heritage information management platform'
    },
    # NOTE: 'archieven.nl' is iterated before this key, so URLs on the
    # opendata subdomain will substring-match the plain domain first.
    'opendata.archieven.nl': {
        'platform_name': 'Open Data Archieven.nl',
        'auxiliary_platform_type': 'OPEN_DATA_PORTAL',
        'description': 'Open data from Dutch archives'
    },
    'regionaalarchief': {
        'platform_name': 'Regionaal Archief',
        'auxiliary_platform_type': 'ARCHIVES_PORTAL',
        'description': 'Regional archive portal'
    },
    'delpher.nl': {
        'platform_name': 'Delpher',
        'auxiliary_platform_type': 'DIGITAL_LIBRARY',
        'description': 'KB digitized newspapers, books, and periodicals'
    },
    'wiewaswie.nl': {
        'platform_name': 'WieWasWie',
        'auxiliary_platform_type': 'GENEALOGY_DATABASE',
        'description': 'Dutch genealogy database'
    },
}
|
||
|
||
|
||
def normalize_url(url: str) -> str:
    """Return *url* percent-decoded, without query string or trailing slash.

    An empty/falsy input is returned unchanged; a bare-root URL keeps its
    trailing slash.
    """
    if not url:
        return url

    # Decode percent-escapes, then split into components.
    parts = urlparse(unquote(url))

    # Rebuild scheme://host/path, discarding query and fragment so that
    # otherwise-identical URLs compare equal during deduplication.
    rebuilt = f"{parts.scheme}://{parts.netloc}{parts.path}"

    # Strip a trailing slash everywhere except on the root path.
    if rebuilt.endswith('/') and len(parts.path) > 1:
        rebuilt = rebuilt[:-1]

    return rebuilt
|
||
|
||
|
||
def extract_base_path_key(url: str) -> str:
    """Build a deduplication key: host + path, query-free, no trailing slash."""
    pieces = urlparse(url)
    return (pieces.netloc + pieces.path).rstrip('/')
|
||
|
||
|
||
def deduplicate_catalog_urls(catalog_urls: list[dict]) -> list[dict]:
    """
    Collapse near-duplicate catalog URLs.

    Entries are bucketed by (query-free base path, type). Within each bucket
    the entry with XPath provenance wins, shorter URLs breaking ties; the
    winner's URL is normalized before being returned. One representative
    entry per bucket is emitted.
    """
    if not catalog_urls:
        return []

    # Bucket by (base_path, type) so distinct catalog types on the same
    # path are kept apart.
    buckets: dict[tuple[str, str], list[dict]] = defaultdict(list)
    for item in catalog_urls:
        bucket_key = (
            extract_base_path_key(item.get('url', '')),
            item.get('type', 'unknown'),
        )
        buckets[bucket_key].append(item)

    # Pick the best candidate per bucket: xpath-backed first, then shortest URL.
    survivors = []
    for candidates in buckets.values():
        winner = min(
            candidates,
            key=lambda c: (0 if c.get('xpath') else 1, len(c.get('url', '')))
        )
        chosen = winner.copy()
        chosen['url'] = normalize_url(winner['url'])
        survivors.append(chosen)

    return survivors
|
||
|
||
|
||
def generate_platform_id(ghcid: str) -> str:
    """Derive the platform URI for *ghcid* (lowercased, underscores → hyphens)."""
    slug = ghcid.lower().replace('_', '-')
    return f"https://nde.nl/ontology/hc/platform/{slug}-website"
|
||
|
||
|
||
def extract_ghcid_from_file(file_path: Path) -> str | None:
|
||
"""Extract GHCID from filename."""
|
||
stem = file_path.stem
|
||
# GHCID pattern: CC-RR-CCC-T-ABBREV (e.g., NL-DR-ASS-A-DA)
|
||
if re.match(r'^[A-Z]{2}-[A-Z]{2,3}-[A-Z]{3}-[A-Z]-', stem):
|
||
return stem
|
||
return None
|
||
|
||
|
||
def determine_platform_types(catalog_urls: list[dict]) -> list[str]:
    """
    Determine platform types from detected catalog URLs.

    Each entry's 'type' is looked up in CATALOG_TYPE_MAPPING and its
    platform_types are collected. If catalog URLs exist but none mapped,
    a generic ONLINE_DATABASE is added; INSTITUTIONAL_WEBSITE is always
    included, so the result is never empty.

    Args:
        catalog_urls: Deduplicated crawl4ai catalog-URL entries.

    Returns:
        Sorted list of DigitalPlatformTypeEnum values.
    """
    types_set: set[str] = set()

    for entry in catalog_urls:
        mapping = CATALOG_TYPE_MAPPING.get(entry.get('type', ''), {})
        # set.update replaces the original manual inner add-loop.
        types_set.update(mapping.get('platform_types', []))

    # If we have catalog URLs but no specific types, add generic ONLINE_DATABASE
    if catalog_urls and not types_set:
        types_set.add('ONLINE_DATABASE')

    # Always include INSTITUTIONAL_WEBSITE as base type
    types_set.add('INSTITUTIONAL_WEBSITE')

    # FIX: sorted() accepts any iterable; the original sorted(list(...)) built
    # a pointless intermediate list (ruff C414).
    return sorted(types_set)
|
||
|
||
|
||
def categorize_urls_by_slot(catalog_urls: list[dict]) -> dict[str, list[str]]:
    """
    Split catalog URLs into the two schema slots.

    Each URL lands in collection_web_addresses or inventory_web_addresses
    as dictated by CATALOG_TYPE_MAPPING; unmapped types default to
    collection_web_addresses. Each URL is emitted at most once, first
    occurrence wins.
    """
    slots: dict[str, list[str]] = {
        'collection_web_addresses': [],
        'inventory_web_addresses': []
    }
    already_placed: set[str] = set()

    for item in catalog_urls:
        address = item.get('url', '')
        if not address or address in already_placed:
            continue

        type_config = CATALOG_TYPE_MAPPING.get(item.get('type', ''), {})
        target_slot = type_config.get('slot', 'collection_web_addresses')

        slots[target_slot].append(address)
        already_placed.add(address)

    return slots
|
||
|
||
|
||
def transform_external_platforms(external_platforms: list[dict]) -> list[dict]:
    """
    Turn external_archive_platforms entries into auxiliary_platforms records.

    Known platforms are matched by substring of their key or URL against
    EXTERNAL_PLATFORM_MAPPING (first mapping entry wins); unknown ones get
    a generic WEB_PORTAL record. At most one record per platform key is
    produced, and entries without a URL are dropped.
    """
    if not external_platforms:
        return []

    records: list[dict] = []
    handled_keys: set[str] = set()

    for item in external_platforms:
        link = item.get('url', '')
        key_name = item.get('platform', '')

        if not link or key_name in handled_keys:
            continue

        # Look up a known-platform config by substring match against the
        # platform key or the URL itself.
        config = next(
            (cfg for token, cfg in EXTERNAL_PLATFORM_MAPPING.items()
             if token in key_name or token in link),
            None
        )

        if config is None:
            # Generic external platform
            config = {
                'platform_name': key_name.replace('.', ' ').title() if key_name else 'External Platform',
                'auxiliary_platform_type': 'WEB_PORTAL',
                'description': 'External heritage platform'
            }

        records.append({
            'platform_name': config['platform_name'],
            'platform_url': link,
            'auxiliary_platform_type': config['auxiliary_platform_type'],
            'platform_purpose': config.get('description', '')
        })
        handled_keys.add(key_name)

    return records
|
||
|
||
|
||
def get_platform_name(data: dict, ghcid: str) -> str:
    """
    Determine the best platform name from available data.

    Priority:
    1. custodian_name.emic_name or custodian_name.name
    2. top-level name
    3. crawl4ai_enrichment.title (cleaned)
    4. GHCID-based fallback

    Args:
        data: Full custodian YAML data.
        ghcid: Global Heritage Custodian Identifier (last-resort name).

    Returns:
        A display name, always suffixed with " Website".
    """
    # Try custodian_name first
    custodian_name = data.get('custodian_name', {})
    if isinstance(custodian_name, dict):
        name = custodian_name.get('emic_name') or custodian_name.get('name')
        if name:
            return f"{name} Website"

    # Try top-level name
    if data.get('name'):
        return f"{data['name']} Website"

    # Try crawl4ai title.
    # BUG FIX: YAML can carry an explicit null for crawl4ai_enrichment, in
    # which case dict.get(..., {}) returns None and the original code raised
    # AttributeError on .get('title'); `or {}` guards both cases. The same
    # guard normalizes a null title to ''.
    crawl4ai = data.get('crawl4ai_enrichment') or {}
    title = crawl4ai.get('title') or ''
    if title:
        # Clean up title (drop everything after the first dash/pipe separator)
        cleaned = re.sub(r'\s*[-–|]\s*.+$', '', title).strip()
        if cleaned and len(cleaned) > 3:
            return f"{cleaned} Website"

    # Fallback to GHCID
    return f"{ghcid} Website"
|
||
|
||
|
||
def transform_crawl4ai_to_digital_platform(data: dict, ghcid: str) -> dict | None:
    """
    Transform crawl4ai_enrichment into digital_platform structure.

    Args:
        data: Full custodian YAML data
        ghcid: Global Heritage Custodian Identifier

    Returns:
        digital_platform dict, or None when there is no crawl4ai_enrichment,
        the crawl failed (missing status or HTTP >= 400), or no source URL
        was recorded.
    """
    crawl4ai = data.get('crawl4ai_enrichment')
    if not crawl4ai:
        return None

    # Skip failed fetches - accept 2xx and 3xx status codes
    status_code = crawl4ai.get('status_code')
    if status_code is None or status_code >= 400:
        logger.debug(f"Skipping {ghcid}: HTTP status {status_code}")
        return None

    source_url = crawl4ai.get('source_url', '')
    if not source_url:
        return None

    # Get and deduplicate catalog URLs
    catalog_urls = crawl4ai.get('detected_catalog_urls', [])
    deduped_catalogs = deduplicate_catalog_urls(catalog_urls)

    # Determine platform types
    platform_types = determine_platform_types(deduped_catalogs)

    # Categorize URLs by slot
    url_slots = categorize_urls_by_slot(deduped_catalogs)

    # Transform external platforms
    external_platforms = crawl4ai.get('external_archive_platforms', [])
    auxiliary_platforms = transform_external_platforms(external_platforms)

    # Build digital_platform structure
    digital_platform = {
        'platform_id': generate_platform_id(ghcid),
        'platform_name': get_platform_name(data, ghcid),
        'homepage_web_address': source_url,
        'refers_to_custodian': f"https://nde.nl/ontology/hc/{ghcid.lower()}"
    }

    # FIX: the original if/elif here had two byte-identical branches, and
    # determine_platform_types always includes INSTITUTIONAL_WEBSITE so the
    # list is never empty — a single unconditional assignment is equivalent.
    digital_platform['platform_type'] = platform_types

    # Add collection URLs
    if url_slots['collection_web_addresses']:
        digital_platform['collection_web_addresses'] = url_slots['collection_web_addresses']

    # Add inventory URLs
    if url_slots['inventory_web_addresses']:
        digital_platform['inventory_web_addresses'] = url_slots['inventory_web_addresses']

    # Add auxiliary platforms
    if auxiliary_platforms:
        digital_platform['auxiliary_platforms'] = auxiliary_platforms

    # Add transformation metadata (provenance for downstream auditing)
    digital_platform['_transformation_metadata'] = {
        'source': 'crawl4ai_enrichment',
        'transformation_date': datetime.now(timezone.utc).isoformat(),
        'catalog_urls_original': len(catalog_urls),
        'catalog_urls_deduplicated': len(deduped_catalogs),
        'external_platforms_count': len(external_platforms)
    }

    return digital_platform
|
||
|
||
|
||
def process_file(file_path: Path, dry_run: bool = False) -> dict:
    """
    Process a single custodian YAML file.

    Loads the YAML, validates GHCID and crawl4ai presence, transforms the
    enrichment into digital_platform_v2 and (unless dry_run) writes the
    file back in place.

    Returns:
        dict with processing statistics ('status' records the outcome).
    """
    stats = {
        'file': str(file_path.name),
        'status': 'skipped',
        'has_crawl4ai': False,
        'has_digital_platform': False,
        'catalog_urls': 0,
        'external_platforms': 0
    }

    try:
        # Load the custodian record.
        with open(file_path, 'r', encoding='utf-8') as handle:
            data = yaml.safe_load(handle)

        if not data:
            stats['status'] = 'empty'
            return stats

        # Derive GHCID from the filename; bail if it does not look like one.
        ghcid = extract_ghcid_from_file(file_path)
        if not ghcid:
            stats['status'] = 'no_ghcid'
            return stats

        # Nothing to transform without crawl4ai enrichment.
        crawl4ai = data.get('crawl4ai_enrichment')
        if not crawl4ai:
            stats['status'] = 'no_crawl4ai'
            return stats

        stats['has_crawl4ai'] = True
        stats['catalog_urls'] = len(crawl4ai.get('detected_catalog_urls', []))
        stats['external_platforms'] = len(crawl4ai.get('external_archive_platforms', []))

        # Never clobber an existing digital_platform_v2 block.
        if 'digital_platform_v2' in data:
            stats['has_digital_platform'] = True
            stats['status'] = 'already_transformed'
            return stats

        digital_platform = transform_crawl4ai_to_digital_platform(data, ghcid)
        if not digital_platform:
            stats['status'] = 'transform_failed'
            return stats

        # Stored under a v2 key to distinguish from any existing digital_platform.
        data['digital_platform_v2'] = digital_platform

        if dry_run:
            stats['status'] = 'would_transform'
            logger.info(f"[DRY-RUN] Would transform {file_path.name}")
            logger.debug(f" Platform types: {digital_platform.get('platform_type', [])}")
            logger.debug(f" Collection URLs: {len(digital_platform.get('collection_web_addresses', []))}")
            logger.debug(f" Inventory URLs: {len(digital_platform.get('inventory_web_addresses', []))}")
            logger.debug(f" Auxiliary platforms: {len(digital_platform.get('auxiliary_platforms', []))}")
        else:
            # Persist the updated record in place.
            with open(file_path, 'w', encoding='utf-8') as handle:
                yaml.dump(data, handle, allow_unicode=True, default_flow_style=False, sort_keys=False)
            stats['status'] = 'transformed'

        return stats

    except yaml.YAMLError as e:
        logger.error(f"YAML error in {file_path.name}: {e}")
        stats['status'] = 'yaml_error'
        return stats
    except Exception as e:
        logger.error(f"Error processing {file_path.name}: {e}")
        stats['status'] = 'error'
        return stats
|
||
|
||
|
||
def main():
    """CLI entry point: parse arguments, transform files, print a summary."""
    parser = argparse.ArgumentParser(
        description='Transform crawl4ai_enrichment to digital_platform structure'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without making changes'
    )
    parser.add_argument(
        '--file',
        type=Path,
        help='Process a single file instead of all NL-*.yaml files'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose logging'
    )
    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Ensure logs directory exists
    Path('logs').mkdir(exist_ok=True)

    # Build the work list: one explicit file, or every Dutch custodian file.
    data_dir = Path('data/custodian')
    if args.file:
        if not args.file.exists():
            logger.error(f"File not found: {args.file}")
            sys.exit(1)
        files = [args.file]
    else:
        files = sorted(data_dir.glob('NL-*.yaml'))

    logger.info(f"Processing {len(files)} files...")
    if args.dry_run:
        logger.info("DRY-RUN MODE - no files will be modified")

    # Tally outcomes per status and aggregate URL/platform counts.
    outcome_counts = defaultdict(int)
    catalog_url_total = 0
    external_platform_total = 0

    for position, path in enumerate(files, start=1):
        if position % 100 == 0:
            logger.info(f"Progress: {position}/{len(files)} files processed")

        result = process_file(path, dry_run=args.dry_run)
        outcome_counts[result['status']] += 1
        catalog_url_total += result.get('catalog_urls', 0)
        external_platform_total += result.get('external_platforms', 0)

    # Print summary
    logger.info("\n" + "=" * 60)
    logger.info("TRANSFORMATION SUMMARY")
    logger.info("=" * 60)
    logger.info(f"Total files processed: {len(files)}")

    for status, count in sorted(outcome_counts.items()):
        logger.info(f" {status}: {count}")

    logger.info(f"\nTotal catalog URLs found: {catalog_url_total}")
    logger.info(f"Total external platforms found: {external_platform_total}")

    if args.dry_run:
        logger.info("\n[DRY-RUN] No files were modified. Run without --dry-run to apply changes.")
|
||
|
||
|
||
# Script entry point: only run the transformation when executed directly.
if __name__ == '__main__':
    main()
|