#!/usr/bin/env python3
"""
Webhook receiver for Forgejo push events.

Triggers schema sync when push events are received on the master branch.

Run with:
    uvicorn deploy-webhook:app --port 8099 --host 127.0.0.1
Or as systemd service: deploy-webhook.service
"""

import asyncio
import hashlib
import hmac
import json
import logging
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = FastAPI(title="GLAM Deploy Webhook", version="1.1.0")

# Configuration
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "")
REPO_PATH = Path("/var/lib/glam/repo")
FRONTEND_PATH = Path("/var/www/glam-frontend")
LINKML_SOURCE = REPO_PATH / "schemas/20251121/linkml"
LINKML_DEST = FRONTEND_PATH / "schemas/20251121/linkml"

# Lock to prevent concurrent deployments
deploy_lock = asyncio.Lock()


def verify_signature(payload: bytes, signature: str) -> bool:
    """Verify a Forgejo webhook signature (HMAC-SHA256 hex over the raw body).

    Returns True when WEBHOOK_SECRET is unset — verification is disabled,
    and a warning is logged so the insecure configuration is visible.
    """
    if not WEBHOOK_SECRET:
        logger.warning("WEBHOOK_SECRET not set, skipping signature verification")
        return True
    if not signature:
        return False
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    # Forgejo uses sha256= format
    if signature.startswith("sha256="):
        signature = signature[7:]
    # Constant-time compare so timing cannot leak the expected digest.
    return hmac.compare_digest(expected, signature)


class DeployResult(BaseModel):
    """Response payload for a deployment attempt."""
    success: bool
    message: str
    details: Optional[dict] = None
    timestamp: str


async def run_command(cmd: list[str], cwd: Optional[Path] = None) -> tuple[int, str, str]:
    """Run *cmd* asynchronously and return (returncode, stdout, stderr).

    The command is executed without a shell (argv list), so arguments are
    never subject to shell interpretation.
    """
    process = await asyncio.create_subprocess_exec(
        *cmd,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await process.communicate()
    # communicate() waits for process exit, so returncode should always be
    # set by now; -1 is a defensive fallback only.
    returncode = process.returncode if process.returncode is not None else -1
    return returncode, stdout.decode(), stderr.decode()


async def sync_schemas() -> dict:
    """
    Sync LinkML schemas from staging area to frontend public directory.

    The staging area (/var/lib/glam/repo/schemas) is populated by rsync
    from the deployer's machine, NOT by git clone (to avoid slow network).

    Returns a dict of per-step results; each entry has a "success" flag.
    """
    results = {}

    # Ensure source exists
    if not LINKML_SOURCE.exists():
        logger.error(f"Source directory does not exist: {LINKML_SOURCE}")
        results["check_source"] = {
            "success": False,
            "error": f"Source directory not found: {LINKML_SOURCE}"
        }
        return results

    results["check_source"] = {"success": True, "path": str(LINKML_SOURCE)}

    # Ensure destination directory exists
    LINKML_DEST.parent.mkdir(parents=True, exist_ok=True)

    # Sync LinkML schemas using rsync (local copy, very fast)
    logger.info(f"Syncing schemas: {LINKML_SOURCE} -> {LINKML_DEST}")
    code, stdout, stderr = await run_command([
        "rsync", "-av", "--delete",
        "--exclude", "*.pyc",
        "--exclude", "__pycache__",
        "--exclude", ".git",
        f"{LINKML_SOURCE}/",
        f"{LINKML_DEST}/"
    ])

    results["rsync_linkml"] = {
        "success": code == 0,
        # Rough count: every non-empty rsync output line that isn't part of
        # the trailing transfer-stats summary ("sent ...", "total ...").
        "files_synced": len([
            l for l in stdout.split('\n')
            if l and not l.startswith('sent') and not l.startswith('total')
        ]),
        "stdout": stdout[:1000] if stdout else "",
        "stderr": stderr[:500] if stderr else ""
    }

    if code != 0:
        logger.error(f"rsync failed: {stderr}")
    else:
        logger.info("Schema sync completed successfully")

    return results


@app.get("/health")
async def health():
    """Health check endpoint."""
    return {"status": "ok", "service": "deploy-webhook", "version": "1.1.0"}


@app.get("/webhook/status")
async def webhook_status():
    """Status endpoint showing configuration."""
    return {
        "status": "ready",
        "source_path": str(LINKML_SOURCE),
        "source_exists": LINKML_SOURCE.exists(),
        "dest_path": str(LINKML_DEST),
        "dest_exists": LINKML_DEST.exists(),
        "secret_configured": bool(WEBHOOK_SECRET)
    }


@app.post("/webhook/deploy")
async def deploy_webhook(
    request: Request,
    x_forgejo_signature: Optional[str] = Header(None, alias="X-Forgejo-Signature"),
    x_forgejo_event: Optional[str] = Header(None, alias="X-Forgejo-Event"),
    x_gitea_signature: Optional[str] = Header(None, alias="X-Gitea-Signature"),
    x_gitea_event: Optional[str] = Header(None, alias="X-Gitea-Event"),
):
    """
    Handle Forgejo/Gitea push webhook.

    Triggers schema sync on push to master branch. Pushes to other
    branches, non-push events, and pushes that touch no schema files are
    acknowledged but ignored.
    """
    body = await request.body()

    # Use Forgejo or Gitea headers (Forgejo is a Gitea fork)
    signature = x_forgejo_signature or x_gitea_signature
    event = x_forgejo_event or x_gitea_event

    # Verify signature
    if not verify_signature(body, signature or ""):
        logger.warning("Invalid webhook signature")
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Parse payload
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    # Only process push events
    if event != "push":
        return JSONResponse({
            "status": "ignored",
            "reason": f"Event type '{event}' not handled"
        })

    # Only process pushes to main/master branch
    ref = payload.get("ref", "")
    if ref not in ["refs/heads/main", "refs/heads/master"]:
        return JSONResponse({
            "status": "ignored",
            "reason": f"Push to non-main branch: {ref}"
        })

    # Check if schemas changed in any commit of the push
    commits = payload.get("commits", [])
    schema_changed = False
    for commit in commits:
        modified = commit.get("modified", []) + commit.get("added", []) + commit.get("removed", [])
        for path in modified:
            if path.startswith("schemas/20251121/linkml/"):
                schema_changed = True
                break
        if schema_changed:
            break

    if not schema_changed:
        return JSONResponse({
            "status": "ignored",
            "reason": "No schema changes detected"
        })

    # Best-effort busy check; a request may still slip past between this
    # check and acquiring the lock below, in which case it simply waits.
    if deploy_lock.locked():
        return JSONResponse({
            "status": "busy",
            "message": "Deployment already in progress"
        }, status_code=409)

    async with deploy_lock:
        logger.info(f"Starting deployment for push to {ref}")
        try:
            results = await sync_schemas()
            success = all(r.get("success", False) for r in results.values())
            return DeployResult(
                success=success,
                message="Schema sync completed" if success else "Schema sync failed",
                details=results,
                # Timezone-aware UTC timestamp (datetime.utcnow is deprecated).
                timestamp=datetime.now(timezone.utc).isoformat()
            )
        except Exception as e:
            logger.exception("Deployment failed")
            raise HTTPException(status_code=500, detail=str(e))


@app.post("/webhook/deploy/manual")
async def manual_deploy(request: Request):
    """
    Manual deployment trigger (for testing or forced sync).

    Requires a simple auth token (Bearer WEBHOOK_SECRET). NOTE: when
    WEBHOOK_SECRET is unset this endpoint is unauthenticated.
    """
    auth = request.headers.get("Authorization", "")
    expected = f"Bearer {WEBHOOK_SECRET}"
    # Constant-time compare, same rationale as signature verification.
    if WEBHOOK_SECRET and not hmac.compare_digest(auth, expected):
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Best-effort busy check; see deploy_webhook for the race caveat.
    if deploy_lock.locked():
        return JSONResponse({
            "status": "busy",
            "message": "Deployment already in progress"
        }, status_code=409)

    async with deploy_lock:
        logger.info("Starting manual deployment")
        try:
            results = await sync_schemas()
            success = all(r.get("success", False) for r in results.values())
            return DeployResult(
                success=success,
                message="Manual schema sync completed" if success else "Manual sync failed",
                details=results,
                # Timezone-aware UTC timestamp (datetime.utcnow is deprecated).
                timestamp=datetime.now(timezone.utc).isoformat()
            )
        except Exception as e:
            logger.exception("Manual deployment failed")
            raise HTTPException(status_code=500, detail=str(e))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8099)