glam/scripts/load_person_entities.py
2025-12-12 12:51:10 +01:00

252 lines
8 KiB
Python

#!/usr/bin/env python3
"""
Load person entity JSON files into PostgreSQL person_entity table.
Usage:
    python scripts/load_person_entities.py [--host HOST] [--port PORT]
        [--database DB] [--user USER] [--password PASSWORD]
        [--entity-dir DIR] [--dry-run]

Example:
    # Local PostgreSQL
    python scripts/load_person_entities.py

    # Remote server
    python scripts/load_person_entities.py --host 91.98.224.44
"""
import json
import os
import sys
import argparse
import re
from datetime import datetime
from pathlib import Path
from typing import Optional
# Add project root to path so project-local modules are importable when this
# script is run directly as a file.
# NOTE(review): no project-local module is imported anywhere in this script,
# so this may be vestigial — confirm before removing.
sys.path.insert(0, str(Path(__file__).parent.parent))
# psycopg2 is a hard requirement: fail fast with an install hint (exit code 1)
# instead of a raw ImportError traceback. This runs at import time.
try:
    import psycopg2
    # Json adapts a Python dict so it is sent to PostgreSQL as a JSON value.
    from psycopg2.extras import Json
except ImportError:
    print("Error: psycopg2 not installed. Run: pip install psycopg2-binary")
    sys.exit(1)
def extract_linkedin_slug(filename: str) -> str:
    """Extract the LinkedIn slug from an entity filename.

    Entity files are named ``<slug>_<YYYYMMDDTHHMMSSZ>.json``; everything
    before the first timestamp suffix is the slug. A filename without a
    timestamp yields the name minus its ``.json`` extension.

    Examples:
        giovannafossati_20251209T220000Z.json -> giovannafossati
        alexandr-belov-bb547b46_20251210T120000Z.json -> alexandr-belov-bb547b46
    """
    # Strip only a trailing ".json" extension; str.replace would also delete
    # a ".json" occurring inside the name itself.
    base = filename[:-len('.json')] if filename.endswith('.json') else filename
    # re.split always returns at least one element, so [0] is always safe:
    # it is the text before the first "_<timestamp>" (or all of it if none).
    return re.split(r'_\d{8}T\d{6}Z', base)[0]
def parse_extraction_date(data: dict) -> Optional[datetime]:
    """Extract the extraction timestamp from a parsed entity JSON document.

    Sources are tried in this order, and the first one that parses wins:
    ``exa_search_metadata.timestamp``, ``extraction_metadata.extraction_date``,
    ``provenance.extraction_date``. A trailing ``Z`` is rewritten to
    ``+00:00`` so ``datetime.fromisoformat`` accepts it.

    Args:
        data: The top-level JSON object of an entity file.

    Returns:
        The parsed datetime, or ``None`` if no source holds a parseable value.
    """
    candidates = (
        ('exa_search_metadata', 'timestamp'),
        ('extraction_metadata', 'extraction_date'),
        ('provenance', 'extraction_date'),
    )
    for section_name, key in candidates:
        section = data.get(section_name)
        # A section may be missing or explicitly null in the JSON; skip it
        # instead of crashing on .get().
        if not isinstance(section, dict):
            continue
        ts = section.get(key)
        if not ts:
            continue
        try:
            return datetime.fromisoformat(ts.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # Non-string or malformed timestamp: try the next source.
            continue
    return None
def load_entity_files(entity_dir: str) -> list:
    """Load all person-entity JSON files from ``entity_dir``.

    Files are processed in sorted filename order. Filenames have the form
    ``<slug>_<YYYYMMDDTHHMMSSZ>.json``, so this puts several snapshots of the
    same person in chronological order. The newest snapshot is therefore
    upserted last and wins in the database.

    Args:
        entity_dir: Directory containing the ``*.json`` entity files.

    Returns:
        A list of dicts with keys ``linkedin_slug``, ``name``, ``headline``,
        ``location``, ``profile_data`` (the entire parsed JSON document),
        ``extraction_date`` and ``source_file``. Files that cannot be read
        or parsed are reported and skipped. If the directory does not exist,
        an error is printed and an empty list is returned.
    """
    entities = []
    entity_path = Path(entity_dir)
    if not entity_path.exists():
        print(f"Error: Entity directory not found: {entity_dir}")
        return entities
    # Path.glob yields files in filesystem-dependent order; sort so the
    # outcome (which snapshot of a slug is written last) is deterministic.
    json_files = sorted(entity_path.glob("*.json"))
    print(f"Found {len(json_files)} JSON files in {entity_dir}")
    for filepath in json_files:
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if not isinstance(data, dict):
                print(f"Error loading {filepath.name}: top-level JSON value is not an object")
                continue
            # `or {}` also covers an explicit "profile_data": null in the file.
            profile_data = data.get('profile_data') or {}
            entities.append({
                'linkedin_slug': extract_linkedin_slug(filepath.name),
                'name': profile_data.get('full_name') or profile_data.get('name', ''),
                'headline': profile_data.get('headline', ''),
                'location': profile_data.get('location', ''),
                'profile_data': data,  # Store entire JSON
                'extraction_date': parse_extraction_date(data),
                'source_file': filepath.name,
            })
        except json.JSONDecodeError as e:
            print(f"Error parsing {filepath.name}: {e}")
        except Exception as e:
            # Best-effort bulk load: report the bad file and keep going.
            print(f"Error loading {filepath.name}: {e}")
    return entities
def insert_entities(conn, entities: list, dry_run: bool = False):
    """Upsert entity records into the ``person_entity`` table.

    Each entity is inserted. If a row with the same ``linkedin_slug`` already
    exists, that row is updated in place instead (``ON CONFLICT ... DO
    UPDATE``). All successful rows are committed together at the end. A row
    that fails is rolled back on its own and counted as an error; it does not
    undo the rows before it.

    Args:
        conn: An open psycopg2 connection. It is assumed not to be in
            autocommit mode, because the per-row savepoints need an open
            transaction block.
        entities: Dicts as produced by ``load_entity_files``.
        dry_run: When true, only print what would be inserted; no SQL is
            executed, nothing is committed, and every entity counts as
            "inserted".

    Returns:
        A tuple ``(inserted, updated, errors)``.
    """
    inserted = 0
    updated = 0
    errors = 0
    cursor = conn.cursor()
    try:
        for entity in entities:
            if dry_run:
                print(f" [DRY-RUN] Would insert: {entity['linkedin_slug']} ({entity['name']})")
                inserted += 1
                continue
            # Each row runs inside its own savepoint, so a failure undoes only
            # that row. A plain conn.rollback() would discard every row
            # upserted so far in this transaction while they stay counted.
            cursor.execute("SAVEPOINT person_entity_row")
            try:
                # Use UPSERT - insert or update if exists.
                # (xmax = 0) is a common PostgreSQL idiom for telling a freshly
                # inserted row apart from one updated via ON CONFLICT.
                cursor.execute("""
                    INSERT INTO person_entity (
                        linkedin_slug, name, headline, location,
                        profile_data, extraction_date, updated_date
                    ) VALUES (%s, %s, %s, %s, %s, %s, NOW())
                    ON CONFLICT (linkedin_slug)
                    DO UPDATE SET
                        name = EXCLUDED.name,
                        headline = EXCLUDED.headline,
                        location = EXCLUDED.location,
                        profile_data = EXCLUDED.profile_data,
                        extraction_date = EXCLUDED.extraction_date,
                        updated_date = NOW()
                    RETURNING (xmax = 0) AS inserted
                """, (
                    entity['linkedin_slug'],
                    entity['name'][:500] if entity['name'] else None,
                    entity['headline'],
                    entity['location'][:500] if entity['location'] else None,
                    Json(entity['profile_data']),
                    entity['extraction_date']
                ))
                result = cursor.fetchone()
            except Exception as e:
                cursor.execute("ROLLBACK TO SAVEPOINT person_entity_row")
                print(f" Error inserting {entity['linkedin_slug']}: {e}")
                errors += 1
                continue
            cursor.execute("RELEASE SAVEPOINT person_entity_row")
            if result and result[0]:
                inserted += 1
            else:
                updated += 1
        if not dry_run:
            conn.commit()
    finally:
        # Close the cursor even if commit (or a savepoint command) raises.
        cursor.close()
    return inserted, updated, errors
def main():
    """Command-line entry point: parse arguments, load entity files, upsert them.

    Returns:
        Process exit status: 1 if connecting to PostgreSQL fails, otherwise 0.
        Per-entity insert errors are reported in the summary but do not
        change the exit status.
    """
    parser = argparse.ArgumentParser(description='Load person entity files into PostgreSQL')
    parser.add_argument('--host', default='localhost', help='PostgreSQL host (default: localhost)')
    parser.add_argument('--port', default=5432, type=int, help='PostgreSQL port (default: 5432)')
    parser.add_argument('--database', default='glam', help='Database name (default: glam)')
    parser.add_argument('--user', default='postgres', help='Database user (default: postgres)')
    # Falling back to $PGPASSWORD keeps the password off the command line,
    # where it would be visible in the process list and shell history.
    parser.add_argument('--password', default=os.environ.get('PGPASSWORD', ''),
                        help='Database password (default: $PGPASSWORD if set)')
    parser.add_argument('--entity-dir', default='data/custodian/person/entity',
                        help='Entity directory path')
    parser.add_argument('--dry-run', action='store_true', help='Show what would be done without executing')
    args = parser.parse_args()

    # Resolve the entity directory relative to the project root. An absolute
    # --entity-dir is kept as-is, because pathlib's `/` drops the left side.
    project_root = Path(__file__).parent.parent
    entity_dir = project_root / args.entity_dir
    print(f"Loading person entities from: {entity_dir}")
    print(f"Target database: {args.user}@{args.host}:{args.port}/{args.database}")
    if args.dry_run:
        print("*** DRY RUN MODE - No changes will be made ***")

    # Load entity files
    entities = load_entity_files(str(entity_dir))
    if not entities:
        print("No entities to load.")
        return 0
    print(f"\nLoaded {len(entities)} entities from files")

    # Connect to PostgreSQL
    try:
        conn = psycopg2.connect(
            host=args.host,
            port=args.port,
            database=args.database,
            user=args.user,
            password=args.password
        )
    except psycopg2.Error as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return 1
    print("Connected to PostgreSQL")

    # Always release the connection, even if insertion or the count fails.
    try:
        print("\nInserting entities...")
        inserted, updated, errors = insert_entities(conn, entities, args.dry_run)
        print("\nResults:")
        print(f" Inserted: {inserted}")
        print(f" Updated: {updated}")
        print(f" Errors: {errors}")
        # Verify count
        if not args.dry_run:
            cursor = conn.cursor()
            try:
                cursor.execute("SELECT COUNT(*) FROM person_entity")
                total = cursor.fetchone()[0]
            finally:
                cursor.close()
            print(f" Total in database: {total}")
    finally:
        conn.close()
    print("\nDone!")
    return 0


if __name__ == '__main__':
    sys.exit(main())