Spaces:
Sleeping
Sleeping
| # app.py | |
| import gradio as gr | |
| import pandas as pd | |
| import io | |
| import os | |
| import google.generativeai as genai | |
| import gc | |
| import traceback | |
| from typing import Tuple, Optional | |
| # Load API key from secrets (don't put key in code) | |
| GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") | |
| if not GEMINI_API_KEY: | |
| raise ValueError("Gemini API key not set. Please add GEMINI_API_KEY in Space Secrets.") | |
| genai.configure(api_key=GEMINI_API_KEY) | |
| # session DataFrame (kept in memory for the session) | |
| session_df = None | |
| # ---------------- robust file-reading helper ---------------- | |
| def read_file_bytes_flexible(file) -> Tuple[Optional[bytes], Optional[str], Optional[str]]: | |
| """ | |
| Try many ways to extract raw bytes and filename from the uploaded object. | |
| Returns: (content_bytes | None, filename | None, error_message | None) | |
| """ | |
| if file is None: | |
| return None, None, "No file uploaded." | |
| # 1) If it's already raw bytes | |
| if isinstance(file, (bytes, bytearray)): | |
| return bytes(file), None, None | |
| # 2) If object has attribute 'bytes' (some wrappers do) | |
| try: | |
| b = getattr(file, "bytes", None) | |
| if isinstance(b, (bytes, bytearray)): | |
| # try name if available | |
| name = getattr(file, "name", None) or getattr(file, "filename", None) | |
| return bytes(b), name, None | |
| except Exception: | |
| pass | |
| # 3) If object has attribute 'read' and calling it works | |
| read_attr = getattr(file, "read", None) | |
| if callable(read_attr): | |
| try: | |
| content = read_attr() | |
| # some frameworks return coroutine for read() - handle it gracefully | |
| if hasattr(content, "__await__"): | |
| # can't await in sync; try file.file.read() below | |
| pass | |
| else: | |
| if isinstance(content, (bytes, bytearray)): | |
| name = getattr(file, "name", None) or getattr(file, "filename", None) | |
| return bytes(content), name, None | |
| # sometimes read() returns str (rare), turn to bytes | |
| if isinstance(content, str): | |
| return content.encode("utf-8"), getattr(file, "name", None), None | |
| except TypeError: | |
| # read() may require args or be not callable in this context | |
| pass | |
| except Exception: | |
| # ignore and try other ways | |
| pass | |
| # 4) If object has a .file attribute (like starlette UploadFile.file) | |
| try: | |
| attr_file = getattr(file, "file", None) | |
| if attr_file is not None and hasattr(attr_file, "read"): | |
| try: | |
| content = attr_file.read() | |
| if isinstance(content, (bytes, bytearray)): | |
| name = getattr(file, "name", None) or getattr(file, "filename", None) | |
| return bytes(content), name, None | |
| except Exception: | |
| pass | |
| except Exception: | |
| pass | |
| # 5) If object is a dict-like (some environments) | |
| try: | |
| if isinstance(file, dict): | |
| # common keys | |
| for k in ("content", "data", "bytes", "file", "body"): | |
| v = file.get(k) | |
| if isinstance(v, (bytes, bytearray)): | |
| name = file.get("name") or file.get("filename") | |
| return bytes(v), name, None | |
| if isinstance(v, str) and os.path.exists(v): | |
| with open(v, "rb") as f: | |
| return f.read(), os.path.basename(v), None | |
| except Exception: | |
| pass | |
| # 6) Fallback: try attributes that might contain a path string | |
| try: | |
| for attr in ("name", "filename", "path"): | |
| val = getattr(file, attr, None) | |
| if isinstance(val, str) and os.path.exists(val): | |
| with open(val, "rb") as f: | |
| return f.read(), os.path.basename(val), None | |
| except Exception: | |
| pass | |
| # 7) Give up with a helpful error (include repr for debugging) | |
| try: | |
| rep = repr(file) | |
| except Exception: | |
| rep = "<unrepresentable object>" | |
| return None, None, f"Uploaded file format not supported by this server environment. Object repr: {rep}" | |
| # ---------------- load file to DataFrame ---------------- | |
| def load_file(file) -> Tuple[Optional[pd.DataFrame], str]: | |
| """ | |
| Returns (df or None, status_message). | |
| """ | |
| global session_df | |
| content, fname, err = read_file_bytes_flexible(file) | |
| if err: | |
| return None, f"Error reading file: {err}" | |
| if content is None: | |
| return None, "No bytes could be read from uploaded object." | |
| try: | |
| name = (fname or "").lower() | |
| # Quick heuristic: csv if filename endswith .csv or bytes contain commas/newlines in header | |
| if name.endswith(".csv") or (isinstance(content, (bytes, bytearray)) and b"," in content[:200]): | |
| df = pd.read_csv(io.BytesIO(content)) | |
| else: | |
| # assume excel by default | |
| df = pd.read_excel(io.BytesIO(content)) | |
| except Exception as e: | |
| # include traceback to help debug unusual formats (will show in UI only) | |
| tb = traceback.format_exc() | |
| return None, f"Error parsing file into DataFrame: {e}\n{tb}" | |
| finally: | |
| try: | |
| del content | |
| except Exception: | |
| pass | |
| gc.collect() | |
| session_df = df | |
| return df, f"File loaded: {df.shape[0]} rows x {df.shape[1]} columns." | |
| # ---------------- Gemini-powered question answering ---------------- | |
| def ask_question_gemini(query: str): | |
| """ | |
| Sends the user's query and a small preview to Gemini; expects back Python code that sets `result`. | |
| Executes the code in a controlled local environment. | |
| """ | |
| global session_df | |
| if session_df is None: | |
| return None, "Please upload and load a file first." | |
| # build prompt: include columns & small preview | |
| cols = list(session_df.columns) | |
| preview_csv = session_df.head(10).to_csv(index=False) | |
| prompt = f""" | |
| You are a helpful Python data analyst. The user uploaded a dataset with columns: {cols}. | |
| Here are the first 10 rows (CSV): | |
| {preview_csv} | |
| User question: {query} | |
| Return ONLY Python code (no explanations) that when executed will create a pandas DataFrame named `result` | |
| that contains the answer (a DataFrame, up to 200 rows). Use `df` as the variable for the dataset. | |
| Do not import libraries; assume pandas is available as pd. If you need to compute percentages, include them as columns. | |
| If the query asks for a single number, return it as a one-row DataFrame, e.g. pd.DataFrame({'value':[...]}). | |
| """ | |
| try: | |
| model = genai.GenerativeModel("gemini-pro") | |
| response = model.generate_content(prompt) | |
| code = response.text.strip("`\n ") | |
| except Exception as e: | |
| return None, f"Error calling Gemini: {e}" | |
| # Execute the code in a controlled namespace | |
| local_vars = {"pd": pd, "df": session_df.copy(), "result": None} | |
| try: | |
| exec(code, {}, local_vars) | |
| except Exception as e: | |
| tb = traceback.format_exc() | |
| return None, f"Error executing code returned by Gemini: {e}\nCode was:\n{code}\n\nTraceback:\n{tb}" | |
| result = local_vars.get("result", None) | |
| if isinstance(result, pd.DataFrame): | |
| # limit to 200 rows to avoid huge outputs | |
| return result.head(200), f"Success — executed Gemini code." | |
| else: | |
| # If not a DataFrame, try to wrap scalar into DF | |
| if isinstance(result, (int, float, str)): | |
| return pd.DataFrame({"value": [result]}), "Gemini returned a scalar; wrapped into DataFrame." | |
| return None, f"Gemini did not return a DataFrame. Code was:\n{code}" | |
| # ---------------- Gradio functions ---------------- | |
| def fn_load(file): | |
| df, msg = load_file(file) | |
| if df is None: | |
| return None, msg | |
| preview = df.head(5) | |
| return preview, msg | |
| def fn_ask(query): | |
| res, msg = ask_question_gemini(query) | |
| return res, msg | |
| def fn_clear(): | |
| global session_df | |
| session_df = None | |
| gc.collect() | |
| return ( | |
| gr.File.update(value=None), | |
| gr.Dataframe.update(value=None), | |
| gr.Textbox.update(value=""), | |
| gr.Textbox.update(value=""), | |
| ) | |
| # ---------------- UI ---------------- | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# Chat-with-CSV — Gemini-powered (secure API key via Secrets)") | |
| with gr.Row(): | |
| file_input = gr.File(label="Upload CSV or XLSX (will not be saved)") | |
| load_btn = gr.Button("Load file") | |
| preview_table = gr.Dataframe(headers=None, label="Preview (first 5 rows)") | |
| file_status = gr.Textbox(label="File status") | |
| query_input = gr.Textbox(label="Ask a question (English)") | |
| ask_btn = gr.Button("Ask Gemini") | |
| result_table = gr.Dataframe(headers=None, label="Result") | |
| status = gr.Textbox(label="Status / Messages") | |
| clear_btn = gr.Button("Clear / Reset") | |
| load_btn.click(fn=fn_load, inputs=file_input, outputs=[preview_table, file_status]) | |
| ask_btn.click(fn=fn_ask, inputs=query_input, outputs=[result_table, status]) | |
| clear_btn.click(fn=fn_clear, outputs=[file_input, preview_table, query_input, result_table]) | |
| if __name__ == "__main__": | |
| demo.launch() | |