"""Global API Data Harvester.

Streamlit app that auto-discovers endpoints for ten public data APIs
(Swedish agencies, EU and international organisations), bulk-fetches
their data in parallel worker threads and stores everything in a local
SQLite database for later exploration and export.
"""

import hashlib
import json
import os
import re
import sqlite3
import time
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Any, Dict, List, Optional

import pandas as pd
import requests
import streamlit as st

# Configuration
st.set_page_config(
    page_title="🌍 Global API Data Harvester - 10 Data Sources",
    page_icon="🌍",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Database setup
DB_PATH = "api_data.db"


def init_database():
    """Initialize SQLite database for storing API data.

    Creates the three tables used by the app (endpoints registry, raw
    fetched data, and fetch-job bookkeeping) if they do not exist yet.
    Safe to call repeatedly.
    """
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()

    # Registry of discovered/known endpoints, unique per (api, endpoint).
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS api_endpoints (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            api_name TEXT NOT NULL,
            endpoint_name TEXT NOT NULL,
            url TEXT NOT NULL,
            method TEXT DEFAULT 'GET',
            headers TEXT,
            discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_used TIMESTAMP,
            status TEXT DEFAULT 'active',
            UNIQUE(api_name, endpoint_name)
        )
    ''')

    # Raw + processed payloads; data_hash enables deduplication checks.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS api_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            api_name TEXT NOT NULL,
            endpoint_name TEXT NOT NULL,
            data_hash TEXT,
            raw_data TEXT,
            processed_data TEXT,
            metadata TEXT,
            fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            record_count INTEGER,
            status TEXT DEFAULT 'success'
        )
    ''')

    # Bookkeeping for bulk fetch runs.
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS fetch_jobs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            job_name TEXT NOT NULL,
            apis_included TEXT,
            started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            completed_at TIMESTAMP,
            status TEXT DEFAULT 'running',
            total_endpoints INTEGER,
            completed_endpoints INTEGER,
            errors TEXT
        )
    ''')

    conn.commit()
    conn.close()


# Enhanced API Configuration with auto-discovery - ALL 10 APIS.
# Per API: base_url, paths to probe for documentation ("discovery_endpoints"),
# hand-curated working endpoints ("known_endpoints"), optional auth and a
# simple requests/per_seconds rate limit.
API_DISCOVERY_CONFIG = {
    "Skolverket": {
        "base_url": "https://api.skolverket.se",
        "discovery_endpoints": [
            "/planned-educations/v3/",
            "/skolenhetsregister/v2/",
            "/syllabus/v1/",
            "/susa-navet/v1/"
        ],
        "known_endpoints": {
            "planned-educations": "/planned-educations/v3/compact-school-units",
            "skolenhetsregister": "/skolenhetsregister/v2/skolenhet",
            "syllabus": "/syllabus/v1/studievag",
            "susa-navet": "/susa-navet/v1/utbildningstillfalle"
        },
        "auth": None,
        "rate_limit": None
    },
    "SCB": {
        "base_url": "https://api.scb.se",
        "discovery_endpoints": [
            "/OV0104/v1/doris/",
        ],
        "known_endpoints": {
            "befolkning": "/OV0104/v1/doris/sv/ssd/BE/BE0101/BE0101A/BefolkningNy",
            "arbetsmarknad": "/OV0104/v1/doris/sv/ssd/AM/AM0101/AM0101A/ArbStatusM"
        },
        "auth": None,
        "rate_limit": {"requests": 10, "per_seconds": 10}
    },
    "Kolada": {
        "base_url": "https://api.kolada.se",
        "discovery_endpoints": [
            "/v2/"
        ],
        "known_endpoints": {
            "kpi": "/v2/kpi",
            "data": "/v2/data/kpi",
            "municipality": "/v2/municipality",
            "ou": "/v2/ou"
        },
        "auth": None,
        "rate_limit": None
    },
    "Eurostat": {
        "base_url": "https://ec.europa.eu/eurostat",
        "discovery_endpoints": [
            "/api/dissemination/statistics/1.0/",
        ],
        "known_endpoints": {
            "education": "/api/dissemination/statistics/1.0/data/educ_uoe_enra21",
            "population": "/api/dissemination/statistics/1.0/data/demo_pjan",
            "gdp": "/api/dissemination/statistics/1.0/data/nama_10_gdp"
        },
        "auth": None,
        "rate_limit": None
    },
    "WHO": {
        "base_url": "https://ghoapi.azureedge.net",
        "discovery_endpoints": [
            "/api/"
        ],
        "known_endpoints": {
            "indicators": "/api/Indicator",
            "countries": "/api/DIMENSION/COUNTRY/DimensionValues",
            "life_expectancy": "/api/GHO/WHOSIS_000001"
        },
        "auth": None,
        "rate_limit": None
    },
    "OECD": {
        "base_url": "https://sdmx.oecd.org",
        "discovery_endpoints": [
            "/public/rest/data/"
        ],
        "known_endpoints": {
            "gdp": "/public/rest/data/OECD.SDD.NAD,DSD_NAMAIN1@NAAG,1.0/.A.GDP_V_CAP.",
            "unemployment": "/public/rest/data/OECD.ELS.SAE,DSD_SOCX@SOCX,1.0/",
            "education": "/public/rest/data/OECD.EDU.IMEP,DSD_EAG@EAG,1.0/"
        },
        "auth": None,
        "rate_limit": None
    },
    "Världsbanken": {
        "base_url": "https://api.worldbank.org",
        "discovery_endpoints": [
            "/v2/"
        ],
        "known_endpoints": {
            "countries": "/v2/country",
            "indicators": "/v2/indicator",
            "sources": "/v2/source",
            "gdp_data": "/v2/country/all/indicator/NY.GDP.MKTP.CD",
            "population_data": "/v2/country/all/indicator/SP.POP.TOTL"
        },
        "auth": None,
        "rate_limit": None
    },
    "Riksbanken": {
        "base_url": "https://api.riksbank.se",
        "discovery_endpoints": [
            "/swea/v1/"
        ],
        "known_endpoints": {
            "crossrates": "/swea/v1/CrossRates",
            "observations": "/swea/v1/Observations",
            "series": "/swea/v1/Series",
            "eur_sek": "/swea/v1/Observations/SEKEURPMI",
            "usd_sek": "/swea/v1/Observations/SEKUSDPMI"
        },
        "auth": None,
        "rate_limit": None
    },
    "Swecris": {
        "base_url": "https://swecris-api.vr.se",
        "discovery_endpoints": [
            "/v1/"
        ],
        "known_endpoints": {
            "projects": "/v1/projects",
            "organizations": "/v1/organizations",
            "persons": "/v1/persons",
            "publications": "/v1/publications"
        },
        "auth": {"type": "Bearer", "token": "VRSwecrisAPI2025-1"},
        "rate_limit": None
    },
    "CSN": {
        "base_url": "https://statistik.csn.se",
        "discovery_endpoints": [
            "/PXWeb/api/v1/sv/CSNstat/"
        ],
        "known_endpoints": {
            "studiemedel_hogskola": "/PXWeb/api/v1/sv/CSNstat/Studiestod/Studiemedel/Hogskola/SS0101B1.px",
            "aterbetalning": "/PXWeb/api/v1/sv/CSNstat/Studiestod/Aterbetalning/SS0201A1.px",
            "studiestod_totalt": "/PXWeb/api/v1/sv/CSNstat/Studiestod/Totalt/"
        },
        "auth": None,
        "rate_limit": None
    }
}


class DatabaseManager:
    """Manage SQLite database operations."""

    def __init__(self, db_path: str = DB_PATH):
        self.db_path = db_path
        init_database()

    def save_endpoint(self, api_name: str, endpoint_name: str, url: str,
                      method: str = 'GET', headers: Optional[Dict] = None):
        """Save discovered endpoint to database (upsert on api/endpoint name)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        headers_json = json.dumps(headers) if headers else None

        cursor.execute('''
            INSERT OR REPLACE INTO api_endpoints
            (api_name, endpoint_name, url, method, headers, discovered_at)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (api_name, endpoint_name, url, method, headers_json, datetime.now()))

        conn.commit()
        conn.close()

    def save_data(self, api_name: str, endpoint_name: str, data: Any,
                  metadata: Optional[Dict] = None) -> str:
        """Save fetched data to database.

        Returns the SHA-256 hash of the serialized payload (usable for
        deduplication by callers).
        """
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        # Create data hash for deduplication.  default=str makes datetimes
        # and other non-JSON types serializable.
        data_str = json.dumps(data, sort_keys=True, default=str)
        data_hash = hashlib.sha256(data_str.encode()).hexdigest()

        # Count records if data is a list; scalar payloads count as one.
        record_count = len(data) if isinstance(data, list) else 1

        cursor.execute('''
            INSERT INTO api_data
            (api_name, endpoint_name, data_hash, raw_data, metadata, fetched_at, record_count)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (api_name, endpoint_name, data_hash, data_str,
              json.dumps(metadata) if metadata else None,
              datetime.now(), record_count))

        conn.commit()
        conn.close()
        return data_hash

    def get_endpoints(self, api_name: Optional[str] = None) -> List[Dict]:
        """Get all discovered endpoints, optionally filtered by API name."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        if api_name:
            cursor.execute(
                'SELECT * FROM api_endpoints WHERE api_name = ? ORDER BY discovered_at DESC',
                (api_name,))
        else:
            cursor.execute(
                'SELECT * FROM api_endpoints ORDER BY api_name, discovered_at DESC')

        endpoints = []
        for row in cursor.fetchall():
            endpoints.append({
                'id': row[0],
                'api_name': row[1],
                'endpoint_name': row[2],
                'url': row[3],
                'method': row[4],
                'headers': json.loads(row[5]) if row[5] else {},
                'discovered_at': row[6],
                'last_used': row[7],
                'status': row[8]
            })

        conn.close()
        return endpoints

    def get_data_summary(self) -> Dict:
        """Get per-API summary (fetch count, total records, last fetch time)."""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()

        cursor.execute('''
            SELECT api_name,
                   COUNT(*) as fetch_count,
                   SUM(record_count) as total_records,
                   MAX(fetched_at) as last_fetch
            FROM api_data
            GROUP BY api_name
        ''')

        summary = {}
        for row in cursor.fetchall():
            summary[row[0]] = {
                'fetch_count': row[1],
                'total_records': row[2],
                'last_fetch': row[3]
            }

        conn.close()
        return summary


class AutoAPIDiscoverer:
    """Automatically discover API endpoints."""

    def __init__(self, db_manager: DatabaseManager):
        self.db_manager = db_manager
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Swedish-API-Harvester/1.0 (Educational Purpose)'
        })

    def discover_endpoints(self, api_name: str, progress_callback=None) -> List[Dict]:
        """Discover available endpoints for an API.

        Registers the hand-curated known endpoints first, then probes the
        configured discovery paths, parsing JSON (OpenAPI-style) or HTML
        responses for additional endpoint URLs.
        """
        config = API_DISCOVERY_CONFIG.get(api_name)
        if not config:
            return []

        discovered = []

        # First, register known endpoints.
        for endpoint_name, url in config["known_endpoints"].items():
            full_url = config["base_url"] + url
            self.db_manager.save_endpoint(api_name, endpoint_name, full_url)
            discovered.append({
                'name': endpoint_name,
                'url': full_url,
                'method': 'GET',
                'status': 'known'
            })
            if progress_callback:
                progress_callback(f"Registered known endpoint: {endpoint_name}")

        # Try to discover additional endpoints.
        for discovery_path in config["discovery_endpoints"]:
            try:
                discovery_url = config["base_url"] + discovery_path
                response = self.session.get(discovery_url, timeout=10)

                if response.status_code == 200:
                    # Try to parse as JSON for API documentation.
                    try:
                        data = response.json()
                        # Look for endpoint patterns in the response.
                        self._extract_endpoints_from_json(
                            api_name, data, config["base_url"], discovered)
                    except ValueError:
                        # Not JSON; try to parse HTML for documentation links.
                        self._extract_endpoints_from_html(
                            api_name, response.text, config["base_url"], discovered)

                if progress_callback:
                    progress_callback(f"Scanned discovery path: {discovery_path}")

            except Exception as e:
                if progress_callback:
                    progress_callback(f"Error scanning {discovery_path}: {str(e)}")

        return discovered

    def _extract_endpoints_from_json(self, api_name: str, data: Any,
                                     base_url: str, discovered: List[Dict]):
        """Extract endpoint information from a JSON response.

        Simplified implementation: only recognises the OpenAPI/Swagger
        'paths' layout; would need per-API customisation for other formats.
        """
        if isinstance(data, dict):
            if 'paths' in data:
                # OpenAPI/Swagger format.
                for path in data['paths']:
                    endpoint_name = path.strip('/').replace('/', '_')
                    full_url = base_url + path
                    self.db_manager.save_endpoint(api_name, endpoint_name, full_url)
                    discovered.append({
                        'name': endpoint_name,
                        'url': full_url,
                        'method': 'GET',
                        'status': 'discovered'
                    })

    def _extract_endpoints_from_html(self, api_name: str, html: str,
                                     base_url: str, discovered: List[Dict]):
        """Extract endpoint information from HTML documentation.

        Simple pattern matching for common API endpoint URL shapes.
        """
        patterns = [
            r'/api/[^"\s]+',
            r'/v\d+/[^"\s]+',
            r'https?://[^"\s]*api[^"\s]*'
        ]

        for pattern in patterns:
            for match in re.findall(pattern, html):
                if match.startswith('http'):
                    endpoint_url = match
                else:
                    endpoint_url = base_url + match

                endpoint_name = match.strip('/').replace('/', '_').replace('.', '_')

                # Avoid duplicates.
                if not any(d['url'] == endpoint_url for d in discovered):
                    self.db_manager.save_endpoint(api_name, endpoint_name, endpoint_url)
                    discovered.append({
                        'name': endpoint_name,
                        'url': endpoint_url,
                        'method': 'GET',
                        'status': 'discovered'
                    })


class BulkDataFetcher:
    """Fetch data from multiple APIs in parallel."""

    def __init__(self, db_manager: DatabaseManager):
        self.db_manager = db_manager
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Swedish-API-Harvester/1.0 (Educational Purpose)'
        })

    def fetch_all_data(self, selected_apis: List[str], progress_callback=None,
                       max_workers: int = 5, timeout: int = 30) -> Dict:
        """Fetch data from all registered endpoints of the selected APIs.

        Args:
            selected_apis: API names (keys of API_DISCOVERY_CONFIG).
            progress_callback: optional callable(str) invoked per endpoint.
            max_workers: size of the thread pool (default 5).
            timeout: per-request timeout in seconds (default 30).

        Returns a result dict with totals, counts and collected error strings.
        """
        results = {
            'total_endpoints': 0,
            'successful_fetches': 0,
            'failed_fetches': 0,
            'total_records': 0,
            'errors': []
        }

        # Gather all endpoints for the selected APIs.
        all_endpoints = []
        for api_name in selected_apis:
            all_endpoints.extend(self.db_manager.get_endpoints(api_name))

        results['total_endpoints'] = len(all_endpoints)

        if progress_callback:
            progress_callback(
                f"Starting bulk fetch from {len(all_endpoints)} endpoints "
                f"across {len(selected_apis)} APIs")

        # Fetch in parallel; each future maps back to its endpoint dict.
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_endpoint = {
                executor.submit(self._fetch_endpoint_data, endpoint, timeout): endpoint
                for endpoint in all_endpoints
            }

            for future in as_completed(future_to_endpoint):
                endpoint = future_to_endpoint[future]
                try:
                    result = future.result()

                    if result['status'] == 'success':
                        results['successful_fetches'] += 1
                        results['total_records'] += result.get('record_count', 0)
                        # Persist the payload.
                        self.db_manager.save_data(
                            endpoint['api_name'],
                            endpoint['endpoint_name'],
                            result['data'],
                            result.get('metadata', {})
                        )
                    else:
                        results['failed_fetches'] += 1
                        results['errors'].append(
                            f"{endpoint['api_name']}/{endpoint['endpoint_name']}: "
                            f"{result.get('error', 'Unknown error')}")

                    if progress_callback:
                        progress_callback(
                            f"Fetched {endpoint['api_name']}/"
                            f"{endpoint['endpoint_name']}: {result['status']}")

                except Exception as e:
                    results['failed_fetches'] += 1
                    error_msg = f"{endpoint['api_name']}/{endpoint['endpoint_name']}: {str(e)}"
                    results['errors'].append(error_msg)
                    if progress_callback:
                        progress_callback(f"Error: {error_msg}")

        return results

    def _fetch_endpoint_data(self, endpoint: Dict, timeout: int = 30) -> Dict:
        """Fetch data from a single endpoint.

        Applies per-API rate limiting, auth headers and content negotiation,
        then parses the response (JSON, XML or raw text) and runs the
        API-specific post-processing.
        """
        try:
            api_name = endpoint['api_name']
            config = API_DISCOVERY_CONFIG.get(api_name, {})

            # Apply rate limiting if specified (naive: sleep the average gap).
            rate_limit = config.get('rate_limit')
            if rate_limit:
                time.sleep(rate_limit['per_seconds'] / rate_limit['requests'])

            # Setup headers with authentication if needed.
            headers = endpoint.get('headers', {})
            auth_config = config.get('auth')
            if auth_config and auth_config.get('type') == 'Bearer':
                headers['Authorization'] = f"Bearer {auth_config['token']}"

            # API-specific content negotiation.
            if api_name == "Skolverket" and "planned-educations" in endpoint['url']:
                headers['accept'] = "application/vnd.skolverket.plannededucations.api.v3.hal+json"
            elif api_name == "Eurostat":
                headers['accept'] = "application/json"
            elif api_name == "WHO":
                headers['accept'] = "application/json"
            elif api_name == "OECD":
                headers['accept'] = "application/vnd.sdmx.data+json;version=1.0.0"
            elif api_name == "CSN":
                headers['accept'] = "application/json"

            # Handle special cases for API calls.
            if api_name == "CSN" and endpoint['url'].endswith('.px'):
                # PX-Web APIs need POST with query data.
                query_data = {"query": [], "response": {"format": "json"}}
                response = self.session.post(endpoint['url'], json=query_data,
                                             headers=headers, timeout=timeout)
            else:
                response = self.session.get(endpoint['url'], headers=headers,
                                            timeout=timeout)

            response.raise_for_status()

            # Parse response based on content type.
            content_type = response.headers.get('Content-Type', '')
            if 'application/json' in content_type:
                data = response.json()
            elif 'application/xml' in content_type or 'text/xml' in content_type:
                # Simple XML to dict conversion.
                root = ET.fromstring(response.text)
                data = self._xml_to_dict(root)
            else:
                data = response.text

            # Extract relevant data based on known API structure.
            processed_data = self._process_api_response(api_name, data)

            return {
                'status': 'success',
                'data': processed_data,
                'record_count': len(processed_data) if isinstance(processed_data, list) else 1,
                'metadata': {
                    'endpoint': endpoint['url'],
                    'response_size': len(response.text),
                    'content_type': content_type
                }
            }

        except Exception as e:
            return {
                'status': 'error',
                'error': str(e)
            }

    def _process_api_response(self, api_name: str, data: Any) -> Any:
        """Unwrap API responses based on each provider's known envelope.

        Falls back to returning the payload unchanged when no known
        structure matches.
        """
        if api_name == "Skolverket":
            if isinstance(data, dict):
                # Extract from HAL format.
                if '_embedded' in data:
                    for key, value in data['_embedded'].items():
                        if isinstance(value, list):
                            return value
                # Extract from direct response.
                if 'Skolenheter' in data:
                    return data['Skolenheter']
            return data

        elif api_name == "SCB":
            if isinstance(data, dict):
                if 'data' in data:
                    return data['data']
                elif 'variables' in data:
                    return data['variables']
            return data

        elif api_name == "Kolada":
            if isinstance(data, dict) and 'values' in data:
                return data['values']
            return data

        elif api_name == "Eurostat":
            if isinstance(data, dict):
                # Eurostat returns complex nested structures.
                if 'value' in data:
                    return data['value']
                elif 'data' in data:
                    return data['data']
                elif 'dimension' in data:
                    return data  # Keep full structure for complex data.
            return data

        elif api_name == "WHO":
            if isinstance(data, dict):
                # WHO GHO API structure.
                if 'value' in data:
                    return data['value']
                elif 'fact' in data:
                    return data['fact']
                elif 'IndicatorName' in data:
                    return data  # Keep indicator metadata.
            elif isinstance(data, list):
                return data
            return data

        elif api_name == "OECD":
            if isinstance(data, dict):
                # OECD SDMX structure.
                if 'data' in data:
                    return data['data']
                elif 'dataSets' in data:
                    datasets = data['dataSets']
                    if isinstance(datasets, list) and len(datasets) > 0:
                        return datasets[0].get('observations', {})
            return data

        elif api_name == "Världsbanken":
            # World Bank returns [metadata, records]; prefer the records.
            if isinstance(data, list) and len(data) > 1:
                return data[1] if data[1] else data[0]
            return data

        elif api_name == "Riksbanken":
            if isinstance(data, dict):
                # Riksbank API structure.
                if 'observations' in data:
                    return data['observations']
                elif 'data' in data:
                    return data['data']
            return data

        elif api_name == "Swecris":
            if isinstance(data, dict):
                # Swecris API structure.
                if 'items' in data:
                    return data['items']
                elif 'projects' in data:
                    return data['projects']
                elif 'organizations' in data:
                    return data['organizations']
            return data

        elif api_name == "CSN":
            if isinstance(data, dict):
                # PX-Web structure.
                if 'data' in data:
                    return data['data']
                elif 'variables' in data:
                    return data['variables']
            return data

        else:
            return data

    def _xml_to_dict(self, element) -> Any:
        """Convert an XML element tree to nested dicts.

        Leaf elements with only text collapse to the text string; repeated
        child tags collapse into lists.
        """
        result = {}

        if element.attrib:
            result.update(element.attrib)

        if element.text and element.text.strip():
            if len(element) == 0:
                return element.text.strip()
            result['text'] = element.text.strip()

        for child in element:
            child_data = self._xml_to_dict(child)
            if child.tag in result:
                if not isinstance(result[child.tag], list):
                    result[child.tag] = [result[child.tag]]
                result[child.tag].append(child_data)
            else:
                result[child.tag] = child_data

        return result


# Initialize components (kept in session state so they survive reruns).
if 'db_manager' not in st.session_state:
    st.session_state.db_manager = DatabaseManager()
if 'discoverer' not in st.session_state:
    st.session_state.discoverer = AutoAPIDiscoverer(st.session_state.db_manager)
if 'bulk_fetcher' not in st.session_state:
    st.session_state.bulk_fetcher = BulkDataFetcher(st.session_state.db_manager)

# Main App Layout
st.title("🌍 Global API Data Harvester - 10 Data Sources")
st.markdown("**Automatisk datainsamling från svenska myndigheter, EU-organisationer och internationella institutioner**")
st.markdown("📊 **Stödda API:er:** Skolverket, SCB, Kolada, Eurostat, WHO, OECD, Världsbanken, Riksbanken, Swecris, CSN")

# Sidebar Navigation
st.sidebar.title("🗂️ Navigation")
page = st.sidebar.radio(
    "Välj sida",
    ["🏠 Dashboard", "🔍 Endpoint Discovery", "📊 Bulk Data Collection",
     "💾 Database Manager", "⚙️ Individual APIs"]
)

if page == "🏠 Dashboard":
    st.header("📊 API Data Dashboard")

    # Summary metrics
    col1, col2, col3, col4 = st.columns(4)

    data_summary = st.session_state.db_manager.get_data_summary()
    total_apis = len(data_summary)
    total_fetches = sum(api['fetch_count'] for api in data_summary.values())
    # SUM(record_count) can be NULL in SQL -> None in Python; treat as 0.
    total_records = sum((api['total_records'] or 0) for api in data_summary.values())

    with col1:
        st.metric("Active APIs", total_apis)
    with col2:
        st.metric("Total Fetches", total_fetches)
    with col3:
        st.metric("Total Records", f"{total_records:,}")
    with col4:
        st.metric("Database Size",
                  f"{os.path.getsize(DB_PATH) / 1024 / 1024:.1f} MB"
                  if os.path.exists(DB_PATH) else "0 MB")

    # API Status Overview
    st.subheader("🔍 API Status Overview")
    if data_summary:
        df_summary = pd.DataFrame(data_summary).T
        df_summary.reset_index(inplace=True)
        df_summary.columns = ['API', 'Fetch Count', 'Total Records', 'Last Fetch']
        st.dataframe(df_summary, use_container_width=True)
    else:
        st.info("No data collected yet. Use the Bulk Data Collection to start harvesting!")

    # Quick Actions
    st.subheader("🚀 Quick Actions")
    col1, col2, col3 = st.columns(3)

    # NOTE: radio-based navigation cannot be switched programmatically here,
    # so the first two actions point the user to the right sidebar page.
    with col1:
        if st.button("🔍 Discover All Endpoints", type="primary"):
            st.info("Open '🔍 Endpoint Discovery' in the sidebar to run discovery.")
    with col2:
        if st.button("📊 Fetch All Data", type="primary"):
            st.info("Open '📊 Bulk Data Collection' in the sidebar to start fetching.")
    with col3:
        if st.button("💾 Export Database", type="secondary"):
            # Export all tables as one JSON document.
            conn = sqlite3.connect(DB_PATH)
            tables = ['api_endpoints', 'api_data', 'fetch_jobs']
            export_data = {}
            for table in tables:
                # Table names come from the fixed list above, not user input.
                df = pd.read_sql_query(f"SELECT * FROM {table}", conn)
                export_data[table] = df
            conn.close()

            export_json = json.dumps({
                table: df.to_dict('records') for table, df in export_data.items()
            }, default=str, indent=2)

            st.download_button(
                "📄 Download Full Database (JSON)",
                data=export_json,
                file_name=f"api_harvester_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                mime="application/json"
            )

elif page == "🔍 Endpoint Discovery":
    st.header("🔍 Automatic Endpoint Discovery")

    st.markdown("""
    Discover all available endpoints from Swedish APIs automatically.
    This process will scan known API documentation and register new endpoints.
    """)

    # Select APIs to discover
    selected_apis = st.multiselect(
        "Select APIs to discover endpoints for:",
        list(API_DISCOVERY_CONFIG.keys()),
        default=list(API_DISCOVERY_CONFIG.keys())
    )

    if st.button("🔍 Start Discovery", type="primary"):
        if selected_apis:
            progress_bar = st.progress(0)
            status_text = st.empty()
            results_container = st.empty()

            all_discovered = {}

            for i, api_name in enumerate(selected_apis):
                status_text.text(f"Discovering endpoints for {api_name}...")

                def progress_callback(message):
                    status_text.text(f"{api_name}: {message}")

                discovered = st.session_state.discoverer.discover_endpoints(
                    api_name, progress_callback)
                all_discovered[api_name] = discovered
                progress_bar.progress((i + 1) / len(selected_apis))

            status_text.text("Discovery completed!")

            # Show results
            st.subheader("📋 Discovery Results")
            for api_name, endpoints in all_discovered.items():
                with st.expander(f"{api_name} - {len(endpoints)} endpoints"):
                    if endpoints:
                        df = pd.DataFrame(endpoints)
                        st.dataframe(df, use_container_width=True)
                    else:
                        st.info("No new endpoints discovered")
        else:
            st.warning("Please select at least one API to discover")

    # Show current endpoints
    st.subheader("📊 Current Endpoints Database")
    endpoints = st.session_state.db_manager.get_endpoints()
    if endpoints:
        df_endpoints = pd.DataFrame(endpoints)
        st.dataframe(df_endpoints, use_container_width=True)
    else:
        st.info("No endpoints registered yet. Run discovery first!")

elif page == "📊 Bulk Data Collection":
    st.header("📊 Bulk Data Collection")

    st.markdown("""
    Automatically fetch data from all discovered endpoints and store in database.
    This process runs in parallel for efficiency.
    """)

    # Configuration
    col1, col2 = st.columns(2)

    with col1:
        selected_apis = st.multiselect(
            "Select APIs to fetch data from:",
            list(API_DISCOVERY_CONFIG.keys()),
            default=list(API_DISCOVERY_CONFIG.keys())
        )
    with col2:
        max_workers = st.slider("Parallel Workers", 1, 10, 5)
        timeout_seconds = st.slider("Request Timeout (seconds)", 10, 60, 30)

    # Start bulk collection
    if st.button("🚀 Start Bulk Collection", type="primary"):
        if selected_apis:
            # Check if endpoints exist
            total_endpoints = 0
            for api_name in selected_apis:
                total_endpoints += len(st.session_state.db_manager.get_endpoints(api_name))

            if total_endpoints == 0:
                st.warning("No endpoints found. Please run endpoint discovery first!")
            else:
                st.info(f"Starting bulk collection from {total_endpoints} endpoints...")

                # Progress tracking
                progress_bar = st.progress(0)
                status_text = st.empty()
                results_container = st.container()

                progress_data = {
                    'completed': 0,
                    'total': total_endpoints,
                    'errors': []
                }

                def progress_callback(message):
                    progress_data['completed'] += 1
                    # The fetcher also emits an initial "Starting..." message,
                    # so clamp to 1.0 to avoid st.progress raising on >1 values.
                    progress_bar.progress(
                        min(1.0, progress_data['completed'] / progress_data['total']))
                    status_text.text(
                        f"Progress: {progress_data['completed']}/{progress_data['total']} - {message}")

                # Run bulk fetch with the user-selected worker count/timeout.
                results = st.session_state.bulk_fetcher.fetch_all_data(
                    selected_apis, progress_callback,
                    max_workers=max_workers, timeout=timeout_seconds)

                # Show results
                with results_container:
                    st.success("✅ Bulk collection completed!")

                    col1, col2, col3 = st.columns(3)
                    with col1:
                        st.metric("Successful Fetches", results['successful_fetches'])
                    with col2:
                        st.metric("Failed Fetches", results['failed_fetches'])
                    with col3:
                        st.metric("Total Records", f"{results['total_records']:,}")

                    if results['errors']:
                        st.subheader("❌ Errors")
                        for error in results['errors'][:10]:  # Show first 10 errors
                            st.error(error)
                        if len(results['errors']) > 10:
                            st.info(f"... and {len(results['errors']) - 10} more errors")
        else:
            st.warning("Please select at least one API")

elif page == "💾 Database Manager":
    st.header("💾 Database Management")

    # Database statistics
    st.subheader("📊 Database Statistics")

    conn = sqlite3.connect(DB_PATH)

    col1, col2, col3 = st.columns(3)
    with col1:
        endpoint_count = pd.read_sql_query(
            "SELECT COUNT(*) as count FROM api_endpoints", conn).iloc[0]['count']
        st.metric("Registered Endpoints", endpoint_count)
    with col2:
        data_count = pd.read_sql_query(
            "SELECT COUNT(*) as count FROM api_data", conn).iloc[0]['count']
        st.metric("Data Records", data_count)
    with col3:
        job_count = pd.read_sql_query(
            "SELECT COUNT(*) as count FROM fetch_jobs", conn).iloc[0]['count']
        st.metric("Fetch Jobs", job_count)

    conn.close()

    # Data exploration
    st.subheader("🔍 Data Explorer")

    tab1, tab2, tab3 = st.tabs(["📡 Endpoints", "📊 Data Records", "🔄 Fetch Jobs"])

    with tab1:
        conn = sqlite3.connect(DB_PATH)
        df_endpoints = pd.read_sql_query(
            "SELECT * FROM api_endpoints ORDER BY discovered_at DESC", conn)
        conn.close()

        if not df_endpoints.empty:
            st.dataframe(df_endpoints, use_container_width=True)

            csv = df_endpoints.to_csv(index=False)
            st.download_button(
                "📄 Export Endpoints (CSV)",
                data=csv,
                file_name=f"api_endpoints_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                mime="text/csv"
            )
        else:
            st.info("No endpoints registered yet")

    with tab2:
        conn = sqlite3.connect(DB_PATH)
        df_data = pd.read_sql_query("""
            SELECT api_name, endpoint_name, record_count, fetched_at, status
            FROM api_data
            ORDER BY fetched_at DESC
            LIMIT 1000
        """, conn)
        conn.close()

        if not df_data.empty:
            st.dataframe(df_data, use_container_width=True)

            # Data analysis
            st.subheader("📈 Data Analysis")
            col1, col2 = st.columns(2)

            with col1:
                # Records by API
                api_records = df_data.groupby('api_name')['record_count'].sum().reset_index()
                st.bar_chart(api_records.set_index('api_name'))
            with col2:
                # Fetches over time
                df_data['fetch_date'] = pd.to_datetime(df_data['fetched_at']).dt.date
                daily_fetches = df_data.groupby('fetch_date').size().reset_index(name='fetches')
                st.line_chart(daily_fetches.set_index('fetch_date'))
        else:
            st.info("No data records yet")

    with tab3:
        conn = sqlite3.connect(DB_PATH)
        df_jobs = pd.read_sql_query(
            "SELECT * FROM fetch_jobs ORDER BY started_at DESC", conn)
        conn.close()

        if not df_jobs.empty:
            st.dataframe(df_jobs, use_container_width=True)
        else:
            st.info("No fetch jobs yet")

    # Database maintenance
    st.subheader("🔧 Database Maintenance")

    col1, col2, col3 = st.columns(3)

    with col1:
        # Checkbox must be rendered BEFORE the button: a checkbox inside the
        # button branch can never be ticked because the button state resets
        # on the rerun the checkbox click triggers.
        confirm_clear = st.checkbox("I understand this will delete all data")
        if st.button("🗑️ Clear All Data", disabled=not confirm_clear):
            conn = sqlite3.connect(DB_PATH)
            cursor = conn.cursor()
            cursor.execute("DELETE FROM api_data")
            cursor.execute("DELETE FROM fetch_jobs")
            conn.commit()
            conn.close()
            st.success("All data cleared!")
            st.rerun()
    with col2:
        if st.button("♻️ Optimize Database"):
            conn = sqlite3.connect(DB_PATH)
            cursor = conn.cursor()
            cursor.execute("VACUUM")
            conn.commit()
            conn.close()
            st.success("Database optimized!")
    with col3:
        if st.button("📋 Create Backup"):
            import shutil
            backup_path = f"api_data_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
            shutil.copy2(DB_PATH, backup_path)
            st.success(f"Backup created: {backup_path}")

elif page == "⚙️ Individual APIs":
    st.header("⚙️ Individual API Testing")

    st.markdown("Test individual APIs and endpoints for debugging and development.")

    # This would include the original individual API functionality.
    # For brevity, showing a simplified version.
    selected_api = st.selectbox("Select API", list(API_DISCOVERY_CONFIG.keys()))

    endpoints = st.session_state.db_manager.get_endpoints(selected_api)
    if endpoints:
        endpoint_names = [f"{ep['endpoint_name']} ({ep['url']})" for ep in endpoints]
        selected_endpoint_idx = st.selectbox(
            "Select Endpoint", range(len(endpoints)),
            format_func=lambda x: endpoint_names[x])

        if st.button("🧪 Test Endpoint"):
            endpoint = endpoints[selected_endpoint_idx]
            with st.spinner("Testing endpoint..."):
                result = st.session_state.bulk_fetcher._fetch_endpoint_data(endpoint)

                if result['status'] == 'success':
                    st.success("✅ Endpoint test successful!")

                    data = result['data']
                    if isinstance(data, list) and len(data) > 0:
                        df = pd.DataFrame(data[:100])  # Show first 100 records
                        st.dataframe(df, use_container_width=True)
                        st.info(f"Showing first 100 of {len(data)} records")
                    else:
                        st.json(data)
                else:
                    st.error(f"❌ Endpoint test failed: {result.get('error', 'Unknown error')}")
    else:
        st.info(f"No endpoints registered for {selected_api}. Run endpoint discovery first!")

# Footer
st.markdown("---")
st.markdown("**🌍 Global API Data Harvester** - Automated collection from 10 international data sources")
st.markdown("✅ **10 APIs** • **AUTO-DISCOVERY** • **BULK COLLECTION** • **DATABASE STORAGE** • **REAL-TIME MONITORING**")
st.markdown("🇸🇪 Svenska: Skolverket, SCB, Kolada, Riksbanken, Swecris, CSN • 🌍 Globala: Eurostat, WHO, OECD, Världsbanken")