glam/scripts/sync/qdrant_person_sync.py

723 lines
26 KiB
Python

#!/usr/bin/env python3
"""
Qdrant Person Sync Module - Sync person/staff JSON files to Qdrant vector database.
This module syncs all person data (staff lists and entity profiles) to Qdrant
for semantic search of heritage institution personnel.
Usage:
python -m scripts.sync.qdrant_person_sync [--dry-run] [--limit N] [--host HOST] [--port PORT]
Data Sources:
- data/custodian/person/bu/*.json - Staff lists by custodian organization
- data/custodian/person/entity/*.json - Individual person profile files
"""
import argparse
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
import hashlib
# Add project root to path
PROJECT_ROOT = Path(__file__).parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))
from scripts.sync import BaseSyncer, SyncResult, SyncStatus
# Configuration
QDRANT_HOST = os.getenv("QDRANT_HOST", "localhost")
QDRANT_PORT = int(os.getenv("QDRANT_PORT", "6333"))
QDRANT_URL = os.getenv("QDRANT_URL", "")
COLLECTION_NAME = "heritage_persons"
BATCH_SIZE = 100
# Person data directory
PERSON_DATA_DIR = PROJECT_ROOT / "data" / "custodian" / "person"
def generate_person_id(name: str, custodian_slug: str, staff_id: str = "") -> str:
"""Generate a unique ID for a person record."""
if staff_id:
return staff_id
# Create deterministic ID from name + custodian
unique_str = f"{custodian_slug}:{name}".lower()
return hashlib.md5(unique_str.encode()).hexdigest()[:16]
def extract_person_text_from_staff(person: dict, custodian_name: str, custodian_slug: str) -> str:
"""Extract searchable text from a staff record."""
parts = []
name = person.get("name", "")
if name:
parts.append(f"Name: {name}")
if custodian_name:
parts.append(f"Works at: {custodian_name}")
headline = person.get("headline", "")
if headline:
parts.append(f"Role: {headline}")
heritage_type = person.get("heritage_type", "")
if heritage_type:
type_names = {
"A": "Archive",
"M": "Museum",
"L": "Library",
"G": "Gallery",
"R": "Research center",
"E": "Education",
"D": "Digital platform",
"O": "Official institution",
}
type_name = type_names.get(heritage_type, heritage_type)
parts.append(f"Sector: {type_name}")
return "\n".join(parts)
def extract_person_text_from_entity(profile: dict) -> str:
"""Extract searchable text from an entity profile."""
parts = []
profile_data = profile.get("profile_data", {})
extraction_meta = profile.get("extraction_metadata", {})
# Name
name = profile_data.get("name") or profile_data.get("full_name", "")
if name:
parts.append(f"Name: {name}")
# Headline/role
headline = profile_data.get("headline", "")
if headline:
parts.append(f"Role: {headline}")
# Location
location = profile_data.get("location", "")
if location:
parts.append(f"Location: {location}")
# About/summary
about = profile_data.get("about") or profile_data.get("summary", "")
if about:
# Truncate long about sections
about_text = about[:500] if len(about) > 500 else about
parts.append(f"About: {about_text}")
# Experience - extract current/recent positions
experience = profile_data.get("experience", []) or profile_data.get("career_history", [])
if experience:
for exp in experience[:3]: # Top 3 positions
if isinstance(exp, dict):
title = exp.get("title") or exp.get("role", "")
company = exp.get("company") or exp.get("organization", "")
if title and company:
parts.append(f"Experience: {title} at {company}")
elif title:
parts.append(f"Experience: {title}")
# Heritage-relevant experience
heritage_exp = profile_data.get("heritage_relevant_experience", [])
if heritage_exp:
parts.append("Heritage sector experience:")
for exp in heritage_exp[:3]:
if isinstance(exp, dict):
title = exp.get("title") or exp.get("role", "")
company = exp.get("company") or exp.get("organization", "")
if title and company:
parts.append(f" - {title} at {company}")
# Skills
skills = profile_data.get("skills", [])
if skills and isinstance(skills, list):
skill_names = []
for s in skills[:10]:
if isinstance(s, str):
skill_names.append(s)
elif isinstance(s, dict):
skill_names.append(s.get("name", s.get("skill", "")))
if skill_names:
parts.append(f"Skills: {', '.join(skill_names)}")
return "\n".join(parts)
def extract_person_metadata_from_staff(
person: dict,
custodian_name: str,
custodian_slug: str,
source_file: str,
) -> dict:
"""Extract metadata for filtering from a staff record."""
metadata = {
"source_type": "staff_list",
"source_file": source_file,
"custodian_name": custodian_name,
"custodian_slug": custodian_slug,
}
name = person.get("name", "")
if name:
metadata["name"] = name
staff_id = person.get("staff_id", "")
if staff_id:
metadata["staff_id"] = staff_id
headline = person.get("headline", "")
if headline:
metadata["headline"] = headline
heritage_relevant = person.get("heritage_relevant", False)
metadata["heritage_relevant"] = heritage_relevant
heritage_type = person.get("heritage_type", "")
if heritage_type:
metadata["heritage_type"] = heritage_type
degree = person.get("degree", "")
if degree:
metadata["connection_degree"] = degree
return metadata
def extract_person_metadata_from_entity(profile: dict, source_file: str) -> dict:
"""Extract metadata for filtering from an entity profile."""
metadata = {
"source_type": "entity_profile",
"source_file": source_file,
}
profile_data = profile.get("profile_data", {})
extraction_meta = profile.get("extraction_metadata", {})
name = profile_data.get("name") or profile_data.get("full_name", "")
if name:
metadata["name"] = name
linkedin_url = (
profile_data.get("linkedin_url") or
extraction_meta.get("linkedin_url", "")
)
if linkedin_url:
metadata["linkedin_url"] = linkedin_url
staff_id = extraction_meta.get("staff_id", "")
if staff_id:
metadata["staff_id"] = staff_id
headline = profile_data.get("headline", "")
if headline:
metadata["headline"] = headline
location = profile_data.get("location", "")
if location:
metadata["location"] = location
# Extract custodian from experience if available
experience = profile_data.get("experience", []) or profile_data.get("career_history", [])
if experience and isinstance(experience, list) and len(experience) > 0:
current_exp = experience[0]
if isinstance(current_exp, dict):
company = current_exp.get("company") or current_exp.get("organization", "")
if company:
metadata["current_organization"] = company
return metadata
class QdrantPersonSyncer(BaseSyncer):
"""Sync person/staff JSON files to Qdrant vector database."""
database_name = "qdrant_persons"
def __init__(
self,
host: str = QDRANT_HOST,
port: int = QDRANT_PORT,
url: str = QDRANT_URL,
collection: str = COLLECTION_NAME,
batch_size: int = BATCH_SIZE,
person_dir: Path = PERSON_DATA_DIR,
**kwargs
):
super().__init__(**kwargs)
self.host = host
self.port = port
self.url = url
self.collection = collection
self.batch_size = batch_size
self.person_dir = person_dir
self._retriever = None
self._client = None
self._embedding_model = None
self._embedding_dim = None
self._use_openai = None
def _get_client(self):
"""Lazy-load the Qdrant client (lightweight, for connection checks)."""
if self._client is None:
try:
from qdrant_client import QdrantClient
if self.url:
self._client = QdrantClient(url=self.url, timeout=10, check_compatibility=False)
else:
self._client = QdrantClient(
host=self.host,
port=self.port,
timeout=10,
check_compatibility=False,
)
except ImportError as e:
self.logger.error(f"Cannot import qdrant_client: {e}")
raise
return self._client
def _get_embedding_model(self):
"""Get the embedding model (sentence-transformers or OpenAI wrapper)."""
if self._embedding_model is None:
has_openai = bool(os.getenv("OPENAI_API_KEY"))
if not has_openai:
# Use sentence-transformers
from sentence_transformers import SentenceTransformer
self._embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
self._embedding_dim = 384
self._use_openai = False
self.logger.info("Using sentence-transformers for embeddings")
else:
# Use OpenAI
import openai
self._embedding_model = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self._embedding_dim = 1536
self._use_openai = True
self.logger.info("Using OpenAI for embeddings")
return self._embedding_model
def _get_embeddings(self, texts: list[str]) -> list[list[float]]:
"""Get embeddings for a batch of texts."""
model = self._get_embedding_model()
if self._use_openai:
response = model.embeddings.create(
input=texts,
model="text-embedding-3-small"
)
return [item.embedding for item in sorted(response.data, key=lambda x: x.index)]
else:
# Sentence-transformers
embeddings = model.encode(texts, show_progress_bar=False)
return embeddings.tolist()
def _ensure_collection(self, recreate: bool = False):
"""Ensure the collection exists with correct settings."""
from qdrant_client.http.models import Distance, VectorParams
client = self._get_client()
_ = self._get_embedding_model() # Initialize to get embedding_dim
assert self._embedding_dim is not None, "Embedding dimension not set"
collections = client.get_collections().collections
collection_names = [c.name for c in collections]
if recreate and self.collection in collection_names:
self.logger.info(f"Deleting collection: {self.collection}")
client.delete_collection(self.collection)
collection_names.remove(self.collection)
if self.collection not in collection_names:
self.logger.info(f"Creating collection: {self.collection} (dim={self._embedding_dim})")
client.create_collection(
collection_name=self.collection,
vectors_config=VectorParams(
size=self._embedding_dim,
distance=Distance.COSINE
)
)
def _index_documents(self, documents: list[dict]) -> int:
"""Index documents to Qdrant with embeddings."""
from qdrant_client.http.models import PointStruct
import hashlib
import uuid
client = self._get_client()
total_indexed = 0
# Process in batches
for i in range(0, len(documents), self.batch_size):
batch = documents[i:i + self.batch_size]
texts = [d["text"] for d in batch if d.get("text")]
if not texts:
continue
# Get embeddings
embeddings = self._get_embeddings(texts)
# Create points
points = []
for doc, embedding in zip(batch, embeddings):
# Generate UUID v5 from text for deterministic ID
doc_text = doc.get("text", "")
doc_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, doc_text))
point = PointStruct(
id=doc_id,
vector=embedding,
payload=doc.get("metadata", {})
)
points.append(point)
# Upsert
client.upsert(
collection_name=self.collection,
points=points
)
total_indexed += len(points)
if i % 1000 == 0:
self.logger.info(f" Indexed {total_indexed}/{len(documents)} documents...")
return total_indexed
def check_connection(self) -> bool:
"""Check if Qdrant is available."""
try:
client = self._get_client()
collections = client.get_collections()
return True
except Exception as e:
self.logger.error(f"Qdrant connection failed: {e}")
return False
def get_status(self) -> dict:
"""Get Qdrant status for person collection."""
try:
client = self._get_client()
collections = client.get_collections().collections
collection_names = [c.name for c in collections]
if self.collection in collection_names:
info = client.get_collection(self.collection)
return {
"status": info.status.value if info.status else "unknown",
"vectors_count": info.vectors_count or 0,
"points_count": info.points_count or 0,
"collection_exists": True,
}
else:
return {
"status": "ready",
"vectors_count": 0,
"points_count": 0,
"collection_exists": False,
"available_collections": collection_names,
}
except Exception as e:
return {"status": "unavailable", "error": str(e)}
def _list_staff_files(self) -> list[Path]:
"""List all staff JSON files from multiple directories."""
staff_files = []
# Check bu/ directory (legacy location)
bu_dir = self.person_dir / "bu"
if bu_dir.exists():
staff_files.extend(bu_dir.glob("*_staff_*.json"))
# Check affiliated/parsed/ directory (current location)
affiliated_dir = self.person_dir / "affiliated" / "parsed"
if affiliated_dir.exists():
staff_files.extend(affiliated_dir.glob("*_staff_*.json"))
else:
self.logger.warning(f"Staff directory not found: {affiliated_dir}")
if not staff_files:
self.logger.warning("No staff files found in any directory")
return sorted(set(staff_files)) # Deduplicate and sort
def _list_entity_files(self) -> list[Path]:
"""List all entity profile JSON files."""
entity_dir = self.person_dir / "entity"
if not entity_dir.exists():
self.logger.warning(f"Entity directory not found: {entity_dir}")
return []
return sorted(entity_dir.glob("*.json"))
def _process_staff_file(self, filepath: Path) -> list[dict]:
"""Process a staff JSON file and return documents for indexing."""
documents = []
try:
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
self.logger.warning(f"Error reading {filepath.name}: {e}")
return documents
custodian_meta = data.get("custodian_metadata", {})
custodian_name = custodian_meta.get("custodian_name") or custodian_meta.get("name", "")
custodian_slug = custodian_meta.get("custodian_slug", "")
# Derive slug from filename if not in metadata
if not custodian_slug:
# e.g., "nationaal-archief_staff_20251209T2354.json" -> "nationaal-archief"
custodian_slug = filepath.stem.split("_staff_")[0]
staff_list = data.get("staff", [])
for person in staff_list:
if not isinstance(person, dict):
continue
name = person.get("name", "")
if not name or name.lower() == "linkedin member":
continue
text = extract_person_text_from_staff(person, custodian_name, custodian_slug)
if not text or len(text) < 10:
continue
metadata = extract_person_metadata_from_staff(
person, custodian_name, custodian_slug, filepath.name
)
documents.append({
"text": text,
"metadata": metadata,
})
return documents
def _process_entity_file(self, filepath: Path) -> Optional[dict]:
"""Process an entity profile JSON file and return document for indexing."""
try:
with open(filepath, "r", encoding="utf-8") as f:
data = json.load(f)
except Exception as e:
self.logger.warning(f"Error reading {filepath.name}: {e}")
return None
profile_data = data.get("profile_data", {})
name = profile_data.get("name") or profile_data.get("full_name", "")
if not name:
return None
text = extract_person_text_from_entity(data)
if not text or len(text) < 10:
return None
metadata = extract_person_metadata_from_entity(data, filepath.name)
return {
"text": text,
"metadata": metadata,
}
def sync(
self,
limit: Optional[int] = None,
dry_run: bool = False,
recreate: bool = True,
include_staff: bool = True,
include_entities: bool = True,
) -> SyncResult:
"""Sync all person JSON files to Qdrant."""
result = SyncResult(
database="qdrant_persons",
status=SyncStatus.IN_PROGRESS,
start_time=datetime.now(timezone.utc),
)
# Check for embedding capability (OpenAI or sentence-transformers)
has_openai = bool(os.getenv("OPENAI_API_KEY"))
try:
from sentence_transformers import SentenceTransformer
has_st = True
except ImportError:
has_st = False
if not dry_run and not has_openai and not has_st:
result.status = SyncStatus.FAILED
result.error_message = (
"No embedding capability available. Either set OPENAI_API_KEY or "
"install sentence-transformers: pip install sentence-transformers"
)
result.end_time = datetime.now(timezone.utc)
return result
if not has_openai and has_st:
self.logger.info("Using sentence-transformers for embeddings (384 dimensions)")
elif has_openai:
self.logger.info("Using OpenAI for embeddings (1536 dimensions)")
documents = []
# Process staff files
if include_staff:
staff_files = self._list_staff_files()
self.logger.info(f"Found {len(staff_files)} staff files")
for filepath in staff_files:
docs = self._process_staff_file(filepath)
documents.extend(docs)
result.records_processed += 1
if limit and len(documents) >= limit:
break
# Process entity files
if include_entities:
entity_files = self._list_entity_files()
self.logger.info(f"Found {len(entity_files)} entity files")
for filepath in entity_files:
if limit and len(documents) >= limit:
break
doc = self._process_entity_file(filepath)
if doc:
documents.append(doc)
result.records_processed += 1
# Apply limit
if limit:
documents = documents[:limit]
result.records_succeeded = len(documents)
result.details["documents_prepared"] = len(documents)
result.details["staff_count"] = sum(
1 for d in documents if d["metadata"].get("source_type") == "staff_list"
)
result.details["entity_count"] = sum(
1 for d in documents if d["metadata"].get("source_type") == "entity_profile"
)
self.logger.info(
f"Prepared {len(documents)} documents "
f"(staff: {result.details['staff_count']}, entities: {result.details['entity_count']})"
)
if dry_run:
self.logger.info(f"[DRY RUN] Would index {len(documents)} documents to Qdrant")
result.status = SyncStatus.SUCCESS
result.details["dry_run"] = True
result.end_time = datetime.now(timezone.utc)
return result
# Check connection
if not self.check_connection():
result.status = SyncStatus.FAILED
result.error_message = f"Cannot connect to Qdrant at {self.url or f'{self.host}:{self.port}'}"
result.end_time = datetime.now(timezone.utc)
return result
# Ensure collection exists (recreate if requested)
try:
self._ensure_collection(recreate=recreate)
except Exception as e:
result.status = SyncStatus.FAILED
result.error_message = f"Failed to create collection: {e}"
result.end_time = datetime.now(timezone.utc)
return result
# Index documents
self.logger.info(f"Indexing {len(documents)} documents...")
try:
indexed = self._index_documents(documents)
result.details["indexed_count"] = indexed
# Get final count
client = self._get_client()
info = client.get_collection(self.collection)
result.details["final_vectors_count"] = info.points_count or 0
result.status = SyncStatus.SUCCESS
except Exception as e:
self.logger.error(f"Indexing failed: {e}")
result.status = SyncStatus.FAILED
result.error_message = str(e)
result.end_time = datetime.now(timezone.utc)
return result
def main():
parser = argparse.ArgumentParser(description="Sync person/staff JSON files to Qdrant")
parser.add_argument("--dry-run", action="store_true", help="Parse files but don't index")
parser.add_argument("--limit", type=int, help="Limit number of documents to process")
parser.add_argument("--host", default=QDRANT_HOST, help="Qdrant server hostname")
parser.add_argument("--port", type=int, default=QDRANT_PORT, help="Qdrant REST API port")
parser.add_argument("--url", default=QDRANT_URL, help="Full Qdrant URL (overrides host/port)")
parser.add_argument("--collection", default=COLLECTION_NAME, help="Qdrant collection name")
parser.add_argument("--batch-size", type=int, default=BATCH_SIZE, help="Documents per batch")
parser.add_argument("--no-recreate", action="store_true", help="Don't recreate collection")
parser.add_argument("--staff-only", action="store_true", help="Only index staff lists")
parser.add_argument("--entities-only", action="store_true", help="Only index entity profiles")
args = parser.parse_args()
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
syncer = QdrantPersonSyncer(
host=args.host,
port=args.port,
url=args.url,
collection=args.collection,
batch_size=args.batch_size,
)
print("=" * 60)
print("Qdrant Person Sync")
print("=" * 60)
if not args.dry_run:
location = args.url or f"{args.host}:{args.port}"
print(f"Checking connection to {location}...")
try:
status = syncer.get_status()
print(f" Status: {status.get('status', 'unknown')}")
if status.get('vectors_count'):
print(f" Vectors: {status['vectors_count']:,}")
except Exception as e:
print(f" Connection check failed: {e}")
include_staff = not args.entities_only
include_entities = not args.staff_only
result = syncer.sync(
limit=args.limit,
dry_run=args.dry_run,
recreate=not args.no_recreate,
include_staff=include_staff,
include_entities=include_entities,
)
print("\n" + "=" * 60)
print(f"Sync Result: {result.status.value.upper()}")
print(f" Processed: {result.records_processed}")
print(f" Documents: {result.details.get('documents_prepared', 0)}")
print(f" - Staff records: {result.details.get('staff_count', 0)}")
print(f" - Entity profiles: {result.details.get('entity_count', 0)}")
print(f" Duration: {result.duration_seconds:.2f}s")
if result.error_message:
print(f" Error: {result.error_message}")
print("=" * 60)
if __name__ == "__main__":
main()