from encodings import utf_8
import os
from pickle import POP
import gradio as gr
import openai
from openai import OpenAI
from dotenv import load_dotenv
from pathlib import Path
from time import sleep
# import audioread
# import queue
# import threading
from glob import glob
import json
from datetime import datetime, timedelta
import sqlite3
import struct
import re
from dateutil.parser import *
from openai.lib import azure
from pydantic.type_adapter import R
import pytz
import requests
import boto3

# Settings are read from the environment; values in a .env file take precedence.
load_dotenv(override=True)
key2 = os.getenv('OPENAI_API_KEY2')
key3 = os.getenv('OPENAI_API_KEY3')
key = os.getenv('OPENAI_API_KEY')
keyb = os.getenv('OPENAI_API_KEYB')

# Comma-separated login names and passwords, matched by position.
users = os.getenv('LOGNAME')
pwds = os.getenv('PASSWORD')
if not users or not pwds:
    # Fail with a clear message instead of "'NoneType' has no attribute 'split'".
    raise RuntimeError('LOGNAME and PASSWORD environment variables must be set')
unames = users.split(',')
pwdList = pwds.split(',')

google_translate_key = os.getenv('GOOGLE_KEY')
amazon_access_id = os.getenv('AMAZON_ACCESS_ID')
amazon_access_secret = os.getenv('AMAZON_ACCESS_SECRET')
azure_key = os.getenv('AZURE_KEY')

# Data directory: relative when SITE == 'local', otherwise the absolute /data.
site = os.getenv('SITE')
dataDir = './data/' if site == 'local' else '/data/'
dp = Path(dataDir)
dp.mkdir(exist_ok=True)
speak_file = dataDir + "speek.wav"

# Abbreviation -> spoken replacement (the duplicated 'Mr. ' entry is listed once).
abbrevs = {
    'St. ': 'Saint ',
    'Mr. ': 'mister ',
    'Mrs. ': 'mussus ',
    'Ms. ': 'mizz ',
}
languages = {
    'en': 'English', 'es': 'Spanish', 'de': 'German', 'fr': 'French', 'zh': 'Chinese',
    'ro': 'Romanian', 'ja': 'Japanese', 'he': 'Hebrew', 'af': 'Afrikaans',
}

# Indexed by the 0-4 score produced by get_relevance_number().
relevance_terms = [
    'Probably little connection',
    'Questionable relevance',
    'May be relevant',
    'Probably relevant',
    'Likely highly relevant',
]

# One "{chunk, book, chapter, verse}" item; no field may contain a brace or comma,
# so several items on one line are matched separately.
_VERSE_REF_RE = re.compile(r'\{([^{},]+),([^{},]+),([^{},]+),([^{},]+)\}')
# Leading ordinal digit of a book name -> the spelled-out form used for lookup.
_ORDINALS = {'1 ': 'First ', '2 ': 'Second ', '3 ': 'Third '}


def populate_book_chooser(active: bool, bible_books: dict):
    """Return a Dropdown update listing every key of *bible_books* when *active*."""
    if active:
        return gr.Dropdown(choices=list(bible_books))
    return gr.Dropdown()


def get_bible_verse(book: str, chapter: str, verse_num: str) -> str:
    """Return the stored passage of *book*/*chapter* whose range covers *verse_num*.

    Returns '' when no row matches. Raises ValueError if *verse_num* is not numeric.
    """
    from contextlib import closing

    wanted = int(verse_num)
    query = 'SELECT * from embeds where book = ? and chapter = ?'
    # closing() releases the connection even when the query raises.
    with closing(sqlite3.connect(dataDir + 'ISR_bible.db')) as conn:
        rows = conn.execute(query, (book, chapter)).fetchall()
    for row in rows:
        # Column 2 is the verse range ("first-last"), column 3 the passage text.
        first, _, last = str(row[2]).partition('-')
        try:
            low, high = int(first), int(last or first)  # "7" means verse 7 only
        except ValueError:
            continue  # malformed range: ignore the row instead of crashing
        if low <= wanted <= high:
            return row[3]
    return ''


def parse_verse_refs(txt: str, reverse_bible_books) -> list:
    """Extract (chunk_num, short_book, chapter, verse) tuples from *txt*.

    *txt* is expected to contain items shaped like "{chunk 2, 3 John, 1, 4}".
    *reverse_bible_books* maps a casefolded long book name to its short name.
    Items containing '#', non-numeric fields or unknown books are skipped.
    """
    refs = []
    for n, bk, ch, vs in _VERSE_REF_RE.findall(txt):
        if '#' in n:
            continue  # an echo of the "{chunk #, ...}" template, not a citation
        digits = re.search(r'\d+', n)
        if digits is None:
            continue
        book = bk.strip()
        for prefix, word in _ORDINALS.items():
            if book.startswith(prefix):
                # Rewrite only the leading ordinal, never later occurrences.
                book = word + book[len(prefix):]
                break
        short_book = reverse_bible_books.get(book.casefold())
        try:
            chapter = int(ch.strip())
            verse = int(vs.strip())
        except ValueError:
            continue
        if short_book:
            refs.append((int(digits.group()), short_book, chapter, verse))
    return refs


def check_books(filter: bool, books: list):
    """Return 'ok', or a warning when the book filter is on but no book is selected."""
    if filter and not books:
        # NOTE(review): the markup in this literal was lost when the file was
        # extracted; SOURCE shows a line break before and after the sentence,
        # so '<br>' is assumed - confirm against the original.
        return md('<br>Warning: You enabled bible book filter but have no books selected in filter<br>\n')
    return 'ok'
def on_db_change(db_name: str, bible_books):
    """Return visibility updates for the filter widgets after a database change.

    Date widgets are shown for teaching databases; the book filter widgets are
    shown instead when 'ISR' occurs in *db_name*.
    """
    bible_vis = 'ISR' in db_name
    date_vis = not bible_vis
    books = list(bible_books)
    return [gr.Markdown(visible=date_vis),
            gr.Textbox(visible=date_vis),
            gr.Textbox(visible=date_vis),
            gr.Dropdown(visible=bible_vis, choices=books, interactive=True),
            gr.Checkbox(visible=bible_vis),
            gr.Checkbox(visible=date_vis, value=False)]


def make_sorted_passages(passages, bible_books):
    """Order search hits by book, chapter and first verse.

    *passages* holds (book, chapter, verse_range, verse, dot_product) tuples and
    *bible_books* maps a short name to (book_number, long_name). Returns
    (sort_num, long_name, chapter, verse_range, verse, relevance) tuples.
    """
    numbered = []
    for (book, chapter, verse_range, verse, dp) in passages:
        (book_num, long_name) = bible_books.get(book, (0, 'Unknown'))
        first_verse = int(verse_range.split('-')[0].strip())
        sort_num = book_num * 100000 + int(chapter) * 1000 + first_verse
        numbered.append((sort_num, long_name, chapter, verse_range, verse,
                         get_relevance_number(dp)))
    # Sort on the position only, so equal positions never compare other fields.
    return sorted(numbered, key=lambda item: item[0])


def get_relevance_number(dp: float) -> int:
    """Map a dot product to a 0-4 relevance score (an index into relevance_terms)."""
    for score, floor in ((4, 0.6), (3, 0.5), (2, 0.4), (1, 0.3)):
        if dp > floor:
            return score
    return 0


def make_hebrew(prompt: str, en_hebrew: dict) -> str:
    """Return the casefolded *prompt* with each glossary key replaced by its value.

    Matching is by substring, as in the original implementation.
    """
    prompt = prompt.casefold()
    for (key, val) in en_hebrew.items():
        key = key.casefold()
        if key:  # replacing '' would insert val between every character
            prompt = prompt.replace(key, val)
    return prompt


def update_translation_count(count, language):
    """Return *count* when *language* needs translation, 0 for English."""
    return count if language != 'en' else 0


def azure_translate_text(text, target_language, source_language='en'):
    """Translate *text* to *target_language* with Azure Translator.

    Returns *text* unchanged when the languages are equal or *text* is empty.
    The source language is not sent to the service. Raises
    requests.RequestException on network failure or a non-2xx response.
    """
    if target_language == source_language or not text:
        return text
    constructed_url = 'https://api.cognitive.microsofttranslator.com' + '/translate'
    headers = {
        'Ocp-Apim-Subscription-Key': azure_key,
        'Ocp-Apim-Subscription-Region': 'eastus',  # 'East US'
        'Content-Type': 'application/json',
    }
    params = {'api-version': '3.0', 'to': target_language}
    body = [{'text': text}]
    # Without a timeout a stalled connection would hang the request forever.
    response = requests.post(constructed_url, headers=headers, params=params,
                             json=body, timeout=30)
    response.raise_for_status()
    return response.json()[0]['translations'][0]['text']
def translate_text(text_list, target_lang):
    """Translate each English string in *text_list* with Amazon Translate and join them."""
    client = boto3.client(
        'translate',
        aws_access_key_id=amazon_access_id,
        aws_secret_access_key=amazon_access_secret,
        region_name='us-east-1',
    )
    parts = []
    for text in text_list:
        result = client.translate_text(Text=text, SourceLanguageCode='en',
                                       TargetLanguageCode=target_lang)
        parts.append(result['TranslatedText'])
    return ''.join(parts)


def get_translation(text: str, language: str):
    """Translate English *text* with Google Translate; 'translation failed' on error."""
    params = {
        'q': text,
        'source': 'en',
        'target': language,
        'format': 'text',
        'key': google_translate_key,
    }
    try:
        response = requests.post(
            'https://translation.googleapis.com/language/translate/v2',
            data=params, timeout=30)
    except requests.RequestException:
        return 'translation failed'
    if response.status_code == 200:
        return response.json()['data']['translations'][0]['translatedText']
    return 'translation failed'


def etz_now():
    """Return today's date in US/Eastern time as 'YYYY-MM-DD'."""
    eastern = pytz.timezone('US/Eastern')
    return datetime.now(eastern).strftime('%Y-%m-%d')


def populate_bible_books(bible_books, reverse_bible_books):
    """Load the book tables from BibleBooks.txt unless all 66 are already present.

    Each data line is "short_name, long_name"; '#' lines, blank lines and lines
    without a comma are ignored. Returns (ok, bible_books, reverse_bible_books):
    bible_books maps short name -> (book_number, long_name), reverse_bible_books
    maps casefolded long name -> short name. ok is False, with empty tables,
    when the file cannot be read.
    """
    if len(bible_books) == 66:
        return (True, bible_books, reverse_bible_books)
    try:
        with open(Path(dataDir + 'BibleBooks.txt'), 'rt', encoding='utf-8') as fp:
            lines = fp.readlines()
    except OSError:
        return (False, {}, {})
    books = {}
    reverse = {}
    book_num = 0
    for line in lines:
        if line.startswith('#'):
            continue
        items = line.split(',')
        if len(items) < 2:
            continue  # one blank/malformed line no longer discards the whole table
        book_num += 1
        short_name = items[0].strip()
        long_name = items[1].strip()
        books[short_name] = (book_num, long_name)
        reverse[long_name.casefold()] = short_name
    return (True, books, reverse)


def init_db_and_bible_books(en_heb, bible_books, reverse_bible_books):
    """Build the initial UI state when the page loads.

    Returns [db dropdown update, end-date update, glossary, bible_books,
    reverse_bible_books]. The glossary is read from HebrewGlossary.txt
    ("english, hebrew" per line) and is empty when the file is missing or
    unreadable.
    """
    db_list = [os.path.basename(path)[:-3] for path in glob(dataDir + '*.db')]
    db_list.append('All Teaching Topics')
    try:
        with open(Path(dataDir + 'HebrewGlossary.txt'), 'rt', encoding='utf-8') as fp:
            lines = fp.readlines()
    except OSError:
        en_heb = {}  # the original assigned to a misspelled name here
    else:
        for line in lines:
            if line.startswith('#'):
                continue
            items = line.split(',')
            if len(items) < 2:
                continue
            en_heb[items[0].casefold().strip()] = items[1].strip()
    (rv, bible_books, reverse_bible_books) = populate_bible_books(bible_books, reverse_bible_books)
    return [gr.Dropdown(choices=db_list, value=db_list[0]),
            gr.DateTime(value=etz_now()),
            en_heb, bible_books, reverse_bible_books]
def fix_date(date):
    """Normalise a free-form date string to 'YYYYMMDD'; None when it cannot be parsed."""
    try:
        stamp = parse(date).strftime('%Y-%m-%d')
    except (ValueError, OverflowError, TypeError):
        return None
    # Rejects results that are not a four-digit year plus two-digit month/day.
    if not re.match(r'\d{4}-\d{2}-\d{2}', stamp, re.A):
        return None
    return stamp.replace('-', '')


def set_db(value):
    """Return *value* unchanged."""
    return value


def remove_times(txt):
    """Replace every whitespace-delimited '[digits]' marker in *txt* with one space."""
    return re.sub(r'\s\[\d+\]\s', ' ', txt)


def correct_time(time, txt):
    """Move *time* earlier in proportion to where the first '[' occurs in *txt*.

    *time* is returned untouched when '[' is absent or within the first ten
    characters; otherwise 30000 is subtracted per 400 characters of offset,
    never going below 0.
    """
    loc = txt.find('[')
    if loc < 10:
        return time
    delta = int(loc / 400 * 30000)
    return max(int(time) - delta, 0)


def remove_headers(txt):
    """Drop text before the first '[' when 'udate' occurs in the first 60 characters."""
    frag = txt[0:60]
    if frag.find('udate') > -1:
        loc2 = frag.find('[')
        if loc2 > -1:
            txt = ' ' + txt[loc2:]
    return txt


def seek_hms(seek_ms):
    """Format a millisecond offset (number or numeric string) as '<h>h<m>m<s>s'."""
    total_secs = int(float(seek_ms) / 1000)
    hrs, rest = divmod(total_secs, 3600)
    mins, secs = divmod(rest, 60)
    return f'{hrs}h{mins}m{secs}s'


def do_bible_search(prompt, db_name, books, book_filter):
    """Return the ten passages most similar to *prompt*, best first.

    Returns (hits, prompt_tokens, total_tokens), where each hit is
    (book, chapter, verse_range, verse, dot_product). When the database file is
    missing the result is ([], 0, 0), so callers can always unpack three values.
    """
    db_name += '.db'
    if not os.path.exists(dataDir + db_name):
        return ([], 0, 0)
    embeddings = get_bible_db_embeddings(db_name)
    (prompt_embed, prompt_tokens, total_tokens) = get_prompt_embedding(prompt)
    dot_products = []
    for (book, chapter, verse_range, verse, db_embed) in embeddings:
        if not book_filter or book in books:
            dp = dot_product(prompt_embed, db_embed)
            dot_products.append((book, chapter, verse_range, verse, dp))
    sorted_dots = sorted(dot_products, key=lambda x: x[4])[-10:]
    sorted_dots.reverse()
    return (sorted_dots, prompt_tokens, total_tokens)
prompt_tokens, total_tokens) def get_bible_db_embeddings(db_name): embeds = [] conn = sqlite3.connect(dataDir + db_name) cur = conn.cursor() result = cur.execute('SELECT * from embeds') unpacker = struct.Struct(' int(end_date): continue else: udate = 'Date unknown' dp = dot_product(prompt_embed, db_embed) dot_products.append((name, text, time, yt_id, udate, dp) ) sorted_dots = sorted(dot_products, key=lambda x: x[5])[max_returned:] # was -10 sorted_dots.reverse() return (sorted_dots, prompt_tokens, total_tokens) def get_bible_refs(txt: str, bible_books)->[str]: rv = [] # txt = txt.casefold() for item in bible_books.items(): (key, (num, book)) = item words = book.split() if len(words) == 2: word = ' ' + words[0] + ' ' + words[1] elif len(words) == 3: word = ' ' + words[0] + ' ' + words[1] + ' ' + words[2] else: word = ' ' + words[0] if word in txt: rv.append(book) return rv def dot_product(v1, v2): # v1n = np.array(v1) # v2n = np.array(v2) # dotp = float(np.dot(v1n, v2n)) dotp = 0.0 for i in range(len(v1)): dotp += v1[i]*v2[i] return dotp def get_db_embeddings(db_name): if 'all' in db_name.casefold(): embeds = [] db_paths = glob(dataDir + '*.db') for path in db_paths: if 'ISR' in path: continue embeds.extend(append_db_embeddings(os.path.basename(path))) else: embeds = append_db_embeddings(db_name) return embeds def append_db_embeddings(db_name): embeds = [] conn = sqlite3.connect(dataDir + db_name) cur = conn.cursor() result = cur.execute('SELECT * from Embeds') unpacker = struct.Struct('GPT')) # else: return str(txt).replace('```', ' ').replace(' ', '  ').replace(' ', '  ').replace(' ', '  ').replace('\n','
def genUsageStats(do_reset=False):
    """Summarise per-user usage from the log files in dataDir.

    Returns one row per user plus a final 'totals' row, each
    [name, mini in/out, embed in/out, 'audio:N', 'translate:N', 'images:0/0'].
    With *do_reset* the log files are deleted and every count is reported as 0.
    Returns an error string when a file cannot be read after three attempts.
    """
    def load(path):
        """Return the file's lines ([] if absent or just reset); None if unreadable."""
        if not os.path.exists(path):
            return []
        for _ in range(3):
            try:
                with open(path) as f:
                    lines = f.readlines()
                if do_reset:
                    os.remove(path)
                    return []
                return lines
            except OSError:
                sleep(3)  # file may be held by a concurrent writer
        return None

    def sum_counts(lines):
        """Add up the integer after the last ':' of each line, skipping bad lines."""
        total = 0
        for line in lines:
            try:
                total += int(line.rpartition(':')[2])
            except ValueError:
                continue
        return total

    result = []
    ttotal_embed_in = ttotal_embed_out = 0
    ttotal4mini_in = ttotal4mini_out = 0
    totalAudio = 0
    totalTranslation = 0
    totalImages = 0
    totalHdImages = 0
    if do_reset:
        dudPath = dataDir + '_speech.txt'
        if os.path.exists(dudPath):
            os.remove(dudPath)
    for user in unames:
        tokens_embed_in = tokens_embed_out = 0
        tokens4mini_in = tokens4mini_out = 0
        lines = load(dataDir + user + '_log.txt')
        if lines is None:
            return f'File access failed reading stats for user: {user}'
        # Parsing happens after the read succeeded, so a retry can never count
        # the same line twice.
        for line in lines:
            # Line format: "<user>:<tokens in>/<tokens out>-<model tag>"
            counts, _, model = line.rpartition(':')[2].partition('-')
            tin, _, tout = counts.partition('/')
            try:
                incount, outcount = int(tin), int(tout)
            except ValueError:
                continue
            if 'mini' in model:
                tokens4mini_in += incount
                tokens4mini_out += outcount
            else:
                tokens_embed_in += incount
                tokens_embed_out += outcount
        ttotal4mini_in += tokens4mini_in
        ttotal4mini_out += tokens4mini_out
        ttotal_embed_in += tokens_embed_in
        ttotal_embed_out += tokens_embed_out

        lines = load(dataDir + user + '_audio.txt')
        if lines is None:
            return f'File access failed reading audio stats for user: {user}'
        userAudio = sum_counts(lines)
        # Added once per user; the original added the running sum on every line.
        totalAudio += userAudio

        lines = load(dataDir + user + '_translation.txt')
        if lines is None:
            return f'File access failed reading translation stats for user: {user}'
        userTranslation = sum_counts(lines)
        totalTranslation += userTranslation

        user_images = 0
        user_hd_images = 0
        result.append([user, f'{tokens4mini_in}/{tokens4mini_out}',
                       f'{tokens_embed_in}/{tokens_embed_out}',
                       f'audio:{userAudio}', f'translate:{userTranslation}',
                       f'images:{user_images}/{user_hd_images}'])
    result.append(['totals', f'{ttotal4mini_in}/{ttotal4mini_out}',
                   f'{ttotal_embed_in}/{ttotal_embed_out}',
                   f'audio:{totalAudio}', f'translate:{totalTranslation}',
                   f'images:{totalImages}/{totalHdImages}'])
    return result
def new_conversation(user):
    """Return reset values for prompt, history, dialog, clip text and both date boxes."""
    return [None, [], gr.Markdown(value='', label='Dialog', container=True), '',
            '1990-01-01', etz_now()]


def updatePassword(user, pwd):
    """Store the normalised password, mask the box, and show the upload button
    only when the first configured user/password pair was entered."""
    password = pwd.lower().strip()
    # The user name is normalised the same way chat() normalises it.
    is_admin = user.lower().strip() == unames[0] and password == pwdList[0]
    return [password, "*********", gr.Button(visible=is_admin, value='Upload Database')]


def chat(prompt, user_window, pwd_window, past, response, gptModel, clip_text, db_name,
         start_date, end_date, language, en_hebrew, books, book_filter, find_verses,
         bible_books, reverse_bible_books):
    """Answer one prompt, running a bible or teaching search on the first turn.

    Returns [history, dialog text, prompt-box value, model, clip text].

    NOTE(review): this file reached review with its line structure collapsed and
    its markup stripped. The nesting below (search only when the history is
    empty; the first user's maintenance commands handled before the normal
    path) is inferred from statement order - confirm against the original.
    """
    # NOTE(review): SOURCE shows a hard line break wherever BR is used inside a
    # literal; '<br>' is assumed to be the stripped markup - confirm.
    BR = '<br>'
    priority_note = '\nGive higher priority to the information just provided.'

    user_window = user_window.lower().strip()
    translation_count = 0
    if not prompt.strip():
        return [past, 'You must enter a prompt or question', None, gptModel, clip_text]
    fixed_date = fix_date(start_date)
    if not fixed_date:
        return [past, f'"{start_date}" is not a valid start date, please use a common format',
                None, gptModel, clip_text]
    start_date = fixed_date
    fixed_date = fix_date(end_date)
    # Test the parsed value; the original tested the raw text, letting an
    # unparseable date through as None.
    if not fixed_date:
        return [past, f'"{end_date}" is not a valid end date, please use a common format',
                None, gptModel, clip_text]
    end_date = fixed_date
    clip_txt = clip_text
    if not response:
        response = ''
    else:
        # Drop the clip/verse listing appended to the previous answer.
        loc = response.find(BR)  # 'Following are Clips')
        if loc > -1:
            response = response[:loc].strip()

    isBoss = user_window == unames[0] and pwd_window == pwdList[0]
    if isBoss:
        if prompt.startswith('delete'):
            db_path = dataDir + prompt[7:]
            if not os.path.exists(db_path):
                response = f'File {db_path} not found'
            else:
                os.remove(db_path)
                response = f'File {db_path} was deleted'
            return [past, str(response), None, gptModel, clip_text]
        if prompt == 'stats':
            response = genUsageStats()
            return [past, str(response), None, gptModel, clip_text]
        if prompt == 'reset':
            response = genUsageStats(True)
            return [past, md(response), None, gptModel, clip_text]
        if prompt.startswith("clean"):
            user = prompt[6:]
            response = f'cleaned all .wav and .b64 files for {user}'
            final_clean_up(user, True)
            return [past, response, None, gptModel, clip_text]
        if prompt.startswith('files'):
            (log_cnt, wav_cnt, other_cnt, others, log_list) = list_permanent_files()
            response = (f'{log_cnt} log files\n{wav_cnt} .wav files\n{other_cnt} Other files:'
                        f'\n{others}\nlogs: {str(log_list)}')
            return [past, response, None, gptModel, clip_text]

    if not (user_window in unames and pwd_window == pwdList[unames.index(user_window)]):
        return [[], "User name and/or password are incorrect", prompt, gptModel, clip_txt]

    prompt = prompt.strip()
    rag_txt = ''
    rag_txt2 = ''
    prompt_bare = prompt
    translation_count += update_translation_count(len(prompt), language)
    prompt = azure_translate_text(prompt, "en", language)
    first_time = len(past) == 0
    prompt_tokens = 0
    total_tokens = 0
    clip_list = []
    bible_list = []
    max_clips = 5 + 5 * (language == 'en')
    tokens_in = 0
    tokens_out = 0
    tokens = 0
    bible_search = False

    if not first_time:
        prompt += priority_note
    elif 'bible' in db_name.casefold():
        bible_search = True
        msg = check_books(book_filter, books)
        if msg != 'ok':
            return [past, msg, None, gptModel, clip_text]
        instructions = '''You are a helpful assistant who has expert knowledge of the Bible and is familiar with Hebrew versions of biblical names.
'''
        past.append({'role': 'developer', 'content': instructions})
        prompt = make_hebrew(prompt, en_hebrew)
        (results, prompt_tokens, total_tokens) = do_bible_search(prompt, db_name, books, book_filter)
        insert = ''
        if book_filter:
            book_listing = ' ,'.join(books)
            insert = f' From books in filter: {book_listing}, '
        txt = (f'\n=================\n\n{BR}Following are ISR Bible verses in response to your query.'
               f'{BR}{insert} Listed in the order they appear in the bible:\n=================\n')
        bible_list.append(txt)
        if len(results) == 0:
            txt = '\n**Sorry, no bible verses were found in response to your prompt**\n'
            return [past, txt, None, gptModel, clip_text]
        good_count = 0
        sorted_passages = make_sorted_passages(results, bible_books)
        for (book_num, book, chapter, verse_range, verse, relevance) in sorted_passages:
            verse = verse.rstrip(" )\n")
            if relevance > 1:
                good_count += 1
            rag_txt += f'{book}:{chapter}:{verse_range}\n{verse}\n'
            verse += f'\n({relevance_terms[relevance]} to query)'
            bible_list.append(f'{BR}{book}:{chapter}:{verse_range}{BR}{verse}\n')
        if good_count == 0:
            txt = '\n**Sorry, no relevant bible verses were found in response to your prompt**\n'
            return [past, txt, None, gptModel, clip_text]
        guidance = '''It is a group of bible passages. Each group is headed by (Passage: Book Name, Chapter 3, Verses)'''
        prompt = rag_txt + '.\n ' + prompt + priority_note + guidance
    else:  # searching teachings
        chunk_num = 0
        if find_verses:
            instructions = '''You are a helpful assistant who has expert knowledge of the Bible and is familiar with Hebrew versions of biblical names.'''
            past.append({'role': 'developer', 'content': instructions})
        (results, prompt_tokens, total_tokens) = do_search(prompt, db_name, start_date, end_date, find_verses)
        start_date = start_date[0:4] + '-' + start_date[4:6] + '-' + start_date[6:8]
        end_date = end_date[0:4] + '-' + end_date[4:6] + '-' + end_date[6:8]
        note = ''
        if find_verses:
            max_clips = 10  # was 50
            note = '''Note: Biblical character names the same as a book name are detected as books. Warning about related passages: AI sometimes hallucinates, identifying passages not related to teaching text '''
        txt = (f'\n=================\n\n{BR}Following are Clips and YouTube Links based on your initial '
               f'query for dates between {start_date} and {end_date}:{BR}{note}=================\n')
        clip_list.append(azure_translate_text(txt, language))
        translation_count += update_translation_count(len(txt), language)
        if len(results) == 0:
            txt = '\n**Sorry, no teachings were within the start/end dates you specified**\n'
            txt = azure_translate_text(txt, language)
            translation_count += update_translation_count(len(txt), language)
            return [past, txt, None, gptModel, clip_text]
        for (name, text, time, yt_id, udate, dp) in results:
            time = correct_time(time, text)
            upload_date = udate.replace('"', '')
            if 'unknown' not in upload_date.casefold():
                upload_date = upload_date[0:4] + '-' + upload_date[4:6] + '-' + upload_date[6:8]
            yt_id = yt_id.replace('"', '')
            seek_HMS = seek_hms(time)
            seek_colons = seek_HMS.replace('h', ' : ').replace('m', ' : ').replace('s', '')
            text = remove_headers(text)
            pure_text = remove_times(text).replace('\n', '')
            yt_url = f'https://youtu.be/{yt_id}?t={seek_HMS}'
            if len(clip_list) > max_clips:
                continue
            if find_verses:
                book_refs = get_bible_refs(pure_text, bible_books)
                if len(book_refs) > 0:
                    books_mentioned = ', '.join(book_refs)
                    for bref in book_refs:
                        # NOTE(review): highlight markup around the name was lost
                        # in extraction; as visible this replace changes nothing.
                        pure_text = pure_text.replace(bref, '' + bref + '')
                else:
                    books_mentioned = '(None found)'  # a string, like the other branch
                clip_list.append(md(
                    f'\n\n{BR}{name} ({upload_date}){BR}At seek time: {seek_colons}{BR}'
                    f'[YouTube Link: ]({yt_url})\nBooks mentioned: {books_mentioned}'
                    f'\n\n{pure_text}\n================'))
                rag_txt2 += f'\n[start chunk {chunk_num}]: {pure_text}\n[end chunk {chunk_num}]\n'
                rag_txt += pure_text
                chunk_num += 1
            else:
                txt = azure_translate_text(pure_text, language)
                clip_list.append(md(
                    f'\n\n{BR}{name} ({upload_date}){BR}At seek time: {seek_colons}{BR}'
                    f'[YouTube Link: ]({yt_url})\n\n{txt}\n================'))
                translation_count += update_translation_count(len(txt), language)
                rag_txt += pure_text
        prompt = rag_txt + '.\n ' + prompt + priority_note

    past.append({"role": "user", "content": prompt})
    completion = Client().chat.completions.create(model=gptModel, messages=past)
    reporting_model = gptModel
    reply = completion.choices[0].message.content

    if find_verses and first_time and not bible_search:
        # Second request: ask the model to list verse citations per chunk.
        past2 = past.copy()
        past2.pop()
        order = ''' You have been provided a series of chunks delineated by [start chunk #] and [end chunk #]. In each chunk, find citations of bible book, chapter and verse. Make a list with each item formatted as {chunk #, book, chapter, verse}'''
        past2.append({"role": "user", "content": rag_txt2 + '\n' + order})
        completion2 = Client().chat.completions.create(model=gptModel, messages=past2)
        reply2 = completion2.choices[0].message.content
        tokens_in += completion2.usage.prompt_tokens
        tokens_out += completion2.usage.completion_tokens
        tokens += completion2.usage.total_tokens
        prior_psg = ''
        prior_idx = ''
        for (idx, bk, ch, vn) in parse_verse_refs(reply2, reverse_bible_books):
            psg = get_bible_verse(bk, ch, vn)
            if psg == prior_psg and idx == prior_idx:
                continue
            prior_psg = psg
            prior_idx = idx
            clip_idx = int(idx) + 1  # clip_list[0] is the heading
            # The model may invent chunk numbers; ignore out-of-range ones.
            if len(psg) == 0 or not 0 < clip_idx < len(clip_list):
                continue
            (dud, this_book) = bible_books[bk]
            if this_book not in clip_list[clip_idx]:
                continue
            clip_list[clip_idx] += ('\n' + BR + 'Possible Related Bible passage:' + BR + psg + '\n')

    reply = azure_translate_text(reply, language)
    translation_count += update_translation_count(len(reply), language)
    tokens_in += completion.usage.prompt_tokens
    tokens_out += completion.usage.completion_tokens
    tokens += completion.usage.total_tokens
    response += "\n\n***YOU***: " + prompt_bare + "\n\n***GPT***: " + reply.replace('```', '\n\n```\n\n')
    if translation_count > 0:
        with open(dataDir + user_window + '_translation.txt', 'a') as f:
            f.write(f'Translation:{translation_count}\n')
    if len(clip_list) > 0:
        response += md(' '.join(map(str, clip_list)))
    if len(bible_list) > 0:
        response += md(' '.join(map(str, bible_list)))
    if isBoss:
        response += md(f"\n\n{reporting_model}: tokens in/out = {tokens_in}/{tokens_out}\n")
    if tokens > 40000:
        response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON."
    past.append({"role": "assistant", "content": reply})

    # Append usage to the user's log; retry in case the file is briefly locked.
    dataFile = new_func(user_window)
    accessOk = False
    for _ in range(3):
        try:
            with open(dataFile, 'a') as f:
                m = '4omini'
                f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n')
                if (prompt_tokens + total_tokens) > 0:
                    f.write(f'{user_window}:{prompt_tokens}/{total_tokens}-embed\n')
            accessOk = True
            break
        except OSError:
            sleep(3)
    if not accessOk:
        response += f"\nDATA LOG FAILED, path = {dataFile}"
    return [past, response, None, gptModel, clip_txt]
def new_func(user):
    """Return the path of *user*'s token log file."""
    return dataDir + user + '_log.txt'


def transcribe(user, pwd, fpath):
    """Transcribe the audio file at *fpath* and log its duration for *user*.

    Returns 'Bad credentials' unless *pwd* is the password stored at the same
    position as *user*.
    """
    user = user.lower().strip()
    pwd = pwd.lower().strip()
    # Pair the password with its own user (the original accepted any listed one).
    if user not in unames or pwd != pwdList[unames.index(user)]:
        return 'Bad credentials'
    # NOTE(review): `import audioread` is commented out at the top of the file,
    # so this line raises NameError until that import is restored.
    with audioread.audio_open(fpath) as audio:
        duration = int(audio.duration)
    if duration > 0:
        with open(dataDir + user + '_audio.txt', 'a') as f:
            f.write(f'audio:{str(duration)}\n')
    with open(fpath, 'rb') as audio_file:
        transcript = Client().audio.transcriptions.create(
            model='whisper-1', file=audio_file, response_format='text')
    return str(transcript)


def pause_message():
    """Return the notice shown while audio input is paused."""
    return "Audio input is paused. Resume or Stop as desired"


def update_user(user_win):
    """Return [user, user]: the matching configured name, or 'unknown'."""
    user_win = user_win.lower().strip()
    user = user_win if user_win in unames else 'unknown'
    return [user, user]


def speech_worker(chunks=None, q=None):
    """Synthesise each text chunk to the next file path taken from the front of *q*.

    *q* is consumed in place, one path per chunk.
    """
    chunks = [] if chunks is None else chunks
    q = [] if q is None else q
    for chunk in chunks:
        fpath = q.pop(0)
        response = Client().audio.speech.create(model="tts-1", voice="fable", input=chunk,
                                                speed=0.85, response_format='wav')
        with open(fpath, 'wb') as fp:
            fp.write(response.content)


def gen_speech_file_names(user, cnt):
    """Return *cnt* speech file paths for *user*, numbered from 0."""
    return [dataDir + f'{user}_speech{i}.wav' for i in range(cnt)]


def final_clean_up(user, do_b64=False):
    """Delete speech .wav files for *user* ('all' = every user).

    The special name 'kill' deletes every file in dataDir. *do_b64* is accepted
    for compatibility and ignored. Files that cannot be removed are skipped.
    """
    user = user.strip().lower()
    if user == 'kill':
        flist = glob(dataDir + '*')
    elif user == 'all':
        flist = glob(dataDir + '*_speech*.wav')
    else:
        flist = glob(dataDir + f'{user}_speech*.wav')
    for fpath in flist:
        try:
            os.remove(fpath)
        except OSError:
            continue


def list_permanent_files():
    """Count the files in dataDir by kind.

    Returns (log_count, wav_count, other_count, other_names, log_names), the
    first four as strings; log_names is emptied when there are more than five.
    """
    others = []
    list_logs = []
    wav_cnt = 0
    for fpath in os.listdir(dataDir):
        if fpath.endswith('.txt'):
            list_logs.append(fpath)
        elif fpath.endswith('.wav'):
            wav_cnt += 1
        else:
            others.append(fpath)
    log_cnt = len(list_logs)
    if log_cnt > 5:
        list_logs = []
    return (str(log_cnt), str(wav_cnt), str(len(others)), str(others), list_logs)


def show_help():
    """Return the help text as HTML.

    NOTE(review): the original line breaks inside this text and the markup in
    the final replace chain were lost in extraction. Line breaks are restored
    from the numbering; '&nbsp;&nbsp;' and '<br>' are assumed - confirm.
    """
    txt = '''
MTOI Search scans a database you select that contains transcripts of MTOI video teachings,
or the ISR Scriptures, finding sections/passages that relate to the question or topic you enter.
It formulates a response based on that text found. It appends to the response as follows:

  For video teachings, it lists at least five text clips plus YouTube links to the video at the
  point when that text is spoken. Prompts may be entered in either English or the selected
  translation language. Responses will be given in the selected translation language.
  If you check "Find bible verses mentioned in selected teachings", Each teaching excerpt will
  (1) Display and highlight books mentioned and (2) AI will attempt to discern Book/Chapter/Verse
  citations and will display the related passage text following the teaching excerpt.

  For ISR_Bible searches, it lists up to ten bible passages. The AI response will be given in
  the selected translation language but the ISR Bible verses remain as found.
  Prompts/questions should be entered in English

1. General:
  1.1 Login with user name and password (not case-sensitive)
  1.2 Select a database (topic) using "Choose Topic". You can target all the teaching video
      databases by selecting "All Teaching Topics". Note: This selection does not include
      ISR_Bible scripture in the search. Bible and teachings searches are two distinct
      procedures and cannot be combined.
  1.3 Select a Translation language (initially defaults to "English")
  1.4 Type prompts (questions, topics) into "Prompt or Question" window.
  1.5 For teaching videos, you can limit results based on the dates when the results were
      uploaded to YouTube with the Start Date and End Date entries.
  1.6 For ISR_Bible searches, you can filter searches to any selection of books by checking
      Activate Filter and selecting book(s) you want to limit search to.
2. Search:
  2.1 Enter prompt/question and tap the "Submit Prompt/Question" button. The responses appear
      in the Dialog window.
  2.2 Enter follow-up questions in the Prompt window. Then tap "Submit Prompt/Question".
  2.3 If topic changes, or when done chatting, tap the "Restart Conversation" button.

Hints:
  1. Better chat results are obtained by including more detail in prompts. Say what you want
     to know. You can ask for complex results like: "List the important points of these teachings".
  2. Always tap "Restart Conversation" before changing chat topics.
'''
    return str(txt).replace('```', ' ').replace('  ', '&nbsp;&nbsp;').replace('\n', '<br>')
def upload_db_file(visibility):
    """Toggle the database upload widget; return [new visibility, File update]."""
    viz = not visibility
    return [viz, gr.File(visible=viz, type="filepath", interactive=True, label='Upload Database')]


# UI definition.
# NOTE(review): the file's indentation was lost in extraction; the nesting of
# rows and columns below is inferred from the `scale` arguments - confirm the
# layout against the original.
with gr.Blocks() as demo:  # theme=gr.themes.Soft()
    history = gr.State([])
    password = gr.State("")
    user = gr.State("unknown")
    model = gr.State("gpt-4o-mini")
    clip_text = gr.State("")
    file_browser_visibility = gr.State(False)
    q = gr.State([])
    qsave = gr.State([])
    en_hebrew = gr.State({})
    bible_books = gr.State({})
    reverse_bible_books = gr.State({})

    gr.Markdown('# MTOI Search')
    gr.Markdown('Enter user name & password. Tap "Help & Hints" button for more instructions.')
    with gr.Row():
        user_window = gr.Textbox(label="User Name")
        user_window.blur(fn=update_user, inputs=user_window, outputs=[user, user_window])
        pwd_window = gr.Textbox(label="Password")
        help_button = gr.Button(value='Help & Hints')
    with gr.Row():
        clear_button = gr.Button(value="Restart Conversation", scale=3)
        db_chooser = gr.Dropdown(type="value", label='Choose Topic', show_label=True, scale=4,
                                 choices=['Good News', 'Passover', 'Marriage & Divorce', 'False Prophets'],
                                 interactive=True)
        lang_chooser = gr.Dropdown(label='Translation', show_label=True, scale=3,
                                   choices=[('English', 'en'), ('Hebrew', 'he'), ('Spanish', 'es'),
                                            ('German', 'de'), ('French', 'fr'), ('Japanese', 'ja'),
                                            ('Romanian', 'ro'), ('Afrikaans', 'af')],
                                   interactive=True)
        button_upload_db = gr.Button(value='Upload Database', visible=False, scale=2)
        submit_button = gr.Button(value="Submit Prompt/Question", scale=4)
    with gr.Row():
        with gr.Column(scale=3):
            find_verses = gr.Checkbox(label='Find bible verses mentioned in selected teachings', value=False)
            prompt_window = gr.Textbox(label="Prompt or Question", scale=3)
        with gr.Column(scale=2):
            # NOTE(review): SOURCE shows a hard line break after "OK"; '<br>' is
            # assumed. The spacing between the examples is copied as visible.
            filter_heading = gr.Markdown('### **Optional Date Filter. Most common formats are OK<br>'
                                         'such as   12/2004,     jan 2015,     4 Dec 2012**')
            with gr.Row():
                start_date = gr.Textbox(label='Start Date (YYYY-mm-dd)', scale=1, value='1990-01-01', max_lines=1)
                end_date = gr.Textbox(label='End Date (YYYY-mm-dd)', scale=1, value=etz_now(), max_lines=1)
                checkbox_filter = gr.Checkbox(label='Activate Book Filter', scale=2, show_label=True,
                                              visible=False)
                book_chooser = gr.Dropdown(choices=[], type='value', scale=3, multiselect=True,
                                           interactive=True, label='Book Filter, Select one or more',
                                           visible=False)
    gr.Markdown('### **Dialog:**')
    output_window = gr.Markdown(container=True)
    with gr.Row():
        db_file = gr.File(visible=False, type="filepath", interactive=True, label='Upload Database')

    # Event wiring.
    pwd_window.blur(updatePassword, inputs=[user_window, pwd_window],
                    outputs=[password, pwd_window, button_upload_db])
    submit_button.click(chat,
                        inputs=[prompt_window, user_window, password, history, output_window, model,
                                clip_text, db_chooser, start_date, end_date, lang_chooser, en_hebrew,
                                book_chooser, checkbox_filter, find_verses, bible_books,
                                reverse_bible_books],
                        outputs=[history, output_window, prompt_window, model, clip_text])
    clear_button.click(fn=new_conversation, inputs=[user_window],
                       outputs=[prompt_window, history, output_window, clip_text, start_date, end_date])
    help_button.click(fn=show_help, outputs=output_window)
    button_upload_db.click(fn=upload_db_file, inputs=[file_browser_visibility],
                           outputs=[file_browser_visibility, db_file])
    db_file.upload(fn=write_db_file, inputs=[db_file], outputs=[output_window])
    db_chooser.input(fn=on_db_change, inputs=[db_chooser, bible_books],
                     outputs=[filter_heading, start_date, end_date, book_chooser, checkbox_filter,
                              find_verses])
    demo.load(fn=init_db_and_bible_books, inputs=[en_hebrew, bible_books, reverse_bible_books],
              outputs=[db_chooser, end_date, en_hebrew, bible_books, reverse_bible_books])
    checkbox_filter.input(fn=populate_book_chooser, inputs=[checkbox_filter, bible_books],
                          outputs=[book_chooser])

demo.launch(share=True, allowed_paths=[dataDir], ssr_mode=False, theme=gr.themes.Soft())