import gradio as gr
import tempfile
import os
import fitz  # PyMuPDF
import uuid
import shutil
from pymilvus import MilvusClient
import json
import sqlite3
from datetime import datetime
import hashlib
import bcrypt
import re
from typing import List, Dict, Tuple, Optional
import threading
import requests
import base64
from PIL import Image
import io
import traceback
from score_utilizer import ScoreUtilizer
from middleware import Middleware
from rag import Rag
from pathlib import Path
import subprocess
# Import the necessary helpers from the dotenv library
from dotenv import load_dotenv, dotenv_values
import dotenv
import platform
import time

# Only enable PPT/PPTX conversion on Windows, where COM is available
PPT_CONVERT_AVAILABLE = False
if platform.system() == 'Windows':
    try:
        from pptxtopdf import convert
        PPT_CONVERT_AVAILABLE = True
    except Exception:
        PPT_CONVERT_AVAILABLE = False

# Import libraries for DOC and Excel export
try:
    from docx import Document
    from docx.shared import Inches, Pt
    from docx.enum.text import WD_ALIGN_PARAGRAPH
    from docx.enum.style import WD_STYLE_TYPE
    from docx.oxml.shared import OxmlElement, qn
    from docx.oxml.ns import nsdecls
    from docx.oxml import parse_xml
    DOCX_AVAILABLE = True
except ImportError:
    DOCX_AVAILABLE = False
    print("Warning: python-docx not available. DOC export will be disabled.")

try:
    import openpyxl
    from openpyxl import Workbook
    from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
    from openpyxl.chart import BarChart, LineChart, PieChart, Reference
    from openpyxl.utils.dataframe import dataframe_to_rows
    import pandas as pd
    EXCEL_AVAILABLE = True
except ImportError:
    EXCEL_AVAILABLE = False
    print("Warning: openpyxl/pandas not available. Excel export will be disabled.")

# Load variables from the .env file
dotenv_file = dotenv.find_dotenv()
dotenv.load_dotenv(dotenv_file)

# Initialize the RAG engine (the Docker and Ollama services are started separately below)
rag = Rag()

# Database for user management and chat history
class DatabaseManager:
    def __init__(self, db_path="app_database.db"):
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Initialize database tables"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        # Users table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                team TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Document collections table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS document_collections (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                collection_name TEXT UNIQUE NOT NULL,
                team TEXT NOT NULL,
                uploaded_by INTEGER,
                upload_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                file_count INTEGER DEFAULT 0,
                FOREIGN KEY (uploaded_by) REFERENCES users (id)
            )
        ''')
        conn.commit()
        conn.close()

    def create_user(self, username: str, password: str, team: str) -> bool:
        """Create a new user"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # Hash password
            password_hash = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
            cursor.execute(
                'INSERT INTO users (username, password_hash, team) VALUES (?, ?, ?)',
                (username, password_hash.decode('utf-8'), team)
            )
            conn.commit()
            conn.close()
            return True
        except sqlite3.IntegrityError:
            return False

    def authenticate_user(self, username: str, password: str) -> Optional[Dict]:
        """Authenticate user and return user info"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('SELECT id, username, password_hash, team FROM users WHERE username = ?', (username,))
            user = cursor.fetchone()
            conn.close()
            if user and bcrypt.checkpw(password.encode('utf-8'), user[2].encode('utf-8')):
                return {
                    'id': user[0],
                    'username': user[1],
                    'team': user[3]
                }
            return None
        except Exception as e:
            print(f"Authentication error: {e}")
            return None

    def save_document_collection(self, collection_name: str, team: str, user_id: int, file_count: int):
        """Save document collection info"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute(
                'INSERT OR REPLACE INTO document_collections (collection_name, team, uploaded_by, file_count) VALUES (?, ?, ?, ?)',
                (collection_name, team, user_id, file_count)
            )
            conn.commit()
            conn.close()
        except Exception as e:
            print(f"Error saving document collection: {e}")

    def get_team_collections(self, team: str) -> List[str]:
        """Get all collections for a team"""
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('SELECT collection_name FROM document_collections WHERE team = ?', (team,))
            collections = [row[0] for row in cursor.fetchall()]
            conn.close()
            return collections
        except Exception as e:
            print(f"Error getting team collections: {e}")
            return []

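# Usage sketch (illustrative comments only; the names below are hypothetical):
#
#   db = DatabaseManager("demo.db")
#   db.create_user("alice", "s3cret", "Team_A")        # False if the username already exists
#   info = db.authenticate_user("alice", "s3cret")     # {'id': 1, 'username': 'alice', 'team': 'Team_A'}
#   db.save_document_collection("docs_20250101_120000", "Team_A", info['id'], file_count=3)
#   db.get_team_collections("Team_A")                  # ['docs_20250101_120000']
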
# User session management
class SessionManager:
    def __init__(self):
        self.active_sessions = {}
        self.session_lock = threading.Lock()

    def create_session(self, user_info: Dict) -> str:
        """Create a new session for user"""
        session_id = str(uuid.uuid4())
        with self.session_lock:
            self.active_sessions[session_id] = {
                'user_info': user_info,
                'created_at': datetime.now(),
                'last_activity': datetime.now()
            }
        return session_id

    def get_session(self, session_id: str) -> Optional[Dict]:
        """Get session info"""
        with self.session_lock:
            if session_id in self.active_sessions:
                self.active_sessions[session_id]['last_activity'] = datetime.now()
                return self.active_sessions[session_id]
            return None

    def remove_session(self, session_id: str):
        """Remove session"""
        with self.session_lock:
            if session_id in self.active_sessions:
                del self.active_sessions[session_id]

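# Usage sketch (illustrative comments only):
#
#   sessions = SessionManager()
#   sid = sessions.create_session({'id': 1, 'username': 'alice', 'team': 'Team_A'})
#   sessions.get_session(sid)     # refreshes 'last_activity' and returns the record
#   sessions.remove_session(sid)  # safe to call even if sid was already removed
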
# Initialize managers
db_manager = DatabaseManager()
session_manager = SessionManager()

# Create default users if they don't exist
def create_default_users():
    """Create default team users"""
    teams = ["Team_A", "Team_B"]
    for team in teams:
        username = f"admin_{team.lower()}"
        password = f"admin123_{team.lower()}"
        if not db_manager.authenticate_user(username, password):
            db_manager.create_user(username, password, team)
            print(f"Created default user: {username} for {team}")

create_default_users()

def start_services():
    # --- Docker Desktop (Windows only) ---
    if platform.system() == "Windows":
        def is_docker_desktop_running():
            try:
                # Check if "Docker Desktop.exe" is in the task list.
                result = subprocess.run(
                    ["tasklist", "/FI", "IMAGENAME eq Docker Desktop.exe"],
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE
                )
                return "Docker Desktop.exe" in result.stdout.decode()
            except Exception as e:
                print("Error checking Docker Desktop:", e)
                return False

        def start_docker_desktop():
            # Adjust this path if your Docker Desktop executable is located elsewhere.
            docker_desktop_path = r"C:\Program Files\Docker\Docker\Docker Desktop.exe"
            if not os.path.exists(docker_desktop_path):
                print("Docker Desktop executable not found. Please verify the installation path.")
                return
            try:
                subprocess.Popen([docker_desktop_path], shell=True)
                print("Docker Desktop is starting...")
            except Exception as e:
                print("Error starting Docker Desktop:", e)

        if is_docker_desktop_running():
            print("Docker Desktop is already running.")
        else:
            print("Docker Desktop is not running. Starting it now...")
            start_docker_desktop()
            # Wait for Docker Desktop to initialize (adjust delay as needed)
            time.sleep(15)

    # --- Ollama Server Management ---
    def is_ollama_running():
        if platform.system() == "Windows":
            try:
                # Check for "ollama.exe" in the task list (adjust if the executable name differs)
                result = subprocess.run(
                    ['tasklist', '/FI', 'IMAGENAME eq ollama.exe'],
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE
                )
                return "ollama.exe" in result.stdout.decode().lower()
            except Exception as e:
                print("Error checking Ollama on Windows:", e)
                return False
        else:
            try:
                result = subprocess.run(
                    ['pgrep', '-f', 'ollama'],
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE
                )
                return result.returncode == 0
            except Exception as e:
                print("Error checking Ollama:", e)
                return False

    def start_ollama():
        if platform.system() == "Windows":
            try:
                subprocess.Popen(['ollama', 'serve'], shell=True)
                print("Ollama server started on Windows.")
            except Exception as e:
                print("Failed to start Ollama server on Windows:", e)
        else:
            try:
                subprocess.Popen(['ollama', 'serve'])
                print("Ollama server started.")
            except Exception as e:
                print("Failed to start Ollama server:", e)

    # Ollama is no longer used; it has been replaced by Gemini API calls,
    # so the Ollama server checks and startup are skipped.

    # --- Docker Containers Management ---
    def get_docker_containers():
        try:
            result = subprocess.run(
                ['docker', 'ps', '-aq'],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            if result.returncode != 0:
                print("Error retrieving Docker containers:", result.stderr.decode())
                return []
            return result.stdout.decode().splitlines()
        except Exception as e:
            print("Error retrieving Docker containers:", e)
            return []

    def get_running_docker_containers():
        try:
            result = subprocess.run(
                ['docker', 'ps', '-q'],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            if result.returncode != 0:
                print("Error retrieving running Docker containers:", result.stderr.decode())
                return []
            return result.stdout.decode().splitlines()
        except Exception as e:
            print("Error retrieving running Docker containers:", e)
            return []

    def start_docker_container(container_id):
        try:
            result = subprocess.run(
                ['docker', 'start', container_id],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE
            )
            if result.returncode == 0:
                print(f"Started Docker container {container_id}.")
            else:
                print(f"Failed to start Docker container {container_id}: {result.stderr.decode()}")
        except Exception as e:
            print(f"Error starting Docker container {container_id}: {e}")

    all_containers = set(get_docker_containers())
    running_containers = set(get_running_docker_containers())
    stopped_containers = all_containers - running_containers
    if stopped_containers:
        print(f"Found {len(stopped_containers)} stopped Docker container(s). Starting them...")
        for container_id in stopped_containers:
            start_docker_container(container_id)
    else:
        print("All Docker containers are already running.")

# Skip Docker services when running on Hugging Face Spaces
if not os.getenv("SPACE_ID"):
    start_services()
else:
    print("Running on Hugging Face Spaces - skipping Docker services")

def generate_uuid(state):
    # Check if a UUID already exists in the session state
    if state["user_uuid"] is None:
        # Generate a new UUID if not already set
        state["user_uuid"] = str(uuid.uuid4())
    return state["user_uuid"]

class PDFSearchApp:
    def __init__(self):
        self.indexed_docs = {}
        self.current_pdf = None
        self.db_manager = db_manager
        self.session_manager = session_manager
        self.score_utilizer = ScoreUtilizer()  # Initialize the score utilizer

    def upload_and_convert(self, files, max_pages, folder_name=None):
        """Upload and convert files without authentication or team scoping"""
        if files is None:
            return "No file uploaded"
        try:
            total_pages = 0
            uploaded_files = []
            # Create a simple collection name
            if folder_name:
                folder_name = folder_name.replace(" ", "_").replace("-", "_")
                collection_name = f"{folder_name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            else:
                collection_name = f"documents_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            # Store the collection name in indexed_docs BEFORE processing files
            self.indexed_docs[collection_name] = True
            print(f"📁 Created collection: {collection_name}")
            # Clear old collections so that only the latest upload is referenced
            self._clear_old_collections(collection_name)
            for file in files:
                # Extract the file name from the path
                filename = os.path.basename(file.name)
                name, ext = os.path.splitext(filename)
                pdf_path = file.name
                # Convert PPT/PPTX to PDF if needed
                if ext.lower() in [".ppt", ".pptx"]:
                    if PPT_CONVERT_AVAILABLE:
                        # Write the converted PDF next to the uploaded file
                        outfile = os.path.splitext(file.name)[0] + '.pdf'
                        convert(file.name, outfile)
                        pdf_path = outfile
                        name, ext = os.path.splitext(os.path.basename(outfile))
                    else:
                        return "PPT/PPTX conversion is only supported on Windows. Please upload PDFs instead."
                # Create a unique document ID
                doc_id = f"{collection_name}_{name.replace(' ', '_').replace('-', '_')}"
                print(f"Uploading file: {doc_id}")
                middleware = Middleware(collection_name, create_collection=True)
                # Pass collection_name as the id so images are saved to the right directory
                pages = middleware.index(pdf_path, id=collection_name, max_pages=max_pages)
                total_pages += len(pages) if pages else 0
                uploaded_files.append(doc_id)
            # Get the current active collection after cleanup
            current_collection = self.get_current_collection()
            status_message = f"Uploaded {len(uploaded_files)} files with {total_pages} total pages to collection: {collection_name}"
            if current_collection:
                status_message += "\n✅ This is now your active collection for searches."
            return status_message
        except Exception as e:
            return f"Error processing files: {str(e)}"

    def _clear_old_collections(self, current_collection_name):
        """Clear old collections so that only the latest upload is referenced"""
        try:
            # Get all collections except the current one
            collections_to_remove = [name for name in self.indexed_docs.keys() if name != current_collection_name]
            if collections_to_remove:
                print(f"🗑️ Clearing {len(collections_to_remove)} old collections to maintain the latest upload reference")
                for old_collection in collections_to_remove:
                    # Remove from indexed_docs
                    del self.indexed_docs[old_collection]
                    # Try to drop the collection from Milvus
                    try:
                        middleware = Middleware(old_collection, create_collection=False)
                        if middleware.drop_collection():
                            print(f"🗑️ Successfully dropped Milvus collection '{old_collection}'")
                        else:
                            print(f"⚠️ Failed to drop Milvus collection '{old_collection}'")
                    except Exception as e:
                        print(f"⚠️ Warning: Could not clean up Milvus collection '{old_collection}': {e}")
                print(f"✅ Kept only the latest collection: {current_collection_name}")
            else:
                print(f"✅ No old collections to clear. Current collection: {current_collection_name}")
        except Exception as e:
            print(f"⚠️ Warning: Error clearing old collections: {e}")
            # Don't fail the upload if cleanup fails

    def get_current_collection_status(self):
        """Get a user-friendly status message about the current collection"""
        current_collection = self.get_current_collection()
        if current_collection:
            return f"✅ Currently active collection: {current_collection}"
        else:
            return "❌ No documents uploaded yet. Please upload a document to get started."

    def get_current_collection(self):
        """Get the name of the currently active collection (most recent upload)"""
        if not self.indexed_docs:
            return None
        available_collections = list(self.indexed_docs.keys())
        if not available_collections:
            return None
        # Sort by timestamp to get the most recent one
        def extract_timestamp(collection_name):
            try:
                parts = collection_name.split('_')
                if len(parts) >= 3:
                    date_part = parts[-2]
                    time_part = parts[-1]
                    return f"{date_part}_{time_part}"
                return collection_name
            except Exception:
                return collection_name
        available_collections.sort(key=extract_timestamp, reverse=True)
        return available_collections[0]

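    # Timestamp-sorting sketch (illustrative comments only): collection names
    # end in "_YYYYMMDD_HHMMSS", so sorting on the extracted suffix picks the
    # newest upload:
    #
    #   app.indexed_docs = {"docs_20250101_120000": True, "docs_20250201_090000": True}
    #   app.get_current_collection()  # -> "docs_20250201_090000"
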
    def display_file_list(self, text):
        directory_path = ""
        try:
            # List the entries in the base pages directory
            # (uses the same base-directory logic as PdfManager)
            base_output_dir = self._ensure_base_directory()
            directory_path = os.path.join(os.getcwd(), base_output_dir)
            entries = os.listdir(directory_path)
            # Keep only the entries that are directories
            directories = [entry for entry in entries if os.path.isdir(os.path.join(directory_path, entry))]
            return directories
        except FileNotFoundError:
            return f"The directory {directory_path} does not exist."
        except PermissionError:
            return f"Permission denied to access {directory_path}."
        except Exception as e:
            return str(e)

    def search_documents(self, query):
        print(f"Searching for query: {query}")
        print("🎯 MODE: Returning only TOP 3 highest-scoring pages")
        if not query:
            print("Please enter a search query")
            return "Please enter a search query", "--", "Please enter a search query", [], None, None, None
        try:
            # First, check whether any documents have been indexed
            if not self.indexed_docs:
                return "No documents have been uploaded yet. Please upload some documents first.", "--", "No documents available for search", [], None, None, None
            # Clean up any invalid collections first
            print("🧹 Cleaning up invalid collections...")
            removed_count = self._cleanup_invalid_collections()
            if removed_count > 0:
                print(f"🗑️ Removed {removed_count} invalid collections")
            # Check again after cleanup
            if not self.indexed_docs:
                return "No valid collections found after cleanup. Please re-upload your documents.", "--", "No valid collections available", [], None, None, None
            # Get the most recent collection name from the indexed docs (latest upload)
            available_collections = list(self.indexed_docs.keys())
            print(f"📁 Available collections after cleanup: {available_collections}")
            if not available_collections:
                return "No collections available for search. Please upload some documents first.", "--", "No collections available", [], None, None, None
            # Sort collections by timestamp to get the most recent one.
            # Collections are named like "documents_20250101_120000" or "folder_20250101_120000".
            def extract_timestamp(collection_name):
                try:
                    # Extract the timestamp part after the last underscore
                    parts = collection_name.split('_')
                    if len(parts) >= 3:
                        # The last two parts should be the date and time
                        date_part = parts[-2]
                        time_part = parts[-1]
                        return f"{date_part}_{time_part}"
                    return collection_name
                except Exception:
                    return collection_name
            # Sort by timestamp in descending order (most recent first)
            available_collections.sort(key=extract_timestamp, reverse=True)
            collection_name = available_collections[0]
            print(f"📁 Available collections sorted by timestamp: {available_collections}")
            print(f"🔍 Searching in most recent collection: {collection_name}")
            # Add collection info to the search results for user clarity
            collection_info = f"🔍 Searching in collection: {collection_name}"
            middleware = Middleware(collection_name, create_collection=False)
            # 🎯 TOP 3 PAGES MODE: Always return only the top 3 highest-scoring pages.
            # Retrieve more results than needed to allow for intelligent filtering.
            search_results = middleware.search([query], topk=20)[0]  # 20 candidates for better selection
            # Fixed to always return the top 3 pages
            num_results = 3
            print("\n🎯 TOP 3 PAGES MODE:")
            print(f"   Always returning: {num_results} highest-scoring pages")
            print("   Selection strategy: Score-based prioritization")
            # 📊 Comprehensive search results logging
            print("\n📊 SEARCH RESULTS SUMMARY")
            print(f"📊 Retrieved {len(search_results)} total results from search")
            if len(search_results) > 0:
                print(f"📊 Top result score: {search_results[0][0]:.4f}")
                print(f"📊 Bottom result score: {search_results[-1][0]:.4f}")
                print(f"📊 Score range: {search_results[-1][0]:.4f} - {search_results[0][0]:.4f}")
                # Show the top 5 results with page numbers
                print("\n🏆 TOP 5 HIGHEST SCORING PAGES:")
                for i, (score, doc_id) in enumerate(search_results[:5], 1):
                    page_num = doc_id + 1  # Convert to 1-based page numbering
                    print(f"   {i}. Page {page_num} (doc_id: {doc_id}) - Score: {score:.4f}")
                # Calculate and display score statistics
                scores = [result[0] for result in search_results]
                avg_score = sum(scores) / len(scores)
                print("\n📊 SCORE STATISTICS:")
                print(f"   Average Score: {avg_score:.4f}")
                print(f"   Score Variance: {sum((s - avg_score) ** 2 for s in scores) / len(scores):.4f}")
                # Count pages by relevance level
                excellent = sum(1 for s in scores if s >= 0.90)
                very_good = sum(1 for s in scores if 0.80 <= s < 0.90)
                good = sum(1 for s in scores if 0.70 <= s < 0.80)
                moderate = sum(1 for s in scores if 0.60 <= s < 0.70)
                basic = sum(1 for s in scores if 0.50 <= s < 0.60)
                poor = sum(1 for s in scores if s < 0.50)
                print("\n📊 RELEVANCE DISTRIBUTION:")
                print(f"   🟢 Excellent (≥0.90): {excellent} pages")
                print(f"   🟡 Very Good (0.80-0.89): {very_good} pages")
                print(f"   🟠 Good (0.70-0.79): {good} pages")
                print(f"   🔵 Moderate (0.60-0.69): {moderate} pages")
                print(f"   🟣 Basic (0.50-0.59): {basic} pages")
                print(f"   🔴 Poor (<0.50): {poor} pages")
                print("-" * 60)
            if not search_results:
                return "No search results found", "--", "No search results found for your query", [], None, None, None
            # 🎯 TOP 3 SELECTION: Always select exactly the top 3 highest-scoring pages
            selected_results = self._select_top_3_pages(search_results, query)
            # 📊 Selection logging - show which pages were selected
            print("\n🎯 PAGE SELECTION RESULTS")
            print("📊 Mode: Top 3 highest-scoring pages")
            print(f"📊 Selected: {len(selected_results)} pages")
            print(f"📊 Selection rate: {len(selected_results)/len(search_results)*100:.1f}% of available results")
            print("-" * 60)
            print("🏆 SELECTED PAGES WITH SCORES:")
            for i, (score, doc_id) in enumerate(selected_results, 1):
                page_num = doc_id + 1
                relevance_level = self._get_relevance_level(score)
                print(f"   {i}. Page {page_num:2d} (doc_id: {doc_id:2d}) | Score: {score:8.4f} | {relevance_level}")
            # Calculate selection statistics
            if selected_results:
                selected_scores = [result[0] for result in selected_results]
                avg_selected_score = sum(selected_scores) / len(selected_scores)
                print("\n📊 SELECTION STATISTICS:")
                print(f"   Average selected score: {avg_selected_score:.4f}")
                print(f"   Highest selected score: {selected_scores[0]:.4f}")
                print(f"   Lowest selected score: {selected_scores[-1]:.4f}")
                print(f"   Score improvement over average: {avg_selected_score - avg_score:.4f}")
                print("-" * 60)
            # Process the selected results
            cited_pages = []
            img_paths = []
            all_paths = []
            page_scores = []
            print(f"📄 Processing {len(selected_results)} selected results...")
            # Ensure the base directory exists and get the correct path
            base_output_dir = self._ensure_base_directory()
            print(f"📁 Using base directory: {base_output_dir}")
            print(f"📁 Collection name: {collection_name}")
            print(f"🌍 Environment: {'Hugging Face Spaces' if self._is_huggingface_spaces() else 'Local Development'}")
            for i, (score, doc_id) in enumerate(selected_results):
                # 🎯 FIX: Use the actual page number from doc_id, not the index position.
                # doc_id represents the actual page number in the document.
                display_page_num = doc_id + 1  # Convert 0-based doc_id to 1-based page number
                coll_num = collection_name  # Use the current collection name
                print(f"🔍 Processing result {i+1}: doc_id={doc_id}, actual_page={display_page_num}, score={score:.4f}")
                # Use the debug helper to get paths and check existence
                img_path, path, file_exists = self._debug_file_paths(base_output_dir, coll_num, display_page_num)
                if file_exists:
                    img_paths.append(img_path)
                    all_paths.append(path)
                    page_scores.append(score)
                    cited_pages.append(f"Page {display_page_num} from {coll_num}")
                    print(f"✅ Retrieved page {display_page_num}: {img_path} (Score: {score:.3f})")
                else:
                    print(f"❌ Image file not found: {img_path}")
                    # Try alternative paths with better fallback logic
                    alt_paths = [
                        # Primary path (should work in Hugging Face Spaces)
                        img_path,
                        # Relative paths from the app directory
                        os.path.join(os.path.dirname(os.path.abspath(__file__)), "pages", coll_num, f"page_{display_page_num}.png"),
                        # Current working directory paths
                        f"pages/{coll_num}/page_{display_page_num}.png",
                        f"./pages/{coll_num}/page_{display_page_num}.png",
                        os.path.join(os.getcwd(), "pages", coll_num, f"page_{display_page_num}.png"),
                        # Alternative base directories
                        os.path.join("/tmp", "pages", coll_num, f"page_{display_page_num}.png"),
                        os.path.join("/home/user", "pages", coll_num, f"page_{display_page_num}.png")
                    ]
                    print(f"🔍 Trying alternative paths for page {display_page_num}:")
                    for alt_path in alt_paths:
                        print(f"   🔍 Checking: {alt_path}")
                        if os.path.exists(alt_path):
                            print(f"✅ Found alternative path: {alt_path}")
                            img_paths.append(alt_path)
                            all_paths.append(alt_path.replace(".png", ""))
                            page_scores.append(score)
                            cited_pages.append(f"Page {display_page_num} from {coll_num}")
                            print(f"✅ Retrieved page {display_page_num}: {alt_path} (Score: {score:.3f})")
                            break
                    else:
                        print(f"❌ No alternative path found for page {display_page_num}")
            print(f"📊 Final count: {len(img_paths)} valid pages out of {len(selected_results)} selected")
            # 📊 Final results summary
            if img_paths:
                print("\n🏆 FINAL RETRIEVAL SUMMARY")
                print(f"📊 Successfully retrieved: {len(img_paths)} pages")
                print("📊 Final page scores:")
                for i, (img_path, score) in enumerate(zip(img_paths, page_scores), 1):
                    # Extract the page number from the path
                    page_num = img_path.split('page_')[1].split('.png')[0] if 'page_' in img_path else f"Page {i}"
                    print(f"   {i}. Page {page_num} - Score: {score:.4f}")
                if page_scores:
                    final_avg_score = sum(page_scores) / len(page_scores)
                    print("\n📊 FINAL STATISTICS:")
                    print(f"   Average final score: {final_avg_score:.4f}")
                    print(f"   Highest final score: {max(page_scores):.4f}")
                    print(f"   Lowest final score: {min(page_scores):.4f}")
                    print("=" * 60)
            if not img_paths:
                return "No valid image files found", "--", "Error: No valid image files found for the search results", [], None, None, None
            # 🎯 Automatic highest-scoring pages utilization
            self._utilize_highest_scoring_pages(selected_results, query, page_scores)
            # Generate the RAG response with multiple pages using the enhanced approach
            try:
                print("🤖 Generating RAG response...")
                rag_response, csv_filepath, doc_filepath, excel_filepath = self._generate_multi_page_response(query, img_paths, cited_pages, page_scores)
                print("✅ RAG response generated successfully")
            except Exception as e:
                error_code = "RAG001"
                error_msg = f"❌ **Error {error_code}**: Failed to generate RAG response"
                print(f"{error_msg}: {str(e)}")
                print(f"❌ Traceback: {traceback.format_exc()}")
                # Return an error response in the expected 7-output format
                return (
                    error_msg,  # path
                    "--",  # images
                    f"{error_msg}\n\n**Details**: {str(e)}\n\n**Error Code**: {error_code}",  # llm_answer
                    cited_pages,  # cited_pages_display
                    None,  # csv_download
                    None,  # doc_download
                    None  # excel_download
                )
            # Prepare downloads
            csv_download = self._prepare_csv_download(csv_filepath)
            doc_download = self._prepare_doc_download(doc_filepath)
            excel_download = self._prepare_excel_download(excel_filepath)
            # Return multiple images if available, otherwise a single image
            if len(img_paths) > 1:
                # Format for the Gallery component: a list of (image_path, caption) tuples.
                # Extract page numbers from cited_pages for accurate captions.
                gallery_images = []
                for i, img_path in enumerate(img_paths):
                    # Extract the page number from cited_pages
                    page_info = cited_pages[i].split(" from ")[0]  # "Page X"
                    page_num = page_info.split("Page ")[1]  # "X"
                    gallery_images.append((img_path, f"Page {page_num}"))
                return ", ".join(all_paths), gallery_images, rag_response, cited_pages, csv_download, doc_download, excel_download
            else:
                # Single image format
                page_info = cited_pages[0].split(" from ")[0]  # "Page X"
                page_num = page_info.split("Page ")[1]  # "X"
                return all_paths[0], [(img_paths[0], f"Page {page_num}")], rag_response, cited_pages, csv_download, doc_download, excel_download
        except Exception as e:
            error_msg = f"Error during search: {str(e)}"
            print(f"❌ Search error: {error_msg}")
            # Return exactly 7 outputs to match the Gradio output components
            return error_msg, "--", error_msg, [], None, None, None

    def _select_top_3_pages(self, search_results, query):
        """
        Select exactly the top 3 highest-scoring pages.
        Simplified selection focused on the best 3 pages only.
        """
        if not search_results:
            return []
        # Sort by relevance score (highest first)
        sorted_results = sorted(search_results, key=lambda x: x[0], reverse=True)
        # Always return exactly the top 3 pages
        top_3 = sorted_results[:3]
        print("\n🎯 TOP 3 PAGES SELECTION:")
        print(f"📊 Total available results: {len(search_results)}")
        print("🎯 Selected: Top 3 highest-scoring pages")
        # Log the selected pages with scores
        for i, (score, doc_id) in enumerate(top_3, 1):
            page_num = doc_id + 1
            relevance_level = self._get_relevance_level(score)
            print(f"   {i}. Page {page_num:2d} (doc_id: {doc_id:2d}) | Score: {score:8.4f} | {relevance_level}")
        # Calculate selection quality metrics
        if top_3:
            scores = [result[0] for result in top_3]
            avg_score = sum(scores) / len(scores)
            print("\n📊 TOP 3 SELECTION QUALITY:")
            print(f"   Average score: {avg_score:.4f}")
            print(f"   Highest score: {scores[0]:.4f}")
            print(f"   Lowest score: {scores[-1]:.4f}")
            print(f"   Score range: {scores[0] - scores[-1]:.4f}")
        return top_3

    def _select_relevant_pages_new_format(self, search_results, query, num_results):
        """
        Legacy function - kept for compatibility but now redirects to top 3 selection
        """
        return self._select_top_3_pages(search_results, query)

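    # Usage sketch (illustrative comments only). Search results are
    # (score, doc_id) tuples, where doc_id is a 0-based page index:
    #
    #   results = [(0.91, 4), (0.85, 1), (0.79, 7), (0.40, 2)]
    #   app._select_top_3_pages(results, "any query")
    #   # -> [(0.91, 4), (0.85, 1), (0.79, 7)]
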
    def _select_highest_scoring_pages(self, sorted_results, query, num_results):
        """
        Select pages with the highest scores using dynamic thresholds and intelligent filtering
        """
        if not sorted_results:
            return []
        # Extract scores for analysis
        scores = [result[0] for result in sorted_results]
        max_score = scores[0]
        min_score = scores[-1]
        avg_score = sum(scores) / len(scores)
        print("\n🎯 INTELLIGENT PAGE SELECTION ANALYSIS")
        print("📊 Score Analysis:")
        print(f"   Highest Score: {max_score:.4f}")
        print(f"   Lowest Score: {min_score:.4f}")
        print(f"   Average Score: {avg_score:.4f}")
        print(f"   Score Range: {max_score - min_score:.4f}")
        # Dynamic threshold calculation:
        # use multiple strategies to determine the optimal selection.
        # Strategy 1: Score-based thresholds (excellent and very good pages)
        excellent_threshold = 0.90
        very_good_threshold = 0.80
        good_threshold = 0.70
        excellent_pages = [r for r in sorted_results if r[0] >= excellent_threshold]
        very_good_pages = [r for r in sorted_results if very_good_threshold <= r[0] < excellent_threshold]
        good_pages = [r for r in sorted_results if good_threshold <= r[0] < very_good_threshold]
        print("\n📊 RELEVANCE-BASED SELECTION:")
        print(f"   🟢 Excellent pages (≥{excellent_threshold}): {len(excellent_pages)}")
        print(f"   🟡 Very Good pages ({very_good_threshold}-{excellent_threshold}): {len(very_good_pages)}")
        print(f"   🟠 Good pages ({good_threshold}-{very_good_threshold}): {len(good_pages)}")
        # Strategy 2: Statistical thresholds (top percentiles)
        top_20_percent = max(1, int(len(sorted_results) * 0.2))
        top_30_percent = max(1, int(len(sorted_results) * 0.3))
        # Strategy 3: Score gap analysis (find natural breaks)
        score_gaps = []
        for i in range(len(scores) - 1):
            gap = scores[i] - scores[i + 1]
            score_gaps.append((gap, i))
        # Find significant score gaps (natural breaks)
        score_gaps.sort(reverse=True)
        significant_gaps = [gap for gap, idx in score_gaps[:3] if gap > 0.05]  # Gaps > 0.05
        print("\n📊 STATISTICAL ANALYSIS:")
        print(f"   Top 20% of results: {top_20_percent} pages")
        print(f"   Top 30% of results: {top_30_percent} pages")
        print(f"   Significant score gaps found: {len(significant_gaps)}")
        # Intelligent selection logic
        selected = []
        # Priority 1: Always include excellent pages
        selected.extend(excellent_pages)
        # Priority 2: Include very good pages if we need more
        if len(selected) < num_results:
            remaining_slots = num_results - len(selected)
            selected.extend(very_good_pages[:remaining_slots])
        # Priority 3: Include good pages if we still need more
        if len(selected) < num_results:
            remaining_slots = num_results - len(selected)
            selected.extend(good_pages[:remaining_slots])
        # Priority 4: If we still need more, fall back to the statistical approach
        if len(selected) < num_results:
            remaining_slots = num_results - len(selected)
            # Take the next-best results in score order
            additional_pages = sorted_results[len(selected):len(selected) + remaining_slots]
            selected.extend(additional_pages)
        # Ensure we don't exceed the requested number
        selected = selected[:num_results]
        # Log the selection strategy used
        print("\n🎯 SELECTION STRATEGY APPLIED:")
        if len(excellent_pages) > 0:
            print(f"   ✅ Included {len([p for p in selected if p[0] >= excellent_threshold])} excellent pages")
        if len(very_good_pages) > 0:
            print(f"   ✅ Included {len([p for p in selected if very_good_threshold <= p[0] < excellent_threshold])} very good pages")
        if len(good_pages) > 0:
            print(f"   ✅ Included {len([p for p in selected if good_threshold <= p[0] < very_good_threshold])} good pages")
        # Calculate quality metrics
        if selected:
            selected_scores = [s[0] for s in selected]
            avg_selected = sum(selected_scores) / len(selected_scores)
            quality_improvement = avg_selected - avg_score
            print("\n📊 SELECTION QUALITY METRICS:")
            print(f"   Average selected score: {avg_selected:.4f}")
            print(f"   Quality improvement: {quality_improvement:+.4f}")
            print(f"   Score consistency: {max(selected_scores) - min(selected_scores):.4f}")
        return selected

    def _get_relevance_level(self, score):
        """Get a human-readable relevance level based on the score"""
        if score >= 0.90:
            return "🟢 EXCELLENT - Highly relevant"
        elif score >= 0.80:
            return "🟡 VERY GOOD - Very relevant"
        elif score >= 0.70:
            return "🟠 GOOD - Relevant"
        elif score >= 0.60:
            return "🔵 MODERATE - Somewhat relevant"
        elif score >= 0.50:
            return "🟣 BASIC - Minimally relevant"
        else:
            return "🔴 POOR - Not relevant"

    def extract_top_scoring_pages_from_logs(self, log_output=None):
        """
        Extract and parse the highest-scoring pages from log outputs.
        This function can be used to retrieve the top pages based on logged scores.
        """
        # This would typically parse actual log output; for now we return
        # the current selection results for demonstration.
        print("\n🔍 EXTRACTING TOP-SCORING PAGES FROM LOGS")
        print("📋 This function can parse log outputs to extract the highest-scoring pages")
        print("🎯 Use this for automated retrieval of the best pages based on scores")
        # In a real implementation, this would parse log files or capture log output.
        # For now, return a summary of what would be extracted.
        return {
            "excellent_pages": "Pages with scores ≥ 0.90",
            "very_good_pages": "Pages with scores 0.80-0.89",
            "good_pages": "Pages with scores 0.70-0.79",
            "extraction_method": "Automated log parsing with score thresholds"
        }

    def get_optimal_page_count(self, search_results, query_complexity="medium"):
        """
        Dynamically determine the optimal number of pages based on query complexity and score distribution
        """
        if not search_results:
            return 1
        scores = [result[0] for result in search_results]
        max_score = max(scores)
        avg_score = sum(scores) / len(scores)
        # Base count based on query complexity
        base_counts = {
            "simple": 2,
            "medium": 3,
            "complex": 5,
            "comprehensive": 7
        }
        base_count = base_counts.get(query_complexity, 3)
        # Adjust based on score quality
        if max_score >= 0.90:
            # High-quality results available, so fewer pages suffice
            multiplier = 0.8
        elif max_score >= 0.80:
            # Good results, use the standard count
            multiplier = 1.0
        elif max_score >= 0.70:
            # Moderate results, might need more pages
            multiplier = 1.2
        else:
            # Lower-quality results, use more pages for better coverage
            multiplier = 1.5
        optimal_count = max(1, int(base_count * multiplier))
        print("\n🎯 OPTIMAL PAGE COUNT CALCULATION:")
        print(f"   Query complexity: {query_complexity}")
        print(f"   Base count: {base_count}")
        print(f"   Score quality multiplier: {multiplier:.1f}")
        print(f"   Optimal count: {optimal_count}")
        return min(optimal_count, len(search_results))

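    # Worked example (illustrative comments only): a "complex" query with a
    # best score of 0.75 gives base_count 5 and multiplier 1.2, so
    # int(5 * 1.2) = 6 pages, capped at len(search_results):
    #
    #   results = [(0.75, 0), (0.70, 3), (0.65, 5), (0.60, 8)]
    #   app.get_optimal_page_count(results, "complex")  # -> min(6, 4) = 4
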
    def _utilize_highest_scoring_pages(self, selected_results, query, page_scores):
        """
        Automatically utilize the highest-scoring pages based on the retrieval results.
        This method demonstrates how to extract and use the best pages from the logs.
        """
        print("\n🎯 AUTOMATIC HIGHEST-SCORING PAGES UTILIZATION")
        print("=" * 60)
        if not selected_results or not page_scores:
            print("❌ No results or scores available for utilization")
            return
        # Create a mock log output for demonstration (in real usage, this would come from actual logs)
        mock_log_output = self._create_mock_log_output(selected_results, page_scores)
        # Parse the log output using ScoreUtilizer
        parsed_data = self.score_utilizer.parse_log_output(mock_log_output)
        # Get the highest-scoring pages
        top_pages = self.score_utilizer.get_highest_scoring_pages(parsed_data, 3)
        excellent_pages = self.score_utilizer.get_pages_by_threshold(parsed_data, 0.90)
        very_good_pages = self.score_utilizer.get_pages_by_threshold(parsed_data, 0.80)
        print("📊 UTILIZATION RESULTS:")
        print("   Top 3 highest-scoring pages identified")
        print(f"   🟢 Excellent pages (≥0.90): {len(excellent_pages)}")
        print(f"   🟡 Very Good pages (≥0.80): {len(very_good_pages)}")
        # Generate the utilization report
        utilization_report = self.score_utilizer.generate_utilization_report(parsed_data)
        print(f"\n{utilization_report}")
        # Store utilization data for potential future use
        self._store_utilization_data(parsed_data, query)
        print("✅ Highest-scoring pages utilization completed")
        print("=" * 60)

    def _create_mock_log_output(self, selected_results, page_scores):
        """
        Create a mock log output for demonstration purposes.
        In real usage, this would capture the actual log output from the retrieval process.
        """
        log_lines = []
        log_lines.append("=" * 80)
        log_lines.append("📊 RETRIEVAL SCORES - PAGE NUMBERS WITH HIGHEST SCORES")
        log_lines.append("=" * 80)
        log_lines.append("📁 Collection: current_collection")
        log_lines.append(f"📊 Total documents found: {len(selected_results)}")
        log_lines.append(f"🎯 Requested top-k: {len(selected_results)}")
        log_lines.append("-" * 80)
        for i, ((score, doc_id), page_score) in enumerate(zip(selected_results, page_scores)):
            page_num = doc_id + 1
            relevance_level = self._get_relevance_level(score)
            log_lines.append(f"📄 Page {page_num:2d} (doc_id: {doc_id:2d}) | Score: {score:8.4f} | {relevance_level}")
        log_lines.append("-" * 80)
        log_lines.append("🏆 HIGHEST SCORING PAGES:")
        top_3 = selected_results[:3]
        for i, (score, doc_id) in enumerate(top_3, 1):
            page_num = doc_id + 1
            log_lines.append(f"   {i}. Page {page_num} - Score: {score:.4f}")
        log_lines.append("=" * 80)
        return "\n".join(log_lines)

    def _store_utilization_data(self, parsed_data, query):
        """
        Store utilization data for future reference and analysis
        """
        try:
            # In a real implementation, this would store to a database or file
            utilization_record = {
                'query': query,
                'timestamp': datetime.now().isoformat(),
                'top_pages': parsed_data.get('top_pages', []),
                'statistics': parsed_data.get('statistics', {}),
                'relevance_distribution': parsed_data.get('relevance_distribution', {})
            }
            # For now, just log the utilization data
            print(f"💾 Utilization data stored for query: '{query[:50]}...'")
            print(f"   Top pages: {len(utilization_record['top_pages'])}")
            print(f"   Statistics available: {len(utilization_record['statistics'])} metrics")
        except Exception as e:
            print(f"⚠️ Warning: Could not store utilization data: {e}")

    def _analyze_query_complexity(self, query):
        """
        Analyze query complexity to determine the optimal page count
        """
        query_lower = query.lower()
        # Simple queries (1-2 concepts)
        simple_indicators = ['what is', 'define', 'explain', 'how many', 'when', 'where']
        simple_count = sum(1 for indicator in simple_indicators if indicator in query_lower)
        # Complex queries (multiple concepts, comparisons, analysis)
        complex_indicators = ['compare', 'analyze', 'evaluate', 'relationship', 'difference', 'similarity', 'versus', 'vs']
        complex_count = sum(1 for indicator in complex_indicators if indicator in query_lower)
        # Comprehensive queries (detailed analysis, multiple aspects)
        comprehensive_indicators = ['comprehensive', 'detailed', 'complete', 'thorough', 'all aspects', 'everything about']
        comprehensive_count = sum(1 for indicator in comprehensive_indicators if indicator in query_lower)
        # Count question marks and conjunctions
        question_words = query_lower.count('?') + query_lower.count(' and ') + query_lower.count(' or ') + query_lower.count(' but ')
        # Determine complexity
        if comprehensive_count > 0 or question_words > 2:
            return "comprehensive"
        elif complex_count > 0 or question_words > 1:
            return "complex"
        elif simple_count > 0 and question_words <= 1:
            return "simple"
        else:
            return "medium"

    def delete_documents(self, collection_name=None):
        """
        Delete documents and their associated collections from the system.

        Args:
            collection_name: Name of the collection to delete. If None, deletes all collections.

        Returns:
            Status message about the deletion operation
        """
        try:
            print("🗑️ DELETE DOCUMENTS REQUESTED")
            print(f"📁 Collection to delete: {collection_name if collection_name else 'ALL COLLECTIONS'}")
            if not self.indexed_docs:
                return "❌ No documents found to delete. Please upload some documents first."
            deleted_collections = []
            deleted_files = []
            if collection_name:
                # Delete a specific collection
                if collection_name in self.indexed_docs:
                    collection_info = self.indexed_docs[collection_name]
                    # Delete from Milvus
                    try:
                        middleware = Middleware(collection_name, create_collection=False)
                        middleware.drop_collection()
                        print(f"✅ Dropped Milvus collection: {collection_name}")
                    except Exception as e:
                        print(f"⚠️ Warning: Could not drop Milvus collection {collection_name}: {e}")
                    # Delete page images
                    try:
                        base_output_dir = self._ensure_base_directory()
                        collection_dir = os.path.join(base_output_dir, collection_name)
                        if os.path.exists(collection_dir):
                            shutil.rmtree(collection_dir)
                            print(f"✅ Deleted page images directory: {collection_dir}")
                            deleted_files.append(f"Page images: {collection_dir}")
                    except Exception as e:
                        print(f"⚠️ Warning: Could not delete page images for {collection_name}: {e}")
                    # Remove from indexed_docs
                    del self.indexed_docs[collection_name]
                    deleted_collections.append(collection_name)
                    return f"✅ Successfully deleted collection '{collection_name}'\n📁 Deleted: {len(deleted_files)} file/directory items"
                else:
                    return f"❌ Collection '{collection_name}' not found. Available collections: {list(self.indexed_docs.keys())}"
            else:
                # Delete all collections
                for coll_name in list(self.indexed_docs.keys()):
                    try:
                        # Delete from Milvus
                        middleware = Middleware(coll_name, create_collection=False)
                        middleware.drop_collection()
                        print(f"✅ Dropped Milvus collection: {coll_name}")
                    except Exception as e:
                        print(f"⚠️ Warning: Could not drop Milvus collection {coll_name}: {e}")
                    # Delete page images
                    try:
                        base_output_dir = self._ensure_base_directory()
                        collection_dir = os.path.join(base_output_dir, coll_name)
                        if os.path.exists(collection_dir):
                            shutil.rmtree(collection_dir)
                            print(f"✅ Deleted page images directory: {collection_dir}")
                            deleted_files.append(f"Page images: {collection_dir}")
                    except Exception as e:
                        print(f"⚠️ Warning: Could not delete page images for {coll_name}: {e}")
                    deleted_collections.append(coll_name)
                # Clear all indexed docs
                self.indexed_docs.clear()
                return f"✅ Successfully deleted ALL collections ({len(deleted_collections)} total)\n📁 Deleted: {len(deleted_files)} file/directory items\n🗑️ Collections deleted: {', '.join(deleted_collections)}"
        except Exception as e:
            error_msg = f"❌ Error during document deletion: {str(e)}"
            print(f"{error_msg}")
            print(f"❌ Traceback: {traceback.format_exc()}")
            return error_msg

    def get_available_collections(self):
        """
        Get the list of available collections for deletion.

        Returns:
            List of collection names and their details
        """
        if not self.indexed_docs:
            return "No collections available for deletion."
        collection_list = []
        for collection_name, collection_info in self.indexed_docs.items():
            collection_list.append(f"📁 {collection_name}")
            if isinstance(collection_info, dict):
                if 'files' in collection_info:
                    collection_list.append(f"   📄 Files: {len(collection_info['files'])}")
                if 'pages' in collection_info:
                    collection_list.append(f"   📄 Pages: {collection_info['pages']}")
            collection_list.append("")
        return "\n".join(collection_list)

    def _optimize_consecutive_pages(self, selected, all_results, target_count=None):
        """
        Optimize the selection to include consecutive pages when beneficial
        """
        # Group by collection
        collection_pages = {}
        for score, page_num, coll_num in selected:
            if coll_num not in collection_pages:
                collection_pages[coll_num] = []
            collection_pages[coll_num].append((score, page_num, coll_num))
        optimized = []
        for coll_num, pages in collection_pages.items():
            if len(pages) > 1:
                # Check whether the pages are consecutive
                page_nums = [p[1] for p in pages]
                page_nums.sort()
                # If the pages are consecutive, add any missing pages in between
                if max(page_nums) - min(page_nums) == len(page_nums) - 1:
                    # Find all pages in this range from all_results
                    for score, page_num, coll in all_results:
                        if (coll == coll_num and
                                min(page_nums) <= page_num <= max(page_nums) and
                                (score, page_num, coll) not in optimized):
                            optimized.append((score, page_num, coll))
                else:
                    optimized.extend(pages)
            else:
                optimized.extend(pages)
        # Maintain the target count if one was specified
        if target_count and len(optimized) != target_count:
            if len(optimized) > target_count:
                # Trim to the target count, keeping the highest scoring
                optimized.sort(key=lambda x: x[0], reverse=True)
                optimized = optimized[:target_count]
            elif len(optimized) < target_count:
                # Add more pages to reach the target
                for score, page_num, coll in all_results:
                    if (score, page_num, coll) not in optimized and len(optimized) < target_count:
                        optimized.append((score, page_num, coll))
        return optimized

    def _generate_comprehensive_analysis(self, query, cited_pages, page_scores):
        """
        Generate a comprehensive analysis section based on research strategies.
        Implements hierarchical retrieval insights and cross-reference analysis.
        """
        try:
            # Analyze query complexity and information needs
            query_lower = query.lower()
            # Determine the query type for targeted analysis
            query_types = []
            if any(word in query_lower for word in ['compare', 'difference', 'similarities', 'versus']):
                query_types.append("Comparative Analysis")
            if any(word in query_lower for word in ['procedure', 'method', 'how to', 'steps']):
                query_types.append("Procedural Information")
            if any(word in query_lower for word in ['safety', 'warning', 'danger', 'risk']):
                query_types.append("Safety Information")
            if any(word in query_lower for word in ['specification', 'technical', 'measurement', 'data']):
                query_types.append("Technical Specifications")
            if any(word in query_lower for word in ['overview', 'summary', 'comprehensive', 'complete']):
                query_types.append("Comprehensive Overview")
            if any(word in query_lower for word in ['table', 'csv', 'spreadsheet', 'data', 'list', 'chart']):
                query_types.append("Tabular Data Request")
            # Calculate information quality metrics
            avg_score = sum(page_scores) / len(page_scores) if page_scores else 0
            score_variance = sum((score - avg_score) ** 2 for score in page_scores) / len(page_scores) if page_scores else 0
            # Generate analysis insights
            analysis = f"""
🔬 **Comprehensive Analysis & Insights**:

🔍 **Query Analysis**:
• Query Type: {', '.join(query_types) if query_types else 'General Information'}
• Information Complexity: {'High' if len(cited_pages) > 3 else 'Medium' if len(cited_pages) > 1 else 'Low'}
• Cross-Reference Depth: {'Excellent' if len(set([p.split(' from ')[1].split(' (')[0] for p in cited_pages])) > 2 else 'Good' if len(set([p.split(' from ')[1].split(' (')[0] for p in cited_pages])) > 1 else 'Limited'}

📊 **Information Quality Assessment**:
• Average Relevance: {avg_score:.3f} ({'Excellent' if avg_score > 0.9 else 'Very Good' if avg_score > 0.8 else 'Good' if avg_score > 0.7 else 'Moderate' if avg_score > 0.6 else 'Basic'})
• Information Consistency: {'High' if score_variance < 0.1 else 'Moderate' if score_variance < 0.2 else 'Variable'}
• Source Reliability: {'High' if avg_score > 0.8 and len(cited_pages) > 2 else 'Moderate' if avg_score > 0.6 else 'Requires Verification'}

🎯 **Information Coverage Analysis**:
• Primary Information: {'Comprehensive' if any('primary' in p.lower() or 'main' in p.lower() for p in cited_pages) else 'Standard'}
• Supporting Details: {'Extensive' if len(cited_pages) > 3 else 'Adequate' if len(cited_pages) > 1 else 'Basic'}
• Technical Depth: {'High' if any('technical' in p.lower() or 'specification' in p.lower() for p in cited_pages) else 'Standard'}

💡 **Strategic Insights**:
• Information Gaps: {'Minimal' if avg_score > 0.8 and len(cited_pages) > 3 else 'Moderate' if avg_score > 0.6 else 'Significant - consider additional sources'}
• Cross-Validation: {'Strong' if len(set([p.split(' from ')[1].split(' (')[0] for p in cited_pages])) > 1 else 'Limited to single source'}
• Practical Applicability: {'High' if any('procedure' in p.lower() or 'method' in p.lower() for p in cited_pages) else 'Moderate'}

📋 **Recommendations for Further Research**:
• {'Consider additional technical specifications' if not any('technical' in p.lower() for p in cited_pages) else 'Technical coverage adequate'}
• {'Seek safety guidelines and warnings' if not any('safety' in p.lower() for p in cited_pages) else 'Safety information included'}
• {'Look for comparative analysis' if not any('compare' in p.lower() for p in cited_pages) else 'Comparative analysis available'}
"""
            return analysis
        except Exception as e:
            print(f"Error generating comprehensive analysis: {e}")
            return "🔬 **Analysis**: Comprehensive analysis of retrieved information completed."

    def _detect_table_request(self, query):
        """
        Detect whether the user is requesting tabular data
        """
        query_lower = query.lower()
        table_keywords = [
            'table', 'csv', 'spreadsheet', 'data table', 'list', 'chart',
            'tabular', 'matrix', 'grid', 'dataset', 'data set',
            'show me a table', 'create a table', 'generate table',
            'in table format', 'as a table', 'tabular format'
        ]
        return any(keyword in query_lower for keyword in table_keywords)

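    # Detection sketch (illustrative comments only):
    #
    #   app._detect_table_request("Show me a table of pump specs")  # -> True
    #   app._detect_table_request("Summarize the safety section")   # -> False
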
    def _detect_report_request(self, query):
        """
        Detect whether the user is requesting a comprehensive report
        """
        query_lower = query.lower()
        report_keywords = [
            'report', 'comprehensive report', 'detailed report', 'full report',
            'complete report', 'comprehensive analysis', 'detailed analysis',
            'full analysis', 'complete analysis', 'comprehensive overview',
            'detailed overview', 'full overview', 'complete overview',
            'comprehensive summary', 'detailed summary', 'full summary',
            'complete summary', 'comprehensive document', 'detailed document',
            'full document', 'complete document', 'comprehensive review',
            'detailed review', 'full review', 'complete review',
            'export report', 'generate report', 'create report',
            'doc format', 'word document', 'word doc', 'document format'
        ]
        return any(keyword in query_lower for keyword in report_keywords)

| def _detect_chart_request(self, query): | |
| """ | |
| Detect if the user is requesting charts, graphs, or visualizations | |
| """ | |
| query_lower = query.lower() | |
| chart_keywords = [ | |
| 'chart', 'graph', 'bar chart', 'line chart', 'pie chart', | |
| 'bar graph', 'line graph', 'pie graph', 'histogram', | |
| 'scatter plot', 'scatter chart', 'area chart', 'column chart', | |
| 'visualization', 'visualize', 'plot', 'figure', 'diagram', | |
| 'excel chart', 'excel graph', 'spreadsheet chart', | |
| 'create chart', 'generate chart', 'make chart', | |
| 'create graph', 'generate graph', 'make graph', | |
| 'chart data', 'graph data', 'plot data', 'visualize data', | |
| ] | |
| return any(keyword in query_lower for keyword in chart_keywords) | |
| def _extract_custom_headers(self, query): | |
| """ | |
| Extract custom headers from user query for both tables and charts | |
| Examples: | |
| - "create table with columns: Name, Age, Department" | |
| - "create chart with headers: Threat Type, Frequency, Risk Level" | |
| - "excel export with columns: Category, Value, Description" | |
| """ | |
| try: | |
| # Look for header specifications in the query | |
| header_patterns = [ | |
| r'columns?:\s*([^,]+(?:,\s*[^,]+)*)', # "columns: A, B, C" | |
| r'headers?:\s*([^,]+(?:,\s*[^,]+)*)', # "headers: A, B, C" | |
| r'\bwith\s+columns?\s*([^,]+(?:,\s*[^,]+)*)', # "with columns A, B, C" | |
| r'\bwith\s+headers?\s*([^,]+(?:,\s*[^,]+)*)', # "with headers A, B, C" | |
| r'headers?\s*=\s*([^,]+(?:,\s*[^,]+)*)', # "headers = A, B, C" | |
| r'format:\s*([^,]+(?:,\s*[^,]+)*)', # "format: A, B, C" | |
| r'chart\s+headers?:\s*([^,]+(?:,\s*[^,]+)*)', # "chart headers: A, B, C" | |
| r'excel\s+headers?:\s*([^,]+(?:,\s*[^,]+)*)', # "excel headers: A, B, C" | |
| r'chart\s+with\s+headers?:\s*([^,]+(?:,\s*[^,]+)*)', # "chart with headers: A, B, C" | |
| r'excel\s+with\s+headers?:\s*([^,]+(?:,\s*[^,]+)*)', # "excel with headers: A, B, C" | |
| ] | |
| for pattern in header_patterns: | |
| match = re.search(pattern, query, re.IGNORECASE) | |
| if match: | |
| headers_str = match.group(1) | |
| # Split by comma and clean up | |
| headers = [h.strip() for h in headers_str.split(',')] | |
| # Remove empty headers | |
| headers = [h for h in headers if h] | |
| if headers: | |
| print(f"π Custom headers detected: {headers}") | |
| return headers | |
| return None | |
| except Exception as e: | |
| print(f"Error extracting custom headers: {e}") | |
| return None | |
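| # Illustrative behaviour of the header patterns above (hypothetical queries): | |
| # _extract_custom_headers("create table with columns: Name, Age, Department") | |
| #   -> ['Name', 'Age', 'Department'] | |
| # _extract_custom_headers("what is a relay?") -> None | |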
| def _generate_csv_table_response(self, query, rag_response, cited_pages, page_scores): | |
| """ | |
| Generate a CSV table response when user requests tabular data | |
| """ | |
| try: | |
| # Extract custom headers from query if specified | |
| custom_headers = self._extract_custom_headers(query) | |
| # Extract structured data from the RAG response | |
| csv_data = self._extract_structured_data(rag_response, cited_pages, page_scores, custom_headers) | |
| if csv_data: | |
| # Format as CSV | |
| csv_content = self._format_as_csv(csv_data) | |
| # Generate a unique filename for the CSV | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| safe_query = "".join(c for c in query[:30] if c.isalnum() or c in (' ', '-', '_')).rstrip() | |
| safe_query = safe_query.replace(' ', '_') | |
| filename = f"table_{safe_query}_{timestamp}.csv" | |
| filepath = os.path.join("temp", filename) | |
| # Ensure temp directory exists | |
| os.makedirs("temp", exist_ok=True) | |
| # Save CSV file | |
| with open(filepath, 'w', encoding='utf-8') as f: | |
| f.write(csv_content) | |
| # Create enhanced response with CSV and download link | |
| header_info = "" | |
| if custom_headers: | |
| header_info = f""" | |
| 📋 **Custom Headers Applied**: | |
| • Headers: {', '.join(custom_headers)} | |
| • Data automatically mapped to your specified columns | |
| """ | |
| table_response = f""" | |
| {rag_response} | |
| 📊 **CSV Table Generated Successfully**: | |
| ```csv | |
| {csv_content} | |
| ``` | |
| {header_info} | |
| 💾 **Download Options**: | |
| • **Direct Download**: Click the download button below | |
| • **Manual Copy**: Copy the CSV content above and save as .csv file | |
| 📋 **Table Information**: | |
| • Rows: {len(csv_data) if csv_data else 0} | |
| • Columns: {len(csv_data[0]) if csv_data and len(csv_data) > 0 else 0} | |
| • Data Source: {len(cited_pages)} document pages | |
| • Filename: {filename} | |
| """ | |
| return table_response, filepath | |
| else: | |
| # Fallback if no structured data found | |
| header_suggestion = "" | |
| if custom_headers: | |
| header_suggestion = f""" | |
| 📋 **Custom Headers Detected**: {', '.join(custom_headers)} | |
| The system found your specified headers but couldn't extract matching data from the response. | |
| """ | |
| fallback_response = f""" | |
| {rag_response} | |
| 📊 **Table Request Detected**: | |
| The system detected you requested tabular data, but the current response doesn't contain structured information suitable for a CSV table. | |
| {header_suggestion} | |
| 💡 **Suggestions**: | |
| • Try asking for specific data types (e.g., "list of safety procedures", "compare different methods") | |
| • Request numerical data or comparisons | |
| • Ask for categorized information | |
| • Specify custom headers: "create table with columns: Name, Age, Department" | |
| """ | |
| return fallback_response, None | |
| except Exception as e: | |
| print(f"Error generating CSV table response: {e}") | |
| return rag_response, None | |
| def _extract_structured_data(self, rag_response, cited_pages, page_scores, custom_headers=None): | |
| """ | |
| Extract ANY structured data from RAG response - no predefined templates | |
| """ | |
| try: | |
| lines = rag_response.split('\n') | |
| structured_data = [] | |
| # If user specified custom headers, try to extract data that fits | |
| if custom_headers: | |
| headers = custom_headers | |
| structured_data = [headers] | |
| # Extract any data that could fit the headers | |
| data_rows = [] | |
| # Look for any structured content in the response | |
| for line in lines: | |
| line = line.strip() | |
| if line and not line.startswith('#'): # Skip markdown headers | |
| # Try to extract meaningful data from each line | |
| data_row = self._extract_data_from_line(line, headers) | |
| if data_row: | |
| data_rows.append(data_row) | |
| # If we found data, use it; otherwise create placeholder rows | |
| if data_rows: | |
| structured_data.extend(data_rows) | |
| else: | |
| # Create placeholder rows based on available content | |
| for i, citation in enumerate(cited_pages): | |
| row = self._create_placeholder_row(citation, headers, i) | |
| structured_data.append(row) | |
| return structured_data | |
| # No custom headers - let's be smart about what we find | |
| else: | |
| # Look for any obvious table-like structures first | |
| table_data = self._find_table_structures(lines) | |
| if table_data: | |
| return table_data | |
| # Look for any structured lists or data | |
| list_data = self._find_list_structures(lines) | |
| if list_data: | |
| return list_data | |
| # Look for any key-value patterns | |
| kv_data = self._find_key_value_structures(lines) | |
| if kv_data: | |
| return kv_data | |
| # Last resort: create a simple summary | |
| return self._create_summary_table(cited_pages) | |
| except Exception as e: | |
| print(f"Error extracting structured data: {e}") | |
| return None | |
| def _extract_data_from_line(self, line, headers): | |
| """Extract data from a line that could fit the specified headers""" | |
| try: | |
| # Remove common prefixes | |
| line = re.sub(r'^[\d•\-\.\s]+', '', line) | |
| # If we have multiple headers, try to split the line | |
| if len(headers) > 1: | |
| # Look for natural splits (commas, semicolons, etc.) | |
| if ',' in line: | |
| parts = [p.strip() for p in line.split(',')] | |
| elif ';' in line: | |
| parts = [p.strip() for p in line.split(';')] | |
| elif ' - ' in line: | |
| parts = [p.strip() for p in line.split(' - ')] | |
| elif ':' in line: | |
| parts = [p.strip() for p in line.split(':', 1)] | |
| else: | |
| # Just put the whole line in the first column | |
| parts = [line] + [''] * (len(headers) - 1) | |
| # Pad or truncate to match header count | |
| while len(parts) < len(headers): | |
| parts.append('') | |
| return parts[:len(headers)] | |
| else: | |
| return [line] | |
| except Exception as e: | |
| print(f"Error extracting data from line: {e}") | |
| return None | |
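| # Example (illustrative): with headers ['Property', 'Value'], the line | |
| # "Voltage: 230V" has no comma/semicolon/" - " separator, so it is split once | |
| # on ':' -> ['Voltage', '230V']; a line with no separator at all goes into the | |
| # first column and the remaining cells are padded with empty strings. | |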
| def _create_placeholder_row(self, citation, headers, index): | |
| """Create a placeholder row based on available data""" | |
| try: | |
| row = [] | |
| for header in headers: | |
| header_lower = header.lower() | |
| if 'page' in header_lower or 'number' in header_lower: | |
| page_num = citation.split('Page ')[1].split(' from')[0] if 'Page ' in citation else str(index + 1) | |
| row.append(page_num) | |
| elif 'collection' in header_lower or 'source' in header_lower or 'document' in header_lower: | |
| collection = citation.split(' from ')[1] if ' from ' in citation else 'Unknown' | |
| row.append(collection) | |
| elif 'content' in header_lower or 'description' in header_lower or 'summary' in header_lower: | |
| row.append(f"Content from {citation}") | |
| else: | |
| # For unknown headers, try to extract something relevant | |
| if 'page' in citation: | |
| row.append(citation) | |
| else: | |
| row.append('') | |
| return row | |
| except Exception as e: | |
| print(f"Error creating placeholder row: {e}") | |
| return [''] * len(headers) | |
| def _find_table_structures(self, lines): | |
| """Find any table-like structures in the text""" | |
| try: | |
| table_lines = [] | |
| for line in lines: | |
| line = line.strip() | |
| # Look for lines with multiple columns (separated by |, tabs, or multiple spaces) | |
| if '|' in line or '\t' in line or re.search(r'\s{3,}', line): | |
| table_lines.append(line) | |
| if table_lines: | |
| # Try to determine headers from the first line | |
| first_line = table_lines[0] | |
| if '|' in first_line: | |
| # Strip leading/trailing pipes so markdown-style rows don't produce empty end cells | |
| headers = [h.strip() for h in first_line.strip('|').split('|')] | |
| else: | |
| headers = re.split(r'\s{3,}', first_line) | |
| structured_data = [headers] | |
| # Process remaining lines | |
| for line in table_lines[1:]: | |
| if '|' in line: | |
| columns = [col.strip() for col in line.strip('|').split('|')] | |
| else: | |
| columns = re.split(r'\s{3,}', line) | |
| if len(columns) >= 2: | |
| structured_data.append(columns) | |
| return structured_data | |
| return None | |
| except Exception as e: | |
| print(f"Error finding table structures: {e}") | |
| return None | |
| def _find_list_structures(self, lines): | |
| """Find any list-like structures in the text""" | |
| try: | |
| items = [] | |
| for line in lines: | |
| line = line.strip() | |
| # Remove common list markers | |
| if re.match(r'^[\d•\-\.]+', line): | |
| item = re.sub(r'^[\d•\-\.\s]+', '', line) | |
| if item: | |
| items.append(item) | |
| if items: | |
| # Create a simple list structure | |
| structured_data = [['Item', 'Description']] | |
| for i, item in enumerate(items, 1): | |
| structured_data.append([str(i), item]) | |
| return structured_data | |
| return None | |
| except Exception as e: | |
| print(f"Error finding list structures: {e}") | |
| return None | |
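| # Example (illustrative): the lines "1. Wear insulated gloves" and | |
| # "2. Verify zero voltage" become [['Item', 'Description'], | |
| # ['1', 'Wear insulated gloves'], ['2', 'Verify zero voltage']]. | |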
| def _find_key_value_structures(self, lines): | |
| """Find any key-value structures in the text""" | |
| try: | |
| kv_pairs = [] | |
| for line in lines: | |
| line = line.strip() | |
| # Look for key: value patterns | |
| if re.match(r'^[A-Za-z\s]+:\s+', line): | |
| kv_pairs.append(line) | |
| if kv_pairs: | |
| structured_data = [['Property', 'Value']] | |
| for pair in kv_pairs: | |
| if ':' in pair: | |
| key, value = pair.split(':', 1) | |
| structured_data.append([key.strip(), value.strip()]) | |
| return structured_data | |
| return None | |
| except Exception as e: | |
| print(f"Error finding key-value structures: {e}") | |
| return None | |
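| # Example (illustrative): "Rated Voltage: 230 V" matches the key:value pattern | |
| # and becomes the row ['Rated Voltage', '230 V'] under ['Property', 'Value']. | |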
| def _create_summary_table(self, cited_pages): | |
| """Create a simple summary table as last resort""" | |
| try: | |
| structured_data = [['Page', 'Collection', 'Content']] | |
| for i, citation in enumerate(cited_pages): | |
| collection = citation.split(' from ')[1] if ' from ' in citation else 'Unknown' | |
| page_num = citation.split('Page ')[1].split(' from')[0] if 'Page ' in citation else str(i+1) | |
| structured_data.append([page_num, collection, f"Content from {citation}"]) | |
| return structured_data | |
| except Exception as e: | |
| print(f"Error creating summary table: {e}") | |
| return None | |
| def _format_as_csv(self, data): | |
| """ | |
| Format structured data as CSV | |
| """ | |
| try: | |
| csv_lines = [] | |
| for row in data: | |
| # Escape commas and quotes in CSV | |
| escaped_row = [] | |
| for cell in row: | |
| cell_str = str(cell) | |
| if ',' in cell_str or '"' in cell_str or '\n' in cell_str: | |
| # Escape quotes and wrap in quotes | |
| cell_str = '"' + cell_str.replace('"', '""') + '"' | |
| escaped_row.append(cell_str) | |
| csv_lines.append(','.join(escaped_row)) | |
| return '\n'.join(csv_lines) | |
| except Exception as e: | |
| print(f"Error formatting CSV: {e}") | |
| return "Error,Generating,CSV,Format" | |
| def _prepare_csv_download(self, csv_filepath): | |
| """ | |
| Prepare CSV file for download in Gradio | |
| """ | |
| if csv_filepath and os.path.exists(csv_filepath): | |
| return csv_filepath | |
| else: | |
| return None | |
| def _generate_comprehensive_doc_report(self, query, rag_response, cited_pages, page_scores, user_info=None): | |
| """ | |
| Generate a comprehensive DOC report with proper formatting and structure | |
| """ | |
| if not DOCX_AVAILABLE: | |
| return None, "DOC export not available - python-docx library not installed" | |
| try: | |
| print("π [REPORT] Generating comprehensive DOC report...") | |
| # Create a new Document | |
| doc = Document() | |
| # Set up document styles | |
| self._setup_document_styles(doc) | |
| # Add title page | |
| self._add_title_page(doc, query, user_info) | |
| # Add executive summary | |
| self._add_executive_summary(doc, query, rag_response) | |
| # Add detailed analysis | |
| self._add_detailed_analysis(doc, rag_response, cited_pages, page_scores) | |
| # Add methodology | |
| self._add_methodology_section(doc, cited_pages, page_scores) | |
| # Add findings and conclusions | |
| self._add_findings_conclusions(doc, rag_response, cited_pages) | |
| # Add appendices | |
| self._add_appendices(doc, cited_pages, page_scores) | |
| # Generate unique filename | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| safe_query = "".join(c for c in query[:30] if c.isalnum() or c in (' ', '-', '_')).rstrip() | |
| safe_query = safe_query.replace(' ', '_') | |
| filename = f"comprehensive_report_{safe_query}_{timestamp}.docx" | |
| filepath = os.path.join("temp", filename) | |
| # Ensure temp directory exists | |
| os.makedirs("temp", exist_ok=True) | |
| # Save the document | |
| doc.save(filepath) | |
| print(f"β [REPORT] Comprehensive DOC report generated: {filepath}") | |
| return filepath, None | |
| except Exception as e: | |
| error_msg = f"Error generating DOC report: {str(e)}" | |
| print(f"β [REPORT] {error_msg}") | |
| return None, error_msg | |
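| # Hypothetical usage sketch (argument values are illustrative only): | |
| # filepath, err = self._generate_comprehensive_doc_report( | |
| #     "lithium battery fire risks", response_text, cited_pages, page_scores, | |
| #     user_info={'username': 'analyst1', 'team': 'ops'}) | |
| # On success err is None and filepath points at temp/comprehensive_report_*.docx. | |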
| def _setup_document_styles(self, doc): | |
| """Set up professional document styles""" | |
| try: | |
| # Import RGBColor for proper color handling | |
| from docx.shared import RGBColor | |
| # Title style | |
| title_style = doc.styles.add_style('CustomTitle', WD_STYLE_TYPE.PARAGRAPH) | |
| title_font = title_style.font | |
| title_font.name = 'Calibri' | |
| title_font.size = Pt(24) | |
| title_font.bold = True | |
| title_font.color.rgb = RGBColor(47, 84, 150) # #2F5496 | |
| # Heading 1 style | |
| h1_style = doc.styles.add_style('CustomHeading1', WD_STYLE_TYPE.PARAGRAPH) | |
| h1_font = h1_style.font | |
| h1_font.name = 'Calibri' | |
| h1_font.size = Pt(16) | |
| h1_font.bold = True | |
| h1_font.color.rgb = RGBColor(47, 84, 150) # #2F5496 | |
| # Heading 2 style | |
| h2_style = doc.styles.add_style('CustomHeading2', WD_STYLE_TYPE.PARAGRAPH) | |
| h2_font = h2_style.font | |
| h2_font.name = 'Calibri' | |
| h2_font.size = Pt(14) | |
| h2_font.bold = True | |
| h2_font.color.rgb = RGBColor(47, 84, 150) # #2F5496 | |
| # Body text style | |
| body_style = doc.styles.add_style('CustomBody', WD_STYLE_TYPE.PARAGRAPH) | |
| body_font = body_style.font | |
| body_font.name = 'Calibri' | |
| body_font.size = Pt(11) | |
| except Exception as e: | |
| print(f"Warning: Could not set up custom styles: {e}") | |
| def _add_title_page(self, doc, query, user_info): | |
| """Add professional title page for security analysis report""" | |
| try: | |
| # Import RGBColor for proper color handling | |
| from docx.shared import RGBColor | |
| # Title | |
| title = doc.add_paragraph() | |
| title.alignment = WD_ALIGN_PARAGRAPH.CENTER | |
| title_run = title.add_run("SECURITY THREAT ANALYSIS REPORT") | |
| title_run.font.name = 'Calibri' | |
| title_run.font.size = Pt(24) | |
| title_run.font.bold = True | |
| title_run.font.color.rgb = RGBColor(47, 84, 150) # #2F5496 | |
| # Subtitle | |
| subtitle = doc.add_paragraph() | |
| subtitle.alignment = WD_ALIGN_PARAGRAPH.CENTER | |
| subtitle_run = subtitle.add_run(f"Threat Intelligence Query: {query}") | |
| subtitle_run.font.name = 'Calibri' | |
| subtitle_run.font.size = Pt(14) | |
| subtitle_run.font.italic = True | |
| # Add spacing | |
| doc.add_paragraph() | |
| doc.add_paragraph() | |
| # Report classification | |
| classification = doc.add_paragraph() | |
| classification.alignment = WD_ALIGN_PARAGRAPH.CENTER | |
| classification_run = classification.add_run("SECURITY ANALYSIS & THREAT INTELLIGENCE") | |
| classification_run.font.name = 'Calibri' | |
| classification_run.font.size = Pt(12) | |
| classification_run.font.bold = True | |
| classification_run.font.color.rgb = RGBColor(220, 53, 69) # #dc3545 | |
| # Report details | |
| details = doc.add_paragraph() | |
| details.alignment = WD_ALIGN_PARAGRAPH.CENTER | |
| details_run = details.add_run(f"Generated on: {datetime.now().strftime('%B %d, %Y at %I:%M %p')}") | |
| details_run.font.name = 'Calibri' | |
| details_run.font.size = Pt(11) | |
| if user_info: | |
| user_details = doc.add_paragraph() | |
| user_details.alignment = WD_ALIGN_PARAGRAPH.CENTER | |
| user_run = user_details.add_run(f"Generated by: {user_info['username']} ({user_info['team']})") | |
| user_run.font.name = 'Calibri' | |
| user_run.font.size = Pt(11) | |
| # Add page break | |
| doc.add_page_break() | |
| except Exception as e: | |
| print(f"Warning: Could not add title page: {e}") | |
| def _add_executive_summary(self, doc, query, rag_response): | |
| """Add executive summary section aligned with security analysis framework""" | |
| try: | |
| # Import RGBColor for proper color handling | |
| from docx.shared import RGBColor | |
| # Section heading | |
| heading = doc.add_paragraph() | |
| heading_run = heading.add_run("EXECUTIVE SUMMARY") | |
| heading_run.font.name = 'Calibri' | |
| heading_run.font.size = Pt(16) | |
| heading_run.font.bold = True | |
| heading_run.font.color.rgb = RGBColor(47, 84, 150) # #2F5496 | |
| # Report purpose | |
| purpose = doc.add_paragraph() | |
| purpose_run = purpose.add_run("This security analysis report provides comprehensive threat assessment and operational insights based on the query: ") | |
| purpose_run.font.name = 'Calibri' | |
| purpose_run.font.size = Pt(11) | |
| # Query in bold | |
| query_text = doc.add_paragraph() | |
| query_run = query_text.add_run(f'"{query}"') | |
| query_run.font.name = 'Calibri' | |
| query_run.font.size = Pt(11) | |
| query_run.font.bold = True | |
| # Analysis framework overview | |
| framework_heading = doc.add_paragraph() | |
| framework_run = framework_heading.add_run("Analysis Framework:") | |
| framework_run.font.name = 'Calibri' | |
| framework_run.font.size = Pt(12) | |
| framework_run.font.bold = True | |
| # Framework components | |
| framework_components = [ | |
| "β’ Fact-Finding & Contextualization: Background information and context development", | |
| "β’ Case Study Identification: Incident prevalence and TTP extraction", | |
| "β’ Analytical Assessment: Intent, motivation, and threat landscape evaluation", | |
| "β’ Operational Relevance: Ground-level actionable insights and recommendations" | |
| ] | |
| for component in framework_components: | |
| comp_para = doc.add_paragraph() | |
| comp_run = comp_para.add_run(component) | |
| comp_run.font.name = 'Calibri' | |
| comp_run.font.size = Pt(11) | |
| # Key findings | |
| findings_heading = doc.add_paragraph() | |
| findings_run = findings_heading.add_run("Key Findings:") | |
| findings_run.font.name = 'Calibri' | |
| findings_run.font.size = Pt(12) | |
| findings_run.font.bold = True | |
| # Extract key points from RAG response | |
| key_points = self._extract_key_points(rag_response) | |
| for point in key_points[:5]: # Top 5 key points | |
| point_para = doc.add_paragraph() | |
| point_run = point_para.add_run(f"β’ {point}") | |
| point_run.font.name = 'Calibri' | |
| point_run.font.size = Pt(11) | |
| doc.add_paragraph() | |
| except Exception as e: | |
| print(f"Warning: Could not add executive summary: {e}") | |
| def _add_detailed_analysis(self, doc, rag_response, cited_pages, page_scores): | |
| """Add detailed analysis section aligned with security analysis framework""" | |
| try: | |
| # Import RGBColor for proper color handling | |
| from docx.shared import RGBColor | |
| # Section heading | |
| heading = doc.add_paragraph() | |
| heading_run = heading.add_run("DETAILED ANALYSIS") | |
| heading_run.font.name = 'Calibri' | |
| heading_run.font.size = Pt(16) | |
| heading_run.font.bold = True | |
| heading_run.font.color.rgb = RGBColor(47, 84, 150) # #2F5496 | |
| # 1. Fact-Finding & Contextualization | |
| fact_finding_heading = doc.add_paragraph() | |
| fact_finding_run = fact_finding_heading.add_run("1. FACT-FINDING & CONTEXTUALIZATION") | |
| fact_finding_run.font.name = 'Calibri' | |
| fact_finding_run.font.size = Pt(14) | |
| fact_finding_run.font.bold = True | |
| fact_finding_run.font.color.rgb = RGBColor(40, 167, 69) # #28a745 | |
| fact_finding_para = doc.add_paragraph() | |
| fact_finding_para_run = fact_finding_para.add_run("This section provides background information for readers to understand the origin, development, and context of the subject topic.") | |
| fact_finding_para_run.font.name = 'Calibri' | |
| fact_finding_para_run.font.size = Pt(11) | |
| # Extract contextual information | |
| context_info = self._extract_contextual_info(rag_response) | |
| for info in context_info: | |
| info_para = doc.add_paragraph() | |
| info_run = info_para.add_run(f"β’ {info}") | |
| info_run.font.name = 'Calibri' | |
| info_run.font.size = Pt(11) | |
| doc.add_paragraph() | |
| # 2. Case Study Identification | |
| case_study_heading = doc.add_paragraph() | |
| case_study_run = case_study_heading.add_run("2. CASE STUDY IDENTIFICATION") | |
| case_study_run.font.name = 'Calibri' | |
| case_study_run.font.size = Pt(14) | |
| case_study_run.font.bold = True | |
| case_study_run.font.color.rgb = RGBColor(255, 193, 7) # #ffc107 | |
| case_study_para = doc.add_paragraph() | |
| case_study_para_run = case_study_para.add_run("This section provides context and prevalence assessment, highlighting past incidents to establish patterns and extract relevant TTPs for analysis.") | |
| case_study_para_run.font.name = 'Calibri' | |
| case_study_para_run.font.size = Pt(11) | |
| # Extract case study information | |
| case_studies = self._extract_case_studies(rag_response) | |
| for case in case_studies: | |
| case_para = doc.add_paragraph() | |
| case_run = case_para.add_run(f"β’ {case}") | |
| case_run.font.name = 'Calibri' | |
| case_run.font.size = Pt(11) | |
| doc.add_paragraph() | |
| # 3. Analytical Assessment | |
| analytical_heading = doc.add_paragraph() | |
| analytical_run = analytical_heading.add_run("3. ANALYTICAL ASSESSMENT") | |
| analytical_run.font.name = 'Calibri' | |
| analytical_run.font.size = Pt(14) | |
| analytical_run.font.bold = True | |
| analytical_run.font.color.rgb = RGBColor(220, 53, 69) # #dc3545 | |
| analytical_para = doc.add_paragraph() | |
| analytical_para_run = analytical_para.add_run("This section evaluates gathered information to assess intent, motivation, TTPs, emerging trends, and relevance to threat landscapes.") | |
| analytical_para_run.font.name = 'Calibri' | |
| analytical_para_run.font.size = Pt(11) | |
| # Extract analytical insights | |
| analytical_insights = self._extract_analytical_insights(rag_response) | |
| for insight in analytical_insights: | |
| insight_para = doc.add_paragraph() | |
| insight_run = insight_para.add_run(f"β’ {insight}") | |
| insight_run.font.name = 'Calibri' | |
| insight_run.font.size = Pt(11) | |
| doc.add_paragraph() | |
| # 4. Operational Relevance | |
| operational_heading = doc.add_paragraph() | |
| operational_run = operational_heading.add_run("4. OPERATIONAL RELEVANCE") | |
| operational_run.font.name = 'Calibri' | |
| operational_run.font.size = Pt(14) | |
| operational_run.font.bold = True | |
| operational_run.font.color.rgb = RGBColor(111, 66, 193) # #6f42c1 | |
| operational_para = doc.add_paragraph() | |
| operational_para_run = operational_para.add_run("This section translates research insights into actionable knowledge for ground-level personnel, highlighting operational risks and procedural recommendations.") | |
| operational_para_run.font.name = 'Calibri' | |
| operational_para_run.font.size = Pt(11) | |
| # Extract operational insights | |
| operational_insights = self._extract_operational_insights(rag_response) | |
| for insight in operational_insights: | |
| insight_para = doc.add_paragraph() | |
| insight_run = insight_para.add_run(f"β’ {insight}") | |
| insight_run.font.name = 'Calibri' | |
| insight_run.font.size = Pt(11) | |
| doc.add_paragraph() | |
| # Main RAG response as comprehensive analysis | |
| main_analysis_heading = doc.add_paragraph() | |
| main_analysis_run = main_analysis_heading.add_run("COMPREHENSIVE ANALYSIS") | |
| main_analysis_run.font.name = 'Calibri' | |
| main_analysis_run.font.size = Pt(12) | |
| main_analysis_run.font.bold = True | |
| response_para = doc.add_paragraph() | |
| response_run = response_para.add_run(rag_response) | |
| response_run.font.name = 'Calibri' | |
| response_run.font.size = Pt(11) | |
| doc.add_paragraph() | |
| except Exception as e: | |
| print(f"Warning: Could not add detailed analysis: {e}") | |
| def _add_methodology_section(self, doc, cited_pages, page_scores): | |
| """Add methodology section aligned with security analysis framework""" | |
| try: | |
| # Import RGBColor for proper color handling | |
| from docx.shared import RGBColor | |
| # Section heading | |
| heading = doc.add_paragraph() | |
| heading_run = heading.add_run("METHODOLOGY") | |
| heading_run.font.name = 'Calibri' | |
| heading_run.font.size = Pt(16) | |
| heading_run.font.bold = True | |
| heading_run.font.color.rgb = RGBColor(47, 84, 150) # #2F5496 | |
| # Methodology content | |
| method_para = doc.add_paragraph() | |
| method_run = method_para.add_run("This security analysis was conducted using advanced AI-powered threat intelligence and document analysis techniques:") | |
| method_run.font.name = 'Calibri' | |
| method_run.font.size = Pt(11) | |
| # Analysis Framework | |
| framework_heading = doc.add_paragraph() | |
| framework_run = framework_heading.add_run("Security Analysis Framework:") | |
| framework_run.font.name = 'Calibri' | |
| framework_run.font.size = Pt(12) | |
| framework_run.font.bold = True | |
| framework_components = [ | |
| "β’ Fact-Finding & Contextualization: Background research and context development", | |
| "β’ Case Study Identification: Incident analysis and TTP extraction", | |
| "β’ Analytical Assessment: Threat landscape evaluation and risk assessment", | |
| "β’ Operational Relevance: Ground-level actionable intelligence generation" | |
| ] | |
| for component in framework_components: | |
| comp_para = doc.add_paragraph() | |
| comp_run = comp_para.add_run(component) | |
| comp_run.font.name = 'Calibri' | |
| comp_run.font.size = Pt(11) | |
| # Document sources | |
| sources_heading = doc.add_paragraph() | |
| sources_run = sources_heading.add_run("Intelligence Sources:") | |
| sources_run.font.name = 'Calibri' | |
| sources_run.font.size = Pt(12) | |
| sources_run.font.bold = True | |
| # List sources | |
| for i, citation in enumerate(cited_pages): | |
| source_para = doc.add_paragraph() | |
| source_run = source_para.add_run(f"{i+1}. {citation}") | |
| source_run.font.name = 'Calibri' | |
| source_run.font.size = Pt(11) | |
| # Analysis approach | |
| approach_heading = doc.add_paragraph() | |
| approach_run = approach_heading.add_run("Technical Analysis Approach:") | |
| approach_run.font.name = 'Calibri' | |
| approach_run.font.size = Pt(12) | |
| approach_run.font.bold = True | |
| approach_para = doc.add_paragraph() | |
| approach_run = approach_para.add_run("β’ Multi-modal document analysis using AI vision models for threat pattern recognition") | |
| approach_run.font.name = 'Calibri' | |
| approach_run.font.size = Pt(11) | |
| approach2_para = doc.add_paragraph() | |
| approach2_run = approach2_para.add_run("β’ Intelligent content retrieval and relevance scoring for threat intelligence prioritization") | |
| approach2_run.font.name = 'Calibri' | |
| approach2_run.font.size = Pt(11) | |
| approach3_para = doc.add_paragraph() | |
| approach3_run = approach3_para.add_run("β’ Comprehensive threat synthesis and actionable intelligence generation") | |
| approach3_run.font.name = 'Calibri' | |
| approach3_run.font.size = Pt(11) | |
| approach4_para = doc.add_paragraph() | |
| approach4_run = approach4_para.add_run("β’ Evidence-based risk assessment and operational recommendation development") | |
| approach4_run.font.name = 'Calibri' | |
| approach4_run.font.size = Pt(11) | |
| doc.add_paragraph() | |
| except Exception as e: | |
| print(f"Warning: Could not add methodology section: {e}") | |
| def _add_findings_conclusions(self, doc, rag_response, cited_pages): | |
| """Add findings and conclusions section aligned with security analysis framework""" | |
| try: | |
| # Import RGBColor for proper color handling | |
| from docx.shared import RGBColor | |
| # Section heading | |
| heading = doc.add_paragraph() | |
| heading_run = heading.add_run("FINDINGS AND CONCLUSIONS") | |
| heading_run.font.name = 'Calibri' | |
| heading_run.font.size = Pt(16) | |
| heading_run.font.bold = True | |
| heading_run.font.color.rgb = RGBColor(47, 84, 150) # #2F5496 | |
| # Threat Assessment Summary | |
| threat_heading = doc.add_paragraph() | |
| threat_run = threat_heading.add_run("Threat Assessment Summary:") | |
| threat_run.font.name = 'Calibri' | |
| threat_run.font.size = Pt(12) | |
| threat_run.font.bold = True | |
| # Extract threat-related findings | |
| threat_findings = self._extract_threat_findings(rag_response) | |
| for finding in threat_findings: | |
| finding_para = doc.add_paragraph() | |
| finding_run = finding_para.add_run(f"β’ {finding}") | |
| finding_run.font.name = 'Calibri' | |
| finding_run.font.size = Pt(11) | |
| # TTP Analysis | |
| ttp_heading = doc.add_paragraph() | |
| ttp_run = ttp_heading.add_run("Tactics, Techniques, and Procedures (TTPs):") | |
| ttp_run.font.name = 'Calibri' | |
| ttp_run.font.size = Pt(12) | |
| ttp_run.font.bold = True | |
| # Extract TTP information | |
| ttps = self._extract_ttps(rag_response) | |
| for ttp in ttps: | |
| ttp_para = doc.add_paragraph() | |
| ttp_run = ttp_para.add_run(f"β’ {ttp}") | |
| ttp_run.font.name = 'Calibri' | |
| ttp_run.font.size = Pt(11) | |
| # Operational Recommendations | |
| recommendations_heading = doc.add_paragraph() | |
| recommendations_run = recommendations_heading.add_run("Operational Recommendations:") | |
| recommendations_run.font.name = 'Calibri' | |
| recommendations_run.font.size = Pt(12) | |
| recommendations_run.font.bold = True | |
| # Extract operational recommendations | |
| recommendations = self._extract_operational_recommendations(rag_response) | |
| for rec in recommendations: | |
| rec_para = doc.add_paragraph() | |
| rec_run = rec_para.add_run(f"β’ {rec}") | |
| rec_run.font.name = 'Calibri' | |
| rec_run.font.size = Pt(11) | |
| # Risk Assessment | |
| risk_heading = doc.add_paragraph() | |
| risk_run = risk_heading.add_run("Risk Assessment:") | |
| risk_run.font.name = 'Calibri' | |
| risk_run.font.size = Pt(12) | |
| risk_run.font.bold = True | |
| # Extract risk information | |
| risks = self._extract_risk_assessment(rag_response) | |
| for risk in risks: | |
| risk_para = doc.add_paragraph() | |
| risk_run = risk_para.add_run(f"β’ {risk}") | |
| risk_run.font.name = 'Calibri' | |
| risk_run.font.size = Pt(11) | |
| # Conclusions | |
| conclusions_heading = doc.add_paragraph() | |
| conclusions_run = conclusions_heading.add_run("Conclusions:") | |
| conclusions_run.font.name = 'Calibri' | |
| conclusions_run.font.size = Pt(12) | |
| conclusions_run.font.bold = True | |
| conclusions_para = doc.add_paragraph() | |
| conclusions_run = conclusions_para.add_run("This security analysis provides actionable intelligence for threat mitigation and operational preparedness. The findings support evidence-based decision making for security operations and risk management.") | |
| conclusions_run.font.name = 'Calibri' | |
| conclusions_run.font.size = Pt(11) | |
| doc.add_paragraph() | |
| except Exception as e: | |
| print(f"Warning: Could not add findings and conclusions: {e}") | |
| def _add_appendices(self, doc, cited_pages, page_scores): | |
| """Add appendices section""" | |
| try: | |
| # Import RGBColor for proper color handling | |
| from docx.shared import RGBColor | |
| # Section heading | |
| heading = doc.add_paragraph() | |
| heading_run = heading.add_run("APPENDICES") | |
| heading_run.font.name = 'Calibri' | |
| heading_run.font.size = Pt(16) | |
| heading_run.font.bold = True | |
| heading_run.font.color.rgb = RGBColor(47, 84, 150) # #2F5496 | |
| # Appendix A: Document Sources | |
| appendix_a = doc.add_paragraph() | |
| appendix_a_run = appendix_a.add_run("Appendix A: Document Sources and Relevance Scores") | |
| appendix_a_run.font.name = 'Calibri' | |
| appendix_a_run.font.size = Pt(12) | |
| appendix_a_run.font.bold = True | |
| for i, (citation, score) in enumerate(zip(cited_pages, page_scores)): | |
| source_para = doc.add_paragraph() | |
| source_run = source_para.add_run(f"{i+1}. {citation} (Relevance Score: {score:.3f})") | |
| source_run.font.name = 'Calibri' | |
| source_run.font.size = Pt(11) | |
| doc.add_paragraph() | |
| except Exception as e: | |
| print(f"Warning: Could not add appendices: {e}") | |
| def _extract_key_points(self, rag_response): | |
| """Extract key points from RAG response""" | |
| try: | |
| # Split response into sentences | |
| sentences = re.split(r'[.!?]+', rag_response) | |
| key_points = [] | |
| # Look for sentences with key indicators | |
| key_indicators = ['important', 'key', 'critical', 'essential', 'significant', 'major', 'primary', 'main'] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 20 and any(indicator in sentence.lower() for indicator in key_indicators): | |
| key_points.append(sentence) | |
| # If not enough key points found, use first few sentences | |
| if len(key_points) < 3: | |
| key_points = [s.strip() for s in sentences[:5] if len(s.strip()) > 20] | |
| return key_points[:5] # Return top 5 | |
| except Exception as e: | |
| print(f"Warning: Could not extract key points: {e}") | |
| return ["Analysis completed successfully", "Comprehensive review performed", "Key insights identified"] | |
| def _extract_contextual_info(self, rag_response): | |
| """Extract contextual information for fact-finding section""" | |
| try: | |
| sentences = re.split(r'[.!?]+', rag_response) | |
| contextual_info = [] | |
| # Look for contextual indicators | |
| context_indicators = [ | |
| 'background', 'history', 'origin', 'development', 'context', 'definition', | |
| 'introduction', 'overview', 'description', 'characteristics', 'features', | |
| 'components', 'types', 'categories', 'classification', 'structure' | |
| ] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(indicator in sentence.lower() for indicator in context_indicators): | |
| contextual_info.append(sentence) | |
| # If not enough contextual info, use general descriptive sentences | |
| if len(contextual_info) < 3: | |
| contextual_info = [s.strip() for s in sentences[:3] if len(s.strip()) > 15] | |
| return contextual_info[:5] # Return top 5 | |
| except Exception as e: | |
| print(f"Warning: Could not extract contextual info: {e}") | |
| return ["Background information extracted from analysis", "Contextual details identified", "Historical context established"] | |
| def _extract_case_studies(self, rag_response): | |
| """Extract case study information for incident identification""" | |
| try: | |
| sentences = re.split(r'[.!?]+', rag_response) | |
| case_studies = [] | |
| # Look for case study indicators | |
| case_indicators = [ | |
| 'incident', 'case', 'example', 'instance', 'occurrence', 'event', | |
| 'attack', 'threat', 'vulnerability', 'exploit', 'breach', 'compromise', | |
| 'pattern', 'trend', 'frequency', 'prevalence', 'statistics', 'data' | |
| ] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(indicator in sentence.lower() for indicator in case_indicators): | |
| case_studies.append(sentence) | |
| # If not enough case studies, use sentences with numbers or dates | |
| if len(case_studies) < 3: | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and (re.search(r'\d+', sentence) or any(word in sentence.lower() for word in ['first', 'second', 'third', 'recent', 'previous'])): | |
| case_studies.append(sentence) | |
| return case_studies[:5] # Return top 5 | |
| except Exception as e: | |
| print(f"Warning: Could not extract case studies: {e}") | |
| return ["Incident patterns identified", "Case study information extracted", "Prevalence data analyzed"] | |
| def _extract_analytical_insights(self, rag_response): | |
| """Extract analytical insights for threat assessment""" | |
| try: | |
| sentences = re.split(r'[.!?]+', rag_response) | |
| analytical_insights = [] | |
| # Look for analytical indicators | |
| analytical_indicators = [ | |
| 'intent', 'motivation', 'purpose', 'objective', 'goal', 'target', | |
| 'technique', 'procedure', 'method', 'approach', 'strategy', 'tactic', | |
| 'trend', 'emerging', 'evolution', 'development', 'change', 'shift', | |
| 'threat', 'risk', 'vulnerability', 'impact', 'consequence', 'effect' | |
| ] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(indicator in sentence.lower() for indicator in analytical_indicators): | |
| analytical_insights.append(sentence) | |
| # If not enough insights, use sentences with analytical language | |
| if len(analytical_insights) < 3: | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(word in sentence.lower() for word in ['because', 'therefore', 'however', 'although', 'while', 'despite']): | |
| analytical_insights.append(sentence) | |
| return analytical_insights[:5] # Return top 5 | |
| except Exception as e: | |
| print(f"Warning: Could not extract analytical insights: {e}") | |
| return ["Analytical assessment completed", "Threat landscape evaluated", "Risk factors identified"] | |
| def _extract_operational_insights(self, rag_response): | |
| """Extract operational insights for ground-level recommendations""" | |
| try: | |
| sentences = re.split(r'[.!?]+', rag_response) | |
| operational_insights = [] | |
| # Look for operational indicators | |
| operational_indicators = [ | |
| 'recommendation', 'action', 'procedure', 'protocol', 'guideline', | |
| 'training', 'awareness', 'vigilance', 'monitoring', 'detection', | |
| 'prevention', 'mitigation', 'response', 'recovery', 'preparation', | |
| 'equipment', 'tool', 'technology', 'system', 'process', 'workflow' | |
| ] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(indicator in sentence.lower() for indicator in operational_indicators): | |
| operational_insights.append(sentence) | |
| # If not enough operational insights, use sentences with actionable language | |
| if len(operational_insights) < 3: | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(word in sentence.lower() for word in ['should', 'must', 'need', 'require', 'implement', 'establish', 'develop']): | |
| operational_insights.append(sentence) | |
| return operational_insights[:5] # Return top 5 | |
| except Exception as e: | |
| print(f"Warning: Could not extract operational insights: {e}") | |
| return ["Operational recommendations identified", "Ground-level procedures suggested", "Training requirements outlined"] | |
| def _extract_findings(self, rag_response): | |
| """Extract findings from RAG response""" | |
| try: | |
| # Split response into sentences | |
| sentences = re.split(r'[.!?]+', rag_response) | |
| findings = [] | |
| # Look for sentences that might be findings | |
| finding_indicators = ['found', 'discovered', 'identified', 'revealed', 'shows', 'indicates', 'demonstrates', 'suggests'] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(indicator in sentence.lower() for indicator in finding_indicators): | |
| findings.append(sentence) | |
| # If not enough findings, use meaningful sentences | |
| if len(findings) < 3: | |
| findings = [s.strip() for s in sentences[:5] if len(s.strip()) > 15] | |
| return findings[:5] # Return top 5 | |
| except Exception as e: | |
| print(f"Warning: Could not extract findings: {e}") | |
| return ["Analysis completed successfully", "Comprehensive review performed", "Key insights identified"] | |
| def _extract_threat_findings(self, rag_response): | |
| """Extract threat-related findings for security analysis""" | |
| try: | |
| sentences = re.split(r'[.!?]+', rag_response) | |
| threat_findings = [] | |
| # Look for threat-related indicators | |
| threat_indicators = [ | |
| 'threat', 'attack', 'vulnerability', 'exploit', 'breach', 'compromise', | |
| 'malware', 'phishing', 'social engineering', 'ransomware', 'ddos', | |
| 'intrusion', 'infiltration', 'espionage', 'sabotage', 'terrorism' | |
| ] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(indicator in sentence.lower() for indicator in threat_indicators): | |
| threat_findings.append(sentence) | |
| # If not enough threat findings, use general security-related sentences | |
| if len(threat_findings) < 3: | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(word in sentence.lower() for word in ['security', 'risk', 'danger', 'hazard', 'warning']): | |
| threat_findings.append(sentence) | |
| return threat_findings[:5] # Return top 5 | |
| except Exception as e: | |
| print(f"Warning: Could not extract threat findings: {e}") | |
| return ["Threat assessment completed", "Security vulnerabilities identified", "Risk factors analyzed"] | |
| def _extract_ttps(self, rag_response): | |
| """Extract Tactics, Techniques, and Procedures (TTPs)""" | |
| try: | |
| sentences = re.split(r'[.!?]+', rag_response) | |
| ttps = [] | |
| # Look for TTP indicators | |
| ttp_indicators = [ | |
| 'technique', 'procedure', 'method', 'approach', 'strategy', 'tactic', | |
| 'process', 'workflow', 'protocol', 'standard', 'practice', 'modus operandi', | |
| 'attack vector', 'exploitation', 'infiltration', 'persistence', 'exfiltration' | |
| ] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(indicator in sentence.lower() for indicator in ttp_indicators): | |
| ttps.append(sentence) | |
| # If not enough TTPs, use sentences with procedural language | |
| if len(ttps) < 3: | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(word in sentence.lower() for word in ['step', 'phase', 'stage', 'sequence', 'order']): | |
| ttps.append(sentence) | |
| return ttps[:5] # Return top 5 | |
| except Exception as e: | |
| print(f"Warning: Could not extract TTPs: {e}") | |
| return ["TTP analysis completed", "Attack methods identified", "Procedural patterns extracted"] | |
| def _extract_operational_recommendations(self, rag_response): | |
| """Extract operational recommendations for ground-level personnel""" | |
| try: | |
| sentences = re.split(r'[.!?]+', rag_response) | |
| recommendations = [] | |
| # Look for recommendation indicators | |
| recommendation_indicators = [ | |
| 'recommend', 'suggest', 'advise', 'propose', 'should', 'must', 'need', | |
| 'implement', 'establish', 'develop', 'create', 'adopt', 'apply', | |
| 'training', 'awareness', 'education', 'preparation', 'readiness' | |
| ] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(indicator in sentence.lower() for indicator in recommendation_indicators): | |
| recommendations.append(sentence) | |
| # If not enough recommendations, use sentences with actionable language | |
| if len(recommendations) < 3: | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(word in sentence.lower() for word in ['action', 'measure', 'step', 'procedure', 'protocol']): | |
| recommendations.append(sentence) | |
| return recommendations[:5] # Return top 5 | |
| except Exception as e: | |
| print(f"Warning: Could not extract operational recommendations: {e}") | |
| return ["Operational procedures recommended", "Training requirements identified", "Security measures suggested"] | |
| def _extract_risk_assessment(self, rag_response): | |
| """Extract risk assessment information""" | |
| try: | |
| sentences = re.split(r'[.!?]+', rag_response) | |
| risks = [] | |
| # Look for risk indicators | |
| risk_indicators = [ | |
| 'risk', 'danger', 'hazard', 'threat', 'vulnerability', 'exposure', | |
| 'probability', 'likelihood', 'impact', 'consequence', 'severity', | |
| 'critical', 'high', 'medium', 'low', 'minimal', 'significant' | |
| ] | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(indicator in sentence.lower() for indicator in risk_indicators): | |
| risks.append(sentence) | |
| # If not enough risks, use sentences with risk-related language | |
| if len(risks) < 3: | |
| for sentence in sentences: | |
| sentence = sentence.strip() | |
| if len(sentence) > 15 and any(word in sentence.lower() for word in ['potential', 'possible', 'likely', 'unlikely', 'certain']): | |
| risks.append(sentence) | |
| return risks[:5] # Return top 5 | |
| except Exception as e: | |
| print(f"Warning: Could not extract risk assessment: {e}") | |
| return ["Risk assessment completed", "Vulnerability analysis performed", "Threat evaluation conducted"] | |
| def _generate_enhanced_excel_export(self, query, rag_response, cited_pages, page_scores, custom_headers=None): | |
| """ | |
| Generate enhanced Excel export with proper formatting for charts and graphs | |
| """ | |
| if not EXCEL_AVAILABLE: | |
| return None, "Excel export not available - openpyxl/pandas libraries not installed" | |
| try: | |
| print("π [EXCEL] Generating enhanced Excel export...") | |
| # Extract custom headers from query if not provided | |
| if custom_headers is None: | |
| custom_headers = self._extract_custom_headers(query) | |
| # Create a new workbook | |
| wb = Workbook() | |
| # Remove default sheet | |
| wb.remove(wb.active) | |
| # Create main data sheet | |
| data_sheet = wb.create_sheet("Data") | |
| # Create summary sheet | |
| summary_sheet = wb.create_sheet("Summary") | |
| # Create charts sheet | |
| charts_sheet = wb.create_sheet("Charts") | |
| # Extract structured data | |
| structured_data = self._extract_structured_data_for_excel(rag_response, cited_pages, page_scores, custom_headers) | |
| # Populate data sheet | |
| self._populate_data_sheet(data_sheet, structured_data, query) | |
| # Populate summary sheet | |
| self._populate_summary_sheet(summary_sheet, query, cited_pages, page_scores) | |
| # Create charts if chart request detected | |
| if self._detect_chart_request(query): | |
| self._create_excel_charts(charts_sheet, structured_data, query, custom_headers) | |
| # Generate unique filename | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| safe_query = "".join(c for c in query[:30] if c.isalnum() or c in (' ', '-', '_')).rstrip() | |
| safe_query = safe_query.replace(' ', '_') | |
| filename = f"enhanced_export_{safe_query}_{timestamp}.xlsx" | |
| filepath = os.path.join("temp", filename) | |
| # Ensure temp directory exists | |
| os.makedirs("temp", exist_ok=True) | |
| # Save the workbook | |
| wb.save(filepath) | |
| print(f"β [EXCEL] Enhanced Excel export generated: {filepath}") | |
| return filepath, None | |
| except Exception as e: | |
| error_msg = f"Error generating Excel export: {str(e)}" | |
| print(f"β [EXCEL] {error_msg}") | |
| return None, error_msg | |
| def _extract_structured_data_for_excel(self, rag_response, cited_pages, page_scores, custom_headers=None): | |
| """Extract structured data specifically for Excel export""" | |
| try: | |
| # If custom headers provided, use them | |
| if custom_headers: | |
| headers = custom_headers | |
| print(f"π [EXCEL] Using custom headers: {headers}") | |
| else: | |
| # Auto-detect headers based on content | |
| headers = self._auto_detect_excel_headers(rag_response, cited_pages) | |
| print(f"π [EXCEL] Auto-detected headers: {headers}") | |
| # Extract data rows | |
| data_rows = [] | |
| # If custom headers are provided, try to map data to them | |
| if custom_headers: | |
| mapped_data = self._map_data_to_custom_headers(rag_response, cited_pages, page_scores, custom_headers) | |
| if mapped_data: | |
| data_rows.extend(mapped_data) | |
| # If no custom data or mapping failed, extract standard data | |
| if not data_rows: | |
| # Extract numerical data if present | |
| numerical_data = self._extract_numerical_data(rag_response) | |
| if numerical_data: | |
| data_rows.extend(numerical_data) | |
| # Extract categorical data | |
| categorical_data = self._extract_categorical_data(rag_response, cited_pages) | |
| if categorical_data: | |
| data_rows.extend(categorical_data) | |
| # Extract source information | |
| source_data = self._extract_source_data(cited_pages, page_scores) | |
| if source_data: | |
| data_rows.extend(source_data) | |
| # If still no structured data found, create summary data | |
| if not data_rows: | |
| data_rows = self._create_summary_data(rag_response, cited_pages, page_scores) | |
| return { | |
| 'headers': headers, | |
| 'data': data_rows | |
| } | |
| except Exception as e: | |
| print(f"Error extracting structured data for Excel: {e}") | |
| return { | |
| 'headers': ['Category', 'Value', 'Description'], | |
| 'data': [['Analysis', 'Completed', 'Data extracted successfully']] | |
| } | |
| def _auto_detect_excel_headers(self, rag_response, cited_pages): | |
| """Auto-detect contextually appropriate headers for Excel export based on query content""" | |
| try: | |
| headers = [] | |
| # Analyze the content for context clues | |
| rag_lower = rag_response.lower() | |
| # Security/Analysis context detection | |
| if any(word in rag_lower for word in ['threat', 'attack', 'vulnerability', 'security', 'risk']): | |
| if 'threat' in rag_lower or 'attack' in rag_lower: | |
| headers.append('Threat Type') | |
| if 'frequency' in rag_lower or 'count' in rag_lower or 'percentage' in rag_lower: | |
| headers.append('Frequency') | |
| if 'risk' in rag_lower or 'severity' in rag_lower: | |
| headers.append('Risk Level') | |
| if 'impact' in rag_lower or 'damage' in rag_lower: | |
| headers.append('Impact') | |
| if 'mitigation' in rag_lower or 'solution' in rag_lower: | |
| headers.append('Mitigation') | |
| # Business/Performance context detection | |
| elif any(word in rag_lower for word in ['sales', 'revenue', 'performance', 'growth', 'profit']): | |
| if 'month' in rag_lower or 'quarter' in rag_lower or 'year' in rag_lower: | |
| headers.append('Time Period') | |
| if 'sales' in rag_lower or 'revenue' in rag_lower: | |
| headers.append('Sales/Revenue') | |
| if 'growth' in rag_lower or 'increase' in rag_lower: | |
| headers.append('Growth Rate') | |
| if 'region' in rag_lower or 'location' in rag_lower: | |
| headers.append('Region') | |
| # Technical/System context detection | |
| elif any(word in rag_lower for word in ['system', 'component', 'device', 'technology', 'software']): | |
| if 'component' in rag_lower or 'device' in rag_lower: | |
| headers.append('Component') | |
| if 'status' in rag_lower or 'condition' in rag_lower: | |
| headers.append('Status') | |
| if 'priority' in rag_lower or 'importance' in rag_lower: | |
| headers.append('Priority') | |
| if 'version' in rag_lower or 'release' in rag_lower: | |
| headers.append('Version') | |
| # Data/Statistics context detection | |
| elif any(word in rag_lower for word in ['data', 'statistics', 'analysis', 'report', 'survey']): | |
| if 'category' in rag_lower or 'type' in rag_lower: | |
| headers.append('Category') | |
| if 'value' in rag_lower or 'number' in rag_lower or 'count' in rag_lower: | |
| headers.append('Value') | |
| if 'percentage' in rag_lower or 'rate' in rag_lower: | |
| headers.append('Percentage') | |
| if 'trend' in rag_lower or 'change' in rag_lower: | |
| headers.append('Trend') | |
| # Generic fallback detection | |
| else: | |
| # Check for numerical data | |
| if re.search(r'\d+', rag_response): | |
| headers.append('Value') | |
| # Check for categories or types | |
| if any(word in rag_lower for word in ['type', 'category', 'class', 'group']): | |
| headers.append('Category') | |
| # Check for descriptions | |
| if len(rag_response) > 100: | |
| headers.append('Description') | |
| # Check for sources | |
| if cited_pages: | |
| headers.append('Source') | |
| # Check for scores or ratings | |
| if any(word in rag_lower for word in ['score', 'rating', 'level', 'grade']): | |
| headers.append('Score') | |
| # Ensure we have at least 2-3 headers for chart generation | |
| if len(headers) < 2: | |
| if 'Category' not in headers: | |
| headers.append('Category') | |
| if 'Value' not in headers: | |
| headers.append('Value') | |
| if len(headers) < 3: | |
| if 'Description' not in headers: | |
| headers.append('Description') | |
| # Limit to 4 headers maximum for chart clarity | |
| headers = headers[:4] | |
| print(f"π [EXCEL] Auto-detected contextually relevant headers: {headers}") | |
| return headers | |
| except Exception as e: | |
| print(f"Error auto-detecting headers: {e}") | |
| return ['Category', 'Value', 'Description'] | |
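| # Example (illustrative): a response mentioning "threat", "frequency", and | |
| # "risk" yields headers like ['Threat Type', 'Frequency', 'Risk Level'], while | |
| # generic numeric text falls back to ['Value', 'Category', 'Description']. | |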
| def _extract_numerical_data(self, rag_response): | |
| """Extract numerical data from RAG response""" | |
| try: | |
| data_rows = [] | |
| # Find numbers with context | |
| number_patterns = [ | |
| r'(\d+(?:\.\d+)?)\s*(percent|%|units|items|components|devices|procedures)', | |
| r'(\d+(?:\.\d+)?)\s*(voltage|current|resistance|power|frequency)', | |
| r'(\d+(?:\.\d+)?)\s*(safety|risk|danger|warning)', | |
| r'(\d+(?:\.\d+)?)\s*(steps|phases|stages|levels)' | |
| ] | |
| for pattern in number_patterns: | |
| matches = re.findall(pattern, rag_response, re.IGNORECASE) | |
| for match in matches: | |
| value, category = match | |
| data_rows.append([category.title(), value, f"Found in analysis"]) | |
| return data_rows | |
| except Exception as e: | |
| print(f"Error extracting numerical data: {e}") | |
| return [] | |
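| # Example (illustrative): the sentence "failures dropped by 45 percent" | |
| # matches the first pattern above and yields the row | |
| # ['Percent', '45', 'Found in analysis']. | |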
| def _extract_categorical_data(self, rag_response, cited_pages): | |
| """Extract categorical data from RAG response""" | |
| try: | |
| data_rows = [] | |
| # Extract categories mentioned in the response | |
| categories = [] | |
| # Look for common category patterns | |
| category_patterns = [ | |
| r'(safety|security|warning|danger|risk)', | |
| r'(procedure|method|technique|approach)', | |
| r'(component|device|equipment|tool)', | |
| r'(type|category|class|group)', | |
| r'(input|output|control|monitoring)' | |
| ] | |
| for pattern in category_patterns: | |
| matches = re.findall(pattern, rag_response, re.IGNORECASE) | |
| categories.extend(matches) | |
| # Remove duplicates | |
| categories = list(set(categories)) | |
| for category in categories[:10]: # Limit to 10 categories | |
| data_rows.append([category.title(), 'Identified', "Category found in analysis"]) | |
| return data_rows | |
| except Exception as e: | |
| print(f"Error extracting categorical data: {e}") | |
| return [] | |
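| # Example (illustrative): "follow the safety procedure for each device" matches | |
| # 'safety', 'procedure' and 'device'; after set() de-duplication (order not | |
| # guaranteed) each becomes a row like ['Safety', 'Identified', 'Category found in analysis']. | |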
| def _extract_source_data(self, cited_pages, page_scores): | |
| """Extract source information for Excel""" | |
| try: | |
| data_rows = [] | |
| for i, (citation, score) in enumerate(zip(cited_pages, page_scores)): | |
| collection = citation.split(' from ')[1].split(' (')[0] if ' from ' in citation else 'Unknown' | |
| page_num = citation.split('Page ')[1].split(' from')[0] if 'Page ' in citation else str(i+1) | |
| data_rows.append([ | |
| f"Source {i+1}", | |
| collection, | |
| f"Page {page_num} (Score: {score:.3f})" | |
| ]) | |
| return data_rows | |
| except Exception as e: | |
| print(f"Error extracting source data: {e}") | |
| return [] | |
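| # Example (illustrative): the citation "Page 3 from manual_v2 (Relevance: 0.91)" | |
| # with score 0.91 becomes ['Source 1', 'manual_v2', 'Page 3 (Score: 0.910)']. | |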
| def _map_data_to_custom_headers(self, rag_response, cited_pages, page_scores, custom_headers): | |
| """Map extracted data to custom headers for Excel export with context-aware sample data""" | |
| try: | |
| data_rows = [] | |
| # Extract various types of data | |
| numerical_data = self._extract_numerical_data(rag_response) | |
| categorical_data = self._extract_categorical_data(rag_response, cited_pages) | |
| source_data = self._extract_source_data(cited_pages, page_scores) | |
| # Combine all available data | |
| all_data = [] | |
| if numerical_data: | |
| all_data.extend(numerical_data) | |
| if categorical_data: | |
| all_data.extend(categorical_data) | |
| if source_data: | |
| all_data.extend(source_data) | |
| # Map data to custom headers | |
| for i, data_row in enumerate(all_data): | |
| mapped_row = [] | |
| # Ensure we have enough data for all headers | |
| while len(mapped_row) < len(custom_headers): | |
| if len(data_row) > len(mapped_row): | |
| mapped_row.append(data_row[len(mapped_row)]) | |
| else: | |
| # Fill with contextually relevant placeholder data | |
| header = custom_headers[len(mapped_row)] | |
| mapped_row.append(self._generate_contextual_sample_data(header, i, rag_response)) | |
| # Truncate if we have too many values | |
| mapped_row = mapped_row[:len(custom_headers)] | |
| data_rows.append(mapped_row) | |
| # If no data was mapped, create contextually relevant sample data | |
| if not data_rows: | |
| data_rows = self._create_contextual_sample_data(custom_headers, rag_response) | |
| print(f"π [EXCEL] Mapped {len(data_rows)} rows to custom headers") | |
| return data_rows | |
| except Exception as e: | |
| print(f"Error mapping data to custom headers: {e}") | |
| return [] | |
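| # Example of the padding behavior (hypothetical values): with custom_headers | |
| # ['Component', 'Status', 'Priority', 'Notes'] and an extracted 3-column row, | |
| # the fourth cell is filled by _generate_contextual_sample_data, so every row | |
| # matches the header width exactly. | |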
| def _generate_contextual_sample_data(self, header, index, rag_response): | |
| """Generate contextually relevant sample data based on header and content""" | |
| try: | |
| header_lower = header.lower() | |
| rag_lower = rag_response.lower() | |
| # Security context | |
| if any(word in rag_lower for word in ['threat', 'attack', 'security', 'vulnerability']): | |
| if 'threat' in header_lower or 'attack' in header_lower: | |
| threats = ['Phishing', 'Malware', 'DDoS', 'Social Engineering', 'Ransomware'] | |
| return threats[index % len(threats)] | |
| elif 'frequency' in header_lower or 'count' in header_lower: | |
| return str((index + 1) * 15) + '%' | |
| elif 'risk' in header_lower or 'severity' in header_lower: | |
| risk_levels = ['Low', 'Medium', 'High', 'Critical'] | |
| return risk_levels[index % len(risk_levels)] | |
| elif 'impact' in header_lower: | |
| impacts = ['Minimal', 'Moderate', 'Significant', 'Severe'] | |
| return impacts[index % len(impacts)] | |
| elif 'mitigation' in header_lower: | |
| mitigations = ['Training', 'Firewall', 'Monitoring', 'Backup'] | |
| return mitigations[index % len(mitigations)] | |
| # Business context | |
| elif any(word in rag_lower for word in ['sales', 'revenue', 'business', 'performance']): | |
| if 'time' in header_lower or 'period' in header_lower: | |
| periods = ['Q1 2024', 'Q2 2024', 'Q3 2024', 'Q4 2024'] | |
| return periods[index % len(periods)] | |
| elif 'sales' in header_lower or 'revenue' in header_lower: | |
| return f"${(index + 1) * 10000:,}" | |
| elif 'growth' in header_lower: | |
| return f"+{(index + 1) * 5}%" | |
| elif 'region' in header_lower: | |
| regions = ['North', 'South', 'East', 'West'] | |
| return regions[index % len(regions)] | |
| # Technical context | |
| elif any(word in rag_lower for word in ['system', 'component', 'device', 'technology']): | |
| if 'component' in header_lower: | |
| components = ['Server', 'Database', 'Network', 'Application'] | |
| return components[index % len(components)] | |
| elif 'status' in header_lower: | |
| statuses = ['Active', 'Inactive', 'Maintenance', 'Error'] | |
| return statuses[index % len(statuses)] | |
| elif 'priority' in header_lower: | |
| priorities = ['Low', 'Medium', 'High', 'Critical'] | |
| return priorities[index % len(priorities)] | |
| elif 'version' in header_lower: | |
| return f"v{index + 1}.{index + 2}" | |
| # Generic fallback | |
| else: | |
| if any(word in header_lower for word in ['name', 'title', 'category', 'type']): | |
| return f"Item {index + 1}" | |
| elif any(word in header_lower for word in ['value', 'score', 'number', 'count']): | |
| return str((index + 1) * 10) | |
| elif any(word in header_lower for word in ['description', 'detail', 'info']): | |
| return f"Sample description for {header}" | |
| else: | |
| return f"Sample {header} {index + 1}" | |
| except Exception as e: | |
| print(f"Error generating contextual sample data: {e}") | |
| return f"Sample {header} {index + 1}" | |
| def _create_contextual_sample_data(self, custom_headers, rag_response): | |
| """Create contextually relevant sample data based on headers and content""" | |
| try: | |
| data_rows = [] | |
| rag_lower = rag_response.lower() | |
| # Determine context and number of sample rows | |
| if any(word in rag_lower for word in ['threat', 'attack', 'security']): | |
| sample_count = 4 # Security threats | |
| elif any(word in rag_lower for word in ['sales', 'revenue', 'business']): | |
| sample_count = 4 # Business data | |
| elif any(word in rag_lower for word in ['system', 'component', 'device']): | |
| sample_count = 4 # Technical data | |
| else: | |
| sample_count = 5 # Generic data | |
| for i in range(sample_count): | |
| sample_row = [] | |
| for header in custom_headers: | |
| sample_row.append(self._generate_contextual_sample_data(header, i, rag_response)) | |
| data_rows.append(sample_row) | |
| return data_rows | |
| except Exception as e: | |
| print(f"Error creating contextual sample data: {e}") | |
| return [] | |
| def _create_summary_data(self, rag_response, cited_pages, page_scores): | |
| """Create summary data when no structured data is found""" | |
| try: | |
| data_rows = [] | |
| # Add analysis summary | |
| data_rows.append(['Analysis Type', 'Comprehensive Review', 'AI-powered document analysis']) | |
| # Add source count | |
| data_rows.append(['Sources Analyzed', str(len(cited_pages)), f"From {len(set([p.split(' from ')[1] for p in cited_pages if ' from ' in p]))} collections"]) | |
| # Add average relevance score | |
| if page_scores: | |
| avg_score = sum(page_scores) / len(page_scores) | |
| data_rows.append(['Average Relevance', f"{avg_score:.3f}", 'Based on AI relevance scoring']) | |
| # Add response length | |
| data_rows.append(['Response Length', f"{len(rag_response)} characters", 'Comprehensive analysis provided']) | |
| return data_rows | |
| except Exception as e: | |
| print(f"Error creating summary data: {e}") | |
| return [['Analysis', 'Completed', 'Data extracted successfully']] | |
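| # Example output (illustrative): one cited page scored 0.9 and a 240-character | |
| # response produce rows such as ['Sources Analyzed', '1', 'From 1 collections'] | |
| # and ['Average Relevance', '0.900', 'Based on AI relevance scoring']. | |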
| def _populate_data_sheet(self, sheet, structured_data, query): | |
| """Populate the data sheet with structured information""" | |
| try: | |
| # Add title | |
| sheet['A1'] = f"Data Export for Query: {query}" | |
| # Single Font assignment so color, weight and size all take effect | |
| sheet['A1'].fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid") | |
| sheet['A1'].font = Font(color="FFFFFF", bold=True, size=14) | |
| # Add headers | |
| headers = structured_data['headers'] | |
| for col, header in enumerate(headers, 1): | |
| cell = sheet.cell(row=3, column=col, value=header) | |
| cell.font = Font(bold=True) | |
| cell.fill = PatternFill(start_color="D9E2F3", end_color="D9E2F3", fill_type="solid") | |
| cell.border = Border( | |
| left=Side(style='thin'), | |
| right=Side(style='thin'), | |
| top=Side(style='thin'), | |
| bottom=Side(style='thin') | |
| ) | |
| # Add data | |
| data = structured_data['data'] | |
| for row_idx, row_data in enumerate(data, 4): | |
| for col_idx, value in enumerate(row_data, 1): | |
| cell = sheet.cell(row=row_idx, column=col_idx, value=value) | |
| cell.border = Border( | |
| left=Side(style='thin'), | |
| right=Side(style='thin'), | |
| top=Side(style='thin'), | |
| bottom=Side(style='thin') | |
| ) | |
| # Auto-adjust column widths | |
| for column in sheet.columns: | |
| max_length = 0 | |
| column_letter = column[0].column_letter | |
| for cell in column: | |
| try: | |
| if len(str(cell.value)) > max_length: | |
| max_length = len(str(cell.value)) | |
| except Exception: | |
| pass  # skip cells whose value cannot be measured | |
| adjusted_width = min(max_length + 2, 50) | |
| sheet.column_dimensions[column_letter].width = adjusted_width | |
| except Exception as e: | |
| print(f"Error populating data sheet: {e}") | |
| def _populate_summary_sheet(self, sheet, query, cited_pages, page_scores): | |
| """Populate the summary sheet with analysis overview""" | |
| try: | |
| # Add title | |
| sheet['A1'] = "Analysis Summary" | |
| # Single Font assignment so color, weight and size all take effect | |
| sheet['A1'].fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid") | |
| sheet['A1'].font = Font(color="FFFFFF", bold=True, size=16) | |
| # Add query information | |
| sheet['A3'] = "Query:" | |
| sheet['A3'].font = Font(bold=True) | |
| sheet['B3'] = query | |
| # Add analysis statistics | |
| sheet['A5'] = "Analysis Statistics:" | |
| sheet['A5'].font = Font(bold=True) | |
| sheet['A6'] = "Sources Analyzed:" | |
| sheet['B6'] = len(cited_pages) | |
| sheet['A7'] = "Collections Used:" | |
| collections = set([p.split(' from ')[1] for p in cited_pages if ' from ' in p]) | |
| sheet['B7'] = len(collections) | |
| if page_scores: | |
| sheet['A8'] = "Average Relevance Score:" | |
| avg_score = sum(page_scores) / len(page_scores) | |
| sheet['B8'] = f"{avg_score:.3f}" | |
| sheet['A9'] = "Analysis Date:" | |
| sheet['B9'] = datetime.now().strftime('%B %d, %Y at %I:%M %p') | |
| # Add source details | |
| sheet['A11'] = "Source Details:" | |
| sheet['A11'].font = Font(bold=True) | |
| for i, (citation, score) in enumerate(zip(cited_pages, page_scores)): | |
| row = 12 + i | |
| sheet[f'A{row}'] = f"Source {i+1}:" | |
| sheet[f'B{row}'] = citation | |
| sheet[f'C{row}'] = f"Score: {score:.3f}" | |
| # Auto-adjust column widths | |
| for column in sheet.columns: | |
| max_length = 0 | |
| column_letter = column[0].column_letter | |
| for cell in column: | |
| try: | |
| if len(str(cell.value)) > max_length: | |
| max_length = len(str(cell.value)) | |
| except Exception: | |
| pass  # skip cells whose value cannot be measured | |
| adjusted_width = min(max_length + 2, 50) | |
| sheet.column_dimensions[column_letter].width = adjusted_width | |
| except Exception as e: | |
| print(f"Error populating summary sheet: {e}") | |
| def _create_excel_charts(self, sheet, structured_data, query, custom_headers=None): | |
| """Create Excel charts based on the data, using custom headers when provided""" | |
| try: | |
| # Add title (single Font assignment so color, weight and size all apply) | |
| sheet['A1'] = "Data Visualizations" | |
| sheet['A1'].fill = PatternFill(start_color="2F5496", end_color="2F5496", fill_type="solid") | |
| sheet['A1'].font = Font(color="FFFFFF", bold=True, size=16) | |
| # Determine chart titles and axis labels, preferring custom headers | |
| headers = custom_headers if custom_headers else structured_data['headers'] | |
| data = structured_data['data'] | |
| x_axis_title = headers[0] if len(headers) > 0 else "Categories" | |
| y_axis_title = headers[1] if len(headers) > 1 else "Values" | |
| if custom_headers and len(custom_headers) >= 3: | |
| chart_title = f"Analysis: {x_axis_title} vs {y_axis_title} by {custom_headers[2]}" | |
| elif custom_headers and len(custom_headers) >= 2: | |
| chart_title = f"Analysis: {x_axis_title} vs {y_axis_title}" | |
| else: | |
| chart_title = f"Analysis Results for: {query[:30]}..." | |
| # Write the structured data into the sheet so the charts have series to | |
| # reference; a chart added without data renders as an empty frame in Excel | |
| data_start = 30  # keep the raw data below the chart area | |
| for col, header in enumerate(headers, 1): | |
| sheet.cell(row=data_start, column=col, value=header) | |
| for row_idx, row_data in enumerate(data, data_start + 1): | |
| for col_idx, value in enumerate(row_data[:len(headers)], 1): | |
| sheet.cell(row=row_idx, column=col_idx, value=value) | |
| last_row = data_start + len(data) | |
| # Bar chart: first column as categories, second column as values | |
| # (non-numeric value cells simply plot as gaps) | |
| if len(data) > 1: | |
| chart = BarChart() | |
| chart.title = chart_title | |
| chart.x_axis.title = x_axis_title | |
| chart.y_axis.title = y_axis_title | |
| values = Reference(sheet, min_col=2, min_row=data_start, max_row=last_row) | |
| categories = Reference(sheet, min_col=1, min_row=data_start + 1, max_row=last_row) | |
| chart.add_data(values, titles_from_data=True) | |
| chart.set_categories(categories) | |
| sheet.add_chart(chart, "A3") | |
| # Pie chart for distribution, titled by the third header when available | |
| if len(data) > 2: | |
| pie_chart = PieChart() | |
| if custom_headers and len(custom_headers) >= 3: | |
| pie_chart.title = f"Distribution by {custom_headers[2]}" | |
| else: | |
| pie_chart.title = "Data Distribution" | |
| pie_chart.add_data(Reference(sheet, min_col=2, min_row=data_start, max_row=last_row), titles_from_data=True) | |
| pie_chart.set_categories(Reference(sheet, min_col=1, min_row=data_start + 1, max_row=last_row)) | |
| sheet.add_chart(pie_chart, "A15") | |
| except Exception as e: | |
| print(f"Error creating Excel charts: {e}") | |
| def _prepare_doc_download(self, doc_filepath): | |
| """ | |
| Prepare DOC file for download in Gradio | |
| """ | |
| if doc_filepath and os.path.exists(doc_filepath): | |
| return doc_filepath | |
| else: | |
| return None | |
| def _prepare_excel_download(self, excel_filepath): | |
| """ | |
| Prepare Excel file for download in Gradio | |
| """ | |
| if excel_filepath and os.path.exists(excel_filepath): | |
| return excel_filepath | |
| else: | |
| return None | |
| def _generate_multi_page_response(self, query, img_paths, cited_pages, page_scores): | |
| """ | |
| Enhanced RAG response generation with multi-page citations | |
| Implements comprehensive detail enhancement based on research strategies | |
| """ | |
| try: | |
| # Strategy 1: Increase context by providing more detailed prompt | |
| detailed_prompt = f""" | |
| Please provide a comprehensive and detailed answer to the following query. | |
| Use all available information from the provided document pages to give a thorough response. | |
| Query: {query} | |
| Instructions for detailed response: | |
| 1. Provide extensive background information and context | |
| 2. Include specific details, examples, and data points from the documents | |
| 3. Explain concepts thoroughly with step-by-step breakdowns | |
| 4. Provide comprehensive analysis rather than simple answers when requested | |
| """ | |
| # Generate base response with enhanced prompt | |
| rag_response = rag.get_answer_from_gemini(detailed_prompt, img_paths) | |
| # Strategy 2: Simple citation formatting without relevance scores | |
| citation_text = "π **Sources**:\n\n" | |
| # Group citations by collection for better organization | |
| collection_groups = {} | |
| for citation in cited_pages: | |
| collection_name = citation.split(" from ")[1].split(" (")[0] if " from " in citation else "Unknown" | |
| if collection_name not in collection_groups: | |
| collection_groups[collection_name] = [] | |
| collection_groups[collection_name].append(citation) | |
| # Format citations by collection (without relevance scores) | |
| for collection_name, citations in collection_groups.items(): | |
| citation_text += f"π **{collection_name}**:\n" | |
| for citation in citations: | |
| # Remove relevance score from citation | |
| clean_citation = citation.split(" (Relevance:")[0] | |
| citation_text += f" β’ {clean_citation}\n" | |
| citation_text += "\n" | |
| # Strategy 3: Check for different export requests | |
| csv_filepath = None | |
| doc_filepath = None | |
| excel_filepath = None | |
| # Check if user requested table format | |
| if self._detect_table_request(query): | |
| print("π Table request detected - generating CSV response") | |
| enhanced_rag_response, csv_filepath = self._generate_csv_table_response(query, rag_response, cited_pages, page_scores) | |
| else: | |
| enhanced_rag_response = rag_response | |
| # Check if user requested comprehensive report | |
| if self._detect_report_request(query): | |
| print("π Report request detected - generating DOC report") | |
| doc_filepath, doc_error = self._generate_comprehensive_doc_report(query, rag_response, cited_pages, page_scores) | |
| if doc_error: | |
| print(f"β οΈ DOC report generation failed: {doc_error}") | |
| # Check if user requested charts/graphs or enhanced Excel export | |
| if self._detect_chart_request(query) or self._detect_table_request(query): | |
| print("π Chart/Excel request detected - generating enhanced Excel export") | |
| # Extract custom headers for Excel export | |
| excel_custom_headers = self._extract_custom_headers(query) | |
| excel_filepath, excel_error = self._generate_enhanced_excel_export(query, rag_response, cited_pages, page_scores, excel_custom_headers) | |
| if excel_error: | |
| print(f"β οΈ Excel export generation failed: {excel_error}") | |
| # Strategy 4: Combine sections for clean response with export information | |
| export_info = "" | |
| if doc_filepath: | |
| export_info += f""" | |
| 📄 **Comprehensive Report Generated**: | |
| β’ **Format**: Microsoft Word Document (.docx) | |
| β’ **Content**: Executive summary, detailed analysis, methodology, findings, and appendices | |
| β’ **Download**: Available below | |
| """ | |
| if excel_filepath: | |
| export_info += f""" | |
| 📊 **Enhanced Excel Export Generated**: | |
| β’ **Format**: Microsoft Excel (.xlsx) | |
| β’ **Content**: Multiple sheets with data, summary, and charts | |
| β’ **Features**: Formatted tables, auto-generated charts, source analysis | |
| β’ **Download**: Available below | |
| """ | |
| if csv_filepath: | |
| export_info += f""" | |
| 📋 **CSV Table Generated**: | |
| β’ **Format**: Comma-Separated Values (.csv) | |
| β’ **Content**: Structured data table | |
| β’ **Download**: Available below | |
| """ | |
| final_response = f""" | |
| {enhanced_rag_response} | |
| {citation_text} | |
| {export_info} | |
| """ | |
| return final_response, csv_filepath, doc_filepath, excel_filepath | |
| except Exception as e: | |
| print(f"Error generating multi-page response: {e}") | |
| # Fall back to a simple answer built from the plain query rather than | |
| # re-running the enhanced prompt that just failed | |
| try: | |
| return rag.get_answer_from_gemini(query, img_paths), None, None, None | |
| except Exception as fallback_error: | |
| print(f"Fallback response also failed: {fallback_error}") | |
| return f"Error generating response: {e}", None, None, None | |
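| # Shape of the return value (illustrative): (final_response_text, csv_path_or_None, | |
| # doc_path_or_None, excel_path_or_None); these presumably flow through | |
| # search_documents into the gr.File download components defined in create_ui() below. | |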
| # Authentication and team collection methods removed for simplified app | |
| def _is_huggingface_spaces(self): | |
| """Check if running in Hugging Face Spaces environment""" | |
| # bool() so callers get True/False rather than an env-var string or None | |
| return bool( | |
| os.path.exists("/tmp") and | |
| os.access("/tmp", os.W_OK) and | |
| (os.getenv('SPACE_ID') or os.getenv('HF_SPACE_ID')) | |
| ) | |
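| # Example (illustrative): Hugging Face Spaces typically sets SPACE_ID to | |
| # something like 'username/space-name' (HF_SPACE_ID is checked as a fallback), | |
| # so the check above is falsy on a local machine even though /tmp is writable. | |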
| def _get_optimal_base_dir(self): | |
| """Get the optimal base directory based on environment""" | |
| if self._is_huggingface_spaces(): | |
| base_dir = "/tmp/pages" | |
| print(f"π Detected Hugging Face Spaces environment, using: {base_dir}") | |
| else: | |
| # Use relative path from app directory | |
| app_dir = os.path.dirname(os.path.abspath(__file__)) | |
| base_dir = os.path.join(app_dir, "pages") | |
| print(f"π» Using local development path: {base_dir}") | |
| # Ensure directory exists | |
| os.makedirs(base_dir, exist_ok=True) | |
| return base_dir | |
| def _ensure_base_directory(self): | |
| """Ensure the base directory for storing pages exists""" | |
| base_output_dir = self._get_optimal_base_dir() | |
| # Create the base directory if it doesn't exist | |
| if not os.path.exists(base_output_dir): | |
| try: | |
| os.makedirs(base_output_dir, exist_ok=True) | |
| print(f"β Created base directory: {base_output_dir}") | |
| except Exception as e: | |
| print(f"β Failed to create base directory {base_output_dir}: {e}") | |
| # Fallback to current working directory | |
| base_output_dir = os.path.join(os.getcwd(), "pages") | |
| os.makedirs(base_output_dir, exist_ok=True) | |
| print(f"β Using fallback directory: {base_output_dir}") | |
| return base_output_dir | |
| def _debug_file_paths(self, base_output_dir, coll_num, display_page_num): | |
| """Helper function to debug file path issues""" | |
| img_path = os.path.join(base_output_dir, coll_num, f"page_{display_page_num}.png") | |
| path = os.path.join(base_output_dir, coll_num, f"page_{display_page_num}") | |
| # Check if directory exists | |
| dir_path = os.path.dirname(img_path) | |
| dir_exists = os.path.exists(dir_path) | |
| # Check if file exists | |
| file_exists = os.path.exists(img_path) | |
| # Get absolute paths for debugging | |
| abs_img_path = os.path.abspath(img_path) | |
| abs_dir_path = os.path.abspath(dir_path) | |
| print(f"π Path Debug for {coll_num}/page_{display_page_num}:") | |
| print(f" Base dir: {base_output_dir}") | |
| print(f" Directory: {dir_path} (exists: {dir_exists})") | |
| print(f" File: {img_path} (exists: {file_exists})") | |
| print(f" Abs dir: {abs_dir_path}") | |
| print(f" Abs file: {abs_img_path}") | |
| return img_path, path, file_exists | |
| def _cleanup_invalid_collections(self): | |
| """Remove collections that no longer exist in Milvus from indexed_docs""" | |
| invalid_collections = [] | |
| for collection_name in list(self.indexed_docs.keys()): | |
| try: | |
| # Instantiate a Middleware client purely to probe whether the collection exists | |
| Middleware(collection_name, create_collection=False) | |
| print(f"✅ Collection {collection_name} is valid") | |
| except Exception as e: | |
| print(f"⚠️ Collection {collection_name} not accessible: {e}") | |
| invalid_collections.append(collection_name) | |
| # Remove invalid collections | |
| for collection_name in invalid_collections: | |
| if collection_name in self.indexed_docs: | |
| del self.indexed_docs[collection_name] | |
| print(f"ποΈ Removed invalid collection: {collection_name}") | |
| return len(invalid_collections) | |
| def _check_collections_exist(self): | |
| # This method should be implemented to check if collections exist in Milvus | |
| pass | |
| def create_ui(): | |
| app = PDFSearchApp() | |
| with gr.Blocks(theme=gr.themes.Ocean(), css="footer{display:none !important}") as demo: | |
| gr.Markdown("# Collar Multimodal RAG Demo - Streamlined") | |
| gr.Markdown("Basic document upload and search (no authentication)") | |
| # Document Upload | |
| with gr.Tab("π Document Upload"): | |
| with gr.Column(): | |
| gr.Markdown("### Upload Documents") | |
| folder_name_input = gr.Textbox( | |
| label="Collection Name (Optional)", | |
| placeholder="Optional name for this document collection" | |
| ) | |
| max_pages_input = gr.Slider( | |
| minimum=1, | |
| maximum=10000, | |
| value=20, | |
| step=10, | |
| label="Max pages to extract and index per document" | |
| ) | |
| file_input = gr.Files( | |
| label="Upload PPTs/PDFs (Multiple files supported)", | |
| file_count="multiple" | |
| ) | |
| upload_btn = gr.Button("Upload", variant="primary") | |
| upload_status = gr.Textbox(label="Upload Status", interactive=False) | |
| # Enhanced Query Tab | |
| with gr.Tab("π Advanced Query"): | |
| with gr.Column(): | |
| gr.Markdown("### Multi-Page Document Search") | |
| query_input = gr.Textbox( | |
| label="Enter your query", | |
| placeholder="Ask about any topic in your documents...", | |
| lines=2 | |
| ) | |
| # Removed number of pages input - always returns top 3 pages | |
| gr.Markdown("π― **Top 3 Pages Mode**: System automatically returns the 3 highest-scoring pages") | |
| search_btn = gr.Button("Search Documents", variant="primary") | |
| gr.Markdown("### Results") | |
| llm_answer = gr.Textbox( | |
| label="AI Response with Citations", | |
| interactive=False, | |
| lines=8 | |
| ) | |
| cited_pages_display = gr.Textbox( | |
| label="Cited Pages", | |
| interactive=False, | |
| lines=3 | |
| ) | |
| path = gr.Textbox(label="Document Paths", interactive=False) | |
| images = gr.Gallery(label="Retrieved Pages", show_label=True, columns=2, rows=2, height="auto") | |
| # Export Downloads Section | |
| gr.Markdown("### π Export Downloads") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| csv_download = gr.File( | |
| label="π CSV Table", | |
| interactive=False, | |
| visible=True | |
| ) | |
| with gr.Column(scale=1): | |
| doc_download = gr.File( | |
| label="π DOC Report", | |
| interactive=False, | |
| visible=True | |
| ) | |
| with gr.Column(scale=1): | |
| excel_download = gr.File( | |
| label="π Excel Export", | |
| interactive=False, | |
| visible=True | |
| ) | |
| # Delete Documents Tab | |
| with gr.Tab("ποΈ Delete Documents"): | |
| with gr.Column(): | |
| gr.Markdown("### Delete Document Collections") | |
| gr.Markdown("β οΈ **Warning**: This will permanently delete documents and their associated data from the system.") | |
| # Show available collections | |
| gr.Markdown("#### Available Collections") | |
| collections_display = gr.Textbox( | |
| label="Current Collections", | |
| interactive=False, | |
| lines=8, | |
| value="No collections available. Upload some documents first." | |
| ) | |
| # Collection selection | |
| collection_dropdown = gr.Dropdown( | |
| label="Select Collection to Delete", | |
| choices=[], | |
| value=None, | |
| allow_custom_value=True, | |
| info="Select a specific collection to delete, or leave empty to delete all collections" | |
| ) | |
| # Delete options | |
| with gr.Row(): | |
| delete_specific_btn = gr.Button("ποΈ Delete Selected Collection", variant="secondary") | |
| delete_all_btn = gr.Button("ποΈ Delete ALL Collections", variant="stop") | |
| # Status output | |
| delete_status = gr.Textbox( | |
| label="Deletion Status", | |
| interactive=False, | |
| lines=6 | |
| ) | |
| # Refresh button | |
| refresh_collections_btn = gr.Button("π Refresh Collections List", variant="secondary") | |
| # Event handlers | |
| upload_btn.click( | |
| fn=app.upload_and_convert, | |
| inputs=[file_input, max_pages_input, folder_name_input], | |
| outputs=[upload_status] | |
| ) | |
| # Query events | |
| search_btn.click( | |
| fn=app.search_documents, | |
| inputs=[query_input], | |
| outputs=[path, images, llm_answer, cited_pages_display, csv_download, doc_download, excel_download] | |
| ) | |
| # Delete events | |
| def refresh_collections(): | |
| """Refresh the collections list and dropdown""" | |
| collections_text = app.get_available_collections() | |
| collection_choices = list(app.indexed_docs.keys()) if app.indexed_docs else [] | |
| return collections_text, gr.Dropdown(choices=collection_choices) | |
| def delete_specific_collection(collection_name): | |
| """Delete a specific collection""" | |
| if not collection_name or collection_name.strip() == "": | |
| return "β Please select a collection to delete." | |
| return app.delete_documents(collection_name.strip()) | |
| def delete_all_collections(): | |
| """Delete all collections""" | |
| return app.delete_documents() | |
| # Delete event handlers | |
| refresh_collections_btn.click( | |
| fn=refresh_collections, | |
| outputs=[collections_display, collection_dropdown] | |
| ) | |
| delete_specific_btn.click( | |
| fn=delete_specific_collection, | |
| inputs=[collection_dropdown], | |
| outputs=[delete_status] | |
| ) | |
| delete_all_btn.click( | |
| fn=delete_all_collections, | |
| outputs=[delete_status] | |
| ) | |
| # Initialize collections on page load | |
| demo.load( | |
| fn=refresh_collections, | |
| outputs=[collections_display, collection_dropdown] | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| demo = create_ui() | |
| # demo.launch(auth=("admin", "pass1234"))  # uncomment to launch with a login page | |
| demo.launch() | |
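| # A minimal sketch of alternative launch configurations (all parameters are | |
| # standard Gradio launch() options; values are illustrative): | |
| #   demo.launch(server_name="0.0.0.0", server_port=7860)  # bind for LAN access | |
| #   demo.launch(share=True)  # request a temporary public gradio.live URL | |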