glam/scripts/merge_global_datasets.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

384 lines
14 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Global Dataset Merge Script
Merges all regional ISIL datasets into a unified global heritage custodian database:
- Japan ISIL institutions (12,065 records)
- Netherlands ISIL institutions (369 records)
- EU institutions (10 records)
- Latin America institutions (304 records)
Output: Comprehensive global dataset with ~12,748 institutions
Author: GLAM Data Extraction Project
Date: 2025-11-07
"""
import yaml
from pathlib import Path
from datetime import datetime, timezone
from typing import List, Dict, Any
from collections import Counter
def load_yaml_dataset(file_path: Path) -> List[Dict[str, Any]]:
    """Load a YAML dataset from *file_path*, always returning a list.

    A top-level YAML sequence is returned as-is; a lone mapping is wrapped
    as a single-institution list. Anything else raises ValueError.
    """
    print(f"Loading {file_path.name}...")
    with open(file_path, 'r', encoding='utf-8') as handle:
        parsed = yaml.safe_load(handle)
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        # A single institution record: normalize to a one-element list.
        return [parsed]
    raise ValueError(f"Unexpected data format in {file_path}")
def analyze_dataset(institutions: List[Dict[str, Any]], name: str) -> Dict[str, Any]:
    """Analyze dataset structure and contents, printing a summary report.

    Args:
        institutions: YAML-derived institution dicts.
        name: Human-readable dataset label used in the printed header.

    Returns:
        Stats dict containing 'name', 'total_count', Counter fields
        ('countries', 'institution_types', 'data_sources', 'data_tiers'),
        and coverage counts ('has_ghcid', 'has_coordinates', 'has_website',
        'has_identifiers').
    """
    print(f"\n{'='*60}")
    print(f"Analyzing {name}")
    print(f"{'='*60}")
    stats = {
        'name': name,
        'total_count': len(institutions),
        'countries': Counter(),
        'institution_types': Counter(),
        'data_sources': Counter(),
        'data_tiers': Counter(),
        'has_ghcid': 0,
        'has_coordinates': 0,
        'has_website': 0,
        'has_identifiers': 0,
    }
    for inst in institutions:
        # Country distribution (first location is treated as primary).
        if 'locations' in inst and inst['locations']:
            loc = inst['locations'][0]
            stats['countries'][loc.get('country', 'UNKNOWN')] += 1
            # Compare against None (not truthiness) so legitimate 0.0
            # coordinates on the equator/prime meridian count as geocoded.
            if loc.get('latitude') is not None or loc.get('longitude') is not None:
                stats['has_coordinates'] += 1
        # Institution type
        stats['institution_types'][inst.get('institution_type', 'UNKNOWN')] += 1
        # Provenance
        if 'provenance' in inst:
            prov = inst['provenance']
            stats['data_sources'][prov.get('data_source', 'UNKNOWN')] += 1
            stats['data_tiers'][prov.get('data_tier', 'UNKNOWN')] += 1
        # GHCID
        if inst.get('ghcid'):
            stats['has_ghcid'] += 1
        # Identifiers / website presence
        if inst.get('identifiers'):
            stats['has_identifiers'] += 1
            if any(ident.get('identifier_scheme') == 'Website'
                   for ident in inst['identifiers']):
                stats['has_website'] += 1
    # Guard denominator so an empty dataset prints 0.0% instead of raising
    # ZeroDivisionError.
    denom = max(stats['total_count'], 1)
    # Print summary
    print(f"Total Records: {stats['total_count']:,}")
    print(f"\nCountries ({len(stats['countries'])}):")
    for country, count in stats['countries'].most_common():
        print(f" {country}: {count:,} ({count/denom*100:.1f}%)")
    print(f"\nInstitution Types:")
    for inst_type, count in stats['institution_types'].most_common():
        print(f" {inst_type}: {count:,} ({count/denom*100:.1f}%)")
    print(f"\nData Quality:")
    print(f" GHCID Coverage: {stats['has_ghcid']:,} ({stats['has_ghcid']/denom*100:.1f}%)")
    print(f" Has Coordinates: {stats['has_coordinates']:,} ({stats['has_coordinates']/denom*100:.1f}%)")
    print(f" Has Website: {stats['has_website']:,} ({stats['has_website']/denom*100:.1f}%)")
    print(f" Has Identifiers: {stats['has_identifiers']:,} ({stats['has_identifiers']/denom*100:.1f}%)")
    return stats
def deduplicate_institutions(institutions: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Deduplicate institutions by GHCID (primary) or ISIL code (fallback).

    For duplicates, prefer records with:
    1. Higher data tier (TIER_1 > TIER_2 > TIER_3 > TIER_4)
    2. More complete data (more fields filled)
    3. Most recent extraction date (ISO date strings compare lexicographically)

    Records carrying a GHCID are keyed on the GHCID alone; GHCID-less records
    fall back to their first ISIL identifier; records with neither key are
    always kept.

    Args:
        institutions: YAML-derived institution dicts.

    Returns:
        New list with duplicates resolved. First-seen order is preserved,
        except that a later, better record replaces the earlier one and is
        appended at the end.
    """
    seen_ghcids: Dict[Any, Dict[str, Any]] = {}
    seen_isil: Dict[Any, Dict[str, Any]] = {}
    duplicates: List[Dict[str, Any]] = []
    # Higher number wins when comparing provenance tiers; unknown tiers rank 0.
    tier_priority = {
        'TIER_1_AUTHORITATIVE': 4,
        'TIER_2_VERIFIED': 3,
        'TIER_3_CROWD_SOURCED': 2,
        'TIER_4_INFERRED': 1,
    }

    def completeness_score(inst: Dict[str, Any]) -> int:
        """Rough count of filled fields; used as a duplicate tie-breaker."""
        score = 0
        score += 1 if inst.get('name') else 0
        score += 1 if inst.get('description') else 0
        score += len(inst.get('identifiers', []))
        score += len(inst.get('locations', []))
        score += len(inst.get('digital_platforms', []))
        score += 1 if inst.get('ghcid') else 0
        if inst.get('locations'):
            loc = inst['locations'][0]
            score += 1 if loc.get('street_address') else 0
            score += 1 if loc.get('postal_code') else 0
            score += 1 if loc.get('latitude') else 0
        return score

    def is_better_record(new_inst: Dict[str, Any], existing_inst: Dict[str, Any]) -> bool:
        """Return True if new_inst should replace existing_inst."""
        new_prov = new_inst.get('provenance', {})
        existing_prov = existing_inst.get('provenance', {})
        # 1. Compare data tiers.
        new_tier = tier_priority.get(new_prov.get('data_tier', ''), 0)
        existing_tier = tier_priority.get(existing_prov.get('data_tier', ''), 0)
        if new_tier != existing_tier:
            return new_tier > existing_tier
        # 2. Compare completeness.
        new_score = completeness_score(new_inst)
        existing_score = completeness_score(existing_inst)
        if new_score != existing_score:
            return new_score > existing_score
        # 3. Compare extraction dates; ties keep the existing record.
        return new_prov.get('extraction_date', '') > existing_prov.get('extraction_date', '')

    def extract_isil(inst: Dict[str, Any]) -> Any:
        """First ISIL identifier value of a record, or None."""
        for ident in inst.get('identifiers') or []:
            if ident.get('identifier_scheme') == 'ISIL':
                return ident.get('identifier_value')
        return None

    deduped: List[Dict[str, Any]] = []
    for inst in institutions:
        ghcid = inst.get('ghcid')
        isil_code = extract_isil(inst)
        # Check for GHCID duplicates
        if ghcid:
            if ghcid in seen_ghcids:
                duplicates.append({
                    'ghcid': ghcid,
                    'name1': seen_ghcids[ghcid].get('name'),
                    'name2': inst.get('name'),
                })
                if is_better_record(inst, seen_ghcids[ghcid]):
                    # Remove the previously kept record, then keep the new one.
                    deduped = [rec for rec in deduped if rec.get('ghcid') != ghcid]
                    seen_ghcids[ghcid] = inst
                    deduped.append(inst)
                # else: keep existing record; either way the duplicate is consumed.
                continue
            seen_ghcids[ghcid] = inst
        # Check for ISIL duplicates (only when the record has no GHCID)
        elif isil_code:
            if isil_code in seen_isil:
                duplicates.append({
                    'isil': isil_code,
                    'name1': seen_isil[isil_code].get('name'),
                    'name2': inst.get('name'),
                })
                if is_better_record(inst, seen_isil[isil_code]):
                    # Remove any kept record carrying this ISIL, then keep the new one.
                    deduped = [rec for rec in deduped
                               if not any(ident.get('identifier_value') == isil_code
                                          for ident in rec.get('identifiers', [])
                                          if ident.get('identifier_scheme') == 'ISIL')]
                    seen_isil[isil_code] = inst
                    deduped.append(inst)
                # else: keep existing record; either way the duplicate is consumed.
                continue
            seen_isil[isil_code] = inst
        deduped.append(inst)
    if duplicates:
        print(f"\n⚠️ Found {len(duplicates)} duplicates (resolved by keeping best record):")
        for dup in duplicates[:10]:  # Show first 10
            if 'ghcid' in dup:
                print(f" GHCID {dup['ghcid']}: '{dup['name1']}' vs '{dup['name2']}'")
            else:
                print(f" ISIL {dup['isil']}: '{dup['name1']}' vs '{dup['name2']}'")
        if len(duplicates) > 10:
            print(f" ... and {len(duplicates) - 10} more")
    return deduped
def merge_datasets(datasets: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Concatenate all regional datasets, then deduplicate the combined list."""
    print(f"\n{'='*60}")
    print("Merging Datasets")
    print(f"{'='*60}")
    # Flatten every regional list into one combined list.
    combined: List[Dict[str, Any]] = []
    for label, records in datasets.items():
        print(f"Adding {len(records):,} records from {label}")
        combined.extend(records)
    print(f"\nTotal before deduplication: {len(combined):,}")
    merged = deduplicate_institutions(combined)
    removed = len(combined) - len(merged)
    print(f"Total after deduplication: {len(merged):,}")
    print(f"Removed: {removed:,} duplicates")
    return merged
def main(base_path: Path = Path('/Users/kempersc/apps/glam')) -> None:
    """Main execution: load, analyze, merge, and export all regional datasets.

    Writes three artifacts under ``<base_path>/data/instances/global``:
    the merged YAML dataset, a YAML statistics file, and a markdown report.

    Args:
        base_path: Project root containing the ``data/`` tree. Defaults to
            the original hard-coded location for backward compatibility.
    """
    def _plain(obj: Any) -> Any:
        """Recursively convert Counters to plain dicts so yaml.dump emits
        portable mappings instead of Python-specific object tags."""
        if isinstance(obj, Counter):
            return dict(obj)
        if isinstance(obj, dict):
            return {key: _plain(value) for key, value in obj.items()}
        return obj
    # Define source datasets
    datasets = {
        'Japan ISIL': base_path / 'data/instances/japan/jp_institutions_resolved.yaml',  # Using collision-resolved dataset
        'Netherlands ISIL': base_path / 'data/dutch_institutions_with_ghcids.yaml',
        'EU Institutions': base_path / 'data/instances/eu_institutions.yaml',
        'Latin America': base_path / 'data/instances/latin_american_institutions_AUTHORITATIVE.yaml',
    }
    # Load all datasets; a missing file is a warning, not a fatal error.
    loaded_datasets = {}
    for name, path in datasets.items():
        if path.exists():
            loaded_datasets[name] = load_yaml_dataset(path)
        else:
            print(f"⚠️ Warning: {name} not found at {path}")
    # Analyze each dataset
    stats = {}
    for name, institutions in loaded_datasets.items():
        stats[name] = analyze_dataset(institutions, name)
    # Merge datasets
    merged_institutions = merge_datasets(loaded_datasets)
    # Analyze merged dataset
    merged_stats = analyze_dataset(merged_institutions, "GLOBAL MERGED DATASET")
    # Single timestamp so the YAML statistics and markdown report agree.
    merge_time = datetime.now(timezone.utc)
    # Generate output files
    output_dir = base_path / 'data/instances/global'
    output_dir.mkdir(parents=True, exist_ok=True)
    # 1. Main YAML file
    output_yaml = output_dir / 'global_heritage_institutions.yaml'
    print(f"\n{'='*60}")
    print(f"Writing merged dataset to {output_yaml.name}")
    print(f"{'='*60}")
    with open(output_yaml, 'w', encoding='utf-8') as f:
        yaml.dump(merged_institutions, f,
                  allow_unicode=True,
                  default_flow_style=False,
                  sort_keys=False,
                  width=120)
    print(f"✅ Wrote {len(merged_institutions):,} institutions to {output_yaml}")
    # 2. Statistics report
    stats_file = output_dir / 'merge_statistics.yaml'
    merge_metadata = {
        'merge_date': merge_time.isoformat(),
        'total_institutions': len(merged_institutions),
        'source_datasets': {
            name: {
                'count': len(institutions),
                'file': str(datasets[name].relative_to(base_path))
            }
            for name, institutions in loaded_datasets.items()
        },
        'regional_statistics': stats,
        'merged_statistics': merged_stats,
    }
    with open(stats_file, 'w', encoding='utf-8') as f:
        # Counters are flattened to plain dicts so the YAML stays portable.
        yaml.dump(_plain(merge_metadata), f,
                  allow_unicode=True,
                  default_flow_style=False,
                  sort_keys=False)
    print(f"✅ Wrote statistics to {stats_file}")
    # 3. Summary report (markdown)
    report_file = output_dir / 'merge_report.md'
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write("# Global Heritage Institutions Dataset - Merge Report\n\n")
        f.write(f"**Merge Date**: {merge_time.strftime('%Y-%m-%d %H:%M:%S UTC')}\n\n")
        f.write(f"**Total Institutions**: {len(merged_institutions):,}\n\n")
        f.write("## Source Datasets\n\n")
        for name, institutions in loaded_datasets.items():
            f.write(f"- **{name}**: {len(institutions):,} records\n")
        f.write(f"\n## Country Distribution\n\n")
        for country, count in merged_stats['countries'].most_common():
            pct = count / merged_stats['total_count'] * 100
            f.write(f"- **{country}**: {count:,} ({pct:.1f}%)\n")
        f.write(f"\n## Institution Types\n\n")
        for inst_type, count in merged_stats['institution_types'].most_common():
            pct = count / merged_stats['total_count'] * 100
            f.write(f"- **{inst_type}**: {count:,} ({pct:.1f}%)\n")
        f.write(f"\n## Data Quality Metrics\n\n")
        total = merged_stats['total_count']
        f.write(f"- **GHCID Coverage**: {merged_stats['has_ghcid']:,} ({merged_stats['has_ghcid']/total*100:.1f}%)\n")
        f.write(f"- **Geocoded (has coordinates)**: {merged_stats['has_coordinates']:,} ({merged_stats['has_coordinates']/total*100:.1f}%)\n")
        f.write(f"- **Has Website**: {merged_stats['has_website']:,} ({merged_stats['has_website']/total*100:.1f}%)\n")
        f.write(f"- **Has Identifiers**: {merged_stats['has_identifiers']:,} ({merged_stats['has_identifiers']/total*100:.1f}%)\n")
        f.write(f"\n## Next Steps\n\n")
        f.write("1. **Geocoding**: Add coordinates to remaining institutions\n")
        f.write("2. **Enrichment**: Add Wikidata/VIAF identifiers\n")
        f.write("3. **Validation**: Schema compliance check\n")
        f.write("4. **Export**: Generate JSON-LD, GeoJSON, CSV formats\n")
    print(f"✅ Wrote report to {report_file}")
    print(f"\n{'='*60}")
    print("Global Dataset Merge Complete! 🎉")
    print(f"{'='*60}")
    print(f"Output directory: {output_dir}")
    print(f"Total institutions: {len(merged_institutions):,}")
# Script entry point: run the full global merge pipeline.
if __name__ == '__main__':
    main()