CodeAgent / app.py
ThuanLuong's picture
zzzz
5e10300
Raw
History Blame Contribute Delete
11.1 kB
from __future__ import annotations
import os
import inspect
import sys
from pathlib import Path
import gradio as gr
from dotenv import load_dotenv
from pypdf import PdfReader
BASE_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = BASE_DIR.parent
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from router import build_model_route
load_dotenv()
PROXY_URL = os.environ.get("HTTP_PROXY") or None
DEFAULT_PROVIDER = os.environ.get("DEFAULT_PROVIDER", "glm/4.7-flash")
MAX_UPLOAD_CHARS = int(os.environ.get("MAX_UPLOAD_CHARS", "40000"))
PROVIDER_CHOICES = [
("GitHub Models · GPT-4.1 Mini", "github/gpt-4.1-mini"),
("Gemini · 2.5 Flash", "gemini/2.5-flash"),
("GLM · 4.7 Flash", "glm/4.7-flash"),
]
PROVIDER_OPTIONS = [value for _, value in PROVIDER_CHOICES]
def _supports_kwarg(callable_obj: object, kwarg_name: str) -> bool:
try:
return kwarg_name in inspect.signature(callable_obj).parameters
except (TypeError, ValueError):
return False
CHATBOT_USES_MESSAGES = _supports_kwarg(gr.Chatbot, "type")
def _normalize_uploads(files: object | None) -> list[str]:
if not files:
return []
if isinstance(files, (str, Path)):
return [str(files)]
normalized: list[str] = []
for file_item in files if isinstance(files, list) else [files]:
if not file_item:
continue
if isinstance(file_item, (str, Path)):
normalized.append(str(file_item))
continue
file_path = getattr(file_item, "path", None) or getattr(file_item, "name", None)
if file_path:
normalized.append(str(file_path))
return normalized
def _read_text_file(file_path: Path) -> str:
return file_path.read_text(encoding="utf-8", errors="ignore")
def _read_pdf_file(file_path: Path) -> str:
reader = PdfReader(str(file_path))
pages: list[str] = []
for page in reader.pages:
text = page.extract_text() or ""
if text.strip():
pages.append(text)
return "\n\n".join(pages)
def read_uploaded_context(uploaded_files: object | None) -> tuple[str, str]:
file_paths = _normalize_uploads(uploaded_files)
if not file_paths:
return "", "No uploads attached."
parts: list[str] = []
summary_lines = ["Uploaded context:"]
for file_path_str in file_paths:
file_path = Path(file_path_str)
summary_lines.append(f"- {file_path.name}")
try:
if file_path.suffix.lower() == ".pdf":
extracted = _read_pdf_file(file_path)
else:
extracted = _read_text_file(file_path)
except Exception as exc:
parts.append(f"[Could not read {file_path.name}: {exc}]")
continue
extracted = extracted.strip()
if not extracted:
parts.append(f"[No readable text found in {file_path.name}]")
continue
if len(extracted) > MAX_UPLOAD_CHARS:
extracted = extracted[:MAX_UPLOAD_CHARS] + "\n\n[Truncated for context limit.]"
parts.append(f"## {file_path.name}\n\n{extracted}")
return "\n\n".join(parts), "\n".join(summary_lines)
def build_system_prompt(provider: str, model: str, upload_summary: str, uploaded_context: str) -> str:
base_prompt = (
"You are CodeAgent, a helpful assistant in a deployed web UI. "
"Local workspace tools, local file read/write, and RAG knowledge base tools are disabled in this deployment. "
"You can still help by reading user-uploaded text or PDF files, reasoning over that uploaded text, and generating markdown responses the user can download."
)
capability_note = (
f"\n\nCurrent model route: {provider}/{model}. "
"If the user asks for unavailable local tools, explain the limitation briefly and continue with the available uploaded context."
)
upload_note = f"\n\n{upload_summary}"
if uploaded_context.strip():
upload_note += f"\n\nUploaded content:\n{uploaded_context}"
return base_prompt + capability_note + upload_note
def _format_chatbot_history(history: list[dict[str, str]]) -> object:
"""Return chat history in the shape expected by the active Gradio Chatbot."""
return history
def run_turn(
user_message: str,
history: list[dict[str, str]],
provider: str,
uploaded_files: object | None,
user_already_in_history: bool = False,
) -> tuple[list[dict[str, str]], str, str]:
provider = (provider or DEFAULT_PROVIDER).strip().lower()
if provider not in PROVIDER_OPTIONS:
raise gr.Error(f"Unknown provider '{provider}'. Choose one of: {', '.join(PROVIDER_OPTIONS)}")
provider_name = provider.split("/")[0]
try:
route = build_model_route(provider_name=provider_name, proxy_url=PROXY_URL)
except Exception as exc:
raise gr.Error(str(exc)) from exc
uploaded_context, upload_summary = read_uploaded_context(uploaded_files)
system_prompt = build_system_prompt(provider, route.model, upload_summary, uploaded_context)
messages: list[dict[str, str]] = [{"role": "system", "content": system_prompt}]
messages.extend(history)
if not user_already_in_history:
messages.append({"role": "user", "content": user_message})
status = "Model is generating a markdown response. Local tools are disabled in this deployment."
if uploaded_context.strip():
status = "Model is generating a markdown response using uploaded text/PDF context. Local tools are disabled in this deployment."
response = route.client.chat.completions.create(
model=route.model,
messages=messages,
)
assistant_text = response.choices[0].message.content or ""
assistant_text = assistant_text.strip() or "The model returned an empty response."
updated_history = history + ([{"role": "user", "content": user_message}] if not user_already_in_history else []) + [
{"role": "assistant", "content": assistant_text},
]
return updated_history, assistant_text, status
def add_upload_summary(uploaded_files: object | None) -> str:
_, upload_summary = read_uploaded_context(uploaded_files)
return f"**Upload status**\n\n{upload_summary}"
# Load CSS from file before creating Blocks
APP_CSS = ""
try:
with open(os.path.join(BASE_DIR, "app.css"), "r", encoding="utf-8") as file_handle:
APP_CSS = file_handle.read()
except Exception:
pass
blocks_kwargs = {"title": "CodeAgent Web UI"}
launch_extra_kwargs = {}
theme = gr.themes.Ocean()
if _supports_kwarg(gr.Blocks.launch, "css"):
launch_extra_kwargs["css"] = APP_CSS
else:
blocks_kwargs["css"] = APP_CSS
if _supports_kwarg(gr.Blocks.launch, "theme"):
launch_extra_kwargs["theme"] = theme
elif _supports_kwarg(gr.Blocks, "theme"):
blocks_kwargs["theme"] = theme
with gr.Blocks(**blocks_kwargs) as demo:
gr.Markdown(
"<div id='app-title'>"
"<h1>CodeAgent</h1>"
"<p>AI assistant for research and analysis</p>"
"</div>",
elem_id="app-title",
)
gr.Markdown("<div id='title-divider'></div>", elem_id="title-divider-wrap")
chat_state = gr.State([])
model_panel_visible = gr.State(False)
upload_panel_visible = gr.State(False)
pending_user_message = gr.State("")
chatbot_kwargs = {"label": "Conversation", "height": 560, "show_label": False}
if CHATBOT_USES_MESSAGES:
chatbot_kwargs["type"] = "messages"
chatbot = gr.Chatbot(**chatbot_kwargs, elem_id="chatbot")
status_box = gr.Markdown("Ready. Send a message to start.", elem_id="status-box")
with gr.Row(scale=1, elem_id="composer-row"):
textbox_kwargs = {
"label": "Message",
"placeholder": "Message CodeAgent",
"lines": 1,
"scale": 8,
"show_label": False,
"container": False,
}
if _supports_kwarg(gr.Textbox, "submit_on_enter"):
textbox_kwargs["submit_on_enter"] = True
user_input = gr.Textbox(**textbox_kwargs)
send_btn = gr.Button("↑", elem_id="send-arrow")
with gr.Row(scale=1, elem_id="settings-row"):
toggle_model_btn = gr.Button("Model settings", scale=1)
toggle_upload_btn = gr.Button("Upload files", scale=1)
with gr.Column(visible=False, elem_id="model-panel") as model_panel:
provider = gr.Dropdown(
choices=PROVIDER_CHOICES,
value=DEFAULT_PROVIDER if DEFAULT_PROVIDER in PROVIDER_OPTIONS else PROVIDER_OPTIONS[0],
label="Model provider",
info="Choose where the request is sent",
)
with gr.Column(visible=False, elem_id="upload-panel") as upload_panel:
upload_box = gr.File(
label="Upload text or PDF",
file_count="multiple",
type="filepath",
)
upload_summary = gr.Markdown("**Upload status**\n\nNo uploads attached.")
upload_box.change(
fn=add_upload_summary,
inputs=[upload_box],
outputs=[upload_summary],
)
def toggle_model_panel(is_visible: bool):
new_visible = not is_visible
return new_visible, gr.update(visible=new_visible)
def toggle_upload_panel(is_visible: bool):
new_visible = not is_visible
return new_visible, gr.update(visible=new_visible)
toggle_model_btn.click(
fn=toggle_model_panel,
inputs=[model_panel_visible],
outputs=[model_panel_visible, model_panel],
)
toggle_upload_btn.click(
fn=toggle_upload_panel,
inputs=[upload_panel_visible],
outputs=[upload_panel_visible, upload_panel],
)
def handle_message(user_text: str, history: list[dict[str, str]], selected_provider: str, files: object | None):
if not user_text or not user_text.strip():
raise gr.Error("Please enter a message.")
current_history = history or []
stripped_text = user_text.strip()
updated_history = current_history + [{"role": "user", "content": stripped_text}]
yield updated_history, _format_chatbot_history(updated_history), "Model is generating a response...", ""
final_history, assistant_text, status = run_turn(
stripped_text,
current_history,
selected_provider,
files,
user_already_in_history=False,
)
yield final_history, _format_chatbot_history(final_history), status, ""
send_btn.click(
fn=handle_message,
inputs=[user_input, chat_state, provider, upload_box],
outputs=[chat_state, chatbot, status_box, user_input],
show_progress="hidden"
)
user_input.submit(
fn=handle_message,
inputs=[user_input, chat_state, provider, upload_box],
outputs=[chat_state, chatbot, status_box, user_input],
show_progress="hidden"
)
if __name__ == "__main__":
demo.queue().launch(
server_name="0.0.0.0",
server_port=int(os.environ.get("PORT", "7860")),
**launch_extra_kwargs,
)