Spaces:
Paused
Paused
| import os | |
| import sys | |
# Hugging Face safe cache: redirect every cache location used by the
# Hugging Face libraries to directories under /tmp. These variables are set
# before any Hugging Face package is imported further down in this file.
os.environ["HF_HOME"] = "/tmp/huggingface"
os.environ["TRANSFORMERS_CACHE"] = "/tmp/huggingface/transformers"
os.environ["HUGGINGFACE_HUB_CACHE"] = "/tmp/huggingface/hub"

# Force Flask instance path to a writable temporary folder. The directory is
# created right here, *before* Flask is imported, so it already exists when
# it is later handed to the ``Flask(...)`` constructor as ``instance_path``.
safe_instance_path = "/tmp/flask_instance"
os.makedirs(safe_instance_path, exist_ok=True)
| from flask import Flask, render_template, redirect, url_for, flash, request, jsonify | |
| from flask_login import LoginManager, login_required, current_user | |
| from werkzeug.utils import secure_filename | |
| import sys | |
| from datetime import datetime | |
# Adjust sys.path for import flexibility: make the directory that contains
# this file importable (presumably so the ``backend`` package imports below
# resolve regardless of the working directory -- verify against the repo
# layout). The membership check avoids stacking duplicate entries when the
# module is imported more than once (e.g. by a reloader).
current_dir = os.path.dirname(os.path.abspath(__file__))
if current_dir not in sys.path:
    sys.path.append(current_dir)
| # Import and initialize DB | |
| from backend.models.database import db, Job, Application, init_db | |
| from backend.models.user import User | |
| from backend.routes.auth import auth_bp, handle_resume_upload | |
| from backend.routes.interview_api import interview_api | |
| # Import additional utilities | |
| import re | |
| import json | |
| # ----------------------------------------------------------------------------- | |
| # Chatbot setup | |
| # | |
# The chatbot feature uses a local vector database (Chroma) to search the
# ``chatbot/chatbot.txt`` knowledge base and then generates a reply with a
# locally loaded Hugging Face model. To avoid the expensive model and
# database initialisation on every request, we lazily load the embeddings,
# the collection and the model the first time a chat query is processed.
# Subsequent requests reuse the same global objects. See ``init_chatbot()``,
# ``init_hf_model()`` and ``get_chatbot_response()`` below for
# implementation details.
| # Paths for the chatbot knowledge base and persistent vector store. We | |
| # compute these relative to the current file so that the app can be deployed | |
| # anywhere without needing to change configuration. The ``chroma_db`` | |
| # directory will be created automatically by the Chroma client if it does not | |
| # exist. | |
import shutil

# Location of the knowledge-base text (next to this file) and of the Chroma
# vector store (under /tmp).
CHATBOT_TXT_PATH = os.path.join(current_dir, 'chatbot', 'chatbot.txt')
CHATBOT_DB_DIR = "/tmp/chroma_db"

# Remove any old unwritable Chroma DB path from previous versions. Errors
# (including "directory does not exist") are deliberately ignored.
shutil.rmtree("/app/chatbot/chroma_db", ignore_errors=True)
| # ----------------------------------------------------------------------------- | |
| # Hugging Face model configuration | |
| # | |
| # The original chatbot implementation sent queries to the Groq API via the | |
| # OpenAI client. To remove that dependency we now load a small conversational | |
| # model from Hugging Face. ``HF_MODEL_NAME`` defines which model to use. The | |
| # default value, ``facebook/blenderbot-400M-distill``, provides a good | |
| # balance between quality and resource consumption and is available on | |
| # Hugging Face without requiring authentication. Should you wish to swap to | |
| # another conversational model (e.g. ``microsoft/DialoGPT-medium``), update | |
| # this constant accordingly. The model and tokenizer are loaded lazily in | |
| # ``init_hf_model()`` to avoid impacting application startup time. | |
# Name of the Hugging Face checkpoint loaded by ``init_hf_model()``. This is
# the single place to change when swapping in another seq2seq model.
HF_MODEL_NAME = "facebook/blenderbot-400M-distill"

# Global Hugging Face model and tokenizer. These variables remain ``None``
# until ``init_hf_model()`` is called. They are reused across all chatbot
# requests to prevent repeatedly loading the large model into memory.
_hf_model = None
_hf_tokenizer = None


def init_hf_model() -> None:
    """Initialise the Hugging Face conversational model and tokenizer.

    Loads the checkpoint named by ``HF_MODEL_NAME`` once and stores the
    resulting model and tokenizer in the module-level ``_hf_model`` and
    ``_hf_tokenizer`` globals. Calling it again after a successful load is
    a no-op. The model is moved to CUDA when ``torch`` reports it as
    available, otherwise it stays on the CPU.

    Raises:
        Whatever ``transformers`` / ``torch`` raise when the import or the
        checkpoint download/load fails; nothing is cached in that case.
    """
    global _hf_model, _hf_tokenizer
    if _hf_model is not None and _hf_tokenizer is not None:
        return

    # Imported lazily so that application start-up does not pay for them.
    from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
    import torch

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Honour the module-level constant instead of a second hard-coded copy of
    # the model name, so changing HF_MODEL_NAME actually changes the model.
    tokenizer = AutoTokenizer.from_pretrained(HF_MODEL_NAME)
    model = AutoModelForSeq2SeqLM.from_pretrained(HF_MODEL_NAME).to(device)

    # Publish both objects only after both loads succeeded.
    _hf_model = model
    _hf_tokenizer = tokenizer
# Lazily initialised sentence embedder and Chroma collection shared by all
# chatbot requests; both stay ``None`` until ``init_chatbot()`` succeeds.
_chatbot_embedder = None
_chatbot_collection = None


def init_chatbot() -> None:
    """Initialise the Chroma vector DB with chatbot.txt content.

    On first call this loads the ``all-MiniLM-L6-v2`` sentence embedder,
    opens (or creates) the ``chatbot`` collection and, only when that
    collection is empty, splits ``CHATBOT_TXT_PATH`` into overlapping chunks,
    embeds them and stores them under ids ``doc_0``, ``doc_1``, ... The
    embedder and collection are then cached in module globals; later calls
    return immediately.

    Raises:
        OSError: if the knowledge-base file cannot be read while seeding.
        Any import or runtime error raised by the lazily imported libraries.
    """
    global _chatbot_embedder, _chatbot_collection
    if _chatbot_embedder is not None and _chatbot_collection is not None:
        return

    # Heavy third-party imports are deferred to the first chatbot request.
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from sentence_transformers import SentenceTransformer
    import chromadb
    from chromadb.config import Settings

    os.makedirs(CHATBOT_DB_DIR, exist_ok=True)

    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    # NOTE(review): depending on the installed chromadb version, a plain
    # ``Client`` with ``persist_directory`` may not persist to disk -- confirm
    # whether ``PersistentClient`` is required for the deployed version.
    client = chromadb.Client(
        Settings(persist_directory=CHATBOT_DB_DIR, anonymized_telemetry=False)
    )
    collection = client.get_or_create_collection("chatbot")

    # Seed only an empty collection: reading, splitting and embedding the
    # knowledge base is skipped entirely when documents are already stored.
    if collection.count() == 0:
        with open(CHATBOT_TXT_PATH, encoding="utf-8") as f:
            text = f.read()
        splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=100)
        stripped_chunks = (chunk.strip() for chunk in splitter.split_text(text))
        docs = [chunk for chunk in stripped_chunks if chunk]
        # An empty knowledge base yields no chunks; skip the add in that case
        # and leave the collection empty (queries then return no context).
        if docs:
            embeddings = embedder.encode(docs, show_progress_bar=False, batch_size=32)
            ids = [f"doc_{i}" for i in range(len(docs))]
            collection.add(documents=docs, embeddings=embeddings.tolist(), ids=ids)

    _chatbot_embedder = embedder
    _chatbot_collection = collection
def get_chatbot_response(query: str) -> str:
    """Generate a reply to the user's query using Chroma + Hugging Face model.

    The query is embedded, the three closest knowledge-base chunks are
    fetched from the Chroma collection, and a prompt made of a fixed system
    instruction, that context and the question is passed to the seq2seq
    model for beam-search generation.

    Args:
        query: The user's question.

    Returns:
        The generated reply with surrounding whitespace removed. A fixed
        hint is returned for empty/blank input, and a string starting with
        ``"Error generating response: "`` when generation itself fails.
    """
    # Safety: reject empty input *before* triggering the lazy initialisers,
    # so a blank message never forces the models to be loaded.
    if not query or not query.strip():
        return "Please type a question about the Codingo platform."

    init_chatbot()
    init_hf_model()

    embedder = _chatbot_embedder
    collection = _chatbot_collection
    model = _hf_model
    tokenizer = _hf_tokenizer
    device = model.device

    # Retrieve context from Chroma. The embedding is converted to a plain
    # list of floats before being handed to the collection.
    query_embedding = embedder.encode([query])[0]
    results = collection.query(
        query_embeddings=[query_embedding.tolist()], n_results=3
    )
    # Tolerate a missing, ``None`` or empty ``documents`` entry.
    document_batches = (results or {}).get("documents") or [[]]
    retrieved_docs = document_batches[0] or []
    context = "\n".join(retrieved_docs)

    # System instruction
    system_prompt = (
        "You are a helpful assistant for the Codingo website. "
        "Only answer questions relevant to the context provided. "
        "If unrelated, reply: 'I'm only trained to answer questions about the Codingo platform.'"
    )
    prompt = f"{system_prompt}\n\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"

    # Cap the input at 256 tokens, but never above the number of positions
    # the loaded model reports, since longer inputs index past the position
    # embeddings. NOTE(review): facebook/blenderbot-400M-distill is believed
    # to expose only 128 positions -- confirm against the model config.
    max_input_tokens = 256
    model_positions = getattr(model.config, "max_position_embeddings", None)
    if isinstance(model_positions, int) and model_positions > 0:
        max_input_tokens = min(max_input_tokens, model_positions)

    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        max_length=max_input_tokens,
        padding=True,
    ).to(device)

    try:
        output_ids = model.generate(
            **inputs,
            max_length=200,
            num_beams=3,
            do_sample=False,
            early_stopping=True,
        )
        reply = tokenizer.decode(output_ids[0], skip_special_tokens=True)
        # Drop an echoed prompt should the model ever return one.
        if reply.startswith(prompt):
            reply = reply[len(prompt):]
        return reply.strip()
    except Exception as e:
        # Best effort: report the failure as the chat reply instead of raising.
        return f"Error generating response: {str(e)}"
# Initialize Flask app. Static files and templates live under ``backend/``;
# the instance path points at the writable folder prepared at the top of the
# file rather than Flask's default location.
app = Flask(
    __name__,
    static_folder='backend/static',
    static_url_path='/static',
    template_folder='backend/templates',
    instance_path=safe_instance_path,  # writable '/tmp/flask_instance'
)

# The secret key signs the session cookie, so it should not live in source
# control. Read it from the ``SECRET_KEY`` environment variable; the legacy
# literal is kept only as a fallback so existing deployments keep working.
# NOTE(review): set SECRET_KEY in the deployment and drop the fallback.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'saadi')
| # ----------------------------------------------------------------------------- | |
| # Cookie configuration for Hugging Face Spaces | |
| # | |
| # When running this app inside an iframe (as is typical on Hugging Face Spaces), | |
| # browsers will drop cookies that have the default SameSite policy of ``Lax``. | |
| # This prevents the Flask session cookie from being stored and means that | |
| # ``login_user()`` will appear to have no effect – the user will be redirected | |
| # back to the home page but remain anonymous. By explicitly setting the | |
| # SameSite policy to ``None`` and enabling the ``Secure`` flag, we allow the | |
| # session and remember cookies to be sent even when the app is embedded in an | |
| # iframe. Without these settings the sign‑up and login flows work locally | |
| # but silently fail in Spaces, causing the "redirect to home page without | |
| # anything" behaviour reported by users. | |
app.config.update(
    # Session and remember-me cookies: SameSite=None plus Secure.
    SESSION_COOKIE_SAMESITE='None',
    SESSION_COOKIE_SECURE=True,
    REMEMBER_COOKIE_SAMESITE='None',
    REMEMBER_COOKIE_SECURE=True,
    # Configure the database connection.
    # Use /tmp directory for database in Hugging Face Spaces.
    # Note: Data will be lost when the space restarts.
    SQLALCHEMY_DATABASE_URI='sqlite:////tmp/codingo.db',
    SQLALCHEMY_TRACK_MODIFICATIONS=False,
)
from flask_wtf.csrf import CSRFProtect
# NOTE(review): CSRF protection is imported but intentionally left disabled.
# csrf = CSRFProtect(app)

# Create necessary directories in writable locations
os.makedirs('/tmp/temp', exist_ok=True)
os.makedirs('/tmp/static/audio', exist_ok=True)

# Initialize DB with app
init_db(app)

# Flask-Login setup: bind the manager to the app on construction and send
# unauthenticated users to the ``auth.login`` endpoint.
login_manager = LoginManager(app)
login_manager.login_view = 'auth.login'
@login_manager.user_loader
def load_user(user_id):
    """Return the ``User`` for a session's stored id, or ``None``.

    Registered with Flask-Login via ``user_loader`` so the login manager can
    rebuild ``current_user`` from the id kept in the session.

    Args:
        user_id: The identifier stored in the session (normally a string).

    Returns:
        The matching ``User`` row, or ``None`` when the id is not a valid
        integer or no such row exists.
    """
    try:
        numeric_id = int(user_id)
    except (TypeError, ValueError):
        # A malformed id means "no such user", not a server error.
        return None
    return db.session.get(User, numeric_id)
| # Register blueprints | |
| app.register_blueprint(auth_bp) | |
| app.register_blueprint(interview_api, url_prefix="/api") | |
# Routes
# NOTE(review): no @app.route(...) decorator is visible for this view in this
# listing -- confirm it is registered.
def index():
    """Render the landing page from ``index.html``."""
    landing_page = render_template('index.html')
    return landing_page
# NOTE(review): no @app.route(...) decorator is visible for this view in this
# listing -- confirm it is registered.
def jobs():
    """Render ``jobs.html`` with every job, most recently posted first."""
    newest_first = Job.date_posted.desc()
    all_jobs = Job.query.order_by(newest_first).all()
    return render_template('jobs.html', jobs=all_jobs)
# NOTE(review): no @app.route(...) decorator is visible for this view in this
# listing -- confirm it is registered.
def job_detail(job_id):
    """Render ``job_detail.html`` for one job; responds 404 if it is missing."""
    selected_job = Job.query.get_or_404(job_id)
    return render_template('job_detail.html', job=selected_job)
def _split_entries(raw_value):
    """Split free text on commas, semicolons or newlines into trimmed, non-empty items."""
    if not raw_value:
        return []
    pieces = (piece.strip() for piece in re.split(r'[\n,;]+', raw_value))
    return [piece for piece in pieces if piece]


# NOTE(review): no @app.route(...) decorator is visible for this view in this
# listing; when present it must sit above @login_required.
@login_required
def apply(job_id):
    """Show the application form for a job and store a submitted application.

    GET renders ``apply.html``. POST saves the uploaded resume through
    ``handle_resume_upload``, collects the manually entered skills,
    experience and education, stores an ``Application`` row for the current
    user and redirects to the job list. Login is required because the view
    reads ``current_user.id``, ``.username`` and ``.email``.

    Args:
        job_id: Primary key of the job being applied to (404 if unknown).
    """
    job = Job.query.get_or_404(job_id)

    if request.method != 'POST':
        return render_template('apply.html', job=job)

    # The ``name`` attribute of the file input in the HTML form is ``resume``.
    file = request.files.get('resume')

    # ``filepath`` is where the resume was saved so that recruiters can
    # download it later. The returned features are ignored: per the original
    # notes resume parsing is disabled, so the manual fields below form the
    # entire feature set.
    _features, error, filepath = handle_resume_upload(file)
    if error:
        flash("Resume upload failed. Please try again.", "danger")
        return render_template('apply.html', job=job)

    # Users can separate entries with commas, semicolons or newlines; each
    # field is normalised into a list of trimmed strings.
    manual_features = {
        "skills": _split_entries(request.form.get('skills', '')),
        "experience": _split_entries(request.form.get('experience', '')),
        "education": _split_entries(request.form.get('education', '')),
    }

    # ``extracted_features`` is stored as a JSON string.
    application = Application(
        job_id=job_id,
        user_id=current_user.id,
        name=current_user.username,
        email=current_user.email,
        resume_path=filepath,
        extracted_features=json.dumps(manual_features),
    )
    db.session.add(application)
    db.session.commit()

    flash('Your application has been submitted successfully!', 'success')
    return redirect(url_for('jobs'))
# NOTE(review): no @app.route(...) decorator is visible for this view in this
# listing; when present it must sit above @login_required.
@login_required
def my_applications():
    """Render ``my_applications.html`` with the current user's applications.

    Applications are ordered by ``date_applied``, newest first. Login is
    required because the query filters on ``current_user.id``.
    """
    applications = (
        Application.query
        .filter_by(user_id=current_user.id)
        .order_by(Application.date_applied.desc())
        .all()
    )
    return render_template('my_applications.html', applications=applications)
| # ----------------------------------------------------------------------------- | |
| # Chatbot API endpoint | |
| # | |
| # This route receives a JSON payload containing a ``message`` field from the | |
| # front‑end chat widget. It validates the input, invokes the chatbot | |
| # response function and returns a JSON response. Any errors are surfaced | |
| # as a 400 or 500 response with an ``error`` message field. | |
# NOTE(review): no @app.route(...) decorator is visible for this view in this
# listing -- confirm it is registered.
def chatbot_endpoint():
    """Answer a chat message posted as JSON.

    Expects a JSON object with a ``message`` field.

    Returns:
        ``{"response": <reply>}`` on success; ``{"error": "Empty message"}``
        with status 400 when the body is not a JSON object or the message is
        missing/blank; ``{"error": <exception text>}`` with status 500 when
        generating the reply raises.
    """
    payload = request.get_json(silent=True)
    # Anything that is not a JSON object (array, number, invalid JSON, ...)
    # is treated as an empty payload instead of failing on ``.get``.
    if not isinstance(payload, dict):
        payload = {}

    raw_message = payload.get('message')
    # A JSON ``null`` must not turn into the literal query "None".
    user_input = '' if raw_message is None else str(raw_message).strip()
    if not user_input:
        return jsonify({"error": "Empty message"}), 400

    try:
        reply = get_chatbot_response(user_input)
        return jsonify({"response": reply})
    except Exception as exc:
        # Log the exception to stderr for debugging in the console. In a
        # production setting you might want to log this to a proper logging
        # facility instead.
        print(f"Chatbot error: {exc}", file=sys.stderr)
        return jsonify({"error": str(exc)}), 500
# NOTE(review): no @app.route(...) decorator is visible for this view in this
# listing -- confirm it is registered.
def parse_resume():
    """Store an uploaded resume and return its (normally empty) parsed fields.

    Returns:
        ``({"error": ...}, 400)`` when the upload helper reports an error;
        otherwise ``(fields, 200)`` where ``fields`` always carries the keys
        ``name``, ``email``, ``mobile_number``, ``skills``, ``experience``,
        ``education`` and ``summary``, filled from the helper's features when
        it returned any and with empty defaults otherwise.
    """
    uploaded = request.files.get('resume')
    features, error, _saved_path = handle_resume_upload(uploaded)

    # If the upload failed, return an error. Resume contents are not
    # inspected here.
    if error:
        return {"error": "Error processing resume. Please try again."}, 400

    # With no extracted features every field falls back to its empty default,
    # which keeps the response shape stable for any front-end caller.
    extracted = features if features else {}
    field_defaults = (
        ("name", ""),
        ("email", ""),
        ("mobile_number", ""),
        ("skills", []),
        ("experience", []),
        ("education", []),
        ("summary", ""),
    )
    response = {key: extracted.get(key, default) for key, default in field_defaults}
    return response, 200
# NOTE(review): no @app.route(...) decorator is visible for this view in this
# listing; when present it must sit above @login_required.
@login_required
def interview_page(job_id):
    """Render the interview page for a job the current user has applied to.

    The user's stored application features (a JSON string) are decoded and
    passed to ``interview.html`` as ``cv``. When there is no application, no
    stored features, or the stored value cannot be decoded, the user is sent
    back to the job detail page with a warning. Login is required because
    the lookup filters on ``current_user.id``.

    Args:
        job_id: Primary key of the job (404 if unknown).
    """
    job = Job.query.get_or_404(job_id)
    application = Application.query.filter_by(
        user_id=current_user.id,
        job_id=job_id,
    ).first()

    cv_data = None
    if application and application.extracted_features:
        try:
            cv_data = json.loads(application.extracted_features)
        except (TypeError, ValueError):
            # Corrupt stored data is handled like a missing application
            # rather than surfacing as a server error.
            cv_data = None

    if cv_data is None:
        flash("Please apply for this job and upload your resume first.", "warning")
        return redirect(url_for('job_detail', job_id=job_id))

    return render_template("interview.html", job=job, cv=cv_data)
| # ----------------------------------------------------------------------------- | |
| # Recruiter job posting route | |
| # | |
| # Authenticated users with a recruiter or admin role can access this page to | |
| # create new job listings. Posted jobs are associated with the current | |
| # recruiter via the ``recruiter_id`` foreign key on the ``Job`` model. | |
# NOTE(review): no @app.route(...) decorator is visible for this view in this
# listing; when present it must sit above @login_required.
@login_required
def post_job():
    """Show the job-posting form and create a job from a valid submission.

    Only users whose ``role`` is ``recruiter`` or ``admin`` may use this
    view; others are redirected to the job list with a warning. Login is
    required because the view reads ``current_user.role`` and ``.id``. On
    POST all five fields are required; each missing field produces its own
    flash message and the form is shown again. A valid submission stores a
    ``Job`` owned by the current user and redirects to the job list.
    """
    # Only allow recruiters and admins to post jobs
    if current_user.role not in ('recruiter', 'admin'):
        flash('You do not have permission to post jobs.', 'warning')
        return redirect(url_for('jobs'))

    # GET request returns the form
    if request.method != 'POST':
        return render_template('post_job.html')

    # Extract fields from the form
    role_title = request.form.get('role', '').strip()
    description = request.form.get('description', '').strip()
    seniority = request.form.get('seniority', '').strip()
    skills_input = request.form.get('skills', '').strip()
    company = request.form.get('company', '').strip()

    # Validate required fields; messages are flashed in this fixed order.
    required_fields = (
        (role_title, 'Job title is required.'),
        (description, 'Job description is required.'),
        (seniority, 'Seniority level is required.'),
        (skills_input, 'Skills are required.'),
        (company, 'Company name is required.'),
    )
    errors = [message for value, message in required_fields if not value]
    if errors:
        for err in errors:
            flash(err, 'danger')
        return render_template('post_job.html')

    # Normalise the skills input into a JSON encoded list. Users can
    # separate entries with commas, semicolons or newlines.
    skills_list = [s.strip() for s in re.split(r'[\n,;]+', skills_input) if s.strip()]
    skills_json = json.dumps(skills_list)

    # Create and persist the new job
    new_job = Job(
        role=role_title,
        description=description,
        seniority=seniority,
        skills=skills_json,
        company=company,
        recruiter_id=current_user.id,
    )
    db.session.add(new_job)
    db.session.commit()

    flash('Job posted successfully!', 'success')
    return redirect(url_for('jobs'))
| # ----------------------------------------------------------------------------- | |
| # Recruiter dashboard route | |
| # | |
| # Displays a list of candidates who applied to jobs posted by the current | |
| # recruiter. Candidates are sorted by a simple skill match score computed | |
| # against the job requirements. A placeholder download button is provided | |
| # for future PDF report functionality. | |
def _compute_match_score(application):
    """Return ``(label, value)`` rating how well a candidate's skills cover the job's.

    The score is the share of the job's skills (compared case-insensitively)
    found among the candidate's skills: >= 0.75 -> ('Excellent', 4),
    >= 0.5 -> ('Good', 3), >= 0.25 -> ('Medium', 2), otherwise ('Poor', 1).
    ``('Medium', 2)`` is also returned when the job lists no skills or when
    the stored data is malformed.
    """
    try:
        # Extract candidate skills from stored JSON
        raw_features = application.extracted_features
        candidate_features = json.loads(raw_features) if raw_features else {}
        candidate_skills = candidate_features.get('skills', [])

        # Retrieve the job's required skills and parse from JSON
        job = application.job
        job_skills = json.loads(job.skills) if job and job.skills else []
        if not job_skills:
            return ('Medium', 2)  # Default when job specifies no skills

        # Compute case-insensitive intersection
        candidate_set = {skill.lower() for skill in candidate_skills}
        job_set = {skill.lower() for skill in job_skills}
        ratio = len(candidate_set & job_set) / len(job_set)
    except (AttributeError, TypeError, ValueError):
        # Malformed stored data (bad JSON, non-string skills, wrong shape)
        # falls back to a neutral score instead of breaking the dashboard.
        return ('Medium', 2)

    # Map ratio to qualitative score
    if ratio >= 0.75:
        return ('Excellent', 4)
    if ratio >= 0.5:
        return ('Good', 3)
    if ratio >= 0.25:
        return ('Medium', 2)
    return ('Poor', 1)


# NOTE(review): no @app.route(...) decorator is visible for this view in this
# listing; when present it must sit above @login_required.
@login_required
def dashboard():
    """Render the recruiter dashboard of candidates, best skill match first.

    Only users whose ``role`` is ``recruiter`` or ``admin`` may view it;
    others are redirected to the index page with a warning. Login is required
    because the view reads ``current_user.role`` and ``.id``. The template
    receives ``candidates``: a list of dicts with keys ``application``,
    ``score_label`` and ``score_value`` for every application to a job
    posted by the current user, sorted by ``score_value`` descending.
    """
    # Only recruiters and admins can view the dashboard
    if current_user.role not in ('recruiter', 'admin'):
        flash('You do not have permission to access the dashboard.', 'warning')
        return redirect(url_for('index'))

    # Fetch jobs posted by the current recruiter
    posted_jobs = Job.query.filter_by(recruiter_id=current_user.id).all()
    job_ids = [job.id for job in posted_jobs]

    candidates_with_scores = []
    if job_ids:
        # Fetch applications associated with these job IDs
        candidate_apps = Application.query.filter(Application.job_id.in_(job_ids)).all()

        # Build a list of candidate applications with computed scores
        for app_record in candidate_apps:
            score_label, score_value = _compute_match_score(app_record)
            candidates_with_scores.append({
                'application': app_record,
                'score_label': score_label,
                'score_value': score_value,
            })

        # Sort candidates from highest to lowest score
        candidates_with_scores.sort(key=lambda item: item['score_value'], reverse=True)

    return render_template('dashboard.html', candidates=candidates_with_scores)
if __name__ == '__main__':
    print("Starting Codingo application...")

    # Make sure all tables exist before serving requests.
    with app.app_context():
        db.create_all()

    # Use port from environment or default to 7860
    port = int(os.environ.get('PORT', 7860))

    # The server listens on every interface, so the interactive debugger
    # (which allows arbitrary code execution) must not be on by default.
    # Opt in explicitly with FLASK_DEBUG=1 for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on',
    )
    app.run(debug=debug_enabled, host='0.0.0.0', port=port)