- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
839 lines
29 KiB
Python
Executable file
839 lines
29 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Comprehensive Enhancement Pipeline for Tunisian Heritage Institutions
|
|
|
|
This script applies the full enhancement workflow to the validated Tunisia dataset:
|
|
1. GHCID Generation - Generate persistent identifiers using TN country code
|
|
2. Geocoding - Add latitude/longitude coordinates via Nominatim API
|
|
3. Wikidata Enrichment - Add Q-numbers and additional identifiers via SPARQL
|
|
4. Multi-Format Export - Generate RDF, JSON-LD, CSV for different use cases
|
|
|
|
Features:
|
|
- Deterministic UUID generation (UUID v5 and v8)
|
|
- Collision detection and resolution
|
|
- Persistent caching for API results
|
|
- Rate limiting (Nominatim: 1 req/sec)
|
|
- Checkpoint saving for resume capability
|
|
- Comprehensive logging and statistics
|
|
|
|
Usage:
|
|
python scripts/enhance_tunisia_dataset.py [--skip-ghcid] [--skip-geocoding] [--skip-wikidata]
|
|
|
|
Options:
|
|
--skip-ghcid Skip GHCID generation (if already done)
|
|
--skip-geocoding Skip geocoding (if already done)
|
|
--skip-wikidata Skip Wikidata enrichment (if already done)
|
|
--dry-run Show what would be done without modifying data
|
|
--verbose Show detailed progress information
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
import unicodedata
|
|
import uuid
|
|
import yaml
|
|
from collections import Counter, defaultdict
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Set, Tuple
|
|
|
|
import requests
|
|
from SPARQLWrapper import JSON as SPARQL_JSON
|
|
from SPARQLWrapper import SPARQLWrapper
|
|
|
|
# Add src to path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
|
|
|
|
from glam_extractor.identifiers.ghcid import (
|
|
GHCID_NAMESPACE,
|
|
GHCIDComponents,
|
|
GHCIDGenerator,
|
|
InstitutionType,
|
|
extract_abbreviation_from_name,
|
|
)
|
|
from glam_extractor.geocoding.geonames_lookup import GeoNamesDB
|
|
|
|
|
|
# =============================================================================
|
|
# GHCID GENERATION
|
|
# =============================================================================
|
|
|
|
class TunisiaGHCIDGenerator:
    """Generate GHCIDs for Tunisian heritage institutions.

    Wraps the generic ``GHCIDGenerator`` with Tunisia-specific location
    coding (ISO 3166-2:TN governorates, GeoNames city abbreviations) and
    keeps running statistics, including collisions between institutions
    that map to the same base GHCID.
    """

    # Tunisian governorate codes (ISO 3166-2:TN)
    GOVERNORATE_CODES = {
        # Normalize for lookup: UPPERCASE, no accents
        "TUNIS": "TUN",
        "ARIANA": "ARI",
        "BEN AROUS": "BEN",
        "MANOUBA": "MAN",
        "NABEUL": "NAB",
        "ZAGHOUAN": "ZAG",
        "BIZERTE": "BIZ",
        "BEJA": "BEJ",
        "JENDOUBA": "JEN",
        "KEF": "KEF",
        "SILIANA": "SIL",
        "KAIROUAN": "KAI",
        "KASSERINE": "KAS",
        "SIDI BOUZID": "SID",
        "SOUSSE": "SOU",
        "MONASTIR": "MON",
        "MAHDIA": "MAH",
        "SFAX": "SFA",
        "GAFSA": "GAF",
        "TOZEUR": "TOZ",
        "KEBILI": "KEB",
        "GABES": "GAB",
        "MEDENINE": "MED",
        "TATAOUINE": "TAT"
    }

    def __init__(self) -> None:
        self.ghcid_gen = GHCIDGenerator()
        self.geonames_db = GeoNamesDB()
        # Counters surfaced by print_statistics(); 'errors' holds
        # human-readable "<name>: <reason>" strings for skipped records.
        self.stats = {
            'total': 0,
            'generated': 0,
            'missing_city_code': 0,
            'collisions': 0,
            'errors': []
        }
        # Maps base GHCID (without Wikidata Q-suffix) -> institution names
        # that produced it; more than one name per key is a collision.
        self.ghcid_usage: Dict[str, List[str]] = defaultdict(list)

    @staticmethod
    def normalize_name(name: str) -> str:
        """Normalize name: uppercase, remove accents, remove spaces and punctuation."""
        # NFD decomposition splits base characters from combining marks.
        normalized = unicodedata.normalize('NFD', name.upper())
        # Remove accents (combining marks have Unicode category 'Mn')
        no_accents = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
        # Remove spaces and punctuation, keep only alphanumeric
        return ''.join(c for c in no_accents if c.isalnum())

    def get_governorate_code(self, city: str, region: Optional[str]) -> str:
        """
        Get governorate code from city or region name.

        Args:
            city: City name (e.g., "Tunis", "Sfax", "Kairouan")
            region: Region/governorate name (optional)

        Returns:
            3-letter governorate code (e.g., "TUN", "SFA", "KAI"),
            or the national-level fallback "TN0" when nothing matches.
        """
        # Try region first (more specific)
        if region:
            normalized = self.normalize_name(region)
            if normalized in self.GOVERNORATE_CODES:
                return self.GOVERNORATE_CODES[normalized]

        # Try city name
        normalized = self.normalize_name(city)
        if normalized in self.GOVERNORATE_CODES:
            return self.GOVERNORATE_CODES[normalized]

        # Common city-to-governorate mappings
        # (cities that are not themselves governorate names)
        city_mappings = {
            "LA MARSA": "TUN",
            "CARTHAGE": "TUN",
            "SIDI BOU SAID": "TUN",
            "LA GOULETTE": "TUN",
            "HAMMAMET": "NAB",
            "NABEUL": "NAB",
            "KORBA": "NAB",
        }

        # 'normalized' still holds the normalized city name here.
        if normalized in city_mappings:
            return city_mappings[normalized]

        # Fallback to national-level
        return "TN0"

    def get_city_code(self, city: str, country: str = "TN") -> Optional[str]:
        """Get 3-letter city code from GeoNames.

        Returns None only when ``city`` is empty or normalizes to nothing;
        otherwise falls back to the first three normalized letters.
        """
        if not city:
            return None

        # Try GeoNames lookup
        result = self.geonames_db.lookup_city(city, country)
        if result:
            # Use the built-in abbreviation method
            return result.get_abbreviation()

        # Fallback to first 3 letters of normalized city name
        normalized = self.normalize_name(city)
        return normalized[:3] if normalized else None

    def generate_ghcid(self, institution: Dict[str, Any]) -> Optional[Dict[str, str]]:
        """
        Generate GHCID for a Tunisian institution.

        Records statistics and collision usage as a side effect; returns
        None (and logs an error entry) for records with no location or a
        non-Tunisian country code.

        Returns:
            Dict with ghcid, ghcid_uuid, ghcid_uuid_sha256, ghcid_numeric
        """
        self.stats['total'] += 1

        # Extract location (only the first listed location is used)
        locations = institution.get('locations', [])
        if not locations:
            self.stats['errors'].append(f"{institution['name']}: No location")
            return None

        location = locations[0]
        city = location.get('city', '')
        region = location.get('region', '')
        country = location.get('country', 'TN')

        if country != 'TN':
            self.stats['errors'].append(f"{institution['name']}: Not Tunisia ({country})")
            return None

        # Get codes
        gov_code = self.get_governorate_code(city, region)
        city_code = self.get_city_code(city, country)

        if not city_code:
            # get_city_code() only returns None for an empty/unnormalizable
            # city name, so this fallback usually yields "UNK".
            self.stats['missing_city_code'] += 1
            # Use first 3 letters of city name as fallback
            city_code = self.normalize_name(city)[:3] if city else "UNK"

        # Institution type (enum lookup by name; unknown values -> MIXED)
        inst_type_str = institution.get('institution_type', 'MIXED')
        try:
            inst_type = InstitutionType[inst_type_str]
        except KeyError:
            inst_type = InstitutionType.MIXED

        # Get name - prefer full name over acronym for abbreviation extraction
        name = institution.get('name', '')
        alternative_names = institution.get('alternative_names', [])

        # Use original name for abbreviation (better results than from acronyms)
        # Unless the original name is already short (< 15 chars), then use first alternative
        if len(name) < 15 and alternative_names:
            english_name = alternative_names[0]
        else:
            english_name = name

        # Generate GHCID components
        components = self.ghcid_gen.generate(
            institution_name=name,
            english_name=english_name,
            institution_type=inst_type,
            country_code="TN",
            region_code=gov_code,
            city_locode=city_code
        )

        # Check for Wikidata Q-number (for collision resolution)
        wikidata_qid = None
        identifiers = institution.get('identifiers', [])
        for identifier in identifiers:
            if identifier.get('identifier_scheme') == 'Wikidata':
                wikidata_qid = identifier.get('identifier_value', '')
                break

        # Build GHCID string (with Q-number if available)
        base_ghcid = components.to_string()
        if wikidata_qid:
            ghcid_str = f"{base_ghcid}-{wikidata_qid}"
        else:
            ghcid_str = base_ghcid

        # Collision detection (use base GHCID without Q-number)
        self.ghcid_usage[base_ghcid].append(name)

        # Generate all UUID formats from the final GHCID string
        # NOTE(review): only 'ghcid_uuid' hashes the Q-suffixed string;
        # 'ghcid_uuid_sha256' and 'ghcid_numeric' derive from the base
        # components (no Q-number) — confirm this asymmetry is intended.
        ghcid_result = {
            'ghcid': ghcid_str,
            'ghcid_uuid': str(uuid.uuid5(GHCID_NAMESPACE, ghcid_str)),
            'ghcid_uuid_sha256': str(components.to_uuid_sha256()),
            'ghcid_numeric': components.to_numeric()
        }

        if len(self.ghcid_usage[base_ghcid]) > 1:
            self.stats['collisions'] += 1

        self.stats['generated'] += 1
        return ghcid_result

    def print_statistics(self) -> None:
        """Print GHCID generation statistics."""
        print("\n" + "="*80)
        print("GHCID GENERATION STATISTICS")
        print("="*80)
        print(f"Total institutions: {self.stats['total']}")
        print(f"GHCIDs generated: {self.stats['generated']}")
        print(f"Missing city codes: {self.stats['missing_city_code']}")
        print(f"Collisions detected: {self.stats['collisions']}")

        if self.stats['collisions'] > 0:
            print("\nCollisions (multiple institutions with same base GHCID):")
            for ghcid, names in self.ghcid_usage.items():
                if len(names) > 1:
                    print(f" {ghcid}:")
                    for name in names:
                        print(f" - {name}")

        if self.stats['errors']:
            # Only the first ten errors are shown to keep output readable.
            print(f"\nErrors: {len(self.stats['errors'])}")
            for error in self.stats['errors'][:10]:
                print(f" - {error}")
|
|
|
|
|
|
# =============================================================================
|
|
# GEOCODING
|
|
# =============================================================================
|
|
|
|
class GeocodingCache:
    """SQLite-backed persistent cache for geocoding lookups.

    Both successful and failed lookups are recorded (the ``success``
    column distinguishes them), but :meth:`get` only ever returns
    successful hits.
    """

    def __init__(self, cache_file: Path):
        # Ensure the parent directory exists before opening the database.
        self.cache_file = cache_file
        cache_file.parent.mkdir(parents=True, exist_ok=True)
        self.conn = sqlite3.connect(cache_file)
        self._initialize()

    def _initialize(self):
        """Create the cache table if it does not already exist."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS geocoding_cache (
                query TEXT PRIMARY KEY,
                latitude REAL,
                longitude REAL,
                display_name TEXT,
                timestamp TEXT,
                success INTEGER
            )
        """)
        self.conn.commit()

    def get(self, query: str) -> Optional[Dict]:
        """Return the cached coordinates for *query*, or None.

        None is returned both when the query was never cached and when a
        failed lookup was recorded for it.
        """
        row = self.conn.execute(
            "SELECT latitude, longitude, display_name, success FROM geocoding_cache WHERE query = ?",
            (query,)
        ).fetchone()
        if row is None:
            return None
        lat, lon, display, ok = row
        if not ok:  # success flag stored as 0 -> treat as a miss
            return None
        return {
            'latitude': lat,
            'longitude': lon,
            'display_name': display
        }

    def put(self, query: str, result: Optional[Dict]):
        """Record *result* for *query*; a falsy result marks a failed lookup."""
        stamp = datetime.now(timezone.utc).isoformat()
        if result:
            statement = """
                INSERT OR REPLACE INTO geocoding_cache
                (query, latitude, longitude, display_name, timestamp, success)
                VALUES (?, ?, ?, ?, ?, 1)
            """
            params = (
                query,
                result.get('latitude'),
                result.get('longitude'),
                result.get('display_name', ''),
                stamp
            )
        else:
            statement = """
                INSERT OR REPLACE INTO geocoding_cache
                (query, latitude, longitude, display_name, timestamp, success)
                VALUES (?, NULL, NULL, NULL, ?, 0)
            """
            params = (query, stamp)
        self.conn.execute(statement, params)
        self.conn.commit()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()
|
|
|
|
|
|
class TunisiaGeocoder:
    """Geocode Tunisian institutions with Nominatim API.

    Enforces the Nominatim usage policy of one request per second and
    persists results (including failures) in a GeocodingCache.
    """

    NOMINATIM_URL = "https://nominatim.openstreetmap.org/search"
    USER_AGENT = "GLAM-Heritage-Tunisia/1.0 (research project)"
    RATE_LIMIT = 1.0  # 1 request per second (Nominatim usage policy)

    def __init__(self, cache_file: Path):
        self.cache = GeocodingCache(cache_file)
        # Wall-clock time of the previous API call; 0 means "never called",
        # so the first request is never delayed.
        self.last_request_time = 0
        self.stats = {
            'total': 0,
            'already_geocoded': 0,
            'cache_hits': 0,
            'api_calls': 0,
            'successful': 0,
            'failed': 0
        }

    def _wait_for_rate_limit(self):
        """Enforce 1 req/sec rate limit."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.RATE_LIMIT:
            time.sleep(self.RATE_LIMIT - elapsed)
        self.last_request_time = time.time()

    def _build_query(self, city: str, country: str = "Tunisia") -> str:
        """Build Nominatim search query (free-form "city, country")."""
        return f"{city}, {country}"

    def _call_nominatim(self, query: str) -> Optional[Dict]:
        """Call Nominatim API.

        Returns a dict with latitude/longitude/display_name for the top
        match, or None on no match or any request/parse error.
        """
        params = {
            'q': query,
            'format': 'json',
            'limit': 1,
            # Restrict matches to Tunisia regardless of the free-form query.
            'countrycodes': 'tn'
        }
        headers = {'User-Agent': self.USER_AGENT}

        try:
            self._wait_for_rate_limit()
            response = requests.get(self.NOMINATIM_URL, params=params, headers=headers, timeout=10)
            response.raise_for_status()

            results = response.json()
            if results and len(results) > 0:
                result = results[0]
                # Nominatim returns lat/lon as strings; convert to float.
                return {
                    'latitude': float(result['lat']),
                    'longitude': float(result['lon']),
                    'display_name': result.get('display_name', '')
                }
            return None

        except Exception as e:
            # Best-effort: report the error and treat it as "no result".
            print(f"\n ❌ Nominatim error: {e}")
            return None

    def geocode_institution(self, institution: Dict[str, Any]) -> bool:
        """
        Geocode an institution's location (first listed location only).

        Mutates the location dict in place, adding latitude/longitude.

        Returns:
            True if geocoded (or already had coords), False otherwise
        """
        self.stats['total'] += 1

        locations = institution.get('locations', [])
        if not locations:
            return False

        location = locations[0]

        # Skip if already geocoded
        if location.get('latitude') is not None and location.get('longitude') is not None:
            self.stats['already_geocoded'] += 1
            return True

        city = location.get('city', '')
        if not city:
            return False

        # Build query
        query = self._build_query(city)

        # Check cache
        # NOTE(review): GeocodingCache.get() returns None both for queries
        # never attempted and for recorded failures (success=0), so failed
        # lookups are retried against the API on every run — confirm whether
        # negative caching should short-circuit here instead.
        cached = self.cache.get(query)
        if cached:
            self.stats['cache_hits'] += 1
            location['latitude'] = cached['latitude']
            location['longitude'] = cached['longitude']
            self.stats['successful'] += 1
            return True

        # Call API
        self.stats['api_calls'] += 1
        result = self._call_nominatim(query)

        # Store in cache (failures are stored too, with success=0)
        self.cache.put(query, result)

        if result:
            location['latitude'] = result['latitude']
            location['longitude'] = result['longitude']
            self.stats['successful'] += 1
            return True
        else:
            self.stats['failed'] += 1
            return False

    def print_statistics(self):
        """Print geocoding statistics."""
        print("\n" + "="*80)
        print("GEOCODING STATISTICS")
        print("="*80)
        print(f"Total institutions: {self.stats['total']}")
        print(f"Already geocoded: {self.stats['already_geocoded']}")
        print(f"Cache hits: {self.stats['cache_hits']}")
        print(f"API calls: {self.stats['api_calls']}")
        print(f"Successful: {self.stats['successful']}")
        print(f"Failed: {self.stats['failed']}")
|
|
|
|
|
|
# =============================================================================
|
|
# WIKIDATA ENRICHMENT
|
|
# =============================================================================
|
|
|
|
class WikidataEnricher:
    """Enrich institutions with Wikidata data via SPARQL.

    Searches the public Wikidata Query Service for each institution by
    exact English label (restricted to heritage-institution types) and
    merges the discovered identifiers, founding date, and coordinates
    into the institution record in place.
    """

    SPARQL_ENDPOINT = "https://query.wikidata.org/sparql"
    USER_AGENT = "GLAM-Heritage-Tunisia/1.0"

    def __init__(self):
        self.sparql = SPARQLWrapper(self.SPARQL_ENDPOINT)
        self.sparql.setReturnFormat(SPARQL_JSON)
        self.sparql.addCustomHttpHeader("User-Agent", self.USER_AGENT)
        # Running counters reported by print_statistics().
        self.stats = {
            'total': 0,
            'already_enriched': 0,
            'searched': 0,
            'found': 0,
            'enriched': 0,
            'failed': 0
        }

    def search_wikidata(self, name: str, city: str) -> Optional[Dict[str, Any]]:
        """
        Search Wikidata for an institution by exact English label.

        Args:
            name: Institution name (matched against rdfs:label@en)
            city: City name (currently unused; kept for future
                disambiguation of same-named institutions)

        Returns:
            Dict with qid, name, description, identifiers (VIAF/ISIL/
            Website), and optionally founded_date / latitude / longitude;
            None when no match is found or the query fails.
        """
        # FIX: escape backslashes and double quotes so arbitrary
        # institution names cannot break out of the SPARQL string literal.
        safe_name = name.replace('\\', '\\\\').replace('"', '\\"')

        query = f"""
        SELECT DISTINCT ?item ?itemLabel ?itemDescription ?viaf ?isil ?website ?coords ?inception
        WHERE {{
          ?item rdfs:label "{safe_name}"@en .
          ?item wdt:P31/wdt:P279* ?type .
          VALUES ?type {{
            wd:Q33506 wd:Q7075 wd:Q166118 wd:Q1030034 wd:Q473972
            wd:Q570116 wd:Q22687 wd:Q28564
          }}

          OPTIONAL {{ ?item wdt:P214 ?viaf . }}
          OPTIONAL {{ ?item wdt:P791 ?isil . }}
          OPTIONAL {{ ?item wdt:P856 ?website . }}
          OPTIONAL {{ ?item wdt:P625 ?coords . }}
          OPTIONAL {{ ?item wdt:P571 ?inception . }}

          SERVICE wikibase:label {{ bd:serviceParam wikibase:language "en,fr,ar" . }}
        }}
        LIMIT 1
        """

        self.sparql.setQuery(query)

        try:
            time.sleep(0.5)  # Courtesy rate limiting for the public endpoint
            raw_results = self.sparql.query().convert()
            bindings = raw_results.get("results", {}).get("bindings", [])

            if not bindings:
                return None

            binding = bindings[0]
            item_uri = binding.get("item", {}).get("value", "")
            # The entity URI ends in the Q-number, e.g. .../entity/Q33506
            qid = item_uri.split("/")[-1] if item_uri else None

            if not qid or not qid.startswith("Q"):
                return None

            result = {
                "qid": qid,
                "name": binding.get("itemLabel", {}).get("value", ""),
                "description": binding.get("itemDescription", {}).get("value", ""),
                "identifiers": {}
            }

            if "viaf" in binding:
                result["identifiers"]["VIAF"] = binding["viaf"]["value"]

            if "isil" in binding:
                result["identifiers"]["ISIL"] = binding["isil"]["value"]

            if "website" in binding:
                result["identifiers"]["Website"] = binding["website"]["value"]

            if "inception" in binding:
                # Keep only the date part of the xsd:dateTime literal.
                result["founded_date"] = binding["inception"]["value"].split("T")[0]

            if "coords" in binding:
                coords_str = binding["coords"]["value"]
                if coords_str.startswith("Point("):
                    # WKT point order is "Point(lon lat)".
                    lon, lat = coords_str[6:-1].split()
                    result["latitude"] = float(lat)
                    result["longitude"] = float(lon)

            return result

        except Exception as e:
            # Best-effort: report and treat any endpoint/parse error as "not found".
            print(f"\n ❌ Wikidata error: {e}")
            return None

    def enrich_institution(self, institution: Dict[str, Any]) -> bool:
        """
        Enrich institution with Wikidata data (mutated in place).

        Adds Wikidata/VIAF/ISIL/Website identifiers, and fills in
        founded_date and coordinates when the record lacks them.

        Returns:
            True if enriched, False otherwise
        """
        self.stats['total'] += 1

        # Check if already has Wikidata ID.
        # FIX: use setdefault so identifiers appended below are stored on the
        # institution even when the 'identifiers' key was missing (a plain
        # .get(..., []) silently discarded them).
        identifiers = institution.setdefault('identifiers', [])
        existing_schemes = {i.get('identifier_scheme') for i in identifiers}

        if 'Wikidata' in existing_schemes:
            self.stats['already_enriched'] += 1
            return False

        # Extract search parameters
        name = institution.get('name', '')
        locations = institution.get('locations', [])
        city = locations[0].get('city', '') if locations else ''

        if not name:
            return False

        # Search Wikidata
        self.stats['searched'] += 1
        wd_data = self.search_wikidata(name, city)

        if not wd_data:
            self.stats['failed'] += 1
            return False

        self.stats['found'] += 1

        # Add Wikidata identifier
        identifiers.append({
            'identifier_scheme': 'Wikidata',
            'identifier_value': wd_data['qid'],
            'identifier_url': f"https://www.wikidata.org/wiki/{wd_data['qid']}"
        })

        # Add other identifiers (only schemes not already present)
        for scheme, value in wd_data.get('identifiers', {}).items():
            if scheme not in existing_schemes:
                id_obj = {
                    'identifier_scheme': scheme,
                    'identifier_value': value
                }

                if scheme == 'VIAF':
                    id_obj['identifier_url'] = f"https://viaf.org/viaf/{value}"
                elif scheme == 'Website':
                    id_obj['identifier_url'] = value

                identifiers.append(id_obj)

        # Add founding date if missing
        if 'founded_date' in wd_data and not institution.get('founded_date'):
            institution['founded_date'] = wd_data['founded_date']

        # Add coordinates if missing (first location only)
        if 'latitude' in wd_data and locations:
            location = locations[0]
            if location.get('latitude') is None:
                location['latitude'] = wd_data['latitude']
                location['longitude'] = wd_data['longitude']

        # Update provenance.
        # FIX: setdefault so the provenance update is stored even when the
        # 'provenance' key was missing from the record.
        prov = institution.setdefault('provenance', {})
        existing_method = prov.get('extraction_method', '')
        prov['extraction_method'] = f"{existing_method} + Wikidata enrichment"

        self.stats['enriched'] += 1
        return True

    def print_statistics(self):
        """Print enrichment statistics."""
        print("\n" + "="*80)
        print("WIKIDATA ENRICHMENT STATISTICS")
        print("="*80)
        print(f"Total institutions: {self.stats['total']}")
        print(f"Already enriched: {self.stats['already_enriched']}")
        print(f"Searched: {self.stats['searched']}")
        print(f"Found: {self.stats['found']}")
        print(f"Enriched: {self.stats['enriched']}")
        print(f"Failed: {self.stats['failed']}")
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN PIPELINE
|
|
# =============================================================================
|
|
|
|
def load_institutions(yaml_file: Path) -> List[Dict[str, Any]]:
    """Read the institution records stored in *yaml_file*.

    Accepts either a bare top-level list of records or a mapping with an
    ``institutions`` key (the format written by save_institutions).

    Raises:
        ValueError: if the file has neither of the accepted shapes.
    """
    with open(yaml_file, 'r', encoding='utf-8') as handle:
        parsed = yaml.safe_load(handle)

    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict) and 'institutions' in parsed:
        return parsed['institutions']
    raise ValueError(f"Unexpected YAML structure in {yaml_file}")
|
|
|
|
|
|
def save_institutions(institutions: List[Dict[str, Any]], yaml_file: Path):
    """Write *institutions* to *yaml_file* under an ``institutions`` key.

    A ``_metadata`` header describing the enhancement run (timestamp,
    record count, schema version, applied steps) is prepended so
    downstream consumers can tell how the file was produced.
    """
    header = {
        'title': 'Tunisian Heritage Institutions - Enhanced Dataset',
        'description': 'Validated and enriched dataset with GHCIDs, geocoding, and Wikidata links',
        'generated': datetime.now(timezone.utc).isoformat(),
        'count': len(institutions),
        'schema_version': '0.2.1',
        'enhancements': [
            'GHCID generation with UUID v5/v8',
            'Geocoding via Nominatim API',
            'Wikidata enrichment via SPARQL'
        ]
    }
    payload = {
        '_metadata': header,
        'institutions': institutions
    }

    # sort_keys=False keeps the metadata header first in the output.
    with open(yaml_file, 'w', encoding='utf-8') as handle:
        yaml.dump(payload, handle, default_flow_style=False, allow_unicode=True, sort_keys=False, width=120)

    print(f"\n✅ Saved {len(institutions)} institutions to {yaml_file}")
|
|
|
|
|
|
def main():
    """Run the enhancement pipeline: GHCID -> geocoding -> Wikidata -> save.

    Each step can be skipped individually via CLI flags; --dry-run walks
    through the dataset without mutating or saving anything.
    """
    parser = argparse.ArgumentParser(description="Enhance Tunisia heritage dataset")
    parser.add_argument('--skip-ghcid', action='store_true', help="Skip GHCID generation")
    parser.add_argument('--skip-geocoding', action='store_true', help="Skip geocoding")
    parser.add_argument('--skip-wikidata', action='store_true', help="Skip Wikidata enrichment")
    parser.add_argument('--dry-run', action='store_true', help="Show what would be done")
    parser.add_argument('--verbose', action='store_true', help="Verbose output")
    args = parser.parse_args()

    # Paths (relative to the repository root, one level above scripts/)
    base_dir = Path(__file__).parent.parent
    input_file = base_dir / "data" / "instances" / "tunisia" / "tunisian_institutions.yaml"
    output_file = base_dir / "data" / "instances" / "tunisia" / "tunisian_institutions_enhanced.yaml"
    cache_dir = base_dir / "data" / "cache"
    cache_dir.mkdir(parents=True, exist_ok=True)

    print("="*80)
    print("TUNISIA HERITAGE INSTITUTIONS - ENHANCEMENT PIPELINE")
    print("="*80)
    print(f"Input: {input_file}")
    print(f"Output: {output_file}")

    if args.dry_run:
        print("\n⚠️ DRY RUN MODE - No changes will be made\n")

    # Load data
    print(f"\n📖 Loading institutions...")
    institutions = load_institutions(input_file)
    print(f" Loaded {len(institutions)} institutions")

    # Step 1: GHCID Generation (mutates each record in place)
    if not args.skip_ghcid:
        print("\n" + "="*80)
        print("STEP 1: GHCID GENERATION")
        print("="*80)

        ghcid_gen = TunisiaGHCIDGenerator()

        for i, inst in enumerate(institutions, 1):
            if args.verbose:
                print(f"\n[{i}/{len(institutions)}] {inst.get('name', 'Unknown')}")

            if not args.dry_run:
                ghcid_result = ghcid_gen.generate_ghcid(inst)
                if ghcid_result:
                    inst['ghcid'] = ghcid_result['ghcid']
                    inst['ghcid_uuid'] = ghcid_result['ghcid_uuid']
                    inst['ghcid_uuid_sha256'] = ghcid_result['ghcid_uuid_sha256']
                    inst['ghcid_numeric'] = ghcid_result['ghcid_numeric']

                    if args.verbose:
                        print(f" GHCID: {ghcid_result['ghcid']}")
                        print(f" UUID: {ghcid_result['ghcid_uuid']}")

        ghcid_gen.print_statistics()

    # Step 2: Geocoding (rate-limited Nominatim calls, cached on disk)
    if not args.skip_geocoding:
        print("\n" + "="*80)
        print("STEP 2: GEOCODING")
        print("="*80)

        cache_file = cache_dir / "tunisia_geocoding.db"
        geocoder = TunisiaGeocoder(cache_file)

        for i, inst in enumerate(institutions, 1):
            if args.verbose:
                print(f"\n[{i}/{len(institutions)}] {inst.get('name', 'Unknown')}")

            if not args.dry_run:
                success = geocoder.geocode_institution(inst)
                if args.verbose and success:
                    locations = inst.get('locations', [])
                    if locations and locations[0].get('latitude'):
                        print(f" Coordinates: {locations[0]['latitude']:.4f}, {locations[0]['longitude']:.4f}")

        geocoder.print_statistics()
        geocoder.cache.close()

    # Step 3: Wikidata Enrichment (SPARQL queries; slowest step)
    if not args.skip_wikidata:
        print("\n" + "="*80)
        print("STEP 3: WIKIDATA ENRICHMENT")
        print("="*80)
        print("⚠️ This step searches Wikidata and may take several minutes...")

        enricher = WikidataEnricher()

        for i, inst in enumerate(institutions, 1):
            if args.verbose:
                print(f"\n[{i}/{len(institutions)}] {inst.get('name', 'Unknown')}")

            if not args.dry_run:
                enriched = enricher.enrich_institution(inst)
                if args.verbose and enriched:
                    # Find Wikidata ID that enrichment just added
                    for identifier in inst.get('identifiers', []):
                        if identifier.get('identifier_scheme') == 'Wikidata':
                            print(f" ✅ Wikidata: {identifier['identifier_value']}")
                            break

        enricher.print_statistics()

    # Save results (a new *_enhanced.yaml; the input file is never modified)
    if not args.dry_run:
        print("\n" + "="*80)
        print("SAVING ENHANCED DATASET")
        print("="*80)
        save_institutions(institutions, output_file)

        print("\n" + "="*80)
        print("✅ ENHANCEMENT COMPLETE")
        print("="*80)
        print(f"\nEnhanced dataset: {output_file}")
        print("\nNext steps:")
        print(" 1. Review enhancements and validate results")
        print(" 2. Export to RDF/JSON-LD: python scripts/export_tunisia_formats.py")
        print(" 3. Integrate into global GLAM dataset")
    else:
        print("\n✅ Dry run complete - no changes made")
|
|
|
|
|
|
# Script entry point: run the pipeline only when executed directly.
if __name__ == "__main__":
    main()
|