#!/usr/bin/env python3
"""
Merge enriched datasets into unified global heritage institutions database.

Merges enriched datasets (Task 6+ - November 2025):
1. Tunisia Enhanced (68 institutions, 76.5% Wikidata)
2. Georgia Enriched (14 institutions, 78.6% Wikidata)
3. Belgium Manual Enriched (7 institutions, 100% Wikidata)

Strategy:
- Handle both file formats (plain list vs. _metadata wrapper)
- Deduplicate by ID/GHCID
- Preserve enrichment metadata (replace if new is more enriched)
- Create timestamped backup before merge
- Accept country parameter to selectively merge datasets
"""

import yaml
import sys
from pathlib import Path
from datetime import datetime, timezone
from collections import defaultdict
from typing import Dict, List, Any

# File paths
BASE_DIR = Path("/Users/kempersc/apps/glam")
UNIFIED_DB = BASE_DIR / "data/instances/all/globalglam-20251111.yaml"

# Mapping of country codes to enriched dataset files
ENRICHED_FILES = {
    'tunisia': BASE_DIR / "data/instances/tunisia/tunisian_institutions_enhanced.yaml",
    'georgia': BASE_DIR / "data/instances/georgia/georgian_institutions_enriched_batch3_final.yaml",
    'belgium': BASE_DIR / "data/instances/belgium/be_institutions_enriched_manual.yaml",
}

# Default: merge all available enriched files
DEFAULT_SOURCE_FILES = list(ENRICHED_FILES.values())


def load_yaml_file(filepath: Path) -> List[Dict[str, Any]]:
    """Load YAML file and extract institutions list.

    Accepts two on-disk formats:
    - mapping with an ``institutions`` key (``{_metadata: {...}, institutions: [...]}``)
    - a plain list of institution records

    Raises:
        ValueError: if the document is neither format (including an empty
            file, where ``yaml.safe_load`` returns None).
    """
    print(f"šŸ“– Loading: {filepath.name}")
    with open(filepath, 'r', encoding='utf-8') as f:
        data = yaml.safe_load(f)

    # Handle both formats
    if isinstance(data, dict) and 'institutions' in data:
        # Format: {_metadata: {...}, institutions: [...]}
        institutions = data['institutions']
        print(f" Format: metadata wrapper, {len(institutions)} institutions")
    elif isinstance(data, list):
        # Format: [...]
        institutions = data
        print(f" Format: plain list, {len(institutions)} institutions")
    else:
        # Also reached when the file is empty (safe_load -> None)
        raise ValueError(f"Unexpected format in {filepath}")

    return institutions


def get_institution_key(inst: Dict[str, Any]) -> str:
    """Get unique key for institution (for deduplication).

    Priority: ``id`` field, then ``ghcid``, then a composite of the
    institution name and the country of its first location.
    """
    # Primary: id field (truthy check skips empty strings / None)
    if inst.get('id'):
        return inst['id']

    # Secondary: GHCID
    if inst.get('ghcid'):
        return f"ghcid:{inst['ghcid']}"

    # Fallback: name + country
    name = inst.get('name', 'unknown')
    country = 'unknown'
    if inst.get('locations'):
        country = inst['locations'][0].get('country', 'unknown')
    return f"name:{country}:{name}"


def has_wikidata(inst: Dict[str, Any]) -> bool:
    """Check if institution has a Wikidata identifier."""
    return any(
        ident.get('identifier_scheme') == 'Wikidata'
        for ident in inst.get('identifiers', [])
    )


def count_wikidata(institutions: List[Dict[str, Any]]) -> int:
    """Count institutions with Wikidata identifiers."""
    return sum(1 for inst in institutions if has_wikidata(inst))


def is_more_enriched(new: Dict[str, Any], existing: Dict[str, Any]) -> bool:
    """Determine if new record is more enriched than existing.

    Compared in order: Wikidata presence, enrichment-history presence,
    identifier count. On a full tie the existing record wins.
    """
    # Check Wikidata presence
    new_has_wd = has_wikidata(new)
    existing_has_wd = has_wikidata(existing)
    if new_has_wd and not existing_has_wd:
        return True
    if existing_has_wd and not new_has_wd:
        return False

    # Check enrichment history presence
    new_has_history = 'enrichment_history' in new.get('provenance', {})
    existing_has_history = 'enrichment_history' in existing.get('provenance', {})
    if new_has_history and not existing_has_history:
        return True
    if existing_has_history and not new_has_history:
        return False

    # Check number of identifiers
    if len(new.get('identifiers', [])) > len(existing.get('identifiers', [])):
        return True

    # Default: keep existing
    return False
new_institutions: List[Dict[str, Any]], source_name: str ) -> tuple[List[Dict[str, Any]], Dict[str, Any]]: """ Merge new institutions into unified list. Returns: (merged_list, stats_dict) """ # Build index of existing institutions unified_index = {get_institution_key(inst): i for i, inst in enumerate(unified)} stats = { 'source': source_name, 'total_new': len(new_institutions), 'added': 0, 'duplicates_skipped': 0, 'duplicates_replaced': 0, } for new_inst in new_institutions: key = get_institution_key(new_inst) if key in unified_index: # Duplicate found - check which is more enriched existing_idx = unified_index[key] existing_inst = unified[existing_idx] if is_more_enriched(new_inst, existing_inst): # Replace with more enriched version unified[existing_idx] = new_inst stats['duplicates_replaced'] += 1 print(f" āœļø Replaced: {new_inst.get('name', 'unknown')} (more enriched)") else: # Keep existing stats['duplicates_skipped'] += 1 else: # New institution - add it unified.append(new_inst) unified_index[key] = len(unified) - 1 stats['added'] += 1 return unified, stats def create_backup(filepath: Path) -> Path: """Create timestamped backup of file.""" timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S") backup_path = filepath.parent / f"{filepath.stem}_backup_{timestamp}{filepath.suffix}" print(f"šŸ’¾ Creating backup: {backup_path.name}") import shutil shutil.copy2(filepath, backup_path) return backup_path def main(): print("=" * 80) print("MERGE ENRICHED DATASETS INTO UNIFIED GLOBAL DATABASE") print("=" * 80) print() # Parse command line arguments if len(sys.argv) > 1: country = sys.argv[1].lower() if country in ENRICHED_FILES: source_files = [ENRICHED_FILES[country]] print(f"šŸŒ Mode: Merging {country.upper()} only\n") else: print(f"āŒ Unknown country: {country}") print(f"Available countries: {', '.join(ENRICHED_FILES.keys())}") return 1 else: source_files = DEFAULT_SOURCE_FILES print(f"šŸŒ Mode: Merging all enriched datasets\n") # Step 1: Load 
unified database print("šŸ“š STEP 1: Load unified global database") print("-" * 80) unified_institutions = load_yaml_file(UNIFIED_DB) initial_count = len(unified_institutions) initial_wikidata = count_wikidata(unified_institutions) print(f"āœ… Loaded {initial_count:,} institutions ({initial_wikidata:,} with Wikidata, {initial_wikidata/initial_count*100:.1f}%)") print() # Step 2: Create backup print("šŸ’¾ STEP 2: Create backup") print("-" * 80) backup_path = create_backup(UNIFIED_DB) print(f"āœ… Backup created: {backup_path}") print() # Step 3: Load and merge source files print("šŸ“„ STEP 3: Load and merge source files") print("-" * 80) all_stats = [] for source_file in source_files: print(f"\nšŸ”„ Processing: {source_file.name}") try: new_institutions = load_yaml_file(source_file) new_wikidata = count_wikidata(new_institutions) print(f" Loaded: {len(new_institutions)} institutions ({new_wikidata} with Wikidata, {new_wikidata/len(new_institutions)*100:.1f}%)") unified_institutions, stats = merge_institutions( unified_institutions, new_institutions, source_file.name ) all_stats.append(stats) print(f" āœ… Added: {stats['added']}") print(f" ā­ļø Skipped duplicates: {stats['duplicates_skipped']}") print(f" āœļø Replaced (more enriched): {stats['duplicates_replaced']}") except Exception as e: print(f" āŒ ERROR: {e}") import traceback traceback.print_exc() continue print() print("-" * 80) # Step 4: Statistics print() print("šŸ“Š STEP 4: Statistics") print("-" * 80) final_count = len(unified_institutions) final_wikidata = count_wikidata(unified_institutions) total_added = sum(s['added'] for s in all_stats) total_replaced = sum(s['duplicates_replaced'] for s in all_stats) total_skipped = sum(s['duplicates_skipped'] for s in all_stats) print(f"Initial count: {initial_count:,} institutions") print(f"Final count: {final_count:,} institutions") print(f"Net change: +{final_count - initial_count:,} institutions") print() print(f"Added (new): {total_added:,}") 
print(f"Replaced (enriched): {total_replaced:,}") print(f"Skipped (duplicates): {total_skipped:,}") print() print(f"Wikidata coverage:") print(f" Before: {initial_wikidata:,} / {initial_count:,} ({initial_wikidata/initial_count*100:.1f}%)") print(f" After: {final_wikidata:,} / {final_count:,} ({final_wikidata/final_count*100:.1f}%)") print(f" Change: +{final_wikidata - initial_wikidata:,} Wikidata IDs") print() # Step 5: Save merged database print("šŸ’¾ STEP 5: Save merged database") print("-" * 80) print(f"Writing to: {UNIFIED_DB}") with open(UNIFIED_DB, 'w', encoding='utf-8') as f: yaml.dump(unified_institutions, f, default_flow_style=False, allow_unicode=True, sort_keys=False) print(f"āœ… Saved {final_count:,} institutions") print() # Final summary print("=" * 80) print("āœ… MERGE COMPLETE") print("=" * 80) print() print(f"Unified database: {UNIFIED_DB}") print(f"Backup location: {backup_path}") print(f"Total institutions: {initial_count:,} → {final_count:,} (+{final_count - initial_count:,})") print(f"Wikidata coverage: {initial_wikidata/initial_count*100:.1f}% → {final_wikidata/final_count*100:.1f}%") print() return 0 if __name__ == "__main__": sys.exit(main())