| import gradio as gr |
| import os |
| import time |
|
|
| import sys |
| import pinecone |
|
|
| from openai import OpenAI |
| from langchain_pinecone import PineconeVectorStore |
| from pinecone import Pinecone, ServerlessSpec |
|
|
| from langchain_openai import ChatOpenAI |
| from langchain_openai import OpenAIEmbeddings |
|
|
| from langchain.chains import RetrievalQA |
|
|
| from langchain_community.document_loaders import PyPDFLoader |
| from langchain_text_splitters import CharacterTextSplitter |
|
|
| import glob |
|
|
|
|
| from langdetect import detect |
| from googletrans import Translator |
|
|
class CNC_QA:
    """Retrieval-augmented QA bot for Mitsubishi CNC documentation.

    PDF manuals found under ``./Doc/`` are embedded into a Pinecone index,
    and questions are answered through a LangChain ``RetrievalQA`` chain
    driven by an OpenAI chat model.  Incoming questions are translated to
    English for retrieval, and the answer is translated back into the
    language the question was asked in.
    """

    def __init__(self):
        print("Initializing CNC_QA ")
        # Enable tracemalloc at interpreter start-up.
        # Bug fix: the original set 'PYTHINTRACEMALLOC', a typo that made
        # the setting a silent no-op.
        os.environ['PYTHONTRACEMALLOC'] = '1'

        self.PINECONE_INDEX = "meldas"
        self.PINECONE_ENV = "gcp-starter"

        # Flipped to True by initialize_vectorstore() when the index is
        # freshly created and the PDFs still need to be ingested.
        self.add_files = False

        # Every file in ./Doc is assumed to be a PDF (see add_documents).
        self.files = glob.glob('./Doc/*')

        self.vectorstore = self.initialize_vectorstore(index_name=self.PINECONE_INDEX)

        if self.add_files:
            self.add_documents()

        llm = self.load_llm(model_id="gpt-4o")

        # 'stuff' chain: all retrieved chunks are stuffed into one prompt.
        # return_source_documents=True so echo() can cite its sources.
        self.bot = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type='stuff',
            retriever=self.vectorstore.as_retriever(),
            return_source_documents=True,
        )

    def load_embedding_model(self, model_name):
        """Return an OpenAIEmbeddings client for *model_name*."""
        print(f'loading embedding model:{model_name}')
        embeddings = OpenAIEmbeddings(
            model=model_name,
        )
        return embeddings

    def initialize_vectorstore(self, index_name):
        """Connect to (or create) the Pinecone index and wrap it in a vector store.

        If *index_name* does not exist yet it is created as a serverless
        index and ``self.add_files`` is set so __init__ ingests the PDFs.
        Returns a PineconeVectorStore bound to the index.
        """
        # Dimension 1536 below must match this embedding model's output size.
        model_name = "text-embedding-3-small"
        embeddings = self.load_embedding_model(model_name=model_name)

        print(f'loading vectorstore:{index_name}')
        self.pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))

        existing_indexes = [index_info["name"] for index_info in self.pc.list_indexes()]
        if index_name not in existing_indexes:
            print(f'Index:{self.PINECONE_INDEX} is not found....')
            print(f'Creating new Index:{self.PINECONE_INDEX}')
            self.add_files = True
            self.pc.create_index(
                name=self.PINECONE_INDEX,
                dimension=1536,
                metric="cosine",
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
            # Block until the new index reports ready before using it.
            while not self.pc.describe_index(self.PINECONE_INDEX).status["ready"]:
                time.sleep(1)
            print(f'Created new Index:{self.PINECONE_INDEX}')

        self.show_index()
        index = self.pc.Index(self.PINECONE_INDEX)
        vectorstore = PineconeVectorStore(index=index, embedding=embeddings)
        return vectorstore

    def show_index(self):
        """Wait for the index to be ready and print its statistics."""
        print(f'detail of Index:{self.PINECONE_INDEX}')
        index = self.pc.Index(self.PINECONE_INDEX)
        while not self.pc.describe_index(self.PINECONE_INDEX).status["ready"]:
            time.sleep(1)
        print(index.describe_index_stats())

    def delete_documents(self):
        """Drop the entire Pinecone index (and all documents in it)."""
        print('delete documents.....')
        self.pc.delete_index(self.PINECONE_INDEX)
        # Bug fix note: the original ended with a bare `self.show_index`
        # (missing call parentheses — a no-op expression).  Actually calling
        # it here would raise, because the index no longer exists, so the
        # dead statement was removed rather than "fixed" into a crash.

    def add_documents(self):
        """Load every file in self.files as a PDF, chunk it, and index it."""
        print('add documents.....')
        for i, file in enumerate(self.files):
            print(f'{i+1}/{len(self.files)}:{file}')
            loader = PyPDFLoader(file)
            documents = loader.load_and_split()
            # Re-split into ~1000-char chunks with 100-char overlap so
            # neighbouring chunks share context at the boundaries.
            text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
            docs = text_splitter.split_documents(documents)
            self.vectorstore.add_documents(documents=docs)
        print('Documents are loaded on database')
        self.show_index()

    def load_llm(self, model_id):
        """Return a deterministic (temperature=0) ChatOpenAI client for *model_id*."""
        print(f'load llm:{model_id}')
        llm = ChatOpenAI(
            model=model_id,
            temperature=0,
            max_tokens=None,
            timeout=None,
            max_retries=2,
        )
        return llm

    def echo(self, message, history):
        """Gradio chat handler: answer *message* (chat *history* is unused).

        Special inputs: 'meldas_del' deletes the index; "Who are you?"
        returns a canned self-introduction.  Anything else is translated to
        English, answered by the RetrievalQA chain, translated back, and
        formatted together with the source documents.
        """
        if message == 'meldas_del':
            self.delete_documents()
        elif message == "Who are you?":
            message = "I am MELDAS. I will answer your question about Mitsubishi CNC"
        else:
            text_en, lang_original = self.translate_message(message, 'en')
            # invoke() replaces the deprecated Chain.__call__; the returned
            # dict has 'result' plus 'source_documents'.
            ans = self.bot.invoke(text_en)

            # Translation / citation extraction is best-effort: missing
            # pieces degrade to empty strings instead of crashing the chat.
            # (Narrowed from the original bare `except:` clauses.)
            try:
                result, lang = self.translate_message(ans['result'], lang_original)
            except Exception:
                result = ''
            try:
                source = ans['source_documents'][0].metadata['source']
            except (KeyError, IndexError):
                source = ''
            try:
                page_content = ans['source_documents'][0].page_content
            except (KeyError, IndexError):
                page_content = ''

            OtherSource = ''
            for source_documents in ans.get('source_documents', []):
                OtherSource += source_documents.metadata['source'] + '\n'

            message = result + '\n\n' + '■Document\n' + source + '\n\n' + '■Page\n' + page_content + '\n\n' + '■References Documents\n' + OtherSource

        return message

    def translate_message(self, message, lang_dest):
        """Translate *message* into *lang_dest*.

        Returns (translated_text, detected_source_language).  googletrans 4.x
        exposes an async API, so the call is wrapped in a local coroutine and
        driven with asyncio.run().
        """
        import asyncio

        async def translate_text(text, dest='en'):
            # Translator is an async context manager in googletrans 4.x.
            async with Translator() as translator:
                result = await translator.translate(text, dest=dest)
                print(result)
                return result.text, result.src

        text, lang = asyncio.run(translate_text(text=message, dest=lang_dest))

        print(f'元言語:{lang} -> 翻訳言語:{lang_dest}')
        print(message)
        print(text)
        return text, lang
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    print("start")

    # Build the QA bot once, then serve it through a Gradio chat UI.
    qa_bot = CNC_QA()

    chat_ui = gr.ChatInterface(
        fn=qa_bot.echo,
        examples=["Who are you?"],
        title="MELDAS AI",
    )
    chat_ui.launch(debug=True)
|
|