"""Extract raw text from PDF, CSV, and JSON files for downstream NLP."""

import json
import mimetypes
import os
import re

import numpy as np
import openai
import pandas as pd
import PyPDF2
import spacy
from dotenv import load_dotenv
from spacy.cli import download

# Load environment variables (expects OPENAI_API_KEY in .env or the shell env).
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")

# Load the SpaCy model, downloading it on first use instead of crashing.
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    print("Downloading SpaCy 'en_core_web_sm' model...")
    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")


# Detect file type
def detect_file_type(file_path):
    """Return 'pdf', 'csv', or 'json' based on the file's guessed MIME type.

    Raises:
        ValueError: if the MIME type is not one of the supported formats
            (the message includes the offending type, or None if unknown).
    """
    file_type = mimetypes.guess_type(file_path)[0]
    if file_type == "application/pdf":
        return "pdf"
    # Windows registers .csv as application/vnd.ms-excel, so accept both.
    elif file_type in ("text/csv", "application/vnd.ms-excel"):
        return "csv"
    elif file_type == "application/json":
        return "json"
    else:
        raise ValueError(f"Unsupported file format: {file_type}")


# Extract text from CSV
def extract_text_from_csv(file_path):
    """Read a CSV and join every cell value into one space-separated string."""
    df = pd.read_csv(file_path)
    return " ".join(df.astype(str).stack())


# Extract text from PDF
def extract_text_from_pdf(file_path):
    """Concatenate the extracted text of every page of a PDF.

    Uses ``or ""`` because ``extract_text()`` can return None for pages with
    no extractable text (e.g. scanned images), which would otherwise raise a
    TypeError on concatenation.
    """
    pdf_reader = PyPDF2.PdfReader(file_path)
    return "".join(page.extract_text() or "" for page in pdf_reader.pages)


# Extract text from JSON
def extract_text_from_json(file_path):
    """Flatten all scalar values in a JSON document into one string."""

    def recursive_text_extraction(data):
        # Depth-first walk: dicts contribute their values, lists their items,
        # and everything else is stringified.
        if isinstance(data, dict):
            return " ".join(recursive_text_extraction(value) for value in data.values())
        elif isinstance(data, list):
            return " ".join(recursive_text_extraction(item) for item in data)
        else:
            return str(data)

    with open(file_path, "r", encoding="utf-8") as f:
        data = json.load(f)
    return recursive_text_extraction(data)


# Generalized text extraction
def extract_text(file_path):
    """Detect the file type and dispatch to the matching extractor.

    Raises:
        ValueError: if the file type is unsupported (raised either by
            detect_file_type or by the defensive fallback below).
    """
    file_type = detect_file_type(file_path)
    if file_type == "csv":
        return extract_text_from_csv(file_path)
    elif file_type == "pdf":
        return extract_text_from_pdf(file_path)
    elif file_type == "json":
        return extract_text_from_json(file_path)
    else:
        # Defensive: detect_file_type already raises for unknown types.
        raise ValueError("Unsupported file format")
# --- Text preprocessing and embeddings --------------------------------------

# Compiled once at import so repeated calls skip the re-cache lookup.
_URL_RE = re.compile(r"http\S+|www\S+|https\S+")
_NON_ASCII_RE = re.compile(r"[^\x20-\x7E]")
_WHITESPACE_RE = re.compile(r"\s+")


# Preprocess text
def preprocess_text_generalized(text):
    """Clean text and reduce it to lowercase, lemmatized, stop-word-free tokens.

    URLs, non-printable/non-ASCII characters, and redundant whitespace are
    stripped with regexes, then the text is run through SpaCy in fixed-size
    chunks so very large documents stay under the pipeline's input limit.

    Args:
        text: raw extracted text (may be empty).

    Returns:
        A single space-separated string of lemmas.
    """
    text = _URL_RE.sub("", text)        # Remove URLs
    text = _NON_ASCII_RE.sub("", text)  # Remove non-ASCII characters
    text = _WHITESPACE_RE.sub(" ", text)  # Normalize whitespace

    # 100k chars per chunk keeps each call well under SpaCy's default
    # nlp.max_length of 1,000,000 characters.
    chunk_size = 100000
    chunks = [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    processed_chunks = []
    for chunk in chunks:
        doc = nlp(chunk.lower())
        tokens = [
            token.lemma_
            for token in doc
            if not token.is_stop and token.is_alpha
        ]
        processed_chunks.append(" ".join(tokens))
    return " ".join(processed_chunks)


# Generate embeddings using OpenAI API
def get_openai_embeddings(text, model="text-embedding-ada-002"):
    """Generate embeddings for a given text using the OpenAI API.

    Args:
        text: the (preprocessed) input string.
        model: OpenAI embedding model name.

    Returns:
        A NumPy array with the embedding vector, or None if the API call
        failed (best-effort: errors are reported, not raised).
    """
    try:
        response = openai.Embedding.create(input=text, model=model)
        embeddings = response["data"][0]["embedding"]
        return np.array(embeddings)  # Convert to NumPy array for compatibility
    except Exception as e:
        print(f"Error generating embeddings: {e}")
        return None


# Example usage
if __name__ == "__main__":
    # Example file path
    file_path = "example.pdf"

    # Extract and preprocess text
    raw_text = extract_text(file_path)
    preprocessed_text = preprocess_text_generalized(raw_text)

    # Generate embeddings using OpenAI API
    embeddings = get_openai_embeddings(preprocessed_text)
    if embeddings is not None:
        print(f"Embeddings generated successfully. Shape: {embeddings.shape}")
    else:
        print("Failed to generate embeddings.")