# NOTE: non-code page artifacts ("Spaces" / "Sleeping" status text captured during
# extraction from the hosting page) removed; the Python module begins below.
| from flask import Flask, request, send_file, abort, jsonify, url_for, render_template, Response | |
| from flask_cors import CORS | |
| import pandas as pd | |
| from sentence_transformers import SentenceTransformer, util | |
| import torch | |
| from dataclasses import dataclass | |
| from typing import List, Dict, Tuple, Optional, Any, Iterator | |
| from collections import deque | |
| import os | |
| import logging | |
| import atexit | |
| from threading import Thread, Lock | |
| import time | |
| from datetime import datetime | |
| from uuid import uuid4 as generate_uuid | |
| import csv as csv_lib | |
| import functools | |
| import json | |
| import re | |
| import subprocess | |
| import sys | |
| import sqlite3 | |
| import io | |
| from langdetect import detect, LangDetectException | |
| from dotenv import load_dotenv | |
| # Load environment variables from .env file AT THE VERY TOP | |
| load_dotenv() | |
| # Import RAG system and Fallback LLM from llm_handling AFTER load_dotenv | |
| # MODIFIED: Imported new functions and prompts | |
| from llm_handling import ( | |
| initialize_and_get_rag_system, | |
| KnowledgeRAG, | |
| groq_bot_instance, | |
| RAG_SOURCES_DIR, | |
| RAG_STORAGE_PARENT_DIR, | |
| RAG_CHUNKED_SOURCES_FILENAME, | |
| get_answer_from_context | |
| ) | |
| from system_prompts import PROMPTS | |
# Setup logging (remains global for the app): INFO level, mirrored to a file and stderr.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("app_hybrid_rag.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)  # Main app logger

# --- Application Constants and Configuration ---
# Basic-auth credentials for admin endpoints. Defaults are insecure placeholders;
# override FLASK_ADMIN_USERNAME / FLASK_ADMIN_PASSWORD in .env for any real deployment.
ADMIN_USERNAME = os.getenv('FLASK_ADMIN_USERNAME', 'admin')
ADMIN_PASSWORD = os.getenv('FLASK_ADMIN_PASSWORD', 'admin')
FLASK_APP_HOST = os.getenv("FLASK_HOST", "0.0.0.0")
FLASK_APP_PORT = int(os.getenv("FLASK_PORT", "7860"))
FLASK_DEBUG_MODE = os.getenv("FLASK_DEBUG", "True").lower() == "true"
# Directory containing this file; anchors all relative data paths below.
_APP_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
TEXT_EXTRACTIONS_DIR = os.path.join(_APP_BASE_DIR, 'text_extractions')
# Maximum number of runner-up CSV matches surfaced as "related questions".
RELATED_QUESTIONS_TO_SHOW = 10
# How many of the top QA pairs are passed to Groq as supporting context.
QUESTIONS_TO_SEND_TO_GROQ_QA = 3
# Single configurable confidence gate (0-100) deciding when a personal/general CSV
# match is routed to the LLM formatter instead of falling through to RAG/Groq.
LLM_FORMATTER_CONFIDENCE_THRESHOLD = int(os.getenv("LLM_FORMATTER_CONFIDENCE_THRESHOLD", "95"))
HIGH_CONFIDENCE_THRESHOLD = 90  # For greetings, which are answered directly without LLM formatting.
# Defines how many *pairs* of (user, assistant) messages to send as conversational context.
CHAT_HISTORY_TO_SEND = int(os.getenv("CHAT_HISTORY_TO_SEND", "5"))
CHAT_LOG_FILE = os.path.join(_APP_BASE_DIR, 'chat_history.csv')
# Global handle to the FAISS RAG system; populated at startup elsewhere in the app.
rag_system: Optional[KnowledgeRAG] = None
| # --- Persistent Chat History Management using SQLite --- | |
class ChatHistoryManager:
    """Persists per-session chat transcripts in a SQLite table.

    Each row maps a session_id to its full history, serialized as a JSON list of
    {'role': ..., 'content': ...} dicts. Writes are serialized with a process-wide
    lock; reads go straight to SQLite.

    Note: sqlite3 connections used as ``with conn:`` only scope the *transaction*
    (commit/rollback) — they are NOT closed by the context manager. Every method
    therefore closes its connection explicitly so connections do not accumulate
    until garbage collection.
    """

    def __init__(self, db_path):
        self.db_path = db_path
        self.lock = Lock()
        self._logger = logging.getLogger(__name__ + ".ChatHistoryManager")
        self._create_table()
        self._logger.info(f"SQLite chat history manager initialized at: {self.db_path}")

    def _get_connection(self):
        """Open a fresh connection to the history database (10 s busy timeout)."""
        return sqlite3.connect(self.db_path, timeout=10)

    def _create_table(self):
        """Create the chat_histories table on first use (idempotent)."""
        with self.lock:
            conn = self._get_connection()
            try:
                with conn:  # transaction scope: commit on success, rollback on error
                    conn.execute("""
                        CREATE TABLE IF NOT EXISTS chat_histories (
                            session_id TEXT PRIMARY KEY,
                            history TEXT NOT NULL
                        )
                    """)
            finally:
                conn.close()  # FIX: 'with conn' does not close the connection

    def get_history(self, session_id: str, limit_turns: int = 5) -> list:
        """Return the last *limit_turns* (user, assistant) pairs for *session_id*.

        Returns [] for unknown sessions and on any error (errors are logged).
        """
        try:
            conn = self._get_connection()
            try:
                cursor = conn.execute(
                    "SELECT history FROM chat_histories WHERE session_id = ?", (session_id,))
                row = cursor.fetchone()
            finally:
                conn.close()
            if not row:
                return []
            # Two messages per turn, so keep the trailing 2 * limit_turns entries.
            return json.loads(row[0])[-(limit_turns * 2):]
        except Exception as e:
            self._logger.error(f"Error fetching history for session {session_id}: {e}", exc_info=True)
            return []

    def update_history(self, session_id: str, query: str, answer: str):
        """Append one (user, assistant) exchange to the stored history for *session_id*."""
        with self.lock:
            try:
                conn = self._get_connection()
                try:
                    with conn:
                        cursor = conn.execute(
                            "SELECT history FROM chat_histories WHERE session_id = ?", (session_id,))
                        row = cursor.fetchone()
                        history = json.loads(row[0]) if row else []
                        history.append({'role': 'user', 'content': query})
                        history.append({'role': 'assistant', 'content': answer})
                        conn.execute(
                            "INSERT OR REPLACE INTO chat_histories (session_id, history) VALUES (?, ?)",
                            (session_id, json.dumps(history)),
                        )
                finally:
                    conn.close()
            except Exception as e:
                self._logger.error(f"Error updating history for session {session_id}: {e}", exc_info=True)

    def clear_history(self, session_id: str):
        """Reset the stored history for *session_id* to an empty list."""
        with self.lock:
            try:
                conn = self._get_connection()
                try:
                    with conn:
                        conn.execute(
                            "INSERT OR REPLACE INTO chat_histories (session_id, history) VALUES (?, ?)",
                            (session_id, json.dumps([])),
                        )
                finally:
                    conn.close()
                self._logger.info(f"Chat history cleared for session: {session_id}")
            except Exception as e:
                self._logger.error(f"Error clearing history for session {session_id}: {e}", exc_info=True)
| # --- EmbeddingManager for CSV QA (remains in app.py) --- | |
@dataclass  # FIX: decorator was missing; callers construct QAEmbeddings(questions=..., ...)
class QAEmbeddings:
    """Bundles one QA corpus (a DataFrame) with its precomputed sentence embeddings."""
    questions: List[str]                  # cleaned question texts that were embedded
    question_map: List[int]               # embedding row -> original DataFrame row index
    embeddings: Optional[torch.Tensor]    # sentence embeddings; None when the corpus is empty
    df_qa: pd.DataFrame                   # the source QA DataFrame
    original_questions: List[str]         # original question texts, parallel to `questions`
class EmbeddingManager:
    """Embeds the three CSV QA corpora (general / personal / greetings) with a
    SentenceTransformer model and serves cosine-similarity lookups over them."""

    def __init__(self, model_name='all-MiniLM-L6-v2'):
        # Model used for both corpus and query encoding.
        self.model = SentenceTransformer(model_name)
        # One QAEmbeddings bundle per corpus; populated by update_embeddings().
        self.embeddings = {
            'general': None,
            'personal': None,
            'greetings': None
        }
        logger.info(f"EmbeddingManager initialized with model: {model_name}")

    def _process_questions(self, df: pd.DataFrame) -> Tuple[List[str], List[int], List[str]]:
        """Extract usable question strings from *df*.

        Returns parallel lists: (cleaned question texts, their row indices in *df*,
        the same texts again for display). Rows whose 'Question' cell is NaN, empty,
        or the literal string "nan" are skipped.
        """
        questions = []
        question_map = []
        original_questions = []
        if 'Question' not in df.columns:
            logger.warning(f"DataFrame for EmbeddingManager is missing 'Question' column. Cannot process questions from it.")
            return questions, question_map, original_questions
        for idx, question_text_raw in enumerate(df['Question']):
            if pd.isna(question_text_raw):
                continue
            question_text_cleaned = str(question_text_raw).strip()
            if not question_text_cleaned or question_text_cleaned.lower() == "nan":
                continue
            questions.append(question_text_cleaned)
            question_map.append(idx)
            original_questions.append(question_text_cleaned)
        return questions, question_map, original_questions

    def update_embeddings(self, general_qa: pd.DataFrame, personal_qa: pd.DataFrame, greetings_qa: pd.DataFrame):
        """Re-encode all three QA corpora and replace the cached QAEmbeddings bundles."""
        gen_questions, gen_question_map, gen_original_questions = self._process_questions(general_qa)
        # encode() yields a tensor; None marks an empty corpus for find_best_answers().
        gen_embeddings = self.model.encode(gen_questions, convert_to_tensor=True, show_progress_bar=False) if gen_questions else None
        pers_questions, pers_question_map, pers_original_questions = self._process_questions(personal_qa)
        pers_embeddings = self.model.encode(pers_questions, convert_to_tensor=True, show_progress_bar=False) if pers_questions else None
        greet_questions, greet_question_map, greet_original_questions = self._process_questions(greetings_qa)
        greet_embeddings = self.model.encode(greet_questions, convert_to_tensor=True, show_progress_bar=False) if greet_questions else None
        self.embeddings['general'] = QAEmbeddings(
            questions=gen_questions, question_map=gen_question_map, embeddings=gen_embeddings,
            df_qa=general_qa, original_questions=gen_original_questions
        )
        self.embeddings['personal'] = QAEmbeddings(
            questions=pers_questions, question_map=pers_question_map, embeddings=pers_embeddings,
            df_qa=personal_qa, original_questions=pers_original_questions
        )
        self.embeddings['greetings'] = QAEmbeddings(
            questions=greet_questions, question_map=greet_question_map, embeddings=greet_embeddings,
            df_qa=greetings_qa, original_questions=greet_original_questions
        )
        logger.info("CSV QA embeddings updated in EmbeddingManager.")

    def find_best_answers(self, user_query: str, qa_type: str, top_n: int = 5) -> Tuple[List[float], List[str], List[str], List[str], List[int]]:
        """Return the *top_n* best matches for *user_query* in corpus *qa_type*.

        Returns parallel lists: (confidences as 0-100 percentages, matched question
        texts, answers, image names or None, original DataFrame row indices).
        All five lists are empty when the corpus has no embeddings.
        """
        qa_data = self.embeddings[qa_type]
        if qa_data is None or qa_data.embeddings is None or len(qa_data.embeddings) == 0:
            return [], [], [], [], []
        query_embedding_tensor = self.model.encode([user_query], convert_to_tensor=True, show_progress_bar=False)
        if not isinstance(qa_data.embeddings, torch.Tensor):
            qa_data.embeddings = torch.tensor(qa_data.embeddings)  # Safeguard
        cos_scores = util.cos_sim(query_embedding_tensor, qa_data.embeddings)[0]
        top_k = min(top_n, len(cos_scores))
        if top_k == 0:
            return [], [], [], [], []
        top_scores_tensor, indices_tensor = torch.topk(cos_scores, k=top_k)
        # Cosine similarity scaled to a 0-100 "confidence" percentage.
        top_confidences = [score.item() * 100 for score in top_scores_tensor]
        top_indices_mapped = []
        top_questions = []
        for idx_tensor in indices_tensor:
            item_idx = idx_tensor.item()
            # Guard against stale embeddings that no longer line up with the DataFrame.
            if item_idx < len(qa_data.question_map) and item_idx < len(qa_data.original_questions):
                original_df_idx = qa_data.question_map[item_idx]
                if original_df_idx < len(qa_data.df_qa):
                    top_indices_mapped.append(original_df_idx)
                    top_questions.append(qa_data.original_questions[item_idx])
                else:
                    logger.warning(f"Index out of bounds: original_df_idx {original_df_idx} for df_qa length {len(qa_data.df_qa)}")
            else:
                logger.warning(f"Index out of bounds: item_idx {item_idx} for question_map/original_questions")
        # NOTE(review): trimming keeps the FIRST valid_count confidences — if a middle
        # candidate was dropped by the bounds checks, confidences can misalign with
        # questions/answers. Verify against live data.
        valid_count = len(top_indices_mapped)
        top_confidences = top_confidences[:valid_count]
        top_questions = top_questions[:valid_count]
        # 'Respuesta' (Spanish, from the xlsx loading path) is preferred; fall back to 'Answer'.
        answer_col = 'Respuesta' if 'Respuesta' in qa_data.df_qa.columns else 'Answer'
        top_answers = [str(qa_data.df_qa[answer_col].iloc[i]) for i in top_indices_mapped]
        top_images = [str(qa_data.df_qa['Image'].iloc[i]) if 'Image' in qa_data.df_qa.columns and pd.notna(qa_data.df_qa['Image'].iloc[i]) else None for i in top_indices_mapped]
        return top_confidences, top_questions, top_answers, top_images, top_indices_mapped
| # --- DatabaseMonitor for personal_qa.csv placeholders (remains in app.py) --- | |
class DatabaseMonitor:
    """Loads a personal-data CSV and hot-reloads it when the file changes on disk.

    A daemon thread polls the file's mtime/size every second and reloads on change.
    All mutations of the cached DataFrame now happen under ``self.lock`` (the
    original code cleared ``self.df`` from the monitor thread and from error paths
    without holding the lock while ``get_data`` read under it).
    """

    def __init__(self, database_path):
        self.logger = logging.getLogger(__name__ + ".DatabaseMonitor")
        self.database_path = database_path
        self.last_modified = None   # mtime at the last successful load
        self.last_size = None       # file size at the last successful load
        self.df = None              # cached DataFrame; None when missing/unreadable
        self.lock = Lock()
        self.running = True
        self._load_database()
        self.monitor_thread = Thread(target=self._monitor_database, daemon=True)
        self.monitor_thread.start()
        self.logger.info(f"DatabaseMonitor initialized for: {database_path}")

    def _load_database(self):
        """(Re)load the CSV into self.df; clears the cache if the file is absent or unreadable."""
        try:
            if not os.path.exists(self.database_path):
                self.logger.warning(f"Personal data file not found: {self.database_path}.")
                with self.lock:  # FIX: clear shared state under the lock
                    self.df = None
                return
            with self.lock:
                # cp1252 matches how the personal data file is produced upstream.
                self.df = pd.read_csv(self.database_path, encoding='cp1252')
                self.last_modified = os.path.getmtime(self.database_path)
                self.last_size = os.path.getsize(self.database_path)
            self.logger.info(f"Personal data file reloaded: {self.database_path}")
        except Exception as e:
            self.logger.error(f"Error loading personal data file '{self.database_path}': {e}", exc_info=True)
            with self.lock:  # FIX: error path previously wrote self.df without the lock
                self.df = None

    def _monitor_database(self):
        """Poll loop (daemon thread): reload on mtime/size change; 1 s cadence, 5 s backoff."""
        while self.running:
            try:
                if not os.path.exists(self.database_path):
                    if self.df is not None:
                        self.logger.warning(f"Personal data file disappeared: {self.database_path}")
                        with self.lock:  # FIX: mutate shared state under the lock
                            self.df = None
                            self.last_modified = None
                            self.last_size = None
                    time.sleep(5)
                    continue
                current_modified = os.path.getmtime(self.database_path)
                current_size = os.path.getsize(self.database_path)
                if (self.last_modified is None or current_modified != self.last_modified or
                        self.last_size is None or current_size != self.last_size):
                    self.logger.info("Personal data file change detected.")
                    self._load_database()
                time.sleep(1)
            except Exception as e:
                self.logger.error(f"Error monitoring personal data file: {e}", exc_info=True)
                time.sleep(5)

    def get_data(self, user_id):
        """Return the row whose 'id' equals *user_id* as a dict, or None when absent.

        String user_ids are coerced to the CSV's concrete numeric id type when needed.
        """
        with self.lock:
            if self.df is None or not user_id:
                return None
            try:
                if 'id' not in self.df.columns:
                    self.logger.warning("'id' column not found in personal_data.csv")
                    return None
                id_col_type = self.df['id'].dtype
                target_user_id = user_id
                if pd.api.types.is_numeric_dtype(id_col_type):
                    try:
                        valid_ids = self.df['id'].dropna()
                        if not valid_ids.empty:
                            # Coerce to the same concrete type as stored ids (e.g. numpy.int64).
                            target_user_id = type(valid_ids.iloc[0])(user_id)
                        else:
                            target_user_id = int(user_id)
                    except (ValueError, TypeError):
                        self.logger.warning(f"Could not convert user_id '{user_id}' to numeric type {id_col_type}")
                        return None
                user_data = self.df[self.df['id'] == target_user_id]
                if not user_data.empty:
                    return user_data.iloc[0].to_dict()
            except Exception as e:
                self.logger.error(f"Error retrieving data for user_id {user_id}: {e}", exc_info=True)
            return None

    def stop(self):
        """Signal the monitor thread to exit and wait briefly for it to finish."""
        self.running = False
        if hasattr(self, 'monitor_thread') and self.monitor_thread.is_alive():
            self.monitor_thread.join(timeout=5)
        self.logger.info("DatabaseMonitor stopped.")
# --- Flask App Initialization ---
app = Flask(__name__)
# Wide-open CORS: any origin may call any route, with credentials allowed.
# NOTE(review): combined with the default admin credentials above, this is
# production-unsafe — confirm the intended deployment surface.
CORS(app, resources={r"/*": {"origins": "*"}}, supports_credentials=True)

# --- Initialize Managers ---
embedding_manager = EmbeddingManager()
history_manager = ChatHistoryManager('chat_history.db')
# Hot-reloading monitor for the per-user data CSV used to fill {placeholder} answers.
database_csv_path = os.path.join(RAG_SOURCES_DIR, 'database.csv')
personal_data_monitor = DatabaseMonitor(database_csv_path)
| # --- Helper Functions (App specific) --- | |
def clean_html_from_text(text: str) -> str:
    """Strip HTML tags from *text* and trim whitespace; non-strings pass through unchanged."""
    if not isinstance(text, str):
        return text
    return re.sub(r'<[^>]+>', '', text).strip()
# Single-pass translation table: Windows-1252 control bytes and Unicode punctuation
# mapped to plain ASCII equivalents. FIX: the smart double quotes (U+201C/U+201D)
# had been corrupted into duplicate identity mappings ('"': '"') by an encoding
# loss, so curly double quotes were never normalized; restored here.
_TEXT_NORMALIZATION_TABLE = str.maketrans({
    '\x91': "'", '\x92': "'", '\x93': '"', '\x94': '"',
    '\x96': '-', '\x97': '-', '\x85': '...', '\x95': '-',
    '\u201c': '"', '\u201d': '"', '\u2018': "'", '\u2019': "'",
    '\u2013': '-', '\u2014': '-', '\u2026': '...', '\u2022': '-',
})

def normalize_text(text):
    """Normalize cp1252 artifacts and typographic punctuation in *text* to ASCII.

    Non-string inputs are returned unchanged.
    """
    if isinstance(text, str):
        # str.translate applies the whole table in one C-level pass
        # (equivalent to the old chain of .replace() calls, since every key
        # is a single character).
        return text.translate(_TEXT_NORMALIZATION_TABLE)
    return text
def require_admin_auth(f):
    """Decorator enforcing HTTP Basic auth against the configured admin credentials.

    Returns a 401 with a WWW-Authenticate challenge when credentials are missing
    or wrong; otherwise calls the wrapped view unchanged.
    """
    @functools.wraps(f)  # FIX: without wraps, every wrapped view is named 'decorated',
    def decorated(*args, **kwargs):  # which collides when Flask registers multiple admin routes
        auth = request.authorization
        # NOTE(review): plain string comparison; consider hmac.compare_digest for
        # timing-safe credential checks.
        if not auth or auth.username != ADMIN_USERNAME or auth.password != ADMIN_PASSWORD:
            return Response('Admin auth failed.', 401, {'WWW-Authenticate': 'Basic realm="Admin Login Required"'})
        return f(*args, **kwargs)
    return decorated
def initialize_chat_log():
    """Create the chat-log CSV with its header row if it does not exist yet."""
    if os.path.exists(CHAT_LOG_FILE):
        return
    with open(CHAT_LOG_FILE, 'w', newline='', encoding='utf-8') as log_file:
        header = ['sl', 'date_time', 'session_id', 'user_id', 'query', 'answer']
        csv_lib.writer(log_file).writerow(header)
def _next_log_serial() -> int:
    """Compute the next 'sl' serial for the chat-log CSV; 1 when the log is empty or unreadable."""
    try:
        if os.path.exists(CHAT_LOG_FILE) and os.path.getsize(CHAT_LOG_FILE) > 0:
            df_log = pd.read_csv(CHAT_LOG_FILE, on_bad_lines='skip')
            if 'sl' in df_log.columns:
                serials = df_log['sl'].dropna()
                if not serials.empty and pd.api.types.is_numeric_dtype(serials):
                    return int(serials.max()) + 1
    except Exception as e:
        logger.error(f"Error reading SL from {CHAT_LOG_FILE}: {e}", exc_info=True)
    return 1

def store_chat_history(sid: str, uid: Optional[str], query: str, resp: Dict[str, Any]):
    """Persist one exchange: update the SQLite session history and append to the CSV log.

    *resp* is the full response dict; only its 'answer' key is stored. All errors
    are logged and swallowed so persistence never breaks request handling.
    """
    try:
        answer = str(resp.get('answer', ''))
        history_manager.update_history(sid, query, answer)
        initialize_chat_log()
        next_sl = _next_log_serial()
        with open(CHAT_LOG_FILE, 'a', newline='', encoding='utf-8') as f:
            csv_lib.writer(f).writerow([
                next_sl, datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                sid, uid or "N/A", query, answer,
            ])
    except Exception as e:
        logger.error(f"Error in store_chat_history for session {sid}: {e}", exc_info=True)
def get_formatted_chat_history(session_id: str) -> List[Dict[str, str]]:
    """Return the recent (user, assistant) messages for *session_id*, or [] without one."""
    if session_id:
        return history_manager.get_history(session_id, limit_turns=CHAT_HISTORY_TO_SEND)
    return []
def get_qa_context_for_groq(all_questions: List[Dict]) -> str:
    """Build a "Q: ...\\nA: ..." context string from the best non-greeting QA candidates.

    Candidates are ranked by 'confidence', capped at QUESTIONS_TO_SEND_TO_GROQ_QA,
    and a pair is included only when its answer is a non-empty string that does not
    contain "not available".
    """
    non_greetings = [q for q in all_questions if q.get('source_type') != 'greetings']
    ranked = sorted(non_greetings, key=lambda c: c.get('confidence', 0), reverse=True)
    valid_qa_pairs = []
    for qa in ranked[:QUESTIONS_TO_SEND_TO_GROQ_QA]:
        answer = qa.get('answer')
        # FIX: isinstance check first — calling pd.isna() on a list-like answer
        # returns an array whose truth value is ambiguous. For str/None/NaN inputs
        # this is behaviorally identical to the old check.
        if isinstance(answer, str) and answer.strip() and "not available" not in answer.lower():
            valid_qa_pairs.append(f"Q: {qa.get('question')}\nA: {answer}")
    return '\n'.join(valid_qa_pairs)
def replace_placeholders_in_answer(answer, db_data):
    """Fill ``{placeholder}`` tokens in *answer* with values from the user's *db_data* dict.

    Returns an apology string when the answer is empty, when placeholders exist but
    no user data is available, or when every placeholder value is missing; otherwise
    returns the answer with placeholders substituted (missing values become the
    literal text "not available").
    """
    if pd.isna(answer) or str(answer).strip() == '':
        return "Sorry, this information is not available yet"
    answer_str = str(answer)
    placeholders = re.findall(r'\{(\w+)\}', answer_str)
    if not placeholders: return answer_str
    if db_data is None:
        return "To get this specific information, please ensure you are logged in or have provided your user ID."
    missing_count = 0; replacements_made = 0
    # Substitute each distinct placeholder; track how many resolved vs. missing.
    for placeholder in set(placeholders):
        key = placeholder.strip()
        value = db_data.get(key)
        if value is None or (isinstance(value, float) and pd.isna(value)) or str(value).strip() == '':
            answer_str = answer_str.replace(f'{{{key}}}', "not available")
            missing_count += 1
        else:
            answer_str = answer_str.replace(f'{{{key}}}', str(value))
            replacements_made +=1
    # NOTE: missing_count/replacements_made count DISTINCT placeholders while
    # len(placeholders) counts occurrences — with repeated placeholders these
    # comparisons can disagree. TODO confirm intended semantics.
    if missing_count == len(placeholders) and len(placeholders) > 0 :
        return "Sorry, some specific details for you are not available at the moment."
    if "not available" in answer_str.lower() and replacements_made < len(placeholders):
        if answer_str == "not available" and len(placeholders) == 1:
            return "Sorry, this information is not available yet."
        # Defensive branch: the loop above replaces every found placeholder, so
        # unresolved tokens here should be rare/impossible in practice.
        if re.search(r'\{(\w+)\}', answer_str):
            logger.warning(f"Unresolved placeholders remain after replacement attempt: {answer_str}")
            answer_str = re.sub(r'\{(\w+)\}', "a specific detail", answer_str)
            if "a specific detail" in answer_str and not "Sorry" in answer_str:
                return "Sorry, I couldn't retrieve all the specific details for this answer. " + answer_str
            return "Sorry, I couldn't retrieve all the specific details for this answer. Some information has been generalized."
    return answer_str
| # --- Non-Streaming Logic (Preserved from original) --- | |
def get_hybrid_response_logic_non_streaming(user_query: str, session_id: str, user_id: Optional[str], chat_history: Optional[List[Dict]] = None) -> Dict[str, Any]:
    """Answer *user_query* through the hybrid pipeline (non-streaming).

    Precedence:
      1. High-confidence CSV QA match — greetings are answered verbatim; personal/
         general matches are reformatted by the LLM from the full matched row.
      2. FAISS document RAG, when available and it returns a substantive answer.
      3. Groq general LLM fallback.
    Every successful path logs the exchange via store_chat_history() before returning.
    """
    global rag_system
    if not user_query: return {'error': 'No query provided'}
    if not session_id: return {'error': 'session_id is required'}
    # Language detection; only 'es' and 'en' are supported, defaulting to Spanish.
    try:
        lang = detect(user_query)
        if lang not in ["es", "en"]:
            lang = "es"
    except LangDetectException:
        lang = "es"
    personal_db_data = personal_data_monitor.get_data(user_id) if user_id else None
    # Search all three CSV corpora; each call also returns original DataFrame indices.
    conf_greet, q_greet, a_greet, img_greet, idx_greet = embedding_manager.find_best_answers(user_query, 'greetings', top_n=1)
    conf_pers, q_pers, a_pers, img_pers, idx_pers = embedding_manager.find_best_answers(user_query, 'personal', top_n=RELATED_QUESTIONS_TO_SHOW)
    conf_gen, q_gen, a_gen, img_gen, idx_gen = embedding_manager.find_best_answers(user_query, 'general', top_n=RELATED_QUESTIONS_TO_SHOW)
    all_csv_candidate_answers = []
    if conf_greet and conf_greet[0] >= HIGH_CONFIDENCE_THRESHOLD:
        all_csv_candidate_answers.append({'question': q_greet[0], 'answer': a_greet[0], 'image': img_greet[0] if img_greet else None, 'confidence': conf_greet[0], 'source_type': 'greetings', 'original_index': idx_greet[0]})
    if conf_pers:
        # Personal answers may contain {placeholders}; drop candidates whose
        # placeholder resolution failed.
        for c, q, a, img, idx in zip(conf_pers, q_pers, a_pers, img_pers, idx_pers):
            processed_a = replace_placeholders_in_answer(a, personal_db_data)
            if not ("Sorry, this information is not available yet" in processed_a or "To get this specific information" in processed_a):
                all_csv_candidate_answers.append({'question': q, 'answer': processed_a, 'image': img, 'confidence': c, 'source_type': 'personal', 'original_index': idx})
    if conf_gen:
        for c, q, a, img, idx in zip(conf_gen, q_gen, a_gen, img_gen, idx_gen):
            if not (pd.isna(a) or str(a).strip() == '' or str(a).lower() == 'nan'):
                all_csv_candidate_answers.append({'question': q, 'answer': str(a), 'image': img, 'confidence': c, 'source_type': 'general', 'original_index': idx})
    all_csv_candidate_answers.sort(key=lambda x: x['confidence'], reverse=True)
    related_questions_list = []
    if all_csv_candidate_answers:
        best_csv_match = all_csv_candidate_answers[0]
        is_direct_csv_answer = False
        source_name = ""
        best_source_type = best_csv_match['source_type']
        best_confidence = best_csv_match['confidence']
        # Greetings use their own (lower) threshold; personal/general must clear the
        # configurable LLM-formatter threshold.
        if best_source_type == 'greetings' and best_confidence >= HIGH_CONFIDENCE_THRESHOLD:
            is_direct_csv_answer = True
            source_name = 'greetings_qa'
        elif best_source_type in ['personal', 'general'] and best_confidence >= LLM_FORMATTER_CONFIDENCE_THRESHOLD:
            is_direct_csv_answer = True
            source_name = f"{best_source_type}_qa"
        if is_direct_csv_answer:
            best_match_source = best_csv_match['source_type']
            # Greetings are answered verbatim, without LLM formatting.
            if best_match_source == 'greetings':
                response_data = {'query': user_query, 'answer': best_csv_match['answer'], 'confidence': best_csv_match['confidence'], 'original_question': best_csv_match['question'], 'source': source_name}
                if best_csv_match.get('image'):
                    response_data['image_url'] = url_for('static', filename=best_csv_match['image'], _external=True)
            else:
                # 'personal'/'general': feed the whole matched row to the LLM so it
                # can phrase a conversational answer.
                best_match_index = best_csv_match['original_index']
                original_df = embedding_manager.embeddings[best_match_source].df_qa
                matched_row_data = original_df.iloc[best_match_index]
                # Drop 'Question' (duplicate of 'Pregunta') before building the context.
                context_dict = matched_row_data.drop('Question', errors='ignore').to_dict()
                context_str = "\n".join([f"'{key}': '{value}'" for key, value in context_dict.items() if pd.notna(value) and str(value).strip() != ''])
                # Formatter prompt for the detected language (Spanish fallback).
                selected_qa_prompt = PROMPTS.get("qa_formatter_prompt", {}).get(lang, PROMPTS["qa_formatter_prompt"]["es"])
                final_answer = get_answer_from_context(
                    question=user_query,
                    context=context_str,
                    system_prompt=selected_qa_prompt
                )
                response_data = {
                    'query': user_query,
                    'answer': final_answer,
                    'confidence': best_csv_match['confidence'],
                    'original_question': best_csv_match['question'],
                    'source': f'{source_name}_llm_formatted'
                }
                if best_csv_match.get('image'):
                    response_data['image_url'] = url_for('static', filename=best_csv_match['image'], _external=True)
            # Attach runner-up, non-greeting candidates as related questions.
            for i, cand_q in enumerate(all_csv_candidate_answers):
                if i == 0: continue
                if cand_q['source_type'] != 'greetings':
                    related_questions_list.append({'question': cand_q['question'], 'answer': cand_q['answer'], 'match': cand_q['confidence']})
                    if len(related_questions_list) >= RELATED_QUESTIONS_TO_SHOW: break
            response_data['related_questions'] = related_questions_list
            store_chat_history(session_id, user_id, user_query, response_data)
            return response_data
    # 2) FAISS document RAG fallback.
    if rag_system and rag_system.retriever:
        logger.info(f"Attempting FAISS RAG query for: {user_query[:50]}...")
        rag_result = rag_system.invoke(query=user_query, lang=lang)  # invoke() is the non-streaming entry point
        rag_answer = rag_result.get("answer")
        # The canned "no information" reply counts as a miss — keep falling through.
        if rag_answer and "the provided bibliography does not contain specific information" not in rag_answer.lower():
            logger.info(f"FAISS RAG system provided a valid answer: {rag_answer[:100]}...")
            response_data = {
                'query': user_query, 'answer': rag_answer, 'confidence': 85,
                'source': 'document_rag_faiss', 'related_questions': [],
                'document_sources_details': rag_result.get("cited_source_details")
            }
            store_chat_history(session_id, user_id, user_query, response_data)
            return response_data
    # 3) Groq general LLM fallback.
    logger.info(f"No high-confidence answer. Using Groq fallback.")
    chat_history_messages_for_groq = chat_history if chat_history is not None else get_formatted_chat_history(session_id)
    groq_context = {'current_query': user_query, 'chat_history': chat_history_messages_for_groq, 'qa_related_info': ""}
    groq_stream = groq_bot_instance.stream_response(groq_context, lang=lang)
    # Drain the stream into one string for the non-streaming response shape.
    groq_answer = "".join([chunk for chunk in groq_stream])
    response_data = {'query': user_query, 'answer': groq_answer, 'confidence': 75, 'source': 'groq_general_fallback', 'related_questions': []}
    store_chat_history(session_id, user_id, user_query, response_data)
    return response_data
| # --- Streaming Logic --- | |
def generate_streaming_response(user_query: str, session_id: str, user_id: Optional[str], chat_history: Optional[List[Dict]] = None) -> Iterator[str]:
    """
    Handles the logic for generating a response and yields chunks of the response as a stream.

    Mirrors get_hybrid_response_logic_non_streaming(): CSV QA first (top-1 per
    corpus), then FAISS RAG streaming, then the Groq fallback stream. History
    persistence is done by the SSE wrapper (stream_formatter), not here.
    """
    global rag_system
    # Language detection; only 'es' and 'en' are supported, defaulting to Spanish.
    try:
        lang = detect(user_query)
        if lang not in ["es", "en"]:
            lang = "es"
    except LangDetectException:
        lang = "es"
    personal_db_data = personal_data_monitor.get_data(user_id) if user_id else None
    conf_greet, _, a_greet, _, idx_greet = embedding_manager.find_best_answers(user_query, 'greetings', top_n=1)
    conf_pers, _, a_pers, _, idx_pers = embedding_manager.find_best_answers(user_query, 'personal', top_n=1)
    conf_gen, _, a_gen, _, idx_gen = embedding_manager.find_best_answers(user_query, 'general', top_n=1)
    candidates = []
    # Greetings have their own threshold for a direct, non-LLM answer.
    if conf_greet and conf_greet[0] >= HIGH_CONFIDENCE_THRESHOLD:
        candidates.append({'answer': a_greet[0], 'confidence': conf_greet[0], 'source': 'greetings', 'index': idx_greet[0]})
    # Personal and general QA must clear the stricter LLM-formatter threshold.
    if conf_pers and conf_pers[0] >= LLM_FORMATTER_CONFIDENCE_THRESHOLD:
        processed_a = replace_placeholders_in_answer(a_pers[0], personal_db_data)
        # Only add the candidate if placeholder replacement succeeded.
        if not ("Sorry, this information is not available yet" in processed_a or "To get this specific information" in processed_a):
            candidates.append({'answer': processed_a, 'confidence': conf_pers[0], 'source': 'personal', 'index': idx_pers[0]})
    if conf_gen and conf_gen[0] >= LLM_FORMATTER_CONFIDENCE_THRESHOLD:
        # Filter out empty/invalid answers.
        if not (pd.isna(a_gen[0]) or str(a_gen[0]).strip() == '' or str(a_gen[0]).lower() == 'nan'):
            candidates.append({'answer': a_gen[0], 'confidence': conf_gen[0], 'source': 'general', 'index': idx_gen[0]})
    if candidates:
        best_candidate = max(candidates, key=lambda x: x['confidence'])
        best_source_type = best_candidate['source']
        logger.info(f"High-confidence match from CSV source: {best_source_type}")
        # Greetings are yielded verbatim.
        if best_source_type == 'greetings':
            yield best_candidate['answer']
            return
        # 'personal'/'general': format the whole matched row with the LLM.
        original_df = embedding_manager.embeddings[best_source_type].df_qa
        matched_row_data = original_df.iloc[best_candidate['index']]
        context_dict = matched_row_data.drop('Question', errors='ignore').to_dict()
        context_str = "\n".join([f"'{key}': '{value}'" for key, value in context_dict.items() if pd.notna(value) and str(value).strip() != ''])
        # Multilingual formatter prompt keyed by the detected language (Spanish fallback).
        selected_qa_prompt = PROMPTS.get("qa_formatter_prompt", {}).get(lang, PROMPTS["qa_formatter_prompt"]["es"])
        final_answer = get_answer_from_context(
            question=user_query,
            context=context_str,
            system_prompt=selected_qa_prompt
        )
        # NOTE(review): the formatted answer is yielded as one chunk, not streamed.
        yield final_answer
        return
    if rag_system and rag_system.retriever:
        logger.info(f"Attempting to stream from FAISS RAG for: {user_query[:50]}...")
        rag_stream = rag_system.stream(query=user_query, lang=lang)
        # Peek at the first chunk to reject the canned "no information" reply.
        first_chunk = next(rag_stream, None)
        if first_chunk and "the provided bibliography does not contain specific information" not in first_chunk.lower():
            logger.info("FAISS RAG streaming valid answer...")
            yield first_chunk
            yield from rag_stream
            return
    logger.info(f"No high-confidence CSV or RAG answer. Streaming from Groq fallback.")
    chat_history_messages_for_groq = chat_history if chat_history is not None else get_formatted_chat_history(session_id)
    groq_context = {'current_query': user_query, 'chat_history': chat_history_messages_for_groq, 'qa_related_info': ""}
    yield from groq_bot_instance.stream_response(groq_context, lang=lang)
def stream_formatter(logic_generator: Iterator[str], session_id: str, user_id: Optional[str], query: str) -> Iterator[str]:
    """
    Wraps raw text chunks into the Server-Sent Events (SSE) format and logs the full response at the end.

    Emits OpenAI-style 'chat.completion.chunk' events, a final stop chunk and the
    '[DONE]' sentinel, then persists the full transcript for *session_id*.
    (*user_id* is currently unused but kept for interface symmetry with callers.)
    """
    chunk_id = f"chatcmpl-{str(generate_uuid())}"
    model_name = "MedicalAssisstantBot/v1"
    full_response_chunks = []
    for chunk in logic_generator:
        if not chunk: continue
        full_response_chunks.append(chunk)
        response_json = {
            "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": model_name,
            "choices": [{"index": 0, "delta": {"content": chunk}, "finish_reason": None}]
        }
        yield f"data: {json.dumps(response_json)}\n\n"
    final_json = {
        "id": chunk_id, "object": "chat.completion.chunk", "created": int(time.time()), "model": model_name,
        "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
    }
    yield f"data: {json.dumps(final_json)}\n\n"
    yield "data: [DONE]\n\n"
    # After streaming completes, persist the whole conversation turn.
    full_response = "".join(full_response_chunks)
    # FIX: use the configured logger (lazy %-args) instead of leftover debug print()
    # calls, so streamed responses land in the log file rather than raw stdout.
    logger.debug("Streamed full response for session %s: %s", session_id, full_response)
    history_manager.update_history(session_id, query, full_response)
| # --- Original Chat Endpoint (Preserved) --- | |
def get_answer_hybrid():
    """Original (non-OpenAI) chat endpoint: validate the payload and return a JSON answer."""
    payload = request.json
    query_text = clean_html_from_text(payload.get('query', ''))
    uid = payload.get('user_id')
    sid = payload.get('session_id')
    if not query_text or not sid:
        return jsonify({'error': 'query and session_id are required'}), 400
    return jsonify(get_hybrid_response_logic_non_streaming(query_text, sid, uid, None))
| # --- OpenAI Compatible Endpoints (Added) --- | |
def list_models():
    """Expose the single available model in OpenAI-compatible list format."""
    return jsonify({
        "object": "list",
        "data": [{"id": "MedicalAssisstantBot/v1", "object": "model", "created": int(time.time()), "owned_by": "user"}],
    })
def openai_compatible_chat_endpoint():
    """OpenAI-compatible chat endpoint supporting both streaming (SSE) and non-streaming replies."""
    body = request.json
    wants_stream = body.get("stream", False)
    messages = body.get("messages", [])
    if not messages:
        return jsonify({"error": "No messages provided"}), 400
    # Last message is the live query; everything before it is prior history.
    query_text = clean_html_from_text(messages[-1].get("content", ""))
    prior_messages = messages[:-1]
    sid = body.get("conversation_id", f"webui-session-{str(generate_uuid())}")
    uid = None
    if wants_stream:
        raw_stream = generate_streaming_response(query_text, sid, uid, prior_messages)
        return Response(stream_formatter(raw_stream, sid, uid, query_text), mimetype='text/event-stream')
    result = get_hybrid_response_logic_non_streaming(query_text, sid, uid, prior_messages)
    answer_text = result.get("answer", "Sorry, an error occurred.")
    history_manager.update_history(sid, query_text, answer_text)
    return jsonify({
        "id": f"chatcmpl-{str(generate_uuid())}", "object": "chat.completion", "created": int(time.time()),
        "model": "MedicalAssisstantBot/v1",
        "choices": [{"index": 0, "message": {"role": "assistant", "content": answer_text}, "finish_reason": "stop"}],
        "usage": {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
    })
| # --- Admin and Utility Routes (Unchanged) --- | |
def index_route():
    """Serve the main chat UI template, or a 404 message when the template file is absent."""
    page = 'chat-bot.html'
    template_path = os.path.join(app.root_path, 'templates', page)
    if os.path.exists(template_path):
        return render_template(page)
    logger.warning(f"Template '{page}' not found. Serving basic message.")
    return "Chatbot interface not found. Please ensure 'templates/chat-bot.html' exists.", 404
def get_faiss_rag_status():
    """Report the FAISS RAG system's initialization state and key parameters as JSON."""
    global rag_system
    if not rag_system:
        return jsonify({"error": "FAISS RAG system not initialized."}), 500
    try:
        retriever = rag_system.retriever
        store = rag_system.vector_store
        payload = {
            "status": "Initialized" if retriever else "Initialized (Retriever not ready)",
            "index_storage_dir": rag_system.index_storage_dir,
            "embedding_model": rag_system.embedding_model_name,
            "groq_model": rag_system.groq_model_name,
            "retriever_k": retriever.k if retriever else "N/A",
            "processed_source_files": rag_system.processed_source_files,
            "index_type": "FAISS",
            "index_loaded_or_built": store is not None,
        }
        # Vector count is best-effort: the index attribute may be missing or unreadable.
        if store and getattr(store, 'index', None):
            try:
                payload["num_vectors_in_index"] = store.index.ntotal
            except Exception:
                payload["num_vectors_in_index"] = "N/A (Could not get count)"
        else:
            payload["num_vectors_in_index"] = "N/A (Vector store or index not available)"
        return jsonify(payload)
    except Exception as e:
        logger.error(f"Error getting FAISS RAG status: {e}", exc_info=True)
        return jsonify({"error": str(e)}), 500
| # NEW FUNCTION: Endpoint to download the combined QA databases as an Excel file | |
def download_qa_database():
    """Build an in-memory Excel workbook from the loaded QA dataframes and send it as a download.

    One sheet per QA category (General_QA, Personal_QA, Greetings); categories
    whose dataframe is missing or not loaded are skipped. Returns a JSON 500
    error if workbook generation fails.
    """
    try:
        output = io.BytesIO()
        # Map each embedding-manager category to its Excel sheet name.
        sheets = [('general', 'General_QA'), ('personal', 'Personal_QA'), ('greetings', 'Greetings')]
        with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
            # Safely access the dataframes from the embedding manager.
            # BUGFIX: the greetings branch previously indexed embeddings['g greetings']
            # (typo'd key), which raised KeyError whenever greetings data was present.
            for category, sheet_name in sheets:
                entry = embedding_manager.embeddings.get(category)
                if entry and entry.df_qa is not None:
                    entry.df_qa.to_excel(writer, sheet_name=sheet_name, index=False)
        output.seek(0)
        return send_file(
            output,
            mimetype='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            as_attachment=True,
            download_name=f'qa_database_{datetime.now().strftime("%Y%m%d")}.xlsx'
        )
    except Exception as e:
        logger.error(f"Error generating QA database file: {e}", exc_info=True)
        return jsonify({'error': 'Failed to generate QA database file'}), 500
def rebuild_faiss_index_route():
    """Admin endpoint: re-chunk source documents, then rebuild the FAISS index.

    Step 1 runs chunker.py as a subprocess to regenerate the chunked-sources JSON;
    step 2 rebuilds the FAISS index and swaps the new RAG system instance into the
    module global. Returns a JSON status payload on success, or a 500 JSON error
    describing which step failed.
    """
    global rag_system
    logger.info("Admin request to rebuild FAISS RAG index received. Starting two-step process.")

    # --- Step 1: pre-process source documents into chunks ---
    logger.info("Step 1: Running chunker.py to pre-process source documents.")
    chunker_script_path = os.path.join(_APP_BASE_DIR, 'chunker.py')
    chunked_json_output_path = os.path.join(RAG_STORAGE_PARENT_DIR, RAG_CHUNKED_SOURCES_FILENAME)
    os.makedirs(TEXT_EXTRACTIONS_DIR, exist_ok=True)
    if not os.path.exists(chunker_script_path):
        logger.error(f"Chunker script not found at '{chunker_script_path}'. Aborting rebuild.")
        return jsonify({"error": f"chunker.py not found. Cannot proceed with rebuild."}), 500
    command = [
        sys.executable,
        chunker_script_path,
        '--sources-dir', RAG_SOURCES_DIR,
        '--output-file', chunked_json_output_path,
        '--text-output-dir', TEXT_EXTRACTIONS_DIR
    ]
    try:
        # check=True raises CalledProcessError on any non-zero exit code.
        process = subprocess.run(command, capture_output=True, text=True, check=True)
        logger.info("Chunker script executed successfully.")
        logger.info(f"Chunker stdout:\n{process.stdout}")
    except subprocess.CalledProcessError as e:
        logger.error(f"Chunker script failed with exit code {e.returncode}.")
        logger.error(f"Chunker stderr:\n{e.stderr}")
        return jsonify({"error": "Step 1 (Chunking) failed.", "details": e.stderr}), 500
    except Exception as e:
        logger.error(f"An unexpected error occurred while running the chunker script: {e}", exc_info=True)
        return jsonify({"error": f"An unexpected error occurred during the chunking step: {str(e)}"}), 500

    # --- Step 2: rebuild the FAISS index from the newly generated chunks ---
    logger.info("Step 2: Rebuilding FAISS index from the newly generated chunks.")
    try:
        new_rag_system_instance = initialize_and_get_rag_system(force_rebuild=True)
        if new_rag_system_instance and new_rag_system_instance.vector_store:
            rag_system = new_rag_system_instance
            logger.info("FAISS RAG index rebuild completed and new RAG system instance is active.")
            updated_status_response = get_faiss_rag_status()
            # BUGFIX: on failure get_faiss_rag_status() returns a (response, status_code)
            # tuple; unwrap it before calling get_json() to avoid an AttributeError.
            if isinstance(updated_status_response, tuple):
                updated_status_response = updated_status_response[0]
            return jsonify({"message": "FAISS RAG index rebuild completed.", "status": updated_status_response.get_json()}), 200
        else:
            logger.error("FAISS RAG index rebuild failed during the indexing phase.")
            return jsonify({"error": "Step 2 (Indexing) failed. Check logs."}), 500
    except Exception as e:
        logger.error(f"Error during admin FAISS index rebuild (indexing phase): {e}", exc_info=True)
        return jsonify({"error": f"Failed to rebuild index during indexing phase: {str(e)}"}), 500
def get_personal_db_status():
    """Report the personal-data CSV monitor state: file presence, load state, last modification time."""
    try:
        db_path = personal_data_monitor.database_path
        file_exists = os.path.exists(db_path)
        status_info = {
            'personal_data_csv_monitor_status': 'running',
            'file_exists': file_exists,
            'data_loaded': personal_data_monitor.df is not None,
            'last_update': None
        }
        if file_exists:
            # BUGFIX/cleanup: getmtime() returns a float (or raises) — the previous
            # `is not None` check was always true and called getmtime twice.
            status_info['last_update'] = datetime.fromtimestamp(os.path.getmtime(db_path)).isoformat()
        return jsonify(status_info)
    except Exception as e:
        return jsonify({'status': 'error', 'error': str(e)}), 500
def download_report():
    """Send the chat-history CSV log as a timestamped download, or 404 when it is missing/empty."""
    try:
        log_unavailable = not os.path.exists(CHAT_LOG_FILE) or os.path.getsize(CHAT_LOG_FILE) == 0
        if log_unavailable:
            return jsonify({'error': 'No chat history available.'}), 404
        report_name = f'chat_history_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
        return send_file(CHAT_LOG_FILE, mimetype='text/csv', as_attachment=True, download_name=report_name)
    except Exception as e:
        logger.error(f"Error downloading report: {e}", exc_info=True)
        return jsonify({'error': 'Failed to generate report'}), 500
def create_session_route():
    """Mint a fresh UUID session identifier and return it to the client."""
    try:
        new_session = str(generate_uuid())
        logger.info(f"New session created: {new_session}")
        return jsonify({'status': 'success', 'session_id': new_session}), 200
    except Exception as e:
        logger.error(f"Session creation error: {e}", exc_info=True)
        return jsonify({'status': 'error', 'message': str(e)}), 500
def get_version_route():
    """Return the current application version string."""
    version_payload = {'version': '3.9.8-Env-Chat-History'}  # Updated version
    return jsonify(version_payload), 200
def clear_session_history_route():
    """Erase the stored chat history for the session id supplied in the JSON body."""
    payload = request.json
    sid = payload.get('session_id')
    if not sid:
        return jsonify({'status': 'error', 'message': 'session_id is required'}), 400
    history_manager.clear_history(sid)
    return jsonify({'status': 'success', 'message': f'History cleared for session {sid}'})
| # --- App Cleanup and Startup --- | |
def cleanup_application():
    """Stop background monitors at interpreter shutdown (registered via atexit below)."""
    if personal_data_monitor:
        personal_data_monitor.stop()
    logger.info("Application cleanup finished.")
atexit.register(cleanup_application)
def _load_qa_csv(path: str) -> pd.DataFrame:
    """Read one optional QA CSV (cp1252-encoded); return an empty template frame if absent or unreadable."""
    df = pd.DataFrame(columns=['Question', 'Answer', 'Image'])
    if not os.path.exists(path):
        logger.warning(f"Optional file '{os.path.basename(path)}' not found in '{RAG_SOURCES_DIR}'.")
        return df
    try:
        df = pd.read_csv(path, encoding='cp1252')
        print(f"- Loaded: {os.path.basename(path)}")
    except Exception as e_csv:
        logger.error(f"Error reading {os.path.basename(path)}: {e_csv}")
    return df


def _merge_xlsx_sources(general_qa_df: pd.DataFrame) -> pd.DataFrame:
    """Append QA rows from any .xlsx files in RAG_SOURCES_DIR onto the general QA frame."""
    logger.info(f"Scanning for additional QA sources (.xlsx) in '{RAG_SOURCES_DIR}'...")
    if not os.path.isdir(RAG_SOURCES_DIR):
        logger.warning(f"Sources directory '{RAG_SOURCES_DIR}' not found. Cannot scan for additional QA files.")
        return general_qa_df
    xlsx_files_found = [f for f in os.listdir(RAG_SOURCES_DIR)
                        if f.endswith('.xlsx') and os.path.isfile(os.path.join(RAG_SOURCES_DIR, f))]
    if not xlsx_files_found:
        return general_qa_df
    all_general_dfs = [general_qa_df] if not general_qa_df.empty else []
    for xlsx_file in xlsx_files_found:
        try:
            xlsx_path = os.path.join(RAG_SOURCES_DIR, xlsx_file)
            logger.info(f"Processing XLSX source file: {xlsx_file}")
            df_excel = pd.read_excel(xlsx_path)
            if 'Pregunta' in df_excel.columns and 'Respuesta' in df_excel.columns:
                logger.info(f"Found 'Pregunta' and 'Respuesta' in {xlsx_file}. Preserving all columns.")
                # 'Question' is required by the EmbeddingManager for semantic search;
                # derive it from 'Pregunta' while keeping every original column.
                df_excel['Question'] = df_excel['Pregunta']
                all_general_dfs.append(df_excel)
                print(f"- Loaded and processing: {xlsx_file}")
            else:
                logger.warning(f"Skipping XLSX file '{xlsx_file}' as it lacks the required 'Pregunta' and 'Respuesta' columns.")
        except Exception as e_xlsx:
            logger.error(f"Error processing XLSX file '{xlsx_file}': {e_xlsx}")
    if all_general_dfs:
        general_qa_df = pd.concat(all_general_dfs, ignore_index=True)
        logger.info(f"Successfully merged data from {len(xlsx_files_found)} XLSX file(s) into the general QA set.")
    return general_qa_df


def load_qa_data_on_startup():
    """Load QA pairs from CSV/XLSX sources and initialize the semantic-search embeddings.

    Reads the three optional CSVs (general, personal, greetings) plus any .xlsx
    files in RAG_SOURCES_DIR carrying 'Pregunta'/'Respuesta' columns, normalizes
    all text, guarantees a 'Question' column on every frame, and hands the frames
    to the embedding manager. Failures are logged; the app continues without
    semantic QA.
    """
    global embedding_manager
    print("\n--- Loading QA Source Files ---")
    try:
        general_qa_df = _load_qa_csv(os.path.join(RAG_SOURCES_DIR, 'general_qa.csv'))
        personal_qa_df = _load_qa_csv(os.path.join(RAG_SOURCES_DIR, 'personal_qa.csv'))
        greetings_qa_df = _load_qa_csv(os.path.join(RAG_SOURCES_DIR, 'greetings.csv'))
        general_qa_df = _merge_xlsx_sources(general_qa_df)
        dataframes_to_process = {
            "general": general_qa_df,
            "personal": personal_qa_df,
            "greetings": greetings_qa_df
        }
        for df_name, df_val in dataframes_to_process.items():
            if df_val.empty:
                continue
            # Normalize text in all columns to prevent encoding/whitespace issues.
            for col in df_val.columns:
                if not df_val[col].isnull().all():
                    df_val[col] = df_val[col].astype(str).apply(normalize_text)
            # Ensure 'Question' column exists for embedding manager compatibility.
            if 'Question' not in df_val.columns:
                # BUGFIX: the original inner branch re-tested 'Question' (always
                # False here) and was dead code; the intent — per its own comment —
                # was to derive 'Question' from 'Pregunta' when available.
                if 'Pregunta' in df_val.columns:
                    df_val['Question'] = df_val['Pregunta']
                else:
                    df_val['Question'] = None
                    logger.warning(f"'Question' column missing in {df_name} data. Added empty column.")
        embedding_manager.update_embeddings(
            dataframes_to_process["general"],
            dataframes_to_process["personal"],
            dataframes_to_process["greetings"]
        )
        logger.info("CSV & XLSX QA data loaded and embeddings initialized.")
    except Exception as e:
        logger.critical(f"CRITICAL: Error loading or processing QA data: {e}. Semantic QA may not function.", exc_info=True)
    print("-----------------------------\n")
if __name__ == '__main__':
    # Ensure required directories exist before anything tries to read/write them.
    for folder_path in [os.path.join(_APP_BASE_DIR, 'templates'),
                        os.path.join(_APP_BASE_DIR, 'static'),
                        TEXT_EXTRACTIONS_DIR]:
        os.makedirs(folder_path, exist_ok=True)
    # Load CSV/XLSX QA data and build embeddings for the semantic-QA layer.
    load_qa_data_on_startup()
    initialize_chat_log()
    logger.info("Attempting to initialize RAG system from llm_handling module...")
    # Document RAG is optional: on failure the app still serves CSV-QA and fallback LLM answers.
    rag_system = initialize_and_get_rag_system()
    if rag_system:
        logger.info("RAG system initialized successfully via llm_handling module.")
    else:
        logger.warning("RAG system failed to initialize. Document RAG functionality will be unavailable.")
    logger.info(f"Flask application starting with Hybrid RAG on {FLASK_APP_HOST}:{FLASK_APP_PORT} Debug: {FLASK_DEBUG_MODE}...")
    if not FLASK_DEBUG_MODE:
        # Quiet werkzeug's per-request access logs outside debug mode.
        werkzeug_log = logging.getLogger('werkzeug')
        werkzeug_log.setLevel(logging.ERROR)
    # use_reloader=False avoids running this startup block twice under the reloader.
    app.run(host=FLASK_APP_HOST, port=FLASK_APP_PORT, debug=FLASK_DEBUG_MODE, use_reloader=False)