| import os, shutil, logging |
| from flask import Flask, redirect, url_for, request, render_template, flash |
| from flask_sqlalchemy import SQLAlchemy |
| from flask_migrate import Migrate |
| from dotenv import load_dotenv |
|
|
| |
| load_dotenv() |
|
|
| |
| logging.basicConfig(level=logging.INFO, |
| format="[%(asctime)s] %(levelname)s in %(module)s: %(message)s") |
| logger = logging.getLogger() |
|
|
| |
| app = Flask(__name__) |
| app.logger.handlers = logger.handlers |
| app.logger.setLevel(logger.level) |
|
|
| |
| SRC = os.path.join(os.getcwd(), "instance", "jobs.db") |
| DST = "/tmp/jobs.db" |
| if os.path.exists(SRC) and not os.path.exists(DST): |
| shutil.copy(SRC, DST) |
| |
| elif not os.path.exists(DST): |
| open(DST, "a").close() |
|
|
| |
| app.config["SQLALCHEMY_DATABASE_URI"] = f"sqlite:///{DST}" |
| app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False |
|
|
| app.secret_key = os.getenv("SECRET_KEY", "change_me") |
|
|
| db = SQLAlchemy(app) |
| migrate = Migrate(app, db) |
|
|
|
|
|
|
| |
| class Job(db.Model): |
| id = db.Column(db.Integer, primary_key=True) |
| company = db.Column(db.String(100), nullable=False) |
| position = db.Column(db.String(200), nullable=False) |
| resume_used = db.Column(db.String(200)) |
| date_applied = db.Column(db.String(20)) |
| status = db.Column(db.String(50)) |
| interview_details = db.Column(db.Text) |
| comments = db.Column(db.Text) |
| link = db.Column(db.String(300)) |
| job_description = db.Column(db.Text) |
|
|
| def __repr__(self): |
| return f"<Job {self.company} - {self.position}>" |
|
|
| |
| def convert_excel_serial_dates(): |
| updated = False |
| for job in Job.query.all(): |
| da = job.date_applied |
| if da and da.isdigit(): |
| try: |
| serial = int(da) |
| dt = datetime(1899, 12, 30) + timedelta(days=serial) |
| job.date_applied = dt.strftime("%Y-%m-%d") |
| updated = True |
| except ValueError: |
| pass |
| if updated: |
| db.session.commit() |
| app.logger.info("Converted Excel serial dates to ISO strings") |
|
|
| def generate_interview_plan(job_description: str) -> str: |
| try: |
| client = OpenAI(api_key=OPENAI_API_KEY) |
| resp = client.chat.completions.create( |
| model="gpt-4o-mini", |
| messages=[ |
| {"role": "system", "content": "You are a helpful assistant."}, |
| {"role": "user", "content": |
| f"Create an interview plan for the following job description. " |
| f"Highlight key skills and requirements:\n\n{job_description}" |
| }, |
| ], |
| max_tokens=150 |
| ) |
| return resp.choices[0].message.content.strip() |
| except Exception: |
| app.logger.exception("Error generating interview plan") |
| return "Error generating interview plan. Please try again later." |
|
|
| |
| with app.app_context(): |
| db.create_all() |
| convert_excel_serial_dates() |
| app.logger.info("Database initialized") |
|
|
| |
|
|
| @app.route("/") |
| def index(): |
| return redirect(url_for("jobs")) |
|
|
| @app.route("/jobs", methods=["GET", "POST"]) |
| def jobs(): |
| if request.method == "POST": |
| try: |
| new_job = Job( |
| company=request.form["company"], |
| position=request.form["position"], |
| resume_used=request.form.get("resume_used"), |
| date_applied=request.form.get("date_applied"), |
| status=request.form.get("status"), |
| interview_details=request.form.get("interview_details"), |
| comments=request.form.get("comments"), |
| link=request.form.get("link"), |
| job_description=request.form.get("job_description") |
| ) |
| db.session.add(new_job) |
| db.session.commit() |
| return redirect(url_for("jobs")) |
| except SQLAlchemyError: |
| app.logger.exception("Error adding job") |
| flash("Error adding job. See logs for details.") |
| |
| search = request.args.get("search", "").strip() |
| status_filter = request.args.get("status_filter", "").strip() |
| sort_by = request.args.get("sort", "") |
| direction = request.args.get("direction", "asc") |
|
|
| query = Job.query |
| if search: |
| query = query.filter( |
| or_( |
| Job.company.ilike(f"%{search}%"), |
| Job.position.ilike(f"%{search}%"), |
| Job.status.ilike(f"%{search}%") |
| ) |
| ) |
| if status_filter in {"Applied", "Rejected", "Interviewing"}: |
| query = query.filter_by(status=status_filter) |
| if sort_by in {"position", "date_applied", "status"}: |
| col = getattr(Job, sort_by) |
| query = query.order_by(col.desc() if direction=="desc" else col.asc()) |
|
|
| jobs_list = query.all() |
| counts = { |
| "Applied": Job.query.filter_by(status="Applied").count(), |
| "Rejected": Job.query.filter_by(status="Rejected").count(), |
| "Interviewing": Job.query.filter_by(status="Interviewing").count() |
| } |
| return render_template( |
| "jobs.html", |
| jobs=jobs_list, |
| search=search, |
| status_filter=status_filter, |
| sort_by=sort_by, |
| direction=direction, |
| **counts |
| ) |
|
|
|
|
|
|
| @app.route("/edit_job/<int:job_id>", methods=["GET", "POST"]) |
| def edit_job(job_id): |
| job = Job.query.get_or_404(job_id) |
| if request.method == "POST": |
| try: |
| job.company = request.form["company"] |
| job.position = request.form["position"] |
| job.resume_used = request.form.get("resume_used") |
| job.date_applied = request.form.get("date_applied") |
| job.status = request.form.get("status") |
| job.interview_details = request.form.get("interview_details") |
| job.comments = request.form.get("comments") |
| job.link = request.form.get("link") |
| job.job_description = request.form.get("job_description") |
| db.session.commit() |
| return redirect(url_for("jobs")) |
| except SQLAlchemyError: |
| app.logger.exception("Error updating job") |
| flash("Error updating job. See logs for details.") |
| db.session.rollback() |
| return render_template("jobs.html", jobs=Job.query.all(), edit_job=job) |
|
|
| @app.route("/delete_job/<int:job_id>", methods=["POST"]) |
| def delete_job(job_id): |
| job = Job.query.get_or_404(job_id) |
| try: |
| db.session.delete(job) |
| db.session.commit() |
| except SQLAlchemyError: |
| app.logger.exception("Error deleting job") |
| flash("Error deleting job. See logs for details.") |
| db.session.rollback() |
| return redirect(url_for("jobs")) |
|
|
| @app.route("/api/chat", methods=["POST"]) |
| def api_chat(): |
| data = request.get_json() or {} |
| msg = data.get("message", "").strip() |
| if not msg: |
| return jsonify(response="Please type something!") |
| try: |
| client = OpenAI(api_key=OPENAI_API_KEY) |
| resp = client.chat.completions.create( |
| model="gpt-4o-mini", |
| messages=[ |
| {"role": "system", "content": "You are a helpful career coach."}, |
| {"role": "user", "content": msg} |
| ], |
| max_tokens=150 |
| ) |
| return jsonify(response=resp.choices[0].message.content.strip()) |
| except Exception: |
| app.logger.exception("Chat API error") |
| return jsonify(response="Error during chat. See logs.") |
|
|
| @app.route("/prepme/<int:job_id>") |
| def prepme(job_id): |
| job = Job.query.get_or_404(job_id) |
| resume_content = "" |
| resume_path = os.path.join("uploads", "resume.docx") |
| if os.path.exists(resume_path): |
| from docx import Document |
| doc = Document(resume_path) |
| resume_content = "\n".join(p.text for p in doc.paragraphs) |
| context = f"Job Description:\n{job.job_description}\n\nResume:\n{resume_content}" |
| try: |
| client = OpenAI(api_key=OPENAI_API_KEY) |
| resp = client.chat.completions.create( |
| model="gpt-4o-mini", |
| messages=[ |
| {"role": "system", "content": "You are a career coach."}, |
| {"role": "user", "content": context} |
| ], |
| max_tokens=150 |
| ) |
| initial_response = resp.choices[0].message.content.strip() |
| except Exception: |
| app.logger.exception("PrepMe error") |
| initial_response = "Error generating prep response." |
| return render_template( |
| "prepme.html", |
| job=job, |
| initial_context=context, |
| initial_response=initial_response |
| ) |
|
|
| @app.route("/download") |
| def download_jobs(): |
| search = request.args.get("search", "").strip() |
| if search: |
| jobs_list = Job.query.filter( |
| or_( |
| Job.company.ilike(f"%{search}%"), |
| Job.position.ilike(f"%{search}%"), |
| Job.status.ilike(f"%{search}%") |
| ) |
| ).all() |
| else: |
| jobs_list = Job.query.all() |
|
|
| data = [ |
| { |
| "Company": j.company, |
| "Position": j.position, |
| "Resume Used": j.resume_used, |
| "Date Applied": j.date_applied, |
| "Status": j.status, |
| "Interview Details": j.interview_details, |
| "Comments": j.comments, |
| "Link": j.link, |
| "Job Description": j.job_description |
| } |
| for j in jobs_list |
| ] |
| df = pd.DataFrame(data) |
| return Response( |
| df.to_csv(index=False), |
| mimetype="text/csv", |
| headers={"Content-Disposition": "attachment;filename=jobs.csv"} |
| ) |
|
|
| |
| if __name__ == "__main__": |
| port = int(os.environ.get("PORT", 5000)) |
| app.run(host="0.0.0.0", port=port, debug=False) |
|
|
|
|