# Hugging Face Space "monlam-rag" — app.py (author: saber418; commit ae9aa9c "Update app.py", verified)
# -*- coding: utf-8 -*-
"""Monlam RAG - Tibetan Historical Knowledge System with Intelligent Question Classification."""
import asyncio
import os
from typing import Dict, Any, List, Tuple
import gradio as gr
from openai import OpenAI
import google.generativeai as genai
from dotenv import load_dotenv
from pymilvus import MilvusClient
load_dotenv()
class TibetanRAGWebUI:
    """Standalone Web UI for Tibetan RAG system with Monlam LLM and intelligent question routing.

    Pipeline: classify the question (keyword heuristic) -> embed it with
    Gemini -> retrieve context from a Zilliz/Milvus collection -> answer
    with the Monlam LLM using a chain-of-thought prompt whose output is
    split into <think>/<answer>/<sources> sections.
    """

    # Tibetan keywords that mark a question as historical/political and
    # therefore worth routing through RAG (who/what/why/how interrogatives,
    # plus history, politics, war, kings, lamas, treaties, etc.).
    HISTORICAL_KEYWORDS = [
        'ལོ་རྒྱུས', 'སྲིད་དོན', 'གནད་དོན', 'དམག་འཁྲུག', 'རྒྱལ་པོ',
        'བླ་མ', 'རང་བཙན', 'རང་སྐྱོང', 'བཙན་འཛུལ', 'ཆིངས་ཡིག',
        'སྲོང་བཙན', 'ཏཱ་ལའི', 'རྒྱ་ནག', 'གོར་', 'དབུ་མའི་ལམ',
        'སུ་ཡིན', 'ག་རེ་རེད', 'ཇི་ལྟར་བྱུང', 'ཅིའི་ཕྱིར'
    ]

    def __init__(self):
        """Read credentials from the environment and connect all backend services.

        Raises:
            ValueError: if any required environment variable is missing
                (MONLAM_BASE_URL, MONLAM_API_KEY, GEMINI_API_KEY,
                ZILLIZ_URI, ZILLIZ_TOKEN).
        """
        # Initialize Monlam LLM (OpenAI-compatible API)
        monlam_base_url = os.getenv('MONLAM_BASE_URL')
        monlam_api_key = os.getenv('MONLAM_API_KEY')
        if not monlam_base_url:
            raise ValueError("MONLAM_BASE_URL not found in environment variables")
        if not monlam_api_key:
            raise ValueError("MONLAM_API_KEY not found in environment variables")
        self.llm_client = OpenAI(
            base_url=monlam_base_url,
            api_key=monlam_api_key
        )
        self.model_name = "MonlamAI/merged_slice4-it-3"
        # Initialize Gemini for embeddings only (generation goes through Monlam)
        gemini_api_key = os.getenv('GEMINI_API_KEY')
        if not gemini_api_key:
            raise ValueError("GEMINI_API_KEY not found in environment variables")
        genai.configure(api_key=gemini_api_key)
        # Initialize Zilliz/Milvus client for vector search
        zilliz_uri = os.getenv('ZILLIZ_URI')
        zilliz_token = os.getenv('ZILLIZ_TOKEN')
        if not zilliz_uri or not zilliz_token:
            raise ValueError("ZILLIZ_URI and ZILLIZ_TOKEN must be set in environment variables")
        self.client = MilvusClient(
            uri=zilliz_uri,
            token=zilliz_token
        )
        self.collection_name = os.getenv('COLLECTION_NAME', 'melong')
        print("✅ Monlam RAG System initialized with intelligent question classification")

    async def generate_embedding(self, text: str) -> list:
        """Generate a retrieval-query embedding for *text* using Gemini.

        Runs the blocking SDK call in a worker thread so the event loop
        is not stalled.
        """
        result = await asyncio.to_thread(
            genai.embed_content,
            model="models/gemini-embedding-001",
            content=text,
            task_type="retrieval_query"
        )
        return result['embedding']

    async def search_documents(self, query_embedding: list, limit: int = 10) -> list:
        """Search the Milvus collection for documents similar to the embedding.

        Returns a list of dicts with 'content', 'metadata' and 'similarity'
        keys; returns an empty list on any search error (logged to stdout).
        """
        try:
            results = self.client.search(
                collection_name=self.collection_name,
                data=[query_embedding],
                limit=limit,
                output_fields=["content", "metadata"]
            )
            # Flatten the first (and only) query's hits into plain dicts.
            formatted_results = []
            if results and len(results) > 0:
                for hit in results[0]:
                    entity = hit.get('entity', {})  # hoisted: avoid double lookup per hit
                    formatted_results.append({
                        'content': entity.get('content', ''),
                        'metadata': entity.get('metadata', {}),
                        'similarity': hit.get('distance', 0.0)
                    })
            return formatted_results
        except Exception as e:
            print(f"❌ Search error: {e}")
            return []

    async def classify_question(self, query: str) -> bool:
        """Check if question is historical/political and needs RAG.

        Simple keyword-based classification for reliability. Currently
        always returns True: keyword hits are logged, but questions with
        no keyword still default to RAG (better to over-use than under-use).
        """
        for keyword in self.HISTORICAL_KEYWORDS:
            if keyword in query:
                print(f"🔍 Question classification: Historical (keyword: {keyword}) → Using RAG")
                return True
        # No keywords found — still default to RAG for Tibetan questions.
        print("🔍 Question classification: No clear keywords, defaulting to RAG")
        return True

    def build_prompt(self, query: str, docs: List[str]) -> str:
        """Build the chain-of-thought prompt for the Monlam LLM.

        Each document is numbered ("Document [N]:") so that the model can
        cite it by index in its answer.
        """
        formatted_docs = []
        for i, doc in enumerate(docs):
            formatted_docs.append(f"Document [{i+1}]:\n{doc}")
        ctx = "\n\n".join(formatted_docs)
        return f"""You are an expert Tibetan historian and scholar. Answer the user's question using the provided Tibetan texts, writing in a natural, flowing narrative style.
# CHAIN OF THOUGHT REASONING PROCESS
Follow this structured thinking process before providing your final answer:
## Step 1: Question Analysis
- Identify the question type (factual/explanatory/comparative/analytical)
- Determine what specific information is being requested
- Note if the question asks for a brief answer or detailed explanation
## Step 2: Document Review
- Scan each provided document for relevant information
- Identify which documents contain pertinent facts, names, dates, or explanations
- Note the document numbers that are most relevant
## Step 3: Information Synthesis
- Extract key facts from relevant documents
- Organize information logically
- Identify any gaps or missing information
- Cross-reference information across multiple documents if applicable
## Step 4: Response Construction
- Determine appropriate response length based on question type
- Structure the answer with proper flow and coherence
- Prepare citations for each claim
---
*User's Question:* "{query}"
*Relevant Texts:*
---
{ctx}
---
---
# RESPONSE FRAMEWORK
Now provide your answer following this structure:
<think> (Internal reasoning - keep brief)
- Question type: [factual/explanatory/etc.]
- Relevant documents: [list document numbers]
- Key information found: [brief summary]
- Response approach: [short/comprehensive]
</think>
<answer> (Final answer in Tibetan - write as an expert historian would narrate)
[Provide your answer in Tibetan, following these guidelines:]
1. Narrative Style:
- Write in a natural, flowing manner like an expert Tibetan historian telling a story
- Use transitional phrases: དེ་ཡང་། (moreover), འོན་ཀྱང་། (however), དེར་བརྟེན། (therefore), མཐའ་མར། (finally)
- Build context before presenting facts
- Connect ideas smoothly rather than listing points
- Be circumspective and nuanced in analysis
- Use phrases like: "ཡིག་ཆ་ནང་གསལ་བ་ལྟར།" (as shown in the documents), "དེ་ལྟར་ན།" (in that case)
2. Answer Depth (IMPORTANT - Provide comprehensive answers):
- Factual questions: Provide context before the direct answer, then elaborate with details
- Explanatory questions: Develop a comprehensive narrative with historical background
- Start broad with context, then narrow to specifics, provide examples
- End with synthesis or implications when appropriate
- Aim for detailed, thorough responses (minimum 300-500 words for complex topics)
- Use multiple paragraphs to develop ideas fully
3. Citation Style:
- When citing sources, include the book/document title from metadata if available
- Format: དཔེ་དེབ་[book title] [number] or ཡིག་ཆ་[document title] [number]
- If no title available, use: ཡིག་ཆ་ [1], [2], [3]
- Example with title: "གནད་འགག་ནི་དཔེ་དེབ་བོད་དང་བོད་མིའི་མགྲིན་ཚབ་ [9] ནང་གསལ་བའི་..."
- Example without title: "བོད་དང་རྒྱའི་གནད་དོན་གྱི་སྙིང་པོ་ནི་ཡིག་ཆ་ [1] ནང་..."
- Group related sources: [1, 2] when multiple sources support same point
- Cite after claims, but don't let citations interrupt the narrative flow
4. Content Constraints:
- Use ONLY information from the provided documents
- Do NOT add external knowledge or assumptions
- If information is insufficient: "ཡིག་ཆ་འདི་དག་ནང་འདི་སྐོར་གྱི་ཆ་ཚང་བའི་གནས་ཚུལ་མི་འདུག"
5. Language Quality:
- Use sophisticated, scholarly Tibetan
- Maintain objectivity while being engaging
- Vary sentence structure for natural flow
- Use appropriate honorifics and formal language
</answer>
<sources> (Source summary in Tibetan)
[Write in Tibetan: Briefly list which documents were used and what information each provided. Format: ཡིག་ཆ་ [1]: [information provided], ཡིག་ཆ་ [2]: [information provided], etc.]
</sources>
IMPORTANT: The <sources> section MUST be written entirely in Tibetan language, just like the <answer> section.
"""

    @staticmethod
    def _extract_section(text: str, tag: str) -> str:
        """Return the stripped content between <tag> and </tag>, or '' if either is absent."""
        open_tag, close_tag = f"<{tag}>", f"</{tag}>"
        if open_tag in text and close_tag in text:
            start = text.find(open_tag) + len(open_tag)
            return text[start:text.find(close_tag)].strip()
        return ""

    async def generate_response(self, prompt: str, context: list) -> Dict[str, str]:
        """Generate a chain-of-thought response with the Monlam LLM.

        Returns a dict with 'think', 'answer', 'sources' (parsed from the
        model's tagged output) and 'full_response' (raw text). On LLM
        failure the error message becomes the response text.
        """
        full_prompt = self.build_prompt(prompt, context)
        try:
            response = await asyncio.to_thread(
                self.llm_client.chat.completions.create,
                model=self.model_name,
                messages=[
                    {
                        "role": "system",
                        "content": "You are an expert Tibetan historian and scholar. Always respond in Tibetan with proper citations in a natural, flowing narrative style. Provide comprehensive, detailed answers that fully explore the topic with historical context and analysis."
                    },
                    {
                        "role": "user",
                        "content": full_prompt
                    }
                ],
                temperature=0.7,
                max_tokens=8000
            )
            response_text = response.choices[0].message.content
        except Exception as e:
            print(f"❌ LLM error: {e}")
            response_text = f"Error generating response: {str(e)}"
        # Extract the tagged sections; if the model omitted <answer> tags,
        # fall back to the whole response as the answer.
        thinking = self._extract_section(response_text, "think")
        sources = self._extract_section(response_text, "sources")
        if "<answer>" in response_text and "</answer>" in response_text:
            answer = self._extract_section(response_text, "answer")
        else:
            answer = response_text
        return {
            'think': thinking,
            'answer': answer,
            'sources': sources,
            'full_response': response_text
        }

    @staticmethod
    def _format_metadata(metadata: Dict[str, Any]) -> List[str]:
        """Render the known metadata fields as 'Label: value' strings, in fixed order."""
        parts = []
        for key, label in (('author', 'Author'), ('book_title', 'Book'),
                           ('chapter', 'Chapter'), ('topic', 'Topic')):
            if metadata.get(key):
                parts.append(f"{label}: {metadata[key]}")
        return parts

    async def process_query(
        self,
        query: str,
        num_docs: int = 10,
        show_thinking: bool = True,
        show_sources: bool = True
    ) -> Tuple[str, str, str, str]:
        """Process a query end-to-end and return display-ready strings.

        Returns (answer, thinking, sources_summary, retrieved_docs); the
        optional sections are blanked when their show_* flag is False.
        Errors are caught and returned as the answer string.
        """
        if not query.strip():
            return "⚠️ Please enter a question", "", "", ""
        try:
            # Step 1: classify — non-historical questions skip RAG entirely
            needs_rag = await self.classify_question(query)
            if not needs_rag:
                simple_response = await asyncio.to_thread(
                    self.llm_client.chat.completions.create,
                    model=self.model_name,
                    messages=[
                        {
                            "role": "system",
                            "content": "You are a helpful Tibetan language assistant. Answer questions naturally in Tibetan."
                        },
                        {
                            "role": "user",
                            "content": query
                        }
                    ],
                    temperature=0.7,
                    max_tokens=8000
                )
                answer = simple_response.choices[0].message.content
                return answer, "", "དྲི་བ་འདི་ལོ་རྒྱུས་དང་འབྲེལ་བ་མེད་པས། ཡིག་ཆ་བཤེར་མེད།", ""
            # Step 2: embed the query, then Step 3: retrieve documents
            query_embedding = await self.generate_embedding(query)
            docs = await self.search_documents(query_embedding, num_docs)
            if not docs:
                return "⚠️ No relevant documents found", "", "", ""
            # Build the LLM context and the human-readable source listing.
            context = []
            sources_info = []
            for i, doc in enumerate(docs):
                content = doc.get('content', '')
                metadata = doc.get('metadata', {})
                similarity = doc.get('similarity', 0.0)
                meta_info = self._format_metadata(metadata)
                if meta_info:
                    context_with_meta = f"[{', '.join(meta_info)}]\n{content}"
                else:
                    context_with_meta = content
                context.append(context_with_meta)
                # Display card: rank, similarity score, metadata, 300-char preview.
                source_display = f"**[{i+1}]** Similarity: {similarity:.3f}\n"
                if meta_info:
                    source_display += f"*{', '.join(meta_info)}*\n"
                source_display += f"{content[:300]}{'...' if len(content) > 300 else ''}\n"
                sources_info.append(source_display)
            # Step 4: generate the answer with the Monlam LLM.
            response_data = await self.generate_response(query, context)
            thinking_output = response_data.get('think', '') if show_thinking else ""
            answer_output = response_data.get('answer', response_data.get('full_response', ''))
            sources_summary = response_data.get('sources', '') if show_sources else ""
            retrieved_docs = "\n\n---\n\n".join(sources_info)
            return answer_output, thinking_output, sources_summary, retrieved_docs
        except Exception as e:
            error_msg = f"❌ Error: {str(e)}"
            import traceback
            print(traceback.format_exc())
            return error_msg, "", "", ""

    def query_sync(
        self,
        query: str,
        num_docs: int,
        show_thinking: bool,
        show_sources: bool
    ) -> Tuple[str, str, str, str]:
        """Synchronous wrapper for Gradio (runs the async pipeline to completion)."""
        return asyncio.run(self.process_query(query, num_docs, show_thinking, show_sources))
# Boot the RAG backend once at import time; the UI falls back to an
# error page when construction fails (rag_system stays None).
print("🏔️ Initializing Monlam RAG System...")
rag_system = None
try:
    rag_system = TibetanRAGWebUI()
    print("✅ System ready!")
except Exception as err:
    print(f"❌ Initialization error: {err}")
    import traceback
    print(traceback.format_exc())
# Create Gradio interface
def create_interface():
    """Create the Gradio web interface.

    Returns a gr.Blocks app. If the module-level ``rag_system`` failed to
    initialize, returns a minimal error page listing the required
    environment variables instead.
    """
    if rag_system is None:
        # Show error interface if initialization failed
        with gr.Blocks(title="Monlam RAG - Error") as demo:
            gr.Markdown("# ❌ System Initialization Error")
            gr.Markdown("Please check that all environment variables are set correctly:")
            gr.Markdown("- `GEMINI_API_KEY` (for embeddings)\n- `ZILLIZ_URI`\n- `ZILLIZ_TOKEN`\n- `MONLAM_API_KEY`\n- `MONLAM_BASE_URL`\n- `COLLECTION_NAME` (optional, default: melong)")
        return demo
    with gr.Blocks(
        title="Monlam RAG - Tibetan Historical Knowledge System",
        theme=gr.themes.Soft()
    ) as demo:
        gr.Markdown(
            """
# 🏔️ Tibetan RAG System with Monlam LLM
Ask questions about Tibetan texts and get answers with transparent reasoning and source citations.
**Features:**
- Chain of Thought reasoning
- Source citations
- Semantic search across Tibetan documents
- Powered by **MonlamAI/merged_slice4-it-3**
"""
        )
        with gr.Row():
            with gr.Column(scale=2):
                query_input = gr.Textbox(
                    label="དྲི་བ། / Question",
                    placeholder="Enter your question in Tibetan...",
                    lines=3
                )
                with gr.Row():
                    num_docs_slider = gr.Slider(
                        minimum=1,
                        maximum=20,
                        value=10,
                        step=1,
                        label="Number of documents to retrieve"
                    )
                with gr.Row():
                    show_thinking_check = gr.Checkbox(
                        label="Show Chain of Thought reasoning",
                        value=True
                    )
                    show_sources_check = gr.Checkbox(
                        label="Show source summary",
                        value=True
                    )
                with gr.Row():
                    submit_btn = gr.Button("🔍 Search & Answer", variant="primary", size="lg")
                    clear_btn = gr.Button("🗑️ Clear", size="lg")
            with gr.Column(scale=1):
                gr.Markdown(
                    """
### 💡 Tips
**Question Types:**
- Factual: སུ་ཡིན། (who), གང་ཡིན། (what)
- Explanatory: ཅིའི་ཕྱིར། (why), ཇི་ལྟར། (how)
**Examples:**
- བོད་ཀྱི་རྒྱལ་པོ་སྲོང་བཙན་སྒམ་པོ་ནི་སུ་ཡིན།
- གོར་བོད་དམག་འཁྲུག་ཇི་ལྟར་བྱུང་།
**Powered by:**
- 🤖 Monlam LLM
- 🔍 Gemini Embeddings
- 📚 Zilliz Vector DB
"""
                )
        # Output sections
        # Collapsible thinking section (above tabs)
        with gr.Accordion("🧠 Chain of Thought Reasoning", open=False):
            thinking_output = gr.Textbox(
                label="",
                lines=8,
                show_label=False,
                show_copy_button=True
            )
        # Main output tabs
        with gr.Tabs():
            with gr.Tab("🎯 Answer"):
                answer_output = gr.Textbox(
                    label="ལན། / Answer",
                    lines=10,
                    show_copy_button=True
                )
            with gr.Tab("📑 Source Summary"):
                sources_output = gr.Textbox(
                    label="ཁུངས་བསྡུས་དོན། / Sources",
                    lines=8,
                    show_copy_button=True
                )
            with gr.Tab("📚 Retrieved Documents"):
                docs_output = gr.Textbox(
                    label="Retrieved Documents",
                    lines=15,
                    show_copy_button=True
                )
        # Event handlers
        submit_btn.click(
            fn=rag_system.query_sync,
            inputs=[query_input, num_docs_slider, show_thinking_check, show_sources_check],
            outputs=[answer_output, thinking_output, sources_output, docs_output]
        )
        # BUG FIX: the reset tuple previously had 7 values for 8 output
        # components (docs_output's "" was missing), which made the Clear
        # button raise a value-count error in Gradio.
        clear_btn.click(
            fn=lambda: ("", "", "", "", "", 10, True, True),
            inputs=[],
            outputs=[query_input, answer_output, thinking_output, sources_output, docs_output, num_docs_slider, show_thinking_check, show_sources_check]
        )
        # Example queries
        gr.Examples(
            examples=[
                ["བོད་དང་རྒྱའི་གནད་དོན་ག་རེ་རེད།"],
                ["སྲོང་བཙན་སྒམ་པོ་ནི་སུ་ཡིན།"],
                ["དབུ་མའི་ལམ་གྱི་སྲིད་བྱུས་ག་རེ་རེད།"],
            ],
            inputs=query_input,
            label="དཔེར་བརྗོད། / Examples"
        )
        gr.Markdown(
            """
---
**Note:** This system uses intelligent question classification to route historical/political questions through RAG with expert narrative responses, while answering general questions directly.
"""
        )
    return demo
if __name__ == "__main__":
# Create and launch the interface
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True
)