Spaces:
Sleeping
Sleeping
"""Private Knowledge AI, Hugging Face Space demo v08.

Token-active Hugging Face demo: Qwen generation through the HF Inference API,
semantic embeddings, lexical reranking, and extractive fallback.
"""
| from __future__ import annotations | |
| import hashlib | |
| import html | |
| import json | |
| import os | |
| import re | |
| import spaces | |
| from collections import Counter, defaultdict | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| from typing import Any, List, Optional, Tuple | |
| import gradio as gr | |
| import numpy as np | |
# Process-wide environment: hide CUDA devices and disable tokenizers
# parallelism before any ML library is imported lazily further down.
os.environ["CUDA_VISIBLE_DEVICES"] = ""
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# --- UI / profile -----------------------------------------------------------
APP_TITLE = os.getenv("APP_TITLE", "Private Knowledge AI")
APP_SUBTITLE = os.getenv(
    "APP_SUBTITLE",
    "Upload documents, index them locally in the Space runtime, and ask source-grounded questions.",
)
APP_PROFILE = os.getenv("APP_PROFILE", "hf_token").strip().lower()

# --- Models -----------------------------------------------------------------
# EMBEDDING_BACKEND: hash | sentence_transformers
EMBEDDING_BACKEND = os.getenv("EMBEDDING_BACKEND", "sentence_transformers").strip().lower()
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "llamaindex/vdr-2b-multi-v1")
# LLM_MODE: extractive | hf_api | local_transformers
LLM_MODE = os.getenv("LLM_MODE", "hf_api").strip().lower()
# NOTE(review): the default LLM id is identical to the embedding model id,
# while the module docstring talks about Qwen generation -- confirm the
# intended default chat model.
HF_LLM_MODEL = os.getenv("HF_LLM_MODEL", "llamaindex/vdr-2b-multi-v1")

# --- Indexing limits (defaults depend on APP_PROFILE) -------------------------
# Each table maps a known profile to its default; any other profile uses the
# fallback passed to .get().
MAX_CHUNK_CHARS = int(
    os.getenv("MAX_CHUNK_CHARS", {"hf_token": "1100", "zero": "900"}.get(APP_PROFILE, "1200"))
)
CHUNK_OVERLAP = int(
    os.getenv("CHUNK_OVERLAP", {"hf_token": "160", "zero": "120"}.get(APP_PROFILE, "180"))
)
MAX_FILE_CHARS = int(
    os.getenv("MAX_FILE_CHARS", {"hf_token": "500000", "zero": "300000"}.get(APP_PROFILE, "600000"))
)
MAX_TOTAL_CHUNKS = int(
    os.getenv("MAX_TOTAL_CHUNKS", {"hf_token": "1800", "zero": "1200"}.get(APP_PROFILE, "2500"))
)
FEATURE_HASH_DIM = int(os.getenv("FEATURE_HASH_DIM", "1024"))

# --- Credentials / retrieval tuning -------------------------------------------
HF_TOKEN = os.getenv("HF_TOKEN")
# RERANKER_MODE: none | lexical
RERANKER_MODE = os.getenv("RERANKER_MODE", "lexical").strip().lower()
HYBRID_ALPHA = float(os.getenv("HYBRID_ALPHA", "0.82"))

# Keep an already-configured HF_HOME; otherwise point it at the user cache.
os.environ.setdefault("HF_HOME", str(Path.home() / ".cache" / "huggingface"))
@dataclass
class Chunk:
    """One retrievable slice of an ingested document.

    The class must be a dataclass: instances are created positionally
    (``Chunk(id, source, page, chunk_id, text, char_count)``) and serialized
    with ``dataclasses.asdict`` when the index is exported.
    """

    # Short content hash identifying this chunk.
    id: str
    # Sanitized name of the file the chunk came from.
    source: str
    # Page number for paged formats; None when the reader reports no page.
    page: Optional[int]
    # Sequence number of the chunk within its segment.
    chunk_id: int
    # The chunk text itself.
    text: str
    # Length of ``text`` in characters.
    char_count: int
# In-memory index state, shared by every request served by this process.
CHUNKS: List[Chunk] = []  # indexed chunks, row-aligned with EMBEDDINGS
EMBEDDINGS: Optional[np.ndarray] = None  # one row per chunk; None while empty

# Lazily created singletons (see _get_embedder / _get_local_llm).
_EMBEDDER = None
_LOCAL_LLM: Optional[Tuple[Any, Any, Any]] = None  # (tokenizer, model, device)
| def _safe_filename(path_or_name: str) -> str: | |
| name = Path(str(path_or_name)).name | |
| return re.sub(r"[^\w.()\- ]+", "_", name, flags=re.UNICODE)[:180] or "document" | |
| def _hash_text(text: str) -> str: | |
| return hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()[:16] | |
| def _normalize_text(text: str) -> str: | |
| text = text.replace("\x00", " ") | |
| text = re.sub(r"[ \t]+", " ", text) | |
| text = re.sub(r"\n{3,}", "\n\n", text) | |
| return text.strip() | |
def _truncate(text: str, limit: int = MAX_FILE_CHARS) -> str:
    """Cap ``text`` at ``limit`` characters.

    Text within the limit is returned unchanged; longer text is cut at
    ``limit`` and a truncation marker is appended.
    """
    if len(text) > limit:
        return text[:limit] + "\n\n[TRUNCATED: MAX_FILE_CHARS exceeded]"
    return text
def _read_txt(path: Path) -> List[Tuple[str, Optional[int]]]:
    """Read a text-like file and return it as one page-less segment.

    Args:
        path: File to read.

    Returns:
        A single-element list ``[(text, None)]`` with the decoded, truncated
        content.

    Encodings are tried from strictest to most permissive: UTF-8 (with or
    without a byte-order mark), then Windows-1254, then Latin-1.
    """
    raw = path.read_bytes()
    # "utf-8-sig" decodes plain UTF-8 too, and additionally strips a leading
    # BOM. Trying plain "utf-8" first would succeed on BOM-prefixed files and
    # leave a stray U+FEFF at the start of the indexed text.
    for enc in ("utf-8-sig", "cp1254"):
        try:
            return [(_truncate(raw.decode(enc)), None)]
        except UnicodeDecodeError:
            continue
    # Latin-1 maps every byte to a code point, so this final decode cannot
    # fail and no errors="replace" fallback is needed after it.
    return [(_truncate(raw.decode("latin-1")), None)]
def _read_pdf(path: Path) -> List[Tuple[str, Optional[int]]]:
    """Extract text from a PDF, one segment per non-empty page.

    Returns ``(text, page_number)`` tuples with 1-based page numbers. When no
    page yields text, or when parsing fails for any reason, a single
    placeholder segment describing the problem is returned instead of raising.
    """
    try:
        from pypdf import PdfReader

        extracted = []
        for number, pdf_page in enumerate(PdfReader(str(path)).pages, start=1):
            content = _normalize_text(pdf_page.extract_text() or "")
            if not content:
                continue  # pages without extractable text are skipped
            extracted.append((_truncate(content), number))
        if extracted:
            return extracted
        return [("[No extractable PDF text found. OCR is not enabled in zero profile.]", None)]
    except Exception as exc:
        # Best effort: surface the parser error as document text.
        return [(f"[PDF parser error: {exc}]", None)]
def _read_docx(path: Path) -> List[Tuple[str, Optional[int]]]:
    """Extract paragraphs and table rows from a DOCX file as one segment.

    Non-empty paragraphs come first, followed by every table row that has at
    least one non-empty cell (cells joined with `` | ``). Any failure is
    reported as a placeholder segment instead of raising.
    """
    try:
        import docx

        document = docx.Document(str(path))
        lines = []
        for paragraph in document.paragraphs:
            stripped = paragraph.text.strip()
            if stripped:
                lines.append(stripped)
        for table in document.tables:
            for row in table.rows:
                values = [cell.text.strip() for cell in row.cells]
                if any(values):
                    lines.append(" | ".join(values))
        return [(_truncate("\n".join(lines)), None)]
    except Exception as exc:
        return [(f"[DOCX parser error: {exc}]", None)]
def _read_csv(path: Path) -> List[Tuple[str, Optional[int]]]:
    """Summarize a CSV file as one text segment.

    The segment contains a header with row/column counts and column names,
    followed by the first 300 rows re-serialized as CSV. A Unicode decoding
    failure triggers one retry with Latin-1; any other failure is reported as
    a placeholder segment.
    """
    try:
        import pandas as pd

        try:
            frame = pd.read_csv(path)
        except UnicodeDecodeError:
            frame = pd.read_csv(path, encoding="latin-1")
        column_names = ", ".join(map(str, frame.columns))
        header = f"Rows: {len(frame)}, Columns: {len(frame.columns)}\nColumns: {column_names}\n\n"
        preview = frame.head(300).to_csv(index=False)
        return [(_truncate(header + preview), None)]
    except Exception as exc:
        return [(f"[CSV parser error: {exc}]", None)]
def _read_xlsx(path: Path) -> List[Tuple[str, Optional[int]]]:
    """Summarize every sheet of an Excel workbook as one text segment.

    For each sheet the segment lists its name, row/column counts, column
    names and the first 200 rows as CSV; sheets are separated by a blank
    line. Any failure is reported as a placeholder segment.
    """
    try:
        import pandas as pd

        workbook = pd.read_excel(path, sheet_name=None)  # all sheets, keyed by name
        sections = []
        for sheet_name, frame in workbook.items():
            column_names = ", ".join(map(str, frame.columns))
            preview = frame.head(200).to_csv(index=False)
            sections.append(
                f"Sheet: {sheet_name}\nRows: {len(frame)}, Columns: {len(frame.columns)}\nColumns: {column_names}\n{preview}"
            )
        return [(_truncate("\n\n".join(sections)), None)]
    except Exception as exc:
        return [(f"[XLSX parser error: {exc}]", None)]
def read_document(path_str: str) -> List[Tuple[str, Optional[int]]]:
    """Read a document and return its ``(text, page)`` segments.

    The reader is chosen from the lower-cased file suffix. Suffixes without a
    dedicated reader (including the explicit text ones such as ``.txt`` and
    ``.md``) are read as plain text.
    """
    path = Path(path_str)
    readers = {
        ".pdf": _read_pdf,
        ".docx": _read_docx,
        ".csv": _read_csv,
        ".xlsx": _read_xlsx,
        ".xlsm": _read_xlsx,
    }
    reader = readers.get(path.suffix.lower(), _read_txt)
    return reader(path)
def chunk_text(text: str, source: str, page: Optional[int]) -> List[Chunk]:
    """Split ``text`` into overlapping chunks for indexing.

    The text is normalized first. Each window is at most
    ``max(350, MAX_CHUNK_CHARS)`` characters; when a paragraph, sentence,
    semicolon or comma boundary lies past 55% of the window, the chunk ends
    just after it. Consecutive chunks overlap by ``CHUNK_OVERLAP`` characters,
    capped at a third of the window.

    Returns an empty list for empty text. Chunk ids start at 1.
    """
    normalized = _normalize_text(text)
    if not normalized:
        return []

    total = len(normalized)
    window_size = max(350, MAX_CHUNK_CHARS)
    overlap = min(max(0, CHUNK_OVERLAP), window_size // 3)

    pieces: List[Chunk] = []
    position, next_id = 0, 1
    while position < total:
        stop = min(position + window_size, total)
        if stop < total:
            window = normalized[position:stop]
            # Latest natural break point inside the window (-1 when none).
            boundary = max(window.rfind(sep) for sep in ("\n\n", ". ", "; ", ", "))
            if boundary > window_size * 0.55:
                stop = position + boundary + 1
        body = normalized[position:stop].strip()
        if body:
            digest = _hash_text(f"{source}:{page}:{next_id}:{body}")
            pieces.append(Chunk(digest, source, page, next_id, body, len(body)))
            next_id += 1
        if stop >= total:
            break
        # Always advance by at least one character so the loop terminates.
        position = max(stop - overlap, position + 1)
    return pieces
| def _tokenize_for_hash(text: str) -> List[str]: | |
| return re.findall(r"[\wçğıöşüÇĞİÖŞÜ]+", text.lower(), flags=re.UNICODE) | |
def _lexical_overlap_score(query: str, text: str) -> float:
    """Score how well ``text`` covers the distinct tokens of ``query``.

    The score is ``0.7 * coverage + 0.3 * density`` where coverage is the
    fraction of query tokens present in the text and density counts each
    query token up to three occurrences, normalized by ``3 * len(tokens)``.
    Returns 0.0 when the query has no tokens.
    """
    query_terms = set(_tokenize_for_hash(query))
    if not query_terms:
        return 0.0

    frequencies = Counter(_tokenize_for_hash(text))
    term_count = len(query_terms)
    matched = 0
    capped_total = 0
    for term in query_terms:
        occurrences = frequencies.get(term, 0)
        if occurrences:
            matched += 1
        capped_total += min(occurrences, 3)

    coverage = matched / max(1, term_count)
    density = capped_total / max(1, term_count * 3)
    return float(coverage * 0.7 + density * 0.3)
def _hash_vector(text: str, dim: int = FEATURE_HASH_DIM) -> np.ndarray:
    """Embed ``text`` as a signed feature-hash vector of length ``dim``.

    Each distinct token is hashed with MD5; the first 8 hex digits select the
    bucket and the next 2 select the sign. The bucket receives
    ``sign * (1 + log1p(count))``. The vector is L2-normalized unless it is
    all zeros. Returns a float32 array.
    """
    vector = np.zeros(dim, dtype=np.float32)
    for token, occurrences in Counter(_tokenize_for_hash(text)).items():
        hexdigest = hashlib.md5(token.encode("utf-8", errors="ignore")).hexdigest()
        bucket = int(hexdigest[:8], 16) % dim
        is_positive = int(hexdigest[8:10], 16) % 2 == 0
        sign = 1.0 if is_positive else -1.0
        vector[bucket] += sign * (1.0 + np.log1p(occurrences))

    length = float(np.linalg.norm(vector))
    if length > 0:
        vector /= length
    return vector
| def _needs_e5_prefix(model_name: str) -> bool: | |
| return "e5" in model_name.lower() | |
def _get_embedder():
    """Return the shared SentenceTransformer, creating it on first use.

    Raises:
        RuntimeError: if ``sentence_transformers`` cannot be imported.
    """
    global _EMBEDDER
    if _EMBEDDER is not None:
        return _EMBEDDER

    try:
        from sentence_transformers import SentenceTransformer
    except Exception as exc:
        raise RuntimeError("sentence-transformers is not installed. Use EMBEDDING_BACKEND=hash or requirements-full.txt.") from exc

    _EMBEDDER = SentenceTransformer(EMBEDDING_MODEL, device="cpu")
    return _EMBEDDER
def _encode_passages(texts: List[str]) -> np.ndarray:
    """Embed document passages and return a float32 matrix, one row per text.

    With the ``hash`` backend each text is feature-hashed; otherwise the
    sentence-transformers model encodes the batch with normalized outputs,
    adding the ``passage: `` prefix for E5-style models.
    """
    if EMBEDDING_BACKEND == "hash":
        rows = [_hash_vector(passage) for passage in texts]
        return np.vstack(rows).astype(np.float32)

    model = _get_embedder()
    if _needs_e5_prefix(EMBEDDING_MODEL):
        prepared = [f"passage: {passage}" for passage in texts]
    else:
        prepared = texts
    vectors = model.encode(prepared, batch_size=16, normalize_embeddings=True, show_progress_bar=False)
    return np.asarray(vectors, dtype=np.float32)
def _encode_query(query: str) -> np.ndarray:
    """Embed a single query and return its vector.

    Mirrors ``_encode_passages``: feature hashing for the ``hash`` backend,
    otherwise a normalized sentence-transformers embedding with the
    ``query: `` prefix for E5-style models.
    """
    if EMBEDDING_BACKEND == "hash":
        return _hash_vector(query)

    model = _get_embedder()
    prepared = query
    if _needs_e5_prefix(EMBEDDING_MODEL):
        prepared = f"query: {query}"
    batch = model.encode([prepared], normalize_embeddings=True, show_progress_bar=False)
    return np.asarray(batch, dtype=np.float32)[0]
def _append_chunks(new_chunks: List[Chunk]) -> None:
    """Embed ``new_chunks`` and append them to the global index.

    Chunks beyond ``MAX_TOTAL_CHUNKS`` are silently dropped. ``CHUNKS`` and
    ``EMBEDDINGS`` are only modified after embedding succeeds, so they stay
    row-aligned.
    """
    global CHUNKS, EMBEDDINGS
    if not new_chunks:
        return

    # Remaining capacity of the index (never negative).
    room = max(0, MAX_TOTAL_CHUNKS - len(CHUNKS))
    accepted = new_chunks if len(new_chunks) <= room else new_chunks[:room]
    if not accepted:
        return

    vectors = _encode_passages([chunk.text for chunk in accepted])
    CHUNKS.extend(accepted)
    if EMBEDDINGS is None:
        EMBEDDINGS = vectors
    else:
        EMBEDDINGS = np.vstack([EMBEDDINGS, vectors])
| def _markdown_table(rows: List[List[str]], headers: List[str]) -> str: | |
| if not rows: | |
| return "_No records._" | |
| lines = ["| " + " | ".join(headers) + " |", "| " + " | ".join(["---"] * len(headers)) + " |"] | |
| for row in rows: | |
| lines.append("| " + " | ".join(str(c).replace("|", "\\|").replace("\n", " ") for c in row) + " |") | |
| return "\n".join(lines) | |
def ingest_files(files: Optional[List[str]], reset_first: bool = True) -> str:
    """Index uploaded files. Public Gradio API endpoint: ingest_files.

    Args:
        files: Paths of the uploaded files; a single path is also accepted.
        reset_first: When true, the existing index is cleared before anything
            else happens (even if no files were supplied).

    Returns:
        A Markdown report with index totals, a per-file table and, if any
        file failed, an error list. Failures in one file do not stop the
        others.
    """
    global CHUNKS, EMBEDDINGS
    if reset_first:
        CHUNKS, EMBEDDINGS = [], None
    if not files:
        return "No file received. Upload PDF, DOCX, TXT, MD, CSV or XLSX files."
    if isinstance(files, (str, Path)):
        files = [str(files)]

    table_rows: List[List[str]] = []
    failures: List[str] = []
    added_total = 0
    for file_path in files:
        path_text = str(file_path)
        try:
            source = _safe_filename(path_text)
            segments = read_document(path_text)
            file_chunks: List[Chunk] = []
            for segment_text, page in segments:
                file_chunks.extend(chunk_text(segment_text, source, page))
            # Measure the index growth rather than len(file_chunks): the
            # global cap may have dropped some chunks.
            size_before = len(CHUNKS)
            _append_chunks(file_chunks)
            added = len(CHUNKS) - size_before
            added_total += added
            table_rows.append([source, str(len(segments)), str(added)])
        except Exception as exc:
            failures.append(f"{_safe_filename(path_text)}: {type(exc).__name__}: {exc}")

    model_label = EMBEDDING_MODEL if EMBEDDING_BACKEND != "hash" else "feature-hash"
    report = [
        f"Indexed chunks: **{len(CHUNKS)}**",
        f"New chunks added: **{added_total}**",
        f"Embedding backend: `{EMBEDDING_BACKEND}`",
        f"Embedding model: `{model_label}`",
        "",
        _markdown_table(table_rows, ["File", "Segments", "Chunks added"]),
    ]
    if failures:
        report.extend(["", "### Errors", "\n".join(f"- `{failure}`" for failure in failures)])
    return "\n".join(report)
def load_sample_documents() -> str:
    """Load sample documents shipped with the Space.

    Every regular file directly inside the ``examples`` directory next to
    this module is ingested, in sorted order, after resetting the index.
    """
    examples_dir = Path(__file__).parent / "examples"
    sample_paths = []
    for entry in sorted(examples_dir.glob("*")):
        if entry.is_file():
            sample_paths.append(str(entry))
    return ingest_files(sample_paths, reset_first=True)
def retrieve(query: str, top_k: int = 5) -> List[dict]:
    """Return the best-matching chunks for ``query``.

    Up to ``4 * top_k`` candidates are taken by vector score. In ``lexical``
    reranker mode each candidate's final score blends vector and lexical
    scores with ``HYBRID_ALPHA``; otherwise the vector score is used as is.

    Returns:
        A list of dicts with keys ``rank`` (1-based), ``score``,
        ``vector_score``, ``lexical_score`` and ``chunk``; empty when nothing
        is indexed.
    """
    if EMBEDDINGS is None or not CHUNKS:
        return []

    wanted = max(1, min(int(top_k), len(CHUNKS)))
    query_vector = _encode_query(query)
    similarities = EMBEDDINGS @ query_vector
    # wanted >= 1, so wanted * 4 is already the larger candidate count.
    pool_size = min(len(CHUNKS), wanted * 4)
    use_lexical = RERANKER_MODE == "lexical"

    candidates = []
    for raw_index in np.argsort(-similarities)[:pool_size]:
        index = int(raw_index)
        chunk = CHUNKS[index]
        vector_score = float(similarities[index])
        if use_lexical:
            lexical_score = _lexical_overlap_score(query, chunk.text)
            combined = HYBRID_ALPHA * vector_score + (1.0 - HYBRID_ALPHA) * lexical_score
        else:
            lexical_score = 0.0
            combined = vector_score
        candidates.append(
            {
                "score": float(combined),
                "vector_score": vector_score,
                "lexical_score": float(lexical_score),
                "chunk": chunk,
            }
        )

    best = sorted(candidates, key=lambda entry: entry["score"], reverse=True)[:wanted]
    return [{"rank": position, **entry} for position, entry in enumerate(best, start=1)]
def _build_context(results: List[dict], max_chars: int = 6000) -> str:
    """Concatenate retrieved chunks into the prompt context.

    Each chunk becomes a block headed by ``[S<rank>]`` with its source, page
    (when set) and chunk id. Blocks are added in order until the next one
    would push the total past ``max_chars``; that block and all later ones
    are left out.
    """
    selected: List[str] = []
    budget_used = 0
    for entry in results:
        chunk: Chunk = entry["chunk"]
        page_part = f", page {chunk.page}" if chunk.page else ""
        block = f"[S{entry['rank']}] Source: {chunk.source}{page_part}, chunk {chunk.chunk_id}\n{chunk.text}"
        if budget_used + len(block) > max_chars:
            break
        selected.append(block)
        budget_used += len(block)
    return "\n\n".join(selected)
def _sources_markdown(results: List[dict]) -> str:
    """Render retrieved chunks as a Markdown table of sources.

    Each row shows the citation id, the three scores, source, page, chunk id
    and an HTML-escaped single-line snippet of up to 350 characters.
    """
    table_rows = []
    for entry in results:
        chunk: Chunk = entry["chunk"]
        snippet = html.escape(chunk.text[:350].replace("\n", " "))
        table_rows.append(
            [
                f"S{entry['rank']}",
                f"{entry['score']:.3f}",
                f"{entry.get('vector_score', entry['score']):.3f}",
                f"{entry.get('lexical_score', 0.0):.3f}",
                chunk.source,
                str(chunk.page or ""),
                str(chunk.chunk_id),
                snippet,
            ]
        )
    return _markdown_table(table_rows, ["ID", "Score", "Vector", "Lexical", "Source", "Page", "Chunk", "Snippet"])
def _prompt(query: str, results: List[dict]) -> str:
    """Build the grounded-answer prompt for the generative back ends.

    The prompt contains the instructions, the context produced by
    ``_build_context`` and the user's question, ending with ``Answer:``.
    """
    context = _build_context(results)
    return (
        "You are Private Knowledge AI. Answer only from the provided context. "
        "If evidence is insufficient, say so. Cite sources inline as [S1]. "
        "Give a direct answer, then evidence, then limitations.\n\n"
        f"Context:\n{context}\n\nQuestion:\n{query}\n\nAnswer:"
    )
def _answer_extractive(query: str, results: List[dict]) -> str:
    """Build an answer by quoting the top retrieved chunks, without an LLM.

    For each of the first five results, the first two sentences (or the
    first 500 characters when that is empty), capped at 700 characters, are
    listed as a cited bullet. ``query`` is accepted for signature parity with
    the other answer functions and is not used.
    """
    if not results:
        return "No indexed context found. Upload and index documents first."

    bullets = []
    for entry in results[:5]:
        chunk: Chunk = entry["chunk"]
        flattened = chunk.text.replace("\n", " ")
        sentences = re.split(r"(?<=[.!?])\s+", flattened)
        lead = " ".join(sentences[:2]).strip()
        excerpt = (lead or chunk.text[:500])[:700]
        page_part = f", p. {chunk.page}" if chunk.page else ""
        bullets.append(f"- **[S{entry['rank']}] {chunk.source}{page_part}:** {excerpt}")

    sections = ["### Answer", "Extractive, source-grounded answer. Generative LLM is disabled.", ""]
    sections.extend(bullets)
    sections.extend(["", "### Confidence", "Medium when top scores are high and sources converge. Low when context is sparse."])
    return "\n".join(sections)
def _answer_hf_api(query: str, results: List[dict]) -> str:
    """Generate an answer through the Hugging Face Inference API.

    The chat-completions endpoint is tried first and the text-generation
    endpoint second. If generation is not possible (client library missing,
    no ``HF_TOKEN``, invalid generation settings, or both endpoints failing)
    the reason is reported and followed by the extractive answer, which is
    the fallback the app advertises in its description.

    Args:
        query: The user's question.
        results: Retrieved chunks as returned by ``retrieve``.

    Returns:
        The generated answer, or a failure notice followed by the extractive
        answer. Never raises.
    """

    def _with_fallback(reason: str) -> str:
        # Keep the failure visible, but still give the user a grounded answer.
        return f"{reason}\n\n{_answer_extractive(query, results)}"

    try:
        from huggingface_hub import InferenceClient
    except Exception as exc:
        return _with_fallback(f"HF API mode unavailable: `{exc}`. Switch to extractive mode.")
    if not HF_TOKEN:
        return _with_fallback("HF API mode requires `HF_TOKEN` as a Space secret.")

    try:
        max_tokens = int(os.getenv("MAX_NEW_TOKENS", "700"))
        temperature = float(os.getenv("TEMPERATURE", "0.2"))
    except ValueError as exc:
        return _with_fallback(f"HF API generation failed. Invalid generation settings: `{exc}`.")

    user_prompt = _prompt(query, results)
    try:
        client = InferenceClient(token=HF_TOKEN)
        completion = client.chat.completions.create(
            model=HF_LLM_MODEL,
            messages=[
                {"role": "system", "content": "Answer only from context. Cite [S1]. Be concise."},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=max_tokens,
            temperature=temperature,
        )
        return completion.choices[0].message.content.strip()
    except Exception as chat_exc:
        # Some models are only served through the text-generation task.
        try:
            client = InferenceClient(model=HF_LLM_MODEL, token=HF_TOKEN)
            return client.text_generation(user_prompt, max_new_tokens=max_tokens, temperature=temperature).strip()
        except Exception as text_exc:
            return _with_fallback(f"HF API generation failed. Chat: `{chat_exc}`. Text: `{text_exc}`.")
def _get_local_llm():
    """Return the cached ``(tokenizer, model, device)`` triple, loading it once.

    The model is loaded in float16 and placed on CUDA when CUDA is available,
    otherwise in float32 on the CPU, and is switched to eval mode.
    """
    global _LOCAL_LLM
    if _LOCAL_LLM is None:
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer

        tokenizer = AutoTokenizer.from_pretrained(HF_LLM_MODEL)
        dtype = torch.float16 if torch.cuda.is_available() else torch.float32
        model = AutoModelForCausalLM.from_pretrained(HF_LLM_MODEL, torch_dtype=dtype, low_cpu_mem_usage=True)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)
        model.eval()
        _LOCAL_LLM = (tokenizer, model, device)
    return _LOCAL_LLM
def _answer_local_transformers(query: str, results: List[dict]) -> str:
    """Generate an answer with the locally loaded Transformers model.

    The prompt goes through the tokenizer's chat template when the tokenizer
    has one; decoding is greedy with up to 600 new tokens and only the newly
    generated tokens are returned. Any failure is reported as a message
    string instead of raising.
    """
    try:
        import torch

        tokenizer, model, device = _get_local_llm()
        user_prompt = _prompt(query, results)
        if hasattr(tokenizer, "apply_chat_template"):
            messages = [
                {"role": "system", "content": "Answer only from context. Cite sources."},
                {"role": "user", "content": user_prompt},
            ]
            text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        else:
            text = user_prompt

        inputs = tokenizer(text, return_tensors="pt", truncation=True, max_length=12000).to(device)
        with torch.no_grad():
            generated = model.generate(**inputs, max_new_tokens=600, do_sample=False, pad_token_id=tokenizer.eos_token_id)
        # Drop the prompt tokens so only the continuation is decoded.
        prompt_length = inputs["input_ids"].shape[1]
        return tokenizer.decode(generated[0][prompt_length:], skip_special_tokens=True).strip()
    except Exception as exc:
        return f"Local Transformers generation failed: `{type(exc).__name__}: {exc}`. Use extractive or hf_api mode."
def test_hf_token() -> str:
    """Check whether HF_TOKEN and the configured model are usable.

    Sends a tiny chat-completion request to ``HF_LLM_MODEL`` and reports the
    model's reply, or the error that occurred. Never raises.
    """
    if not HF_TOKEN:
        return "HF_TOKEN is missing. Add it under Space Settings -> Variables and secrets -> New secret."

    try:
        from huggingface_hub import InferenceClient

        probe = [{"role": "user", "content": "Reply with exactly: HF_OK"}]
        response = InferenceClient(token=HF_TOKEN).chat.completions.create(
            model=HF_LLM_MODEL,
            messages=probe,
            max_tokens=8,
            temperature=0.0,
        )
        reply = response.choices[0].message.content.strip()
    except Exception as exc:
        return f"HF token/model test failed: `{type(exc).__name__}: {exc}`"
    return f"HF token active. Model: `{HF_LLM_MODEL}`. Test response: `{reply}`"
def ask_question(query: str, top_k: int = 5, answer_mode: str = "auto") -> Tuple[str, str]:
    """Ask a question against indexed documents. Public Gradio API endpoint: ask_question.

    Args:
        query: The question; blank input is rejected with a hint.
        top_k: Number of chunks to retrieve.
        answer_mode: ``auto`` (use ``LLM_MODE``), ``hf_api``,
            ``local_transformers``; anything else selects the extractive
            answer.

    Returns:
        ``(answer_markdown, sources_markdown)``; the second item is empty
        when no retrieval took place.
    """
    question = (query or "").strip()
    if not question:
        return "Enter a question.", ""
    if EMBEDDINGS is None or not CHUNKS:
        return "Upload and index documents first, or click 'Load sample documents'.", ""

    hits = retrieve(question, int(top_k))

    selected_mode = (answer_mode or "auto").strip().lower()
    if selected_mode == "auto":
        selected_mode = LLM_MODE

    if selected_mode == "hf_api":
        answer = _answer_hf_api(question, hits)
    elif selected_mode == "local_transformers":
        answer = _answer_local_transformers(question, hits)
    else:
        answer = _answer_extractive(question, hits)
    return answer, _sources_markdown(hits)
def list_sources() -> str:
    """List indexed sources. Public Gradio API endpoint: list_sources.

    Returns a Markdown table with one row per source (sorted by name): its
    chunk count and, when page numbers are known, the ``min-max`` page range.
    """
    if not CHUNKS:
        return "No indexed sources."

    chunk_counts = Counter(chunk.source for chunk in CHUNKS)
    pages_by_source = defaultdict(set)
    for chunk in CHUNKS:
        if chunk.page:
            pages_by_source[chunk.source].add(chunk.page)

    table_rows = []
    for source, count in sorted(chunk_counts.items()):
        seen_pages = pages_by_source[source]
        page_range = f"{min(seen_pages)}-{max(seen_pages)}" if seen_pages else ""
        table_rows.append([source, str(count), page_range])
    return _markdown_table(table_rows, ["Source", "Chunks", "Pages"])
def reset_index() -> str:
    """Clear the in-memory document index. Public Gradio API endpoint: reset_index.

    Rebinds ``CHUNKS`` to a new empty list and ``EMBEDDINGS`` to ``None``,
    then returns a confirmation message.
    """
    global CHUNKS, EMBEDDINGS
    CHUNKS = []
    EMBEDDINGS = None
    return "Index cleared."
def runtime_status() -> str:
    """Return runtime configuration. Public Gradio API endpoint: runtime_status.

    The configuration is rendered as pretty-printed JSON inside a Markdown
    ``json`` code fence. Only the presence of the HF token is reported, never
    its value.
    """
    embedding_label = EMBEDDING_MODEL if EMBEDDING_BACKEND != "hash" else "feature-hash"
    payload = {
        "app_profile": APP_PROFILE,
        "embedding_backend": EMBEDDING_BACKEND,
        "embedding_model": embedding_label,
        "llm_mode": LLM_MODE,
        "hf_llm_model": HF_LLM_MODEL,
        "hf_token_present": bool(HF_TOKEN),
        "reranker_mode": RERANKER_MODE,
        "hybrid_alpha": HYBRID_ALPHA,
        "max_chunk_chars": MAX_CHUNK_CHARS,
        "max_total_chunks": MAX_TOTAL_CHUNKS,
        "chunk_count": len(CHUNKS),
    }
    body = json.dumps(payload, ensure_ascii=False, indent=2)
    return "```json\n" + body + "\n```"
def export_index_json() -> str:
    """Return index metadata as JSON text. Public Gradio API endpoint: export_index_json.

    The JSON contains the embedding backend and model label, every indexed
    chunk as a dict, and the chunk count. Embedding vectors are not included.
    """
    embedding_label = EMBEDDING_MODEL if EMBEDDING_BACKEND != "hash" else "feature-hash"
    payload = {
        "embedding_backend": EMBEDDING_BACKEND,
        "embedding_model": embedding_label,
        "chunks": [asdict(chunk) for chunk in CHUNKS],
        "chunk_count": len(CHUNKS),
    }
    return json.dumps(payload, ensure_ascii=False, indent=2)
def build_app() -> gr.Blocks:
    """Build the Gradio UI and register the public API endpoints.

    The app has three tabs (upload/index, ask, admin/API). Each button is
    wired to one module-level function and exposed under an ``api_name``
    equal to that function's name.
    """
    # Header Markdown; configuration values are captured once, at build time.
    description = f"""
# {APP_TITLE}
{APP_SUBTITLE}
**v08 profile:** `{APP_PROFILE}`
**Embedding backend:** `{EMBEDDING_BACKEND}`
**Default LLM:** `{HF_LLM_MODEL}` through Hugging Face Inference API.
**Token status:** `{bool(HF_TOKEN)}`
**Fallback:** extractive RAG if token/API fails.
"""
    with gr.Blocks(title=APP_TITLE) as demo:
        gr.Markdown(description)

        # --- Tab 1: upload and indexing ---------------------------------------
        with gr.Tab("1. Upload / Index"):
            gr.Markdown("Upload files, then build a semantic RAG index. Supported: PDF, DOCX, TXT, MD, CSV, XLSX.")
            files = gr.File(label="Documents", file_count="multiple", type="filepath")
            reset_first = gr.Checkbox(label="Reset index before ingest", value=True)
            with gr.Row():
                ingest_btn = gr.Button("Index uploaded documents", variant="primary")
                sample_btn = gr.Button("Load sample documents")
            ingest_status = gr.Markdown()
            ingest_btn.click(
                fn=ingest_files,
                inputs=[files, reset_first],
                outputs=ingest_status,
                api_name="ingest_files",
            )
            sample_btn.click(
                fn=load_sample_documents,
                inputs=None,
                outputs=ingest_status,
                api_name="load_sample_documents",
            )

        # --- Tab 2: question answering ----------------------------------------
        with gr.Tab("2. Ask"):
            query = gr.Textbox(label="Question", lines=3, placeholder="Ask a question about the indexed documents...")
            with gr.Row():
                top_k = gr.Slider(label="Top-k chunks", minimum=1, maximum=10, step=1, value=5)
                mode = gr.Radio(
                    label="Answer mode",
                    choices=["auto", "hf_api", "extractive", "local_transformers"],
                    value="auto",
                )
            ask_btn = gr.Button("Ask", variant="primary")
            answer = gr.Markdown(label="Answer")
            sources = gr.Markdown(label="Retrieved sources")
            ask_btn.click(
                fn=ask_question,
                inputs=[query, top_k, mode],
                outputs=[answer, sources],
                api_name="ask_question",
            )

        # --- Tab 3: administration and API overview ---------------------------
        with gr.Tab("3. Admin / API"):
            gr.Markdown("Agent-callable endpoints: `ingest_files`, `ask_question`, `list_sources`, `reset_index`, `export_index_json`, `runtime_status`, `test_hf_token`.")
            # Snapshot of the status at build time; the button refreshes it.
            gr.Markdown(runtime_status())
            with gr.Row():
                list_btn = gr.Button("List sources")
                reset_btn = gr.Button("Reset index")
                status_btn = gr.Button("Runtime status")
                token_btn = gr.Button("Test HF token / Qwen")
                export_btn = gr.Button("Export index metadata")
            admin_out = gr.Markdown()
            export_out = gr.Code(label="Index JSON", language="json")
            list_btn.click(fn=list_sources, inputs=None, outputs=admin_out, api_name="list_sources")
            reset_btn.click(fn=reset_index, inputs=None, outputs=admin_out, api_name="reset_index")
            status_btn.click(fn=runtime_status, inputs=None, outputs=admin_out, api_name="runtime_status")
            token_btn.click(fn=test_hf_token, inputs=None, outputs=admin_out, api_name="test_hf_token")
            export_btn.click(fn=export_index_json, inputs=None, outputs=export_out, api_name="export_index_json")

        gr.Markdown("**Privacy note:** HF API modunda soru ve retrieved context Hugging Face Inference API’ye gider. Public Space’e gizli belge yüklemeyin. Gerçek müşteri verisi için Private Space veya on-premise sürüm kullanın.")
    return demo
# Build the UI at import time so the module exposes a ``demo`` object.
demo = build_app()

# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()