# File size: 5,627 Bytes
import base64
import os
import time
from pathlib import Path
from typing import Dict, Optional
from langchain_anthropic import ChatAnthropic
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_ollama import ChatOllama
from langchain_openai import AzureChatOpenAI, ChatOpenAI
import gradio as gr
from .llm import DeepSeekR1ChatOpenAI
def _setting_or_default(options: dict, key: str, fallback):
    """Return ``options[key]`` unless it is missing or empty, else *fallback*."""
    # ``dict.get(key, default)`` only falls back when the key is absent, so an
    # explicit ``None``/``""`` would otherwise override the configured default.
    return options.get(key) or fallback


def get_llm_model(provider: str, **kwargs):
    """Build a LangChain chat model for the requested provider.

    Args:
        provider: One of ``"anthropic"``, ``"openai"``, ``"deepseek"``,
            ``"gemini"``, ``"ollama"`` or ``"azure_openai"``.
        **kwargs: Optional overrides. Recognised keys are ``model_name``,
            ``temperature`` (default ``0.0``), ``base_url``, ``api_key`` and,
            for Ollama only, ``num_ctx`` (default ``32000``). A missing or
            empty ``model_name`` / ``base_url`` / ``api_key`` falls back to
            the provider's environment variable or built-in default.

    Returns:
        The chat model instance for the provider. For ``"deepseek"`` with the
        ``"deepseek-reasoner"`` model this is a ``DeepSeekR1ChatOpenAI``;
        every other DeepSeek model uses ``ChatOpenAI``.

    Raises:
        ValueError: If ``provider`` is not one of the supported names.
    """
    # 0.0 is a legitimate temperature, so only a *missing* key gets the default.
    temperature = kwargs.get("temperature", 0.0)

    if provider == "anthropic":
        return ChatAnthropic(
            model_name=_setting_or_default(kwargs, "model_name", "claude-3-5-sonnet-20240620"),
            temperature=temperature,
            base_url=_setting_or_default(kwargs, "base_url", "https://api.anthropic.com"),
            api_key=_setting_or_default(kwargs, "api_key", os.getenv("ANTHROPIC_API_KEY", "")),
        )

    if provider == "openai":
        return ChatOpenAI(
            model=_setting_or_default(kwargs, "model_name", "gpt-4o"),
            temperature=temperature,
            base_url=_setting_or_default(
                kwargs, "base_url", os.getenv("OPENAI_ENDPOINT", "https://api.openai.com/v1")
            ),
            api_key=_setting_or_default(kwargs, "api_key", os.getenv("OPENAI_API_KEY", "")),
        )

    if provider == "deepseek":
        model_name = _setting_or_default(kwargs, "model_name", "deepseek-chat")
        # The reasoner model is served through the project's dedicated wrapper
        # class; both classes take the same constructor arguments here.
        model_cls = DeepSeekR1ChatOpenAI if model_name == "deepseek-reasoner" else ChatOpenAI
        return model_cls(
            model=model_name,
            temperature=temperature,
            base_url=_setting_or_default(kwargs, "base_url", os.getenv("DEEPSEEK_ENDPOINT", "")),
            api_key=_setting_or_default(kwargs, "api_key", os.getenv("DEEPSEEK_API_KEY", "")),
        )

    if provider == "gemini":
        return ChatGoogleGenerativeAI(
            model=_setting_or_default(kwargs, "model_name", "gemini-2.0-flash-exp"),
            temperature=temperature,
            google_api_key=_setting_or_default(kwargs, "api_key", os.getenv("GOOGLE_API_KEY", "")),
        )

    if provider == "ollama":
        return ChatOllama(
            model=_setting_or_default(kwargs, "model_name", "qwen2.5:7b"),
            temperature=temperature,
            num_ctx=kwargs.get("num_ctx", 32000),
            base_url=_setting_or_default(kwargs, "base_url", "http://localhost:11434"),
        )

    if provider == "azure_openai":
        return AzureChatOpenAI(
            model=_setting_or_default(kwargs, "model_name", "gpt-4o"),
            temperature=temperature,
            api_version="2024-05-01-preview",
            azure_endpoint=_setting_or_default(
                kwargs, "base_url", os.getenv("AZURE_OPENAI_ENDPOINT", "")
            ),
            api_key=_setting_or_default(kwargs, "api_key", os.getenv("AZURE_OPENAI_API_KEY", "")),
        )

    raise ValueError(f"Unsupported provider: {provider}")
# Predefined model identifiers offered for each supported provider key.
# The first entry of each list is used as the preselected dropdown value.
model_names = {
    "anthropic": [
        "claude-3-5-sonnet-20240620",
        "claude-3-opus-20240229",
    ],
    "openai": [
        "gpt-4o",
        "gpt-4",
        "gpt-3.5-turbo",
    ],
    "deepseek": [
        "deepseek-chat",
        "deepseek-reasoner",
    ],
    "gemini": [
        "gemini-2.0-flash-exp",
        "gemini-1.5-flash-latest",
    ],
    "ollama": [
        "qwen2.5:7b",
        "llama2:7b",
    ],
    "azure_openai": [
        "gpt-4o",
        "gpt-4",
        "gpt-3.5-turbo",
    ],
}
def update_model_dropdown(llm_provider, api_key=None, base_url=None):
    """Return a Gradio dropdown listing the predefined models for a provider.

    Args:
        llm_provider: Provider key looked up in ``model_names``.
        api_key: Accepted for call compatibility; the dropdown is built from
            the predefined list only, so the value is not used.
        base_url: Accepted for call compatibility; not used, as above.

    Returns:
        A ``gr.Dropdown`` whose choices are the provider's predefined models
        with the first one preselected. When the provider is unknown (or has
        no predefined models) an empty dropdown that accepts custom values is
        returned instead.
    """
    # The previous environment lookups for api_key/base_url were never read and
    # crashed with AttributeError when llm_provider was None; they are dropped.
    choices = model_names.get(llm_provider)
    if choices:
        return gr.Dropdown(choices=choices, value=choices[0], interactive=True)
    return gr.Dropdown(choices=[], value="", interactive=True, allow_custom_value=True)
def encode_image(img_path):
    """Read a file and return its contents as a base64-encoded string.

    Args:
        img_path: Path of the file to read. A falsy value (``None``, ``""``)
            short-circuits without touching the filesystem.

    Returns:
        The base64 text of the file's bytes, or ``None`` when ``img_path`` is
        falsy.
    """
    if img_path:
        with open(img_path, "rb") as image_file:
            raw_bytes = image_file.read()
        encoded = base64.b64encode(raw_bytes)
        return encoded.decode("utf-8")
    return None
def get_latest_files(directory: str, file_types: Optional[list] = None) -> Dict[str, Optional[str]]:
    """Find the most recently modified file of each type under a directory.

    Args:
        directory: Root directory, searched recursively. It is created when
            it does not exist yet.
        file_types: File-name suffixes to look for. Defaults to
            ``['.webm', '.zip']``.

    Returns:
        A dict mapping each requested suffix to the path (as a string) of the
        newest matching file, or ``None`` when there is no match, when the
        newest match was modified within the last second, or when the lookup
        for that suffix failed.
    """
    # A None sentinel avoids sharing one mutable default list across calls.
    if file_types is None:
        file_types = ['.webm', '.zip']

    latest_files: Dict[str, Optional[str]] = {ext: None for ext in file_types}

    if not os.path.exists(directory):
        os.makedirs(directory, exist_ok=True)
        return latest_files

    for file_type in file_types:
        try:
            # Stat every candidate exactly once so the selection and the age
            # check below use the same timestamp.
            stamped = [
                (path.stat().st_mtime, path)
                for path in Path(directory).rglob(f"*{file_type}")
            ]
            if not stamped:
                continue
            mtime, latest = max(stamped, key=lambda item: item[0])
            # Files touched within the last second are skipped
            # (presumably still being written -- confirm with the producer).
            if time.time() - mtime > 1.0:
                latest_files[file_type] = str(latest)
        except Exception as e:
            # Deliberately best-effort: report the failure and keep None.
            print(f"Error getting latest {file_type} file: {e}")

    return latest_files
def capture_screenshot(browser_context):
    """Take a JPEG screenshot of the active page and return it as base64 text.

    The page is taken from the first context of
    ``browser_context.browser.playwright_browser``: the first page whose URL
    is not ``about:blank`` wins, otherwise the first page is used.

    Args:
        browser_context: Object exposing ``browser.playwright_browser``.

    Returns:
        The base64-encoded screenshot, or ``None`` when there is no browser,
        no context, no page, or when any step raises.

    NOTE(review): ``screenshot()`` is called without ``await``. If the
    underlying browser uses Playwright's async API the call would return a
    coroutine rather than bytes -- confirm which API callers provide.
    """
    try:
        pw_browser = browser_context.browser.playwright_browser
        if not (pw_browser and pw_browser.contexts):
            return None

        first_context = pw_browser.contexts[0]
        pages = first_context.pages if first_context else []

        # Start from the first page and upgrade to the first non-blank one.
        active_page = pages[0] if pages else None
        for candidate in pages:
            if candidate.url != "about:blank":
                active_page = candidate
                break

        if not active_page:
            return None

        image_bytes = active_page.screenshot(type='jpeg', quality=75, scale="css")
        encoded = base64.b64encode(image_bytes)
        return encoded.decode('utf-8')
    except Exception as e:
        print(f"Error capturing screenshot: {e}")
        return None
|