#!/usr/bin/env python3
"""
Enrich Dutch heritage custodian files with TimeSpan data using Linkup web search.

This script:
1. Identifies custodian files without temporal data
2. Searches for founding/establishment dates via Linkup API
3. Archives retrieved webpages in data/custodian/web/{entry_index}/linkup/
4. Extracts dates and adds CIDOC-CRM E52_Time-Span compliant data
5. Adds proper provenance tracking

TimeSpan follows CIDOC-CRM E52_Time-Span pattern:
- begin_of_the_begin: Earliest possible start (P82a)
- end_of_the_begin: Latest possible start (P81a)
- begin_of_the_end: Earliest possible end (P81b)
- end_of_the_end: Latest possible end (P82b)

Usage:
    python scripts/enrich_timespan_linkup.py [--dry-run] [--verbose] [--limit N]
    python scripts/enrich_timespan_linkup.py --resume  # Resume from checkpoint
"""

import argparse
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple

import yaml

# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

# Load environment variables from .env
try:
    from dotenv import load_dotenv
    load_dotenv(PROJECT_ROOT / ".env")
except ImportError:
    pass  # dotenv not required if env vars set externally

# Check for required dependencies
try:
    import httpx
except ImportError:
    print("ERROR: httpx not installed. Run: pip install httpx")
    sys.exit(1)

# ============================================================================
# Configuration
# ============================================================================

LINKUP_API_URL = "https://api.linkup.so/v1/search"
CHECKPOINT_FILE = PROJECT_ROOT / "data/custodian/.linkup_timespan_checkpoint.json"
CUSTODIAN_DIR = PROJECT_ROOT / "data/custodian"
WEB_ARCHIVE_DIR = PROJECT_ROOT / "data/custodian/web"

# Rate limiting
REQUESTS_PER_MINUTE = 10
REQUEST_DELAY = 60.0 / REQUESTS_PER_MINUTE  # 6 seconds between requests

# Dutch keywords for founding dates
FOUNDING_KEYWORDS_NL = [
    "opgericht", "gesticht", "sinds", "ontstaan", "oprichting",
    "begon", "gestart", "geopend",
]

FOUNDING_KEYWORDS_EN = [
    "founded", "established", "since", "opened", "created", "started",
]

# ============================================================================
# YAML Handling (preserve formatting)
# ============================================================================


class PreserveQuotesLoader(yaml.SafeLoader):
    pass


class PreserveQuotesDumper(yaml.SafeDumper):
    pass


def str_representer(dumper, data):
    """Represent multi-line strings with YAML literal block style ('|')."""
    if '\n' in data:
        return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
    return dumper.represent_scalar('tag:yaml.org,2002:str', data)


PreserveQuotesDumper.add_representer(str, str_representer)

# ============================================================================
# Checkpoint Management
# ============================================================================


def load_checkpoint() -> Dict[str, Any]:
    """Load processing checkpoint, or return a fresh one if none exists."""
    if CHECKPOINT_FILE.exists():
        with open(CHECKPOINT_FILE, 'r') as f:
            return json.load(f)
    return {"processed": [], "last_timestamp": None, "stats": {}}


def save_checkpoint(checkpoint: Dict[str, Any]):
    """Save processing checkpoint, stamping it with the current UTC time."""
    checkpoint["last_timestamp"] = datetime.now(timezone.utc).isoformat()
    CHECKPOINT_FILE.parent.mkdir(parents=True, exist_ok=True)
    with open(CHECKPOINT_FILE, 'w') as f:
        json.dump(checkpoint, f, indent=2)

# ============================================================================
# Date Parsing
# ============================================================================


def parse_year_from_text(text: str) -> Optional[Tuple[int, str]]:
    """
    Extract year from text with context.

    Returns (year, context) tuple or None. Context is the keyword
    (Dutch or English founding verb) that matched.
    """
    # Pattern: "opgericht in 1985" / "founded in 1985" / "since 1985" / etc.
    patterns = [
        # Dutch patterns
        (r'opgericht\s+(?:in\s+)?(\d{4})', 'opgericht'),
        (r'gesticht\s+(?:in\s+)?(\d{4})', 'gesticht'),
        (r'sinds\s+(\d{4})', 'sinds'),
        (r'oprichting\s+(?:in\s+)?(\d{4})', 'oprichting'),
        (r'geopend\s+(?:in\s+)?(\d{4})', 'geopend'),
        (r'begon\s+(?:in\s+)?(\d{4})', 'begon'),
        (r'ontstaan\s+(?:in\s+)?(\d{4})', 'ontstaan'),
        # English patterns
        (r'founded\s+(?:in\s+)?(\d{4})', 'founded'),
        (r'established\s+(?:in\s+)?(\d{4})', 'established'),
        (r'since\s+(\d{4})', 'since'),
        (r'opened\s+(?:in\s+)?(\d{4})', 'opened'),
        (r'created\s+(?:in\s+)?(\d{4})', 'created'),
        # Generic year with context
        (r'in\s+(\d{4})\s+(?:opgericht|gesticht|geopend)', 'year_context'),
    ]

    text_lower = text.lower()
    for pattern, context in patterns:
        match = re.search(pattern, text_lower)
        if match:
            year = int(match.group(1))
            # Validate year range (1500 - current year)
            if 1500 <= year <= datetime.now().year:
                return (year, context)

    return None


def parse_full_date_from_text(text: str) -> Optional[Tuple[str, str]]:
    """
    Extract full date (day/month/year) from text.

    Returns (ISO date string, context) or None.
    """
    # Pattern: "15 maart 1985" / "March 15, 1985" / "15-03-1985" / "23-11-2005"
    dutch_months = {
        'januari': 1, 'februari': 2, 'maart': 3, 'april': 4, 'mei': 5,
        'juni': 6, 'juli': 7, 'augustus': 8, 'september': 9, 'oktober': 10,
        'november': 11, 'december': 12
    }
    english_months = {
        'january': 1, 'february': 2, 'march': 3, 'april': 4, 'may': 5,
        'june': 6, 'july': 7, 'august': 8, 'september': 9, 'october': 10,
        'november': 11, 'december': 12
    }

    text_lower = text.lower()

    # Numeric format: "23-11-2005" or "23/11/2005" (DD-MM-YYYY, European)
    numeric_pattern = r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})'
    match = re.search(numeric_pattern, text)
    if match:
        day, month, year = int(match.group(1)), int(match.group(2)), int(match.group(3))
        if 1 <= day <= 31 and 1 <= month <= 12 and 1500 <= year <= datetime.now().year:
            return (f"{year}-{month:02d}-{day:02d}T00:00:00Z", "full_date_numeric")

    # Dutch format: "15 maart 1985"
    for month_name, month_num in dutch_months.items():
        pattern = rf'(\d{{1,2}})\s+{month_name}\s+(\d{{4}})'
        match = re.search(pattern, text_lower)
        if match:
            day, year = int(match.group(1)), int(match.group(2))
            if 1 <= day <= 31 and 1500 <= year <= datetime.now().year:
                return (f"{year}-{month_num:02d}-{day:02d}T00:00:00Z", "full_date_nl")

    # English format: "March 15, 1985"
    for month_name, month_num in english_months.items():
        pattern = rf'{month_name}\s+(\d{{1,2}}),?\s+(\d{{4}})'
        match = re.search(pattern, text_lower)
        if match:
            day, year = int(match.group(1)), int(match.group(2))
            if 1 <= day <= 31 and 1500 <= year <= datetime.now().year:
                return (f"{year}-{month_num:02d}-{day:02d}T00:00:00Z", "full_date_en")

    return None


def extract_dates_from_linkup_results(results: List[Dict]) -> Dict[str, Any]:
    """
    Extract founding dates from Linkup search results.

    Returns dict with:
    - founding_date: ISO date string or None
    - date_precision: 'year', 'month', 'day'
    - source_url: URL where date was found
    - context: snippet where date appears
    """
    extracted = {
        "founding_date": None,
        "date_precision": None,
        "source_url": None,
        "source_urls": [],
        "context": None,
        "all_dates_found": [],
    }

    for result in results:
        content = result.get("content", "") or result.get("snippet", "") or ""
        url = result.get("url", "")

        if not content:
            continue

        # Record consulted source URL up front.
        # BUG FIX: this bookkeeping used to live at the bottom of the loop,
        # after a `continue` in the full-date branch — so URLs of the most
        # precise (day-level) sources were never added to source_urls.
        if url and url not in extracted["source_urls"]:
            extracted["source_urls"].append(url)

        # Try full date first
        full_date = parse_full_date_from_text(content)
        if full_date:
            date_str, context = full_date
            extracted["all_dates_found"].append({
                "date": date_str,
                "precision": "day",
                "url": url,
                "context": context,
            })
            if not extracted["founding_date"]:
                extracted["founding_date"] = date_str
                extracted["date_precision"] = "day"
                extracted["source_url"] = url
                extracted["context"] = context
            continue

        # Try year only
        year_result = parse_year_from_text(content)
        if year_result:
            year, context = year_result
            year_date = f"{year}-01-01T00:00:00Z"
            extracted["all_dates_found"].append({
                "date": year_date,
                "year": year,
                "precision": "year",
                "url": url,
                "context": context,
            })
            if not extracted["founding_date"]:
                extracted["founding_date"] = year_date
                extracted["date_precision"] = "year"
                extracted["source_url"] = url
                extracted["context"] = context

    return extracted

# ============================================================================
# Linkup API Integration
# ============================================================================


def search_linkup(query: str, api_key: str) -> Optional[Dict[str, Any]]:
    """
    Search using Linkup API.

    Returns search results or None on error.
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "q": query,
        "depth": "standard",  # or "deep" for more thorough search
        "outputType": "sourcedAnswer",
    }

    try:
        with httpx.Client(timeout=30.0) as client:
            response = client.post(LINKUP_API_URL, json=payload, headers=headers)
            response.raise_for_status()
            return response.json()
    except httpx.HTTPStatusError as e:
        print(f"    Linkup API error: {e.response.status_code} - {e.response.text[:200]}")
        return None
    except Exception as e:
        print(f"    Linkup request error: {e}")
        return None


def build_search_query(custodian_data: Dict[str, Any]) -> str:
    """Build optimal search query for finding founding date."""
    # Get organization name
    org_name = (
        custodian_data.get("custodian_name", {}).get("claim_value")
        or custodian_data.get("original_entry", {}).get("organisatie")
        or custodian_data.get("google_maps_enrichment", {}).get("name")
        or ""
    )

    # Get location
    city = (
        custodian_data.get("location", {}).get("city")
        or custodian_data.get("original_entry", {}).get("plaatsnaam_bezoekadres")
        or ""
    )

    if not org_name:
        return ""

    # Build query: "Organization Name" city opgericht
    query = f'"{org_name}"'
    if city:
        query += f" {city}"
    query += " opgericht OR gesticht OR sinds"

    return query

# ============================================================================
# Web Archive Management
# ============================================================================


def get_archive_path(entry_index: int) -> Path:
    """Get archive directory for a custodian entry."""
    entry_str = f"{entry_index:04d}"
    return WEB_ARCHIVE_DIR / entry_str / "linkup"


def archive_linkup_results(entry_index: int, query: str, results: Dict[str, Any]) -> Path:
    """
    Archive Linkup search results to filesystem.

    Returns path to archived file.
    """
    archive_dir = get_archive_path(entry_index)
    archive_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    filename = f"linkup_founding_{timestamp}.json"
    archive_path = archive_dir / filename

    archive_data = {
        "query": query,
        "fetch_timestamp": datetime.now(timezone.utc).isoformat(),
        "api_response": results,
    }

    with open(archive_path, 'w', encoding='utf-8') as f:
        json.dump(archive_data, f, indent=2, ensure_ascii=False)

    return archive_path

# ============================================================================
# TimeSpan Creation
# ============================================================================


def create_timespan_from_extracted(extracted: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Create CIDOC-CRM TimeSpan from extracted date information.

    Handles uncertainty based on precision:
    - 'day': begin_of_the_begin == end_of_the_begin (precise)
    - 'year': begin_of_the_begin = Jan 1, end_of_the_begin = Dec 31
    """
    if not extracted.get("founding_date"):
        return None

    founding_date = extracted["founding_date"]
    precision = extracted.get("date_precision", "year")

    timespan = {}

    if precision == "day":
        # Precise date known
        timespan["begin_of_the_begin"] = founding_date
        timespan["end_of_the_begin"] = founding_date
    elif precision == "year":
        # Year known, month/day uncertain
        # Extract year and create range
        year_match = re.match(r'(\d{4})', founding_date)
        if year_match:
            year = year_match.group(1)
            timespan["begin_of_the_begin"] = f"{year}-01-01T00:00:00Z"
            timespan["end_of_the_begin"] = f"{year}-12-31T23:59:59Z"
    else:
        # Approximate date
        timespan["begin_of_the_begin"] = founding_date
        timespan["end_of_the_begin"] = founding_date

    # End dates null (still operating)
    timespan["begin_of_the_end"] = None
    timespan["end_of_the_end"] = None

    return timespan

# ============================================================================
# File Processing
# ============================================================================
def needs_timespan_enrichment(data: Dict[str, Any]) -> bool:
    """Check if custodian needs Linkup TimeSpan enrichment.

    Returns False when the record already holds a timespan with a
    begin_of_the_begin, or a Wikidata inception date is present.
    """
    # Skip if already has timespan with begin dates
    existing = data.get("timespan", {})
    if existing and existing.get("begin_of_the_begin"):
        return False

    # Skip if has Wikidata inception
    wikidata = data.get("wikidata_enrichment", {})
    if wikidata.get("wikidata_inception"):
        return False

    return True


def process_file(
    filepath: Path,
    api_key: str,
    dry_run: bool = False,
    verbose: bool = False
) -> Tuple[bool, str]:
    """
    Process a single custodian file: search Linkup, archive the response,
    extract a founding date, and write a TimeSpan plus provenance back.

    Returns (success, status_message) tuple.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        return (False, f"Error reading file: {e}")

    if not data:
        return (False, "Empty file")

    # Check if needs enrichment
    if not needs_timespan_enrichment(data):
        return (False, "Already has temporal data")

    # Get entry index (drives the archive directory layout)
    entry_index = data.get("entry_index")
    if entry_index is None:
        return (False, "No entry_index")

    # Build search query
    query = build_search_query(data)
    if not query:
        return (False, "Could not build search query")

    if verbose:
        # NOTE: an unused `org_name` lookup was removed here.
        print(f"  Searching: {query[:80]}...")

    if dry_run:
        return (True, "Would search Linkup (dry run)")

    # Search Linkup
    results = search_linkup(query, api_key)
    if not results:
        return (False, "Linkup search failed")

    # Archive results FIRST (before analysis)
    archive_path = archive_linkup_results(entry_index, query, results)
    if verbose:
        print(f"  Archived to: {archive_path.relative_to(PROJECT_ROOT)}")

    # Extract dates from results
    sources = results.get("sources", [])
    if not sources:
        return (False, "No sources in Linkup results")

    extracted = extract_dates_from_linkup_results(sources)
    if not extracted.get("founding_date"):
        return (False, "No founding date found in results")

    # Create TimeSpan
    timespan = create_timespan_from_extracted(extracted)
    if not timespan:
        return (False, "Could not create TimeSpan")

    # Add sources to timespan
    timespan["sources"] = [f"Linkup web search: {extracted.get('source_url', 'multiple sources')}"]
    if extracted.get("context"):
        timespan["notes"] = f"Found via pattern: {extracted['context']}"

    # Update data
    data["timespan"] = timespan

    # Add provenance
    if "provenance" not in data:
        data["provenance"] = {"sources": {}}
    if "sources" not in data["provenance"]:
        data["provenance"]["sources"] = {}

    data["provenance"]["sources"]["linkup_timespan"] = [{
        "source_type": "linkup_web_search",
        "fetch_timestamp": datetime.now(timezone.utc).isoformat(),
        "search_query": query,
        "source_urls": extracted.get("source_urls", [])[:5],  # Limit to 5 URLs
        "claims_extracted": ["timespan_begin"],
        "data_tier": "TIER_4_INFERRED",
        "archive_path": str(archive_path.relative_to(PROJECT_ROOT)),
    }]

    # Write back
    try:
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, Dumper=PreserveQuotesDumper,
                      default_flow_style=False, allow_unicode=True,
                      sort_keys=False)
    except Exception as e:
        return (False, f"Error writing file: {e}")

    if verbose:
        print(f"  Added TimeSpan: begin={timespan.get('begin_of_the_begin')}")

    return (True, f"Added TimeSpan from {extracted.get('date_precision', 'unknown')} precision date")

# ============================================================================
# Main
# ============================================================================


def main():
    """CLI entry point: iterate matching custodian files and enrich them."""
    parser = argparse.ArgumentParser(description='Enrich custodian files with TimeSpan via Linkup')
    parser.add_argument('--dry-run', action='store_true',
                        help='Do not write changes or make API calls')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Verbose output')
    parser.add_argument('--limit', type=int, default=0,
                        help='Limit number of files to process (0=unlimited)')
    parser.add_argument('--resume', action='store_true',
                        help='Resume from checkpoint')
    parser.add_argument('--pattern', default='NL-*.yaml',
                        help='File pattern to match (default: NL-*.yaml)')
    args = parser.parse_args()

    # Get API key (not required for --dry-run, which makes no API calls)
    api_key = os.environ.get("LINKUP_API_KEY", "")
    if not api_key and not args.dry_run:
        print("ERROR: LINKUP_API_KEY environment variable not set")
        print("Set it with: export LINKUP_API_KEY=your_key")
        sys.exit(1)

    # Load checkpoint if resuming
    checkpoint = load_checkpoint() if args.resume else {"processed": [], "stats": {}}
    processed_files = set(checkpoint.get("processed", []))

    print("TimeSpan Enrichment via Linkup")
    print("=" * 50)
    print(f"Pattern: {args.pattern}")
    print(f"Dry run: {args.dry_run}")
    print(f"Limit: {args.limit if args.limit > 0 else 'unlimited'}")
    print(f"Resume: {args.resume} ({len(processed_files)} already processed)")
    print()

    # Collect files to process
    files = list(CUSTODIAN_DIR.glob(args.pattern))
    files.sort()
    print(f"Found {len(files)} files matching pattern")

    # Filter out already processed
    if args.resume:
        files = [f for f in files if f.name not in processed_files]
        print(f"After filtering: {len(files)} files to process")

    # Apply limit
    if args.limit > 0:
        files = files[:args.limit]
        print(f"Limited to: {len(files)} files")

    print()

    # Stats
    stats = {
        "total": len(files),
        "enriched": 0,
        "skipped": 0,
        "errors": 0,
    }

    try:
        for i, filepath in enumerate(files, 1):
            print(f"[{i}/{len(files)}] {filepath.name}")

            success, message = process_file(filepath, api_key, args.dry_run, args.verbose)

            if success:
                stats["enriched"] += 1
                if args.verbose:
                    print(f"  ✓ {message}")
            else:
                # "Already has temporal data" / "No entry_index" are expected
                # skips, everything else counts as an error.
                if "Already has" in message or "No entry_index" in message:
                    stats["skipped"] += 1
                else:
                    stats["errors"] += 1
                if args.verbose:
                    print(f"  - {message}")

            # Update checkpoint
            checkpoint["processed"].append(filepath.name)
            checkpoint["stats"] = stats

            # Save checkpoint periodically
            if i % 10 == 0:
                save_checkpoint(checkpoint)

            # Rate limiting
            if not args.dry_run and i < len(files):
                time.sleep(REQUEST_DELAY)

    except KeyboardInterrupt:
        print("\n\nInterrupted! Saving checkpoint...")
        save_checkpoint(checkpoint)

    # Final save
    save_checkpoint(checkpoint)

    print()
    print("=" * 50)
    print("Results:")
    print(f"  Total processed: {stats['total']}")
    print(f"  Enriched with TimeSpan: {stats['enriched']}")
    print(f"  Skipped (existing data): {stats['skipped']}")
    print(f"  Errors: {stats['errors']}")

    if args.dry_run:
        print("\n(Dry run - no files were modified)")


if __name__ == '__main__':
    main()