Spaces:
Sleeping
Sleeping
| import os | |
| import subprocess | |
| import sys | |
| from langchain_community.embeddings import OpenAIEmbeddings | |
| from dotenv import load_dotenv | |
def install_packages(packages_batches=None):
    """Install the project's dependencies with pip, one batch at a time.

    Each batch is passed to its own ``pip install`` invocation, run with the
    current interpreter (``sys.executable -m pip``). A batch that fails is
    reported on stdout and the remaining batches are still attempted.

    Args:
        packages_batches: Optional iterable of batches. Each batch is a
            sequence of pip arguments (package names and/or pip options such
            as ``--no-cache-dir``). When ``None``, the default batches defined
            below are used. An empty iterable installs nothing.

    Returns:
        None.
    """
    if packages_batches is None:
        packages_batches = [
            ["langchain", "langchain-openai", "langchain_core", "langchain-community", "langchainhub", "openai", "langchain-qdrant"],
            ["qdrant-client", "pymupdf", "pandas"],
            ["llama-index", "--no-cache-dir"],
            # PyPDF2 is installed here once; a separate PyPDF2-only batch
            # would just repeat the same pip call.
            ["llama-parse", "PyPDF2", "tiktoken"],
            ["langchain-text-splitters"],
            ["scikit-learn"],
        ]

    for package_list in packages_batches:
        label = " ".join(package_list)
        print(f"Installing: {label}")
        try:
            subprocess.check_call([sys.executable, "-m", "pip", "install", *package_list])
        except subprocess.CalledProcessError as e:
            # Best effort: report the failure and move on to the next batch.
            print(f"Failed to install {package_list}: {e}\n")
        else:
            print(f"Successfully installed: {label}\n")
# Run the dependency installer only when this file is executed directly.
if __name__ == "__main__":
    install_packages()

# Read the .env file, then fetch the OpenAI API key from the environment.
load_dotenv()
api_key = os.getenv("OPENAI_API_KEY")

# Report whether a key was found; execution continues either way.
if api_key:
    print("OpenAI API key loaded successfully.")
else:
    print("OpenAI API key not found. Please ensure it is set in the .env file.")

# Apply the nest_asyncio patch before any of the code below runs.
import nest_asyncio

nest_asyncio.apply()
| # Function to extract text from PDF URLs | |
| import re | |
| import requests | |
| from PyPDF2 import PdfReader | |
| from io import BytesIO | |
# Source documents: the two PDFs that get downloaded and split into sentences.
pdf_urls = [
    "https://www.whitehouse.gov/wp-content/uploads/2022/10/Blueprint-for-an-AI-Bill-of-Rights.pdf",
    "https://nvlpubs.nist.gov/nistpubs/ai/NIST.AI.600-1.pdf",
]
def extract_text_from_pdf(url, timeout=30):
    """Download a PDF and return its text as a list of sentences.

    The text of every page is extracted, whitespace is normalised to single
    spaces, and the result is split after each ``.``, ``!`` or ``?`` that is
    followed by a space.

    Args:
        url: Address of the PDF to download.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A list of sentence strings; an empty list when no text was extracted.

    Raises:
        requests.RequestException: If the download fails, times out, or the
            server answers with an HTTP error status.
    """
    response = requests.get(url, timeout=timeout)
    # Fail on HTTP errors here rather than handing an error page to the parser.
    response.raise_for_status()

    reader = PdfReader(BytesIO(response.content))
    # Join pages with a space so the last word of one page is not fused with
    # the first word of the next. `or ""` is defensive: a page that yields no
    # text contributes nothing instead of breaking the join.
    pdf_text = " ".join(page.extract_text() or "" for page in reader.pages)

    # split() + join collapses every whitespace run (newlines included) into
    # a single space and trims both ends.
    cleaned_text = " ".join(pdf_text.split())
    if not cleaned_text:
        # Avoid returning [''], which would later be embedded as a sentence.
        return []
    return re.split(r'(?<=[.!?]) +', cleaned_text)
# Run the extractor over each URL, keeping one sentence list per PDF
# in the same order as pdf_urls.
sentences_list = []
for url in pdf_urls:
    sentences = extract_text_from_pdf(url)
    print(f"Extracted {len(sentences)} sentences from {url}")
    sentences_list.append(sentences)
| # Semantic chunking | |
| from langchain.embeddings.openai import OpenAIEmbeddings | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import tiktoken | |
| import numpy as np | |
# Flatten the per-PDF sentence lists into one list (document order preserved).
flat_sentences = [text for per_pdf in sentences_list for text in per_pdf]

# Embed all sentences in a single call; embeddings[i] belongs to flat_sentences[i].
embedding_model = OpenAIEmbeddings()
embeddings = embedding_model.embed_documents(flat_sentences)
def greedy_chunk_sentences(sentences, sentence_embeddings, max_chunk_size=1000, similarity_threshold=0.75):
    """Group consecutive sentences into chunks in one left-to-right pass.

    A sentence is added to the chunk being built unless either
      * its cosine similarity to the previous sentence is below
        ``similarity_threshold``, or
      * adding it would push the chunk past ``max_chunk_size`` tokens
        (counted with the ``cl100k_base`` tiktoken encoding),
    in which case the current chunk is closed and the sentence starts a new one.

    Args:
        sentences: Sentence strings, in reading order.
        sentence_embeddings: Embedding vectors indexed like ``sentences``.
        max_chunk_size: Token budget per chunk.
        similarity_threshold: Minimum similarity to the previous sentence for
            a sentence to stay in the same chunk.

    Returns:
        A list of chunk strings, each made of sentences joined by a space.
    """
    tokenizer = tiktoken.get_encoding("cl100k_base")
    finished = []
    pending = []
    pending_tokens = 0

    for index, text in enumerate(sentences):
        token_count = len(tokenizer.encode(text))

        # Only a non-empty chunk can be closed; the first sentence of a chunk
        # is always accepted, whatever its size.
        if pending:
            score = cosine_similarity(
                [sentence_embeddings[index]],
                [sentence_embeddings[index - 1]],
            )[0][0]
            topic_shift = score < similarity_threshold
            over_budget = pending_tokens + token_count > max_chunk_size
            if topic_shift or over_budget:
                finished.append(" ".join(pending))
                pending, pending_tokens = [], 0

        pending.append(text)
        pending_tokens += token_count

    # Flush whatever is left after the last sentence.
    if pending:
        finished.append(" ".join(pending))
    return finished
# Perform greedy chunking on every PDF, not just the first one.
# `embeddings` was computed over the flattened sentence list, so each
# document's vectors sit in a contiguous slice starting at `_offset`.
# Chunking per document also keeps a chunk from spanning two PDFs.
semantic_chunks = []
_offset = 0
for _doc_sentences in sentences_list:
    _doc_embeddings = embeddings[_offset:_offset + len(_doc_sentences)]
    semantic_chunks.extend(greedy_chunk_sentences(_doc_sentences, _doc_embeddings))
    _offset += len(_doc_sentences)
| # Qdrant setup for storing chunks | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.http.models import Distance, VectorParams | |
| from langchain_qdrant import QdrantVectorStore | |
| from langchain.schema import Document | |
| import uuid | |
# Connection settings for the Qdrant client and the collection that holds the chunks.
LOCATION = ":memory:"
COLLECTION_NAME = "Semantic_Chunking"

qdrant_client = QdrantClient(LOCATION)

# Create the collection with cosine distance.
# NOTE(review): size=1536 presumably matches the output dimension of
# embedding_model; confirm if the embedding model is ever changed.
qdrant_client.create_collection(
    collection_name=COLLECTION_NAME,
    vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
)

# Vector store wrapper that embeds with embedding_model and writes to the collection.
qdrant_vector_store = QdrantVectorStore(
    client=qdrant_client,
    collection_name=COLLECTION_NAME,
    embedding=embedding_model,
)

# One Document per chunk, each with a fresh random UUID as its id.
documents = [
    Document(
        page_content=chunk,
        metadata={"source": "generated"},
        id=str(uuid.uuid4()),
    )
    for chunk in semantic_chunks
]
qdrant_vector_store.add_documents(documents)

# Retriever used by the RAG chain to fetch context for a question.
retriever = qdrant_vector_store.as_retriever()
| # Define prompt and execute RAG chain | |
| from langchain.prompts import ChatPromptTemplate | |
| from operator import itemgetter | |
| from langchain_openai import ChatOpenAI | |
| from langchain_core.output_parsers import StrOutputParser | |
| from langchain_core.runnables import RunnablePassthrough | |
# Prompt text; {question} and {context} are filled in by the chain below.
template = """
### You are a helpful assistant. Use the available context to answer the question. If you can't answer the question, say you don't know.
Question:
{question}
Context:
{context}
"""
prompt = ChatPromptTemplate.from_template(template)

primary_qa_llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

# Chain stages:
#   1. look up context for the question with the retriever and carry the
#      question along unchanged,
#   2. re-assign "context" from itself,
#   3. produce the model response while also returning the retrieved context.
retrieval_augmented_qa_chain = (
    {
        "context": itemgetter("question") | retriever,
        "question": itemgetter("question"),
    }
    | RunnablePassthrough.assign(context=itemgetter("context"))
    | {
        "response": prompt | primary_qa_llm,
        "context": itemgetter("context"),
    }
)
# Ask one question through the chain and print the text of the model's reply.
question = "What are the top AI risks and how to best manage them?"
result = retrieval_augmented_qa_chain.invoke(
    {"question": question}
)
print(result["response"].content)