glam/scripts/scrapers/harvest_german_isil_sru.py
2025-11-19 23:25:22 +01:00

380 lines
13 KiB
Python

#!/usr/bin/env python3
"""
German ISIL Database Harvester (SRU Protocol)
This script harvests all German ISIL records using the SRU (Search/Retrieve via URL)
protocol from the Deutsche Nationalbibliothek.
SRU Endpoint: https://services.dnb.de/sru/bib
Protocol: SRU 1.1
Format: PicaPlus-XML and RDF/XML
Author: OpenCode + MCP Tools
Date: 2025-11-19
"""
import json
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Optional
from urllib.parse import quote

import requests
# Configuration
SRU_BASE_URL = "https://services.dnb.de/sru/bib"  # DNB SRU search/retrieve endpoint
OUTPUT_DIR = Path("/Users/kempersc/apps/glam/data/isil/germany")  # NOTE: absolute, machine-specific path
BATCH_SIZE = 100 # Records per request
REQUEST_DELAY = 1.0 # Seconds between requests (politeness delay toward the server)
MAX_RETRIES = 3  # attempts per batch before giving up
# XML Namespaces used to query the SRU envelope and PICA+/RDF payloads
NS = {
'srw': 'http://www.loc.gov/zing/srw/',  # SRU response envelope
'ppxml': 'http://www.oclcpica.org/xmlns/ppxml-1.0',  # PICA+ record payload
'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
'foaf': 'http://xmlns.com/foaf/0.1/'
}
# Create output directory at import time so later writes cannot fail on a missing path
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
def fetch_sru_batch(start_record: int, max_records: int, query: str = "isil=DE-*",
                    record_schema: str = "PicaPlus-xml") -> Optional[ET.Element]:
    """
    Fetch a batch of records via SRU.

    Args:
        start_record: Starting position (1-indexed)
        max_records: Number of records to fetch
        query: CQL query
        record_schema: Format (PicaPlus-xml or RDFxml)

    Returns:
        XML root element, or None on SRU diagnostic, XML parse error,
        or after MAX_RETRIES failed HTTP attempts.
    """
    params = {
        'version': '1.1',
        'operation': 'searchRetrieve',
        'query': query,
        'startRecord': start_record,
        'maximumRecords': max_records,
        'recordSchema': record_schema,
    }
    for attempt in range(MAX_RETRIES):
        try:
            print(f"Fetching records {start_record}-{start_record + max_records - 1}...", end=' ')
            # Let requests handle query-string encoding instead of building
            # the URL by hand with quote().
            response = requests.get(SRU_BASE_URL, params=params, timeout=60)
            response.raise_for_status()
            root = ET.fromstring(response.content)
            # SRU reports errors as diagnostics inside a 200 response
            diagnostics = root.find('.//srw:diagnostics', NS)
            if diagnostics is not None:
                message = diagnostics.find('.//srw:message', NS)
                print(f"SRU Error: {message.text if message is not None else 'Unknown'}")
                return None
            num_records = root.find('.//srw:numberOfRecords', NS)
            if num_records is not None:
                print(f"OK (total: {num_records.text})")
            else:
                print("OK")
            return root
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1}/{MAX_RETRIES} failed: {e}")
            if attempt < MAX_RETRIES - 1:
                # Linear backoff before the next retry
                time.sleep(REQUEST_DELAY * (attempt + 1))
        except ET.ParseError as e:
            print(f"XML parse error: {e}")
            return None
    return None  # all retries exhausted
def _format_number(*parts: Optional[str]) -> Optional[str]:
    """Join phone/fax components into '+CC-AREA-NUMBER', skipping blanks.

    Returns None when every component is missing or empty. (The previous
    inline f-string produced a bare '+' for fully empty numbers and the
    literal text 'None' for subfields present with no text, which inflated
    downstream completeness statistics.)
    """
    present = [p for p in parts if p]
    return '+' + '-'.join(present) if present else None


def parse_pica_record(record_elem: ET.Element) -> Dict:
    """
    Parse a single PICA+ XML record into a structured dictionary.

    Args:
        record_elem: XML element containing the record

    Returns:
        Dictionary with parsed fields; 'isil' stays None when the record
        has no ppxml payload or no 008H $e subfield.
    """
    result = {
        'isil': None,
        'name': None,
        'alternative_names': [],
        'institution_type': None,
        'address': {},
        'contact': {},
        'urls': [],
        'parent_org': None,
        'interloan_region': None,
        'notes': None,
        'raw_pica': {}
    }
    ppxml_record = record_elem.find('.//ppxml:record', NS)
    if ppxml_record is None:
        return result
    for tag in ppxml_record.findall('.//ppxml:tag', NS):
        tag_id = tag.get('id')
        subfields = {}
        for subf in tag.findall('ppxml:subf', NS):
            subfields[subf.get('id')] = subf.text
        # Keep the raw subfields so no information is lost in parsing
        result['raw_pica'][tag_id] = subfields
        # Map known PICA fields onto the structured result
        if tag_id == '008H':  # ISIL and institution codes
            result['isil'] = subfields.get('e')
            result['institution_type'] = subfields.get('f')
        elif tag_id == '029A':  # Official name
            result['name'] = subfields.get('a')
        elif tag_id == '029@':  # Alternative names
            if 'a' in subfields:
                result['alternative_names'].append(subfields['a'])
        elif tag_id == '032P':  # Address
            if subfields.get('2') == 'S':  # Street address
                result['address'] = {
                    'street': subfields.get('a'),
                    'city': subfields.get('b'),
                    'postal_code': subfields.get('e'),
                    'country': subfields.get('d'),
                    'region': subfields.get('f'),
                    'latitude': subfields.get('l'),
                    'longitude': subfields.get('k')
                }
        elif tag_id == '035B':  # Contact information
            result['contact'] = {
                'phone': _format_number(subfields.get('d'), subfields.get('e'), subfields.get('f')),
                'fax': _format_number(subfields.get('g'), subfields.get('h'), subfields.get('i')),
                'email': subfields.get('k')
            }
        elif tag_id == '009Q':  # URLs
            url_info = {
                'url': subfields.get('u'),
                'type': subfields.get('z'),
                'label': subfields.get('x')
            }
            result['urls'].append(url_info)
        elif tag_id == '029R':  # Parent organization
            result['parent_org'] = subfields.get('a')
        elif tag_id == '035I':  # Interloan region
            result['interloan_region'] = subfields.get('a')
        elif tag_id == '047A':  # General notes
            result['notes'] = subfields.get('a')
    return result
def harvest_all_german_isil() -> List[Dict]:
    """Harvest every German ISIL record from the SRU endpoint.

    Returns:
        List of parsed records (only those that actually carry an ISIL).
    """
    banner = '=' * 70
    print(f"\n{banner}")
    print(f"Harvesting German ISIL Database via SRU")
    print(f"Endpoint: {SRU_BASE_URL}")
    print(f"{banner}\n")

    # Probe with a single record to learn the total hit count
    probe = fetch_sru_batch(1, 1)
    if probe is None:
        print("Failed to fetch initial batch. Aborting.")
        return []
    count_elem = probe.find('.//srw:numberOfRecords', NS)
    if count_elem is None or count_elem.text is None:
        print("Could not determine total number of records.")
        return []
    total_records = int(count_elem.text)
    print(f"Total records to harvest: {total_records}")
    print(f"Batch size: {BATCH_SIZE}\n")

    harvested: List[Dict] = []
    for offset in range(1, total_records + 1, BATCH_SIZE):
        batch = fetch_sru_batch(offset, BATCH_SIZE)
        if batch is None:
            print(f"Warning: Failed to fetch batch starting at {offset}. Skipping...")
            continue
        for rec in batch.findall('.//srw:record', NS):
            parsed = parse_pica_record(rec)
            if parsed['isil']:  # keep only records that carry an ISIL
                harvested.append(parsed)
        print(f"Progress: {len(harvested)}/{total_records} records parsed")
        # Be respectful to the server between successful batches
        if offset + BATCH_SIZE <= total_records:
            time.sleep(REQUEST_DELAY)

    print(f"\n{banner}")
    print(f"Harvest complete: {len(harvested)} records")
    print(f"{banner}\n")
    return harvested
def save_records(records: List[Dict], format: str = "json"):
    """Save records to timestamped files under OUTPUT_DIR.

    Args:
        records: Parsed ISIL records.
        format: Output format; only "json" is supported, which writes both a
            metadata-wrapped .json file and a line-delimited .jsonl file.

    Note: the parameter name `format` shadows the builtin but is kept
    for backward compatibility with keyword callers.
    """
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    if format == "json":
        output_file = OUTPUT_DIR / f"german_isil_complete_{timestamp}.json"
        output = {
            'metadata': {
                'source': 'German ISIL Database (Staatsbibliothek zu Berlin)',
                'source_url': 'https://sigel.staatsbibliothek-berlin.de/',
                'api_endpoint': SRU_BASE_URL,
                'protocol': 'SRU 1.1',
                # Timezone-aware replacement for deprecated datetime.utcnow();
                # the emitted format (trailing 'Z') is unchanged.
                'harvest_date': datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z'),
                'total_records': len(records),
                'format': 'PicaPlus-XML (parsed)',
                'license': 'CC0 1.0 Universal (Public Domain)',
                'coverage': 'All German libraries, archives, museums with ISIL identifiers'
            },
            'records': records
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(output, f, ensure_ascii=False, indent=2)
        print(f"✓ JSON saved to: {output_file}")
        print(f"  File size: {output_file.stat().st_size / 1024 / 1024:.2f} MB")
        # Also save as JSONL for easier processing
        jsonl_file = OUTPUT_DIR / f"german_isil_complete_{timestamp}.jsonl"
        with open(jsonl_file, 'w', encoding='utf-8') as f:
            for record in records:
                f.write(json.dumps(record, ensure_ascii=False) + '\n')
        print(f"✓ JSONL saved to: {jsonl_file}")
        print(f"  File size: {jsonl_file.stat().st_size / 1024 / 1024:.2f} MB\n")
def generate_statistics(records: List[Dict]) -> Dict:
    """Compute, print, and return completeness/type/region statistics.

    Args:
        records: Parsed ISIL records as produced by parse_pica_record().

    Returns:
        Statistics dictionary with totals, per-type and per-region counts,
        and completeness counters.
    """
    stats: Dict = {
        'total': len(records),
        'by_type': {},
        'with_address': 0,
        'with_email': 0,
        'with_phone': 0,
        'with_url': 0,
        'with_coordinates': 0,
        'by_region': {}
    }
    for record in records:
        # parse_pica_record stores None under the key (not a missing key),
        # so dict.get's default never fires -- coalesce with `or` instead.
        inst_type = record.get('institution_type') or 'Unknown'
        stats['by_type'][inst_type] = stats['by_type'].get(inst_type, 0) + 1
        # Count completeness
        if record.get('address', {}).get('street'):
            stats['with_address'] += 1
        if record.get('contact', {}).get('email'):
            stats['with_email'] += 1
        if record.get('contact', {}).get('phone'):
            stats['with_phone'] += 1
        if record.get('urls'):
            stats['with_url'] += 1
        if record.get('address', {}).get('latitude'):
            stats['with_coordinates'] += 1
        region = record.get('interloan_region') or 'Unknown'
        stats['by_region'][region] = stats['by_region'].get(region, 0) + 1

    print(f"\n{'='*70}")
    print("Statistics:")
    print(f"{'='*70}")
    print(f"Total records: {stats['total']}")
    total = stats['total']
    if total == 0:
        # Nothing to percentage-ize; avoid ZeroDivisionError on an empty harvest.
        print(f"{'='*70}\n")
        return stats
    print(f"\nData completeness:")
    print(f"  - With street address: {stats['with_address']} ({stats['with_address']/total*100:.1f}%)")
    print(f"  - With email: {stats['with_email']} ({stats['with_email']/total*100:.1f}%)")
    print(f"  - With phone: {stats['with_phone']} ({stats['with_phone']/total*100:.1f}%)")
    print(f"  - With URL: {stats['with_url']} ({stats['with_url']/total*100:.1f}%)")
    print(f"  - With coordinates: {stats['with_coordinates']} ({stats['with_coordinates']/total*100:.1f}%)")
    print(f"\nTop 10 institution types:")
    for inst_type, count in sorted(stats['by_type'].items(), key=lambda x: x[1], reverse=True)[:10]:
        print(f"  - {inst_type}: {count}")
    print(f"\nTop 10 interloan regions:")
    for region, count in sorted(stats['by_region'].items(), key=lambda x: x[1], reverse=True)[:10]:
        print(f"  - {region}: {count}")
    print(f"{'='*70}\n")
    return stats
def main():
    """Entry point: harvest, save, and summarize German ISIL records."""
    rule = '#' * 70
    print(f"\n{rule}")
    print(f"# German ISIL Database Harvester (SRU Protocol)")
    print(f"# Staatsbibliothek zu Berlin / Deutsche Nationalbibliothek")
    print(f"{rule}\n")

    # Harvest everything; bail out early when nothing came back.
    records = harvest_all_german_isil()
    if not records:
        print("No records harvested. Exiting.")
        return

    save_records(records)
    stats = generate_statistics(records)

    # Persist the statistics next to the data files
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    stats_file = OUTPUT_DIR / f"german_isil_stats_{stamp}.json"
    with open(stats_file, 'w', encoding='utf-8') as fh:
        json.dump(stats, fh, ensure_ascii=False, indent=2)
    print(f"✓ Statistics saved to: {stats_file}\n")
    print("✓ Harvest complete!\n")
if __name__ == "__main__":
    main()