glam/scripts/nde_to_hc_rdf.py
2025-12-02 14:36:01 +01:00

588 lines
23 KiB
Python

#!/usr/bin/env python3
"""
NDE Enriched YAML to Heritage Custodian RDF Transformer
Converts enriched NDE heritage custodian data from YAML format to RDF (Turtle),
aligned with the Heritage Custodian ontology.
Output: data/nde/rdf/{ghcid_numeric}.ttl
Usage:
python scripts/nde_to_hc_rdf.py # Transform all entries
python scripts/nde_to_hc_rdf.py --entry 0946 # Transform single entry
python scripts/nde_to_hc_rdf.py --dry-run # Preview without writing
Author: GLAM Data Extraction Project
Date: 2025-12-02
"""
import argparse
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from urllib.parse import quote
import yaml
from rdflib import Graph, Literal, Namespace, URIRef
from rdflib.namespace import DCTERMS, FOAF, RDF, RDFS, SKOS, XSD
# Configure logging
# Root logger at INFO so per-entry progress and warnings are visible
# during batch runs; all module code logs through `logger` below.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Project paths
# Resolved relative to this file: the script is assumed to live one
# directory below the project root (e.g. scripts/).
PROJECT_ROOT = Path(__file__).parent.parent
NDE_ENRICHED_DIR = PROJECT_ROOT / "data" / "nde" / "enriched" / "entries"  # input YAML entries
RDF_OUTPUT_DIR = PROJECT_ROOT / "data" / "nde" / "rdf"  # output {ghcid}.ttl files
# Namespaces
HC = Namespace("https://nde.nl/ontology/hc/")  # Heritage Custodian properties + instance URIs
HCC = Namespace("https://nde.nl/ontology/hc/class/")  # Heritage Custodian classes
SCHEMA = Namespace("http://schema.org/")
CRM = Namespace("http://www.cidoc-crm.org/cidoc-crm/")  # CIDOC-CRM (E39_Actor, E42_Identifier, ...)
PROV = Namespace("http://www.w3.org/ns/prov#")  # provenance of scraped claims
WD = Namespace("http://www.wikidata.org/entity/")  # owl:sameAs targets
ORG = Namespace("http://www.w3.org/ns/org#")
# NDE type code to CustodianPrimaryTypeEnum mapping
# Single-letter codes from the original NDE entry's 'type' field.
# Codes not present here degrade to 'UNSPECIFIED' (see _add_custodian_type).
TYPE_CODE_MAP = {
'G': 'GALLERY',
'L': 'LIBRARY',
'A': 'ARCHIVE',
'M': 'MUSEUM',
'O': 'OFFICIAL_INSTITUTION',
'R': 'RESEARCH_CENTER',
'C': 'COMMERCIAL',
'U': 'UNSPECIFIED',
'B': 'BIO_CUSTODIAN',
'E': 'EDUCATION_PROVIDER',
'S': 'HERITAGE_SOCIETY',
'F': 'FEATURE_CUSTODIAN',
'I': 'INTANGIBLE_HERITAGE_GROUP',
'X': 'MIXED',
'P': 'PERSONAL_COLLECTION',
'H': 'HOLY_SACRED_SITE',
'D': 'DIGITAL_PLATFORM',
'N': 'NON_PROFIT',
'T': 'TASTE_SCENT_HERITAGE',
}
# Social media claim type to SocialMediaPlatformTypeEnum mapping
# Keys are web-claim 'claim_type' values (all prefixed 'social_');
# note 'social_twitter' and 'social_x' intentionally collapse to the
# same X_TWITTER platform value.
SOCIAL_CLAIM_TYPE_MAP = {
'social_facebook': 'FACEBOOK',
'social_instagram': 'INSTAGRAM',
'social_linkedin': 'LINKEDIN',
'social_youtube': 'YOUTUBE',
'social_twitter': 'X_TWITTER',
'social_x': 'X_TWITTER',
'social_tiktok': 'TIKTOK',
'social_pinterest': 'PINTEREST',
'social_flickr': 'FLICKR',
'social_vimeo': 'VIMEO',
'social_threads': 'THREADS',
'social_bluesky': 'BLUESKY',
'social_mastodon': 'MASTODON',
}
# Identifier scheme to URIs
# Recognised identifier schemes and their base URI / URL prefixes.
# _add_identifiers only emits Wikidata-derived identifiers whose
# upper-cased scheme name appears here; several GHCID variants
# deliberately share the same base ('urn:uuid:' / the hc namespace).
IDENTIFIER_SCHEME_MAP = {
'ISIL': 'https://www.iso.org/standard/77849.html',
'Wikidata': 'https://www.wikidata.org/wiki/',
'VIAF': 'https://viaf.org/viaf/',
'GND': 'https://d-nb.info/gnd/',
'ISNI': 'https://isni.org/isni/',
'LCNAF': 'https://id.loc.gov/authorities/names/',
'ROR': 'https://ror.org/',
'GHCID': 'https://nde.nl/ontology/hc/',
'GHCID_UUID': 'urn:uuid:',
'GHCID_UUID_SHA256': 'urn:uuid:',
'GHCID_NUMERIC': 'https://nde.nl/ontology/hc/',
'RECORD_ID': 'urn:uuid:',
'Ringgold': 'https://www.ringgold.com/identify/',
}
class NDEToHCTransformer:
    """Transform NDE enriched YAML entries into Heritage Custodian RDF.

    One enriched YAML file yields one rdflib Graph centred on a custodian
    "hub" URI derived from the entry's numeric GHCID.  Running totals
    (processed / success / errors / skipped) are kept in ``self.stats``
    and reported by :meth:`transform_all`.
    """

    def __init__(self, dry_run: bool = False):
        """Initialise the transformer.

        Args:
            dry_run: When True, graphs are built and logged but no .ttl
                files are written to disk.
        """
        self.dry_run = dry_run
        # Counters summarised at the end of transform_all().
        self.stats = {
            'processed': 0,
            'success': 0,
            'errors': 0,
            'skipped': 0,
        }

    def transform_entry(self, entry_path: Path,
                        entry: Optional[dict] = None) -> Optional[Graph]:
        """Transform a single NDE entry to an RDF Graph.

        Args:
            entry_path: Path to the enriched YAML entry file.
            entry: Optional pre-parsed YAML mapping for *entry_path*.
                When supplied the file is not read again (callers that
                already loaded it avoid a second parse); when omitted the
                file is opened and parsed here.

        Returns:
            The populated Graph, or None when the entry is empty, has no
            numeric GHCID, or an error occurred (errors are logged and
            counted, never raised).
        """
        try:
            if entry is None:
                with open(entry_path, 'r', encoding='utf-8') as f:
                    entry = yaml.safe_load(f)
            if not entry:
                logger.warning(f"Empty entry: {entry_path}")
                return None

            # The numeric GHCID doubles as the URI local name and the
            # output filename, so entries without one are skipped.
            ghcid_data = entry.get('ghcid', {})
            ghcid_numeric = ghcid_data.get('ghcid_numeric')
            if not ghcid_numeric:
                logger.warning(f"No GHCID numeric in {entry_path}, skipping")
                self.stats['skipped'] += 1
                return None

            # Fresh graph with all prefixes bound for readable Turtle.
            g = Graph()
            for prefix, ns in (
                ('hc', HC), ('hcc', HCC), ('schema', SCHEMA),
                ('crm', CRM), ('prov', PROV), ('foaf', FOAF),
                ('skos', SKOS), ('dcterms', DCTERMS), ('wd', WD),
                ('org', ORG),
            ):
                g.bind(prefix, ns)

            # Custodian hub URI, typed both as a CIDOC-CRM actor and as
            # an hcc:Custodian.
            custodian_uri = URIRef(f"{HC}{ghcid_numeric}")
            g.add((custodian_uri, RDF.type, CRM.E39_Actor))
            g.add((custodian_uri, RDF.type, HCC.Custodian))
            g.add((custodian_uri, DCTERMS.identifier,
                   Literal(str(ghcid_numeric))))

            # Delegate each facet of the entry to a dedicated helper.
            self._add_preferred_label(g, custodian_uri, entry)
            self._add_custodian_type(g, custodian_uri, entry)
            self._add_identifiers(g, custodian_uri, entry, ghcid_numeric)
            self._add_digital_platform(g, custodian_uri, entry, ghcid_numeric)
            self._add_social_media_profiles(g, custodian_uri, entry,
                                            ghcid_numeric)
            self._add_place_data(g, custodian_uri, entry, ghcid_numeric)
            self._add_timestamps(g, custodian_uri, entry)
            return g
        except Exception as e:
            # Best-effort batch processing: log, count, and move on.
            logger.error(f"Error transforming {entry_path}: {e}")
            self.stats['errors'] += 1
            return None

    def _add_preferred_label(self, g: Graph, custodian_uri: URIRef,
                             entry: dict):
        """Add skos:prefLabel and, when different, an English skos:altLabel.

        Label priority: custodian_name claim > Wikidata NL/EN label >
        original NDE entry's 'organisatie' field.
        """
        label = None
        custodian_name = entry.get('custodian_name', {})
        if custodian_name:
            label = custodian_name.get('claim_value')
        if not label:
            wikidata = entry.get('wikidata_enrichment', {})
            label = (wikidata.get('wikidata_label_nl')
                     or wikidata.get('wikidata_label_en'))
        if not label:
            original = entry.get('original_entry', {})
            label = original.get('organisatie')
        if label:
            # NOTE(review): the prefLabel is tagged 'nl' even when it came
            # from the English Wikidata fallback — confirm this is wanted.
            g.add((custodian_uri, SKOS.prefLabel, Literal(label, lang='nl')))
            # Also add English label if available and distinct.
            wikidata = entry.get('wikidata_enrichment', {})
            en_label = wikidata.get('wikidata_label_en')
            if en_label and en_label != label:
                g.add((custodian_uri, SKOS.altLabel,
                       Literal(en_label, lang='en')))

    def _add_custodian_type(self, g: Graph, custodian_uri: URIRef,
                            entry: dict):
        """Map NDE single-letter type codes to custodian_type literals."""
        original = entry.get('original_entry', {})
        type_codes = original.get('type', [])
        if not type_codes:
            return
        for code in type_codes:
            # Unknown codes degrade to UNSPECIFIED instead of being dropped.
            type_enum = TYPE_CODE_MAP.get(code, 'UNSPECIFIED')
            g.add((custodian_uri, HC.custodian_type, Literal(type_enum)))

    def _add_identifiers(self, g: Graph, custodian_uri: URIRef, entry: dict,
                         ghcid_numeric: str):
        """Add CustodianIdentifier (crm:E42_Identifier) instances.

        Sources, in order: the entry's own 'identifiers' list, the
        Wikidata entity id (also emitted as owl:sameAs), and any extra
        identifiers harvested from Wikidata whose scheme is known to
        IDENTIFIER_SCHEME_MAP.
        """
        # From entry.identifiers list
        identifiers = entry.get('identifiers', [])
        for ident in identifiers:
            scheme = ident.get('identifier_scheme')
            value = ident.get('identifier_value')
            if not scheme or not value:
                continue
            # One identifier node per (custodian, scheme).
            ident_uri = URIRef(f"{HC}identifier/{ghcid_numeric}/{scheme.lower()}")
            g.add((ident_uri, RDF.type, CRM.E42_Identifier))
            g.add((ident_uri, SKOS.inScheme, Literal(scheme)))
            g.add((ident_uri, SKOS.notation, Literal(str(value))))
            # Link to custodian (both directions).
            g.add((custodian_uri, CRM.P48_has_preferred_identifier, ident_uri))
            g.add((ident_uri, CRM.P48i_is_preferred_identifier_of,
                   custodian_uri))
            # Add identifier URL if available
            url = ident.get('identifier_url')
            if url:
                g.add((ident_uri, SCHEMA.url, URIRef(url)))

        # Wikidata identifiers from enrichment
        wikidata = entry.get('wikidata_enrichment', {})
        wikidata_id = wikidata.get('wikidata_entity_id')
        if wikidata_id:
            wd_ident_uri = URIRef(f"{HC}identifier/{ghcid_numeric}/wikidata")
            g.add((wd_ident_uri, RDF.type, CRM.E42_Identifier))
            g.add((wd_ident_uri, SKOS.inScheme, Literal('Wikidata')))
            g.add((wd_ident_uri, SKOS.notation, Literal(wikidata_id)))
            g.add((wd_ident_uri, SCHEMA.url,
                   URIRef(f"https://www.wikidata.org/wiki/{wikidata_id}")))
            g.add((custodian_uri, CRM.P48_has_preferred_identifier,
                   wd_ident_uri))
            # Also assert identity with the Wikidata entity itself.
            g.add((custodian_uri,
                   URIRef("http://www.w3.org/2002/07/owl#sameAs"),
                   WD[wikidata_id]))

        # Additional Wikidata identifiers (VIAF, GND, ISNI, etc.)
        wd_identifiers = wikidata.get('wikidata_identifiers', {})
        for scheme, value in wd_identifiers.items():
            scheme_upper = scheme.upper()
            # Only emit schemes we recognise.
            if scheme_upper in IDENTIFIER_SCHEME_MAP:
                ext_ident_uri = URIRef(
                    f"{HC}identifier/{ghcid_numeric}/{scheme.lower()}")
                g.add((ext_ident_uri, RDF.type, CRM.E42_Identifier))
                g.add((ext_ident_uri, SKOS.inScheme, Literal(scheme_upper)))
                g.add((ext_ident_uri, SKOS.notation, Literal(str(value))))
                g.add((custodian_uri, CRM.P48_has_preferred_identifier,
                       ext_ident_uri))

    def _add_digital_platform(self, g: Graph, custodian_uri: URIRef,
                              entry: dict, ghcid_numeric: str):
        """Add a DigitalPlatform node for the custodian's website.

        Website priority: Wikidata official website > Google Maps website.
        Also attaches online-catalog URLs from Wikidata claim P8768.
        """
        website = None
        wikidata = entry.get('wikidata_enrichment', {})
        website = wikidata.get('wikidata_official_website')
        if not website:
            google = entry.get('google_maps_enrichment', {})
            website = google.get('website')
        if not website:
            return
        # Create DigitalPlatform instance
        platform_uri = URIRef(f"{HC}platform/{ghcid_numeric}/website")
        g.add((platform_uri, RDF.type, HCC.DigitalPlatform))
        g.add((platform_uri, FOAF.homepage, URIRef(website)))
        g.add((platform_uri, SCHEMA.url, URIRef(website)))
        # Link to custodian
        g.add((custodian_uri, FOAF.homepage, platform_uri))
        # Add online catalog URL(s) if available; the claim value may be a
        # single URL or a list of URLs.
        claims = wikidata.get('wikidata_claims', {})
        catalog_claim = claims.get('P8768_online_catalog_url', {})
        catalog_values = (catalog_claim.get('value')
                          if isinstance(catalog_claim, dict) else None)
        if catalog_values:
            if isinstance(catalog_values, list):
                for catalog_url in catalog_values:
                    if catalog_url:
                        g.add((platform_uri, HC.collection_url,
                               URIRef(catalog_url)))
            else:
                g.add((platform_uri, HC.collection_url,
                       URIRef(catalog_values)))

    def _add_social_media_profiles(self, g: Graph, custodian_uri: URIRef,
                                   entry: dict, ghcid_numeric: str):
        """Add SocialMediaProfile instances from web_claims.

        Scraped 'social_*' claims become foaf:OnlineAccount /
        hcc:SocialMediaProfile nodes with PROV provenance; a Wikidata
        Twitter username (P2002) is added only when the web claims did
        not already yield an X/Twitter profile.
        """
        web_claims = entry.get('web_claims', {})
        claims = web_claims.get('claims', [])
        for claim in claims:
            claim_type = claim.get('claim_type', '')
            if not claim_type.startswith('social_'):
                continue
            platform_type = SOCIAL_CLAIM_TYPE_MAP.get(claim_type)
            if not platform_type:
                continue
            profile_url = claim.get('claim_value')
            if not profile_url:
                continue
            # Heuristic filters: share links, intent URLs, and URLs with
            # query parameters are not stable profile pages.
            if ('/share?' in profile_url or '/intent/' in profile_url
                    or '&' in profile_url):
                logger.debug(f"Skipping non-profile URL: {profile_url}")
                continue
            # A space means the scraper captured surrounding text, not a URL.
            if ' ' in profile_url:
                logger.debug(f"Skipping URL with spaces: {profile_url}")
                continue
            # Create SocialMediaProfile instance (one per platform type).
            profile_uri = URIRef(
                f"{HC}social/{ghcid_numeric}/{platform_type.lower()}")
            g.add((profile_uri, RDF.type, FOAF.OnlineAccount))
            g.add((profile_uri, RDF.type, HCC.SocialMediaProfile))
            g.add((profile_uri, HC.platform_type, Literal(platform_type)))
            g.add((profile_uri, FOAF.accountServiceHomepage,
                   URIRef(profile_url)))
            # Extract account name from URL
            account_name = self._extract_account_name(profile_url,
                                                      platform_type)
            if account_name:
                g.add((profile_uri, FOAF.accountName, Literal(account_name)))
            # Add provenance from web claim
            source_url = claim.get('source_url')
            if source_url:
                g.add((profile_uri, PROV.wasDerivedFrom, URIRef(source_url)))
            retrieved_on = claim.get('retrieved_on')
            if retrieved_on:
                g.add((profile_uri, PROV.generatedAtTime,
                       Literal(retrieved_on, datatype=XSD.dateTime)))
            # Link to custodian
            g.add((custodian_uri, FOAF.account, profile_uri))

        # Also check Wikidata for Twitter username
        wikidata = entry.get('wikidata_enrichment', {})
        claims_wd = wikidata.get('wikidata_claims', {})
        twitter_claim = claims_wd.get('P2002_x__twitter__username', {})
        twitter_value = (twitter_claim.get('value')
                         if isinstance(twitter_claim, dict) else None)
        if twitter_value:
            # Claim value may be a single username or a list (take first).
            if isinstance(twitter_value, list):
                twitter_username = twitter_value[0] if twitter_value else None
            else:
                twitter_username = twitter_value
            if twitter_username:
                # Skip if web_claims already produced an X/Twitter profile.
                existing_twitter_uri = URIRef(
                    f"{HC}social/{ghcid_numeric}/x_twitter")
                if (existing_twitter_uri, RDF.type,
                        FOAF.OnlineAccount) not in g:
                    g.add((existing_twitter_uri, RDF.type,
                           FOAF.OnlineAccount))
                    g.add((existing_twitter_uri, RDF.type,
                           HCC.SocialMediaProfile))
                    g.add((existing_twitter_uri, HC.platform_type,
                           Literal('X_TWITTER')))
                    g.add((existing_twitter_uri, FOAF.accountName,
                           Literal(twitter_username)))
                    g.add((existing_twitter_uri, FOAF.accountServiceHomepage,
                           URIRef(f"https://x.com/{twitter_username}")))
                    g.add((custodian_uri, FOAF.account,
                           existing_twitter_uri))

    def _extract_account_name(self, url: str,
                              platform_type: str) -> Optional[str]:
        """Extract the account/handle name from a social media profile URL.

        Returns None when the URL cannot be parsed or yields no usable
        path component.
        """
        try:
            from urllib.parse import urlparse
            parsed = urlparse(url)
            path = parsed.path.strip('/')
            if platform_type in ('FACEBOOK', 'INSTAGRAM', 'LINKEDIN',
                                 'YOUTUBE'):
                # Usually the last path component is the handle.
                parts = path.split('/')
                if parts:
                    # Handle linkedin.com/company/name format
                    if platform_type == 'LINKEDIN' and len(parts) >= 2:
                        return parts[-1]
                    # Handle youtube.com/channel/ID format
                    if platform_type == 'YOUTUBE' and 'channel' in parts:
                        idx = parts.index('channel')
                        if idx + 1 < len(parts):
                            return parts[idx + 1]
                    # Fall back to the last non-empty component.
                    return parts[-1] if parts[-1] else (
                        parts[-2] if len(parts) > 1 else None)
            return path.split('/')[-1] if path else None
        except Exception:
            # Malformed URL: no account name rather than a crash.
            return None

    def _add_place_data(self, g: Graph, custodian_uri: URIRef, entry: dict,
                        ghcid_numeric: str):
        """Add geographic data: a schema:Place with coordinates/address,
        plus a GeoNames containment link when location resolution found one.

        Coordinate priority: Google Maps enrichment > Wikidata enrichment.
        """
        google = entry.get('google_maps_enrichment', {})
        coords = google.get('coordinates', {})
        if not coords:
            wikidata = entry.get('wikidata_enrichment', {})
            coords = wikidata.get('wikidata_coordinates', {})
        if coords:
            lat = coords.get('latitude')
            lon = coords.get('longitude')
            # Explicit None checks: 0.0 is a valid latitude/longitude and
            # must not be dropped by truthiness (bug fix vs `if lat and lon`).
            if lat is not None and lon is not None:
                place_uri = URIRef(f"{HC}place/{ghcid_numeric}")
                g.add((place_uri, RDF.type, SCHEMA.Place))
                g.add((place_uri, SCHEMA.latitude,
                       Literal(lat, datatype=XSD.decimal)))
                g.add((place_uri, SCHEMA.longitude,
                       Literal(lon, datatype=XSD.decimal)))
                # Address is only available from the Google Maps source.
                address = google.get('formatted_address')
                if address:
                    g.add((place_uri, SCHEMA.address, Literal(address)))
                # Link to custodian
                g.add((custodian_uri,
                       CRM.P53_has_former_or_current_location, place_uri))
        # Add GeoNames ID from location resolution
        ghcid_data = entry.get('ghcid', {})
        loc_resolution = ghcid_data.get('location_resolution', {})
        geonames_id = loc_resolution.get('geonames_id')
        if geonames_id:
            geonames_uri = URIRef(f"https://sws.geonames.org/{geonames_id}/")
            g.add((custodian_uri, SCHEMA.containedInPlace, geonames_uri))

    def _add_timestamps(self, g: Graph, custodian_uri: URIRef, entry: dict):
        """Add schema:dateCreated / schema:dateModified timestamps."""
        processing_ts = entry.get('processing_timestamp')
        if processing_ts:
            g.add((custodian_uri, SCHEMA.dateCreated,
                   Literal(processing_ts, datatype=XSD.dateTime)))
        # The provenance generation time doubles as "last modified".
        provenance = entry.get('provenance', {})
        generated_at = provenance.get('generated_at')
        if generated_at:
            g.add((custodian_uri, SCHEMA.dateModified,
                   Literal(generated_at, datatype=XSD.dateTime)))

    def transform_all(self):
        """Transform every enriched entry and log summary statistics."""
        # Ensure output directory exists
        RDF_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
        entry_files = sorted(NDE_ENRICHED_DIR.glob("*.yaml"))
        total = len(entry_files)
        logger.info(f"Found {total} NDE enriched entries to transform")
        for idx, entry_path in enumerate(entry_files, 1):
            self.stats['processed'] += 1
            logger.info(f"[{idx}/{total}] Transforming {entry_path.name}")
            # Parse the YAML once and pass it down, so the file is not
            # read a second time just to recover the GHCID filename.
            try:
                with open(entry_path, 'r', encoding='utf-8') as f:
                    entry = yaml.safe_load(f)
            except Exception as e:
                logger.error(f"Error transforming {entry_path}: {e}")
                self.stats['errors'] += 1
                continue
            graph = self.transform_entry(entry_path, entry=entry)
            if graph:
                # A returned graph implies the entry had a numeric GHCID.
                ghcid_numeric = (entry or {}).get('ghcid', {}).get(
                    'ghcid_numeric')
                if ghcid_numeric:
                    output_path = RDF_OUTPUT_DIR / f"{ghcid_numeric}.ttl"
                    if not self.dry_run:
                        graph.serialize(destination=str(output_path),
                                        format='turtle')
                        logger.info(f" -> Wrote {output_path.name} "
                                    f"({len(graph)} triples)")
                    else:
                        logger.info(f" -> [DRY-RUN] Would write "
                                    f"{output_path.name} "
                                    f"({len(graph)} triples)")
                    self.stats['success'] += 1
        # Summary
        logger.info("=" * 60)
        logger.info("Transformation complete!")
        logger.info(f" Processed: {self.stats['processed']}")
        logger.info(f" Success: {self.stats['success']}")
        logger.info(f" Skipped: {self.stats['skipped']}")
        logger.info(f" Errors: {self.stats['errors']}")

    def transform_single(self, entry_index: str):
        """Transform a single entry by index (e.g., '0946').

        The index matches enriched files named '{index}_*.yaml'; the
        resulting Turtle is both written (unless dry-run) and printed.
        """
        pattern = f"{entry_index}_*.yaml"
        matches = list(NDE_ENRICHED_DIR.glob(pattern))
        if not matches:
            logger.error(f"No entry found matching {pattern}")
            return
        entry_path = matches[0]
        logger.info(f"Transforming single entry: {entry_path.name}")
        # Parse once and share with transform_entry (avoids a re-read).
        try:
            with open(entry_path, 'r', encoding='utf-8') as f:
                entry = yaml.safe_load(f)
        except Exception as e:
            logger.error(f"Error transforming {entry_path}: {e}")
            self.stats['errors'] += 1
            return
        graph = self.transform_entry(entry_path, entry=entry)
        if graph:
            ghcid_numeric = (entry or {}).get('ghcid', {}).get(
                'ghcid_numeric')
            if ghcid_numeric:
                RDF_OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
                output_path = RDF_OUTPUT_DIR / f"{ghcid_numeric}.ttl"
                if not self.dry_run:
                    graph.serialize(destination=str(output_path),
                                    format='turtle')
                    logger.info(f"Wrote {output_path.name} "
                                f"({len(graph)} triples)")
                    # Also print the RDF
                    print("\n" + "=" * 60)
                    print("Generated RDF (Turtle):")
                    print("=" * 60)
                    print(graph.serialize(format='turtle'))
                else:
                    logger.info(f"[DRY-RUN] Would write {output_path.name}")
                    print("\n" + "=" * 60)
                    print("Generated RDF (Turtle) [DRY-RUN]:")
                    print("=" * 60)
                    print(graph.serialize(format='turtle'))
def main():
    """Command-line entry point: parse arguments and run the transformer."""
    parser = argparse.ArgumentParser(
        description="Transform NDE enriched YAML to Heritage Custodian RDF"
    )
    parser.add_argument(
        '--entry', '-e',
        help="Transform single entry by index (e.g., '0946')"
    )
    parser.add_argument(
        '--dry-run', '-n',
        action='store_true',
        help="Preview without writing files"
    )
    args = parser.parse_args()

    transformer = NDEToHCTransformer(dry_run=args.dry_run)
    # --entry limits the run to one file; otherwise all entries are done.
    if args.entry:
        transformer.transform_single(args.entry)
    else:
        transformer.transform_all()


if __name__ == '__main__':
    main()