""" SQLAlchemy ORM models with relationships, indexes, and cascading deletes. All timestamps are UTC. All text fields are sanitized at the service layer. """ from __future__ import annotations import json from datetime import datetime, timezone from typing import Optional from flask_sqlalchemy import SQLAlchemy from sqlalchemy import ( Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, String, Text, Enum, UniqueConstraint, ) from sqlalchemy.orm import relationship db = SQLAlchemy() class BaseModel(db.Model): """Base model with __allow_unmapped__ for legacy annotations.""" __abstract__ = True __allow_unmapped__ = True def utcnow() -> datetime: return datetime.now(timezone.utc).replace(tzinfo=None) # --------------------------------------------------------------------------- # Enumerations (stored as VARCHAR for SQLite compatibility) # --------------------------------------------------------------------------- JOB_STATUS = ("pending", "running", "completed", "failed", "cancelled") SCRAPE_TYPE = ("static", "dynamic") EXTRACTION_TYPE = ("text", "images", "links", "attributes", "table", "json_ld", "full_html") EXPORT_FORMAT = ("json", "csv", "excel") # --------------------------------------------------------------------------- # Models # --------------------------------------------------------------------------- class User(BaseModel): """Simple user model — single-user by default, extendable to multi-user.""" __tablename__ = "users" id: int = Column(Integer, primary_key=True) username: str = Column(String(80), unique=True, nullable=False, index=True) email: str = Column(String(120), unique=True, nullable=False, index=True) password_hash: str = Column(String(256), nullable=False) is_active: bool = Column(Boolean, default=True, nullable=False) is_admin: bool = Column(Boolean, default=False, nullable=False) api_key: Optional[str] = Column(String(64), unique=True, index=True) created_at: datetime = Column(DateTime, default=utcnow, nullable=False) updated_at: datetime = Column(DateTime, default=utcnow, onupdate=utcnow, nullable=False) # Relationships jobs: list[ScrapeJob] = relationship( "ScrapeJob", back_populates="user", cascade="all, delete-orphan", lazy="select" ) schedules: list[Schedule] = relationship( "Schedule", back_populates="user", cascade="all, delete-orphan", lazy="select" ) def __repr__(self) -> str: return f"" def to_dict(self) -> dict: return { "id": self.id, "username": self.username, "email": self.email, "is_active": self.is_active, "is_admin": self.is_admin, "created_at": self.created_at.isoformat() if self.created_at else None, } class ScrapeJob(BaseModel): """Represents a single scraping job with all configuration.""" __tablename__ = "scrape_jobs" __table_args__ = ( Index("ix_scrape_jobs_status", "status"), Index("ix_scrape_jobs_created_at", "created_at"), Index("ix_scrape_jobs_user_id", "user_id"), ) id: int = Column(Integer, primary_key=True) user_id: Optional[int] = Column(Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True) name: str = Column(String(200), nullable=False) url: str = Column(Text, nullable=False) # Selectors html_tag: Optional[str] = Column(String(100)) css_selector: Optional[str] = Column(Text) xpath_selector: Optional[str] = Column(Text) attribute_name: Optional[str] = Column(String(100)) # Configuration extraction_type: str = Column( Enum(*EXTRACTION_TYPE, name="extraction_type"), default="text", nullable=False ) scrape_type: str = Column( Enum(*SCRAPE_TYPE, name="scrape_type"), default="static", nullable=False ) status: str = Column( Enum(*JOB_STATUS, name="job_status"), default="pending", nullable=False ) # Advanced options follow_pagination: bool = Column(Boolean, default=False) max_pages: int = Column(Integer, default=1) infinite_scroll: bool = Column(Boolean, default=False) scroll_count: int = Column(Integer, default=3) download_images: bool = Column(Boolean, default=False) custom_headers: Optional[str] = Column(Text) # JSON string user_agent: Optional[str] = Column(Text) delay_seconds: float = Column(Float, default=1.0) timeout_seconds: int = Column(Integer, default=30) max_retries: int = Column(Integer, default=3) check_robots_txt: bool = Column(Boolean, default=True) deduplicate: bool = Column(Boolean, default=True) # Stats total_items: int = Column(Integer, default=0) pages_scraped: int = Column(Integer, default=0) error_count: int = Column(Integer, default=0) duration_seconds: Optional[float] = Column(Float) # Timestamps created_at: datetime = Column(DateTime, default=utcnow, nullable=False) updated_at: datetime = Column(DateTime, default=utcnow, onupdate=utcnow) started_at: Optional[datetime] = Column(DateTime) completed_at: Optional[datetime] = Column(DateTime) # Relationships user: Optional[User] = relationship("User", back_populates="jobs") results: list[ScrapeResult] = relationship( "ScrapeResult", back_populates="job", cascade="all, delete-orphan", lazy="select" ) logs: list[JobLog] = relationship( "JobLog", back_populates="job", cascade="all, delete-orphan", lazy="select" ) exports: list[ExportRecord] = relationship( "ExportRecord", back_populates="job", cascade="all, delete-orphan", lazy="select" ) def __repr__(self) -> str: return f"" @property def custom_headers_dict(self) -> dict: if self.custom_headers: try: return json.loads(self.custom_headers) except (json.JSONDecodeError, TypeError): return {} return {} def to_dict(self) -> dict: return { "id": self.id, "name": self.name, "url": self.url, "html_tag": self.html_tag, "css_selector": self.css_selector, "xpath_selector": self.xpath_selector, "extraction_type": self.extraction_type, "scrape_type": self.scrape_type, "status": self.status, "follow_pagination": self.follow_pagination, "max_pages": self.max_pages, "infinite_scroll": self.infinite_scroll, "total_items": self.total_items, "pages_scraped": self.pages_scraped, "error_count": self.error_count, "duration_seconds": self.duration_seconds, "created_at": self.created_at.isoformat() if self.created_at else None, "started_at": self.started_at.isoformat() if self.started_at else None, "completed_at": self.completed_at.isoformat() if self.completed_at else None, } class ScrapeResult(BaseModel): """Individual scraped data item linked to a job.""" __tablename__ = "scrape_results" __table_args__ = ( Index("ix_scrape_results_job_id", "job_id"), Index("ix_scrape_results_page_num", "page_num"), Index("ix_scrape_results_content_hash", "content_hash"), ) id: int = Column(Integer, primary_key=True) job_id: int = Column(Integer, ForeignKey("scrape_jobs.id", ondelete="CASCADE"), nullable=False) page_url: str = Column(Text, nullable=False) page_num: int = Column(Integer, default=1, nullable=False) item_index: int = Column(Integer, default=0) content: Optional[str] = Column(Text) content_type: str = Column(String(50), default="text") content_hash: Optional[str] = Column(String(64)) metadata_: Optional[str] = Column("metadata", Text) # JSON created_at: datetime = Column(DateTime, default=utcnow, nullable=False) # Relationship job: ScrapeJob = relationship("ScrapeJob", back_populates="results") def __repr__(self) -> str: return f"" @property def metadata_dict(self) -> dict: if self.metadata_: try: return json.loads(self.metadata_) except (json.JSONDecodeError, TypeError): return {} return {} def to_dict(self) -> dict: return { "id": self.id, "job_id": self.job_id, "page_url": self.page_url, "page_num": self.page_num, "item_index": self.item_index, "content": self.content, "content_type": self.content_type, "metadata": self.metadata_dict, "created_at": self.created_at.isoformat() if self.created_at else None, } class JobLog(BaseModel): """Structured logs for each scraping job.""" __tablename__ = "job_logs" __table_args__ = ( Index("ix_job_logs_job_id", "job_id"), Index("ix_job_logs_level", "level"), Index("ix_job_logs_created_at", "created_at"), ) id: int = Column(Integer, primary_key=True) job_id: int = Column(Integer, ForeignKey("scrape_jobs.id", ondelete="CASCADE"), nullable=False) level: str = Column(String(10), default="INFO", nullable=False) message: str = Column(Text, nullable=False) details: Optional[str] = Column(Text) # JSON for structured extras created_at: datetime = Column(DateTime, default=utcnow, nullable=False) job: ScrapeJob = relationship("ScrapeJob", back_populates="logs") def to_dict(self) -> dict: return { "id": self.id, "job_id": self.job_id, "level": self.level, "message": self.message, "details": self.details, "created_at": self.created_at.isoformat() if self.created_at else None, } class ExportRecord(BaseModel): """Tracks export files generated for each job.""" __tablename__ = "export_records" __table_args__ = (Index("ix_export_records_job_id", "job_id"),) id: int = Column(Integer, primary_key=True) job_id: int = Column(Integer, ForeignKey("scrape_jobs.id", ondelete="CASCADE"), nullable=False) format: str = Column(Enum(*EXPORT_FORMAT, name="export_format"), nullable=False) filename: str = Column(String(255), nullable=False) filepath: str = Column(Text, nullable=False) file_size_bytes: Optional[int] = Column(Integer) row_count: int = Column(Integer, default=0) created_at: datetime = Column(DateTime, default=utcnow, nullable=False) job: ScrapeJob = relationship("ScrapeJob", back_populates="exports") def to_dict(self) -> dict: return { "id": self.id, "job_id": self.job_id, "format": self.format, "filename": self.filename, "file_size_bytes": self.file_size_bytes, "row_count": self.row_count, "created_at": self.created_at.isoformat() if self.created_at else None, } class Schedule(BaseModel): """APScheduler-backed recurring scrape schedules.""" __tablename__ = "schedules" __table_args__ = (Index("ix_schedules_user_id", "user_id"),) id: int = Column(Integer, primary_key=True) user_id: Optional[int] = Column(Integer, ForeignKey("users.id", ondelete="SET NULL"), nullable=True) name: str = Column(String(200), nullable=False) cron_expression: str = Column(String(100), nullable=False) # e.g. "0 9 * * 1" job_config: str = Column(Text, nullable=False) # JSON of ScrapeJob config is_active: bool = Column(Boolean, default=True, nullable=False) last_run_at: Optional[datetime] = Column(DateTime) next_run_at: Optional[datetime] = Column(DateTime) run_count: int = Column(Integer, default=0) created_at: datetime = Column(DateTime, default=utcnow, nullable=False) updated_at: datetime = Column(DateTime, default=utcnow, onupdate=utcnow) user: Optional[User] = relationship("User", back_populates="schedules") @property def job_config_dict(self) -> dict: try: return json.loads(self.job_config) except (json.JSONDecodeError, TypeError): return {} def to_dict(self) -> dict: return { "id": self.id, "name": self.name, "cron_expression": self.cron_expression, "is_active": self.is_active, "last_run_at": self.last_run_at.isoformat() if self.last_run_at else None, "next_run_at": self.next_run_at.isoformat() if self.next_run_at else None, "run_count": self.run_count, "created_at": self.created_at.isoformat() if self.created_at else None, } class AppSetting(BaseModel): """Key-value application settings stored in the DB.""" __tablename__ = "app_settings" id: int = Column(Integer, primary_key=True) key: str = Column(String(100), unique=True, nullable=False, index=True) value: str = Column(Text, nullable=False) description: Optional[str] = Column(Text) updated_at: datetime = Column(DateTime, default=utcnow, onupdate=utcnow) @classmethod def get(cls, key: str, default: str = "") -> str: row = cls.query.filter_by(key=key).first() return row.value if row else default @classmethod def set(cls, key: str, value: str, description: str = "") -> None: row = cls.query.filter_by(key=key).first() if row: row.value = value else: row = cls(key=key, value=value, description=description) db.session.add(row) db.session.commit()