- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive). - Added tests for extracted entities and result handling to validate the extraction process. - Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format. - Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns. - Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
184 lines
7.4 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Merge Georgia Wikidata enrichment into unified global dataset.
|
|
Uses streaming approach to handle large YAML files efficiently.
|
|
"""
|
|
|
|
import shutil
import sys
from datetime import datetime, timezone
from pathlib import Path

import yaml
|
|
|
|
def _identifiers_for_scheme(identifiers, scheme):
    """Return all identifier dicts in *identifiers* with the given scheme."""
    return [ident for ident in identifiers if ident.get('identifier_scheme') == scheme]


def _build_enrichment_lookup(enriched_institutions):
    """Index enriched Georgia records by institution ID.

    Keeps only the fields the merge step needs: the first Wikidata
    identifier, all VIAF/Website identifiers, the founding date, and the
    enrichment history. Records without an ``id`` are skipped.
    """
    lookup = {}
    for inst in enriched_institutions:
        inst_id = inst.get('id')
        if not inst_id:
            continue
        # ``or []`` guards against an explicit ``identifiers: null`` in the YAML.
        identifiers = inst.get('identifiers') or []
        wikidata_ids = _identifiers_for_scheme(identifiers, 'Wikidata')
        lookup[inst_id] = {
            'name': inst.get('name'),
            'wikidata': wikidata_ids[0] if wikidata_ids else None,
            'viaf': _identifiers_for_scheme(identifiers, 'VIAF'),
            'website': _identifiers_for_scheme(identifiers, 'Website'),
            'founding_date': inst.get('founding_date'),
            'enrichment_history': inst.get('provenance', {}).get('enrichment_history', []),
        }
    return lookup


def _merge_into_institution(inst, enriched):
    """Merge enrichment data into one unified-dataset record, in place.

    Returns the list of labels that were added ('Wikidata', 'VIAF',
    'Website', 'founding_date'); an empty list means nothing changed.
    """
    if 'identifiers' not in inst or inst['identifiers'] is None:
        inst['identifiers'] = []
    identifiers = inst['identifiers']
    existing_schemes = {ident.get('identifier_scheme') for ident in identifiers}
    added = []

    # Wikidata: at most one identifier per record; skip if any is present.
    if enriched['wikidata'] and 'Wikidata' not in existing_schemes:
        identifiers.append(enriched['wikidata'])
        added.append('Wikidata')

    # VIAF / Website may carry multiple identifiers; dedupe by value.
    # The set is maintained across appends so duplicates *within* the
    # enriched list are also skipped (matches the original rescan logic).
    for scheme, key in (('VIAF', 'viaf'), ('Website', 'website')):
        existing_values = {
            ident.get('identifier_value')
            for ident in identifiers
            if ident.get('identifier_scheme') == scheme
        }
        for new_ident in enriched[key]:
            value = new_ident.get('identifier_value')
            if value and value not in existing_values:
                identifiers.append(new_ident)
                existing_values.add(value)
                if scheme not in added:
                    added.append(scheme)

    # Founding date only fills a gap; it never overwrites an existing value.
    if enriched['founding_date'] and not inst.get('founding_date'):
        inst['founding_date'] = enriched['founding_date']
        added.append('founding_date')

    return added


def merge_georgia_enrichment_streaming():
    """Merge Georgia enriched institutions into the unified dataset.

    Loads both YAML files fully into memory (PyYAML has no incremental
    loader for a single document, so the file must fit in RAM), merges
    identifier/provenance enrichment into matching records, backs up the
    unified file, and rewrites it.

    Returns:
        int: number of institutions that received new data.
    """
    # Load enriched Georgia data.
    georgia_enriched_path = Path('data/instances/georgia/georgian_institutions_enriched_batch3_final.yaml')
    with open(georgia_enriched_path, 'r', encoding='utf-8') as f:
        georgia_enriched = yaml.safe_load(f)

    print(f"Loaded {len(georgia_enriched)} enriched Georgia institutions")

    georgia_enriched_lookup = _build_enrichment_lookup(georgia_enriched)
    with_wikidata = sum(1 for v in georgia_enriched_lookup.values() if v['wikidata'])

    print(f"Created lookup for {len(georgia_enriched_lookup)} Georgia institutions")
    print(f" - With Wikidata: {with_wikidata}")

    # Load the unified dataset. safe_load can read from the file object
    # directly; there is no need to slurp the text into a string first.
    unified_path = Path('data/instances/all/globalglam-20251111.yaml')
    print(f"\nLoading unified dataset from {unified_path}...")
    print("(This may take a minute...)")

    with open(unified_path, 'r', encoding='utf-8') as f:
        unified = yaml.safe_load(f)

    print(f"Loaded {len(unified)} institutions from unified dataset")

    print("\nMerging enrichment data...")
    merged_count = 0
    updated_records = []

    for i, inst in enumerate(unified):
        if i % 1000 == 0 and i > 0:
            print(f" Processed {i}/{len(unified)} institutions...")

        inst_id = inst.get('id')
        # Only Georgia institutions present in the enrichment lookup are touched.
        if not inst_id or inst_id not in georgia_enriched_lookup:
            continue

        enriched = georgia_enriched_lookup[inst_id]
        added_identifiers = _merge_into_institution(inst, enriched)
        if not added_identifiers:
            continue

        # Update provenance. ``or {}`` also replaces an explicit null;
        # when provenance already exists this re-binds the same dict.
        provenance = inst.get('provenance') or {}
        inst['provenance'] = provenance

        if enriched['enrichment_history']:
            provenance.setdefault('enrichment_history', []).extend(enriched['enrichment_history'])

        # Record the merge timestamp (timezone-aware UTC).
        provenance['last_updated'] = datetime.now(timezone.utc).isoformat()
        provenance['wikidata_verified'] = True

        merged_count += 1
        updated_records.append({
            'name': inst.get('name'),
            'id': inst_id,
            'added': added_identifiers,
        })
        print(f"✓ Merged {inst.get('name')}: added {', '.join(added_identifiers)}")

    if merged_count == 0:
        print("\n⚠ No institutions merged (already up to date)")
        return merged_count

    # Back up the unified dataset before overwriting it.
    print(f"\nCreating backup...")
    backup_path = unified_path.with_suffix('.yaml.backup2')
    shutil.copy(unified_path, backup_path)
    print(f"✓ Created backup: {backup_path}")

    print(f"Writing updated dataset...")
    with open(unified_path, 'w', encoding='utf-8') as f:
        yaml.dump(unified, f, allow_unicode=True, default_flow_style=False, sort_keys=False)

    print(f"\n✓ Updated unified dataset: {unified_path}")
    print(f"✓ Merged {merged_count} Georgia institutions")

    # Print summary.
    print("\n" + "=" * 60)
    print("MERGE SUMMARY")
    print("=" * 60)
    for record in updated_records:
        print(f" {record['name']}")
        print(f" ID: {record['id']}")
        print(f" Added: {', '.join(record['added'])}")

    print("\n" + "=" * 60)
    # Computed instead of the previously hard-coded "/14 (85.7%)" so the
    # summary stays correct as the dataset grows.
    total = len(georgia_enriched_lookup)
    pct = 100.0 * with_wikidata / total if total else 0.0
    print(f"Total Georgia institutions with Wikidata: {with_wikidata}/{total} ({pct:.1f}%)")
    print("=" * 60)

    return merged_count
|
|
|
|
if __name__ == '__main__':
    # Script entry point: run the merge and translate any failure into a
    # non-zero exit status.
    status = 0
    try:
        merge_georgia_enrichment_streaming()
    except KeyboardInterrupt:
        print("\n\n⚠ Merge interrupted by user")
        status = 1
    except Exception as exc:
        print(f"\n\n❌ Error during merge: {exc}")
        import traceback
        traceback.print_exc()
        status = 1
    if status:
        sys.exit(status)