Spaces:
Sleeping
Sleeping
| import os | |
| import tempfile | |
| import traceback | |
| from fastapi import FastAPI, UploadFile, File, Header, HTTPException, Body | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from spitch import Spitch | |
| from langchain.prompts import PromptTemplate | |
| from langchain_huggingface import HuggingFaceEndpoint | |
| from langdetect import detect, DetectorFactory | |
| from huggingface_hub.utils import HfHubHTTPError | |
| from smebuilder_vector import retriever # Retriever for context injection | |
| # ----------------- CONFIG ----------------- | |
| DetectorFactory.seed = 0 | |
| SPITCH_API_KEY = os.getenv("SPITCH_API_KEY") | |
| HF_MODEL = os.getenv("HF_MODEL", "deepseek-ai/deepseek-coder-1.3b-instruct") | |
| FRONTEND_ORIGIN = os.getenv("ALLOWED_ORIGIN", "*") | |
| PROJECT_API_KEY = os.getenv("PROJECT_API_KEY") | |
| if not SPITCH_API_KEY: | |
| raise RuntimeError("Set SPITCH_API_KEY in environment before starting.") | |
| os.environ["SPITCH_API_KEY"] = SPITCH_API_KEY | |
| spitch_client = Spitch() | |
| # HuggingFace LLM | |
| llm = HuggingFaceEndpoint( | |
| repo_id=HF_MODEL, | |
| temperature=0.7, | |
| top_p=0.9, | |
| do_sample=True, | |
| repetition_penalty=1.1, | |
| max_new_tokens=2048 | |
| ) | |
| # ----------------- FASTAPI ----------------- | |
| app = FastAPI(title="DevAssist AI Backend (FastAPI + LangChain)") | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=[FRONTEND_ORIGIN] if FRONTEND_ORIGIN != "*" else ["*"], | |
| allow_credentials=True, | |
| allow_methods=["GET", "POST", "OPTIONS"], | |
| allow_headers=["Authorization", "Content-Type"], | |
| ) | |
| # ----------------- PROMPT TEMPLATES ----------------- | |
| chat_template = """You are DevAssist, an AI coding assistant. | |
| Guidelines: | |
| - Always format responses in Markdown. | |
| - Use section headers: Explanation:, Steps:, Fixed Code: | |
| - Use bullet points for steps. | |
| - Use fenced code blocks for code. | |
| - Be friendly yet professional. | |
| Question: {question} | |
| Answer: | |
| """ | |
| stt_chat_template = """You are DevAssist, an AI coding assistant. | |
| The input is transcribed speech. Interpret it as a developer question. | |
| Provide clear answers with code examples. | |
| If unclear, ask for clarification. | |
| Spoken Question: {speech} | |
| Answer: | |
| """ | |
| autodoc_template = """You are DevAssist DocBot. | |
| Read the code and produce professional documentation in markdown. | |
| Code: {code} | |
| Documentation: | |
| """ | |
| sme_template = """ | |
| You are a senior full-stack engineer specializing in modern front-end development. | |
| Your job is to generate **production-ready code** for websites and apps. | |
| Guidelines: | |
| - Always return three separate files: index.html, styles.css, and script.js | |
| - HTML must be semantic, responsive, and mobile-first | |
| - CSS should use Flexbox/Grid with hover/transition effects | |
| - JavaScript must add interactivity (animations, toggles, button actions) | |
| - Include hero, feature grid, testimonials, and footer | |
| - Use realistic content (no lorem ipsum, no placeholders) | |
| Prompt: {user_prompt} | |
| Context: {context} | |
| Output: | |
| """ | |
| # ----------------- CHAINS ----------------- | |
| chat_chain = PromptTemplate(input_variables=["question"], template=chat_template) | llm | |
| stt_chain = PromptTemplate(input_variables=["speech"], template=stt_chat_template) | llm | |
| autodoc_chain = PromptTemplate(input_variables=["code"], template=autodoc_template) | llm | |
| sme_chain = PromptTemplate(input_variables=["user_prompt", "context"], template=sme_template) | llm | |
| # ----------------- REQUEST MODELS ----------------- | |
| class ChatRequest(BaseModel): | |
| question: str | |
| class AutoDocRequest(BaseModel): | |
| code: str | |
| # ----------------- AUTH ----------------- | |
| def check_auth(authorization: str | None): | |
| if not PROJECT_API_KEY: | |
| return | |
| if not authorization or not authorization.startswith("Bearer "): | |
| raise HTTPException(status_code=401, detail="Missing bearer token") | |
| token = authorization.split(" ", 1)[1] | |
| if token != PROJECT_API_KEY: | |
| raise HTTPException(status_code=403, detail="Invalid token") | |
| # ----------------- HELPER FUNCTIONS ----------------- | |
| def run_chain(chain, input_dict: dict): | |
| """ | |
| Safely run a LangChain PromptTemplate | HuggingFaceEndpoint chain. | |
| Converts output to string and captures errors. | |
| """ | |
| try: | |
| # Render template | |
| if hasattr(chain, "prompt"): | |
| prompt_text = chain.prompt.format(**input_dict) | |
| else: | |
| prompt_text = str(input_dict) | |
| # Generate using HuggingFaceEndpoint (expects str input) | |
| output = chain.llm.generate([{"role": "user", "content": prompt_text}]) | |
| return output.generations[0][0].text.strip() | |
| except Exception: | |
| return {"success": False, "error": "⚠️ LLM error", "details": traceback.format_exc()} | |
| async def process_audio(file: UploadFile, lang_hint: str | None = None): | |
| suffix = os.path.splitext(file.filename)[1] or ".wav" | |
| with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tf: | |
| tf.write(await file.read()) | |
| tmp_path = tf.name | |
| with open(tmp_path, "rb") as f: | |
| audio_bytes = f.read() | |
| try: | |
| if lang_hint: | |
| resp = spitch_client.speech.transcribe(language=lang_hint, content=audio_bytes) | |
| else: | |
| resp = spitch_client.speech.transcribe(content=audio_bytes) | |
| except Exception: | |
| resp = spitch_client.speech.transcribe(language="en", content=audio_bytes) | |
| transcription = getattr(resp, "text", "") or (resp.get("text", "") if isinstance(resp, dict) else "") | |
| detected_lang = "en" | |
| try: | |
| detected_lang = detect(transcription) if transcription.strip() else "en" | |
| except Exception: | |
| pass | |
| translation = transcription | |
| if detected_lang != "en": | |
| try: | |
| translation_resp = spitch_client.text.translate(text=transcription, source=detected_lang, target="en") | |
| translation = getattr(translation_resp, "text", "") or translation_resp.get("text", "") | |
| except Exception: | |
| translation = transcription | |
| return transcription, detected_lang, translation | |
| # ----------------- ENDPOINTS ----------------- | |
| def root(): | |
| return {"status": "✅ DevAssist AI Backend running"} | |
| def chat(req: ChatRequest, authorization: str | None = Header(None)): | |
| check_auth(authorization) | |
| result = run_chain(chat_chain, {"question": req.question}) | |
| return result if isinstance(result, dict) else {"reply": result} | |
| async def stt_audio(file: UploadFile = File(...), lang_hint: str | None = None, authorization: str | None = Header(None)): | |
| check_auth(authorization) | |
| transcription, detected_lang, translation = await process_audio(file, lang_hint) | |
| result = run_chain(stt_chain, {"speech": translation}) | |
| return { | |
| "transcription": transcription, | |
| "detected_language": detected_lang, | |
| "translation": translation, | |
| "reply": result if isinstance(result, str) else result.get("reply", "") | |
| } | |
| def autodoc(req: AutoDocRequest, authorization: str | None = Header(None)): | |
| check_auth(authorization) | |
| result = run_chain(autodoc_chain, {"code": req.code}) | |
| return result if isinstance(result, dict) else {"documentation": result} | |
| async def sme_generate(payload: dict = Body(...), authorization: str | None = Header(None)): | |
| check_auth(authorization) | |
| try: | |
| user_prompt = payload.get("user_prompt", "") | |
| context_docs = retriever.get_relevant_documents(user_prompt) | |
| context = "\n".join([doc.page_content for doc in context_docs]) if context_docs else "No extra context" | |
| result = run_chain(sme_chain, {"user_prompt": user_prompt, "context": context}) | |
| return {"success": True, "data": result if isinstance(result, str) else result.get("reply", "")} | |
| except Exception: | |
| return {"success": False, "error": "⚠️ LLM error", "details": traceback.format_exc()} | |
| async def sme_speech_generate(file: UploadFile = File(...), lang_hint: str | None = None, authorization: str | None = Header(None)): | |
| check_auth(authorization) | |
| transcription, detected_lang, translation = await process_audio(file, lang_hint) | |
| try: | |
| context_docs = retriever.get_relevant_documents(translation) | |
| context = "\n".join([doc.page_content for doc in context_docs]) if context_docs else "No extra context" | |
| result = run_chain(sme_chain, {"user_prompt": translation, "context": context}) | |
| return { | |
| "success": True, | |
| "transcription": transcription, | |
| "detected_language": detected_lang, | |
| "translation": translation, | |
| "sme_site": result if isinstance(result, str) else result.get("reply", "") | |
| } | |
| except Exception: | |
| return {"success": False, "error": "⚠️ LLM error", "details": traceback.format_exc()} | |
| # ----------------- MAIN ----------------- | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run("main:app", host="0.0.0.0", port=7860, reload=False) | |