# Professional / app.py
# Alamgirapi's picture
# Update app.py
# e5ec5b1 verified
import os
import sys
import tempfile
import shutil
from datetime import datetime
# CRITICAL: the cache locations must be exported BEFORE importing any
# HuggingFace libraries, so this has to stay above the imports below.
os.environ.update({
    'TRANSFORMERS_CACHE': '/tmp/transformers',
    'SENTENCE_TRANSFORMERS_HOME': '/tmp/sentence_transformers',
    'HF_HOME': '/tmp/huggingface',
    'HF_DATASETS_CACHE': '/tmp/datasets',
})

# Make sure every cache directory exists; abort startup with exit code 1
# as soon as one of them cannot be created because of missing permissions.
cache_dirs = [
    '/tmp/transformers',
    '/tmp/sentence_transformers',
    '/tmp/huggingface',
    '/tmp/datasets',
]
for cache_dir in cache_dirs:
    try:
        os.makedirs(cache_dir, exist_ok=True)
    except PermissionError as e:
        print(f"✗ Failed to create {cache_dir}: {e}")
        sys.exit(1)
    else:
        print(f"✓ Created cache directory: {cache_dir}")
# Now import other modules
from flask import Flask, request, jsonify, render_template, redirect, url_for, flash
import asyncio
from retriever.document_store import DocumentStore
from retriever.rag_pipeline import RAGPipeline
from models.model_loader import load_llm
from config import Config
import secrets

app = Flask(__name__)
app.config.from_object(Config)

# Resolve the session-signing key without ever falling back to a hard-coded,
# publicly known placeholder: prefer the SECRET_KEY environment variable, then
# whatever Config provided, and only then a random key. An empty value is
# treated the same as an unset one.
_secret_key = os.getenv('SECRET_KEY') or app.config.get('SECRET_KEY')
if not _secret_key:
    # A random key keeps cookies unforgeable, but it changes on every start,
    # so existing sessions (and pending flash messages) do not survive a
    # restart. Set SECRET_KEY explicitly for stable sessions.
    _secret_key = secrets.token_hex(32)
    print("WARNING: SECRET_KEY is not set; using a random key for this process only")
app.secret_key = _secret_key

# Initialize components. These run once at import time, in dependency order:
# the document store and the LLM are both handed to the RAG pipeline.
print("===== Application Startup at", datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "=====")
print("Initializing document store...")
print(f"Using vector DB path: {os.getenv('VECTOR_DB_PATH', 'data/vector_store')}")
document_store = DocumentStore()
print("Document store initialized")

print("Loading LLM...")
# Raises KeyError at startup if Config does not define LLAMA_API_KEY.
llm = load_llm(api_key=app.config["LLAMA_API_KEY"])
print("LLM loaded")

print("Initializing RAG pipeline...")
rag_pipeline = RAGPipeline(document_store, llm)
print("RAG pipeline initialized")
@app.route('/')
def index():
    """Render the landing page from the ``index.html`` template."""
    home_page = render_template('index.html')
    return home_page
@app.route('/add_data', methods=['GET', 'POST'])
def add_data():
    """Show the data-entry form and store submitted text.

    GET renders ``add_data.html``. POST reads ``content`` and ``title``
    (defaulting to ``'Untitled'``) from the form and adds them to the
    document store; on success the user is redirected to the index page,
    otherwise an error is flashed and the form is rendered again.
    """
    if request.method != 'POST':
        return render_template('add_data.html')

    content = request.form.get('content')
    title = request.form.get('title', 'Untitled')

    # Nothing to store without content: report it and show the form again.
    if not content:
        flash('Content is required', 'error')
        return render_template('add_data.html')

    try:
        document_store.add_text(content=content, title=title)
        flash('Data added successfully!', 'success')
        return redirect(url_for('index'))
    except Exception as e:
        flash(f'Error adding data: {str(e)}', 'error')

    return render_template('add_data.html')
@app.route('/upload_file', methods=['POST'])
def upload_file():
    """Upload a file (PDF, TXT, etc.) and add it to the document store.

    The upload is written to a temporary file that keeps the original
    extension, handed to ``document_store.add_document``, and then removed.
    The temporary file is deleted whether or not processing succeeds.

    Returns:
        A redirect to the ``add_data`` page in every case; the outcome is
        reported to the user through a flashed message.
    """
    if 'file' not in request.files:
        flash('No file selected', 'error')
        return redirect(url_for('add_data'))

    file = request.files['file']
    if file.filename == '':
        flash('No file selected', 'error')
        return redirect(url_for('add_data'))

    if file:
        tmp_path = None
        try:
            suffix = os.path.splitext(file.filename)[1]
            # Only reserve a unique path here; the handle is closed before the
            # upload is written so the file is never open twice at once.
            with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_file:
                tmp_path = tmp_file.name
            file.save(tmp_path)
            # Process the file and add to document store
            document_store.add_document(tmp_path)
            flash('File uploaded and processed successfully!', 'success')
        except Exception as e:
            flash(f'Error processing file: {str(e)}', 'error')
        finally:
            # delete=False means nobody else removes the file, so clean up on
            # both the success and the failure path to avoid leaking it.
            if tmp_path is not None:
                try:
                    os.unlink(tmp_path)
                except OSError as cleanup_error:
                    # Best-effort cleanup: a leftover temp file must not turn
                    # an otherwise finished request into an error.
                    print(f"Could not remove temporary file {tmp_path}: {cleanup_error}")

    return redirect(url_for('add_data'))
@app.route('/api/generate', methods=['POST'])
async def api_generate():
    """API endpoint to generate text based on stored data.

    Expects a JSON object body with:
        query: the text to generate from (required, non-empty).
        type: the generation type, defaulting to ``'bio'``
            (bio, cover_letter, general).

    Returns:
        ``{"response": ...}`` on success, or ``{"error": ...}`` with status
        400 for a missing/invalid body or empty query, and status 500 when
        the RAG pipeline raises.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so every client error is answered with a JSON 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Covers a missing body, invalid JSON, and valid JSON that is not an
        # object (e.g. null or a list), which has no .get().
        return jsonify({"error": "Request body must be a JSON object"}), 400

    query = data.get('query', '')
    gen_type = data.get('type', 'bio')  # bio, cover_letter, general

    if not query:
        return jsonify({"error": "Query is required"}), 400

    try:
        # Generate response using RAG pipeline
        response = await rag_pipeline.generate(query, gen_type)
        return jsonify({"response": response})
    except Exception as e:
        return jsonify({"error": f"Error generating response: {str(e)}"}), 500
@app.route('/generate', methods=['GET', 'POST'])
def generate():
    """Render the generation form and, on POST, the generated text.

    GET shows an empty ``generate.html``. POST reads ``query`` and ``type``
    (defaulting to ``'bio'``) from the form, runs the RAG pipeline, and
    renders the result; an empty query or a pipeline failure is flashed as
    an error instead.
    """
    if request.method != 'POST':
        return render_template('generate.html')

    query = request.form.get('query', '')
    gen_type = request.form.get('type', 'bio')

    if not query:
        flash('Query is required', 'error')
        return render_template('generate.html')

    try:
        # The pipeline is a coroutine; drive it to completion from this
        # synchronous view.
        pending = rag_pipeline.generate(query, gen_type)
        response = asyncio.run(pending)
        return render_template(
            'generate.html',
            query=query,
            response=response,
            gen_type=gen_type,
        )
    except Exception as e:
        flash(f'Error generating response: {str(e)}', 'error')
        return render_template('generate.html', query=query, error=str(e))
@app.route('/debug/documents', methods=['GET'])
def debug_documents():
    """Render a debug overview of the documents held by the document store.

    Passes the document count, the total chunk count, and a per-document
    summary (id, title, chunk count, preview of the first chunk) to
    ``debug.html``. Any failure is returned as a plain-text 500 response.
    """
    try:
        stored = document_store.documents
        doc_count = len(stored)

        # Total number of chunks across all documents.
        chunk_count = 0
        for entry in stored.values():
            chunk_count += len(entry.get('chunks', []))

        docs_summary = [
            {
                "id": doc_id,
                "title": doc.get("title", "Untitled"),
                "chunks": len(doc.get("chunks", [])),
                # First 100 characters of the first chunk, or empty when the
                # document has no chunks.
                "first_chunk_preview": (
                    doc.get("chunks", [""])[0][:100] + "..."
                    if doc.get("chunks")
                    else ""
                ),
            }
            for doc_id, doc in stored.items()
        ]

        return render_template(
            'debug.html',
            doc_count=doc_count,
            chunk_count=chunk_count,
            docs=docs_summary,
        )
    except Exception as e:
        return f"Error in debug endpoint: {str(e)}", 500
@app.route('/health')
def health_check():
    """Report that the application is up as a small JSON document."""
    payload = {
        "status": "healthy",
        "message": "RAG application is running",
    }
    return jsonify(payload)
if __name__ == '__main__':
    # The data directory must exist before the server starts.
    os.makedirs("data", exist_ok=True)

    # Hugging Face Spaces provides the port through PORT; default to 7860.
    port = int(os.getenv('PORT', 7860))
    print(f"Starting Flask app on port {port}")

    # Listen on all interfaces with the debugger disabled.
    app.run(host='0.0.0.0', port=port, debug=False)