import base64 import os import time from pathlib import Path from typing import Dict, Optional from langchain_anthropic import ChatAnthropic from langchain_google_genai import ChatGoogleGenerativeAI from langchain_ollama import ChatOllama from langchain_openai import AzureChatOpenAI, ChatOpenAI import gradio as gr from .llm import DeepSeekR1ChatOpenAI def get_llm_model(provider: str, **kwargs): if provider == "anthropic": base_url = kwargs.get("base_url", "https://api.anthropic.com") api_key = kwargs.get("api_key", os.getenv("ANTHROPIC_API_KEY", "")) return ChatAnthropic( model_name=kwargs.get("model_name", "claude-3-5-sonnet-20240620"), temperature=kwargs.get("temperature", 0.0), base_url=base_url, api_key=api_key, ) elif provider == "openai": base_url = kwargs.get("base_url", os.getenv("OPENAI_ENDPOINT", "https://api.openai.com/v1")) api_key = kwargs.get("api_key", os.getenv("OPENAI_API_KEY", "")) return ChatOpenAI( model=kwargs.get("model_name", "gpt-4o"), temperature=kwargs.get("temperature", 0.0), base_url=base_url, api_key=api_key, ) elif provider == "deepseek": base_url = kwargs.get("base_url", os.getenv("DEEPSEEK_ENDPOINT", "")) api_key = kwargs.get("api_key", os.getenv("DEEPSEEK_API_KEY", "")) model_name = kwargs.get("model_name", "deepseek-chat") if model_name == "deepseek-reasoner": return DeepSeekR1ChatOpenAI( model=model_name, temperature=kwargs.get("temperature", 0.0), base_url=base_url, api_key=api_key, ) else: return ChatOpenAI( model=model_name, temperature=kwargs.get("temperature", 0.0), base_url=base_url, api_key=api_key, ) elif provider == "gemini": api_key = kwargs.get("api_key", os.getenv("GOOGLE_API_KEY", "")) return ChatGoogleGenerativeAI( model=kwargs.get("model_name", "gemini-2.0-flash-exp"), temperature=kwargs.get("temperature", 0.0), google_api_key=api_key, ) elif provider == "ollama": return ChatOllama( model=kwargs.get("model_name", "qwen2.5:7b"), temperature=kwargs.get("temperature", 0.0), num_ctx=kwargs.get("num_ctx", 32000), base_url=kwargs.get("base_url", "http://localhost:11434"), ) elif provider == "azure_openai": base_url = kwargs.get("base_url", os.getenv("AZURE_OPENAI_ENDPOINT", "")) api_key = kwargs.get("api_key", os.getenv("AZURE_OPENAI_API_KEY", "")) return AzureChatOpenAI( model=kwargs.get("model_name", "gpt-4o"), temperature=kwargs.get("temperature", 0.0), api_version="2024-05-01-preview", azure_endpoint=base_url, api_key=api_key, ) else: raise ValueError(f"Unsupported provider: {provider}") model_names = { "anthropic": ["claude-3-5-sonnet-20240620", "claude-3-opus-20240229"], "openai": ["gpt-4o", "gpt-4", "gpt-3.5-turbo"], "deepseek": ["deepseek-chat", "deepseek-reasoner"], "gemini": ["gemini-2.0-flash-exp", "gemini-1.5-flash-latest"], "ollama": ["qwen2.5:7b", "llama2:7b"], "azure_openai": ["gpt-4o", "gpt-4", "gpt-3.5-turbo"], } def update_model_dropdown(llm_provider, api_key=None, base_url=None): if not api_key: api_key = os.getenv(f"{llm_provider.upper()}_API_KEY", "") if not base_url: base_url = os.getenv(f"{llm_provider.upper()}_BASE_URL", "") if llm_provider in model_names: return gr.Dropdown(choices=model_names[llm_provider], value=model_names[llm_provider][0], interactive=True) else: return gr.Dropdown(choices=[], value="", interactive=True, allow_custom_value=True) def encode_image(img_path): if not img_path: return None with open(img_path, "rb") as fin: return base64.b64encode(fin.read()).decode("utf-8") def get_latest_files(directory: str, file_types: list = ['.webm', '.zip']) -> Dict[str, Optional[str]]: latest_files = {ext: None for ext in file_types} if not os.path.exists(directory): os.makedirs(directory, exist_ok=True) return latest_files for file_type in file_types: try: matches = list(Path(directory).rglob(f"*{file_type}")) if matches: latest = max(matches, key=lambda p: p.stat().st_mtime) if time.time() - latest.stat().st_mtime > 1.0: latest_files[file_type] = str(latest) except Exception as e: print(f"Error getting latest {file_type} file: {e}") return latest_files def capture_screenshot(browser_context): try: playwright_browser = browser_context.browser.playwright_browser if not playwright_browser or not playwright_browser.contexts: return None playwright_context = playwright_browser.contexts[0] pages = playwright_context.pages if playwright_context else [] active_page = next((page for page in pages if page.url != "about:blank"), pages[0] if pages else None) if not active_page: return None screenshot = active_page.screenshot(type='jpeg', quality=75, scale="css") return base64.b64encode(screenshot).decode('utf-8') except Exception as e: print(f"Error capturing screenshot: {e}") return None