Spaces:
Running
Running
| import json | |
| import random | |
| import string | |
| from typing import List, Optional, Any | |
| from datetime import datetime, timedelta | |
| from fastapi import FastAPI, HTTPException, Depends, Query, Request, status | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel, Field, field_validator, model_validator, ConfigDict | |
| from pydantic import HttpUrl, EmailStr | |
| from sqlalchemy import create_engine, Column, String, Integer, Float, Boolean, Text, DateTime, func | |
| from sqlalchemy.orm import declarative_base, sessionmaker, Session | |
| import os | |
| from fastapi.responses import JSONResponse | |
| # ============================================================================= | |
| # DATABASE SETUP | |
| # ============================================================================= | |
| if os.path.exists("/data"): | |
| DATABASE_DIR = "/data" | |
| print("✅ PRODUCTION MODE: Using Persistent Storage at /data") | |
| else: | |
| DATABASE_DIR = os.path.join(os.getcwd(), "data") | |
| os.makedirs(DATABASE_DIR, exist_ok=True) | |
| print(f"⚠️ LOCAL MODE: Using local storage at {DATABASE_DIR}") | |
| # 2. Set the Database URL | |
| SQLALCHEMY_DATABASE_URL = f"sqlite:///{DATABASE_DIR}/tools.db" | |
| # 3. Create Engine | |
| engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}) | |
| SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) | |
| Base = declarative_base() | |
| # ============================================================================= | |
| # DATABASE MODELS | |
| # ============================================================================= | |
| class Tool(Base): | |
| __tablename__ = "tools" | |
| id = Column(Integer, primary_key=True, index=True) | |
| slug = Column(String, unique=True, index=True) | |
| name = Column(String, index=True) | |
| description = Column(String) | |
| url = Column(String) | |
| category = Column(String) | |
| tags = Column(String) # JSON string | |
| developer_name = Column(String) | |
| developer_website = Column(String) | |
| developer_email = Column(String) | |
| price = Column(Float, nullable=True) | |
| pricing_model = Column(String, nullable=True) | |
| notes = Column(Text, nullable=True) | |
| submission_date = Column(DateTime, default=func.now()) | |
| status = Column(String, default="pending") | |
| featured = Column(Boolean, default=False) | |
| rating = Column(Float, default=0.0) | |
| thumbnail = Column(String, nullable=True) | |
| updated_at = Column(DateTime, default=func.now(), onupdate=func.now()) | |
| promoted = Column(Boolean, default=False) | |
| # Create tables | |
| Base.metadata.create_all(bind=engine) | |
| # ============================================================================= | |
| # PYDANTIC V2 SCHEMAS | |
| # ============================================================================= | |
| class Developer(BaseModel): | |
| model_config = ConfigDict(from_attributes=True) | |
| name: str = Field(..., min_length=2, max_length=100) | |
| website: str = Field(..., pattern=r'^https?://') # Simplified URL validation | |
| email: Optional[str] = Field(None, pattern=r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$') | |
| def validate_website(cls, v: str) -> str: | |
| if not v.startswith(('http://', 'https://')): | |
| return f'https://{v}' | |
| return v | |
| class ToolCreate(BaseModel): | |
| model_config = ConfigDict(from_attributes=True) | |
| name: str = Field(..., min_length=2, max_length=100) | |
| description: str = Field(..., min_length=10, max_length=500) | |
| url: str = Field(..., pattern=r'^https?://') | |
| category: str = Field(...) | |
| tags: List[str] = Field(..., min_length=1) | |
| developer: Developer | |
| price: Optional[float] = Field(None, ge=0) | |
| pricingModel: Optional[str] = Field(None) | |
| notes: Optional[str] = Field(None) | |
| terms: bool = Field(..., description="Must agree to terms") | |
| def validate_url(cls, v: str) -> str: | |
| if not v.startswith(('http://', 'https://')): | |
| return f'https://{v}' | |
| return v | |
| def validate_terms(cls, v: bool) -> bool: | |
| if not v: | |
| raise ValueError('You must agree to the terms') | |
| return v | |
| class ToolUpdate(BaseModel): | |
| model_config = ConfigDict(from_attributes=True) | |
| name: Optional[str] = Field(None, min_length=2, max_length=100) | |
| description: Optional[str] = Field(None, min_length=10, max_length=500) | |
| url: Optional[str] = Field(None, pattern=r'^https?://') | |
| category: Optional[str] = None | |
| tags: Optional[List[str]] = None | |
| developer: Optional[Developer] = None | |
| price: Optional[float] = Field(None, ge=0) | |
| pricingModel: Optional[str] = None | |
| notes: Optional[str] = None | |
| status: Optional[str] = Field(None, pattern=r'^(pending|approved|rejected)$') | |
| featured: Optional[bool] = None | |
| rating: Optional[float] = Field(None, ge=0, le=5) | |
| thumbnail: Optional[str] = None | |
| promoted: Optional[bool] = None | |
| def validate_url(cls, v: Optional[str]) -> Optional[str]: | |
| if v and not v.startswith(('http://', 'https://')): | |
| return f'https://{v}' | |
| return v | |
| class ToolResponse(BaseModel): | |
| model_config = ConfigDict(from_attributes=True) | |
| id: str # Slug | |
| name: str | |
| description: str | |
| url: str | |
| category: str | |
| tags: List[str] | |
| developer: Developer | |
| price: Optional[float] = None | |
| pricingModel: Optional[str] = None | |
| submissionDate: str | |
| status: str | |
| featured: bool | |
| rating: float | |
| thumbnail: Optional[str] = None | |
| updatedAt: Optional[str] = None | |
| promoted: bool = False | |
| notes: Optional[str] = None | |
| # ============================================================================= | |
| # FASTAPI APP SETUP | |
| # ============================================================================= | |
| app = FastAPI( | |
| title="Ahaa", | |
| description="Ahaa", | |
| version="2.0.0", | |
| ) | |
| # CORS Configuration - Allow all origins for development | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # ============================================================================= | |
| # DEPENDENCIES & UTILITIES | |
| # ============================================================================= | |
| def get_db(): | |
| db = SessionLocal() | |
| try: | |
| yield db | |
| finally: | |
| db.close() | |
| def generate_slug(): | |
| return 'tool_' + ''.join(random.choices(string.ascii_lowercase + string.digits, k=9)) | |
| def map_db_to_response(db_tool: Tool) -> ToolResponse: | |
| """Convert SQLAlchemy model to Pydantic response model""" | |
| try: | |
| tags_list = json.loads(db_tool.tags) if db_tool.tags else [] | |
| except: | |
| tags_list = [] | |
| return ToolResponse( | |
| id=db_tool.slug, | |
| name=db_tool.name, | |
| description=db_tool.description, | |
| url=db_tool.url, | |
| category=db_tool.category, | |
| tags=tags_list, | |
| developer=Developer( | |
| name=db_tool.developer_name, | |
| website=db_tool.developer_website, | |
| email=db_tool.developer_email | |
| ), | |
| price=db_tool.price, | |
| pricingModel=db_tool.pricing_model, | |
| submissionDate=db_tool.submission_date.isoformat(), | |
| status=db_tool.status, | |
| featured=db_tool.featured, | |
| rating=db_tool.rating, | |
| thumbnail=db_tool.thumbnail, | |
| updatedAt=db_tool.updated_at.isoformat() if db_tool.updated_at else None, | |
| notes=db_tool.notes | |
| ) | |
| # ============================================================================= | |
| # HEALTH CHECK | |
| # ============================================================================= | |
| async def root(): | |
| return { | |
| "message": "error", | |
| } | |
| async def health_check(): | |
| return { | |
| "status": "healthy", | |
| } | |
| # ============================================================================= | |
| # API ENDPOINTS | |
| # ============================================================================= | |
| def create_tool(tool: ToolCreate, db: Session = Depends(get_db)): | |
| """ | |
| Create a new tool submission | |
| """ | |
| # Check for duplicate URL | |
| existing = db.query(Tool).filter(Tool.url == tool.url).first() | |
| if existing: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="A tool with this URL already exists." | |
| ) | |
| # Create database object | |
| db_tool = Tool( | |
| slug=generate_slug(), | |
| name=tool.name, | |
| description=tool.description, | |
| url=tool.url, | |
| category=tool.category, | |
| tags=json.dumps(tool.tags), | |
| developer_name=tool.developer.name, | |
| developer_website=tool.developer.website, | |
| developer_email=tool.developer.email, | |
| price=tool.price, | |
| pricing_model=tool.pricingModel, | |
| notes=tool.notes, | |
| status="pending", | |
| featured=False, | |
| rating=0.0 | |
| ) | |
| try: | |
| db.add(db_tool) | |
| db.commit() | |
| db.refresh(db_tool) | |
| return map_db_to_response(db_tool) | |
| except Exception as e: | |
| db.rollback() | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Database error: {str(e)}" | |
| ) | |
| def get_tools( | |
| skip: int = Query(0, ge=0, description="Number of items to skip"), | |
| limit: int = Query(100, ge=1, le=1000, description="Number of items to return"), | |
| status: Optional[str] = Query(None, description="Filter by status"), | |
| category: Optional[str] = Query(None, description="Filter by category"), | |
| featured: Optional[bool] = Query(None, description="Filter by featured status"), | |
| search: Optional[str] = Query(None, description="Search in name and description"), | |
| db: Session = Depends(get_db) | |
| ): | |
| """ | |
| Get paginated list of tools with filtering options | |
| """ | |
| query = db.query(Tool) | |
| # Apply filters | |
| if status: | |
| query = query.filter(Tool.status == status) | |
| if category: | |
| query = query.filter(Tool.category == category) | |
| if featured is not None: | |
| query = query.filter(Tool.featured == featured) | |
| if search: | |
| search_term = f"%{search}%" | |
| query = query.filter( | |
| (Tool.name.ilike(search_term)) | | |
| (Tool.description.ilike(search_term)) | |
| ) | |
| # Get results | |
| tools = query.order_by(Tool.submission_date.desc()).offset(skip).limit(limit).all() | |
| return [map_db_to_response(t) for t in tools] | |
| def get_tool(slug: str, db: Session = Depends(get_db)): | |
| """ | |
| Get a specific tool by slug | |
| """ | |
| tool = db.query(Tool).filter(Tool.slug == slug).first() | |
| if not tool: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="Tool not found" | |
| ) | |
| return map_db_to_response(tool) | |
| def update_tool(slug: str, tool_update: ToolUpdate, db: Session = Depends(get_db)): | |
| """ | |
| Update a tool | |
| """ | |
| db_tool = db.query(Tool).filter(Tool.slug == slug).first() | |
| if not db_tool: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="Tool not found" | |
| ) | |
| # Update fields if provided | |
| update_data = tool_update.model_dump(exclude_unset=True) | |
| if 'pricingModel' in update_data: | |
| update_data['pricing_model'] = update_data.pop('pricingModel') | |
| if 'developer' in update_data: | |
| developer = update_data.pop('developer') | |
| db_tool.developer_name = developer['name'] | |
| db_tool.developer_website = developer['website'] | |
| db_tool.developer_email = developer['email'] | |
| if 'tags' in update_data: | |
| db_tool.tags = json.dumps(update_data.pop('tags')) | |
| # Update remaining fields | |
| for key, value in update_data.items(): | |
| if hasattr(db_tool, key): | |
| setattr(db_tool, key, value) | |
| try: | |
| db.commit() | |
| db.refresh(db_tool) | |
| return map_db_to_response(db_tool) | |
| except Exception as e: | |
| db.rollback() | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Update failed: {str(e)}" | |
| ) | |
| def update_tool(tool_id: str, tool_update: ToolUpdate, db: Session = Depends(get_db)): | |
| # 1. Find the tool | |
| db_tool = db.query(Tool).filter(Tool.id == tool_id).first() | |
| if not db_tool: | |
| raise HTTPException(status_code=404, detail="Tool not found") | |
| # 2. Update only the fields provided in the request | |
| # This magic line ensures that sending {"promoted": true} ONLY updates that one flag | |
| update_data = tool_update.dict(exclude_unset=True) | |
| for key, value in update_data.items(): | |
| setattr(db_tool, key, value) | |
| # 3. Save and refresh | |
| db.commit() | |
| db.refresh(db_tool) | |
| # 4. Return the updated tool | |
| return map_db_to_response(db_tool) | |
| def delete_tool(slug: str, db: Session = Depends(get_db)): | |
| """ | |
| Delete a tool | |
| """ | |
| tool = db.query(Tool).filter(Tool.slug == slug).first() | |
| if not tool: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="Tool not found" | |
| ) | |
| try: | |
| db.delete(tool) | |
| db.commit() | |
| return { | |
| "message": "Tool deleted successfully", | |
| "slug": slug | |
| } | |
| except Exception as e: | |
| db.rollback() | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Deletion failed: {str(e)}" | |
| ) | |
| def get_statistics(db: Session = Depends(get_db)): | |
| """ | |
| Get system statistics | |
| """ | |
| try: | |
| total = db.query(Tool).count() | |
| pending = db.query(Tool).filter(Tool.status == "pending").count() | |
| approved = db.query(Tool).filter(Tool.status == "approved").count() | |
| rejected = db.query(Tool).filter(Tool.status == "rejected").count() | |
| featured = db.query(Tool).filter(Tool.featured == True).count() | |
| # Get category distribution | |
| categories = db.query( | |
| Tool.category, | |
| func.count(Tool.id).label('count') | |
| ).group_by(Tool.category).all() | |
| # Get recent submissions (last 7 days) | |
| week_ago = datetime.now() - timedelta(days=7) | |
| recent = db.query(Tool).filter(Tool.submission_date >= week_ago).count() | |
| return { | |
| "total": total, | |
| "pending": pending, | |
| "approved": approved, | |
| "rejected": rejected, | |
| "featured": featured, | |
| "recent_submissions": recent, | |
| "categories": [{"name": c[0], "count": c[1]} for c in categories], | |
| "updated": datetime.now().isoformat() | |
| } | |
| except Exception as e: | |
| raise HTTPException( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| detail=f"Failed to fetch statistics: {str(e)}" | |
| ) | |
| # ============================================================================= | |
| # ERROR HANDLING | |
| # ============================================================================= | |
| async def http_exception_handler(request: Request, exc: HTTPException): | |
| return JSONResponse( | |
| status_code=exc.status_code, | |
| content={ | |
| "detail": exc.detail, | |
| "path": request.url.path, | |
| "method": request.method | |
| } | |
| ) | |
| async def general_exception_handler(request: Request, exc: Exception): | |
| return JSONResponse( | |
| status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
| content={ | |
| "detail": "Internal server error", | |
| "error": str(exc), | |
| "path": request.url.path | |
| } | |
| ) | |
| # ============================================================================= | |
| # MAIN EXECUTION | |
| # ============================================================================= | |
| if __name__ == "__main__": | |
| import uvicorn | |
| from fastapi.responses import JSONResponse | |
| from datetime import timedelta | |
| uvicorn.run( | |
| "app:app", | |
| host="0.0.0.0", | |
| port=7860, | |
| log_level="info", | |
| reload=True | |
| ) |