# glam/backend/typedb/main.py
# Last modified: 2025-12-15 22:31:41 +01:00 (1388 lines, 51 KiB, Python)
"""
TypeDB REST API for Heritage Custodian Data
FastAPI backend providing TypeQL query interface for bronhouder.nl
Updated for TypeDB 3.x driver API (no sessions, direct transactions)
Endpoints:
- GET / - Health check and statistics
- GET /status - Server status
- POST /query - Execute TypeQL match query (read-only)
- GET /databases - List all databases
- POST /databases/{name} - Create new database
- GET /schema - Get database schema types
- POST /schema/load - Load TypeQL schema from file
- POST /data/insert - Execute insert query
- POST /database/reset/{name} - Reset (delete/recreate) database
- GET /stats - Get detailed statistics
- POST /sync/postgresql - Sync data from PostgreSQL to TypeDB
"""
import os
import re
from datetime import datetime
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager
import asyncio
from concurrent.futures import ThreadPoolExecutor

from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

# TypeDB 3.x imports
from typedb.driver import TypeDB, TransactionType, Credentials, DriverOptions

# PostgreSQL for sync -- optional dependency; when missing, HAS_PSYCOPG2 is
# False and get_pg_connection() raises a clear RuntimeError instead.
try:
    import psycopg2
    import psycopg2.extras
    HAS_PSYCOPG2 = True
except ImportError:
    HAS_PSYCOPG2 = False
# ============================================================================
# Configuration
# ============================================================================
class Settings(BaseModel):
    """TypeDB / API / PostgreSQL connection settings, read from the environment.

    Defaults are evaluated once when this class body executes (import time),
    so later changes to environment variables have no effect.
    """
    # TypeDB server location
    host: str = os.getenv("TYPEDB_HOST", "localhost")
    port: int = int(os.getenv("TYPEDB_PORT", "1729"))
    database: str = os.getenv("TYPEDB_DATABASE", "glam")
    # TypeDB 3.x authentication
    username: str = os.getenv("TYPEDB_USERNAME", "admin")
    # NOTE(review): insecure default credential -- set TYPEDB_PASSWORD in production
    password: str = os.getenv("TYPEDB_PASSWORD", "password")
    # REST server bind address/port
    api_host: str = os.getenv("API_HOST", "0.0.0.0")
    api_port: int = int(os.getenv("API_PORT", "8003"))
    # PostgreSQL settings for sync (glam_geo database with custodians table)
    pg_host: str = os.getenv("POSTGRES_HOST", "localhost")
    pg_port: int = int(os.getenv("POSTGRES_PORT", "5432"))
    pg_database: str = os.getenv("POSTGRES_DATABASE", "glam_geo")
    pg_user: str = os.getenv("POSTGRES_USER", "glam_api")
    # NOTE(review): hardcoded secret default -- rotate and supply via env
    pg_password: str = os.getenv("POSTGRES_PASSWORD", "glam_secret_2025")


# Module-level singleton used by all endpoints and helpers below.
settings = Settings()
# ============================================================================
# Pydantic Models
# ============================================================================
class QueryRequest(BaseModel):
    """Request body for /query and /data/insert: raw TypeQL plus optional target database."""
    query: str = Field(..., description="TypeQL query to execute")
    database: Optional[str] = Field(None, description="Database name (defaults to 'glam')")
class QueryResponse(BaseModel):
    """Result envelope returned by /query."""
    results: List[Dict[str, Any]]   # one dict per answer row/document
    result_count: int               # len(results)
    execution_time_ms: float        # wall-clock time, rounded to 2 decimals
    query_type: str                 # "match"/"fetch"/"define"/"insert"/"unknown"
class DatabaseInfo(BaseModel):
    """Database metadata (currently just the name)."""
    name: str
class StatusResponse(BaseModel):
    """Server status payload for / and /status."""
    status: str                 # "healthy" or "error"
    databases: List[str]
    default_database: str
    uptime_seconds: float
    typedb_version: str
    # Duplicated fields kept for frontend compatibility
    connected: bool = False
    database: str = ""
    version: str = ""
# ============================================================================
# Global State
# ============================================================================
_driver: Any = None                            # lazily-created TypeDB driver singleton (see get_driver)
_executor = ThreadPoolExecutor(max_workers=4)  # runs blocking driver calls off the event loop
_start_time: datetime = datetime.now()         # process start, used for uptime reporting
def get_driver() -> Any:
    """Return the process-wide TypeDB 3.x driver, creating it on first use."""
    global _driver
    if _driver is not None:
        return _driver
    # TypeDB 3.x requires explicit Credentials and DriverOptions.
    credentials = Credentials(settings.username, settings.password)
    options = DriverOptions(is_tls_enabled=False, tls_root_ca_path=None)
    _driver = TypeDB.driver(f"{settings.host}:{settings.port}", credentials, options)
    return _driver
def close_driver():
    """Release the cached driver connection, if one was created."""
    global _driver
    if _driver is None:
        return
    _driver.close()
    _driver = None
# ============================================================================
# Helper Functions for TypeDB 3.x
# ============================================================================
def serialize_concept_3x(concept: Any) -> Dict[str, Any]:
    """Convert a TypeDB 3.x concept into a JSON-serializable dict.

    Dicts (concept documents from fetch queries) pass through unchanged;
    other concepts are probed duck-typing style for their type label,
    internal ID, attribute value and direct label.
    """
    # Concept documents are already plain dicts.
    if isinstance(concept, dict):
        return concept

    out: Dict[str, Any] = {}

    # Type label, via the concept's type object.
    if hasattr(concept, 'get_type'):
        ctype = concept.get_type()
        if ctype and hasattr(ctype, 'get_label'):
            type_label = str(ctype.get_label())
            out['_type'] = type_label
            out['type'] = type_label

    # Internal ID (IID).
    if hasattr(concept, 'get_iid'):
        iid = concept.get_iid()
        if iid is not None:
            out['_iid'] = str(iid)
            out['id'] = str(iid)

    # Attribute value; get_value may raise on non-attribute concepts.
    if hasattr(concept, 'get_value'):
        try:
            out['value'] = concept.get_value()
        except Exception:
            pass

    # Direct label, present on type concepts.
    if hasattr(concept, 'get_label'):
        try:
            out['label'] = str(concept.get_label())
        except Exception:
            pass

    return out
def execute_read_query(database: str, query: str) -> tuple:
    """Execute a read query in TypeDB 3.x (blocking) and return (results, query_type).

    `results` is a list of JSON-serializable dicts; `query_type` is a
    best-effort textual classification of the query.

    TypeDB 3.x query patterns:
    - match $x isa entity-type;                     -> returns _ConceptRowIterator
    - match $x isa type; fetch { "key": $x.attr };  -> returns _ConceptDocumentIterator
    """
    driver = get_driver()
    results: List[Dict[str, Any]] = []
    query_type = "unknown"
    # Classify from the query text. "fetch" anywhere wins over the prefix
    # checks because fetch queries also start with "match".
    query_stripped = query.strip().lower()
    if "fetch" in query_stripped:
        query_type = "fetch"
    elif query_stripped.startswith("match"):
        query_type = "match"
    elif query_stripped.startswith("define"):
        query_type = "define"
    elif query_stripped.startswith("insert"):
        query_type = "insert"
    # TypeDB 3.x: transactions are opened directly on the driver (no session).
    with driver.transaction(database, TransactionType.READ) as tx:
        # TypeDB 3.x executes via tx.query(...).resolve()
        answer = tx.query(query).resolve()
        if query_type == "fetch":
            # Fetch queries yield concept documents (plain dicts).
            try:
                for doc in answer.as_concept_documents():
                    results.append(doc)
            except Exception as e:
                # Fallback: try to iterate the answer directly.
                try:
                    for item in answer:
                        if isinstance(item, dict):
                            results.append(item)
                        else:
                            results.append({"result": str(item)})
                except Exception:
                    results.append({"error": str(e)})
        else:
            # Match queries yield concept rows keyed by variable name.
            try:
                for row in answer.as_concept_rows():
                    row_dict = {}
                    # column_names() returns an iterator in TypeDB 3.x
                    col_names = list(row.column_names())
                    for var in col_names:
                        concept = row.get(var)
                        if concept:
                            row_dict[var] = serialize_concept_3x(concept)
                    results.append(row_dict)
            except Exception as e:
                # If as_concept_rows fails, fall back to raw iteration.
                try:
                    for item in answer:
                        results.append({"result": str(item)})
                except Exception:
                    results.append({"error": str(e)})
    return results, query_type
def get_databases() -> List[str]:
    """Return the names of all databases on the TypeDB server."""
    all_dbs = get_driver().databases.all()
    return [database.name for database in all_dbs]
def database_exists(database: str) -> bool:
    """True if a database with the given name exists on the server."""
    return get_driver().databases.contains(database)
def get_schema_types(database: str) -> Dict[str, Any]:
    """Get schema types from a database using TypeDB 3.x TypeQL queries.

    Returns a dict with "entity_types", "relation_types" and
    "attribute_types" lists of type labels (the root types themselves are
    excluded).  If the transaction cannot be opened, an "error" key is added
    and whatever was collected so far is returned.
    """
    driver = get_driver()
    # (TypeQL kind keyword / root label to exclude, result key) -- the three
    # original copy-pasted query blocks collapsed into one data-driven loop.
    kinds = [
        ("entity", "entity_types"),
        ("relation", "relation_types"),
        ("attribute", "attribute_types"),
    ]
    schema: Dict[str, Any] = {key: [] for _, key in kinds}
    try:
        # TypeDB 3.x: schema reads use the SCHEMA transaction type.
        with driver.transaction(database, TransactionType.SCHEMA) as tx:
            for keyword, key in kinds:
                # TypeDB 3.x syntax: e.g. "match entity $x;" lists entity types.
                try:
                    result = tx.query(f"match {keyword} $x;").resolve()
                    for row in result.as_concept_rows():
                        concept = row.get('x')
                        if concept and hasattr(concept, 'get_label'):
                            label = str(concept.get_label())
                            # Skip the abstract root type itself.
                            if label != keyword:
                                schema[key].append(label)
                except Exception as e:
                    print(f"Error getting {keyword} types: {e}")
    except Exception as e:
        schema["error"] = str(e)
    return schema
def get_stats_data(database: str) -> Dict[str, Any]:
    """Get database statistics including entity/relation/attribute counts.

    First collects type labels via a SCHEMA transaction, then counts
    instances of each type with one "match $x isa <label>;" query per type in
    a READ transaction.  Partial failures are recorded under "schema_error" /
    "data_error" and per-type counts default to 0.
    """
    driver = get_driver()
    stats: Dict[str, Any] = {
        "database": database,
        "entity_types": [],
        "relation_types": [],
        "attribute_types": [],
        "total_entities": 0,
        "total_relations": 0,
    }
    # Bail out early with an error marker if the database is missing.
    if not driver.databases.contains(database):
        stats["error"] = f"Database '{database}' not found"
        return stats
    # Phase 1: schema types via SCHEMA transaction.
    try:
        with driver.transaction(database, TransactionType.SCHEMA) as tx:
            # Entity types (root 'entity' excluded).
            try:
                result = tx.query("match entity $x;").resolve()
                for row in result.as_concept_rows():
                    concept = row.get('x')
                    if concept:
                        label = str(concept.get_label()) if hasattr(concept, 'get_label') else str(concept)
                        if label != 'entity':
                            is_abstract = False
                            if hasattr(concept, 'is_abstract'):
                                is_abstract = concept.is_abstract()
                            stats["entity_types"].append({
                                "label": label,
                                "abstract": is_abstract,
                                "count": 0
                            })
            except Exception as e:
                print(f"Error getting entity types: {e}")
            # Relation types (root 'relation' excluded).
            try:
                result = tx.query("match relation $x;").resolve()
                for row in result.as_concept_rows():
                    concept = row.get('x')
                    if concept:
                        label = str(concept.get_label()) if hasattr(concept, 'get_label') else str(concept)
                        if label != 'relation':
                            stats["relation_types"].append({
                                "label": label,
                                "roles": [],  # role lookup not implemented; kept for response shape
                                "count": 0
                            })
            except Exception as e:
                print(f"Error getting relation types: {e}")
            # Attribute types (root 'attribute' excluded).
            try:
                result = tx.query("match attribute $x;").resolve()
                for row in result.as_concept_rows():
                    concept = row.get('x')
                    if concept:
                        label = str(concept.get_label()) if hasattr(concept, 'get_label') else str(concept)
                        if label != 'attribute':
                            stats["attribute_types"].append({
                                "label": label,
                                "valueType": "unknown",  # value-type lookup not implemented
                                "count": 0
                            })
            except Exception as e:
                print(f"Error getting attribute types: {e}")
    except Exception as e:
        stats["schema_error"] = str(e)
    # Phase 2: count instances via READ transaction (one query per type --
    # NOTE(review): O(#types) round trips; fine for small schemas).
    try:
        with driver.transaction(database, TransactionType.READ) as tx:
            for et in stats["entity_types"]:
                # Abstract types are reported with count 0 and not queried.
                if et.get("abstract"):
                    et["count"] = 0
                    continue
                try:
                    label = et["label"]
                    result = tx.query(f"match $x isa {label};").resolve()
                    count = sum(1 for _ in result.as_concept_rows())
                    et["count"] = count
                    stats["total_entities"] += count
                except Exception as e:
                    print(f"Error counting {label}: {e}")
                    et["count"] = 0
            for rt in stats["relation_types"]:
                try:
                    label = rt["label"]
                    result = tx.query(f"match $x isa {label};").resolve()
                    count = sum(1 for _ in result.as_concept_rows())
                    rt["count"] = count
                    stats["total_relations"] += count
                except Exception:
                    rt["count"] = 0
            for at in stats["attribute_types"]:
                try:
                    label = at["label"]
                    result = tx.query(f"match $x isa {label};").resolve()
                    count = sum(1 for _ in result.as_concept_rows())
                    at["count"] = count
                except Exception:
                    at["count"] = 0
    except Exception as e:
        stats["data_error"] = str(e)
    return stats
# ============================================================================
# FastAPI App
# ============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan: warm up the driver on startup, tear down on shutdown.

    Startup failure is non-fatal: get_driver() re-creates the connection
    lazily on the first request that needs it.
    """
    try:
        get_driver()
    except Exception as e:
        print(f"Warning: Could not connect to TypeDB on startup: {e}")
    yield
    # Shutdown: release the driver, then drain the worker pool.
    close_driver()
    _executor.shutdown(wait=True)
# FastAPI application instance; lifespan handles driver setup/teardown.
app = FastAPI(
    title="TypeDB Heritage API",
    description="REST API for heritage institution TypeQL queries (TypeDB 3.x)",
    version="3.0.0",
    lifespan=lifespan,
)

# CORS middleware -- wide open for development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # NOTE(review): restrict origins for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# ============================================================================
# Schema and Data Loading Functions
# ============================================================================
def load_schema_from_file(database: str, filepath: str) -> Dict[str, Any]:
    """Load a TypeQL schema definition from a .tql file into a database.

    Raises FileNotFoundError if the file does not exist; driver errors
    propagate to the caller.
    """
    driver = get_driver()
    with open(filepath, 'r', encoding='utf-8') as handle:
        schema_source = handle.read()
    # TypeDB 3.x: schema changes go through a SCHEMA transaction and must be
    # committed explicitly.
    with driver.transaction(database, TransactionType.SCHEMA) as tx:
        tx.query(schema_source).resolve()
        tx.commit()
    return {"status": "success", "file": filepath, "database": database}
def execute_write_query(database: str, query: str) -> Dict[str, Any]:
    """Execute a write query (insert/delete) in TypeDB 3.x and report a count.

    Returns {"status": "success", "inserted": <best-effort row count>}.
    """
    driver = get_driver()
    outcome: Dict[str, Any] = {"status": "success", "inserted": 0}
    # TypeDB 3.x: data writes use the WRITE transaction type.
    with driver.transaction(database, TransactionType.WRITE) as tx:
        answer = tx.query(query).resolve()
        # Best-effort count; the answer shape varies by query form.
        try:
            if hasattr(answer, 'as_concept_rows'):
                outcome["inserted"] = sum(1 for _ in answer.as_concept_rows())
            elif hasattr(answer, '__iter__'):
                outcome["inserted"] = sum(1 for _ in answer)
        except Exception:
            outcome["inserted"] = 1  # Assume at least one insertion
        tx.commit()
    return outcome
def reset_database(database: str) -> Dict[str, Any]:
    """Drop a database if present, then create it fresh (destroys all data)."""
    driver = get_driver()
    # Best-effort delete: ignore failures so the create below still runs.
    try:
        if driver.databases.contains(database):
            driver.databases.get(database).delete()
    except Exception:
        pass
    driver.databases.create(database)
    return {"status": "success", "database": database, "action": "reset"}
# ============================================================================
# API Endpoints
# ============================================================================
@app.get("/", response_model=StatusResponse)
async def get_root_status() -> StatusResponse:
    """Root endpoint: alias for /status."""
    return await get_status()
@app.get("/status")
async def get_status() -> StatusResponse:
    """Report connectivity, known databases and process uptime."""
    loop = asyncio.get_event_loop()
    version = "3.7.0"
    db_names: List[str] = []
    try:
        # Blocking driver call runs on the thread pool.
        db_names = await loop.run_in_executor(_executor, get_databases)
        is_connected = True
    except Exception as e:
        print(f"Status check error: {e}")
        is_connected = False
    uptime = (datetime.now() - _start_time).total_seconds()
    return StatusResponse(
        status="healthy" if is_connected else "error",
        databases=db_names,
        default_database=settings.database,
        uptime_seconds=uptime,
        typedb_version=version,
        connected=is_connected,
        database=settings.database,
        version=version,
    )
@app.post("/query", response_model=QueryResponse)
async def execute_query(request: QueryRequest) -> QueryResponse:
    """Execute a read-only TypeQL query.

    Supports both match and fetch queries:
    - match $x isa custodian-observation;
    - match $x isa custodian-observation; fetch { "id": $x.id, "name": $x.observed-name };
    """
    loop = asyncio.get_event_loop()
    target_db = request.database or settings.database
    # Security: reject anything that is not a match/fetch read query.
    lowered = request.query.strip().lower()
    if not lowered.startswith(("match", "fetch")):
        raise HTTPException(
            status_code=403,
            detail="Only match/fetch queries are allowed for read operations."
        )
    started = datetime.now()
    try:
        results, query_type = await loop.run_in_executor(
            _executor, execute_read_query, target_db, request.query
        )
        elapsed_ms = (datetime.now() - started).total_seconds() * 1000
        return QueryResponse(
            results=results,
            result_count=len(results),
            execution_time_ms=round(elapsed_ms, 2),
            query_type=query_type,
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/databases", response_model=List[DatabaseInfo])
async def list_databases() -> List[DatabaseInfo]:
    """List all databases on the TypeDB server."""
    loop = asyncio.get_event_loop()
    try:
        names = await loop.run_in_executor(_executor, get_databases)
        return [DatabaseInfo(name=n) for n in names]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/schema")
async def get_schema(database: Optional[str] = None) -> Dict[str, Any]:
    """Return the schema type listing for a database (404 if it doesn't exist)."""
    loop = asyncio.get_event_loop()
    target = database or settings.database
    try:
        known = await loop.run_in_executor(_executor, get_databases)
        if target not in known:
            raise HTTPException(status_code=404, detail=f"Database '{target}' not found")
        type_listing = await loop.run_in_executor(_executor, get_schema_types, target)
        return {"database": target, "schema": type_listing}
    except HTTPException:
        raise  # keep the 404 status instead of collapsing it into a 500
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/databases/{name}")
async def create_database(name: str) -> Dict[str, str]:
    """Create a new, empty database."""
    loop = asyncio.get_event_loop()

    def _create_db() -> str:
        # Runs on the thread pool: driver calls are blocking.
        get_driver().databases.create(name)
        return name

    try:
        await loop.run_in_executor(_executor, _create_db)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    return {"status": "created", "database": name}
@app.post("/schema/load")
async def load_schema(filepath: str = "/var/www/backend/typedb/01_custodian_name.tql", database: Optional[str] = None) -> Dict[str, Any]:
    """Load a TypeQL schema from a file on the server into a database.

    Args:
        filepath: Path to a .tql file; must resolve inside
            /var/www/backend/typedb/ (403 otherwise).
        database: Target database; defaults to settings.database.
    """
    loop = asyncio.get_event_loop()
    db = database or settings.database
    # Security fix: resolve symlinks and ".." segments BEFORE the prefix
    # check -- a plain startswith() test would accept paths like
    # "/var/www/backend/typedb/../../etc/passwd" (path traversal).
    resolved = os.path.realpath(filepath)
    if not resolved.startswith("/var/www/backend/typedb/"):
        raise HTTPException(status_code=403, detail="Schema files must be in /var/www/backend/typedb/")
    try:
        result = await loop.run_in_executor(_executor, load_schema_from_file, db, resolved)
        return result
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Schema file not found: {filepath}")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.post("/data/insert")
async def insert_data(request: QueryRequest) -> Dict[str, Any]:
    """Execute an insert (or match-insert) query to add data.

    Rejects queries that do not start with "insert" / "match ... insert",
    and any query containing a "delete" keyword -- this endpoint is
    insert-only and runs in a WRITE transaction.
    """
    loop = asyncio.get_event_loop()
    database = request.database or settings.database
    q = request.query.strip().lower()
    # Security fix: the old check used a plain substring test for "insert",
    # which let "match ...; delete ...;" queries through whenever "insert"
    # appeared anywhere in the text (e.g. inside a type name) -- and those
    # deletes would execute in the WRITE transaction.  Require "insert" as a
    # keyword and reject any "delete" keyword outright.
    has_insert = re.search(r'\binsert\b', q) is not None
    has_delete = re.search(r'\bdelete\b', q) is not None
    if has_delete or not (q.startswith("insert") or (q.startswith("match") and has_insert)):
        raise HTTPException(
            status_code=403,
            detail="Only insert queries are allowed for this endpoint."
        )
    start_time = datetime.now()
    try:
        result = await loop.run_in_executor(_executor, execute_write_query, database, request.query)
        execution_time = (datetime.now() - start_time).total_seconds() * 1000
        result["execution_time_ms"] = round(execution_time, 2)
        return result
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.post("/database/reset/{name}")
async def reset_db(name: str) -> Dict[str, Any]:
    """Delete and recreate a database (destroys all data in it)."""
    loop = asyncio.get_event_loop()
    try:
        return await loop.run_in_executor(_executor, reset_database, name)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/stats")
async def get_stats(database: Optional[str] = None) -> Dict[str, Any]:
    """Entity/relation/attribute type statistics for a database."""
    loop = asyncio.get_event_loop()
    target = database or settings.database
    try:
        return await loop.run_in_executor(_executor, get_stats_data, target)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/graph/{entity_type}")
async def get_graph_data(entity_type: str, limit: int = 100, database: Optional[str] = None) -> Dict[str, Any]:
    """Get graph data for visualization (nodes + edges) for a specific entity type.

    Uses TypeDB 3.x fetch queries to get entities with their attributes.
    For custodian-observation entities, fetches all key attributes; other
    entity types fall back to an id-only listing.  Edges are currently
    always empty (relations are not traversed yet).
    """
    loop = asyncio.get_event_loop()
    db = database or settings.database
    # Key attributes to fetch for custodian-observation (in priority order):
    # pairs of (typedb-attribute-name, json-key-in-response).
    CUSTODIAN_ATTRS = [
        ('id', 'id'),
        ('observed-name', 'name'),
        ('city', 'city'),
        ('country-code', 'country'),
        ('institution-type', 'institutionType'),
        ('latitude', 'lat'),
        ('longitude', 'lon'),
        ('region', 'region'),
        ('region-code', 'regionCode'),
        ('wikidata-id', 'wikidataId'),
        ('isil-code', 'isilCode'),
        ('website', 'website'),
        ('street-address', 'address'),
        ('description', 'description'),
        ('rating', 'rating'),
    ]

    def _get_graph() -> Dict[str, Any]:
        # Blocking worker executed on the thread pool.
        driver = get_driver()
        nodes: List[Dict[str, Any]] = []
        edges: List[Dict[str, Any]] = []
        if not driver.databases.contains(db):
            return {"nodes": [], "edges": [], "nodeCount": 0, "edgeCount": 0, "error": f"Database '{db}' not found"}
        with driver.transaction(db, TransactionType.READ) as tx:
            if entity_type == "custodian-observation":
                try:
                    # Step 1: collect up to `limit` entity ids.
                    id_query = f"match $x isa {entity_type}, has id $id; limit {limit};"
                    id_result = tx.query(id_query).resolve()
                    entity_ids = []
                    for row in id_result.as_concept_rows():
                        concept = row.get('id')
                        if concept and hasattr(concept, 'get_value'):
                            entity_ids.append(concept.get_value())
                    # Step 2: per entity, query each key attribute separately
                    # (more reliable than optional fetch blocks).
                    # NOTE(review): issues len(entity_ids) * len(CUSTODIAN_ATTRS)
                    # queries per request -- expensive for large limits.
                    for eid in entity_ids:
                        # Escape double quotes for embedding in the TypeQL literal.
                        eid_escaped = eid.replace('"', '\\"')
                        attrs = {"id": eid, "type": entity_type}
                        for attr_name, json_key in CUSTODIAN_ATTRS:
                            if attr_name == 'id':
                                continue  # Already have id
                            try:
                                attr_query = f'match $x isa {entity_type}, has id "{eid_escaped}", has {attr_name} $v; fetch {{ "v": $v }};'
                                attr_result = tx.query(attr_query).resolve()
                                for doc in attr_result.as_concept_documents():
                                    val = doc.get('v')
                                    if val is not None:
                                        attrs[json_key] = val
                                        break  # Take first value
                            except Exception:
                                pass  # Attribute not present for this entity
                        # Use name as label if available, otherwise the id;
                        # truncate long labels for display.
                        label = attrs.get('name', eid)
                        if isinstance(label, str) and len(label) > 40:
                            label = label[:37] + "..."
                        nodes.append({
                            "id": eid,
                            "label": label,
                            "type": "entity",
                            "entityType": entity_type,
                            "attributes": attrs,
                        })
                except Exception as e:
                    print(f"Error with custodian-observation query: {e}")
                    # Fallback to simple ID-only query
                    nodes = _fallback_query(tx, entity_type, limit)
            else:
                # Other entity types: id-only listing.
                nodes = _fallback_query(tx, entity_type, limit)
        return {
            "nodes": nodes,
            "edges": edges,
            "nodeCount": len(nodes),
            "edgeCount": len(edges),
        }

    def _fallback_query(tx, entity_type: str, limit: int) -> List[Dict[str, Any]]:
        """Fallback: simple match query returning just IDs."""
        nodes = []
        try:
            result = tx.query(f"match $x isa {entity_type}, has id $id; limit {limit};").resolve()
            count = 0
            for row in result.as_concept_rows():
                if count >= limit:
                    break
                concept = row.get('id')
                if concept and hasattr(concept, 'get_value'):
                    node_id = concept.get_value()
                    nodes.append({
                        "id": node_id,
                        "label": node_id[:30],
                        "type": "entity",
                        "entityType": entity_type,
                        "attributes": {"id": node_id, "type": entity_type},
                    })
                    count += 1
        except Exception as e:
            print(f"Fallback query failed: {e}")
        return nodes

    try:
        result = await loop.run_in_executor(_executor, _get_graph)
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# RAG-Optimized Search Endpoint
# ============================================================================
class RAGSearchRequest(BaseModel):
    """Request body for /rag/search (RAG-optimized retrieval)."""
    query: str = Field(..., description="Natural language search query")
    search_type: str = Field("semantic", description="Search type: semantic, name, type, location, id")
    limit: int = Field(10, ge=1, le=100, description="Maximum results to return")
    include_coordinates: bool = Field(True, description="Include lat/lon in results")
class RAGSearchResult(BaseModel):
    """Single search result optimized for RAG context; only id/name are guaranteed."""
    id: str
    name: str
    city: str | None = None
    country: str | None = None
    institution_type: str | None = None
    lat: float | None = None
    lon: float | None = None
    wikidata_id: str | None = None
    description: str | None = None
    # Constant 1.0 for now -- no ranking is implemented (see rag_search).
    relevance_score: float = 1.0
class RAGSearchResponse(BaseModel):
    """Response envelope for the /rag/search endpoint."""
    results: List[RAGSearchResult]
    total_count: int        # len(results)
    query_time_ms: float    # wall-clock time, rounded to 2 decimals
    search_type: str        # echoed from the request
    query: str              # echoed from the request
@app.post("/rag/search", response_model=RAGSearchResponse)
async def rag_search(request: RAGSearchRequest, database: Optional[str] = None) -> RAGSearchResponse:
    """Fast search endpoint optimized for RAG context retrieval.

    Uses efficient batch queries to fetch essential attributes for RAG context.
    Supports multiple search strategies:
    - **semantic**: Natural language query (parses type + location patterns)
    - **name**: Search by institution name (partial match)
    - **type**: Search by institution type (M=museum, A=archive, L=library, G=gallery)
    - **location**: Search by city name
    - **id**: Search by exact GHCID
    Examples:
    {"query": "museums in Amsterdam", "search_type": "semantic"}
    {"query": "Rijksmuseum", "search_type": "name"}
    {"query": "M", "search_type": "type"}  # All museums
    {"query": "Rotterdam", "search_type": "location"}
    {"query": "NL-NH-AMS-M-RMA", "search_type": "id"}
    """
    import time
    start_time = time.time()
    loop = asyncio.get_event_loop()
    db = database or settings.database

    def _search() -> List[Dict[str, Any]]:
        # Blocking worker: builds a match query per strategy, collects ids,
        # then fetches essential attributes one query per attribute.
        driver = get_driver()
        results = []
        if not driver.databases.contains(db):
            return []
        query_text = request.query.strip()
        search_type = request.search_type.lower()
        limit = request.limit
        with driver.transaction(db, TransactionType.READ) as tx:
            # Build the appropriate query based on search type.
            # All queries must include 'has id $id' for entity ID extraction.
            if search_type == "id":
                # Exact ID match
                escaped_id = escape_typeql_string(query_text)
                match_query = f'match $x isa custodian-observation, has id $id; $id = "{escaped_id}"'
            elif search_type == "name":
                # Name contains search (lowercased -- assumes names are stored
                # lowercase or "contains" is case-insensitive; TODO confirm)
                escaped_name = escape_typeql_string(query_text.lower())
                match_query = f'match $x isa custodian-observation, has observed-name $name, has id $id; $name contains "{escaped_name}"'
            elif search_type == "type":
                # Institution type filter
                type_code = query_text.upper()[:1]  # First letter only
                match_query = f'match $x isa custodian-observation, has institution-type "{type_code}", has id $id'
            elif search_type == "location":
                # City search
                escaped_city = escape_typeql_string(query_text.lower())
                match_query = f'match $x isa custodian-observation, has city $city, has id $id; $city contains "{escaped_city}"'
            else:  # semantic - parse natural language
                query_lower = query_text.lower()
                # Detect institution type mentions (EN + NL keywords).
                type_code = None
                if any(w in query_lower for w in ['museum', 'musea', 'museums']):
                    type_code = 'M'
                elif any(w in query_lower for w in ['archive', 'archief', 'archives']):
                    type_code = 'A'
                elif any(w in query_lower for w in ['library', 'bibliotheek', 'libraries']):
                    type_code = 'L'
                elif any(w in query_lower for w in ['gallery', 'galerie', 'galleries']):
                    type_code = 'G'
                # Extract a potential city name: simple heuristic, take the
                # text after the last " in ".
                city_pattern = None
                if ' in ' in query_lower:
                    parts = query_lower.split(' in ')
                    if len(parts) > 1:
                        city_pattern = parts[-1].strip()
                # Combine whatever constraints were detected; fall back to a
                # name-contains search when neither type nor city was found.
                if type_code and city_pattern:
                    escaped_city = escape_typeql_string(city_pattern)
                    match_query = f'match $x isa custodian-observation, has institution-type "{type_code}", has city $city, has id $id; $city contains "{escaped_city}"'
                elif type_code:
                    match_query = f'match $x isa custodian-observation, has institution-type "{type_code}", has id $id'
                elif city_pattern:
                    escaped_city = escape_typeql_string(city_pattern)
                    match_query = f'match $x isa custodian-observation, has city $city, has id $id; $city contains "{escaped_city}"'
                else:
                    # Fallback: search in name
                    escaped_q = escape_typeql_string(query_lower)
                    match_query = f'match $x isa custodian-observation, has observed-name $name, has id $id; $name contains "{escaped_q}"'
            # First get all matching entity IDs.
            try:
                # Query already has $id variable, just add limit
                id_query = f"{match_query}; limit {limit};"
                id_result = tx.query(id_query).resolve()
                entity_ids = []
                for row in id_result.as_concept_rows():
                    concept = row.get('id')
                    if concept and hasattr(concept, 'get_value'):
                        entity_ids.append(concept.get_value())
                # Fetch essential attributes for each matched entity.
                for eid in entity_ids:
                    eid_escaped = eid.replace('"', '\\"')
                    # NOTE(review): dead code -- fetch_query is built but never
                    # executed; the bare except guards only a string assignment.
                    # Safe to remove in a follow-up change.
                    try:
                        fetch_query = f'''
                        match $x isa custodian-observation, has id "{eid_escaped}";
                        $x has observed-name $name;
                        fetch {{
                            "id": $x.id,
                            "name": $name
                        }};
                        '''
                        # Note: This fetch approach may not work in TypeDB 3.x
                        # Fall back to individual attribute queries
                    except:
                        pass
                    # Individual attribute fetching (more reliable): one query
                    # per (entity, attribute) pair.
                    attrs = {"id": eid}
                    # Essential attributes for RAG context
                    essential_attrs = [
                        ('observed-name', 'name'),
                        ('city', 'city'),
                        ('country-code', 'country'),
                        ('institution-type', 'institution_type'),
                        ('wikidata-id', 'wikidata_id'),
                        ('description', 'description'),
                    ]
                    if request.include_coordinates:
                        essential_attrs.extend([
                            ('latitude', 'lat'),
                            ('longitude', 'lon'),
                        ])
                    for attr_name, json_key in essential_attrs:
                        try:
                            attr_query = f'match $x isa custodian-observation, has id "{eid_escaped}", has {attr_name} $v; fetch {{ "v": $v }};'
                            attr_result = tx.query(attr_query).resolve()
                            for doc in attr_result.as_concept_documents():
                                val = doc.get('v')
                                if val is not None:
                                    attrs[json_key] = val
                                    break
                        except Exception:
                            pass
                    results.append(attrs)
            except Exception as e:
                print(f"RAG search query failed: {e}")
        return results

    try:
        raw_results = await loop.run_in_executor(_executor, _search)
        # Convert raw attribute dicts to the response model; missing name
        # falls back to the id, relevance is constant (no ranking yet).
        search_results = []
        for r in raw_results:
            search_results.append(RAGSearchResult(
                id=r.get('id', ''),
                name=r.get('name', r.get('id', 'Unknown')),
                city=r.get('city'),
                country=r.get('country'),
                institution_type=r.get('institution_type'),
                lat=r.get('lat'),
                lon=r.get('lon'),
                wikidata_id=r.get('wikidata_id'),
                description=r.get('description'),
                relevance_score=1.0,
            ))
        elapsed_ms = (time.time() - start_time) * 1000
        return RAGSearchResponse(
            results=search_results,
            total_count=len(search_results),
            query_time_ms=round(elapsed_ms, 2),
            search_type=request.search_type,
            query=request.query,
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# PostgreSQL Sync Functions
# ============================================================================
def escape_typeql_string(value: str) -> str:
    """Escape a string for safe embedding in a double-quoted TypeQL literal."""
    if not value:
        return ""
    # Backslashes first, so the backslashes added for quotes survive intact.
    sanitized = value.replace("\\", "\\\\").replace('"', '\\"')
    # Strip ASCII control characters (including DEL).
    return re.sub(r'[\x00-\x1f\x7f]', '', sanitized)
def get_pg_connection():
    """Open and return a new PostgreSQL connection from the configured settings.

    Raises:
        RuntimeError: if the optional psycopg2 dependency is not installed.
    """
    if not HAS_PSYCOPG2:
        raise RuntimeError("psycopg2 not installed. Run: pip install psycopg2-binary")
    connect_kwargs = {
        "host": settings.pg_host,
        "port": settings.pg_port,
        "database": settings.pg_database,
        "user": settings.pg_user,
        "password": settings.pg_password,
    }
    return psycopg2.connect(**connect_kwargs)
def _custodian_insert_stmt(row, var_index: int) -> str:
    """Build one TypeQL insert statement for a PostgreSQL custodians row.

    All string values (including the GHCID itself, previously interpolated
    unescaped) go through escape_typeql_string; empty strings are omitted.
    Numeric attributes are emitted as bare literals only when non-None.
    """
    raw_ghcid = row['ghcid']
    # TypeQL attribute name -> PostgreSQL column, for string-valued fields.
    string_fields = (
        ('observed-name', 'name'),
        ('city', 'city'),
        ('country-code', 'country_code'),
        ('institution-type', 'type'),
        ('region', 'region'),
        ('region-code', 'region_code'),
        ('wikidata-id', 'wikidata_id'),
        ('isil-code', 'isil_code'),
        ('viaf-id', 'viaf_id'),
        ('google-place-id', 'google_place_id'),
        ('website', 'website'),
        ('email', 'email'),
        ('phone', 'phone'),
        ('street-address', 'street_address'),
        ('postal-code', 'postal_code'),
        ('description', 'description'),
    )
    values = {attr: escape_typeql_string(row[col] or "") for attr, col in string_fields}
    # Fallback: GHCIDs start with a two-letter country code; derive it from the
    # raw id, then escape for safe embedding.
    if not values['country-code'] and raw_ghcid and len(raw_ghcid) >= 2:
        values['country-code'] = escape_typeql_string(raw_ghcid[:2])
    attrs = [f'has id "{escape_typeql_string(raw_ghcid)}"']
    attrs.extend(
        f'has {attr} "{values[attr]}"'
        for attr, _col in string_fields
        if values[attr]
    )
    # Numeric attributes (TypeDB double/long values); None means unknown.
    for attr, col in (
        ('latitude', 'lat'),
        ('longitude', 'lon'),
        ('rating', 'rating'),
        ('founding-year', 'founding_year'),
    ):
        if row[col] is not None:
            attrs.append(f'has {attr} {row[col]}')
    return f'$x{var_index} isa custodian-observation, {", ".join(attrs)};'


def _flush_insert_batch(
    driver,
    database: str,
    batch: List[str],
    result: Dict[str, Any],
    error_label: str,
) -> None:
    """Insert a batch of statements in one WRITE transaction, updating stats.

    On failure the whole batch is lost; the error is recorded in
    result["errors"] with the given label prefix (matching historic messages).
    """
    if not batch:
        return
    try:
        query = "insert " + " ".join(batch)
        with driver.transaction(database, TransactionType.WRITE) as tx:
            tx.query(query).resolve()
            tx.commit()
        result["inserted"] += len(batch)
    except Exception as e:
        result["errors"].append(f"{error_label}: {e}")


def sync_postgresql_to_typedb(
    database: str,
    batch_size: int = 100,
    dry_run: bool = False,
    clear_existing: bool = False,
) -> Dict[str, Any]:
    """Sync institutions from PostgreSQL to TypeDB as custodian-observations.

    Args:
        database: TypeDB database name
        batch_size: Number of records to insert per transaction
        dry_run: If True, only count records without inserting
        clear_existing: If True, delete existing custodian-observations first

    Returns:
        Dict with sync statistics (counts, skipped, errors).
    """
    driver = get_driver()
    result: Dict[str, Any] = {
        "status": "success",
        "database": database,
        "dry_run": dry_run,
        "total_in_postgresql": 0,
        "existing_in_typedb": 0,
        "inserted": 0,
        "skipped": 0,
        "errors": [],
    }
    # Collect IDs already present in TypeDB so we can skip duplicates.
    existing_ids = set()
    try:
        with driver.transaction(database, TransactionType.READ) as tx:
            query_result = tx.query("match $x isa custodian-observation, has id $id;").resolve()
            for row in query_result.as_concept_rows():
                concept = row.get('id')
                if concept and hasattr(concept, 'get_value'):
                    existing_ids.add(concept.get_value())
            result["existing_in_typedb"] = len(existing_ids)
    except Exception as e:
        result["errors"].append(f"Error getting existing IDs: {e}")
    # Optionally wipe all existing observations (never during a dry run).
    if clear_existing and not dry_run:
        try:
            with driver.transaction(database, TransactionType.WRITE) as tx:
                tx.query("match $x isa custodian-observation; delete $x;").resolve()
                tx.commit()
            existing_ids = set()
            result["cleared_existing"] = True
        except Exception as e:
            result["errors"].append(f"Error clearing existing: {e}")
    # Pull the source rows from PostgreSQL.
    try:
        conn = get_pg_connection()
        cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        cur.execute("SELECT COUNT(*) FROM custodians WHERE ghcid IS NOT NULL")
        result["total_in_postgresql"] = cur.fetchone()[0]
        if dry_run:
            result["would_insert"] = result["total_in_postgresql"] - len(existing_ids)
            cur.close()
            conn.close()
            return result
        cur.execute("""
            SELECT ghcid, name, city, region, region_code, country_code, type, type_name,
                   lat, lon, wikidata_id, isil_code, viaf_id, google_place_id,
                   website, email, phone, street_address, postal_code,
                   description, rating, founding_year
            FROM custodians
            WHERE ghcid IS NOT NULL
            ORDER BY ghcid
        """)
        rows = cur.fetchall()
        cur.close()
        conn.close()
    except Exception as e:
        result["status"] = "error"
        result["errors"].append(f"PostgreSQL error: {e}")
        return result
    # Build and execute insert queries in batches of batch_size.
    batch: List[str] = []
    for row in rows:
        if row['ghcid'] in existing_ids:
            result["skipped"] += 1
            continue
        batch.append(_custodian_insert_stmt(row, len(batch)))
        if len(batch) >= batch_size:
            _flush_insert_batch(driver, database, batch, result, "Batch insert error")
            batch = []
    # Flush whatever is left over after the loop.
    _flush_insert_batch(driver, database, batch, result, "Final batch error")
    return result
@app.post("/sync/postgresql")
async def sync_from_postgresql(
    database: Optional[str] = None,
    batch_size: int = Query(default=100, ge=1, le=1000),
    dry_run: bool = Query(default=False),
    clear_existing: bool = Query(default=False),
) -> Dict[str, Any]:
    """Sync institutions from PostgreSQL to TypeDB

    - database: TypeDB database name (defaults to 'glam')
    - batch_size: Number of records per transaction (1-1000)
    - dry_run: If true, only count records without inserting
    - clear_existing: If true, delete existing custodian-observations first
    """
    # Fail fast before touching the event loop or TypeDB.
    if not HAS_PSYCOPG2:
        raise HTTPException(
            status_code=500,
            detail="psycopg2 not installed on server. Run: pip install psycopg2-binary"
        )
    db = database or settings.database
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() is deprecated here since Python 3.10.
    loop = asyncio.get_running_loop()
    try:
        # The sync is blocking (psycopg2 + TypeDB driver), so run it on the
        # shared thread pool to avoid stalling the event loop.
        result = await loop.run_in_executor(
            _executor,
            sync_postgresql_to_typedb,
            db,
            batch_size,
            dry_run,
            clear_existing,
        )
        return result
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# ============================================================================
# Main Entry Point
# ============================================================================
if __name__ == "__main__":
    # Development entry point: serve the API with auto-reload enabled.
    import uvicorn

    uvicorn.run(
        app="main:app",
        host=settings.api_host,
        port=settings.api_port,
        reload=True,
    )