misakovhearst
Initial deploy
48c7fed
import uuid
from datetime import datetime
from typing import Optional, List
from .models import AnalysisResult, Session
import json
class DatabaseManager:
    """Manager for database operations on ``AnalysisResult`` rows.

    Every public method obtains its own session via ``Session.get_session()``
    and closes it before returning. Exceptions raised while talking to the
    database are printed and reported through the return value (``None``,
    ``[]``, ``{}`` or ``False``) rather than propagated to the caller.
    """

    @staticmethod
    def _rollback_quietly(session) -> None:
        """Roll back ``session`` after a failed write without raising.

        ``session`` may be ``None`` when the failure happened before a session
        could be obtained; in that case there is nothing to undo. A failing
        rollback is only printed so that it cannot replace the failure value
        the calling method is about to return.
        """
        if session is None:
            return
        try:
            session.rollback()
        except Exception as e:
            print(f"Error rolling back session: {e}")

    @staticmethod
    def save_analysis_result(
        filename: str,
        file_format: str,
        file_size: int,
        text_length: int,
        word_count: int,
        overall_ai_score: float,
        overall_confidence: str,
        detector_results: dict,
        text_preview: str = "",
        user_id: Optional[str] = None,
        analysis_status: str = "completed",
        error_message: Optional[str] = None,
        report_html_path: Optional[str] = None
    ) -> Optional[AnalysisResult]:
        """
        Save an analysis result to the database.

        A new UUID4 string is generated for ``file_id`` and the current UTC
        time (naive ``datetime``) is stored as ``upload_timestamp``. All other
        arguments are passed to the ``AnalysisResult`` constructor under the
        same name.

        Args:
            text_preview: Only the first 500 characters are stored; ``None``
                is treated like an empty string.

        Returns:
            AnalysisResult object or None if save failed
        """
        # Bind the name before the try block so that the cleanup code below
        # still works when obtaining the session itself fails.
        session = None
        try:
            session = Session.get_session()
            result = AnalysisResult(
                # Generate unique file ID
                file_id=str(uuid.uuid4()),
                filename=filename,
                file_format=file_format,
                file_size=file_size,
                text_length=text_length,
                word_count=word_count,
                overall_ai_score=overall_ai_score,
                overall_confidence=overall_confidence,
                detector_results=detector_results,
                text_preview=(text_preview or "")[:500],
                user_id=user_id,
                analysis_status=analysis_status,
                error_message=error_message,
                report_html_path=report_html_path,
                upload_timestamp=datetime.utcnow()
            )
            session.add(result)
            session.commit()
            # Refresh to get ID
            session.refresh(result)
            return result
        except Exception as e:
            print(f"Error saving analysis result: {e}")
            DatabaseManager._rollback_quietly(session)
            return None
        finally:
            if session is not None:
                session.close()

    @staticmethod
    def get_result_by_id(result_id: int) -> Optional[AnalysisResult]:
        """Get analysis result by ID.

        Returns:
            The first row whose ``id`` equals ``result_id``, or None when no
            row matches or the lookup failed.
        """
        session = None
        try:
            session = Session.get_session()
            return session.query(AnalysisResult).filter(
                AnalysisResult.id == result_id
            ).first()
        except Exception as e:
            print(f"Error retrieving result: {e}")
            return None
        finally:
            if session is not None:
                session.close()

    @staticmethod
    def get_result_by_file_id(file_id: str) -> Optional[AnalysisResult]:
        """Get analysis result by file ID.

        Returns:
            The first row whose ``file_id`` equals ``file_id``, or None when
            no row matches or the lookup failed.
        """
        session = None
        try:
            session = Session.get_session()
            return session.query(AnalysisResult).filter(
                AnalysisResult.file_id == file_id
            ).first()
        except Exception as e:
            print(f"Error retrieving result: {e}")
            return None
        finally:
            if session is not None:
                session.close()

    @staticmethod
    def get_all_results(
        limit: int = 100,
        offset: int = 0,
        order_by: str = "upload_timestamp_desc"
    ) -> List[AnalysisResult]:
        """
        Get all analysis results with pagination.

        Args:
            limit: Number of results to return
            offset: Number of results to skip
            order_by: Sort order (upload_timestamp_desc, upload_timestamp_asc,
                score_desc, score_asc). Any other value applies no explicit
                ordering.

        Returns:
            List of AnalysisResult objects; an empty list if the query failed
        """
        session = None
        try:
            session = Session.get_session()
            query = session.query(AnalysisResult)
            # Apply ordering
            if order_by == "upload_timestamp_desc":
                query = query.order_by(AnalysisResult.upload_timestamp.desc())
            elif order_by == "upload_timestamp_asc":
                query = query.order_by(AnalysisResult.upload_timestamp.asc())
            elif order_by == "score_desc":
                query = query.order_by(AnalysisResult.overall_ai_score.desc())
            elif order_by == "score_asc":
                query = query.order_by(AnalysisResult.overall_ai_score.asc())
            return query.limit(limit).offset(offset).all()
        except Exception as e:
            print(f"Error retrieving results: {e}")
            return []
        finally:
            if session is not None:
                session.close()

    @staticmethod
    def get_results_summary() -> dict:
        """Get summary statistics of all results.

        Returns:
            A dict with the keys ``total_analyses``, ``average_ai_score``
            (rounded to 3 decimals), ``total_text_analyzed`` (sum of
            ``text_length``) and the bucket counts ``likely_human``
            (score < 0.3), ``suspicious`` (0.3 <= score < 0.7) and
            ``likely_ai`` (score >= 0.7). All values are 0 when the table is
            empty. An empty dict is returned if the queries failed.
        """
        session = None
        try:
            session = Session.get_session()
            total = session.query(AnalysisResult).count()
            if total == 0:
                return {
                    "total_analyses": 0,
                    "average_ai_score": 0,
                    "total_text_analyzed": 0,
                    "likely_human": 0,
                    "suspicious": 0,
                    "likely_ai": 0,
                }
            from sqlalchemy import func
            # Calculate averages; the aggregates yield None when there is
            # nothing to aggregate, hence the "or 0" fallbacks.
            avg_score = session.query(func.avg(AnalysisResult.overall_ai_score)).scalar() or 0
            total_text = session.query(func.sum(AnalysisResult.text_length)).scalar() or 0
            # Count rows per score bucket
            likely_human = session.query(AnalysisResult).filter(
                AnalysisResult.overall_ai_score < 0.3
            ).count()
            suspicious = session.query(AnalysisResult).filter(
                (AnalysisResult.overall_ai_score >= 0.3) &
                (AnalysisResult.overall_ai_score < 0.7)
            ).count()
            likely_ai = session.query(AnalysisResult).filter(
                AnalysisResult.overall_ai_score >= 0.7
            ).count()
            return {
                "total_analyses": total,
                "average_ai_score": round(avg_score, 3),
                "total_text_analyzed": total_text,
                "likely_human": likely_human,
                "suspicious": suspicious,
                "likely_ai": likely_ai,
            }
        except Exception as e:
            print(f"Error getting summary: {e}")
            return {}
        finally:
            if session is not None:
                session.close()

    @staticmethod
    def delete_result(result_id: int) -> bool:
        """Delete an analysis result.

        Returns:
            True if a row with ``id == result_id`` was found and the delete
            was committed; False if no such row exists or the delete failed.
        """
        session = None
        try:
            session = Session.get_session()
            result = session.query(AnalysisResult).filter(
                AnalysisResult.id == result_id
            ).first()
            if not result:
                return False
            session.delete(result)
            session.commit()
            return True
        except Exception as e:
            print(f"Error deleting result: {e}")
            DatabaseManager._rollback_quietly(session)
            return False
        finally:
            if session is not None:
                session.close()

    @staticmethod
    def update_result(result_id: int, **kwargs) -> Optional[AnalysisResult]:
        """Update an analysis result.

        Args:
            result_id: ``id`` of the row to update.
            **kwargs: Attribute names and their new values. Names for which
                ``hasattr(result, name)`` is false are ignored.

        Returns:
            The refreshed AnalysisResult, or None if no row matched or the
            update failed.
        """
        session = None
        try:
            session = Session.get_session()
            result = session.query(AnalysisResult).filter(
                AnalysisResult.id == result_id
            ).first()
            if not result:
                return None
            for key, value in kwargs.items():
                if hasattr(result, key):
                    setattr(result, key, value)
            session.commit()
            session.refresh(result)
            return result
        except Exception as e:
            print(f"Error updating result: {e}")
            DatabaseManager._rollback_quietly(session)
            return None
        finally:
            if session is not None:
                session.close()