# RAG_Machine_Hy3 / app.py
# mfirat007's picture
# Update app.py
# 61ee248 verified
"""Private Knowledge AI, Hugging Face Space demo v08.
Token-active Hugging Face demo: Qwen generation through HF Inference API, semantic embeddings, lexical reranking, and extractive fallback.
"""
from __future__ import annotations
import hashlib
import html
import json
import os
import re
import spaces
from collections import Counter, defaultdict
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any, List, Optional, Tuple
import gradio as gr
import numpy as np
# Hide CUDA devices from this process and disable tokenizers parallelism.
# NOTE(review): ask_question is decorated with @spaces.GPU further down while CUDA
# devices are hidden here — confirm that combination is intended.
os.environ["CUDA_VISIBLE_DEVICES"] = ""
os.environ["TOKENIZERS_PARALLELISM"] = "false"
# --- Runtime configuration; every value can be overridden via an environment variable ---
APP_TITLE = os.getenv("APP_TITLE", "Private Knowledge AI")
APP_SUBTITLE = os.getenv("APP_SUBTITLE", "Upload documents, index them locally in the Space runtime, and ask source-grounded questions.")
# Profile name; it selects the defaults of the size limits below.
APP_PROFILE = os.getenv("APP_PROFILE", "hf_token").strip().lower()
EMBEDDING_BACKEND = os.getenv("EMBEDDING_BACKEND", "sentence_transformers").strip().lower() # hash | sentence_transformers
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "llamaindex/vdr-2b-multi-v1")
LLM_MODE = os.getenv("LLM_MODE", "hf_api").strip().lower() # extractive | hf_api | local_transformers
# NOTE(review): this default is the same model id as EMBEDDING_MODEL, while the module
# docstring mentions Qwen generation — confirm the intended chat model.
HF_LLM_MODEL = os.getenv("HF_LLM_MODEL", "llamaindex/vdr-2b-multi-v1")
# Size limits. Each default depends on APP_PROFILE: "hf_token" / "zero" / any other value.
MAX_CHUNK_CHARS = int(os.getenv("MAX_CHUNK_CHARS", "1100" if APP_PROFILE == "hf_token" else "900" if APP_PROFILE == "zero" else "1200"))
CHUNK_OVERLAP = int(os.getenv("CHUNK_OVERLAP", "160" if APP_PROFILE == "hf_token" else "120" if APP_PROFILE == "zero" else "180"))
MAX_FILE_CHARS = int(os.getenv("MAX_FILE_CHARS", "500000" if APP_PROFILE == "hf_token" else "300000" if APP_PROFILE == "zero" else "600000"))
MAX_TOTAL_CHUNKS = int(os.getenv("MAX_TOTAL_CHUNKS", "1800" if APP_PROFILE == "hf_token" else "1200" if APP_PROFILE == "zero" else "2500"))
# Dimensionality of the feature-hash vectors used when EMBEDDING_BACKEND == "hash".
FEATURE_HASH_DIM = int(os.getenv("FEATURE_HASH_DIM", "1024"))
# Hugging Face access token; None when the variable is not set.
HF_TOKEN = os.getenv("HF_TOKEN")
RERANKER_MODE = os.getenv("RERANKER_MODE", "lexical").strip().lower() # none | lexical
# Weight of the vector score in the hybrid score; the lexical score gets (1 - HYBRID_ALPHA).
HYBRID_ALPHA = float(os.getenv("HYBRID_ALPHA", "0.82"))
# No-op here: TOKENIZERS_PARALLELISM was already assigned unconditionally above.
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")
# Default the Hugging Face cache directory to ~/.cache/huggingface unless HF_HOME is already set.
os.environ.setdefault("HF_HOME", os.getenv("HF_HOME", str(Path.home() / ".cache" / "huggingface")))
@dataclass
class Chunk:
    """A single indexed text segment together with its provenance."""

    # Short hex digest produced by _hash_text over source, page, chunk number and text.
    id: str
    # Sanitised file name of the document the text came from.
    source: str
    # Page number when the reader reports one (the PDF reader does); otherwise None.
    page: Optional[int]
    # Running chunk number assigned by chunk_text, starting at 1 for each segment.
    chunk_id: int
    # The stripped chunk text.
    text: str
    # len(text), stored when the chunk is created.
    char_count: int
# In-memory index: the chunk list and the embedding matrix are extended together
# by _append_chunks, one embedding row per chunk.
CHUNKS: List[Chunk] = []
EMBEDDINGS: Optional[np.ndarray] = None
# Lazily created SentenceTransformer instance (see _get_embedder).
_EMBEDDER = None
# Lazily created (tokenizer, model, device) triple (see _get_local_llm).
_LOCAL_LLM: Optional[Tuple[Any, Any, Any]] = None
def _safe_filename(path_or_name: str) -> str:
name = Path(str(path_or_name)).name
return re.sub(r"[^\w.()\- ]+", "_", name, flags=re.UNICODE)[:180] or "document"
def _hash_text(text: str) -> str:
return hashlib.sha256(text.encode("utf-8", errors="ignore")).hexdigest()[:16]
def _normalize_text(text: str) -> str:
text = text.replace("\x00", " ")
text = re.sub(r"[ \t]+", " ", text)
text = re.sub(r"\n{3,}", "\n\n", text)
return text.strip()
def _truncate(text: str, limit: int = MAX_FILE_CHARS) -> str:
    """Cap *text* at *limit* characters.

    Text within the limit is returned unchanged; longer text is cut at *limit*
    and a truncation marker is appended.
    """
    if len(text) > limit:
        return text[:limit] + "\n\n[TRUNCATED: MAX_FILE_CHARS exceeded]"
    return text
def _read_txt(path: Path) -> List[Tuple[str, Optional[int]]]:
    """Read a plain-text file and return it as a single page-less segment.

    The bytes are decoded with the first codec that succeeds:

    * ``utf-8-sig`` — decodes ordinary UTF-8 and also strips a leading BOM.
      It is tried first because plain ``utf-8`` accepts a BOM-prefixed file
      and would leave the BOM character in the text.
    * ``cp1254`` — Turkish Windows code page, for legacy files.
    * ``latin-1`` — maps every byte, so it always succeeds as the last resort.

    Args:
        path: File to read.

    Returns:
        A one-element list ``[(text, None)]`` with the text truncated to the
        file character limit.
    """
    raw = path.read_bytes()
    for enc in ("utf-8-sig", "cp1254"):
        try:
            return [(_truncate(raw.decode(enc)), None)]
        except UnicodeDecodeError:
            continue
    # latin-1 cannot raise UnicodeDecodeError, so no further fallback is needed.
    return [(_truncate(raw.decode("latin-1")), None)]
def _read_pdf(path: Path) -> List[Tuple[str, Optional[int]]]:
    """Extract text from a PDF, one segment per page that yields text.

    Page numbers start at 1. Each page text is normalised and truncated
    individually. When no page yields text, a single placeholder segment is
    returned. Any exception (including a missing ``pypdf``) is reported as an
    error-message segment rather than raised.
    """
    try:
        from pypdf import PdfReader

        extracted = []
        for page_no, pdf_page in enumerate(PdfReader(str(path)).pages, start=1):
            page_text = _normalize_text(pdf_page.extract_text() or "")
            if not page_text:
                continue
            extracted.append((_truncate(page_text), page_no))
        if extracted:
            return extracted
        return [("[No extractable PDF text found. OCR is not enabled in zero profile.]", None)]
    except Exception as exc:
        return [(f"[PDF parser error: {exc}]", None)]
def _read_docx(path: Path) -> List[Tuple[str, Optional[int]]]:
    """Extract paragraphs and table rows from a DOCX file as one segment.

    Non-empty paragraphs come first, followed by every table row that has at
    least one non-empty cell, rendered as ``cell | cell | ...``. Lines are
    joined with newlines and truncated. Any exception is reported as an
    error-message segment rather than raised.
    """
    try:
        import docx

        document = docx.Document(str(path))
        lines = []
        for paragraph in document.paragraphs:
            stripped = paragraph.text.strip()
            if stripped:
                lines.append(stripped)
        for table in document.tables:
            for row in table.rows:
                cell_texts = [cell.text.strip() for cell in row.cells]
                if not any(cell_texts):
                    continue
                lines.append(" | ".join(cell_texts))
        return [(_truncate("\n".join(lines)), None)]
    except Exception as exc:
        return [(f"[DOCX parser error: {exc}]", None)]
def _read_csv(path: Path) -> List[Tuple[str, Optional[int]]]:
    """Summarise a CSV file as one text segment.

    The segment holds a header with row/column counts and column names,
    followed by the first 300 rows re-serialised as CSV. The file is parsed
    with pandas' default encoding first and with ``latin-1`` if that raises
    ``UnicodeDecodeError``. Any other exception is reported as an
    error-message segment rather than raised.
    """
    try:
        import pandas as pd

        try:
            frame = pd.read_csv(path)
        except UnicodeDecodeError:
            frame = pd.read_csv(path, encoding="latin-1")
        column_names = ", ".join(map(str, frame.columns))
        header = f"Rows: {len(frame)}, Columns: {len(frame.columns)}\nColumns: {column_names}\n\n"
        preview = frame.head(300).to_csv(index=False)
        return [(_truncate(header + preview), None)]
    except Exception as exc:
        return [(f"[CSV parser error: {exc}]", None)]
def _read_xlsx(path: Path) -> List[Tuple[str, Optional[int]]]:
    """Summarise every sheet of an Excel workbook as one text segment.

    Each sheet contributes its name, row/column counts, column names and its
    first 200 rows serialised as CSV; sheets are separated by a blank line.
    Any exception is reported as an error-message segment rather than raised.
    """
    try:
        import pandas as pd

        workbook = pd.read_excel(path, sheet_name=None)
        sections = [
            f"Sheet: {sheet}\nRows: {len(df)}, Columns: {len(df.columns)}\n"
            f"Columns: {', '.join(map(str, df.columns))}\n{df.head(200).to_csv(index=False)}"
            for sheet, df in workbook.items()
        ]
        return [(_truncate("\n\n".join(sections)), None)]
    except Exception as exc:
        return [(f"[XLSX parser error: {exc}]", None)]
def read_document(path_str: str) -> List[Tuple[str, Optional[int]]]:
    """Read a document and return its ``(text, page)`` segments.

    The reader is chosen from the lower-cased file suffix. PDF, DOCX, CSV and
    XLSX/XLSM have dedicated readers; every other suffix (including the plain
    text ones) is read as text.
    """
    path = Path(path_str)
    readers = {
        ".pdf": _read_pdf,
        ".docx": _read_docx,
        ".csv": _read_csv,
        ".xlsx": _read_xlsx,
        ".xlsm": _read_xlsx,
    }
    # .txt/.md/.markdown/.rst/.log and unknown suffixes all use the text reader.
    reader = readers.get(path.suffix.lower(), _read_txt)
    return reader(path)
def chunk_text(text: str, source: str, page: Optional[int]) -> List[Chunk]:
    """Split *text* into overlapping chunks tagged with *source* and *page*.

    The text is normalised first. Each window is at most
    ``max(350, MAX_CHUNK_CHARS)`` characters. When a window does not reach the
    end of the text, it is shortened to just after the last separator
    (blank line, ``". "``, ``"; "`` or ``", "``), provided that separator lies
    beyond 55% of the window. Consecutive windows overlap by ``CHUNK_OVERLAP``
    characters, capped at one third of the window size.

    Returns:
        The chunks in document order, numbered from 1; empty text gives ``[]``.
    """
    cleaned = _normalize_text(text)
    if not cleaned:
        return []

    total = len(cleaned)
    window_size = max(350, MAX_CHUNK_CHARS)
    backstep = min(max(0, CHUNK_OVERLAP), window_size // 3)
    separators = ("\n\n", ". ", "; ", ", ")

    pieces: List[Chunk] = []
    next_id = 1
    begin = 0
    while begin < total:
        stop = min(begin + window_size, total)
        if stop < total:
            window = cleaned[begin:stop]
            cut = max(window.rfind(sep) for sep in separators)
            # Only break at a separator if it keeps the chunk reasonably full.
            if cut > window_size * 0.55:
                stop = begin + cut + 1
        body = cleaned[begin:stop].strip()
        if body:
            digest = _hash_text(f"{source}:{page}:{next_id}:{body}")
            pieces.append(Chunk(digest, source, page, next_id, body, len(body)))
            next_id += 1
        if stop >= total:
            break
        # Step back for the overlap, but always advance by at least one character.
        begin = max(stop - backstep, begin + 1)
    return pieces
def _tokenize_for_hash(text: str) -> List[str]:
return re.findall(r"[\wçğıöşüÇĞİÖŞÜ]+", text.lower(), flags=re.UNICODE)
def _lexical_overlap_score(query: str, text: str) -> float:
    """Score how well *text* covers the distinct tokens of *query*.

    The score is ``0.7 * coverage + 0.3 * density``. Coverage is the fraction
    of distinct query tokens that occur in the text. Density sums each query
    token's occurrence count in the text, capped at 3 per token, divided by
    three times the number of distinct query tokens. A query without tokens
    scores 0.0.
    """
    query_terms = set(_tokenize_for_hash(query))
    if not query_terms:
        return 0.0

    term_freq = Counter(_tokenize_for_hash(text))
    n_terms = len(query_terms)
    matched = sum(1 for term in query_terms if term in term_freq)
    capped_total = sum(min(term_freq.get(term, 0), 3) for term in query_terms)

    coverage = matched / max(1, n_terms)
    density = capped_total / max(1, n_terms * 3)
    return float(coverage * 0.7 + density * 0.3)
def _hash_vector(text: str, dim: int = FEATURE_HASH_DIM) -> np.ndarray:
    """Embed *text* as a signed feature-hash vector of length *dim*.

    Every distinct token is mapped to a slot and a sign, both derived from its
    MD5 digest, and contributes ``sign * (1 + log1p(count))`` to that slot.
    The float32 vector is scaled to unit L2 norm unless it is all zeros.
    """
    bucket = np.zeros(dim, dtype=np.float32)
    for token, count in Counter(_tokenize_for_hash(text)).items():
        hex_digest = hashlib.md5(token.encode("utf-8", errors="ignore")).hexdigest()
        slot = int(hex_digest[:8], 16) % dim
        if int(hex_digest[8:10], 16) % 2 == 0:
            sign = 1.0
        else:
            sign = -1.0
        bucket[slot] += sign * (1.0 + np.log1p(count))
    length = float(np.linalg.norm(bucket))
    if length > 0:
        bucket /= length
    return bucket
def _needs_e5_prefix(model_name: str) -> bool:
return "e5" in model_name.lower()
def _get_embedder():
    """Return the shared SentenceTransformer, creating it on first use.

    The model named by ``EMBEDDING_MODEL`` is loaded on CPU and cached in the
    module-level ``_EMBEDDER``.

    Raises:
        RuntimeError: If ``sentence_transformers`` cannot be imported.
    """
    global _EMBEDDER
    if _EMBEDDER is not None:
        return _EMBEDDER
    try:
        from sentence_transformers import SentenceTransformer
    except Exception as exc:
        raise RuntimeError("sentence-transformers is not installed. Use EMBEDDING_BACKEND=hash or requirements-full.txt.") from exc
    _EMBEDDER = SentenceTransformer(EMBEDDING_MODEL, device="cpu")
    return _EMBEDDER
def _encode_passages(texts: List[str]) -> np.ndarray:
    """Embed document passages as a float32 matrix, one row per text.

    With the ``hash`` backend the rows are feature-hash vectors. Otherwise the
    sentence-transformers model encodes the texts with normalised embeddings,
    prefixing each with ``passage: `` when the model name calls for it.
    """
    if EMBEDDING_BACKEND == "hash":
        hashed_rows = [_hash_vector(passage) for passage in texts]
        return np.vstack(hashed_rows).astype(np.float32)

    model = _get_embedder()
    if _needs_e5_prefix(EMBEDDING_MODEL):
        prepared = [f"passage: {passage}" for passage in texts]
    else:
        prepared = texts
    raw = model.encode(prepared, batch_size=16, normalize_embeddings=True, show_progress_bar=False)
    return np.asarray(raw, dtype=np.float32)
def _encode_query(query: str) -> np.ndarray:
    """Embed a single query string as a float32 vector.

    With the ``hash`` backend this is the feature-hash vector. Otherwise the
    sentence-transformers model encodes the query with normalised embeddings,
    prefixing it with ``query: `` when the model name calls for it.
    """
    if EMBEDDING_BACKEND == "hash":
        return _hash_vector(query)

    model = _get_embedder()
    if _needs_e5_prefix(EMBEDDING_MODEL):
        prepared = f"query: {query}"
    else:
        prepared = query
    batch = model.encode([prepared], normalize_embeddings=True, show_progress_bar=False)
    return np.asarray(batch, dtype=np.float32)[0]
def _append_chunks(new_chunks: List[Chunk]) -> None:
    """Embed *new_chunks* and append them to the global index.

    Chunks that would push the index past ``MAX_TOTAL_CHUNKS`` are dropped
    from the end of the list. Embedding happens before the index is touched,
    so a failing encoder leaves ``CHUNKS`` and ``EMBEDDINGS`` unchanged.
    """
    global CHUNKS, EMBEDDINGS
    if not new_chunks:
        return

    overflow = len(CHUNKS) + len(new_chunks) - MAX_TOTAL_CHUNKS
    if overflow > 0:
        remaining = max(0, MAX_TOTAL_CHUNKS - len(CHUNKS))
        new_chunks = new_chunks[:remaining]
        if not new_chunks:
            return

    vectors = _encode_passages([chunk.text for chunk in new_chunks])
    CHUNKS.extend(new_chunks)
    if EMBEDDINGS is None:
        EMBEDDINGS = vectors
    else:
        EMBEDDINGS = np.vstack([EMBEDDINGS, vectors])
def _markdown_table(rows: List[List[str]], headers: List[str]) -> str:
if not rows:
return "_No records._"
lines = ["| " + " | ".join(headers) + " |", "| " + " | ".join(["---"] * len(headers)) + " |"]
for row in rows:
lines.append("| " + " | ".join(str(c).replace("|", "\\|").replace("\n", " ") for c in row) + " |")
return "\n".join(lines)
def ingest_files(files: Optional[List[str]], reset_first: bool = True) -> str:
    """Index uploaded files. Public Gradio API endpoint: ingest_files.

    Args:
        files: File paths to ingest; a single path (``str`` or ``Path``) is
            also accepted.
        reset_first: Clear the existing index before ingesting. The reset
            happens even when no files are supplied.

    Returns:
        A Markdown report with index totals, a per-file table and, when any
        file failed, an error list. Failures in one file do not stop the rest.
    """
    global CHUNKS, EMBEDDINGS
    if reset_first:
        CHUNKS = []
        EMBEDDINGS = None
    if not files:
        return "No file received. Upload PDF, DOCX, TXT, MD, CSV or XLSX files."
    if isinstance(files, (str, Path)):
        files = [str(files)]

    table_rows: List[List[str]] = []
    failures: List[str] = []
    added_total = 0
    for file_path in files:
        location = str(file_path)
        try:
            source = _safe_filename(location)
            segments = read_document(location)
            doc_chunks = [
                chunk
                for segment_text, page in segments
                for chunk in chunk_text(segment_text, source, page)
            ]
            size_before = len(CHUNKS)
            _append_chunks(doc_chunks)
            # Count what actually landed; the chunk cap may have dropped some.
            added = len(CHUNKS) - size_before
            added_total += added
            table_rows.append([source, str(len(segments)), str(added)])
        except Exception as exc:
            failures.append(f"{_safe_filename(location)}: {type(exc).__name__}: {exc}")

    model_label = EMBEDDING_MODEL if EMBEDDING_BACKEND != "hash" else "feature-hash"
    report = [
        f"Indexed chunks: **{len(CHUNKS)}**",
        f"New chunks added: **{added_total}**",
        f"Embedding backend: `{EMBEDDING_BACKEND}`",
        f"Embedding model: `{model_label}`",
        "",
        _markdown_table(table_rows, ["File", "Segments", "Chunks added"]),
    ]
    if failures:
        report.extend(["", "### Errors", "\n".join(f"- `{failure}`" for failure in failures)])
    return "\n".join(report)
def load_sample_documents() -> str:
    """Load sample documents shipped with the Space.

    Every regular file in the ``examples`` directory next to this module is
    ingested in sorted order, after resetting the index.
    """
    examples_dir = Path(__file__).parent / "examples"
    sample_paths = []
    for entry in sorted(examples_dir.glob("*")):
        if entry.is_file():
            sample_paths.append(str(entry))
    return ingest_files(sample_paths, reset_first=True)
def retrieve(query: str, top_k: int = 5) -> List[dict]:
    """Return the best-matching chunks for *query*, ranked from 1.

    A candidate pool of up to four times the requested size is taken by
    vector score. With ``RERANKER_MODE == "lexical"`` each candidate's final
    score blends the vector score and the lexical overlap score using
    ``HYBRID_ALPHA``; otherwise the vector score is used as is.

    Returns:
        Dicts with keys ``rank``, ``score``, ``vector_score``,
        ``lexical_score`` and ``chunk``; empty when nothing is indexed.
    """
    if EMBEDDINGS is None or not CHUNKS:
        return []

    total = len(CHUNKS)
    wanted = max(1, min(int(top_k), total))
    similarities = EMBEDDINGS @ _encode_query(query)
    pool_size = min(total, max(wanted * 4, wanted))
    use_lexical = RERANKER_MODE == "lexical"

    candidates = []
    for raw_idx in np.argsort(-similarities)[:pool_size]:
        idx = int(raw_idx)
        chunk = CHUNKS[idx]
        vec_score = float(similarities[idx])
        if use_lexical:
            lex_score = _lexical_overlap_score(query, chunk.text)
            combined = HYBRID_ALPHA * vec_score + (1.0 - HYBRID_ALPHA) * lex_score
        else:
            lex_score = 0.0
            combined = vec_score
        candidates.append(
            {
                "score": float(combined),
                "vector_score": vec_score,
                "lexical_score": float(lex_score),
                "chunk": chunk,
            }
        )

    candidates.sort(key=lambda entry: entry["score"], reverse=True)
    return [{"rank": position, **entry} for position, entry in enumerate(candidates[:wanted], start=1)]
def _build_context(results: List[dict], max_chars: int = 6000) -> str:
blocks, used = [], 0
for item in results:
c: Chunk = item["chunk"]
page = f", page {c.page}" if c.page else ""
block = f"[S{item['rank']}] Source: {c.source}{page}, chunk {c.chunk_id}\n{c.text}"
if used + len(block) > max_chars:
break
blocks.append(block)
used += len(block)
return "\n\n".join(blocks)
def _sources_markdown(results: List[dict]) -> str:
    """Render retrieval results as a Markdown table of sources.

    Each row shows the ``S<rank>`` label, the three scores with three
    decimals, source, page, chunk number and an HTML-escaped snippet of the
    first 350 characters with newlines flattened to spaces.
    """
    table = []
    for entry in results:
        chunk = entry["chunk"]
        vector_score = entry.get("vector_score", entry["score"])
        lexical_score = entry.get("lexical_score", 0.0)
        snippet = html.escape(chunk.text[:350].replace("\n", " "))
        table.append(
            [
                f"S{entry['rank']}",
                f"{entry['score']:.3f}",
                f"{vector_score:.3f}",
                f"{lexical_score:.3f}",
                chunk.source,
                str(chunk.page or ""),
                str(chunk.chunk_id),
                snippet,
            ]
        )
    return _markdown_table(table, ["ID", "Score", "Vector", "Lexical", "Source", "Page", "Chunk", "Snippet"])
def _prompt(query: str, results: List[dict]) -> str:
    """Build the generation prompt: instructions, retrieved context, question."""
    instructions = (
        "You are Private Knowledge AI. Answer only from the provided context. "
        "If evidence is insufficient, say so. Cite sources inline as [S1]. "
        "Give a direct answer, then evidence, then limitations."
    )
    context = _build_context(results)
    return f"{instructions}\n\nContext:\n{context}\n\nQuestion:\n{query}\n\nAnswer:"
def _answer_extractive(query: str, results: List[dict]) -> str:
if not results:
return "No indexed context found. Upload and index documents first."
bullets = []
for item in results[:5]:
c: Chunk = item["chunk"]
sentences = re.split(r"(?<=[.!?])\s+", c.text.replace("\n", " "))
selected = (" ".join(sentences[:2]).strip() or c.text[:500])[:700]
page = f", p. {c.page}" if c.page else ""
bullets.append(f"- **[S{item['rank']}] {c.source}{page}:** {selected}")
return "\n".join(["### Answer", "Extractive, source-grounded answer. Generative LLM is disabled.", "", *bullets, "", "### Confidence", "Medium when top scores are high and sources converge. Low when context is sparse."])
def _answer_hf_api(query: str, results: List[dict]) -> str:
    """Generate an answer through the Hugging Face Inference API.

    The chat-completions endpoint is tried first and the plain
    text-generation endpoint second. If neither yields text — or the client
    library, the token or the generation settings are unusable — the function
    falls back to the extractive answer and prefixes it with the failure
    reason. This matches the fallback the module docstring and the UI
    describe; previously only an error string was returned.

    Args:
        query: The user question.
        results: Ranked retrieval results as produced by ``retrieve``.

    Returns:
        The generated answer, or a quoted failure note followed by the
        extractive answer.
    """

    def _fallback(reason: str) -> str:
        # Keep the failure reason visible, then serve the extractive answer.
        return f"> {reason} Falling back to extractive mode.\n\n{_answer_extractive(query, results)}"

    try:
        from huggingface_hub import InferenceClient
    except Exception as exc:
        return _fallback(f"HF API mode unavailable: `{exc}`.")
    if not HF_TOKEN:
        return _fallback("HF API mode requires `HF_TOKEN` as a Space secret.")

    # Parse the generation settings once; both endpoints share them.
    try:
        max_new_tokens = int(os.getenv("MAX_NEW_TOKENS", "700"))
        temperature = float(os.getenv("TEMPERATURE", "0.2"))
    except ValueError as exc:
        return _fallback(f"Invalid MAX_NEW_TOKENS/TEMPERATURE setting: `{exc}`.")

    prompt = _prompt(query, results)
    try:
        client = InferenceClient(token=HF_TOKEN)
        completion = client.chat.completions.create(
            model=HF_LLM_MODEL,
            messages=[
                {"role": "system", "content": "Answer only from context. Cite [S1]. Be concise."},
                {"role": "user", "content": prompt},
            ],
            max_tokens=max_new_tokens,
            temperature=temperature,
        )
        # The content may be None; treat missing or blank text as a failure.
        content = (completion.choices[0].message.content or "").strip()
        if content:
            return content
        raise ValueError("chat completion returned no text")
    except Exception as chat_exc:
        try:
            client = InferenceClient(model=HF_LLM_MODEL, token=HF_TOKEN)
            generated = client.text_generation(
                prompt, max_new_tokens=max_new_tokens, temperature=temperature
            ).strip()
            if generated:
                return generated
            raise ValueError("text generation returned no text")
        except Exception as text_exc:
            return _fallback(f"HF API generation failed. Chat: `{chat_exc}`. Text: `{text_exc}`.")
def _get_local_llm():
    """Return the cached ``(tokenizer, model, device)`` triple, loading it once.

    The tokenizer and causal-LM weights for ``HF_LLM_MODEL`` are loaded with
    transformers. The model uses float16 on CUDA and float32 on CPU, is moved
    to the selected device and put in eval mode.
    """
    global _LOCAL_LLM
    if _LOCAL_LLM is None:
        import torch
        from transformers import AutoModelForCausalLM, AutoTokenizer

        tokenizer = AutoTokenizer.from_pretrained(HF_LLM_MODEL)
        has_cuda = torch.cuda.is_available()
        model = AutoModelForCausalLM.from_pretrained(
            HF_LLM_MODEL,
            torch_dtype=torch.float16 if has_cuda else torch.float32,
            low_cpu_mem_usage=True,
        )
        device = torch.device("cuda" if has_cuda else "cpu")
        model.to(device)
        model.eval()
        _LOCAL_LLM = (tokenizer, model, device)
    return _LOCAL_LLM
def _answer_local_transformers(query: str, results: List[dict]) -> str:
    """Generate an answer with the locally loaded transformers model.

    The prompt goes through the tokenizer's chat template when the tokenizer
    has ``apply_chat_template`` and is used as plain text otherwise. Decoding
    is greedy with up to 600 new tokens, and only the newly generated tokens
    are returned. Any exception is turned into an error message string.
    """
    try:
        import torch

        tokenizer, model, device = _get_local_llm()
        prompt = _prompt(query, results)
        if hasattr(tokenizer, "apply_chat_template"):
            conversation = [
                {"role": "system", "content": "Answer only from context. Cite sources."},
                {"role": "user", "content": prompt},
            ]
            model_input = tokenizer.apply_chat_template(conversation, tokenize=False, add_generation_prompt=True)
        else:
            model_input = prompt
        encoded = tokenizer(model_input, return_tensors="pt", truncation=True, max_length=12000).to(device)
        with torch.no_grad():
            output_ids = model.generate(**encoded, max_new_tokens=600, do_sample=False, pad_token_id=tokenizer.eos_token_id)
        # Skip the prompt tokens so only the continuation is decoded.
        prompt_length = encoded["input_ids"].shape[1]
        return tokenizer.decode(output_ids[0][prompt_length:], skip_special_tokens=True).strip()
    except Exception as exc:
        return f"Local Transformers generation failed: `{type(exc).__name__}: {exc}`. Use extractive or hf_api mode."
def test_hf_token() -> str:
    """Check whether HF_TOKEN and the configured chat model are usable.

    A minimal chat completion is sent to ``HF_LLM_MODEL``. The result is a
    status message: the model reply on success, the exception on failure, or
    a setup hint when no token is configured.
    """
    if not HF_TOKEN:
        return "HF_TOKEN is missing. Add it under Space Settings -> Variables and secrets -> New secret."
    try:
        from huggingface_hub import InferenceClient

        probe = [{"role": "user", "content": "Reply with exactly: HF_OK"}]
        response = InferenceClient(token=HF_TOKEN).chat.completions.create(
            model=HF_LLM_MODEL,
            messages=probe,
            max_tokens=8,
            temperature=0.0,
        )
        reply = response.choices[0].message.content.strip()
        return f"HF token active. Model: `{HF_LLM_MODEL}`. Test response: `{reply}`"
    except Exception as exc:
        return f"HF token/model test failed: `{type(exc).__name__}: {exc}`"
@spaces.GPU
def ask_question(query: str, top_k: int = 5, answer_mode: str = "auto") -> Tuple[str, str]:
    """Ask a question against indexed documents. Public Gradio API endpoint: ask_question.

    Args:
        query: The question; blank input is rejected with a hint.
        top_k: Number of chunks to retrieve.
        answer_mode: ``auto`` (use ``LLM_MODE``), ``hf_api``,
            ``local_transformers`` or ``extractive``. Any other value is
            treated as extractive.

    Returns:
        ``(answer_markdown, sources_markdown)``; the second item is empty
        when the request is rejected before retrieval.
    """
    question = (query or "").strip()
    if not question:
        return "Enter a question.", ""
    if EMBEDDINGS is None or not CHUNKS:
        return "Upload and index documents first, or click 'Load sample documents'.", ""

    hits = retrieve(question, int(top_k))
    selected_mode = (answer_mode or "auto").strip().lower()
    if selected_mode == "auto":
        selected_mode = LLM_MODE

    generators = {
        "hf_api": _answer_hf_api,
        "local_transformers": _answer_local_transformers,
    }
    generate = generators.get(selected_mode, _answer_extractive)
    return generate(question, hits), _sources_markdown(hits)
def list_sources() -> str:
    """List indexed sources. Public Gradio API endpoint: list_sources.

    Returns a Markdown table with one row per source, sorted by name, giving
    its chunk count and the min-max range of its page numbers (blank when no
    chunk of that source carries a page).
    """
    if not CHUNKS:
        return "No indexed sources."

    chunk_totals = Counter(chunk.source for chunk in CHUNKS)
    page_sets = defaultdict(set)
    for chunk in CHUNKS:
        if chunk.page:
            page_sets[chunk.source].add(chunk.page)

    table = []
    for name, total in sorted(chunk_totals.items()):
        seen_pages = page_sets[name]
        page_span = f"{min(seen_pages)}-{max(seen_pages)}" if seen_pages else ""
        table.append([name, str(total), page_span])
    return _markdown_table(table, ["Source", "Chunks", "Pages"])
def reset_index() -> str:
    """Clear the in-memory document index. Public Gradio API endpoint: reset_index.

    Rebinds the chunk list to a new empty list and drops the embedding matrix.
    """
    global CHUNKS, EMBEDDINGS
    CHUNKS = []
    EMBEDDINGS = None
    return "Index cleared."
def runtime_status() -> str:
    """Return runtime configuration. Public Gradio API endpoint: runtime_status.

    The configuration and the current chunk count are serialised as indented
    JSON inside a fenced ``json`` code block. Only the presence of the token
    is reported, never its value.
    """
    model_label = EMBEDDING_MODEL if EMBEDDING_BACKEND != "hash" else "feature-hash"
    payload = {
        "app_profile": APP_PROFILE,
        "embedding_backend": EMBEDDING_BACKEND,
        "embedding_model": model_label,
        "llm_mode": LLM_MODE,
        "hf_llm_model": HF_LLM_MODEL,
        "hf_token_present": bool(HF_TOKEN),
        "reranker_mode": RERANKER_MODE,
        "hybrid_alpha": HYBRID_ALPHA,
        "max_chunk_chars": MAX_CHUNK_CHARS,
        "max_total_chunks": MAX_TOTAL_CHUNKS,
        "chunk_count": len(CHUNKS),
    }
    body = json.dumps(payload, ensure_ascii=False, indent=2)
    return "```json\n" + body + "\n```"
def export_index_json() -> str:
    """Return index metadata as JSON text. Public Gradio API endpoint: export_index_json.

    The export holds the embedding backend/model labels, every chunk as a
    plain dict (text included) and the chunk count. Embedding vectors are not
    exported.
    """
    model_label = EMBEDDING_MODEL if EMBEDDING_BACKEND != "hash" else "feature-hash"
    serialised_chunks = [asdict(chunk) for chunk in CHUNKS]
    payload = {
        "embedding_backend": EMBEDDING_BACKEND,
        "embedding_model": model_label,
        "chunks": serialised_chunks,
        "chunk_count": len(CHUNKS),
    }
    return json.dumps(payload, ensure_ascii=False, indent=2)
def build_app() -> gr.Blocks:
    """Build the Gradio Blocks UI and wire every button to its handler.

    The app has three tabs: upload/index, ask, and admin/API. Each click
    handler is registered with an ``api_name`` so it is also callable as a
    named API endpoint.

    NOTE(review): the nesting of the ``with`` blocks below was reconstructed
    from statement order — confirm the intended layout, in particular which
    components sit inside each ``gr.Row`` and where the privacy note belongs.

    Returns:
        The assembled ``gr.Blocks`` instance (not yet launched).
    """
    # Header Markdown; values are interpolated once, when the app is built.
    description = f"""
# {APP_TITLE}
{APP_SUBTITLE}
**v08 profile:** `{APP_PROFILE}`
**Embedding backend:** `{EMBEDDING_BACKEND}`
**Default LLM:** `{HF_LLM_MODEL}` through Hugging Face Inference API.
**Token status:** `{bool(HF_TOKEN)}`
**Fallback:** extractive RAG if token/API fails.
"""
    with gr.Blocks(title=APP_TITLE) as demo:
        gr.Markdown(description)
        # Tab 1: upload documents and build the index.
        with gr.Tab("1. Upload / Index"):
            gr.Markdown("Upload files, then build a semantic RAG index. Supported: PDF, DOCX, TXT, MD, CSV, XLSX.")
            # type="filepath" makes Gradio pass file paths to ingest_files.
            files = gr.File(label="Documents", file_count="multiple", type="filepath")
            reset_first = gr.Checkbox(label="Reset index before ingest", value=True)
            with gr.Row():
                ingest_btn = gr.Button("Index uploaded documents", variant="primary")
                sample_btn = gr.Button("Load sample documents")
            # Both buttons report into the same status area.
            ingest_status = gr.Markdown()
            ingest_btn.click(fn=ingest_files, inputs=[files, reset_first], outputs=ingest_status, api_name="ingest_files")
            sample_btn.click(fn=load_sample_documents, inputs=None, outputs=ingest_status, api_name="load_sample_documents")
        # Tab 2: ask questions against the index.
        with gr.Tab("2. Ask"):
            query = gr.Textbox(label="Question", lines=3, placeholder="Ask a question about the indexed documents...")
            with gr.Row():
                top_k = gr.Slider(label="Top-k chunks", minimum=1, maximum=10, step=1, value=5)
                mode = gr.Radio(label="Answer mode", choices=["auto", "hf_api", "extractive", "local_transformers"], value="auto")
            ask_btn = gr.Button("Ask", variant="primary")
            answer = gr.Markdown(label="Answer")
            sources = gr.Markdown(label="Retrieved sources")
            ask_btn.click(fn=ask_question, inputs=[query, top_k, mode], outputs=[answer, sources], api_name="ask_question")
        # Tab 3: index administration and diagnostics.
        with gr.Tab("3. Admin / API"):
            gr.Markdown("Agent-callable endpoints: `ingest_files`, `ask_question`, `list_sources`, `reset_index`, `export_index_json`, `runtime_status`, `test_hf_token`.")
            # Snapshot taken at build time; the "Runtime status" button returns current values.
            gr.Markdown(runtime_status())
            with gr.Row():
                list_btn = gr.Button("List sources")
                reset_btn = gr.Button("Reset index")
                status_btn = gr.Button("Runtime status")
                token_btn = gr.Button("Test HF token / Qwen")
                export_btn = gr.Button("Export index metadata")
            # Shared Markdown output for the admin buttons; the export has its own code view.
            admin_out = gr.Markdown()
            export_out = gr.Code(label="Index JSON", language="json")
            list_btn.click(fn=list_sources, inputs=None, outputs=admin_out, api_name="list_sources")
            reset_btn.click(fn=reset_index, inputs=None, outputs=admin_out, api_name="reset_index")
            status_btn.click(fn=runtime_status, inputs=None, outputs=admin_out, api_name="runtime_status")
            token_btn.click(fn=test_hf_token, inputs=None, outputs=admin_out, api_name="test_hf_token")
            export_btn.click(fn=export_index_json, inputs=None, outputs=export_out, api_name="export_index_json")
        # Privacy notice (Turkish UI text): in HF API mode the question and the retrieved
        # context are sent to the Hugging Face Inference API.
        gr.Markdown("**Privacy note:** HF API modunda soru ve retrieved context Hugging Face Inference API’ye gider. Public Space’e gizli belge yüklemeyin. Gerçek müşteri verisi için Private Space veya on-premise sürüm kullanın.")
    return demo
# Build the UI at import time so `demo` exists as a module-level object.
demo = build_app()
# Start the Gradio server only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()