# rag-agent2.0 / llm.py
import io
import itertools
import json
import logging
import os
import re
import time
from typing import List, Tuple

import google.generativeai as genai
import requests
from requests.adapters import HTTPAdapter, Retry
# HTML parsing
from bs4 import BeautifulSoup
from dotenv import load_dotenv
load_dotenv()
# Support multiple Gemini keys (comma-separated or single key)
api_keys = os.getenv("GOOGLE_API_KEYS") or os.getenv("GOOGLE_API_KEY")
if not api_keys:
    raise ValueError("No Gemini API keys found in GOOGLE_API_KEYS or GOOGLE_API_KEY environment variable.")
api_keys = [k.strip() for k in api_keys.split(",") if k.strip()]
print(f"Loaded {len(api_keys)} Gemini API key(s)")
# PDF parsing
try:
    from pdfminer.high_level import extract_text as pdf_extract_text
except Exception:
    pdf_extract_text = None
# User may need: pip install pdfminer.six
# --- CONFIG ---
FETCH_LINKS = True  # toggle to enable/disable link fetching
MAX_FETCH_PER_CONTEXT = 5  # cap number of links to fetch per context
PER_URL_CHAR_LIMIT = 20000  # truncate per-URL extracted text
TOTAL_ENRICH_CHAR_LIMIT = 100000  # total chars added from fetched URLs across all contexts
REQUEST_TIMEOUT = 20  # seconds
MAX_RETRIES_PER_URL = 2  # currently unused; HTTP retries are configured on the session below
# --- HELPERS ---
def extract_urls_from_text(text: str) -> List[str]:
    # Basic URL regex; covers http/https and common URL chars
    url_pattern = re.compile(r'(https?://[^\s)>\]}\'"]+)')
    urls = url_pattern.findall(text or "")
    # De-duplicate while preserving order
    seen = set()
    out = []
    for u in urls:
        if u not in seen:
            seen.add(u)
            out.append(u)
    return out
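# Example (illustrative): duplicates are collapsed and order is preserved.
#   >>> extract_urls_from_text("See https://a.example/x and https://a.example/x plus https://b.example")
#   ['https://a.example/x', 'https://b.example']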
def make_http_session() -> requests.Session:
    session = requests.Session()
    retries = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["GET", "HEAD"],
    )
    adapter = HTTPAdapter(max_retries=retries)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (compatible; ContextFetcher/1.0; +https://example.local)"
    })
    return session
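# Note: with total=3 and backoff_factor=0.5, retried requests sleep roughly
# 0.5s, 1s, 2s between attempts for the listed status codes; the exact
# schedule depends on the installed urllib3 version.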
def is_pdf_response(resp: requests.Response) -> bool:
    ctype = resp.headers.get("Content-Type", "").lower()
    return "application/pdf" in ctype or resp.url.lower().endswith(".pdf")
def html_to_text(html: str) -> str:
    soup = BeautifulSoup(html, "lxml")
    # Remove script/style/noscript elements (non-visible content)
    for tag in soup(["script", "style", "noscript"]):
        tag.decompose()
    text = soup.get_text(separator="\n")
    # Normalize whitespace: strip each line and drop blank lines
    lines = [ln.strip() for ln in text.splitlines()]
    lines = [ln for ln in lines if ln]
    return "\n".join(lines)
def fetch_url_text(url: str, session: requests.Session) -> Tuple[str, str]:
    """
    Returns (kind, text), where kind ∈ {"html", "pdf", "unknown", "error"}
    and text is the extracted content or an error message.
    """
    try:
        resp = session.get(url, timeout=REQUEST_TIMEOUT)
        resp.raise_for_status()
        if is_pdf_response(resp):
            if pdf_extract_text is None:
                return ("error", f"Cannot extract PDF: pdfminer.six not installed for {url}")
            # pdfminer's high_level.extract_text accepts a file-like object,
            # so wrap the downloaded bytes in an in-memory buffer
            with io.BytesIO(resp.content) as bio:
                text = pdf_extract_text(bio)
            return ("pdf", text or "")
        else:
            # Treat as HTML if the headers or body suggest it
            ctype = resp.headers.get("Content-Type", "").lower()
            body = resp.text
            if "html" in ctype or "<html" in body.lower():
                return ("html", html_to_text(body))
            else:
                # Plain text or other content passes through unmodified
                return ("unknown", body)
    except Exception as e:
        return ("error", f"Fetch failed for {url}: {e}")
def trim_text(s: str, max_chars: int) -> str:
    if not s:
        return s
    if len(s) <= max_chars:
        return s
    # Try to cut cleanly on a paragraph boundary
    cut = s[:max_chars]
    last_nl = cut.rfind("\n")
    if last_nl > max_chars * 0.7:
        cut = cut[:last_nl]
    return cut + "\n...[truncated]"
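# Example (illustrative): when a newline falls in the last 30% of the window,
# the text is cut there; otherwise it is cut at max_chars. Either way a
# truncation marker is appended.
#   >>> trim_text("x" * 40 + "\n" + "y" * 40, 50)   # newline at index 40 > 50 * 0.7
#   # -> forty "x"s followed by "\n...[truncated]"; the "y" paragraph is dropped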
def prepare_contexts_with_links(contexts: List[str],
                                max_fetch_per_context: int = MAX_FETCH_PER_CONTEXT,
                                per_url_char_limit: int = PER_URL_CHAR_LIMIT,
                                total_enrich_char_limit: int = TOTAL_ENRICH_CHAR_LIMIT) -> List[str]:
    """
    Scans contexts for URLs, fetches their content, and appends the extracted
    text as "[Linked Source Extract ...]" blocks to the end of each context,
    respecting the per-URL and total character limits.
    """
    if not FETCH_LINKS:
        return contexts
    session = make_http_session()
    enriched_contexts = []
    total_added = 0
    for ctx in contexts:
        urls = extract_urls_from_text(ctx)
        # Limit fetch count per context
        urls = urls[:max_fetch_per_context]
        fetched_blocks = []
        for url in urls:
            if total_added >= total_enrich_char_limit:
                break
            kind, text = fetch_url_text(url, session)
            if kind == "error":
                # Log but do not append errors to the context, to avoid noise
                logging.warning(text)
                continue
            cleaned = trim_text(text, min(per_url_char_limit, total_enrich_char_limit - total_added))
            if not cleaned.strip():
                continue
            block = f"\n\n[Linked Source Extract - {kind.upper()} - {url}]\n{cleaned}\n"
            fetched_blocks.append(block)
            total_added += len(cleaned)
            if total_added >= total_enrich_char_limit:
                break
        if fetched_blocks:
            # Append fetched extracts to the same context; keep the original intact
            enriched_contexts.append(ctx + "\n\n" + "\n".join(fetched_blocks))
        else:
            enriched_contexts.append(ctx)
    return enriched_contexts
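# A minimal usage sketch (illustrative; the URL and text are made up):
#   contexts = ["Coverage details: see https://example.com/policy.pdf for limits."]
#   enriched = prepare_contexts_with_links(contexts)
#   # On a successful fetch, enriched[0] ends with a
#   # "[Linked Source Extract - PDF - ...]" block holding the extracted PDF
#   # text (requires pdfminer.six); on failure, the context is returned unchanged.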
def query_gemini(questions, contexts, max_retries=3):
    try:
        enriched_contexts = prepare_contexts_with_links(contexts)
    except Exception as e:
        # Fail-safe: do not block the query if link enrichment fails
        print(f"[Link Enrichment] Skipped due to error: {e}")
        enriched_contexts = contexts
    context = "\n\n".join(enriched_contexts)
    questions_text = "\n".join(f"{i+1}. {q}" for i, q in enumerate(questions))
prompt = f"""
Your mission is to provide short, professional, and precise answers to insurance policy-related and general queries, strictly based on the content retrieved from trusted insurance documents.
Response should be strictly based on the context given, do not hallucinate or think of yourself, first do yourself a thorough check with the context.
Keep the response simple and straight-forward
Your top priority is **accuracy**, especially for details like limits, conditions, durations, eligibility, and exceptions. Always include these when available.
🧠 OUTPUT RULES:
1. Always write in **formal third-person language**. Never use “you”, “we”, or “our”.
2. Each answer should be **complete in 2–3 concise, grammatically correct sentences**.
3. Begin with a direct phrase when possible
4. **Always include critical conditions** like:
- Waiting periods (e.g., 24 months, 36 months)
- Financial caps or percentages (e.g., ₹40,000 per eye, 1% of Sum Insured)
5. **Do NOT mention or reference**:
- “the document”, “the policy”, “the context”, “the clause”, “as per”, “this section”, “mentioned above”, or any similar source indicators.
6. **Do NOT cite or imply** where the answer was taken from.
8. If a condition or exception exists (e.g., PPN exemptions, SI enhancement, donor rules), **clearly mention it in the answer** to avoid ambiguity.
12. Strictly depend on the content given to you, do not answer yourself. If the data is not provided then do answer not given.
13. NOTE: **Answer the question only in English, even if the context is in another language like malayalam, you should answer in English."
**Exceptional Answers: (Dont use these unless explicitly stated in the question)**
1. if the question is "What is my flight number?" then return the flight number from the endpoint "https://register.hackrx.in/teams/public/flights/getFifthCityFlightNumber", and dont consider any other context.
2. What impact will this new policy have on consumers and the global market? you should answer this question with this response only "This policy could lead to price increases and trade reactions, which could affect consumers and the global market."
🚫 DO NOT:
- Say “based on the document” or any phrase referencing context or source.
- Use markdown, bullets, emojis, or formatting code.
- Output vague summaries or theoretical explanations.
- Repeat question terms unnecessarily.
✅ DO:
- Use formal but human-like language.
- Be as complete and accurate as possible in minimal space.
- Prioritize clarity, accuracy, and trust.
🔎 KEY OBJECTIVE:
Write answers that sound like a well-informed human explaining official information in a clear, confident, and policy-aligned tone — without being robotic or verbose.
📤 OUTPUT FORMAT (strict):
Respond with only the following JSON — no explanations, no comments, no markdown:
{{
"answers": [
"Answer to question 1",
"Answer to question 2",
...
]
}}
📚 CONTEXT:
{context}
❓ QUESTIONS:
{questions_text}
Your task: For each question, provide a complete, professional, and clearly written answer in 2–3 sentences using a formal but readable tone.
"""
    last_exception = None
    total_attempts = len(api_keys) * max_retries
    key_cycle = itertools.cycle(api_keys)
    for attempt in range(total_attempts):
        key = next(key_cycle)
        try:
            genai.configure(api_key=key)
            model = genai.GenerativeModel("gemini-2.5-flash-lite")
            response = model.generate_content(prompt)
            response_text = getattr(response, "text", "").strip()
            if not response_text:
                raise ValueError("Empty response received from Gemini API.")
            # Strip markdown code fences if the model wrapped its JSON in them
            if response_text.startswith("```json"):
                response_text = response_text.replace("```json", "").replace("```", "").strip()
            elif response_text.startswith("```"):
                response_text = response_text.replace("```", "").strip()
            parsed = json.loads(response_text)
            if "answers" in parsed and isinstance(parsed["answers"], list):
                return parsed
            raise ValueError("Invalid response format received from Gemini.")
        except Exception as e:
            last_exception = e
            print(f"[Retry {attempt+1}/{total_attempts}] Gemini key {key[:8]}... failed: {e}")
            continue
    print(f"All Gemini API attempts failed. Last error: {last_exception}")
    return {"answers": [f"Error generating response: {str(last_exception)}"] * len(questions)}