Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import asyncio | |
| import streamlit as st | |
| from html import escape | |
| import edge_tts | |
| from groq import Groq | |
| from langchain_groq import ChatGroq | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
# Load API key from environment
# NOTE: os.getenv returns None when the variable is unset; Groq()/ChatGroq will
# then fail at request time rather than here.
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
| # ββ Core Bot Logic βββββββββββββββββββββββββββββ | |
class CodeAssistantBot:
    """LLM-backed coding assistant.

    Wraps a Groq-hosted chat model with three prompt templates:
    code analysis, conversation summarization, and voice narration.
    """

    def __init__(self):
        # NOTE(review): this raw Groq client is never used in this file; kept
        # for backward compatibility with any external callers — confirm
        # before removing.
        self.client = Groq(api_key=GROQ_API_KEY)
        self.model = ChatGroq(model="llama-3.3-70b-versatile", temperature=0.6)
        # One shared parser: StrOutputParser is stateless, so building it per
        # call (as before) was wasted work.
        self._parser = StrOutputParser()
        self.analysis_prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a skilled coding assistant. Keep answers relevant and concise."),
            ("user", "Code: {code}\nOutput: {output}\nError: {error}\n"
                     "Summary: {summary}\nRecent: {recent}\nQuestion: {question}")
        ])
        self.summary_prompt = ChatPromptTemplate.from_messages([
            ("system", "Summarize key technical points from the conversation so far."),
            ("user", "Conversation: {conversation}")
        ])
        self.voice_prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a friendly narrator. Explain the answer clearly and casually."),
            ("user", "Code: {code}\nOutput: {output}\nError: {error}\n"
                     "Conversation so far: {summary}\nAnswer to explain: {answer}")
        ])

    def analyze_code(self, code, output, error, question, summary="", history=None):
        """Answer *question* about the given code/output/error.

        `summary` carries condensed older context; only the last four turns of
        `history` (list of (question, answer) pairs) are inlined verbatim.
        Returns the model's answer as a plain string.
        """
        recent = "\n".join(f"User: {q}\nBot: {a}" for q, a in (history or [])[-4:])
        chain = self.analysis_prompt | self.model | self._parser
        return chain.invoke({
            'code': code, 'output': output, 'error': error,
            'summary': summary, 'recent': recent, 'question': question
        })

    def narrate_response(self, code, output, error, answer, summary=""):
        """Rewrite *answer* as a casual spoken-style explanation (plain string)."""
        chain = self.voice_prompt | self.model | self._parser
        return chain.invoke({
            'code': code, 'output': output, 'error': error,
            'summary': summary, 'answer': answer
        })
| # ββ Text to Speech βββββββββββββββββββββββββββββ | |
async def text_to_speech(text, filename):
    """Synthesize *text* into an MP3 saved at *filename* via Edge TTS."""
    speaker = edge_tts.Communicate(text, "fr-FR-VivienneMultilingualNeural")
    await speaker.save(filename)
| # ββ Chat UI Logic ββββββββββββββββββββββββββββββ | |
def render_chatbot(code, output, error):
    """Render the chat UI: styles, input row, history, narration, auto-scroll.

    Parameters
    ----------
    code, output, error : str
        The user's code, its stdout, and its stderr; passed verbatim into the
        LLM prompts.
    """
    st.markdown("""
    <style>
    .chat-container {
        max-height: 60vh;
        overflow-y: auto;
        padding: 1rem 0.5rem 1rem 1rem;
        border: 1px solid #ddd;
        border-radius: 8px;
        background-color: #f9f9f9;
    }
    .chat-message {
        margin-bottom: 1rem;
        word-wrap: break-word;
    }
    .user-message {
        font-weight: bold;
        color: #1a73e8;
    }
    .bot-message pre {
        background-color: #f0f0f0;
        padding: 0.5rem;
        border-radius: 5px;
        overflow-x: auto;
    }
    </style>
    """, unsafe_allow_html=True)

    # Session setup
    st.session_state.setdefault('conversation', [])
    st.session_state.setdefault('chat_summary', "")
    st.session_state.setdefault('chat_display_count', 5)
    st.session_state.setdefault('narrated_audio', {})

    def _format_response(text):
        """Render a ```-fenced answer as HTML: code in <pre><code>, rest escaped.

        Hoisted out of the display loop — the original redefined this function
        on every message iteration.
        """
        parts = text.split("```")
        chunks = []
        for i, part in enumerate(parts):
            if i % 2 == 1:  # odd segments are inside a code fence
                lines = part.splitlines()
                # Drop a leading language hint like ```python (purely
                # alphabetic only, so e.g. "python3" is kept — TODO confirm).
                if lines and lines[0].isalpha():
                    lines = lines[1:]
                # BUGFIX: the original joined with a literal backslash-n
                # ("\\n"), collapsing real newlines in rendered code blocks.
                joined = "\n".join(lines)
                chunks.append(f'<pre><code>{escape(joined)}</code></pre>')
            else:
                chunks.append(escape(part))
        return "".join(chunks)

    # Input row
    col1, col2 = st.columns([4, 1])
    with col1:
        question = st.text_input("Ask something about your code...", key="chat_input")
    with col2:
        send = st.button("π")

    # Bot response
    if send and question:
        bot = CodeAssistantBot()
        history = st.session_state.conversation[-4:]
        summary = st.session_state.chat_summary
        answer = bot.analyze_code(code, output, error, question, summary, history)
        st.session_state.conversation.append((question, answer))
        st.session_state.chat_display_count = 5
        # Refresh the rolling summary once the chat is long enough.
        if len(st.session_state.conversation) >= 3:
            try:
                full_chat = "\n".join(
                    f"User: {q}\nBot: {a}"
                    for q, a in st.session_state.conversation[-10:]
                )
                summarizer = bot.summary_prompt | bot.model | StrOutputParser()
                st.session_state.chat_summary = summarizer.invoke({'conversation': full_chat})
            except Exception:
                # Deliberate best-effort: a failed summary must not break the
                # chat. Narrowed from a bare `except:` so KeyboardInterrupt /
                # SystemExit still propagate.
                pass

    # Display messages, newest first, capped at chat_display_count.
    visible = list(reversed(st.session_state.conversation[-st.session_state.chat_display_count:]))
    for idx, (q, a) in enumerate(visible):
        st.markdown(f'<div class="chat-message user-message">{escape(q)}</div>', unsafe_allow_html=True)
        st.markdown(f'<div class="chat-message bot-message">{_format_response(a)}</div>', unsafe_allow_html=True)

        # Narration: cache generated audio per (question, answer) pair so a
        # rerun doesn't re-synthesize.
        audio_file = st.session_state.narrated_audio.get((q, a))
        if not audio_file:
            if st.button("π Narrate", key=f"narrate_{idx}"):
                status = st.empty()
                status.info("π§ Generating narration...")
                bot = CodeAssistantBot()
                narration = bot.narrate_response(code, output, error, a, st.session_state.chat_summary)
                status.info("ποΈ Converting to audio...")
                audio_file = f"audio_{uuid.uuid4().hex}.mp3"
                asyncio.run(text_to_speech(narration, audio_file))
                st.session_state.narrated_audio[(q, a)] = audio_file
                status.success("π Narration ready!")
                st.audio(audio_file, format="audio/mp3", autoplay=True)
        else:
            st.audio(audio_file, format="audio/mp3", autoplay=False)

    if len(visible) < len(st.session_state.conversation):
        if st.button("π½ Show more"):
            st.session_state.chat_display_count += 5
            st.rerun()

    # Auto-scroll & pause others on audio play.
    # NOTE(review): .chat-container is styled above and targeted below, but no
    # rendered element carries that class, so the scroll branch appears to be
    # a no-op — confirm intent before relying on it.
    st.markdown("""
    <script>
    const container = window.parent.document.querySelector('.chat-container');
    if (container) container.scrollTop = container.scrollHeight;
    document.querySelectorAll('audio').forEach(audio => {
        audio.addEventListener('play', function () {
            document.querySelectorAll('audio').forEach(a => {
                if (a !== this) a.pause();
            });
        });
    });
    </script>
    """, unsafe_allow_html=True)