| """ |
| Kokbco - Hugging Face Space app.py |
| Chat + Image + Code (Hugging Face Inference API as primary) + optional Copilot.com fallback |
| |
| راهنما (خلاصه): |
| - این فایل برای اجرا در یک Hugging Face Space با SDK = Gradio طراحی شده. |
| - در Secrets فضای HF این مقادیر را اضافه کن: |
| - HF_TOKEN: توکن HuggingFace (required) |
| - HF_MODEL_CHAT: مدل متن برای چت (مثال: "meta-llama/Llama-2-7b-chat" یا مدل سبکتر) |
| - HF_MODEL_IMAGE: مدل تصویر (مثال: "stabilityai/stable-diffusion-2") |
| - HF_MODEL_CODE: مدل مخصوص کدنویسی (مثال: "bigcode/starcoder") |
| - ENABLE_COPILOT_FALLBACK: "1" یا "0" (اختیاری — فعالسازی ماژول fallback به Copilot) |
| - COPILOT_EMAIL / COPILOT_PASSWORD (در صورت نیاز برای fallback با مرورگر؛ **بسیار** اختیاری) |
| |
| - نکته مهم: fallback به Copilot.com از طریق scraping / browser automation پیاده شده تا وقتی HF لیمیت داشت، تلاش کند. این بخش آزمایشی و نیاز به Playwright/Selenium دارد و ممکن است به دلیل تغییرات سایت یا محدودیتها کار نکند. |
| |
| - فایل requirements.txt پیشنهادی: |
| gradio |
| requests |
| cachetools |
| playwright (اختیاری برای fallback) |
| |
| دو بخش مهم: |
| 1) API primary: Hugging Face Inference API (requests) |
| 2) fallback: experimental Playwright automation برای copilot.com |
| |
| اجرا: این Space با Gradio اجرا میشود. قبل از push به Space، حتما HF_TOKEN را در Secrets بذار. |
| |
| """ |
|
|
| import os |
| import time |
| import json |
| import base64 |
| import threading |
| from typing import Optional |
|
|
| import requests |
| from cachetools import TTLCache |
|
|
| |
| try: |
| from playwright.sync_api import sync_playwright |
| PLAYWRIGHT_AVAILABLE = True |
| except Exception: |
| PLAYWRIGHT_AVAILABLE = False |
|
|
| |
| import gradio as gr |
|
|
| |
| |
| |
# --- Runtime configuration, read once from the environment / Space secrets ---

# Hugging Face access token; an empty string means "not configured".
HF_TOKEN = os.getenv("HF_TOKEN", "")

# Model identifiers, each overridable through its environment variable.
HF_MODEL_CHAT = os.getenv("HF_MODEL_CHAT", "tiiuae/falcon-7b-instruct")
HF_MODEL_IMAGE = os.getenv("HF_MODEL_IMAGE", "runwayml/stable-diffusion-v1-5")
HF_MODEL_CODE = os.getenv("HF_MODEL_CODE", "bigcode/starcoder")

# The Copilot fallback is opt-in: only the exact value "1" switches it on.
ENABLE_COPILOT_FALLBACK = "1" == os.getenv("ENABLE_COPILOT_FALLBACK", "0")
COPILOT_EMAIL = os.getenv("COPILOT_EMAIL")
COPILOT_PASSWORD = os.getenv("COPILOT_PASSWORD")

# Authorization header for Inference API calls; stays empty without a token.
HF_HEADERS = {}
if HF_TOKEN:
    HF_HEADERS["Authorization"] = f"Bearer {HF_TOKEN}"
|
|
| |
# In-memory response caches, one per feature. ``ttl`` is in seconds.
TEXT_CACHE = TTLCache(maxsize=1024, ttl=300)  # 5 minutes
IMAGE_CACHE = TTLCache(maxsize=256, ttl=3600)  # 1 hour
CODE_CACHE = TTLCache(maxsize=512, ttl=600)  # 10 minutes
|
|
| |
| |
| |
|
|
def call_hf_model(model: str, payload: dict, stream: bool = False, timeout: int = 60):
    """Send ``payload`` to the Hugging Face Inference API endpoint for ``model``.

    Args:
        model: Model repository id, interpolated into the endpoint URL.
        payload: JSON-serialisable request body.
        stream: Forwarded to ``requests.post``. When true, the response body
            is not downloaded until the caller reads it.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        The raw ``requests.Response``; the caller decides how to parse it.

    Raises:
        RuntimeError: If ``HF_TOKEN`` is not configured.
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: For connection problems or timeouts.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN is not set in environment/secrets")
    url = f"https://api-inference.huggingface.co/models/{model}"
    # Build a fresh dict so the shared HF_HEADERS mapping is never mutated.
    headers = {**HF_HEADERS, "Accept": "application/json"}
    # ``stream`` used to be accepted but silently ignored; honour it now.
    resp = requests.post(
        url,
        headers=headers,
        json=payload,
        stream=stream,
        timeout=timeout,
    )
    resp.raise_for_status()
    return resp
|
|
|
|
def ask_hf_text(model: str, prompt: str, max_new_tokens: int = 512) -> str:
    """Generate text for ``prompt`` with ``model`` via the HF Inference API.

    Successful answers are memoised in ``TEXT_CACHE``. Failures are NOT
    cached, so a transient API error does not keep being replayed until the
    cache entry expires.

    Args:
        model: Model repository id passed to ``call_hf_model``.
        prompt: Input text sent as ``inputs``.
        max_new_tokens: Generation length limit sent in ``parameters``.

    Returns:
        The generated text, or a string starting with ``"[HF ERROR] "`` when
        the request or the response parsing failed.
    """
    # ``max_new_tokens`` is part of the key: a different length limit must
    # not be answered from an entry produced with another limit.
    key = f"text::{model}::{max_new_tokens}::{prompt}"
    cached = TEXT_CACHE.get(key)
    if cached is not None:
        return cached

    payload = {
        "inputs": prompt,
        "options": {"wait_for_model": True},
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        data = call_hf_model(model, payload).json()

        # The API may report an error inside the JSON body.
        if isinstance(data, dict) and "error" in data:
            raise RuntimeError(data.get("error"))

        if isinstance(data, list):
            # Take the first element that carries a "generated_text" field.
            text = next(
                (
                    item["generated_text"]
                    for item in data
                    if isinstance(item, dict) and "generated_text" in item
                ),
                None,
            )
            if text is None:
                # Unknown list shape: fall back to a plain rendering.
                text = "\n".join(str(x) for x in data)
        elif isinstance(data, dict) and "generated_text" in data:
            text = data["generated_text"]
        else:
            text = str(data)
    except Exception as e:
        # Boundary handler: callers expect a string, never an exception.
        return f"[HF ERROR] {e}"

    TEXT_CACHE[key] = text
    return text
|
|
|
|
def generate_image_hf(model: str, prompt: str) -> str:
    """Generate an image for ``prompt`` and return it as a base64 data URI.

    Results that yield an image are stored in ``IMAGE_CACHE``. When the
    response carries no usable image, a diagnostic string starting with
    ``"[IMAGE_RESPONSE_NOT_BINARY] "`` is returned; any exception is turned
    into a string starting with ``"[HF IMAGE ERROR] "``. Neither diagnostic
    is cached.
    """
    cache_key = f"image::{model}::{prompt}"
    if cache_key in IMAGE_CACHE:
        return IMAGE_CACHE[cache_key]

    request_body = {"inputs": prompt, "options": {"wait_for_model": True}}
    try:
        response = call_hf_model(model, request_body)
        mime = response.headers.get("Content-Type", "")

        if mime.startswith("image/"):
            # Binary image body: embed the raw bytes directly.
            encoded = base64.b64encode(response.content).decode()
            uri = f"data:{mime};base64,{encoded}"
        else:
            # Otherwise expect JSON that holds base64 under a known key.
            body = response.json()
            encoded = None
            if isinstance(body, dict):
                for field in ("image_base64", "images", "data"):
                    if field not in body:
                        continue
                    candidate = body[field]
                    if isinstance(candidate, list):
                        if not candidate:
                            continue
                        # Only the first list element is considered.
                        candidate = candidate[0]
                    if isinstance(candidate, str):
                        encoded = candidate
                        break
            if not encoded:
                return "[IMAGE_RESPONSE_NOT_BINARY] " + json.dumps(body)[:2000]
            uri = f"data:image/png;base64,{encoded}"

        IMAGE_CACHE[cache_key] = uri
        return uri
    except Exception as exc:
        return f"[HF IMAGE ERROR] {exc}"
|
|
|
|
def ask_hf_code(model: str, prompt: str, max_new_tokens: int = 512) -> str:
    """Generate code for ``prompt`` with ``model`` via the HF Inference API.

    Successful answers are memoised in ``CODE_CACHE``; failures are not
    cached, so a retry reaches the API again.

    Args:
        model: Model repository id passed to ``call_hf_model``.
        prompt: Input text sent as ``inputs``.
        max_new_tokens: Generation length limit sent in ``parameters``.

    Returns:
        The generated code, or a string starting with ``"[HF CODE ERROR] "``
        when the request failed or the API reported an error in its body.
    """
    # Include the length limit so different limits do not share an entry.
    key = f"code::{model}::{max_new_tokens}::{prompt}"
    cached = CODE_CACHE.get(key)
    if cached is not None:
        return cached

    payload = {
        "inputs": prompt,
        "options": {"wait_for_model": True},
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        data = call_hf_model(model, payload).json()

        # Flag API-reported errors with the error prefix, the same way
        # ask_hf_text does, so handle_code can trigger its fallback.
        if isinstance(data, dict) and "error" in data:
            raise RuntimeError(data.get("error"))

        # ``data and ...`` guards against an empty list, which previously
        # raised IndexError on ``data[0]``.
        if (
            isinstance(data, list)
            and data
            and isinstance(data[0], dict)
            and "generated_text" in data[0]
        ):
            code = data[0]["generated_text"]
        elif isinstance(data, dict) and "generated_text" in data:
            code = data["generated_text"]
        else:
            code = str(data)
    except Exception as e:
        # Boundary handler: callers expect a string, never an exception.
        return f"[HF CODE ERROR] {e}"

    CODE_CACHE[key] = code
    return code
|
|
| |
| |
| |
|
|
def copilot_fallback_text(prompt: str, timeout: int = 30) -> str:
    """Experimental: query copilot.com through a headless Chromium browser.

    WARNING: This is fragile, may break whenever the site changes its UI, and
    may violate the site's terms of service. Only enable it if you understand
    the risks and, where needed, have credentials.

    Args:
        prompt: Text typed into the first ``textarea`` found on the page.
        timeout: Per-operation Playwright timeout in seconds (navigation,
            fill, click). Previously this parameter was accepted but unused.

    Returns:
        ``""`` when the fallback is disabled or no ``textarea`` was found;
        a ``"[COPILOT FALLBACK UNAVAILABLE: ...]"`` string when Playwright is
        not installed; ``"[COPILOT SCRAPE] "`` followed by up to 4000
        characters of page HTML on success; or a string starting with
        ``"[COPILOT FALLBACK ERROR] "`` if anything raised.
    """
    if not ENABLE_COPILOT_FALLBACK:
        return ""
    if not PLAYWRIGHT_AVAILABLE:
        return "[COPILOT FALLBACK UNAVAILABLE: playwright not installed]"

    try:
        with sync_playwright() as pw:
            browser = pw.chromium.launch(headless=True)
            context = None
            try:
                context = browser.new_context()
                # Playwright expects milliseconds; applies to every call
                # below that accepts a timeout, including navigation.
                context.set_default_timeout(timeout * 1000)
                page = context.new_page()
                page.goto("https://copilot.com")
                time.sleep(2)

                # Best-effort login, only when both credentials are set and
                # the corresponding input fields are actually present.
                if COPILOT_EMAIL and COPILOT_PASSWORD:
                    if page.query_selector('input[type="email"]'):
                        page.fill('input[type="email"]', COPILOT_EMAIL)
                        page.click('button[type="submit"]')
                        time.sleep(1)
                    if page.query_selector('input[type="password"]'):
                        page.fill('input[type="password"]', COPILOT_PASSWORD)
                        page.click('button[type="submit"]')
                        time.sleep(3)

                if page.query_selector('textarea'):
                    page.fill('textarea', prompt)
                    page.keyboard.press('Enter')
                    # Give the page a moment to render a reply.
                    time.sleep(5)
                    # NOTE(review): this returns raw page HTML, not an
                    # extracted answer; selectors for the reply are unknown.
                    content = page.content()
                    return "[COPILOT SCRAPE] " + content[:4000]
            finally:
                # Cleanup now also covers failures during setup/navigation.
                if context is not None:
                    context.close()
                browser.close()
    except Exception as e:
        return f"[COPILOT FALLBACK ERROR] {e}"
    return ""
|
|
| |
| |
| |
|
|
def handle_chat(prompt: str) -> str:
    """Answer a chat prompt, preferring Hugging Face over the Copilot fallback.

    The HF answer is treated as unusable when it starts with ``"[HF ERROR]"``
    or has fewer than 10 non-whitespace-trimmed characters. In that case, and
    only if ``ENABLE_COPILOT_FALLBACK`` is set, the Copilot fallback is tried.

    A fallback result that is itself a diagnostic (``"[COPILOT FALLBACK ..."``)
    is discarded, so a short but valid HF answer is no longer replaced by a
    fallback error message.

    Returns:
        The fallback text when it produced real content, otherwise the HF
        output (which may be an ``"[HF ERROR] ..."`` string).
    """
    out = ask_hf_text(HF_MODEL_CHAT, prompt)
    # An empty string is covered by the length check.
    looks_unusable = out.startswith("[HF ERROR]") or len(out.strip()) < 10
    if looks_unusable and ENABLE_COPILOT_FALLBACK:
        fb = copilot_fallback_text(prompt)
        if fb and not fb.startswith("[COPILOT FALLBACK"):
            return fb
    return out
|
|
|
|
def handle_image(prompt: str) -> str:
    """Generate an image for ``prompt`` with the configured HF image model.

    Returns the result of ``generate_image_hf``. If that result starts with
    ``"[HF IMAGE ERROR]"`` and the Copilot fallback is enabled, the output of
    ``copilot_fallback_text`` is returned instead.
    """
    result = generate_image_hf(HF_MODEL_IMAGE, prompt)
    hf_failed = result.startswith("[HF IMAGE ERROR]")
    if not (hf_failed and ENABLE_COPILOT_FALLBACK):
        return result
    return copilot_fallback_text(prompt)
|
|
|
|
def handle_code(prompt: str) -> str:
    """Generate code for ``prompt`` with the configured HF code model.

    Returns the result of ``ask_hf_code``. If that result starts with
    ``"[HF CODE ERROR]"`` and the Copilot fallback is enabled, the output of
    ``copilot_fallback_text`` is returned instead.
    """
    generated = ask_hf_code(HF_MODEL_CODE, prompt)
    hf_failed = generated.startswith("[HF CODE ERROR]")
    if not (hf_failed and ENABLE_COPILOT_FALLBACK):
        return generated
    return copilot_fallback_text(prompt)
|
|
| |
| |
| |
class SimpleRateLimiter:
    """Sliding-window limiter allowing ``calls_per_minute`` calls per 60 s.

    Timestamps of accepted calls are kept in ``self.calls``; access is
    serialised with ``self.lock`` so ``allow`` can be called from several
    threads.
    """

    def __init__(self, calls_per_minute: int = 30) -> None:
        self.calls_per_minute = calls_per_minute
        self.lock = threading.Lock()
        # Monotonic timestamps of the calls accepted within the window.
        self.calls: list = []

    def allow(self) -> bool:
        """Record a call and return True if it fits in the current window.

        Returns False, without recording anything, when the window is full.
        """
        with self.lock:
            # Use the monotonic clock: wall-clock adjustments (NTP, manual
            # changes) must not stretch or shrink the 60-second window.
            now = time.monotonic()
            # Drop timestamps that have left the window.
            self.calls = [t for t in self.calls if now - t < 60]
            if len(self.calls) < self.calls_per_minute:
                self.calls.append(now)
                return True
            return False


# Process-wide limiter shared by all UI handlers.
RATE_LIMITER = SimpleRateLimiter(calls_per_minute=25)
|
|
| |
| |
| |
|
|
def build_ui():
    """Build and return the Gradio ``Blocks`` app with Chat, Image and Code tabs.

    Every handler first consults the shared ``RATE_LIMITER`` and then
    delegates to ``handle_chat`` / ``handle_image`` / ``handle_code``.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    # Local import: only the image handler below needs it.
    import tempfile

    # Map the data-URI MIME type to a file suffix; anything else gets ".png".
    suffix_by_mime = {
        "image/jpeg": ".jpg",
        "image/webp": ".webp",
        "image/gif": ".gif",
    }

    with gr.Blocks(title="Kokbco — Chat + Image + Code") as demo:
        gr.Markdown("# Kokbco — Chat, Image & Code (HF primary, Copilot fallback optional)")
        with gr.Tabs():
            with gr.TabItem("Chat"):
                chat_input = gr.Textbox(label="Prompt", placeholder="از من بپرس...")
                chat_btn = gr.Button("Send")
                chat_output = gr.Textbox(label="Response", lines=10)

                def on_chat(inp):
                    if not RATE_LIMITER.allow():
                        return "Rate limit exceeded. Try again in a moment."
                    return handle_chat(inp)

                chat_btn.click(on_chat, inputs=[chat_input], outputs=[chat_output])

            with gr.TabItem("Image"):
                img_input = gr.Textbox(label="Prompt for image", placeholder="شرح تصویر... e.g. a warrior in sunset")
                img_btn = gr.Button("Generate")
                img_output = gr.Image(type="numpy", label="Result")

                def on_image(inp):
                    """Return a path to the generated image, or None if there is none."""
                    if not RATE_LIMITER.allow():
                        # None clears the component; an empty string would be
                        # treated as a (non-existent) file path.
                        return None
                    res = handle_image(inp)
                    # Only data URIs carry an image; errors and fallback text
                    # cannot be shown in an Image component.
                    if not (isinstance(res, str) and res.startswith("data:")):
                        return None
                    header, _, b64 = res.partition(",")
                    try:
                        im_bytes = base64.b64decode(b64)
                    except ValueError:
                        # binascii.Error subclasses ValueError: bad payload.
                        return None
                    mime = header[len("data:"):].split(";", 1)[0].strip().lower()
                    # A unique file per request: the previous fixed
                    # "/tmp/out_img.png" was overwritten by concurrent users.
                    # NOTE(review): files are left on disk (delete=False)
                    # because the UI reads them after this function returns.
                    with tempfile.NamedTemporaryFile(
                        prefix="kokbco_",
                        suffix=suffix_by_mime.get(mime, ".png"),
                        delete=False,
                    ) as f:
                        f.write(im_bytes)
                        out_path = f.name
                    return out_path

                img_btn.click(on_image, inputs=[img_input], outputs=[img_output])

            with gr.TabItem("Code"):
                code_input = gr.Textbox(label="Code prompt", placeholder="Write a python function that...")
                code_btn = gr.Button("Generate Code")
                code_output = gr.Code(label="Generated Code", language="python")

                def on_code(inp):
                    if not RATE_LIMITER.allow():
                        return "# Rate limit exceeded"
                    return handle_code(inp)

                code_btn.click(on_code, inputs=[code_input], outputs=[code_output])

        gr.Markdown("---\n**Notes:** Make sure HF_TOKEN + model names are set in Space secrets. Enable Copilot fallback only if you understand risks.")
    return demo
|
|
|
|
# Build the interface once at import time and expose it as ``app``.
app = build_ui()


if __name__ == "__main__":
    # Executed as a script: start the Gradio server with default settings.
    app.launch()
|
|