File size: 14,294 Bytes
b2c0149 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 | """
Kokbco - Hugging Face Space app.py
Chat + Image + Code (Hugging Face Inference API as primary) + optional Copilot.com fallback
راهنما (خلاصه):
- این فایل برای اجرا در یک Hugging Face Space با SDK = Gradio طراحی شده.
- در Secrets فضای HF این مقادیر را اضافه کن:
- HF_TOKEN: توکن HuggingFace (required)
- HF_MODEL_CHAT: مدل متن برای چت (مثال: "meta-llama/Llama-2-7b-chat" یا مدل سبکتر)
- HF_MODEL_IMAGE: مدل تصویر (مثال: "stabilityai/stable-diffusion-2")
- HF_MODEL_CODE: مدل مخصوص کدنویسی (مثال: "bigcode/starcoder")
- ENABLE_COPILOT_FALLBACK: "1" یا "0" (اختیاری — فعالسازی ماژول fallback به Copilot)
- COPILOT_EMAIL / COPILOT_PASSWORD (در صورت نیاز برای fallback با مرورگر؛ **بسیار** اختیاری)
- نکته مهم: fallback به Copilot.com از طریق scraping / browser automation پیاده شده تا وقتی HF لیمیت داشت، تلاش کند. این بخش آزمایشی و نیاز به Playwright/Selenium دارد و ممکن است به دلیل تغییرات سایت یا محدودیتها کار نکند.
- فایل requirements.txt پیشنهادی:
gradio
requests
cachetools
playwright (اختیاری برای fallback)
دو بخش مهم:
1) API primary: Hugging Face Inference API (requests)
2) fallback: experimental Playwright automation برای copilot.com
اجرا: این Space با Gradio اجرا میشود. قبل از push به Space، حتما HF_TOKEN را در Secrets بذار.
"""
import os
import time
import json
import base64
import threading
from typing import Optional
import requests
from cachetools import TTLCache
# Playwright is optional: import it when installed (it is only used by the Copilot fallback)
try:
from playwright.sync_api import sync_playwright
PLAYWRIGHT_AVAILABLE = True
except Exception:
PLAYWRIGHT_AVAILABLE = False
# Gradio for UI
import gradio as gr
# -----------------------------
# Configuration (via env / secrets)
# -----------------------------
# Hugging Face API token; an empty string means "not configured".
HF_TOKEN = os.getenv("HF_TOKEN", "")

# Model ids used by each tab; override them through environment variables.
HF_MODEL_CHAT = os.getenv("HF_MODEL_CHAT", "tiiuae/falcon-7b-instruct")
HF_MODEL_IMAGE = os.getenv("HF_MODEL_IMAGE", "runwayml/stable-diffusion-v1-5")
HF_MODEL_CODE = os.getenv("HF_MODEL_CODE", "bigcode/starcoder")

# The experimental Copilot fallback is active only when the variable is exactly "1".
ENABLE_COPILOT_FALLBACK = os.getenv("ENABLE_COPILOT_FALLBACK", "0") == "1"
COPILOT_EMAIL = os.getenv("COPILOT_EMAIL")
COPILOT_PASSWORD = os.getenv("COPILOT_PASSWORD")

# Authorization header sent with every Inference API request (empty without a token).
HF_HEADERS = {}
if HF_TOKEN:
    HF_HEADERS["Authorization"] = f"Bearer {HF_TOKEN}"
# In-memory response caches; each entry expires once its time-to-live (in seconds) has elapsed.
TEXT_CACHE = TTLCache(ttl=5 * 60, maxsize=1024)  # chat answers, kept for 5 minutes
IMAGE_CACHE = TTLCache(ttl=60 * 60, maxsize=256)  # image data URIs, kept for 1 hour
CODE_CACHE = TTLCache(ttl=10 * 60, maxsize=512)  # generated code, kept for 10 minutes
# -----------------------------
# Helper: Hugging Face Inference Calls
# -----------------------------
def call_hf_model(model: str, payload: dict, stream: bool = False, timeout: int = 60):
    """Send ``payload`` to the Hugging Face Inference API endpoint of ``model``.

    Args:
        model: Model repository id, appended to the Inference API URL.
        payload: JSON-serialisable request body.
        stream: Forwarded to ``requests.post``. When true, the response body
            is not downloaded until the caller reads it.
        timeout: Request timeout in seconds, forwarded to ``requests.post``.

    Returns:
        The raw ``requests.Response``; the caller decides how to parse it.

    Raises:
        RuntimeError: If ``HF_TOKEN`` is not configured.
        requests.HTTPError: If the API answers with an error status code.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN is not set in environment/secrets")
    url = f"https://api-inference.huggingface.co/models/{model}"
    # Build a fresh dict so the shared HF_HEADERS mapping is never mutated.
    headers = {**HF_HEADERS, "Accept": "application/json"}
    resp = requests.post(
        url,
        headers=headers,
        json=payload,
        stream=stream,  # previously accepted but silently ignored
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp
def ask_hf_text(model: str, prompt: str, max_new_tokens: int = 512) -> str:
    """Generate text for ``prompt`` with ``model`` via the HF Inference API.

    Successful answers are cached in ``TEXT_CACHE`` under a key built from the
    model and the prompt. Failures are NOT cached, so a transient API error
    does not keep being served until the cache entry expires.

    Args:
        model: Model repository id.
        prompt: Input text sent as ``inputs``.
        max_new_tokens: Generation length limit sent in ``parameters``.

    Returns:
        The generated text, or a string starting with ``"[HF ERROR]"`` when
        the call or the response parsing failed (this function never raises).
    """
    key = f"text::{model}::{prompt}"
    # A single .get() avoids the KeyError that "in" followed by indexing can
    # hit when the entry expires between the two operations.
    cached = TEXT_CACHE.get(key)
    if cached is not None:
        return cached

    payload = {
        "inputs": prompt,
        "options": {"wait_for_model": True},
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        data = call_hf_model(model, payload).json()
        # Response shapes differ between models; handle the common ones.
        if isinstance(data, dict) and "error" in data:
            raise RuntimeError(data.get("error"))
        if isinstance(data, list):
            # Use the first item that carries a "generated_text" field.
            text = next(
                (
                    item["generated_text"]
                    for item in data
                    if isinstance(item, dict) and "generated_text" in item
                ),
                None,
            )
            if text is None:
                text = "\n".join(str(x) for x in data)
        elif isinstance(data, dict) and "generated_text" in data:
            text = data["generated_text"]
        else:
            text = str(data)
    except Exception as e:
        # Deliberate best-effort: report the failure as a sentinel string so
        # the caller can decide whether to try a fallback.
        return f"[HF ERROR] {e}"

    TEXT_CACHE[key] = text
    return text
def generate_image_hf(model: str, prompt: str) -> str:
    """Generate an image for ``prompt`` and return it as a base64 data URI.

    Returns:
        A ``data:`` URI on success (also stored in ``IMAGE_CACHE``), a string
        starting with ``"[IMAGE_RESPONSE_NOT_BINARY]"`` when the response held
        no recognisable image, or a string starting with
        ``"[HF IMAGE ERROR]"`` when the call failed.
    """
    cache_key = f"image::{model}::{prompt}"
    if cache_key in IMAGE_CACHE:
        return IMAGE_CACHE[cache_key]

    request_body = {"inputs": prompt, "options": {"wait_for_model": True}}
    try:
        response = call_hf_model(model, request_body)

        # Case 1: the endpoint answered with raw image bytes.
        mime = response.headers.get("Content-Type", "")
        if mime.startswith("image/"):
            encoded = base64.b64encode(response.content).decode()
            uri = f"data:{mime};base64,{encoded}"
            IMAGE_CACHE[cache_key] = uri
            return uri

        # Case 2: a JSON body that may carry the image as a base64 string.
        parsed = response.json()
        found = None
        if isinstance(parsed, dict):
            for field in ("image_base64", "images", "data"):
                if field not in parsed:
                    continue
                candidate = parsed[field]
                if isinstance(candidate, str):
                    found = candidate
                    break
                if isinstance(candidate, list) and candidate and isinstance(candidate[0], str):
                    found = candidate[0]
                    break

        if found:
            uri = f"data:image/png;base64,{found}"
            IMAGE_CACHE[cache_key] = uri
            return uri

        # Nothing usable: hand back a truncated dump of the JSON instead.
        return "[IMAGE_RESPONSE_NOT_BINARY] " + json.dumps(parsed)[:2000]
    except Exception as exc:
        return f"[HF IMAGE ERROR] {exc}"
def ask_hf_code(model: str, prompt: str, max_new_tokens: int = 512) -> str:
    """Generate code for ``prompt`` with ``model`` via the HF Inference API.

    Successful results are cached in ``CODE_CACHE``; failures are not, so a
    transient API error is retried on the next call instead of being replayed
    from the cache.

    Args:
        model: Model repository id.
        prompt: Input text sent as ``inputs``.
        max_new_tokens: Generation length limit sent in ``parameters``.

    Returns:
        The generated code, or a string starting with ``"[HF CODE ERROR]"``
        when the call or the response parsing failed (never raises).
    """
    key = f"code::{model}::{prompt}"
    # A single .get() avoids a KeyError if the entry expires mid-lookup.
    cached = CODE_CACHE.get(key)
    if cached is not None:
        return cached

    payload = {
        "inputs": prompt,
        "options": {"wait_for_model": True},
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        data = call_hf_model(model, payload).json()
        # Treat an {"error": ...} body as a failure, as ask_hf_text does.
        if isinstance(data, dict) and "error" in data:
            raise RuntimeError(data.get("error"))
        if isinstance(data, list):
            if not data:
                # Explicit message instead of an opaque IndexError.
                raise RuntimeError("empty response from model")
            first = data[0]
            if isinstance(first, dict) and "generated_text" in first:
                code = first["generated_text"]
            else:
                code = str(data)
        elif isinstance(data, dict) and "generated_text" in data:
            code = data["generated_text"]
        else:
            code = str(data)
    except Exception as e:
        # Deliberate best-effort: report the failure as a sentinel string.
        return f"[HF CODE ERROR] {e}"

    CODE_CACHE[key] = code
    return code
# -----------------------------
# Optional Copilot fallback (experimental)
# -----------------------------
def _drive_copilot_page(page, prompt: str) -> str:
    """Optionally log in, submit ``prompt`` in the first textarea, and scrape the page.

    Returns "" when no textarea is found. All selectors are placeholders and
    must be adapted to the real site.
    """
    # Attempt a simple email/password flow only when credentials are configured.
    if COPILOT_EMAIL and COPILOT_PASSWORD:
        if page.query_selector('input[type="email"]'):
            page.fill('input[type="email"]', COPILOT_EMAIL)
            page.click('button[type="submit"]')
            time.sleep(1)
        if page.query_selector('input[type="password"]'):
            page.fill('input[type="password"]', COPILOT_PASSWORD)
            page.click('button[type="submit"]')
            time.sleep(3)

    if not page.query_selector('textarea'):
        return ""
    page.fill('textarea', prompt)
    page.keyboard.press('Enter')
    # Naive fixed wait for the answer to render.
    time.sleep(5)
    # Naive capture: return the (truncated) full page HTML.
    return "[COPILOT SCRAPE] " + page.content()[:4000]


def copilot_fallback_text(prompt: str, timeout: int = 30) -> str:
    """Experimental: try to get an answer from copilot.com with a headless browser.

    WARNING: this is fragile, may break whenever the site changes, and may
    violate the site's terms of service. Enable it only if you understand the
    risks.

    Args:
        prompt: Text typed into the page's textarea.
        timeout: Maximum time in seconds for each Playwright operation
            (navigation, fill, click).

    Returns:
        ``""`` when the fallback is disabled or no input box was found, a
        string starting with ``"[COPILOT SCRAPE]"`` on success, or a
        bracketed ``"[COPILOT FALLBACK ...]"`` message when Playwright is
        missing or the automation failed. Never raises.
    """
    if not ENABLE_COPILOT_FALLBACK:
        return ""
    if not PLAYWRIGHT_AVAILABLE:
        return "[COPILOT FALLBACK UNAVAILABLE: playwright not installed]"
    try:
        with sync_playwright() as pw:
            browser = pw.chromium.launch(headless=True)
            try:
                context = browser.new_context()
                try:
                    page = context.new_page()
                    # Playwright expects milliseconds; this applies the
                    # caller's timeout to navigation and element actions.
                    page.set_default_timeout(timeout * 1000)
                    page.goto("https://copilot.com")
                    time.sleep(2)
                    return _drive_copilot_page(page, prompt)
                finally:
                    context.close()
            finally:
                # Runs even if creating the context or navigating fails.
                browser.close()
    except Exception as e:
        return f"[COPILOT FALLBACK ERROR] {e}"
# -----------------------------
# Router / Dispatcher logic
# -----------------------------
def handle_chat(prompt: str) -> str:
    """Answer ``prompt`` with the HF chat model, using Copilot as a backup.

    The HF answer is returned unless it is an ``[HF ERROR]`` string, empty, or
    shorter than 10 characters after stripping. In that case, and only when
    the fallback is enabled and yields a non-empty result, the fallback text
    is returned instead.
    """
    answer = ask_hf_text(HF_MODEL_CHAT, prompt)
    usable = (
        not answer.startswith("[HF ERROR]")
        and bool(answer)
        and len(answer.strip()) >= 10
    )
    if usable or not ENABLE_COPILOT_FALLBACK:
        return answer
    fallback = copilot_fallback_text(prompt)
    return fallback if fallback else answer
def handle_image(prompt: str) -> str:
    """Generate an image for ``prompt`` with the HF image model.

    Returns:
        The result of ``generate_image_hf``. If that is an
        ``[HF IMAGE ERROR]`` string and the Copilot fallback is enabled, the
        fallback text is returned instead, but only when it is non-empty, so
        the original error message is never replaced by an empty string.
    """
    out = generate_image_hf(HF_MODEL_IMAGE, prompt)
    if out.startswith("[HF IMAGE ERROR]") and ENABLE_COPILOT_FALLBACK:
        # Same rule as handle_chat: keep the HF error when the fallback has nothing.
        fallback = copilot_fallback_text(prompt)
        if fallback:
            return fallback
    return out
def handle_code(prompt: str) -> str:
    """Generate code for ``prompt`` with the HF code model.

    Returns:
        The result of ``ask_hf_code``. If that is an ``[HF CODE ERROR]``
        string and the Copilot fallback is enabled, the fallback text is
        returned instead, but only when it is non-empty, so the original
        error message is never replaced by an empty string.
    """
    out = ask_hf_code(HF_MODEL_CODE, prompt)
    if out.startswith("[HF CODE ERROR]") and ENABLE_COPILOT_FALLBACK:
        # Same rule as handle_chat: keep the HF error when the fallback has nothing.
        fallback = copilot_fallback_text(prompt)
        if fallback:
            return fallback
    return out
# -----------------------------
# Simple in-process rate limiter (sliding 60-second window)
# -----------------------------
class SimpleRateLimiter:
    """Sliding-window limiter: at most ``calls_per_minute`` calls per 60 seconds.

    Timestamps come from ``time.monotonic()`` so that adjustments of the
    system clock can neither block callers nor let extra calls through.
    """

    def __init__(self, calls_per_minute=30):
        self.calls_per_minute = calls_per_minute
        # Guards self.calls against concurrent access.
        self.lock = threading.Lock()
        # Monotonic timestamps of the calls admitted within the last 60 seconds.
        self.calls = []

    def allow(self):
        """Record a call and return True if it fits in the current window, else False."""
        with self.lock:
            now = time.monotonic()
            # Drop timestamps that have left the 60-second window.
            self.calls = [t for t in self.calls if now - t < 60]
            if len(self.calls) < self.calls_per_minute:
                self.calls.append(now)
                return True
            return False
# One limiter instance shared by all UI callbacks: at most 25 admitted calls per minute in total.
RATE_LIMITER = SimpleRateLimiter(calls_per_minute=25)
# -----------------------------
# Gradio UI
# -----------------------------
def build_ui():
    """Build the Gradio interface with Chat, Image and Code tabs.

    Every callback first consults the shared ``RATE_LIMITER`` and then
    delegates to the matching ``handle_*`` function.

    Returns:
        The assembled ``gr.Blocks`` app (not yet launched).
    """
    # Local imports keep this function self-contained; both are stdlib.
    import mimetypes
    import tempfile

    with gr.Blocks(title="Kokbco — Chat + Image + Code") as demo:
        gr.Markdown("# Kokbco — Chat, Image & Code (HF primary, Copilot fallback optional)")
        with gr.Tabs():
            with gr.TabItem("Chat"):
                chat_input = gr.Textbox(label="Prompt", placeholder="از من بپرس...")
                chat_btn = gr.Button("Send")
                chat_output = gr.Textbox(label="Response", lines=10)

                def on_chat(inp):
                    if not RATE_LIMITER.allow():
                        return "Rate limit exceeded. Try again in a moment."
                    return handle_chat(inp)

                chat_btn.click(on_chat, inputs=[chat_input], outputs=[chat_output])

            with gr.TabItem("Image"):
                img_input = gr.Textbox(label="Prompt for image", placeholder="شرح تصویر... e.g. a warrior in sunset")
                img_btn = gr.Button("Generate")
                img_output = gr.Image(type="numpy", label="Result")

                def on_image(inp):
                    if not RATE_LIMITER.allow():
                        # None leaves the image empty; "" is not a valid image value.
                        return None
                    res = handle_image(inp)
                    # Anything that is not a data URI (error text, scrape) cannot be shown.
                    if not (isinstance(res, str) and res.startswith("data:")):
                        return None
                    header, b64 = res.split(",", 1)
                    # header looks like "data:<mime>[;params];base64".
                    mime = header[len("data:"):].split(";", 1)[0].strip()
                    suffix = mimetypes.guess_extension(mime) or ".png"
                    # A unique file per request: a fixed path would let concurrent
                    # requests overwrite each other's image. The file is left on
                    # disk so Gradio can read it after this callback returns.
                    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
                        f.write(base64.b64decode(b64))
                    return f.name

                img_btn.click(on_image, inputs=[img_input], outputs=[img_output])

            with gr.TabItem("Code"):
                code_input = gr.Textbox(label="Code prompt", placeholder="Write a python function that...")
                code_btn = gr.Button("Generate Code")
                code_output = gr.Code(label="Generated Code", language="python")

                def on_code(inp):
                    if not RATE_LIMITER.allow():
                        return "# Rate limit exceeded"
                    return handle_code(inp)

                code_btn.click(on_code, inputs=[code_input], outputs=[code_output])

        gr.Markdown("---\n**Notes:** Make sure HF_TOKEN + model names are set in Space secrets. Enable Copilot fallback only if you understand risks.")
    return demo
# Build the UI at import time so the module exposes a ready-made `app` object.
app = build_ui()
if __name__ == "__main__":
    # Start the Gradio server only when this file is run as a script.
    app.launch()
|