import os import requests import gradio as gr from google import genai from typing import List, Dict, Any, Optional from supabase import create_client, Client import jwt from datetime import datetime, timedelta # --- Configuration --- GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY") SUPABASE_URL = os.getenv("SUPABASE_URL") SUPABASE_KEY = os.getenv("SUPABASE_KEY") SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY") JWT_SECRET = os.getenv("JWT_SECRET", "a-strong-secret-key-for-local-testing") # --- External API Config --- QUANTUM_API_BASE_URL = os.getenv("QUANTUM_API_BASE_URL", "https://virtserver.swaggerhub.com/JOCALL3_1/jamesburvel/1.0") # CHANGED: The API key is now optional. It will be None if the secret is not set. QUANTUM_API_KEY = os.getenv("QUANTUM_API_KEY") # --- Initialize Supabase Admin Client --- try: supabase_admin: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY) print("Successfully created Supabase admin client.") except Exception as e: supabase_admin = None print(f"Warning: Supabase service key invalid. User authentication will fail. Error: {e}") # --- Real API Client for the "Quantum Core 3.0" Service --- class QuantumAPIClient: # CHANGED: The __init__ method is updated to handle an optional API key. def __init__(self, base_url: str, api_key: Optional[str] = None): self.base_url = base_url self.headers = { "Content-Type": "application/json", "Accept": "application/json" } if api_key: print("Quantum API Key found, adding Authorization header.") self.headers["Authorization"] = f"Bearer {api_key}" else: print("No Quantum API Key found, making public API calls.") def _get(self, endpoint: str, params: Dict[str, Any] = None): try: response = requests.get(f"{self.base_url}{endpoint}", headers=self.headers, params=params, timeout=20) response.raise_for_status() return response.json() except Exception as e: return {"error": f"API call failed: {e}"} def _post(self, endpoint: str, data: Dict[str, Any]): try: response = requests.post(f"{self.base_url}{endpoint}", headers=self.headers, json=data, timeout=20) response.raise_for_status() return response.json() except Exception as e: return {"error": f"API call failed: {e}"} quantum_client = QuantumAPIClient(base_url=QUANTUM_API_BASE_URL, api_key=QUANTUM_API_KEY) # --- Tool Definitions (Internal Memory + External API) --- def get_ai_advisor_chat_history(supabase_user_client: Client, user_id: str): print(f"TOOL CALL: get_ai_advisor_chat_history for user '{user_id}' from Supabase") try: response = supabase_user_client.table('chat_history').select("*").eq('user_id', user_id).order('timestamp').limit(50).execute() return response.data except Exception as e: return {"error": f"Database query failed: {str(e)}"} def save_chat_turn(supabase_user_client: Client, user_id: str, session_id: str, user_message: str, assistant_message: str): try: supabase_user_client.table('chat_history').insert([ {"user_id": user_id, "session_id": session_id, "role": "user", "content": user_message}, {"user_id": user_id, "session_id": session_id, "role": "assistant", "content": assistant_message} ]).execute() except Exception as e: print(f"Error saving chat turn: {e}") def run_standard_financial_simulation(prompt: str, duration_years: int = 5): """Submits a 'What-If' scenario to the Quantum Oracle AI for standard financial impact analysis.""" print(f"TOOL CALL: run_standard_financial_simulation with prompt: '{prompt}'") payload = {"prompt": prompt, "parameters": {"durationYears": duration_years}} return quantum_client._post("/ai/oracle/simulate", data=payload) def generate_video_ad(prompt: str, style: str = "Cinematic", length_seconds: int = 15): """Submits a request to generate a high-quality video ad using the Veo 2.0 generative AI model.""" print(f"TOOL CALL: generate_video_ad with prompt: '{prompt}'") payload = {"prompt": prompt, "style": style, "lengthSeconds": length_seconds} return quantum_client._post("/ai/ads/generate", data=payload) def list_available_ai_tools(): """Retrieves a list of all integrated AI tools that Quantum can invoke.""" print("TOOL CALL: list_available_ai_tools -> GET /ai/advisor/tools") return quantum_client._get("/ai/advisor/tools") # --- Core Agent & Auth Logic --- def perform_login(user_id: str): if not supabase_admin: raise Exception("Auth service not configured.") payload = { "sub": user_id, "aud": "authenticated", "exp": datetime.utcnow() + timedelta(hours=1), "role": "authenticated" } return jwt.encode(payload, JWT_SECRET, algorithm="HS256") def run_agent_logic(prompt: str, user_id: str, session_id: str, token: str): try: decoded_token = jwt.decode(token, JWT_SECRET, algorithms=["HS256"], audience="authenticated") if not decoded_token.get("sub") or decoded_token.get("sub") != user_id: raise Exception("Invalid token or user mismatch.") supabase_user_client = create_client(SUPABASE_URL, SUPABASE_KEY) supabase_user_client.auth.set_session(access_token=token, refresh_token=token) except jwt.PyJWTError as e: raise Exception(f"Invalid token: {e}") def get_my_chat_history(): """Fetches my most recent conversation history to provide context.""" return get_ai_advisor_chat_history(supabase_user_client, user_id) all_tools = [ get_my_chat_history, run_standard_financial_simulation, generate_video_ad, list_available_ai_tools, ] try: genai.configure(api_key=GOOGLE_API_KEY) history_data = get_my_chat_history() conversation_history = [] if isinstance(history_data, list): for turn in history_data: role = "user" if turn['role'] == 'user' else "model" conversation_history.append({'role': role, 'parts': [{'text': turn['content']}]}) conversation_history.append({'role': 'user', 'parts': [{'text': prompt}]}) model = genai.GenerativeModel('gemini-pro') response = model.generate_content( conversation_history, tools=all_tools, safety_settings={'HARM_CATEGORY_DANGEROUS_CONTENT': 'BLOCK_NONE', 'HARM_CATEGORY_HARASSMENT': 'BLOCK_NONE', 'HARM_CATEGORY_HATE_SPEECH': 'BLOCK_NONE', 'HARM_CATEGORY_SEXUALLY_EXPLICIT': 'BLOCK_NONE'} ) final_response_text = response.text save_chat_turn(supabase_user_client, user_id, session_id, prompt, final_response_text) return final_response_text except Exception as e: print(f"Error in agent invocation: {e}") return f"An error occurred: {e}" # --- Gradio UI (Pure Gradio, No FastAPI) --- with gr.Blocks() as demo: gr.Markdown("# 💎 Secure, Multi-User AI Agent (Full API)") gr.Markdown("Log in with a User ID. The agent can fetch chat history (from Supabase) and call external APIs for simulations or ads.") user_id_state = gr.State("") token_state = gr.State("") session_id_state = gr.State("session-" + os.urandom(8).hex()) with gr.Row(): user_id_input = gr.Textbox(label="Enter User ID to Log In", placeholder="e.g., user-a") login_button = gr.Button("Login") login_status = gr.Markdown("") chatbot = gr.Chatbot(label="Agent Chat", visible=False, height=500) msg_input = gr.Textbox(label="Your Message", visible=False, show_label=False, placeholder="Type your message here...") def login_fn(user_id): if not user_id: return {login_status: gr.update(value="
Please enter a User ID.
")} try: token = perform_login(user_id) status_html = f"Logged in as: {user_id}. You can start chatting.
" return { login_status: gr.update(value=status_html), user_id_state: user_id, token_state: token, chatbot: gr.update(visible=True), msg_input: gr.update(visible=True), } except Exception as e: return {login_status: gr.update(value=f"Login failed: {e}
")} def chat_fn(message, chat_history, user_id, token, session_id): if not token: chat_history.append((message, "Error: You are not logged in.")) return "", chat_history bot_message = run_agent_logic(message, user_id, session_id, token) chat_history.append((message, bot_message)) return "", chat_history login_button.click(login_fn, inputs=[user_id_input], outputs=[login_status, user_id_state, token_state, chatbot, msg_input]) msg_input.submit(chat_fn, [msg_input, chatbot, user_id_state, token_state, session_id_state], [msg_input, chatbot]) # The standard way to launch a Gradio app on Hugging Face demo.launch()