import os
import base64
import requests
import gradio as gr
from huggingface_hub import InferenceClient
from dataclasses import dataclass
import speech_recognition as sr
import easyocr
from PIL import Image
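
# Minimal message container mirroring the {"role": ..., "content": ...} dict
# shape the chat API expects.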
@dataclass
class ChatMessage:
    role: str
    content: str

    def to_dict(self):
        return {"role": self.role, "content": self.content}
class XylariaChat:
    def __init__(self):
        self.hf_token = os.getenv("HF_TOKEN")
        if not self.hf_token:
            raise ValueError("HuggingFace token not found in environment variables")

        self.client = InferenceClient(
            model="Qwen/QwQ-32B-Preview",
            api_key=self.hf_token
        )

        self.image_api_url = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-large"
        self.image_api_headers = {"Authorization": f"Bearer {self.hf_token}"}

        self.conversation_history = []
        self.persistent_memory = {}
        self.system_prompt = """You are a helpful and harmless assistant. You are Xylaria developed by Sk Md Saad Amin. You should think step-by-step."""

        # EasyOCR reader for simplified Chinese + English, CPU-only.
        self.reader = easyocr.Reader(['ch_sim', 'en'], gpu=False)
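
    # Simple key-value memory that persists across turns; when non-empty it is
    # injected into the prompt as extra system context.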
    def store_information(self, key, value):
        self.persistent_memory[key] = value
        return f"Stored: {key} = {value}"

    def retrieve_information(self, key):
        return self.persistent_memory.get(key, "No information found for this key.")
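
    # Clears the chat history and memory, and rebuilds the API client so a
    # fresh session starts from a clean state.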
    def reset_conversation(self):
        self.conversation_history = []
        self.persistent_memory.clear()
        try:
            self.client = InferenceClient(
                model="Qwen/QwQ-32B-Preview",
                api_key=self.hf_token
            )
        except Exception as e:
            print(f"Error resetting API client: {e}")
        return None
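
    # Captions an image with BLIP through the Inference API. Accepts a file
    # path, a base64 or data-URI string, or a file-like object, and posts the
    # raw bytes to the endpoint.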
    def caption_image(self, image):
        try:
            if isinstance(image, str) and os.path.isfile(image):
                with open(image, "rb") as f:
                    data = f.read()
            elif isinstance(image, str):
                if image.startswith('data:image'):
                    image = image.split(',')[1]
                data = base64.b64decode(image)
            else:
                data = image.read()

            response = requests.post(
                self.image_api_url,
                headers=self.image_api_headers,
                data=data
            )
            if response.status_code == 200:
                caption = response.json()[0].get('generated_text', 'No caption generated')
                return caption
            else:
                return f"Error captioning image: {response.status_code} - {response.text}"
        except Exception as e:
            return f"Error processing image: {str(e)}"
    def perform_math_ocr(self, image_path):
        try:
            # Opening with PIL first validates that the path is a readable image.
            Image.open(image_path)
            result = self.reader.readtext(image_path)
            text = ' '.join([item[1] for item in result])
            return text.strip()
        except Exception as e:
            return f"Error during Math OCR: {e}"
    def get_response(self, user_input, image=None):
        try:
            messages = []
            messages.append(ChatMessage(
                role="system",
                content=self.system_prompt
            ).to_dict())

            if self.persistent_memory:
                memory_context = "Remembered Information:\n" + "\n".join(
                    [f"{k}: {v}" for k, v in self.persistent_memory.items()]
                )
                messages.append(ChatMessage(
                    role="system",
                    content=memory_context
                ).to_dict())

            for msg in self.conversation_history:
                messages.append(msg)

            if image:
                image_caption = self.caption_image(image)
                user_input = f"description of an image: {image_caption}\n\nUser's message about it: {user_input}"

            messages.append(ChatMessage(
                role="user",
                content=user_input
            ).to_dict())

            # Rough token budget: word count is a cheap proxy for token count.
            # Clamp to at least 1 so very long inputs cannot go negative.
            input_tokens = sum(len(msg['content'].split()) for msg in messages)
            max_new_tokens = 16384 - input_tokens - 50
            max_new_tokens = max(1, min(max_new_tokens, 10020))

            stream = self.client.chat_completion(
                messages=messages,
                model="Qwen/QwQ-32B-Preview",
                temperature=0.7,
                max_tokens=max_new_tokens,
                top_p=0.9,
                stream=True
            )
            return stream
        except Exception as e:
            print(f"Detailed error in get_response: {e}")
            return f"Error generating response: {str(e)}"
    def messages_to_prompt(self, messages):
        prompt = ""
        for msg in messages:
            if msg["role"] == "system":
                prompt += f"<|system|>\n{msg['content']}<|end|>\n"
            elif msg["role"] == "user":
                prompt += f"<|user|>\n{msg['content']}<|end|>\n"
            elif msg["role"] == "assistant":
                prompt += f"<|assistant|>\n{msg['content']}<|end|>\n"
        prompt += "<|assistant|>\n"
        return prompt
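
    # Transcribes a recorded audio file (WAV/AIFF/FLAC) with the free Google
    # Web Speech API exposed by the speech_recognition package.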
    def recognize_speech(self, audio_file):
        recognizer = sr.Recognizer()
        try:
            with sr.AudioFile(audio_file) as source:
                audio_data = recognizer.record(source)
                text = recognizer.recognize_google(audio_data)
                return text
        except sr.UnknownValueError:
            return "Could not understand audio"
        except sr.RequestError:
            return "Could not request results from Google Speech Recognition service"
    def create_interface(self):
        def streaming_response(message, chat_history, image_filepath, math_ocr_image_path, audio_file):
            if audio_file:
                voice_message = self.recognize_speech(audio_file)
                # recognize_speech reports failures as "Could not ..." strings,
                # so only replace the typed message when transcription succeeded.
                if not voice_message.startswith("Could not"):
                    message = voice_message
| ocr_text = "" | |
| if math_ocr_image_path: | |
| ocr_text = self.perform_math_ocr(math_ocr_image_path) | |
| if ocr_text.startswith("Error"): | |
| updated_history = chat_history + [[{"role": "user", "content": message}, {"role": "assistant", "content": ocr_text}]] | |
| yield "", updated_history, None, None, None | |
| return | |
| elif len(ocr_text) > 500: | |
| ocr_text = "OCR output is too large to be processed." | |
| updated_history = chat_history + [[{"role": "user", "content": message}, {"role": "assistant", "content": ocr_text}]] | |
| yield "", updated_history, None, None, None | |
| return | |
| else: | |
| message = f"Math OCR Result: {ocr_text}\n\nUser's message: {message}" | |

            if image_filepath:
                response_stream = self.get_response(message, image_filepath)
            else:
                response_stream = self.get_response(message)

            # get_response returns a plain string on error instead of a stream.
            if isinstance(response_stream, str):
                updated_history = chat_history + [{"role": "user", "content": message}, {"role": "assistant", "content": response_stream}]
                yield "", updated_history, None, None, None
                return
| full_response = "" | |
| updated_history = chat_history + [[{"role": "user", "content": message}, {"role": "assistant", "content": ""}]] | |
| try: | |
| for chunk in response_stream: | |
| if chunk.choices and chunk.choices[0].delta and chunk.choices[0].delta.content: | |
| chunk_content = chunk.choices[0].delta.content | |
| full_response += chunk_content | |
| updated_history[-1][1]["content"] = full_response | |
| yield "", updated_history, None, None, None | |
| except Exception as e: | |
| print(f"Streaming error: {e}") | |
| updated_history[-1][1]["content"] = f"Error during response: {e}" | |
| yield "", updated_history, None, None, None | |
| return | |
| self.conversation_history.append(ChatMessage(role="user", content=message).to_dict()) | |
| self.conversation_history.append(ChatMessage(role="assistant", content=full_response).to_dict()) | |
| if len(self.conversation_history) > 10: | |
| self.conversation_history = self.conversation_history[-10:] | |
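
        # Custom styling: Inter font throughout, a fade-in animation for chat
        # messages, and a max-height transition for the image accordion.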
| custom_css = """ | |
| @import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap'); | |
| body, .gradio-container { | |
| font-family: 'Inter', sans-serif !important; | |
| } | |
| .chatbot-container .message { | |
| font-family: 'Inter', sans-serif !important; | |
| } | |
| .gradio-container input, | |
| .gradio-container textarea, | |
| .gradio-container button { | |
| font-family: 'Inter', sans-serif !important; | |
| } | |
| .image-container { | |
| display: flex; | |
| gap: 10px; | |
| margin-bottom: 10px; | |
| } | |
| .image-upload { | |
| border: 1px solid #ccc; | |
| border-radius: 8px; | |
| padding: 10px; | |
| background-color: #f8f8f8; | |
| } | |
| .image-preview { | |
| max-width: 200px; | |
| max-height: 200px; | |
| border-radius: 8px; | |
| } | |
| .clear-button { | |
| display: none; | |
| } | |
| .chatbot-container .message { | |
| opacity: 0; | |
| animation: fadeIn 0.5s ease-in-out forwards; | |
| } | |
| @keyframes fadeIn { | |
| from { | |
| opacity: 0; | |
| transform: translateY(20px); | |
| } | |
| to { | |
| opacity: 1; | |
| transform: translateY(0); | |
| } | |
| } | |
| .gradio-accordion { | |
| overflow: hidden; | |
| transition: max-height 0.3s ease-in-out; | |
| max-height: 0; | |
| } | |
| .gradio-accordion.open { | |
| max-height: 500px; | |
| } | |
| """ | |
        with gr.Blocks(theme='soft', css=custom_css) as demo:
            with gr.Column():
                chatbot = gr.Chatbot(
                    label="Xylaria 1.5 Senoa (EXPERIMENTAL)",
                    height=500,
                    show_copy_button=True,
                    type='messages'
                )

                with gr.Accordion("Image Input", open=False) as accordion:
                    with gr.Row(elem_classes="image-container"):
                        with gr.Column(elem_classes="image-upload"):
                            img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image",
                                elem_classes="image-preview"
                            )
                        with gr.Column(elem_classes="image-upload"):
                            math_ocr_img = gr.Image(
                                sources=["upload", "webcam"],
                                type="filepath",
                                label="Upload Image for Math OCR",
                                elem_classes="image-preview"
                            )

                with gr.Row():
                    with gr.Column(scale=4):
                        txt = gr.Textbox(
                            show_label=False,
                            placeholder="Type your message...",
                            container=False
                        )
                    with gr.Column(scale=1):
                        audio_input = gr.Audio(
                            sources=["microphone"],
                            type="filepath",
                            label="Voice Input"
                        )
                    btn = gr.Button("Send", scale=1)
                with gr.Row():
                    clear = gr.Button("Clear Conversation")
                    clear_memory = gr.Button("Clear Memory")
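
            # Both the Send button and Enter in the textbox trigger the
            # streaming handler; the yielded outputs clear the text, image,
            # and audio inputs after each turn.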
            btn.click(
                fn=streaming_response,
                inputs=[txt, chatbot, img, math_ocr_img, audio_input],
                outputs=[txt, chatbot, img, math_ocr_img, audio_input]
            )
            txt.submit(
                fn=streaming_response,
                inputs=[txt, chatbot, img, math_ocr_img, audio_input],
                outputs=[txt, chatbot, img, math_ocr_img, audio_input]
            )
            clear.click(
                fn=lambda: None,
                inputs=None,
                outputs=[chatbot],
                queue=False
            )
            clear_memory.click(
                fn=self.reset_conversation,
                inputs=None,
                outputs=[chatbot],
                queue=False
            )
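
            # Toggle the accordion's "open" class on click so the CSS
            # max-height transition animates it. (Gradio 4+ takes `js`,
            # not `_js`, for inline JavaScript.)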
            demo.load(None, None, None, js="""
            () => {
                const accordion = document.querySelector(".gradio-accordion");
                if (accordion) {
                    const accordionHeader = accordion.querySelector(".label-wrap");
                    accordionHeader.addEventListener("click", () => {
                        accordion.classList.toggle("open");
                    });
                }
            }
            """)
            demo.load(self.reset_conversation, None, None)

        return demo


def main():
    chat = XylariaChat()
    interface = chat.create_interface()
    interface.launch(
        share=False,
        debug=True
    )


if __name__ == "__main__":
    main()