#!/usr/bin/env python3 """ WhatsApp Discovery Script for Heritage Professionals Tests phone number variations to find WhatsApp accounts. Uses exponential backoff strategy: 10^0, 10^1, 10^2, 10^3, 10^4 Usage: python whatsapp_discovery_test.py Example: python whatsapp_discovery_test.py 3162940 31 python whatsapp_discovery_test.py 3161489 31 python whatsapp_discovery_test.py 3165313 31 """ import asyncio import aiohttp import json import os import sys import time from datetime import datetime from pathlib import Path class WhatsAppDiscoveryTester: """Tests WhatsApp phone number variations to discover active accounts.""" def __init__(self, phone_base: str, country_code: str): self.phone_base = phone_base.replace('XXXX', '').replace('X', '') self.country_code = country_code self.results = [] async def test_single_variation(self, session, variation: str, attempt_num: int): """Test a single phone number variation.""" phone = f"{self.country_code}{variation}" # WhatsApp Business API check # Using WhatsApp Business Cloud API to check if phone number has WhatsApp account if not os.getenv("WHATSAPP_BUSINESS_TOKEN"): return { "phone": phone, "variation": variation, "attempt": attempt_num, "status": "error", "response": "No API token", "exists": False, "error": "WHATSAPP_BUSINESS_TOKEN environment variable not set" } headers = { 'Authorization': f'Bearer {os.getenv("WHATSAPP_BUSINESS_TOKEN")}', 'Content-Type': 'application/json' } # WhatsApp Business API endpoint for checking phone numbers api_url = f"https://graph.facebook.com/v18.0/phone_numbers" data = { "phone_number": phone, "fields": ["display_name", "is_business", "last_activity"] } try: async with session.post(api_url, json=data, headers=headers) as response: if response.status == 200: result_data = await response.json() if result_data.get("data"): phone_info = result_data["data"][0] return { "phone": phone, "variation": variation, "attempt": attempt_num, "status": "found", "response": f"{str(response.status)} OK", "exists": True, "display_name": phone_info.get("display_name"), "is_business": phone_info.get("is_business", False), "last_activity": phone_info.get("last_activity") } else: return { "phone": phone, "variation": variation, "attempt": attempt_num, "status": "no_account", "response": f"{response.status} No Data", "exists": False } elif response.status == 404: return { "phone": phone, "variation": variation, "attempt": attempt_num, "status": "not_found", "response": f"{response.status} Not Found", "exists": False } else: return { "phone": phone, "variation": variation, "attempt": attempt_num, "status": "error", "response": f"{response.status} {response.reason}", "exists": False, "error": await response.text() } except Exception as e: return { "phone": phone, "variation": variation, "attempt": attempt_num, "status": "error", "error": str(e), "exists": False } except Exception as e: return { "phone": phone, "variation": variation, "attempt": attempt_num, "status": "error", "error": str(e), "exists": False } def generate_variations(self): """Generate exponential backoff variations: 10^0, 10^1, 10^2, 10^3, 10^4.""" base = self.phone_base variations = [] # 10^0: Original number variations.append(base) # 10^1: Replace last digit with 0-9 for i in range(10): if len(base) >= 1: variation = base[:-1] + str(i) variations.append(variation) # 10^2: Replace last 2 digits with 00-99 for i in range(100): if len(base) >= 2: variation = base[:-2] + f"{i:02d}" variations.append(variation) # 10^3: Replace last 3 digits with 000-999 for i in range(1000): if len(base) >= 3: variation = base[:-3] + f"{i:03d}" variations.append(variation) # 10^4: Replace last 4 digits with 0000-9999 for i in range(10000): if len(base) >= 4: variation = base[:-4] + f"{i:04d}" variations.append(variation) return variations async def run_discovery(self, max_attempts: int = 100): """Run WhatsApp discovery with exponential backoff.""" print(f"πŸ” Starting WhatsApp discovery for {self.country_code}{self.phone_base}XXXX") print(f"πŸ“Š Testing up to {max_attempts} variations using exponential backoff strategy") variations = self.generate_variations() print(f"πŸ“ˆ Generated {len(variations)} total variations") # Create HTTP session timeout = aiohttp.ClientTimeout(total=30) connector = aiohttp.TCPConnector(limit=100, force_close=True) async with aiohttp.ClientSession( timeout=timeout, connector=connector, headers={'User-Agent': 'WhatsApp-Discovery-Tester/1.0'} ) as session: # Test variations with exponential backoff for i, variation in enumerate(variations[:max_attempts]): result = await self.test_single_variation(session, variation, i + 1) self.results.append(result) # Print progress if result["exists"]: print(f"βœ… Attempt {i+1}: {variation} -> FOUND! {result['status']}") if result.get("business_account"): print(f" πŸ“Š Business account detected, last seen: {result['last_seen']}") break # Stop on first success else: status_symbol = "❌" if result["status"] == "not_found" else "⚠️" print(f"{status_symbol} Attempt {i+1}: {variation} -> {result['status']}") # Rate limiting if (i + 1) % 10 == 0: print(f"⏱️ Pausing for rate limit (attempt {i+1})...") await asyncio.sleep(2) # Summary found_count = sum(1 for r in self.results if r["exists"]) print(f"\nπŸ“Š Discovery Complete!") print(f" Total attempts: {len(self.results)}") print(f" Successful finds: {found_count}") print(f" Success rate: {(found_count/len(self.results)*100):.1f}%") if found_count > 0: successful_results = [r for r in self.results if r["exists"]] print(f"\n🎯 Successful Discoveries:") for result in successful_results: print(f" πŸ“± {result['phone']} -> {result['status']}") if result.get("business_account"): print(f" πŸ’Ό Business: {result['last_seen']}") return self.results def save_results(self, output_file: str = None): """Save results to JSON file.""" if not output_file: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") output_file = f"whatsapp_discovery_{self.country_code}{self.phone_base}_{timestamp}.json" output_path = Path(output_file) # Prepare output data output_data = { "discovery_metadata": { "phone_base": f"{self.country_code}{self.phone_base}XXXX", "country_code": self.country_code, "discovery_date": datetime.now().isoformat(), "method": "exponential_backoff_wa_api_simulation", "total_variations_tested": len(self.results), "max_attempts": len(self.results) }, "results": self.results } # Write to file with open(output_path, 'w') as f: json.dump(output_data, f, indent=2) print(f"\nπŸ’Ύ Results saved to: {output_path}") return output_path async def main(): """Main function to run WhatsApp discovery.""" if len(sys.argv) != 3: print("Usage: python whatsapp_discovery_test.py ") print("Example: python whatsapp_discovery_test.py 3162940 31") print("\nAvailable phone bases from our enrichment:") print(" 3162940 - Bas Schreuder") print(" 3161489 - Bjorn de Jong") print(" 3165313 - MylΓ¨ne Da Silva") sys.exit(1) phone_base = sys.argv[1] country_code = sys.argv[2] tester = WhatsAppDiscoveryTester(phone_base, country_code) results = await tester.run_discovery(max_attempts=100) # Save results output_file = tester.save_results() print(f"\n🎯 Discovery complete! Check {output_file} for results.") if __name__ == "__main__": asyncio.run(main())