Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import requests | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from plotly.subplots import make_subplots | |
| import json | |
| import time | |
| from datetime import datetime, timedelta | |
| import asyncio | |
| import aiohttp | |
| from typing import Dict, Any, List, Optional, Set | |
| import sqlite3 | |
| import hashlib | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import xml.etree.ElementTree as ET | |
| import re | |
| import os | |
| import pickle | |
| from urllib.parse import urljoin, urlparse | |
| import threading | |
| from pathlib import Path | |
| import numpy as np | |
| from sklearn.ensemble import IsolationForest | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import warnings | |
# Silence every warning process-wide (presumably to hide noisy library
# deprecation messages in the Streamlit UI).
warnings.simplefilter('ignore')

# Global ML availability flag: True only when both optional ML packages import.
# Components below may clear it again if model loading fails at runtime.
ML_AVAILABLE = False

# Optional AI/ML imports; without them the app degrades to rule-based features.
try:
    from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
    from sentence_transformers import SentenceTransformer
except ImportError:
    ML_AVAILABLE = False
else:
    ML_AVAILABLE = True
# Enhanced Page Configuration
# NOTE(review): page_icon "π" looks like a mis-encoded emoji (only the first
# UTF-8 byte survived, decoded as a Greek letter) — confirm the intended icon.
st.set_page_config(
page_title="Ultimate Data Harvester",
page_icon="π",
layout="wide",
initial_sidebar_state="collapsed"
)
# Enhanced CSS with modern, professional styling.
# The string is a constant <style> block, not user input; unsafe_allow_html=True
# is what lets the raw <style> tag through to the page.
# NOTE(review): ".css-1d391kg" is an auto-generated Streamlit class name and may
# stop matching after a Streamlit upgrade; a data-testid selector such as
# [data-testid="stSidebar"] is presumably more stable — confirm.
st.markdown("""
<style>
.main > div {
padding-top: 1rem;
}
.stApp {
background: linear-gradient(135deg, #f5f7fa 0%, #c3cfe2 100%);
color: #2c3e50;
}
.metric-card {
background: rgba(255, 255, 255, 0.95);
border-radius: 12px;
padding: 1.5rem;
margin: 0.5rem 0;
border: 1px solid rgba(52, 73, 94, 0.1);
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);
transition: all 0.3s ease;
}
.metric-card:hover {
transform: translateY(-2px);
box-shadow: 0 8px 25px rgba(0, 0, 0, 0.15);
}
.api-card {
background: rgba(255, 255, 255, 0.9);
border-radius: 10px;
padding: 1.2rem;
margin: 0.5rem;
border: 1px solid rgba(52, 73, 94, 0.15);
transition: all 0.3s ease;
position: relative;
overflow: hidden;
}
.api-card:hover {
transform: translateY(-3px);
box-shadow: 0 8px 20px rgba(0, 0, 0, 0.12);
border-color: #3498db;
}
.title-container {
text-align: center;
padding: 2rem 0;
background: rgba(255, 255, 255, 0.9);
border-radius: 15px;
margin-bottom: 2rem;
border: 1px solid rgba(52, 73, 94, 0.1);
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.05);
}
.status-indicator {
width: 10px;
height: 10px;
border-radius: 50%;
display: inline-block;
margin-right: 8px;
}
.status-active { background-color: #27ae60; }
.status-discovering { background-color: #f39c12; }
.status-error { background-color: #e74c3c; }
.status-paused { background-color: #95a5a6; }
.ai-panel {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 10px;
padding: 1rem;
margin: 1rem 0;
color: white;
border: none;
}
.discovery-progress {
background: rgba(255, 255, 255, 0.95);
border-radius: 8px;
padding: 1rem;
margin: 1rem 0;
border: 1px solid rgba(52, 73, 94, 0.1);
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.05);
}
.endpoint-item {
background: rgba(255, 255, 255, 0.8);
border-radius: 6px;
padding: 0.5rem;
margin: 0.3rem 0;
border-left: 3px solid #3498db;
font-size: 0.9rem;
color: #34495e;
}
/* Custom button styling */
.stButton > button {
background: linear-gradient(135deg, #3498db, #2980b9);
color: white;
border: none;
border-radius: 8px;
padding: 0.5rem 1rem;
font-weight: 500;
transition: all 0.3s ease;
}
.stButton > button:hover {
background: linear-gradient(135deg, #2980b9, #1f4e79);
transform: translateY(-1px);
box-shadow: 0 4px 12px rgba(52, 152, 219, 0.3);
}
/* Tab styling */
.stTabs [data-baseweb="tab-list"] {
gap: 8px;
}
.stTabs [data-baseweb="tab"] {
background-color: rgba(255, 255, 255, 0.7);
border-radius: 8px;
color: #2c3e50;
font-weight: 500;
}
.stTabs [aria-selected="true"] {
background-color: #3498db;
color: white;
}
/* Metrics styling */
[data-testid="metric-container"] {
background: rgba(255, 255, 255, 0.9);
border: 1px solid rgba(52, 73, 94, 0.1);
padding: 1rem;
border-radius: 8px;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.05);
}
/* Sidebar styling */
.css-1d391kg {
background: linear-gradient(135deg, #ecf0f1 0%, #bdc3c7 100%);
}
</style>
""", unsafe_allow_html=True)
# Enhanced Database Configuration
# Relative file names, so they resolve against the process working directory.
# SQLite database file.
DB_PATH = "ultimate_data_harvester.db"
# Session state file; the ".pkl" suffix suggests it is pickled.
# NOTE(review): never unpickle this file if it could come from an untrusted source.
SESSION_PATH = "harvester_session.pkl"
# JSON cache of discovered endpoints (per its name; its use is not visible here).
ENDPOINTS_CACHE = "discovered_endpoints.json"
| # AI Enhancement Classes | |
class AIDataQualityAssessor:
    """AI-assisted data quality assessment for fetched API payloads.

    Scores a payload using a transformers text-classification pipeline, when
    the optional ML stack is available, together with rule-based completeness,
    consistency, structure and anomaly checks. Falls back to a rule-only
    assessment when the pipeline is missing or any step raises.
    """

    def __init__(self):
        self.quality_model = None
        self.embeddings_model = None
        self._initialize_models()

    def _initialize_models(self):
        """Load the classifier and embedding models if the ML libraries imported.

        On any loading error both models are reset to ``None`` and the
        module-level ``ML_AVAILABLE`` flag is cleared. That also disables ML
        features in any other component that consults the flag.
        """
        global ML_AVAILABLE
        if ML_AVAILABLE:
            try:
                # Sentiment classifier. Its top class confidence is used below
                # as the "AI quality score".
                # NOTE(review): newer transformers releases deprecate
                # return_all_scores in favour of top_k=None, which changes the
                # output nesting for a single string; adjust
                # assess_data_quality if migrating.
                self.quality_model = pipeline(
                    "text-classification",
                    model="distilbert-base-uncased-finetuned-sst-2-english",
                    return_all_scores=True
                )
                # Sentence-embedding model. None of this class's methods use it.
                self.embeddings_model = SentenceTransformer('all-MiniLM-L6-v2')
            except Exception:
                ML_AVAILABLE = False
                self.quality_model = None
                self.embeddings_model = None

    def assess_data_quality(self, data: Any, api_name: str) -> Dict:
        """Return a quality report for one fetched payload.

        Args:
            data: Parsed payload (str, dict, list or any other object).
            api_name: Name of the source API, passed to the consistency check.

        Returns:
            Dict with keys ai_quality_score, completeness_score,
            consistency_score, structure_score, anomaly_count, anomalies
            (at most 5), overall_grade and recommendations. Returns
            ``_basic_quality_assessment`` when ML is unavailable or any step raises.
        """
        if not ML_AVAILABLE or not self.quality_model:
            return self._basic_quality_assessment(data, api_name)
        try:
            # Convert data to text for analysis
            text_data = self._data_to_text(data)
            # Only the first 512 characters are classified.
            ai_scores = self.quality_model(text_data[:512])
            # With return_all_scores=True, one input string yields
            # [[{"label": ..., "score": ...}, ...]]; keep the highest class probability.
            quality_score = max(score['score'] for score in ai_scores[0])
            # Rule-based quality metrics
            completeness = self._check_completeness(data)
            consistency = self._check_consistency(data, api_name)
            structure_quality = self._assess_structure(data)
            anomalies = self._detect_anomalies(data)
            return {
                "ai_quality_score": round(quality_score, 3),
                "completeness_score": completeness,
                "consistency_score": consistency,
                "structure_score": structure_quality,
                "anomaly_count": len(anomalies),
                "anomalies": anomalies[:5],  # Top 5 anomalies
                "overall_grade": self._calculate_overall_grade(
                    quality_score, completeness, consistency, structure_quality
                ),
                "recommendations": self._generate_quality_recommendations(
                    quality_score, completeness, consistency, anomalies
                )
            }
        except Exception:
            return self._basic_quality_assessment(data, api_name)

    def _data_to_text(self, data: Any) -> str:
        """Render data as text for the classifier.

        Strings are returned unchanged. Dicts are JSON-encoded. Everything
        else goes through ``str()``. Non-string results are cut to 1000 characters.
        """
        if isinstance(data, str):
            return data
        elif isinstance(data, dict):
            return json.dumps(data, ensure_ascii=False)[:1000]
        elif isinstance(data, list):
            return str(data)[:1000]
        else:
            return str(data)[:1000]

    def _check_completeness(self, data: Any) -> float:
        """Return the fraction of populated values (not None and not "").

        - dict: share of its values that are populated; 0.0 for an empty dict.
        - list of dicts: mean of the per-item scores. The first item decides
          whether the list is treated this way.
        - other non-empty list: 1.0. Empty list: 0.0.
        - anything else: 1.0 unless it is None.
        """
        if isinstance(data, dict):
            total_fields = len(data)
            complete_fields = sum(1 for v in data.values() if v is not None and v != "")
            return complete_fields / total_fields if total_fields > 0 else 0.0
        elif isinstance(data, list):
            if not data:
                return 0.0
            if isinstance(data[0], dict):
                return np.mean([self._check_completeness(item) for item in data])
            return 1.0
        return 1.0 if data is not None else 0.0

    def _check_consistency(self, data: Any, api_name: str) -> float:
        """Score how uniform the key sets of a list of records are (0-1).

        The first record's keys are compared with the keys of each of the next
        five dict records, using Jaccard similarity (intersection / union). The
        mean is returned. Non-lists, lists with fewer than two items, and lists
        whose first item is not a dict score 1.0.

        ``api_name`` is accepted for interface compatibility and is unused.
        """
        if not isinstance(data, list) or len(data) < 2:
            return 1.0
        first_item = data[0]
        if not isinstance(first_item, dict):
            return 1.0
        first_keys = set(first_item.keys())
        consistency_scores = []
        for item in data[1:6]:  # items 2-6, each compared against the first
            if isinstance(item, dict):
                item_keys = set(item.keys())
                union = first_keys | item_keys
                # Two empty dicts share the same (empty) structure; the
                # original code divided by zero here.
                similarity = len(first_keys & item_keys) / len(union) if union else 1.0
                consistency_scores.append(similarity)
        return float(np.mean(consistency_scores)) if consistency_scores else 1.0

    def _assess_structure(self, data: Any) -> float:
        """Heuristic structure score.

        - dict: 0.8 base, +0.1 if non-empty, +0.1 if any value is a dict or
          list; capped at 1.0.
        - list: 0.9 if non-empty, else 0.5.
        - anything else: 0.6.
        """
        if isinstance(data, dict):
            score = 0.8  # Base score for dictionary
            if len(data) > 0:
                score += 0.1
            if any(isinstance(v, (dict, list)) for v in data.values()):
                score += 0.1  # Bonus for nested structure
            return min(score, 1.0)
        elif isinstance(data, list):
            return 0.9 if data else 0.5
        else:
            return 0.6  # Basic data

    def _detect_anomalies(self, data: Any) -> List[str]:
        """Return human-readable descriptions of suspicious values.

        For dicts, flags None values, strings over 1000 characters and numbers
        with absolute value over 1e10. For lists, flags more than 10000 items
        and type mismatches among the first 10 items.
        """
        anomalies = []
        if isinstance(data, dict):
            for key, value in data.items():
                if value is None:
                    anomalies.append(f"Null value in field: {key}")
                elif isinstance(value, str) and len(value) > 1000:
                    anomalies.append(f"Unusually long string in field: {key}")
                elif isinstance(value, (int, float)) and abs(value) > 1e10:
                    anomalies.append(f"Extreme numeric value in field: {key}")
        elif isinstance(data, list):
            if len(data) > 10000:
                anomalies.append(f"Very large dataset: {len(data)} items")
            if data:
                first_type = type(data[0])
                if not all(isinstance(item, first_type) for item in data[:10]):
                    anomalies.append("Inconsistent data types in list")
        return anomalies

    def _calculate_overall_grade(self, ai_score: float, completeness: float,
                                 consistency: float, structure: float) -> str:
        """Map the unweighted mean of the four scores to a letter grade."""
        overall_score = (ai_score + completeness + consistency + structure) / 4
        if overall_score >= 0.9:
            return "A+ (Excellent)"
        elif overall_score >= 0.8:
            return "A (Very Good)"
        elif overall_score >= 0.7:
            return "B (Good)"
        elif overall_score >= 0.6:
            return "C (Fair)"
        else:
            return "D (Poor)"

    def _generate_quality_recommendations(self, ai_score: float, completeness: float,
                                          consistency: float, anomalies: List[str]) -> List[str]:
        """Return rule-based improvement hints, or a single all-clear message."""
        recommendations = []
        if ai_score < 0.7:
            recommendations.append("π Consider data validation and cleaning")
        if completeness < 0.8:
            recommendations.append("π Investigate missing data fields")
        if consistency < 0.8:
            recommendations.append("βοΈ Standardize data format across records")
        if len(anomalies) > 3:
            recommendations.append("π¨ Multiple anomalies detected - requires investigation")
        if not recommendations:
            recommendations.append("β Data quality is good - no immediate action needed")
        return recommendations

    def _basic_quality_assessment(self, data: Any, api_name: str) -> Dict:
        """Rule-only fallback report, with the same keys as assess_data_quality.

        Only completeness and structure are computed. The other fields hold
        fixed placeholder values.
        """
        return {
            "ai_quality_score": 0.0,
            "completeness_score": self._check_completeness(data),
            "consistency_score": 0.8,  # Default
            "structure_score": self._assess_structure(data),
            "anomaly_count": 0,
            "anomalies": [],
            "overall_grade": "C (Basic Assessment)",
            "recommendations": ["Install ML libraries for advanced AI assessment"]
        }
class SemanticDataAnalyzer:
    """Find previously seen payloads that are semantically similar to a new one.

    Embeddings are held in memory only, keyed by
    ``f"{api_name}_{YYYYmmdd}_{HHMMSS}"``.
    """

    def __init__(self):
        self.embeddings_model = None
        # key -> 1-D embedding of a previously analysed payload
        self.stored_embeddings = {}
        self._initialize_model()

    def _initialize_model(self):
        """Load the sentence-transformer model if the ML libraries imported.

        On failure the model stays ``None`` and the module-level
        ``ML_AVAILABLE`` flag is cleared.
        """
        global ML_AVAILABLE
        if ML_AVAILABLE:
            try:
                self.embeddings_model = SentenceTransformer('all-MiniLM-L6-v2')
            except Exception:
                ML_AVAILABLE = False
                self.embeddings_model = None

    @staticmethod
    def _api_name_from_key(stored_key: str) -> str:
        """Recover the API name from a stored-embedding key.

        Keys built by ``find_similar_datasets`` end with two underscore-separated
        timestamp parts. Splitting from the right keeps API names that contain
        underscores intact. Keys in any other shape fall back to the
        first-underscore split, or to "unknown" when there is no underscore.
        """
        if stored_key.count("_") >= 2:
            return stored_key.rsplit("_", 2)[0]
        return stored_key.split("_")[0] if "_" in stored_key else "unknown"

    def find_similar_datasets(self, new_data: Any, api_name: str, threshold: float = 0.85) -> List[Dict]:
        """Return stored payloads whose cosine similarity to new_data exceeds threshold.

        Each result is ``{"dataset": key, "similarity": float, "api_name": str}``,
        sorted by similarity, highest first. The new payload's embedding is
        stored only after the comparison, so it never matches itself. Returns []
        when no model is loaded or on any error.

        NOTE(review): two payloads from the same API within the same second
        share a key, so the later embedding overwrites the earlier one. The
        store also grows for the lifetime of this object.
        """
        if not self.embeddings_model:
            return []
        try:
            text_data = self._data_to_text(new_data)
            new_embedding = self.embeddings_model.encode([text_data])
            similar_datasets = []
            for stored_key, stored_embedding in self.stored_embeddings.items():
                similarity = cosine_similarity(new_embedding, [stored_embedding])[0][0]
                if similarity > threshold:
                    similar_datasets.append({
                        "dataset": stored_key,
                        "similarity": float(similarity),
                        "api_name": self._api_name_from_key(stored_key)
                    })
            embedding_key = f"{api_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            self.stored_embeddings[embedding_key] = new_embedding[0]
            return sorted(similar_datasets, key=lambda x: x['similarity'], reverse=True)
        except Exception:
            return []

    def _data_to_text(self, data: Any) -> str:
        """Summarise data as text for embedding.

        - str: first 500 characters.
        - dict: "key: value" pairs for the first 10 keys, each value cut to 100
          characters and joined with " | ".
        - non-empty list: its first item, cut to 500 characters.
        - anything else: ``str()``, cut to 500 characters.
        """
        if isinstance(data, str):
            return data[:500]
        elif isinstance(data, dict):
            text_parts = []
            for key, value in list(data.items())[:10]:  # First 10 keys
                text_parts.append(f"{key}: {str(value)[:100]}")
            return " | ".join(text_parts)
        elif isinstance(data, list) and data:
            return str(data[0])[:500]
        else:
            return str(data)[:500]
class APIHealthMonitor:
    """Score API health from per-call metrics and flag unusual calls.

    For each API name a rolling window of the 50 latest samples is kept.
    Every sample gets a weighted 0-1 health score. From the 10th sample on, an
    IsolationForest refitted on the earlier samples scores the newest one.
    """

    def __init__(self):
        self.anomaly_detector = IsolationForest(contamination=0.1, random_state=42)
        # api_name -> list of metric dicts, oldest first, at most 50 entries
        self.health_history = {}
        # NOTE(review): none of this class's methods ever update this flag.
        self.is_trained = False

    def monitor_api_health(self, api_name: str, response_time: float,
                           success_rate: float, data_size: int) -> Dict:
        """Record one sample for api_name and return its health assessment.

        Args:
            api_name: Key under which the sample history is kept.
            response_time: Latency, presumably in milliseconds (the score
                treats 10000 as its 10 s ceiling).
            success_rate: Fraction of successful calls, expected in 0-1.
            data_size: Payload size, presumably in bytes (1e6 is the
                "1MB reference" for full marks).

        Returns:
            Dict with health_score, status, anomaly_score, is_anomaly,
            recommendations, trend and the recorded metrics.
        """
        sample = {
            "response_time": response_time,
            "success_rate": success_rate,
            "data_size": data_size,
            "timestamp": time.time()
        }
        history = self.health_history.setdefault(api_name, [])
        history.append(sample)
        # Trim to the 50 most recent samples.
        if len(history) > 50:
            self.health_history[api_name] = history[-50:]

        score = self._calculate_health_score(sample)
        anomaly = 0.0
        # Anomaly scoring needs some history to fit against.
        if len(self.health_history[api_name]) >= 10:
            anomaly = self._detect_performance_anomaly(api_name, sample)
        advice = self._generate_health_recommendations(sample, score, anomaly)

        return {
            "health_score": score,
            "status": self._get_health_status(score),
            "anomaly_score": anomaly,
            # NOTE(review): scikit-learn treats any negative decision_function
            # value as an outlier; this stricter -0.5 cut-off flags far fewer
            # samples — confirm this is intended.
            "is_anomaly": anomaly < -0.5,
            "recommendations": advice,
            "trend": self._calculate_trend(api_name),
            "metrics": sample
        }

    def _calculate_health_score(self, metrics: Dict) -> float:
        """Blend latency (40%), success rate (50%) and payload size (10%) into 0-1."""
        # Latency component falls linearly to 0 at 10000 (10 s).
        latency_component = max(0, 1 - (metrics["response_time"] / 10000))
        # Size component saturates at 1e6 (1MB reference).
        size_component = min(1.0, metrics["data_size"] / 1000000)
        blended = 0.4 * latency_component + 0.5 * metrics["success_rate"] + 0.1 * size_component
        return max(0, min(1, blended))

    def _detect_performance_anomaly(self, api_name: str, current_metrics: Dict) -> float:
        """Score current_metrics with an IsolationForest fitted on earlier samples.

        Returns the raw decision_function value, where lower means more
        anomalous. Returns 0.0 with fewer than 5 earlier samples or on any
        error (best-effort).
        """
        try:
            earlier = self.health_history[api_name][:-1]  # exclude the newest sample
            features = [
                [past["response_time"], past["success_rate"], past["data_size"]]
                for past in earlier
            ]
            if len(features) >= 5:
                self.anomaly_detector.fit(features)
                probe = [[
                    current_metrics["response_time"],
                    current_metrics["success_rate"],
                    current_metrics["data_size"]
                ]]
                return float(self.anomaly_detector.decision_function(probe)[0])
        except Exception:
            pass  # Anomaly detection is best-effort.
        return 0.0

    def _get_health_status(self, health_score: float) -> str:
        """Map a health score to a status label (thresholds 0.9 / 0.7 / 0.5)."""
        bands = (
            (0.9, "π’ Excellent"),
            (0.7, "π‘ Good"),
            (0.5, "π Fair"),
        )
        for cutoff, label in bands:
            if health_score >= cutoff:
                return label
        return "π΄ Poor"

    def _generate_health_recommendations(self, metrics: Dict, health_score: float,
                                         anomaly_score: float) -> List[str]:
        """Return the advice lines whose trigger conditions hold, or an all-clear line."""
        checks = (
            (metrics["response_time"] > 5000, "β±οΈ High response time detected - consider caching"),
            (metrics["success_rate"] < 0.9, "β Low success rate - check API status"),
            (anomaly_score < -0.5, "π¨ Performance anomaly detected - investigate"),
            (health_score < 0.6, "β οΈ Overall poor health - consider alternatives"),
        )
        advice = [message for triggered, message in checks if triggered]
        return advice or ["β API performing well"]

    def _calculate_trend(self, api_name: str) -> str:
        """Classify the slope of a linear fit over the last 5 health scores."""
        history = self.health_history.get(api_name)
        if not history or len(history) < 5:
            return "π Insufficient data"
        window = [self._calculate_health_score(sample) for sample in history[-5:]]
        if len(window) < 3:
            return "π Monitoring"
        slope = np.polyfit(range(len(window)), window, 1)[0]
        if slope > 0.02:
            return "π Improving"
        if slope < -0.02:
            return "π Declining"
        return "β‘οΈ Stable"
# Initialize AI components.
# The quality assessor and the semantic analyzer depend on the optional
# transformers / sentence-transformers stack, so they exist only when it
# imported. The health monitor needs only scikit-learn's IsolationForest and
# numpy, which are imported unconditionally, so it is always created.
# Previously it was disabled whenever transformers was missing.
if ML_AVAILABLE:
    ai_quality_assessor = AIDataQualityAssessor()
    semantic_analyzer = SemanticDataAnalyzer()
else:
    ai_quality_assessor = None
    semantic_analyzer = None
health_monitor = APIHealthMonitor()
# Simplified API Configuration - Real working endpoints
#
# Each entry maps an internal key to:
#   name         display label
#   description  human-readable description
#   endpoints    list of {"url", "headers", "method"} plus, for POST
#                endpoints, a JSON "data" body (PxWeb-style queries for SCB/CSN)
#   rate_limit   None, or {"requests": N, "per_seconds": S}
#                (presumably "at most N requests per S seconds" — confirm
#                against the code that consumes this config)
# NOTE(review): the display-name prefixes look like mis-encoded emoji (for
# example "πΈπͺ" matches the UTF-8 bytes of the Swedish flag decoded as
# ISO-8859-7) — confirm and restore the intended characters.

# Swecris requires a bearer key. The default is the value that was hard-coded
# here. Its year-stamped form suggests it rotates, so allow overriding it via the
# SWECRIS_API_KEY environment variable instead of editing the source.
_SWECRIS_API_KEY = os.environ.get("SWECRIS_API_KEY", "VRSwecrisAPI2025-1")

SIMPLIFIED_API_CONFIG = {
    "Skolverket": {
        "name": "πΈπͺ Skolverket",
        "description": "Swedish National Agency for Education",
        "endpoints": [
            {
                "url": "https://api.skolverket.se/planned-educations/v3",
                "headers": {"Accept": "application/vnd.skolverket.plannededucations.api.v3.hal+json"},
                "method": "GET"
            },
            {
                "url": "https://api.skolverket.se/skolenhetsregister/v2/skolenhet",
                "headers": {"Accept": "application/json"},
                "method": "GET"
            }
        ],
        "rate_limit": None
    },
    "SCB": {
        "name": "πΈπͺ Statistics Sweden",
        "description": "Swedish National Statistics Office",
        "endpoints": [
            {
                "url": "https://api.scb.se/OV0104/v1/doris/sv/ssd/START/BE/BE0101/BE0101A/BefolkningNy",
                "headers": {"Content-Type": "application/json"},
                "method": "POST",
                "data": {
                    "query": [
                        {"code": "Region", "selection": {"filter": "item", "values": ["00"]}},
                        {"code": "Civilstand", "selection": {"filter": "item", "values": ["TOT"]}},
                        {"code": "Alder", "selection": {"filter": "item", "values": ["tot"]}},
                        {"code": "Kon", "selection": {"filter": "item", "values": ["1", "2"]}},
                        {"code": "ContentsCode", "selection": {"filter": "item", "values": ["BE0101N1"]}},
                        {"code": "Tid", "selection": {"filter": "item", "values": ["2023"]}}
                    ],
                    "response": {"format": "json"}
                }
            }
        ],
        "rate_limit": {"requests": 10, "per_seconds": 10}
    },
    "Kolada": {
        "name": "πΈπͺ Kolada",
        "description": "Municipal Key Performance Indicators",
        "endpoints": [
            {
                "url": "https://api.kolada.se/v2/municipality",
                "headers": {"Accept": "application/json"},
                "method": "GET"
            },
            {
                "url": "https://api.kolada.se/v2/kpi",
                "headers": {"Accept": "application/json"},
                "method": "GET"
            }
        ],
        "rate_limit": None
    },
    "Eurostat": {
        "name": "πͺπΊ Eurostat",
        "description": "European Union Statistics",
        "endpoints": [
            {
                "url": "https://ec.europa.eu/eurostat/api/dissemination/statistics/1.0/data/demo_pjan?format=JSON&lang=en&geo=EU27_2020&age=TOTAL&sex=T&time=2023",
                "headers": {"Accept": "application/json"},
                "method": "GET"
            }
        ],
        "rate_limit": None
    },
    "WHO": {
        "name": "π WHO",
        "description": "World Health Organization",
        "endpoints": [
            {
                "url": "https://ghoapi.azureedge.net/api/WHOSIS_000001",
                "headers": {"Accept": "application/json"},
                "method": "GET"
            },
            {
                "url": "https://ghoapi.azureedge.net/api/Dimension",
                "headers": {"Accept": "application/json"},
                "method": "GET"
            }
        ],
        "rate_limit": None
    },
    "OECD": {
        "name": "π OECD",
        "description": "Organisation for Economic Co-operation and Development",
        "endpoints": [
            {
                "url": "https://sdmx.oecd.org/public/rest/data/OECD.SDD.NAD,DSD_NAMAIN1@DF_QNA,1.0/AUS.B1GQ.C.Q?format=jsondata",
                "headers": {"Accept": "application/vnd.sdmx.data+json;version=1.0.0"},
                "method": "GET"
            }
        ],
        "rate_limit": None
    },
    "WorldBank": {
        "name": "π World Bank",
        "description": "International Financial Institution",
        "endpoints": [
            {
                "url": "https://api.worldbank.org/v2/country?format=json&per_page=50",
                "headers": {"Accept": "application/json"},
                "method": "GET"
            },
            {
                "url": "https://api.worldbank.org/v2/indicator/SP.POP.TOTL?format=json&date=2023&per_page=50",
                "headers": {"Accept": "application/json"},
                "method": "GET"
            }
        ],
        "rate_limit": None
    },
    "Riksbanken": {
        "name": "πΈπͺ Riksbanken",
        "description": "Swedish Central Bank",
        "endpoints": [
            {
                "url": "https://api.riksbank.se/swea/v1/Observations/SEKEURPMI/2023-01-01/2023-12-31",
                "headers": {"Accept": "application/json"},
                "method": "GET"
            }
        ],
        "rate_limit": {"requests": 5, "per_seconds": 60}
    },
    "Swecris": {
        "name": "πΈπͺ Swecris",
        "description": "Swedish Research Council Database",
        "endpoints": [
            {
                "url": "https://swecris-api.vr.se/v1/projects?size=50",
                "headers": {
                    "Accept": "application/json",
                    "Authorization": f"Bearer {_SWECRIS_API_KEY}"
                },
                "method": "GET"
            }
        ],
        "rate_limit": None
    },
    "CSN": {
        "name": "πΈπͺ CSN",
        "description": "Swedish Board of Student Finance",
        "endpoints": [
            {
                "url": "https://statistik.csn.se/PXWeb/api/v1/sv/CSNstat/StudiebidragGymnasieskola/SS0101B1.px",
                "headers": {"Content-Type": "application/json"},
                "method": "POST",
                "data": {
                    "query": [
                        {"code": "Region", "selection": {"filter": "item", "values": ["00"]}},
                        {"code": "ContentsCode", "selection": {"filter": "item", "values": ["SS0101B1"]}},
                        {"code": "Tid", "selection": {"filter": "item", "values": ["2023"]}}
                    ],
                    "response": {"format": "json"}
                }
            }
        ],
        "rate_limit": None
    }
}
| def init_enhanced_database(): | |
| """Initialize optimized SQLite database with comprehensive schema and performance enhancements""" | |
| conn = sqlite3.connect(DB_PATH) | |
| cursor = conn.cursor() | |
| # Enable WAL mode for better concurrency and performance | |
| cursor.execute('PRAGMA journal_mode=WAL') | |
| cursor.execute('PRAGMA synchronous=NORMAL') | |
| cursor.execute('PRAGMA cache_size=10000') | |
| cursor.execute('PRAGMA temp_store=MEMORY') | |
| cursor.execute('PRAGMA mmap_size=268435456') # 256MB | |
| # Enhanced endpoints table with better indexing | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS discovered_endpoints ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| api_name TEXT NOT NULL, | |
| endpoint_path TEXT NOT NULL, | |
| full_url TEXT NOT NULL, | |
| discovery_method TEXT, | |
| depth_level INTEGER DEFAULT 0, | |
| parent_endpoint TEXT, | |
| endpoint_type TEXT, | |
| last_checked TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| is_active BOOLEAN DEFAULT 1, | |
| response_format TEXT, | |
| parameters_schema TEXT, | |
| estimated_records INTEGER DEFAULT 0, | |
| last_fetch_status TEXT, | |
| creation_date DATE DEFAULT (date('now')), | |
| UNIQUE(api_name, endpoint_path) | |
| ) | |
| ''') | |
| # Create indexes for endpoints table | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_endpoints_api_name ON discovered_endpoints(api_name)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_endpoints_active ON discovered_endpoints(is_active)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_endpoints_last_checked ON discovered_endpoints(last_checked)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_endpoints_depth ON discovered_endpoints(depth_level)') | |
| # Optimized data storage table with compression and partitioning support | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS harvested_data ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| api_name TEXT NOT NULL, | |
| endpoint_path TEXT NOT NULL, | |
| data_hash TEXT UNIQUE NOT NULL, | |
| raw_data_compressed BLOB, | |
| processed_data_compressed BLOB, | |
| raw_data_size INTEGER, | |
| processed_data_size INTEGER, | |
| record_count INTEGER DEFAULT 0, | |
| data_size_bytes INTEGER DEFAULT 0, | |
| fetch_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| fetch_duration_ms INTEGER DEFAULT 0, | |
| status TEXT DEFAULT 'success', | |
| error_message TEXT, | |
| session_id TEXT, | |
| quality_score REAL DEFAULT 0.0, | |
| health_score REAL DEFAULT 0.0, | |
| similar_datasets TEXT DEFAULT '[]', | |
| data_format TEXT, | |
| api_version TEXT, | |
| fetch_date DATE DEFAULT (date('now')), | |
| last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| access_count INTEGER DEFAULT 1, | |
| CHECK (status IN ('success', 'error', 'partial', 'timeout')) | |
| ) | |
| ''') | |
| # Create comprehensive indexes for data table | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_api_name ON harvested_data(api_name)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_fetch_date ON harvested_data(fetch_date)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_status ON harvested_data(status)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_session ON harvested_data(session_id)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_hash ON harvested_data(data_hash)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_quality ON harvested_data(quality_score)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_records ON harvested_data(record_count)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_data_size ON harvested_data(data_size_bytes)') | |
| # Enhanced session management table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS harvest_sessions ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| session_id TEXT UNIQUE NOT NULL, | |
| session_name TEXT, | |
| started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| completed_at TIMESTAMP, | |
| total_endpoints INTEGER DEFAULT 0, | |
| processed_endpoints INTEGER DEFAULT 0, | |
| successful_fetches INTEGER DEFAULT 0, | |
| failed_fetches INTEGER DEFAULT 0, | |
| total_records INTEGER DEFAULT 0, | |
| total_data_size INTEGER DEFAULT 0, | |
| session_status TEXT DEFAULT 'active', | |
| current_api TEXT, | |
| current_endpoint TEXT, | |
| session_config TEXT, | |
| error_count INTEGER DEFAULT 0, | |
| avg_fetch_time REAL DEFAULT 0.0, | |
| session_type TEXT DEFAULT 'manual', | |
| priority INTEGER DEFAULT 1, | |
| CHECK (session_status IN ('active', 'paused', 'completed', 'failed', 'cancelled')) | |
| ) | |
| ''') | |
| # Create indexes for sessions table | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_status ON harvest_sessions(session_status)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_started ON harvest_sessions(started_at)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_activity ON harvest_sessions(last_activity)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_sessions_priority ON harvest_sessions(priority)') | |
| # Enhanced discovery progress table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS discovery_progress ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| api_name TEXT NOT NULL, | |
| discovery_session TEXT, | |
| started_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| completed_at TIMESTAMP, | |
| endpoints_found INTEGER DEFAULT 0, | |
| depth_reached INTEGER DEFAULT 0, | |
| discovery_status TEXT DEFAULT 'running', | |
| discovery_config TEXT, | |
| errors_encountered INTEGER DEFAULT 0, | |
| success_rate REAL DEFAULT 0.0, | |
| estimated_total INTEGER DEFAULT 0, | |
| CHECK (discovery_status IN ('running', 'completed', 'failed', 'paused')) | |
| ) | |
| ''') | |
| # Create indexes for discovery table | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_discovery_api ON discovery_progress(api_name)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_discovery_status ON discovery_progress(discovery_status)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_discovery_started ON discovery_progress(started_at)') | |
| # Data quality and metadata table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS data_quality_metrics ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| data_id INTEGER REFERENCES harvested_data(id), | |
| api_name TEXT NOT NULL, | |
| completeness_score REAL DEFAULT 0.0, | |
| consistency_score REAL DEFAULT 0.0, | |
| accuracy_score REAL DEFAULT 0.0, | |
| timeliness_score REAL DEFAULT 0.0, | |
| overall_quality REAL DEFAULT 0.0, | |
| anomalies_detected INTEGER DEFAULT 0, | |
| anomaly_details TEXT, | |
| validation_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| validation_rules_version TEXT DEFAULT '1.0' | |
| ) | |
| ''') | |
| # Create quality metrics indexes | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_quality_api ON data_quality_metrics(api_name)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_quality_overall ON data_quality_metrics(overall_quality)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_quality_timestamp ON data_quality_metrics(validation_timestamp)') | |
| # API performance tracking table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS api_performance_log ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| api_name TEXT NOT NULL, | |
| endpoint_path TEXT NOT NULL, | |
| response_time_ms INTEGER, | |
| response_size_bytes INTEGER, | |
| http_status_code INTEGER, | |
| success BOOLEAN, | |
| error_type TEXT, | |
| timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| date_only DATE DEFAULT (date('now')) | |
| ) | |
| ''') | |
| # Create performance indexes | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_perf_api_date ON api_performance_log(api_name, date_only)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_perf_success ON api_performance_log(success)') | |
| cursor.execute('CREATE INDEX IF NOT EXISTS idx_perf_response_time ON api_performance_log(response_time_ms)') | |
| # Data archival management table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS data_archive_log ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| original_data_id INTEGER, | |
| archive_path TEXT, | |
| archive_format TEXT DEFAULT 'gzip', | |
| archived_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| original_size INTEGER, | |
| compressed_size INTEGER, | |
| compression_ratio REAL, | |
| checksum TEXT, | |
| retention_date DATE, | |
| archive_status TEXT DEFAULT 'active' | |
| ) | |
| ''') | |
| # Create views for common queries | |
| cursor.execute(''' | |
| CREATE VIEW IF NOT EXISTS v_api_summary AS | |
| SELECT | |
| api_name, | |
| COUNT(*) as total_fetches, | |
| COUNT(CASE WHEN status = 'success' THEN 1 END) as successful_fetches, | |
| SUM(record_count) as total_records, | |
| SUM(data_size_bytes) as total_data_size, | |
| AVG(fetch_duration_ms) as avg_fetch_time, | |
| AVG(quality_score) as avg_quality_score, | |
| MAX(fetch_timestamp) as last_fetch, | |
| MIN(fetch_timestamp) as first_fetch | |
| FROM harvested_data | |
| GROUP BY api_name | |
| ''') | |
| cursor.execute(''' | |
| CREATE VIEW IF NOT EXISTS v_session_summary AS | |
| SELECT | |
| session_id, | |
| session_name, | |
| session_status, | |
| started_at, | |
| completed_at, | |
| total_endpoints, | |
| processed_endpoints, | |
| successful_fetches, | |
| failed_fetches, | |
| total_records, | |
| total_data_size, | |
| CASE | |
| WHEN total_endpoints > 0 THEN | |
| ROUND((processed_endpoints * 100.0) / total_endpoints, 2) | |
| ELSE 0 | |
| END as completion_percentage, | |
| CASE | |
| WHEN processed_endpoints > 0 THEN | |
| ROUND((successful_fetches * 100.0) / processed_endpoints, 2) | |
| ELSE 0 | |
| END as success_percentage | |
| FROM harvest_sessions | |
| ''') | |
| # Enable automatic statistics collection | |
| cursor.execute('PRAGMA optimize') | |
| conn.commit() | |
| conn.close() | |
| # Database optimization and maintenance functions | |
def optimize_database(db_path: Optional[str] = None) -> bool:
    """Refresh planner statistics and reclaim free pages in the database.

    Args:
        db_path: Database file to maintain. Defaults to the module-level
            ``DB_PATH`` when omitted.

    Returns:
        True when every maintenance step succeeded, False when SQLite reported
        an error (for example the database is locked by another connection).
    """
    conn = sqlite3.connect(DB_PATH if db_path is None else db_path)
    try:
        cursor = conn.cursor()

        # Update statistics used by the query planner.
        cursor.execute('ANALYZE')

        # Reclaim space. Switching a database that was created with
        # auto_vacuum=NONE (0) to INCREMENTAL only takes effect after a full
        # VACUUM, so that one-time conversion needs VACUUM. Once the database
        # is in incremental mode, incremental_vacuum releases free pages cheaply.
        (vacuum_mode,) = cursor.execute('PRAGMA auto_vacuum').fetchone()
        cursor.execute('PRAGMA auto_vacuum=INCREMENTAL')
        if vacuum_mode == 0:
            conn.commit()  # VACUUM cannot run inside an open transaction
            cursor.execute('VACUUM')
        else:
            # incremental_vacuum frees one page per statement step; fetchall()
            # drives it to completion so every free page is released.
            cursor.execute('PRAGMA incremental_vacuum').fetchall()

        # Let SQLite apply any further optimisations it considers worthwhile.
        cursor.execute('PRAGMA optimize')
        conn.commit()
        return True
    except sqlite3.Error:
        return False
    finally:
        conn.close()
def get_database_stats(db_path: Optional[str] = None) -> Dict[str, Any]:
    """Collect row counts, file size, quality averages and recent activity.

    Args:
        db_path: Database file to inspect. Defaults to the module-level
            ``DB_PATH`` when omitted.

    Returns:
        Dict with ``<table>_count`` for the four core tables,
        ``database_size_mb``, ``avg_quality_score`` / ``avg_health_score``
        (averaged over rows whose status is 'success'; 0 when there are none)
        and ``recent_fetches_24h``.

    Raises:
        sqlite3.OperationalError: if one of the queried tables is missing.
    """
    conn = sqlite3.connect(DB_PATH if db_path is None else db_path)
    try:
        cursor = conn.cursor()
        stats: Dict[str, Any] = {}

        # Table names come from this fixed allow-list, so interpolation is safe.
        for table in ('discovered_endpoints', 'harvested_data',
                      'harvest_sessions', 'discovery_progress'):
            cursor.execute(f'SELECT COUNT(*) FROM {table}')
            stats[f'{table}_count'] = cursor.fetchone()[0]

        # Database size = pages * bytes per page.
        page_count = cursor.execute('PRAGMA page_count').fetchone()[0]
        page_size = cursor.execute('PRAGMA page_size').fetchone()[0]
        stats['database_size_mb'] = round((page_count * page_size) / (1024 * 1024), 2)

        # 'success' must be single-quoted: in SQL a double-quoted token is an
        # identifier and would bind to a column of that name if one exists.
        avg_quality, avg_health = cursor.execute(
            "SELECT AVG(quality_score), AVG(health_score) "
            "FROM harvested_data WHERE status = 'success'"
        ).fetchone()
        stats['avg_quality_score'] = round(avg_quality or 0, 3)
        stats['avg_health_score'] = round(avg_health or 0, 3)

        # Recent activity
        cursor.execute('''
            SELECT COUNT(*) FROM harvested_data
            WHERE fetch_timestamp > datetime('now', '-24 hours')
        ''')
        stats['recent_fetches_24h'] = cursor.fetchone()[0]
        return stats
    finally:
        conn.close()
def compress_old_data(days_old=30, db_path: Optional[str] = None) -> int:
    """Gzip-compress harvested payloads older than *days_old* days.

    Rows of ``harvested_data`` whose ``fetch_timestamp`` is older than the
    cutoff and that have no ``raw_data_compressed`` yet get their
    ``raw_data`` / ``processed_data`` text gzip-compressed into the
    ``*_compressed`` columns. The text columns are cleared and the original
    UTF-8 byte lengths are stored in ``raw_data_size`` / ``processed_data_size``.

    Args:
        days_old: Age threshold in days (any value ``float()`` accepts).
        db_path: Database file to process. Defaults to the module-level
            ``DB_PATH`` when omitted.

    Returns:
        Number of rows that were rewritten.

    Raises:
        ValueError / TypeError: if *days_old* is not numeric. The value is
            validated instead of being spliced into SQL.
    """
    import gzip

    # Validate and bind as a parameter; never format caller input into SQL.
    age_modifier = f'-{float(days_old)} days'

    conn = sqlite3.connect(DB_PATH if db_path is None else db_path)
    try:
        cursor = conn.cursor()
        cursor.execute('''
            SELECT id, raw_data, processed_data
            FROM harvested_data
            WHERE fetch_timestamp < datetime('now', ?)
            AND raw_data_compressed IS NULL
        ''', (age_modifier,))
        old_records = cursor.fetchall()

        compressed_count = 0
        for record_id, raw_data, processed_data in old_records:
            try:
                raw_bytes = raw_data.encode('utf-8') if raw_data else None
                processed_bytes = processed_data.encode('utf-8') if processed_data else None
                cursor.execute('''
                    UPDATE harvested_data
                    SET raw_data_compressed = ?,
                        processed_data_compressed = ?,
                        raw_data = NULL,
                        processed_data = NULL,
                        raw_data_size = ?,
                        processed_data_size = ?
                    WHERE id = ?
                ''', (
                    gzip.compress(raw_bytes) if raw_bytes is not None else None,
                    gzip.compress(processed_bytes) if processed_bytes is not None else None,
                    # Sizes in bytes, not characters (they differ for å/ä/ö etc.)
                    len(raw_bytes) if raw_bytes is not None else 0,
                    len(processed_bytes) if processed_bytes is not None else 0,
                    record_id,
                ))
            except (AttributeError, UnicodeError, sqlite3.Error):
                # Best effort: skip rows whose payload is not text or can't be updated.
                continue
            compressed_count += 1

        conn.commit()
        return compressed_count
    finally:
        conn.close()
def backup_database(backup_path=None, db_path: Optional[str] = None):
    """Write a consistent snapshot of the database to *backup_path*.

    Uses SQLite's online backup API rather than a raw file copy. A file copy
    can capture a half-written page set and misses committed data that still
    lives in a ``-wal`` file.

    Args:
        backup_path: Destination file. Defaults to
            ``backup_harvester_<YYYYmmdd_HHMMSS>.db`` in the working directory.
        db_path: Source database. Defaults to the module-level ``DB_PATH``.

    Returns:
        The backup path on success, or None if the source file does not exist
        or SQLite / the filesystem reported an error.
    """
    from datetime import datetime

    source_path = DB_PATH if db_path is None else db_path
    if backup_path is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = f"backup_harvester_{timestamp}.db"

    # sqlite3.connect() would silently create an empty source database.
    if not os.path.isfile(source_path):
        return None

    source = dest = None
    try:
        source = sqlite3.connect(source_path)
        dest = sqlite3.connect(backup_path)
        source.backup(dest)
        return backup_path
    except (sqlite3.Error, OSError):
        return None
    finally:
        for conn in (dest, source):
            if conn is not None:
                conn.close()
class SimplifiedDataHarvester:
    """Simplified data harvester - one function to fetch from all APIs.

    ``fetch_all_apis`` walks every entry of ``SIMPLIFIED_API_CONFIG``. For each
    endpoint the response is parsed (JSON, XML or raw text), reduced to its
    API-specific payload, saved via ``_save_data_to_db`` and summarised.
    """

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Simplified-Data-Harvester/1.0 (Research & Analysis)'
        })
        # Outcome of the most recent fetch_all_apis() run, keyed by API name.
        self.results = {}
        self.errors = {}

    def fetch_all_apis(self, progress_callback=None) -> Dict:
        """Fetch every configured API and return a run summary.

        Args:
            progress_callback: Optional callable that receives human-readable
                status strings as the run progresses.

        Returns:
            Dict with ``results`` (per-API endpoint summaries), ``errors``
            (API name -> message for APIs whose fetch raised), ``session_id``
            and a ``summary`` with counts and ``success_rate`` in percent
            (0.0 when no APIs are configured).
        """
        # Fresh maps for every run. Reusing the previous run's dicts would
        # double-count APIs (success_rate above 100%) and keep stale entries.
        self.results = {}
        self.errors = {}
        session_id = f"simplified_{int(time.time())}"
        total_apis = len(SIMPLIFIED_API_CONFIG)
        completed = 0
        if progress_callback:
            progress_callback(f"π Starting comprehensive data collection from {total_apis} APIs...")
        for api_name, config in SIMPLIFIED_API_CONFIG.items():
            if progress_callback:
                progress_callback(f"π Fetching from {config['name']}...")
            try:
                api_results = self._fetch_api_data(api_name, config, session_id)
                self.results[api_name] = api_results
                completed += 1
                if progress_callback:
                    progress = (completed / total_apis) * 100
                    progress_callback(f"β {config['name']} completed ({progress:.1f}%)")
                # Apply rate limiting if specified
                if config.get('rate_limit'):
                    rate_limit = config['rate_limit']
                    sleep_time = rate_limit['per_seconds'] / rate_limit['requests']
                    time.sleep(sleep_time)
                else:
                    time.sleep(0.5)  # Default delay between APIs
            except Exception as e:
                self.errors[api_name] = str(e)
                if progress_callback:
                    progress_callback(f"β {config['name']} failed: {str(e)[:50]}...")
                completed += 1
        successful = len(self.results)
        failed = len(self.errors)
        if progress_callback:
            progress_callback(f"π Collection complete! β {successful} successful, β {failed} failed")
        return {
            "results": self.results,
            "errors": self.errors,
            "session_id": session_id,
            "summary": {
                "total_apis": total_apis,
                "successful": successful,
                "failed": failed,
                # Guard against an empty configuration (ZeroDivisionError).
                "success_rate": (successful / total_apis) * 100 if total_apis else 0.0
            }
        }

    def _fetch_api_data(self, api_name: str, config: Dict, session_id: str) -> Dict:
        """Fetch every endpoint listed in ``config['endpoints']`` for one API.

        Endpoint failures are recorded as ``status: "error"`` entries rather
        than raised. Only successful fetches are persisted.
        """
        api_results = {
            "api_name": api_name,
            "endpoints": [],
            "total_records": 0,
            "total_size": 0
        }
        for i, endpoint in enumerate(config['endpoints']):
            try:
                start_time = time.time()
                # Make request
                if endpoint.get('method', 'GET').upper() == 'POST':
                    response = self.session.post(
                        endpoint['url'],
                        headers=endpoint.get('headers', {}),
                        json=endpoint.get('data', {}),
                        timeout=30
                    )
                else:
                    response = self.session.get(
                        endpoint['url'],
                        headers=endpoint.get('headers', {}),
                        timeout=30
                    )
                response.raise_for_status()
                # Process response
                data = self._process_response(response, api_name)
                fetch_duration = int((time.time() - start_time) * 1000)
                # Extract meaningful data
                processed_data = self._extract_api_data(data, api_name)
                record_count = self._count_records(processed_data)
                data_size = len(response.content)
                # Save to database
                endpoint_path = f"endpoint_{i+1}"
                self._save_data_to_db(
                    api_name, endpoint_path, processed_data, session_id,
                    fetch_duration, record_count, data_size, "success"
                )
                endpoint_result = {
                    "endpoint_url": endpoint['url'],
                    "status": "success",
                    "records": record_count,
                    "size_bytes": data_size,
                    "duration_ms": fetch_duration,
                    "data_preview": self._create_data_preview(processed_data)
                }
                api_results["endpoints"].append(endpoint_result)
                api_results["total_records"] += record_count
                api_results["total_size"] += data_size
            except Exception as e:
                endpoint_result = {
                    "endpoint_url": endpoint['url'],
                    "status": "error",
                    "error": str(e),
                    "records": 0,
                    "size_bytes": 0,
                    "duration_ms": 0
                }
                api_results["endpoints"].append(endpoint_result)
        return api_results

    def _process_response(self, response, api_name: str):
        """Decode a response body according to its Content-Type.

        JSON types are decoded with ``response.json()``. XML types go through
        ``_xml_to_dict``. Anything else is tried as JSON and otherwise wrapped
        as ``{"raw_content": text}``.
        """
        content_type = response.headers.get('content-type', '').lower()
        if 'json' in content_type:
            return response.json()
        if 'xml' in content_type:
            return self._xml_to_dict(response.text)
        try:
            return response.json()  # Try JSON first
        except ValueError:
            # JSON decode errors (including requests' own) subclass ValueError.
            return {"raw_content": response.text}

    def _xml_to_dict(self, xml_text: str) -> Any:
        """Convert XML text to Python data, or ``{"raw_xml": text}`` if unparseable."""
        # NOTE(review): xml.etree is not hardened against malicious XML
        # (e.g. entity expansion); consider defusedxml for untrusted feeds.
        try:
            root = ET.fromstring(xml_text)
            return self._element_to_dict(root)
        except (ET.ParseError, ValueError, RecursionError):
            return {"raw_xml": xml_text}

    def _element_to_dict(self, element) -> Any:
        """Convert an XML element into plain Python data.

        A leaf with text and no attributes becomes its stripped text.
        Otherwise the result is a dict of the element's attributes, plus a
        ``text`` key for non-blank text and one key per child tag. Repeated
        tags are collected into a list.
        """
        result = dict(element.attrib)
        text = element.text.strip() if element.text else ''
        if text:
            # Only collapse to a bare string when nothing else would be lost.
            if len(element) == 0 and not result:
                return text
            result['text'] = text
        for child in element:
            child_data = self._element_to_dict(child)
            if child.tag in result:
                if not isinstance(result[child.tag], list):
                    result[child.tag] = [result[child.tag]]
                result[child.tag].append(child_data)
            else:
                result[child.tag] = child_data
        return result

    def _extract_api_data(self, data: Any, api_name: str) -> Any:
        """Extract meaningful data from API response based on API type"""
        if api_name == "Skolverket":
            if isinstance(data, dict):
                if "_embedded" in data:
                    return data["_embedded"]
                elif "skolenheter" in data:
                    return data["skolenheter"]
            return data
        elif api_name == "SCB":
            if isinstance(data, dict):
                return data.get("data", data.get("variables", data))
        elif api_name == "Kolada":
            if isinstance(data, dict):
                return data.get("values", data)
        elif api_name == "Eurostat":
            if isinstance(data, dict):
                return data.get("value", data.get("data", data))
        elif api_name == "WHO":
            if isinstance(data, dict):
                return data.get("value", data.get("fact", data))
        elif api_name == "OECD":
            if isinstance(data, dict):
                if "data" in data:
                    return data["data"]
            return data
        elif api_name == "WorldBank":
            if isinstance(data, list) and len(data) > 1:
                return data[1] if data[1] else data[0]
            return data
        elif api_name == "Riksbanken":
            if isinstance(data, dict):
                return data.get("observations", data.get("data", data))
        elif api_name == "Swecris":
            if isinstance(data, dict):
                return data.get("items", data.get("projects", data))
        elif api_name == "CSN":
            if isinstance(data, dict):
                return data.get("data", data.get("variables", data))
        return data

    def _count_records(self, data: Any) -> int:
        """Count records: list length, first non-empty list inside a dict, else 0/1."""
        if isinstance(data, list):
            return len(data)
        elif isinstance(data, dict):
            # Try to find arrays that represent records
            for value in data.values():
                if isinstance(value, list) and value:
                    return len(value)
            # A dict without record arrays is one record, unless it is empty.
            return 1 if data else 0
        else:
            return 1 if data else 0

    def _create_data_preview(self, data: Any) -> Dict:
        """Create a preview of the data for display"""
        preview = {
            "type": type(data).__name__,
            "sample": None
        }
        if isinstance(data, list):
            preview["length"] = len(data)
            preview["sample"] = data[:3] if len(data) > 3 else data
        elif isinstance(data, dict):
            preview["keys"] = list(data.keys())[:10]
            if data:
                first_key = list(data.keys())[0]
                preview["sample"] = {first_key: data[first_key]}
        else:
            preview["sample"] = str(data)[:200]
        return preview

    def _save_data_to_db(self, api_name: str, endpoint_path: str, data: Any,
                         session_id: str, fetch_duration: int, record_count: int,
                         data_size: int, status: str, error_message: str = None):
        """Persist one endpoint payload, skipping exact duplicates.

        The payload is serialised canonically (sorted keys), and its SHA-256
        ``data_hash`` is used to skip payloads already stored. When the
        response was larger than 1 KiB (*data_size*), the payload is stored
        gzip-compressed in ``raw_data_compressed``; otherwise as text in
        ``raw_data``.

        If the full insert fails with a database error (e.g. an older schema
        lacking some columns), a reduced INSERT OR REPLACE is tried with the
        text truncated to 10 000 characters. Database errors are never raised
        to the caller (best effort). Serialisation errors do propagate.
        """
        import gzip

        # json.dumps' default ensure_ascii keeps data_str pure ASCII, so the
        # encode() calls below cannot fail.
        data_str = json.dumps(data, sort_keys=True, default=str)
        data_hash = hashlib.sha256(data_str.encode()).hexdigest()

        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        try:
            try:
                cursor.execute('SELECT id FROM harvested_data WHERE data_hash = ?', (data_hash,))
                if cursor.fetchone():
                    return  # Skip duplicate
                if data_size > 1024:
                    raw_data, raw_data_compressed = None, gzip.compress(data_str.encode('utf-8'))
                else:
                    raw_data, raw_data_compressed = data_str, None
                cursor.execute('''
                    INSERT INTO harvested_data
                    (api_name, endpoint_path, data_hash, raw_data, raw_data_compressed,
                     record_count, data_size_bytes, fetch_duration_ms, status,
                     error_message, session_id, data_format)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    api_name, endpoint_path, data_hash, raw_data, raw_data_compressed,
                    record_count, data_size, fetch_duration, status, error_message,
                    session_id, self._detect_data_format(data)
                ))
                conn.commit()
            except sqlite3.Error:
                # Fallback to basic schema
                conn.rollback()
                try:
                    cursor.execute('''
                        INSERT OR REPLACE INTO harvested_data
                        (api_name, endpoint_path, data_hash, raw_data, record_count,
                         data_size_bytes, fetch_duration_ms, status, session_id)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    ''', (
                        api_name, endpoint_path, data_hash, data_str[:10000],  # Limit size
                        record_count, data_size, fetch_duration, status, session_id
                    ))
                    conn.commit()
                except sqlite3.Error:
                    pass  # Best effort: harvesting continues even if this row can't be stored
        finally:
            conn.close()

    def _detect_data_format(self, data: Any) -> str:
        """Detect data format"""
        if isinstance(data, dict):
            if "_embedded" in data or "_links" in data:
                return "HAL+JSON"
            elif "query" in data or "variables" in data:
                return "PX-Web"
            else:
                return "JSON"
        elif isinstance(data, list):
            return "JSON-Array"
        else:
            return "Unknown"
| if not config: | |
| return [] | |
| discovered = [] | |
| session_id = f"discovery_{api_name}_{int(time.time())}" | |
| if progress_callback: | |
| progress_callback(f"π Starting deep discovery for {api_name}...") | |
| # Apply rate limiting | |
| self._apply_rate_limit(config) | |
| # Explore each known root recursively | |
| for root_path in config["known_roots"]: | |
| if progress_callback: | |
| progress_callback(f"π Exploring root: {root_path}") | |
| root_endpoints = self._explore_endpoint_recursively( | |
| api_name, config, root_path, 0, config["explore_depth"], progress_callback | |
| ) | |
| discovered.extend(root_endpoints) | |
| # Try to discover through API documentation pages | |
| doc_endpoints = self._discover_from_documentation(api_name, config, progress_callback) | |
| discovered.extend(doc_endpoints) | |
| # Save discovery results | |
| self._save_discovery_results(api_name, session_id, discovered) | |
| if progress_callback: | |
| progress_callback(f"β Discovery complete: {len(discovered)} endpoints found for {api_name}") | |
| return discovered | |
| def _explore_endpoint_recursively(self, api_name: str, config: Dict, path: str, | |
| current_depth: int, max_depth: int, progress_callback=None) -> List[Dict]: | |
| """Recursively explore API endpoints""" | |
| if current_depth >= max_depth: | |
| return [] | |
| discovered = [] | |
| full_url = config["base_url"] + path | |
| try: | |
| # Apply authentication if needed | |
| headers = self._get_auth_headers(config) | |
| response = self.session.get(full_url, headers=headers, timeout=15) | |
| if response.status_code == 200: | |
| # Parse response to find more endpoints | |
| endpoints = self._extract_endpoints_from_response( | |
| api_name, config, response, path, current_depth | |
| ) | |
| for endpoint in endpoints: | |
| discovered.append(endpoint) | |
| # Recursively explore found endpoints | |
| if current_depth < max_depth - 1: | |
| sub_endpoints = self._explore_endpoint_recursively( | |
| api_name, config, endpoint["path"], | |
| current_depth + 1, max_depth, progress_callback | |
| ) | |
| discovered.extend(sub_endpoints) | |
| if progress_callback and discovered: | |
| progress_callback(f"π‘ Found {len(discovered)} endpoints at depth {current_depth}") | |
| except Exception as e: | |
| if progress_callback: | |
| progress_callback(f"β οΈ Error exploring {path}: {str(e)[:100]}") | |
| self._apply_rate_limit(config) | |
| return discovered | |
| def _extract_endpoints_from_response(self, api_name: str, config: Dict, response: requests.Response, | |
| parent_path: str, depth: int) -> List[Dict]: | |
| """Extract endpoint information from API response""" | |
| endpoints = [] | |
| try: | |
| # Try JSON parsing first | |
| if 'application/json' in response.headers.get('Content-Type', ''): | |
| data = response.json() | |
| endpoints.extend(self._extract_from_json(api_name, config, data, parent_path, depth)) | |
| # Parse HTML for documentation links | |
| elif 'text/html' in response.headers.get('Content-Type', ''): | |
| endpoints.extend(self._extract_from_html(api_name, config, response.text, parent_path, depth)) | |
| # Parse XML responses | |
| elif 'xml' in response.headers.get('Content-Type', ''): | |
| endpoints.extend(self._extract_from_xml(api_name, config, response.text, parent_path, depth)) | |
| except Exception as e: | |
| pass # Continue with other extraction methods | |
| return endpoints | |
| def _extract_from_json(self, api_name: str, config: Dict, data: Any, parent_path: str, depth: int) -> List[Dict]: | |
| """Extract endpoints from JSON response""" | |
| endpoints = [] | |
| if isinstance(data, dict): | |
| # Look for common API documentation patterns | |
| if '_links' in data: # HAL format | |
| for link_key, link_data in data['_links'].items(): | |
| if isinstance(link_data, dict) and 'href' in link_data: | |
| endpoint_path = self._normalize_path(link_data['href']) | |
| endpoints.append(self._create_endpoint_info( | |
| api_name, endpoint_path, 'HAL_link', parent_path, depth + 1 | |
| )) | |
| if 'paths' in data: # OpenAPI/Swagger | |
| for path in data['paths'].keys(): | |
| endpoint_path = self._normalize_path(path) | |
| endpoints.append(self._create_endpoint_info( | |
| api_name, endpoint_path, 'OpenAPI', parent_path, depth + 1 | |
| )) | |
| # Look for URL patterns in values | |
| for key, value in data.items() if isinstance(data, dict) else []: | |
| if isinstance(value, str) and self._is_api_path(value, config): | |
| endpoint_path = self._normalize_path(value) | |
| endpoints.append(self._create_endpoint_info( | |
| api_name, endpoint_path, 'JSON_value', parent_path, depth + 1 | |
| )) | |
| elif isinstance(data, list): | |
| for item in data: | |
| endpoints.extend(self._extract_from_json(api_name, config, item, parent_path, depth)) | |
| return endpoints | |
| def _extract_from_html(self, api_name: str, config: Dict, html: str, parent_path: str, depth: int) -> List[Dict]: | |
| """Extract endpoints from HTML documentation""" | |
| endpoints = [] | |
| # Look for API endpoint patterns in HTML | |
| patterns = [ | |
| r'href=["\']([^"\']*(?:api|/v\d+)[^"\']*)["\']', | |
| r'url["\']?\s*[:=]\s*["\']([^"\']*(?:api|/v\d+)[^"\']*)["\']', | |
| r'endpoint["\']?\s*[:=]\s*["\']([^"\']*)["\']' | |
| ] | |
| for pattern in patterns: | |
| matches = re.finditer(pattern, html, re.IGNORECASE) | |
| for match in matches: | |
| potential_path = match.group(1) | |
| if self._is_api_path(potential_path, config): | |
| endpoint_path = self._normalize_path(potential_path) | |
| endpoints.append(self._create_endpoint_info( | |
| api_name, endpoint_path, 'HTML_link', parent_path, depth + 1 | |
| )) | |
| return endpoints | |
| def _extract_from_xml(self, api_name: str, config: Dict, xml_text: str, parent_path: str, depth: int) -> List[Dict]: | |
| """Extract endpoints from XML response""" | |
| endpoints = [] | |
| try: | |
| root = ET.fromstring(xml_text) | |
| # Look for URL attributes and text content | |
| for elem in root.iter(): | |
| # Check attributes | |
| for attr_value in elem.attrib.values(): | |
| if self._is_api_path(attr_value, config): | |
| endpoint_path = self._normalize_path(attr_value) | |
| endpoints.append(self._create_endpoint_info( | |
| api_name, endpoint_path, 'XML_attr', parent_path, depth + 1 | |
| )) | |
| # Check text content | |
| if elem.text and self._is_api_path(elem.text, config): | |
| endpoint_path = self._normalize_path(elem.text) | |
| endpoints.append(self._create_endpoint_info( | |
| api_name, endpoint_path, 'XML_text', parent_path, depth + 1 | |
| )) | |
| except ET.ParseError: | |
| pass | |
| return endpoints | |
| def _discover_from_documentation(self, api_name: str, config: Dict, progress_callback=None) -> List[Dict]: | |
| """Discover endpoints from API documentation pages""" | |
| endpoints = [] | |
| # Common documentation paths | |
| doc_paths = [ | |
| '/docs', '/documentation', '/api-docs', '/swagger', '/openapi', | |
| '/help', '/reference', '/guide', '/v1/docs', '/v2/docs' | |
| ] | |
| for doc_path in doc_paths: | |
| try: | |
| full_url = config["base_url"] + doc_path | |
| headers = self._get_auth_headers(config) | |
| response = self.session.get(full_url, headers=headers, timeout=10) | |
| if response.status_code == 200: | |
| doc_endpoints = self._extract_endpoints_from_response( | |
| api_name, config, response, doc_path, 0 | |
| ) | |
| endpoints.extend(doc_endpoints) | |
| if progress_callback and doc_endpoints: | |
| progress_callback(f"π Found {len(doc_endpoints)} endpoints in documentation") | |
| except Exception: | |
| continue | |
| self._apply_rate_limit(config) | |
| return endpoints | |
| def _is_api_path(self, path: str, config: Dict) -> bool: | |
| """Check if a path looks like a valid API endpoint""" | |
| if not isinstance(path, str) or len(path) < 2: | |
| return False | |
| # Must start with / or be a relative path | |
| if not (path.startswith('/') or not path.startswith('http')): | |
| return False | |
| # Check against discovery patterns | |
| for pattern in config["discovery_patterns"]: | |
| if re.match(pattern, path): | |
| return True | |
| # General API path indicators | |
| api_indicators = ['/api/', '/v1/', '/v2/', '/v3/', '/rest/', '/data/'] | |
| return any(indicator in path.lower() for indicator in api_indicators) | |
| def _normalize_path(self, path: str) -> str: | |
| """Normalize API path""" | |
| # Remove base URL if present | |
| if path.startswith('http'): | |
| parsed = urlparse(path) | |
| path = parsed.path | |
| # Ensure starts with / | |
| if not path.startswith('/'): | |
| path = '/' + path | |
| # Remove trailing slash | |
| if path.endswith('/') and len(path) > 1: | |
| path = path[:-1] | |
| return path | |
def _create_endpoint_info(self, api_name: str, path: str, discovery_method: str,
                          parent_path: str, depth: int) -> Dict:
    """Describe one discovered endpoint as a plain dict.

    ``full_url`` is the API's ``base_url`` from ``DEEP_API_CONFIG`` joined to
    *path* by plain string concatenation. ``discovered_at`` is the local time
    of the call in ISO-8601 form.
    """
    base_url = DEEP_API_CONFIG[api_name]["base_url"]
    return dict(
        api_name=api_name,
        path=path,
        full_url=base_url + path,
        discovery_method=discovery_method,
        parent_path=parent_path,
        depth=depth,
        discovered_at=datetime.now().isoformat(),
    )
| def _get_auth_headers(self, config: Dict) -> Dict: | |
| """Get authentication headers for API""" | |
| headers = {} | |
| auth = config.get("auth") | |
| if auth and auth.get("type") == "Bearer": | |
| headers["Authorization"] = f"Bearer {auth['token']}" | |
| return headers | |
| def _apply_rate_limit(self, config: Dict): | |
| """Apply rate limiting for API""" | |
| rate_limit = config.get("rate_limit") | |
| if rate_limit: | |
| sleep_time = rate_limit["per_seconds"] / rate_limit["requests"] | |
| time.sleep(sleep_time) | |
def _save_discovery_results(self, api_name: str, session_id: str, endpoints: List[Dict]):
    """Persist discovered endpoints and log a completed discovery run.

    Each endpoint is upserted into ``discovered_endpoints`` (INSERT OR
    REPLACE) with ``response_format`` set to ``"unknown"``. One
    ``discovery_progress`` row records *session_id*, the endpoint count and
    status ``completed``. Endpoint dicts must provide ``api_name``, ``path``,
    ``full_url``, ``discovery_method`` and ``depth``; ``parent_path`` is
    optional.
    """
    # Store timestamps as text in the exact format sqlite3's default datetime
    # adapter produced (that adapter is deprecated as of Python 3.12).
    now = datetime.now().isoformat(" ")
    rows = [
        (
            endpoint["api_name"],
            endpoint["path"],
            endpoint["full_url"],
            endpoint["discovery_method"],
            endpoint["depth"],
            endpoint.get("parent_path", ""),
            now,
            "unknown",
        )
        for endpoint in endpoints
    ]
    conn = sqlite3.connect(DB_PATH)
    try:
        cursor = conn.cursor()
        cursor.executemany('''
            INSERT OR REPLACE INTO discovered_endpoints
            (api_name, endpoint_path, full_url, discovery_method, depth_level,
             parent_endpoint, last_checked, response_format)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', rows)
        # Update discovery progress
        cursor.execute('''
            INSERT INTO discovery_progress
            (api_name, discovery_session, completed_at, endpoints_found, discovery_status)
            VALUES (?, ?, ?, ?, ?)
        ''', (api_name, session_id, now, len(endpoints), "completed"))
        conn.commit()
    finally:
        # Close even when an insert fails, so the handle isn't leaked.
        conn.close()
class SessionManager:
    """Manage harvest sessions with resumption capability.

    Sessions are rows of ``harvest_sessions``. The work already done for a
    session is inferred from ``harvested_data`` rows tagged with its id.
    """

    def __init__(self, db_path: Optional[str] = None):
        self.current_session = None
        # None means "use the module-level DB_PATH", resolved at call time.
        self.db_path = db_path

    def _connect(self) -> sqlite3.Connection:
        """Open a connection to the session database."""
        db_path = getattr(self, "db_path", None)
        return sqlite3.connect(DB_PATH if db_path is None else db_path)

    @staticmethod
    def _now() -> str:
        """Current local time as text, formatted like sqlite3's (deprecated) default datetime adapter."""
        # NOTE(review): if harvest_sessions' timestamp columns default to
        # CURRENT_TIMESTAMP (UTC), mixing them with these local times skews
        # ORDER BY last_activity; schema not visible here - confirm.
        return datetime.now().isoformat(" ")

    def create_session(self, session_name: str = None) -> str:
        """Insert a new ``active`` session and make it the current one.

        Args:
            session_name: Display name. Defaults to ``Harvest Session <date time>``.

        Returns:
            The new session id (``session_<unix seconds>``).
        """
        session_id = f"session_{int(time.time())}"
        if not session_name:
            session_name = f"Harvest Session {datetime.now().strftime('%Y-%m-%d %H:%M')}"
        conn = self._connect()
        try:
            conn.execute('''
                INSERT INTO harvest_sessions
                (session_id, session_name, session_status)
                VALUES (?, ?, ?)
            ''', (session_id, session_name, "active"))
            conn.commit()
        finally:
            conn.close()
        self.current_session = session_id
        return session_id

    def get_last_session(self) -> Optional[Dict]:
        """Return the most recently active non-completed session, or None.

        Columns are selected by name rather than by position in ``SELECT *``,
        so the result does not depend on the table's column order.
        """
        conn = self._connect()
        try:
            row = conn.execute('''
                SELECT session_id, session_name, started_at, last_activity,
                       total_endpoints, processed_endpoints, current_api, current_endpoint
                FROM harvest_sessions
                WHERE session_status != 'completed'
                ORDER BY last_activity DESC
                LIMIT 1
            ''').fetchone()
        finally:
            conn.close()
        if row is None:
            return None
        keys = ("session_id", "session_name", "started_at", "last_activity",
                "total_endpoints", "processed_endpoints", "current_api", "current_endpoint")
        return dict(zip(keys, row))

    def resume_session(self, session_id: str) -> Dict:
        """Load a session's progress and make it the current session.

        Returns:
            ``{}`` if the session does not exist. Otherwise a dict with the
            set of already processed ``"api_name:endpoint_path"`` keys (taken
            from ``harvested_data``) and the stored progress counters and
            position.
        """
        conn = self._connect()
        try:
            session = conn.execute('''
                SELECT total_endpoints, processed_endpoints, current_api, current_endpoint
                FROM harvest_sessions
                WHERE session_id = ?
            ''', (session_id,)).fetchone()
            if session is None:
                return {}
            processed = conn.execute('''
                SELECT DISTINCT api_name, endpoint_path
                FROM harvested_data
                WHERE session_id = ?
            ''', (session_id,)).fetchall()
        finally:
            conn.close()

        self.current_session = session_id
        total_endpoints, processed_count, current_api, current_endpoint = session
        return {
            "session_id": session_id,
            "processed_endpoints": {f"{api}:{path}" for api, path in processed},
            "total_endpoints": total_endpoints,
            "processed_count": processed_count,
            "current_api": current_api,
            "current_endpoint": current_endpoint
        }

    def update_session_progress(self, session_id: str, current_api: str,
                                current_endpoint: str, processed_count: int):
        """Record the session's current position and processed count, and touch last_activity."""
        conn = self._connect()
        try:
            conn.execute('''
                UPDATE harvest_sessions
                SET last_activity = ?, current_api = ?, current_endpoint = ?, processed_endpoints = ?
                WHERE session_id = ?
            ''', (self._now(), current_api, current_endpoint, processed_count, session_id))
            conn.commit()
        finally:
            conn.close()

    def complete_session(self, session_id: str):
        """Mark session as completed and stamp completed_at."""
        conn = self._connect()
        try:
            conn.execute('''
                UPDATE harvest_sessions
                SET completed_at = ?, session_status = 'completed'
                WHERE session_id = ?
            ''', (self._now(), session_id))
            conn.commit()
        finally:
            conn.close()
| class UltimateDataHarvester: | |
| """Ultimate data harvester with resumption and intelligent storage""" | |
def __init__(self):
    """Set up the HTTP session, endpoint discoverer and session manager.

    Ends by calling ``init_enhanced_database()`` so the database schema is
    in place before this harvester is used.
    """
    http_session = requests.Session()
    http_session.headers.update(
        {'User-Agent': 'Ultimate-Data-Harvester/2.0 (Comprehensive Research Purpose)'}
    )
    self.session = http_session
    self.discoverer = DeepEndpointDiscoverer()
    self.session_manager = SessionManager()
    init_enhanced_database()
| def get_all_discovered_endpoints(self, api_name: str = None) -> List[Dict]: | |
| """Get all discovered endpoints from database""" | |
| conn = sqlite3.connect(DB_PATH) | |
| cursor = conn.cursor() | |
| if api_name: | |
| cursor.execute(''' | |
| SELECT * FROM discovered_endpoints | |
| WHERE api_name = ? AND is_active = 1 | |
| ORDER BY api_name, depth_level, endpoint_path | |
| ''', (api_name,)) | |
| else: | |
| cursor.execute(''' | |
| SELECT * FROM discovered_endpoints | |
| WHERE is_active = 1 | |
| ORDER BY api_name, depth_level, endpoint_path | |
| ''') | |
| columns = [desc[0] for desc in cursor.description] | |
| endpoints = [dict(zip(columns, row)) for row in cursor.fetchall()] | |
| conn.close() | |
| return endpoints | |
| def harvest_with_resumption(self, selected_apis: List[str], session_id: str = None, | |
| progress_callback=None) -> Dict: | |
| """Harvest data with session resumption capability""" | |
| # Resume existing session or create new one | |
| if session_id: | |
| session_info = self.session_manager.resume_session(session_id) | |
| processed_endpoints = session_info.get("processed_endpoints", set()) | |
| else: | |
| session_id = self.session_manager.create_session() | |
| processed_endpoints = set() | |
| results = { | |
| "session_id": session_id, | |
| "total_endpoints": 0, | |
| "processed_endpoints": 0, | |
| "successful_fetches": 0, | |
| "failed_fetches": 0, | |
| "total_records": 0, | |
| "skipped_endpoints": 0, | |
| "errors": [] | |
| } | |
| # Get all endpoints for selected APIs | |
| all_endpoints = [] | |
| for api_name in selected_apis: | |
| endpoints = self.get_all_discovered_endpoints(api_name) | |
| all_endpoints.extend(endpoints) | |
| results["total_endpoints"] = len(all_endpoints) | |
| if progress_callback: | |
| progress_callback(f"π Starting harvest from {len(all_endpoints)} endpoints (Session: {session_id})") | |
| # Process endpoints with resumption | |
| for i, endpoint in enumerate(all_endpoints): | |
| endpoint_key = f"{endpoint['api_name']}:{endpoint['endpoint_path']}" | |
| # Skip if already processed in this session | |
| if endpoint_key in processed_endpoints: | |
| results["skipped_endpoints"] += 1 | |
| if progress_callback: | |
| progress_callback(f"βοΈ Skipping already processed: {endpoint_key}") | |
| continue | |
| # Update session progress | |
| self.session_manager.update_session_progress( | |
| session_id, endpoint['api_name'], endpoint['endpoint_path'], | |
| results["processed_endpoints"] | |
| ) | |
| # Fetch data from endpoint | |
| fetch_result = self._fetch_endpoint_data(endpoint, session_id) | |
| if fetch_result["status"] == "success": | |
| results["successful_fetches"] += 1 | |
| results["total_records"] += fetch_result.get("record_count", 0) | |
| else: | |
| results["failed_fetches"] += 1 | |
| results["errors"].append(f"{endpoint_key}: {fetch_result.get('error', 'Unknown error')}") | |
| results["processed_endpoints"] += 1 | |
| processed_endpoints.add(endpoint_key) | |
| if progress_callback: | |
| progress_callback(f"π Processed {results['processed_endpoints']}/{results['total_endpoints']}: {endpoint_key}") | |
| # Apply rate limiting | |
| config = DEEP_API_CONFIG.get(endpoint['api_name'], {}) | |
| self._apply_rate_limit(config) | |
| # Complete session | |
| self.session_manager.complete_session(session_id) | |
| if progress_callback: | |
| progress_callback(f"β Harvest completed! Session: {session_id}") | |
| return results | |
| def _fetch_endpoint_data(self, endpoint: Dict, session_id: str) -> Dict: | |
| """Fetch data from a single endpoint with intelligent storage""" | |
| start_time = time.time() | |
| try: | |
| api_name = endpoint["api_name"] | |
| config = DEEP_API_CONFIG.get(api_name, {}) | |
| # Setup headers with authentication | |
| headers = {} | |
| auth = config.get("auth") | |
| if auth and auth.get("type") == "Bearer": | |
| headers["Authorization"] = f"Bearer {auth['token']}" | |
| # Make request | |
| response = self.session.get(endpoint["full_url"], headers=headers, timeout=30) | |
| response.raise_for_status() | |
| # Parse response | |
| content_type = response.headers.get('Content-Type', '') | |
| if 'application/json' in content_type: | |
| data = response.json() | |
| elif 'application/xml' in content_type or 'text/xml' in content_type: | |
| data = self._xml_to_dict(response.text) | |
| else: | |
| data = {"raw_response": response.text} | |
| # Process and clean data | |
| processed_data = self._process_api_response(api_name, data) | |
| # Calculate metrics | |
| fetch_duration = int((time.time() - start_time) * 1000) | |
| record_count = len(processed_data) if isinstance(processed_data, list) else 1 | |
| data_size = len(json.dumps(processed_data, default=str).encode('utf-8')) | |
| # Save to database with intelligent categorization | |
| self._save_harvested_data( | |
| api_name, endpoint["endpoint_path"], processed_data, | |
| session_id, fetch_duration, record_count, data_size | |
| ) | |
| return { | |
| "status": "success", | |
| "record_count": record_count, | |
| "data_size": data_size, | |
| "fetch_duration": fetch_duration | |
| } | |
| except Exception as e: | |
| fetch_duration = int((time.time() - start_time) * 1000) | |
| # Save error information | |
| self._save_harvested_data( | |
| endpoint["api_name"], endpoint["endpoint_path"], {}, | |
| session_id, fetch_duration, 0, 0, "error", str(e) | |
| ) | |
| return { | |
| "status": "error", | |
| "error": str(e), | |
| "fetch_duration": fetch_duration | |
| } | |
| def _process_api_response(self, api_name: str, data: Any) -> Any: | |
| """Process API response with intelligent data extraction""" | |
| if api_name == "Skolverket": | |
| if isinstance(data, dict): | |
| if "_embedded" in data: | |
| for key, value in data["_embedded"].items(): | |
| if isinstance(value, list): | |
| return value | |
| if "Skolenheter" in data: | |
| return data["Skolenheter"] | |
| return data | |
| elif api_name == "SCB": | |
| if isinstance(data, dict): | |
| return data.get("data", data.get("variables", data)) | |
| elif api_name == "Kolada": | |
| if isinstance(data, dict): | |
| return data.get("values", data) | |
| elif api_name == "Eurostat": | |
| if isinstance(data, dict): | |
| return data.get("value", data.get("data", data)) | |
| elif api_name == "WHO": | |
| if isinstance(data, dict): | |
| return data.get("value", data.get("fact", data)) | |
| elif api_name == "OECD": | |
| if isinstance(data, dict): | |
| if "dataSets" in data: | |
| return data["dataSets"] | |
| return data.get("data", data) | |
| elif api_name == "WorldBank": | |
| if isinstance(data, list) and len(data) > 1: | |
| return data[1] if data[1] else data[0] | |
| return data | |
| elif api_name == "Riksbanken": | |
| if isinstance(data, dict): | |
| return data.get("observations", data.get("data", data)) | |
| elif api_name == "Swecris": | |
| if isinstance(data, dict): | |
| return data.get("items", data.get("projects", data)) | |
| elif api_name == "CSN": | |
| if isinstance(data, dict): | |
| return data.get("data", data.get("variables", data)) | |
| return data | |
| def _save_harvested_data(self, api_name: str, endpoint_path: str, data: Any, | |
| session_id: str, fetch_duration: int, record_count: int, | |
| data_size: int, status: str = "success", error_message: str = None): | |
| """Save harvested data with optimized storage and AI-enhanced analysis""" | |
| import gzip | |
| conn = sqlite3.connect(DB_PATH) | |
| cursor = conn.cursor() | |
| # Create data hash for deduplication | |
| data_str = json.dumps(data, sort_keys=True, default=str) | |
| data_hash = hashlib.sha256(data_str.encode()).hexdigest() | |
| # Check if this data already exists | |
| cursor.execute('SELECT id FROM harvested_data WHERE data_hash = ?', (data_hash,)) | |
| if cursor.fetchone(): | |
| # Update access count and last accessed time | |
| cursor.execute(''' | |
| UPDATE harvested_data | |
| SET access_count = access_count + 1, last_accessed = CURRENT_TIMESTAMP | |
| WHERE data_hash = ? | |
| ''', (data_hash,)) | |
| conn.commit() | |
| conn.close() | |
| return | |
| # AI Quality Assessment | |
| quality_assessment = {} | |
| if ai_quality_assessor and status == "success": | |
| quality_assessment = ai_quality_assessor.assess_data_quality(data, api_name) | |
| # Semantic Similarity Analysis | |
| similar_datasets = [] | |
| if semantic_analyzer and status == "success": | |
| similar_datasets = semantic_analyzer.find_similar_datasets(data, api_name) | |
| # API Health Monitoring | |
| health_info = {} | |
| if health_monitor: | |
| success_rate = 1.0 if status == "success" else 0.0 | |
| health_info = health_monitor.monitor_api_health( | |
| api_name, fetch_duration, success_rate, data_size | |
| ) | |
| # Determine data format | |
| data_format = self._detect_data_format(data) | |
| # Compress data if it's large | |
| raw_data_compressed = None | |
| processed_data_compressed = None | |
| raw_data = None | |
| processed_data = None | |
| if data_size > 1024: # Compress if larger than 1KB | |
| try: | |
| raw_data_compressed = gzip.compress(data_str.encode('utf-8')) | |
| processed_data_compressed = gzip.compress(json.dumps(data, default=str).encode('utf-8')) | |
| except: | |
| # Fallback to uncompressed storage | |
| raw_data = data_str | |
| processed_data = json.dumps(data, default=str) | |
| else: | |
| raw_data = data_str | |
| processed_data = json.dumps(data, default=str) | |
| try: | |
| cursor.execute(''' | |
| INSERT INTO harvested_data | |
| (api_name, endpoint_path, data_hash, raw_data_compressed, processed_data_compressed, | |
| raw_data, processed_data, raw_data_size, processed_data_size, | |
| record_count, data_size_bytes, fetch_duration_ms, status, | |
| error_message, session_id, quality_score, health_score, similar_datasets, | |
| data_format, access_count) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| api_name, endpoint_path, data_hash, raw_data_compressed, processed_data_compressed, | |
| raw_data, processed_data, len(data_str), len(json.dumps(data, default=str)), | |
| record_count, data_size, fetch_duration, status, error_message, session_id, | |
| quality_assessment.get('ai_quality_score', 0.0), | |
| health_info.get('health_score', 0.0), | |
| json.dumps(similar_datasets[:3], default=str), | |
| data_format, 1 | |
| )) | |
| # Log API performance | |
| cursor.execute(''' | |
| INSERT INTO api_performance_log | |
| (api_name, endpoint_path, response_time_ms, response_size_bytes, | |
| http_status_code, success, error_type) | |
| VALUES (?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| api_name, endpoint_path, fetch_duration, data_size, | |
| 200 if status == "success" else 500, | |
| status == "success", | |
| error_message if status != "success" else None | |
| )) | |
| conn.commit() | |
| # Display AI insights if available | |
| if quality_assessment and st.session_state.get('show_ai_insights', True): | |
| self._display_ai_insights(api_name, quality_assessment, health_info, similar_datasets) | |
| except sqlite3.OperationalError as e: | |
| # Handle database schema updates | |
| if "no such column" in str(e): | |
| self._upgrade_database_schema() | |
| # Retry with basic data structure | |
| cursor.execute(''' | |
| INSERT OR REPLACE INTO harvested_data | |
| (api_name, endpoint_path, data_hash, raw_data, processed_data, | |
| record_count, data_size_bytes, fetch_duration_ms, status, | |
| error_message, session_id) | |
| VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| api_name, endpoint_path, data_hash, raw_data or data_str, | |
| processed_data or json.dumps(data, default=str), record_count, data_size, | |
| fetch_duration, status, error_message, session_id | |
| )) | |
| conn.commit() | |
| finally: | |
| conn.close() | |
| def _detect_data_format(self, data: Any) -> str: | |
| """Detect the format of the data""" | |
| if isinstance(data, dict): | |
| if "_embedded" in data or "_links" in data: | |
| return "HAL+JSON" | |
| elif "dataSets" in data or "structure" in data: | |
| return "SDMX-JSON" | |
| else: | |
| return "JSON" | |
| elif isinstance(data, list): | |
| return "JSON-Array" | |
| elif isinstance(data, str): | |
| if data.strip().startswith('<'): | |
| return "XML" | |
| else: | |
| return "Text" | |
| else: | |
| return "Unknown" | |
| def _display_ai_insights(self, api_name: str, quality_assessment: Dict, | |
| health_info: Dict, similar_datasets: List[Dict]): | |
| """Display AI-powered insights in real-time""" | |
| if quality_assessment: | |
| with st.expander(f"π€ AI Insights for {api_name}", expanded=False): | |
| col1, col2, col3 = st.columns(3) | |
| with col1: | |
| st.metric("Quality Grade", quality_assessment.get('overall_grade', 'N/A')) | |
| st.metric("Completeness", f"{quality_assessment.get('completeness_score', 0):.2f}") | |
| with col2: | |
| if health_info: | |
| st.metric("Health Status", health_info.get('status', 'Unknown')) | |
| st.metric("Performance Trend", health_info.get('trend', 'N/A')) | |
| with col3: | |
| st.metric("Anomalies", quality_assessment.get('anomaly_count', 0)) | |
| if similar_datasets: | |
| st.metric("Similar Datasets", len(similar_datasets)) | |
| # Recommendations | |
| recommendations = quality_assessment.get('recommendations', []) | |
| if recommendations: | |
| st.write("**π― Recommendations:**") | |
| for rec in recommendations[:3]: | |
| st.write(f"β’ {rec}") | |
| # Similar datasets | |
| if similar_datasets: | |
| st.write("**π Similar Datasets Found:**") | |
| for sim in similar_datasets[:2]: | |
| st.write(f"β’ {sim['dataset']} (similarity: {sim['similarity']:.2f})") | |
| def _upgrade_database_schema(self): | |
| """Upgrade database schema to include AI columns""" | |
| conn = sqlite3.connect(DB_PATH) | |
| cursor = conn.cursor() | |
| try: | |
| # Add AI enhancement columns | |
| cursor.execute('ALTER TABLE harvested_data ADD COLUMN quality_score REAL DEFAULT 0.0') | |
| cursor.execute('ALTER TABLE harvested_data ADD COLUMN health_score REAL DEFAULT 0.0') | |
| cursor.execute('ALTER TABLE harvested_data ADD COLUMN similar_datasets TEXT DEFAULT "[]"') | |
| conn.commit() | |
| except sqlite3.OperationalError: | |
| pass # Columns already exist | |
| finally: | |
| conn.close() | |
| def _xml_to_dict(self, xml_text: str) -> Dict: | |
| """Convert XML to dictionary""" | |
| try: | |
| root = ET.fromstring(xml_text) | |
| return self._element_to_dict(root) | |
| except ET.ParseError: | |
| return {"raw_xml": xml_text} | |
| def _element_to_dict(self, element) -> Dict: | |
| """Convert XML element to dictionary""" | |
| result = {} | |
| if element.attrib: | |
| result.update(element.attrib) | |
| if element.text and element.text.strip(): | |
| if len(element) == 0: | |
| return element.text.strip() | |
| result['text'] = element.text.strip() | |
| for child in element: | |
| child_data = self._element_to_dict(child) | |
| if child.tag in result: | |
| if not isinstance(result[child.tag], list): | |
| result[child.tag] = [result[child.tag]] | |
| result[child.tag].append(child_data) | |
| else: | |
| result[child.tag] = child_data | |
| return result | |
| def _apply_rate_limit(self, config: Dict): | |
| """Apply rate limiting""" | |
| rate_limit = config.get("rate_limit") | |
| if rate_limit: | |
| sleep_time = rate_limit["per_seconds"] / rate_limit["requests"] | |
| time.sleep(sleep_time) | |
| def get_database_stats(self) -> Dict: | |
| """Get comprehensive database statistics""" | |
| conn = sqlite3.connect(DB_PATH) | |
| cursor = conn.cursor() | |
| # Endpoint statistics | |
| cursor.execute('SELECT COUNT(*) FROM discovered_endpoints') | |
| total_endpoints = cursor.fetchone()[0] | |
| cursor.execute('SELECT COUNT(DISTINCT api_name) FROM discovered_endpoints') | |
| active_apis = cursor.fetchone()[0] | |
| # Data statistics | |
| cursor.execute('SELECT COUNT(*), SUM(record_count), SUM(data_size_bytes) FROM harvested_data WHERE status = "success"') | |
| data_stats = cursor.fetchone() | |
| # Session statistics | |
| cursor.execute('SELECT COUNT(*) FROM harvest_sessions') | |
| total_sessions = cursor.fetchone()[0] | |
| conn.close() | |
| return { | |
| "total_endpoints": total_endpoints, | |
| "active_apis": active_apis, | |
| "successful_fetches": data_stats[0] or 0, | |
| "total_records": data_stats[1] or 0, | |
| "total_data_size": data_stats[2] or 0, | |
| "total_sessions": total_sessions | |
| } | |
# Initialize simplified components
# NOTE(review): the UI below calls session_manager, discoverer,
# harvest_with_resumption, get_database_stats and get_all_discovered_endpoints
# on this object (the UltimateDataHarvester interface); confirm that
# SimplifiedDataHarvester, defined elsewhere, provides all of them.
if 'harvester' not in st.session_state:
    st.session_state.harvester = SimplifiedDataHarvester()
if 'last_results' not in st.session_state:
    st.session_state.last_results = None
# These keys are read unconditionally further down the page. Default them here
# so a fresh browser session does not fail on a missing attribute. The guards
# leave any value set earlier in the script untouched.
if 'current_session' not in st.session_state:
    st.session_state.current_session = None
if 'last_session_info' not in st.session_state:
    st.session_state.last_session_info = None
if 'discovery_status' not in st.session_state:
    st.session_state.discovery_status = {}
# Enhanced Header
# Static banner (title, subtitle, feature badges) rendered as raw HTML;
# unsafe_allow_html=True makes Streamlit render the markup instead of escaping it.
st.markdown("""
<div class="title-container">
<h1 style="font-size: 2.5rem; margin: 0; color: #2c3e50;">
π Ultimate Data Harvester
</h1>
<p style="font-size: 1.1rem; margin: 0.5rem 0 0 0; color: #34495e;">
AI-Enhanced Deep Discovery β’ Session Resumption β’ Intelligent Storage
</p>
<p style="font-size: 0.95rem; margin: 0.3rem 0 0 0; color: #7f8c8d;">
Comprehensive data collection from 10 international APIs with advanced analytics
</p>
<div style="margin-top: 1rem;">
<span style="background: #ecf0f1; color: #2c3e50; padding: 0.3rem 0.8rem; border-radius: 15px; margin: 0 0.3rem; font-size: 0.9rem;">π Recursive Discovery</span>
<span style="background: #ecf0f1; color: #2c3e50; padding: 0.3rem 0.8rem; border-radius: 15px; margin: 0 0.3rem; font-size: 0.9rem;">π― Auto-Resume</span>
<span style="background: #ecf0f1; color: #2c3e50; padding: 0.3rem 0.8rem; border-radius: 15px; margin: 0 0.3rem; font-size: 0.9rem;">πΎ Smart Storage</span>
</div>
</div>
""", unsafe_allow_html=True)
# Display ML status prominently
# ML_AVAILABLE is set at import time by the optional transformers /
# sentence-transformers import at the top of the file.
if ML_AVAILABLE:
    st.success("π€ **AI Enhanced Mode Active** - Advanced quality assessment and semantic analysis enabled")
else:
    st.info("π **Standard Mode** - Basic functionality available. Install transformers and sentence-transformers for AI features.")
# Session Management Section
# Three columns: a summary of the last session, a "resume" button and a
# "start new session" button. Both buttons only change
# st.session_state.current_session; the harvest tab later passes that id to
# harvest_with_resumption.
st.markdown("### π― Session Management")
col1, col2, col3 = st.columns([2, 1, 1])
with col1:
    # NOTE(review): last_session_info is assumed to be a dict built from a
    # harvest_sessions row elsewhere; the keys read below (session_name,
    # processed_endpoints, total_endpoints, current_api, started_at) must exist.
    if st.session_state.last_session_info:
        last_session = st.session_state.last_session_info
        # started_at[:19] assumes a string timestamp and keeps its first 19
        # characters (date + time, no fractional seconds) — TODO confirm type.
        st.markdown(f"""
<div class="discovery-progress">
<strong>π Last Session Available</strong><br>
<strong>Name:</strong> {last_session['session_name']}<br>
<strong>Progress:</strong> {last_session['processed_endpoints']}/{last_session['total_endpoints']} endpoints<br>
<strong>Last API:</strong> {last_session.get('current_api', 'N/A')}<br>
<strong>Started:</strong> {last_session['started_at'][:19]}
</div>
""", unsafe_allow_html=True)
    else:
        st.info("No previous session found. Ready to start fresh!")
with col2:
    # The button is disabled when there is no last session, so the lookups
    # inside the branch only run when last_session_info is set.
    if st.button("π Resume Last Session", disabled=not st.session_state.last_session_info, use_container_width=True):
        st.session_state.current_session = st.session_state.last_session_info['session_id']
        st.success(f"Resumed session: {st.session_state.last_session_info['session_name']}")
with col3:
    if st.button("π Start New Session", use_container_width=True):
        # Creates the session row immediately and forgets the previous session.
        session_id = st.session_state.harvester.session_manager.create_session()
        st.session_state.current_session = session_id
        st.session_state.last_session_info = None
        st.success(f"New session created: {session_id}")
# Database Statistics
# Five summary tiles, styled with the .metric-card CSS class.
# stats is whatever the harvester's get_database_stats() returns; the keys read
# here are total_endpoints, active_apis, successful_fetches, total_records
# and total_data_size (a byte count, converted to MB in the last tile).
st.markdown("### π Database Overview")
stats = st.session_state.harvester.get_database_stats()
col1, col2, col3, col4, col5 = st.columns(5)
with col1:
    st.markdown(f"""
<div class="metric-card">
<div style="font-size: 0.9rem; opacity: 0.8;">π― Discovered Endpoints</div>
<div style="font-size: 1.8rem; font-weight: bold;">{stats['total_endpoints']:,}</div>
</div>
""", unsafe_allow_html=True)
with col2:
    st.markdown(f"""
<div class="metric-card">
<div style="font-size: 0.9rem; opacity: 0.8;">π Active APIs</div>
<div style="font-size: 1.8rem; font-weight: bold;">{stats['active_apis']}</div>
</div>
""", unsafe_allow_html=True)
with col3:
    st.markdown(f"""
<div class="metric-card">
<div style="font-size: 0.9rem; opacity: 0.8;">β Successful Fetches</div>
<div style="font-size: 1.8rem; font-weight: bold;">{stats['successful_fetches']:,}</div>
</div>
""", unsafe_allow_html=True)
with col4:
    st.markdown(f"""
<div class="metric-card">
<div style="font-size: 0.9rem; opacity: 0.8;">π Total Records</div>
<div style="font-size: 1.8rem; font-weight: bold;">{stats['total_records']:,}</div>
</div>
""", unsafe_allow_html=True)
with col5:
    # Bytes -> MiB; a falsy total (0 or None) displays as 0.
    data_size_mb = stats['total_data_size'] / 1024 / 1024 if stats['total_data_size'] else 0
    st.markdown(f"""
<div class="metric-card">
<div style="font-size: 0.9rem; opacity: 0.8;">πΎ Data Size</div>
<div style="font-size: 1.8rem; font-weight: bold;">{data_size_mb:.1f} MB</div>
</div>
""", unsafe_allow_html=True)
# Main Operations
# Three tabs: endpoint discovery, data harvesting/export, and analytics charts.
st.markdown("### π Operations")
tab1, tab2, tab3 = st.tabs(["π Deep Discovery", "π Data Harvesting", "π Analytics"])
with tab1:
    st.markdown("**π€ AI-Enhanced Deep Discovery - Find all endpoints with intelligent analysis**")
    # AI Settings
    col1, col2 = st.columns(2)
    with col1:
        enable_ai_insights = st.checkbox("π€ Enable AI Quality Assessment", value=True, key="enable_ai")
    with col2:
        # NOTE(review): show_similarity is not read anywhere in this section.
        show_similarity = st.checkbox("π Show Semantic Similarity", value=True, key="enable_similarity")
    # Read by _save_harvested_data to decide whether to render AI insights.
    st.session_state['show_ai_insights'] = enable_ai_insights
    # API Selection for Discovery
    selected_apis_discovery = st.multiselect(
        "Select APIs for deep endpoint discovery:",
        list(DEEP_API_CONFIG.keys()),
        default=[],
        key="discovery_apis"
    )
    col1, col2 = st.columns([3, 1])
    with col1:
        if st.button("π Start Deep Discovery", disabled=not selected_apis_discovery, use_container_width=True):
            progress_container = st.container()
            status_container = st.empty()
            with progress_container:
                progress_bar = st.progress(0)
                # Discover APIs one at a time, tracking per-API status in
                # st.session_state.discovery_status.
                for i, api_name in enumerate(selected_apis_discovery):
                    st.session_state.discovery_status[api_name] = "discovering"
                    # Redefined per iteration; it is called synchronously during this
                    # iteration, so the closure sees the current api_name.
                    def progress_callback(message):
                        status_container.text(f"π {api_name}: {message}")
                    # Run deep discovery
                    discovered = st.session_state.harvester.discoverer.discover_all_endpoints(
                        api_name, progress_callback
                    )
                    st.session_state.discovery_status[api_name] = "completed"
                    progress_bar.progress((i + 1) / len(selected_apis_discovery))
                    # Show results
                    st.success(f"β {api_name}: {len(discovered)} endpoints discovered")
                status_container.text("π Deep discovery completed for all selected APIs!")
    with col2:
        if st.button("π View All Endpoints", use_container_width=True):
            endpoints = st.session_state.harvester.get_all_discovered_endpoints()
            if endpoints:
                df_endpoints = pd.DataFrame(endpoints)
                # Assumes these discovered_endpoints columns exist — TODO confirm schema.
                st.dataframe(
                    df_endpoints[['api_name', 'endpoint_path', 'discovery_method', 'depth_level', 'last_checked']],
                    use_container_width=True
                )
            else:
                st.info("No endpoints discovered yet. Run discovery first!")
with tab2:
    st.markdown("**Harvest data from all discovered endpoints with session resumption**")
    # API Selection for Harvesting
    selected_apis_harvest = st.multiselect(
        "Select APIs for data harvesting:",
        list(DEEP_API_CONFIG.keys()),
        default=list(DEEP_API_CONFIG.keys()),
        key="harvest_apis"
    )
    col1, col2 = st.columns([2, 1])
    with col1:
        if st.button("π Start Ultimate Harvest", disabled=not selected_apis_harvest, use_container_width=True):
            progress_container = st.container()
            status_container = st.empty()
            results_container = st.container()
            with progress_container:
                progress_bar = st.progress(0)
            def progress_callback(message):
                status_container.text(message)
            # Start harvest with resumption; current_session may be None,
            # in which case a new session is created by the harvester.
            results = st.session_state.harvester.harvest_with_resumption(
                selected_apis_harvest,
                st.session_state.current_session,
                progress_callback
            )
            # The harvest is synchronous, so the bar jumps straight to 100%.
            progress_bar.progress(1.0)
            # Show results
            with results_container:
                st.success("π Ultimate harvest completed!")
                col_a, col_b, col_c, col_d = st.columns(4)
                with col_a:
                    st.metric("β Successful", results['successful_fetches'])
                with col_b:
                    st.metric("β Failed", results['failed_fetches'])
                with col_c:
                    st.metric("π Records", f"{results['total_records']:,}")
                with col_d:
                    st.metric("βοΈ Skipped", results['skipped_endpoints'])
    with col2:
        # Export options
        st.markdown("**Export Data**")
        if st.button("π Export Database (JSON)", use_container_width=True):
            conn = sqlite3.connect(DB_PATH)
            # Export all tables; names come from this fixed list, so the
            # f-string SQL below is not built from user input.
            tables = ['discovered_endpoints', 'harvested_data', 'harvest_sessions']
            export_data = {}
            for table in tables:
                df = pd.read_sql_query(f"SELECT * FROM {table}", conn)
                export_data[table] = df.to_dict('records')
            conn.close()
            # Create download; default=str stringifies values json cannot encode.
            export_json = json.dumps(export_data, default=str, indent=2)
            st.download_button(
                "πΎ Download Complete Database",
                data=export_json,
                file_name=f"ultimate_harvest_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                mime="application/json",
                use_container_width=True
            )
with tab3:
    st.markdown("**Analytics and Insights from Harvested Data**")
    # Get harvested data for analytics: per-API totals over successful rows only.
    conn = sqlite3.connect(DB_PATH)
    try:
        df_data = pd.read_sql_query('''
SELECT api_name, COUNT(*) as fetches, SUM(record_count) as total_records,
AVG(fetch_duration_ms) as avg_duration, SUM(data_size_bytes) as total_size
FROM harvested_data
WHERE status = 'success'
GROUP BY api_name
''', conn)
        if not df_data.empty:
            col1, col2 = st.columns(2)
            with col1:
                # Records by API
                fig_records = px.bar(
                    df_data,
                    x='api_name',
                    y='total_records',
                    title="π Records Harvested by API",
                    color='total_records',
                    color_continuous_scale='viridis'
                )
                fig_records.update_layout(
                    paper_bgcolor="rgba(255,255,255,0.9)",
                    plot_bgcolor="rgba(255,255,255,0.9)",
                    font_color="#2c3e50"
                )
                st.plotly_chart(fig_records, use_container_width=True)
            with col2:
                # Data size by API (bytes -> MiB)
                df_data['size_mb'] = df_data['total_size'] / 1024 / 1024
                fig_size = px.pie(
                    df_data,
                    values='size_mb',
                    names='api_name',
                    title="πΎ Data Size Distribution (MB)"
                )
                fig_size.update_layout(
                    paper_bgcolor="rgba(255,255,255,0.9)",
                    plot_bgcolor="rgba(255,255,255,0.9)",
                    font_color="#2c3e50"
                )
                st.plotly_chart(fig_size, use_container_width=True)
            # Performance metrics
            st.markdown("**β‘ Performance Metrics**")
            fig_perf = px.bar(
                df_data,
                x='api_name',
                y='avg_duration',
                title="β±οΈ Average Fetch Duration by API (ms)",
                color='avg_duration',
                color_continuous_scale='plasma'
            )
            fig_perf.update_layout(
                paper_bgcolor="rgba(255,255,255,0.9)",
                plot_bgcolor="rgba(255,255,255,0.9)",
                font_color="#2c3e50"
            )
            st.plotly_chart(fig_perf, use_container_width=True)
        else:
            st.info("No data available for analytics. Start harvesting first!")
    finally:
        conn.close()
# Database Management Section
# Maintenance buttons call helpers defined elsewhere in the file
# (get_database_stats, optimize_database, compress_old_data, backup_database).
# Below them are read-only summaries of recent activity, quality scores and
# storage efficiency.
with st.expander("ποΈ Database Management & Statistics", expanded=False):
    st.markdown("**Database Performance & Maintenance Tools**")
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        if st.button("π Get Database Stats", use_container_width=True):
            with st.spinner("Analyzing database..."):
                stats = get_database_stats()
            st.markdown("**Database Statistics:**")
            for key, value in stats.items():
                formatted_key = key.replace('_', ' ').title()
                if 'size_mb' in key:
                    st.metric(formatted_key, f"{value} MB")
                elif 'score' in key:
                    st.metric(formatted_key, f"{value:.3f}")
                else:
                    st.metric(formatted_key, value)
    with col2:
        if st.button("π§ Optimize Database", use_container_width=True):
            with st.spinner("Optimizing database..."):
                success = optimize_database()
            if success:
                st.success("β Database optimized successfully!")
            else:
                st.error("β Database optimization failed")
    with col3:
        if st.button("ποΈ Compress Old Data", use_container_width=True):
            with st.spinner("Compressing old data..."):
                compressed_count = compress_old_data(days_old=7)  # Compress data older than 7 days
            if compressed_count > 0:
                st.success(f"β Compressed {compressed_count} old records")
            else:
                st.info("βΉοΈ No old data found to compress")
    with col4:
        if st.button("πΎ Create Backup", use_container_width=True):
            with st.spinner("Creating backup..."):
                backup_path = backup_database()
            if backup_path:
                st.success(f"β Backup created: {backup_path}")
                # Offer download; a read failure is reported instead of silently ignored.
                try:
                    with open(backup_path, 'rb') as f:
                        backup_bytes = f.read()
                except OSError as exc:
                    st.warning(f"Backup saved but could not be read for download: {exc}")
                else:
                    st.download_button(
                        label="β¬οΈ Download Backup",
                        data=backup_bytes,
                        # Browsers expect a bare file name; backup_path may include directories.
                        file_name=os.path.basename(backup_path),
                        mime="application/x-sqlite3"
                    )
            else:
                st.error("β Backup creation failed")

    # Enhanced database insights
    st.markdown("---")
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        # Show recent activity summary
        col1, col2 = st.columns(2)
        with col1:
            st.markdown("**π Recent Activity (Last 24h)**")
            df_recent = pd.read_sql_query('''
SELECT api_name, COUNT(*) as fetches, SUM(record_count) as records
FROM harvested_data
WHERE fetch_timestamp > datetime('now', '-1 day')
GROUP BY api_name
ORDER BY fetches DESC
''', conn)
            if not df_recent.empty:
                st.dataframe(df_recent, use_container_width=True)
            else:
                st.info("No recent activity")
        with col2:
            st.markdown("**π― Data Quality Overview**")
            df_quality = pd.read_sql_query('''
SELECT
api_name,
ROUND(AVG(quality_score), 3) as avg_quality,
ROUND(AVG(health_score), 3) as avg_health,
COUNT(*) as total_records
FROM harvested_data
WHERE status = 'success' AND quality_score > 0
GROUP BY api_name
ORDER BY avg_quality DESC
''', conn)
            if not df_quality.empty:
                st.dataframe(df_quality, use_container_width=True)
            else:
                st.info("No quality data available")
    except Exception as e:
        st.error(f"Database error: {e}")
    finally:
        # Close the connection even when a query fails.
        if conn is not None:
            conn.close()

    # Storage efficiency metrics
    st.markdown("**πΎ Storage Efficiency**")
    conn = None
    try:
        conn = sqlite3.connect(DB_PATH)
        cursor = conn.cursor()
        # Compare the uncompressed payload size (data_size_bytes) with what is
        # actually stored for one copy: the gzip blob length for compressed
        # rows, the plain size otherwise. raw_data_size is the *uncompressed*
        # length, so it cannot measure compression.
        cursor.execute('''
SELECT
COUNT(*) as total_records,
COUNT(CASE WHEN raw_data_compressed IS NOT NULL THEN 1 END) as compressed_records,
SUM(data_size_bytes) as total_original_size,
SUM(CASE WHEN raw_data_compressed IS NOT NULL THEN LENGTH(raw_data_compressed) ELSE data_size_bytes END) as effective_size
FROM harvested_data
''')
        storage_stats = cursor.fetchone()
        if storage_stats and storage_stats[0] > 0:
            total_records, compressed_records, original_size, effective_size = storage_stats
            col1, col2, col3, col4 = st.columns(4)
            with col1:
                st.metric("Total Records", total_records)
            with col2:
                st.metric("Compressed Records", compressed_records)
            with col3:
                compression_ratio = 0
                if original_size and effective_size:
                    compression_ratio = (1 - effective_size / original_size) * 100
                st.metric("Compression Ratio", f"{compression_ratio:.1f}%")
            with col4:
                space_saved = (original_size - effective_size) if original_size and effective_size else 0
                space_saved_mb = space_saved / (1024 * 1024)
                st.metric("Space Saved", f"{space_saved_mb:.2f} MB")
    except Exception as e:
        st.warning(f"Could not calculate storage metrics: {e}")
    finally:
        if conn is not None:
            conn.close()
# AI Enhancement Panel: reports which optional AI components are usable.
st.markdown("---")
with st.expander("π€ AI Enhancement Status", expanded=False):
    # ML_AVAILABLE only records that the optional transformers /
    # sentence-transformers imports succeeded. Whether a model actually loaded
    # is read from the analyzer objects themselves.
    quality_active = bool(ML_AVAILABLE and ai_quality_assessor and ai_quality_assessor.quality_model)
    semantic_active = bool(ML_AVAILABLE and semantic_analyzer and semantic_analyzer.embeddings_model)
    col1, col2, col3 = st.columns(3)
    with col1:
        st.markdown("**π― Quality Assessment**")
        if quality_active:
            st.success("β Active - DistilBERT")
        else:
            st.error("β Not Available")
    with col2:
        st.markdown("**π Semantic Analysis**")
        if semantic_active:
            st.success("β Active - MiniLM-L6-v2")
        else:
            st.error("β Not Available")
    with col3:
        st.markdown("**π Health Monitoring**")
        if health_monitor:
            st.success("β Active - Isolation Forest")
        else:
            st.error("β Not Available")
    # Claim the models are ready only when at least one actually loaded. This
    # avoids a "loaded and ready" banner next to "Not Available" statuses.
    if quality_active or semantic_active:
        st.info("π‘ AI models are loaded and ready for enhanced data analysis!")
    elif ML_AVAILABLE:
        st.warning("ML libraries are installed, but the AI models failed to load")
    else:
        st.warning("β οΈ Install ML libraries (transformers, sentence-transformers) for AI features")
# Footer: static branded banner rendered as raw HTML at the bottom of the page.
st.markdown("---")
footer_html = """
<div style="text-align: center; padding: 1rem; background: rgba(255,255,255,0.8); border-radius: 10px; color: #2c3e50;">
<p><strong>π Ultimate Data Harvester with AI</strong> - Professional data collection platform</p>
<p style="font-size: 0.9rem; color: #7f8c8d;">
π Recursive endpoint discovery β’ π€ AI quality assessment β’ π― Session management β’ πΎ Smart database storage β’ π Real-time analytics
</p>
</div>
"""
# unsafe_allow_html is required so the <div>/<p> markup is rendered, not escaped.
st.markdown(footer_html, unsafe_allow_html=True)