glam/scripts/batch_extract_institutions.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

789 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
Batch process all conversation JSON files to extract heritage institutions.
This script:
1. Scans all conversation JSON files in the project root
2. Parses each conversation
3. Extracts institutions using the NLP extractor
4. Enriches location data with GeoNames geocoding
5. Deduplicates institutions across conversations
6. Exports to multiple formats (JSON-LD, CSV, SQLite)
Usage:
python scripts/batch_extract_institutions.py
python scripts/batch_extract_institutions.py --limit 10 # Process first 10 files only
python scripts/batch_extract_institutions.py --country BR # Filter by country
"""
import argparse
import json
import re
import sys
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Set
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root / "src"))
from glam_extractor.parsers.conversation import ConversationParser
from glam_extractor.extractors.nlp_extractor import InstitutionExtractor
from glam_extractor.geocoding.geonames_lookup import GeoNamesDB
from glam_extractor.models import HeritageCustodian, Location
@dataclass
class ExtractionStats:
    """Aggregated counters and error messages for one batch-extraction run."""

    files_processed: int = 0            # files parsed and extracted successfully
    files_skipped: int = 0              # files skipped (no text, parse error, exception)
    institutions_extracted: int = 0     # raw extraction count before deduplication
    institutions_deduplicated: int = 0  # duplicates merged into existing entries
    locations_geocoded: int = 0         # locations enriched with GeoNames coordinates
    # default_factory gives each instance its own list; the previous
    # Optional[List]=None + __post_init__ pattern forced a dead "is not None"
    # guard in add_error.
    errors: List[str] = field(default_factory=list)

    def __post_init__(self):
        # Backward compatibility: callers that explicitly pass errors=None
        # still get an empty list.
        if self.errors is None:
            self.errors = []

    def add_error(self, error: str) -> None:
        """Record one error message for the final summary."""
        self.errors.append(error)
class BatchInstitutionExtractor:
    """
    Batch processor for extracting institutions from conversation files.

    Orchestrates the pipeline: find conversation JSON files, parse each one,
    extract institutions via NLP, optionally enrich locations with GeoNames
    coordinates, deduplicate across conversations, and export the results.
    """
    def __init__(
        self,
        conversation_dir: Path,
        output_dir: Path,
        use_geocoding: bool = True
    ):
        """
        Initialize batch extractor.

        Args:
            conversation_dir: Directory containing conversation JSON files
            output_dir: Directory for output files (created if missing)
            use_geocoding: Whether to enrich locations with GeoNames data
        """
        self.conversation_dir = conversation_dir
        self.output_dir = output_dir
        self.use_geocoding = use_geocoding
        # Pipeline components; GeoNames DB is only opened when geocoding
        # enrichment is requested.
        self.conversation_parser = ConversationParser()
        self.extractor = InstitutionExtractor()
        self.geonames_db = GeoNamesDB() if use_geocoding else None
        # Running counters and the accumulated (deduplicated) collection.
        self.stats = ExtractionStats()
        self.all_institutions: List[HeritageCustodian] = []
        # Deduplication tracking: index keyed by "name:country" (lowercased).
        self.seen_institutions: Dict[str, HeritageCustodian] = {}
        # Ensure output directory exists
        self.output_dir.mkdir(parents=True, exist_ok=True)
def find_conversation_files(
self,
pattern: str = "*.json",
exclude_patterns: Optional[List[str]] = None
) -> List[Path]:
"""
Find all conversation JSON files.
Args:
pattern: Glob pattern for finding files
exclude_patterns: List of filename patterns to exclude
Returns:
List of conversation file paths
"""
if exclude_patterns is None:
exclude_patterns = [
"package.json",
"tsconfig.json",
"schema.json",
]
all_files = list(self.conversation_dir.glob(pattern))
# Filter out excluded files
conversation_files = [
f for f in all_files
if not any(excl in f.name for excl in exclude_patterns)
]
return sorted(conversation_files)
    def process_file(self, file_path: Path) -> int:
        """
        Process a single conversation file.

        Parses the conversation, extracts institutions from its full text,
        optionally enriches locations with GeoNames coordinates, and merges
        results into the deduplicated collection. Updates ``self.stats``
        and prints per-file progress as a side effect.

        Args:
            file_path: Path to conversation JSON file

        Returns:
            Number of NEW (non-duplicate) institutions added; 0 when the
            file is skipped, empty, or an error occurs.
        """
        try:
            print(f"Processing: {file_path.name}")
            # Parse conversation
            conversation = self.conversation_parser.parse_file(file_path)
            # Extract all text from conversation
            full_text = conversation.extract_all_text()
            if not full_text.strip():
                # Nothing to analyse: counted as skipped, not as an error.
                print(f" ⚠ No text content found")
                self.stats.files_skipped += 1
                return 0
            # Extract institutions
            extract_result = self.extractor.extract_from_text(
                full_text,
                conversation_id=conversation.uuid
            )
            if not extract_result.success:
                # Extraction failure: record the message and skip the file.
                self.stats.add_error(
                    f"{file_path.name}: Extraction error - {extract_result.error}"
                )
                self.stats.files_skipped += 1
                return 0
            institutions = extract_result.value
            if not institutions:
                # A readable file with no institutions still counts as processed.
                print(f" No institutions found")
                self.stats.files_processed += 1
                return 0
            # Enrich with geocoding
            if self.use_geocoding and self.geonames_db:
                institutions = self._enrich_with_geocoding(institutions)
            # Deduplicate and add to collection
            new_count = self._add_institutions(institutions)
            print(f" ✓ Extracted {len(institutions)} institutions ({new_count} new)")
            self.stats.files_processed += 1
            self.stats.institutions_extracted += len(institutions)
            return new_count
        except Exception as e:
            # Broad catch is deliberate: one bad file must not abort the batch.
            error_msg = f"{file_path.name}: Unexpected error - {str(e)}"
            self.stats.add_error(error_msg)
            print(f" ✗ Error: {e}")
            self.stats.files_skipped += 1
            return 0
def _enrich_with_geocoding(
self,
institutions: List[HeritageCustodian]
) -> List[HeritageCustodian]:
"""
Enrich institution locations with GeoNames data.
Args:
institutions: List of institutions to enrich
Returns:
Enriched institutions
"""
for institution in institutions:
if not institution.locations:
continue
for location in institution.locations:
# Skip if already has coordinates
if location.latitude and location.longitude:
continue
# Skip if missing city or country
if not location.city or not location.country:
continue
# Lookup in GeoNames
if self.geonames_db is None:
continue
city_info = self.geonames_db.lookup_city(
location.city,
location.country
)
if city_info:
# Enrich location with geocoded data
location.latitude = city_info.latitude
location.longitude = city_info.longitude
location.geonames_id = str(city_info.geonames_id)
self.stats.locations_geocoded += 1
return institutions
def _add_institutions(
self,
institutions: List[HeritageCustodian]
) -> int:
"""
Add institutions to collection with deduplication.
Args:
institutions: Institutions to add
Returns:
Number of new (non-duplicate) institutions added
"""
new_count = 0
for institution in institutions:
# Create deduplication key (name + country)
country = "UNKNOWN"
if institution.locations and len(institution.locations) > 0:
country = institution.locations[0].country or "UNKNOWN"
dedup_key = f"{institution.name.lower()}:{country}"
# Check if we've seen this institution before
if dedup_key in self.seen_institutions:
# Institution already exists - merge metadata if needed
existing = self.seen_institutions[dedup_key]
# Keep the one with higher confidence
existing_conf = existing.provenance.confidence_score or 0.0
new_conf = institution.provenance.confidence_score or 0.0
if new_conf > existing_conf:
self.seen_institutions[dedup_key] = institution
# Replace in all_institutions list
for i, inst in enumerate(self.all_institutions):
if (inst.name.lower() == institution.name.lower() and
inst.locations and institution.locations and
inst.locations[0].country == institution.locations[0].country):
self.all_institutions[i] = institution
break
self.stats.institutions_deduplicated += 1
else:
# New institution
self.seen_institutions[dedup_key] = institution
self.all_institutions.append(institution)
new_count += 1
return new_count
def process_all(
self,
limit: Optional[int] = None,
country_filter: Optional[str] = None
) -> ExtractionStats:
"""
Process all conversation files.
Args:
limit: Optional limit on number of files to process
country_filter: Optional country code to filter conversations
Returns:
Extraction statistics
"""
# Find conversation files
files = self.find_conversation_files()
# Apply country filter if specified
if country_filter:
country_files = [
f for f in files
if country_filter.lower() in f.name.lower()
]
print(f"Filtering to {len(country_files)} files matching country: {country_filter}")
files = country_files
# Apply limit if specified
if limit:
files = files[:limit]
print(f"Found {len(files)} conversation files to process")
print("=" * 70)
print()
# Process each file
for file_path in files:
self.process_file(file_path)
print()
print("=" * 70)
self.print_summary()
return self.stats
def print_summary(self):
"""Print extraction summary statistics"""
print("Extraction Summary:")
print(f" Files processed: {self.stats.files_processed}")
print(f" Files skipped: {self.stats.files_skipped}")
print(f" Institutions extracted: {self.stats.institutions_extracted}")
print(f" Unique institutions: {len(self.all_institutions)}")
print(f" Duplicates removed: {self.stats.institutions_deduplicated}")
if self.use_geocoding:
print(f" Locations geocoded: {self.stats.locations_geocoded}")
if self.stats.errors:
print(f" Errors: {len(self.stats.errors)}")
print()
print("Error details:")
for error in self.stats.errors[:10]: # Show first 10 errors
print(f" - {error}")
if len(self.stats.errors) > 10:
print(f" ... and {len(self.stats.errors) - 10} more errors")
# Institution type distribution
type_counts = defaultdict(int)
for inst in self.all_institutions:
type_counts[inst.institution_type] += 1
print()
print("Institution Types:")
for inst_type, count in sorted(type_counts.items(), key=lambda x: -x[1]):
print(f" {inst_type}: {count}")
# Country distribution
country_counts = defaultdict(int)
for inst in self.all_institutions:
if inst.locations and len(inst.locations) > 0:
country = inst.locations[0].country or "UNKNOWN"
country_counts[country] += 1
else:
country_counts["UNKNOWN"] += 1
print()
print("Top 20 Countries:")
for country, count in sorted(country_counts.items(), key=lambda x: -x[1])[:20]:
print(f" {country}: {count}")
def apply_quality_filters(self) -> int:
"""
Apply post-extraction quality filters to remove false positives.
Filters:
1. Name length >= 5 characters (rejects fragments like "M Museum")
2. No markdown artifacts (**, __, ##, ``` indicate extraction errors)
3. No blacklisted AI/technical prefixes (AI-powered, AI-processed, etc.)
4. No generic/stopword names (museum, archive, libraries)
5. No sentence fragments (starts with "of ", "and ", etc. + short)
6. Confidence score >= 0.5 (rejects low-confidence extractions)
7. Valid ISO 3166-1 alpha-2 country codes only
8. Special "AI" country validation (requires valid ISIL, filters false positives)
Returns:
Number of institutions filtered out
"""
# Valid ISO 3166-1 alpha-2 country codes
VALID_COUNTRY_CODES = {
'AD', 'AE', 'AF', 'AG', 'AI', 'AL', 'AM', 'AO', 'AQ', 'AR', 'AS', 'AT',
'AU', 'AW', 'AX', 'AZ', 'BA', 'BB', 'BD', 'BE', 'BF', 'BG', 'BH', 'BI',
'BJ', 'BL', 'BM', 'BN', 'BO', 'BQ', 'BR', 'BS', 'BT', 'BV', 'BW', 'BY',
'BZ', 'CA', 'CC', 'CD', 'CF', 'CG', 'CH', 'CI', 'CK', 'CL', 'CM', 'CN',
'CO', 'CR', 'CU', 'CV', 'CW', 'CX', 'CY', 'CZ', 'DE', 'DJ', 'DK', 'DM',
'DO', 'DZ', 'EC', 'EE', 'EG', 'EH', 'ER', 'ES', 'ET', 'FI', 'FJ', 'FK',
'FM', 'FO', 'FR', 'GA', 'GB', 'GD', 'GE', 'GF', 'GG', 'GH', 'GI', 'GL',
'GM', 'GN', 'GP', 'GQ', 'GR', 'GS', 'GT', 'GU', 'GW', 'GY', 'HK', 'HM',
'HN', 'HR', 'HT', 'HU', 'ID', 'IE', 'IL', 'IM', 'IN', 'IO', 'IQ', 'IR',
'IS', 'IT', 'JE', 'JM', 'JO', 'JP', 'KE', 'KG', 'KH', 'KI', 'KM', 'KN',
'KP', 'KR', 'KW', 'KY', 'KZ', 'LA', 'LB', 'LC', 'LI', 'LK', 'LR', 'LS',
'LT', 'LU', 'LV', 'LY', 'MA', 'MC', 'MD', 'ME', 'MF', 'MG', 'MH', 'MK',
'ML', 'MM', 'MN', 'MO', 'MP', 'MQ', 'MR', 'MS', 'MT', 'MU', 'MV', 'MW',
'MX', 'MY', 'MZ', 'NA', 'NC', 'NE', 'NF', 'NG', 'NI', 'NL', 'NO', 'NP',
'NR', 'NU', 'NZ', 'OM', 'PA', 'PE', 'PF', 'PG', 'PH', 'PK', 'PL', 'PM',
'PN', 'PR', 'PS', 'PT', 'PW', 'PY', 'QA', 'RE', 'RO', 'RS', 'RU', 'RW',
'SA', 'SB', 'SC', 'SD', 'SE', 'SG', 'SH', 'SI', 'SJ', 'SK', 'SL', 'SM',
'SN', 'SO', 'SR', 'SS', 'ST', 'SV', 'SX', 'SY', 'SZ', 'TC', 'TD', 'TF',
'TG', 'TH', 'TJ', 'TK', 'TL', 'TM', 'TN', 'TO', 'TR', 'TT', 'TV', 'TW',
'TZ', 'UA', 'UG', 'UM', 'US', 'UY', 'UZ', 'VA', 'VC', 'VE', 'VG', 'VI',
'VN', 'VU', 'WF', 'WS', 'YE', 'YT', 'ZA', 'ZM', 'ZW'
}
# Stopwords/generic names that indicate false positives
GENERIC_NAMES = {
'museum', 'archive', 'archives', 'library', 'bibliotheek',
'which museum', 'many museum', 'some archive', 'the museum',
'phd thesis', 'museum pass', 'm museum', 'and archive',
'of archive', 'vision for', 'collaborations', 'partnerships',
'libraries, archives', 'access archives',
# NEW: Dutch-specific generic patterns (Nov 2025)
'dutch museum', 'dutch archive', 'dutch library',
'dutch national archive', 'resistance museum',
'for museum', 'for archives', 'for library',
'latest museum', 'core museum', 'major museum',
'university museum', # Too generic without specific name
}
# ISIL blacklist terms that indicate AI/technical false positives
ISIL_BLACKLIST_PREFIXES = {
'ai-powered', 'ai-processed', 'ai-driven', 'ai-based',
'ai-enhanced', 'ai-assisted', 'ai-generated',
'ai-gedreven', 'ai-aangedreven', 'ai-ondersteund'
}
# Markdown/formatting artifacts
MARKDOWN_ARTIFACTS = ['**', '__', '##', '```']
# NEW: Fragment patterns indicating incomplete extraction (Nov 2025)
FRAGMENT_PATTERNS = [
r'^(for|of|and|the|a|an)\s', # Starts with preposition/article
r':\s*$', # Ends with colon (list item)
r'^\(', # Starts with parenthesis
r',\s+(archive|museum|library)$', # Ends with ", Archive" (list separator)
r'^archivees\s', # Common typo pattern
r'^library,\s', # "Library, Archive" fragments
r'^galleries,?\s+(libraries|archives)', # GLAM acronym expansion fragments
r'^corporate\s+(archives?|museum)$', # Generic category names
r'^religious\s+(archives?|museum)$',
r'^family\s+archives?$',
r'^general\s+pattern:', # Meta-discussion text
r'museum\s+connections?:', # Discussion metadata
]
# NEW: Dutch-specific validation patterns (Nov 2025)
DUTCH_GENERIC_PATTERNS = [
r'^dutch\s+(museum|archive|library)',
r'^for\s+(museum|archive|library|archives?)$',
r'^(museum|archive|library)\s+amsterdam$', # Too generic
r'^major\s+.*(museum|archive|library)',
r'^latest\s+(museum|archive)',
r'^core\s+(museum|archive)',
r'^\w+\s+museum\s+connections?:', # "Dutch Museum connections:"
]
# NEW: Known wrong-country institutions (Nov 2025)
WRONG_COUNTRY_INSTITUTIONS = {
'library of congress', # US institution
'linnaeus university', # Swedish institution
'hmml, library', # Unclear affiliation
'smithsonian', # US institution
'british library', # UK institution
'bibliothèque nationale de france', # France
}
# NEW: Known wrong-country institutions (Nov 2025)
WRONG_COUNTRY_INSTITUTIONS = {
'library of congress', # US institution
'linnaeus university', # Swedish institution
'hmml, library', # Unclear affiliation
'smithsonian', # US institution
'british library', # UK institution
'bibliothèque nationale de france', # France
}
before_count = len(self.all_institutions)
filtered = []
removed_reasons = defaultdict(int)
for inst in self.all_institutions:
# Filter 1: Name length
if len(inst.name) < 5:
removed_reasons['name_too_short'] += 1
continue
# Filter 2: Markdown artifacts (indicates extraction error)
if any(artifact in inst.name for artifact in MARKDOWN_ARTIFACTS):
removed_reasons['markdown_artifact'] += 1
continue
# Filter 3: Blacklisted ISIL prefix (AI-powered, etc.)
name_lower = inst.name.lower()
if any(name_lower.startswith(prefix) for prefix in ISIL_BLACKLIST_PREFIXES):
removed_reasons['blacklisted_ai_prefix'] += 1
continue
# Filter 4: Generic/stopword names
if name_lower in GENERIC_NAMES:
removed_reasons['generic_name'] += 1
continue
# Check if name is mostly a stopword
if any(generic in name_lower and len(generic) / len(name_lower) > 0.7
for generic in GENERIC_NAMES):
removed_reasons['mostly_generic'] += 1
continue
# Filter 5: Enhanced fragment patterns (Nov 2025)
is_fragment = False
for pattern in FRAGMENT_PATTERNS:
if re.search(pattern, name_lower):
removed_reasons['sentence_fragment'] += 1
is_fragment = True
break
if is_fragment:
continue
# Filter 5b: Dutch-specific generic patterns (Nov 2025)
if inst.locations and inst.locations[0].country == 'NL':
is_dutch_generic = False
for pattern in DUTCH_GENERIC_PATTERNS:
if re.search(pattern, name_lower):
removed_reasons['dutch_generic_pattern'] += 1
is_dutch_generic = True
break
if is_dutch_generic:
continue
# Filter 5c: NL institutions MUST have city (Nov 2025)
if not inst.locations[0].city:
removed_reasons['nl_missing_city'] += 1
continue
# Filter 5d: Reject single-word names for NL (Nov 2025)
if len(inst.name.split()) < 2:
removed_reasons['nl_single_word_name'] += 1
continue
# Filter 5e: Known wrong-country institutions (Nov 2025)
if inst.locations and inst.locations[0].country:
country = inst.locations[0].country
if name_lower in WRONG_COUNTRY_INSTITUTIONS and country == 'NL':
removed_reasons['wrong_country_misclassified'] += 1
continue
# Filter 6: Confidence score
confidence = inst.provenance.confidence_score or 0.0
if confidence < 0.5:
removed_reasons['low_confidence'] += 1
continue
# Filter 7: Valid country codes
if inst.locations and len(inst.locations) > 0:
country = inst.locations[0].country
# Reject if country is full name instead of code
if country and len(country) > 2:
removed_reasons['country_name_not_code'] += 1
continue
# Special case: "AI" country with no valid ISIL suggests false positive
# Anguilla (AI) institutions should have proper ISIL codes
if country == 'AI':
has_valid_isil = any(
ident.identifier_scheme == 'ISIL'
and ident.identifier_value.startswith('AI-')
and not any(ident.identifier_value.lower().startswith(prefix)
for prefix in ISIL_BLACKLIST_PREFIXES)
for ident in (inst.identifiers or [])
)
if not has_valid_isil:
removed_reasons['ai_country_no_valid_isil'] += 1
continue
# Reject if invalid country code (except UNKNOWN/None which we keep)
if country and country != 'UNKNOWN' and country not in VALID_COUNTRY_CODES:
removed_reasons['invalid_country_code'] += 1
continue
# Passed all filters
filtered.append(inst)
# Update institutions list
self.all_institutions = filtered
removed_count = before_count - len(filtered)
# Print filter summary
if removed_count > 0:
print()
print("=" * 70)
print("Quality Filter Summary:")
print(f" Before filtering: {before_count} institutions")
print(f" After filtering: {len(filtered)} institutions")
print(f" Removed: {removed_count} institutions ({removed_count/before_count*100:.1f}%)")
print()
print("Removal reasons:")
for reason, count in sorted(removed_reasons.items(), key=lambda x: -x[1]):
print(f" {reason}: {count}")
print("=" * 70)
return removed_count
def export_json(self, output_path: Optional[Path] = None):
"""
Export institutions to JSON file.
Args:
output_path: Output file path (defaults to output_dir/institutions.json)
"""
if output_path is None:
output_path = self.output_dir / "institutions.json"
print()
print(f"Exporting to JSON: {output_path}")
# Convert to dictionaries
data = {
"metadata": {
"extraction_date": datetime.now(timezone.utc).isoformat(),
"total_institutions": len(self.all_institutions),
"files_processed": self.stats.files_processed,
"data_tier": "TIER_4_INFERRED",
},
"institutions": [inst.dict() for inst in self.all_institutions]
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False, default=str)
print(f" ✓ Exported {len(self.all_institutions)} institutions")
def export_csv(self, output_path: Optional[Path] = None):
"""
Export institutions to CSV file.
Args:
output_path: Output file path (defaults to output_dir/institutions.csv)
"""
import csv
if output_path is None:
output_path = self.output_dir / "institutions.csv"
print(f"Exporting to CSV: {output_path}")
with open(output_path, 'w', encoding='utf-8', newline='') as f:
writer = csv.writer(f)
# Write header
writer.writerow([
"name",
"institution_type",
"status",
"city",
"country",
"latitude",
"longitude",
"geonames_id",
"identifiers",
"confidence_score",
"data_source",
])
# Write rows
for inst in self.all_institutions:
# Extract location info
city = ""
country = ""
lat = ""
lon = ""
geonames_id = ""
if inst.locations and len(inst.locations) > 0:
loc = inst.locations[0]
city = loc.city or ""
country = loc.country or ""
lat = str(loc.latitude) if loc.latitude else ""
lon = str(loc.longitude) if loc.longitude else ""
geonames_id = loc.geonames_id or ""
# Format identifiers
identifiers = []
if inst.identifiers:
identifiers = [
f"{id.identifier_scheme}:{id.identifier_value}"
for id in inst.identifiers
]
identifiers_str = "; ".join(identifiers)
writer.writerow([
inst.name,
inst.institution_type,
inst.organization_status,
city,
country,
lat,
lon,
geonames_id,
identifiers_str,
inst.provenance.confidence_score,
inst.provenance.data_source,
])
print(f" ✓ Exported {len(self.all_institutions)} institutions")
def main():
    """Command-line entry point.

    Parses CLI arguments, runs the batch extraction over all conversation
    files, applies quality filters, and exports the surviving institutions
    to JSON and CSV in the output directory.
    """
    parser = argparse.ArgumentParser(
        description="Batch extract institutions from conversation files"
    )
    parser.add_argument(
        "--conversation-dir",
        type=Path,
        default=project_root,
        help="Directory containing conversation JSON files"
    )
    parser.add_argument(
        "--limit",
        type=int,
        help="Limit number of files to process"
    )
    parser.add_argument(
        "--country",
        type=str,
        help="Filter to conversations about a specific country"
    )
    parser.add_argument(
        "--no-geocoding",
        action="store_true",
        help="Disable GeoNames geocoding enrichment"
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=Path("output"),
        help="Output directory for results"
    )
    args = parser.parse_args()
    # Initialize batch extractor
    batch_extractor = BatchInstitutionExtractor(
        conversation_dir=args.conversation_dir,
        output_dir=args.output_dir,
        use_geocoding=not args.no_geocoding
    )
    # Process all files
    stats = batch_extractor.process_all(
        limit=args.limit,
        country_filter=args.country
    )
    # Export results (only when something was extracted).
    if batch_extractor.all_institutions:
        # Apply quality filters before export
        batch_extractor.apply_quality_filters()
        batch_extractor.export_json()
        batch_extractor.export_csv()
        print()
        print("=" * 70)
        print("✓ Batch extraction complete!")
        print(f" Results saved to: {args.output_dir}")
    else:
        print()
        print("⚠ No institutions extracted")


if __name__ == "__main__":
    main()