# YouTwo — src/yt_rag/rag.py
# (Scraped GitHub page header: author Tanuj, commit 180024b "Move files to src folder")
import json
import logging
import os
from pathlib import Path
import requests
from pprint import pprint
from src.schemas import UploadResult
# Module-level logger; attach a stream handler only once so repeated imports
# don't duplicate log lines.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.hasHandlers():
    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO)
    stream_handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s')
    )
    logger.addHandler(stream_handler)
class VectaraAPIError(Exception):
    """Raised when a call to the Vectara REST API fails."""
class IndexingError(Exception):
    """Raised for indexing problems that are not Vectara API failures."""
def load_environment_variables():
    """
    Populate os.environ from a local .env file (useful for local development,
    so credentials are not hardcoded), then verify the Vectara key is present.

    Raises:
        IndexingError: If VECTARA_API_KEY is unset or empty after loading.
    """
    from dotenv import load_dotenv

    load_dotenv()
    api_key = os.getenv("VECTARA_API_KEY")
    if not api_key:
        raise IndexingError("Vectara API key not set. Please set the VECTARA_API_KEY environment variable.")
# Extensions Vectara's file-upload endpoint can parse:
# Markdown (.md), PDF (.pdf), Open Office (.odt), Microsoft Word (.doc/.docx),
# PowerPoint (.ppt/.pptx), plain text (.txt), HTML (.html), LXML (.lxml),
# RTF (.rtf) and ePUB (.epub).
_ALLOWED_SUFFIXES = frozenset([
    ".md", ".pdf", ".odt", ".doc", ".docx", ".ppt", ".pptx",
    ".txt", ".html", ".lxml", ".rtf", ".epub",
])


def is_allowed_filetype(suffix: str) -> bool:
    """
    Return True if *suffix* (including the leading dot, e.g. ".pdf") is a file
    type accepted for upload.

    The comparison is case-insensitive, so ".PDF" is accepted as well.
    """
    return suffix.lower() in _ALLOWED_SUFFIXES
def save_response_to_file(response_json: dict, filename: str) -> None:
    """
    Saves the Vectara API response to a JSON file.

    Args:
        response_json (dict): The Vectara API response.
        filename (str): The name of the file to save the response to.
    """
    # Explicit encoding avoids platform-dependent defaults (e.g. cp1252 on Windows).
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(response_json, f, indent=2)
def upload_file_to_vectara(file_bytes: bytes, filename: str) -> UploadResult:
    """
    Uploads a supported file type to Vectara for processing.

    Args:
        file_bytes (bytes): The file content in bytes.
        filename (str): The name of the file; its extension determines whether
            the upload is accepted (see is_allowed_filetype).

    Returns:
        UploadResult: Parsed id/metadata/storage_usage from the Vectara response.

    Raises:
        VectaraAPIError: If there's an error during the Vectara API call.
        IndexingError: For invalid input (empty bytes, unsupported extension,
            missing API key).
    """
    CORPUS_KEY = "YouTwo"  # Replace with your actual corpus key

    # Check if file_bytes is provided
    if not file_bytes:
        raise IndexingError("No file bytes provided.")

    suffix = Path(filename).suffix
    # Ensure valid filename (old message wrongly claimed only .pdf is supported).
    if not is_allowed_filetype(suffix):
        raise IndexingError(
            f"Unsupported file type '{suffix}'. Supported extensions: "
            ".md, .pdf, .odt, .doc, .docx, .ppt, .pptx, .txt, .html, .lxml, .rtf, .epub"
        )

    api_key = os.getenv("VECTARA_API_KEY")
    if not api_key:
        raise IndexingError("Vectara API key not set. Please set the VECTARA_API_KEY environment variable.")

    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/upload_file"
    headers = {
        "Accept": "application/json",
        "x-api-key": api_key,
    }
    files = {
        'file': (filename, file_bytes)
    }

    try:
        # A timeout prevents the call from hanging indefinitely on a dead connection.
        response = requests.post(url, headers=headers, files=files, timeout=60)
        response.raise_for_status()  # Raise an exception for HTTP errors
        return process_upload_response(response.json())
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error uploading to Vectara: {e}") from e
    except Exception as e:
        # Also wraps response-parsing failures from process_upload_response.
        raise VectaraAPIError(f"An unexpected error occurred during PDF upload: {e}") from e
def process_upload_response(response_json: dict) -> UploadResult:
    """
    Persists the raw Vectara upload response to disk for auditing and converts
    it into an UploadResult.

    Args:
        response_json (dict): The Vectara API response; expected to contain the
            keys "id", "metadata" and "storage_usage".

    Returns:
        UploadResult: The upload result.

    Raises:
        KeyError: If the response is missing one of the expected keys.
    """
    log_filename = "upload_results.json"
    save_response_to_file(response_json, log_filename)
    # Lazy %-style args: the message is only formatted if INFO is enabled.
    logger.info("Saved response to file: %s", log_filename)
    return UploadResult(
        id=response_json["id"],
        metadata=response_json["metadata"],
        storage_usage=response_json["storage_usage"]
    )
# See https://docs.vectara.com/docs/rest-api/query-corpus
def retrieve_chunks(query: str, limit: int = 10, filter_by_id: str = None) -> tuple[list[str], str]:
    """
    Retrieves relevant chunks and a generated summary from the Vectara corpus based on the query.

    Args:
        query (str): The user's query.
        limit (int, optional): Maximum number of search results to request. Defaults to 10.
        filter_by_id (str, optional): When given, restricts the search to the
            document with this ID via a metadata filter.

    Returns:
        tuple[list[str], str]: A tuple containing a list of retrieved text chunks and the llm generation.

    Raises:
        IndexingError: If the VECTARA_API_KEY environment variable is not set.
        VectaraAPIError: If the Vectara query fails.
    """
    CORPUS_KEY = "YouTwo"  # Replace with your actual corpus key

    api_key = os.getenv("VECTARA_API_KEY")
    if not api_key:
        raise IndexingError("Vectara API key not set. Please set the VECTARA_API_KEY environment variable.")

    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/query"
    headers = {
        "Accept": "application/json",
        "x-api-key": api_key,
        "Content-Type": "application/json"
    }

    search = {"limit": limit}
    if filter_by_id:
        # NOTE(review): filter_by_id is interpolated unescaped; an ID containing
        # a single quote would break (or alter) the filter expression — confirm
        # IDs are sanitized upstream.
        search["metadata_filter"] = f"doc.id='{filter_by_id}'"

    payload = {
        "query": query,
        "search": search,
        "generation": {
            "generation_preset_name": "mockingbird-2.0",  # Using Mockingbird for RAG
            "max_used_search_results": 5,
            "response_language": "eng",
            "enable_factual_consistency_score": True,
        },
        # NOTE: We can stream response
        "stream_response": False,
        "save_history": True,
        "intelligent_query_rewriting": False
    }

    try:
        # A timeout prevents the call from hanging indefinitely on a dead connection.
        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        response_json = response.json()
        pprint(response_json)  # debug: dump the full API response

        # Collect the text of every retrieved search result.
        retrieved_chunks = [
            result["text"]
            for result in response_json.get("search_results", [])
            if "text" in result
        ]

        # Extract the generated summary (top-level "summary" key), if present.
        if "summary" in response_json:
            generated_response = response_json["summary"]
            print(f"Factual Consistency Score: {response_json.get('factual_consistency_score')}")
        else:
            generated_response = ""
            print("No generated response found in the Vectara response.")

        return retrieved_chunks, generated_response
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error querying Vectara: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred during Vectara query: {e}") from e
def fetch_documents_from_corpus(limit: int = 10, metadata_filter: str = None, page_key: str = None) -> dict:
    """
    Fetches documents from a specific Vectara corpus.

    Args:
        limit (int, optional): Maximum number of documents to return. Must be between 1 and 100. Defaults to 10.
        metadata_filter (str, optional): Filter documents by metadata. Uses expression similar to query metadata filter.
        page_key (str, optional): Key used to retrieve the next page of documents after the limit has been reached.

    Returns:
        dict: The response from the Vectara API containing the requested documents.

    Raises:
        ValueError: If limit is out of range or the corpus key is invalid.
        VectaraAPIError: If the API key is missing or the Vectara request fails.
    """
    CORPUS_KEY = "YouTwo"

    # Validate inputs before touching the network.
    if limit is not None and (limit < 1 or limit > 100):
        raise ValueError("Limit must be between 1 and 100")
    if len(CORPUS_KEY) > 50 or not all(c.isalnum() or c in ['_', '=', '-'] for c in CORPUS_KEY):
        raise ValueError("corpus_key must be <= 50 characters and match regex [a-zA-Z0-9_\\=\\-]+$")

    vectara_api_key = os.getenv("VECTARA_API_KEY")
    if not vectara_api_key:
        raise VectaraAPIError("Vectara API key not found in environment variables")

    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/documents"
    headers = {
        "Accept": "application/json",
        "x-api-key": vectara_api_key
    }

    # Only include query parameters the caller actually supplied.
    params = {}
    if limit is not None:
        params["limit"] = limit
    if metadata_filter is not None:
        params["metadata_filter"] = metadata_filter
    if page_key is not None:
        params["page_key"] = page_key

    try:
        # A timeout prevents the call from hanging indefinitely on a dead connection.
        response = requests.get(url, headers=headers, params=params, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error fetching documents from Vectara corpus: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred while fetching documents: {e}") from e
def fetch_document_by_id(document_id: str) -> dict:
    """
    Retrieves the content and metadata of a specific document by its ID.

    Args:
        document_id (str): The document ID to retrieve. It is percent-encoded
            automatically before being embedded in the URL path.

    Returns:
        dict: The document data including content and metadata.

    Raises:
        ValueError: If the corpus key is invalid.
        VectaraAPIError: If the API key is missing or the Vectara request fails.
    """
    from urllib.parse import quote

    CORPUS_KEY = "YouTwo"
    request_timeout = 20
    request_timeout_millis = 60000

    # Validate corpus key
    if len(CORPUS_KEY) > 50 or not all(c.isalnum() or c in ['_', '=', '-'] for c in CORPUS_KEY):
        raise ValueError("corpus_key must be <= 50 characters and match regex [a-zA-Z0-9_\\=\\-]+$")

    vectara_api_key = os.getenv("VECTARA_API_KEY")
    if not vectara_api_key:
        raise VectaraAPIError("Vectara API key not found in environment variables")

    # Percent-encode the document ID so it is safe to embed in the URL path.
    encoded_document_id = quote(document_id)
    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/documents/{encoded_document_id}"
    headers = {
        "Accept": "application/json",
        "x-api-key": vectara_api_key,
        # Server-side timeout hints for the Vectara API.
        "Request-Timeout": str(request_timeout),
        "Request-Timeout-Millis": str(request_timeout_millis),
    }

    try:
        # GET carries no body; the old empty payload/params were dropped.
        # The client-side timeout prevents the call from hanging indefinitely.
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error fetching document from Vectara: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred while fetching document: {e}") from e
# This is still a placeholder
def generate_llm_response(chat_state: list[dict], retrieved_chunks: list[str], summary: str) -> str:
    """
    Produces the final LLM response for the user.

    The summary Vectara generated during retrieval is passed through verbatim;
    when it is empty, the raw retrieved chunks are joined into a fallback answer.

    Args:
        chat_state (list[dict]): Conversation history; unused here but kept so
            the signature matches the rest of the pipeline.
        retrieved_chunks (list[str]): Chunks retrieved from the RAG system,
            used only for the fallback answer.
        summary (str): The summary generated by Vectara's RAG.

    Returns:
        str: The response text (the Vectara summary, or the fallback).
    """
    print("Using Vectara generated summary as LLM response.")
    if not summary:
        # Summary should always exist after a successful RAG call; this is a
        # defensive fallback built from the raw context.
        context = "\n".join(retrieved_chunks)
        return f"Based on the retrieved information:\n{context}\n\nNo summary was generated, but here's the raw context."
    return summary
def test_file_upload():
    """
    Manual smoke test: reads a local PDF and uploads it to Vectara.

    Edit FILEPATH to point at a file on your machine before running.

    Raises:
        IndexingError: If reading or uploading the file fails (the original
            exception is chained as the cause).
    """
    # Change filepath
    FILEPATH = "~/Downloads/Linux-Essentials-Training-Course-craw-updated.pdf"

    from dotenv import load_dotenv
    load_dotenv()

    try:
        pdf_path = Path(FILEPATH).expanduser()
        with open(pdf_path, "rb") as f:
            pdf_bytes = f.read()
        upload_file_to_vectara(pdf_bytes, pdf_path.name)
    except Exception as e:
        # Chain the cause so the original traceback is preserved.
        raise IndexingError(f"Error occurred while uploading PDF: {e}") from e
if __name__ == "__main__":
    # Manual smoke test: run an ad-hoc query against the corpus and show results.
    from dotenv import load_dotenv

    load_dotenv()
    retrieved, answer = retrieve_chunks("What is the main idea of the document?")
    print(retrieved)
    print(answer)