"""Project report parser for semi-structured PDF documents."""

from __future__ import annotations

import re
from typing import Dict, List, Optional, Tuple

from src.models.project import GeoComponents, Milestone, ProjectRecord
from src.config import get_logger

logger = get_logger(__name__)


class ProjectReportParser:
    """Comprehensive parser for semi-structured project report PDFs.

    Every ``PATTERN_*`` constant is a regular expression whose first
    capturing group holds the value of one report field.  ``parse`` applies
    them to the report text (with whitespace collapsed to single spaces) and
    assembles the results into a ``ProjectRecord``.  Compiled patterns are
    cached per instance.
    """

    # Identification patterns
    PATTERN_PROJECT_ID = r"Project ID:\s*([0-9]+)"
    PATTERN_PROJECT_NAME = r"Project Name\s+(.+?)\s+PEC Activity Diagram"

    # Classification patterns
    PATTERN_INDUSTRY_CODE = r"Industry Code\s+([0-9]+\s+[A-Za-z\s&\(\)]+?)(?:\s+Project Type)"
    PATTERN_PROJECT_TYPE = r"Project Type\s+([A-Za-z]+)"
    PATTERN_SECTOR = r"Sector\s+([A-Za-z\s]+?)(?:\s+SIC Product|\s+Status)"
    PATTERN_SIC_CODE = r"SIC Code\s+([0-9]+\s+[A-Za-z\s&,\[\]]+?)(?:\s+Sector)"
    PATTERN_SIC_PRODUCT = r"SIC Product\s+([0-9\*]+\s+[A-Za-z\s,\(\)\-]+?)(?:\s+Status)"

    # Financial patterns
    PATTERN_TIV_USD = r"TIV \(USD\)\s*([0-9,]+)"
    PATTERN_TIV_CNY = r"TIV \(CNY\)\s*([0-9,]+)"

    # Status patterns
    PATTERN_STATUS = r"Status\s+([A-Za-z]+)\s+Last Update"
    PATTERN_STATUS_REASON = r"Status Reason\s+(.+?)\s+Environmental"
    PATTERN_PROJECT_PROBABILITY = r"Project Probability\s+([A-Za-z]+\s*\([0-9\-]+%\))"

    # Timeline patterns
    # NOTE(review): the "." after "PEC" matches any single character --
    # presumably a trademark-style symbol in the extracted text; confirm.
    PATTERN_LAST_UPDATE = r"Last Update\s+([0-9]{2}-[A-Za-z]{3}-[0-9]{4})"
    PATTERN_INITIAL_RELEASE = r"Initial Release\s+([0-9]{2}-[A-Za-z]{3}-[0-9]{4})"
    PATTERN_PEC_TIMING = r"PEC.\s*Timing\s+([A-Z][0-9])"
    PATTERN_PEC_ACTIVITY = r"PEC.\s*Activity\s+([A-Za-z\s\-]+?)(?:\s+Project Probability)"

    # Location patterns
    PATTERN_LOCATION = r"Location\s+(.+?)\s+Phone"
    PATTERN_CITY_STATE = r"City/State\s+(.+?)\s+Zone/County"
    PATTERN_ZONE_COUNTY = r"Zone/County\s+(.+?)\s+Project Responsibility"
    PATTERN_PHONE = r"Phone\s+(\+?[0-9\s\-]+)"

    # Plant info patterns
    PATTERN_PLANT_OWNER = r"Plant Owner\s+([A-Za-z\s&,\.]+?)(?:\s+Plant Parent)"
    PATTERN_PLANT_PARENT = r"Plant Parent\s+([A-Za-z\s&,\.]+?)(?:\s+Plant Name|\s+Unit Name)"
    PATTERN_PLANT_NAME = r"Plant Name\s+([A-Za-z\s&,\.]+?)(?:\s+Unit Name|\s+Plant ID)"
    PATTERN_PLANT_ID = r"Plant ID\s+([0-9]+)"
    PATTERN_UNIT_NAME = r"Unit Name\s+([A-Za-z0-9\s&]+?)(?:\s+Plant ID|\s+Location)"

    # Contact patterns
    # Project manager: group 1 is the company, group 2 a two-word person name.
    PATTERN_PROJECT_MANAGER = r"Project Manager\s+([A-Za-z\s&,\.]+?)\s+([A-Z][a-z]+\s+[A-Z][a-z]+)\s+(?:\d|No\.|[A-Z][a-z]+\s+(?:Road|Street|Drive|Ave|Suite|Manager))"  # noqa: E501
    PATTERN_ENGINEER = r"Eng\s+([A-Za-z\s&,\.]+?)\s+(?:[A-Z][a-z]+\s+[A-Z][a-z]+|[0-9])"
    PATTERN_EC_FIRM = r"E&C\s+([A-Za-z\s&,\.]+?)\s+(?:[A-Z][a-z]+\s+[A-Z][a-z]+|[0-9])"
    PATTERN_EMAIL = r"\[E-Mail\]\s*([a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,})"

    # Technical patterns
    PATTERN_SCOPE = r"Scope\s+(.+?)\s+Schedule\s+"
    # The trailing "[^\n]*" makes this pattern line-oriented: it only stops at
    # a line break, so it must be applied to text that still contains them.
    PATTERN_PROJECT_CAPACITY = r"Project Capacity\s+(?:Planned\s+)?([0-9,]+\s*(?:MW|BBL|Megawatts)[^\n]*)"
    PATTERN_ENVIRONMENTAL = r"Environmental\s+(Air\s*\([A-Z]\)[^C]*?)(?:\s+Construction Labor)"
    PATTERN_CONSTRUCTION_LABOR = r"Construction Labor Preference\s+([A-Za-z\-]+)"
    PATTERN_OPERATIONS_LABOR = r"Operations Labor Preference\s+([A-Za-z\-]+)"
    PATTERN_FUEL_TYPE = r"Project Fuel Type\s+([A-Za-z]+)"

    # Schedule/details patterns
    PATTERN_SCHEDULE = r"Schedule\s+(.+?)\bDetails\b"
    PATTERN_SCHEDULE_FALLBACK = r"Schedule\s+(.+?)\s+Engineering\s+(?:Civil|Contracting|Electrical)"
    PATTERN_DETAILS = r"Details\s+(.+?)\s+Engineering\s+(?:Civil|Contracting)"

    # Milestone pattern: a free-text name followed by a date token such as
    # "2Q2024", "2025" or "Jan-2025", optionally with a parenthesised note.
    # The group names must match the ``match.group("name")`` and
    # ``match.group("date")`` lookups in ``extract_milestones``.
    PATTERN_MILESTONE = (
        r"(?P<name>[A-Za-z0-9\-\s&/]+?)\s+"
        r"(?P<date>(?:[1-4]Q\d{2,4}|\d{4}|[A-Za-z]{3}-\d{4})(?:\s*\([^\)]*\))?)"
    )

    CHALLENGE_KEYWORDS = r"funding|partners|agreement|RFQ|bid|cancelled|delay|escalat"

    # "<city>, <state> <postal digits> <country>"; the group names must match
    # the lookups in ``parse_city_state_country``.
    PATTERN_GEO = r"^(?P<city>[^,]+),\s*(?P<state>[^\d]+?)\s+(?P<postal>\d+)\s+(?P<country>.+)$"

    def __init__(self) -> None:
        """Create a parser with an empty compiled-pattern cache."""
        # Keyed by (pattern source, flags) so the same pattern compiled with
        # different flags gets separate entries.
        self._compiled_patterns: Dict[Tuple[str, int], re.Pattern] = {}

    def _get_pattern(self, pattern: str, flags: int = 0) -> re.Pattern:
        """Return the compiled form of ``pattern``/``flags``, compiling it once."""
        key = (pattern, int(flags))
        compiled = self._compiled_patterns.get(key)
        if compiled is None:
            compiled = re.compile(pattern, flags)
            self._compiled_patterns[key] = compiled
        return compiled

    def _find_match(self, text: str, pattern: str, flags: int = 0) -> Optional[str]:
        """Return the stripped first group of the first match, or ``None``."""
        match = self._get_pattern(pattern, flags).search(text)
        if match is None:
            return None
        value = match.group(1)
        # Group 1 is None when it sits in an alternative that did not
        # participate in the match; treat that like "not found".
        return value.strip() if value is not None else None

    def _find_all_matches(self, text: str, pattern: str, flags: int = 0) -> List[str]:
        """Return the stripped first group of every non-overlapping match."""
        compiled = self._get_pattern(pattern, flags)
        values = (m.group(1) for m in compiled.finditer(text))
        return [value.strip() for value in values if value is not None]

    @staticmethod
    def _money_to_float(value: str) -> Optional[float]:
        """Convert a comma-grouped number such as ``"1,250,000"`` to a float.

        Returns ``None`` when ``value`` is not a string or is not numeric.
        """
        try:
            return float(value.replace(",", ""))
        except (ValueError, AttributeError):
            return None

    def _extract_project_manager(self, text: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
        """Extract project manager name, company, and email.

        Returns:
            A ``(name, company, email)`` tuple; each element is ``None`` when
            it could not be found.  The e-mail is only searched for within
            the 500 characters that start at the project-manager match.
        """
        pm_match = self._get_pattern(self.PATTERN_PROJECT_MANAGER, re.IGNORECASE).search(text)
        if pm_match is None:
            return None, None, None

        company = pm_match.group(1).strip()
        name = pm_match.group(2).strip()

        # Limit the e-mail search to the text close to the manager entry so
        # an address belonging to a later contact is not picked up.
        pm_section = text[pm_match.start():pm_match.start() + 500]
        email_match = self._get_pattern(self.PATTERN_EMAIL).search(pm_section)
        email = email_match.group(1) if email_match else None

        logger.info(f"Found Project Manager: {name} ({company})")
        return name, company, email

    def parse(self, text: str, source_name: str) -> ProjectRecord:
        """Parse a report into a ProjectRecord with comprehensive field extraction.

        Args:
            text: Raw text extracted from the report.
            source_name: Identifier of the source document; stored on the
                record and used in the log message.

        Returns:
            A ``ProjectRecord`` in which every field that could not be
            located is ``None``.
        """
        # Collapse every whitespace run (including line breaks) to one space
        # so the label-to-label patterns work regardless of PDF line wrapping.
        normalized = re.sub(r"\s+", " ", text)

        # Identification
        project_id = self._find_match(normalized, self.PATTERN_PROJECT_ID)
        project_name = self._find_match(normalized, self.PATTERN_PROJECT_NAME, re.IGNORECASE)

        # Classification
        industry_code = self._find_match(normalized, self.PATTERN_INDUSTRY_CODE, re.IGNORECASE)
        project_type = self._find_match(normalized, self.PATTERN_PROJECT_TYPE, re.IGNORECASE)
        sector = self._find_match(normalized, self.PATTERN_SECTOR, re.IGNORECASE)
        sic_code = self._find_match(normalized, self.PATTERN_SIC_CODE, re.IGNORECASE)
        sic_product = self._find_match(normalized, self.PATTERN_SIC_PRODUCT, re.IGNORECASE)

        # Financial: USD takes precedence over CNY when both are present.
        tiv_usd = self._find_match(normalized, self.PATTERN_TIV_USD)
        tiv_cny = self._find_match(normalized, self.PATTERN_TIV_CNY)
        tiv_amount: Optional[float] = None
        tiv_currency: Optional[str] = None
        if tiv_usd:
            tiv_amount = self._money_to_float(tiv_usd)
            tiv_currency = "USD"
        elif tiv_cny:
            tiv_amount = self._money_to_float(tiv_cny)
            tiv_currency = "CNY"

        # Status
        status = self._find_match(normalized, self.PATTERN_STATUS, re.IGNORECASE)
        status_reason = self._find_match(normalized, self.PATTERN_STATUS_REASON, re.IGNORECASE)
        project_probability = self._find_match(normalized, self.PATTERN_PROJECT_PROBABILITY, re.IGNORECASE)

        # Timeline
        last_update = self._find_match(normalized, self.PATTERN_LAST_UPDATE)
        initial_release = self._find_match(normalized, self.PATTERN_INITIAL_RELEASE)
        pec_timing = self._find_match(normalized, self.PATTERN_PEC_TIMING, re.IGNORECASE)
        pec_activity = self._find_match(normalized, self.PATTERN_PEC_ACTIVITY, re.IGNORECASE)

        # Location
        address = self._find_match(normalized, self.PATTERN_LOCATION, re.IGNORECASE)
        city_state_line = self._find_match(normalized, self.PATTERN_CITY_STATE, re.IGNORECASE)
        zone_county = self._find_match(normalized, self.PATTERN_ZONE_COUNTY, re.IGNORECASE)
        phone = self._find_match(normalized, self.PATTERN_PHONE)

        # Plant info
        plant_owner = self._find_match(normalized, self.PATTERN_PLANT_OWNER, re.IGNORECASE)
        plant_parent = self._find_match(normalized, self.PATTERN_PLANT_PARENT, re.IGNORECASE)
        plant_name = self._find_match(normalized, self.PATTERN_PLANT_NAME, re.IGNORECASE)
        plant_id = self._find_match(normalized, self.PATTERN_PLANT_ID)
        unit_name = self._find_match(normalized, self.PATTERN_UNIT_NAME, re.IGNORECASE)

        # Contacts
        project_manager, project_manager_company, project_manager_email = (
            self._extract_project_manager(normalized)
        )
        engineer_company = self._find_match(normalized, self.PATTERN_ENGINEER, re.IGNORECASE)
        ec_firm = self._find_match(normalized, self.PATTERN_EC_FIRM, re.IGNORECASE)

        # Technical
        scope_text = self._find_match(normalized, self.PATTERN_SCOPE, re.IGNORECASE | re.DOTALL)
        # The capacity pattern is terminated only by a line break.  The
        # normalized text has none, so matching there would capture the rest
        # of the document; match the raw text and tidy the whitespace instead.
        project_capacity = self._find_match(text, self.PATTERN_PROJECT_CAPACITY, re.IGNORECASE)
        if project_capacity:
            project_capacity = re.sub(r"\s+", " ", project_capacity)
        environmental = self._find_match(normalized, self.PATTERN_ENVIRONMENTAL, re.IGNORECASE)
        construction_labor = self._find_match(normalized, self.PATTERN_CONSTRUCTION_LABOR, re.IGNORECASE)
        operations_labor = self._find_match(normalized, self.PATTERN_OPERATIONS_LABOR, re.IGNORECASE)
        fuel_type = self._find_match(normalized, self.PATTERN_FUEL_TYPE, re.IGNORECASE)

        # Schedule/details: fall back to the "Engineering ..." terminator when
        # no "Details" heading follows the schedule.
        schedule_text = self._find_match(normalized, self.PATTERN_SCHEDULE, re.IGNORECASE | re.DOTALL)
        if not schedule_text:
            schedule_text = self._find_match(
                normalized, self.PATTERN_SCHEDULE_FALLBACK, re.IGNORECASE | re.DOTALL
            )
        details_text = self._find_match(normalized, self.PATTERN_DETAILS, re.IGNORECASE | re.DOTALL)

        # Coverage summary over the fields treated as "key" for logging.
        key_fields = [
            project_id, project_name, industry_code, project_type, sector,
            tiv_amount, status, plant_owner, project_manager, scope_text,
            schedule_text, pec_timing, pec_activity,
        ]
        extracted_count = sum(1 for value in key_fields if value is not None)
        logger.info(f"Extracted {extracted_count}/{len(key_fields)} key fields from {source_name}")

        return ProjectRecord(
            source=source_name,
            project_id=project_id,
            project_name=project_name,
            industry_code=industry_code,
            project_type=project_type,
            sector=sector,
            sic_code=sic_code,
            sic_product=sic_product,
            tiv_amount=tiv_amount,
            tiv_currency=tiv_currency,
            status=status,
            status_reason=status_reason,
            project_probability=project_probability,
            last_update=last_update,
            initial_release=initial_release,
            pec_timing=pec_timing,
            pec_activity=pec_activity,
            address=address,
            city_state_line=city_state_line,
            zone_county=zone_county,
            plant_owner=plant_owner,
            plant_parent=plant_parent,
            plant_name=plant_name,
            plant_id=plant_id,
            unit_name=unit_name,
            project_manager=project_manager,
            project_manager_company=project_manager_company,
            project_manager_email=project_manager_email,
            engineer_company=engineer_company,
            ec_firm=ec_firm,
            phone=phone,
            scope_text=scope_text,
            project_capacity=project_capacity,
            environmental=environmental,
            construction_labor=construction_labor,
            operations_labor=operations_labor,
            fuel_type=fuel_type,
            schedule_text=schedule_text,
            details_text=details_text,
        )

    def extract_milestones(self, schedule_text: Optional[str]) -> List[Milestone]:
        """Extract milestone-like statements from schedule text.

        Each ``PATTERN_MILESTONE`` match whose name is at least three
        characters long and is not a bare stop word becomes a ``Milestone``.
        When nothing matches but the text is non-blank, a single generic
        "Schedule" milestone carrying the first 200 characters is returned.
        Empty or ``None`` input yields an empty list.
        """
        if not schedule_text:
            return []

        milestones: List[Milestone] = []
        pattern = self._get_pattern(self.PATTERN_MILESTONE)

        for match in pattern.finditer(schedule_text):
            name = match.group("name").strip()
            date_text = match.group("date").strip()
            if len(name) >= 3 and name.lower() not in ("the", "and", "for", "with"):
                # Keep a little surrounding context with each milestone.
                context_start = max(0, match.start() - 50)
                context_end = match.end() + 20
                milestones.append(Milestone(
                    name=name,
                    date_text=date_text,
                    sentence=schedule_text[context_start:context_end].strip(),
                ))

        if not milestones and schedule_text.strip():
            milestones.append(
                Milestone(name="Schedule", date_text="", sentence=schedule_text.strip()[:200])
            )

        return milestones

    def derive_challenges(self, record: ProjectRecord) -> List[str]:
        """Derive candidate challenges/constraints from record fields.

        Returns the candidates in the order they were derived, stripped,
        with blank entries and exact duplicates removed.
        """
        candidates: List[str] = []

        if record.status_reason:
            candidates.append(f"Status reason: {record.status_reason}")

        if record.details_text:
            candidates.append(record.details_text)

        if record.schedule_text and re.search(
            self.CHALLENGE_KEYWORDS, record.schedule_text, re.IGNORECASE
        ):
            candidates.append(
                "Dependencies / commercial gating mentioned in schedule (funding, partners, RFQs/bids)."
            )

        if record.status and record.status.lower() == "cancelled":
            candidates.append("Project status is Cancelled.")

        # dict.fromkeys de-duplicates while preserving first-seen order.
        stripped = (candidate.strip() for candidate in candidates)
        return list(dict.fromkeys(candidate for candidate in stripped if candidate))

    def parse_city_state_country(self, city_state_line: Optional[str]) -> GeoComponents:
        """Parse City/State line into structured components.

        Returns an empty ``GeoComponents`` for empty input, one with only
        ``city`` set (to the whole stripped line) when the line does not match
        ``PATTERN_GEO``, and a fully populated one otherwise.
        """
        if not city_state_line:
            return GeoComponents()

        line = city_state_line.strip()
        match = self._get_pattern(self.PATTERN_GEO).match(line)

        if not match:
            return GeoComponents(city=line)

        return GeoComponents(
            city=match.group("city").strip(),
            state=match.group("state").strip(),
            postal=match.group("postal").strip(),
            country=match.group("country").strip(),
        )


_default_parser: Optional[ProjectReportParser] = None


def get_parser() -> ProjectReportParser:
    """Get the default parser instance (singleton).

    The instance is created on first use and reused afterwards, so its
    compiled-pattern cache is shared by all callers.
    """
    global _default_parser
    if _default_parser is None:
        _default_parser = ProjectReportParser()
    return _default_parser