#!/usr/bin/env python3
"""
Transform crawl4ai_enrichment data into proper digital_platform YAML structure.

This script processes custodian YAML files that have crawl4ai_enrichment data
and creates/updates the digital_platform block conforming to the LinkML schema.

Schema Reference:
- DigitalPlatform: schemas/20251121/linkml/modules/classes/DigitalPlatform.yaml
- AuxiliaryDigitalPlatform: schemas/20251121/linkml/modules/classes/AuxiliaryDigitalPlatform.yaml
- DigitalPlatformTypeEnum: schemas/20251121/linkml/modules/enums/DigitalPlatformTypeEnum.yaml

Usage:
    python scripts/transform_crawl4ai_to_digital_platform.py [--dry-run] [--file FILE] [--verbose]
"""

import argparse
import logging
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import unquote, urlparse

import yaml

# Configure logging.
# The log directory must exist *before* the FileHandler is created, otherwise
# the script dies with FileNotFoundError on a fresh checkout.
_LOG_DIR = Path('logs')
_LOG_DIR.mkdir(parents=True, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(
            _LOG_DIR / f'transform_digital_platform_{datetime.now():%Y%m%d_%H%M%S}.log',
            encoding='utf-8',
        ),
    ],
)
logger = logging.getLogger(__name__)


# Mapping from crawl4ai detected_catalog_urls type to DigitalPlatformTypeEnum
# and to the appropriate slot (collection_web_addresses or inventory_web_addresses)
CATALOG_TYPE_MAPPING = {
    # Image collections → collection_web_addresses
    'beeldbank': {
        'platform_types': ['PHOTOGRAPH_COLLECTION'],
        'slot': 'collection_web_addresses',
        'description': 'Image/photograph collection',
    },
    # Genealogy → collection_web_addresses (specialized database)
    'genealogie': {
        'platform_types': ['GENEALOGY_DATABASE'],
        'slot': 'collection_web_addresses',
        'description': 'Genealogy records database',
    },
    # Archives/inventories → inventory_web_addresses
    'archieven': {
        'platform_types': ['ARCHIVES_PORTAL'],
        'slot': 'inventory_web_addresses',
        'description': 'Archival finding aids and inventories',
    },
    'inventaris': {
        'platform_types': ['ARCHIVES_PORTAL'],
        'slot': 'inventory_web_addresses',
        'description': 'Archival inventory',
    },
    # Collections → collection_web_addresses
    'collectie': {
        'platform_types': ['ONLINE_DATABASE'],
        'slot': 'collection_web_addresses',
        'description': 'General collection access',
    },
    # Library → collection_web_addresses
    'bibliotheek': {
        'platform_types': ['DIGITAL_LIBRARY'],
        'slot': 'collection_web_addresses',
        'description': 'Library catalog',
    },
    # Search interfaces → collection_web_addresses
    'zoeken': {
        'platform_types': ['ONLINE_DATABASE'],
        'slot': 'collection_web_addresses',
        'description': 'Search interface',
    },
    # Kranten (newspapers) → collection_web_addresses
    'kranten': {
        'platform_types': ['ONLINE_NEWS_ARCHIVE'],
        'slot': 'collection_web_addresses',
        'description': 'Historical newspapers',
    },
}

# Mapping for external archive platforms to AuxiliaryDigitalPlatformTypeEnum
EXTERNAL_PLATFORM_MAPPING = {
    'archieven.nl': {
        'platform_name': 'Archieven.nl',
        'auxiliary_platform_type': 'AGGREGATOR',
        'description': 'National Dutch archives aggregator',
    },
    'archiefweb.eu': {
        'platform_name': 'Archiefweb.eu',
        'auxiliary_platform_type': 'ARCHIVAL_REPOSITORY',
        'description': 'Web archiving service',
    },
    'memorix.nl': {
        'platform_name': 'Memorix',
        'auxiliary_platform_type': 'DIGITAL_ARCHIVE',
        'description': 'Heritage information management platform',
    },
    'opendata.archieven.nl': {
        'platform_name': 'Open Data Archieven.nl',
        'auxiliary_platform_type': 'OPEN_DATA_PORTAL',
        'description': 'Open data from Dutch archives',
    },
    'regionaalarchief': {
        'platform_name': 'Regionaal Archief',
        'auxiliary_platform_type': 'ARCHIVES_PORTAL',
        'description': 'Regional archive portal',
    },
    'delpher.nl': {
        'platform_name': 'Delpher',
        'auxiliary_platform_type': 'DIGITAL_LIBRARY',
        'description': 'KB digitized newspapers, books, and periodicals',
    },
    'wiewaswie.nl': {
        'platform_name': 'WieWasWie',
        'auxiliary_platform_type': 'GENEALOGY_DATABASE',
        'description': 'Dutch genealogy database',
    },
}

# GHCID pattern: CC-RR-CCC-T-ABBREV (e.g., NL-DR-ASS-A-DA)
_GHCID_PATTERN = re.compile(r'^[A-Z]{2}-[A-Z]{2,3}-[A-Z]{3}-[A-Z]-')

# Trailing " - Site name" / " | Site name" part of an HTML title. A hyphen or
# en dash only counts as a separator when surrounded by whitespace, so
# hyphenated names such as "Noord-Hollands Archief" are left intact.
_TITLE_SUFFIX_PATTERN = re.compile(r'(?:\s+[-–]\s+|\s*\|\s*).+$')


def normalize_url(url: str) -> str:
    """
    Normalize a URL for storage: drop query/fragment and decode the path.

    The URL is parsed *before* percent-decoding so that an encoded ``?`` or
    ``#`` inside the path (``%3F`` / ``%23``) is not mistaken for the start of
    the query string or fragment.

    Args:
        url: Absolute URL as found by the crawler.

    Returns:
        ``scheme://netloc/path`` with the path percent-decoded and a single
        trailing slash removed (the root path ``/`` is kept). Empty input and
        URLs without scheme or host are returned unchanged.
    """
    if not url:
        return url

    parsed = urlparse(url)
    if not parsed.scheme or not parsed.netloc:
        # Relative or scheme-less URL: rebuilding it would yield "://path".
        return url

    path = unquote(parsed.path)

    # Remove trailing slash for consistency (except root)
    if len(path) > 1 and path.endswith('/'):
        path = path[:-1]

    return f"{parsed.scheme}://{parsed.netloc}{path}"


def extract_base_path_key(url: str) -> str:
    """
    Build the deduplication key for a URL.

    Returns:
        Host plus path (no scheme, query or fragment), without trailing slashes.
    """
    parsed = urlparse(url)
    return f"{parsed.netloc}{parsed.path}".rstrip('/')


def deduplicate_catalog_urls(catalog_urls: list[dict]) -> list[dict]:
    """
    Deduplicate catalog URLs, preferring entries with XPath provenance.

    Strategy:
    1. Group URLs by base path (without query params) and type
    2. For each group, prefer entries with xpath provenance, then shorter URLs
    3. Return one representative entry per type per base path

    Args:
        catalog_urls: Entries with at least ``url`` and usually ``type`` and
            ``xpath`` keys. Entries without a URL are ignored.

    Returns:
        Shallow copies of the selected entries with ``url`` normalized.
    """
    if not catalog_urls:
        return []

    # Group by (base_path, type); dict order keeps first-seen order of groups.
    grouped: dict[tuple[str, str], list[dict]] = defaultdict(list)
    for entry in catalog_urls:
        url = entry.get('url') or ''
        if not url:
            # Nothing to link to; would otherwise raise KeyError below.
            continue
        url_type = entry.get('type', 'unknown')
        grouped[(extract_base_path_key(url), url_type)].append(entry)

    deduplicated = []
    for entries in grouped.values():
        # Entries with xpath first, then shorter URL; min() keeps the first
        # entry on ties, matching a stable sort.
        best = min(
            entries,
            key=lambda e: (0 if e.get('xpath') else 1, len(e.get('url', ''))),
        )
        best_copy = best.copy()
        best_copy['url'] = normalize_url(best['url'])
        deduplicated.append(best_copy)

    return deduplicated


def generate_platform_id(ghcid: str) -> str:
    """Generate platform_id URI from GHCID (lower-cased, ``_`` replaced by ``-``)."""
    ghcid_lower = ghcid.lower().replace('_', '-')
    return f"https://nde.nl/ontology/hc/platform/{ghcid_lower}-website"


def extract_ghcid_from_file(file_path: Path) -> str | None:
    """
    Extract GHCID from filename.

    Returns:
        The file stem if it starts with the GHCID pattern, otherwise None.
    """
    stem = file_path.stem
    if _GHCID_PATTERN.match(stem):
        return stem
    return None


def determine_platform_types(catalog_urls: list[dict]) -> list[str]:
    """
    Determine platform types from detected catalog URLs.

    Returns:
        Sorted list of DigitalPlatformTypeEnum values; always contains
        INSTITUTIONAL_WEBSITE.
    """
    types_set: set[str] = set()

    for entry in catalog_urls:
        mapping = CATALOG_TYPE_MAPPING.get(entry.get('type', ''), {})
        types_set.update(mapping.get('platform_types', []))

    # If we have catalog URLs but no specific types, add generic ONLINE_DATABASE
    if catalog_urls and not types_set:
        types_set.add('ONLINE_DATABASE')

    # Always include INSTITUTIONAL_WEBSITE as base type
    types_set.add('INSTITUTIONAL_WEBSITE')

    return sorted(types_set)


def categorize_urls_by_slot(catalog_urls: list[dict]) -> dict[str, list[str]]:
    """
    Categorize URLs by target slot (collection_web_addresses vs inventory_web_addresses).

    URLs with an unknown type go to collection_web_addresses. Each URL is
    listed at most once across both slots.
    """
    slots: dict[str, list[str]] = {
        'collection_web_addresses': [],
        'inventory_web_addresses': [],
    }

    seen_urls: set[str] = set()

    for entry in catalog_urls:
        url = entry.get('url', '')
        if not url or url in seen_urls:
            continue

        mapping = CATALOG_TYPE_MAPPING.get(entry.get('type', ''), {})
        slot = mapping.get('slot', 'collection_web_addresses')

        slots[slot].append(url)
        seen_urls.add(url)

    return slots


def _match_external_platform(platform_key: str, url: str) -> dict | None:
    """Return the EXTERNAL_PLATFORM_MAPPING config matching the platform key or URL."""
    matches = [
        key for key in EXTERNAL_PLATFORM_MAPPING
        if key in platform_key or key in url
    ]
    if not matches:
        return None

    # Declaration order decides, except that a more specific key wins over a
    # key it contains ('opendata.archieven.nl' over 'archieven.nl'); otherwise
    # the more specific mapping could never be selected.
    best = matches[0]
    for key in matches[1:]:
        if best in key:
            best = key
    return EXTERNAL_PLATFORM_MAPPING[best]


def transform_external_platforms(external_platforms: list[dict]) -> list[dict]:
    """
    Transform external_archive_platforms to auxiliary_platforms structure.

    Args:
        external_platforms: Entries with ``url`` and (optionally) ``platform``.

    Returns:
        One auxiliary platform dict per distinct platform. Entries without a
        platform name are deduplicated by URL instead of being collapsed into
        a single entry.
    """
    if not external_platforms:
        return []

    auxiliary = []
    seen_platforms: set[str] = set()

    for entry in external_platforms:
        url = entry.get('url') or ''
        platform_key = entry.get('platform') or ''
        dedup_key = platform_key or url

        if not url or dedup_key in seen_platforms:
            continue

        mapping = _match_external_platform(platform_key, url)
        if not mapping:
            # Generic external platform
            mapping = {
                'platform_name': (
                    platform_key.replace('.', ' ').title()
                    if platform_key else 'External Platform'
                ),
                'auxiliary_platform_type': 'WEB_PORTAL',
                'description': 'External heritage platform',
            }

        auxiliary.append({
            'platform_name': mapping['platform_name'],
            'platform_url': url,
            'auxiliary_platform_type': mapping['auxiliary_platform_type'],
            'platform_purpose': mapping.get('description', ''),
        })
        seen_platforms.add(dedup_key)

    return auxiliary


def get_platform_name(data: dict, ghcid: str) -> str:
    """
    Determine the best platform name from available data.

    Priority:
    1. custodian_name.emic_name or custodian_name.name
    2. top-level name
    3. crawl4ai_enrichment.title (cleaned)
    4. GHCID-based fallback
    """
    # Try custodian_name first
    custodian_name = data.get('custodian_name', {})
    if isinstance(custodian_name, dict):
        name = custodian_name.get('emic_name') or custodian_name.get('name')
        if name:
            return f"{name} Website"

    # Try top-level name
    if data.get('name'):
        return f"{data['name']} Website"

    # Try crawl4ai title
    crawl4ai = data.get('crawl4ai_enrichment') or {}
    title = crawl4ai.get('title') or ''
    if isinstance(title, str) and title:
        # Clean up title (remove common suffixes)
        cleaned = _TITLE_SUFFIX_PATTERN.sub('', title).strip()
        if cleaned and len(cleaned) > 3:
            return f"{cleaned} Website"

    # Fallback to GHCID
    return f"{ghcid} Website"


def transform_crawl4ai_to_digital_platform(data: dict, ghcid: str) -> dict | None:
    """
    Transform crawl4ai_enrichment into digital_platform structure.

    Args:
        data: Full custodian YAML data
        ghcid: Global Heritage Custodian Identifier

    Returns:
        digital_platform dict, or None if there is no crawl4ai_enrichment, the
        fetch did not succeed (status outside 2xx/3xx) or no source URL is
        recorded.
    """
    crawl4ai = data.get('crawl4ai_enrichment')
    if not crawl4ai:
        return None

    # Skip failed fetches - accept 2xx and 3xx status codes only. Missing or
    # non-numeric status codes count as failed.
    status_code = crawl4ai.get('status_code')
    try:
        fetch_ok = 200 <= int(status_code) < 400
    except (TypeError, ValueError):
        fetch_ok = False
    if not fetch_ok:
        logger.debug("Skipping %s: HTTP status %s", ghcid, status_code)
        return None

    source_url = crawl4ai.get('source_url', '')
    if not source_url:
        return None

    # Get and deduplicate catalog URLs ("or []" also covers explicit nulls)
    catalog_urls = crawl4ai.get('detected_catalog_urls') or []
    deduped_catalogs = deduplicate_catalog_urls(catalog_urls)

    # Determine platform types (never empty: INSTITUTIONAL_WEBSITE is always set)
    platform_types = determine_platform_types(deduped_catalogs)

    # Categorize URLs by slot
    url_slots = categorize_urls_by_slot(deduped_catalogs)

    # Transform external platforms
    external_platforms = crawl4ai.get('external_archive_platforms') or []
    auxiliary_platforms = transform_external_platforms(external_platforms)

    # Build digital_platform structure
    digital_platform: dict[str, Any] = {
        'platform_id': generate_platform_id(ghcid),
        'platform_name': get_platform_name(data, ghcid),
        'homepage_web_address': source_url,
        'refers_to_custodian': f"https://nde.nl/ontology/hc/{ghcid.lower()}",
    }

    if platform_types:
        digital_platform['platform_type'] = platform_types

    # Optional slots are only emitted when they have content
    if url_slots['collection_web_addresses']:
        digital_platform['collection_web_addresses'] = url_slots['collection_web_addresses']

    if url_slots['inventory_web_addresses']:
        digital_platform['inventory_web_addresses'] = url_slots['inventory_web_addresses']

    if auxiliary_platforms:
        digital_platform['auxiliary_platforms'] = auxiliary_platforms

    # Add transformation metadata
    digital_platform['_transformation_metadata'] = {
        'source': 'crawl4ai_enrichment',
        'transformation_date': datetime.now(timezone.utc).isoformat(),
        'catalog_urls_original': len(catalog_urls),
        'catalog_urls_deduplicated': len(deduped_catalogs),
        'external_platforms_count': len(external_platforms),
    }

    return digital_platform


def process_file(file_path: Path, dry_run: bool = False) -> dict:
    """
    Process a single custodian YAML file.

    The result is stored under the ``digital_platform_v2`` key; files that
    already have that key are left untouched.

    Args:
        file_path: Custodian YAML file whose stem is the GHCID.
        dry_run: When True, the file is not written.

    Returns:
        dict with processing statistics; ``status`` is one of ``empty``,
        ``no_ghcid``, ``no_crawl4ai``, ``already_transformed``,
        ``transform_failed``, ``transformed``, ``would_transform``,
        ``yaml_error`` or ``error``.
    """
    stats = {
        'file': str(file_path.name),
        'status': 'skipped',
        'has_crawl4ai': False,
        'has_digital_platform': False,
        'catalog_urls': 0,
        'external_platforms': 0,
    }

    try:
        # Read YAML file
        with open(file_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)

        if not data:
            stats['status'] = 'empty'
            return stats

        # Extract GHCID
        ghcid = extract_ghcid_from_file(file_path)
        if not ghcid:
            stats['status'] = 'no_ghcid'
            return stats

        # Check for crawl4ai_enrichment
        crawl4ai = data.get('crawl4ai_enrichment')
        if not crawl4ai:
            stats['status'] = 'no_crawl4ai'
            return stats

        stats['has_crawl4ai'] = True
        stats['catalog_urls'] = len(crawl4ai.get('detected_catalog_urls') or [])
        stats['external_platforms'] = len(crawl4ai.get('external_archive_platforms') or [])

        # Check if digital_platform_v2 already exists (avoid overwriting)
        if 'digital_platform_v2' in data:
            stats['has_digital_platform'] = True
            stats['status'] = 'already_transformed'
            return stats

        # Transform to digital_platform
        digital_platform = transform_crawl4ai_to_digital_platform(data, ghcid)
        if not digital_platform:
            stats['status'] = 'transform_failed'
            return stats

        # Add to data as digital_platform_v2 (to distinguish from any existing digital_platform)
        data['digital_platform_v2'] = digital_platform

        if not dry_run:
            # Write back to file
            with open(file_path, 'w', encoding='utf-8') as f:
                yaml.dump(data, f, allow_unicode=True, default_flow_style=False, sort_keys=False)
            stats['status'] = 'transformed'
        else:
            stats['status'] = 'would_transform'
            logger.info("[DRY-RUN] Would transform %s", file_path.name)
            logger.debug("  Platform types: %s", digital_platform.get('platform_type', []))
            logger.debug("  Collection URLs: %s", len(digital_platform.get('collection_web_addresses', [])))
            logger.debug("  Inventory URLs: %s", len(digital_platform.get('inventory_web_addresses', [])))
            logger.debug("  Auxiliary platforms: %s", len(digital_platform.get('auxiliary_platforms', [])))

        return stats

    except yaml.YAMLError as e:
        logger.error("YAML error in %s: %s", file_path.name, e)
        stats['status'] = 'yaml_error'
        return stats
    except Exception as e:
        # Batch boundary: one bad file must not abort the run, but keep the
        # traceback so the cause can be diagnosed from the log.
        logger.exception("Error processing %s: %s", file_path.name, e)
        stats['status'] = 'error'
        return stats


def main() -> None:
    """Parse command-line arguments, process the selected files and log a summary."""
    parser = argparse.ArgumentParser(
        description='Transform crawl4ai_enrichment to digital_platform structure'
    )
    parser.add_argument(
        '--dry-run',
        action='store_true',
        help='Show what would be done without making changes'
    )
    parser.add_argument(
        '--file',
        type=Path,
        help='Process a single file instead of all NL-*.yaml files'
    )
    parser.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose logging'
    )

    args = parser.parse_args()

    if args.verbose:
        logging.getLogger().setLevel(logging.DEBUG)

    # Get files to process
    data_dir = Path('data/custodian')

    if args.file:
        if not args.file.exists():
            logger.error("File not found: %s", args.file)
            sys.exit(1)
        files = [args.file]
    else:
        files = sorted(data_dir.glob('NL-*.yaml'))

    logger.info("Processing %s files...", len(files))

    if args.dry_run:
        logger.info("DRY-RUN MODE - no files will be modified")

    # Process files
    stats_summary: dict[str, int] = defaultdict(int)
    total_catalog_urls = 0
    total_external_platforms = 0

    for done, file_path in enumerate(files, start=1):
        if done % 100 == 0:
            logger.info("Progress: %s/%s files processed", done, len(files))

        stats = process_file(file_path, dry_run=args.dry_run)
        stats_summary[stats['status']] += 1
        total_catalog_urls += stats.get('catalog_urls', 0)
        total_external_platforms += stats.get('external_platforms', 0)

    # Print summary
    logger.info("\n" + "=" * 60)
    logger.info("TRANSFORMATION SUMMARY")
    logger.info("=" * 60)
    logger.info("Total files processed: %s", len(files))
    for status, count in sorted(stats_summary.items()):
        logger.info("  %s: %s", status, count)
    logger.info("\nTotal catalog URLs found: %s", total_catalog_urls)
    logger.info("Total external platforms found: %s", total_external_platforms)

    if args.dry_run:
        logger.info("\n[DRY-RUN] No files were modified. Run without --dry-run to apply changes.")


if __name__ == '__main__':
    main()