UNESCO Data Extraction - Design Patterns
Project: Global GLAM Dataset - UNESCO World Heritage Sites Extraction
Document: 05 - Design Patterns
Version: 1.1
Date: 2025-11-10
Status: Draft
Executive Summary
This document defines the architectural design patterns for the UNESCO World Heritage Sites extraction project. These patterns ensure consistency with the existing Global GLAM Dataset architecture while addressing UNESCO-specific requirements such as institution type classification, multi-language handling, and authoritative data tier management.
Key Principles:
- Consistency with Global GLAM patterns - Reuse existing patterns from docs/plan/global_glam/05-design-patterns.md
- UNESCO-specific adaptations - Handle unique UNESCO data characteristics
- SOLID principles - Single Responsibility, Open/Closed, Liskov Substitution, Interface Segregation, Dependency Inversion
- Testability - All components designed for easy unit testing
Core Design Patterns
Pattern 1: Repository Pattern (Data Access Layer)
Purpose: Abstract data access logic to allow flexible data sources (API, cache, fixtures).
Implementation:
# src/glam_extractor/repositories/unesco_repository.py
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, List, Optional
import json
from glam_extractor.models import HeritageCustodian
# Module paths for collaborators used below are assumed:
from glam_extractor.api import UNESCOAPIClient
from glam_extractor.cache import Cache
class UNESCORepository(ABC):
"""Abstract repository for UNESCO site data."""
@abstractmethod
def fetch_site_list(self) -> List[Dict]:
"""Fetch list of all UNESCO World Heritage Sites."""
pass
@abstractmethod
def fetch_site_details(self, whc_id: int) -> Dict:
"""Fetch detailed information for a specific site."""
pass
@abstractmethod
def fetch_sites_by_country(self, country_code: str) -> List[Dict]:
"""Fetch all sites in a specific country."""
pass
class UNESCOAPIRepository(UNESCORepository):
"""Repository implementation using UNESCO DataHub API."""
def __init__(self, api_client: UNESCOAPIClient):
self.api_client = api_client
def fetch_site_list(self) -> List[Dict]:
return self.api_client.get("/catalog/datasets/whc-sites-2021/records")
def fetch_site_details(self, whc_id: int) -> Dict:
return self.api_client.get(f"/catalog/datasets/whc-sites-2021/records/{whc_id}")
def fetch_sites_by_country(self, country_code: str) -> List[Dict]:
all_sites = self.fetch_site_list()
# OpenDataSoft nested structure: record.fields.states_name_en
return [s for s in all_sites if country_code in s.get('record', {}).get('fields', {}).get('states_name_en', '')]
class UNESCOCachedRepository(UNESCORepository):
    """Repository with a caching layer; delegates cache misses to the API repository."""
    def __init__(self, api_repository: UNESCOAPIRepository, cache: Cache):
        self.api_repository = api_repository
        self.cache = cache
    def fetch_site_list(self) -> List[Dict]:
        return self.api_repository.fetch_site_list()
    def fetch_sites_by_country(self, country_code: str) -> List[Dict]:
        return self.api_repository.fetch_sites_by_country(country_code)
    def fetch_site_details(self, whc_id: int) -> Dict:
        cache_key = f"unesco_site_{whc_id}"
        # Try cache first
        cached = self.cache.get(cache_key)
        if cached:
            return cached
        # Fetch from API and cache for 24 hours
        data = self.api_repository.fetch_site_details(whc_id)
        self.cache.set(cache_key, data, ttl=86400)
        return data
class UNESCOFixtureRepository(UNESCORepository):
    """Repository backed by local test fixtures (for testing)."""
    def __init__(self, fixtures_dir: Path):
        self.fixtures_dir = fixtures_dir
    def fetch_site_list(self) -> List[Dict]:
        # Fixture filename is illustrative
        with open(self.fixtures_dir / "site_list.json") as f:
            return json.load(f)
    def fetch_sites_by_country(self, country_code: str) -> List[Dict]:
        all_sites = self.fetch_site_list()
        # OpenDataSoft nested structure: record.fields.states_name_en
        return [s for s in all_sites
                if country_code in s.get('record', {}).get('fields', {}).get('states_name_en', '')]
    def fetch_site_details(self, whc_id: int) -> Dict:
        fixture_path = self.fixtures_dir / f"site_{whc_id}.json"
        with open(fixture_path) as f:
            return json.load(f)
Benefits:
- ✅ Easy to mock for testing (use UNESCOFixtureRepository)
- ✅ Separation of concerns (API logic vs. business logic)
- ✅ Flexible data sources (API, cache, fixtures, future: database)
- ✅ Adheres to Dependency Inversion Principle
Usage:
# Production: API with caching
api_client = UNESCOAPIClient()
api_repo = UNESCOAPIRepository(api_client)
cache = SQLiteCache("cache/unesco.db")
repo = UNESCOCachedRepository(api_repo, cache)
# Testing: Fixtures
repo = UNESCOFixtureRepository(Path("tests/fixtures/unesco_api_responses"))
# Extract data (same interface regardless of source)
site_data = repo.fetch_site_details(600)
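The `Cache` collaborator used above is assumed but never defined in this document. A minimal in-memory sketch of the `get`/`set` interface the cached repository relies on (the production `SQLiteCache` would expose the same two methods) might look like this; the class name and TTL semantics are assumptions:

```python
import time
from typing import Any, Dict, Optional, Tuple

class InMemoryCache:
    """Minimal TTL cache sketch matching the get/set interface assumed above."""
    def __init__(self) -> None:
        self._store: Dict[str, Tuple[Any, float]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            # Entry expired: evict it and report a miss
            del self._store[key]
            return None
        return value

    def set(self, key: str, value: Any, ttl: int = 86400) -> None:
        # Store the value with an absolute expiry timestamp
        self._store[key] = (value, time.monotonic() + ttl)
```

Because the repository only calls `get` and `set`, this stand-in can replace `SQLiteCache` in unit tests without touching the repository code.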
Pattern 2: Strategy Pattern (Institution Type Classification)
Purpose: Allow multiple classification strategies (keyword-based, ML-based, rule-based) with the same interface.
Implementation:
# src/glam_extractor/classifiers/strategy.py
from abc import ABC, abstractmethod
from typing import Any, Dict, List
from glam_extractor.models import InstitutionTypeEnum
class ClassificationStrategy(ABC):
"""Abstract strategy for institution type classification."""
@abstractmethod
    def classify(self, unesco_data: Dict) -> Dict[str, Any]:
"""
Classify institution type from UNESCO data.
Returns:
{
'institution_type': InstitutionTypeEnum,
'confidence_score': float,
'reasoning': str
}
"""
pass
class KeywordClassificationStrategy(ClassificationStrategy):
"""Classification based on keyword matching."""
MUSEUM_KEYWORDS = ['museum', 'musée', 'museo', 'muzeum', 'gallery']
LIBRARY_KEYWORDS = ['library', 'bibliothèque', 'biblioteca', 'bibliotheek']
ARCHIVE_KEYWORDS = ['archive', 'archiv', 'archivo', 'archief']
BOTANICAL_KEYWORDS = ['botanical garden', 'jardin botanique', 'arboretum', 'botanic']
HOLY_SITE_KEYWORDS = ['cathedral', 'church', 'monastery', 'abbey', 'temple', 'mosque', 'synagogue']
    def classify(self, unesco_data: Dict) -> Dict[str, Any]:
# Extract from OpenDataSoft nested structure
fields = unesco_data.get('record', {}).get('fields', {})
description = fields.get('short_description_en', '').lower()
justification = fields.get('justification_en', '').lower()
text = f"{description} {justification}"
scores = {
InstitutionTypeEnum.MUSEUM: self._count_keywords(text, self.MUSEUM_KEYWORDS),
InstitutionTypeEnum.LIBRARY: self._count_keywords(text, self.LIBRARY_KEYWORDS),
InstitutionTypeEnum.ARCHIVE: self._count_keywords(text, self.ARCHIVE_KEYWORDS),
InstitutionTypeEnum.BOTANICAL_ZOO: self._count_keywords(text, self.BOTANICAL_KEYWORDS),
InstitutionTypeEnum.HOLY_SITES: self._count_keywords(text, self.HOLY_SITE_KEYWORDS),
}
# Find highest scoring type
best_type = max(scores, key=scores.get)
max_score = scores[best_type]
if max_score == 0:
return {
'institution_type': InstitutionTypeEnum.MIXED,
'confidence_score': 0.5,
'reasoning': 'No keywords matched, defaulted to MIXED'
}
confidence = min(1.0, max_score / 3) # Normalize to 0-1 range
return {
'institution_type': best_type,
'confidence_score': confidence,
'reasoning': f'Matched {max_score} keyword(s) for {best_type}'
}
def _count_keywords(self, text: str, keywords: List[str]) -> int:
return sum(1 for kw in keywords if kw in text)
class RuleBasedClassificationStrategy(ClassificationStrategy):
"""Classification using explicit rules and heuristics."""
    def classify(self, unesco_data: Dict) -> Dict[str, Any]:
# Extract from OpenDataSoft nested structure
fields = unesco_data.get('record', {}).get('fields', {})
category = fields.get('category', '')
description = fields.get('short_description_en', '').lower()
# Rule 1: Natural sites are rarely GLAM institutions
if category == 'Natural':
if 'visitor center' in description or 'interpretation' in description:
return {
'institution_type': InstitutionTypeEnum.MIXED,
'confidence_score': 0.6,
'reasoning': 'Natural site with visitor facilities'
}
else:
return {
'institution_type': InstitutionTypeEnum.MIXED,
'confidence_score': 0.4,
'reasoning': 'Natural site, likely not a GLAM custodian'
}
# Rule 2: Sites with "library" in name are libraries
site_name = fields.get('name_en', '').lower()
if 'library' in site_name or 'bibliothèque' in site_name:
return {
'institution_type': InstitutionTypeEnum.LIBRARY,
'confidence_score': 0.95,
'reasoning': 'Site name explicitly mentions library'
}
# Rule 3: Religious sites with collections are HOLY_SITES
if any(word in description for word in ['cathedral', 'monastery', 'abbey', 'church']):
if any(word in description for word in ['collection', 'archive', 'library', 'manuscript']):
return {
'institution_type': InstitutionTypeEnum.HOLY_SITES,
'confidence_score': 0.85,
'reasoning': 'Religious site with heritage collection'
}
# Fallback: Use keyword strategy
keyword_strategy = KeywordClassificationStrategy()
return keyword_strategy.classify(unesco_data)
class CompositeClassificationStrategy(ClassificationStrategy):
"""Combine multiple strategies and vote."""
def __init__(self, strategies: List[ClassificationStrategy]):
self.strategies = strategies
    def classify(self, unesco_data: Dict) -> Dict[str, Any]:
results = [strategy.classify(unesco_data) for strategy in self.strategies]
# Weighted voting (weight by confidence score)
votes = {}
for result in results:
inst_type = result['institution_type']
confidence = result['confidence_score']
votes[inst_type] = votes.get(inst_type, 0) + confidence
# Find winner
best_type = max(votes, key=votes.get)
avg_confidence = votes[best_type] / len(self.strategies)
return {
'institution_type': best_type,
'confidence_score': avg_confidence,
'reasoning': f'Composite vote from {len(self.strategies)} strategies'
}
class InstitutionTypeClassifier:
"""Context class for classification strategies."""
def __init__(self, strategy: ClassificationStrategy):
self.strategy = strategy
    def classify(self, unesco_data: Dict) -> Dict[str, Any]:
return self.strategy.classify(unesco_data)
def set_strategy(self, strategy: ClassificationStrategy):
"""Allow runtime strategy switching."""
self.strategy = strategy
Benefits:
- ✅ Easy to add new classification strategies (Open/Closed Principle)
- ✅ Strategies can be tested independently
- ✅ Composite strategy allows voting across multiple approaches
- ✅ Runtime strategy switching for A/B testing
Usage:
# Use keyword strategy
classifier = InstitutionTypeClassifier(KeywordClassificationStrategy())
result = classifier.classify(unesco_site_data)
# Switch to rule-based strategy
classifier.set_strategy(RuleBasedClassificationStrategy())
result = classifier.classify(unesco_site_data)
# Use composite (voting) strategy
composite = CompositeClassificationStrategy([
KeywordClassificationStrategy(),
RuleBasedClassificationStrategy()
])
classifier.set_strategy(composite)
result = classifier.classify(unesco_site_data)
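The scoring logic at the heart of KeywordClassificationStrategy can be exercised in isolation. This sketch uses plain strings in place of InstitutionTypeEnum members and a made-up description, so it is a check of the counting and normalization steps only:

```python
from typing import List

def count_keywords(text: str, keywords: List[str]) -> int:
    """Count how many of the given keywords occur as substrings of the text."""
    return sum(1 for kw in keywords if kw in text)

MUSEUM_KEYWORDS = ['museum', 'musée', 'museo', 'muzeum', 'gallery']
LIBRARY_KEYWORDS = ['library', 'bibliothèque', 'biblioteca', 'bibliotheek']

description = "the complex houses a museum and gallery with a research library".lower()
scores = {
    'MUSEUM': count_keywords(description, MUSEUM_KEYWORDS),    # 'museum' + 'gallery' -> 2
    'LIBRARY': count_keywords(description, LIBRARY_KEYWORDS),  # 'library' -> 1
}
best_type = max(scores, key=scores.get)
confidence = min(1.0, scores[best_type] / 3)  # same 0-1 normalization as the strategy
```

Note that substring matching is deliberately loose: "museums" matches "museum". A production strategy may want word-boundary matching instead.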
Pattern 3: Builder Pattern (HeritageCustodian Construction)
Purpose: Construct complex HeritageCustodian objects step-by-step from UNESCO data.
Implementation:
# src/glam_extractor/builders/heritage_custodian_builder.py
from datetime import datetime, timezone
from typing import Dict, List, Optional
from glam_extractor.models import (
    HeritageCustodian, Location, Identifier,
    Provenance, DataSource, DataTier
)
from glam_extractor.classifiers.strategy import InstitutionTypeClassifier
class HeritageCustodianBuilder:
"""Builder for constructing HeritageCustodian from UNESCO data."""
def __init__(self):
self._custodian = HeritageCustodian()
self._unesco_data = None
def from_unesco_data(self, unesco_data: Dict) -> 'HeritageCustodianBuilder':
"""Initialize builder with UNESCO site data."""
self._unesco_data = unesco_data
return self
def with_name(self, name: Optional[str] = None) -> 'HeritageCustodianBuilder':
"""Set institution name (defaults to UNESCO site name)."""
if name:
self._custodian.name = name
elif self._unesco_data:
# Extract from OpenDataSoft nested structure
fields = self._unesco_data.get('record', {}).get('fields', {})
self._custodian.name = fields.get('name_en', '')
return self
def with_institution_type(
self,
classifier: InstitutionTypeClassifier
) -> 'HeritageCustodianBuilder':
"""Classify and set institution type."""
if self._unesco_data:
result = classifier.classify(self._unesco_data)
self._custodian.institution_type = result['institution_type']
# Store confidence in provenance
if not self._custodian.provenance:
self._custodian.provenance = Provenance()
self._custodian.provenance.confidence_score = result['confidence_score']
return self
def with_location(self) -> 'HeritageCustodianBuilder':
"""Extract and set location from UNESCO data."""
if not self._unesco_data:
return self
# Extract from OpenDataSoft nested structure
fields = self._unesco_data.get('record', {}).get('fields', {})
location = Location(
city=self._extract_city(),
country=self._extract_country_code(),
region=fields.get('region_en', ''),
latitude=fields.get('latitude'),
longitude=fields.get('longitude')
)
self._custodian.locations = [location]
return self
def with_unesco_identifiers(self) -> 'HeritageCustodianBuilder':
"""Add UNESCO WHC ID and any linked identifiers."""
if not self._unesco_data:
return self
identifiers = []
# Extract from OpenDataSoft nested structure
fields = self._unesco_data.get('record', {}).get('fields', {})
# UNESCO WHC ID
whc_id = fields.get('unique_number')
if whc_id:
identifiers.append(Identifier(
identifier_scheme='UNESCO_WHC',
identifier_value=str(whc_id),
identifier_url=f'https://whc.unesco.org/en/list/{whc_id}'
))
# Wikidata Q-number from http_url field (if present)
http_url = fields.get('http_url', '')
if 'wikidata.org' in http_url:
q_number = http_url.split('/')[-1]
identifiers.append(Identifier(
identifier_scheme='Wikidata',
identifier_value=q_number,
identifier_url=http_url
))
self._custodian.identifiers = identifiers
return self
def with_alternative_names(self) -> 'HeritageCustodianBuilder':
"""Extract multi-language names as alternative names."""
if not self._unesco_data:
return self
alt_names = []
primary_name = self._custodian.name
# Extract from OpenDataSoft nested structure
fields = self._unesco_data.get('record', {}).get('fields', {})
# Check multi-language name fields
for lang_suffix in ['fr', 'es', 'ar', 'ru', 'zh']:
name_field = f'name_{lang_suffix}'
if name_field in fields:
name = fields[name_field]
if name and name != primary_name:
alt_names.append(f"{name}@{lang_suffix}")
if alt_names:
self._custodian.alternative_names = alt_names
return self
    def with_description(self) -> 'HeritageCustodianBuilder':
        """Set description from UNESCO short_description."""
        if self._unesco_data:
            # Extract from OpenDataSoft nested structure
            fields = self._unesco_data.get('record', {}).get('fields', {})
            description = fields.get('short_description_en', '')
            if description:
                self._custodian.description = description
        return self
def with_provenance(
self,
extraction_method: str = "UNESCO DataHub API extraction"
) -> 'HeritageCustodianBuilder':
"""Set provenance metadata."""
self._custodian.provenance = Provenance(
data_source=DataSource.UNESCO_WORLD_HERITAGE,
data_tier=DataTier.TIER_1_AUTHORITATIVE,
extraction_date=datetime.now(timezone.utc).isoformat(),
extraction_method=extraction_method,
confidence_score=self._custodian.provenance.confidence_score if self._custodian.provenance else 1.0
)
return self
def with_ghcid(
self,
ghcid_generator: 'GHCIDGenerator'
) -> 'HeritageCustodianBuilder':
"""Generate and set GHCID."""
if self._custodian.name and self._custodian.institution_type and self._custodian.locations:
self._custodian.ghcid = ghcid_generator.generate(self._custodian)
self._custodian.ghcid_uuid = ghcid_generator.generate_uuid_v5(self._custodian.ghcid)
return self
def build(self) -> HeritageCustodian:
"""Return the constructed HeritageCustodian."""
# Validate that required fields are set
if not self._custodian.name:
raise ValueError("HeritageCustodian must have a name")
if not self._custodian.provenance:
self.with_provenance()
return self._custodian
# Helper methods
    def _extract_city(self) -> str:
        """Extract city name from UNESCO data."""
        # UNESCO doesn't always provide a city; fall back to the states field
        fields = self._unesco_data.get('record', {}).get('fields', {})
        states = fields.get('states_name_en', '')
        # Simplified: in a real implementation, use geocoding or NLP
        return states.split(',')[0] if ',' in states else states
    def _extract_country_code(self) -> str:
        """Extract ISO country code from the UNESCO states field."""
        fields = self._unesco_data.get('record', {}).get('fields', {})
        states = fields.get('states_name_en', '')
        # Map country name to ISO code (simplified; use pycountry in a real implementation)
        country_map = {
            'France': 'FR',
            'United Kingdom': 'GB',
            'Brazil': 'BR',
            'India': 'IN',
            'Japan': 'JP',
            # ... full mapping
        }
        return country_map.get(states.strip(), 'XX')
Benefits:
- ✅ Fluent interface (method chaining)
- ✅ Step-by-step construction with validation
- ✅ Easy to add new construction steps
- ✅ Separates construction logic from model
Usage:
# Construct HeritageCustodian with fluent API
custodian = (
HeritageCustodianBuilder()
.from_unesco_data(unesco_site_data)
.with_name()
.with_institution_type(classifier)
.with_location()
.with_unesco_identifiers()
.with_alternative_names()
.with_description()
.with_provenance()
.with_ghcid(ghcid_generator)
.build()
)
Pattern 4: Result Pattern (Error Handling)
Purpose: Handle errors without exceptions, making error handling explicit in type signatures.
Implementation:
# src/glam_extractor/utils/result.py
from typing import TypeVar, Generic, Union, Callable
from dataclasses import dataclass
T = TypeVar('T')
E = TypeVar('E')
U = TypeVar('U')
@dataclass
class Ok(Generic[T]):
"""Success result."""
value: T
def is_ok(self) -> bool:
return True
def is_err(self) -> bool:
return False
def unwrap(self) -> T:
return self.value
def map(self, func: Callable[[T], 'U']) -> 'Result[U, E]':
return Ok(func(self.value))
def and_then(self, func: Callable[[T], 'Result[U, E]']) -> 'Result[U, E]':
return func(self.value)
@dataclass
class Err(Generic[E]):
"""Error result."""
error: E
def is_ok(self) -> bool:
return False
def is_err(self) -> bool:
return True
def unwrap(self) -> T:
raise ValueError(f"Called unwrap on Err: {self.error}")
def map(self, func: Callable[[T], 'U']) -> 'Result[U, E]':
return self # Propagate error
def and_then(self, func: Callable[[T], 'Result[U, E]']) -> 'Result[U, E]':
return self # Propagate error
Result = Union[Ok[T], Err[E]]
Usage:
# src/glam_extractor/parsers/unesco_parser.py
def parse_unesco_site(unesco_data: Dict) -> Result[HeritageCustodian, str]:
"""Parse UNESCO site data to HeritageCustodian."""
    try:
        # Validate required fields (OpenDataSoft nested structure)
        fields = unesco_data.get('record', {}).get('fields', {})
        if 'unique_number' not in fields:
            return Err("Missing required field: unique_number")
        if 'name_en' not in fields:
            return Err("Missing required field: name_en")
# Build custodian
builder = HeritageCustodianBuilder()
custodian = (
builder
.from_unesco_data(unesco_data)
.with_name()
.with_institution_type(classifier)
.with_location()
.with_unesco_identifiers()
.with_provenance()
.build()
)
return Ok(custodian)
except Exception as e:
return Err(f"Parsing error: {str(e)}")
# Usage
result = parse_unesco_site(site_data)
if result.is_ok():
custodian = result.unwrap()
print(f"Successfully parsed: {custodian.name}")
else:
print(f"Parsing failed: {result.error}")
# Functional chaining
result = (
parse_unesco_site(site_data)
.map(lambda c: enrich_with_wikidata(c))
.map(lambda c: generate_ghcid(c))
)
Benefits:
- ✅ Explicit error handling (no hidden exceptions)
- ✅ Type-safe (mypy can verify error handling)
- ✅ Functional composition (map, and_then)
- ✅ Forces callers to handle errors
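To see the propagation behaviour in isolation, here is a compact, self-contained rerun of the Ok/Err mechanics with a toy parser (integers instead of UNESCO records); the untyped `Any` variant is a simplification of the generic version above:

```python
from dataclasses import dataclass
from typing import Any, Callable, Union

@dataclass
class Ok:
    value: Any
    def is_ok(self) -> bool:
        return True
    def map(self, func: Callable) -> 'Union[Ok, Err]':
        # Apply the function and re-wrap the result
        return Ok(func(self.value))

@dataclass
class Err:
    error: str
    def is_ok(self) -> bool:
        return False
    def map(self, func: Callable) -> 'Union[Ok, Err]':
        return self  # propagate the error unchanged

def parse_int(raw: str) -> Union[Ok, Err]:
    """Toy parser: Ok on digits, Err otherwise."""
    return Ok(int(raw)) if raw.isdigit() else Err(f"not a number: {raw!r}")

good = parse_int("600").map(lambda n: n * 2)  # Ok(1200)
bad = parse_int("abc").map(lambda n: n * 2)   # Err; map is a no-op
```

The key property is visible in `bad`: once an `Err` enters the chain, every subsequent `map` passes it through untouched, so only the final consumer has to branch on success vs. failure.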
Pattern 5: Adapter Pattern (LinkML Map Integration)
Purpose: Adapt UNESCO JSON structure to LinkML schema using LinkML Map.
Implementation:
# src/glam_extractor/adapters/linkml_adapter.py
from datetime import datetime, timezone
from typing import Dict
from linkml_map import Mapper
from glam_extractor.models import HeritageCustodian
class LinkMLAdapter:
"""Adapter for transforming UNESCO JSON to LinkML instances."""
def __init__(self, map_schema_path: str):
self.mapper = Mapper(schema_path=map_schema_path)
def adapt(self, unesco_data: Dict) -> Dict:
"""
Transform UNESCO JSON to LinkML-compliant dictionary.
Args:
unesco_data: Raw UNESCO API response
Returns:
Dictionary conforming to HeritageCustodian schema
"""
return self.mapper.transform(unesco_data)
def adapt_to_model(self, unesco_data: Dict) -> HeritageCustodian:
"""
Transform UNESCO JSON directly to HeritageCustodian model.
Args:
unesco_data: Raw UNESCO API response
Returns:
HeritageCustodian instance
"""
adapted = self.adapt(unesco_data)
return HeritageCustodian(**adapted)
# Alternative: Manual adapter (if LinkML Map not sufficient)
class ManualLinkMLAdapter:
"""Manual transformation when LinkML Map can't handle complex logic."""
def __init__(self, classifier: InstitutionTypeClassifier):
self.classifier = classifier
def adapt(self, unesco_data: Dict) -> Dict:
"""Manually transform UNESCO data to LinkML format."""
# Extract multi-language names
primary_name = unesco_data.get('site', '')
alt_names = [
f"{n['name']}@{n['lang']}"
for n in unesco_data.get('names', [])
if n['name'] != primary_name
]
# Classify institution type
classification = self.classifier.classify(unesco_data)
# Build location
location = {
'city': self._extract_city(unesco_data),
'country': self._extract_country_code(unesco_data),
'latitude': unesco_data.get('latitude'),
'longitude': unesco_data.get('longitude')
}
# Build identifiers
identifiers = [
{
'identifier_scheme': 'UNESCO_WHC',
'identifier_value': str(unesco_data['id_number']),
'identifier_url': f"https://whc.unesco.org/en/list/{unesco_data['id_number']}"
}
]
# Extract Wikidata from links
for link in unesco_data.get('links', []):
if 'wikidata.org' in link.get('url', ''):
q_number = link['url'].split('/')[-1]
identifiers.append({
'identifier_scheme': 'Wikidata',
'identifier_value': q_number,
'identifier_url': link['url']
})
return {
'name': primary_name,
'alternative_names': alt_names,
'institution_type': classification['institution_type'],
'locations': [location],
'identifiers': identifiers,
'description': unesco_data.get('short_description', ''),
'provenance': {
'data_source': 'UNESCO_WORLD_HERITAGE',
'data_tier': 'TIER_1_AUTHORITATIVE',
'extraction_date': datetime.now(timezone.utc).isoformat(),
'confidence_score': classification['confidence_score']
}
}
Benefits:
- ✅ Abstracts transformation logic
- ✅ Easy to switch between LinkML Map and manual transformation
- ✅ Testable in isolation
- ✅ Handles complex transformations LinkML Map can't express
Usage:
# Use LinkML Map adapter
adapter = LinkMLAdapter("schemas/maps/unesco_to_heritage_custodian.yaml")
custodian_dict = adapter.adapt(unesco_site_data)
# Or use manual adapter for complex cases
manual_adapter = ManualLinkMLAdapter(classifier)
custodian_dict = manual_adapter.adapt(unesco_site_data)
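The transformation specification passed to LinkMLAdapter is a YAML file. Its exact shape depends on the linkml-map release in use; the sketch below follows the `class_derivations`/`slot_derivations` vocabulary from linkml-map documentation, but the source class name `WHCSiteRecord` and the file contents are illustrative assumptions, not a tested spec:

```yaml
# schemas/maps/unesco_to_heritage_custodian.yaml (illustrative sketch only)
class_derivations:
  HeritageCustodian:
    populated_from: WHCSiteRecord   # assumed name for the UNESCO source class
    slot_derivations:
      name:
        populated_from: name_en
      description:
        populated_from: short_description_en
```

Anything the declarative spec cannot express (classification, nested identifier construction) stays in ManualLinkMLAdapter.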
Pattern 6: Factory Pattern (GHCID Generation)
Purpose: Encapsulate GHCID generation logic with different strategies (base GHCID, with native name suffix for collisions, etc.).
Note: Collision resolution uses the native language institution name in snake_case format (NOT Wikidata Q-numbers). See docs/plan/global_glam/07-ghcid-collision-resolution.md for authoritative documentation.
Implementation:
# src/glam_extractor/identifiers/ghcid_factory.py
from abc import ABC, abstractmethod
from typing import List
import uuid
import hashlib
from glam_extractor.models import HeritageCustodian
class GHCIDGenerator(ABC):
"""Abstract factory for GHCID generation."""
@abstractmethod
def generate(self, custodian: HeritageCustodian) -> str:
"""Generate GHCID string."""
pass
def generate_uuid_v5(self, ghcid: str) -> uuid.UUID:
"""Generate deterministic UUID v5 from GHCID."""
namespace = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8') # DNS namespace
return uuid.uuid5(namespace, ghcid)
def generate_uuid_v8_sha256(self, ghcid: str) -> uuid.UUID:
"""Generate deterministic UUID v8 using SHA-256."""
hash_bytes = hashlib.sha256(ghcid.encode('utf-8')).digest()[:16]
# Set version (8) and variant bits
hash_bytes = bytearray(hash_bytes)
hash_bytes[6] = (hash_bytes[6] & 0x0F) | 0x80 # Version 8
hash_bytes[8] = (hash_bytes[8] & 0x3F) | 0x80 # Variant RFC 4122
return uuid.UUID(bytes=bytes(hash_bytes))
def generate_numeric_id(self, ghcid: str) -> int:
"""Generate 64-bit numeric ID from GHCID."""
hash_bytes = hashlib.sha256(ghcid.encode('utf-8')).digest()
return int.from_bytes(hash_bytes[:8], byteorder='big')
class BaseGHCIDGenerator(GHCIDGenerator):
"""Generate base GHCID without collision suffix."""
def generate(self, custodian: HeritageCustodian) -> str:
if not custodian.locations:
raise ValueError("Cannot generate GHCID without location")
location = custodian.locations[0]
country = location.country
region = self._get_region_code(location.region)
city = self._get_city_code(location.city)
inst_type = self._get_type_code(custodian.institution_type)
abbrev = self._get_name_abbreviation(custodian.name)
return f"{country}-{region}-{city}-{inst_type}-{abbrev}"
def _get_region_code(self, region: str) -> str:
"""Get 2-3 letter region code."""
# Simplified: Use first 2 letters of region
return region[:2].upper() if region else "XX"
def _get_city_code(self, city: str) -> str:
"""Get 3-letter city code."""
# Use UN/LOCODE or first 3 letters
return city[:3].upper() if city else "XXX"
def _get_type_code(self, inst_type: str) -> str:
"""Get single-letter type code."""
type_map = {
'MUSEUM': 'M',
'LIBRARY': 'L',
'ARCHIVE': 'A',
'GALLERY': 'G',
'BOTANICAL_ZOO': 'B',
'HOLY_SITES': 'H',
'MIXED': 'X'
}
return type_map.get(inst_type, 'X')
def _get_name_abbreviation(self, name: str) -> str:
"""Get 2-5 letter abbreviation from name."""
# Simple: Use first letters of words
words = name.split()
if len(words) == 1:
return words[0][:3].upper()
else:
return ''.join(w[0] for w in words[:5]).upper()
class NameSuffixGHCIDGenerator(GHCIDGenerator):
"""Generate GHCID with native language name suffix for collision resolution.
DEPRECATED: QNumberGHCIDGenerator (uses Wikidata Q-numbers)
CURRENT: NameSuffixGHCIDGenerator (uses native language name in snake_case)
See: docs/plan/global_glam/07-ghcid-collision-resolution.md
"""
def __init__(self, base_generator: BaseGHCIDGenerator):
self.base_generator = base_generator
def generate(self, custodian: HeritageCustodian) -> str:
base_ghcid = self.base_generator.generate(custodian)
# Generate native name suffix in snake_case
name_suffix = self._generate_name_suffix(custodian.name)
if name_suffix:
return f"{base_ghcid}-{name_suffix}"
else:
return base_ghcid
def _generate_name_suffix(self, native_name: str) -> str:
"""Convert native language institution name to snake_case suffix.
Examples:
"Stedelijk Museum Amsterdam" -> "stedelijk_museum_amsterdam"
"Musee d'Orsay" -> "musee_dorsay"
"Osterreichische Nationalbibliothek" -> "osterreichische_nationalbibliothek"
"""
import re
import unicodedata
# Normalize unicode (NFD decomposition) and remove diacritics
normalized = unicodedata.normalize('NFD', native_name)
ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
# Convert to lowercase
lowercase = ascii_name.lower()
# Remove apostrophes, commas, and other punctuation
no_punct = re.sub(r"[''`\",.:;!?()[\]{}]", '', lowercase)
# Replace spaces and hyphens with underscores
underscored = re.sub(r'[\s\-]+', '_', no_punct)
# Remove any remaining non-alphanumeric characters (except underscores)
clean = re.sub(r'[^a-z0-9_]', '', underscored)
# Collapse multiple underscores
final = re.sub(r'_+', '_', clean).strip('_')
return final
# DEPRECATED: QNumberGHCIDGenerator - DO NOT USE
# Wikidata Q-numbers are no longer used for collision resolution.
# Use NameSuffixGHCIDGenerator instead.
# See: docs/plan/global_glam/07-ghcid-collision-resolution.md
class CollisionAwareGHCIDGenerator(GHCIDGenerator):
"""Generate GHCID with collision detection and name suffix resolution."""
def __init__(
self,
base_generator: BaseGHCIDGenerator,
existing_dataset: List[HeritageCustodian]
):
self.base_generator = base_generator
self.name_suffix_generator = NameSuffixGHCIDGenerator(base_generator)
self.existing_ghcids = {c.ghcid for c in existing_dataset}
def generate(self, custodian: HeritageCustodian) -> str:
base_ghcid = self.base_generator.generate(custodian)
# Check for collision
if base_ghcid in self.existing_ghcids:
# Collision detected, use name suffix
return self.name_suffix_generator.generate(custodian)
else:
return base_ghcid
Benefits:
- ✅ Separation of concerns (base GHCID vs. name suffix logic)
- ✅ Easy to extend with new generation strategies
- ✅ Collision detection encapsulated
- ✅ Testable components
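The name-suffix normalization can be checked directly against the docstring examples. This standalone sketch repeats the steps from `_generate_name_suffix` as a free function so it can be unit-tested without constructing a generator:

```python
import re
import unicodedata

def name_suffix(native_name: str) -> str:
    """Normalize a native-language institution name to a snake_case suffix."""
    # NFD decomposition, then drop combining marks (diacritics)
    normalized = unicodedata.normalize('NFD', native_name)
    ascii_name = ''.join(c for c in normalized if unicodedata.category(c) != 'Mn')
    lowercase = ascii_name.lower()
    # Strip apostrophes and other punctuation
    no_punct = re.sub(r"['\u2019`\",.:;!?()\[\]{}]", '', lowercase)
    # Turn whitespace and hyphens into underscores
    underscored = re.sub(r'[\s\-]+', '_', no_punct)
    # Drop anything that is not a-z, 0-9, or underscore
    clean = re.sub(r'[^a-z0-9_]', '', underscored)
    # Collapse runs of underscores and trim the ends
    return re.sub(r'_+', '_', clean).strip('_')
```

Note the ordering matters: diacritics must be stripped before the `[^a-z0-9_]` filter runs, otherwise accented letters would be deleted instead of transliterated.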
Usage:
# Generate base GHCID
base_gen = BaseGHCIDGenerator()
ghcid = base_gen.generate(custodian) # "FR-IDF-PAR-M-LOU"
# Generate with name suffix
name_gen = NameSuffixGHCIDGenerator(base_gen)
ghcid = name_gen.generate(custodian) # "FR-IDF-PAR-M-LOU-musee_du_louvre"
# Generate with collision detection
existing_dataset = load_existing_dataset()
collision_gen = CollisionAwareGHCIDGenerator(base_gen, existing_dataset)
ghcid = collision_gen.generate(custodian) # Auto-appends name suffix if collision
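The deterministic ID helpers on GHCIDGenerator can also be checked independently: UUID v5 of the same GHCID must be stable across calls, and the v8 bit-twiddling must leave the version and variant fields set correctly. A standalone sketch of those two helpers (the example GHCID string is illustrative):

```python
import hashlib
import uuid

DNS_NAMESPACE = uuid.UUID('6ba7b810-9dad-11d1-80b4-00c04fd430c8')

def ghcid_uuid_v5(ghcid: str) -> uuid.UUID:
    """Deterministic UUID v5 derived from the GHCID string."""
    return uuid.uuid5(DNS_NAMESPACE, ghcid)

def ghcid_uuid_v8(ghcid: str) -> uuid.UUID:
    """Deterministic UUID with SHA-256 content and v8 version/variant bits."""
    raw = bytearray(hashlib.sha256(ghcid.encode('utf-8')).digest()[:16])
    raw[6] = (raw[6] & 0x0F) | 0x80  # version nibble -> 8
    raw[8] = (raw[8] & 0x3F) | 0x80  # variant -> RFC 4122
    return uuid.UUID(bytes=bytes(raw))
```

Determinism is what makes re-extraction safe: the same custodian always maps to the same UUID, so repeated runs never mint duplicate identities.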
Pattern 7: Observer Pattern (Progress Tracking)
Purpose: Track extraction progress and notify listeners (logging, progress bars, metrics).
Implementation:
# src/glam_extractor/observers/progress_observer.py
from abc import ABC, abstractmethod
from datetime import datetime
from pathlib import Path
from typing import Dict, List
from glam_extractor.models import HeritageCustodian
class ProgressObserver(ABC):
"""Abstract observer for extraction progress."""
@abstractmethod
def on_start(self, total_sites: int):
"""Called when extraction starts."""
pass
@abstractmethod
def on_site_success(self, site_id: int, custodian: HeritageCustodian):
"""Called when a site is successfully extracted."""
pass
@abstractmethod
def on_site_error(self, site_id: int, error: str):
"""Called when a site extraction fails."""
pass
@abstractmethod
def on_complete(self, success_count: int, error_count: int):
"""Called when extraction completes."""
pass
class LoggingObserver(ProgressObserver):
"""Observer that logs to a file."""
def __init__(self, log_file: Path):
self.log_file = log_file
def on_start(self, total_sites: int):
with open(self.log_file, 'a') as f:
f.write(f"[{datetime.now()}] Starting extraction of {total_sites} sites\n")
def on_site_success(self, site_id: int, custodian: HeritageCustodian):
with open(self.log_file, 'a') as f:
f.write(f"[{datetime.now()}] SUCCESS: Site {site_id} - {custodian.name}\n")
def on_site_error(self, site_id: int, error: str):
with open(self.log_file, 'a') as f:
f.write(f"[{datetime.now()}] ERROR: Site {site_id} - {error}\n")
def on_complete(self, success_count: int, error_count: int):
with open(self.log_file, 'a') as f:
f.write(f"[{datetime.now()}] Completed: {success_count} success, {error_count} errors\n")
class ProgressBarObserver(ProgressObserver):
"""Observer that displays a progress bar."""
def __init__(self):
self.pbar = None
def on_start(self, total_sites: int):
from tqdm import tqdm
self.pbar = tqdm(total=total_sites, desc="Extracting UNESCO sites")
def on_site_success(self, site_id: int, custodian: HeritageCustodian):
if self.pbar:
self.pbar.update(1)
self.pbar.set_postfix_str(f"Last: {custodian.name[:30]}")
def on_site_error(self, site_id: int, error: str):
if self.pbar:
self.pbar.update(1)
def on_complete(self, success_count: int, error_count: int):
if self.pbar:
self.pbar.close()
class MetricsObserver(ProgressObserver):
"""Observer that collects metrics."""
def __init__(self):
self.metrics = {
'start_time': None,
'end_time': None,
'total_sites': 0,
'success_count': 0,
'error_count': 0,
'institution_types': {},
'countries': {}
}
def on_start(self, total_sites: int):
self.metrics['start_time'] = datetime.now()
self.metrics['total_sites'] = total_sites
def on_site_success(self, site_id: int, custodian: HeritageCustodian):
self.metrics['success_count'] += 1
# Track institution types
inst_type = custodian.institution_type
self.metrics['institution_types'][inst_type] = \
self.metrics['institution_types'].get(inst_type, 0) + 1
# Track countries
if custodian.locations:
country = custodian.locations[0].country
self.metrics['countries'][country] = \
self.metrics['countries'].get(country, 0) + 1
def on_site_error(self, site_id: int, error: str):
self.metrics['error_count'] += 1
def on_complete(self, success_count: int, error_count: int):
self.metrics['end_time'] = datetime.now()
duration = self.metrics['end_time'] - self.metrics['start_time']
self.metrics['duration_seconds'] = duration.total_seconds()
def get_metrics(self) -> Dict:
return self.metrics
class UNESCOExtractor:
"""Extraction orchestrator with observer pattern."""
def __init__(self, repository: UNESCORepository):
self.repository = repository
self.observers: List[ProgressObserver] = []
def add_observer(self, observer: ProgressObserver):
self.observers.append(observer)
def extract_all_sites(self) -> List[HeritageCustodian]:
"""Extract all UNESCO sites with progress tracking."""
site_list = self.repository.fetch_site_list()
total_sites = len(site_list)
# Notify observers: start
for observer in self.observers:
observer.on_start(total_sites)
results = []
errors = 0
        for site_data in site_list:
            # OpenDataSoft nested structure
            site_id = site_data.get('record', {}).get('fields', {}).get('unique_number')
            try:
                # Fetch details; parse_unesco_site returns a Result (see Pattern 4)
                details = self.repository.fetch_site_details(site_id)
                result = parse_unesco_site(details)
                if result.is_err():
                    raise ValueError(result.error)
                custodian = result.unwrap()
                results.append(custodian)
# Notify observers: success
for observer in self.observers:
observer.on_site_success(site_id, custodian)
except Exception as e:
errors += 1
# Notify observers: error
for observer in self.observers:
observer.on_site_error(site_id, str(e))
# Notify observers: complete
for observer in self.observers:
observer.on_complete(len(results), errors)
return results
Benefits:
- ✅ Decouples extraction logic from logging/progress tracking
- ✅ Easy to add new observers without modifying extractor
- ✅ Multiple observers can run simultaneously (logging + progress bar + metrics)
- ✅ Testable (mock observers in tests)
Usage:
# Set up extraction with observers
extractor = UNESCOExtractor(repository)
extractor.add_observer(LoggingObserver(Path("logs/unesco_extraction.log")))
extractor.add_observer(ProgressBarObserver())
metrics_observer = MetricsObserver()
extractor.add_observer(metrics_observer)
# Run extraction
custodians = extractor.extract_all_sites()
# Get metrics after completion
metrics = metrics_observer.get_metrics()
print(f"Extracted {metrics['success_count']} sites in {metrics['duration_seconds']:.2f}s")
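The "mock observers in tests" benefit can be demonstrated with a small recording observer. `RecordingObserver` and `run_fake_extraction` below are illustrative test doubles, not part of the production extractor:

```python
from typing import List, Tuple


class RecordingObserver:
    """Test double that records every notification it receives."""

    def __init__(self):
        self.events: List[Tuple] = []

    def on_start(self, total_sites: int):
        self.events.append(("start", total_sites))

    def on_site_success(self, site_id, custodian):
        self.events.append(("success", site_id))

    def on_site_error(self, site_id, error):
        self.events.append(("error", site_id, error))

    def on_complete(self, success_count: int, error_count: int):
        self.events.append(("complete", success_count, error_count))


def run_fake_extraction(observer, site_ids, failing=frozenset()):
    """Drive an observer the same way extract_all_sites would."""
    observer.on_start(len(site_ids))
    ok = err = 0
    for sid in site_ids:
        if sid in failing:
            err += 1
            observer.on_site_error(sid, "boom")
        else:
            ok += 1
            observer.on_site_success(sid, custodian=None)
    observer.on_complete(ok, err)


recorder = RecordingObserver()
run_fake_extraction(recorder, [1, 2, 3], failing=frozenset({2}))
assert recorder.events[0] == ("start", 3)
assert recorder.events[-1] == ("complete", 2, 1)
```

A test can then assert on the exact notification sequence without touching logging, tqdm, or the metrics dictionary.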
Integration Patterns with Existing GLAM Architecture
Pattern 8: Consistent Provenance Tracking
Align with existing GLAM patterns:
# Match provenance structure from Dutch ISIL/Orgs parsers
provenance = Provenance(
data_source=DataSource.UNESCO_WORLD_HERITAGE, # Add to enums.yaml
data_tier=DataTier.TIER_1_AUTHORITATIVE,
extraction_date=datetime.now(timezone.utc).isoformat(),
extraction_method="UNESCO DataHub API with LinkML Map transformation",
confidence_score=classification_result['confidence_score'],
# Notes go in custodian.description, NOT provenance (schema quirk!)
)
Pattern 9: GHCID Collision Resolution
Follow existing collision handling:
# From AGENTS.md - Temporal priority rule
def resolve_ghcid_collision(
    new_custodian: HeritageCustodian,
    existing_custodian: HeritageCustodian
) -> str:
    """Apply temporal priority rule for GHCID collision."""
    new_date = new_custodian.provenance.extraction_date
    existing_date = existing_custodian.provenance.extraction_date

    if new_date == existing_date:
        # First-batch collision: BOTH custodians get name suffixes
        # (the caller must also regenerate the existing custodian's GHCID)
        return generate_ghcid_with_name_suffix(new_custodian)
    else:
        # Historical addition: ONLY the new custodian gets a name suffix;
        # the existing GHCID is preserved (PID stability)
        return generate_ghcid_with_name_suffix(new_custodian)
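`generate_ghcid_with_name_suffix` is referenced above but not defined in this document. One way to keep it deterministic is to derive the suffix from a hash of the normalized institution name; the sketch below is an illustration of that idea under an assumed `COUNTRY-CITY-TYPE-NNN` base layout, not the project's actual implementation:

```python
import hashlib


def name_suffix(name: str, length: int = 8) -> str:
    """Stable suffix derived from the normalized institution name."""
    digest = hashlib.sha256(name.strip().lower().encode("utf-8")).hexdigest()
    return digest[:length].upper()


def ghcid_with_suffix(base_ghcid: str, name: str) -> str:
    """Append a name-derived suffix to a colliding base GHCID."""
    return f"{base_ghcid}-{name_suffix(name)}"


# Same name always yields the same GHCID (determinism);
# different names disambiguate a colliding base ID.
a = ghcid_with_suffix("FR-PARIS-MUS-001", "Musée du Louvre")
b = ghcid_with_suffix("FR-PARIS-MUS-001", "Musée d'Orsay")
assert a == ghcid_with_suffix("FR-PARIS-MUS-001", "Musée du Louvre")
assert a != b
```

Hashing the name (rather than appending a counter) keeps suffix assignment independent of processing order, which matters for PID stability across re-runs.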
Pattern 10: Multi-format Export
Reuse existing exporter patterns:
# Match structure from Dutch dataset exporters
class UNESCOExportPipeline:
    """Export pipeline matching existing GLAM patterns."""

    def __init__(self, dataset: List[HeritageCustodian]):
        self.dataset = dataset

    def export_all_formats(self, output_dir: Path):
        """Export to all formats (RDF, JSON-LD, CSV, Parquet, SQLite)."""
        self.export_rdf(output_dir / "unesco_glam.ttl")
        self.export_jsonld(output_dir / "unesco_glam.jsonld")
        self.export_csv(output_dir / "unesco_glam.csv")
        self.export_parquet(output_dir / "unesco_glam.parquet")
        self.export_sqlite(output_dir / "unesco_glam.db")
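The `export_*` methods above are stubs. As one illustration, the CSV step might look like the sketch below, assuming custodians have already been flattened to dicts; the column list is hypothetical, not the production schema:

```python
import csv
from pathlib import Path


def export_csv(rows: list, path: Path) -> None:
    """Write flattened custodian rows to CSV with a stable column order."""
    # Hypothetical column list for illustration only
    fieldnames = ["ghcid", "name", "institution_type", "country"]
    with path.open("w", newline="", encoding="utf-8") as f:
        # extrasaction="ignore" drops any keys outside the declared columns
        writer = csv.DictWriter(f, fieldnames=fieldnames, extrasaction="ignore")
        writer.writeheader()
        writer.writerows(rows)
```

Pinning `fieldnames` explicitly (rather than deriving them from the first row) keeps column order stable across runs, which downstream diff-based consumers tend to rely on.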
Testing Patterns
Pattern 11: Test Fixture Management
# Centralized fixture loading
@pytest.fixture(scope="session")
def unesco_fixtures():
    """Load all UNESCO test fixtures."""
    return UNESCOFixtureRepository(Path("tests/fixtures/unesco_api_responses"))


@pytest.fixture
def sample_museum_site(unesco_fixtures):
    """Fixture for a museum site."""
    return unesco_fixtures.fetch_site_details(600)  # Louvre


@pytest.fixture
def sample_library_site(unesco_fixtures):
    """Fixture for a library site."""
    return unesco_fixtures.fetch_site_details(1110)  # National Library of Vietnam
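`UNESCOFixtureRepository` is used above but not defined in this section. A minimal sketch, assuming fixtures are stored one JSON file per site under a `site_<id>.json` naming convention (chosen here for illustration):

```python
import json
from pathlib import Path


class UNESCOFixtureRepository:
    """Repository that serves recorded API responses from disk."""

    def __init__(self, fixtures_dir: Path):
        self.fixtures_dir = fixtures_dir

    def fetch_site_details(self, whc_id: int) -> dict:
        # Assumed layout: one recorded response per site id
        path = self.fixtures_dir / f"site_{whc_id}.json"
        return json.loads(path.read_text(encoding="utf-8"))
```

Because it exposes the same `fetch_site_details` signature as the API repository, tests can swap it in wherever a `UNESCORepository` is expected, with no network access.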
Pattern 12: Property-Based Testing for GHCIDs
from hypothesis import given, strategies as st

@given(
    country=st.sampled_from(["FR", "NL", "BR", "IN", "JP"]),
    city=st.text(min_size=3, max_size=20),
    inst_type=st.sampled_from(list(InstitutionTypeEnum))
)
def test_ghcid_determinism(country, city, inst_type):
    """Property: GHCID generation is deterministic."""
    custodian = create_test_custodian(country, city, inst_type)

    ghcid1 = generator.generate(custodian)
    ghcid2 = generator.generate(custodian)

    assert ghcid1 == ghcid2
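For concreteness, a hedged sketch of the kind of generator this property would exercise. `FakeCustodian` and the hash-based layout are illustrative assumptions, not the project's actual generator; determinism falls out because the GHCID is a pure function of the normalized fields:

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeCustodian:
    country: str
    city: str
    inst_type: str
    name: str


def generate(c: FakeCustodian) -> str:
    """Pure function of the custodian's fields, hence deterministic."""
    key = "|".join(s.strip().lower() for s in (c.country, c.city, c.inst_type, c.name))
    digest = hashlib.sha256(key.encode("utf-8")).hexdigest()[:6].upper()
    return f"{c.country}-{digest}"
```

With no clocks, counters, or randomness in `generate`, the property test above passes by construction; Hypothesis then guards against regressions such as an accidentally stateful counter creeping in.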
Anti-Patterns to Avoid
❌ Anti-Pattern 1: God Class
Bad:
class UNESCOExtractor:
    def fetch_data(self): ...
    def parse_data(self): ...
    def classify_type(self): ...
    def generate_ghcid(self): ...
    def validate(self): ...
    def export_rdf(self): ...
    # 1000+ lines, does everything!
Good: Separate concerns (Repository, Parser, Classifier, Generator, Exporter)
❌ Anti-Pattern 2: Primitive Obsession
Bad:
def generate_ghcid(country: str, city: str, type: str, name: str) -> str:
    # 4 primitive parameters, easy to mix up
    ...
Good: Use domain objects
def generate_ghcid(custodian: HeritageCustodian) -> str:
    # Single parameter, type-safe
    ...
❌ Anti-Pattern 3: Exception-Driven Control Flow
Bad:
try:
    custodian = parse(data)
    return custodian
except ValueError:
    return None  # Silent failure, hard to debug
Good: Use Result pattern
result = parse(data)
if result.is_err():
    log.error(f"Parse error: {result.error}")
    return None
return result.unwrap()
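The `Result` type used above is not defined in this section. A minimal sketch matching the calls shown (`is_err()`, `.error`, `unwrap()`), assuming a simple two-field container rather than a full Ok/Err union:

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class Result(Generic[T]):
    """Minimal success-or-error container."""
    value: Optional[T] = None
    error: Optional[str] = None

    @classmethod
    def ok(cls, value: T) -> "Result[T]":
        return cls(value=value)

    @classmethod
    def err(cls, error: str) -> "Result[T]":
        return cls(error=error)

    def is_err(self) -> bool:
        return self.error is not None

    def unwrap(self) -> T:
        # Failing loudly here is the point: no silent None propagation
        if self.is_err():
            raise ValueError(self.error)
        return self.value


# Mirrors the "Good" snippet above
res = Result.ok("parsed")
assert not res.is_err() and res.unwrap() == "parsed"
```

The payoff over bare exceptions is that the error path is visible in the return type, so callers cannot forget to handle it without the omission being obvious in review.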
Summary of Patterns
| Pattern | Purpose | Benefit |
|---|---|---|
| Repository | Abstract data access | Testability, flexibility |
| Strategy | Pluggable classification | Extensibility, A/B testing |
| Builder | Complex object construction | Fluent API, step-by-step validation |
| Result | Explicit error handling | Type safety, no hidden exceptions |
| Adapter | LinkML integration | Abstraction, testability |
| Factory | GHCID generation | Separation of concerns, extensibility |
| Observer | Progress tracking | Decoupling, multiple listeners |
| Provenance | Consistent metadata | Integration with existing GLAM patterns |
| Collision Resolution | GHCID uniqueness | PID stability, determinism |
| Multi-format Export | Standard outputs | Reusability, consumer flexibility |
Next Steps
Completed:
- ✅ `01-dependencies.md` - Technical dependencies
- ✅ `02-consumers.md` - Use cases and data consumers
- ✅ `03-implementation-phases.md` - 6-week timeline
- ✅ `04-tdd-strategy.md` - Testing strategy
- ✅ `05-design-patterns.md` - THIS DOCUMENT - Architectural patterns

Next to Create:
- `06-linkml-map-schema.md` - CRITICAL - Complete LinkML Map transformation rules
- `07-master-checklist.md` - Implementation checklist tying everything together
Document Status: Complete
Next Document: 06-linkml-map-schema.md - LinkML Map transformation rules
Version: 1.1