# Hugging Face Space (status at capture time: Paused)
import os
import re
import textwrap
import traceback
import gradio as gr
from openai import OpenAI

# ---------------------------
# Configuration
# ---------------------------
# Name of the environment variable holding the API token.
# NOTE(review): the variables are named HF_*, but the token is read from
# OPENAI_API_KEY and sent to the Hugging Face router — confirm which is intended.
HF_ENV_VAR = "OPENAI_API_KEY"
HF_TOKEN = os.environ.get(HF_ENV_VAR)
if not HF_TOKEN:
    # Fail fast at import time: nothing below works without a token.
    raise RuntimeError(
        f"Environment variable {HF_ENV_VAR} not found. Set {HF_ENV_VAR} before running."
    )
# Model served through the Hugging Face inference router (OpenAI-compatible API).
MODEL_ID = "openai/gpt-oss-20b"
client = OpenAI(base_url="https://router.huggingface.co/v1", api_key=HF_TOKEN)
MAX_RESEARCH_CHARS = 6000  # cap on concatenated context; adjust to stay within token limits


def load_research_context(root=None, max_chars=MAX_RESEARCH_CHARS) -> str:
    """Concatenate .txt/.md files found in *root* up to *max_chars* characters.

    Args:
        root: Directory to scan; defaults to this file's directory.
        max_chars: Character budget across all files (headers excluded,
            matching the original accounting).

    Returns:
        The concatenated, size-capped context; each file is prefixed with a
        ``--- filename ---`` header. Unreadable or non-UTF-8 files are skipped
        (best-effort loading instead of crashing at import time).
    """
    if root is None:
        root = os.path.dirname(os.path.abspath(__file__))
    chunks, total = [], 0
    for fname in sorted(os.listdir(root)):
        if not fname.lower().endswith((".txt", ".md")):
            continue
        path = os.path.join(root, fname)
        try:
            with open(path, "r", encoding="utf-8") as f:
                txt = f.read()
        except (OSError, UnicodeDecodeError):
            continue  # skip unreadable/undecodable files rather than abort
        if total + len(txt) > max_chars:
            txt = txt[: max_chars - total]  # truncate the file that crosses the cap
        chunks.append(f"\n--- {fname} ---\n{txt}")
        total += len(txt)
        if total >= max_chars:
            break
    return "\n".join(chunks)
# Loaded once at import time and baked into every system prompt.
RESEARCH_CONTEXT = load_research_context()
# ---------------------------
# System prompt
# ---------------------------
def get_system_prompt(mode: str = "chat") -> str:
    """Build the system prompt for *mode*, embedding the trimmed research context.

    The template is dedented BEFORE the research context is substituted.
    (The original dedented an already-interpolated f-string; any column-0 line
    inside RESEARCH_CONTEXT made ``textwrap.dedent`` a no-op, leaving the whole
    prompt indented.)
    """
    template = textwrap.dedent("""\
        You are OhamLab AI, a Dialectical Agentic CrossSphere Intelligence AI.
        IMPORTANT GUIDELINES:
        - Deliver a complete, self-contained answer in one message whenever possible.
        - If the user types "continue", resume exactly where you left off and DO NOT repeat previously provided text.
        - NEVER include runnable code blocks, scripts, or function/class definitions unless the user explicitly asks.
        - If code is requested, ask one clarifying question if needed, then provide code.
        - Express algorithms in plain English unless code is explicitly requested.
        - If uncertain, label speculation and state confidence.
        Mode: {mode}
        --- RESEARCH CONTEXT (TRIMMED) ---
        {context}
        --- END CONTEXT ---
    """).strip()
    return template.format(mode=mode, context=RESEARCH_CONTEXT)
# ---------------------------
# Conversation state
# ---------------------------
# Module-level mutable state shared by the chat handlers below.
conversation_mode = "chat"  # either "chat" or "research"
# Model-side message list; index 0 is always the system prompt.
history_messages = [{"role": "system", "content": get_system_prompt("chat")}]
# (user, bot) tuples rendered by the Gradio Chatbot widget.
chat_history_for_ui = []
MAX_HISTORY_CHARS = 9000  # character budget for the history sent to the model


def trim_history_by_chars(msgs, max_chars=MAX_HISTORY_CHARS):
    """Keep the system message plus the newest messages fitting within *max_chars*.

    The system message (index 0) is always retained; the remainder is filled
    from the most recent message backwards until the budget is exhausted.
    """
    if not msgs:
        return msgs
    system, rest = msgs[0], msgs[1:]
    budget = max_chars - len(system["content"])
    kept = []
    for message in reversed(rest):
        size = len(message.get("content", ""))
        if size > budget:
            break  # this message (and everything older) no longer fits
        kept.append(message)
        budget -= size
    kept.reverse()  # restore chronological order
    return [system] + kept
# ---------------------------
# Code-detection utilities
# ---------------------------
# Markers of source code: fenced blocks plus Python/C/PHP/HTML/C++ constructs.
CODE_PATTERN = re.compile(
    r"(```|```[a-zA-Z]*|^\s*def\s+|^\s*class\s+|^\s*import\s+|#include\s+|<\?php|<html>|^\s*using\s+namespace)",
    re.MULTILINE,
)


def is_code_like(text: str) -> bool:
    """Heuristically decide whether *text* looks like source code.

    True when a known code marker matches, or when (for texts of at least four
    non-blank lines) more than 35% of lines look statement-like.
    """
    if not text:
        return False
    if CODE_PATTERN.search(text):
        return True
    nonblank = [line for line in text.splitlines() if line.strip()]
    if len(nonblank) < 4:
        return False  # too short for the ratio heuristic to be meaningful
    statement_like = sum(
        1
        for line in nonblank
        if ";" in line or "{" in line or "}" in line or line.strip().endswith(":")
    )
    return statement_like / len(nonblank) > 0.35
def strip_code_blocks(text: str) -> str:
    """Remove fenced code blocks and code-looking lines, returning the prose.

    Falls back to a placeholder message when nothing survives the stripping.
    """
    without_fences = re.sub(r"```.*?```", "", text, flags=re.DOTALL)
    kept = []
    for line in without_fences.splitlines():
        if re.match(r"^\s*(def |class |import |from |#include|using |<)", line):
            continue  # drop obvious code statements
        stripped = line.strip()
        if stripped.startswith("```") or stripped.endswith("```"):
            continue  # drop stray fence markers
        kept.append(line)
    cleaned = re.sub(r"\n{3,}", "\n\n", "\n".join(kept).strip())
    return cleaned or "[Content removed: model produced code which has been stripped.]"
# ---------------------------
# Model call helper
# ---------------------------
def call_model_get_response(messages, max_tokens=2000, allow_code=False) -> str:
    """Call the chat model with a trimmed history and return its reply text.

    If *allow_code* is False and the reply looks like code, the model is asked
    once to rewrite it as prose; if that call fails or still returns code, the
    reply is sanitized locally with strip_code_blocks().

    Raises:
        Whatever the OpenAI client raises on API errors (propagated unchanged;
        the original wrapped the first call in a try/except that only
        re-raised, which was dead code and has been removed).
    """
    msgs = trim_history_by_chars(messages)
    resp = client.chat.completions.create(
        model=MODEL_ID,
        messages=msgs,
        max_tokens=max_tokens,
        temperature=0.7,
    )
    content = resp.choices[0].message.content or ""
    if allow_code or not is_code_like(content):
        return content

    # One rewrite attempt: ask the model to restate the reply without code.
    rewrite_instruction = (
        "Rewrite the previous reply in clear prose — no code blocks, no imports, "
        "no function/class definitions. Keep all reasoning and numeric details."
    )
    rewrite_msgs = [
        msgs[0],  # keep the system prompt for context
        {"role": "assistant", "content": content},
        {"role": "user", "content": rewrite_instruction},
    ]
    try:
        resp2 = client.chat.completions.create(
            model=MODEL_ID,
            messages=rewrite_msgs,
            max_tokens=max_tokens,
            temperature=0.7,
        )
        content2 = resp2.choices[0].message.content or ""
    except Exception:
        # Best-effort fallback: sanitize locally rather than surface the error.
        return strip_code_blocks(content) + "\n\n⚠️ Note: rewrite failed; code removed."
    if is_code_like(content2):
        return strip_code_blocks(content2) + "\n\n⚠️ Note: model persisted in producing code; sanitized."
    return content2
# ---------------------------
# Chat logic
# ---------------------------
def get_last_assistant_tail(max_chars=1200) -> str:
    """Return up to the final *max_chars* characters of the newest non-empty assistant message."""
    latest = next(
        (
            m["content"]
            for m in reversed(history_messages)
            if m["role"] == "assistant" and m.get("content")
        ),
        "",
    )
    return latest[-max_chars:]
# Exact phrases (lowercased) treated as a request to continue the last reply.
CONTINUE_KEYWORDS = {"continue", "carry on", "go on", "proceed", "resume", "next"}

# Substrings that signal the user explicitly wants code in the reply.
_CODE_TRIGGERS = (
    "show code",
    "give me code",
    "provide code",
    "script",
    "python",
    "javascript",
    "implementation",
)


def user_requested_code(user_text: str) -> bool:
    """Return True if *user_text* explicitly asks for code (case-insensitive substring match)."""
    lowered = (user_text or "").lower()
    return any(trigger in lowered for trigger in _CODE_TRIGGERS)
def chat_with_model(user_message, chat_history):
    """Handle one user turn: mode switches, 'continue' requests, and normal chat.

    Args:
        user_message: Raw text from the input box (may be None/blank).
        chat_history: Current Chatbot value (returned unchanged for blank input).

    Returns:
        (updated_ui_history, "") — the empty string clears the input box.
    """
    global history_messages, chat_history_for_ui, conversation_mode
    user_message = (user_message or "").strip()
    if not user_message:
        return chat_history, ""
    lower = user_message.lower()

    # Mode switching resets the whole model-side conversation.
    # FIX: record the switch in the persistent UI history (the original
    # returned chat_history + [...] without storing it, so the switch row
    # silently disappeared on the next turn).
    if "switch to research mode" in lower:
        conversation_mode = "research"
        history_messages = [{"role": "system", "content": get_system_prompt("research")}]
        chat_history_for_ui.append(("🟢 Mode switched", "🔬 Research Mode activated."))
        return chat_history_for_ui, ""
    if "switch to chat mode" in lower:
        conversation_mode = "chat"
        history_messages = [{"role": "system", "content": get_system_prompt("chat")}]
        chat_history_for_ui.append(("🟢 Mode switched", "💬 Chat Mode activated."))
        return chat_history_for_ui, ""

    allow_code = user_requested_code(user_message)

    if lower in CONTINUE_KEYWORDS:
        # Feed the tail of the last reply back so the model resumes seamlessly.
        last_tail = get_last_assistant_tail()
        resume_hint = (
            "User requested continuation. Resume exactly where you left off "
            "and DO NOT repeat earlier sections.\n\nLast assistant message tail:\n"
            + last_tail
        )
        history_messages.append({"role": "user", "content": resume_hint})
    else:
        history_messages.append({"role": "user", "content": user_message})

    history_messages[:] = trim_history_by_chars(history_messages)
    try:
        bot_text = call_model_get_response(
            history_messages, max_tokens=2000, allow_code=allow_code
        )
    except Exception as e:
        # FIX: join the last traceback lines into readable text (the original
        # interpolated tb.splitlines()[-6:], rendering a Python list repr).
        tail = "\n".join(traceback.format_exc().splitlines()[-6:])
        bot_text = f"⚠️ **Error**: {e}\n\nTraceback:\n{tail}"
    history_messages.append({"role": "assistant", "content": bot_text})
    chat_history_for_ui.append((user_message, bot_text))
    return chat_history_for_ui, ""
def reset_chat():
    """Clear both the model-side and the UI history; returns [] for the Chatbot widget."""
    global history_messages, chat_history_for_ui
    chat_history_for_ui = []
    # Rebuild from scratch, keeping whichever mode is currently active.
    history_messages = [
        {"role": "system", "content": get_system_prompt(conversation_mode)}
    ]
    return []
# ---------------------------
# Gradio UI
# ---------------------------
def build_ui():
    """Construct and launch the Gradio chat interface.

    Returns:
        The gr.Blocks app. Note that launch() blocks while the server runs,
        so the return only happens after shutdown.
    """
    with gr.Blocks(
        theme=gr.themes.Soft(),
        css="""
        #chatbot {
            background-color: #f9f9fb;
            border-radius: 12px;
            padding: 10px;
            overflow-y: auto;
        }
        .user-bubble {
            background: #4a90e2;
            color: white;
            border-radius: 14px;
            padding: 8px 12px;
            margin: 6px;
            max-width: 75%;
            align-self: flex-end;
            font-size: 14px;
        }
        .bot-bubble {
            background: #e6e6e6;
            color: #333;
            border-radius: 14px;
            padding: 8px 12px;
            margin: 6px;
            max-width: 75%;
            align-self: flex-start;
            font-size: 14px;
        }
        #controls {
            display: flex;
            gap: 8px;
            align-items: center;
            margin-top: 6px;
        }
        #topbar {
            display: flex;
            justify-content: flex-end;
            gap: 8px;
            margin-bottom: 6px;
        }
        """,
    ) as demo:
        # Top bar with close + clear buttons.
        with gr.Row(elem_id="topbar"):
            # NOTE(review): no handler is wired to the close button — confirm intent.
            close_btn = gr.Button("❌", size="sm")
            clear_btn = gr.Button("🧹 Clear", size="sm")
        chatbot = gr.Chatbot(
            label="",
            height=350,  # reduced height so the input row stays visible
            elem_id="chatbot",
            type="tuples",
            bubble_full_width=False,
            avatar_images=("👤", "🤖"),
        )
        with gr.Row(elem_id="controls"):
            msg = gr.Textbox(
                placeholder="Type your message here...",
                lines=2,
                scale=8,
            )
            submit_btn = gr.Button("🚀 Send", variant="primary", scale=2)
        # chat_with_model returns (ui_history, ""): the history goes to the
        # Chatbot and the empty string clears the Textbox. FIX: the original
        # wired outputs=[msg, chatbot], which dumped the history into the
        # textbox and blanked the chat window; outputs now match return order.
        submit_btn.click(chat_with_model, inputs=[msg, chatbot], outputs=[chatbot, msg])
        msg.submit(chat_with_model, inputs=[msg, chatbot], outputs=[chatbot, msg])
        clear_btn.click(reset_chat, inputs=None, outputs=chatbot)
    # launch() blocks while the server is up; demo is returned after shutdown.
    demo.launch(server_name="0.0.0.0", server_port=7860, share=False)
    return demo
# ---------------------------
# Entrypoint
# ---------------------------
if __name__ == "__main__":
    print("Starting Aerelyth with size-capped research context…")
    # build_ui() blocks here, serving the Gradio app on 0.0.0.0:7860.
    build_ui()