# detector-ai / database.py
# irham111's picture
# Initial commit: D'Tector.AI
# 16e59e0
# database.py - COMPLETE VERSION
from sqlalchemy import create_engine, Column, Integer, String, Float, DateTime, Text, Boolean, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, relationship
from datetime import datetime
import os
# Shared declarative base: the ORM model classes in this module subclass it,
# and Database.__init__ uses Base.metadata to create their tables.
Base = declarative_base()
class User(Base):
    """ORM model for an account row in the ``users`` table.

    ``username`` and ``password_hash`` are required; ``email`` and ``phone``
    are optional but must be unique when present. Deleting a user also
    deletes the detection rows reachable through ``detections``.
    """

    __tablename__ = 'users'

    # --- identity / credentials ---
    id = Column(Integer, primary_key=True)
    email = Column(String(100), nullable=True, unique=True)
    phone = Column(String(20), nullable=True, unique=True)
    username = Column(String(50), nullable=False, unique=True)
    password_hash = Column(String(200), nullable=False)
    full_name = Column(String(100))

    # --- account verification ---
    # Database.verify_user flips is_verified once the stored code matches
    # and its expiry timestamp is still in the future.
    is_verified = Column(Boolean, default=False)
    verification_code = Column(String(10))
    verification_code_expires = Column(DateTime)

    # --- password reset ---
    reset_token = Column(String(100), nullable=True)
    reset_token_expires = Column(DateTime, nullable=True)

    # --- bookkeeping (naive UTC timestamps) ---
    created_at = Column(DateTime, default=datetime.utcnow)
    last_login = Column(DateTime)

    # One-to-many link to this user's DetectionHistory rows.
    detections = relationship(
        'DetectionHistory',
        back_populates='user',
        cascade='all, delete-orphan',
    )
class DetectionHistory(Base):
    """ORM model for one detection result in ``detection_history``.

    A row may belong to a user (``user_id``) or be anonymous (``user_id``
    is nullable).
    """

    __tablename__ = 'detection_history'

    id = Column(Integer, primary_key=True)
    # Owner of the row; the foreign key is declared with ON DELETE CASCADE.
    user_id = Column(
        Integer,
        ForeignKey('users.id', ondelete='CASCADE'),
        nullable=True,
    )

    # --- analysed input ---
    text_preview = Column(String(200))
    text_full = Column(Text, nullable=True)
    file_name = Column(String(200), nullable=True)
    file_type = Column(String(50), nullable=True)

    # --- detector output ---
    ai_probability = Column(Float)
    human_probability = Column(Float)
    confidence = Column(Float)
    # Written as 1 (AI) or 0 (human) by Database.add_detection.
    is_ai = Column(Integer)

    # --- presentation / bookkeeping ---
    is_pinned = Column(Boolean, default=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Back-reference to the owning User.
    user = relationship('User', back_populates='detections')
class Database:
    """Data-access layer for users and detection history.

    Each public method opens its own session from ``self.Session`` and
    closes it before returning, so ORM objects handed back to the caller are
    no longer attached to a session.
    """

    # Longest preview that fits DetectionHistory.text_preview (String(200)).
    _PREVIEW_LIMIT = 200
    # Suffix appended to a truncated preview; counted inside _PREVIEW_LIMIT.
    _PREVIEW_SUFFIX = "..."
    # Maximum number of characters kept in DetectionHistory.text_full.
    _FULL_TEXT_LIMIT = 5000

    def __init__(self):
        """Create the engine, make sure the tables exist, build the session factory.

        Uses the ``DATABASE_URL`` environment variable when it is set
        (production: Railway / Hugging Face Spaces), otherwise a local
        SQLite file for development.
        """
        database_url = self._normalize_database_url(os.environ.get('DATABASE_URL'))
        if database_url:
            # Use the configured database (PostgreSQL in production).
            self.engine = create_engine(database_url)
        else:
            # Use SQLite for development.
            self.engine = create_engine(
                'sqlite:///detections.db',
                connect_args={'check_same_thread': False},
            )
        Base.metadata.create_all(self.engine)
        self.Session = sessionmaker(bind=self.engine)

    # ============ INTERNAL HELPERS ============

    @staticmethod
    def _normalize_database_url(url):
        """Rewrite a legacy ``postgres://`` scheme to ``postgresql://``.

        SQLAlchemy 1.4+ no longer accepts the ``postgres`` dialect name that
        some hosting platforms still put in ``DATABASE_URL``. Any other
        value (including ``None`` or an empty string) is returned unchanged.
        """
        legacy_prefix = 'postgres://'
        if url and url.startswith(legacy_prefix):
            return 'postgresql://' + url[len(legacy_prefix):]
        return url

    @classmethod
    def _make_preview(cls, text):
        """Return *text* shortened so it never exceeds ``_PREVIEW_LIMIT`` chars.

        Short text is returned as-is; longer text is cut and suffixed with
        ``...`` such that the total length is exactly ``_PREVIEW_LIMIT``.
        """
        if len(text) <= cls._PREVIEW_LIMIT:
            return text
        keep = cls._PREVIEW_LIMIT - len(cls._PREVIEW_SUFFIX)
        return text[:keep] + cls._PREVIEW_SUFFIX

    @staticmethod
    def _mean(values):
        """Average the non-``None`` items of *values*; ``0`` when there are none."""
        present = [value for value in values if value is not None]
        return sum(present) / len(present) if present else 0

    def _fetch_user_where(self, criterion):
        """Return the first ``User`` matching *criterion*, or ``None``."""
        session = self.Session()
        try:
            return session.query(User).filter(criterion).first()
        finally:
            session.close()

    @staticmethod
    def _find_owned_detection(session, detection_id, user_id):
        """Return the detection with *detection_id* owned by *user_id*, or ``None``."""
        return session.query(DetectionHistory).filter(
            DetectionHistory.id == detection_id,
            DetectionHistory.user_id == user_id,
        ).first()

    def get_session(self):
        """Return a new session; the caller is responsible for closing it."""
        return self.Session()

    # ============ USER METHODS ============

    def get_user_by_id(self, user_id):
        """Return the ``User`` with primary key *user_id*, or ``None``."""
        return self._fetch_user_where(User.id == user_id)

    def get_user_by_email(self, email):
        """Return the ``User`` with this e-mail, or ``None`` (also for empty input)."""
        if not email:
            return None
        return self._fetch_user_where(User.email == email)

    def get_user_by_phone(self, phone):
        """Return the ``User`` with this phone number, or ``None`` (also for empty input)."""
        if not phone:
            return None
        return self._fetch_user_where(User.phone == phone)

    def get_user_by_username(self, username):
        """Return the ``User`` with this username, or ``None`` (also for empty input)."""
        if not username:
            return None
        return self._fetch_user_where(User.username == username)

    def create_user(self, username, password_hash, email=None, phone=None, full_name=None, is_verified=False):
        """Insert a new user and return it, or ``None`` if the insert failed.

        Empty ``email`` / ``phone`` values are stored as NULL so that they do
        not collide on the columns' unique constraints.
        """
        session = self.Session()
        try:
            user = User(
                username=username,
                password_hash=password_hash,
                email=email if email else None,
                phone=phone if phone else None,
                full_name=full_name,
                is_verified=is_verified,
            )
            session.add(user)
            session.commit()
            # Reload the row so its attributes are populated before the
            # session is closed.
            session.refresh(user)
            return user
        except Exception as e:
            session.rollback()
            print(f"Error creating user: {e}")
            return None
        finally:
            session.close()

    def update_user(self, user_id, **kwargs):
        """Update column values of a user.

        Only keyword names that are mapped columns of ``User`` are applied;
        anything else is ignored, so relationships and ORM internals cannot
        be overwritten through this method.

        Returns ``True`` when the user exists and the update was committed,
        ``False`` when the user is missing or the commit failed.
        """
        session = self.Session()
        try:
            user = session.query(User).filter(User.id == user_id).first()
            if user:
                column_names = set(User.__table__.columns.keys())
                for key, value in kwargs.items():
                    if key in column_names:
                        setattr(user, key, value)
                session.commit()
                return True
            return False
        except Exception as e:
            session.rollback()
            print(f"Error updating user: {e}")
            return False
        finally:
            session.close()

    # ============ VERIFICATION METHODS ============

    def set_verification_code(self, user_id, code, expires):
        """Store a verification code and its expiry; ``False`` if the user is missing."""
        session = self.Session()
        try:
            user = session.query(User).filter(User.id == user_id).first()
            if user:
                user.verification_code = code
                user.verification_code_expires = expires
                session.commit()
                return True
            return False
        finally:
            session.close()

    def verify_user(self, user_id, code):
        """Mark the user verified when *code* matches and has not expired.

        On success the stored code and expiry are cleared. Returns ``True``
        on success and ``False`` otherwise (unknown user, wrong code,
        missing or past expiry).
        """
        session = self.Session()
        try:
            user = session.query(User).filter(User.id == user_id).first()
            if user and user.verification_code == code:
                expires_at = user.verification_code_expires
                # Expiry is compared against naive UTC "now".
                if expires_at and expires_at > datetime.utcnow():
                    user.is_verified = True
                    user.verification_code = None
                    user.verification_code_expires = None
                    session.commit()
                    return True
            return False
        finally:
            session.close()

    # ============ DETECTION METHODS ============

    def add_detection(self, text, result, user_id=None, file_name=None, file_type=None):
        """Persist one detection result and return its id, or ``None`` on failure.

        *result* must provide the keys ``'ai_probability'``,
        ``'human_probability'``, ``'confidence'`` and ``'is_ai'``.
        The preview is capped at ``_PREVIEW_LIMIT`` characters (ellipsis
        included) so it always fits the ``text_preview`` column, and the full
        text is capped at ``_FULL_TEXT_LIMIT`` characters.
        """
        session = self.Session()
        try:
            history = DetectionHistory(
                user_id=user_id,
                text_preview=self._make_preview(text),
                text_full=text[:self._FULL_TEXT_LIMIT],
                file_name=file_name,
                file_type=file_type,
                ai_probability=result['ai_probability'],
                human_probability=result['human_probability'],
                confidence=result['confidence'],
                is_ai=1 if result['is_ai'] else 0,
            )
            session.add(history)
            session.commit()
            return history.id
        except Exception as e:
            print(f"Error saving detection: {e}")
            session.rollback()
            return None
        finally:
            session.close()

    def get_user_history(self, user_id, limit=50):
        """Return up to *limit* detections of a user, pinned first, then newest first."""
        session = self.Session()
        try:
            history = session.query(DetectionHistory).filter(
                DetectionHistory.user_id == user_id
            ).order_by(
                DetectionHistory.is_pinned.desc(),
                DetectionHistory.created_at.desc()
            ).limit(limit).all()
            return history
        finally:
            session.close()

    def get_all_history(self, limit=50):
        """Return up to *limit* detections across all users, newest first."""
        session = self.Session()
        try:
            history = session.query(DetectionHistory).order_by(
                DetectionHistory.created_at.desc()
            ).limit(limit).all()
            return history
        finally:
            session.close()

    def delete_detection(self, detection_id, user_id):
        """Delete a detection owned by *user_id*; ``False`` if no such row exists."""
        session = self.Session()
        try:
            detection = self._find_owned_detection(session, detection_id, user_id)
            if detection:
                session.delete(detection)
                session.commit()
                return True
            return False
        finally:
            session.close()

    def toggle_pin(self, detection_id, user_id):
        """Flip the pinned flag of a detection owned by *user_id*.

        Returns the new flag value, or ``None`` if no such row exists.
        """
        session = self.Session()
        try:
            detection = self._find_owned_detection(session, detection_id, user_id)
            if detection:
                # Capture the new state before commit so returning it does
                # not need another read of the row.
                new_state = not detection.is_pinned
                detection.is_pinned = new_state
                session.commit()
                return new_state
            return None
        finally:
            session.close()

    def get_user_stats(self, user_id):
        """Return summary counts and the mean confidence for a user's detections.

        The result is a dict with keys ``'total'``, ``'ai_count'``,
        ``'human_count'`` and ``'avg_confidence'`` (rounded to 2 decimals).
        Rows whose confidence is NULL are left out of the average.
        """
        session = self.Session()
        try:
            total = session.query(DetectionHistory).filter(DetectionHistory.user_id == user_id).count()
            ai_count = session.query(DetectionHistory).filter(
                DetectionHistory.user_id == user_id,
                DetectionHistory.is_ai == 1
            ).count()
            human_count = session.query(DetectionHistory).filter(
                DetectionHistory.user_id == user_id,
                DetectionHistory.is_ai == 0
            ).count()
            confidence_rows = session.query(DetectionHistory.confidence).filter(
                DetectionHistory.user_id == user_id
            ).all()
            avg_conf = self._mean(row[0] for row in confidence_rows)
            return {
                'total': total,
                'ai_count': ai_count,
                'human_count': human_count,
                'avg_confidence': round(avg_conf, 2)
            }
        finally:
            session.close()

    def clear_user_history(self, user_id):
        """Delete every detection of a user; ``True`` on success, ``False`` on error."""
        session = self.Session()
        try:
            session.query(DetectionHistory).filter(DetectionHistory.user_id == user_id).delete()
            session.commit()
            return True
        except Exception as e:
            session.rollback()
            print(f"Error clearing history: {e}")
            return False
        finally:
            session.close()