Spaces:
Sleeping
Sleeping
| from flask import Flask, request, jsonify | |
| from flask import render_template | |
| from werkzeug.utils import secure_filename | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings | |
| import google.generativeai as genai | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from utils import add_file_to_chroma, remove_file_from_chroma, generate_query_response | |
| from db import init_db,add_file_to_db,remove_file_from_db,get_all_files | |
| from langchain_chroma import Chroma | |
| import os | |
| from dotenv import load_dotenv | |
| import logging | |
| from flask_cors import CORS | |
| logging.basicConfig(filename='log.txt', filemode='w', level=logging.DEBUG, | |
| format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger() | |
| load_dotenv() | |
| HF_TOKEN = os.getenv('HF_TOKEN') | |
| GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY') | |
| CHROMA_PATH = "chroma" | |
| UPLOAD_FOLDER = "uploads" | |
| DB_PATH = "files.db" | |
| PROMPT_TEMPLATE = """ | |
| Answer the given query based only on the context given below. | |
| context: | |
| {context} | |
| --- | |
| based on the context above answer the following query: | |
| {query} | |
| """ | |
| # make required folder for files to upload | |
| os.makedirs(UPLOAD_FOLDER, exist_ok=True) | |
| # Initialize Hugging Face embeddings Model | |
| hugging_face_ef = HuggingFaceInferenceAPIEmbeddings( | |
| api_key=HF_TOKEN, | |
| model_name="sentence-transformers/all-mpnet-base-v2" | |
| ) | |
| # initialize LLM | |
| genai.configure(api_key=GOOGLE_API_KEY) | |
| llm_model = genai.GenerativeModel("gemini-1.5-flash") | |
| app = Flask(__name__) | |
| CORS(app) | |
| # Initialize ChromaDB client | |
| db = Chroma(persist_directory=CHROMA_PATH, embedding_function=hugging_face_ef) | |
| # Initialize database | |
| init_db() | |
| def index(): | |
| return render_template('index.html') | |
| def get_files(): | |
| """Get list of all files.""" | |
| try: | |
| files = get_all_files() | |
| return jsonify({ | |
| 'status': 'success', | |
| 'files': files | |
| }), 200 | |
| except Exception as e: | |
| logger.error(f"Error fetching files: {str(e)}") | |
| return jsonify({ | |
| 'status': 'error', | |
| 'error': str(e) | |
| }), 500 | |
| def upload_file(): | |
| """Handle file uploads.""" | |
| if 'file' not in request.files: | |
| return jsonify({'error': 'No file in request', 'status': 'error'}), 400 | |
| file = request.files['file'] | |
| file_id = request.form.get('file_count') | |
| if not file or not file.filename: | |
| return jsonify({'error': 'No file selected', 'status': 'error'}), 400 | |
| filename = secure_filename(file.filename) | |
| file_type = filename.split('.')[-1].lower() | |
| file_path = os.path.join(UPLOAD_FOLDER, filename) | |
| file.save(file_path) | |
| try: | |
| # Add file chunks to ChromaDB | |
| add_file_to_chroma(file_path, file_id, hugging_face_ef, db, logger) | |
| # Add file to SQLite database | |
| add_file_to_db(file_id, filename, file_type) | |
| return jsonify({ | |
| 'message': 'File uploaded successfully', | |
| 'status': 'success', | |
| 'file_info': { | |
| 'file_id': file_id, | |
| 'file_name': filename, | |
| 'file_type': file_type | |
| } | |
| }), 200 | |
| except ValueError as e: | |
| return jsonify({ | |
| 'error': str(e), | |
| 'status': 'error' | |
| }), 400 | |
| except Exception as e: | |
| return jsonify({ | |
| 'error': str(e), | |
| 'status': 'error' | |
| }), 500 | |
| def remove_file(): | |
| file_id = request.form.get('file_id') | |
| try: | |
| # Remove from ChromaDB | |
| if remove_file_from_chroma(file_id, db): | |
| # Remove from SQLite database | |
| remove_file_from_db(file_id) | |
| return jsonify({ | |
| 'message': 'File deleted successfully', | |
| 'status': 'success' | |
| }), 200 | |
| else: | |
| return jsonify({ | |
| 'message': 'File Not Found', | |
| 'status': 'fail' | |
| }), 404 | |
| except Exception as e: | |
| logger.error(f"Error removing file: {str(e)}") | |
| return jsonify({ | |
| 'error': str(e), | |
| 'status': 'error' | |
| }), 500 | |
| def ask_query(): | |
| query = request.form.get("query") | |
| resp = generate_query_response(query, db, llm_model, PROMPT_TEMPLATE) | |
| return jsonify(resp), 200 | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=8000, debug=True, threaded=True) |