File size: 6,259 Bytes
7273bff
 
288f777
5ad7d73
 
 
 
 
c161698
5ad7d73
 
2c65316
7f64dba
 
 
 
 
2c65316
 
 
 
 
7f64dba
2c65316
7f64dba
 
 
 
 
 
 
 
 
 
 
 
 
 
5ad7d73
2c65316
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5ad7d73
 
7f64dba
13a0a1c
7f64dba
 
 
 
13a0a1c
7f64dba
 
 
 
5ad7d73
 
 
288f777
 
 
 
7f64dba
 
 
288f777
 
 
 
 
 
7f64dba
 
 
 
 
288f777
5ad7d73
 
 
 
 
 
 
8f502b4
288f777
8f502b4
7f64dba
 
 
 
 
 
8f502b4
 
 
 
 
 
 
 
 
 
 
 
 
7f64dba
 
 
 
5ad7d73
 
414da23
8f502b4
5ad7d73
8f502b4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5ad7d73
 
1f650a0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8f502b4
1f650a0
8f502b4
 
 
288f777
5ad7d73
414da23
8f502b4
7273bff
5ad7d73
 
7273bff
5ad7d73
 
 
 
 
 
 
 
7273bff
5ad7d73
7273bff
5ad7d73
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import pandas as pd
import gradio as gr
import numpy as np
import os
import google.generativeai as genai  # requires GOOGLE_API_KEY set as env var

# 1. Configure Gemini
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
model = genai.GenerativeModel("gemini-2.5-pro")

# 2. Load Excel data
# Load all sheets
df1_all = pd.read_excel(
    os.path.join("data_source", "OC Onboarding Information.xlsx"),
    sheet_name=None,
    header=None  # load raw to inspect
)
df2_all = pd.read_excel(
    os.path.join("data_source", "The Alex Ideas Report.xlsx"),
    sheet_name=None,
    header=None  # load raw to inspect
)

# Handle special header cases
df1 = {}
for sheet, raw_df in df1_all.items():
    if sheet == "PY Event Diary":
        # use row 2 as header
        df1[sheet] = pd.read_excel(
            os.path.join("data_source", "OC Onboarding Information.xlsx"),
            sheet_name=sheet,
            header=1
        )
    else:
        df1[sheet] = pd.read_excel(
            os.path.join("data_source", "OC Onboarding Information.xlsx"),
            sheet_name=sheet
        )

df2 = {}
for sheet, raw_df in df2_all.items():
    if sheet == "Report Criteria":
        # use row 3 as header
        df2[sheet] = pd.read_excel(
            os.path.join("data_source", "The Alex Ideas Report.xlsx"),
            sheet_name=sheet,
            header=2
        )
    else:
        df2[sheet] = pd.read_excel(
            os.path.join("data_source", "The Alex Ideas Report.xlsx"),
            sheet_name=sheet
        )

# Build schema info for prompts
def get_schema_info():
    lines = ["Report 1 - OC Onboarding Information:"]
    for sheet, df in df1.items():
        lines.append(f"Sheet: {sheet}, Columns: {list(df.columns)}")
        sample = df.head(1).to_dict(orient="records")[0]
        lines.append(f"Example row: {sample}")
    lines.append("\nReport 2 - The Alex Ideas Report:")
    for sheet, df in df2.items():
        lines.append(f"Sheet: {sheet}, Columns: {list(df.columns)}")
        sample = df.head(1).to_dict(orient="records")[0]
        lines.append(f"Example row: {sample}")
    return "\n".join(lines)

schema_info = get_schema_info()

# Helper to format results nicely
def format_result(result):
    # Convert numpy scalars
    if isinstance(result, np.generic):
        return round(result.item(), 2)
    if isinstance(result, (int, float)):
        return round(result, 2)
    # Convert dicts into readable strings
    if isinstance(result, dict):
        return "\n".join([f"{k}: {format_result(v)}" for k, v in result.items()])
    # Convert lists into comma-separated string
    if isinstance(result, list):
        return ", ".join(map(str, result))
    if isinstance(result, pd.Series):
        return result.to_string()
    if isinstance(result, pd.DataFrame):
        return result.head().to_string(index=False)
    return str(result)

# 3. Core function
def answer_question(history, message):
    """
    history: chat history (list of [user, assistant] pairs)
    message: latest user message (string)
    """
    prompt = f"""
You are a data analysis assistant. 
You can ONLY answer questions using the two Excel reports provided (df1 and df2).
Do not hallucinate or use external knowledge.
The reports are loaded as dictionaries of DataFrames:
- Access Report 1 with df1['SheetName']
- Access Report 2 with df2['SheetName']
Do not reload Excel files with pandas.
If unsure is the question relevant, try to reason using columns available.
If absolutely no relation to provided sheets, respond with:
"I can only answer questions about the provided Excel reports."

The reports have the following schema:
{schema_info}

The user asked:
{message}

Rules:
- Use only pandas, df1, df2, and Python built-ins.
- Do NOT write import statements (pandas is already imported as pd).
- Always put the answer in a variable named `result`.
- Return ONLY Python code, nothing else.
- If multiple values are tied for the maximum, include all of them in a list.
- If result is numeric, round to 2 decimal places.
- If result is a list, return the full list (not just the first element).
- If a column is missing, return a clear error string in `result`, do not crash.
    """

    try:
        # Ask Gemini
        response = model.generate_content(prompt)
        code = response.text.strip()

        # Strip markdown fences if any
        if code.startswith("```"):
            code = code.strip("```").replace("python", "").strip()

        # Remove dangerous or irrelevant lines
        safe_lines = []
        for line in code.splitlines():
            if line.strip().startswith("import"):
                continue
            if line.strip().startswith("!"):  # e.g. shell commands
                continue
            safe_lines.append(line)
        safe_code = "\n".join(safe_lines)

        # Prepare sandbox
        local_vars = {"df1": df1, "df2": df2, "pd": pd}

        # Define a restricted set of built-ins
        safe_builtins = {
            "abs": abs,
            "all": all,
            "any": any,
            "bool": bool,
            "dict": dict,
            "float": float,
            "int": int,
            "len": len,
            "list": list,
            "max": max,
            "min": min,
            "range": range,
            "str": str,
            "sum": sum,
            "round": round,
            "KeyError": KeyError,
            "ValueError": ValueError,
        }

        # Execute code
        exec(safe_code, {"__builtins__": safe_builtins}, local_vars)

        # Fetch result
        result = local_vars.get("result", "No result produced")
        return str(format_result(result))

    except Exception as e:
        return f"Execution error: {e}"


# 5. Gradio UI
with gr.Blocks() as demo:
    chatbot = gr.Chatbot()
    msg = gr.Textbox(placeholder="Ask me a question about the reports...")
    clear = gr.ClearButton([msg, chatbot])

    def respond(message, chat_history):
        answer = answer_question(chat_history, message)
        chat_history.append((message, answer))
        return "", chat_history

    msg.submit(respond, [msg, chatbot], [msg, chatbot])

# 6. Run locally (Spaces will call demo.launch() automatically)
if __name__ == "__main__":
    demo.launch()