#!/usr/bin/env python3
|
|
"""
|
|
Load person entity JSON files into PostgreSQL person_entity table.
|
|
|
|
Usage:
|
|
python scripts/load_person_entities.py [--host HOST] [--dry-run]
|
|
|
|
Example:
|
|
# Local PostgreSQL
|
|
python scripts/load_person_entities.py
|
|
|
|
# Remote server
|
|
python scripts/load_person_entities.py --host 91.98.224.44
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
import argparse
|
|
import re
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
# Add project root to path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
try:
|
|
import psycopg2
|
|
from psycopg2.extras import Json
|
|
except ImportError:
|
|
print("Error: psycopg2 not installed. Run: pip install psycopg2-binary")
|
|
sys.exit(1)
|
|
|
|
|
|
def extract_linkedin_slug(filename: str) -> str:
    """Extract the LinkedIn slug from an entity filename.

    Filenames are expected to look like ``<slug>_<YYYYMMDD>T<HHMMSS>Z.json``.

    Examples:
        giovannafossati_20251209T220000Z.json -> giovannafossati
        alexandr-belov-bb547b46_20251210T120000Z.json -> alexandr-belov-bb547b46
    """
    # Strip only a trailing '.json' — str.replace() would also delete a
    # '.json' occurring in the middle of the slug.
    base = filename[:-len('.json')] if filename.endswith('.json') else filename
    # Everything before the first timestamp marker is the slug.
    # re.split always returns at least one element, so parts[0] is safe.
    parts = re.split(r'_\d{8}T\d{6}Z', base)
    return parts[0]
|
|
|
|
|
|
def parse_extraction_date(data: dict) -> Optional[datetime]:
    """Extract the extraction timestamp from the JSON data.

    Checks, in order: ``exa_search_metadata.timestamp``,
    ``extraction_metadata.extraction_date``, ``provenance.extraction_date``.
    Returns the first value that parses as an ISO-8601 datetime (a trailing
    'Z' is interpreted as UTC), or None when no source yields a valid date.
    """
    # (section key, timestamp field) pairs, in priority order.
    candidates = (
        ('exa_search_metadata', 'timestamp'),
        ('extraction_metadata', 'extraction_date'),
        ('provenance', 'extraction_date'),
    )
    for section, field in candidates:
        ts = data.get(section, {}).get(field)
        if not ts:
            continue
        try:
            # fromisoformat() (pre-3.11) rejects the 'Z' suffix, so map it
            # to an explicit UTC offset first.
            return datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except (ValueError, TypeError, AttributeError):
            # Malformed or non-string timestamp: fall through to next source.
            continue
    return None
|
|
|
|
|
|
def load_entity_files(entity_dir: str) -> list:
    """Load every JSON entity file found in *entity_dir*.

    Returns a list of dicts holding the extracted summary fields plus the
    full parsed document. Files that fail to parse or load are reported to
    stdout and skipped.
    """
    entity_path = Path(entity_dir)
    entities = []

    if not entity_path.exists():
        print(f"Error: Entity directory not found: {entity_dir}")
        return entities

    json_files = list(entity_path.glob("*.json"))
    print(f"Found {len(json_files)} JSON files in {entity_dir}")

    for filepath in json_files:
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)

            profile = data.get('profile_data', {})
            record = {
                'linkedin_slug': extract_linkedin_slug(filepath.name),
                # Prefer 'full_name'; fall back to 'name', then empty string.
                'name': profile.get('full_name') or profile.get('name', ''),
                'headline': profile.get('headline', ''),
                'location': profile.get('location', ''),
                'profile_data': data,  # keep the entire JSON document
                'extraction_date': parse_extraction_date(data),
                'source_file': filepath.name,
            }
            entities.append(record)
        except json.JSONDecodeError as e:
            print(f"Error parsing {filepath.name}: {e}")
        except Exception as e:
            print(f"Error loading {filepath.name}: {e}")

    return entities
|
|
|
|
|
|
def insert_entities(conn, entities: list, dry_run: bool = False):
    """Upsert *entities* into the person_entity table.

    Uses INSERT ... ON CONFLICT keyed on linkedin_slug so re-runs update
    existing rows in place. A failed row is rolled back, counted, and the
    run continues. In dry-run mode nothing is written; each row is printed
    and counted as an insert. Returns a tuple (inserted, updated, errors).
    """
    inserted = 0
    updated = 0
    errors = 0

    cursor = conn.cursor()

    upsert_sql = """
        INSERT INTO person_entity (
            linkedin_slug, name, headline, location,
            profile_data, extraction_date, updated_date
        ) VALUES (%s, %s, %s, %s, %s, %s, NOW())
        ON CONFLICT (linkedin_slug)
        DO UPDATE SET
            name = EXCLUDED.name,
            headline = EXCLUDED.headline,
            location = EXCLUDED.location,
            profile_data = EXCLUDED.profile_data,
            extraction_date = EXCLUDED.extraction_date,
            updated_date = NOW()
        RETURNING (xmax = 0) AS inserted
    """

    for entity in entities:
        if dry_run:
            print(f" [DRY-RUN] Would insert: {entity['linkedin_slug']} ({entity['name']})")
            inserted += 1
            continue
        try:
            params = (
                entity['linkedin_slug'],
                # Truncate long values; pass None instead of empty strings.
                entity['name'][:500] if entity['name'] else None,
                entity['headline'],
                entity['location'][:500] if entity['location'] else None,
                Json(entity['profile_data']),
                entity['extraction_date'],
            )
            cursor.execute(upsert_sql, params)
            row = cursor.fetchone()
            # RETURNING (xmax = 0) is true only for a freshly inserted row.
            if row and row[0]:
                inserted += 1
            else:
                updated += 1
        except Exception as e:
            print(f" Error inserting {entity['linkedin_slug']}: {e}")
            errors += 1
            conn.rollback()
            continue

    if not dry_run:
        conn.commit()

    cursor.close()
    return inserted, updated, errors
|
|
|
|
|
|
def main():
    """Parse CLI arguments, load entity JSON files, and upsert them into PostgreSQL."""
    parser = argparse.ArgumentParser(description='Load person entity files into PostgreSQL')
    parser.add_argument('--host', default='localhost', help='PostgreSQL host (default: localhost)')
    parser.add_argument('--port', default=5432, type=int, help='PostgreSQL port (default: 5432)')
    parser.add_argument('--database', default='glam', help='Database name (default: glam)')
    parser.add_argument('--user', default='postgres', help='Database user (default: postgres)')
    parser.add_argument('--password', default='', help='Database password')
    parser.add_argument('--entity-dir', default='data/custodian/person/entity',
                        help='Entity directory path')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without executing')
    args = parser.parse_args()

    # Resolve the entity directory relative to the project root
    # (the parent of the scripts/ directory this file lives in).
    project_root = Path(__file__).parent.parent
    entity_dir = project_root / args.entity_dir

    print(f"Loading person entities from: {entity_dir}")
    print(f"Target database: {args.user}@{args.host}:{args.port}/{args.database}")

    if args.dry_run:
        print("*** DRY RUN MODE - No changes will be made ***")

    # Load entity files; bail out early when there is nothing to do.
    entities = load_entity_files(str(entity_dir))
    if not entities:
        print("No entities to load.")
        return

    print(f"\nLoaded {len(entities)} entities from files")

    # Connect to PostgreSQL; a failed connection is reported, not raised.
    try:
        conn = psycopg2.connect(
            host=args.host,
            port=args.port,
            database=args.database,
            user=args.user,
            password=args.password
        )
        print("Connected to PostgreSQL")
    except Exception as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return

    # Ensure the connection is closed even if an insert/verify step raises.
    try:
        print("\nInserting entities...")
        inserted, updated, errors = insert_entities(conn, entities, args.dry_run)

        print(f"\nResults:")
        print(f" Inserted: {inserted}")
        print(f" Updated: {updated}")
        print(f" Errors: {errors}")

        # Verify the resulting row count (skipped in dry-run mode).
        if not args.dry_run:
            cursor = conn.cursor()
            cursor.execute("SELECT COUNT(*) FROM person_entity")
            total = cursor.fetchone()[0]
            cursor.close()
            print(f" Total in database: {total}")
    finally:
        conn.close()

    print("\nDone!")
|
|
|
|
|
|
# Script entry point: run only when executed directly, not on import.
if __name__ == '__main__':
    main()
|