- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
399 lines
13 KiB
Python
Executable file
399 lines
13 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Validate Dutch Institution Extraction Quality
|
|
|
|
Cross-links extracted NL institutions with authoritative ISIL registry to measure:
|
|
- Precision: % of extracted institutions that are real
|
|
- Recall: % of known institutions that were extracted
|
|
- F1 Score: Harmonic mean of precision and recall
|
|
|
|
Outputs detailed analysis to help identify false positives and false negatives.
|
|
"""
|
|
|
|
import csv
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, List, Set, Tuple
|
|
from dataclasses import dataclass
|
|
from rapidfuzz import fuzz
|
|
import re
|
|
|
|
@dataclass
class Institution:
    """One heritage institution record, from either data source.

    Both the extraction output and the ISIL registry are loaded into this
    shape so the two sets can be compared field by field.
    """

    # Institution name as given by the source.
    name: str
    # City / place name as given by the source.
    city: str
    # ISIL identifier; an empty string means no code is known.
    isil_code: str
    # Origin of the record: 'extracted' or 'registry'.
    source: str
    # The source row this record was built from.
    raw_record: Dict

    def __str__(self):
        """Render as ``name (city) [ISIL]``, using ``no ISIL`` when the code is empty."""
        code_label = self.isil_code if self.isil_code else 'no ISIL'
        return f"{self.name} ({self.city}) [{code_label}]"
|
|
|
|
|
|
def normalize_name(name: str) -> str:
    """Reduce an institution name to a canonical form for comparison.

    The name is lower-cased, the generic words "stichting", "gemeente",
    "museum", "archief" and "bibliotheek" are removed where they occur as
    whole words, every punctuation character becomes a space, and runs of
    whitespace collapse to a single space with both ends trimmed.

    Returns an empty string for a falsy ``name``.
    """
    if not name:
        return ""

    # Applied in order: drop generic words, blank out punctuation,
    # then squeeze whitespace.
    substitutions = (
        (r'\b(stichting|gemeente|museum|archief|bibliotheek)\b', ''),
        (r'[^\w\s]', ' '),
        (r'\s+', ' '),
    )

    text = name.lower()
    for pattern, replacement in substitutions:
        text = re.sub(pattern, replacement, text)
    return text.strip()
|
|
|
|
|
|
def extract_isil_codes(identifiers_str: str) -> List[str]:
    """Collect the Dutch ISIL codes from a ``;``-separated identifier string.

    Each segment containing the marker ``ISIL:`` contributes the text that
    follows the first marker (up to any second marker in the same segment),
    stripped of surrounding whitespace.  Only values starting with ``NL-``
    are kept.

    Returns the codes in order of appearance; an empty list for falsy input.
    """
    if not identifiers_str:
        return []

    dutch_codes = []
    for segment in identifiers_str.split(';'):
        pieces = segment.split('ISIL:')
        if len(pieces) < 2:
            # No ISIL marker in this segment.
            continue
        candidate = pieces[1].strip()
        if candidate.startswith('NL-'):
            dutch_codes.append(candidate)
    return dutch_codes
|
|
|
|
|
|
def load_extracted_institutions(csv_path: Path) -> List[Institution]:
    """Load the Dutch institutions from the batch-extraction CSV.

    Only rows whose ``country`` column equals ``'NL'`` are kept.  The first
    ``NL-`` ISIL code found in the ``identifiers`` column (if any) becomes
    the record's ``isil_code``; otherwise it is an empty string.

    Args:
        csv_path: Path to the extraction CSV.  It must have a ``name``
            column; ``country``, ``city`` and ``identifiers`` are read
            when present.

    Returns:
        One ``Institution`` per kept row, in file order, with
        ``source='extracted'`` and ``raw_record`` set to the CSV row.

    Raises:
        KeyError: If a kept row has no ``name`` column.
        OSError: If the file cannot be opened.
    """
    institutions = []

    # newline='' hands newline handling to the csv module, so quoted fields
    # that contain line breaks are parsed as a single value.
    with open(csv_path, 'r', encoding='utf-8', newline='') as f:
        for row in csv.DictReader(f):
            # Filter to Netherlands only
            if row.get('country') != 'NL':
                continue

            # DictReader fills columns missing from a short row with None;
            # coerce those to '' so later string handling does not fail.
            isil_codes = extract_isil_codes(row.get('identifiers') or '')

            institutions.append(Institution(
                name=row['name'] or '',
                city=row.get('city') or '',
                isil_code=isil_codes[0] if isil_codes else '',
                source='extracted',
                raw_record=row,
            ))

    return institutions
|
|
|
|
|
|
def load_isil_registry(csv_path: Path) -> List[Institution]:
    """Load institutions from the ISIL registry CSV.

    The file is read as UTF-8 (a leading BOM is tolerated) and the first
    line is skipped as a header.  Each remaining line is scanned for
    double-quoted values rather than parsed with the csv module; empty
    values and values that are just ``,`` or ``;`` are discarded.  The
    second, third and fourth surviving values are taken as place,
    institution name and ISIL code.

    Lines with fewer than four surviving values, an empty institution name,
    or a code not starting with ``NL-`` are ignored.

    Returns:
        One ``Institution`` per accepted line, in file order, with
        ``source='registry'``.
    """
    quoted_value = re.compile(r'"([^"]*)"')
    separator_tokens = (',', ';')

    with open(csv_path, 'r', encoding='utf-8-sig') as handle:
        raw_lines = handle.readlines()

    registry = []
    for raw_line in raw_lines[1:]:  # raw_lines[0] is the header
        values = [
            value
            for value in quoted_value.findall(raw_line)
            if value and value not in separator_tokens
        ]
        # NOTE(review): dropping empty values shifts later columns left, so
        # a line with a blank place would be misread -- confirm against the
        # registry file.
        if len(values) < 4:
            continue

        # Column order: Volgnr, Plaats, Instelling, ISIL code, Toegekend op, Opmerking
        plaats, instelling, isil_code = (value.strip() for value in values[1:4])

        # An empty code fails the prefix test too, so no separate check is needed.
        if not instelling or not isil_code.startswith('NL-'):
            continue

        registry.append(Institution(
            name=instelling,
            city=plaats,
            isil_code=isil_code,
            source='registry',
            raw_record={'plaats': plaats, 'instelling': instelling, 'isil_code': isil_code},
        ))

    return registry
|
|
|
|
|
|
def cross_link_by_isil(
    extracted: List[Institution],
    registry: List[Institution]
) -> Tuple[List[Tuple[Institution, Institution]], List[Institution], List[Institution]]:
    """Pair extracted and registry institutions that share an ISIL code.

    Records with an empty ``isil_code`` never take part in the pairing.
    When several records on one side carry the same code, only the last of
    them is used for pairing.

    Returns:
        A ``(matched, unmatched_extracted, unmatched_registry)`` tuple:

        - matched: ``(extracted, registry)`` pairs, one per shared code.
        - unmatched_extracted: extracted records with no code, or whose
          code was not paired.
        - unmatched_registry: registry records whose code was not paired.

        An extracted record whose code was paired through a different
        record with the same code appears in neither of the first two lists.
    """
    # Index each side by code; a later duplicate overwrites an earlier one.
    registry_index = {}
    for entry in registry:
        if entry.isil_code:
            registry_index[entry.isil_code] = entry

    extracted_index = {}
    for entry in extracted:
        if entry.isil_code:
            extracted_index[entry.isil_code] = entry

    pairs = [
        (candidate, registry_index[code])
        for code, candidate in extracted_index.items()
        if code in registry_index
    ]

    paired_codes = {candidate.isil_code for candidate, _ in pairs}

    leftover_extracted = [
        entry for entry in extracted
        if not (entry.isil_code and entry.isil_code in paired_codes)
    ]
    leftover_registry = [
        entry for entry in registry
        if entry.isil_code not in paired_codes
    ]

    return pairs, leftover_extracted, leftover_registry
|
|
|
|
|
|
def fuzzy_match_by_name(
    unmatched_extracted: List[Institution],
    unmatched_registry: List[Institution],
    threshold: float = 85.0
) -> Tuple[List[Tuple[Institution, Institution, float]], List[Institution], List[Institution]]:
    """Pair leftover institutions by fuzzy similarity of their names.

    Names are compared after ``normalize_name``.  The score for a pair is
    the highest of ``fuzz.ratio``, ``fuzz.partial_ratio`` and
    ``fuzz.token_sort_ratio``.  Matching is greedy: extracted records are
    processed in list order, each takes the best-scoring registry record
    that is still free and scores at least ``threshold``, and a registry
    record is used at most once.

    Records whose name normalizes to an empty string (for example a name
    consisting only of a generic word such as "Museum") are never matched,
    because an empty name carries nothing to compare.

    Args:
        unmatched_extracted: Extracted records without an ISIL match.
        unmatched_registry: Registry records without an ISIL match.
        threshold: Minimum score for a pair to count as a match.

    Returns:
        A ``(matched, remaining_extracted, remaining_registry)`` tuple:

        - matched: ``(extracted, registry, score)`` triples.
        - remaining_extracted: extracted records still unmatched.
        - remaining_registry: registry records still unmatched.
    """
    # Normalize each registry name once instead of once per extracted record.
    registry_names = [normalize_name(inst.name) for inst in unmatched_registry]

    matched = []
    used_registry = set()
    used_extracted = set()

    for ext_index, ext_inst in enumerate(unmatched_extracted):
        ext_name = normalize_name(ext_inst.name)
        if not ext_name:
            # Nothing distinctive left to compare; leave it unmatched.
            continue

        best_index = None
        best_score = 0.0

        for reg_index, reg_name in enumerate(registry_names):
            if reg_index in used_registry or not reg_name:
                continue

            # Take the most favourable of the three similarity measures.
            score = max(
                fuzz.ratio(ext_name, reg_name),
                fuzz.partial_ratio(ext_name, reg_name),
                fuzz.token_sort_ratio(ext_name, reg_name),
            )

            # Strict '>' keeps the earliest registry record on ties.
            if score >= threshold and score > best_score:
                best_score = score
                best_index = reg_index

        if best_index is not None:
            matched.append((ext_inst, unmatched_registry[best_index], best_score))
            used_extracted.add(ext_index)
            used_registry.add(best_index)

    remaining_extracted = [
        inst for index, inst in enumerate(unmatched_extracted)
        if index not in used_extracted
    ]
    remaining_registry = [
        inst for index, inst in enumerate(unmatched_registry)
        if index not in used_registry
    ]

    return matched, remaining_extracted, remaining_registry
|
|
|
|
|
|
def calculate_metrics(
    isil_matched: int,
    fuzzy_matched: int,
    total_extracted_nl: int,
    total_registry: int
) -> Dict[str, float]:
    """Compute precision, recall and F1 from match counts.

    Args:
        isil_matched: Number of matches made by ISIL code.
        fuzzy_matched: Number of matches made by fuzzy name comparison.
        total_extracted_nl: Number of extracted NL institutions.
        total_registry: Number of registry institutions.

    Returns:
        A dict with ``precision`` (matches / extracted), ``recall``
        (matches / registry), ``f1_score`` (their harmonic mean), plus the
        counts ``total_matched``, ``isil_matched`` and ``fuzzy_matched``.
        Any ratio whose denominator is not positive is reported as 0.0.
    """
    total_matched = isil_matched + fuzzy_matched

    # Share of extracted institutions that were confirmed.
    precision = 0.0
    if total_extracted_nl > 0:
        precision = total_matched / total_extracted_nl

    # Share of registry institutions that were found.
    recall = 0.0
    if total_registry > 0:
        recall = total_matched / total_registry

    f1 = 0.0
    if (precision + recall) > 0:
        f1 = (2 * precision * recall) / (precision + recall)

    result = {}
    result['precision'] = precision
    result['recall'] = recall
    result['f1_score'] = f1
    result['total_matched'] = total_matched
    result['isil_matched'] = isil_matched
    result['fuzzy_matched'] = fuzzy_matched
    return result
|
|
|
|
|
|
def print_report(
    extracted: List[Institution],
    registry: List[Institution],
    isil_matches: List[Tuple[Institution, Institution]],
    fuzzy_matches: List[Tuple[Institution, Institution, float]],
    false_positives: List[Institution],
    false_negatives: List[Institution],
    metrics: Dict[str, float]
):
    """Write the validation report to standard output.

    Sections, in order: dataset sizes, match counts, quality metrics, up to
    15 false positives, up to 15 false negatives, up to 10 fuzzy matches
    (highest score first, only when there are any), and a closing summary.

    ``metrics`` must provide the keys ``precision``, ``recall``,
    ``f1_score`` and ``total_matched``.
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80

    def heading(title):
        # Section title followed by a thin rule.
        print(title)
        print(light_rule)

    isil_count = len(isil_matches)
    fuzzy_count = len(fuzzy_matches)
    fp_count = len(false_positives)
    fn_count = len(false_negatives)

    print(heavy_rule)
    print("DUTCH INSTITUTION EXTRACTION VALIDATION REPORT")
    print(heavy_rule)
    print()

    heading("📊 DATASET SUMMARY")
    print(f"Total extracted institutions (country=NL): {len(extracted)}")
    print(f"Total ISIL registry institutions: {len(registry)}")
    print()

    heading("🎯 MATCHING RESULTS")
    print(f"ISIL code matches: {isil_count:3d} institutions")
    print(f"Fuzzy name matches: {fuzzy_count:3d} institutions (threshold: 85%)")
    print(f"Total matched: {metrics['total_matched']:3d} institutions")
    print()

    heading("📈 QUALITY METRICS")
    print(f"Precision: {metrics['precision']:6.1%} (What % of extracted are real?)")
    print(f"Recall: {metrics['recall']:6.1%} (What % of known institutions found?)")
    print(f"F1 Score: {metrics['f1_score']:6.1%} (Harmonic mean of precision/recall)")
    print()

    heading("❌ FALSE POSITIVES (extracted but not in registry)")
    print(f"Count: {fp_count}")
    if false_positives:
        print("\nTop 15 false positives:")
        rank = 0
        for candidate in false_positives[:15]:
            rank += 1
            confidence = candidate.raw_record.get('confidence_score', 'N/A')
            print(f" {rank:2d}. {candidate.name[:60]:60s} (confidence: {confidence})")
    print()

    heading("❌ FALSE NEGATIVES (in registry but not extracted)")
    print(f"Count: {fn_count}")
    if false_negatives:
        print("\nTop 15 false negatives:")
        rank = 0
        for missing in false_negatives[:15]:
            rank += 1
            print(f" {rank:2d}. {missing.name[:60]:60s} [{missing.isil_code}] ({missing.city})")
    print()

    if fuzzy_matches:
        heading("🔍 FUZZY NAME MATCHES (require verification)")
        print(f"Count: {fuzzy_count}")
        print("\nTop 10 fuzzy matches:")
        # Highest score first; ties keep their original order.
        ranked = sorted(fuzzy_matches, key=lambda triple: -triple[2])
        for rank, (ext, reg, score) in enumerate(ranked[:10], 1):
            print(f" {rank:2d}. Score: {score:5.1f}%")
            print(f" Extracted: {ext.name}")
            print(f" Registry: {reg.name} [{reg.isil_code}]")
        print()

    print(heavy_rule)
    heading("📋 SUMMARY")
    print(f"✅ High-confidence matches (ISIL): {isil_count:3d}")
    print(f"⚠️ Medium-confidence (fuzzy name): {fuzzy_count:3d}")
    print(f"❌ False positives: {fp_count:3d}")
    print(f"❌ False negatives: {fn_count:3d}")
    print()
    print(f"Overall quality: Precision={metrics['precision']:.1%}, Recall={metrics['recall']:.1%}, F1={metrics['f1_score']:.1%}")
    print(heavy_rule)
|
|
|
|
|
|
def main():
    """Run the validation end to end and print the report.

    Loads the extracted institutions and the ISIL registry from their fixed
    paths, links them by ISIL code, fuzzy-matches what is left by name,
    computes the metrics and prints the report.

    Returns:
        0 on success, 1 if either input file is missing (an error message
        is written to stderr).
    """
    extracted_csv = Path('output/institutions.csv')
    registry_csv = Path('data/ISIL-codes_2025-08-01.csv')

    # Both inputs must exist before any work starts.
    for required in (extracted_csv, registry_csv):
        if not required.exists():
            print(f"Error: {required} not found", file=sys.stderr)
            return 1

    print("Loading extracted institutions...")
    extracted = load_extracted_institutions(extracted_csv)
    print(f" Found {len(extracted)} NL institutions")

    print("Loading ISIL registry...")
    registry = load_isil_registry(registry_csv)
    print(f" Found {len(registry)} registry institutions")
    print()

    # Stage 1: exact linking on ISIL code.
    print("Cross-linking by ISIL code...")
    linked = cross_link_by_isil(extracted, registry)
    isil_matches, unmatched_extracted, unmatched_registry = linked
    print(f" {len(isil_matches)} ISIL matches")
    print()

    # Stage 2: fuzzy name matching on what stage 1 left over; whatever
    # remains afterwards is reported as false positives / negatives.
    print("Fuzzy matching by institution name...")
    fuzzy_result = fuzzy_match_by_name(
        unmatched_extracted,
        unmatched_registry,
        threshold=85.0,
    )
    fuzzy_matches, false_positives, false_negatives = fuzzy_result
    print(f" {len(fuzzy_matches)} fuzzy matches (≥85% similarity)")
    print()

    metrics = calculate_metrics(
        len(isil_matches),
        len(fuzzy_matches),
        len(extracted),
        len(registry),
    )

    print_report(
        extracted,
        registry,
        isil_matches,
        fuzzy_matches,
        false_positives,
        false_negatives,
        metrics,
    )

    return 0
|
|
|
|
|
|
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())
|