# Dynamic-chatbot-using-RAG / processing.py
# (uploaded by Jagukumar — commit d286924, verified)
import mimetypes
import pandas as pd
import PyPDF2
import json
import re
import spacy
import os
from dotenv import load_dotenv
import openai
import numpy as np
# Load environment variables from a local .env file, if one exists.
load_dotenv()

# Configure the OpenAI client key; warn early so a missing key surfaces
# here instead of at the first embedding request.
openai.api_key = os.getenv("OPENAI_API_KEY")
if not openai.api_key:
    print("Warning: OPENAI_API_KEY is not set; OpenAI calls will fail.")
# Load the SpaCy English model once at import time, downloading it on
# first run if it is not installed. (`spacy` itself is imported above.)
from spacy.cli import download

try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    # Model package missing in this environment — fetch it, then retry.
    print("Downloading SpaCy 'en_core_web_sm' model...")
    download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")
# Detect file type
def detect_file_type(file_path):
    """Classify *file_path* as ``"pdf"``, ``"csv"``, or ``"json"``.

    The MIME type guessed from the filename is checked first; if the MIME
    table does not know the extension (guess_type returns None on some
    systems), the bare file extension is used as a fallback.

    Raises:
        ValueError: if the file is not one of the supported formats.
    """
    mime_type = mimetypes.guess_type(file_path)[0]
    if mime_type == "application/pdf":
        return "pdf"
    if mime_type in ("text/csv", "application/vnd.ms-excel"):
        return "csv"
    if mime_type == "application/json":
        return "json"
    # Fallback for incomplete MIME tables: trust the extension itself.
    extension = os.path.splitext(file_path)[1].lower().lstrip(".")
    if extension in ("pdf", "csv", "json"):
        return extension
    raise ValueError(f"Unsupported file format: {mime_type}")
# Extract text from CSV
def extract_text_from_csv(file_path):
    """Read a CSV file and return all cell values joined into one string.

    Cells are stringified and concatenated row by row, left to right,
    separated by single spaces.
    """
    frame = pd.read_csv(file_path)
    cells = frame.astype(str).stack()
    return " ".join(cells)
# Extract text from PDF
def extract_text_from_pdf(file_path):
    """Return the concatenated extracted text of every page in a PDF."""
    reader = PyPDF2.PdfReader(file_path)
    return "".join(page.extract_text() for page in reader.pages)
# Extract text from JSON
def extract_text_from_json(file_path):
    """Flatten a JSON file into a single space-separated text string.

    Dicts and lists are walked recursively (dict *keys* are ignored,
    only values are kept); scalar leaves are converted with ``str()``.
    """
    def _collect(node):
        # Depth-first walk that stringifies every leaf value.
        if isinstance(node, dict):
            return " ".join(_collect(value) for value in node.values())
        if isinstance(node, list):
            return " ".join(_collect(item) for item in node)
        return str(node)

    # Explicit encoding: JSON interchange text is UTF-8 (RFC 8259);
    # without it, open() uses the platform default and can mis-decode.
    with open(file_path, "r", encoding="utf-8") as f:
        return _collect(json.load(f))
# Generalized text extraction
def extract_text(file_path):
    """Extract raw text from *file_path*, dispatching on detected type.

    Raises:
        ValueError: if the detected type has no registered extractor.
    """
    extractors = {
        "csv": extract_text_from_csv,
        "pdf": extract_text_from_pdf,
        "json": extract_text_from_json,
    }
    kind = detect_file_type(file_path)
    extractor = extractors.get(kind)
    if extractor is None:
        raise ValueError("Unsupported file format")
    return extractor(file_path)
# Preprocess text
def preprocess_text_generalized(text):
    """Clean raw text and reduce it to lemmatized, stop-word-free tokens.

    URLs, non-ASCII characters, and redundant whitespace are stripped;
    the text is then fed to SpaCy in fixed-size chunks (to bound memory
    per call) and only alphabetic, non-stop-word lemmas are kept.
    """
    # Normalization passes: URLs, then non-printable/non-ASCII, then whitespace.
    cleaned = re.sub(r"http\S+|www\S+|https\S+", "", text)
    cleaned = re.sub(r"[^\x20-\x7E]", "", cleaned)
    cleaned = re.sub(r"\s+", " ", cleaned)

    max_len = 100000  # cap on characters handed to SpaCy at once
    pieces = []
    for start in range(0, len(cleaned), max_len):
        doc = nlp(cleaned[start:start + max_len].lower())
        lemmas = [tok.lemma_ for tok in doc if not tok.is_stop and tok.is_alpha]
        pieces.append(" ".join(lemmas))
    return " ".join(pieces)
# Generate embeddings using OpenAI API
def get_openai_embeddings(text, model="text-embedding-ada-002"):
    """
    Generate embeddings for a given text using OpenAI API.

    Best-effort: on any failure the error is printed and None is
    returned instead of raising, so callers can check for None.
    """
    try:
        api_response = openai.Embedding.create(input=text, model=model)
        vector = api_response["data"][0]["embedding"]
        # NumPy array for compatibility with downstream vector math.
        return np.array(vector)
    except Exception as exc:
        print(f"Error generating embeddings: {exc}")
        return None
# Example usage
if __name__ == "__main__":
# Example file path
file_path = "example.pdf"
# Extract and preprocess text
raw_text = extract_text(file_path)
preprocessed_text = preprocess_text_generalized(raw_text)
# Generate embeddings using OpenAI API
embeddings = get_openai_embeddings(preprocessed_text)
if embeddings is not None:
print(f"Embeddings generated successfully. Shape: {embeddings.shape}")
else:
print("Failed to generate embeddings.")