# data_aggregator.py (Complete Version with Resume Parsing)
import json
import os
import re
import time
from datetime import datetime
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('data_aggregator')

# Import the scraper functions and classes
from github_scraper import get_github_profile
from codeforces_scraper import get_codeforces_profile
from leetcode_scraper import get_leetcode_profile
from ipu_scraper import StudentScraper

# Import our resume parser
from resume_parser import parse_resume

# --- Configuration ---

# Define the list of students with resume paths
STUDENTS_TO_FETCH = [
    {
        "enrollment_no": "35214811922",
        "leetcode_user": "akshitsharma7093",
        "github_user": "akshit7093",
        "codeforces_user": "akshit7093",
        "resume_path": "resume.pdf"  # REQUIRED FIELD
    },
    {
        "enrollment_no": "35314811922",
        "leetcode_user": "Nikita_06211",
        "github_user": "Nikita06211",
        "codeforces_user": "Nikita06211",
        "resume_path": "Nikita_Bansal.pdf"
    },
    {
        "enrollment_no": "05414811922",
        "leetcode_user": "Vineet_Goyal10",
        "github_user": "Vineetg2003",
        # NOTE(review): same Codeforces handle as the previous student -
        # looks like a copy-paste leftover; confirm the correct handle.
        "codeforces_user": "Nikita06211",
        "resume_path": "Vineet_Goyal_Resume (4).pdf"
    }
    # Add more student dictionaries here
]

OUTPUT_FILE = 'final_cleaned_student_data.json'


# --- Small shared helpers ---

def _to_number(value, default=0):
    """Return ``value`` as a number, or ``default`` when it is None or non-numeric.

    Ints and floats are passed through untouched so their type is preserved
    in the JSON output; anything else is given one chance via ``float()``.
    """
    if isinstance(value, (int, float)):
        return value
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def _parse_count(value, default=0):
    """Parse a scraped counter (e.g. ``12``, ``"1,234"``, ``"1.2k"``) into an int.

    Returns ``default`` when the value is missing, empty or unparsable.
    """
    if isinstance(value, int):
        return int(value)
    text = str(value if value is not None else "").strip().lower().replace(",", "")
    if not text:
        return default
    multiplier = 1
    if text.endswith("k"):
        multiplier, text = 1000, text[:-1]
    try:
        return int(float(text) * multiplier)
    except (ValueError, OverflowError):
        return default


def _format_date(timestamp):
    """Format a Unix timestamp as ``YYYY-MM-DD`` (local time), or None if invalid."""
    try:
        return datetime.fromtimestamp(int(timestamp)).strftime('%Y-%m-%d')
    except (TypeError, ValueError, OverflowError, OSError):
        return None


# --- Advanced Cleaning and Filtering Functions ---

def clean_ipu_data(raw_data):
    """Transforms raw IPU academic data into a final, clean format.

    Args:
        raw_data: Dict returned by the IPU scraper. It must carry
            ``status == "success"`` plus ``academic_summary`` and
            ``programme_info`` sections.

    Returns:
        A dict with institute/degree/branch, overall CGPA and percentage
        (rounded to 2 places) and the per-semester results, or ``None`` when
        ``raw_data`` is empty or its status is not ``"success"``.

    Raises:
        KeyError: If a successful payload lacks ``academic_summary``
            (``overall_performance`` / ``semester_results``) or
            ``programme_info``.
    """
    if not raw_data or raw_data.get("status") != "success":
        logger.warning("IPU data is empty or failed")
        return None

    summary = raw_data["academic_summary"]
    overall = summary["overall_performance"]
    programme = raw_data["programme_info"]

    cleaned = {
        # `or {}` guards against a sub-section that is present but null.
        "institute": (programme.get("institute") or {}).get("insti_name"),
        "degree": (programme.get("course") or {}).get("course_name"),
        "branch": (programme.get("branch") or {}).get("branch_name"),
        "overall_cgpa": round(_to_number(overall.get("cgpa")), 2),
        "overall_percentage": round(_to_number(overall.get("percentage")), 2),
        "semester_performance": []
    }

    for sem_result in summary["semester_results"]:
        sgpa = _to_number(sem_result.get("sgpa"))
        # Only semesters with a positive SGPA are kept.
        if not sgpa > 0:
            continue
        cleaned["semester_performance"].append({
            "semester": sem_result.get("result_no"),
            "sgpa": round(sgpa, 2),  # Rounding SGPA
            "percentage": round(_to_number(sem_result.get("percentage")), 2),
            "subjects": [
                {
                    "subject": sub.get("subject_name"),
                    "grade": sub.get("grade"),
                    "marks": sub.get("total_marks")
                }
                for sub in sem_result.get("subject_results") or []
            ]
        })

    return cleaned


def clean_leetcode_data(raw_data):
    """Cleans and filters LeetCode data, summarizing top skills.

    Args:
        raw_data: Dict of LeetCode profile data from the scraper.

    Returns:
        A dict with the headline stats, the 15 most-solved skill tags across
        all skill categories, streak info and recent accepted submissions
        (timestamps rendered as ``YYYY-MM-DD``; ``None`` when a timestamp is
        missing or invalid). Returns ``None`` when ``raw_data`` is empty.
    """
    if not raw_data:
        logger.warning("LeetCode data is empty")
        return None

    # Flatten all skills into a single list to find the absolute top skills
    all_skills = []
    for category in ("skillsAdvanced", "skillsIntermediate", "skillsFundamental"):
        if raw_data.get(category):
            all_skills.extend(raw_data[category])

    # Sort by problems solved and take the top 15 (a null count sorts as 0)
    top_skills_sorted = sorted(
        all_skills,
        key=lambda skill: skill.get("problemsSolved") or 0,
        reverse=True
    )
    top_skills_summary = [
        {"skill": s.get("tagName"), "solved": s.get("problemsSolved")}
        for s in top_skills_sorted[:15]
    ]

    # An absent, null or empty language list falls back to an empty entry.
    language_stats = raw_data.get("languageStats") or [{}]

    return {
        "username": raw_data.get("username"),
        "ranking": raw_data.get("ranking"),
        "totalSolved": raw_data.get("totalSolved"),
        "acceptanceRate": raw_data.get("acceptanceRate"),
        "problemsByDifficulty": raw_data.get("problemsSolvedByDifficulty"),
        "primaryLanguage": language_stats[0],
        "topSkillsSummary": top_skills_summary,  # New summarized field
        "activity": {
            "currentStreak": raw_data.get("currentStreak"),
            "totalActiveDays": raw_data.get("totalActiveDays"),
        },
        "recentSubmissions": [
            {
                "title": sub.get("title"),
                # One bad timestamp must not discard the whole profile.
                "timestamp": _format_date(sub.get("timestamp"))
            }
            for sub in raw_data.get("recentAcSubmissions") or []
        ]
    }


def clean_github_data(raw_data):
    """Summarizes GitHub data, cleans README, and fixes pinned repo logic.

    Args:
        raw_data: Dict of GitHub profile data from the scraper.

    Returns:
        A dict with identity fields, follower stats, the profile README
        stripped of HTML tags / Markdown images / link targets, and compact
        summaries of pinned and regular repositories. Returns ``None`` when
        ``raw_data`` is empty.
    """
    if not raw_data:
        logger.warning("GitHub data is empty")
        return None

    def summarize_repo(repo):
        # Create a dictionary only with non-null values
        fields = {
            "name": repo.get("name"),
            "description": repo.get("description"),
            "language": repo.get("language"),
            "stars": repo.get("stargazers_count"),
            "forks": repo.get("forks_count"),
            # Truncate to date; tolerates a null `pushed_at`.
            "last_pushed": (repo.get("pushed_at") or "")[:10]
        }
        return {k: v for k, v in fields.items() if v is not None}

    def summarize_pinned_repo(repo):
        # Pinned repo scraper uses a different key for the name ('repo')
        fields = {
            "name": (repo.get("repo") or "").strip(),  # Clean whitespace
            "description": repo.get("description"),
            "language": repo.get("language"),
            "stars": _parse_count(repo.get("stars")),
            "forks": repo.get("forks")
        }
        return {k: v for k, v in fields.items() if v is not None and v != ''}

    # Clean the README by removing HTML/Markdown tags, image links, etc.
    readme_text = raw_data.get("user_readme") or ""
    # Remove HTML tags
    readme_text = re.sub(r'<[^>]+>', '', readme_text)
    # Remove Markdown images and badges
    readme_text = re.sub(r'!\[[^\]]*\]\([^\)]*\)', '', readme_text)
    # Remove standalone links but keep link text
    readme_text = re.sub(r'\[([^\]]+)\]\([^\)]*\)', r'\1', readme_text)
    # Clean up excessive newlines
    readme_text = re.sub(r'\n\s*\n', '\n', readme_text).strip()

    bio = raw_data.get("bio")

    return {
        "username": raw_data.get("login"),
        "name": raw_data.get("name"),
        "bio": bio.strip() if bio else None,
        "stats": {
            "public_repos": raw_data.get("public_repos"),
            "followers": raw_data.get("followers"),
            "following": raw_data.get("following")
        },
        "cleaned_profile_readme": readme_text,
        "pinned_repositories": [
            summarize_pinned_repo(r) for r in raw_data.get("pinned_repos") or []
        ],
        "top_repositories": [
            summarize_repo(r) for r in raw_data.get("repos") or []
        ]
    }


def clean_codeforces_data(raw_data):
    """Cleans Codeforces data, focusing on performance and simplifying contest history.

    Args:
        raw_data: Dict of Codeforces data from the scraper (``profile``,
            ``contests``, ``solved_stats``, ``submissions``).

    Returns:
        A dict with rating/rank fields, per-contest rating changes, the
        solved-problem stats passed through unchanged, and a trimmed view of
        each submission. Returns ``None`` when ``raw_data`` is empty.
    """
    if not raw_data:
        logger.warning("Codeforces data is empty")
        return None

    profile = raw_data.get("profile") or {}

    cleaned_contests = [
        {
            "contestName": contest.get("contestName"),
            "rank": contest.get("rank"),
            "oldRating": contest.get("oldRating"),
            "newRating": contest.get("newRating"),
            # Missing or null ratings count as 0 for the delta.
            "ratingChange": (contest.get("newRating") or 0) - (contest.get("oldRating") or 0)
        }
        for contest in raw_data.get("contests") or []
    ]

    cleaned_submissions = []
    for sub in raw_data.get("submissions") or []:
        problem = sub.get("problem") or {}
        cleaned_submissions.append({
            "problem_name": problem.get("name"),
            "problem_tags": problem.get("tags"),
            "problem_rating": problem.get("rating"),
            "language": sub.get("programmingLanguage"),
            "verdict": sub.get("verdict")
        })

    return {
        "username": profile.get("handle"),
        "rating": profile.get("rating"),
        "maxRating": profile.get("maxRating"),
        "rank": profile.get("rank"),
        "maxRank": profile.get("maxRank"),
        "contest_history": cleaned_contests,
        "problem_solving_stats": raw_data.get("solved_stats"),
        "submissions": cleaned_submissions
    }


def clean_resume_data(raw_resume_data):
    """Processes raw resume data into final structured format

    Args:
        raw_resume_data: Dict from the resume parser; ``hyperlinks``,
            ``full_text`` and ``summary`` are read. Missing or null entries
            are treated as empty.

    Returns:
        A dict with the full text and a 500-character preview, the filtered
        link list, a cleaned summary, the detected skill keywords and a list
        of sections that appear to be missing. Returns ``None`` when
        ``raw_resume_data`` is empty.
    """
    if not raw_resume_data:
        logger.warning("Resume data is empty")
        return None

    hyperlinks = raw_resume_data.get("hyperlinks") or []
    full_text = raw_resume_data.get("full_text") or ""
    summary = raw_resume_data.get("summary") or ""

    # Extract only professional hyperlinks (filter out common non-professional links)
    professional_links = [
        url for url in hyperlinks
        if not re.search(
            r'(facebook|instagram|twitter|linkedin\.com\/in\/[^\/]+\/(detail|overlay)|youtube)',
            url,
            re.I
        )
    ]

    # Extract skills from resume text (simplified approach)
    # NOTE: this is plain substring matching, so e.g. 'java' also matches
    # inside 'javascript' and 'git' inside 'github'.
    skill_keywords = [
        'python', 'java', 'javascript', 'react', 'node', 'angular', 'vue',
        'sql', 'mongodb', 'aws', 'docker', 'kubernetes', 'git', 'c++', 'c#',
        'typescript', 'html', 'css', 'spring', 'django', 'flask',
        'tensorflow', 'pytorch', 'dsa', 'data structures', 'algorithms',
        'problem solving', 'full stack', 'backend', 'frontend', 'mobile',
        'android', 'ios', 'flutter', 'react native'
    ]
    resume_text = full_text.lower()
    # Keywords are unique, so no de-duplication pass is needed.
    skills = [keyword.capitalize() for keyword in skill_keywords if keyword in resume_text]

    # Identify missing elements (simplified approach)
    missing_elements = []
    # 'project' also covers 'projects'.
    if 'project' not in resume_text:
        missing_elements.append("Projects section")
    if not any(word in resume_text for word in ('internship', 'experience', 'work')):
        missing_elements.append("Work experience")
    if 'education' not in resume_text and 'degree' not in resume_text:
        missing_elements.append("Education details")
    if len(skills) < 3:
        missing_elements.append("Technical skills listing")

    # Clean summary text (remove excessive whitespace and special characters)
    cleaned_summary = re.sub(r'\s{2,}', ' ', summary)
    cleaned_summary = re.sub(r'[^\w\s.,;:!?()\-]', '', cleaned_summary)

    if len(full_text) > 500:
        preview = full_text[:500] + "..."
    else:
        preview = full_text

    return {
        "full_text": full_text,
        "full_text_preview": preview,
        "professional_links": professional_links,
        "skills_summary": cleaned_summary,
        "key_skills": skills,
        "total_hyperlinks": len(hyperlinks),
        "professional_link_count": len(professional_links),
        "missing_elements": missing_elements
    }


# --- Main Execution Logic ---

def _load_existing_records():
    """Load previously saved student records from OUTPUT_FILE, or return {}."""
    if not os.path.exists(OUTPUT_FILE):
        logger.info("No existing output file found. Starting fresh.")
        return {}
    try:
        with open(OUTPUT_FILE, 'r', encoding='utf-8') as f:
            records = json.load(f)
        # Records are keyed by enrollment number, so the root must be a dict.
        if not isinstance(records, dict):
            raise ValueError("top-level JSON value is not an object")
        logger.info(f"Loaded existing data for {len(records)} student(s) from '{OUTPUT_FILE}'.")
        return records
    except Exception as e:
        logger.warning(f"Could not load existing data: {e}. Starting fresh.")
        return {}


def _process_coding_profile(student_record, platform, label, username, fetch_profile, clean_profile):
    """Fetch and clean one coding-platform profile into ``student_record``.

    Any failure is recorded under ``student_record["errors"][platform]`` and
    logged; it never propagates to the caller.
    """
    try:
        logger.info(f" - Processing {label} data for '{username}'...")
        result = fetch_profile(username)
        if not result.get("success"):
            raise Exception(result.get("error", "Unknown error"))
        student_record["coding_profiles"][platform] = clean_profile(result["data"])
        logger.info(f" > {label} data processed successfully.")
    except Exception as e:
        student_record["errors"][platform] = str(e)
        logger.error(f" > {label} processing FAILED: {e}")


def _build_student_record(student, ipu_scraper):
    """Collect IPU, coding-profile and resume data for a single student entry."""
    enrollment_no = student.get("enrollment_no")
    student_record = {
        "name": None,
        "enrollment_no": enrollment_no,
        "academic_profile": None,
        "coding_profiles": {
            "leetcode": None,
            "github": None,
            "codeforces": None,
        },
        "resume": None,
        "errors": {}
    }

    # Fetch, Clean, and Assign Data
    try:
        logger.info(" - Processing IPU data...")
        raw_ipu_data = ipu_scraper.get_student_data(enrollment_no)
        student_record["academic_profile"] = clean_ipu_data(raw_ipu_data)
        if not student_record["academic_profile"]:
            raise Exception("Failed to process IPU data.")
        student_record["name"] = raw_ipu_data.get("student_info", {}).get("name")
        logger.info(" > IPU data processed successfully.")
    except Exception as e:
        student_record["errors"]["ipu"] = str(e)
        logger.error(f" > IPU processing FAILED: {e}")

    # (record key, log label, student-config key, fetcher, cleaner)
    platforms = (
        ("leetcode", "LeetCode", "leetcode_user", get_leetcode_profile, clean_leetcode_data),
        ("github", "GitHub", "github_user", get_github_profile, clean_github_data),
        ("codeforces", "Codeforces", "codeforces_user", get_codeforces_profile, clean_codeforces_data),
    )
    for platform, label, user_key, fetch_profile, clean_profile in platforms:
        username = student.get(user_key)
        if username:
            _process_coding_profile(
                student_record, platform, label, username, fetch_profile, clean_profile
            )

    # Process resume data
    resume_path = student.get("resume_path")
    if resume_path:
        try:
            logger.info(f" - Processing resume from '{resume_path}'...")
            if not os.path.exists(resume_path):
                raise FileNotFoundError(f"Resume file not found at {resume_path}")
            raw_resume_data = parse_resume(resume_path)
            student_record["resume"] = clean_resume_data(raw_resume_data)
            logger.info(" > Resume data processed successfully.")
        except Exception as e:
            student_record["errors"]["resume"] = str(e)
            logger.error(f" > Resume processing FAILED: {e}")

    return student_record


def _save_records(all_student_data):
    """Write all records to OUTPUT_FILE via a temp file and ``os.replace``.

    Writing to a sibling temp file first means a failure while serialising
    cannot leave OUTPUT_FILE half-written (which would make the next run
    discard every previously saved record).
    """
    tmp_path = f"{OUTPUT_FILE}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(all_student_data, f, indent=4, ensure_ascii=False)
        os.replace(tmp_path, OUTPUT_FILE)
        logger.info(f"\n✅ Final data saved to '{OUTPUT_FILE}' ({len(all_student_data)} total students).")
    except Exception as e:
        logger.error(f"\n❌ Error saving final JSON file: {e}")
        # Best-effort cleanup of the partial temp file; nothing more to do
        # if it is already gone or cannot be removed.
        try:
            os.remove(tmp_path)
        except OSError:
            pass


def main():
    """Main function to fetch, clean, aggregate, and save student data.

    Students whose enrollment number already exists in OUTPUT_FILE are
    skipped; newly fetched records are merged with the existing ones and the
    combined result is written back once all students are processed.
    """
    ipu_scraper = StudentScraper(encryption_key="Qm9sRG9OYVphcmEK")

    # Load existing data if output file exists
    all_student_data = _load_existing_records()

    # Get set of already processed enrollment numbers
    existing_enrollments = set(all_student_data)

    # Filter STUDENTS_TO_FETCH to only include unprocessed enrollments
    students_to_process = [
        student for student in STUDENTS_TO_FETCH
        if student.get("enrollment_no") not in existing_enrollments
    ]

    if not students_to_process:
        logger.info("✅ No new students to process. All enrollments already exist.")
        return

    logger.info(f"Starting data aggregation for {len(students_to_process)} new student(s)...")

    for student in students_to_process:
        enrollment_no = student.get("enrollment_no")
        if not enrollment_no:
            logger.warning("Skipping entry due to missing enrollment number.")
            continue

        logger.info(f"\nProcessing data for Enrollment No: {enrollment_no}")
        all_student_data[enrollment_no] = _build_student_record(student, ipu_scraper)
        time.sleep(1)  # Respectful delay

    # Save merged data (existing + new)
    _save_records(all_student_data)


if __name__ == "__main__":
    main()