# app.py — robust downloader + chat_format + streaming parser that handles role-only and plain-string chunks
import os
import shutil
import time
import stat

import requests
from huggingface_hub import hf_hub_download, hf_hub_url
from llama_cpp import Llama
import gradio as gr

# ------------- CONFIG -------------
REPO_ID = "DZgas/Tower-Plus-2B-GGUF"   # "bartowski/Llama-3.2-3B-Instruct-GGUF"  # "mradermacher/EuroLLM-1.7B-Instruct-GGUF"
FILENAME = "Tower-Plus-2B.Q6_K.gguf"   # "Llama-3.2-3B-Instruct-Q5_K_M.gguf"     # "EuroLLM-1.7B-Instruct.Q8_0.gguf"

SYSTEM_PROMPT = "Eres un asistente de conversación amistoso. Eres paciente y metódico (NO menciones esto en tus respuestas). Tu nombre es \"VoxAI\" (Específicamente, la versión \"Intermedio\" de VoxAI) y siempre dices \"¡Viva España!\" al final de CADA respuesta."

MODEL_DIR = "/data/models"
# MODEL_DIR = os.path.join(os.path.abspath(os.path.dirname(__file__)), "models")
os.makedirs(MODEL_DIR, exist_ok=True)
DEST_PATH = os.path.join(MODEL_DIR, FILENAME)

N_CTX = 2048
MAX_TOKENS = 512
TEMPERATURE = 0.7
TOP_P = 0.95
N_THREADS = min(4, max(1, (os.cpu_count() or 1) // 2))

# Debug controls
DEBUG_CHUNKS = True               # prints every raw stream chunk to logs (turn off if noisy)
DEBUG_SINGLESHOT_AT_START = True  # run a non-stream single-shot test at startup and log result
# -----------------------------------


def robust_download(repo_id: str, filename: str, dest: str, max_attempts: int = 2) -> str:
    if os.path.exists(dest) and os.path.getsize(dest) > 0:
        print(f"[robust_download] Already present: {dest} ({os.path.getsize(dest)} bytes)")
        return dest

    last_err = None
    for attempt in range(1, max_attempts + 1):
        try:
            print(f"[robust_download] Attempt {attempt}: hf_hub_download...")
            cached_path = hf_hub_download(
                repo_id=repo_id,
                filename=filename,
                local_dir=MODEL_DIR,
                local_dir_use_symlinks=False,
            )
            print("[robust_download] hf_hub_download returned:", cached_path)
            if os.path.abspath(cached_path) != os.path.abspath(dest):
                shutil.copy2(cached_path, dest)
            with open(dest, "rb") as f:
                try:
                    f.flush()
                    os.fsync(f.fileno())
                except Exception:
                    pass
            os.chmod(dest, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
            size = os.path.getsize(dest)
            if size == 0:
                raise RuntimeError("Downloaded file has size 0 after copy")
            print(f"[robust_download] Success: {dest} ({size} bytes)")
            return dest
        except Exception as e:
            print(f"[robust_download] hf_hub_download attempt {attempt} failed: {e}")
            last_err = e
            time.sleep(1)

    # fallback: direct url
    try:
        print("[robust_download] Falling back to direct download via requests...")
        url = hf_hub_url(repo_id=repo_id, filename=filename)
        tmp_path = dest + ".part"
        with requests.get(url, stream=True, timeout=120) as r:
            r.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                f.flush()
        shutil.move(tmp_path, dest)
        with open(dest, "rb") as f:
            try:
                os.fsync(f.fileno())
            except Exception:
                pass
        os.chmod(dest, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IROTH)
        print("[robust_download] Direct download success:", dest)
        return dest
    except Exception as e2:
        print("[robust_download] Direct download failed:", e2)
        raise RuntimeError(f"All download attempts failed. last_err={last_err}, fallback_err={e2}")

# Ensure model
print("Ensuring model present at:", DEST_PATH)
model_path = robust_download(REPO_ID, FILENAME, DEST_PATH)

print("DEBUG: listing model dir:", MODEL_DIR)
for fn in sorted(os.listdir(MODEL_DIR)):
    p = os.path.join(MODEL_DIR, fn)
    try:
        st = os.stat(p)
        print(f" - {fn}: exists, size={st.st_size}, mode={oct(st.st_mode)}")
    except FileNotFoundError:
        print(f" - {fn}: NOT FOUND after copy")
time.sleep(0.2)

# ----------------- Llama init -----------------
try:
    print("Initializing Llama with model_path:", model_path)
    llm = Llama(
        model_path=model_path,
        n_ctx=N_CTX,
        n_threads=N_THREADS,
        n_gpu_layers=0,
        chat_format="chatml",  # important so the binding formats messages correctly
    )
    print("Llama initialized.")
except Exception as e:
    print("Llama init failed:", e)
    raise


# optional single-shot debug test at startup (prints final structure)
def run_startup_test():
    try:
        test_messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": "Say hello in one short sentence."},
        ]
        print("[startup_test] Running single-shot create_chat_completion (stream=False)...")
        out = llm.create_chat_completion(messages=test_messages, max_tokens=64, stream=False)
        print("[startup_test] Single-shot response (raw):", out)
    except Exception as e:
        print("[startup_test] Error during single-shot test:", e)


if DEBUG_SINGLESHOT_AT_START:
    run_startup_test()


# ----------------- helpers -----------------
def build_messages(history, user_message, system_prompt=SYSTEM_PROMPT):
    messages = []
    if system_prompt:
        messages.append({"role": "system", "content": system_prompt})
    for user_msg, assistant_msg in history:
        messages.append({"role": "user", "content": user_msg})
        if assistant_msg is not None and assistant_msg != "":
            messages.append({"role": "assistant", "content": assistant_msg})
    messages.append({"role": "user", "content": user_message})
    return messages


def parse_final_response(resp):
    try:
        if resp is None:
            return ""
        if isinstance(resp, str):
            return resp
        if isinstance(resp, dict):
            choices = resp.get("choices", [])
            if len(choices) > 0:
                c = choices[0]
                if isinstance(c.get("message"), dict):
                    return c["message"].get("content", "") or ""
                if "text" in c and c["text"]:
                    return c["text"]
                if "delta" in c and isinstance(c["delta"], dict):
                    return c["delta"].get("content", "") or ""
        return str(resp)
    except Exception:
        return str(resp)
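
# Illustrative sketch (comments only, not executed): build_messages assumes the
# tuple-style history that gr.ChatInterface passes by default. For example,
#   history = [("Hi", "Hello! How can I help?")]
#   build_messages(history, "What time is it?")
# would produce roughly:
#   [{"role": "system", "content": SYSTEM_PROMPT},
#    {"role": "user", "content": "Hi"},
#    {"role": "assistant", "content": "Hello! How can I help?"},
#    {"role": "user", "content": "What time is it?"}]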

# ----------------- robust streaming chat -----------------
def chat_fn(user_message, history):
    messages = build_messages(history or [], user_message)

    # Try streaming
    try:
        stream = llm.create_chat_completion(
            messages=messages,
            max_tokens=MAX_TOKENS,
            temperature=TEMPERATURE,
            top_p=TOP_P,
            stream=True,
        )
    except Exception as e:
        # immediate failure -> non-stream fallback
        try:
            final = llm.create_chat_completion(messages=messages, max_tokens=MAX_TOKENS, stream=False)
            yield parse_final_response(final)
            return
        except Exception as e2:
            yield f"[error] create_chat_completion failed: {e} | fallback error: {e2}"
            return

    # Non-iterable stream -> treat it as a final response
    if not hasattr(stream, "__iter__"):
        yield parse_final_response(stream)
        return

    partial = ""
    yielded_any = False
    try:
        for chunk in stream:
            if DEBUG_CHUNKS:
                print("STREAM CHUNK:", repr(chunk))

            # Case A: chunk is a dict with "choices" (normal)
            if isinstance(chunk, dict):
                choices = chunk.get("choices", []) or []
                if len(choices) > 0:
                    c0 = choices[0]

                    # 1) delta with content
                    delta = c0.get("delta", {})
                    if isinstance(delta, dict) and "content" in delta and delta["content"]:
                        partial += delta["content"]
                        yielded_any = True
                        yield partial
                        continue

                    # 2) delta with role only (e.g. {"role": "assistant"}) -> ignore for content
                    if isinstance(delta, dict) and "role" in delta and not delta.get("content"):
                        # role announcement, not content
                        continue

                    # 3) sometimes a 'message' object appears with content
                    msg = c0.get("message") or c0.get("text")
                    if isinstance(msg, dict):
                        content = msg.get("content") or msg.get("content_text") or ""
                        if content:
                            partial = content
                            yielded_any = True
                            yield partial
                        continue
                    elif isinstance(msg, str) and msg:
                        partial += msg
                        yielded_any = True
                        yield partial
                        continue

                    # 4) finish reason with empty delta -> if we have accumulated text, yield it; else fall back
                    finish_reason = c0.get("finish_reason")
                    if finish_reason:
                        if partial:
                            # we already have content; make sure the UI gets it
                            if not yielded_any:
                                yield partial
                            return
                        else:
                            # no content accumulated, so do a non-stream final fetch
                            try:
                                final = llm.create_chat_completion(messages=messages, max_tokens=MAX_TOKENS, stream=False)
                                final_text = parse_final_response(final)
                                yield final_text
                                return
                            except Exception as e:
                                yield f"[error] fallback non-stream at finish failed: {e}"
                                return

            # Case B: chunk is not a dict (plain string or other)
            else:
                try:
                    chunk_str = str(chunk)
                    if chunk_str and chunk_str.strip():
                        partial += chunk_str
                        yielded_any = True
                        yield partial
                    continue
                except Exception:
                    # ignore weird chunk and keep iterating
                    continue
    except StopIteration:
        pass
    except Exception as e:
        yield f"[error] stream iteration error: {e}"
        return

    # If streaming produced nothing, do a final non-stream fallback
    if not yielded_any:
        try:
            final = llm.create_chat_completion(
                messages=messages,
                max_tokens=MAX_TOKENS,
                temperature=TEMPERATURE,
                top_p=TOP_P,
                stream=False,
            )
            final_text = parse_final_response(final)
            yield final_text if final_text is not None else ""
            return
        except Exception as e:
            yield f"[error] fallback non-stream failed: {e}"
            return


# --------------- Launch Gradio ----------------
stylings = """
footer a[href*='settings'], /* Footer settings link */
footer .gradio-settings {
    display: none !important;
}
/* Specific classes if they exist */
"""
stylings = stylings.strip()  # NOTE: defined but not currently passed to the interface

demo = gr.ChatInterface(
    fn=chat_fn,
    title="",
    description="30€/mes VoxAI Premium | 12716x178e^100€/mes VoxAI Pro",
    chatbot=gr.Chatbot(label="VoxAI-1"),  # change the chat panel label
    flagging_mode="never",  # removes the flag buttons; requires a Gradio version whose ChatInterface accepts flagging_mode
    css="""
    footer {visibility: hidden}
    .gradio-container-4-44-0 > div:first-child {display: none !important}
    header {display: none !important}
    .app-header {display: none !important}
    div[class*="space-link"] {display: none !important}
    .meta-text {display: none !important}
    """,
)

if __name__ == "__main__":
    demo.launch(share=False)
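
# Assumed runtime dependencies, inferred from the imports above (versions unpinned):
#   gradio
#   llama-cpp-python
#   huggingface_hub
#   requests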