Spaces:
Running
Running
| from encodings import utf_8 | |
| import os | |
| from pickle import POP | |
| import gradio as gr | |
| import openai | |
| from openai import OpenAI | |
| from dotenv import load_dotenv | |
| from pathlib import Path | |
| from time import sleep | |
| # import audioread | |
| # import queue | |
| # import threading | |
| from glob import glob | |
| import json | |
| from datetime import datetime, timedelta | |
| import sqlite3 | |
| import struct | |
| import re | |
| from dateutil.parser import * | |
| from openai.lib import azure | |
| from pydantic.type_adapter import R | |
| import pytz | |
| import requests | |
| import boto3 | |
# ---------------------------------------------------------------------------
# Runtime configuration, read once at import time from the environment
# (after loading a .env file).
# ---------------------------------------------------------------------------
load_dotenv(override=True)

# OpenAI API keys; Client() chooses between `key` and `keyb`.
key = os.getenv('OPENAI_API_KEY')
key2 = os.getenv('OPENAI_API_KEY2')
key3 = os.getenv('OPENAI_API_KEY3')
keyb = os.getenv('OPENAI_API_KEYB')

# Comma-separated user names and passwords, matched by position.
# NOTE: both variables must be set; a missing one fails here at start-up.
users = os.getenv('LOGNAME')
unames = users.split(',')
pwds = os.getenv('PASSWORD')
pwdList = pwds.split(',')

# Credentials for the translation back ends.
google_translate_key = os.getenv('GOOGLE_KEY')
amazon_access_id = os.getenv('AMAZON_ACCESS_ID')
amazon_access_secret = os.getenv('AMAZON_ACCESS_SECRET')
azure_key = os.getenv('AZURE_KEY')

# Data directory: relative when SITE is 'local', otherwise rooted at /data.
site = os.getenv('SITE')
dataDir = './data/' if site == 'local' else '/data/'
dp = Path('./data') if site == 'local' else Path('/data')
dp.mkdir(exist_ok=True)
speak_file = dataDir + "speek.wav"

# Spoken replacements for common abbreviations.
abbrevs = {
    'St. ': 'Saint ',
    'Mr. ': 'mister ',
    'Mrs. ': 'mussus ',
    'Ms. ': 'mizz ',
}

# Language code -> display name.
languages = {
    'en': 'English',
    'es': 'Spanish',
    'de': 'German',
    'fr': 'French',
    'zh': 'Chinese',
    'ro': 'Romanian',
    'ja': 'Japanese',
    'he': 'Hebrew',
    'af': 'Afrikaans',
}

# Indexed by the value returned from get_relevance_number() (0..4).
relevance_terms = [
    'Probably little connection',
    'Questionable relevance',
    'May be relevant',
    'Probably relevant',
    'Likely highly relevant',
]
def populate_book_chooser(active: bool, bible_books: {}):
    """Build the book-filter dropdown update.

    Args:
        active: Whether the book filter is enabled.
        bible_books: Mapping whose keys are the selectable book names.

    Returns:
        A ``gr.Dropdown`` listing every key of *bible_books* when *active*
        is true, otherwise a ``gr.Dropdown`` with no arguments.
    """
    if not active:
        return gr.Dropdown()
    return gr.Dropdown(choices=list(bible_books.keys()))
def get_bible_verse(book: str, chapter: str, verse_num: str)->str:
    """Return the stored passage text that contains a given verse.

    Queries the ``embeds`` table of ``ISR_bible.db`` (under ``dataDir``) for
    rows matching *book* and *chapter*, and returns the text column of the
    first row whose verse range includes *verse_num*.

    Args:
        book: Book identifier as stored in the database.
        chapter: Chapter number.
        verse_num: Verse number; converted with ``int()``.

    Returns:
        The passage text, or ``''`` when no row covers the verse.

    Raises:
        ValueError: If *verse_num* cannot be converted to an integer.
    """
    verse_num = int(verse_num)
    conn = sqlite3.connect(dataDir + 'ISR_bible.db')
    try:
        rows = conn.execute(
            'SELECT * from embeds where book = ? and chapter = ?',
            (book, chapter),
        ).fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
    for row in rows:
        # row[2] is the verse range ("first-last"), row[3] the passage text.
        first, _, last = str(row[2]).partition('-')
        try:
            low = int(first)
            # A single-verse range ("7") covers only that verse.
            high = int(last) if last.strip() else low
        except ValueError:
            continue  # malformed range: ignore this row
        if low <= verse_num <= high:
            return row[3]
    return ''
def parse_verse_refs(txt: str, reverse_bible_books)->list:
    """Extract ``{chunk, book, chapter, verse}`` citations from model output.

    Args:
        txt: Text containing zero or more brace-delimited, comma-separated
            4-item groups such as ``{chunk 2, 3 John, 1, 4}``.
        reverse_bible_books: Mapping of case-folded long book name to the
            short book name.

    Returns:
        A list of ``(chunk_num, short_book, chapter, verse)`` tuples, one per
        citation that parsed cleanly and named a known book. Groups whose
        first item contains ``#`` (the echoed template) are skipped, as are
        groups with non-numeric chunk, chapter or verse.
    """
    # Leading digit -> spelled-out ordinal, e.g. "3 John" -> "Third John".
    ordinals = {'1 ': 'First ', '2 ': 'Second ', '3 ': 'Third '}
    # Items may not contain braces or commas, so several groups on the same
    # line are matched individually instead of being swallowed by one match.
    pattern = r'\{([^{},]+),([^{},]+),([^{},]+),([^{},]+)\}'
    refs = []
    for (n, bk, ch, vs) in re.findall(pattern, txt):
        if '#' in n:
            continue
        try:
            label = n.strip()
            if 'chunk' in label.casefold():
                chunk_num = int(label.split()[1])
            else:
                chunk_num = int(label)
            chapter = int(ch.strip())
            verse = int(vs.strip())
        except (ValueError, IndexError):
            continue
        book = bk.strip()
        for digit, word in ordinals.items():
            if book.startswith(digit):
                # Replace only the prefix, not later occurrences.
                book = word + book[len(digit):]
                break
        short_book = reverse_bible_books.get(book.casefold())
        if short_book:
            refs.append((chunk_num, short_book, chapter, verse))
    return refs
| # def set_prompt(is_find_verses): | |
| # txt = '' | |
| # if (is_find_verses): | |
| # txt = "bible books or verses mentioned. (Go ahead and tap 'Submit Prompt/Question')" | |
| # return gr.Textbox(value=txt) | |
def check_books(filter: bool, books: []):
    """Validate the book-filter selection.

    Args:
        filter: Whether the book filter is enabled.
        books: The currently selected books.

    Returns:
        ``'ok'`` unless the filter is enabled with nothing selected, in which
        case an HTML warning (passed through ``md``) is returned.
    """
    nothing_selected = len(books) == 0
    if not (nothing_selected and filter):
        return 'ok'
    return md('<h5>Warning: You enabled bible book filter but have no books selected in filter</h5>\n')
def on_db_change(db_name: str, bible_books):
    """Toggle date controls versus bible controls when the database changes.

    Args:
        db_name: Name of the selected database; one containing ``'ISR'``
            switches the UI to the bible controls.
        bible_books: Mapping whose keys populate the book dropdown.

    Returns:
        Six component updates, in order: Markdown, Textbox, Textbox (visible
        for non-ISR databases), Dropdown and Checkbox (visible for ISR
        databases), and a Checkbox reset to ``False`` (visible for non-ISR).
    """
    is_bible = 'ISR' in db_name
    show_dates = not is_bible
    book_choices = list(bible_books.keys())
    return [
        gr.Markdown(visible=show_dates),
        gr.Textbox(visible=show_dates),
        gr.Textbox(visible=show_dates),
        gr.Dropdown(visible=is_bible, choices=book_choices, interactive=True),
        gr.Checkbox(visible=is_bible),
        gr.Checkbox(visible=show_dates, value=False),
    ]
def make_sorted_passages(passages, bible_books):
    """Order search hits canonically (book, then chapter, then first verse).

    Args:
        passages: Iterable of ``(book, chapter, verse_range, verse, dp)``
            tuples, where *dp* is the similarity score.
        bible_books: Mapping of short book name to ``(book_num, long_name)``.
            Unknown books sort first as ``(0, 'Unknown')``.

    Returns:
        A sorted list of ``(sort_num, long_name, chapter, verse_range, verse,
        relevance)`` tuples, where *relevance* comes from
        ``get_relevance_number(dp)``.
    """
    keyed = []
    for (book, chapter, verse_range, verse, dp) in passages:
        (book_num, long_name) = bible_books.get(book, (0, 'Unknown'))
        first_verse = int(verse_range.split('-')[0].strip())
        # Three decimal digits each for chapter and verse, so chapters >= 100
        # cannot spill into the next book's range.
        sort_num = (book_num * 1000 + int(chapter)) * 1000 + first_verse
        keyed.append((sort_num, long_name, chapter, verse_range, verse,
                      get_relevance_number(dp)))
    return sorted(keyed)
def get_relevance_number(dp: float)->int:
    """Bucket a similarity score into a relevance level from 0 to 4.

    The level is the number of thresholds (0.3, 0.4, 0.5, 0.6) that *dp*
    strictly exceeds.
    """
    thresholds = (0.3, 0.4, 0.5, 0.6)
    return sum(1 for limit in thresholds if dp > limit)
def make_hebrew(prompt: str, en_hebrew: {})->str:
    """Case-fold *prompt* and substitute glossary terms.

    Args:
        prompt: The user's prompt.
        en_hebrew: Mapping of English term to its replacement; keys are
            case-folded before matching.

    Returns:
        The case-folded prompt with every occurrence of each key replaced,
        applied in the mapping's iteration order.
    """
    text = prompt.casefold()
    for english, hebrew in en_hebrew.items():
        # replace() leaves the text unchanged when the term is absent.
        text = text.replace(english.casefold(), hebrew)
    return text
def update_translation_count(count, language):
    """Return *count* for any language other than ``'en'``, else 0."""
    return 0 if language == 'en' else count
def azure_translate_text(text, target_language, source_language = 'en', timeout = 30):
    """Translate *text* with the Azure Translator v3 REST API.

    Args:
        text: Text to translate.
        target_language: Language code to translate into.
        source_language: Used only to skip the request when it equals
            *target_language*; it is not sent to the service.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The translated text, or *text* unchanged when no translation is
        needed.

    Raises:
        requests.RequestException: On timeout, connection failure, or a
            non-success HTTP status.
    """
    if target_language == source_language:
        return text
    endpoint = 'https://api.cognitive.microsofttranslator.com'
    path = '/translate'
    constructed_url = endpoint + path
    headers = {
        'Ocp-Apim-Subscription-Key': azure_key,
        'Ocp-Apim-Subscription-Region': 'eastus',
        'Content-Type': 'application/json',
    }
    params = {
        'api-version': '3.0',
        'to': target_language,
    }
    body = [{'text': text}]
    # Without a timeout, requests can block forever on a stalled connection.
    response = requests.post(constructed_url, headers=headers, params=params,
                             json=body, timeout=timeout)
    response.raise_for_status()
    return response.json()[0]['translations'][0]['text']
def translate_text(text_list, target_lang):  # Amazon translate
    """Translate English strings with Amazon Translate and concatenate them.

    Args:
        text_list: Iterable of English text pieces.
        target_lang: Target language code.

    Returns:
        The translated pieces joined with no separator.
    """
    translator = boto3.client(
        'translate',
        aws_access_key_id=amazon_access_id,
        aws_secret_access_key=amazon_access_secret,
        region_name='us-east-1'
    )
    pieces = []
    for piece in text_list:
        reply = translator.translate_text(
            Text=piece,
            SourceLanguageCode='en',
            TargetLanguageCode=target_lang
        )
        pieces.append(reply['TranslatedText'])
    return ''.join(pieces)
def get_translation(text: str, language: str, timeout: float = 30):  # Google translate
    """Translate English *text* with the Google Translation v2 REST API.

    Args:
        text: English text to translate.
        language: Target language code.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        The translated text, or ``'translation failed'`` on any status other
        than 200.

    Raises:
        requests.RequestException: On timeout or connection failure.
    """
    params = {
        'q': text,
        'source': 'en',
        'target': language,
        'format': 'text',
        'key': google_translate_key
    }
    # Without a timeout, requests can block forever on a stalled connection.
    response = requests.post(
        'https://translation.googleapis.com/language/translate/v2',
        data=params,
        timeout=timeout,
    )
    if response.status_code != 200:
        return 'translation failed'
    return response.json()['data']['translations'][0]['translatedText']
def etz_now():
    """Return today's date in the US/Eastern time zone as ``YYYY-MM-DD``."""
    return datetime.now(pytz.timezone('US/Eastern')).strftime('%Y-%m-%d')
def populate_bible_books(bible_books, reverse_bible_books):
    """Load the book tables from ``BibleBooks.txt`` unless already loaded.

    Each non-comment, non-blank line of the file is ``short_name,long_name``;
    books are numbered from 1 in file order.

    Args:
        bible_books: Existing mapping; returned as-is when it already holds
            66 entries.
        reverse_bible_books: Existing reverse mapping, returned alongside it.

    Returns:
        ``(ok, bible_books, reverse_bible_books)`` where ``bible_books`` maps
        short name to ``(book_num, long_name)`` and ``reverse_bible_books``
        maps the case-folded long name to the short name. On a missing,
        unreadable or malformed file, returns ``(False, {}, {})``.
    """
    if len(bible_books) == 66:
        return (True, bible_books, reverse_bible_books)
    path = Path(dataDir + 'BibleBooks.txt')
    if not path.is_file():
        return (False, {}, {})
    books = {}
    reverse = {}
    try:
        with open(path, 'rt', encoding='utf-8') as fp:
            book_num = 0
            for line in fp:
                # Blank lines (e.g. a trailing newline) must not abort the load.
                if line.startswith('#') or not line.strip():
                    continue
                items = line.split(',')
                if len(items) < 2:
                    return (False, {}, {})
                book_num += 1
                short_name = items[0].strip()
                long_name = items[1].strip()
                books[short_name] = (book_num, long_name)
                reverse[long_name.casefold()] = short_name
    except (OSError, ValueError):
        # ValueError also covers UnicodeDecodeError from a badly encoded file.
        return (False, {}, {})
    return (True, books, reverse)
def init_db_and_bible_books(en_heb, bible_books, reverse_bible_books):
    """Initialise the database dropdown, date field, glossary and book tables.

    Args:
        en_heb: Glossary mapping, filled in place from ``HebrewGlossary.txt``
            (lines of ``english,hebrew``; ``#`` lines are comments).
        bible_books: Book table passed through ``populate_bible_books``.
        reverse_bible_books: Reverse book table, passed the same way.

    Returns:
        ``[Dropdown, DateTime, en_heb, bible_books, reverse_bible_books]``.
        The dropdown lists every ``*.db`` file in ``dataDir`` (extension
        removed) plus ``'All Teaching Topics'``. The glossary is ``{}`` when
        its file is missing or unreadable.
    """
    db_list = [os.path.basename(p)[:-3] for p in glob(dataDir + '*.db')]
    db_list.append('All Teaching Topics')
    glossary = Path(dataDir + 'HebrewGlossary.txt')
    try:
        if glossary.is_file():
            with open(glossary, 'rt', encoding='utf-8') as fp:
                for line in fp:
                    # Skip comments and blank lines.
                    if line.startswith('#') or not line.strip():
                        continue
                    items = line.split(',')
                    if len(items) < 2:
                        continue  # no translation on this line
                    en_heb[items[0].casefold().strip()] = items[1].strip()
        else:
            en_heb = {}
    except (OSError, ValueError):
        en_heb = {}
    (_ok, bible_books, reverse_bible_books) = populate_bible_books(
        bible_books, reverse_bible_books)
    return [gr.Dropdown(choices=db_list, value=db_list[0]),
            gr.DateTime(value=etz_now()), en_heb, bible_books, reverse_bible_books]
| # gr.Timer(active=False), | |
def fix_date(date):
    """Normalise a free-form date string to ``YYYYMMDD``.

    Args:
        date: Date text in any format ``dateutil.parser.parse`` accepts.

    Returns:
        The date as an 8-digit string, or ``None`` when it cannot be parsed
        or does not format to ``YYYY-MM-DD``.
    """
    try:
        iso = parse(date).strftime('%Y-%m-%d')
    except (ValueError, TypeError, OverflowError):
        # dateutil's ParserError is a ValueError; TypeError covers non-text.
        return None
    if not re.match(r'\d{4}-\d{2}-\d{2}', iso, re.A):
        return None
    return iso.replace('-', '')
def set_db(value):
    """Return *value* unchanged."""
    selected = value
    return selected
def remove_times(txt):
    """Replace each whitespace-delimited ``[digits]`` marker with one space.

    Args:
        txt: Transcript text containing markers such as `` [12345] ``.

    Returns:
        The text with every marker, including the whitespace character on
        each side of it, replaced by a single space.
    """
    # Raw string: '\s', '\[' and '\d' are invalid escapes in a normal literal.
    pattern = r'\s\[\d+\]\s'
    return re.sub(pattern, ' ', txt)
def correct_time(time, txt):
    """Move a seek time earlier according to where the first ``[`` appears.

    Args:
        time: Seek time; returned untouched when no adjustment applies.
        txt: Clip text; the offset of its first ``[`` drives the correction.

    Returns:
        *time* unchanged when the first ``[`` is before offset 10 or absent;
        otherwise ``int(time)`` reduced by ``int(offset/400 * 30000)`` and
        clamped at 0.
    """
    bracket_at = txt.find('[')
    if bracket_at < 10:
        return time
    shift = int(bracket_at/400 * 30000)
    return max(int(time) - shift, 0)
def remove_headers(txt):
    """Drop a leading header that mentions ``udate``.

    Only the first 60 characters are inspected. When they contain ``udate``
    and a ``[``, everything before that ``[`` is replaced by a single space;
    otherwise *txt* is returned unchanged.
    """
    head = txt[0:60]
    if head.find('udate') == -1:
        return txt
    bracket_at = head.find('[')
    if bracket_at == -1:
        return txt
    return ' ' + txt[bracket_at:]
def seek_hms(seek_ms):
    """Format a millisecond offset as ``{h}h{m}m{s}s``."""
    total_secs = seek_ms / 1000
    hours = int(total_secs / 3600)
    after_hours = total_secs - hours * 3600
    minutes = int(after_hours / 60)
    seconds = int(total_secs - hours * 3600 - minutes * 60)
    return f'{hours}h{minutes}m{seconds}s'
def do_bible_search(prompt, db_name, books, book_filter):
    """Find the ten passages most similar to *prompt*.

    Args:
        prompt: Query text to embed.
        db_name: Database name without the ``.db`` extension.
        books: Book identifiers allowed when *book_filter* is true.
        book_filter: Whether to restrict results to *books*.

    Returns:
        ``(hits, prompt_tokens, total_tokens)`` where *hits* is a list of up
        to ten ``(book, chapter, verse_range, verse, dp)`` tuples ordered by
        descending dot product. A missing database yields ``([], 0, 0)``.
    """
    db_name += '.db'
    if not os.path.exists(dataDir + db_name):
        # Same 3-tuple shape as the normal path; the caller unpacks three values.
        return ([], 0, 0)
    embeddings = get_bible_db_embeddings(db_name)
    (prompt_embed, prompt_tokens, total_tokens) = get_prompt_embedding(prompt)
    dot_products = []
    for (book, chapter, verse_range, verse, db_embed) in embeddings:
        if not book_filter or book in books:
            dp = dot_product(prompt_embed, db_embed)
            dot_products.append((book, chapter, verse_range, verse, dp))
    # Ascending sort, keep the last ten, then flip to best-first.
    sorted_dots = sorted(dot_products, key=lambda x: x[4])[-10:]
    sorted_dots.reverse()
    return (sorted_dots, prompt_tokens, total_tokens)
def get_bible_db_embeddings(db_name):
    """Load every passage and its embedding from a bible database.

    Args:
        db_name: Database file name (with extension) inside ``dataDir``.

    Returns:
        A list of ``(book, chapter, verse_range, verse, embedding)`` tuples,
        where *embedding* is a list of 1536 floats decoded from the first
        6144 bytes of column 4 as little-endian 32-bit floats.

    Raises:
        struct.error: If a stored embedding is shorter than 6144 bytes.
    """
    # One pre-compiled unpack per row instead of 1536 single-float unpacks.
    unpack_embedding = struct.Struct('<1536f').unpack_from
    conn = sqlite3.connect(dataDir + db_name)
    try:
        rows = conn.execute('SELECT * from embeds').fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
    return [
        (row[0], row[1], row[2], row[3], list(unpack_embedding(row[4])))
        for row in rows
    ]
def do_search(prompt, db_name, start_date, end_date, find_verses):
    """Find the ten teaching clips most similar to *prompt* within a date range.

    Args:
        prompt: Query text to embed.
        db_name: Database name without ``.db``; a name containing ``'All'``
            is searched even though no such file exists.
        start_date: Inclusive lower bound, ``YYYYMMDD``.
        end_date: Inclusive upper bound, ``YYYYMMDD``.
        find_verses: Accepted for interface compatibility; both modes keep
            ten results.

    Returns:
        ``(hits, prompt_tokens, total_tokens)`` where *hits* holds up to ten
        ``(name, text, time, yt_id, udate, dp)`` tuples ordered by descending
        dot product. Clips with an unknown date are never filtered out.
    """
    db_file = db_name + '.db'
    if 'All' not in db_file and not os.path.exists(dataDir + db_file):
        return ([], 0, 0)
    embeddings = get_db_embeddings(db_file)
    (prompt_embed, prompt_tokens, total_tokens) = get_prompt_embedding(prompt)
    scored = []
    for (name, text, time, yt_id, udate, db_embed) in embeddings:
        udate = udate.replace('"','')
        if 'unknown' in udate.casefold():
            udate = 'Date unknown'
        elif int(udate) < int(start_date) or int(udate) > int(end_date):
            continue
        score = dot_product(prompt_embed, db_embed)
        scored.append((name, text, time, yt_id, udate, score))
    scored.sort(key=lambda item: item[5])
    best = scored[-10:]
    best.reverse()
    return (best, prompt_tokens, total_tokens)
def get_bible_refs(txt: str, bible_books)->[str]:
    """List the long book names mentioned in *txt*.

    A name counts as mentioned when *txt* contains a space followed by the
    name's words joined by single spaces. Names of two or three words are
    matched in full; any other name is matched on its first word only.

    Args:
        txt: Text to scan (case-sensitive).
        bible_books: Mapping of short name to ``(book_num, long_name)``.

    Returns:
        Matching long names, in the mapping's iteration order.
    """
    found = []
    for (_num, long_name) in bible_books.values():
        parts = long_name.split()
        if len(parts) in (2, 3):
            needle = ' ' + ' '.join(parts)
        else:
            needle = ' ' + parts[0]
        if needle in txt:
            found.append(long_name)
    return found
def dot_product(v1, v2):
    """Return the dot product of *v1* and *v2* as a float.

    Iterates over the length of *v1*; *v2* must be at least as long.
    """
    total = 0.0
    for idx, left in enumerate(v1):
        total += left * v2[idx]
    return total
def get_db_embeddings(db_name):
    """Load clip embeddings from one database, or from all teaching databases.

    Args:
        db_name: Database file name. A name containing ``'all'`` (any case)
            means every ``*.db`` file in ``dataDir`` whose path does not
            contain ``'ISR'``.

    Returns:
        The tuples produced by ``append_db_embeddings``.
    """
    if 'all' not in db_name.casefold():
        return append_db_embeddings(db_name)
    combined = []
    for path in glob(dataDir + '*.db'):
        if 'ISR' not in path:
            combined.extend(append_db_embeddings(os.path.basename(path)))
    return combined
def append_db_embeddings(db_name):
    """Load every clip and its embedding from a teachings database.

    Args:
        db_name: Database file name (with extension) inside ``dataDir``.

    Returns:
        A list of ``(name, text, time, yt_id, udate, embedding)`` tuples.
        Rows with seven columns carry an upload date in column 5 and the
        embedding in column 6; other rows carry the embedding in column 5 and
        get ``'Date unknown'``. Each embedding is a list of 1536 floats
        decoded from the first 6144 bytes as little-endian 32-bit floats.

    Raises:
        struct.error: If a stored embedding is shorter than 6144 bytes.
    """
    # One pre-compiled unpack per row instead of 1536 single-float unpacks.
    unpack_embedding = struct.Struct('<1536f').unpack_from
    conn = sqlite3.connect(dataDir + db_name)
    try:
        rows = conn.execute('SELECT * from Embeds').fetchall()
    finally:
        # Close the connection even when the query fails.
        conn.close()
    embeds = []
    for row in rows:
        time, name, text, yt_id = row[1], row[2], row[3], row[4]
        if len(row) == 7:
            udate, blob = row[5], row[6]
        else:
            udate, blob = 'Date unknown', row[5]
        embeds.append((name, text, time, yt_id, udate,
                       list(unpack_embedding(blob))))
    return embeds
def get_prompt_embedding(txt):
    """Embed *txt* with OpenAI's ``text-embedding-3-small`` model.

    Returns:
        ``(embedding, prompt_tokens, total_tokens)`` taken from the first
        data item and the usage section of the API response.
    """
    reply = Client().embeddings.create(
        input=txt,
        model="text-embedding-3-small"
    )
    usage = reply.usage
    return (reply.data[0].embedding, usage.prompt_tokens, usage.total_tokens)
def write_db_file(fpath):
    """Copy an uploaded database into ``dataDir`` and delete the upload.

    Args:
        fpath: Path of the uploaded temporary file.

    Returns:
        A status message: ``'Database loaded'`` on success, otherwise a
        message naming the step (read, write, or temp-file deletion) that
        failed.
    """
    try:
        with open(fpath, 'rb') as fp:
            data = fp.read()
        fname = os.path.basename(fpath)
    except (OSError, TypeError, ValueError):
        # TypeError/ValueError cover a missing or malformed path argument.
        return 'Unable to load database, could not read selected file'
    try:
        with open(dataDir + fname, 'wb') as fp:
            fp.write(data)
    except OSError:
        return 'Unable to load database, could not write data'
    try:
        os.remove(fpath)
    except OSError:
        return "Database loaded, but error deleting temp file"
    return 'Database loaded'
def Client():
    """Return an OpenAI client using the API key for the current site.

    ``key`` is used when ``site`` is ``'local'``, ``keyb`` otherwise.
    """
    api_key = key if site == 'local' else keyb
    return OpenAI(api_key = api_key)
def md(txt):
    """Prepare text for display in the Markdown/HTML dialog.

    Converts *txt* to ``str``, replaces triple backticks with a space and
    turns newlines into ``<br>``.
    """
    text = str(txt).replace('```', ' ')
    # NOTE(review): as written these three replacements swap a single space
    # for a single space and therefore change nothing. They look like
    # whitespace-collapsing calls; confirm the intended search string.
    for _ in range(3):
        text = text.replace(' ', ' ')
    return text.replace('\n','<br>')
def _read_usage_lines(path, do_reset, attempts=3, delay=3):
    """Read a usage log, retrying on I/O errors.

    Returns the file's lines, ``[]`` when the file is absent or was just
    deleted because *do_reset* is set, or ``None`` when every attempt failed.
    """
    if not os.path.exists(path):
        return []
    for _ in range(attempts):
        try:
            with open(path) as f:
                lines = f.readlines()
            if do_reset:
                os.remove(path)
                return []
            return lines
        except (OSError, ValueError):
            sleep(delay)
    return None


def _sum_usage_amounts(lines):
    """Sum the integer after the single ':' on each line.

    Raises ValueError when a line is not of the form ``label:integer``.
    """
    total = 0
    for line in lines:
        (_label, amount) = line.split(':')
        total += int(amount)
    return total


def genUsageStats(do_reset=False):
    """Summarise (or reset) per-user token, audio and translation usage.

    For every name in ``unames`` this reads ``<user>_log.txt``,
    ``<user>_audio.txt`` and ``<user>_translation.txt`` from ``dataDir``.

    Args:
        do_reset: When true, delete the log files (and ``_speech.txt``)
            instead of summing them; every reported count is then zero.

    Returns:
        A list with one row per user plus a final ``'totals'`` row, each
        ``[name, mini in/out, embed in/out, 'audio:N', 'translate:N',
        'images:N/N']``. If a file cannot be read or parsed, a string
        describing the failure is returned instead.
    """
    result = []
    total_embed_in = total_embed_out = 0
    total_mini_in = total_mini_out = 0
    total_audio = 0
    total_translation = 0
    # Image usage is not tracked by this function; always reported as 0.
    total_images = 0
    total_hd_images = 0
    if do_reset:
        dud_path = dataDir + '_speech.txt'
        if os.path.exists(dud_path):
            os.remove(dud_path)
    for user in unames:
        # --- token log: "<user>:<in>/<out>-<model>" per line ---
        mini_in = mini_out = embed_in = embed_out = 0
        lines = _read_usage_lines(dataDir + user + '_log.txt', do_reset)
        if lines is None:
            return f'File access failed reading stats for user: {user}'
        try:
            for line in lines:
                (_u, counts) = line.split(':')
                (counts, model) = counts.split('-')
                (tin, tout) = counts.split('/')
                incount = int(tin)
                outcount = int(tout)
                if 'mini' in model:
                    mini_in += incount
                    mini_out += outcount
                else:
                    embed_in += incount
                    embed_out += outcount
        except ValueError:
            return f'File access failed reading stats for user: {user}'
        # Commit to the grand totals only after the whole file parsed, so a
        # failure can never leave partial counts behind.
        total_mini_in += mini_in
        total_mini_out += mini_out
        total_embed_in += embed_in
        total_embed_out += embed_out

        # --- audio log: "audio:<seconds>" per line ---
        lines = _read_usage_lines(dataDir + user + '_audio.txt', do_reset)
        try:
            user_audio = None if lines is None else _sum_usage_amounts(lines)
        except ValueError:
            user_audio = None
        if user_audio is None:
            return f'File access failed reading audio stats for user: {user}'
        total_audio += user_audio

        # --- translation log: "Translation:<characters>" per line ---
        lines = _read_usage_lines(dataDir + user + '_translation.txt', do_reset)
        try:
            user_translation = None if lines is None else _sum_usage_amounts(lines)
        except ValueError:
            user_translation = None
        if user_translation is None:
            return f'File access failed reading translation stats for user: {user}'
        total_translation += user_translation

        user_images = 0
        user_hd_images = 0
        result.append([user, f'{mini_in}/{mini_out}', f'{embed_in}/{embed_out}', f'audio:{user_audio}',f'translate:{user_translation}', f'images:{user_images}/{user_hd_images}'])
    result.append(['totals', f'{total_mini_in}/{total_mini_out}', f'{total_embed_in}/{total_embed_out}', f'audio:{total_audio}',f'translate:{total_translation}', f'images:{total_images}/{total_hd_images}'])
    return result
def new_conversation(user):
    """Return the component values that reset the UI for a new conversation.

    Returns:
        ``[None, [], empty Dialog Markdown, '', '1990-01-01', today]`` where
        *today* comes from ``etz_now()``. *user* is not used.
    """
    blank_dialog = gr.Markdown(value='', label='Dialog', container=True)
    today = etz_now()
    return [None, [], blank_dialog, '', '1990-01-01', today]
def updatePassword(user, pwd):
    """Normalise the entered password and show the upload button to the admin.

    Args:
        user: User name as entered.
        pwd: Password as entered; lower-cased and stripped.

    Returns:
        ``[password, "*********", Button]``. The 'Upload Database' button is
        visible only when *user* and the normalised password match the first
        entries of ``unames`` and ``pwdList``.
    """
    password = pwd.lower().strip()
    is_admin = user == unames[0] and password == pwdList[0]
    upload_button = gr.Button(visible=is_admin, value='Upload Database')
    return [password, "*********", upload_button]
def chat(prompt, user_window, pwd_window, past, response, gptModel, clip_text, db_name,
         start_date, end_date, language, en_hebrew, books, book_filter, find_verses,
         bible_books, reverse_bible_books):
    """Handle one prompt submission: an admin command or a retrieval-backed chat turn.

    Flow:
      1. Validate the prompt and both dates.
      2. If the credentials match the first user, run admin commands
         (``delete``, ``stats``, ``reset``, ``clean``, ``files``).
      3. For any valid user: on the first turn, search the selected database
         (bible passages or teaching clips) and prepend the hits to the
         prompt; then call the chat model, optionally ask it for verse
         citations per clip, translate the reply, and log usage.

    Args:
        prompt: The user's text or an admin command.
        user_window: Entered user name.
        pwd_window: Entered password.
        past: Message history for the chat model; appended to in place.
        response: Dialog text shown so far (may be empty/None).
        gptModel: Chat model name.
        clip_text: Passed through unchanged in the returned list.
        db_name: Selected database; one containing 'bible' (any case)
            selects the bible search.
        start_date: Lower date bound for teaching clips.
        end_date: Upper date bound for teaching clips.
        language: UI language code.
        en_hebrew: Glossary applied to bible-search prompts.
        books: Books selected in the filter.
        book_filter: Whether the book filter is enabled.
        find_verses: Whether to look for bible citations in teaching clips.
        bible_books: Short name -> ``(book_num, long_name)``.
        reverse_bible_books: Case-folded long name -> short name.

    Returns:
        ``[past, response_text, None, gptModel, clip_text]``; on bad
        credentials ``[[], message, prompt, gptModel, clip_text]``.
    """
    give_priority = '\nGive higher priority to the information just provided.'
    user_window = user_window.lower().strip()
    translation_count = 0
    if len(prompt.strip()) == 0:
        return [past, 'You must enter a prompt or question', None, gptModel,clip_text]
    fixed_date = fix_date(start_date)
    if not fixed_date:
        return [past, f'"{start_date}" is not a valid start date, please use a common format', None, gptModel,clip_text]
    start_date = fixed_date
    fixed_date = fix_date(end_date)
    # Test the parsed value, not the raw input, so a bad end date is rejected.
    if not fixed_date:
        return [past, f'"{end_date}" is not a valid end date, please use a common format', None, gptModel,clip_text]
    end_date = fixed_date
    clip_txt = clip_text
    if not response:
        response = ''
    else:
        # Drop the clip/verse listing appended to the previous response.
        loc = response.find('<h5>')
        if loc > -1:
            response = response[:loc].strip()

    # ---- administrator commands ----
    isBoss = user_window == unames[0] and pwd_window == pwdList[0]
    if isBoss:
        if prompt.startswith('delete'):
            db_path = dataDir + prompt[7:]
            if not os.path.exists(db_path):
                response = f'File {db_path} not found'
            else:
                os.remove(db_path)
                response = f'File {db_path} was deleted'
            return [past, str(response), None, gptModel,clip_text]
        if prompt == 'stats':
            response = genUsageStats()
            return [past, str(response), None, gptModel,clip_text]
        if prompt == 'reset':
            response = genUsageStats(True)
            return [past, md(response), None, gptModel,clip_text]
        if prompt.startswith("clean"):
            user = prompt[6:]
            response = f'cleaned all .wav and .b64 files for {user}'
            final_clean_up(user, True)
            return [past, response, None, gptModel,clip_text]
        if prompt.startswith('files'):
            (log_cnt, wav_cnt, other_cnt, others, log_list) = list_permanent_files()
            response = f'{log_cnt} log files\n{wav_cnt} .wav files\n{other_cnt} Other files:\n{others}\nlogs: {str(log_list)}'
            return [past, response, None, gptModel,clip_text]

    # ---- authentication ----
    if not (user_window in unames and pwd_window == pwdList[unames.index(user_window)]):
        return [[], "User name and/or password are incorrect", prompt, gptModel,clip_txt]

    prompt = prompt.strip()
    finish_reason = 'ok'
    rag_txt = ''
    rag_txt2 = ''
    prompt_bare = prompt
    translation_count += update_translation_count(len(prompt), language)
    prompt = azure_translate_text(prompt, "en", language)
    prompt_tokens = 0
    total_tokens = 0
    clip_list = []
    bible_list = []
    max_clips = 5 + 5 * (language == 'en')
    tokens_in = 0
    tokens_out = 0
    tokens = 0
    bible_search = False
    first_time = len(past) == 0

    if first_time:
        if 'bible' in db_name.casefold():
            # ---- first turn, bible database ----
            bible_search = True
            msg = check_books(book_filter, books)
            if msg != 'ok':
                return [past, msg, None, gptModel,clip_text]
            instructions = ('You are a helpful assistant who has expert knowledge\n'
                            'of the Bible and is familiar with Hebrew versions of biblical names. ')
            past.append({'role':'developer', 'content': instructions})
            prompt = make_hebrew(prompt, en_hebrew)
            (results, prompt_tokens, total_tokens) = do_bible_search(prompt,
                                                                     db_name,
                                                                     books,
                                                                     book_filter)
            insert = ''
            if book_filter:
                book_listing = ' ,'.join(books)
                insert = f' From books in filter: {book_listing}, '
            txt = f'\n=================\n\n<h5>Following are ISR Bible verses in response to your query.</h5>{insert} Listed in the order they appear in the bible:\n=================\n'
            bible_list.append(txt)
            if len(results) == 0:
                txt = '\n**Sorry, no bible verses were found in response to your prompt**\n'
                return [past, txt, None, gptModel,clip_text]
            good_count = 0
            sorted_passages = make_sorted_passages(results, bible_books)
            for (book_num, book, chapter, verse_range, verse, relevance) in sorted_passages:
                verse = verse.rstrip(" )\n")
                if relevance > 1:
                    good_count += 1
                rag_line = f'{book}:{chapter}:{verse_range}\n{verse}\n'
                rag_txt += rag_line
                verse += f'\n({relevance_terms[relevance]} to query)'
                line = f'<h5>{book}:{chapter}:{verse_range}</h5>{verse}\n'
                bible_list.append(line)
            if good_count == 0:
                txt = '\n**Sorry, no relevant bible verses were found in response to your prompt**\n'
                return [past, txt, None, gptModel,clip_text]
            guidance = ('It is a group of bible passages.\n'
                        'Each group is headed by (Passage: Book Name, Chapter 3, Verses)')
            prompt = rag_txt + '.\n ' + prompt + give_priority \
                     + guidance
        else:
            # ---- first turn, teachings database ----
            chunk_num = 0
            if find_verses:
                instructions = ('You are a helpful assistant who has expert knowledge\n'
                                'of the Bible and is familiar with Hebrew versions of biblical names.')
                past.append({'role':'developer', 'content': instructions})
            (results, prompt_tokens, total_tokens) = do_search(prompt, db_name,
                                                               start_date, end_date, find_verses)
            # Back to YYYY-MM-DD for display.
            start_date = start_date[0:4] + '-' + start_date[4:6] + '-' + start_date[6:8]
            end_date = end_date[0:4] + '-' + end_date[4:6] + '-' + end_date[6:8]
            if find_verses:
                max_clips = 10
            note = ''
            if find_verses:
                note = ('<b>Note:</b> Biblical character names the same as a book name are detected as books.\n'
                        '<b>Warning about related passages:</b> AI sometimes hallucinates, identifying passages not related to teaching text\n')
            txt = f'\n=================\n\n<h5>Following are Clips and YouTube Links based on your initial query for dates between {start_date} and {end_date}:</h5>{note}=================\n'
            # clip_list[0] is this header; clip N is stored at index N + 1.
            clip_list.append(azure_translate_text(txt, language))
            translation_count += update_translation_count(len(txt), language)
            if len(results) == 0:
                txt = '\n**Sorry, no teachings were within the start/end dates you specified**\n'
                txt = azure_translate_text(txt ,language)
                translation_count += update_translation_count(len(txt), language)
                return [past, txt, None, gptModel,clip_text]
            for (name, text, time, yt_id, udate, dp) in results:
                time = correct_time(time, text)
                upload_date = udate.replace('"','')
                if not 'unknown' in upload_date.casefold():
                    upload_date = upload_date[0:4] + '-' + upload_date[4:6] + '-' + upload_date[6:8]
                yt_id = yt_id.replace('"','')
                seek_HMS = seek_hms(time)
                seek_colons = seek_HMS.replace('h',' : ').replace('m',' : ').replace('s','')
                text = remove_headers(text)
                pure_text = remove_times(text).replace('\n','')
                yt_url = f'https://youtu.be/{yt_id}?t={seek_HMS}'
                if len(clip_list) <= max_clips:
                    if find_verses:
                        book_refs = get_bible_refs(pure_text, bible_books)
                        if len(book_refs) > 0:
                            books_mentioned = ', '.join(book_refs)
                            for bref in book_refs:
                                pure_text = pure_text.replace(bref,'<b>' + bref + '</b>')
                        else:
                            # Plain string so it is not rendered as a list repr.
                            books_mentioned = '(None found)'
                        clip_list.append(
                            md(f'\n\n<h5>{name} ({upload_date})</h5><h6>At seek time: {seek_colons}</h6>[YouTube Link: ]({yt_url})\nBooks mentioned: {books_mentioned}\n\n{pure_text}\n================'))
                        rag_txt2 += f'\n[start chunk {chunk_num}]: {pure_text}\n[end chunk {chunk_num}]\n'
                        rag_txt += pure_text
                        chunk_num += 1
                    else:
                        txt = azure_translate_text(pure_text, language)
                        clip_list.append(
                            md(f'\n\n<h5>{name} ({upload_date})</h5><h6>At seek time: {seek_colons}</h6>[YouTube Link: ]({yt_url})\n\n{txt}\n================'))
                        translation_count += update_translation_count(len(txt), language)
                        rag_txt += pure_text
            prompt = rag_txt + '.\n ' + prompt + give_priority
    else:
        # Follow-up turn: no new search.
        prompt += give_priority

    # ---- main completion ----
    past.append({"role":"user", "content":prompt})
    completion = Client().chat.completions.create(model=gptModel, messages=past)
    reporting_model = gptModel
    reply = completion.choices[0].message.content

    # ---- second pass: ask the model for citations inside each clip ----
    if find_verses and first_time and not bible_search:
        past2 = past.copy()
        past2.pop()
        order = (' You have been provided a series of chunks delineated\n'
                 'by [start chunk #] and [end chunk #]. In each chunk, find citations of bible book, chapter and verse. Make a\n'
                 'list with each item formatted as {chunk #, book, chapter, verse}')
        prompt = rag_txt2 + '\n' + order
        past2.append({"role":"user", "content":prompt})
        completion2 = Client().chat.completions.create(model=gptModel, messages=past2)
        reply2 = completion2.choices[0].message.content
        tokens_in += completion2.usage.prompt_tokens
        tokens_out += completion2.usage.completion_tokens
        tokens += completion2.usage.total_tokens
        ml = parse_verse_refs(reply2, reverse_bible_books)
        prior_psg = ''
        prior_idx = ''
        for (idx, bk, ch, vn) in ml:
            psg = get_bible_verse(bk, ch, vn)
            if psg == prior_psg and idx == prior_idx:
                continue
            prior_psg = psg
            prior_idx = idx
            if len(psg) > 0:
                clip_idx = int(idx) + 1  # +1 skips the header entry
                # The chunk number comes from model output; ignore any that
                # do not point at a real clip.
                if not 1 <= clip_idx < len(clip_list):
                    continue
                (dud, this_book) = bible_books[bk]
                if not this_book in clip_list[clip_idx]:
                    continue
                clip_list[clip_idx] += ('\n<h5>Possible Related Bible passage: </h5>' + psg + '\n')

    # ---- assemble the response ----
    reply = azure_translate_text(reply, language)
    translation_count += update_translation_count(len(reply), language)
    tokens_in += completion.usage.prompt_tokens
    tokens_out += completion.usage.completion_tokens
    tokens += completion.usage.total_tokens
    response += "\n\n***YOU***: " + prompt_bare + "\n\n***GPT***: " + reply.replace('```','\n\n```\n\n')
    if translation_count > 0:
        with open(dataDir + user_window + '_translation.txt','a') as f:
            f.write(f'Translation:{translation_count}\n')
    if len(clip_list) > 0:
        response += md(' '.join(map(str, clip_list)))
    if len(bible_list) > 0:
        response += md(' '.join(map(str, bible_list)))
    if isBoss:
        response += md(f"\n\n{reporting_model}: tokens in/out = {tokens_in}/{tokens_out}\n")
    if finish_reason != 'ok':
        response += md(f"\n{finish_reason}\n")
    if tokens > 40000:
        response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON."
    past.append({"role":"assistant", "content": reply})

    # ---- usage log (up to three attempts) ----
    dataFile = new_func(user_window)
    accessOk = False
    for i in range(3):
        try:
            with open(dataFile, 'a') as f:
                m = '4omini'
                f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n')
                if (prompt_tokens + total_tokens) > 0:
                    f.write(f'{user_window}:{prompt_tokens}/{total_tokens}-embed\n')
            accessOk = True
            break
        except OSError:
            sleep(3)
    if not accessOk:
        response += f"\nDATA LOG FAILED, path = {dataFile}"
    return [past, response , None, gptModel,clip_txt]
def new_func(user):
    """Return the path of *user*'s token-usage log inside ``dataDir``."""
    return ''.join((dataDir, user, '_log.txt'))
def transcribe(user, pwd, fpath):
    """Transcribe an audio file with OpenAI ``whisper-1`` and log its duration.

    Args:
        user: User name; lower-cased and stripped before checking.
        pwd: Password; lower-cased and stripped before checking.
        fpath: Path of the audio file.

    Returns:
        The transcript as a string, or ``'Bad credentials'`` when *user* is
        unknown or *pwd* is not that user's password.
    """
    user = user.lower().strip()
    pwd = pwd.lower().strip()
    # The password must belong to this user, not merely to some user.
    if user not in unames or pwd != pwdList[unames.index(user)]:
        return 'Bad credentials'
    try:
        # The module-level import of audioread is commented out in this file,
        # so import it here; without it, duration logging is skipped.
        import audioread
    except ImportError:
        duration = 0
    else:
        with audioread.audio_open(fpath) as audio:
            duration = int(audio.duration)
    if duration > 0:
        with open(dataDir + user + '_audio.txt','a') as f:
            f.write(f'audio:{str(duration)}\n')
    with open(fpath,'rb') as audio_file:
        transcript = Client().audio.transcriptions.create(
            model='whisper-1', file = audio_file ,response_format = 'text' )
    return str(transcript)
def pause_message():
    """Return the fixed notice displayed while audio input is paused."""
    notice = "Audio input is paused. Resume or Stop as desired"
    return notice
def update_user(user_win):
    """Normalise the typed user name and resolve it against ``unames``.

    The input is lower-cased and stripped. If the result equals one of the
    entries in ``unames`` that entry is used, otherwise ``'unknown'``.

    Returns:
        A two-element list holding the resolved name twice (one value per
        output component of the event that calls this function).
    """
    candidate = user_win.lower().strip()
    resolved = candidate if candidate in unames else 'unknown'
    return [resolved, resolved]
def speech_worker(chunks=None, q=None):
    """Synthesise each text chunk to a WAV file.

    For every entry in ``chunks`` the next output path is removed from the
    front of ``q`` and the speech audio returned by the ``tts-1`` model
    (voice ``fable``, speed 0.85, WAV format) is written to that path.

    Args:
        chunks: Iterable of text pieces to speak. ``None`` means no chunks.
        q: List of output file paths, consumed from the front; the caller's
            list is mutated. ``None`` means an empty list.

    Raises:
        IndexError: If ``q`` holds fewer paths than there are chunks.
    """
    # None sentinels replace the former mutable `[]` defaults, which were
    # shared between calls (and `q` is mutated by pop()).
    if chunks is None:
        chunks = []
    if q is None:
        q = []
    for chunk in chunks:
        fpath = q.pop(0)
        response = Client().audio.speech.create(
            model="tts-1", voice="fable", input=chunk, speed=0.85,
            response_format='wav')
        with open(fpath, 'wb') as fp:
            fp.write(response.content)
def gen_speech_file_names(user, cnt):
    """Build the list of ``cnt`` numbered speech-file paths for ``user``.

    Each path is ``dataDir`` followed by ``<user>_speech<i>.wav`` with ``i``
    running from 0 to ``cnt - 1``; a ``cnt`` of zero or less gives ``[]``.
    """
    return [dataDir + f'{user}_speech{idx}.wav' for idx in range(0, cnt)]
def final_clean_up(user, do_b64 = False):
    """Delete files from ``dataDir`` on a best-effort basis.

    Args:
        user: Selects what is removed, after stripping and lower-casing:
            ``'kill'`` matches every entry in ``dataDir``; ``'all'`` matches
            every ``*_speech*.wav`` file; any other value matches only
            ``<user>_speech*.wav``.
        do_b64: Unused; kept so existing callers keep working.

    Entries that cannot be removed (for example directories, or files that
    are missing or locked) are skipped.
    """
    user = user.strip().lower()
    if user == 'kill':
        pattern = dataDir + '*'
    elif user == 'all':
        pattern = dataDir + '*_speech*.wav'
    else:
        pattern = dataDir + f'{user}_speech*.wav'
    for fpath in glob(pattern):
        try:
            os.remove(fpath)
        except OSError:
            # Only filesystem errors are ignored; the former bare `except`
            # also swallowed KeyboardInterrupt and SystemExit.
            continue
def list_permanent_files():
    """Summarise the contents of ``dataDir`` by file extension.

    Returns:
        A 5-tuple ``(log_cnt, wav_cnt, other_cnt, others, list_logs)`` where
        the first three are the counts of ``.txt`` entries, ``.wav`` entries
        and everything else, each as a string; ``others`` is the string form
        of the list of remaining names; and ``list_logs`` is the list of
        ``.txt`` names, replaced by an empty list when there are more than
        five of them.
    """
    entries = os.listdir(dataDir)
    list_logs = [name for name in entries if name.endswith('.txt')]
    wav_cnt = sum(1 for name in entries if name.endswith('.wav'))
    others = [name for name in entries
              if not name.endswith('.txt') and not name.endswith('.wav')]
    log_cnt = len(list_logs)
    if log_cnt > 5:
        # Too many logs to list individually: report only the count.
        list_logs = []
    return (str(log_cnt), str(wav_cnt), str(len(others)), str(others), list_logs)
def show_help():
    """Return the help text as a single-line HTML fragment.

    Triple back-ticks are replaced by a space and every newline becomes
    ``<br>``, so the result contains no line breaks of its own.
    """
    # Heading 1 previously read "Gemeral"; corrected to "General".
    txt = '''
MTOI Search scans a database you select that contains transcripts of MTOI video teachings, or the
ISR Scriptures, finding sections/passages that relate to the question or topic you enter.
It formulates a response based on that text found. It appends to the response as
follows:
For video teachings, it lists at least five text clips plus YouTube links to
the video at the point when that text is spoken. Prompts may be entered in either
English or the selected translation language. Responses will be given in the
selected translation language. If you check "Find bible verses mentioned in selected teachings",
Each teaching excerpt will (1) Display and highlight books mentioned and
(2) AI will attempt to discern Book/Chapter/Verse citations and will display the related
passage text following the teaching excerpt.
For ISR_Bible searches, it lists up to ten bible passages. The AI response will be
given in the selected translation language but the ISR Bible verses remain as found.
Prompts/questions should be entered in English
1. General:
1.1 Login with user name and password (not case-sensitive)
1.2 Select a database (topic) using "Choose Topic". You can target all the teaching
video databases by selecting "All Teaching Topics". Note: This selection does not
include ISR_Bible scripture in the search. Bible and teachings searches are two
distinct procedures and cannot be combined.
1.3 Select a Translation language (initially defaults to "English")
1.4 Type prompts (questions, topics) into "Prompt or Question" window.
1.5 For teaching videos, you can limit results based on the dates when the results were uploaded to YouTube with the
Start Date and End Date entries.
1.6 For ISR_Bible searches, you can filter searches to any selection of books by checking
Activate Filter and selecting book(s) you want to limit search to.
2. Search:
2.1 Enter prompt/question and tap the "Submit Prompt/Question" button. The responses appear in the Dialog window.
2.2 Enter follow-up questions in the Prompt window. Then tap "Submit Prompt/Question".
2.3 If topic changes, or when done chatting, tap the "Restart Conversation" button.
Hints:
1. Better chat results are obtained by including more detail in prompts. Say what you want to know.
You can ask for complex results like: "List the important points of these teachings".
2. Always tap "Restart Conversation" before changing chat topics.
'''
    # NOTE(review): the three space-for-space replacements are reproduced
    # exactly as they appear in this view, where they have no effect; confirm
    # against the deployed file whether they originally mapped runs of spaces
    # to something else.
    return str(txt).replace('```', ' ').replace(' ', ' ').replace(' ', ' ').replace(' ', ' ').replace('\n','<br>')
def upload_db_file(visibility):
    """Toggle the database-upload file browser.

    Args:
        visibility: Current visibility flag of the file browser.

    Returns:
        A two-element list: the inverted flag, and a ``gr.File`` component
        configured with that visibility.
    """
    toggled = not visibility
    browser = gr.File(visible=toggled, type="filepath", interactive=True, label='Upload Database')
    return [toggled, browser]
# Gradio UI definition: session state holders, the input/output widgets, the
# event wiring that connects them to the handler functions, and finally the
# server launch.
| with gr.Blocks() as demo: # theme=gr.themes.Soft() | |
# State values passed into and out of the event handlers below.
| history = gr.State([]) | |
| password = gr.State("") | |
| user = gr.State("unknown") | |
| model = gr.State("gpt-4o-mini") | |
| clip_text = gr.State("") | |
| file_browser_visibility = gr.State(False) | |
# q and qsave are not referenced by any event wiring in this block.
| q = gr.State([]) | |
| qsave = gr.State([]) | |
| en_hebrew = gr.State({}) | |
| bible_books = gr.State({}) | |
| reverse_bible_books = gr.State({}) | |
| gr.Markdown('# MTOI Search') | |
| gr.Markdown('Enter user name & password. Tap "Help & Hints" button for more instructions.') | |
| # timer = gr.Timer(value=2.0, active=True) | |
# Login widgets.
| with gr.Row(): | |
| user_window = gr.Textbox(label = "User Name") | |
# On blur, update_user writes the resolved name (or 'unknown') to both the
# `user` state and the textbox itself.
| user_window.blur(fn=update_user, inputs=user_window, outputs=[user, user_window]) | |
# NOTE(review): no type="password" is set on this textbox; confirm whether the
# entry is meant to be masked.
| pwd_window = gr.Textbox(label = "Password") | |
| help_button = gr.Button(value='Help & Hints') | |
| # with gr.Row(): | |
| # audio_widget = gr.Audio(type='filepath', format='wav',waveform_options=gr.WaveformOptions( | |
| # show_recording_waveform=True), sources=['microphone'], scale = 3, label="Prompt/Question Voice Entry", max_length=120) | |
| # reset_button = gr.ClearButton(value="Reset Voice Entry", scale=1) #new_func1() | |
# Conversation controls: restart, topic and translation choosers, upload, submit.
| with gr.Row(): | |
| clear_button = gr.Button(value="Restart Conversation", scale=3) | |
# db_chooser is also an output of demo.load below, so these initial choices
# are presumably replaced at page load -- verify in init_db_and_bible_books.
| db_chooser = gr.Dropdown(type="value", label='Choose Topic', show_label=True, scale=4, | |
| choices=['Good News', 'Passover', 'Marriage & Divorce','False Prophets'], interactive=True) | |
| lang_chooser = gr.Dropdown(label='Translation',show_label=True, scale=3, | |
| choices=[('English','en'),('Hebrew','he'),('Spanish','es'),('German','de'),('French','fr'), | |
| ('Japanese','ja'),('Romanian', 'ro'),('Afrikaans', 'af')], | |
| interactive = True) | |
# Created hidden; it is one of the outputs of the password blur handler below.
| button_upload_db = gr.Button(value='Upload Database', visible=False, scale=2) | |
| # speak_output = gr.Button(value="Speak Dialog", visible=True, scale=2) | |
| submit_button = gr.Button(value="Submit Prompt/Question", scale=4) | |
# Prompt entry plus the optional date / book filters.
| with gr.Row(): | |
| with gr.Column(scale=3): | |
| find_verses = gr.Checkbox( | |
| label='Find bible verses mentioned in selected teachings', | |
| value=False) | |
| prompt_window = gr.Textbox(label = "Prompt or Question", scale=3) | |
| with gr.Column(scale=2): | |
| filter_heading = gr.Markdown('### **Optional Date Filter. Most common formats are OK<br />such as 12/2004, jan 2015, 4 Dec 2012**') | |
| with gr.Row(): | |
| start_date = gr.Textbox(label='Start Date (YYYY-mm-dd)', scale =1,value='1990-01-01',max_lines=1) | |
# etz_now() is evaluated once, when this component is constructed; end_date
# is also an output of demo.load below.
| end_date = gr.Textbox(label='End Date (YYYY-mm-dd)', scale =1,value=etz_now(),max_lines=1) | |
# Book filter widgets are created hidden; both are outputs of on_db_change.
| checkbox_filter = gr.Checkbox(label='Activate Book Filter', scale=2, | |
| show_label=True, visible=False) | |
| book_chooser = gr.Dropdown(choices=[],type='value', scale=3, | |
| multiselect=True, interactive=True, | |
| label='Book Filter, Select one or more', visible=False) | |
| gr.Markdown('### **Dialog:**') | |
| #output_window = gr.Text(container=True, label='Dialog') | |
| output_window = gr.Markdown(container=True) | |
| with gr.Row(): | |
| db_file = gr.File(visible=False, type="filepath", interactive=True, label='Upload Database') | |
# Event wiring.
| pwd_window.blur(updatePassword, inputs = [user_window, pwd_window], outputs = [password, pwd_window, button_upload_db]) | |
# chat receives 17 inputs and must return 5 values, in the order of `outputs`.
| submit_button.click(chat, | |
| inputs=[prompt_window, user_window, password, history, output_window, | |
| model, clip_text, db_chooser,start_date,end_date, lang_chooser, | |
| en_hebrew, book_chooser, checkbox_filter, find_verses, | |
| bible_books, reverse_bible_books], | |
| outputs=[history, output_window, prompt_window, model, clip_text]) | |
| clear_button.click(fn=new_conversation, inputs=[user_window], | |
| outputs=[prompt_window, history, output_window, clip_text, start_date, end_date]) | |
# The help text is shown in the dialog output window.
| help_button.click(fn=show_help, outputs=output_window) | |
# upload_db_file inverts the visibility flag and returns a gr.File with that visibility.
| button_upload_db.click(fn=upload_db_file,inputs = [file_browser_visibility], | |
| outputs = [file_browser_visibility, db_file]) | |
| db_file.upload(fn=write_db_file, inputs=[db_file], outputs=[output_window]) | |
| db_chooser.input(fn=on_db_change,inputs= [db_chooser, bible_books], | |
| outputs= [filter_heading, start_date, end_date, book_chooser, | |
| checkbox_filter, find_verses]) | |
| # timer.tick(fn=init_db_and_bible_books, inputs=[en_hebrew, bible_books, reverse_bible_books], | |
| # outputs=[timer, db_chooser, end_date, en_hebrew, bible_books, reverse_bible_books]) | |
# Runs when the page loads in a browser.
| demo.load(fn=init_db_and_bible_books, inputs=[en_hebrew, bible_books, reverse_bible_books], | |
| outputs=[db_chooser, end_date, en_hebrew, bible_books, reverse_bible_books]) | |
| checkbox_filter.input(fn=populate_book_chooser, | |
| inputs=[checkbox_filter, bible_books], | |
| outputs=[book_chooser]) | |
# Start the server; dataDir is passed as an allowed path.
| demo.launch(share=True, allowed_paths=[dataDir], ssr_mode=False, theme=gr.themes.Soft()) | |