"""Text preprocessing utilities for indexing documents and normalizing queries.

Pipeline overview:
- ``preprocess`` / ``preprocess_document(s)``: spaCy-based POS filtering,
  lemmatization and stop-word removal for documents going into the index.
- ``normalize`` / ``preprocess_query``: lightweight strip/lowercase/whitespace
  cleanup for incoming search queries.
- ``preprocess_filename``: sanitizes file names into safe lowercase forms.
"""

from pathlib import Path
import re
from typing import List

import spacy
from langchain_core.documents import Document

from app.utils.model_factory import get_local_model
from .constants import short_words_mappings, stopwords

# Loaded once at import time — loading a spaCy model is expensive.
nlp = spacy.load('en_core_web_sm')


def lowercase(text: str) -> str:
    """Return *text* stripped of surrounding whitespace and lowercased.

    BUG FIX: the previous implementation only stripped and never lowercased,
    despite the function's name and its role in the normalization pipeline.
    """
    return text.strip().lower()


def tokenization(text: str) -> List[str]:
    """Split *text* on single spaces after stripping and lowercasing.

    Returns ``[]`` for ``None`` or empty input. Runs of spaces produce empty
    tokens; ``space_removal`` is expected to filter those out downstream.
    """
    if not text:
        return []
    return lowercase(text).split(" ")


def stop_words_removal(text: str, short_words_mapping: bool = False) -> List[str]:
    """Lemmatize *text* and keep only content-word lemmas.

    A token is kept only if its POS tag is NOUN/PROPN/VERB/NUM/ADJ and its
    lowercased lemma is non-empty and not in ``stopwords``.

    Args:
        text: Raw input text; falsy input yields ``[]``.
        short_words_mapping: When True, tokens found in
            ``short_words_mappings`` (e.g. abbreviations) are first expanded
            and the expansion is re-lemmatized through spaCy.

    Returns:
        List of cleaned, lowercased lemmas in document order.
    """
    if not text:
        return []
    results: List[str] = []
    for token in nlp(text):
        if token.is_space:
            continue
        # Keep content words only; drops determiners, pronouns, punctuation, etc.
        if token.pos_ not in ("NOUN", "PROPN", "VERB", "NUM", "ADJ"):
            continue
        word = token.text.lower()
        if short_words_mapping and word in short_words_mappings:
            # Expand the abbreviation, then lemmatize the expanded form.
            expanded = short_words_mappings[word]
            lemma = nlp(expanded)[0].lemma_
        else:
            lemma = token.lemma_
        lemma = lemma.strip().lower()
        if lemma and lemma not in stopwords:
            results.append(lemma)
    return results


def space_removal(words: List[str]) -> List[str]:
    """Strip each word and drop those that are empty after stripping."""
    stripped = (word.strip() for word in words)
    return [word for word in stripped if word]


def preprocess(text: str, short_words_mapping: bool = False) -> str:
    """Run full stop-word/lemma preprocessing and rejoin into one string.

    Raises:
        ValueError: If *text* is ``None`` or empty.
    """
    if not text:
        raise ValueError("Text cannot be empty")
    return " ".join(stop_words_removal(text, short_words_mapping))


def normalize(text: str) -> str:
    """Lightweight normalization: strip, lowercase and collapse spacing.

    Raises:
        ValueError: If *text* is ``None`` or empty.
    """
    if not text:
        raise ValueError("Text cannot be empty")
    return " ".join(space_removal(tokenization(text)))


def preprocess_document(doc: Document) -> None:
    """Preprocess *doc* in place; empty (or None) content is left untouched."""
    if not doc.page_content:
        return
    doc.page_content = preprocess(doc.page_content)


def preprocess_documents(docs: List[Document]) -> None:
    """Preprocess every document in *docs* in place."""
    for doc in docs:
        preprocess_document(doc)


def preprocess_query(query: str) -> str:
    """Normalize a search query for embedding lookup.

    Raises:
        ValueError: If *query* is ``None`` or whitespace-only.
    """
    if query is None or not query.strip():
        raise ValueError("Query cannot be empty")
    # NOTE: LLM-based query rewriting is intentionally disabled for now;
    # re-enable by uncommenting the block below.
    # model = get_local_model()
    # prompt = f"""Rewrite this query for better semantic search/embeddings:
    # Make it more descriptive, clear, natural. Keep core intent.
    # Query: "{query}"
    # Improved:
    # """
    # response = model.invoke(prompt)
    # cleaned = re.sub(r'^\s*Improved:\s*', '', response.content.strip(),
    #                  flags=re.IGNORECASE).strip()
    return normalize(query)


def preprocess_filename(filePath: Path) -> str:
    """Sanitize a file name into a lowercase, filesystem/URL-safe form.

    Keeps letters, digits, ``_`` and ``-`` in the stem; the extension is
    lowercased and preserved. Falls back to ``"file"`` when the stem has no
    safe characters left (e.g. ``"!!!.pdf"`` -> ``"file.pdf"``).
    """
    # Path already exposes stem/suffix — no need to round-trip through .name.
    name = filePath.stem
    ext = filePath.suffix.lower()
    # Remove special characters but keep letters, numbers, _ and -.
    safe_name = re.sub(r'[^a-zA-Z0-9_-]', '', name).lower()
    return (safe_name or "file") + ext