glam/scripts/reprocess_linkup_archives.py
2025-12-16 09:02:52 +01:00

411 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Reprocess existing Linkup archives to extract founding dates that were missed
due to incomplete date parsing patterns.
This script:
1. Finds NL custodian files without TimeSpan AND without Wikidata inception
2. Checks if they have existing Linkup search results archived
3. Re-parses those archives with improved date extraction patterns
4. Updates the custodian files with newly extracted TimeSpan data
Usage:
python scripts/reprocess_linkup_archives.py --dry-run
python scripts/reprocess_linkup_archives.py --verbose
"""
import argparse
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import yaml
# Project paths
PROJECT_ROOT = Path(__file__).parent.parent  # repo root (this script lives in scripts/)
DATA_DIR = PROJECT_ROOT / "data" / "custodian"  # custodian YAML records (NL-*.yaml)
WEB_ARCHIVE_DIR = DATA_DIR / "web"  # archived web-search results, one subdir per entry index
class PreserveQuotesDumper(yaml.SafeDumper):
    """Custom YAML dumper that preserves string formatting.

    The subclass adds no behaviour of its own; its string handling comes
    from the module-level ``str`` representer registered right below.
    """
    pass
def str_representer(dumper, data):
    """Represent multi-line strings as YAML literal blocks ('|'), plain otherwise."""
    tag = 'tag:yaml.org,2002:str'
    if '\n' not in data:
        return dumper.represent_scalar(tag, data)
    return dumper.represent_scalar(tag, data, style='|')


PreserveQuotesDumper.add_representer(str, str_representer)
# ============================================================================
# Date Extraction (improved patterns)
# ============================================================================
def parse_year_from_text(text: str) -> Optional[Tuple[int, str]]:
    """
    Extract a founding year from free text using Dutch/English founding phrases.

    Scans a fixed list of patterns such as "opgericht in 1985" or
    "founded 1985", case-insensitively, and returns the first plausible hit.

    Args:
        text: Arbitrary text, e.g. a web-search snippet.

    Returns:
        (year, context) where ``context`` names the pattern that matched,
        or None when no plausible year (1500..current year) is found.
    """
    founding_patterns = [
        (r'opgericht\s+(?:in\s+)?(\d{4})', 'opgericht'),
        (r'gesticht\s+(?:in\s+)?(\d{4})', 'gesticht'),
        (r'sinds\s+(\d{4})', 'sinds'),
        (r'founded\s+(?:in\s+)?(\d{4})', 'founded'),
        (r'established\s+(?:in\s+)?(\d{4})', 'established'),
        (r'gestart\s+(?:in\s+)?(\d{4})', 'gestart'),
        (r'begonnen\s+(?:in\s+)?(\d{4})', 'begonnen'),
        (r'ontstaan\s+(?:in\s+)?(\d{4})', 'ontstaan'),
        (r'geopend\s+(?:in\s+)?(\d{4})', 'geopend'),
        (r'opened\s+(?:in\s+)?(\d{4})', 'opened'),
        (r'opening\s+(?:in\s+)?(\d{4})', 'opening'),
        # "in YYYY opgericht" pattern
        (r'in\s+(\d{4})\s+opgericht', 'in_year_opgericht'),
        (r'in\s+(\d{4})\s+gesticht', 'in_year_gesticht'),
        (r'in\s+(\d{4})\s+geopend', 'in_year_geopend'),
    ]
    text_lower = text.lower()
    # Loop-invariant: compute the plausibility upper bound once.
    current_year = datetime.now().year
    for pattern, context in founding_patterns:
        # Fix: iterate over ALL occurrences of each pattern. With re.search,
        # one implausible year early in the text (e.g. "opgericht in 1200")
        # hid a valid match of the same pattern later in the text.
        for match in re.finditer(pattern, text_lower):
            year = int(match.group(1))
            if 1500 <= year <= current_year:
                return (year, context)
    return None
def parse_full_date_from_text(text: str) -> Optional[Tuple[str, str]]:
    """
    Extract a full calendar date (day precision) from free text.

    Tries, in order:
      1. numeric "23-11-2005" / "23/11/2005" (DD-MM-YYYY, European day-first)
      2. Dutch   "15 maart 1985"
      3. English "March 15, 1985"

    Args:
        text: Arbitrary text; month names are matched case-insensitively.

    Returns:
        (iso_timestamp, context) with iso_timestamp "YYYY-MM-DDT00:00:00Z"
        and context one of "full_date_numeric" / "full_date_nl" /
        "full_date_en"; None when no valid date is found.
    """
    dutch_months = {
        'januari': 1, 'februari': 2, 'maart': 3, 'april': 4,
        'mei': 5, 'juni': 6, 'juli': 7, 'augustus': 8,
        'september': 9, 'oktober': 10, 'november': 11, 'december': 12
    }
    english_months = {
        'january': 1, 'february': 2, 'march': 3, 'april': 4,
        'may': 5, 'june': 6, 'july': 7, 'august': 8,
        'september': 9, 'october': 10, 'november': 11, 'december': 12
    }
    text_lower = text.lower()
    current_year = datetime.now().year

    def _is_valid(day: int, month: int, year: int) -> bool:
        # Fix: reject impossible calendar dates. The old "1 <= day <= 31"
        # check let e.g. "31-02-2005" through as the invalid ISO date
        # "2005-02-31T00:00:00Z"; datetime() raises ValueError for those.
        if not (1500 <= year <= current_year):
            return False
        try:
            datetime(year, month, day)
        except ValueError:
            return False
        return True

    # Numeric format: "23-11-2005" or "23/11/2005" (DD-MM-YYYY, European).
    # NOTE(review): day-first order is assumed; a US-style MM-DD-YYYY source
    # would be misread whenever both fields are <= 12 — confirm sources are NL.
    for match in re.finditer(r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})', text):
        day, month, year = int(match.group(1)), int(match.group(2)), int(match.group(3))
        if _is_valid(day, month, year):
            return (f"{year}-{month:02d}-{day:02d}T00:00:00Z", "full_date_numeric")
    # Dutch format: "15 maart 1985"
    for month_name, month_num in dutch_months.items():
        match = re.search(rf'(\d{{1,2}})\s+{month_name}\s+(\d{{4}})', text_lower)
        if match:
            day, year = int(match.group(1)), int(match.group(2))
            if _is_valid(day, month_num, year):
                return (f"{year}-{month_num:02d}-{day:02d}T00:00:00Z", "full_date_nl")
    # English format: "March 15, 1985"
    for month_name, month_num in english_months.items():
        match = re.search(rf'{month_name}\s+(\d{{1,2}}),?\s+(\d{{4}})', text_lower)
        if match:
            day, year = int(match.group(1)), int(match.group(2))
            if _is_valid(day, month_num, year):
                return (f"{year}-{month_num:02d}-{day:02d}T00:00:00Z", "full_date_en")
    return None
def extract_dates_from_archive(archive_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extract founding dates from archived Linkup search results.

    Walks every entry of ``archive_data["api_response"]["sources"]``,
    records every candidate date in ``all_dates_found``, and keeps the best
    candidate as ``founding_date``: day-precision dates take priority over
    year-only dates regardless of source order; among equal precision the
    first source wins.

    Args:
        archive_data: Parsed JSON archive; each source may carry
            "content" or "snippet" text and a "url".

    Returns:
        Dict with founding_date (ISO string or None), date_precision
        ("day"/"year"/None), source_url, source_urls, context, and
        all_dates_found.
    """
    extracted = {
        "founding_date": None,
        "date_precision": None,
        "source_url": None,
        "source_urls": [],
        "context": None,
        "all_dates_found": [],
    }
    api_response = archive_data.get("api_response", {})
    sources = api_response.get("sources", [])
    for result in sources:
        content = result.get("content", "") or result.get("snippet", "") or ""
        url = result.get("url", "")
        if not content:
            continue
        extracted["source_urls"].append(url)
        # Day-precision candidate first.
        full_date = parse_full_date_from_text(content)
        if full_date:
            date_str, context = full_date
            extracted["all_dates_found"].append({
                "date": date_str,
                "precision": "day",
                "url": url,
                "context": context,
            })
            # Fix: a day-precision date now replaces an earlier year-only
            # guess instead of being recorded but ignored (the comments
            # always stated day precision is preferred). The first
            # day-precision hit is kept thereafter.
            if extracted["date_precision"] != "day":
                extracted["founding_date"] = date_str
                extracted["date_precision"] = "day"
                extracted["source_url"] = url
                extracted["context"] = context
            continue
        # Year-only fallback, only while nothing has been found yet
        # (first year-precision hit wins, matching the original behaviour).
        if not extracted["founding_date"]:
            year_result = parse_year_from_text(content)
            if year_result:
                year, context = year_result
                date_str = f"{year}-01-01T00:00:00Z"
                extracted["all_dates_found"].append({
                    "date": date_str,
                    "precision": "year",
                    "url": url,
                    "context": context,
                })
                extracted["founding_date"] = date_str
                extracted["date_precision"] = "year"
                extracted["source_url"] = url
                extracted["context"] = context
    return extracted
def create_timespan_from_extracted(extracted: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """
    Build a CIDOC-CRM style TimeSpan dict from extracted date information.

    Day precision pins both begin bounds to the exact timestamp; year
    precision widens them to Jan 1 .. Dec 31 of that year. The end bounds
    are always emitted as None (no dissolution date is extracted here).

    Returns:
        TimeSpan dict, or None when ``extracted`` has no founding_date.
    """
    date_value = extracted.get("founding_date")
    if not date_value:
        return None

    precision = extracted.get("date_precision", "year")
    timespan: Dict[str, Any] = {}

    if precision == "day":
        timespan["begin_of_the_begin"] = date_value
        timespan["end_of_the_begin"] = date_value
    elif precision == "year":
        year_match = re.match(r'(\d{4})', date_value)
        if year_match is None:
            # Fallback: founding_date did not start with a 4-digit year.
            timespan["begin_of_the_begin"] = date_value
            timespan["end_of_the_begin"] = date_value
        else:
            yyyy = year_match.group(1)
            timespan["begin_of_the_begin"] = f"{yyyy}-01-01T00:00:00Z"
            timespan["end_of_the_begin"] = f"{yyyy}-12-31T23:59:59Z"

    timespan["begin_of_the_end"] = None
    timespan["end_of_the_end"] = None
    return timespan
# ============================================================================
# Main Processing
# ============================================================================
def find_missing_timespan_files() -> List[Path]:
    """
    Scan DATA_DIR for NL-*.yaml custodian files that still lack founding
    data: no populated TimeSpan and no Wikidata inception claim.

    Unreadable or malformed files are silently skipped (best-effort scan).
    """
    candidates: List[Path] = []
    for yaml_path in DATA_DIR.glob("NL-*.yaml"):
        try:
            with open(yaml_path, 'r', encoding='utf-8') as fh:
                record = yaml.safe_load(fh)
            if not record:
                continue
            timespan = record.get("timespan", {})
            has_timespan = bool(timespan) and bool(timespan.get("begin_of_the_begin"))
            has_inception = bool(record.get("wikidata_enrichment", {}).get("wikidata_inception"))
            if not has_timespan and not has_inception:
                candidates.append(yaml_path)
        except Exception:
            # Best effort: files that fail to read or parse are skipped.
            continue
    return candidates
def get_archive_path(entry_index: int) -> Path:
    """
    Return the Linkup archive directory for a custodian entry.

    The entry index is zero-padded to four digits,
    e.g. 42 -> WEB_ARCHIVE_DIR/0042/linkup.
    """
    padded = format(entry_index, "04d")
    return WEB_ARCHIVE_DIR / padded / "linkup"
def process_file(filepath: Path, dry_run: bool = False, verbose: bool = False) -> Tuple[bool, str]:
    """
    Process a single file: check for existing archive, extract dates, update file.

    Args:
        filepath: Custodian YAML file to enrich in place.
        dry_run: If True, report what would change without writing.
        verbose: If True, print per-step progress lines.

    Returns:
        (success, message). NOTE: main() dispatches on substrings of the
        failure message ("No linkup archive", "No archive files",
        "No founding date"), so these strings are load-bearing.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        return (False, f"Error reading: {e}")
    if not data:
        return (False, "Empty file")
    # entry_index links the YAML record to its archive directory.
    entry_index = data.get("entry_index")
    if entry_index is None:
        return (False, "No entry_index")
    # Check for existing archive
    archive_dir = get_archive_path(entry_index)
    if not archive_dir.exists():
        return (False, "No linkup archive")
    # Find most recent archive file — assumes the filename timestamps sort
    # lexicographically in chronological order; TODO confirm naming scheme.
    archive_files = sorted(archive_dir.glob("linkup_founding_*.json"))
    if not archive_files:
        return (False, "No archive files found")
    latest_archive = archive_files[-1]
    if verbose:
        print(f" Using archive: {latest_archive.name}")
    # Load archive
    try:
        with open(latest_archive, 'r', encoding='utf-8') as f:
            archive_data = json.load(f)
    except Exception as e:
        return (False, f"Error loading archive: {e}")
    # Extract dates with improved patterns
    extracted = extract_dates_from_archive(archive_data)
    if not extracted.get("founding_date"):
        return (False, "No founding date found in archive")
    if verbose:
        print(f" Found date: {extracted['founding_date']} ({extracted['date_precision']}) via {extracted['context']}")
    if dry_run:
        # Stop before any mutation; caller still counts this as a success.
        return (True, f"Would add TimeSpan: {extracted['founding_date']}")
    # Create TimeSpan
    timespan = create_timespan_from_extracted(extracted)
    if not timespan:
        return (False, "Could not create TimeSpan")
    # Add sources and notes
    timespan["sources"] = [f"Linkup web search: {extracted.get('source_url', 'archived results')}"]
    if extracted.get("context"):
        timespan["notes"] = f"Found via pattern: {extracted['context']}"
    # Update data
    data["timespan"] = timespan
    # Add provenance (create the nested structure only where missing)
    if "provenance" not in data:
        data["provenance"] = {"sources": {}}
    if "sources" not in data["provenance"]:
        data["provenance"]["sources"] = {}
    data["provenance"]["sources"]["linkup_timespan_reprocessed"] = [{
        "source_type": "linkup_web_search_reprocessed",
        "reprocess_timestamp": datetime.now(timezone.utc).isoformat(),
        "original_archive": str(latest_archive.relative_to(PROJECT_ROOT)),
        # Cap the recorded provenance URLs at five.
        "source_urls": extracted.get("source_urls", [])[:5],
        "claims_extracted": ["timespan_begin"],
        "data_tier": "TIER_4_INFERRED",
    }]
    # Write back with the custom dumper so multi-line strings keep block style.
    try:
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, Dumper=PreserveQuotesDumper,
                      default_flow_style=False, allow_unicode=True, sort_keys=False)
    except Exception as e:
        return (False, f"Error writing: {e}")
    return (True, f"Added TimeSpan: {extracted['founding_date']} ({extracted['date_precision']})")
def main():
    """CLI entry point: reprocess archives for every candidate file and print a summary."""
    parser = argparse.ArgumentParser(description="Reprocess Linkup archives for missing TimeSpan data")
    parser.add_argument("--dry-run", action="store_true", help="Don't modify files")
    parser.add_argument("--verbose", action="store_true", help="Show detailed progress")
    parser.add_argument("--limit", type=int, default=None, help="Limit number of files to process")
    args = parser.parse_args()

    print("Finding NL files without TimeSpan or Wikidata inception...")
    missing_files = find_missing_timespan_files()
    print(f"Found {len(missing_files)} files to check")
    if args.limit:
        missing_files = missing_files[:args.limit]
        print(f"Processing first {args.limit} files")

    stats = {
        "processed": 0,
        "enriched": 0,
        "no_archive": 0,
        "no_date_found": 0,
        "errors": 0,
    }

    for filepath in missing_files:
        stats["processed"] += 1
        if args.verbose:
            print(f"\n[{stats['processed']}/{len(missing_files)}] {filepath.name}")

        success, message = process_file(filepath, dry_run=args.dry_run, verbose=args.verbose)
        if not success:
            # Bucket failures by the message text process_file returned.
            if "No linkup archive" in message or "No archive files" in message:
                stats["no_archive"] += 1
            elif "No founding date" in message:
                stats["no_date_found"] += 1
            else:
                stats["errors"] += 1
            if args.verbose:
                print(f" Skip: {message}")
            continue

        stats["enriched"] += 1
        if not args.verbose:
            print(f"{filepath.name}: {message}")

    print(f"\n{'='*60}")
    print("Summary:")
    print(f" Total processed: {stats['processed']}")
    print(f" Enriched: {stats['enriched']}")
    print(f" No archive: {stats['no_archive']}")
    print(f" No date found: {stats['no_date_found']}")
    print(f" Errors: {stats['errors']}")
    if args.dry_run:
        print("\n(Dry run - no files modified)")


if __name__ == "__main__":
    main()