glam/whatsapp_discovery_test.py
2025-12-14 17:09:55 +01:00

271 lines
No EOL
10 KiB
Python

#!/usr/bin/env python3
"""
WhatsApp Discovery Script for Heritage Professionals
Tests phone number variations to find WhatsApp accounts.
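Search strategy: the candidate set widens by powers of ten (10^0, 10^1, 10^2,
10^3, 10^4) by varying the last 0-4 digits of the base number. The script
calls this "exponential backoff", but no retry delays are involved.

Usage:
    python whatsapp_discovery_test.py <phone_base> <country_code>

Example:
    python whatsapp_discovery_test.py 3162940 31
    python whatsapp_discovery_test.py 3161489 31
    python whatsapp_discovery_test.py 3165313 31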
"""
import asyncio
import aiohttp
import json
import os
import sys
import time
from datetime import datetime
from pathlib import Path
class WhatsAppDiscoveryTester:
    """Probe variations of a phone number for associated WhatsApp accounts.

    Candidate numbers come from ``generate_variations`` and are checked one at
    a time by ``test_single_variation``; every outcome is appended to
    ``self.results`` as a plain dict.

    NOTE(review): enumerating phone numbers to locate individuals' accounts
    may conflict with the platform's terms of service and with privacy law
    (e.g. GDPR) -- confirm authorisation before running against real numbers.
    NOTE(review): the usage examples pass a base that already starts with the
    country code (``3162940`` with ``31``), yet the probed number is
    ``country_code + variation`` -- confirm the doubled prefix is intended.
    """

    def __init__(self, phone_base: str, country_code: str):
        """Store the search parameters.

        Args:
            phone_base: Known digits of the number; any ``X`` placeholder
                characters are stripped.
            country_code: Digits prefixed to every candidate before probing.
        """
        # Removing every 'X' also covers the 'XXXX' suffix form.
        self.phone_base = phone_base.replace('X', '')
        self.country_code = country_code
        self.results = []
        # Attempt budget of the most recent run; None until run_discovery runs.
        self._max_attempts = None

    async def test_single_variation(self, session, variation: str, attempt_num: int) -> dict:
        """Probe one candidate number and describe the outcome.

        Args:
            session: Object exposing an aiohttp-style ``post`` context manager.
                Not used when the API token is missing.
            variation: Candidate number without the country code.
            attempt_num: 1-based attempt counter, copied into the result.

        Returns:
            A dict with at least ``phone``, ``variation``, ``attempt``,
            ``status``, ``response`` and ``exists``. Error results add
            ``error``; hits add ``display_name``, ``is_business`` and
            ``last_activity``. This method does not raise for request
            failures; they are reported as ``status == "error"``.
        """
        phone = f"{self.country_code}{variation}"
        common = {
            "phone": phone,
            "variation": variation,
            "attempt": attempt_num,
        }

        # Read the token once so the check and the header cannot disagree.
        token = os.getenv("WHATSAPP_BUSINESS_TOKEN")
        if not token:
            return {
                **common,
                "status": "error",
                "response": "No API token",
                "exists": False,
                "error": "WHATSAPP_BUSINESS_TOKEN environment variable not set",
            }

        headers = {
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json',
        }
        # NOTE(review): verify this endpoint and payload against the current
        # WhatsApp Business Cloud API reference before relying on the results.
        api_url = "https://graph.facebook.com/v18.0/phone_numbers"
        data = {
            "phone_number": phone,
            "fields": ["display_name", "is_business", "last_activity"],
        }

        try:
            async with session.post(api_url, json=data, headers=headers) as response:
                if response.status == 200:
                    result_data = await response.json()
                    if result_data.get("data"):
                        phone_info = result_data["data"][0]
                        return {
                            **common,
                            "status": "found",
                            "response": f"{response.status} OK",
                            "exists": True,
                            "display_name": phone_info.get("display_name"),
                            "is_business": phone_info.get("is_business", False),
                            "last_activity": phone_info.get("last_activity"),
                        }
                    return {
                        **common,
                        "status": "no_account",
                        "response": f"{response.status} No Data",
                        "exists": False,
                    }
                if response.status == 404:
                    return {
                        **common,
                        "status": "not_found",
                        "response": f"{response.status} Not Found",
                        "exists": False,
                    }
                return {
                    **common,
                    "status": "error",
                    "response": f"{response.status} {response.reason}",
                    "exists": False,
                    "error": await response.text(),
                }
        except Exception as exc:
            # Deliberately broad: one failing probe (network error, timeout,
            # malformed payload) must not abort the whole scan.
            return {
                **common,
                "status": "error",
                "response": "Request failed",
                "exists": False,
                # Some exceptions (e.g. timeouts) stringify to "", so fall
                # back to the exception class name.
                "error": str(exc) or type(exc).__name__,
            }

    def generate_variations(self) -> list:
        """Return candidate numbers in widening order, without duplicates.

        The base itself comes first, followed by every number obtained by
        replacing its last 1, 2, 3 and then 4 digits (10^1 .. 10^4 values per
        tier). A tier is skipped when the base is shorter than the tier width.
        Candidates already produced by a narrower tier are not repeated, so
        the attempt budget is never spent on the same number twice.
        """
        base = self.phone_base
        # A dict keeps insertion order and silently drops repeats.
        ordered = {base: None}
        for width in range(1, 5):
            if len(base) < width:
                break  # wider tiers need an even longer base
            prefix = base[:-width]
            for value in range(10 ** width):
                ordered.setdefault(f"{prefix}{value:0{width}d}", None)
        return list(ordered)

    async def run_discovery(self, max_attempts: int = 100):
        """Probe up to ``max_attempts`` candidates, stopping at the first hit.

        Args:
            max_attempts: Upper bound on probes; zero or negative values
                probe nothing.

        Returns:
            ``self.results``, extended with one dict per probe performed.
        """
        print(f"🔍 Starting WhatsApp discovery for {self.country_code}{self.phone_base}XXXX")
        print(f"📊 Testing up to {max_attempts} variations using exponential backoff strategy")
        variations = self.generate_variations()
        print(f"📈 Generated {len(variations)} total variations")

        self._max_attempts = max_attempts
        # Clamp so a negative budget cannot turn into a "[:-n]" slice.
        candidates = variations[:max(max_attempts, 0)]

        if candidates:
            timeout = aiohttp.ClientTimeout(total=30)
            connector = aiohttp.TCPConnector(limit=100, force_close=True)
            async with aiohttp.ClientSession(
                timeout=timeout,
                connector=connector,
                headers={'User-Agent': 'WhatsApp-Discovery-Tester/1.0'},
            ) as session:
                for attempt, variation in enumerate(candidates, start=1):
                    result = await self.test_single_variation(session, variation, attempt)
                    self.results.append(result)

                    if result["exists"]:
                        print(f"✅ Attempt {attempt}: {variation} -> FOUND! {result['status']}")
                        # Keys match what test_single_variation emits.
                        if result.get("is_business"):
                            print(f" 📊 Business account detected, last seen: {result.get('last_activity')}")
                        break  # Stop on first success

                    status_symbol = "" if result["status"] == "not_found" else "⚠️"
                    print(f"{status_symbol} Attempt {attempt}: {variation} -> {result['status']}")

                    # Pause after every tenth probe to stay under rate limits.
                    if attempt % 10 == 0:
                        print(f"⏱️ Pausing for rate limit (attempt {attempt})...")
                        await asyncio.sleep(2)

        self._print_summary()
        return self.results

    def _print_summary(self) -> None:
        """Print totals and the list of hits for everything in ``self.results``."""
        total = len(self.results)
        hits = [r for r in self.results if r["exists"]]
        # Guard the ratio: an empty run must not divide by zero.
        success_rate = (len(hits) / total * 100) if total else 0.0

        print("\n📊 Discovery Complete!")
        print(f" Total attempts: {total}")
        print(f" Successful finds: {len(hits)}")
        print(f" Success rate: {success_rate:.1f}%")

        if hits:
            print("\n🎯 Successful Discoveries:")
            for result in hits:
                print(f" 📱 {result['phone']} -> {result['status']}")
                if result.get("is_business"):
                    print(f" 💼 Business: {result.get('last_activity')}")

    def save_results(self, output_file: "str | None" = None):
        """Write the collected results and run metadata to a JSON file.

        Args:
            output_file: Destination path. When omitted, a timestamped name
                is generated in the current working directory.

        Returns:
            The ``pathlib.Path`` that was written.
        """
        if not output_file:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = f"whatsapp_discovery_{self.country_code}{self.phone_base}_{timestamp}.json"
        output_path = Path(output_file)

        tested = len(self.results)
        output_data = {
            "discovery_metadata": {
                "phone_base": f"{self.country_code}{self.phone_base}XXXX",
                "country_code": self.country_code,
                "discovery_date": datetime.now().isoformat(),
                "method": "exponential_backoff_wa_api_simulation",
                "total_variations_tested": tested,
                # Report the configured budget when a run recorded one;
                # otherwise fall back to the number of probes performed.
                "max_attempts": tested if self._max_attempts is None else self._max_attempts,
            },
            "results": self.results,
        }

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(output_data, f, indent=2)
        print(f"\n💾 Results saved to: {output_path}")
        return output_path
async def main():
    """Run a discovery scan from the command line and save the results.

    Expects exactly two arguments after the script name: the phone base and
    the country code. With any other argument count it prints usage help and
    exits with status 1. Otherwise it probes up to 100 candidates and writes
    the outcome to a timestamped JSON file.
    """
    if len(sys.argv) != 3:
        print("Usage: python whatsapp_discovery_test.py <phone_base> <country_code>")
        print("Example: python whatsapp_discovery_test.py 3162940 31")
        print("\nAvailable phone bases from our enrichment:")
        print(" 3162940 - Bas Schreuder")
        print(" 3161489 - Bjorn de Jong")
        print(" 3165313 - Mylène Da Silva")
        sys.exit(1)

    phone_base, country_code = sys.argv[1:3]
    tester = WhatsAppDiscoveryTester(phone_base, country_code)
    # The return value is not needed here: the tester keeps its own results,
    # which save_results() reads.
    await tester.run_discovery(max_attempts=100)

    output_file = tester.save_results()
    print(f"\n🎯 Discovery complete! Check {output_file} for results.")


if __name__ == "__main__":
    asyncio.run(main())