""" PostGIS Geo API for Heritage Custodian Map FastAPI backend providing spatial queries for bronhouder.nl map Mounted at /api/geo/ via Caddy reverse proxy. Endpoints: - GET / - Health check and geo statistics - GET /countries - Get all countries as GeoJSON with institution counts - GET /provinces - Get all provinces as GeoJSON - GET /municipalities - Get municipalities (with filters) - GET /institutions - Get institutions as GeoJSON (with bbox/type filters) - GET /institution/:ghcid - Get single institution details - GET /historical - Get historical boundaries - GET /search - Search institutions by name - GET /admin/point - Find admin unit for a point - GET /nearby - Find institutions near a point - GET /stats/by-type - Institution counts by type - GET /stats/by-province - Institution counts by province Person Endpoints (Beschermers): - GET /persons - List persons with filters (custodian, type, country) - GET /persons/count - Get total person count for stats - GET /persons/search - Search persons by name/headline/custodian - GET /person/:staff_id - Get single person details """ import os import json from datetime import datetime from typing import Optional, List, Dict, Any from contextlib import asynccontextmanager from decimal import Decimal from fastapi import FastAPI, HTTPException, Query, APIRouter from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.gzip import GZipMiddleware from fastapi.responses import JSONResponse, Response from pydantic import BaseModel, Field import asyncpg import httpx import hashlib from urllib.parse import urlparse # ============================================================================ # Configuration # ============================================================================ class GeoSettings(BaseModel): """PostGIS geo database settings - connects to glam_geo with PostGIS boundaries""" host: str = os.getenv("GEO_POSTGRES_HOST", "localhost") port: int = int(os.getenv("GEO_POSTGRES_PORT", "5432")) database: str = os.getenv("GEO_POSTGRES_DB", "glam_geo") # glam_geo has boundary data user: str = os.getenv("GEO_POSTGRES_USER", "glam_api") password: str = os.getenv("GEO_POSTGRES_PASSWORD", "") # Server settings api_host: str = os.getenv("GEO_API_HOST", "0.0.0.0") api_port: int = int(os.getenv("GEO_API_PORT", "8002")) settings = GeoSettings() # ============================================================================ # Pydantic Models # ============================================================================ class GeoStatsResponse(BaseModel): """Geo database statistics""" status: str database: str provinces: int municipalities: int institutions: int historical_boundaries: int postgres_version: str class InstitutionDetail(BaseModel): """Detailed institution information""" ghcid: str name: str type: str type_name: Optional[str] lat: Optional[float] lon: Optional[float] address: Optional[str] city: Optional[str] province: Optional[str] website: Optional[str] phone: Optional[str] wikidata_id: Optional[str] rating: Optional[float] total_ratings: Optional[int] description: Optional[str] reviews: Optional[List[Dict]] genealogiewerkbalk: Optional[Dict] class AdminPoint(BaseModel): """Admin unit for a point""" province_code: Optional[str] province_name: Optional[str] municipality_code: Optional[str] municipality_name: Optional[str] class NearbyInstitution(BaseModel): """Institution with distance""" ghcid: str name: str type: str type_name: Optional[str] distance_km: float city: Optional[str] province: Optional[str] rating: Optional[float] class PersonSummary(BaseModel): """Summary person information for list views""" staff_id: str name: str headline: Optional[str] location: Optional[str] country_code: Optional[str] custodian_slug: Optional[str] custodian_name: Optional[str] linkedin_url: Optional[str] profile_image_url: Optional[str] heritage_relevant: bool = True heritage_types: List[str] = [] class PersonDetail(BaseModel): """Detailed person information""" staff_id: str name: str headline: Optional[str] location: Optional[str] country_code: Optional[str] custodian_slug: Optional[str] custodian_name: Optional[str] linkedin_url: Optional[str] profile_image_url: Optional[str] heritage_relevant: bool = True heritage_types: List[str] = [] experience: List[Dict] = [] education: List[Dict] = [] skills: List[str] = [] languages: List[Dict] = [] about: Optional[str] connections: Optional[str] extraction_date: Optional[str] extraction_method: Optional[str] source_file: Optional[str] # ============================================================================ # Heritage Classification (copied from main.py for experience item classification) # ============================================================================ import re # Heritage type detection keywords for GLAMORCUBESFIXPHDNT taxonomy HERITAGE_KEYWORDS = { 'G': ['gallery', 'galerie', 'kunsthal', 'art dealer', 'art gallery', 'exhibition space'], 'L': ['library', 'bibliotheek', 'bibliothek', 'librarian', 'bibliothecaris', 'KB ', 'national library'], 'A': ['archive', 'archief', 'archivist', 'archivaris', 'archival', 'beeld en geluid', 'beeld & geluid', 'NISV', 'filmmuseum', 'eye film', 'EYE ', 'audiovisual', 'nationaal archief', 'stadsarchief', 'gemeentearchief', 'rijksarchief', 'NIOD', 'IISH', 'IISG', 'archiefspecialist'], 'M': ['museum', 'musea', 'curator', 'conservator', 'collection manager', 'rijksmuseum', 'van gogh', 'stedelijk', 'mauritshuis', 'tropenmuseum', 'allard pierson', 'museale', 'collectiebeheerder', 'collectiespecialist', 'collectie'], 'O': ['ministry', 'ministerie', 'government', 'overheid', 'gemeente', 'province', 'provincie', 'OCW'], 'R': ['research', 'onderzoek', 'researcher', 'onderzoeker', 'KNAW', 'humanities cluster', 'NWO', 'documentatie', 'documentation', 'kenniscentrum', 'historicus'], 'C': ['corporate archive', 'bedrijfsarchief', 'company history'], 'E': ['university', 'universiteit', 'professor', 'lecturer', 'docent', 'hogeschool', 'academy', 'academie', 'PhD', 'phd candidate', 'student', 'teacher', 'onderwijs', 'education', 'UvA', 'VU ', 'leiden university', 'reinwardt', 'film academy', 'graduate', 'assistant professor', 'associate professor', 'hoogleraar', 'educatie', 'educator'], 'S': ['society', 'vereniging', 'genootschap', 'historical society', 'historische vereniging'], 'D': ['digital', 'digitaal', 'platform', 'software', 'IT ', 'tech', 'developer', 'engineer', 'data ', 'AI ', 'machine learning', 'digitalisering', 'datamanagement', 'data analist'], } NON_HERITAGE_KEYWORDS = [ 'marketing', 'sales', 'HR ', 'human resources', 'recruiter', 'finance', 'accounting', 'legal', 'lawyer', 'advocaat', 'consultant', 'coach', 'therapy', 'health', 'medical', 'food', 'restaurant', 'retail', 'fashion', 'real estate', 'insurance', 'banking', 'investment', 'e-commerce', 'organiser', 'opruimhulp', 'verpleeg', 'nurse' ] # Organizations that are explicitly NOT heritage institutions NON_HERITAGE_ORGANIZATIONS = [ # Banks & Financial 'ing ', 'ing nederland', 'rabobank', 'abn amro', 'postbank', 'triodos', # Security companies 'i-sec', 'g4s', 'securitas', 'trigion', 'chubb', # Police/Government (non-cultural) 'politie', 'police', 'rijkswaterstaat', 'belastingdienst', 'douane', 'defensie', # Political parties 'vvd', 'pvda', 'cda', 'd66', 'groenlinks', 'pvv', 'bbb', 'nsc', 'volt', 'sp ', 'forum voor democratie', 'ja21', 'bij1', 'denk', 'sgp', 'cu ', # Tech companies (non-heritage) 'google', 'microsoft', 'amazon', 'meta', 'facebook', 'apple', 'netflix', 'uber', 'airbnb', 'booking.com', 'adyen', 'mollie', 'messagebird', 'coolblue', 'bol.com', 'picnic', 'takeaway', 'just eat', # Telecom 'kpn', 'vodafone', 't-mobile', 'ziggo', # Postal / Logistics 'postnl', 'postkantoren', 'dhl', 'ups', 'fedex', # Healthcare 'ziekenhuis', 'hospital', 'ggz', 'ggd', 'thuiszorg', # Retail 'albert heijn', 'jumbo', 'lidl', 'aldi', 'ikea', 'hema', 'action', # Consulting / Professional services 'deloitte', 'kpmg', 'pwc', 'ey ', 'ernst & young', 'mckinsey', 'bcg', 'accenture', 'capgemini', 'ordina', 'atos', 'cgi ', # Recruitment / HR 'randstad', 'tempo-team', 'manpower', 'hays', 'brunel', # Energy / Utilities 'shell', 'bp ', 'eneco', 'vattenfall', 'essent', 'nuon', # Transport 'ns ', 'prorail', 'schiphol', 'klm', 'transavia', # Other 'freelance', 'zelfstandig', 'zzp', 'eigen bedrijf', ] # Heritage organization keywords - organizations that ARE heritage institutions HERITAGE_ORGANIZATION_KEYWORDS = [ # Archives 'archief', 'archive', 'nationaal archief', 'stadsarchief', 'regionaal archief', 'beeld en geluid', 'beeld & geluid', 'niod', 'iish', 'iisg', # Museums 'museum', 'musea', 'rijksmuseum', 'van gogh', 'stedelijk', 'mauritshuis', 'tropenmuseum', 'allard pierson', 'kröller', 'boijmans', # Libraries 'bibliotheek', 'library', 'koninklijke bibliotheek', 'kb ', # Film/AV heritage 'eye film', 'filmmuseum', 'eye ', 'sound and vision', # Heritage platforms 'erfgoed', 'heritage', 'cultural', 'cultureel', # Research institutes (heritage-focused) 'knaw', 'humanities cluster', 'meertens', 'huygens', ] def detect_heritage_type(role: Optional[str], company: Optional[str]) -> tuple: """ Detect if a position is heritage-relevant and what type. Two-stage classification: 1. Check if organization is explicitly non-heritage (blocklist) 2. Check if role/organization matches heritage patterns For 'D' (Digital) type, require BOTH a tech role AND a heritage organization. This prevents generic IT workers at banks/police from being classified as heritage. Args: role: Job title/role text company: Company/organization name Returns: Tuple of (heritage_relevant: bool, heritage_type: Optional[str]) """ # Combine role and company for full context role_text = role or '' company_text = company or '' combined = f"{role_text} {company_text}".lower() if not combined.strip(): return (False, None) # Stage 1: Check for non-heritage organizations (blocklist) # Use word boundary matching to avoid false positives like "sharing" matching "ing " for org in NON_HERITAGE_ORGANIZATIONS: org_pattern = org.lower().strip() # Use word boundary regex for patterns that could have false positives if re.search(r'\b' + re.escape(org_pattern) + r'\b', combined): return (False, None) # Stage 2: Check for non-heritage role indicators for keyword in NON_HERITAGE_KEYWORDS: keyword_pattern = keyword.lower().strip() if re.search(r'\b' + re.escape(keyword_pattern) + r'\b', combined): return (False, None) # Stage 3: Check if this is a heritage organization is_heritage_org = False for org_keyword in HERITAGE_ORGANIZATION_KEYWORDS: if org_keyword.lower() in combined: is_heritage_org = True break # Check heritage keywords by type (order matters - more specific first) # 'D' (Digital) is checked last and requires heritage org validation type_order = ['A', 'M', 'L', 'G', 'S', 'C', 'O', 'R', 'E'] # D removed from main loop for heritage_type in type_order: keywords = HERITAGE_KEYWORDS.get(heritage_type, []) for keyword in keywords: if keyword.lower() in combined: return (True, heritage_type) # Special handling for 'D' (Digital) - ONLY if at a heritage organization if is_heritage_org: digital_keywords = HERITAGE_KEYWORDS.get('D', []) for keyword in digital_keywords: if keyword.lower() in combined: return (True, 'D') # Generic heritage terms (without specific type) generic = ['heritage', 'erfgoed', 'culture', 'cultuur', 'cultural', 'film', 'cinema', 'media', 'arts', 'kunst', 'creative', 'preservation', 'conservation', 'collection'] for keyword in generic: if keyword in combined: return (True, None) return (False, None) def enrich_experience_with_heritage(experience: List) -> List[Dict]: """ Add heritage_relevant and heritage_type fields to each experience item. Handles both dict and JSON string inputs (asyncpg returns jsonb array elements as strings that need parsing). Args: experience: List of experience items (dicts or JSON strings) Returns: Same list with heritage_relevant and heritage_type added to each item """ if not experience: return [] enriched = [] for exp in experience: # Handle case where exp is a JSON string instead of dict # (asyncpg returns jsonb array elements as strings) if isinstance(exp, str): try: exp = json.loads(exp) except json.JSONDecodeError: continue # Skip if still not a dict if not isinstance(exp, dict): continue # Get role and company for classification role = exp.get('title') or exp.get('role') or '' company = exp.get('company') or exp.get('organization') or '' # Detect heritage relevance heritage_relevant, heritage_type = detect_heritage_type(role, company) # Create new dict with heritage fields added enriched_exp = {**exp} enriched_exp['heritage_relevant'] = heritage_relevant enriched_exp['heritage_type'] = heritage_type enriched.append(enriched_exp) return enriched def parse_jsonb_list(data) -> List: """ Parse a jsonb list field from PostgreSQL. asyncpg returns jsonb columns in various forms: - Sometimes as a proper Python list with dict elements - Sometimes as a JSON string that needs parsing - Sometimes as a list where each element is a JSON string - Sometimes as a list where each element is a Python repr string (single quotes) This function handles all these cases. Args: data: Either a list, a JSON string representing a list, or None Returns: Parsed list with all elements as proper Python objects (empty list if None or invalid) """ import ast if data is None: return [] result = [] # If it's a string, try to parse the whole thing as JSON first if isinstance(data, str): try: data = json.loads(data) except json.JSONDecodeError: return [] # Now data should be a list if not isinstance(data, list): return [] # Parse each element if it's a string for item in data: if isinstance(item, str): # Try JSON first (double quotes) try: parsed_item = json.loads(item) result.append(parsed_item) continue except json.JSONDecodeError: pass # Try Python literal (single quotes) - handles malformed data try: parsed_item = ast.literal_eval(item) result.append(parsed_item) continue except (ValueError, SyntaxError): pass # Keep as string if neither works (e.g., plain skill strings) result.append(item) else: result.append(item) return result # ============================================================================ # Global State # ============================================================================ _pool: Optional[asyncpg.Pool] = None _start_time: datetime = datetime.now() async def get_pool() -> asyncpg.Pool: """Get or create connection pool""" global _pool if _pool is None: _pool = await asyncpg.create_pool( host=settings.host, port=settings.port, database=settings.database, user=settings.user, password=settings.password, min_size=2, max_size=10, ) return _pool # ============================================================================ # FastAPI App # ============================================================================ @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan handler""" await get_pool() yield global _pool if _pool: await _pool.close() _pool = None app = FastAPI( title="PostGIS Geo API", description="Spatial REST API for heritage institution map", version="1.0.0", lifespan=lifespan, ) # CORS middleware app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # GZip compression middleware - compresses responses >1KB # Reduces ~126MB JSON payload to ~20-30MB (70-80% reduction) app.add_middleware(GZipMiddleware, minimum_size=1000) # ============================================================================ # Helper Functions # ============================================================================ def serialize_value(val: Any) -> Any: """Convert PostgreSQL values to JSON-serializable format""" if val is None: return None elif isinstance(val, datetime): return val.isoformat() elif isinstance(val, Decimal): return float(val) elif isinstance(val, (dict, list)): return val elif isinstance(val, bytes): return val.decode('utf-8', errors='replace') else: return val def row_to_dict(row: asyncpg.Record) -> Dict[str, Any]: """Convert asyncpg row to dict with serialization""" return {key: serialize_value(row[key]) for key in row.keys()} # ============================================================================ # API Endpoints # ============================================================================ @app.get("/", response_model=GeoStatsResponse) async def get_geo_status() -> GeoStatsResponse: """Get geo database status and statistics""" pool = await get_pool() async with pool.acquire() as conn: version = await conn.fetchval("SELECT version()") provinces = await conn.fetchval("SELECT COUNT(*) FROM provinces") municipalities = await conn.fetchval("SELECT COUNT(*) FROM municipalities") institutions = await conn.fetchval("SELECT COUNT(*) FROM institutions") historical = await conn.fetchval("SELECT COUNT(*) FROM historical_boundaries") return GeoStatsResponse( status="healthy", database=settings.database, provinces=provinces or 0, municipalities=municipalities or 0, institutions=institutions or 0, historical_boundaries=historical or 0, postgres_version=version.split(',')[0] if version else "unknown", ) @app.get("/provinces") async def get_provinces( simplified: bool = Query(True, description="Return simplified geometries") ): """Get all provinces as GeoJSON FeatureCollection""" pool = await get_pool() tolerance = 0.001 if simplified else 0 async with pool.acquire() as conn: rows = await conn.fetch(f""" SELECT id, province_code, iso_code, name, ST_AsGeoJSON( {'ST_Simplify(geom, ' + str(tolerance) + ')' if simplified else 'geom'} )::json as geometry, ST_X(centroid) as centroid_lon, ST_Y(centroid) as centroid_lat, area_km2 FROM provinces ORDER BY name """) features = [] for row in rows: features.append({ "type": "Feature", "id": row['province_code'], "geometry": row['geometry'], "properties": { "id": row['id'], "province_code": row['province_code'], "iso_code": row['iso_code'], "name": row['name'], "centroid_lon": float(row['centroid_lon']) if row['centroid_lon'] else None, "centroid_lat": float(row['centroid_lat']) if row['centroid_lat'] else None, "area_km2": float(row['area_km2']) if row['area_km2'] else None, } }) return { "type": "FeatureCollection", "features": features } @app.get("/countries") async def get_countries( simplified: bool = Query(True, description="Return simplified geometries"), with_counts: bool = Query(False, description="Include institution counts per country"), ): """Get all countries as GeoJSON FeatureCollection with optional institution counts""" pool = await get_pool() # Use more aggressive simplification for countries (world view) tolerance = 0.01 if simplified else 0 async with pool.acquire() as conn: if with_counts: # Join with custodians to get counts per country rows = await conn.fetch(f""" SELECT bc.id, bc.iso_a2 as country_code, bc.iso_a3, bc.country_name as name, ST_AsGeoJSON( {'ST_Simplify(bc.geom, ' + str(tolerance) + ')' if simplified else 'bc.geom'} ) as geometry, ST_X(bc.centroid) as centroid_lon, ST_Y(bc.centroid) as centroid_lat, bc.area_km2, COALESCE(counts.institution_count, 0) as institution_count FROM boundary_countries bc LEFT JOIN ( SELECT country_code, COUNT(*) as institution_count FROM custodians WHERE country_code IS NOT NULL GROUP BY country_code ) counts ON bc.iso_a2 = counts.country_code WHERE bc.geom IS NOT NULL ORDER BY bc.country_name """) else: rows = await conn.fetch(f""" SELECT id, iso_a2 as country_code, iso_a3, country_name as name, ST_AsGeoJSON( {'ST_Simplify(geom, ' + str(tolerance) + ')' if simplified else 'geom'} ) as geometry, ST_X(centroid) as centroid_lon, ST_Y(centroid) as centroid_lat, area_km2 FROM boundary_countries WHERE geom IS NOT NULL ORDER BY country_name """) features = [] total_institutions = 0 countries_with_data = 0 for row in rows: # Parse geometry from string to dict (ST_AsGeoJSON returns text) geometry = row['geometry'] if geometry is None: # Skip countries with no geometry (e.g., Vatican City) continue if isinstance(geometry, str): geometry = json.loads(geometry) # Ensure geometry has required structure if not isinstance(geometry, dict) or 'type' not in geometry or 'coordinates' not in geometry: continue iso_a2 = row['country_code'].strip() if row['country_code'] else None iso_a3 = row['iso_a3'].strip() if row['iso_a3'] else None institution_count = row['institution_count'] if with_counts else 0 # Track totals if with_counts: total_institutions += institution_count if institution_count > 0: countries_with_data += 1 # Build properties with frontend-expected field names properties = { "id": row['id'], "iso_a2": iso_a2, # Frontend expects iso_a2 "iso_a3": iso_a3, "name": row['name'], "institution_count": institution_count, "centroid": [ float(row['centroid_lon']) if row['centroid_lon'] else None, float(row['centroid_lat']) if row['centroid_lat'] else None, ], "area_km2": float(row['area_km2']) if row['area_km2'] else None, } features.append({ "type": "Feature", "id": iso_a2, "geometry": geometry, "properties": properties }) return { "type": "FeatureCollection", "features": features, "metadata": { "count": len(features), "total_institutions": total_institutions, "countries_with_data": countries_with_data, "type_filter": None, "simplified": simplified, } } @app.get("/municipalities") async def get_municipalities( province: Optional[str] = Query(None, description="Filter by province ISO code (e.g., NH)"), simplified: bool = Query(True, description="Return simplified geometries"), limit: int = Query(500, ge=1, le=1000, description="Maximum results") ): """Get municipalities as GeoJSON FeatureCollection""" pool = await get_pool() tolerance = 0.0005 if simplified else 0 query = f""" SELECT m.id, m.municipality_code, m.name, p.iso_code as province_iso, p.name as province_name, ST_AsGeoJSON( {'ST_Simplify(m.geom, ' + str(tolerance) + ')' if simplified else 'm.geom'} )::json as geometry, ST_X(m.centroid) as centroid_lon, ST_Y(m.centroid) as centroid_lat, m.area_km2 FROM municipalities m LEFT JOIN provinces p ON m.province_id = p.id {'WHERE p.iso_code = $1' if province else ''} ORDER BY m.name LIMIT {'$2' if province else '$1'} """ async with pool.acquire() as conn: if province: rows = await conn.fetch(query, province.upper(), limit) else: rows = await conn.fetch(query, limit) features = [] for row in rows: features.append({ "type": "Feature", "id": row['municipality_code'], "geometry": row['geometry'], "properties": { "id": row['id'], "code": row['municipality_code'], "name": row['name'], "province_iso": row['province_iso'], "province_name": row['province_name'], "centroid_lon": float(row['centroid_lon']) if row['centroid_lon'] else None, "centroid_lat": float(row['centroid_lat']) if row['centroid_lat'] else None, "area_km2": float(row['area_km2']) if row['area_km2'] else None, } }) return { "type": "FeatureCollection", "features": features } @app.get("/institutions") async def get_institutions( bbox: Optional[str] = Query(None, description="Bounding box: minLon,minLat,maxLon,maxLat"), province: Optional[str] = Query(None, description="Filter by province ISO code (e.g., NH, ZH)"), country: Optional[str] = Query(None, description="Filter by country code (e.g., NL, DE, JP)"), type: Optional[str] = Query(None, description="Filter by institution type (G,L,A,M,O,R,C,U,B,E,S,F,I,X,P,H,D,N,T)"), limit: int = Query(50000, ge=1, le=100000, description="Maximum results") ): """Get institutions as GeoJSON FeatureCollection with full metadata from custodians table""" pool = await get_pool() # Build WHERE clauses - query custodians table directly conditions = ["lat IS NOT NULL AND lon IS NOT NULL"] params = [] param_count = 0 if bbox: try: min_lon, min_lat, max_lon, max_lat = map(float, bbox.split(',')) param_count += 4 conditions.append(f""" lon >= ${param_count-3} AND lat >= ${param_count-2} AND lon <= ${param_count-1} AND lat <= ${param_count} """) params.extend([min_lon, min_lat, max_lon, max_lat]) except ValueError: raise HTTPException(status_code=400, detail="Invalid bbox format. Use: minLon,minLat,maxLon,maxLat") if province: param_count += 1 conditions.append(f"region_code = ${param_count}") params.append(province.upper()) if type: param_count += 1 conditions.append(f"type = ${param_count}") params.append(type.upper()) if country: param_count += 1 conditions.append(f"country_code = ${param_count}") params.append(country.upper()) param_count += 1 where_clause = " AND ".join(conditions) # Query custodians table with all rich metadata fields query = f""" SELECT ghcid, name, emic_name, type, type_name, lon, lat, city, region as province, region_code as province_iso, country_code, formatted_address, street_address, postal_code, rating, total_ratings, wikidata_id, website, phone, email, isil_code, google_place_id, description, opening_hours, reviews, photos, photo_urls, business_status, street_view_url, founding_year, dissolution_year, temporal_extent, museum_register, youtube_channel_url, youtube_subscriber_count, youtube_video_count, youtube_enrichment, social_facebook, social_twitter, social_instagram, wikidata_label_en, wikidata_description_en, logo_url, web_claims FROM custodians WHERE {where_clause} ORDER BY name LIMIT ${param_count} """ params.append(limit) async with pool.acquire() as conn: rows = await conn.fetch(query, *params) features = [] for row in rows: # Build properties with all available metadata props = { "ghcid": row['ghcid'], "name": row['name'], "emic_name": row['emic_name'], "type": row['type'], "type_name": row['type_name'], "city": row['city'], "province": row['province'], "province_iso": row['province_iso'], "country_code": row['country_code'], "formatted_address": row['formatted_address'], "rating": float(row['rating']) if row['rating'] else None, "total_ratings": row['total_ratings'], "wikidata_id": row['wikidata_id'], "website": row['website'], "phone": row['phone'], "email": row['email'], "isil_code": row['isil_code'], "google_place_id": row['google_place_id'], "description": row['description'], "business_status": row['business_status'], "street_view_url": row['street_view_url'], "founding_year": row['founding_year'], "dissolution_year": row['dissolution_year'], } # Add JSONB fields (handle potential None values) if row['opening_hours']: props["opening_hours"] = row['opening_hours'] if row['reviews']: props["reviews"] = row['reviews'] if row['photos']: props["photos"] = row['photos'] if row['photo_urls']: props["photo_urls"] = row['photo_urls'] if row['temporal_extent']: props["temporal_extent"] = row['temporal_extent'] if row['museum_register']: props["museum_register"] = row['museum_register'] if row['youtube_enrichment']: props["youtube_enrichment"] = row['youtube_enrichment'] elif row['youtube_channel_url']: # Build minimal YouTube data if enrichment not present props["youtube"] = { "channel_url": row['youtube_channel_url'], "subscriber_count": row['youtube_subscriber_count'], "video_count": row['youtube_video_count'], } # Social media social = {} if row['social_facebook']: social['facebook'] = row['social_facebook'] if row['social_twitter']: social['twitter'] = row['social_twitter'] if row['social_instagram']: social['instagram'] = row['social_instagram'] if social: props["social_media"] = social # Wikidata labels if row['wikidata_label_en']: props["wikidata_label"] = row['wikidata_label_en'] if row['wikidata_description_en']: props["wikidata_description"] = row['wikidata_description_en'] # Logo URL from web claims extraction if row['logo_url']: props["logo_url"] = row['logo_url'] # Web claims (financial documents, etc.) if row['web_claims']: props["web_claims"] = row['web_claims'] features.append({ "type": "Feature", "geometry": { "type": "Point", "coordinates": [float(row['lon']), float(row['lat'])] }, "properties": props }) return { "type": "FeatureCollection", "features": features, "metadata": { "count": len(features), "limit": limit, "filters": { "bbox": bbox, "province": province, "type": type } } } @app.get("/institution/{ghcid}") async def get_institution(ghcid: str): """Get detailed information for a single institution with full metadata""" pool = await get_pool() async with pool.acquire() as conn: row = await conn.fetchrow(""" SELECT ghcid, name, emic_name, verified_name, type, type_name, lon, lat, city, region as province, region_code as province_iso, country_code, formatted_address, street_address, postal_code, website, phone, email, wikidata_id, isil_code, google_place_id, rating, total_ratings, description, business_status, street_view_url, google_maps_url, opening_hours, reviews, photos, photo_urls, founding_year, founding_date, dissolution_year, dissolution_date, temporal_extent, museum_register, youtube_channel_id, youtube_channel_url, youtube_subscriber_count, youtube_video_count, youtube_view_count, youtube_enrichment, social_facebook, social_twitter, social_instagram, social_linkedin, social_youtube, logo_url, wikidata_label_nl, wikidata_label_en, wikidata_description_nl, wikidata_description_en, wikidata_types, wikidata_inception, wikidata_enrichment, genealogiewerkbalk, nan_isil_enrichment, kb_enrichment, zcbs_enrichment, web_claims, ghcid_uuid, ghcid_numeric, identifiers, data_source, data_tier, provenance FROM custodians WHERE ghcid = $1 """, ghcid) if not row: raise HTTPException(status_code=404, detail=f"Institution '{ghcid}' not found") # Build comprehensive response with all metadata result = { "ghcid": row['ghcid'], "name": row['name'], "emic_name": row['emic_name'], "verified_name": row['verified_name'], "type": row['type'], "type_name": row['type_name'], "lat": float(row['lat']) if row['lat'] else None, "lon": float(row['lon']) if row['lon'] else None, "city": row['city'], "province": row['province'], "province_iso": row['province_iso'], "country_code": row['country_code'], "formatted_address": row['formatted_address'], "street_address": row['street_address'], "postal_code": row['postal_code'], "website": row['website'], "phone": row['phone'], "email": row['email'], "wikidata_id": row['wikidata_id'], "isil_code": row['isil_code'], "google_place_id": row['google_place_id'], "rating": float(row['rating']) if row['rating'] else None, "total_ratings": row['total_ratings'], "description": row['description'], "business_status": row['business_status'], "street_view_url": row['street_view_url'], "google_maps_url": row['google_maps_url'], } # JSONB fields - only include if present if row['opening_hours']: result["opening_hours"] = row['opening_hours'] if row['reviews']: result["reviews"] = row['reviews'] if row['photos']: result["photos"] = row['photos'] if row['photo_urls']: result["photo_urls"] = row['photo_urls'] if row['identifiers']: result["identifiers"] = row['identifiers'] # Temporal data temporal = {} if row['founding_year']: temporal["founding_year"] = row['founding_year'] if row['founding_date']: temporal["founding_date"] = row['founding_date'].isoformat() if row['founding_date'] else None if row['dissolution_year']: temporal["dissolution_year"] = row['dissolution_year'] if row['dissolution_date']: temporal["dissolution_date"] = row['dissolution_date'].isoformat() if row['dissolution_date'] else None if row['temporal_extent']: temporal["extent"] = row['temporal_extent'] if temporal: result["temporal"] = temporal # Museum register if row['museum_register']: result["museum_register"] = row['museum_register'] # YouTube enrichment youtube = {} if row['youtube_channel_id']: youtube["channel_id"] = row['youtube_channel_id'] if row['youtube_channel_url']: youtube["channel_url"] = row['youtube_channel_url'] if row['youtube_subscriber_count']: youtube["subscriber_count"] = row['youtube_subscriber_count'] if row['youtube_video_count']: youtube["video_count"] = row['youtube_video_count'] if row['youtube_view_count']: youtube["view_count"] = row['youtube_view_count'] if row['youtube_enrichment']: youtube["enrichment"] = row['youtube_enrichment'] if youtube: result["youtube"] = youtube # Social media social = {} if row['social_facebook']: social["facebook"] = row['social_facebook'] if row['social_twitter']: social["twitter"] = row['social_twitter'] if row['social_instagram']: social["instagram"] = row['social_instagram'] if row['social_linkedin']: social["linkedin"] = row['social_linkedin'] if row['social_youtube']: social["youtube"] = row['social_youtube'] if social: result["social_media"] = social # Wikidata wikidata = {} if row['wikidata_label_nl']: wikidata["label_nl"] = row['wikidata_label_nl'] if row['wikidata_label_en']: wikidata["label_en"] = row['wikidata_label_en'] if row['wikidata_description_nl']: wikidata["description_nl"] = row['wikidata_description_nl'] if row['wikidata_description_en']: wikidata["description_en"] = row['wikidata_description_en'] if row['wikidata_types']: wikidata["types"] = row['wikidata_types'] if row['wikidata_inception']: wikidata["inception"] = row['wikidata_inception'] if row['wikidata_enrichment']: wikidata["enrichment"] = row['wikidata_enrichment'] if wikidata: result["wikidata"] = wikidata # Logo if row['logo_url']: result["logo_url"] = row['logo_url'] # Other enrichment data if row['genealogiewerkbalk']: result["genealogiewerkbalk"] = row['genealogiewerkbalk'] if row['nan_isil_enrichment']: result["nan_isil_enrichment"] = row['nan_isil_enrichment'] if row['kb_enrichment']: result["kb_enrichment"] = row['kb_enrichment'] if row['zcbs_enrichment']: result["zcbs_enrichment"] = row['zcbs_enrichment'] if row['web_claims']: result["web_claims"] = row['web_claims'] # GHCID details ghcid_data = {"current": row['ghcid']} if row['ghcid_uuid']: ghcid_data["uuid"] = str(row['ghcid_uuid']) if row['ghcid_numeric']: ghcid_data["numeric"] = int(row['ghcid_numeric']) result["ghcid_details"] = ghcid_data # Provenance if row['data_source'] or row['data_tier'] or row['provenance']: result["provenance"] = { "data_source": row['data_source'], "data_tier": row['data_tier'], "details": row['provenance'], } return result @app.get("/search") async def search_institutions( q: str = Query(..., min_length=2, description="Search query (name, GHCID, or description)"), type: Optional[str] = Query(None, description="Filter by institution type"), include_persons: bool = Query(False, description="Also search for associated persons"), limit: int = Query(50, ge=1, le=200, description="Maximum results") ): """ Search institutions by name, GHCID, or description. Supports: - Full-text search on name and description - Exact GHCID matching (e.g., NL-OV-ZWO-D-WIWO) - Partial GHCID matching (e.g., NL-OV or WIWO) - Optionally includes persons associated with matching custodians """ pool = await get_pool() q_upper = q.upper() q_pattern = f"%{q}%" # Check if query looks like a GHCID pattern (contains hyphen and letter-hyphen pattern) # Accept both uppercase and lowercase input (e.g., "nl-nh-ams" or "NL-NH-AMS") is_ghcid_query = '-' in q and any(c.isalpha() for c in q) async with pool.acquire() as conn: rows = [] # Priority 1: Exact GHCID match if is_ghcid_query: exact_query = """ SELECT c.ghcid as ghcid, c.name, c.type, c.type_name, c.lon, c.lat, c.city, c.region_code as province_iso, c.rating, 1000 as rank FROM custodians c WHERE c.ghcid = $1 """ rows = await conn.fetch(exact_query, q_upper) # Priority 2: Partial GHCID match (if no exact match) if not rows and is_ghcid_query: partial_ghcid_query = """ SELECT c.ghcid as ghcid, c.name, c.type, c.type_name, c.lon, c.lat, c.city, c.region_code as province_iso, c.rating, 500 as rank FROM custodians c WHERE c.ghcid ILIKE $1 """ params = [f"%{q_upper}%"] if type: partial_ghcid_query += " AND c.type = $2" params.append(type.upper()) partial_ghcid_query += f" ORDER BY c.ghcid LIMIT ${len(params) + 1}" params.append(limit) rows = await conn.fetch(partial_ghcid_query, *params) # Priority 3: Name/description text search if not rows: # Try ILIKE search on name, description, city, and emic_name text_query = """ SELECT c.ghcid as ghcid, c.name, c.type, c.type_name, c.lon, c.lat, c.city, c.region_code as province_iso, c.rating, CASE WHEN c.name ILIKE $2 THEN 100 WHEN c.name ILIKE $1 THEN 50 WHEN c.emic_name ILIKE $1 THEN 40 WHEN c.city ILIKE $1 THEN 30 ELSE 10 END as rank FROM custodians c WHERE c.name ILIKE $1 OR c.emic_name ILIKE $1 OR c.description ILIKE $1 OR c.city ILIKE $1 """ params = [q_pattern, f"{q}%"] param_count = 2 if type: param_count += 1 text_query += f" AND c.type = ${param_count}" params.append(type.upper()) param_count += 1 text_query += f" ORDER BY rank DESC, c.name LIMIT ${param_count}" params.append(limit) rows = await conn.fetch(text_query, *params) # Build institution results with match type indicator def get_match_type(rank: int) -> str: if rank >= 1000: return "exact_ghcid" elif rank >= 500: return "partial_ghcid" else: return "text" institutions = [ { "ghcid": row['ghcid'], "name": row['name'], "type": row['type'], "type_name": row['type_name'], "lon": float(row['lon']) if row['lon'] else None, "lat": float(row['lat']) if row['lat'] else None, "city": row['city'], "province_iso": row['province_iso'], "rating": float(row['rating']) if row['rating'] else None, "match_type": get_match_type(row['rank']), } for row in rows ] # Optionally search for persons persons = [] if include_persons and institutions: # Get custodian slugs from results to find associated persons ghcids = [inst['ghcid'] for inst in institutions if inst['ghcid']] if ghcids: # Search persons by custodian association or by name person_query = """ SELECT DISTINCT ON (p.staff_id) p.staff_id, p.name, p.headline, p.location, p.country_code, p.custodian_slug, p.custodian_name, p.linkedin_url, p.profile_image_url, p.heritage_relevant, p.heritage_types FROM persons p WHERE p.name ILIKE $1 OR p.headline ILIKE $1 OR p.custodian_name ILIKE $1 ORDER BY p.staff_id, p.name LIMIT $2 """ person_rows = await conn.fetch(person_query, q_pattern, limit) persons = [ { "staff_id": row['staff_id'], "name": row['name'], "headline": row['headline'], "location": row['location'], "country_code": row['country_code'], "custodian_slug": row['custodian_slug'], "custodian_name": row['custodian_name'], "linkedin_url": row['linkedin_url'], "profile_image_url": row['profile_image_url'], "heritage_relevant": row['heritage_relevant'], "heritage_types": row['heritage_types'] or [], } for row in person_rows ] result = { "query": q, "count": len(institutions), "results": institutions, } if include_persons: result["persons"] = persons result["persons_count"] = len(persons) return result @app.get("/nearby", response_model=List[NearbyInstitution]) async def find_nearby( lon: float = Query(..., description="Longitude"), lat: float = Query(..., description="Latitude"), radius_km: float = Query(10, ge=0.1, le=100, description="Search radius in km"), type: Optional[str] = Query(None, description="Filter by institution type"), limit: int = Query(50, ge=1, le=200, description="Maximum results") ): """Find institutions near a point""" pool = await get_pool() # Use custodians table with lat/lon columns (no PostGIS geometry) query = """ SELECT c.ghcid as ghcid, c.name, c.type, c.type_name, ( 6371 * acos( cos(radians($2)) * cos(radians(c.lat)) * cos(radians(c.lon) - radians($1)) + sin(radians($2)) * sin(radians(c.lat)) ) ) as distance_km, c.city, c.region as province, c.rating FROM custodians c WHERE c.lat IS NOT NULL AND c.lon IS NOT NULL AND ( 6371 * acos( cos(radians($2)) * cos(radians(c.lat)) * cos(radians(c.lon) - radians($1)) + sin(radians($2)) * sin(radians(c.lat)) ) ) <= $3 """ params: list = [lon, lat, radius_km] param_count = 3 if type: param_count += 1 query += f" AND c.type = ${param_count}" params.append(type.upper()) param_count += 1 query += f" ORDER BY distance_km LIMIT ${param_count}" params.append(limit) async with pool.acquire() as conn: rows = await conn.fetch(query, *params) return [ NearbyInstitution( ghcid=row['ghcid'], name=row['name'], type=row['type'], type_name=row['type_name'], distance_km=round(float(row['distance_km']), 2), city=row['city'], province=row['province'], rating=float(row['rating']) if row['rating'] else None, ) for row in rows ] @app.get("/admin/point", response_model=AdminPoint) async def get_admin_for_point( lon: float = Query(..., description="Longitude"), lat: float = Query(..., description="Latitude") ): """Find which municipality/province contains a point""" pool = await get_pool() async with pool.acquire() as conn: row = await conn.fetchrow(""" SELECT p.province_code, p.name as province_name, m.municipality_code, m.name as municipality_name FROM municipalities m JOIN provinces p ON m.province_id = p.id WHERE ST_Contains(m.geom, ST_SetSRID(ST_Point($1, $2), 4326)) LIMIT 1 """, lon, lat) if not row: # Try province only async with pool.acquire() as conn: row = await conn.fetchrow(""" SELECT province_code, name as province_name, NULL as municipality_code, NULL as municipality_name FROM provinces WHERE ST_Contains(geom, ST_SetSRID(ST_Point($1, $2), 4326)) LIMIT 1 """, lon, lat) if not row: return AdminPoint( province_code=None, province_name=None, municipality_code=None, municipality_name=None ) return AdminPoint( province_code=row['province_code'], province_name=row['province_name'], municipality_code=row['municipality_code'], municipality_name=row['municipality_name'] ) @app.get("/historical") async def get_historical_boundaries( year: int = Query(1500, description="Reference year"), boundary_type: Optional[str] = Query(None, description="Boundary type filter"), simplified: bool = Query(True, description="Return simplified geometries"), limit: int = Query(1000, ge=1, le=10000, description="Maximum results") ): """Get historical boundaries as GeoJSON""" pool = await get_pool() tolerance = 0.001 if simplified else 0 conditions = ["reference_year = $1"] params = [year] param_count = 1 if boundary_type: param_count += 1 conditions.append(f"boundary_type = ${param_count}") params.append(boundary_type) param_count += 1 where_clause = " AND ".join(conditions) query = f""" SELECT id, boundary_code, name, boundary_type, reference_year, ST_AsGeoJSON( {'ST_Simplify(geom, ' + str(tolerance) + ')' if simplified else 'geom'} )::json as geometry, ST_X(centroid) as centroid_lon, ST_Y(centroid) as centroid_lat, area_km2 FROM historical_boundaries WHERE {where_clause} ORDER BY name LIMIT ${param_count} """ params.append(limit) async with pool.acquire() as conn: rows = await conn.fetch(query, *params) features = [] for row in rows: if row['geometry']: features.append({ "type": "Feature", "id": row['boundary_code'], "geometry": row['geometry'], "properties": { "id": row['id'], "code": row['boundary_code'], "name": row['name'], "type": row['boundary_type'], "year": row['reference_year'], "centroid_lon": float(row['centroid_lon']) if row['centroid_lon'] else None, "centroid_lat": float(row['centroid_lat']) if row['centroid_lat'] else None, "area_km2": float(row['area_km2']) if row['area_km2'] else None, } }) return { "type": "FeatureCollection", "features": features, "metadata": { "year": year, "boundary_type": boundary_type, "count": len(features) } } @app.get("/stats/by-type") async def get_stats_by_type(): """Get institution counts by type""" pool = await get_pool() async with pool.acquire() as conn: rows = await conn.fetch(""" SELECT institution_type as type, type_name, COUNT(*) as count, ROUND(AVG(rating)::numeric, 2) as avg_rating FROM institutions WHERE geom IS NOT NULL GROUP BY institution_type, type_name ORDER BY count DESC """) return { "stats": [ { "type": row['type'], "type_name": row['type_name'], "count": row['count'], "avg_rating": float(row['avg_rating']) if row['avg_rating'] else None } for row in rows ] } @app.get("/stats/by-province") async def get_stats_by_province(): """Get institution counts by province""" pool = await get_pool() async with pool.acquire() as conn: rows = await conn.fetch(""" SELECT p.iso_code, p.name as province_name, COUNT(i.id) as count, ROUND(AVG(i.rating)::numeric, 2) as avg_rating FROM provinces p LEFT JOIN institutions i ON i.province_id = p.id GROUP BY p.id, p.iso_code, p.name ORDER BY count DESC """) return { "stats": [ { "province_iso": row['iso_code'], "province_name": row['province_name'], "count": row['count'], "avg_rating": float(row['avg_rating']) if row['avg_rating'] else None } for row in rows ] } # ============================================================================ # Optimized Loading Endpoints (Pagination, Viewport, Lite) # ============================================================================ @app.get("/institutions/lite") async def get_institutions_lite( bbox: Optional[str] = Query(None, description="Bounding box: minLon,minLat,maxLon,maxLat"), country: Optional[str] = Query(None, description="Filter by country code (e.g., NL, DE, JP)"), type: Optional[str] = Query(None, description="Filter by institution type"), limit: int = Query(100000, ge=1, le=200000, description="Maximum results") ): """ Get lightweight institution data for map markers. Returns only essential fields (~5-10MB instead of ~126MB). Use /institution/{ghcid} for full details on click. """ pool = await get_pool() conditions = ["lat IS NOT NULL AND lon IS NOT NULL"] params = [] param_count = 0 if bbox: try: min_lon, min_lat, max_lon, max_lat = map(float, bbox.split(',')) param_count += 4 conditions.append(f""" lon >= ${param_count-3} AND lat >= ${param_count-2} AND lon <= ${param_count-1} AND lat <= ${param_count} """) params.extend([min_lon, min_lat, max_lon, max_lat]) except ValueError: raise HTTPException(status_code=400, detail="Invalid bbox format") if country: param_count += 1 conditions.append(f"country_code = ${param_count}") params.append(country.upper()) if type: param_count += 1 conditions.append(f"type = ${param_count}") params.append(type.upper()) param_count += 1 where_clause = " AND ".join(conditions) # Minimal fields for markers - dramatically reduces payload query = f""" SELECT ghcid, name, type, lon, lat, city, country_code, rating FROM custodians WHERE {where_clause} ORDER BY name LIMIT ${param_count} """ params.append(limit) async with pool.acquire() as conn: rows = await conn.fetch(query, *params) features = [] for row in rows: features.append({ "type": "Feature", "geometry": { "type": "Point", "coordinates": [float(row['lon']), float(row['lat'])] }, "properties": { "ghcid": row['ghcid'], "name": row['name'], "type": row['type'], "city": row['city'], "country_code": row['country_code'], "rating": float(row['rating']) if row['rating'] else None, } }) return { "type": "FeatureCollection", "features": features, "metadata": { "count": len(features), "mode": "lite", "filters": {"bbox": bbox, "country": country, "type": type} } } @app.get("/institutions/page") async def get_institutions_paginated( page: int = Query(1, ge=1, description="Page number (1-indexed)"), page_size: int = Query(1000, ge=100, le=5000, description="Items per page"), country: Optional[str] = Query(None, description="Filter by country code"), type: Optional[str] = Query(None, description="Filter by institution type"), sort_by: str = Query("name", description="Sort field: name, city, rating, type"), sort_order: str = Query("asc", description="Sort order: asc, desc"), ): """ Get paginated institutions with full metadata. ~1MB per page instead of ~126MB full download. Supports cursor-based iteration for large datasets. """ pool = await get_pool() # Validate sort parameters valid_sort_fields = {"name", "city", "rating", "type", "country_code"} if sort_by not in valid_sort_fields: sort_by = "name" sort_direction = "DESC" if sort_order.lower() == "desc" else "ASC" conditions = ["lat IS NOT NULL AND lon IS NOT NULL"] params = [] param_count = 0 if country: param_count += 1 conditions.append(f"country_code = ${param_count}") params.append(country.upper()) if type: param_count += 1 conditions.append(f"type = ${param_count}") params.append(type.upper()) where_clause = " AND ".join(conditions) # Get total count for pagination metadata count_query = f"SELECT COUNT(*) FROM custodians WHERE {where_clause}" # Calculate offset offset = (page - 1) * page_size param_count += 1 limit_param = param_count param_count += 1 offset_param = param_count # Full metadata query with pagination query = f""" SELECT ghcid, name, emic_name, type, type_name, lon, lat, city, region as province, region_code as province_iso, country_code, formatted_address, street_address, postal_code, rating, total_ratings, wikidata_id, website, phone, email, isil_code, google_place_id, description, opening_hours, reviews, photos, photo_urls, business_status, street_view_url, founding_year, dissolution_year, temporal_extent, museum_register, youtube_channel_url, youtube_subscriber_count, youtube_video_count, youtube_enrichment, social_facebook, social_twitter, social_instagram, wikidata_label_en, wikidata_description_en FROM custodians WHERE {where_clause} ORDER BY {sort_by} {sort_direction} LIMIT ${limit_param} OFFSET ${offset_param} """ params.extend([page_size, offset]) async with pool.acquire() as conn: total_count = await conn.fetchval(count_query, *params[:param_count-2] if params else []) rows = await conn.fetch(query, *params) total_pages = (total_count + page_size - 1) // page_size features = [] for row in rows: props = { "ghcid": row['ghcid'], "name": row['name'], "emic_name": row['emic_name'], "type": row['type'], "type_name": row['type_name'], "city": row['city'], "province": row['province'], "province_iso": row['province_iso'], "country_code": row['country_code'], "formatted_address": row['formatted_address'], "rating": float(row['rating']) if row['rating'] else None, "total_ratings": row['total_ratings'], "wikidata_id": row['wikidata_id'], "website": row['website'], "phone": row['phone'], "email": row['email'], "isil_code": row['isil_code'], "google_place_id": row['google_place_id'], "description": row['description'], "business_status": row['business_status'], "street_view_url": row['street_view_url'], "founding_year": row['founding_year'], "dissolution_year": row['dissolution_year'], } # Add JSONB fields if present if row['opening_hours']: props["opening_hours"] = row['opening_hours'] if row['reviews']: props["reviews"] = row['reviews'] if row['photos']: props["photos"] = row['photos'] if row['photo_urls']: props["photo_urls"] = row['photo_urls'] if row['temporal_extent']: props["temporal_extent"] = row['temporal_extent'] if row['museum_register']: props["museum_register"] = row['museum_register'] if row['youtube_enrichment']: props["youtube_enrichment"] = row['youtube_enrichment'] elif row['youtube_channel_url']: props["youtube"] = { "channel_url": row['youtube_channel_url'], "subscriber_count": row['youtube_subscriber_count'], "video_count": row['youtube_video_count'], } social = {} if row['social_facebook']: social['facebook'] = row['social_facebook'] if row['social_twitter']: social['twitter'] = row['social_twitter'] if row['social_instagram']: social['instagram'] = row['social_instagram'] if social: props["social_media"] = social if row['wikidata_label_en']: props["wikidata_label"] = row['wikidata_label_en'] if row['wikidata_description_en']: props["wikidata_description"] = row['wikidata_description_en'] features.append({ "type": "Feature", "geometry": { "type": "Point", "coordinates": [float(row['lon']), float(row['lat'])] }, "properties": props }) return { "type": "FeatureCollection", "features": features, "pagination": { "page": page, "page_size": page_size, "total_count": total_count, "total_pages": total_pages, "has_next": page < total_pages, "has_prev": page > 1, "next_page": page + 1 if page < total_pages else None, "prev_page": page - 1 if page > 1 else None, }, "metadata": { "mode": "paginated", "filters": {"country": country, "type": type}, "sort": {"field": sort_by, "order": sort_order} } } @app.get("/institutions/viewport") async def get_institutions_viewport( bbox: str = Query(..., description="Bounding box: minLon,minLat,maxLon,maxLat (REQUIRED)"), zoom: int = Query(10, ge=1, le=20, description="Map zoom level (affects detail)"), country: Optional[str] = Query(None, description="Filter by country code"), type: Optional[str] = Query(None, description="Filter by institution type"), limit: int = Query(2000, ge=100, le=10000, description="Maximum results for viewport"), ): """ Get institutions visible in current map viewport. Returns lite data at low zoom, full data at high zoom. Optimized for map pan/zoom interactions. """ pool = await get_pool() # Parse bbox try: min_lon, min_lat, max_lon, max_lat = map(float, bbox.split(',')) except ValueError: raise HTTPException(status_code=400, detail="Invalid bbox format. Use: minLon,minLat,maxLon,maxLat") conditions = [ "lat IS NOT NULL AND lon IS NOT NULL", f"lon >= $1 AND lat >= $2 AND lon <= $3 AND lat <= $4" ] params = [min_lon, min_lat, max_lon, max_lat] param_count = 4 if country: param_count += 1 conditions.append(f"country_code = ${param_count}") params.append(country.upper()) if type: param_count += 1 conditions.append(f"type = ${param_count}") params.append(type.upper()) where_clause = " AND ".join(conditions) # Adaptive detail based on zoom level # Low zoom (world/continent view): minimal fields # Medium zoom (country view): basic fields # High zoom (city view): full fields if zoom <= 6: # World/continent view - just markers select_fields = "ghcid, name, type, lon, lat, country_code" detail_level = "minimal" elif zoom <= 10: # Country view - basic info select_fields = "ghcid, name, type, type_name, lon, lat, city, country_code, rating" detail_level = "basic" elif zoom <= 14: # Region view - moderate detail select_fields = """ ghcid, name, emic_name, type, type_name, lon, lat, city, region as province, country_code, rating, total_ratings, website, wikidata_id, description """ detail_level = "moderate" else: # City/street view - full detail select_fields = """ ghcid, name, emic_name, type, type_name, lon, lat, city, region as province, region_code as province_iso, country_code, formatted_address, street_address, postal_code, rating, total_ratings, wikidata_id, website, phone, email, isil_code, google_place_id, description, opening_hours, reviews, photos, photo_urls, business_status, street_view_url, founding_year, dissolution_year, social_facebook, social_twitter, social_instagram, wikidata_label_en, wikidata_description_en """ detail_level = "full" param_count += 1 query = f""" SELECT {select_fields} FROM custodians WHERE {where_clause} ORDER BY rating DESC NULLS LAST, name LIMIT ${param_count} """ params.append(limit) async with pool.acquire() as conn: rows = await conn.fetch(query, *params) features = [] for row in rows: row_dict = dict(row) props = {"ghcid": row_dict['ghcid'], "name": row_dict['name'], "type": row_dict['type']} # Add fields based on detail level if 'type_name' in row_dict: props['type_name'] = row_dict['type_name'] if 'city' in row_dict: props['city'] = row_dict['city'] if 'country_code' in row_dict: props['country_code'] = row_dict['country_code'] if 'rating' in row_dict and row_dict['rating']: props['rating'] = float(row_dict['rating']) if 'total_ratings' in row_dict: props['total_ratings'] = row_dict['total_ratings'] if 'province' in row_dict: props['province'] = row_dict['province'] if 'province_iso' in row_dict: props['province_iso'] = row_dict['province_iso'] if 'emic_name' in row_dict: props['emic_name'] = row_dict['emic_name'] if 'website' in row_dict: props['website'] = row_dict['website'] if 'wikidata_id' in row_dict: props['wikidata_id'] = row_dict['wikidata_id'] if 'description' in row_dict: props['description'] = row_dict['description'] if 'formatted_address' in row_dict: props['formatted_address'] = row_dict['formatted_address'] if 'street_address' in row_dict: props['street_address'] = row_dict['street_address'] if 'postal_code' in row_dict: props['postal_code'] = row_dict['postal_code'] if 'phone' in row_dict: props['phone'] = row_dict['phone'] if 'email' in row_dict: props['email'] = row_dict['email'] if 'isil_code' in row_dict: props['isil_code'] = row_dict['isil_code'] if 'google_place_id' in row_dict: props['google_place_id'] = row_dict['google_place_id'] if 'business_status' in row_dict: props['business_status'] = row_dict['business_status'] if 'street_view_url' in row_dict: props['street_view_url'] = row_dict['street_view_url'] if 'founding_year' in row_dict: props['founding_year'] = row_dict['founding_year'] if 'dissolution_year' in row_dict: props['dissolution_year'] = row_dict['dissolution_year'] # JSONB fields at full detail if 'opening_hours' in row_dict and row_dict['opening_hours']: props['opening_hours'] = row_dict['opening_hours'] if 'reviews' in row_dict and row_dict['reviews']: props['reviews'] = row_dict['reviews'] if 'photos' in row_dict and row_dict['photos']: props['photos'] = row_dict['photos'] if 'photo_urls' in row_dict and row_dict['photo_urls']: props['photo_urls'] = row_dict['photo_urls'] # Social media at full detail social = {} if 'social_facebook' in row_dict and row_dict['social_facebook']: social['facebook'] = row_dict['social_facebook'] if 'social_twitter' in row_dict and row_dict['social_twitter']: social['twitter'] = row_dict['social_twitter'] if 'social_instagram' in row_dict and row_dict['social_instagram']: social['instagram'] = row_dict['social_instagram'] if social: props['social_media'] = social if 'wikidata_label_en' in row_dict and row_dict['wikidata_label_en']: props['wikidata_label'] = row_dict['wikidata_label_en'] if 'wikidata_description_en' in row_dict and row_dict['wikidata_description_en']: props['wikidata_description'] = row_dict['wikidata_description_en'] features.append({ "type": "Feature", "geometry": { "type": "Point", "coordinates": [float(row_dict['lon']), float(row_dict['lat'])] }, "properties": props }) return { "type": "FeatureCollection", "features": features, "metadata": { "count": len(features), "mode": "viewport", "detail_level": detail_level, "zoom": zoom, "bbox": {"min_lon": min_lon, "min_lat": min_lat, "max_lon": max_lon, "max_lat": max_lat}, "filters": {"country": country, "type": type}, "limit": limit, "truncated": len(features) >= limit } } @app.get("/institutions/cluster") async def get_institutions_clustered( bbox: str = Query(..., description="Bounding box: minLon,minLat,maxLon,maxLat"), zoom: int = Query(5, ge=1, le=20, description="Map zoom level"), grid_size: Optional[float] = Query(None, description="Grid cell size in degrees (auto if not set)"), ): """ Get clustered institution counts for overview maps. Returns grid cells with counts instead of individual markers. Ideal for world/country views with 10k+ institutions. """ pool = await get_pool() try: min_lon, min_lat, max_lon, max_lat = map(float, bbox.split(',')) except ValueError: raise HTTPException(status_code=400, detail="Invalid bbox format") # Auto-calculate grid size based on zoom # Higher zoom = smaller grid cells = more detail if grid_size is None: if zoom <= 3: grid_size = 10.0 # ~1000km cells elif zoom <= 5: grid_size = 5.0 # ~500km cells elif zoom <= 7: grid_size = 2.0 # ~200km cells elif zoom <= 9: grid_size = 1.0 # ~100km cells elif zoom <= 11: grid_size = 0.5 # ~50km cells elif zoom <= 13: grid_size = 0.2 # ~20km cells else: grid_size = 0.1 # ~10km cells # Use PostgreSQL to cluster into grid cells query = """ SELECT FLOOR(lon / $5) * $5 + $5/2 as cell_lon, FLOOR(lat / $5) * $5 + $5/2 as cell_lat, COUNT(*) as count, array_agg(DISTINCT type) as types, AVG(rating) as avg_rating FROM custodians WHERE lat IS NOT NULL AND lon IS NOT NULL AND lon >= $1 AND lat >= $2 AND lon <= $3 AND lat <= $4 GROUP BY FLOOR(lon / $5), FLOOR(lat / $5) ORDER BY count DESC """ async with pool.acquire() as conn: rows = await conn.fetch(query, min_lon, min_lat, max_lon, max_lat, grid_size) features = [] total_count = 0 for row in rows: count = row['count'] total_count += count features.append({ "type": "Feature", "geometry": { "type": "Point", "coordinates": [float(row['cell_lon']), float(row['cell_lat'])] }, "properties": { "cluster": True, "count": count, "types": row['types'], "avg_rating": round(float(row['avg_rating']), 2) if row['avg_rating'] else None, } }) return { "type": "FeatureCollection", "features": features, "metadata": { "mode": "clustered", "cluster_count": len(features), "total_institutions": total_count, "grid_size": grid_size, "zoom": zoom, "bbox": {"min_lon": min_lon, "min_lat": min_lat, "max_lon": max_lon, "max_lat": max_lat} } } # ============================================================================ # Person Endpoints (Beschermers) # ============================================================================ @app.get("/persons", response_model=Dict[str, Any]) async def list_persons( custodian_slug: Optional[str] = Query(None, description="Filter by custodian slug"), heritage_type: Optional[str] = Query(None, description="Filter by heritage type (A, L, M, etc.)"), country_code: Optional[str] = Query(None, description="Filter by country code"), heritage_relevant: Optional[bool] = Query(None, description="Filter by heritage relevance (true/false)"), limit: int = Query(50, ge=1, le=500, description="Max results to return"), offset: int = Query(0, ge=0, description="Offset for pagination"), ): """ List persons with optional filters. Returns paginated list of heritage professionals (beschermers). """ pool = await get_pool() # Build query with optional filters conditions = [] params = [] param_idx = 1 if custodian_slug: conditions.append(f"custodian_slug = ${param_idx}") params.append(custodian_slug) param_idx += 1 if heritage_type: conditions.append(f"${param_idx} = ANY(heritage_types)") params.append(heritage_type) param_idx += 1 if country_code: conditions.append(f"country_code = ${param_idx}") params.append(country_code) param_idx += 1 if heritage_relevant is not None: conditions.append(f"heritage_relevant = ${param_idx}") params.append(heritage_relevant) param_idx += 1 where_clause = f"WHERE {' AND '.join(conditions)}" if conditions else "" # Add pagination params params.extend([limit, offset]) query = f""" SELECT staff_id, name, headline, location, country_code, custodian_slug, custodian_name, linkedin_url, profile_image_url, heritage_relevant, heritage_types FROM persons {where_clause} ORDER BY name LIMIT ${param_idx} OFFSET ${param_idx + 1} """ # Count query count_query = f"SELECT COUNT(*) FROM persons {where_clause}" async with pool.acquire() as conn: rows = await conn.fetch(query, *params) count_params = params[:-2] if params else [] # Remove limit/offset for count total = await conn.fetchval(count_query, *count_params) if count_params else await conn.fetchval(count_query) persons = [] for row in rows: persons.append(PersonSummary( staff_id=row['staff_id'], name=row['name'], headline=row['headline'], location=row['location'], country_code=row['country_code'], custodian_slug=row['custodian_slug'], custodian_name=row['custodian_name'], linkedin_url=row['linkedin_url'], profile_image_url=row['profile_image_url'], heritage_relevant=row['heritage_relevant'] if row['heritage_relevant'] is not None else True, heritage_types=row['heritage_types'] if row['heritage_types'] else [], )) return { "persons": [p.model_dump() for p in persons], "total": total, "limit": limit, "offset": offset, "has_more": offset + len(persons) < total, } @app.get("/persons/count") async def get_persons_count(): """Get total person count for stats display.""" pool = await get_pool() async with pool.acquire() as conn: total = await conn.fetchval("SELECT COUNT(*) FROM persons") heritage_relevant = await conn.fetchval("SELECT COUNT(*) FROM persons WHERE heritage_relevant = true") return { "total": total, "heritage_relevant": heritage_relevant, } @app.get("/persons/search", response_model=Dict[str, Any]) async def search_persons( q: str = Query(..., min_length=2, description="Search query"), limit: int = Query(20, ge=1, le=100, description="Max results"), ): """ Search persons by name, headline, or custodian name. Uses PostgreSQL full-text search. """ pool = await get_pool() # Use ILIKE for simple search (full-text search can be added later if index exists) search_pattern = f"%{q}%" query = """ SELECT staff_id, name, headline, location, country_code, custodian_slug, custodian_name, linkedin_url, profile_image_url, heritage_relevant, heritage_types FROM persons WHERE name ILIKE $1 OR headline ILIKE $1 OR custodian_name ILIKE $1 ORDER BY CASE WHEN name ILIKE $2 THEN 0 ELSE 1 END, name LIMIT $3 """ async with pool.acquire() as conn: rows = await conn.fetch(query, search_pattern, f"{q}%", limit) persons = [] for row in rows: persons.append(PersonSummary( staff_id=row['staff_id'], name=row['name'], headline=row['headline'], location=row['location'], country_code=row['country_code'], custodian_slug=row['custodian_slug'], custodian_name=row['custodian_name'], linkedin_url=row['linkedin_url'], profile_image_url=row['profile_image_url'], heritage_relevant=row['heritage_relevant'] if row['heritage_relevant'] is not None else True, heritage_types=row['heritage_types'] if row['heritage_types'] else [], )) return { "persons": [p.model_dump() for p in persons], "count": len(persons), "query": q, } @app.get("/person/{staff_id}", response_model=PersonDetail) async def get_person(staff_id: str): """Get detailed information for a single person.""" pool = await get_pool() query = """ SELECT staff_id, name, headline, location, country_code, custodian_slug, custodian_name, linkedin_url, profile_image_url, heritage_relevant, heritage_types, experience, education, skills, languages, about, connections, extraction_date, extraction_method, source_file FROM persons WHERE staff_id = $1 """ async with pool.acquire() as conn: row = await conn.fetchrow(query, staff_id) if not row: raise HTTPException(status_code=404, detail=f"Person not found: {staff_id}") return PersonDetail( staff_id=row['staff_id'], name=row['name'], headline=row['headline'], location=row['location'], country_code=row['country_code'], custodian_slug=row['custodian_slug'], custodian_name=row['custodian_name'], linkedin_url=row['linkedin_url'], profile_image_url=row['profile_image_url'], heritage_relevant=row['heritage_relevant'] if row['heritage_relevant'] is not None else True, heritage_types=parse_jsonb_list(row['heritage_types']), experience=enrich_experience_with_heritage(parse_jsonb_list(row['experience'])), education=parse_jsonb_list(row['education']), skills=parse_jsonb_list(row['skills']), languages=parse_jsonb_list(row['languages']), about=row['about'], connections=row['connections'], extraction_date=row['extraction_date'].isoformat() if row['extraction_date'] else None, extraction_method=row['extraction_method'], source_file=row['source_file'], ) # ============================================================================ # Image Proxy (Avoid Hotlinking Issues) # ============================================================================ # In-memory cache for proxied images (simple TTL-based) _image_cache: Dict[str, tuple] = {} # hash -> (content, content_type, timestamp) IMAGE_CACHE_TTL = 3600 # 1 hour # Allowed image domains for security ALLOWED_IMAGE_DOMAINS = { # Google Maps 'lh3.googleusercontent.com', 'lh4.googleusercontent.com', 'lh5.googleusercontent.com', 'lh6.googleusercontent.com', 'maps.gstatic.com', 'maps.googleapis.com', # Wikidata/Wikimedia 'upload.wikimedia.org', 'commons.wikimedia.org', # Institution domains (add as needed) # Generic patterns handled below } def is_allowed_image_url(url: str) -> bool: """Check if URL is from an allowed domain for proxying.""" try: parsed = urlparse(url) domain = parsed.netloc.lower() # Check exact matches if domain in ALLOWED_IMAGE_DOMAINS: return True # Allow any .nl domain (Dutch institutions) if domain.endswith('.nl'): return True # Allow any .org domain (many heritage institutions) if domain.endswith('.org'): return True # Allow any .museum domain if domain.endswith('.museum'): return True # Check for Google user content subdomains if 'googleusercontent.com' in domain: return True return False except Exception: return False @app.get("/image-proxy") async def proxy_image(url: str = Query(..., description="Image URL to proxy")): """ Proxy external images to avoid hotlinking issues. Many external servers block direct embedding (hotlinking) of their images. This endpoint fetches the image server-side and returns it with proper headers. Features: - Validates URL is from allowed domains (security) - Caches images in memory for 1 hour (performance) - Sets proper Content-Type headers - Avoids CORS issues Usage: /image-proxy?url=https://example.com/logo.png """ # Security: validate URL if not url or not url.startswith(('http://', 'https://')): raise HTTPException(status_code=400, detail="Invalid URL") if not is_allowed_image_url(url): raise HTTPException(status_code=403, detail="Domain not allowed for proxying") # Check cache url_hash = hashlib.md5(url.encode()).hexdigest() if url_hash in _image_cache: content, content_type, timestamp = _image_cache[url_hash] if datetime.now().timestamp() - timestamp < IMAGE_CACHE_TTL: return Response( content=content, media_type=content_type, headers={ "Cache-Control": "public, max-age=3600", "X-Proxy-Cache": "HIT", } ) # Fetch image try: async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client: response = await client.get( url, headers={ # Spoof headers to avoid hotlink detection "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.9,nl;q=0.8", "Referer": urlparse(url).scheme + "://" + urlparse(url).netloc + "/", } ) if response.status_code != 200: raise HTTPException(status_code=502, detail=f"Failed to fetch image: {response.status_code}") content = response.content content_type = response.headers.get("content-type", "image/png") # Validate it's actually an image if not content_type.startswith("image/"): raise HTTPException(status_code=400, detail="URL does not point to an image") # Cache the result _image_cache[url_hash] = (content, content_type, datetime.now().timestamp()) # Limit cache size (simple LRU-like cleanup) if len(_image_cache) > 1000: # Remove oldest entries sorted_entries = sorted(_image_cache.items(), key=lambda x: x[1][2]) for key, _ in sorted_entries[:500]: del _image_cache[key] return Response( content=content, media_type=content_type, headers={ "Cache-Control": "public, max-age=3600", "X-Proxy-Cache": "MISS", } ) except httpx.TimeoutException: raise HTTPException(status_code=504, detail="Timeout fetching image") except httpx.RequestError as e: raise HTTPException(status_code=502, detail=f"Error fetching image: {str(e)}") # ============================================================================ # Main # ============================================================================ if __name__ == "__main__": import uvicorn uvicorn.run( "geo_api:app", host=settings.api_host, port=settings.api_port, reload=True, )