"""
High Capacity Input Processor
=============================

Handles large character count inputs and file uploads for training data generation.
"""

import json
import hashlib
import math
import mimetypes
import shutil
from pathlib import Path
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass, asdict
from datetime import datetime


@dataclass
class InputChunk:
    """Represents a chunk of input data."""
    chunk_id: str
    content: str
    chunk_index: int
    total_chunks: int
    file_hash: str
    metadata: Dict[str, Any]
    timestamp: str


@dataclass
class FileUpload:
    """Represents an uploaded file."""
    file_id: str
    filename: str
    file_path: str
    file_size: int
    file_hash: str
    mime_type: str
    upload_timestamp: str
    chunks: List[InputChunk]
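

# The dataclasses above are persisted via dataclasses.asdict() (see
# save_chunks below); a serialized InputChunk looks roughly like this
# sketch with illustrative values:
#
#   {"chunk_id": "chunk_0", "content": "...", "chunk_index": 0,
#    "total_chunks": 3, "file_hash": "...64-char sha256 hex...",
#    "metadata": {"chunk_type": "text", "original_length": 25000},
#    "timestamp": "2024-01-01T12:00:00"}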


class HighCapacityInputProcessor:
    """Processes high character count inputs and file uploads."""

    def __init__(self,
                 max_chunk_size: int = 1000000,
                 max_file_size: int = 100000000,
                 upload_dir: str = "uploads",
                 chunk_dir: str = "chunks",
                 training_data_dir: str = "training_data"):
        self.max_chunk_size = max_chunk_size
        self.max_file_size = max_file_size
        self.upload_dir = Path(upload_dir)
        self.chunk_dir = Path(chunk_dir)
        self.training_data_dir = Path(training_data_dir)

        # Create the working directories if they do not already exist.
        self.upload_dir.mkdir(exist_ok=True)
        self.chunk_dir.mkdir(exist_ok=True)
        self.training_data_dir.mkdir(exist_ok=True)

        # MIME types accepted by validate_file(), mapped to their extensions.
        self.supported_types = {
            'text/plain': ['.txt', '.md', '.py', '.js', '.html', '.css'],
            'application/json': ['.json', '.jsonl'],
            'text/csv': ['.csv'],
            'application/pdf': ['.pdf'],
            'application/msword': ['.doc'],
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': ['.docx'],
            'text/xml': ['.xml'],
            'application/xml': ['.xml'],
            'text/yaml': ['.yaml', '.yml']
        }
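
    # Construction sketch with non-default limits (illustrative values, not
    # the module defaults): smaller chunks and a tighter upload cap.
    #
    #   processor = HighCapacityInputProcessor(
    #       max_chunk_size=500_000,
    #       max_file_size=50_000_000,
    #   )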

    def calculate_file_hash(self, file_path: Union[str, Path]) -> str:
        """Calculate SHA256 hash of file."""
        hash_sha256 = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_sha256.update(chunk)
        return hash_sha256.hexdigest()

    def get_file_info(self, file_path: Union[str, Path]) -> Dict[str, Any]:
        """Get file information."""
        path = Path(file_path)

        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")

        stat = path.stat()
        return {
            'filename': path.name,
            'file_size': stat.st_size,
            'file_hash': self.calculate_file_hash(path),
            'mime_type': mimetypes.guess_type(str(path))[0] or 'application/octet-stream',
            'extension': path.suffix.lower(),
            'created_time': datetime.fromtimestamp(stat.st_ctime).isoformat(),
            'modified_time': datetime.fromtimestamp(stat.st_mtime).isoformat()
        }

    def validate_file(self, file_path: Union[str, Path]) -> bool:
        """Validate an uploaded file's size and type, raising ValueError on failure."""
        path = Path(file_path)
        file_info = self.get_file_info(path)

        if file_info['file_size'] > self.max_file_size:
            raise ValueError(f"File too large: {file_info['file_size']} bytes > {self.max_file_size} bytes")

        mime_type = file_info['mime_type']
        extension = file_info['extension']

        if mime_type not in self.supported_types:
            # Fall back to the extension: guess_type() often returns None or a
            # generic type for files that are still perfectly readable.
            supported_extensions = [ext for exts in self.supported_types.values() for ext in exts]
            if extension not in supported_extensions:
                raise ValueError(f"Unsupported file type: {mime_type} ({extension})")

        return True

    def chunk_text_content(self, content: str, chunk_overlap: int = 1000) -> List[InputChunk]:
        """Chunk text content into manageable, overlapping pieces."""
        if len(content) <= self.max_chunk_size:
            return [InputChunk(
                chunk_id="chunk_0",
                content=content,
                chunk_index=0,
                total_chunks=1,
                file_hash=hashlib.sha256(content.encode()).hexdigest(),
                metadata={'chunk_type': 'text', 'original_length': len(content)},
                timestamp=datetime.now().isoformat()
            )]

        if chunk_overlap >= self.max_chunk_size:
            raise ValueError("chunk_overlap must be smaller than max_chunk_size")

        # Each chunk after the first starts chunk_overlap characters before
        # the previous one ended, so the effective stride per chunk is smaller
        # than the chunk size. Sizing total_chunks by the stride (rather than
        # by max_chunk_size) guarantees the final chunk reaches the end of the
        # content instead of silently dropping the tail.
        stride = self.max_chunk_size - chunk_overlap
        total_chunks = max(1, math.ceil((len(content) - chunk_overlap) / stride))
        content_hash = hashlib.sha256(content.encode()).hexdigest()

        chunks = []
        for i in range(total_chunks):
            start_idx = i * stride
            end_idx = min(start_idx + self.max_chunk_size, len(content))
            chunk_content = content[start_idx:end_idx]

            chunk = InputChunk(
                chunk_id=f"chunk_{i}",
                content=chunk_content,
                chunk_index=i,
                total_chunks=total_chunks,
                file_hash=content_hash,
                metadata={
                    'chunk_type': 'text',
                    'start_index': start_idx,
                    'end_index': end_idx,
                    'overlap': chunk_overlap if i > 0 else 0,
                    'original_length': len(content)
                },
                timestamp=datetime.now().isoformat()
            )

            chunks.append(chunk)

        return chunks
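
    # Worked example of the overlap arithmetic above, with illustrative
    # (non-default) numbers: max_chunk_size=10_000 and chunk_overlap=1_000
    # give stride=9_000, so a 25_000-character input needs
    # ceil((25_000 - 1_000) / 9_000) = 3 chunks covering [0:10_000],
    # [9_000:19_000], and [18_000:25_000]; each chunk after the first repeats
    # the final 1_000 characters of its predecessor.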

    def read_file_content(self, file_path: Union[str, Path]) -> str:
        """Read file content based on file type."""
        path = Path(file_path)
        mime_type = mimetypes.guess_type(str(path))[0] or 'application/octet-stream'

        try:
            if mime_type == 'text/plain' or path.suffix in ['.txt', '.md', '.py', '.js', '.html', '.css']:
                with open(path, 'r', encoding='utf-8') as f:
                    return f.read()

            elif mime_type == 'application/json' or path.suffix in ['.json', '.jsonl']:
                with open(path, 'r', encoding='utf-8') as f:
                    content = f.read()
                # Validate before returning: a .jsonl file is one JSON document
                # per line; anything else must parse as a single document. The
                # raw text is returned either way so formatting is preserved.
                if path.suffix == '.jsonl':
                    for line in content.splitlines():
                        if line.strip():
                            json.loads(line)
                else:
                    json.loads(content)
                return content

            elif mime_type == 'text/csv' or path.suffix == '.csv':
                import pandas as pd  # Optional dependency, imported lazily.
                df = pd.read_csv(path)
                return df.to_string()

            elif mime_type == 'application/pdf' or path.suffix == '.pdf':
                try:
                    import PyPDF2
                    with open(path, 'rb') as f:
                        reader = PyPDF2.PdfReader(f)
                        content = ""
                        for page in reader.pages:
                            content += page.extract_text() + "\n"
                        return content
                except ImportError:
                    return f"[PDF file: {path.name} - Install PyPDF2 to extract text]"

            elif mime_type in ['application/msword', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document']:
                try:
                    from docx import Document
                    doc = Document(path)
                    content = ""
                    for paragraph in doc.paragraphs:
                        content += paragraph.text + "\n"
                    return content
                except ImportError:
                    return f"[Word document: {path.name} - Install python-docx to extract text]"

            else:
                # Last resort: treat the file as text and skip undecodable bytes.
                with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                    return f.read()

        except Exception as e:
            return f"[Error reading file {path.name}: {str(e)}]"

    def process_file_upload(self, file_path: Union[str, Path], chunk_overlap: int = 1000) -> FileUpload:
        """Process a file upload and create chunks."""
        path = Path(file_path)

        # Reject oversized or unsupported files before doing any work.
        self.validate_file(path)

        file_info = self.get_file_info(path)

        # Derive a stable, content-addressed identifier for this upload.
        file_id = hashlib.sha256(f"{file_info['filename']}_{file_info['file_hash']}".encode()).hexdigest()[:16]

        # Keep a copy of the original file in the upload directory.
        upload_path = self.upload_dir / f"{file_id}_{path.name}"
        shutil.copy2(path, upload_path)

        content = self.read_file_content(path)
        chunks = self.chunk_text_content(content, chunk_overlap)

        file_upload = FileUpload(
            file_id=file_id,
            filename=path.name,
            file_path=str(upload_path),
            file_size=file_info['file_size'],
            file_hash=file_info['file_hash'],
            mime_type=file_info['mime_type'],
            upload_timestamp=datetime.now().isoformat(),
            chunks=chunks
        )

        self.save_chunks(file_upload)

        return file_upload
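
    # Minimal usage sketch (assumes "notes.txt" exists locally, is under the
    # size limit, and has a supported type):
    #
    #   processor = HighCapacityInputProcessor()
    #   upload = processor.process_file_upload("notes.txt")
    #   print(upload.file_id, len(upload.chunks))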

    def save_chunks(self, file_upload: FileUpload):
        """Save chunks to disk."""
        chunk_file = self.chunk_dir / f"{file_upload.file_id}_chunks.json"

        with open(chunk_file, 'w', encoding='utf-8') as f:
            json.dump({
                'file_upload': asdict(file_upload),
                'chunks': [asdict(chunk) for chunk in file_upload.chunks]
            }, f, indent=2, ensure_ascii=False)

    def load_chunks(self, file_id: str) -> Optional[FileUpload]:
        """Load chunks from disk."""
        chunk_file = self.chunk_dir / f"{file_id}_chunks.json"

        if not chunk_file.exists():
            return None

        with open(chunk_file, 'r', encoding='utf-8') as f:
            data = json.load(f)

        chunks = [InputChunk(**chunk_data) for chunk_data in data['chunks']]

        # asdict() flattened the nested chunks to plain dicts on save; swap the
        # reconstructed InputChunk objects back in before rebuilding FileUpload.
        file_upload_data = data['file_upload']
        file_upload_data['chunks'] = chunks

        return FileUpload(**file_upload_data)

    def get_all_uploads(self) -> List[FileUpload]:
        """Get all uploaded files."""
        uploads = []

        for chunk_file in self.chunk_dir.glob("*_chunks.json"):
            file_id = chunk_file.stem.replace("_chunks", "")
            upload = self.load_chunks(file_id)
            if upload:
                uploads.append(upload)

        return uploads

    def create_training_data_from_chunks(self,
                                         file_uploads: List[FileUpload],
                                         output_format: str = "jsonl",
                                         include_metadata: bool = True) -> str:
        """Create training data from chunks."""
        output_file = self.training_data_dir / f"training_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.{output_format}"

        training_examples = []

        for file_upload in file_uploads:
            for chunk in file_upload.chunks:
                example = {
                    'content': chunk.content,
                    'chunk_id': chunk.chunk_id,
                    'file_id': file_upload.file_id,
                    'filename': file_upload.filename,
                    'chunk_index': chunk.chunk_index,
                    'total_chunks': chunk.total_chunks
                }

                if include_metadata:
                    example.update({
                        'metadata': chunk.metadata,
                        'file_metadata': {
                            'file_size': file_upload.file_size,
                            'mime_type': file_upload.mime_type,
                            'upload_timestamp': file_upload.upload_timestamp
                        }
                    })

                training_examples.append(example)

        if output_format == "jsonl":
            with open(output_file, 'w', encoding='utf-8') as f:
                for example in training_examples:
                    f.write(json.dumps(example, ensure_ascii=False) + '\n')

        elif output_format == "json":
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(training_examples, f, indent=2, ensure_ascii=False)

        else:
            # Fail loudly instead of silently returning a path to a file that
            # was never written.
            raise ValueError(f"Unsupported output format: {output_format}")

        return str(output_file)
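
    # Each JSONL line written above is one self-contained chunk record, e.g.
    # (illustrative values; metadata keys as built in chunk_text_content):
    #
    #   {"content": "...", "chunk_id": "chunk_0", "file_id": "ab12cd34ef56ab12",
    #    "filename": "notes.txt", "chunk_index": 0, "total_chunks": 3,
    #    "metadata": {...}, "file_metadata": {...}}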

    def process_high_capacity_input(self,
                                    content: str,
                                    chunk_overlap: int = 1000,
                                    save_chunks: bool = True) -> List[InputChunk]:
        """Process high capacity text input."""
        chunks = self.chunk_text_content(content, chunk_overlap)

        if save_chunks:
            # Wrap the raw text in a synthetic FileUpload so it can be
            # persisted through the same manifest path as real uploads.
            temp_file_id = hashlib.sha256(content.encode()).hexdigest()[:16]
            temp_file_upload = FileUpload(
                file_id=temp_file_id,
                filename="high_capacity_input.txt",
                file_path="",
                file_size=len(content),
                file_hash=hashlib.sha256(content.encode()).hexdigest(),
                mime_type="text/plain",
                upload_timestamp=datetime.now().isoformat(),
                chunks=chunks
            )
            self.save_chunks(temp_file_upload)

        return chunks

    def get_processing_stats(self) -> Dict[str, Any]:
        """Get processing statistics."""
        uploads = self.get_all_uploads()

        total_files = len(uploads)
        total_chunks = sum(len(upload.chunks) for upload in uploads)
        total_size = sum(upload.file_size for upload in uploads)

        file_types = {}
        for upload in uploads:
            mime_type = upload.mime_type
            file_types[mime_type] = file_types.get(mime_type, 0) + 1

        return {
            'total_files': total_files,
            'total_chunks': total_chunks,
            'total_size_bytes': total_size,
            'total_size_mb': total_size / (1024 * 1024),
            'file_types': file_types,
            'upload_directory': str(self.upload_dir),
            'chunk_directory': str(self.chunk_dir),
            'training_data_directory': str(self.training_data_dir)
        }


def main():
    """Demo the high capacity input processor."""
    print("High Capacity Input Processor Demo")
    print("=" * 50)

    processor = HighCapacityInputProcessor()

    print("\nDemo 1: High Capacity Text Input")
    large_text = "This is a large text input. " * 50000

    chunks = processor.process_high_capacity_input(large_text)
    print(f" Input length: {len(large_text):,} characters")
    print(f" Generated chunks: {len(chunks)}")
    print(f" Chunk sizes: {[len(chunk.content) for chunk in chunks[:3]]}...")

    print("\nDemo 2: Processing Statistics")
    stats = processor.get_processing_stats()
    print(f" Total files: {stats['total_files']}")
    print(f" Total chunks: {stats['total_chunks']}")
    print(f" Total size: {stats['total_size_mb']:.2f} MB")

    print("\nHigh Capacity Input Processor ready!")
    print(f" Upload directory: {processor.upload_dir}")
    print(f" Chunk directory: {processor.chunk_dir}")
    print(f" Training data directory: {processor.training_data_dir}")


if __name__ == "__main__":
    main()