Spaces:
Sleeping
Sleeping
| import os | |
| import pickle | |
| from json import dumps, loads | |
| from typing import Any, List, Mapping, Optional | |
| import numpy as np | |
| import openai | |
| import pandas as pd | |
| from dotenv import load_dotenv | |
| from huggingface_hub import HfFileSystem | |
| from langchain.llms.base import LLM | |
| from llama_index import ( | |
| Document, | |
| GPTVectorStoreIndex, | |
| LLMPredictor, | |
| PromptHelper, | |
| ServiceContext, | |
| SimpleDirectoryReader, | |
| StorageContext, | |
| load_index_from_storage, | |
| ) | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline | |
| # from utils.customLLM import CustomLLM | |
| load_dotenv() | |
| # openai.api_key = os.getenv("OPENAI_API_KEY") | |
| fs = HfFileSystem() | |
| # define prompt helper | |
| # set maximum input size | |
| CONTEXT_WINDOW = 2048 | |
| # set number of output tokens | |
| NUM_OUTPUT = 525 | |
| # set maximum chunk overlap | |
| CHUNK_OVERLAP_RATION = 0.2 | |
| prompt_helper = PromptHelper( | |
| context_window=CONTEXT_WINDOW, | |
| num_output=NUM_OUTPUT, | |
| chunk_overlap_ratio=CHUNK_OVERLAP_RATION, | |
| ) | |
| llm_model_name = "bigscience/bloom-560m" | |
| tokenizer = AutoTokenizer.from_pretrained(llm_model_name) | |
| model = AutoModelForCausalLM.from_pretrained(llm_model_name, config="T5Config") | |
| model_pipeline = pipeline( | |
| model=model, | |
| tokenizer=tokenizer, | |
| task="text-generation", | |
| # device=0, # GPU device number | |
| # max_length=512, | |
| do_sample=True, | |
| top_p=0.95, | |
| top_k=50, | |
| temperature=0.0, | |
| ) | |
| class CustomLLM(LLM): | |
| pipeline = model_pipeline | |
| def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str: | |
| prompt_length = len(prompt) | |
| response = self.pipeline(prompt, max_new_tokens=525)[0]["generated_text"] | |
| # only return newly generated tokens | |
| return response[prompt_length:] | |
| def _identifying_params(self) -> Mapping[str, Any]: | |
| return {"name_of_model": self.model_name} | |
| def _llm_type(self) -> str: | |
| return "custom" | |
| class LlamaCustom: | |
| # define llm | |
| llm_predictor = LLMPredictor(llm=CustomLLM()) | |
| service_context = ServiceContext.from_defaults( | |
| llm_predictor=llm_predictor, prompt_helper=prompt_helper | |
| ) | |
| def __init__(self, name: str) -> None: | |
| self.vector_index = self.initialize_index(index_name=name) | |
| def prepare_data(self, file_path: str): | |
| df = pd.read_json(file_path) | |
| df = df.replace(to_replace="", value=np.nan).dropna(axis=0) # remove null values | |
| parsed = loads(df.to_json(orient="records")) | |
| documents = [] | |
| for item in parsed: | |
| document = Document( | |
| text=item["paragraphText"], | |
| doc_id=item["_id"]["$oid"], | |
| extra_info={ | |
| "chapter": item["chapter"], | |
| "article": item["article"], | |
| "title": item["title"], | |
| }, | |
| ) | |
| documents.append(document) | |
| return documents | |
| def initialize_index(self, index_name): | |
| file_path = f"./vectorStores/{index_name}" | |
| if os.path.exists(path=file_path): | |
| # rebuild storage context | |
| storage_context = StorageContext.from_defaults(persist_dir=file_path) | |
| # local load index access | |
| index = load_index_from_storage(storage_context) | |
| # huggingface repo load access | |
| # with fs.open(file_path, "r") as file: | |
| # index = pickle.loads(file.readlines()) | |
| return index | |
| else: | |
| documents = self.prepare_data(r"./assets/regItems.json") | |
| #documents = SimpleDirectoryReader(input_dir="./assets/pdf").load_data() | |
| index = GPTVectorStoreIndex.from_documents( | |
| documents, service_context=self.service_context | |
| ) | |
| # local write access | |
| index.storage_context.persist(file_path) | |
| # huggingface repo write access | |
| # with fs.open(file_path, "w") as file: | |
| # file.write(pickle.dumps(index)) | |
| return index | |
| def get_response(self, query_str): | |
| print("query_str: ", query_str) | |
| query_engine = self.vector_index.as_query_engine() | |
| response = query_engine.query(query_str) | |
| return str(response) | |