# Source header (Hugging Face Space scrape): wuhp — "Create app2.py" — commit be69d68 (verified)
# Standard library
import datetime
import importlib
import importlib.util  # explicit: importlib.util is used by ExtensionManager.load_extensions
import io
import os
import re
import sys
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional

# Third-party
import gradio as gr
from google import genai
from google.genai import types
from google.genai.types import Tool, GoogleSearch, FunctionDeclaration
from PIL import Image

# Add current directory to path for imports (must precede the local import below)
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from base_extension import BaseExtension
# Markdown banner for the app header (not referenced elsewhere in this file).
DESCRIPTION = """
# GemiWine
**Powered by Gemini 2.5 Flash + Google Search Grounding + Agent Extensions**
"""

# Base system prompt for every Gemini call; extension-specific context is
# appended by ExtensionManager.build_system_prompt() when extensions are enabled.
BASE_SYSTEM_PROMPT = """
You are GemiWine, a helpful AI assistant with extensible capabilities.
Your core abilities include conversation, web search, and image understanding.
When users enable extensions, you gain additional tools and capabilities.
Always use the available tools when they would be helpful to the user.
Be proactive about suggesting when an extension might be useful.
"""
def log(msg: str):
    """Print *msg* to stdout with an HH:MM:SS timestamp, flushing immediately."""
    stamp = datetime.datetime.now().strftime("%H:%M:%S")
    print(f"[{stamp}] {msg}", flush=True)
def get_mime_type(file_path: str) -> str:
    """Return the MIME type for *file_path* based on its extension.

    Unknown extensions fall back to 'application/octet-stream'.
    NOTE: some video entries ('video/mov', 'video/avi', 'video/mpg',
    'video/wmv') are not IANA-registered names but appear to match the type
    strings the Gemini API documents for video input — presumably
    intentional; confirm against the Gemini docs before "correcting" them.
    """
    image_types = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.gif': 'image/gif',
        '.webp': 'image/webp',
        '.heic': 'image/heic',
        '.heif': 'image/heif',
    }
    document_types = {
        '.pdf': 'application/pdf',
        '.txt': 'text/plain',
        '.html': 'text/html',
        '.md': 'text/markdown',
    }
    video_types = {
        '.mp4': 'video/mp4',
        '.mpeg': 'video/mpeg',
        '.mov': 'video/mov',
        '.avi': 'video/avi',
        '.flv': 'video/x-flv',
        '.mpg': 'video/mpg',
        '.webm': 'video/webm',
        '.wmv': 'video/wmv',
        '.3gpp': 'video/3gpp',
    }
    lookup = {**image_types, **document_types, **video_types}
    return lookup.get(Path(file_path).suffix.lower(), 'application/octet-stream')
def process_uploaded_file(client: genai.Client, file_path: str) -> types.Part:
    """Convert an uploaded file into content the Gemini API can consume.

    Small files (<= 20MB and not video) are read from disk and returned as an
    inline ``types.Part``; larger files and all videos are routed through the
    Gemini File API.

    NOTE(review): the File API branch returns the handle from
    ``client.files.upload()``, not a ``types.Part``, so the annotated return
    type is only accurate for the inline branch — callers appear to pass
    either object through to the model unchanged; confirm before tightening.
    """
    mime_type = get_mime_type(file_path)
    file_size = Path(file_path).stat().st_size
    log(f"📎 Processing file: {Path(file_path).name} ({mime_type}, {file_size/1024:.1f}KB)")
    # For files > 20MB or videos, use File API
    if file_size > 20 * 1024 * 1024 or mime_type.startswith('video/'):
        log(f"📤 Uploading large file via File API...")
        uploaded_file = client.files.upload(file=file_path)
        log(f"✅ File uploaded: {uploaded_file.name}")
        return uploaded_file
    else:
        # For smaller files, pass inline
        with open(file_path, 'rb') as f:
            file_bytes = f.read()
        log(f"✅ File loaded inline")
        return types.Part.from_bytes(data=file_bytes, mime_type=mime_type)
class ExtensionManager:
    """Discovers extensions on disk and routes prompts/tools/calls to them.

    Extensions live in ``./extensions/*.py``; each file is expected to define
    one subclass of BaseExtension. Loaded instances are kept in a dict keyed
    by the extension's ``name`` attribute.
    """

    def __init__(self):
        # Maps extension name -> loaded BaseExtension instance.
        self.extensions: Dict[str, BaseExtension] = {}
        self.load_extensions()

    def load_extensions(self):
        """Dynamically load all extensions from the extensions/ folder.

        Files whose names start with "_" are skipped. A failure in one file
        is logged and does not abort the scan. If the folder does not exist
        it is created and the scan ends immediately.
        """
        extensions_dir = Path("extensions")
        if not extensions_dir.exists():
            log("⚠️ Extensions directory not found, creating it...")
            extensions_dir.mkdir()
            return
        log(f"🔍 Scanning for extensions in {extensions_dir.absolute()}")
        for file in extensions_dir.glob("*.py"):
            if file.name.startswith("_"):
                log(f"⏭️ Skipping {file.name} (starts with _)")
                continue
            try:
                log(f"📦 Attempting to load: {file.name}")
                module_name = file.stem
                spec = importlib.util.spec_from_file_location(module_name, file)
                # FIX: spec_from_file_location can return None (or a spec
                # without a loader) for unloadable files; guard instead of
                # crashing into the broad except with an AttributeError.
                if spec is None or spec.loader is None:
                    log(f"⚠️ Could not create an import spec for {file.name}")
                    continue
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                # Instantiate the first BaseExtension subclass found in the module.
                found_extension = False
                for attr_name in dir(module):
                    attr = getattr(module, attr_name)
                    if (isinstance(attr, type)
                            and issubclass(attr, BaseExtension)
                            and attr is not BaseExtension):
                        ext = attr()
                        self.extensions[ext.name] = ext
                        log(f"✅ Loaded extension: {ext.display_name} ({ext.name})")
                        found_extension = True
                        break
                if not found_extension:
                    log(f"⚠️ No extension class found in {file.name}")
            except Exception as e:
                log(f"❌ Failed to load {file.name}: {e}")
                traceback.print_exc()
        log(f"📊 Total extensions loaded: {len(self.extensions)}")

    def get_extension(self, name: str) -> Optional[BaseExtension]:
        """Return the loaded extension with this name, or None."""
        return self.extensions.get(name)

    def get_all_extensions(self) -> List[BaseExtension]:
        """Return every loaded extension, in registry order."""
        return list(self.extensions.values())

    def get_enabled_extensions(self, user_id: str, enabled_list: List[str]) -> List[BaseExtension]:
        """Return the loaded extension objects whose names are in enabled_list.

        ``user_id`` is currently unused; it is kept for interface compatibility.
        """
        return [ext for name, ext in self.extensions.items() if name in enabled_list]

    def build_system_prompt(self, enabled_list: List[str]) -> str:
        """Return the base system prompt plus each enabled extension's context."""
        prompt = BASE_SYSTEM_PROMPT
        enabled_exts = self.get_enabled_extensions("", enabled_list)
        if enabled_exts:
            prompt += "\n\n# ENABLED EXTENSIONS\nYou currently have these extensions enabled:\n\n"
            for ext in enabled_exts:
                prompt += f"## {ext.display_name}\n{ext.get_system_context()}\n\n"
        return prompt

    def get_all_tools(self, enabled_list: List[str]) -> List[types.Tool]:
        """Collect tool declarations from all enabled extensions.

        The Google Search grounding tool is deliberately excluded here (the
        API does not allow it alongside function tools in one call); see
        get_search_tool().
        """
        tools = []
        for ext_name in enabled_list:
            ext = self.get_extension(ext_name)
            if ext:
                tools.extend(ext.get_tools())
        return tools

    def get_search_tool(self) -> types.Tool:
        """Return the Google Search grounding tool, kept separate from extension tools."""
        return types.Tool(google_search=types.GoogleSearch())

    def _find_owning_extension(self, enabled_list: List[str], function_name: str) -> Optional[BaseExtension]:
        """Return the first enabled extension declaring function_name, or None."""
        for ext_name in enabled_list:
            ext = self.get_extension(ext_name)
            if not ext:
                continue
            for tool in ext.get_tools():
                # Tolerate tools without function_declarations (e.g. search tools).
                for func_decl in getattr(tool, 'function_declarations', None) or []:
                    if func_decl.name == function_name:
                        return ext
        return None

    def handle_function_calls(self, user_id: str, enabled_list: List[str], function_calls: List) -> List:
        """Dispatch each Gemini function call to the extension that owns it.

        Results are returned in call order. They are kept as the raw objects
        (usually dicts) returned by the extension so callers can inspect
        structured payloads; unknown function names produce an error dict.
        (Refactored from a four-deep loop with a cascade of ``break`` flags
        into a flat lookup via _find_owning_extension.)
        """
        results = []
        for fc in function_calls:
            function_name = fc.name
            args = fc.args if hasattr(fc, 'args') else {}
            owner = self._find_owning_extension(enabled_list, function_name)
            if owner is not None:
                results.append(owner.handle_tool_call(user_id, function_name, args))
            else:
                results.append({"error": f"Unknown function {function_name}"})
        return results
class AgentOrchestrator:
    """Coordinates the specialist agents used for one user turn.

    Three phases, each a separate Gemini call:
      * call_search_agent   — Google-Search-grounded lookup on an isolated
        chat session (grounding cannot be combined with function tools).
      * call_tool_agent     — function-calling against extension tools on the
        user's persistent multi-turn chat session.
      * synthesize_response — stateless merge of search + tool output.
    """
    def __init__(self, client, chat, extension_manager, enabled_extensions):
        self.client = client
        self.chat = chat  # Multi-turn chat session (shared with the UI turn loop)
        self.extension_manager = extension_manager
        self.enabled_extensions = enabled_extensions
        # Create a separate chat session for search (to isolate it from tool calls)
        self.search_chat = client.chats.create(model="gemini-2.5-flash")

    def call_search_agent(self, query: str, file_parts: List = None) -> tuple:
        """Run the grounded search agent; returns (result_text, citations_markdown_or_None)."""
        log("🔍 Calling Search Agent...")
        grounding_tool = types.Tool(google_search=types.GoogleSearch())
        config = types.GenerateContentConfig(
            system_instruction="You are a search specialist. Use Google Search to find relevant, accurate information. Provide concise, well-cited answers.",
            tools=[grounding_tool],
            temperature=0.7,
            max_output_tokens=2048
        )
        try:
            # Build message content with files if provided
            content_parts = []
            if file_parts:
                content_parts.extend(file_parts)
            content_parts.append(query)
            # Stream the response, accumulating text parts as they arrive.
            result_text = ""
            last_chunk = None
            stream = self.search_chat.send_message_stream(content_parts, config=config)
            for chunk in stream:
                last_chunk = chunk
                # Defensive attribute walking: chunks may lack candidates/content/parts.
                if hasattr(chunk, 'candidates') and chunk.candidates:
                    candidate = chunk.candidates[0]
                    if hasattr(candidate, 'content') and candidate.content:
                        if hasattr(candidate.content, 'parts') and candidate.content.parts:
                            for part in candidate.content.parts:
                                if hasattr(part, 'text') and part.text:
                                    result_text += part.text
                # Grounding metadata is read from the final chunk (below), so
                # only the last chunk is retained.
            citations = None
            if last_chunk and hasattr(last_chunk, 'candidates') and last_chunk.candidates:
                log(f"🔍 Extracting citations from search response...")
                citations = insert_citations_from_grounding(last_chunk.candidates)
                if citations:
                    log(f"✅ Citations extracted successfully")
                else:
                    log(f"⚠️ No citations found in grounding metadata")
            if result_text:
                log(f"✅ Search Agent returned {len(result_text)} chars")
            else:
                log(f"⚠️ Search Agent returned empty result")
            return result_text, citations
        except Exception as e:
            # Search failure degrades gracefully to "no results" for the caller.
            log(f"⚠️ Search Agent error: {e}")
            traceback.print_exc()
            return "", None

    def call_tool_agent(self, query: str, search_context: str = "", reasoning_budget: int = -1, file_parts: List = None) -> tuple:
        """Run the function-calling agent on the persistent chat session.

        Returns (function_calls, text_response, thoughts). The chat session
        maintains conversation history automatically, which is what gives the
        tools their cross-turn "persistent state" framing below.
        """
        log("🛠️ Calling Tool Agent...")
        # Build prompt with context if needed
        prompt = query
        if search_context:
            prompt = f"[Context from Search]\n{search_context}\n\n[User Request]\n{query}"
        # Get extension tools
        tools = self.extension_manager.get_all_tools(self.enabled_extensions)
        system_prompt = self.extension_manager.build_system_prompt(self.enabled_extensions)
        system_prompt += """
CRITICAL INSTRUCTIONS FOR TOOL USAGE:
- You have PERSISTENT STATE across all conversations in this chat session
- Timers, tasks, notes, and other data remain even after responses
- When users ask about "the timer", "the alarm", "my tasks", etc., they're referring to items created earlier
- ALWAYS use your tools (list_timers, list_tasks, check_timer, etc.) when asked about status
- Don't say you can't access information - use your available tools first
- Be proactive: if user mentions checking something, use the appropriate tool immediately
If search context is provided, incorporate it naturally.
When images, PDFs, videos, or other files are provided, analyze them thoroughly and reference them in your response."""
        config = types.GenerateContentConfig(
            system_instruction=system_prompt,
            tools=tools,
            temperature=0.7,
            max_output_tokens=4096,
            # reasoning_budget of -1 means dynamic thinking (see reasoning_budget()).
            thinking_config=types.ThinkingConfig(
                include_thoughts=True,
                thinking_budget=reasoning_budget,
            )
        )
        try:
            # Build message content with files if provided
            content_parts = []
            if file_parts:
                content_parts.extend(file_parts)
            content_parts.append(prompt)
            # Use the chat session's send_message (maintains conversation history automatically)
            response = self.chat.send_message(
                content_parts,
                config=config
            )
            # Separate the response parts into tool calls, visible text, and thoughts.
            function_calls = []
            text_response = ""
            thoughts = ""
            if response.candidates and response.candidates[0].content:
                for part in response.candidates[0].content.parts:
                    if hasattr(part, 'function_call') and part.function_call:
                        function_calls.append(part.function_call)
                        log(f"🔧 Tool call: {part.function_call.name}")
                    if getattr(part, "text", None):
                        if getattr(part, "thought", False):
                            thoughts += part.text
                        else:
                            text_response += part.text
            return function_calls, text_response, thoughts
        except Exception as e:
            log(f"⚠️ Tool Agent error: {e}")
            traceback.print_exc()
            return [], "", ""

    def synthesize_response(self, query: str, search_results: str, tool_results: list, search_citations: Optional[str] = None, file_parts: List = None) -> tuple:
        """Merge search and tool output into one answer.

        Returns (text, generated_images) where generated_images is a list of
        dicts with 'base64'/'title'/'filepath' extracted from tool results.
        NOTE(review): ``search_citations`` is accepted but never used here —
        the caller appends citations itself; confirm before removing.
        """
        log("✨ Synthesizing final response...")
        synthesis_prompt = f"[Original Query]\n{query}\n\n"
        if search_results:
            synthesis_prompt += f"[Web Search Results]\n{search_results}\n\n"
        # Collect any generated images from tool results
        generated_images = []
        if tool_results:
            synthesis_prompt += "[Tool Execution Results]\n"
            for tool_name, result in tool_results:
                if result is None:
                    result = "(no result)"
                # Check if result contains a generated chart/image
                if isinstance(result, dict) and 'image_base64' in result:
                    generated_images.append({
                        'base64': result['image_base64'],
                        'title': result.get('message', 'Generated visualization'),
                        'filepath': result.get('filepath', '')
                    })
                    # Don't include base64 in the synthesis prompt (too long)
                    result_clean = dict(result)
                    result_clean.pop('image_base64', None)
                    synthesis_prompt += f"- {tool_name}: {result_clean.get('message', '')} (Chart created and will be displayed)\n"
                else:
                    synthesis_prompt += f"- {tool_name}: {result}\n"
            synthesis_prompt += "\n"
        synthesis_prompt += "Provide a comprehensive answer that incorporates all available information above. Be natural and conversational."
        # If files were provided, reference them in the context
        if file_parts:
            synthesis_prompt += "\n\nNote: The user has provided files (images/documents/videos) with their query. Make sure to reference and discuss the content of these files in your response."
        config = types.GenerateContentConfig(
            system_instruction="You are a synthesis specialist. Combine information from multiple sources into coherent, helpful responses. When files are provided, analyze and reference them in your answer.",
            temperature=0.7,
            max_output_tokens=4096
        )
        try:
            # Build content parts with files if provided
            content_parts = []
            if file_parts:
                content_parts.extend(file_parts)
            content_parts.append(types.Part(text=synthesis_prompt))
            # Stateless one-shot call: synthesis does not touch the chat history.
            response = self.client.models.generate_content(
                model="gemini-2.5-flash",
                contents=[types.Content(role="user", parts=content_parts)],
                config=config
            )
            result_text = ""
            if response.candidates and response.candidates[0].content:
                for part in response.candidates[0].content.parts:
                    if getattr(part, "text", None):
                        result_text += part.text
            return result_text, generated_images
        except Exception as e:
            log(f"⚠️ Synthesis error: {e}")
            return "I encountered an error synthesizing the response.", []
def determine_needs_search(chat, query: str) -> bool:
    """Heuristically decide whether *query* warrants a web search.

    Checks run in priority order: explicit search requests, then
    recommendation/comparison phrasing, then known internal-tool topics;
    everything else defaults to no search. The ``chat`` parameter is unused
    but kept for interface compatibility with existing callers.
    """
    # Hoisted: the original recomputed query.lower() for every keyword group.
    q = query.lower()
    # Explicit requests to search the web
    search_keywords = ['search', 'find online', 'look up online', 'google', 'search online', 'check online']
    if any(keyword in q for keyword in search_keywords):
        log(f"🔍 Search triggered by explicit keyword")
        return True
    # Recommendation/comparison questions usually benefit from fresh results
    recommendation_keywords = ['best', 'recommend', 'top', 'which', 'what are good', 'compare']
    if any(keyword in q for keyword in recommendation_keywords):
        log(f"🔍 Search triggered by recommendation question")
        return True
    # Timer/task/note management is handled by local extension tools, not search
    internal_keywords = ['timer', 'alarm', 'task', 'note', 'how much time']
    if any(keyword in q for keyword in internal_keywords):
        log(f"❌ No search - internal tool query")
        return False
    log(f"❌ No search - general query")
    return False
# Global instances
# Single shared extension registry; scans extensions/ once at import time.
EXTENSION_MANAGER = ExtensionManager()
# Per-API-key session cache: api_key -> {"client": ..., "chat": ...}.
# NOTE(review): keyed by the raw API key and never evicted — entries live for
# the process lifetime; confirm this is acceptable for the deployment.
CHAT_SESSIONS: Dict[str, Dict[str, Any]] = {}
def get_or_create_session(api_key: str):
    """Return (client, chat) for this API key, creating and caching on first use.

    Returns (None, None) when the key is empty or client creation fails.
    """
    if not api_key:
        return None, None
    cached = CHAT_SESSIONS.get(api_key)
    if cached is not None:
        return cached["client"], cached["chat"]
    try:
        client = genai.Client(api_key=api_key)
        # One persistent multi-turn chat session per API key.
        chat = client.chats.create(model="gemini-2.5-flash")
        CHAT_SESSIONS[api_key] = {"client": client, "chat": chat}
        log("✅ Created new Gemini session with multi-turn chat.")
        return client, chat
    except Exception as e:
        log(f"❌ Error creating Gemini client: {e}")
        return None, None
def insert_citations_from_grounding(candidates):
    """Build a markdown '📚 Sources:' footer from Gemini grounding metadata.

    Returns the footer string, or None when no usable citations exist.
    Chunk titles (typically source domains) serve as the link text and are
    de-duplicated; a chunk whose title is missing or already used falls back
    to a generic 'Source N' link.
    """
    try:
        if not candidates:
            log("⚠️ No candidates for citation extraction")
            return None
        meta = getattr(candidates[0], "grounding_metadata", None)
        if not meta:
            log("⚠️ No grounding_metadata found")
            return None
        chunks = getattr(meta, "grounding_chunks", None) or []
        if not chunks:
            log("⚠️ No grounding_chunks found")
            return None
        links = []
        used_titles = set()
        for position, chunk in enumerate(chunks):
            web = getattr(chunk, 'web', None)
            if not web:
                continue
            uri = getattr(web, "uri", None)
            title = getattr(web, "title", None)
            if uri and title and title not in used_titles:
                used_titles.add(title)
                links.append(f"[{title}]({uri})")
            elif uri:
                links.append(f"[Source {position+1}]({uri})")
        if links:
            citation_text = "\n\n📚 **Sources:** " + " • ".join(links)
            log(f"✅ Created {len(links)} citations with source domains")
            return citation_text
        log("⚠️ No valid citations could be created")
        return None
    except Exception as e:
        log(f"⚠️ Citation extraction failed: {e}")
        traceback.print_exc()
        return None
def reasoning_budget(level: str) -> int:
    """Map a UI reasoning level to a Gemini thinking-token budget.

    -1 means dynamic (model-chosen) thinking; unknown or empty levels
    fall back to dynamic.
    """
    budgets = {"none": 0, "concise": 256, "strong": 2048, "dynamic": -1}
    return budgets.get((level or "Dynamic").lower(), -1)
def chat_with_gemini(api_key, chat_history_msgs, multimodal_input, show_thoughts, reasoning_level, enabled_extensions):
    """Main chat handler. A generator that yields chat-history snapshots.

    Routes to one of two modes:
      * multi-agent orchestration (search -> tools -> synthesis) when any
        extensions are enabled, or
      * plain streaming with Google Search grounding otherwise.

    Args mirror the Gradio inputs; yields ``chat_history_msgs`` after every
    visible update so the UI streams progress.
    """
    log("=== chat_with_gemini CALLED ===")
    # FIX: normalize the history up front. The original normalized it only
    # after the client check, so a failed client creation appended to None
    # and raised AttributeError.
    if chat_history_msgs is None:
        chat_history_msgs = []
    if not api_key:
        chat_history_msgs.append({
            "role": "assistant",
            "content": "🔑 Please enter your Gemini API key first."
        })
        yield chat_history_msgs
        return
    client, chat = get_or_create_session(api_key)
    if not client:
        chat_history_msgs.append({
            "role": "assistant",
            "content": "⚠️ Could not create Gemini session."
        })
        yield chat_history_msgs
        return
    user_text = (multimodal_input or {}).get("text", "") or ""
    uploaded_files = (multimodal_input or {}).get("files", []) or []
    # Process uploaded files into Gemini-consumable parts.
    file_parts = []
    if uploaded_files:
        log(f"📎 Processing {len(uploaded_files)} uploaded file(s)...")
        for file_path in uploaded_files:
            try:
                file_parts.append(process_uploaded_file(client, file_path))
            except Exception as e:
                # Best-effort: a bad file is skipped rather than failing the turn.
                log(f"❌ Error processing file {file_path}: {e}")
                traceback.print_exc()
    chat_history_msgs.append({"role": "user", "content": user_text})
    yield chat_history_msgs
    assistant_base_index = len(chat_history_msgs)
    # Reserve placeholder assistant message(s): optionally one for thoughts,
    # always one for the answer.
    if show_thoughts:
        thought_index = assistant_base_index
        chat_history_msgs.append({"role": "assistant", "content": "<em>💭 Thinking...</em>"})
        answer_index = thought_index + 1
        chat_history_msgs.append({"role": "assistant", "content": "🤔 Processing..."})
    else:
        thought_index = None
        answer_index = assistant_base_index
        chat_history_msgs.append({"role": "assistant", "content": "🤔 Processing..."})
    yield chat_history_msgs
    try:
        # Initialized here so the extension path can set it and both the
        # synthesis and footer steps can read it.
        search_citations = None
        if enabled_extensions:
            # ---- Multi-agent orchestration path ----
            log("🎭 Using multi-agent orchestration with multi-turn chat")
            orchestrator = AgentOrchestrator(client, chat, EXTENSION_MANAGER, enabled_extensions)
            budget = reasoning_budget(reasoning_level)
            thoughts_accumulated = ""
            # Step 1: decide whether the query needs a web search
            needs_search = determine_needs_search(chat, user_text)
            log(f"📊 Search needed: {needs_search}")
            # Step 2: call search agent if needed
            search_results = ""
            if needs_search:
                chat_history_msgs[answer_index]["content"] = "🔍 Searching the web..."
                yield chat_history_msgs
                search_results, search_citations = orchestrator.call_search_agent(user_text, file_parts)
                log(f"📋 After search: search_citations = {search_citations[:100] if search_citations else 'None'}")
                if search_results:
                    chat_history_msgs[answer_index]["content"] = "✅ Found information online\n\n🛠️ Now processing with tools..."
                    yield chat_history_msgs
            # Step 3: call tool agent (function calling; sees uploaded files)
            function_calls, tool_response, tool_thoughts = orchestrator.call_tool_agent(
                user_text, search_results, budget, file_parts
            )
            # Show model thoughts if requested
            if tool_thoughts and show_thoughts:
                thoughts_accumulated += tool_thoughts
                chat_history_msgs[thought_index]["content"] = (
                    f"<details open>"
                    f"<summary><strong>💭 GemiWine's Thinking</strong></summary>"
                    f"<div style='white-space:pre-wrap;background:inherit;color:inherit;"
                    f"padding:8px;border-radius:8px;border:1px solid var(--border-color);'>"
                    f"{thoughts_accumulated.strip()}</div>"
                    f"</details>"
                )
                yield chat_history_msgs
            # Step 4: execute any requested tool calls via the extension manager
            tool_results = []
            if function_calls:
                chat_history_msgs[answer_index]["content"] = "⚙️ Executing tools..."
                yield chat_history_msgs
                user_id = api_key  # NOTE: the raw API key doubles as the per-user state id
                results = EXTENSION_MANAGER.handle_function_calls(
                    user_id, enabled_extensions, function_calls
                )
                for fc, result in zip(function_calls, results):
                    tool_results.append((fc.name, result))
                    log(f"✅ {fc.name}: {result}")
            # Step 5: synthesize the final answer from search + tool output
            if search_results or tool_results or tool_response:
                chat_history_msgs[answer_index]["content"] = "✨ Synthesizing answer..."
                yield chat_history_msgs
                final_answer, generated_images = orchestrator.synthesize_response(user_text, search_results, tool_results, search_citations, file_parts)
            else:
                final_answer = tool_response or "I couldn't process that request."
                generated_images = []
            # Build the final content with citations if available
            final_content = (
                f"<div><strong>🍇 Final Answer</strong>"
                f"<div style='white-space:pre-wrap;background:inherit;color:inherit;"
                f"padding:8px;border-radius:8px;border:1px solid var(--border-color);'>"
                f"{final_answer.strip()}</div></div>"
            )
            # Inline any generated charts as data-URI images
            if generated_images:
                log(f"📊 Adding {len(generated_images)} generated visualizations to response")
                for img_data in generated_images:
                    final_content += f"\n\n<div style='margin-top:16px;'>"
                    final_content += f"<strong>📊 {img_data['title']}</strong><br/>"
                    final_content += f"<img src='data:image/png;base64,{img_data['base64']}' style='max-width:100%;border-radius:8px;box-shadow:0 2px 8px rgba(0,0,0,0.1);'/>"
                    if img_data['filepath']:
                        final_content += f"<br/><small style='color:#666;'>Saved to: {img_data['filepath']}</small>"
                    final_content += "</div>"
            # Append citations if they exist
            if search_citations:
                final_content += "\n\n" + search_citations
                log(f"✅ Appended citations to final answer")
            chat_history_msgs[answer_index]["content"] = final_content
            yield chat_history_msgs
        else:
            # ---- Simple streaming path (no extensions) ----
            log("📺 Using simple streaming mode")
            parts = []
            if file_parts:
                parts.extend(file_parts)
            parts.append(user_text)
            budget = reasoning_budget(reasoning_level)
            grounding_tool = types.Tool(google_search=types.GoogleSearch())
            config = types.GenerateContentConfig(
                system_instruction=BASE_SYSTEM_PROMPT,
                tools=[grounding_tool],
                temperature=0.7,
                top_p=0.9,
                max_output_tokens=8192,
                thinking_config=types.ThinkingConfig(
                    include_thoughts=True,
                    thinking_budget=budget,
                )
            )
            stream = chat.send_message_stream(parts, config=config)
            answer = ""
            thoughts = ""
            last_chunk = None
            if show_thoughts:
                # Repurpose the "Processing..." slot for thoughts and append a
                # fresh slot for the streamed answer.
                # NOTE(review): the placeholder created at assistant_base_index
                # is left showing "💭 Thinking..." permanently in this mode —
                # looks like a leftover; confirm intended UI before changing.
                thought_index = answer_index
                chat_history_msgs[answer_index]["content"] = "<em>💭 Thinking...</em>"
                answer_index = len(chat_history_msgs)
                chat_history_msgs.append({"role": "assistant", "content": ""})
                yield chat_history_msgs
            for chunk in stream:
                last_chunk = chunk
                if not getattr(chunk, "candidates", None):
                    continue
                candidate = chunk.candidates[0]
                if getattr(candidate, "content", None):
                    for part in candidate.content.parts:
                        if not getattr(part, "text", None):
                            continue
                        if getattr(part, "thought", False):
                            thoughts += part.text
                            if show_thoughts:
                                chat_history_msgs[thought_index]["content"] = (
                                    f"<details open>"
                                    f"<summary><strong>💭 GemiWine's Thinking</strong></summary>"
                                    f"<div style='white-space:pre-wrap;background:inherit;color:inherit;"
                                    f"padding:8px;border-radius:8px;border:1px solid var(--border-color);'>"
                                    f"{thoughts.strip()}</div>"
                                    f"</details>"
                                )
                                yield chat_history_msgs
                        else:
                            answer += part.text
                            chat_history_msgs[answer_index]["content"] = (
                                f"<div><strong>🍇 Final Answer</strong>"
                                f"<div style='white-space:pre-wrap;background:inherit;color:inherit;"
                                f"padding:8px;border-radius:8px;border:1px solid var(--border-color);'>"
                                f"{answer.strip()}</div></div>"
                            )
                            yield chat_history_msgs
            # Append grounding citations once the stream has finished
            if last_chunk:
                citations = insert_citations_from_grounding(last_chunk.candidates)
                if citations:
                    chat_history_msgs[answer_index]["content"] += "\n\n" + citations
                    yield chat_history_msgs
        log("✅ Response complete.")
        return
    except Exception as e:
        log(f"❌ Error: {e}")
        traceback.print_exc()
        chat_history_msgs[answer_index]["content"] = f"⚠️ Error: {e}"
        yield chat_history_msgs
        return
def build_extension_ui():
    """Render the extension toggle checkboxes inside an accordion.

    Returns a list of (extension_name, gr.Checkbox) pairs — always a list,
    so callers can safely iterate/unpack it.

    FIX: the original returned a ``(gr.Markdown, [])`` tuple when no
    extensions were loaded, which crashed the ``for name, cb in ...``
    unpacking at wiring time. Creating the component inside the Blocks
    context is what displays it; the return value only needs to be the
    (possibly empty) checkbox list.
    """
    extensions = EXTENSION_MANAGER.get_all_extensions()
    if not extensions:
        gr.Markdown("No extensions available")
        return []
    checkboxes = []
    with gr.Accordion("🔌 Agent Extensions", open=True):
        gr.Markdown("Enable extensions to give the agent additional capabilities:")
        gr.Markdown("✨ **Agentic Mode:** When extensions are enabled, the agent uses multi-step reasoning with search + tools")
        for ext in extensions:
            cb = gr.Checkbox(
                label=f"{ext.icon} {ext.display_name}",
                info=ext.description,
                value=False
            )
            checkboxes.append((ext.name, cb))
    return checkboxes
# Top-level UI definition. Layout: settings column (API key, reasoning level,
# thinking toggle, extension checkboxes) beside the chat column.
with gr.Blocks(
    theme=gr.themes.Soft(primary_hue="purple", secondary_hue="blue"),
    title="GemiWine",
    fill_width=True
) as demo:
    # Inline CSS for the chat panel and message-input styling.
    gr.HTML("""
    <style>
    .gradio-container { padding-top: 1.5rem; padding-bottom: 1.5rem; }
    .chat-panel {
        background: rgba(255, 255, 255, 0.05);
        border-radius: 16px !important;
        padding: 1.5rem;
        box-shadow: 0 4px 12px rgba(0, 0, 0, 0.05);
        border: 1px solid rgba(255, 255, 255, 0.1);
    }
    .message-input {
        border-radius: 12px !important;
        border: 1px solid rgba(0,0,0,0.1);
    }
    </style>
    """)
    with gr.Row():
        # Left column: settings & controls.
        with gr.Column(scale=1, min_width=320):
            gr.Markdown("## ⚙️ Settings & Controls")
            api_key = gr.Textbox(
                label="🔑 Gemini API Key",
                placeholder="Paste your Gemini API key here...",
                type="password",
            )
            reasoning_level = gr.Radio(
                ["None", "Concise", "Strong", "Dynamic"],
                label="🧠 Reasoning Level",
                value="Dynamic",
                info="Controls the model's thinking depth.",
            )
            show_thoughts = gr.Checkbox(
                label="💭 Show Thinking",
                value=True,
                info="Display reasoning process before answers.",
            )
            # Build extension checkboxes: list of (name, gr.Checkbox) pairs.
            extension_checkboxes = build_extension_ui()
        # Right column: the chat panel.
        with gr.Column(scale=4):
            with gr.Group(elem_classes="chat-panel"):
                chatbot = gr.Chatbot(
                    label="🍇 Chat with GemiWine",
                    height=650,
                    show_copy_button=True,
                    type="messages",
                    avatar_images=(None, "https://i.imgur.com/Q2EMk2N.png"),
                )
                multimodal_msg = gr.MultimodalTextbox(
                    file_types=[
                        "image", "video", "audio",  # Gradio presets
                        ".pdf", ".txt", ".md", ".html", ".xml",  # Documents
                        ".doc", ".docx", ".csv", ".json"  # Additional formats
                    ],
                    placeholder="Ask anything, upload images/PDFs/videos, or let extensions help you...",
                    label="Your Message",
                    elem_classes="message-input",
                    autofocus=True
                )
    # Hidden state to track enabled extensions (mirrors the checkbox values).
    enabled_extensions_state = gr.State([])
def clear_box():
    """Reset the multimodal input box: empty text, no attached files."""
    return dict(text="", files=[])
def handle_chat(api_key_input, chat_history_msgs, multimodal_dict, thinking_flag, reasoning_lvl, *extension_states):
    """Translate checkbox states into enabled extension names, then stream the chat turn."""
    enabled = [
        name
        for (name, _), checked in zip(extension_checkboxes, extension_states)
        if checked
    ]
    log(f"Enabled extensions: {enabled}")
    yield from chat_with_gemini(
        api_key_input, chat_history_msgs, multimodal_dict,
        thinking_flag, reasoning_lvl, enabled
    )
def check_timers(api_key_input, chat_history, enabled_exts):
    """Poll the timer extension; append a chat notice for each newly finished timer."""
    if not api_key_input or 'timer' not in enabled_exts:
        return chat_history
    timer_ext = EXTENSION_MANAGER.get_extension('timer')
    if not timer_ext:
        return chat_history
    user_id = api_key_input
    timer_ext.initialize_state(user_id)
    state = timer_ext.get_state(user_id)
    # Naive local time — assumes end_time was stored the same way; TODO confirm.
    now = datetime.datetime.now()
    finished = []
    for entry in state.get("timers", []):
        if entry.get("active") and not entry.get("notified", False):
            if now >= datetime.datetime.fromisoformat(entry["end_time"]):
                finished.append(entry)
                entry["notified"] = True  # mark so each timer notifies only once
    if finished:
        timer_ext.update_state(user_id, state)
        if chat_history is None:
            chat_history = []
        for entry in finished:
            notification = f"⏰ **Timer Complete!** Your timer '{entry['name']}' has finished!"
            chat_history.append({"role": "assistant", "content": notification})
            log(f"⏰ Timer notification sent: {entry['name']}")
    return chat_history
# Get just the checkbox components for inputs
# (extension_checkboxes holds (name, gr.Checkbox) pairs).
checkbox_components = [cb for _, cb in extension_checkboxes]
# Main chat submission: stream history updates into the chatbot, then clear
# the input box once the generator finishes.
multimodal_msg.submit(
    fn=handle_chat,
    inputs=[api_key, chatbot, multimodal_msg, show_thoughts, reasoning_level] + checkbox_components,
    outputs=[chatbot],
    queue=True,
).then(fn=clear_box, outputs=[multimodal_msg])
# Background timer check - runs every 10 seconds
timer_check = gr.Timer(value=10, active=True)
def update_enabled_state(*extension_states):
    """Mirror the checkbox values into a plain list of enabled extension names."""
    return [
        name
        for (name, _), checked in zip(extension_checkboxes, extension_states)
        if checked
    ]
# Update enabled extensions state whenever checkboxes change.
# Each checkbox change re-derives the full enabled list from all checkboxes.
for _, cb in extension_checkboxes:
    cb.change(
        fn=update_enabled_state,
        inputs=checkbox_components,
        outputs=[enabled_extensions_state]
    )
# Timer polling: every tick, check_timers may append timer-complete notices
# to the chat history.
timer_check.tick(
    fn=check_timers,
    inputs=[api_key, chatbot, enabled_extensions_state],
    outputs=[chatbot]
)
# Script entry point: log startup and launch the Gradio app.
if __name__ == "__main__":
    log(f"===== GemiWine with Extensions started at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====")
    demo.launch()