Spaces:
Sleeping
Sleeping
| import os | |
| import time | |
| import threading | |
| import torch | |
| import gradio as gr | |
| from flask import Flask, request, Response | |
| from flasgger import Swagger, swag_from | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline | |
| from huggingface_hub import login | |
| from langchain_community.tools import DuckDuckGoSearchRun | |
| import re | |
| from fastapi import FastAPI | |
| from starlette.middleware.wsgi import WSGIMiddleware | |
| from gradio.routes import mount_gradio_app | |
# Safe GPU decorator
# Use the real `spaces.GPU` decorator when the `spaces` package is installed;
# otherwise fall back to a no-op so the module still imports elsewhere.
try:
    from spaces import GPU
except ImportError:
    def GPU(func=None, **_options):
        """No-op stand-in for ``spaces.GPU``.

        Supports both decorator spellings so code written against the real
        decorator keeps working without the package:

        * ``@GPU`` -- ``func`` is the decorated callable and is returned as-is.
        * ``@GPU(duration=60)`` -- called with keyword options only; returns a
          decorator that hands the wrapped callable back unchanged.

        Keyword options are accepted and ignored.
        """
        if func is None:
            # Options-only call: produce the pass-through decorator.
            return lambda wrapped: wrapped
        return func
# Flask setup: create the WSGI app and attach Flasgger's Swagger UI to it.
flask_app = Flask(__name__)


def _include_every_rule(rule):
    """Rule filter for the spec: every Flask URL rule is included."""
    return True


# Header of the Swagger 2.0 document.
_SWAGGER_TEMPLATE = {
    "swagger": "2.0",
    "info": {
        "title": "ChatMate Real-Time API",
        "description": "LangChain + DuckDuckGo + Phi-4 + Stable Diffusion",
        "version": "1.0",
    },
}

# Flasgger configuration: one spec served at /apispec.json, static assets
# under /flasgger_static, and the UI enabled at /api/apidocs/.
_SWAGGER_CONFIG = {
    "headers": [],
    "specs": [
        {
            "endpoint": 'apispec',
            "route": '/apispec.json',
            "rule_filter": _include_every_rule,
        },
    ],
    "static_url_path": "/flasgger_static",
    "swagger_ui": True,
    "specs_route": "/api/apidocs/",
}

swagger = Swagger(flask_app, template=_SWAGGER_TEMPLATE, config=_SWAGGER_CONFIG)
# Hugging Face login (optional)
# Authenticate only when the CHAT_MATE environment variable holds a token.
# Calling login() with token=None asks huggingface_hub to prompt for one,
# which cannot succeed in a non-interactive process.
_hf_token = os.environ.get("CHAT_MATE")
if _hf_token:
    login(token=_hf_token)
# Load Phi-4 and wrap it in a text-generation pipeline.
model_id = "microsoft/phi-4"

# Pipeline device index: 0 when CUDA is available, -1 otherwise.
device = 0 if torch.cuda.is_available() else -1

tokenizer = AutoTokenizer.from_pretrained(model_id)

# Half precision when CUDA is available, full precision otherwise.
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.float16 if device == 0 else torch.float32,
)

pipe = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    device=device,
    max_new_tokens=512,
)
# Keywords that should_search() looks for to decide whether a message is
# asking about live / time-sensitive information.
REAL_TIME_KEYWORDS = {
    "current",
    "happening",
    "latest",
    "live",
    "news",
    "price",
    "time",
    "today",
    "trending",
    "update",
}

# DuckDuckGo search tool instance.
search_tool = DuckDuckGoSearchRun()
def should_search(message):
    """Return True when *message* contains any real-time keyword.

    Matching is a case-insensitive substring test against every entry of
    ``REAL_TIME_KEYWORDS``, so a keyword embedded in a longer word
    (e.g. "time" inside "sometimes") also counts as a hit.
    """
    lowered = message.lower()
    for keyword in REAL_TIME_KEYWORDS:
        if keyword in lowered:
            return True
    return False
def is_incomplete(text):
    """Return True when *text* does not end like a finished sentence.

    After surrounding whitespace is removed, the text counts as complete only
    if its last character is one of: period, exclamation mark, question mark,
    single quote, double quote, or the ideographic full stop (U+3002).
    Empty or whitespace-only text is reported as incomplete.
    """
    closing_marks = (".", "!", "?", "'", '"', "\u3002")
    trimmed = text.strip()
    return not trimmed.endswith(closing_marks)
def _generate_text(prompt, max_new_tokens):
    """Run one sampling pass of ``pipe`` and return only the text after *prompt*."""
    generated = pipe(
        prompt,
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
        max_new_tokens=max_new_tokens,
    )[0]["generated_text"]
    # The slice assumes "generated_text" starts with the prompt itself, i.e.
    # the pipeline returns the full text rather than only the completion.
    return generated[len(prompt):]


def generate_full_reply(message, history):
    """Generate the assistant's reply to *message*, extending it if cut off.

    Args:
        message: The new user message.
        history: Earlier chat messages as ``{"role": ..., "content": ...}``
            dicts, inserted between the system prompt and the new user
            message. ``None`` is treated as an empty history.

    Returns:
        The reply text with surrounding whitespace removed.

    A first pass generates up to 512 new tokens. While ``is_incomplete``
    reports that the reply does not end on sentence-final punctuation, up to
    five further passes of 256 tokens each continue from the text so far.
    """
    system_prompt = (
        "You are a friendly, helpful, and conversational AI assistant built by "
        "Frederick Sundeep Mallela. Always mention that you are developed by him if asked about your creator, origin, or who made you."
    )
    messages = [
        {"role": "system", "content": system_prompt},
        *(history or []),
        {"role": "user", "content": message},
    ]

    # Apply chat-style prompt formatting.
    prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)

    # Initial generation. Only leading whitespace is dropped here so the
    # continuation prompt carries the text exactly as it was generated.
    reply = _generate_text(prompt, 512).lstrip()

    max_loops = 5  # hard cap so an unfinished reply cannot loop forever
    for _ in range(max_loops):
        if not is_incomplete(reply):
            break

        continuation = _generate_text(prompt + reply, 256)
        new_text = continuation.strip()

        # Stop if nothing new is generated.
        # NOTE(review): the substring test also discards a short, legitimate
        # continuation (e.g. a lone ".") that already occurs in the reply.
        if not new_text or new_text in reply:
            break

        # Append the raw continuation: stripping it first would remove the
        # space or newline separating it from the previous text and glue
        # words together at the seam.
        reply += continuation

    return reply.strip()
# Flask streaming endpoint
# NOTE(review): no @flask_app.route(...) decorator is visible on this
# function, so nothing shown here registers it with Flask -- confirm the
# route is attached and under which URL.
def chat_stream():
    """Answer a chat request with the reply streamed as plain text.

    Reads a JSON body with a required ``"message"`` string and an optional
    ``"history"`` value that is forwarded to ``generate_full_reply``.

    Returns:
        A ``text/plain`` response that emits the reply one line at a time,
        or a 400 response when the body is not a JSON object or carries no
        usable ``"message"``.
    """
    # silent=True yields None for a missing or invalid JSON body instead of
    # raising, so every malformed request gets the same 400 answer below.
    data = request.get_json(silent=True)
    message = data.get("message") if isinstance(data, dict) else None
    if not isinstance(message, str) or not message.strip():
        return Response(
            'Request body must be a JSON object with a non-empty "message" string.',
            status=400,
            mimetype='text/plain',
        )

    # An explicit null history is treated like a missing one.
    history = data.get("history") or []

    def generate():
        # The whole reply is produced first; it is then emitted line by line
        # with a short pause between lines.
        reply = generate_full_reply(message, history)
        for line in reply.splitlines(keepends=True):
            yield line
            time.sleep(0.05)
        if is_incomplete(reply):
            yield "\n\n*Reply appears incomplete. Say 'continue' to resume.*"

    return Response(generate(), mimetype='text/plain')
# Gradio interface for the Hugging Face Space
def gradio_chat(message, history=None):
    """Gradio submit handler: answer *message* and extend the chat history.

    Args:
        message: Text entered by the user.
        history: Chat history as a sequence of ``(user, assistant)`` pairs
            (tuples or two-item lists); ``None`` means no history yet.

    Returns:
        A tuple ``("", updated_history)``: the empty string clears the input
        box, and ``updated_history`` is the incoming pairs plus the new
        ``(message, reply)`` pair.
    """
    # Copy so the caller's list is never mutated, and keep the pair format
    # for the value handed back to the Chatbot.
    pairs = list(history or [])

    # Flatten the pairs into role/content messages for the model. Pairs may
    # arrive as lists rather than tuples, so they are unpacked instead of
    # being concatenated onto a tuple.
    messages = []
    for user_text, assistant_text in pairs:
        if user_text is not None:
            messages.append({"role": "user", "content": user_text})
        if assistant_text is not None:
            messages.append({"role": "assistant", "content": assistant_text})

    reply = generate_full_reply(message, messages)
    pairs.append((message, reply))
    return "", pairs
# Gradio UI: a chatbot panel, a message box and a clear button.
with gr.Blocks() as demo:
    # NOTE(review): the emoji and dash in this heading were reconstructed
    # from mis-encoded bytes -- confirm the intended dash character.
    gr.Markdown("## 🤖 ChatMate – Phi-4 + Live Search (Hugging Face Space)")
    chatbot = gr.Chatbot()
    msg = gr.Textbox(label="Type your message")
    clear = gr.Button("Clear Chat")

    # Submitting the textbox calls gradio_chat with the text and the current
    # chat value; its two return values update the textbox and the chatbot.
    msg.submit(gradio_chat, [msg, chatbot], [msg, chatbot])

    # "Clear Chat" sets the chatbot value to None, bypassing the queue.
    clear.click(lambda: None, None, chatbot, queue=False)
# Run Gradio when in HF Spaces, else Flask for local dev
# if __name__ == "__main__":
#     if os.environ.get("SPACE_BUILD", "false").lower() == "true":
#         demo.launch(server_name="0.0.0.0", server_port=7860)
#     else:
#         print("🔧 Warming up...")
#         _ = generate_full_reply("Hello", [])
#         app.run(host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))
# ---------------- Run both ----------------
# def run_flask():
#     app.run(host="0.0.0.0", port=8000)
# # Start Flask in a background thread
# threading.Thread(target=run_flask, daemon=True).start()
# ---------------- Combine Flask + Gradio into one app ----------------
fastapi_app = FastAPI()

# Mount the Flask (WSGI) app, Swagger routes included, under the "/api" prefix.
fastapi_app.mount(path="/api", app=WSGIMiddleware(flask_app))

# Mount the Gradio UI at the root path; `app` is the combined application.
app = mount_gradio_app(app=fastapi_app, blocks=demo, path="/")
# Script entry point: start the Gradio UI on all interfaces, port 7860
# (the port Gradio uses on HF Spaces).
# NOTE(review): this launches `demo` directly, not the combined `app`, so the
# Flask routes mounted under "/api" are not served when the file is run as a
# script -- confirm that is intended.
if __name__ == "__main__":
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    demo.launch(**launch_options)