from __future__ import annotations import os import inspect import sys from pathlib import Path import gradio as gr from dotenv import load_dotenv from pypdf import PdfReader BASE_DIR = Path(__file__).resolve().parent PROJECT_ROOT = BASE_DIR.parent if str(PROJECT_ROOT) not in sys.path: sys.path.insert(0, str(PROJECT_ROOT)) from router import build_model_route load_dotenv() PROXY_URL = os.environ.get("HTTP_PROXY") or None DEFAULT_PROVIDER = os.environ.get("DEFAULT_PROVIDER", "glm/4.7-flash") MAX_UPLOAD_CHARS = int(os.environ.get("MAX_UPLOAD_CHARS", "40000")) PROVIDER_CHOICES = [ ("GitHub Models · GPT-4.1 Mini", "github/gpt-4.1-mini"), ("Gemini · 2.5 Flash", "gemini/2.5-flash"), ("GLM · 4.7 Flash", "glm/4.7-flash"), ] PROVIDER_OPTIONS = [value for _, value in PROVIDER_CHOICES] def _supports_kwarg(callable_obj: object, kwarg_name: str) -> bool: try: return kwarg_name in inspect.signature(callable_obj).parameters except (TypeError, ValueError): return False CHATBOT_USES_MESSAGES = _supports_kwarg(gr.Chatbot, "type") def _normalize_uploads(files: object | None) -> list[str]: if not files: return [] if isinstance(files, (str, Path)): return [str(files)] normalized: list[str] = [] for file_item in files if isinstance(files, list) else [files]: if not file_item: continue if isinstance(file_item, (str, Path)): normalized.append(str(file_item)) continue file_path = getattr(file_item, "path", None) or getattr(file_item, "name", None) if file_path: normalized.append(str(file_path)) return normalized def _read_text_file(file_path: Path) -> str: return file_path.read_text(encoding="utf-8", errors="ignore") def _read_pdf_file(file_path: Path) -> str: reader = PdfReader(str(file_path)) pages: list[str] = [] for page in reader.pages: text = page.extract_text() or "" if text.strip(): pages.append(text) return "\n\n".join(pages) def read_uploaded_context(uploaded_files: object | None) -> tuple[str, str]: file_paths = _normalize_uploads(uploaded_files) if not file_paths: return "", "No uploads attached." parts: list[str] = [] summary_lines = ["Uploaded context:"] for file_path_str in file_paths: file_path = Path(file_path_str) summary_lines.append(f"- {file_path.name}") try: if file_path.suffix.lower() == ".pdf": extracted = _read_pdf_file(file_path) else: extracted = _read_text_file(file_path) except Exception as exc: parts.append(f"[Could not read {file_path.name}: {exc}]") continue extracted = extracted.strip() if not extracted: parts.append(f"[No readable text found in {file_path.name}]") continue if len(extracted) > MAX_UPLOAD_CHARS: extracted = extracted[:MAX_UPLOAD_CHARS] + "\n\n[Truncated for context limit.]" parts.append(f"## {file_path.name}\n\n{extracted}") return "\n\n".join(parts), "\n".join(summary_lines) def build_system_prompt(provider: str, model: str, upload_summary: str, uploaded_context: str) -> str: base_prompt = ( "You are CodeAgent, a helpful assistant in a deployed web UI. " "Local workspace tools, local file read/write, and RAG knowledge base tools are disabled in this deployment. " "You can still help by reading user-uploaded text or PDF files, reasoning over that uploaded text, and generating markdown responses the user can download." ) capability_note = ( f"\n\nCurrent model route: {provider}/{model}. " "If the user asks for unavailable local tools, explain the limitation briefly and continue with the available uploaded context." ) upload_note = f"\n\n{upload_summary}" if uploaded_context.strip(): upload_note += f"\n\nUploaded content:\n{uploaded_context}" return base_prompt + capability_note + upload_note def _format_chatbot_history(history: list[dict[str, str]]) -> object: """Return chat history in the shape expected by the active Gradio Chatbot.""" return history def run_turn( user_message: str, history: list[dict[str, str]], provider: str, uploaded_files: object | None, user_already_in_history: bool = False, ) -> tuple[list[dict[str, str]], str, str]: provider = (provider or DEFAULT_PROVIDER).strip().lower() if provider not in PROVIDER_OPTIONS: raise gr.Error(f"Unknown provider '{provider}'. Choose one of: {', '.join(PROVIDER_OPTIONS)}") provider_name = provider.split("/")[0] try: route = build_model_route(provider_name=provider_name, proxy_url=PROXY_URL) except Exception as exc: raise gr.Error(str(exc)) from exc uploaded_context, upload_summary = read_uploaded_context(uploaded_files) system_prompt = build_system_prompt(provider, route.model, upload_summary, uploaded_context) messages: list[dict[str, str]] = [{"role": "system", "content": system_prompt}] messages.extend(history) if not user_already_in_history: messages.append({"role": "user", "content": user_message}) status = "Model is generating a markdown response. Local tools are disabled in this deployment." if uploaded_context.strip(): status = "Model is generating a markdown response using uploaded text/PDF context. Local tools are disabled in this deployment." response = route.client.chat.completions.create( model=route.model, messages=messages, ) assistant_text = response.choices[0].message.content or "" assistant_text = assistant_text.strip() or "The model returned an empty response." updated_history = history + ([{"role": "user", "content": user_message}] if not user_already_in_history else []) + [ {"role": "assistant", "content": assistant_text}, ] return updated_history, assistant_text, status def add_upload_summary(uploaded_files: object | None) -> str: _, upload_summary = read_uploaded_context(uploaded_files) return f"**Upload status**\n\n{upload_summary}" # Load CSS from file before creating Blocks APP_CSS = "" try: with open(os.path.join(BASE_DIR, "app.css"), "r", encoding="utf-8") as file_handle: APP_CSS = file_handle.read() except Exception: pass blocks_kwargs = {"title": "CodeAgent Web UI"} launch_extra_kwargs = {} theme = gr.themes.Ocean() if _supports_kwarg(gr.Blocks.launch, "css"): launch_extra_kwargs["css"] = APP_CSS else: blocks_kwargs["css"] = APP_CSS if _supports_kwarg(gr.Blocks.launch, "theme"): launch_extra_kwargs["theme"] = theme elif _supports_kwarg(gr.Blocks, "theme"): blocks_kwargs["theme"] = theme with gr.Blocks(**blocks_kwargs) as demo: gr.Markdown( "
" "

CodeAgent

" "

AI assistant for research and analysis

" "
", elem_id="app-title", ) gr.Markdown("
", elem_id="title-divider-wrap") chat_state = gr.State([]) model_panel_visible = gr.State(False) upload_panel_visible = gr.State(False) pending_user_message = gr.State("") chatbot_kwargs = {"label": "Conversation", "height": 560, "show_label": False} if CHATBOT_USES_MESSAGES: chatbot_kwargs["type"] = "messages" chatbot = gr.Chatbot(**chatbot_kwargs, elem_id="chatbot") status_box = gr.Markdown("Ready. Send a message to start.", elem_id="status-box") with gr.Row(scale=1, elem_id="composer-row"): textbox_kwargs = { "label": "Message", "placeholder": "Message CodeAgent", "lines": 1, "scale": 8, "show_label": False, "container": False, } if _supports_kwarg(gr.Textbox, "submit_on_enter"): textbox_kwargs["submit_on_enter"] = True user_input = gr.Textbox(**textbox_kwargs) send_btn = gr.Button("↑", elem_id="send-arrow") with gr.Row(scale=1, elem_id="settings-row"): toggle_model_btn = gr.Button("Model settings", scale=1) toggle_upload_btn = gr.Button("Upload files", scale=1) with gr.Column(visible=False, elem_id="model-panel") as model_panel: provider = gr.Dropdown( choices=PROVIDER_CHOICES, value=DEFAULT_PROVIDER if DEFAULT_PROVIDER in PROVIDER_OPTIONS else PROVIDER_OPTIONS[0], label="Model provider", info="Choose where the request is sent", ) with gr.Column(visible=False, elem_id="upload-panel") as upload_panel: upload_box = gr.File( label="Upload text or PDF", file_count="multiple", type="filepath", ) upload_summary = gr.Markdown("**Upload status**\n\nNo uploads attached.") upload_box.change( fn=add_upload_summary, inputs=[upload_box], outputs=[upload_summary], ) def toggle_model_panel(is_visible: bool): new_visible = not is_visible return new_visible, gr.update(visible=new_visible) def toggle_upload_panel(is_visible: bool): new_visible = not is_visible return new_visible, gr.update(visible=new_visible) toggle_model_btn.click( fn=toggle_model_panel, inputs=[model_panel_visible], outputs=[model_panel_visible, model_panel], ) toggle_upload_btn.click( fn=toggle_upload_panel, inputs=[upload_panel_visible], outputs=[upload_panel_visible, upload_panel], ) def handle_message(user_text: str, history: list[dict[str, str]], selected_provider: str, files: object | None): if not user_text or not user_text.strip(): raise gr.Error("Please enter a message.") current_history = history or [] stripped_text = user_text.strip() updated_history = current_history + [{"role": "user", "content": stripped_text}] yield updated_history, _format_chatbot_history(updated_history), "Model is generating a response...", "" final_history, assistant_text, status = run_turn( stripped_text, current_history, selected_provider, files, user_already_in_history=False, ) yield final_history, _format_chatbot_history(final_history), status, "" send_btn.click( fn=handle_message, inputs=[user_input, chat_state, provider, upload_box], outputs=[chat_state, chatbot, status_box, user_input], show_progress="hidden" ) user_input.submit( fn=handle_message, inputs=[user_input, chat_state, provider, upload_box], outputs=[chat_state, chatbot, status_box, user_input], show_progress="hidden" ) if __name__ == "__main__": demo.queue().launch( server_name="0.0.0.0", server_port=int(os.environ.get("PORT", "7860")), **launch_extra_kwargs, )