import streamlit as st
import os
from typing import Iterator
from PyPDF2 import PdfReader
import base64
from together import Together


API_KEY = os.environ.get("TOGETHER_API_KEY")
if not API_KEY:
    raise ValueError("API key is missing! Make sure TOGETHER_API_KEY is set in the Secrets.")
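# On Streamlit Community Cloud the key could also be read from st.secrets
# (st.secrets behaves like a mapping), e.g.:
#     API_KEY = st.secrets.get("TOGETHER_API_KEY", os.environ.get("TOGETHER_API_KEY"))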


@st.cache_resource
def get_client():
    """Create the Together client once and reuse it across reruns."""
    return Together(api_key=API_KEY)
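# st.cache_resource keeps a single client instance alive across reruns and
# sessions; unlike st.cache_data it does not try to serialize the object,
# which suits a stateful network client.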


def process_file(file) -> str:
    """Process an uploaded file and return its content.

    PDFs are reduced to their extractable text, images are returned as a
    base64 string for the vision model, and anything else is decoded as
    UTF-8 text.
    """
    if file is None:
        return ""

    try:
        if file.type == "application/pdf":
            text = ""
            pdf_reader = PdfReader(file)
            for page in pdf_reader.pages:
                page_text = page.extract_text()
                if page_text:
                    text += page_text + "\n"
            return text
        elif file.type.startswith("image/"):
            # Base64-encode the raw bytes so they can be embedded in a
            # data URL for the vision model.
            return base64.b64encode(file.getvalue()).decode("utf-8")
        else:
            return file.getvalue().decode("utf-8")
    except Exception as e:
        return f"Error processing file: {str(e)}"


def generate_response(
    message: str,
    history: list[tuple[str, str]],
    system_message: str,
    max_tokens: int,
    temperature: float,
    top_p: float,
    files=None,
) -> Iterator[str]:
    """Stream a reply, routing to the vision model when an image is attached."""
    client = get_client()

    has_images = False
    formatted_content = message
    image_content = None
    image_mime_type = None

    if files:
        for file in files:
            content = process_file(file)
            if file.type.startswith("image/"):
                # If several images are uploaded, only the last one is kept.
                has_images = True
                image_content = content
                image_mime_type = file.type
            else:
                formatted_content += f"\n\nContext:\n{file.name}\nContents:\n{content}"

    # Shared conversation scaffold: system prompt plus prior turns.
    messages = [{"role": "system", "content": system_message}]
    for user_msg, assistant_msg in history:
        messages.append({"role": "user", "content": user_msg})
        messages.append({"role": "assistant", "content": assistant_msg})

    try:
        if has_images:
            # The vision call sends only the current turn (no system prompt
            # or prior history): the text plus the image as a base64 data URL.
            vision_messages = [{
                "role": "user",
                "content": [
                    {"type": "text", "text": formatted_content},
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:{image_mime_type};base64,{image_content}",
                        },
                    },
                ],
            }]

            stream = client.chat.completions.create(
                model="meta-llama/Llama-3.2-11B-Vision-Instruct-Turbo",
                messages=vision_messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                stream=True,
            )
        else:
            messages.append({"role": "user", "content": formatted_content})

            stream = client.chat.completions.create(
                model="deepseek-ai/DeepSeek-R1",
                messages=messages,
                max_tokens=max_tokens,
                temperature=temperature,
                top_p=top_p,
                stream=True,
            )

        for chunk in stream:
            # Some chunks carry no text (e.g. the final stop chunk), so guard
            # against empty choices and None deltas before yielding.
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    except Exception as e:
        yield f"Error: {str(e)}"


def main():
    st.set_page_config(page_title="DeepSeek Chat", page_icon="💭", layout="wide")

    if "messages" not in st.session_state:
        st.session_state.messages = []

    st.title("DeepSeek/Llama Vision Chat with File Upload")
    st.markdown("Chat with the DeepSeek AI model. You can optionally upload files for the model to analyze.")
    st.markdown("Feel free to upload images too; in that case, Llama Vision will be used.")

    with st.sidebar:
        st.header("Settings")
        system_message = st.text_area(
            "System Message",
            value="You are a friendly Chatbot.",
            height=100,
        )
        max_tokens = st.slider(
            "Max Tokens",
            min_value=1,
            max_value=8192,
            value=8192,
            step=1,
        )
        # The default must lie within [min_value, max_value], otherwise
        # Streamlit raises an exception, so the range starts at 0.0.
        temperature = st.slider(
            "Temperature",
            min_value=0.0,
            max_value=4.0,
            value=0.0,
            step=0.1,
        )
        top_p = st.slider(
            "Top-p (nucleus sampling)",
            min_value=0.1,
            max_value=1.0,
            value=0.95,
            step=0.05,
        )
        uploaded_files = st.file_uploader(
            "Upload File (optional)",
            type=['txt', 'py', 'md', 'swift', 'java', 'js', 'ts', 'rb', 'go',
                  'php', 'c', 'cpp', 'h', 'hpp', 'cs', 'html', 'css', 'kt', 'svelte',
                  'pdf', 'png', 'jpg', 'jpeg'],
            accept_multiple_files=True,
        )
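
    # With accept_multiple_files=True, st.file_uploader returns a list of
    # UploadedFile objects (empty when nothing is uploaded), so it can be
    # passed straight through as the `files` argument of generate_response.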

    # Replay the conversation so far.
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.write(message["content"])

    if prompt := st.chat_input("What would you like to know?"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.write(prompt)

        with st.chat_message("assistant"):
            response_placeholder = st.empty()
            full_response = ""

            # Rebuild (user, assistant) pairs from the flat message list; the
            # just-appended prompt has no partner yet, so zip() drops it.
            history = [(msg["content"], next_msg["content"])
                       for msg, next_msg in zip(st.session_state.messages[::2],
                                                st.session_state.messages[1::2])]

            for response_chunk in generate_response(
                prompt,
                history,
                system_message,
                max_tokens,
                temperature,
                top_p,
                uploaded_files,
            ):
                full_response += response_chunk
                response_placeholder.markdown(full_response + "▌")

            response_placeholder.markdown(full_response)

        st.session_state.messages.append({"role": "assistant", "content": full_response})


if __name__ == "__main__":
    main()
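
# To run the app locally (assuming this file is saved as app.py):
#     streamlit run app.py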