# TypeDB REST API backend (see module docstring below)
"""
|
|
TypeDB REST API for Heritage Custodian Data
|
|
FastAPI backend providing TypeQL query interface for bronhouder.nl
|
|
|
|
Updated for TypeDB 3.x driver API (no sessions, direct transactions)
|
|
|
|
Endpoints:
|
|
- GET / - Health check and statistics
|
|
- GET /status - Server status
|
|
- POST /query - Execute TypeQL match query (read-only)
|
|
- GET /databases - List all databases
|
|
- POST /databases/{n} - Create new database
|
|
- GET /schema - Get database schema types
|
|
- POST /schema/load - Load TypeQL schema from file
|
|
- POST /data/insert - Execute insert query
|
|
- POST /database/reset/{n} - Reset (delete/recreate) database
|
|
- GET /stats - Get detailed statistics
|
|
- POST /sync/postgresql - Sync data from PostgreSQL to TypeDB
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Optional, List, Dict, Any
|
|
from contextlib import asynccontextmanager
|
|
import asyncio
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
from fastapi import FastAPI, HTTPException, Query
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel, Field
|
|
|
|
# TypeDB 3.x imports
|
|
from typedb.driver import TypeDB, TransactionType, Credentials, DriverOptions
|
|
|
|
# PostgreSQL for sync
|
|
try:
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
HAS_PSYCOPG2 = True
|
|
except ImportError:
|
|
HAS_PSYCOPG2 = False
|
|
|
|
|
|
# ============================================================================
|
|
# Configuration
|
|
# ============================================================================
|
|
|
|
class Settings(BaseModel):
    """Runtime configuration, read from environment variables at import time."""

    # TypeDB server connection
    host: str = os.getenv("TYPEDB_HOST", "localhost")
    port: int = int(os.getenv("TYPEDB_PORT", "1729"))
    database: str = os.getenv("TYPEDB_DATABASE", "glam")

    # TypeDB 3.x authentication
    username: str = os.getenv("TYPEDB_USERNAME", "admin")
    password: str = os.getenv("TYPEDB_PASSWORD", "password")

    # FastAPI bind address/port
    api_host: str = os.getenv("API_HOST", "0.0.0.0")
    api_port: int = int(os.getenv("API_PORT", "8003"))

    # PostgreSQL source for /sync/postgresql (glam_geo database with custodians table)
    pg_host: str = os.getenv("POSTGRES_HOST", "localhost")
    pg_port: int = int(os.getenv("POSTGRES_PORT", "5432"))
    pg_database: str = os.getenv("POSTGRES_DATABASE", "glam_geo")
    pg_user: str = os.getenv("POSTGRES_USER", "glam_api")
    pg_password: str = os.getenv("POSTGRES_PASSWORD", "glam_secret_2025")


# Single settings instance shared by the whole module.
settings = Settings()
|
|
|
|
|
|
# ============================================================================
|
|
# Pydantic Models
|
|
# ============================================================================
|
|
|
|
class QueryRequest(BaseModel):
    """Request body for TypeQL query endpoints."""

    query: str = Field(..., description="TypeQL query to execute")
    database: Optional[str] = Field(None, description="Database name (defaults to 'glam')")
|
|
|
|
|
|
class QueryResponse(BaseModel):
    """Response envelope for TypeQL query endpoints."""

    results: List[Dict[str, Any]]     # serialized rows/documents
    result_count: int                 # len(results), for convenience
    execution_time_ms: float          # wall-clock query time
    query_type: str                   # match / fetch / define / insert / unknown
|
|
|
|
|
|
class DatabaseInfo(BaseModel):
    """Metadata about a single TypeDB database."""

    name: str
|
|
|
|
|
|
class StatusResponse(BaseModel):
    """Server status payload returned by / and /status."""

    status: str
    databases: List[str]
    default_database: str
    uptime_seconds: float
    typedb_version: str

    # Duplicated fields kept for frontend compatibility.
    connected: bool = False
    database: str = ""
    version: str = ""
|
|
|
|
|
|
# ============================================================================
|
|
# Global State
|
|
# ============================================================================
|
|
|
|
_driver: Any = None
|
|
_executor = ThreadPoolExecutor(max_workers=4)
|
|
_start_time: datetime = datetime.now()
|
|
|
|
|
|
def get_driver() -> Any:
    """Return the shared TypeDB 3.x driver, creating it on first use."""
    global _driver

    if _driver is None:
        # TypeDB 3.x requires explicit Credentials and DriverOptions.
        endpoint = f"{settings.host}:{settings.port}"
        creds = Credentials(settings.username, settings.password)
        opts = DriverOptions(is_tls_enabled=False, tls_root_ca_path=None)
        _driver = TypeDB.driver(endpoint, creds, opts)

    return _driver
|
|
|
|
|
|
def close_driver():
    """Close and discard the shared driver, if one was created."""
    global _driver
    if _driver is None:
        return
    _driver.close()
    _driver = None
|
|
|
|
|
|
# ============================================================================
|
|
# Helper Functions for TypeDB 3.x
|
|
# ============================================================================
|
|
|
|
def serialize_concept_3x(concept: Any) -> Dict[str, Any]:
    """Convert a TypeDB 3.x concept into a JSON-serializable dict.

    Concept documents (plain dicts) pass through unchanged.  For driver
    concept objects, the type label, internal ID, attribute value and
    direct label are extracted when the object exposes those accessors.
    """
    # Fetch results arrive as plain dicts already.
    if isinstance(concept, dict):
        return concept

    out: Dict[str, Any] = {}

    # Type label (entities/relations/attributes carry a type object).
    if hasattr(concept, 'get_type'):
        ctype = concept.get_type()
        if ctype and hasattr(ctype, 'get_label'):
            label = str(ctype.get_label())
            out['_type'] = label
            out['type'] = label

    # Internal ID (IID).
    if hasattr(concept, 'get_iid'):
        iid = concept.get_iid()
        if iid is not None:
            out['_iid'] = str(iid)
            out['id'] = str(iid)

    # Attribute value, when present; some concepts raise here.
    if hasattr(concept, 'get_value'):
        try:
            out['value'] = concept.get_value()
        except Exception:
            pass

    # Direct label (type concepts expose get_label themselves).
    if hasattr(concept, 'get_label'):
        try:
            out['label'] = str(concept.get_label())
        except Exception:
            pass

    return out
|
|
|
|
|
|
def execute_read_query(database: str, query: str) -> tuple:
    """Run a read query against TypeDB 3.x (blocking) and return (results, query_type).

    TypeDB 3.x result shapes:
    - match $x isa entity-type;                 -> ConceptRowIterator
    - match $x isa type; fetch { "k": $x.a };   -> ConceptDocumentIterator
    """
    driver = get_driver()
    collected: List[Dict[str, Any]] = []

    # Classify the query so we know which iterator the answer exposes.
    lowered = query.strip().lower()
    if "fetch" in lowered:
        query_type = "fetch"
    elif lowered.startswith("match"):
        query_type = "match"
    elif lowered.startswith("define"):
        query_type = "define"
    elif lowered.startswith("insert"):
        query_type = "insert"
    else:
        query_type = "unknown"

    # TypeDB 3.x: transactions hang directly off the driver (no sessions).
    with driver.transaction(database, TransactionType.READ) as tx:
        answer = tx.query(query).resolve()

        if query_type == "fetch":
            # Fetch queries yield concept documents (plain dicts).
            try:
                for doc in answer.as_concept_documents():
                    collected.append(doc)
            except Exception as err:
                # Fallback: iterate whatever the answer yields.
                try:
                    for item in answer:
                        collected.append(item if isinstance(item, dict) else {"result": str(item)})
                except Exception:
                    collected.append({"error": str(err)})
        else:
            # Match-style queries yield concept rows.
            try:
                for row in answer.as_concept_rows():
                    serialized: Dict[str, Any] = {}
                    # column_names() is an iterator in TypeDB 3.x.
                    for var in list(row.column_names()):
                        concept = row.get(var)
                        if concept:
                            serialized[var] = serialize_concept_3x(concept)
                    collected.append(serialized)
            except Exception as err:
                # If as_concept_rows fails, fall back to raw iteration.
                try:
                    for item in answer:
                        collected.append({"result": str(item)})
                except Exception:
                    collected.append({"error": str(err)})

    return collected, query_type
|
|
|
|
|
|
def get_databases() -> List[str]:
    """Return the names of all databases on the server."""
    return [db.name for db in get_driver().databases.all()]
|
|
|
|
|
|
def database_exists(database: str) -> bool:
    """True if a database with this name exists on the server."""
    return get_driver().databases.contains(database)
|
|
|
|
|
|
def get_schema_types(database: str) -> Dict[str, Any]:
    """Get schema types from a database using TypeDB 3.x TypeQL queries.

    Returns a dict with "entity_types", "relation_types" and
    "attribute_types" lists of type labels (the root types themselves are
    excluded).  If the schema transaction fails, an "error" key carries
    the message; per-kind query failures are logged and skipped.
    """
    driver = get_driver()
    schema: Dict[str, Any] = {"entity_types": [], "relation_types": [], "attribute_types": []}

    # (result key, root-type keyword): in TypeDB 3.x the query
    # "match <kind> $x;" enumerates all types of that kind.
    kinds = [
        ("entity_types", "entity"),
        ("relation_types", "relation"),
        ("attribute_types", "attribute"),
    ]

    try:
        # TypeDB 3.x: schema reads use the SCHEMA transaction type.
        with driver.transaction(database, TransactionType.SCHEMA) as tx:
            for key, root in kinds:
                try:
                    result = tx.query(f"match {root} $x;").resolve()
                    for row in result.as_concept_rows():
                        concept = row.get('x')
                        if concept and hasattr(concept, 'get_label'):
                            label = str(concept.get_label())
                            # Skip the abstract root type itself.
                            if label != root:
                                schema[key].append(label)
                except Exception as e:
                    print(f"Error getting {root} types: {e}")
    except Exception as e:
        schema["error"] = str(e)

    return schema
|
|
|
|
|
|
def _iter_type_concepts(tx, root: str):
    """Yield (label, concept) for every non-root type of the given kind."""
    result = tx.query(f"match {root} $x;").resolve()
    for row in result.as_concept_rows():
        concept = row.get('x')
        if concept:
            label = str(concept.get_label()) if hasattr(concept, 'get_label') else str(concept)
            if label != root:
                yield label, concept


def _count_instances(tx, label: str) -> int:
    """Count instances of one type with a plain match query."""
    result = tx.query(f"match $x isa {label};").resolve()
    return sum(1 for _ in result.as_concept_rows())


def get_stats_data(database: str) -> Dict[str, Any]:
    """Get database statistics: schema types plus instance counts.

    Returns a dict with per-kind type lists ("entity_types",
    "relation_types", "attribute_types"), each entry carrying a "count",
    and running totals "total_entities" / "total_relations".  Failures
    are recorded under "error" (missing database), "schema_error" or
    "data_error" instead of raising.
    """
    driver = get_driver()
    stats: Dict[str, Any] = {
        "database": database,
        "entity_types": [],
        "relation_types": [],
        "attribute_types": [],
        "total_entities": 0,
        "total_relations": 0,
    }

    if not driver.databases.contains(database):
        stats["error"] = f"Database '{database}' not found"
        return stats

    # Phase 1: enumerate schema types (SCHEMA transaction).
    try:
        with driver.transaction(database, TransactionType.SCHEMA) as tx:
            try:
                for label, concept in _iter_type_concepts(tx, "entity"):
                    is_abstract = concept.is_abstract() if hasattr(concept, 'is_abstract') else False
                    stats["entity_types"].append({"label": label, "abstract": is_abstract, "count": 0})
            except Exception as e:
                print(f"Error getting entity types: {e}")

            try:
                for label, _ in _iter_type_concepts(tx, "relation"):
                    stats["relation_types"].append({"label": label, "roles": [], "count": 0})
            except Exception as e:
                print(f"Error getting relation types: {e}")

            try:
                for label, _ in _iter_type_concepts(tx, "attribute"):
                    stats["attribute_types"].append({"label": label, "valueType": "unknown", "count": 0})
            except Exception as e:
                print(f"Error getting attribute types: {e}")
    except Exception as e:
        stats["schema_error"] = str(e)

    # Phase 2: count instances per type (READ transaction).
    try:
        with driver.transaction(database, TransactionType.READ) as tx:
            for et in stats["entity_types"]:
                if et.get("abstract"):
                    # Abstract types have no direct instances to count.
                    et["count"] = 0
                    continue
                try:
                    et["count"] = _count_instances(tx, et["label"])
                    stats["total_entities"] += et["count"]
                except Exception as e:
                    print(f"Error counting {et['label']}: {e}")
                    et["count"] = 0

            for rt in stats["relation_types"]:
                try:
                    rt["count"] = _count_instances(tx, rt["label"])
                    stats["total_relations"] += rt["count"]
                except Exception:
                    rt["count"] = 0

            for at in stats["attribute_types"]:
                try:
                    at["count"] = _count_instances(tx, at["label"])
                except Exception:
                    at["count"] = 0
    except Exception as e:
        stats["data_error"] = str(e)

    return stats
|
|
|
|
|
|
# ============================================================================
|
|
# FastAPI App
|
|
# ============================================================================
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Connect to TypeDB on startup; release resources on shutdown."""
    try:
        get_driver()
    except Exception as e:
        # Non-fatal: endpoints will retry the connection on demand.
        print(f"Warning: Could not connect to TypeDB on startup: {e}")
    yield
    # Shutdown: tear down the driver and the worker pool.
    close_driver()
    _executor.shutdown(wait=True)
|
|
|
|
|
|
# FastAPI application, wired to the lifespan handler above.
app = FastAPI(
    title="TypeDB Heritage API",
    description="REST API for heritage institution TypeQL queries (TypeDB 3.x)",
    version="3.0.0",
    lifespan=lifespan,
)

# CORS: wide open for development; restrict origins in production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
# ============================================================================
|
|
# Schema and Data Loading Functions
|
|
# ============================================================================
|
|
|
|
def load_schema_from_file(database: str, filepath: str) -> Dict[str, Any]:
    """Apply a TypeQL schema (.tql) file to the given database.

    Raises FileNotFoundError if the file is missing; driver errors
    propagate to the caller.
    """
    driver = get_driver()

    with open(filepath, 'r', encoding='utf-8') as fh:
        source = fh.read()

    # TypeDB 3.x: schema changes need a SCHEMA transaction plus commit.
    with driver.transaction(database, TransactionType.SCHEMA) as tx:
        tx.query(source).resolve()
        tx.commit()

    return {"status": "success", "file": filepath, "database": database}
|
|
|
|
|
|
def execute_write_query(database: str, query: str) -> Dict[str, Any]:
    """Run an insert/delete query in a WRITE transaction and commit.

    Returns {"status": "success", "inserted": <best-effort row count>}.
    """
    driver = get_driver()
    outcome: Dict[str, Any] = {"status": "success", "inserted": 0}

    # TypeDB 3.x: data mutations use the WRITE transaction type.
    with driver.transaction(database, TransactionType.WRITE) as tx:
        answer = tx.query(query).resolve()

        # Best-effort count of affected rows; not all answers are iterable.
        try:
            if hasattr(answer, 'as_concept_rows'):
                outcome["inserted"] = sum(1 for _ in answer.as_concept_rows())
            elif hasattr(answer, '__iter__'):
                outcome["inserted"] = sum(1 for _ in answer)
        except Exception:
            outcome["inserted"] = 1  # Assume at least one insertion

        tx.commit()

    return outcome
|
|
|
|
|
|
def reset_database(database: str) -> Dict[str, Any]:
    """Delete a database if it exists, then create it fresh.

    Deletion is best-effort: failures are logged (previously swallowed
    silently) and creation is attempted regardless, so a real problem
    surfaces as a create error instead of being hidden.
    """
    driver = get_driver()

    try:
        if driver.databases.contains(database):
            driver.databases.get(database).delete()
    except Exception as e:
        # Log instead of silently swallowing, so a failed delete is visible.
        print(f"Warning: could not delete database '{database}': {e}")

    # Create new (raises to the caller if this fails).
    driver.databases.create(database)

    return {"status": "success", "database": database, "action": "reset"}
|
|
|
|
|
|
# ============================================================================
|
|
# API Endpoints
|
|
# ============================================================================
|
|
|
|
@app.get("/", response_model=StatusResponse)
async def get_root_status() -> StatusResponse:
    """Root endpoint: alias for /status."""
    return await get_status()
|
|
|
|
|
|
@app.get("/status")
async def get_status() -> StatusResponse:
    """Report server health, database list and process uptime."""
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() is deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()

    databases: List[str] = []
    connected = False
    # NOTE(review): hard-coded; the driver does not expose the server
    # version here — confirm against the deployed TypeDB release.
    version = "3.7.0"

    try:
        databases = await loop.run_in_executor(_executor, get_databases)
        connected = True
    except Exception as e:
        print(f"Status check error: {e}")
        connected = False

    uptime = (datetime.now() - _start_time).total_seconds()

    return StatusResponse(
        status="healthy" if connected else "error",
        databases=databases,
        default_database=settings.database,
        uptime_seconds=uptime,
        typedb_version=version,
        connected=connected,
        database=settings.database,
        version=version,
    )
|
|
|
|
|
|
@app.post("/query", response_model=QueryResponse)
async def execute_query(request: QueryRequest) -> QueryResponse:
    """Execute a read-only TypeQL query.

    Supports both match and fetch queries:
    - match $x isa custodian-observation;
    - match $x isa custodian-observation; fetch { "id": $x.id, "name": $x.observed-name };
    """
    # get_event_loop() is deprecated inside a coroutine (Python 3.10+).
    loop = asyncio.get_running_loop()
    database = request.database or settings.database

    # Security: only match/fetch reads pass through this endpoint.
    query_stripped = request.query.strip().lower()
    if not (query_stripped.startswith("match") or query_stripped.startswith("fetch")):
        raise HTTPException(
            status_code=403,
            detail="Only match/fetch queries are allowed for read operations."
        )

    start_time = datetime.now()

    try:
        results, query_type = await loop.run_in_executor(
            _executor,
            execute_read_query,
            database,
            request.query
        )

        execution_time = (datetime.now() - start_time).total_seconds() * 1000

        return QueryResponse(
            results=results,
            result_count=len(results),
            execution_time_ms=round(execution_time, 2),
            query_type=query_type,
        )

    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
|
|
@app.get("/databases", response_model=List[DatabaseInfo])
async def list_databases() -> List[DatabaseInfo]:
    """List all databases on the TypeDB server."""
    # get_event_loop() is deprecated inside a coroutine (Python 3.10+).
    loop = asyncio.get_running_loop()

    try:
        databases = await loop.run_in_executor(_executor, get_databases)
        return [DatabaseInfo(name=db) for db in databases]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.get("/schema")
async def get_schema(database: Optional[str] = None) -> Dict[str, Any]:
    """Return the schema (entity/relation/attribute types) of a database."""
    # get_event_loop() is deprecated inside a coroutine (Python 3.10+).
    loop = asyncio.get_running_loop()
    db = database or settings.database

    try:
        # Verify the database exists before querying its schema.
        databases = await loop.run_in_executor(_executor, get_databases)
        if db not in databases:
            raise HTTPException(status_code=404, detail=f"Database '{db}' not found")

        schema = await loop.run_in_executor(_executor, get_schema_types, db)
        return {"database": db, "schema": schema}

    except HTTPException:
        raise  # Preserve the deliberate 404 above.
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.post("/databases/{name}")
async def create_database(name: str) -> Dict[str, str]:
    """Create a new, empty database."""
    # get_event_loop() is deprecated inside a coroutine (Python 3.10+).
    loop = asyncio.get_running_loop()

    def _create_db() -> str:
        # Runs on the worker pool; raises if the name already exists.
        get_driver().databases.create(name)
        return name

    try:
        await loop.run_in_executor(_executor, _create_db)
        return {"status": "created", "database": name}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
|
|
@app.post("/schema/load")
async def load_schema(filepath: str = "/var/www/backend/typedb/01_custodian_name.tql", database: Optional[str] = None) -> Dict[str, Any]:
    """Load a TypeQL schema file from the server's schema directory."""
    # get_event_loop() is deprecated inside a coroutine (Python 3.10+).
    loop = asyncio.get_running_loop()
    db = database or settings.database

    # Security: resolve symlinks and ".." before checking containment, so
    # "/var/www/backend/typedb/../../../etc/x" is rejected (a plain
    # startswith() prefix check is defeated by path traversal).
    allowed_dir = os.path.realpath("/var/www/backend/typedb")
    real_path = os.path.realpath(filepath)
    if os.path.commonpath([real_path, allowed_dir]) != allowed_dir:
        raise HTTPException(status_code=403, detail="Schema files must be in /var/www/backend/typedb/")

    try:
        result = await loop.run_in_executor(_executor, load_schema_from_file, db, real_path)
        return result
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail=f"Schema file not found: {filepath}")
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
|
|
@app.post("/data/insert")
async def insert_data(request: QueryRequest) -> Dict[str, Any]:
    """Execute an insert (or match ... insert) query to add data."""
    # get_event_loop() is deprecated inside a coroutine (Python 3.10+).
    loop = asyncio.get_running_loop()
    database = request.database or settings.database

    # Security: only insert / match-insert queries pass through here.
    query_stripped = request.query.strip().lower()
    is_insert = query_stripped.startswith("insert") or (
        query_stripped.startswith("match") and "insert" in query_stripped
    )
    if not is_insert:
        raise HTTPException(
            status_code=403,
            detail="Only insert queries are allowed for this endpoint."
        )

    start_time = datetime.now()

    try:
        result = await loop.run_in_executor(_executor, execute_write_query, database, request.query)
        execution_time = (datetime.now() - start_time).total_seconds() * 1000
        result["execution_time_ms"] = round(execution_time, 2)
        return result
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
|
|
@app.post("/database/reset/{name}")
async def reset_db(name: str) -> Dict[str, Any]:
    """Reset (delete and recreate) a database. Destroys all of its data."""
    # get_event_loop() is deprecated inside a coroutine (Python 3.10+).
    loop = asyncio.get_running_loop()

    try:
        return await loop.run_in_executor(_executor, reset_database, name)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
|
|
|
|
|
|
@app.get("/stats")
async def get_stats(database: Optional[str] = None) -> Dict[str, Any]:
    """Return entity/relation/attribute type counts for a database."""
    # get_event_loop() is deprecated inside a coroutine (Python 3.10+).
    loop = asyncio.get_running_loop()
    db = database or settings.database

    try:
        return await loop.run_in_executor(_executor, get_stats_data, db)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
@app.get("/graph/{entity_type}")
async def get_graph_data(entity_type: str, limit: int = 100, database: Optional[str] = None) -> Dict[str, Any]:
    """Get graph data for visualization (nodes + edges) for one entity type.

    For custodian-observation entities, key attributes are fetched per
    node via TypeDB 3.x fetch queries; other entity types fall back to an
    ID-only listing.
    """
    # get_event_loop() is deprecated inside a coroutine (Python 3.10+).
    loop = asyncio.get_running_loop()
    db = database or settings.database

    # (TypeQL attribute, JSON key) pairs fetched per custodian node, in
    # priority order — the most commonly populated/useful come first.
    CUSTODIAN_ATTRS = [
        ('id', 'id'),
        ('observed-name', 'name'),
        ('city', 'city'),
        ('country-code', 'country'),
        ('institution-type', 'institutionType'),
        ('latitude', 'lat'),
        ('longitude', 'lon'),
        ('region', 'region'),
        ('region-code', 'regionCode'),
        ('wikidata-id', 'wikidataId'),
        ('isil-code', 'isilCode'),
        ('website', 'website'),
        ('street-address', 'address'),
        ('description', 'description'),
        ('rating', 'rating'),
    ]

    def _fallback_query(tx, entity_type: str, limit: int) -> List[Dict[str, Any]]:
        """Fallback: simple match query returning ID-only nodes."""
        nodes: List[Dict[str, Any]] = []
        try:
            result = tx.query(f"match $x isa {entity_type}, has id $id; limit {limit};").resolve()
            count = 0
            for row in result.as_concept_rows():
                if count >= limit:
                    break
                concept = row.get('id')
                if concept and hasattr(concept, 'get_value'):
                    node_id = concept.get_value()
                    nodes.append({
                        "id": node_id,
                        "label": node_id[:30],
                        "type": "entity",
                        "entityType": entity_type,
                        "attributes": {"id": node_id, "type": entity_type},
                    })
                    count += 1
        except Exception as e:
            print(f"Fallback query failed: {e}")
        return nodes

    def _get_graph() -> Dict[str, Any]:
        """Blocking worker: build the node/edge lists on the thread pool."""
        driver = get_driver()
        nodes: List[Dict[str, Any]] = []
        edges: List[Dict[str, Any]] = []

        if not driver.databases.contains(db):
            return {"nodes": [], "edges": [], "nodeCount": 0, "edgeCount": 0, "error": f"Database '{db}' not found"}

        with driver.transaction(db, TransactionType.READ) as tx:
            if entity_type == "custodian-observation":
                try:
                    # Step 1: collect up to `limit` entity IDs.
                    id_query = f"match $x isa {entity_type}, has id $id; limit {limit};"
                    id_result = tx.query(id_query).resolve()

                    entity_ids = []
                    for row in id_result.as_concept_rows():
                        concept = row.get('id')
                        if concept and hasattr(concept, 'get_value'):
                            entity_ids.append(concept.get_value())

                    # Step 2: fetch key attributes per entity.
                    # NOTE(review): one query per attribute per entity
                    # (N*M round-trips) — reliable but slow for large
                    # limits; consider a single fetch with optionals.
                    for eid in entity_ids:
                        # Escape double quotes for embedding in TypeQL.
                        eid_escaped = eid.replace('"', '\\"')
                        attrs = {"id": eid, "type": entity_type}

                        for attr_name, json_key in CUSTODIAN_ATTRS:
                            if attr_name == 'id':
                                continue  # Already have id
                            try:
                                attr_query = f'match $x isa {entity_type}, has id "{eid_escaped}", has {attr_name} $v; fetch {{ "v": $v }};'
                                attr_result = tx.query(attr_query).resolve()
                                for doc in attr_result.as_concept_documents():
                                    val = doc.get('v')
                                    if val is not None:
                                        attrs[json_key] = val
                                        break  # Take first value
                            except Exception:
                                pass  # Attribute not present for this entity

                        # Prefer the name as node label; truncate long names.
                        label = attrs.get('name', eid)
                        if isinstance(label, str) and len(label) > 40:
                            label = label[:37] + "..."

                        nodes.append({
                            "id": eid,
                            "label": label,
                            "type": "entity",
                            "entityType": entity_type,
                            "attributes": attrs,
                        })

                except Exception as e:
                    print(f"Error with custodian-observation query: {e}")
                    # Fall back to a simple ID-only listing.
                    nodes = _fallback_query(tx, entity_type, limit)
            else:
                # For other entity types, use the simple approach directly.
                nodes = _fallback_query(tx, entity_type, limit)

        return {
            "nodes": nodes,
            "edges": edges,
            "nodeCount": len(nodes),
            "edgeCount": len(edges),
        }

    try:
        return await loop.run_in_executor(_executor, _get_graph)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# ============================================================================
|
|
# RAG-Optimized Search Endpoint
|
|
# ============================================================================
|
|
|
|
class RAGSearchRequest(BaseModel):
    """Request body for the RAG-optimized search endpoint."""

    query: str = Field(..., description="Natural language search query")
    search_type: str = Field("semantic", description="Search type: semantic, name, type, location, id")
    limit: int = Field(10, ge=1, le=100, description="Maximum results to return")
    include_coordinates: bool = Field(True, description="Include lat/lon in results")
|
|
|
|
|
|
class RAGSearchResult(BaseModel):
    """One search hit, flattened for direct use as RAG context."""

    id: str
    name: str
    city: str | None = None
    country: str | None = None
    institution_type: str | None = None
    lat: float | None = None
    lon: float | None = None
    wikidata_id: str | None = None
    description: str | None = None
    relevance_score: float = 1.0
|
|
|
|
|
|
class RAGSearchResponse(BaseModel):
    """Envelope returned by the /rag/search endpoint."""

    results: List[RAGSearchResult]
    total_count: int
    query_time_ms: float
    search_type: str
    query: str
|
|
|
|
|
|
@app.post("/rag/search", response_model=RAGSearchResponse)
|
|
async def rag_search(request: RAGSearchRequest, database: Optional[str] = None) -> RAGSearchResponse:
|
|
"""Fast search endpoint optimized for RAG context retrieval.
|
|
|
|
Uses efficient batch queries to fetch essential attributes for RAG context.
|
|
Supports multiple search strategies:
|
|
|
|
- **semantic**: Natural language query (parses type + location patterns)
|
|
- **name**: Search by institution name (partial match)
|
|
- **type**: Search by institution type (M=museum, A=archive, L=library, G=gallery)
|
|
- **location**: Search by city name
|
|
- **id**: Search by exact GHCID
|
|
|
|
Examples:
|
|
{"query": "museums in Amsterdam", "search_type": "semantic"}
|
|
{"query": "Rijksmuseum", "search_type": "name"}
|
|
{"query": "M", "search_type": "type"} # All museums
|
|
{"query": "Rotterdam", "search_type": "location"}
|
|
{"query": "NL-NH-AMS-M-RMA", "search_type": "id"}
|
|
"""
|
|
import time
|
|
start_time = time.time()
|
|
|
|
loop = asyncio.get_event_loop()
|
|
db = database or settings.database
|
|
|
|
def _search() -> List[Dict[str, Any]]:
|
|
driver = get_driver()
|
|
results = []
|
|
|
|
if not driver.databases.contains(db):
|
|
return []
|
|
|
|
query_text = request.query.strip()
|
|
search_type = request.search_type.lower()
|
|
limit = request.limit
|
|
|
|
with driver.transaction(db, TransactionType.READ) as tx:
|
|
# Build the appropriate query based on search type
|
|
# All queries must include 'has id $id' for entity ID extraction
|
|
if search_type == "id":
|
|
# Exact ID match
|
|
escaped_id = escape_typeql_string(query_text)
|
|
match_query = f'match $x isa custodian-observation, has id $id; $id = "{escaped_id}"'
|
|
|
|
elif search_type == "name":
|
|
# Name contains search
|
|
escaped_name = escape_typeql_string(query_text.lower())
|
|
match_query = f'match $x isa custodian-observation, has observed-name $name, has id $id; $name contains "{escaped_name}"'
|
|
|
|
elif search_type == "type":
|
|
# Institution type filter
|
|
type_code = query_text.upper()[:1] # First letter only
|
|
match_query = f'match $x isa custodian-observation, has institution-type "{type_code}", has id $id'
|
|
|
|
elif search_type == "location":
|
|
# City search
|
|
escaped_city = escape_typeql_string(query_text.lower())
|
|
match_query = f'match $x isa custodian-observation, has city $city, has id $id; $city contains "{escaped_city}"'
|
|
|
|
else: # semantic - parse natural language
|
|
# Extract type and location from query
|
|
query_lower = query_text.lower()
|
|
|
|
# Detect institution type mentions
|
|
type_code = None
|
|
if any(w in query_lower for w in ['museum', 'musea', 'museums']):
|
|
type_code = 'M'
|
|
elif any(w in query_lower for w in ['archive', 'archief', 'archives']):
|
|
type_code = 'A'
|
|
elif any(w in query_lower for w in ['library', 'bibliotheek', 'libraries']):
|
|
type_code = 'L'
|
|
elif any(w in query_lower for w in ['gallery', 'galerie', 'galleries']):
|
|
type_code = 'G'
|
|
|
|
# Extract potential city names (simple heuristic: words after "in" or capitalized words)
|
|
city_pattern = None
|
|
if ' in ' in query_lower:
|
|
parts = query_lower.split(' in ')
|
|
if len(parts) > 1:
|
|
city_pattern = parts[-1].strip()
|
|
|
|
# Build query - need to structure for adding id fetch
|
|
# TypeQL requires: match clause; constraint; has id $id;
|
|
if type_code and city_pattern:
|
|
escaped_city = escape_typeql_string(city_pattern)
|
|
match_query = f'match $x isa custodian-observation, has institution-type "{type_code}", has city $city, has id $id; $city contains "{escaped_city}"'
|
|
elif type_code:
|
|
match_query = f'match $x isa custodian-observation, has institution-type "{type_code}", has id $id'
|
|
elif city_pattern:
|
|
escaped_city = escape_typeql_string(city_pattern)
|
|
match_query = f'match $x isa custodian-observation, has city $city, has id $id; $city contains "{escaped_city}"'
|
|
else:
|
|
# Fallback: search in name
|
|
escaped_q = escape_typeql_string(query_lower)
|
|
match_query = f'match $x isa custodian-observation, has observed-name $name, has id $id; $name contains "{escaped_q}"'
|
|
|
|
# First get all matching entity IDs
|
|
try:
|
|
# Query already has $id variable, just add limit
|
|
id_query = f"{match_query}; limit {limit};"
|
|
id_result = tx.query(id_query).resolve()
|
|
|
|
entity_ids = []
|
|
for row in id_result.as_concept_rows():
|
|
concept = row.get('id')
|
|
if concept and hasattr(concept, 'get_value'):
|
|
entity_ids.append(concept.get_value())
|
|
|
|
# Now fetch essential attributes for each entity in batch
|
|
# This is faster than per-entity queries for small result sets
|
|
for eid in entity_ids:
|
|
eid_escaped = eid.replace('"', '\\"')
|
|
|
|
# Fetch all key attributes in one query using fetch
|
|
try:
|
|
fetch_query = f'''
|
|
match $x isa custodian-observation, has id "{eid_escaped}";
|
|
$x has observed-name $name;
|
|
fetch {{
|
|
"id": $x.id,
|
|
"name": $name
|
|
}};
|
|
'''
|
|
# Note: This fetch approach may not work in TypeDB 3.x
|
|
# Fall back to individual attribute queries
|
|
except:
|
|
pass
|
|
|
|
# Individual attribute fetching (more reliable)
|
|
attrs = {"id": eid}
|
|
|
|
# Essential attributes for RAG context
|
|
essential_attrs = [
|
|
('observed-name', 'name'),
|
|
('city', 'city'),
|
|
('country-code', 'country'),
|
|
('institution-type', 'institution_type'),
|
|
('wikidata-id', 'wikidata_id'),
|
|
('description', 'description'),
|
|
]
|
|
|
|
if request.include_coordinates:
|
|
essential_attrs.extend([
|
|
('latitude', 'lat'),
|
|
('longitude', 'lon'),
|
|
])
|
|
|
|
for attr_name, json_key in essential_attrs:
|
|
try:
|
|
attr_query = f'match $x isa custodian-observation, has id "{eid_escaped}", has {attr_name} $v; fetch {{ "v": $v }};'
|
|
attr_result = tx.query(attr_query).resolve()
|
|
for doc in attr_result.as_concept_documents():
|
|
val = doc.get('v')
|
|
if val is not None:
|
|
attrs[json_key] = val
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
results.append(attrs)
|
|
|
|
except Exception as e:
|
|
print(f"RAG search query failed: {e}")
|
|
|
|
return results
|
|
|
|
try:
|
|
raw_results = await loop.run_in_executor(_executor, _search)
|
|
|
|
# Convert to response model
|
|
search_results = []
|
|
for r in raw_results:
|
|
search_results.append(RAGSearchResult(
|
|
id=r.get('id', ''),
|
|
name=r.get('name', r.get('id', 'Unknown')),
|
|
city=r.get('city'),
|
|
country=r.get('country'),
|
|
institution_type=r.get('institution_type'),
|
|
lat=r.get('lat'),
|
|
lon=r.get('lon'),
|
|
wikidata_id=r.get('wikidata_id'),
|
|
description=r.get('description'),
|
|
relevance_score=1.0,
|
|
))
|
|
|
|
elapsed_ms = (time.time() - start_time) * 1000
|
|
|
|
return RAGSearchResponse(
|
|
results=search_results,
|
|
total_count=len(search_results),
|
|
query_time_ms=round(elapsed_ms, 2),
|
|
search_type=request.search_type,
|
|
query=request.query,
|
|
)
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# ============================================================================
|
|
# PostgreSQL Sync Functions
|
|
# ============================================================================
|
|
|
|
def escape_typeql_string(value: str) -> str:
    """Escape *value* for safe embedding in a double-quoted TypeQL literal.

    Backslashes are doubled first, then double quotes are backslash-escaped,
    and ASCII control characters (0x00-0x1F) plus DEL (0x7F) are stripped.
    Falsy input (empty string or None) yields "".
    """
    if not value:
        return ""
    # Order matters: double the backslashes before introducing escape sequences.
    sanitized = value.replace("\\", "\\\\").replace('"', '\\"')
    # Drop control characters and DEL, which TypeQL string literals reject.
    return "".join(ch for ch in sanitized if ch >= " " and ch != "\x7f")
|
|
|
|
|
|
def get_pg_connection():
    """Open a new PostgreSQL connection using the configured pg_* settings.

    Raises:
        RuntimeError: if the optional psycopg2 dependency is not installed.
    """
    if not HAS_PSYCOPG2:
        raise RuntimeError("psycopg2 not installed. Run: pip install psycopg2-binary")

    conn_params = {
        "host": settings.pg_host,
        "port": settings.pg_port,
        "database": settings.pg_database,
        "user": settings.pg_user,
        "password": settings.pg_password,
    }
    return psycopg2.connect(**conn_params)
|
|
|
|
|
|
def sync_postgresql_to_typedb(
    database: str,
    batch_size: int = 100,
    dry_run: bool = False,
    clear_existing: bool = False,
) -> Dict[str, Any]:
    """Sync institutions from PostgreSQL to TypeDB as custodian-observations.

    Args:
        database: TypeDB database name
        batch_size: Number of records to insert per transaction
        dry_run: If True, only count records without inserting
        clear_existing: If True, delete existing custodian-observations first

    Returns:
        Dict with sync statistics ("inserted", "skipped", "errors", ...).
        Errors are accumulated rather than raised so a partial sync still
        reports what it managed to do.
    """
    driver = get_driver()
    result: Dict[str, Any] = {
        "status": "success",
        "database": database,
        "dry_run": dry_run,
        "total_in_postgresql": 0,
        "existing_in_typedb": 0,
        "inserted": 0,
        "skipped": 0,
        "errors": [],
    }

    def _flush_batch(batch: List[str], error_label: str) -> None:
        """Insert one batch of statements in a single WRITE transaction."""
        try:
            query = "insert " + " ".join(batch)
            with driver.transaction(database, TransactionType.WRITE) as tx:
                tx.query(query).resolve()
                tx.commit()
            result["inserted"] += len(batch)
        except Exception as e:
            result["errors"].append(f"{error_label}: {e}")

    def _build_insert_stmt(row, var_index: int) -> str:
        """Render one PostgreSQL custodian row as a TypeQL insert statement.

        Only non-empty string attributes and non-None numeric attributes are
        emitted; attribute order matches the original hand-written builder.
        """
        ghcid = row['ghcid']
        # BUG FIX: the id was previously interpolated unescaped while every
        # other string column went through escape_typeql_string.
        attrs = [f'has id "{escape_typeql_string(ghcid)}"']

        # Country code with fallback: derive from the GHCID prefix (e.g. "NL-...")
        country_code = escape_typeql_string(row['country_code'] or "")
        if not country_code and ghcid and len(ghcid) >= 2:
            country_code = escape_typeql_string(ghcid[:2])

        # Core descriptive string attributes, in original emission order.
        core_strings = [
            ('observed-name', escape_typeql_string(row['name'] or "")),
            ('city', escape_typeql_string(row['city'] or "")),
            ('country-code', country_code),
            ('institution-type', escape_typeql_string(row['type'] or "")),
            ('region', escape_typeql_string(row['region'] or "")),
            ('region-code', escape_typeql_string(row['region_code'] or "")),
        ]
        for attr, value in core_strings:
            if value:
                attrs.append(f'has {attr} "{value}"')

        # Coordinates (TypeDB double values)
        for attr, value in (('latitude', row['lat']), ('longitude', row['lon'])):
            if value is not None:
                attrs.append(f'has {attr} {value}')

        # External identifiers, contact info, and description.
        late_string_columns = [
            ('wikidata-id', 'wikidata_id'),
            ('isil-code', 'isil_code'),
            ('viaf-id', 'viaf_id'),
            ('google-place-id', 'google_place_id'),
            ('website', 'website'),
            ('email', 'email'),
            ('phone', 'phone'),
            ('street-address', 'street_address'),
            ('postal-code', 'postal_code'),
            ('description', 'description'),
        ]
        for attr, column in late_string_columns:
            value = escape_typeql_string(row[column] or "")
            if value:
                attrs.append(f'has {attr} "{value}"')

        # Remaining numeric attributes
        for attr, value in (('rating', row['rating']), ('founding-year', row['founding_year'])):
            if value is not None:
                attrs.append(f'has {attr} {value}')

        return f'$x{var_index} isa custodian-observation, {", ".join(attrs)};'

    # Collect the IDs already present in TypeDB so the sync is idempotent.
    existing_ids = set()
    try:
        with driver.transaction(database, TransactionType.READ) as tx:
            query_result = tx.query("match $x isa custodian-observation, has id $id;").resolve()
            for row in query_result.as_concept_rows():
                concept = row.get('id')
                if concept and hasattr(concept, 'get_value'):
                    existing_ids.add(concept.get_value())
        result["existing_in_typedb"] = len(existing_ids)
    except Exception as e:
        result["errors"].append(f"Error getting existing IDs: {e}")

    # Clear existing observations if requested (never during a dry run).
    if clear_existing and not dry_run:
        try:
            with driver.transaction(database, TransactionType.WRITE) as tx:
                tx.query("match $x isa custodian-observation; delete $x;").resolve()
                tx.commit()
            existing_ids = set()
            result["cleared_existing"] = True
        except Exception as e:
            result["errors"].append(f"Error clearing existing: {e}")

    # Pull source rows from PostgreSQL.
    try:
        conn = get_pg_connection()
        cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

        # Count total
        cur.execute("SELECT COUNT(*) FROM custodians WHERE ghcid IS NOT NULL")
        result["total_in_postgresql"] = cur.fetchone()[0]

        if dry_run:
            result["would_insert"] = result["total_in_postgresql"] - len(existing_ids)
            cur.close()
            conn.close()
            return result

        # Fetch all custodians with all relevant fields
        cur.execute("""
            SELECT ghcid, name, city, region, region_code, country_code, type, type_name,
                   lat, lon, wikidata_id, isil_code, viaf_id, google_place_id,
                   website, email, phone, street_address, postal_code,
                   description, rating, founding_year
            FROM custodians
            WHERE ghcid IS NOT NULL
            ORDER BY ghcid
        """)

        rows = cur.fetchall()
        cur.close()
        conn.close()

    except Exception as e:
        result["status"] = "error"
        result["errors"].append(f"PostgreSQL error: {e}")
        return result

    # Build and execute insert queries in batches.
    batch: List[str] = []
    for row in rows:
        # Skip rows already present (unless clear_existing wiped them above).
        if row['ghcid'] in existing_ids:
            result["skipped"] += 1
            continue

        batch.append(_build_insert_stmt(row, len(batch)))

        if len(batch) >= batch_size:
            _flush_batch(batch, "Batch insert error")
            batch = []

    # Insert remaining partial batch
    if batch:
        _flush_batch(batch, "Final batch error")

    return result
|
|
|
|
|
|
@app.post("/sync/postgresql")
|
|
async def sync_from_postgresql(
|
|
database: Optional[str] = None,
|
|
batch_size: int = Query(default=100, ge=1, le=1000),
|
|
dry_run: bool = Query(default=False),
|
|
clear_existing: bool = Query(default=False),
|
|
) -> Dict[str, Any]:
|
|
"""Sync institutions from PostgreSQL to TypeDB
|
|
|
|
- database: TypeDB database name (defaults to 'glam')
|
|
- batch_size: Number of records per transaction (1-1000)
|
|
- dry_run: If true, only count records without inserting
|
|
- clear_existing: If true, delete existing custodian-observations first
|
|
"""
|
|
loop = asyncio.get_event_loop()
|
|
db = database or settings.database
|
|
|
|
if not HAS_PSYCOPG2:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail="psycopg2 not installed on server. Run: pip install psycopg2-binary"
|
|
)
|
|
|
|
try:
|
|
result = await loop.run_in_executor(
|
|
_executor,
|
|
sync_postgresql_to_typedb,
|
|
db,
|
|
batch_size,
|
|
dry_run,
|
|
clear_existing,
|
|
)
|
|
return result
|
|
except Exception as e:
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
|
|
# ============================================================================
|
|
# Main Entry Point
|
|
# ============================================================================
|
|
|
|
if __name__ == "__main__":
|
|
import uvicorn
|
|
uvicorn.run(
|
|
"main:app",
|
|
host=settings.api_host,
|
|
port=settings.api_port,
|
|
reload=True,
|
|
)
|