Spaces:
Sleeping
Sleeping
| import sys | |
| import os | |
| sys.path.append(os.getcwd()) | |
| from langgraph.graph import START, StateGraph, MessagesState | |
| from langgraph.prebuilt import ToolNode | |
| from langchain_core.messages import SystemMessage | |
| from langgraph.prebuilt import tools_condition | |
| from langchain_core.tools import tool | |
| from src.rag.documents_rag_pipeline import RAGPipeline | |
| from src.rag.youtube_rag_pipeline import YouTubeRAGPipeline | |
| from src.models.llm_wrapper import GeminiWrapper | |
| from src.utils.search_docs_utils import search_relevant_documents | |
| from src.configs.config import LOG_DIR | |
| import logging | |
| import os | |
# LOGGING SETUP
# Create the log directory up front: logging.basicConfig raises (or silently
# fails to attach the file handler) when the target directory is missing.
os.makedirs(LOG_DIR, exist_ok=True)
LOG_FILE = os.path.join(LOG_DIR, "Agents.log")
logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
# PIPELINE INIT
# Module-level singletons: one RAG pipeline over the legal-document (PDF)
# index and one over the YouTube transcript index. Built once at import time
# and shared by the tool functions below.
doc_rag_pipeline = RAGPipeline()
ytb_rag_pipeline = YouTubeRAGPipeline()
| # TOOLS | |
def search_legal_documents(query: str) -> str:
    """Search Moroccan **legal codes, decrees, and PDFs**. Use this tool when user asks about Moroccan **laws, articles, codes, legal references**, or related content."""
    # NOTE: the docstring above doubles as the tool description handed to the
    # LLM through bind_tools, so it is reproduced verbatim.
    logging.info(f"Tool call: search_legal_documents with input: {query}")
    try:
        relevant_docs = search_relevant_documents(query, top_k=1)
        chunks = doc_rag_pipeline.get_top_docs_chunks_for_query(
            query, relevant_docs['results']
        )
        # One "<chunk text>\n(Source: <pdf link>)" paragraph per retrieved chunk.
        paragraphs = []
        for chunk in chunks:
            paragraphs.append(f"{chunk['text']}\n(Source: {chunk['pdf_link']})")
        answer = "\n".join(paragraphs)
        logging.info(f"Tool result (search_legal_documents): {answer}")
        return answer
    except Exception as e:
        # Swallow retrieval failures and hand a French error string back to
        # the agent instead of crashing the graph run.
        logging.error(f"Error in search_legal_documents: {e}")
        return "Une erreur est survenue lors de la recherche des documents légaux."
def search_youtube_transcripts(query: str) -> str:
    """Search Moroccan **Parliament YouTube videos** (debates, speeches, questions, sessions). Use this tool when user asks about **video content** or **what someone said**."""
    # Docstring kept verbatim: it is the tool description exposed to the LLM.
    logging.info(f"Tool call: search_youtube_transcripts with input: {query}")
    try:
        hits = ytb_rag_pipeline.search(query)
        # Keys are French ('texte', 'lien') because the transcript pipeline
        # returns them that way.
        formatted = "\n".join(
            f"{hit['texte']}\n(Source: {hit['lien']})" for hit in hits
        )
        logging.info(f"Tool result (search_youtube_transcripts): {formatted}")
        return formatted
    except Exception as e:
        # Best-effort: report the failure to the agent as a French message.
        logging.error(f"Error in search_youtube_transcripts: {e}")
        return "Une erreur est survenue lors de la recherche dans les transcriptions YouTube."
# Tools exposed to the agent; the functions' docstrings become the tool
# descriptions when bound to the model.
tools = [search_legal_documents, search_youtube_transcripts]
# LLM SETUP
llm = GeminiWrapper()
llm_with_tools = llm.bind_tools(tools)
| # LLM NODE | |
def assistant(state: MessagesState):
    """LLM node: answer the conversation so far, optionally requesting tools.

    Selects the system prompt from the global ``invocation_state`` (set by the
    ``set_invocation_type`` node): a conversational prompt for 'chatbot'
    invocations, a terse JSON-output prompt otherwise (API calls).

    Returns a state update ``{"messages": [...]}`` containing the model reply,
    or a French error message if the LLM invocation fails.
    """
    user_msg = state["messages"][-1].content
    logging.info(f"User input: {user_msg}")
    # Pick the system message according to the invocation type.
    if invocation_state.invocation_type == 'chatbot':
        sys_msg = SystemMessage(content="""
You are a helpful assistant specialized in answering user questions related to Moroccan Parliament YouTube videos and legal documents.
Your response must be strictly in the same language as the user’s query.
Provide accurate answers and include relevant sources (YouTube video links or PDF document links) in your response.
""")
    else:
        sys_msg = SystemMessage(content="""
You are a helpful assistant for an API that answers user questions related to Moroccan Parliament YouTube videos and legal documents.
Ensure your responses are concise and formatted appropriately for API output. Your response should be maximum 100 words.
Your response should be in json format like that : {"text response" : "", "sources" : ""}
""")
    try:
        # (Removed leftover debug print of state['messages'].)
        result = llm_with_tools.invoke([sys_msg] + state["messages"])
        logging.info(f"🤖 Model Output : {result}")
        return {"messages": [result]}
    except Exception as e:
        logging.error(f"LLM invocation failed: {e}")
        return {"messages": [SystemMessage(content="Une erreur est survenue avec le modèle.")]}
| # Tool to set the invocation type based on the user's message | |
| def set_invocation_type(state: MessagesState): | |
| user_msg = state["messages"][-1].content.strip() # Normalize the input | |
| logging.info(f"User input for type update: {user_msg}") | |
| # Set the invocation type based on the presence of 'APICALL' | |
| if 'apicall' in user_msg: | |
| invocation_state.invocation_type = 'API' | |
| else: | |
| invocation_state.invocation_type = 'chatbot' | |
| logging.info(f"Invocation type set to: {invocation_state.invocation_type}") | |
class InvocationState:
    """Mutable module-level holder for the current invocation type.

    ``invocation_type`` stays ``None`` until ``set_invocation_type`` runs,
    after which it holds either 'API' or 'chatbot'.
    """

    def __init__(self) -> None:
        # No type set yet; populated per-run by the graph's first node.
        self.invocation_type = None


# Singleton instance shared by the graph nodes.
invocation_state = InvocationState()
# GRAPH SETUP
builder = StateGraph(MessagesState)
# Nodes: classify the invocation, run the LLM, execute requested tools.
builder.add_node("set_invocation_type", set_invocation_type)
builder.add_node("llm_assistant", assistant)
builder.add_node("tools", ToolNode(tools))
# Edges: every run first sets the invocation type, then reaches the LLM.
# tools_condition routes to "tools" when the LLM emitted tool calls
# (tool results loop back into the LLM), otherwise ends the run.
builder.add_edge(START, "set_invocation_type")
builder.add_edge("set_invocation_type", "llm_assistant")
builder.add_conditional_edges("llm_assistant", tools_condition)
builder.add_edge("tools", "llm_assistant")
rag_graph = builder.compile()