Global GLAM Dataset: Design Patterns
Overview
This document describes the software design patterns and architectural principles used throughout the Global GLAM Dataset extraction system.
Core Design Principles
1. Separation of Concerns
- Parsing ≠ Extraction ≠ Validation ≠ Export
- Each component has a single, well-defined responsibility
- Components communicate through clearly defined interfaces
2. Data Pipeline Architecture
- Immutable intermediate stages
- Clear data transformations at each step
- Traceable provenance from source to output
3. Fail-Safe Processing
- Graceful degradation when external services unavailable
- Partial results better than no results
- Continue processing even when individual items fail
4. Provenance Tracking
- Every piece of data tracks its source
- Confidence scores for derived data
- Audit trail for transformations
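The fail-safe and provenance principles can be sketched together in a few lines. The names here (`safe_extract`, `parse_name`, the `Extracted` record) are illustrative stand-ins, not classes from the codebase:

```python
from dataclasses import dataclass
from typing import Callable, List

@dataclass(frozen=True)
class Extracted:
    """A value paired with its provenance source and a confidence score"""
    value: str
    source: str
    confidence: float

def safe_extract(items: List[str], extract: Callable[[str], Extracted]) -> List[Extracted]:
    """Fail-safe processing: skip items that fail instead of aborting the batch"""
    results = []
    for item in items:
        try:
            results.append(extract(item))
        except ValueError:
            continue  # partial results are better than no results
    return results

def parse_name(raw: str) -> Extracted:
    if not raw.strip():
        raise ValueError("empty record")
    return Extracted(value=raw.strip(), source="csv_row", confidence=0.9)

results = safe_extract(["Rijksmuseum ", "", "Tropenmuseum"], parse_name)
```

The empty record is skipped rather than crashing the batch, and every surviving value carries its source and confidence.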
Architectural Patterns
Pattern 1: Pipeline Pattern
Intent: Process data through a series of sequential transformations
Structure:
from abc import ABC, abstractmethod
from typing import Generic, TypeVar, List
TInput = TypeVar('TInput')
TOutput = TypeVar('TOutput')
class PipelineStage(ABC, Generic[TInput, TOutput]):
"""Base class for pipeline stages"""
@abstractmethod
def process(self, input_data: TInput) -> TOutput:
"""Transform input to output"""
pass
@abstractmethod
def validate_input(self, input_data: TInput) -> bool:
"""Validate input before processing"""
pass
    def handle_error(self, error: Exception, input_data: TInput) -> TOutput:
        """Handle processing errors; re-raises by default, override to recover"""
        raise error
class Pipeline(Generic[TInput, TOutput]):
"""Compose multiple stages into a pipeline"""
def __init__(self):
self.stages: List[PipelineStage] = []
def add_stage(self, stage: PipelineStage) -> 'Pipeline':
self.stages.append(stage)
return self # Fluent interface
def execute(self, input_data: TInput) -> TOutput:
"""Execute all stages in sequence"""
current_data = input_data
for stage in self.stages:
if stage.validate_input(current_data):
try:
current_data = stage.process(current_data)
except Exception as e:
current_data = stage.handle_error(e, current_data)
else:
raise ValueError(f"Invalid input for stage {stage.__class__.__name__}")
return current_data
Usage:
# Build extraction pipeline
pipeline = (Pipeline()
.add_stage(ConversationParser())
.add_stage(EntityExtractor())
.add_stage(AttributeExtractor())
.add_stage(WebCrawler())
.add_stage(LinkMLMapper())
.add_stage(RDFExporter())
)
# Process a conversation file
result = pipeline.execute(conversation_file)
Benefits:
- Clear data flow
- Easy to add/remove stages
- Testable in isolation
- Composable
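The structure above omits concrete stages; a minimal runnable version (with toy stages standing in for the real parser and extractor classes) looks like this:

```python
from abc import ABC, abstractmethod
from typing import Any, List

class PipelineStage(ABC):
    """Simplified, non-generic stage for illustration"""
    @abstractmethod
    def process(self, data: Any) -> Any: ...

    def validate_input(self, data: Any) -> bool:
        return data is not None

class Pipeline:
    def __init__(self):
        self.stages: List[PipelineStage] = []

    def add_stage(self, stage: PipelineStage) -> 'Pipeline':
        self.stages.append(stage)
        return self  # fluent interface

    def execute(self, data: Any) -> Any:
        for stage in self.stages:
            if not stage.validate_input(data):
                raise ValueError(f"Invalid input for {stage.__class__.__name__}")
            data = stage.process(data)  # output of one stage feeds the next
        return data

class Tokenize(PipelineStage):
    def process(self, data: str) -> List[str]:
        return data.split()

class KeepCapitalized(PipelineStage):
    def process(self, data: List[str]) -> List[str]:
        return [w for w in data if w[:1].isupper()]

result = (Pipeline()
          .add_stage(Tokenize())
          .add_stage(KeepCapitalized())
          .execute("visit the Rijksmuseum in Amsterdam"))
```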
Pattern 2: Repository Pattern
Intent: Separate data access logic from business logic
Structure:
from abc import ABC, abstractmethod
from typing import List, Optional, TypeVar, Generic
T = TypeVar('T')
class Repository(ABC, Generic[T]):
"""Abstract repository for data access"""
@abstractmethod
def get_by_id(self, id: str) -> Optional[T]:
"""Retrieve single entity by ID"""
pass
@abstractmethod
def get_all(self) -> List[T]:
"""Retrieve all entities"""
pass
@abstractmethod
def save(self, entity: T) -> None:
"""Persist entity"""
pass
@abstractmethod
def delete(self, id: str) -> None:
"""Delete entity"""
pass
@abstractmethod
def find_by(self, **criteria) -> List[T]:
"""Find entities matching criteria"""
pass
class ConversationRepository(Repository[Conversation]):
"""Repository for conversation files"""
def __init__(self, data_dir: Path):
self.data_dir = data_dir
self._index = self._build_index()
def get_by_id(self, id: str) -> Optional[Conversation]:
if id in self._index:
return self._load_conversation(self._index[id])
return None
def find_by(self, **criteria) -> List[Conversation]:
"""
Find conversations by criteria
Examples:
- find_by(country='Brazil')
- find_by(date_after='2025-09-01')
- find_by(glam_type='museum')
"""
results = []
for conv in self.get_all():
if self._matches_criteria(conv, criteria):
results.append(conv)
return results
class InstitutionRepository(Repository[HeritageCustodian]):
"""Repository for heritage institutions"""
def __init__(self, db_path: Path):
self.db = duckdb.connect(str(db_path))
def get_by_id(self, id: str) -> Optional[HeritageCustodian]:
result = self.db.execute(
"SELECT * FROM institutions WHERE id = ?", [id]
).fetchone()
return self._to_entity(result) if result else None
    def find_by_isil(self, isil_code: str) -> Optional[HeritageCustodian]:
        """Domain-specific query method"""
        matches = self.find_by(isil_code=isil_code)
        return matches[0] if matches else None
Benefits:
- Swap storage backends (SQLite, DuckDB, files)
- Easier testing (mock repositories)
- Centralized data access logic
- Domain-specific query methods
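The "easier testing" benefit usually comes from a dict-backed in-memory implementation used as a test double. A sketch, assuming entities expose an `id` attribute (the `Institution` dataclass here is a toy stand-in):

```python
from dataclasses import dataclass
from typing import Dict, List, Optional

class InMemoryRepository:
    """Dict-backed repository, handy as a mock in tests"""
    def __init__(self):
        self._store: Dict[str, object] = {}

    def get_by_id(self, id: str) -> Optional[object]:
        return self._store.get(id)

    def get_all(self) -> List[object]:
        return list(self._store.values())

    def save(self, entity) -> None:
        self._store[entity.id] = entity

    def delete(self, id: str) -> None:
        self._store.pop(id, None)

    def find_by(self, **criteria) -> List[object]:
        # Keep entities whose attributes match every criterion
        return [e for e in self._store.values()
                if all(getattr(e, k, None) == v for k, v in criteria.items())]

@dataclass
class Institution:
    id: str
    name: str
    country: str

repo = InMemoryRepository()
repo.save(Institution("1", "Rijksmuseum", "Netherlands"))
repo.save(Institution("2", "Museu Nacional", "Brazil"))
brazilian = repo.find_by(country="Brazil")
```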
Pattern 3: Strategy Pattern
Intent: Define a family of algorithms and make them interchangeable
Structure:
class ExtractionStrategy(ABC):
"""Base strategy for entity extraction"""
@abstractmethod
def extract_institutions(self, text: str) -> List[ExtractedInstitution]:
pass
class SpacyNERStrategy(ExtractionStrategy):
"""Use spaCy for entity extraction"""
def __init__(self, model_name: str = "en_core_web_lg"):
self.nlp = spacy.load(model_name)
def extract_institutions(self, text: str) -> List[ExtractedInstitution]:
doc = self.nlp(text)
institutions = []
for ent in doc.ents:
if ent.label_ in ["ORG", "FAC"]:
# Extract institution
inst = self._parse_entity(ent)
institutions.append(inst)
return institutions
class TransformerNERStrategy(ExtractionStrategy):
"""Use transformer models for entity extraction"""
def __init__(self, model_name: str = "dslim/bert-base-NER"):
from transformers import pipeline
self.ner = pipeline("ner", model=model_name)
    def extract_institutions(self, text: str) -> List[ExtractedInstitution]:
        entities = self.ner(text)
        institutions = []
        # Process transformer output into ExtractedInstitution objects...
        return institutions
class RegexPatternStrategy(ExtractionStrategy):
"""Use regex patterns for extraction"""
def __init__(self, patterns: Dict[str, str]):
self.patterns = {k: re.compile(v) for k, v in patterns.items()}
    def extract_institutions(self, text: str) -> List[ExtractedInstitution]:
        institutions = []
        # Pattern-based extraction...
        return institutions
# Usage
class EntityExtractor:
def __init__(self, strategy: ExtractionStrategy):
self.strategy = strategy
def set_strategy(self, strategy: ExtractionStrategy):
"""Change extraction strategy at runtime"""
self.strategy = strategy
def extract(self, text: str) -> List[ExtractedInstitution]:
return self.strategy.extract_institutions(text)
# Configure different strategies for different use cases
extractor = EntityExtractor(SpacyNERStrategy()) # Default
extractor.set_strategy(TransformerNERStrategy()) # Switch to transformer
Benefits:
- Easy to add new extraction methods
- Switch algorithms at runtime
- Compare performance of different approaches
- A/B testing different strategies
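Of the strategies above, only the regex variant runs without model downloads, so it makes a good self-contained demonstration. The pattern set below is a toy example, not the project's real patterns:

```python
import re
from abc import ABC, abstractmethod
from typing import Dict, List

class ExtractionStrategy(ABC):
    @abstractmethod
    def extract_institutions(self, text: str) -> List[str]: ...

class RegexPatternStrategy(ExtractionStrategy):
    def __init__(self, patterns: Dict[str, str]):
        self.patterns = {k: re.compile(v) for k, v in patterns.items()}

    def extract_institutions(self, text: str) -> List[str]:
        found = []
        for name, pattern in self.patterns.items():
            found.extend(pattern.findall(text))
        return found

class EntityExtractor:
    def __init__(self, strategy: ExtractionStrategy):
        self.strategy = strategy

    def set_strategy(self, strategy: ExtractionStrategy) -> None:
        """Change extraction strategy at runtime"""
        self.strategy = strategy

    def extract(self, text: str) -> List[str]:
        return self.strategy.extract_institutions(text)

extractor = EntityExtractor(RegexPatternStrategy({
    "museum": r"\b[A-Z][a-z]*museum\b",  # e.g. "Rijksmuseum"
    "isil": r"\bNL-[A-Za-z0-9]+\b",      # ISIL-style codes
}))
hits = extractor.extract("The Rijksmuseum (NL-AsdRM) is in Amsterdam.")
```

Swapping in a model-backed strategy later only requires `set_strategy`; the `EntityExtractor` call site is unchanged.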
Pattern 4: Builder Pattern
Intent: Construct complex objects step by step
Structure:
class HeritageCustodianBuilder:
"""Builder for HeritageCustodian instances"""
def __init__(self):
self._institution = HeritageCustodian()
self._provenance = []
def with_name(self, name: str, source: str = None) -> 'HeritageCustodianBuilder':
self._institution.name = name
if source:
self._provenance.append(ProvenanceRecord(
field="name", source=source, confidence=1.0
))
return self
def with_isil_code(self, code: str, source: str = None) -> 'HeritageCustodianBuilder':
self._institution.isil_code = code
if source:
self._provenance.append(ProvenanceRecord(
field="isil_code", source=source, confidence=1.0
))
return self
def with_type(self, inst_type: str, confidence: float = 1.0) -> 'HeritageCustodianBuilder':
self._institution.institution_types.append(inst_type)
self._provenance.append(ProvenanceRecord(
field="institution_types",
source="inferred",
confidence=confidence
))
return self
def with_location(self, city: str, country: str,
coordinates: Tuple[float, float] = None) -> 'HeritageCustodianBuilder':
self._institution.geographic_coverage = GeographicCoverage(
municipality=city,
country=country,
coordinates=coordinates
)
return self
def from_csv_record(self, record: DutchOrganizationRecord) -> 'HeritageCustodianBuilder':
"""Bulk population from CSV record"""
self.with_name(record.organization_name, source="dutch_org_registry")
if record.isil_code:
self.with_isil_code(record.isil_code, source="dutch_org_registry")
self.with_location(record.city, "Netherlands", record.coordinates)
# ... more fields
return self
def from_extracted_entity(self, entity: ExtractedInstitution) -> 'HeritageCustodianBuilder':
"""Bulk population from NLP extraction"""
self.with_name(entity.name, source="nlp_extraction")
if entity.isil_code:
self.with_isil_code(entity.isil_code, source="nlp_extraction")
for inst_type in entity.institution_type:
self.with_type(inst_type, confidence=entity.extraction_confidence)
# ... more fields
return self
def merge_from(self, other: HeritageCustodian,
priority: str = "authoritative") -> 'HeritageCustodianBuilder':
"""Merge data from another instance"""
# Complex merge logic...
return self
def build(self) -> HeritageCustodian:
"""Construct final instance with provenance"""
self._institution.provenance = self._provenance
return self._institution
# Usage
institution = (HeritageCustodianBuilder()
.with_name("Rijksmuseum")
.with_isil_code("NL-AsdRM")
.with_type("museum")
.with_location("Amsterdam", "Netherlands", (52.36, 4.88))
.build()
)
# Or from existing data sources
institution = (HeritageCustodianBuilder()
.from_csv_record(csv_record)
.merge_from(extracted_entity)
.build()
)
Benefits:
- Complex object construction
- Fluent, readable API
- Track provenance during construction
- Merge data from multiple sources
Pattern 5: Adapter Pattern
Intent: Convert one interface into another so otherwise incompatible components can work together
Structure:
class WebCrawlerAdapter(ABC):
"""Adapter for different web crawling libraries"""
@abstractmethod
async def fetch(self, url: str) -> CrawlResult:
pass
@abstractmethod
async def extract_metadata(self, url: str) -> Dict[str, Any]:
pass
class Crawl4AIAdapter(WebCrawlerAdapter):
"""Adapter for crawl4ai library"""
def __init__(self):
from crawl4ai import AsyncWebCrawler
self.crawler = AsyncWebCrawler()
async def fetch(self, url: str) -> CrawlResult:
result = await self.crawler.arun(url)
# Convert crawl4ai result to our CrawlResult format
return CrawlResult(
url=url,
content=result.extracted_content,
metadata=result.metadata,
status_code=result.status_code
)
async def extract_metadata(self, url: str) -> Dict[str, Any]:
result = await self.fetch(url)
# Extract Schema.org, OpenGraph, etc.
return self._parse_metadata(result.content)
class BeautifulSoupAdapter(WebCrawlerAdapter):
    """Adapter for BeautifulSoup (fallback)"""
    async def fetch(self, url: str) -> CrawlResult:
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
        soup = BeautifulSoup(response.text, 'lxml')
        return CrawlResult(
            url=url,
            content=soup.get_text(),
            metadata=self._extract_meta_tags(soup),
            status_code=response.status_code
        )

    async def extract_metadata(self, url: str) -> Dict[str, Any]:
        result = await self.fetch(url)
        return result.metadata
# Usage - swap implementations transparently
class WebCrawler:
def __init__(self, adapter: WebCrawlerAdapter):
self.adapter = adapter
async def crawl(self, url: str) -> CrawlResult:
return await self.adapter.fetch(url)
# Try crawl4ai, fall back to BeautifulSoup
try:
crawler = WebCrawler(Crawl4AIAdapter())
except ImportError:
crawler = WebCrawler(BeautifulSoupAdapter())
Benefits:
- Swap implementations easily
- Isolate external dependencies
- Fallback mechanisms
- Easier testing with mock adapters
Pattern 6: Observer Pattern
Intent: Notify observers when processing events occur
Structure:
from typing import List, Callable
class ProcessingEvent:
"""Base class for events"""
def __init__(self, stage: str, data: Any):
self.stage = stage
self.data = data
self.timestamp = datetime.now()
class Observer(ABC):
"""Observer interface"""
@abstractmethod
def update(self, event: ProcessingEvent) -> None:
pass
class LoggingObserver(Observer):
"""Log all processing events"""
def update(self, event: ProcessingEvent) -> None:
logging.info(f"[{event.stage}] {event.timestamp}: {event.data}")
class MetricsObserver(Observer):
"""Collect metrics on processing"""
def __init__(self):
self.metrics = defaultdict(list)
def update(self, event: ProcessingEvent) -> None:
self.metrics[event.stage].append({
'timestamp': event.timestamp,
'success': event.data.get('success', True),
'duration': event.data.get('duration', 0)
})
class ProgressObserver(Observer):
"""Display progress bars"""
def __init__(self):
from tqdm import tqdm
self.progress_bars = {}
def update(self, event: ProcessingEvent) -> None:
if event.stage not in self.progress_bars:
self.progress_bars[event.stage] = tqdm(desc=event.stage)
self.progress_bars[event.stage].update(1)
class Observable:
"""Observable base class for pipeline stages"""
def __init__(self):
self._observers: List[Observer] = []
def attach(self, observer: Observer) -> None:
self._observers.append(observer)
def detach(self, observer: Observer) -> None:
self._observers.remove(observer)
def notify(self, event: ProcessingEvent) -> None:
for observer in self._observers:
observer.update(event)
# Usage
class ObservablePipeline(Pipeline, Observable):
    """Pipeline with event notifications"""
    def __init__(self):
        Pipeline.__init__(self)    # sets self.stages
        Observable.__init__(self)  # sets self._observers

    def execute(self, input_data):
        self.notify(ProcessingEvent('pipeline_start', {'input': input_data}))
        current_data = input_data
        for stage in self.stages:
            self.notify(ProcessingEvent(stage.__class__.__name__, {'status': 'starting'}))
            start = time.time()
            current_data = stage.process(current_data)  # chain output into next stage
            duration = time.time() - start
            self.notify(ProcessingEvent(stage.__class__.__name__, {
                'status': 'completed',
                'duration': duration,
                'success': True
            }))
        self.notify(ProcessingEvent('pipeline_complete', {'output': current_data}))
        return current_data
# Attach observers
pipeline = ObservablePipeline()
pipeline.attach(LoggingObserver())
pipeline.attach(MetricsObserver())
pipeline.attach(ProgressObserver())
Benefits:
- Decouple logging, metrics, progress tracking
- Easy to add new observers
- Monitor pipeline execution
- Collect performance data
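Stripped of logging and tqdm, the event flow can be exercised with a single recording observer (a test-oriented variant not shown above):

```python
from abc import ABC, abstractmethod
from typing import Any, List

class ProcessingEvent:
    def __init__(self, stage: str, data: Any):
        self.stage = stage
        self.data = data

class Observer(ABC):
    @abstractmethod
    def update(self, event: ProcessingEvent) -> None: ...

class RecordingObserver(Observer):
    """Collects every event it sees; handy in tests"""
    def __init__(self):
        self.events: List[ProcessingEvent] = []

    def update(self, event: ProcessingEvent) -> None:
        self.events.append(event)

class Observable:
    def __init__(self):
        self._observers: List[Observer] = []

    def attach(self, observer: Observer) -> None:
        self._observers.append(observer)

    def notify(self, event: ProcessingEvent) -> None:
        for observer in self._observers:
            observer.update(event)

source = Observable()
recorder = RecordingObserver()
source.attach(recorder)
source.notify(ProcessingEvent("parse", {"status": "starting"}))
source.notify(ProcessingEvent("parse", {"status": "completed"}))
```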
Pattern 7: Factory Pattern
Intent: Create objects without specifying their exact class
Structure:
class ExporterFactory:
"""Factory for creating exporters"""
_exporters = {}
@classmethod
def register(cls, format_name: str, exporter_class: Type[Exporter]):
"""Register an exporter for a format"""
cls._exporters[format_name] = exporter_class
@classmethod
def create(cls, format_name: str, **kwargs) -> Exporter:
"""Create exporter instance"""
if format_name not in cls._exporters:
raise ValueError(f"Unknown format: {format_name}")
exporter_class = cls._exporters[format_name]
return exporter_class(**kwargs)
@classmethod
def available_formats(cls) -> List[str]:
"""List available export formats"""
return list(cls._exporters.keys())
# Register exporters
ExporterFactory.register('rdf', RDFExporter)
ExporterFactory.register('csv', CSVExporter)
ExporterFactory.register('jsonld', JSONLDExporter)
ExporterFactory.register('parquet', ParquetExporter)
# Usage
exporter = ExporterFactory.create('rdf', format='turtle')
exporter.export(institutions, output_path)
# Or export to multiple formats
for format_name in ['rdf', 'csv', 'jsonld']:
exporter = ExporterFactory.create(format_name)
exporter.export(institutions, output_dir / f"output.{format_name}")
Benefits:
- Extensible (register new formats)
- Centralized creation logic
- Easy to discover available formats
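Registration can also be done with a class decorator so each exporter registers itself at definition time. A runnable sketch with stub string-producing exporters (the real exporters write files):

```python
import json
from typing import Dict, List, Type

class ExporterFactory:
    _exporters: Dict[str, Type] = {}

    @classmethod
    def register(cls, format_name: str):
        """Class decorator: @ExporterFactory.register('csv')"""
        def decorator(exporter_class: Type) -> Type:
            cls._exporters[format_name] = exporter_class
            return exporter_class
        return decorator

    @classmethod
    def create(cls, format_name: str, **kwargs):
        if format_name not in cls._exporters:
            raise ValueError(f"Unknown format: {format_name}")
        return cls._exporters[format_name](**kwargs)

    @classmethod
    def available_formats(cls) -> List[str]:
        return sorted(cls._exporters)

@ExporterFactory.register('csv')
class CSVExporter:
    def export(self, rows: List[dict]) -> str:
        return "\n".join(",".join(str(v) for v in row.values()) for row in rows)

@ExporterFactory.register('jsonl')
class JSONLExporter:
    def export(self, rows: List[dict]) -> str:
        return "\n".join(json.dumps(row) for row in rows)

exporter = ExporterFactory.create('csv')
output = exporter.export([{"name": "Rijksmuseum", "city": "Amsterdam"}])
```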
Pattern 8: Command Pattern
Intent: Encapsulate requests as objects
Structure:
class Command(ABC):
"""Base command interface"""
@abstractmethod
def execute(self) -> Any:
pass
@abstractmethod
def undo(self) -> None:
pass
class ExtractConversationCommand(Command):
"""Extract institutions from a conversation"""
def __init__(self, conversation_id: str,
conversation_repo: ConversationRepository,
institution_repo: InstitutionRepository,
extractor: EntityExtractor):
self.conversation_id = conversation_id
self.conversation_repo = conversation_repo
self.institution_repo = institution_repo
self.extractor = extractor
self.extracted_institutions = []
def execute(self) -> List[HeritageCustodian]:
# Load conversation
conversation = self.conversation_repo.get_by_id(self.conversation_id)
# Extract institutions
self.extracted_institutions = self.extractor.extract(conversation.content)
# Save to repository
for inst in self.extracted_institutions:
self.institution_repo.save(inst)
return self.extracted_institutions
def undo(self) -> None:
# Remove extracted institutions
for inst in self.extracted_institutions:
self.institution_repo.delete(inst.id)
class CommandQueue:
"""Queue and execute commands"""
def __init__(self):
self.commands = []
self.executed = []
def add_command(self, command: Command) -> None:
self.commands.append(command)
def execute_all(self) -> List[Any]:
results = []
for command in self.commands:
result = command.execute()
self.executed.append(command)
results.append(result)
return results
def undo_last(self) -> None:
if self.executed:
command = self.executed.pop()
command.undo()
# Usage - queue up work
queue = CommandQueue()
for conversation_id in conversation_ids:
cmd = ExtractConversationCommand(
conversation_id,
conversation_repo,
institution_repo,
extractor
)
queue.add_command(cmd)
# Execute all
results = queue.execute_all()
# Undo if needed
if error_detected:
queue.undo_last()
Benefits:
- Queue operations
- Undo/redo support
- Batch processing
- Deferred execution
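The undo mechanics can be verified with a toy command over a plain list rather than real repositories (`AppendCommand` is illustrative, not part of the system):

```python
from abc import ABC, abstractmethod
from typing import Any, List

class Command(ABC):
    @abstractmethod
    def execute(self) -> Any: ...
    @abstractmethod
    def undo(self) -> None: ...

class AppendCommand(Command):
    """Toy command: append a value to a shared store; undo removes it"""
    def __init__(self, store: List[str], value: str):
        self.store = store
        self.value = value

    def execute(self) -> str:
        self.store.append(self.value)
        return self.value

    def undo(self) -> None:
        self.store.remove(self.value)

class CommandQueue:
    def __init__(self):
        self.commands: List[Command] = []
        self.executed: List[Command] = []

    def add_command(self, command: Command) -> None:
        self.commands.append(command)

    def execute_all(self) -> List[Any]:
        results = []
        for command in self.commands:
            results.append(command.execute())
            self.executed.append(command)
        return results

    def undo_last(self) -> None:
        if self.executed:
            self.executed.pop().undo()

store: List[str] = []
queue = CommandQueue()
queue.add_command(AppendCommand(store, "Rijksmuseum"))
queue.add_command(AppendCommand(store, "Tropenmuseum"))
queue.execute_all()
queue.undo_last()  # reverts the most recent command only
```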
Data Patterns
Pattern 9: Value Object Pattern
Intent: Represent domain values as immutable objects whose equality is determined by value
Structure:
from dataclasses import dataclass, field
from typing import Tuple, Optional
@dataclass(frozen=True) # Immutable
class GeographicCoverage:
"""Value object for geographic location"""
country: str
municipality: Optional[str] = None
province: Optional[str] = None
coordinates: Optional[Tuple[float, float]] = None
def __post_init__(self):
# Validation
if self.coordinates:
lat, lon = self.coordinates
if not (-90 <= lat <= 90 and -180 <= lon <= 180):
raise ValueError("Invalid coordinates")
def distance_to(self, other: 'GeographicCoverage') -> Optional[float]:
"""Calculate distance to another location"""
if self.coordinates and other.coordinates:
# Haversine formula
return calculate_distance(self.coordinates, other.coordinates)
return None
@dataclass(frozen=True)
class Identifier:
"""Value object for identifiers"""
scheme: str # 'isil', 'wikidata', 'kvk', etc.
value: str
assigned_date: Optional[date] = None
def __str__(self):
return f"{self.scheme}:{self.value}"
@dataclass(frozen=True)
class ProvenanceRecord:
"""Value object for provenance tracking"""
source: str # 'isil_registry', 'nlp_extraction', etc.
field: Optional[str] = None
confidence: float = 1.0
timestamp: datetime = field(default_factory=datetime.now)
def __post_init__(self):
if not 0.0 <= self.confidence <= 1.0:
raise ValueError("Confidence must be between 0 and 1")
Benefits:
- Immutable (thread-safe)
- Equality by value
- Self-validating
- Reusable across entities
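Equality by value falls out of `@dataclass(frozen=True)` for free, as a quick check with the `Identifier` value object shows:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Identifier:
    """Value object for identifiers"""
    scheme: str
    value: str

a = Identifier("isil", "NL-AsdRM")
b = Identifier("isil", "NL-AsdRM")
c = Identifier("wikidata", "Q190804")

same = (a == b)             # equal by value, despite being distinct objects
usable_in_sets = {a, b, c}  # frozen dataclasses are hashable, so a and b dedupe
```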
Pattern 10: Entity Pattern
Intent: Model objects with an identity that persists over time
Structure:
from dataclasses import dataclass, field
from typing import List
import uuid
@dataclass
class HeritageCustodian:
"""Entity with persistent identity"""
# Identity
id: str = field(default_factory=lambda: str(uuid.uuid4()))
# Essential attributes
name: str = ""
institution_types: List[str] = field(default_factory=list)
# Value objects
geographic_coverage: Optional[GeographicCoverage] = None
identifiers: List[Identifier] = field(default_factory=list)
# Provenance
provenance: List[ProvenanceRecord] = field(default_factory=list)
# Version tracking
version: int = 1
last_updated: datetime = field(default_factory=datetime.now)
def __eq__(self, other):
"""Equality by ID, not by value"""
if not isinstance(other, HeritageCustodian):
return False
return self.id == other.id
def __hash__(self):
return hash(self.id)
def update_name(self, new_name: str, source: str):
"""Update with provenance tracking"""
self.name = new_name
self.provenance.append(ProvenanceRecord(
source=source,
field="name",
confidence=1.0
))
self.version += 1
self.last_updated = datetime.now()
Benefits:
- Clear identity
- Mutable (can be updated)
- Track changes over time
- Distinct from value objects
Error Handling Patterns
Pattern 11: Result Pattern
Intent: Explicit error handling without exceptions
Structure:
from typing import Generic, TypeVar, Union
from dataclasses import dataclass
T = TypeVar('T')
E = TypeVar('E')
@dataclass
class Success(Generic[T]):
"""Successful result"""
value: T
def is_success(self) -> bool:
return True
def is_failure(self) -> bool:
return False
@dataclass
class Failure(Generic[E]):
"""Failed result"""
error: E
def is_success(self) -> bool:
return False
def is_failure(self) -> bool:
return True
Result = Union[Success[T], Failure[E]]
# Usage
def extract_isil_code(text: str) -> Result[str, str]:
"""Extract ISIL code from text"""
pattern = r'(NL-[A-Za-z0-9]+)'
match = re.search(pattern, text)
if match:
return Success(match.group(1))
else:
return Failure("No ISIL code found in text")
# Consumer
result = extract_isil_code(text)
if result.is_success():
print(f"Found ISIL: {result.value}")
else:
print(f"Error: {result.error}")
Benefits:
- Explicit error handling
- No silent failures
- Type-safe
- Forces error consideration
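Results become more ergonomic with `map`/`and_then` combinators for chaining transformations without unwrapping. A hedged extension of the classes above (the combinators are an assumption, not in the original sketch):

```python
import re
from dataclasses import dataclass
from typing import Callable, Generic, TypeVar, Union

T = TypeVar('T')
E = TypeVar('E')
U = TypeVar('U')

@dataclass
class Success(Generic[T]):
    value: T
    def is_success(self) -> bool:
        return True
    def map(self, fn: Callable[[T], U]) -> 'Success[U]':
        return Success(fn(self.value))
    def and_then(self, fn: Callable[[T], 'Result']) -> 'Result':
        return fn(self.value)

@dataclass
class Failure(Generic[E]):
    error: E
    def is_success(self) -> bool:
        return False
    def map(self, fn) -> 'Failure[E]':
        return self  # failures pass through unchanged
    def and_then(self, fn) -> 'Failure[E]':
        return self

Result = Union[Success, Failure]

def extract_isil(text: str) -> Result:
    match = re.search(r'(NL-[A-Za-z0-9]+)', text)
    return Success(match.group(1)) if match else Failure("No ISIL code found")

result = extract_isil("Rijksmuseum (NL-AsdRM)").map(str.upper)
missing = extract_isil("no code here").map(str.upper)
```

The `map` on the failure path is a no-op, so a chain of transformations never has to branch on success until the very end.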
Testing Patterns
Pattern 12: Test Fixture Pattern
Intent: Reusable test data and setup
Structure:
import pytest
from pathlib import Path
@pytest.fixture
def sample_conversation():
"""Fixture providing sample conversation"""
return Conversation(
id="test-001",
title="Brazilian GLAM institutions",
created_at=datetime(2025, 9, 22, 14, 40),
messages=[
Message(role="user", content="Tell me about Brazilian museums"),
Message(role="assistant", content="The National Museum of Brazil...")
],
citations=[
Citation(url="https://www.museu.br", title="National Museum")
]
)
@pytest.fixture
def sample_csv_record():
"""Fixture providing sample CSV record"""
return DutchOrganizationRecord(
organization_name="Rijksmuseum",
organization_type="museum",
city="Amsterdam",
province="Noord-Holland",
isil_code="NL-AsdRM",
website_url="https://www.rijksmuseum.nl"
)
@pytest.fixture
def temp_output_dir(tmp_path):
"""Fixture providing temporary output directory"""
output_dir = tmp_path / "output"
output_dir.mkdir()
return output_dir
# Usage in tests
def test_conversation_parser(sample_conversation):
parser = ConversationParser()
result = parser.parse(sample_conversation)
assert len(result.messages) == 2
def test_csv_to_linkml(sample_csv_record, temp_output_dir):
mapper = LinkMLMapper()
instance = mapper.map_from_csv(sample_csv_record)
exporter = YAMLExporter()
exporter.export(instance, temp_output_dir / "test.yaml")
assert (temp_output_dir / "test.yaml").exists()
Configuration Patterns
Pattern 13: Configuration Object
Intent: Centralized, typed configuration
Structure:
from pathlib import Path
from pydantic import BaseSettings, Field  # Pydantic v1; in v2, BaseSettings moved to the pydantic-settings package
from typing import List, Optional
class ExtractionConfig(BaseSettings):
"""Configuration for extraction pipeline"""
# NLP settings
spacy_model: str = "en_core_web_lg"
use_transformers: bool = False
transformer_model: str = "dslim/bert-base-NER"
# Web crawling
max_concurrent_requests: int = 10
request_timeout: int = 30
user_agent: str = "GLAM-Dataset-Extractor/0.1.0"
# Geocoding
enable_geocoding: bool = True
geocoding_provider: str = "nominatim"
# Enrichment
enable_wikidata_linking: bool = True
enable_viaf_linking: bool = False
# Output
output_formats: List[str] = ["rdf", "csv", "jsonld"]
output_directory: Path = Path("output/")
# Performance
batch_size: int = 100
max_workers: int = 4
class Config:
env_file = ".env"
env_prefix = "GLAM_"
# Usage
config = ExtractionConfig() # Loads from environment
# Override specific values
config = ExtractionConfig(
spacy_model="nl_core_news_lg", # Dutch model
enable_geocoding=False
)
Benefits:
- Type-safe configuration
- Environment variable support
- Validation
- Documentation
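If pydantic is unavailable, the same idea works with a plain dataclass reading prefixed environment variables. A minimal hand-rolled sketch (covering only a subset of the fields, with types inferred from each field's default):

```python
import os
from dataclasses import dataclass, fields

@dataclass
class ExtractionConfig:
    spacy_model: str = "en_core_web_lg"
    batch_size: int = 100
    enable_geocoding: bool = True

    @classmethod
    def from_env(cls, prefix: str = "GLAM_") -> 'ExtractionConfig':
        """Override defaults from environment variables like GLAM_BATCH_SIZE"""
        kwargs = {}
        for f in fields(cls):
            raw = os.environ.get(prefix + f.name.upper())
            if raw is None:
                continue
            if f.default is True or f.default is False:   # bool (check before int: bool is an int subclass)
                kwargs[f.name] = raw.lower() in ("1", "true", "yes")
            elif isinstance(f.default, int):
                kwargs[f.name] = int(raw)
            else:
                kwargs[f.name] = raw
        return cls(**kwargs)

os.environ["GLAM_BATCH_SIZE"] = "50"
config = ExtractionConfig.from_env()
```

This drops pydantic's validation and documentation features, so it is a fallback rather than a replacement.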
Summary
These design patterns provide:
- Modularity: Components can be developed and tested independently
- Flexibility: Easy to swap implementations (Strategy, Adapter)
- Maintainability: Clear structure and responsibilities
- Extensibility: New features can be added without modifying existing code
- Testability: Patterns like Repository and Fixture make testing easier
- Observability: Observer pattern enables monitoring
- Error Handling: Result pattern makes errors explicit
- Data Quality: Value Objects and Entities ensure valid state
The combination of these patterns creates a robust, maintainable system for processing diverse data sources into a unified heritage dataset.