glam/scripts/extract_isil_from_wikidata.py
2025-12-09 09:16:19 +01:00

248 lines
7.7 KiB
Python

#!/usr/bin/env python3
"""
Extract ISIL codes from Wikidata for Czech institutions.
During Wikidata enrichment, we queried ISIL codes but didn't systematically
extract them. This script queries Wikidata again for institutions that have
Q-numbers but are missing ISIL codes.
Process:
1. Load czech_unified.yaml (8,694 institutions)
2. Filter institutions WITH Wikidata Q-numbers BUT WITHOUT ISIL codes
3. Query Wikidata for ISIL codes (P791 property)
4. Add ISIL identifiers to records
5. Save to czech_unified_isil.yaml
Expected: 306 ISIL codes available in Wikidata (from previous SPARQL query)
"""
import yaml
import requests
from typing import List, Dict, Optional
from datetime import datetime, timezone
# Wikidata SPARQL endpoint
WIKIDATA_SPARQL = "https://query.wikidata.org/sparql"
def get_isil_codes_batch(qids: List[str]) -> Dict[str, str]:
    """
    Query Wikidata for ISIL codes (property P791) for a batch of Q-numbers.

    Args:
        qids: List of Wikidata Q-numbers (e.g., ['Q642884', 'Q1144653']).
              Entries that are not well-formed Q-numbers ('Q' followed by
              digits) are skipped — they would otherwise be injected
              verbatim into the SPARQL VALUES clause and corrupt the query.

    Returns:
        Dict mapping Q-number to ISIL code. If an item carries several
        ISIL values, the last binding returned by the endpoint wins.
        Returns an empty dict on request/parse errors (best-effort:
        the caller treats {} as "no codes found for this batch").
    """
    # Keep only well-formed Q-numbers before building the query.
    valid_qids = [
        qid for qid in qids
        if isinstance(qid, str) and qid.startswith('Q') and qid[1:].isdigit()
    ]
    if not valid_qids:
        return {}
    # Build VALUES clause for SPARQL
    qid_values = " ".join(f"wd:{qid}" for qid in valid_qids)
    query = f"""
    SELECT ?item ?isil WHERE {{
      VALUES ?item {{ {qid_values} }}
      ?item wdt:P791 ?isil .
    }}
    """
    headers = {
        'User-Agent': 'GLAM-Data-Extraction/0.2.0 (ISIL code enrichment)',
        'Accept': 'application/sparql-results+json'
    }
    try:
        response = requests.get(
            WIKIDATA_SPARQL,
            params={'query': query},
            headers=headers,
            timeout=30
        )
        response.raise_for_status()
        data = response.json()
        # Parse results: entity URIs end with .../entity/Q12345,
        # so the Q-number is the last path segment.
        isil_map = {}
        for binding in data['results']['bindings']:
            qid = binding['item']['value'].split('/')[-1]
            isil_code = binding['isil']['value']
            isil_map[qid] = isil_code
        return isil_map
    except (requests.RequestException, ValueError, KeyError) as e:
        # Narrowed catch: network/HTTP errors, JSON decode failures, and
        # unexpected response shape. Anything else is a bug and should
        # surface rather than be silently swallowed.
        print(f"Error querying Wikidata: {e}")
        return {}
def extract_isil_codes():
    """Main extraction workflow.

    Loads the unified Czech institutions dataset, finds records that have a
    Wikidata Q-number but no ISIL identifier, fetches ISIL codes (P791) from
    Wikidata in batches, writes the enriched dataset to a new YAML file, and
    prints coverage statistics.

    Side effects:
        - Reads  data/instances/czech_unified.yaml
        - Writes data/instances/czech_unified_isil.yaml
        - Performs HTTP requests to the Wikidata SPARQL endpoint
    """
    print("="*80)
    print("CZECH INSTITUTIONS - ISIL CODE EXTRACTION FROM WIKIDATA")
    print("="*80)
    print()
    # Load unified dataset
    print("Loading czech_unified.yaml...")
    with open('data/instances/czech_unified.yaml', 'r', encoding='utf-8') as f:
        institutions = yaml.safe_load(f)
    print(f"Loaded {len(institutions)} institutions")
    # Filter institutions with Wikidata Q-numbers but no ISIL codes
    needs_isil = []
    has_isil = 0
    has_wikidata = 0
    for inst in institutions:
        # Check if has Wikidata Q-number (first matching identifier wins)
        wikidata_qid = None
        for identifier in inst.get('identifiers', []):
            if identifier.get('identifier_scheme') == 'Wikidata':
                wikidata_qid = identifier.get('identifier_value')
                has_wikidata += 1
                break
        if not wikidata_qid:
            continue
        # Check if already has ISIL code
        has_isil_code = any(
            i.get('identifier_scheme') == 'ISIL'
            for i in inst.get('identifiers', [])
        )
        if has_isil_code:
            has_isil += 1
        else:
            needs_isil.append({
                'institution': inst,
                'qid': wikidata_qid
            })
    print(f"Institutions with Wikidata Q-numbers: {has_wikidata}")
    print(f"Institutions with ISIL codes (before): {has_isil}")
    print(f"Institutions needing ISIL codes: {len(needs_isil)}")
    print()
    if not needs_isil:
        print("No institutions need ISIL codes. Exiting.")
        return
    # Query Wikidata in batches (50 Q-numbers at a time to avoid URL length limits)
    print("Querying Wikidata for ISIL codes...")
    batch_size = 50
    all_isil_codes = {}
    for i in range(0, len(needs_isil), batch_size):
        batch = needs_isil[i:i+batch_size]
        qids = [item['qid'] for item in batch]
        print(f"  Querying batch {i//batch_size + 1}/{(len(needs_isil)-1)//batch_size + 1} ({len(qids)} Q-numbers)...")
        batch_isil = get_isil_codes_batch(qids)
        all_isil_codes.update(batch_isil)
    print(f"\nFound {len(all_isil_codes)} ISIL codes in Wikidata")
    print()
    # Add ISIL codes to institutions
    added_count = 0
    for item in needs_isil:
        inst = item['institution']
        qid = item['qid']
        if qid in all_isil_codes:
            isil_code = all_isil_codes[qid]
            # Add ISIL identifier
            if 'identifiers' not in inst:
                inst['identifiers'] = []
            inst['identifiers'].append({
                'identifier_scheme': 'ISIL',
                'identifier_value': isil_code,
                # ISIL codes don't have a universal URL
            })
            # Update provenance. setdefault() creates missing containers —
            # previously inst['provenance'] raised KeyError for records
            # without a 'provenance' key.
            provenance = inst.setdefault('provenance', {})
            provenance.setdefault('enrichment_history', []).append({
                'enrichment_date': datetime.now(timezone.utc).isoformat(),
                'enrichment_method': 'Wikidata ISIL code extraction (P791 property)',
                'match_score': 100.0,  # Direct extraction, no fuzzy matching
                'verified': True
            })
            added_count += 1
    print(f"✅ Added {added_count} ISIL codes to institutions")
    print()
    # Statistics
    final_isil_count = has_isil + added_count
    total = len(institutions)
    print("="*80)
    print("ISIL CODE COVERAGE (BEFORE → AFTER)")
    print("="*80)
    print(f"Before: {has_isil:5} / {total} ({has_isil/total*100:5.1f}%)")
    print(f"After:  {final_isil_count:5} / {total} ({final_isil_count/total*100:5.1f}%)")
    print(f"Increase: +{added_count} ISIL codes (+{(final_isil_count-has_isil)/total*100:.1f}% coverage)")
    print()
    # Save enriched dataset
    output_path = 'data/instances/czech_unified_isil.yaml'
    print(f"Saving enriched dataset to {output_path}...")
    with open(output_path, 'w', encoding='utf-8') as f:
        yaml.dump(
            institutions,
            f,
            allow_unicode=True,
            sort_keys=False,
            default_flow_style=False,
            width=100
        )
    print(f"✅ Saved {len(institutions)} institutions")
    print()
    # Sample institutions with ISIL codes (first 20 found)
    print("="*80)
    print("SAMPLE INSTITUTIONS WITH ISIL CODES")
    print("="*80)
    sample_count = 0
    for inst in institutions:
        isil_codes = [
            i.get('identifier_value')
            for i in inst.get('identifiers', [])
            if i.get('identifier_scheme') == 'ISIL'
        ]
        if isil_codes:
            print(f"{inst['name'][:60]:60} | ISIL: {isil_codes[0]}")
            sample_count += 1
            if sample_count >= 20:
                break
    print()
    print("="*80)
    print("EXTRACTION COMPLETE")
    print("="*80)
    # Hoist the GPS count — previously this sum() was computed twice
    # inside a single f-string.
    gps_count = sum(
        1 for i in institutions
        if i.get('locations') and i['locations'][0].get('latitude')
    )
    print(f"Total institutions: {len(institutions)}")
    print(f"With ISIL codes: {final_isil_count} ({final_isil_count/total*100:.1f}%)")
    print(f"With Wikidata Q-numbers: {has_wikidata} ({has_wikidata/total*100:.1f}%)")
    print(f"With GPS coordinates: {gps_count} ({gps_count/total*100:.1f}%)")
    print()
    print("Next step: Replace czech_unified.yaml with czech_unified_isil.yaml")
# Script entry point: run the full extraction workflow when executed directly.
if __name__ == '__main__':
    extract_isil_codes()