diff --git a/.env b/.env
new file mode 100644
index 0000000000000000000000000000000000000000..6b057cb56056ed04f79133ee40ce30def8abbafe
--- /dev/null
+++ b/.env
@@ -0,0 +1,15 @@
+PROJECT_NAME=Admin Dashboard API
+VERSION=1.0.0
+API_V1_STR=/api/v1
+
+# Security
+SECRET_KEY=your-secret-key-here-change-in-production
+ACCESS_TOKEN_EXPIRE_MINUTES=30
+ALGORITHM=HS256
+
+# Database
+# SECURITY: never commit real credentials — the previously committed value
+# exposed the production database password; rotate it and inject via env.
+DATABASE_URL=postgresql+asyncpg://postgres:CHANGE_ME@db.mqyrkmsdgugdhxiucukb.supabase.co:5432/postgres
+
+# Redis Cache
+REDIS_HOST=localhost
+REDIS_PORT=6379
\ No newline at end of file
diff --git a/alembic.ini b/alembic.ini
new file mode 100644
index 0000000000000000000000000000000000000000..761e652c76b533b9982f8667524e320ae517bd0e
--- /dev/null
+++ b/alembic.ini
@@ -0,0 +1,77 @@
+# A generic, single database configuration.
+
+[alembic]
+# path to migration scripts
+script_location = alembic
+
+# template used to generate migration files
+file_template = %%(year)d%%(month).2d%%(day).2d_%%(hour).2d%%(minute).2d_%%(rev)s_%%(slug)s
+
+# timezone to use when rendering the date within the migration file
+# as well as the filename.
+timezone = UTC
+
+# max length of characters to apply to the "slug" field
+truncate_slug_length = 40
+
+# set to 'true' to run the environment during
+# the 'revision' command, regardless of autogenerate
+revision_environment = false
+
+# set to 'true' to allow .pyc and .pyo files without
+# a source .py file to be detected as revisions in the
+# versions/ directory
+sourceless = false
+
+# version location specification
+version_locations = alembic/versions
+
+# version path separator
+version_path_separator = os
+
+# the output encoding used when revision files
+# are written from script.py.mako
+output_encoding = utf-8
+
+# SECURITY: credential redacted — rotate the previously committed password and
+# supply the real URL via an environment variable or `alembic -x` override
+# (env.py already overrides this from settings.DATABASE_URL at runtime).
+sqlalchemy.url = postgresql+psycopg2://postgres:CHANGE_ME@db.mqyrkmsdgugdhxiucukb.supabase.co:5432/postgres
+
+[post_write_hooks]
+# format using "black"
+hooks = black
+black.type = console_scripts
+black.entrypoint = black
+black.options = -l 79 REVISION_SCRIPT_FILENAME
+
+[loggers]
+keys = root,sqlalchemy,alembic
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = WARN
+handlers = console
+qualname =
+
+[logger_sqlalchemy]
+level = WARN
+handlers =
+qualname = sqlalchemy.engine
+
+[logger_alembic]
+level = INFO
+handlers =
+qualname = alembic
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(levelname)-5.5s [%(name)s] %(message)s
+datefmt = %H:%M:%S
diff --git a/alembic/README b/alembic/README
new file mode 100644
index 0000000000000000000000000000000000000000..98e4f9c44effe479ed38c66ba922e7bcc672916f
--- /dev/null
+++ b/alembic/README
@@ -0,0 +1 @@
+Generic single-database configuration.
\ No newline at end of file
diff --git a/alembic/__pycache__/env.cpython-312.pyc b/alembic/__pycache__/env.cpython-312.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..b114317a5007040593853113a5442bf4917b9f53
Binary files /dev/null and b/alembic/__pycache__/env.cpython-312.pyc differ
diff --git a/alembic/env.py b/alembic/env.py
new file mode 100644
index 0000000000000000000000000000000000000000..49188293be5c3fc2086ce6abd13b00df072f3a02
--- /dev/null
+++ b/alembic/env.py
@@ -0,0 +1,63 @@
+"""Alembic migration environment.
+
+Rewrites the sqlalchemy.url from application settings (swapping the async
+driver for psycopg2, since Alembic runs synchronously) and dispatches to
+offline or online migration mode.
+"""
+from logging.config import fileConfig
+from sqlalchemy import engine_from_config
+from sqlalchemy import pool
+from alembic import context
+import os
+import sys
+from pathlib import Path
+
+# Add the parent directory to the Python path
+# (so `app.*` imports resolve when alembic is invoked from the repo root)
+parent_dir = str(Path(__file__).resolve().parents[1])
+sys.path.append(parent_dir)
+
+from app.core.config import settings
+from app.db.models import Base
+
+config = context.config
+
+if config.config_file_name is not None:
+    fileConfig(config.config_file_name)
+
+def get_url():
+    # Alembic needs a sync driver; the app uses asyncpg at runtime.
+    return str(settings.DATABASE_URL).replace("+asyncpg", "+psycopg2")
+
+# Override whatever (possibly stale) URL alembic.ini carries.
+config.set_main_option("sqlalchemy.url", get_url())
+
+target_metadata = Base.metadata
+
+def run_migrations_offline() -> None:
+    """Run migrations in 'offline' mode."""
+    url = get_url()
+    context.configure(
+        url=url,
+        target_metadata=target_metadata,
+        literal_binds=True,
+        dialect_opts={"paramstyle": "named"},
+    )
+
+    with context.begin_transaction():
+        context.run_migrations()
+
+def run_migrations_online() -> None:
+    """Run migrations in 'online' mode."""
+    configuration = config.get_section(config.config_ini_section)
+    configuration["sqlalchemy.url"] = get_url()
+    connectable = engine_from_config(
+        configuration,
+        prefix="sqlalchemy.",
+        poolclass=pool.NullPool,
+    )
+
+    with connectable.connect() as connection:
+        context.configure(
+            connection=connection,
+            target_metadata=target_metadata
+        )
+
+        with context.begin_transaction():
+            context.run_migrations()
+
+if context.is_offline_mode():
+    run_migrations_offline()
+else:
+    run_migrations_online()
diff --git a/alembic/script.py.mako b/alembic/script.py.mako
new file mode 100644
index 0000000000000000000000000000000000000000..fbc4b07dcef98b20c6f96b642097f35e8433258e
--- /dev/null
+++ b/alembic/script.py.mako
@@ -0,0 +1,26 @@
+"""${message}
+
+Revision ID: ${up_revision}
+Revises: ${down_revision | comma,n}
+Create Date: ${create_date}
+
+"""
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+${imports if imports else ""}
+
+# revision identifiers, used by Alembic.
+revision: str = ${repr(up_revision)}
+down_revision: Union[str, None] = ${repr(down_revision)}
+branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)}
+depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)}
+
+
+def upgrade() -> None:
+ ${upgrades if upgrades else "pass"}
+
+
+def downgrade() -> None:
+ ${downgrades if downgrades else "pass"}
diff --git a/app/__init__.py b/app/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/app/__pycache__/__init__.cpython-312.pyc b/app/__pycache__/__init__.cpython-312.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..96195daa7444dc966e9a0873ad74c05b850fbf2e
Binary files /dev/null and b/app/__pycache__/__init__.cpython-312.pyc differ
diff --git a/app/__pycache__/main.cpython-312.pyc b/app/__pycache__/main.cpython-312.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..0c4986851a91f9d0f200704d889c7b0c2cb6152b
Binary files /dev/null and b/app/__pycache__/main.cpython-312.pyc differ
diff --git a/app/api/__init__.py b/app/api/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/app/api/analytics.py b/app/api/analytics.py
new file mode 100644
index 0000000000000000000000000000000000000000..c7b14c70458fa5e51e89accb0759fb80bd2d184f
--- /dev/null
+++ b/app/api/analytics.py
@@ -0,0 +1,175 @@
+from datetime import datetime, timedelta
+from typing import Any, Dict
+
+from fastapi import APIRouter, Depends, Query
+from sqlalchemy import Date, cast, func, select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from ..core.dependencies import get_current_superuser
+from ..db.database import get_db
+from ..db.models import Order, OrderItem, Product, User
+
+router = APIRouter()
+
+@router.get("/sales")
+async def get_sales_analytics(
+    start_date: datetime = Query(default=None),
+    end_date: datetime = Query(default=None),
+    _=Depends(get_current_superuser),
+    db: AsyncSession = Depends(get_db)
+) -> Dict[str, Any]:
+    """Return per-day sales totals plus aggregate revenue/order metrics.
+
+    Superuser-only. Defaults to the trailing 30 days when no range is given.
+    NOTE(review): datetime.now() is naive — confirm Order.created_at is also
+    stored naive/local, otherwise the BETWEEN comparison is skewed.
+    """
+    if not start_date:
+        start_date = datetime.now() - timedelta(days=30)
+    if not end_date:
+        end_date = datetime.now()
+
+    # Daily sales query: one row per calendar day; only orders in terminal
+    # "sold" states are counted.
+    stmt = select(
+        cast(Order.created_at, Date).label('date'),
+        func.sum(Order.total_amount).label('total_sales'),
+        func.count().label('order_count')
+    ).where(
+        Order.created_at.between(start_date, end_date),
+        Order.status.in_(['completed', 'delivered'])
+    ).group_by(
+        cast(Order.created_at, Date)
+    ).order_by(
+        cast(Order.created_at, Date)
+    )
+
+    result = await db.execute(stmt)
+    daily_sales = result.all()
+
+    # Calculate totals (empty range yields 0 revenue / 0 orders / 0 average)
+    total_revenue = sum(day.total_sales for day in daily_sales)
+    total_orders = sum(day.order_count for day in daily_sales)
+    avg_order_value = total_revenue / total_orders if total_orders > 0 else 0
+
+    return {
+        "daily_sales": [
+            {"date": day.date, "total_sales": day.total_sales, "order_count": day.order_count}
+            for day in daily_sales
+        ],
+        "total_revenue": total_revenue,
+        "total_orders": total_orders,
+        "average_order_value": avg_order_value
+    }
+
+@router.get("/products")
+async def get_product_analytics(
+ _=Depends(get_current_superuser),
+ db: AsyncSession = Depends(get_db)
+) -> Dict[str, Any]:
+ # Top selling products
+ stmt = select(
+ Product,
+ func.sum(Order.total_amount).label('total_revenue'),
+ func.count().label('total_orders')
+ ).join(
+ Order, Product.id == Order.id
+ ).group_by(
+ Product.id
+ ).order_by(
+ func.sum(Order.total_amount).desc()
+ ).limit(10)
+
+ result = await db.execute(stmt)
+ top_products = result.all()
+
+ # Count total and low stock products
+ total_products = await db.scalar(select(func.count()).select_from(Product))
+ low_stock_count = await db.scalar(
+ select(func.count()).select_from(Product).where(Product.inventory_count < 10)
+ )
+
+ return {
+ "top_products": [
+ {
+ "id": product.id,
+ "name": product.name,
+ "total_revenue": revenue,
+ "total_orders": orders
+ }
+ for product, revenue, orders in top_products
+ ],
+ "total_products": total_products,
+ "low_stock_products": low_stock_count
+ }
+
+@router.get("/customers")
+async def get_customer_analytics(
+    _=Depends(get_current_superuser),
+    db: AsyncSession = Depends(get_db)
+) -> Dict[str, Any]:
+    """Return customer spend statistics, value segments and top-10 spenders.
+
+    Superuser-only. The inner join means customers with no orders are
+    excluded from every figure, including total_customers.
+    NOTE(review): every customer row is loaded into memory; fine for small
+    datasets, revisit for large user bases.
+    """
+    # Customer statistics: one row per ordering customer, highest spend first
+    stmt = select(
+        User,
+        func.sum(Order.total_amount).label('total_spent'),
+        func.count().label('total_orders')
+    ).join(
+        Order, User.id == Order.customer_id
+    ).group_by(
+        User.id
+    ).order_by(
+        func.sum(Order.total_amount).desc()
+    )
+
+    result = await db.execute(stmt)
+    customer_data = result.all()
+
+    total_customers = len(customer_data)
+    total_revenue = sum(spent for _, spent, _ in customer_data)
+    avg_customer_value = total_revenue / total_customers if total_customers > 0 else 0
+
+    # Customer segments (thresholds are in account-lifetime spend)
+    segments = {
+        "high_value": len([c for c, spent, _ in customer_data if spent > 1000]),
+        "medium_value": len([c for c, spent, _ in customer_data if 500 <= spent <= 1000]),
+        "low_value": len([c for c, spent, _ in customer_data if spent < 500])
+    }
+
+    return {
+        "total_customers": total_customers,
+        "average_customer_value": avg_customer_value,
+        "customer_segments": segments,
+        "top_customers": [
+            {
+                "id": customer.id,
+                "email": customer.email,
+                "total_spent": spent,
+                "total_orders": orders
+            }
+            for customer, spent, orders in customer_data[:10]  # Top 10 customers
+        ]
+    }
+
+@router.get("/dashboard")
+async def get_dashboard_analytics(
+    _=Depends(get_current_superuser),
+    db: AsyncSession = Depends(get_db)
+) -> Dict[str, Any]:
+    """Get a comprehensive dashboard with key metrics"""
+    # Get last 30 days of sales data
+    start_date = datetime.now() - timedelta(days=30)
+    end_date = datetime.now()
+
+    # Reuses the sibling endpoint coroutines directly (bypassing routing),
+    # forwarding the superuser marker `_` and the same DB session.
+    sales_data = await get_sales_analytics(start_date, end_date, _, db)
+    product_data = await get_product_analytics(_, db)
+    customer_data = await get_customer_analytics(_, db)
+
+    return {
+        "sales_summary": {
+            "total_revenue": sales_data["total_revenue"],
+            "total_orders": sales_data["total_orders"],
+            "average_order_value": sales_data["average_order_value"],
+            "daily_sales": sales_data["daily_sales"][-7:]  # Last 7 days
+        },
+        "product_summary": {
+            "total_products": product_data["total_products"],
+            "low_stock_products": product_data["low_stock_products"],
+            "top_selling_products": product_data["top_products"][:5]  # Top 5 products
+        },
+        "customer_summary": {
+            "total_customers": customer_data["total_customers"],
+            "average_customer_value": customer_data["average_customer_value"],
+            "customer_segments": customer_data["customer_segments"]
+        }
+    }
\ No newline at end of file
diff --git a/app/api/auth.py b/app/api/auth.py
new file mode 100644
index 0000000000000000000000000000000000000000..65e47675c43af4eba274799c166aa2609b6c482a
--- /dev/null
+++ b/app/api/auth.py
@@ -0,0 +1,69 @@
+from fastapi import APIRouter, Depends, HTTPException, status
+from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from ..core.security import create_access_token, verify_password, get_password_hash
+from ..db.database import get_db
+from ..db.models import User
+from datetime import timedelta
+from typing import Any
+
+router = APIRouter()
+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
+
+@router.post("/login")
+async def login(
+ form_data: OAuth2PasswordRequestForm = Depends(),
+ db: AsyncSession = Depends(get_db)
+) -> Any:
+ stmt = select(User).where(User.email == form_data.username)
+ result = await db.execute(stmt)
+ user = result.scalar_one_or_none()
+
+ if not user:
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Incorrect email or password",
+ )
+
+ if not verify_password(form_data.password, user.hashed_password):
+ raise HTTPException(
+ status_code=status.HTTP_401_UNAUTHORIZED,
+ detail="Incorrect email or password",
+ )
+
+ access_token = create_access_token(user.id)
+ return {"access_token": access_token, "token_type": "bearer"}
+
+@router.post("/register", response_model=User)
+async def register(
+ user_data: OAuth2PasswordRequestForm = Depends(),
+ db: AsyncSession = Depends(get_db)
+) -> Any:
+ # Check if user exists
+ stmt = select(User).where(User.email == user_data.username)
+ result = await db.execute(stmt)
+ existing_user = result.scalar_one_or_none()
+
+ if existing_user:
+ raise HTTPException(
+ status_code=status.HTTP_400_BAD_REQUEST,
+ detail="Email already registered",
+ )
+
+ # Create new user
+ user = User(
+ email=user_data.username,
+ hashed_password=get_password_hash(user_data.password),
+ full_name=user_data.username, # You might want to add this as a separate field in the form
+ username=user_data.username,
+ is_active=True,
+ is_superuser=False,
+ roles=["user"]
+ )
+
+ db.add(user)
+ await db.commit()
+ await db.refresh(user)
+
+ return user
\ No newline at end of file
diff --git a/app/api/calendar.py b/app/api/calendar.py
new file mode 100644
index 0000000000000000000000000000000000000000..bad50f90cad6dba5b9aaac78b80f08fde2a536e3
--- /dev/null
+++ b/app/api/calendar.py
@@ -0,0 +1,146 @@
+from fastapi import APIRouter, Depends, HTTPException, Query
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select, or_
+from typing import List, Dict, Any
+from datetime import datetime, timedelta
+from ..core.dependencies import get_current_active_user
+from ..db.database import get_db
+from ..db.models import Event, User
+
+router = APIRouter()
+
+@router.post("/events", response_model=Event)
+async def create_event(
+    event: Event,
+    current_user: User = Depends(get_current_active_user),
+    db: AsyncSession = Depends(get_db)
+) -> Event:
+    """Create a new calendar event"""
+    # NOTE(review): Event is the SQLAlchemy model; FastAPI request bodies and
+    # response_model need Pydantic schemas, so this route as written likely
+    # fails at app startup — introduce EventCreate/EventRead schemas. TODO confirm.
+    # Set the user_id from the authenticated user
+    event.user_id = current_user.id
+
+    # Add to database
+    db.add(event)
+    await db.commit()
+    await db.refresh(event)
+    return event
+
+@router.get("/events", response_model=List[Event])
+async def get_events(
+    start_date: datetime = Query(default=None),
+    end_date: datetime = Query(default=None),
+    include_attendee_events: bool = True,
+    current_user: User = Depends(get_current_active_user),
+    db: AsyncSession = Depends(get_db)
+) -> List[Event]:
+    """Get user's events within a date range"""
+    # Default window: now .. now+30d; an explicit start without an end
+    # yields start .. start+30d.
+    if not start_date:
+        start_date = datetime.now()
+    if not end_date:
+        end_date = start_date + timedelta(days=30)
+
+    # Only events fully contained in the window qualify (start AND end inside).
+    query = select(Event).where(
+        Event.start_time >= start_date,
+        Event.end_time <= end_date
+    )
+
+    if include_attendee_events:
+        # Owner OR listed attendee.
+        # NOTE(review): contains([str(id)]) assumes attendees is a JSON array
+        # of string user ids — confirm against the Event model.
+        query = query.where(or_(
+            Event.user_id == current_user.id,
+            Event.attendees.contains([str(current_user.id)])
+        ))
+    else:
+        query = query.where(Event.user_id == current_user.id)
+
+    query = query.order_by(Event.start_time)
+    result = await db.execute(query)
+    return result.scalars().all()
+
+@router.put("/events/{event_id}", response_model=Event)
+async def update_event(
+    event_id: int,
+    event_update: Event,
+    current_user: User = Depends(get_current_active_user),
+    db: AsyncSession = Depends(get_db)
+) -> Event:
+    """Update an event"""
+    # Only the event owner may update (ownership enforced in the WHERE clause).
+    stmt = select(Event).where(
+        Event.id == event_id,
+        Event.user_id == current_user.id
+    )
+    result = await db.execute(stmt)
+    event = result.scalar_one_or_none()
+
+    if not event:
+        raise HTTPException(
+            status_code=404,
+            detail="Event not found or you don't have permission to update it"
+        )
+
+    # Update event fields
+    # NOTE(review): .dict(exclude_unset=True) is a Pydantic API — SQLAlchemy
+    # models have no .dict(), so this line raises AttributeError at runtime.
+    # The fix is an EventUpdate Pydantic schema for the request body. TODO confirm.
+    update_data = event_update.dict(exclude_unset=True)
+    for field, value in update_data.items():
+        setattr(event, field, value)
+
+    event.updated_at = datetime.utcnow()
+    await db.commit()
+    await db.refresh(event)
+    return event
+
+@router.delete("/events/{event_id}")
+async def delete_event(
+ event_id: int,
+ current_user: User = Depends(get_current_active_user),
+ db: AsyncSession = Depends(get_db)
+) -> Dict[str, bool]:
+ """Delete an event"""
+ stmt = select(Event).where(
+ Event.id == event_id,
+ Event.user_id == current_user.id
+ )
+ result = await db.execute(stmt)
+ event = result.scalar_one_or_none()
+
+ if not event:
+ raise HTTPException(
+ status_code=404,
+ detail="Event not found or you don't have permission to delete it"
+ )
+
+ await db.delete(event)
+ await db.commit()
+ return {"success": True}
+
+@router.post("/events/{event_id}/respond")
+async def respond_to_event(
+ event_id: int,
+ response: str,
+ current_user: User = Depends(get_current_active_user),
+ db: AsyncSession = Depends(get_db)
+) -> Dict[str, bool]:
+ """Respond to an event invitation"""
+ if response not in ["accepted", "declined", "maybe"]:
+ raise HTTPException(
+ status_code=400,
+ detail="Invalid response. Must be one of: accepted, declined, maybe"
+ )
+
+ stmt = select(Event).where(
+ Event.id == event_id,
+ Event.attendees.contains([str(current_user.id)])
+ )
+ result = await db.execute(stmt)
+ event = result.scalar_one_or_none()
+
+ if not event:
+ raise HTTPException(
+ status_code=404,
+ detail="Event not found or you are not invited to this event"
+ )
+
+ # Update the response in the attendee_responses dictionary
+ event.attendee_responses[str(current_user.id)] = response
+ event.updated_at = datetime.utcnow()
+
+ await db.commit()
+ return {"success": True}
\ No newline at end of file
diff --git a/app/api/files.py b/app/api/files.py
new file mode 100644
index 0000000000000000000000000000000000000000..48b4b667409d213af64088b96aad9ece852c4972
--- /dev/null
+++ b/app/api/files.py
@@ -0,0 +1,53 @@
+from fastapi import APIRouter, UploadFile, File, Depends, HTTPException
+from fastapi.responses import FileResponse
+from typing import List
+from ..core.dependencies import get_current_active_user
+from ..utils.file_storage import file_storage
+from ..utils.logger import logger
+from pathlib import Path
+
+router = APIRouter()
+
+@router.post("/upload")
+async def upload_file(
+    file: UploadFile = File(...),
+    category: str = "documents",
+    current_user = Depends(get_current_active_user)
+) -> dict:
+    """Store an uploaded file via the file_storage backend.
+
+    Returns the original filename, the stored path and a public URL.
+    ValueError from the storage layer (e.g. validation) maps to 400;
+    anything unexpected is logged and becomes a generic 500.
+    """
+    try:
+        file_path = await file_storage.save_file(file, category)
+        if not file_path:
+            raise HTTPException(status_code=400, detail="Failed to upload file")
+
+        return {
+            "filename": file.filename,
+            "stored_path": file_path,
+            "url": file_storage.get_file_url(file_path)
+        }
+    except ValueError as e:
+        raise HTTPException(status_code=400, detail=str(e))
+    except Exception as e:
+        # Don't leak internal error details to the client.
+        logger.error(f"File upload error: {str(e)}")
+        raise HTTPException(status_code=500, detail="Internal server error")
+
+@router.delete("/{file_path:path}")
+async def delete_file(
+ file_path: str,
+ current_user = Depends(get_current_active_user)
+) -> dict:
+ success = await file_storage.delete_file(file_path)
+ if not success:
+ raise HTTPException(status_code=404, detail="File not found")
+
+ return {"status": "success", "message": "File deleted successfully"}
+
+@router.get("/{file_path:path}")
+async def get_file(
+ file_path: str,
+ current_user = Depends(get_current_active_user)
+):
+ full_path = Path("uploads") / file_path
+ if not full_path.exists():
+ raise HTTPException(status_code=404, detail="File not found")
+
+ return FileResponse(str(full_path))
\ No newline at end of file
diff --git a/app/api/maintenance.py b/app/api/maintenance.py
new file mode 100644
index 0000000000000000000000000000000000000000..54fa475abff30506d8b3c9cfe131327b88883e1a
--- /dev/null
+++ b/app/api/maintenance.py
@@ -0,0 +1,133 @@
+from datetime import datetime, timedelta
+from typing import Any, Dict, List
+
+from fastapi import APIRouter, Depends, HTTPException
+from sqlalchemy import delete, func, select, text
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from ..core.dependencies import get_current_active_user
+from ..db.database import get_db
+from ..db.models import Event, Notification, Order, User
+from ..utils.logger import logger
+
+router = APIRouter()
+
+@router.post("/sessions/cleanup")
+async def cleanup_sessions(
+    current_user: User = Depends(get_current_active_user),
+    db: AsyncSession = Depends(get_db)
+) -> Dict[str, int]:
+    """Manually trigger session cleanup"""
+    # Admin-only (role check, not superuser flag — matches the rest of this module)
+    if "admin" not in current_user.roles:
+        raise HTTPException(
+            status_code=403,
+            detail="Only administrators can perform maintenance operations"
+        )
+
+    # NOTE(review): despite the name, this deletes Event rows older than 7
+    # days — no Session model exists here.  Confirm this is intended and not
+    # destroying real calendar data.
+    cutoff_date = datetime.utcnow() - timedelta(days=7)
+    stmt = delete(Event).where(Event.created_at < cutoff_date)
+    result = await db.execute(stmt)
+    await db.commit()
+
+    # rowcount = number of rows removed by the bulk DELETE
+    return {"deleted_sessions": result.rowcount}
+
+@router.post("/data/archive")
+async def archive_data(
+    current_user: User = Depends(get_current_active_user),
+    db: AsyncSession = Depends(get_db)
+) -> Dict[str, int]:
+    """Manually trigger data archiving"""
+    if "admin" not in current_user.roles:
+        raise HTTPException(
+            status_code=403,
+            detail="Only administrators can perform maintenance operations"
+        )
+
+    # NOTE(review): "archive" here is a hard DELETE — nothing is copied
+    # anywhere first.  Confirm data-retention requirements before relying
+    # on this endpoint.
+    archive_date = datetime.utcnow() - timedelta(days=365)
+    archived = {}
+
+    # Archive old orders (terminal states only, older than one year)
+    orders_stmt = delete(Order).where(
+        Order.created_at < archive_date,
+        Order.status.in_(["delivered", "cancelled"])
+    )
+    orders_result = await db.execute(orders_stmt)
+    archived["orders"] = orders_result.rowcount
+
+    # Archive old notifications (already-read only)
+    notif_stmt = delete(Notification).where(
+        Notification.created_at < archive_date,
+        Notification.read == True
+    )
+    notif_result = await db.execute(notif_stmt)
+    archived["notifications"] = notif_result.rowcount
+
+    await db.commit()
+    return archived
+
+@router.get("/health")
+async def check_health(
+    current_user: User = Depends(get_current_active_user),
+    db: AsyncSession = Depends(get_db)
+) -> Dict[str, Any]:
+    """Check system health metrics"""
+    if "admin" not in current_user.roles:
+        raise HTTPException(
+            status_code=403,
+            detail="Only administrators can view system health"
+        )
+
+    try:
+        # Check database connection (cheap round-trip)
+        await db.execute(select(1))
+
+        # Get database statistics
+        total_users = await db.scalar(select(func.count()).select_from(User))
+        total_orders = await db.scalar(select(func.count()).select_from(Order))
+        total_notifications = await db.scalar(select(func.count()).select_from(Notification))
+
+        return {
+            "status": "healthy",
+            "timestamp": datetime.utcnow(),
+            "database": {
+                "connected": True,
+                "total_users": total_users,
+                "total_orders": total_orders,
+                "total_notifications": total_notifications
+            }
+        }
+    except Exception as e:
+        # Deliberately swallow: a health endpoint reports failure instead of
+        # raising, so monitoring still gets a 200 with status "unhealthy".
+        logger.error(f"Health check error: {str(e)}")
+        return {
+            "status": "unhealthy",
+            "error": str(e),
+            "timestamp": datetime.utcnow()
+        }
+
+@router.post("/database/maintenance")
+async def perform_db_maintenance(
+ current_user: User = Depends(get_current_active_user),
+ db: AsyncSession = Depends(get_db)
+) -> Dict[str, Any]:
+ """Manually trigger database maintenance"""
+ if "admin" not in current_user.roles:
+ raise HTTPException(
+ status_code=403,
+ detail="Only administrators can perform maintenance operations"
+ )
+
+ try:
+ # Cleanup expired sessions
+ await cleanup_sessions(current_user, db)
+
+ # Run VACUUM ANALYZE (requires raw SQL)
+ await db.execute("VACUUM ANALYZE;")
+
+ return {
+ "status": "success",
+ "message": "Database maintenance completed successfully"
+ }
+ except Exception as e:
+ logger.error(f"Database maintenance error: {str(e)}")
+ raise HTTPException(
+ status_code=500,
+ detail=f"Database maintenance failed: {str(e)}"
+ )
\ No newline at end of file
diff --git a/app/api/notifications.py b/app/api/notifications.py
new file mode 100644
index 0000000000000000000000000000000000000000..73650dd45fe9967fe60d3531fdeef9b6f7057c1e
--- /dev/null
+++ b/app/api/notifications.py
@@ -0,0 +1,84 @@
+from fastapi import APIRouter, Depends, HTTPException, Query
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select, update
+from typing import List, Dict, Any, Optional
+from ..core.dependencies import get_current_active_user
+from ..db.database import get_db
+from ..db.models import Notification, User
+
+router = APIRouter()
+
+@router.get("/")
+async def get_notifications(
+ skip: int = Query(0, ge=0),
+ limit: int = Query(50, ge=1, le=100),
+ unread_only: bool = False,
+ current_user: User = Depends(get_current_active_user),
+ db: AsyncSession = Depends(get_db)
+) -> List[Notification]:
+ """Get user's notifications"""
+ query = select(Notification).where(Notification.user_id == current_user.id)
+
+ if unread_only:
+ query = query.where(Notification.read == False)
+
+ query = query.order_by(Notification.created_at.desc()).offset(skip).limit(limit)
+ result = await db.execute(query)
+ return result.scalars().all()
+
+@router.post("/mark-read/{notification_id}")
+async def mark_notification_read(
+    notification_id: int,
+    current_user: User = Depends(get_current_active_user),
+    db: AsyncSession = Depends(get_db)
+) -> Dict[str, bool]:
+    """Mark a notification as read"""
+    # Ownership enforced in the WHERE clause: another user's notification
+    # id yields the same 404 as a missing one.
+    stmt = select(Notification).where(
+        Notification.id == notification_id,
+        Notification.user_id == current_user.id
+    )
+    result = await db.execute(stmt)
+    notification = result.scalar_one_or_none()
+
+    if not notification:
+        raise HTTPException(status_code=404, detail="Notification not found")
+
+    notification.read = True
+    await db.commit()
+    return {"success": True}
+
+@router.post("/mark-all-read")
+async def mark_all_notifications_read(
+    current_user: User = Depends(get_current_active_user),
+    db: AsyncSession = Depends(get_db)
+) -> Dict[str, int]:
+    """Mark all notifications as read"""
+    # Single bulk UPDATE scoped to the current user's unread rows.
+    stmt = update(Notification).where(
+        Notification.user_id == current_user.id,
+        Notification.read == False
+    ).values(read=True)
+
+    result = await db.execute(stmt)
+    await db.commit()
+    # rowcount = number of notifications flipped by this call
+    return {"marked_count": result.rowcount}
+
+@router.delete("/{notification_id}")
+async def delete_notification(
+    notification_id: int,
+    current_user: User = Depends(get_current_active_user),
+    db: AsyncSession = Depends(get_db)
+) -> Dict[str, bool]:
+    """Delete a notification"""
+    # Ownership enforced in the WHERE clause, same pattern as mark-read.
+    stmt = select(Notification).where(
+        Notification.id == notification_id,
+        Notification.user_id == current_user.id
+    )
+    result = await db.execute(stmt)
+    notification = result.scalar_one_or_none()
+
+    if not notification:
+        raise HTTPException(status_code=404, detail="Notification not found")
+
+    await db.delete(notification)
+    await db.commit()
+    return {"success": True}
\ No newline at end of file
diff --git a/app/api/orders.py b/app/api/orders.py
new file mode 100644
index 0000000000000000000000000000000000000000..93089011469d48819bb3c9fea21b330256cbc9ca
--- /dev/null
+++ b/app/api/orders.py
@@ -0,0 +1,142 @@
+from fastapi import APIRouter, HTTPException, status, Depends
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from typing import List, Optional
+from ..db.database import get_db
+from ..db.models import Order, Product, OrderItem, User
+from datetime import datetime
+
+router = APIRouter()
+
+@router.post("/", response_model=Order)
+async def create_order(
+    order: Order,
+    db: AsyncSession = Depends(get_db)
+) -> Order:
+    """Create an order: validate stock, decrement inventory, price each line.
+
+    NOTE(review): `Order` here is the SQLAlchemy model used as a FastAPI
+    request body / response_model — that requires Pydantic schemas and
+    likely fails at startup.  Also: no auth dependency on this route, and
+    the read-then-decrement of inventory_count is race-prone without row
+    locking (SELECT ... FOR UPDATE).  TODO confirm all three.
+    """
+    # Calculate total and validate products
+    total = 0
+    order_items = []
+
+    for item in order.items:
+        # Get product
+        stmt = select(Product).where(Product.id == item.product_id)
+        result = await db.execute(stmt)
+        product = result.scalar_one_or_none()
+
+        if not product:
+            raise HTTPException(
+                status_code=404,
+                detail=f"Product {item.product_id} not found"
+            )
+
+        if product.inventory_count < item.quantity:
+            raise HTTPException(
+                status_code=400,
+                detail=f"Insufficient inventory for product {item.product_id}"
+            )
+
+        # Update inventory (rolled back with the transaction on later failure)
+        product.inventory_count -= item.quantity
+        total += product.price * item.quantity
+
+        # Create order item — price snapshotted at time of sale
+        order_item = OrderItem(
+            product_id=item.product_id,
+            quantity=item.quantity,
+            price=product.price
+        )
+        order_items.append(order_item)
+
+    # Create order
+    db_order = Order(
+        customer_id=order.customer_id,
+        total_amount=total,
+        status="pending",
+        items=order_items,
+        created_at=datetime.utcnow(),
+        updated_at=datetime.utcnow()
+    )
+
+    db.add(db_order)
+    await db.commit()
+    await db.refresh(db_order)
+    return db_order
+
+@router.get("/", response_model=List[Order])
+async def list_orders(
+ skip: int = 0,
+ limit: int = 10,
+ status: Optional[str] = None,
+ db: AsyncSession = Depends(get_db)
+) -> List[Order]:
+ query = select(Order)
+ if status:
+ query = query.where(Order.status == status)
+
+ query = query.offset(skip).limit(limit)
+ result = await db.execute(query)
+ return result.scalars().all()
+
+@router.get("/{order_id}", response_model=Order)
+async def get_order(
+    order_id: int,
+    db: AsyncSession = Depends(get_db)
+) -> Order:
+    """Fetch a single order by id; 404 when absent."""
+    stmt = select(Order).where(Order.id == order_id)
+    result = await db.execute(stmt)
+    order = result.scalar_one_or_none()
+
+    if not order:
+        raise HTTPException(status_code=404, detail="Order not found")
+    return order
+
+@router.put("/{order_id}/status", response_model=Order)
+async def update_order_status(
+    order_id: int,
+    status: str,
+    db: AsyncSession = Depends(get_db)
+) -> Order:
+    """Set an order's status to one of the allowed lifecycle states.
+
+    Rejects unknown statuses with 400; 404 when the order is missing.
+    Note: any transition is allowed (e.g. delivered -> pending) — no state
+    machine is enforced here.
+    """
+    valid_statuses = ["pending", "processing", "shipped", "delivered", "cancelled"]
+    if status not in valid_statuses:
+        raise HTTPException(status_code=400, detail="Invalid status")
+
+    stmt = select(Order).where(Order.id == order_id)
+    result = await db.execute(stmt)
+    order = result.scalar_one_or_none()
+
+    if not order:
+        raise HTTPException(status_code=404, detail="Order not found")
+
+    order.status = status
+    order.updated_at = datetime.utcnow()
+
+    await db.commit()
+    await db.refresh(order)
+    return order
+
+@router.delete("/{order_id}")
+async def delete_order(
+    order_id: int,
+    db: AsyncSession = Depends(get_db)
+):
+    """Delete an order and return its line-item quantities to inventory.
+
+    NOTE(review): iterating `order.items` relies on lazy loading, which
+    raises under the async driver unless the relationship is eager-loaded
+    (selectinload) or configured lazy="selectin" — confirm the model setup.
+    """
+    # Get the order
+    stmt = select(Order).where(Order.id == order_id)
+    result = await db.execute(stmt)
+    order = result.scalar_one_or_none()
+
+    if not order:
+        raise HTTPException(status_code=404, detail="Order not found")
+
+    # Restore inventory for each product
+    for item in order.items:
+        product_stmt = select(Product).where(Product.id == item.product_id)
+        product_result = await db.execute(product_stmt)
+        product = product_result.scalar_one_or_none()
+
+        # Silently skip products that no longer exist
+        if product:
+            product.inventory_count += item.quantity
+
+    await db.delete(order)
+    await db.commit()
+
+    return {"status": "success", "message": "Order deleted and inventory restored"}
\ No newline at end of file
diff --git a/app/api/products.py b/app/api/products.py
new file mode 100644
index 0000000000000000000000000000000000000000..433ff629ba0651021054dd79bb1e91532959e43b
--- /dev/null
+++ b/app/api/products.py
@@ -0,0 +1,86 @@
+from fastapi import APIRouter, HTTPException, status, Depends
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from typing import List, Optional
+from ..db.database import get_db
+from ..db.models import Product
+from datetime import datetime
+
+router = APIRouter()
+
@router.post("/", response_model=Product)
async def create_product(
    product: Product,
    db: AsyncSession = Depends(get_db)
) -> Product:
    """Persist a new product row and return it with its generated id.

    NOTE(review): ``Product`` is a SQLAlchemy ORM model, not a Pydantic
    schema — FastAPI cannot validate it as a request body nor serialize it
    via ``response_model``. A dedicated Pydantic schema is needed; confirm
    how this endpoint is actually exercised before relying on it.
    """
    db.add(product)
    await db.commit()
    # Refresh to populate server-generated fields (id, created_at defaults).
    await db.refresh(product)
    return product
+
@router.get("/", response_model=List[Product])
async def list_products(
    skip: int = 0,
    limit: int = 10,
    category: Optional[str] = None,
    db: AsyncSession = Depends(get_db)
) -> List[Product]:
    """Return a paginated product listing, optionally filtered by category."""
    stmt = select(Product)
    if category:
        stmt = stmt.where(Product.category == category)
    rows = await db.execute(stmt.offset(skip).limit(limit))
    return rows.scalars().all()
+
@router.get("/{product_id}", response_model=Product)
async def get_product(
    product_id: int,
    db: AsyncSession = Depends(get_db)
) -> Product:
    """Fetch a single product by primary key; 404 when it does not exist."""
    found = (
        await db.execute(select(Product).where(Product.id == product_id))
    ).scalar_one_or_none()
    if found is None:
        raise HTTPException(status_code=404, detail="Product not found")
    return found
+
@router.put("/{product_id}", response_model=Product)
async def update_product(
    product_id: int,
    product_update: Product,
    db: AsyncSession = Depends(get_db)
) -> Product:
    """Apply the provided fields to an existing product.

    NOTE(review): ``product_update`` is typed as the SQLAlchemy ``Product``
    model, which has no ``.dict()`` method — the call below will raise
    ``AttributeError`` at runtime. A Pydantic update schema is required
    here; confirm against the intended API contract.
    """
    stmt = select(Product).where(Product.id == product_id)
    result = await db.execute(stmt)
    product = result.scalar_one_or_none()

    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    # Update product fields
    update_data = product_update.dict(exclude_unset=True)
    for field, value in update_data.items():
        setattr(product, field, value)

    # Redundant with the column's onupdate default, but harmless.
    product.updated_at = datetime.utcnow()
    await db.commit()
    await db.refresh(product)
    return product
+
@router.delete("/{product_id}")
async def delete_product(
    product_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Remove a product row by primary key; 404 when it does not exist."""
    target = (
        await db.execute(select(Product).where(Product.id == product_id))
    ).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="Product not found")
    await db.delete(target)
    await db.commit()
    return {"status": "success", "message": "Product deleted"}
\ No newline at end of file
diff --git a/app/api/scheduler.py b/app/api/scheduler.py
new file mode 100644
index 0000000000000000000000000000000000000000..ae7b90c82b60d0150c6f6111fb2fcb55ef79fa6e
--- /dev/null
+++ b/app/api/scheduler.py
@@ -0,0 +1,203 @@
+from fastapi import APIRouter, Depends, HTTPException
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select, delete
+from typing import List, Dict, Any, Optional
+from datetime import datetime, timedelta
+from ..core.dependencies import get_current_active_user
+from ..db.database import get_db
+from ..db.models import Event, User
+from pydantic import BaseModel
+
+router = APIRouter()
+
class RecurringEventCreate(BaseModel):
    """Request body for creating a recurring event series."""
    title: str
    description: str
    start_time: datetime
    end_time: datetime
    # One of: daily, weekly, monthly, yearly (validated in the endpoint).
    recurrence_pattern: str
    # Last date on which an occurrence may start; None means open-ended.
    recurrence_end_date: Optional[datetime] = None
    attendees: List[str] = []
    reminder_minutes: int = 30
+
class RecurringEventUpdate(BaseModel):
    """Partial-update body for a recurring event; unset fields are untouched."""
    title: Optional[str] = None
    description: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    attendees: Optional[List[str]] = None
    reminder_minutes: Optional[int] = None
+
@router.post("/recurring-events")
async def create_recurring_event(
    event_data: RecurringEventCreate,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> List[Dict[str, Any]]:
    """Create a recurring event series owned by the current user.

    Fixes over the previous version:
    * every occurrence now shares a ``recurrence_group`` id — the update
      and delete endpoints query by that column, which was never set;
    * a missing ``recurrence_end_date`` no longer loops forever: generation
      is capped at ``MAX_OCCURRENCES``;
    * monthly/yearly stepping clamps the day-of-month (Jan 31 -> Feb 28)
      instead of raising ``ValueError`` from ``datetime.replace``.
    """
    # Local imports: only this handler needs them.
    import calendar
    import uuid

    if event_data.recurrence_pattern not in ("daily", "weekly", "monthly", "yearly"):
        raise HTTPException(
            status_code=400,
            detail="Invalid recurrence pattern. Must be one of: daily, weekly, monthly, yearly"
        )

    if event_data.start_time >= event_data.end_time:
        raise HTTPException(
            status_code=400,
            detail="End time must be after start time"
        )

    # Hard safety cap: without it an open-ended series never terminates.
    MAX_OCCURRENCES = 366

    def _step_month(dt, year, month):
        # Clamp the day to the target month's length before replace().
        last_day = calendar.monthrange(year, month)[1]
        return dt.replace(year=year, month=month, day=min(dt.day, last_day))

    group_id = str(uuid.uuid4())
    duration = event_data.end_time - event_data.start_time
    current_start = event_data.start_time
    events: List[Event] = []

    for sequence_number in range(MAX_OCCURRENCES):
        if event_data.recurrence_end_date and current_start > event_data.recurrence_end_date:
            break

        event = Event(
            user_id=current_user.id,
            title=event_data.title,
            description=event_data.description,
            start_time=current_start,
            end_time=current_start + duration,
            attendees=event_data.attendees,
            reminder_minutes=event_data.reminder_minutes,
            is_recurring=True,
            recurrence_pattern=event_data.recurrence_pattern,
            recurrence_group=group_id,
            recurrence_end_date=event_data.recurrence_end_date,
            sequence_number=sequence_number,
            status="scheduled"
        )
        db.add(event)
        events.append(event)

        # Advance to the next occurrence.
        if event_data.recurrence_pattern == "daily":
            current_start += timedelta(days=1)
        elif event_data.recurrence_pattern == "weekly":
            current_start += timedelta(weeks=1)
        elif event_data.recurrence_pattern == "monthly":
            if current_start.month == 12:
                current_start = _step_month(current_start, current_start.year + 1, 1)
            else:
                current_start = _step_month(
                    current_start, current_start.year, current_start.month + 1
                )
        else:  # yearly
            current_start = _step_month(
                current_start, current_start.year + 1, current_start.month
            )

    await db.commit()

    # Refresh all events to pick up their generated ids.
    for event in events:
        await db.refresh(event)

    return events
+
@router.put("/recurring-events/{event_id}")
async def update_recurring_event(
    event_id: int,
    event_update: RecurringEventUpdate,
    update_future: bool = True,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> List[Dict[str, Any]]:
    """Update a recurring event and optionally its future occurrences.

    Bug fixed: the previous version fetched the target event and returned
    it, but only ever applied the field changes to *future* occurrences —
    the event named by ``event_id`` itself was never modified.
    """
    update_data = event_update.dict(exclude_unset=True)
    if not update_data:
        raise HTTPException(status_code=400, detail="No update data provided")

    # Fetch the event, scoped to the requesting user.
    stmt = select(Event).where(
        Event.id == event_id,
        Event.user_id == current_user.id
    )
    result = await db.execute(stmt)
    event = result.scalar_one_or_none()

    if not event:
        raise HTTPException(
            status_code=404,
            detail="Event not found or you don't have permission to update it"
        )

    # Apply the changes to the target event itself.
    for field, value in update_data.items():
        setattr(event, field, value)
    updated_events = [event]

    # Propagate to later occurrences in the same series. Skip when
    # recurrence_group is NULL: `== None` renders as IS NULL and would
    # match unrelated events.
    if update_future and event.is_recurring and event.recurrence_group is not None:
        future_stmt = select(Event).where(
            Event.recurrence_group == event.recurrence_group,
            Event.sequence_number > event.sequence_number,
            Event.user_id == current_user.id
        )
        future_result = await db.execute(future_stmt)
        for future_event in future_result.scalars().all():
            for field, value in update_data.items():
                setattr(future_event, field, value)
            updated_events.append(future_event)

    await db.commit()
    return updated_events
+
@router.delete("/recurring-events/{event_id}")
async def delete_recurring_event(
    event_id: int,
    delete_future: bool = True,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> Dict[str, bool]:
    """Delete a recurring event and optionally its future occurrences.

    Raises HTTP 404 when the event is missing or owned by another user.
    """
    stmt = select(Event).where(
        Event.id == event_id,
        Event.user_id == current_user.id
    )
    result = await db.execute(stmt)
    event = result.scalar_one_or_none()

    if not event:
        raise HTTPException(
            status_code=404,
            detail="Event not found or you don't have permission to delete it"
        )

    # Only expand to the whole series when a recurrence_group exists: with
    # a NULL group, `== None` renders as IS NULL and the bulk delete would
    # remove every NULL-group event of this user.
    if delete_future and event.is_recurring and event.recurrence_group is not None:
        delete_stmt = delete(Event).where(
            Event.recurrence_group == event.recurrence_group,
            Event.sequence_number >= event.sequence_number,
            Event.user_id == current_user.id
        )
        await db.execute(delete_stmt)
    else:
        await db.delete(event)

    await db.commit()
    return {"success": True}
+
@router.get("/recurring-events/upcoming")
async def get_upcoming_recurring_events(
    days: int = 30,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db)
) -> List[Dict[str, Any]]:
    """Get recurring events starting within the next ``days`` days.

    Bug fixed: the query previously bounded ``start_time`` only from above,
    so every *past* recurring event was also returned as "upcoming".
    """
    if days <= 0 or days > 365:
        raise HTTPException(
            status_code=400,
            detail="Days parameter must be between 1 and 365"
        )

    now = datetime.utcnow()
    stmt = select(Event).where(
        Event.user_id == current_user.id,
        Event.start_time >= now,                       # exclude past events
        Event.start_time <= now + timedelta(days=days),
        Event.is_recurring == True
    ).order_by(Event.start_time)

    result = await db.execute(stmt)
    return result.scalars().all()
\ No newline at end of file
diff --git a/app/api/users.py b/app/api/users.py
new file mode 100644
index 0000000000000000000000000000000000000000..a51c26bdcf21dfcafb8068014cc95108a8a96d8f
--- /dev/null
+++ b/app/api/users.py
@@ -0,0 +1,120 @@
+from fastapi import APIRouter, HTTPException, status, Depends
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from typing import List, Optional
+from ..db.database import get_db
+from ..db.models import User
+from ..core.dependencies import get_current_superuser, get_current_active_user
+from ..core.security import get_password_hash
+
+router = APIRouter()
+
@router.get("/me", response_model=User)
async def read_user_me(current_user: User = Depends(get_current_active_user)):
    """Return the authenticated (and active) user's own record."""
    return current_user
+
@router.get("/", response_model=List[User])
async def list_users(
    skip: int = 0,
    limit: int = 10,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db)
) -> List[User]:
    """Page through all user accounts; restricted to superusers."""
    page = await db.execute(select(User).offset(skip).limit(limit))
    return page.scalars().all()
+
@router.post("/", response_model=User)
async def create_user(
    user: User,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Create a user account (superuser only); rejects duplicate emails.

    NOTE(review): ``User`` is the SQLAlchemy model, not a Pydantic schema,
    so FastAPI cannot deserialize it as a request body — and the mapped
    model has no ``password`` attribute, so the ``hasattr`` branch below
    can never hash anything. ``response_model=User`` would also expose
    ``hashed_password``. A create/read schema pair is needed; confirm the
    intended contract.
    """
    # Check if email exists
    stmt = select(User).where(User.email == user.email)
    result = await db.execute(stmt)
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered"
        )

    # Hash password if provided
    if hasattr(user, "password"):
        user.hashed_password = get_password_hash(user.password)
        delattr(user, "password")

    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user
+
@router.put("/{user_id}", response_model=User)
async def update_user(
    user_id: int,
    user_update: User,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Apply the provided fields to an existing user (superuser only).

    NOTE(review): ``user_update`` is typed as the SQLAlchemy ``User`` model,
    which has no ``.dict()`` method — the call below will raise
    ``AttributeError`` at runtime. A Pydantic update schema is required;
    confirm against the intended API contract.
    """
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    db_user = result.scalar_one_or_none()

    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")

    # Update user fields
    update_data = user_update.dict(exclude_unset=True)
    # A plaintext "password" field is swapped for its hash before applying.
    if "password" in update_data:
        update_data["hashed_password"] = get_password_hash(update_data.pop("password"))

    for field, value in update_data.items():
        setattr(db_user, field, value)

    await db.commit()
    await db.refresh(db_user)
    return db_user
+
@router.delete("/{user_id}")
async def delete_user(
    user_id: int,
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db)
):
    """Permanently remove a user account; restricted to superusers."""
    target = (
        await db.execute(select(User).where(User.id == user_id))
    ).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="User not found")
    await db.delete(target)
    await db.commit()
    return {"status": "success", "message": "User deleted"}
+
@router.put("/{user_id}/roles", response_model=User)
async def update_user_roles(
    user_id: int,
    roles: List[str],
    current_user: User = Depends(get_current_superuser),
    db: AsyncSession = Depends(get_db)
) -> User:
    """Replace a user's role list; every entry must be a known role name."""
    allowed = {"user", "admin", "manager", "support"}
    unknown = [role for role in roles if role not in allowed]
    if unknown:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid roles: {', '.join(unknown)}"
        )

    target = (
        await db.execute(select(User).where(User.id == user_id))
    ).scalar_one_or_none()
    if target is None:
        raise HTTPException(status_code=404, detail="User not found")

    target.roles = roles
    await db.commit()
    await db.refresh(target)
    return target
\ No newline at end of file
diff --git a/app/core/__init__.py b/app/core/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/app/core/__pycache__/__init__.cpython-312.pyc b/app/core/__pycache__/__init__.cpython-312.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..288a6d737963390170f67314e95e3c00c29e7f20
Binary files /dev/null and b/app/core/__pycache__/__init__.cpython-312.pyc differ
diff --git a/app/core/__pycache__/config.cpython-312.pyc b/app/core/__pycache__/config.cpython-312.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..5cdaa11d651d5f55e4f3e97b471f9c4bc569b2a6
Binary files /dev/null and b/app/core/__pycache__/config.cpython-312.pyc differ
diff --git a/app/core/config.py b/app/core/config.py
new file mode 100644
index 0000000000000000000000000000000000000000..88f6e8b719b80d3f322a4f5e9ceb9d7fe78d015a
--- /dev/null
+++ b/app/core/config.py
@@ -0,0 +1,34 @@
+from pydantic_settings import BaseSettings
+from typing import Optional
+
class Settings(BaseSettings):
    """Application configuration; every field can be overridden via the
    environment or the project's .env file.

    SECURITY(review): the database URL and mail settings below embed real
    credentials that are committed to the repository (also in .env and
    alembic.ini) — they must be treated as leaked and rotated, and the
    defaults here should be non-secret placeholders.
    """
    API_V1_STR: str = "/api/v1"
    PROJECT_NAME: str = "Admin Dashboard"
    VERSION: str = "1.0.0"

    # PostgreSQL Database settings
    DATABASE_URL: str = "postgresql+asyncpg://postgres:Lovyelias5584.@db.mqyrkmsdgugdhxiucukb.supabase.co:5432/postgres"

    # JWT Settings
    SECRET_KEY: str = "your-secret-key-here"  # Change in production
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    # Redis settings
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379

    # Email settings
    MAIL_USERNAME: str = "yungdml31@gmail.com"
    MAIL_PASSWORD: str = ""
    MAIL_FROM: str = "admin@angelo.com"
    MAIL_PORT: int = 587
    MAIL_SERVER: str = "smtp.gmail.com"

    # Frontend URL for email links
    FRONTEND_URL: str = "http://localhost:3000"

    class Config:
        case_sensitive = True
        # Fix: the repo ships a .env with overrides, but without env_file
        # pydantic-settings never reads it — only process env vars applied.
        env_file = ".env"

settings = Settings()
\ No newline at end of file
diff --git a/app/core/dependencies.py b/app/core/dependencies.py
new file mode 100644
index 0000000000000000000000000000000000000000..21f26817e11c293a9b64c010d016224b307eb28a
--- /dev/null
+++ b/app/core/dependencies.py
@@ -0,0 +1,52 @@
+from fastapi import Depends, HTTPException, status
+from fastapi.security import OAuth2PasswordBearer
+from sqlalchemy.ext.asyncio import AsyncSession
+from sqlalchemy import select
+from jose import JWTError, jwt
+from ..db.database import get_db
+from ..db.models import User
+from ..core.config import settings
+
# Bearer-token scheme; tokens are issued by the auth router's login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"{settings.API_V1_STR}/auth/login")
+
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
):
    """Resolve the bearer token to its User row.

    Raises HTTP 401 for any invalid token. Fix: a missing or non-numeric
    "sub" claim previously made ``int()`` raise an unhandled ValueError,
    surfacing as a 500 instead of 401.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        # Tokens carry the user id as a string in "sub"; a malformed claim
        # (missing, non-numeric, wrong type) is just an invalid token.
        user_id = int(payload["sub"])
    except (JWTError, KeyError, TypeError, ValueError):
        raise credentials_exception

    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if user is None:
        raise credentials_exception
    return user
+
async def get_current_active_user(
    current_user: User = Depends(get_current_user)
):
    """Resolve the authenticated user and reject deactivated accounts."""
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
+
async def get_current_superuser(
    current_user: User = Depends(get_current_user)
):
    """Resolve the authenticated user and require superuser privileges."""
    if current_user.is_superuser:
        return current_user
    raise HTTPException(
        status_code=403, detail="The user doesn't have enough privileges"
    )
\ No newline at end of file
diff --git a/app/core/security.py b/app/core/security.py
new file mode 100644
index 0000000000000000000000000000000000000000..46fc5e23ac4d66ef3ee48be79907c69d76de179e
--- /dev/null
+++ b/app/core/security.py
@@ -0,0 +1,23 @@
+from datetime import datetime, timedelta
+from typing import Any, Optional
+from jose import jwt
+from passlib.context import CryptContext
+from .config import settings
+
# Passlib context: bcrypt hashes; older schemes are auto-marked deprecated.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
+
def create_access_token(subject: Any, expires_delta: Optional[timedelta] = None) -> str:
    """Return a signed JWT whose ``sub`` claim is ``str(subject)``.

    Fixes: uses timezone-aware UTC (naive ``datetime.utcnow()`` is
    deprecated and ambiguous in the ``exp`` claim), and checks
    ``expires_delta is not None`` so an explicit ``timedelta(0)`` is
    honored instead of silently falling back to the default lifetime.
    """
    from datetime import timezone  # module top imports only datetime/timedelta

    now = datetime.now(timezone.utc)
    if expires_delta is not None:
        expire = now + expires_delta
    else:
        expire = now + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = {"exp": expire, "sub": str(subject)}
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
+
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True if *plain_password* matches the stored bcrypt hash."""
    return pwd_context.verify(plain_password, hashed_password)
+
def get_password_hash(password: str) -> str:
    """Return a salted bcrypt hash of *password* for storage."""
    return pwd_context.hash(password)
\ No newline at end of file
diff --git a/app/db/__init__.py b/app/db/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/app/db/__pycache__/__init__.cpython-312.pyc b/app/db/__pycache__/__init__.cpython-312.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..c796f745236bde10c1c2f824f6751277c5b7885b
Binary files /dev/null and b/app/db/__pycache__/__init__.cpython-312.pyc differ
diff --git a/app/db/__pycache__/database.cpython-312.pyc b/app/db/__pycache__/database.cpython-312.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..31dd7bb71bacb6c16fd9737030a01d8127f47f01
Binary files /dev/null and b/app/db/__pycache__/database.cpython-312.pyc differ
diff --git a/app/db/__pycache__/models.cpython-312.pyc b/app/db/__pycache__/models.cpython-312.pyc
new file mode 100644
index 0000000000000000000000000000000000000000..bc7793aa1839ecd15ac7c3dbfbcb24b1ea65f0e5
Binary files /dev/null and b/app/db/__pycache__/models.cpython-312.pyc differ
diff --git a/app/db/database.py b/app/db/database.py
new file mode 100644
index 0000000000000000000000000000000000000000..4249bed35c05a514f8ab69841051fa8a5083230b
--- /dev/null
+++ b/app/db/database.py
@@ -0,0 +1,30 @@
+from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
+from sqlalchemy.orm import sessionmaker, declarative_base
+from sqlalchemy import create_engine
+from ..core.config import settings
+
# Async engine used by the FastAPI app. echo=True logs every SQL statement;
# consider disabling it in production.
async_engine = create_async_engine(
    settings.DATABASE_URL,
    echo=True,
    future=True,
    pool_pre_ping=True
)

# Backward-compatible alias: app.main does `from .db.database import engine`,
# which fails because this module only defined `async_engine`.
engine = async_engine

# Session factory; expire_on_commit=False keeps ORM objects usable after
# commit (needed because handlers return them post-commit).
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)

# Declarative base shared by all ORM models.
Base = declarative_base()
+
# Database dependency
async def get_db():
    """Yield one AsyncSession per request.

    `async with` already closes the session on exit, so no explicit
    try/finally close() is required.
    """
    async with AsyncSessionLocal() as session:
        yield session
\ No newline at end of file
diff --git a/app/db/init_db.py b/app/db/init_db.py
new file mode 100644
index 0000000000000000000000000000000000000000..f0adbde49333096ce2841cc1cf1172a322a7474c
--- /dev/null
+++ b/app/db/init_db.py
@@ -0,0 +1,77 @@
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from ..core.config import settings
+from ..core.security import get_password_hash
+from datetime import datetime
+from .models import Base, User, Product
+import asyncio
+
def init_db():
    """Create all tables and seed a default admin user plus category rows.

    SECURITY(review): the seeded password ("admin123") must be changed in
    any real deployment.
    """
    # Synchronous engine for one-off initialization: strip the async driver
    # suffix so the plain psycopg2 dialect is used.
    engine = create_engine(
        settings.DATABASE_URL.replace("+asyncpg", ""),
        echo=True
    )

    # Create all tables
    Base.metadata.create_all(bind=engine)

    # Create session
    SessionLocal = sessionmaker(bind=engine)
    session = SessionLocal()

    try:
        # Create default admin user if not exists
        admin_user = session.query(User).filter_by(email="admin@example.com").first()
        if not admin_user:
            admin_user = User(
                email="admin@example.com",
                username="admin",
                full_name="System Administrator",
                hashed_password=get_password_hash("admin123"),  # Change in production
                is_active=True,
                is_superuser=True,
                roles=["admin"],
                created_at=datetime.utcnow()
            )
            session.add(admin_user)
            # Fix: flush so admin_user.id is assigned now — it is used below
            # as seller_id and would otherwise still be None.
            session.flush()
            print("Created default admin user.")

        # Create default product categories as products
        categories = [
            "Soups & Stews",
            "Rice Dishes",
            "Swallow & Fufu",
            "Snacks & Small Chops",
            "Protein & Meat",
            "Drinks"
        ]

        for category in categories:
            exists = session.query(Product).filter_by(name=category).first()
            if not exists:
                product = Product(
                    name=category,
                    description=f"Category: {category}",
                    price=0.0,  # Category products have zero price
                    category=category,
                    inventory_count=0,  # Categories don't have inventory
                    seller_id=admin_user.id,  # Link to admin user
                    created_at=datetime.utcnow()
                )
                session.add(product)

        print("Initialized product categories.")

        # Commit changes
        session.commit()

    except Exception as e:
        print(f"Error during initialization: {e}")
        session.rollback()
        raise
    finally:
        session.close()
+
+if __name__ == "__main__":
+ init_db()
\ No newline at end of file
diff --git a/app/db/models.py b/app/db/models.py
new file mode 100644
index 0000000000000000000000000000000000000000..6b8f8aabd981fc3b05a4a8a4d7fd9c6734496c5f
--- /dev/null
+++ b/app/db/models.py
@@ -0,0 +1,131 @@
+from sqlalchemy import Column, Integer, String, Boolean, DateTime, Float, ForeignKey, ARRAY, JSON, Table
+from sqlalchemy.orm import relationship, mapped_column, Mapped
+from sqlalchemy.dialects.postgresql import JSONB
+from datetime import datetime
+from typing import List, Optional
+from .database import Base
+
# Association tables for many-to-many relationships
# NOTE(review): no `roles` table/model is defined in this module, so the
# ForeignKey('roles.id') below cannot resolve — metadata.create_all will
# fail on it unless a Role model exists elsewhere (verify). The table is
# also unused: User stores roles inline as an ARRAY column.
user_roles = Table(
    'user_roles',
    Base.metadata,
    Column('user_id', Integer, ForeignKey('users.id')),
    Column('role_id', Integer, ForeignKey('roles.id'))
)
+
class User(Base):
    """Account row; acts as product seller and order customer."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str] = mapped_column(String, unique=True, index=True)
    username: Mapped[str] = mapped_column(String, unique=True, index=True)
    full_name: Mapped[str] = mapped_column(String)
    # Bcrypt hash from core.security.get_password_hash; never plaintext.
    hashed_password: Mapped[str] = mapped_column(String)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    is_superuser: Mapped[bool] = mapped_column(Boolean, default=False)
    # Role names stored inline as a Postgres ARRAY (not via user_roles).
    roles: Mapped[List[str]] = mapped_column(ARRAY(String), default=list)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

    # Relationships
    products = relationship("Product", back_populates="seller")
    orders = relationship("Order", back_populates="customer")
    notifications = relationship("Notification", back_populates="user")
+
class Product(Base):
    """Catalog item owned by a seller; referenced by order line items."""

    __tablename__ = "products"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String, index=True)
    description: Mapped[str] = mapped_column(String)
    price: Mapped[float] = mapped_column(Float)
    category: Mapped[str] = mapped_column(String, index=True)
    inventory_count: Mapped[int] = mapped_column(Integer)
    seller_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    # Refreshed automatically by onupdate on every UPDATE.
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow
    )

    # Relationships
    seller = relationship("User", back_populates="products")
    order_items = relationship("OrderItem", back_populates="product")
+
class Order(Base):
    """Customer order; line items live in OrderItem and are cascade-deleted."""

    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(primary_key=True)
    customer_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    total_amount: Mapped[float] = mapped_column(Float)
    # One of: pending, processing, shipped, delivered, cancelled
    # (enforced in the API layer, not by the database).
    status: Mapped[str] = mapped_column(String)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow
    )

    # Relationships
    customer = relationship("User", back_populates="orders")
    # delete-orphan: removing an order removes its line items too.
    items = relationship("OrderItem", back_populates="order", cascade="all, delete-orphan")
+
class OrderItem(Base):
    """One product line within an order; price is captured at purchase time."""

    __tablename__ = "order_items"

    id: Mapped[int] = mapped_column(primary_key=True)
    order_id: Mapped[int] = mapped_column(ForeignKey("orders.id"))
    product_id: Mapped[int] = mapped_column(ForeignKey("products.id"))
    quantity: Mapped[int] = mapped_column(Integer)
    price: Mapped[float] = mapped_column(Float)

    # Relationships
    order = relationship("Order", back_populates="items")
    product = relationship("Product", back_populates="order_items")
+
class Notification(Base):
    """Per-user notification with an optional JSON payload."""

    __tablename__ = "notifications"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    title: Mapped[str] = mapped_column(String)
    message: Mapped[str] = mapped_column(String)
    type: Mapped[str] = mapped_column(String)
    # Arbitrary structured payload; schema is defined by the producer.
    data: Mapped[Optional[dict]] = mapped_column(JSONB)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    read: Mapped[bool] = mapped_column(Boolean, default=False)

    # Relationship
    user = relationship("User", back_populates="notifications")
+
class Event(Base):
    """Calendar event; recurring series share a recurrence_group id.

    NOTE(review): the scheduler's create endpoint never populates
    recurrence_group, yet its update/delete endpoints query by it —
    verify the series fields are actually written somewhere.
    """

    __tablename__ = "events"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
    title: Mapped[str] = mapped_column(String)
    description: Mapped[str] = mapped_column(String)
    start_time: Mapped[datetime] = mapped_column(DateTime)
    end_time: Mapped[datetime] = mapped_column(DateTime)
    attendees: Mapped[List[str]] = mapped_column(ARRAY(String), default=list)
    is_all_day: Mapped[bool] = mapped_column(Boolean, default=False)
    reminder_minutes: Mapped[int] = mapped_column(Integer)
    status: Mapped[str] = mapped_column(String)
    # Maps attendee identifier -> response; exact schema set by callers.
    attendee_responses: Mapped[dict] = mapped_column(JSONB, default=dict)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow
    )
    # Fields for recurring events
    is_recurring: Mapped[bool] = mapped_column(Boolean, default=False)
    # daily / weekly / monthly / yearly (validated in the API layer).
    recurrence_pattern: Mapped[Optional[str]] = mapped_column(String)
    # Shared id tying all occurrences of one series together.
    recurrence_group: Mapped[Optional[str]] = mapped_column(String)
    recurrence_end_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
    parent_event_id: Mapped[Optional[int]] = mapped_column(Integer)
    # Position of this occurrence within its series (0-based).
    sequence_number: Mapped[Optional[int]] = mapped_column(Integer)
    reminder_sent: Mapped[bool] = mapped_column(Boolean, default=False)

    # Relationship
    user = relationship("User")
\ No newline at end of file
diff --git a/app/db/schemas.py b/app/db/schemas.py
new file mode 100644
index 0000000000000000000000000000000000000000..1f7937b740585151f1d45f174d7ca4beb6ef83d3
--- /dev/null
+++ b/app/db/schemas.py
@@ -0,0 +1,63 @@
+from sqlalchemy.orm import validates
+from sqlalchemy import event
+from datetime import datetime
+from .models import User, Product, Order, Event, Notification
+import re
+
# NOTE(review): @validates only registers a validator when applied to a
# method defined *inside* a mapped class. Decorating these free functions
# (and assigning them onto the classes at the bottom of this module) does
# not hook them into SQLAlchemy — these checks never run. Move them into
# the model class bodies to activate them.
@validates('email')
def validate_email(self, key, email):
    """Reject values that do not match a basic email-address pattern."""
    if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
        raise ValueError('Invalid email address')
    return email

@validates('username')
def validate_username(self, key, username):
    """Require usernames of at least three characters."""
    if len(username) < 3:
        raise ValueError('Username must be at least 3 characters long')
    return username

@validates('inventory_count')
def validate_inventory(self, key, count):
    """Disallow negative stock counts."""
    if count < 0:
        raise ValueError('Inventory count cannot be negative')
    return count

@validates('price')
def validate_price(self, key, price):
    """Disallow negative prices."""
    if price < 0:
        raise ValueError('Price cannot be negative')
    return price
+
# Event listeners for automatic timestamps
# NOTE(review): Product/Order/Event already declare default=/onupdate=
# datetime.utcnow on these columns, so the listeners below are largely
# redundant — and the before_insert hooks additionally overwrite any
# created_at value a caller supplied explicitly (init_db does this).
@event.listens_for(Product, 'before_insert')
def set_created_at(mapper, connection, target):
    target.created_at = datetime.utcnow()
    target.updated_at = datetime.utcnow()

@event.listens_for(Product, 'before_update')
def set_updated_at(mapper, connection, target):
    target.updated_at = datetime.utcnow()

@event.listens_for(Order, 'before_insert')
def set_order_created_at(mapper, connection, target):
    target.created_at = datetime.utcnow()
    target.updated_at = datetime.utcnow()

@event.listens_for(Order, 'before_update')
def set_order_updated_at(mapper, connection, target):
    target.updated_at = datetime.utcnow()

@event.listens_for(Event, 'before_insert')
def set_event_created_at(mapper, connection, target):
    target.created_at = datetime.utcnow()
    target.updated_at = datetime.utcnow()

@event.listens_for(Event, 'before_update')
def set_event_updated_at(mapper, connection, target):
    target.updated_at = datetime.utcnow()

# Add validators to models
# NOTE(review): plain attribute assignment does not register @validates
# hooks with the mapper — these four lines have no effect (see above).
User.validate_email = validate_email
User.validate_username = validate_username
Product.validate_inventory = validate_inventory
Product.validate_price = validate_price
\ No newline at end of file
diff --git a/app/main.py b/app/main.py
new file mode 100644
index 0000000000000000000000000000000000000000..6ab22e731e4e4428da5e16e1011e93fc79e0daa0
--- /dev/null
+++ b/app/main.py
@@ -0,0 +1,108 @@
+from fastapi import FastAPI, Request, WebSocket
+from fastapi.middleware.cors import CORSMiddleware
+from .core.config import settings
+from .db.database import engine, Base
+from .api import auth, products, orders, users, analytics, files, notifications, calendar, scheduler, maintenance
+from .utils.rate_limiter import rate_limiter
+from .utils.logger import log_api_request
+from .utils.tasks import run_periodic_tasks
+import time
+import logging
+import asyncio
+from typing import List
+
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title=settings.PROJECT_NAME, version=settings.VERSION)

# Store active WebSocket connections and background tasks
active_connections: List[WebSocket] = []
background_tasks = set()

# Configure CORS
# NOTE(review): the CORS spec forbids combining a wildcard origin with
# credentials; Starlette will not echo "*" when allow_credentials=True,
# so browsers will reject credentialed cross-origin requests. List the
# real origins explicitly for production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
+
# WebSocket connection manager
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a client and keep it registered until it disconnects.

    Fix: the original bare ``except:`` swallowed every exception —
    including ``CancelledError`` — and only deregistered on error paths.
    """
    from fastapi import WebSocketDisconnect  # not imported at module top

    await websocket.accept()
    active_connections.append(websocket)
    try:
        while True:
            # Inbound messages are read only to keep the connection alive;
            # their content is ignored.
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass  # normal client disconnect
    finally:
        # Deregister exactly once, on any exit path.
        if websocket in active_connections:
            active_connections.remove(websocket)
+
# Notification broadcaster
async def broadcast_notification(message: dict):
    """Send `message` to every connected client, pruning dead connections.

    Fix: the original removed entries from ``active_connections`` while
    iterating the same list, which skips the element after each removal.
    Iterate a snapshot and prune afterwards.
    """
    dead = []
    for connection in list(active_connections):
        try:
            await connection.send_json(message)
        except Exception:  # send failed: connection closed or broken
            dead.append(connection)
    for connection in dead:
        if connection in active_connections:
            active_connections.remove(connection)
+
# Request logging and rate limiting middleware
@app.middleware("http")
async def middleware(request: Request, call_next):
    """Enforce the rate limit, then time and log every request."""
    await rate_limiter.check_rate_limit(request)
    started = time.time()
    response = await call_next(request)
    log_api_request(
        method=request.method,
        path=request.url.path,
        status_code=response.status_code,
        duration=time.time() - started
    )
    return response
+
# Application startup and shutdown events
# NOTE(review): @app.on_event is deprecated in recent FastAPI in favor of
# lifespan handlers — confirm the pinned FastAPI version before migrating.
@app.on_event("startup")
async def startup_event():
    """Create DB tables and launch the periodic background task."""
    # Create all database tables
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # Start background tasks; keep a strong reference so the task is not
    # garbage-collected, and drop it automatically when it finishes.
    task = asyncio.create_task(run_periodic_tasks())
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
+
@app.on_event("shutdown")
async def shutdown_event():
    """Stop background work and close client connections on shutdown."""
    # Cancel background tasks (cancellation is requested, not awaited here).
    for task in background_tasks:
        task.cancel()

    # Close WebSocket connections
    for connection in active_connections:
        await connection.close()
+
# Include routers — every feature area is mounted under the versioned
# API prefix (settings.API_V1_STR), grouped by OpenAPI tag.
app.include_router(auth.router, prefix=f"{settings.API_V1_STR}/auth", tags=["auth"])
app.include_router(users.router, prefix=f"{settings.API_V1_STR}/users", tags=["users"])
app.include_router(products.router, prefix=f"{settings.API_V1_STR}/products", tags=["products"])
app.include_router(orders.router, prefix=f"{settings.API_V1_STR}/orders", tags=["orders"])
app.include_router(analytics.router, prefix=f"{settings.API_V1_STR}/analytics", tags=["analytics"])
app.include_router(files.router, prefix=f"{settings.API_V1_STR}/files", tags=["files"])
app.include_router(notifications.router, prefix=f"{settings.API_V1_STR}/notifications", tags=["notifications"])
app.include_router(calendar.router, prefix=f"{settings.API_V1_STR}/calendar", tags=["calendar"])
app.include_router(scheduler.router, prefix=f"{settings.API_V1_STR}/scheduler", tags=["scheduler"])
app.include_router(maintenance.router, prefix=f"{settings.API_V1_STR}/maintenance", tags=["maintenance"])
+
@app.get("/")
async def root():
    """Service banner pointing at the interactive API documentation."""
    banner = f"Welcome to {settings.PROJECT_NAME} v{settings.VERSION}"
    return {
        "message": banner,
        "docs_url": "/docs",
        "openapi_url": "/openapi.json"
    }
\ No newline at end of file
diff --git a/app/schemas/events.py b/app/schemas/events.py
new file mode 100644
index 0000000000000000000000000000000000000000..4225d12daace193ecec9bddcc6a1428b8f383830
--- /dev/null
+++ b/app/schemas/events.py
@@ -0,0 +1,87 @@
+from pydantic import BaseModel, validator
+from typing import List, Optional, Dict, Any
+from datetime import datetime
+
class EventBase(BaseModel):
    """Fields shared by every calendar-event payload."""

    title: str
    description: str
    start_time: datetime
    end_time: datetime
    is_all_day: bool = False
    reminder_minutes: int = 30

    @validator('end_time')
    def end_time_after_start_time(cls, v, values):
        """Reject events that end before (or exactly when) they start."""
        start = values.get('start_time')
        if start is not None and v <= start:
            raise ValueError('end_time must be after start_time')
        return v

    @validator('reminder_minutes')
    def valid_reminder_minutes(cls, v):
        """A reminder is an offset before the event; negatives make no sense."""
        if v < 0:
            raise ValueError('reminder_minutes cannot be negative')
        return v
+
class EventCreate(EventBase):
    """Payload for creating a one-off event."""

    # User ids to invite. The shared [] literal is safe here: pydantic
    # deep-copies field defaults per instance.
    attendees: List[str] = []
+
class EventUpdate(BaseModel):
    """Partial-update payload: every field is optional, None means 'unchanged'."""

    title: Optional[str] = None
    description: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    is_all_day: Optional[bool] = None
    reminder_minutes: Optional[int] = None
    attendees: Optional[List[str]] = None

    @validator('reminder_minutes')
    def valid_reminder_minutes(cls, v):
        """Allow None (field untouched); reject negative offsets otherwise."""
        if v is None:
            return v
        if v < 0:
            raise ValueError('reminder_minutes cannot be negative')
        return v
+
class EventInDB(EventBase):
    """Full event document as stored in / returned from the database."""

    # Stringified Mongo ObjectId of the event document.
    id: str
    # Creator of the event; the calendar service only lets this user
    # update or delete it.
    user_id: str
    attendees: List[str]
    # Lifecycle status; the calendar service creates events as "scheduled".
    status: str
    # Per-attendee RSVP map: user_id -> "accepted" | "declined" | "maybe"
    # (the values accepted by CalendarService.respond_to_event).
    attendee_responses: Dict[str, str]
    created_at: datetime
    updated_at: Optional[datetime] = None
    reminder_sent: bool = False
    # Recurring-series bookkeeping; all None/False for one-off events.
    is_recurring: bool = False
    recurrence_group: Optional[str] = None
    parent_event_id: Optional[str] = None
    sequence_number: Optional[int] = None

    class Config:
        # Allow construction from attribute-style objects (pydantic v1 name;
        # renamed to from_attributes in pydantic v2).
        orm_mode = True
+
class RecurringEventCreate(EventCreate):
    """Event-creation payload extended with a recurrence rule."""

    recurrence_pattern: str
    recurrence_end_date: Optional[datetime] = None

    @validator('recurrence_pattern')
    def valid_recurrence_pattern(cls, v):
        """Only the four supported cadences are accepted."""
        valid_patterns = ['daily', 'weekly', 'monthly', 'yearly']
        if v not in valid_patterns:
            raise ValueError(f'recurrence_pattern must be one of: {", ".join(valid_patterns)}')
        return v

    @validator('recurrence_end_date')
    def end_date_after_start_time(cls, v, values):
        """A recurrence end date, when given, must fall after the series start."""
        if v is None:
            return v
        if 'start_time' in values and v <= values['start_time']:
            raise ValueError('recurrence_end_date must be after start_time')
        return v
+
class RecurringEventUpdate(EventUpdate):
    """Partial update for a recurring event; recurrence fields also optional."""

    recurrence_pattern: Optional[str] = None
    recurrence_end_date: Optional[datetime] = None

    @validator('recurrence_pattern')
    def valid_recurrence_pattern(cls, v):
        """Allow None (unchanged); otherwise enforce the supported cadences."""
        if v is None:
            return v
        valid_patterns = ['daily', 'weekly', 'monthly', 'yearly']
        if v not in valid_patterns:
            raise ValueError(f'recurrence_pattern must be one of: {", ".join(valid_patterns)}')
        return v
\ No newline at end of file
diff --git a/app/services/analytics.py b/app/services/analytics.py
new file mode 100644
index 0000000000000000000000000000000000000000..b6baba6e91aeca5da0efdd1560068dbeec0ae39a
--- /dev/null
+++ b/app/services/analytics.py
@@ -0,0 +1,131 @@
+from datetime import datetime, timedelta
+from ..db.database import db
+from ..utils.cache import cache
+from typing import Dict, List, Any
+
class AnalyticsService:
    """Aggregated reporting over the orders/products collections.

    Every report is cached in Redis for one hour under a deterministic key;
    callers receive the cached dict verbatim.
    """

    @staticmethod
    async def get_sales_analytics(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
        """Return per-day sales plus revenue/order totals for the date range.

        Only orders in a finished state ("completed"/"delivered") count.
        """
        cache_key = f"sales_analytics:{start_date.date()}:{end_date.date()}"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            {
                "$match": {
                    "created_at": {"$gte": start_date, "$lte": end_date},
                    "status": {"$in": ["completed", "delivered"]},
                }
            },
            {
                "$group": {
                    "_id": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}},
                    "total_sales": {"$sum": "$total_amount"},
                    "order_count": {"$sum": 1},
                }
            },
            {"$sort": {"_id": 1}},
        ]

        sales_data = await db.db["orders"].aggregate(pipeline).to_list(None)

        # Compute each total once instead of re-summing the list per field.
        total_revenue = sum(day["total_sales"] for day in sales_data)
        total_orders = sum(day["order_count"] for day in sales_data)
        result = {
            "daily_sales": sales_data,
            "total_revenue": total_revenue,
            "total_orders": total_orders,
            # `or 1` guards against division by zero on an empty range.
            "average_order_value": total_revenue / (total_orders or 1),
        }

        await cache.set_cache(cache_key, result, expire=3600)  # Cache for 1 hour
        return result

    @staticmethod
    async def get_product_analytics() -> Dict[str, Any]:
        """Return the ten top-grossing products plus inventory counters."""
        cache_key = "product_analytics"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        # Explode each order's line items, then total quantity and revenue
        # per product id.
        pipeline = [
            {"$unwind": "$products"},
            {
                "$group": {
                    "_id": "$products.product_id",
                    "total_quantity": {"$sum": "$products.quantity"},
                    "total_revenue": {
                        "$sum": {"$multiply": ["$products.price", "$products.quantity"]}
                    },
                }
            },
            {"$sort": {"total_revenue": -1}},
            {"$limit": 10},
        ]

        top_products = await db.db["orders"].aggregate(pipeline).to_list(None)

        # Attach names/categories with one batched $in query instead of one
        # round-trip per product (N+1 lookups in the original).
        product_ids = [p["_id"] for p in top_products]
        details_by_id = {
            d["_id"]: d
            for d in await db.db["products"].find({"_id": {"$in": product_ids}}).to_list(None)
        }
        for product in top_products:
            detail = details_by_id.get(product["_id"])
            if detail:
                product["name"] = detail["name"]
                product["category"] = detail["category"]

        result = {
            "top_products": top_products,
            "total_products": await db.db["products"].count_documents({}),
            "low_stock_products": await db.db["products"].count_documents(
                {"inventory_count": {"$lt": 10}}
            ),
        }

        await cache.set_cache(cache_key, result, expire=3600)  # Cache for 1 hour
        return result

    @staticmethod
    async def get_customer_analytics() -> Dict[str, Any]:
        """Return customer counts, top spenders and spend-band segments."""
        cache_key = "customer_analytics"
        cached_data = await cache.get_cache(cache_key)
        if cached_data:
            return cached_data

        pipeline = [
            {
                "$group": {
                    "_id": "$customer_id",
                    "total_orders": {"$sum": 1},
                    "total_spent": {"$sum": "$total_amount"},
                    "last_order": {"$max": "$created_at"},
                }
            },
            {"$sort": {"total_spent": -1}},
        ]

        customer_data = await db.db["orders"].aggregate(pipeline).to_list(None)

        # Extract the spend figures once instead of one comprehension per metric.
        spent = [c["total_spent"] for c in customer_data]
        result = {
            "total_customers": len(customer_data),
            "top_customers": customer_data[:10],  # pipeline already sorts descending
            "average_customer_value": sum(spent) / (len(spent) or 1),
            "customer_segments": {
                "high_value": sum(1 for s in spent if s > 1000),
                "medium_value": sum(1 for s in spent if 500 <= s <= 1000),
                "low_value": sum(1 for s in spent if s < 500),
            },
        }

        await cache.set_cache(cache_key, result, expire=3600)  # Cache for 1 hour
        return result

# Module-level singleton used by the routers.
analytics = AnalyticsService()
\ No newline at end of file
diff --git a/app/services/backup.py b/app/services/backup.py
new file mode 100644
index 0000000000000000000000000000000000000000..5417a540a3a4d67f8ecaee97c8b19f90d7c536d2
--- /dev/null
+++ b/app/services/backup.py
@@ -0,0 +1,179 @@
+import os
+import shutil
+import json
+import tarfile
+from datetime import datetime
+from typing import Dict, Any, List
+from bson import ObjectId
+from ..db.database import db
+from ..utils.logger import logger
+
class BackupService:
    """Create, restore, list and delete whole-system backups.

    A backup is a .tar.gz archive containing a JSON dump of every MongoDB
    collection plus (optionally) the uploads directory. Metadata about each
    archive is tracked in the `backup_history` collection, keyed by a
    *string* id (not an ObjectId).
    """

    def __init__(self):
        # All archives live under ./backups relative to the working directory.
        self.backup_dir = "backups"
        self._ensure_backup_dir()

    def _ensure_backup_dir(self):
        """Ensure backup directory exists"""
        if not os.path.exists(self.backup_dir):
            os.makedirs(self.backup_dir)

    async def create_backup(self, include_files: bool = True) -> Dict[str, Any]:
        """Create a new system backup.

        Returns a dict with id/path/size/created_at; raises on failure after
        logging.
        """
        try:
            timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            backup_id = str(ObjectId())
            backup_name = f"backup_{timestamp}_{backup_id}"
            backup_path = os.path.join(self.backup_dir, backup_name)

            # Staging directory; removed once the archive is written.
            os.makedirs(backup_path, exist_ok=True)

            # Dump every collection. ObjectIds are stringified so the dump is
            # valid JSON; restore_backup converts them back.
            db_backup = {}
            for collection in await db.db.list_collection_names():
                docs = await db.db[collection].find().to_list(None)
                db_backup[collection] = [
                    {**doc, "_id": str(doc["_id"])}
                    for doc in docs
                ]

            # default=str covers datetimes and other non-JSON values.
            with open(os.path.join(backup_path, "database.json"), "w") as f:
                json.dump(db_backup, f, default=str)

            # Copy uploaded files if requested.
            if include_files:
                uploads_dir = "uploads"
                if os.path.exists(uploads_dir):
                    shutil.copytree(
                        uploads_dir,
                        os.path.join(backup_path, "uploads"),
                        dirs_exist_ok=True
                    )

            # Compress the staging directory into the final archive.
            archive_path = f"{backup_path}.tar.gz"
            with tarfile.open(archive_path, "w:gz") as tar:
                tar.add(backup_path, arcname=os.path.basename(backup_path))

            shutil.rmtree(backup_path)

            # Record backup metadata; the string _id matches delete_backup's
            # string lookup.
            backup_info = {
                "_id": backup_id,
                "filename": f"{backup_name}.tar.gz",
                "path": archive_path,
                "created_at": datetime.utcnow(),
                "size": os.path.getsize(archive_path),
                "includes_files": include_files
            }

            await db.db["backup_history"].insert_one(backup_info)

            return {
                "id": backup_id,
                "path": archive_path,
                "size": backup_info["size"],
                "created_at": backup_info["created_at"]
            }

        except Exception as e:
            logger.error(f"Backup creation failed: {str(e)}")
            raise

    async def restore_backup(self, backup_path: str) -> Dict[str, Any]:
        """Restore the system from a backup archive.

        DESTRUCTIVE: wipes every current collection before re-inserting the
        dumped documents, and replaces the uploads directory if the backup
        contains one.
        """
        # Bind before the try so the finally-cleanup cannot raise NameError
        # when an early exception (e.g. missing archive) fires before the
        # directory is created — that was a latent bug.
        restore_dir = None
        try:
            if not os.path.exists(backup_path):
                raise FileNotFoundError("Backup file not found")

            restore_dir = f"restore_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
            os.makedirs(restore_dir, exist_ok=True)

            # NOTE(review): extractall on an untrusted archive is vulnerable
            # to path traversal; pass filter="data" once on Python 3.12+.
            with tarfile.open(backup_path, "r:gz") as tar:
                tar.extractall(restore_dir)

            # The archive contains a single top-level backup_* directory.
            backup_contents = os.listdir(restore_dir)[0]
            backup_root = os.path.join(restore_dir, backup_contents)

            with open(os.path.join(backup_root, "database.json"), "r") as f:
                db_backup = json.load(f)

            # Clear existing collections before re-inserting the dump.
            for collection in await db.db.list_collection_names():
                await db.db[collection].delete_many({})

            for collection, docs in db_backup.items():
                if docs:
                    # Convert the stringified ids back to ObjectId.
                    for doc in docs:
                        doc["_id"] = ObjectId(doc["_id"])
                    await db.db[collection].insert_many(docs)

            # Restore files if the backup included them.
            uploads_source = os.path.join(backup_root, "uploads")
            # Capture this BEFORE cleanup: previously it was evaluated after
            # restore_dir was deleted, so it was always False.
            files_restored = os.path.exists(uploads_source)
            if files_restored:
                if os.path.exists("uploads"):
                    shutil.rmtree("uploads")
                shutil.copytree(uploads_source, "uploads")

            return {
                "success": True,
                "collections_restored": len(db_backup),
                "files_restored": files_restored
            }

        except Exception as e:
            logger.error(f"Backup restoration failed: {str(e)}")
            raise
        finally:
            # Single cleanup path for both success and failure.
            if restore_dir and os.path.exists(restore_dir):
                shutil.rmtree(restore_dir)

    async def list_backups(self) -> List[Dict[str, Any]]:
        """List all recorded backups, newest first."""
        try:
            backups = await db.db["backup_history"].find().sort("created_at", -1).to_list(None)
            return [
                {
                    "id": str(backup["_id"]),
                    "filename": backup["filename"],
                    "created_at": backup["created_at"],
                    "size": backup["size"],
                    "includes_files": backup["includes_files"]
                }
                for backup in backups
            ]
        except Exception as e:
            logger.error(f"Failed to list backups: {str(e)}")
            raise

    async def delete_backup(self, backup_id: str) -> bool:
        """Delete a backup's archive file and its history record.

        Returns False when no record matches `backup_id`.
        """
        try:
            backup = await db.db["backup_history"].find_one({"_id": backup_id})
            if not backup:
                return False

            # Remove the physical archive first, then the metadata record.
            if os.path.exists(backup["path"]):
                os.remove(backup["path"])

            await db.db["backup_history"].delete_one({"_id": backup_id})
            return True

        except Exception as e:
            logger.error(f"Failed to delete backup: {str(e)}")
            raise

# Module-level singleton used by the maintenance service and routers.
backup = BackupService()
\ No newline at end of file
diff --git a/app/services/calendar.py b/app/services/calendar.py
new file mode 100644
index 0000000000000000000000000000000000000000..5adc359db51bb384a57f6f8d3e69087f10eedf03
--- /dev/null
+++ b/app/services/calendar.py
@@ -0,0 +1,215 @@
+from datetime import datetime, timedelta
+from typing import List, Dict, Any, Optional
+from bson import ObjectId
+from ..db.database import db
+from ..utils.cache import cache
+from ..services.notifications import notifications
+
class CalendarService:
    """CRUD and invitation handling for calendar events stored in MongoDB.

    NOTE(review): get_user_events caches under a date-ranged key
    ("user_events:{id}:{start}:{end}") while every mutator below invalidates
    only the bare "user_events:{id}" key — the dated keys are never cleared,
    so reads can serve stale data for up to the 5-minute TTL. Confirm the
    cache API and align the key scheme.
    """

    async def create_event(
        self,
        user_id: str,
        title: str,
        description: str,
        start_time: datetime,
        end_time: datetime,
        attendees: Optional[List[str]] = None,
        is_all_day: bool = False,
        reminder_minutes: int = 30
    ) -> Dict[str, Any]:
        """Create a new calendar event, invalidate caches and invite attendees.

        Returns the stored document (with the generated `_id` attached).
        """
        event = {
            "user_id": user_id,
            "title": title,
            "description": description,
            "start_time": start_time,
            "end_time": end_time,
            "attendees": attendees or [],
            "is_all_day": is_all_day,
            "reminder_minutes": reminder_minutes,
            # New events always start life as "scheduled".
            "status": "scheduled",
            "created_at": datetime.utcnow()
        }

        result = await db.db["events"].insert_one(event)
        event["_id"] = result.inserted_id

        # Clear cache for the creator and every attendee.
        cache_keys = [f"user_events:{user_id}"]
        for attendee in attendees or []:
            cache_keys.append(f"user_events:{attendee}")

        for key in cache_keys:
            await cache.delete_cache(key)

        # Fan out one invitation notification per attendee.
        if attendees:
            for attendee in attendees:
                await notifications.create_notification(
                    user_id=attendee,
                    title=f"New Event Invitation: {title}",
                    message=f"You have been invited to an event: {title}",
                    notification_type="event_invitation",
                    data={"event_id": str(result.inserted_id)}
                )

        return event

    async def get_user_events(
        self,
        user_id: str,
        start_date: datetime,
        end_date: datetime,
        include_attendee_events: bool = True
    ) -> List[Dict[str, Any]]:
        """Get events for a user within a date range, cached for 5 minutes.

        NOTE(review): the filter requires start_time >= start_date AND
        end_time <= end_date, so events that merely overlap the window's
        edges are excluded — confirm this is intended.
        """
        cache_key = f"user_events:{user_id}:{start_date.date()}:{end_date.date()}"
        cached = await cache.get_cache(cache_key)
        if cached:
            return cached

        query = {
            "$or": [
                {"user_id": user_id},  # Events created by user
                # {"_id": None} is a deliberately unmatchable branch used to
                # disable the attendee clause when not requested.
                {"attendees": user_id} if include_attendee_events else {"_id": None}
            ],
            "start_time": {"$gte": start_date},
            "end_time": {"$lte": end_date}
        }

        cursor = db.db["events"].find(query).sort("start_time", 1)
        events = await cursor.to_list(None)

        await cache.set_cache(cache_key, events, expire=300)  # Cache for 5 minutes
        return events

    async def update_event(
        self,
        event_id: str,
        user_id: str,
        update_data: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Update an event (creator only); returns the updated document.

        Returns None when the id is malformed, the event is missing, or
        `user_id` is not the creator.
        """
        if not ObjectId.is_valid(event_id):
            return None

        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id  # Only creator can update
        })

        if not event:
            return None

        update_data["updated_at"] = datetime.utcnow()

        await db.db["events"].update_one(
            {"_id": ObjectId(event_id)},
            {"$set": update_data}
        )

        # Clear cache for the creator and every attendee.
        cache_keys = [f"user_events:{user_id}"]
        for attendee in event.get("attendees", []):
            cache_keys.append(f"user_events:{attendee}")

        for key in cache_keys:
            await cache.delete_cache(key)

        # Only time changes warrant notifying attendees.
        if "start_time" in update_data or "end_time" in update_data:
            for attendee in event.get("attendees", []):
                await notifications.create_notification(
                    user_id=attendee,
                    title=f"Event Updated: {event['title']}",
                    message=f"An event you're attending has been updated",
                    notification_type="event_update",
                    data={"event_id": event_id}
                )

        # Re-read so the caller sees the post-update document.
        return await db.db["events"].find_one({"_id": ObjectId(event_id)})

    async def delete_event(self, event_id: str, user_id: str) -> bool:
        """Delete an event (creator only); notifies attendees of cancellation."""
        if not ObjectId.is_valid(event_id):
            return False

        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id  # Only creator can delete
        })

        if not event:
            return False

        result = await db.db["events"].delete_one({"_id": ObjectId(event_id)})

        if result.deleted_count > 0:
            # Collect cache keys and, in the same pass, notify each attendee
            # of the cancellation.
            cache_keys = [f"user_events:{user_id}"]
            for attendee in event.get("attendees", []):
                cache_keys.append(f"user_events:{attendee}")
                await notifications.create_notification(
                    user_id=attendee,
                    title=f"Event Cancelled: {event['title']}",
                    message=f"An event you were attending has been cancelled",
                    notification_type="event_cancellation",
                    data={"event_id": event_id}
                )

            for key in cache_keys:
                await cache.delete_cache(key)

            return True
        return False

    async def respond_to_event(
        self,
        event_id: str,
        user_id: str,
        response: str
    ) -> bool:
        """Record an attendee's RSVP and notify the event's creator.

        `response` must be one of "accepted", "declined" or "maybe"; the
        caller must actually be listed in the event's attendees.
        """
        if not ObjectId.is_valid(event_id):
            return False

        valid_responses = ["accepted", "declined", "maybe"]
        if response not in valid_responses:
            return False

        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "attendees": user_id
        })

        if not event:
            return False

        # Store the response under attendee_responses.<user_id>.
        await db.db["events"].update_one(
            {"_id": ObjectId(event_id)},
            {
                "$set": {
                    f"attendee_responses.{user_id}": response,
                    "updated_at": datetime.utcnow()
                }
            }
        )

        # Tell the creator who responded and how.
        await notifications.create_notification(
            user_id=event["user_id"],
            title=f"Event Response: {event['title']}",
            message=f"An attendee has {response} your event",
            notification_type="event_response",
            data={
                "event_id": event_id,
                "responder": user_id,
                "response": response
            }
        )

        return True

# Module-level singleton used by the routers.
calendar = CalendarService()
\ No newline at end of file
diff --git a/app/services/maintenance.py b/app/services/maintenance.py
new file mode 100644
index 0000000000000000000000000000000000000000..0df508e254c9619860eebd88cb788dea8f8955a4
--- /dev/null
+++ b/app/services/maintenance.py
@@ -0,0 +1,286 @@
+import os
+import psutil
+from datetime import datetime, timedelta
+from typing import Dict, Any, List
+
+from ..db.database import db
+from ..utils.logger import logger
+from ..utils.cache import cache
+from ..services.notifications import notifications
+from .backup import backup
+import gzip
+import shutil
+
class MaintenanceService:
    """Unattended housekeeping jobs: session cleanup, data archiving, health
    checks, scheduled backups, log rotation and storage-quota enforcement.

    Methods either swallow errors and return a neutral value (periodic
    best-effort jobs) or log and re-raise (jobs whose failure the scheduler
    should record) — each docstring says which.
    """

    async def cleanup_expired_sessions(self) -> int:
        """Delete sessions idle for more than 7 days; return the count removed.

        Best-effort: returns 0 on failure instead of raising.
        """
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=7)
            result = await db.db["sessions"].delete_many({
                "last_activity": {"$lt": cutoff_date}
            })
            logger.info(f"Cleaned up {result.deleted_count} expired sessions")
            return result.deleted_count
        except Exception as e:
            logger.error(f"Error cleaning up sessions: {str(e)}")
            return 0

    async def archive_old_data(self) -> Dict[str, int]:
        """Archive year-old finished data and purge old metrics/audit logs.

        Orders and notifications are copied to archived_* collections before
        deletion; metrics (90 days) and audit logs (180 days) are simply
        dropped. Returns {category: documents affected}; {} on failure.
        """
        try:
            archive_date = datetime.utcnow() - timedelta(days=365)  # older than 1 year
            archives = {}

            # Orders in a terminal state only — in-flight orders stay put.
            old_orders = await db.db["orders"].find({
                "created_at": {"$lt": archive_date},
                "status": {"$in": ["delivered", "cancelled"]}
            }).to_list(None)

            if old_orders:
                await db.db["archived_orders"].insert_many(old_orders)
                await db.db["orders"].delete_many({
                    "_id": {"$in": [order["_id"] for order in old_orders]}
                })
                archives["orders"] = len(old_orders)

            # Only notifications the user has already read.
            old_notifications = await db.db["notifications"].find({
                "created_at": {"$lt": archive_date},
                "read": True
            }).to_list(None)

            if old_notifications:
                await db.db["archived_notifications"].insert_many(old_notifications)
                await db.db["notifications"].delete_many({
                    "_id": {"$in": [notif["_id"] for notif in old_notifications]}
                })
                archives["notifications"] = len(old_notifications)

            # Metrics: keep the last 90 days, no archive copy.
            metrics_cutoff = datetime.utcnow() - timedelta(days=90)
            metrics_result = await db.db.system_metrics.delete_many({
                "timestamp": {"$lt": metrics_cutoff}
            })
            logger.info(f"Archived {metrics_result.deleted_count} old metric records")
            archives["metrics"] = metrics_result.deleted_count

            # Audit logs: keep the last 180 days, no archive copy.
            audit_cutoff = datetime.utcnow() - timedelta(days=180)
            audit_result = await db.db.audit_logs.delete_many({
                "timestamp": {"$lt": audit_cutoff}
            })
            logger.info(f"Archived {audit_result.deleted_count} old audit logs")
            archives["audit_logs"] = audit_result.deleted_count

            return archives
        except Exception as e:
            logger.error(f"Error archiving old data: {str(e)}")
            return {}

    async def check_system_health(self) -> Dict[str, Any]:
        """Collect database, cache and storage health metrics.

        Returns {"error": ...} instead of raising so a monitoring caller can
        surface a degraded-but-alive status.
        """
        try:
            health_data = {
                "timestamp": datetime.utcnow(),
                "database": {},
                "cache": {},
                "storage": {}
            }

            # MongoDB server statistics for the current database.
            db_stats = await db.db.command("dbStats")
            health_data["database"] = {
                "size": db_stats["dataSize"],
                "collections": db_stats["collections"],
                "indexes": db_stats["indexes"]
            }

            # Redis reachability; a failed INFO call just marks the cache
            # down. Narrowed from a bare `except:` which also swallowed
            # SystemExit/KeyboardInterrupt.
            try:
                cache_info = await cache.redis_client.info()
                health_data["cache"] = {
                    "connected": True,
                    "used_memory": cache_info["used_memory"],
                    "connected_clients": cache_info["connected_clients"]
                }
            except Exception:
                health_data["cache"] = {"connected": False}

            health_data["storage"] = {
                "uploads_size": await self._get_directory_size("uploads"),
                "logs_size": await self._get_directory_size("logs")
            }

            return health_data
        except Exception as e:
            logger.error(f"Error checking system health: {str(e)}")
            return {"error": str(e)}

    async def perform_database_maintenance(self):
        """Run routine DB maintenance; logs and re-raises on failure.

        NOTE(review): 'analyze' is not a standard MongoDB command (it is a
        PostgreSQL one) — confirm the backend actually supports it.
        """
        try:
            await self.cleanup_expired_sessions()
            await db.db.command('analyze')
            logger.info("Database maintenance completed successfully")
        except Exception as e:
            logger.error(f"Database maintenance failed: {str(e)}")
            raise

    async def monitor_system_resources(self) -> Dict[str, Any]:
        """Sample CPU/memory/disk usage, persist it, and return the metrics.

        Logs a warning when any resource exceeds 90%. Re-raises on failure.
        """
        try:
            cpu_percent = psutil.cpu_percent()
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')

            metrics = {
                "cpu_usage": cpu_percent,
                "memory_usage": memory.percent,
                "disk_usage": disk.percent,
                "timestamp": datetime.utcnow()
            }

            await db.db.system_metrics.insert_one(metrics)

            if any([cpu_percent > 90, memory.percent > 90, disk.percent > 90]):
                logger.warning("System resources critically low", extra=metrics)

            return metrics
        except Exception as e:
            logger.error(f"Resource monitoring failed: {str(e)}")
            raise

    async def perform_scheduled_backup(self):
        """Run the daily backup, then prune backups older than 7 days."""
        try:
            result = await backup.create_backup(include_files=True)
            logger.info(f"Scheduled backup completed successfully: {result['id']}")

            await self.cleanup_old_backups(days_to_keep=7)
        except Exception as e:
            logger.error(f"Scheduled backup failed: {str(e)}")
            raise

    async def cleanup_old_backups(self, days_to_keep: int = 7):
        """Delete backups (file + history record) older than `days_to_keep`."""
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=days_to_keep)
            old_backups = await db.db.backup_history.find({
                "created_at": {"$lt": cutoff_date}
            }).to_list(None)

            for old_backup in old_backups:
                await backup.delete_backup(str(old_backup["_id"]))

            logger.info(f"Cleaned up {len(old_backups)} old backups")
        except Exception as e:
            logger.error(f"Backup cleanup failed: {str(e)}")
            raise

    async def rotate_log_files(self):
        """Rename every logs/*.log file to <name>.<YYYYMMDD>.

        Bug fix: the destination name previously was the literal
        "(unknown).<date>" for every file, so all logs collided on one
        target; each rotated file now keeps its own name.
        """
        try:
            log_dir = "logs"
            if not os.path.exists(log_dir):
                return

            current_date = datetime.utcnow().strftime("%Y%m%d")
            for filename in os.listdir(log_dir):
                if filename.endswith(".log"):
                    src_path = os.path.join(log_dir, filename)
                    dst_path = os.path.join(log_dir, f"{filename}.{current_date}")

                    if os.path.exists(src_path):
                        os.rename(src_path, dst_path)

            logger.info("Log rotation completed successfully")
        except Exception as e:
            logger.error(f"Log rotation failed: {str(e)}")
            raise

    async def manage_storage_quotas(self):
        """Check uploads/database sizes against quotas and purge temp files.

        Warns admins when usage passes 90% of the (hard-coded) quotas and
        removes uploads/temp files older than 24h. Returns a summary dict;
        {"error": ...} on failure.
        """
        try:
            results = {
                "status": "ok",
                "warnings": [],
                "cleaned": 0
            }

            # Uploads quota: 10GB, warn at 90%.
            uploads_dir = "uploads"
            if os.path.exists(uploads_dir):
                total_size = sum(
                    os.path.getsize(os.path.join(dirpath, filename))
                    for dirpath, _, filenames in os.walk(uploads_dir)
                    for filename in filenames
                )

                quota_limit = 10 * 1024 * 1024 * 1024  # 10GB in bytes
                if total_size > (quota_limit * 0.9):
                    warning_msg = f"Storage quota nearly reached: {total_size / quota_limit:.1%}"
                    results["warnings"].append(warning_msg)
                    await self._notify_resource_warning(warning_msg)

            # Database quota: 5GB (data + indexes), warn at 90%.
            db_stats = await db.db.command("dbStats")
            db_size = db_stats["dataSize"] + db_stats["indexSize"]

            db_quota = 5 * 1024 * 1024 * 1024  # 5GB in bytes
            if db_size > (db_quota * 0.9):
                warning_msg = f"Database quota nearly reached: {db_size / db_quota:.1%}"
                results["warnings"].append(warning_msg)
                await self._notify_resource_warning(warning_msg)

            # Purge temporary uploads older than 24 hours.
            # NOTE(review): getctime is metadata-change time on Unix, not
            # creation time — confirm this matches the intended "file age".
            temp_dir = os.path.join("uploads", "temp")
            if os.path.exists(temp_dir):
                current_time = datetime.utcnow()
                for file_name in os.listdir(temp_dir):
                    file_path = os.path.join(temp_dir, file_name)
                    file_age = datetime.fromtimestamp(os.path.getctime(file_path))

                    if current_time - file_age > timedelta(hours=24):
                        os.remove(file_path)
                        results["cleaned"] += 1

            return results
        except Exception as e:
            logger.error(f"Error managing storage quotas: {str(e)}")
            return {"error": str(e)}

    async def _notify_resource_warning(self, message: str):
        """Send a system_warning notification to every admin user.

        Best-effort: failures are logged, never raised.
        """
        try:
            admin_users = await db.db["users"].find(
                {"roles": "admin"}
            ).to_list(None)

            for admin in admin_users:
                await notifications.create_notification(
                    user_id=str(admin["_id"]),
                    title="System Resource Warning",
                    message=message,
                    notification_type="system_warning"
                )
        except Exception as e:
            logger.error(f"Error sending resource warning: {str(e)}")

    async def _get_directory_size(self, path: str) -> int:
        """Return the total size of all files under `path`, in bytes.

        A missing directory yields 0 (the glob simply matches nothing).
        """
        from pathlib import Path
        return sum(f.stat().st_size for f in Path(path).glob('**/*') if f.is_file())

# Module-level singleton used by the scheduler and routers.
maintenance = MaintenanceService()
\ No newline at end of file
diff --git a/app/services/notifications.py b/app/services/notifications.py
new file mode 100644
index 0000000000000000000000000000000000000000..0998b68481d397804afa4a73f62f159c1ff6442d
--- /dev/null
+++ b/app/services/notifications.py
@@ -0,0 +1,105 @@
+from typing import Dict, Any
+from datetime import datetime
+from ..db.database import db
+from ..core.config import settings
+from ..utils.cache import cache
+from ..main import broadcast_notification
+
class NotificationService:
    """Persist notifications, push them over WebSocket, and manage read state.

    NOTE(review): this module imports broadcast_notification from ..main
    while main mounts routers that import this module — watch for a circular
    import at startup.
    """

    async def create_notification(
        self,
        user_id: str,
        title: str,
        message: str,
        notification_type: str,
        data: Dict[str, Any] = None
    ):
        """Store a notification, broadcast it, and invalidate the user's cache.

        Returns the stored document.
        NOTE(review): insert_one mutates `notification` in place, adding an
        ObjectId `_id` — confirm the WebSocket broadcast can serialize it.
        """
        notification = {
            "user_id": user_id,
            "title": title,
            "message": message,
            "type": notification_type,
            "data": data,
            "created_at": datetime.utcnow(),
            "read": False
        }

        # Store in database
        await db.db["notifications"].insert_one(notification)

        # Push to all connected WebSocket clients.
        await broadcast_notification({
            "type": "notification",
            "data": notification
        })

        # Invalidate the cached list so the next read sees this entry.
        await cache.delete_cache(f"user_notifications:{user_id}")

        return notification

    async def get_user_notifications(
        self,
        user_id: str,
        skip: int = 0,
        limit: int = 50,
        unread_only: bool = False
    ):
        """Get notifications for a user, newest first.

        NOTE(review): the cache key ignores skip/limit, so a cached page is
        returned for ANY pagination parameters — confirm and include
        skip/limit in the key.
        """
        cache_key = f"user_notifications:{user_id}"
        # Only the unfiltered listing is cached; unread-only always hits the DB.
        if not unread_only:
            cached = await cache.get_cache(cache_key)
            if cached:
                return cached

        query = {"user_id": user_id}
        if unread_only:
            query["read"] = False

        cursor = db.db["notifications"].find(query)\
            .sort("created_at", -1)\
            .skip(skip)\
            .limit(limit)

        # Local name shadows the module-level singleton inside this scope only.
        notifications = await cursor.to_list(length=limit)

        if not unread_only:
            await cache.set_cache(cache_key, notifications, expire=300)  # Cache for 5 minutes

        return notifications

    async def mark_as_read(self, notification_id: str, user_id: str):
        """Mark one of the user's notifications as read; True on success.

        NOTE(review): `_id` is matched against the raw string, but documents
        inserted by create_notification carry ObjectId ids — this lookup
        likely never matches; confirm and convert via bson.ObjectId.
        """
        result = await db.db["notifications"].update_one(
            {"_id": notification_id, "user_id": user_id},
            {"$set": {"read": True}}
        )

        if result.modified_count > 0:
            await cache.delete_cache(f"user_notifications:{user_id}")
            return True
        return False

    async def mark_all_as_read(self, user_id: str):
        """Mark every unread notification as read; return the count changed."""
        result = await db.db["notifications"].update_many(
            {"user_id": user_id, "read": False},
            {"$set": {"read": True}}
        )

        await cache.delete_cache(f"user_notifications:{user_id}")
        return result.modified_count

    async def delete_notification(self, notification_id: str, user_id: str):
        """Delete one of the user's notifications; True on success.

        NOTE(review): same string-vs-ObjectId `_id` concern as mark_as_read.
        """
        result = await db.db["notifications"].delete_one(
            {"_id": notification_id, "user_id": user_id}
        )

        if result.deleted_count > 0:
            await cache.delete_cache(f"user_notifications:{user_id}")
            return True
        return False

# Module-level singleton used by the other services and routers.
notifications = NotificationService()
\ No newline at end of file
diff --git a/app/services/scheduler.py b/app/services/scheduler.py
new file mode 100644
index 0000000000000000000000000000000000000000..55cf4563120361a44f9acab0b4aa9c3a1218110b
--- /dev/null
+++ b/app/services/scheduler.py
@@ -0,0 +1,243 @@
+from datetime import datetime, timedelta
+from typing import List, Dict, Any, Optional
+from bson import ObjectId
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.cron import CronTrigger
+from ..db.database import db
+from ..utils.cache import cache
+from ..utils.logger import logger
+from ..services.calendar import calendar
+from ..services.maintenance import maintenance
+
class SchedulerService:
    """APScheduler wrapper: registers maintenance cron jobs and provides
    helpers for creating/updating/deleting recurring calendar events.
    """

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self._setup_maintenance_jobs()

    def _setup_maintenance_jobs(self):
        """Register all maintenance cron jobs.

        replace_existing=True makes re-instantiation idempotent (a job id
        already present is overwritten instead of raising).
        """
        jobs = [
            # (callable, trigger, job id)
            (maintenance.perform_database_maintenance, CronTrigger(hour=2), "daily_db_maintenance"),
            (maintenance.cleanup_expired_sessions, CronTrigger(hour="*/6"), "session_cleanup"),
            (maintenance.monitor_system_resources, CronTrigger(minute="*/15"), "health_check"),
            (maintenance.perform_scheduled_backup, CronTrigger(hour=1), "daily_backup"),
            (maintenance.rotate_log_files, CronTrigger(hour=3), "log_rotation"),
            (maintenance.manage_storage_quotas, CronTrigger(hour="*/2"), "storage_quota_check"),
            # Monthly data archiving at 4 AM on the 1st of each month
            (maintenance.archive_old_data, CronTrigger(day=1, hour=4), "monthly_archiving"),
        ]
        for func, trigger, job_id in jobs:
            self.scheduler.add_job(func, trigger, id=job_id, replace_existing=True)

    def start(self):
        """Start the scheduler; logs and re-raises on failure."""
        try:
            self.scheduler.start()
            logger.info("Scheduler started successfully")
        except Exception as e:
            logger.error(f"Failed to start scheduler: {str(e)}")
            raise

    def shutdown(self):
        """Shutdown the scheduler; logs and re-raises on failure."""
        try:
            self.scheduler.shutdown()
            logger.info("Scheduler shutdown successfully")
        except Exception as e:
            logger.error(f"Error during scheduler shutdown: {str(e)}")
            raise

    def get_jobs(self):
        """Return a JSON-serializable summary of all scheduled jobs."""
        return [
            {
                "id": job.id,
                "name": job.name,
                "next_run_time": job.next_run_time.isoformat() if job.next_run_time else None,
                "trigger": str(job.trigger)
            }
            for job in self.scheduler.get_jobs()
        ]

    @staticmethod
    def _days_in_month(year: int, month: int) -> int:
        """Number of days in (year, month).

        Implemented with datetime arithmetic because the stdlib `calendar`
        module name is shadowed by the calendar-service import in this file.
        """
        first_of_next = datetime(year + 1, 1, 1) if month == 12 else datetime(year, month + 1, 1)
        return (first_of_next - timedelta(days=1)).day

    @staticmethod
    def _advance_start(current: datetime, pattern: str) -> datetime:
        """Return the next occurrence start time for a recurrence pattern.

        Day-of-month is clamped to the target month's length (Jan 31 ->
        Feb 29/28, Feb 29 -> next Feb 28), so monthly/yearly advances never
        raise ValueError the way a bare replace(month=month + 1) did for
        month-end dates.

        Raises:
            ValueError: for an unrecognized pattern (the previous code left
                the start unchanged, which made the caller loop forever).
        """
        if pattern == "daily":
            return current + timedelta(days=1)
        if pattern == "weekly":
            return current + timedelta(weeks=1)
        if pattern == "monthly":
            if current.month == 12:
                year, month = current.year + 1, 1
            else:
                year, month = current.year, current.month + 1
            day = min(current.day, SchedulerService._days_in_month(year, month))
            return current.replace(year=year, month=month, day=day)
        if pattern == "yearly":
            year = current.year + 1
            day = min(current.day, SchedulerService._days_in_month(year, current.month))
            return current.replace(year=year, day=day)
        raise ValueError(f"Unknown recurrence pattern: {pattern}")

    async def create_recurring_event(
        self,
        user_id: str,
        title: str,
        description: str,
        start_time: datetime,
        end_time: datetime,
        recurrence_pattern: str,  # daily, weekly, monthly, yearly
        recurrence_end_date: Optional[datetime] = None,
        attendees: List[str] = None,
        reminder_minutes: int = 30,
        max_occurrences: int = 366
    ) -> List[Dict[str, Any]]:
        """Create a series of events following a recurrence pattern.

        max_occurrences (new, defaulted) caps the series length so a missing
        recurrence_end_date can no longer produce an infinite loop.

        Returns the list of created event documents.
        """
        events = []
        duration = end_time - start_time
        current_start = start_time

        while len(events) < max_occurrences:
            if recurrence_end_date and current_start > recurrence_end_date:
                break

            # Create individual event instance
            event = await calendar.create_event(
                user_id=user_id,
                title=title,
                description=description,
                start_time=current_start,
                end_time=current_start + duration,
                attendees=attendees,
                reminder_minutes=reminder_minutes
            )
            events.append(event)

            current_start = self._advance_start(current_start, recurrence_pattern)

        return events

    async def update_recurring_event(
        self,
        event_id: str,
        user_id: str,
        update_data: Dict[str, Any],
        update_future: bool = True
    ) -> List[Dict[str, Any]]:
        """Update a recurring event and optionally its future occurrences.

        Returns the list of affected event documents (pre-update snapshots),
        or [] when the event does not exist / is not owned by user_id.
        """
        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id
        })

        if not event:
            return []

        # Update the current event
        await calendar.update_event(event_id, user_id, update_data)

        updated_events = [event]

        # Only follow the series when the event actually belongs to one:
        # querying recurrence_group=None would match every non-recurring
        # event and update unrelated documents (delete_recurring_event
        # already guards this way).
        if update_future and event.get("recurrence_group"):
            future_events = await db.db["events"].find({
                "recurrence_group": event["recurrence_group"],
                "start_time": {"$gt": event["start_time"]},
                "user_id": user_id
            }).to_list(None)

            for future_event in future_events:
                await calendar.update_event(
                    str(future_event["_id"]),
                    user_id,
                    update_data
                )
                updated_events.append(future_event)

        return updated_events

    async def delete_recurring_event(
        self,
        event_id: str,
        user_id: str,
        delete_future: bool = True
    ) -> bool:
        """Delete a recurring event and optionally its future occurrences.

        Returns False when the event does not exist / is not owned by user_id.
        """
        event = await db.db["events"].find_one({
            "_id": ObjectId(event_id),
            "user_id": user_id
        })

        if not event:
            return False

        # Delete the current event
        await calendar.delete_event(event_id, user_id)

        # Delete future occurrences if requested
        if delete_future and event.get("recurrence_group"):
            await db.db["events"].delete_many({
                "recurrence_group": event["recurrence_group"],
                "start_time": {"$gt": event["start_time"]},
                "user_id": user_id
            })

        return True

    async def get_upcoming_recurring_events(
        self,
        user_id: str,
        days: int = 30
    ) -> List[Dict[str, Any]]:
        """Return the user's events in the next `days` days, sorted by start."""
        start_date = datetime.utcnow()
        end_date = start_date + timedelta(days=days)

        events = await calendar.get_user_events(
            user_id=user_id,
            start_date=start_date,
            end_date=end_date,
            include_attendee_events=True
        )

        return sorted(events, key=lambda x: x["start_time"])
+
+scheduler = SchedulerService()
\ No newline at end of file
diff --git a/app/templates/email/low_stock_alert.html b/app/templates/email/low_stock_alert.html
new file mode 100644
index 0000000000000000000000000000000000000000..ec7c3794d68f498c834cdd1032627234bb457e85
--- /dev/null
+++ b/app/templates/email/low_stock_alert.html
@@ -0,0 +1,41 @@
+
+
+
+
+
+
+
+
+
+
+
Product Stock Alert
+
The following product is running low on inventory:
+
+
+
+ - Product Name: {{ product_name }}
+ - Current Stock: {{ current_stock }} units
+
+
+
+ View Product
+
+
+
Please review and restock this item as needed to maintain adequate inventory levels.
+
+
+
+
+
\ No newline at end of file
diff --git a/app/templates/email/order_confirmation.html b/app/templates/email/order_confirmation.html
new file mode 100644
index 0000000000000000000000000000000000000000..ffd4bd8a4c296b46e8cb5b5bae8365aca36c907d
--- /dev/null
+++ b/app/templates/email/order_confirmation.html
@@ -0,0 +1,40 @@
+
+
+
+
+
+
+
+
+
+
Thank you for your order!
+
Order Details
+
Order ID: {{ order_id }}
+
Total Amount: ${{ "%.2f"|format(total_amount) }}
+
Status: {{ status|title }}
+
+
Products Ordered:
+
+ {% for product in products %}
+ - {{ product.name }} - Quantity: {{ product.quantity }} - Price: ${{ "%.2f"|format(product.price) }}
+ {% endfor %}
+
+
+
You can track your order status by clicking the button below:
+
Track Order
+
+
+
+
+
\ No newline at end of file
diff --git a/app/templates/email/password_reset.html b/app/templates/email/password_reset.html
new file mode 100644
index 0000000000000000000000000000000000000000..c82dfbc987a7b23e264644eff39fe3176935f3b8
--- /dev/null
+++ b/app/templates/email/password_reset.html
@@ -0,0 +1,38 @@
+
+
+
+
+
+
+
+
+
+
We received a request to reset your password. Click the button below to create a new password:
+
+
+ Reset Password
+
+
+
+
If you didn't request a password reset, please ignore this email or contact support if you have concerns.
+
+
+
This password reset link will expire in 30 minutes for security reasons.
+
+
+
+
+
\ No newline at end of file
diff --git a/app/templates/email/welcome.html b/app/templates/email/welcome.html
new file mode 100644
index 0000000000000000000000000000000000000000..0a405b3c5e55f6da62f90a217ad1b5f7503dc3b6
--- /dev/null
+++ b/app/templates/email/welcome.html
@@ -0,0 +1,45 @@
+
+
+
+
+
+
+
+
+
+
Hello {{ username }},
+
Welcome to our platform! We're excited to have you on board.
+
+
+
What you can do with your account:
+
+ - Manage products and inventory
+ - Process and track orders
+ - View analytics and reports
+ - Manage customer relationships
+
+
+
+
+ Get Started
+
+
+
If you have any questions or need assistance, our support team is here to help.
+
+
+
+
+
\ No newline at end of file
diff --git a/app/utils/cache.py b/app/utils/cache.py
new file mode 100644
index 0000000000000000000000000000000000000000..b57a438917fca0ea53fe32ccb1327bbbb18b09a7
--- /dev/null
+++ b/app/utils/cache.py
@@ -0,0 +1,55 @@
+import redis
+import json
+from ..core.config import settings
+from typing import Any, Optional
+
class RedisCache:
    """Thin JSON-serializing wrapper around a Redis connection.

    All failures are swallowed and reported via a False/None return so cache
    outages degrade gracefully instead of breaking request handling.

    NOTE(review): the methods are declared async but call the synchronous
    `redis` client, so every call blocks the event loop — consider migrating
    to redis.asyncio.
    """

    def __init__(self):
        # decode_responses=True makes get() return str, ready for json.loads.
        self.redis_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            decode_responses=True
        )

    async def set_cache(self, key: str, value: Any, expire: int = 3600) -> bool:
        """Store a JSON-serializable value under `key` with a TTL (default 1 hour)."""
        try:
            self.redis_client.setex(
                key,
                expire,
                json.dumps(value)
            )
            return True
        except Exception as e:
            print(f"Cache set error: {str(e)}")
            return False

    async def get_cache(self, key: str) -> Optional[Any]:
        """Return the cached value for `key`, or None when absent or on error."""
        try:
            value = self.redis_client.get(key)
            return json.loads(value) if value else None
        except Exception as e:
            print(f"Cache get error: {str(e)}")
            return None

    async def delete_cache(self, key: str) -> bool:
        """Delete `key`; True only if a key was actually removed."""
        try:
            return bool(self.redis_client.delete(key))
        except Exception as e:
            print(f"Cache delete error: {str(e)}")
            return False

    async def clear_cache_pattern(self, pattern: str) -> bool:
        """Delete every key matching `pattern`.

        Uses SCAN (scan_iter) rather than KEYS: KEYS walks the entire
        keyspace in a single blocking command and is unsafe against a
        production Redis instance.
        """
        try:
            keys = list(self.redis_client.scan_iter(match=pattern))
            if keys:
                return bool(self.redis_client.delete(*keys))
            return True
        except Exception as e:
            print(f"Cache clear error: {str(e)}")
            return False
+
+cache = RedisCache()
\ No newline at end of file
diff --git a/app/utils/email.py b/app/utils/email.py
new file mode 100644
index 0000000000000000000000000000000000000000..e43fe2f0c4015dbd581c7445014e91b0907f756e
--- /dev/null
+++ b/app/utils/email.py
@@ -0,0 +1,103 @@
+from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
+from pydantic import EmailStr
+from typing import List, Dict, Any
+from ..core.config import settings
+from pathlib import Path
+import aiofiles
+import jinja2
+
class EmailService:
    """Template-based transactional e-mail sender built on fastapi-mail.

    Templates are HTML files under app/templates/email/. Every send_* helper
    returns True on success and False on failure (best-effort semantics).
    """

    def __init__(self):
        # Ensure the template directory exists BEFORE building the connection
        # config: ConnectionConfig receives TEMPLATE_FOLDER and may validate
        # the path, which would fail on a fresh checkout where the folder has
        # not been created yet.
        template_dir = Path(__file__).parent.parent / 'templates' / 'email'
        template_dir.mkdir(parents=True, exist_ok=True)

        self.conf = ConnectionConfig(
            MAIL_USERNAME=settings.MAIL_USERNAME,
            MAIL_PASSWORD=settings.MAIL_PASSWORD,
            MAIL_FROM=settings.MAIL_FROM,
            MAIL_PORT=settings.MAIL_PORT,
            MAIL_SERVER=settings.MAIL_SERVER,
            MAIL_TLS=True,
            MAIL_SSL=False,
            TEMPLATE_FOLDER=template_dir
        )
        self.fast_mail = FastMail(self.conf)

    async def send_email(
        self,
        email_to: List[EmailStr],
        subject: str,
        template_name: str,
        template_data: Dict[str, Any]
    ):
        """Render `template_name` with `template_data` and send it.

        Returns True on success; on failure the error is printed and False is
        returned so callers are not interrupted by mail outages.
        """
        try:
            message = MessageSchema(
                subject=subject,
                recipients=email_to,
                template_body=template_data,
                subtype="html"
            )

            await self.fast_mail.send_message(
                message,
                template_name=template_name
            )
            return True
        except Exception as e:
            print(f"Failed to send email: {str(e)}")
            return False

    async def send_order_confirmation(self, email: EmailStr, order_data: Dict[str, Any]):
        """Send an order confirmation (expects an order document with _id,
        total_amount, products and status)."""
        return await self.send_email(
            email_to=[email],
            subject="Order Confirmation",
            template_name="order_confirmation.html",
            template_data={
                "order_id": str(order_data["_id"]),
                "total_amount": order_data["total_amount"],
                "products": order_data["products"],
                "status": order_data["status"]
            }
        )

    async def send_password_reset(self, email: EmailStr, reset_token: str):
        """Send a password reset email containing a tokenized frontend link."""
        return await self.send_email(
            email_to=[email],
            subject="Password Reset Request",
            template_name="password_reset.html",
            template_data={
                "reset_token": reset_token,
                "reset_url": f"{settings.FRONTEND_URL}/reset-password?token={reset_token}"
            }
        )

    async def send_welcome_email(self, email: EmailStr, username: str):
        """Send the welcome email to a newly registered user."""
        return await self.send_email(
            email_to=[email],
            subject="Welcome to Admin Dashboard",
            template_name="welcome.html",
            template_data={
                "username": username,
                "login_url": f"{settings.FRONTEND_URL}/login"
            }
        )

    async def send_low_stock_alert(self, email: EmailStr, product_data: Dict[str, Any]):
        """Send a low-stock alert (expects a product document with name,
        inventory_count and _id)."""
        return await self.send_email(
            email_to=[email],
            subject="Low Stock Alert",
            template_name="low_stock_alert.html",
            template_data={
                "product_name": product_data["name"],
                "current_stock": product_data["inventory_count"],
                "product_id": str(product_data["_id"])
            }
        )
+
+email_service = EmailService()
\ No newline at end of file
diff --git a/app/utils/file_storage.py b/app/utils/file_storage.py
new file mode 100644
index 0000000000000000000000000000000000000000..217a83162c5c631f030ca5277edb5589ee34d2eb
--- /dev/null
+++ b/app/utils/file_storage.py
@@ -0,0 +1,79 @@
+import os
+import shutil
+from fastapi import UploadFile
+from datetime import datetime
+from pathlib import Path
+from typing import Optional
+from ..core.config import settings
+from .logger import logger
+
class FileStorage:
    """Local-disk upload storage with size and extension validation.

    Files are stored under uploads/images or uploads/documents with a
    timestamp-prefixed name; all public paths are relative to uploads/.
    """

    def __init__(self):
        # Base directory is relative to the process working directory.
        self.upload_dir = Path("uploads")
        self.upload_dir.mkdir(exist_ok=True)

        # Create subdirectories for different file types
        self.image_dir = self.upload_dir / "images"
        self.document_dir = self.upload_dir / "documents"
        self.image_dir.mkdir(exist_ok=True)
        self.document_dir.mkdir(exist_ok=True)

    async def save_file(
        self,
        file: "UploadFile",
        category: str = "documents",
        max_size: int = 10 * 1024 * 1024  # 10MB default
    ) -> Optional[str]:
        """Validate and persist an uploaded file.

        Returns the stored path relative to the uploads root, or None when
        validation or the write fails (the error is logged).
        """
        try:
            # Determine size by seeking to the end of the file object.
            file.file.seek(0, os.SEEK_END)
            size = file.file.tell()
            file.file.seek(0)

            if size > max_size:
                raise ValueError(f"File size exceeds maximum limit of {max_size/1024/1024}MB")

            # Strip any client-supplied directory components: using the raw
            # filename allowed path traversal (e.g. "../../evil.sh") to write
            # outside the upload directory.
            safe_name = Path(file.filename).name
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{timestamp}_{safe_name}"

            # Determine storage directory and extension whitelist per category.
            if category == "images":
                save_dir = self.image_dir
                allowed_types = {".jpg", ".jpeg", ".png", ".gif"}
            else:
                save_dir = self.document_dir
                allowed_types = {".pdf", ".doc", ".docx", ".txt"}

            file_ext = Path(safe_name).suffix.lower()
            if file_ext not in allowed_types:
                raise ValueError(f"File type {file_ext} not allowed")

            # Save file
            file_path = save_dir / filename
            with file_path.open("wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            return str(file_path.relative_to(self.upload_dir))

        except Exception as e:
            logger.error(f"File upload error: {str(e)}")
            return False if False else None  # explicit: always None on failure

    async def delete_file(self, file_path: str) -> bool:
        """Delete a previously stored file; True if it existed and was removed.

        Refuses paths that resolve outside the uploads root (path traversal).
        """
        try:
            full_path = (self.upload_dir / file_path).resolve()
            # Containment check: a stored path must stay inside uploads/.
            if not full_path.is_relative_to(self.upload_dir.resolve()):
                return False
            if full_path.exists():
                full_path.unlink()
                return True
            return False
        except Exception as e:
            logger.error(f"File deletion error: {str(e)}")
            return False

    def get_file_url(self, file_path: str) -> str:
        """Generate URL for accessing the file"""
        return f"/uploads/{file_path}"
+
+file_storage = FileStorage()
\ No newline at end of file
diff --git a/app/utils/logger.py b/app/utils/logger.py
new file mode 100644
index 0000000000000000000000000000000000000000..683420b2879a7f67121656edf446596e7599f608
--- /dev/null
+++ b/app/utils/logger.py
@@ -0,0 +1,59 @@
+import logging
+import sys
+from datetime import datetime
+from pathlib import Path
+from logging.handlers import RotatingFileHandler
+from ..core.config import settings
+
+# Create logs directory if it doesn't exist
+logs_dir = Path("logs")
+logs_dir.mkdir(exist_ok=True)
+
+# Configure logging format
+log_format = logging.Formatter(
+ "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+)
+
def setup_logger(name: str) -> logging.Logger:
    """Return an INFO logger writing to stdout and logs/<name>.log.

    The file handler rotates at 10MB keeping 5 backups. Safe to call
    repeatedly with the same name: handlers are only attached on first use,
    so records are not duplicated.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)

    # Guard against double configuration: calling this twice previously
    # attached a second pair of handlers, duplicating every log record.
    if logger.handlers:
        return logger

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(log_format)
    logger.addHandler(console_handler)

    # File handler with rotation
    file_handler = RotatingFileHandler(
        logs_dir / f"{name}.log",
        maxBytes=10485760,  # 10MB
        backupCount=5
    )
    file_handler.setFormatter(log_format)
    logger.addHandler(file_handler)

    return logger
+
# Main application logger, shared via `from ..utils.logger import logger`.
logger = setup_logger("admin_dashboard")
+
def log_api_request(method: str, path: str, status_code: int, duration: float):
    """Log one API request (method, path, status, wall time) at INFO level."""
    # Lazy %-style args: the message is only formatted if INFO is enabled.
    logger.info(
        "API Request - Method: %s, Path: %s, Status: %s, Duration: %.3fs",
        method, path, status_code, duration
    )
+
def log_error(error: Exception, context: dict = None):
    """Log an exception with its type and optional contextual data."""
    # Lazy %-style args defer formatting until the record is emitted.
    logger.error(
        "Error: %s, Type: %s, Context: %s",
        str(error), type(error).__name__, context or {}
    )
+
def log_database_operation(operation: str, collection: str, success: bool):
    """Log one database operation and whether it succeeded."""
    # Lazy %-style args defer formatting until the record is emitted.
    logger.info(
        "Database Operation - Type: %s, Collection: %s, Success: %s",
        operation, collection, success
    )
\ No newline at end of file
diff --git a/app/utils/rate_limiter.py b/app/utils/rate_limiter.py
new file mode 100644
index 0000000000000000000000000000000000000000..194768c49d6bcfb8d250ed80981977dd84bb9dfb
--- /dev/null
+++ b/app/utils/rate_limiter.py
@@ -0,0 +1,36 @@
+from fastapi import HTTPException, Request
+from redis import Redis
+from ..core.config import settings
+import time
+
class RateLimiter:
    """Sliding-window rate limiter (100 requests / 60 s per client IP)
    backed by a Redis sorted set keyed on rate_limit:<ip>."""

    def __init__(self, redis_client: "Redis"):
        self.redis = redis_client
        self.rate_limit = 100  # max requests per window
        self.time_window = 60  # window length, seconds

    async def check_rate_limit(self, request: "Request"):
        """Record this request and raise HTTP 429 once the client's request
        count within the window exceeds the limit."""
        client_ip = request.client.host
        key = f"rate_limit:{client_ip}"

        current = int(time.time())
        window_start = current - self.time_window

        # Each request needs a UNIQUE sorted-set member: the old str(current)
        # member made every request within the same second collapse into one
        # entry, so the limiter massively undercounted. The score stays the
        # epoch second so window trimming still works.
        member = f"{current}:{time.monotonic_ns()}"

        pipeline = self.redis.pipeline()
        pipeline.zremrangebyscore(key, 0, window_start)  # drop entries older than the window
        pipeline.zadd(key, {member: current})
        pipeline.zcard(key)
        pipeline.expire(key, self.time_window)
        _, _, request_count, _ = pipeline.execute()

        if request_count > self.rate_limit:
            raise HTTPException(
                status_code=429,
                detail="Too many requests. Please try again later."
            )
+
# Module-level limiter instance; the Redis client is constructed at import
# time (redis-py pools connections lazily — TODO confirm for this version).
rate_limiter = RateLimiter(Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    decode_responses=True
))
\ No newline at end of file
diff --git a/app/utils/tasks.py b/app/utils/tasks.py
new file mode 100644
index 0000000000000000000000000000000000000000..2e1da056085ac48a1673c920be647a2c77289049
--- /dev/null
+++ b/app/utils/tasks.py
@@ -0,0 +1,139 @@
+from datetime import datetime, timedelta
+from ..db.database import db
+from ..services.notifications import notifications
+from ..services.maintenance import maintenance
+from ..utils.logger import logger
+import asyncio
+
async def check_event_reminders():
    """Scan events starting within the next 30 minutes and push reminder
    notifications to the creator and all attendees.

    Each handled event is flagged with reminder_sent=True so it is reminded
    about at most once.
    """
    try:
        now = datetime.utcnow()
        reminder_window = now + timedelta(minutes=30)  # Check next 30 minutes

        # Find events that need reminders.
        # NOTE(review): the sweep window is a fixed 30 minutes, but the
        # message below quotes each event's own reminder_minutes — events
        # configured with a longer lead time get their reminder late, and a
        # missing reminder_minutes field would raise KeyError. Confirm the
        # intended contract.
        events = await db.db["events"].find({
            "start_time": {
                "$gte": now,
                "$lte": reminder_window
            },
            "reminder_sent": {"$ne": True}
        }).to_list(None)

        for event in events:
            # Send reminder to event creator
            await notifications.create_notification(
                user_id=event["user_id"],
                title=f"Event Reminder: {event['title']}",
                message=f"Your event '{event['title']}' starts in {event['reminder_minutes']} minutes",
                notification_type="event_reminder",
                data={"event_id": str(event["_id"])}
            )

            # Send reminders to attendees
            for attendee in event.get("attendees", []):
                await notifications.create_notification(
                    user_id=attendee,
                    title=f"Event Reminder: {event['title']}",
                    message=f"Event '{event['title']}' starts in {event['reminder_minutes']} minutes",
                    notification_type="event_reminder",
                    data={"event_id": str(event["_id"])}
                )

            # Mark reminder as sent so the next sweep skips this event.
            await db.db["events"].update_one(
                {"_id": event["_id"]},
                {"$set": {"reminder_sent": True}}
            )

    except Exception as e:
        logger.error(f"Error in event reminder check: {str(e)}")
+
async def cleanup_old_notifications():
    """Delete notifications older than 30 days and log how many were removed."""
    try:
        cutoff_date = datetime.utcnow() - timedelta(days=30)
        delete_result = await db.db["notifications"].delete_many(
            {"created_at": {"$lt": cutoff_date}},
        )
        logger.info(f"Cleaned up {delete_result.deleted_count} old notifications")
    except Exception as e:
        logger.error(f"Error in notification cleanup: {str(e)}")
+
async def perform_daily_maintenance():
    """Run the daily maintenance bundle: session cleanup, data archiving,
    a system health check and resource monitoring.

    Each step's outcome is logged; any exception aborts the remaining steps
    and is logged (no re-raise).
    """
    try:
        # Clean up expired sessions
        deleted_sessions = await maintenance.cleanup_expired_sessions()
        logger.info(f"Cleaned up {deleted_sessions} expired sessions")

        # Archive old data (falsy result means nothing was archived)
        archived = await maintenance.archive_old_data()
        if archived:
            logger.info(f"Archived data: {archived}")

        # Check system health; the helper signals failure via an "error" key
        # rather than raising.
        health_data = await maintenance.check_system_health()
        if "error" not in health_data:
            logger.info("System health check completed successfully")
        else:
            logger.error(f"System health check error: {health_data['error']}")

        # Monitor system resources (same error-key convention)
        resources = await maintenance.monitor_system_resources()
        if "error" not in resources:
            logger.info("System resource monitoring completed successfully")
        else:
            logger.error(f"Resource monitoring error: {resources['error']}")

    except Exception as e:
        logger.error(f"Error in daily maintenance: {str(e)}")
+
async def perform_weekly_maintenance():
    """Run the weekly maintenance bundle (currently database maintenance only).

    The maintenance helper reports failure via an "error" key in its result
    rather than raising; both outcomes are logged.
    """
    try:
        outcome = await maintenance.perform_database_maintenance()
        if "error" in outcome:
            logger.error(f"Database maintenance error: {outcome['error']}")
        else:
            logger.info("Database maintenance completed successfully")
    except Exception as e:
        logger.error(f"Error in weekly maintenance: {str(e)}")
+
async def run_periodic_tasks():
    """Main periodic loop: event reminders every minute, notification cleanup
    once per day, daily maintenance at 02:00 UTC, weekly maintenance on
    Sunday at 03:00 UTC. Runs until the task is cancelled.
    """
    daily_maintenance_run = False
    weekly_maintenance_run = False
    last_cleanup_date = None  # date of the most recent notification cleanup

    while True:
        try:
            now = datetime.utcnow()

            # Check event reminders every minute
            await check_event_reminders()

            # Clean up old notifications once per day. The previous code ran
            # the delete_many sweep on EVERY 60-second iteration despite its
            # "daily" comment.
            if last_cleanup_date != now.date():
                await cleanup_old_notifications()
                last_cleanup_date = now.date()

            # Run daily maintenance at 2 AM; the flag resets once the hour
            # passes so it fires exactly once per day.
            if now.hour == 2 and not daily_maintenance_run:
                await perform_daily_maintenance()
                daily_maintenance_run = True
            elif now.hour != 2:
                daily_maintenance_run = False

            # Run weekly maintenance on Sunday at 3 AM (same flag pattern)
            if now.weekday() == 6 and now.hour == 3 and not weekly_maintenance_run:
                await perform_weekly_maintenance()
                weekly_maintenance_run = True
            elif now.weekday() != 6 or now.hour != 3:
                weekly_maintenance_run = False

            # Wait before next check
            await asyncio.sleep(60)  # 1 minute
        except Exception as e:
            logger.error(f"Error in periodic tasks: {str(e)}")
            await asyncio.sleep(60)  # Wait before retrying
\ No newline at end of file
diff --git a/package.json b/package.json
new file mode 100644
index 0000000000000000000000000000000000000000..e2888a287440d793b71057c0a6605be25de0bfea
--- /dev/null
+++ b/package.json
@@ -0,0 +1,12 @@
+{
+ "name": "admin_bknd2",
+ "version": "1.0.0",
+ "main": "index.js",
+ "scripts": {
+ "test": "echo \"Error: no test specified\" && exit 1"
+ },
+ "keywords": [],
+ "author": "",
+ "license": "ISC",
+ "description": ""
+}
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000000000000000000000000000000000000..56d5ce75b34bca059641978a8801be65ce8c64e1
Binary files /dev/null and b/requirements.txt differ