import gzip
import hashlib
import json
import sqlite3
import time
import warnings
from datetime import datetime
from typing import Dict, Any, List

import pandas as pd
import requests
import streamlit as st

warnings.filterwarnings('ignore')

# Optional plotly import with fallback.
# Only the availability flag is recorded here: st.set_page_config() has to be
# the first Streamlit command on the page, so the user-facing warning is
# emitted further down, after the page has been configured.
PLOTLY_AVAILABLE = False
try:
    import plotly.express as px
    PLOTLY_AVAILABLE = True
except ImportError:
    pass

# Global ML availability flag.
# AI/ML imports for enhanced functionality (optional dependencies).
ML_AVAILABLE = False
try:
    from transformers import pipeline
    from sentence_transformers import SentenceTransformer
    ML_AVAILABLE = True
except ImportError:
    pass

# Enhanced Page Configuration
st.set_page_config(
    page_title="Simplified Data Harvester",
    page_icon="π",
    layout="wide",
    initial_sidebar_state="collapsed"
)

if not PLOTLY_AVAILABLE:
    st.warning("π Plotly not available - charts will be disabled")

# Enhanced CSS with modern, professional styling
st.markdown(""" """, unsafe_allow_html=True)

# Database Configuration
DB_PATH = "simplified_harvester.db"

# API Configuration - Tested and verified working endpoints.
# Each endpoint entry carries:
#   url        - address that is requested
#   headers    - per-request headers, sent in addition to the session headers
#   method     - "GET" or "POST"
#   data_path  - where the payload lives inside the response: "" (whole
#                response), a key, a dotted key path, or "[n]" for a list index
#   key_field  - stored next to the data as metadata
SIMPLIFIED_API_CONFIG = {
    "Skolverket": {
        "name": "πΈπͺ Skolverket",
        "description": "Swedish National Agency for Education - School Units",
        "endpoints": [
            {
                "url": "https://api.skolverket.se/planned-educations/v3/compact-school-units?coordinateSystemType=WGS84&page=0&size=20",
                "headers": {"Accept": "application/vnd.skolverket.plannededucations.api.v3.hal+json"},
                "method": "GET",
                "data_path": "body._embedded.schoolUnits",
                "key_field": "schoolUnitCode"
            }
        ]
    },
    "SCB": {
        "name": "πΈπͺ Statistics Sweden",
        "description": "Swedish National Statistics Office - Population Metadata",
        "endpoints": [
            {
                "url": "https://api.scb.se/OV0104/v1/doris/sv/ssd/START/BE/BE0101/BE0101A/BefolkningNy",
                "headers": {"Accept": "application/json"},
                "method": "GET",
                "data_path": "variables",
                "key_field": "code"
            }
        ]
    },
    "Kolada": {
        "name": "πΈπͺ Kolada",
        "description": "Municipal Key Performance Indicators - Municipalities",
        "endpoints": [
            {
                "url": "https://api.kolada.se/v2/municipality",
                "headers": {"Accept": "application/json"},
                "method": "GET",
                "data_path": "values",
                "key_field": "id"
            }
        ]
    },
    "Eurostat": {
        "name": "πͺπΊ Eurostat",
        "description": "European Union Statistics - Population Density",
        "endpoints": [
            {
                "url": "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data/demo_r_d3dens?format=JSON&lang=EN",
                "headers": {"Accept": "application/json"},
                "method": "GET",
                "data_path": "value",
                "key_field": "index"
            }
        ]
    },
    "WHO": {
        "name": "π WHO",
        "description": "World Health Organization - Country Dimensions",
        "endpoints": [
            {
                "url": "https://ghoapi.azureedge.net/api/DIMENSION/COUNTRY/DimensionValues",
                "headers": {"Accept": "application/json"},
                "method": "GET",
                "data_path": "value",
                "key_field": "Code"
            }
        ]
    },
    "OECD": {
        "name": "π OECD",
        "description": "Organisation for Economic Co-operation and Development - Economic Data",
        "endpoints": [
            {
                "url": "https://sdmx.oecd.org/public/rest/data/OECD.SDD.NAD,DSD_NAMAIN1@DF_QNA_EXPENDITURE_GROWTH_OECD/?format=jsondata",
                "headers": {"Accept": "application/json"},
                "method": "GET",
                "data_path": "data.dataSets",
                "key_field": "series"
            }
        ]
    },
    "WorldBank": {
        "name": "π World Bank",
        "description": "International Financial Institution - Population Data",
        "endpoints": [
            {
                "url": "https://api.worldbank.org/v2/country/all/indicator/SP.POP.TOTL?format=json&per_page=50&date=2023",
                "headers": {"Accept": "application/json"},
                "method": "GET",
                "data_path": "[1]",
                "key_field": "countryiso3code"
            }
        ]
    },
    "Riksbanken": {
        "name": "πΈπͺ Riksbanken",
        "description": "Swedish Central Bank - Exchange Rates",
        "endpoints": [
            {
                "url": "https://api.riksbank.se/swea/v1/Observations/Latest/ByGroup/130",
                "headers": {
                    "Accept": "application/json"
                },
                "method": "GET",
                "data_path": "",
                "key_field": "seriesId"
            }
        ]
    },
    "Swecris": {
        "name": "πΈπͺ Swecris",
        "description": "Swedish Research Council Database - Research Projects",
        "endpoints": [
            {
                "url": "https://swecris-api.vr.se/v1/projects?size=20",
                # NOTE(review): bearer token is hard-coded in source; confirm it
                # is meant to be public, otherwise load it from configuration.
                "headers": {
                    "Accept": "application/json",
                    "Authorization": "Bearer VRSwecrisAPI2025-1"
                },
                "method": "GET",
                "data_path": "content",
                "key_field": "projectId",
                "requires_auth": True
            }
        ]
    },
    "CSN": {
        "name": "πΈπͺ CSN",
        "description": "Swedish Board of Student Finance - Statistics Portal",
        "endpoints": [
            {
                "url": "https://statistik.csn.se/PXWeb/api/v1/sv/CSNstat",
                "headers": {"Accept": "application/json"},
                "method": "GET",
                "data_path": "",
                "key_field": "id",
                "note": "PX-Web format requires specific POST queries for data"
            }
        ]
    }
}
def init_database(db_path=None):
    """Create the SQLite schema (tables and indexes) if it does not exist yet.

    Safe to call repeatedly: every statement uses ``IF NOT EXISTS``.

    Args:
        db_path: Path of the SQLite file. Defaults to the module-level
            ``DB_PATH`` when omitted, which is the original behaviour.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
            The connection is closed in either case.
    """
    conn = sqlite3.connect(db_path or DB_PATH)
    try:
        cursor = conn.cursor()

        # Enable WAL mode for better concurrent access
        cursor.execute('PRAGMA journal_mode=WAL')
        cursor.execute('PRAGMA synchronous=NORMAL')
        cursor.execute('PRAGMA cache_size=10000')
        cursor.execute('PRAGMA temp_store=memory')

        # One row per stored fetch; data_hash is UNIQUE and is what the
        # harvester uses for de-duplication.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS harvested_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                api_name TEXT NOT NULL,
                endpoint_url TEXT NOT NULL,
                data_hash TEXT UNIQUE NOT NULL,
                raw_data TEXT,
                compressed_data BLOB,
                processed_data TEXT,
                record_count INTEGER DEFAULT 0,
                data_size_bytes INTEGER DEFAULT 0,
                compression_ratio REAL,
                fetch_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                fetch_duration_ms INTEGER DEFAULT 0,
                status TEXT DEFAULT 'success',
                session_id TEXT,
                data_path TEXT,
                key_field TEXT,
                error_message TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')

        # Create performance indexes
        for statement in (
            'CREATE INDEX IF NOT EXISTS idx_api_name ON harvested_data(api_name)',
            'CREATE INDEX IF NOT EXISTS idx_timestamp ON harvested_data(fetch_timestamp)',
            'CREATE INDEX IF NOT EXISTS idx_status ON harvested_data(status)',
            'CREATE INDEX IF NOT EXISTS idx_session ON harvested_data(session_id)',
            'CREATE INDEX IF NOT EXISTS idx_data_hash ON harvested_data(data_hash)',
            'CREATE INDEX IF NOT EXISTS idx_api_timestamp ON harvested_data(api_name, fetch_timestamp)',
        ):
            cursor.execute(statement)

        # Create sessions table for better tracking
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS fetch_sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT UNIQUE NOT NULL,
                session_name TEXT,
                started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                completed_at TIMESTAMP,
                total_apis INTEGER DEFAULT 0,
                successful_fetches INTEGER DEFAULT 0,
                failed_fetches INTEGER DEFAULT 0,
                total_records INTEGER DEFAULT 0,
                total_size_bytes INTEGER DEFAULT 0,
                status TEXT DEFAULT 'running'
            )
        ''')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_session_id ON fetch_sessions(session_id)')
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_session_started ON fetch_sessions(started_at)')

        conn.commit()
    finally:
        # Release the file handle even when a PRAGMA/DDL statement raised.
        conn.close()
class SimplifiedDataHarvester:
    """Fetch every configured API endpoint and persist the responses to SQLite.

    Attributes:
        session: Shared ``requests.Session`` carrying the User-Agent header.
        db_path: Optional SQLite path; the module-level ``DB_PATH`` is used
            when it is ``None``.
        results: Per-API result dicts of the most recent run (successes only).
        errors: Per-API error messages of the most recent run.
    """

    def __init__(self, db_path=None):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Simplified-Data-Harvester/1.0 (Research & Analysis)'
        })
        self.db_path = db_path
        self.results = {}
        self.errors = {}

    def fetch_all_apis(self, progress_callback=None, api_config=None,
                       request_delay=0.5) -> Dict:
        """Fetch data from all configured APIs in one call.

        Args:
            progress_callback: Optional callable receiving one status string
                per progress event.
            api_config: Mapping shaped like ``SIMPLIFIED_API_CONFIG``; that
                module-level mapping is used when omitted.
            request_delay: Seconds to pause after each API, to be polite to
                the remote services.

        Returns:
            Dict with ``results`` (APIs that had at least one successful
            endpoint), ``errors`` (API name -> message), ``session_id`` and a
            ``summary`` of counts plus ``success_rate`` in percent.
        """
        apis = SIMPLIFIED_API_CONFIG if api_config is None else api_config
        session_id = f"simplified_{int(time.time())}"
        total_apis = len(apis)

        # The harvester instance outlives a single run (it is kept in the
        # Streamlit session state), so outcomes of earlier runs must not leak
        # into this run's summary.
        self.results = {}
        self.errors = {}

        def notify(message):
            if progress_callback:
                progress_callback(message)

        notify(f"π Starting data collection from {total_apis} APIs...")

        for completed, (api_name, config) in enumerate(apis.items(), start=1):
            label = config.get('name', api_name)
            notify(f"π Fetching from {label}...")
            try:
                api_results = self._fetch_api_data(api_name, config, session_id)
                # _fetch_api_data records endpoint failures instead of raising;
                # an API without a single successful endpoint is a failure.
                failure = self._failure_message(api_results)
                if failure is not None:
                    raise RuntimeError(failure)
                self.results[api_name] = api_results
                progress = (completed / total_apis) * 100
                notify(f"β {label} completed ({progress:.1f}%)")
            except Exception as e:
                self.errors[api_name] = str(e)
                notify(f"β {label} failed: {str(e)[:50]}...")
            if request_delay > 0:
                time.sleep(request_delay)  # Respectful delay

        successful = len(self.results)
        failed = len(self.errors)
        notify(f"π Collection complete! β {successful} successful, β {failed} failed")

        return {
            "results": self.results,
            "errors": self.errors,
            "session_id": session_id,
            "summary": {
                "total_apis": total_apis,
                "successful": successful,
                "failed": failed,
                # Guard against an empty configuration.
                "success_rate": (successful / total_apis) * 100 if total_apis else 0.0
            }
        }

    @staticmethod
    def _failure_message(api_results: Dict):
        """Return a combined error text if every endpoint failed, else None."""
        endpoints = api_results.get("endpoints", [])
        if not endpoints or any(ep.get("status") == "success" for ep in endpoints):
            return None
        return "; ".join(str(ep.get("error") or "unknown error") for ep in endpoints)

    def _fetch_api_data(self, api_name: str, config: Dict, session_id: str) -> Dict:
        """Fetch, extract and store the data of every endpoint of one API.

        Endpoint failures are recorded in the returned structure instead of
        being raised.

        Returns:
            Dict with ``api_name``, a list of per-endpoint result dicts under
            ``endpoints``, and the ``total_records`` / ``total_size`` sums of
            the successful endpoints.
        """
        api_results = {
            "api_name": api_name,
            "endpoints": [],
            "total_records": 0,
            "total_size": 0
        }

        for endpoint in config['endpoints']:
            try:
                start_time = time.perf_counter()
                headers = endpoint.get('headers', {})

                # Make request
                if endpoint.get('method', 'GET').upper() == 'POST':
                    response = self.session.post(
                        endpoint['url'],
                        headers=headers,
                        json=endpoint.get('data', {}),
                        timeout=30
                    )
                else:
                    response = self.session.get(
                        endpoint['url'],
                        headers=headers,
                        timeout=30
                    )
                response.raise_for_status()

                # Process response
                data = self._process_response(response)
                fetch_duration = int((time.perf_counter() - start_time) * 1000)

                # Extract meaningful data using endpoint configuration
                processed_data = self._extract_api_data(data, api_name, endpoint)
                record_count = self._count_records(processed_data)
                data_size = len(response.content)

                # Save to database with enhanced metadata
                self._save_data_to_db(
                    api_name, endpoint['url'], data, processed_data,
                    session_id, fetch_duration, record_count, data_size,
                    "success", endpoint
                )

                preview = str(processed_data)
                if len(preview) > 200:
                    preview = preview[:200] + "..."

                api_results["endpoints"].append({
                    "endpoint_url": endpoint['url'],
                    "status": "success",
                    "records": record_count,
                    "size_bytes": data_size,
                    "duration_ms": fetch_duration,
                    "data_preview": preview
                })
                api_results["total_records"] += record_count
                api_results["total_size"] += data_size

            except Exception as e:
                api_results["endpoints"].append({
                    # .get(): a malformed entry without "url" must not raise
                    # again from inside the error handler.
                    "endpoint_url": endpoint.get('url', ''),
                    "status": "error",
                    "error": str(e),
                    "records": 0,
                    "size_bytes": 0,
                    "duration_ms": 0
                })

        return api_results

    def _process_response(self, response):
        """Decode a response body as JSON, falling back to the raw text.

        A body that is declared as JSON but cannot be parsed raises; any other
        content type that is not parseable is wrapped as
        ``{"raw_content": <text>}``.
        """
        content_type = response.headers.get('content-type', '').lower()
        if 'json' in content_type:
            return response.json()
        try:
            return response.json()  # Try JSON first
        except ValueError:
            # JSON decode errors are ValueError subclasses; anything else
            # (e.g. KeyboardInterrupt) is deliberately not swallowed.
            return {"raw_content": response.text}

    def _extract_api_data(self, data: Any, api_name: str, endpoint_config: Dict) -> Any:
        """Pull the payload out of a response using the endpoint's ``data_path``.

        Supported forms: ``""`` (return the response unchanged), ``"[n]"``
        (list index), ``"a.b.c"`` (dotted key path) and ``"key"``. Lookups are
        lenient: a key that is absent is skipped and the enclosing container
        is kept, so the worst case is returning more data than intended.
        """
        try:
            data_path = endpoint_config.get('data_path', '')
            if not data_path:
                return data

            # Handle array index notation like "[1]"
            if data_path.startswith('[') and data_path.endswith(']'):
                index = int(data_path[1:-1])
                if isinstance(data, list) and -len(data) <= index < len(data):
                    return data[index]
                return data

            # Handle dot notation like "body._embedded.schoolUnits"
            if '.' in data_path:
                current = data
                for part in data_path.split('.'):
                    if not isinstance(current, dict):
                        break
                    current = current.get(part, current)
                return current

            # Handle simple key extraction
            if isinstance(data, dict):
                return data.get(data_path, data)
            return data

        except Exception as e:
            st.warning(f"Data extraction failed for {api_name}: {str(e)}")
            return data

    def _count_records(self, data: Any) -> int:
        """Estimate how many records ``data`` holds.

        A list counts its items. A dict counts the items of its first
        non-empty list value, or 1 when it has none; an empty dict counts 0,
        like an empty list. Any other value counts 1 if truthy, else 0.
        """
        if isinstance(data, list):
            return len(data)
        if isinstance(data, dict):
            for value in data.values():
                if isinstance(value, list) and value:
                    return len(value)
            return 1 if data else 0
        return 1 if data else 0

    def _save_data_to_db(self, api_name: str, endpoint_url: str, raw_data: Any,
                         processed_data: Any, session_id: str, fetch_duration: int,
                         record_count: int, data_size: int, status: str,
                         endpoint_config: Dict):
        """Store one fetched response, gzip-compressing large payloads.

        Rows are de-duplicated on the SHA-256 of the serialized raw data: an
        already stored payload is skipped silently. Failures while storing are
        written as an ``error`` row on a best-effort basis and never raised.
        """
        conn = sqlite3.connect(getattr(self, 'db_path', None) or DB_PATH)
        try:
            cursor = conn.cursor()

            # Create data hash for deduplication
            raw_data_str = json.dumps(raw_data, sort_keys=True, default=str,
                                      separators=(',', ':'))
            processed_data_str = json.dumps(processed_data, sort_keys=True,
                                            default=str, separators=(',', ':'))
            raw_bytes = raw_data_str.encode('utf-8')
            data_hash = hashlib.sha256(raw_bytes).hexdigest()

            # Check if data exists
            cursor.execute('SELECT id FROM harvested_data WHERE data_hash = ?',
                           (data_hash,))
            if cursor.fetchone():
                return  # Skip duplicate

            # Store plain text unless compression pays off.
            raw_data_final = raw_data_str
            compressed_data = None
            compression_ratio = 1.0

            if data_size > 512:  # Compress data larger than 512 bytes
                try:
                    candidate = gzip.compress(raw_bytes)
                    candidate_ratio = len(candidate) / len(raw_bytes)
                except (OSError, MemoryError):
                    candidate = None
                    candidate_ratio = 1.0
                # Only use compression if it saves significant space
                if candidate is not None and candidate_ratio < 0.8:
                    raw_data_final = None
                    compressed_data = candidate
                    compression_ratio = candidate_ratio

            # Extract endpoint metadata
            data_path = endpoint_config.get('data_path', '')
            key_field = endpoint_config.get('key_field', '')

            # Insert data with enhanced metadata
            cursor.execute('''
                INSERT INTO harvested_data
                (api_name, endpoint_url, data_hash, raw_data, compressed_data, processed_data,
                 record_count, data_size_bytes, compression_ratio, fetch_duration_ms,
                 status, session_id, data_path, key_field)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ''', (
                api_name, endpoint_url, data_hash, raw_data_final, compressed_data,
                processed_data_str, record_count, data_size, compression_ratio,
                fetch_duration, status, session_id, data_path, key_field
            ))
            conn.commit()

        except Exception as e:
            # Log error but don't fail the entire operation. The hash uses
            # nanosecond resolution because data_hash is UNIQUE and two errors
            # within the same second would otherwise collide.
            try:
                conn.rollback()
                conn.execute('''
                    INSERT INTO harvested_data
                    (api_name, endpoint_url, data_hash, status, error_message, session_id)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (api_name, endpoint_url, f"error_{time.time_ns()}",
                      "error", str(e), session_id))
                conn.commit()
            except sqlite3.Error:
                # The database itself is unusable; report through logging
                # rather than turning a successful fetch into a failure.
                import logging
                logging.getLogger(__name__).exception(
                    "Could not record storage error for %s (%s)",
                    api_name, endpoint_url
                )
        finally:
            conn.close()
def get_database_stats(db_path=None):
    """Return aggregate statistics over the ``harvested_data`` table.

    Args:
        db_path: Path of the SQLite file. Defaults to the module-level
            ``DB_PATH`` when omitted, which is the original behaviour.

    Returns:
        Dict with the keys ``total_records`` (successful rows),
        ``active_apis``, ``total_data_records``, ``total_size_mb``,
        ``avg_compression``, ``error_count``, ``avg_fetch_time_ms``,
        ``latest_session`` (``None`` when no session exists) and
        ``latest_session_fetches``.

    Raises:
        sqlite3.Error: If the database or the table cannot be read.
    """
    conn = sqlite3.connect(db_path or DB_PATH)
    try:
        cursor = conn.cursor()

        # Status values are bound as parameters: a double-quoted "success"
        # inside SQL is an identifier in standard SQL and is only accepted as
        # a string by SQLite through a legacy fallback that can be disabled.
        cursor.execute('''
            SELECT COUNT(*),
                   COUNT(DISTINCT api_name),
                   SUM(record_count),
                   SUM(data_size_bytes),
                   AVG(fetch_duration_ms)
            FROM harvested_data
            WHERE status = ?
        ''', ("success",))
        (total_records, active_apis, total_data_records,
         total_size_bytes, avg_fetch_time) = cursor.fetchone()
        # SUM/AVG yield NULL when no row matched.
        total_data_records = total_data_records or 0
        total_size_bytes = total_size_bytes or 0
        avg_fetch_time = avg_fetch_time or 0

        cursor.execute(
            'SELECT AVG(compression_ratio) FROM harvested_data '
            'WHERE compression_ratio IS NOT NULL'
        )
        avg_compression = cursor.fetchone()[0] or 1.0

        cursor.execute('SELECT COUNT(*) FROM harvested_data WHERE status = ?',
                       ("error",))
        error_count = cursor.fetchone()[0]

        # Latest session info
        cursor.execute('''
            SELECT session_id, COUNT(*) as fetches, MAX(fetch_timestamp) as latest
            FROM harvested_data
            WHERE session_id IS NOT NULL
            GROUP BY session_id
            ORDER BY latest DESC
            LIMIT 1
        ''')
        latest_session = cursor.fetchone()

        return {
            "total_records": total_records,
            "active_apis": active_apis,
            "total_data_records": total_data_records,
            "total_size_mb": round(total_size_bytes / (1024 * 1024), 2),
            "avg_compression": round(avg_compression, 3),
            "error_count": error_count,
            "avg_fetch_time_ms": round(avg_fetch_time, 1),
            "latest_session": latest_session[0] if latest_session else None,
            "latest_session_fetches": latest_session[1] if latest_session else 0
        }
    finally:
        conn.close()
One-Click Data Collection from All APIs
Automatic data fetching from 10 international sources with smart database storage
π Optimized Data Harvester - Research-verified APIs with smart storage
β 10 Tested APIs β’ ποΈ Data compression β’ π Performance metrics β’ π Enhanced analytics
Featuring proper data extraction, optimized database with indexing, and compression for efficient storage