glam/scripts/scrapers/parse_kb_netherlands_isil.py
2025-11-19 23:25:22 +01:00

195 lines
6.6 KiB
Python

#!/usr/bin/env python3
"""
KB Netherlands ISIL Code Excel Parser
Extracts public library data from KB Netherlands Excel file.
Source: https://www.bibliotheeknetwerk.nl/
File: 20250401 Bnetwerk overzicht ISIL-codes Bibliotheken Nederland.xlsx
Author: GLAM Data Extraction Project
Date: 2025-11-17
License: MIT
"""
import openpyxl
import csv
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Dict, Optional
import logging
# Logging setup: timestamped, INFO-level messages.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Repository-relative data locations; the NL output directory is created eagerly
# at import time so the exporters can write without further checks.
_ISIL_DATA_DIR = Path(__file__).parent.parent.parent / "data" / "isil"
INPUT_FILE = _ISIL_DATA_DIR / "KB_Netherlands_ISIL_2025-04-01.xlsx"
OUTPUT_DIR = _ISIL_DATA_DIR / "NL"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
class KBNetherlandsISILParser:
    """Parses the KB Netherlands ISIL-code Excel file into standardized records.

    Usage: construct with the path to the Excel file, call ``parse_excel()``,
    then export via ``export_csv()`` and/or ``export_json()``.
    """

    def __init__(self, input_file: Path, header_row: int = 3, data_start_row: int = 4):
        """
        Args:
            input_file: Path to the KB Netherlands ISIL Excel workbook.
            header_row: 1-based row index holding the column headers
                (row 3 in the 2025-04-01 file).
            data_start_row: 1-based row index where data rows begin.
        """
        self.input_file = input_file
        self.header_row = header_row
        self.data_start_row = data_start_row
        self.institutions: List[Dict] = []

    def parse_excel(self) -> List[Dict]:
        """
        Parse the KB Netherlands Excel file.

        Returns:
            List of standardized institution dictionaries (also stored on
            ``self.institutions``).
        """
        logger.info(f"Opening Excel file: {self.input_file}")
        workbook = openpyxl.load_workbook(self.input_file)
        try:
            sheet = workbook.active
            logger.info(f"Sheet name: {sheet.title}")
            logger.info(f"Sheet dimensions: {sheet.dimensions}")

            # Read the header row (the file has preamble rows above it).
            headers = [cell.value for cell in sheet[self.header_row]]
            logger.info(f"Headers: {headers}")

            # Parse data rows following the header row.
            institutions = []
            for row_idx, row in enumerate(
                sheet.iter_rows(min_row=self.data_start_row, values_only=True),
                start=self.data_start_row,
            ):
                if not row[0]:  # Skip empty rows
                    continue
                # Map each cell to its header name, ignoring unheadered columns.
                row_dict = {}
                for idx, value in enumerate(row):
                    if idx < len(headers) and headers[idx]:
                        row_dict[headers[idx]] = value
                institution = self._standardize_fields(row_dict)
                if institution:
                    institutions.append(institution)
                    if len(institutions) % 20 == 0:
                        logger.info(f"Processed {len(institutions)} institutions...")
        finally:
            # Release the underlying file handle even if parsing fails.
            workbook.close()

        logger.info(f"Successfully parsed {len(institutions)} institutions from Excel")
        self.institutions = institutions
        return institutions

    def _standardize_fields(self, row_dict: Dict) -> Optional[Dict]:
        """
        Standardize field names from Excel to our schema.

        Args:
            row_dict: Raw row data from Excel, keyed by header name.

        Returns:
            Standardized institution dictionary, or None if the row lacks
            the required ISIL code or name.
        """
        # Every record carries the same key set (including 'notes': None when
        # absent) so csv.DictWriter can derive fieldnames from any record
        # without raising ValueError on rows that have an Opmerking value.
        standardized = {
            'isil_code': None,
            'name': None,
            'city': None,
            'province': None,
            'notes': None,
            'country': 'Netherlands',
            'registry': 'KB Netherlands Library Network',
            'source_url': 'https://www.bibliotheeknetwerk.nl/'
        }
        # Actual KB Netherlands headers: ISIL-code, Naam bibliotheek,
        # Vestigingsplaats, Opmerking
        field_mappings = {
            'ISIL-code': 'isil_code',
            'Naam bibliotheek': 'name',
            'Vestigingsplaats': 'city',
            'Opmerking': 'notes',
        }
        for excel_field, our_field in field_mappings.items():
            if excel_field in row_dict and row_dict[excel_field]:
                standardized[our_field] = str(row_dict[excel_field]).strip()

        # Validate minimum required fields.
        if not standardized['isil_code'] or not standardized['name']:
            logger.warning(f"Skipping row with missing ISIL or name: {row_dict}")
            return None
        return standardized

    def export_csv(self, output_file: Path):
        """Export parsed institutions to CSV; no-op (no file) when empty."""
        if not self.institutions:
            logger.warning("No institutions to export")
            return
        # All records share one key set (see _standardize_fields), so the
        # first record's keys are a valid header for every row.
        fieldnames = self.institutions[0].keys()
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(self.institutions)
        logger.info(f"Exported {len(self.institutions)} records to {output_file}")

    def export_json(self, output_file: Path):
        """Export parsed institutions to JSON with provenance metadata."""
        output = {
            'extraction_date': datetime.now(timezone.utc).isoformat(),
            'data_source': 'KB Netherlands Library Network',
            'source_url': 'https://www.bibliotheeknetwerk.nl/',
            'source_file': 'KB_Netherlands_ISIL_2025-04-01.xlsx',
            'parser_version': '1.0.0',
            'country': 'Netherlands',
            'record_count': len(self.institutions),
            'institutions': self.institutions
        }
        with open(output_file, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps Dutch diacritics readable in the output.
            json.dump(output, f, indent=2, ensure_ascii=False)
        logger.info(f"Exported {len(self.institutions)} records to {output_file}")
def main():
    """Entry point: parse the KB Netherlands Excel file and write CSV + JSON."""
    logger.info("=== KB Netherlands ISIL Parser ===")

    if not INPUT_FILE.exists():
        # Fail fast with download instructions when the source file is absent.
        logger.error(f"Input file not found: {INPUT_FILE}")
        logger.error("Please download the file first:")
        logger.error("https://www.bibliotheeknetwerk.nl/sites/default/files/documents/20250401%20Bnetwerk%20overzicht%20ISIL-codes%20Bibliotheken%20Nederland.xlsx")
        return

    kb_parser = KBNetherlandsISILParser(INPUT_FILE)
    records = kb_parser.parse_excel()
    if not records:
        logger.error("No institutions found in Excel file")
        return

    # Write both export formats side by side in OUTPUT_DIR.
    stem = "kb_netherlands_public_libraries"
    kb_parser.export_csv(OUTPUT_DIR / f"{stem}.csv")
    kb_parser.export_json(OUTPUT_DIR / f"{stem}.json")

    logger.info("\n=== Parsing Complete ===")
    logger.info(f"Total institutions extracted: {len(records)}")
    logger.info(f"\nOutput directory: {OUTPUT_DIR}")
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()