# jhjv / app /models /__init__.py
# Docfile's picture
# Upload 76 files
# d324dde verified
import secrets
import string
from datetime import date, datetime, timedelta, timezone
from flask import current_app
from flask_login import UserMixin
from app import db, login_manager
@login_manager.user_loader
def load_user(user_id):
    """Resolve the id stored in the session to a ``User`` row for Flask-Login.

    Args:
        user_id: The identifier Flask-Login read from the session.

    Returns:
        The matching ``User``, or ``None`` when the id is not a valid integer
        or no such user exists.
    """
    # A tampered or stale session can carry a non-numeric id; the loader must
    # answer "no user" in that case rather than raise.
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        return None
    return User.query.get(primary_key)
class AppSettings(db.Model):
    """Key/value application settings that can be edited from the admin panel."""

    __tablename__ = "app_settings"

    id = db.Column(db.Integer, primary_key=True)
    key = db.Column(db.String(100), unique=True, nullable=False)
    value = db.Column(db.Text, nullable=True)
    description = db.Column(db.String(255), nullable=True)
    # Stamped with the current UTC time on insert and again on every update.
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    @staticmethod
    def get_setting(key, default=None):
        """Return the stored value for ``key``, or ``default`` if no row exists."""
        row = AppSettings.query.filter_by(key=key).first()
        if row is None:
            return default
        return row.value

    @staticmethod
    def set_setting(key, value, description=None):
        """Create or update the setting ``key`` and commit the session.

        An existing description is only overwritten when a non-empty
        ``description`` is supplied.

        Returns:
            The created or updated ``AppSettings`` row.
        """
        row = AppSettings.query.filter_by(key=key).first()
        if row is None:
            row = AppSettings(key=key, value=value, description=description)
            db.session.add(row)
        else:
            row.value = value
            if description:
                row.description = description
        db.session.commit()
        return row

    @staticmethod
    def get_app_name():
        """Return the configured application name (falls back to "Apex Ores")."""
        return AppSettings.get_setting("app_name", "Apex Ores")

    @staticmethod
    def get_app_logo():
        """Return the configured application logo URL, or ``None`` if unset."""
        return AppSettings.get_setting("app_logo", None)

    @staticmethod
    def get_fake_user_count():
        """Return the admin-defined fake user count as an int (0 if unparsable)."""
        raw = AppSettings.get_setting("fake_user_count", "0")
        try:
            count = int(raw)
        except (ValueError, TypeError):
            # Covers both a non-numeric string and a stored NULL value.
            count = 0
        return count

    @staticmethod
    def set_fake_user_count(count):
        """Persist the fake user count (stored as a string)."""
        AppSettings.set_setting(
            "fake_user_count",
            str(count),
            "Nombre d'utilisateurs fictifs ajoutés pour l'affichage",
        )

    @staticmethod
    def get_total_displayed_users():
        """Return the number of users to display: real rows plus the fake count."""
        real_total = User.query.count()
        fake_total = AppSettings.get_fake_user_count()
        return real_total + fake_total
class User(UserMixin, db.Model):
    """Application account with balances, referral data and owned records.

    The money columns are nullable and their ``default`` is only applied when
    the row is INSERTed, so the balance helpers below treat ``None`` as zero.
    """

    __tablename__ = "users"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False, default="Utilisateur")
    phone = db.Column(db.String(20), unique=True, nullable=False)
    country_code = db.Column(db.String(5), nullable=False)
    password_hash = db.Column(db.String(255), nullable=False)
    referral_code = db.Column(db.String(10), unique=True, nullable=False)
    referred_by_code = db.Column(db.String(10), nullable=True)
    balance = db.Column(db.Float, default=0.0)
    bonus_balance = db.Column(db.Float, default=0.0)
    total_gains = db.Column(db.Float, default=0.0)
    # Referral earnings tracking
    referral_earnings = db.Column(db.Float, default=0.0)  # Total earned from referrals
    # Registration bonus tracking - bonus is locked until first subscription
    registration_bonus = db.Column(db.Float, default=0.0)
    registration_bonus_unlocked = db.Column(db.Boolean, default=False)
    # Track if user has seen the welcome popup
    has_seen_welcome_popup = db.Column(db.Boolean, default=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
    last_login = db.Column(db.DateTime, nullable=True)
    last_daily_bonus = db.Column(db.Date, nullable=True)
    account_active = db.Column(db.Boolean, default=True)
    is_admin = db.Column(db.Boolean, default=False)

    # Child rows are deleted together with the user.
    metals = db.relationship(
        "UserMetal", backref="user", lazy=True, cascade="all, delete-orphan"
    )
    transactions = db.relationship(
        "Transaction", backref="user", lazy=True, cascade="all, delete-orphan"
    )
    notifications = db.relationship(
        "Notification", backref="user", lazy=True, cascade="all, delete-orphan"
    )

    def __init__(self, phone, country_code, password, name="Utilisateur"):
        """Create a user and assign a freshly generated referral code.

        NOTE(review): ``password`` is stored verbatim in ``password_hash``;
        presumably callers pass an already-hashed value - confirm.
        """
        self.name = name
        self.phone = phone
        self.country_code = country_code
        self.password_hash = password
        self.referral_code = self.generate_referral_code()

    def generate_referral_code(self):
        """Return a random 6-character code (A-Z, 0-9) not used by any user."""
        alphabet = string.ascii_uppercase + string.digits
        while True:
            code = "".join(secrets.choice(alphabet) for _ in range(6))
            # Retry on the (unlikely) collision with an existing code.
            if not User.query.filter_by(referral_code=code).first():
                return code

    @property
    def has_active_subscription(self):
        """True if the user owns at least one ``UserMetal`` row.

        The check counts every row for this user and does not filter on
        ``UserMetal.is_active``.
        """
        return UserMetal.query.filter_by(user_id=self.id).count() > 0

    @property
    def display_balance(self):
        """Total balance displayed to the user (includes the locked bonus)."""
        return (self.balance or 0.0) + self.locked_bonus

    @property
    def withdrawable_balance(self):
        """Balance available for withdrawal (excludes the locked bonus)."""
        return self.balance or 0.0

    @property
    def locked_bonus(self):
        """Amount of registration bonus still locked (0 once unlocked)."""
        if self.registration_bonus_unlocked:
            return 0
        return self.registration_bonus or 0.0

    def unlock_registration_bonus(self):
        """Move the registration bonus into the main balance, once.

        Returns:
            True if the bonus was unlocked by this call, False if it was
            already unlocked or there is no positive bonus to unlock.
        """
        bonus = self.registration_bonus or 0.0
        if not self.registration_bonus_unlocked and bonus > 0:
            self.balance = (self.balance or 0.0) + bonus
            self.registration_bonus_unlocked = True
            return True
        return False

    def get_referrer(self):
        """Return the user whose referral code this user registered with, if any."""
        if self.referred_by_code:
            return User.query.filter_by(referral_code=self.referred_by_code).first()
        return None

    def get_referrals(self):
        """Return all users who registered with this user's referral code."""
        return User.query.filter_by(referred_by_code=self.referral_code).all()

    def get_referral_count(self):
        """Return how many users registered with this user's referral code."""
        return User.query.filter_by(referred_by_code=self.referral_code).count()
class Metal(db.Model):
    """A purchasable plan with a price, a daily gain and a cycle length in days."""

    __tablename__ = "metals"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    description = db.Column(db.Text)
    price = db.Column(db.Float, nullable=False)
    daily_gain = db.Column(db.Float, nullable=False)
    cycle_days = db.Column(db.Integer, nullable=False)
    image_url = db.Column(db.String(255))
    is_active = db.Column(db.Boolean, default=True)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

    # Purchases of this metal; removed together with the metal itself.
    user_metals = db.relationship(
        "UserMetal", backref="metal", lazy=True, cascade="all, delete-orphan"
    )

    @property
    def total_return(self):
        """Gain accumulated over one full cycle (daily gain times cycle days)."""
        gain_per_day = self.daily_gain
        days_in_cycle = self.cycle_days
        return gain_per_day * days_in_cycle
class UserMetal(db.Model):
    """Association row recording that a user purchased a given metal."""

    __tablename__ = "user_metals"

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey("users.id"),
        nullable=False,
    )
    metal_id = db.Column(
        db.Integer,
        db.ForeignKey("metals.id"),
        nullable=False,
    )
    purchase_date = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
    )
    expiry_date = db.Column(db.DateTime, nullable=False)
    # NOTE(review): date.today uses the server's local date while the other
    # timestamps in this module are UTC - confirm this is intended.
    last_gain_date = db.Column(db.Date, default=date.today)
    is_active = db.Column(db.Boolean, default=True)
    purchase_price = db.Column(db.Float, nullable=False)
class Transaction(db.Model):
    """A money movement on a user account, with extra fields for withdrawals."""

    __tablename__ = "transactions"

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
    type = db.Column(db.String(20), nullable=False)
    amount = db.Column(db.Float, nullable=False)
    description = db.Column(db.String(255))
    status = db.Column(db.String(20), default="pending")
    # Withdrawal specific fields
    gross_amount = db.Column(db.Float, nullable=True)  # Amount before fees
    fee_amount = db.Column(db.Float, nullable=True)  # Fee amount
    net_amount = db.Column(
        db.Float, nullable=True
    )  # Amount after fees (what user receives)
    # 24h delay processing
    scheduled_process_time = db.Column(
        db.DateTime, nullable=True
    )  # When the withdrawal can be processed
    admin_action = db.Column(
        db.String(20), nullable=True
    )  # 'approved', 'rejected', or None (auto-approved after 24h)
    admin_action_time = db.Column(db.DateTime, nullable=True)  # When admin took action
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
    processed_at = db.Column(db.DateTime, nullable=True)

    @staticmethod
    def get_withdrawal_fee_percentage():
        """Return the withdrawal fee as a fraction (config value, default 0.15)."""
        try:
            return current_app.config.get("WITHDRAWAL_FEE_PERCENTAGE", 0.15)
        except RuntimeError:
            # Outside application context, use default
            return 0.15

    @staticmethod
    def get_withdrawal_delay_hours():
        """Return the withdrawal delay in hours (config value, default 24)."""
        try:
            return current_app.config.get("WITHDRAWAL_DELAY_HOURS", 24)
        except RuntimeError:
            # Outside application context, use default
            return 24

    @staticmethod
    def calculate_withdrawal_fee(amount):
        """Split a withdrawal amount into gross, fee and net parts.

        Returns:
            A dict with keys ``gross``, ``fee``, ``net`` and ``fee_percentage``
            (the latter expressed in percent, e.g. 15.0).
        """
        fee_percentage = Transaction.get_withdrawal_fee_percentage()
        fee = amount * fee_percentage
        return {
            "gross": amount,
            "fee": fee,
            "net": amount - fee,
            "fee_percentage": fee_percentage * 100,
        }

    @staticmethod
    def calculate_scheduled_process_time(from_time=None):
        """Return when a withdrawal requested at ``from_time`` may be processed.

        The configured delay is added to ``from_time`` (current UTC time when
        omitted). If the result lands on a Saturday or Sunday it is pushed
        forward one day at a time, keeping the time of day, until it falls on
        a weekday.
        """
        if from_time is None:
            from_time = datetime.now(timezone.utc)
        delay_hours = Transaction.get_withdrawal_delay_hours()
        process_time = from_time + timedelta(hours=delay_hours)
        # weekday(): Saturday=5, Sunday=6
        while process_time.weekday() in (5, 6):
            process_time += timedelta(days=1)
        return process_time

    def can_be_auto_processed(self):
        """Check if this withdrawal can be auto-processed right now.

        True only for a pending withdrawal with no admin action whose
        scheduled time has passed, and only when the current UTC day is a
        weekday.
        """
        if self.type != "withdrawal" or self.status != "pending":
            return False
        if self.admin_action is not None:
            return False
        scheduled = self.scheduled_process_time
        if scheduled is None:
            return False
        # A plain DateTime column can hand back a naive value; comparing it
        # with an aware "now" would raise TypeError, so treat it as UTC.
        if scheduled.tzinfo is None:
            scheduled = scheduled.replace(tzinfo=timezone.utc)
        now = datetime.now(timezone.utc)
        # No automatic processing on Saturday (5) or Sunday (6).
        if now.weekday() in (5, 6):
            return False
        return now >= scheduled

    @staticmethod
    def _sum_settled_withdrawals(column):
        """Sum ``column`` over withdrawals with status approved or completed."""
        total = (
            db.session.query(db.func.sum(column))
            .filter(
                Transaction.type == "withdrawal",
                Transaction.status.in_(["approved", "completed"]),
                column.isnot(None),
            )
            .scalar()
        )
        # SUM over zero rows yields NULL.
        return total or 0

    @staticmethod
    def get_total_withdrawal_fees():
        """Get total fees collected from approved/completed withdrawals."""
        return Transaction._sum_settled_withdrawals(Transaction.fee_amount)

    @staticmethod
    def get_total_withdrawals_amount():
        """Get total gross amount of approved/completed withdrawals."""
        return Transaction._sum_settled_withdrawals(Transaction.gross_amount)
class ReferralCommission(db.Model):
    """Ledger entry for a commission paid to a referrer."""

    __tablename__ = "referral_commissions"

    id = db.Column(db.Integer, primary_key=True)
    referrer_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
    referred_user_id = db.Column(db.Integer, db.ForeignKey("users.id"), nullable=False)
    level = db.Column(db.Integer, nullable=False)
    commission_type = db.Column(
        db.String(20), nullable=False
    )  # 'purchase' or 'daily_gain'
    commission_percentage = db.Column(db.Float, nullable=False)
    commission_amount = db.Column(db.Float, nullable=False)
    purchase_amount = db.Column(db.Float, nullable=True)  # For purchase commissions
    gain_amount = db.Column(db.Float, nullable=True)  # For daily gain commissions
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

    # Two foreign keys point at users, so each relationship names its own.
    referrer = db.relationship(
        "User", foreign_keys=[referrer_id], backref="commissions_earned"
    )
    referred_user = db.relationship(
        "User", foreign_keys=[referred_user_id], backref="commissions_generated"
    )

    @staticmethod
    def get_purchase_commission_rate():
        """Get purchase commission rate from config (default 15%)."""
        try:
            return current_app.config.get("REFERRAL_PURCHASE_COMMISSION", 0.15)
        except RuntimeError:
            # Outside application context, use default
            return 0.15

    @staticmethod
    def get_daily_gain_commission_rate():
        """Get daily gain commission rate from config (default 3%)."""
        try:
            return current_app.config.get("REFERRAL_DAILY_GAIN_COMMISSION", 0.03)
        except RuntimeError:
            # Outside application context, use default
            return 0.03

    @staticmethod
    def calculate_purchase_commission(purchase_amount):
        """Calculate commission on plan purchase using config rate."""
        return purchase_amount * ReferralCommission.get_purchase_commission_rate()

    @staticmethod
    def calculate_daily_gain_commission(gain_amount):
        """Calculate commission on daily gains using config rate."""
        return gain_amount * ReferralCommission.get_daily_gain_commission_rate()

    @staticmethod
    def _credit_referrer(referrer, commission_amount):
        """Add a commission to the referrer's balance and referral earnings."""
        # Both columns are nullable, so treat a missing value as zero.
        referrer.balance = (referrer.balance or 0) + commission_amount
        referrer.referral_earnings = (
            referrer.referral_earnings or 0
        ) + commission_amount

    @staticmethod
    def create_purchase_commission(referrer, referred_user, purchase_amount):
        """Create a level-1 commission entry for a plan purchase.

        Credits the referrer and adds the entry to the session; the caller
        is responsible for committing.

        Returns:
            The new, uncommitted ``ReferralCommission``.
        """
        rate = ReferralCommission.get_purchase_commission_rate()
        commission_amount = purchase_amount * rate
        commission = ReferralCommission(
            referrer_id=referrer.id,
            referred_user_id=referred_user.id,
            level=1,
            commission_type="purchase",
            commission_percentage=rate * 100,
            commission_amount=commission_amount,
            purchase_amount=purchase_amount,
        )
        ReferralCommission._credit_referrer(referrer, commission_amount)
        db.session.add(commission)
        return commission

    @staticmethod
    def create_daily_gain_commission(referrer, referred_user, gain_amount):
        """Create a level-1 commission entry for daily gains.

        Credits the referrer and adds the entry to the session; the caller
        is responsible for committing.

        Returns:
            The new, uncommitted ``ReferralCommission``.
        """
        rate = ReferralCommission.get_daily_gain_commission_rate()
        commission_amount = gain_amount * rate
        commission = ReferralCommission(
            referrer_id=referrer.id,
            referred_user_id=referred_user.id,
            level=1,
            commission_type="daily_gain",
            commission_percentage=rate * 100,
            commission_amount=commission_amount,
            gain_amount=gain_amount,
        )
        ReferralCommission._credit_referrer(referrer, commission_amount)
        db.session.add(commission)
        return commission
class Notification(db.Model):
    """A message addressed to a single user, with a read flag."""

    __tablename__ = "notifications"

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey("users.id"),
        nullable=False,
    )
    title = db.Column(db.String(100), nullable=False)
    message = db.Column(db.Text, nullable=False)
    type = db.Column(db.String(20), default="info")
    is_read = db.Column(db.Boolean, default=False)
    # Creation time in UTC, filled in when the row is inserted.
    created_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
    )
class ScheduledTask(db.Model):
    """Model for database-based scheduled tasks - replaces cron jobs."""

    __tablename__ = "scheduled_tasks"

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), unique=True, nullable=False)
    description = db.Column(db.String(255))
    # Configuration
    schedule_type = db.Column(db.String(20), default="daily")  # daily, hourly, interval
    schedule_time = db.Column(db.Time, nullable=True)  # Exact time (e.g. 00:05:00)
    interval_minutes = db.Column(db.Integer, nullable=True)  # Or an interval
    # State
    is_active = db.Column(db.Boolean, default=True)
    last_run_at = db.Column(db.DateTime, nullable=True)
    next_run_at = db.Column(db.DateTime, nullable=True)
    last_status = db.Column(
        db.String(20), default="pending"
    )  # pending, running, success, failed
    last_error = db.Column(db.Text, nullable=True)
    run_count = db.Column(db.Integer, default=0)
    fail_count = db.Column(db.Integer, default=0)
    # Metadata
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    def __repr__(self):
        return f"<ScheduledTask {self.name}>"

    def should_run(self):
        """Return True if the task is due to run now.

        An inactive task never runs. A task with no ``next_run_at`` gets one
        computed and is reported as due immediately.
        """
        if not self.is_active:
            return False
        if not self.next_run_at:
            self.calculate_next_run()
            return True
        next_run = self.next_run_at
        # Values read back from the database may be naive; treat them as UTC
        # so they can be compared with the aware "now".
        if next_run.tzinfo is None:
            next_run = next_run.replace(tzinfo=timezone.utc)
        return datetime.now(timezone.utc) >= next_run

    def calculate_next_run(self):
        """Compute ``next_run_at`` from the schedule configuration.

        When no branch applies (for example a daily task without
        ``schedule_time``), ``next_run_at`` is left unchanged.
        """
        now = datetime.now(timezone.utc)
        if self.schedule_type == "daily" and self.schedule_time:
            # Next occurrence of the configured time. The date is taken from
            # the UTC clock so it matches the UTC tzinfo attached below.
            next_run = datetime.combine(now.date(), self.schedule_time)
            next_run = next_run.replace(tzinfo=timezone.utc)
            if next_run <= now:
                next_run += timedelta(days=1)
            self.next_run_at = next_run
        elif self.schedule_type == "hourly":
            # Top of the next hour.
            self.next_run_at = now.replace(
                minute=0, second=0, microsecond=0
            ) + timedelta(hours=1)
        elif self.schedule_type == "interval" and self.interval_minutes:
            # Custom interval, counted from the last run when there is one.
            if self.last_run_at:
                self.next_run_at = self.last_run_at + timedelta(
                    minutes=self.interval_minutes
                )
            else:
                self.next_run_at = now

    def mark_running(self):
        """Mark the task as running and commit."""
        self.last_status = "running"
        db.session.commit()

    def mark_success(self):
        """Record a successful run, schedule the next one and commit."""
        self.last_run_at = datetime.now(timezone.utc)
        # The column default is only applied at INSERT, so guard against None.
        self.run_count = (self.run_count or 0) + 1
        self.last_status = "success"
        self.last_error = None
        self.calculate_next_run()
        db.session.commit()

    def mark_failed(self, error_message):
        """Record a failed run, schedule the next one and commit."""
        self.last_run_at = datetime.now(timezone.utc)
        self.fail_count = (self.fail_count or 0) + 1
        self.last_status = "failed"
        self.last_error = str(error_message)[:500]  # Cap the stored size
        self.calculate_next_run()
        db.session.commit()
class TaskExecutionLog(db.Model):
    """Execution log entry for a scheduled task run."""

    __tablename__ = "task_execution_logs"

    id = db.Column(db.Integer, primary_key=True)
    task_id = db.Column(
        db.Integer,
        db.ForeignKey("scheduled_tasks.id"),
        nullable=False,
    )
    task_name = db.Column(db.String(100), nullable=False)
    started_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
    )
    completed_at = db.Column(db.DateTime, nullable=True)
    status = db.Column(db.String(20), default="running")  # running, success, failed
    error_message = db.Column(db.Text, nullable=True)
    output_log = db.Column(db.Text, nullable=True)
    # For debugging
    server_hostname = db.Column(db.String(100), nullable=True)
    process_id = db.Column(db.Integer, nullable=True)

    # Exposes ScheduledTask.execution_logs as a dynamic (query) relationship.
    task = db.relationship(
        "ScheduledTask", backref=db.backref("execution_logs", lazy="dynamic")
    )

    def __repr__(self):
        label, state = self.task_name, self.status
        return f"<TaskExecutionLog {label} {state}>"