# Source: Hugging Face Space "manabb" — app.py (commit e0bc77f, verified); scrape header converted to a comment.
#R&D
#correct with history
import json
import os
import re
import shutil
import time
from collections import deque
from datetime import datetime

import gradio as gr
from huggingface_hub import HfApi, file_exists, hf_hub_download
from langchain_community.document_loaders import PyMuPDFLoader, PyPDFLoader
from langchain_community.vectorstores import FAISS
from langchain_huggingface import HuggingFaceEmbeddings
# langchain-text-splitters ships alongside the langchain packages above.
from langchain_text_splitters import RecursiveCharacterTextSplitter
from transformers import pipeline
user_repo_id = "manabb/nrl"
msg = ""
# History storage
HISTORY = []
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
faiss_path = hf_hub_download(repo_id=user_repo_id, filename="index.faiss", repo_type="dataset")
pkl_path = hf_hub_download(repo_id=user_repo_id, filename="index.pkl", repo_type="dataset")
folder_path = os.path.dirname(faiss_path)
vectorstore = FAISS.load_local(folder_path, embeddings, allow_dangerous_deserialization=True)
print(f"✅ Vectorstore: {vectorstore.index.ntotal} docs")
retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
summarizer = pipeline("summarization", model="google/flan-t5-small", device_map="cpu")
#=========================================TAB-1-START======================================
def intelligently_show_context_with_pages_resources(context, query, docs, top_n=3):
""" Intelligently extract paragraphs with PAGE NUMBERS + RESOURCE names """
display_context = []
display_context.append("📄 ****\n")
display_context.append("=" * 120)
paragraphs = [p.strip() for p in re.split(r'\n\s*\n', context) if p.strip()]
if not paragraphs:
paragraphs = context.split('\n')
query_words = set(re.findall(r'\w+', query.lower()))
scored_paras = []
for i, para in enumerate(paragraphs):
para_words = set(re.findall(r'\w+', para.lower()))
overlap = len(query_words.intersection(para_words))
score = overlap / max(len(query_words), 1)
scored_paras.append((para, score, i))
scored_paras.sort(key=lambda x: x[1], reverse=True)
for i, (para, score, para_idx) in enumerate(scored_paras[:top_n]):
if i < len(docs):
doc = docs[i]
metadata = doc.metadata
page_num = (metadata.get('page') or metadata.get('source_page') or
metadata.get('page_number') or 'N/A')
resource = (metadata.get('source') or metadata.get('filename') or
metadata.get('file_name') or metadata.get('document') or 'Unknown')
if isinstance(page_num, dict): page_num = page_num.get('page', 'N/A')
if isinstance(resource, dict): resource = resource.get('source', 'Unknown')
page_str = f"📍 Pg {page_num}" if page_num != 'N/A' else "📍 Pg ?"
resource_str = f"📁 {os.path.basename(resource)}" if resource != 'Unknown' else "📁 Unknown"
else:
page_str = "📍 Pg ?"
resource_str = "📁 Unknown"
marker = "🔥 TOP" if i < 2 else "⭐ RELEVANT"
score_pct = int(score * 100)
display_context.extend([
f"\n{marker} [{score_pct}%] {page_str} | {resource_str}",
para,
"─" * 100
])
if len(scored_paras) > top_n:
display_context.append(f"\n... +{len(scored_paras)-top_n} more from other pages/resources")
return "\n".join(display_context)
#===========================================================================
def save_to_history(query, summary, context, docs, timestamp=None):
"""Save query to conversation history"""
if timestamp is None:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
history_entry = {
"timestamp": timestamp,
"query": query,
"summary": summary,
"context_preview": context[:200] + "..." if len(context) > 200 else context,
"full_context_length": len(context),
"retrieved_docs": len(docs),
"top_resources": [os.path.basename(doc.metadata.get('source', 'Unknown')) for doc in docs[:3]],
"avg_relevance_score": sum([float(doc.metadata.get('score', 0)) for doc in docs[:5]]) / max(1, len(docs))
}
HISTORY.append(history_entry)
# Keep last 50 entries
if len(HISTORY) > 50:
HISTORY.pop(0)
print(f"💾 Saved to history #{len(HISTORY)}")
#================================================================================
def show_history_compact(limit=3):
"""Compact history for embedding in results."""
if not HISTORY:
return "No previous queries yet."
output = ""
for i, entry in enumerate(HISTORY[-limit:], 1):
output += f"\n{i}. **{entry['query'][:50]}...** [{entry['timestamp'][:16]}]"
output += f"\n 📄 {entry['retrieved_docs']} docs | {entry['top_resources'][0] if entry['top_resources'] else 'N/A'}"
output += f"\n 💡 {entry['summary'][:60]}..."
output += "\n" + "─" * 60
return output
#========================optimized the question
def reframe_question_with_history(user_question):
# Reframing prompt
reframe_prompt = f"""Generate a single, comprehensive question that best captures the information needed to address the user's query or intent and includes the context from the conversation history.
User's question: {user_question}
Only output the optimized question.
OPTIMIZED QUESTION:"""
# Use FLAN-T5 for reframing (lightweight)
reframer = pipeline("text2text-generation", model="google/flan-t5-small", device_map="cpu")
reframed = reframer(
reframe_prompt,
max_new_tokens=100,
max_length=512,
temperature=0.1,
do_sample=False
)[0]['generated_text']
# Extract just the question
optimized_question = reframed.split("OPTIMIZED QUESTION:")[-1].strip()
if not optimized_question or len(optimized_question) < 10:
optimized_question = user_question # Fallback
return optimized_question
#========================main funcition-TAB1===========================
def summarize_with_flan_t5(query):
user_repo_id = "manabb/nrl"
msg=""
"""Generate bullet summary + context + HISTORY TRACKING."""
try:
# REFRARE QUESTION WITH HISTORY
#print("🔄 Reframing question with history...")
optimized_query = reframe_question_with_history(query)
msg=msg+" /n Your original querry : "+query
msg=msg+" /n The optimized querry : "+optimized_query
#print(f"📝 Original: {query}")
#print(f"📝 Optimized: {optimized_query}")
docs = retriever.invoke(optimized_query)
#print(f"✅ Retrieved {len(docs)} docs")
context = "\n".join([doc.page_content for doc in docs])
bullet_prompt = f"""Summarize as 4-6 bullet points:
{context[:900]}
Main Points:"""
bullet_summary = summarizer(bullet_prompt, max_length=200, min_length=50, do_sample=False)[0]['summary_text']
smart_context = intelligently_show_context_with_pages_resources(context, query, docs)
# ✅ SAVE TO HISTORY
save_to_history(query, bullet_summary, context, docs)
# ✅ COMBINE HISTORY + CURRENT RESULT
history_section = show_history_compact(limit=3) # Last 3 queries
combined_result = f"""
🤖 **YOUR Querry: "{query}"**
📋 **SUMMARY:**
{bullet_summary}
📄 **INTELLIGENT CONTEXT:**
{smart_context}
📜 **RECENT HISTORY** (last 3 queries):
{history_section}"""
#return combined_result # Single output with everything!
msg = msg+" \n "+ combined_result
except Exception as e1:
#print(f"❌ Error: {e1}")
#return f"Error: {e1}", f"Error: {e1}"
msg=f"Error: {e1}"
finally:
if os.path.exists("temp_faiss"):
shutil.rmtree("temp_faiss")
return msg
#==============================Main Function end
def login(user, pwd):
if user == "785699" and pwd == "781005":
return (
gr.update(visible=False),#loading_panel
gr.update(visible=False),#login_panel
gr.update(visible=True),#tabs_panel
"✅ Login successful"#status
)
return (
gr.update(visible=False),#loading_panel
gr.update(visible=True),#login_panel
gr.update(visible=False),#tabs_panel
"❌ Invalid credentials"#status
)
#================================
def load_resources():
time.sleep(3) # simulate FAISS / model loading
return (
gr.update(visible=False), # hide loading
gr.update(visible=True), # show login_panel
gr.update(visible=False) # hide tabs
)
#=====================================================TAB2 START====================================
#=============================================
def create_faiss_index(repo_id, file, embedding_model="sentence-transformers/all-MiniLM-L6-v2"):
"""Create FAISS index from PDF and upload to HF dataset repo"""
message = "Index creation started"
try:
# Step 1: Create proper embeddings object (CRITICAL FIX)
embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
# Step 2: Clean temp directory
if os.path.exists("temp_faiss"):
shutil.rmtree("temp_faiss")
# Step 3: Try PyPDFLoader first
loader = PyPDFLoader(file)
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
new_docs = text_splitter.split_documents(documents)
db = FAISS.from_documents(new_docs, embeddings)
db.save_local("temp_faiss")
# Step 4: Upload to HF Hub
api = HfApi(token=os.getenv("HF_TOKEN"))
api.upload_file(path_or_fileobj="temp_faiss/index.faiss", path_in_repo="index.faiss", repo_id=repo_id, repo_type="dataset")
api.upload_file(path_or_fileobj="temp_faiss/index.pkl", path_in_repo="index.pkl", repo_id=repo_id, repo_type="dataset")
message = "✅ Index created successfully with PyPDFLoader and uploaded to repo"
except Exception as e1:
try:
print(f"PyPDFLoader failed: {e1}")
# Step 5: Fallback to PyMuPDFLoader
loader = PyMuPDFLoader(file)
documents = loader.load()
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
new_docs = text_splitter.split_documents(documents)
# Use same embeddings instance
embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
db = FAISS.from_documents(new_docs, embeddings)
db.save_local("temp_faiss")
# Upload
api = HfApi(token=os.getenv("HF_TOKEN"))
api.upload_file(path_or_fileobj="temp_faiss/index.faiss", path_in_repo="index.faiss", repo_id=repo_id, repo_type="dataset")
api.upload_file(path_or_fileobj="temp_faiss/index.pkl", path_in_repo="index.pkl", repo_id=repo_id, repo_type="dataset")
message = f"✅ PyPDFLoader failed ({e1}), PyMuPDFLoader succeeded and uploaded to repo"
except Exception as e2:
message = f"❌ Both loaders failed. PyPDF: {e1}, PyMuPDF: {e2}"
finally:
# Cleanup
if os.path.exists("temp_faiss"):
shutil.rmtree("temp_faiss")
return message
# Usage
#result = create_faiss_index("your_username/your-dataset", "path/to/your/file.pdf")
#print(result)
#=============
def update_faiss_from_hf(repo_id, file, embedding_model="sentence-transformers/all-MiniLM-L6-v2"):
"""Load existing FAISS from HF, add new docs, push updated version."""
message = ""
try:
# Step 1: Create embeddings
embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
# Step 2: Download existing FAISS files
print("Downloading existing FAISS index...")
faiss_path = hf_hub_download(repo_id=repo_id, filename="index.faiss", repo_type="dataset")
pkl_path = hf_hub_download(repo_id=repo_id, filename="index.pkl", repo_type="dataset")
# Step 3: Load existing vectorstore
folder_path = os.path.dirname(faiss_path)
vectorstore = FAISS.load_local(
folder_path=folder_path,
embeddings=embeddings,
allow_dangerous_deserialization=True
)
message += f"✅ Loaded existing index with {vectorstore.index.ntotal} vectors\n"
# Step 4: Load new document with fallback
documents = None
loaders = [
("PyPDFLoader", PyPDFLoader),
("PyMuPDFLoader", PyMuPDFLoader)
]
for loader_name, LoaderClass in loaders:
try:
print(f"Trying {loader_name}...")
loader = LoaderClass(file)
documents = loader.load()
message += f"✅ Loaded {len(documents)} pages with {loader_name}\n"
break
except Exception as e:
message += f"❌ {loader_name} failed: {str(e)[:100]}...\n"
continue
if documents is None:
return "❌ All PDF loaders failed"
# Step 5: Split documents
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
new_docs = text_splitter.split_documents(documents)
message += f"✅ Created {len(new_docs)} chunks from new document\n"
# Step 6: Add new documents to existing index
vectorstore.add_documents(new_docs)
message += f"✅ Added to index. New total: {vectorstore.index.ntotal} vectors\n"
# Step 7: Save updated index
temp_dir = "temp_faiss_update"
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
vectorstore.save_local(temp_dir)
# Step 8: Upload updated files
api = HfApi(token=os.getenv("HF_TOKEN")) # Replace with your token
api.upload_file(
path_or_fileobj=f"{temp_dir}/index.faiss",
path_in_repo="index.faiss",
repo_id=repo_id,
repo_type="dataset"
)
api.upload_file(
path_or_fileobj=f"{temp_dir}/index.pkl",
path_in_repo="index.pkl",
repo_id=repo_id,
repo_type="dataset"
)
message += f"✅ Successfully updated repo with {len(new_docs)} new chunks!"
except Exception as e:
message += f"❌ Update failed: {str(e)}"
finally:
# Cleanup
if os.path.exists("temp_faiss_update"):
shutil.rmtree("temp_faiss_update")
return message
# Usage
# result = update_faiss_from_hf("yourusername/my-faiss-store", "new_document.pdf")
# print(result)
#====================
def upload_and_prepare(file,user):
# Load & split document
mm=""
if user == os.getenv("uploading_password"):
if file_exists(repo_id=repo_id, filename="index.faiss", repo_type="dataset"):
mm=update_faiss_from_hf(repo_id, file)
#mm="✅ Document processed. New index added. You can now ask questions!"
if not file_exists(repo_id=repo_id, filename="index.faiss", repo_type="dataset"):
mm=create_faiss_index(repo_id, file)
#mm="✅ Document processed. New index created. You can now ask questions!"
else:
mm="❌ Unauthorized User"
return mm
#create_faiss_index(repo_id, file_input)
#========================================TAB2 END=====================================================
#=============================================================================================gradio
with gr.Blocks() as demo:
status = gr.Markdown("# 🚀 NRL AI Space for commercial department - Guwahati")
# ---- Loading Screen ----
with gr.Column(visible=True) as loading_panel:
gr.Markdown("⏳ Loading resources, please wait...")
with gr.Column(visible=False) as login_panel:
user = gr.Textbox(label="Username", placeholder="hint:Pin code of the location where our refinery is")
pwd = gr.Textbox(label="Password", type="password", placeholder="hint:Pin code of the location where our corporate office is")
login_btn = gr.Button("Login")
# ---- Tabs Container (initially hidden) ----
with gr.Column(visible=False) as tabs_panel:
with gr.Tab("📄 ASK on manual of procurement of Goods"):
answer_output1 = gr.Textbox(label="✅ Answer", lines=10, interactive=True)
query_input1 = gr.Textbox(label="❓ Your Question pls", placeholder="e.g., What is Gem?")
query_btn1 = gr.Button("🧠 Get Answer", variant="primary")
query_btn1.click(
fn=summarize_with_flan_t5,
inputs=query_input1,
outputs=answer_output1 # answers with bullet, smart context and history
)
with gr.Tab("Upload PDF and create FAISS"):
gr.Markdown("## 🧠 For uploading new PDF documents.")
output_msg = gr.Textbox(label="📁 Authorization Message", interactive=False)
file_input = gr.File(label="📄 Upload .pdf File by only authorized user", type="filepath")
upload_btn = gr.Button("🔄 Process Doc")
authorized_user=gr.Textbox(label="Write the password to upload new Circular Doc.")
upload_btn.click(upload_and_prepare, inputs=[file_input, authorized_user], outputs=output_msg)
with gr.Tab("📊 Upcoming functionality-2"):
gr.Textbox(label="Coming soon")
with gr.Tab("📊 Upcoming functionality-3"):
gr.Textbox(label="Coming soon")
# Auto-trigger loading after app starts
demo.load(
load_resources,
outputs=[loading_panel, login_panel, tabs_panel]
)
login_btn.click(
login,
inputs=[user, pwd],
outputs=[loading_panel, login_panel, tabs_panel, status]
)
demo.launch()