File size: 14,294 Bytes
b2c0149
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
"""
Kokbco - Hugging Face Space app.py
Chat + Image + Code (Hugging Face Inference API as primary) + optional Copilot.com fallback

راهنما (خلاصه):
- این فایل برای اجرا در یک Hugging Face Space با SDK = Gradio طراحی شده.
- در Secrets فضای HF این مقادیر را اضافه کن:
    - HF_TOKEN: توکن HuggingFace (required)
    - HF_MODEL_CHAT: مدل متن برای چت (مثال: "meta-llama/Llama-2-7b-chat" یا مدل سبک‌تر)
    - HF_MODEL_IMAGE: مدل تصویر (مثال: "stabilityai/stable-diffusion-2")
    - HF_MODEL_CODE: مدل مخصوص کدنویسی (مثال: "bigcode/starcoder")
    - ENABLE_COPILOT_FALLBACK: "1" یا "0" (اختیاری — فعال‌سازی ماژول fallback به Copilot)
    - COPILOT_EMAIL / COPILOT_PASSWORD (در صورت نیاز برای fallback با مرورگر؛ **بسیار** اختیاری)

- نکته مهم: fallback به Copilot.com از طریق scraping / browser automation پیاده شده تا وقتی HF لیمیت داشت، تلاش کند. این بخش آزمایشی و نیاز به Playwright/Selenium دارد و ممکن است به دلیل تغییرات سایت یا محدودیت‌ها کار نکند.

- فایل requirements.txt پیشنهادی:
    gradio
    requests
    cachetools
    playwright (اختیاری برای fallback)

دو بخش مهم:
1) API primary: Hugging Face Inference API (requests)
2) fallback: experimental Playwright automation برای copilot.com

اجرا: این Space با Gradio اجرا می‌شود. قبل از push به Space، حتما HF_TOKEN را در Secrets بذار.

"""

import base64
import json
import os
import tempfile
import threading
import time
from typing import Optional

import requests
from cachetools import TTLCache

# try to import playwright only if fallback enabled (optional)
try:
    from playwright.sync_api import sync_playwright
    PLAYWRIGHT_AVAILABLE = True
except Exception:
    PLAYWRIGHT_AVAILABLE = False

# Gradio for UI
import gradio as gr

# -----------------------------
# Configuration (via env / secrets)
# -----------------------------
# Hugging Face API token; empty string when unset (call_hf_model refuses to run without it).
HF_TOKEN = os.environ.get("HF_TOKEN", "")
# Model ids per feature; the second argument is the default used when the variable is unset.
HF_MODEL_CHAT = os.environ.get("HF_MODEL_CHAT", "tiiuae/falcon-7b-instruct")  # replace with your model
HF_MODEL_IMAGE = os.environ.get("HF_MODEL_IMAGE", "runwayml/stable-diffusion-v1-5")
HF_MODEL_CODE = os.environ.get("HF_MODEL_CODE", "bigcode/starcoder")
# The Copilot fallback is enabled only when the variable is exactly "1".
ENABLE_COPILOT_FALLBACK = os.environ.get("ENABLE_COPILOT_FALLBACK", "0") == "1"
# Optional credentials for the fallback login flow; None when unset.
COPILOT_EMAIL = os.environ.get("COPILOT_EMAIL")
COPILOT_PASSWORD = os.environ.get("COPILOT_PASSWORD")

# Authorization header for Inference API requests; an empty dict when no token is configured.
HF_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

# Caches
# One in-memory TTL cache per feature; `ttl` is given in seconds.
TEXT_CACHE = TTLCache(maxsize=1024, ttl=60 * 5)  # 5-minute cache
IMAGE_CACHE = TTLCache(maxsize=256, ttl=60 * 60)  # 1-hour cache
CODE_CACHE = TTLCache(maxsize=512, ttl=60 * 10)  # 10-minute cache

# -----------------------------
# Helper: Hugging Face Inference Calls
# -----------------------------

def call_hf_model(model: str, payload: dict, stream: bool = False, timeout: int = 60):
    """Send ``payload`` to the Hugging Face Inference API for ``model``.

    Args:
        model: Model id appended to the Inference API URL.
        payload: JSON-serialisable request body.
        stream: Forwarded to ``requests.post``; when true the response body is
            not downloaded until the caller reads it.
        timeout: Request timeout in seconds.

    Returns:
        The raw ``requests.Response``; the caller decides how to parse it.

    Raises:
        RuntimeError: If ``HF_TOKEN`` is not configured.
        requests.HTTPError: If the API answers with an error status
            (via ``raise_for_status``). Other ``requests`` exceptions
            propagate unchanged.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN is not set in environment/secrets")
    url = f"https://api-inference.huggingface.co/models/{model}"
    # NOTE(review): JSON is requested for every model, while generate_image_hf
    # looks for an image/* Content-Type — confirm image models still return
    # raw bytes with this Accept header.
    headers = {**HF_HEADERS, "Accept": "application/json"}
    resp = requests.post(
        url,
        headers=headers,
        json=payload,
        stream=stream,  # previously accepted but silently ignored
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp


def ask_hf_text(model: str, prompt: str, max_new_tokens: int = 512) -> str:
    """Generate text for ``prompt`` with the given Hugging Face model.

    Successful answers are cached in ``TEXT_CACHE``. Failures are never
    cached, so a transient API error does not stick for the whole TTL.

    Args:
        model: Model id passed to ``call_hf_model``.
        prompt: Input text.
        max_new_tokens: Generation limit sent in the request parameters.

    Returns:
        The generated text, or a string starting with ``"[HF ERROR]"`` when
        the request or the parsing of its response fails.
    """
    # max_new_tokens is part of the key: the same prompt with a different
    # limit is a different request.
    key = f"text::{model}::{max_new_tokens}::{prompt}"
    # Single lookup: an `in` test followed by indexing can raise KeyError if
    # the entry expires between the two operations.
    cached = TEXT_CACHE.get(key)
    if cached is not None:
        return cached
    payload = {
        "inputs": prompt,
        "options": {"wait_for_model": True},
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        data = call_hf_model(model, payload).json()
        if isinstance(data, dict) and "error" in data:
            raise RuntimeError(data.get("error"))
        if isinstance(data, list):
            # Take the first item that carries a "generated_text" field.
            text = next(
                (
                    item["generated_text"]
                    for item in data
                    if isinstance(item, dict) and "generated_text" in item
                ),
                None,
            )
            if text is None:
                # No recognised field: fall back to the items' string forms.
                text = "\n".join(str(x) for x in data)
        elif isinstance(data, dict) and "generated_text" in data:
            text = data["generated_text"]
        else:
            text = str(data)
    except Exception as e:
        # Boundary handler: callers detect failure through the marker prefix.
        return f"[HF ERROR] {e}"
    TEXT_CACHE[key] = text
    return text


def generate_image_hf(model: str, prompt: str) -> str:
    """Generate an image for ``prompt`` and return it as a base64 data URI.

    Results that could be turned into a data URI are stored in
    ``IMAGE_CACHE``; marker strings are returned without being cached.

    Returns:
        A ``data:<mime>;base64,...`` URI on success, a string starting with
        ``"[IMAGE_RESPONSE_NOT_BINARY]"`` when the JSON response contains no
        recognisable base64 field, or a string starting with
        ``"[HF IMAGE ERROR]"`` when the request or parsing fails.
    """
    cache_key = f"image::{model}::{prompt}"
    if cache_key in IMAGE_CACHE:
        return IMAGE_CACHE[cache_key]
    request_body = {"inputs": prompt, "options": {"wait_for_model": True}}
    try:
        response = call_hf_model(model, request_body)
        mime = response.headers.get("Content-Type", "")
        if mime.startswith("image/"):
            # Raw image bytes: wrap them in a data URI using the reported type.
            encoded = base64.b64encode(response.content).decode()
            uri = f"data:{mime};base64,{encoded}"
            IMAGE_CACHE[cache_key] = uri
            return uri

        # Not an image body: look for a base64 string inside the JSON.
        parsed = response.json()
        encoded = None
        if isinstance(parsed, dict):
            for field in ("image_base64", "images", "data"):
                if field not in parsed:
                    continue
                candidate = parsed[field]
                if isinstance(candidate, list) and candidate:
                    # A non-empty list is represented by its first element.
                    candidate = candidate[0]
                if isinstance(candidate, str):
                    encoded = candidate
                    break
        if not encoded:
            # Nothing usable: hand back a truncated dump of the response.
            return "[IMAGE_RESPONSE_NOT_BINARY] " + json.dumps(parsed)[:2000]
        uri = f"data:image/png;base64,{encoded}"
        IMAGE_CACHE[cache_key] = uri
        return uri
    except Exception as exc:
        return f"[HF IMAGE ERROR] {exc}"


def ask_hf_code(model: str, prompt: str, max_new_tokens: int = 512) -> str:
    """Generate code for ``prompt`` with the given Hugging Face model.

    Successful answers are cached in ``CODE_CACHE``; failures are never
    cached so the next call retries the API.

    Args:
        model: Model id passed to ``call_hf_model``.
        prompt: Description of the code to generate.
        max_new_tokens: Generation limit sent in the request parameters.

    Returns:
        The generated code, or a string starting with ``"[HF CODE ERROR]"``
        when the request fails, the API reports an error, or the response
        is an empty list.
    """
    # max_new_tokens is part of the key: a different limit is a different request.
    key = f"code::{model}::{max_new_tokens}::{prompt}"
    # Single lookup avoids a KeyError if the entry expires between an `in`
    # test and the indexing.
    cached = CODE_CACHE.get(key)
    if cached is not None:
        return cached
    payload = {
        "inputs": prompt,
        "options": {"wait_for_model": True},
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        data = call_hf_model(model, payload).json()
        # Same convention as ask_hf_text: an error body is a failure, not code.
        if isinstance(data, dict) and "error" in data:
            raise RuntimeError(data.get("error"))
        if isinstance(data, list):
            if not data:
                # Explicit message instead of an IndexError from data[0].
                raise RuntimeError("empty response from model")
            first = data[0]
        else:
            first = data
        if isinstance(first, dict) and "generated_text" in first:
            code = first["generated_text"]
        else:
            code = str(data)
    except Exception as e:
        # Boundary handler: callers detect failure through the marker prefix.
        return f"[HF CODE ERROR] {e}"
    CODE_CACHE[key] = code
    return code

# -----------------------------
# Optional Copilot fallback (experimental)
# -----------------------------

def copilot_fallback_text(prompt: str, timeout: int = 30) -> str:
    """Experimental fallback: ask copilot.com through a headless browser.

    WARNING: This is fragile, may break if Copilot changes its UI, and may
    violate Copilot terms of service. Only enable it if you understand the
    risks and have credentials if needed.

    Args:
        prompt: Text typed into the first ``textarea`` found on the page.
        timeout: Maximum time in seconds for each Playwright navigation or
            action. The fixed ``time.sleep`` pauses are not affected.

    Returns:
        ``""`` when the fallback is disabled or no ``textarea`` is found;
        a string starting with ``"[COPILOT SCRAPE]"`` followed by the first
        4000 characters of the page HTML when a prompt was submitted; or a
        ``"[COPILOT FALLBACK ...]"`` marker when Playwright is missing or
        the automation raises.
    """
    if not ENABLE_COPILOT_FALLBACK:
        return ""
    if not PLAYWRIGHT_AVAILABLE:
        return "[COPILOT FALLBACK UNAVAILABLE: playwright not installed]"

    try:
        with sync_playwright() as pw:
            browser = pw.chromium.launch(headless=True)
            context = browser.new_context()
            # The try starts right after the context exists so that a failure
            # in new_page()/goto() still closes the context and the browser.
            try:
                page = context.new_page()
                # Playwright expects milliseconds; this makes `timeout`
                # effective for goto/fill/click instead of being ignored.
                page.set_default_timeout(timeout * 1000)
                page.goto("https://copilot.com")
                time.sleep(2)
                # NOTE: Selectors below are placeholders. You must inspect
                # copilot.com and adapt them; this flow is intentionally
                # generic and may require manual tweaks.
                # If credentials are provided, attempt a simple
                # email -> password login flow.
                if COPILOT_EMAIL and COPILOT_PASSWORD:
                    if page.query_selector('input[type="email"]'):
                        page.fill('input[type="email"]', COPILOT_EMAIL)
                        page.click('button[type="submit"]')
                        time.sleep(1)
                        if page.query_selector('input[type="password"]'):
                            page.fill('input[type="password"]', COPILOT_PASSWORD)
                            page.click('button[type="submit"]')
                            time.sleep(3)
                # The real selector of the chat input must be discovered by
                # the developer; a bare textarea is only a guess.
                if page.query_selector('textarea'):
                    page.fill('textarea', prompt)
                    page.keyboard.press('Enter')
                    # Naive fixed wait for the answer to render.
                    time.sleep(5)
                    # No answer extraction yet: return the truncated page HTML.
                    content = page.content()
                    return "[COPILOT SCRAPE] " + content[:4000]
            finally:
                context.close()
                browser.close()
    except Exception as e:
        return f"[COPILOT FALLBACK ERROR] {e}"
    return ""

# -----------------------------
# Router / Dispatcher logic
# -----------------------------

def handle_chat(prompt: str) -> str:
    """Answer ``prompt`` with the HF chat model, using Copilot as a backup.

    The HF answer counts as failed when it carries the ``[HF ERROR]`` marker,
    is empty, or has fewer than 10 non-whitespace-trimmed characters. In that
    case, and only if the fallback is enabled, a non-empty fallback answer
    replaces it; otherwise the HF answer is returned as is.
    """
    reply = ask_hf_text(HF_MODEL_CHAT, prompt)
    failed = reply.startswith("[HF ERROR]") or not reply or len(reply.strip()) < 10
    if not (failed and ENABLE_COPILOT_FALLBACK):
        return reply
    # An empty fallback result keeps the original HF answer.
    return copilot_fallback_text(prompt) or reply


def handle_image(prompt: str) -> str:
    """Generate an image for ``prompt`` with the HF image model.

    Returns:
        The result of ``generate_image_hf``. When that result carries the
        ``[HF IMAGE ERROR]`` marker and the Copilot fallback is enabled, a
        non-empty fallback answer is returned instead.
    """
    out = generate_image_hf(HF_MODEL_IMAGE, prompt)
    if out.startswith("[HF IMAGE ERROR]") and ENABLE_COPILOT_FALLBACK:
        # As in handle_chat, an empty fallback result must not replace the
        # HF error message, otherwise the failure reason is lost.
        # NOTE(review): the fallback returns text, not an image — confirm it
        # is useful for image requests at all.
        return copilot_fallback_text(prompt) or out
    return out


def handle_code(prompt: str) -> str:
    """Generate code for ``prompt`` with the HF code model.

    Returns:
        The result of ``ask_hf_code``. When that result carries the
        ``[HF CODE ERROR]`` marker and the Copilot fallback is enabled, a
        non-empty fallback answer is returned instead.
    """
    out = ask_hf_code(HF_MODEL_CODE, prompt)
    if out.startswith("[HF CODE ERROR]") and ENABLE_COPILOT_FALLBACK:
        # As in handle_chat, an empty fallback result must not replace the
        # HF error message; the user would otherwise see an empty answer.
        return copilot_fallback_text(prompt) or out
    return out

# -----------------------------
# Background: Simple rate-limiter using tokens
# -----------------------------
class SimpleRateLimiter:
    """Sliding-window limiter: at most ``calls_per_minute`` calls per 60 seconds.

    All state changes happen while holding ``self.lock``.
    """

    def __init__(self, calls_per_minute=30):
        self.calls_per_minute = calls_per_minute
        self.lock = threading.Lock()
        # Monotonic timestamps of the calls accepted inside the current window.
        self.calls = []

    def allow(self):
        """Record a call and return True if it fits in the window, else False."""
        with self.lock:
            # Monotonic clock: a wall-clock adjustment (NTP, manual change)
            # can neither extend nor shorten the window.
            now = time.monotonic()
            # Drop timestamps that have left the 60-second window.
            self.calls = [t for t in self.calls if now - t < 60]
            if len(self.calls) >= self.calls_per_minute:
                return False
            self.calls.append(now)
            return True

# Single limiter instance shared by all UI callbacks: 25 calls per minute in total.
RATE_LIMITER = SimpleRateLimiter(calls_per_minute=25)

# -----------------------------
# Gradio UI
# -----------------------------

def build_ui():
    """Build the Gradio Blocks app with Chat, Image and Code tabs.

    Every callback first consults the shared ``RATE_LIMITER`` and then
    delegates to ``handle_chat``, ``handle_image`` or ``handle_code``.

    Returns:
        The assembled ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks(title="Kokbco — Chat + Image + Code") as demo:
        gr.Markdown("# Kokbco — Chat, Image & Code (HF primary, Copilot fallback optional)")
        with gr.Tabs():
            with gr.TabItem("Chat"):
                chat_input = gr.Textbox(label="Prompt", placeholder="از من بپرس...")
                chat_btn = gr.Button("Send")
                chat_output = gr.Textbox(label="Response", lines=10)

                def on_chat(inp):
                    if not RATE_LIMITER.allow():
                        return "Rate limit exceeded. Try again in a moment."
                    return handle_chat(inp)

                chat_btn.click(on_chat, inputs=[chat_input], outputs=[chat_output])

            with gr.TabItem("Image"):
                img_input = gr.Textbox(label="Prompt for image", placeholder="شرح تصویر... e.g. a warrior in sunset")
                img_btn = gr.Button("Generate")
                img_output = gr.Image(type="numpy", label="Result")

                def on_image(inp):
                    if not RATE_LIMITER.allow():
                        # An empty string is not a valid value for an image
                        # output; surface the limit as a UI error instead.
                        raise gr.Error("Rate limit exceeded. Try again in a moment.")
                    res = handle_image(inp)
                    # Anything that is not a data URI (error markers, fallback
                    # text) leaves the image output empty.
                    if not (isinstance(res, str) and res.startswith("data:")):
                        return None
                    _, b64 = res.split(",", 1)
                    # One unique file per request: a fixed path would let
                    # concurrent requests overwrite each other's image.
                    # delete=False because the file is read after this
                    # function returns; such files are not cleaned up here.
                    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
                        f.write(base64.b64decode(b64))
                    return f.name

                img_btn.click(on_image, inputs=[img_input], outputs=[img_output])

            with gr.TabItem("Code"):
                code_input = gr.Textbox(label="Code prompt", placeholder="Write a python function that...")
                code_btn = gr.Button("Generate Code")
                code_output = gr.Code(label="Generated Code", language="python")

                def on_code(inp):
                    if not RATE_LIMITER.allow():
                        return "# Rate limit exceeded"
                    return handle_code(inp)

                code_btn.click(on_code, inputs=[code_input], outputs=[code_output])

        gr.Markdown("---\n**Notes:** Make sure HF_TOKEN + model names are set in Space secrets. Enable Copilot fallback only if you understand risks.")
    return demo


# Build the UI at import time so that `app` exists as a module-level attribute.
app = build_ui()

# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    app.launch()