import gradio as gr
from docx import Document
import openai
import google.generativeai as genai
import requests
from tempfile import NamedTemporaryFile
from typing import Any, Dict, List, Optional, Tuple

# Optional dependency: keep the app importable even when the Sambanova SDK
# is not installed; the provider branch checks for None before using it.
try:
    from sambanova import SambaNova
except ImportError:
    SambaNova = None

def _flatten_text(value: Any) -> str:
    """Converts nested structures (dict/list/tuple) into a single string."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, dict):
        for key in ("text", "content", "parts"):
            if key in value:
                return _flatten_text(value[key])
        return " ".join(filter(None, (_flatten_text(v) for v in value.values())))
    if isinstance(value, (list, tuple)):
        return " ".join(filter(None, (_flatten_text(v) for v in value)))
    return str(value)
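
# Illustrative examples (not executed by the app): nested provider payloads
# collapse to plain text.
#   _flatten_text({"content": ["hi", {"text": "there"}]})  -> "hi there"
#   _flatten_text([None, "a", ("b",)])                     -> "a b"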

def _normalize_messages(history: Optional[List[Any]]) -> List[Dict[str, str]]:
    """Normalizes chat history into a list of {'role','content'} dicts."""
    msgs = []
    for h in (history or []):
        if isinstance(h, dict) and "role" in h and "content" in h:
            if h["role"] in ("user", "assistant"):
                msgs.append({"role": h["role"], "content": _flatten_text(h["content"])})
        elif isinstance(h, (list, tuple)) and len(h) >= 2:
            u, a = h[0], h[1]
            if u is not None:
                msgs.append({"role": "user", "content": _flatten_text(u)})
            if a is not None:
                msgs.append({"role": "assistant", "content": _flatten_text(a)})
    return msgs
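
# Illustrative example: both Gradio history shapes normalize identically.
#   _normalize_messages([("hi", "hello")])
#   _normalize_messages([{"role": "user", "content": "hi"},
#                        {"role": "assistant", "content": "hello"}])
# both yield [{'role': 'user', 'content': 'hi'},
#             {'role': 'assistant', 'content': 'hello'}].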

def _pairs_from_history(history: Optional[List[Any]]) -> List[Tuple[str, str]]:
    """Converts normalized messages into (user, assistant) pairs."""
    pairs = []
    pending_user = None
    for h in _normalize_messages(history):
        if h["role"] == "user":
            if pending_user is not None:
                pairs.append((pending_user, ""))  # user without assistant response
            pending_user = h["content"]
        elif h["role"] == "assistant":
            if pending_user is None:
                pairs.append(("", h["content"]))  # assistant without prior user
            else:
                pairs.append((pending_user, h["content"]))
            pending_user = None
    if pending_user is not None:
        pairs.append((pending_user, ""))  # trailing user
    return pairs
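
# Illustrative example: unmatched turns are padded with empty strings.
#   _pairs_from_history([{"role": "user", "content": "a"},
#                        {"role": "user", "content": "b"}])
# yields [('a', ''), ('b', '')].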

def _msgs(history: Optional[List[Any]], user_msg: str) -> List[Dict[str, str]]:
    """Builds a messages array with a system prompt."""
    m = [{"role": "system", "content": "You are a helpful assistant."}]
    m += _normalize_messages(history)
    m.append({"role": "user", "content": user_msg})
    return m
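
# Illustrative example:
#   _msgs([], "ping") -> [{'role': 'system', 'content': 'You are a helpful assistant.'},
#                         {'role': 'user', 'content': 'ping'}]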

def guardar_conversacion(historia: Optional[List[Any]]) -> Optional[str]:
    """Generates a .docx file of the conversation and returns its filepath."""
    try:
        doc = Document()
        doc.add_heading("Scribe Conversation", level=1)
        for idx, (u, a) in enumerate(_pairs_from_history(historia)):
            user_text = _flatten_text(u).strip()
            assistant_text = _flatten_text(a).strip()
            if idx:
                doc.add_paragraph("")
            p_user = doc.add_paragraph()
            p_user.add_run("User: ").bold = True
            p_user.add_run(user_text or "—")
            p_assistant = doc.add_paragraph()
            p_assistant.add_run("Assistant: ").bold = True
            p_assistant.add_run(assistant_text or "—")
        tmp = NamedTemporaryFile(delete=False, suffix=".docx", prefix="Scribe_")
        tmp.close()  # release the handle so doc.save can reopen the path on all platforms
        doc.save(tmp.name)
        return tmp.name
    except Exception:
        return None
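
# Illustrative example: the returned path can be handed straight to gr.File.
#   path = guardar_conversacion([("hi", "hello")])  # temp .docx path, or None on failure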

def chat_response(message: str, history: Optional[List[Any]], provider: str, api_key: str, model: str) -> str:
    """Routes the chat request to the selected provider and returns the assistant text."""
    if provider != "Ollama" and not api_key:
        return "⚠️ Please enter an API Key to proceed."
    # Require an explicit model for all providers except Ollama
    if provider != "Ollama" and not (model or "").strip():
        return "⚠️ Please specify a model for the selected provider."
    try:
        if provider == "OpenAI":
            client = openai.OpenAI(api_key=api_key)
            r = client.chat.completions.create(model=model, messages=_msgs(history, message))
            return r.choices[0].message.content
| elif provider == "Gemini": | |
| genai.configure(api_key=api_key) | |
| mdl = genai.GenerativeModel(model) | |
| ctx = "System: You are a helpful assistant.\n" | |
| for u, a in _pairs_from_history(history): | |
| ctx += f"User: {u or ''}\nModel: {a or ''}\n" | |
| ctx += f"User: {message}\nModel:" | |
| out = mdl.generate_content(ctx) | |
| return getattr(out, "text", "") or "β οΈ Empty response from Gemini." | |
| elif provider == "Sambanova": | |
| client = SambaNova(api_key=api_key, base_url="https://api.sambanova.ai/v1") | |
| r = client.chat.completions.create( | |
| model=model, | |
| messages=_msgs(history, message), | |
| temperature=0.2, | |
| top_p=0.9, | |
| ) | |
| return r.choices[0].message.content | |
| elif provider == "Nebius": | |
| client = openai.OpenAI(base_url="https://api.tokenfactory.nebius.com/v1/", api_key=api_key) | |
| r = client.chat.completions.create(model=model, messages=_msgs(history, message)) | |
| return r.choices[0].message.content | |
| elif provider == "Ollama": | |
| base = "http://127.0.0.1:11434" | |
| mdl = (model or "").strip() | |
| if not mdl: | |
| try: | |
| r = requests.get(f"{base}/api/tags", timeout=5) | |
| if r.status_code == 200: | |
| data = r.json() if r.headers.get("Content-Type", "").startswith("application/json") else {} | |
| tags = data.get("models", []) | |
| mdl = tags[0]["name"] if tags else "llama3" | |
| else: | |
| mdl = "llama3" | |
| except Exception: | |
| mdl = "llama3" | |
| resp = requests.post( | |
| f"{base}/v1/chat/completions", | |
| json={"model": mdl, "messages": _msgs(history, message), "stream": False}, | |
| timeout=60, | |
| ) | |
| if resp.status_code == 200: | |
| try: | |
| data = resp.json() | |
| return data["choices"][0]["message"]["content"] | |
| except Exception: | |
| return "β οΈ Ollama returned invalid JSON." | |
| return f"β οΈ Ollama Error {resp.status_code}: {resp.text}" | |
        else:
            return "🚫 Provider not supported."
    except Exception as e:
        return f"⚠️ Error: {e}"

# --- Dynamic help in the UI ---
def _provider_help(p: str) -> str:
    """Returns help text for the selected provider."""
    if p == "Sambanova":
        return (
            "Sambanova:\n"
            "- pip install sambanova\n"
            "- Get your API Key at sambanova.ai.\n"
            "- Specify the exact model name (e.g., Meta-Llama-3.1-8B-Instruct, Meta-Llama-3.1-70B-Instruct)."
        )
    if p == "Nebius":
        return (
            "Nebius:\n"
            "- Paste your Nebius API Key.\n"
            "- Base URL is preconfigured: https://api.tokenfactory.nebius.com/v1/\n"
            "- Specify a model (e.g., openai/gpt-oss-120b, openai/gpt-4o-mini)."
        )
    if p == "OpenAI":
        return (
            "OpenAI: enter your API Key and specify a model.\n"
            "Examples: gpt-4o-mini, gpt-4o, o4-mini, o3-mini."
        )
    if p == "Gemini":
        return (
            "Gemini: enter your API Key and specify a model.\n"
            "Examples: gemini-1.5-flash, gemini-1.5-pro, gemini-1.5-flash-8b."
        )
    if p == "Ollama":
        return "Ollama: no API Key required; leave the model empty to auto-select a local one (e.g., llama3, qwen2.5)."
    return ""

def _on_provider_change(p: str):
    """Updates help text and model placeholder based on provider."""
    if p == "Sambanova":
        ph = "e.g. Meta-Llama-3.1-8B-Instruct"
    elif p == "Nebius":
        ph = "e.g. openai/gpt-oss-120b"
    elif p == "OpenAI":
        ph = "e.g. gpt-4o-mini"
    elif p == "Gemini":
        ph = "e.g. gemini-1.5-flash"
    else:
        ph = "(Ollama: leave empty for automatic)"
    return _provider_help(p), gr.update(placeholder=ph)
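
# Illustrative example: selecting Gemini swaps both outputs at once.
#   help_text, update = _on_provider_change("Gemini")
#   # update carries placeholder="e.g. gemini-1.5-flash" for the model textbox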

def handle_chat(message: str, history: Optional[List[Any]], provider: str, api_key: str, model: str):
    """Gradio handler to process a message and update chat history."""
    reply = chat_response(message, history, provider, api_key, model)
    new_hist = _normalize_messages(history) + [
        {"role": "user", "content": message},
        {"role": "assistant", "content": str(reply)},
    ]
    return "", new_hist

with gr.Blocks(title="📝 Scribe") as demo:
    gr.Markdown("## 📝 Scribe\nChat and save your conversation to .docx")
    # Disclaimer about API key safety and best practices
    gr.Markdown(
        "Disclaimer: While this app takes reasonable steps to reduce risks related to API keys (e.g., not auto-filling secrets and using them only for requests you trigger), no application can fully prevent misuse. Follow these best practices:\n"
        "- Use environment variables or a secure secrets manager where possible.\n"
        "- Do not share or hard-code your API keys in source control.\n"
        "- Rotate keys periodically and revoke any keys you suspect are compromised.\n"
        "- Restrict key permissions and scopes to the minimum needed.\n"
        "- Monitor usage and set rate limits/quotas where available.\n"
        "- Only run this app in trusted environments and networks."
    )
    with gr.Row():
        with gr.Column(scale=1):
            provider = gr.Dropdown(
                choices=["OpenAI", "Gemini", "Sambanova", "Nebius", "Ollama"],
                value="OpenAI",
                label="🌐 Service Provider",
            )
            api_key = gr.Textbox(label="🔑 API Key", type="password", placeholder="sk-...")  # do not auto-fill secrets
            model = gr.Textbox(label="🧠 Model", placeholder="(Ollama: leave empty for automatic)")
            help_md = gr.Markdown(_provider_help("OpenAI"))
        with gr.Column(scale=3):
            chat = gr.Chatbot(label="💬 Scribe Chat", type="messages")  # history is stored as role/content dicts
            msg = gr.Textbox(placeholder="✏️ Type your message and press Enter...")
            with gr.Row():
                send = gr.Button("🚀 Send", variant="primary")
                clear = gr.Button("🧹 Clear")
                download = gr.Button("⬇️ Download Scribe .docx")
            file_out = gr.File(label="📄 Scribe Generated file", interactive=False)
    send.click(handle_chat, inputs=[msg, chat, provider, api_key, model], outputs=[msg, chat])
    msg.submit(handle_chat, inputs=[msg, chat, provider, api_key, model], outputs=[msg, chat])
    clear.click(lambda: [], None, chat, queue=False)
    download.click(guardar_conversacion, inputs=[chat], outputs=[file_out])
    provider.change(_on_provider_change, inputs=[provider], outputs=[help_md, model])

demo.launch(mcp_server=True, allowed_paths=["."])