from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.vectorstores import FAISS
from langchain_core.documents import Document
from langchain_community.document_loaders import PyPDFLoader
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from uuid import uuid4
from prompt import *
import random
from itext2kg.models import KnowledgeGraph
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_experimental.text_splitter import SemanticChunker
import faiss
from langchain_community.docstore.in_memory import InMemoryDocstore
from pydantic import BaseModel, Field
from dotenv import load_dotenv
import os
from langchain_core.tools import tool
import pickle
import unicodedata
import yaml

load_dotenv()
index_name = os.environ.get("INDEX_NAME")

# Global initialization
embedding_model = "text-embedding-3-small"
embedding = OpenAIEmbeddings(model=embedding_model)
# vector_store = PineconeVectorStore(index=index_name, embedding=embedding)
def advanced_graph_to_json(graph: KnowledgeGraph):
    """Flatten an itext2kg KnowledgeGraph into a JSON-serializable dict.

    The output keys are French ("noeuds" = nodes, "relations" = edges,
    "cible" = target) because downstream consumers expect them as-is.
    """
    nodes = []
    edges = []
    for node in graph.entities:
        node_id = node.name.replace(" ", "_")
        label = node.name
        node_type = node.label  # avoid shadowing the `type` builtin
        nodes.append({"id": node_id, "label": label, "type": node_type})
    for relationship in graph.relationships:
        source = relationship.startEntity
        source_id = source.name.replace(" ", "_")
        target = relationship.endEntity
        target_id = target.name.replace(" ", "_")
        label = relationship.name
        edges.append({"source": source_id, "label": label, "cible": target_id})
    return {"noeuds": nodes, "relations": edges}
def load_document(name):
    try:
        loader = PyPDFLoader(f"./data/{name}.pdf")
        pages = loader.load()
        return pages
    except Exception as e:
        print(e)
        print("Make sure the name in config.yaml matches the PDF file name in the data folder")
        return False
def normalize_page_text(text: str) -> str:
    # Strip newlines and collapse tabs and double spaces into single spaces.
    return text.replace("\n", "").replace("  ", " ").replace("\t", " ")


def chunk_by_scene(scenes_indexes, pages, last_page=None):
    try:
        scenes = []
        for i in range(len(scenes_indexes) - 1):
            print("scene slice", scenes_indexes[i] - 1, scenes_indexes[i + 1] - 1)
            current_scene = "Scene " + str(i + 1)
            print("current_scene", current_scene)
            scene_content = "".join([normalize_page_text(page.page_content) for page in pages[scenes_indexes[i] - 1:scenes_indexes[i + 1] - 1]])
            scenes.append(scene_content)
        if last_page is None:
            last_page = len(pages)
        print("scene slice", scenes_indexes[-1] - 1, last_page)
        current_scene = "Scene " + str(len(scenes_indexes))
        print("current_scene", current_scene)
        scene_content = "".join([normalize_page_text(page.page_content) for page in pages[scenes_indexes[-1] - 1:last_page]])
        scenes.append(scene_content)
        return scenes
    except Exception as e:
        print(e)
        return False


def chunk_semantic(pages):
    text_filtered = "\n".join([normalize_page_text(page.page_content) for page in pages])
    text_splitter = SemanticChunker(embedding, breakpoint_threshold_type="standard_deviation", breakpoint_threshold_amount=2.718)
    chunks_filtered = text_splitter.create_documents([text_filtered])
    semantic_chunks = [chunk.page_content for chunk in chunks_filtered]
    return semantic_chunks
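# Illustrative example (values are hypothetical): with scenes_indexes = [1, 5, 9]
# and a 12-page PDF, chunk_by_scene returns three scenes covering pages 1-4,
# 5-8, and 9-12. Page numbers in the config are 1-based; the slices above
# convert them to 0-based indexes.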
def handle_config(config_path='config.yaml'):
    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)
        name = config.get('nom_ouvrage', '')
        scenes_indexes = config.get('scenes_par_numero_de_page', [])
        writer = config.get('auteur', 'anonyme')
        summary = config.get('resume', '')
        severite = config.get('severite', 0)  # 0: low, 1: max
        scene_specific = config.get('scenes_choisies', None)  # None: the whole story, [1,3,5]: scenes 1, 3 and 5
        last_page = config.get('derniere_page', None)
        ## log config
        print("########### Config loaded ###########")
        print(f"Loading document {name}")
        print(f"Writer: {writer}")
        print(f"Scenes: {scenes_indexes}")
        print(f"Summary: {summary}")
        print(f"Severite: {severite}")
        print(f"Scene specific: {scene_specific}")
        print(f"Last page: {last_page}")
        print("#####################################")
        config = {
            "name_book": name,
            "scenes_indexes": scenes_indexes,
            "writer": writer,
            "summary": summary,
            "severite": severite,
            "scene_specific": scene_specific,
            "last_page": last_page
        }
        return config
    except Exception as e:
        print(f"Error: {e}")
        return False
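# A hypothetical config.yaml matching the keys read above (all values are
# illustrative, not from the source; note verify_response compares severite
# against a 0-10 score):
#
# nom_ouvrage: mon_livre            # ./data/mon_livre.pdf must exist
# auteur: Jane Doe
# resume: A short summary of the story.
# scenes_par_numero_de_page: [1, 5, 9]
# scenes_choisies: null             # or e.g. [1, 3]
# derniere_page: 12
# severite: 5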
config = handle_config()
name_book = config.get('name_book')
scenes_indexes = config.get('scenes_indexes')
writer = config.get('writer')
summary_text = config.get('summary')
severite = config.get('severite')
scene_specific = config.get('scene_specific')
last_page = config.get('last_page')

print("########### Loading document ###########")
pages = load_document(name_book)
print("########### Pages loaded ###########")
print("########### Loading scenes ###########")
scenes = chunk_by_scene(scenes_indexes, pages, last_page)
print("########### Scenes loaded ###########")
print("########### Loading chunks ###########")
chunks = chunk_semantic(pages)
print("########### Chunks loaded ###########")
class sphinx_output(BaseModel):
    question: str = Field(description="The question to ask the user to test whether they read the entire book")
    answers: list[str] = Field(description="The possible answers to the question")

class verify_response_model(BaseModel):
    response: str = Field(description="The user's response to the question")
    answers: list[str] = Field(description="The possible answers to the question")
    initial_question: str = Field(description="The question asked to the user to test whether they read the entire book")

class verification_score(BaseModel):
    score: float = Field(description="The score of the user's response to the question, from 0 to 10")

llm = ChatOpenAI(model="gpt-4o", max_tokens=1000, temperature=0.5)
def split_texts(text: str) -> list[str]:
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
        length_function=len,
        is_separator_regex=False,
    )
    return splitter.split_text(text)
#########################################################################
### PICK A SPECIFIC SCENE HERE, VIA THE FUNCTION'S ARGUMENT
def get_random_chunk(scene_specific=scene_specific):  # scene_specific = None means the whole story / [1,3,5] for scenes 1, 3 and 5 / [5] for scene 5 only
    if scene_specific:
        scene_specific_content = [scenes[i - 1] for i in scene_specific]
        scene_specific_content = " ".join(scene_specific_content)
        chunks_scene = split_texts(scene_specific_content)
        print(f"Scene {scene_specific} has {len(chunks_scene)} chunks")
        print([chunk[0:50] for chunk in chunks_scene])
        print('---')
        chunk_chosen = random.choice(chunks_scene)
        print(f"Chosen chunk: {chunk_chosen}")
        return chunk_chosen, scene_specific
    return random.choice(chunks), scene_specific
def get_vectorstore(chunks) -> FAISS:
    # Size the in-memory FAISS index to the embedding dimension.
    index = faiss.IndexFlatL2(len(embedding.embed_query("hello world")))
    vector_store = FAISS(
        embedding_function=embedding,
        index=index,
        docstore=InMemoryDocstore(),
        index_to_docstore_id={},
    )
    print("Adding documents to vector store")
    print("Chunks", len(chunks))
    documents = [Document(page_content=chunk) for chunk in chunks]
    uuids = [str(uuid4()) for _ in range(len(documents))]
    vector_store.add_documents(documents=documents, ids=uuids)
    return vector_store

vector_store = get_vectorstore(chunks)
scenes_vector_store = get_vectorstore(scenes)
def generate_sphinx_response() -> sphinx_output:
    summary = summary_text
    excerpt, scene_number = get_random_chunk()
    if scene_number:
        summary = "scene " + str(scene_number)
    prompt = PromptTemplate.from_template(template_sphinx)
    structured_llm = llm.with_structured_output(sphinx_output)
    # Create an LLM chain with the prompt and the LLM
    llm_chain = prompt | structured_llm
    return llm_chain.invoke({"writer": writer, "book_name": name_book, "summary": summary, "excerpt": excerpt})
#############################################################
### SET THE STRICTNESS OF THE VERIFICATION HERE
def verify_response(response: str, answers: list[str], question: str) -> bool:
    prompt = PromptTemplate.from_template(template_verify)
    structured_llm = llm.with_structured_output(verification_score)
    llm_chain = prompt | structured_llm
    score = llm_chain.invoke({"response": response, "answers": answers, "initial_question": question})
    return score.score >= severite
def retrieve_context_from_vectorstore(query: str) -> list[Document]:
    retriever = vector_store.as_retriever(search_type="mmr", search_kwargs={"k": 3})
    return retriever.invoke(query)

def retrieve_context_from_scenes(query: str) -> list[Document]:
    retriever = scenes_vector_store.as_retriever(search_kwargs={"k": 1})
    return retriever.invoke(query)
def generate_stream(query: str, messages=None, model="gpt-4o-mini", max_tokens=300, temperature=1, index_name="", stream=True, vector_store=None):
    try:
        print("init chat")
        print("init template")
        prompt = PromptTemplate.from_template(template)
        summary = summary_text
        print("retrieving context")
        context = retrieve_context_from_vectorstore(query)
        print(f"Context: {context}")
        llm_chain = prompt | llm | StrOutputParser()
        print("streaming")
        if stream:
            return llm_chain.stream({"name_book": name_book, "writer": writer, "context": context, "query": query, "summary": summary})
        else:
            return llm_chain.invoke({"name_book": name_book, "writer": writer, "context": context, "query": query, "summary": summary})
    except Exception as e:
        print(e)
        return False
def generate_whatif_stream(question: str, response: str, stream: bool = False) -> str:
    try:
        prompt = PromptTemplate.from_template(template_whatif)
        llm_chain = prompt | llm | StrOutputParser()
        print("Enter whatif")
        context = retrieve_context_from_scenes(f"question: {question} . reponse : {response}")
        print(f"Context: {context}")
        if stream:
            return llm_chain.stream({"question": question, "response": response, "context": context})
        else:
            return llm_chain.invoke({"question": question, "response": response, "context": context})
    except Exception as e:
        print(e)
        return False
def generate_stream_whatif_chat(query: str, messages=None, model="gpt-4o-mini", max_tokens=500, temperature=1, index_name="", stream=True, vector_store=None):
    try:
        if messages is None:
            messages = []
        print("init chat")
        print("init template")
        prompt = PromptTemplate.from_template(template_whatif_response)
        print("retrieving context")
        context = retrieve_context_from_vectorstore(query)
        print(f"Context: {context}")
        llm_chain = prompt | llm | StrOutputParser()
        print("streaming")
        if stream:
            return llm_chain.stream({"name_book": name_book, "writer": writer, "messages": messages, "context": context, "query": query, "summary": summary_text})
        else:
            return llm_chain.invoke({"name_book": name_book, "writer": writer, "messages": messages, "context": context, "query": query, "summary": summary_text})
    except Exception as e:
        print(e)
        return False
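# A minimal sketch of the quiz flow when this module is run directly. The
# user answer and the follow-up query are hypothetical, not from the source;
# in the app these come from the UI.
if __name__ == "__main__":
    sphinx = generate_sphinx_response()
    print(sphinx.question)
    print(sphinx.answers)
    user_answer = "Yes"  # hypothetical answer
    if verify_response(user_answer, sphinx.answers, sphinx.question):
        # The reader passed the check: answer a question about the book.
        answer = generate_stream("What happens in the first scene?", stream=False)
        print(answer)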