| import os |
| import asyncio |
| from dotenv import load_dotenv |
| import gradio as gr |
|
|
|
|
|
|
| |
| |
|
|
| |
from openai import OpenAI

# BUG FIX: DEEPINFRA_API_KEY (and the Weaviate credentials used by
# weaviate_client() below) were referenced but never defined, and
# load_dotenv was imported but never called. Load them from the
# environment / .env here so the module is actually runnable.
load_dotenv()
DEEPINFRA_API_KEY = os.getenv("DEEPINFRA_API_KEY")
WEAVIATE_URL = os.getenv("WEAVIATE_URL")
WEAVIATE_API_KEY = os.getenv("WEAVIATE_API_KEY")

# DeepInfra exposes an OpenAI-compatible endpoint, so the stock OpenAI
# SDK works once pointed at their base URL.
openai = OpenAI(
    api_key=DEEPINFRA_API_KEY,
    base_url="https://api.deepinfra.com/v1/openai",
)
|
|
| |
| import weaviate |
| from weaviate.classes.init import Auth |
| from contextlib import contextmanager |
|
|
@contextmanager
def weaviate_client():
    """Context manager yielding a Weaviate Cloud connection.

    Guarantees the connection is closed on exit, even if the body raises.
    """
    conn = weaviate.connect_to_weaviate_cloud(
        cluster_url=WEAVIATE_URL,
        auth_credentials=Auth.api_key(WEAVIATE_API_KEY),
    )
    try:
        yield conn
    finally:
        # Always release the connection, success or failure.
        conn.close()
|
|
| |
# Module-level state: path of the most recently ingested document (set by
# ingest_file); None until the first upload.
last_uploaded_path = None
|
|
| |
def embed_texts(texts: list[str], batch_size: int = 50) -> list[list[float]]:
    """Embed *texts* with Qwen3-Embedding-8B in batches of *batch_size*.

    A batch that fails to embed contributes one empty list per input text,
    so the output always has the same length as *texts*.
    """
    results: list[list[float]] = []
    for start in range(0, len(texts), batch_size):
        chunk = texts[start : start + batch_size]
        try:
            response = openai.embeddings.create(
                model="Qwen/Qwen3-Embedding-8B",
                input=chunk,
                encoding_format="float",
            )
        except Exception as exc:
            # Best-effort: log and pad with placeholders instead of raising.
            print(f"Embedding error: {exc}")
            results.extend([] for _ in chunk)
        else:
            results.extend(item.embedding for item in response.data)
    return results
|
|
def encode_query(query: str) -> list[float] | None:
    """Return the embedding vector for a single *query*, or None on failure."""
    vectors = embed_texts([query], batch_size=1)
    # embed_texts pads failed batches with empty lists; treat those as failure.
    return vectors[0] if vectors and vectors[0] else None
|
|
async def old_Document(query: str, top_k: int = 1) -> dict:
    """Retrieve up to *top_k* text chunks semantically near *query*.

    Searches the "Old_Documents" collection by vector similarity. Returns
    {"answer": [chunk, ...]}; the list is empty when the query cannot be
    embedded, nothing matches, or retrieval fails.
    """
    vector = encode_query(query)
    if not vector:
        return {"answer": []}

    try:
        with weaviate_client() as client:
            collection = client.collections.get("Old_Documents")
            result = collection.query.near_vector(
                near_vector=vector,
                limit=top_k,
                return_properties=["text"],
            )
            hits = getattr(result, "objects", None)
            if not hits:
                return {"answer": []}
            texts = [hit.properties.get("text", "[No Text]") for hit in hits]
            return {"answer": texts}
    except Exception as exc:
        # Best-effort: retrieval errors degrade to "no answer" rather than crash.
        print("RAG Error:", exc)
        return {"answer": []}
|
|
| |
def ingest_file(path: str) -> str:
    """Record *path* as the active document and return a status message."""
    global last_uploaded_path
    last_uploaded_path = path
    filename = os.path.basename(path)
    return f"Old document ingested: {filename}"
|
|
def answer_question(query: str) -> str:
    """Answer *query* from the ingested document, one bullet per chunk.

    Any failure (retrieval, event loop, formatting) is reported as a
    user-facing error string rather than raised.
    """
    try:
        retrieval = asyncio.run(old_Document(query))
        chunks = retrieval.get("answer", [])
        if chunks:
            return "\n".join(f"- {chunk}" for chunk in chunks)
        return "Sorry, I couldn't find relevant content in the old document."
    except Exception as exc:
        return f"Error processing your request: {exc}"
|
|
| |
with gr.Blocks(title="Old Documents RAG") as demo:
    gr.Markdown("## Old Documents RAG")
    query = gr.Textbox(placeholder="Your question...", lines=2, label="Ask about Old Documents")
    doc_file = gr.File(label="Upload Old Document (PDF, DOCX, TXT)")
    btn = gr.Button("Submit")
    out = gr.Textbox(label="Answer from Old Documents", lines=8, interactive=False)

    def process_old_doc(query, doc_file):
        """Ingest the uploaded file (if any), then answer the question.

        Falls back to the previously uploaded document when no new file is
        provided, and reports an error when there is nothing to search.
        """
        if doc_file:
            import shutil  # only needed on the upload path

            # BUG FIX: modern Gradio delivers gr.File values as a filepath
            # string (no .read()); older versions pass a tempfile wrapper
            # with a .name attribute. Handle both and copy the file instead
            # of reading bytes through the component object.
            src_path = getattr(doc_file, "name", doc_file)
            upload_dir = os.path.join(os.path.dirname(__file__), "uploaded_docs")
            os.makedirs(upload_dir, exist_ok=True)
            # basename strips any directory components from the client name.
            safe_filename = os.path.basename(src_path)
            save_path = os.path.join(upload_dir, safe_filename)
            shutil.copyfile(src_path, save_path)
            status = ingest_file(save_path)
            answer = answer_question(query)
            return f"{status}\n\n{answer}"

        # No new upload: reuse the last ingested document, if any.
        if last_uploaded_path:
            answer = answer_question(query)
            return f"[Using previously uploaded document: {os.path.basename(last_uploaded_path)}]\n\n{answer}"
        return "No document uploaded. Please upload an old document to proceed."

    btn.click(fn=process_old_doc, inputs=[query, doc_file], outputs=out)
|
|
# Script entry point: launch the Gradio app (debug=True surfaces tracebacks
# in the console and UI during development).
if __name__ == "__main__":
    demo.launch(debug=True)