Spaces:
Sleeping
Sleeping
| import sqlite3 | |
| import json | |
| import pandas as pd | |
| from datetime import datetime | |
| class ResumeDatabase: | |
| def __init__(self, db_path='resumes.db'): | |
| self.db_path = db_path | |
| self.create_tables() | |
| def create_tables(self): | |
| conn = sqlite3.connect(self.db_path) | |
| c = conn.cursor() | |
| c.execute('''CREATE TABLE IF NOT EXISTS resumes | |
| (id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT, | |
| email TEXT, | |
| phone TEXT, | |
| raw_text TEXT, | |
| analysis_json TEXT, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') | |
| conn.commit() | |
| conn.close() | |
| def save_analysis(self, analysis_result, raw_text): | |
| conn = sqlite3.connect(self.db_path) | |
| c = conn.cursor() | |
| c.execute('''INSERT INTO resumes (name, email, phone, raw_text, analysis_json) | |
| VALUES (?, ?, ?, ?, ?)''', | |
| (analysis_result.get('name', 'Not found'), | |
| analysis_result.get('email', 'Not found'), | |
| analysis_result.get('phone', 'Not found'), | |
| raw_text, | |
| json.dumps(analysis_result))) | |
| conn.commit() | |
| conn.close() | |
| def calculate_score(self, analysis): | |
| """Calculate a comprehensive score based on resume analysis""" | |
| try: | |
| # Initialize scores | |
| education_score = 0 | |
| experience_score = 0 | |
| technical_score = 0 | |
| project_score = 0 | |
| impact_score = 0 | |
| role_specific_score = 0 | |
| # Education Score (max 20 points) | |
| edu_level = str(analysis.get('education_level', '')).lower() | |
| if edu_level: | |
| if 'phd' in edu_level or 'doctorate' in edu_level: | |
| education_score += 20 | |
| elif 'master' in edu_level or 'ms' in edu_level or 'mtech' in edu_level: | |
| education_score += 18 | |
| elif 'bachelor' in edu_level or 'bs' in edu_level or 'btech' in edu_level: | |
| education_score += 15 | |
| else: | |
| education_score += 10 | |
| # Add points for CGPA if available | |
| cgpa = analysis.get('cgpa', 'Not found') | |
| if isinstance(cgpa, (int, float)): | |
| if cgpa >= 3.5: # Assuming 4.0 scale | |
| education_score = min(20, education_score + 2) | |
| # Experience Score (max 20 points) | |
| years_exp = analysis.get('years_experience', 0) | |
| if isinstance(years_exp, (int, float)): | |
| experience_score = min(20, years_exp * 4) # 5 years for max score | |
| elif isinstance(years_exp, str) and years_exp.replace('.', '').isdigit(): | |
| experience_score = min(20, float(years_exp) * 4) | |
| # Technical Score (max 20 points) | |
| tech_skills = { | |
| 'programming_languages': analysis.get('programming_languages', []), | |
| 'technical_skills': analysis.get('technical_skills', []), | |
| 'ml_frameworks': analysis.get('ml_frameworks', []), | |
| 'databases': analysis.get('databases', []), | |
| 'cloud_platforms': analysis.get('cloud_platforms', []) | |
| } | |
| total_skills = sum(len(skills) for skills in tech_skills.values()) | |
| technical_score = min(20, total_skills * 2) | |
| # Project Score (max 15 points) | |
| projects = len(analysis.get('projects', [])) | |
| research_exp = 1 if analysis.get('research_experience') else 0 | |
| publications = len(analysis.get('publications', [])) | |
| project_score = min(15, projects * 2 + research_exp * 3 + publications * 2) | |
| # Impact Score (max 15 points) | |
| leadership = 1 if analysis.get('leadership_experience') else 0 | |
| team_size = analysis.get('team_size', 0) | |
| if isinstance(team_size, str): | |
| try: | |
| team_size = int(''.join(filter(str.isdigit, team_size))) | |
| except: | |
| team_size = 0 | |
| certifications = len(analysis.get('certifications', [])) | |
| awards = len(analysis.get('awards', [])) | |
| impact_score = min(15, leadership * 5 + min(5, team_size/2) + min(5, certifications * 2 + awards)) | |
| # Role Specific Score (max 10 points) | |
| ds_skills = len(analysis.get('ml_frameworks', [])) + len(analysis.get('deep_learning', [])) + \ | |
| len(analysis.get('nlp_skills', [])) + len(analysis.get('computer_vision', [])) | |
| de_skills = len(analysis.get('etl_tools', [])) + len(analysis.get('data_warehousing', [])) + \ | |
| len(analysis.get('orchestration_tools', [])) + len(analysis.get('streaming_tech', [])) | |
| role_specific_score = min(10, max(ds_skills, de_skills)) | |
| # Calculate total score | |
| total_score = education_score + experience_score + technical_score + \ | |
| project_score + impact_score + role_specific_score | |
| return { | |
| 'total_score': total_score, | |
| 'education_score': education_score, | |
| 'experience_score': experience_score, | |
| 'technical_score': technical_score, | |
| 'project_score': project_score, | |
| 'impact_score': impact_score, | |
| 'role_specific_score': role_specific_score | |
| } | |
| except Exception as e: | |
| print(f"Error calculating score: {str(e)}") | |
| return { | |
| 'total_score': 0, | |
| 'education_score': 0, | |
| 'experience_score': 0, | |
| 'technical_score': 0, | |
| 'project_score': 0, | |
| 'impact_score': 0, | |
| 'role_specific_score': 0 | |
| } | |
| def get_statistics(self): | |
| """Get statistics of analyzed resumes""" | |
| conn = sqlite3.connect(self.db_path) | |
| df = pd.read_sql_query("SELECT analysis_json FROM resumes", conn) | |
| conn.close() | |
| if df.empty: | |
| return { | |
| 'total_resumes': 0, | |
| 'avg_work_experience': 0, | |
| 'education_levels': {}, | |
| 'major_distribution': {}, | |
| 'top_programming_languages': {}, | |
| 'top_technical_skills': {}, | |
| 'top_ml_frameworks': {}, | |
| 'top_visualization_tools': {}, | |
| 'top_databases': {}, | |
| 'top_etl_tools': {}, | |
| 'top_streaming_tech': {}, | |
| 'top_cloud_platforms': {}, | |
| 'top_certifications': {}, | |
| 'university_distribution': {} | |
| } | |
| analyses = [json.loads(x) for x in df['analysis_json']] | |
| # Calculate statistics | |
| stats = { | |
| 'total_resumes': len(analyses), | |
| 'avg_work_experience': 0, | |
| 'education_levels': {}, | |
| 'major_distribution': {}, | |
| 'top_programming_languages': {}, | |
| 'top_technical_skills': {}, | |
| 'top_ml_frameworks': {}, | |
| 'top_visualization_tools': {}, | |
| 'top_databases': {}, | |
| 'top_etl_tools': {}, | |
| 'top_streaming_tech': {}, | |
| 'top_cloud_platforms': {}, | |
| 'top_certifications': {}, | |
| 'university_distribution': {} | |
| } | |
| # Calculate averages and distributions | |
| total_exp = 0 | |
| valid_exp = 0 | |
| for analysis in analyses: | |
| # Work experience | |
| exp = analysis.get('years_experience', 0) | |
| if isinstance(exp, (int, float)) or (isinstance(exp, str) and exp.replace('.', '').isdigit()): | |
| try: | |
| exp = float(exp) | |
| total_exp += exp | |
| valid_exp += 1 | |
| except: | |
| pass | |
| # Education level | |
| edu = analysis.get('education_level', 'Not specified') | |
| stats['education_levels'][edu] = stats['education_levels'].get(edu, 0) + 1 | |
| # Major | |
| major = analysis.get('major', 'Not specified') | |
| stats['major_distribution'][major] = stats['major_distribution'].get(major, 0) + 1 | |
| # University | |
| uni = analysis.get('university', 'Not specified') | |
| stats['university_distribution'][uni] = stats['university_distribution'].get(uni, 0) + 1 | |
| # Technical skills distributions | |
| for lang in analysis.get('programming_languages', []): | |
| stats['top_programming_languages'][lang] = stats['top_programming_languages'].get(lang, 0) + 1 | |
| for skill in analysis.get('technical_skills', []): | |
| stats['top_technical_skills'][skill] = stats['top_technical_skills'].get(skill, 0) + 1 | |
| for framework in analysis.get('ml_frameworks', []): | |
| stats['top_ml_frameworks'][framework] = stats['top_ml_frameworks'].get(framework, 0) + 1 | |
| for tool in analysis.get('visualization_tools', []): | |
| stats['top_visualization_tools'][tool] = stats['top_visualization_tools'].get(tool, 0) + 1 | |
| for db in analysis.get('databases', []): | |
| stats['top_databases'][db] = stats['top_databases'].get(db, 0) + 1 | |
| for tool in analysis.get('etl_tools', []): | |
| stats['top_etl_tools'][tool] = stats['top_etl_tools'].get(tool, 0) + 1 | |
| for tech in analysis.get('streaming_tech', []): | |
| stats['top_streaming_tech'][tech] = stats['top_streaming_tech'].get(tech, 0) + 1 | |
| for platform in analysis.get('cloud_platforms', []): | |
| stats['top_cloud_platforms'][platform] = stats['top_cloud_platforms'].get(platform, 0) + 1 | |
| for cert in analysis.get('certifications', []): | |
| stats['top_certifications'][cert] = stats['top_certifications'].get(cert, 0) + 1 | |
| # Calculate average work experience | |
| stats['avg_work_experience'] = total_exp / valid_exp if valid_exp > 0 else 0 | |
| # Sort and limit distributions | |
| for key in stats: | |
| if isinstance(stats[key], dict): | |
| stats[key] = dict(sorted(stats[key].items(), key=lambda x: x[1], reverse=True)[:10]) | |
| return stats | |
| def get_candidate_rankings(self, role_type='both', min_score=50): | |
| """Get ranked list of candidates based on their scores""" | |
| conn = sqlite3.connect(self.db_path) | |
| df = pd.read_sql_query("SELECT analysis_json FROM resumes", conn) | |
| conn.close() | |
| if df.empty: | |
| return [] | |
| rankings = [] | |
| for analysis_json in df['analysis_json']: | |
| analysis = json.loads(analysis_json) | |
| scores = self.calculate_score(analysis) | |
| if scores['total_score'] >= min_score: | |
| candidate = { | |
| 'name': analysis.get('name', 'Not found'), | |
| 'email': analysis.get('email', 'Not found'), | |
| 'years_experience': analysis.get('years_experience', 'Not found'), | |
| 'education_level': analysis.get('education_level', 'Not found'), | |
| 'key_skills': ( | |
| analysis.get('programming_languages', []) + | |
| analysis.get('technical_skills', []) | |
| )[:5], # Top 5 skills | |
| **scores | |
| } | |
| # Filter based on role type | |
| if role_type == 'data_science': | |
| ds_score = len(analysis.get('ml_frameworks', [])) + \ | |
| len(analysis.get('deep_learning', [])) + \ | |
| len(analysis.get('nlp_skills', [])) + \ | |
| len(analysis.get('computer_vision', [])) | |
| if ds_score > 0: | |
| rankings.append(candidate) | |
| elif role_type == 'data_engineering': | |
| de_score = len(analysis.get('etl_tools', [])) + \ | |
| len(analysis.get('data_warehousing', [])) + \ | |
| len(analysis.get('orchestration_tools', [])) + \ | |
| len(analysis.get('streaming_tech', [])) | |
| if de_score > 0: | |
| rankings.append(candidate) | |
| else: # both | |
| rankings.append(candidate) | |
| # Sort by total score | |
| rankings.sort(key=lambda x: x['total_score'], reverse=True) | |
| return rankings | |
| def export_to_csv(self): | |
| """Export analyses to CSV""" | |
| conn = sqlite3.connect(self.db_path) | |
| df = pd.read_sql_query("SELECT * FROM resumes", conn) | |
| conn.close() | |
| csv_path = f"resume_analyses_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
| df.to_csv(csv_path, index=False) | |
| return csv_path | |
| def export_to_json(self): | |
| """Export analyses to JSON""" | |
| conn = sqlite3.connect(self.db_path) | |
| df = pd.read_sql_query("SELECT * FROM resumes", conn) | |
| conn.close() | |
| json_path = f"resume_analyses_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" | |
| df.to_json(json_path, orient='records') | |
| return json_path |