| """
|
| Binary File Processor for HenAi
|
| Extracts metadata, text, and structured data from various binary file formats
|
| - Multi-backend audio processing with fallbacks
|
| - OCR using EasyOCR (no external dependencies)
|
| - Comprehensive file type support
|
| """
|
|
|
| import io
|
| import os
|
| import tempfile
|
| from typing import Dict, Any, Optional, Tuple
|
|
|
|
|
|
|
|
|
| try:
|
| from PIL import Image, ImageOps, ImageEnhance
|
| PIL_AVAILABLE = True
|
| except ImportError:
|
| PIL_AVAILABLE = False
|
| print("Warning: PIL/Pillow not available. Install with: pip install Pillow")
|
|
|
| try:
|
| import exifread
|
| EXIF_AVAILABLE = True
|
| except ImportError:
|
| EXIF_AVAILABLE = False
|
|
|
|
|
| try:
|
| import easyocr
|
| EASYOCR_AVAILABLE = True
|
| _easyocr_reader = None
|
| except ImportError:
|
| EASYOCR_AVAILABLE = False
|
| print("Warning: EasyOCR not available. Install with: pip install easyocr")
|
|
|
|
|
| try:
|
| from pydub import AudioSegment
|
| PYDUB_AVAILABLE = True
|
| except ImportError:
|
| PYDUB_AVAILABLE = False
|
|
|
| try:
|
| import speech_recognition as sr
|
| SPEECH_RECOGNITION_AVAILABLE = True
|
| except ImportError:
|
| SPEECH_RECOGNITION_AVAILABLE = False
|
|
|
| try:
|
| import mutagen
|
| MUTAGEN_AVAILABLE = True
|
| except ImportError:
|
| MUTAGEN_AVAILABLE = False
|
|
|
| try:
|
| import audioread
|
| AUDIOREAD_AVAILABLE = True
|
| except ImportError:
|
| AUDIOREAD_AVAILABLE = False
|
|
|
| try:
|
| import librosa
|
| LIBROSA_AVAILABLE = True
|
| except ImportError:
|
| LIBROSA_AVAILABLE = False
|
|
|
|
|
| try:
|
| import pdfplumber
|
| PDFPLUMBER_AVAILABLE = True
|
| except ImportError:
|
| PDFPLUMBER_AVAILABLE = False
|
|
|
|
|
| try:
|
| import pandas as pd
|
| PANDAS_AVAILABLE = True
|
| except ImportError:
|
| PANDAS_AVAILABLE = False
|
|
|
| try:
|
| import openpyxl
|
| OPENPYXL_AVAILABLE = True
|
| except ImportError:
|
| OPENPYXL_AVAILABLE = False
|
|
|
|
|
| try:
|
| from docx import Document
|
| DOCX_AVAILABLE = True
|
| except ImportError:
|
| DOCX_AVAILABLE = False
|
|
|
| try:
|
| from pptx import Presentation
|
| PPTX_AVAILABLE = True
|
| except ImportError:
|
| PPTX_AVAILABLE = False
|
|
|
|
|
| try:
|
| import zipfile
|
| import tarfile
|
| ARCHIVE_AVAILABLE = True
|
| except ImportError:
|
| ARCHIVE_AVAILABLE = False
|
|
|
|
|
| try:
|
| import chardet
|
| CHARDET_AVAILABLE = True
|
| except ImportError:
|
| CHARDET_AVAILABLE = False
|
|
|
|
|
| try:
|
| import whisper
|
| WHISPER_AVAILABLE = True
|
| except ImportError:
|
| WHISPER_AVAILABLE = False
|
|
|
|
|
| try:
|
| import cv2
|
| import numpy as np
|
| CV2_AVAILABLE = True
|
| except ImportError:
|
| CV2_AVAILABLE = False
|
|
|
|
|
def get_easyocr_reader():
    """Return the shared EasyOCR reader, creating it on first use.

    Returns:
        The cached ``easyocr.Reader`` (English, CPU only), or ``None`` when
        EasyOCR is not installed or its initialization failed.
    """
    global _easyocr_reader

    # Check availability first: ``_easyocr_reader`` is only bound at module
    # level when the easyocr import succeeded, so touching it before this
    # guard would raise NameError on installations without EasyOCR.
    if not EASYOCR_AVAILABLE:
        return None

    if _easyocr_reader is None:
        try:
            _easyocr_reader = easyocr.Reader(['en'], gpu=False)
            print("EasyOCR initialized successfully")
        except Exception as e:
            # Best effort: the cache stays empty, so the next call retries.
            print(f"Failed to initialize EasyOCR: {e}")
    return _easyocr_reader
|
|
|
|
|
| class BinaryProcessor:
|
| """Main processor for all binary file types"""
|
|
|
def __init__(self):
    """Create the processor and populate its handler registry."""
    self.initialize_handlers()
|
|
|
def initialize_handlers(self):
    """Build ``self.handlers``: file category name -> bound processing method."""
    registry = (
        ('image', self.process_image),
        ('audio', self.process_audio),
        ('video', self.process_video),
        ('pdf', self.process_pdf),
        ('spreadsheet', self.process_spreadsheet),
        ('word', self.process_word_document),
        ('presentation', self.process_presentation),
        ('archive', self.process_archive),
        ('database', self.process_database),
        ('text', self.process_text_file),
    )
    self.handlers = dict(registry)
|
|
|
def process_file(self, file_content: bytes, filename: str) -> str:
    """Process any file and return a formatted text report for the AI.

    The handler is chosen from the lower-cased text after the last ``.`` in
    *filename*.  Unrecognized extensions fall back to a best-effort text
    extraction via ``try_extract_text``.

    Args:
        file_content: Raw bytes of the file.
        filename: File name used for routing and for the report header.

    Returns:
        The report, framed by ``--- FILE: <name> ---`` and ``--- END FILE ---``.
    """
    # Ordered routing table: (extensions, name of the handling method).
    routes = (
        (('jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp', 'ico'),
         'process_image'),
        (('mp3', 'wav', 'ogg', 'flac', 'm4a', 'aac', 'wma', 'opus'),
         'process_audio'),
        (('mp4', 'avi', 'mov', 'mkv', 'webm', 'flv', 'wmv'),
         'process_video'),
        (('pdf',), 'process_pdf'),
        (('xlsx', 'xls', 'csv', 'xlsm', 'xlsb'), 'process_spreadsheet'),
        (('docx', 'doc', 'odt'), 'process_word_document'),
        (('pptx', 'ppt', 'odp'), 'process_presentation'),
        (('zip', 'rar', '7z', 'tar', 'gz', 'bz2', 'xz'), 'process_archive'),
        (('db', 'sqlite', 'sqlite3', 'db3'), 'process_database'),
        (('txt', 'md', 'py', 'js', 'html', 'css', 'json', 'xml',
          'java', 'c', 'cpp', 'h', 'hpp', 'rb', 'php', 'go', 'rs',
          'swift', 'kt', 'ts', 'jsx', 'tsx', 'vue'),
         'process_text_file'),
    )

    _, dot, tail = filename.rpartition('.')
    file_ext = tail.lower() if dot else ''

    pieces = [
        f"\n\n--- FILE: {filename} ---\n",
        f"Size: {len(file_content)} bytes\n",
    ]

    handler_name = next(
        (method for extensions, method in routes if file_ext in extensions),
        None,
    )
    if handler_name is not None:
        pieces.append(getattr(self, handler_name)(file_content, filename))
    else:
        # Unknown type: show the content only if it looks like text.
        text_result = self.try_extract_text(file_content)
        if text_result:
            pieces.append(
                f"\n--- EXTRACTED TEXT ---\n{text_result}\n--- END TEXT ---\n"
            )
        else:
            pieces.append(f"\n[Binary file: {filename}]\n")
            pieces.append("No further extraction available for this file type.\n")

    pieces.append("--- END FILE ---\n\n")
    return ''.join(pieces)
|
|
|
def process_image(self, content: bytes, filename: str) -> str:
    """Describe an image: basic properties, EXIF tags and OCR text.

    Args:
        content: Raw bytes of the image file.
        filename: Original file name (not used by the analysis itself).

    Returns:
        A text report framed by ``--- IMAGE ANALYSIS ---`` and
        ``--- END IMAGE ANALYSIS ---``.  Failures are reported inside the
        report; nothing is raised.
    """
    # NOTE(review): the non-ASCII prefixes in these messages look like
    # mis-decoded emoji; the text is kept as found - confirm against the
    # original file before restoring them.
    output = "\n--- IMAGE ANALYSIS ---\n"

    if not PIL_AVAILABLE:
        output += "β Image processing not available (Pillow not installed)\n"
        output += "Install with: pip install Pillow\n"
        output += "--- END IMAGE ANALYSIS ---\n"
        return output

    try:
        img = Image.open(io.BytesIO(content))
        output += f"π Dimensions: {img.width}x{img.height}\n"
        output += f"π¨ Format: {img.format}\n"
        output += f"πΌοΈ Mode: {img.mode}\n"

        if EXIF_AVAILABLE:
            try:
                with io.BytesIO(content) as f:
                    tags = exifread.process_file(f)
                if tags:
                    output += "\nπ· EXIF DATA:\n"
                    # Only the first 10 tags are listed.
                    for tag, value in list(tags.items())[:10]:
                        output += f" β’ {tag}: {value}\n"
            except Exception:
                # EXIF is optional extra information; ignore parse failures.
                pass

        if EASYOCR_AVAILABLE:
            try:
                reader = get_easyocr_reader()
                if reader:
                    # Cap the longest side at 1500 px before OCR.
                    if img.width > 1500 or img.height > 1500:
                        img.thumbnail((1500, 1500))
                        output += "\nπ Image scaled for OCR\n"

                    import numpy as np

                    # Palette, 1-bit, CMYK, ... images do not produce a plain
                    # grayscale/RGB pixel array, so normalise them first.
                    if img.mode in ("RGB", "L"):
                        ocr_image = img
                    else:
                        ocr_image = img.convert("RGB")
                    img_array = np.array(ocr_image)

                    output += "\nπ OCR PROCESSING:\n"
                    results = reader.readtext(img_array)

                    if results:
                        all_text = [text for _bbox, text, _conf in results]
                        confident_text = [
                            text for _bbox, text, conf in results if conf > 0.5
                        ]
                        if confident_text:
                            full_text = ' '.join(confident_text)
                            output += f"β Extracted {len(full_text)} characters (high confidence)\n"
                        else:
                            # Nothing above the 0.5 threshold: fall back to
                            # everything that was detected.
                            full_text = ' '.join(all_text)
                            output += f"β οΈ Extracted {len(full_text)} characters (low confidence)\n"
                        output += f"\nπ EXTRACTED TEXT:\n{full_text.strip()}\n"
                    else:
                        output += "β No text detected in the image\n"
            except Exception as e:
                output += f"\nβ οΈ OCR processing error: {str(e)}\n"
                output += "Make sure EasyOCR is installed: pip install easyocr\n"
        else:
            output += "\nβ οΈ EasyOCR not installed. Install with: pip install easyocr\n"
            output += "This will enable text extraction from images without external dependencies.\n"

    except Exception as e:
        output += f"β Error processing image: {str(e)}\n"

    # Always close the section, including after an error.
    output += "--- END IMAGE ANALYSIS ---\n"
    return output
|
|
|
def extract_ocr_text(self, image_content: bytes, filename: str) -> str:
    """Return only the OCR text of an image, without any metadata.

    Args:
        image_content: Raw bytes of the image file.
        filename: Original file name (not used by the extraction itself).

    Returns:
        The detected text fragments with confidence above 0.3, joined by
        single spaces; ``""`` when nothing qualifies or no reader is
        available; a bracketed message when EasyOCR is missing or an error
        occurred.
    """
    if not EASYOCR_AVAILABLE:
        return "[EasyOCR not installed. Install with: pip install easyocr]"

    try:
        from PIL import Image
        import numpy as np

        img = Image.open(io.BytesIO(image_content))

        # Cap the longest side at 1500 px before OCR.
        if img.width > 1500 or img.height > 1500:
            img.thumbnail((1500, 1500))

        # Palette, 1-bit, CMYK, ... images do not produce a plain
        # grayscale/RGB pixel array, so normalise them first.
        if img.mode not in ("RGB", "L"):
            img = img.convert("RGB")

        img_array = np.array(img)
        reader = get_easyocr_reader()
        if not reader:
            return ""

        results = reader.readtext(img_array)
        return ' '.join(
            text for _bbox, text, confidence in results if confidence > 0.3
        )
    except Exception as e:
        print(f"OCR extraction error: {e}")
        return f"[OCR error: {str(e)}]"
|
|
|
def process_audio(self, content: bytes, filename: str) -> str:
    """Report audio metadata, signal properties and a speech transcript.

    Every optional backend is tried independently and its failure is
    written into the report instead of being raised:

    1. Mutagen - container metadata and tags
    2. Audioread - basic info, only when Mutagen gave no duration
    3. Pydub - decoded audio properties
    4. Whisper - offline transcription; when Whisper is not installed,
       SpeechRecognition (Google, first 30 seconds) is used instead
    5. Librosa - simple signal statistics on the first 30 seconds

    Args:
        content: Raw bytes of the audio file.
        filename: Original file name; its extension is reused for the
            temporary copy handed to the backends.

    Returns:
        A text report framed by ``--- AUDIO ANALYSIS ---`` and
        ``--- END AUDIO ANALYSIS ---``.
    """
    # NOTE(review): the non-ASCII prefixes in these messages look like
    # mis-decoded emoji; the text is kept as found - confirm against the
    # original file before restoring them.
    output = "\n--- AUDIO ANALYSIS ---\n"
    output += f"π΅ File: {filename}\n"
    output += f"π¦ Size: {len(content)} bytes\n"

    # Explicit outcome flags.  Searching ``output`` for marker words is not
    # reliable: the file name and the failure messages contain them too.
    metadata_ok = False
    duration_known = False
    audioread_ok = False
    pydub_ok = False
    whisper_ok = False
    google_ok = False

    temp_file_path = None
    try:
        # Only the extension of the base name is used, so path separators
        # in ``filename`` can never end up in the temp file name.
        suffix = os.path.splitext(os.path.basename(filename))[1]
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp.write(content)
            tmp.flush()
            temp_file_path = tmp.name

        if MUTAGEN_AVAILABLE:
            try:
                audio_file = mutagen.File(temp_file_path)
                # Compare with None: a recognised file without tags may be
                # falsy (mutagen file objects behave like dicts of tags).
                if audio_file is not None:
                    output += "\nπ METADATA (Mutagen):\n"
                    metadata_ok = True

                    if hasattr(audio_file, 'info'):
                        info = audio_file.info
                        if hasattr(info, 'length'):
                            minutes = int(info.length // 60)
                            seconds = int(info.length % 60)
                            output += f" β’ Duration: {minutes}:{seconds:02d} ({info.length:.2f} seconds)\n"
                            duration_known = True
                        if hasattr(info, 'bitrate'):
                            output += f" β’ Bitrate: {info.bitrate} bps\n"
                        if hasattr(info, 'sample_rate'):
                            output += f" β’ Sample Rate: {info.sample_rate} Hz\n"
                        if hasattr(info, 'channels'):
                            output += f" β’ Channels: {info.channels}\n"

                    if hasattr(audio_file, 'tags') and audio_file.tags:
                        output += "\nπ·οΈ TAGS:\n"
                        # Only the first 15 tags are listed.
                        for key, value in list(audio_file.tags.items())[:15]:
                            output += f" β’ {key}: {value}\n"
            except Exception as e:
                output += f"\nβ οΈ Mutagen metadata extraction failed: {str(e)}\n"

        # Audioread is only a fallback for the duration/basic info.
        if AUDIOREAD_AVAILABLE and not duration_known:
            try:
                with audioread.audio_open(temp_file_path) as f:
                    output += "\nπ AUDIO INFO (Audioread):\n"
                    duration = f.duration
                    minutes = int(duration // 60)
                    seconds = int(duration % 60)
                    output += f" β’ Duration: {minutes}:{seconds:02d} ({duration:.2f} seconds)\n"
                    output += f" β’ Sample Rate: {f.samplerate} Hz\n"
                    output += f" β’ Channels: {f.channels}\n"
                    if hasattr(f, 'bitrate'):
                        output += f" β’ Bitrate: {f.bitrate} bps\n"
                    audioread_ok = True
            except Exception as e:
                output += f"\nβ οΈ Audioread info extraction failed: {str(e)}\n"

        if PYDUB_AVAILABLE:
            try:
                audio = AudioSegment.from_file(temp_file_path)
                duration = len(audio) / 1000  # len() is in milliseconds
                minutes = int(duration // 60)
                seconds = int(duration % 60)
                output += "\nποΈ AUDIO PROPERTIES (Pydub):\n"
                output += f" β’ Duration: {minutes}:{seconds:02d} ({duration:.2f} seconds)\n"
                output += f" β’ Channels: {audio.channels}\n"
                output += f" β’ Frame Rate: {audio.frame_rate} Hz\n"
                output += f" β’ Sample Width: {audio.sample_width} bytes\n"
                output += f" β’ Max Amplitude: {audio.max}\n"
                output += f" β’ RMS: {audio.rms:.2f}\n"
                pydub_ok = True
            except Exception as e:
                output += f"\nβ οΈ Pydub processing failed: {str(e)}\n"

        if WHISPER_AVAILABLE:
            try:
                output += "\nποΈ WHISPER TRANSCRIPTION (Offline):\n"
                output += "Loading Whisper model (first time may take a moment)...\n"
                model = whisper.load_model("base")
                result = model.transcribe(temp_file_path, language="en")
                if result and result.get("text"):
                    transcript = result["text"].strip()
                    output += "β Transcription complete!\n"
                    output += f"\nπ TRANSCRIPT:\n{transcript}\n"
                    whisper_ok = True
                else:
                    output += "β No speech detected\n"
            except Exception as e:
                output += f"β οΈ Whisper transcription failed: {str(e)}\n"
                output += "Install Whisper: pip install openai-whisper torch\n"

        elif SPEECH_RECOGNITION_AVAILABLE:
            try:
                # Prefer an in-memory WAV produced by pydub; fall back to the
                # temp file as-is when the conversion is not possible.
                audio_source = temp_file_path
                if PYDUB_AVAILABLE:
                    try:
                        audio = AudioSegment.from_file(temp_file_path)
                        wav_io = io.BytesIO()
                        audio.export(wav_io, format="wav")
                        wav_io.seek(0)
                        audio_source = wav_io
                    except Exception:
                        audio_source = temp_file_path

                recognizer = sr.Recognizer()
                with sr.AudioFile(audio_source) as source:
                    output += "\nποΈ SPEECH RECOGNITION (Google):\n"
                    recognizer.adjust_for_ambient_noise(source, duration=0.5)
                    audio_data = recognizer.record(source, duration=30)

                    try:
                        transcript = recognizer.recognize_google(audio_data)
                        if transcript and transcript.strip():
                            output += "β Transcription complete!\n"
                            output += f"\nπ TRANSCRIPT:\n{transcript.strip()}\n"
                            google_ok = True
                        else:
                            output += "β No speech detected\n"
                    except sr.UnknownValueError:
                        output += "β Could not understand audio\n"
                    except sr.RequestError as e:
                        output += f"β οΈ Google Speech Recognition error: {str(e)}\n"
            except Exception as e:
                output += f"\nβ οΈ Speech recognition failed: {str(e)}\n"
                output += "Install SpeechRecognition: pip install SpeechRecognition\n"

        if LIBROSA_AVAILABLE:
            try:
                import numpy as np

                y, sr_lib = librosa.load(temp_file_path, sr=None, duration=30)
                output += "\n㪠AUDIO ANALYSIS (Librosa):\n"
                output += f" β’ RMS Energy: {np.mean(librosa.feature.rms(y=y)):.4f}\n"
                output += f" β’ Zero Crossing Rate: {np.mean(librosa.feature.zero_crossing_rate(y)):.4f}\n"
                try:
                    tempo, _ = librosa.beat.beat_track(y=y, sr=sr_lib)
                    # The tempo may come back as a scalar or a 1-element
                    # array depending on the librosa version.
                    tempo_bpm = float(np.atleast_1d(tempo)[0])
                    output += f" β’ Estimated Tempo: {tempo_bpm:.2f} BPM\n"
                except Exception:
                    # Tempo is a nice-to-have; skip it when estimation fails.
                    pass
            except Exception as e:
                output += f"\nβ οΈ Librosa analysis failed: {str(e)}\n"

        output += "\nπ PROCESSING SUMMARY:\n"
        success_count = 0
        if metadata_ok:
            output += " β Metadata extracted (Mutagen)\n"
            success_count += 1
        if audioread_ok:
            output += " β Basic info extracted (Audioread)\n"
            success_count += 1
        if pydub_ok:
            output += " β Audio properties analyzed (Pydub)\n"
            success_count += 1
        if whisper_ok:
            output += " β Speech transcribed (Whisper)\n"
            success_count += 1
        elif google_ok:
            output += " β Speech transcribed (Google)\n"
            success_count += 1

        if success_count == 0:
            output += " β οΈ Limited information available. Install additional packages:\n"
            output += " β’ pip install mutagen audioread (for metadata)\n"
            output += " β’ pip install openai-whisper torch (for transcription)\n"
            output += " β’ pip install pydub (for audio properties)\n"

    except Exception as e:
        output += f"\nβ Critical error processing audio file: {str(e)}\n"
    finally:
        # The temp file was created with delete=False; remove it ourselves.
        if temp_file_path and os.path.exists(temp_file_path):
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass

    output += "--- END AUDIO ANALYSIS ---\n"
    return output
|
|
|
def process_video(self, content: bytes, filename: str) -> str:
    """Report video properties using OpenCV and, if installed, MoviePy.

    Args:
        content: Raw bytes of the video file.
        filename: Original file name; its extension is reused for the
            temporary copy handed to the backends.

    Returns:
        A text report framed by ``--- VIDEO ANALYSIS ---`` and
        ``--- END VIDEO ANALYSIS ---``.  Backend failures are reported
        inside the report; nothing is raised.
    """
    # NOTE(review): the non-ASCII prefixes in these messages look like
    # mis-decoded emoji; the text is kept as found - confirm against the
    # original file before restoring them.
    output = "\n--- VIDEO ANALYSIS ---\n"
    output += f"π¬ File: {filename}\n"
    output += f"π¦ Size: {len(content)} bytes\n"

    temp_file_path = None
    try:
        # Only the extension of the base name is used, so path separators
        # in ``filename`` can never end up in the temp file name.
        suffix = os.path.splitext(os.path.basename(filename))[1]
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp.write(content)
            tmp.flush()
            temp_file_path = tmp.name

        if CV2_AVAILABLE:
            cap = None
            try:
                cap = cv2.VideoCapture(temp_file_path)
                if cap.isOpened():
                    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                    fps = cap.get(cv2.CAP_PROP_FPS)
                    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
                    # Guard against a zero frame rate reported by the backend.
                    duration = frame_count / fps if fps > 0 else 0

                    output += "\nπ₯ VIDEO PROPERTIES (OpenCV):\n"
                    output += f" β’ Resolution: {width}x{height}\n"
                    output += f" β’ FPS: {fps:.2f}\n"
                    output += f" β’ Frame Count: {frame_count}\n"
                    minutes = int(duration // 60)
                    seconds = int(duration % 60)
                    output += f" β’ Duration: {minutes}:{seconds:02d} ({duration:.2f} seconds)\n"
            except Exception as e:
                output += f"\nβ οΈ OpenCV processing failed: {str(e)}\n"
            finally:
                # Release the capture even when reading a property failed,
                # otherwise the temp file may stay locked.
                if cap is not None:
                    cap.release()

        try:
            try:
                from moviepy.editor import VideoFileClip
            except ImportError:
                # MoviePy 2.x no longer ships the ``moviepy.editor`` module.
                from moviepy import VideoFileClip

            clip = VideoFileClip(temp_file_path)
            try:
                output += "\nποΈ VIDEO PROPERTIES (MoviePy):\n"
                output += f" β’ Duration: {clip.duration:.2f} seconds\n"
                output += f" β’ FPS: {clip.fps}\n"
                output += f" β’ Size: {clip.size}\n"
                output += f" β’ Has Audio: {clip.audio is not None}\n"
            finally:
                clip.close()
        except ImportError:
            # MoviePy is optional; stay silent when it is not installed.
            pass
        except Exception as e:
            output += f"\nβ οΈ MoviePy processing failed: {str(e)}\n"

    except Exception as e:
        output += f"\nβ Error processing video: {str(e)}\n"
    finally:
        # The temp file was created with delete=False; remove it ourselves.
        if temp_file_path and os.path.exists(temp_file_path):
            try:
                os.unlink(temp_file_path)
            except OSError:
                pass

    output += "--- END VIDEO ANALYSIS ---\n"
    return output
|
|
|
def process_pdf(self, content: bytes, filename: str) -> str:
    """Extract the page count, metadata and the full text of a PDF.

    Args:
        content: Raw bytes of the PDF file.
        filename: Original file name (not used by the analysis itself).

    Returns:
        A text report framed by ``--- PDF ANALYSIS ---`` and
        ``--- END PDF ANALYSIS ---``.  Errors are reported inside the
        report; nothing is raised.
    """
    # NOTE(review): the non-ASCII prefixes in these messages look like
    # mis-decoded emoji; the text is kept as found - confirm against the
    # original file before restoring them.
    output = "\n--- PDF ANALYSIS ---\n"

    if not PDFPLUMBER_AVAILABLE:
        output += "β PDF processing not available (pdfplumber not installed)\n"
        output += "Install with: pip install pdfplumber\n"
        output += "--- END PDF ANALYSIS ---\n"
        return output

    try:
        with pdfplumber.open(io.BytesIO(content)) as pdf:
            total_pages = len(pdf.pages)
            output += f"π Pages: {total_pages}\n"

            if pdf.metadata:
                output += "\nπ METADATA:\n"
                for key, value in pdf.metadata.items():
                    if value:
                        output += f" β’ {key}: {value}\n"

            # Collect per-page sections and join once; pages without
            # extractable text are skipped.
            page_sections = []
            for page_number, page in enumerate(pdf.pages, start=1):
                page_text = page.extract_text()
                if page_text:
                    page_sections.append(
                        f"\n--- PAGE {page_number} ---\n{page_text}\n"
                    )
            full_text = ''.join(page_sections)

            if full_text:
                # Nothing is truncated; very large documents only get a note.
                if len(full_text) > 500000:
                    full_text += f"\n\n[Note: Full PDF content extracted ({len(full_text)} characters).]"
                output += f"\nπ FULL TEXT CONTENT ({len(full_text)} characters):\n{full_text}\n"

    except Exception as e:
        output += f"β Error processing PDF: {str(e)}\n"

    # Always close the section, including after an error.
    output += "--- END PDF ANALYSIS ---\n"
    return output
|
|
|
def process_spreadsheet(self, content: bytes, filename: str) -> str:
    """Extract the data of an Excel workbook or a CSV file.

    Sheets with at most 1000 rows are printed in full; larger sheets show
    their first 500 rows.  A ``describe()`` summary is added for numeric
    columns.

    Args:
        content: Raw bytes of the spreadsheet file.
        filename: Original file name; a ``.csv`` extension selects the CSV
            reader, anything else is read as an Excel workbook.

    Returns:
        A text report framed by ``--- SPREADSHEET ANALYSIS ---`` and
        ``--- END SPREADSHEET ANALYSIS ---``.  Errors are reported inside
        the report; nothing is raised.
    """
    # NOTE(review): the non-ASCII prefixes in these messages look like
    # mis-decoded emoji; the text is kept as found - confirm against the
    # original file before restoring them.
    output = "\n--- SPREADSHEET ANALYSIS ---\n"

    if not PANDAS_AVAILABLE:
        output += "β Spreadsheet processing not available (pandas not installed)\n"
        output += "Install with: pip install pandas openpyxl\n"
        output += "--- END SPREADSHEET ANALYSIS ---\n"
        return output

    try:
        _, dot, tail = filename.rpartition('.')
        extension = tail.lower() if dot else ''

        if extension == 'csv':
            # read_excel cannot parse CSV; treat the file as a single sheet
            # named after the file.
            df_dict = {filename: pd.read_csv(io.BytesIO(content))}
        else:
            df_dict = pd.read_excel(io.BytesIO(content), sheet_name=None)
        output += f"π Sheets: {', '.join(list(df_dict.keys()))}\n"

        for sheet_name, df in df_dict.items():
            row_total, column_total = df.shape
            output += f"\nπ SHEET: {sheet_name}\n"
            output += f" β’ Dimensions: {row_total} rows x {column_total} columns\n"
            # Only the first 30 column names are listed.
            output += f" β’ Columns: {', '.join(df.columns.astype(str)[:30])}\n"

            if row_total <= 1000:
                full_data = df.to_string()
                output += f"\n FULL DATA:\n{full_data}\n"
            else:
                sample = df.head(500).to_string()
                output += f"\n DATA (first 500 rows of {row_total}):\n{sample}\n"
                output += f"\n ... and {row_total - 500} more rows\n"

            numeric_cols = df.select_dtypes(include=['number']).columns
            if len(numeric_cols) > 0:
                output += "\n Numeric summary:\n"
                output += df[numeric_cols].describe().to_string()
                output += "\n"

    except Exception as e:
        output += f"β Error processing spreadsheet: {str(e)}\n"

    # Always close the section, including after an error.
    output += "--- END SPREADSHEET ANALYSIS ---\n"
    return output
|
|
|
def process_word_document(self, content: bytes, filename: str) -> str:
    """Extract the paragraph text and tables of a Word document.

    Args:
        content: Raw bytes of the document.
        filename: Original file name (not used by the analysis itself).

    Returns:
        A text report framed by ``--- WORD DOCUMENT ANALYSIS ---`` and
        ``--- END WORD DOCUMENT ANALYSIS ---``.  Errors are reported inside
        the report; nothing is raised.
    """
    # NOTE(review): the non-ASCII prefixes in these messages look like
    # mis-decoded emoji; the text is kept as found - confirm against the
    # original file before restoring them.
    output = "\n--- WORD DOCUMENT ANALYSIS ---\n"

    if not DOCX_AVAILABLE:
        output += "β Word document processing not available (python-docx not installed)\n"
        output += "Install with: pip install python-docx\n"
        output += "--- END WORD DOCUMENT ANALYSIS ---\n"
        return output

    try:
        doc = Document(io.BytesIO(content))
        output += f"π Paragraphs: {len(doc.paragraphs)}\n"

        # Blank paragraphs are left out of the extracted text.
        text = '\n'.join(p.text for p in doc.paragraphs if p.text.strip())
        if text:
            # Nothing is truncated; very large documents only get a note.
            if len(text) > 500000:
                text += f"\n\n[Note: Full document content extracted ({len(text)} characters).]"
            output += f"\nπ FULL TEXT CONTENT ({len(text)} characters):\n{text}\n"

        if doc.tables:
            output += f"\nπ Tables found: {len(doc.tables)}\n"
            for table_number, table in enumerate(doc.tables, start=1):
                rendered_rows = [
                    ' | '.join(cell.text for cell in row.cells)
                    for row in table.rows
                ]
                output += f"\n--- TABLE {table_number} ---\n"
                output += ''.join(f"{row_text}\n" for row_text in rendered_rows)

    except Exception as e:
        output += f"β Error processing Word document: {str(e)}\n"

    # Always close the section, including after an error.
    output += "--- END WORD DOCUMENT ANALYSIS ---\n"
    return output
|
|
|
def process_presentation(self, content: bytes, filename: str) -> str:
    """Extract the text of the first slides of a PowerPoint presentation.

    Only the first 10 slides are read, and the combined text is cut at
    10000 characters.

    Args:
        content: Raw bytes of the presentation.
        filename: Original file name (not used by the analysis itself).

    Returns:
        A text report framed by ``--- PRESENTATION ANALYSIS ---`` and
        ``--- END PRESENTATION ANALYSIS ---``.  Errors are reported inside
        the report; nothing is raised.
    """
    from itertools import islice

    # NOTE(review): the non-ASCII prefixes in these messages look like
    # mis-decoded emoji; the text is kept as found - confirm against the
    # original file before restoring them.
    output = "\n--- PRESENTATION ANALYSIS ---\n"

    if not PPTX_AVAILABLE:
        output += "β PowerPoint processing not available (python-pptx not installed)\n"
        output += "Install with: pip install python-pptx\n"
        output += "--- END PRESENTATION ANALYSIS ---\n"
        return output

    try:
        prs = Presentation(io.BytesIO(content))
        output += f"π½οΈ Slides: {len(prs.slides)}\n"

        slide_text = []
        # The slide collection is iterated, not sliced: python-pptx only
        # supports integer indexing on it, so ``prs.slides[:10]`` fails.
        for slide_number, slide in enumerate(islice(prs.slides, 10), start=1):
            slide_content = [
                shape.text
                for shape in slide.shapes
                if hasattr(shape, "text") and shape.text.strip()
            ]
            if slide_content:
                slide_text.append(
                    f"\n--- SLIDE {slide_number} ---\n" + '\n'.join(slide_content)
                )

        if slide_text:
            full_text = ''.join(slide_text)
            if len(full_text) > 10000:
                full_text = full_text[:10000] + "\n\n[Content truncated...]"
            output += f"\nπ TEXT CONTENT:\n{full_text}\n"

    except Exception as e:
        output += f"β Error processing presentation: {str(e)}\n"

    # Always close the section, including after an error.
    output += "--- END PRESENTATION ANALYSIS ---\n"
    return output
|
|
|
def process_archive(self, content: bytes, filename: str) -> str:
    """List the members of a ZIP or (optionally compressed) TAR archive.

    At most 50 member names are listed.  Other archive formats only get
    their size reported.

    Args:
        content: Raw bytes of the archive.
        filename: Original file name; the text after its last ``.`` selects
            the archive reader.

    Returns:
        A text report framed by ``--- ARCHIVE ANALYSIS ---`` and
        ``--- END ARCHIVE ANALYSIS ---``.  Errors are reported inside the
        report; nothing is raised.
    """
    # NOTE(review): the non-ASCII prefixes in these messages look like
    # mis-decoded emoji; the text is kept as found - confirm against the
    # original file before restoring them.
    output = "\n--- ARCHIVE ANALYSIS ---\n"
    listing_limit = 50

    try:
        file_ext = filename.split('.')[-1].lower()

        if file_ext == 'zip':
            with zipfile.ZipFile(io.BytesIO(content)) as zf:
                files = zf.namelist()
                output += f"π¦ Total files: {len(files)}\n"
                output += "\nπ FILE LIST:\n"
                for member in files[:listing_limit]:
                    size = zf.getinfo(member).file_size
                    output += f" β’ {member} ({size:,} bytes)\n"
                if len(files) > listing_limit:
                    output += f" ... and {len(files) - listing_limit} more files\n"
        elif file_ext in ('tar', 'gz', 'bz2', 'xz'):
            # 'r:*' lets tarfile detect the compression itself, which
            # includes xz as well as gzip and bzip2.
            with tarfile.open(fileobj=io.BytesIO(content), mode='r:*') as tf:
                files = tf.getnames()
                output += f"π¦ Total files: {len(files)}\n"
                output += "\nπ FILE LIST:\n"
                for member in files[:listing_limit]:
                    output += f" β’ {member}\n"
                if len(files) > listing_limit:
                    output += f" ... and {len(files) - listing_limit} more files\n"
        else:
            output += f"Archive format {file_ext} - size: {len(content)} bytes\n"
            output += "For full archive support, install: pip install patool\n"

    except Exception as e:
        output += f"β Error processing archive: {str(e)}\n"

    # Always close the section, including after an error.
    output += "--- END ARCHIVE ANALYSIS ---\n"
    return output
|
|
|
def process_database(self, content: bytes, filename: str) -> str:
    """Summarise the tables of a SQLite database.

    For up to 20 tables the report lists up to 15 columns, the row count
    and up to 3 sample rows.

    Args:
        content: Raw bytes of the SQLite database file.
        filename: Original file name (not used by the analysis itself).

    Returns:
        A text report framed by ``--- DATABASE ANALYSIS ---`` and
        ``--- END DATABASE ANALYSIS ---``.  Errors are reported inside the
        report; nothing is raised.
    """
    # NOTE(review): the non-ASCII prefixes in these messages look like
    # mis-decoded emoji; the text is kept as found - confirm against the
    # original file before restoring them.
    output = "\n--- DATABASE ANALYSIS ---\n"

    tmp_path = None
    try:
        import sqlite3

        # sqlite3 needs a real file, so the bytes go to a temp file first.
        with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as tmp:
            tmp.write(content)
            tmp.flush()
            tmp_path = tmp.name

        conn = sqlite3.connect(tmp_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = cursor.fetchall()

            output += f"ποΈ Tables: {len(tables)}\n\n"

            for (table_name,) in tables[:20]:
                # Table names come from the uploaded file: quote them as
                # identifiers so spaces, keywords or embedded quotes cannot
                # break or alter the statements below.
                quoted = '"' + table_name.replace('"', '""') + '"'

                cursor.execute(f"PRAGMA table_info({quoted})")
                columns = cursor.fetchall()

                output += f"π TABLE: {table_name}\n"
                output += f" β’ Columns: {len(columns)}\n"
                for col in columns[:15]:
                    # table_info rows: index 1 is the name, 2 the type.
                    output += f" - {col[1]} ({col[2]})\n"

                cursor.execute(f"SELECT COUNT(*) FROM {quoted}")
                row_count = cursor.fetchone()[0]
                output += f" β’ Rows: {row_count:,}\n"

                if row_count > 0:
                    cursor.execute(f"SELECT * FROM {quoted} LIMIT 3")
                    output += "\n Sample rows:\n"
                    for row in cursor.fetchall():
                        output += f" {row}\n"
                output += "\n"
        finally:
            # Close before deleting the file, also when a query failed.
            conn.close()

    except Exception as e:
        output += f"β Error processing database: {str(e)}\n"
    finally:
        # The temp file was created with delete=False; remove it ourselves.
        if tmp_path and os.path.exists(tmp_path):
            try:
                os.unlink(tmp_path)
            except OSError:
                pass

    # Always close the section, including after an error.
    output += "--- END DATABASE ANALYSIS ---\n"
    return output
|
|
|
def process_text_file(self, content: bytes, filename: str) -> str:
    """Decode a text file and report its size plus the first 100 lines.

    The encoding is detected with chardet when it is installed; otherwise
    UTF-8 is assumed.  Undecodable bytes are replaced, not rejected.

    Args:
        content: Raw bytes of the text file.
        filename: Original file name (not used by the analysis itself).

    Returns:
        A text report framed by ``--- TEXT FILE ANALYSIS ---`` and
        ``--- END TEXT FILE ANALYSIS ---``.  Errors are reported inside the
        report; nothing is raised.
    """
    # NOTE(review): the non-ASCII prefixes in these messages look like
    # mis-decoded emoji; the text is kept as found - confirm against the
    # original file before restoring them.
    output = "\n--- TEXT FILE ANALYSIS ---\n"
    preview_lines = 100

    try:
        if CHARDET_AVAILABLE:
            detection = chardet.detect(content)
            # chardet reports None (not a missing key) when it cannot
            # decide, e.g. for empty input, so ``.get(key, default)`` alone
            # is not enough.
            encoding = detection.get('encoding') or 'utf-8'
            confidence = detection.get('confidence') or 0
            output += f"π€ Encoding: {encoding} (confidence: {confidence:.2%})\n"
        else:
            encoding = 'utf-8'
            output += "π€ Encoding detection not available (install chardet)\n"

        try:
            text = content.decode(encoding, errors='replace')
        except LookupError:
            # The detected name is not a codec Python knows; fall back.
            text = content.decode('utf-8', errors='replace')

        lines = text.split('\n')
        output += f"π Lines: {len(lines):,}\n"
        output += f"π Characters: {len(text):,}\n"

        output += "\nπ SAMPLE CONTENT:\n"
        output += '\n'.join(lines[:preview_lines])
        if len(lines) > preview_lines:
            output += f"\n... and {len(lines) - preview_lines} more lines\n"
        elif not output.endswith('\n'):
            # Keep the closing marker on its own line when the file does
            # not end with a newline.
            output += '\n'

    except Exception as e:
        output += f"β Error processing text file: {str(e)}\n"

    # Always close the section, including after an error.
    output += "--- END TEXT FILE ANALYSIS ---\n"
    return output
|
|
|
def try_extract_text(self, content: bytes) -> Optional[str]:
    """Best-effort text extraction for files of unknown type.

    The bytes are decoded as UTF-8 with replacement.  If the result is
    longer than 100 characters and more than 70% of it is real, printable
    text, the cleaned text is returned.  Otherwise, when chardet is
    installed, its detected encoding is tried.

    Args:
        content: Raw bytes of the file.

    Returns:
        Up to 5000 characters of text, or ``None`` when the content does
        not look like text.
    """
    if not content:
        return None

    try:
        text = content.decode('utf-8', errors='replace')

        # U+FFFD marks bytes that were not valid UTF-8.  It is "printable"
        # for str.isprintable(), so it has to be excluded explicitly or
        # arbitrary binary data would pass as text.
        cleaned = ''.join(
            ch for ch in text
            if ch != '\ufffd' and (ch.isprintable() or ch in '\n\r\t')
        )
        if len(text) > 100 and len(cleaned) / len(text) > 0.7:
            return cleaned[:5000]

        if CHARDET_AVAILABLE:
            detection = chardet.detect(content)
            detected = detection.get('encoding')
            if detected:
                text = content.decode(detected, errors='replace')
                if len(text) > 100:
                    return text[:5000]
    except Exception:
        # Purely a heuristic: any failure means "not text".
        pass

    return None