glam/scripts/extract_with_agents.py
kempersc e5a532a8bc Add comprehensive tests for NLP institution extraction and RDF partnership integration
- Introduced `test_nlp_extractor.py` with unit tests for the InstitutionExtractor, covering various extraction patterns (ISIL, Wikidata, VIAF, city names) and ensuring proper classification of institutions (museum, library, archive).
- Added tests for extracted entities and result handling to validate the extraction process.
- Created `test_partnership_rdf_integration.py` to validate the end-to-end process of extracting partnerships from a conversation and exporting them to RDF format.
- Implemented tests for temporal properties in partnerships and ensured compliance with W3C Organization Ontology patterns.
- Verified that extracted partnerships are correctly linked with PROV-O provenance metadata.
2025-11-19 23:20:47 +01:00

507 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Orchestration script for NLP extraction using OpenCode subagents.
This script coordinates multiple specialized OpenCode agents to extract
structured heritage institution data from conversation JSON files.
Usage:
python scripts/extract_with_agents.py <conversation_json_path>
Example:
python scripts/extract_with_agents.py conversations/Brazilian_GLAM_collection_inventories.json
The script will:
1. Parse the conversation JSON using ConversationParser
2. Extract text content from messages
3. Invoke specialized OpenCode agents:
- @institution-extractor: Extract institution names and types
- @location-extractor: Extract geographic information
- @identifier-extractor: Extract ISIL, Wikidata, VIAF, etc.
- @event-extractor: Extract organizational change events
4. Combine results into HeritageCustodian records
5. Validate with LinkML schema
6. Export to JSON-LD
Note: This script is designed to work WITH OpenCode agents, not independently.
Run this script from within an OpenCode session where you can invoke agents.
"""
import sys
import json
from pathlib import Path
from datetime import datetime, timezone
from typing import List, Dict, Any, Optional
# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))
from glam_extractor.parsers.conversation import ConversationParser
from glam_extractor.models import (
HeritageCustodian,
Provenance,
DataSource,
DataTier,
InstitutionType,
OrganizationStatus,
Location,
Identifier,
ChangeEvent,
ChangeType,
)
class AgentOrchestrator:
    """
    Coordinates multiple OpenCode agents that extract heritage-institution data.

    The class never invokes agents itself — that happens inside an OpenCode
    session. Its job is to:
    - prepare input text / prompts for the agents,
    - offer helpers for processing agent responses,
    - merge the combined results into HeritageCustodian records.
    """

    def __init__(self, conversation_path: Path):
        # Parse the conversation once up front so every prompt builder
        # and helper can reuse the same parsed object.
        self.conversation_path = conversation_path
        self.parser = ConversationParser()
        self.conversation = self.parser.parse_file(conversation_path)
def extract_conversation_text(
    self,
    sender: Optional[str] = None,
    max_length: int = 50000
) -> str:
    """
    Collect message text from the parsed conversation for agent input.

    Args:
        sender: Restrict to 'human' or 'assistant'; None includes all messages.
        max_length: Hard cap on returned characters (agents have context limits).

    Returns:
        The combined message text, truncated with an explicit marker when
        it exceeds the cap.
    """
    full_text = self.conversation.extract_all_text(sender=sender)
    if len(full_text) <= max_length:
        return full_text
    # Over the limit: cut the text and make the truncation visible to the agent.
    return full_text[:max_length] + "\n\n[... text truncated ...]"
def prepare_institution_extraction_prompt(self) -> str:
    """
    Build the prompt for the @institution-extractor agent.

    Returns:
        The formatted prompt (leading/trailing whitespace stripped) containing
        the conversation text and the expected JSON output shape.
    """
    conv_text = self.extract_conversation_text(max_length=40000)
    # Conversation name/uuid are embedded directly in the f-string below.
    prompt = f"""
# Institution Extraction Task
**Conversation**: {self.conversation.name}
**Conversation ID**: {self.conversation.uuid}
**File**: {self.conversation_path.name}
## Instructions
Please extract all heritage institutions (museums, libraries, archives, galleries, etc.)
mentioned in the following conversation text.
Return your results as a JSON array of institution objects following the format
specified in your agent configuration.
## Conversation Text
{conv_text}
## Output Format
Return ONLY the JSON array, no additional commentary:
```json
{{
"institutions": [...]
}}
```
"""
    return prompt.strip()
def prepare_location_extraction_prompt(self, institution_context: str = "") -> str:
    """
    Build the prompt for the @location-extractor agent.

    Args:
        institution_context: Optional extra context about a specific
            institution, inserted verbatim into the prompt.

    Returns:
        The formatted prompt (leading/trailing whitespace stripped).
    """
    conv_text = self.extract_conversation_text(max_length=40000)
    prompt = f"""
# Location Extraction Task
**File**: {self.conversation_path.name}
## Instructions
Please extract all geographic locations (cities, addresses, regions, countries)
mentioned for heritage institutions in the following conversation text.
{institution_context}
Return your results as a JSON array following the format specified in your agent configuration.
## Conversation Text
{conv_text}
## Output Format
Return ONLY the JSON array, no additional commentary:
```json
{{
"locations": [...]
}}
```
"""
    return prompt.strip()
def prepare_identifier_extraction_prompt(self) -> str:
    """
    Build the prompt for the @identifier-extractor agent.

    Returns:
        The formatted prompt (leading/trailing whitespace stripped) listing
        the identifier patterns the agent should look for.
    """
    conv_text = self.extract_conversation_text(max_length=40000)
    prompt = f"""
# Identifier Extraction Task
**File**: {self.conversation_path.name}
## Instructions
Please extract all external identifiers (ISIL codes, Wikidata IDs, VIAF, KvK numbers, URLs, etc.)
mentioned for heritage institutions in the following conversation text.
Look for patterns like:
- ISIL: NL-AsdAM, US-DLC, etc.
- Wikidata: Q123456
- VIAF: 147143282
- KvK: 41231987
- URLs: https://www.example.org
Return your results as a JSON array following the format specified in your agent configuration.
## Conversation Text
{conv_text}
## Output Format
Return ONLY the JSON array, no additional commentary:
```json
{{
"identifiers": [...]
}}
```
"""
    return prompt.strip()
def prepare_event_extraction_prompt(self) -> str:
    """
    Build the prompt for the @event-extractor agent.

    Returns:
        The formatted prompt (leading/trailing whitespace stripped) listing
        the temporal phrasing patterns the agent should look for.
    """
    conv_text = self.extract_conversation_text(max_length=40000)
    prompt = f"""
# Change Event Extraction Task
**File**: {self.conversation_path.name}
## Instructions
Please extract all organizational change events (founding, mergers, relocations, name changes, etc.)
mentioned for heritage institutions in the following conversation text.
Look for temporal patterns like:
- "Founded in 1985"
- "Merged with X in 2001"
- "Relocated to Y in 2010"
- "Renamed from A to B"
Return your results as a JSON array following the format specified in your agent configuration.
## Conversation Text
{conv_text}
## Output Format
Return ONLY the JSON array, no additional commentary:
```json
{{
"change_events": [...]
}}
```
"""
    return prompt.strip()
def create_heritage_custodian_record(
    self,
    institution_data: Dict[str, Any],
    locations_data: List[Dict[str, Any]],
    identifiers_data: List[Dict[str, Any]],
    events_data: List[Dict[str, Any]]
) -> HeritageCustodian:
    """
    Combine agent extraction results into a single HeritageCustodian record.

    Args:
        institution_data: One institution object from @institution-extractor.
        locations_data: Location objects from @location-extractor.
        identifiers_data: Identifier objects from @identifier-extractor.
        events_data: Change-event objects from @event-extractor.

    Returns:
        Validated HeritageCustodian record with NLP-extraction provenance.
    """
    # Hoisted to function scope: was re-imported on every loop iteration.
    from datetime import date

    # Provenance marks this record as tier-4 (inferred) NLP-extracted data.
    provenance = Provenance(
        data_source=DataSource.CONVERSATION_NLP,
        data_tier=DataTier.TIER_4_INFERRED,
        extraction_date=datetime.now(timezone.utc),
        extraction_method="OpenCode multi-agent NLP extraction",
        confidence_score=institution_data.get("confidence_score", 0.8),
        conversation_id=self.conversation.uuid,
        source_url=None,
        verified_date=None,
        verified_by=None
    )

    # Map the agent's type string onto the enum; unknown/missing values
    # (including an explicit None, which would break .upper()) fall back.
    type_str = (institution_data.get("institution_type") or "").upper()
    institution_type = InstitutionType.__members__.get(type_str, InstitutionType.UNKNOWN)

    # Build Location objects; all fields are optional pass-throughs.
    locations = [
        Location(
            location_type=loc_data.get("location_type"),
            city=loc_data.get("city"),
            street_address=loc_data.get("street_address"),
            postal_code=loc_data.get("postal_code"),
            region=loc_data.get("region"),
            country=loc_data.get("country"),
            geonames_id=loc_data.get("geonames_id"),
            latitude=loc_data.get("latitude"),
            longitude=loc_data.get("longitude"),
            is_primary=loc_data.get("is_primary", False)
        )
        for loc_data in locations_data
    ]

    # Build Identifier objects, skipping entries missing the required fields.
    identifiers = [
        Identifier(
            identifier_scheme=id_data.get("identifier_scheme"),
            identifier_value=id_data.get("identifier_value"),
            identifier_url=id_data.get("identifier_uri"),  # agents emit 'identifier_uri'
            assigned_date=None  # not extracted by agents
        )
        for id_data in identifiers_data
        if id_data.get("identifier_scheme") and id_data.get("identifier_value")
    ]

    # Build ChangeEvent objects; events without a parseable date are dropped.
    change_history: List[ChangeEvent] = []
    for event_data in events_data:
        change_type_str = (event_data.get("change_type") or "").upper()
        change_type = ChangeType.__members__.get(change_type_str, ChangeType.OTHER)

        # Agents may return ISO date strings; normalize them to date objects.
        event_date = event_data.get("event_date")
        if isinstance(event_date, str):
            try:
                event_date = date.fromisoformat(event_date)
            except (ValueError, TypeError):
                continue  # unparseable date string -> skip this event
        if event_date is None:
            continue  # an event without a date cannot be recorded

        change_history.append(ChangeEvent(
            event_id=event_data.get("event_id", f"event-{len(change_history)}"),
            change_type=change_type,
            event_date=event_date,
            event_description=event_data.get("event_description"),
            affected_organization=event_data.get("affected_organization"),
            resulting_organization=event_data.get("resulting_organization"),
            related_organizations=event_data.get("related_organizations", []),
            source_documentation=event_data.get("source_documentation")
        ))

    # Assemble the final record. Fields marked "Not extracted" are either
    # filled in later (GHCID generation) or derived elsewhere (events).
    name = institution_data.get("name", "Unknown Institution")
    custodian = HeritageCustodian(
        id=f"heritage-custodian-{institution_data.get('institution_id', 'unknown')}",
        name=name,
        alternative_names=institution_data.get("alternative_names", []),
        institution_type=institution_type,
        organization_status=OrganizationStatus.UNKNOWN,  # Not extracted by agents
        description=institution_data.get("description"),
        parent_organization=None,  # Not extracted by agents
        founded_date=None,  # Extracted separately via events
        closed_date=None,  # Extracted separately via events
        homepage=None,  # Could be in identifiers as WEBSITE
        ghcid_numeric=None,  # Generated later
        ghcid_current=None,  # Generated later
        ghcid_original=None,  # Generated later
        ghcid_history=None,  # Generated later
        contact_info=None,  # Not extracted by agents
        locations=locations,
        identifiers=identifiers,
        change_history=change_history,
        provenance=provenance
    )
    return custodian
def export_to_jsonld(
    self,
    custodians: List[HeritageCustodian],
    output_path: Path
):
    """
    Serialize HeritageCustodian records as a JSON-LD graph document.

    Args:
        custodians: Validated records to export.
        output_path: Destination file for the JSON-LD output.
    """
    graph = []
    for custodian in custodians:
        graph.append(custodian.dict(exclude_none=True))
    document = {
        "@context": "https://w3id.org/heritage/custodian/context.jsonld",
        "@graph": graph,
    }
    with output_path.open('w', encoding='utf-8') as out_file:
        # default=str stringifies values json can't encode natively
        # (e.g. the datetime stored in provenance).
        json.dump(document, out_file, indent=2, ensure_ascii=False, default=str)
    print(f"✅ Exported {len(custodians)} records to {output_path}")
def main():
    """
    CLI entry point for the orchestration script.

    The script only PREPARES prompts and data structures; it never invokes
    agents directly (that happens through OpenCode). Workflow:
    1. Run it to print the prompt for each agent.
    2. Copy/paste each prompt to the matching @agent.
    3. Collect the agents' JSON responses.
    4. Use the orchestrator's helper methods to process the results.
    """
    # Require exactly one positional argument: the conversation JSON path.
    if len(sys.argv) < 2:
        print("Usage: python scripts/extract_with_agents.py <conversation_json_path>")
        print()
        print("Example:")
        print(" python scripts/extract_with_agents.py conversations/Brazilian_GLAM_collection_inventories.json")
        sys.exit(1)

    conversation_path = Path(sys.argv[1])
    if not conversation_path.exists():
        print(f"❌ Error: File not found: {conversation_path}")
        sys.exit(1)

    print(f"🔄 Processing conversation: {conversation_path.name}")
    print()

    orchestrator = AgentOrchestrator(conversation_path)

    print("=" * 80)
    print("AGENT INVOCATION PROMPTS")
    print("=" * 80)
    print()

    # Table-driven step output: one (agent name, prompt builder) pair per step.
    extraction_steps = [
        ("@institution-extractor", orchestrator.prepare_institution_extraction_prompt),
        ("@location-extractor", orchestrator.prepare_location_extraction_prompt),
        ("@identifier-extractor", orchestrator.prepare_identifier_extraction_prompt),
        ("@event-extractor", orchestrator.prepare_event_extraction_prompt),
    ]
    for step_number, (agent_name, build_prompt) in enumerate(extraction_steps, start=1):
        print(f"📋 STEP {step_number}: Invoke {agent_name}")
        print("-" * 80)
        print(build_prompt())
        print()
        print(f"⏸ Copy the above prompt and send to {agent_name}")
        print()
        print("=" * 80)

    print("📊 NEXT STEPS")
    print("=" * 80)
    print()
    print("After receiving responses from all agents:")
    print("1. Collect JSON responses from each agent")
    print("2. Use orchestrator.create_heritage_custodian_record() to combine results")
    print("3. Validate records with LinkML schema")
    print("4. Export to JSON-LD using orchestrator.export_to_jsonld()")
    print()
    print("Or continue working with the orchestrator object in your OpenCode session.")
    print()
# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":
main()