"""
Layer 3: Integration Tests for Heritage RAG
These tests verify:
- API endpoint health
- Oxigraph connectivity
- End-to-end query processing
- Sample query responses
Requires:
- Live Oxigraph instance (via SSH tunnel or direct connection)
- ANTHROPIC_API_KEY for LLM queries
Run locally with SSH tunnel:
ssh -f -N -L 7878:127.0.0.1:7878 root@91.98.224.44
export OXIGRAPH_ENDPOINT=http://127.0.0.1:7878
pytest tests/dspy_gitops/test_layer3_integration.py -v
"""
import os
import time
from typing import Any
import httpx
import pytest
# Configuration - prefer local tunnel, fallback to environment variable
# Oxigraph is NOT externally accessible, so we need SSH tunnel
OXIGRAPH_URL = os.environ.get("OXIGRAPH_ENDPOINT", "http://127.0.0.1:7878")
API_BASE_URL = os.environ.get("API_BASE_URL", "http://localhost:8000")
# =============================================================================
# Oxigraph Connectivity Tests
# =============================================================================
@pytest.mark.layer3
@pytest.mark.requires_oxigraph
class TestOxigraphConnectivity:
"""Test Oxigraph SPARQL endpoint connectivity."""
def test_oxigraph_health(self):
"""Verify Oxigraph is accessible."""
query = "SELECT (COUNT(*) as ?count) WHERE { ?s ?p ?o } LIMIT 1"
response = httpx.post(
f"{OXIGRAPH_URL}/query",
data={"query": query},
headers={"Accept": "application/sparql-results+json"},
timeout=10.0,
)
assert response.status_code == 200
data = response.json()
assert "results" in data
assert "bindings" in data["results"]
def test_oxigraph_triple_count(self):
"""Verify Oxigraph has data loaded."""
query = "SELECT (COUNT(*) as ?count) WHERE { ?s ?p ?o }"
response = httpx.post(
f"{OXIGRAPH_URL}/query",
data={"query": query},
headers={"Accept": "application/sparql-results+json"},
timeout=30.0,
)
assert response.status_code == 200
data = response.json()
count = int(data["results"]["bindings"][0]["count"]["value"])
# Should have substantial data
assert count > 100000, f"Expected > 100k triples, got {count}"
def test_dutch_institutions_exist(self):
"""Verify Dutch institution data is present."""
query = """
PREFIX hc:
SELECT (COUNT(DISTINCT ?s) as ?count)
WHERE { ?s hc:countryCode "NL" . }
"""
response = httpx.post(
f"{OXIGRAPH_URL}/query",
data={"query": query},
headers={"Accept": "application/sparql-results+json"},
timeout=30.0,
)
assert response.status_code == 200
data = response.json()
count = int(data["results"]["bindings"][0]["count"]["value"])
# Should have Dutch institutions
assert count > 2000, f"Expected > 2000 Dutch institutions, got {count}"
def test_dutch_institutions_with_coordinates(self):
"""Verify Dutch institutions have coordinate data.
Note: Coordinates are stored on blank nodes via schema:location,
NOT directly on the institution subject.
"""
query = """
PREFIX hc:
PREFIX schema:
PREFIX geo:
SELECT (COUNT(DISTINCT ?s) as ?count)
WHERE {
?s hc:countryCode "NL" .
?s schema:location ?loc .
?loc geo:lat ?lat .
}
"""
response = httpx.post(
f"{OXIGRAPH_URL}/query",
data={"query": query},
headers={"Accept": "application/sparql-results+json"},
timeout=30.0,
)
assert response.status_code == 200
data = response.json()
count = int(data["results"]["bindings"][0]["count"]["value"])
# Should have geocoded institutions
assert count > 2500, f"Expected > 2500 Dutch institutions with coords, got {count}"
def test_amsterdam_institutions_query(self):
"""Test specific Amsterdam institutions query."""
# Use hc:settlementName (the actual schema field)
query = """
PREFIX hc:
SELECT (COUNT(DISTINCT ?s) as ?count)
WHERE {
?s hc:countryCode "NL" .
?s hc:settlementName "Amsterdam" .
}
"""
response = httpx.post(
f"{OXIGRAPH_URL}/query",
data={"query": query},
headers={"Accept": "application/sparql-results+json"},
timeout=30.0,
)
assert response.status_code == 200
data = response.json()
count = int(data["results"]["bindings"][0]["count"]["value"])
# Amsterdam should have many institutions
assert count > 100, f"Expected > 100 Amsterdam institutions, got {count}"
print(f"Found {count} institutions in Amsterdam")
# =============================================================================
# API Health Tests
# =============================================================================
@pytest.mark.layer3
class TestAPIHealth:
"""Test API endpoint health."""
@pytest.fixture
def client(self):
"""Create HTTP client."""
return httpx.Client(base_url=API_BASE_URL, timeout=30.0)
@pytest.mark.skip(reason="API server not always running in CI")
def test_dspy_rag_health(self, client):
"""Test DSPy RAG health endpoint."""
response = client.get("/api/dspy/rag/health")
assert response.status_code == 200
data = response.json()
assert data.get("status") == "ok"
assert "components" in data
@pytest.mark.skip(reason="API server not always running in CI")
def test_dspy_rag_training_data(self, client):
"""Test training data endpoint."""
response = client.get("/api/dspy/rag/training-data")
assert response.status_code == 200
data = response.json()
assert data.get("total_training", 0) > 0
assert data.get("total_validation", 0) > 0
# =============================================================================
# Sample Query Tests
# =============================================================================
@pytest.mark.layer3
@pytest.mark.requires_llm
class TestSampleQueries:
"""Test sample queries against live system."""
SAMPLE_QUERIES = [
{
"question": "Hoeveel musea zijn er in Amsterdam?",
"language": "nl",
"expected_intent": "statistical",
},
{
"question": "Waar is het Rijksmuseum gevestigd?",
"language": "nl",
"expected_intent": "entity_lookup",
},
{
"question": "How many libraries are in the Netherlands?",
"language": "en",
"expected_intent": "statistical",
},
]
@pytest.fixture
def async_client(self):
"""Create async HTTP client."""
return httpx.AsyncClient(base_url=API_BASE_URL, timeout=60.0)
@pytest.mark.skip(reason="API server not always running in CI")
@pytest.mark.asyncio
async def test_sample_queries(self, async_client):
"""Test sample queries return valid responses."""
for query in self.SAMPLE_QUERIES:
start = time.time()
response = await async_client.post(
"/api/dspy/rag/query",
json={
"question": query["question"],
"language": query["language"],
"include_visualization": False,
"use_agent": False,
},
)
duration_ms = (time.time() - start) * 1000
assert response.status_code == 200, f"Failed for: {query['question']}"
data = response.json()
# Verify response structure
assert "answer" in data
assert "intent" in data
assert "confidence" in data
# Verify answer is not empty
assert data["answer"], f"Empty answer for: {query['question']}"
# Log results
print(f"\nQuery: {query['question'][:50]}...")
print(f" Intent: {data['intent']} (expected: {query['expected_intent']})")
print(f" Duration: {duration_ms:.0f}ms")
print(f" Answer: {data['answer'][:100]}...")
# =============================================================================
# Direct SPARQL Tests for Heritage Queries
# =============================================================================
@pytest.mark.layer3
@pytest.mark.requires_oxigraph
class TestHeritageSPARQL:
"""Test heritage-specific SPARQL queries directly.
Note: Uses the actual hc: ontology schema, which uses:
- hc:institutionType with single-letter codes (M=Museum, L=Library, A=Archive, etc.)
- hc:settlementName for city names (NOT hc:city)
- hc:countryCode for country codes
- skos:prefLabel or schema:name for institution names
"""
def test_count_museums_amsterdam(self):
"""Count museums in Amsterdam via SPARQL."""
# Institution types use single-letter codes: M=Museum
query = """
PREFIX hc:
SELECT (COUNT(DISTINCT ?s) as ?count)
WHERE {
?s hc:institutionType "M" .
?s hc:countryCode "NL" .
?s hc:settlementName "Amsterdam" .
}
"""
response = httpx.post(
f"{OXIGRAPH_URL}/query",
data={"query": query},
headers={"Accept": "application/sparql-results+json"},
timeout=30.0,
)
assert response.status_code == 200
data = response.json()
count = int(data["results"]["bindings"][0]["count"]["value"])
print(f"Museums in Amsterdam: {count}")
assert count > 30, f"Expected > 30 Amsterdam museums, got {count}"
def test_find_rijksmuseum(self):
"""Find Rijksmuseum by name."""
query = """
PREFIX hc:
PREFIX schema:
SELECT ?s ?name ?city
WHERE {
?s schema:name ?name .
FILTER(CONTAINS(LCASE(?name), "rijksmuseum"))
?s hc:settlementName ?city .
}
LIMIT 5
"""
response = httpx.post(
f"{OXIGRAPH_URL}/query",
data={"query": query},
headers={"Accept": "application/sparql-results+json"},
timeout=30.0,
)
assert response.status_code == 200
data = response.json()
bindings = data["results"]["bindings"]
assert len(bindings) > 0, "No Rijksmuseum found"
# Should find Amsterdam Rijksmuseum
names = [b["name"]["value"] for b in bindings]
cities = [b.get("city", {}).get("value", "") for b in bindings]
print(f"Found: {names}")
print(f"Cities: {cities}")
def test_count_libraries_nl(self):
"""Count libraries in Netherlands."""
# Institution type L = Library
query = """
PREFIX hc:
SELECT (COUNT(DISTINCT ?s) as ?count)
WHERE {
?s hc:institutionType "L" .
?s hc:countryCode "NL" .
}
"""
response = httpx.post(
f"{OXIGRAPH_URL}/query",
data={"query": query},
headers={"Accept": "application/sparql-results+json"},
timeout=30.0,
)
assert response.status_code == 200
data = response.json()
count = int(data["results"]["bindings"][0]["count"]["value"])
print(f"Libraries in Netherlands: {count}")
assert count > 100, f"Expected > 100 libraries, got {count}"
def test_geographic_query_amsterdam(self):
"""Test geographic query near Amsterdam coordinates.
Note: Coordinates are stored on blank nodes via schema:location,
NOT directly on the institution subject.
Amsterdam coordinates: ~52.37, 4.89
"""
query = """
PREFIX hc:
PREFIX schema:
PREFIX geo:
SELECT ?s ?name ?lat ?lon
WHERE {
?s hc:countryCode "NL" .
?s schema:name ?name .
?s schema:location ?loc .
?loc geo:lat ?lat .
?loc geo:long ?lon .
FILTER(
?lat > 52.3 &&
?lat < 52.4 &&
?lon > 4.8 &&
?lon < 5.0
)
}
LIMIT 10
"""
response = httpx.post(
f"{OXIGRAPH_URL}/query",
data={"query": query},
headers={"Accept": "application/sparql-results+json"},
timeout=30.0,
)
assert response.status_code == 200
data = response.json()
bindings = data["results"]["bindings"]
print(f"Found {len(bindings)} institutions near Amsterdam")
for b in bindings[:5]:
print(f" - {b.get('name', {}).get('value', 'N/A')}")
# Should find institutions near Amsterdam center
assert len(bindings) > 0, "No institutions found near Amsterdam coordinates"
def test_institution_type_distribution(self):
"""Verify institution type distribution in data."""
query = """
PREFIX hc:
SELECT ?type (COUNT(DISTINCT ?s) as ?count)
WHERE {
?s hc:institutionType ?type .
?s hc:countryCode "NL" .
}
GROUP BY ?type
ORDER BY DESC(?count)
"""
response = httpx.post(
f"{OXIGRAPH_URL}/query",
data={"query": query},
headers={"Accept": "application/sparql-results+json"},
timeout=30.0,
)
assert response.status_code == 200
data = response.json()
bindings = data["results"]["bindings"]
# Should have multiple institution types
assert len(bindings) > 5, f"Expected > 5 institution types, got {len(bindings)}"
# Print distribution
print("Institution type distribution (NL):")
for b in bindings[:10]:
type_code = b["type"]["value"]
count = b["count"]["value"]
print(f" {type_code}: {count}")
if __name__ == "__main__":
pytest.main([__file__, "-v", "--tb=short"])