"""Database engine, session helpers, and ORM models for the application."""

import os
from contextlib import contextmanager
from datetime import datetime, timezone

from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, ForeignKey, JSON, Boolean
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

DATABASE_URL = os.environ.get('DATABASE_URL', 'sqlite:///output/app.db')

_IS_SQLITE = DATABASE_URL.startswith("sqlite")


def _utcnow():
    """Return the current UTC time as a naive ``datetime``.

    Produces the same value ``datetime.utcnow()`` would, without calling that
    function (deprecated since Python 3.12). The result stays naive so values
    written to the ``DateTime`` columns keep the same shape as before.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


# 1. Connection Pooling & Production Settings
#
# Pool sizing is only passed for non-SQLite URLs. Depending on SQLAlchemy
# version and URL form, SQLite engines may be given NullPool or
# SingletonThreadPool, which reject ``pool_size`` and/or ``max_overflow`` and
# make ``create_engine`` raise ``TypeError``. Where SQLite does use QueuePool,
# the previously passed values (5 / 10) equal QueuePool's own defaults, so
# omitting them changes nothing.
if _IS_SQLITE:
    # Sessions may be used from a different thread than the one that opened
    # the underlying sqlite3 connection.
    _engine_kwargs = {"connect_args": {"check_same_thread": False}}
else:
    _engine_kwargs = {"pool_size": 20, "max_overflow": 40}

engine = create_engine(DATABASE_URL, **_engine_kwargs)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


# 2. Safe Transaction Handling (Atomic Operations)
@contextmanager
def get_db_session():
    """Context manager for atomic DB transactions with automatic rollback.

    Yields a new ``SessionLocal`` session. The session is committed when the
    ``with`` body finishes normally; on any ``Exception`` it is rolled back
    and the exception is re-raised. The session is always closed.
    """
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()


class Company(Base):
    """A company that users can belong to."""

    __tablename__ = "companies"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)
    domain = Column(String, index=True, nullable=True)
    created_at = Column(DateTime, default=_utcnow, index=True)

    users = relationship("User", back_populates="company")


class User(Base):
    """An account, optionally attached to a ``Company``."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String, unique=True, index=True, nullable=False)
    password_hash = Column(String, nullable=False)
    role = Column(String, default='user')
    company_id = Column(Integer, ForeignKey("companies.id"), index=True, nullable=True)
    created_at = Column(DateTime, default=_utcnow, index=True)

    company = relationship("Company", back_populates="users")
    jobs = relationship("Job", back_populates="user")


class Job(Base):
    """A job record, optionally owned by a ``User``."""

    __tablename__ = "jobs"

    id = Column(Integer, primary_key=True, index=True)
    url = Column(String, nullable=False)
    org_name = Column(String, index=True, nullable=False)
    org_url = Column(String, nullable=True)
    max_pages = Column(Integer, default=3)
    runs = Column(Integer, default=1)

    # Advanced State Machine: pending, running, retrying, failed, completed
    status = Column(String, default="pending", index=True)
    progress = Column(JSON, default=dict)
    result_path = Column(String, nullable=True)

    user_id = Column(Integer, ForeignKey("users.id"), index=True, nullable=True)
    # NOTE(review): unlike User.company_id this column has no ForeignKey to
    # companies.id -- confirm whether that is intentional before adding one,
    # since it would change the schema.
    company_id = Column(Integer, index=True, nullable=True)
    industry_override = Column(String, nullable=True)

    # Cost tracking mechanism
    estimated_cost = Column(Integer, default=0)

    created_at = Column(DateTime, default=_utcnow, index=True)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow, index=True)

    user = relationship("User", back_populates="jobs")


def get_db():
    """Dependency for FastAPI.

    Yields a new ``SessionLocal`` session and closes it afterwards. Unlike
    ``get_db_session`` it neither commits nor rolls back; the caller does.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


class Action(Base):
    """An action attached to a job, with before/after metric snapshots."""

    __tablename__ = "actions"

    id = Column(Integer, primary_key=True, index=True)
    job_id = Column(Integer, ForeignKey("jobs.id"), index=True, nullable=True)
    type = Column(String, index=True)  # content, technical, outreach
    task = Column(String)
    priority_score = Column(Integer, default=50)  # Computed dynamically
    status = Column(String, default="pending", index=True)  # pending, executing, done, failed
    result = Column(JSON, default=dict)  # Technical output

    # Proof Engine Fields
    initial_metrics = Column(JSON, default=dict)  # {clicks: X, rank: Y}
    latest_metrics = Column(JSON, default=dict)  # {clicks: X+Z, rank: Y-A}
    impact_score = Column(Integer, default=0)  # Computed growth %

    created_at = Column(DateTime, default=_utcnow, index=True)
    updated_at = Column(DateTime, default=_utcnow, onupdate=_utcnow)


class ProjectMetric(Base):
    """A dated metrics row (clicks, impressions, average position) for a job."""

    __tablename__ = "project_metrics"

    id = Column(Integer, primary_key=True, index=True)
    job_id = Column(Integer, ForeignKey("jobs.id"), index=True)
    date = Column(DateTime, default=_utcnow, index=True)
    clicks = Column(Integer, default=0)
    impressions = Column(Integer, default=0)
    avg_position = Column(Integer, default=100)


class Mention(Base):
    """A brand mention found at some source."""

    __tablename__ = "mentions"

    id = Column(Integer, primary_key=True, index=True)
    brand = Column(String, index=True)
    source = Column(String)  # Reddit, News, X
    content = Column(String)
    url = Column(String)
    sentiment_score = Column(Integer, nullable=True)
    created_at = Column(DateTime, default=_utcnow, index=True)


class Lead(Base):
    """A lead, optionally linked to a job."""

    __tablename__ = "leads"

    id = Column(Integer, primary_key=True, index=True)
    job_id = Column(Integer, ForeignKey("jobs.id"), index=True, nullable=True)
    url = Column(String)
    email = Column(String, index=True, nullable=True)
    status = Column(String, default="found", index=True)  # found, outreach_sent, replied
    created_at = Column(DateTime, default=_utcnow, index=True)


class Entity(Base):
    """A uniquely named entity that can be related to other entities."""

    __tablename__ = "entities"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, unique=True, index=True)
    type = Column(String, index=True)  # brand, competence, competitor, topic
    roi_score = Column(Integer, default=0)  # Computed from impact_score of linked actions

    # Relationships: rows of EntityRelation whose source_id is this entity.
    relations = relationship(
        "EntityRelation",
        foreign_keys="[EntityRelation.source_id]",
        back_populates="source",
    )


class EntityRelation(Base):
    """A typed link from a source ``Entity`` to a target ``Entity``."""

    __tablename__ = "entity_relations"

    id = Column(Integer, primary_key=True, index=True)
    source_id = Column(Integer, ForeignKey("entities.id"), index=True)
    target_id = Column(Integer, ForeignKey("entities.id"), index=True)
    relation_type = Column(String)  # competes_with, targets, part_of

    # back_populates mirrors Entity.relations so that assigning ``source``
    # also updates the entity's ``relations`` collection in the session
    # (previously only the Entity side declared the pairing).
    source = relationship("Entity", foreign_keys=[source_id], back_populates="relations")
    target = relationship("Entity", foreign_keys=[target_id])


class ActionEntity(Base):
    """Association row linking an ``Action`` to an ``Entity``."""

    __tablename__ = "action_entities"

    id = Column(Integer, primary_key=True, index=True)
    action_id = Column(Integer, ForeignKey("actions.id"), index=True)
    entity_id = Column(Integer, ForeignKey("entities.id"), index=True)


class AIPrompt(Base):
    """A prompt text associated with a job."""

    __tablename__ = "ai_prompts"

    id = Column(Integer, primary_key=True, index=True)
    job_id = Column(Integer, ForeignKey("jobs.id"), index=True)
    prompt_text = Column(String, index=True)
    category = Column(String)  # transactional, informational, comparison


class AIResponse(Base):
    """A model's raw response to an ``AIPrompt`` plus analysis fields."""

    __tablename__ = "ai_responses"

    id = Column(Integer, primary_key=True, index=True)
    prompt_id = Column(Integer, ForeignKey("ai_prompts.id"), index=True)
    model_name = Column(String, index=True)  # gpt-4, claude-3, perplexity
    raw_content = Column(Text)

    # Analysis Fields
    mentioned_brand = Column(String, nullable=True)
    mention_found = Column(Integer, default=0)  # 0 or 1
    rank_position = Column(Integer, nullable=True)
    sentiment_score = Column(Integer, default=0)
    competitors_mentioned = Column(JSON, default=list)  # ["Ahrefs", "Semrush"]

    created_at = Column(DateTime, default=_utcnow, index=True)


class ProjectSettings(Base):
    """Per-job settings; ``job_id`` is unique, so at most one row per job."""

    __tablename__ = "project_settings"

    id = Column(Integer, primary_key=True, index=True)
    job_id = Column(Integer, ForeignKey("jobs.id"), unique=True, index=True)
    autopilot_enabled = Column(Boolean, default=False)
    budget_limit = Column(Integer, default=1000)  # Simulation units/USD
    # A callable default so every new row gets its own dict instance.
    preferred_mix = Column(JSON, default=lambda: {"content": 70, "outreach": 20, "tech": 10})


class SuccessLog(Base):
    """Aggregate success/failure counters keyed by action type."""

    __tablename__ = "success_logs"

    id = Column(Integer, primary_key=True, index=True)
    action_type = Column(String, index=True)
    avg_roi = Column(Integer, default=0)
    success_count = Column(Integer, default=0)
    fail_count = Column(Integer, default=0)


def init_db():
    """Create every table registered on ``Base`` using the module engine."""
    Base.metadata.create_all(bind=engine)