#!/usr/bin/env python3
|
|
"""
|
|
Build Unified GLAM Heritage Custodian Database (Version 2)
|
|
|
|
Fixes:
|
|
1. Denmark parser - handles repr string format for nested objects
|
|
2. Canada parser - handles nested dict structures for enums
|
|
3. SQLite overflow - uses TEXT for ghcid_numeric (64-bit integers)
|
|
|
|
Merges all country-specific LinkML datasets into a unified database with:
|
|
- Deduplication by GHCID
|
|
- Data quality tracking
|
|
- Version control
|
|
- Multiple export formats (JSON, SQLite, Parquet)
|
|
"""
|
|
|
|
import json
|
|
import sqlite3
|
|
import re
|
|
from pathlib import Path
|
|
from datetime import datetime, timezone
|
|
from typing import List, Dict, Any, Optional
|
|
from collections import defaultdict
|
|
|
|
# Country dataset paths.
# Absolute paths to the per-country LinkML exports; formats are mixed
# (JSON and YAML) and are dispatched by file extension at load time.
COUNTRY_DATASETS = {
    'finland': '/Users/kempersc/apps/glam/data/finland_isil/finland_isil_linkml_final_20251120.json',
    'denmark': '/Users/kempersc/apps/glam/data/instances/denmark_complete_enriched.json',
    'netherlands': '/Users/kempersc/apps/glam/data/instances/netherlands_complete.yaml',
    'belgium': '/Users/kempersc/apps/glam/data/instances/belgium_isil.yaml',
    'belarus': '/Users/kempersc/apps/glam/data/instances/belarus_complete.yaml',
    'canada': '/Users/kempersc/apps/glam/data/instances/canada/canadian_heritage_custodians_geocoded.json',
    'chile': '/Users/kempersc/apps/glam/data/instances/chile/chilean_institutions_batch20_enriched.yaml',
    'egypt': '/Users/kempersc/apps/glam/data/instances/egypt_institutions_ghcid.yaml',
    # Japan dataset is 18MB - handle separately
}

# Destination directory for the unified exports.
# NOTE(review): mkdir runs at import time and uses parents=False, so the
# parent directory must already exist — confirm this is intentional.
OUTPUT_DIR = Path('/Users/kempersc/apps/glam/data/unified')
OUTPUT_DIR.mkdir(exist_ok=True)
|
|
|
|
def parse_repr_string(repr_str: str) -> Optional[Dict[str, Any]]:
    """
    Parse a Python ``repr``-style string into a flat dict of key/value pairs.

    Only strings that look like one of the known LinkML object reprs
    (``Provenance(...)``, ``Identifier(...)``, ``Location(...)``,
    ``GHCIDHistoryEntry(...)``) are parsed; anything else yields ``None``.

    Example input: "Provenance({'data_source': DataSourceEnum(...), ...})"

    Returns:
        Dict of extracted fields, or None when nothing could be parsed.
    """
    known_prefixes = ('Provenance(', 'Identifier(', 'Location(', 'GHCIDHistoryEntry(')
    if not isinstance(repr_str, str):
        return None
    if not repr_str.startswith(known_prefixes):
        return None

    # Three alternatives per field: a quoted string value, a
    # *Enum(text='...') wrapper, or any other bare token (number, etc.).
    field_pattern = r"'(\w+)':\s*(?:'([^']*)'|(\w+Enum)\(text='([^']*)'|([^,}]+))"

    extracted: Dict[str, Any] = {}
    for key, quoted, _enum_name, enum_text, bare in re.findall(field_pattern, repr_str):
        if quoted:
            extracted[key] = quoted
        elif enum_text:
            extracted[key] = enum_text
        elif bare:
            extracted[key] = bare.strip()

    return extracted or None
|
|
|
|
def normalize_value(value: Any) -> Any:
    """
    Reduce *value* to a simple scalar (str, int, float, bool, or None).

    Understands the format quirks of the source datasets:
    - Canada: enums serialized as dicts carrying a 'text' key
    - Denmark: nested objects serialized as Python repr strings
    - single-element wrapping lists
    """
    if value is None:
        return None

    if isinstance(value, dict):
        # Canada enum format: {'text': ..., ...}
        if 'text' in value:
            return value['text']
        # Otherwise fall back to the first non-None member.
        for member in value.values():
            if member is not None:
                return member
        return None

    if isinstance(value, str) and ('Enum(' in value or '({' in value):
        # Denmark repr-string format; fall through untouched if unparseable.
        fields = parse_repr_string(value)
        if fields:
            return (fields.get('identifier_value')
                    or fields.get('data_source')
                    or fields.get('city')
                    or str(value))

    if isinstance(value, list):
        if not value:
            return None
        head = value[0]
        if isinstance(head, str) and ('(' in head or '{' in head):
            fields = parse_repr_string(head)
            if fields:
                return fields.get('identifier_value') or fields.get('city') or str(head)
        elif isinstance(head, dict):
            return normalize_value(head)
        return head

    return value
|
|
|
|
def safe_get(data: Any, *keys: str, default: Any = None) -> Any:
    """
    Safely walk nested dict/list structures, returning *default* on any miss.

    Each key in *keys* descends one level: dicts use ``.get(key)``; for a
    non-empty list the sentinel key ``'0'`` takes the first element (any
    other key leaves the list unchanged). The final value is passed through
    normalize_value() so callers always receive a simple scalar.

    Args:
        data: Root object to traverse (dict, list, or anything else).
        *keys: Path of keys to follow, in order.
        default: Returned when traversal dead-ends or resolves to None.
    """
    result = data
    for key in keys:
        if isinstance(result, dict):
            result = result.get(key)
        elif isinstance(result, list) and result:
            # Lists only support positional access via the sentinel key '0';
            # other keys pass the list through untouched.
            result = result[0] if key == '0' else result
        else:
            # Hit a scalar or empty container before exhausting the keys.
            return default

    if result is None:
        return default

    # result is guaranteed non-None here, so normalize unconditionally.
    # (The previous version re-checked None in a redundant, unreachable
    # ternary: `normalize_value(result) if result is not None else default`.)
    return normalize_value(result)
|
|
|
|
def load_json_dataset(path: str) -> List[Dict[str, Any]]:
    """Load a JSON dataset, always returning a list of records.

    Accepts three layouts: a bare list, a dict with an 'institutions'
    key, or a single record dict (wrapped in a one-element list).
    """
    with open(path, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)

    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict) and 'institutions' in payload:
        return payload['institutions']
    return [payload]
|
|
|
|
def load_yaml_dataset(path: str) -> List[Dict[str, Any]]:
    """Load a YAML dataset, always returning a list of records.

    Mirrors load_json_dataset(): accepts a bare list, a dict with an
    'institutions' key, or a single record dict. PyYAML is imported
    lazily so JSON-only runs do not require it.
    """
    import yaml

    with open(path, 'r', encoding='utf-8') as handle:
        payload = yaml.safe_load(handle)

    if isinstance(payload, list):
        return payload
    if isinstance(payload, dict) and 'institutions' in payload:
        return payload['institutions']
    return [payload]
|
|
|
|
def extract_identifiers(record: Dict[str, Any]) -> tuple[bool, bool]:
    """Report whether *record* carries Wikidata / Website identifiers.

    Works with both the normal structured-dict identifier format and
    Denmark's repr-string format.

    Returns:
        Pair of flags ``(has_wikidata, has_website)``.
    """
    entries = record.get('identifiers', [])
    if not entries:
        return False, False

    wikidata_found = False
    website_found = False

    for entry in entries:
        if isinstance(entry, dict):
            # Normal structured identifier.
            scheme = entry.get('identifier_scheme')
            if scheme == 'Wikidata':
                wikidata_found = True
            if scheme == 'Website':
                website_found = True
        elif isinstance(entry, str):
            # Denmark repr-string identifier: substring match is all we have.
            if 'Wikidata' in entry:
                wikidata_found = True
            if 'Website' in entry or 'identifier_url' in entry:
                website_found = True

    return wikidata_found, website_found
|
|
|
|
def extract_key_metadata(record: Dict[str, Any]) -> Dict[str, Any]:
    """
    Flatten one institution record into the unified metadata schema.

    Tolerates the per-country format variations (Finland plain dicts,
    Denmark repr strings, Canada nested enum dicts) and serializes the
    original record into 'raw_record' for traceability.
    """
    # GHCID may live under several historical field names.
    ghcid = (record.get('ghcid')
             or record.get('ghcid_current')
             or record.get('ghcid_original'))

    # --- Location: use the first entry (or the bare object) ---
    country = None
    city = None
    locations = record.get('locations', [])
    if locations:
        primary = locations[0] if isinstance(locations, list) else locations
        if isinstance(primary, str):
            # Denmark serializes Location objects as repr strings.
            loc_fields = parse_repr_string(primary) or {}
            country = loc_fields.get('country')
            city = loc_fields.get('city')
        else:
            country = safe_get(primary, 'country')
            city = safe_get(primary, 'city')

    # --- Provenance: dict, repr string, or absent ---
    provenance = record.get('provenance', {})
    if isinstance(provenance, str):
        # Denmark serializes Provenance objects as repr strings.
        prov_fields = parse_repr_string(provenance) or {}
        data_source = prov_fields.get('data_source')
        data_tier = prov_fields.get('data_tier')
        extraction_date = prov_fields.get('extraction_date')
    else:
        data_source = safe_get(provenance, 'data_source')
        data_tier = safe_get(provenance, 'data_tier')
        extraction_date = safe_get(provenance, 'extraction_date')

    # Institution type may be a Canada-style enum dict or a plain string.
    institution_type = normalize_value(record.get('institution_type'))

    has_wikidata, has_website = extract_identifiers(record)

    return {
        'id': record.get('id'),
        'ghcid': ghcid,
        'ghcid_uuid': record.get('ghcid_uuid'),
        'ghcid_numeric': record.get('ghcid_numeric'),
        'name': record.get('name'),
        'institution_type': institution_type,
        'country': country,
        'city': city,
        'data_source': data_source,
        'data_tier': data_tier,
        'extraction_date': extraction_date,
        'has_wikidata': has_wikidata,
        'has_website': has_website,
        # Keep the full original record for auditing / reprocessing.
        'raw_record': json.dumps(record, ensure_ascii=False, default=str)
    }
|
|
|
|
def build_unified_database():
    """Build the unified GLAM database from all country datasets.

    Pipeline:
      1. Load every dataset listed in COUNTRY_DATASETS (JSON or YAML).
      2. Flatten each record via extract_key_metadata() and accumulate
         per-country quality statistics.
      3. Detect duplicate GHCIDs (first occurrence wins).
      4. Export everything to JSON and SQLite under OUTPUT_DIR.

    Side effects: reads the dataset files, writes two files into
    OUTPUT_DIR, and prints a progress/statistics report to stdout.
    """

    print("🌍 Building Unified GLAM Heritage Custodian Database (Version 2)")
    print("=" * 70)
    print("Fixes: Denmark parser, Canada parser, SQLite overflow")
    print("=" * 70)

    all_institutions = []
    # Per-country counters; 'by_type' is itself a defaultdict so unseen
    # institution types start at zero.
    country_stats = defaultdict(lambda: {
        'total': 0,
        'with_ghcid': 0,
        'with_wikidata': 0,
        'with_website': 0,
        'by_type': defaultdict(int)
    })

    # Load each country dataset
    for country, path in COUNTRY_DATASETS.items():
        if not Path(path).exists():
            # Missing datasets are skipped, not fatal.
            print(f"⚠️ {country.upper()}: Dataset not found at {path}")
            continue

        print(f"\n📂 Loading {country.upper()}...")

        try:
            # Loader is chosen purely by file extension.
            if path.endswith('.json'):
                records = load_json_dataset(path)
            elif path.endswith('.yaml'):
                records = load_yaml_dataset(path)
            else:
                print(f" ⚠️ Unknown format: {path}")
                continue

            print(f" ✅ Loaded {len(records)} institutions")

            # Process records
            processed = 0
            for record in records:
                try:
                    metadata = extract_key_metadata(record)
                    metadata['source_country'] = country
                    all_institutions.append(metadata)
                    processed += 1

                    # Update stats
                    stats = country_stats[country]
                    stats['total'] += 1
                    if metadata.get('ghcid'):
                        stats['with_ghcid'] += 1
                    if metadata.get('has_wikidata'):
                        stats['with_wikidata'] += 1
                    if metadata.get('has_website'):
                        stats['with_website'] += 1

                    inst_type = metadata.get('institution_type', 'UNKNOWN')
                    stats['by_type'][inst_type] += 1

                except Exception as e:
                    # Best-effort per record: log and keep going.
                    print(f" ⚠️ Error processing record: {e}")
                    continue

            print(f" ✅ Processed {processed}/{len(records)} institutions successfully")

        except Exception as e:
            # One broken dataset must not abort the whole build.
            print(f" ❌ Error loading {country}: {e}")
            import traceback
            traceback.print_exc()
            continue

    print("\n" + "=" * 70)
    print(f"📊 Total institutions loaded: {len(all_institutions)}")

    # Deduplicate by GHCID
    # First occurrence wins; later records with the same GHCID are only
    # reported, never merged. Records without a GHCID are not deduplicated.
    ghcid_map = {}
    duplicates = []

    for inst in all_institutions:
        ghcid = inst.get('ghcid')
        if not ghcid:
            continue

        if ghcid in ghcid_map:
            duplicates.append((ghcid, inst['name'], ghcid_map[ghcid]['name']))
        else:
            ghcid_map[ghcid] = inst

    print(f"🔍 Unique GHCIDs: {len(ghcid_map)}")
    print(f"⚠️ Duplicates detected: {len(duplicates)}")

    # Export to JSON
    # NOTE: the JSON export contains ALL institutions (duplicates included);
    # dedup results are reported in the metadata counters only.
    json_output = OUTPUT_DIR / 'glam_unified_database_v2.json'
    with open(json_output, 'w', encoding='utf-8') as f:
        json.dump({
            'metadata': {
                'version': '2.0.0',
                'export_date': datetime.now(timezone.utc).isoformat(),
                'total_institutions': len(all_institutions),
                'unique_ghcids': len(ghcid_map),
                'duplicates': len(duplicates),
                'countries': list(COUNTRY_DATASETS.keys()),
                'fixes': [
                    'Denmark parser - handles repr string format',
                    'Canada parser - handles nested dict enums',
                    'SQLite overflow - uses TEXT for 64-bit integers'
                ]
            },
            'country_stats': dict(country_stats),
            'institutions': all_institutions
        }, f, indent=2, ensure_ascii=False)

    print(f"✅ JSON export: {json_output} ({json_output.stat().st_size / 1024 / 1024:.1f} MB)")

    # Export to SQLite with fixed schema
    sqlite_output = OUTPUT_DIR / 'glam_unified_database_v2.db'
    conn = sqlite3.connect(sqlite_output)
    cursor = conn.cursor()

    # Create tables with TEXT for ghcid_numeric (fix overflow)
    # ghcid_numeric is stored as TEXT because upstream values apparently
    # overflow SQLite's signed 64-bit INTEGER — TODO confirm actual range.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS institutions (
            id TEXT PRIMARY KEY,
            ghcid TEXT,
            ghcid_uuid TEXT,
            ghcid_numeric TEXT, -- Changed from INTEGER to TEXT (64-bit support)
            name TEXT NOT NULL,
            institution_type TEXT,
            country TEXT,
            city TEXT,
            source_country TEXT,
            data_source TEXT,
            data_tier TEXT,
            extraction_date TEXT,
            has_wikidata BOOLEAN,
            has_website BOOLEAN,
            raw_record TEXT
        )
    ''')

    cursor.execute('''
        CREATE TABLE IF NOT EXISTS metadata (
            key TEXT PRIMARY KEY,
            value TEXT
        )
    ''')

    # Create indexes for common queries
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_country ON institutions(country)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_type ON institutions(institution_type)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_ghcid ON institutions(ghcid)')
    cursor.execute('CREATE INDEX IF NOT EXISTS idx_source_country ON institutions(source_country)')

    # Insert data
    # INSERT OR REPLACE keyed on id: later rows with the same id win.
    for inst in all_institutions:
        cursor.execute('''
            INSERT OR REPLACE INTO institutions VALUES (
                ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
            )
        ''', (
            inst.get('id'),
            inst.get('ghcid'),
            inst.get('ghcid_uuid'),
            str(inst.get('ghcid_numeric')) if inst.get('ghcid_numeric') else None,  # Convert to string
            inst.get('name'),
            inst.get('institution_type'),
            inst.get('country'),
            inst.get('city'),
            inst.get('source_country'),
            inst.get('data_source'),
            inst.get('data_tier'),
            inst.get('extraction_date'),
            inst.get('has_wikidata'),
            inst.get('has_website'),
            inst.get('raw_record')
        ))

    # Insert metadata
    cursor.execute('INSERT OR REPLACE INTO metadata VALUES (?, ?)',
                   ('version', '2.0.0'))
    cursor.execute('INSERT OR REPLACE INTO metadata VALUES (?, ?)',
                   ('export_date', datetime.now(timezone.utc).isoformat()))
    cursor.execute('INSERT OR REPLACE INTO metadata VALUES (?, ?)',
                   ('total_institutions', str(len(all_institutions))))
    cursor.execute('INSERT OR REPLACE INTO metadata VALUES (?, ?)',
                   ('unique_ghcids', str(len(ghcid_map))))

    conn.commit()
    conn.close()

    print(f"✅ SQLite export: {sqlite_output} ({sqlite_output.stat().st_size / 1024:.1f} KB)")

    # Print country statistics
    print("\n" + "=" * 70)
    print("📈 Country Statistics\n")

    # Largest country first.
    for country, stats in sorted(country_stats.items(), key=lambda x: x[1]['total'], reverse=True):
        total = stats['total']
        ghcid_pct = stats['with_ghcid']/total*100 if total > 0 else 0
        wd_pct = stats['with_wikidata']/total*100 if total > 0 else 0
        ws_pct = stats['with_website']/total*100 if total > 0 else 0

        print(f"{country.upper()}:")
        print(f" Total: {total}")
        print(f" GHCID: {stats['with_ghcid']} ({ghcid_pct:.1f}%)")
        print(f" Wikidata: {stats['with_wikidata']} ({wd_pct:.1f}%)")
        print(f" Website: {stats['with_website']} ({ws_pct:.1f}%)")
        print(f" Types: {dict(stats['by_type'])}")
        print()

    # Print duplicates if any (capped at the first 10 for readability)
    if duplicates:
        print("\n⚠️ Duplicate GHCIDs Detected:")
        for ghcid, name1, name2 in duplicates[:10]:
            print(f" {ghcid}: '{name1}' vs '{name2}'")
        if len(duplicates) > 10:
            print(f" ... and {len(duplicates) - 10} more")

    print("\n✅ Unified database build complete!")
    print(f"📂 Output directory: {OUTPUT_DIR}")
    print(f"🎉 Version 2.0.0 with all fixes applied")
|
|
|
|
if __name__ == '__main__':
    # Script entry point: run the full build when executed directly.
    build_unified_database()
|