Spaces:
Sleeping
Sleeping
| from typing import List | |
| from .constants import stopwords, short_words_mappings | |
| from langchain_core.documents import Document | |
| from app.utils.model_factory import get_local_model | |
| import spacy | |
| from pathlib import Path | |
| import re | |
# Load the small English spaCy pipeline once at module import time.
# Used below for POS tagging and lemmatisation; loading is expensive, so it
# must not be repeated per call.  NOTE(review): requires the
# 'en_core_web_sm' model to be installed — confirm deployment images ship it.
nlp = spacy.load('en_core_web_sm')
def lowercase(text: str) -> str:
    """Return *text* with surrounding whitespace removed and lowercased.

    Bug fix: the original body only stripped whitespace and never lowercased,
    contradicting the function's name and its role in ``tokenization``.

    Args:
        text: Input string (must not be None).

    Returns:
        The stripped, lowercased string.
    """
    return text.strip().lower()
def tokenization(text: str) -> List[str]:
    """Split *text* into tokens on single spaces after normalising it.

    Args:
        text: Raw input string; None or empty yields an empty list.

    Returns:
        List of space-separated tokens (may contain empty strings when the
        input has runs of spaces — callers filter those out).
    """
    if not text:
        return []
    return lowercase(text).split(" ")
def stop_words_removal(text: str, short_words_mapping: bool = False):
    """Extract lowercased lemmas of content-bearing tokens from *text*.

    Runs the spaCy pipeline and keeps only tokens tagged NOUN, PROPN, VERB,
    NUM or ADJ, dropping whitespace tokens and anything whose lemma appears
    in ``stopwords``.  When *short_words_mapping* is True, tokens present in
    the ``short_words_mappings`` table are first expanded and re-lemmatised.

    Args:
        text: Input string; falsy input yields an empty list.
        short_words_mapping: Enable abbreviation expansion before lemmatising.

    Returns:
        List of cleaned lemma strings.
    """
    if not text:
        return []
    content_pos = ("NOUN", "PROPN", "VERB", "NUM", "ADJ")
    lemmas = []
    for token in nlp(text):
        # Skip pure-whitespace tokens and non-content parts of speech.
        if token.is_space or token.pos_ not in content_pos:
            continue
        lowered = token.text.lower()
        if short_words_mapping and lowered in short_words_mappings:
            # Expand the abbreviation, then lemmatise the expanded form.
            expanded = short_words_mappings[lowered]
            lemma = nlp(expanded)[0].lemma_
        else:
            lemma = token.lemma_
        lemma = lemma.strip().lower()
        if lemma and lemma not in stopwords:
            lemmas.append(lemma)
    return lemmas
def space_removal(words: List[str]) -> List[str]:
    """Strip surrounding whitespace from each word and drop empty results.

    Replaces the manual append loop — which stripped every kept word twice —
    with a single-pass comprehension.

    Args:
        words: List of raw token strings.

    Returns:
        List of non-empty, stripped tokens, in original order.
    """
    stripped = (word.strip() for word in words)
    return [word for word in stripped if word]
def preprocess(text: str, short_words_mapping: bool = False) -> str:
    """Preprocess *text* for indexing: keep content-word lemmas, space-joined.

    Args:
        text: Input string.
        short_words_mapping: Forwarded to ``stop_words_removal``.

    Returns:
        Space-joined string of cleaned lemmas.

    Raises:
        ValueError: If *text* is None or empty.
    """
    if not text:
        raise ValueError("Text cannot be empty")
    return " ".join(stop_words_removal(text, short_words_mapping))
def normalize(text: str) -> str:
    """Normalise *text*: strip/lowercase, split on spaces, drop empties, re-join.

    Args:
        text: Input string.

    Returns:
        Single-space-joined string of the surviving tokens.

    Raises:
        ValueError: If *text* is None or empty.
    """
    if not text:
        raise ValueError("Text cannot be empty")
    tokens = space_removal(tokenization(text))
    return " ".join(tokens)
def preprocess_document(doc: Document) -> None:
    """Replace ``doc.page_content`` with its preprocessed form, in place.

    Documents with empty content are left untouched (``preprocess`` would
    raise on empty input).

    Args:
        doc: Document to mutate.
    """
    if doc.page_content != "":
        doc.page_content = preprocess(doc.page_content)
def preprocess_documents(docs: List[Document]) -> None:
    """Preprocess every document in *docs* in place via ``preprocess_document``.

    Args:
        docs: Documents to mutate.
    """
    for document in docs:
        preprocess_document(document)
def preprocess_query(query: str) -> str:
    """Normalise a search query prior to embedding / retrieval.

    An earlier, commented-out experiment rewrote the query with a local LLM
    before normalising; that dead code has been removed — plain
    ``normalize`` is the behavior that was actually in effect.

    Args:
        query: Raw user query.

    Returns:
        The normalised query string.

    Raises:
        ValueError: If *query* is None, empty, or whitespace-only.
    """
    if query is None or not query.strip():
        raise ValueError("Query cannot be empty")
    return normalize(query)
def preprocess_filename(filePath: Path) -> str:
    """Return a filesystem-safe, lowercased version of the path's final name.

    The stem is reduced to letters, digits, ``_`` and ``-`` and lowercased;
    the extension is lowercased as-is.  Falls back to ``"file"`` when the
    sanitised stem is empty (e.g. ``"!!!.pdf"`` -> ``"file.pdf"``).

    Args:
        filePath: Path whose final component should be sanitised.

    Returns:
        Sanitised ``stem + extension`` string.
    """
    # Use pathlib accessors directly instead of rebuilding a Path from the
    # name string (the original did Path(filePath.name).stem / .suffix,
    # which is equivalent but allocates two extra Path objects).
    name = filePath.stem
    ext = filePath.suffix.lower()
    # Remove special characters but keep letters, numbers, _ and -.
    safe_name = re.sub(r'[^a-zA-Z0-9_-]', '', name).lower()
    # Fallback if the name becomes empty after sanitisation.
    return (safe_name or "file") + ext