696 lines
24 KiB
Python
696 lines
24 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Extract Australian Heritage Custodian Organizations from Trove API
|
|
===================================================================
|
|
|
|
This script extracts all contributor organizations from the Trove API (Australian National
|
|
Library's aggregation service) and converts them to LinkML-compliant HeritageCustodian records.
|
|
|
|
Trove contributors are organizations that contribute collections data to the Australian
|
|
National Bibliographic Database (ANBD) and Trove. Each contributor has a unique NUC
|
|
(National Union Catalogue) symbol, which is Australia's implementation of the ISIL standard.
|
|
|
|
Features:
|
|
- Extracts all Trove contributors via API
|
|
- Retrieves full metadata (name, NUC code, contact details, URLs)
|
|
- Maps to LinkML HeritageCustodian schema (v0.2.1)
|
|
- Generates GHCID persistent identifiers
|
|
- Exports to YAML, JSON, and CSV formats
|
|
- Tracks provenance metadata
|
|
|
|
Data Quality:
|
|
- Tier: TIER_1_AUTHORITATIVE (official Trove registry)
|
|
- Source: National Library of Australia Trove API
|
|
- Coverage: Only organizations that contribute to Trove (subset of full ISIL registry)
|
|
|
|
Usage:
|
|
python scripts/extract_trove_contributors.py --api-key YOUR_TROVE_API_KEY
|
|
|
|
Requirements:
|
|
- Trove API key (free registration at https://trove.nla.gov.au/about/create-something/using-api)
|
|
- Python packages: requests, pyyaml, pydantic
|
|
|
|
Author: GLAM Data Extraction Project
|
|
License: CC0 1.0 Universal
|
|
Version: 1.0.0
|
|
"""
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import logging
|
|
import sys
|
|
import time
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
import requests
|
|
import yaml
|
|
|
|
# Configure logging
# Module-wide logging: timestamped INFO-level records in the form
# "<time> - <logger name> - <level> - <message>". basicConfig writes to
# stderr by default; all functions in this file log through `logger`.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# TROVE API CLIENT
|
|
# =============================================================================
|
|
|
|
class TroveAPIClient:
    """Thin client for the Trove API v3 contributor endpoints.

    Wraps a persistent ``requests.Session`` so connection pooling and the
    custom User-Agent apply to every call.
    """

    BASE_URL = "https://api.trove.nla.gov.au/v3/"

    def __init__(self, api_key: str):
        """Initialize Trove API client.

        Args:
            api_key: Trove API key (obtain from https://trove.nla.gov.au/)
        """
        self.api_key = api_key
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'GLAM-Heritage-Custodian-Extractor/1.0 (Research Project)'
        })

    def get_all_contributors(self, encoding: str = "json") -> List[Dict[str, Any]]:
        """Retrieve the full list of Trove contributors (brief records).

        Args:
            encoding: Response format ('json' or 'xml')

        Returns:
            List of contributor dictionaries; empty list on any API failure.
        """
        logger.info("Fetching all Trove contributors...")

        request_params = {
            'key': self.api_key,
            'encoding': encoding,
            'reclevel': 'brief'  # Start with brief records
        }

        try:
            response = self.session.get(
                urljoin(self.BASE_URL, "contributor"),
                params=request_params,
                timeout=30,
            )
            response.raise_for_status()
            payload = response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"API request failed: {e}")
            return []

        # The API wraps the list under a 'contributor' key.
        if 'contributor' not in payload:
            logger.warning("No 'contributor' key in API response")
            return []

        contributors = payload['contributor']
        logger.info(f"Found {len(contributors)} contributors")
        return contributors

    def get_contributor_details(self, nuc_id: str, encoding: str = "json") -> Optional[Dict[str, Any]]:
        """Retrieve the full record for one contributor.

        Args:
            nuc_id: NUC (National Union Catalogue) identifier
            encoding: Response format ('json' or 'xml')

        Returns:
            Contributor details dictionary, or None on failure / missing data.
        """
        request_params = {
            'key': self.api_key,
            'encoding': encoding,
            'reclevel': 'full'  # Get complete metadata
        }

        try:
            response = self.session.get(
                urljoin(self.BASE_URL, f"contributor/{nuc_id}"),
                params=request_params,
                timeout=30,
            )
            response.raise_for_status()
            payload = response.json()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to fetch details for NUC {nuc_id}: {e}")
            return None

        if 'contributor' not in payload:
            logger.warning(f"No data returned for NUC {nuc_id}")
            return None

        # Some responses wrap the record in a single-element list.
        detail = payload['contributor']
        return detail[0] if isinstance(detail, list) else detail

    def get_all_contributors_with_details(self, delay: float = 0.3) -> List[Dict[str, Any]]:
        """Retrieve all contributors, upgrading each to its full record.

        Respects Trove API rate limits (200 requests per minute = ~0.3s delay).
        Falls back to the brief record when a detail fetch fails.

        Args:
            delay: Delay in seconds between API calls (default 0.3s for 200 req/min)

        Returns:
            List of contributor dictionaries with full metadata
        """
        contributors = self.get_all_contributors()
        if not contributors:
            logger.error("No contributors found")
            return []

        logger.info(f"Fetching full details for {len(contributors)} contributors...")

        total = len(contributors)
        detailed: List[Dict[str, Any]] = []

        for position, brief in enumerate(contributors, 1):
            nuc_id = brief.get('id') or brief.get('nuc')
            if not nuc_id:
                # Cannot query details without an identifier; record is dropped.
                logger.warning(f"Contributor {position} has no NUC ID: {brief}")
                continue

            logger.info(f"[{position}/{total}] Fetching details for {nuc_id}...")
            full = self.get_contributor_details(nuc_id)
            if full:
                detailed.append(full)
            else:
                # Fallback to brief record if full details fail
                logger.warning(f"Using brief record for {nuc_id}")
                detailed.append(brief)

            # Rate limiting: no sleep after the final request.
            if position < total:
                time.sleep(delay)

        logger.info(f"Successfully retrieved {len(detailed)} detailed records")
        return detailed
|
|
|
|
|
|
# =============================================================================
|
|
# GHCID GENERATOR
|
|
# =============================================================================
|
|
|
|
def generate_ghcid_components(institution_type: str, country: str = "AU",
                              region: Optional[str] = None, city: Optional[str] = None,
                              name_abbreviation: Optional[str] = None) -> str:
    """Build the GHCID base identifier (without Q-number).

    Components are joined with '-' in the order
    country / region / city / type / name-abbreviation; optional pieces
    that are missing or empty are simply omitted.

    Args:
        institution_type: Institution type code (G/L/A/M/etc.)
        country: ISO 3166-1 alpha-2 country code
        region: State/province/region code
        city: City code (first 3 letters, uppercase)
        name_abbreviation: Institution name abbreviation (2-3 letters)

    Returns:
        GHCID base string (e.g., "AU-NSW-SYD-L-NLA")
    """
    parts: List[str] = [country]

    if region:
        parts.append(region)

    if city:
        # City contributes a normalized 3-letter uppercase code.
        parts.append(city[:3].upper().replace(' ', ''))

    parts.append(institution_type)

    if name_abbreviation:
        # Abbreviation is uppercased with internal spaces stripped.
        parts.append(name_abbreviation.upper().replace(' ', ''))

    return '-'.join(parts)
|
|
|
|
|
|
def generate_ghcid_uuid_v5(ghcid_base: str) -> str:
    """Derive a deterministic UUID v5 from a GHCID base string.

    UUID v5 uses SHA-1 hashing per RFC 4122, so the same base string
    always yields the same UUID.

    Args:
        ghcid_base: Base GHCID string

    Returns:
        UUID v5 string
    """
    # uuid.NAMESPACE_DNS is the RFC 4122 DNS namespace
    # (6ba7b810-9dad-11d1-80b4-00c04fd430c8), matching the original constant.
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, ghcid_base))
|
|
|
|
|
|
def generate_ghcid_numeric(ghcid_base: str) -> int:
    """Derive a deterministic 64-bit numeric GHCID from the base string.

    The value is the big-endian unsigned integer formed by the first
    8 bytes of the SHA-256 digest of the base string.

    Args:
        ghcid_base: Base GHCID string

    Returns:
        64-bit unsigned integer
    """
    import hashlib  # local import kept, as in the original module

    sha = hashlib.sha256(ghcid_base.encode('utf-8'))
    # Truncate the 32-byte digest to its leading 8 bytes -> 64-bit id.
    return int.from_bytes(sha.digest()[:8], byteorder='big', signed=False)
|
|
|
|
|
|
# =============================================================================
|
|
# INSTITUTION TYPE CLASSIFIER
|
|
# =============================================================================
|
|
|
|
def classify_institution_type(contributor: Dict[str, Any]) -> str:
    """Classify institution type based on Trove contributor metadata.

    Uses GLAMORCUBESFIXPHDNT taxonomy (19-type system). Keyword checks run
    in a fixed priority order and the first match wins — e.g. a name
    containing both 'museum' and 'library' is classified 'L'.

    Args:
        contributor: Trove contributor dictionary

    Returns:
        Institution type code (G/L/A/M/etc.); 'U' (unknown) when no
        keyword matches.
    """
    # Fix: the original also computed an uppercased NUC code here
    # (contributor.get('id', '').upper()) that was never used — removed.
    name = contributor.get('name', '').lower()

    # Library indicators
    if any(keyword in name for keyword in ['library', 'bibliothek', 'biblioteca', 'bibliotheque']):
        return 'L'

    # Archive indicators
    if any(keyword in name for keyword in ['archive', 'archiv', 'archivo', 'records']):
        return 'A'

    # Museum indicators
    if any(keyword in name for keyword in ['museum', 'museo', 'musee', 'gallery']):
        # Distinguish between museum and gallery
        if 'gallery' in name and 'museum' not in name:
            return 'G'
        return 'M'

    # University indicators (Education Provider)
    # NOTE(review): 'institut' is a substring of 'institute', so any
    # "... Institute" name matches here ('E') and the 'institute' keyword
    # in the research branch below can never fire on its own — confirm
    # whether research institutes should classify as 'R' instead.
    if any(keyword in name for keyword in ['university', 'college', 'school', 'institut']):
        return 'E'

    # Official institution indicators
    if any(keyword in name for keyword in ['national', 'state', 'government', 'department', 'ministry']):
        return 'O'

    # Research center indicators
    if any(keyword in name for keyword in ['research', 'institute', 'center', 'centre']):
        return 'R'

    # Society indicators
    if any(keyword in name for keyword in ['society', 'association', 'club', 'historical']):
        return 'S'

    # Default: UNKNOWN
    return 'U'
|
|
|
|
|
|
# =============================================================================
|
|
# TROVE TO LINKML CONVERTER
|
|
# =============================================================================
|
|
|
|
class TroveToLinkMLConverter:
    """Convert Trove contributor data to LinkML HeritageCustodian records."""

    def __init__(self):
        """Capture a single UTC extraction timestamp shared by all records."""
        self.extraction_date = datetime.now(timezone.utc).isoformat()

    def convert_contributor(self, contributor: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a single Trove contributor to a HeritageCustodian record.

        Args:
            contributor: Trove API contributor dictionary

        Returns:
            LinkML-compliant HeritageCustodian dictionary
        """
        nuc_id = contributor.get('id') or contributor.get('nuc')
        display_name = contributor.get('name', 'Unknown Institution')
        inst_type = classify_institution_type(contributor)

        # Location format varies in Trove data; best-effort "City, STATE" parse.
        location_str = contributor.get('location', '')
        city, region = None, None
        if location_str:
            pieces = location_str.split(',')
            if len(pieces) >= 2:
                city = pieces[0].strip()
                region = pieces[-1].strip().upper()[:3]  # State abbreviation

        # GHCID: prefer the NUC code as the name abbreviation.
        ghcid_base = generate_ghcid_components(
            institution_type=inst_type,
            country='AU',
            region=region,
            city=city,
            name_abbreviation=nuc_id if nuc_id else display_name[:3]
        )
        ghcid_uuid_v5 = generate_ghcid_uuid_v5(ghcid_base)
        ghcid_numeric = generate_ghcid_numeric(ghcid_base)

        # Core HeritageCustodian skeleton; optional slots are attached below.
        record = {
            'id': f"https://w3id.org/heritage/custodian/au/{nuc_id.lower() if nuc_id else ghcid_uuid_v5}",
            'record_id': str(uuid.uuid4()),  # UUID v4 for database record
            'ghcid_uuid': ghcid_uuid_v5,
            'ghcid_numeric': ghcid_numeric,
            'ghcid_current': ghcid_base,
            'name': display_name,
            'institution_type': inst_type,
            'identifiers': [],
            'locations': [],
            'provenance': {
                'data_source': 'TROVE_API',
                'data_tier': 'TIER_1_AUTHORITATIVE',
                'extraction_date': self.extraction_date,
                'extraction_method': 'Trove API v3 /contributor endpoint with reclevel=full',
                'confidence_score': 0.95,
                'source_url': f"https://api.trove.nla.gov.au/v3/contributor/{nuc_id}" if nuc_id else None
            }
        }

        # NUC identifier (Australia's ISIL equivalent) plus its ISIL form AU-{NUC}.
        if nuc_id:
            record['identifiers'].extend([
                {
                    'identifier_scheme': 'NUC',
                    'identifier_value': nuc_id,
                    'identifier_url': f"https://www.nla.gov.au/apps/ilrs/?action=IlrsSearch&term={nuc_id}"
                },
                {
                    'identifier_scheme': 'ISIL',
                    'identifier_value': f"AU-{nuc_id}",
                    'identifier_url': None
                },
            ])

        # Alternative names (short name, when present and non-empty).
        if contributor.get('shortName'):
            record['alternative_names'] = [contributor['shortName']]

        # Official name (if different from display name).
        if contributor.get('fullName'):
            record['official_name'] = contributor['fullName']

        # Homepage URL.
        if contributor.get('url'):
            record['homepage'] = contributor['url']

        # Catalogue URL recorded as a digital platform.
        if contributor.get('catalogueUrl'):
            record['digital_platforms'] = [{
                'platform_name': 'Institutional Catalogue',
                'platform_url': contributor['catalogueUrl'],
                'platform_type': 'CATALOGUE'
            }]

        # Location data (only when the source record carried a location string).
        if location_str:
            record['locations'].append({
                'city': city,
                'region': region,
                'country': 'AU'
            })

        # Access-policy / openToPublic details land in the free-text description.
        notes = []
        if contributor.get('accessPolicy'):
            notes.append(f"\n\nAccess Policy: {contributor['accessPolicy']}")
        if 'openToPublic' in contributor:
            notes.append(f"\n\nOpen to Public: {contributor['openToPublic']}")
        if notes:
            record['description'] = record.get('description', '') + ''.join(notes)

        return record

    def convert_all(self, contributors: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Convert every Trove contributor to a HeritageCustodian record.

        Individual conversion failures are logged and skipped; the rest of
        the batch is still produced.

        Args:
            contributors: List of Trove contributor dictionaries

        Returns:
            List of LinkML-compliant HeritageCustodian dictionaries
        """
        logger.info(f"Converting {len(contributors)} contributors to LinkML format...")

        total = len(contributors)
        converted: List[Dict[str, Any]] = []

        for position, contrib in enumerate(contributors, 1):
            try:
                converted.append(self.convert_contributor(contrib))
            except Exception as e:
                logger.error(f"Failed to convert contributor {contrib.get('id', 'unknown')}: {e}")
                continue

            # Periodic progress log (only counts successful conversions' index).
            if position % 50 == 0:
                logger.info(f"Converted {position}/{total} records...")

        logger.info(f"Successfully converted {len(converted)} records")
        return converted
|
|
|
|
|
|
# =============================================================================
|
|
# EXPORT FUNCTIONS
|
|
# =============================================================================
|
|
|
|
def export_to_yaml(records: List[Dict[str, Any]], output_path: Path):
    """Write records to a YAML file.

    Args:
        records: List of HeritageCustodian dictionaries
        output_path: Output file path
    """
    logger.info(f"Exporting to YAML: {output_path}")

    with open(output_path, 'w', encoding='utf-8') as out_file:
        yaml.safe_dump(
            records,
            out_file,
            default_flow_style=False,  # block style, one value per line
            allow_unicode=True,        # keep non-ASCII names readable
            sort_keys=False,           # preserve record key order
        )

    logger.info(f"Exported {len(records)} records to {output_path}")
|
|
|
|
|
|
def export_to_json(records: List[Dict[str, Any]], output_path: Path):
    """Write records to a JSON file.

    Args:
        records: List of HeritageCustodian dictionaries
        output_path: Output file path
    """
    logger.info(f"Exporting to JSON: {output_path}")

    with open(output_path, 'w', encoding='utf-8') as out_file:
        # ensure_ascii=False keeps non-ASCII institution names readable.
        json.dump(records, out_file, indent=2, ensure_ascii=False)

    logger.info(f"Exported {len(records)} records to {output_path}")
|
|
|
|
|
|
def _flatten_for_csv(record: Dict[str, Any]) -> Dict[str, Any]:
    """Project one nested HeritageCustodian record onto flat CSV columns.

    Columns whose source data is absent are simply omitted; DictWriter
    fills them with its empty-string restval.
    """
    row = {
        'id': record.get('id'),
        'record_id': record.get('record_id'),
        'ghcid_uuid': record.get('ghcid_uuid'),
        'ghcid_numeric': record.get('ghcid_numeric'),
        'ghcid_current': record.get('ghcid_current'),
        'name': record.get('name'),
        'official_name': record.get('official_name'),
        'alternative_names': '; '.join(record.get('alternative_names', [])),
        'institution_type': record.get('institution_type'),
        'description': record.get('description', '').strip(),
        'homepage': record.get('homepage'),
    }

    # NUC and ISIL codes live in the identifiers list.
    for ident in record.get('identifiers', []):
        if ident['identifier_scheme'] == 'NUC':
            row['nuc_code'] = ident['identifier_value']
        elif ident['identifier_scheme'] == 'ISIL':
            row['isil_code'] = ident['identifier_value']

    # First digital platform carries the catalogue URL.
    platforms = record.get('digital_platforms', [])
    if platforms:
        row['catalogue_url'] = platforms[0].get('platform_url')

    # First location supplies the address columns.
    locations = record.get('locations', [])
    if locations:
        primary = locations[0]
        row['city'] = primary.get('city')
        row['region'] = primary.get('region')
        row['country'] = primary.get('country')

    # Provenance columns.
    provenance = record.get('provenance', {})
    row['data_source'] = provenance.get('data_source')
    row['data_tier'] = provenance.get('data_tier')
    row['extraction_date'] = provenance.get('extraction_date')
    row['confidence_score'] = provenance.get('confidence_score')

    return row


def export_to_csv(records: List[Dict[str, Any]], output_path: Path):
    """Write records to a CSV file (nested structures flattened).

    Args:
        records: List of HeritageCustodian dictionaries
        output_path: Output file path
    """
    logger.info(f"Exporting to CSV: {output_path}")

    if not records:
        logger.warning("No records to export")
        return

    # Fixed column order for the flattened export.
    fieldnames = [
        'id', 'record_id', 'ghcid_uuid', 'ghcid_numeric', 'ghcid_current',
        'name', 'official_name', 'alternative_names', 'institution_type',
        'nuc_code', 'isil_code', 'homepage', 'catalogue_url',
        'city', 'region', 'country',
        'data_source', 'data_tier', 'extraction_date', 'confidence_score',
        'description'
    ]

    with open(output_path, 'w', encoding='utf-8', newline='') as out_file:
        writer = csv.DictWriter(out_file, fieldnames=fieldnames)
        writer.writeheader()
        for record in records:
            writer.writerow(_flatten_for_csv(record))

    logger.info(f"Exported {len(records)} records to {output_path}")
|
|
|
|
|
|
# =============================================================================
|
|
# MAIN EXTRACTION FUNCTION
|
|
# =============================================================================
|
|
|
|
def _parse_args() -> argparse.Namespace:
    """Build and run the command-line parser for this script."""
    parser = argparse.ArgumentParser(
        description='Extract Australian heritage custodians from Trove API',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__
    )
    parser.add_argument(
        '--api-key',
        required=True,
        help='Trove API key (get from https://trove.nla.gov.au/)'
    )
    parser.add_argument(
        '--output-dir',
        type=Path,
        default=Path('data/instances'),
        help='Output directory (default: data/instances)'
    )
    parser.add_argument(
        '--delay',
        type=float,
        default=0.3,
        help='Delay between API calls in seconds (default: 0.3 for 200 req/min)'
    )
    parser.add_argument(
        '--formats',
        nargs='+',
        choices=['yaml', 'json', 'csv'],
        default=['yaml', 'json', 'csv'],
        help='Output formats (default: all)'
    )
    return parser.parse_args()


def main():
    """Main extraction workflow: fetch, convert, export, summarize."""
    args = _parse_args()

    args.output_dir.mkdir(parents=True, exist_ok=True)

    logger.info("Initializing Trove API client...")
    client = TroveAPIClient(api_key=args.api_key)

    # Fetch every contributor with full metadata (rate-limited).
    contributors = client.get_all_contributors_with_details(delay=args.delay)
    if not contributors:
        logger.error("No contributors extracted. Exiting.")
        sys.exit(1)

    # Convert to LinkML HeritageCustodian records.
    records = TroveToLinkMLConverter().convert_all(contributors)
    if not records:
        logger.error("No records generated. Exiting.")
        sys.exit(1)

    # Timestamped filenames keep successive runs side by side.
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # Export in a fixed yaml -> json -> csv order, honoring --formats.
    for fmt, exporter in (('yaml', export_to_yaml),
                          ('json', export_to_json),
                          ('csv', export_to_csv)):
        if fmt in args.formats:
            exporter(records, args.output_dir / f'trove_contributors_{timestamp}.{fmt}')

    # Summary report.
    logger.info("\n" + "=" * 80)
    logger.info("EXTRACTION SUMMARY")
    logger.info("=" * 80)
    logger.info(f"Total contributors extracted: {len(contributors)}")
    logger.info(f"Total records converted: {len(records)}")
    logger.info(f"Output directory: {args.output_dir}")

    # Tally records per institution type.
    type_counts: Dict[str, int] = {}
    for record in records:
        code = record.get('institution_type', 'UNKNOWN')
        type_counts[code] = type_counts.get(code, 0) + 1

    logger.info("\nInstitution Type Distribution:")
    for code, count in sorted(type_counts.items()):
        logger.info(f"  {code}: {count}")

    logger.info("\nExtraction complete!")


if __name__ == '__main__':
    main()
|