|
|
import importlib.util
import io
import os
import subprocess
import sys
import time
import zipfile

import gradio as gr
import nltk
import openai
from PyPDF2 import PdfReader
from sentence_transformers import SentenceTransformer, util
|
|
|
|
|
|
|
|
# Best-effort runtime install of the pinned requirements.
# NOTE(review): this runs *after* the imports at the top of the file, so it
# cannot satisfy them on a cold environment — confirm the host image already
# provides them and this call only reconciles versions.
# Guarded so a missing requirements.txt no longer crashes startup.
if os.path.exists("requirements.txt"):
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", "requirements.txt"])
|
|
|
|
|
def install(package):
    """Install *package* into the current interpreter's environment via pip."""
    pip_cmd = [sys.executable, "-m", "pip", "install", package]
    subprocess.check_call(pip_cmd)
|
|
|
|
|
|
|
|
# Bootstrap the ML stack, but only pay the pip round-trip for packages that
# are actually missing — the original reinstalled all three on every launch.
# NOTE(review): sentence_transformers is imported at the top of this file, so
# on a truly cold environment that import already failed before reaching here;
# these installs only help if the host restarts the process afterwards.
for _pip_name, _module_name in (
    ("torch", "torch"),
    ("transformers", "transformers"),
    ("sentence-transformers", "sentence_transformers"),
):
    if importlib.util.find_spec(_module_name) is None:
        install(_pip_name)
|
|
|
|
|
|
|
|
# One-time download of the NLTK 'punkt' tokenizer data (no-op if cached).
nltk.download('punkt')


# NOTE(review): the env var is named 'OpenAPI', not the conventional
# 'OPENAI_API_KEY' — confirm the deployment sets this exact name; if it is
# unset, api_key is None and every API call will fail.
openai.api_key = os.getenv('OpenAPI')


# In-memory store of extracted PDF text, keyed by the file object it came
# from. NOTE(review): entries are written in pdf_to_text but never read back —
# verify whether this cache is still needed.
cache = {}


# Sentence-embedding model used to score how relevant a document is to a
# user prompt (cosine similarity in pdf_to_text).
model = SentenceTransformer('all-MiniLM-L6-v2')
|
|
|
|
|
def create_persona(text):
    """Summarize *text* into a factual persona via the OpenAI chat API.

    Retries up to five times on any API error, sleeping one second between
    attempts. If every attempt fails, the last error message is returned as
    the result string (callers treat the return value as plain text).
    """
    attempts_left = 5
    while True:
        attempts_left -= 1
        try:
            reply = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": "You are an expert at summarizing content to provide a factual persona."},
                    {"role": "user", "content": f"Create a persona based on this text: {text}"},
                ],
            )
        except Exception as err:
            if attempts_left > 0:
                time.sleep(1)
                continue
            return str(err)
        return reply['choices'][0]['message']['content']
|
|
|
|
|
def call_openai_api(persona, user_prompt, additional_facts):
    """Answer *user_prompt* in the voice of *persona* via the OpenAI chat API.

    *additional_facts* is folded into the prompt so the model can prioritize
    document-grounded information. Retries up to five times with a one-second
    pause between attempts; on total failure the last error message is
    returned as the answer text.
    """
    system_msg = {"role": "system", "content": f"You are an expert at providing short factual answers."}
    user_msg = {"role": "user", "content": f"""Ignore all previous instructions. As {persona}


You are James Waddell, you wrote this paper: optimizing the Workplace through Technology.


Here are some additional facts: {additional_facts}. Now, provide short factual answers as James, focus on the additional facts if provided: {user_prompt}"""}
    for remaining in range(4, -1, -1):
        try:
            reply = openai.ChatCompletion.create(
                model="gpt-3.5-turbo",
                messages=[system_msg, user_msg],
            )
            return reply['choices'][0]['message']['content']
        except Exception as err:
            if remaining > 0:
                time.sleep(1)
            else:
                return str(err)
|
|
|
|
|
def extract_persona_from_pdf(pdf_file):
    """Read every page of *pdf_file* (a filesystem path) and build a persona.

    Returns whatever create_persona() produces for the concatenated page text.
    """
    with open(pdf_file, 'rb') as f:
        pdf = PdfReader(f)
        # extract_text() returns None for pages with no extractable text
        # (e.g. scanned images); coalesce to '' so concatenation cannot
        # raise TypeError. ''.join avoids the quadratic += loop.
        aggregated_text = ''.join((page.extract_text() or '') for page in pdf.pages)
    return create_persona(aggregated_text)
|
|
|
|
|
|
|
|
# Build the author persona once at module load from the bundled PDF.
# NOTE(review): this triggers a network call to OpenAI during import —
# confirm that is acceptable for the deployment's startup path.
persona = extract_persona_from_pdf('persona.pdf')
|
|
|
|
|
def pdf_to_text(pdf_file_io, user_prompt, persona):
    """Extract the PDF's text, gauge its relevance to *user_prompt*, and
    return the persona-voiced answer from the OpenAI API.

    pdf_file_io      binary file-like object containing a PDF.
    user_prompt      the question asked by the user.
    persona          persona description string passed through to the API.
    """
    pdf = PdfReader(pdf_file_io)
    # extract_text() can return None for image-only pages; treat those as ''
    # so the join cannot raise TypeError.
    aggregated_text = ''.join((page.extract_text() or '') for page in pdf.pages)
    # NOTE(review): this cache is keyed by the transient BytesIO object, so it
    # can never produce a hit and grows without bound — consider keying by the
    # source filename instead.
    cache[pdf_file_io] = aggregated_text

    # Semantic similarity between the question and the whole document text.
    query_embedding = model.encode(user_prompt, convert_to_tensor=True)
    text_embedding = model.encode(aggregated_text, convert_to_tensor=True)
    cosine_scores = util.pytorch_cos_sim(query_embedding, text_embedding)

    # Only claim the document supports an answer when similarity is strong.
    if cosine_scores[0][0] > 0.5:
        # Fixed typo: "knoweledge" -> "knowledge" in the prompt text.
        additional_facts = "Direct answer from author's knowledge: " + user_prompt
    else:
        additional_facts = "No additional information to add."

    return call_openai_api(persona, user_prompt, additional_facts)
|
|
|
|
|
def ask_expert(user_prompt):
    """Answer *user_prompt* using the first PDF found inside documents.zip.

    Only the first .pdf member is consulted (the loop returns on the first
    match). Returns an explicit message when the archive contains no PDFs,
    instead of silently returning None (which rendered as an empty box in
    the Gradio UI).
    """
    with zipfile.ZipFile("documents.zip", 'r') as z:
        for filename in z.namelist():
            if filename.endswith('.pdf'):
                pdf_file_io = io.BytesIO(z.read(filename))
                return pdf_to_text(pdf_file_io, user_prompt, persona)
    return "No PDF documents were found in documents.zip."
|
|
|
|
|
# Gradio UI: one question textbox in, one answer textbox out.
# gr.inputs.Textbox / gr.outputs.Textbox were deprecated in Gradio 2.x and
# removed in 3.x — use the component classes directly.
iface = gr.Interface(
    fn=ask_expert,
    inputs=gr.Textbox(lines=1, placeholder="Enter a question or prompt for the Author", label="User Prompt"),
    outputs=gr.Textbox(label="Cognitive Agent Response"),
)
iface.launch()
|
|
|
|
|
|