glam/scripts/extract_timeline_events.py
2025-12-16 20:27:39 +01:00

513 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Extract timeline events from Linkup API JSON data and update custodian YAML files.
This script:
1. Parses the entry-to-GHCID mapping file
2. Extracts historical events from Linkup JSON files (answer text only by default)
3. Updates custodian YAML files with events array
4. Filters events by relevance to target institution and date validity
Event types extracted:
- founding: opgericht, gesticht, ontstaan
- reopening: heropend, heropening
- merger: fusie, fuseerde, samengevoegd, ging op in
- relocation: verhuisd, verplaatst, nieuwe locatie, betrok nieuw pand
- expansion: uitgebreid, verbouwd, nieuwbouw, gemoderniseerd
- name_change: hernoemd, nieuwe naam
- dissolution: opgeheven, gesloten
- predecessor: voortzetting van, opvolger van
- friends_org: vrienden van, stichting vrienden
Filtering:
- Only dates >= 1800 (filters out historical references)
- Only events from main answer text by default (most reliable); source snippets are opt-in via --include-sources
- Uses institution name from query to filter relevant sources
"""
import json
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Any
import yaml
# Earliest year accepted for an institutional event; earlier years found in
# the text are treated as historical references (like "1648 landrecht").
MIN_YEAR = 1800

# Dutch month names mapped to zero-padded month numbers, in calendar order.
DUTCH_MONTHS = {
    month_name: f'{month_number:02d}'
    for month_number, month_name in enumerate(
        (
            'januari', 'februari', 'maart', 'april',
            'mei', 'juni', 'juli', 'augustus',
            'september', 'oktober', 'november', 'december',
        ),
        start=1,
    )
}
# Event type -> list of Dutch keyword regexes. The patterns are written in
# lowercase because they are matched against lowercased text.
EVENT_KEYWORDS = dict(
    # "opgericht" / "gesticht" = founded, "ontstaan" = came into being
    founding=[
        r'opgericht\s+(?:op|in)',
        r'gesticht\s+(?:op|in)',
        r'werd\s+opgericht',
        r'is\s+opgericht',
        r'ontstaan\s+in',
        r'opgericht\s+op\s+\d',
    ],
    # "heropend" = reopened, "weer geopend" = opened again
    reopening=[
        r'heropend\s+in',
        r'heropening\s+in',
        r'weer\s+geopend',
    ],
    # "fusie" = merger, "samengevoegd" = combined, "ging op in" = was absorbed by
    merger=[
        r'fusie\s+(?:van|tussen|met)',
        r'fuseerde\s+met',
        r'samengevoegd\s+met',
        r'ging\s+(?:ook\s+)?(?:hier\s+)?in\s+op',
        r'ging\s+op\s+in',
        r'voortgekomen\s+uit\s+een\s+fusie',
        r'ontstaan\s+uit\s+een\s+fusie',
    ],
    # "verhuisd" / "verplaatst" = moved, "gevestigd" = located at
    relocation=[
        r'verhuisd\s+naar',
        r'verplaatst\s+naar',
        r'nieuwe\s+locatie',
        r'betrok\s+(?:een\s+)?nieuw\s+pand',
        r'gevestigd\s+(?:aan|in|op)',
    ],
    # "uitgebreid" = expanded, "verbouwd" = renovated, "nieuwbouw" = new building
    expansion=[
        r'uitgebreid\s+(?:en|in)',
        r'verbouwd\s+in',
        r'nieuwbouw\s+in',
        r'gemoderniseerd',
        r'werd\s+(?:in\s+\d{4}\s+)?uitgebreid',
    ],
    # "hernoemd" = renamed, "naam gewijzigd/veranderd" = name changed
    name_change=[
        r'hernoemd\s+(?:naar|tot)',
        r'nieuwe\s+naam',
        r'naam\s+gewijzigd',
        r'naam\s+veranderd',
    ],
    # "opgeheven" = dissolved, "gesloten" = closed, "beëindigd" = ended
    dissolution=[
        r'opgeheven\s+in',
        r'gesloten\s+in',
        r'opgegaan\s+in',
        r'beëindigd\s+in',
    ],
    # "voortzetting van" = continuation of, "opvolger van" = successor of
    predecessor=[
        r'voortzetting\s+van',
        r'opvolger\s+van',
        r'voortgezet\s+als',
    ],
    # "(stichting) vrienden van ... opgericht" = friends-of foundation founded
    friends_org=[
        r'(?:stichting\s+)?vrienden\s+van\s+.*\s+opgericht',
    ],
)
# Date extraction patterns as (regex, date_type) pairs, listed from most to
# least specific. They are written in lowercase for use on lowercased text.
#
# Guards used below:
#   (?<!\d) / (?!\d)  keep a day or year from being cut out of a longer number
#                     (e.g. "in 18500 boeken" must not yield the year 1850).
#   \b                keeps a keyword from matching inside another word
#                     (e.g. the "in" at the end of "tuin").
DATE_PATTERNS = [
    # Full date: "30 april 2005" or "op 30 april 2005"
    (
        r'(?:op\s+)?(?<!\d)(\d{1,2})\s+('
        + '|'.join(DUTCH_MONTHS)
        + r')\s+(\d{4})(?!\d)',
        'full',
    ),
    # Full date: "30-4-2005" or "30/4/2005"
    (r'(?<!\d)(\d{1,2})[-/](\d{1,2})[-/](\d{4})(?!\d)', 'numeric'),
    # Year with context: "in 1854", "sinds 2005", "vanaf 2006"
    (r'\b(?:in|sinds|vanaf|anno|per)\s+(\d{4})(?!\d)', 'year'),
    # Year in parentheses: "(2000)"
    (r'\((\d{4})\)', 'year'),
    # Approximate: "circa 1900"
    (r'\bcirca\s+(\d{4})(?!\d)', 'circa'),
    # Year only after "werd": "werd in 1980". Every match of this pattern is
    # also matched by the generic "in <year>" pattern above; kept so the
    # list keeps its original shape.
    (r'\bwerd\s+in\s+(\d{4})(?!\d)', 'year'),
]
def parse_dutch_date(match: tuple, date_type: str) -> tuple[str, bool]:
"""
Parse a Dutch date match to ISO format.
Returns:
Tuple of (iso_date_string, is_approximate)
"""
if date_type == 'full':
day, month_name, year = match
month = DUTCH_MONTHS.get(month_name.lower(), '01')
return f"{year}-{month}-{int(day):02d}", False
elif date_type == 'numeric':
day, month, year = match
return f"{year}-{int(month):02d}-{int(day):02d}", False
elif date_type == 'year':
year = match[0] if isinstance(match, tuple) else match
return f"{year}", False
elif date_type == 'circa':
year = match[0] if isinstance(match, tuple) else match
return f"{year}", True
return "", False
def is_valid_year(date_str: str) -> bool:
    """Tell whether the first four characters of a date string are a year of at least MIN_YEAR."""
    try:
        leading_year = int(date_str[:4])
    except (ValueError, IndexError):
        # Not a number at all: treat as invalid rather than raising.
        return False
    return leading_year >= MIN_YEAR
def _first_valid_date(context_lower: str) -> tuple[str, bool] | None:
    """Return the first acceptable (iso_date, is_approximate) in the lowercased context, or None."""
    for date_pattern, date_type in DATE_PATTERNS:
        # Try every hit of a pattern, not only the first one: an early
        # pre-MIN_YEAR reference must not hide a valid date of the same form.
        for candidate in re.findall(date_pattern, context_lower):
            iso_date, is_approx = parse_dutch_date(candidate, date_type)
            if iso_date and is_valid_year(iso_date):
                return iso_date, is_approx
    return None


def extract_events_from_text(text: str, source_url: str | None = None) -> list[dict]:
    """
    Extract historical events with dates from text.

    Every EVENT_KEYWORDS pattern is searched in the lowercased text. For each
    keyword hit, a window of 200 characters on either side is scanned for a
    date using DATE_PATTERNS in order; a hit without an acceptable date
    produces no event.

    Args:
        text: Text to search for events
        source_url: URL source for provenance; stored under 'source' when given

    Returns:
        List of event dictionaries with keys 'date', 'type', 'description',
        'approximate' and optionally 'source'. The list is not deduplicated.
    """
    events = []
    text_lower = text.lower()
    for event_type, patterns in EVENT_KEYWORDS.items():
        for pattern in patterns:
            for match in re.finditer(pattern, text_lower):
                # Surrounding context (200 chars before and after the keyword)
                start = max(0, match.start() - 200)
                end = min(len(text), match.end() + 200)
                context = text[start:end]

                dated = _first_valid_date(context.lower())
                if dated is None:
                    continue
                iso_date, is_approx = dated

                # Description: original-case text from the keyword onwards,
                # at most 150 characters, whitespace collapsed, cut at the
                # first full stop.
                desc_start = match.start() - start
                description = context[desc_start:desc_start + 150].strip()
                description = re.sub(r'\s+', ' ', description)
                description = description.split('.')[0]

                event = {
                    'date': iso_date,
                    'type': event_type,
                    'description': description,
                    'approximate': is_approx,
                }
                if source_url:
                    event['source'] = source_url
                events.append(event)
    return events
def deduplicate_events(events: list[dict]) -> list[dict]:
    """
    Keep only the first event for every (date, type) combination.

    The relative order of the surviving events is unchanged.
    """
    first_per_key: dict[tuple, dict] = {}
    for candidate in events:
        identity = (candidate.get('date'), candidate.get('type'))
        # setdefault leaves an already-registered event in place.
        first_per_key.setdefault(identity, candidate)
    return list(first_per_key.values())
def extract_institution_name_from_query(query: str) -> str | None:
"""
Extract institution name from linkup query string.
The query format is typically: "Institution Name" city opgericht OR gesticht...
"""
# Try to extract quoted name first
match = re.search(r'"([^"]+)"', query)
if match:
return match.group(1)
# Fall back to first part before city name
return None
def is_source_relevant(source_name: str, source_url: str, institution_name: str | None) -> bool:
"""
Check if a source is relevant to the target institution.
Filters out sources about other institutions.
"""
if not institution_name:
return True # Can't filter without institution name
# Check if institution name appears in source name or URL
inst_lower = institution_name.lower()
# Extract key words from institution name (skip short words)
key_words = [w for w in inst_lower.split() if len(w) > 3]
source_lower = source_name.lower()
url_lower = source_url.lower()
# Check if any key word appears in source
for word in key_words:
if word in source_lower or word in url_lower:
return True
# Also allow generic sources like Wikipedia for the institution
if 'wikipedia' in url_lower:
# Check if it's about this institution
for word in key_words:
if word in source_lower:
return True
return False
def parse_linkup_json(json_path: Path, include_sources: bool = False) -> list[dict]:
    """
    Parse a Linkup JSON file and extract events.

    Events are taken from ``api_response.answer``. With ``include_sources``
    they are also taken from the snippets of ``api_response.sources`` that
    pass the relevance check against the institution name found in ``query``.

    Args:
        json_path: Path to linkup JSON file
        include_sources: If True, also extract from source snippets (higher noise)

    Returns:
        Deduplicated list of event dictionaries. An unreadable, malformed or
        unexpectedly shaped file yields an empty list and prints a warning.
    """
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # OSError covers missing/unreadable files; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        print(f" Warning: Could not parse {json_path}: {e}")
        return []

    if not isinstance(data, dict):
        print(f" Warning: Unexpected JSON structure in {json_path}")
        return []

    # JSON null or a wrongly typed value is treated the same as a missing key.
    api_response = data.get('api_response')
    if not isinstance(api_response, dict):
        api_response = {}
    query = data.get('query')
    institution_name = (
        extract_institution_name_from_query(query) if isinstance(query, str) else None
    )

    events = []

    # Extract from main answer (most reliable - about the target institution)
    answer = api_response.get('answer')
    if isinstance(answer, str) and answer:
        events.extend(extract_events_from_text(answer))

    # Optionally extract from sources (higher noise, more events about other institutions)
    if include_sources:
        for source in api_response.get('sources') or []:
            if not isinstance(source, dict):
                continue
            source_name = source.get('name') or ''
            source_url = source.get('url') or ''
            snippet = source.get('snippet') or ''
            # Only include sources relevant to the target institution
            if snippet and is_source_relevant(source_name, source_url, institution_name):
                events.extend(extract_events_from_text(snippet, source_url))

    return deduplicate_events(events)
def load_mapping(mapping_path: Path) -> dict[int, str]:
    """
    Load entry number to GHCID mapping.

    Each useful line has the form ``<entry number> <GHCID>``. The two fields
    may be separated by any run of whitespace (spaces or tabs). Blank lines,
    lines with a single field and lines whose first field is not an integer
    are skipped.

    Args:
        mapping_path: Path to the mapping text file (read as UTF-8)

    Returns:
        Dict mapping entry number to GHCID (first occurrence wins for duplicates)
    """
    mapping: dict[int, str] = {}
    with open(mapping_path, 'r', encoding='utf-8') as f:
        for line in f:
            # split(None, 1) ignores leading whitespace and accepts tabs or
            # repeated spaces as the separator.
            parts = line.split(None, 1)
            if len(parts) != 2:
                continue
            try:
                entry_num = int(parts[0])
            except ValueError:
                continue
            # The remainder still carries the line ending; strip it.
            mapping.setdefault(entry_num, parts[1].strip())  # First occurrence wins
    return mapping
def _event_date_text(event: Any) -> str:
    """Return an event's date as text ('' if absent) so mixed date types can be compared."""
    if not isinstance(event, dict):
        return ''
    date_value = event.get('date')
    return '' if date_value is None else str(date_value)


def update_yaml_timespan(yaml_path: Path, events: list[dict], dry_run: bool = False) -> bool:
    """
    Update a custodian YAML file with new events.

    New events are appended to ``timespan.events``, skipping any whose
    (date, type) pair is already present, and the list is then sorted by
    date. The file is rewritten only when something was added and
    ``dry_run`` is False.

    Args:
        yaml_path: Path to custodian YAML file
        events: List of events to add
        dry_run: If True, don't write changes

    Returns:
        True if at least one new event was found for the file (also in dry-run
        mode, where nothing is written); False otherwise, including when the
        file is missing, unparseable or has an unexpected structure.
    """
    if not yaml_path.exists():
        print(f" Warning: YAML file not found: {yaml_path}")
        return False
    try:
        with open(yaml_path, 'r', encoding='utf-8') as f:
            data = yaml.safe_load(f)
    except yaml.YAMLError as e:
        print(f" Warning: Could not parse YAML {yaml_path}: {e}")
        return False

    if data is None:
        data = {}
    if not isinstance(data, dict):
        print(f" Warning: Unexpected YAML structure in {yaml_path}")
        return False

    # An empty "timespan:" / "events:" key loads as None; treat it as missing.
    # Any other unexpected type is left alone rather than overwritten.
    timespan = data.get('timespan')
    if timespan is None:
        timespan = data['timespan'] = {}
    if not isinstance(timespan, dict):
        print(f" Warning: Unexpected YAML structure in {yaml_path}")
        return False
    existing_events = timespan.get('events')
    if existing_events is None:
        existing_events = timespan['events'] = []
    if not isinstance(existing_events, list):
        print(f" Warning: Unexpected YAML structure in {yaml_path}")
        return False

    # Existing (date, type) keys to avoid duplicates. Dates are compared as
    # text because entries already in the file may hold non-string dates
    # (for example values the YAML loader turned into ints or date objects).
    existing_keys = {
        (_event_date_text(e), e.get('type'))
        for e in existing_events
        if isinstance(e, dict)
    }

    new_count = 0
    for event in events:
        key = (_event_date_text(event), event.get('type'))
        if key in existing_keys:
            continue
        # Clean event for YAML: fixed key order, optional keys only when set
        clean_event = {
            'date': event['date'],
            'type': event['type'],
            'description': event['description'],
        }
        if event.get('approximate'):
            clean_event['approximate'] = True
        if event.get('source'):
            clean_event['source'] = event['source']
        existing_events.append(clean_event)
        existing_keys.add(key)
        new_count += 1

    if new_count > 0 and not dry_run:
        # Sort by the textual date so mixed date types cannot raise TypeError.
        existing_events.sort(key=_event_date_text)
        with open(yaml_path, 'w', encoding='utf-8') as f:
            yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False)
    return new_count > 0
def main():
    """
    Main entry point.

    Reads the entry-to-GHCID mapping, extracts events from every
    ``linkup_founding_*.json`` file of each selected entry and merges them
    into the matching custodian YAML file, then prints a summary.
    """
    import argparse
    parser = argparse.ArgumentParser(description='Extract timeline events from Linkup JSON')
    parser.add_argument('--dry-run', action='store_true', help='Do not write changes')
    parser.add_argument('--limit', type=int, help='Limit number of entries to process')
    parser.add_argument('--entry', type=int, help='Process specific entry number')
    parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output')
    parser.add_argument('--include-sources', action='store_true',
                        help='Also extract from source snippets (higher noise, more false positives)')
    parser.add_argument('--base-path', type=Path, default=Path('/Users/kempersc/apps/glam'),
                        help='Root directory of the glam data (default: %(default)s)')
    args = parser.parse_args()

    # Paths
    base_path = args.base_path
    mapping_path = base_path / 'data/custodian/web/_entry_to_ghcid.txt'
    web_path = base_path / 'data/custodian/web'
    custodian_path = base_path / 'data/custodian'

    # Load mapping
    print("Loading entry-to-GHCID mapping...")
    mapping = load_mapping(mapping_path)
    print(f" Loaded {len(mapping)} mappings")

    # Process entries
    processed = 0
    updated = 0
    total_events = 0
    # "is not None" so that entry number 0 can be selected explicitly.
    entries_to_process = [args.entry] if args.entry is not None else sorted(mapping)
    if args.limit:
        entries_to_process = entries_to_process[:args.limit]
    print(f"\nProcessing {len(entries_to_process)} entries...")

    for entry_num in entries_to_process:
        ghcid = mapping.get(entry_num)
        if not ghcid:
            if args.verbose:
                print(f" Skipping entry {entry_num}: no GHCID mapping")
            continue

        # Find linkup JSON file
        entry_dir = web_path / f"{entry_num:04d}" / 'linkup'
        if not entry_dir.exists():
            if args.verbose:
                print(f" Skipping entry {entry_num}: no linkup directory")
            continue
        # Sorted so that the event kept by deduplication does not depend on
        # directory listing order.
        json_files = sorted(entry_dir.glob('linkup_founding_*.json'))
        if not json_files:
            if args.verbose:
                print(f" Skipping entry {entry_num}: no linkup JSON files")
            continue

        # Process all JSON files for this entry
        all_events = []
        for json_file in json_files:
            all_events.extend(parse_linkup_json(json_file, include_sources=args.include_sources))
        all_events = deduplicate_events(all_events)
        if not all_events:
            if args.verbose:
                print(f" Entry {entry_num} ({ghcid}): no events extracted")
            processed += 1
            continue

        # Update YAML file
        yaml_file = custodian_path / f"{ghcid}.yaml"
        if args.verbose or args.dry_run:
            print(f"\n Entry {entry_num} ({ghcid}):")
            for event in all_events:
                approx = " (approx)" if event.get('approximate') else ""
                print(f" - {event['date']}{approx} [{event['type']}]: {event['description'][:60]}...")
        if update_yaml_timespan(yaml_file, all_events, dry_run=args.dry_run):
            updated += 1
            # NOTE(review): this counts every extracted event, including ones
            # update_yaml_timespan skipped as already present.
            total_events += len(all_events)
            if not args.verbose:
                print(f" Updated {ghcid}: +{len(all_events)} events")
        processed += 1

    # Summary
    print(f"\n{'=' * 60}")
    print("Summary:")
    print(f" Entries processed: {processed}")
    print(f" YAML files updated: {updated}")
    print(f" Total events added: {total_events}")
    if args.dry_run:
        print(" (DRY RUN - no files modified)")


if __name__ == '__main__':
    main()