275 lines
8.5 KiB
Python
275 lines
8.5 KiB
Python
#!/usr/bin/env python3
"""Populate the Qdrant-backed semantic cache with common Dutch archive queries.

This script:
1. Makes real RAG streaming requests to /api/rag/dspy/query/stream
2. Stores the responses in the Qdrant cache via /api/cache/store

Usage:
    python scripts/populate_cache.py [--base-url URL] [--dry-run]

Examples:
    # Populate cache on production
    python scripts/populate_cache.py --base-url https://archief.support

    # Test locally
    python scripts/populate_cache.py --base-url http://localhost:8010 --dry-run
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
|
|
|
|
# Common Dutch archive queries to pre-cache.
# Each entry is sent through the full RAG pipeline once and its answer is
# stored in the semantic cache, so real users hitting a similar question
# get a cached response.  Grouped by topic for easier maintenance.
COMMON_QUERIES = [
    # Core archive questions
    "Wat is het Nationaal Archief?",
    "Hoe kan ik mijn stamboom onderzoeken?",
    "Waar vind ik geboorteaktes?",
    "Welke archieven zijn er in Nederland?",
    "Hoe kan ik een overlijdensakte opvragen?",
    "Wat is het VOC archief?",
    "Waar vind ik oude kadasterkaarten?",
    "Hoe zoek ik naar voorouders?",
    "Wat zijn Bevolkingsregisters?",
    "Hoe vind ik militaire stamboeken?",

    # Regional archives
    "Welke archieven zijn er in Amsterdam?",
    "Waar vind ik het stadsarchief van Rotterdam?",
    "Welke archieven zijn er in Den Haag?",
    "Waar is het Utrechts Archief?",
    "Welke archieven zijn er in Groningen?",

    # Specific document types
    "Waar vind ik trouwaktes?",
    "Hoe vind ik scheepvaartregisters?",
    "Waar zijn notariële aktes te vinden?",
    "Hoe zoek ik in de Burgerlijke Stand?",
    "Waar vind ik emigratieregisters?",

    # Research topics
    "Hoe onderzoek ik mijn familiegeschiedenis?",
    "Waar vind ik informatie over de Tweede Wereldoorlog?",
    "Hoe vind ik informatie over de slavernijgeschiedenis?",
    "Waar zijn koloniaal archieven te vinden?",

    # Museum/Library questions
    "Hoeveel musea zijn er in Nederland?",
    "Welke bibliotheken hebben oude manuscripten?",
    "Wat is het grootste museum van Nederland?",
    "Waar vind ik de Koninklijke Bibliotheek?",
]
|
|
|
|
|
|
def make_rag_request(base_url: str, query: str, timeout: float = 120.0) -> Optional[dict]:
    """
    Stream a RAG query and collect the payload of the final "complete" event.

    The endpoint emits newline-delimited JSON events; "status" events are
    echoed to stdout for progress, "error" aborts, and "complete" carries
    the final answer payload.

    Args:
        base_url: Base URL (e.g., https://archief.support)
        query: The query string
        timeout: Request timeout in seconds

    Returns:
        Final response data dict, or None on error
    """
    endpoint = f"{base_url}/api/rag/dspy/query/stream"
    request_body = {
        "question": query,
        "language": "nl",
        "include_visualization": False,
        "use_agent": False,
        "llm_provider": "zai",  # Free provider
        "llm_model": "glm-4.6",
    }

    final_data = None

    try:
        with httpx.stream("POST", endpoint, json=request_body, timeout=timeout) as resp:
            resp.raise_for_status()

            for raw_line in resp.iter_lines():
                if not raw_line.strip():
                    continue

                try:
                    event = json.loads(raw_line)
                except json.JSONDecodeError:
                    # Ignore fragments that are not valid JSON events.
                    continue

                event_type = event.get("type")
                if event_type == "status":
                    stage = event.get("stage", "")
                    message = event.get("message", "")
                    print(f"  [{stage}] {message}")
                elif event_type == "error":
                    print(f"  ERROR: {event.get('error')}")
                    return None
                elif event_type == "complete":
                    final_data = event.get("data", {})

    except httpx.TimeoutException:
        print(f"  TIMEOUT after {timeout}s")
        return None
    except httpx.HTTPStatusError as exc:
        print(f"  HTTP ERROR: {exc.response.status_code}")
        return None
    except Exception as exc:
        print(f"  ERROR: {exc}")
        return None

    return final_data
|
|
|
|
|
|
def store_in_cache(base_url: str, query: str, response_data: dict, timeout: float = 30.0) -> bool:
    """
    Persist one query/response pair in the Qdrant semantic cache.

    Args:
        base_url: Base URL (e.g., https://archief.support)
        query: The original query
        response_data: The RAG response data
        timeout: Request timeout

    Returns:
        True if stored successfully
    """
    endpoint = f"{base_url}/api/cache/store"

    # Pull out the pieces of the RAG response that the cache endpoint wants.
    answer = response_data.get("answer", "")
    sources = response_data.get("sources_used", [])

    # Flatten retrieved_results into the institution records the cache stores.
    # The inner single-item loop binds each result's metadata dict exactly once.
    institutions = [
        {
            "name": hit.get("name", ""),
            "type": meta.get("institution_type"),
            "city": meta.get("city"),
            "country": meta.get("country"),
            "description": meta.get("description"),
            "website": hit.get("website"),
        }
        for hit in response_data.get("retrieved_results", [])
        for meta in [hit.get("metadata", {})]
    ]

    payload = {
        "query": query,
        "response": {
            "answer": answer,
            "sources": [{"database": src, "name": src} for src in sources],
            "institutions": institutions,
        },
        "language": "nl",
        "model": "glm-4.6",
    }

    try:
        httpx.post(endpoint, json=payload, timeout=timeout).raise_for_status()
    except Exception as exc:
        print(f"  CACHE STORE ERROR: {exc}")
        return False
    return True
|
|
|
|
|
|
def get_cache_stats(base_url: str) -> dict:
    """Fetch current cache statistics; returns {} if the endpoint is unreachable."""
    stats_url = f"{base_url}/api/cache/stats"
    try:
        reply = httpx.get(stats_url, timeout=10.0)
        reply.raise_for_status()
        return reply.json()
    except Exception:
        # Best-effort: stats are informational only, so swallow any failure.
        return {}
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: warm the semantic cache with COMMON_QUERIES.

    For each query (optionally limited by --limit), runs a full RAG request
    and stores the answer via the cache endpoint, sleeping --delay seconds
    between queries to avoid rate limiting.

    Returns:
        0 when every query was fetched and cached, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Populate semantic cache with common queries")
    parser.add_argument("--base-url", default="https://archief.support", help="Base URL for API")
    parser.add_argument("--dry-run", action="store_true", help="Only show what would be done")
    parser.add_argument("--limit", type=int, default=None, help="Limit number of queries to process")
    parser.add_argument("--delay", type=float, default=2.0, help="Delay between queries (seconds)")

    args = parser.parse_args()

    # Plain string: was a pointless f-string with no placeholders (ruff F541).
    print("=== Semantic Cache Population Script ===")
    print(f"Base URL: {args.base_url}")
    print(f"Dry run: {args.dry_run}")
    print()

    # Show cache size before we start (best-effort; {} on failure).
    stats = get_cache_stats(args.base_url)
    if stats:
        print(f"Current cache: {stats.get('total_entries', 0)} entries")
        print()

    # Slicing with None yields the full list, so this also honors an explicit
    # --limit 0 (the previous truthiness check treated 0 as "no limit").
    queries = COMMON_QUERIES[: args.limit]

    if args.dry_run:
        print(f"Would process {len(queries)} queries:")
        for i, q in enumerate(queries, 1):
            print(f"  {i}. {q}")
        return 0

    # Process queries
    success_count = 0
    fail_count = 0

    for i, query in enumerate(queries, 1):
        print(f"[{i}/{len(queries)}] {query}")

        # Make RAG request
        print("  Making RAG request...")
        start_time = time.time()
        response_data = make_rag_request(args.base_url, query)
        elapsed = time.time() - start_time

        if response_data:
            answer_preview = response_data.get("answer", "")[:100]
            print(f"  Got response ({elapsed:.1f}s): {answer_preview}...")

            # Store in cache
            print("  Storing in cache...")
            if store_in_cache(args.base_url, query, response_data):
                print("  ✓ Cached successfully")
                success_count += 1
            else:
                fail_count += 1
        else:
            print("  ✗ No response")
            fail_count += 1

        # Delay between requests to avoid rate limiting (skipped after the last).
        if i < len(queries) and args.delay > 0:
            print(f"  Waiting {args.delay}s...")
            time.sleep(args.delay)

        print()

    # Final stats
    print("=== Summary ===")
    print(f"Processed: {len(queries)} queries")
    print(f"Success: {success_count}")
    print(f"Failed: {fail_count}")

    # Check final cache stats
    stats = get_cache_stats(args.base_url)
    if stats:
        print(f"Cache now has: {stats.get('total_entries', 0)} entries")

    return 0 if fail_count == 0 else 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Exit status mirrors main(): 0 on full success, 1 if any query failed.
    sys.exit(main())
|