#!/usr/bin/env python3
"""
Authenticate Unipile with LinkedIn credentials.

This script connects Unipile to LinkedIn using username/password.

Usage:
    Run with no arguments to authenticate, or pass ``--check`` as the first
    argument to report on a previously saved authentication record.
"""

import json
import os
import sys
from datetime import datetime, timezone

import httpx
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Authentication record written on success and read back by --check.
# The path is relative to the current working directory.
AUTH_FILE = '.unipile_auth.json'


def _save_auth_info(data, dsn):
    """Write the account details from a successful response to AUTH_FILE."""
    auth_info = {
        # Genuine UTC with a "Z" suffix. Stamping naive local time with "Z"
        # would mislabel the instant for anyone outside UTC.
        "authenticated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
        "account_id": data.get('id'),
        "provider": data.get('provider'),
        "status": data.get('status'),
        "unipile_dsn": dsn,
    }
    with open(AUTH_FILE, 'w', encoding='utf-8') as f:
        json.dump(auth_info, f, indent=2)


def _parse_auth_timestamp(raw):
    """Return *raw* as a timezone-aware datetime, or None if it is unusable."""
    if not isinstance(raw, str):
        return None
    try:
        # fromisoformat() only accepts a literal "Z" suffix on newer Pythons,
        # so normalise it to an explicit offset first.
        parsed = datetime.fromisoformat(raw.replace('Z', '+00:00'))
    except ValueError:
        return None
    if parsed.tzinfo is None:
        # A record without any offset is assumed to be UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def authenticate_unipile() -> bool:
    """Authenticate Unipile with LinkedIn credentials.

    Reads ``UNIPILE_API_KEY``, ``LINKEDIN_USER`` and ``LINKEDIN_PASSWORD``
    from the environment (plus the optional ``UNIPILE_DSN``, which defaults
    to ``api1.unipile.com:13111``), POSTs them to
    ``https://<dsn>/api/v1/accounts`` and, on HTTP 200, saves the returned
    account details to ``.unipile_auth.json``.

    Returns:
        True when the request succeeded and the record was saved; False when
        a required variable is missing, the server answered with any other
        status, the request timed out, or any other exception occurred.
    """
    # Get credentials
    api_key = os.getenv('UNIPILE_API_KEY')
    linkedin_user = os.getenv('LINKEDIN_USER')
    linkedin_pass = os.getenv('LINKEDIN_PASSWORD')
    dsn = os.getenv('UNIPILE_DSN', 'api1.unipile.com:13111')

    if not all([api_key, linkedin_user, linkedin_pass]):
        print("❌ Missing required environment variables:")
        print(f" UNIPILE_API_KEY: {'✓' if api_key else '✗'}")
        print(f" LINKEDIN_USER: {'✓' if linkedin_user else '✗'}")
        print(f" LINKEDIN_PASSWORD: {'✓' if linkedin_pass else '✗'}")
        return False

    print("\n" + "="*80)
    print("AUTHENTICATING UNIPILE WITH LINKEDIN")
    print("="*80)
    # Show only a short prefix: console output tends to end up in logs and
    # screenshots, and the key is a secret.
    print(f"API Key: {api_key[:4]}...")
    print(f"LinkedIn User: {linkedin_user}")
    print(f"DSN: {dsn}")

    # Prepare authentication request
    auth_url = f"https://{dsn}/api/v1/accounts"
    headers = {
        "X-API-KEY": api_key,
        "accept": "application/json",
        "content-type": "application/json",
    }
    auth_data = {
        "provider": "LINKEDIN",
        "username": linkedin_user,
        "password": linkedin_pass,
        "user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
    }

    try:
        with httpx.Client(timeout=60.0) as client:
            print(f"\n📡 Sending authentication request to: {auth_url}")
            response = client.post(auth_url, headers=headers, json=auth_data)

        print(f"Status Code: {response.status_code}")

        # NOTE(review): only HTTP 200 is treated as success here. Confirm
        # against the Unipile API reference whether account creation can also
        # answer 201/202 (e.g. created / checkpoint required).
        if response.status_code == 200:
            data = response.json()
            print("\n✅ Authentication successful!")
            print(f"Account ID: {data.get('id')}")
            print(f"Provider: {data.get('provider')}")
            print(f"Status: {data.get('status')}")

            # Save authentication info
            _save_auth_info(data, dsn)

            print(f"\n💾 Authentication info saved to: {AUTH_FILE}")
            print("\n🚀 You can now run LinkedIn enrichment!")
            print(" python scripts/enrich_linkedin_ultimate.py")
            return True

        if response.status_code == 401:
            print("\n❌ Authentication failed!")
            print("Please check your LinkedIn credentials.")
            try:
                error_data = response.json()
            except ValueError:
                # The error body is not JSON; the messages above still apply.
                error_data = None
            if isinstance(error_data, dict) and 'message' in error_data:
                print(f"Error: {error_data['message']}")
            return False

        if response.status_code == 403:
            print("\n❌ Access forbidden!")
            print("Account may be locked or credentials invalid.")
            return False

        print(f"\n❌ Unexpected status code: {response.status_code}")
        print("Response:")
        print(response.text)
        return False

    except httpx.TimeoutException:
        print("\n❌ Request timed out!")
        print("Please try again.")
        return False
    except Exception as e:
        # Script-level boundary: report anything else (connection errors,
        # invalid JSON on success, failure to write the file) and fail.
        print(f"\n❌ Exception occurred: {e}")
        return False


def check_authentication_status() -> bool:
    """Check if already authenticated.

    Reads ``.unipile_auth.json`` from the current working directory, prints
    its contents and reports whether the recorded authentication is less
    than 24 hours old.

    Returns:
        True when a readable authentication record exists (regardless of its
        age); False when the file is missing or cannot be parsed.
    """
    try:
        with open(AUTH_FILE, 'r', encoding='utf-8') as f:
            auth = json.load(f)
    except FileNotFoundError:
        print("\n❌ No authentication found.")
        print("Please run authentication first.")
        return False
    except (OSError, ValueError) as e:
        # Unreadable or corrupt record (json.JSONDecodeError is a ValueError).
        print(f"\n❌ Could not read {AUTH_FILE}: {e}")
        print("Please run authentication first.")
        return False

    if not isinstance(auth, dict):
        print(f"\n❌ Could not read {AUTH_FILE}: unexpected content")
        print("Please run authentication first.")
        return False

    print("\n" + "="*80)
    print("CURRENT AUTHENTICATION STATUS")
    print("="*80)
    print("Authenticated: ✅")
    print(f"Account ID: {auth.get('account_id')}")
    print(f"Provider: {auth.get('provider')}")
    print(f"Authenticated at: {auth.get('authenticated_at')}")
    print(f"DSN: {auth.get('unipile_dsn')}")

    # Check if recent (within 24 hours)
    auth_time = _parse_auth_timestamp(auth.get('authenticated_at'))
    if auth_time is None:
        print("\n⚠️ Could not determine when authentication happened.")
        print("You may need to re-authenticate.")
        return True

    # Both operands must be timezone-aware: subtracting an aware datetime
    # from a naive one raises TypeError.
    # NOTE: records written by earlier versions of this script stored local
    # time labelled "Z", so their reported age may be off by the local UTC
    # offset.
    now = datetime.now(timezone.utc)
    hours_diff = (now - auth_time).total_seconds() / 3600

    if hours_diff < 24:
        print("\n✅ Authentication is recent (within 24 hours)")
        print("You can proceed with LinkedIn enrichment.")
    else:
        print(f"\n⚠️ Authentication is old ({hours_diff:.1f} hours ago)")
        print("You may need to re-authenticate.")

    return True


def main() -> int:
    """Main function.

    Runs the status check when the first command-line argument is
    ``--check``; otherwise performs authentication.

    Returns:
        Process exit code: 0 when the selected action returned True, 1
        otherwise.
    """
    if len(sys.argv) > 1 and sys.argv[1] == '--check':
        succeeded = check_authentication_status()
    else:
        succeeded = authenticate_unipile()
    return 0 if succeeded else 1


if __name__ == "__main__":
    # Propagate failure to the shell so callers and CI can detect it.
    sys.exit(main())