import pandas as pd import gradio as gr import numpy as np import os from openai import OpenAI # requires OPENAI_API_KEY set as env var # 1. Configure OpenAI client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) MODEL_NAME = "gpt-5" # 2. Load Excel data # Load all sheets df1_all = pd.read_excel( os.path.join("data_source", "OC Onboarding Information.xlsx"), sheet_name=None, header=None # load raw to inspect ) df2_all = pd.read_excel( os.path.join("data_source", "The Alex Ideas Report.xlsx"), sheet_name=None, header=None # load raw to inspect ) # Handle special header cases df1 = {} for sheet, raw_df in df1_all.items(): if sheet == "Budget": # Load raw first budget_raw = pd.read_excel( os.path.join("data_source", "OC Onboarding Information.xlsx"), sheet_name="Budget", header=None ) # Part 1: detailed breakdown (rows 3–20 with headers at row 3) budget_details = pd.read_excel( os.path.join("data_source", "OC Onboarding Information.xlsx"), sheet_name="Budget", header=2, nrows=17 # rows 3–19 (inclusive) ) # Part 2: summary section (rows 21–33) headers = budget_raw.iloc[2, 4:].tolist() # row3, cols E+ as headers metrics = budget_raw.iloc[20:33, 1].tolist() # colB = row labels values = budget_raw.iloc[20:33, 4:].reset_index(drop=True) values.columns = headers values.insert(0, "Metric", metrics) budget_summary = values # Add into df1 dict df1["Budget_Details"] = budget_details df1["Budget_Summary"] = budget_summary elif sheet == "Rooms per category": # use row 4 as header df1[sheet] = pd.read_excel( os.path.join("data_source", "OC Onboarding Information.xlsx"), sheet_name=sheet, header=3 ) else: df1[sheet] = pd.read_excel( os.path.join("data_source", "OC Onboarding Information.xlsx"), sheet_name=sheet, header=1 ) df2 = {} for sheet, raw_df in df2_all.items(): if sheet == "Report Criteria": # use row 3 as header df2[sheet] = pd.read_excel( os.path.join("data_source", "The Alex Ideas Report.xlsx"), sheet_name=sheet, header=2 ) else: df2[sheet] = pd.read_excel( os.path.join("data_source", "The Alex Ideas Report.xlsx"), sheet_name=sheet ) # Build schema info for prompts def get_schema_info(): lines = ["Report 1 - OC Onboarding Information:"] for sheet, df in df1.items(): if sheet == "Budget_Details": lines.append("Sheet: Budget_Details (line-item breakdown by channel/segment)") lines.append(f"Columns: {list(df.columns)}") elif sheet == "Budget_Summary": lines.append("Sheet: Budget_Summary (metrics by month; first column = 'Metric')") lines.append(f"Metrics available: {df['Metric'].tolist()}") lines.append(f"Month columns: {[c for c in df.columns if c != 'Metric']}") else: lines.append(f"Sheet: {sheet}, Columns: {list(df.columns)}") # Add one example row for context try: sample = df.head(1).to_dict(orient="records")[0] lines.append(f"Example row: {sample}") except Exception: lines.append("Example row: [no data available]") lines.append("\nReport 2 - The Alex Ideas Report:") for sheet, df in df2.items(): lines.append(f"Sheet: {sheet}, Columns: {list(df.columns)}") try: sample = df.head(1).to_dict(orient="records")[0] lines.append(f"Example row: {sample}") except Exception: lines.append("Example row: [no data available]") return "\n".join(lines) schema_info = get_schema_info() print("Prompt size (chars):", len(schema_info)) # Helper to format results nicely def format_result(result): # Convert numpy scalars if isinstance(result, np.generic): return round(result.item(), 2) if isinstance(result, (int, float)): return round(result, 2) # Convert dicts into readable strings if isinstance(result, dict): return "\n".join([f"{k}: {format_result(v)}" for k, v in result.items()]) # Convert lists into comma-separated string if isinstance(result, list): return ", ".join(map(str, result)) if isinstance(result, pd.Series): return result.to_string() if isinstance(result, pd.DataFrame): return result.head().to_string(index=False) return str(result) # 3. Core function MAX_REQUESTS_PER_DAY = 100 # Set your desired limit USAGE_FILE = "openai_usage.txt" def get_usage_count(): if os.path.exists(USAGE_FILE): with open(USAGE_FILE, "r") as f: return int(f.read().strip()) return 0 def increment_usage_count(): count = get_usage_count() + 1 with open(USAGE_FILE, "w") as f: f.write(str(count)) def answer_question(history, message): # Define a restricted set of built-ins safe_builtins = { "abs": abs, "all": all, "any": any, "bool": bool, "dict": dict, "float": float, "int": int, "len": len, "list": list, "max": max, "min": min, "range": range, "str": str, "sum": sum, "round": round, "KeyError": KeyError, "ValueError": ValueError, "sorted": sorted, "enumerate": enumerate, "zip": zip, "set": set, "type": type, "isinstance": isinstance, "pd": pd, "np": np, "print": print, "Exception": Exception } if get_usage_count() >= MAX_REQUESTS_PER_DAY: return "OpenAI API usage limit reached for today." increment_usage_count() """ history: chat history (list of [user, assistant] pairs) message: latest user message (string) """ prompt = f""" You are a data analysis assistant. You can ONLY answer questions using the two Excel reports provided (df1 and df2). Do not hallucinate or use external knowledge. The reports are loaded as dictionaries of DataFrames: - Access Report 1 with df1['SheetName'] - Access Report 2 with df2['SheetName'] Do not reload Excel files with pandas. If unsure whether the question is relevant, try to reason using the available columns. If absolutely no relation to provided sheets, respond with: "I can only answer questions about the provided Excel reports." The reports are: - OC Onboarding Information (df1): planned hotel data including a budget set by the hotel for the year. - "Budget_Details": line-item breakdown (channels, segments, rates, rooms, revenue, ADR). - "Budget_Summary": pivot-style table with 'Metric' as the first column (e.g. Total, Occupancy %, RevPar, Capacity), and the other columns representing monthly values such as 'Jan Rooms', 'Jan Rooms Revenue', 'Jan ADR', 'Feb Rooms', etc. - The Alex Ideas Report (df2): actual hotel revenue and performance data. They have the following schema (sheet names, columns, and example rows): {schema_info} The user asked: {message} The history of the conversation is: {history} Rules: - Use ONLY pandas, df1, df2, and Python built-ins. - Do NOT write import statements (pandas is already imported as pd). - Access all dataframes ONLY as df1["SheetName"] or df2["SheetName"]. Never assign them to new variables (e.g. budget_df, property_df). - Do not create aliases for dataframes. Always reference them directly. - When using df1["Budget_Summary"], always filter by 'Metric' first, then select the appropriate month column. - ALWAYS put the final answer in a variable named `result`. - Return ONLY Python code, nothing else. - If multiple values are tied for the maximum, include all of them in a list. - If result is numeric, round to 2 decimal places. - If result is a list, return the full list (not just the first element). - If a required column or metric is missing, return a clear error string in `result` instead of crashing. - List of allowed functions: {list(safe_builtins.keys())} """ try: # Ask OpenAI response = client.chat.completions.create( model=MODEL_NAME, messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt} ] ) code = response.choices[0].message.content.strip() # Strip markdown fences if any if code.startswith("```"): code = code.strip("```").replace("python", "").strip() # Debug info print("RAW GPT CODE:\n", code) print("Prompt tokens:", response.usage.prompt_tokens) print("Completion tokens:", response.usage.completion_tokens) print("Finish reason:", response.choices[0].finish_reason) # Remove dangerous or irrelevant lines safe_lines = [] for line in code.splitlines(): if line.strip().startswith("import"): continue if line.strip().startswith("!"): # e.g. shell commands continue safe_lines.append(line) safe_code = "\n".join(safe_lines) # Prepare sandbox local_vars = {"df1": df1, "df2": df2, "pd": pd} # Execute code safe_globals = {"__builtins__": safe_builtins, "df1": df1, "df2": df2, "pd": pd} local_vars = {} exec(safe_code, safe_globals, local_vars) # Fetch result result = local_vars.get("result", safe_globals.get("result", "No result produced")) return str(format_result(result)) except Exception as e: return f"Execution error: {e}" # 5. Gradio UI with gr.Blocks() as demo: gr.Markdown(""" # 🏨 Excel Hotel Data Chatbot Ask any question about the two provided Excel reports. The assistant will analyze the hotel data and answer using the loaded sheets. """) chatbot = gr.Chatbot(height=500, show_copy_button=True) msg = gr.Textbox(placeholder="Ask me a question about the reports...") send_btn = gr.Button("Send") clear = gr.ClearButton([msg, chatbot]) def respond(message, chat_history): answer = answer_question(chat_history, message) chat_history.append((message, answer)) return "", chat_history msg.submit(respond, [msg, chatbot], [msg, chatbot]) send_btn.click(respond, [msg, chatbot], [msg, chatbot]) # 6. Run locally (Spaces will call demo.launch() automatically) if __name__ == "__main__": demo.launch()