"""Vectara API client helpers: file upload, RAG query, and document retrieval."""

from __future__ import annotations

import json
import logging
import os
from pathlib import Path
from pprint import pprint
from urllib.parse import quote

import requests

from src.schemas import UploadResult

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.hasHandlers():
    handler = logging.StreamHandler()
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    handler.setFormatter(formatter)
    logger.addHandler(handler)

# Shared configuration. TODO: make the corpus key configurable (env var / argument)
# instead of hard-coding it in every function.
CORPUS_KEY = "YouTwo"  # Replace with your actual corpus key

# Client-side timeout (seconds) for every HTTP call, so requests can never hang forever.
REQUEST_TIMEOUT_SECONDS = 30

# File types Vectara can ingest:
#   Commonmark/Markdown (.md), PDF/A (.pdf), Open Office (.odt),
#   Microsoft Word (.doc, .docx), Microsoft Powerpoint (.ppt, .pptx),
#   plain text (.txt), HTML (.html), LXML (.lxml), RTF (.rtf), ePUB (.epub).
_ALLOWED_SUFFIXES = frozenset(
    [".md", ".pdf", ".odt", ".doc", ".docx", ".ppt", ".pptx",
     ".txt", ".html", ".lxml", ".rtf", ".epub"]
)


class VectaraAPIError(Exception):
    """Custom exception for Vectara API errors."""


class IndexingError(Exception):
    """Custom exception for general indexing errors."""


def load_environment_variables():
    """
    Load environment variables from a .env file.

    Useful for local development to avoid hardcoding sensitive information.

    Raises:
        IndexingError: If VECTARA_API_KEY is still unset after loading.
    """
    # Local import: python-dotenv is only needed for local development.
    from dotenv import load_dotenv

    load_dotenv()
    if not os.getenv("VECTARA_API_KEY"):
        raise IndexingError("Vectara API key not set. Please set the VECTARA_API_KEY environment variable.")


def is_allowed_filetype(suffix: str) -> bool:
    """
    Return True if *suffix* (e.g. ".pdf") is a file type Vectara can ingest.

    The check is case-insensitive, so ".PDF" is accepted as well.
    """
    return suffix.lower() in _ALLOWED_SUFFIXES


def save_response_to_file(response_json: dict, filename: str) -> None:
    """
    Save a Vectara API response to a JSON file.

    Args:
        response_json (dict): The Vectara API response.
        filename (str): The name of the file to save the response to.
    """
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(response_json, f, indent=2)


def upload_file_to_vectara(file_bytes: bytes, filename: str) -> UploadResult:
    """
    Upload a supported file type to Vectara for processing.

    Args:
        file_bytes (bytes): The file content in bytes.
        filename (str): The name of the file.

    Returns:
        UploadResult: Parsed id/metadata/storage_usage from the Vectara response.

    Raises:
        VectaraAPIError: If there's an error during the Vectara API call.
        IndexingError: For other processing errors (empty bytes, bad
            extension, missing API key).
    """
    if not file_bytes:
        raise IndexingError("No file bytes provided.")

    suffix = Path(filename).suffix
    if not is_allowed_filetype(suffix):
        # BUG FIX: the previous message falsely claimed only ".pdf" was accepted.
        raise IndexingError(
            f"Invalid filename '{filename}'. Supported extensions: "
            f"{', '.join(sorted(_ALLOWED_SUFFIXES))}"
        )

    api_key = os.getenv("VECTARA_API_KEY")
    if not api_key:
        raise IndexingError("Vectara API key not set. Please set the VECTARA_API_KEY environment variable.")

    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/upload_file"
    headers = {
        "Accept": "application/json",
        "x-api-key": api_key,
    }
    files = {"file": (filename, file_bytes)}

    try:
        response = requests.post(
            url, headers=headers, files=files, timeout=REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()  # Raise an exception for HTTP errors
        return process_upload_response(response.json())
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error uploading to Vectara: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred during PDF upload: {e}") from e


def process_upload_response(response_json: dict) -> UploadResult:
    """
    Persist the raw upload response to disk and convert it to an UploadResult.

    Args:
        response_json (dict): The Vectara API response.

    Returns:
        UploadResult: The upload result.
    """
    log_filename = "upload_results.json"
    save_response_to_file(response_json, log_filename)
    logger.info("Saved response to file: %s", log_filename)
    return UploadResult(
        id=response_json["id"],
        metadata=response_json["metadata"],
        storage_usage=response_json["storage_usage"],
    )


# See https://docs.vectara.com/docs/rest-api/query-corpus
def retrieve_chunks(
    query: str, limit: int = 10, filter_by_id: str | None = None
) -> tuple[list[str], str]:
    """
    Retrieve relevant chunks and a generated summary from the Vectara corpus.

    Args:
        query (str): The user's query.
        limit (int, optional): Maximum number of search results. Defaults to 10.
        filter_by_id (str, optional): Restrict the search to a single document id.

    Returns:
        tuple[list[str], str]: A list of retrieved text chunks and the LLM generation.

    Raises:
        IndexingError: If the Vectara API key is not configured.
        VectaraAPIError: If the Vectara request fails.
    """
    api_key = os.getenv("VECTARA_API_KEY")
    if not api_key:
        raise IndexingError("Vectara API key not set. Please set the VECTARA_API_KEY environment variable.")

    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/query"
    headers = {
        "Accept": "application/json",
        "x-api-key": api_key,
        "Content-Type": "application/json",
    }

    search: dict = {"limit": limit}
    if filter_by_id:
        # A doc.id metadata filter narrows retrieval to one document.
        search["metadata_filter"] = f"doc.id='{filter_by_id}'"

    payload = {
        "query": query,
        "search": search,
        "generation": {
            "generation_preset_name": "mockingbird-2.0",  # Using Mockingbird for RAG
            "max_used_search_results": 5,
            "response_language": "eng",
            "enable_factual_consistency_score": True,
        },
        # NOTE: Vectara supports streaming responses; not used here.
        "stream_response": False,
        "save_history": True,
        "intelligent_query_rewriting": False,
    }

    try:
        response = requests.post(
            url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        response_json = response.json()
        pprint(response_json)

        # Extract search results (chunks).
        retrieved_chunks = [
            result["text"]
            for result in response_json.get("search_results", [])
            if "text" in result
        ]

        # Extract the generated summary; factual_consistency_score is top-level.
        if "summary" in response_json:
            generated_response = response_json["summary"]
            print(f"Factual Consistency Score: {response_json.get('factual_consistency_score')}")
        else:
            generated_response = ""
            print("No generated response found in the Vectara response.")

        return retrieved_chunks, generated_response
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error querying Vectara: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred during Vectara query: {e}") from e


def _validate_corpus_key(corpus_key: str) -> None:
    """Raise ValueError unless *corpus_key* matches Vectara's [a-zA-Z0-9_=-]+ rule (<= 50 chars)."""
    if len(corpus_key) > 50 or not all(c.isalnum() or c in "_=-" for c in corpus_key):
        raise ValueError("corpus_key must be <= 50 characters and match regex [a-zA-Z0-9_\\=\\-]+$")


def fetch_documents_from_corpus(
    limit: int = 10, metadata_filter: str | None = None, page_key: str | None = None
) -> dict:
    """
    Fetch documents from the Vectara corpus.

    Args:
        limit (int, optional): Maximum number of documents to return.
            Must be between 1 and 100. Defaults to 10.
        metadata_filter (str, optional): Filter documents by metadata; uses an
            expression similar to the query metadata filter.
        page_key (str, optional): Key used to retrieve the next page of
            documents after the limit has been reached.

    Returns:
        dict: The response from the Vectara API containing the requested documents.

    Raises:
        ValueError: If limit or the corpus key is invalid.
        VectaraAPIError: If there's an error with the Vectara API request.
    """
    # Validate inputs.
    if limit is not None and not 1 <= limit <= 100:
        raise ValueError("Limit must be between 1 and 100")
    _validate_corpus_key(CORPUS_KEY)

    vectara_api_key = os.getenv("VECTARA_API_KEY")
    if not vectara_api_key:
        raise VectaraAPIError("Vectara API key not found in environment variables")

    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/documents"
    headers = {
        "Accept": "application/json",
        "x-api-key": vectara_api_key,
    }

    # Build query params, omitting anything unset.
    params = {}
    if limit is not None:
        params["limit"] = limit
    if metadata_filter is not None:
        params["metadata_filter"] = metadata_filter
    if page_key is not None:
        params["page_key"] = page_key

    try:
        response = requests.get(
            url, headers=headers, params=params, timeout=REQUEST_TIMEOUT_SECONDS
        )
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error fetching documents from Vectara corpus: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred while fetching documents: {e}") from e


def fetch_document_by_id(document_id: str) -> dict:
    """
    Retrieve the content and metadata of a specific document by its ID.

    Args:
        document_id (str): The document ID to retrieve. It is percent-encoded
            automatically before being placed in the URL.

    Returns:
        dict: The document data including content and metadata.

    Raises:
        ValueError: If the corpus key is invalid.
        VectaraAPIError: If there's an error with the Vectara API request.
    """
    _validate_corpus_key(CORPUS_KEY)

    vectara_api_key = os.getenv("VECTARA_API_KEY")
    if not vectara_api_key:
        raise VectaraAPIError("Vectara API key not found in environment variables")

    encoded_document_id = quote(document_id)
    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/documents/{encoded_document_id}"
    headers = {
        "Accept": "application/json",
        "x-api-key": vectara_api_key,
        # Server-side time budget for the request (Vectara-specific headers).
        "Request-Timeout": "20",
        "Request-Timeout-Millis": "60000",
    }

    try:
        # BUG FIX: the previous code sent a `data=` body on a GET request.
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT_SECONDS)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error fetching document from Vectara: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred while fetching document: {e}") from e


# This is still a placeholder
def generate_llm_response(chat_state: list[dict], retrieved_chunks: list[str], summary: str) -> str:
    """
    Generate an LLM response from chat state, retrieved chunks, and a summary.

    The summary from Vectara is used directly as the LLM response.

    Args:
        chat_state (list[dict]): The current conversation history (not directly
            used here but kept for signature consistency).
        retrieved_chunks (list[str]): The chunks retrieved from the RAG system,
            used as fallback context when the summary is empty.
        summary (str): The summary generated by Vectara's RAG.

    Returns:
        str: The LLM's generated response (the Vectara summary, or raw context).
    """
    print("Using Vectara generated summary as LLM response.")
    if summary:
        return summary
    # Fallback if for some reason summary is empty, though it shouldn't be with successful RAG.
    context = "\n".join(retrieved_chunks)
    return f"Based on the retrieved information:\n{context}\n\nNo summary was generated, but here's the raw context."


def test_file_upload():
    """Manual smoke test: upload a local PDF to Vectara. Adjust FILEPATH first."""
    # Change filepath
    FILEPATH = "~/Downloads/Linux-Essentials-Training-Course-craw-updated.pdf"
    from dotenv import load_dotenv

    load_dotenv()
    try:
        pdf_path = Path(FILEPATH).expanduser()
        with open(pdf_path, "rb") as f:
            pdf_bytes = f.read()
        upload_file_to_vectara(pdf_bytes, pdf_path.name)
    except Exception as e:
        raise IndexingError(f"Error occurred while uploading PDF: {e}") from e


if __name__ == "__main__":
    from dotenv import load_dotenv

    load_dotenv()
    chunks, summary = retrieve_chunks("What is the main idea of the document?")
    print(chunks)
    print(summary)