import re
import urllib.parse

import cohere
import numpy as np
import pypdfium2 as pdfium
import streamlit as st
from langchain_text_splitters import RecursiveCharacterTextSplitter
from streamlit.components.v1 import html
from streamlit_extras.stylable_container import stylable_container

st.title("Cohere Chat UI")

# ---------------------------------------------------------------------------
# API key gate: nothing below runs until a key is stored in the session.
# ---------------------------------------------------------------------------
if "api_key" not in st.session_state:
    api_key = st.text_input("Enter your API Key", type="password")
    if api_key:
        if api_key.isascii():
            st.session_state.api_key = api_key
            client = cohere.ClientV2(api_key=api_key)
            st.rerun()
        else:
            st.warning("Please enter your API key correctly.")
            st.stop()
    else:
        st.warning("Please enter your API key to use the app. You can obtain your API key from here: https://dashboard.cohere.com/api-keys")
        st.stop()
else:
    client = cohere.ClientV2(api_key=st.session_state.api_key)

# ---------------------------------------------------------------------------
# Session defaults
# ---------------------------------------------------------------------------
if "messages" not in st.session_state:
    st.session_state.messages = []

if "rag_file_key" not in st.session_state:
    st.session_state.rag_file_key = None

if "rag_embedded" not in st.session_state:
    st.session_state.rag_embedded = False

text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=512,
    chunk_overlap=50,
    length_function=len,
    is_separator_regex=False,
)

# Session keys that hold the uploaded document's derived RAG data.
_RAG_STATE_KEYS = ("rag_text", "rag_chunks", "rag_embeddings")


def _clear_rag_state():
    """Remove the RAG text, chunks and embeddings from the session, if present."""
    for key in _RAG_STATE_KEYS:
        if key in st.session_state:
            del st.session_state[key]


def _escape_for_markdown(text):
    """Return *text* prepared for ``st.markdown``.

    Newlines get a leading space and angle brackets are backslash-escaped.
    """
    return text.replace("\n", " \n").replace("<", "\\<").replace(">", "\\>")


def batch_embed(texts, batch_size=96):
    """Embed *texts* as search documents, *batch_size* texts per request.

    Uses the module-level ``client`` and the ``embed_model`` chosen in the
    sidebar. Returns a flat list with one float embedding per input text.
    """
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        response = client.embed(
            texts=batch,
            model=embed_model,
            input_type="search_document",
            embedding_types=['float']
        )
        all_embeddings.extend(response.embeddings.float)
    return all_embeddings


def cosine_similarity(a, b):
    """Return the cosine similarity of vectors *a* and *b*.

    Returns 0.0 when either vector has zero norm, so a degenerate embedding
    yields a neutral score instead of NaN (which would sort to the top of the
    descending ranking used during retrieval).
    """
    denominator = np.linalg.norm(a) * np.linalg.norm(b)
    if denominator == 0:
        return 0.0
    return np.dot(a, b) / denominator


def get_ai_response(chat_history):
    """Stream an assistant reply for *chat_history* and return the full text.

    *chat_history* is a list of ``{"role", "content"}`` dicts whose last entry
    is used as the retrieval query; the list is not modified. Generation
    settings (``model``, ``preamble``, ``temperature``, ``k``, ``p``, penalty
    and RAG model names) are read from the module-level sidebar widgets.

    While streaming, ``st.session_state.is_streaming`` is True and the partial
    text accumulates in ``st.session_state.response``; the top-level script
    checks that flag on the next run to keep a partial reply.
    """
    st.session_state.is_streaming = True
    st.session_state.response = ""

    with st.chat_message("assistant", avatar=st.session_state.assistant_avatar):
        # RAG: embed the query, take the 10 nearest chunks, rerank to the top 3.
        documents = None
        if st.session_state.get("rag_chunks") and st.session_state.get("rag_embeddings"):
            chunks = st.session_state.rag_chunks
            embeddings = st.session_state.rag_embeddings

            query = chat_history[-1]["content"]
            query_embedding = client.embed(
                texts=[query],
                model=embed_model,
                input_type="search_query",
                embedding_types=['float']
            ).embeddings.float[0]

            similarities = [cosine_similarity(query_embedding, chunk) for chunk in embeddings]
            top_indices = np.argsort(similarities)[::-1][:10]
            top_chunks_after_retrieval = [chunks[i] for i in top_indices]

            rerank_response = client.rerank(
                query=query,
                documents=top_chunks_after_retrieval,
                top_n=3,
                model=rerank_model
            )
            top_chunks_after_rerank = [
                top_chunks_after_retrieval[result.index]
                for result in rerank_response.results
            ]
            documents = [
                {"data": {"title": f"chunk {i}", "snippet": chunk}}
                for i, chunk in enumerate(top_chunks_after_rerank)
            ]

        penalty_kwargs = {
            "frequency_penalty" if penalty_type == "Frequency Penalty" else "presence_penalty": penalty_value
        }

        # Build a new list so the caller's history is left untouched.
        messages = [{"role": "system", "content": preamble}, *chat_history]

        stream_kwargs = {
            "messages": messages,
            "model": model,
            "temperature": temperature,
            "k": k,
            "p": p,
            **penalty_kwargs
        }
        # Attach documents only when retrieval actually produced them; keying
        # this on "rag_text" raised NameError for uploads that yield no chunks.
        if documents is not None:
            stream_kwargs["documents"] = documents
            stream_kwargs["citation_options"] = {"mode": "OFF"}
        elif model in ["command-r-08-2024", "command-r-plus-08-2024"]:
            stream_kwargs["safety_mode"] = "OFF"

        stream = client.chat_stream(**stream_kwargs)

        placeholder = st.empty()
        with stylable_container(
            key="stop_generating",
            css_styles=""" button { position: fixed; bottom: 100px; left: 50%; transform: translateX(-50%); z-index: 1; } """,
        ):
            # The return value is not read here; the click's effect is handled
            # by the is_streaming check in the top-level script.
            st.button("Stop generating")

        shown_message = ""
        for chunk in stream:
            if chunk.type == "content-delta":
                content = chunk.delta.message.content.text
                st.session_state.response += content
                shown_message += _escape_for_markdown(content)
                placeholder.markdown(shown_message)

    st.session_state.is_streaming = False
    return st.session_state.response


def normalize_code_block(match):
    """Undo the markdown escaping inside a fenced code block match."""
    return match.group(0).replace(" \n", "\n")\
                         .replace("\\<", "<")\
                         .replace("\\>", ">")


def normalize_inline(match):
    """Undo the angle-bracket escaping inside an inline code span match."""
    return match.group(0).replace("\\<", "<")\
                         .replace("\\>", ">")


code_block_pattern = r"(```.*?```)"
inline_pattern = r"`([^`\n]+?)`"


def display_messages():
    """Render every stored message with its Edit/Delete/Copy/Retry controls.

    Reads ``st.session_state.messages`` plus the avatar and delete-mode
    settings written by the sidebar. Edit, Delete and Retry update the session
    state and trigger a rerun.
    """
    for i, message in enumerate(st.session_state.messages):
        # Widget keys include the message count, so they change whenever the
        # history grows or shrinks.
        message_count = len(st.session_state.messages)
        avatar = st.session_state.user_avatar if message["role"] == "user" else st.session_state.assistant_avatar
        with st.chat_message(message["role"], avatar=avatar):
            shown_message = _escape_for_markdown(message["content"])

            if "```" in shown_message:
                # Replace " \n" with "\n" within code blocks
                shown_message = re.sub(code_block_pattern, normalize_code_block, shown_message, flags=re.DOTALL)

            if "`" in shown_message:
                shown_message = re.sub(inline_pattern, normalize_inline, shown_message)

            st.markdown(shown_message)

            col1, col2, col3, col4 = st.columns([1, 1, 1, 1])
            with col1:
                if st.button("Edit", key=f"edit_{i}_{message_count}"):
                    st.session_state.edit_index = i
                    st.rerun()
            with col2:
                if st.session_state.is_delete_mode and st.button("Delete", key=f"delete_{i}_{message_count}"):
                    del st.session_state.messages[i]
                    st.rerun()
            with col3:
                text_to_copy = message["content"]
                # Encode the string to escape
                text_to_copy_escaped = urllib.parse.quote(text_to_copy)
                # NOTE(review): this template is blank in this revision, so the
                # encoded text is never referenced - confirm the copy-button markup.
                copy_button_html = f""" """
                html(copy_button_html, height=50)

            # Retry is offered only on a trailing assistant message.
            if i == message_count - 1 and message["role"] == "assistant":
                with col4:
                    if st.button("Retry", key=f"retry_{i}_{message_count}"):
                        if len(st.session_state.messages) >= 2:
                            del st.session_state.messages[-1]
                            st.session_state.retry_flag = True
                            st.rerun()

            if "edit_index" in st.session_state and st.session_state.edit_index == i:
                with st.form(key=f"edit_form_{i}_{message_count}"):
                    new_content = st.text_area("Edit message", height=200, value=st.session_state.messages[i]["content"])
                    col1, col2 = st.columns([1, 1])
                    with col1:
                        if st.form_submit_button("Save"):
                            st.session_state.messages[i]["content"] = new_content
                            del st.session_state.edit_index
                            st.rerun()
                    with col2:
                        if st.form_submit_button("Cancel"):
                            del st.session_state.edit_index
                            st.rerun()


# Add sidebar for advanced settings
with st.sidebar:
    settings_tab, appearance_tab = st.tabs(["Settings", "Appearance"])

    with settings_tab:
        st.markdown("Help (Japanese): https://rentry.org/9hgneofz")

        # Copy Conversation History button
        # NOTE(review): both role branches emit the same bare "\n" marker here,
        # so user and assistant turns are indistinguishable in the exported
        # log - confirm the intended role markers.
        log_text = ""
        for message in st.session_state.messages:
            if message["role"] == "user":
                log_text += "\n"
                log_text += message["content"] + "\n\n"
            else:
                log_text += "\n"
                log_text += message["content"] + "\n\n"
        log_text = log_text.rstrip("\n")

        # Encode the string to escape
        log_text_escaped = urllib.parse.quote(log_text)
        # NOTE(review): blank template; the encoded log is not referenced - confirm.
        copy_log_button_html = f""" """
        html(copy_log_button_html, height=50)

        if not st.session_state.get("is_history_shown"):
            if st.button("Display History as Code Block"):
                st.session_state.is_history_shown = True
                st.rerun()
        else:
            if st.button("Hide History"):
                st.session_state.is_history_shown = False
                st.rerun()
            st.code(log_text)

        st.session_state.is_delete_mode = st.toggle("Enable Delete button")

        st.header("Advanced Settings")
        model = st.selectbox("Model", options=["command-a-03-2025", "command-r-plus", "command-r", "command-r-plus-08-2024", "command-r-08-2024", ], index=0)
        preamble = st.text_area("Preamble", height=200)
        temperature = st.slider("Temperature", min_value=0.0, max_value=1.0, value=0.3, step=0.1)
        k = st.slider("Top-K", min_value=0, max_value=500, value=0, step=1)
        p = st.slider("Top-P", min_value=0.01, max_value=0.99, value=0.75, step=0.01)
        penalty_type = st.selectbox("Penalty Type", options=["Frequency Penalty", "Presence Penalty"])
        penalty_value = st.slider("Penalty Value", min_value=0.0, max_value=1.0, value=0.0, step=0.1)

        st.header("RAG")
        st.markdown("Select the model and encoding before uploading the file.")
        rag_model = st.selectbox("RAG Model", options=["Multilingual", "English"], index=0)
        file_encoding = st.selectbox("Encoding", options=["utf_8", "shift_jis"], index=0)
        st.session_state.rag_file = st.file_uploader("Choose a txt or pdf file", type=["txt", "pdf"], key="rag_file_uploader")

        if rag_model == "Multilingual":
            embed_model = "embed-multilingual-v3.0"
            rerank_model = "rerank-multilingual-v3.0"
        else:
            embed_model = "embed-english-v3.0"
            rerank_model = "rerank-english-v3.0"

        if st.session_state.rag_file is not None:
            # A different upload invalidates everything derived from the old one.
            if st.session_state.rag_file_key != st.session_state.rag_file:
                st.session_state.rag_file_key = st.session_state.rag_file
                st.session_state.rag_embedded = False
                _clear_rag_state()

            if not st.session_state.rag_embedded:
                if st.session_state.rag_file.type == "application/pdf":
                    pdf = pdfium.PdfDocument(st.session_state.rag_file)
                    st.session_state.rag_text = ""
                    for page in pdf:
                        textpage = page.get_textpage()
                        st.session_state.rag_text += textpage.get_text_range()
                else:
                    st.session_state.rag_text = st.session_state.rag_file.read().decode(file_encoding)

                chunks_ = text_splitter.create_documents([st.session_state.rag_text])
                chunks = [c.page_content for c in chunks_]
                embeddings = batch_embed(chunks)

                st.session_state.rag_chunks = chunks
                st.session_state.rag_embeddings = embeddings
                st.session_state.rag_embedded = True
        else:
            st.session_state.rag_file_key = None
            st.session_state.rag_embedded = False
            _clear_rag_state()

        st.header("Restore History")
        history_input = st.text_area("Paste conversation history:", height=200)
        if st.button("Restore History"):
            st.session_state.messages = []
            # NOTE(review): both alternatives in this pattern and both entries
            # of the marker list below are empty strings, so every marker is
            # treated as a "user" turn - confirm the intended role markers.
            messages = re.split(r"^(|)\n", history_input, flags=re.MULTILINE)
            role = None
            text = ""
            for message in messages:
                if message.strip() in ["", ""]:
                    # A marker closes the turn collected so far, if any.
                    if role and text:
                        st.session_state.messages.append({"role": role, "content": text.strip()})
                        text = ""
                    role = "user" if message.strip() == "" else "assistant"
                else:
                    text += message
            if role and text:
                st.session_state.messages.append({"role": role, "content": text.strip()})
            st.rerun()

        st.header("Clear History")
        if st.button("Clear Chat History"):
            st.session_state.messages = []
            st.rerun()

        st.header("Change API Key")
        new_api_key = st.text_input("Enter new API Key", type="password")
        if st.button("Update API Key"):
            if new_api_key and new_api_key.isascii():
                st.session_state.api_key = new_api_key
                client = cohere.ClientV2(api_key=new_api_key)
                st.success("API Key updated successfully!")
            else:
                st.warning("Please enter a valid API Key.")

    with appearance_tab:
        st.header("Font Selection")
        font_options = {
            "Zen Maru Gothic": "Zen Maru Gothic",
            "Noto Sans JP": "Noto Sans JP",
            "Sawarabi Mincho": "Sawarabi Mincho"
        }
        selected_font = st.selectbox("Choose a font", ["Default"] + list(font_options.keys()))

        st.header("Change the font size")
        st.session_state.font_size = st.slider("Font size", min_value=16.0, max_value=50.0, value=16.0, step=1.0)

        st.header("Change the user's icon")
        st.session_state.user_avatar = st.file_uploader("Choose an image", type=["png", "jpg", "jpeg", "webp", "gif", "bmp", "svg",], key="user_avatar_uploader")

        st.header("Change the assistant's icon")
        st.session_state.assistant_avatar = st.file_uploader("Choose an image", type=["png", "jpg", "jpeg", "webp", "gif", "bmp", "svg",], key="assistant_avatar_uploader")

        st.header("Change the icon size")
        st.session_state.avatar_size = st.slider("Icon size", min_value=2.0, max_value=20.0, value=2.0, step=0.2)

# After Stop generating
# is_streaming is still True here only when the previous run was interrupted
# mid-stream; keep the partial reply and cancel any pending retry.
if st.session_state.get("is_streaming"):
    st.session_state.messages.append({"role": "assistant", "content": st.session_state.response})
    st.session_state.is_streaming = False
    if "retry_flag" in st.session_state and st.session_state.retry_flag:
        st.session_state.retry_flag = False
    st.rerun()

# NOTE(review): the markup strings passed to st.markdown below are empty in
# this revision - confirm the intended <style> content.
if selected_font != "Default":
    with open("style.css") as css:
        st.markdown(f'', unsafe_allow_html=True)
    st.markdown(f'', unsafe_allow_html=True)

# Change font size
st.markdown(f'', unsafe_allow_html=True)

# Change icon size
# (CSS element names may be subject to change.)
# (Contributor: ★31 >>538)
AVATAR_SIZE_STYLE = f""" """
st.markdown(AVATAR_SIZE_STYLE, unsafe_allow_html=True)

display_messages()

# After Retry
if st.session_state.get("retry_flag"):
    if len(st.session_state.messages) > 0:
        messages = st.session_state.messages.copy()
        response = get_ai_response(messages)
        st.session_state.messages.append({"role": "assistant", "content": response})
        st.session_state.retry_flag = False
        st.rerun()
    else:
        st.session_state.retry_flag = False

if prompt := st.chat_input("Enter your message here..."):
    st.session_state.messages.append({"role": "user", "content": prompt})
    chat_history = st.session_state.messages.copy()

    shown_message = _escape_for_markdown(prompt)
    with st.chat_message("user", avatar=st.session_state.user_avatar):
        st.write(shown_message)

    response = get_ai_response(chat_history)
    st.session_state.messages.append({"role": "assistant", "content": response})
    st.rerun()