Spaces:
Runtime error
Runtime error
| """ | |
| Multi-format document processor for resumes and cover letters | |
| Supports: Word, PDF, Text, PowerPoint for both input and output | |
| """ | |
| import os | |
| import io | |
| import logging | |
| from pathlib import Path | |
| from typing import Dict, Any, Optional, List, Tuple | |
| from datetime import datetime | |
| import json | |
| import re | |
| import zipfile | |
| # Document processing libraries | |
| try: | |
| from docx import Document | |
| from docx.shared import Pt, Inches, RGBColor | |
| from docx.enum.text import WD_ALIGN_PARAGRAPH | |
| DOCX_AVAILABLE = True | |
| except ImportError: | |
| DOCX_AVAILABLE = False | |
| try: | |
| from pptx import Presentation | |
| from pptx.util import Inches, Pt | |
| from pptx.enum.text import PP_ALIGN | |
| PPTX_AVAILABLE = True | |
| except ImportError: | |
| PPTX_AVAILABLE = False | |
| try: | |
| import PyPDF2 | |
| from PyPDF2 import PdfReader | |
| PDF_READ_AVAILABLE = True | |
| except ImportError: | |
| PDF_READ_AVAILABLE = False | |
| try: | |
| from reportlab.lib.pagesizes import letter, A4 | |
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
| from reportlab.lib.units import inch | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle | |
| from reportlab.lib import colors | |
| from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_JUSTIFY | |
| PDF_WRITE_AVAILABLE = True | |
| except ImportError: | |
| PDF_WRITE_AVAILABLE = False | |
| logger = logging.getLogger(__name__) | |
class DocumentProcessor:
    """Handle multiple document formats for resume/CV processing.

    Input formats: plain text and DOCX are always readable (DOCX falls back
    to reading the zipped XML when python-docx is missing); PDF and PPTX are
    readable when PyPDF2 / python-pptx imported successfully.

    Output formats: plain text is always available; DOCX, PDF and PPTX are
    available when python-docx, reportlab and python-pptx imported
    successfully.

    All parsing is heuristic: it is meant to give a reasonable first pass
    over typical resumes, not an exact parse.
    """

    # Bullet glyph used by every exporter. Written as an escape so the source
    # stays correct whatever encoding the file is saved or scraped in.
    _BULLET = '\u2022'

    # Contact patterns. The e-mail TLD class is [A-Za-z]; a '|' inside a
    # character class is a literal pipe and would be captured.
    _EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
    # Separators are limited to same-line characters so a phone candidate
    # never spans two lines.
    _PHONE_RE = re.compile(
        r'[\+]?[()]?[0-9]{1,4}[)]?[- \t\.]?[()]?[0-9]{1,4}[)]?'
        r'[- \t\.]?[0-9]{1,5}[- \t\.]?[0-9]{1,5}'
    )
    # The loose phone pattern also matches bare years ("2019"); only accept
    # candidates with a plausible number of digits.
    _MIN_PHONE_DIGITS = 7
    _MAX_PHONE_DIGITS = 15
    _LINKEDIN_RE = re.compile(r'linkedin\.com/in/[\w-]+')

    # Delimiters between skills: comma, semicolon, pipe, bullet, middle dot.
    _SKILL_SPLIT_RE = re.compile('[,;|\u2022\u00b7]')

    # Section name -> keywords that identify a heading for that section.
    # Order matters: the first section whose keyword matches wins.
    _SECTION_KEYWORDS = {
        'experience': ['experience', 'work history', 'employment', 'professional experience'],
        'education': ['education', 'academic', 'qualification'],
        'skills': ['skills', 'technical skills', 'competencies', 'expertise'],
        'summary': ['summary', 'objective', 'profile', 'about'],
        'projects': ['projects', 'portfolio'],
        'certifications': ['certifications', 'certificates', 'credentials'],
        'languages': ['languages', 'language skills'],
    }
    # One compiled pattern per section; keywords must start on a word boundary.
    _SECTION_PATTERNS = {
        section: re.compile(r'\b(?:' + '|'.join(re.escape(k) for k in keywords) + ')')
        for section, keywords in _SECTION_KEYWORDS.items()
    }
    # Sections parsed into a list of {'title', 'description'} items.
    _LIST_SECTIONS = ('experience', 'education', 'projects')
    # A heading is a short line; longer lines are treated as body text even
    # if they happen to contain a section keyword.
    _MAX_HEADING_WORDS = 4
    # Characters commonly wrapped around headings ("## Skills", "--- Skills ---").
    _HEADING_DECORATION = ' \t-=*#_'

    def __init__(self):
        """Record which input/output formats are usable in this environment."""
        # Text needs no library, and DOCX can always be read through the
        # zip/XML fallback, so both are unconditionally supported as input.
        self.supported_input_formats = ['txt', 'docx']
        self.supported_output_formats = ['txt']  # Text always available

        if DOCX_AVAILABLE:
            self.supported_output_formats.append('docx')
        if PPTX_AVAILABLE:
            self.supported_input_formats.append('pptx')
            self.supported_output_formats.append('pptx')
        if PDF_READ_AVAILABLE:
            self.supported_input_formats.append('pdf')
        if PDF_WRITE_AVAILABLE:
            self.supported_output_formats.append('pdf')

        logger.info(
            "Document processor initialized - Input formats: %s, Output formats: %s",
            self.supported_input_formats,
            self.supported_output_formats,
        )

    # ------------------------------------------------------------------
    # Extraction
    # ------------------------------------------------------------------

    @staticmethod
    def _empty_result() -> Dict[str, Any]:
        """Return a fresh, empty parse result containing every result key."""
        return {
            'full_text': '',
            'contact': {},
            'summary': '',
            'experience': [],
            'education': [],
            'skills': [],
            'certifications': [],
            'projects': [],
            'languages': [],
        }

    def extract_from_file(self, file_path: str) -> Dict[str, Any]:
        """Extract structured data from an uploaded resume file.

        The format is chosen from the file extension (case-insensitive).

        Args:
            file_path: Path of the file to read.

        Returns:
            The parsed structure (see ``_parse_resume_text``). An empty
            structure is returned when the format is unsupported or its
            reader library is unavailable; ``{}`` is returned when reading
            the file fails.
        """
        file_ext = Path(file_path).suffix.lower().lstrip('.')

        if file_ext == 'docx':
            if DOCX_AVAILABLE:
                return self._extract_from_docx(file_path)
            # Fallback: parse DOCX as zip and extract XML text
            return self._extract_docx_zip_fallback(file_path)

        if file_ext == 'pdf':
            if PDF_READ_AVAILABLE:
                return self._extract_from_pdf(file_path)
            logger.warning("PDF reader not available; returning empty parse")
            return self._empty_result()

        if file_ext == 'pptx':
            if PPTX_AVAILABLE:
                return self._extract_from_pptx(file_path)
            logger.warning("PPTX reader not available; returning empty parse")
            return self._empty_result()

        if file_ext in ('txt', 'text'):
            return self._extract_from_text(file_path)

        logger.warning("Unsupported file format: %s", file_ext)
        # Don't try to read binary formats as text; return minimal structure
        return self._empty_result()

    def _extract_from_docx(self, file_path: str) -> Dict[str, Any]:
        """Extract data from a Word document using python-docx.

        Body paragraphs are read first, then table cells. If python-docx
        fails, the zip/XML fallback is tried; ``{}`` is returned when that
        fails too.
        """
        try:
            doc = Document(file_path)
            full_text = [
                paragraph.text.strip()
                for paragraph in doc.paragraphs
                if paragraph.text.strip()
            ]
            # Also extract from tables
            for table in doc.tables:
                for row in table.rows:
                    for cell in row.cells:
                        cell_text = cell.text.strip()
                        if cell_text:
                            full_text.append(cell_text)
            return self._parse_resume_text('\n'.join(full_text))
        except Exception as e:
            logger.error("Error extracting from DOCX: %s", e)
            # The fallback handles its own errors and returns {} on failure.
            return self._extract_docx_zip_fallback(file_path)

    def _extract_docx_zip_fallback(self, file_path: str) -> Dict[str, Any]:
        """Extract text from a DOCX by reading the zipped XML (no python-docx).

        Paragraph ends and explicit breaks become newlines so the section
        parser sees one paragraph per line; all other tags are removed
        without inserting text, because a single word can be split across
        several runs. Returns ``{}`` when the archive cannot be read.
        """
        import html  # local import: only this method needs it

        try:
            with zipfile.ZipFile(file_path) as archive:
                xml_text = archive.read('word/document.xml').decode('utf-8', errors='ignore')

            # Closing paragraph tags and line breaks mark line boundaries.
            # Matching "</w:p>" (not "<w:p...>") avoids hitting <w:pPr>,
            # <w:pStyle> and friends.
            xml_text = re.sub(r'</w:p>|<w:br\b[^>]*>|<w:cr\s*/>', '\n', xml_text)
            # A bare <w:tab/> is a tab character (tab-stop definitions carry
            # attributes and are removed with the other tags).
            xml_text = re.sub(r'<w:tab\s*/>', ' ', xml_text)
            xml_text = re.sub(r'<[^>]+>', '', xml_text)
            # Unescape only after the tags are gone so "&lt;b&gt;" survives.
            xml_text = html.unescape(xml_text)

            cleaned = (re.sub(r'[ \t]+', ' ', line).strip() for line in xml_text.split('\n'))
            text_content = '\n'.join(line for line in cleaned if line)
            return self._parse_resume_text(text_content)
        except Exception as e:
            logger.error("DOCX zip fallback failed: %s", e)
            return {}

    def _extract_from_pdf(self, file_path: str) -> Dict[str, Any]:
        """Extract data from a PDF with PyPDF2; returns ``{}`` on failure."""
        try:
            with open(file_path, 'rb') as file:
                reader = PdfReader(file)
                # Pages must be read while the file is still open.
                page_texts = [page.extract_text() for page in reader.pages]
            text_content = '\n'.join(text for text in page_texts if text)
            return self._parse_resume_text(text_content)
        except Exception as e:
            logger.error("Error extracting from PDF: %s", e)
            return {}

    def _extract_from_pptx(self, file_path: str) -> Dict[str, Any]:
        """Extract data from a PowerPoint file; returns ``{}`` on failure.

        The text of every shape that exposes a ``text`` attribute is
        collected, slide by slide.
        """
        try:
            prs = Presentation(file_path)
            full_text = []
            for slide in prs.slides:
                for shape in slide.shapes:
                    shape_text = (getattr(shape, "text", "") or "").strip()
                    if shape_text:
                        full_text.append(shape_text)
            return self._parse_resume_text('\n'.join(full_text))
        except Exception as e:
            logger.error("Error extracting from PPTX: %s", e)
            return {}

    def _extract_from_text(self, file_path: str) -> Dict[str, Any]:
        """Extract data from a plain-text file; returns ``{}`` on failure.

        Decoding order: UTF-16 only when a UTF-16 byte-order mark is present
        (decoding arbitrary bytes as UTF-16 usually "succeeds" with garbage),
        then UTF-8 (a UTF-8 BOM is dropped), then cp1252 ignoring
        undecodable bytes.
        """
        try:
            with open(file_path, 'rb') as file:
                raw = file.read()

            if raw.startswith((b'\xff\xfe', b'\xfe\xff')):
                text_content = raw.decode('utf-16', errors='ignore')
            else:
                try:
                    text_content = raw.decode('utf-8-sig')
                except UnicodeDecodeError:
                    text_content = raw.decode('cp1252', errors='ignore')

            # The file is read in binary mode, so normalise line endings here.
            text_content = text_content.replace('\r\n', '\n').replace('\r', '\n')
            return self._parse_resume_text(text_content)
        except Exception as e:
            logger.error("Error extracting from text: %s", e)
            return {}

    # ------------------------------------------------------------------
    # Parsing
    # ------------------------------------------------------------------

    def _parse_resume_text(self, text: str) -> Dict[str, Any]:
        """Parse resume text into structured data.

        Args:
            text: Full resume text, one logical line per ``\\n``.

        Returns:
            A dict with keys ``full_text``, ``contact`` (optional ``email``,
            ``phone``, ``linkedin``, ``name``), ``summary``, ``experience``,
            ``education``, ``skills``, ``certifications``, ``projects`` and
            ``languages``. Experience/education/projects become lists of
            ``{'title', 'description'}`` dicts, skills a list of strings, and
            any other detected section a newline-joined string.
        """
        data = self._empty_result()
        data['full_text'] = text
        contact = data['contact']
        lines = text.split('\n')

        # Extract email
        email_match = self._EMAIL_RE.search(text)
        if email_match:
            contact['email'] = email_match.group(0)

        # Extract phone: first candidate with a plausible digit count
        for candidate in self._PHONE_RE.finditer(text):
            number = candidate.group(0)
            digit_count = sum(ch.isdigit() for ch in number)
            if self._MIN_PHONE_DIGITS <= digit_count <= self._MAX_PHONE_DIGITS:
                contact['phone'] = number
                break

        # Extract LinkedIn URL (matched on the lower-cased text)
        linkedin_match = self._LINKEDIN_RE.search(text.lower())
        if linkedin_match:
            contact['linkedin'] = f"https://{linkedin_match.group(0)}"

        # Extract name (usually first non-empty line); a line whose first
        # characters contain a digit is assumed to be a phone/address line.
        for line in lines:
            if line.strip() and not any(char.isdigit() for char in line[:5]):
                contact['name'] = line.strip()
                break

        # Extract sections
        current_section = None
        section_content: List[str] = []
        for line in lines:
            section, inline_content = self._split_section_header(line)
            if section is None:
                if current_section:
                    section_content.append(line)
                continue
            # New heading: save what was collected for the previous section.
            self._store_section(data, current_section, section_content)
            current_section = section
            section_content = [inline_content] if inline_content else []

        # Save last section
        self._store_section(data, current_section, section_content)
        return data

    def _split_section_header(self, line: str) -> Tuple[Optional[str], str]:
        """Classify ``line`` as a section heading.

        A heading is a short line (at most ``_MAX_HEADING_WORDS`` words,
        counted before any ``:``) containing a section keyword that starts on
        a word boundary. Text after a colon on the heading line
        ("Skills: Python, Go") is returned as the section's first content.

        Returns:
            ``(section_name, inline_content)`` for a heading, otherwise
            ``(None, '')``.
        """
        head, _, tail = line.strip().partition(':')
        candidate = head.strip(self._HEADING_DECORATION).lower()
        if not candidate or len(candidate.split()) > self._MAX_HEADING_WORDS:
            return None, ''
        for section, pattern in self._SECTION_PATTERNS.items():
            if pattern.search(candidate):
                return section, tail.strip()
        return None, ''

    def _store_section(self, data: Dict[str, Any], section: Optional[str],
                       content: List[str]) -> None:
        """Parse the collected ``content`` lines and store them under ``section``.

        Does nothing when no section is active or nothing was collected.
        """
        if not section or not content:
            return
        if section in self._LIST_SECTIONS:
            data[section] = self._parse_list_section(content)
        elif section == 'skills':
            data[section] = self._parse_skills(content)
        else:
            data[section] = '\n'.join(content).strip()

    def _parse_list_section(self, lines: List[str]) -> List[Dict[str, str]]:
        """Parse experience/education/projects lines into titled items.

        Heuristic: a line containing a four-digit number (a year) starts a
        new item and becomes its title; following lines are joined with
        spaces into the description. If the section does not begin with such
        a line, its first non-empty line is used as the first title.
        """
        items: List[Dict[str, str]] = []
        title: Optional[str] = None
        description_parts: List[str] = []

        def flush() -> None:
            if title is not None:
                items.append({'title': title, 'description': ' '.join(description_parts)})

        for line in lines:
            stripped = line.strip()
            if not stripped:
                continue
            # Simple heuristic: lines with dates might be titles
            if title is None or re.search(r'\d{4}', stripped):
                flush()
                title = stripped
                description_parts = []
            else:
                description_parts.append(stripped)

        flush()
        return items

    def _parse_skills(self, lines: List[str]) -> List[str]:
        """Parse skills-section lines into a flat list of skill names.

        Lines are split on commas, semicolons, pipes, bullets and middle
        dots. Leading list markers are removed, and fragments without any
        letter or digit are dropped, so one-character skills such as "C" or
        "R" are kept.
        """
        skills = []
        for line in lines:
            for part in self._SKILL_SPLIT_RE.split(line):
                skill = part.strip().lstrip('-*\u2013\u2014 \t').strip()
                if any(ch.isalnum() for ch in skill):
                    skills.append(skill)
        return skills

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------

    @staticmethod
    def _contact_items(data: Dict[str, Any],
                       fields: Tuple[str, ...] = ('email', 'phone', 'linkedin')) -> List[str]:
        """Return the non-empty contact values of ``data`` in ``fields`` order."""
        contact = data.get('contact') or {}
        return [str(contact[field]) for field in fields if contact.get(field)]

    def export_to_format(self, data: Dict[str, Any], format: str, template: Optional[str] = None) -> bytes:
        """Export resume data to the specified format.

        Args:
            data: Parsed resume structure (see ``_parse_resume_text``).
            format: ``'docx'``, ``'pdf'``, ``'pptx'`` or ``'txt'``
                (case-insensitive; a leading dot is accepted).
            template: Passed through to the format-specific exporter.

        Returns:
            The document as bytes. When the format is unknown or its library
            is unavailable, UTF-8 plain text is returned instead and a
            warning is logged.
        """
        fmt = format.strip().lower().lstrip('.')

        if fmt == 'docx' and DOCX_AVAILABLE:
            return self._export_to_docx(data, template)
        if fmt == 'pdf' and PDF_WRITE_AVAILABLE:
            return self._export_to_pdf(data, template)
        if fmt == 'pptx' and PPTX_AVAILABLE:
            return self._export_to_pptx(data, template)

        if fmt not in ('txt', 'text'):
            # Make the silent downgrade visible: the caller asked for another format.
            logger.warning("Export format '%s' is unavailable; falling back to plain text", fmt)
        return self._export_to_text(data).encode('utf-8')

    def _export_to_docx(self, data: Dict[str, Any], template: Optional[str] = None) -> bytes:
        """Export to a Word document and return the file content as bytes.

        ``template`` is accepted for interface compatibility and is
        currently unused.
        """
        doc = Document()
        contact = data.get('contact') or {}

        # Add title (name)
        if contact.get('name'):
            title = doc.add_heading(str(contact['name']), 0)
            title.alignment = WD_ALIGN_PARAGRAPH.CENTER

        # Add contact info (skipped entirely when there is nothing to show)
        contact_items = self._contact_items(data)
        if contact_items:
            contact_para = doc.add_paragraph()
            contact_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
            contact_para.add_run(' | '.join(contact_items))

        # Add summary
        if data.get('summary'):
            doc.add_heading('Professional Summary', 1)
            doc.add_paragraph(str(data['summary']))

        # Add experience and education
        for section, heading in (
            ('experience', 'Professional Experience'),
            ('education', 'Education'),
        ):
            entries = data.get(section)
            if not entries:
                continue
            doc.add_heading(heading, 1)
            # A plain string is written as one paragraph, not per character.
            if not isinstance(entries, list):
                entries = [entries]
            for entry in entries:
                if isinstance(entry, dict):
                    doc.add_heading(entry.get('title', ''), 2)
                    doc.add_paragraph(entry.get('description', ''))
                else:
                    doc.add_paragraph(str(entry))

        # Add skills
        skills = data.get('skills')
        if skills:
            doc.add_heading('Skills', 1)
            skills_para = doc.add_paragraph()
            if isinstance(skills, list):
                skills_para.add_run('\n'.join(f'{self._BULLET} {skill}' for skill in skills))
            else:
                skills_para.add_run(str(skills))

        # Save to bytes
        buffer = io.BytesIO()
        doc.save(buffer)
        return buffer.getvalue()

    def _export_to_pdf(self, data: Dict[str, Any], template: Optional[str] = None) -> bytes:
        """Export to PDF with reportlab and return the file content as bytes.

        ``template`` is accepted for interface compatibility and is
        currently unused.
        """
        from xml.sax.saxutils import escape  # local import: only this method needs it

        def markup(value: Any) -> str:
            # Paragraph parses XML-like markup, so resume text must be
            # escaped; newlines become explicit line breaks.
            return escape(str(value)).replace('\n', '<br/>')

        buffer = io.BytesIO()
        doc = SimpleDocTemplate(buffer, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []

        # Title style
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=24,
            textColor=colors.HexColor('#2E4057'),
            alignment=TA_CENTER,
            spaceAfter=12,
        )
        contact_style = ParagraphStyle(
            'Contact',
            parent=styles['Normal'],
            alignment=TA_CENTER,
        )

        # Add name
        contact = data.get('contact') or {}
        if contact.get('name'):
            story.append(Paragraph(markup(contact['name']), title_style))
            story.append(Spacer(1, 12))

        # Add contact info
        contact_items = self._contact_items(data)
        if contact_items:
            story.append(Paragraph(markup(' | '.join(contact_items)), contact_style))
            story.append(Spacer(1, 20))

        # Add sections
        for section, heading in [
            ('summary', 'Professional Summary'),
            ('experience', 'Professional Experience'),
            ('education', 'Education'),
            ('skills', 'Skills'),
        ]:
            content = data.get(section)
            if not content:
                continue
            story.append(Paragraph(heading, styles['Heading2']))
            story.append(Spacer(1, 12))

            if isinstance(content, list):
                for item in content:
                    if isinstance(item, dict):
                        story.append(Paragraph(markup(item.get('title', '')), styles['Heading3']))
                        story.append(Paragraph(markup(item.get('description', '')), styles['Normal']))
                    else:
                        story.append(Paragraph(markup(f'{self._BULLET} {item}'), styles['Normal']))
                    story.append(Spacer(1, 6))
            else:
                story.append(Paragraph(markup(content), styles['Normal']))
            story.append(Spacer(1, 12))

        doc.build(story)
        return buffer.getvalue()

    def _export_to_pptx(self, data: Dict[str, Any], template: Optional[str] = None) -> bytes:
        """Export to PowerPoint and return the file content as bytes.

        Produces a title slide (name plus e-mail/phone), an optional summary
        slide, one slide for each of the first three experience entries, and
        an optional skills slide. ``template`` is accepted for interface
        compatibility and is currently unused.
        """
        prs = Presentation()
        contact = data.get('contact') or {}

        # Title slide
        slide = prs.slides.add_slide(prs.slide_layouts[0])
        if contact.get('name'):
            slide.shapes.title.text = str(contact['name'])
        slide.placeholders[1].text = ' | '.join(self._contact_items(data, ('email', 'phone')))

        # Summary slide
        if data.get('summary'):
            slide = prs.slides.add_slide(prs.slide_layouts[1])
            slide.shapes.title.text = "Professional Summary"
            slide.placeholders[1].text = str(data['summary'])

        # Experience slides
        experience = data.get('experience')
        if experience:
            if not isinstance(experience, list):
                experience = [experience]
            for exp in experience[:3]:  # Limit to 3 for brevity
                slide = prs.slides.add_slide(prs.slide_layouts[1])
                slide.shapes.title.text = "Professional Experience"
                if isinstance(exp, dict):
                    content = f"{exp.get('title', '')}\n\n{exp.get('description', '')}"
                else:
                    content = str(exp)
                slide.placeholders[1].text = content

        # Skills slide
        skills = data.get('skills')
        if skills:
            slide = prs.slides.add_slide(prs.slide_layouts[1])
            slide.shapes.title.text = "Skills"
            if isinstance(skills, list):
                slide.placeholders[1].text = '\n'.join(f'{self._BULLET} {skill}' for skill in skills)
            else:
                slide.placeholders[1].text = str(skills)

        # Save to bytes
        buffer = io.BytesIO()
        prs.save(buffer)
        return buffer.getvalue()

    def _export_to_text(self, data: Dict[str, Any]) -> str:
        """Export to plain text.

        Layout: name underlined with ``=``, a ``|``-separated contact line,
        then each non-empty section under an upper-case heading underlined
        with ``-``. List items that are dicts are written as title and
        description; other list items are bulleted.
        """
        lines = []
        contact = data.get('contact') or {}

        # Name and contact
        name = contact.get('name')
        if name:
            lines.append(str(name))
            lines.append('=' * len(str(name)))

        if contact:
            contact_items = self._contact_items(data)
            if contact_items:
                lines.append(' | '.join(contact_items))
            lines.append('')

        # Sections
        for section, heading in [
            ('summary', 'PROFESSIONAL SUMMARY'),
            ('experience', 'PROFESSIONAL EXPERIENCE'),
            ('education', 'EDUCATION'),
            ('skills', 'SKILLS'),
            ('certifications', 'CERTIFICATIONS'),
            ('projects', 'PROJECTS'),
        ]:
            content = data.get(section)
            if not content:
                continue
            lines.append(heading)
            lines.append('-' * len(heading))

            if isinstance(content, list):
                for item in content:
                    if isinstance(item, dict):
                        lines.append(f"\n{item.get('title', '')}")
                        lines.append(item.get('description', ''))
                    else:
                        lines.append(f"{self._BULLET} {item}")
            else:
                lines.append(str(content))
            lines.append('')

        return '\n'.join(lines)
# Singleton instance: constructed once when this module is imported, so the
# optional-library availability check and its log line run at import time.
document_processor = DocumentProcessor()