Spaces:
Runtime error
Runtime error
| import asyncio | |
| from concurrent.futures import ThreadPoolExecutor | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Union | |
| from urllib.parse import urlparse | |
| from langchain_core.documents import Document | |
| from src.models import ContentDeliveryFrequency | |
class Helper:
    """Stateless helpers for document creation, delivery scheduling, and URL checks.

    The class keeps no instance state. It can be instantiated directly or used
    as an asynchronous context manager (``async with Helper() as helper:``);
    entering returns the instance itself and exiting performs no cleanup.

    Methods:
        create_documents(page_texts, file_name, user_id=None):
            Build one ``Document`` per parsed page.
        calculate_next_delivery_date(last_sent_at, frequency):
            Calculate the next delivery date based on the last send date.
        is_valid_url(url):
            Check that a URL has both a scheme and a network location.
    """

    async def __aenter__(self):
        """Enter the asynchronous context and return this helper."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Leave the asynchronous context.

        Nothing is released and ``None`` is returned, so an exception raised
        inside the ``async with`` body is propagated, not suppressed.
        """
        return None

    async def create_documents(
        self,
        page_texts: List[Dict],
        file_name: str,
        user_id: Optional[int] = None,
    ) -> List[Document]:
        """Create one ``Document`` per parsed page.

        Args:
            page_texts: Iterable of pages. Each entry is unpacked into exactly
                two values, ``(page_number, text_content)``, so every entry
                must be a two-item sequence such as a tuple.
                NOTE(review): the ``List[Dict]`` annotation does not match
                this usage - confirm against callers.
            file_name: Name of the source file, stored in each document's
                metadata under ``"file_name"``.
            user_id: ID of the uploading user. Any falsy value (``None``, and
                also ``0``) is recorded as ``"public"``; otherwise the ID is
                stored as a string.

        Returns:
            List of ``Document`` objects in the same order as ``page_texts``,
            each carrying ``file_name``, ``page_number`` and ``user_id``
            metadata.
        """
        # Truthiness (not ``is not None``) is kept on purpose so existing
        # callers see the same "public" fallback as before.
        owner = str(user_id) if user_id else "public"

        def build_documents():
            return [
                Document(
                    page_content=text_content,
                    metadata={
                        "file_name": file_name,
                        "page_number": page_num,
                        "user_id": owner,
                    },
                )
                for page_num, text_content in page_texts
            ]

        # Offload once to the loop's default executor instead of creating and
        # tearing down a new thread pool for every single page.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, build_documents)

    def calculate_next_delivery_date(
        self, last_sent_at: Optional[datetime], frequency: ContentDeliveryFrequency
    ) -> datetime:
        """Calculate the next delivery date based on the last message sent date.

        Args:
            last_sent_at: When the previous message was sent. If missing, the
                current time from ``datetime.now()`` (naive, no timezone) is
                used as the starting point.
            frequency: Delivery cadence.

        Returns:
            The starting point plus 1 day (daily), 1 week (weekly), 2 weeks
            (bi-weekly) or a fixed 30 days (monthly). Any other frequency
            value falls back to the daily interval.
        """
        base_date = last_sent_at if last_sent_at else datetime.now()

        if frequency == ContentDeliveryFrequency.DAILY:
            return base_date + timedelta(days=1)
        if frequency == ContentDeliveryFrequency.WEEKLY:
            return base_date + timedelta(weeks=1)
        if frequency == ContentDeliveryFrequency.BI_WEEKLY:
            return base_date + timedelta(weeks=2)
        if frequency == ContentDeliveryFrequency.MONTHLY:
            # A "month" is approximated as a fixed 30-day interval.
            return base_date + timedelta(days=30)
        return base_date + timedelta(days=1)  # Default to daily

    def is_valid_url(self, url: str) -> bool:
        """Check whether ``url`` has both a scheme and a network location.

        Args:
            url: The URL string to check.

        Returns:
            ``True`` when parsing yields a non-empty scheme and netloc,
            ``False`` otherwise - including when the value cannot be parsed.
        """
        try:
            parsed = urlparse(url)
        except (ValueError, TypeError, AttributeError):
            # ValueError: malformed URL (e.g. an unbalanced IPv6 bracket);
            # TypeError / AttributeError: a value that is not str or bytes.
            return False
        return bool(parsed.scheme and parsed.netloc)