"""Private Knowledge AI, Hugging Face Space demo v08. Token-active Hugging Face demo: Qwen generation through HF Inference API, semantic embeddings, lexical reranking, and extractive fallback. """ from __future__ import annotations import hashlib import html import json import os import re import spaces from collections import Counter, defaultdict from dataclasses import asdict, dataclass from pathlib import Path from typing import Any, List, Optional, Tuple import gradio as gr import numpy as np os.environ["CUDA_VISIBLE_DEVICES"] = "" os.environ["TOKENIZERS_PARALLELISM"] = "false" APP_TITLE = os.getenv("APP_TITLE", "Private Knowledge AI") APP_SUBTITLE = os.getenv("APP_SUBTITLE", "Upload documents, index them locally in the Space runtime, and ask source-grounded questions.") APP_PROFILE = os.getenv("APP_PROFILE", "hf_token").strip().lower() EMBEDDING_BACKEND = os.getenv("EMBEDDING_BACKEND", "sentence_transformers").strip().lower() # hash | sentence_transformers EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "llamaindex/vdr-2b-multi-v1") LLM_MODE = os.getenv("LLM_MODE", "hf_api").strip().lower() # extractive | hf_api | local_transformers HF_LLM_MODEL = os.getenv("HF_LLM_MODEL", "llamaindex/vdr-2b-multi-v1") MAX_CHUNK_CHARS = int(os.getenv("MAX_CHUNK_CHARS", "1100" if APP_PROFILE == "hf_token" else "900" if APP_PROFILE == "zero" else "1200")) CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "160" if APP_PROFILE == "hf_token" else "120" if APP_PROFILE == "zero" else "180")) MAX_FILE_CHARS = int(os.getenv("MAX_FILE_CHARS", "500000" if APP_PROFILE == "hf_token" else "300000" if APP_PROFILE == "zero" else "600000")) MAX_TOTAL_CHUNKS = int(os.getenv("MAX_TOTAL_CHUNKS", "1800" if APP_PROFILE == "hf_token" else "1200" if APP_PROFILE == "zero" else "2500")) FEATURE_HASH_DIM = int(os.getenv("FEATURE_HASH_DIM", "1024")) HF_TOKEN = os.getenv("HF_TOKEN") RERANKER_MODE = os.getenv("RERANKER_MODE", "lexical").strip().lower() # none | lexical HYBRID_ALPHA = float(os.getenv("HYBRID_ALPHA", "0.82")) os.environ.setdefault("TOKENIZERS_PARALLELISM", "false") os.environ.setdefault("HF_HOME", os.getenv("HF_HOME", str(Path.home() / ".cache" / "huggingface"))) @dataclass class Chunk: id: str source: str page: Optional[int] chunk_id: int text: str char_count: int CHUNKS: List[Chunk] = [] EMBEDDINGS: Optional[np.ndarray] = None _EMBEDDER = None _LOCAL_LLM: Optional[Tuple[Any, Any, Any]] = None def _safe_filename(path_or_name: str) -> str: name = Path(str(path_or_name)).name return re.sub(r"[^\w.()\- ]+", "_", name, flags=re.UNICODE)[:180] or "document" def _hash_text(text: str) -> str: return hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()[:16] def _normalize_text(text: str) -> str: text = text.replace("\x00", " ") text = re.sub(r"[ \t]+", " ", text) text = re.sub(r"\n{3,}", "\n\n", text) return text.strip() def _truncate(text: str, limit: int = MAX_FILE_CHARS) -> str: return text if len(text) <= limit else text[:limit] + "\n\n[TRUNCATED: MAX_FILE_CHARS exceeded]" def _read_txt(path: Path) -> List[Tuple[str, Optional[int]]]: raw = path.read_bytes() for enc in ("utf-8", "utf-8-sig", "cp1254", "latin-1"): try: return [(_truncate(raw.decode(enc)), None)] except UnicodeDecodeError: pass return [(_truncate(raw.decode("utf-8", errors="replace")), None)] def _read_pdf(path: Path) -> List[Tuple[str, Optional[int]]]: try: from pypdf import PdfReader reader = PdfReader(str(path)) pages = [] for i, page in enumerate(reader.pages, start=1): text = _normalize_text(page.extract_text() or "") if text: pages.append((_truncate(text), i)) return pages or [("[No extractable PDF text found. OCR is not enabled in zero profile.]", None)] except Exception as exc: return [(f"[PDF parser error: {exc}]", None)] def _read_docx(path: Path) -> List[Tuple[str, Optional[int]]]: try: import docx doc = docx.Document(str(path)) parts = [p.text.strip() for p in doc.paragraphs if p.text.strip()] for table in doc.tables: for row in table.rows: cells = [cell.text.strip() for cell in row.cells] if any(cells): parts.append(" | ".join(cells)) return [(_truncate("\n".join(parts)), None)] except Exception as exc: return [(f"[DOCX parser error: {exc}]", None)] def _read_csv(path: Path) -> List[Tuple[str, Optional[int]]]: try: import pandas as pd try: df = pd.read_csv(path) except UnicodeDecodeError: df = pd.read_csv(path, encoding="latin-1") meta = f"Rows: {len(df)}, Columns: {len(df.columns)}\nColumns: {', '.join(map(str, df.columns))}\n\n" return [(_truncate(meta + df.head(300).to_csv(index=False)), None)] except Exception as exc: return [(f"[CSV parser error: {exc}]", None)] def _read_xlsx(path: Path) -> List[Tuple[str, Optional[int]]]: try: import pandas as pd sheets = pd.read_excel(path, sheet_name=None) parts = [] for sheet, df in sheets.items(): parts.append(f"Sheet: {sheet}\nRows: {len(df)}, Columns: {len(df.columns)}\nColumns: {', '.join(map(str, df.columns))}\n{df.head(200).to_csv(index=False)}") return [(_truncate("\n\n".join(parts)), None)] except Exception as exc: return [(f"[XLSX parser error: {exc}]", None)] def read_document(path_str: str) -> List[Tuple[str, Optional[int]]]: path = Path(path_str) suffix = path.suffix.lower() if suffix in {".txt", ".md", ".markdown", ".rst", ".log"}: return _read_txt(path) if suffix == ".pdf": return _read_pdf(path) if suffix == ".docx": return _read_docx(path) if suffix == ".csv": return _read_csv(path) if suffix in {".xlsx", ".xlsm"}: return _read_xlsx(path) return _read_txt(path) def chunk_text(text: str, source: str, page: Optional[int]) -> List[Chunk]: text = _normalize_text(text) if not text: return [] chunks, start, cid = [], 0, 1 max_chars = max(350, MAX_CHUNK_CHARS) overlap = min(max(0, CHUNK_OVERLAP), max_chars // 3) while start < len(text): end = min(start + max_chars, len(text)) if end < len(text): window = text[start:end] cut = max(window.rfind("\n\n"), window.rfind(". "), window.rfind("; "), window.rfind(", ")) if cut > max_chars * 0.55: end = start + cut + 1 piece = text[start:end].strip() if piece: chunks.append(Chunk(_hash_text(f"{source}:{page}:{cid}:{piece}"), source, page, cid, piece, len(piece))) cid += 1 if end >= len(text): break start = max(end - overlap, start + 1) return chunks def _tokenize_for_hash(text: str) -> List[str]: return re.findall(r"[\wçğıöşüÇĞİÖŞÜ]+", text.lower(), flags=re.UNICODE) def _lexical_overlap_score(query: str, text: str) -> float: q_tokens = set(_tokenize_for_hash(query)) if not q_tokens: return 0.0 t_counts = Counter(_tokenize_for_hash(text)) hit = sum(1 for t in q_tokens if t in t_counts) density = sum(min(t_counts.get(t, 0), 3) for t in q_tokens) / max(1, len(q_tokens) * 3) return float((hit / max(1, len(q_tokens))) * 0.7 + density * 0.3) def _hash_vector(text: str, dim: int = FEATURE_HASH_DIM) -> np.ndarray: vec = np.zeros(dim, dtype=np.float32) counts = Counter(_tokenize_for_hash(text)) for token, count in counts.items(): digest = hashlib.md5(token.encode("utf-8", errors="ignore")).hexdigest() idx = int(digest[:8], 16) % dim sign = 1.0 if int(digest[8:10], 16) % 2 == 0 else -1.0 vec[idx] += sign * (1.0 + np.log1p(count)) norm = float(np.linalg.norm(vec)) if norm > 0: vec /= norm return vec def _needs_e5_prefix(model_name: str) -> bool: return "e5" in model_name.lower() def _get_embedder(): global _EMBEDDER if _EMBEDDER is None: try: from sentence_transformers import SentenceTransformer except Exception as exc: raise RuntimeError("sentence-transformers is not installed. Use EMBEDDING_BACKEND=hash or requirements-full.txt.") from exc _EMBEDDER = SentenceTransformer(EMBEDDING_MODEL, device="cpu") return _EMBEDDER def _encode_passages(texts: List[str]) -> np.ndarray: if EMBEDDING_BACKEND == "hash": return np.vstack([_hash_vector(t) for t in texts]).astype(np.float32) embedder = _get_embedder() encoded = [f"passage: {t}" for t in texts] if _needs_e5_prefix(EMBEDDING_MODEL) else texts return np.asarray(embedder.encode(encoded, batch_size=16, normalize_embeddings=True, show_progress_bar=False), dtype=np.float32) def _encode_query(query: str) -> np.ndarray: if EMBEDDING_BACKEND == "hash": return _hash_vector(query) embedder = _get_embedder() encoded = f"query: {query}" if _needs_e5_prefix(EMBEDDING_MODEL) else query return np.asarray(embedder.encode([encoded], normalize_embeddings=True, show_progress_bar=False), dtype=np.float32)[0] def _append_chunks(new_chunks: List[Chunk]) -> None: global CHUNKS, EMBEDDINGS if not new_chunks: return if len(CHUNKS) + len(new_chunks) > MAX_TOTAL_CHUNKS: new_chunks = new_chunks[: max(0, MAX_TOTAL_CHUNKS - len(CHUNKS))] if not new_chunks: return vectors = _encode_passages([c.text for c in new_chunks]) CHUNKS.extend(new_chunks) EMBEDDINGS = vectors if EMBEDDINGS is None else np.vstack([EMBEDDINGS, vectors]) def _markdown_table(rows: List[List[str]], headers: List[str]) -> str: if not rows: return "_No records._" lines = ["| " + " | ".join(headers) + " |", "| " + " | ".join(["---"] * len(headers)) + " |"] for row in rows: lines.append("| " + " | ".join(str(c).replace("|", "\\|").replace("\n", " ") for c in row) + " |") return "\n".join(lines) def ingest_files(files: Optional[List[str]], reset_first: bool = True) -> str: """Index uploaded files. Public Gradio API endpoint: ingest_files.""" global CHUNKS, EMBEDDINGS if reset_first: CHUNKS, EMBEDDINGS = [], None if not files: return "No file received. Upload PDF, DOCX, TXT, MD, CSV or XLSX files." if isinstance(files, (str, Path)): files = [str(files)] rows, errors, total_new = [], [], 0 for file_path in files: try: source = _safe_filename(str(file_path)) doc_chunks = [] segments = read_document(str(file_path)) for text, page in segments: doc_chunks.extend(chunk_text(text, source, page)) before = len(CHUNKS) _append_chunks(doc_chunks) added = len(CHUNKS) - before total_new += added rows.append([source, str(len(segments)), str(added)]) except Exception as exc: errors.append(f"{_safe_filename(str(file_path))}: {type(exc).__name__}: {exc}") msg = [ f"Indexed chunks: **{len(CHUNKS)}**", f"New chunks added: **{total_new}**", f"Embedding backend: `{EMBEDDING_BACKEND}`", f"Embedding model: `{EMBEDDING_MODEL if EMBEDDING_BACKEND != 'hash' else 'feature-hash'}`", "", _markdown_table(rows, ["File", "Segments", "Chunks added"]), ] if errors: msg += ["", "### Errors", "\n".join(f"- `{e}`" for e in errors)] return "\n".join(msg) def load_sample_documents() -> str: """Load sample documents shipped with the Space.""" files = [str(p) for p in sorted((Path(__file__).parent / "examples").glob("*")) if p.is_file()] return ingest_files(files, reset_first=True) def retrieve(query: str, top_k: int = 5) -> List[dict]: if EMBEDDINGS is None or not CHUNKS: return [] requested = max(1, min(int(top_k), len(CHUNKS))) q = _encode_query(query) vector_scores = EMBEDDINGS @ q candidate_n = min(len(CHUNKS), max(requested * 4, requested)) idxs = np.argsort(-vector_scores)[:candidate_n] ranked = [] for i in idxs: i = int(i) vector = float(vector_scores[i]) lexical = _lexical_overlap_score(query, CHUNKS[i].text) if RERANKER_MODE == "lexical" else 0.0 score = (HYBRID_ALPHA * vector + (1.0 - HYBRID_ALPHA) * lexical) if RERANKER_MODE == "lexical" else vector ranked.append({"score": float(score), "vector_score": vector, "lexical_score": float(lexical), "chunk": CHUNKS[i]}) ranked.sort(key=lambda x: x["score"], reverse=True) return [{"rank": r, **item} for r, item in enumerate(ranked[:requested], start=1)] def _build_context(results: List[dict], max_chars: int = 6000) -> str: blocks, used = [], 0 for item in results: c: Chunk = item["chunk"] page = f", page {c.page}" if c.page else "" block = f"[S{item['rank']}] Source: {c.source}{page}, chunk {c.chunk_id}\n{c.text}" if used + len(block) > max_chars: break blocks.append(block) used += len(block) return "\n\n".join(blocks) def _sources_markdown(results: List[dict]) -> str: rows = [] for item in results: c: Chunk = item["chunk"] rows.append([f"S{item['rank']}", f"{item['score']:.3f}", f"{item.get('vector_score', item['score']):.3f}", f"{item.get('lexical_score', 0.0):.3f}", c.source, str(c.page or ""), str(c.chunk_id), html.escape(c.text[:350].replace("\n", " "))]) return _markdown_table(rows, ["ID", "Score", "Vector", "Lexical", "Source", "Page", "Chunk", "Snippet"]) def _prompt(query: str, results: List[dict]) -> str: return f"""You are Private Knowledge AI. Answer only from the provided context. If evidence is insufficient, say so. Cite sources inline as [S1]. Give a direct answer, then evidence, then limitations.\n\nContext:\n{_build_context(results)}\n\nQuestion:\n{query}\n\nAnswer:""" def _answer_extractive(query: str, results: List[dict]) -> str: if not results: return "No indexed context found. Upload and index documents first." bullets = [] for item in results[:5]: c: Chunk = item["chunk"] sentences = re.split(r"(?<=[.!?])\s+", c.text.replace("\n", " ")) selected = (" ".join(sentences[:2]).strip() or c.text[:500])[:700] page = f", p. {c.page}" if c.page else "" bullets.append(f"- **[S{item['rank']}] {c.source}{page}:** {selected}") return "\n".join(["### Answer", "Extractive, source-grounded answer. Generative LLM is disabled.", "", *bullets, "", "### Confidence", "Medium when top scores are high and sources converge. Low when context is sparse."]) def _answer_hf_api(query: str, results: List[dict]) -> str: try: from huggingface_hub import InferenceClient except Exception as exc: return f"HF API mode unavailable: `{exc}`. Switch to extractive mode." if not HF_TOKEN: return "HF API mode requires `HF_TOKEN` as a Space secret." try: client = InferenceClient(token=HF_TOKEN) completion = client.chat.completions.create( model=HF_LLM_MODEL, messages=[{"role": "system", "content": "Answer only from context. Cite [S1]. Be concise."}, {"role": "user", "content": _prompt(query, results)}], max_tokens=int(os.getenv("MAX_NEW_TOKENS", "700")), temperature=float(os.getenv("TEMPERATURE", "0.2")), ) return completion.choices[0].message.content.strip() except Exception as chat_exc: try: client = InferenceClient(model=HF_LLM_MODEL, token=HF_TOKEN) return client.text_generation(_prompt(query, results), max_new_tokens=int(os.getenv("MAX_NEW_TOKENS", "700")), temperature=float(os.getenv("TEMPERATURE", "0.2"))).strip() except Exception as text_exc: return f"HF API generation failed. Chat: `{chat_exc}`. Text: `{text_exc}`." def _get_local_llm(): global _LOCAL_LLM if _LOCAL_LLM is not None: return _LOCAL_LLM import torch from transformers import AutoModelForCausalLM, AutoTokenizer tokenizer = AutoTokenizer.from_pretrained(HF_LLM_MODEL) model = AutoModelForCausalLM.from_pretrained(HF_LLM_MODEL, torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32, low_cpu_mem_usage=True) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") model.to(device); model.eval() _LOCAL_LLM = (tokenizer, model, device) return _LOCAL_LLM def _answer_local_transformers(query: str, results: List[dict]) -> str: try: import torch tokenizer, model, device = _get_local_llm() text = tokenizer.apply_chat_template([{"role": "system", "content": "Answer only from context. Cite sources."}, {"role": "user", "content": _prompt(query, results)}], tokenize=False, add_generation_prompt=True) if hasattr(tokenizer, "apply_chat_template") else _prompt(query, results) inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=12000).to(device) with torch.no_grad(): generated = model.generate(**inputs, max_new_tokens=600, do_sample=False, pad_token_id=tokenizer.eos_token_id) return tokenizer.decode(generated[0][inputs["input_ids"].shape[1]:], skip_special_tokens=True).strip() except Exception as exc: return f"Local Transformers generation failed: `{type(exc).__name__}: {exc}`. Use extractive or hf_api mode." def test_hf_token() -> str: """Check whether HF_TOKEN and the configured Qwen model are usable.""" if not HF_TOKEN: return "HF_TOKEN is missing. Add it under Space Settings -> Variables and secrets -> New secret." try: from huggingface_hub import InferenceClient client = InferenceClient(token=HF_TOKEN) completion = client.chat.completions.create( model=HF_LLM_MODEL, messages=[{"role": "user", "content": "Reply with exactly: HF_OK"}], max_tokens=8, temperature=0.0, ) txt = completion.choices[0].message.content.strip() return f"HF token active. Model: `{HF_LLM_MODEL}`. Test response: `{txt}`" except Exception as exc: return f"HF token/model test failed: `{type(exc).__name__}: {exc}`" @spaces.GPU def ask_question(query: str, top_k: int = 5, answer_mode: str = "auto") -> Tuple[str, str]: """Ask a question against indexed documents. Public Gradio API endpoint: ask_question.""" query = (query or "").strip() if not query: return "Enter a question.", "" if EMBEDDINGS is None or not CHUNKS: return "Upload and index documents first, or click 'Load sample documents'.", "" results = retrieve(query, int(top_k)) mode = (answer_mode or "auto").strip().lower() if mode == "auto": mode = LLM_MODE answer = _answer_hf_api(query, results) if mode == "hf_api" else _answer_local_transformers(query, results) if mode == "local_transformers" else _answer_extractive(query, results) return answer, _sources_markdown(results) def list_sources() -> str: """List indexed sources. Public Gradio API endpoint: list_sources.""" if not CHUNKS: return "No indexed sources." counts, pages = Counter(c.source for c in CHUNKS), defaultdict(set) for c in CHUNKS: if c.page: pages[c.source].add(c.page) rows = [[src, str(cnt), f"{min(pages[src])}-{max(pages[src])}" if pages[src] else ""] for src, cnt in sorted(counts.items())] return _markdown_table(rows, ["Source", "Chunks", "Pages"]) def reset_index() -> str: """Clear the in-memory document index. Public Gradio API endpoint: reset_index.""" global CHUNKS, EMBEDDINGS CHUNKS, EMBEDDINGS = [], None return "Index cleared." def runtime_status() -> str: """Return runtime configuration. Public Gradio API endpoint: runtime_status.""" payload = {"app_profile": APP_PROFILE, "embedding_backend": EMBEDDING_BACKEND, "embedding_model": EMBEDDING_MODEL if EMBEDDING_BACKEND != "hash" else "feature-hash", "llm_mode": LLM_MODE, "hf_llm_model": HF_LLM_MODEL, "hf_token_present": bool(HF_TOKEN), "reranker_mode": RERANKER_MODE, "hybrid_alpha": HYBRID_ALPHA, "max_chunk_chars": MAX_CHUNK_CHARS, "max_total_chunks": MAX_TOTAL_CHUNKS, "chunk_count": len(CHUNKS)} return "```json\n" + json.dumps(payload, ensure_ascii=False, indent=2) + "\n```" def export_index_json() -> str: """Return index metadata as JSON text. Public Gradio API endpoint: export_index_json.""" payload = {"embedding_backend": EMBEDDING_BACKEND, "embedding_model": EMBEDDING_MODEL if EMBEDDING_BACKEND != "hash" else "feature-hash", "chunks": [asdict(c) for c in CHUNKS], "chunk_count": len(CHUNKS)} return json.dumps(payload, ensure_ascii=False, indent=2) def build_app() -> gr.Blocks: description = f""" # {APP_TITLE} {APP_SUBTITLE} **v08 profile:** `{APP_PROFILE}` **Embedding backend:** `{EMBEDDING_BACKEND}` **Default LLM:** `{HF_LLM_MODEL}` through Hugging Face Inference API. **Token status:** `{bool(HF_TOKEN)}` **Fallback:** extractive RAG if token/API fails. """ with gr.Blocks(title=APP_TITLE) as demo: gr.Markdown(description) with gr.Tab("1. Upload / Index"): gr.Markdown("Upload files, then build a semantic RAG index. Supported: PDF, DOCX, TXT, MD, CSV, XLSX.") files = gr.File(label="Documents", file_count="multiple", type="filepath") reset_first = gr.Checkbox(label="Reset index before ingest", value=True) with gr.Row(): ingest_btn = gr.Button("Index uploaded documents", variant="primary") sample_btn = gr.Button("Load sample documents") ingest_status = gr.Markdown() ingest_btn.click(fn=ingest_files, inputs=[files, reset_first], outputs=ingest_status, api_name="ingest_files") sample_btn.click(fn=load_sample_documents, inputs=None, outputs=ingest_status, api_name="load_sample_documents") with gr.Tab("2. Ask"): query = gr.Textbox(label="Question", lines=3, placeholder="Ask a question about the indexed documents...") with gr.Row(): top_k = gr.Slider(label="Top-k chunks", minimum=1, maximum=10, step=1, value=5) mode = gr.Radio(label="Answer mode", choices=["auto", "hf_api", "extractive", "local_transformers"], value="auto") ask_btn = gr.Button("Ask", variant="primary") answer = gr.Markdown(label="Answer") sources = gr.Markdown(label="Retrieved sources") ask_btn.click(fn=ask_question, inputs=[query, top_k, mode], outputs=[answer, sources], api_name="ask_question") with gr.Tab("3. Admin / API"): gr.Markdown("Agent-callable endpoints: `ingest_files`, `ask_question`, `list_sources`, `reset_index`, `export_index_json`, `runtime_status`, `test_hf_token`.") gr.Markdown(runtime_status()) with gr.Row(): list_btn = gr.Button("List sources") reset_btn = gr.Button("Reset index") status_btn = gr.Button("Runtime status") token_btn = gr.Button("Test HF token / Qwen") export_btn = gr.Button("Export index metadata") admin_out = gr.Markdown() export_out = gr.Code(label="Index JSON", language="json") list_btn.click(fn=list_sources, inputs=None, outputs=admin_out, api_name="list_sources") reset_btn.click(fn=reset_index, inputs=None, outputs=admin_out, api_name="reset_index") status_btn.click(fn=runtime_status, inputs=None, outputs=admin_out, api_name="runtime_status") token_btn.click(fn=test_hf_token, inputs=None, outputs=admin_out, api_name="test_hf_token") export_btn.click(fn=export_index_json, inputs=None, outputs=export_out, api_name="export_index_json") gr.Markdown("**Privacy note:** HF API modunda soru ve retrieved context Hugging Face Inference API’ye gider. Public Space’e gizli belge yüklemeyin. Gerçek müşteri verisi için Private Space veya on-premise sürüm kullanın.") return demo demo = build_app() if __name__ == "__main__": demo.launch()