File size: 3,691 Bytes
adf6185
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
import streamlit as st
import os
import json
import faiss
import numpy as np
from vosk import Model, KaldiRecognizer
import wave
from pydub import AudioSegment
from sentence_transformers import SentenceTransformer
from transformers import pipeline

# ------------------------------
# 1. LOAD MODELS OFFLINE
# ------------------------------

@st.cache_resource
def load_stt_model():
    """Build the Vosk speech-to-text model from its local model directory.

    The directory name is relative, so it is resolved against the process's
    current working directory. The function is decorated with
    ``st.cache_resource`` so Streamlit can reuse the loaded model.

    Returns:
        The ``vosk.Model`` instance.
    """
    return Model("vosk-model-small-en-us-0.15")

@st.cache_resource
def load_embedding_model():
    """Build the sentence-embedding model used for indexing and retrieval.

    NOTE(review): the model is referenced by name, not by local path; whether
    this works without network access depends on a local cache -- confirm.

    Returns:
        The ``SentenceTransformer`` instance for ``all-MiniLM-L6-v2``.
    """
    model_name = "all-MiniLM-L6-v2"
    return SentenceTransformer(model_name)

@st.cache_resource
def load_qa_model():
    """Build the extractive question-answering pipeline.

    Returns:
        A ``transformers`` pipeline for the ``question-answering`` task backed
        by ``distilbert-base-cased-distilled-squad``.
    """
    checkpoint = "distilbert-base-cased-distilled-squad"
    return pipeline("question-answering", model=checkpoint)

# Instantiate all three models once at module level; the helper functions and
# the UI code below read these names as globals.
stt_model, embedder, qa_model = (
    load_stt_model(),
    load_embedding_model(),
    load_qa_model(),
)

# ------------------------------
# 2. FUNCTIONS
# ------------------------------

def transcribe_audio(file_path):
    """Transcribe a WAV file with the module-level Vosk model.

    Args:
        file_path: Path of a WAV file that the ``wave`` module can open.

    Returns:
        The recognized text of every segment joined by single spaces; segments
        with no text are omitted, so the result may be an empty string.
    """
    segments = []

    # The wave reader is a context manager: the file handle is released even
    # if the recognizer raises part-way through.
    with wave.open(file_path, "rb") as wf:
        rec = KaldiRecognizer(stt_model, wf.getframerate())
        rec.SetWords(True)

        while True:
            data = wf.readframes(4000)
            if not data:
                break
            # A truthy return means a finished segment is available now.
            if rec.AcceptWaveform(data):
                segments.append(json.loads(rec.Result()).get("text", ""))

        # Collect whatever the recognizer still holds after the last chunk.
        segments.append(json.loads(rec.FinalResult()).get("text", ""))

    # Joining only non-empty segments avoids stray double spaces.
    return " ".join(part for part in segments if part).strip()


def convert_to_wav(uploaded_file):
    """Convert an uploaded audio file to a mono, 16-bit WAV file on disk.

    Args:
        uploaded_file: A path or file-like object accepted by
            ``AudioSegment.from_file``.

    Returns:
        The path of the written WAV file (always ``"temp.wav"``).
    """
    audio = AudioSegment.from_file(uploaded_file)

    # The speech recognizer expects mono 16-bit PCM; stereo or 8/24/32-bit
    # input would otherwise be fed to it unchanged and transcribe poorly.
    audio = audio.set_channels(1).set_sample_width(2)

    # NOTE(review): the output name is fixed, so concurrent conversions would
    # overwrite each other -- confirm single-user use.
    output_path = "temp.wav"
    audio.export(output_path, format="wav")
    return output_path


def save_text(text):
    """Append one transcript as a line of ``transcripts/data.txt``.

    The ``transcripts`` directory is created on first use so the very first
    save does not fail with ``FileNotFoundError``.

    Args:
        text: Transcript text; a newline is appended after it.
    """
    os.makedirs("transcripts", exist_ok=True)
    with open("transcripts/data.txt", "a", encoding="utf-8") as f:
        f.write(text + "\n")


def build_vector_db():
    """Embed every stored transcript line and index it with FAISS.

    Reads ``transcripts/data.txt``; each non-blank line (stripped of
    surrounding whitespace) becomes one document. A missing or empty file
    yields an empty document list and an empty index instead of raising.

    Returns:
        A ``(docs, index)`` tuple: the list of document strings and a
        ``faiss.IndexFlatL2`` whose i-th vector is the embedding of
        ``docs[i]``.
    """
    try:
        with open("transcripts/data.txt", "r", encoding="utf-8") as f:
            # Blank lines carry no information and would only add noise
            # vectors to the index.
            docs = [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        docs = []

    if not docs:
        # Nothing to embed: encode([]) has no second axis to read the
        # dimension from, so ask the model for it directly.
        dim = embedder.get_sentence_embedding_dimension()
        return docs, faiss.IndexFlatL2(dim)

    embeddings = np.asarray(embedder.encode(docs), dtype="float32")
    index = faiss.IndexFlatL2(embeddings.shape[1])
    index.add(embeddings)
    return docs, index


def retrieve(query, docs, index, top_k=3):
    """Return the documents nearest to *query*, joined into one context string.

    Args:
        query: The question text to embed.
        docs: Document strings, positionally aligned with the index vectors.
        index: A FAISS index built over the embeddings of ``docs``.
        top_k: Maximum number of documents to return.

    Returns:
        The matching documents joined by single spaces, nearest first; an
        empty string when there is nothing to search.
    """
    # Never ask for more neighbours than exist: FAISS pads missing results
    # with -1, which as a list index would silently pick the last document.
    k = min(top_k, len(docs))
    if k <= 0:
        return ""

    q_emb = np.asarray(embedder.encode([query]), dtype="float32")
    _, neighbour_ids = index.search(q_emb, k)

    # Defensive filter in case the index and docs are out of step.
    hits = [docs[i] for i in neighbour_ids[0] if 0 <= i < len(docs)]
    return " ".join(hits)


# ------------------------------
# 3. STREAMLIT UI
# ------------------------------

# Page heading and a one-line summary of the workflow. The emoji and
# punctuation were previously mis-decoded (UTF-8 bytes read through a
# single-byte code page) and are restored here.
st.title("🔴 Offline GenAI RAG from Audio (No API • No Internet)")
st.write("🎤 Upload or record audio → 📄 Convert to text → 🤖 Ask questions offline")

# Sidebar switch that selects which of the two pages below is rendered.
menu = st.sidebar.radio("Navigation", ["Upload Audio", "Ask Questions"])

# -----------------------------------
#  UPLOAD AUDIO PAGE
# -----------------------------------
# Upload page: convert the uploaded file to WAV, transcribe it, show the text
# and append it to the local transcript store.
if menu == "Upload Audio":
    st.header("🎤 Upload or Record Audio")

    audio_file = st.file_uploader("Upload audio file", type=["wav", "mp3", "m4a"])

    if audio_file:
        st.success("File uploaded successfully")

        wav_path = convert_to_wav(audio_file)

        st.info("Transcribing offline... please wait")

        text = transcribe_audio(wav_path)

        st.subheader("📝 Transcribed Text")
        st.write(text)

        # An empty transcript would only add a blank line to the store, so it
        # is neither saved nor reported as saved.
        if text:
            save_text(text)
            st.success("Saved locally in transcripts/ folder")
        else:
            st.info("No speech was recognized, so nothing was saved")

# -----------------------------------
#  ASK QUESTION PAGE
# -----------------------------------
# Question page: index the stored transcripts, retrieve the lines closest to
# the question and let the QA model extract an answer from them.
if menu == "Ask Questions":
    st.header("❓ Ask Questions From Your Audio Knowledge Base")

    # Opening this page before any upload must not crash on the missing
    # transcript file; treat it as an empty knowledge base.
    try:
        docs, index = build_vector_db()
    except FileNotFoundError:
        docs, index = [], None

    if not docs:
        st.info("No transcripts found yet. Upload an audio file first.")
    else:
        user_q = st.text_input("Enter your question")

        if st.button("Get Answer"):
            if not user_q.strip():
                # Do not send an empty question to the retriever or QA model.
                st.info("Please enter a question first")
            else:
                context = retrieve(user_q, docs, index)
                if not context.strip():
                    st.info("No matching transcript text was found")
                else:
                    result = qa_model(question=user_q, context=context)

                    st.subheader("🧠 Answer")
                    st.write(result["answer"])

                    st.caption("Based only on your stored audio transcriptions")