"""Kokbco - Hugging Face Space app.py.

Chat + Image + Code (Hugging Face Inference API as primary) + optional
Copilot.com fallback.

Guide (summary):
- This file is designed to run in a Hugging Face Space with SDK = Gradio.
- Add these values to the Space's Secrets:
    - HF_TOKEN: Hugging Face token (required)
    - HF_MODEL_CHAT: text model for chat (e.g. "meta-llama/Llama-2-7b-chat"
      or a lighter model)
    - HF_MODEL_IMAGE: image model (e.g. "stabilityai/stable-diffusion-2")
    - HF_MODEL_CODE: code-generation model (e.g. "bigcode/starcoder")
    - ENABLE_COPILOT_FALLBACK: "1" or "0" (optional - enables the Copilot
      fallback module)
    - COPILOT_EMAIL / COPILOT_PASSWORD (only if needed for the browser
      fallback; **very** optional)
- Important: the Copilot.com fallback is implemented through scraping /
  browser automation so that it can be attempted when HF is rate-limited.
  This part is experimental, needs Playwright/Selenium, and may stop working
  because of site changes or restrictions.
- Suggested requirements.txt:
    gradio
    requests
    cachetools
    playwright (optional, for the fallback)

Two main parts:
1) Primary API: Hugging Face Inference API (requests)
2) Fallback: experimental Playwright automation for copilot.com

Running: this Space runs with Gradio. Before pushing to the Space, make sure
HF_TOKEN is set in Secrets.
"""

import os
import time
import json
import base64
import tempfile
import threading
from typing import Optional

import requests
from cachetools import TTLCache

# Playwright is optional: it is only needed when the Copilot fallback is used.
try:
    from playwright.sync_api import sync_playwright
    PLAYWRIGHT_AVAILABLE = True
except Exception:
    PLAYWRIGHT_AVAILABLE = False

# Gradio for UI
import gradio as gr

# -----------------------------
# Configuration (via env / secrets)
# -----------------------------
HF_TOKEN = os.environ.get("HF_TOKEN", "")
HF_MODEL_CHAT = os.environ.get("HF_MODEL_CHAT", "tiiuae/falcon-7b-instruct")  # replace with your model
HF_MODEL_IMAGE = os.environ.get("HF_MODEL_IMAGE", "runwayml/stable-diffusion-v1-5")
HF_MODEL_CODE = os.environ.get("HF_MODEL_CODE", "bigcode/starcoder")
ENABLE_COPILOT_FALLBACK = os.environ.get("ENABLE_COPILOT_FALLBACK", "0") == "1"
COPILOT_EMAIL = os.environ.get("COPILOT_EMAIL")
COPILOT_PASSWORD = os.environ.get("COPILOT_PASSWORD")

HF_HEADERS = {"Authorization": f"Bearer {HF_TOKEN}"} if HF_TOKEN else {}

# Caches
TEXT_CACHE = TTLCache(maxsize=1024, ttl=60 * 5)   # 5-minute cache
IMAGE_CACHE = TTLCache(maxsize=256, ttl=60 * 60)  # 1-hour cache
CODE_CACHE = TTLCache(maxsize=512, ttl=60 * 10)   # 10-minute cache


# -----------------------------
# Helper: Hugging Face Inference Calls
# -----------------------------
def call_hf_model(model: str, payload: dict, stream: bool = False, timeout: int = 60):
    """POST ``payload`` to the Hugging Face Inference API for ``model``.

    Args:
        model: Model repository id appended to the inference URL.
        payload: JSON-serialisable request body.
        stream: Accepted for interface compatibility; currently unused.
        timeout: Request timeout in seconds, passed to ``requests.post``.

    Returns:
        The raw ``requests.Response``; the caller decides how to parse it.

    Raises:
        RuntimeError: If ``HF_TOKEN`` is not configured.
        requests.HTTPError: If the API answers with an error status.
    """
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN is not set in environment/secrets")
    url = f"https://api-inference.huggingface.co/models/{model}"
    headers = HF_HEADERS.copy()
    headers.update({"Accept": "application/json"})
    resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
    resp.raise_for_status()
    return resp


def _extract_generated_text(data) -> str:
    """Pull the generated text out of a decoded Inference API JSON body.

    Different models return different shapes, so the common ones are tried
    in turn: a dict with ``generated_text``, a list of such dicts, and
    finally a plain string representation of whatever came back.

    Raises:
        RuntimeError: If the body is a dict carrying an ``"error"`` key.
    """
    if isinstance(data, dict):
        if "error" in data:
            raise RuntimeError(data.get("error"))
        if "generated_text" in data:
            return data["generated_text"]
        return str(data)
    if isinstance(data, list):
        for item in data:
            if isinstance(item, dict) and "generated_text" in item:
                return item["generated_text"]
        # No recognised key: join the items' string forms.
        return "\n".join(str(x) for x in data)
    return str(data)


def ask_hf_text(model: str, prompt: str, max_new_tokens: int = 512) -> str:
    """Generate a chat/text completion for ``prompt`` with ``model``.

    Successful results are cached in ``TEXT_CACHE``. Failures are returned
    as a string starting with ``"[HF ERROR]"`` and are NOT cached, so a
    transient outage is retried on the next call.
    """
    # max_new_tokens is part of the key so different limits do not collide.
    key = f"text::{model}::{max_new_tokens}::{prompt}"
    # .get() avoids a KeyError if the entry expires between check and read.
    cached = TEXT_CACHE.get(key)
    if cached is not None:
        return cached
    payload = {
        "inputs": prompt,
        "options": {"wait_for_model": True},
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        resp = call_hf_model(model, payload)
        text = _extract_generated_text(resp.json())
    except Exception as e:
        # HF failed - report it so the caller can try the fallback.
        return f"[HF ERROR] {e}"
    TEXT_CACHE[key] = text
    return text


def generate_image_hf(model: str, prompt: str) -> str:
    """Call HF model to generate an image and return a data URI (base64).

    On failure a string starting with ``"[HF IMAGE ERROR]"`` is returned;
    if the response is neither image bytes nor JSON holding base64 data, a
    string starting with ``"[IMAGE_RESPONSE_NOT_BINARY]"`` is returned.
    Only successful data URIs are cached in ``IMAGE_CACHE``.
    """
    key = f"image::{model}::{prompt}"
    cached = IMAGE_CACHE.get(key)
    if cached is not None:
        return cached
    payload = {"inputs": prompt, "options": {"wait_for_model": True}}
    try:
        resp = call_hf_model(model, payload)
        # Many HF image endpoints return raw image bytes with content-type image/png
        content_type = resp.headers.get("Content-Type", "")
        if content_type.startswith("image/"):
            data_uri = f"data:{content_type};base64,{base64.b64encode(resp.content).decode()}"
            IMAGE_CACHE[key] = data_uri
            return data_uri
        # Otherwise parse JSON that may contain base64
        data = resp.json()
        # try to find image base64
        img_b64 = None
        if isinstance(data, dict):
            # common field names
            for k in ("image_base64", "images", "data"):
                if k not in data:
                    continue
                val = data[k]
                if isinstance(val, str):
                    img_b64 = val
                    break
                if isinstance(val, list) and val and isinstance(val[0], str):
                    img_b64 = val[0]
                    break
        if img_b64:
            data_uri = f"data:image/png;base64,{img_b64}"
            IMAGE_CACHE[key] = data_uri
            return data_uri
        # fallback: return textual repr
        return "[IMAGE_RESPONSE_NOT_BINARY] " + json.dumps(data)[:2000]
    except Exception as e:
        return f"[HF IMAGE ERROR] {e}"


def ask_hf_code(model: str, prompt: str, max_new_tokens: int = 512) -> str:
    """Generate code for ``prompt`` with ``model``.

    Uses the same response parsing as ``ask_hf_text``. Successful results
    are cached in ``CODE_CACHE``. Failures are returned as a string starting
    with ``"[HF CODE ERROR]"`` and are NOT cached.
    """
    key = f"code::{model}::{max_new_tokens}::{prompt}"
    cached = CODE_CACHE.get(key)
    if cached is not None:
        return cached
    payload = {
        "inputs": prompt,
        "options": {"wait_for_model": True},
        "parameters": {"max_new_tokens": max_new_tokens},
    }
    try:
        resp = call_hf_model(model, payload)
        code = _extract_generated_text(resp.json())
    except Exception as e:
        return f"[HF CODE ERROR] {e}"
    CODE_CACHE[key] = code
    return code


# -----------------------------
# Optional Copilot fallback (experimental)
# -----------------------------
def copilot_fallback_text(prompt: str, timeout: int = 30) -> str:
    """Experimental: try to get an answer from Copilot.com via a headless browser.

    WARNING: This is fragile, may break if Copilot changes UI, and may
    violate Copilot terms of service. Only enable if you understand risks
    and have credentials if needed.

    Args:
        prompt: Text typed into the page's first ``textarea``, if any.
        timeout: Navigation timeout in seconds for the initial page load.

    Returns:
        ``""`` when the fallback is disabled; otherwise a string prefixed
        with ``"[COPILOT SCRAPE] "`` holding the truncated page HTML, or a
        ``"[COPILOT FALLBACK ...]"`` message describing why it failed.
    """
    if not ENABLE_COPILOT_FALLBACK:
        return ""
    if not PLAYWRIGHT_AVAILABLE:
        return "[COPILOT FALLBACK UNAVAILABLE: playwright not installed]"
    try:
        with sync_playwright() as pw:
            browser = pw.chromium.launch(headless=True)
            try:
                context = browser.new_context()
                page = context.new_page()
                # Playwright expects milliseconds.
                page.goto("https://copilot.com", timeout=timeout * 1000)
                time.sleep(2)
                # NOTE: Selectors below are placeholders. You must inspect
                # copilot.com and adapt selectors. This block is
                # intentionally generic and may require manual tweaks.

                # If login is required, attempt a simple email/password flow
                # when credentials are provided. Example flow, likely needs
                # edits depending on the actual site.
                if COPILOT_EMAIL and COPILOT_PASSWORD:
                    if page.query_selector('input[type="email"]'):
                        page.fill('input[type="email"]', COPILOT_EMAIL)
                        page.click('button[type="submit"]')
                        time.sleep(1)
                    if page.query_selector('input[type="password"]'):
                        page.fill('input[type="password"]', COPILOT_PASSWORD)
                        page.click('button[type="submit"]')
                        time.sleep(3)
                # Try to find a chat box/input. The actual selector for the
                # chat input must be discovered by the developer.
                if page.query_selector('textarea'):
                    page.fill('textarea', prompt)
                    page.keyboard.press('Enter')
                # wait for response; naive wait
                time.sleep(5)
                # Naive capture: return the full page content, truncated.
                content = page.content()
                return "[COPILOT SCRAPE] " + content[:4000]
            finally:
                # Closing the browser also closes every context it owns.
                browser.close()
    except Exception as e:
        return f"[COPILOT FALLBACK ERROR] {e}"


# -----------------------------
# Router / Dispatcher logic
# -----------------------------
def handle_chat(prompt: str) -> str:
    """Answer ``prompt`` with the HF chat model, falling back to Copilot.

    The fallback is tried only when it is enabled and the HF answer is an
    error, empty, or shorter than 10 non-whitespace-trimmed characters.
    """
    out = ask_hf_text(HF_MODEL_CHAT, prompt)
    hf_unusable = out.startswith("[HF ERROR]") or (not out) or (len(out.strip()) < 10)
    if hf_unusable and ENABLE_COPILOT_FALLBACK:
        fb = copilot_fallback_text(prompt)
        if fb:
            return fb
    return out


def handle_image(prompt: str) -> str:
    """Generate an image for ``prompt``; returns a data URI or an error string."""
    out = generate_image_hf(HF_MODEL_IMAGE, prompt)
    # If HF returned error text, try copilot (unlikely to help for images:
    # the fallback yields text, not image data).
    if out.startswith("[HF IMAGE ERROR]") and ENABLE_COPILOT_FALLBACK:
        return copilot_fallback_text(prompt)
    return out


def handle_code(prompt: str) -> str:
    """Generate code for ``prompt``, falling back to Copilot on HF errors."""
    out = ask_hf_code(HF_MODEL_CODE, prompt)
    if out.startswith("[HF CODE ERROR]") and ENABLE_COPILOT_FALLBACK:
        return copilot_fallback_text(prompt)
    return out


# -----------------------------
# Background: Simple rate-limiter using tokens
# -----------------------------
class SimpleRateLimiter:
    """Allow at most ``calls_per_minute`` calls within any 60-second window."""

    def __init__(self, calls_per_minute=30):
        self.calls_per_minute = calls_per_minute
        self.lock = threading.Lock()
        # Timestamps (time.time()) of the calls admitted in the last minute.
        self.calls = []

    def allow(self):
        """Record a call and return True if it fits in the window, else False."""
        with self.lock:
            now = time.time()
            # drop old
            self.calls = [t for t in self.calls if now - t < 60]
            if len(self.calls) < self.calls_per_minute:
                self.calls.append(now)
                return True
            return False


RATE_LIMITER = SimpleRateLimiter(calls_per_minute=25)


# -----------------------------
# Gradio UI
# -----------------------------
def build_ui():
    """Build and return the Gradio Blocks app with Chat, Image and Code tabs."""
    with gr.Blocks(title="Kokbco — Chat + Image + Code") as demo:
        gr.Markdown("# Kokbco — Chat, Image & Code (HF primary, Copilot fallback optional)")
        with gr.Tabs():
            with gr.TabItem("Chat"):
                chat_input = gr.Textbox(label="Prompt", placeholder="از من بپرس...")
                chat_btn = gr.Button("Send")
                chat_output = gr.Textbox(label="Response", lines=10)

                def on_chat(inp):
                    if not RATE_LIMITER.allow():
                        return "Rate limit exceeded. Try again in a moment."
                    return handle_chat(inp)

                chat_btn.click(on_chat, inputs=[chat_input], outputs=[chat_output])

            with gr.TabItem("Image"):
                img_input = gr.Textbox(label="Prompt for image", placeholder="شرح تصویر... e.g. a warrior in sunset")
                img_btn = gr.Button("Generate")
                img_output = gr.Image(type="numpy", label="Result")

                def on_image(inp):
                    # Surface failures to the user instead of silently
                    # showing an empty image.
                    if not RATE_LIMITER.allow():
                        raise gr.Error("Rate limit exceeded. Try again in a moment.")
                    res = handle_image(inp)
                    if not (isinstance(res, str) and res.startswith("data:")):
                        raise gr.Error(str(res)[:300] or "Image generation failed.")
                    _, _, b64 = res.partition(",")
                    im_bytes = base64.b64decode(b64)
                    # A unique temp file per request: a fixed path would let
                    # concurrent requests overwrite each other's image.
                    with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
                        f.write(im_bytes)
                    return f.name

                img_btn.click(on_image, inputs=[img_input], outputs=[img_output])

            with gr.TabItem("Code"):
                code_input = gr.Textbox(label="Code prompt", placeholder="Write a python function that...")
                code_btn = gr.Button("Generate Code")
                code_output = gr.Code(label="Generated Code", language="python")

                def on_code(inp):
                    if not RATE_LIMITER.allow():
                        return "# Rate limit exceeded"
                    return handle_code(inp)

                code_btn.click(on_code, inputs=[code_input], outputs=[code_output])

        gr.Markdown("---\n**Notes:** Make sure HF_TOKEN + model names are set in Space secrets. Enable Copilot fallback only if you understand risks.")
    return demo


app = build_ui()

if __name__ == "__main__":
    app.launch()