# api-data-fetcher / app_enhanced.py
# NOTE(review): the original file began with Hugging Face Hub page chrome
# ("isakskogstad's picture", upload note, commit hash "94820b8 verified")
# scraped into the source; it was not valid Python and is commented out here.
import streamlit as st
import requests
import pandas as pd
import time
import json
import sqlite3
import os
from datetime import datetime, timedelta
import asyncio
import aiohttp
from typing import Dict, Any, List, Optional
import xml.etree.ElementTree as ET
import io
import sys
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import hashlib
# --- Streamlit page configuration ---------------------------------------
_PAGE_OPTIONS = dict(
    page_title="🌍 Global API Data Harvester - 10 Data Sources",
    page_icon="🌍",
    layout="wide",
    initial_sidebar_state="expanded",
)
st.set_page_config(**_PAGE_OPTIONS)

# Location of the SQLite file shared by every component below.
DB_PATH = "api_data.db"
def init_database(db_path: Optional[str] = None):
    """Create the SQLite schema (three tables) if it does not already exist.

    Args:
        db_path: Path of the SQLite file to initialize. Defaults to the
            module-level ``DB_PATH``, so existing no-argument callers are
            unaffected; passing an explicit path allows initializing other
            database files (e.g. for tests).
    """
    conn = sqlite3.connect(db_path if db_path is not None else DB_PATH)
    try:
        cursor = conn.cursor()
        # Registry of discovered endpoints. (api_name, endpoint_name) is
        # UNIQUE so re-discovery upserts instead of duplicating rows.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS api_endpoints (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                api_name TEXT NOT NULL,
                endpoint_name TEXT NOT NULL,
                url TEXT NOT NULL,
                method TEXT DEFAULT 'GET',
                headers TEXT,
                discovered_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_used TIMESTAMP,
                status TEXT DEFAULT 'active',
                UNIQUE(api_name, endpoint_name)
            )
        ''')
        # Raw + processed payloads per fetch; data_hash supports dedup.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS api_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                api_name TEXT NOT NULL,
                endpoint_name TEXT NOT NULL,
                data_hash TEXT,
                raw_data TEXT,
                processed_data TEXT,
                metadata TEXT,
                fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                record_count INTEGER,
                status TEXT DEFAULT 'success'
            )
        ''')
        # Bookkeeping for bulk-collection runs.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS fetch_jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                job_name TEXT NOT NULL,
                apis_included TEXT,
                started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP,
                status TEXT DEFAULT 'running',
                total_endpoints INTEGER,
                completed_endpoints INTEGER,
                errors TEXT
            )
        ''')
        conn.commit()
    finally:
        # Close even if a CREATE statement fails.
        conn.close()
# Enhanced API Configuration with auto-discovery - ALL 10 APIS
#
# One entry per data source. Fields:
#   base_url            - scheme+host prepended to every endpoint path
#   discovery_endpoints - paths probed by AutoAPIDiscoverer for documentation
#   known_endpoints     - hand-curated name -> path map, always registered
#   auth                - None, or {'type': 'Bearer', 'token': ...} used by
#                         BulkDataFetcher to set an Authorization header
#   rate_limit          - None, or {'requests': N, 'per_seconds': S}; the
#                         fetcher sleeps S/N seconds before each request
API_DISCOVERY_CONFIG = {
    # Swedish National Agency for Education
    "Skolverket": {
        "base_url": "https://api.skolverket.se",
        "discovery_endpoints": [
            "/planned-educations/v3/",
            "/skolenhetsregister/v2/",
            "/syllabus/v1/",
            "/susa-navet/v1/"
        ],
        "known_endpoints": {
            "planned-educations": "/planned-educations/v3/compact-school-units",
            "skolenhetsregister": "/skolenhetsregister/v2/skolenhet",
            "syllabus": "/syllabus/v1/studievag",
            "susa-navet": "/susa-navet/v1/utbildningstillfalle"
        },
        "auth": None,
        "rate_limit": None
    },
    # Statistics Sweden (PX-Web style API)
    "SCB": {
        "base_url": "https://api.scb.se",
        "discovery_endpoints": [
            "/OV0104/v1/doris/",
        ],
        "known_endpoints": {
            "befolkning": "/OV0104/v1/doris/sv/ssd/BE/BE0101/BE0101A/BefolkningNy",
            "arbetsmarknad": "/OV0104/v1/doris/sv/ssd/AM/AM0101/AM0101A/ArbStatusM"
        },
        "auth": None,
        # SCB documents a request quota; throttled client-side by the fetcher.
        "rate_limit": {"requests": 10, "per_seconds": 10}
    },
    # Swedish municipal/regional key figures
    "Kolada": {
        "base_url": "https://api.kolada.se",
        "discovery_endpoints": [
            "/v2/"
        ],
        "known_endpoints": {
            "kpi": "/v2/kpi",
            "data": "/v2/data/kpi",
            "municipality": "/v2/municipality",
            "ou": "/v2/ou"
        },
        "auth": None,
        "rate_limit": None
    },
    # EU statistics office
    "Eurostat": {
        "base_url": "https://ec.europa.eu/eurostat",
        "discovery_endpoints": [
            "/api/dissemination/statistics/1.0/",
        ],
        "known_endpoints": {
            "education": "/api/dissemination/statistics/1.0/data/educ_uoe_enra21",
            "population": "/api/dissemination/statistics/1.0/data/demo_pjan",
            "gdp": "/api/dissemination/statistics/1.0/data/nama_10_gdp"
        },
        "auth": None,
        "rate_limit": None
    },
    # WHO Global Health Observatory (OData)
    "WHO": {
        "base_url": "https://ghoapi.azureedge.net",
        "discovery_endpoints": [
            "/api/"
        ],
        "known_endpoints": {
            "indicators": "/api/Indicator",
            "countries": "/api/DIMENSION/COUNTRY/DimensionValues",
            "life_expectancy": "/api/GHO/WHOSIS_000001"
        },
        "auth": None,
        "rate_limit": None
    },
    # OECD SDMX REST API
    "OECD": {
        "base_url": "https://sdmx.oecd.org",
        "discovery_endpoints": [
            "/public/rest/data/"
        ],
        "known_endpoints": {
            "gdp": "/public/rest/data/OECD.SDD.NAD,DSD_NAMAIN1@NAAG,1.0/.A.GDP_V_CAP.",
            "unemployment": "/public/rest/data/OECD.ELS.SAE,DSD_SOCX@SOCX,1.0/",
            "education": "/public/rest/data/OECD.EDU.IMEP,DSD_EAG@EAG,1.0/"
        },
        "auth": None,
        "rate_limit": None
    },
    # World Bank. NOTE(review): this key is mojibake for "Världsbanken"; it is
    # matched verbatim in BulkDataFetcher._process_api_response, so any
    # re-encoding must change both places together.
    "VΓ€rldsbanken": {
        "base_url": "https://api.worldbank.org",
        "discovery_endpoints": [
            "/v2/"
        ],
        "known_endpoints": {
            "countries": "/v2/country",
            "indicators": "/v2/indicator",
            "sources": "/v2/source",
            "gdp_data": "/v2/country/all/indicator/NY.GDP.MKTP.CD",
            "population_data": "/v2/country/all/indicator/SP.POP.TOTL"
        },
        "auth": None,
        "rate_limit": None
    },
    # Sveriges Riksbank (SWEA exchange-rate API)
    "Riksbanken": {
        "base_url": "https://api.riksbank.se",
        "discovery_endpoints": [
            "/swea/v1/"
        ],
        "known_endpoints": {
            "crossrates": "/swea/v1/CrossRates",
            "observations": "/swea/v1/Observations",
            "series": "/swea/v1/Series",
            "eur_sek": "/swea/v1/Observations/SEKEURPMI",
            "usd_sek": "/swea/v1/Observations/SEKUSDPMI"
        },
        "auth": None,
        "rate_limit": None
    },
    # Swedish research-funding database
    "Swecris": {
        "base_url": "https://swecris-api.vr.se",
        "discovery_endpoints": [
            "/v1/"
        ],
        "known_endpoints": {
            "projects": "/v1/projects",
            "organizations": "/v1/organizations",
            "persons": "/v1/persons",
            "publications": "/v1/publications"
        },
        # NOTE(review): hard-coded Bearer token committed in source — confirm
        # this is the publicly documented demo token and not a secret.
        "auth": {"type": "Bearer", "token": "VRSwecrisAPI2025-1"},
        "rate_limit": None
    },
    # Swedish student-finance board (PX-Web; .px endpoints need a POST query)
    "CSN": {
        "base_url": "https://statistik.csn.se",
        "discovery_endpoints": [
            "/PXWeb/api/v1/sv/CSNstat/"
        ],
        "known_endpoints": {
            "studiemedel_hogskola": "/PXWeb/api/v1/sv/CSNstat/Studiestod/Studiemedel/Hogskola/SS0101B1.px",
            "aterbetalning": "/PXWeb/api/v1/sv/CSNstat/Studiestod/Aterbetalning/SS0201A1.px",
            "studiestod_totalt": "/PXWeb/api/v1/sv/CSNstat/Studiestod/Totalt/"
        },
        "auth": None,
        "rate_limit": None
    }
}
class DatabaseManager:
    """Thin wrapper around the SQLite store used by discovery and fetching."""

    def __init__(self, db_path: Optional[str] = None):
        """Bind to a database file and ensure the schema exists.

        Args:
            db_path: SQLite file path; falls back to the module-level
                ``DB_PATH`` when omitted (backward compatible with the
                previous ``db_path=DB_PATH`` default).
        """
        self.db_path = db_path if db_path is not None else DB_PATH
        # NOTE(review): init_database() always initializes the default
        # DB_PATH file, not self.db_path — confirm whether a custom path
        # should be initialized here as well.
        init_database()

    def save_endpoint(self, api_name: str, endpoint_name: str, url: str,
                      method: str = 'GET', headers: Optional[Dict] = None):
        """Insert or update a discovered endpoint.

        The api_endpoints table has UNIQUE(api_name, endpoint_name), so
        INSERT OR REPLACE upserts and refreshes discovered_at.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            headers_json = json.dumps(headers) if headers else None
            conn.execute('''
                INSERT OR REPLACE INTO api_endpoints
                (api_name, endpoint_name, url, method, headers, discovered_at)
                VALUES (?, ?, ?, ?, ?, ?)
            ''', (api_name, endpoint_name, url, method, headers_json, datetime.now()))
            conn.commit()
        finally:
            conn.close()

    def save_data(self, api_name: str, endpoint_name: str, data: Any,
                  metadata: Optional[Dict] = None) -> str:
        """Persist one fetch result; return the SHA-256 hash of the payload.

        The payload is serialized as canonical JSON (sorted keys) so that
        identical payloads always produce the same deduplication hash.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            data_str = json.dumps(data, sort_keys=True, default=str)
            data_hash = hashlib.sha256(data_str.encode()).hexdigest()
            # Lists are record collections; anything else counts as one record.
            record_count = len(data) if isinstance(data, list) else 1
            conn.execute('''
                INSERT INTO api_data
                (api_name, endpoint_name, data_hash, raw_data, metadata, fetched_at, record_count)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (api_name, endpoint_name, data_hash, data_str,
                  json.dumps(metadata) if metadata else None, datetime.now(), record_count))
            conn.commit()
            return data_hash
        finally:
            conn.close()

    def get_endpoints(self, api_name: Optional[str] = None) -> List[Dict]:
        """Return stored endpoints as dicts, newest first.

        Args:
            api_name: When given, restrict the result to that API.
        """
        conn = sqlite3.connect(self.db_path)
        # Name-based column access instead of fragile positional indices.
        conn.row_factory = sqlite3.Row
        try:
            if api_name:
                rows = conn.execute(
                    'SELECT * FROM api_endpoints WHERE api_name = ? ORDER BY discovered_at DESC',
                    (api_name,)
                ).fetchall()
            else:
                rows = conn.execute(
                    'SELECT * FROM api_endpoints ORDER BY api_name, discovered_at DESC'
                ).fetchall()
            endpoints = []
            for row in rows:
                record = dict(row)
                # headers is stored as JSON text; decode to a dict for callers.
                record['headers'] = json.loads(record['headers']) if record['headers'] else {}
                endpoints.append(record)
            return endpoints
        finally:
            conn.close()

    def get_data_summary(self) -> Dict:
        """Return per-API fetch counts, total records and last fetch time."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.execute('''
                SELECT api_name, COUNT(*) as fetch_count, SUM(record_count) as total_records,
                       MAX(fetched_at) as last_fetch
                FROM api_data
                GROUP BY api_name
            ''')
            summary = {}
            for row in cursor.fetchall():
                summary[row[0]] = {
                    'fetch_count': row[1],
                    'total_records': row[2],
                    'last_fetch': row[3]
                }
            return summary
        finally:
            conn.close()
class AutoAPIDiscoverer:
    """Discover and register API endpoints (known lists + documentation scans)."""

    # Forward-ref annotation so the class is importable before/without
    # DatabaseManager being defined in the same namespace.
    def __init__(self, db_manager: "DatabaseManager"):
        self.db_manager = db_manager
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Swedish-API-Harvester/1.0 (Educational Purpose)'
        })

    def discover_endpoints(self, api_name: str, progress_callback=None) -> List[Dict]:
        """Register known endpoints, then probe discovery paths for new ones.

        Returns a list of {'name', 'url', 'method', 'status'} dicts. Every
        endpoint found is also persisted through the DatabaseManager.
        """
        config = API_DISCOVERY_CONFIG.get(api_name)
        if not config:
            return []
        discovered = []
        # Known endpoints are registered unconditionally (no network needed).
        for endpoint_name, url in config["known_endpoints"].items():
            full_url = config["base_url"] + url
            self.db_manager.save_endpoint(api_name, endpoint_name, full_url)
            discovered.append({
                'name': endpoint_name,
                'url': full_url,
                'method': 'GET',
                'status': 'known'
            })
            if progress_callback:
                progress_callback(f"Registered known endpoint: {endpoint_name}")
        # Probe documentation/discovery URLs for additional endpoints.
        for discovery_path in config["discovery_endpoints"]:
            try:
                discovery_url = config["base_url"] + discovery_path
                response = self.session.get(discovery_url, timeout=10)
                if response.status_code == 200:
                    try:
                        data = response.json()
                        self._extract_endpoints_from_json(api_name, data, config["base_url"], discovered)
                    except ValueError:
                        # BUGFIX: was a bare `except:`. requests raises a
                        # ValueError subclass when the body is not JSON; only
                        # that case should fall back to HTML scraping.
                        self._extract_endpoints_from_html(api_name, response.text, config["base_url"], discovered)
                if progress_callback:
                    progress_callback(f"Scanned discovery path: {discovery_path}")
            except Exception as e:
                # A failure on one path must not abort the remaining paths.
                if progress_callback:
                    progress_callback(f"Error scanning {discovery_path}: {str(e)}")
        return discovered

    def _extract_endpoints_from_json(self, api_name: str, data: Any, base_url: str, discovered: List[Dict]):
        """Extract endpoints from a JSON (OpenAPI/Swagger-style) document.

        Simplified: only the top-level 'paths' key is inspected.
        """
        if isinstance(data, dict) and 'paths' in data:
            for path, methods in data['paths'].items():
                endpoint_name = path.strip('/').replace('/', '_')
                full_url = base_url + path
                # Skip URLs already recorded (consistent with the HTML scraper).
                if any(d['url'] == full_url for d in discovered):
                    continue
                self.db_manager.save_endpoint(api_name, endpoint_name, full_url)
                discovered.append({
                    'name': endpoint_name,
                    'url': full_url,
                    'method': 'GET',
                    'status': 'discovered'
                })

    def _extract_endpoints_from_html(self, api_name: str, html: str, base_url: str, discovered: List[Dict]):
        """Extract endpoint-looking URLs from HTML documentation via regex."""
        import re
        # Common patterns for REST endpoints in documentation pages.
        patterns = [
            r'/api/[^"\s]+',
            r'/v\d+/[^"\s]+',
            r'https?://[^"\s]*api[^"\s]*'
        ]
        for pattern in patterns:
            for match in re.findall(pattern, html):
                endpoint_url = match if match.startswith('http') else base_url + match
                endpoint_name = match.strip('/').replace('/', '_').replace('.', '_')
                # Avoid duplicates across patterns and prior matches.
                if any(d['url'] == endpoint_url for d in discovered):
                    continue
                self.db_manager.save_endpoint(api_name, endpoint_name, endpoint_url)
                discovered.append({
                    'name': endpoint_name,
                    'url': endpoint_url,
                    'method': 'GET',
                    'status': 'discovered'
                })
class BulkDataFetcher:
    """Fetch data from multiple APIs in parallel"""

    def __init__(self, db_manager: DatabaseManager):
        # One DB handle for persistence; one pooled requests.Session shared by
        # all worker threads.
        self.db_manager = db_manager
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Swedish-API-Harvester/1.0 (Educational Purpose)'
        })

    def fetch_all_data(self, selected_apis: List[str], progress_callback=None) -> Dict:
        """Fetch data from all endpoints of selected APIs.

        Fans out over every endpoint registered for the selected APIs using a
        fixed 5-worker thread pool, persists successful payloads through the
        DatabaseManager, and returns aggregate counters plus error strings.

        NOTE: progress_callback is invoked once for the initial "Starting..."
        message and then once per finished endpoint — i.e. a total of
        len(endpoints) + 1 calls. Callers tracking progress must account for
        the extra call.
        """
        results = {
            'total_endpoints': 0,
            'successful_fetches': 0,
            'failed_fetches': 0,
            'total_records': 0,
            'errors': []
        }
        # Get all endpoints for selected APIs
        all_endpoints = []
        for api_name in selected_apis:
            endpoints = self.db_manager.get_endpoints(api_name)
            all_endpoints.extend(endpoints)
        results['total_endpoints'] = len(all_endpoints)
        if progress_callback:
            progress_callback(f"Starting bulk fetch from {len(all_endpoints)} endpoints across {len(selected_apis)} APIs")
        # Use ThreadPoolExecutor for parallel fetching
        with ThreadPoolExecutor(max_workers=5) as executor:
            future_to_endpoint = {
                executor.submit(self._fetch_endpoint_data, endpoint): endpoint
                for endpoint in all_endpoints
            }
            for future in as_completed(future_to_endpoint):
                endpoint = future_to_endpoint[future]
                try:
                    result = future.result()
                    if result['status'] == 'success':
                        results['successful_fetches'] += 1
                        results['total_records'] += result.get('record_count', 0)
                        # Save to database
                        self.db_manager.save_data(
                            endpoint['api_name'],
                            endpoint['endpoint_name'],
                            result['data'],
                            result.get('metadata', {})
                        )
                    else:
                        results['failed_fetches'] += 1
                        results['errors'].append(f"{endpoint['api_name']}/{endpoint['endpoint_name']}: {result.get('error', 'Unknown error')}")
                    if progress_callback:
                        progress_callback(f"Fetched {endpoint['api_name']}/{endpoint['endpoint_name']}: {result['status']}")
                except Exception as e:
                    # A worker raising (instead of returning an error dict) is
                    # still recorded so the batch continues.
                    results['failed_fetches'] += 1
                    error_msg = f"{endpoint['api_name']}/{endpoint['endpoint_name']}: {str(e)}"
                    results['errors'].append(error_msg)
                    if progress_callback:
                        progress_callback(f"Error: {error_msg}")
        return results

    def _fetch_endpoint_data(self, endpoint: Dict) -> Dict:
        """Fetch data from a single endpoint.

        Returns {'status': 'success', 'data', 'record_count', 'metadata'} or
        {'status': 'error', 'error': str}; never raises for HTTP/parse errors.
        """
        try:
            api_name = endpoint['api_name']
            config = API_DISCOVERY_CONFIG.get(api_name, {})
            # Apply rate limiting if specified.
            # NOTE(review): this sleep is per-call in the current thread, not a
            # shared limiter — with 5 workers the effective rate can exceed the
            # configured requests/per_seconds budget; confirm acceptable.
            rate_limit = config.get('rate_limit')
            if rate_limit:
                time.sleep(rate_limit['per_seconds'] / rate_limit['requests'])
            # Setup headers with authentication if needed
            headers = endpoint.get('headers', {})
            auth_config = config.get('auth')
            if auth_config and auth_config.get('type') == 'Bearer':
                headers['Authorization'] = f"Bearer {auth_config['token']}"
            # API-specific header configuration (content negotiation).
            if api_name == "Skolverket" and "planned-educations" in endpoint['url']:
                headers['accept'] = "application/vnd.skolverket.plannededucations.api.v3.hal+json"
            elif api_name == "Eurostat":
                headers['accept'] = "application/json"
            elif api_name == "WHO":
                headers['accept'] = "application/json"
            elif api_name == "OECD":
                headers['accept'] = "application/vnd.sdmx.data+json;version=1.0.0"
            elif api_name == "CSN":
                headers['accept'] = "application/json"
            # Handle special cases for API calls
            if api_name == "CSN" and endpoint['url'].endswith('.px'):
                # PX-Web APIs need POST with query data; an empty query selects
                # the default table slice.
                query_data = {"query": [], "response": {"format": "json"}}
                response = self.session.post(endpoint['url'], json=query_data, headers=headers, timeout=30)
            else:
                response = self.session.get(endpoint['url'], headers=headers, timeout=30)
            response.raise_for_status()
            # Parse response based on content type
            content_type = response.headers.get('Content-Type', '')
            if 'application/json' in content_type:
                data = response.json()
            elif 'application/xml' in content_type or 'text/xml' in content_type:
                # Simple XML to dict conversion
                root = ET.fromstring(response.text)
                data = self._xml_to_dict(root)
            else:
                # Unknown content type: keep the raw text body.
                data = response.text
            # Extract relevant data based on API structure
            processed_data = self._process_api_response(api_name, data)
            return {
                'status': 'success',
                'data': processed_data,
                'record_count': len(processed_data) if isinstance(processed_data, list) else 1,
                'metadata': {
                    'endpoint': endpoint['url'],
                    'response_size': len(response.text),
                    'content_type': content_type
                }
            }
        except Exception as e:
            # Any failure (network, HTTP status, parsing) becomes an error
            # result so fetch_all_data can aggregate it.
            return {
                'status': 'error',
                'error': str(e)
            }

    def _process_api_response(self, api_name: str, data: Any) -> Any:
        """Process API response based on known data structures.

        Each branch unwraps the payload container specific to one API; when no
        known container key is present the data is returned unchanged.
        """
        if api_name == "Skolverket":
            if isinstance(data, dict):
                # Extract from HAL format: first embedded list wins.
                if '_embedded' in data:
                    for key, value in data['_embedded'].items():
                        if isinstance(value, list):
                            return value
                # Extract from direct response
                if 'Skolenheter' in data:
                    return data['Skolenheter']
            return data
        elif api_name == "SCB":
            if isinstance(data, dict):
                if 'data' in data:
                    return data['data']
                elif 'variables' in data:
                    return data['variables']
            return data
        elif api_name == "Kolada":
            if isinstance(data, dict) and 'values' in data:
                return data['values']
            return data
        elif api_name == "Eurostat":
            if isinstance(data, dict):
                # Eurostat returns complex nested structures
                if 'value' in data:
                    return data['value']
                elif 'data' in data:
                    return data['data']
                elif 'dimension' in data:
                    return data  # Keep full structure for complex data
            return data
        elif api_name == "WHO":
            if isinstance(data, dict):
                # WHO GHO API structure
                if 'value' in data:
                    return data['value']
                elif 'fact' in data:
                    return data['fact']
                elif 'IndicatorName' in data:
                    return data  # Keep indicator metadata
            elif isinstance(data, list):
                return data
            return data
        elif api_name == "OECD":
            if isinstance(data, dict):
                # OECD SDMX structure
                if 'data' in data:
                    return data['data']
                elif 'dataSets' in data:
                    datasets = data['dataSets']
                    if isinstance(datasets, list) and len(datasets) > 0:
                        return datasets[0].get('observations', {})
            return data
        elif api_name == "VΓ€rldsbanken":
            # World Bank responses are [metadata, records]; prefer the records.
            # NOTE(review): this literal is mojibake-encoded and must stay in
            # sync with the API_DISCOVERY_CONFIG key of the same spelling.
            if isinstance(data, list) and len(data) > 1:
                return data[1] if data[1] else data[0]
            return data
        elif api_name == "Riksbanken":
            if isinstance(data, dict):
                # Riksbank API structure
                if 'observations' in data:
                    return data['observations']
                elif 'data' in data:
                    return data['data']
            return data
        elif api_name == "Swecris":
            if isinstance(data, dict):
                # Swecris API structure
                if 'items' in data:
                    return data['items']
                elif 'projects' in data:
                    return data['projects']
                elif 'organizations' in data:
                    return data['organizations']
            return data
        elif api_name == "CSN":
            if isinstance(data, dict):
                # PX-Web structure
                if 'data' in data:
                    return data['data']
                elif 'variables' in data:
                    return data['variables']
            return data
        else:
            return data

    def _xml_to_dict(self, element) -> Dict:
        """Convert an ElementTree element to a nested dict (recursive).

        Leaf elements collapse to their stripped text; repeated child tags
        become lists.
        NOTE(review): a leaf element that has BOTH attributes and text
        returns only the text — the attributes are discarded. Confirm this
        is acceptable for the XML feeds consumed here.
        """
        result = {}
        if element.attrib:
            result.update(element.attrib)
        if element.text and element.text.strip():
            if len(element) == 0:
                return element.text.strip()
            result['text'] = element.text.strip()
        for child in element:
            child_data = self._xml_to_dict(child)
            if child.tag in result:
                # Second occurrence of a tag: promote the value to a list.
                if not isinstance(result[child.tag], list):
                    result[child.tag] = [result[child.tag]]
                result[child.tag].append(child_data)
            else:
                result[child.tag] = child_data
        return result
# --- Shared singletons, created once per Streamlit session ----------------
if 'db_manager' not in st.session_state:
    st.session_state['db_manager'] = DatabaseManager()
if 'discoverer' not in st.session_state:
    st.session_state['discoverer'] = AutoAPIDiscoverer(st.session_state['db_manager'])
if 'bulk_fetcher' not in st.session_state:
    st.session_state['bulk_fetcher'] = BulkDataFetcher(st.session_state['db_manager'])
# --- Page header and sidebar navigation -----------------------------------
st.title("🌍 Global API Data Harvester - 10 Data Sources")
st.markdown("**Automatisk datainsamling frΓ₯n svenska myndigheter, EU-organisationer och internationella institutioner**")
st.markdown("πŸ“Š **StΓΆdda API:er:** Skolverket, SCB, Kolada, Eurostat, WHO, OECD, VΓ€rldsbanken, Riksbanken, Swecris, CSN")

st.sidebar.title("πŸ—‚οΈ Navigation")
# The selected label drives the page dispatch below, so these literals must
# match the strings compared against `page` exactly.
_PAGE_LABELS = [
    "🏠 Dashboard",
    "πŸ” Endpoint Discovery",
    "πŸ“Š Bulk Data Collection",
    "πŸ’Ύ Database Manager",
    "βš™οΈ Individual APIs",
]
page = st.sidebar.radio("VΓ€lj sida", _PAGE_LABELS)
if page == "🏠 Dashboard":
    st.header("πŸ“Š API Data Dashboard")

    # --- Summary metrics --------------------------------------------------
    col1, col2, col3, col4 = st.columns(4)
    data_summary = st.session_state.db_manager.get_data_summary()
    total_apis = len(data_summary)
    total_fetches = sum(api['fetch_count'] for api in data_summary.values())
    # SUM(record_count) can be NULL/None for an API whose rows never set
    # record_count; guard with `or 0` so the sum never raises.
    total_records = sum(api['total_records'] or 0 for api in data_summary.values())
    with col1:
        st.metric("Active APIs", total_apis)
    with col2:
        st.metric("Total Fetches", total_fetches)
    with col3:
        st.metric("Total Records", f"{total_records:,}")
    with col4:
        st.metric("Database Size", f"{os.path.getsize(DB_PATH) / 1024 / 1024:.1f} MB" if os.path.exists(DB_PATH) else "0 MB")

    # --- API status overview ----------------------------------------------
    st.subheader("πŸ” API Status Overview")
    if data_summary:
        df_summary = pd.DataFrame(data_summary).T
        df_summary.reset_index(inplace=True)
        df_summary.columns = ['API', 'Fetch Count', 'Total Records', 'Last Fetch']
        st.dataframe(df_summary, use_container_width=True)
    else:
        st.info("No data collected yet. Use the Bulk Data Collection to start harvesting!")

    # --- Quick actions ----------------------------------------------------
    st.subheader("πŸš€ Quick Actions")
    col1, col2, col3 = st.columns(3)
    with col1:
        if st.button("πŸ” Discover All Endpoints", type="primary"):
            # BUGFIX: previously `st.switch_page = "endpoint_discovery"`
            # assigned a string over Streamlit's switch_page API and navigated
            # nowhere. Navigation is radio-driven in this app, so direct the
            # user to the sidebar instead.
            st.info("Select 'πŸ” Endpoint Discovery' in the sidebar to run discovery.")
    with col2:
        if st.button("πŸ“Š Fetch All Data", type="primary"):
            # BUGFIX: same st.switch_page clobbering as above.
            st.info("Select 'πŸ“Š Bulk Data Collection' in the sidebar to start fetching.")
    with col3:
        if st.button("πŸ’Ύ Export Database", type="secondary"):
            # Dump every table as JSON and offer it for download.
            conn = sqlite3.connect(DB_PATH)
            tables = ['api_endpoints', 'api_data', 'fetch_jobs']
            export_data = {}
            for table in tables:
                df = pd.read_sql_query(f"SELECT * FROM {table}", conn)
                export_data[table] = df
            conn.close()
            export_json = json.dumps({
                table: df.to_dict('records')
                for table, df in export_data.items()
            }, default=str, indent=2)
            st.download_button(
                "πŸ“„ Download Full Database (JSON)",
                data=export_json,
                file_name=f"api_harvester_export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                mime="application/json"
            )
elif page == "πŸ” Endpoint Discovery":
st.header("πŸ” Automatic Endpoint Discovery")
st.markdown("""
Discover all available endpoints from Swedish APIs automatically.
This process will scan known API documentation and register new endpoints.
""")
# Select APIs to discover
selected_apis = st.multiselect(
"Select APIs to discover endpoints for:",
list(API_DISCOVERY_CONFIG.keys()),
default=list(API_DISCOVERY_CONFIG.keys())
)
if st.button("πŸ” Start Discovery", type="primary"):
if selected_apis:
progress_bar = st.progress(0)
status_text = st.empty()
results_container = st.empty()
all_discovered = {}
for i, api_name in enumerate(selected_apis):
status_text.text(f"Discovering endpoints for {api_name}...")
def progress_callback(message):
status_text.text(f"{api_name}: {message}")
discovered = st.session_state.discoverer.discover_endpoints(api_name, progress_callback)
all_discovered[api_name] = discovered
progress_bar.progress((i + 1) / len(selected_apis))
status_text.text("Discovery completed!")
# Show results
st.subheader("πŸ“‹ Discovery Results")
for api_name, endpoints in all_discovered.items():
with st.expander(f"{api_name} - {len(endpoints)} endpoints"):
if endpoints:
df = pd.DataFrame(endpoints)
st.dataframe(df, use_container_width=True)
else:
st.info("No new endpoints discovered")
else:
st.warning("Please select at least one API to discover")
# Show current endpoints
st.subheader("πŸ“Š Current Endpoints Database")
endpoints = st.session_state.db_manager.get_endpoints()
if endpoints:
df_endpoints = pd.DataFrame(endpoints)
st.dataframe(df_endpoints, use_container_width=True)
else:
st.info("No endpoints registered yet. Run discovery first!")
elif page == "πŸ“Š Bulk Data Collection":
st.header("πŸ“Š Bulk Data Collection")
st.markdown("""
Automatically fetch data from all discovered endpoints and store in database.
This process runs in parallel for efficiency.
""")
# Configuration
col1, col2 = st.columns(2)
with col1:
selected_apis = st.multiselect(
"Select APIs to fetch data from:",
list(API_DISCOVERY_CONFIG.keys()),
default=list(API_DISCOVERY_CONFIG.keys())
)
with col2:
max_workers = st.slider("Parallel Workers", 1, 10, 5)
timeout_seconds = st.slider("Request Timeout (seconds)", 10, 60, 30)
# Start bulk collection
if st.button("πŸš€ Start Bulk Collection", type="primary"):
if selected_apis:
# Check if endpoints exist
total_endpoints = 0
for api_name in selected_apis:
endpoints = st.session_state.db_manager.get_endpoints(api_name)
total_endpoints += len(endpoints)
if total_endpoints == 0:
st.warning("No endpoints found. Please run endpoint discovery first!")
else:
st.info(f"Starting bulk collection from {total_endpoints} endpoints...")
# Progress tracking
progress_bar = st.progress(0)
status_text = st.empty()
results_container = st.container()
# Track progress
progress_data = {
'completed': 0,
'total': total_endpoints,
'errors': []
}
def progress_callback(message):
progress_data['completed'] += 1
progress_bar.progress(progress_data['completed'] / progress_data['total'])
status_text.text(f"Progress: {progress_data['completed']}/{progress_data['total']} - {message}")
# Run bulk fetch
results = st.session_state.bulk_fetcher.fetch_all_data(selected_apis, progress_callback)
# Show results
with results_container:
st.success("βœ… Bulk collection completed!")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Successful Fetches", results['successful_fetches'])
with col2:
st.metric("Failed Fetches", results['failed_fetches'])
with col3:
st.metric("Total Records", f"{results['total_records']:,}")
if results['errors']:
st.subheader("❌ Errors")
for error in results['errors'][:10]: # Show first 10 errors
st.error(error)
if len(results['errors']) > 10:
st.info(f"... and {len(results['errors']) - 10} more errors")
else:
st.warning("Please select at least one API")
elif page == "πŸ’Ύ Database Manager":
st.header("πŸ’Ύ Database Management")
# Database statistics
st.subheader("πŸ“Š Database Statistics")
conn = sqlite3.connect(DB_PATH)
# Table sizes
col1, col2, col3 = st.columns(3)
with col1:
endpoint_count = pd.read_sql_query("SELECT COUNT(*) as count FROM api_endpoints", conn).iloc[0]['count']
st.metric("Registered Endpoints", endpoint_count)
with col2:
data_count = pd.read_sql_query("SELECT COUNT(*) as count FROM api_data", conn).iloc[0]['count']
st.metric("Data Records", data_count)
with col3:
job_count = pd.read_sql_query("SELECT COUNT(*) as count FROM fetch_jobs", conn).iloc[0]['count']
st.metric("Fetch Jobs", job_count)
conn.close()
# Data exploration
st.subheader("πŸ” Data Explorer")
tab1, tab2, tab3 = st.tabs(["πŸ“‘ Endpoints", "πŸ“Š Data Records", "πŸ”„ Fetch Jobs"])
with tab1:
conn = sqlite3.connect(DB_PATH)
df_endpoints = pd.read_sql_query("SELECT * FROM api_endpoints ORDER BY discovered_at DESC", conn)
conn.close()
if not df_endpoints.empty:
st.dataframe(df_endpoints, use_container_width=True)
# Export endpoints
csv = df_endpoints.to_csv(index=False)
st.download_button(
"πŸ“„ Export Endpoints (CSV)",
data=csv,
file_name=f"api_endpoints_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
mime="text/csv"
)
else:
st.info("No endpoints registered yet")
with tab2:
conn = sqlite3.connect(DB_PATH)
df_data = pd.read_sql_query("""
SELECT api_name, endpoint_name, record_count, fetched_at, status
FROM api_data
ORDER BY fetched_at DESC
LIMIT 1000
""", conn)
conn.close()
if not df_data.empty:
st.dataframe(df_data, use_container_width=True)
# Data analysis
st.subheader("πŸ“ˆ Data Analysis")
col1, col2 = st.columns(2)
with col1:
# Records by API
api_records = df_data.groupby('api_name')['record_count'].sum().reset_index()
st.bar_chart(api_records.set_index('api_name'))
with col2:
# Fetches over time
df_data['fetch_date'] = pd.to_datetime(df_data['fetched_at']).dt.date
daily_fetches = df_data.groupby('fetch_date').size().reset_index(name='fetches')
st.line_chart(daily_fetches.set_index('fetch_date'))
else:
st.info("No data records yet")
with tab3:
conn = sqlite3.connect(DB_PATH)
df_jobs = pd.read_sql_query("SELECT * FROM fetch_jobs ORDER BY started_at DESC", conn)
conn.close()
if not df_jobs.empty:
st.dataframe(df_jobs, use_container_width=True)
else:
st.info("No fetch jobs yet")
# Database maintenance
st.subheader("πŸ”§ Database Maintenance")
col1, col2, col3 = st.columns(3)
with col1:
if st.button("πŸ—‘οΈ Clear All Data"):
if st.checkbox("I understand this will delete all data"):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("DELETE FROM api_data")
cursor.execute("DELETE FROM fetch_jobs")
conn.commit()
conn.close()
st.success("All data cleared!")
st.rerun()
with col2:
if st.button("♻️ Optimize Database"):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("VACUUM")
conn.commit()
conn.close()
st.success("Database optimized!")
with col3:
if st.button("πŸ“‹ Create Backup"):
import shutil
backup_path = f"api_data_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.db"
shutil.copy2(DB_PATH, backup_path)
st.success(f"Backup created: {backup_path}")
elif page == "βš™οΈ Individual APIs":
st.header("βš™οΈ Individual API Testing")
st.markdown("Test individual APIs and endpoints for debugging and development.")
# This would include the original individual API functionality
# For brevity, showing a simplified version
selected_api = st.selectbox("Select API", list(API_DISCOVERY_CONFIG.keys()))
endpoints = st.session_state.db_manager.get_endpoints(selected_api)
if endpoints:
endpoint_names = [f"{ep['endpoint_name']} ({ep['url']})" for ep in endpoints]
selected_endpoint_idx = st.selectbox("Select Endpoint", range(len(endpoints)), format_func=lambda x: endpoint_names[x])
if st.button("πŸ§ͺ Test Endpoint"):
endpoint = endpoints[selected_endpoint_idx]
with st.spinner("Testing endpoint..."):
result = st.session_state.bulk_fetcher._fetch_endpoint_data(endpoint)
if result['status'] == 'success':
st.success("βœ… Endpoint test successful!")
# Show data
data = result['data']
if isinstance(data, list) and len(data) > 0:
df = pd.DataFrame(data[:100]) # Show first 100 records
st.dataframe(df, use_container_width=True)
st.info(f"Showing first 100 of {len(data)} records")
else:
st.json(data)
else:
st.error(f"❌ Endpoint test failed: {result.get('error', 'Unknown error')}")
else:
st.info(f"No endpoints registered for {selected_api}. Run endpoint discovery first!")
# Footer
# NOTE(review): emoji and Swedish characters throughout this file appear
# mojibake-encoded (e.g. "VΓ€rldsbanken" for "Världsbanken"). Fixing requires
# a coordinated re-encode of every literal, including dict keys compared at
# runtime — confirm the original source encoding before attempting it.
st.markdown("---")
st.markdown("**🌍 Global API Data Harvester** - Automated collection from 10 international data sources")
st.markdown("βœ… **10 APIs** β€’ **AUTO-DISCOVERY** β€’ **BULK COLLECTION** β€’ **DATABASE STORAGE** β€’ **REAL-TIME MONITORING**")
st.markdown("πŸ‡ΈπŸ‡ͺ Svenska: Skolverket, SCB, Kolada, Riksbanken, Swecris, CSN β€’ 🌍 Globala: Eurostat, WHO, OECD, VΓ€rldsbanken")