#!/usr/bin/env python3
"""
Batch process all conversation JSON files to extract heritage institutions.

This script:
1. Scans all conversation JSON files in the project root
2. Parses each conversation
3. Extracts institutions using the NLP extractor
4. Enriches location data with GeoNames geocoding
5. Deduplicates institutions across conversations
6. Exports to multiple formats (JSON, CSV, SQLite)

Usage:
    python scripts/batch_extract_institutions.py
    python scripts/batch_extract_institutions.py --limit 10    # Process first 10 files only
    python scripts/batch_extract_institutions.py --country BR  # Filter by country
"""

import argparse
import json
import re
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional

# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))

from glam_extractor.parsers.conversation import ConversationParser
from glam_extractor.extractors.nlp_extractor import InstitutionExtractor
from glam_extractor.geocoding.geonames_lookup import GeoNamesDB
from glam_extractor.models import HeritageCustodian


@dataclass
class ExtractionStats:
    """Statistics for batch extraction"""
    files_processed: int = 0
    files_skipped: int = 0
    institutions_extracted: int = 0
    institutions_deduplicated: int = 0
    locations_geocoded: int = 0
    errors: List[str] = field(default_factory=list)

    def add_error(self, error: str):
        """Add an error to the list"""
        self.errors.append(error)


class BatchInstitutionExtractor:
    """
    Batch processor for extracting institutions from conversation files.
    """

    def __init__(
        self,
        conversation_dir: Path,
        output_dir: Path,
        use_geocoding: bool = True
    ):
        """
        Initialize batch extractor.

        Args:
            conversation_dir: Directory containing conversation JSON files
            output_dir: Directory for output files
            use_geocoding: Whether to enrich locations with GeoNames data
        """
        self.conversation_dir = conversation_dir
        self.output_dir = output_dir
        self.use_geocoding = use_geocoding

        self.conversation_parser = ConversationParser()
        self.extractor = InstitutionExtractor()
        self.geonames_db = GeoNamesDB() if use_geocoding else None

        self.stats = ExtractionStats()
        self.all_institutions: List[HeritageCustodian] = []

        # Deduplication tracking
        self.seen_institutions: Dict[str, HeritageCustodian] = {}

        # Ensure output directory exists
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def find_conversation_files(
        self,
        pattern: str = "*.json",
        exclude_patterns: Optional[List[str]] = None
    ) -> List[Path]:
        """
        Find all conversation JSON files.

        Args:
            pattern: Glob pattern for finding files
            exclude_patterns: List of filename patterns to exclude

        Returns:
            List of conversation file paths
        """
        if exclude_patterns is None:
            exclude_patterns = [
                "package.json",
                "tsconfig.json",
                "schema.json",
            ]

        all_files = list(self.conversation_dir.glob(pattern))

        # Filter out excluded files
        conversation_files = [
            f for f in all_files
            if not any(excl in f.name for excl in exclude_patterns)
        ]

        return sorted(conversation_files)
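
    # Illustrative note (hypothetical filenames): because exclusion is a
    # substring match on the file name, the default excludes above keep a
    # file like "museum_conversation_2024.json" but drop "package.json" and
    # "conversation.schema.json" (the latter contains "schema.json").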

    def process_file(self, file_path: Path) -> int:
        """
        Process a single conversation file.

        Args:
            file_path: Path to conversation JSON file

        Returns:
            Number of institutions extracted
        """
        try:
            print(f"Processing: {file_path.name}")

            # Parse conversation
            conversation = self.conversation_parser.parse_file(file_path)

            # Extract all text from conversation
            full_text = conversation.extract_all_text()

            if not full_text.strip():
                print("  ⚠ No text content found")
                self.stats.files_skipped += 1
                return 0

            # Extract institutions
            extract_result = self.extractor.extract_from_text(
                full_text,
                conversation_id=conversation.uuid
            )

            if not extract_result.success:
                self.stats.add_error(
                    f"{file_path.name}: Extraction error - {extract_result.error}"
                )
                self.stats.files_skipped += 1
                return 0

            institutions = extract_result.value

            if not institutions:
                print("  ℹ No institutions found")
                self.stats.files_processed += 1
                return 0

            # Enrich with geocoding
            if self.use_geocoding and self.geonames_db:
                institutions = self._enrich_with_geocoding(institutions)

            # Deduplicate and add to collection
            new_count = self._add_institutions(institutions)

            print(f"  ✓ Extracted {len(institutions)} institutions ({new_count} new)")
            self.stats.files_processed += 1
            self.stats.institutions_extracted += len(institutions)

            return new_count

        except Exception as e:
            error_msg = f"{file_path.name}: Unexpected error - {str(e)}"
            self.stats.add_error(error_msg)
            print(f"  ✗ Error: {e}")
            self.stats.files_skipped += 1
            return 0

    def _enrich_with_geocoding(
        self,
        institutions: List[HeritageCustodian]
    ) -> List[HeritageCustodian]:
        """
        Enrich institution locations with GeoNames data.

        Args:
            institutions: List of institutions to enrich

        Returns:
            Enriched institutions
        """
        for institution in institutions:
            if not institution.locations:
                continue

            for location in institution.locations:
                # Skip if already has coordinates
                if location.latitude and location.longitude:
                    continue

                # Skip if missing city or country
                if not location.city or not location.country:
                    continue

                # Lookup in GeoNames
                if self.geonames_db is None:
                    continue
                city_info = self.geonames_db.lookup_city(
                    location.city,
                    location.country
                )

                if city_info:
                    # Enrich location with geocoded data
                    location.latitude = city_info.latitude
                    location.longitude = city_info.longitude
                    location.geonames_id = str(city_info.geonames_id)
                    self.stats.locations_geocoded += 1

        return institutions
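
    # Deduplication strategy: each institution is keyed on its lowercased
    # name plus the country of its first location, so (to take a hypothetical
    # example) "Rijksmuseum"/"NL" mentioned in ten conversations collapses to
    # the single key "rijksmuseum:NL". When a duplicate arrives, the copy
    # with the higher extraction confidence wins.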

    def _add_institutions(
        self,
        institutions: List[HeritageCustodian]
    ) -> int:
        """
        Add institutions to collection with deduplication.

        Args:
            institutions: Institutions to add

        Returns:
            Number of new (non-duplicate) institutions added
        """
        new_count = 0

        for institution in institutions:
            # Create deduplication key (name + country)
            country = "UNKNOWN"
            if institution.locations and len(institution.locations) > 0:
                country = institution.locations[0].country or "UNKNOWN"

            dedup_key = f"{institution.name.lower()}:{country}"

            # Check if we've seen this institution before
            if dedup_key in self.seen_institutions:
                # Institution already exists - merge metadata if needed
                existing = self.seen_institutions[dedup_key]

                # Keep the one with higher confidence
                existing_conf = existing.provenance.confidence_score or 0.0
                new_conf = institution.provenance.confidence_score or 0.0

                if new_conf > existing_conf:
                    self.seen_institutions[dedup_key] = institution
                    # Replace the stored object in all_institutions as well.
                    # Matching by identity is safer than re-comparing name and
                    # country, which fails for institutions without locations.
                    for i, inst in enumerate(self.all_institutions):
                        if inst is existing:
                            self.all_institutions[i] = institution
                            break

                self.stats.institutions_deduplicated += 1
            else:
                # New institution
                self.seen_institutions[dedup_key] = institution
                self.all_institutions.append(institution)
                new_count += 1

        return new_count

    def process_all(
        self,
        limit: Optional[int] = None,
        country_filter: Optional[str] = None
    ) -> ExtractionStats:
        """
        Process all conversation files.

        Args:
            limit: Optional limit on number of files to process
            country_filter: Optional country code to filter conversations

        Returns:
            Extraction statistics
        """
        # Find conversation files
        files = self.find_conversation_files()

        # Apply country filter if specified
        if country_filter:
            country_files = [
                f for f in files
                if country_filter.lower() in f.name.lower()
            ]
            print(f"Filtering to {len(country_files)} files matching country: {country_filter}")
            files = country_files

        # Apply limit if specified
        if limit:
            files = files[:limit]

        print(f"Found {len(files)} conversation files to process")
        print("=" * 70)
        print()

        # Process each file
        for file_path in files:
            self.process_file(file_path)

        print()
        print("=" * 70)
        self.print_summary()

        return self.stats

    def print_summary(self):
        """Print extraction summary statistics"""
        print("Extraction Summary:")
        print(f"  Files processed: {self.stats.files_processed}")
        print(f"  Files skipped: {self.stats.files_skipped}")
        print(f"  Institutions extracted: {self.stats.institutions_extracted}")
        print(f"  Unique institutions: {len(self.all_institutions)}")
        print(f"  Duplicates removed: {self.stats.institutions_deduplicated}")

        if self.use_geocoding:
            print(f"  Locations geocoded: {self.stats.locations_geocoded}")

        if self.stats.errors:
            print(f"  Errors: {len(self.stats.errors)}")
            print()
            print("Error details:")
            for error in self.stats.errors[:10]:  # Show first 10 errors
                print(f"  - {error}")
            if len(self.stats.errors) > 10:
                print(f"  ... and {len(self.stats.errors) - 10} more errors")

        # Institution type distribution
        type_counts = defaultdict(int)
        for inst in self.all_institutions:
            type_counts[inst.institution_type] += 1

        print()
        print("Institution Types:")
        for inst_type, count in sorted(type_counts.items(), key=lambda x: -x[1]):
            print(f"  {inst_type}: {count}")

        # Country distribution
        country_counts = defaultdict(int)
        for inst in self.all_institutions:
            if inst.locations and len(inst.locations) > 0:
                country = inst.locations[0].country or "UNKNOWN"
                country_counts[country] += 1
            else:
                country_counts["UNKNOWN"] += 1

        print()
        print("Top 20 Countries:")
        for country, count in sorted(country_counts.items(), key=lambda x: -x[1])[:20]:
            print(f"  {country}: {count}")

    def apply_quality_filters(self) -> int:
        """
        Apply post-extraction quality filters to remove false positives.

        Filters:
        1. Name length >= 5 characters (rejects fragments like "M Museum")
        2. No markdown artifacts (**, __, ##, ``` indicate extraction errors)
        3. No blacklisted AI/technical prefixes (AI-powered, AI-processed, etc.)
        4. No generic/stopword names (museum, archive, libraries)
        5. No sentence fragments (starts with "of ", "and ", etc. + short)
        6. Confidence score >= 0.5 (rejects low-confidence extractions)
        7. Valid ISO 3166-1 alpha-2 country codes only
        8. Special "AI" country validation (requires valid ISIL, filters false positives)

        Returns:
            Number of institutions filtered out
        """
        # Valid ISO 3166-1 alpha-2 country codes
        VALID_COUNTRY_CODES = {
            'AD', 'AE', 'AF', 'AG', 'AI', 'AL', 'AM', 'AO', 'AQ', 'AR',
            'AS', 'AT', 'AU', 'AW', 'AX', 'AZ', 'BA', 'BB', 'BD', 'BE',
            'BF', 'BG', 'BH', 'BI', 'BJ', 'BL', 'BM', 'BN', 'BO', 'BQ',
            'BR', 'BS', 'BT', 'BV', 'BW', 'BY', 'BZ', 'CA', 'CC', 'CD',
            'CF', 'CG', 'CH', 'CI', 'CK', 'CL', 'CM', 'CN', 'CO', 'CR',
            'CU', 'CV', 'CW', 'CX', 'CY', 'CZ', 'DE', 'DJ', 'DK', 'DM',
            'DO', 'DZ', 'EC', 'EE', 'EG', 'EH', 'ER', 'ES', 'ET', 'FI',
            'FJ', 'FK', 'FM', 'FO', 'FR', 'GA', 'GB', 'GD', 'GE', 'GF',
            'GG', 'GH', 'GI', 'GL', 'GM', 'GN', 'GP', 'GQ', 'GR', 'GS',
            'GT', 'GU', 'GW', 'GY', 'HK', 'HM', 'HN', 'HR', 'HT', 'HU',
            'ID', 'IE', 'IL', 'IM', 'IN', 'IO', 'IQ', 'IR', 'IS', 'IT',
            'JE', 'JM', 'JO', 'JP', 'KE', 'KG', 'KH', 'KI', 'KM', 'KN',
            'KP', 'KR', 'KW', 'KY', 'KZ', 'LA', 'LB', 'LC', 'LI', 'LK',
            'LR', 'LS', 'LT', 'LU', 'LV', 'LY', 'MA', 'MC', 'MD', 'ME',
            'MF', 'MG', 'MH', 'MK', 'ML', 'MM', 'MN', 'MO', 'MP', 'MQ',
            'MR', 'MS', 'MT', 'MU', 'MV', 'MW', 'MX', 'MY', 'MZ', 'NA',
            'NC', 'NE', 'NF', 'NG', 'NI', 'NL', 'NO', 'NP', 'NR', 'NU',
            'NZ', 'OM', 'PA', 'PE', 'PF', 'PG', 'PH', 'PK', 'PL', 'PM',
            'PN', 'PR', 'PS', 'PT', 'PW', 'PY', 'QA', 'RE', 'RO', 'RS',
            'RU', 'RW', 'SA', 'SB', 'SC', 'SD', 'SE', 'SG', 'SH', 'SI',
            'SJ', 'SK', 'SL', 'SM', 'SN', 'SO', 'SR', 'SS', 'ST', 'SV',
            'SX', 'SY', 'SZ', 'TC', 'TD', 'TF', 'TG', 'TH', 'TJ', 'TK',
            'TL', 'TM', 'TN', 'TO', 'TR', 'TT', 'TV', 'TW', 'TZ', 'UA',
            'UG', 'UM', 'US', 'UY', 'UZ', 'VA', 'VC', 'VE', 'VG', 'VI',
            'VN', 'VU', 'WF', 'WS', 'YE', 'YT', 'ZA', 'ZM', 'ZW'
        }

        # Stopwords/generic names that indicate false positives
        GENERIC_NAMES = {
            'museum', 'archive', 'archives', 'library', 'bibliotheek',
            'which museum', 'many museum', 'some archive', 'the museum',
            'phd thesis', 'museum pass', 'm museum', 'and archive',
            'of archive', 'vision for', 'collaborations', 'partnerships',
            'libraries, archives', 'access archives',
            # NEW: Dutch-specific generic patterns (Nov 2025)
            'dutch museum', 'dutch archive', 'dutch library',
            'dutch national archive', 'resistance museum',
            'for museum', 'for archives', 'for library',
            'latest museum', 'core museum', 'major museum',
            'university museum',  # Too generic without specific name
        }

        # ISIL blacklist terms that indicate AI/technical false positives
        ISIL_BLACKLIST_PREFIXES = {
            'ai-powered', 'ai-processed', 'ai-driven', 'ai-based',
            'ai-enhanced', 'ai-assisted', 'ai-generated',
            'ai-gedreven', 'ai-aangedreven', 'ai-ondersteund'
        }

        # Markdown/formatting artifacts
        MARKDOWN_ARTIFACTS = ['**', '__', '##', '```']

        # NEW: Fragment patterns indicating incomplete extraction (Nov 2025)
        FRAGMENT_PATTERNS = [
            r'^(for|of|and|the|a|an)\s',             # Starts with preposition/article
            r':\s*$',                                # Ends with colon (list item)
            r'^\(',                                  # Starts with parenthesis
            r',\s+(archive|museum|library)$',        # Ends with ", Archive" (list separator)
            r'^archivees\s',                         # Common typo pattern
            r'^library,\s',                          # "Library, Archive" fragments
            r'^galleries,?\s+(libraries|archives)',  # GLAM acronym expansion fragments
            r'^corporate\s+(archives?|museum)$',     # Generic category names
            r'^religious\s+(archives?|museum)$',
            r'^family\s+archives?$',
            r'^general\s+pattern:',                  # Meta-discussion text
            r'museum\s+connections?:',               # Discussion metadata
        ]

        # NEW: Dutch-specific validation patterns (Nov 2025)
        DUTCH_GENERIC_PATTERNS = [
            r'^dutch\s+(museum|archive|library)',
            r'^for\s+(museum|archive|library|archives?)$',
            r'^(museum|archive|library)\s+amsterdam$',  # Too generic
            r'^major\s+.*(museum|archive|library)',
            r'^latest\s+(museum|archive)',
            r'^core\s+(museum|archive)',
            r'^\w+\s+museum\s+connections?:',           # "Dutch Museum connections:"
        ]

        # NEW: Known wrong-country institutions (Nov 2025)
        WRONG_COUNTRY_INSTITUTIONS = {
            'library of congress',               # US institution
            'linnaeus university',               # Swedish institution
            'hmml, library',                     # Unclear affiliation
            'smithsonian',                       # US institution
            'british library',                   # UK institution
            'bibliothèque nationale de france',  # France
        }
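
        # Illustrative (hypothetical) rejections: "**Rijksmuseum**" trips the
        # markdown-artifact filter, "AI-powered Archive" the ISIL blacklist,
        # "for museums" the fragment patterns, and a country recorded as
        # "Netherlands" rather than "NL" the country-code filter below.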

        before_count = len(self.all_institutions)
        filtered = []
        removed_reasons = defaultdict(int)

        for inst in self.all_institutions:
            # Filter 1: Name length
            if len(inst.name) < 5:
                removed_reasons['name_too_short'] += 1
                continue

            # Filter 2: Markdown artifacts (indicates extraction error)
            if any(artifact in inst.name for artifact in MARKDOWN_ARTIFACTS):
                removed_reasons['markdown_artifact'] += 1
                continue

            # Filter 3: Blacklisted ISIL prefix (AI-powered, etc.)
            name_lower = inst.name.lower()
            if any(name_lower.startswith(prefix) for prefix in ISIL_BLACKLIST_PREFIXES):
                removed_reasons['blacklisted_ai_prefix'] += 1
                continue

            # Filter 4: Generic/stopword names
            if name_lower in GENERIC_NAMES:
                removed_reasons['generic_name'] += 1
                continue

            # Check if name is mostly a stopword
            if any(generic in name_lower and len(generic) / len(name_lower) > 0.7
                   for generic in GENERIC_NAMES):
                removed_reasons['mostly_generic'] += 1
                continue

            # Filter 5: Enhanced fragment patterns (Nov 2025)
            is_fragment = False
            for pattern in FRAGMENT_PATTERNS:
                if re.search(pattern, name_lower):
                    removed_reasons['sentence_fragment'] += 1
                    is_fragment = True
                    break
            if is_fragment:
                continue

            # Filter 5b: Dutch-specific generic patterns (Nov 2025)
            if inst.locations and inst.locations[0].country == 'NL':
                is_dutch_generic = False
                for pattern in DUTCH_GENERIC_PATTERNS:
                    if re.search(pattern, name_lower):
                        removed_reasons['dutch_generic_pattern'] += 1
                        is_dutch_generic = True
                        break
                if is_dutch_generic:
                    continue

                # Filter 5c: NL institutions MUST have city (Nov 2025)
                if not inst.locations[0].city:
                    removed_reasons['nl_missing_city'] += 1
                    continue

                # Filter 5d: Reject single-word names for NL (Nov 2025)
                if len(inst.name.split()) < 2:
                    removed_reasons['nl_single_word_name'] += 1
                    continue

            # Filter 5e: Known wrong-country institutions (Nov 2025)
            if inst.locations and inst.locations[0].country:
                country = inst.locations[0].country
                if name_lower in WRONG_COUNTRY_INSTITUTIONS and country == 'NL':
                    removed_reasons['wrong_country_misclassified'] += 1
                    continue

            # Filter 6: Confidence score
            confidence = inst.provenance.confidence_score or 0.0
            if confidence < 0.5:
                removed_reasons['low_confidence'] += 1
                continue

            # Filter 7: Valid country codes
            if inst.locations and len(inst.locations) > 0:
                country = inst.locations[0].country

                # Reject if country is full name instead of code
                if country and len(country) > 2:
                    removed_reasons['country_name_not_code'] += 1
                    continue

                # Special case: "AI" country with no valid ISIL suggests false positive
                # Anguilla (AI) institutions should have proper ISIL codes
                if country == 'AI':
                    has_valid_isil = any(
                        ident.identifier_scheme == 'ISIL'
                        and ident.identifier_value.startswith('AI-')
                        and not any(ident.identifier_value.lower().startswith(prefix)
                                    for prefix in ISIL_BLACKLIST_PREFIXES)
                        for ident in (inst.identifiers or [])
                    )
                    if not has_valid_isil:
                        removed_reasons['ai_country_no_valid_isil'] += 1
                        continue

                # Reject if invalid country code (except UNKNOWN/None which we keep)
                if country and country != 'UNKNOWN' and country not in VALID_COUNTRY_CODES:
                    removed_reasons['invalid_country_code'] += 1
                    continue

            # Passed all filters
            filtered.append(inst)

        # Update institutions list
        self.all_institutions = filtered
        removed_count = before_count - len(filtered)

        # Print filter summary
        if removed_count > 0:
            print()
            print("=" * 70)
            print("Quality Filter Summary:")
            print(f"  Before filtering: {before_count} institutions")
            print(f"  After filtering: {len(filtered)} institutions")
            print(f"  Removed: {removed_count} institutions "
                  f"({removed_count / before_count * 100:.1f}%)")
            print()
            print("Removal reasons:")
            for reason, count in sorted(removed_reasons.items(), key=lambda x: -x[1]):
                print(f"  {reason}: {count}")
            print("=" * 70)

        return removed_count

    def export_json(self, output_path: Optional[Path] = None):
        """
        Export institutions to JSON file.

        Args:
            output_path: Output file path (defaults to output_dir/institutions.json)
        """
        if output_path is None:
            output_path = self.output_dir / "institutions.json"

        print()
        print(f"Exporting to JSON: {output_path}")

        # Convert to dictionaries
        data = {
            "metadata": {
                "extraction_date": datetime.now(timezone.utc).isoformat(),
                "total_institutions": len(self.all_institutions),
                "files_processed": self.stats.files_processed,
                "data_tier": "TIER_4_INFERRED",
            },
            "institutions": [inst.dict() for inst in self.all_institutions]
        }

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False, default=str)

        print(f"  ✓ Exported {len(self.all_institutions)} institutions")

    def export_csv(self, output_path: Optional[Path] = None):
        """
        Export institutions to CSV file.

        Args:
            output_path: Output file path (defaults to output_dir/institutions.csv)
        """
        import csv

        if output_path is None:
            output_path = self.output_dir / "institutions.csv"

        print(f"Exporting to CSV: {output_path}")

        with open(output_path, 'w', encoding='utf-8', newline='') as f:
            writer = csv.writer(f)

            # Write header
            writer.writerow([
                "name",
                "institution_type",
                "status",
                "city",
                "country",
                "latitude",
                "longitude",
                "geonames_id",
                "identifiers",
                "confidence_score",
                "data_source",
            ])

            # Write rows
            for inst in self.all_institutions:
                # Extract location info
                city = ""
                country = ""
                lat = ""
                lon = ""
                geonames_id = ""

                if inst.locations and len(inst.locations) > 0:
                    loc = inst.locations[0]
                    city = loc.city or ""
                    country = loc.country or ""
                    lat = str(loc.latitude) if loc.latitude else ""
                    lon = str(loc.longitude) if loc.longitude else ""
                    geonames_id = loc.geonames_id or ""

                # Format identifiers (avoid shadowing the builtin `id`)
                identifiers = []
                if inst.identifiers:
                    identifiers = [
                        f"{ident.identifier_scheme}:{ident.identifier_value}"
                        for ident in inst.identifiers
                    ]
                identifiers_str = "; ".join(identifiers)

                writer.writerow([
                    inst.name,
                    inst.institution_type,
                    inst.organization_status,
                    city,
                    country,
                    lat,
                    lon,
                    geonames_id,
                    identifiers_str,
                    inst.provenance.confidence_score,
                    inst.provenance.data_source,
                ])

        print(f"  ✓ Exported {len(self.all_institutions)} institutions")
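
    # The module docstring promises a SQLite export alongside JSON and CSV,
    # but no such method existed. The sketch below is a minimal, hedged
    # implementation using only the stdlib sqlite3 module; the table name
    # and flattened column layout are assumptions mirroring the CSV columns
    # above, not an established project schema.
    def export_sqlite(self, output_path: Optional[Path] = None):
        """
        Export institutions to a SQLite database (minimal sketch).

        Args:
            output_path: Output file path (defaults to output_dir/institutions.db)
        """
        import sqlite3

        if output_path is None:
            output_path = self.output_dir / "institutions.db"

        print(f"Exporting to SQLite: {output_path}")

        conn = sqlite3.connect(str(output_path))
        try:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS institutions (
                    name TEXT,
                    institution_type TEXT,
                    city TEXT,
                    country TEXT,
                    latitude REAL,
                    longitude REAL,
                    confidence_score REAL
                )
                """
            )
            rows = []
            for inst in self.all_institutions:
                loc = inst.locations[0] if inst.locations else None
                rows.append((
                    inst.name,
                    # str() guards against enum-valued fields, which sqlite3
                    # cannot bind directly
                    str(inst.institution_type),
                    loc.city if loc else None,
                    loc.country if loc else None,
                    loc.latitude if loc else None,
                    loc.longitude if loc else None,
                    inst.provenance.confidence_score,
                ))
            conn.executemany(
                "INSERT INTO institutions VALUES (?, ?, ?, ?, ?, ?, ?)",
                rows
            )
            conn.commit()
        finally:
            conn.close()

        print(f"  ✓ Exported {len(self.all_institutions)} institutions")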


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(
        description="Batch extract institutions from conversation files"
    )
    parser.add_argument(
        "--conversation-dir",
        type=Path,
        default=project_root,
        help="Directory containing conversation JSON files"
    )
    parser.add_argument(
        "--limit",
        type=int,
        help="Limit number of files to process"
    )
    parser.add_argument(
        "--country",
        type=str,
        help="Filter to conversations about a specific country"
    )
    parser.add_argument(
        "--no-geocoding",
        action="store_true",
        help="Disable GeoNames geocoding enrichment"
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=Path("output"),
        help="Output directory for results"
    )

    args = parser.parse_args()

    # Initialize batch extractor
    batch_extractor = BatchInstitutionExtractor(
        conversation_dir=args.conversation_dir,
        output_dir=args.output_dir,
        use_geocoding=not args.no_geocoding
    )

    # Process all files
    stats = batch_extractor.process_all(
        limit=args.limit,
        country_filter=args.country
    )

    # Export results
    if batch_extractor.all_institutions:
        # Apply quality filters before export
        batch_extractor.apply_quality_filters()

        batch_extractor.export_json()
        batch_extractor.export_csv()

        print()
        print("=" * 70)
        print("✓ Batch extraction complete!")
        print(f"  Results saved to: {args.output_dir}")
    else:
        print()
        print("⚠ No institutions extracted")


if __name__ == "__main__":
    main()