import os import pickle import logging import platform import gradio as gr from llama_cpp import Llama from huggingface_hub import hf_hub_download from langchain_huggingface import HuggingFaceEmbeddings # Qdrant filter models from qdrant_client.http.models import Filter, FieldCondition, MatchValue # ====================== LOGGING ====================== logging.basicConfig(level=logging.INFO, format="%(levelname)s | %(message)s") logger = logging.getLogger(__name__) # ====================== CONFIG ====================== repo_id = "robertolofaro/articles-model" BACKENDS = { "FAISS - RAG (HNSW)": "FAISS", "Qdrant - RAG": "Qdrant", } _HERE = os.path.dirname(os.path.abspath(__file__)) METADATA_PATH = os.path.join(_HERE, "metadata.pkl") FAISS_PATH = os.path.join(_HERE, "faiss_hnsw") QDRANT_PATH = os.path.join(_HERE, "qdrant_db") QDRANT_COLLECTION = "articles" # ====================== GPU / HARDWARE DETECTION ====================== # Override everything with N_GPU_LAYERS env var when you need fine control. # Otherwise: CUDA → all layers on GPU (-1); Apple Silicon → Metal (-1); else CPU (0). def _detect_gpu_layers() -> int: override = os.environ.get("N_GPU_LAYERS") if override is not None: val = int(override) logger.info("N_GPU_LAYERS override: %d", val) return val try: import torch if torch.cuda.is_available(): logger.info("CUDA detected — offloading all layers to GPU") return -1 except ImportError: pass if platform.system() == "Darwin" and platform.machine() == "arm64": logger.info("Apple Silicon / Metal detected — offloading all layers to GPU") return -1 logger.info("No GPU detected — running on CPU only") return 0 N_GPU_LAYERS = _detect_gpu_layers() # ====================== LOAD METADATA ====================== def _load_metadata(): """Load the DataFrame from metadata.pkl; return None on any failure.""" try: with open(METADATA_PATH, "rb") as f: df = pickle.load(f) logger.info("metadata.pkl loaded — %d rows, columns: %s", len(df), df.columns.tolist()) return df except FileNotFoundError: logger.error("metadata.pkl not found at %s", METADATA_PATH) except Exception as exc: logger.error("Failed to load metadata.pkl: %s", exc) return None _METADATA_DF = _load_metadata() def load_category_list(): """Return ['All categories'] + sorted unique article_category values.""" if _METADATA_DF is not None and "article_category" in _METADATA_DF.columns: cats = sorted(_METADATA_DF["article_category"].dropna().unique().tolist()) logger.info("Found %d categories", len(cats)) return ["All categories"] + cats logger.warning("article_category column not found — showing only 'All categories'") return ["All categories"] def load_articles_for_category(category: str): """Return ['All articles in category'] + sorted titles for the given category.""" default = ["All articles in category"] if _METADATA_DF is None or "article_title" not in _METADATA_DF.columns: return default if category in ("All categories", None, ""): titles = sorted(_METADATA_DF["article_title"].dropna().unique().tolist()) else: mask = _METADATA_DF["article_category"] == category titles = sorted(_METADATA_DF.loc[mask, "article_title"].dropna().unique().tolist()) return default + titles CATEGORY_LIST = load_category_list() # ====================== LOAD LLM ====================== # LOCAL_MODEL_PATH env var lets you point to a local GGUF and skip the HF download. # N_THREADS env var overrides thread count (default: 4 on CPU, 2 on GPU). def _load_llm() -> Llama: local_model = os.environ.get("LOCAL_MODEL_PATH") if local_model and os.path.isfile(local_model): model_path = local_model logger.info("Using local model at %s", model_path) else: if local_model: logger.warning("LOCAL_MODEL_PATH set but file not found (%s) — downloading from HF", local_model) logger.info("Downloading model from HF hub (%s)…", repo_id) model_path = hf_hub_download( repo_id=repo_id, filename="articles-Q4_K_M.gguf", repo_type="model", token=os.environ.get("HF_TOKEN"), ) default_threads = 2 if N_GPU_LAYERS != 0 else 4 n_threads = int(os.environ.get("N_THREADS", default_threads)) logger.info("Llama init: n_gpu_layers=%d, n_threads=%d", N_GPU_LAYERS, n_threads) return Llama( model_path=model_path, n_ctx=8192, n_threads=n_threads, n_batch=512, n_ubatch=512, n_gpu_layers=N_GPU_LAYERS, verbose=False, ) llm = _load_llm() # ====================== RAG CACHE ====================== # ====================== VECTOR STORES ====================== vectorstores: dict = {} def get_vectorstore(backend_name: str): if backend_name in vectorstores: return vectorstores[backend_name] try: embeddings = HuggingFaceEmbeddings( model_name="BAAI/bge-small-en-v1.5", encode_kwargs={"normalize_embeddings": True}, ) if backend_name == "FAISS": # Modern recommended import (still under langchain-community) from langchain_community.vectorstores import FAISS vs = FAISS.load_local( FAISS_PATH, embeddings, allow_dangerous_deserialization=True ) logger.info("FAISS index loaded from %s", FAISS_PATH) elif backend_name == "Qdrant": # Modern Qdrant integration from langchain_qdrant import QdrantVectorStore from qdrant_client import QdrantClient client = QdrantClient( path=QDRANT_PATH, # path to your qdrant_db folder timeout=60, ) vs = QdrantVectorStore( client=client, collection_name=QDRANT_COLLECTION, embedding=embeddings, ) logger.info("Qdrant collection '%s' loaded from %s", QDRANT_COLLECTION, QDRANT_PATH) else: # fallback to FAISS from langchain_community.vectorstores import FAISS vs = FAISS.load_local( FAISS_PATH, embeddings, allow_dangerous_deserialization=True ) vectorstores[backend_name] = vs logger.info("Vector store '%s' loaded successfully", backend_name) return vs except Exception as exc: logger.error("Failed to load vector store '%s': %s", backend_name, exc) import traceback logger.error(traceback.format_exc()) return None def _rag_search(vs, query: str, k: int, article_filter: str, category_filter: str): """ Similarity search with optional metadata filtering. """ want_title = None if article_filter in (None, "", "All articles in category") else article_filter want_category = None if category_filter in (None, "", "All categories") else category_filter backend_type = type(vs).__name__ ## potential security fix as catchall for FAISS search failure #if "FAISS" in backend_type: #try: # pool_size = min(k * 10, 80) # pool = vs.similarity_search(query, k=pool_size) # # # ... rest of your filtering code ... # #except Exception as e: # logger.error("FAISS similarity_search failed: %s", e) # # Fallback: try without k limit or return empty # return vs.similarity_search(query, k=k) if "FAISS" in backend_type: # FAISS: post-filtering (unchanged) pool_size = min(k * 10, 80) pool = vs.similarity_search(query, k=pool_size) filtered = [] for doc in pool: meta = doc.metadata if want_title and meta.get("article_title") != want_title: continue if want_category and meta.get("article_category") != want_category: continue filtered.append(doc) if len(filtered) >= k: break if not filtered and (want_title or want_category): logger.warning( "FAISS post-filter (title=%r, cat=%r) matched 0 docs — returning unfiltered top-%d", want_title, want_category, k ) return pool[:k] logger.info( "FAISS post-filter (title=%r, cat=%r) → %d/%d docs kept", want_title, want_category, len(filtered), len(pool) ) return filtered else: # === QDRANT - FIXED METADATA FILTER === from qdrant_client.http.models import Filter, FieldCondition, MatchValue conditions = [] if want_title: conditions.append( FieldCondition( key="metadata.article_title", # ← Fixed: metadata. prefix match=MatchValue(value=want_title) ) ) elif want_category: conditions.append( FieldCondition( key="metadata.article_category", # ← Fixed: metadata. prefix match=MatchValue(value=want_category) ) ) filter_dict = Filter(must=conditions) if conditions else None try: docs = vs.similarity_search( query, k=k, filter=filter_dict ) logger.info( "Qdrant search (filter=%s) → %d docs", "title" if want_title else "category" if want_category else "none", len(docs) ) return docs except Exception as e: logger.error("Qdrant search failed with filter: %s", e) # Fallback: search without filter logger.warning("Falling back to unfiltered Qdrant search") return vs.similarity_search(query, k=k) # ====================== SYSTEM PROMPT ====================== SYSTEM_PROMPT = """You are the reference expert for the articles contained in the training \ of this model, all extracted from the website robertolofaro.com, and all focused on change. IMPORTANT: Relevant article excerpts retrieved via semantic search will be injected \ directly in the user message under the heading "Context:". You MUST use those excerpts \ as the primary source for your answer. Do not speculate about whether you have access \ to articles — the context IS provided inline when available. # Your Mission When a user asks a question, provide a structured response based ONLY on the article \ content provided in the Context section. Do not draw on general knowledge outside those \ sources. Do not provide article titles or article IDs — provide only the concepts the \ articles express. # Response Format 1. Executive Summary: A 2-3 sentence overview answering the core query. 2. Guidelines & Hints: A markdown list of specific answers/guidelines/hints found in \ the source material.""" # ====================== GENERATION FUNCTION ====================== def generate_response( message, history, rag_mode, category_filter, article_filter, max_tokens, temperature, top_p, repeat_penalty, suppress_thinking, ): # Strip any /nothink the user may have typed manually clean_message = message.replace("/nothink", "").strip() # Build prompt with last 4 history turns for context window economy full_prompt = f"<|im_start|>system\n{SYSTEM_PROMPT}<|im_end|>\n" for msg in history[-4:]: full_prompt += f"<|im_start|>{msg['role']}\n{msg['content']}<|im_end|>\n" # --- RAG retrieval --- backend = BACKENDS.get(rag_mode) context = "" if backend: vs = get_vectorstore(backend) if vs: try: docs = _rag_search( vs, clean_message, k=5, article_filter=article_filter, category_filter=category_filter, ) if docs: context = "\n\n".join( f"[Article: {doc.metadata.get('article_title', 'N/A')}] " f"{doc.page_content[:700]}" for doc in docs ) logger.info( "RAG: %d chunks injected (article=%r, cat=%r)", len(docs), article_filter, category_filter, ) else: logger.warning("RAG returned 0 chunks — answering without context") except Exception as exc: logger.error("RAG retrieval failed: %s", exc) # Qwen3 /nothink MUST appear on its own line at the very end of the user turn. # A leading space (e.g. " /nothink") is NOT recognised by the tokeniser. nothink_suffix = "\n/nothink" if suppress_thinking else "" if context: full_prompt += ( f"<|im_start|>user\nContext:\n{context}\n\n" f"Question: {clean_message}{nothink_suffix}<|im_end|>\n" ) else: full_prompt += ( f"<|im_start|>user\n{clean_message}{nothink_suffix}<|im_end|>\n" ) full_prompt += "<|im_start|>assistant\n" # Sanitise generation params max_tokens_val = int(max_tokens) if max_tokens is not None else 900 temp_val = float(temperature) if temperature is not None else 0.65 top_p_val = float(top_p) if top_p is not None else 0.9 rep_penalty_val = float(repeat_penalty) if repeat_penalty is not None else 1.1 partial_text = "" for chunk in llm( full_prompt, max_tokens=max_tokens_val, temperature=temp_val, top_p=top_p_val, repeat_penalty=rep_penalty_val, stop=["<|im_end|>", "<|im_start|>"], stream=True, ): token = chunk["choices"][0]["text"] partial_text += token yield partial_text # ====================== GRADIO INTERFACE ====================== with gr.Blocks(title="Article Q&A model") as demo: gr.Markdown("# sourcing 350+ articles on change") gr.Markdown( "Qwen3.5-4B DoRA fine-tuned on 350+ articles on change from robertolofaro.com — " "experimental demo on CPU-only, to test embedding methods (takes a few minutes, " "you can restrict by category, and then a specific article) — updated as of 2026-05-05" ) gr.Markdown( "**NOTAM:** by querying this model you access the articles and metadata " "available on robertolofaro.com and GitHub. " "Answers reflect the article corpus only — do not treat them as advice, " "just expression of a position derived from material contained within the articles. " "If you want to read actual positions expressed within articles, you can read the articles " "(see the model repository for all links to the available options)." ) gr.Markdown( "If, after getting an answer, you want something tailored to your context, " "contact a consultant (myself included)." ) with gr.Row(): rag_mode = gr.Radio( choices=list(BACKENDS.keys()), value="FAISS - RAG (HNSW)", label="Retrieval backend", ) suppress_thinking = gr.Checkbox( value=True, label="Suppress model thinking (/nothink)", info="Uncheck to see the model's reasoning chain", ) with gr.Row(): category_filter = gr.Dropdown( choices=CATEGORY_LIST, value="All categories", label="Filter by category", info=f"{len(CATEGORY_LIST) - 1} categories available", ) article_filter = gr.Dropdown( choices=["All articles in category"], value="All articles in category", label="Narrow to specific article (optional)", info="Select a category first to populate this list", ) # Dynamically populate the article dropdown when category changes def update_article_dropdown(category): articles = load_articles_for_category(category) return gr.Dropdown(choices=articles, value=articles[0]) category_filter.change( fn=update_article_dropdown, inputs=category_filter, outputs=article_filter, ) with gr.Accordion("Advanced Generation Parameters", open=False): max_tokens = gr.Slider(256, 2048, value=900, step=64, label="Max Tokens") temperature = gr.Slider(0.0, 1.0, value=0.65, step=0.05, label="Temperature") top_p = gr.Slider(0.0, 1.0, value=0.9, step=0.05, label="Top-p") repeat_penalty = gr.Slider(1.0, 2.0, value=1.1, step=0.05, label="Repeat Penalty") gr.ChatInterface( fn=generate_response, additional_inputs=[ rag_mode, category_filter, article_filter, max_tokens, temperature, top_p, repeat_penalty, suppress_thinking, ], cache_examples=False, examples=[ ["What is the potential for Italy?"], ["What is the potential for Turin?"], ], ) if __name__ == "__main__": demo.queue(default_concurrency_limit=1).launch()