# Hugging Face Space app (page header residue: "Spaces: Running")
import os
import gradio as gr
import pandas as pd
import plotly.express as px
import re
import io
import subprocess
# Ensure the collector script is executable before the app tries to run it.
# os.chmod replaces the original `os.system("chmod +x ...")` shell-out: same
# effect without spawning a subshell. Guarded so a missing script file does
# not raise at import time (the original silently printed a chmod error).
if os.path.exists("gpu_info_collector.sh"):
    os.chmod("gpu_info_collector.sh", 0o755)
# ==========================================
# 1. Define function to run the script
# ==========================================
def run_shell_script(secret_key):
    """Run gpu_info_collector.sh after verifying the caller's secret key.

    Args:
        secret_key: Key supplied by the caller. Must match the ``RUN_KEY``
            environment variable (set in Space Settings -> Variables and
            secrets).

    Returns:
        A human-readable status string: an auth-failure message, the
        script's combined stdout/stderr on success, or the exception text
        if execution failed.
    """
    import hmac  # local import: constant-time comparison for the key check

    expected_key = os.environ.get("RUN_KEY")
    if not expected_key:
        return "❌ Auth failed: RUN_KEY environment variable is not configured on the server!"
    # compare_digest avoids leaking key contents via timing differences.
    if not hmac.compare_digest(str(secret_key).encode("utf-8"),
                               expected_key.encode("utf-8")):
        return "❌ Auth failed: Incorrect secret key!"
    print("Command received, starting script execution...")
    try:
        # Bug fix: the original passed a list *with* shell=True, which on
        # POSIX hands only the first element to the shell and silently drops
        # the rest -- it worked here by accident. shell=False is the correct
        # (and safer) form for a fixed argv list.
        result = subprocess.run(
            ["./gpu_info_collector.sh"],
            shell=False,
            capture_output=True,
            text=True
        )
        log_output = f"Standard Output:\n{result.stdout}\n\nError Output:\n{result.stderr}"
        print(log_output)
        return f"✅ Script execution completed!\n{log_output}"
    except Exception as e:
        return f"⚠️ Execution error: {str(e)}"
# ==========================================
# 2. Data Reading Engine
# ==========================================
def clean_and_read_file(file_path):
    """Load a price-history file into a DataFrame, tolerating messy input.

    Tries Excel first; on failure falls back to treating the file as
    delimited text: bytes are decoded with a short list of candidate
    encodings, stray backslashes are stripped, and records that were
    wrapped across physical lines are stitched back together before CSV
    parsing.

    Returns an empty DataFrame when the file is missing or unparseable.
    """
    if not file_path or not os.path.exists(file_path):
        return pd.DataFrame()

    # --- Strategy A: try reading as a native Excel workbook ---
    try:
        return pd.read_excel(file_path)
    except Exception:
        pass

    # --- Strategy B: raw bytes -> text ---
    try:
        with open(file_path, 'rb') as fh:
            payload = fh.read()
    except Exception as exc:
        print(f"File read error: {exc}")
        return pd.DataFrame()

    text = ""
    for codec in ('utf-8', 'gb18030', 'gbk'):
        try:
            text = payload.decode(codec)
            break
        except UnicodeDecodeError:
            continue
    if not text:
        # Last resort: keep going with replacement characters.
        text = payload.decode('utf-8', errors='replace')

    # --- Cleaning: drop backslashes, re-join wrapped records ---
    text = re.sub(r"\\", "", text)
    starts_with_date = re.compile(r'^\s*202\d-\d{2}-\d{2}')

    stitched = []
    pending = ""
    for raw_line in text.splitlines():
        raw_line = raw_line.strip()
        if not raw_line:
            continue
        # A line opens a new record when it looks like a CSV header or a
        # dated data row; anything else continues the previous record.
        new_record = ("Date" in raw_line and "," in raw_line) or \
                     starts_with_date.match(raw_line) is not None
        if new_record:
            if pending:
                stitched.append(pending)
            pending = raw_line
        else:
            pending += " " + raw_line
    if pending:
        stitched.append(pending)

    csv_blob = "\n".join(stitched)
    try:
        return pd.read_csv(io.StringIO(csv_blob))
    except Exception:
        try:
            return pd.read_csv(io.StringIO(csv_blob),
                               sep=None,
                               engine='python')
        except Exception:
            return pd.DataFrame()
# ==========================================
# 3. Data Processing
# ==========================================
def process_gpu_data(df):
    """Normalize a GPU price DataFrame in place.

    Strips header whitespace, coerces 'Date' to datetimes, and derives a
    numeric hourly-rent column 'Rent_Price_Num' from the first column that
    looks like a rent/price column.

    Returns the (mutated) input DataFrame; an empty input is returned as-is.
    """
    if df.empty:
        return df
    df.columns = [str(c).strip() for c in df.columns]
    if 'Date' in df.columns:
        # Unparseable dates become NaT instead of raising.
        df['Date'] = pd.to_datetime(df['Date'], errors='coerce')

    def clean_currency(x):
        # Map any cell to a float price; unparseable values fall back to 0.0.
        if isinstance(x, (int, float)):
            # Bug fix: NaN is a float, so the original returned NaN here --
            # inconsistent with the 0.0 fallback used for bad strings.
            return 0.0 if pd.isna(x) else float(x)
        if isinstance(x, str):
            # Bug fix: strip thousands separators first so "$1,234.50"
            # parses as 1234.5 rather than matching only the leading "1".
            match = re.search(r'(\d+\.?\d*)', x.replace(',', ''))
            return float(match.group(1)) if match else 0.0
        return 0.0

    # Prefer the canonical column name; otherwise take the first column that
    # looks like an hourly rent column.
    target_col = None
    if 'Cloud Rent (/hr)' in df.columns:
        target_col = 'Cloud Rent (/hr)'
    else:
        for c in df.columns:
            if 'Rent' in c or '/hr' in c:
                target_col = c
                break
    if target_col:
        df['Rent_Price_Num'] = df[target_col].apply(clean_currency)
    return df
def process_llm_data(df):
    """Light normalization for the LLM price table.

    Strips header whitespace and coerces 'Date' to datetimes (bad values
    become NaT). Returns the (mutated) input DataFrame.
    """
    if df.empty:
        return df
    df.columns = [str(col).strip() for col in df.columns]
    if 'Date' in df.columns:
        df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
    return df
# ==========================================
# 4. Plotting Logic
# ==========================================
def plot_gpu_trends(df):
    """Line chart of hourly GPU rental prices over time.

    Rows missing a date or price are dropped; traces are colored by the
    'Chip' column when present (falling back to the second column).
    Returns a plotly Figure, or None when there is nothing to plot.
    """
    if df is None or df.empty or 'Rent_Price_Num' not in df.columns:
        return None
    plot_df = df.dropna(subset=['Date', 'Rent_Price_Num'])
    if plot_df.empty:
        return None
    # Choose the grouping column defensively so a short column list never
    # raises an IndexError.
    if 'Chip' in df.columns:
        chip_col = 'Chip'
    elif len(df.columns) > 1:
        chip_col = df.columns[1]
    else:
        chip_col = None
    return px.line(
        plot_df,
        x='Date',
        y='Rent_Price_Num',
        color=chip_col if chip_col in df.columns else None,
        title='GPU Cloud Rental Price Trends ($/hr)',
        labels={
            'Rent_Price_Num': 'Price ($/hr)',
            'Date': 'Date'
        },
        markers=True
    )
def plot_llm_trends(df):
    """Line chart of LLM API prices over time, one trace per model column.

    Every column other than 'Date' is melted into long form and plotted.
    Returns a plotly Figure, or None when there is nothing to plot.
    """
    if df is None or df.empty:
        return None
    model_cols = [col for col in df.columns if col != 'Date']
    if not model_cols:
        return None
    tidy = df[['Date'] + model_cols].copy().dropna(subset=['Date'])
    long_form = tidy.melt(id_vars=['Date'], var_name='Model', value_name='Price')
    return px.line(
        long_form,
        x='Date',
        y='Price',
        color='Model',
        title='LLM API Price Trends',
        labels={'Price': 'Price', 'Date': 'Date', 'Model': 'Model Type'},
        markers=True
    )
# ==========================================
# 5. Gradio Interface
# ==========================================
# Default on-disk locations of the collector's output files, relative to the
# working directory (presumably written by gpu_info_collector.sh -- confirm).
DEFAULT_GPU_FILE = "gpu_price_history.csv"
DEFAULT_LLM_FILE = "llm_price_trends.csv"
def load_gpu_pipeline():
    """Read, normalize, and chart the GPU price file.

    Returns a (DataFrame, Figure-or-None) pair.
    """
    gpu_df = process_gpu_data(clean_and_read_file(DEFAULT_GPU_FILE))
    return gpu_df, plot_gpu_trends(gpu_df)
def load_llm_pipeline():
    """Read, normalize, and chart the LLM price file.

    Returns a (DataFrame, Figure-or-None) pair.
    """
    llm_df = process_llm_data(clean_and_read_file(DEFAULT_LLM_FILE))
    return llm_df, plot_llm_trends(llm_df)
# --- UI Definition ---
# Two tabs (GPU / LLM), each with a plot plus a collapsible data preview.
with gr.Blocks(title="AI Price Tracker") as demo:
    gr.Markdown("## 📊 AI Compute & Model Price Trends")
    with gr.Tabs():
        # GPU Tab
        with gr.TabItem("GPU Prices"):
            with gr.Row():
                with gr.Column(scale=1):
                    gpu_plot = gr.Plot(label="Price Trend")
            with gr.Row():
                with gr.Accordion("Data Preview", open=False):
                    gpu_table = gr.DataFrame()
        # LLM Tab
        with gr.TabItem("LLM Prices"):
            with gr.Row():
                with gr.Column(scale=1):
                    llm_plot = gr.Plot(label="Price Trend")
            with gr.Row():
                with gr.Accordion("Data Preview", open=False):
                    llm_table = gr.DataFrame()

    # Hidden components to expose the API safely without breaking UI
    # (remote callers invoke the endpoint via api_name="run_collector";
    # the invisible widgets never render for browser users).
    api_input = gr.Textbox(visible=False)
    api_output = gr.Textbox(visible=False)
    api_trigger = gr.Button(visible=False)

    api_trigger.click(
        fn=run_shell_script,
        inputs=[api_input],
        outputs=[api_output],
        api_name="run_collector"
    )

    # --- Initialization Logic ---
    def init_on_load():
        # Build both tables and both figures once per page load; order must
        # match the demo.load outputs list below.
        g_df, g_fig = load_gpu_pipeline()
        l_df, l_fig = load_llm_pipeline()
        return g_fig, g_df, l_fig, l_df

    demo.load(
        init_on_load,
        inputs=None,
        outputs=[gpu_plot, gpu_table, llm_plot, llm_table]
    )

if __name__ == "__main__":
    # NOTE(review): share=True is redundant when hosted on a Space (the app
    # is already public); harmless locally -- confirm it is intentional.
    demo.launch(share=True)