Spaces:
Sleeping
Sleeping
| import os | |
| import csv | |
| import uuid | |
| from datetime import datetime | |
| import streamlit as st | |
| from settings import BASE_DIR, NUMBER_OF_TECHNICAL_QUESTIONS, NUMBER_OF_COMMON_QUESTIONS | |
| from core.slack_notifier import SlackNotifier | |
| from core.questions_loader_local import QuestionLoaderLocal | |
| from presentation.layout import Layout | |
| # Slack Webhook | |
| SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"] | |
| layout = Layout() | |
| def call(candidate_id): | |
| # Check if questions are already loaded | |
| if 'questions' not in st.session_state: | |
| # Define questions | |
| questions = QuestionLoaderLocal( | |
| os.path.join(BASE_DIR, "data/questions", st.session_state['technology'].lower(), "questions.csv"), | |
| NUMBER_OF_TECHNICAL_QUESTIONS | |
| ).fetch_questions() | |
| if not questions: | |
| st.markdown("Work in progress!!") | |
| return | |
| common_questions = QuestionLoaderLocal( | |
| os.path.join(BASE_DIR, "data/questions", "common", "questions.csv"), | |
| NUMBER_OF_COMMON_QUESTIONS | |
| ).fetch_questions() | |
| questions.extend(common_questions) | |
| # Store questions in session state to persist across interactions | |
| st.session_state['questions'] = questions | |
| # Retrieve the questions from session state | |
| questions = st.session_state['questions'] | |
| score = 0 | |
| total_questions = len(questions) | |
| answered_all = True | |
| for idx, question in enumerate(questions): | |
| # Section for each question with styling | |
| selected_option = layout.render_test_question(question, idx) | |
| if not selected_option: | |
| answered_all = False | |
| # Checking for correct answer and assigning points based on difficulty | |
| if selected_option == question['answer']: | |
| score += 1 | |
| if st.button("Submit Test", use_container_width=True, type="primary"): | |
| if answered_all: | |
| st.session_state['test_started'] = False | |
| layout.render_completion_message(score, total_questions) | |
| result = (score / total_questions) * 100 | |
| update_candidate_status(candidate_id, result) | |
| SlackNotifier(SLACK_WEBHOOK_URL).send_candidate_info( | |
| st.session_state['name'], | |
| st.session_state['email'], | |
| st.session_state['experience'], | |
| st.session_state['technology'], | |
| f"{result:.2f}%" | |
| ) | |
| else: | |
| # Show a message asking the user to answer all questions | |
| st.warning("Please answer all questions before submitting.") | |
| def generate_url(): | |
| id = uuid.uuid4().hex # Generate a UUID and convert it to a hex string | |
| return f'http://localhost:8501/?candidate_id={id}', id | |
| def add_candidate_id_to_csv(candidate_id): | |
| with open(os.path.join(BASE_DIR, "data", "candidates.csv"), 'a') as file: | |
| csv.writer(file).writerow([candidate_id,"","","","","","","In Progress"]) | |
| def read_candidate_csv(): | |
| with open(os.path.join(BASE_DIR, "data", "candidates.csv"), 'r') as file: | |
| reader = csv.DictReader(file) | |
| return list(reader) | |
| def is_candidate_test_completed(candidate_id, candidates): | |
| for row in candidates: | |
| if row["id"].lower() == candidate_id.lower() and row["status"] == "Test Completed": | |
| return True | |
| return False | |
| def is_valid_candidate_id(candidate_id, candidates): | |
| for row in candidates: | |
| if row["id"].lower() == candidate_id: | |
| return True | |
| return False | |
| def update_candidate_status(candidate_id, result): | |
| with open(os.path.join(BASE_DIR, "data", "candidates.csv"), 'r') as file: | |
| reader = csv.reader(file) | |
| header = next(reader) # Save the header | |
| rows = [] | |
| for row in reader: | |
| if row[0] == candidate_id: | |
| row[1] = st.session_state['name'] | |
| row[2] = st.session_state['email'] | |
| row[3] = st.session_state['experience'] | |
| row[4] = st.session_state['technology'] | |
| row[5] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| row[6] = result | |
| row[7] = "Test Completed" | |
| rows.append(row) | |
| with open(os.path.join(BASE_DIR, "data", "candidates.csv"), 'w') as file: | |
| writer = csv.writer(file) | |
| writer.writerow(header) # Write the header | |
| writer.writerows(rows) | |
| def main(): | |
| query_params = st.query_params | |
| if not query_params: | |
| st.markdown("Invalid URL") | |
| return | |
| candidate_id = query_params.get("candidate_id", None) | |
| internal = query_params.get("internal", None) | |
| # Set page config with custom title and layout | |
| st.set_page_config(page_title="Candidate MCQ Platform", layout="wide") | |
| layout.render_header() | |
| if internal: | |
| if st.button('Generate URL'): | |
| url, candidate_id = generate_url() | |
| st.write(f"Generated URL: {url}") | |
| add_candidate_id_to_csv(candidate_id) | |
| elif candidate_id: | |
| candidates = read_candidate_csv() | |
| if not is_valid_candidate_id(candidate_id, candidates): | |
| st.markdown("Invalid URL") | |
| return | |
| print("Candidate ID:", candidate_id) | |
| if is_candidate_test_completed(candidate_id, candidates): | |
| st.markdown("Test has been completed! Great job on completing it. Thank you for your effort and dedication.") | |
| return | |
| if 'test_started' not in st.session_state: | |
| st.session_state['test_started'] = False | |
| if not st.session_state['test_started']: | |
| st.title("Welcome to the Candidate Assessment Platform") | |
| name, email, experience, technology, submit = layout.render_signup_form() | |
| if name and email: | |
| st.session_state['name'] = name | |
| st.session_state['email'] = email | |
| st.session_state['experience'] = experience | |
| st.session_state['technology'] = technology | |
| st.session_state['test_wip'] = True | |
| if submit: | |
| st.session_state['test_started'] = True | |
| st.rerun() | |
| layout.render_instructions() | |
| else: | |
| call(candidate_id) | |
| else: | |
| st.markdown("Invalid URL") | |
| if __name__ == "__main__": | |
| main() | |