411 lines
14 KiB
Python
411 lines
14 KiB
Python
#!/usr/bin/env python3
"""
Reprocess existing Linkup archives to extract founding dates that were missed
due to incomplete date parsing patterns.

This script:
1. Finds NL custodian files without TimeSpan AND without Wikidata inception
2. Checks if they have existing Linkup search results archived
3. Re-parses those archives with improved date extraction patterns
4. Updates the custodian files with newly extracted TimeSpan data

Usage:
    python scripts/reprocess_linkup_archives.py --dry-run
    python scripts/reprocess_linkup_archives.py --verbose
"""
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import yaml
|
|
|
|
# Project paths.
# PROJECT_ROOT assumes this file lives one directory below the repo root
# (scripts/, per the usage examples in the module docstring) — TODO confirm.
PROJECT_ROOT = Path(__file__).parent.parent
# Custodian YAML records (the NL-*.yaml files scanned below).
DATA_DIR = PROJECT_ROOT / "data" / "custodian"
# Per-entry archived web search results (see get_archive_path()).
WEB_ARCHIVE_DIR = DATA_DIR / "web"
|
|
|
|
|
|
class PreserveQuotesDumper(yaml.SafeDumper):
    """SafeDumper subclass used as a registration point for custom string
    representers, so plain ``yaml.SafeDumper`` behavior stays untouched."""
|
|
|
|
|
|
def str_representer(dumper, data):
    """YAML representer: emit strings containing newlines in literal block
    style ('|'); all other strings use the default scalar style."""
    needs_block_style = '\n' in data
    if needs_block_style:
        return dumper.represent_scalar('tag:yaml.org,2002:str', data, style='|')
    return dumper.represent_scalar('tag:yaml.org,2002:str', data)
|
|
|
|
|
|
# Register the representer so every str dumped through PreserveQuotesDumper
# goes via str_representer (multi-line strings get literal block style).
PreserveQuotesDumper.add_representer(str, str_representer)
|
|
|
|
|
|
# ============================================================================
|
|
# Date Extraction (improved patterns)
|
|
# ============================================================================
|
|
|
|
def parse_year_from_text(text: str) -> Optional[Tuple[int, str]]:
    """
    Extract a founding year from free text.

    Tries a fixed sequence of Dutch/English founding phrases (e.g.
    "opgericht in 1985", "founded 1902", "in 1985 geopend") against the
    lowercased text and returns the first match whose year is plausible
    (1500 .. current year) as ``(year, context)``, where *context* labels
    the pattern that matched. Returns ``None`` when nothing plausible is
    found.
    """
    # Order matters: patterns are tried top to bottom and the first
    # plausible hit wins.
    patterns_by_context = [
        ('opgericht', r'opgericht\s+(?:in\s+)?(\d{4})'),
        ('gesticht', r'gesticht\s+(?:in\s+)?(\d{4})'),
        ('sinds', r'sinds\s+(\d{4})'),
        ('founded', r'founded\s+(?:in\s+)?(\d{4})'),
        ('established', r'established\s+(?:in\s+)?(\d{4})'),
        ('gestart', r'gestart\s+(?:in\s+)?(\d{4})'),
        ('begonnen', r'begonnen\s+(?:in\s+)?(\d{4})'),
        ('ontstaan', r'ontstaan\s+(?:in\s+)?(\d{4})'),
        ('geopend', r'geopend\s+(?:in\s+)?(\d{4})'),
        ('opened', r'opened\s+(?:in\s+)?(\d{4})'),
        ('opening', r'opening\s+(?:in\s+)?(\d{4})'),
        # Inverted word order: "in YYYY opgericht" / "... gesticht" / "... geopend"
        ('in_year_opgericht', r'in\s+(\d{4})\s+opgericht'),
        ('in_year_gesticht', r'in\s+(\d{4})\s+gesticht'),
        ('in_year_geopend', r'in\s+(\d{4})\s+geopend'),
    ]

    haystack = text.lower()
    newest_plausible = datetime.now().year  # same bound for every pattern

    for context, pattern in patterns_by_context:
        hit = re.search(pattern, haystack)
        if hit is None:
            continue
        candidate = int(hit.group(1))
        if 1500 <= candidate <= newest_plausible:
            return (candidate, context)

    return None
|
|
|
|
|
|
def _iso_if_valid(year: int, month: int, day: int) -> Optional[str]:
    """Return 'YYYY-MM-DDT00:00:00Z' when (year, month, day) is a real
    calendar date with 1500 <= year <= current year, else None."""
    if not (1500 <= year <= datetime.now().year):
        return None
    try:
        # Rejects impossible combinations such as Feb 30 or month 13.
        datetime(year, month, day)
    except ValueError:
        return None
    return f"{year}-{month:02d}-{day:02d}T00:00:00Z"


def parse_full_date_from_text(text: str) -> Optional[Tuple[str, str]]:
    """
    Extract a full date (day/month/year) from free text.

    Recognises, in order of preference:
      * numeric European dates  "23-11-2005" / "23/11/2005"  (DD-MM-YYYY)
      * Dutch month names       "15 maart 1985"
      * English month names     "March 15, 1985"

    Returns ``(iso_timestamp, context)`` where the timestamp has the shape
    ``YYYY-MM-DDT00:00:00Z`` and *context* is one of ``full_date_numeric``,
    ``full_date_nl`` or ``full_date_en``; returns ``None`` when no valid
    date is found.

    Fix over the previous version: candidates are validated against the
    real calendar (e.g. "30-02-2005" is rejected) instead of only checking
    ``1 <= day <= 31``, and all numeric candidates in the text are scanned
    so a bogus first match no longer hides a valid later one.
    """
    dutch_months = {
        'januari': 1, 'februari': 2, 'maart': 3, 'april': 4,
        'mei': 5, 'juni': 6, 'juli': 7, 'augustus': 8,
        'september': 9, 'oktober': 10, 'november': 11, 'december': 12
    }

    english_months = {
        'january': 1, 'february': 2, 'march': 3, 'april': 4,
        'may': 5, 'june': 6, 'july': 7, 'august': 8,
        'september': 9, 'october': 10, 'november': 11, 'december': 12
    }

    text_lower = text.lower()

    # Numeric format: "23-11-2005" or "23/11/2005" (DD-MM-YYYY, European).
    for match in re.finditer(r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})', text):
        day, month, year = (int(g) for g in match.groups())
        iso = _iso_if_valid(year, month, day)
        if iso:
            return (iso, "full_date_numeric")

    # Dutch format: "15 maart 1985"
    for month_name, month_num in dutch_months.items():
        match = re.search(rf'(\d{{1,2}})\s+{month_name}\s+(\d{{4}})', text_lower)
        if match:
            iso = _iso_if_valid(int(match.group(2)), month_num, int(match.group(1)))
            if iso:
                return (iso, "full_date_nl")

    # English format: "March 15, 1985"
    for month_name, month_num in english_months.items():
        match = re.search(rf'{month_name}\s+(\d{{1,2}}),?\s+(\d{{4}})', text_lower)
        if match:
            iso = _iso_if_valid(int(match.group(2)), month_num, int(match.group(1)))
            if iso:
                return (iso, "full_date_en")

    return None
|
|
|
|
|
|
def extract_dates_from_archive(archive_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extract founding dates from archived Linkup search results.

    Walks every source entry in ``archive_data["api_response"]["sources"]``.
    For each source with textual content it first tries day-precision
    extraction (``parse_full_date_from_text``) and then — only while no date
    has been adopted yet — year-precision extraction (``parse_year_from_text``).

    Returns a dict with:
      founding_date   -- adopted ISO timestamp, or None
      date_precision  -- "day" or "year" for the adopted date
      source_url      -- URL of the source the adopted date came from
      source_urls     -- URLs of every source that had content
      context         -- pattern label that matched for the adopted date
      all_dates_found -- every candidate date encountered, with metadata

    Fix over the previous version: a day-precision date found in a later
    source now upgrades an earlier year-precision result; previously the
    first date found always won, so a year-only hit could mask an exact date.
    (Also removes a dead nested re-check of ``founding_date``.)
    """
    extracted: Dict[str, Any] = {
        "founding_date": None,
        "date_precision": None,
        "source_url": None,
        "source_urls": [],
        "context": None,
        "all_dates_found": [],
    }

    api_response = archive_data.get("api_response", {})
    sources = api_response.get("sources", [])

    for result in sources:
        # Some archives store text under "content", others under "snippet".
        content = result.get("content", "") or result.get("snippet", "") or ""
        url = result.get("url", "")

        if not content:
            continue

        extracted["source_urls"].append(url)

        # Try full date first (day precision).
        full_date = parse_full_date_from_text(content)
        if full_date:
            date_str, context = full_date
            extracted["all_dates_found"].append({
                "date": date_str,
                "precision": "day",
                "url": url,
                "context": context,
            })
            # Adopt if nothing found yet, or upgrade a year-precision result.
            if not extracted["founding_date"] or extracted["date_precision"] == "year":
                extracted["founding_date"] = date_str
                extracted["date_precision"] = "day"
                extracted["source_url"] = url
                extracted["context"] = context
            # A day-precision hit makes the year fallback redundant here.
            continue

        # Year-only fallback (only while no date has been adopted).
        if not extracted["founding_date"]:
            year_result = parse_year_from_text(content)
            if year_result:
                year, context = year_result
                date_str = f"{year}-01-01T00:00:00Z"
                extracted["all_dates_found"].append({
                    "date": date_str,
                    "precision": "year",
                    "url": url,
                    "context": context,
                })
                extracted["founding_date"] = date_str
                extracted["date_precision"] = "year"
                extracted["source_url"] = url
                extracted["context"] = context

    return extracted
|
|
|
|
|
|
def create_timespan_from_extracted(extracted: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Build a CIDOC-CRM style TimeSpan dict from extracted date info.

    Day precision pins ``begin_of_the_begin`` and ``end_of_the_begin`` to the
    exact date; year precision spans the whole calendar year. The
    end-of-existence bounds (``begin_of_the_end`` / ``end_of_the_end``) are
    always None — only the founding is modelled here. Returns None when no
    founding date was extracted.
    """
    founding_date = extracted.get("founding_date")
    if not founding_date:
        return None

    precision = extracted.get("date_precision", "year")

    timespan: Dict[str, Any] = {}

    if precision == "day":
        timespan["begin_of_the_begin"] = founding_date
        timespan["end_of_the_begin"] = founding_date
    elif precision == "year":
        # Widen a year-precision timestamp to the full calendar year.
        year_match = re.match(r'(\d{4})', founding_date)
        if year_match:
            yyyy = year_match.group(1)
            timespan["begin_of_the_begin"] = f"{yyyy}-01-01T00:00:00Z"
            timespan["end_of_the_begin"] = f"{yyyy}-12-31T23:59:59Z"
    else:
        # Unknown precision: fall back to the raw timestamp for both bounds.
        timespan["begin_of_the_begin"] = founding_date
        timespan["end_of_the_begin"] = founding_date

    timespan["begin_of_the_end"] = None
    timespan["end_of_the_end"] = None

    return timespan
|
|
|
|
|
|
# ============================================================================
|
|
# Main Processing
|
|
# ============================================================================
|
|
|
|
def find_missing_timespan_files() -> List[Path]:
    """
    Find NL custodian files lacking both a TimeSpan and a Wikidata inception.

    A file qualifies when its ``timespan`` has no ``begin_of_the_begin`` and
    its ``wikidata_enrichment`` has no ``wikidata_inception``. Unreadable or
    unparsable files are skipped silently (deliberate best-effort scan).

    Fix over the previous version: a file with an explicit
    ``wikidata_enrichment: null`` (or ``timespan: null``) no longer raises
    ``AttributeError`` on ``.get`` — which the catch-all then swallowed,
    wrongly dropping the file from the result. Null values are now treated
    the same as missing keys.
    """
    missing: List[Path] = []
    for filepath in DATA_DIR.glob("NL-*.yaml"):
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = yaml.safe_load(f)

            if not data:
                continue

            # Skip if already has TimeSpan ("or {}" guards explicit nulls).
            timespan = data.get("timespan") or {}
            if timespan.get("begin_of_the_begin"):
                continue

            # Skip if has Wikidata inception.
            wikidata = data.get("wikidata_enrichment") or {}
            if wikidata.get("wikidata_inception"):
                continue

            missing.append(filepath)
        except Exception:
            # Best-effort: a single broken file must not abort the scan.
            continue

    return missing
|
|
|
|
|
|
def get_archive_path(entry_index: int) -> Path:
    """Return the Linkup archive directory for a custodian entry.

    The entry index is zero-padded to four digits, e.g. 7 -> ``web/0007/linkup``.
    """
    return WEB_ARCHIVE_DIR / f"{entry_index:04d}" / "linkup"
|
|
|
|
|
|
def process_file(filepath: Path, dry_run: bool = False, verbose: bool = False) -> Tuple[bool, str]:
    """
    Process a single file: check for existing archive, extract dates, update file.

    Pipeline: load the custodian YAML, locate the entry's most recent
    ``linkup_founding_*.json`` archive, re-extract founding dates from it,
    build a TimeSpan, record provenance, and write the YAML back in place.

    Args:
        filepath: Path to a custodian YAML file (expected to contain an
            ``entry_index`` key).
        dry_run: When True, stop after extraction and report what would be
            written without modifying the file.
        verbose: When True, print per-file progress details.

    Returns:
        ``(success, message)`` — the message text is matched by substring in
        ``main()`` to categorise skips ("No linkup archive", "No archive
        files found", "No founding date found in archive"), so those strings
        must stay stable.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except Exception as e:
        # Best-effort: report the failure to the caller instead of raising.
        return (False, f"Error reading: {e}")

    if not data:
        return (False, "Empty file")

    entry_index = data.get("entry_index")
    if entry_index is None:
        return (False, "No entry_index")

    # Check for existing archive
    archive_dir = get_archive_path(entry_index)
    if not archive_dir.exists():
        return (False, "No linkup archive")

    # Find most recent archive file — lexicographic sort; presumably the
    # filenames embed a sortable timestamp, so [-1] is the newest. TODO confirm.
    archive_files = sorted(archive_dir.glob("linkup_founding_*.json"))
    if not archive_files:
        return (False, "No archive files found")

    latest_archive = archive_files[-1]

    if verbose:
        print(f" Using archive: {latest_archive.name}")

    # Load archive
    try:
        with open(latest_archive, 'r', encoding='utf-8') as f:
            archive_data = json.load(f)
    except Exception as e:
        return (False, f"Error loading archive: {e}")

    # Extract dates with improved patterns
    extracted = extract_dates_from_archive(archive_data)

    if not extracted.get("founding_date"):
        return (False, "No founding date found in archive")

    if verbose:
        print(f" Found date: {extracted['founding_date']} ({extracted['date_precision']}) via {extracted['context']}")

    # Dry-run stops before any mutation: the file on disk is never touched.
    if dry_run:
        return (True, f"Would add TimeSpan: {extracted['founding_date']}")

    # Create TimeSpan
    timespan = create_timespan_from_extracted(extracted)
    if not timespan:
        return (False, "Could not create TimeSpan")

    # Add sources and notes
    timespan["sources"] = [f"Linkup web search: {extracted.get('source_url', 'archived results')}"]
    if extracted.get("context"):
        timespan["notes"] = f"Found via pattern: {extracted['context']}"

    # Update data — replaces any pre-existing (empty/incomplete) timespan.
    data["timespan"] = timespan

    # Add provenance — create the nested structure if absent.
    if "provenance" not in data:
        data["provenance"] = {"sources": {}}
    if "sources" not in data["provenance"]:
        data["provenance"]["sources"] = {}

    # Record how this TimeSpan was derived (reprocessed archive, not a live
    # search); source_urls is capped at 5 to keep the YAML compact.
    data["provenance"]["sources"]["linkup_timespan_reprocessed"] = [{
        "source_type": "linkup_web_search_reprocessed",
        "reprocess_timestamp": datetime.now(timezone.utc).isoformat(),
        "original_archive": str(latest_archive.relative_to(PROJECT_ROOT)),
        "source_urls": extracted.get("source_urls", [])[:5],
        "claims_extracted": ["timespan_begin"],
        "data_tier": "TIER_4_INFERRED",
    }]

    # Write back — custom Dumper keeps multi-line strings in block style;
    # sort_keys=False preserves the original key order.
    try:
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, Dumper=PreserveQuotesDumper,
                      default_flow_style=False, allow_unicode=True, sort_keys=False)
    except Exception as e:
        return (False, f"Error writing: {e}")

    return (True, f"Added TimeSpan: {extracted['founding_date']} ({extracted['date_precision']})")
|
|
|
|
|
|
def main():
    """CLI entry point: find custodian files missing TimeSpan data, reprocess
    their archived Linkup results, and print a per-file and summary report."""
    parser = argparse.ArgumentParser(description="Reprocess Linkup archives for missing TimeSpan data")
    parser.add_argument("--dry-run", action="store_true", help="Don't modify files")
    parser.add_argument("--verbose", action="store_true", help="Show detailed progress")
    parser.add_argument("--limit", type=int, default=None, help="Limit number of files to process")
    opts = parser.parse_args()

    print("Finding NL files without TimeSpan or Wikidata inception...")
    candidates = find_missing_timespan_files()
    print(f"Found {len(candidates)} files to check")

    if opts.limit:
        candidates = candidates[:opts.limit]
        print(f"Processing first {opts.limit} files")

    # Plain counters instead of a stats dict.
    processed = enriched = no_archive = no_date_found = errors = 0

    for filepath in candidates:
        processed += 1

        if opts.verbose:
            print(f"\n[{processed}/{len(candidates)}] {filepath.name}")

        ok, message = process_file(filepath, dry_run=opts.dry_run, verbose=opts.verbose)

        if ok:
            enriched += 1
            if not opts.verbose:
                print(f"✓ {filepath.name}: {message}")
            continue

        # Categorise the skip by matching the message text from process_file.
        if "No linkup archive" in message or "No archive files" in message:
            no_archive += 1
        elif "No founding date" in message:
            no_date_found += 1
        else:
            errors += 1
        if opts.verbose:
            print(f" Skip: {message}")

    print(f"\n{'='*60}")
    print("Summary:")
    print(f" Total processed: {processed}")
    print(f" Enriched: {enriched}")
    print(f" No archive: {no_archive}")
    print(f" No date found: {no_date_found}")
    print(f" Errors: {errors}")

    if opts.dry_run:
        print("\n(Dry run - no files modified)")
|
|
|
# Script entry point: only run when executed directly, not when imported.
if __name__ == "__main__":
    main()
|