glam/contact_discovery_service.py
2025-12-14 17:09:55 +01:00

527 lines
No EOL
18 KiB
Python

#!/usr/bin/env python3
"""
Contact Discovery Service - Educational Implementation
Based on insights from WhatsApp vulnerability research
This service demonstrates technical concepts behind contact discovery mechanisms
with privacy-first design and ethical safeguards.
"""
import asyncio
import logging
import time
import json
import sqlite3
import hashlib
from datetime import datetime, timezone
from typing import List, Dict, Optional
from dataclasses import dataclass, asdict
from enum import Enum
# Root logging setup: INFO and above, each record prefixed with its
# timestamp, originating logger name and severity.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the classes and functions in this file.
logger = logging.getLogger(__name__)
class DiscoveryMode(Enum):
    """Operating context a discovery run is configured for."""

    # Academic research carried out with IRB approval.
    RESEARCH = "research"
    # Testing infrastructure the operator owns.
    DEFENSIVE = "defensive"
    # Compliance auditing.
    AUDIT = "audit"
    # Educational demonstration.
    DEMO = "demo"
@dataclass
class ContactInfo:
    """Record describing one phone number examined during a discovery run."""

    # The number as supplied by the caller.
    phone_number: str
    # Country label for the number (the service fills this from
    # PhoneValidator.extract_country_code).
    country_code: str
    # Whether the number was reported as active.
    is_active: bool = False
    # Optional profile details attached to the record.
    profile_info: Optional[Dict] = None
    # When the record was produced; filled in automatically when omitted.
    discovery_timestamp: Optional[datetime] = None
    # Free-form label naming how the record was obtained.
    source_method: str = ""

    def __post_init__(self):
        """Stamp the record with the current UTC time unless one was given."""
        if self.discovery_timestamp is not None:
            return
        self.discovery_timestamp = datetime.now(timezone.utc)
@dataclass
class DiscoveryConfig:
    """Settings that control how a discovery run behaves."""

    # Operating context; decides which safeguards validate() insists on.
    mode: DiscoveryMode
    # Upper bound handed to the rate limiter (conservative by default).
    max_queries_per_second: int = 10
    # How many phone numbers are handled per batch.
    batch_size: int = 50
    # When True, active contacts get a profile attached.
    enable_metadata_collection: bool = False
    # When True, the service throttles itself between batches.
    respect_rate_limits: bool = True
    # Whether consent is required; mandatory for RESEARCH mode.
    require_consent: bool = True
    # When True, audit events are written to the audit log table.
    log_all_activities: bool = True

    def validate(self) -> bool:
        """Return True when this configuration is usable and compliant.

        Rules:
          * ``batch_size`` must be at least 1. Zero makes the batching
            ``range()`` step invalid and a negative value makes the run
            silently process nothing.
          * When rate limiting is enabled, ``max_queries_per_second`` must be
            at least 1, otherwise the limiter can never grant a token.
          * RESEARCH mode requires both consent and rate limiting.
          * DEFENSIVE and DEMO modes are accepted as-is.
          * Any other mode (including AUDIT) is rejected.
        """
        # Numeric sanity checks come first: they apply to every mode.
        if self.batch_size < 1:
            return False
        if self.respect_rate_limits and self.max_queries_per_second < 1:
            return False

        if self.mode == DiscoveryMode.RESEARCH:
            return self.require_consent and self.respect_rate_limits
        return self.mode in (DiscoveryMode.DEFENSIVE, DiscoveryMode.DEMO)
class PhoneValidator:
    """Phone number validation utilities."""

    # Dialling prefix -> country label. Built once at class creation instead
    # of on every lookup.
    # NOTE: '+1' is shared by several countries; it is reported as 'US' here.
    _COUNTRY_BY_PREFIX = {
        '+1': 'US',
        '+44': 'GB',
        '+49': 'DE',
        '+33': 'FR',
        '+31': 'NL',
        '+34': 'ES',
        '+39': 'IT',
        '+43': 'AT',
        '+41': 'CH',
        '+46': 'SE',
        '+47': 'NO',
        '+45': 'DK',
        '+358': 'FI',
        '+353': 'IE',
        '+351': 'PT',
        '+30': 'GR',
        '+420': 'CZ',
        '+36': 'HU',
        '+48': 'PL',
        '+40': 'RO',
        '+359': 'BG',
        '+385': 'HR',
        '+386': 'SI',
        '+421': 'SK',
        '+372': 'EE',
        '+371': 'LV',
        '+370': 'LT',
    }

    @staticmethod
    def validate_number(phone_number: str) -> bool:
        """Return True for a '+' followed by 10 to 15 ASCII digits.

        Anything else, including non-string input, yields False.
        """
        if not isinstance(phone_number, str) or not phone_number.startswith('+'):
            return False
        digits = phone_number[1:]
        # str.isdigit() alone also accepts non-ASCII digit characters
        # (e.g. Arabic-Indic digits or superscripts), so require ASCII too.
        return 10 <= len(digits) <= 15 and digits.isascii() and digits.isdigit()

    @staticmethod
    def extract_country_code(phone_number: str) -> str:
        """Return the country label for the number's prefix, or 'UNKNOWN'."""
        # Try the longest candidate prefix first ('+' plus 3, 2, then 1
        # digits) so the result never depends on the table's insertion order.
        for prefix_length in (4, 3, 2):
            country = PhoneValidator._COUNTRY_BY_PREFIX.get(phone_number[:prefix_length])
            if country is not None:
                return country
        return 'UNKNOWN'
class RateLimiter:
"""Rate limiting for ethical discovery"""
def __init__(self, max_per_second: int):
self.max_per_second = max_per_second
self.request_times = []
self.lock = asyncio.Lock()
async def acquire(self):
"""Acquire rate limit token"""
async with self.lock:
now = time.time()
# Remove old requests (older than 1 second)
self.request_times = [
req_time for req_time in self.request_times
if now - req_time < 1.0
]
if len(self.request_times) >= self.max_per_second:
# Calculate wait time
sleep_time = 1.0 - (now - self.request_times[0])
if sleep_time > 0:
logger.info(f"Rate limit reached, waiting {sleep_time:.2f}s")
await asyncio.sleep(sleep_time)
return await self.acquire()
self.request_times.append(now)
return True
class ContactDiscoveryService:
    """Main contact discovery service.

    Validates phone numbers, marks them active or inactive using a simulated
    rule, and records results plus an audit trail in a SQLite database.
    """

    def __init__(self, config: DiscoveryConfig, db_path: str = "discovery_results.db"):
        """Create the service and make sure its database tables exist.

        Args:
            config: Run settings; must pass ``config.validate()``.
            db_path: Location of the SQLite results database.

        Raises:
            ValueError: If the configuration fails validation.
        """
        # Validate before touching the filesystem so that a rejected
        # configuration does not leave a database file behind.
        if not config.validate():
            raise ValueError("Invalid configuration for discovery mode")
        self.config = config
        self.validator = PhoneValidator()
        self.rate_limiter = RateLimiter(config.max_queries_per_second)
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        """Create the ``discoveries`` and ``audit_log`` tables if missing."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS discoveries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    phone_number TEXT UNIQUE,
                    country_code TEXT,
                    is_active BOOLEAN,
                    discovery_timestamp TEXT,
                    source_method TEXT,
                    metadata_json TEXT,
                    compliance_verified BOOLEAN DEFAULT FALSE
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    action TEXT,
                    details TEXT,
                    compliance_check BOOLEAN
                )
            ''')
            conn.commit()
        finally:
            # Close even when a statement fails so the handle never leaks.
            conn.close()

    async def discover_contacts(self, phone_numbers: List[str]) -> List[ContactInfo]:
        """Run discovery over ``phone_numbers`` and return the valid contacts.

        Numbers are handled in batches of ``config.batch_size``. Invalid
        numbers are skipped, so the result may be shorter than the input.
        """
        logger.info(f"Starting discovery for {len(phone_numbers)} numbers")
        logger.info(f"Mode: {self.config.mode.value}")
        logger.info(f"Rate limit: {self.config.max_queries_per_second}/second")

        self._log_audit_event("DISCOVERY_START", {
            "count": len(phone_numbers),
            "mode": self.config.mode.value,
            "rate_limit": self.config.max_queries_per_second
        })

        contacts = []
        total = len(phone_numbers)
        for start in range(0, total, self.config.batch_size):
            batch = phone_numbers[start:start + self.config.batch_size]

            # NOTE(review): one limiter token is taken per batch, not per
            # number, so the effective per-number rate can exceed
            # max_queries_per_second. Confirm whether that is intended.
            if self.config.respect_rate_limits:
                await self.rate_limiter.acquire()

            contacts.extend(await self._process_batch(batch))

            logger.info(f"Processed {min(start + self.config.batch_size, total)}/{total}")

            # Short pause between batches when throttling is enabled.
            if self.config.respect_rate_limits:
                await asyncio.sleep(0.1)

        self._log_audit_event("DISCOVERY_COMPLETE", {
            "total_processed": total,
            "active_found": sum(1 for c in contacts if c.is_active),
            "mode": self.config.mode.value
        })
        return contacts

    @staticmethod
    def _is_simulated_active(phone_number: str) -> bool:
        """Return the demo-mode activity flag for ``phone_number``.

        Uses SHA-256 rather than the built-in ``hash()``: string hashes are
        salted per interpreter process, so ``hash()`` would give a different
        pattern on every run.
        """
        digest = hashlib.sha256(phone_number.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % 3 == 0

    async def _process_batch(self, phone_numbers: List[str]) -> List[ContactInfo]:
        """Validate, classify and persist one batch of phone numbers."""
        results = []
        demo_mode = self.config.mode == DiscoveryMode.DEMO
        for phone_number in phone_numbers:
            if not self.validator.validate_number(phone_number):
                logger.warning(f"Invalid phone number: {phone_number}")
                continue

            contact = ContactInfo(
                phone_number=phone_number,
                country_code=self.validator.extract_country_code(phone_number),
                source_method="educational_simulation"
            )

            if demo_mode:
                # Demo mode: repeatable simulated pattern, no real lookup.
                contact.is_active = self._is_simulated_active(phone_number)
                if contact.is_active and self.config.enable_metadata_collection:
                    contact.profile_info = self._generate_mock_profile()
            else:
                # Every other mode reports "not active" for safety.
                contact.is_active = False

            results.append(contact)
            self._store_contact(contact)
        return results

    def _generate_mock_profile(self) -> Dict:
        """Return a fixed, clearly simulated profile for demo mode."""
        return {
            "about_text": "Educational demonstration profile - simulated data only",
            "profile_picture_url": None,
            "last_seen": None,
            "is_business": False,
            "device_type": "educational_demo"
        }

    def _store_contact(self, contact: ContactInfo):
        """Insert ``contact``, replacing any existing row for the same number."""
        timestamp = contact.discovery_timestamp or datetime.now(timezone.utc)
        row = (
            contact.phone_number,
            contact.country_code,
            contact.is_active,
            timestamp.isoformat(),
            contact.source_method,
            json.dumps(contact.profile_info) if contact.profile_info else None,
        )
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                INSERT OR REPLACE INTO discoveries
                (phone_number, country_code, is_active, discovery_timestamp, source_method, metadata_json)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', row)
            conn.commit()
        finally:
            conn.close()

    def _log_audit_event(self, action: str, details: Dict):
        """Append an audit row unless ``config.log_all_activities`` is off."""
        if not self.config.log_all_activities:
            return
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                INSERT INTO audit_log (timestamp, action, details, compliance_check)
                VALUES (?, ?, ?, ?)
            ''', (
                datetime.now(timezone.utc).isoformat(),
                action,
                json.dumps(details),
                True  # Every event is recorded as compliant by default.
            ))
            conn.commit()
        finally:
            conn.close()

    def generate_report(self) -> Dict:
        """Summarise the stored discoveries.

        Returns:
            A dict with ``summary`` (totals, success rate, mode),
            ``geographic_distribution`` (rows per country label),
            ``daily_activity`` (rows per date) and ``compliance`` (the
            relevant configuration flags).
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            cursor.execute("SELECT COUNT(*) FROM discoveries")
            total_processed = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(*) FROM discoveries WHERE is_active = 1")
            active_found = cursor.fetchone()[0]

            cursor.execute('''
                SELECT country_code, COUNT(*)
                FROM discoveries
                GROUP BY country_code
                ORDER BY COUNT(*) DESC
            ''')
            country_distribution = dict(cursor.fetchall())

            cursor.execute('''
                SELECT DATE(discovery_timestamp) as date, COUNT(*)
                FROM discoveries
                GROUP BY DATE(discovery_timestamp)
                ORDER BY date
            ''')
            daily_activity = dict(cursor.fetchall())
        finally:
            conn.close()

        return {
            "summary": {
                "total_processed": total_processed,
                "active_found": active_found,
                # Guard against an empty table.
                "success_rate": active_found / total_processed if total_processed > 0 else 0,
                "mode": self.config.mode.value
            },
            "geographic_distribution": country_distribution,
            "daily_activity": daily_activity,
            "compliance": {
                "rate_limiting": self.config.respect_rate_limits,
                "consent_required": self.config.require_consent,
                "metadata_collection": self.config.enable_metadata_collection
            }
        }
class ComplianceChecker:
    """Check a discovery configuration against privacy-regulation rules."""

    # (check name, advice shown when that check fails). The first four keep
    # their original order; purpose limitation is appended so every check
    # that can fail has matching advice.
    _RECOMMENDATIONS = (
        ("lawful_basis", "Ensure lawful basis for processing (research/defensive/demo only)"),
        ("data_minimization", "Disable metadata collection to minimize data"),
        ("consent", "Require explicit consent for data processing"),
        ("security_measures", "Implement rate limiting and security measures"),
        ("purpose_limitation", "Use a supported discovery purpose (audit mode fails purpose limitation)"),
    )

    @staticmethod
    def check_gdpr_compliance(config: DiscoveryConfig) -> Dict:
        """Evaluate ``config`` against the GDPR-style checks.

        Returns:
            A dict with ``compliant`` (True only when every check passes),
            ``checks`` (per-check booleans) and ``recommendations`` (advice
            for each failed check).
        """
        checks = {
            "lawful_basis": config.mode in [DiscoveryMode.RESEARCH, DiscoveryMode.DEFENSIVE, DiscoveryMode.DEMO],
            "data_minimization": not config.enable_metadata_collection,
            "purpose_limitation": config.mode != DiscoveryMode.AUDIT,
            # Demo mode is exempt from the consent requirement.
            "consent": config.require_consent or config.mode == DiscoveryMode.DEMO,
            "security_measures": config.respect_rate_limits,
            "rights_respect": True
        }
        return {
            "compliant": all(checks.values()),
            "checks": checks,
            "recommendations": ComplianceChecker._get_gdpr_recommendations(checks)
        }

    @staticmethod
    def _get_gdpr_recommendations(checks: Dict) -> List[str]:
        """Return advice for every failed check in ``checks``.

        A check that is absent from ``checks`` is treated as passed rather
        than raising KeyError.
        """
        return [
            advice
            for check_name, advice in ComplianceChecker._RECOMMENDATIONS
            if not checks.get(check_name, True)
        ]
async def main():
    """Run the end-to-end educational demonstration.

    Builds a demo-mode configuration, checks it for compliance, runs
    discovery over a fixed list of placeholder numbers and prints a summary.
    Returns early, without running discovery, when the compliance check
    fails.
    """
    banner = "=" * 60
    print(banner)
    print("CONTACT DISCOVERY SERVICE - EDUCATIONAL DEMONSTRATION")
    print(banner)
    print()
    logger.info("Service initialized for educational demonstration")
    logger.info("All operations use mock data only")
    logger.info("Rate limiting and compliance checks enabled")
    print()

    # Create configuration for demo mode
    config = DiscoveryConfig(
        mode=DiscoveryMode.DEMO,
        max_queries_per_second=5,  # Very conservative for demo
        batch_size=10,
        enable_metadata_collection=False,  # Privacy-first
        respect_rate_limits=True,
        require_consent=False,  # Demo mode with simulated data
        log_all_activities=True
    )

    # Check compliance before doing any work
    compliance = ComplianceChecker.check_gdpr_compliance(config)
    print(f"Compliance Status: {'✅ COMPLIANT' if compliance['compliant'] else '❌ NON-COMPLIANT'}")
    print()
    if not compliance['compliant']:
        print("Compliance Issues:")
        for rec in compliance['recommendations']:
            print(f" - {rec}")
        print()
        return

    # Initialize service
    service = ContactDiscoveryService(config)

    # Placeholder numbers, one per country prefix known to the validator
    logger.info("Generating demonstration phone numbers for educational testing")
    test_numbers = [
        "+15555551234",  # US demo number
        "+442012345678",  # UK demo number
        "+491234567890",  # Germany demo number
        "+33123456789",  # France demo number
        "+31123456789",  # Netherlands demo number
        "+34123456789",  # Spain demo number
        "+39123456789",  # Italy demo number
        "+43123456789",  # Austria demo number
        "+41123456789",  # Switzerland demo number
        "+46123456789",  # Sweden demo number
        "+47123456789",  # Norway demo number
        "+45123456789",  # Denmark demo number
        "+358123456789",  # Finland demo number
        "+353123456789",  # Ireland demo number
        "+351123456789",  # Portugal demo number
        "+30123456789",  # Greece demo number
        "+420123456789",  # Czech Republic demo number
        "+36123456789",  # Hungary demo number
        "+48123456789",  # Poland demo number
        "+40123456789",  # Romania demo number
        "+359123456789",  # Bulgaria demo number
        "+385123456789",  # Croatia demo number
        "+386123456789",  # Slovenia demo number
        "+421123456789",  # Slovakia demo number
        "+372123456789",  # Estonia demo number
        "+371123456789",  # Latvia demo number
        "+370123456789",  # Lithuania demo number
    ]
    logger.info(f"Generated {len(test_numbers)} demonstration numbers")
    logger.info("All numbers are for educational testing only")
    print()

    # Run discovery
    print("Starting contact discovery demo...")
    contacts = await service.discover_contacts(test_numbers)

    # Results
    active_contacts = [c for c in contacts if c.is_active]
    # Guard the percentage: every number could have failed validation.
    success_pct = len(active_contacts) / len(contacts) * 100 if contacts else 0.0
    print("\nResults:")
    print(f" Total processed: {len(contacts)}")
    print(f" Active found: {len(active_contacts)}")
    print(f" Success rate: {success_pct:.1f}%")

    # Generate report
    report = service.generate_report()
    print("\nGeographic Distribution:")
    for country, count in report['geographic_distribution'].items():
        print(f" {country}: {count}")
    print("\nCompliance Summary:")
    for key, value in report['compliance'].items():
        # These values are configuration flags, not pass/fail results, so
        # show them as neutral on/off labels.
        status = "enabled" if value else "disabled"
        print(f" {key}: {status}")
    print(f"\nDatabase created: {service.db_path}")
    print("Use SQLite browser to view detailed results")
    print()
    print(banner)
    print("EDUCATIONAL DEMONSTRATION COMPLETE")
    logger.info("Demonstration completed successfully")
    logger.info("Key insights from WhatsApp vulnerability research applied:")
    logger.info(" 1. Rate limiting prevents abuse")
    logger.info(" 2. Phone number validation ensures data quality")
    logger.info(" 3. Batch processing improves efficiency")
    logger.info(" 4. Audit logging maintains accountability")
    logger.info(" 5. Compliance checking enforces ethical use")
    logger.info(" 6. Mock data protects privacy in educational contexts")
    print(banner)
    logger.info("Educational demonstration complete")
    logger.info("Always respect privacy and legal requirements in production")
    print(banner)


if __name__ == "__main__":
    asyncio.run(main())