#!/usr/bin/env python3
"""
Verify website URLs extracted from LinkedIn About pages.

Checks:
- HTTP status codes
- Redirects (follows and records final URL)
- SSL/TLS errors
- Connection timeouts
- DNS failures

Usage:
    python scripts/verify_website_links.py [--input FILE] [--output-dir DIR] [--timeout SECONDS]
"""

import argparse
import asyncio
import json
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

import httpx

# Default per-request timeout in seconds (overridable via --timeout)
DEFAULT_TIMEOUT = 15
MAX_CONCURRENT = 20  # Limit concurrent requests


async def check_url(
    client: httpx.AsyncClient,
    url: str,
    custodian_name: str,
    timeout: float = DEFAULT_TIMEOUT
) -> dict[str, Any]:
    """Check if a URL is accessible and return verification result.

    Args:
        client: Shared httpx async client (headers/SSL config set by caller).
        url: URL to probe.
        custodian_name: Owner of the URL; passed through into the result
            for reporting.
        timeout: Per-request timeout in seconds.

    Returns:
        Dict with original/final URL, status code, liveness and redirect
        flags, an error category (or None), response time in ms, and a UTC
        timestamp. Never raises: every failure is recorded in 'error'.
    """
    result = {
        'custodian_name': custodian_name,
        'original_url': url,
        'final_url': None,
        'status_code': None,
        'is_alive': False,
        'is_redirect': False,
        'error': None,
        'response_time_ms': None,
        'checked_at': datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'),
    }

    # get_running_loop() is the supported way to reach the loop from inside
    # a coroutine (get_event_loop() is deprecated in this context).
    loop = asyncio.get_running_loop()
    start_time = loop.time()

    try:
        # Use GET with follow_redirects to get final URL
        response = await client.get(url, follow_redirects=True, timeout=timeout)

        result['response_time_ms'] = int((loop.time() - start_time) * 1000)
        result['status_code'] = response.status_code
        result['final_url'] = str(response.url)
        # response.history lists the intermediate redirect responses. This is
        # more reliable than comparing URL strings: httpx normalizes URLs
        # (e.g. adds a trailing slash) even when no redirect occurred, which
        # previously produced false is_redirect positives.
        result['is_redirect'] = len(response.history) > 0

        # Consider 2xx and 3xx as alive
        result['is_alive'] = 200 <= response.status_code < 400

    except httpx.TimeoutException:
        result['error'] = 'timeout'
    except httpx.ConnectError as e:
        # Classify common connection failures by message text; httpx does not
        # expose a structured error code for these.
        if 'Name or service not known' in str(e) or 'getaddrinfo' in str(e):
            result['error'] = 'dns_failure'
        elif 'Connection refused' in str(e):
            result['error'] = 'connection_refused'
        else:
            result['error'] = f'connection_error: {str(e)[:100]}'
    except httpx.HTTPStatusError as e:
        # Defensive: only raised when raise_for_status() is used, but kept so
        # a future change to the request path cannot abort the whole batch.
        result['status_code'] = e.response.status_code
        result['error'] = f'http_error: {e.response.status_code}'
    except Exception as e:
        # Catch-all so one bad URL never kills the run; truncate long
        # messages to keep the output file readable.
        error_type = type(e).__name__
        result['error'] = f'{error_type}: {str(e)[:100]}'

    return result


async def verify_urls(
    urls: list[dict[str, str]],
    timeout: float = DEFAULT_TIMEOUT,
    max_concurrent: int = MAX_CONCURRENT
) -> list[dict[str, Any]]:
    """Verify a list of URLs concurrently.

    Args:
        urls: Items with 'url' and 'custodian_name' keys.
        timeout: Per-request timeout in seconds, forwarded to check_url.
        max_concurrent: Upper bound on simultaneous in-flight requests.

    Returns:
        One result dict per input item (same order), as built by check_url.
    """
    # Bound concurrency so we don't open hundreds of sockets at once.
    semaphore = asyncio.Semaphore(max_concurrent)

    async with httpx.AsyncClient(
        headers={
            # Browser-like headers: some sites reject obvious bot clients.
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
        },
        verify=False,  # Skip SSL verification for checking dead links
    ) as client:

        async def check_with_semaphore(item: dict[str, str]) -> dict[str, Any]:
            # Acquire a slot before issuing the request.
            async with semaphore:
                return await check_url(
                    client,
                    item['url'],
                    item['custodian_name'],
                    timeout
                )

        # gather preserves input order, so results line up with urls.
        tasks = [check_with_semaphore(item) for item in urls]
        results = await asyncio.gather(*tasks)

    return results


def main():
    """CLI entry point: load About-page data, verify URLs, write reports.

    Returns:
        0 on success. Exits with status 1 (via sys.exit) when no input
        file can be found.
    """
    parser = argparse.ArgumentParser(
        description='Verify website URLs extracted from LinkedIn About pages'
    )
    parser.add_argument(
        '--input', '-i',
        type=Path,
        help='Input JSON file with About page data (defaults to most recent)'
    )
    parser.add_argument(
        '--output-dir', '-o',
        type=Path,
        default=Path('data/custodian/person/affiliated/verified_links'),
        help='Directory to save verification results'
    )
    parser.add_argument(
        '--timeout', '-t',
        type=float,
        default=DEFAULT_TIMEOUT,
        help=f'Request timeout in seconds (default: {DEFAULT_TIMEOUT})'
    )
    parser.add_argument(
        '--limit', '-l',
        type=int,
        default=None,
        help='Limit number of URLs to verify'
    )

    args = parser.parse_args()

    # Find input file: explicit --input wins; otherwise the newest
    # about_data_*.json (timestamped names sort lexicographically).
    about_data_dir = Path('data/custodian/person/affiliated/about_data')
    if args.input:
        input_file = args.input
    else:
        about_files = sorted(about_data_dir.glob('about_data_*.json'), reverse=True)
        if not about_files:
            print("Error: No about_data files found", file=sys.stderr)
            sys.exit(1)
        input_file = about_files[0]

    print(f"Loading data from: {input_file}")

    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Extract URLs to verify; .get() tolerates records that lack the
    # website_url key entirely (previously a KeyError on malformed rows).
    urls_to_verify = []
    for item in data['data']:
        if item.get('website_url'):
            urls_to_verify.append({
                'custodian_name': item['custodian_name'],
                'url': item['website_url'],
            })

    print(f"Found {len(urls_to_verify)} URLs to verify")

    if args.limit:
        urls_to_verify = urls_to_verify[:args.limit]
        print(f"Limited to {len(urls_to_verify)} URLs")

    # Verify URLs
    print(f"Verifying URLs with {args.timeout}s timeout...")
    results = asyncio.run(verify_urls(urls_to_verify, args.timeout))

    # Analyze results
    alive = [r for r in results if r['is_alive']]
    dead = [r for r in results if not r['is_alive']]
    redirects = [r for r in results if r['is_redirect']]

    # Categorize errors by their prefix (the part before the first ':').
    error_types = {}
    for r in dead:
        error = r['error'] or 'unknown'
        error_type = error.split(':')[0] if ':' in error else error
        error_types[error_type] = error_types.get(error_type, 0) + 1

    # Guard the percentage math: an empty result set previously raised
    # ZeroDivisionError.
    total = len(results) or 1
    print(f"\n{'='*60}")
    print("VERIFICATION RESULTS")
    print(f" Total checked: {len(results)}")
    print(f" Alive: {len(alive)} ({100*len(alive)/total:.1f}%)")
    print(f" Dead: {len(dead)} ({100*len(dead)/total:.1f}%)")
    print(f" Redirected: {len(redirects)}")
    print("\nDead link errors:")
    for error_type, count in sorted(error_types.items(), key=lambda x: -x[1]):
        print(f" {error_type}: {count}")

    # Save results
    args.output_dir.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')

    # Save full results
    full_output = args.output_dir / f'verification_results_{timestamp}.json'
    with open(full_output, 'w', encoding='utf-8') as f:
        json.dump({
            'timestamp': timestamp,
            'source_file': str(input_file),
            'total_checked': len(results),
            'alive_count': len(alive),
            'dead_count': len(dead),
            'redirect_count': len(redirects),
            'error_summary': error_types,
            'results': results,
        }, f, indent=2, ensure_ascii=False)

    # Save dead links separately for follow-up
    dead_output = args.output_dir / f'dead_links_{timestamp}.json'
    with open(dead_output, 'w', encoding='utf-8') as f:
        json.dump({
            'timestamp': timestamp,
            'count': len(dead),
            'links': dead,
        }, f, indent=2, ensure_ascii=False)

    print("\nOutput files:")
    print(f" Full results: {full_output}")
    print(f" Dead links: {dead_output}")

    # Print some dead links for reference
    if dead:
        print("\nSample dead links (need website discovery):")
        for item in dead[:10]:
            print(f" - {item['custodian_name']}: {item['original_url']} ({item['error']})")

    return 0


# Script entry point: propagate main()'s return code as the process exit status.
if __name__ == '__main__':
    sys.exit(main())