Spaces:
Runtime error
Runtime error
| import os | |
| from threading import Thread | |
| from typing import Iterator | |
| import gradio as gr | |
| import spaces | |
| import torch | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, TextIteratorStreamer, TextStreamer | |
| from llama_index.core.prompts.prompts import SimpleInputPrompt | |
| from llama_index.llms.huggingface import HuggingFaceLLM | |
| from llama_index.legacy.embeddings.langchain import LangchainEmbedding | |
| #from langchain.embeddings.huggingface import HuggingFaceEmbeddings # This import should now work | |
| from langchain_huggingface import HuggingFaceEmbeddings | |
| from sentence_transformers import SentenceTransformer | |
| from llama_index.core import set_global_service_context, ServiceContext | |
| from llama_index.core import VectorStoreIndex, download_loader, Document # Import Document | |
| from pathlib import Path | |
| import fitz # PyMuPDF | |
| MAX_MAX_NEW_TOKENS = 2048 | |
| DEFAULT_MAX_NEW_TOKENS = 512 | |
| MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "4096")) | |
| DEFAULT_SYS_PROMPT = """\ | |
| """ | |
| DESCRIPTION = """\ | |
| # Test Chat Information System for MEPO 2024 courtesy of Dr. Dancy & THiCC Lab | |
| Duplicated, then modified from [llama-2 7B example](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat) | |
| """ | |
| LICENSE = """ | |
| <p/> | |
| --- | |
| As a derivate work of [Llama-2-7b-chat](https://huggingface.co/meta-llama/Llama-2-7b-chat) by Meta, | |
| this demo is governed by the original [license](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat/blob/main/LICENSE.txt) and [acceptable use policy](https://huggingface.co/spaces/huggingface-projects/llama-2-7b-chat/blob/main/USE_POLICY.md). | |
| """ | |
| SYSTEM_PROMPT = """<s>[INST] <<SYS>> | |
| <</SYS>>""" | |
| def read_pdf_to_documents(file_path): | |
| doc = fitz.open(file_path) | |
| documents = [] | |
| for page_num in range(len(doc)): | |
| page = doc.load_page(page_num) | |
| text = page.get_text() | |
| documents.append(Document(text=text)) # Now Document is defined | |
| return documents | |
| # Function to update the global system prompt | |
| def update_system_prompt(new_prompt): | |
| global SYSTEM_PROMPT | |
| SYSTEM_PROMPT = new_prompt | |
| query_wrapper_prompt = SimpleInputPrompt("{query_str} [/INST]") | |
| return "System prompt updated." | |
| def query_model(question): | |
| llm = HuggingFaceLLM( | |
| context_window=4096, | |
| max_new_tokens=256, | |
| system_prompt=SYSTEM_PROMPT, | |
| query_wrapper_prompt=query_wrapper_prompt, | |
| model=model, | |
| tokenizer=tokenizer | |
| ) | |
| #embeddings = LangchainEmbedding(HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")) | |
| service_context = ServiceContext.from_defaults(chunk_size=1024, llm=llm, embed_model=embeddings) | |
| set_global_service_context(service_context) | |
| response = query_engine.query(question) | |
| # formatted_response = format_paragraph(response.response) | |
| return response.response | |
| def format_paragraph(text, line_length=80): | |
| words = text.split() | |
| lines = [] | |
| current_line = [] | |
| current_length = 0 | |
| for word in words: | |
| if current_length + len(word) + 1 > line_length: | |
| lines.append(' '.join(current_line)) | |
| current_line = [word] | |
| current_length = len(word) + 1 | |
| else: | |
| current_line.append(word) | |
| current_length += len(word) + 1 | |
| if current_line: | |
| lines.append(' '.join(current_line)) | |
| return '\n'.join(lines) | |
| if not torch.cuda.is_available(): | |
| DESCRIPTION += "We won't be able to run this space! We need GPU processing" | |
| if torch.cuda.is_available(): | |
| llama_model_id = "meta-llama/Llama-2-7b-chat-hf" | |
| model = AutoModelForCausalLM.from_pretrained(llama_model_id, torch_dtype=torch.float16, device_map="auto") | |
| tokenizer = AutoTokenizer.from_pretrained(llama_model_id) | |
| tokenizer.use_default_system_prompt = False | |
| # Throw together the query wrapper | |
| query_wrapper_prompt = SimpleInputPrompt("{query_str} [/INST]") | |
| llm = HuggingFaceLLM(context_window=4096, | |
| max_new_tokens=256, | |
| system_prompt=SYSTEM_PROMPT, | |
| query_wrapper_prompt=query_wrapper_prompt, | |
| model=model, tokenizer=tokenizer) | |
| embeddings = LangchainEmbedding(HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")) | |
| service_context = ServiceContext.from_defaults(chunk_size=1024, llm=llm, embed_model=embeddings) | |
| set_global_service_context(service_context) | |
| print(os.listdir()) | |
| file_path = Path("files/Full_Pamplet.pdf") | |
| documents = read_pdf_to_documents(file_path) | |
| index = VectorStoreIndex.from_documents(documents) | |
| query_engine = index.as_query_engine() | |
| update_prompt_interface = gr.Interface( | |
| fn=update_system_prompt, | |
| inputs=gr.Textbox(lines=5, placeholder="Enter the system prompt here...", label="System Prompt", value=SYSTEM_PROMPT), | |
| outputs=gr.Textbox(label="Status"), | |
| title="System Prompt Updater", | |
| description="Update the system prompt used for context." | |
| ) | |
| # Create Gradio interface for querying the model | |
| query_interface = gr.Interface( | |
| fn=query_model, | |
| inputs=gr.Textbox(lines=2, placeholder="Enter your question here...", label="User Question"), | |
| outputs=gr.Textbox(label="Response"), | |
| title="Document Query Assistant", | |
| description="Ask questions based on the content of the loaded pamphlet." | |
| ) | |
| # Combine the interfaces | |
| combined_interface = gr.TabbedInterface([update_prompt_interface, query_interface], ["Update System Prompt", "Query Assistant"]) | |
| # Launch the combined interface | |
| #combined_interface.launch() | |
| """ | |
| @spaces.GPU(duration=240) | |
| def generate( | |
| message: str, | |
| chat_history: list[tuple[str, str]], | |
| system_prompt: str, | |
| max_new_tokens: int = MAX_MAX_NEW_TOKENS, | |
| temperature: float = 0.6, | |
| top_p: float = 0.9, | |
| top_k: int = 50, | |
| repetition_penalty: float = 1.2, | |
| ) -> Iterator[str]: | |
| conversation = [] | |
| if system_prompt: | |
| conversation.append({"role": "system", "content": system_prompt}) | |
| for user, assistant in chat_history: | |
| conversation.extend([{"role": "user", "content": user}, {"role": "assistant", "content": assistant}]) | |
| conversation.append({"role": "user", "content": message}) | |
| input_ids = tokenizer.apply_chat_template(conversation, return_tensors="pt") | |
| if input_ids.shape[1] > MAX_INPUT_TOKEN_LENGTH: | |
| input_ids = input_ids[:, -MAX_INPUT_TOKEN_LENGTH:] | |
| gr.Warning(f"Trimmed input from conversation as it was longer than {MAX_INPUT_TOKEN_LENGTH} tokens.") | |
| input_ids = input_ids.to(model.device) | |
| streamer = TextIteratorStreamer(tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True) | |
| generate_kwargs = dict( | |
| {"input_ids": input_ids}, | |
| streamer=streamer, | |
| max_new_tokens=max_new_tokens, | |
| do_sample=True, | |
| top_p=top_p, | |
| top_k=top_k, | |
| temperature=temperature, | |
| num_beams=1, | |
| repetition_penalty=repetition_penalty, | |
| ) | |
| t = Thread(target=model.generate, kwargs=generate_kwargs) | |
| t.start() | |
| outputs = [] | |
| for text in streamer: | |
| outputs.append(text) | |
| yield "".join(outputs) | |
| chat_interface = gr.ChatInterface( | |
| fn=generate, | |
| additional_inputs=[ | |
| gr.Textbox(label="System prompt", lines=6), | |
| gr.Slider( | |
| label="Max new tokens", | |
| minimum=1, | |
| maximum=MAX_MAX_NEW_TOKENS, | |
| step=1, | |
| value=DEFAULT_MAX_NEW_TOKENS, | |
| ), | |
| gr.Slider( | |
| label="Temperature", | |
| minimum=0.1, | |
| maximum=4.0, | |
| step=0.1, | |
| value=0.6, | |
| ), | |
| gr.Slider( | |
| label="Top-p (nucleus sampling)", | |
| minimum=0.05, | |
| maximum=1.0, | |
| step=0.05, | |
| value=0.9, | |
| ), | |
| gr.Slider( | |
| label="Top-k", | |
| minimum=1, | |
| maximum=1000, | |
| step=1, | |
| value=50, | |
| ), | |
| gr.Slider( | |
| label="Repetition penalty", | |
| minimum=1.0, | |
| maximum=2.0, | |
| step=0.05, | |
| value=1.2, | |
| ), | |
| ], | |
| stop_btn=None, | |
| examples=[ | |
| ["Hello there! How are you doing?"], | |
| ["Can you explain briefly to me what is the Python programming language?"], | |
| ["Explain the plot of Cinderella in a sentence."], | |
| ["How many hours does it take a man to eat a Helicopter?"], | |
| ["Write a 100-word article on 'Benefits of Open-Source in AI research'"], | |
| ], | |
| ) | |
| """ | |
| with gr.Blocks(css="style.css") as demo: | |
| gr.Markdown(DESCRIPTION) | |
| #chat_interface.render() | |
| combined_interface.render() | |
| gr.Markdown(LICENSE) | |
| if __name__ == "__main__": | |
| demo.queue(max_size=20).launch(share=True) | |