# app.py import gradio as gr import pandas as pd import io import os import google.generativeai as genai import gc import traceback from typing import Tuple, Optional # Load API key from secrets (don't put key in code) GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") if not GEMINI_API_KEY: raise ValueError("Gemini API key not set. Please add GEMINI_API_KEY in Space Secrets.") genai.configure(api_key=GEMINI_API_KEY) # session DataFrame (kept in memory for the session) session_df = None # ---------------- robust file-reading helper ---------------- def read_file_bytes_flexible(file) -> Tuple[Optional[bytes], Optional[str], Optional[str]]: """ Try many ways to extract raw bytes and filename from the uploaded object. Returns: (content_bytes | None, filename | None, error_message | None) """ if file is None: return None, None, "No file uploaded." # 1) If it's already raw bytes if isinstance(file, (bytes, bytearray)): return bytes(file), None, None # 2) If object has attribute 'bytes' (some wrappers do) try: b = getattr(file, "bytes", None) if isinstance(b, (bytes, bytearray)): # try name if available name = getattr(file, "name", None) or getattr(file, "filename", None) return bytes(b), name, None except Exception: pass # 3) If object has attribute 'read' and calling it works read_attr = getattr(file, "read", None) if callable(read_attr): try: content = read_attr() # some frameworks return coroutine for read() - handle it gracefully if hasattr(content, "__await__"): # can't await in sync; try file.file.read() below pass else: if isinstance(content, (bytes, bytearray)): name = getattr(file, "name", None) or getattr(file, "filename", None) return bytes(content), name, None # sometimes read() returns str (rare), turn to bytes if isinstance(content, str): return content.encode("utf-8"), getattr(file, "name", None), None except TypeError: # read() may require args or be not callable in this context pass except Exception: # ignore and try other ways pass # 4) If object has a .file attribute (like starlette UploadFile.file) try: attr_file = getattr(file, "file", None) if attr_file is not None and hasattr(attr_file, "read"): try: content = attr_file.read() if isinstance(content, (bytes, bytearray)): name = getattr(file, "name", None) or getattr(file, "filename", None) return bytes(content), name, None except Exception: pass except Exception: pass # 5) If object is a dict-like (some environments) try: if isinstance(file, dict): # common keys for k in ("content", "data", "bytes", "file", "body"): v = file.get(k) if isinstance(v, (bytes, bytearray)): name = file.get("name") or file.get("filename") return bytes(v), name, None if isinstance(v, str) and os.path.exists(v): with open(v, "rb") as f: return f.read(), os.path.basename(v), None except Exception: pass # 6) Fallback: try attributes that might contain a path string try: for attr in ("name", "filename", "path"): val = getattr(file, attr, None) if isinstance(val, str) and os.path.exists(val): with open(val, "rb") as f: return f.read(), os.path.basename(val), None except Exception: pass # 7) Give up with a helpful error (include repr for debugging) try: rep = repr(file) except Exception: rep = "" return None, None, f"Uploaded file format not supported by this server environment. Object repr: {rep}" # ---------------- load file to DataFrame ---------------- def load_file(file) -> Tuple[Optional[pd.DataFrame], str]: """ Returns (df or None, status_message). """ global session_df content, fname, err = read_file_bytes_flexible(file) if err: return None, f"Error reading file: {err}" if content is None: return None, "No bytes could be read from uploaded object." try: name = (fname or "").lower() # Quick heuristic: csv if filename endswith .csv or bytes contain commas/newlines in header if name.endswith(".csv") or (isinstance(content, (bytes, bytearray)) and b"," in content[:200]): df = pd.read_csv(io.BytesIO(content)) else: # assume excel by default df = pd.read_excel(io.BytesIO(content)) except Exception as e: # include traceback to help debug unusual formats (will show in UI only) tb = traceback.format_exc() return None, f"Error parsing file into DataFrame: {e}\n{tb}" finally: try: del content except Exception: pass gc.collect() session_df = df return df, f"File loaded: {df.shape[0]} rows x {df.shape[1]} columns." # ---------------- Gemini-powered question answering ---------------- def ask_question_gemini(query: str): """ Sends the user's query and a small preview to Gemini; expects back Python code that sets `result`. Executes the code in a controlled local environment. """ global session_df if session_df is None: return None, "Please upload and load a file first." # build prompt: include columns & small preview cols = list(session_df.columns) preview_csv = session_df.head(10).to_csv(index=False) prompt = f""" You are a helpful Python data analyst. The user uploaded a dataset with columns: {cols}. Here are the first 10 rows (CSV): {preview_csv} User question: {query} Return ONLY Python code (no explanations) that when executed will create a pandas DataFrame named `result` that contains the answer (a DataFrame, up to 200 rows). Use `df` as the variable for the dataset. Do not import libraries; assume pandas is available as pd. If you need to compute percentages, include them as columns. If the query asks for a single number, return it as a one-row DataFrame, e.g. pd.DataFrame({'value':[...]}). """ try: model = genai.GenerativeModel("gemini-pro") response = model.generate_content(prompt) code = response.text.strip("`\n ") except Exception as e: return None, f"Error calling Gemini: {e}" # Execute the code in a controlled namespace local_vars = {"pd": pd, "df": session_df.copy(), "result": None} try: exec(code, {}, local_vars) except Exception as e: tb = traceback.format_exc() return None, f"Error executing code returned by Gemini: {e}\nCode was:\n{code}\n\nTraceback:\n{tb}" result = local_vars.get("result", None) if isinstance(result, pd.DataFrame): # limit to 200 rows to avoid huge outputs return result.head(200), f"Success — executed Gemini code." else: # If not a DataFrame, try to wrap scalar into DF if isinstance(result, (int, float, str)): return pd.DataFrame({"value": [result]}), "Gemini returned a scalar; wrapped into DataFrame." return None, f"Gemini did not return a DataFrame. Code was:\n{code}" # ---------------- Gradio functions ---------------- def fn_load(file): df, msg = load_file(file) if df is None: return None, msg preview = df.head(5) return preview, msg def fn_ask(query): res, msg = ask_question_gemini(query) return res, msg def fn_clear(): global session_df session_df = None gc.collect() return ( gr.File.update(value=None), gr.Dataframe.update(value=None), gr.Textbox.update(value=""), gr.Textbox.update(value=""), ) # ---------------- UI ---------------- with gr.Blocks() as demo: gr.Markdown("# Chat-with-CSV — Gemini-powered (secure API key via Secrets)") with gr.Row(): file_input = gr.File(label="Upload CSV or XLSX (will not be saved)") load_btn = gr.Button("Load file") preview_table = gr.Dataframe(headers=None, label="Preview (first 5 rows)") file_status = gr.Textbox(label="File status") query_input = gr.Textbox(label="Ask a question (English)") ask_btn = gr.Button("Ask Gemini") result_table = gr.Dataframe(headers=None, label="Result") status = gr.Textbox(label="Status / Messages") clear_btn = gr.Button("Clear / Reset") load_btn.click(fn=fn_load, inputs=file_input, outputs=[preview_table, file_status]) ask_btn.click(fn=fn_ask, inputs=query_input, outputs=[result_table, status]) clear_btn.click(fn=fn_clear, outputs=[file_input, preview_table, query_input, result_table]) if __name__ == "__main__": demo.launch()