from src.data_processing.loader import MultiFormatDocumentLoader
from src.data_processing.chunker import SDPMChunker, BGEM3Embeddings
import pandas as pd
from typing import List, Dict, Any
from pinecone import Pinecone, ServerlessSpec
import time
from tqdm import tqdm
from dotenv import load_dotenv
import os

load_dotenv()

# API Keys
# PINECONE_API_KEY = os.getenv('PINECONE_API_KEY')
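# Shared embedding model: used both by the semantic chunker below and for
# ingest/query embeddings (BGE-M3 produces 1024-dimensional dense vectors).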
embedding_model = BGEM3Embeddings(model_name="BAAI/bge-m3")

def load_documents(file_paths: List[str], output_path: str = './data/output.md'):
    """
    Load documents from multiple sources and combine them into a single markdown file.
    """
    loader = MultiFormatDocumentLoader(
        file_paths=file_paths,
        enable_ocr=False,
        enable_tables=True
    )
    # Write all documents into one markdown file (mode 'w' overwrites any previous run)
    with open(output_path, 'w') as f:
        for doc in loader.lazy_load():
            # Add metadata as YAML frontmatter
            f.write('---\n')
            for key, value in doc.metadata.items():
                f.write(f'{key}: {value}\n')
            f.write('---\n\n')
            f.write(doc.page_content)
            f.write('\n\n')
    return output_path
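
# Example (hypothetical file paths; any format the loader supports works):
# markdown_path = load_documents(['./data/report.pdf', './data/slides.pptx'])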

def process_chunks(markdown_path: str, chunk_size: int = 256,
                   threshold: float = 0.7, skip_window: int = 2):
    """
    Process the markdown file into chunks and prepare them for vector storage.
    """
    chunker = SDPMChunker(
        embedding_model=embedding_model,
        chunk_size=chunk_size,
        threshold=threshold,
        skip_window=skip_window
    )
    # Read the markdown file
    with open(markdown_path, 'r') as file:
        text = file.read()
    # Generate chunks
    chunks = chunker.chunk(text)
    # Prepare data for Parquet
    processed_chunks = []
    for chunk in chunks:
        processed_chunks.append({
            'text': chunk.text,
            'token_count': chunk.token_count,
            'start_index': chunk.start_index,
            'end_index': chunk.end_index,
            'num_sentences': len(chunk.sentences),
        })
    return processed_chunks

def save_to_parquet(chunks: List[Dict[str, Any]], output_path: str = './data/chunks.parquet'):
    """
    Save processed chunks to a Parquet file.
    """
    df = pd.DataFrame(chunks)
    print(f"Saving to Parquet: {output_path}")
    df.to_parquet(output_path)
    print(f"Saved to Parquet: {output_path}")
    return output_path
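
# Note: DataFrame.to_parquet needs a Parquet engine installed (pyarrow or fastparquet).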

class PineconeRetriever:
    def __init__(
        self,
        pinecone_client: Pinecone,
        index_name: str,
        namespace: str,
        embedding_generator: BGEM3Embeddings
    ):
        """Initialize the retriever with a Pinecone client and embedding generator.

        Args:
            pinecone_client: Initialized Pinecone client
            index_name: Name of the Pinecone index
            namespace: Namespace in the index
            embedding_generator: BGEM3Embeddings instance
        """
        self.pinecone = pinecone_client
        self.index = self.pinecone.Index(index_name)
        self.namespace = namespace
        self.embedding_generator = embedding_generator

    def invoke(self, question: str, top_k: int = 5):
        """Retrieve similar documents for a question.

        Args:
            question: Query string
            top_k: Number of results to return

        Returns:
            List of dictionaries containing retrieved documents
        """
        # Generate embedding for the question
        question_embedding = self.embedding_generator.embed(question)
        question_embedding = question_embedding.tolist()
        # Query Pinecone
        results = self.index.query(
            namespace=self.namespace,
            vector=question_embedding,
            top_k=top_k,
            include_values=False,
            include_metadata=True
        )
        # Format results
        retrieved_docs = [
            {"page_content": match.metadata["text"], "score": match.score}
            for match in results.matches
        ]
        return retrieved_docs
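
# Example usage (assumes the index was already populated via ingest_data below):
# pc = Pinecone(api_key=os.getenv('PINECONE_API_KEY'))
# retriever = PineconeRetriever(pc, "vector-index", "rag", embedding_model)
# docs = retriever.invoke("What is gender based violence?", top_k=3)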

def ingest_data(
    parquet_path: str,
    text_column: str,
    pinecone_client: Pinecone,
    index_name: str = "vector-index",
    namespace: str = "rag",
    batch_size: int = 100
):
    """Ingest data from a Parquet file into Pinecone.

    Args:
        parquet_path: Path to the Parquet file
        text_column: Name of the column containing text data
        pinecone_client: Initialized Pinecone client
        index_name: Name of the Pinecone index
        namespace: Namespace in the index
        batch_size: Batch size for processing
    """
    # Read Parquet file
    print(f"Reading Parquet file: {parquet_path}")
    df = pd.read_parquet(parquet_path)
    print(f"Total records: {len(df)}")
    # Create the index if it does not exist yet
    if not pinecone_client.has_index(index_name):
        pinecone_client.create_index(
            name=index_name,
            dimension=1024,  # BGE-M3 dense embedding dimension
            metric="cosine",
            spec=ServerlessSpec(
                cloud='aws',
                region='us-east-1'
            )
        )
        # Wait for the index to be ready
        while not pinecone_client.describe_index(index_name).status['ready']:
            time.sleep(1)
    index = pinecone_client.Index(index_name)
    # Process in batches
    for i in tqdm(range(0, len(df), batch_size)):
        batch_df = df.iloc[i:i + batch_size]
        # Generate embeddings for the batch
        texts = batch_df[text_column].tolist()
        embeddings = embedding_model.embed_batch(texts)
        tqdm.write(f"Embedded batch starting at row {i}")
        # Prepare records for upsert
        records = []
        for idx, (_, row) in enumerate(batch_df.iterrows()):
            records.append({
                "id": str(row.name),  # Using the DataFrame index as the vector ID
                "values": [float(x) for x in embeddings[idx]],  # plain floats for the Pinecone client
                "metadata": {"text": row[text_column]}
            })
        # Upsert to Pinecone
        index.upsert(vectors=records, namespace=namespace)
        # Small delay to stay under rate limits
        time.sleep(0.5)
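
# Note: upserts are keyed by vector ID, so re-running ingestion over the same
# Parquet file overwrites those records in place instead of duplicating them.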

def get_retriever(
    pinecone_client: Pinecone,
    index_name: str = "vector-index",
    namespace: str = "rag"
):
    """Create and return a PineconeRetriever instance.

    Args:
        pinecone_client: Initialized Pinecone client
        index_name: Name of the Pinecone index
        namespace: Namespace in the index

    Returns:
        Configured PineconeRetriever instance
    """
    return PineconeRetriever(
        pinecone_client=pinecone_client,
        index_name=index_name,
        namespace=namespace,
        embedding_generator=embedding_model
    )

# def main():
#     # Initialize Pinecone client
#     pc = Pinecone(api_key=PINECONE_API_KEY)
#     # Define input files
#     file_paths = [
#         # './data/2404.19756v1.pdf',
#         # './data/OD429347375590223100.pdf',
#         # './data/Project Report Format.docx',
#         './data/UNIT 2 GENDER BASED VIOLENCE.pptx'
#     ]
#     # Process pipeline
#     try:
#         # Step 1: Load and combine documents
#         print("Loading documents...")
#         markdown_path = load_documents(file_paths)
#         # Step 2: Process into chunks with embeddings
#         print("Processing chunks...")
#         chunks = process_chunks(markdown_path)
#         # Step 3: Save to Parquet
#         print("Saving to Parquet...")
#         parquet_path = save_to_parquet(chunks)
#         # Step 4: Ingest into Pinecone
#         print("Ingesting into Pinecone...")
#         ingest_data(
#             parquet_path=parquet_path,
#             text_column="text",
#             pinecone_client=pc,
#         )
#         # Step 5: Test retrieval
#         print("\nTesting retrieval...")
#         retriever = get_retriever(
#             pinecone_client=pc,
#             index_name="vector-index",
#             namespace="rag"
#         )
#         results = retriever.invoke(
#             question="describe the gender based violence",
#             top_k=5
#         )
#         for i, doc in enumerate(results, 1):
#             print(f"\nResult {i}:")
#             print(f"Content: {doc['page_content']}...")
#             print(f"Score: {doc['score']}")
#     except Exception as e:
#         print(f"Error in pipeline: {str(e)}")

# if __name__ == "__main__":
#     main()