Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import PyPDF2 | |
| import re | |
| import json | |
| import io | |
| from typing import Dict, List, Tuple, Any | |
| import traceback | |
| class PropertyFormulaAnalyzer: | |
| def __init__(self, formula_file_path: str = "formulas.txt"): | |
| """Initialize the analyzer with the formula file path""" | |
| self.formula_file_path = formula_file_path | |
| self.formulas = {} | |
| self.load_formulas() | |
| def load_formulas(self): | |
| """Load and parse all formulas from the formula file""" | |
| try: | |
| with open(self.formula_file_path, 'r', encoding='utf-8') as f: | |
| content = f.read() | |
| # Parse formulas using regex | |
| # Pattern: number. cell_ref (description) = formula | |
| pattern = r'(\d+)\.\s+([A-Z]+\d+)\s*\(([^)]+)\)\s*=\s*([^=\n]+?)(?=\s+\d+\.|$)' | |
| matches = re.findall(pattern, content, re.DOTALL) | |
| for match in matches: | |
| formula_num, cell_ref, description, formula = match | |
| # Clean up the formula | |
| formula = formula.strip() | |
| formula = re.sub(r'\s+', ' ', formula) | |
| self.formulas[cell_ref] = { | |
| 'number': formula_num, | |
| 'description': description.strip(), | |
| 'formula': formula, | |
| 'cell_ref': cell_ref | |
| } | |
| print(f"Loaded {len(self.formulas)} formulas from {self.formula_file_path}") | |
| except Exception as e: | |
| print(f"Error loading formulas: {str(e)}") | |
| traceback.print_exc() | |
| def extract_text_from_pdf(self, file_path: str) -> str: | |
| """Extract text from PDF file""" | |
| try: | |
| text = "" | |
| with open(file_path, 'rb') as file: | |
| pdf_reader = PyPDF2.PdfReader(file) | |
| for page in pdf_reader.pages: | |
| text += page.extract_text() + "\n" | |
| return text | |
| except Exception as e: | |
| print(f"Error extracting PDF: {str(e)}") | |
| return "" | |
| def extract_text_from_txt(self, file_path: str) -> str: | |
| """Extract text from TXT file""" | |
| try: | |
| with open(file_path, 'r', encoding='utf-8', errors='ignore') as file: | |
| return file.read() | |
| except Exception as e: | |
| print(f"Error reading TXT: {str(e)}") | |
| return "" | |
| def extract_data_from_files(self, files: List[str]) -> Dict[str, Any]: | |
| """Extract all relevant data from uploaded property files""" | |
| combined_text = "" | |
| for file_path in files: | |
| if file_path.lower().endswith('.pdf'): | |
| combined_text += self.extract_text_from_pdf(file_path) + "\n" | |
| else: | |
| combined_text += self.extract_text_from_txt(file_path) + "\n" | |
| # Extract data using comprehensive patterns | |
| extracted_data = {} | |
| # Define extraction patterns | |
| patterns = { | |
| # Basic property info | |
| 'UNITS': [r'(?:Total\s+)?Units?\s*:?\s*(\d+)', r'Units\s*(\d+)'], | |
| 'BUILDING_SF': [r'Building\s+(?:Size|SF)\s*:?\s*([\d,]+)', r'Building\s+(?:Size|SF)\s*(\d+)'], | |
| 'LOT_ACRES': [r'Lot\s+Size\s*:?\s*([\d.]+)\s*(?:acres?|Acres?)', r'Lot:\s*([\d.]+)\s*acres?'], | |
| 'LOT_SF': [r'Lot\s+(?:Size\s+)?SF\s*:?\s*([\d,]+)'], | |
| # Financial metrics | |
| 'PRICE': [r'(?:Asking\s+)?Price\s*:?\s*\$\s*([\d,]+)', r'Price\s+per\s+Unit\s*\$\s*([\d,]+)'], | |
| 'NOI': [r'Net\s+Operating\s+Income\s*(?:\(NOI\))?\s*:?\s*\$?\s*([\d,]+)', r'NOI\s*:?\s*\$?\s*([\d,]+)'], | |
| 'EGI': [r'Effective\s+Gross\s+Income\s*:?\s*\$?\s*([\d,]+)', r'EGI\s*:?\s*\$?\s*([\d,]+)'], | |
| 'GPR': [r'Gross\s+Potential\s+Rent\s*(?:\(Annual\))?\s*:?\s*\$?\s*([\d,]+)', r'GPR\s*:?\s*\$?\s*([\d,]+)'], | |
| 'OPEX': [r'Operating\s+Expenses\s*:?\s*\$?\s*([\d,]+)', r'Total\s+Operating\s+Expenses\s*=?\s*\$?\s*([\d,]+)'], | |
| 'VACANCY': [r'Vacancy\s*(?:\([\d.]+%\))?\s*:?\s*-?\$?\s*([\d,]+)'], | |
| # Operating expenses categories | |
| 'PROPERTY_TAXES': [r'Property\s+Taxes\s*:?\s*\$?\s*([\d,]+\.?\d*)'], | |
| 'INSURANCE': [r'Insurance\s*:?\s*\$?\s*([\d,]+\.?\d*)'], | |
| 'UTILITIES': [r'Utilities\s*:?\s*\$?\s*([\d,]+\.?\d*)'], | |
| 'REPAIRS_MAINTENANCE': [r'Repairs?\s*(?:&|and)?\s*Maintenance\s*:?\s*\$?\s*([\d,]+\.?\d*)'], | |
| 'PAYROLL': [r'Payroll\s*:?\s*\$?\s*([\d,]+\.?\d*)'], | |
| 'ADMINISTRATIVE': [r'Administrative\s*:?\s*\$?\s*([\d,]+\.?\d*)'], | |
| 'MARKETING': [r'Marketing\s*:?\s*\$?\s*([\d,]+\.?\d*)'], | |
| 'REPLACEMENT_RESERVES': [r'Replacement\s+Reserves\s*:?\s*\$?\s*([\d,]+\.?\d*)'], | |
| 'MANAGEMENT_FEE': [r'Management\s*(?:\([^)]+\))?\s*:?\s*\$?\s*([\d,]+\.?\d*)'], | |
| # Rates and percentages | |
| 'CAP_RATE': [r'Cap\s+Rate\s*:?\s*([\d.]+)%?', r'Cap\s+Rate\s+([\d.]+)'], | |
| 'INTEREST_RATE': [r'Interest\s+Rate\s*:?\s*([\d.]+)%?'], | |
| 'LTC': [r'Loan[- ]to[- ]Cost\s*(?:\(LTC\))?\s*:?\s*([\d.]+)%?'], | |
| 'EXIT_CAP_RATE': [r'Exit\s+Cap\s+Rate\s*:?\s*([\d.]+)%?'], | |
| # Demographics | |
| 'MEDIAN_INCOME': [r'Median\s+(?:HH\s+)?Income\s*:?\s*\$?\s*([\d,]+)', r'Median\s+(?:Household\s+)?Income:\s*\$?\s*([\d,]+)'], | |
| 'POPULATION': [r'Population\s*:?\s*([\d,]+)'], | |
| 'HOUSEHOLDS': [r'Households\s*:?\s*([\d,]+)'], | |
| 'RENTER_OCCUPIED_PCT': [r'Renter[- ]Occupied\s*:?\s*([\d.]+)%?'], | |
| # Construction & Development | |
| 'CONSTRUCTION_GMP': [r'(?:Total\s+)?Construction\s+GMP\s*:?\s*\$?\s*([\d,]+)'], | |
| 'SOFT_COSTS': [r'(?:Total\s+)?Soft\s+Costs?\s*:?\s*\$?\s*([\d,]+)'], | |
| 'CONTINGENCY': [r'Contingency\s*:?\s*\$?\s*([\d,]+)'], | |
| 'DEV_FEE': [r'Dev(?:elopment)?\s+Fee\s*:?\s*\$?\s*([\d,]+)'], | |
| # Land & Acquisition | |
| 'LAND_VALUE': [r'(?:Total\s+)?Land\s+Value\s*:?\s*\$?\s*([\d,]+)'], | |
| 'CLOSING_COSTS': [r'Closing\s+Costs\s*:?\s*\$?\s*([\d,]+)'], | |
| 'ACQ_FEE': [r'Acq(?:uisition)?\s+Fee\s*:?\s*\$?\s*([\d,]+)'], | |
| } | |
| # Extract values using patterns | |
| for key, pattern_list in patterns.items(): | |
| for pattern in pattern_list: | |
| matches = re.findall(pattern, combined_text, re.IGNORECASE) | |
| if matches: | |
| try: | |
| # Take the first match and clean it | |
| value_str = matches[0].replace(',', '').strip() | |
| value = float(value_str) | |
| extracted_data[key] = value | |
| break | |
| except (ValueError, IndexError): | |
| continue | |
| # Calculate derived values | |
| if 'PRICE' in extracted_data and 'UNITS' in extracted_data: | |
| extracted_data['PRICE_PER_UNIT'] = extracted_data['PRICE'] / extracted_data['UNITS'] | |
| if 'NOI' in extracted_data and 'PRICE' in extracted_data: | |
| extracted_data['CALCULATED_CAP_RATE'] = (extracted_data['NOI'] / extracted_data['PRICE']) * 100 | |
| if 'LTC' in extracted_data and extracted_data['LTC'] > 1: | |
| extracted_data['LTC'] = extracted_data['LTC'] / 100 # Convert percentage | |
| if 'INTEREST_RATE' in extracted_data and extracted_data['INTEREST_RATE'] > 1: | |
| extracted_data['INTEREST_RATE'] = extracted_data['INTEREST_RATE'] / 100 | |
| # Add common cell references based on extracted data | |
| if 'BUILDING_SF' in extracted_data: | |
| extracted_data['D2'] = extracted_data['BUILDING_SF'] | |
| extracted_data['D$2'] = extracted_data['BUILDING_SF'] | |
| extracted_data['$D$2'] = extracted_data['BUILDING_SF'] | |
| if 'UNITS' in extracted_data: | |
| extracted_data['F2'] = extracted_data['UNITS'] | |
| extracted_data['F$2'] = extracted_data['UNITS'] | |
| extracted_data['$F$2'] = extracted_data['UNITS'] | |
| # Assume RSF is 90% of GSF if not provided | |
| if 'BUILDING_SF' in extracted_data and 'E2' not in extracted_data: | |
| extracted_data['E2'] = extracted_data['BUILDING_SF'] * 0.9 | |
| extracted_data['E$2'] = extracted_data['E2'] | |
| extracted_data['$E$2'] = extracted_data['E2'] | |
| # Map common variables | |
| if 'LAND_VALUE' in extracted_data: | |
| extracted_data['C4'] = extracted_data['LAND_VALUE'] | |
| extracted_data['$C4'] = extracted_data['LAND_VALUE'] | |
| extracted_data['$C$4'] = extracted_data['LAND_VALUE'] | |
| if 'CLOSING_COSTS' in extracted_data: | |
| extracted_data['C5'] = extracted_data['CLOSING_COSTS'] | |
| extracted_data['$C5'] = extracted_data['CLOSING_COSTS'] | |
| if 'OPEX' in extracted_data: | |
| extracted_data['M15'] = extracted_data['OPEX'] | |
| extracted_data['$M$15'] = extracted_data['OPEX'] | |
| if 'EGI' in extracted_data: | |
| extracted_data['J38'] = extracted_data['EGI'] | |
| extracted_data['$J$38'] = extracted_data['EGI'] | |
| return extracted_data | |
| def extract_variables_from_formula(self, formula: str) -> List[str]: | |
| """Extract all variable references from a formula""" | |
| # Match Excel-style cell references (e.g., C4, $D$2, E2) | |
| cell_pattern = r'\$?[A-Z]+\$?\d+' | |
| variables = re.findall(cell_pattern, formula) | |
| # Also match named variables | |
| named_pattern = r'[A-Z_][A-Z0-9_]*' | |
| named_vars = re.findall(named_pattern, formula) | |
| # Filter out Excel functions | |
| excel_functions = {'SUM', 'PV', 'MIN', 'MAX', 'AVERAGE', 'IF', 'AND', 'OR'} | |
| named_vars = [v for v in named_vars if v not in excel_functions] | |
| return list(set(variables + named_vars)) | |
| def check_formula_computable(self, formula: str, data: Dict[str, Any]) -> Tuple[bool, List[str]]: | |
| """Check if a formula can be computed with available data""" | |
| variables = self.extract_variables_from_formula(formula) | |
| missing = [] | |
| for var in variables: | |
| # Check all variants of the variable | |
| variants = [var, var.replace('$', ''), var.upper()] | |
| if not any(v in data for v in variants): | |
| missing.append(var) | |
| return len(missing) == 0, missing | |
| def evaluate_formula(self, formula: str, data: Dict[str, Any]) -> Any: | |
| """Safely evaluate a formula with the provided data""" | |
| try: | |
| # Create a safe evaluation environment | |
| safe_dict = {} | |
| # Add all data to the environment | |
| for key, value in data.items(): | |
| safe_dict[key] = value | |
| safe_dict[key.replace('$', '')] = value | |
| safe_dict[key.upper()] = value | |
| # Replace Excel functions with Python equivalents | |
| formula_py = formula | |
| # Handle SUM function | |
| sum_pattern = r'SUM\(([^)]+)\)' | |
| while re.search(sum_pattern, formula_py): | |
| match = re.search(sum_pattern, formula_py) | |
| range_str = match.group(1) | |
| # For ranges like C4:C6, we'll need to handle them | |
| if ':' in range_str: | |
| # Extract the range | |
| parts = range_str.split(':') | |
| # For now, we'll just try to add the values if they exist | |
| formula_py = formula_py.replace(match.group(0), f"sum_range('{range_str}')") | |
| else: | |
| formula_py = formula_py.replace(match.group(0), f"sum([{range_str}])") | |
| # Handle PV function (present value) - simplified | |
| pv_pattern = r'PV\([^)]+\)' | |
| formula_py = re.sub(pv_pattern, '0', formula_py) # Simplified for now | |
| # Handle MIN function | |
| formula_py = re.sub(r'MIN\(([^)]+)\)', r'min([\1])', formula_py) | |
| # Replace cell references with their values | |
| for key in sorted(data.keys(), key=len, reverse=True): | |
| if key in formula_py: | |
| formula_py = formula_py.replace(key, str(data[key])) | |
| # Replace ^ with ** for exponentiation | |
| formula_py = formula_py.replace('^', '**') | |
| # Evaluate | |
| result = eval(formula_py, {"__builtins__": {}}, safe_dict) | |
| return result | |
| except Exception as e: | |
| raise Exception(f"Error evaluating formula: {str(e)}") | |
| def process_files(self, files) -> Tuple[str, str, str]: | |
| """Main processing function for Gradio interface""" | |
| try: | |
| if not files: | |
| return "β No files uploaded", "", "" | |
| # Extract file paths | |
| file_paths = [f.name for f in files] | |
| # Extract data from all files | |
| extracted_data = self.extract_data_from_files(file_paths) | |
| if not extracted_data: | |
| return "β No data could be extracted from the files", "", "" | |
| # Process formulas | |
| computable_formulas = {} | |
| non_computable_formulas = {} | |
| for cell_ref, formula_info in self.formulas.items(): | |
| formula = formula_info['formula'] | |
| is_computable, missing_vars = self.check_formula_computable(formula, extracted_data) | |
| if is_computable: | |
| try: | |
| result = self.evaluate_formula(formula, extracted_data) | |
| computable_formulas[cell_ref] = { | |
| 'description': formula_info['description'], | |
| 'formula': formula, | |
| 'result': result, | |
| 'formatted_result': f"{result:,.2f}" if isinstance(result, (int, float)) else str(result) | |
| } | |
| except Exception as e: | |
| non_computable_formulas[cell_ref] = { | |
| 'description': formula_info['description'], | |
| 'formula': formula, | |
| 'error': str(e), | |
| 'missing_variables': [] | |
| } | |
| else: | |
| non_computable_formulas[cell_ref] = { | |
| 'description': formula_info['description'], | |
| 'formula': formula, | |
| 'missing_variables': missing_vars | |
| } | |
| # Create summary | |
| summary = f""" | |
| ## π Analysis Summary | |
| **Total Formulas Loaded:** {len(self.formulas)} | |
| **β Computable Formulas:** {len(computable_formulas)} | |
| **β Non-Computable Formulas:** {len(non_computable_formulas)} | |
| **π Files Processed:** {len(file_paths)} | |
| **π’ Data Points Extracted:** {len(extracted_data)} | |
| """ | |
| # Create extracted data display | |
| data_display = "## π₯ Extracted Property Data\n\n" | |
| data_display += "| Variable | Value |\n|----------|-------|\n" | |
| for key, value in sorted(extracted_data.items()): | |
| if isinstance(value, float): | |
| data_display += f"| {key} | {value:,.2f} |\n" | |
| else: | |
| data_display += f"| {key} | {value} |\n" | |
| # Create results display | |
| results_display = "## β Computed Formulas\n\n" | |
| for cell_ref, info in sorted(computable_formulas.items()): | |
| results_display += f"### {cell_ref}: {info['description']}\n" | |
| results_display += f"**Formula:** `{info['formula']}`\n" | |
| results_display += f"**Result:** {info['formatted_result']}\n\n" | |
| if non_computable_formulas: | |
| results_display += "\n## β Non-Computable Formulas\n\n" | |
| for cell_ref, info in sorted(non_computable_formulas.items()): | |
| results_display += f"### {cell_ref}: {info['description']}\n" | |
| results_display += f"**Formula:** `{info['formula']}`\n" | |
| if info.get('missing_variables'): | |
| results_display += f"**Missing Variables:** {', '.join(info['missing_variables'])}\n" | |
| if info.get('error'): | |
| results_display += f"**Error:** {info['error']}\n" | |
| results_display += "\n" | |
| # Create JSON output | |
| json_output = { | |
| 'summary': { | |
| 'total_formulas': len(self.formulas), | |
| 'computable': len(computable_formulas), | |
| 'non_computable': len(non_computable_formulas), | |
| 'files_processed': len(file_paths) | |
| }, | |
| 'extracted_data': extracted_data, | |
| 'computable_formulas': computable_formulas, | |
| 'non_computable_formulas': non_computable_formulas | |
| } | |
| json_str = json.dumps(json_output, indent=2) | |
| return summary, data_display + "\n\n" + results_display, json_str | |
| except Exception as e: | |
| error_msg = f"β Error processing files:\n{str(e)}\n\n{traceback.format_exc()}" | |
| return error_msg, "", "" | |
| # Initialize the analyzer | |
| analyzer = PropertyFormulaAnalyzer("formulas.txt") | |
| # Create Gradio interface | |
| with gr.Blocks(title="Property Formula Analyzer", theme=gr.themes.Soft()) as app: | |
| gr.Markdown(""" | |
| # π’ Property Formula Analyzer | |
| Upload property documents (PDF or TXT) to automatically extract data and compute real estate formulas. | |
| The system will analyze your documents and calculate all computable formulas based on the extracted data. | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| file_input = gr.File( | |
| label="π Upload Property Documents", | |
| file_count="multiple", | |
| file_types=[".pdf", ".txt"], | |
| type="filepath" | |
| ) | |
| analyze_btn = gr.Button("π Analyze & Compute Formulas", variant="primary", size="lg") | |
| gr.Markdown(""" | |
| ### π Instructions: | |
| 1. Upload one or more property documents (PDF or TXT format) | |
| 2. Click "Analyze & Compute Formulas" | |
| 3. Review the extracted data and computed formulas | |
| 4. Download the JSON results for further analysis | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| summary_output = gr.Markdown(label="Summary") | |
| with gr.Row(): | |
| with gr.Column(): | |
| results_output = gr.Markdown(label="Results") | |
| with gr.Row(): | |
| with gr.Column(): | |
| json_output = gr.Code( | |
| label="π₯ Download Results (JSON)", | |
| language="json", | |
| lines=20 | |
| ) | |
| # Connect the button to the processing function | |
| analyze_btn.click( | |
| fn=analyzer.process_files, | |
| inputs=[file_input], | |
| outputs=[summary_output, results_output, json_output] | |
| ) | |
| gr.Markdown(""" | |
| --- | |
| ### π Notes: | |
| - The system automatically extracts property metrics like units, price, NOI, operating expenses, etc. | |
| - Formulas are computed only when all required variables are available in the extracted data | |
| - Non-computable formulas are listed with their missing variables | |
| - All results can be downloaded as JSON for further processing | |
| """) | |
| if __name__ == "__main__": | |
| app.launch() |