# Spaces:
# Sleeping
# Sleeping
| import json | |
| import logging | |
| import os | |
| from pathlib import Path | |
| import requests | |
| from pprint import pprint | |
| from src.schemas import UploadResult | |
# Module-level logger; a stream handler is attached only when none exists
# yet, so repeated imports don't duplicate log output.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
if not logger.hasHandlers():
    _handler = logging.StreamHandler()
    _handler.setLevel(logging.INFO)
    _handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s')
    )
    logger.addHandler(_handler)
class VectaraAPIError(Exception):
    """Raised when a call to the Vectara REST API fails."""
class IndexingError(Exception):
    """Raised for general indexing/processing errors outside the Vectara API."""
def load_environment_variables():
    """
    Load environment variables from a .env file.

    Useful for local development so sensitive values need not be hardcoded.

    Raises:
        IndexingError: If VECTARA_API_KEY is still unset (or empty) after loading.
    """
    from dotenv import load_dotenv

    load_dotenv()
    # Falsy covers both "unset" and "set to empty string".
    if not os.getenv("VECTARA_API_KEY"):
        raise IndexingError("Vectara API key not set. Please set the VECTARA_API_KEY environment variable.")
# File extensions accepted for upload:
# Markdown (.md), PDF (.pdf), Open Office (.odt), Word (.doc/.docx),
# PowerPoint (.ppt/.pptx), text (.txt), HTML (.html), LXML (.lxml),
# RTF (.rtf), ePub (.epub).
_ALLOWED_SUFFIXES = frozenset({
    ".md", ".pdf", ".odt", ".doc", ".docx", ".ppt", ".pptx",
    ".txt", ".html", ".lxml", ".rtf", ".epub",
})


def is_allowed_filetype(suffix: str) -> bool:
    """
    Return True if *suffix* (including the dot, e.g. ".pdf") is a supported
    file type.

    The check is case-insensitive, so ".PDF" is also accepted.
    """
    return suffix.lower() in _ALLOWED_SUFFIXES
def save_response_to_file(response_json: dict, filename: str):
    """
    Save a Vectara API response to a JSON file.

    Args:
        response_json (dict): The Vectara API response.
        filename (str): The name of the file to save the response to.
    """
    # Explicit UTF-8 avoids platform-default encodings (e.g. cp1252 on
    # Windows) choking on non-ASCII content in the response.
    with open(filename, "w", encoding="utf-8") as f:
        json.dump(response_json, f, indent=2, ensure_ascii=False)
def upload_file_to_vectara(file_bytes: bytes, filename: str) -> UploadResult:
    """
    Upload a supported file type to Vectara for processing.

    Args:
        file_bytes (bytes): The file content in bytes.
        filename (str): The name of the file; its suffix is used for type checking.

    Returns:
        UploadResult: Parsed result of the Vectara upload response.

    Raises:
        VectaraAPIError: If there's an error during the Vectara API call.
        IndexingError: For other processing errors (missing bytes, unsupported
            file type, or missing API key).
    """
    CORPUS_KEY = "YouTwo"  # Replace with your actual corpus key

    if not file_bytes:
        raise IndexingError("No file bytes provided.")

    suffix = Path(filename).suffix
    if not is_allowed_filetype(suffix):
        # Many formats are supported, not just PDF; name the offending suffix.
        raise IndexingError(
            f"Invalid filename '{filename}': unsupported file type '{suffix}'."
        )

    api_key = os.getenv("VECTARA_API_KEY")
    if not api_key:
        raise IndexingError("Vectara API key not set. Please set the VECTARA_API_KEY environment variable.")

    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/upload_file"
    headers = {
        "Accept": "application/json",
        "x-api-key": api_key,
    }
    files = {
        'file': (filename, file_bytes)
    }

    try:
        # Bounded timeout so a hung connection cannot block the caller forever.
        response = requests.post(url, headers=headers, files=files, timeout=60)
        response.raise_for_status()  # Raise an exception for HTTP errors
        return process_upload_response(response.json())
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error uploading to Vectara: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred during file upload: {e}") from e
def process_upload_response(response_json: dict) -> UploadResult:
    """
    Persist the raw Vectara upload response to disk and parse it.

    Args:
        response_json (dict): The Vectara API upload response.

    Returns:
        UploadResult: Parsed result holding the document id, metadata and
            storage usage reported by Vectara.

    Raises:
        KeyError: If an expected field is missing from the response.
    """
    log_filename = "upload_results.json"
    save_response_to_file(response_json, log_filename)
    # Lazy %-style args avoid formatting when the log level filters this out.
    logger.info("Saved response to file: %s", log_filename)
    return UploadResult(
        id=response_json["id"],
        metadata=response_json["metadata"],
        storage_usage=response_json["storage_usage"],
    )
# See https://docs.vectara.com/docs/rest-api/query-corpus
def retrieve_chunks(query: str, limit: int = 10, filter_by_id: str = None) -> tuple[list[str], str]:
    """
    Retrieve relevant chunks and a generated summary from the Vectara corpus.

    Args:
        query (str): The user's query.
        limit (int, optional): Maximum number of search results to request.
            Defaults to 10.
        filter_by_id (str, optional): If given, restrict the search to the
            document with this id via a metadata filter.

    Returns:
        tuple[list[str], str]: A tuple containing the retrieved text chunks
            and the LLM generation.

    Raises:
        IndexingError: If the Vectara API key is not configured.
        VectaraAPIError: If the query request fails.
    """
    CORPUS_KEY = "YouTwo"  # Replace with your actual corpus key

    api_key = os.getenv("VECTARA_API_KEY")
    if not api_key:
        raise IndexingError("Vectara API key not set. Please set the VECTARA_API_KEY environment variable.")

    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/query"
    headers = {
        "Accept": "application/json",
        "x-api-key": api_key,
        "Content-Type": "application/json",
    }

    search = {"limit": limit}
    if filter_by_id:
        # Escape single quotes so the filter expression stays well-formed
        # even for ids containing quotes.
        escaped_id = filter_by_id.replace("'", "\\'")
        search["metadata_filter"] = f"doc.id='{escaped_id}'"

    payload = {
        "query": query,
        "search": search,
        "generation": {
            "generation_preset_name": "mockingbird-2.0",  # Using Mockingbird for RAG
            "max_used_search_results": 5,
            "response_language": "eng",
            "enable_factual_consistency_score": True,
        },
        # NOTE: We can stream response
        "stream_response": False,
        "save_history": True,
        "intelligent_query_rewriting": False,
    }

    try:
        # Bounded timeout so a hung connection cannot block the caller forever.
        response = requests.post(url, headers=headers, json=payload, timeout=60)
        response.raise_for_status()
        response_json = response.json()
        pprint(response_json)

        # Extract search results (chunks).
        retrieved_chunks = [
            result["text"]
            for result in response_json.get("search_results", [])
            if "text" in result
        ]

        # Extract the generated summary ("summary" is a top-level field;
        # factual_consistency_score sits alongside it).
        if "summary" in response_json:
            generated_response = response_json["summary"]
            print(f"Factual Consistency Score: {response_json.get('factual_consistency_score')}")
        else:
            generated_response = ""
            print("No generated response found in the Vectara response.")

        return retrieved_chunks, generated_response
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error querying Vectara: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred during Vectara query: {e}") from e
def fetch_documents_from_corpus(limit: int = 10, metadata_filter: str = None, page_key: str = None) -> dict:
    """
    Fetch documents from a specific Vectara corpus.

    Args:
        limit (int, optional): Maximum number of documents to return. Must be
            between 1 and 100. Defaults to 10.
        metadata_filter (str, optional): Filter documents by metadata. Uses an
            expression similar to a query metadata filter.
        page_key (str, optional): Key used to retrieve the next page of
            documents after the limit has been reached.

    Returns:
        dict: The response from the Vectara API containing the requested documents.

    Raises:
        ValueError: If the limit is out of range or the corpus key is malformed.
        VectaraAPIError: If there's an error with the Vectara API request.
    """
    CORPUS_KEY = "YouTwo"

    # Validate inputs before touching the environment or the network.
    if limit is not None and not 1 <= limit <= 100:
        raise ValueError("Limit must be between 1 and 100")
    if len(CORPUS_KEY) > 50 or not all(c.isalnum() or c in ['_', '=', '-'] for c in CORPUS_KEY):
        raise ValueError("corpus_key must be <= 50 characters and match regex [a-zA-Z0-9_\\=\\-]+$")

    vectara_api_key = os.getenv("VECTARA_API_KEY")
    if not vectara_api_key:
        raise VectaraAPIError("Vectara API key not found in environment variables")

    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/documents"
    headers = {
        "Accept": "application/json",
        "x-api-key": vectara_api_key
    }

    # Build query params; omit anything the caller did not supply.
    params = {}
    if limit is not None:
        params["limit"] = limit
    if metadata_filter is not None:
        params["metadata_filter"] = metadata_filter
    if page_key is not None:
        params["page_key"] = page_key

    try:
        # Bounded timeout so a hung connection cannot block the caller forever.
        response = requests.get(url, headers=headers, params=params, timeout=20)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error fetching documents from Vectara corpus: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred while fetching documents: {e}") from e
def fetch_document_by_id(document_id: str) -> dict:
    """
    Retrieve the content and metadata of a specific document by its ID.

    Args:
        document_id (str): The document ID to retrieve. It is percent-encoded
            here before being placed in the URL.

    Returns:
        dict: The document data including content and metadata.

    Raises:
        ValueError: If the corpus key is malformed.
        VectaraAPIError: If there's an error with the Vectara API request.
    """
    from urllib.parse import quote

    CORPUS_KEY = "YouTwo"

    # Validate corpus key against Vectara's documented constraints.
    if len(CORPUS_KEY) > 50 or not all(c.isalnum() or c in ['_', '=', '-'] for c in CORPUS_KEY):
        raise ValueError("corpus_key must be <= 50 characters and match regex [a-zA-Z0-9_\\=\\-]+$")

    vectara_api_key = os.getenv("VECTARA_API_KEY")
    if not vectara_api_key:
        raise VectaraAPIError("Vectara API key not found in environment variables")

    # Ensure document_id is percent encoded before URL interpolation.
    encoded_document_id = quote(document_id)
    url = f"https://api.vectara.io/v2/corpora/{CORPUS_KEY}/documents/{encoded_document_id}"
    headers = {
        "Accept": "application/json",
        "x-api-key": vectara_api_key,
        # Server-side time budgets (seconds / milliseconds).
        "Request-Timeout": "20",
        "Request-Timeout-Millis": "60000",
    }

    try:
        # Client-side timeout slightly above the server-side budget so we
        # still hear the server's own timeout response when possible.
        response = requests.get(url, headers=headers, timeout=70)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        raise VectaraAPIError(f"Error fetching document from Vectara: {e}") from e
    except Exception as e:
        raise VectaraAPIError(f"An unexpected error occurred while fetching document: {e}") from e
# This is still a placeholder
def generate_llm_response(chat_state: list[dict], retrieved_chunks: list[str], summary: str) -> str:
    """
    Produce an LLM response from chat state, retrieved chunks, and a summary.

    The Vectara-generated summary is used directly as the response; when it is
    empty, the raw retrieved context is returned instead.

    Args:
        chat_state (list[dict]): Conversation history (unused here; kept for
            signature consistency).
        retrieved_chunks (list[str]): Chunks retrieved from the RAG system,
            used as a fallback context.
        summary (str): The summary generated by Vectara's RAG.

    Returns:
        str: The response text (the Vectara summary, or fallback context).
    """
    print("Using Vectara generated summary as LLM response.")
    if not summary:
        # Fallback: a successful RAG call should always produce a summary,
        # but degrade gracefully if it did not.
        context = "\n".join(retrieved_chunks)
        return (
            f"Based on the retrieved information:\n{context}\n\n"
            "No summary was generated, but here's the raw context."
        )
    return summary
def test_file_upload():
    """
    Manual smoke test: upload a local PDF to Vectara.

    Edit FILEPATH to point at a file on your machine before running.

    Raises:
        IndexingError: If reading or uploading the file fails.
    """
    # Change filepath
    FILEPATH = "~/Downloads/Linux-Essentials-Training-Course-craw-updated.pdf"
    from dotenv import load_dotenv

    load_dotenv()
    try:
        pdf_path = Path(FILEPATH).expanduser()
        with open(pdf_path, "rb") as f:
            pdf_bytes = f.read()
        upload_file_to_vectara(pdf_bytes, pdf_path.name)
    except Exception as e:
        # Chain the original exception so the root cause is preserved.
        raise IndexingError(f"Error occurred while uploading PDF: {e}") from e
if __name__ == "__main__":
    # Ad-hoc smoke run: load credentials, then issue one sample query.
    from dotenv import load_dotenv

    load_dotenv()
    retrieved, generated = retrieve_chunks("What is the main idea of the document?")
    print(retrieved)
    print(generated)