#!/usr/bin/env python3 """ Extract timeline events from Linkup API JSON data and update custodian YAML files. This script: 1. Parses the entry-to-GHCID mapping file 2. Extracts historical events from Linkup JSON files (answer text only by default) 3. Updates custodian YAML files with events array 4. Filters events by relevance to target institution and date validity Event types extracted: - founding: opgericht, gesticht, ontstaan - reopening: heropend, heropening - merger: fusie, fuseerde, samengevoegd, ging op in - relocation: verhuisd, verplaatst, nieuwe locatie, betrok nieuw pand - expansion: uitgebreid, verbouwd, nieuwbouw, gemoderniseerd - name_change: hernoemd, nieuwe naam - dissolution: opgeheven, gesloten - predecessor: voortzetting van, opvolger van - friends_org: vrienden van, stichting vrienden Filtering: - Only dates >= 1800 (filters out historical references) - Only events from main answer text (most reliable) - Uses institution name from query to filter relevant sources """ import json import re import sys from datetime import datetime from pathlib import Path from typing import Any import yaml # Minimum year for institutional events (filters out historical references like "1648 landrecht") MIN_YEAR = 1800 # Dutch month names to numbers DUTCH_MONTHS = { 'januari': '01', 'februari': '02', 'maart': '03', 'april': '04', 'mei': '05', 'juni': '06', 'juli': '07', 'augustus': '08', 'september': '09', 'oktober': '10', 'november': '11', 'december': '12' } # Event type keywords (Dutch) EVENT_KEYWORDS = { 'founding': [ r'opgericht\s+(?:op|in)', r'gesticht\s+(?:op|in)', r'werd\s+opgericht', r'is\s+opgericht', r'ontstaan\s+in', r'opgericht\s+op\s+\d', ], 'reopening': [ r'heropend\s+in', r'heropening\s+in', r'weer\s+geopend', ], 'merger': [ r'fusie\s+(?:van|tussen|met)', r'fuseerde\s+met', r'samengevoegd\s+met', r'ging\s+(?:ook\s+)?(?:hier\s+)?in\s+op', r'ging\s+op\s+in', r'voortgekomen\s+uit\s+een\s+fusie', r'ontstaan\s+uit\s+een\s+fusie', ], 'relocation': [ r'verhuisd\s+naar', r'verplaatst\s+naar', r'nieuwe\s+locatie', r'betrok\s+(?:een\s+)?nieuw\s+pand', r'gevestigd\s+(?:aan|in|op)', ], 'expansion': [ r'uitgebreid\s+(?:en|in)', r'verbouwd\s+in', r'nieuwbouw\s+in', r'gemoderniseerd', r'werd\s+(?:in\s+\d{4}\s+)?uitgebreid', ], 'name_change': [ r'hernoemd\s+(?:naar|tot)', r'nieuwe\s+naam', r'naam\s+gewijzigd', r'naam\s+veranderd', ], 'dissolution': [ r'opgeheven\s+in', r'gesloten\s+in', r'opgegaan\s+in', r'beƫindigd\s+in', ], 'predecessor': [ r'voortzetting\s+van', r'opvolger\s+van', r'voortgezet\s+als', ], 'friends_org': [ r'(?:stichting\s+)?vrienden\s+van\s+.*\s+opgericht', ], } # Date extraction patterns DATE_PATTERNS = [ # Full date: "30 april 2005" or "op 30 april 2005" (r'(?:op\s+)?(\d{1,2})\s+(' + '|'.join(DUTCH_MONTHS.keys()) + r')\s+(\d{4})', 'full'), # Full date: "30-4-2005" or "30/4/2005" (r'(\d{1,2})[-/](\d{1,2})[-/](\d{4})', 'numeric'), # Year with context: "in 1854", "sinds 2005", "vanaf 2006" (r'(?:in|sinds|vanaf|anno|per)\s+(\d{4})', 'year'), # Year in parentheses: "(2000)" (r'\((\d{4})\)', 'year'), # Approximate: "circa 1900" (r'circa\s+(\d{4})', 'circa'), # Year only after "werd" or "is": "werd in 1980" (r'werd\s+in\s+(\d{4})', 'year'), ] def parse_dutch_date(match: tuple, date_type: str) -> tuple[str, bool]: """ Parse a Dutch date match to ISO format. Returns: Tuple of (iso_date_string, is_approximate) """ if date_type == 'full': day, month_name, year = match month = DUTCH_MONTHS.get(month_name.lower(), '01') return f"{year}-{month}-{int(day):02d}", False elif date_type == 'numeric': day, month, year = match return f"{year}-{int(month):02d}-{int(day):02d}", False elif date_type == 'year': year = match[0] if isinstance(match, tuple) else match return f"{year}", False elif date_type == 'circa': year = match[0] if isinstance(match, tuple) else match return f"{year}", True return "", False def is_valid_year(date_str: str) -> bool: """Check if the year in a date string is >= MIN_YEAR.""" try: year = int(date_str[:4]) return year >= MIN_YEAR except (ValueError, IndexError): return False def extract_events_from_text(text: str, source_url: str | None = None) -> list[dict]: """ Extract historical events with dates from text. Args: text: Text to search for events source_url: URL source for provenance Returns: List of event dictionaries """ events = [] text_lower = text.lower() for event_type, patterns in EVENT_KEYWORDS.items(): for pattern in patterns: matches = list(re.finditer(pattern, text_lower)) for match in matches: # Get surrounding context (200 chars before and after) start = max(0, match.start() - 200) end = min(len(text), match.end() + 200) context = text[start:end] # Try to find a date in the context for date_pattern, date_type in DATE_PATTERNS: date_matches = re.findall(date_pattern, context.lower()) if date_matches: date_match = date_matches[0] iso_date, is_approx = parse_dutch_date(date_match, date_type) # Filter out dates before MIN_YEAR (historical references) if not iso_date or not is_valid_year(iso_date): continue # Extract description from original case text orig_context = text[start:end] # Clean up the description desc_start = match.start() - start desc_end = min(desc_start + 150, len(orig_context)) description = orig_context[desc_start:desc_end].strip() # Clean up description description = re.sub(r'\s+', ' ', description) description = description.split('.')[0] # First sentence if len(description) > 200: description = description[:200] + '...' event = { 'date': iso_date, 'type': event_type, 'description': description, 'approximate': is_approx, } if source_url: event['source'] = source_url events.append(event) break # Only take first date match per event return events def deduplicate_events(events: list[dict]) -> list[dict]: """ Remove duplicate events based on date and type. """ seen = set() unique = [] for event in events: key = (event.get('date'), event.get('type')) if key not in seen: seen.add(key) unique.append(event) return unique def extract_institution_name_from_query(query: str) -> str | None: """ Extract institution name from linkup query string. The query format is typically: "Institution Name" city opgericht OR gesticht... """ # Try to extract quoted name first match = re.search(r'"([^"]+)"', query) if match: return match.group(1) # Fall back to first part before city name return None def is_source_relevant(source_name: str, source_url: str, institution_name: str | None) -> bool: """ Check if a source is relevant to the target institution. Filters out sources about other institutions. """ if not institution_name: return True # Can't filter without institution name # Check if institution name appears in source name or URL inst_lower = institution_name.lower() # Extract key words from institution name (skip short words) key_words = [w for w in inst_lower.split() if len(w) > 3] source_lower = source_name.lower() url_lower = source_url.lower() # Check if any key word appears in source for word in key_words: if word in source_lower or word in url_lower: return True # Also allow generic sources like Wikipedia for the institution if 'wikipedia' in url_lower: # Check if it's about this institution for word in key_words: if word in source_lower: return True return False def parse_linkup_json(json_path: Path, include_sources: bool = False) -> list[dict]: """ Parse a Linkup JSON file and extract events. Args: json_path: Path to linkup JSON file include_sources: If True, also extract from source snippets (higher noise) Returns: List of event dictionaries """ try: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) except (json.JSONDecodeError, FileNotFoundError) as e: print(f" Warning: Could not parse {json_path}: {e}") return [] events = [] api_response = data.get('api_response', {}) query = data.get('query', '') institution_name = extract_institution_name_from_query(query) # Extract from main answer (most reliable - about the target institution) answer = api_response.get('answer', '') if answer: events.extend(extract_events_from_text(answer)) # Optionally extract from sources (higher noise, more events about other institutions) if include_sources: sources = api_response.get('sources', []) for source in sources: source_name = source.get('name', '') source_url = source.get('url', '') snippet = source.get('snippet', '') # Only include sources relevant to the target institution if snippet and is_source_relevant(source_name, source_url, institution_name): source_events = extract_events_from_text(snippet, source_url) events.extend(source_events) return deduplicate_events(events) def load_mapping(mapping_path: Path) -> dict[int, str]: """ Load entry number to GHCID mapping. Returns: Dict mapping entry number to GHCID (first occurrence wins for duplicates) """ mapping = {} with open(mapping_path, 'r', encoding='utf-8') as f: for line in f: line = line.strip() if not line: continue parts = line.split(' ', 1) if len(parts) == 2: try: entry_num = int(parts[0]) ghcid = parts[1] if entry_num not in mapping: # First occurrence wins mapping[entry_num] = ghcid except ValueError: continue return mapping def update_yaml_timespan(yaml_path: Path, events: list[dict], dry_run: bool = False) -> bool: """ Update a custodian YAML file with new events. Args: yaml_path: Path to custodian YAML file events: List of events to add dry_run: If True, don't write changes Returns: True if file was updated """ if not yaml_path.exists(): print(f" Warning: YAML file not found: {yaml_path}") return False try: with open(yaml_path, 'r', encoding='utf-8') as f: data = yaml.safe_load(f) except yaml.YAMLError as e: print(f" Warning: Could not parse YAML {yaml_path}: {e}") return False if data is None: data = {} # Initialize timespan if not exists if 'timespan' not in data: data['timespan'] = {} timespan = data['timespan'] # Initialize events array if not exists if 'events' not in timespan: timespan['events'] = [] # Get existing event keys (date, type) to avoid duplicates existing_keys = { (e.get('date'), e.get('type')) for e in timespan['events'] } # Add new events new_count = 0 for event in events: key = (event.get('date'), event.get('type')) if key not in existing_keys: # Clean event for YAML clean_event = { 'date': event['date'], 'type': event['type'], 'description': event['description'], } if event.get('approximate'): clean_event['approximate'] = True if event.get('source'): clean_event['source'] = event['source'] timespan['events'].append(clean_event) existing_keys.add(key) new_count += 1 # Sort events by date timespan['events'].sort(key=lambda e: e.get('date', '')) if new_count > 0 and not dry_run: with open(yaml_path, 'w', encoding='utf-8') as f: yaml.dump(data, f, default_flow_style=False, allow_unicode=True, sort_keys=False) return new_count > 0 def main(): """Main entry point.""" import argparse parser = argparse.ArgumentParser(description='Extract timeline events from Linkup JSON') parser.add_argument('--dry-run', action='store_true', help='Do not write changes') parser.add_argument('--limit', type=int, help='Limit number of entries to process') parser.add_argument('--entry', type=int, help='Process specific entry number') parser.add_argument('--verbose', '-v', action='store_true', help='Verbose output') parser.add_argument('--include-sources', action='store_true', help='Also extract from source snippets (higher noise, more false positives)') args = parser.parse_args() # Paths base_path = Path('/Users/kempersc/apps/glam') mapping_path = base_path / 'data/custodian/web/_entry_to_ghcid.txt' web_path = base_path / 'data/custodian/web' custodian_path = base_path / 'data/custodian' # Load mapping print("Loading entry-to-GHCID mapping...") mapping = load_mapping(mapping_path) print(f" Loaded {len(mapping)} mappings") # Process entries processed = 0 updated = 0 total_events = 0 entries_to_process = [args.entry] if args.entry else sorted(mapping.keys()) if args.limit: entries_to_process = entries_to_process[:args.limit] print(f"\nProcessing {len(entries_to_process)} entries...") for entry_num in entries_to_process: ghcid = mapping.get(entry_num) if not ghcid: continue # Find linkup JSON file entry_dir = web_path / f"{entry_num:04d}" / 'linkup' if not entry_dir.exists(): if args.verbose: print(f" Skipping entry {entry_num}: no linkup directory") continue json_files = list(entry_dir.glob('linkup_founding_*.json')) if not json_files: if args.verbose: print(f" Skipping entry {entry_num}: no linkup JSON files") continue # Process all JSON files for this entry all_events = [] for json_file in json_files: events = parse_linkup_json(json_file, include_sources=args.include_sources) all_events.extend(events) all_events = deduplicate_events(all_events) if not all_events: if args.verbose: print(f" Entry {entry_num} ({ghcid}): no events extracted") processed += 1 continue # Update YAML file yaml_file = custodian_path / f"{ghcid}.yaml" if args.verbose or args.dry_run: print(f"\n Entry {entry_num} ({ghcid}):") for event in all_events: approx = " (approx)" if event.get('approximate') else "" print(f" - {event['date']}{approx} [{event['type']}]: {event['description'][:60]}...") if update_yaml_timespan(yaml_file, all_events, dry_run=args.dry_run): updated += 1 total_events += len(all_events) if not args.verbose: print(f" Updated {ghcid}: +{len(all_events)} events") processed += 1 # Summary print(f"\n{'=' * 60}") print(f"Summary:") print(f" Entries processed: {processed}") print(f" YAML files updated: {updated}") print(f" Total events added: {total_events}") if args.dry_run: print(" (DRY RUN - no files modified)") if __name__ == '__main__': main()