Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import time | |
| import random | |
| from dotenv import load_dotenv | |
| from PyPDF2 import PdfReader | |
| from langchain.chat_models import init_chat_model # new universal model initializer | |
| from langchain.agents import create_agent # replaces AgentExecutor | |
| from langchain.tools import tool # tool decorator | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
| from langchain_community.vectorstores import FAISS | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_core.messages import HumanMessage, AIMessage, ToolMessage | |
| load_dotenv() | |
| # Constant values | |
| CHUNK_SIZE = 2500 | |
| CHUNK_OVERLAP = 300 | |
| EMBED_DELAY_SECONDS = 1.5 | |
| TPM_SAFE_THRESHOLD = 27000 | |
| MAX_RETRIES = 3 | |
| # Extract Text from the PDF | |
def extract_text_from_pdf(pdf_file):
    """
    Read every page of the PDF and return the concatenated raw text.

    Some pages may return None from extract_text(), so an empty string
    is substituted as a safety net.
    """
    pdf_reader = PdfReader(pdf_file)
    # "".join() over a generator avoids the quadratic cost of repeated
    # string concatenation on large documents.
    return "".join(page.extract_text() or "" for page in pdf_reader.pages)
| # Split the text into Chunks | |
def split_text_into_chunks(raw_text):
    """
    Split the raw text into overlapping chunks for embedding.

    Uses CHUNK_SIZE (2500 chars) with CHUNK_OVERLAP (300 chars) so each
    chunk stays well under the embedding token limit while the overlap
    preserves context across chunk boundaries.
    """
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=CHUNK_SIZE,
        chunk_overlap=CHUNK_OVERLAP,
        length_function=len,
    )
    return text_splitter.split_text(raw_text)
| # Create Vector Store | |
def create_vector_store(text_chunks):
    """
    Embed chunks with Gemini and store them in a FAISS vector store.

    Respects all three Gemini free-tier limits:
      RPM = 100 -> actual ~40 RPM via the 1.5 s inter-call delay
      TPM = 30K -> pauses once ~27K estimated tokens were sent this minute
      RPD = 1K  -> the large CHUNK_SIZE (2500) minimises daily call count
    On a 429: parses Google's suggested retry delay out of the error text,
    waits that long plus jitter, and retries up to MAX_RETRIES times.
    On daily-quota exhaustion: raises immediately with a clear message.

    Returns the populated FAISS store (None if text_chunks is empty).
    """
    embeddings = GoogleGenerativeAIEmbeddings(
        model = "models/gemini-embedding-001"
    )
    total = len(text_chunks)
    # Rough ETA: one embedding call per chunk, EMBED_DELAY_SECONDS apart.
    est_min = (total * EMBED_DELAY_SECONDS) / 60
    print(f"\nEmbedding {total} chunks β est. {est_min:.1f} min")
    print(f"RPD usage: {total}/1,000 daily quota\n")
    vector_store = None
    tokens_this_minute = 0      # estimated tokens sent in the current 60 s window
    minute_start = time.time()  # start of the current 60 s window
    for idx, chunk in enumerate(text_chunks):
        # Crude token estimate: ~4 characters per token, minimum 1.
        chunk_tokens = max(1, len(chunk) // 4)  # 1 token ~ 4 chars
        # --- TPM guard: if sending this chunk would cross the safe threshold
        # inside the current minute, sleep out the remainder of the window ---
        elapsed = time.time() - minute_start
        if elapsed < 60 and (tokens_this_minute + chunk_tokens) > TPM_SAFE_THRESHOLD:
            wait = 60 - elapsed + 2  # +2 s margin past the minute boundary
            print(f" TPM guard β {tokens_this_minute:,} tokens sent. Waiting {wait:.0f}s...")
            time.sleep(wait)
            tokens_this_minute = 0
            minute_start = time.time()
        # A full minute has elapsed on its own: reset the window counters.
        if time.time() - minute_start >= 60:
            tokens_this_minute = 0
            minute_start = time.time()
        # --- Embed with retry ---
        for attempt in range(1, MAX_RETRIES + 1):
            try:
                if vector_store is None:
                    # First chunk bootstraps the store; later chunks are appended.
                    vector_store = FAISS.from_texts(texts=[chunk], embedding=embeddings)
                else:
                    vector_store.add_texts(texts=[chunk])
                tokens_this_minute += chunk_tokens
                # Progress line every 10 chunks and on the final chunk.
                if (idx + 1) % 10 == 0 or (idx + 1) == total:
                    print(f" β {idx+1}/{total} chunks | ~{tokens_this_minute:,} tokens this min")
                break
            except Exception as e:
                err = str(e)
                if "429" in err or "RESOURCE_EXHAUSTED" in err:
                    # Pull Google's suggested retry delay (e.g. "retry in 14.2s")
                    # out of the error message; fall back to 30 s if absent.
                    match = re.search(r"retry[^\d]*(\d+\.?\d*)\s*s", err, re.IGNORECASE)
                    g_wait = float(match.group(1)) if match else 30
                    wait = g_wait + random.uniform(1, 3)  # jitter de-synchronises retries
                    print(f" 429 on chunk {idx+1} (attempt {attempt}/{MAX_RETRIES}). Waiting {wait:.0f}s...")
                    time.sleep(wait)
                    # After the wait a fresh rate-limit window is assumed.
                    tokens_this_minute = 0
                    minute_start = time.time()
                    if attempt == MAX_RETRIES:
                        raise Exception(
                            f"Chunk {idx+1} failed after {MAX_RETRIES} retries. "
                            f"Daily quota may be exhausted β try again tomorrow."
                        ) from e
                elif "per day" in err.lower():
                    # Daily quota cannot be waited out: fail fast with guidance.
                    raise Exception(
                        "Daily RPD quota (1,000) exhausted. "
                        "Resets at midnight Pacific Time. Try again tomorrow."
                    ) from e
                else:
                    # Unknown error: re-raise unchanged for the caller to handle.
                    raise
        # RPM guard: fixed pause between calls keeps us near ~40 requests/min.
        time.sleep(EMBED_DELAY_SECONDS)
    print(f"\nβ Done β {total} chunks stored in FAISS.\n")
    return vector_store
| # Build RAG agent | |
def create_rag_agent(vector_store):
    """
    Build a tool-calling RAG agent over the given FAISS vector store.

    The agent gets a single retrieval tool that runs a similarity search
    against the PDF embeddings and returns both a serialized context
    string (for the LLM) and the raw documents (for the UI).
    """
    model = init_chat_model(
        "google_genai:gemini-2.5-flash",
        temperature=0,  # deterministic, grounded answers
    )

    # response_format="content_and_artifact" tells LangChain to place the
    # second element of the returned tuple into ToolMessage.artifact,
    # which get_answer() reads to surface source documents in the UI.
    # (Without the decorator the tuple is stringified and the artifact
    # never reaches the ToolMessage.)
    @tool(response_format="content_and_artifact")
    def retrieve_context(query: str):
        """Search the PDF for context relevant to the query and return the matching chunks."""
        retrieved_docs = vector_store.similarity_search(query, k=3)
        # Format docs as readable text for the LLM
        serialized = "\n\n".join(
            f"[Chunk {i+1}] :\n{doc.page_content}"
            for i, doc in enumerate(retrieved_docs)
        )
        return serialized, retrieved_docs  # content for LLM, raw docs for UI

    tools = [retrieve_context]
    # NOTE: adjacent string literals concatenate with no separator, so each
    # line needs trailing punctuation/space to keep words from running together.
    system_prompt = (
        "You are a helpful assistant that answers questions about an uploaded PDF document. "
        "You have access to a retrieval tool that searches the PDF content for context relevant to the question. "
        "Always use the retrieval tool to find relevant information before answering the question. "
        "If the document does not contain the answer, say so clearly. "
        "Keep your answers concise, accurate and grounded in the document content."
    )
    agent = create_agent(model, tools, system_prompt=system_prompt)
    return agent
| # Get the answer with conversation history | |
def get_answer(agent, user_question, chat_history):
    """
    Run the agent on user_question, replaying chat_history as context.

    Streams the agent's intermediate steps, capturing source documents
    from tool-message artifacts and the final text from the last
    non-empty AI message. Returns (final_answer, source_docs).
    """

    def _flatten_content(content):
        # Normalize an AIMessage payload (plain string or list of content
        # blocks) into a single text string; "" means nothing usable.
        if isinstance(content, str):
            return content if content.strip() else ""
        if isinstance(content, list):
            pieces = []
            for block in content:
                piece = block.get("text", "") if isinstance(block, dict) else str(block)
                if piece.strip():
                    pieces.append(piece)
            return " ".join(pieces)
        return ""

    # Rebuild the LangChain message list from the role/content history dicts.
    role_to_message = {"user": HumanMessage, "assistant": AIMessage}
    messages = [
        role_to_message[entry["role"]](content=entry["content"])
        for entry in chat_history
        if entry["role"] in role_to_message
    ]
    messages.append(HumanMessage(content=user_question))

    source_docs = []
    final_answer = ""
    # Walk every intermediate state the agent emits.
    for step in agent.stream({"messages": messages}, stream_mode="values"):
        latest = step["messages"][-1]
        # Tool messages may carry the retrieved documents as an artifact.
        if isinstance(latest, ToolMessage):
            artifact = getattr(latest, "artifact", None)
            if isinstance(artifact, list):
                source_docs = artifact
        # The last non-empty AI message wins as the final answer.
        if isinstance(latest, AIMessage):
            text = _flatten_content(latest.content)
            if text:
                final_answer = text

    if not final_answer:
        final_answer = "I was unable to generate response. Please try rephrasing your question"
    return final_answer, source_docs