from __future__ import annotations import os import inspect import sys from pathlib import Path import gradio as gr from dotenv import load_dotenv from pypdf import PdfReader BASE_DIR = Path(__file__).resolve().parent PROJECT_ROOT = BASE_DIR.parent if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from router import build_model_route load_dotenv() PROXY_URL = os.environ.get("HTTP_PROXY") or None DEFAULT_PROVIDER = os.environ.get("DEFAULT_PROVIDER", "glm/4.7-flash") MAX_UPLOAD_CHARS = int(os.environ.get("MAX_UPLOAD_CHARS", "40000")) PROVIDER_CHOICES = [ ("GitHub Models · GPT-4.1 Mini", "github/gpt-4.1-mini"), ("Gemini · 2.5 Flash", "gemini/2.5-flash"), ("GLM · 4.7 Flash", "glm/4.7-flash"), ] PROVIDER_OPTIONS = [value for _, value in PROVIDER_CHOICES] def _supports_kwarg(callable_obj: object, kwarg_name: str) -> bool: try: return kwarg_name in inspect.signature(callable_obj).parameters except (TypeError, ValueError): return False CHATBOT_USES_MESSAGES = _supports_kwarg(gr.Chatbot, "type") def _normalize_uploads(files: object | None) -> list[str]: if not files: return [] if isinstance(files, (str, Path)): return [str(files)] normalized: list[str] = [] for file_item in files if isinstance(files, list) else [files]: if not file_item: continue if isinstance(file_item, (str, Path)): normalized.append(str(file_item)) continue file_path = getattr(file_item, "path", None) or getattr(file_item, "name", None) if file_path: normalized.append(str(file_path)) return normalized def _read_text_file(file_path: Path) -> str: return file_path.read_text(encoding="utf-8", errors="ignore") def _read_pdf_file(file_path: Path) -> str: reader = PdfReader(str(file_path)) pages: list[str] = [] for page in reader.pages: text = page.extract_text() or "" if text.strip(): pages.append(text) return "\n\n".join(pages) def read_uploaded_context(uploaded_files: object | None) -> tuple[str, str]: file_paths = _normalize_uploads(uploaded_files) if not file_paths: return "", "No uploads attached." parts: list[str] = [] summary_lines = ["Uploaded context:"] for file_path_str in file_paths: file_path = Path(file_path_str) summary_lines.append(f"- {file_path.name}") try: if file_path.suffix.lower() == ".pdf": extracted = _read_pdf_file(file_path) else: extracted = _read_text_file(file_path) except Exception as exc: parts.append(f"[Could not read {file_path.name}: {exc}]") continue extracted = extracted.strip() if not extracted: parts.append(f"[No readable text found in {file_path.name}]") continue if len(extracted) > MAX_UPLOAD_CHARS: extracted = extracted[:MAX_UPLOAD_CHARS] + "\n\n[Truncated for context limit.]" parts.append(f"## {file_path.name}\n\n{extracted}") return "\n\n".join(parts), "\n".join(summary_lines) def build_system_prompt(provider: str, model: str, upload_summary: str, uploaded_context: str) -> str: base_prompt = ( "You are CodeAgent, a helpful assistant in a deployed web UI. " "Local workspace tools, local file read/write, and RAG knowledge base tools are disabled in this deployment. " "You can still help by reading user-uploaded text or PDF files, reasoning over that uploaded text, and generating markdown responses the user can download." ) capability_note = ( f"\n\nCurrent model route: {provider}/{model}. " "If the user asks for unavailable local tools, explain the limitation briefly and continue with the available uploaded context." ) upload_note = f"\n\n{upload_summary}" if uploaded_context.strip(): upload_note += f"\n\nUploaded content:\n{uploaded_context}" return base_prompt + capability_note + upload_note def _format_chatbot_history(history: list[dict[str, str]]) -> object: """Return chat history in the shape expected by the active Gradio Chatbot.""" return history def run_turn( user_message: str, history: list[dict[str, str]], provider: str, uploaded_files: object | None, user_already_in_history: bool = False, ) -> tuple[list[dict[str, str]], str, str]: provider = (provider or DEFAULT_PROVIDER).strip().lower() if provider not in PROVIDER_OPTIONS: raise gr.Error(f"Unknown provider '{provider}'. Choose one of: {', '.join(PROVIDER_OPTIONS)}") provider_name = provider.split("/")[0] try: route = build_model_route(provider_name=provider_name, proxy_url=PROXY_URL) except Exception as exc: raise gr.Error(str(exc)) from exc uploaded_context, upload_summary = read_uploaded_context(uploaded_files) system_prompt = build_system_prompt(provider, route.model, upload_summary, uploaded_context) messages: list[dict[str, str]] = [{"role": "system", "content": system_prompt}] messages.extend(history) if not user_already_in_history: messages.append({"role": "user", "content": user_message}) status = "Model is generating a markdown response. Local tools are disabled in this deployment." if uploaded_context.strip(): status = "Model is generating a markdown response using uploaded text/PDF context. Local tools are disabled in this deployment." response = route.client.chat.completions.create( model=route.model, messages=messages, ) assistant_text = response.choices[0].message.content or "" assistant_text = assistant_text.strip() or "The model returned an empty response." updated_history = history + ([{"role": "user", "content": user_message}] if not user_already_in_history else []) + [ {"role": "assistant", "content": assistant_text}, ] return updated_history, assistant_text, status def add_upload_summary(uploaded_files: object | None) -> str: _, upload_summary = read_uploaded_context(uploaded_files) return f"**Upload status**\n\n{upload_summary}" # Load CSS from file before creating Blocks APP_CSS = "" try: with open(os.path.join(BASE_DIR, "app.css"), "r", encoding="utf-8") as file_handle: APP_CSS = file_handle.read() except Exception: pass blocks_kwargs = {"title": "CodeAgent Web UI"} launch_extra_kwargs = {} theme = gr.themes.Ocean() if _supports_kwarg(gr.Blocks.launch, "css"): launch_extra_kwargs["css"] = APP_CSS else: blocks_kwargs["css"] = APP_CSS if _supports_kwarg(gr.Blocks.launch, "theme"): launch_extra_kwargs["theme"] = theme elif _supports_kwarg(gr.Blocks, "theme"): blocks_kwargs["theme"] = theme with gr.Blocks(**blocks_kwargs) as demo: gr.Markdown( "
AI assistant for research and analysis
" "