glam/infrastructure/scripts/deploy-webhook.py
kempersc a4184cb805 feat(infra): add webhook-based schema deployment pipeline
- Add FastAPI webhook receiver for Forgejo push events
- Add setup script for server deployment
- Add Caddy snippet for webhook endpoint
- Add local sync-schemas.sh helper script
- Sync frontend schemas with source (archived deprecated slots)

Infrastructure scripts staged for optional webhook deployment.
Current deployment uses: ./infrastructure/deploy.sh --frontend
2026-01-10 21:45:02 +01:00

262 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""
Webhook receiver for Forgejo push events.
Triggers schema sync when push events are received on the main branch.
Run with: uvicorn deploy-webhook:app --port 8099 --host 127.0.0.1
Or as systemd service: deploy-webhook.service
"""
import asyncio
import hashlib
import hmac
import json
import os
import subprocess
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
app = FastAPI(title="GLAM Deploy Webhook", version="1.0.0")
# Configuration
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "")
REPO_PATH = Path("/var/lib/glam/repo")
SCRIPTS_PATH = Path("/var/lib/glam/scripts")
FRONTEND_PATH = Path("/var/www/glam-frontend")
LINKML_SOURCE = REPO_PATH / "schemas/20251121/linkml"
LINKML_DEST = FRONTEND_PATH / "schemas/20251121/linkml"
# Lock to prevent concurrent deployments
deploy_lock = asyncio.Lock()
def verify_signature(payload: bytes, signature: str) -> bool:
"""Verify Forgejo webhook signature."""
if not WEBHOOK_SECRET:
logger.warning("WEBHOOK_SECRET not set, skipping signature verification")
return True
if not signature:
return False
expected = hmac.new(
WEBHOOK_SECRET.encode(),
payload,
hashlib.sha256
).hexdigest()
# Forgejo uses sha256=<hex> format
if signature.startswith("sha256="):
signature = signature[7:]
return hmac.compare_digest(expected, signature)
class DeployResult(BaseModel):
success: bool
message: str
details: Optional[dict] = None
timestamp: str
async def run_command(cmd: list[str], cwd: Optional[Path] = None) -> tuple[int, str, str]:
"""Run a shell command asynchronously."""
process = await asyncio.create_subprocess_exec(
*cmd,
cwd=cwd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
return process.returncode, stdout.decode(), stderr.decode()
async def sync_schemas() -> dict:
"""Sync LinkML schemas from repo to frontend public directory."""
results = {}
# Pull latest changes
logger.info(f"Pulling latest changes in {REPO_PATH}")
code, stdout, stderr = await run_command(["git", "pull", "origin", "main"], cwd=REPO_PATH)
results["git_pull"] = {
"success": code == 0,
"stdout": stdout[:500] if stdout else "",
"stderr": stderr[:500] if stderr else ""
}
if code != 0:
logger.error(f"Git pull failed: {stderr}")
return results
# Ensure destination directory exists
LINKML_DEST.mkdir(parents=True, exist_ok=True)
# Sync LinkML schemas using rsync
logger.info(f"Syncing schemas: {LINKML_SOURCE} -> {LINKML_DEST}")
code, stdout, stderr = await run_command([
"rsync", "-av", "--delete",
"--exclude", "*.pyc",
"--exclude", "__pycache__",
"--exclude", ".git",
f"{LINKML_SOURCE}/",
f"{LINKML_DEST}/"
])
results["rsync_linkml"] = {
"success": code == 0,
"stdout": stdout[:1000] if stdout else "",
"stderr": stderr[:500] if stderr else ""
}
if code != 0:
logger.error(f"rsync failed: {stderr}")
else:
logger.info("Schema sync completed successfully")
return results
@app.get("/health")
async def health():
"""Health check endpoint."""
return {"status": "ok", "service": "deploy-webhook"}
@app.post("/webhook/deploy")
async def deploy_webhook(
request: Request,
x_forgejo_signature: Optional[str] = Header(None, alias="X-Forgejo-Signature"),
x_forgejo_event: Optional[str] = Header(None, alias="X-Forgejo-Event"),
x_gitea_signature: Optional[str] = Header(None, alias="X-Gitea-Signature"),
x_gitea_event: Optional[str] = Header(None, alias="X-Gitea-Event"),
):
"""
Handle Forgejo/Gitea push webhook.
Triggers schema sync on push to main branch.
"""
body = await request.body()
# Use Forgejo or Gitea headers (Forgejo is a Gitea fork)
signature = x_forgejo_signature or x_gitea_signature
event = x_forgejo_event or x_gitea_event
# Verify signature
if not verify_signature(body, signature or ""):
logger.warning("Invalid webhook signature")
raise HTTPException(status_code=401, detail="Invalid signature")
# Parse payload
try:
payload = json.loads(body)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="Invalid JSON payload")
# Only process push events
if event != "push":
return JSONResponse({
"status": "ignored",
"reason": f"Event type '{event}' not handled"
})
# Only process pushes to main branch
ref = payload.get("ref", "")
if ref not in ["refs/heads/main", "refs/heads/master"]:
return JSONResponse({
"status": "ignored",
"reason": f"Push to non-main branch: {ref}"
})
# Check if schemas changed
commits = payload.get("commits", [])
schema_changed = False
for commit in commits:
modified = commit.get("modified", []) + commit.get("added", []) + commit.get("removed", [])
for path in modified:
if path.startswith("schemas/20251121/linkml/"):
schema_changed = True
break
if schema_changed:
break
if not schema_changed:
return JSONResponse({
"status": "ignored",
"reason": "No schema changes detected"
})
# Acquire lock to prevent concurrent deployments
if deploy_lock.locked():
return JSONResponse({
"status": "busy",
"message": "Deployment already in progress"
}, status_code=409)
async with deploy_lock:
logger.info(f"Starting deployment for push to {ref}")
try:
results = await sync_schemas()
success = all(r.get("success", False) for r in results.values())
return DeployResult(
success=success,
message="Schema sync completed" if success else "Schema sync failed",
details=results,
timestamp=datetime.utcnow().isoformat()
)
except Exception as e:
logger.exception("Deployment failed")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/webhook/deploy/manual")
async def manual_deploy(request: Request):
"""
Manual deployment trigger (for testing or forced sync).
Requires a simple auth token.
"""
auth = request.headers.get("Authorization", "")
expected = f"Bearer {WEBHOOK_SECRET}"
if WEBHOOK_SECRET and auth != expected:
raise HTTPException(status_code=401, detail="Unauthorized")
if deploy_lock.locked():
return JSONResponse({
"status": "busy",
"message": "Deployment already in progress"
}, status_code=409)
async with deploy_lock:
logger.info("Starting manual deployment")
try:
results = await sync_schemas()
success = all(r.get("success", False) for r in results.values())
return DeployResult(
success=success,
message="Manual schema sync completed" if success else "Manual sync failed",
details=results,
timestamp=datetime.utcnow().isoformat()
)
except Exception as e:
logger.exception("Manual deployment failed")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8099)