# Kokbco / app.py
# Uploaded by Hosdroid — commit "Create app.py" (b2c0149, verified), 14.3 kB
"""
Kokbco - Hugging Face Space app.py
Chat + Image + Code (Hugging Face Inference API as primary) + optional Copilot.com fallback
راهنما (خلاصه):
- این فایل برای اجرا در یک Hugging Face Space با SDK = Gradio طراحی شده.
- در Secrets فضای HF این مقادیر را اضافه کن:
- HF_TOKEN: توکن HuggingFace (required)
- HF_MODEL_CHAT: مدل متن برای چت (مثال: "meta-llama/Llama-2-7b-chat" یا مدل سبک‌تر)
- HF_MODEL_IMAGE: مدل تصویر (مثال: "stabilityai/stable-diffusion-2")
- HF_MODEL_CODE: مدل مخصوص کدنویسی (مثال: "bigcode/starcoder")
- ENABLE_COPILOT_FALLBACK: "1" یا "0" (اختیاری — فعال‌سازی ماژول fallback به Copilot)
- COPILOT_EMAIL / COPILOT_PASSWORD (در صورت نیاز برای fallback با مرورگر؛ **بسیار** اختیاری)
- نکته مهم: fallback به Copilot.com از طریق scraping / browser automation پیاده شده تا وقتی HF لیمیت داشت، تلاش کند. این بخش آزمایشی و نیاز به Playwright/Selenium دارد و ممکن است به دلیل تغییرات سایت یا محدودیت‌ها کار نکند.
- فایل requirements.txt پیشنهادی:
gradio
requests
cachetools
playwright (اختیاری برای fallback)
دو بخش مهم:
1) API primary: Hugging Face Inference API (requests)
2) fallback: experimental Playwright automation برای copilot.com
اجرا: این Space با Gradio اجرا می‌شود. قبل از push به Space، حتما HF_TOKEN را در Secrets بذار.
"""
import os
import time
import json
import base64
import threading
from typing import Optional
import requests
from cachetools import TTLCache
# try to import playwright only if fallback enabled (optional)
try:
from playwright.sync_api import sync_playwright
PLAYWRIGHT_AVAILABLE = True
except Exception:
PLAYWRIGHT_AVAILABLE = False
# Gradio for UI
import gradio as gr
# -----------------------------
# Configuration (via env / secrets)
# -----------------------------
# Hugging Face access token; an empty string means "not configured".
HF_TOKEN = os.getenv("HF_TOKEN", "")

# Model identifiers per task; each can be overridden through the environment.
HF_MODEL_CHAT = os.getenv("HF_MODEL_CHAT", "tiiuae/falcon-7b-instruct")  # replace with your model
HF_MODEL_IMAGE = os.getenv("HF_MODEL_IMAGE", "runwayml/stable-diffusion-v1-5")
HF_MODEL_CODE = os.getenv("HF_MODEL_CODE", "bigcode/starcoder")

# The Copilot fallback is opt-in: only the exact value "1" turns it on.
ENABLE_COPILOT_FALLBACK = os.getenv("ENABLE_COPILOT_FALLBACK", "0") == "1"

# Optional credentials for the browser fallback; None when unset.
COPILOT_EMAIL = os.getenv("COPILOT_EMAIL")
COPILOT_PASSWORD = os.getenv("COPILOT_PASSWORD")

# Authorization header reused by every Inference API call; empty without a token.
HF_HEADERS = {} if not HF_TOKEN else {"Authorization": "Bearer " + HF_TOKEN}
# In-memory response caches, one per task. Entries expire after the given
# time-to-live (in seconds) and each cache is capped at `maxsize` entries.
TEXT_CACHE = TTLCache(ttl=5 * 60, maxsize=1024)   # chat replies: 5 minutes
IMAGE_CACHE = TTLCache(ttl=60 * 60, maxsize=256)  # generated images: 1 hour
CODE_CACHE = TTLCache(ttl=10 * 60, maxsize=512)   # generated code: 10 minutes
# -----------------------------
# Helper: Hugging Face Inference Calls
# -----------------------------
def call_hf_model(model: str, payload: dict, stream: bool = False, timeout: int = 60):
    """Send ``payload`` to the Hugging Face Inference API endpoint of ``model``.

    Args:
        model: Model identifier appended to the inference URL.
        payload: JSON-serialisable request body.
        stream: Forwarded to ``requests.post``; when true the response body is
            not downloaded eagerly and the caller must consume it.
        timeout: Request timeout in seconds, forwarded to ``requests.post``.

    Returns:
        The raw ``requests.Response``; the caller decides how to parse it.

    Raises:
        RuntimeError: If ``HF_TOKEN`` is not configured.
        requests.HTTPError: If the API answers with an error status.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN is not set in environment/secrets")

    url = f"https://api-inference.huggingface.co/models/{model}"
    # Build a fresh dict so the shared module-level headers are never mutated.
    headers = {**HF_HEADERS, "Accept": "application/json"}
    # `stream` used to be accepted but silently ignored; pass it through.
    resp = requests.post(
        url,
        headers=headers,
        json=payload,
        stream=stream,
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp
def ask_hf_text(model: str, prompt: str, max_new_tokens: int = 512) -> str:
    """Generate text for ``prompt`` with ``model``, using ``TEXT_CACHE``.

    Args:
        model: Model identifier passed to ``call_hf_model``.
        prompt: Input text sent as ``inputs``.
        max_new_tokens: Generation limit sent in ``parameters``.

    Returns:
        The generated text, or a string starting with ``"[HF ERROR]"`` when the
        call or the response parsing fails. Error strings are not cached, so a
        transient failure is retried on the next call.
    """
    # max_new_tokens is part of the key: the same prompt with a different
    # limit must not be answered from a result produced under another limit.
    key = f"text::{model}::{max_new_tokens}::{prompt}"
    # Single lookup: an `in` check followed by indexing could hit an entry
    # that expires between the two operations.
    cached = TEXT_CACHE.get(key)
    if cached is not None:
        return cached

    payload = {
        "inputs": prompt,
        "options": {"wait_for_model": True},
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        data = call_hf_model(model, payload).json()
        # Response shapes differ between models; handle the common ones.
        if isinstance(data, dict) and "error" in data:
            raise RuntimeError(data.get("error"))
        if isinstance(data, list):
            # Take the first item carrying "generated_text", if any.
            text = next(
                (
                    item["generated_text"]
                    for item in data
                    if isinstance(item, dict) and "generated_text" in item
                ),
                None,
            )
            if text is None:
                # No recognised key: fall back to a textual dump of the items.
                text = "\n".join(str(x) for x in data)
        elif isinstance(data, dict) and "generated_text" in data:
            text = data["generated_text"]
        else:
            text = str(data)
    except Exception as e:
        # Deliberately broad: callers detect failure via the "[HF ERROR]"
        # prefix instead of handling exceptions. Returned without caching.
        return f"[HF ERROR] {e}"

    TEXT_CACHE[key] = text
    return text
def generate_image_hf(model: str, prompt: str) -> str:
    """Generate an image for ``prompt`` and return it as a base64 data URI.

    Successful results are stored in ``IMAGE_CACHE``. When the response holds
    no recognisable image, a string prefixed with
    ``"[IMAGE_RESPONSE_NOT_BINARY] "`` is returned; when the request or parsing
    raises, a string prefixed with ``"[HF IMAGE ERROR] "`` is returned. Neither
    of those is cached.
    """
    cache_key = f"image::{model}::{prompt}"
    if cache_key in IMAGE_CACHE:
        return IMAGE_CACHE[cache_key]

    request_body = {"inputs": prompt, "options": {"wait_for_model": True}}
    try:
        response = call_hf_model(model, request_body)

        # Case 1: the endpoint answered with raw image bytes.
        mime = response.headers.get("Content-Type", "")
        if mime.startswith("image/"):
            encoded = base64.b64encode(response.content).decode()
            uri = f"data:{mime};base64,{encoded}"
            IMAGE_CACHE[cache_key] = uri
            return uri

        # Case 2: a JSON body that may carry the image as base64 text.
        parsed = response.json()
        encoded = None
        if isinstance(parsed, dict):
            for field in ("image_base64", "images", "data"):
                if field not in parsed:
                    continue
                candidate = parsed[field]
                if isinstance(candidate, str):
                    encoded = candidate
                    break
                if (
                    isinstance(candidate, list)
                    and len(candidate) > 0
                    and isinstance(candidate[0], str)
                ):
                    encoded = candidate[0]
                    break

        if encoded:
            uri = f"data:image/png;base64,{encoded}"
            IMAGE_CACHE[cache_key] = uri
            return uri

        # Nothing usable: hand back a truncated dump of the JSON for debugging.
        return "[IMAGE_RESPONSE_NOT_BINARY] " + json.dumps(parsed)[:2000]
    except Exception as exc:
        return f"[HF IMAGE ERROR] {exc}"
def ask_hf_code(model: str, prompt: str, max_new_tokens: int = 512) -> str:
    """Generate code for ``prompt`` with ``model``, using ``CODE_CACHE``.

    Args:
        model: Model identifier passed to ``call_hf_model``.
        prompt: Input text sent as ``inputs``.
        max_new_tokens: Generation limit sent in ``parameters``.

    Returns:
        The generated code, or a string starting with ``"[HF CODE ERROR]"``
        when the call or the response parsing fails. Error strings are not
        cached, so a transient failure is retried on the next call.
    """
    # Include max_new_tokens so results produced under different limits do
    # not shadow each other in the cache.
    key = f"code::{model}::{max_new_tokens}::{prompt}"
    cached = CODE_CACHE.get(key)
    if cached is not None:
        return cached

    payload = {
        "inputs": prompt,
        "options": {"wait_for_model": True},
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        data = call_hf_model(model, payload).json()
        # Same convention as ask_hf_text: an "error" payload is a failure,
        # not generated output.
        if isinstance(data, dict) and "error" in data:
            raise RuntimeError(data.get("error"))
        if isinstance(data, list) and not data:
            # Explicit message instead of an opaque IndexError from data[0].
            raise RuntimeError("empty response from model")
        if isinstance(data, list) and isinstance(data[0], dict) and "generated_text" in data[0]:
            code = data[0]["generated_text"]
        elif isinstance(data, dict) and "generated_text" in data:
            code = data["generated_text"]
        else:
            code = str(data)
    except Exception as e:
        # Deliberately broad: callers detect failure via the prefix.
        return f"[HF CODE ERROR] {e}"

    CODE_CACHE[key] = code
    return code
# -----------------------------
# Optional Copilot fallback (experimental)
# -----------------------------
def copilot_fallback_text(prompt: str, timeout: int = 30) -> str:
    """Experimental: try to get an answer from Copilot.com through a headless browser.

    WARNING: This is fragile, may break if Copilot changes its UI, and may
    violate Copilot's terms of service. Only enable it if you understand the
    risks and have credentials if needed.

    Args:
        prompt: Text typed into the first ``textarea`` found on the page.
        timeout: Seconds allowed for the initial page navigation (converted to
            the milliseconds Playwright expects).

    Returns:
        ``""`` when the fallback is disabled or no ``textarea`` is found, a
        ``"[COPILOT SCRAPE] "``-prefixed slice of the page HTML on success, or
        a bracketed diagnostic string when Playwright is missing or an error
        occurs.
    """
    if not ENABLE_COPILOT_FALLBACK:
        return ""
    if not PLAYWRIGHT_AVAILABLE:
        return "[COPILOT FALLBACK UNAVAILABLE: playwright not installed]"

    try:
        with sync_playwright() as pw:
            browser = pw.chromium.launch(headless=True)
            # Nested try/finally: the browser is closed even when creating or
            # closing the context fails.
            try:
                context = browser.new_context()
                try:
                    page = context.new_page()
                    # Honour the caller's timeout (previously unused).
                    page.goto("https://copilot.com", timeout=timeout * 1000)
                    time.sleep(2)

                    # NOTE: the selectors below are placeholders. Inspect
                    # copilot.com and adapt them; this flow is intentionally
                    # generic and may need manual tweaks.
                    if COPILOT_EMAIL and COPILOT_PASSWORD:
                        # Example login flow; likely needs edits for the real site.
                        if page.query_selector('input[type="email"]'):
                            page.fill('input[type="email"]', COPILOT_EMAIL)
                            page.click('button[type="submit"]')
                            time.sleep(1)
                        if page.query_selector('input[type="password"]'):
                            page.fill('input[type="password"]', COPILOT_PASSWORD)
                            page.click('button[type="submit"]')
                            time.sleep(3)

                    # The real chat-input selector must be discovered by the developer.
                    if page.query_selector('textarea'):
                        page.fill('textarea', prompt)
                        page.keyboard.press('Enter')
                        # Naive fixed wait for the answer to render.
                        time.sleep(5)
                        # Naive capture: the whole page HTML, truncated.
                        content = page.content()
                        return "[COPILOT SCRAPE] " + content[:4000]
                finally:
                    context.close()
            finally:
                browser.close()
    except Exception as e:
        # Best-effort feature: report the failure as text instead of raising.
        return f"[COPILOT FALLBACK ERROR] {e}"
    return ""
# -----------------------------
# Router / Dispatcher logic
# -----------------------------
def handle_chat(prompt: str) -> str:
    """Answer ``prompt`` with the HF chat model, optionally falling back to Copilot.

    The HF reply is considered unusable when it starts with ``"[HF ERROR]"``,
    is empty, or has fewer than 10 characters after stripping whitespace. In
    that case, and only if the fallback is enabled and yields a non-empty
    string, the fallback text is returned; otherwise the HF reply is returned
    unchanged.
    """
    reply = ask_hf_text(HF_MODEL_CHAT, prompt)

    usable = (
        not reply.startswith("[HF ERROR]")
        and bool(reply)
        and len(reply.strip()) >= 10
    )
    if usable or not ENABLE_COPILOT_FALLBACK:
        return reply

    fallback_reply = copilot_fallback_text(prompt)
    return fallback_reply if fallback_reply else reply
def handle_image(prompt: str) -> str:
    """Generate an image for ``prompt`` with the HF image model.

    Returns the string produced by ``generate_image_hf``. When that string
    starts with ``"[HF IMAGE ERROR]"`` and the Copilot fallback is enabled, the
    fallback text is returned instead, provided it is non-empty.
    """
    out = generate_image_hf(HF_MODEL_IMAGE, prompt)
    # If HF returned error text, try Copilot (unlikely to help for images).
    if out.startswith("[HF IMAGE ERROR]") and ENABLE_COPILOT_FALLBACK:
        fb = copilot_fallback_text(prompt)
        # As in handle_chat: keep the HF error when the fallback gives nothing,
        # rather than returning an empty string.
        if fb:
            return fb
    return out
def handle_code(prompt: str) -> str:
    """Generate code for ``prompt`` with the HF code model.

    Returns the string produced by ``ask_hf_code``. When that string starts
    with ``"[HF CODE ERROR]"`` and the Copilot fallback is enabled, the
    fallback text is returned instead, provided it is non-empty.
    """
    out = ask_hf_code(HF_MODEL_CODE, prompt)
    if out.startswith("[HF CODE ERROR]") and ENABLE_COPILOT_FALLBACK:
        fb = copilot_fallback_text(prompt)
        # As in handle_chat: an empty fallback must not hide the HF error.
        if fb:
            return fb
    return out
# -----------------------------
# Background: Simple rate-limiter using tokens
# -----------------------------
class SimpleRateLimiter:
    """Sliding-window limiter: at most ``calls_per_minute`` calls per window.

    Args:
        calls_per_minute: Maximum number of allowed calls inside one window.
        window_seconds: Length of the window in seconds (one minute by default).
    """

    def __init__(self, calls_per_minute=30, window_seconds=60.0):
        self.calls_per_minute = calls_per_minute
        self.window_seconds = window_seconds
        # Guards `calls`, which is read and rewritten on every allow() call.
        self.lock = threading.Lock()
        # Monotonic timestamps of the calls accepted inside the current window.
        self.calls = []

    def allow(self):
        """Record a call and return True if the limit permits it, else False."""
        with self.lock:
            # Monotonic clock: a wall-clock adjustment (NTP, manual change)
            # must neither extend nor reset the window.
            now = time.monotonic()
            # Drop timestamps that have left the window.
            self.calls = [t for t in self.calls if now - t < self.window_seconds]
            if len(self.calls) < self.calls_per_minute:
                self.calls.append(now)
                return True
            return False


# Single limiter instance shared by all UI handlers.
RATE_LIMITER = SimpleRateLimiter(calls_per_minute=25)
# -----------------------------
# Gradio UI
# -----------------------------
def build_ui():
    """Build the Gradio ``Blocks`` app with Chat, Image and Code tabs.

    Every handler first consults the shared ``RATE_LIMITER``; when the limit is
    hit, the chat and code tabs show a short message and the image tab shows
    no image.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    # Local imports: only the image handler below needs these modules.
    import mimetypes
    import tempfile

    with gr.Blocks(title="Kokbco — Chat + Image + Code") as demo:
        gr.Markdown("# Kokbco — Chat, Image & Code (HF primary, Copilot fallback optional)")
        with gr.Tabs():
            with gr.TabItem("Chat"):
                chat_input = gr.Textbox(label="Prompt", placeholder="از من بپرس...")
                chat_btn = gr.Button("Send")
                chat_output = gr.Textbox(label="Response", lines=10)

                def on_chat(inp):
                    if not RATE_LIMITER.allow():
                        return "Rate limit exceeded. Try again in a moment."
                    return handle_chat(inp)

                chat_btn.click(on_chat, inputs=[chat_input], outputs=[chat_output])

            with gr.TabItem("Image"):
                img_input = gr.Textbox(label="Prompt for image", placeholder="شرح تصویر... e.g. a warrior in sunset")
                img_btn = gr.Button("Generate")
                img_output = gr.Image(type="numpy", label="Result")

                def on_image(inp):
                    # None means "no image"; an empty string is not a valid
                    # file path for the Image output.
                    if not RATE_LIMITER.allow():
                        return None
                    res = handle_image(inp)
                    # Only data URIs can be displayed; error text yields no image.
                    if not (isinstance(res, str) and res.startswith("data:")):
                        return None
                    header, sep, b64 = res.partition(",")
                    if not sep:
                        return None
                    # header looks like "data:<mime>;base64"; use the MIME type
                    # to pick a file extension, defaulting to PNG.
                    mime = header[len("data:"):].split(";", 1)[0]
                    suffix = mimetypes.guess_extension(mime) or ".png"
                    # A unique file per request: a fixed path would let
                    # concurrent requests overwrite each other's image. The
                    # file is left on disk for Gradio to read after we return.
                    with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f:
                        f.write(base64.b64decode(b64))
                    return f.name

                img_btn.click(on_image, inputs=[img_input], outputs=[img_output])

            with gr.TabItem("Code"):
                code_input = gr.Textbox(label="Code prompt", placeholder="Write a python function that...")
                code_btn = gr.Button("Generate Code")
                code_output = gr.Code(label="Generated Code", language="python")

                def on_code(inp):
                    if not RATE_LIMITER.allow():
                        return "# Rate limit exceeded"
                    return handle_code(inp)

                code_btn.click(on_code, inputs=[code_input], outputs=[code_output])

        gr.Markdown("---\n**Notes:** Make sure HF_TOKEN + model names are set in Space secrets. Enable Copilot fallback only if you understand risks.")
    return demo
# The interface is built at import time and exposed as the module-level `app`.
app = build_ui()


if __name__ == "__main__":
    # Running the file directly starts the Gradio server.
    app.launch()