Spaces:
Sleeping
Sleeping
| import pandas as pd | |
| import gradio as gr | |
| import numpy as np | |
| import os | |
| from openai import OpenAI # requires OPENAI_API_KEY set as env var | |
| # 1. Configure OpenAI | |
| client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
| MODEL_NAME = "gpt-5" | |
| # 2. Load Excel data | |
| # Load all sheets | |
| df1_all = pd.read_excel( | |
| os.path.join("data_source", "OC Onboarding Information.xlsx"), | |
| sheet_name=None, | |
| header=None # load raw to inspect | |
| ) | |
| df2_all = pd.read_excel( | |
| os.path.join("data_source", "The Alex Ideas Report.xlsx"), | |
| sheet_name=None, | |
| header=None # load raw to inspect | |
| ) | |
| # Handle special header cases | |
| df1 = {} | |
| for sheet, raw_df in df1_all.items(): | |
| if sheet == "Budget": | |
| # Load raw first | |
| budget_raw = pd.read_excel( | |
| os.path.join("data_source", "OC Onboarding Information.xlsx"), | |
| sheet_name="Budget", | |
| header=None | |
| ) | |
| # Part 1: detailed breakdown (rows 3β20 with headers at row 3) | |
| budget_details = pd.read_excel( | |
| os.path.join("data_source", "OC Onboarding Information.xlsx"), | |
| sheet_name="Budget", | |
| header=2, | |
| nrows=17 # rows 3β19 (inclusive) | |
| ) | |
| # Part 2: summary section (rows 21β33) | |
| headers = budget_raw.iloc[2, 4:].tolist() # row3, cols E+ as headers | |
| metrics = budget_raw.iloc[20:33, 1].tolist() # colB = row labels | |
| values = budget_raw.iloc[20:33, 4:].reset_index(drop=True) | |
| values.columns = headers | |
| values.insert(0, "Metric", metrics) | |
| budget_summary = values | |
| # Add into df1 dict | |
| df1["Budget_Details"] = budget_details | |
| df1["Budget_Summary"] = budget_summary | |
| elif sheet == "Rooms per category": | |
| # use row 4 as header | |
| df1[sheet] = pd.read_excel( | |
| os.path.join("data_source", "OC Onboarding Information.xlsx"), | |
| sheet_name=sheet, | |
| header=3 | |
| ) | |
| else: | |
| df1[sheet] = pd.read_excel( | |
| os.path.join("data_source", "OC Onboarding Information.xlsx"), | |
| sheet_name=sheet, | |
| header=1 | |
| ) | |
| df2 = {} | |
| for sheet, raw_df in df2_all.items(): | |
| if sheet == "Report Criteria": | |
| # use row 3 as header | |
| df2[sheet] = pd.read_excel( | |
| os.path.join("data_source", "The Alex Ideas Report.xlsx"), | |
| sheet_name=sheet, | |
| header=2 | |
| ) | |
| else: | |
| df2[sheet] = pd.read_excel( | |
| os.path.join("data_source", "The Alex Ideas Report.xlsx"), | |
| sheet_name=sheet | |
| ) | |
| # Build schema info for prompts | |
| def get_schema_info(): | |
| lines = ["Report 1 - OC Onboarding Information:"] | |
| for sheet, df in df1.items(): | |
| if sheet == "Budget_Details": | |
| lines.append("Sheet: Budget_Details (line-item breakdown by channel/segment)") | |
| lines.append(f"Columns: {list(df.columns)}") | |
| elif sheet == "Budget_Summary": | |
| lines.append("Sheet: Budget_Summary (metrics by month; first column = 'Metric')") | |
| lines.append(f"Metrics available: {df['Metric'].tolist()}") | |
| lines.append(f"Month columns: {[c for c in df.columns if c != 'Metric']}") | |
| else: | |
| lines.append(f"Sheet: {sheet}, Columns: {list(df.columns)}") | |
| # Add one example row for context | |
| try: | |
| sample = df.head(1).to_dict(orient="records")[0] | |
| lines.append(f"Example row: {sample}") | |
| except Exception: | |
| lines.append("Example row: [no data available]") | |
| lines.append("\nReport 2 - The Alex Ideas Report:") | |
| for sheet, df in df2.items(): | |
| lines.append(f"Sheet: {sheet}, Columns: {list(df.columns)}") | |
| try: | |
| sample = df.head(1).to_dict(orient="records")[0] | |
| lines.append(f"Example row: {sample}") | |
| except Exception: | |
| lines.append("Example row: [no data available]") | |
| return "\n".join(lines) | |
| schema_info = get_schema_info() | |
| print("Prompt size (chars):", len(schema_info)) | |
| # Helper to format results nicely | |
| def format_result(result): | |
| # Convert numpy scalars | |
| if isinstance(result, np.generic): | |
| return round(result.item(), 2) | |
| if isinstance(result, (int, float)): | |
| return round(result, 2) | |
| # Convert dicts into readable strings | |
| if isinstance(result, dict): | |
| return "\n".join([f"{k}: {format_result(v)}" for k, v in result.items()]) | |
| # Convert lists into comma-separated string | |
| if isinstance(result, list): | |
| return ", ".join(map(str, result)) | |
| if isinstance(result, pd.Series): | |
| return result.to_string() | |
| if isinstance(result, pd.DataFrame): | |
| return result.head().to_string(index=False) | |
| return str(result) | |
| # 3. Core function | |
| MAX_REQUESTS_PER_DAY = 100 # Set your desired limit | |
| USAGE_FILE = "openai_usage.txt" | |
| def get_usage_count(): | |
| if os.path.exists(USAGE_FILE): | |
| with open(USAGE_FILE, "r") as f: | |
| return int(f.read().strip()) | |
| return 0 | |
| def increment_usage_count(): | |
| count = get_usage_count() + 1 | |
| with open(USAGE_FILE, "w") as f: | |
| f.write(str(count)) | |
| def answer_question(history, message): | |
| # Define a restricted set of built-ins | |
| safe_builtins = { | |
| "abs": abs, | |
| "all": all, | |
| "any": any, | |
| "bool": bool, | |
| "dict": dict, | |
| "float": float, | |
| "int": int, | |
| "len": len, | |
| "list": list, | |
| "max": max, | |
| "min": min, | |
| "range": range, | |
| "str": str, | |
| "sum": sum, | |
| "round": round, | |
| "KeyError": KeyError, | |
| "ValueError": ValueError, | |
| "sorted": sorted, | |
| "enumerate": enumerate, | |
| "zip": zip, | |
| "set": set, | |
| "type": type, | |
| "isinstance": isinstance, | |
| "pd": pd, | |
| "np": np, | |
| "print": print, | |
| "Exception": Exception | |
| } | |
| if get_usage_count() >= MAX_REQUESTS_PER_DAY: | |
| return "OpenAI API usage limit reached for today." | |
| increment_usage_count() | |
| """ | |
| history: chat history (list of [user, assistant] pairs) | |
| message: latest user message (string) | |
| """ | |
| prompt = f""" | |
| You are a data analysis assistant. | |
| You can ONLY answer questions using the two Excel reports provided (df1 and df2). | |
| Do not hallucinate or use external knowledge. | |
| The reports are loaded as dictionaries of DataFrames: | |
| - Access Report 1 with df1['SheetName'] | |
| - Access Report 2 with df2['SheetName'] | |
| Do not reload Excel files with pandas. | |
| If unsure whether the question is relevant, try to reason using the available columns. | |
| If absolutely no relation to provided sheets, respond with: | |
| "I can only answer questions about the provided Excel reports." | |
| The reports are: | |
| - OC Onboarding Information (df1): planned hotel data including a budget set by the hotel for the year. | |
| - "Budget_Details": line-item breakdown (channels, segments, rates, rooms, revenue, ADR). | |
| - "Budget_Summary": pivot-style table with 'Metric' as the first column | |
| (e.g. Total, Occupancy %, RevPar, Capacity), and the other columns representing monthly values | |
| such as 'Jan Rooms', 'Jan Rooms Revenue', 'Jan ADR', 'Feb Rooms', etc. | |
| - The Alex Ideas Report (df2): actual hotel revenue and performance data. | |
| They have the following schema (sheet names, columns, and example rows): | |
| {schema_info} | |
| The user asked: | |
| {message} | |
| The history of the conversation is: | |
| {history} | |
| Rules: | |
| - Use ONLY pandas, df1, df2, and Python built-ins. | |
| - Do NOT write import statements (pandas is already imported as pd). | |
| - Access all dataframes ONLY as df1["SheetName"] or df2["SheetName"]. | |
| Never assign them to new variables (e.g. budget_df, property_df). | |
| - Do not create aliases for dataframes. Always reference them directly. | |
| - When using df1["Budget_Summary"], always filter by 'Metric' first, | |
| then select the appropriate month column. | |
| - ALWAYS put the final answer in a variable named `result`. | |
| - Return ONLY Python code, nothing else. | |
| - If multiple values are tied for the maximum, include all of them in a list. | |
| - If result is numeric, round to 2 decimal places. | |
| - If result is a list, return the full list (not just the first element). | |
| - If a required column or metric is missing, return a clear error string in `result` instead of crashing. | |
| - List of allowed functions: {list(safe_builtins.keys())} | |
| """ | |
| try: | |
| # Ask OpenAI | |
| response = client.chat.completions.create( | |
| model=MODEL_NAME, | |
| messages=[ | |
| {"role": "system", "content": "You are a helpful assistant."}, | |
| {"role": "user", "content": prompt} | |
| ] | |
| ) | |
| code = response.choices[0].message.content.strip() | |
| # Strip markdown fences if any | |
| if code.startswith("```"): | |
| code = code.strip("```").replace("python", "").strip() | |
| # Debug info | |
| print("RAW GPT CODE:\n", code) | |
| print("Prompt tokens:", response.usage.prompt_tokens) | |
| print("Completion tokens:", response.usage.completion_tokens) | |
| print("Finish reason:", response.choices[0].finish_reason) | |
| # Remove dangerous or irrelevant lines | |
| safe_lines = [] | |
| for line in code.splitlines(): | |
| if line.strip().startswith("import"): | |
| continue | |
| if line.strip().startswith("!"): # e.g. shell commands | |
| continue | |
| safe_lines.append(line) | |
| safe_code = "\n".join(safe_lines) | |
| # Prepare sandbox | |
| local_vars = {"df1": df1, "df2": df2, "pd": pd} | |
| # Execute code | |
| safe_globals = {"__builtins__": safe_builtins, "df1": df1, "df2": df2, "pd": pd} | |
| local_vars = {} | |
| exec(safe_code, safe_globals, local_vars) | |
| # Fetch result | |
| result = local_vars.get("result", safe_globals.get("result", "No result produced")) | |
| return str(format_result(result)) | |
| except Exception as e: | |
| return f"Execution error: {e}" | |
| # 5. Gradio UI | |
| with gr.Blocks() as demo: | |
| gr.Markdown(""" | |
| # π¨ Excel Hotel Data Chatbot | |
| Ask any question about the two provided Excel reports. The assistant will analyze the hotel data and answer using the loaded sheets. | |
| """) | |
| chatbot = gr.Chatbot(height=500, show_copy_button=True) | |
| msg = gr.Textbox(placeholder="Ask me a question about the reports...") | |
| send_btn = gr.Button("Send") | |
| clear = gr.ClearButton([msg, chatbot]) | |
| def respond(message, chat_history): | |
| answer = answer_question(chat_history, message) | |
| chat_history.append((message, answer)) | |
| return "", chat_history | |
| msg.submit(respond, [msg, chatbot], [msg, chatbot]) | |
| send_btn.click(respond, [msg, chatbot], [msg, chatbot]) | |
| # 6. Run locally (Spaces will call demo.launch() automatically) | |
| if __name__ == "__main__": | |
| demo.launch() | |