- Fix scope_note → finding_aid_scope_note in FindingAid.yaml - Remove duplicate wikidata_entity slot from CustodianType.yaml (import instead) - Remove duplicate rico_record_set_type from class_metadata_slots.yaml - Fix range types for equals_string compatibility (uriorcurie → string) - Move class names from close_mappings to see_also in 10 RecordSetTypes files - Generate all RDF formats: OWL, N-Triples, RDF/XML, N3, JSON-LD context - Sync schemas to frontend/public/schemas/ Files: 1,151 changed (includes prior CustodianType migration)
258 lines
8.2 KiB
Python
258 lines
8.2 KiB
Python
"""
Temporal Conflict Resolution for Heritage Data

Handles cases where multiple facts exist for the same property at overlapping times.
Based on: docs/plan/external_design_patterns/04_temporal_semantic_hypergraph.md

Strategies:
1. Temporal ordering: Use fact valid at query time
2. Recency: Prefer more recent sources
3. Authority: Prefer authoritative sources (Tier 1)
4. Confidence: Use higher confidence facts
"""
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class TemporalFact:
    """
    A single property/value statement together with the period it holds for.

    Attributes:
        property: Name of the described property.
        value: Value asserted for that property.
        valid_from: Start of the validity period.
        valid_to: End of the validity period, or None when open-ended.
        source: Label of the source that asserted the fact.
        confidence: Confidence in the fact; defaults to 1.0.
        ghcid: Optional identifier the fact is attached to; defaults to None.
    """

    # Required fields (positional order is part of the constructor signature).
    property: str
    value: str
    valid_from: datetime
    valid_to: Optional[datetime]
    source: str

    # Optional fields.
    confidence: float = 1.0
    ghcid: Optional[str] = None
|
|
|
|
|
|
@dataclass
class ConflictResolution:
    """
    Outcome of resolving competing facts for one property.

    Attributes:
        property: Name of the property the conflict was about.
        authoritative_value: Value of the fact that was selected.
        valid_for_date: Date the resolution was computed for.
        conflict_type: Short label classifying the conflict.
        explanation: Human-readable description of the decision.
        alternative_values: Facts that were not selected; defaults to a
            fresh empty list per instance.
    """

    property: str
    authoritative_value: str
    valid_for_date: datetime
    conflict_type: str
    explanation: str

    # default_factory gives every instance its own list instead of a shared one.
    alternative_values: list[TemporalFact] = field(default_factory=list)
|
|
|
|
|
|
class TemporalConflictResolver:
    """
    Resolve conflicts between temporal facts.

    A conflict exists when more than one fact for the same property is valid
    at the query date. Every competing fact gets a score and the highest
    score wins. The score is the product of:
    - Source authority (Tier 1-4, see SOURCE_AUTHORITY)
    - The fact's confidence
    - A recency factor based on the age of valid_from at the query date
    """

    # Weight per source tier.
    SOURCE_AUTHORITY = {
        "TIER_1_AUTHORITATIVE": 1.0,
        "TIER_2_VERIFIED": 0.8,
        "TIER_3_CROWD_SOURCED": 0.6,
        "TIER_4_INFERRED": 0.4,
    }

    # Weight used for sources that are not listed in SOURCE_AUTHORITY.
    DEFAULT_AUTHORITY = 0.5

    # Properties whose conflicts are reported as a relocation.
    LOCATION_PROPERTIES = frozenset(
        {"city", "address", "location", "settlementName", "subregionCode"}
    )

    def resolve_conflicts(
        self,
        ghcid: str,
        facts: list["TemporalFact"],
        query_date: Optional[datetime] = None
    ) -> list["ConflictResolution"]:
        """
        Resolve all conflicts in a set of facts.

        Args:
            ghcid: Institution identifier. It is not used to filter ``facts``;
                callers must pass only the facts of this institution.
            facts: All facts about the institution
            query_date: Point in time for resolution (default: now, with the
                same timezone-awareness as the first fact)

        Returns:
            One resolution per property that has more than one fact valid at
            ``query_date``. Properties with zero or one valid fact are omitted.
        """
        if query_date is None:
            query_date = self._default_query_date(facts)

        # Group facts by property (dict keeps first-seen property order).
        by_property: dict[str, list["TemporalFact"]] = {}
        for fact in facts:
            by_property.setdefault(fact.property, []).append(fact)

        resolutions = []
        for prop, prop_facts in by_property.items():
            valid_facts = self._facts_valid_at(prop_facts, query_date)
            if len(valid_facts) > 1:
                # Multiple valid facts - resolve conflict
                resolutions.append(
                    self._resolve_property_conflict(prop, valid_facts, query_date)
                )

        return resolutions

    def get_authoritative_value(
        self,
        ghcid: str,
        property: str,
        facts: list["TemporalFact"],
        query_date: Optional[datetime] = None
    ) -> Optional[str]:
        """
        Get the authoritative value for a single property.

        Convenience method for single-property lookups.

        Args:
            ghcid: Institution identifier (not used to filter ``facts``).
            property: Name of the property to look up.
            facts: Facts to search; facts for other properties are ignored.
            query_date: Point in time for the lookup (default: now, with the
                same timezone-awareness as the first fact)

        Returns:
            The value of the only valid fact, the winning value when several
            facts are valid, or None when no fact for the property is valid
            at ``query_date``.
        """
        if query_date is None:
            query_date = self._default_query_date(facts)

        valid_facts = self._facts_valid_at(
            [f for f in facts if f.property == property], query_date
        )

        if not valid_facts:
            return None
        if len(valid_facts) == 1:
            return valid_facts[0].value

        # Resolve conflict
        resolution = self._resolve_property_conflict(property, valid_facts, query_date)
        return resolution.authoritative_value

    @staticmethod
    def _default_query_date(facts: list["TemporalFact"]) -> datetime:
        """Return the current time, timezone-aware iff the first fact is."""
        # A naive "now" cannot be compared with aware datetimes (TypeError),
        # so the default follows the tzinfo of the supplied facts.
        tzinfo = facts[0].valid_from.tzinfo if facts else None
        return datetime.now(tz=tzinfo)

    @staticmethod
    def _facts_valid_at(
        facts: list["TemporalFact"],
        query_date: datetime
    ) -> list["TemporalFact"]:
        """Return the facts whose [valid_from, valid_to) period contains query_date."""
        return [
            f for f in facts
            if f.valid_from <= query_date
            and (f.valid_to is None or f.valid_to > query_date)
        ]

    def _resolve_property_conflict(
        self,
        property: str,
        facts: list["TemporalFact"],
        query_date: datetime
    ) -> "ConflictResolution":
        """
        Resolve conflict for a single property.

        Args:
            property: Name of the property in conflict.
            facts: Competing facts; must not be empty.
            query_date: Date the resolution is computed for; also the
                reference point for the recency part of the score.

        Returns:
            A ConflictResolution naming the highest-scored fact as winner and
            listing the remaining facts, best first, as alternatives.
        """
        # Rank by score, best first. sorted() is stable, so facts with equal
        # scores keep their input order.
        ranked = sorted(
            facts,
            key=lambda fact: self._compute_authority_score(fact, query_date),
            reverse=True,
        )
        winner, *alternatives = ranked

        # Determine conflict type
        if all(f.value == winner.value for f in facts):
            conflict_type = "redundant"  # Same value from multiple sources
        elif self._is_name_change(facts):
            conflict_type = "name_change"
        elif self._is_location_change(facts, property):
            conflict_type = "location_change"
        else:
            conflict_type = "data_inconsistency"

        explanation = self._generate_explanation(
            property, winner, alternatives, conflict_type, query_date
        )

        return ConflictResolution(
            property=property,
            authoritative_value=winner.value,
            valid_for_date=query_date,
            conflict_type=conflict_type,
            explanation=explanation,
            alternative_values=alternatives
        )

    def _compute_authority_score(
        self,
        fact: "TemporalFact",
        reference_date: Optional[datetime] = None
    ) -> float:
        """
        Compute authority score for a fact.

        Args:
            fact: Fact to score.
            reference_date: Instant against which the age of ``valid_from`` is
                measured (default: now, matching the fact's tzinfo).

        Returns:
            source authority * confidence * (0.8 + 0.2 * recency factor).
        """
        # Base authority from source tier
        authority = self.SOURCE_AUTHORITY.get(fact.source, self.DEFAULT_AUTHORITY)

        # Scale by confidence
        authority *= fact.confidence

        if reference_date is None:
            reference_date = datetime.now(tz=fact.valid_from.tzinfo)

        # Recency bonus (facts with recent valid_from get slight boost).
        # The age is clamped at zero: a valid_from after the reference date
        # would otherwise push the factor above 1, or divide by zero at
        # exactly -365 days.
        days_old = max((reference_date - fact.valid_from).days, 0)
        recency_factor = 1.0 / (1.0 + days_old / 365.0)  # Decay over years
        authority *= (0.8 + 0.2 * recency_factor)

        return authority

    def _is_name_change(self, facts: list["TemporalFact"]) -> bool:
        """
        Check if conflict represents a name change.

        Returns True when, after sorting by valid_from, some fact ends exactly
        where the next one starts.

        NOTE(review): facts that are all valid at the same instant cannot
        satisfy this, so "name_change" is never produced through
        resolve_conflicts / get_authoritative_value; the property name is not
        checked either. Confirm the intended input before relying on it.
        """
        facts_sorted = sorted(facts, key=lambda f: f.valid_from)
        return any(
            earlier.valid_to == later.valid_from
            for earlier, later in zip(facts_sorted, facts_sorted[1:])
        )

    def _is_location_change(self, facts: list["TemporalFact"], property: str) -> bool:
        """
        Check if conflict represents a location change.

        The decision depends only on the property name; ``facts`` is not
        inspected.
        """
        return property in self.LOCATION_PROPERTIES

    def _generate_explanation(
        self,
        property: str,
        winner: "TemporalFact",
        alternatives: list["TemporalFact"],
        conflict_type: str,
        query_date: datetime
    ) -> str:
        """
        Generate human-readable explanation of resolution.

        Args:
            property: Name of the property in conflict.
            winner: Fact selected as authoritative.
            alternatives: Facts that were not selected.
            conflict_type: "name_change", "location_change" or "redundant";
                any other value yields the generic data-conflict text.
            query_date: Date the resolution was computed for.
        """
        if conflict_type == "name_change":
            return (
                f"The institution name changed over time. "
                f"At {query_date.strftime('%Y-%m-%d')}, the authoritative name was '{winner.value}'. "
                f"Previous names: {', '.join(f.value for f in alternatives)}."
            )
        if conflict_type == "location_change":
            return (
                f"The institution relocated. "
                f"At {query_date.strftime('%Y-%m-%d')}, it was located at '{winner.value}'."
            )
        if conflict_type == "redundant":
            return f"Multiple sources confirm: {winner.value}"
        return (
            f"Data conflict for {property}. "
            f"Using '{winner.value}' from {winner.source} (confidence: {winner.confidence:.2f}). "
            f"Alternative values exist in other sources."
        )
|
|
|
|
|
|
# Module-wide resolver instance; stays None until first requested.
_resolver: Optional[TemporalConflictResolver] = None


def get_temporal_resolver() -> TemporalConflictResolver:
    """Return the shared resolver, creating it on the first call."""
    global _resolver
    resolver = _resolver
    if resolver is None:
        # First request: build the instance and remember it for later calls.
        resolver = _resolver = TemporalConflictResolver()
    return resolver
|