File size: 11,581 Bytes
f88dde2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
import gradio as gr
from docx import Document
import openai
import google.generativeai as genai
import requests
from tempfile import NamedTemporaryFile
from typing import Any, Dict, List, Optional, Tuple, Union
from sambanova import SambaNova  # hard dependency; no try/except

def _flatten_text(value: Any) -> str:
    """Converts nested structures (dict/list/tuple) into a single string."""
    if value is None:
        return ""
    if isinstance(value, str):
        return value
    if isinstance(value, dict):
        for key in ("text", "content", "parts"):
            if key in value:
                return _flatten_text(value[key])
        return " ".join(filter(None, (_flatten_text(v) for v in value.values())))
    if isinstance(value, (list, tuple)):
        return " ".join(filter(None, (_flatten_text(v) for v in value)))
    return str(value)

def _normalize_messages(history: Optional[List[Any]]) -> List[Dict[str, str]]:
    """Normalizes chat history into a list of {'role','content'} dicts."""
    msgs = []
    for h in (history or []):
        if isinstance(h, dict) and "role" in h and "content" in h:
            if h["role"] in ("user", "assistant"):
                msgs.append({"role": h["role"], "content": _flatten_text(h["content"])})
        elif isinstance(h, (list, tuple)) and len(h) >= 2:
            u, a = h[0], h[1]
            if u is not None:
                msgs.append({"role": "user", "content": _flatten_text(u)})
            if a is not None:
                msgs.append({"role": "assistant", "content": _flatten_text(a)})
    return msgs

def _pairs_from_history(history: Optional[List[Any]]) -> List[Tuple[str, str]]:
    """Converts normalized messages into (user, assistant) pairs."""
    pairs = []
    pending_user = None
    for h in _normalize_messages(history):
        if h["role"] == "user":
            if pending_user is not None:
                pairs.append((pending_user, ""))  # user without assistant response
            pending_user = h["content"]
        elif h["role"] == "assistant":
            if pending_user is None:
                pairs.append(("", h["content"]))  # assistant without prior user
            else:
                pairs.append((pending_user, h["content"]))
                pending_user = None
    if pending_user is not None:
        pairs.append((pending_user, ""))  # trailing user
    return pairs

def _msgs(history: Optional[List[Any]], user_msg: str) -> List[Dict[str, str]]:
    """Builds a messages array with a system prompt."""
    m = [{"role": "system", "content": "You are a helpful assistant."}]
    m += _normalize_messages(history)
    m.append({"role": "user", "content": user_msg})
    return m

def guardar_conversacion(historia: Optional[List[Any]]) -> Optional[str]:
    """Generates a .docx file of the conversation and returns its filepath."""
    try:
        doc = Document()
        doc.add_heading("Scribe Conversation", level=1)
        for idx, (u, a) in enumerate(_pairs_from_history(historia)):
            user_text = _flatten_text(u).strip()
            assistant_text = _flatten_text(a).strip()
            if idx:
                doc.add_paragraph("")
            p_user = doc.add_paragraph()
            p_user.add_run("User: ").bold = True
            p_user.add_run(user_text or "β€”")
            p_assistant = doc.add_paragraph()
            p_assistant.add_run("Assistant: ").bold = True
            p_assistant.add_run(assistant_text or "β€”")
        tmp = NamedTemporaryFile(delete=False, suffix=".docx", prefix="Scribe_")
        doc.save(tmp.name)
        return tmp.name
    except Exception:
        return None

def chat_response(message: str, history: Optional[List[Any]], provider: str, api_key: str, model: str) -> str:
    """Routes the chat request to the selected provider and returns the assistant text."""
    if provider != "Ollama" and not api_key:
        return "⚠️ Please enter an API Key to proceed."
    # Require explicit model for all providers except Ollama
    if provider != "Ollama" and not (model or "").strip():
        return "⚠️ Please specify a model for the selected provider."
    try:
        if provider == "OpenAI":
            client = openai.OpenAI(api_key=api_key)
            r = client.chat.completions.create(model=model, messages=_msgs(history, message))
            return r.choices[0].message.content
        elif provider == "Gemini":
            genai.configure(api_key=api_key)
            mdl = genai.GenerativeModel(model)
            ctx = "System: You are a helpful assistant.\n"
            for u, a in _pairs_from_history(history):
                ctx += f"User: {u or ''}\nModel: {a or ''}\n"
            ctx += f"User: {message}\nModel:"
            out = mdl.generate_content(ctx)
            return getattr(out, "text", "") or "⚠️ Empty response from Gemini."
        elif provider == "Sambanova":
            client = SambaNova(api_key=api_key, base_url="https://api.sambanova.ai/v1")
            r = client.chat.completions.create(
                model=model,
                messages=_msgs(history, message),
                temperature=0.2,
                top_p=0.9,
            )
            return r.choices[0].message.content
        elif provider == "Nebius":
            client = openai.OpenAI(base_url="https://api.tokenfactory.nebius.com/v1/", api_key=api_key)
            r = client.chat.completions.create(model=model, messages=_msgs(history, message))
            return r.choices[0].message.content
        elif provider == "Ollama":
            base = "http://127.0.0.1:11434"
            mdl = (model or "").strip()
            if not mdl:
                try:
                    r = requests.get(f"{base}/api/tags", timeout=5)
                    if r.status_code == 200:
                        data = r.json() if r.headers.get("Content-Type", "").startswith("application/json") else {}
                        tags = data.get("models", [])
                        mdl = tags[0]["name"] if tags else "llama3"
                    else:
                        mdl = "llama3"
                except Exception:
                    mdl = "llama3"
            resp = requests.post(
                f"{base}/v1/chat/completions",
                json={"model": mdl, "messages": _msgs(history, message), "stream": False},
                timeout=60,
            )
            if resp.status_code == 200:
                try:
                    data = resp.json()
                    return data["choices"][0]["message"]["content"]
                except Exception:
                    return "⚠️ Ollama returned invalid JSON."
            return f"⚠️ Ollama Error {resp.status_code}: {resp.text}"
        else:
            return "🚫 Provider not supported."
    except Exception as e:
        return f"⚠️ Error: {e}"

# --- Dynamic help in the UI ---
def _provider_help(p: str) -> str:
    """Returns help text for the selected provider."""
    if p == "Sambanova":
        return (
            "Sambanova:\n"
            "- pip install sambanova\n"
            "- Get your API Key at sambanova.ai.\n"
            "- Specify the exact model name (e.g., Meta-Llama-3.1-8B-Instruct, Meta-Llama-3.1-70B-Instruct)."
        )
    if p == "Nebius":
        return (
            "Nebius:\n"
            "- Paste your Nebius API Key.\n"
            "- Base URL is preconfigured: https://api.tokenfactory.nebius.com/v1/\n"
            "- Specify a model (e.g., openai/gpt-oss-120b, openai/gpt-4o-mini)."
        )
    if p == "OpenAI":
        return (
            "OpenAI: enter your API Key and specify a model.\n"
            "Examples: gpt-4o-mini, gpt-4o, o4-mini, o3-mini."
        )
    if p == "Gemini":
        return (
            "Gemini: enter your API Key and specify a model.\n"
            "Examples: gemini-1.5-flash, gemini-1.5-pro, gemini-1.5-flash-8b."
        )
    if p == "Ollama":
        return "Ollama: no API Key required; leave the model empty to auto-select a local one (e.g., llama3, qwen2.5)."
    return ""

def _on_provider_change(p: str):
    """Updates help text and model placeholder based on provider."""
    if p == "Sambanova":
        ph = "e.g. Meta-Llama-3.1-8B-Instruct"
    elif p == "Nebius":
        ph = "e.g. openai/gpt-oss-120b"
    elif p == "OpenAI":
        ph = "e.g. gpt-4o-mini"
    elif p == "Gemini":
        ph = "e.g. gemini-1.5-flash"
    else:
        ph = "(Ollama: leave empty for automatic)"
    return _provider_help(p), gr.update(placeholder=ph)

def handle_chat(message: str, history: Optional[List[Any]], provider: str, api_key: str, model: str):
    """Gradio handler to process a message and update chat history."""
    reply = chat_response(message, history, provider, api_key, model)
    new_hist = (_normalize_messages(history)) + [
        {"role": "user", "content": message},
        {"role": "assistant", "content": str(reply)},
    ]
    return "", new_hist

with gr.Blocks(title="πŸ“ Scribe") as demo:
    gr.Markdown("## πŸŽ‰ Scribe\nChat and save your conversation to .docx")
    # Disclaimer about API key safety and best practices
    gr.Markdown(
        "Disclaimer: While this app takes reasonable steps to reduce risks related to API keys (e.g., not auto-filling secrets and using them only for requests you trigger), no application can fully prevent misuse. Follow these best practices:\n"
        "- Use environment variables or a secure secrets manager where possible.\n"
        "- Do not share or hard-code your API keys in source control.\n"
        "- Rotate keys periodically and revoke any suspected-compromised keys.\n"
        "- Restrict key permissions and scopes to the minimum needed.\n"
        "- Monitor usage and set rate limits/quotas where available.\n"
        "- Only run this app in trusted environments and networks."
    )
    with gr.Row():
        with gr.Column(scale=1):
            provider = gr.Dropdown(
                choices=["OpenAI", "Gemini", "Sambanova", "Nebius", "Ollama"],
                value="OpenAI",
                label="πŸ”Œ Service Provider"
            )
            api_key = gr.Textbox(label="πŸ”‘ API Key", type="password", placeholder="sk-...")  # do not auto-fill secrets
            model = gr.Textbox(label="🧠 Model", placeholder="(Ollama: leave empty for automatic)")
            help_md = gr.Markdown(_provider_help("OpenAI"))
        with gr.Column(scale=3):
            chat = gr.Chatbot(label="πŸ’¬ Scribe Chat")
            msg = gr.Textbox(placeholder="✍️ Type your message and press Enter...")
            with gr.Row():
                send = gr.Button("πŸš€ Send", variant="primary")
                clear = gr.Button("🧹 Clear")
                download = gr.Button("⬇️ Download Scribe .docx")
            file_out = gr.File(label="πŸ“„ Scribe Generated file", interactive=False)
    send.click(handle_chat, inputs=[msg, chat, provider, api_key, model], outputs=[msg, chat])
    msg.submit(handle_chat, inputs=[msg, chat, provider, api_key, model], outputs=[msg, chat])
    clear.click(lambda: [], None, chat, queue=False)
    download.click(guardar_conversacion, inputs=[chat], outputs=[file_out])
    provider.change(_on_provider_change, inputs=[provider], outputs=[help_md, model])

demo.launch(mcp_server=True, allowed_paths=["."])