| |
import asyncio
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO

import chainlit as cl
import requests
from chainlit.element import ElementBased
from groq import Groq
from openai import AsyncOpenAI
|
|
| client = AsyncOpenAI(base_url="https://api.groq.com/openai/v1", api_key=os.getenv("GROQ_API_KEY")) |
|
|
| |
| cl.instrument_openai() |
|
|
| settings = { |
| "model": "llama3-8b-8192", |
| "temperature": 0.5, |
| "max_tokens": 500, |
| "top_p": 1, |
| "frequency_penalty": 0, |
| "presence_penalty": 0, |
| } |
|
|
| def extract_urls(text): |
| url_pattern = re.compile(r'(https?://\S+)') |
| return url_pattern.findall(text) |
|
|
| def crawl_url(url): |
| data = { |
| "urls": [url], |
| "include_raw_html": True, |
| "word_count_threshold": 10, |
| "extraction_strategy": "NoExtractionStrategy", |
| "chunking_strategy": "RegexChunking" |
| } |
| response = requests.post("https://crawl4ai.com/crawl", json=data) |
| response_data = response.json() |
| response_data = response_data['results'][0] |
| return response_data['markdown'] |
|
|
| @cl.on_chat_start |
| async def on_chat_start(): |
| cl.user_session.set("session", { |
| "history": [], |
| "context": {} |
| }) |
| await cl.Message( |
| content="Welcome to the chat! How can I assist you today?" |
| ).send() |
|
|
| @cl.on_message |
| async def on_message(message: cl.Message): |
| user_session = cl.user_session.get("session") |
| |
| |
| urls = extract_urls(message.content) |
| |
| |
| futures = [] |
| with ThreadPoolExecutor() as executor: |
| for url in urls: |
| futures.append(executor.submit(crawl_url, url)) |
|
|
| results = [future.result() for future in futures] |
|
|
| for url, result in zip(urls, results): |
| ref_number = f"REF_{len(user_session['context']) + 1}" |
| user_session["context"][ref_number] = { |
| "url": url, |
| "content": result |
| } |
|
|
|
|
| user_session["history"].append({ |
| "role": "user", |
| "content": message.content |
| }) |
|
|
| |
| context_messages = [ |
| f'<appendix ref="{ref}">\n{data["content"]}\n</appendix>' |
| for ref, data in user_session["context"].items() |
| ] |
| if context_messages: |
| system_message = { |
| "role": "system", |
| "content": ( |
| "You are a helpful bot. Use the following context for answering questions. " |
| "Refer to the sources using the REF number in square brackets, e.g., [1], only if the source is given in the appendices below.\n\n" |
| "If the question requires any information from the provided appendices or context, refer to the sources. " |
| "If not, there is no need to add a references section. " |
| "At the end of your response, provide a reference section listing the URLs and their REF numbers only if sources from the appendices were used.\n\n" |
| "\n\n".join(context_messages) |
| ) |
| } |
| else: |
| system_message = { |
| "role": "system", |
| "content": "You are a helpful assistant." |
| } |
|
|
|
|
| msg = cl.Message(content="") |
| await msg.send() |
|
|
| |
| stream = await client.chat.completions.create( |
| messages=[ |
| system_message, |
| *user_session["history"] |
| ], |
| stream=True, |
| **settings |
| ) |
|
|
| assistant_response = "" |
| async for part in stream: |
| if token := part.choices[0].delta.content: |
| assistant_response += token |
| await msg.stream_token(token) |
|
|
| |
| user_session["history"].append({ |
| "role": "assistant", |
| "content": assistant_response |
| }) |
| await msg.update() |
|
|
| |
| reference_section = "\n\nReferences:\n" |
| for ref, data in user_session["context"].items(): |
| reference_section += f"[{ref.split('_')[1]}]: {data['url']}\n" |
|
|
| msg.content += reference_section |
| await msg.update() |
|
|
|
|
| @cl.on_audio_chunk |
| async def on_audio_chunk(chunk: cl.AudioChunk): |
| if chunk.isStart: |
| buffer = BytesIO() |
| |
| buffer.name = f"input_audio.{chunk.mimeType.split('/')[1]}" |
| |
| cl.user_session.set("audio_buffer", buffer) |
| cl.user_session.set("audio_mime_type", chunk.mimeType) |
|
|
| |
| cl.user_session.get("audio_buffer").write(chunk.data) |
|
|
| pass |
|
|
| @cl.step(type="tool") |
| async def speech_to_text(audio_file): |
| cli = Groq() |
| |
| response = await client.audio.transcriptions.create( |
| model="whisper-large-v3", file=audio_file |
| ) |
|
|
| return response.text |
|
|
|
|
| @cl.on_audio_end |
| async def on_audio_end(elements: list[ElementBased]): |
| |
| audio_buffer: BytesIO = cl.user_session.get("audio_buffer") |
| audio_buffer.seek(0) |
| audio_file = audio_buffer.read() |
| audio_mime_type: str = cl.user_session.get("audio_mime_type") |
| |
| start_time = time.time() |
| whisper_input = (audio_buffer.name, audio_file, audio_mime_type) |
| transcription = await speech_to_text(whisper_input) |
| end_time = time.time() |
| print(f"Transcription took {end_time - start_time} seconds") |
| |
| user_msg = cl.Message( |
| author="You", |
| type="user_message", |
| content=transcription |
| ) |
| await user_msg.send() |
| await on_message(user_msg) |
|
|
|
|
| if __name__ == "__main__": |
| from chainlit.cli import run_chainlit |
| run_chainlit(__file__) |
|
|
|
|
|
|