Spaces:
Sleeping
Sleeping
| # app.py - INDAS Chatbot with Google T5 Model and Enhanced PDF Processing | |
| # Uses Google's Flan-T5 for intelligent responses and proper PDF extraction | |
| import os | |
| import sys | |
| import logging | |
| import traceback | |
| from datetime import datetime | |
| from typing import List, Dict, Optional, Tuple | |
| import re | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # Progressive library imports with fallbacks | |
def safe_import_libraries():
    """Import every third-party dependency one at a time, logging each result.

    Returns a dict mapping a library key to the imported handle.  Gradio is
    mandatory (its ImportError is re-raised); every other library is optional
    and its slot is set to ``None`` on failure so callers can degrade
    gracefully.
    """
    found = {}
    # Gradio is required: without it the UI cannot be built at all.
    try:
        import gradio as gr
        found['gradio'] = gr
        logger.info("β Gradio imported")
    except ImportError as e:
        logger.error(f"β Gradio failed: {e}")
        raise
    # PyTorch is optional; the T5 model is skipped when it is missing.
    try:
        import torch
        found['torch'] = torch
        logger.info(f"β PyTorch imported: {torch.__version__}")
    except ImportError as e:
        logger.error(f"β PyTorch failed: {e}")
        found['torch'] = None
    # Transformers supplies tokenizer/model/pipeline factories as a tuple.
    try:
        from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline
        found['transformers'] = (AutoTokenizer, AutoModelForSeq2SeqLM, pipeline)
        logger.info("β Transformers imported")
    except ImportError as e:
        logger.error(f"β Transformers failed: {e}")
        found['transformers'] = None
    # Sentence-transformers powers the semantic search over PDF chunks.
    try:
        from sentence_transformers import SentenceTransformer
        found['sentence_transformers'] = SentenceTransformer
        logger.info("β Sentence Transformers imported")
    except ImportError as e:
        logger.error(f"β Sentence Transformers failed: {e}")
        found['sentence_transformers'] = None
    # PyPDF2 is the secondary PDF text extractor.
    try:
        import PyPDF2
        found['PyPDF2'] = PyPDF2
        logger.info("β PyPDF2 imported")
    except ImportError as e:
        logger.error(f"β PyPDF2 failed: {e}")
        found['PyPDF2'] = None
    # PDFMiner.six is the preferred, layout-aware PDF extractor; only the
    # two callables the processor needs are kept.
    try:
        import pdfminer
        from pdfminer.high_level import extract_text
        from pdfminer.layout import LAParams
        found['pdfminer'] = {
            'extract_text': extract_text,
            'LAParams': LAParams
        }
        logger.info("β PDFMiner.six imported")
    except ImportError as e:
        logger.error(f"β PDFMiner.six failed: {e}")
        found['pdfminer'] = None
    # NumPy backs the embedding math for semantic search.
    try:
        import numpy as np
        found['numpy'] = np
        logger.info("β NumPy imported")
    except ImportError as e:
        logger.error(f"β NumPy failed: {e}")
        found['numpy'] = None
    return found
# Import libraries: run the guarded imports once at module load and expose
# the handles the rest of the module uses as globals.
libs = safe_import_libraries()
gr = libs['gradio']    # required; safe_import_libraries() raises if missing
torch = libs['torch']  # may be None when PyTorch is unavailable
np = libs['numpy']     # may be None when NumPy is unavailable
class EnhancedPDFProcessor:
    """Enhanced PDF processor with PDFMiner.six, PyPDF2, and fallback methods.

    Extraction strategy (best effort, in order):
      1. PDFMiner.six      - layout-aware, usually the cleanest output
      2. PyPDF2            - per-page extraction
      3. Raw-byte fallbacks - regex scraping of PDF text operators

    Each stage's output is validated with ``_is_meaningful_text`` before it
    is accepted, so garbled extractions fall through to the next method.
    """
    def __init__(self):
        # Library handles resolved at import time; either may be None.
        self.pdf_lib = libs.get('PyPDF2')
        self.pdfminer = libs.get('pdfminer')
        self.available = self.pdf_lib is not None or self.pdfminer is not None
        extraction_methods = []
        if self.pdfminer:
            extraction_methods.append("PDFMiner.six")
        if self.pdf_lib:
            extraction_methods.append("PyPDF2")
        extraction_methods.append("Built-in fallback")
        logger.info(f"π PDF Processor - Available methods: {', '.join(extraction_methods)}")
    def extract_text_from_pdf(self, pdf_file) -> Tuple[bool, str]:
        """Extract text from PDF using PDFMiner.six as primary method, then fallbacks.

        Returns (success, text) on success or (False, error_message) on failure.
        """
        # Try PDFMiner.six first (most robust)
        if self.pdfminer:
            success, text = self._extract_with_pdfminer(pdf_file)
            if success and text and self._is_meaningful_text(text):
                logger.info(f"β PDFMiner.six extraction successful: {len(text)} characters")
                return True, text
            else:
                logger.warning("β οΈ PDFMiner.six extraction produced no meaningful text")
        # Try PyPDF2 as backup
        if self.pdf_lib:
            success, text = self._extract_with_pypdf2(pdf_file)
            if success and text and self._is_meaningful_text(text):
                logger.info(f"β PyPDF2 extraction successful: {len(text)} characters")
                return True, text
            else:
                logger.warning("β οΈ PyPDF2 extraction produced no meaningful text")
        # Fall back to manual extraction methods
        return self._fallback_pdf_extraction(pdf_file)
    def _extract_with_pdfminer(self, pdf_file) -> Tuple[bool, str]:
        """Extract text using PDFMiner.six with optimized layout settings."""
        try:
            logger.info("π Starting PDFMiner.six extraction...")
            # Reset file pointer so a previous read doesn't leave us at EOF.
            if hasattr(pdf_file, 'seek'):
                pdf_file.seek(0)
            # Configure layout analysis parameters for better text extraction
            laparams = self.pdfminer['LAParams'](
                line_margin=0.5,      # Merge lines closer together
                char_margin=2.0,      # Group characters into words
                word_margin=0.1,      # Space between words
                boxes_flow=0.5,       # Maintain reading order
                strip_control=True    # Remove control characters
            )
            # FIX: pdfminer.six's extract_text() does not accept
            # 'check_extractable' (that is a PDFPage.get_pages argument) and
            # 'codec' is deprecated/removed in newer releases.  Passing them
            # raised TypeError here, silently forcing the weaker fallback
            # extractors every time.  Only supported keywords are used.
            text = self.pdfminer['extract_text'](
                pdf_file,
                laparams=laparams,
                maxpages=50,   # Limit pages for performance
                password="",   # Try empty password
            )
            if text and text.strip():
                # Clean the extracted text
                cleaned_text = self._clean_pdfminer_text(text)
                if cleaned_text and len(cleaned_text.strip()) > 50:
                    return True, cleaned_text
                else:
                    logger.warning("β οΈ PDFMiner text not meaningful after cleaning")
                    return False, "PDFMiner extracted text but it appears to be garbled"
            else:
                return False, "PDFMiner found no text content"
        except Exception as e:
            logger.error(f"β PDFMiner.six extraction failed: {e}")
            return False, f"PDFMiner extraction error: {str(e)}"
    def _extract_with_pypdf2(self, pdf_file) -> Tuple[bool, str]:
        """Extract text using PyPDF2, page by page, skipping unreadable pages."""
        try:
            logger.info("π Starting PyPDF2 extraction...")
            # Reset file pointer
            if hasattr(pdf_file, 'seek'):
                pdf_file.seek(0)
            pdf_reader = self.pdf_lib.PdfReader(pdf_file)
            text_content = []
            total_pages = len(pdf_reader.pages)
            logger.info(f"π PDF has {total_pages} pages")
            # Limit pages for performance
            max_pages = min(total_pages, 50)
            for page_num, page in enumerate(pdf_reader.pages[:max_pages]):
                try:
                    page_text = page.extract_text()
                    if page_text and page_text.strip():
                        cleaned_text = self._clean_extracted_text(page_text)
                        if cleaned_text and self._is_meaningful_text(cleaned_text):
                            text_content.append(f"--- Page {page_num + 1} ---\n{cleaned_text}")
                            logger.info(f"β Extracted text from page {page_num + 1}")
                        else:
                            logger.debug(f"β οΈ Page {page_num + 1} text not meaningful after cleaning")
                except Exception as e:
                    # One bad page should not abort the whole document.
                    logger.warning(f"β οΈ Could not extract from page {page_num + 1}: {e}")
                    continue
            if text_content:
                full_text = "\n\n".join(text_content)
                if len(full_text.strip()) > 100:
                    return True, full_text
                else:
                    return False, "PyPDF2 extracted minimal text"
            else:
                return False, "PyPDF2 found no readable text"
        except Exception as e:
            logger.error(f"β PyPDF2 extraction failed: {e}")
            return False, f"PyPDF2 extraction error: {str(e)}"
    def _clean_pdfminer_text(self, text: str) -> str:
        """Clean text extracted by PDFMiner.six (whitespace, spacing, artifacts)."""
        if not text:
            return ""
        # PDFMiner.six usually provides cleaner text, but still needs some processing
        # Remove excessive whitespace while preserving structure
        text = re.sub(r'\n\s*\n\s*\n', '\n\n', text)  # Multiple newlines to double
        text = re.sub(r'[ \t]+', ' ', text)           # Multiple spaces/tabs to single space
        # Remove control characters except newlines and tabs
        text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)
        # Fix common PDF extraction issues
        text = re.sub(r'([a-z])([A-Z])', r'\1 \2', text)  # Add space between camelCase
        text = re.sub(r'(\w)(\d)', r'\1 \2', text)        # Space between word and number
        text = re.sub(r'(\d)(\w)', r'\1 \2', text)        # Space between number and word
        # Clean up spacing around punctuation
        text = re.sub(r'\s+([,.;:!?])', r'\1', text)          # Remove space before punctuation
        text = re.sub(r'([,.;:!?])([A-Za-z])', r'\1 \2', text)  # Add space after punctuation
        return text.strip()
    def _fallback_pdf_extraction(self, pdf_file) -> Tuple[bool, str]:
        """Fallback PDF extraction over raw bytes when both libraries fail.

        Accepts either a file-like object or a filesystem path.
        """
        try:
            logger.info("π Using enhanced fallback PDF extraction...")
            # Read as bytes
            if hasattr(pdf_file, 'read'):
                pdf_data = pdf_file.read()
                if hasattr(pdf_file, 'seek'):
                    pdf_file.seek(0)
            else:
                with open(pdf_file, 'rb') as f:
                    pdf_data = f.read()
            # Try multiple extraction methods
            extracted_text = self._try_multiple_extraction_methods(pdf_data)
            if extracted_text:
                logger.info(f"β Fallback extraction successful: {len(extracted_text)} characters")
                return True, extracted_text
            return False, "Could not extract readable text from PDF. This might be a scanned document or have complex formatting."
        except Exception as e:
            logger.error(f"β Fallback extraction failed: {e}")
            return False, f"PDF extraction error: {str(e)}"
    def _try_multiple_extraction_methods(self, pdf_data: bytes) -> str:
        """Run each byte-level extractor in order; return the first meaningful result."""
        extraction_methods = [
            self._extract_with_pdfminer_bytes,
            self._extract_with_pypdf_fallback,
            self._extract_readable_content,
            self._extract_from_content_streams,
            self._extract_text_objects,
            self._basic_text_patterns
        ]
        for method in extraction_methods:
            try:
                result = method(pdf_data)
                if result and len(result.strip()) > 50 and self._is_meaningful_text(result):
                    logger.info(f"β Successful extraction using {method.__name__}")
                    return result
            except Exception as e:
                logger.debug(f"Method {method.__name__} failed: {e}")
                continue
        return ""
    def _extract_with_pdfminer_bytes(self, pdf_data: bytes) -> str:
        """Try PDFMiner.six on an in-memory bytes copy of the PDF."""
        if not self.pdfminer:
            return ""
        try:
            import io
            pdf_stream = io.BytesIO(pdf_data)
            laparams = self.pdfminer['LAParams'](
                line_margin=0.5,
                char_margin=2.0,
                word_margin=0.1,
                boxes_flow=0.5,
                strip_control=True
            )
            # FIX: drop the deprecated/unsupported 'codec' keyword (see
            # _extract_with_pdfminer); it raised TypeError on newer pdfminer.six.
            text = self.pdfminer['extract_text'](
                pdf_stream,
                laparams=laparams,
                maxpages=20,
            )
            if text:
                cleaned = self._clean_pdfminer_text(text)
                return cleaned if self._is_meaningful_text(cleaned) else ""
            return ""
        except Exception as e:
            logger.debug(f"PDFMiner bytes extraction failed: {e}")
            return ""
    def _extract_with_pypdf_fallback(self, pdf_data: bytes) -> str:
        """Try PyPDF2 directly on an in-memory bytes copy of the PDF."""
        if not self.pdf_lib:
            return ""
        try:
            import io
            pdf_stream = io.BytesIO(pdf_data)
            pdf_reader = self.pdf_lib.PdfReader(pdf_stream)
            text_parts = []
            for page in pdf_reader.pages:
                try:
                    text = page.extract_text()
                    if text and self._is_meaningful_text(text):
                        text_parts.append(text)
                except Exception:
                    # FIX: narrowed from a bare except (which also swallowed
                    # KeyboardInterrupt/SystemExit); best-effort per page.
                    continue
            return ' '.join(text_parts) if text_parts else ""
        except Exception:
            return ""
    def _extract_readable_content(self, pdf_data: bytes) -> str:
        """Extract readable content by regex-matching PDF text operators."""
        try:
            # Try different encodings
            for encoding in ['utf-8', 'latin-1', 'cp1252', 'ascii']:
                try:
                    pdf_text = pdf_data.decode(encoding, errors='ignore')
                    # Look for text between common PDF text markers
                    patterns = [
                        r'BT\s+.*?(?:\((.*?)\)\s*Tj\s*)+.*?ET',  # Text objects
                        r'\((.*?)\)\s*Tj',                       # Text show operators
                        r'\[(.*?)\]\s*TJ',                       # Text show with array
                        r'/F\d+\s+\d+\s+Tf\s*\((.*?)\)',         # Font followed by text
                    ]
                    extracted_text = []
                    for pattern in patterns:
                        matches = re.findall(pattern, pdf_text, re.DOTALL | re.IGNORECASE)
                        for match in matches:
                            clean_text = self._deep_clean_text(match)
                            if clean_text and len(clean_text) > 10:
                                extracted_text.append(clean_text)
                    if extracted_text:
                        result = ' '.join(extracted_text)
                        if self._is_meaningful_text(result):
                            return result
                except UnicodeDecodeError:
                    # NOTE(review): with errors='ignore' decode cannot raise;
                    # kept as a harmless safety net.
                    continue
            return ""
        except Exception:
            return ""
    def _extract_from_content_streams(self, pdf_data: bytes) -> str:
        """Extract text from PDF stream...endstream content streams."""
        try:
            pdf_text = pdf_data.decode('latin-1', errors='ignore')
            # Find content streams
            stream_pattern = r'stream\s*(.*?)\s*endstream'
            streams = re.findall(stream_pattern, pdf_text, re.DOTALL)
            readable_parts = []
            for stream in streams:
                # Look for text commands in streams
                text_commands = re.findall(r'\((.*?)\)\s*[Tt][jJ]', stream)
                for command in text_commands:
                    clean_text = self._deep_clean_text(command)
                    if clean_text and len(clean_text) > 5:
                        readable_parts.append(clean_text)
            return ' '.join(readable_parts) if readable_parts else ""
        except Exception:
            return ""
    def _extract_text_objects(self, pdf_data: bytes) -> str:
        """Extract text from BT ... ET text objects."""
        try:
            pdf_text = pdf_data.decode('latin-1', errors='ignore')
            # Pattern for text objects with multiple text commands
            text_object_pattern = r'BT\s+(.*?)\s+ET'
            text_objects = re.findall(text_object_pattern, pdf_text, re.DOTALL)
            extracted_parts = []
            for obj in text_objects:
                # Extract all text show commands from this object
                text_shows = re.findall(r'\((.*?)\)\s*[Tt][jJ]', obj)
                for show in text_shows:
                    clean_text = self._deep_clean_text(show)
                    if clean_text and len(clean_text) > 3:
                        extracted_parts.append(clean_text)
            return ' '.join(extracted_parts) if extracted_parts else ""
        except Exception:
            return ""
    def _basic_text_patterns(self, pdf_data: bytes) -> str:
        """Extract any readable parenthesised strings as a last resort."""
        try:
            pdf_text = pdf_data.decode('latin-1', errors='ignore')
            # Look for any text in parentheses that might be readable
            all_parens = re.findall(r'\(([^)]{3,100})\)', pdf_text)
            readable_parts = []
            for text in all_parens:
                clean_text = self._deep_clean_text(text)
                if clean_text and self._is_basic_readable(clean_text):
                    readable_parts.append(clean_text)
            # Remove duplicates while preserving order
            unique_parts = list(dict.fromkeys(readable_parts))
            return ' '.join(unique_parts) if unique_parts else ""
        except Exception:
            return ""
    def _deep_clean_text(self, text: str) -> str:
        """Deep cleaning of extracted PDF text; returns '' for garbage."""
        if not text:
            return ""
        # Remove PDF escape sequences
        text = re.sub(r'\\[nrtbf\\()0-7]+', ' ', text)
        # Remove control characters but keep basic punctuation
        text = re.sub(r'[\x00-\x1f\x7f-\x9f]', ' ', text)
        # Remove non-printable characters except letters, numbers, spaces, and basic punctuation
        text = re.sub(r'[^\w\s.,;:!?()[\]{}\'"/-]', ' ', text)
        # Handle common PDF encoding issues (order matters: whitespace collapse last)
        replacements = {
            r'\\n': ' ',
            r'\\r': ' ',
            r'\\t': ' ',
            r'\\\\': ' ',
            r'\s+': ' ',  # Multiple spaces to single space
        }
        for pattern, replacement in replacements.items():
            text = re.sub(pattern, replacement, text)
        text = text.strip()
        # Filter out obvious garbage
        if len(text) < 3:
            return ""
        # Check if it's mostly special characters or numbers
        alpha_chars = sum(1 for c in text if c.isalpha())
        total_chars = len(text)
        if total_chars > 0 and alpha_chars / total_chars < 0.3:
            return ""
        return text
    def _is_basic_readable(self, text: str) -> bool:
        """Basic check for readable text: enough letters, not mostly symbols."""
        if not text or len(text) < 3:
            return False
        # Must have at least some letters
        if not re.search(r'[a-zA-Z]', text):
            return False
        # Shouldn't be mostly numbers or special characters
        letters = sum(1 for c in text if c.isalpha())
        return letters >= 3 and letters / len(text) > 0.2
    def _extract_readable_patterns(self, pdf_text: str) -> str:
        """Extract readable text patterns from raw PDF content.

        NOTE(review): not referenced by the current pipeline; retained for
        backward compatibility.
        """
        extracted_parts = []
        # Method 1: Text in parentheses (most common)
        paren_matches = re.findall(r'\(([^)]{5,})\)', pdf_text)
        for match in paren_matches:
            clean_match = self._clean_extracted_text(match)
            if clean_match and self._is_meaningful_text(clean_match):
                extracted_parts.append(clean_match)
        # Method 2: Text after Tj commands
        tj_matches = re.findall(r'\(([^)]+)\)\s*Tj', pdf_text)
        for match in tj_matches:
            clean_match = self._clean_extracted_text(match)
            if clean_match and self._is_meaningful_text(clean_match):
                extracted_parts.append(clean_match)
        # Method 3: Text in square brackets
        bracket_matches = re.findall(r'\[([^\]]{10,})\]', pdf_text)
        for match in bracket_matches:
            # Remove PDF formatting codes
            clean_match = re.sub(r'\([^)]*\)', ' ', match)
            clean_match = self._clean_extracted_text(clean_match)
            if clean_match and self._is_meaningful_text(clean_match):
                extracted_parts.append(clean_match)
        # Remove duplicates while preserving order
        unique_parts = list(dict.fromkeys(extracted_parts))
        return ' '.join(unique_parts)
    def _clean_extracted_text(self, text: str) -> str:
        """Clean PDF-extracted content down to printable ASCII."""
        if not text:
            return ""
        # Remove PDF escape sequences and control characters
        text = re.sub(r'\\[nrtbf\\()0-7]', ' ', text)
        text = re.sub(r'[\x00-\x1f\x7f-\x9f]', ' ', text)
        # Remove common PDF artifacts
        text = re.sub(r'[^\x20-\x7E\s]', '', text)  # Keep only printable ASCII
        # Remove excessive whitespace and normalize
        text = re.sub(r'\s+', ' ', text)
        text = text.strip()
        # Remove obvious garbage patterns
        if re.match(r'^[^a-zA-Z]*$', text):  # No letters at all
            return ""
        return text
    def _clean_text(self, text: str) -> str:
        """Minimal cleanup: strip escapes, control characters, extra whitespace."""
        if not text:
            return ""
        # Remove PDF escape sequences
        text = re.sub(r'\\[nrtbf\\()]', ' ', text)
        # Remove control characters
        text = re.sub(r'[\x00-\x1f\x7f-\x9f]', ' ', text)
        # Normalize whitespace
        text = re.sub(r'\s+', ' ', text)
        return text.strip()
    def _is_meaningful_text(self, text: str) -> bool:
        """Heuristic check that extracted text is readable prose, not garbage."""
        if not text or len(text) < 5:
            return False
        # Remove whitespace for analysis
        clean_text = text.strip()
        if len(clean_text) < 5:
            return False
        # Check for reasonable letter content
        letters = sum(1 for c in clean_text if c.isalpha())
        numbers = sum(1 for c in clean_text if c.isdigit())
        total = len(clean_text)
        if total == 0:
            return False
        letter_ratio = letters / total
        # Reject if mostly numbers or special characters
        if letter_ratio < 0.3:
            return False
        # Check for excessive special characters (indicates garbled text)
        special_chars = sum(1 for c in clean_text if not c.isalnum() and c not in ' .,;:!?-()[]{}"\'/\\')
        special_ratio = special_chars / total if total > 0 else 1
        # Reject if too many special characters
        if special_ratio > 0.4:
            return False
        # Check for patterns that indicate garbled text
        garbled_patterns = [
            r'[}{]{3,}',        # Multiple curly braces
            r'[@#$%^&*]{3,}',   # Multiple special symbols
            r'[A-Z]{10,}',      # Too many consecutive uppercase letters
            r'[\d\W]{20,}',     # Long sequences of numbers and non-word chars
            r'[^\w\s]{5,}',     # Long sequences of non-word, non-space chars
        ]
        for pattern in garbled_patterns:
            if re.search(pattern, clean_text):
                return False
        # Check for common English words (indicates readable text)
        common_words = [
            'the', 'and', 'or', 'of', 'to', 'in', 'for', 'is', 'are', 'with', 'that', 'this', 'as', 'by', 'on', 'at',
            'be', 'have', 'will', 'shall', 'may', 'can', 'should', 'would', 'could', 'must', 'not', 'but', 'from',
            'accounting', 'standard', 'financial', 'entity', 'amount', 'cost', 'value', 'asset', 'liability',
            'revenue', 'expense', 'income', 'statement', 'balance', 'cash', 'flow', 'depreciation', 'impairment'
        ]
        text_lower = clean_text.lower()
        word_count = sum(1 for word in common_words if word in text_lower)
        # Must have word structure (spaces between words)
        has_spaces = ' ' in clean_text
        # Check for reasonable word length distribution
        if has_spaces:
            words = clean_text.split()
            if words:
                avg_word_length = sum(len(word) for word in words) / len(words)
                # Reasonable average word length (2-15 characters)
                if avg_word_length < 2 or avg_word_length > 15:
                    return False
        return (letter_ratio > 0.3 and          # At least 30% letters
                special_ratio < 0.4 and         # Less than 40% special characters
                has_spaces and                  # Has spaces (indicates words)
                not clean_text.isdigit() and    # Not just numbers
                (word_count > 0 or len(clean_text) > 30))  # Has common words or substantial length
    def chunk_text(self, text: str, chunk_size: int = 300, overlap: int = 50) -> List[str]:
        """Split text into overlapping word chunks (capped at 100 chunks)."""
        if not text or len(text.strip()) < 50:
            return []
        # Split into sentences first for better chunking
        sentences = re.split(r'[.!?]+', text)
        sentences = [s.strip() for s in sentences if len(s.strip()) > 10]
        # Join sentences and split into words
        clean_text = '. '.join(sentences)
        words = clean_text.split()
        chunks = []
        # FIX: guard against a non-positive stride (chunk_size <= overlap),
        # which would make range() raise ValueError.
        step = max(1, chunk_size - overlap)
        for i in range(0, len(words), step):
            chunk = " ".join(words[i:i + chunk_size])
            if len(chunk.strip()) > 50:
                chunks.append(chunk.strip())
            # Limit chunks for memory efficiency
            if len(chunks) >= 100:
                break
        logger.info(f"β Created {len(chunks)} text chunks")
        return chunks
    def process_pdf(self, pdf_file) -> Tuple[bool, List[str], str]:
        """Complete PDF processing pipeline: extract text, then chunk it.

        Returns (success, chunks, message).
        """
        if not pdf_file:
            return False, [], "No PDF file provided"
        logger.info("π Starting PDF processing...")
        # Extract text
        success, text_or_error = self.extract_text_from_pdf(pdf_file)
        if not success:
            return False, [], text_or_error
        # Create chunks
        chunks = self.chunk_text(text_or_error)
        if not chunks:
            return False, [], "No meaningful text chunks could be created"
        message = f"Successfully processed PDF: {len(chunks)} chunks created"
        return True, chunks, message
class GoogleT5Model:
    """Google T5 model for intelligent responses - optimized for Hugging Face Spaces.

    Loads google/flan-t5-base lazily when torch + transformers are importable;
    ``available`` stays False (and ``generate_response`` returns None) when the
    model cannot be loaded, letting the caller fall back to the knowledge base.
    """
    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.available = False
        # Use even smaller model for HF Spaces compatibility
        self.model_name = "google/flan-t5-base"
        if libs.get('torch') and libs.get('transformers'):
            self._initialize_model()
    def _initialize_model(self):
        """Initialize Google T5 model with HF Spaces memory optimizations."""
        try:
            logger.info(f"π€ Loading T5 model optimized for HF Spaces: {self.model_name}")
            # pipeline factory is unpacked but unused here; keep the tuple shape.
            AutoTokenizer, AutoModelForSeq2SeqLM, _pipeline = libs['transformers']
            # Load with memory optimizations for HF Spaces
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model_name,
                use_fast=True  # Use fast tokenizer
            )
            # Load model with aggressive memory optimization
            self.model = AutoModelForSeq2SeqLM.from_pretrained(
                self.model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                low_cpu_mem_usage=True,
                device_map="auto" if torch.cuda.is_available() else "cpu"
            )
            # Set device
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
            if not torch.cuda.is_available():
                self.model = self.model.to("cpu")
            # Set to evaluation mode
            self.model.eval()
            # Quick test with minimal resources; raises on failure so the
            # except below marks the model unavailable.
            self._test_model()
            self.available = True
            logger.info(f"β T5 model loaded successfully on {self.device}")
        except Exception as e:
            logger.warning(f"β οΈ T5 model initialization failed (this is normal on resource-constrained environments): {e}")
            self.available = False
    def _test_model(self):
        """Lightweight smoke test of the loaded model; re-raises on failure."""
        try:
            test_input = "What is INDAS?"
            inputs = self.tokenizer(test_input, return_tensors="pt", max_length=128, truncation=True)
            if hasattr(self, 'device'):
                inputs = {k: v.to(self.device) for k, v in inputs.items()}
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_length=50,
                    num_return_sequences=1,
                    do_sample=False  # Greedy decoding for consistency
                )
            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            logger.info(f"β Model test successful: {response[:30]}...")
        except Exception as e:
            logger.warning(f"β οΈ Model test failed: {e}")
            raise e
    def generate_response(self, prompt: str, max_length: int = 150) -> str:
        """Generate a response for ``prompt``; returns None when unavailable or on error."""
        if not self.available:
            return None
        try:
            # Shorter, more efficient prompt
            formatted_prompt = f"Explain: {prompt[:200]}"  # Limit input length
            # Tokenize with strict limits
            inputs = self.tokenizer(
                formatted_prompt,
                return_tensors="pt",
                max_length=256,  # Reduced max length
                truncation=True,
                padding=False
            )
            # Move to device if available
            if hasattr(self, 'device'):
                inputs = {k: v.to(self.device) for k, v in inputs.items()}
            # Generate with memory optimization
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    max_length=max_length,
                    num_return_sequences=1,
                    do_sample=True,
                    temperature=0.7,
                    top_p=0.9,
                    # FIX: T5 defines a real pad token; the previous
                    # eos_token_id-as-pad (a GPT-2-style workaround) corrupts
                    # the attention mask semantics.  early_stopping removed:
                    # it only applies to beam search and warns under sampling.
                    pad_token_id=self.tokenizer.pad_token_id
                )
            # Decode response
            response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
            # Clean response: drop an echoed task prefix if the model repeats it.
            response = response.strip()
            prefix = "explain:"
            if response.lower().startswith(prefix):
                response = response[len(prefix):].strip()
            return response
        except Exception as e:
            logger.warning(f"T5 generation error (falling back to knowledge base): {e}")
            return None
class SemanticSearch:
    """Lightweight semantic search optimized for Hugging Face Spaces.

    Embeds PDF chunks with a small SentenceTransformer and ranks them by
    cosine similarity against the query.  ``available`` is False when the
    embedding stack could not be loaded; callers then use basic search.
    """
    def __init__(self):
        self.embedder = None
        self.chunks = []
        self.embeddings = None
        self.available = False
        if libs.get('sentence_transformers') and libs.get('numpy'):
            self._initialize_embedder()
    def _initialize_embedder(self):
        """Initialize lightweight sentence transformer and smoke-test it."""
        try:
            SentenceTransformer = libs['sentence_transformers']
            # Use a smaller, faster model for HF Spaces
            model_name = 'all-MiniLM-L6-v2'  # Small but effective
            logger.info(f"π Loading semantic search model: {model_name}")
            self.embedder = SentenceTransformer(model_name)
            # Test with a simple encoding
            test_text = "test semantic search"
            test_embedding = self.embedder.encode([test_text])
            self.available = True
            logger.info("β Semantic search initialized successfully")
        except Exception as e:
            logger.warning(f"β οΈ Semantic search initialization failed (using basic search): {e}")
            self.available = False
    @staticmethod
    def _l2_normalize(matrix):
        """Row-wise L2 normalization with zero-vector protection."""
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        return matrix / np.clip(norms, 1e-12, None)
    def index_chunks(self, chunks: List[str]) -> bool:
        """Embed and index PDF chunks (capped at 50) for cosine search."""
        if not self.available or not chunks:
            return False
        try:
            # Limit chunks for memory efficiency on HF Spaces
            max_chunks = 50
            if len(chunks) > max_chunks:
                logger.info(f"π Limiting chunks to {max_chunks} for memory efficiency")
                chunks = chunks[:max_chunks]
            self.chunks = chunks
            # Encode in smaller batches to avoid memory issues
            batch_size = 10
            embeddings_list = []
            for i in range(0, len(chunks), batch_size):
                batch = chunks[i:i + batch_size]
                batch_embeddings = self.embedder.encode(batch)
                embeddings_list.append(batch_embeddings)
            # FIX: SentenceTransformer.encode does not L2-normalize by default,
            # so the raw dot product in search() was not a cosine similarity
            # and the 0.15 threshold was meaningless.  Normalize rows here so
            # dot products are true cosine scores in [-1, 1].
            self.embeddings = self._l2_normalize(np.vstack(embeddings_list))
            logger.info(f"β Indexed {len(chunks)} chunks for semantic search")
            return True
        except Exception as e:
            logger.warning(f"β οΈ Indexing failed (falling back to basic search): {e}")
            return False
    def search(self, query: str, top_k: int = 2) -> List[Tuple[str, float]]:
        """Return up to ``top_k`` (chunk, cosine_score) pairs above the threshold."""
        if not self.available or not self.chunks:
            return []
        try:
            # Encode and normalize the query so the dot product is cosine similarity.
            query_embedding = self._l2_normalize(np.asarray(self.embedder.encode([query])))
            # Compute similarities
            similarities = np.dot(query_embedding, self.embeddings.T).flatten()
            # Get top results with lower threshold for HF Spaces
            top_indices = np.argsort(similarities)[::-1][:top_k]
            results = []
            for idx in top_indices:
                if similarities[idx] > 0.15:  # Lower threshold
                    # Cast to plain float so callers get JSON-friendly values.
                    results.append((self.chunks[idx], float(similarities[idx])))
            return results
        except Exception as e:
            logger.warning(f"Search error (using fallback): {e}")
            return []
class INDASChatbotWithT5:
    """INDAS Chatbot with Google T5 model and enhanced PDF processing.

    Combines three answer sources, in decreasing order of specificity:
    text retrieved from an uploaded/bundled PDF, a hand-written expert
    knowledge base of INDAS summaries, and (when available) a Flan-T5
    generation pass over the retrieved context.
    """

    def __init__(self) -> None:
        # Collaborators are defined elsewhere in this file; their exact
        # contracts (inputs/outputs) are assumed from usage below.
        self.pdf_processor = EnhancedPDFProcessor()
        self.t5_model = GoogleT5Model()
        self.semantic_search = SemanticSearch()
        # True once a PDF has been processed into readable chunks.
        self.pdf_content_available = False
        # Readable text chunks extracted from the most recent PDF.
        self.pdf_chunks = []  # Add this line to fix the issue
        # Rolling log of (query, response, timestamp) dicts; pruned in
        # generate_response() to bound memory use.
        self.conversation_history = []
        # Enhanced expert knowledge base with better matching.
        # Keys are lowercase lookup tokens: "indas <n>" for standards,
        # topic phrases for concept questions, and "indas" as the
        # catch-all fallback used by _find_best_knowledge_match().
        self.expert_knowledge = {
            # INDAS Standards
            "indas 1": "INDAS 1 - Presentation of Financial Statements: This standard establishes the basis for presentation of general purpose financial statements to ensure comparability both with the entity's financial statements of previous periods and with the financial statements of other entities. It sets out overall requirements for the presentation of financial statements, guidelines for their structure and minimum requirements for their content.",
            "indas 7": "INDAS 7 - Statement of Cash Flows: This standard requires entities to provide information about the historical changes in cash and cash equivalents of an entity by means of a cash flow statement which classifies cash flows during the period from operating, investing and financing activities.",
            "indas 8": "INDAS 8 - Accounting Policies, Changes in Accounting Estimates and Errors: This standard prescribes the criteria for selecting and changing accounting policies, together with the accounting treatment and disclosure of changes in accounting policies, changes in accounting estimates and corrections of errors.",
            "indas 16": "INDAS 16 - Property, Plant and Equipment: This standard prescribes the accounting treatment for property, plant and equipment including recognition, measurement, depreciation and impairment. An item of property, plant and equipment should be recognised as an asset when it is probable that future economic benefits will flow to the entity and the cost can be measured reliably.",
            "indas 36": "INDAS 36 - Impairment of Assets: This standard ensures that assets are carried at no more than their recoverable amount. An asset is carried at more than its recoverable amount if its carrying amount exceeds the amount to be recovered through use or sale of the asset. The standard establishes procedures to ensure that assets are carried at no more than their recoverable amount.",
            "indas 38": "INDAS 38 - Intangible Assets: This standard prescribes the accounting treatment for intangible assets that are not dealt with specifically in another Ind AS. An intangible asset is an identifiable non-monetary asset without physical substance.",
            "indas 109": "INDAS 109 - Financial Instruments: This standard establishes principles for financial reporting of financial assets and financial liabilities. It establishes the framework for classification and measurement, impairment, and hedge accounting of financial instruments.",
            "indas 115": "INDAS 115 - Revenue from Contracts with Customers: This standard establishes a comprehensive framework for determining whether, how much and when revenue is recognised. It replaces existing revenue recognition guidance. The core principle is that an entity recognises revenue to depict the transfer of promised goods or services to customers in an amount that reflects the consideration to which the entity expects to be entitled in exchange for those goods or services.",
            # Topic-based responses
            "revenue recognition": "Revenue recognition under INDAS 115 follows a five-step model: 1) Identify the contract(s) with a customer, 2) Identify the performance obligations in the contract, 3) Determine the transaction price, 4) Allocate the transaction price to the performance obligations, 5) Recognise revenue when (or as) the entity satisfies a performance obligation.",
            "depreciation": "Under INDAS 16, depreciation is the systematic allocation of the depreciable amount of an asset over its useful life. The depreciable amount is the cost of an asset less its residual value. Each part of an item of property, plant and equipment with a cost that is significant in relation to the total cost of the item shall be depreciated separately.",
            "impairment": "INDAS 36 requires an entity to assess at the end of each reporting period whether there is any indication that an asset may be impaired. If any such indication exists, the entity shall estimate the recoverable amount of the asset. The recoverable amount is the higher of an asset's fair value less costs of disposal and its value in use.",
            "financial instruments": "INDAS 109 classifies financial assets into three categories: measured at amortised cost, fair value through other comprehensive income (FVOCI), and fair value through profit or loss (FVTPL). The classification depends on the entity's business model for managing the financial assets and the contractual cash flow characteristics of the financial asset.",
            # General INDAS
            "indas": "Indian Accounting Standards (INDAS) are accounting standards adopted by companies in India. These standards are largely converged with International Financial Reporting Standards (IFRS) issued by the International Accounting Standards Board (IASB), with certain carve-outs to suit the Indian regulatory and economic environment."
        }
        # Try to load default PDF bundled with the repository, if present.
        pdf_loaded = self._load_default_pdf()
        if pdf_loaded:
            logger.info("π€ INDAS Chatbot with T5 initialized - Default INDAS.pdf loaded and ready!")
        else:
            logger.info("π€ INDAS Chatbot with T5 initialized - Ready for PDF uploads and INDAS questions")

    def _load_default_pdf(self) -> bool:
        """Load the INDAS.pdf file from Hugging Face Space repository.

        Returns True only when the file exists AND process_pdf() reported
        success (detected via the success-marker prefix in its message).
        """
        pdf_file = "INDAS.pdf"
        if os.path.exists(pdf_file):
            logger.info(f"π Found INDAS.pdf in repository - loading default content...")
            try:
                # NOTE(review): a binary file handle is passed here, while the
                # Gradio upload path passes the upload object — presumably
                # EnhancedPDFProcessor.process_pdf accepts both; confirm.
                with open(pdf_file, 'rb') as f:
                    result = self.process_pdf(f)
                logger.info(f"π Default INDAS.pdf processing result: {result}")
                # Success messages from process_pdf() start with this marker.
                if "β " in result:
                    logger.info("β Successfully loaded INDAS.pdf - chatbot ready with document content!")
                    return True
                else:
                    logger.warning("β οΈ INDAS.pdf found but processing failed")
                    return False
            except Exception as e:
                logger.error(f"β Error loading INDAS.pdf: {e}")
                return False
        else:
            logger.info("π No INDAS.pdf found in repository - users can upload their own documents")
            return False

    def process_pdf(self, pdf_file) -> str:
        """Process uploaded PDF file with enhanced error handling.

        Extracts text chunks, filters out unreadable/garbled ones, stores
        the readable chunks on the instance, and attempts to build a
        semantic-search index over them. Always returns a user-facing
        status string (never raises).
        """
        if not pdf_file:
            return "β No PDF file provided."
        try:
            success, chunks, message = self.pdf_processor.process_pdf(pdf_file)
            if success and chunks:
                # Check if chunks contain readable text (drops scanned/garbled output).
                readable_chunks = [chunk for chunk in chunks if self.pdf_processor._is_meaningful_text(chunk)]
                if not readable_chunks:
                    return """β PDF processed but contains no readable text.
**This PDF appears to contain:**
- Scanned images instead of text
- Encrypted or protected content
- Complex formatting that can't be extracted
- Non-standard encoding
**Solutions to try:**
1. **OCR Conversion**: Use OCR software (like Adobe Acrobat Pro, Google Docs, or online OCR tools) to convert the scanned images to text
2. **Copy-Paste**: Try selecting and copying text directly from the PDF viewer and paste it into the chat
3. **Different PDF**: Try a different PDF file that contains selectable text
4. **Manual Input**: Type specific questions about INDAS standards directly
**You can still ask questions about INDAS standards using the built-in knowledge base!**"""
                # Store readable chunks for later retrieval.
                self.pdf_chunks = readable_chunks
                # Try semantic search indexing; fall back to plain text search.
                if self.semantic_search.index_chunks(readable_chunks):
                    self.pdf_content_available = True
                    return f"β PDF processed successfully!\n\nπ **Results:** {len(readable_chunks)} readable sections extracted from {len(chunks)} total chunks.\n\nπ **Smart Search Enabled:** You can now ask specific questions about your document content. The AI will search through your PDF and combine it with expert INDAS knowledge."
                else:
                    # Fallback: still mark as available even without semantic search.
                    self.pdf_content_available = True
                    return f"β PDF processed successfully!\n\nπ **Results:** {len(readable_chunks)} readable sections found.\n\nπ **Basic Search Available:** Ask questions about your document. The system will use text matching to find relevant content."
            else:
                return f"""β PDF Processing Failed
**Error:** {message}
**Common causes and solutions:**
- **Scanned PDFs**: Use OCR software to convert to searchable text
- **Password Protected**: Remove password protection first
- **Corrupted File**: Try downloading the PDF again
- **Complex Format**: Some PDFs have non-standard formatting
**Alternative:** You can still ask questions about INDAS standards using the comprehensive built-in knowledge base."""
        except Exception as e:
            logger.error(f"PDF processing error: {e}")
            return f"""β Technical Error Processing PDF
**Error Details:** {str(e)}
**What you can do:**
1. Try a different PDF file
2. Ensure the file isn't corrupted
3. Ask INDAS questions directly - the system has extensive built-in knowledge
4. Copy-paste text from your PDF into the chat
**The chatbot is fully functional for INDAS questions even without PDF upload!**"""

    def _find_best_knowledge_match(self, query: str) -> str:
        """Find the best matching knowledge base entry.

        Matching priority: explicit standard number ("indas 16", "ind as 16",
        "standard 16"), then keyword-scored topic match, then the general
        "indas" fallback entry. Always returns a non-empty string.
        """
        query_lower = query.lower()
        # Direct INDAS number matching (most specific).
        indas_patterns = [
            r'indas\s*(\d+)',
            r'ind\s*as\s*(\d+)',
            r'standard\s*(\d+)'
        ]
        for pattern in indas_patterns:
            match = re.search(pattern, query_lower)
            if match:
                indas_num = match.group(1)
                key = f"indas {indas_num}"
                if key in self.expert_knowledge:
                    return self.expert_knowledge[key]
        # Topic-based matching (more specific topics first).
        topic_keywords = {
            "revenue recognition": ["revenue", "recognition", "contract", "customer", "performance obligation"],
            "depreciation": ["depreciation", "depreciate", "useful life", "residual value"],
            "impairment": ["impairment", "impaired", "recoverable amount", "value in use"],
            "financial instruments": ["financial instrument", "financial asset", "financial liability", "fair value"]
        }
        best_match = ""
        max_score = 0
        # Score = number of topic keywords present as substrings of the query.
        for topic, keywords in topic_keywords.items():
            score = sum(1 for keyword in keywords if keyword in query_lower)
            if score > max_score:
                max_score = score
                best_match = topic
        if best_match and max_score > 0:
            return self.expert_knowledge[best_match]
        # Fallback to general INDAS info.
        return self.expert_knowledge["indas"]

    def _search_pdf_content(self, query: str) -> List[str]:
        """Search PDF content using available methods with garbled text filtering.

        Prefers semantic search when its backend is available; otherwise
        falls back to naive word-overlap scoring over the first 20 clean
        chunks. Returns at most 2 chunks (possibly empty list).
        """
        if not self.pdf_content_available or not self.pdf_chunks:
            return []
        # Filter out garbled chunks first (they may have been stored before
        # the readability filter existed, or re-checked defensively here).
        clean_chunks = []
        for chunk in self.pdf_chunks:
            if self.pdf_processor._is_meaningful_text(chunk):
                clean_chunks.append(chunk)
        if not clean_chunks:
            logger.warning("No readable PDF content available")
            return []
        # Try semantic search first.
        if self.semantic_search.available:
            # Re-index with clean chunks if the filter removed anything,
            # so search results can't reference dropped chunks.
            if len(clean_chunks) != len(self.pdf_chunks):
                self.semantic_search.index_chunks(clean_chunks)
            results = self.semantic_search.search(query, top_k=2)
            return [chunk for chunk, score in results if self.pdf_processor._is_meaningful_text(chunk)]
        # Fallback: basic text search by query-word overlap.
        query_words = query.lower().split()
        relevant_chunks = []
        for chunk in clean_chunks[:20]:  # Limit for performance
            chunk_lower = chunk.lower()
            score = sum(1 for word in query_words if word in chunk_lower)
            if score > 0:
                relevant_chunks.append((chunk, score))
        # Sort by relevance and return top results.
        relevant_chunks.sort(key=lambda x: x[1], reverse=True)
        return [chunk for chunk, score in relevant_chunks[:2]]

    def generate_response(self, query: str) -> str:
        """Generate intelligent response with graceful fallbacks for HF Spaces.

        Assembles up to three sections: an optional T5-generated analysis,
        the expert-knowledge answer (always present), and up to two PDF
        excerpts. Any failure collapses to the knowledge-base-only answer,
        so this method never raises for non-empty input.
        """
        if not query.strip():
            return "Please ask a question about Indian Accounting Standards (INDAS)."
        try:
            # Search PDF content if available.
            pdf_results = self._search_pdf_content(query)
            # Get expert knowledge (guaranteed non-empty fallback answer).
            expert_response = self._find_best_knowledge_match(query)
            # Try T5 model for enhanced response (with timeout protection).
            ai_response = ""
            if self.t5_model.available:
                try:
                    if pdf_results:
                        # Inputs are truncated to keep prompts small on
                        # resource-constrained HF Spaces hardware.
                        context = " ".join(pdf_results[:1])[:300]  # Smaller context for HF Spaces
                        enhanced_prompt = f"Based on INDAS context: {context} Question: {query[:100]}"
                    else:
                        enhanced_prompt = f"INDAS question: {query[:150]}"
                    ai_response = self.t5_model.generate_response(enhanced_prompt, max_length=100)
                except Exception as e:
                    # T5 failure is non-fatal; the expert answer still ships.
                    logger.warning(f"T5 model error (using fallback): {e}")
                    ai_response = ""
            # Construct final response with better formatting for HF Spaces.
            response_parts = []
            # Add AI response only if it is long enough to be meaningful and
            # does not itself look like an error message.
            if ai_response and len(ai_response) > 15 and "error" not in ai_response.lower():
                response_parts.append(f"**π€ AI Analysis:** {ai_response}")
            # Always add expert knowledge (most reliable).
            response_parts.append(f"**π Expert Knowledge:** {expert_response}")
            # Add PDF content if found.
            if pdf_results:
                response_parts.append(f"**π From Your INDAS Document:**")
                for i, chunk in enumerate(pdf_results[:2], 1):
                    # Smaller chunks for better display on HF Spaces.
                    truncated_chunk = chunk[:200] + "..." if len(chunk) > 200 else chunk
                    response_parts.append(f"{i}. {truncated_chunk}")
            final_response = "\n\n".join(response_parts)
            # Store conversation with memory management: prune before append
            # so history stays at or below 16 entries after this call.
            if len(self.conversation_history) > 20:  # Limit memory usage
                self.conversation_history = self.conversation_history[-15:]  # Keep last 15
            self.conversation_history.append({
                "query": query[:100],  # Limit stored query length
                "response": final_response[:500],  # Limit stored response length
                "timestamp": datetime.now().isoformat()
            })
            return final_response
        except Exception as e:
            logger.warning(f"Response generation error (using minimal fallback): {e}")
            # Minimal fallback that always works (knowledge base only).
            expert_response = self._find_best_knowledge_match(query)
            return f"**π INDAS Expert Knowledge:**\n\n{expert_response}\n\n*Note: Using knowledge base due to system constraints. Your question has been processed successfully.*"

    def get_system_status(self) -> Dict:
        """Get comprehensive system status.

        Returns a dict of human-readable status strings for the T5 model,
        PDF extraction backends, semantic search, loaded PDF content, and
        the number of stored conversations.
        """
        pdf_status = "β None"
        # `pdfminer` / `pdf_lib` flags indicate which extraction backends the
        # processor managed to import (set by EnhancedPDFProcessor).
        if self.pdf_processor.pdfminer and self.pdf_processor.pdf_lib:
            pdf_status = "β PDFMiner.six + PyPDF2"
        elif self.pdf_processor.pdfminer:
            pdf_status = "β PDFMiner.six Only"
        elif self.pdf_processor.pdf_lib:
            pdf_status = "β PyPDF2 Only"
        else:
            pdf_status = "π§ Built-in Fallback"
        return {
            "t5_model": "β Available" if self.t5_model.available else "β Not Available",
            "pdf_processor": pdf_status,
            "semantic_search": "β Available" if self.semantic_search.available else "β Basic Only",
            "pdf_content": "β Loaded" if self.pdf_content_available else "β None",
            "conversations": len(self.conversation_history)
        }
# Initialize the module-level chatbot singleton.
def _build_chatbot():
    """Construct the chatbot, returning None when initialization fails."""
    logger.info("π Initializing INDAS Chatbot with Google T5...")
    try:
        bot = INDASChatbotWithT5()
    except Exception as e:
        logger.error(f"β Chatbot initialization failed: {e}")
        return None
    logger.info("β Chatbot ready!")
    return bot


chatbot = _build_chatbot()
def create_interface():
    """Create advanced Gradio interface.

    Builds a Blocks app with a chat panel, a PDF upload/status sidebar,
    a system-status readout, and static example/status markdown. Relies
    on the module-level `chatbot` singleton (may be None on init failure).
    """

    def chat_function(message, history):
        # Chat handler: appends a [user, bot] pair to the history list
        # (Gradio tuple-style chat format) and clears the textbox.
        if not message.strip():
            return history, ""
        try:
            response = chatbot.generate_response(message) if chatbot else "β System unavailable"
            history.append([message, response])
            return history, ""
        except Exception as e:
            # Surface the error inline in the chat rather than crashing the UI.
            error_msg = f"β Error: {str(e)}"
            history.append([message, error_msg])
            return history, ""

    def handle_pdf_upload(pdf_file):
        """Handle PDF upload; returns a status string for the status box."""
        try:
            if chatbot and pdf_file:
                return chatbot.process_pdf(pdf_file)
            else:
                return "β System unavailable or no file provided"
        except Exception as e:
            return f"β Upload error: {str(e)}"

    def get_system_status():
        """Get detailed system status as display-ready markdown text."""
        if not chatbot:
            return "β System unavailable"
        try:
            status = chatbot.get_system_status()
            return f"""π€ **Advanced System Status:**
**AI Components:**
- Google T5 Model: {status['t5_model']}
- Semantic Search: {status['semantic_search']}
- PDF Processor: {status['pdf_processor']}
**Content:**
- PDF Content: {status['pdf_content']}
- Conversations: {status['conversations']}
**Capabilities:**
{'π― AI-Powered Responses with PDF Integration' if status['t5_model'] == 'β Available' else 'π Knowledge-Based Responses'}
- Intelligent document analysis
- Context-aware answers
- Professional INDAS guidance"""
        except Exception as e:
            return f"β Status error: {e}"

    with gr.Blocks(title="INDAS AI Expert") as interface:
        gr.Markdown("""
# π¦ INDAS AI Expert Chatbot
**π€ Google T5 Model + π Your INDAS.pdf + π§ Expert Knowledge = π― Intelligent INDAS Guidance**
Advanced AI assistant with pre-loaded INDAS documentation, Google T5 model, and expert accounting knowledge.
""")
        # Show PDF status at the top (checked at interface-build time).
        if os.path.exists("INDAS.pdf"):
            gr.Markdown("### β INDAS Documentation Ready - Ask specific questions about standards, examples, and implementations!")
        else:
            gr.Markdown("### π Expert Knowledge Available - Upload INDAS documents for enhanced answers!")
        with gr.Row():
            # Left column: the chat area.
            with gr.Column(scale=2):
                chatbot_interface = gr.Chatbot(
                    height=500,
                    label="INDAS AI Expert",
                    placeholder="Ask intelligent questions about INDAS or your documents..."
                )
                msg = gr.Textbox(
                    placeholder="Ask about INDAS standards, your PDF content, or complex accounting questions...",
                    label="Your Question",
                    lines=2
                )
                with gr.Row():
                    clear_btn = gr.Button("ποΈ Clear", variant="secondary")
                    submit_btn = gr.Button("π Send", variant="primary")
            # Right column: document management and system status.
            with gr.Column(scale=1):
                gr.Markdown("### π Document Management")
                # Show status of default PDF.
                if os.path.exists("INDAS.pdf"):
                    gr.Markdown("β **INDAS.pdf loaded** - Ask questions about the document content!")
                else:
                    gr.Markdown("π **No default document** - Upload your INDAS PDF below")
                pdf_upload = gr.File(
                    label="Upload Additional PDF (Optional)",
                    file_types=[".pdf"]
                )
                upload_status = gr.Textbox(
                    label="Upload Status",
                    interactive=False,
                    lines=3
                )
                gr.Markdown("### π€ System Status")
                status_btn = gr.Button("π Check Status")
                system_status = gr.Textbox(
                    label="System Information",
                    interactive=False,
                    lines=8
                )
        # Enhanced examples (static help text).
        gr.Markdown("""
### π‘ Intelligent Questions You Can Ask:
**π INDAS Standards:**
- "What is INDAS 1 about?"
- "Explain INDAS 16 property plant and equipment"
- "Tell me about INDAS 115 revenue recognition"
- "What is INDAS 36 impairment of assets?"
- "Explain INDAS 109 financial instruments"
**π Document Analysis:**
- "Analyze the depreciation policy mentioned in my uploaded document"
- "What specific INDAS requirements are covered in my PDF?"
- "Compare my document's approach to standard INDAS 16 requirements"
**π§ Complex INDAS Questions:**
- "How should I implement the five-step revenue recognition model?"
- "What are the detailed impairment testing procedures?"
- "Explain the interaction between INDAS 109 and INDAS 115"
**π― Practical Applications:**
- "How to prepare for INDAS transition?"
- "What are the key differences between cost model and revaluation model?"
- "How to handle complex financial instruments?"
""")
        # Advanced status display, rendered once at build time (not live).
        status_info = f"""
### βοΈ Advanced AI System:
- **Google T5 Model**: {'β Active' if chatbot and chatbot.t5_model.available else 'π Loading/Fallback'}
- **Semantic Search**: {'β Active' if chatbot and chatbot.semantic_search.available else 'π Basic Search'}
- **PDF Processing**: {'β PDFMiner.six + PyPDF2' if chatbot and chatbot.pdf_processor.pdfminer and chatbot.pdf_processor.pdf_lib else 'β PyPDF2 Only' if chatbot and chatbot.pdf_processor.pdf_lib else 'π§ Built-in Fallback'}
- **Response Quality**: {'π― AI-Enhanced' if chatbot and chatbot.t5_model.available else 'π Knowledge-Based'}
**π AI Features**: Intelligent analysis, semantic understanding, context-aware responses
"""
        gr.Markdown(status_info)
        # Event handlers: Enter key and Send button share the chat handler.
        msg.submit(chat_function, [msg, chatbot_interface], [chatbot_interface, msg])
        submit_btn.click(chat_function, [msg, chatbot_interface], [chatbot_interface, msg])
        clear_btn.click(lambda: [], outputs=[chatbot_interface])
        pdf_upload.upload(handle_pdf_upload, [pdf_upload], [upload_status])
        status_btn.click(get_system_status, outputs=[system_status])
    return interface
# Launch application
if __name__ == "__main__":
    try:
        logger.info("π Creating advanced AI interface...")
        interface = create_interface()
        if not interface:
            # Defensive guard: create_interface() normally returns a Blocks app.
            logger.error("β Interface creation failed")
        else:
            logger.info("π Launching INDAS AI Expert Chatbot...")
            # Bind on all interfaces at the standard HF Spaces port.
            interface.launch(
                server_name="0.0.0.0",
                server_port=7860,
                share=True,
                show_error=True,
            )
    except Exception as e:
        logger.error(f"β Launch failed: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")