288 lines
No EOL
10 KiB
Python
288 lines
No EOL
10 KiB
Python
#!/usr/bin/env python3
"""
Contact Discovery Analysis Dashboard - Simple Implementation

Educational dashboard for analyzing contact discovery results.
This demonstrates key concepts from WhatsApp vulnerability research
with privacy-first design and ethical safeguards.
"""
|
|
|
|
import sqlite3
|
|
import json
|
|
from datetime import datetime, timedelta
|
|
import hashlib
|
|
import os
|
|
from typing import Dict, List, Optional
|
|
|
|
class SimpleDashboard:
    """Console dashboard over a SQLite store of contact-discovery results.

    Uses only the standard library. The database holds two tables,
    ``discoveries`` and ``audit_log``; both are created on construction
    when they are missing.

    Attributes:
        db_path: Path of the SQLite database file passed to
            ``sqlite3.connect``.
    """

    def __init__(self, db_path: str = "discovery_results.db"):
        """Store *db_path* and make sure the schema exists at that path."""
        self.db_path = db_path
        self._ensure_database_exists()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _ensure_database_exists(self):
        """Create the database file and its two tables if they are missing.

        ``CREATE TABLE IF NOT EXISTS`` is used instead of a file-existence
        check alone, so a pre-existing file that lacks the tables (for
        example an empty file) still ends up with a usable schema.  The
        "Created database" message is printed only when the file itself
        did not exist beforehand.
        """
        is_new_file = not os.path.exists(self.db_path)

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS discoveries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    phone_number TEXT UNIQUE,
                    country_code TEXT,
                    is_active BOOLEAN,
                    discovery_timestamp TEXT,
                    source_method TEXT,
                    metadata_json TEXT,
                    compliance_verified BOOLEAN DEFAULT FALSE
                )
            """)

            cursor.execute("""
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    timestamp TEXT,
                    action TEXT,
                    details TEXT,
                    compliance_check BOOLEAN
                )
            """)

            conn.commit()
        finally:
            # Close even if a statement fails so the file handle is released.
            conn.close()

        if is_new_file:
            print(f"Created database: {self.db_path}")

    @staticmethod
    def _load_json_or_none(raw):
        """Return ``json.loads(raw)``, or ``None`` if *raw* is empty or invalid.

        Only decoding problems are swallowed (``ValueError`` covers
        ``json.JSONDecodeError``; ``TypeError`` covers non-text values), so
        ``KeyboardInterrupt`` and similar are no longer caught.
        """
        if not raw:
            return None
        try:
            return json.loads(raw)
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _label(value) -> str:
        """Return a printable label, mapping SQL NULL (``None``) to 'unknown'."""
        return "unknown" if value is None else str(value)

    @staticmethod
    def _prompt(message: str) -> Optional[str]:
        """Read one line from stdin; return ``None`` when stdin hits EOF."""
        try:
            return input(message)
        except EOFError:
            return None

    def _print_geographic_distribution(self, stats: Dict, header: str):
        """Print *header*, the per-country table (if any), then a blank line."""
        print(header)
        distribution = stats['country_distribution']
        if distribution:
            print(" Country Code | Count | Percentage")
            print(" ------------|-------|----------")
            total = sum(distribution.values())
            ranked = sorted(distribution.items(), key=lambda item: item[1], reverse=True)
            for country, count in ranked:
                percentage = (count / total) * 100 if total > 0 else 0
                print(f" {self._label(country):<12} | {count:>6} | {percentage:>7.1f}%")
        print()

    def _print_recent_activity(self, stats: Dict, header: str):
        """Print *header*, the per-date table (if any), then a blank line."""
        print(header)
        activity = stats['recent_activity']
        if activity:
            print(" Date | Count")
            print(" ----------|-------")
            # Newest date first; a NULL date (None) cannot be compared with
            # strings, so it is keyed to sort after every real date.
            ordered = sorted(
                activity.items(),
                key=lambda item: (item[0] is not None, item[0] or ""),
                reverse=True,
            )
            for date, count in ordered:
                print(f" {self._label(date):<10} | {count:>6}")
        print()

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_statistics(self) -> Dict:
        """Compute summary statistics from the ``discoveries`` table.

        Returns:
            A dict with the keys:

            * ``total_processed`` - number of rows in ``discoveries``.
            * ``active_found`` - number of rows with ``is_active = 1``.
            * ``success_rate`` - ``active_found / total_processed``, or 0
              when the table is empty.
            * ``country_distribution`` - ``{country_code: row count}``.
            * ``recent_activity`` - ``{date: row count}`` for the 7 most
              recent distinct dates present in the data (not necessarily
              the last 7 calendar days).

            A NULL ``country_code`` or unparseable timestamp shows up as a
            ``None`` key in the respective mapping.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            cursor.execute("SELECT COUNT(*) FROM discoveries")
            total_processed = cursor.fetchone()[0]

            cursor.execute("SELECT COUNT(*) FROM discoveries WHERE is_active = 1")
            active_found = cursor.fetchone()[0]

            cursor.execute("""
                SELECT country_code, COUNT(*)
                FROM discoveries
                GROUP BY country_code
                ORDER BY COUNT(*) DESC
            """)
            country_distribution = dict(cursor.fetchall())

            cursor.execute("""
                SELECT DATE(discovery_timestamp) as date, COUNT(*)
                FROM discoveries
                GROUP BY DATE(discovery_timestamp)
                ORDER BY date DESC
                LIMIT 7
            """)
            recent_activity = dict(cursor.fetchall())
        finally:
            conn.close()

        return {
            'total_processed': total_processed,
            'active_found': active_found,
            'success_rate': active_found / total_processed if total_processed > 0 else 0,
            'country_distribution': country_distribution,
            'recent_activity': recent_activity,
        }

    def print_statistics_report(self):
        """Print the full formatted report to stdout.

        The report contains the summary, geographic distribution, recent
        activity and a compliance section.  The compliance lines are fixed
        text; they are not derived from the database.
        """
        stats = self.get_statistics()

        print("\n" + "="*60)
        print("CONTACT DISCOVERY ANALYSIS REPORT")
        print("="*60)
        print()

        # Summary
        print("📊 SUMMARY")
        print(f" Total Processed: {stats['total_processed']:,}")
        print(f" Active Found: {stats['active_found']:,}")
        print(f" Success Rate: {stats['success_rate']*100:.1f}%")
        print()

        self._print_geographic_distribution(stats, "🌍 GEOGRAPHIC DISTRIBUTION")
        self._print_recent_activity(stats, "📅 RECENT ACTIVITY (Last 7 Days)")

        # Compliance status (static text)
        print("🛡️ COMPLIANCE STATUS")
        print(" ✅ Rate Limiting: Enabled")
        print(" ✅ Data Minimization: Enabled")
        print(" ✅ Consent Required: Enabled")
        print(" ✅ Audit Logging: Enabled")
        print()

        print("="*60)
        print("Generated at:", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        print("="*60)

    def export_to_json(self, filename: Optional[str] = None):
        """Write statistics, all discoveries and recent audit entries to JSON.

        Args:
            filename: Destination path.  When ``None``, a name of the form
                ``contact_discovery_export_<YYYYmmdd_HHMMSS>.json`` is
                generated in the current working directory.

        Returns:
            The path the export was written to.

        Notes:
            ``metadata_json`` and audit ``details`` are decoded as JSON;
            values that are empty or not valid JSON are exported as
            ``null``.  Only the 100 most recent audit entries are included.
        """
        if filename is None:
            filename = f"contact_discovery_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()

            # Get all discoveries
            cursor.execute("""
                SELECT phone_number, country_code, is_active, discovery_timestamp,
                       source_method, metadata_json
                FROM discoveries
                ORDER BY discovery_timestamp DESC
            """)
            discoveries = [
                {
                    'phone_number': phone_number,
                    'country_code': country_code,
                    'is_active': bool(is_active),
                    'discovery_timestamp': discovery_timestamp,
                    'source_method': source_method,
                    'metadata': self._load_json_or_none(metadata_json),
                }
                for (phone_number, country_code, is_active,
                     discovery_timestamp, source_method, metadata_json)
                in cursor.fetchall()
            ]

            # Get audit log
            cursor.execute("""
                SELECT timestamp, action, details, compliance_check
                FROM audit_log
                ORDER BY timestamp DESC
                LIMIT 100
            """)
            audit_log = [
                {
                    'timestamp': timestamp,
                    'action': action,
                    'details': self._load_json_or_none(details),
                    'compliance_check': bool(compliance_check),
                }
                for timestamp, action, details, compliance_check in cursor.fetchall()
            ]
        finally:
            conn.close()

        export_data = {
            'export_timestamp': datetime.now().isoformat(),
            'statistics': self.get_statistics(),
            'discoveries': discoveries,
            'audit_log': audit_log,
            'compliance_note': 'Educational demonstration with simulated data only',
        }

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(export_data, f, indent=2, default=str)

        print(f"Export saved to: {filename}")
        return filename

    def interactive_menu(self):
        """Run the text menu loop until the user picks Exit or stdin closes."""
        while True:
            print("\n" + "="*50)
            print("CONTACT DISCOVERY DASHBOARD")
            print("="*50)
            print("1. 📊 View Statistics Report")
            print("2. 🌍 Geographic Distribution")
            print("3. 📅 Recent Activity")
            print("4. 📤 Export Data to JSON")
            print("5. ❌ Exit")
            print("="*50)

            answer = self._prompt("\nSelect option (1-5): ")
            # EOF (closed/piped stdin) is treated like choosing Exit so the
            # loop cannot crash or spin without input.
            choice = '5' if answer is None else answer.strip()

            if choice == '1':
                self.print_statistics_report()
            elif choice == '2':
                self._print_geographic_distribution(
                    self.get_statistics(), "\n🌍 GEOGRAPHIC DISTRIBUTION")
            elif choice == '3':
                self._print_recent_activity(
                    self.get_statistics(), "\n📅 RECENT ACTIVITY (Last 7 Days)")
            elif choice == '4':
                self.export_to_json()
                if self._prompt("\nPress Enter to continue...") is None:
                    print("Exiting dashboard...")
                    break
            elif choice == '5':
                print("Exiting dashboard...")
                break
            else:
                print("Invalid option. Please select 1-5.")
|
|
|
|
def main():
    """Entry point: print the banner, show the report, then run the menu.

    A ``SimpleDashboard`` is built with its default database path; the
    closing notes are printed once the interactive menu returns.
    """
    rule = "=" * 60

    banner = (
        "\n" + rule,
        "CONTACT DISCOVERY DASHBOARD - EDUCATIONAL",
        rule,
        "",
        "Dashboard initialized for educational demonstration",
        "All data is simulated for learning purposes",
        "Privacy safeguards and compliance checks enabled",
        "",
    )
    for text in banner:
        print(text)

    # Build the dashboard, print the initial report, then hand control to
    # the interactive loop.
    dashboard = SimpleDashboard()
    dashboard.print_statistics_report()
    dashboard.interactive_menu()

    farewell = (
        "\nThank you for using the educational dashboard!",
        "This tool demonstrates concepts with simulated data only.",
        "Always respect privacy and legal requirements in production systems.",
    )
    for text in farewell:
        print(text)
|
|
|
|
if __name__ == "__main__":
    # Run the dashboard only when executed as a script, not when imported.
    main()