#!/usr/bin/env python3 """ Index Heritage Persons in Qdrant This script reads person entity JSON files and indexes them in Qdrant for semantic search and RAG-enhanced queries about heritage sector professionals. Usage: python scripts/index_persons_qdrant.py [--data-dir DATA_DIR] [--host HOST] [--port PORT] Examples: # Index all persons from default data directory python scripts/index_persons_qdrant.py # Index from specific directory python scripts/index_persons_qdrant.py --data-dir data/custodian/person/entity/ # Connect to remote Qdrant python scripts/index_persons_qdrant.py --host 91.98.224.44 --port 6333 """ import argparse import json import logging import os import sys from pathlib import Path from typing import Any # Add project root to path PROJECT_ROOT = Path(__file__).parent.parent sys.path.insert(0, str(PROJECT_ROOT / "src")) logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S" ) logger = logging.getLogger(__name__) def load_json_file(filepath: Path) -> dict[str, Any] | None: """Load a JSON file and return its contents.""" try: with open(filepath, "r", encoding="utf-8") as f: return json.load(f) except Exception as e: logger.warning(f"Failed to load {filepath}: {e}") return None def extract_person_text(data: dict[str, Any]) -> str: """Extract searchable text from person entity data.""" parts = [] profile = data.get("profile_data", {}) person = data.get("person", {}) source_staff = data.get("source_staff_info", {}) extraction = data.get("extraction_metadata", {}) # Full name - check ALL possible locations in order of priority name = ( profile.get("full_name") or profile.get("name") or person.get("full_name") or person.get("name") or source_staff.get("name") or source_staff.get("person_name") or extraction.get("person_name") or data.get("name") or "" ) if name: parts.append(f"Name: {name}") # Headline / current position headline = profile.get("headline", "") if headline: parts.append(f"Role: {headline}") # Location location = profile.get("location", "") if location: parts.append(f"Location: {location}") # About / summary about = profile.get("about", "") if about: parts.append(about[:500]) # Truncate long about sections # Current position details current_pos = profile.get("current_position", {}) if current_pos and isinstance(current_pos, dict): company = current_pos.get("company", "") title = current_pos.get("title", "") if company: parts.append(f"Currently at: {company}") if title and title != headline: parts.append(f"Current title: {title}") # Career history - focus on heritage-relevant positions heritage_exp = profile.get("heritage_relevant_experience", []) if heritage_exp: for exp in heritage_exp[:5]: # Limit to 5 most relevant if isinstance(exp, dict): title = exp.get("title", "") company = exp.get("company", "") note = exp.get("relevance_note", "") if title and company: parts.append(f"Experience: {title} at {company}") if note: parts.append(note) # If no heritage-specific experience, use general career history if not heritage_exp: career = profile.get("career_history", []) for job in career[:5]: if isinstance(job, dict): title = job.get("title", "") company = job.get("company", "") if title and company: parts.append(f"Experience: {title} at {company}") # Custodian affiliations affiliations = data.get("custodian_affiliations", []) if affiliations: aff_names = [] for aff in affiliations[:5]: if isinstance(aff, dict): name = aff.get("custodian_name", "") role = aff.get("role", "") current = aff.get("current", False) if name: status = "(current)" if current else "(past)" aff_names.append(f"{name} {status}") if aff_names: parts.append(f"Affiliations: {', '.join(aff_names)}") # Education education = profile.get("education") or [] for edu in education[:3]: if isinstance(edu, dict): degree = edu.get("degree", "") institution = edu.get("institution", "") field = edu.get("field", "") if institution: edu_str = institution if degree: edu_str = f"{degree} from {institution}" if field: edu_str += f" ({field})" parts.append(f"Education: {edu_str}") # Skills skills = profile.get("skills", []) if skills: parts.append(f"Skills: {', '.join(skills[:15])}") # Languages languages = profile.get("languages", []) if languages: lang_strs = [] for lang in languages: if isinstance(lang, str): lang_strs.append(lang) elif isinstance(lang, dict): lang_strs.append(lang.get("name", lang.get("language", str(lang)))) if lang_strs: parts.append(f"Languages: {', '.join(lang_strs)}") # Network analysis notes network = data.get("network_analysis", {}) if network: notes = network.get("notes", "") if notes: parts.append(notes[:300]) # Heritage sector assessment assessment = data.get("heritage_sector_assessment", {}) if assessment: classification = assessment.get("sector_classification", "") leadership = assessment.get("leadership_level", "") if classification: parts.append(f"Sector: {classification}") if leadership: parts.append(f"Leadership level: {leadership}") return "\n".join(parts) def calculate_richness_score(data: dict[str, Any]) -> float: """Calculate a metadata richness score (0.0 - 1.0) for a person profile. This score is used to boost search results for profiles with more complete data. Profiles with rich metadata (about section, career history, skills, education) should rank higher than sparse profiles with only a name and headline. Scoring components (max 1.0): - Has full name: 0.05 - Has headline: 0.05 - Has location: 0.05 - Has about/summary (>100 chars): 0.15 - Has about/summary (>300 chars): 0.05 bonus - Has career history (1-3 jobs): 0.10 - Has career history (4+ jobs): 0.10 bonus - Has skills (1-5): 0.10 - Has skills (6+): 0.05 bonus - Has education (1+): 0.10 - Has languages: 0.05 - Has heritage-relevant experience: 0.10 - Has LinkedIn URL: 0.05 """ score = 0.0 profile = data.get("profile_data", {}) # Basic info (0.15 max) if profile.get("full_name"): score += 0.05 if profile.get("headline"): score += 0.05 if profile.get("location"): score += 0.05 # About section (0.20 max) - most important for context about = profile.get("about", "") or "" if len(about) > 100: score += 0.15 if len(about) > 300: score += 0.05 # Career history (0.20 max) career = profile.get("career_history", []) or [] if len(career) >= 1: score += 0.10 if len(career) >= 4: score += 0.10 # Skills (0.15 max) skills = profile.get("skills", []) or [] if len(skills) >= 1: score += 0.10 if len(skills) >= 6: score += 0.05 # Education (0.10 max) education = profile.get("education", []) or [] if len(education) >= 1: score += 0.10 # Languages (0.05 max) languages = profile.get("languages", []) or [] if len(languages) >= 1: score += 0.05 # Heritage-relevant experience (0.10 max) - important for domain relevance heritage_exp = profile.get("heritage_relevant_experience", []) or [] if len(heritage_exp) >= 1: score += 0.10 # LinkedIn URL (0.05 max) - indicates verifiable profile if data.get("linkedin_profile_url") or data.get("extraction_metadata", {}).get("linkedin_url"): score += 0.05 return min(score, 1.0) # Cap at 1.0 def extract_metadata(data: dict[str, Any], filepath: Path) -> dict[str, Any]: """Extract metadata for filtering from person data.""" metadata: dict[str, Any] = { "filename": filepath.name, "type": "person", } profile = data.get("profile_data", {}) person = data.get("person", {}) source_staff = data.get("source_staff_info", {}) extraction = data.get("extraction_metadata", {}) # Full name - check ALL possible field names (same as extract_person_text) name = ( profile.get("full_name") or profile.get("name") or person.get("full_name") or person.get("name") or source_staff.get("name") or source_staff.get("person_name") or extraction.get("person_name") or data.get("name") or "" ) if name: metadata["name"] = name # LinkedIn URL - check multiple locations linkedin_url = ( data.get("linkedin_profile_url", "") or profile.get("linkedin_url", "") or extraction.get("linkedin_url", "") ) if linkedin_url: metadata["linkedin_url"] = linkedin_url # Extract slug from URL if "/in/" in linkedin_url: slug = linkedin_url.split("/in/")[-1].rstrip("/") metadata["linkedin_slug"] = slug # Current position/headline headline = profile.get("headline", "") if headline: metadata["headline"] = headline # Location location = profile.get("location", "") if location: metadata["location"] = location # Try to extract city/country if ", " in location: parts = [p.strip() for p in location.split(",")] metadata["city"] = parts[0] if len(parts) >= 3: metadata["country"] = parts[-1] # Current company - from current_position or experience current_pos = profile.get("current_position", {}) if current_pos and isinstance(current_pos, dict): company = current_pos.get("company", "") if company: metadata["current_company"] = company # If no current_position, try to find current job in experience if "current_company" not in metadata: experience = profile.get("experience", []) for exp in experience: if isinstance(exp, dict) and exp.get("current"): company = exp.get("company", "") if company: metadata["current_company"] = company break # Heritage relevance - check heritage_relevance section heritage_rel = data.get("heritage_relevance", {}) if heritage_rel: is_relevant = heritage_rel.get("is_heritage_relevant", False) metadata["heritage_relevant"] = is_relevant # Primary heritage type primary_type = heritage_rel.get("primary_heritage_type", "") if primary_type: metadata["heritage_type"] = primary_type # Heritage institution types worked at (from experience) heritage_exp = profile.get("heritage_relevant_experience", []) heritage_types = set() for exp in heritage_exp: if isinstance(exp, dict): h_type = exp.get("heritage_type", "") if h_type: heritage_types.add(h_type) if heritage_types: metadata["heritage_types"] = list(heritage_types) # If heritage_relevant not set but has heritage experience, mark as relevant if "heritage_relevant" not in metadata: metadata["heritage_relevant"] = True # Auto-detect heritage relevance from headline/company if not explicitly set if "heritage_relevant" not in metadata: heritage_keywords = [ "museum", "archief", "archive", "bibliotheek", "library", "erfgoed", "heritage", "collectie", "collection", "curator", "archivist", "conservator", "nationaal archief", "rijksmuseum", "rijksarchief", "digitaal erfgoed", "digital heritage", "cultureel erfgoed", "cultural heritage" ] text_to_check = f"{headline} {metadata.get('current_company', '')}".lower() if any(kw in text_to_check for kw in heritage_keywords): metadata["heritage_relevant"] = True # Default heritage_relevant to False if still not set if "heritage_relevant" not in metadata: metadata["heritage_relevant"] = False # Current affiliations and custodian information for filtering # Try both "affiliations" (correct key) and "custodian_affiliations" (legacy key) affiliations = data.get("affiliations", []) or data.get("custodian_affiliations", []) current_affiliations = [] custodian_slugs = [] custodian_names = [] for aff in affiliations: if isinstance(aff, dict): custodian_name = aff.get("custodian_name", "") custodian_slug = aff.get("custodian_slug", "") is_current = aff.get("current", False) if custodian_name: custodian_names.append(custodian_name) if is_current: current_affiliations.append(custodian_name) if custodian_slug: custodian_slugs.append(custodian_slug) # Also check source_staff_info for custodian data source_staff = data.get("source_staff_info", {}) if source_staff: staff_custodian = source_staff.get("custodian", "") staff_slug = source_staff.get("custodian_slug", "") if staff_custodian and staff_custodian not in custodian_names: custodian_names.append(staff_custodian) if staff_slug and staff_slug not in custodian_slugs: custodian_slugs.append(staff_slug) # Store all custodian information for filtering if current_affiliations: metadata["current_affiliations"] = current_affiliations if custodian_names: metadata["custodian_names"] = custodian_names # Also store primary custodian (first one, usually current) metadata["custodian_name"] = custodian_names[0] if custodian_slugs: metadata["custodian_slugs"] = custodian_slugs # Also store primary custodian slug for simple filtering metadata["custodian_slug"] = custodian_slugs[0] # Generate custodian_slug from custodian_name if not available # This allows text-based filtering when slug is missing if "custodian_slug" not in metadata and custodian_names: # Create a simple slug from the name (lowercase, hyphenated) import re name = custodian_names[0] slug = re.sub(r'[^a-z0-9\s-]', '', name.lower()) slug = re.sub(r'[\s_]+', '-', slug) slug = re.sub(r'-+', '-', slug).strip('-') if slug: metadata["custodian_slug"] = slug # Extraction metadata if extraction: extraction_date = extraction.get("extraction_date", "") if extraction_date: metadata["extraction_date"] = extraction_date # Calculate richness score for search ranking metadata["richness_score"] = calculate_richness_score(data) return metadata def find_person_files(data_dir: Path) -> list[Path]: """Find all person JSON files in the data directory.""" files = [] # Look for JSON files patterns = [ "*.json", ] for pattern in patterns: files.extend(data_dir.glob(pattern)) # Filter out non-person files excluded_patterns = [ "_schema", "_config", "_template", "test_", "example_", ".DS_Store", "_connections_", # Connection files, not person profiles "_staff_", # Staff list aggregates, not individual profiles ] filtered = [] for f in files: if not any(excl in f.name for excl in excluded_patterns): filtered.append(f) return sorted(filtered) class PersonRetriever: """Qdrant retriever specifically for person entities. Uses MiniLM (384-dim) embeddings by default for consistency with the hybrid_retriever.py query-time embedding model. """ def __init__( self, host: str = "localhost", port: int = 6333, collection_name: str = "heritage_persons", embedding_model: str = "all-MiniLM-L6-v2", # MiniLM for local embeddings embedding_dim: int = 384, # MiniLM output dimension url: str | None = None, https: bool = False, prefix: str | None = None, ): from qdrant_client import QdrantClient from qdrant_client.http.models import Distance, VectorParams self.collection_name = collection_name self.embedding_model = embedding_model self.embedding_dim = embedding_dim # MiniLM model runs locally, no API key needed # Initialize Qdrant client if url: self.client = QdrantClient(url=url, prefer_grpc=False, timeout=60) elif https or port == 443: self.client = QdrantClient( host=host, port=port, https=True, prefix=prefix, prefer_grpc=False, timeout=60 ) else: self.client = QdrantClient(host=host, port=port, timeout=60) self._sentence_model = None @property def sentence_model(self): """Lazy-load SentenceTransformer model.""" if self._sentence_model is None: from sentence_transformers import SentenceTransformer logger.info(f"Loading embedding model: {self.embedding_model}") self._sentence_model = SentenceTransformer(self.embedding_model) return self._sentence_model def _get_embeddings_batch(self, texts: list[str]) -> list[list[float]]: """Get embedding vectors for multiple texts using MiniLM.""" if not texts: return [] embeddings = self.sentence_model.encode(texts, show_progress_bar=False) return embeddings.tolist() def ensure_collection(self) -> None: """Ensure the collection exists, create if not.""" from qdrant_client.http.models import Distance, VectorParams collections = self.client.get_collections().collections collection_names = [c.name for c in collections] if self.collection_name not in collection_names: logger.info(f"Creating collection: {self.collection_name}") self.client.create_collection( collection_name=self.collection_name, vectors_config=VectorParams( size=self.embedding_dim, distance=Distance.COSINE ) ) def delete_collection(self) -> None: """Delete the collection if it exists.""" try: self.client.delete_collection(self.collection_name) logger.info(f"Deleted collection: {self.collection_name}") except Exception as e: logger.warning(f"Could not delete collection: {e}") def add_documents( self, documents: list[dict[str, Any]], batch_size: int = 50 ) -> int: """Add documents to the collection.""" import hashlib from qdrant_client.http import models self.ensure_collection() valid_docs = [d for d in documents if d.get("text")] total_indexed = 0 for i in range(0, len(valid_docs), batch_size): batch = valid_docs[i:i + batch_size] texts = [d["text"] for d in batch] logger.info(f"Embedding batch {i//batch_size + 1}/{(len(valid_docs) + batch_size - 1)//batch_size} ({len(batch)} docs)") embeddings = self._get_embeddings_batch(texts) points = [] for j, (doc, embedding) in enumerate(zip(batch, embeddings)): # Generate deterministic ID from text doc_id = hashlib.md5(doc["text"].encode()).hexdigest() points.append(models.PointStruct( id=doc_id, vector=embedding, payload={ "text": doc["text"], **doc.get("metadata", {}) } )) self.client.upsert( collection_name=self.collection_name, points=points ) total_indexed += len(points) logger.info(f"Indexed {total_indexed}/{len(valid_docs)} documents") return total_indexed def get_collection_info(self) -> dict[str, Any]: """Get collection information.""" try: info = self.client.get_collection(self.collection_name) return { "status": info.status, "vectors_count": getattr(info, "vectors_count", None) or getattr(info, "points_count", 0), "points_count": getattr(info, "points_count", 0), } except Exception as e: return {"error": str(e)} def main(): parser = argparse.ArgumentParser( description="Index heritage persons in Qdrant for semantic search" ) parser.add_argument( "--data-dir", type=Path, default=PROJECT_ROOT / "data" / "person", help="Directory containing person JSON files (default: data/person/ with 330K+ PPID profiles)" ) parser.add_argument( "--host", default=os.getenv("QDRANT_HOST", "localhost"), help="Qdrant server hostname" ) parser.add_argument( "--port", type=int, default=int(os.getenv("QDRANT_PORT", "6333")), help="Qdrant REST API port" ) parser.add_argument( "--url", default=os.getenv("QDRANT_URL", ""), help="Full Qdrant URL. Overrides host/port." ) parser.add_argument( "--collection", default="heritage_persons", help="Qdrant collection name" ) parser.add_argument( "--batch-size", type=int, default=50, help="Number of documents to index per batch" ) parser.add_argument( "--recreate", action="store_true", help="Delete and recreate the collection" ) parser.add_argument( "--dry-run", action="store_true", help="Parse files but don't index" ) parser.add_argument( "--https", action="store_true", help="Use HTTPS for connection" ) parser.add_argument( "--prefix", default=None, help="URL path prefix (e.g., 'qdrant' for /qdrant/*)" ) args = parser.parse_args() # Check data directory exists if not args.data_dir.exists(): logger.error(f"Data directory not found: {args.data_dir}") sys.exit(1) # Find person files logger.info(f"Scanning for person files in {args.data_dir}") files = find_person_files(args.data_dir) logger.info(f"Found {len(files)} person files") if not files: logger.warning("No person files found") sys.exit(0) # Prepare documents documents = [] for filepath in files: data = load_json_file(filepath) if not data: continue text = extract_person_text(data) if not text or len(text) < 20: logger.debug(f"Skipping {filepath.name}: insufficient text") continue metadata = extract_metadata(data, filepath) documents.append({ "text": text, "metadata": metadata, }) logger.info(f"Prepared {len(documents)} documents for indexing") if args.dry_run: logger.info("Dry run - not indexing") for doc in documents[:5]: logger.info(f" - {doc['metadata'].get('name', 'Unknown')}: {len(doc['text'])} chars") logger.info(f" Metadata: {list(doc['metadata'].keys())}") sys.exit(0) # Note: MiniLM model runs locally, no API key needed # Create retriever if args.url: logger.info(f"Connecting to Qdrant at {args.url}") retriever = PersonRetriever(url=args.url, collection_name=args.collection) elif args.https or args.prefix: prefix_str = f"/{args.prefix}" if args.prefix else "" logger.info(f"Connecting to Qdrant at https://{args.host}:{args.port}{prefix_str}") retriever = PersonRetriever( host=args.host, port=args.port, collection_name=args.collection, https=args.https, prefix=args.prefix, ) else: logger.info(f"Connecting to Qdrant at {args.host}:{args.port}") retriever = PersonRetriever( host=args.host, port=args.port, collection_name=args.collection, ) # Optionally recreate collection if args.recreate: logger.warning(f"Deleting collection: {args.collection}") retriever.delete_collection() # Index documents logger.info(f"Indexing {len(documents)} documents...") indexed = retriever.add_documents(documents, batch_size=args.batch_size) # Report results info = retriever.get_collection_info() logger.info("Indexing complete!") logger.info(f" Documents indexed: {indexed}") logger.info(f" Collection status: {info.get('status', 'unknown')}") logger.info(f" Total vectors: {info.get('vectors_count', 0)}") if __name__ == "__main__": main()