Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import requests | |
| import pandas as pd | |
| import time | |
| import json | |
| import sqlite3 | |
| import os | |
| from datetime import datetime, timedelta | |
| import asyncio | |
| import aiohttp | |
| from typing import Dict, Any, List, Optional | |
| import xml.etree.ElementTree as ET | |
| import io | |
| import sys | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import hashlib | |
# --- Streamlit page configuration ---
# st.set_page_config must be the very first Streamlit command executed in
# the script; calling it after any other st.* call raises an exception.
# NOTE(review): "π" throughout this file looks like a mojibake'd emoji from
# a bad encoding round-trip — left byte-for-byte as found.
st.set_page_config(
    page_title="π Global API Data Harvester - 10 Data Sources",
    page_icon="π",
    layout="wide",
    initial_sidebar_state="expanded"
)
# Database setup
DB_PATH = "api_data.db"

def init_database(db_path: str = DB_PATH) -> None:
    """Initialize the SQLite database used to store harvested API data.

    Creates the three application tables (idempotent via IF NOT EXISTS):

    * ``api_endpoints`` - endpoints registered/discovered per API,
      unique per ``(api_name, endpoint_name)``.
    * ``api_data``      - raw fetch results plus a content hash.
    * ``fetch_jobs``    - bookkeeping for bulk-collection runs.

    Args:
        db_path: Path of the SQLite file. Defaults to the module-level
            ``DB_PATH`` so existing zero-argument callers are unaffected.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS api_endpoints (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                api_name TEXT NOT NULL,
                endpoint_name TEXT NOT NULL,
                url TEXT NOT NULL,
                method TEXT DEFAULT 'GET',
                headers TEXT,
                discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_used TIMESTAMP,
                status TEXT DEFAULT 'active',
                UNIQUE(api_name, endpoint_name)
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS api_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                api_name TEXT NOT NULL,
                endpoint_name TEXT NOT NULL,
                data_hash TEXT,
                raw_data TEXT,
                processed_data TEXT,
                metadata TEXT,
                fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                record_count INTEGER,
                status TEXT DEFAULT 'success'
            )
        ''')
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS fetch_jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_name TEXT NOT NULL,
                apis_included TEXT,
                started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP,
                status TEXT DEFAULT 'running',
                total_endpoints INTEGER,
                completed_endpoints INTEGER,
                errors TEXT
            )
        ''')
        conn.commit()
    finally:
        # Fix: the original never closed the connection if a statement
        # raised, leaking the handle.
        conn.close()
# Enhanced API Configuration with auto-discovery - ALL 10 APIS
#
# Per-API keys:
#   base_url            - scheme+host prepended to every path below
#   discovery_endpoints - paths probed by AutoAPIDiscoverer for docs/indexes
#   known_endpoints     - name -> path pairs registered unconditionally
#   auth                - None, or {"type": "Bearer", "token": ...}
#   rate_limit          - None, or {"requests": N, "per_seconds": S}
#
# NOTE(review): several names in this file appear mojibake'd (e.g.
# "VΓ€rldsbanken", presumably "Världsbanken"). The garbled key is referenced
# verbatim in BulkDataFetcher._process_api_response, so it is intentionally
# left byte-for-byte unchanged here.
API_DISCOVERY_CONFIG = {
    # Swedish National Agency for Education
    "Skolverket": {
        "base_url": "https://api.skolverket.se",
        "discovery_endpoints": [
            "/planned-educations/v3/",
            "/skolenhetsregister/v2/",
            "/syllabus/v1/",
            "/susa-navet/v1/"
        ],
        "known_endpoints": {
            "planned-educations": "/planned-educations/v3/compact-school-units",
            "skolenhetsregister": "/skolenhetsregister/v2/skolenhet",
            "syllabus": "/syllabus/v1/studievag",
            "susa-navet": "/susa-navet/v1/utbildningstillfalle"
        },
        "auth": None,
        "rate_limit": None
    },
    # Statistics Sweden (PX-Web style API); only API here with a rate limit
    "SCB": {
        "base_url": "https://api.scb.se",
        "discovery_endpoints": [
            "/OV0104/v1/doris/",
        ],
        "known_endpoints": {
            "befolkning": "/OV0104/v1/doris/sv/ssd/BE/BE0101/BE0101A/BefolkningNy",
            "arbetsmarknad": "/OV0104/v1/doris/sv/ssd/AM/AM0101/AM0101A/ArbStatusM"
        },
        "auth": None,
        # Consumed in BulkDataFetcher as sleep(per_seconds / requests)
        "rate_limit": {"requests": 10, "per_seconds": 10}
    },
    # Swedish municipal/regional KPI database
    "Kolada": {
        "base_url": "https://api.kolada.se",
        "discovery_endpoints": [
            "/v2/"
        ],
        "known_endpoints": {
            "kpi": "/v2/kpi",
            "data": "/v2/data/kpi",
            "municipality": "/v2/municipality",
            "ou": "/v2/ou"
        },
        "auth": None,
        "rate_limit": None
    },
    # EU statistics dissemination API
    "Eurostat": {
        "base_url": "https://ec.europa.eu/eurostat",
        "discovery_endpoints": [
            "/api/dissemination/statistics/1.0/",
        ],
        "known_endpoints": {
            "education": "/api/dissemination/statistics/1.0/data/educ_uoe_enra21",
            "population": "/api/dissemination/statistics/1.0/data/demo_pjan",
            "gdp": "/api/dissemination/statistics/1.0/data/nama_10_gdp"
        },
        "auth": None,
        "rate_limit": None
    },
    # WHO Global Health Observatory (OData-style API)
    "WHO": {
        "base_url": "https://ghoapi.azureedge.net",
        "discovery_endpoints": [
            "/api/"
        ],
        "known_endpoints": {
            "indicators": "/api/Indicator",
            "countries": "/api/DIMENSION/COUNTRY/DimensionValues",
            "life_expectancy": "/api/GHO/WHOSIS_000001"
        },
        "auth": None,
        "rate_limit": None
    },
    # OECD SDMX REST API
    "OECD": {
        "base_url": "https://sdmx.oecd.org",
        "discovery_endpoints": [
            "/public/rest/data/"
        ],
        "known_endpoints": {
            "gdp": "/public/rest/data/OECD.SDD.NAD,DSD_NAMAIN1@NAAG,1.0/.A.GDP_V_CAP.",
            "unemployment": "/public/rest/data/OECD.ELS.SAE,DSD_SOCX@SOCX,1.0/",
            "education": "/public/rest/data/OECD.EDU.IMEP,DSD_EAG@EAG,1.0/"
        },
        "auth": None,
        "rate_limit": None
    },
    # World Bank open data API (mojibake'd name — see NOTE above)
    "VΓ€rldsbanken": {
        "base_url": "https://api.worldbank.org",
        "discovery_endpoints": [
            "/v2/"
        ],
        "known_endpoints": {
            "countries": "/v2/country",
            "indicators": "/v2/indicator",
            "sources": "/v2/source",
            "gdp_data": "/v2/country/all/indicator/NY.GDP.MKTP.CD",
            "population_data": "/v2/country/all/indicator/SP.POP.TOTL"
        },
        "auth": None,
        "rate_limit": None
    },
    # Swedish central bank (SWEA exchange-rate API)
    "Riksbanken": {
        "base_url": "https://api.riksbank.se",
        "discovery_endpoints": [
            "/swea/v1/"
        ],
        "known_endpoints": {
            "crossrates": "/swea/v1/CrossRates",
            "observations": "/swea/v1/Observations",
            "series": "/swea/v1/Series",
            "eur_sek": "/swea/v1/Observations/SEKEURPMI",
            "usd_sek": "/swea/v1/Observations/SEKUSDPMI"
        },
        "auth": None,
        "rate_limit": None
    },
    # Swedish Research Council project database; only API here needing auth
    "Swecris": {
        "base_url": "https://swecris-api.vr.se",
        "discovery_endpoints": [
            "/v1/"
        ],
        "known_endpoints": {
            "projects": "/v1/projects",
            "organizations": "/v1/organizations",
            "persons": "/v1/persons",
            "publications": "/v1/publications"
        },
        # NOTE(review): bearer token committed in source — confirm this is
        # the intentionally public demo key before shipping.
        "auth": {"type": "Bearer", "token": "VRSwecrisAPI2025-1"},
        "rate_limit": None
    },
    # Swedish student-finance board; PX-Web API, *.px endpoints need POST
    # (special-cased in BulkDataFetcher._fetch_endpoint_data)
    "CSN": {
        "base_url": "https://statistik.csn.se",
        "discovery_endpoints": [
            "/PXWeb/api/v1/sv/CSNstat/"
        ],
        "known_endpoints": {
            "studiemedel_hogskola": "/PXWeb/api/v1/sv/CSNstat/Studiestod/Studiemedel/Hogskola/SS0101B1.px",
            "aterbetalning": "/PXWeb/api/v1/sv/CSNstat/Studiestod/Aterbetalning/SS0201A1.px",
            "studiestod_totalt": "/PXWeb/api/v1/sv/CSNstat/Studiestod/Totalt/"
        },
        "auth": None,
        "rate_limit": None
    }
}
class DatabaseManager:
    """Manage SQLite storage for endpoints, fetched data and job records.

    Fixes vs. the original:

    * ``__init__`` previously called the module-level ``init_database()``,
      which always created the schema in the global ``DB_PATH`` file and
      silently ignored a custom ``db_path`` argument. The schema is now
      created on this instance's own ``db_path``.
    * Every connection is closed in a ``finally`` block so handles cannot
      leak when a query raises.
    """

    def __init__(self, db_path: str = "api_data.db"):
        # Default literal mirrors the module-level DB_PATH constant, so
        # zero-argument construction behaves exactly as before.
        self.db_path = db_path
        self._init_schema()

    def _connect(self) -> sqlite3.Connection:
        """Open a fresh connection to this instance's database file."""
        return sqlite3.connect(self.db_path)

    def _init_schema(self) -> None:
        """Create the three application tables if missing (idempotent)."""
        conn = self._connect()
        try:
            cursor = conn.cursor()
            # Registered/discovered endpoints, unique per (api, endpoint).
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_endpoints (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_name TEXT NOT NULL,
                    endpoint_name TEXT NOT NULL,
                    url TEXT NOT NULL,
                    method TEXT DEFAULT 'GET',
                    headers TEXT,
                    discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    last_used TIMESTAMP,
                    status TEXT DEFAULT 'active',
                    UNIQUE(api_name, endpoint_name)
                )
            ''')
            # Raw fetch results plus a SHA-256 content hash for dedup.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS api_data (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_name TEXT NOT NULL,
                    endpoint_name TEXT NOT NULL,
                    data_hash TEXT,
                    raw_data TEXT,
                    processed_data TEXT,
                    metadata TEXT,
                    fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    record_count INTEGER,
                    status TEXT DEFAULT 'success'
                )
            ''')
            # Bookkeeping for bulk-collection runs.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS fetch_jobs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    job_name TEXT NOT NULL,
                    apis_included TEXT,
                    started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    completed_at TIMESTAMP,
                    status TEXT DEFAULT 'running',
                    total_endpoints INTEGER,
                    completed_endpoints INTEGER,
                    errors TEXT
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def save_endpoint(self, api_name: str, endpoint_name: str, url: str, method: str = 'GET', headers: Dict = None):
        """Insert or replace a discovered endpoint (headers stored as JSON)."""
        conn = self._connect()
        try:
            cursor = conn.cursor()
            headers_json = json.dumps(headers) if headers else None
            cursor.execute('''
                INSERT OR REPLACE INTO api_endpoints
                (api_name, endpoint_name, url, method, headers, discovered_at)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (api_name, endpoint_name, url, method, headers_json, datetime.now()))
            conn.commit()
        finally:
            conn.close()

    def save_data(self, api_name: str, endpoint_name: str, data: Any, metadata: Dict = None) -> str:
        """Store one fetch result; return the SHA-256 hash of the payload.

        The hash is computed over a canonical (sorted-keys) JSON dump so
        identical payloads hash identically regardless of key order.
        """
        conn = self._connect()
        try:
            cursor = conn.cursor()
            data_str = json.dumps(data, sort_keys=True, default=str)
            data_hash = hashlib.sha256(data_str.encode()).hexdigest()
            # Lists count one record per element; anything else counts as 1.
            record_count = len(data) if isinstance(data, list) else 1
            cursor.execute('''
                INSERT INTO api_data
                (api_name, endpoint_name, data_hash, raw_data, metadata, fetched_at, record_count)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (api_name, endpoint_name, data_hash, data_str,
                  json.dumps(metadata) if metadata else None, datetime.now(), record_count))
            conn.commit()
            return data_hash
        finally:
            conn.close()

    def get_endpoints(self, api_name: str = None) -> List[Dict]:
        """Return stored endpoints as dicts (all APIs, or one API's only)."""
        conn = self._connect()
        try:
            cursor = conn.cursor()
            if api_name:
                cursor.execute('SELECT * FROM api_endpoints WHERE api_name = ? ORDER BY discovered_at DESC', (api_name,))
            else:
                cursor.execute('SELECT * FROM api_endpoints ORDER BY api_name, discovered_at DESC')
            return [
                {
                    'id': row[0],
                    'api_name': row[1],
                    'endpoint_name': row[2],
                    'url': row[3],
                    'method': row[4],
                    'headers': json.loads(row[5]) if row[5] else {},
                    'discovered_at': row[6],
                    'last_used': row[7],
                    'status': row[8],
                }
                for row in cursor.fetchall()
            ]
        finally:
            conn.close()

    def get_data_summary(self) -> Dict:
        """Aggregate fetch count, record totals and last fetch time per API."""
        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT api_name, COUNT(*) as fetch_count, SUM(record_count) as total_records,
                       MAX(fetched_at) as last_fetch
                FROM api_data
                GROUP BY api_name
            ''')
            return {
                row[0]: {
                    'fetch_count': row[1],
                    'total_records': row[2],
                    'last_fetch': row[3],
                }
                for row in cursor.fetchall()
            }
        finally:
            conn.close()
class AutoAPIDiscoverer:
    """Automatically discover API endpoints.

    Known endpoints from ``API_DISCOVERY_CONFIG`` are registered first;
    afterwards each configured discovery path is fetched and scanned for
    endpoint URLs (OpenAPI/JSON first, HTML pattern matching as fallback).
    """

    def __init__(self, db_manager: 'DatabaseManager'):
        self.db_manager = db_manager
        # One shared session: connection pooling + identifying User-Agent.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Swedish-API-Harvester/1.0 (Educational Purpose)'
        })

    def discover_endpoints(self, api_name: str, progress_callback=None) -> List[Dict]:
        """Register known endpoints and probe discovery paths for new ones.

        Args:
            api_name: Key into ``API_DISCOVERY_CONFIG``.
            progress_callback: Optional ``callable(str)`` receiving status
                messages while discovery runs.

        Returns:
            List of endpoint dicts (``name``/``url``/``method``/``status``);
            empty list for unknown API names.
        """
        config = API_DISCOVERY_CONFIG.get(api_name)
        if not config:
            return []
        discovered = []
        # 1) Register the statically known endpoints.
        for endpoint_name, url in config["known_endpoints"].items():
            full_url = config["base_url"] + url
            self.db_manager.save_endpoint(api_name, endpoint_name, full_url)
            discovered.append({
                'name': endpoint_name,
                'url': full_url,
                'method': 'GET',
                'status': 'known'
            })
            if progress_callback:
                progress_callback(f"Registered known endpoint: {endpoint_name}")
        # 2) Probe each discovery path for additional endpoints.
        for discovery_path in config["discovery_endpoints"]:
            try:
                discovery_url = config["base_url"] + discovery_path
                response = self.session.get(discovery_url, timeout=10)
                if response.status_code == 200:
                    try:
                        data = response.json()
                    except ValueError:
                        # Fix: the original used a bare ``except`` that also
                        # hid errors raised by the JSON extractor itself.
                        # ValueError covers requests' JSONDecodeError; a
                        # non-JSON body falls back to HTML scraping.
                        self._extract_endpoints_from_html(api_name, response.text, config["base_url"], discovered)
                    else:
                        self._extract_endpoints_from_json(api_name, data, config["base_url"], discovered)
                if progress_callback:
                    progress_callback(f"Scanned discovery path: {discovery_path}")
            except Exception as e:
                # Best-effort: one failing path must not abort discovery.
                if progress_callback:
                    progress_callback(f"Error scanning {discovery_path}: {str(e)}")
        return discovered

    def _extract_endpoints_from_json(self, api_name: str, data: Any, base_url: str, discovered: List[Dict]):
        """Pull endpoint paths out of a JSON API-documentation response.

        Simplified: only the OpenAPI/Swagger ``paths`` layout is handled.
        """
        if isinstance(data, dict) and 'paths' in data:
            for path, methods in data['paths'].items():
                endpoint_name = path.strip('/').replace('/', '_')
                full_url = base_url + path
                self.db_manager.save_endpoint(api_name, endpoint_name, full_url)
                discovered.append({
                    'name': endpoint_name,
                    'url': full_url,
                    'method': 'GET',
                    'status': 'discovered'
                })

    def _extract_endpoints_from_html(self, api_name: str, html: str, base_url: str, discovered: List[Dict]):
        """Scrape endpoint-looking URLs out of an HTML documentation page."""
        import re  # local import kept: only needed on this fallback path
        # Common endpoint URL shapes: /api/..., /vN/..., absolute api URLs.
        patterns = [
            r'/api/[^"\s]+',
            r'/v\d+/[^"\s]+',
            r'https?://[^"\s]*api[^"\s]*'
        ]
        for pattern in patterns:
            for match in re.findall(pattern, html):
                endpoint_url = match if match.startswith('http') else base_url + match
                endpoint_name = match.strip('/').replace('/', '_').replace('.', '_')
                # Skip URLs already collected during this run.
                if not any(d['url'] == endpoint_url for d in discovered):
                    self.db_manager.save_endpoint(api_name, endpoint_name, endpoint_url)
                    discovered.append({
                        'name': endpoint_name,
                        'url': endpoint_url,
                        'method': 'GET',
                        'status': 'discovered'
                    })
class BulkDataFetcher:
    """Fetch data from multiple APIs in parallel and persist the results.

    Improvements vs. the original:

    * ``fetch_all_data`` accepts ``max_workers`` and ``_fetch_endpoint_data``
      accepts ``timeout`` (defaults 5 and 30 — the previously hard-coded
      values), so the UI sliders can be wired through without an interface
      break.
    * The repetitive "return the first matching key" unwrapping logic is
      factored into the ``_first_key`` helper.
    """

    def __init__(self, db_manager: 'DatabaseManager'):
        self.db_manager = db_manager
        # Shared session: connection pooling + identifying User-Agent.
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Swedish-API-Harvester/1.0 (Educational Purpose)'
        })

    def fetch_all_data(self, selected_apis: List[str], progress_callback=None, max_workers: int = 5) -> Dict:
        """Fetch every registered endpoint of the selected APIs in parallel.

        Args:
            selected_apis: API names whose stored endpoints should be fetched.
            progress_callback: Optional ``callable(str)`` invoked with status
                messages (once up front, then once per completed endpoint).
            max_workers: Thread-pool size; default matches the original
                hard-coded value of 5.

        Returns:
            Summary dict: totals, success/failure counts and error strings.
        """
        results = {
            'total_endpoints': 0,
            'successful_fetches': 0,
            'failed_fetches': 0,
            'total_records': 0,
            'errors': []
        }
        # Collect every stored endpoint for the chosen APIs.
        all_endpoints = []
        for api_name in selected_apis:
            all_endpoints.extend(self.db_manager.get_endpoints(api_name))
        results['total_endpoints'] = len(all_endpoints)
        if progress_callback:
            progress_callback(f"Starting bulk fetch from {len(all_endpoints)} endpoints across {len(selected_apis)} APIs")
        # Fan out over a thread pool; each future fetches one endpoint.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_endpoint = {
                executor.submit(self._fetch_endpoint_data, endpoint): endpoint
                for endpoint in all_endpoints
            }
            for future in as_completed(future_to_endpoint):
                endpoint = future_to_endpoint[future]
                try:
                    result = future.result()
                    if result['status'] == 'success':
                        results['successful_fetches'] += 1
                        results['total_records'] += result.get('record_count', 0)
                        # Persist successful payloads immediately.
                        self.db_manager.save_data(
                            endpoint['api_name'],
                            endpoint['endpoint_name'],
                            result['data'],
                            result.get('metadata', {})
                        )
                    else:
                        results['failed_fetches'] += 1
                        results['errors'].append(f"{endpoint['api_name']}/{endpoint['endpoint_name']}: {result.get('error', 'Unknown error')}")
                    if progress_callback:
                        progress_callback(f"Fetched {endpoint['api_name']}/{endpoint['endpoint_name']}: {result['status']}")
                except Exception as e:
                    # The future itself can raise (e.g. DB write failure above).
                    results['failed_fetches'] += 1
                    error_msg = f"{endpoint['api_name']}/{endpoint['endpoint_name']}: {str(e)}"
                    results['errors'].append(error_msg)
                    if progress_callback:
                        progress_callback(f"Error: {error_msg}")
        return results

    def _fetch_endpoint_data(self, endpoint: Dict, timeout: int = 30) -> Dict:
        """Fetch one endpoint; never raises — returns a status dict.

        Args:
            endpoint: Endpoint dict from ``DatabaseManager.get_endpoints``.
            timeout: Per-request timeout in seconds (default 30, the
                original hard-coded value).
        """
        try:
            api_name = endpoint['api_name']
            config = API_DISCOVERY_CONFIG.get(api_name, {})
            # Naive rate limiting: sleep the average per-request interval.
            rate_limit = config.get('rate_limit')
            if rate_limit:
                time.sleep(rate_limit['per_seconds'] / rate_limit['requests'])
            # Authentication plus API-specific Accept headers.
            headers = endpoint.get('headers', {})
            auth_config = config.get('auth')
            if auth_config and auth_config.get('type') == 'Bearer':
                headers['Authorization'] = f"Bearer {auth_config['token']}"
            if api_name == "Skolverket" and "planned-educations" in endpoint['url']:
                headers['accept'] = "application/vnd.skolverket.plannededucations.api.v3.hal+json"
            elif api_name in ("Eurostat", "WHO", "CSN"):
                headers['accept'] = "application/json"
            elif api_name == "OECD":
                headers['accept'] = "application/vnd.sdmx.data+json;version=1.0.0"
            # PX-Web endpoints (CSN *.px) require a POST with a query body.
            if api_name == "CSN" and endpoint['url'].endswith('.px'):
                query_data = {"query": [], "response": {"format": "json"}}
                response = self.session.post(endpoint['url'], json=query_data, headers=headers, timeout=timeout)
            else:
                response = self.session.get(endpoint['url'], headers=headers, timeout=timeout)
            response.raise_for_status()
            # Parse according to the declared content type.
            content_type = response.headers.get('Content-Type', '')
            if 'application/json' in content_type:
                data = response.json()
            elif 'application/xml' in content_type or 'text/xml' in content_type:
                data = self._xml_to_dict(ET.fromstring(response.text))
            else:
                data = response.text
            # Unwrap the payload from the API-specific envelope.
            processed_data = self._process_api_response(api_name, data)
            return {
                'status': 'success',
                'data': processed_data,
                'record_count': len(processed_data) if isinstance(processed_data, list) else 1,
                'metadata': {
                    'endpoint': endpoint['url'],
                    'response_size': len(response.text),
                    'content_type': content_type
                }
            }
        except Exception as e:
            # Boundary: report the failure instead of killing the worker.
            return {
                'status': 'error',
                'error': str(e)
            }

    @staticmethod
    def _first_key(data: Dict, keys) -> Any:
        """Return the value of the first key present in *data*, else *data*.

        Presence is tested with ``in`` (not truthiness) so falsy payloads
        such as ``[]`` or ``{}`` are still returned.
        """
        for key in keys:
            if key in data:
                return data[key]
        return data

    def _process_api_response(self, api_name: str, data: Any) -> Any:
        """Unwrap the useful payload from each API's envelope structure.

        Unknown APIs and unrecognized shapes are returned unchanged.
        """
        if api_name == "Skolverket":
            if isinstance(data, dict):
                # HAL format: return the first embedded list found.
                if '_embedded' in data:
                    for value in data['_embedded'].values():
                        if isinstance(value, list):
                            return value
                if 'Skolenheter' in data:
                    return data['Skolenheter']
            return data
        if api_name == "SCB":
            if isinstance(data, dict):
                return self._first_key(data, ('data', 'variables'))
            return data
        if api_name == "Kolada":
            if isinstance(data, dict) and 'values' in data:
                return data['values']
            return data
        if api_name == "Eurostat":
            # 'dimension'-only responses keep their full structure — the
            # fallback inside _first_key already returns data unchanged.
            if isinstance(data, dict):
                return self._first_key(data, ('value', 'data'))
            return data
        if api_name == "WHO":
            if isinstance(data, dict):
                return self._first_key(data, ('value', 'fact'))
            return data
        if api_name == "OECD":
            if isinstance(data, dict):
                if 'data' in data:
                    return data['data']
                if 'dataSets' in data:
                    datasets = data['dataSets']
                    if isinstance(datasets, list) and len(datasets) > 0:
                        return datasets[0].get('observations', {})
            return data
        if api_name == "VΓ€rldsbanken":
            # World Bank responses are [metadata, records]; prefer records.
            if isinstance(data, list) and len(data) > 1:
                return data[1] if data[1] else data[0]
            return data
        if api_name == "Riksbanken":
            if isinstance(data, dict):
                return self._first_key(data, ('observations', 'data'))
            return data
        if api_name == "Swecris":
            if isinstance(data, dict):
                return self._first_key(data, ('items', 'projects', 'organizations'))
            return data
        if api_name == "CSN":
            if isinstance(data, dict):
                return self._first_key(data, ('data', 'variables'))
            return data
        return data

    def _xml_to_dict(self, element) -> Any:
        """Recursively convert an XML element to nested dicts/strings.

        Attributes merge into the dict; repeated child tags become lists.
        A childless element with text collapses to its stripped text (note:
        in that case its attributes are dropped — original behavior kept).
        """
        result = {}
        if element.attrib:
            result.update(element.attrib)
        if element.text and element.text.strip():
            if len(element) == 0:
                return element.text.strip()
            result['text'] = element.text.strip()
        for child in element:
            child_data = self._xml_to_dict(child)
            if child.tag in result:
                # Repeated tag: promote the existing value to a list.
                if not isinstance(result[child.tag], list):
                    result[child.tag] = [result[child.tag]]
                result[child.tag].append(child_data)
            else:
                result[child.tag] = child_data
        return result
# --- Initialize shared components (once per browser session) ---
# Streamlit reruns the whole script on every interaction; st.session_state
# keeps these objects alive across reruns so the SQLite schema is not
# re-created and the HTTP sessions are reused.
if 'db_manager' not in st.session_state:
    st.session_state.db_manager = DatabaseManager()
if 'discoverer' not in st.session_state:
    st.session_state.discoverer = AutoAPIDiscoverer(st.session_state.db_manager)
if 'bulk_fetcher' not in st.session_state:
    st.session_state.bulk_fetcher = BulkDataFetcher(st.session_state.db_manager)

# --- Main App Layout ---
st.title("π Global API Data Harvester - 10 Data Sources")
st.markdown("**Automatisk datainsamling frΓ₯n svenska myndigheter, EU-organisationer och internationella institutioner**")
st.markdown("π **StΓΆdda API:er:** Skolverket, SCB, Kolada, Eurostat, WHO, OECD, VΓ€rldsbanken, Riksbanken, Swecris, CSN")

# --- Sidebar Navigation ---
# The selected label drives the if/elif page dispatch below, so these
# strings must match the branch conditions byte-for-byte.
st.sidebar.title("ποΈ Navigation")
page = st.sidebar.radio(
    "VΓ€lj sida",
    ["π Dashboard", "π Endpoint Discovery", "π Bulk Data Collection", "πΎ Database Manager", "βοΈ Individual APIs"]
)
if page == "π Dashboard":
    st.header("π API Data Dashboard")
    # Summary metrics
    col1, col2, col3, col4 = st.columns(4)
    data_summary = st.session_state.db_manager.get_data_summary()
    total_apis = len(data_summary)
    total_fetches = sum(api['fetch_count'] for api in data_summary.values())
    # Fix: SUM(record_count) can come back as NULL/None from SQLite, which
    # would make sum() raise TypeError — guard with `or 0`.
    total_records = sum((api['total_records'] or 0) for api in data_summary.values())
    with col1:
        st.metric("Active APIs", total_apis)
    with col2:
        st.metric("Total Fetches", total_fetches)
    with col3:
        st.metric("Total Records", f"{total_records:,}")
    with col4:
        st.metric("Database Size", f"{os.path.getsize(DB_PATH) / 1024 / 1024:.1f} MB" if os.path.exists(DB_PATH) else "0 MB")
    # API Status Overview
    st.subheader("π API Status Overview")
    if data_summary:
        df_summary = pd.DataFrame(data_summary).T
        df_summary.reset_index(inplace=True)
        df_summary.columns = ['API', 'Fetch Count', 'Total Records', 'Last Fetch']
        st.dataframe(df_summary, use_container_width=True)
    else:
        st.info("No data collected yet. Use the Bulk Data Collection to start harvesting!")
    # Quick Actions
    st.subheader("π Quick Actions")
    col1, col2, col3 = st.columns(3)
    with col1:
        if st.button("π Discover All Endpoints", type="primary"):
            # Fix: the original did ``st.switch_page = "endpoint_discovery"``,
            # which assigns a string over the Streamlit attribute instead of
            # navigating (a silent no-op that also monkey-patches the module).
            # Navigation via the sidebar radio is the supported path here.
            st.info("Open 'π Endpoint Discovery' in the sidebar to run discovery.")
    with col2:
        if st.button("π Fetch All Data", type="primary"):
            # Same fix as above for the former ``st.switch_page`` assignment.
            st.info("Open 'π Bulk Data Collection' in the sidebar to start fetching.")
    with col3:
        if st.button("πΎ Export Database", type="secondary"):
            # Dump every table to a single downloadable JSON document.
            conn = sqlite3.connect(DB_PATH)
            tables = ['api_endpoints', 'api_data', 'fetch_jobs']
            export_data = {}
            for table in tables:
                df = pd.read_sql_query(f"SELECT * FROM {table}", conn)
                export_data[table] = df
            conn.close()
            export_json = json.dumps({
                table: df.to_dict('records')
                for table, df in export_data.items()
            }, default=str, indent=2)
            st.download_button(
                "π Download Full Database (JSON)",
                data=export_json,
                file_name=f"api_harvester_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                mime="application/json"
            )
elif page == "π Endpoint Discovery":
    st.header("π Automatic Endpoint Discovery")
    st.markdown("""
Discover all available endpoints from Swedish APIs automatically.
This process will scan known API documentation and register new endpoints.
""")
    # Which APIs to scan; every configured API is preselected.
    selected_apis = st.multiselect(
        "Select APIs to discover endpoints for:",
        list(API_DISCOVERY_CONFIG.keys()),
        default=list(API_DISCOVERY_CONFIG.keys())
    )
    if st.button("π Start Discovery", type="primary"):
        if selected_apis:
            progress_bar = st.progress(0)
            status_text = st.empty()
            results_container = st.empty()
            all_discovered = {}
            for i, api_name in enumerate(selected_apis):
                status_text.text(f"Discovering endpoints for {api_name}...")
                # Closure over the loop's api_name: safe despite late binding
                # because discover_endpoints invokes it synchronously within
                # this same iteration.
                def progress_callback(message):
                    status_text.text(f"{api_name}: {message}")
                discovered = st.session_state.discoverer.discover_endpoints(api_name, progress_callback)
                all_discovered[api_name] = discovered
                progress_bar.progress((i + 1) / len(selected_apis))
            status_text.text("Discovery completed!")
            # One expandable result table per API.
            st.subheader("π Discovery Results")
            for api_name, endpoints in all_discovered.items():
                with st.expander(f"{api_name} - {len(endpoints)} endpoints"):
                    if endpoints:
                        df = pd.DataFrame(endpoints)
                        st.dataframe(df, use_container_width=True)
                    else:
                        st.info("No new endpoints discovered")
        else:
            st.warning("Please select at least one API to discover")
    # Everything currently stored in the endpoints table, regardless of
    # whether discovery ran during this rerun.
    st.subheader("π Current Endpoints Database")
    endpoints = st.session_state.db_manager.get_endpoints()
    if endpoints:
        df_endpoints = pd.DataFrame(endpoints)
        st.dataframe(df_endpoints, use_container_width=True)
    else:
        st.info("No endpoints registered yet. Run discovery first!")
| elif page == "π Bulk Data Collection": | |
| st.header("π Bulk Data Collection") | |
| st.markdown(""" | |
| Automatically fetch data from all discovered endpoints and store in database. | |
| This process runs in parallel for efficiency. | |
| """) | |
| # Configuration | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| selected_apis = st.multiselect( | |
| "Select APIs to fetch data from:", | |
| list(API_DISCOVERY_CONFIG.keys()), | |
| default=list(API_DISCOVERY_CONFIG.keys()) | |
| ) | |
| with col2: | |
| max_workers = st.slider("Parallel Workers", 1, 10, 5) | |
| timeout_seconds = st.slider("Request Timeout (seconds)", 10, 60, 30) | |
| # Start bulk collection | |
| if st.button("π Start Bulk Collection", type="primary"): | |
| if selected_apis: | |
| # Check if endpoints exist | |
| total_endpoints = 0 | |
| for api_name in selected_apis: | |
| endpoints = st.session_state.db_manager.get_endpoints(api_name) | |
| total_endpoints += len(endpoints) | |
| if total_endpoints == 0: | |
| st.warning("No endpoints found. Please run endpoint discovery first!") | |
| else: | |
| st.info(f"Starting bulk collection from {total_endpoints} endpoints...") | |
| # Progress tracking | |
| progress_bar = st.progress(0) | |
| status_text = st.empty() | |
| results_container = st.container() | |
| # Track progress | |
| progress_data = { | |
| 'completed': 0, | |
| 'total': total_endpoints, | |
| 'errors': [] | |
| } | |
| def progress_callback(message): | |
| progress_data['completed'] += 1 | |
| progress_bar.progress(progress_data['completed'] / progress_data['total']) | |
| status_text.text(f"Progress: {progress_data['completed']}/{progress_data['total']} - {message}") | |
| # Run bulk fetch | |
| results = st.session_state.bulk_fetcher.fetch_all_data(selected_apis, progress_callback) | |
| # Show results | |
| with results_container: | |
| st.success("β Bulk collection completed!") | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| st.metric("Successful Fetches", results['successful_fetches']) | |
| with col2: | |
| st.metric("Failed Fetches", results['failed_fetches']) | |
| with col3: | |
| st.metric("Total Records", f"{results['total_records']:,}") | |
| if results['errors']: | |
| st.subheader("β Errors") | |
| for error in results['errors'][:10]: # Show first 10 errors | |
| st.error(error) | |
| if len(results['errors']) > 10: | |
| st.info(f"... and {len(results['errors']) - 10} more errors") | |
| else: | |
| st.warning("Please select at least one API") | |
| elif page == "πΎ Database Manager": | |
| st.header("πΎ Database Management") | |
| # Database statistics | |
| st.subheader("π Database Statistics") | |
| conn = sqlite3.connect(DB_PATH) | |
| # Table sizes | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| endpoint_count = pd.read_sql_query("SELECT COUNT(*) as count FROM api_endpoints", conn).iloc[0]['count'] | |
| st.metric("Registered Endpoints", endpoint_count) | |
| with col2: | |
| data_count = pd.read_sql_query("SELECT COUNT(*) as count FROM api_data", conn).iloc[0]['count'] | |
| st.metric("Data Records", data_count) | |
| with col3: | |
| job_count = pd.read_sql_query("SELECT COUNT(*) as count FROM fetch_jobs", conn).iloc[0]['count'] | |
| st.metric("Fetch Jobs", job_count) | |
| conn.close() | |
| # Data exploration | |
| st.subheader("π Data Explorer") | |
| tab1, tab2, tab3 = st.tabs(["π‘ Endpoints", "π Data Records", "π Fetch Jobs"]) | |
| with tab1: | |
| conn = sqlite3.connect(DB_PATH) | |
| df_endpoints = pd.read_sql_query("SELECT * FROM api_endpoints ORDER BY discovered_at DESC", conn) | |
| conn.close() | |
| if not df_endpoints.empty: | |
| st.dataframe(df_endpoints, use_container_width=True) | |
| # Export endpoints | |
| csv = df_endpoints.to_csv(index=False) | |
| st.download_button( | |
| "π Export Endpoints (CSV)", | |
| data=csv, | |
| file_name=f"api_endpoints_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv", | |
| mime="text/csv" | |
| ) | |
| else: | |
| st.info("No endpoints registered yet") | |
| with tab2: | |
| conn = sqlite3.connect(DB_PATH) | |
| df_data = pd.read_sql_query(""" | |
| SELECT api_name, endpoint_name, record_count, fetched_at, status | |
| FROM api_data | |
| ORDER BY fetched_at DESC | |
| LIMIT 1000 | |
| """, conn) | |
| conn.close() | |
| if not df_data.empty: | |
| st.dataframe(df_data, use_container_width=True) | |
| # Data analysis | |
| st.subheader("π Data Analysis") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| # Records by API | |
| api_records = df_data.groupby('api_name')['record_count'].sum().reset_index() | |
| st.bar_chart(api_records.set_index('api_name')) | |
| with col2: | |
| # Fetches over time | |
| df_data['fetch_date'] = pd.to_datetime(df_data['fetched_at']).dt.date | |
| daily_fetches = df_data.groupby('fetch_date').size().reset_index(name='fetches') | |
| st.line_chart(daily_fetches.set_index('fetch_date')) | |
| else: | |
| st.info("No data records yet") | |
| with tab3: | |
| conn = sqlite3.connect(DB_PATH) | |
| df_jobs = pd.read_sql_query("SELECT * FROM fetch_jobs ORDER BY started_at DESC", conn) | |
| conn.close() | |
| if not df_jobs.empty: | |
| st.dataframe(df_jobs, use_container_width=True) | |
| else: | |
| st.info("No fetch jobs yet") | |
| # Database maintenance | |
| st.subheader("π§ Database Maintenance") | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| if st.button("ποΈ Clear All Data"): | |
| if st.checkbox("I understand this will delete all data"): | |
| conn = sqlite3.connect(DB_PATH) | |
| cursor = conn.cursor() | |
| cursor.execute("DELETE FROM api_data") | |
| cursor.execute("DELETE FROM fetch_jobs") | |
| conn.commit() | |
| conn.close() | |
| st.success("All data cleared!") | |
| st.rerun() | |
| with col2: | |
| if st.button("β»οΈ Optimize Database"): | |
| conn = sqlite3.connect(DB_PATH) | |
| cursor = conn.cursor() | |
| cursor.execute("VACUUM") | |
| conn.commit() | |
| conn.close() | |
| st.success("Database optimized!") | |
| with col3: | |
| if st.button("π Create Backup"): | |
| import shutil | |
| backup_path = f"api_data_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db" | |
| shutil.copy2(DB_PATH, backup_path) | |
| st.success(f"Backup created: {backup_path}") | |
elif page == "βοΈ Individual APIs":
    st.header("βοΈ Individual API Testing")
    st.markdown("Test individual APIs and endpoints for debugging and development.")
    # Simplified single-endpoint tester; the full per-API UI from the
    # original app is not included here.
    selected_api = st.selectbox("Select API", list(API_DISCOVERY_CONFIG.keys()))
    endpoints = st.session_state.db_manager.get_endpoints(selected_api)
    if endpoints:
        # The selectbox stores an index; format_func renders "name (url)".
        endpoint_names = [f"{ep['endpoint_name']} ({ep['url']})" for ep in endpoints]
        selected_endpoint_idx = st.selectbox("Select Endpoint", range(len(endpoints)), format_func=lambda x: endpoint_names[x])
        if st.button("π§ͺ Test Endpoint"):
            endpoint = endpoints[selected_endpoint_idx]
            with st.spinner("Testing endpoint..."):
                # NOTE(review): reaches into a private method of
                # BulkDataFetcher; consider exposing a public test hook.
                result = st.session_state.bulk_fetcher._fetch_endpoint_data(endpoint)
            if result['status'] == 'success':
                st.success("β Endpoint test successful!")
                # Show data
                data = result['data']
                if isinstance(data, list) and len(data) > 0:
                    df = pd.DataFrame(data[:100])  # preview first 100 records
                    st.dataframe(df, use_container_width=True)
                    st.info(f"Showing first 100 of {len(data)} records")
                else:
                    st.json(data)
            else:
                st.error(f"β Endpoint test failed: {result.get('error', 'Unknown error')}")
    else:
        st.info(f"No endpoints registered for {selected_api}. Run endpoint discovery first!")
# --- Footer ---
# Rendered on every page, after the page dispatch above.
st.markdown("---")
st.markdown("**π Global API Data Harvester** - Automated collection from 10 international data sources")
st.markdown("β **10 APIs** β’ **AUTO-DISCOVERY** β’ **BULK COLLECTION** β’ **DATABASE STORAGE** β’ **REAL-TIME MONITORING**")
st.markdown("πΈπͺ Svenska: Skolverket, SCB, Kolada, Riksbanken, Swecris, CSN β’ π Globala: Eurostat, WHO, OECD, VΓ€rldsbanken")