"""Document ingestion and retrieval pipeline (StudyBuddy notes).

Extracts text from PDF / DOCX / TXT / MD files, embeds the chunks with a
sentence-transformers MiniLM model, and stores/queries them in Pinecone.
"""
# Standard library
import os

# Third-party
import docx2txt
import torch
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from PyPDF2 import PdfReader
from transformers import AutoModel, AutoTokenizer

# Pull configuration (e.g. PINECONE_API_KEY) from a local .env file.
load_dotenv()
# -------- Document Text Extraction --------

def extract_text_from_pdf(file_path: str, use_ocr: bool = True) -> str:
    """Extract plain text from a PDF file.

    Args:
        file_path: Path to the PDF file.
        use_ocr: Accepted for backward compatibility but currently
            ignored — no OCR is performed.  TODO: implement OCR for
            image-only pages or deprecate this flag.

    Returns:
        The concatenated text of all pages; pages with no extractable
        text contribute nothing.  Returns whatever was collected before
        an error (possibly "") if extraction fails.
    """
    parts: list[str] = []
    try:
        reader = PdfReader(file_path)
        for page in reader.pages:
            # extract_text() may return None for image-only pages.
            parts.append(page.extract_text() or "")
    except Exception as e:
        # Best-effort extraction: report and return what we have so far.
        print(f"PDF text extraction error: {e}")
    # join() instead of repeated += avoids quadratic string building.
    return "".join(parts)
def extract_text_from_docx(file_path: str) -> str:
    """Extract plain text from a DOCX document.

    Args:
        file_path: Path to the .docx file.

    Returns:
        The document text, or an empty string if extraction fails.
    """
    try:
        extracted = docx2txt.process(file_path)
    except Exception as e:
        print(f"DOCX extraction error: {e}")
        return ""
    return extracted
def extract_text_from_txt(file_path: str) -> str:
    """Read a UTF-8 text file and return its contents.

    Args:
        file_path: Path to the text file.

    Returns:
        The file contents, or "" if the file cannot be read or decoded.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            return f.read()
    except (OSError, UnicodeDecodeError) as e:
        # Narrowed from bare `Exception`: only I/O and decoding failures
        # are expected here; anything else should surface to the caller.
        print(f"TXT extraction error: {e}")
        return ""
def extract_text_from_md(file_path: str) -> str:
    """Extract text from a Markdown file (treated as plain UTF-8 text)."""
    # Markdown is plain text on disk; reuse the TXT reader unchanged.
    return extract_text_from_txt(file_path)
# -------- Hugging Face Embedding Setup --------
# all-MiniLM-L6-v2 produces 384-dimensional sentence embeddings.
_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"

tokenizer = AutoTokenizer.from_pretrained(_MODEL_NAME)
model = AutoModel.from_pretrained(_MODEL_NAME)
model.eval()  # inference only: disables dropout / training-mode layers
def mean_pooling(model_output, attention_mask):
    """Average token embeddings, weighted by the attention mask.

    Args:
        model_output: Transformer output exposing ``last_hidden_state``
            of shape (batch, seq_len, hidden).
        attention_mask: (batch, seq_len) tensor; 1 for real tokens,
            0 for padding.

    Returns:
        (batch, hidden) tensor of masked mean token embeddings.
    """
    hidden = model_output.last_hidden_state
    # Broadcast the mask across the hidden dimension so padding tokens
    # contribute nothing to either the sum or the count.
    mask = attention_mask.unsqueeze(-1).expand(hidden.size()).float()
    summed = (hidden * mask).sum(dim=1)
    # Clamp guards against division by zero on fully-padded rows.
    counts = mask.sum(dim=1).clamp(min=1e-9)
    return summed / counts
def embed_text(text):
    """Embed ``text`` with the MiniLM model and L2-normalize the result.

    Args:
        text: Input string; tokenized with truncation at 512 tokens.

    Returns:
        A 1-D numpy array (the model's hidden size) with unit L2 norm.
    """
    batch = tokenizer(
        text, padding=True, truncation=True, max_length=512, return_tensors='pt'
    )
    with torch.no_grad():
        output = model(**batch)
    pooled = mean_pooling(output, batch['attention_mask'])
    unit = torch.nn.functional.normalize(pooled, p=2, dim=1)
    # Single input string -> take the first (only) row of the batch.
    return unit[0].cpu().numpy()
# -------- Pinecone Setup --------
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
INDEX_NAME = "studybuddy-notes"
DIMENSION = 384  # Embedding dimension of all-MiniLM-L6-v2

# Fail fast with a clear message instead of an opaque client error later.
if not PINECONE_API_KEY:
    raise RuntimeError("PINECONE_API_KEY is not set; add it to your .env file")

pc = Pinecone(api_key=PINECONE_API_KEY)
index = pc.Index(INDEX_NAME)
# -------- Text Chunking --------

def chunk_text(text, chunk_size=500, overlap=100):
    """Split ``text`` into overlapping character chunks.

    Args:
        text: The string to split.
        chunk_size: Maximum characters per chunk; must be positive.
        overlap: Characters shared between consecutive chunks; must be
            non-negative and strictly smaller than ``chunk_size``.

    Returns:
        List of chunks covering all of ``text`` (the last chunk may be
        shorter); [] for empty input.

    Raises:
        ValueError: If chunk_size/overlap are invalid.
    """
    # Validate both parameters explicitly: the original overlap-only check
    # let chunk_size <= 0 through, which could loop forever.
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if overlap < 0:
        raise ValueError("overlap must be non-negative")
    if overlap >= chunk_size:
        raise ValueError("Overlap must be smaller than chunk size")
    step = chunk_size - overlap  # >= 1, so iteration always terminates
    return [text[start:start + chunk_size] for start in range(0, len(text), step)]
# -------- Complete Pipeline --------

# Dispatch table: supported file type -> extractor function.
_EXTRACTORS = {
    "pdf": extract_text_from_pdf,
    "docx": extract_text_from_docx,
    "txt": extract_text_from_txt,
    "md": extract_text_from_md,
}

def process_file(file_path, file_type):
    """Extract, chunk, embed, and upsert a document into Pinecone.

    Args:
        file_path: Path to the source document.
        file_type: One of "pdf", "docx", "txt", "md".

    Raises:
        ValueError: If ``file_type`` is not supported.
    """
    try:
        extractor = _EXTRACTORS[file_type]
    except KeyError:
        raise ValueError(f"Unsupported file type: {file_type}") from None
    text = extractor(file_path)
    base_name = os.path.basename(file_path)
    vectors = []
    for i, chunk in enumerate(chunk_text(text)):
        vector = embed_text(chunk)
        # Store the chunk text as metadata so retrieve_from_pinecone can
        # actually return it (the previous version upserted no metadata),
        # and convert the numpy vector to a plain list for the client.
        vectors.append((
            f"{base_name}_chunk_{i}",
            vector.tolist(),
            {"text": chunk, "source": base_name},
        ))
    # Skip the API call entirely for empty documents.
    if vectors:
        index.upsert(vectors)
#----retrieve from pinecone------

def retrieve_from_pinecone(query: str, top_k: int = 5):
    """Return the ``top_k`` stored chunks most similar to ``query``.

    Args:
        query: Free-text search query.
        top_k: Maximum number of matches to return.

    Returns:
        List of dicts with keys 'id', 'score', and 'metadata' (empty
        dict when a match carries no metadata).
    """
    # Pinecone expects a plain list of floats, not a numpy array.
    query_vector = embed_text(query).tolist()
    result = index.query(vector=query_vector, top_k=top_k, include_metadata=True)
    return [
        {
            'id': match['id'],
            'score': match['score'],
            'metadata': match.get('metadata', {}),
        }
        for match in result['matches']
    ]