from flask import Flask, request, jsonify
from flask_cors import CORS
from dotenv import load_dotenv
load_dotenv()
import logging
from Models import Model
from Parser import Parser
from DB import DB
from Embedder import Embed
from Scheduler import Schedule
from Evaluator import InterviewEvaluator
import json
import os
import pandas as pd
import shutil
import tempfile
from werkzeug.utils import secure_filename
app = Flask(__name__)
CORS(app)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
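# Model identifiers for the 4B and 27B variants are read from the environment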
model_4b = os.getenv("model_4b")
model_27b = os.getenv("model_27b")
db = DB()
embedder = Embed()
embedder.create_db(db.string, db.collection)
logging.info("Embedder Loaded Successfully")
evaluator = InterviewEvaluator(model_4b)
logging.info("Evaluator Loaded Successfully")
PROMPT = os.getenv("Prompt")
JD_Prompt = os.getenv("JD_Prompt")
Summary = os.getenv("Summary_Prompt")
parser = Parser()
@app.route("/",methods = ["GET","POST"])
def default_route():
return jsonify({
"output" : "Backend Running Successfully"
})
# Resume upload configuration
UPLOAD_FOLDER = './resume'
ALLOWED_EXTENSIONS = {'pdf'}
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def cleanup_old_csv_files():
"""Helper function to clean up old test.csv files"""
csv_locations = ['/tmp/test.csv', './test.csv']
temp_dir = tempfile.gettempdir()
temp_csv_path = os.path.join(temp_dir, 'test.csv')
if temp_csv_path not in csv_locations:
csv_locations.append(temp_csv_path)
deleted_files = []
for csv_path in csv_locations:
try:
if os.path.exists(csv_path):
os.remove(csv_path)
deleted_files.append(csv_path)
logging.info(f"Deleted old CSV file: {csv_path}")
except Exception as e:
logging.warning(f"Could not delete {csv_path}: {e}")
return deleted_files
@app.route("/cleanup-csv", methods=["POST"])
def cleanup_csv():
"""Endpoint to manually clean up old CSV files"""
try:
deleted_files = cleanup_old_csv_files()
return jsonify({
"output": "CSV cleanup completed",
"deleted_files": deleted_files
})
except Exception as e:
logging.error(f"CSV cleanup error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/check-csv", methods=["GET"])
def check_csv():
"""Check what CSV files currently exist"""
try:
csv_locations = ['/tmp/test.csv', './test.csv']
temp_dir = tempfile.gettempdir()
temp_csv_path = os.path.join(temp_dir, 'test.csv')
if temp_csv_path not in csv_locations:
csv_locations.append(temp_csv_path)
existing_files = {}
for csv_path in csv_locations:
if os.path.exists(csv_path):
try:
# Get file info
stat = os.stat(csv_path)
df = pd.read_csv(csv_path)
existing_files[csv_path] = {
"exists": True,
"size": stat.st_size,
"modified": stat.st_mtime,
"candidates": len(df),
"columns": list(df.columns)
}
except Exception as e:
existing_files[csv_path] = {
"exists": True,
"error": str(e)
}
else:
existing_files[csv_path] = {"exists": False}
return jsonify({
"csv_files": existing_files,
"temp_dir": tempfile.gettempdir()
})
except Exception as e:
logging.error(f"Check CSV error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/test-file-save", methods=["POST"])
def test_file_save():
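    """Diagnostic: try writing a sample CSV to each candidate location and report the results."""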
try:
# Test saving a simple file to different locations
test_content = "Name,Email\nJohn Doe,john@example.com\nJane Smith,jane@example.com"
results = {}
# Test /tmp
try:
with open('/tmp/test.csv', 'w') as f:
f.write(test_content)
results['/tmp/test.csv'] = 'SUCCESS'
logging.info("Successfully wrote to /tmp/test.csv")
except Exception as e:
results['/tmp/test.csv'] = f'FAILED: {str(e)}'
logging.error(f"Failed to write to /tmp/test.csv: {e}")
# Test current directory
try:
with open('./test.csv', 'w') as f:
f.write(test_content)
results['./test.csv'] = 'SUCCESS'
logging.info("Successfully wrote to ./test.csv")
except Exception as e:
results['./test.csv'] = f'FAILED: {str(e)}'
logging.error(f"Failed to write to ./test.csv: {e}")
# Test system temp
        temp_dir = tempfile.gettempdir()
        temp_path = os.path.join(temp_dir, 'test.csv')
        try:
with open(temp_path, 'w') as f:
f.write(test_content)
results[temp_path] = 'SUCCESS'
logging.info(f"Successfully wrote to {temp_path}")
except Exception as e:
results[temp_path] = f'FAILED: {str(e)}'
logging.error(f"Failed to write to {temp_path}: {e}")
return jsonify({
"test_results": results,
"temp_dir": tempfile.gettempdir(),
"current_dir": os.getcwd()
})
except Exception as e:
logging.error(f"Test file save error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/debug-permissions", methods=["GET"])
def debug_permissions():
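    """Diagnostic: report which directories the process can write to."""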
try:
        # Test write access to various directories
test_results = {}
# Test current directory
try:
with open('./test_write.txt', 'w') as f:
f.write('test')
os.remove('./test_write.txt')
test_results['current_dir'] = 'writable'
except Exception as e:
test_results['current_dir'] = f'not writable: {str(e)}'
# Test /tmp directory
try:
with open('/tmp/test_write.txt', 'w') as f:
f.write('test')
os.remove('/tmp/test_write.txt')
test_results['tmp_dir'] = 'writable'
except Exception as e:
test_results['tmp_dir'] = f'not writable: {str(e)}'
# Test temp directory
try:
temp_dir = tempfile.gettempdir()
test_path = os.path.join(temp_dir, 'test_write.txt')
with open(test_path, 'w') as f:
f.write('test')
os.remove(test_path)
test_results['temp_dir'] = f'writable: {temp_dir}'
except Exception as e:
test_results['temp_dir'] = f'not writable: {str(e)}'
return jsonify({
"current_working_dir": os.getcwd(),
"write_permissions": test_results
})
except Exception as e:
return jsonify({"error": str(e)}), 500
@app.route("/test-upload", methods=["POST"])
def test_upload():
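    """Diagnostic: echo back the fields of a multipart upload request."""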
try:
logging.info(f"Test upload - Content type: {request.content_type}")
logging.info(f"Test upload - Files: {list(request.files.keys())}")
logging.info(f"Test upload - Form data: {list(request.form.keys())}")
return jsonify({
"content_type": request.content_type,
"files": list(request.files.keys()),
"form": list(request.form.keys())
})
except Exception as e:
logging.error(f"Test upload error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/upload-candidate-list", methods=["POST"])
def upload_candidate_list():
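    """Validate and store the candidate CSV (requires 'Name' and 'Email' columns)."""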
try:
logging.info(f"Received request with content type: {request.content_type}")
logging.info(f"Request files: {list(request.files.keys())}")
if 'candidate_list' not in request.files:
return jsonify({"error": "No CSV file provided"}), 400
file = request.files['candidate_list']
if file.filename == '':
return jsonify({"error": "No file selected"}), 400
        if file and file.filename.lower().endswith('.csv'):
            # Remove any stale test.csv files before saving the new upload
            cleanup_old_csv_files()
# Use /tmp directory which should be writable
filepath = '/tmp/test.csv'
try:
file.save(filepath)
logging.info(f"Successfully saved new CSV file to: {filepath}")
except Exception as save_error:
logging.error(f"Failed to save to /tmp: {save_error}")
# Try alternative temp directory
temp_dir = tempfile.gettempdir()
filepath = os.path.join(temp_dir, 'test.csv')
try:
file.save(filepath)
logging.info(f"Successfully saved CSV file to alternative temp: {filepath}")
except Exception as alt_error:
logging.error(f"Failed to save to alternative temp: {alt_error}")
return jsonify({"error": f"Cannot save file to any writable location: {alt_error}"}), 500
# Validate CSV format
try:
df = pd.read_csv(filepath)
required_columns = ['Name', 'Email']
if not all(col in df.columns for col in required_columns):
return jsonify({
"error": "CSV must have 'Name' and 'Email' columns",
"found_columns": list(df.columns)
}), 400
# Try to copy to the current directory for backward compatibility (optional)
try:
shutil.copy(filepath, './test.csv')
logging.info("Also copied to ./test.csv")
except Exception as copy_error:
logging.warning(f"Could not copy to ./test.csv (this is OK): {copy_error}")
logging.info(f"Successfully validated CSV with {len(df)} candidates")
return jsonify({
"output": f"Successfully uploaded candidate list with {len(df)} candidates",
"candidates": len(df),
"filepath": filepath,
"replaced_existing": True
})
except Exception as e:
logging.error(f"CSV validation error: {str(e)}")
return jsonify({"error": f"Invalid CSV format: {str(e)}"}), 400
else:
return jsonify({"error": "File must be a CSV"}), 400
except Exception as e:
logging.error(f"CSV upload error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/upload-resumes", methods=["POST"])
def upload_resumes():
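    """Save uploaded PDF resumes into the configured upload folder."""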
try:
if 'resumes' not in request.files:
return jsonify({"error": "No files provided"}), 400
files = request.files.getlist('resumes')
if not files:
return jsonify({"error": "No files selected"}), 400
uploaded_files = []
for file in files:
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
uploaded_files.append(filename)
logging.info(f"Uploaded file: {filename}")
return jsonify({
"output": f"Successfully uploaded {len(uploaded_files)} resume(s)",
"files": uploaded_files
})
except Exception as e:
logging.error(f"Upload error: {str(e)}")
return jsonify({"error": str(e)}), 500
@app.route("/parse-batch",methods = ["POST"])
def parse_batch():
"""Process resumes in smaller batches to avoid timeout"""
try:
data = request.get_json()
batch_size = data.get("batch_size", 2) # Process 2 resumes at a time
db.connect_PS()
logging.info("Database Connected Successfully")
# Use the faster 4B model
structure_model = Model(PROMPT, model_4b)
logging.info("Model Loaded Successfully")
files = os.listdir("./resume")
pdf_files = [f for f in files if f.endswith(".pdf")]
if not pdf_files:
db.close_PS()
return jsonify({"error": "No PDF files found in resume folder"}), 400
total_files = len(pdf_files)
processed_count = 0
# Process in batches
for i in range(0, total_files, batch_size):
batch_files = pdf_files[i:i+batch_size]
batch_documents = []
logging.info(f"Processing batch {i//batch_size + 1}: files {i+1}-{min(i+batch_size, total_files)}")
for file_path in batch_files:
try:
path = r"./resume/" + file_path
result = parser.parse(path)
content = result["content"][0]
structured = structure_model.send(content)
doc = embedder.create_document(processed_count, json.dumps(structured))
batch_documents.append(doc)
processed_count += 1
logging.info(f"Successfully processed: {file_path}")
except Exception as e:
logging.error(f"Error processing {file_path}: {str(e)}")
continue
# Add batch to database
if batch_documents:
embedder.add_docs(batch_documents)
logging.info(f"Added batch of {len(batch_documents)} documents to database")
db.close_PS()
return jsonify({
"output": f"Successfully processed {processed_count} out of {total_files} resumes",
"processed": processed_count,
"total_found": total_files
})
except Exception as e:
logging.error(f"Batch parse error: {str(e)}")
        try:
            db.close_PS()
        except Exception:
            pass
return jsonify({
"error": "Batch processing failed",
"message": str(e)
}), 500
@app.route("/parse",methods = ["POST"])
def parse():
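    """Parse every PDF in ./resume, structure it with the 4B model, and embed the results."""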
try:
db.connect_PS()
logging.info("Database Connected Successfully")
# Use the faster 4B model for parsing to reduce timeout
structure_model = Model(PROMPT, model_4b)
logging.info("Model Loaded Successfully")
logging.info("Parsing Resumes")
files = os.listdir("./resume")
contents = []
documents = []
processed_count = 0
for file_path in files:
if file_path.endswith(".pdf"):
try:
path = r"./resume/" + file_path
logging.info(f"Processing: {path}")
result = parser.parse(path)
content = result["content"][0]
contents.append(content)
except Exception as e:
logging.error(f"Error parsing {file_path}: {str(e)}")
continue
if contents:
logging.info(f"Found {len(contents)} resumes to process")
for i in range(len(contents)):
try:
logging.info(f"Structuring resume {i+1}/{len(contents)}")
structured = structure_model.send(contents[i])
doc = embedder.create_document(i, json.dumps(structured))
documents.append(doc)
processed_count += 1
logging.info(f"Successfully processed resume {i+1}")
except Exception as e:
logging.error(f"Error structuring resume {i+1}: {str(e)}")
continue
if documents:
embedder.add_docs(documents)
logging.info(f"Successfully embedded {len(documents)} resumes")
db.close_PS()
return jsonify({
"output": f"Successfully processed {processed_count} resumes",
"processed": processed_count,
"total_found": len(contents)
})
except Exception as e:
logging.error(f"Parse endpoint error: {str(e)}")
        try:
            db.close_PS()
        except Exception:
            pass
return jsonify({
"error": "Processing failed",
"message": str(e)
}), 500
@app.route("/match", methods = ["POST"])
def match():
db.connect_PS()
data = request.get_json()
JD = data.get("job_description")
k = data.get("candidates")
out = Model(JD_Prompt, model_4b).send(JD)
summarizer = Model(Summary,model_4b)
result = embedder.match(json.dumps(out),k)
output = []
for i in result:
content = {}
content["Name"] = i.metadata["name"]
content["Email"] = i.metadata["email"]
content["content"] = summarizer.send(json.dumps(i.page_content))
output.append(content)
db.close_PS()
return jsonify(output)
@app.route("/interview1", methods = ["POST"])
def interview():
# 0 for non_tech and 1 for tech questions
data = request.get_json()
q_id = data.get("id")
questions = evaluator.ask(q_id)
return jsonify(questions)
@app.route("/interview",methods=["POST"])
def edit():
data = request.get_json()
# 0 for non_tech and 1 for tech questions
q_id = data.get("id")
questions = data.get("questions")
evaluator.edit(questions,q_id)
return jsonify({"output" : "Questions Edited Successfully"})
@app.route("/evaluate",methods = ["POST"])
def score():
db.connect_MS()
data = request.get_json()
candid = data.get("user_output")
email = data.get("email")
password = data.get("password")
result = evaluator.score_candid(candid)
score = result["score"]
query = f"Update score = %s from users where Email_id = '%s' and pasword = '%s'"
db.cursor_MS.execute(query,(score,email,password))
db.connection_MS.commit()
db.close_MS()
return jsonify(result)
def create_user(scheduler):
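    """Create one user row per scheduled candidate; the password is the first five characters of the name plus the first five of the email."""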
try:
db.connect_MS()
df = scheduler.df
for name, email, slot in zip(df["Name"], df["Email"], df["Slot"]):
password = name[:5] + email[:5]
# Extract date and time from slot string
slot_parts = slot.split(" - ")
interview_datetime = slot_parts[0] # "2024-03-15 09:00"
date_part, time_part = interview_datetime.split(" ")
# Insert individual values, not as a list
db.insert([name, email, date_part, time_part, password, 0.0])
db.connection_MS.commit()
db.close_MS()
logging.info(f"Successfully created {len(df)} user records")
except Exception as e:
logging.error(f"Error creating users: {str(e)}")
        try:
            db.close_MS()
        except Exception:
            pass
raise e
@app.route("/schedule", methods = ["POST"])
def email():
try:
data = request.get_json()
date = data.get("date")
time = data.get("time")
slot_length = data.get("length")
# Validate input
if not date or not time or not slot_length:
return jsonify({"error": "Missing required fields: date, time, or length"}), 400
# Check for CSV file in multiple locations
csv_paths = ["/tmp/test.csv", "./test.csv"]
# Also check system temp directory
temp_dir = tempfile.gettempdir()
temp_csv_path = os.path.join(temp_dir, 'test.csv')
if temp_csv_path not in csv_paths:
csv_paths.insert(1, temp_csv_path)
csv_path = None
for path in csv_paths:
if os.path.exists(path):
csv_path = path
logging.info(f"Found CSV file at: {csv_path}")
break
if not csv_path:
return jsonify({"error": "Candidate list file not found. Please upload a CSV file first."}), 400
scheduler = Schedule()
scheduler.defaults(date, time, slot_length)
scheduler.schedule_slots(csv_path)
# Create user records in database
create_user(scheduler)
# Email sending disabled - just log what would have been sent
# scheduler.send_emails()
logging.info(f"Email sending disabled. Would have sent invitations to {len(scheduler.df)} candidates.")
return jsonify({
"output": "Interview slots scheduled successfully (email sending disabled)",
"scheduled_count": len(scheduler.df),
"note": "Candidates have been scheduled but no emails were sent"
})
except FileNotFoundError as e:
logging.error(f"File not found: {str(e)}")
return jsonify({"error": "CSV file not found", "message": str(e)}), 400
except Exception as e:
logging.error(f"Scheduling error: {str(e)}")
return jsonify({"error": "Failed to schedule interviews", "message": str(e)}), 500
@app.route("/login-user",methods=["POST"])
def login_u():
db.connect_MS()
data = request.get_json()
email = data.get("email")
password = data.get("pass")
query = f"SELECT * FROM user WHERE email_id = %s AND pasword = %s"
db.cursor_MS.execute(query,(email,password))
result = db.cursor_MS.fetchone()
db.close_MS()
if result:
return jsonify({"user": "True"})
else:
return jsonify({"user": "False"})
@app.route("/login-org",methods=["POST"])
def login_o():
db.connect_MS()
data = request.get_json()
email = data.get("email")
password = data.get("pass")
query = "SELECT * FROM org WHERE email_id = %s AND pasword = %s"
db.cursor_MS.execute(query,(email,password))
result = db.cursor_MS.fetchone()
db.close_MS()
if result:
return jsonify({"user": "True"})
else:
return jsonify({"user": "False"})
if __name__ == "__main__":
app.run(debug=True)