glam/scripts/populate_cache.py
2025-12-21 00:01:54 +01:00

275 lines
8.5 KiB
Python

#!/usr/bin/env python3
"""
Populate the Qdrant-backed semantic cache with common Dutch archive queries.
This script:
1. Makes real RAG streaming requests to /api/rag/dspy/query/stream
2. Stores the responses in the Qdrant cache via /api/cache/store
Usage:
python scripts/populate_cache.py [--base-url URL] [--dry-run]
Examples:
# Populate cache on production
python scripts/populate_cache.py --base-url https://archief.support
# Test locally
python scripts/populate_cache.py --base-url http://localhost:8010 --dry-run
"""
import argparse
import json
import sys
import time
from typing import Optional
import httpx
# Common Dutch archive queries to pre-cache.
# Grouped by theme; each entry is sent verbatim to the RAG endpoint and the
# resulting answer is stored in the semantic cache, so phrasing here should
# match what real users are likely to type.
COMMON_QUERIES: list[str] = [
    # Core archive questions
    "Wat is het Nationaal Archief?",
    "Hoe kan ik mijn stamboom onderzoeken?",
    "Waar vind ik geboorteaktes?",
    "Welke archieven zijn er in Nederland?",
    "Hoe kan ik een overlijdensakte opvragen?",
    "Wat is het VOC archief?",
    "Waar vind ik oude kadasterkaarten?",
    "Hoe zoek ik naar voorouders?",
    "Wat zijn Bevolkingsregisters?",
    "Hoe vind ik militaire stamboeken?",
    # Regional archives
    "Welke archieven zijn er in Amsterdam?",
    "Waar vind ik het stadsarchief van Rotterdam?",
    "Welke archieven zijn er in Den Haag?",
    "Waar is het Utrechts Archief?",
    "Welke archieven zijn er in Groningen?",
    # Specific document types
    "Waar vind ik trouwaktes?",
    "Hoe vind ik scheepvaartregisters?",
    "Waar zijn notariële aktes te vinden?",
    "Hoe zoek ik in de Burgerlijke Stand?",
    "Waar vind ik emigratieregisters?",
    # Research topics
    "Hoe onderzoek ik mijn familiegeschiedenis?",
    "Waar vind ik informatie over de Tweede Wereldoorlog?",
    "Hoe vind ik informatie over de slavernijgeschiedenis?",
    "Waar zijn koloniaal archieven te vinden?",
    # Museum/Library questions
    "Hoeveel musea zijn er in Nederland?",
    "Welke bibliotheken hebben oude manuscripten?",
    "Wat is het grootste museum van Nederland?",
    "Waar vind ik de Koninklijke Bibliotheek?",
]
def make_rag_request(base_url: str, query: str, timeout: float = 120.0) -> Optional[dict]:
    """
    Run one streaming RAG query and return the terminal payload.

    Consumes newline-delimited JSON events from /api/rag/dspy/query/stream,
    echoing "status" events to stdout as they arrive.

    Args:
        base_url: Base URL (e.g., https://archief.support)
        query: The query string
        timeout: Request timeout in seconds

    Returns:
        The ``data`` dict carried by the final "complete" event, or None on
        timeout, HTTP error, or an in-stream "error" event.
    """
    endpoint = f"{base_url}/api/rag/dspy/query/stream"
    body = {
        "question": query,
        "language": "nl",
        "include_visualization": False,
        "use_agent": False,
        "llm_provider": "zai",  # Free provider
        "llm_model": "glm-4.6",
    }
    result: Optional[dict] = None
    try:
        with httpx.stream("POST", endpoint, json=body, timeout=timeout) as resp:
            resp.raise_for_status()
            for raw in resp.iter_lines():
                if not raw.strip():
                    continue
                try:
                    evt = json.loads(raw)
                except json.JSONDecodeError:
                    # Tolerate malformed fragments in the stream.
                    continue
                kind = evt.get("type")
                if kind == "status":
                    stage = evt.get("stage", "")
                    message = evt.get("message", "")
                    print(f" [{stage}] {message}")
                elif kind == "error":
                    print(f" ERROR: {evt.get('error')}")
                    return None
                elif kind == "complete":
                    result = evt.get("data", {})
    except httpx.TimeoutException:
        print(f" TIMEOUT after {timeout}s")
        return None
    except httpx.HTTPStatusError as exc:
        print(f" HTTP ERROR: {exc.response.status_code}")
        return None
    except Exception as exc:  # best-effort CLI: report and move on
        print(f" ERROR: {exc}")
        return None
    return result
def store_in_cache(base_url: str, query: str, response_data: dict, timeout: float = 30.0) -> bool:
    """
    Persist one query/response pair in the Qdrant semantic cache.

    Args:
        base_url: Base URL (e.g., https://archief.support)
        query: The original query
        response_data: The RAG response data
        timeout: Request timeout

    Returns:
        True if stored successfully
    """
    def _institution(hit: dict) -> dict:
        # Collapse one retrieved result into the cache's institution shape.
        meta = hit.get("metadata", {})
        return {
            "name": hit.get("name", ""),
            "type": meta.get("institution_type"),
            "city": meta.get("city"),
            "country": meta.get("country"),
            "description": meta.get("description"),
            "website": hit.get("website"),
        }

    payload = {
        "query": query,
        "response": {
            "answer": response_data.get("answer", ""),
            "sources": [
                {"database": src, "name": src}
                for src in response_data.get("sources_used", [])
            ],
            "institutions": [
                _institution(hit)
                for hit in response_data.get("retrieved_results", [])
            ],
        },
        "language": "nl",
        "model": "glm-4.6",
    }
    try:
        reply = httpx.post(f"{base_url}/api/cache/store", json=payload, timeout=timeout)
        reply.raise_for_status()
    except Exception as exc:  # best-effort CLI: report and move on
        print(f" CACHE STORE ERROR: {exc}")
        return False
    return True
def get_cache_stats(base_url: str) -> dict:
    """Fetch /api/cache/stats; return {} on any failure (best effort)."""
    endpoint = f"{base_url}/api/cache/stats"
    try:
        reply = httpx.get(endpoint, timeout=10.0)
        reply.raise_for_status()
        return reply.json()
    except Exception:
        # Stats are informational only — never let them break the run.
        return {}
def main():
    """CLI entry point: warm the semantic cache with COMMON_QUERIES.

    For each query, makes a live RAG request and stores the answer via the
    cache API. Returns 0 when every query succeeded, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Populate semantic cache with common queries")
    parser.add_argument("--base-url", default="https://archief.support", help="Base URL for API")
    parser.add_argument("--dry-run", action="store_true", help="Only show what would be done")
    parser.add_argument("--limit", type=int, default=None, help="Limit number of queries to process")
    parser.add_argument("--delay", type=float, default=2.0, help="Delay between queries (seconds)")
    args = parser.parse_args()

    print("=== Semantic Cache Population Script ===")
    print(f"Base URL: {args.base_url}")
    print(f"Dry run: {args.dry_run}")
    print()

    # Show the starting cache size so growth is visible in the summary.
    before = get_cache_stats(args.base_url)
    if before:
        print(f"Current cache: {before.get('total_entries', 0)} entries")
        print()

    # Truthy check on purpose: --limit 0 means "no limit", same as omitting it.
    selected = list(COMMON_QUERIES)
    if args.limit:
        selected = selected[:args.limit]
    total = len(selected)

    if args.dry_run:
        print(f"Would process {total} queries:")
        for idx, text in enumerate(selected, 1):
            print(f" {idx}. {text}")
        return 0

    succeeded = 0
    failed = 0
    for idx, question in enumerate(selected, 1):
        print(f"[{idx}/{total}] {question}")
        print(" Making RAG request...")
        started = time.time()
        data = make_rag_request(args.base_url, question)
        elapsed = time.time() - started
        if data:
            preview = data.get("answer", "")[:100]
            print(f" Got response ({elapsed:.1f}s): {preview}...")
            print(" Storing in cache...")
            if store_in_cache(args.base_url, question, data):
                print(" ✓ Cached successfully")
                succeeded += 1
            else:
                failed += 1
        else:
            print(" ✗ No response")
            failed += 1
        # Pause between requests to avoid rate limiting (skip after the last).
        if idx < total and args.delay > 0:
            print(f" Waiting {args.delay}s...")
            time.sleep(args.delay)
        print()

    print("=== Summary ===")
    print(f"Processed: {total} queries")
    print(f"Success: {succeeded}")
    print(f"Failed: {failed}")

    after = get_cache_stats(args.base_url)
    if after:
        print(f"Cache now has: {after.get('total_entries', 0)} entries")
    return 1 if failed else 0
if __name__ == "__main__":
    # Propagate main()'s status code (0 = all cached, 1 = failures) to the shell.
    sys.exit(main())