|
|
|
|
|
""" |
|
|
Helper utility functions |
|
|
""" |
|
|
import hashlib
import logging
import os
import re
import time
from datetime import datetime
from functools import wraps
from typing import Dict, List, Any, Optional
|
|
|
|
|
# Module-level logger named after this module; log_performance writes its
# timing and failure messages through it.
logger = logging.getLogger(__name__)
|
|
|
|
|
def sanitize_text(text: str) -> str:
    """Sanitize free-form text for downstream processing.

    Every character that is not a word character, whitespace, or one of the
    punctuation marks ``. , ! ? ; : - ( ) ' "`` is removed. Runs of
    whitespace are then collapsed to a single space and the result is
    trimmed.

    Args:
        text: Raw input text. Any falsy value (``None``, ``""``) is accepted.

    Returns:
        The cleaned text, or ``""`` when ``text`` is falsy.
    """
    if not text:
        return ""

    # Remove disallowed characters first: deleting a symbol that sits between
    # two spaces ("a @ b") would otherwise leave a double space behind, and a
    # leading symbol ("@ hi") would leave a leading space.
    text = re.sub(r'[^\w\s\.\,\!\?\;\:\-\(\)\'\"]', '', text)

    # Normalise whitespace last so the result never contains repeated,
    # leading or trailing spaces.
    return re.sub(r'\s+', ' ', text).strip()
|
|
|
|
|
def validate_hex_color(color: str) -> bool:
    """Check whether ``color`` is a ``#RGB`` or ``#RRGGBB`` hex colour.

    Args:
        color: Candidate colour string, e.g. ``"#fff"`` or ``"#A1B2C3"``.

    Returns:
        ``True`` only when the whole string is ``#`` followed by exactly
        three or six hexadecimal digits. Falsy and non-string values yield
        ``False``.
    """
    # Reject non-strings up front so the validator answers False instead of
    # raising TypeError from the regex engine.
    if not color or not isinstance(color, str):
        return False

    # fullmatch (rather than match with "$") so a trailing newline such as
    # "#fff\n" is not accepted.
    return re.fullmatch(r'#(?:[A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})', color) is not None
|
|
|
|
|
def generate_unique_id(content: str = "") -> str:
    """Generate a short identifier for ``content``.

    The identifier is the first 8 hex digits of an MD5 digest computed over
    the content, the current local timestamp and 16 random bytes. MD5 is
    used here only as a fingerprint, not for security.

    Args:
        content: Optional text mixed into the digest.

    Returns:
        An 8-character lowercase hexadecimal string. Successive calls return
        different values even for identical ``content``.
    """
    timestamp = datetime.now().isoformat()

    # The timestamp alone is not enough: two calls with the same content in
    # the same clock tick would hash to the same ID, so random bytes are
    # mixed in as well.
    seed = f"{content}{timestamp}".encode() + os.urandom(16)
    return hashlib.md5(seed).hexdigest()[:8]
|
|
|
|
|
def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Truncate ``text`` so the result is at most ``max_length`` characters.

    Args:
        text: Text to shorten.
        max_length: Maximum length of the returned string.
        suffix: Marker appended when the text is cut.

    Returns:
        ``text`` unchanged when it already fits. Otherwise the leading part
        of ``text`` (whitespace-stripped) followed by ``suffix``. When
        ``max_length`` is too small to hold the suffix, the text is hard-cut
        to ``max_length`` characters without a suffix; a non-positive
        ``max_length`` yields ``""``.
    """
    if len(text) <= max_length:
        return text
    if max_length <= 0:
        return ""

    keep = max_length - len(suffix)
    if keep < 0:
        # A negative slice bound would count from the end of the string and
        # produce a result longer than max_length, so cut hard instead.
        return text[:max_length]

    return text[:keep].strip() + suffix
|
|
|
|
|
def extract_numbers(text: str) -> List[float]:
    """Extract all integer and decimal numbers from ``text``.

    A ``-`` is treated as a minus sign only when it does not directly follow
    a word character, so ranges and dates such as ``"10-20"`` or
    ``"2020-01-15"`` are not read as negative numbers.

    Args:
        text: Text to scan. Falsy values yield an empty list.

    Returns:
        The numbers found, in order of appearance, as floats.
    """
    if not text:
        return []

    # (?<!\w) guards only the optional sign; digits glued to letters
    # ("abc123") are still extracted.
    pattern = r'(?:(?<!\w)-)?\d+(?:\.\d+)?'
    return [float(match) for match in re.findall(pattern, text)]
|
|
|
|
|
def calculate_reading_time(text: str, wpm: int = 200) -> int:
    """Estimate the reading time of ``text`` in whole minutes.

    Args:
        text: Text to measure; words are whitespace-separated tokens.
        wpm: Assumed reading speed in words per minute.

    Returns:
        The word count divided by ``wpm``, rounded with ``round()``, and
        never less than one minute.
    """
    words = text.split()
    minutes = round(len(words) / wpm)
    # Floor the estimate at one minute.
    return minutes if minutes > 1 else 1
|
|
|
|
|
def format_file_size(size_bytes: int) -> str:
    """Format a byte count as a human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    unit (PB) is reached, and is printed with one decimal place.

    Args:
        size_bytes: Size in bytes. Negative values are scaled by magnitude
            and keep their sign.

    Returns:
        A string such as ``"1.5 KB"``; exactly ``"0 B"`` for zero.
    """
    if size_bytes == 0:
        return "0 B"

    units = ("B", "KB", "MB", "GB", "TB", "PB")

    # Scale on the magnitude so negative sizes (e.g. deltas) pick a unit too.
    sign = "-" if size_bytes < 0 else ""
    size = float(abs(size_bytes))

    index = 0
    while size >= 1024 and index < len(units) - 1:
        size /= 1024.0
        index += 1

    return f"{sign}{size:.1f} {units[index]}"
|
|
|
|
|
def safe_divide(a: float, b: float, default: float = 0.0) -> float:
    """Divide ``a`` by ``b``, falling back to ``default`` on failure.

    Args:
        a: Numerator.
        b: Denominator.
        default: Value returned when the division cannot be carried out.

    Returns:
        ``a / b``, or ``default`` when ``b`` equals zero or the operation
        raises ``TypeError`` or ``ZeroDivisionError``.
    """
    try:
        if b != 0:
            return a / b
        return default
    except (TypeError, ZeroDivisionError):
        # Operands that do not support the comparison or the division.
        return default
|
|
|
|
|
def merge_dicts(dict1: Dict, dict2: Dict) -> Dict:
    """Recursively merge ``dict2`` into a copy of ``dict1``.

    Args:
        dict1: Base mapping; it is shallow-copied, not modified.
        dict2: Mapping whose entries take precedence.

    Returns:
        A new dictionary. Where both inputs hold a dict under the same key
        the two are merged recursively; otherwise the value from ``dict2``
        replaces the one from ``dict1``.
    """
    merged = dict1.copy()

    for name, incoming in dict2.items():
        current = merged.get(name)
        # Recurse only when both sides are dictionaries.
        if isinstance(current, dict) and isinstance(incoming, dict):
            merged[name] = merge_dicts(current, incoming)
            continue
        merged[name] = incoming

    return merged
|
|
|
|
|
def validate_content_length(text: str, min_length: int = 50, max_length: int = 15000) -> Dict[str, Any]:
    """Check the stripped length of ``text`` against inclusive bounds.

    Args:
        text: Content to measure; surrounding whitespace is not counted.
        min_length: Smallest acceptable length.
        max_length: Largest acceptable length.

    Returns:
        A dict with the keys ``valid``, ``length``, ``min_required``,
        ``max_allowed`` and ``message``.
    """
    stripped_length = len(text.strip())
    within_bounds = min_length <= stripped_length and stripped_length <= max_length

    return dict(
        valid=within_bounds,
        length=stripped_length,
        min_required=min_length,
        max_allowed=max_length,
        message=f"Content length: {stripped_length} characters",
    )
|
|
|
|
|
def extract_urls(text: str) -> List[str]:
    """Extract ``http://`` and ``https://`` URLs from ``text``.

    A URL runs up to the next whitespace, angle bracket or quote character.
    Trailing sentence punctuation (``. , ; : ! ?``) and unbalanced closing
    brackets are trimmed, so ``"(see http://a.com/x)."`` yields
    ``"http://a.com/x"``. Fragments (``#``) and ``~`` are kept.

    Args:
        text: Text to scan. Falsy values yield an empty list.

    Returns:
        The URLs found, in order of appearance.
    """
    if not text:
        return []

    closers = {')': '(', ']': '[', '}': '{'}
    urls = []

    for url in re.findall(r'https?://[^\s<>"\']+', text):
        # Peel off characters that belong to the surrounding prose rather
        # than to the URL itself.
        while url:
            last = url[-1]
            if last in '.,;:!?':
                url = url[:-1]
            elif last in closers and url.count(last) > url.count(closers[last]):
                # A closing bracket with no matching opener inside the URL,
                # e.g. the ")" of "(see http://a.com/x)".
                url = url[:-1]
            else:
                break

        # Skip matches that were nothing but punctuation after the scheme.
        if url.partition('://')[2]:
            urls.append(url)

    return urls
|
|
|
|
|
def clean_filename(filename: str) -> str:
    """Clean ``filename`` for safe file-system usage.

    The characters ``< > : " / \\ | ? *`` are replaced with ``_``, leading
    and trailing dots and spaces are stripped, and the result is capped at
    100 characters while keeping the extension where possible.

    Args:
        filename: Proposed file name (not a path; separators are replaced).

    Returns:
        The cleaned name, or ``'untitled'`` when nothing usable remains.
    """
    max_length = 100

    filename = re.sub(r'[<>:"/\\|?*]', '_', filename)
    filename = filename.strip('. ')

    if len(filename) > max_length:
        name, ext = os.path.splitext(filename)
        # Clamp at zero: an extension longer than the limit would otherwise
        # give a negative slice bound and defeat the cap.
        keep = max(0, max_length - len(ext))
        filename = (name[:keep] + ext)[:max_length]

    return filename or 'untitled'
|
|
|
|
|
def log_performance(func):
    """Decorator that logs how long each call to ``func`` takes.

    A successful call is logged at INFO level and a failing call at ERROR
    level with the exception, which is then re-raised unchanged. Messages go
    to the module-level ``logger``.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper with the same call signature that keeps the name and
        docstring of ``func``.
    """
    # Fall back to repr() for callables without __name__.
    name = getattr(func, "__name__", repr(func))

    @wraps(func)  # keep __name__/__doc__ so introspection and logs stay accurate
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so system clock adjustments cannot
        # distort the measured duration.
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
        except Exception as exc:
            duration = time.perf_counter() - start
            logger.error("%s failed after %.2f seconds: %s", name, duration, exc)
            raise

        duration = time.perf_counter() - start
        logger.info("%s completed in %.2f seconds", name, duration)
        return result

    return wrapper
|
|
|
|
|
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]: |
|
|
"""Split text into overlapping chunks""" |
|
|
if len(text) <= chunk_size: |
|
|
return [text] |
|
|
|
|
|
chunks = [] |
|
|
start = 0 |
|
|
|
|
|
while start < len(text): |
|
|
end = start + chunk_size |
|
|
|
|
|
if end >= len(text): |
|
|
chunks.append(text[start:]) |
|
|
break |
|
|
|
|
|
|
|
|
chunk = text[start:end] |
|
|
|
|
|
|
|
|
last_sentence = chunk.rfind('.') |
|
|
if last_sentence > chunk_size // 2: |
|
|
chunk = chunk[:last_sentence + 1] |
|
|
else: |
|
|
|
|
|
last_space = chunk.rfind(' ') |
|
|
if last_space > chunk_size // 2: |
|
|
chunk = chunk[:last_space] |
|
|
|
|
|
chunks.append(chunk) |
|
|
start += len(chunk) - overlap |
|
|
|
|
|
return chunks |