feat(loaders): update DuckLake and TypeDB loaders with relation support

This commit is contained in:
kempersc 2025-12-08 15:00:14 +01:00
parent 486bbee813
commit 0938cce6cf
2 changed files with 487 additions and 2 deletions

View file

@ -392,6 +392,8 @@ def extract_top_level_fields(data: dict) -> dict:
if cn:
record["custodian_name"] = cn.get("claim_value", "")
record["custodian_name_confidence"] = cn.get("confidence")
record["emic_name"] = cn.get("emic_name", "")
record["name_language"] = cn.get("name_language", "")
# Store complex objects as JSON
if data.get("wikidata_enrichment"):
@ -644,8 +646,8 @@ def main():
# Show sample record
print("\nSample record (first):")
sample = records[0]
for key in ["file_name", "ghcid_current", "custodian_name", "city", "country",
"google_rating", "youtube_channel_id"]:
for key in ["file_name", "ghcid_current", "custodian_name", "emic_name", "name_language",
"city", "country", "google_rating", "youtube_channel_id"]:
value = sample.get(key, 'N/A')
if value == "" or value is None:
value = "(empty)"
@ -655,10 +657,12 @@ def main():
yt_count = sum(1 for r in records if r.get("youtube_channel_id"))
gm_count = sum(1 for r in records if r.get("google_place_id"))
coord_count = sum(1 for r in records if r.get("latitude") is not None)
emic_count = sum(1 for r in records if r.get("emic_name"))
print(f"\nEnrichment summary:")
print(f" With coordinates: {coord_count}/{len(records)}")
print(f" With Google Maps: {gm_count}/{len(records)}")
print(f" With YouTube: {yt_count}/{len(records)}")
print(f" With emic_name: {emic_count}/{len(records)}")
if args.dry_run:
print("\n[DRY RUN] Would upload to DuckLake. Exiting without upload.")

View file

@ -0,0 +1,481 @@
#!/usr/bin/env python3
"""
Load NDE institution data into TypeDB with proper relations (edges)
This script creates:
1. institution entities with their attributes
2. location entities with geographic data
3. identifier entities for GHCID, Wikidata, etc.
4. RELATIONS connecting these entities (for graph visualization)
Key relations created:
- located_at: institution location
- has_identifier: institution identifier
- affiliated_with: institution provincial_archive (genealogiewerkbalk data)
Requires: typedb-driver>=2.28.0,<3.0.0
Usage: python scripts/load_typedb_data_with_relations.py
"""
import json
import sys
from pathlib import Path
from typedb.driver import TypeDB, SessionType, TransactionType
# Configuration
TYPEDB_HOST = "localhost"
TYPEDB_PORT = 1729  # TypeDB core server default port
DATABASE = "glam"
# Path to NDE data (this script lives in scripts/, so parent.parent is the repo root)
DATA_FILE = Path(__file__).parent.parent / "frontend/public/data/nde_institutions.json"
def escape_string(s: object) -> str:
    """Escape a value for safe embedding in a double-quoted TypeQL string.

    Backslashes are doubled first, then double quotes are escaped, so the
    result can be interpolated between `"..."` in a TypeQL query.

    Args:
        s: any value; non-strings are converted with ``str()``.
           ``None`` yields the empty string.

    Returns:
        The escaped string (possibly empty).
    """
    if s is None:
        return ""
    # Order matters: escape backslashes before adding new ones for quotes.
    return str(s).replace("\\", "\\\\").replace('"', '\\"')
def define_schema(driver):
    """Define the TypeDB schema: entity, attribute, and relation types.

    The relation types (located_at, has_identifier, affiliated_with,
    same_province) are what produce edges in the graph visualization.

    Args:
        driver: an open TypeDB driver connected to the server.
    """
    print("Defining schema with relations...")
    # NOTE(review): TypeQL 2.x syntax ("sub entity", "plays rel:role") —
    # matches the typedb-driver<3.0.0 pin in the module docstring.
    schema = """
define
# === ENTITY TYPES ===
institution sub entity,
owns name,
owns institution_type,
owns website,
owns description,
owns founding_year,
owns rating,
owns phone,
owns address,
plays located_at:subject,
plays has_identifier:subject,
plays affiliated_with:member,
plays same_province:institution;
location sub entity,
owns city,
owns province,
owns country,
owns latitude,
owns longitude,
plays located_at:place,
plays same_province:location;
identifier sub entity,
owns identifier_scheme,
owns identifier_value,
owns identifier_url,
plays has_identifier:identifier;
archive sub entity,
owns name,
owns website,
owns isil_code,
plays affiliated_with:archive;
# === ATTRIBUTE TYPES ===
name sub attribute, value string;
institution_type sub attribute, value string;
website sub attribute, value string;
description sub attribute, value string;
founding_year sub attribute, value long;
rating sub attribute, value double;
phone sub attribute, value string;
address sub attribute, value string;
city sub attribute, value string;
province sub attribute, value string;
country sub attribute, value string;
latitude sub attribute, value double;
longitude sub attribute, value double;
identifier_scheme sub attribute, value string;
identifier_value sub attribute, value string;
identifier_url sub attribute, value string;
isil_code sub attribute, value string;
# === RELATION TYPES (These create edges in the graph!) ===
# Institution is located at a place
located_at sub relation,
relates subject,
relates place;
# Institution has an identifier
has_identifier sub relation,
relates subject,
relates identifier;
# Institution is affiliated with an archive
affiliated_with sub relation,
relates member,
relates archive;
# Institutions in the same province (inferred relation for graph connectivity)
same_province sub relation,
relates institution,
relates location;
"""
    # Schema changes require a SCHEMA session and an explicit commit.
    with driver.session(DATABASE, SessionType.SCHEMA) as session:
        with session.transaction(TransactionType.WRITE) as tx:
            tx.query.define(schema)
            tx.commit()
    print("✅ Schema defined with relations")
def clear_data(driver):
    """Best-effort wipe of all previously loaded relations and entities.

    Relations are deleted before the entities that play roles in them
    (TypeDB requires this order). Each delete is attempted independently
    so that a type missing from a fresh database does not abort the wipe.

    Args:
        driver: an open TypeDB driver connected to the server.
    """
    print("Clearing existing data...")
    # Delete relations first (TypeDB requires this order), then entities.
    relation_types = ("located_at", "has_identifier", "affiliated_with")
    entity_types = ("institution", "location", "identifier", "archive")
    with driver.session(DATABASE, SessionType.DATA) as session:
        with session.transaction(TransactionType.WRITE) as tx:
            for type_name in relation_types:
                try:
                    tx.query.delete(f"match $r isa {type_name}; delete $r isa {type_name};")
                except Exception:
                    # Deliberate best-effort: the type may not exist yet.
                    pass
            for type_name in entity_types:
                try:
                    tx.query.delete(f"match $x isa {type_name}; delete $x isa {type_name};")
                except Exception:
                    pass
            tx.commit()
    print("✅ Data cleared")
def load_data(driver, institutions: list):
    """Insert institutions plus related location/identifier/archive entities.

    For each institution that has a GHCID this creates:
      1. an `institution` entity with its scalar attributes,
      2. a shared `location` entity (deduplicated per "province:city") plus
         a `located_at` relation,
      3. `identifier` entities (GHCID, optionally Wikidata) with
         `has_identifier` relations,
      4. a shared `archive` entity plus an `affiliated_with` relation
         (from the genealogiewerkbalk data).

    Args:
        driver: an open TypeDB driver connected to the server.
        institutions: list of institution dicts from the NDE JSON export.

    Returns:
        Tuple ``(inserted, relations_created, errors)``.

    NOTE(review): relation inserts match the institution by `name`, so
    duplicate institution names would cross-link relations — confirm names
    are unique in the source data.
    """
    batch_size = 25  # Smaller batches: each institution issues several queries
    total = len(institutions)
    inserted = 0
    relations_created = 0
    errors = 0
    # Dedupe caches so shared locations/archives are inserted only once.
    locations_created = set()
    archives_created = set()
    with driver.session(DATABASE, SessionType.DATA) as session:
        for i in range(0, total, batch_size):
            batch = institutions[i:i + batch_size]
            # One write transaction per batch; committed after the loop below.
            with session.transaction(TransactionType.WRITE) as tx:
                for inst in batch:
                    try:
                        # Extract scalar fields (strings escaped for TypeQL).
                        name = escape_string(inst.get("name", "Unknown"))
                        inst_type = escape_string(inst.get("type_name", "Unknown"))
                        website = escape_string(inst.get("website", ""))
                        description = escape_string(inst.get("description", ""))
                        founding_year = inst.get("founding_year")
                        rating = inst.get("rating")
                        phone = escape_string(inst.get("phone", ""))
                        address_str = escape_string(inst.get("address", ""))
                        city = escape_string(inst.get("city", ""))
                        province = escape_string(inst.get("province", ""))
                        lat = inst.get("lat")
                        lon = inst.get("lon")
                        # GHCID is the unique key; skip entries without one
                        # (we cannot create unique references for them).
                        ghcid = ""
                        ghcid_data = inst.get("ghcid", {})
                        if ghcid_data:
                            ghcid = escape_string(ghcid_data.get("current", ""))
                        if not ghcid:
                            continue
                        # Wikidata ID (optional)
                        wikidata_id = escape_string(inst.get("wikidata_id", ""))
                        # === 1. INSERT INSTITUTION ===
                        # Optional attributes are appended only when present.
                        inst_parts = [f'insert $inst isa institution, has name "{name}"']
                        if inst_type:
                            inst_parts.append(f', has institution_type "{inst_type}"')
                        if website:
                            inst_parts.append(f', has website "{website}"')
                        if description:
                            inst_parts.append(f', has description "{description}"')
                        if founding_year:
                            inst_parts.append(f', has founding_year {int(founding_year)}')
                        if rating:
                            inst_parts.append(f', has rating {float(rating)}')
                        if phone:
                            inst_parts.append(f', has phone "{phone}"')
                        if address_str:
                            inst_parts.append(f', has address "{address_str}"')
                        inst_query = "".join(inst_parts) + ";"
                        tx.query.insert(inst_query)
                        inserted += 1
                        # === 2. INSERT LOCATION & CREATE RELATION ===
                        if lat is not None and lon is not None:
                            location_key = f"{province}:{city}" if city else f"{province}:unknown"
                            if location_key not in locations_created:
                                loc_parts = [f'insert $loc isa location']
                                loc_parts.append(f', has latitude {lat}')
                                loc_parts.append(f', has longitude {lon}')
                                loc_parts.append(', has country "NL"')
                                if city:
                                    loc_parts.append(f', has city "{city}"')
                                if province:
                                    loc_parts.append(f', has province "{province}"')
                                loc_query = "".join(loc_parts) + ";"
                                tx.query.insert(loc_query)
                                locations_created.add(location_key)
                            # Create located_at relation: match the institution
                            # by name and the location by its coordinates.
                            rel_query = f'''
                                match
                                    $inst isa institution, has name "{name}";
                                    $loc isa location, has latitude {lat}, has longitude {lon};
                                insert
                                    (subject: $inst, place: $loc) isa located_at;
                            '''
                            tx.query.insert(rel_query)
                            relations_created += 1
                        # === 3. INSERT IDENTIFIERS & CREATE RELATIONS ===
                        # GHCID identifier (ghcid is guaranteed non-empty by
                        # the `continue` guard above, so no extra check needed).
                        ghcid_query = f'''
                            insert $id isa identifier,
                                has identifier_scheme "GHCID",
                                has identifier_value "{ghcid}",
                                has identifier_url "";
                        '''
                        tx.query.insert(ghcid_query)
                        # Create has_identifier relation
                        id_rel_query = f'''
                            match
                                $inst isa institution, has name "{name}";
                                $id isa identifier, has identifier_value "{ghcid}";
                            insert
                                (subject: $inst, identifier: $id) isa has_identifier;
                        '''
                        tx.query.insert(id_rel_query)
                        relations_created += 1
                        # Wikidata identifier (optional)
                        if wikidata_id:
                            wikidata_query = f'''
                                insert $id isa identifier,
                                    has identifier_scheme "Wikidata",
                                    has identifier_value "{wikidata_id}",
                                    has identifier_url "https://www.wikidata.org/wiki/{wikidata_id}";
                            '''
                            tx.query.insert(wikidata_query)
                            # Create has_identifier relation
                            wd_rel_query = f'''
                                match
                                    $inst isa institution, has name "{name}";
                                    $id isa identifier, has identifier_value "{wikidata_id}";
                                insert
                                    (subject: $inst, identifier: $id) isa has_identifier;
                            '''
                            tx.query.insert(wd_rel_query)
                            relations_created += 1
                        # === 4. INSERT ARCHIVE AFFILIATIONS ===
                        genealogie = inst.get("genealogiewerkbalk", {})
                        if genealogie:
                            prov_archive = genealogie.get("provincial_archive", {})
                            if prov_archive:
                                archive_name = escape_string(prov_archive.get("name", ""))
                                archive_website = escape_string(prov_archive.get("website", ""))
                                if archive_name and archive_name not in archives_created:
                                    # Insert the archive entity once per name.
                                    archive_query = f'''
                                        insert $arch isa archive,
                                            has name "{archive_name}",
                                            has website "{archive_website}";
                                    '''
                                    tx.query.insert(archive_query)
                                    archives_created.add(archive_name)
                                if archive_name:
                                    # Create affiliated_with relation
                                    aff_query = f'''
                                        match
                                            $inst isa institution, has name "{name}";
                                            $arch isa archive, has name "{archive_name}";
                                        insert
                                            (member: $inst, archive: $arch) isa affiliated_with;
                                    '''
                                    tx.query.insert(aff_query)
                                    relations_created += 1
                    except Exception as e:
                        errors += 1
                        if errors <= 5:  # cap error spam on the console
                            print(f" ❌ Error inserting {inst.get('name', 'unknown')}: {e}")
                tx.commit()
            # Progress report after each committed batch.
            pct = (i + len(batch)) / total * 100
            print(f" Progress: {i + len(batch)}/{total} ({pct:.1f}%) - {inserted} institutions, {relations_created} relations, {errors} errors")
    return inserted, relations_created, errors
def verify_data(driver):
    """Print entity/relation counts and a small sample graph traversal.

    Read-only: opens a READ transaction and never mutates the database.

    Args:
        driver: an open TypeDB driver connected to the server.
    """
    print("\n🔍 Verifying loaded data...")
    with driver.session(DATABASE, SessionType.DATA) as session:
        with session.transaction(TransactionType.READ) as tx:
            # Count entities by exhausting each get-query result iterator.
            inst_count = sum(1 for _ in tx.query.get("match $x isa institution; get $x;"))
            loc_count = sum(1 for _ in tx.query.get("match $x isa location; get $x;"))
            id_count = sum(1 for _ in tx.query.get("match $x isa identifier; get $x;"))
            arch_count = sum(1 for _ in tx.query.get("match $x isa archive; get $x;"))
            # Count relations — these are the edges in the graph view.
            located_count = sum(1 for _ in tx.query.get("match $r isa located_at; get $r;"))
            has_id_count = sum(1 for _ in tx.query.get("match $r isa has_identifier; get $r;"))
            aff_count = sum(1 for _ in tx.query.get("match $r isa affiliated_with; get $r;"))
            print(f"""
📊 Data Summary:
ENTITIES:
Institutions: {inst_count}
Locations: {loc_count}
Identifiers: {id_count}
Archives: {arch_count}
RELATIONS (EDGES):
located_at: {located_count}
has_identifier: {has_id_count}
affiliated_with: {aff_count}
Total edges: {located_count + has_id_count + aff_count}
""")
            # Sample a graph query (institution joined to its location).
            print("📈 Sample graph query (institution → location):")
            sample = tx.query.get("""
match
$inst isa institution, has name $name;
(subject: $inst, place: $loc) isa located_at;
$loc has city $city, has province $prov;
get $name, $city, $prov;
limit 5;
""")
            for row in sample:
                # row.get(var) returns the bound concept or None.
                name = row.get("name")
                city = row.get("city")
                prov = row.get("prov")
                # NOTE(review): `prov` is not None-checked like name/city,
                # though the match pattern appears to always bind it — confirm.
                if name and city:
                    print(f"{name.get_value()}{city.get_value()}, {prov.get_value()}")
def main():
    """Entry point: load the NDE JSON, (re)build the TypeDB graph, verify it.

    Steps: read DATA_FILE, connect to TypeDB, create the database if it does
    not exist, define the schema, clear old data, load institutions with
    relations, then print summary counts. The driver is always closed,
    even when loading fails.
    """
    print("=" * 60)
    print("TypeDB Data Loader with Relations")
    print("=" * 60)
    print()
    # Load JSON data
    print(f"📂 Loading data from {DATA_FILE}...")
    with open(DATA_FILE, "r", encoding="utf-8") as f:
        institutions = json.load(f)
    print(f" Found {len(institutions)} institutions")
    # Connect to TypeDB
    print(f"\n🔗 Connecting to TypeDB at {TYPEDB_HOST}:{TYPEDB_PORT}...")
    driver = TypeDB.core_driver(f"{TYPEDB_HOST}:{TYPEDB_PORT}")
    try:
        # Check database exists; create it on first run.
        db_names = [db.name for db in driver.databases.all()]
        if DATABASE not in db_names:
            print(f" Creating database '{DATABASE}'...")
            driver.databases.create(DATABASE)
        print(f" ✅ Connected to database '{DATABASE}'")
        # Define schema
        define_schema(driver)
        # Clear existing data so reruns do not duplicate entities.
        clear_data(driver)
        # Load data with relations
        print("\n📥 Loading data with relations...")
        inserted, relations, errors = load_data(driver, institutions)
        print(f"""
Loading complete!
Institutions inserted: {inserted}
Relations created: {relations}
Errors: {errors}
""")
        # Verify
        verify_data(driver)
    finally:
        # Always release the connection, even if loading failed.
        driver.close()
    print("\n" + "=" * 60)
    print("Done! Graph data ready for visualization at bronhouder.nl/database")
    print("=" * 60)


if __name__ == "__main__":
    main()