glam/infrastructure/scripts/deploy-webhook.py
kempersc 3c3be47e32 feat(infra): add fast push-based schema sync to production
- Replace slow Forgejo→Server git pull with direct local rsync
- Add git-push-schemas.sh wrapper script for manual pushes
- Add post-commit hook for automatic schema sync
- Fix YAML syntax errors in slot comment blocks
- Update deploy-webhook.py to use master branch
2026-01-11 01:22:47 +01:00

278 lines
8.6 KiB
Python

#!/usr/bin/env python3
"""
Webhook receiver for Forgejo push events.
Triggers schema sync when push events are received on the master branch.
Run with: uvicorn deploy-webhook:app --port 8099 --host 127.0.0.1
Or as systemd service: deploy-webhook.service
"""
import asyncio
import hashlib
import hmac
import json
import os
import logging
from datetime import datetime
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, Request, HTTPException, Header
from fastapi.responses import JSONResponse
from pydantic import BaseModel
# Configure logging: timestamped, single-line records to stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

app = FastAPI(title="GLAM Deploy Webhook", version="1.1.0")

# Configuration
# Shared HMAC secret for webhook signature verification. When empty,
# verify_signature() skips the check entirely (with a warning).
WEBHOOK_SECRET = os.environ.get("WEBHOOK_SECRET", "")
# Staging area populated by rsync from the deployer's machine
# (see sync_schemas docstring), NOT a live git clone.
REPO_PATH = Path("/var/lib/glam/repo")
# Web root served to browsers; schemas are copied underneath it.
FRONTEND_PATH = Path("/var/www/glam-frontend")
LINKML_SOURCE = REPO_PATH / "schemas/20251121/linkml"
LINKML_DEST = FRONTEND_PATH / "schemas/20251121/linkml"
# Lock to prevent concurrent deployments (effective within this single
# process only — run one uvicorn worker).
deploy_lock = asyncio.Lock()
def verify_signature(payload: bytes, signature: str) -> bool:
    """Check a webhook body against its HMAC-SHA256 hex signature.

    Returns True when the digest matches WEBHOOK_SECRET — or when no
    secret is configured at all, in which case verification is skipped
    with a warning (deliberate best-effort for unsecured deployments).
    """
    if not WEBHOOK_SECRET:
        logger.warning("WEBHOOK_SECRET not set, skipping signature verification")
        return True
    if not signature:
        return False

    # Forgejo sends the digest as "sha256=<hex>"; drop the prefix if present.
    received = signature.removeprefix("sha256=")
    computed = hmac.new(WEBHOOK_SECRET.encode(), payload, hashlib.sha256).hexdigest()
    # Constant-time comparison to avoid leaking digest bytes via timing.
    return hmac.compare_digest(computed, received)
class DeployResult(BaseModel):
    """Response body returned by the deploy endpoints."""

    # True only when every step recorded in `details` succeeded.
    success: bool
    # Human-readable summary of the outcome.
    message: str
    # Per-step results dict produced by sync_schemas(), if any.
    details: Optional[dict] = None
    # ISO-8601 timestamp of when the deployment finished.
    timestamp: str
async def run_command(cmd: list[str], cwd: Optional[Path] = None) -> tuple[int, str, str]:
    """Execute *cmd* as a subprocess without blocking the event loop.

    Args:
        cmd: Program and arguments, passed as an argv list (no shell).
        cwd: Optional working directory for the child process.

    Returns:
        Tuple of (return code, decoded stdout, decoded stderr); the
        return code falls back to -1 if somehow unset after exit.
    """
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out_bytes, err_bytes = await proc.communicate()
    rc = -1 if proc.returncode is None else proc.returncode
    return rc, out_bytes.decode(), err_bytes.decode()
async def sync_schemas() -> dict:
    """
    Sync LinkML schemas from staging area to frontend public directory.

    The staging area (/var/lib/glam/repo/schemas) is populated by rsync
    from the deployer's machine, NOT by git clone (to avoid slow network).

    Returns a dict of per-step results, each with a "success" flag.
    """
    outcome: dict = {}

    # Bail out early when the staging area has never been populated.
    if not LINKML_SOURCE.exists():
        logger.error(f"Source directory does not exist: {LINKML_SOURCE}")
        outcome["check_source"] = {
            "success": False,
            "error": f"Source directory not found: {LINKML_SOURCE}"
        }
        return outcome
    outcome["check_source"] = {"success": True, "path": str(LINKML_SOURCE)}

    # rsync needs the destination's parent to exist.
    LINKML_DEST.parent.mkdir(parents=True, exist_ok=True)

    # Local-to-local rsync: fast, and --delete keeps dest an exact mirror.
    logger.info(f"Syncing schemas: {LINKML_SOURCE} -> {LINKML_DEST}")
    rsync_cmd = [
        "rsync", "-av", "--delete",
        "--exclude", "*.pyc",
        "--exclude", "__pycache__",
        "--exclude", ".git",
        f"{LINKML_SOURCE}/",
        f"{LINKML_DEST}/",
    ]
    code, stdout, stderr = await run_command(rsync_cmd)

    # Count transfer lines, skipping blanks and rsync's trailing summary
    # ("sent ..." / "total ..." lines).
    transferred = [
        line for line in stdout.split('\n')
        if line and not line.startswith('sent') and not line.startswith('total')
    ]
    outcome["rsync_linkml"] = {
        "success": code == 0,
        "files_synced": len(transferred),
        "stdout": stdout[:1000] if stdout else "",
        "stderr": stderr[:500] if stderr else ""
    }

    if code != 0:
        logger.error(f"rsync failed: {stderr}")
    else:
        logger.info("Schema sync completed successfully")
    return outcome
@app.get("/health")
async def health():
    """Liveness probe: report that the webhook service is running."""
    return {
        "status": "ok",
        "service": "deploy-webhook",
        "version": "1.1.0",
    }
@app.get("/webhook/status")
async def webhook_status():
    """Report configured paths, whether they exist, and secret status."""
    source_present = LINKML_SOURCE.exists()
    dest_present = LINKML_DEST.exists()
    return {
        "status": "ready",
        "source_path": str(LINKML_SOURCE),
        "source_exists": source_present,
        "dest_path": str(LINKML_DEST),
        "dest_exists": dest_present,
        # Only reveal whether a secret is set, never the value itself.
        "secret_configured": bool(WEBHOOK_SECRET),
    }
@app.post("/webhook/deploy")
async def deploy_webhook(
    request: Request,
    x_forgejo_signature: Optional[str] = Header(None, alias="X-Forgejo-Signature"),
    x_forgejo_event: Optional[str] = Header(None, alias="X-Forgejo-Event"),
    x_gitea_signature: Optional[str] = Header(None, alias="X-Gitea-Signature"),
    x_gitea_event: Optional[str] = Header(None, alias="X-Gitea-Event"),
):
    """
    Handle Forgejo/Gitea push webhook.

    Triggers a schema sync when a push to main/master touches files under
    schemas/20251121/linkml/. Other events, branches, and payloads are
    acknowledged but ignored.

    Raises:
        HTTPException: 401 on bad signature, 400 on malformed JSON,
            500 when the sync itself raises.
    """
    body = await request.body()
    # Use Forgejo or Gitea headers (Forgejo is a Gitea fork)
    signature = x_forgejo_signature or x_gitea_signature
    event = x_forgejo_event or x_gitea_event

    # Verify signature before trusting any of the payload.
    if not verify_signature(body, signature or ""):
        logger.warning("Invalid webhook signature")
        raise HTTPException(status_code=401, detail="Invalid signature")

    # Parse payload
    try:
        payload = json.loads(body)
    except json.JSONDecodeError:
        raise HTTPException(status_code=400, detail="Invalid JSON payload")

    # Only push events can change schemas.
    if event != "push":
        return JSONResponse({
            "status": "ignored",
            "reason": f"Event type '{event}' not handled"
        })

    # Only process pushes to main/master branch.
    ref = payload.get("ref", "")
    if ref not in ("refs/heads/main", "refs/heads/master"):
        return JSONResponse({
            "status": "ignored",
            "reason": f"Push to non-main branch: {ref}"
        })

    # Skip the sync entirely when no commit touched the schema tree.
    schema_changed = any(
        path.startswith("schemas/20251121/linkml/")
        for commit in payload.get("commits", [])
        for path in (
            commit.get("modified", [])
            + commit.get("added", [])
            + commit.get("removed", [])
        )
    )
    if not schema_changed:
        return JSONResponse({
            "status": "ignored",
            "reason": "No schema changes detected"
        })

    # Fast-fail instead of queueing behind an in-flight deployment.
    if deploy_lock.locked():
        return JSONResponse({
            "status": "busy",
            "message": "Deployment already in progress"
        }, status_code=409)

    async with deploy_lock:
        logger.info(f"Starting deployment for push to {ref}")
        try:
            results = await sync_schemas()
            success = all(r.get("success", False) for r in results.values())
            return DeployResult(
                success=success,
                message="Schema sync completed" if success else "Schema sync failed",
                details=results,
                # Timezone-aware timestamp; datetime.utcnow() is naive and
                # deprecated since Python 3.12.
                timestamp=datetime.now(timezone.utc).isoformat()
            )
        except Exception as e:
            logger.exception("Deployment failed")
            raise HTTPException(status_code=500, detail=str(e))
@app.post("/webhook/deploy/manual")
async def manual_deploy(request: Request):
    """
    Manual deployment trigger (for testing or forced sync).

    Requires "Authorization: Bearer <WEBHOOK_SECRET>" when a secret is
    configured; with no secret set, the endpoint is open.

    Raises:
        HTTPException: 401 on a wrong bearer token, 500 when the sync
            itself raises.
    """
    auth = request.headers.get("Authorization", "")
    expected = f"Bearer {WEBHOOK_SECRET}"
    # Constant-time comparison: a plain != would leak how much of the
    # token prefix matched via response timing.
    if WEBHOOK_SECRET and not hmac.compare_digest(auth, expected):
        raise HTTPException(status_code=401, detail="Unauthorized")

    # Fast-fail instead of queueing behind an in-flight deployment.
    if deploy_lock.locked():
        return JSONResponse({
            "status": "busy",
            "message": "Deployment already in progress"
        }, status_code=409)

    async with deploy_lock:
        logger.info("Starting manual deployment")
        try:
            results = await sync_schemas()
            success = all(r.get("success", False) for r in results.values())
            return DeployResult(
                success=success,
                message="Manual schema sync completed" if success else "Manual sync failed",
                details=results,
                # Timezone-aware timestamp; datetime.utcnow() is naive and
                # deprecated since Python 3.12.
                timestamp=datetime.now(timezone.utc).isoformat()
            )
        except Exception as e:
            logger.exception("Manual deployment failed")
            raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
    # Direct invocation for local development; production runs under
    # systemd (deploy-webhook.service) per the module docstring.
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8099)