Spaces:
Sleeping
Sleeping
| """ | |
| Neosantara RAG Demo | |
| Upload PDF → Chat with AI → Export Code | |
| """ | |
| import os | |
| import uuid | |
| import gradio as gr | |
| from agno.agent import Agent | |
| from agno.knowledge.embedder.openai import OpenAIEmbedder | |
| from agno.knowledge.knowledge import Knowledge | |
| from agno.knowledge.reader.pdf_reader import PDFReader | |
| from agno.vectordb.lancedb import LanceDb, SearchType | |
| from agno.models.neosantara import Neosantara | |
| from agno.db.sqlite import SqliteDb | |
# Runtime configuration, all overridable through environment variables.
#
# The API key used to be read only from the misspelled variable
# "NEOSANTARAA_API_KEY" (double "A"), so a correctly named secret was
# silently ignored. Prefer the correct spelling and keep the old name as a
# fallback so existing deployments keep working.
NEOSANTARA_API_KEY = os.getenv("NEOSANTARA_API_KEY") or os.getenv(
    "NEOSANTARAA_API_KEY", ""
)
# Chat model identifier passed to the Neosantara model wrapper.
MODEL_ID = os.getenv("MODEL_ID", "claude-3-haiku")
# Embedding model identifier passed to the embedder.
EMBEDDER_ID = os.getenv("EMBEDDER_ID", "nusa-embedding-0001")
def get_embedder():
    """Create the embedder used to vectorize document chunks.

    Returns:
        An ``OpenAIEmbedder`` configured with the module-level embedder id
        and API key, pointed at the Neosantara ``/v1`` base URL.
    """
    embedder_options = {
        "id": EMBEDDER_ID,
        "api_key": NEOSANTARA_API_KEY,
        "base_url": "https://api.neosantara.xyz/v1",
    }
    return OpenAIEmbedder(**embedder_options)
def process_pdf(file_obj, user_id, agent_state, file_path_state):
    """Index an uploaded PDF and build a chat agent over it.

    Args:
        file_obj: The uploaded file; either an object exposing ``.name``
            (used as the path) or a value whose ``str()`` is the path.
        user_id: Per-session identifier; used in the vector table name.
        agent_state: The current agent, returned unchanged on failure.
        file_path_state: The current file path, returned unchanged on failure.

    Returns:
        A 4-tuple ``(status_message, chat_history, agent, file_path)``. The
        chat history is always an empty list. On success the new agent and
        file path are returned; otherwise the incoming state values are.
    """
    # Guard clause: nothing was uploaded, leave the session state untouched.
    if file_obj is None:
        return "Please upload a PDF file.", [], agent_state, file_path_state

    try:
        file_path = file_obj.name if hasattr(file_obj, "name") else str(file_obj)

        # One vector table per user id, stored under /tmp/lancedb.
        store = LanceDb(
            table_name=f"rag_{user_id}",
            uri="/tmp/lancedb",
            search_type=SearchType.hybrid,
            embedder=get_embedder(),
        )
        knowledge = Knowledge(vector_db=store)

        pdf_reader = PDFReader(chunk=True, chunk_size=3000, skip_if_exists=True)
        knowledge.insert(path=file_path, reader=pdf_reader)

        llm = Neosantara(id=MODEL_ID, api_key=NEOSANTARA_API_KEY)
        session_db = SqliteDb(db_file="/tmp/agno_sessions.db")
        agent = Agent(
            model=llm,
            knowledge=knowledge,
            db=session_db,
            search_knowledge=True,
            markdown=True,
        )
    except Exception as e:
        # UI boundary: report the failure in the status box and keep the
        # previous agent / file path.
        return f"β Error: {e!s}", [], agent_state, file_path_state

    success_message = "β PDF loaded successfully! You can now chat with the document."
    return success_message, [], agent, file_path
def chat_fn(message, history, user_id, agent):
    """Handle one chat turn and return the updated conversation.

    Args:
        message: The text the user submitted.
        history: Existing conversation as a list of
            ``{"role": ..., "content": ...}`` dicts, or ``None``.
        user_id: Per-session identifier; passed to the agent as both
            ``user_id`` and ``session_id``.
        agent: The agent built for the uploaded PDF, or ``None`` if no
            document has been processed yet.

    Returns:
        A 3-tuple ``(history, agent, "")``. The empty string is returned as
        the new value for the message input.
    """
    # Work on a copy so the caller's list is never mutated in place.
    history = list(history) if history else []

    # Blank submissions (empty or whitespace-only) are ignored rather than
    # being sent to the model.
    if message is None or not str(message).strip():
        return history, agent, ""

    history.append({"role": "user", "content": message})

    if agent is None:
        history.append({"role": "assistant", "content": "Please upload a PDF first!"})
        return history, agent, ""

    try:
        response = agent.run(message, user_id=user_id, session_id=user_id)
        reply = response.content
        # NOTE(review): presumably content can be None when the model returns
        # nothing; fall back to an empty string so the chat entry stays text.
        if reply is None:
            reply = ""
    except Exception as e:
        # UI boundary: show the failure as the assistant's reply.
        reply = f"Error: {str(e)}"

    history.append({"role": "assistant", "content": reply})
    return history, agent, ""
def generate_code(file_path_state):
    """Render a standalone Python script that reproduces the RAG setup.

    The script mirrors what ``process_pdf`` builds: the same embedder,
    vector store, PDF reader settings and agent options, using the
    module-level ``MODEL_ID`` and ``EMBEDDER_ID`` and a placeholder API key.

    Args:
        file_path_state: Path of the processed PDF, or ``None`` if no PDF
            has been processed yet.

    Returns:
        The generated script as a string, or a prompt asking the user to
        upload a PDF when ``file_path_state`` is ``None``.
    """
    if file_path_state is None:
        return "Please upload a PDF first!"

    # The session store uses ``agno.db.sqlite.SqliteDb`` / ``db=`` so the
    # exported script matches the API the app itself imports; the previous
    # ``agno.storage`` import did not match the rest of the generated code.
    # The path is emitted with ``!r`` so backslashes and quotes in it still
    # produce a valid Python string literal.
    return f'''# Neosantara RAG Agent
# Generated from RAG Demo Tool
from agno.agent import Agent
from agno.knowledge.knowledge import Knowledge
from agno.knowledge.reader.pdf_reader import PDFReader
from agno.vectordb.lancedb import LanceDb, SearchType
from agno.knowledge.embedder.openai import OpenAIEmbedder
from agno.models.neosantara import Neosantara
from agno.db.sqlite import SqliteDb

NEOSANTARA_API_KEY = "your-api-key-here"
MODEL_ID = "{MODEL_ID}"
EMBEDDER_ID = "{EMBEDDER_ID}"

embedder = OpenAIEmbedder(
    id=EMBEDDER_ID,
    api_key=NEOSANTARA_API_KEY,
    base_url="https://api.neosantara.xyz/v1",
)

vector_db = LanceDb(
    table_name="my_doc",
    uri="/tmp/lancedb",
    search_type=SearchType.hybrid,
    embedder=embedder,
)

knowledge = Knowledge(vector_db=vector_db)
knowledge.insert(
    path={file_path_state!r},
    reader=PDFReader(chunk=True, chunk_size=3000, skip_if_exists=True),
)

agent = Agent(
    model=Neosantara(
        id=MODEL_ID,
        api_key=NEOSANTARA_API_KEY,
    ),
    knowledge=knowledge,
    db=SqliteDb(db_file="tmp/agno_sessions.db"),
    search_knowledge=True,
    markdown=True,
)

agent.print_response("Apa isi dokumen ini?")
'''
def clear_fn():
    """Reset the session.

    Returns:
        A 4-tuple ``(chat_history, status_message, agent, file_path)``: an
        empty chat history, a status message, and ``None`` for both the
        agent and the file path.
    """
    empty_chat = []
    status_message = "Session cleared. Upload a new PDF."
    return empty_chat, status_message, None, None
with gr.Blocks() as demo:
    # Per-user state (kept per browser tab, no login required).
    user_id_state = gr.State(value=lambda: str(uuid.uuid4()))
    agent_state = gr.State(value=None)
    file_path_state = gr.State(value=None)

    gr.Markdown("# π Neosantara RAG Demo")
    gr.Markdown("Upload a PDF, chat with AI, and export the code!")

    with gr.Row():
        # Left column: upload, processing and session controls.
        with gr.Column(scale=1):
            file_input = gr.File(label="Upload PDF", file_types=[".pdf"])
            process_btn = gr.Button("Process PDF", variant="primary")
            status = gr.Textbox(label="Status", interactive=False)
            clear_btn = gr.Button("Clear Session")

        # Right column: the conversation and its controls.
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="Chat", height=400)
            msg = gr.Textbox(label="Message", placeholder="Ask about your document...")
            with gr.Row():
                send_btn = gr.Button("Send", variant="primary")
                export_btn = gr.Button("π Export Code")

    code_output = gr.Code(label="Generated Code", language="python")

    # --- Event wiring -----------------------------------------------------
    process_btn.click(
        fn=process_pdf,
        inputs=[file_input, user_id_state, agent_state, file_path_state],
        outputs=[status, chatbot, agent_state, file_path_state],
    )

    # The Send button and submitting the textbox both run the same chat
    # handler with the same inputs and outputs.
    chat_inputs = (msg, chatbot, user_id_state, agent_state)
    chat_outputs = (chatbot, agent_state, msg)
    for register_chat in (send_btn.click, msg.submit):
        register_chat(
            fn=chat_fn,
            inputs=list(chat_inputs),
            outputs=list(chat_outputs),
        )

    export_btn.click(
        fn=generate_code,
        inputs=[file_path_state],
        outputs=[code_output],
    )

    clear_btn.click(
        fn=clear_fn,
        inputs=[],
        outputs=[chatbot, status, agent_state, file_path_state],
    )
if __name__ == "__main__":
    # Launch settings: listen on all interfaces, port 7860, and do not
    # create a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    demo.launch(**launch_options)