glam/backend/postgres/main.py
kempersc 68c5aa2724 feat(api): Add heritage person classification and RAG retry logic
- Add GLAMORCUBESFIXPHDNT heritage type detection for person profiles
- Two-stage classification: blocklist non-heritage orgs, then match keywords
- Special handling for Digital (D) type: requires heritage org context
- Add career_history heritage_relevant and heritage_type fields
- Add exponential backoff retry for Anthropic API overload errors
- Fix DSPy 3.x async context with dspy.context() wrapper
2025-12-15 01:31:54 +01:00

1643 lines
61 KiB
Python

"""
PostgreSQL REST API for Heritage Custodian Data
FastAPI backend providing SQL query interface for bronhouder.nl
Endpoints:
- GET / - Health check and statistics
- POST /query - Execute SQL query
- GET /tables - List all tables with metadata
- GET /schema/:table - Get table schema
"""
import os
import json
from datetime import datetime
from typing import Optional, List, Dict, Any
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, Query, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import asyncpg
# ============================================================================
# Configuration
# ============================================================================
class Settings(BaseModel):
"""PostgreSQL server settings"""
host: str = os.getenv("POSTGRES_HOST", "localhost")
port: int = int(os.getenv("POSTGRES_PORT", "5432"))
database: str = os.getenv("POSTGRES_DB", "glam_heritage")
user: str = os.getenv("POSTGRES_USER", "kempersc")
password: str = os.getenv("POSTGRES_PASSWORD", "")
# Server settings
api_host: str = os.getenv("API_HOST", "0.0.0.0")
api_port: int = int(os.getenv("API_PORT", "8001"))
settings = Settings()
# ============================================================================
# Heritage Classification
# ============================================================================
# Heritage type detection keywords for GLAMORCUBESFIXPHDNT taxonomy
HERITAGE_KEYWORDS = {
'G': ['gallery', 'galerie', 'kunsthal', 'art dealer', 'art gallery', 'exhibition space'],
'L': ['library', 'bibliotheek', 'bibliothek', 'librarian', 'bibliothecaris', 'KB ', 'national library'],
'A': ['archive', 'archief', 'archivist', 'archivaris', 'archival', 'beeld en geluid', 'beeld & geluid',
'NISV', 'filmmuseum', 'eye film', 'EYE ', 'audiovisual', 'nationaal archief', 'stadsarchief',
'gemeentearchief', 'rijksarchief', 'NIOD', 'IISH', 'IISG', 'archiefspecialist'],
'M': ['museum', 'musea', 'curator', 'conservator', 'collection manager', 'rijksmuseum', 'van gogh',
'stedelijk', 'mauritshuis', 'tropenmuseum', 'allard pierson', 'museale', 'collectiebeheerder',
'collectiespecialist', 'collectie'],
'O': ['ministry', 'ministerie', 'government', 'overheid', 'gemeente', 'province', 'provincie', 'OCW'],
'R': ['research', 'onderzoek', 'researcher', 'onderzoeker', 'KNAW', 'humanities cluster', 'NWO',
'documentatie', 'documentation', 'kenniscentrum', 'historicus'],
'C': ['corporate archive', 'bedrijfsarchief', 'company history'],
'E': ['university', 'universiteit', 'professor', 'lecturer', 'docent', 'hogeschool', 'academy',
'academie', 'PhD', 'phd candidate', 'student', 'teacher', 'onderwijs', 'education', 'UvA',
'VU ', 'leiden university', 'reinwardt', 'film academy', 'graduate', 'assistant professor',
'associate professor', 'hoogleraar', 'educatie', 'educator'],
'S': ['society', 'vereniging', 'genootschap', 'historical society', 'historische vereniging'],
'D': ['digital', 'digitaal', 'platform', 'software', 'IT ', 'tech', 'developer', 'engineer',
'data ', 'AI ', 'machine learning', 'digitalisering', 'datamanagement', 'data analist'],
}
NON_HERITAGE_KEYWORDS = [
'marketing', 'sales', 'HR ', 'human resources', 'recruiter', 'finance', 'accounting',
'legal', 'lawyer', 'advocaat', 'consultant', 'coach', 'therapy', 'health', 'medical',
'food', 'restaurant', 'retail', 'fashion', 'real estate', 'insurance', 'banking',
'investment', 'e-commerce', 'organiser', 'opruimhulp', 'verpleeg', 'nurse'
]
# Organizations that are explicitly NOT heritage institutions
NON_HERITAGE_ORGANIZATIONS = [
# Banks & Financial
'ing ', 'ing nederland', 'rabobank', 'abn amro', 'postbank', 'triodos',
# Security companies
'i-sec', 'g4s', 'securitas', 'trigion', 'chubb',
# Police/Government (non-cultural)
'politie', 'police', 'rijkswaterstaat', 'belastingdienst', 'douane', 'defensie',
# Political parties
'vvd', 'pvda', 'cda', 'd66', 'groenlinks', 'pvv', 'bbb', 'nsc', 'volt',
'sp ', 'forum voor democratie', 'ja21', 'bij1', 'denk', 'sgp', 'cu ',
# Tech companies (non-heritage)
'google', 'microsoft', 'amazon', 'meta', 'facebook', 'apple', 'netflix',
'uber', 'airbnb', 'booking.com', 'adyen', 'mollie', 'messagebird',
'coolblue', 'bol.com', 'picnic', 'takeaway', 'just eat',
# Telecom
'kpn', 'vodafone', 't-mobile', 'ziggo',
# Postal / Logistics
'postnl', 'postkantoren', 'dhl', 'ups', 'fedex',
# Healthcare
'ziekenhuis', 'hospital', 'ggz', 'ggd', 'thuiszorg',
# Retail
'albert heijn', 'jumbo', 'lidl', 'aldi', 'ikea', 'hema', 'action',
# Consulting / Professional services
'deloitte', 'kpmg', 'pwc', 'ey ', 'ernst & young', 'mckinsey', 'bcg',
'accenture', 'capgemini', 'ordina', 'atos', 'cgi ',
# Recruitment / HR
'randstad', 'tempo-team', 'manpower', 'hays', 'brunel',
# Energy / Utilities
'shell', 'bp ', 'eneco', 'vattenfall', 'essent', 'nuon',
# Transport
'ns ', 'prorail', 'schiphol', 'klm', 'transavia',
# Other
'freelance', 'zelfstandig', 'zzp', 'eigen bedrijf',
]
# Heritage organization keywords - organizations that ARE heritage institutions
HERITAGE_ORGANIZATION_KEYWORDS = [
# Archives
'archief', 'archive', 'nationaal archief', 'stadsarchief', 'regionaal archief',
'beeld en geluid', 'beeld & geluid', 'niod', 'iish', 'iisg',
# Museums
'museum', 'musea', 'rijksmuseum', 'van gogh', 'stedelijk', 'mauritshuis',
'tropenmuseum', 'allard pierson', 'kröller', 'boijmans',
# Libraries
'bibliotheek', 'library', 'koninklijke bibliotheek', 'kb ',
# Film/AV heritage
'eye film', 'filmmuseum', 'eye ', 'sound and vision',
# Heritage platforms
'erfgoed', 'heritage', 'cultural', 'cultureel',
# Research institutes (heritage-focused)
'knaw', 'humanities cluster', 'meertens', 'huygens',
]
def detect_heritage_type(role: Optional[str], company: Optional[str]) -> tuple:
"""
Detect if a position is heritage-relevant and what type.
Two-stage classification:
1. Check if organization is explicitly non-heritage (blocklist)
2. Check if role/organization matches heritage patterns
For 'D' (Digital) type, require BOTH a tech role AND a heritage organization.
This prevents generic IT workers at banks/police from being classified as heritage.
Args:
role: Job title/role text
company: Company/organization name
Returns:
Tuple of (heritage_relevant: bool, heritage_type: Optional[str])
"""
import re
# Combine role and company for full context
role_text = role or ''
company_text = company or ''
combined = f"{role_text} {company_text}".lower()
if not combined.strip():
return (False, None)
# Stage 1: Check for non-heritage organizations (blocklist)
# Use word boundary matching to avoid false positives like "sharing" matching "ing "
for org in NON_HERITAGE_ORGANIZATIONS:
org_pattern = org.lower().strip()
# Use word boundary regex for patterns that could have false positives
if re.search(r'\b' + re.escape(org_pattern) + r'\b', combined):
return (False, None)
# Stage 2: Check for non-heritage role indicators
for keyword in NON_HERITAGE_KEYWORDS:
keyword_pattern = keyword.lower().strip()
if re.search(r'\b' + re.escape(keyword_pattern) + r'\b', combined):
return (False, None)
# Stage 3: Check if this is a heritage organization
is_heritage_org = False
for org_keyword in HERITAGE_ORGANIZATION_KEYWORDS:
if org_keyword.lower() in combined:
is_heritage_org = True
break
# Check heritage keywords by type (order matters - more specific first)
# 'D' (Digital) is checked last and requires heritage org validation
type_order = ['A', 'M', 'L', 'G', 'S', 'C', 'O', 'R', 'E'] # D removed from main loop
for heritage_type in type_order:
keywords = HERITAGE_KEYWORDS.get(heritage_type, [])
for keyword in keywords:
if keyword.lower() in combined:
return (True, heritage_type)
# Special handling for 'D' (Digital) - ONLY if at a heritage organization
if is_heritage_org:
digital_keywords = HERITAGE_KEYWORDS.get('D', [])
for keyword in digital_keywords:
if keyword.lower() in combined:
return (True, 'D')
# Generic heritage terms (without specific type)
generic = ['heritage', 'erfgoed', 'culture', 'cultuur', 'cultural', 'film', 'cinema',
'media', 'arts', 'kunst', 'creative', 'preservation', 'conservation', 'collection']
for keyword in generic:
if keyword in combined:
return (True, None)
return (False, None)
# ============================================================================
# Pydantic Models
# ============================================================================
class QueryRequest(BaseModel):
"""SQL query request"""
sql: str = Field(..., description="SQL query to execute")
params: Optional[List[Any]] = Field(None, description="Query parameters")
class QueryResponse(BaseModel):
"""SQL query response"""
columns: List[str]
rows: List[List[Any]]
row_count: int
execution_time_ms: float
class TableInfo(BaseModel):
"""Table metadata"""
name: str
schema_name: str
row_count: int
column_count: int
size_bytes: Optional[int] = None
class ColumnInfo(BaseModel):
"""Column metadata"""
name: str
data_type: str
is_nullable: bool
default_value: Optional[str] = None
description: Optional[str] = None
class StatusResponse(BaseModel):
"""Server status response"""
status: str
database: str
tables: int
total_rows: int
uptime_seconds: float
postgres_version: str
# ============================================================================
# Global State
# ============================================================================
_pool: Optional[asyncpg.Pool] = None
_start_time: datetime = datetime.now()
async def get_pool() -> asyncpg.Pool:
"""Get or create connection pool"""
global _pool
if _pool is None:
_pool = await asyncpg.create_pool(
host=settings.host,
port=settings.port,
database=settings.database,
user=settings.user,
password=settings.password,
min_size=2,
max_size=10,
)
return _pool
# ============================================================================
# FastAPI App
# ============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler"""
# Startup: Initialize connection pool
await get_pool()
yield
# Shutdown: Close pool
global _pool
if _pool:
await _pool.close()
_pool = None
app = FastAPI(
title="PostgreSQL Heritage API",
description="REST API for heritage institution SQL queries",
version="1.0.0",
lifespan=lifespan,
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ============================================================================
# Helper Functions
# ============================================================================
def serialize_value(val: Any) -> Any:
"""Convert PostgreSQL values to JSON-serializable format"""
if val is None:
return None
elif isinstance(val, datetime):
return val.isoformat()
elif isinstance(val, (dict, list)):
return val
elif isinstance(val, bytes):
return val.decode('utf-8', errors='replace')
else:
return val
# ============================================================================
# API Endpoints
# ============================================================================
@app.get("/", response_model=StatusResponse)
async def get_status() -> StatusResponse:
"""Get server status and statistics"""
pool = await get_pool()
async with pool.acquire() as conn:
# Get PostgreSQL version
version = await conn.fetchval("SELECT version()")
# Get table count
tables = await conn.fetchval("""
SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
""")
# Get total row count (approximate)
total_rows = await conn.fetchval("""
SELECT COALESCE(SUM(n_tup_ins - n_tup_del), 0)::bigint
FROM pg_stat_user_tables
""")
uptime = (datetime.now() - _start_time).total_seconds()
return StatusResponse(
status="healthy",
database=settings.database,
tables=tables or 0,
total_rows=total_rows or 0,
uptime_seconds=uptime,
postgres_version=version.split(',')[0] if version else "unknown",
)
@app.post("/query", response_model=QueryResponse)
async def execute_query(request: QueryRequest) -> QueryResponse:
"""Execute a SQL query (read-only)"""
pool = await get_pool()
# Security: Only allow SELECT queries for now
sql_upper = request.sql.strip().upper()
if not sql_upper.startswith("SELECT") and not sql_upper.startswith("WITH"):
raise HTTPException(
status_code=403,
detail="Only SELECT queries are allowed. Use WITH...SELECT for CTEs."
)
start_time = datetime.now()
try:
async with pool.acquire() as conn:
if request.params:
result = await conn.fetch(request.sql, *request.params)
else:
result = await conn.fetch(request.sql)
if result:
columns = list(result[0].keys())
rows = [[serialize_value(row[col]) for col in columns] for row in result]
else:
columns = []
rows = []
execution_time = (datetime.now() - start_time).total_seconds() * 1000
return QueryResponse(
columns=columns,
rows=rows,
row_count=len(rows),
execution_time_ms=round(execution_time, 2),
)
except asyncpg.PostgresError as e:
raise HTTPException(status_code=400, detail=str(e))
@app.get("/tables", response_model=List[TableInfo])
async def list_tables() -> List[TableInfo]:
"""List all tables with metadata"""
pool = await get_pool()
async with pool.acquire() as conn:
tables = await conn.fetch("""
SELECT
t.table_name,
t.table_schema,
(SELECT COUNT(*) FROM information_schema.columns c
WHERE c.table_name = t.table_name AND c.table_schema = t.table_schema) as column_count,
COALESCE(s.n_tup_ins - s.n_tup_del, 0) as row_count,
pg_total_relation_size(quote_ident(t.table_schema) || '.' || quote_ident(t.table_name)) as size_bytes
FROM information_schema.tables t
LEFT JOIN pg_stat_user_tables s
ON s.schemaname = t.table_schema AND s.relname = t.table_name
WHERE t.table_schema = 'public'
AND t.table_type = 'BASE TABLE'
ORDER BY t.table_name
""")
return [
TableInfo(
name=row['table_name'],
schema_name=row['table_schema'],
column_count=row['column_count'],
row_count=row['row_count'] or 0,
size_bytes=row['size_bytes'],
)
for row in tables
]
@app.get("/schema/{table_name}", response_model=List[ColumnInfo])
async def get_table_schema(table_name: str) -> List[ColumnInfo]:
"""Get schema for a specific table"""
pool = await get_pool()
async with pool.acquire() as conn:
# Check table exists
exists = await conn.fetchval("""
SELECT EXISTS (
SELECT 1 FROM information_schema.tables
WHERE table_schema = 'public' AND table_name = $1
)
""", table_name)
if not exists:
raise HTTPException(status_code=404, detail=f"Table '{table_name}' not found")
columns = await conn.fetch("""
SELECT
column_name,
data_type,
is_nullable,
column_default,
col_description(
(quote_ident(table_schema) || '.' || quote_ident(table_name))::regclass,
ordinal_position
) as description
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = $1
ORDER BY ordinal_position
""", table_name)
return [
ColumnInfo(
name=col['column_name'],
data_type=col['data_type'],
is_nullable=col['is_nullable'] == 'YES',
default_value=col['column_default'],
description=col['description'],
)
for col in columns
]
@app.get("/stats")
async def get_database_stats() -> Dict[str, Any]:
"""Get detailed database statistics"""
pool = await get_pool()
async with pool.acquire() as conn:
# Database size
db_size = await conn.fetchval("""
SELECT pg_size_pretty(pg_database_size($1))
""", settings.database)
# Table sizes
table_sizes = await conn.fetch("""
SELECT
relname as table_name,
pg_size_pretty(pg_total_relation_size(relid)) as total_size,
n_live_tup as row_count
FROM pg_stat_user_tables
ORDER BY pg_total_relation_size(relid) DESC
LIMIT 10
""")
return {
"database": settings.database,
"size": db_size,
"largest_tables": [
{
"name": t['table_name'],
"size": t['total_size'],
"rows": t['row_count']
}
for t in table_sizes
]
}
# ============================================================================
# LinkML Schema API Endpoints
# ============================================================================
class LinkMLSchemaVersion(BaseModel):
"""LinkML schema version info"""
id: int
version: str
schema_name: str
description: Optional[str]
is_current: bool
created_at: str
class_count: int
slot_count: int
enum_count: int
class LinkMLClass(BaseModel):
"""LinkML class definition"""
id: int
class_name: str
class_id: str
title: Optional[str]
is_a: Optional[str]
class_uri: Optional[str]
abstract: bool
description: Optional[str]
exact_mappings: List[str]
close_mappings: List[str]
broad_mappings: List[str]
narrow_mappings: List[str]
class LinkMLSlot(BaseModel):
"""LinkML slot definition"""
id: int
slot_name: str
slot_id: str
range: Optional[str]
slot_uri: Optional[str]
required: bool
multivalued: bool
identifier: bool
description: Optional[str]
class LinkMLEnum(BaseModel):
"""LinkML enum definition"""
id: int
enum_name: str
enum_id: str
title: Optional[str]
description: Optional[str]
values: List[Dict[str, Any]]
class LinkMLSearchResult(BaseModel):
"""LinkML search result"""
element_type: str
element_name: str
element_uri: Optional[str]
description: Optional[str]
rank: float
class ProfileResponse(BaseModel):
"""Extended LinkedIn profile response"""
profile_data: Dict[str, Any]
linkedin_slug: str
extraction_date: Optional[str]
updated_date: Optional[str]
source: Optional[str] = "database"
@app.get("/linkml/versions", response_model=List[LinkMLSchemaVersion])
async def list_linkml_versions() -> List[LinkMLSchemaVersion]:
"""List all LinkML schema versions with statistics"""
pool = await get_pool()
async with pool.acquire() as conn:
versions = await conn.fetch("""
SELECT * FROM linkml_schema_stats ORDER BY created_at DESC
""")
return [
LinkMLSchemaVersion(
id=v['id'] if 'id' in v.keys() else 0,
version=v['version'],
schema_name=v['schema_name'],
description=None,
is_current=v['is_current'],
created_at=v['created_at'].isoformat() if v['created_at'] else "",
class_count=v['class_count'] or 0,
slot_count=v['slot_count'] or 0,
enum_count=v['enum_count'] or 0,
)
for v in versions
]
@app.get("/linkml/classes", response_model=List[LinkMLClass])
async def list_linkml_classes(
version: Optional[str] = Query(None, description="Schema version (default: current)")
) -> List[LinkMLClass]:
"""List all classes in the LinkML schema"""
pool = await get_pool()
async with pool.acquire() as conn:
if version:
classes = await conn.fetch("""
SELECT c.* FROM linkml_classes c
JOIN linkml_schema_versions v ON c.version_id = v.id
WHERE v.version = $1
ORDER BY c.class_name
""", version)
else:
classes = await conn.fetch("""
SELECT * FROM linkml_current_classes ORDER BY class_name
""")
return [
LinkMLClass(
id=c['id'],
class_name=c['class_name'],
class_id=c['class_id'],
title=c['title'],
is_a=c['is_a'],
class_uri=c['class_uri'],
abstract=c['abstract'] or False,
description=c['description'],
exact_mappings=c['exact_mappings'] or [],
close_mappings=c['close_mappings'] or [],
broad_mappings=c['broad_mappings'] or [],
narrow_mappings=c['narrow_mappings'] or [],
)
for c in classes
]
@app.get("/linkml/classes/{class_name}")
async def get_linkml_class(
class_name: str,
version: Optional[str] = Query(None, description="Schema version (default: current)")
) -> Dict[str, Any]:
"""Get detailed information about a specific class including slots"""
pool = await get_pool()
async with pool.acquire() as conn:
# Get class
if version:
cls = await conn.fetchrow("""
SELECT c.* FROM linkml_classes c
JOIN linkml_schema_versions v ON c.version_id = v.id
WHERE v.version = $1 AND c.class_name = $2
""", version, class_name)
else:
cls = await conn.fetchrow("""
SELECT * FROM linkml_current_classes WHERE class_name = $1
""", class_name)
if not cls:
raise HTTPException(status_code=404, detail=f"Class '{class_name}' not found")
# Get slots for this class
slots = await conn.fetch("""
SELECT s.slot_name, s.range, s.slot_uri, s.required, s.multivalued,
s.identifier, s.description, cs.slot_usage
FROM linkml_class_slots cs
JOIN linkml_slots s ON cs.slot_id = s.id
WHERE cs.class_id = $1
ORDER BY cs.slot_order
""", cls['id'])
# Get inheritance chain
inheritance = await conn.fetch("""
SELECT * FROM get_class_inheritance($1)
""", class_name)
return {
"id": cls['id'],
"class_name": cls['class_name'],
"class_id": cls['class_id'],
"title": cls['title'],
"is_a": cls['is_a'],
"class_uri": cls['class_uri'],
"abstract": cls['abstract'],
"description": cls['description'],
"comments": cls['comments'],
"exact_mappings": cls['exact_mappings'] or [],
"close_mappings": cls['close_mappings'] or [],
"broad_mappings": cls['broad_mappings'] or [],
"narrow_mappings": cls['narrow_mappings'] or [],
"slots": [
{
"slot_name": s['slot_name'],
"range": s['range'],
"slot_uri": s['slot_uri'],
"required": s['required'],
"multivalued": s['multivalued'],
"identifier": s['identifier'],
"description": s['description'],
"slot_usage": json.loads(s['slot_usage']) if s['slot_usage'] else None,
}
for s in slots
],
"inheritance": [
{
"level": i['level'],
"class_name": i['class_name'],
"class_uri": i['class_uri'],
"abstract": i['abstract'],
}
for i in inheritance
],
}
@app.get("/linkml/slots", response_model=List[LinkMLSlot])
async def list_linkml_slots(
version: Optional[str] = Query(None, description="Schema version (default: current)")
) -> List[LinkMLSlot]:
"""List all slots in the LinkML schema"""
pool = await get_pool()
async with pool.acquire() as conn:
if version:
slots = await conn.fetch("""
SELECT s.* FROM linkml_slots s
JOIN linkml_schema_versions v ON s.version_id = v.id
WHERE v.version = $1
ORDER BY s.slot_name
""", version)
else:
slots = await conn.fetch("""
SELECT * FROM linkml_current_slots ORDER BY slot_name
""")
return [
LinkMLSlot(
id=s['id'],
slot_name=s['slot_name'],
slot_id=s['slot_id'],
range=s['range'],
slot_uri=s['slot_uri'],
required=s['required'] or False,
multivalued=s['multivalued'] or False,
identifier=s['identifier'] or False,
description=s['description'],
)
for s in slots
]
@app.get("/linkml/slots/{slot_name}")
async def get_linkml_slot(
slot_name: str,
version: Optional[str] = Query(None, description="Schema version (default: current)")
) -> Dict[str, Any]:
"""Get detailed information about a specific slot"""
pool = await get_pool()
async with pool.acquire() as conn:
if version:
slot = await conn.fetchrow("""
SELECT s.* FROM linkml_slots s
JOIN linkml_schema_versions v ON s.version_id = v.id
WHERE v.version = $1 AND s.slot_name = $2
""", version, slot_name)
else:
slot = await conn.fetchrow("""
SELECT * FROM linkml_current_slots WHERE slot_name = $1
""", slot_name)
if not slot:
raise HTTPException(status_code=404, detail=f"Slot '{slot_name}' not found")
# Get classes that use this slot
classes = await conn.fetch("""
SELECT c.class_name, cs.slot_usage
FROM linkml_class_slots cs
JOIN linkml_classes c ON cs.class_id = c.id
WHERE cs.slot_id = $1
ORDER BY c.class_name
""", slot['id'])
return {
"id": slot['id'],
"slot_name": slot['slot_name'],
"slot_id": slot['slot_id'],
"range": slot['range'],
"slot_uri": slot['slot_uri'],
"required": slot['required'],
"multivalued": slot['multivalued'],
"identifier": slot['identifier'],
"inlined": slot['inlined'],
"inlined_as_list": slot['inlined_as_list'],
"pattern": slot['pattern'],
"description": slot['description'],
"comments": slot['comments'],
"examples": json.loads(slot['examples']) if slot['examples'] else None,
"used_by_classes": [
{
"class_name": c['class_name'],
"slot_usage": json.loads(c['slot_usage']) if c['slot_usage'] else None,
}
for c in classes
],
}
@app.get("/linkml/enums", response_model=List[LinkMLEnum])
async def list_linkml_enums(
version: Optional[str] = Query(None, description="Schema version (default: current)")
) -> List[LinkMLEnum]:
"""List all enums in the LinkML schema with their values"""
pool = await get_pool()
async with pool.acquire() as conn:
if version:
enums = await conn.fetch("""
SELECT e.* FROM linkml_enums e
JOIN linkml_schema_versions v ON e.version_id = v.id
WHERE v.version = $1
ORDER BY e.enum_name
""", version)
else:
enums = await conn.fetch("""
SELECT * FROM linkml_current_enums ORDER BY enum_name
""")
result = []
for e in enums:
# Get values for this enum
values = await conn.fetch("""
SELECT value_name, meaning, description, comments
FROM linkml_enum_values
WHERE enum_id = $1
ORDER BY value_order
""", e['id'])
result.append(LinkMLEnum(
id=e['id'],
enum_name=e['enum_name'],
enum_id=e['enum_id'],
title=e['title'],
description=e['description'],
values=[
{
"name": v['value_name'],
"meaning": v['meaning'],
"description": v['description'],
"comments": v['comments'],
}
for v in values
],
))
return result
@app.get("/linkml/enums/{enum_name}")
async def get_linkml_enum(
enum_name: str,
version: Optional[str] = Query(None, description="Schema version (default: current)")
) -> Dict[str, Any]:
"""Get detailed information about a specific enum"""
pool = await get_pool()
async with pool.acquire() as conn:
if version:
enum = await conn.fetchrow("""
SELECT e.* FROM linkml_enums e
JOIN linkml_schema_versions v ON e.version_id = v.id
WHERE v.version = $1 AND e.enum_name = $2
""", version, enum_name)
else:
enum = await conn.fetchrow("""
SELECT * FROM linkml_current_enums WHERE enum_name = $1
""", enum_name)
if not enum:
raise HTTPException(status_code=404, detail=f"Enum '{enum_name}' not found")
# Get values
values = await conn.fetch("""
SELECT * FROM linkml_enum_values
WHERE enum_id = $1
ORDER BY value_order
""", enum['id'])
return {
"id": enum['id'],
"enum_name": enum['enum_name'],
"enum_id": enum['enum_id'],
"title": enum['title'],
"description": enum['description'],
"comments": enum['comments'],
"values": [
{
"name": v['value_name'],
"meaning": v['meaning'],
"description": v['description'],
"comments": v['comments'],
}
for v in values
],
}
@app.get("/linkml/search", response_model=List[LinkMLSearchResult])
async def search_linkml(
element_type: Optional[str] = Query(None, description="Element type to search for"),
element_name: Optional[str] = Query(None, description="Element name to search for"),
limit: int = Query(10, description="Maximum number of results"),
pool: asyncpg.Pool = Depends(get_pool)
) -> List[LinkMLSearchResult]:
"""Search LinkML schema elements"""
pool = await get_pool()
# Build search query
where_conditions = []
params = []
if element_type:
where_conditions.append("element_type = %s")
params.append(element_type)
if element_name:
where_conditions.append("element_name ILIKE %s")
params.append(f"%{element_name}%")
where_clause = " AND ".join(where_conditions) if where_conditions else "1=1"
async with pool.acquire() as conn:
results = await conn.fetch(f"""
SELECT
element_type, element_name, element_uri, description, rank,
class_name, enum_id, title, values
FROM linkml_search_index
WHERE {where_clause}
ORDER BY rank ASC
LIMIT %s
""", *params, limit)
return [
LinkMLSearchResult(
element_type=result['element_type'],
element_name=result['element_name'],
element_uri=result.get('element_uri'),
description=result.get('description'),
rank=result['rank'],
class_name=result.get('class_name'),
enum_id=result.get('enum_id'),
title=result.get('title'),
values=result.get('values', [])
)
for result in results
]
@app.get("/profile/{linkedin_slug}", response_model=ProfileResponse)
async def get_profile(
linkedin_slug: str,
pool: asyncpg.Pool = Depends(get_pool)
) -> ProfileResponse:
"""Get extended LinkedIn profile data by LinkedIn slug"""
from urllib.parse import unquote
import unicodedata
# Normalize the slug: URL-decode and convert to ASCII
decoded_slug = unquote(linkedin_slug)
# NFD decomposition separates base characters from combining marks
normalized_slug = unicodedata.normalize('NFD', decoded_slug)
# Remove combining marks (diacritics) to get ASCII equivalent
ascii_slug = ''.join(c for c in normalized_slug if unicodedata.category(c) != 'Mn')
pool = await get_pool()
async with pool.acquire() as conn:
# Try to find profile in person_entity table
# First try the original slug, then the normalized version
result = await conn.fetchrow("""
SELECT profile_data, linkedin_slug, extraction_date, updated_date
FROM person_entity
WHERE linkedin_slug = $1 OR linkedin_slug = $2
""", linkedin_slug, ascii_slug)
if result and result['profile_data']:
# asyncpg may return JSONB as string - parse if needed
profile_data = result['profile_data']
if isinstance(profile_data, str):
profile_data = json.loads(profile_data)
# Transform experience → career_history for frontend compatibility
# The database stores 'experience' but frontend expects 'career_history'
inner_profile = profile_data.get('profile_data', {})
if inner_profile and 'experience' in inner_profile and 'career_history' not in inner_profile:
experience = inner_profile.get('experience', [])
if experience:
# Map field names: title→role, company→organization, date_range→dates
# Also classify each position as heritage-relevant or not
career_history = []
for job in experience:
role = job.get('title')
company = job.get('company')
heritage_relevant, heritage_type = detect_heritage_type(role, company)
career_item = {
'role': role,
'organization': company,
'dates': job.get('date_range') or job.get('duration'), # date_range has year info
'location': job.get('location'),
'description': job.get('description'),
'company_size': job.get('company_details'),
'current': job.get('current', False),
'heritage_relevant': heritage_relevant,
'heritage_type': heritage_type,
}
career_history.append(career_item)
inner_profile['career_history'] = career_history
profile_data['profile_data'] = inner_profile
# Also add heritage classification to existing career_history entries that lack it
if inner_profile and 'career_history' in inner_profile:
career_history = inner_profile.get('career_history', [])
needs_update = False
for job in career_history:
if job.get('heritage_relevant') is None:
needs_update = True
role = job.get('role') or job.get('title')
company = job.get('organization') or job.get('company')
heritage_relevant, heritage_type = detect_heritage_type(role, company)
job['heritage_relevant'] = heritage_relevant
job['heritage_type'] = heritage_type
if needs_update:
inner_profile['career_history'] = career_history
profile_data['profile_data'] = inner_profile
return ProfileResponse(
profile_data=profile_data,
linkedin_slug=result['linkedin_slug'],
extraction_date=result['extraction_date'].isoformat() if result['extraction_date'] else None,
updated_date=result['updated_date'].isoformat() if result['updated_date'] else None,
source="database"
)
# If not in database, try to load from entity directory (fallback)
import os
entity_dir = os.environ.get("ENTITY_DIR", "/opt/glam-backend/postgres/data/custodian/person/entity")
profile_files = [f for f in os.listdir(entity_dir) if f.endswith('.json')]
for filename in profile_files:
if linkedin_slug in filename:
file_path = os.path.join(entity_dir, filename)
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
file_profile_data = data.get('profile_data', {})
# Transform experience → career_history for frontend compatibility
# Handle both nested (profile_data.profile_data) and flat (profile_data) structures
nested_profile = file_profile_data.get('profile_data', {})
inner_profile = nested_profile if nested_profile else file_profile_data
if inner_profile and 'experience' in inner_profile and 'career_history' not in inner_profile:
experience = inner_profile.get('experience', [])
if experience:
# Map field names: title→role, company→organization, date_range→dates
# Also classify each position as heritage-relevant or not
career_history = []
for job in experience:
role = job.get('title')
company = job.get('company')
heritage_relevant, heritage_type = detect_heritage_type(role, company)
career_item = {
'role': role,
'organization': company,
'dates': job.get('date_range') or job.get('duration'), # date_range has year info
'location': job.get('location'),
'description': job.get('description'),
'company_size': job.get('company_details'),
'current': job.get('current', False),
'heritage_relevant': heritage_relevant,
'heritage_type': heritage_type,
}
career_history.append(career_item)
inner_profile['career_history'] = career_history
# career_history is now in inner_profile which is either nested or file_profile_data directly
# Also add heritage classification to existing career_history entries that lack it
if inner_profile and 'career_history' in inner_profile:
career_history = inner_profile.get('career_history', [])
for job in career_history:
if job.get('heritage_relevant') is None:
role = job.get('role') or job.get('title')
company = job.get('organization') or job.get('company')
heritage_relevant, heritage_type = detect_heritage_type(role, company)
job['heritage_relevant'] = heritage_relevant
job['heritage_type'] = heritage_type
return ProfileResponse(
profile_data=file_profile_data,
linkedin_slug=linkedin_slug,
extraction_date=data.get('exa_search_metadata', {}).get('enrichment_timestamp'),
updated_date=None,
source="entity_file"
)
except Exception as e:
print(f"Error loading profile file {filename}: {e}")
continue
raise HTTPException(status_code=404, detail="Profile not found")
@app.get("/linkml/hierarchy")
async def get_class_hierarchy(
version: Optional[str] = Query(None, description="Schema version (default: current)")
) -> List[Dict[str, Any]]:
"""Get the complete class hierarchy as a tree structure"""
pool = await get_pool()
async with pool.acquire() as conn:
if version:
classes = await conn.fetch("""
SELECT c.class_name, c.is_a, c.class_uri, c.abstract, c.title, c.description
FROM linkml_classes c
JOIN linkml_schema_versions v ON c.version_id = v.id
WHERE v.version = $1
ORDER BY c.class_name
""", version)
else:
classes = await conn.fetch("""
SELECT class_name, is_a, class_uri, abstract, title, description
FROM linkml_current_classes
ORDER BY class_name
""")
# Build tree structure
class_map = {c['class_name']: dict(c) for c in classes}
children_map: Dict[str, List[str]] = {}
for c in classes:
parent = c['is_a']
if parent:
if parent not in children_map:
children_map[parent] = []
children_map[parent].append(c['class_name'])
def build_tree(class_name: str) -> Dict[str, Any]:
node = class_map.get(class_name, {"class_name": class_name})
children = children_map.get(class_name, [])
return {
"class_name": class_name,
"class_uri": node.get('class_uri'),
"abstract": node.get('abstract', False),
"title": node.get('title'),
"children": [build_tree(child) for child in sorted(children)],
}
# Find root classes (no parent or parent not in schema)
roots = [
c['class_name'] for c in classes
if not c['is_a'] or c['is_a'] not in class_map
]
return [build_tree(root) for root in sorted(roots)]
# ============================================================================
# Boundary/GIS Endpoints
# ============================================================================
# PostGIS-backed endpoints for administrative boundaries and service areas
class BoundaryInfo(BaseModel):
"""Administrative boundary metadata"""
id: int
code: str
name: str
name_local: Optional[str] = None
admin_level: int
country_code: str
parent_code: Optional[str] = None
area_km2: Optional[float] = None
source: str
class PointLookupResult(BaseModel):
"""Result of point-in-polygon lookup"""
country: Optional[BoundaryInfo] = None
admin1: Optional[BoundaryInfo] = None
admin2: Optional[BoundaryInfo] = None
geonames_settlement: Optional[Dict[str, Any]] = None
class GeoJSONFeature(BaseModel):
"""GeoJSON Feature"""
type: str = "Feature"
properties: Dict[str, Any]
geometry: Optional[Dict[str, Any]] = None
class GeoJSONFeatureCollection(BaseModel):
"""GeoJSON FeatureCollection"""
type: str = "FeatureCollection"
features: List[GeoJSONFeature]
@app.get("/boundaries/countries")
async def list_countries() -> List[Dict[str, Any]]:
"""List all countries with boundary data"""
pool = await get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch("""
SELECT
bc.id,
bc.iso_a2 as code,
bc.country_name as name,
bc.country_name_local as name_local,
bc.area_km2,
bds.source_code as source,
ST_AsGeoJSON(bc.centroid)::json as centroid
FROM boundary_countries bc
LEFT JOIN boundary_data_sources bds ON bc.source_id = bds.id
WHERE bc.valid_to IS NULL
ORDER BY bc.country_name
""")
return [
{
"id": r['id'],
"code": r['code'].strip(),
"name": r['name'],
"name_local": r['name_local'],
"area_km2": float(r['area_km2']) if r['area_km2'] else None,
"source": r['source'],
"centroid": r['centroid'],
}
for r in rows
]
@app.get("/boundaries/countries/{country_code}/admin1")
async def list_admin1(country_code: str) -> List[Dict[str, Any]]:
"""List admin level 1 divisions (provinces/states) for a country"""
pool = await get_pool()
async with pool.acquire() as conn:
rows = await conn.fetch("""
SELECT
ba.id,
ba.admin1_code as code,
ba.admin1_name as name,
ba.admin1_name_local as name_local,
ba.iso_3166_2 as iso_code,
ba.area_km2,
bc.iso_a2 as country_code,
bds.source_code as source,
ST_AsGeoJSON(ba.centroid)::json as centroid
FROM boundary_admin1 ba
JOIN boundary_countries bc ON ba.country_id = bc.id
LEFT JOIN boundary_data_sources bds ON ba.source_id = bds.id
WHERE bc.iso_a2 = $1 AND ba.valid_to IS NULL
ORDER BY ba.admin1_name
""", country_code.upper())
return [
{
"id": r['id'],
"code": r['code'],
"name": r['name'],
"name_local": r['name_local'],
"iso_code": r['iso_code'],
"country_code": r['country_code'].strip() if r['country_code'] else None,
"area_km2": float(r['area_km2']) if r['area_km2'] else None,
"source": r['source'],
"centroid": r['centroid'],
}
for r in rows
]
@app.get("/boundaries/countries/{country_code}/admin2")
async def list_admin2(
country_code: str,
admin1_code: Optional[str] = Query(None, description="Filter by admin1 code")
) -> List[Dict[str, Any]]:
"""List admin level 2 divisions (municipalities/counties) for a country"""
pool = await get_pool()
async with pool.acquire() as conn:
if admin1_code:
rows = await conn.fetch("""
SELECT
ba2.id,
ba2.admin2_code as code,
ba2.admin2_name as name,
ba2.admin2_name_local as name_local,
ba1.admin1_code,
ba1.admin1_name,
bc.iso_a2 as country_code,
ba2.area_km2,
bds.source_code as source,
ST_AsGeoJSON(ba2.centroid)::json as centroid
FROM boundary_admin2 ba2
JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id
JOIN boundary_countries bc ON ba1.country_id = bc.id
LEFT JOIN boundary_data_sources bds ON ba2.source_id = bds.id
WHERE bc.iso_a2 = $1
AND ba1.admin1_code = $2
AND ba2.valid_to IS NULL
ORDER BY ba2.admin2_name
""", country_code.upper(), admin1_code)
else:
rows = await conn.fetch("""
SELECT
ba2.id,
ba2.admin2_code as code,
ba2.admin2_name as name,
ba2.admin2_name_local as name_local,
ba1.admin1_code,
ba1.admin1_name,
bc.iso_a2 as country_code,
ba2.area_km2,
bds.source_code as source,
ST_AsGeoJSON(ba2.centroid)::json as centroid
FROM boundary_admin2 ba2
JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id
JOIN boundary_countries bc ON ba1.country_id = bc.id
LEFT JOIN boundary_data_sources bds ON ba2.source_id = bds.id
WHERE bc.iso_a2 = $1 AND ba2.valid_to IS NULL
ORDER BY ba2.admin2_name
""", country_code.upper())
return [
{
"id": r['id'],
"code": r['code'],
"name": r['name'],
"name_local": r['name_local'],
"admin1_code": r['admin1_code'],
"admin1_name": r['admin1_name'],
"country_code": r['country_code'].strip() if r['country_code'] else None,
"area_km2": float(r['area_km2']) if r['area_km2'] else None,
"source": r['source'],
"centroid": r['centroid'],
}
for r in rows
]
@app.get("/boundaries/lookup")
async def lookup_point(
lat: float = Query(..., description="Latitude (WGS84)"),
lon: float = Query(..., description="Longitude (WGS84)"),
country_code: Optional[str] = Query(None, description="ISO country code to filter results")
) -> PointLookupResult:
"""Find administrative boundaries containing a point"""
pool = await get_pool()
async with pool.acquire() as conn:
# Use the find_admin_for_point function
# Returns rows with: admin_level, admin_code, admin_name, iso_code, geonames_id
if country_code:
results = await conn.fetch("""
SELECT * FROM find_admin_for_point($1, $2, $3)
""", lon, lat, country_code.upper())
else:
results = await conn.fetch("""
SELECT * FROM find_admin_for_point($1, $2)
""", lon, lat)
# Parse results by admin level
admin1_info = None
admin2_info = None
for row in results:
level = row['admin_level']
if level == 1 and admin1_info is None:
admin1_info = BoundaryInfo(
id=0,
code=row['admin_code'],
name=row['admin_name'],
admin_level=1,
country_code=country_code or "XX",
source="PostGIS"
)
elif level == 2 and admin2_info is None:
admin2_info = BoundaryInfo(
id=0,
code=row['admin_code'],
name=row['admin_name'],
admin_level=2,
country_code=country_code or "XX",
source="PostGIS"
)
# Also try to determine country from the point
country_info = None
country_row = await conn.fetchrow("""
SELECT bc.id, bc.iso_a2, bc.country_name
FROM boundary_countries bc
WHERE ST_Contains(bc.geom, ST_SetSRID(ST_Point($1, $2), 4326))
AND bc.valid_to IS NULL
LIMIT 1
""", lon, lat)
if country_row:
country_info = BoundaryInfo(
id=country_row['id'],
code=country_row['iso_a2'].strip(),
name=country_row['country_name'],
admin_level=0,
country_code=country_row['iso_a2'].strip(),
source="PostGIS"
)
# Update admin boundaries with proper country code
if admin1_info:
admin1_info.country_code = country_row['iso_a2'].strip()
if admin2_info:
admin2_info.country_code = country_row['iso_a2'].strip()
return PointLookupResult(
country=country_info,
admin1=admin1_info,
admin2=admin2_info,
)
@app.get("/boundaries/admin2/{admin2_id}/geojson")
async def get_admin2_geojson(admin2_id: int) -> GeoJSONFeature:
"""Get GeoJSON geometry for a specific admin2 boundary"""
pool = await get_pool()
async with pool.acquire() as conn:
row = await conn.fetchrow("""
SELECT
ba2.id,
ba2.admin2_code as code,
ba2.admin2_name as name,
ba2.admin2_name_local as name_local,
bc.iso_a2 as country_code,
ba1.admin1_code,
ba2.area_km2,
bds.source_code as source,
ST_AsGeoJSON(ba2.geom, 6)::json as geometry
FROM boundary_admin2 ba2
JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id
JOIN boundary_countries bc ON ba1.country_id = bc.id
LEFT JOIN boundary_data_sources bds ON ba2.source_id = bds.id
WHERE ba2.id = $1
""", admin2_id)
if not row:
raise HTTPException(status_code=404, detail=f"Admin2 boundary {admin2_id} not found")
# asyncpg may return geometry as string or dict depending on version
geom = row['geometry']
if isinstance(geom, str):
geom = json.loads(geom)
return GeoJSONFeature(
properties={
"id": row['id'],
"code": row['code'],
"name": row['name'],
"name_local": row['name_local'],
"country_code": row['country_code'].strip() if row['country_code'] else None,
"admin1_code": row['admin1_code'],
"area_km2": float(row['area_km2']) if row['area_km2'] else None,
"source": row['source'],
},
geometry=geom
)
@app.get("/boundaries/countries/{country_code}/admin2/geojson")
async def get_country_admin2_geojson(
country_code: str,
admin1_code: Optional[str] = Query(None, description="Filter by admin1 code"),
simplify: float = Query(0.001, description="Geometry simplification tolerance (degrees)")
) -> GeoJSONFeatureCollection:
"""Get GeoJSON FeatureCollection of all admin2 boundaries for a country"""
pool = await get_pool()
async with pool.acquire() as conn:
if admin1_code:
rows = await conn.fetch("""
SELECT
ba2.id,
ba2.admin2_code as code,
ba2.admin2_name as name,
ba2.admin2_name_local as name_local,
ba1.admin1_code,
ba1.admin1_name,
ba2.area_km2,
bds.source_code as source,
ST_AsGeoJSON(ST_Simplify(ba2.geom, $3), 6)::json as geometry
FROM boundary_admin2 ba2
JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id
JOIN boundary_countries bc ON ba1.country_id = bc.id
LEFT JOIN boundary_data_sources bds ON ba2.source_id = bds.id
WHERE bc.iso_a2 = $1
AND ba1.admin1_code = $2
AND ba2.valid_to IS NULL
ORDER BY ba2.admin2_name
""", country_code.upper(), admin1_code, simplify)
else:
rows = await conn.fetch("""
SELECT
ba2.id,
ba2.admin2_code as code,
ba2.admin2_name as name,
ba2.admin2_name_local as name_local,
ba1.admin1_code,
ba1.admin1_name,
ba2.area_km2,
bds.source_code as source,
ST_AsGeoJSON(ST_Simplify(ba2.geom, $2), 6)::json as geometry
FROM boundary_admin2 ba2
JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id
JOIN boundary_countries bc ON ba1.country_id = bc.id
LEFT JOIN boundary_data_sources bds ON ba2.source_id = bds.id
WHERE bc.iso_a2 = $1 AND ba2.valid_to IS NULL
ORDER BY ba2.admin2_name
""", country_code.upper(), simplify)
features = []
for r in rows:
# asyncpg may return geometry as string or dict depending on version
geom = r['geometry']
if isinstance(geom, str):
geom = json.loads(geom)
features.append(GeoJSONFeature(
properties={
"id": r['id'],
"code": r['code'],
"name": r['name'],
"name_local": r['name_local'],
"admin1_code": r['admin1_code'],
"area_km2": float(r['area_km2']) if r['area_km2'] else None,
"source": r['source'],
},
geometry=geom
))
return GeoJSONFeatureCollection(features=features)
@app.get("/boundaries/stats")
async def get_boundary_stats() -> Dict[str, Any]:
"""Get statistics about loaded boundary data"""
pool = await get_pool()
async with pool.acquire() as conn:
stats = {}
# Countries
country_count = await conn.fetchval("""
SELECT COUNT(*) FROM boundary_countries WHERE valid_to IS NULL
""")
stats['countries'] = country_count
# Admin1 by country
admin1_stats = await conn.fetch("""
SELECT bc.iso_a2 as country_code, COUNT(*) as count
FROM boundary_admin1 ba1
JOIN boundary_countries bc ON ba1.country_id = bc.id
WHERE ba1.valid_to IS NULL
GROUP BY bc.iso_a2
ORDER BY bc.iso_a2
""")
stats['admin1_by_country'] = {
r['country_code'].strip(): r['count'] for r in admin1_stats
}
stats['admin1_total'] = sum(r['count'] for r in admin1_stats)
# Admin2 by country
admin2_stats = await conn.fetch("""
SELECT bc.iso_a2 as country_code, COUNT(*) as count
FROM boundary_admin2 ba2
JOIN boundary_admin1 ba1 ON ba2.admin1_id = ba1.id
JOIN boundary_countries bc ON ba1.country_id = bc.id
WHERE ba2.valid_to IS NULL
GROUP BY bc.iso_a2
ORDER BY bc.iso_a2
""")
stats['admin2_by_country'] = {
r['country_code'].strip(): r['count'] for r in admin2_stats
}
stats['admin2_total'] = sum(r['count'] for r in admin2_stats)
# Data sources
sources = await conn.fetch("""
SELECT source_code, source_name, coverage_scope
FROM boundary_data_sources
ORDER BY source_code
""")
stats['data_sources'] = [
{
"code": s['source_code'],
"name": s['source_name'],
"scope": s['coverage_scope'],
}
for s in sources
]
return stats
# ============================================================================
# Main Entry Point
# ============================================================================
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host=settings.api_host,
port=settings.api_port,
reload=True,
)