Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import os | |
| import inspect | |
| import sys | |
| from pathlib import Path | |
| import gradio as gr | |
| from dotenv import load_dotenv | |
| from pypdf import PdfReader | |
# Directory containing this file, and its parent directory.
BASE_DIR = Path(__file__).resolve().parent
PROJECT_ROOT = BASE_DIR.parent

# Put the parent directory at the front of the import path exactly once;
# presumably this is what makes the `router` import below resolvable.
if sys.path.count(str(PROJECT_ROOT)) == 0:
    sys.path.insert(0, str(PROJECT_ROOT))

from router import build_model_route  # noqa: E402  (must follow the sys.path tweak)

# Load variables from a .env file (if any) before the os.environ lookups that follow.
load_dotenv()
# Optional outbound proxy; an unset or empty HTTP_PROXY becomes None.
PROXY_URL = os.getenv("HTTP_PROXY") or None

# Provider value used when the caller does not pick one.
DEFAULT_PROVIDER = os.getenv("DEFAULT_PROVIDER", "glm/4.7-flash")

# Per-file character cap applied to uploaded text before it is put in the prompt.
MAX_UPLOAD_CHARS = int(os.getenv("MAX_UPLOAD_CHARS", "40000"))

# (label shown in the dropdown, value passed to the handler) pairs.
PROVIDER_CHOICES = [
    ("GitHub Models · GPT-4.1 Mini", "github/gpt-4.1-mini"),
    ("Gemini · 2.5 Flash", "gemini/2.5-flash"),
    ("GLM · 4.7 Flash", "glm/4.7-flash"),
]

# Just the values, used for validating a selected provider.
PROVIDER_OPTIONS = [choice_value for _label, choice_value in PROVIDER_CHOICES]
def _supports_kwarg(callable_obj: object, kwarg_name: str) -> bool:
    """Report whether ``callable_obj`` declares a parameter named ``kwarg_name``.

    The answer comes from ``inspect.signature``. Objects whose signature cannot
    be introspected (``TypeError`` / ``ValueError``) are reported as not
    supporting the keyword. A name that would only be accepted through a
    ``**kwargs`` catch-all is not counted as declared.
    """
    try:
        declared = inspect.signature(callable_obj).parameters
        found = kwarg_name in declared
    except (TypeError, ValueError):
        found = False
    return found
# True when the installed gr.Chatbot declares a `type` parameter; the UI then
# passes type="messages" when constructing the chatbot.
CHATBOT_USES_MESSAGES = _supports_kwarg(gr.Chatbot, kwarg_name="type")
def _normalize_uploads(files: object | None) -> list[str]:
    """Flatten whatever the upload widget hands over into a list of path strings.

    Accepted inputs:
      * ``None`` / empty value -> ``[]``
      * a single ``str`` or ``Path``
      * a single file-like object exposing ``.path`` or ``.name``
      * a ``list`` or ``tuple`` mixing any of the above (falsy items are skipped)

    Items that expose neither a path string nor a ``path``/``name`` attribute
    are ignored rather than raising.
    """
    if not files:
        return []
    if isinstance(files, (str, Path)):
        return [str(files)]

    # Tuples are treated like lists; previously a tuple was wrapped as a single
    # item and then dropped because it has no `.path` / `.name`.
    items = files if isinstance(files, (list, tuple)) else [files]

    normalized: list[str] = []
    for file_item in items:
        if not file_item:
            continue
        if isinstance(file_item, (str, Path)):
            normalized.append(str(file_item))
            continue
        # File wrapper objects: prefer `.path`, fall back to `.name`.
        file_path = getattr(file_item, "path", None) or getattr(file_item, "name", None)
        if file_path:
            normalized.append(str(file_path))
    return normalized
def _read_text_file(file_path: Path) -> str:
    """Return the file's contents decoded as UTF-8, dropping undecodable bytes."""
    with file_path.open(encoding="utf-8", errors="ignore") as handle:
        return handle.read()
def _read_pdf_file(file_path: Path) -> str:
    """Extract text from every page of a PDF and join it with blank lines.

    Pages whose extracted text is missing or whitespace-only are left out.
    """
    document = PdfReader(str(file_path))
    page_texts = (page.extract_text() or "" for page in document.pages)
    return "\n\n".join(content for content in page_texts if content.strip())
def read_uploaded_context(uploaded_files: object | None) -> tuple[str, str]:
    """Read all uploads and return ``(combined_text, summary)``.

    ``combined_text`` holds one section per file, separated by blank lines:
    a ``## <name>`` heading followed by the extracted text, or a bracketed
    placeholder when the file could not be read or had no text. Each file's
    text is cut to ``MAX_UPLOAD_CHARS`` characters and marked as truncated.
    ``summary`` lists the file names, one bullet per upload. With no uploads
    the result is ``("", "No uploads attached.")``.
    """
    paths = [Path(raw) for raw in _normalize_uploads(uploaded_files)]
    if not paths:
        return "", "No uploads attached."

    # Every upload appears in the summary, whether or not it can be read.
    summary = "\n".join(["Uploaded context:", *(f"- {path.name}" for path in paths)])

    sections: list[str] = []
    for path in paths:
        name = path.name
        try:
            # PDFs go through the PDF extractor; everything else is read as text.
            reader = _read_pdf_file if path.suffix.lower() == ".pdf" else _read_text_file
            body = reader(path)
        except Exception as exc:
            # Best effort: one bad file must not block the remaining uploads.
            sections.append(f"[Could not read {name}: {exc}]")
            continue

        body = body.strip()
        if not body:
            sections.append(f"[No readable text found in {name}]")
        else:
            if len(body) > MAX_UPLOAD_CHARS:
                body = body[:MAX_UPLOAD_CHARS] + "\n\n[Truncated for context limit.]"
            sections.append(f"## {name}\n\n{body}")

    return "\n\n".join(sections), summary
def build_system_prompt(provider: str, model: str, upload_summary: str, uploaded_context: str) -> str:
    """Assemble the system prompt sent ahead of the conversation.

    The prompt is the fixed assistant description, a note naming the current
    ``provider/model`` route, the upload summary, and, only when
    ``uploaded_context`` contains non-whitespace text, the uploaded content.
    """
    sections = [
        "You are CodeAgent, a helpful assistant in a deployed web UI. "
        "Local workspace tools, local file read/write, and RAG knowledge base tools are disabled in this deployment. "
        "You can still help by reading user-uploaded text or PDF files, reasoning over that uploaded text, and generating markdown responses the user can download.",
        f"\n\nCurrent model route: {provider}/{model}. "
        "If the user asks for unavailable local tools, explain the limitation briefly and continue with the available uploaded context.",
        f"\n\n{upload_summary}",
    ]
    if uploaded_context.strip():
        sections.append(f"\n\nUploaded content:\n{uploaded_context}")
    return "".join(sections)
def _format_chatbot_history(history: list[dict[str, str]]) -> object:
    """Return the value handed to the Chatbot component for ``history``.

    This is currently a pass-through: the list of ``{"role", "content"}``
    dicts is returned unchanged (the same object, not a copy).
    """
    return history
def run_turn(
    user_message: str,
    history: list[dict[str, str]],
    provider: str,
    uploaded_files: object | None,
    user_already_in_history: bool = False,
) -> tuple[list[dict[str, str]], str, str]:
    """Send one chat turn to the selected model and return the updated conversation.

    Args:
        user_message: Text of the new user turn.
        history: Prior ``{"role", "content"}`` messages; ``None`` is treated as empty.
        provider: One of ``PROVIDER_OPTIONS``; falls back to ``DEFAULT_PROVIDER``
            when empty. Compared after stripping and lower-casing.
        uploaded_files: Upload widget value, read via ``read_uploaded_context``.
        user_already_in_history: When true, ``history`` already ends with the
            user turn, so ``user_message`` is not appended again.

    Returns:
        ``(updated_history, assistant_text, status)`` where ``updated_history``
        is a new list (the input list is not mutated).

    Raises:
        gr.Error: For an unknown provider, a failure while building the model
            route, or a failure of the completion request itself.
    """
    history = list(history or [])

    provider = (provider or DEFAULT_PROVIDER).strip().lower()
    if provider not in PROVIDER_OPTIONS:
        raise gr.Error(f"Unknown provider '{provider}'. Choose one of: {', '.join(PROVIDER_OPTIONS)}")

    # Provider values look like "<provider>/<variant>"; only the first part selects the route.
    provider_name = provider.split("/")[0]
    try:
        route = build_model_route(provider_name=provider_name, proxy_url=PROXY_URL)
    except Exception as exc:
        raise gr.Error(str(exc)) from exc

    uploaded_context, upload_summary = read_uploaded_context(uploaded_files)
    # Pass the bare provider name: the prompt renders "{provider}/{model}", so the
    # full dropdown value would yield a doubled label such as "glm/4.7-flash/<model>".
    system_prompt = build_system_prompt(provider_name, route.model, upload_summary, uploaded_context)

    new_turn: list[dict[str, str]] = (
        [] if user_already_in_history else [{"role": "user", "content": user_message}]
    )
    messages: list[dict[str, str]] = [{"role": "system", "content": system_prompt}, *history, *new_turn]

    status = "Model is generating a markdown response. Local tools are disabled in this deployment."
    if uploaded_context.strip():
        status = "Model is generating a markdown response using uploaded text/PDF context. Local tools are disabled in this deployment."

    # Surface request failures the same way as route failures, so the UI shows
    # a readable message instead of an unhandled exception.
    try:
        response = route.client.chat.completions.create(
            model=route.model,
            messages=messages,
        )
    except Exception as exc:
        raise gr.Error(f"Model request failed: {exc}") from exc

    # Tolerate a response without choices instead of failing on choices[0].
    choices = getattr(response, "choices", None) or []
    assistant_text = (choices[0].message.content or "") if choices else ""
    assistant_text = assistant_text.strip() or "The model returned an empty response."

    updated_history = history + new_turn + [{"role": "assistant", "content": assistant_text}]
    return updated_history, assistant_text, status
def add_upload_summary(uploaded_files: object | None) -> str:
    """Build the markdown shown in the upload status area for the current uploads."""
    summary_text = read_uploaded_context(uploaded_files)[1]
    return "**Upload status**\n\n" + summary_text
# Load the optional stylesheet that sits next to this file before creating Blocks.
# A missing or unreadable app.css is not fatal: the UI simply runs without custom
# CSS. Only I/O and decoding failures are tolerated, so unrelated errors still surface.
try:
    APP_CSS = (BASE_DIR / "app.css").read_text(encoding="utf-8")
except (OSError, UnicodeDecodeError):
    APP_CSS = ""
# Keyword arguments for gr.Blocks(...) and extra ones for demo.launch(...).
# `css` and `theme` are routed to whichever of the two declares the parameter
# in the installed Gradio version.
blocks_kwargs = {"title": "CodeAgent Web UI"}
launch_extra_kwargs = {}
theme = gr.themes.Ocean()

# css: launch() when it declares the parameter, otherwise the Blocks constructor.
_css_kwargs = launch_extra_kwargs if _supports_kwarg(gr.Blocks.launch, "css") else blocks_kwargs
_css_kwargs["css"] = APP_CSS

# theme: launch() first, then the Blocks constructor; skipped if neither declares it.
for _theme_owner, _theme_kwargs in (
    (gr.Blocks.launch, launch_extra_kwargs),
    (gr.Blocks, blocks_kwargs),
):
    if _supports_kwarg(_theme_owner, "theme"):
        _theme_kwargs["theme"] = theme
        break
# UI definition: layout, per-session state and event wiring.
# NOTE(review): the nesting below was reconstructed from a flattened listing;
# confirm each component sits in the intended Row/Column.
with gr.Blocks(**blocks_kwargs) as demo:
    # --- Header -----------------------------------------------------------
    gr.Markdown(
        "<div id='app-title'>"
        "<h1>CodeAgent</h1>"
        "<p>AI assistant for research and analysis</p>"
        "</div>",
        elem_id="app-title",
    )
    gr.Markdown("<div id='title-divider'></div>", elem_id="title-divider-wrap")

    # --- State ------------------------------------------------------------
    chat_state = gr.State([])  # committed conversation as {"role", "content"} dicts
    model_panel_visible = gr.State(False)
    upload_panel_visible = gr.State(False)
    pending_user_message = gr.State("")  # not wired to any event in this section

    # --- Conversation -----------------------------------------------------
    chatbot_kwargs = {"label": "Conversation", "height": 560, "show_label": False}
    if CHATBOT_USES_MESSAGES:
        chatbot_kwargs["type"] = "messages"
    chatbot = gr.Chatbot(**chatbot_kwargs, elem_id="chatbot")
    status_box = gr.Markdown("Ready. Send a message to start.", elem_id="status-box")

    # --- Composer ---------------------------------------------------------
    with gr.Row(scale=1, elem_id="composer-row"):
        textbox_kwargs = {
            "label": "Message",
            "placeholder": "Message CodeAgent",
            "lines": 1,
            "scale": 8,
            "show_label": False,
            "container": False,
        }
        # Only pass submit_on_enter when the installed Textbox declares it.
        if _supports_kwarg(gr.Textbox, "submit_on_enter"):
            textbox_kwargs["submit_on_enter"] = True
        user_input = gr.Textbox(**textbox_kwargs)
        send_btn = gr.Button("↑", elem_id="send-arrow")

    # --- Settings toggles and collapsible panels --------------------------
    with gr.Row(scale=1, elem_id="settings-row"):
        toggle_model_btn = gr.Button("Model settings", scale=1)
        toggle_upload_btn = gr.Button("Upload files", scale=1)

    with gr.Column(visible=False, elem_id="model-panel") as model_panel:
        provider = gr.Dropdown(
            choices=PROVIDER_CHOICES,
            # Fall back to the first option when the configured default is unknown.
            value=DEFAULT_PROVIDER if DEFAULT_PROVIDER in PROVIDER_OPTIONS else PROVIDER_OPTIONS[0],
            label="Model provider",
            info="Choose where the request is sent",
        )

    with gr.Column(visible=False, elem_id="upload-panel") as upload_panel:
        upload_box = gr.File(
            label="Upload text or PDF",
            file_count="multiple",
            type="filepath",
        )
        upload_summary = gr.Markdown("**Upload status**\n\nNo uploads attached.")

    # Refresh the upload status text whenever the attached files change.
    upload_box.change(
        fn=add_upload_summary,
        inputs=[upload_box],
        outputs=[upload_summary],
    )

    def toggle_model_panel(is_visible: bool):
        """Flip the model panel's visibility; returns (new flag, component update)."""
        new_visible = not is_visible
        return new_visible, gr.update(visible=new_visible)

    def toggle_upload_panel(is_visible: bool):
        """Flip the upload panel's visibility; returns (new flag, component update)."""
        new_visible = not is_visible
        return new_visible, gr.update(visible=new_visible)

    toggle_model_btn.click(
        fn=toggle_model_panel,
        inputs=[model_panel_visible],
        outputs=[model_panel_visible, model_panel],
    )
    toggle_upload_btn.click(
        fn=toggle_upload_panel,
        inputs=[upload_panel_visible],
        outputs=[upload_panel_visible, upload_panel],
    )

    def handle_message(user_text: str, history: list[dict[str, str]], selected_provider: str, files: object | None):
        """Handle a send/submit event as a two-step generator.

        First yield: show the user's message in the chatbot, set an interim
        status and clear the textbox. Second yield: publish the history
        returned by ``run_turn`` together with its status.

        Yields tuples of ``(chat_state, chatbot, status_box, user_input)`` values.

        Raises:
            gr.Error: When the message is empty or whitespace-only; errors
                raised by ``run_turn`` propagate as well.
        """
        if not user_text or not user_text.strip():
            raise gr.Error("Please enter a message.")

        current_history = history or []
        stripped_text = user_text.strip()
        pending_history = current_history + [{"role": "user", "content": stripped_text}]

        # Show the pending message, but keep chat_state at the committed history.
        # If run_turn fails below, the state must not retain a user turn without
        # a reply, otherwise the next request would carry two consecutive user turns.
        yield current_history, _format_chatbot_history(pending_history), "Model is generating a response...", ""

        final_history, _assistant_text, status = run_turn(
            stripped_text,
            current_history,
            selected_provider,
            files,
            user_already_in_history=False,
        )
        yield final_history, _format_chatbot_history(final_history), status, ""

    # The send button and pressing Enter in the textbox trigger the same handler.
    _send_wiring = {
        "fn": handle_message,
        "inputs": [user_input, chat_state, provider, upload_box],
        "outputs": [chat_state, chatbot, status_box, user_input],
        "show_progress": "hidden",
    }
    send_btn.click(**_send_wiring)
    user_input.submit(**_send_wiring)
if __name__ == "__main__":
    # Enable request queuing, then serve on all interfaces. The port comes from
    # the PORT environment variable and defaults to 7860.
    queued_app = demo.queue()
    listen_port = int(os.environ.get("PORT", "7860"))
    queued_app.launch(server_name="0.0.0.0", server_port=listen_port, **launch_extra_kwargs)