| from __future__ import annotations |
| from typing import TYPE_CHECKING, Any, Callable, Dict, List, Tuple, Type |
| import logging |
| import json |
| import os |
| import datetime |
| import hashlib |
| import csv |
| import requests |
| import re |
| import html |
| import markdown2 |
| import torch |
| import sys |
| import gc |
| from pygments.lexers import guess_lexer, ClassNotFound |
|
|
| import gradio as gr |
| from pypinyin import lazy_pinyin |
| import tiktoken |
| import mdtex2html |
| from markdown import markdown |
| from pygments import highlight |
| from pygments.lexers import guess_lexer,get_lexer_by_name |
| from pygments.formatters import HtmlFormatter |
|
|
| from langchain.chains import LLMChain, RetrievalQA |
| from langchain.chat_models import ChatOpenAI |
| from langchain.document_loaders import PyPDFLoader, WebBaseLoader, UnstructuredWordDocumentLoader, DirectoryLoader |
| from langchain.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader |
| from langchain.document_loaders.generic import GenericLoader |
| from langchain.document_loaders.parsers import OpenAIWhisperParser |
| from langchain.schema import AIMessage, HumanMessage |
| from langchain.llms import HuggingFaceHub |
| from langchain.llms import HuggingFaceTextGenInference |
| from langchain.embeddings import HuggingFaceInstructEmbeddings, HuggingFaceEmbeddings, HuggingFaceBgeEmbeddings, HuggingFaceInferenceAPIEmbeddings |
|
|
| from langchain.embeddings.openai import OpenAIEmbeddings |
| from langchain.prompts import PromptTemplate |
| from langchain.text_splitter import RecursiveCharacterTextSplitter |
| from langchain.vectorstores import Chroma |
| from chromadb.errors import InvalidDimensionException |
|
|
|
|
# Configure the root logger at import time: INFO level, and every record is
# prefixed with timestamp, level name and the emitting file name / line number.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] [%(filename)s:%(lineno)d] %(message)s",
)
|
|
| |
| |
| |
| |
| |
def create_directory_loader(file_type, directory_path):
    """Build a ``DirectoryLoader`` for all files of one type below a directory.

    Args:
        file_type: File suffix, used both as the lookup key for the loader
            class and as the suffix of the recursive glob pattern. Only
            ``'.pdf'`` and ``'.word'`` are known.
        directory_path: Directory handed to ``DirectoryLoader`` as ``path``.

    Returns:
        A ``DirectoryLoader`` configured with the matching loader class.

    Raises:
        KeyError: If ``file_type`` is not one of the known suffixes.
    """
    # NOTE(review): the glob is derived from the key itself, so '.word' only
    # matches files whose names literally end in ".word" -- confirm intended.
    loader_by_suffix = {
        '.pdf': PyPDFLoader,
        '.word': UnstructuredWordDocumentLoader,
    }
    pattern = f"**/*{file_type}"
    loader_class = loader_by_suffix[file_type]
    return DirectoryLoader(path=directory_path, glob=pattern, loader_cls=loader_class)
| |
| |
def document_loading_splitting():
    """Load all configured sources and split the documents into chunks.

    Sources are read in this order: PDFs below ``./chroma/pdf``, Word files
    below ``./chroma/word``, the PDF at ``PDF_URL``, the page at ``WEB_URL``
    and the two YouTube URLs (via ``YoutubeAudioLoader`` with
    ``PATH_WORK + YOUTUBE_DIR`` as its second argument, parsed by
    ``OpenAIWhisperParser``).

    Side effects:
        Sets the module-level flag ``splittet`` to ``True`` after splitting.

    Returns:
        The chunks produced by ``RecursiveCharacterTextSplitter`` with
        ``chunk_size=1500`` and ``chunk_overlap=150``.
    """
    global splittet

    documents = []

    # Both directory loaders are created before either of them is read.
    pdf_dir_loader = create_directory_loader('.pdf', './chroma/pdf')
    word_dir_loader = create_directory_loader('.word', './chroma/word')
    pdf_docs = pdf_dir_loader.load()
    word_docs = word_dir_loader.load()
    documents += pdf_docs
    documents += word_docs

    # Single remote PDF, then a single web page.
    documents += PyPDFLoader(PDF_URL).load()
    documents += WebBaseLoader(WEB_URL).load()

    # YouTube audio for both configured videos.
    youtube_loader = GenericLoader(
        YoutubeAudioLoader([YOUTUBE_URL_1, YOUTUBE_URL_2], PATH_WORK + YOUTUBE_DIR),
        OpenAIWhisperParser(),
    )
    documents += youtube_loader.load()

    splitter = RecursiveCharacterTextSplitter(chunk_overlap=150, chunk_size=1500)
    chunks = splitter.split_documents(documents)

    splittet = True
    return chunks
|
|
| |
| |
def document_storage_chroma(splits):
    """Store the chunks in a Chroma collection using OpenAI embeddings.

    Args:
        splits: Document chunks passed to ``Chroma.from_documents``.

    The collection uses ``PATH_WORK + CHROMA_DIR`` as its persist directory.
    The created store is not returned.
    """
    embedder = OpenAIEmbeddings(disallowed_special=())
    Chroma.from_documents(
        documents=splits,
        embedding=embedder,
        persist_directory=PATH_WORK + CHROMA_DIR,
    )
|
|
| |
| |
| |
| |
def document_storage_mongodb(splits):
    """Store the chunks in MongoDB Atlas vector search using OpenAI embeddings.

    Args:
        splits: Document chunks passed to
            ``MongoDBAtlasVectorSearch.from_documents``.

    Uses the module-level ``MONGODB_COLLECTION`` and ``MONGODB_INDEX_NAME``.
    The created store is not returned.
    """
    # NOTE(review): MongoDBAtlasVectorSearch is not among the imports visible
    # at the top of this file -- confirm it is brought into scope elsewhere.
    build_store = MongoDBAtlasVectorSearch.from_documents
    build_store(
        documents=splits,
        embedding=OpenAIEmbeddings(disallowed_special=()),
        collection=MONGODB_COLLECTION,
        index_name=MONGODB_INDEX_NAME,
    )
| |
| |
def document_retrieval_chroma(llm, prompt):
    """Open the Chroma store at ``PATH_WORK + CHROMA_DIR`` with OpenAI embeddings.

    Args:
        llm: Not used by this function.
        prompt: Not used by this function.

    Returns:
        The ``Chroma`` instance.
    """
    return Chroma(
        embedding_function=OpenAIEmbeddings(),
        persist_directory=PATH_WORK + CHROMA_DIR,
    )
|
|
| |
| |
| |
def document_retrieval_chroma2():
    """Open the Chroma store at ``PATH_WORK + CHROMA_DIR`` and announce it.

    Prints a readiness message to stdout once the store object exists.

    Returns:
        The ``Chroma`` instance, backed by OpenAI embeddings.
    """
    vector_store = Chroma(
        embedding_function=OpenAIEmbeddings(),
        persist_directory=PATH_WORK + CHROMA_DIR,
    )
    print ("Chroma DB bereit ...................")
    return vector_store
| |
| |
| |
def document_retrieval_mongodb(llm, prompt):
    """Connect to the MongoDB Atlas vector store with OpenAI embeddings.

    Args:
        llm: Not used by this function.
        prompt: Not used by this function.

    Returns:
        The store returned by
        ``MongoDBAtlasVectorSearch.from_connection_string`` for the namespace
        ``MONGODB_DB_NAME + "." + MONGODB_COLLECTION_NAME``.
    """
    # NOTE(review): MongoDBAtlasVectorSearch is not among the imports visible
    # at the top of this file -- confirm it is brought into scope elsewhere.
    connect = MongoDBAtlasVectorSearch.from_connection_string
    return connect(
        MONGODB_URI,
        MONGODB_DB_NAME + "." + MONGODB_COLLECTION_NAME,
        OpenAIEmbeddings(disallowed_special=()),
        index_name=MONGODB_INDEX_NAME,
    )
|
|
| |
| |
| |
| |
def llm_chain(llm, prompt):
    """Answer ``prompt`` with a plain ``LLMChain`` built on ``LLM_CHAIN_PROMPT``.

    Args:
        llm: Language model handed to ``LLMChain``.
        prompt: Text supplied to the chain under the key ``"question"``.

    Returns:
        Whatever ``LLMChain.run`` returns for that input.
    """
    chain = LLMChain(llm=llm, prompt=LLM_CHAIN_PROMPT)
    return chain.run({"question": prompt})
|
|
| |
| |
def rag_chain(llm, prompt, db):
    """Answer ``prompt`` with a ``RetrievalQA`` chain over ``db``.

    Args:
        llm: Language model handed to ``RetrievalQA.from_chain_type``.
        prompt: Text supplied to the chain under the key ``"query"``.
        db: Vector store; its retriever is created with ``k = 3``.

    Returns:
        The ``"result"`` entry of the chain output. Source documents are
        requested from the chain but not returned to the caller.
    """
    qa_chain = RetrievalQA.from_chain_type(
        llm,
        chain_type_kwargs={"prompt": RAG_CHAIN_PROMPT},
        retriever=db.as_retriever(search_kwargs={"k": 3}),
        return_source_documents=True,
    )
    answer = qa_chain({"query": prompt})
    return answer["result"]
|
|
| |
| |
| |
| |
def rag_chain2(prompt, db, k=3):
    """Build a retrieval-augmented prompt string without calling a model.

    Args:
        prompt: The user question; also used as the similarity-search query.
        db: Object providing ``similarity_search(query, k)``.
        k: Number of chunks requested from ``db``.

    Returns:
        The instruction text, the module-level ``template``, the question and
        a numbered list (starting at 1) of the retrieved chunks. Each chunk is
        interpolated as its string form, one per line.
    """
    header = (
        "Nutze die folgenden Kontext Teile am Ende, um die Frage zu beantworten . "
        + template
        + "Frage: "
        + prompt
        + "Kontext Teile: "
    )
    hits = db.similarity_search(prompt, k)
    numbered = [f"{position}. {hit}\n" for position, hit in enumerate(hits, start=1)]
    return header + "".join(numbered)
|
|
| |
| |
| |
| |
def generate_prompt_with_history(text, history, max_length=4048):
    """Concatenate the chat history and the new message into one prompt string.

    Args:
        text: The new user message.
        history: Sequence of turns; element 0 and element 1 of every turn are
            rendered as ``"\\n<turn[0]>\\n<turn[1]>"``.
        max_length: Accepted but not used by this function.

    Returns:
        All rendered turns in their original order followed by
        ``"\\n<text>\\n"``. Because the new message is always appended, the
        result is always a string.

    Side effects:
        Prints a marker line and the finished prompt to stdout.
    """
    turns = [f"\n{turn[0]}\n{turn[1]}" for turn in history]
    turns.append(f"\n{text}\n")
    full_prompt = "".join(turns)

    print ("Prompt: ..........................")
    print(full_prompt)
    return full_prompt
|
|
| |
| |
def generate_prompt_with_history_openai(prompt, history):
    """Convert history plus the new prompt into a list of role/content dicts.

    Args:
        prompt: The new user message, appended last with role ``"user"``.
        history: Iterable of ``(human, assistant)`` pairs.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts, alternating
        ``"user"`` / ``"assistant"`` for each pair and ending with the prompt.

    Side effects:
        Prints a marker line and the resulting list to stdout.
    """
    messages = []
    for user_text, assistant_text in history:
        messages += [
            {"role": "user", "content": user_text},
            {"role": "assistant", "content": assistant_text},
        ]
    messages.append({"role": "user", "content": prompt})

    print("openai history und prompt................")
    print(messages)
    return messages
|
|
| |
| |
def generate_prompt_with_history_hf(prompt, history):
    """Render history plus the new prompt as a ``<human>:`` / ``<bot>:`` transcript.

    Args:
        prompt: The new user message.
        history: Sequence of ``[human, bot]`` string pairs.

    Returns:
        One string in which every turn is rendered as
        ``"\\n<human>:" + human + "\\n<bot>:" + bot``. The new prompt is the
        last turn and its ``<bot>:`` tag has no text after it.

    Raises:
        TypeError: If a turn element is not a string (plain concatenation).
    """
    turns = [*history, [prompt, ""]]
    # The transcript used to be built and then dropped; it is now returned.
    return "".join(
        "\n<human>:" + turn[0] + "\n<bot>:" + turn[1]
        for turn in turns
    )
|
|
| |
| |
def generate_prompt_with_history_langchain(prompt, history):
    """Convert history plus the new prompt into LangChain message objects.

    Args:
        prompt: The new user message, appended last as a ``HumanMessage``.
        history: Iterable of ``(human, ai)`` pairs.

    Returns:
        A list alternating ``HumanMessage`` / ``AIMessage`` for each pair and
        ending with the ``HumanMessage`` for ``prompt``.
    """
    messages = []
    for user_text, ai_text in history:
        messages.extend((HumanMessage(content=user_text), AIMessage(content=ai_text)))
    messages.append(HumanMessage(content=prompt))
    return messages
| |
|
|
|
|
|
|
|
|
|
|
| |
| |
| |
def markdown_to_html_with_syntax_highlight(md_str):
    """Render Markdown to HTML, highlighting fenced code blocks with Pygments.

    Every fenced block is replaced by
    ``<pre><code class="LANG">…</code></pre>`` containing the Pygments HTML
    for its body; the resulting text is then passed through ``markdown``.

    Args:
        md_str: Markdown source.

    Returns:
        The HTML string produced by ``markdown``.
    """
    fenced_block = re.compile(r"```(\w+)?\n([\s\S]+?)\n```", re.MULTILINE)

    def render_block(found):
        source = found.group(2)
        language = (found.group(1) or "text").strip()

        if language == "text":
            # No tag (or an explicit "text" tag): let Pygments guess and use
            # the guessed lexer's name as the language from here on.
            language = guess_lexer(source).name

        try:
            lexer = get_lexer_by_name(language, stripall=True)
        except ValueError:
            # Lookup by that name failed: highlight as Python instead.
            lexer = get_lexer_by_name("python", stripall=True)

        highlighted = highlight(source, lexer, HtmlFormatter())
        return f'<pre><code class="{language}">{highlighted}</code></pre>'

    return markdown(fenced_block.sub(render_block, md_str))
|
|
|
|
def normalize_markdown(md_text: str) -> str:
    """Normalize blank lines around Markdown list items.

    A line counts as a list item when, after stripping, it starts with
    ``N.``, ``-``, ``*`` or ``+`` followed by whitespace.

    * An empty line is inserted before the first item of a list when the
      previous line is not blank.
    * While inside a list, a blank line is kept only if another line follows
      and that line is not a list item; otherwise it is dropped.
    * Any other line ends the list and is copied unchanged.

    Args:
        md_text: Markdown source.

    Returns:
        The normalized text, joined with ``"\\n"``.
    """
    list_item = re.compile(r"^(\d+\.|-|\*|\+)\s")

    def is_list_item(raw_line):
        return list_item.match(raw_line.strip()) is not None

    source_lines = md_text.split("\n")
    last_index = len(source_lines) - 1
    output = []
    in_list = False

    for index, current in enumerate(source_lines):
        if is_list_item(current):
            starts_list = not in_list
            if starts_list and index > 0 and source_lines[index - 1].strip() != "":
                output.append("")
            in_list = True
            output.append(current)
            continue

        if in_list and current.strip() == "":
            # Blank line inside a list: the list state is left untouched.
            if index < last_index and not is_list_item(source_lines[index + 1]):
                output.append(current)
            continue

        in_list = False
        output.append(current)

    return "\n".join(output)
|
|
|
|
def convert_mdtext(md_text):
    """Convert Markdown (with fenced code) to HTML and append the converted mark.

    The text is split at fenced code blocks. Non-code parts are normalized
    with ``normalize_markdown`` and rendered with ``markdown`` when they
    contain a backtick-delimited span, otherwise with ``mdtex2html.convert``.
    Code parts are re-fenced and rendered with
    ``markdown_to_html_with_syntax_highlight``. Parts that are empty after
    stripping are skipped.

    Args:
        md_text: Markdown source.

    Returns:
        The concatenated HTML followed by ``ALREADY_CONVERTED_MARK``.
    """
    fence = re.compile(r"```(.*?)(?:```|$)", re.DOTALL)
    backtick_span = re.compile(r"`(.*?)`", re.DOTALL)

    code_chunks = fence.findall(md_text)
    # split() with one capture group alternates text / code; keep the text.
    text_chunks = fence.split(md_text)[::2]

    pieces = []
    # There is one more text chunk than code chunks, hence the "" padding.
    for text_chunk, code_chunk in zip(text_chunks, code_chunks + [""]):
        if text_chunk.strip():
            text_chunk = normalize_markdown(text_chunk)
            if backtick_span.search(text_chunk):
                rendered = markdown(text_chunk, extensions=["tables"])
            else:
                rendered = mdtex2html.convert(text_chunk, extensions=["tables"])
            pieces.append(rendered)
        if code_chunk.strip():
            fenced = f"\n```{code_chunk}\n\n```"
            pieces.append(markdown_to_html_with_syntax_highlight(fenced))

    return "".join(pieces) + ALREADY_CONVERTED_MARK
|
|
def convert_asis(userinput):
    """Wrap the HTML-escaped input in a ``pre-wrap`` paragraph.

    Args:
        userinput: Raw text.

    Returns:
        The paragraph markup followed by ``ALREADY_CONVERTED_MARK``.
    """
    escaped = html.escape(userinput)
    paragraph = f"<p style=\"white-space:pre-wrap;\">{escaped}</p>"
    return paragraph + ALREADY_CONVERTED_MARK
|
|
def detect_converted_mark(userinput):
    """Return ``True`` if ``userinput`` ends with ``ALREADY_CONVERTED_MARK``."""
    return userinput.endswith(ALREADY_CONVERTED_MARK)
|
|
|
|
|
|
def detect_language(code):
    """Split a leading language tag off the content of a code block.

    Args:
        code: Code-block content whose first line may be a language tag.

    Returns:
        ``(language, code_without_language)``. ``language`` is the lower-cased
        first line of the stripped input. When the input starts with a newline
        or has no first line, the result is ``("", code)`` with the input
        unchanged.
    """
    if code.startswith("\n"):
        return "", code

    first_line = code.strip().split("\n", 1)[0]
    if not first_line:
        return "", code

    # first_line was taken from the stripped text, so it has to be cut from
    # the left-stripped text as well; cutting the raw string would be off by
    # the amount of leading whitespace.
    remainder = code.lstrip()[len(first_line):].lstrip()
    return first_line.lower(), remainder
|
|
def convert_to_markdown(text):
    """Prepare plain text for Markdown rendering while keeping its layout.

    Lines that start with three backticks toggle a fenced code block. Fence
    lines and lines inside a fence are copied verbatim. Every other line is
    processed as follows:

    * ``$`` is replaced by the entity ``&#36;``;
    * leading tabs and spaces are replaced by ``&#9;`` and ``&nbsp;``;
    * a ``#`` in the first column is backslash-escaped;
    * two spaces are appended before the newline (Markdown hard line break).

    Args:
        text: Plain text, possibly containing fenced code blocks.

    Returns:
        The converted text; every input line is terminated by ``"\\n"``.
    """
    def replace_leading_tabs_and_spaces(line):
        # One entity per leading whitespace character, in the original order.
        entities = []
        for char in line:
            if char == "\t":
                entities.append("&#9;")
            elif char == " ":
                entities.append("&nbsp;")
            else:
                break
        return "".join(entities) + line[len(entities):]

    converted = []
    in_code_block = False

    for line in text.split("\n"):
        if line.startswith("```"):
            # Opening and closing fences both flip the state.
            in_code_block = not in_code_block
            converted.append(f"{line}\n")
        elif in_code_block:
            converted.append(f"{line}\n")
        else:
            line = line.replace("$", "&#36;")
            line = replace_leading_tabs_and_spaces(line)
            line = re.sub(r"^(#)", r"\\\1", line)
            converted.append(f"{line}  \n")

    return "".join(converted)
|
|
def add_language_tag(text):
    """Add a language tag to fenced code blocks that do not have one.

    A block is an opening fence, an optional ``\\w*`` tag, a newline, a body
    without backticks and a closing fence. Blocks that already carry a tag,
    and blocks for which no language can be guessed, are left exactly as they
    are. For the others the lower-cased name of the lexer returned by
    ``guess_lexer`` is inserted after the opening fence.

    Args:
        text: Markdown text.

    Returns:
        The text with language tags added; fences are never duplicated.
    """
    def guess_language(code):
        try:
            return guess_lexer(code).name.lower()
        except ClassNotFound:
            return ""

    # group 1: optional language tag, group 2: block body (fences excluded)
    fenced_block = re.compile(r"```(\w*)\n([^`]+)```")

    def tag_block(match):
        if match.group(1):
            return match.group(0)
        body = match.group(2)
        # Guess from the body only; the fences are not part of the code.
        language = guess_language(body)
        if not language:
            return match.group(0)
        return f"```{language}\n{body}```"

    return fenced_block.sub(tag_block, text)
|
|
def delete_last_conversation(chatbot, history):
    """Remove the last entry from both conversation lists, in place.

    Args:
        chatbot: List of displayed turns; its last element is popped if any.
        history: List of stored turns; its last element is popped if any.

    Returns:
        ``(chatbot, history, "Delete Done")`` with the same list objects.
    """
    for log in (chatbot, history):
        if len(log) > 0:
            log.pop()
    return chatbot, history, "Delete Done"
|
|
def reset_state():
    """Return two fresh empty lists and the status text ``"Reset Done"``."""
    first, second = [], []
    return first, second, "Reset Done"
|
|
def reset_textbox():
    """Return a Gradio update that sets a value to ``""`` plus an empty string."""
    cleared = gr.update(value="")
    return cleared, ""
|
|
def cancel_outputing():
    """Return the fixed status text ``"Stop Done"``."""
    status = "Stop Done"
    return status
|
|
|
|
| |
| |
| |
| |
| |
def create_picture(history, prompt):
    """Generate one 1024x1024 image with ``dall-e-3`` and return it opened.

    Args:
        history: Not used by this function.
        prompt: Text description sent to the image model.

    Returns:
        Whatever ``Image.open`` returns for the downloaded image bytes
        (presumably a PIL image -- confirm against the ``Image`` import).

    Raises:
        requests.HTTPError: If downloading the generated image fails.
    """
    import io  # local import: only this function needs it

    # NOTE(review): ``OpenAI`` and ``Image`` are not among the imports visible
    # at the top of this file -- confirm they are brought into scope elsewhere.
    client = OpenAI()
    response = client.images.generate(
        model="dall-e-3",
        prompt=prompt,
        size="1024x1024",
        quality="standard",
        n=1,
    )
    image_url = response.data[0].url

    download = requests.get(image_url)
    download.raise_for_status()

    # Without stream=True the body has already been read into ``content`` and
    # ``raw`` is exhausted, so the image is decoded from the buffered bytes.
    return Image.open(io.BytesIO(download.content))
|
|
| |
| |
| |
def process_image(image_path, prompt):
    """Build the headers and JSON payload for a chat request about an image.

    The file is read in binary mode and embedded base64-encoded as a
    ``data:image/jpeg`` URL next to the text prompt.

    Args:
        image_path: Path of the image file to read.
        prompt: Text part of the user message.

    Returns:
        ``(headers, payload)``. ``headers`` carries the JSON content type and
        a bearer token built from ``OAI_API_KEY``. ``payload`` names
        ``MODEL_NAME_IMAGE`` as the model, contains one user message with the
        text and image parts, and sets ``max_tokens`` to 300.

    Raises:
        OSError: If the image file cannot be opened or read.
    """
    # base64 is not part of this file's module-level imports.
    import base64

    with open(image_path, "rb") as image_file:
        encoded_string = base64.b64encode(image_file.read()).decode('utf-8')

    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OAI_API_KEY}"
    }

    payload = {
        "model": MODEL_NAME_IMAGE,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": prompt
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{encoded_string}"
                        }
                    }
                ]
            }
        ],
        "max_tokens": 300
    }
    return headers, payload
|
|
| |
| |
def transfer_input(inputs):
    """Pass the submitted input through together with two Gradio updates.

    Args:
        inputs: The submitted value; returned unchanged as the first element.

    Returns:
        ``(inputs, gr.update(value=""), gr.Button.update(visible=True))``.
    """
    # The earlier version also called reset_textbox() and discarded the
    # result; that dead call has been removed.
    return (
        inputs,
        gr.update(value=""),
        gr.Button.update(visible=True),
    )
|
|
|
|
| |
| |
| |
class State:
    """Holder for a single boolean ``interrupted`` flag."""

    # Class-level default; interrupt()/recover() assign on the instance.
    interrupted = False


    def interrupt(self) -> None:
        """Set ``interrupted`` to ``True`` on this instance."""
        self.interrupted = True


    def recover(self) -> None:
        """Set ``interrupted`` back to ``False`` on this instance."""
        self.interrupted = False
# Module-level instance created at import time.
shared_state = State()
|
|
|
|
|
|
|
|
def is_stop_word_or_prefix(s: str, stop_words: list) -> bool:
    """Tell whether ``s`` ends with a stop word or with the start of one.

    Args:
        s: Text to inspect.
        stop_words: Candidate stop words.

    Returns:
        ``True`` if ``s`` ends with any stop word in full or with any
        non-empty proper prefix of one; ``False`` otherwise, including for an
        empty ``stop_words`` list.
    """
    def tail_matches(word):
        if s.endswith(word):
            return True
        return any(s.endswith(word[:cut]) for cut in range(1, len(word)))

    return any(tail_matches(word) for word in stop_words)
|
|
|
|
|
|