aippk / app.py
admin08077's picture
Update app.py
ae36ed6 verified
import os
import requests
import gradio as gr
from google import genai
from typing import List, Dict, Any, Optional
from supabase import create_client, Client
import jwt
from datetime import datetime, timedelta
# --- Configuration ---
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")
SUPABASE_SERVICE_KEY = os.getenv("SUPABASE_SERVICE_KEY")
JWT_SECRET = os.getenv("JWT_SECRET", "a-strong-secret-key-for-local-testing")
# --- External API Config ---
QUANTUM_API_BASE_URL = os.getenv("QUANTUM_API_BASE_URL", "https://virtserver.swaggerhub.com/JOCALL3_1/jamesburvel/1.0")
# CHANGED: The API key is now optional. It will be None if the secret is not set.
QUANTUM_API_KEY = os.getenv("QUANTUM_API_KEY")
# --- Initialize Supabase Admin Client ---
try:
supabase_admin: Client = create_client(SUPABASE_URL, SUPABASE_SERVICE_KEY)
print("Successfully created Supabase admin client.")
except Exception as e:
supabase_admin = None
print(f"Warning: Supabase service key invalid. User authentication will fail. Error: {e}")
# --- Real API Client for the "Quantum Core 3.0" Service ---
class QuantumAPIClient:
# CHANGED: The __init__ method is updated to handle an optional API key.
def __init__(self, base_url: str, api_key: Optional[str] = None):
self.base_url = base_url
self.headers = {
"Content-Type": "application/json",
"Accept": "application/json"
}
if api_key:
print("Quantum API Key found, adding Authorization header.")
self.headers["Authorization"] = f"Bearer {api_key}"
else:
print("No Quantum API Key found, making public API calls.")
def _get(self, endpoint: str, params: Dict[str, Any] = None):
try:
response = requests.get(f"{self.base_url}{endpoint}", headers=self.headers, params=params, timeout=20)
response.raise_for_status()
return response.json()
except Exception as e:
return {"error": f"API call failed: {e}"}
def _post(self, endpoint: str, data: Dict[str, Any]):
try:
response = requests.post(f"{self.base_url}{endpoint}", headers=self.headers, json=data, timeout=20)
response.raise_for_status()
return response.json()
except Exception as e:
return {"error": f"API call failed: {e}"}
quantum_client = QuantumAPIClient(base_url=QUANTUM_API_BASE_URL, api_key=QUANTUM_API_KEY)
# --- Tool Definitions (Internal Memory + External API) ---
def get_ai_advisor_chat_history(supabase_user_client: Client, user_id: str):
print(f"TOOL CALL: get_ai_advisor_chat_history for user '{user_id}' from Supabase")
try:
response = supabase_user_client.table('chat_history').select("*").eq('user_id', user_id).order('timestamp').limit(50).execute()
return response.data
except Exception as e:
return {"error": f"Database query failed: {str(e)}"}
def save_chat_turn(supabase_user_client: Client, user_id: str, session_id: str, user_message: str, assistant_message: str):
try:
supabase_user_client.table('chat_history').insert([
{"user_id": user_id, "session_id": session_id, "role": "user", "content": user_message},
{"user_id": user_id, "session_id": session_id, "role": "assistant", "content": assistant_message}
]).execute()
except Exception as e:
print(f"Error saving chat turn: {e}")
def run_standard_financial_simulation(prompt: str, duration_years: int = 5):
"""Submits a 'What-If' scenario to the Quantum Oracle AI for standard financial impact analysis."""
print(f"TOOL CALL: run_standard_financial_simulation with prompt: '{prompt}'")
payload = {"prompt": prompt, "parameters": {"durationYears": duration_years}}
return quantum_client._post("/ai/oracle/simulate", data=payload)
def generate_video_ad(prompt: str, style: str = "Cinematic", length_seconds: int = 15):
"""Submits a request to generate a high-quality video ad using the Veo 2.0 generative AI model."""
print(f"TOOL CALL: generate_video_ad with prompt: '{prompt}'")
payload = {"prompt": prompt, "style": style, "lengthSeconds": length_seconds}
return quantum_client._post("/ai/ads/generate", data=payload)
def list_available_ai_tools():
"""Retrieves a list of all integrated AI tools that Quantum can invoke."""
print("TOOL CALL: list_available_ai_tools -> GET /ai/advisor/tools")
return quantum_client._get("/ai/advisor/tools")
# --- Core Agent & Auth Logic ---
def perform_login(user_id: str):
if not supabase_admin: raise Exception("Auth service not configured.")
payload = { "sub": user_id, "aud": "authenticated", "exp": datetime.utcnow() + timedelta(hours=1), "role": "authenticated" }
return jwt.encode(payload, JWT_SECRET, algorithm="HS256")
def run_agent_logic(prompt: str, user_id: str, session_id: str, token: str):
try:
decoded_token = jwt.decode(token, JWT_SECRET, algorithms=["HS256"], audience="authenticated")
if not decoded_token.get("sub") or decoded_token.get("sub") != user_id:
raise Exception("Invalid token or user mismatch.")
supabase_user_client = create_client(SUPABASE_URL, SUPABASE_KEY)
supabase_user_client.auth.set_session(access_token=token, refresh_token=token)
except jwt.PyJWTError as e:
raise Exception(f"Invalid token: {e}")
def get_my_chat_history():
"""Fetches my most recent conversation history to provide context."""
return get_ai_advisor_chat_history(supabase_user_client, user_id)
all_tools = [
get_my_chat_history,
run_standard_financial_simulation,
generate_video_ad,
list_available_ai_tools,
]
try:
genai.configure(api_key=GOOGLE_API_KEY)
history_data = get_my_chat_history()
conversation_history = []
if isinstance(history_data, list):
for turn in history_data:
role = "user" if turn['role'] == 'user' else "model"
conversation_history.append({'role': role, 'parts': [{'text': turn['content']}]})
conversation_history.append({'role': 'user', 'parts': [{'text': prompt}]})
model = genai.GenerativeModel('gemini-pro')
response = model.generate_content(
conversation_history, tools=all_tools,
safety_settings={'HARM_CATEGORY_DANGEROUS_CONTENT': 'BLOCK_NONE', 'HARM_CATEGORY_HARASSMENT': 'BLOCK_NONE', 'HARM_CATEGORY_HATE_SPEECH': 'BLOCK_NONE', 'HARM_CATEGORY_SEXUALLY_EXPLICIT': 'BLOCK_NONE'}
)
final_response_text = response.text
save_chat_turn(supabase_user_client, user_id, session_id, prompt, final_response_text)
return final_response_text
except Exception as e:
print(f"Error in agent invocation: {e}")
return f"An error occurred: {e}"
# --- Gradio UI (Pure Gradio, No FastAPI) ---
with gr.Blocks() as demo:
gr.Markdown("# 💎 Secure, Multi-User AI Agent (Full API)")
gr.Markdown("Log in with a User ID. The agent can fetch chat history (from Supabase) and call external APIs for simulations or ads.")
user_id_state = gr.State("")
token_state = gr.State("")
session_id_state = gr.State("session-" + os.urandom(8).hex())
with gr.Row():
user_id_input = gr.Textbox(label="Enter User ID to Log In", placeholder="e.g., user-a")
login_button = gr.Button("Login")
login_status = gr.Markdown("")
chatbot = gr.Chatbot(label="Agent Chat", visible=False, height=500)
msg_input = gr.Textbox(label="Your Message", visible=False, show_label=False, placeholder="Type your message here...")
def login_fn(user_id):
if not user_id:
return {login_status: gr.update(value="<p style='color:red;'>Please enter a User ID.</p>")}
try:
token = perform_login(user_id)
status_html = f"<p style='color:green;'>Logged in as: {user_id}. You can start chatting.</p>"
return {
login_status: gr.update(value=status_html), user_id_state: user_id, token_state: token,
chatbot: gr.update(visible=True), msg_input: gr.update(visible=True),
}
except Exception as e:
return {login_status: gr.update(value=f"<p style='color:red;'>Login failed: {e}</p>")}
def chat_fn(message, chat_history, user_id, token, session_id):
if not token:
chat_history.append((message, "Error: You are not logged in."))
return "", chat_history
bot_message = run_agent_logic(message, user_id, session_id, token)
chat_history.append((message, bot_message))
return "", chat_history
login_button.click(login_fn, inputs=[user_id_input], outputs=[login_status, user_id_state, token_state, chatbot, msg_input])
msg_input.submit(chat_fn, [msg_input, chatbot, user_id_state, token_state, session_id_state], [msg_input, chatbot])
# The standard way to launch a Gradio app on Hugging Face
demo.launch()