#!/usr/bin/env python3
"""
Contact Discovery Service - Educational Implementation
Based on insights from WhatsApp vulnerability research

This service demonstrates technical concepts behind contact discovery mechanisms
with privacy-first design and ethical safeguards.

Nothing in this module talks to a network: "discovery" is a local simulation
whose results are written to a SQLite database.
"""

import asyncio
import logging
import time
import json
import sqlite3
import hashlib
from collections import deque
from contextlib import closing
from datetime import datetime, timezone
from typing import Deque, List, Dict, Optional
from dataclasses import dataclass, asdict
from enum import Enum

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class DiscoveryMode(Enum):
    """Enumeration modes for contact discovery"""
    RESEARCH = "research"    # Academic research with IRB approval
    DEFENSIVE = "defensive"  # Testing own infrastructure
    AUDIT = "audit"          # Compliance auditing
    DEMO = "demo"            # Educational demonstration


@dataclass
class ContactInfo:
    """Contact information structure.

    ``discovery_timestamp`` defaults to the current UTC time when it is not
    supplied (or is explicitly ``None``).
    """
    phone_number: str
    country_code: str
    is_active: bool = False
    profile_info: Optional[Dict] = None
    discovery_timestamp: Optional[datetime] = None
    source_method: str = ""

    def __post_init__(self):
        if self.discovery_timestamp is None:
            self.discovery_timestamp = datetime.now(timezone.utc)


@dataclass
class DiscoveryConfig:
    """Configuration for contact discovery"""
    mode: DiscoveryMode
    max_queries_per_second: int = 10  # Conservative rate limit
    batch_size: int = 50
    enable_metadata_collection: bool = False
    respect_rate_limits: bool = True
    require_consent: bool = True
    log_all_activities: bool = True

    def validate(self) -> bool:
        """Validate configuration for compliance.

        Returns:
            ``True`` for DEFENSIVE and DEMO modes; for RESEARCH mode only when
            both consent and rate limiting are enabled; ``False`` for every
            other mode (currently AUDIT).
        """
        if self.mode == DiscoveryMode.RESEARCH:
            return self.require_consent and self.respect_rate_limits
        if self.mode in (DiscoveryMode.DEFENSIVE, DiscoveryMode.DEMO):
            return True
        return False


class PhoneValidator:
    """Phone number validation utilities"""

    # Simple dialing-prefix -> ISO country mapping. No prefix in this table is
    # a prefix of another, so lookup order does not affect the result.
    _COUNTRY_MAP = {
        '+1': 'US', '+44': 'GB', '+49': 'DE', '+33': 'FR', '+31': 'NL',
        '+34': 'ES', '+39': 'IT', '+43': 'AT', '+41': 'CH', '+46': 'SE',
        '+47': 'NO', '+45': 'DK', '+358': 'FI', '+353': 'IE', '+351': 'PT',
        '+30': 'GR', '+420': 'CZ', '+36': 'HU', '+48': 'PL', '+40': 'RO',
        '+359': 'BG', '+385': 'HR', '+386': 'SI', '+421': 'SK', '+372': 'EE',
        '+371': 'LV', '+370': 'LT',
    }
    # Candidate prefix lengths (including the '+'), longest first.
    _PREFIX_LENGTHS = (4, 3, 2)

    @staticmethod
    def validate_number(phone_number: str) -> bool:
        """Basic phone number validation.

        A number is valid when it starts with ``+`` followed by 10-15 ASCII
        digits.
        """
        if not phone_number.startswith('+'):
            return False
        digits = phone_number[1:]
        # str.isdigit() alone also accepts non-ASCII digits (e.g. superscripts
        # or Arabic-Indic numerals), so require ASCII explicitly.
        return 10 <= len(digits) <= 15 and digits.isascii() and digits.isdigit()

    @staticmethod
    def extract_country_code(phone_number: str) -> str:
        """Extract country code from phone number.

        Returns the mapped two-letter country code, or ``'UNKNOWN'`` when no
        known dialing prefix matches.
        """
        for length in PhoneValidator._PREFIX_LENGTHS:
            country = PhoneValidator._COUNTRY_MAP.get(phone_number[:length])
            if country is not None:
                return country
        return 'UNKNOWN'


class RateLimiter:
    """Rate limiting for ethical discovery (sliding one-second window)."""

    def __init__(self, max_per_second: int):
        self.max_per_second = max_per_second
        # Monotonic timestamps of tokens granted within the last second,
        # oldest first.
        self.request_times: Deque[float] = deque()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Acquire rate limit token, waiting until one is available.

        Returns:
            ``True`` once a token has been granted.

        Raises:
            ValueError: if ``max_per_second`` is below 1, since no token could
                ever be granted.
        """
        if self.max_per_second < 1:
            raise ValueError("max_per_second must be at least 1")

        async with self.lock:
            # Loop rather than recurse: asyncio.Lock is not reentrant, so
            # calling acquire() again while holding the lock would deadlock.
            while True:
                now = time.monotonic()

                # Remove old requests (older than 1 second)
                while self.request_times and now - self.request_times[0] >= 1.0:
                    self.request_times.popleft()

                if len(self.request_times) < self.max_per_second:
                    self.request_times.append(now)
                    return True

                # Wait until the oldest request leaves the window.
                sleep_time = 1.0 - (now - self.request_times[0])
                logger.info("Rate limit reached, waiting %.2fs", sleep_time)
                await asyncio.sleep(sleep_time)


class ContactDiscoveryService:
    """Main contact discovery service"""

    def __init__(self, config: DiscoveryConfig, db_path: str = "discovery_results.db"):
        """Create the service and its results database.

        Args:
            config: Discovery configuration; must pass ``config.validate()``.
            db_path: Location of the SQLite results database.

        Raises:
            ValueError: if the configuration is not valid for its mode. The
                check runs before anything is written to disk.
        """
        # Validate configuration before creating any on-disk state.
        if not config.validate():
            raise ValueError("Invalid configuration for discovery mode")

        self.config = config
        self.validator = PhoneValidator()
        self.rate_limiter = RateLimiter(config.max_queries_per_second)
        self.db_path = db_path
        self._init_database()

    def _connect(self):
        """Return a context manager yielding a connection that is always closed.

        ``sqlite3.Connection`` used directly as a context manager only manages
        the transaction, not the connection lifetime, hence ``closing``.
        """
        return closing(sqlite3.connect(self.db_path))

    def _init_database(self):
        """Initialize SQLite database for results"""
        with self._connect() as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS discoveries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    phone_number TEXT UNIQUE,
                    country_code TEXT,
                    is_active BOOLEAN,
                    discovery_timestamp TEXT,
                    source_method TEXT,
                    metadata_json TEXT,
                    compliance_verified BOOLEAN DEFAULT FALSE
                )
            ''')
            conn.execute('''
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    action TEXT,
                    details TEXT,
                    compliance_check BOOLEAN
                )
            ''')
            conn.commit()

    async def discover_contacts(self, phone_numbers: List[str]) -> List[ContactInfo]:
        """Discover active contacts from phone numbers.

        Numbers are processed in batches of ``config.batch_size``. Invalid
        numbers are skipped, so the result may be shorter than the input.
        """
        total = len(phone_numbers)
        batch_size = self.config.batch_size

        logger.info("Starting discovery for %s numbers", total)
        logger.info("Mode: %s", self.config.mode.value)
        logger.info("Rate limit: %s/second", self.config.max_queries_per_second)

        # Log discovery start
        self._log_audit_event("DISCOVERY_START", {
            "count": total,
            "mode": self.config.mode.value,
            "rate_limit": self.config.max_queries_per_second
        })

        contacts: List[ContactInfo] = []

        # Process in batches
        for start in range(0, total, batch_size):
            batch = phone_numbers[start:start + batch_size]
            contacts.extend(await self._process_batch(batch))

            # Progress update
            logger.info("Processed %s/%s", min(start + batch_size, total), total)

            # Ethical pause between batches
            if self.config.respect_rate_limits:
                await asyncio.sleep(0.1)

        # Log completion
        self._log_audit_event("DISCOVERY_COMPLETE", {
            "total_processed": total,
            "active_found": sum(1 for c in contacts if c.is_active),
            "mode": self.config.mode.value
        })

        return contacts

    async def _process_batch(self, phone_numbers: List[str]) -> List[ContactInfo]:
        """Process a batch of phone numbers.

        Each valid number is stored in the database and returned; invalid
        numbers are logged and skipped.
        """
        results = []

        for phone_number in phone_numbers:
            # Validate phone number
            if not self.validator.validate_number(phone_number):
                logger.warning("Invalid phone number: %s", phone_number)
                continue

            # One token per query, so max_queries_per_second bounds individual
            # lookups rather than whole batches.
            if self.config.respect_rate_limits:
                await self.rate_limiter.acquire()

            contact = ContactInfo(
                phone_number=phone_number,
                country_code=self.validator.extract_country_code(phone_number),
                source_method="educational_simulation"
            )

            if self.config.mode == DiscoveryMode.DEMO:
                # Demo mode: deterministic pattern for demonstration
                contact.is_active = self._simulated_activity(phone_number)
                if contact.is_active and self.config.enable_metadata_collection:
                    contact.profile_info = self._generate_mock_profile()
            else:
                # For other modes, default to not active for safety
                contact.is_active = False

            results.append(contact)
            self._store_contact(contact)

        return results

    @staticmethod
    def _simulated_activity(phone_number: str) -> bool:
        """Return a stable pseudo-random "active" flag for a number.

        Built-in ``hash()`` on strings is salted per process, so it would give
        different answers on every run; a SHA-256 digest does not.
        """
        digest = hashlib.sha256(phone_number.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") % 3 == 0

    def _generate_mock_profile(self) -> Dict:
        """Generate demonstration profile for educational mode"""
        return {
            "about_text": "Educational demonstration profile - simulated data only",
            "profile_picture_url": None,
            "last_seen": None,
            "is_business": False,
            "device_type": "educational_demo"
        }

    def _store_contact(self, contact: ContactInfo):
        """Store contact in database, replacing any row for the same number."""
        timestamp = contact.discovery_timestamp or datetime.now(timezone.utc)
        metadata = json.dumps(contact.profile_info) if contact.profile_info else None

        with self._connect() as conn:
            conn.execute('''
                INSERT OR REPLACE INTO discoveries
                (phone_number, country_code, is_active, discovery_timestamp,
                 source_method, metadata_json)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (
                contact.phone_number,
                contact.country_code,
                contact.is_active,
                timestamp.isoformat(),
                contact.source_method,
                metadata
            ))
            conn.commit()

    def _log_audit_event(self, action: str, details: Dict):
        """Log audit event (no-op when ``log_all_activities`` is disabled)."""
        if not self.config.log_all_activities:
            return

        with self._connect() as conn:
            conn.execute('''
                INSERT INTO audit_log (timestamp, action, details, compliance_check)
                VALUES (?, ?, ?, ?)
            ''', (
                datetime.now(timezone.utc).isoformat(),
                action,
                json.dumps(details),
                True  # All actions are logged as compliant by default
            ))
            conn.commit()

    def generate_report(self) -> Dict:
        """Generate discovery report from everything stored in the database.

        Returns:
            A dict with ``summary``, ``geographic_distribution``,
            ``daily_activity`` and ``compliance`` sections.
        """
        with self._connect() as conn:
            cursor = conn.cursor()

            cursor.execute("SELECT COUNT(*) FROM discoveries")
            total_processed = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(*) FROM discoveries WHERE is_active = 1")
            active_found = cursor.fetchone()[0]

            cursor.execute('''
                SELECT country_code, COUNT(*)
                FROM discoveries
                GROUP BY country_code
                ORDER BY COUNT(*) DESC
            ''')
            country_distribution = dict(cursor.fetchall())

            cursor.execute('''
                SELECT DATE(discovery_timestamp) as date, COUNT(*)
                FROM discoveries
                GROUP BY DATE(discovery_timestamp)
                ORDER BY date
            ''')
            daily_activity = dict(cursor.fetchall())

        return {
            "summary": {
                "total_processed": total_processed,
                "active_found": active_found,
                "success_rate": active_found / total_processed if total_processed > 0 else 0,
                "mode": self.config.mode.value
            },
            "geographic_distribution": country_distribution,
            "daily_activity": daily_activity,
            "compliance": {
                "rate_limiting": self.config.respect_rate_limits,
                "consent_required": self.config.require_consent,
                "metadata_collection": self.config.enable_metadata_collection
            }
        }


class ComplianceChecker:
    """Check compliance with privacy regulations"""

    @staticmethod
    def check_gdpr_compliance(config: DiscoveryConfig) -> Dict:
        """Check GDPR compliance.

        Returns:
            A dict with the overall ``compliant`` flag, the individual
            ``checks`` and a list of ``recommendations`` for failed checks.
        """
        checks = {
            "lawful_basis": config.mode in [DiscoveryMode.RESEARCH, DiscoveryMode.DEFENSIVE, DiscoveryMode.DEMO],
            "data_minimization": not config.enable_metadata_collection,
            "purpose_limitation": config.mode != DiscoveryMode.AUDIT,
            "consent": config.require_consent or config.mode == DiscoveryMode.DEMO,
            "security_measures": config.respect_rate_limits,
            "rights_respect": True
        }

        return {
            "compliant": all(checks.values()),
            "checks": checks,
            "recommendations": ComplianceChecker._get_gdpr_recommendations(checks)
        }

    @staticmethod
    def _get_gdpr_recommendations(checks: Dict) -> List[str]:
        """Get GDPR compliance recommendations for each failed check."""
        recommendations = []

        if not checks["lawful_basis"]:
            recommendations.append("Ensure lawful basis for processing (research/defensive only)")

        if not checks["data_minimization"]:
            recommendations.append("Disable metadata collection to minimize data")

        if not checks["consent"]:
            recommendations.append("Require explicit consent for data processing")

        if not checks["security_measures"]:
            recommendations.append("Implement rate limiting and security measures")

        return recommendations


async def main():
    """Main demonstration function"""
    print("=" * 60)
    print("CONTACT DISCOVERY SERVICE - EDUCATIONAL DEMONSTRATION")
    print("=" * 60)
    print()
    logger.info("Service initialized for educational demonstration")
    logger.info("All operations use mock data only")
    logger.info("Rate limiting and compliance checks enabled")
    print()

    # Create configuration for demo mode
    config = DiscoveryConfig(
        mode=DiscoveryMode.DEMO,
        max_queries_per_second=5,  # Very conservative for demo
        batch_size=10,
        enable_metadata_collection=False,  # Privacy-first
        respect_rate_limits=True,
        require_consent=False,  # Demo mode with simulated data
        log_all_activities=True
    )

    # Check compliance
    compliance = ComplianceChecker.check_gdpr_compliance(config)
    print(f"Compliance Status: {'✅ COMPLIANT' if compliance['compliant'] else '❌ NON-COMPLIANT'}")
    print()

    if not compliance['compliant']:
        print("Compliance Issues:")
        for rec in compliance['recommendations']:
            print(f" - {rec}")
        print()
        return

    # Initialize service
    service = ContactDiscoveryService(config)

    # Generate demonstration phone numbers
    logger.info("Generating demonstration phone numbers for educational testing")
    test_numbers = [
        "+15555551234",    # US demo number
        "+442012345678",   # UK demo number
        "+491234567890",   # Germany demo number
        "+33123456789",    # France demo number
        "+31123456789",    # Netherlands demo number
        "+34123456789",    # Spain demo number
        "+39123456789",    # Italy demo number
        "+43123456789",    # Austria demo number
        "+41123456789",    # Switzerland demo number
        "+46123456789",    # Sweden demo number
        "+47123456789",    # Norway demo number
        "+45123456789",    # Denmark demo number
        "+358123456789",   # Finland demo number
        "+353123456789",   # Ireland demo number
        "+351123456789",   # Portugal demo number
        "+30123456789",    # Greece demo number
        "+420123456789",   # Czech Republic demo number
        "+36123456789",    # Hungary demo number
        "+48123456789",    # Poland demo number
        "+40123456789",    # Romania demo number
        "+359123456789",   # Bulgaria demo number
        "+385123456789",   # Croatia demo number
        "+386123456789",   # Slovenia demo number
        "+421123456789",   # Slovakia demo number
        "+372123456789",   # Estonia demo number
        "+371123456789",   # Latvia demo number
        "+370123456789",   # Lithuania demo number
    ]

    logger.info("Generated %s demonstration numbers", len(test_numbers))
    logger.info("All numbers are for educational testing only")
    print()

    # Run discovery
    print("Starting contact discovery demo...")
    contacts = await service.discover_contacts(test_numbers)

    # Results
    active_contacts = [c for c in contacts if c.is_active]
    # Guard against an empty result list (every number rejected as invalid).
    success_pct = len(active_contacts) / len(contacts) * 100 if contacts else 0.0
    print("\nResults:")
    print(f" Total processed: {len(contacts)}")
    print(f" Active found: {len(active_contacts)}")
    print(f" Success rate: {success_pct:.1f}%")

    # Generate report
    report = service.generate_report()
    print("\nGeographic Distribution:")
    for country, count in report['geographic_distribution'].items():
        print(f" {country}: {count}")

    print("\nCompliance Summary:")
    for key, value in report['compliance'].items():
        status = "✅" if value else "❌"
        print(f" {key}: {status}")

    print(f"\nDatabase created: {service.db_path}")
    print("Use SQLite browser to view detailed results")
    print()
    print("=" * 60)
    print("EDUCATIONAL DEMONSTRATION COMPLETE")
    logger.info("Demonstration completed successfully")
    logger.info("Key insights from WhatsApp vulnerability research applied:")
    logger.info(" 1. Rate limiting prevents abuse")
    logger.info(" 2. Phone number validation ensures data quality")
    logger.info(" 3. Batch processing improves efficiency")
    logger.info(" 4. Audit logging maintains accountability")
    logger.info(" 5. Compliance checking enforces ethical use")
    logger.info(" 6. Mock data protects privacy in educational contexts")
    print("=" * 60)
    logger.info("Educational demonstration complete")
    logger.info("Always respect privacy and legal requirements in production")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())