"""
DuckLake API Server

FastAPI backend for the DuckLake lakehouse with time travel support.

DuckLake provides:
- ACID transactions
- Time travel (query historical snapshots)
- Schema evolution tracking
- Multi-client access via PostgreSQL/MySQL catalog
- Open format (Parquet files)

Documentation: https://ducklake.select/
"""
import os
import json
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict, Any, Union
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, UploadFile, File, Form, Query
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel, Field

import duckdb

# ============================================================================
# Configuration
# ============================================================================


class Settings(BaseModel):
    """Runtime configuration for the DuckLake server.

    Every field is resolved from an environment variable at import time,
    falling back to a development-friendly default.
    """

    # Catalog backend: "sqlite" for single-node dev, "postgres" for
    # multi-client ACID access in production.
    catalog_type: str = os.environ.get("DUCKLAKE_CATALOG_TYPE", "sqlite")
    catalog_connection: str = os.environ.get(
        "DUCKLAKE_CATALOG_CONNECTION",
        "ducklake:sqlite:/var/lib/glam/ducklake/catalog/heritage.db",
    )
    # Where the Parquet data files live: a local directory in dev,
    # an S3 bucket in production.
    data_path: str = os.environ.get("DUCKLAKE_DATA_PATH", "/var/lib/glam/ducklake/data")
    # Alias under which the lake is ATTACHed inside DuckDB.
    db_alias: str = os.environ.get("DUCKLAKE_DB_ALIAS", "heritage")
    # Optional read-only DuckDB database holding crawled web archives.
    web_archives_path: str = os.environ.get(
        "DUCKLAKE_WEB_ARCHIVES_PATH",
        "/var/lib/glam/ducklake/web_archives.duckdb",
    )
    # HTTP bind address and port.
    host: str = os.environ.get("DUCKLAKE_HOST", "0.0.0.0")
    port: int = int(os.environ.get("DUCKLAKE_PORT", "8765"))


settings = Settings()
# ============================================================================
# Pydantic Models
# ============================================================================


class QueryRequest(BaseModel):
    """SQL query request"""
    query: str = Field(..., description="SQL query to execute")
    snapshot_id: Optional[int] = Field(None, description="Optional snapshot ID for time travel")


class QueryResponse(BaseModel):
    """SQL query response"""
    columns: List[str]
    rows: List[List[Any]]
    row_count: int
    execution_time_ms: float
    snapshot_id: Optional[int] = None


class TableInfo(BaseModel):
    """Table metadata"""
    name: str
    row_count: int
    column_count: int
    columns: List[Dict[str, str]]
    size_bytes: Optional[int] = None


class SnapshotInfo(BaseModel):
    """DuckLake snapshot metadata"""
    snapshot_id: int
    created_at: datetime
    row_count: Optional[int] = None
    description: Optional[str] = None


class SchemaChange(BaseModel):
    """Schema evolution event"""
    change_id: int
    table_name: str
    change_type: str  # ADD_COLUMN, DROP_COLUMN, ALTER_TYPE, etc.
    column_name: Optional[str]
    old_type: Optional[str]
    new_type: Optional[str]
    changed_at: datetime


class StatusResponse(BaseModel):
    """Server status response"""
    status: str
    duckdb_version: str
    ducklake_available: bool
    catalog_type: str
    data_path: str
    db_alias: str
    tables: int
    total_rows: int
    snapshots: int
    last_snapshot_at: Optional[datetime] = None
    uptime_seconds: float


class UploadResponse(BaseModel):
    """Data upload response"""
    success: bool
    table_name: str
    rows_inserted: int
    snapshot_id: int
    message: str


# ============================================================================
# Global State
# ============================================================================

# DuckDB connection (thread-safe for reads, single writer)
_conn: Optional[duckdb.DuckDBPyConnection] = None
_start_time: datetime = datetime.now()
_ducklake_available: bool = False


def get_connection() -> duckdb.DuckDBPyConnection:
    """Get or create the process-wide DuckDB connection with DuckLake attached.

    On first call, tries to install/load the DuckLake extension and ATTACH the
    configured catalog.  If that fails, falls back to a plain on-disk DuckDB
    database so the API keeps working without time-travel features.
    """
    global _conn, _ducklake_available
    if _conn is None:
        _conn = duckdb.connect(":memory:")
        # Try to load DuckLake extension.
        # DuckLake is available from the default core repository (not nightly
        # or community).
        try:
            _conn.execute("INSTALL ducklake;")
            _conn.execute("LOAD ducklake;")
            _ducklake_available = True

            # Ensure data directory exists
            Path(settings.data_path).mkdir(parents=True, exist_ok=True)

            # Attach DuckLake database; the connection string format depends
            # on the catalog type.
            if settings.catalog_type == "sqlite":
                catalog_path = settings.catalog_connection.replace("ducklake:sqlite:", "")
                Path(catalog_path).parent.mkdir(parents=True, exist_ok=True)
                attach_sql = f"""
                    ATTACH 'ducklake:sqlite:{catalog_path}' AS {settings.db_alias}
                    (DATA_PATH '{settings.data_path}')
                """
            elif settings.catalog_type == "postgres":
                attach_sql = f"""
                    ATTACH '{settings.catalog_connection}' AS {settings.db_alias}
                    (DATA_PATH '{settings.data_path}')
                """
            else:
                raise ValueError(f"Unsupported catalog type: {settings.catalog_type}")

            _conn.execute(attach_sql)
            print(f"DuckLake attached: {settings.db_alias} -> {settings.data_path}")

            # Attach web archives database (read-only), best-effort.
            if Path(settings.web_archives_path).exists():
                try:
                    _conn.execute(f"""
                        ATTACH '{settings.web_archives_path}' AS web_archives (READ_ONLY)
                    """)
                    # Create views in heritage schema for seamless access
                    _conn.execute("""
                        CREATE OR REPLACE VIEW heritage.web_archives AS
                        SELECT * FROM web_archives.web_archives
                    """)
                    _conn.execute("""
                        CREATE OR REPLACE VIEW heritage.web_pages AS
                        SELECT * FROM web_archives.web_pages
                    """)
                    _conn.execute("""
                        CREATE OR REPLACE VIEW heritage.web_claims AS
                        SELECT * FROM web_archives.web_claims
                    """)
                    print(f"Web archives attached: {settings.web_archives_path}")
                except Exception as wa_err:
                    print(f"Warning: Could not attach web archives: {wa_err}")
            else:
                print(f"Web archives not found: {settings.web_archives_path}")

        except Exception as e:
            print(f"DuckLake extension not available: {e}")
            print("Falling back to standard DuckDB mode")
            _ducklake_available = False
            # Close the in-memory connection before replacing it so it does
            # not leak (the original left it dangling).
            _conn.close()
            # Create regular database for fallback
            Path(settings.data_path).mkdir(parents=True, exist_ok=True)
            db_path = f"{settings.data_path}/heritage.duckdb"
            _conn = duckdb.connect(db_path)

    return _conn


# ============================================================================
# FastAPI App
# ============================================================================


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan handler: open the connection on startup, close on shutdown."""
    get_connection()
    yield
    global _conn
    if _conn:
        _conn.close()
        _conn = None


app = FastAPI(
    title="DuckLake Heritage API",
    description="Lakehouse API for heritage institution data with time travel",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ============================================================================
# API Endpoints
# ============================================================================


@app.get("/", response_model=StatusResponse)
async def get_status() -> StatusResponse:
    """Get server status and statistics."""
    conn = get_connection()

    # Get DuckDB version
    version_result = conn.execute("SELECT version()").fetchone()
    duckdb_version = version_result[0] if version_result else "unknown"

    tables = 0
    # NOTE(review): total_rows is reported but never aggregated here (always
    # 0); computing it would require a COUNT(*) per table on every status call.
    total_rows = 0
    snapshots = 0
    last_snapshot_at = None

    try:
        if _ducklake_available:
            # DuckLake mode - query the attached database
            tables_result = conn.execute(f"""
                SELECT COUNT(*) FROM information_schema.tables
                WHERE table_catalog = '{settings.db_alias}'
            """).fetchone()
            tables = tables_result[0] if tables_result else 0

            # Get snapshot info (may fail if no snapshots exist yet)
            try:
                snap_result = conn.execute(f"""
                    SELECT COUNT(*), MAX(snapshot_time)
                    FROM ducklake_snapshots('{settings.db_alias}')
                """).fetchone()
                snapshots = snap_result[0] if snap_result else 0
                last_snapshot_at = snap_result[1] if snap_result and snap_result[1] else None
            except Exception:
                pass
        else:
            # Standard DuckDB mode
            tables_result = conn.execute("""
                SELECT COUNT(*) FROM information_schema.tables
                WHERE table_schema = 'main'
            """).fetchone()
            tables = tables_result[0] if tables_result else 0
    except Exception as e:
        print(f"Error getting stats: {e}")

    uptime = (datetime.now() - _start_time).total_seconds()

    return StatusResponse(
        status="healthy",
        duckdb_version=duckdb_version,
        ducklake_available=_ducklake_available,
        catalog_type=settings.catalog_type,
        data_path=settings.data_path,
        db_alias=settings.db_alias,
        tables=tables,
        total_rows=total_rows,
        snapshots=snapshots,
        last_snapshot_at=last_snapshot_at,
        uptime_seconds=uptime,
    )


@app.post("/query", response_model=QueryResponse)
async def execute_query(request: QueryRequest) -> QueryResponse:
    """Execute a SQL query, optionally at a specific snapshot."""
    conn = get_connection()
    start_time = datetime.now()

    try:
        query = request.query

        # Handle time travel queries for DuckLake
        if request.snapshot_id is not None and _ducklake_available:
            import re
            # Rewrite "FROM <alias>.<table>" to "FROM <alias>.<table>
            # AT SNAPSHOT <id>".  The AT clause must follow the table name —
            # the previous implementation inserted it between the alias dot
            # and the table, producing invalid SQL.  This is a simple rewrite;
            # complex queries may need real SQL parsing.
            query = re.sub(
                rf"FROM\s+{re.escape(settings.db_alias)}\.(\w+)",
                rf"FROM {settings.db_alias}.\g<1> AT SNAPSHOT {request.snapshot_id}",
                query,
                flags=re.IGNORECASE,
            )

        result = conn.execute(query)
        columns = [desc[0] for desc in result.description] if result.description else []
        rows = result.fetchall()

        # Convert rows to a JSON-serializable format
        serializable_rows = []
        for row in rows:
            serializable_row = []
            for val in row:
                if isinstance(val, datetime):
                    serializable_row.append(val.isoformat())
                elif isinstance(val, bytes):
                    serializable_row.append(val.decode('utf-8', errors='replace'))
                else:
                    serializable_row.append(val)
            serializable_rows.append(serializable_row)

        execution_time = (datetime.now() - start_time).total_seconds() * 1000

        return QueryResponse(
            columns=columns,
            rows=serializable_rows,
            row_count=len(rows),
            execution_time_ms=round(execution_time, 2),
            snapshot_id=request.snapshot_id,
        )
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/tables", response_model=List[TableInfo])
async def list_tables() -> List[TableInfo]:
    """List all tables with metadata."""
    conn = get_connection()

    try:
        if _ducklake_available:
            tables_result = conn.execute(f"""
                SELECT table_name FROM information_schema.tables
                WHERE table_catalog = '{settings.db_alias}'
                AND table_type = 'BASE TABLE'
            """).fetchall()
        else:
            tables_result = conn.execute("""
                SELECT table_name FROM information_schema.tables
                WHERE table_schema = 'main'
                AND table_type = 'BASE TABLE'
            """).fetchall()

        tables = []
        for (table_name,) in tables_result:
            # Get column info and row count for each table
            if _ducklake_available:
                cols = conn.execute(f"""
                    SELECT column_name, data_type
                    FROM information_schema.columns
                    WHERE table_catalog = '{settings.db_alias}'
                    AND table_name = '{table_name}'
                """).fetchall()
                count_result = conn.execute(f"""
                    SELECT COUNT(*) FROM {settings.db_alias}.{table_name}
                """).fetchone()
            else:
                cols = conn.execute(f"""
                    SELECT column_name, data_type
                    FROM information_schema.columns
                    WHERE table_schema = 'main'
                    AND table_name = '{table_name}'
                """).fetchall()
                count_result = conn.execute(f"""
                    SELECT COUNT(*) FROM main.{table_name}
                """).fetchone()

            row_count = count_result[0] if count_result else 0

            tables.append(TableInfo(
                name=table_name,
                row_count=row_count,
                column_count=len(cols),
                columns=[{"name": c[0], "type": c[1]} for c in cols],
            ))

        return tables
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))


@app.get("/snapshots", response_model=List[SnapshotInfo])
async def list_snapshots(
    table_name: Optional[str] = Query(None, description="Filter by table name")
) -> List[SnapshotInfo]:
    """List all snapshots (time travel points) - DuckLake only."""
    if not _ducklake_available:
        return []

    conn = get_connection()

    try:
        if table_name:
            result = conn.execute(f"""
                SELECT snapshot_id, snapshot_time
                FROM ducklake_table_snapshots('{settings.db_alias}', '{table_name}')
                ORDER BY snapshot_id DESC
            """).fetchall()
        else:
            result = conn.execute(f"""
                SELECT snapshot_id, snapshot_time
                FROM ducklake_snapshots('{settings.db_alias}')
                ORDER BY snapshot_id DESC
            """).fetchall()

        return [
            SnapshotInfo(
                snapshot_id=row[0],
                created_at=row[1] if isinstance(row[1], datetime) else datetime.fromisoformat(str(row[1])),
            )
            for row in result
        ]
    except Exception as e:
        # Snapshots may not be available yet
        print(f"Error listing snapshots: {e}")
        return []


@app.get("/snapshots/{snapshot_id}")
async def get_snapshot_diff(
    snapshot_id: int,
    table_name: Optional[str] = Query(None, description="Filter by table name")
) -> Dict[str, Any]:
    """Get changes between snapshots - DuckLake only."""
    if not _ducklake_available:
        raise HTTPException(status_code=501, detail="DuckLake not available - time travel disabled")

    conn = get_connection()

    try:
        if table_name:
            result = conn.execute(f"""
                SELECT * FROM ducklake_table_changes('{settings.db_alias}', '{table_name}')
                WHERE snapshot_id = {snapshot_id}
            """).fetchall()
        else:
            result = conn.execute(f"""
                SELECT * FROM ducklake_changes('{settings.db_alias}')
                WHERE snapshot_id = {snapshot_id}
            """).fetchall()

        return {
            "snapshot_id": snapshot_id,
            "changes": [dict(zip(["table", "operation", "count"], row)) for row in result]
        }
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/schema-evolution", response_model=List[SchemaChange])
async def get_schema_evolution(
    table_name: Optional[str] = Query(None, description="Filter by table name")
) -> List[SchemaChange]:
    """Get schema evolution history - DuckLake only."""
    if not _ducklake_available:
        return []

    conn = get_connection()

    try:
        if table_name:
            result = conn.execute(f"""
                SELECT * FROM ducklake_table_schema_changes('{settings.db_alias}', '{table_name}')
                ORDER BY change_id DESC
            """).fetchall()
        else:
            result = conn.execute(f"""
                SELECT * FROM ducklake_schema_changes('{settings.db_alias}')
                ORDER BY change_id DESC
            """).fetchall()

        changes = []
        for row in result:
            changes.append(SchemaChange(
                change_id=row[0],
                table_name=row[1],
                change_type=row[2],
                column_name=row[3] if len(row) > 3 else None,
                old_type=row[4] if len(row) > 4 else None,
                new_type=row[5] if len(row) > 5 else None,
                changed_at=row[6] if len(row) > 6 else datetime.now(),
            ))

        return changes
    except Exception as e:
        print(f"Error getting schema evolution: {e}")
        return []


@app.post("/upload", response_model=UploadResponse)
async def upload_data(
    file: UploadFile = File(...),
    table_name: str = Form(...),
    mode: str = Form("append"),  # append, replace, or create
) -> UploadResponse:
    """Upload data file (JSON, CSV, Parquet) to a table."""
    conn = get_connection()

    # Determine file type
    filename = file.filename or "data"
    suffix = Path(filename).suffix.lower()

    tmp_path: Optional[str] = None
    try:
        # Save uploaded file temporarily
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            content = await file.read()
            tmp.write(content)
            tmp_path = tmp.name

        # Determine qualified table name
        if _ducklake_available:
            qualified_name = f"{settings.db_alias}.{table_name}"
        else:
            qualified_name = f"main.{table_name}"

        # Read file based on type
        if suffix == ".json":
            read_func = f"read_json_auto('{tmp_path}')"
        elif suffix == ".csv":
            read_func = f"read_csv_auto('{tmp_path}')"
        elif suffix == ".parquet":
            read_func = f"read_parquet('{tmp_path}')"
        else:
            raise HTTPException(status_code=400, detail=f"Unsupported file type: {suffix}")

        # Execute insert/create
        if mode == "replace":
            conn.execute(f"DROP TABLE IF EXISTS {qualified_name}")
            conn.execute(f"CREATE TABLE {qualified_name} AS SELECT * FROM {read_func}")
        elif mode == "create":
            conn.execute(f"CREATE TABLE {qualified_name} AS SELECT * FROM {read_func}")
        else:  # append
            try:
                conn.execute(f"INSERT INTO {qualified_name} SELECT * FROM {read_func}")
            except Exception:
                # Table doesn't exist, create it
                conn.execute(f"CREATE TABLE {qualified_name} AS SELECT * FROM {read_func}")

        # Get row count
        count_result = conn.execute(f"SELECT COUNT(*) FROM {qualified_name}").fetchone()
        row_count = count_result[0] if count_result else 0

        # Get current snapshot ID if DuckLake (best-effort)
        snapshot_id = 0
        if _ducklake_available:
            try:
                snap_result = conn.execute(f"""
                    SELECT MAX(snapshot_id) FROM ducklake_snapshots('{settings.db_alias}')
                """).fetchone()
                snapshot_id = snap_result[0] if snap_result and snap_result[0] else 0
            except Exception:
                pass

        return UploadResponse(
            success=True,
            table_name=table_name,
            rows_inserted=row_count,
            snapshot_id=snapshot_id,
            message=f"Successfully loaded data into {table_name}",
        )
    except HTTPException:
        # Don't re-wrap deliberate HTTP errors (e.g. unsupported file type)
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    finally:
        # Cleanup temp file on both success and error paths
        if tmp_path:
            try:
                os.unlink(tmp_path)
            except OSError:
                pass


@app.delete("/tables/{table_name}")
async def drop_table(table_name: str) -> Dict[str, str]:
    """Drop a table."""
    conn = get_connection()

    try:
        if _ducklake_available:
            conn.execute(f"DROP TABLE IF EXISTS {settings.db_alias}.{table_name}")
        else:
            conn.execute(f"DROP TABLE IF EXISTS main.{table_name}")
        return {"status": "success", "message": f"Table {table_name} dropped"}
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


@app.get("/export/{table_name}")
async def export_table(
    table_name: str,
    format: str = Query("json", description="Export format: json, csv, parquet"),
    snapshot_id: Optional[int] = Query(None, description="Snapshot ID for time travel export"),
) -> StreamingResponse:
    """Export table data in various formats, optionally at a snapshot."""
    conn = get_connection()

    try:
        # Build query
        if _ducklake_available:
            qualified_name = f"{settings.db_alias}.{table_name}"
            if snapshot_id is not None:
                query = f"SELECT * FROM {qualified_name} AT SNAPSHOT {snapshot_id}"
            else:
                query = f"SELECT * FROM {qualified_name}"
        else:
            query = f"SELECT * FROM main.{table_name}"

        result = conn.execute(query)
        columns = [desc[0] for desc in result.description]
        rows = result.fetchall()

        if format == "json":
            # Convert to list of dicts
            data = [dict(zip(columns, row)) for row in rows]
            content = json.dumps(data, indent=2, default=str)
            media_type = "application/json"
            filename = f"{table_name}.json"
        elif format == "csv":
            import csv
            import io
            output = io.StringIO()
            writer = csv.writer(output)
            writer.writerow(columns)
            writer.writerows(rows)
            content = output.getvalue()
            media_type = "text/csv"
            filename = f"{table_name}.csv"
        elif format == "parquet":
            # For parquet, write to temp file and stream back
            with tempfile.NamedTemporaryFile(delete=False, suffix=".parquet") as tmp:
                conn.execute(f"COPY ({query}) TO '{tmp.name}' (FORMAT PARQUET)")
                tmp.flush()
                with open(tmp.name, "rb") as f:
                    content = f.read()
                os.unlink(tmp.name)
            media_type = "application/octet-stream"
            filename = f"{table_name}.parquet"
        else:
            # Previously this branch was unreachable because a return
            # statement preceded it; the format check now runs first.
            raise HTTPException(status_code=400, detail=f"Unsupported format: {format}")

        return StreamingResponse(
            iter([content]),
            media_type=media_type,
            headers={"Content-Disposition": f"attachment; filename={filename}"},
        )
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))


# ============================================================================
# Main Entry Point
# ============================================================================

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(
        "main:app",
        host=settings.host,
        port=settings.port,
        reload=True,
    )