#!/usr/bin/env python3
|
|
"""
|
|
German ISIL Database Harvester (SRU Protocol)
|
|
|
|
This script harvests all German ISIL records using the SRU (Search/Retrieve via URL)
|
|
protocol from the Deutsche Nationalbibliothek.
|
|
|
|
SRU Endpoint: https://services.dnb.de/sru/bib
|
|
Protocol: SRU 1.1
|
|
Format: PicaPlus-XML and RDF/XML
|
|
|
|
Author: OpenCode + MCP Tools
|
|
Date: 2025-11-19
|
|
"""
|
|
|
|
import json
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional
from urllib.parse import quote

import requests
|
|
|
|
# Configuration
|
|
SRU_BASE_URL = "https://services.dnb.de/sru/bib"
|
|
OUTPUT_DIR = Path("/Users/kempersc/apps/glam/data/isil/germany")
|
|
BATCH_SIZE = 100 # Records per request
|
|
REQUEST_DELAY = 1.0 # Seconds between requests
|
|
MAX_RETRIES = 3
|
|
|
|
# XML Namespaces
|
|
NS = {
|
|
'srw': 'http://www.loc.gov/zing/srw/',
|
|
'ppxml': 'http://www.oclcpica.org/xmlns/ppxml-1.0',
|
|
'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
|
|
'foaf': 'http://xmlns.com/foaf/0.1/'
|
|
}
|
|
|
|
# Create output directory
|
|
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def fetch_sru_batch(start_record: int, max_records: int, query: str = "isil=DE-*",
|
|
record_schema: str = "PicaPlus-xml") -> Optional[ET.Element]:
|
|
"""
|
|
Fetch a batch of records via SRU.
|
|
|
|
Args:
|
|
start_record: Starting position (1-indexed)
|
|
max_records: Number of records to fetch
|
|
query: CQL query
|
|
record_schema: Format (PicaPlus-xml or RDFxml)
|
|
|
|
Returns:
|
|
XML root element or None on error
|
|
"""
|
|
params = {
|
|
'version': '1.1',
|
|
'operation': 'searchRetrieve',
|
|
'query': query,
|
|
'startRecord': start_record,
|
|
'maximumRecords': max_records,
|
|
'recordSchema': record_schema
|
|
}
|
|
|
|
url = f"{SRU_BASE_URL}?" + "&".join(f"{k}={quote(str(v))}" for k, v in params.items())
|
|
|
|
for attempt in range(MAX_RETRIES):
|
|
try:
|
|
print(f"Fetching records {start_record}-{start_record + max_records - 1}...", end=' ')
|
|
response = requests.get(url, timeout=60)
|
|
response.raise_for_status()
|
|
|
|
root = ET.fromstring(response.content)
|
|
|
|
# Check for SRU diagnostics (errors)
|
|
diagnostics = root.find('.//srw:diagnostics', NS)
|
|
if diagnostics is not None:
|
|
message = diagnostics.find('.//srw:message', NS)
|
|
print(f"SRU Error: {message.text if message is not None else 'Unknown'}")
|
|
return None
|
|
|
|
num_records = root.find('.//srw:numberOfRecords', NS)
|
|
if num_records is not None:
|
|
print(f"OK (total: {num_records.text})")
|
|
else:
|
|
print("OK")
|
|
|
|
return root
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Attempt {attempt + 1}/{MAX_RETRIES} failed: {e}")
|
|
if attempt < MAX_RETRIES - 1:
|
|
time.sleep(REQUEST_DELAY * (attempt + 1))
|
|
else:
|
|
return None
|
|
except ET.ParseError as e:
|
|
print(f"XML parse error: {e}")
|
|
return None
|
|
|
|
|
|
def parse_pica_record(record_elem: ET.Element) -> Dict:
|
|
"""
|
|
Parse a single PICA+ XML record into a structured dictionary.
|
|
|
|
Args:
|
|
record_elem: XML element containing the record
|
|
|
|
Returns:
|
|
Dictionary with parsed fields
|
|
"""
|
|
result = {
|
|
'isil': None,
|
|
'name': None,
|
|
'alternative_names': [],
|
|
'institution_type': None,
|
|
'address': {},
|
|
'contact': {},
|
|
'urls': [],
|
|
'parent_org': None,
|
|
'interloan_region': None,
|
|
'notes': None,
|
|
'raw_pica': {}
|
|
}
|
|
|
|
# Extract PICA fields
|
|
ppxml_record = record_elem.find('.//ppxml:record', NS)
|
|
if ppxml_record is None:
|
|
return result
|
|
|
|
for tag in ppxml_record.findall('.//ppxml:tag', NS):
|
|
tag_id = tag.get('id')
|
|
subfields = {}
|
|
|
|
for subf in tag.findall('ppxml:subf', NS):
|
|
subf_id = subf.get('id')
|
|
subfields[subf_id] = subf.text
|
|
|
|
result['raw_pica'][tag_id] = subfields
|
|
|
|
# Parse specific fields
|
|
if tag_id == '008H': # ISIL and institution codes
|
|
result['isil'] = subfields.get('e')
|
|
result['institution_type'] = subfields.get('f')
|
|
|
|
elif tag_id == '029A': # Official name
|
|
result['name'] = subfields.get('a')
|
|
|
|
elif tag_id == '029@': # Alternative names
|
|
if 'a' in subfields:
|
|
result['alternative_names'].append(subfields['a'])
|
|
|
|
elif tag_id == '032P': # Address
|
|
if subfields.get('2') == 'S': # Street address
|
|
result['address'] = {
|
|
'street': subfields.get('a'),
|
|
'city': subfields.get('b'),
|
|
'postal_code': subfields.get('e'),
|
|
'country': subfields.get('d'),
|
|
'region': subfields.get('f'),
|
|
'latitude': subfields.get('l'),
|
|
'longitude': subfields.get('k')
|
|
}
|
|
|
|
elif tag_id == '035B': # Contact information
|
|
result['contact'] = {
|
|
'phone': f"+{subfields.get('d', '')}-{subfields.get('e', '')}-{subfields.get('f', '')}".strip('-'),
|
|
'fax': f"+{subfields.get('g', '')}-{subfields.get('h', '')}-{subfields.get('i', '')}".strip('-'),
|
|
'email': subfields.get('k')
|
|
}
|
|
|
|
elif tag_id == '009Q': # URLs
|
|
url_info = {
|
|
'url': subfields.get('u'),
|
|
'type': subfields.get('z'),
|
|
'label': subfields.get('x')
|
|
}
|
|
result['urls'].append(url_info)
|
|
|
|
elif tag_id == '029R': # Parent organization
|
|
result['parent_org'] = subfields.get('a')
|
|
|
|
elif tag_id == '035I': # Interloan region
|
|
result['interloan_region'] = subfields.get('a')
|
|
|
|
elif tag_id == '047A': # General notes
|
|
result['notes'] = subfields.get('a')
|
|
|
|
return result
|
|
|
|
|
|
def harvest_all_german_isil() -> List[Dict]:
|
|
"""
|
|
Harvest all German ISIL records.
|
|
|
|
Returns:
|
|
List of parsed records
|
|
"""
|
|
print(f"\n{'='*70}")
|
|
print(f"Harvesting German ISIL Database via SRU")
|
|
print(f"Endpoint: {SRU_BASE_URL}")
|
|
print(f"{'='*70}\n")
|
|
|
|
# First request to get total count
|
|
first_batch = fetch_sru_batch(1, 1)
|
|
if first_batch is None:
|
|
print("Failed to fetch initial batch. Aborting.")
|
|
return []
|
|
|
|
num_records_elem = first_batch.find('.//srw:numberOfRecords', NS)
|
|
if num_records_elem is None or num_records_elem.text is None:
|
|
print("Could not determine total number of records.")
|
|
return []
|
|
|
|
total_records = int(num_records_elem.text)
|
|
print(f"Total records to harvest: {total_records}")
|
|
print(f"Batch size: {BATCH_SIZE}\n")
|
|
|
|
all_records = []
|
|
start_record = 1
|
|
|
|
while start_record <= total_records:
|
|
# Fetch batch
|
|
batch = fetch_sru_batch(start_record, BATCH_SIZE)
|
|
if batch is None:
|
|
print(f"Warning: Failed to fetch batch starting at {start_record}. Skipping...")
|
|
start_record += BATCH_SIZE
|
|
continue
|
|
|
|
# Parse records in batch
|
|
records_elem = batch.findall('.//srw:record', NS)
|
|
for record_elem in records_elem:
|
|
parsed = parse_pica_record(record_elem)
|
|
if parsed['isil']: # Only include records with ISIL
|
|
all_records.append(parsed)
|
|
|
|
print(f"Progress: {len(all_records)}/{total_records} records parsed")
|
|
|
|
start_record += BATCH_SIZE
|
|
|
|
# Be respectful to the server
|
|
if start_record <= total_records:
|
|
time.sleep(REQUEST_DELAY)
|
|
|
|
print(f"\n{'='*70}")
|
|
print(f"Harvest complete: {len(all_records)} records")
|
|
print(f"{'='*70}\n")
|
|
|
|
return all_records
|
|
|
|
|
|
def save_records(records: List[Dict], format: str = "json"):
|
|
"""Save records to file."""
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
|
|
if format == "json":
|
|
output_file = OUTPUT_DIR / f"german_isil_complete_{timestamp}.json"
|
|
|
|
output = {
|
|
'metadata': {
|
|
'source': 'German ISIL Database (Staatsbibliothek zu Berlin)',
|
|
'source_url': 'https://sigel.staatsbibliothek-berlin.de/',
|
|
'api_endpoint': SRU_BASE_URL,
|
|
'protocol': 'SRU 1.1',
|
|
'harvest_date': datetime.utcnow().isoformat() + 'Z',
|
|
'total_records': len(records),
|
|
'format': 'PicaPlus-XML (parsed)',
|
|
'license': 'CC0 1.0 Universal (Public Domain)',
|
|
'coverage': 'All German libraries, archives, museums with ISIL identifiers'
|
|
},
|
|
'records': records
|
|
}
|
|
|
|
with open(output_file, 'w', encoding='utf-8') as f:
|
|
json.dump(output, f, ensure_ascii=False, indent=2)
|
|
|
|
print(f"✓ JSON saved to: {output_file}")
|
|
print(f" File size: {output_file.stat().st_size / 1024 / 1024:.2f} MB")
|
|
|
|
# Also save as JSONL for easier processing
|
|
jsonl_file = OUTPUT_DIR / f"german_isil_complete_{timestamp}.jsonl"
|
|
with open(jsonl_file, 'w', encoding='utf-8') as f:
|
|
for record in records:
|
|
f.write(json.dumps(record, ensure_ascii=False) + '\n')
|
|
|
|
print(f"✓ JSONL saved to: {jsonl_file}")
|
|
print(f" File size: {jsonl_file.stat().st_size / 1024 / 1024:.2f} MB\n")
|
|
|
|
|
|
def generate_statistics(records: List[Dict]):
|
|
"""Generate and display statistics."""
|
|
stats = {
|
|
'total': len(records),
|
|
'by_type': {},
|
|
'with_address': 0,
|
|
'with_email': 0,
|
|
'with_phone': 0,
|
|
'with_url': 0,
|
|
'with_coordinates': 0,
|
|
'by_region': {}
|
|
}
|
|
|
|
for record in records:
|
|
# Count by institution type
|
|
inst_type = record.get('institution_type', 'Unknown')
|
|
stats['by_type'][inst_type] = stats['by_type'].get(inst_type, 0) + 1
|
|
|
|
# Count completeness
|
|
if record.get('address', {}).get('street'):
|
|
stats['with_address'] += 1
|
|
if record.get('contact', {}).get('email'):
|
|
stats['with_email'] += 1
|
|
if record.get('contact', {}).get('phone'):
|
|
stats['with_phone'] += 1
|
|
if record.get('urls'):
|
|
stats['with_url'] += 1
|
|
if record.get('address', {}).get('latitude'):
|
|
stats['with_coordinates'] += 1
|
|
|
|
# Count by interloan region
|
|
region = record.get('interloan_region', 'Unknown')
|
|
stats['by_region'][region] = stats['by_region'].get(region, 0) + 1
|
|
|
|
print(f"\n{'='*70}")
|
|
print("Statistics:")
|
|
print(f"{'='*70}")
|
|
print(f"Total records: {stats['total']}")
|
|
print(f"\nData completeness:")
|
|
print(f" - With street address: {stats['with_address']} ({stats['with_address']/stats['total']*100:.1f}%)")
|
|
print(f" - With email: {stats['with_email']} ({stats['with_email']/stats['total']*100:.1f}%)")
|
|
print(f" - With phone: {stats['with_phone']} ({stats['with_phone']/stats['total']*100:.1f}%)")
|
|
print(f" - With URL: {stats['with_url']} ({stats['with_url']/stats['total']*100:.1f}%)")
|
|
print(f" - With coordinates: {stats['with_coordinates']} ({stats['with_coordinates']/stats['total']*100:.1f}%)")
|
|
|
|
print(f"\nTop 10 institution types:")
|
|
for inst_type, count in sorted(stats['by_type'].items(), key=lambda x: x[1], reverse=True)[:10]:
|
|
print(f" - {inst_type}: {count}")
|
|
|
|
print(f"\nTop 10 interloan regions:")
|
|
for region, count in sorted(stats['by_region'].items(), key=lambda x: x[1], reverse=True)[:10]:
|
|
print(f" - {region}: {count}")
|
|
|
|
print(f"{'='*70}\n")
|
|
|
|
return stats
|
|
|
|
|
|
def main():
|
|
"""Main execution."""
|
|
print(f"\n{'#'*70}")
|
|
print(f"# German ISIL Database Harvester (SRU Protocol)")
|
|
print(f"# Staatsbibliothek zu Berlin / Deutsche Nationalbibliothek")
|
|
print(f"{'#'*70}\n")
|
|
|
|
# Harvest all records
|
|
records = harvest_all_german_isil()
|
|
|
|
if not records:
|
|
print("No records harvested. Exiting.")
|
|
return
|
|
|
|
# Save records
|
|
save_records(records)
|
|
|
|
# Generate statistics
|
|
stats = generate_statistics(records)
|
|
|
|
# Save statistics
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
stats_file = OUTPUT_DIR / f"german_isil_stats_{timestamp}.json"
|
|
with open(stats_file, 'w', encoding='utf-8') as f:
|
|
json.dump(stats, f, ensure_ascii=False, indent=2)
|
|
print(f"✓ Statistics saved to: {stats_file}\n")
|
|
|
|
print("✓ Harvest complete!\n")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|