glam/contact_discovery_api.py
2025-12-14 17:09:55 +01:00

529 lines
No EOL
16 KiB
Python

#!/usr/bin/env python3
"""
Contact Discovery API Server
RESTful API for ethical contact discovery research
This server provides endpoints for conducting contact discovery research
with built-in compliance checks and rate limiting.
"""
import asyncio
import hashlib
import logging
import os
import secrets
import sqlite3
import time
from contextlib import asynccontextmanager, closing
from datetime import datetime, timezone
from typing import List, Optional, Dict

import uvicorn
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field

from contact_discovery_service import (
    ContactDiscoveryService,
    DiscoveryConfig,
    DiscoveryMode,
    ContactInfo,
    ComplianceChecker,
)
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Security: with auto_error=False a missing Authorization header yields
# ``None`` instead of an automatic error, so verify_api_key can reply with
# its own 401 message.
security = HTTPBearer(auto_error=False)

# SQLite file holding the ``api_keys`` and ``usage_log`` tables.  The default
# is relative to the current working directory; set CONTACT_DISCOVERY_API_DB
# to pin it to a fixed location.
API_DB_PATH = os.environ.get("CONTACT_DISCOVERY_API_DB", "api_keys.db")
class DiscoveryRequest(BaseModel):
    """Body of a ``POST /discover`` call.

    Pydantic validates the payload: at most 1000 phone numbers, and a
    requested query rate between 1 and 1000 per second.
    """

    phone_numbers: List[str] = Field(
        ..., max_length=1000, description="Phone numbers to discover"
    )
    mode: DiscoveryMode = Field(
        default=DiscoveryMode.DEMO, description="Discovery mode"
    )
    max_queries_per_second: int = Field(
        default=10, ge=1, le=1000, description="Rate limit"
    )
    enable_metadata: bool = Field(
        default=False, description="Enable metadata collection"
    )
    consent_required: bool = Field(
        default=True, description="Require explicit consent"
    )
    # The two fields below must both be set when mode is RESEARCH; the
    # /discover endpoint rejects the request otherwise.
    research_purpose: Optional[str] = Field(
        default=None, description="Research purpose description"
    )
    irb_approval: Optional[str] = Field(
        default=None, description="IRB approval number"
    )
class DiscoveryResponse(BaseModel):
    """Result envelope returned by ``POST /discover``.

    The endpoint sets ``status`` to "COMPLETED" or "REJECTED" and
    ``compliance_status`` to "COMPLIANT" or "NON_COMPLIANT". ``message``
    carries the rejection reason; ``contacts`` is filled only for DEMO mode.
    """

    request_id: str
    status: str
    total_processed: int
    active_found: int
    processing_time: float  # elapsed seconds
    compliance_status: str
    contacts: Optional[List[ContactInfo]] = Field(default=None)
    message: Optional[str] = Field(default=None)
class ComplianceReport(BaseModel):
    """Response body of ``POST /compliance``.

    ``checks`` maps each check name to whether it passed, and
    ``gdpr_score`` is the percentage (0-100) of checks that passed.
    """

    compliant: bool
    checks: Dict[str, bool]
    recommendations: List[str]
    gdpr_score: float
class APIKey(BaseModel):
    """Typed view of one row of the ``api_keys`` table.

    NOTE(review): nothing in the visible endpoints instantiates this model;
    they read the table with raw SQL instead.
    """

    key_id: str
    key_hash: str  # SHA-256 hex digest of the raw key
    permissions: List[str]
    rate_limit: int
    created_at: datetime
    last_used: Optional[datetime] = Field(default=None)
    usage_count: int = Field(default=0)
    is_active: bool = Field(default=True)
# Process-wide discovery service: assigned by ``lifespan`` at startup and
# read by the /stats endpoint to locate its database.
discovery_service = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Prepare shared state on startup and log on shutdown.

    Creates the API-key tables, then builds a DEMO-mode
    ``ContactDiscoveryService`` and stores it in ``discovery_service``.
    """
    global discovery_service

    init_api_database()

    default_config = DiscoveryConfig(
        mode=DiscoveryMode.DEMO,
        max_queries_per_second=10,
        batch_size=100,
        enable_metadata_collection=False,
        respect_rate_limits=True,
        require_consent=True,
        log_all_activities=True,
    )
    discovery_service = ContactDiscoveryService(default_config)
    logger.info("Contact discovery service initialized")

    yield  # the application serves requests while suspended here

    logger.info("Shutting down contact discovery service")
# Initialize FastAPI app
app = FastAPI(
    title="Contact Discovery API",
    description="Ethical contact discovery service for research purposes",
    version="1.0.0",
    lifespan=lifespan,
)

# CORS origins come from CONTACT_DISCOVERY_CORS_ORIGINS (comma-separated),
# defaulting to "*" -- restrict this in production.  Credentials are only
# enabled for an explicit origin list: combined with a wildcard they let any
# site make credentialed cross-origin requests.  This API authenticates with
# an explicit Bearer header, which does not depend on credentials mode.
_cors_origins = [
    origin.strip()
    for origin in os.environ.get("CONTACT_DISCOVERY_CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins,
    allow_credentials="*" not in _cors_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
def init_api_database(db_path: Optional[str] = None):
    """Create the ``api_keys`` and ``usage_log`` tables if they are missing.

    Safe to call repeatedly (``CREATE TABLE IF NOT EXISTS``).

    Args:
        db_path: SQLite file to initialise; defaults to ``API_DB_PATH``.
    """
    path = API_DB_PATH if db_path is None else db_path
    # closing() releases the handle even if a statement fails; the inner
    # ``conn`` context commits on success and rolls back on error.
    with closing(sqlite3.connect(path)) as conn, conn:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS api_keys (
                key_id TEXT PRIMARY KEY,
                key_hash TEXT UNIQUE NOT NULL,
                permissions TEXT NOT NULL,
                rate_limit INTEGER NOT NULL,
                created_at TEXT NOT NULL,
                last_used TEXT,
                usage_count INTEGER DEFAULT 0,
                is_active BOOLEAN DEFAULT TRUE
            )
            """
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS usage_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                key_id TEXT NOT NULL,
                timestamp TEXT NOT NULL,
                endpoint TEXT NOT NULL,
                request_count INTEGER NOT NULL,
                compliance_check BOOLEAN DEFAULT TRUE
            )
            """
        )
# /generate-demo-key advertises its keys ("demo_"-prefixed key_id) as
# expiring after 24 hours; verify_api_key enforces that lifetime.
_DEMO_KEY_PREFIX = "demo_"
_DEMO_KEY_TTL_SECONDS = 24 * 60 * 60


def _demo_key_expired(key_id: str, created_at: Optional[str]) -> bool:
    """Return True when *key_id* is a demo key older than the demo TTL."""
    if not key_id.startswith(_DEMO_KEY_PREFIX):
        return False
    try:
        created = datetime.fromisoformat(created_at)
    except (TypeError, ValueError):
        # An unreadable creation time cannot show the key is still fresh.
        return True
    if created.tzinfo is None:
        created = created.replace(tzinfo=timezone.utc)
    age = datetime.now(timezone.utc) - created
    return age.total_seconds() > _DEMO_KEY_TTL_SECONDS


def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Authenticate a request from its ``Authorization: Bearer <key>`` header.

    The raw key is SHA-256 hashed and looked up among active keys; on
    success the key's ``last_used`` and ``usage_count`` are updated.

    Returns:
        dict with ``key_id``, ``permissions`` (list of str) and
        ``rate_limit``.

    Raises:
        HTTPException(401): missing credentials, unknown or inactive key,
            or a demo key past its advertised 24-hour lifetime.
    """
    if not credentials:
        raise HTTPException(status_code=401, detail="API key required")

    key_hash = hashlib.sha256(credentials.credentials.encode()).hexdigest()

    # One connection for both the lookup and the usage bump; closing()
    # releases it on every path, including the 401 raises below.
    with closing(sqlite3.connect(API_DB_PATH)) as conn:
        row = conn.execute(
            """
            SELECT key_id, permissions, rate_limit, created_at
            FROM api_keys
            WHERE key_hash = ? AND is_active = TRUE
            """,
            (key_hash,),
        ).fetchone()
        if row is None:
            raise HTTPException(status_code=401, detail="Invalid or inactive API key")

        key_id, permissions, rate_limit, created_at = row
        if _demo_key_expired(key_id, created_at):
            raise HTTPException(status_code=401, detail="Demo API key expired")

        with conn:  # commit the usage bump (rolled back on error)
            conn.execute(
                """
                UPDATE api_keys
                SET last_used = ?, usage_count = usage_count + 1
                WHERE key_id = ?
                """,
                (datetime.now(timezone.utc).isoformat(), key_id),
            )

    return {
        "key_id": key_id,
        # Tolerate stray whitespace / empty entries in the stored CSV.
        "permissions": [p.strip() for p in permissions.split(",") if p.strip()],
        "rate_limit": rate_limit,
    }
def log_api_usage(
    key_info: Dict,
    endpoint: str,
    request_count: int = 1,
    db_path: Optional[str] = None,
):
    """Append one row to ``usage_log`` recording an authenticated call.

    Args:
        key_info: dict produced by ``verify_api_key``; only ``key_id`` is read.
        endpoint: route path being recorded, e.g. "/discover".
        request_count: number of items the call processed.
        db_path: SQLite file to write to; defaults to ``API_DB_PATH``.
    """
    path = API_DB_PATH if db_path is None else db_path
    row = (
        key_info["key_id"],
        datetime.now(timezone.utc).isoformat(),
        endpoint,
        request_count,
    )
    # closing() releases the handle even if the INSERT fails; the inner
    # ``conn`` context commits on success and rolls back on error.
    with closing(sqlite3.connect(path)) as conn, conn:
        conn.execute(
            "INSERT INTO usage_log (key_id, timestamp, endpoint, request_count) "
            "VALUES (?, ?, ?, ?)",
            row,
        )
@app.get("/")
async def root():
    """Describe the service and list its main endpoints (no auth needed)."""
    endpoint_map = {
        "discovery": "/discover",
        "compliance": "/compliance",
        "stats": "/stats",
        "docs": "/docs",
    }
    return dict(
        service="Contact Discovery API",
        version="1.0.0",
        purpose="Ethical contact discovery for research",
        warning="For authorized research purposes only",
        endpoints=endpoint_map,
    )
@app.post("/discover", response_model=DiscoveryResponse)
async def discover_contacts(
    request: DiscoveryRequest,
    background_tasks: BackgroundTasks,
    key_info: Dict = Depends(verify_api_key),
):
    """Run contact discovery for the submitted phone numbers.

    Checks, in order: the key's "discovery" permission, the requested rate
    against the key's limit, RESEARCH-mode prerequisites, and the GDPR
    compliance of the derived configuration.  Only then is discovery run.

    Returns:
        DiscoveryResponse with status "REJECTED" (HTTP 200) when the
        compliance check fails, otherwise "COMPLETED".  Per-contact results
        are included only for DEMO-mode requests.

    Raises:
        HTTPException(403): the key lacks the "discovery" permission.
        HTTPException(400): requested rate above the key's limit, or
            RESEARCH mode without both a purpose and an IRB approval.
        HTTPException(500): the discovery service raised.
    """
    started = time.perf_counter()  # monotonic clock for elapsed time
    # Random per-call id (16 hex chars).  Hashing the request body gave
    # identical requests the same id and derived it from phone numbers.
    request_id = secrets.token_hex(8)

    if "discovery" not in key_info["permissions"]:
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    if request.max_queries_per_second > key_info["rate_limit"]:
        raise HTTPException(
            status_code=400,
            detail=f"Rate limit exceeds allowed maximum of {key_info['rate_limit']}/second",
        )

    if request.mode == DiscoveryMode.RESEARCH and not (
        request.research_purpose and request.irb_approval
    ):
        raise HTTPException(
            status_code=400,
            detail="Research mode requires purpose and IRB approval",
        )

    config = DiscoveryConfig(
        mode=request.mode,
        max_queries_per_second=request.max_queries_per_second,
        batch_size=100,
        enable_metadata_collection=request.enable_metadata,
        respect_rate_limits=True,
        require_consent=request.consent_required,
        log_all_activities=True,
    )

    compliance = ComplianceChecker.check_gdpr_compliance(config)
    if not compliance["compliant"]:
        return DiscoveryResponse(
            request_id=request_id,
            status="REJECTED",
            total_processed=0,
            active_found=0,
            processing_time=time.perf_counter() - started,
            compliance_status="NON_COMPLIANT",
            message=f"Compliance check failed: {', '.join(compliance['recommendations'])}",
        )

    service = ContactDiscoveryService(config)
    try:
        contacts = await service.discover_contacts(request.phone_numbers)
    except Exception as exc:
        # Keep the traceback in the server log, but do not echo internal
        # error text (paths, SQL, upstream responses) back to the client.
        logger.exception("Discovery failed for request %s", request_id)
        raise HTTPException(status_code=500, detail="Discovery failed") from exc

    # Usage is recorded after the response is sent; only successful runs count.
    background_tasks.add_task(
        log_api_usage,
        key_info,
        "/discover",
        len(request.phone_numbers),
    )

    active_count = sum(1 for c in contacts if c.is_active)
    return DiscoveryResponse(
        request_id=request_id,
        status="COMPLETED",
        total_processed=len(contacts),
        active_found=active_count,
        processing_time=time.perf_counter() - started,
        compliance_status="COMPLIANT",
        contacts=contacts if request.mode == DiscoveryMode.DEMO else None,
    )
@app.post("/compliance", response_model=ComplianceReport)
async def check_compliance(
    config: DiscoveryConfig,
    key_info: Dict = Depends(verify_api_key),
):
    """Score a discovery configuration against the GDPR compliance checks.

    Returns:
        ComplianceReport whose ``gdpr_score`` is the percentage (0-100) of
        passing checks; 0.0 when the checker reports no checks at all.

    Raises:
        HTTPException(403): the key lacks the "compliance" permission.
    """
    if "compliance" not in key_info["permissions"]:
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    compliance = ComplianceChecker.check_gdpr_compliance(config)
    checks = compliance["checks"]

    # Guard the empty case, which previously raised ZeroDivisionError (500).
    gdpr_score = sum(checks.values()) / len(checks) * 100 if checks else 0.0

    return ComplianceReport(
        compliant=compliance["compliant"],
        checks=checks,
        recommendations=compliance["recommendations"],
        gdpr_score=gdpr_score,
    )
def _discovery_stats(db_path: str) -> Dict:
    """Aggregate counts from the discovery service's ``discoveries`` table."""
    with closing(sqlite3.connect(db_path)) as conn:
        total_processed = conn.execute(
            "SELECT COUNT(*) FROM discoveries"
        ).fetchone()[0]
        active_found = conn.execute(
            "SELECT COUNT(*) FROM discoveries WHERE is_active = 1"
        ).fetchone()[0]
        top_countries = dict(conn.execute(
            """
            SELECT country_code, COUNT(*)
            FROM discoveries
            GROUP BY country_code
            ORDER BY COUNT(*) DESC
            LIMIT 10
            """
        ).fetchall())
        recent_activity = dict(conn.execute(
            """
            SELECT DATE(discovery_timestamp) as date, COUNT(*)
            FROM discoveries
            WHERE DATE(discovery_timestamp) >= DATE('now', '-7 days')
            GROUP BY DATE(discovery_timestamp)
            ORDER BY date
            """
        ).fetchall())
    return {
        "total_processed": total_processed,
        "active_found": active_found,
        "success_rate": active_found / total_processed if total_processed > 0 else 0,
        "top_countries": top_countries,
        "recent_activity": recent_activity,
    }


def _daily_api_usage(db_path: str) -> Dict[str, int]:
    """Count ``usage_log`` rows per key_id over the last 24 hours."""
    # Stored timestamps are ISO strings like '2025-01-01T12:00:00.123+00:00'.
    # Comparing them as text against datetime('now', ...) ('... 12:00:00')
    # treats every row from the cutoff date as recent ('T' > ' '), so both
    # sides are normalised with julianday() instead.
    with closing(sqlite3.connect(db_path)) as conn:
        rows = conn.execute(
            """
            SELECT key_id, COUNT(*) as requests
            FROM usage_log
            WHERE julianday(timestamp) >= julianday('now', '-24 hours')
            GROUP BY key_id
            """
        ).fetchall()
    return dict(rows)


@app.get("/stats")
async def get_statistics(key_info: Dict = Depends(verify_api_key)):
    """Report discovery totals, 24-hour API usage and static compliance flags.

    Returns ``{"error": "Service not initialized"}`` (HTTP 200) when the
    lifespan has not created ``discovery_service`` yet.

    Raises:
        HTTPException(403): the key lacks the "stats" permission.
    """
    if "stats" not in key_info["permissions"]:
        raise HTTPException(status_code=403, detail="Insufficient permissions")

    if not discovery_service:
        return {"error": "Service not initialized"}

    discovery_stats = _discovery_stats(discovery_service.db_path)
    # NOTE(review): key_id values are returned to every caller holding the
    # "stats" permission, so key ids must never double as secrets.
    daily_usage = _daily_api_usage(API_DB_PATH)

    return {
        "discovery_stats": discovery_stats,
        "api_usage": {
            "daily_requests": sum(daily_usage.values()),
            "active_keys": len(daily_usage),
            "usage_by_key": daily_usage,
        },
        # NOTE(review): constant flags, not computed from any request;
        # e.g. individual /discover calls may set consent_required=False.
        "compliance": {
            "gdpr_compliant": True,
            "rate_limiting": True,
            "audit_logging": True,
            "consent_required": True,
        },
    }
@app.get("/health")
async def health_check():
    """Liveness probe: always reports healthy, stamped with the UTC time."""
    now_utc = datetime.now(timezone.utc)
    payload = {"status": "healthy"}
    payload["timestamp"] = now_utc.isoformat()
    payload["service"] = "contact_discovery"
    payload["version"] = "1.0.0"
    return payload
@app.post("/generate-demo-key")
async def generate_demo_key():
    """Mint a demo API key with discovery, compliance and stats permissions.

    The raw key is returned once; only its SHA-256 hash is stored.  The
    stored ``key_id`` is a separate random identifier, so it can appear in
    logs and in /stats usage output without revealing the key.

    NOTE(review): this endpoint needs no authentication, so any client can
    obtain a key with "discovery" permission.  Consider disabling it
    outside local demos.
    """
    permissions = ["discovery", "compliance", "stats"]
    rate_limit = 50  # Conservative rate limit for demo

    # CSPRNG-generated values.  An MD5 of the current time was guessable,
    # and storing the raw key as key_id exposed it via /stats and the logs.
    key_id = f"demo_{secrets.token_hex(8)}"
    demo_key = f"demo_{secrets.token_urlsafe(32)}"
    key_hash = hashlib.sha256(demo_key.encode()).hexdigest()

    # closing() releases the handle; the inner ``conn`` context commits.
    with closing(sqlite3.connect(API_DB_PATH)) as conn, conn:
        conn.execute(
            """
            INSERT INTO api_keys
            (key_id, key_hash, permissions, rate_limit, created_at)
            VALUES (?, ?, ?, ?, ?)
            """,
            (
                key_id,
                key_hash,
                ",".join(permissions),
                rate_limit,
                datetime.now(timezone.utc).isoformat(),
            ),
        )

    # Log the identifier only, never the secret itself.
    logger.info("Generated demo API key %s", key_id)
    return {
        "api_key": demo_key,
        "permissions": permissions,
        "rate_limit": rate_limit,
        "warning": "Demo key for educational purposes only",
        "expires_in": "24 hours",
    }
# Error handlers
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    """Render an HTTPException as {"error", "status_code", "timestamp"} JSON.

    Headers attached to the exception (``exc.headers``) are forwarded on the
    response, as FastAPI's default handler does.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "status_code": exc.status_code,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
        headers=getattr(exc, "headers", None),
    )
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    """Catch-all handler: log the full traceback and return a generic 500.

    The exception text is deliberately not sent to the client.
    """
    # The handler runs outside the original ``except`` block, so the
    # exception is passed explicitly to get its traceback into the log.
    logger.error(
        "Unhandled exception: %s",
        exc,
        exc_info=(type(exc), exc, exc.__traceback__),
    )
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "status_code": 500,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        },
    )
if __name__ == "__main__":
    print("Starting Contact Discovery API Server")
    print("Educational demonstration only")
    print()

    async def startup():
        """Ensure the API tables exist, then mint and print a demo key."""
        # The app's lifespan also creates the tables, but it only runs once
        # uvicorn starts.  Entering the lifespan of a fresh bare FastAPI()
        # did nothing, so the INSERT failed with "no such table" on a new DB.
        init_api_database()
        response = await generate_demo_key()
        print(f"Demo API Key: {response['api_key']}")
        print(f"Permissions: {', '.join(response['permissions'])}")
        print(f"Rate limit: {response['rate_limit']}/second")
        print()
        print("Use this key in Authorization header:")
        print("Authorization: Bearer <API_KEY>")
        print()
        print("Server starting on http://localhost:8000")
        print("API docs available at http://localhost:8000/docs")

    # Generate demo key on startup
    asyncio.run(startup())

    # Start server.  NOTE(review): host="0.0.0.0" listens on every interface
    # (not only localhost as printed above), and reload=True is a
    # development setting.
    uvicorn.run(
        "contact_discovery_api:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info",
    )