""" Layer 3: Integration Tests for Heritage RAG These tests verify: - API endpoint health - Oxigraph connectivity - End-to-end query processing - Sample query responses Requires: - Live Oxigraph instance (via SSH tunnel or direct connection) - ANTHROPIC_API_KEY for LLM queries Run locally with SSH tunnel: ssh -f -N -L 7878:127.0.0.1:7878 root@91.98.224.44 export OXIGRAPH_ENDPOINT=http://127.0.0.1:7878 pytest tests/dspy_gitops/test_layer3_integration.py -v """ import os import time from typing import Any import httpx import pytest # Configuration - prefer local tunnel, fallback to environment variable # Oxigraph is NOT externally accessible, so we need SSH tunnel OXIGRAPH_URL = os.environ.get("OXIGRAPH_ENDPOINT", "http://127.0.0.1:7878") API_BASE_URL = os.environ.get("API_BASE_URL", "http://localhost:8000") # ============================================================================= # Oxigraph Connectivity Tests # ============================================================================= @pytest.mark.layer3 @pytest.mark.requires_oxigraph class TestOxigraphConnectivity: """Test Oxigraph SPARQL endpoint connectivity.""" def test_oxigraph_health(self): """Verify Oxigraph is accessible.""" query = "SELECT (COUNT(*) as ?count) WHERE { ?s ?p ?o } LIMIT 1" response = httpx.post( f"{OXIGRAPH_URL}/query", data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=10.0, ) assert response.status_code == 200 data = response.json() assert "results" in data assert "bindings" in data["results"] def test_oxigraph_triple_count(self): """Verify Oxigraph has data loaded.""" query = "SELECT (COUNT(*) as ?count) WHERE { ?s ?p ?o }" response = httpx.post( f"{OXIGRAPH_URL}/query", data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=30.0, ) assert response.status_code == 200 data = response.json() count = int(data["results"]["bindings"][0]["count"]["value"]) # Should have substantial data assert count > 100000, f"Expected > 100k triples, got {count}" def test_dutch_institutions_exist(self): """Verify Dutch institution data is present.""" query = """ PREFIX hc: SELECT (COUNT(DISTINCT ?s) as ?count) WHERE { ?s hc:countryCode "NL" . } """ response = httpx.post( f"{OXIGRAPH_URL}/query", data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=30.0, ) assert response.status_code == 200 data = response.json() count = int(data["results"]["bindings"][0]["count"]["value"]) # Should have Dutch institutions assert count > 2000, f"Expected > 2000 Dutch institutions, got {count}" def test_dutch_institutions_with_coordinates(self): """Verify Dutch institutions have coordinate data. Note: Coordinates are stored on blank nodes via schema:location, NOT directly on the institution subject. """ query = """ PREFIX hc: PREFIX schema: PREFIX geo: SELECT (COUNT(DISTINCT ?s) as ?count) WHERE { ?s hc:countryCode "NL" . ?s schema:location ?loc . ?loc geo:lat ?lat . } """ response = httpx.post( f"{OXIGRAPH_URL}/query", data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=30.0, ) assert response.status_code == 200 data = response.json() count = int(data["results"]["bindings"][0]["count"]["value"]) # Should have geocoded institutions assert count > 2500, f"Expected > 2500 Dutch institutions with coords, got {count}" def test_amsterdam_institutions_query(self): """Test specific Amsterdam institutions query.""" # Use hc:settlementName (the actual schema field) query = """ PREFIX hc: SELECT (COUNT(DISTINCT ?s) as ?count) WHERE { ?s hc:countryCode "NL" . ?s hc:settlementName "Amsterdam" . } """ response = httpx.post( f"{OXIGRAPH_URL}/query", data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=30.0, ) assert response.status_code == 200 data = response.json() count = int(data["results"]["bindings"][0]["count"]["value"]) # Amsterdam should have many institutions assert count > 100, f"Expected > 100 Amsterdam institutions, got {count}" print(f"Found {count} institutions in Amsterdam") # ============================================================================= # API Health Tests # ============================================================================= @pytest.mark.layer3 class TestAPIHealth: """Test API endpoint health.""" @pytest.fixture def client(self): """Create HTTP client.""" return httpx.Client(base_url=API_BASE_URL, timeout=30.0) @pytest.mark.skip(reason="API server not always running in CI") def test_dspy_rag_health(self, client): """Test DSPy RAG health endpoint.""" response = client.get("/api/dspy/rag/health") assert response.status_code == 200 data = response.json() assert data.get("status") == "ok" assert "components" in data @pytest.mark.skip(reason="API server not always running in CI") def test_dspy_rag_training_data(self, client): """Test training data endpoint.""" response = client.get("/api/dspy/rag/training-data") assert response.status_code == 200 data = response.json() assert data.get("total_training", 0) > 0 assert data.get("total_validation", 0) > 0 # ============================================================================= # Sample Query Tests # ============================================================================= @pytest.mark.layer3 @pytest.mark.requires_llm class TestSampleQueries: """Test sample queries against live system.""" SAMPLE_QUERIES = [ { "question": "Hoeveel musea zijn er in Amsterdam?", "language": "nl", "expected_intent": "statistical", }, { "question": "Waar is het Rijksmuseum gevestigd?", "language": "nl", "expected_intent": "entity_lookup", }, { "question": "How many libraries are in the Netherlands?", "language": "en", "expected_intent": "statistical", }, ] @pytest.fixture def async_client(self): """Create async HTTP client.""" return httpx.AsyncClient(base_url=API_BASE_URL, timeout=60.0) @pytest.mark.skip(reason="API server not always running in CI") @pytest.mark.asyncio async def test_sample_queries(self, async_client): """Test sample queries return valid responses.""" for query in self.SAMPLE_QUERIES: start = time.time() response = await async_client.post( "/api/dspy/rag/query", json={ "question": query["question"], "language": query["language"], "include_visualization": False, "use_agent": False, }, ) duration_ms = (time.time() - start) * 1000 assert response.status_code == 200, f"Failed for: {query['question']}" data = response.json() # Verify response structure assert "answer" in data assert "intent" in data assert "confidence" in data # Verify answer is not empty assert data["answer"], f"Empty answer for: {query['question']}" # Log results print(f"\nQuery: {query['question'][:50]}...") print(f" Intent: {data['intent']} (expected: {query['expected_intent']})") print(f" Duration: {duration_ms:.0f}ms") print(f" Answer: {data['answer'][:100]}...") # ============================================================================= # Direct SPARQL Tests for Heritage Queries # ============================================================================= @pytest.mark.layer3 @pytest.mark.requires_oxigraph class TestHeritageSPARQL: """Test heritage-specific SPARQL queries directly. Note: Uses the actual hc: ontology schema, which uses: - hc:institutionType with single-letter codes (M=Museum, L=Library, A=Archive, etc.) - hc:settlementName for city names (NOT hc:city) - hc:countryCode for country codes - skos:prefLabel or schema:name for institution names """ def test_count_museums_amsterdam(self): """Count museums in Amsterdam via SPARQL.""" # Institution types use single-letter codes: M=Museum query = """ PREFIX hc: SELECT (COUNT(DISTINCT ?s) as ?count) WHERE { ?s hc:institutionType "M" . ?s hc:countryCode "NL" . ?s hc:settlementName "Amsterdam" . } """ response = httpx.post( f"{OXIGRAPH_URL}/query", data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=30.0, ) assert response.status_code == 200 data = response.json() count = int(data["results"]["bindings"][0]["count"]["value"]) print(f"Museums in Amsterdam: {count}") assert count > 30, f"Expected > 30 Amsterdam museums, got {count}" def test_find_rijksmuseum(self): """Find Rijksmuseum by name.""" query = """ PREFIX hc: PREFIX schema: SELECT ?s ?name ?city WHERE { ?s schema:name ?name . FILTER(CONTAINS(LCASE(?name), "rijksmuseum")) ?s hc:settlementName ?city . } LIMIT 5 """ response = httpx.post( f"{OXIGRAPH_URL}/query", data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=30.0, ) assert response.status_code == 200 data = response.json() bindings = data["results"]["bindings"] assert len(bindings) > 0, "No Rijksmuseum found" # Should find Amsterdam Rijksmuseum names = [b["name"]["value"] for b in bindings] cities = [b.get("city", {}).get("value", "") for b in bindings] print(f"Found: {names}") print(f"Cities: {cities}") def test_count_libraries_nl(self): """Count libraries in Netherlands.""" # Institution type L = Library query = """ PREFIX hc: SELECT (COUNT(DISTINCT ?s) as ?count) WHERE { ?s hc:institutionType "L" . ?s hc:countryCode "NL" . } """ response = httpx.post( f"{OXIGRAPH_URL}/query", data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=30.0, ) assert response.status_code == 200 data = response.json() count = int(data["results"]["bindings"][0]["count"]["value"]) print(f"Libraries in Netherlands: {count}") assert count > 100, f"Expected > 100 libraries, got {count}" def test_geographic_query_amsterdam(self): """Test geographic query near Amsterdam coordinates. Note: Coordinates are stored on blank nodes via schema:location, NOT directly on the institution subject. Amsterdam coordinates: ~52.37, 4.89 """ query = """ PREFIX hc: PREFIX schema: PREFIX geo: SELECT ?s ?name ?lat ?lon WHERE { ?s hc:countryCode "NL" . ?s schema:name ?name . ?s schema:location ?loc . ?loc geo:lat ?lat . ?loc geo:long ?lon . FILTER( ?lat > 52.3 && ?lat < 52.4 && ?lon > 4.8 && ?lon < 5.0 ) } LIMIT 10 """ response = httpx.post( f"{OXIGRAPH_URL}/query", data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=30.0, ) assert response.status_code == 200 data = response.json() bindings = data["results"]["bindings"] print(f"Found {len(bindings)} institutions near Amsterdam") for b in bindings[:5]: print(f" - {b.get('name', {}).get('value', 'N/A')}") # Should find institutions near Amsterdam center assert len(bindings) > 0, "No institutions found near Amsterdam coordinates" def test_institution_type_distribution(self): """Verify institution type distribution in data.""" query = """ PREFIX hc: SELECT ?type (COUNT(DISTINCT ?s) as ?count) WHERE { ?s hc:institutionType ?type . ?s hc:countryCode "NL" . } GROUP BY ?type ORDER BY DESC(?count) """ response = httpx.post( f"{OXIGRAPH_URL}/query", data={"query": query}, headers={"Accept": "application/sparql-results+json"}, timeout=30.0, ) assert response.status_code == 200 data = response.json() bindings = data["results"]["bindings"] # Should have multiple institution types assert len(bindings) > 5, f"Expected > 5 institution types, got {len(bindings)}" # Print distribution print("Institution type distribution (NL):") for b in bindings[:10]: type_code = b["type"]["value"] count = b["count"]["value"] print(f" {type_code}: {count}") if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short"])