Neha Singh
DB bug
9c1fd0e
"""
db.py
-----
Handles MongoDB connection and operations for HireScope AI.
"""
import logging
import os
from pymongo import MongoClient
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
from dotenv import load_dotenv
from bson.objectid import ObjectId
# Load environment variables (the error below points users at a .env file).
load_dotenv()

logger = logging.getLogger(__name__)

# ── MongoDB connection ──
# The connection string is mandatory: importing this module without it fails
# loudly instead of deferring the error to the first query.
MONGO_URI = os.environ.get("MONGO_URI", "")
if not MONGO_URI:
    logger.error("MONGO_URI environment variable is not set!")
    raise RuntimeError("MONGO_URI environment variable is required. Set it in your .env file.")

client = MongoClient(MONGO_URI)

# Application database
db = client["resume_screener"]

# Collection handles shared by the helpers in this module
users_collection = db["users"]
jobs_collection = db["jobs"]
candidates_collection = db["candidates"]
trash_candidates_collection = db["trash_candidates"]
# ── User Operations ──
def create_user(username, email, password, role="recruiter"):
    """Register a new user unless the email is already taken.

    The plain-text password is never stored; only the hash produced by
    ``generate_password_hash`` is written.

    Args:
        username: Display name saved on the user document.
        email: Address used as the uniqueness key for the lookup.
        password: Plain-text password to hash.
        role: Role string saved on the document (defaults to "recruiter").

    Returns:
        A ``(success, message)`` tuple: ``(False, "Email already exists")``
        when a user with this email is found, otherwise
        ``(True, "User created successfully")`` after the insert.
    """
    already_registered = users_collection.find_one({"email": email})
    if already_registered:
        return False, "Email already exists"

    users_collection.insert_one(
        {
            "username": username,
            "email": email,
            "password": generate_password_hash(password),
            "role": role,
            "created_at": datetime.utcnow(),
        }
    )
    return True, "User created successfully"
def authenticate_user(email, password):
    """Check an email/password pair against the users collection.

    Args:
        email: Email to look the user up by.
        password: Plain-text password to verify against the stored hash.

    Returns:
        The user document with ``_id`` converted to ``str`` and the
        ``password`` field removed, or ``None`` when no user matches the
        email or the password does not verify.
    """
    record = users_collection.find_one({"email": email})
    if not record:
        return None
    if not check_password_hash(record["password"], password):
        return None

    record["_id"] = str(record["_id"])
    # Never hand the password hash back to the caller.
    record.pop("password")
    return record
# ── Candidate Operations ──
def insert_candidate(candidate_data):
    """Store one candidate document and return its id as a string.

    ``candidate_data`` is expected to carry fields such as name, filename,
    text, skills and match_score. The dict is modified in place: a
    ``created_at`` timestamp is set on it before the insert.

    Returns:
        ``str`` form of the inserted document's id.
    """
    candidate_data["created_at"] = datetime.utcnow()
    return str(candidates_collection.insert_one(candidate_data).inserted_id)
def get_all_candidates(user_id=None):
    """Return candidates ordered by ``match_score``, highest first.

    Args:
        user_id: When truthy, only candidates whose ``uploaded_by`` equals
            this value are returned; otherwise every candidate is returned.

    Returns:
        A list of candidate documents with ``_id`` converted to ``str``.
    """
    query = {"uploaded_by": user_id} if user_id else {}
    cursor = candidates_collection.find(query).sort("match_score", -1)

    results = []
    for doc in cursor:
        doc["_id"] = str(doc["_id"])
        results.append(doc)
    return results
def get_candidate_by_id(candidate_id):
    """Fetch a single candidate by its id.

    Args:
        candidate_id: Value convertible to ``ObjectId`` (typically the
            24-character hex string produced by ``str(_id)``).

    Returns:
        The candidate document with ``_id`` converted to ``str``, or
        ``None`` when the id is malformed, no document matches, or the
        lookup itself fails.
    """
    # A malformed id is an expected "not found" case: answer it without a
    # database round-trip and without involving the error path below.
    if not ObjectId.is_valid(candidate_id):
        return None

    try:
        candidate = candidates_collection.find_one({"_id": ObjectId(candidate_id)})
    except Exception:
        # Keep the "return None on failure" contract callers rely on, but
        # leave a trace so a database outage is not mistaken for a missing
        # candidate.
        logger.exception("Failed to fetch candidate %s", candidate_id)
        return None

    if candidate:
        candidate["_id"] = str(candidate["_id"])
    return candidate
def _candidate_filter(candidate_id):
    """Build the ``_id`` query document for one candidate.

    ``candidate_id`` is handed to ``ObjectId``; any error that constructor
    raises for an unusable value propagates to the caller.
    """
    object_id = ObjectId(candidate_id)
    return {"_id": object_id}
def _write_audio_transcription(candidate_id, *, status, text="", language="",
                               error=None, audio_url=None):
    """Overwrite a candidate's ``audio_transcription`` sub-document.

    Single place that defines the sub-document's fields so the three public
    status helpers below cannot drift apart. The whole sub-document is
    replaced on every call and ``updated_at`` is stamped with the current
    UTC time.
    """
    candidates_collection.update_one(
        _candidate_filter(candidate_id),
        {
            "$set": {
                "audio_transcription": {
                    "status": status,
                    "text": text,
                    "language": language,
                    "error": error,
                    "audio_url": audio_url,
                    "updated_at": datetime.utcnow(),
                }
            }
        },
    )


def set_candidate_audio_processing(candidate_id, audio_url=None):
    """Mark a candidate's audio transcription as in progress.

    Writes status "processing" with empty text/language and no error.

    Args:
        candidate_id: Id of the candidate to update.
        audio_url: Optional location of the audio, stored as given.
    """
    _write_audio_transcription(
        candidate_id,
        status="processing",
        audio_url=audio_url,
    )


def update_candidate_audio(candidate_id, audio_text, language, audio_url=None):
    """Store a finished transcription on the candidate.

    Writes status "completed" together with the transcript and its language.

    Args:
        candidate_id: Id of the candidate to update.
        audio_text: Transcribed text.
        language: Language value to store alongside the text.
        audio_url: Optional location of the audio, stored as given.
    """
    _write_audio_transcription(
        candidate_id,
        status="completed",
        text=audio_text,
        language=language,
        audio_url=audio_url,
    )


def update_candidate_audio_error(candidate_id, error_msg, audio_url=None):
    """Record a failed transcription attempt on the candidate.

    Writes status "failed" with empty text/language and the error message.

    Args:
        candidate_id: Id of the candidate to update.
        error_msg: Description of the failure, stored in ``error``.
        audio_url: Optional location of the audio, stored as given.
    """
    _write_audio_transcription(
        candidate_id,
        status="failed",
        error=error_msg,
        audio_url=audio_url,
    )
def clear_all_candidates(user_id=None):
    """Move candidates to the trash collection, then delete them.

    Each matching candidate is copied into ``trash_candidates`` with a
    ``deleted_at`` timestamp and afterwards removed from ``candidates``.

    Args:
        user_id: When truthy, only candidates whose ``uploaded_by`` equals
            this value are cleared; otherwise every candidate is cleared.
    """
    query = {"uploaded_by": user_id} if user_id else {}

    candidates_to_trash = list(candidates_collection.find(query))
    if not candidates_to_trash:
        # Nothing was copied to trash, so nothing may be deleted.
        return

    # One timestamp for the whole batch: it was deleted in a single action.
    deleted_at = datetime.utcnow()
    for candidate in candidates_to_trash:
        candidate["deleted_at"] = deleted_at
    trash_candidates_collection.insert_many(candidates_to_trash)

    # Delete exactly the documents that were copied. Re-running the original
    # query here would also remove candidates inserted after the find()
    # above, losing them without a trash copy.
    # NOTE(review): if this delete fails after the insert succeeded, a retry
    # will presumably hit duplicate ``_id`` errors in the trash collection;
    # consider an upsert-based copy if that matters.
    trashed_ids = [candidate["_id"] for candidate in candidates_to_trash]
    candidates_collection.delete_many({"_id": {"$in": trashed_ids}})