""" PostgreSQL REST API for Heritage Custodian Data FastAPI backend providing SQL query interface for bronhouder.nl Endpoints: - GET / - Health check and statistics - POST /query - Execute SQL query - GET /tables - List all tables with metadata - GET /schema/:table - Get table schema """ import os import json from datetime import datetime from typing import Optional, List, Dict, Any from contextlib import asynccontextmanager from fastapi import FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field import asyncpg # ============================================================================ # Configuration # ============================================================================ class Settings(BaseModel): """PostgreSQL server settings""" host: str = os.getenv("POSTGRES_HOST", "localhost") port: int = int(os.getenv("POSTGRES_PORT", "5432")) database: str = os.getenv("POSTGRES_DB", "glam") user: str = os.getenv("POSTGRES_USER", "glam_api") password: str = os.getenv("POSTGRES_PASSWORD", "glam_secret_2025") # Server settings api_host: str = os.getenv("API_HOST", "0.0.0.0") api_port: int = int(os.getenv("API_PORT", "8001")) settings = Settings() # ============================================================================ # Pydantic Models # ============================================================================ class QueryRequest(BaseModel): """SQL query request""" sql: str = Field(..., description="SQL query to execute") params: Optional[List[Any]] = Field(None, description="Query parameters") class QueryResponse(BaseModel): """SQL query response""" columns: List[str] rows: List[List[Any]] row_count: int execution_time_ms: float class TableInfo(BaseModel): """Table metadata""" name: str schema_name: str row_count: int column_count: int size_bytes: Optional[int] = None class ColumnInfo(BaseModel): """Column metadata""" name: str data_type: str is_nullable: bool default_value: Optional[str] = None description: Optional[str] = None class StatusResponse(BaseModel): """Server status response""" status: str database: str tables: int total_rows: int uptime_seconds: float postgres_version: str # ============================================================================ # Global State # ============================================================================ _pool: Optional[asyncpg.Pool] = None _start_time: datetime = datetime.now() async def get_pool() -> asyncpg.Pool: """Get or create connection pool""" global _pool if _pool is None: _pool = await asyncpg.create_pool( host=settings.host, port=settings.port, database=settings.database, user=settings.user, password=settings.password, min_size=2, max_size=10, ) return _pool # ============================================================================ # FastAPI App # ============================================================================ @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan handler""" # Startup: Initialize connection pool await get_pool() yield # Shutdown: Close pool global _pool if _pool: await _pool.close() _pool = None app = FastAPI( title="PostgreSQL Heritage API", description="REST API for heritage institution SQL queries", version="1.0.0", lifespan=lifespan, ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], # Configure for production allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ============================================================================ # Helper Functions # ============================================================================ def serialize_value(val: Any) -> Any: """Convert PostgreSQL values to JSON-serializable format""" if val is None: return None elif isinstance(val, datetime): return val.isoformat() elif isinstance(val, (dict, list)): return val elif isinstance(val, bytes): return val.decode('utf-8', errors='replace') else: return val # ============================================================================ # API Endpoints # ============================================================================ @app.get("/", response_model=StatusResponse) async def get_status() -> StatusResponse: """Get server status and statistics""" pool = await get_pool() async with pool.acquire() as conn: # Get PostgreSQL version version = await conn.fetchval("SELECT version()") # Get table count tables = await conn.fetchval(""" SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = 'public' AND table_type = 'BASE TABLE' """) # Get total row count (approximate) total_rows = await conn.fetchval(""" SELECT COALESCE(SUM(n_tup_ins - n_tup_del), 0)::bigint FROM pg_stat_user_tables """) uptime = (datetime.now() - _start_time).total_seconds() return StatusResponse( status="healthy", database=settings.database, tables=tables or 0, total_rows=total_rows or 0, uptime_seconds=uptime, postgres_version=version.split(',')[0] if version else "unknown", ) @app.post("/query", response_model=QueryResponse) async def execute_query(request: QueryRequest) -> QueryResponse: """Execute a SQL query (read-only)""" pool = await get_pool() # Security: Only allow SELECT queries for now sql_upper = request.sql.strip().upper() if not sql_upper.startswith("SELECT") and not sql_upper.startswith("WITH"): raise HTTPException( status_code=403, detail="Only SELECT queries are allowed. Use WITH...SELECT for CTEs." ) start_time = datetime.now() try: async with pool.acquire() as conn: if request.params: result = await conn.fetch(request.sql, *request.params) else: result = await conn.fetch(request.sql) if result: columns = list(result[0].keys()) rows = [[serialize_value(row[col]) for col in columns] for row in result] else: columns = [] rows = [] execution_time = (datetime.now() - start_time).total_seconds() * 1000 return QueryResponse( columns=columns, rows=rows, row_count=len(rows), execution_time_ms=round(execution_time, 2), ) except asyncpg.PostgresError as e: raise HTTPException(status_code=400, detail=str(e)) @app.get("/tables", response_model=List[TableInfo]) async def list_tables() -> List[TableInfo]: """List all tables with metadata""" pool = await get_pool() async with pool.acquire() as conn: tables = await conn.fetch(""" SELECT t.table_name, t.table_schema, (SELECT COUNT(*) FROM information_schema.columns c WHERE c.table_name = t.table_name AND c.table_schema = t.table_schema) as column_count, COALESCE(s.n_tup_ins - s.n_tup_del, 0) as row_count, pg_total_relation_size(quote_ident(t.table_schema) || '.' || quote_ident(t.table_name)) as size_bytes FROM information_schema.tables t LEFT JOIN pg_stat_user_tables s ON s.schemaname = t.table_schema AND s.relname = t.table_name WHERE t.table_schema = 'public' AND t.table_type = 'BASE TABLE' ORDER BY t.table_name """) return [ TableInfo( name=row['table_name'], schema_name=row['table_schema'], column_count=row['column_count'], row_count=row['row_count'] or 0, size_bytes=row['size_bytes'], ) for row in tables ] @app.get("/schema/{table_name}", response_model=List[ColumnInfo]) async def get_table_schema(table_name: str) -> List[ColumnInfo]: """Get schema for a specific table""" pool = await get_pool() async with pool.acquire() as conn: # Check table exists exists = await conn.fetchval(""" SELECT EXISTS ( SELECT 1 FROM information_schema.tables WHERE table_schema = 'public' AND table_name = $1 ) """, table_name) if not exists: raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found") columns = await conn.fetch(""" SELECT column_name, data_type, is_nullable, column_default, col_description( (quote_ident(table_schema) || '.' || quote_ident(table_name))::regclass, ordinal_position ) as description FROM information_schema.columns WHERE table_schema = 'public' AND table_name = $1 ORDER BY ordinal_position """, table_name) return [ ColumnInfo( name=col['column_name'], data_type=col['data_type'], is_nullable=col['is_nullable'] == 'YES', default_value=col['column_default'], description=col['description'], ) for col in columns ] @app.get("/stats") async def get_database_stats() -> Dict[str, Any]: """Get detailed database statistics""" pool = await get_pool() async with pool.acquire() as conn: # Database size db_size = await conn.fetchval(""" SELECT pg_size_pretty(pg_database_size($1)) """, settings.database) # Table sizes table_sizes = await conn.fetch(""" SELECT relname as table_name, pg_size_pretty(pg_total_relation_size(relid)) as total_size, n_live_tup as row_count FROM pg_stat_user_tables ORDER BY pg_total_relation_size(relid) DESC LIMIT 10 """) return { "database": settings.database, "size": db_size, "largest_tables": [ { "name": t['table_name'], "size": t['total_size'], "rows": t['row_count'] } for t in table_sizes ] } # ============================================================================ # LinkML Schema API Endpoints # ============================================================================ class LinkMLSchemaVersion(BaseModel): """LinkML schema version info""" id: int version: str schema_name: str description: Optional[str] is_current: bool created_at: str class_count: int slot_count: int enum_count: int class LinkMLClass(BaseModel): """LinkML class definition""" id: int class_name: str class_id: str title: Optional[str] is_a: Optional[str] class_uri: Optional[str] abstract: bool description: Optional[str] exact_mappings: List[str] close_mappings: List[str] broad_mappings: List[str] narrow_mappings: List[str] class LinkMLSlot(BaseModel): """LinkML slot definition""" id: int slot_name: str slot_id: str range: Optional[str] slot_uri: Optional[str] required: bool multivalued: bool identifier: bool description: Optional[str] class LinkMLEnum(BaseModel): """LinkML enum definition""" id: int enum_name: str enum_id: str title: Optional[str] description: Optional[str] values: List[Dict[str, Any]] class LinkMLSearchResult(BaseModel): """LinkML search result""" element_type: str element_name: str element_uri: Optional[str] description: Optional[str] rank: float @app.get("/linkml/versions", response_model=List[LinkMLSchemaVersion]) async def list_linkml_versions() -> List[LinkMLSchemaVersion]: """List all LinkML schema versions with statistics""" pool = await get_pool() async with pool.acquire() as conn: versions = await conn.fetch(""" SELECT * FROM linkml_schema_stats ORDER BY created_at DESC """) return [ LinkMLSchemaVersion( id=v['id'] if 'id' in v.keys() else 0, version=v['version'], schema_name=v['schema_name'], description=None, is_current=v['is_current'], created_at=v['created_at'].isoformat() if v['created_at'] else "", class_count=v['class_count'] or 0, slot_count=v['slot_count'] or 0, enum_count=v['enum_count'] or 0, ) for v in versions ] @app.get("/linkml/classes", response_model=List[LinkMLClass]) async def list_linkml_classes( version: Optional[str] = Query(None, description="Schema version (default: current)") ) -> List[LinkMLClass]: """List all classes in the LinkML schema""" pool = await get_pool() async with pool.acquire() as conn: if version: classes = await conn.fetch(""" SELECT c.* FROM linkml_classes c JOIN linkml_schema_versions v ON c.version_id = v.id WHERE v.version = $1 ORDER BY c.class_name """, version) else: classes = await conn.fetch(""" SELECT * FROM linkml_current_classes ORDER BY class_name """) return [ LinkMLClass( id=c['id'], class_name=c['class_name'], class_id=c['class_id'], title=c['title'], is_a=c['is_a'], class_uri=c['class_uri'], abstract=c['abstract'] or False, description=c['description'], exact_mappings=c['exact_mappings'] or [], close_mappings=c['close_mappings'] or [], broad_mappings=c['broad_mappings'] or [], narrow_mappings=c['narrow_mappings'] or [], ) for c in classes ] @app.get("/linkml/classes/{class_name}") async def get_linkml_class( class_name: str, version: Optional[str] = Query(None, description="Schema version (default: current)") ) -> Dict[str, Any]: """Get detailed information about a specific class including slots""" pool = await get_pool() async with pool.acquire() as conn: # Get class if version: cls = await conn.fetchrow(""" SELECT c.* FROM linkml_classes c JOIN linkml_schema_versions v ON c.version_id = v.id WHERE v.version = $1 AND c.class_name = $2 """, version, class_name) else: cls = await conn.fetchrow(""" SELECT * FROM linkml_current_classes WHERE class_name = $1 """, class_name) if not cls: raise HTTPException(status_code=404, detail=f"Class '{class_name}' not found") # Get slots for this class slots = await conn.fetch(""" SELECT s.slot_name, s.range, s.slot_uri, s.required, s.multivalued, s.identifier, s.description, cs.slot_usage FROM linkml_class_slots cs JOIN linkml_slots s ON cs.slot_id = s.id WHERE cs.class_id = $1 ORDER BY cs.slot_order """, cls['id']) # Get inheritance chain inheritance = await conn.fetch(""" SELECT * FROM get_class_inheritance($1) """, class_name) return { "id": cls['id'], "class_name": cls['class_name'], "class_id": cls['class_id'], "title": cls['title'], "is_a": cls['is_a'], "class_uri": cls['class_uri'], "abstract": cls['abstract'], "description": cls['description'], "comments": cls['comments'], "exact_mappings": cls['exact_mappings'] or [], "close_mappings": cls['close_mappings'] or [], "broad_mappings": cls['broad_mappings'] or [], "narrow_mappings": cls['narrow_mappings'] or [], "slots": [ { "slot_name": s['slot_name'], "range": s['range'], "slot_uri": s['slot_uri'], "required": s['required'], "multivalued": s['multivalued'], "identifier": s['identifier'], "description": s['description'], "slot_usage": json.loads(s['slot_usage']) if s['slot_usage'] else None, } for s in slots ], "inheritance": [ { "level": i['level'], "class_name": i['class_name'], "class_uri": i['class_uri'], "abstract": i['abstract'], } for i in inheritance ], } @app.get("/linkml/slots", response_model=List[LinkMLSlot]) async def list_linkml_slots( version: Optional[str] = Query(None, description="Schema version (default: current)") ) -> List[LinkMLSlot]: """List all slots in the LinkML schema""" pool = await get_pool() async with pool.acquire() as conn: if version: slots = await conn.fetch(""" SELECT s.* FROM linkml_slots s JOIN linkml_schema_versions v ON s.version_id = v.id WHERE v.version = $1 ORDER BY s.slot_name """, version) else: slots = await conn.fetch(""" SELECT * FROM linkml_current_slots ORDER BY slot_name """) return [ LinkMLSlot( id=s['id'], slot_name=s['slot_name'], slot_id=s['slot_id'], range=s['range'], slot_uri=s['slot_uri'], required=s['required'] or False, multivalued=s['multivalued'] or False, identifier=s['identifier'] or False, description=s['description'], ) for s in slots ] @app.get("/linkml/slots/{slot_name}") async def get_linkml_slot( slot_name: str, version: Optional[str] = Query(None, description="Schema version (default: current)") ) -> Dict[str, Any]: """Get detailed information about a specific slot""" pool = await get_pool() async with pool.acquire() as conn: if version: slot = await conn.fetchrow(""" SELECT s.* FROM linkml_slots s JOIN linkml_schema_versions v ON s.version_id = v.id WHERE v.version = $1 AND s.slot_name = $2 """, version, slot_name) else: slot = await conn.fetchrow(""" SELECT * FROM linkml_current_slots WHERE slot_name = $1 """, slot_name) if not slot: raise HTTPException(status_code=404, detail=f"Slot '{slot_name}' not found") # Get classes that use this slot classes = await conn.fetch(""" SELECT c.class_name, cs.slot_usage FROM linkml_class_slots cs JOIN linkml_classes c ON cs.class_id = c.id WHERE cs.slot_id = $1 ORDER BY c.class_name """, slot['id']) return { "id": slot['id'], "slot_name": slot['slot_name'], "slot_id": slot['slot_id'], "range": slot['range'], "slot_uri": slot['slot_uri'], "required": slot['required'], "multivalued": slot['multivalued'], "identifier": slot['identifier'], "inlined": slot['inlined'], "inlined_as_list": slot['inlined_as_list'], "pattern": slot['pattern'], "description": slot['description'], "comments": slot['comments'], "examples": json.loads(slot['examples']) if slot['examples'] else None, "used_by_classes": [ { "class_name": c['class_name'], "slot_usage": json.loads(c['slot_usage']) if c['slot_usage'] else None, } for c in classes ], } @app.get("/linkml/enums", response_model=List[LinkMLEnum]) async def list_linkml_enums( version: Optional[str] = Query(None, description="Schema version (default: current)") ) -> List[LinkMLEnum]: """List all enums in the LinkML schema with their values""" pool = await get_pool() async with pool.acquire() as conn: if version: enums = await conn.fetch(""" SELECT e.* FROM linkml_enums e JOIN linkml_schema_versions v ON e.version_id = v.id WHERE v.version = $1 ORDER BY e.enum_name """, version) else: enums = await conn.fetch(""" SELECT * FROM linkml_current_enums ORDER BY enum_name """) result = [] for e in enums: # Get values for this enum values = await conn.fetch(""" SELECT value_name, meaning, description, comments FROM linkml_enum_values WHERE enum_id = $1 ORDER BY value_order """, e['id']) result.append(LinkMLEnum( id=e['id'], enum_name=e['enum_name'], enum_id=e['enum_id'], title=e['title'], description=e['description'], values=[ { "name": v['value_name'], "meaning": v['meaning'], "description": v['description'], "comments": v['comments'], } for v in values ], )) return result @app.get("/linkml/enums/{enum_name}") async def get_linkml_enum( enum_name: str, version: Optional[str] = Query(None, description="Schema version (default: current)") ) -> Dict[str, Any]: """Get detailed information about a specific enum""" pool = await get_pool() async with pool.acquire() as conn: if version: enum = await conn.fetchrow(""" SELECT e.* FROM linkml_enums e JOIN linkml_schema_versions v ON e.version_id = v.id WHERE v.version = $1 AND e.enum_name = $2 """, version, enum_name) else: enum = await conn.fetchrow(""" SELECT * FROM linkml_current_enums WHERE enum_name = $1 """, enum_name) if not enum: raise HTTPException(status_code=404, detail=f"Enum '{enum_name}' not found") # Get values values = await conn.fetch(""" SELECT * FROM linkml_enum_values WHERE enum_id = $1 ORDER BY value_order """, enum['id']) return { "id": enum['id'], "enum_name": enum['enum_name'], "enum_id": enum['enum_id'], "title": enum['title'], "description": enum['description'], "comments": enum['comments'], "values": [ { "name": v['value_name'], "meaning": v['meaning'], "description": v['description'], "comments": v['comments'], } for v in values ], } @app.get("/linkml/search", response_model=List[LinkMLSearchResult]) async def search_linkml_schema( q: str = Query(..., description="Search query"), version: Optional[str] = Query(None, description="Schema version (default: current)") ) -> List[LinkMLSearchResult]: """Search across all LinkML schema elements (classes, slots, enums)""" pool = await get_pool() async with pool.acquire() as conn: # Get version ID if version: version_id = await conn.fetchval(""" SELECT id FROM linkml_schema_versions WHERE version = $1 """, version) else: version_id = await conn.fetchval(""" SELECT id FROM linkml_schema_versions WHERE is_current = TRUE """) if not version_id: raise HTTPException(status_code=404, detail="Schema version not found") # Search using the function results = await conn.fetch(""" SELECT * FROM search_linkml_schema($1, $2) LIMIT 50 """, q, version_id) return [ LinkMLSearchResult( element_type=r['element_type'], element_name=r['element_name'], element_uri=r['element_uri'], description=r['description'], rank=float(r['rank']) if r['rank'] else 0.0, ) for r in results ] @app.get("/linkml/hierarchy") async def get_class_hierarchy( version: Optional[str] = Query(None, description="Schema version (default: current)") ) -> List[Dict[str, Any]]: """Get the complete class hierarchy as a tree structure""" pool = await get_pool() async with pool.acquire() as conn: if version: classes = await conn.fetch(""" SELECT c.class_name, c.is_a, c.class_uri, c.abstract, c.title, c.description FROM linkml_classes c JOIN linkml_schema_versions v ON c.version_id = v.id WHERE v.version = $1 ORDER BY c.class_name """, version) else: classes = await conn.fetch(""" SELECT class_name, is_a, class_uri, abstract, title, description FROM linkml_current_classes ORDER BY class_name """) # Build tree structure class_map = {c['class_name']: dict(c) for c in classes} children_map: Dict[str, List[str]] = {} for c in classes: parent = c['is_a'] if parent: if parent not in children_map: children_map[parent] = [] children_map[parent].append(c['class_name']) def build_tree(class_name: str) -> Dict[str, Any]: node = class_map.get(class_name, {"class_name": class_name}) children = children_map.get(class_name, []) return { "class_name": class_name, "class_uri": node.get('class_uri'), "abstract": node.get('abstract', False), "title": node.get('title'), "children": [build_tree(child) for child in sorted(children)], } # Find root classes (no parent or parent not in schema) roots = [ c['class_name'] for c in classes if not c['is_a'] or c['is_a'] not in class_map ] return [build_tree(root) for root in sorted(roots)] # ============================================================================ # Boundary/GIS Endpoints # ============================================================================ # PostGIS-backed endpoints for administrative boundaries and service areas class BoundaryInfo(BaseModel): """Administrative boundary metadata""" id: int code: str name: str name_local: Optional[str] = None admin_level: int country_code: str parent_code: Optional[str] = None area_km2: Optional[float] = None source: str class PointLookupResult(BaseModel): """Result of point-in-polygon lookup""" country: Optional[BoundaryInfo] = None admin1: Optional[BoundaryInfo] = None admin2: Optional[BoundaryInfo] = None geonames_settlement: Optional[Dict[str, Any]] = None class GeoJSONFeature(BaseModel): """GeoJSON Feature""" type: str = "Feature" properties: Dict[str, Any] geometry: Optional[Dict[str, Any]] = None class GeoJSONFeatureCollection(BaseModel): """GeoJSON FeatureCollection""" type: str = "FeatureCollection" features: List[GeoJSONFeature] @app.get("/boundaries/countries") async def list_countries() -> List[Dict[str, Any]]: """List all countries with boundary data""" pool = await get_pool() async with pool.acquire() as conn: rows = await conn.fetch(""" SELECT bc.id, bc.iso_a2 as code, bc.country_name as name, bc.country_name_local as name_local, bc.area_km2, bds.source_code as source, ST_AsGeoJSON(bc.centroid)::json as centroid FROM boundary_countries bc LEFT JOIN boundary_data_sources bds ON bc.source_id = bds.id WHERE bc.valid_to IS NULL ORDER BY bc.country_name """) return [ { "id": r['id'], "code": r['code'].strip(), "name": r['name'], "name_local": r['name_local'], "area_km2": float(r['area_km2']) if r['area_km2'] else None, "source": r['source'], "centroid": r['centroid'], } for r in rows ] @app.get("/boundaries/countries/{country_code}/admin1") async def list_admin1(country_code: str) -> List[Dict[str, Any]]: """List admin level 1 divisions (provinces/states) for a country""" pool = await get_pool() async with pool.acquire() as conn: rows = await conn.fetch(""" SELECT ba.id, ba.admin1_code as code, ba.admin1_name as name, ba.admin1_name_local as name_local, ba.iso_3166_2 as iso_code, ba.area_km2, bc.iso_a2 as country_code, bds.source_code as source, ST_AsGeoJSON(ba.centroid)::json as centroid FROM boundary_admin1 ba JOIN boundary_countries bc ON ba.country_id = bc.id LEFT JOIN boundary_data_sources bds ON ba.source_id = bds.id WHERE bc.iso_a2 = $1 AND ba.valid_to IS NULL ORDER BY ba.admin1_name """, country_code.upper()) return [ { "id": r['id'], "code": r['code'], "name": r['name'], "name_local": r['name_local'], "iso_code": r['iso_code'], "country_code": r['country_code'].strip() if r['country_code'] else None, "area_km2": float(r['area_km2']) if r['area_km2'] else None, "source": r['source'], "centroid": r['centroid'], } for r in rows ] @app.get("/boundaries/countries/{country_code}/admin2") async def list_admin2( country_code: str, admin1_code: Optional[str] = Query(None, description="Filter by admin1 code") ) -> List[Dict[str, Any]]: """List admin level 2 divisions (municipalities/counties) for a country""" pool = await get_pool() async with pool.acquire() as conn: if admin1_code: rows = await conn.fetch(""" SELECT ba2.id, ba2.admin2_code as code, ba2.admin2_name as name, ba2.admin2_name_local as name_local, ba1.admin1_code, ba1.admin1_name, bc.iso_a2 as country_code, ba2.area_km2, bds.source_code as source, ST_AsGeoJSON(ba2.centroid)::json as centroid FROM boundary_admin2 ba2 JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id JOIN boundary_countries bc ON ba1.country_id = bc.id LEFT JOIN boundary_data_sources bds ON ba2.source_id = bds.id WHERE bc.iso_a2 = $1 AND ba1.admin1_code = $2 AND ba2.valid_to IS NULL ORDER BY ba2.admin2_name """, country_code.upper(), admin1_code) else: rows = await conn.fetch(""" SELECT ba2.id, ba2.admin2_code as code, ba2.admin2_name as name, ba2.admin2_name_local as name_local, ba1.admin1_code, ba1.admin1_name, bc.iso_a2 as country_code, ba2.area_km2, bds.source_code as source, ST_AsGeoJSON(ba2.centroid)::json as centroid FROM boundary_admin2 ba2 JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id JOIN boundary_countries bc ON ba1.country_id = bc.id LEFT JOIN boundary_data_sources bds ON ba2.source_id = bds.id WHERE bc.iso_a2 = $1 AND ba2.valid_to IS NULL ORDER BY ba2.admin2_name """, country_code.upper()) return [ { "id": r['id'], "code": r['code'], "name": r['name'], "name_local": r['name_local'], "admin1_code": r['admin1_code'], "admin1_name": r['admin1_name'], "country_code": r['country_code'].strip() if r['country_code'] else None, "area_km2": float(r['area_km2']) if r['area_km2'] else None, "source": r['source'], "centroid": r['centroid'], } for r in rows ] @app.get("/boundaries/lookup") async def lookup_point( lat: float = Query(..., description="Latitude (WGS84)"), lon: float = Query(..., description="Longitude (WGS84)"), country_code: Optional[str] = Query(None, description="ISO country code to filter results") ) -> PointLookupResult: """Find administrative boundaries containing a point""" pool = await get_pool() async with pool.acquire() as conn: # Use the find_admin_for_point function # Returns rows with: admin_level, admin_code, admin_name, iso_code, geonames_id if country_code: results = await conn.fetch(""" SELECT * FROM find_admin_for_point($1, $2, $3) """, lon, lat, country_code.upper()) else: results = await conn.fetch(""" SELECT * FROM find_admin_for_point($1, $2) """, lon, lat) # Parse results by admin level admin1_info = None admin2_info = None for row in results: level = row['admin_level'] if level == 1 and admin1_info is None: admin1_info = BoundaryInfo( id=0, code=row['admin_code'], name=row['admin_name'], admin_level=1, country_code=country_code or "XX", source="PostGIS" ) elif level == 2 and admin2_info is None: admin2_info = BoundaryInfo( id=0, code=row['admin_code'], name=row['admin_name'], admin_level=2, country_code=country_code or "XX", source="PostGIS" ) # Also try to determine country from the point country_info = None country_row = await conn.fetchrow(""" SELECT bc.id, bc.iso_a2, bc.country_name FROM boundary_countries bc WHERE ST_Contains(bc.geom, ST_SetSRID(ST_Point($1, $2), 4326)) AND bc.valid_to IS NULL LIMIT 1 """, lon, lat) if country_row: country_info = BoundaryInfo( id=country_row['id'], code=country_row['iso_a2'].strip(), name=country_row['country_name'], admin_level=0, country_code=country_row['iso_a2'].strip(), source="PostGIS" ) # Update admin boundaries with proper country code if admin1_info: admin1_info.country_code = country_row['iso_a2'].strip() if admin2_info: admin2_info.country_code = country_row['iso_a2'].strip() return PointLookupResult( country=country_info, admin1=admin1_info, admin2=admin2_info, ) @app.get("/boundaries/admin2/{admin2_id}/geojson") async def get_admin2_geojson(admin2_id: int) -> GeoJSONFeature: """Get GeoJSON geometry for a specific admin2 boundary""" pool = await get_pool() async with pool.acquire() as conn: row = await conn.fetchrow(""" SELECT ba2.id, ba2.admin2_code as code, ba2.admin2_name as name, ba2.admin2_name_local as name_local, bc.iso_a2 as country_code, ba1.admin1_code, ba2.area_km2, bds.source_code as source, ST_AsGeoJSON(ba2.geom, 6)::json as geometry FROM boundary_admin2 ba2 JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id JOIN boundary_countries bc ON ba1.country_id = bc.id LEFT JOIN boundary_data_sources bds ON ba2.source_id = bds.id WHERE ba2.id = $1 """, admin2_id) if not row: raise HTTPException(status_code=404, detail=f"Admin2 boundary {admin2_id} not found") return GeoJSONFeature( properties={ "id": row['id'], "code": row['code'], "name": row['name'], "name_local": row['name_local'], "country_code": row['country_code'].strip() if row['country_code'] else None, "admin1_code": row['admin1_code'], "area_km2": float(row['area_km2']) if row['area_km2'] else None, "source": row['source'], }, geometry=row['geometry'] ) @app.get("/boundaries/countries/{country_code}/admin2/geojson") async def get_country_admin2_geojson( country_code: str, admin1_code: Optional[str] = Query(None, description="Filter by admin1 code"), simplify: float = Query(0.001, description="Geometry simplification tolerance (degrees)") ) -> GeoJSONFeatureCollection: """Get GeoJSON FeatureCollection of all admin2 boundaries for a country""" pool = await get_pool() async with pool.acquire() as conn: if admin1_code: rows = await conn.fetch(""" SELECT ba2.id, ba2.admin2_code as code, ba2.admin2_name as name, ba2.admin2_name_local as name_local, ba1.admin1_code, ba1.admin1_name, ba2.area_km2, bds.source_code as source, ST_AsGeoJSON(ST_Simplify(ba2.geom, $3), 6)::json as geometry FROM boundary_admin2 ba2 JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id JOIN boundary_countries bc ON ba1.country_id = bc.id LEFT JOIN boundary_data_sources bds ON ba2.source_id = bds.id WHERE bc.iso_a2 = $1 AND ba1.admin1_code = $2 AND ba2.valid_to IS NULL ORDER BY ba2.admin2_name """, country_code.upper(), admin1_code, simplify) else: rows = await conn.fetch(""" SELECT ba2.id, ba2.admin2_code as code, ba2.admin2_name as name, ba2.admin2_name_local as name_local, ba1.admin1_code, ba1.admin1_name, ba2.area_km2, bds.source_code as source, ST_AsGeoJSON(ST_Simplify(ba2.geom, $2), 6)::json as geometry FROM boundary_admin2 ba2 JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id JOIN boundary_countries bc ON ba1.country_id = bc.id LEFT JOIN boundary_data_sources bds ON ba2.source_id = bds.id WHERE bc.iso_a2 = $1 AND ba2.valid_to IS NULL ORDER BY ba2.admin2_name """, country_code.upper(), simplify) features = [ GeoJSONFeature( properties={ "id": r['id'], "code": r['code'], "name": r['name'], "name_local": r['name_local'], "admin1_code": r['admin1_code'], "area_km2": float(r['area_km2']) if r['area_km2'] else None, "source": r['source'], }, geometry=r['geometry'] ) for r in rows ] return GeoJSONFeatureCollection(features=features) @app.get("/boundaries/stats") async def get_boundary_stats() -> Dict[str, Any]: """Get statistics about loaded boundary data""" pool = await get_pool() async with pool.acquire() as conn: stats = {} # Countries country_count = await conn.fetchval(""" SELECT COUNT(*) FROM boundary_countries WHERE valid_to IS NULL """) stats['countries'] = country_count # Admin1 by country admin1_stats = await conn.fetch(""" SELECT bc.iso_a2 as country_code, COUNT(*) as count FROM boundary_admin1 ba1 JOIN boundary_countries bc ON ba1.country_id = bc.id WHERE ba1.valid_to IS NULL GROUP BY bc.iso_a2 ORDER BY bc.iso_a2 """) stats['admin1_by_country'] = { r['country_code'].strip(): r['count'] for r in admin1_stats } stats['admin1_total'] = sum(r['count'] for r in admin1_stats) # Admin2 by country admin2_stats = await conn.fetch(""" SELECT bc.iso_a2 as country_code, COUNT(*) as count FROM boundary_admin2 ba2 JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id JOIN boundary_countries bc ON ba1.country_id = bc.id WHERE ba2.valid_to IS NULL GROUP BY bc.iso_a2 ORDER BY bc.iso_a2 """) stats['admin2_by_country'] = { r['country_code'].strip(): r['count'] for r in admin2_stats } stats['admin2_total'] = sum(r['count'] for r in admin2_stats) # Data sources sources = await conn.fetch(""" SELECT source_code, source_name, coverage_scope FROM boundary_data_sources ORDER BY source_code """) stats['data_sources'] = [ { "code": s['source_code'], "name": s['source_name'], "scope": s['coverage_scope'], } for s in sources ] return stats # ============================================================================ # Main Entry Point # ============================================================================ if __name__ == "__main__": import uvicorn uvicorn.run( "main:app", host=settings.api_host, port=settings.api_port, reload=True, )