import os
import fitz  # PyMuPDF
import nltk
import json
import faiss
import numpy as np
from openai import OpenAI
import time

nltk.download("punkt")
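
# NOTE: CHUNK_SIZE_TOKENS / OVERLAP_TOKENS below count whitespace-split words,
# not model tokens, so chunk sizes are only approximate relative to the
# embedding model's own tokenizer.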
PDF_FOLDER = "backend/app/sentiment/pds"
FAISS_INDEX_PATH = "backend/app/sentiment/faiss_index.idx"
METADATA_PATH = "backend/app/sentiment/metadata.json"
EMBEDDING_MODEL = "text-embedding-3-large"  # single source of truth for the embedding model
OVERLAP_TOKENS = 50
CHUNK_SIZE_TOKENS = 200

# Never hard-code API keys in source; read the key from the environment instead.
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
client = OpenAI(api_key=OPENAI_API_KEY)
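
# Pipeline: extract per-page text -> whitespace-tokenize -> overlapping chunks
# -> embed each chunk -> store in a FAISS inner-product index over
# L2-normalized vectors (i.e. cosine similarity), with JSON metadata alongside.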
def extract_text_by_page(pdf_path):
    """Extract text from each page of a PDF separately (for per-page metadata)."""
    pages_text = []
    with fitz.open(pdf_path) as doc:
        for page in doc:
            pages_text.append(page.get_text())
    return pages_text

def tokenize_text(text):
    """Split text into whitespace tokens, sentence by sentence, via nltk."""
    tokens = []
    for sentence in nltk.sent_tokenize(text):
        tokens.extend(sentence.split())
    return tokens

def detokenize_tokens(tokens):
    """Convert tokens back to text."""
    return " ".join(tokens)

def chunk_tokens(tokens, chunk_size=CHUNK_SIZE_TOKENS, overlap=OVERLAP_TOKENS):
    """Chunk tokens into overlapping chunks of chunk_size tokens."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    chunks = []
    start = 0
    while start < len(tokens):
        end = start + chunk_size
        chunks.append(tokens[start:end])
        if end >= len(tokens):
            break
        start = end - overlap  # step back so consecutive chunks share `overlap` tokens
    return chunks
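
# Example: with chunk_size=200 and overlap=50, chunks cover token ranges
# [0, 200), [150, 350), [300, 500), ... -- each consecutive pair shares 50 tokens.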
def get_embedding(text, max_retries=3):
    """Embed text with the configured model, retrying on transient API errors."""
    for attempt in range(max_retries):
        try:
            response = client.embeddings.create(input=text, model=EMBEDDING_MODEL)
            return response.data[0].embedding
        except Exception:
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # simple exponential backoff
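
# The index dimension is taken from the first embedding at save time:
# text-embedding-3-large returns 3072-dim vectors (text-embedding-ada-002
# returns 1536), so the build and query steps must use the same model.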
def build_index_and_save():
    all_embeddings = []
    metadata = []
    print("Reading PDFs and chunking text...")
    for filename in os.listdir(PDF_FOLDER):
        if not filename.lower().endswith(".pdf"):
            continue
        pdf_path = os.path.join(PDF_FOLDER, filename)
        print(f"Processing {filename}")
        pages = extract_text_by_page(pdf_path)
        for page_num, page_text in enumerate(pages):
            page_text = page_text.lower().strip()
            tokens = tokenize_text(page_text)
            for i, chunk in enumerate(chunk_tokens(tokens)):
                chunk_text = detokenize_tokens(chunk)
                all_embeddings.append(get_embedding(chunk_text))
                metadata.append({
                    "source_pdf": filename,
                    "page": page_num,  # 0-based page index
                    "chunk_index": i,
                    "text": chunk_text[:500]
                })
                # Checkpoint every 50 chunks so a crash doesn't lose everything.
                if len(all_embeddings) % 50 == 0:
                    save_index_and_metadata(all_embeddings, metadata)
    save_index_and_metadata(all_embeddings, metadata)
    print("Index build completed.")
def save_index_and_metadata(embeddings, metadata):
    if not embeddings:
        print("No embeddings to save; skipping.")
        return
    dimension = len(embeddings[0])
    print(f"Saving index with {len(embeddings)} vectors...")
    embeddings_np = np.array(embeddings).astype("float32")
    faiss.normalize_L2(embeddings_np)  # unit vectors, so inner product == cosine similarity
    index = faiss.IndexFlatIP(dimension)
    index.add(embeddings_np)
    faiss.write_index(index, FAISS_INDEX_PATH)
    with open(METADATA_PATH, "w", encoding="utf-8") as f:
        json.dump(metadata, f, ensure_ascii=False, indent=2)
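
# IndexFlatIP does exact (brute-force) search and is rebuilt from the full
# embedding list on every save; that is fine for a small PDF corpus, but an
# IVF or HNSW index would be worth considering at larger scale.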
def load_index_and_metadata():
    index = faiss.read_index(FAISS_INDEX_PATH)
    with open(METADATA_PATH, "r", encoding="utf-8") as f:
        metadata = json.load(f)
    return index, metadata

def query_index(query, top_k=5):
    index, metadata = load_index_and_metadata()
    query_embedding = get_embedding(query.lower())  # lowercase to match the indexed text
    query_embedding_np = np.array([query_embedding]).astype("float32")
    faiss.normalize_L2(query_embedding_np)
    distances, indices = index.search(query_embedding_np, top_k)
    results = []
    for dist, idx in zip(distances[0], indices[0]):
        if idx == -1:  # FAISS pads with -1 when fewer than top_k vectors exist
            continue
        meta = metadata[idx]
        results.append({
            "score": float(dist),
            "source_pdf": meta["source_pdf"],
            "page": meta["page"],
            "chunk_index": meta["chunk_index"],
            "text_snippet": meta["text"]
        })
    return results
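
# Scores are inner products of unit vectors, i.e. cosine similarities in
# [-1, 1]; higher means more similar. For example:
#   query_index("what fees apply to early withdrawal?", top_k=3)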
if __name__ == "__main__":
    build_index_and_save()
    print("\nQuery results:")
    # Placeholder query for a smoke test; replace with a question about the indexed PDFs.
    for result in query_index("example query"):
        print(json.dumps(result, indent=2))