Spaces:
Sleeping
Sleeping
# Standard library
import datetime
import importlib
import importlib.util  # explicit: `import importlib` alone does not guarantee importlib.util is available
import io
import os
import re
import sys
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional

# Third-party
import gradio as gr
from google import genai
from google.genai import types
from google.genai.types import FunctionDeclaration, GoogleSearch, Tool
from PIL import Image

# Add current directory to path for imports (must precede the local import below)
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from base_extension import BaseExtension
| DESCRIPTION = """ | |
| # GemiWine | |
| **Powered by Gemini 2.5 Flash + Google Search Grounding + Agent Extensions** | |
| """ | |
| BASE_SYSTEM_PROMPT = """ | |
| You are GemiWine, a helpful AI assistant with extensible capabilities. | |
| Your core abilities include conversation, web search, and image understanding. | |
| When users enable extensions, you gain additional tools and capabilities. | |
| Always use the available tools when they would be helpful to the user. | |
| Be proactive about suggesting when an extension might be useful. | |
| """ | |
| def log(msg: str): | |
| now = datetime.datetime.now().strftime("%H:%M:%S") | |
| print(f"[{now}] {msg}", flush=True) | |
| def get_mime_type(file_path: str) -> str: | |
| """Determine MIME type from file extension""" | |
| ext = Path(file_path).suffix.lower() | |
| mime_types = { | |
| # Images | |
| '.jpg': 'image/jpeg', | |
| '.jpeg': 'image/jpeg', | |
| '.png': 'image/png', | |
| '.gif': 'image/gif', | |
| '.webp': 'image/webp', | |
| '.heic': 'image/heic', | |
| '.heif': 'image/heif', | |
| # Documents | |
| '.pdf': 'application/pdf', | |
| '.txt': 'text/plain', | |
| '.html': 'text/html', | |
| '.md': 'text/markdown', | |
| # Videos | |
| '.mp4': 'video/mp4', | |
| '.mpeg': 'video/mpeg', | |
| '.mov': 'video/mov', | |
| '.avi': 'video/avi', | |
| '.flv': 'video/x-flv', | |
| '.mpg': 'video/mpg', | |
| '.webm': 'video/webm', | |
| '.wmv': 'video/wmv', | |
| '.3gpp': 'video/3gpp', | |
| } | |
| return mime_types.get(ext, 'application/octet-stream') | |
| def process_uploaded_file(client: genai.Client, file_path: str) -> types.Part: | |
| """Process an uploaded file and return a Part object""" | |
| mime_type = get_mime_type(file_path) | |
| file_size = Path(file_path).stat().st_size | |
| log(f"📎 Processing file: {Path(file_path).name} ({mime_type}, {file_size/1024:.1f}KB)") | |
| # For files > 20MB or videos, use File API | |
| if file_size > 20 * 1024 * 1024 or mime_type.startswith('video/'): | |
| log(f"📤 Uploading large file via File API...") | |
| uploaded_file = client.files.upload(file=file_path) | |
| log(f"✅ File uploaded: {uploaded_file.name}") | |
| return uploaded_file | |
| else: | |
| # For smaller files, pass inline | |
| with open(file_path, 'rb') as f: | |
| file_bytes = f.read() | |
| log(f"✅ File loaded inline") | |
| return types.Part.from_bytes(data=file_bytes, mime_type=mime_type) | |
| class ExtensionManager: | |
| """Manages loading and interfacing with extensions""" | |
| def __init__(self): | |
| self.extensions: Dict[str, BaseExtension] = {} | |
| self.load_extensions() | |
| def load_extensions(self): | |
| """Dynamically load all extensions from extensions/ folder""" | |
| extensions_dir = Path("extensions") | |
| if not extensions_dir.exists(): | |
| log("⚠️ Extensions directory not found, creating it...") | |
| extensions_dir.mkdir() | |
| return | |
| log(f"🔍 Scanning for extensions in {extensions_dir.absolute()}") | |
| for file in extensions_dir.glob("*.py"): | |
| if file.name.startswith("_"): | |
| log(f"⏭️ Skipping {file.name} (starts with _)") | |
| continue | |
| try: | |
| log(f"📦 Attempting to load: {file.name}") | |
| module_name = file.stem | |
| spec = importlib.util.spec_from_file_location(module_name, file) | |
| module = importlib.util.module_from_spec(spec) | |
| spec.loader.exec_module(module) | |
| # Find Extension class in module | |
| found_extension = False | |
| for attr_name in dir(module): | |
| attr = getattr(module, attr_name) | |
| if (isinstance(attr, type) and | |
| issubclass(attr, BaseExtension) and | |
| attr != BaseExtension): | |
| ext = attr() | |
| self.extensions[ext.name] = ext | |
| log(f"✅ Loaded extension: {ext.display_name} ({ext.name})") | |
| found_extension = True | |
| break | |
| if not found_extension: | |
| log(f"⚠️ No extension class found in {file.name}") | |
| except Exception as e: | |
| log(f"❌ Failed to load {file.name}: {e}") | |
| traceback.print_exc() | |
| log(f"📊 Total extensions loaded: {len(self.extensions)}") | |
| def get_extension(self, name: str) -> Optional[BaseExtension]: | |
| return self.extensions.get(name) | |
| def get_all_extensions(self) -> List[BaseExtension]: | |
| return list(self.extensions.values()) | |
| def get_enabled_extensions(self, user_id: str, enabled_list: List[str]) -> List[BaseExtension]: | |
| """Get list of enabled extension objects""" | |
| return [ext for name, ext in self.extensions.items() if name in enabled_list] | |
| def build_system_prompt(self, enabled_list: List[str]) -> str: | |
| """Build system prompt with enabled extension contexts""" | |
| prompt = BASE_SYSTEM_PROMPT | |
| enabled_exts = self.get_enabled_extensions("", enabled_list) | |
| if enabled_exts: | |
| prompt += "\n\n# ENABLED EXTENSIONS\nYou currently have these extensions enabled:\n\n" | |
| for ext in enabled_exts: | |
| prompt += f"## {ext.display_name}\n{ext.get_system_context()}\n\n" | |
| return prompt | |
| def get_all_tools(self, enabled_list: List[str]) -> List[types.Tool]: | |
| """Get all tools from enabled extensions (no search tool here)""" | |
| tools = [] | |
| # Add extension tools only | |
| for ext_name in enabled_list: | |
| ext = self.get_extension(ext_name) | |
| if ext: | |
| tools.extend(ext.get_tools()) | |
| return tools | |
| def get_search_tool(self) -> types.Tool: | |
| """Get Google Search tool separately""" | |
| return types.Tool(google_search=types.GoogleSearch()) | |
| def handle_function_calls(self, user_id: str, enabled_list: List[str], function_calls: List) -> List: | |
| """Process function calls from Gemini and return results""" | |
| results = [] | |
| for fc in function_calls: | |
| function_name = fc.name | |
| args = fc.args if hasattr(fc, 'args') else {} | |
| # Find which extension owns this function | |
| handled = False | |
| for ext_name in enabled_list: | |
| ext = self.get_extension(ext_name) | |
| if ext: | |
| # Check if this function is in the extension's tools | |
| for tool in ext.get_tools(): | |
| if hasattr(tool, 'function_declarations'): | |
| for func_decl in tool.function_declarations: | |
| if func_decl.name == function_name: | |
| result = ext.handle_tool_call(user_id, function_name, args) | |
| # Keep result as dict, don't convert to string yet | |
| results.append(result) | |
| handled = True | |
| break | |
| if handled: | |
| break | |
| if handled: | |
| break | |
| if not handled: | |
| results.append({"error": f"Unknown function {function_name}"}) | |
| return results | |
| class AgentOrchestrator: | |
| """Orchestrates multiple specialized agents""" | |
| def __init__(self, client, chat, extension_manager, enabled_extensions): | |
| self.client = client | |
| self.chat = chat # Multi-turn chat session | |
| self.extension_manager = extension_manager | |
| self.enabled_extensions = enabled_extensions | |
| # Create a separate chat session for search (to isolate it from tool calls) | |
| self.search_chat = client.chats.create(model="gemini-2.5-flash") | |
| def call_search_agent(self, query: str, file_parts: List = None) -> tuple: | |
| """Call specialized search agent using streaming - returns (text, citations)""" | |
| log("🔍 Calling Search Agent...") | |
| grounding_tool = types.Tool(google_search=types.GoogleSearch()) | |
| config = types.GenerateContentConfig( | |
| system_instruction="You are a search specialist. Use Google Search to find relevant, accurate information. Provide concise, well-cited answers.", | |
| tools=[grounding_tool], | |
| temperature=0.7, | |
| max_output_tokens=2048 | |
| ) | |
| try: | |
| # Build message content with files if provided | |
| content_parts = [] | |
| if file_parts: | |
| content_parts.extend(file_parts) | |
| content_parts.append(query) | |
| # Use streaming like the working example | |
| result_text = "" | |
| last_chunk = None | |
| stream = self.search_chat.send_message_stream(content_parts, config=config) | |
| for chunk in stream: | |
| last_chunk = chunk | |
| if hasattr(chunk, 'candidates') and chunk.candidates: | |
| candidate = chunk.candidates[0] | |
| if hasattr(candidate, 'content') and candidate.content: | |
| if hasattr(candidate.content, 'parts') and candidate.content.parts: | |
| for part in candidate.content.parts: | |
| if hasattr(part, 'text') and part.text: | |
| result_text += part.text | |
| # Extract citations from the last chunk using the working function | |
| citations = None | |
| if last_chunk and hasattr(last_chunk, 'candidates') and last_chunk.candidates: | |
| log(f"🔍 Extracting citations from search response...") | |
| citations = insert_citations_from_grounding(last_chunk.candidates) | |
| if citations: | |
| log(f"✅ Citations extracted successfully") | |
| else: | |
| log(f"⚠️ No citations found in grounding metadata") | |
| if result_text: | |
| log(f"✅ Search Agent returned {len(result_text)} chars") | |
| else: | |
| log(f"⚠️ Search Agent returned empty result") | |
| return result_text, citations | |
| except Exception as e: | |
| log(f"⚠️ Search Agent error: {e}") | |
| traceback.print_exc() | |
| return "", None | |
| def call_tool_agent(self, query: str, search_context: str = "", reasoning_budget: int = -1, file_parts: List = None) -> tuple: | |
| """Call tool execution agent with function calling - uses multi-turn chat""" | |
| log("🛠️ Calling Tool Agent...") | |
| # Build prompt with context if needed | |
| prompt = query | |
| if search_context: | |
| prompt = f"[Context from Search]\n{search_context}\n\n[User Request]\n{query}" | |
| # Get extension tools | |
| tools = self.extension_manager.get_all_tools(self.enabled_extensions) | |
| system_prompt = self.extension_manager.build_system_prompt(self.enabled_extensions) | |
| system_prompt += """ | |
| CRITICAL INSTRUCTIONS FOR TOOL USAGE: | |
| - You have PERSISTENT STATE across all conversations in this chat session | |
| - Timers, tasks, notes, and other data remain even after responses | |
| - When users ask about "the timer", "the alarm", "my tasks", etc., they're referring to items created earlier | |
| - ALWAYS use your tools (list_timers, list_tasks, check_timer, etc.) when asked about status | |
| - Don't say you can't access information - use your available tools first | |
| - Be proactive: if user mentions checking something, use the appropriate tool immediately | |
| If search context is provided, incorporate it naturally. | |
| When images, PDFs, videos, or other files are provided, analyze them thoroughly and reference them in your response.""" | |
| config = types.GenerateContentConfig( | |
| system_instruction=system_prompt, | |
| tools=tools, | |
| temperature=0.7, | |
| max_output_tokens=4096, | |
| thinking_config=types.ThinkingConfig( | |
| include_thoughts=True, | |
| thinking_budget=reasoning_budget, | |
| ) | |
| ) | |
| try: | |
| # Build message content with files if provided | |
| content_parts = [] | |
| if file_parts: | |
| content_parts.extend(file_parts) | |
| content_parts.append(prompt) | |
| # Use the chat session's send_message (maintains conversation history automatically) | |
| response = self.chat.send_message( | |
| content_parts, | |
| config=config | |
| ) | |
| function_calls = [] | |
| text_response = "" | |
| thoughts = "" | |
| if response.candidates and response.candidates[0].content: | |
| for part in response.candidates[0].content.parts: | |
| if hasattr(part, 'function_call') and part.function_call: | |
| function_calls.append(part.function_call) | |
| log(f"🔧 Tool call: {part.function_call.name}") | |
| if getattr(part, "text", None): | |
| if getattr(part, "thought", False): | |
| thoughts += part.text | |
| else: | |
| text_response += part.text | |
| return function_calls, text_response, thoughts | |
| except Exception as e: | |
| log(f"⚠️ Tool Agent error: {e}") | |
| traceback.print_exc() | |
| return [], "", "" | |
| def synthesize_response(self, query: str, search_results: str, tool_results: list, search_citations: Optional[str] = None, file_parts: List = None) -> tuple: | |
| """Synthesize final response from all sources - returns (text, images_html)""" | |
| log("✨ Synthesizing final response...") | |
| synthesis_prompt = f"[Original Query]\n{query}\n\n" | |
| if search_results: | |
| synthesis_prompt += f"[Web Search Results]\n{search_results}\n\n" | |
| # Collect any generated images from tool results | |
| generated_images = [] | |
| if tool_results: | |
| synthesis_prompt += "[Tool Execution Results]\n" | |
| for tool_name, result in tool_results: | |
| if result is None: | |
| result = "(no result)" | |
| # Check if result contains a generated chart/image | |
| if isinstance(result, dict) and 'image_base64' in result: | |
| generated_images.append({ | |
| 'base64': result['image_base64'], | |
| 'title': result.get('message', 'Generated visualization'), | |
| 'filepath': result.get('filepath', '') | |
| }) | |
| # Don't include base64 in the synthesis prompt (too long) | |
| result_clean = dict(result) | |
| result_clean.pop('image_base64', None) | |
| synthesis_prompt += f"- {tool_name}: {result_clean.get('message', '')} (Chart created and will be displayed)\n" | |
| else: | |
| synthesis_prompt += f"- {tool_name}: {result}\n" | |
| synthesis_prompt += "\n" | |
| synthesis_prompt += "Provide a comprehensive answer that incorporates all available information above. Be natural and conversational." | |
| # If files were provided, reference them in the context | |
| if file_parts: | |
| synthesis_prompt += "\n\nNote: The user has provided files (images/documents/videos) with their query. Make sure to reference and discuss the content of these files in your response." | |
| config = types.GenerateContentConfig( | |
| system_instruction="You are a synthesis specialist. Combine information from multiple sources into coherent, helpful responses. When files are provided, analyze and reference them in your answer.", | |
| temperature=0.7, | |
| max_output_tokens=4096 | |
| ) | |
| try: | |
| # Build content parts with files if provided | |
| content_parts = [] | |
| if file_parts: | |
| content_parts.extend(file_parts) | |
| content_parts.append(types.Part(text=synthesis_prompt)) | |
| response = self.client.models.generate_content( | |
| model="gemini-2.5-flash", | |
| contents=[types.Content(role="user", parts=content_parts)], | |
| config=config | |
| ) | |
| result_text = "" | |
| if response.candidates and response.candidates[0].content: | |
| for part in response.candidates[0].content.parts: | |
| if getattr(part, "text", None): | |
| result_text += part.text | |
| return result_text, generated_images | |
| except Exception as e: | |
| log(f"⚠️ Synthesis error: {e}") | |
| return "I encountered an error synthesizing the response.", [] | |
| def determine_needs_search(chat, query: str) -> bool: | |
| """Determine if query needs web search - uses chat session for reliability""" | |
| # Simple heuristic first - if query explicitly asks to search | |
| search_keywords = ['search', 'find online', 'look up online', 'google', 'search online', 'check online'] | |
| if any(keyword in query.lower() for keyword in search_keywords): | |
| log(f"🔍 Search triggered by explicit keyword") | |
| return True | |
| # For questions about recommendations, comparisons, "best" items - likely needs search | |
| recommendation_keywords = ['best', 'recommend', 'top', 'which', 'what are good', 'compare'] | |
| if any(keyword in query.lower() for keyword in recommendation_keywords): | |
| log(f"🔍 Search triggered by recommendation question") | |
| return True | |
| # Default to no search for timer/task management queries | |
| internal_keywords = ['timer', 'alarm', 'task', 'note', 'how much time'] | |
| if any(keyword in query.lower() for keyword in internal_keywords): | |
| log(f"❌ No search - internal tool query") | |
| return False | |
| log(f"❌ No search - general query") | |
| return False | |
| # Global instances | |
| EXTENSION_MANAGER = ExtensionManager() | |
| CHAT_SESSIONS: Dict[str, Dict[str, Any]] = {} | |
| def get_or_create_session(api_key: str): | |
| if not api_key: | |
| return None, None | |
| if api_key in CHAT_SESSIONS: | |
| return (CHAT_SESSIONS[api_key]["client"], | |
| CHAT_SESSIONS[api_key]["chat"]) | |
| try: | |
| client = genai.Client(api_key=api_key) | |
| # Create a chat session for multi-turn conversations | |
| chat = client.chats.create(model="gemini-2.5-flash") | |
| CHAT_SESSIONS[api_key] = { | |
| "client": client, | |
| "chat": chat | |
| } | |
| log("✅ Created new Gemini session with multi-turn chat.") | |
| return client, chat | |
| except Exception as e: | |
| log(f"❌ Error creating Gemini client: {e}") | |
| return None, None | |
| def insert_citations_from_grounding(candidates): | |
| """Extract citations from grounding metadata - using chunk titles as display names""" | |
| try: | |
| if not candidates: | |
| log("⚠️ No candidates for citation extraction") | |
| return None | |
| cand = candidates[0] | |
| # Check if grounding metadata exists | |
| grounding = getattr(cand, "grounding_metadata", None) | |
| if not grounding: | |
| log("⚠️ No grounding_metadata found") | |
| return None | |
| # Get chunks | |
| chunks = getattr(grounding, "grounding_chunks", None) or [] | |
| if not chunks: | |
| log("⚠️ No grounding_chunks found") | |
| return None | |
| # Build citation list from chunks | |
| citations = [] | |
| seen_titles = set() | |
| for idx, chunk in enumerate(chunks): | |
| if hasattr(chunk, 'web') and chunk.web: | |
| uri = getattr(chunk.web, "uri", None) | |
| title = getattr(chunk.web, "title", None) | |
| # Use title as the clickable text since it shows the actual domain | |
| if uri and title and title not in seen_titles: | |
| seen_titles.add(title) | |
| citations.append(f"[{title}]({uri})") | |
| elif uri: | |
| citations.append(f"[Source {idx+1}]({uri})") | |
| if citations: | |
| citation_text = "\n\n📚 **Sources:** " + " • ".join(citations) | |
| log(f"✅ Created {len(citations)} citations with source domains") | |
| return citation_text | |
| else: | |
| log("⚠️ No valid citations could be created") | |
| return None | |
| except Exception as e: | |
| log(f"⚠️ Citation extraction failed: {e}") | |
| traceback.print_exc() | |
| return None | |
| def reasoning_budget(level: str) -> int: | |
| level = (level or "Dynamic").lower() | |
| if level == "none": | |
| return 0 | |
| elif level == "concise": | |
| return 256 | |
| elif level == "strong": | |
| return 2048 | |
| elif level == "dynamic": | |
| return -1 | |
| return -1 | |
| def chat_with_gemini(api_key, chat_history_msgs, multimodal_input, show_thoughts, reasoning_level, enabled_extensions): | |
| log("=== chat_with_gemini CALLED ===") | |
| if not api_key: | |
| chat_history_msgs = chat_history_msgs or [] | |
| chat_history_msgs.append({ | |
| "role": "assistant", | |
| "content": "🔑 Please enter your Gemini API key first." | |
| }) | |
| yield chat_history_msgs | |
| return | |
| client, chat = get_or_create_session(api_key) | |
| if not client: | |
| chat_history_msgs.append({ | |
| "role": "assistant", | |
| "content": "⚠️ Could not create Gemini session." | |
| }) | |
| yield chat_history_msgs | |
| return | |
| user_text = (multimodal_input or {}).get("text", "") or "" | |
| uploaded_files = (multimodal_input or {}).get("files", []) or [] | |
| if chat_history_msgs is None: | |
| chat_history_msgs = [] | |
| # Process uploaded files | |
| file_parts = [] | |
| if uploaded_files: | |
| log(f"📎 Processing {len(uploaded_files)} uploaded file(s)...") | |
| for file_path in uploaded_files: | |
| try: | |
| file_part = process_uploaded_file(client, file_path) | |
| file_parts.append(file_part) | |
| except Exception as e: | |
| log(f"❌ Error processing file {file_path}: {e}") | |
| traceback.print_exc() | |
| chat_history_msgs.append({"role": "user", "content": user_text}) | |
| yield chat_history_msgs | |
| assistant_base_index = len(chat_history_msgs) | |
| # Setup thinking display if enabled | |
| if show_thoughts: | |
| thought_index = assistant_base_index | |
| chat_history_msgs.append({"role": "assistant", "content": "<em>💭 Thinking...</em>"}) | |
| answer_index = thought_index + 1 | |
| chat_history_msgs.append({"role": "assistant", "content": "🤔 Processing..."}) | |
| else: | |
| thought_index = None | |
| answer_index = assistant_base_index | |
| chat_history_msgs.append({"role": "assistant", "content": "🤔 Processing..."}) | |
| yield chat_history_msgs | |
| try: | |
| # Initialize variables at function scope | |
| search_citations = None | |
| # AGENT ORCHESTRATION APPROACH | |
| if enabled_extensions: | |
| log("🎭 Using multi-agent orchestration with multi-turn chat") | |
| orchestrator = AgentOrchestrator(client, chat, EXTENSION_MANAGER, enabled_extensions) | |
| budget = reasoning_budget(reasoning_level) | |
| thoughts_accumulated = "" | |
| # Step 1: Determine if search is needed | |
| needs_search = determine_needs_search(chat, user_text) | |
| log(f"📊 Search needed: {needs_search}") | |
| # Step 2: Call search agent if needed | |
| search_results = "" | |
| if needs_search: | |
| chat_history_msgs[answer_index]["content"] = "🔍 Searching the web..." | |
| yield chat_history_msgs | |
| search_results, search_citations = orchestrator.call_search_agent(user_text, file_parts) | |
| log(f"📋 After search: search_citations = {search_citations[:100] if search_citations else 'None'}") | |
| if search_results: | |
| chat_history_msgs[answer_index]["content"] = "✅ Found information online\n\n🛠️ Now processing with tools..." | |
| yield chat_history_msgs | |
| # Step 3: Call tool agent (with files) | |
| function_calls, tool_response, tool_thoughts = orchestrator.call_tool_agent( | |
| user_text, search_results, budget, file_parts | |
| ) | |
| # Show thoughts if available | |
| if tool_thoughts and show_thoughts: | |
| thoughts_accumulated += tool_thoughts | |
| chat_history_msgs[thought_index]["content"] = ( | |
| f"<details open>" | |
| f"<summary><strong>💭 GemiWine's Thinking</strong></summary>" | |
| f"<div style='white-space:pre-wrap;background:inherit;color:inherit;" | |
| f"padding:8px;border-radius:8px;border:1px solid var(--border-color);'>" | |
| f"{thoughts_accumulated.strip()}</div>" | |
| f"</details>" | |
| ) | |
| yield chat_history_msgs | |
| # Step 4: Execute function calls if any | |
| tool_results = [] | |
| if function_calls: | |
| chat_history_msgs[answer_index]["content"] = "⚙️ Executing tools..." | |
| yield chat_history_msgs | |
| user_id = api_key | |
| results = EXTENSION_MANAGER.handle_function_calls( | |
| user_id, enabled_extensions, function_calls | |
| ) | |
| for fc, result in zip(function_calls, results): | |
| tool_results.append((fc.name, result)) | |
| log(f"✅ {fc.name}: {result}") | |
| # Step 5: Synthesize final response | |
| if search_results or tool_results or tool_response: | |
| chat_history_msgs[answer_index]["content"] = "✨ Synthesizing answer..." | |
| yield chat_history_msgs | |
| final_answer, generated_images = orchestrator.synthesize_response(user_text, search_results, tool_results, search_citations, file_parts) | |
| else: | |
| final_answer = tool_response or "I couldn't process that request." | |
| generated_images = [] | |
| # Build the final content with citations if available | |
| final_content = ( | |
| f"<div><strong>🍇 Final Answer</strong>" | |
| f"<div style='white-space:pre-wrap;background:inherit;color:inherit;" | |
| f"padding:8px;border-radius:8px;border:1px solid var(--border-color);'>" | |
| f"{final_answer.strip()}</div></div>" | |
| ) | |
| # Add generated images/charts | |
| if generated_images: | |
| log(f"📊 Adding {len(generated_images)} generated visualizations to response") | |
| for img_data in generated_images: | |
| final_content += f"\n\n<div style='margin-top:16px;'>" | |
| final_content += f"<strong>📊 {img_data['title']}</strong><br/>" | |
| final_content += f"<img src='data:image/png;base64,{img_data['base64']}' style='max-width:100%;border-radius:8px;box-shadow:0 2px 8px rgba(0,0,0,0.1);'/>" | |
| if img_data['filepath']: | |
| final_content += f"<br/><small style='color:#666;'>Saved to: {img_data['filepath']}</small>" | |
| final_content += "</div>" | |
| # Append citations if they exist | |
| if search_citations: | |
| final_content += "\n\n" + search_citations | |
| log(f"✅ Appended citations to final answer") | |
| chat_history_msgs[answer_index]["content"] = final_content | |
| yield chat_history_msgs | |
| else: | |
| # No extensions - simple streaming with search | |
| log("📺 Using simple streaming mode") | |
| # Build parts for message with files | |
| parts = [] | |
| if file_parts: | |
| parts.extend(file_parts) | |
| parts.append(user_text) | |
| budget = reasoning_budget(reasoning_level) | |
| grounding_tool = types.Tool(google_search=types.GoogleSearch()) | |
| config = types.GenerateContentConfig( | |
| system_instruction=BASE_SYSTEM_PROMPT, | |
| tools=[grounding_tool], | |
| temperature=0.7, | |
| top_p=0.9, | |
| max_output_tokens=8192, | |
| thinking_config=types.ThinkingConfig( | |
| include_thoughts=True, | |
| thinking_budget=budget, | |
| ) | |
| ) | |
| stream = chat.send_message_stream(parts, config=config) | |
| answer = "" | |
| thoughts = "" | |
| last_chunk = None | |
| # Add thinking placeholder if needed | |
| if show_thoughts: | |
| thought_index = answer_index | |
| chat_history_msgs[answer_index]["content"] = "<em>💭 Thinking...</em>" | |
| answer_index = len(chat_history_msgs) | |
| chat_history_msgs.append({"role": "assistant", "content": ""}) | |
| yield chat_history_msgs | |
| for chunk in stream: | |
| last_chunk = chunk | |
| if not getattr(chunk, "candidates", None): | |
| continue | |
| candidate = chunk.candidates[0] | |
| if getattr(candidate, "content", None): | |
| for part in candidate.content.parts: | |
| if not getattr(part, "text", None): | |
| continue | |
| if getattr(part, "thought", False): | |
| thoughts += part.text | |
| if show_thoughts: | |
| chat_history_msgs[thought_index]["content"] = ( | |
| f"<details open>" | |
| f"<summary><strong>💭 GemiWine's Thinking</strong></summary>" | |
| f"<div style='white-space:pre-wrap;background:inherit;color:inherit;" | |
| f"padding:8px;border-radius:8px;border:1px solid var(--border-color);'>" | |
| f"{thoughts.strip()}</div>" | |
| f"</details>" | |
| ) | |
| yield chat_history_msgs | |
| else: | |
| answer += part.text | |
| chat_history_msgs[answer_index]["content"] = ( | |
| f"<div><strong>🍇 Final Answer</strong>" | |
| f"<div style='white-space:pre-wrap;background:inherit;color:inherit;" | |
| f"padding:8px;border-radius:8px;border:1px solid var(--border-color);'>" | |
| f"{answer.strip()}</div></div>" | |
| ) | |
| yield chat_history_msgs | |
| # Add citations | |
| if last_chunk: | |
| citations = insert_citations_from_grounding(last_chunk.candidates) | |
| if citations: | |
| chat_history_msgs[answer_index]["content"] += "\n\n" + citations | |
| yield chat_history_msgs | |
| log("✅ Response complete.") | |
| return | |
| except Exception as e: | |
| log(f"❌ Error: {e}") | |
| traceback.print_exc() | |
| chat_history_msgs[answer_index]["content"] = f"⚠️ Error: {e}" | |
| yield chat_history_msgs | |
| return | |
| def build_extension_ui(): | |
| """Build the extension toggle UI""" | |
| extensions = EXTENSION_MANAGER.get_all_extensions() | |
| if not extensions: | |
| return gr.Markdown("No extensions available"), [] | |
| checkboxes = [] | |
| with gr.Accordion("🔌 Agent Extensions", open=True): | |
| gr.Markdown("Enable extensions to give the agent additional capabilities:") | |
| gr.Markdown("✨ **Agentic Mode:** When extensions are enabled, the agent uses multi-step reasoning with search + tools") | |
| for ext in extensions: | |
| cb = gr.Checkbox( | |
| label=f"{ext.icon} {ext.display_name}", | |
| info=ext.description, | |
| value=False | |
| ) | |
| checkboxes.append((ext.name, cb)) | |
| return checkboxes | |
| with gr.Blocks( | |
| theme=gr.themes.Soft(primary_hue="purple", secondary_hue="blue"), | |
| title="GemiWine", | |
| fill_width=True | |
| ) as demo: | |
| gr.HTML(""" | |
| <style> | |
| .gradio-container { padding-top: 1.5rem; padding-bottom: 1.5rem; } | |
| .chat-panel { | |
| background: rgba(255, 255, 255, 0.05); | |
| border-radius: 16px !important; | |
| padding: 1.5rem; | |
| box-shadow: 0 4px 12px rgba(0, 0, 0, 0.05); | |
| border: 1px solid rgba(255, 255, 255, 0.1); | |
| } | |
| .message-input { | |
| border-radius: 12px !important; | |
| border: 1px solid rgba(0,0,0,0.1); | |
| } | |
| </style> | |
| """) | |
| with gr.Row(): | |
| with gr.Column(scale=1, min_width=320): | |
| gr.Markdown("## ⚙️ Settings & Controls") | |
| api_key = gr.Textbox( | |
| label="🔑 Gemini API Key", | |
| placeholder="Paste your Gemini API key here...", | |
| type="password", | |
| ) | |
| reasoning_level = gr.Radio( | |
| ["None", "Concise", "Strong", "Dynamic"], | |
| label="🧠 Reasoning Level", | |
| value="Dynamic", | |
| info="Controls the model's thinking depth.", | |
| ) | |
| show_thoughts = gr.Checkbox( | |
| label="💭 Show Thinking", | |
| value=True, | |
| info="Display reasoning process before answers.", | |
| ) | |
| # Build extension checkboxes | |
| extension_checkboxes = build_extension_ui() | |
| with gr.Column(scale=4): | |
| with gr.Group(elem_classes="chat-panel"): | |
| chatbot = gr.Chatbot( | |
| label="🍇 Chat with GemiWine", | |
| height=650, | |
| show_copy_button=True, | |
| type="messages", | |
| avatar_images=(None, "https://i.imgur.com/Q2EMk2N.png"), | |
| ) | |
| multimodal_msg = gr.MultimodalTextbox( | |
| file_types=[ | |
| "image", "video", "audio", # Gradio presets | |
| ".pdf", ".txt", ".md", ".html", ".xml", # Documents | |
| ".doc", ".docx", ".csv", ".json" # Additional formats | |
| ], | |
| placeholder="Ask anything, upload images/PDFs/videos, or let extensions help you...", | |
| label="Your Message", | |
| elem_classes="message-input", | |
| autofocus=True | |
| ) | |
| # Hidden state to track enabled extensions | |
| enabled_extensions_state = gr.State([]) | |
| def clear_box(): | |
| return {"text": "", "files": []} | |
| def handle_chat(api_key_input, chat_history_msgs, multimodal_dict, thinking_flag, reasoning_lvl, *extension_states): | |
| # Convert extension checkbox states to list of enabled extension names | |
| enabled = [] | |
| for (ext_name, _), is_enabled in zip(extension_checkboxes, extension_states): | |
| if is_enabled: | |
| enabled.append(ext_name) | |
| log(f"Enabled extensions: {enabled}") | |
| yield from chat_with_gemini( | |
| api_key_input, chat_history_msgs, multimodal_dict, | |
| thinking_flag, reasoning_lvl, enabled | |
| ) | |
| def check_timers(api_key_input, chat_history, enabled_exts): | |
| """Background function to check for completed timers""" | |
| if not api_key_input or 'timer' not in enabled_exts: | |
| return chat_history | |
| timer_ext = EXTENSION_MANAGER.get_extension('timer') | |
| if not timer_ext: | |
| return chat_history | |
| user_id = api_key_input | |
| timer_ext.initialize_state(user_id) | |
| state = timer_ext.get_state(user_id) | |
| import datetime as dt | |
| now = dt.datetime.now() | |
| newly_completed = [] | |
| for timer in state.get("timers", []): | |
| if timer.get("active") and not timer.get("notified", False): | |
| end_time = dt.datetime.fromisoformat(timer["end_time"]) | |
| if now >= end_time: | |
| newly_completed.append(timer) | |
| timer["notified"] = True | |
| if newly_completed: | |
| timer_ext.update_state(user_id, state) | |
| # Add notification to chat | |
| if chat_history is None: | |
| chat_history = [] | |
| for timer in newly_completed: | |
| notification = f"⏰ **Timer Complete!** Your timer '{timer['name']}' has finished!" | |
| chat_history.append({"role": "assistant", "content": notification}) | |
| log(f"⏰ Timer notification sent: {timer['name']}") | |
| return chat_history | |
| # Get just the checkbox components for inputs | |
| checkbox_components = [cb for _, cb in extension_checkboxes] | |
| # Main chat submission | |
| multimodal_msg.submit( | |
| fn=handle_chat, | |
| inputs=[api_key, chatbot, multimodal_msg, show_thoughts, reasoning_level] + checkbox_components, | |
| outputs=[chatbot], | |
| queue=True, | |
| ).then(fn=clear_box, outputs=[multimodal_msg]) | |
| # Background timer check - runs every 10 seconds | |
| timer_check = gr.Timer(value=10, active=True) | |
| def update_enabled_state(*extension_states): | |
| enabled = [] | |
| for (ext_name, _), is_enabled in zip(extension_checkboxes, extension_states): | |
| if is_enabled: | |
| enabled.append(ext_name) | |
| return enabled | |
| # Update enabled extensions state whenever checkboxes change | |
| for _, cb in extension_checkboxes: | |
| cb.change( | |
| fn=update_enabled_state, | |
| inputs=checkbox_components, | |
| outputs=[enabled_extensions_state] | |
| ) | |
| # Timer polling | |
| timer_check.tick( | |
| fn=check_timers, | |
| inputs=[api_key, chatbot, enabled_extensions_state], | |
| outputs=[chatbot] | |
| ) | |
| if __name__ == "__main__": | |
| log(f"===== GemiWine with Extensions started at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} =====") | |
| demo.launch() |