| import base64
|
| import os
|
| import time
|
| from pathlib import Path
|
| from typing import Dict, Optional
|
|
|
| from langchain_anthropic import ChatAnthropic
|
| from langchain_google_genai import ChatGoogleGenerativeAI
|
| from langchain_ollama import ChatOllama
|
| from langchain_openai import AzureChatOpenAI, ChatOpenAI
|
| import gradio as gr
|
|
|
| from .llm import DeepSeekR1ChatOpenAI
|
|
|
| def get_llm_model(provider: str, **kwargs):
|
| """
|
| 获取LLM 模型
|
| :param provider: 模型类型
|
| :param kwargs:
|
| :return:
|
| """
|
| if provider == "anthropic":
|
| if not kwargs.get("base_url", ""):
|
| base_url = "https://api.anthropic.com"
|
| else:
|
| base_url = kwargs.get("base_url")
|
|
|
| if not kwargs.get("api_key", ""):
|
| api_key = os.getenv("ANTHROPIC_API_KEY", "")
|
| else:
|
| api_key = kwargs.get("api_key")
|
|
|
| return ChatAnthropic(
|
| model_name=kwargs.get("model_name", "claude-3-5-sonnet-20240620"),
|
| temperature=kwargs.get("temperature", 0.0),
|
| base_url=base_url,
|
| api_key=api_key,
|
| )
|
| elif provider == "openai":
|
| if not kwargs.get("base_url", ""):
|
| base_url = os.getenv("OPENAI_ENDPOINT", "https://api.openai.com/v1")
|
| else:
|
| base_url = kwargs.get("base_url")
|
|
|
| if not kwargs.get("api_key", ""):
|
| api_key = os.getenv("OPENAI_API_KEY", "")
|
| else:
|
| api_key = kwargs.get("api_key")
|
|
|
| return ChatOpenAI(
|
| model=kwargs.get("model_name", "gpt-4o"),
|
| temperature=kwargs.get("temperature", 0.0),
|
| base_url=base_url,
|
| api_key=api_key,
|
| )
|
| elif provider == "deepseek":
|
| if not kwargs.get("base_url", ""):
|
| base_url = os.getenv("DEEPSEEK_ENDPOINT", "")
|
| else:
|
| base_url = kwargs.get("base_url")
|
|
|
| if not kwargs.get("api_key", ""):
|
| api_key = os.getenv("DEEPSEEK_API_KEY", "")
|
| else:
|
| api_key = kwargs.get("api_key")
|
|
|
| if kwargs.get("model_name", "deepseek-chat") == "deepseek-reasoner":
|
| return DeepSeekR1ChatOpenAI(
|
| model=kwargs.get("model_name", "deepseek-reasoner"),
|
| temperature=kwargs.get("temperature", 0.0),
|
| base_url=base_url,
|
| api_key=api_key,
|
| )
|
| else:
|
| return ChatOpenAI(
|
| model=kwargs.get("model_name", "deepseek-chat"),
|
| temperature=kwargs.get("temperature", 0.0),
|
| base_url=base_url,
|
| api_key=api_key,
|
| )
|
| elif provider == "gemini":
|
| if not kwargs.get("api_key", ""):
|
| api_key = os.getenv("GOOGLE_API_KEY", "")
|
| else:
|
| api_key = kwargs.get("api_key")
|
| return ChatGoogleGenerativeAI(
|
| model=kwargs.get("model_name", "gemini-2.0-flash-exp"),
|
| temperature=kwargs.get("temperature", 0.0),
|
| google_api_key=api_key,
|
| )
|
| elif provider == "ollama":
|
| return ChatOllama(
|
| model=kwargs.get("model_name", "qwen2.5:7b"),
|
| temperature=kwargs.get("temperature", 0.0),
|
| num_ctx=kwargs.get("num_ctx", 32000),
|
| base_url=kwargs.get("base_url", "http://localhost:11434"),
|
| )
|
| elif provider == "azure_openai":
|
| if not kwargs.get("base_url", ""):
|
| base_url = os.getenv("AZURE_OPENAI_ENDPOINT", "")
|
| else:
|
| base_url = kwargs.get("base_url")
|
| if not kwargs.get("api_key", ""):
|
| api_key = os.getenv("AZURE_OPENAI_API_KEY", "")
|
| else:
|
| api_key = kwargs.get("api_key")
|
| return AzureChatOpenAI(
|
| model=kwargs.get("model_name", "gpt-4o"),
|
| temperature=kwargs.get("temperature", 0.0),
|
| api_version="2024-05-01-preview",
|
| azure_endpoint=base_url,
|
| api_key=api_key,
|
| )
|
| else:
|
| raise ValueError(f"Unsupported provider: {provider}")
|
|
|
|
|
| model_names = {
|
| "anthropic": ["claude-3-5-sonnet-20240620", "claude-3-opus-20240229"],
|
| "openai": ["gpt-4o", "gpt-4", "gpt-3.5-turbo"],
|
| "deepseek": ["deepseek-chat", "deepseek-reasoner"],
|
| "gemini": ["gemini-2.0-flash-exp", "gemini-2.0-flash-thinking-exp", "gemini-1.5-flash-latest", "gemini-1.5-flash-8b-latest", "gemini-2.0-flash-thinking-exp-1219" ],
|
| "ollama": ["qwen2.5:7b", "llama2:7b"],
|
| "azure_openai": ["gpt-4o", "gpt-4", "gpt-3.5-turbo"]
|
| }
|
|
|
|
|
| def update_model_dropdown(llm_provider, api_key=None, base_url=None):
|
| """
|
| Update the model name dropdown with predefined models for the selected provider.
|
| """
|
|
|
| if not api_key:
|
| api_key = os.getenv(f"{llm_provider.upper()}_API_KEY", "")
|
| if not base_url:
|
| base_url = os.getenv(f"{llm_provider.upper()}_BASE_URL", "")
|
|
|
|
|
| if llm_provider in model_names:
|
| return gr.Dropdown(choices=model_names[llm_provider], value=model_names[llm_provider][0], interactive=True)
|
| else:
|
| return gr.Dropdown(choices=[], value="", interactive=True, allow_custom_value=True)
|
|
|
| def encode_image(img_path):
|
| if not img_path:
|
| return None
|
| with open(img_path, "rb") as fin:
|
| image_data = base64.b64encode(fin.read()).decode("utf-8")
|
| return image_data
|
|
|
|
|
| def get_latest_files(directory: str, file_types: list = ['.webm', '.zip']) -> Dict[str, Optional[str]]:
|
| """Get the latest recording and trace files"""
|
| latest_files: Dict[str, Optional[str]] = {ext: None for ext in file_types}
|
|
|
| if not os.path.exists(directory):
|
| os.makedirs(directory, exist_ok=True)
|
| return latest_files
|
|
|
| for file_type in file_types:
|
| try:
|
| matches = list(Path(directory).rglob(f"*{file_type}"))
|
| if matches:
|
| latest = max(matches, key=lambda p: p.stat().st_mtime)
|
|
|
| if time.time() - latest.stat().st_mtime > 1.0:
|
| latest_files[file_type] = str(latest)
|
| except Exception as e:
|
| print(f"Error getting latest {file_type} file: {e}")
|
|
|
| return latest_files
|
| async def capture_screenshot(browser_context):
|
| """Capture and encode a screenshot"""
|
|
|
| playwright_browser = browser_context.browser.playwright_browser
|
|
|
|
|
| if playwright_browser and playwright_browser.contexts:
|
| playwright_context = playwright_browser.contexts[0]
|
| else:
|
| return None
|
|
|
|
|
| pages = None
|
| if playwright_context:
|
| pages = playwright_context.pages
|
|
|
|
|
| if pages:
|
| active_page = pages[0]
|
| for page in pages:
|
| if page.url != "about:blank":
|
| active_page = page
|
| else:
|
| return None
|
|
|
|
|
| try:
|
| screenshot = await active_page.screenshot(
|
| type='jpeg',
|
| quality=75,
|
| scale="css"
|
| )
|
| encoded = base64.b64encode(screenshot).decode('utf-8')
|
| return encoded
|
| except Exception as e:
|
| return None
|
|
|