| | |
| | import chainlit as cl |
| | from agent import make_graph |
| |
|
| | from langchain_google_genai import ChatGoogleGenerativeAI |
| | from langchain_core.messages import AIMessageChunk, HumanMessage |
| |
|
| | from chainlit.input_widget import Select, Slider |
| |
|
| | from typing import Optional |
| | import os, uuid, base64 |
| | from dotenv import load_dotenv |
| |
|
| | _ : bool = load_dotenv() |
| |
|
| | |
| | async def process_image(image: cl.Image): |
| | """ |
| | Processes an image file, reads its data, and converts it to a base64 encoded string. |
| | """ |
| | try: |
| | with open(image.path, "rb") as image_file: |
| | image_data = image_file.read() |
| | base64_image = base64.b64encode(image_data).decode("utf-8") |
| | return { |
| | "type": "image_url", |
| | "image_url": { |
| | "url": f"data:image/{image.mime.split('/')[-1]};base64,{base64_image}" |
| | } |
| | } |
| | except Exception as e: |
| | print(f"Error reading image file: {e}") |
| | return {"type": "text", "text": f"Error processing image {image.name}."} |
| |
|
| | |
| | |
| | |
| | @cl.oauth_callback |
| | def oauth_callback( |
| | provider_id: str, |
| | token: str, |
| | raw_user_data: dict[str, str], |
| | default_user: cl.User, |
| | ) -> Optional[cl.User]: |
| |
|
| | return default_user |
| |
|
| | |
| | |
| | |
| | @cl.set_starters |
| | async def set_starters(): |
| | return [ |
| | cl.Starter( |
| | label="LangGraph Agent Creation", |
| | message="Create an Agent in LangGraph which can search the web using Tavily.", |
| | icon="/public/msg_icons/chatbot.png", |
| | ), |
| |
|
| | cl.Starter( |
| | label="Explain MCP", |
| | message="Explain Model Context Protocol (MCP) to a non-tech person.", |
| | icon="/public/msg_icons/usb.png", |
| | ), |
| | cl.Starter( |
| | label="Composio Tools Integration", |
| | message="How can I connect Composio tools to my agent?", |
| | icon="/public/msg_icons/tools.png", |
| | ), |
| |
|
| | ] |
| |
|
| | |
| | |
| | |
| | @cl.set_chat_profiles |
| | async def chat_profile(): |
| | return [ |
| | cl.ChatProfile( |
| | name="Agent Mode", |
| | markdown_description= "Ideal for complex tasks like brainstorming, code generation, and web apps creation." |
| |
|
| | ), |
| | cl.ChatProfile( |
| | name="Chat Mode", |
| | markdown_description="Suited for quick information retrieval and answering questions from the provided documentations." |
| |
|
| | ), |
| | ] |
| |
|
| | |
| | |
| | |
| | @cl.on_chat_start |
| | async def on_chat_start(): |
| | thread_id = f"thread-{uuid.uuid4()}" |
| | |
| | cl.user_session.set("thread_id", thread_id) |
| |
|
| | |
| | settings = await cl.ChatSettings( |
| | [ |
| | Select( |
| | id="model", |
| | label="Gemini - Model", |
| | values=[ |
| | "gemini-2.5-flash", |
| | "gemini-2.5-pro", |
| | "gemini-2.5-flash-lite" |
| | ], |
| | initial_index=0, |
| | ), |
| | Slider( |
| | id="temperature", |
| | label="Temperature", |
| | initial=1, |
| | min=0, |
| | max=2, |
| | step=0.1, |
| | ), |
| | ] |
| | ).send() |
| |
|
| | |
| | model = ChatGoogleGenerativeAI( |
| | model=settings["model"], |
| | api_key=os.getenv("GOOGLE_API_KEY"), |
| | temperature=settings["temperature"] |
| | ) |
| |
|
| | |
| | cl.user_session.set("model", model) |
| | cl.user_session.set("temperature", settings["temperature"]) |
| |
|
| | |
| | |
| | |
| | @cl.on_settings_update |
| | async def on_settings_update(settings: dict): |
| | |
| | cl.user_session.set("model", settings.get("model")) |
| | cl.user_session.set("temperature", settings.get("temperature")) |
| |
|
| | |
| | |
| | |
| | @cl.on_message |
| | async def on_message(message: cl.Message): |
| | thread_id = cl.user_session.get("thread_id") |
| | config = {"configurable": {"thread_id": thread_id}} |
| |
|
| | |
| | model = cl.user_session.get("model") |
| | answer_mode = cl.user_session.get("chat_profile", "Agent Mode") |
| |
|
| | |
| | content = [] |
| |
|
| | |
| | if message.content: |
| | content.append({"type": "text", "text": message.content}) |
| | |
| | |
| | image_elements = [element for element in message.elements if "image" in element.mime] |
| | for image in image_elements: |
| | if image.path: |
| | content.append(await process_image(image)) |
| | else: |
| | print(f"Image {image.name} has no content and no path.") |
| | content.append({"type": "text", "text": f"Image {image.name} could not be processed."}) |
| | |
| | msg = cl.Message(content="") |
| |
|
| | try: |
| | async with make_graph(model, answer_mode) as agent: |
| | async for stream, _ in agent.astream( |
| | {"messages": HumanMessage(content=content)}, |
| | config=config, |
| | stream_mode="messages" |
| | ): |
| |
|
| | if isinstance(stream, AIMessageChunk) and stream.content: |
| | await msg.stream_token(stream.content.replace("```", "\n```")) |
| | await msg.send() |
| |
|
| | except Exception as e: |
| | await cl.Message(content=f"Error during agent invocation: {e}").send() |
| |
|