| |
|
|
| from flask import Flask, render_template, request, jsonify |
| from flask_cors import CORS |
| import base64 |
| import io |
| import re |
| import json |
| import uuid |
| import time |
| import asyncio |
| from geopy.geocoders import Nominatim |
| from datetime import datetime |
| from models.logging_config import logger |
| from models.model_loader import load_model |
| from models.image_analysis import analyze_image |
| from models.pdf_analysis import extract_pdf_text, analyze_pdf_content |
| from models.property_summary import generate_property_summary |
| from models.fraud_classification import classify_fraud |
| from models.trust_score import generate_trust_score |
| from models.suggestions import generate_suggestions |
| from models.text_quality import assess_text_quality |
| from models.address_verification import verify_address |
| from models.cross_validation import perform_cross_validation |
| from models.location_analysis import analyze_location |
| from models.price_analysis import analyze_price |
| from models.legal_analysis import analyze_legal_details |
| from models.property_specs import verify_property_specs |
| from models.market_value import analyze_market_value |
| from models.image_quality import assess_image_quality |
| from models.property_relation import check_if_property_related |
| import torch |
| import numpy as np |
| import concurrent.futures |
| from PIL import Image |
|
|
| app = Flask(__name__) |
| CORS(app) |
|
|
| |
| geocoder = Nominatim(user_agent="indian_property_verifier", timeout=10) |
|
|
| def make_json_serializable(obj): |
| try: |
| if isinstance(obj, (bool, int, float, str, type(None))): |
| return obj |
| elif isinstance(obj, (list, tuple)): |
| return [make_json_serializable(item) for item in obj] |
| elif isinstance(obj, dict): |
| return {str(key): make_json_serializable(value) for key, value in obj.items()} |
| elif torch.is_tensor(obj): |
| return obj.item() if obj.numel() == 1 else obj.tolist() |
| elif np.isscalar(obj): |
| return obj.item() if hasattr(obj, 'item') else float(obj) |
| elif isinstance(obj, np.ndarray): |
| return obj.tolist() |
| else: |
| return str(obj) |
| except Exception as e: |
| logger.error(f"Error serializing object: {str(e)}") |
| return str(obj) |
|
|
| @app.route('/') |
| def index(): |
| return render_template('index.html') |
|
|
| @app.route('/get-location', methods=['POST']) |
| def get_location(): |
| try: |
| data = request.json or {} |
| latitude = data.get('latitude') |
| longitude = data.get('longitude') |
|
|
| if not latitude or not longitude: |
| logger.warning("Missing latitude or longitude") |
| return jsonify({ |
| 'status': 'error', |
| 'message': 'Latitude and longitude are required' |
| }), 400 |
|
|
| |
| try: |
| lat, lng = float(latitude), float(longitude) |
| if not (6.5 <= lat <= 37.5 and 68.0 <= lng <= 97.5): |
| return jsonify({ |
| 'status': 'error', |
| 'message': 'Coordinates are outside India' |
| }), 400 |
| except ValueError: |
| return jsonify({ |
| 'status': 'error', |
| 'message': 'Invalid coordinates format' |
| }), 400 |
|
|
| |
| for attempt in range(3): |
| try: |
| location = geocoder.reverse((latitude, longitude), exactly_one=True) |
| if location: |
| address_components = location.raw.get('address', {}) |
|
|
| |
| city = address_components.get('city', '') |
| if not city: |
| city = address_components.get('town', '') |
| if not city: |
| city = address_components.get('village', '') |
| if not city: |
| city = address_components.get('suburb', '') |
|
|
| state = address_components.get('state', '') |
| if not state: |
| state = address_components.get('state_district', '') |
|
|
| |
| postal_code = address_components.get('postcode', '') |
| if postal_code and not re.match(r'^\d{6}$', postal_code): |
| postal_code = '' |
|
|
| |
| road = address_components.get('road', '') |
| if not road: |
| road = address_components.get('street', '') |
|
|
| |
| area = address_components.get('suburb', '') |
| if not area: |
| area = address_components.get('neighbourhood', '') |
|
|
| return jsonify({ |
| 'status': 'success', |
| 'address': location.address, |
| 'street': road, |
| 'area': area, |
| 'city': city, |
| 'state': state, |
| 'country': 'India', |
| 'postal_code': postal_code, |
| 'latitude': latitude, |
| 'longitude': longitude, |
| 'formatted_address': f"{road}, {area}, {city}, {state}, India - {postal_code}" |
| }) |
| logger.warning(f"Geocoding failed on attempt {attempt + 1}") |
| time.sleep(1) |
| except Exception as e: |
| logger.error(f"Geocoding error on attempt {attempt + 1}: {str(e)}") |
| time.sleep(1) |
|
|
| return jsonify({ |
| 'status': 'error', |
| 'message': 'Could not determine location after retries' |
| }), 500 |
|
|
| except Exception as e: |
| logger.error(f"Error in get_location: {str(e)}") |
| return jsonify({ |
| 'status': 'error', |
| 'message': str(e) |
| }), 500 |
|
|
| def calculate_final_verdict(results): |
| """ |
| Calculate a comprehensive final verdict based on all analysis results. |
| This function combines all verification scores, fraud indicators, and quality assessments |
| to determine if a property listing is legitimate, suspicious, or fraudulent. |
| """ |
| try: |
| |
| verdict = { |
| 'status': 'unknown', |
| 'confidence': 0.0, |
| 'score': 0.0, |
| 'reasons': [], |
| 'critical_issues': [], |
| 'warnings': [], |
| 'recommendations': [] |
| } |
|
|
| |
| trust_score = results.get('trust_score', {}).get('score', 0) |
| fraud_classification = results.get('fraud_classification', {}) |
| quality_assessment = results.get('quality_assessment', {}) |
| specs_verification = results.get('specs_verification', {}) |
| cross_validation = results.get('cross_validation', []) |
| location_analysis = results.get('location_analysis', {}) |
| price_analysis = results.get('price_analysis', {}) |
| legal_analysis = results.get('legal_analysis', {}) |
| document_analysis = results.get('document_analysis', {}) |
| image_analysis = results.get('image_analysis', {}) |
|
|
| |
| component_scores = { |
| 'trust': trust_score, |
| 'fraud': 100 - (fraud_classification.get('alert_score', 0) * 100), |
| 'quality': quality_assessment.get('score', 0), |
| 'specs': specs_verification.get('verification_score', 0), |
| 'location': location_analysis.get('completeness_score', 0), |
| 'price': price_analysis.get('confidence', 0) * 100 if price_analysis.get('has_price') else 0, |
| 'legal': legal_analysis.get('completeness_score', 0), |
| 'documents': min(100, (document_analysis.get('pdf_count', 0) / 3) * 100) if document_analysis.get('pdf_count') else 0, |
| 'images': min(100, (image_analysis.get('image_count', 0) / 5) * 100) if image_analysis.get('image_count') else 0 |
| } |
|
|
| |
| weights = { |
| 'trust': 0.20, |
| 'fraud': 0.25, |
| 'quality': 0.15, |
| 'specs': 0.10, |
| 'location': 0.10, |
| 'price': 0.05, |
| 'legal': 0.05, |
| 'documents': 0.05, |
| 'images': 0.05 |
| } |
|
|
| final_score = sum(score * weights.get(component, 0) for component, score in component_scores.items()) |
| verdict['score'] = final_score |
|
|
| |
| fraud_level = fraud_classification.get('alert_level', 'minimal') |
| high_risk_indicators = len(fraud_classification.get('high_risk', [])) |
| critical_issues = [] |
| warnings = [] |
|
|
| |
| if fraud_level in ['critical', 'high']: |
| critical_issues.append(f"High fraud risk detected: {fraud_level} alert level") |
|
|
| if trust_score < 40: |
| critical_issues.append(f"Very low trust score: {trust_score}%") |
|
|
| if quality_assessment.get('score', 0) < 30: |
| critical_issues.append(f"Very low content quality: {quality_assessment.get('score', 0)}%") |
|
|
| if specs_verification.get('verification_score', 0) < 40: |
| critical_issues.append(f"Property specifications verification failed: {specs_verification.get('verification_score', 0)}%") |
|
|
| |
| if fraud_level == 'medium': |
| warnings.append(f"Medium fraud risk detected: {fraud_level} alert level") |
|
|
| if trust_score < 60: |
| warnings.append(f"Low trust score: {trust_score}%") |
|
|
| if quality_assessment.get('score', 0) < 60: |
| warnings.append(f"Low content quality: {quality_assessment.get('score', 0)}%") |
|
|
| if specs_verification.get('verification_score', 0) < 70: |
| warnings.append(f"Property specifications have issues: {specs_verification.get('verification_score', 0)}%") |
|
|
| |
| for check in cross_validation: |
| if check.get('status') in ['inconsistent', 'invalid', 'suspicious', 'no_match']: |
| warnings.append(f"Cross-validation issue: {check.get('message', 'Unknown issue')}") |
|
|
| |
| missing_critical = [] |
| if not location_analysis.get('completeness_score', 0) > 70: |
| missing_critical.append("Location information is incomplete") |
|
|
| if not price_analysis.get('has_price', False): |
| missing_critical.append("Price information is missing") |
|
|
| if not legal_analysis.get('completeness_score', 0) > 70: |
| missing_critical.append("Legal information is incomplete") |
|
|
| if document_analysis.get('pdf_count', 0) == 0: |
| missing_critical.append("No supporting documents provided") |
|
|
| if image_analysis.get('image_count', 0) == 0: |
| missing_critical.append("No property images provided") |
|
|
| if missing_critical: |
| warnings.append(f"Missing critical information: {', '.join(missing_critical)}") |
|
|
| |
| if critical_issues or (fraud_level in ['critical', 'high'] and trust_score < 50) or high_risk_indicators > 0: |
| verdict['status'] = 'fraudulent' |
| verdict['confidence'] = min(100, max(70, 100 - (trust_score * 0.5))) |
| elif warnings or (fraud_level == 'medium' and trust_score < 70) or specs_verification.get('verification_score', 0) < 60: |
| verdict['status'] = 'suspicious' |
| verdict['confidence'] = min(100, max(50, trust_score * 0.8)) |
| else: |
| verdict['status'] = 'legitimate' |
| verdict['confidence'] = min(100, max(70, trust_score * 0.9)) |
|
|
| |
| verdict['critical_issues'] = critical_issues |
| verdict['warnings'] = warnings |
|
|
| |
| if critical_issues: |
| verdict['recommendations'].append("Do not proceed with this property listing") |
| verdict['recommendations'].append("Report this listing to the platform") |
| elif warnings: |
| verdict['recommendations'].append("Proceed with extreme caution") |
| verdict['recommendations'].append("Request additional verification documents") |
| verdict['recommendations'].append("Verify all information with independent sources") |
| else: |
| verdict['recommendations'].append("Proceed with standard due diligence") |
| verdict['recommendations'].append("Verify final details before transaction") |
|
|
| |
| for missing in missing_critical: |
| verdict['recommendations'].append(f"Request {missing.lower()}") |
|
|
| return verdict |
| except Exception as e: |
| logger.error(f"Error calculating final verdict: {str(e)}") |
| return { |
| 'status': 'error', |
| 'confidence': 0.0, |
| 'score': 0.0, |
| 'reasons': [f"Error calculating verdict: {str(e)}"], |
| 'critical_issues': [], |
| 'warnings': [], |
| 'recommendations': ["Unable to determine property status due to an error"] |
| } |
|
|
| @app.route('/verify', methods=['POST']) |
| def verify_property(): |
| try: |
| if not request.form and not request.files: |
| logger.warning("No form data or files provided") |
| return jsonify({ |
| 'error': 'No data provided', |
| 'status': 'error' |
| }), 400 |
|
|
| |
| data = { |
| 'property_name': request.form.get('property_name', '').strip(), |
| 'property_type': request.form.get('property_type', '').strip(), |
| 'status': request.form.get('status', '').strip(), |
| 'description': request.form.get('description', '').strip(), |
| 'address': request.form.get('address', '').strip(), |
| 'city': request.form.get('city', '').strip(), |
| 'state': request.form.get('state', '').strip(), |
| 'country': request.form.get('country', 'India').strip(), |
| 'zip': request.form.get('zip', '').strip(), |
| 'latitude': request.form.get('latitude', '').strip(), |
| 'longitude': request.form.get('longitude', '').strip(), |
| 'bedrooms': request.form.get('bedrooms', '').strip(), |
| 'bathrooms': request.form.get('bathrooms', '').strip(), |
| 'total_rooms': request.form.get('total_rooms', '').strip(), |
| 'year_built': request.form.get('year_built', '').strip(), |
| 'parking': request.form.get('parking', '').strip(), |
| 'sq_ft': request.form.get('sq_ft', '').strip(), |
| 'market_value': request.form.get('market_value', '').strip(), |
| 'amenities': request.form.get('amenities', '').strip(), |
| 'nearby_landmarks': request.form.get('nearby_landmarks', '').strip(), |
| 'legal_details': request.form.get('legal_details', '').strip() |
| } |
|
|
| |
| required_fields = ['property_name', 'property_type', 'address', 'city', 'state'] |
| missing_fields = [field for field in required_fields if not data[field]] |
| if missing_fields: |
| logger.warning(f"Missing required fields: {', '.join(missing_fields)}") |
| return jsonify({ |
| 'error': f"Missing required fields: {', '.join(missing_fields)}", |
| 'status': 'error' |
| }), 400 |
|
|
| |
| images = [] |
| image_analysis = [] |
| if 'images' in request.files: |
| |
| image_files = {} |
| for img_file in request.files.getlist('images'): |
| if img_file.filename and img_file.filename.lower().endswith(('.jpg', '.jpeg', '.png')): |
| image_files[img_file.filename] = img_file |
|
|
| |
| for img_file in image_files.values(): |
| try: |
| img = Image.open(img_file) |
| buffered = io.BytesIO() |
| img.save(buffered, format="JPEG") |
| img_str = base64.b64encode(buffered.getvalue()).decode('utf-8') |
| images.append(img_str) |
| image_analysis.append(analyze_image(img)) |
| except Exception as e: |
| logger.error(f"Error processing image {img_file.filename}: {str(e)}") |
| image_analysis.append({'error': str(e), 'is_property_related': False}) |
|
|
| |
| pdf_texts = [] |
| pdf_analysis = [] |
| if 'documents' in request.files: |
| |
| pdf_files = {} |
| for pdf_file in request.files.getlist('documents'): |
| if pdf_file.filename and pdf_file.filename.lower().endswith('.pdf'): |
| pdf_files[pdf_file.filename] = pdf_file |
|
|
| |
| for pdf_file in pdf_files.values(): |
| try: |
| pdf_text = extract_pdf_text(pdf_file) |
| pdf_texts.append({ |
| 'filename': pdf_file.filename, |
| 'text': pdf_text |
| }) |
| pdf_analysis.append(analyze_pdf_content(pdf_text, data)) |
| except Exception as e: |
| logger.error(f"Error processing PDF {pdf_file.filename}: {str(e)}") |
| pdf_analysis.append({'error': str(e)}) |
|
|
| |
| consolidated_text = f""" |
| Property Name: {data['property_name']} |
| Property Type: {data['property_type']} |
| Status: {data['status']} |
| Description: {data['description']} |
| Location: {data['address']}, {data['city']}, {data['state']}, {data['country']}, {data['zip']} |
| Coordinates: Lat {data['latitude']}, Long {data['longitude']} |
| Specifications: {data['bedrooms']} bedrooms, {data['bathrooms']} bathrooms, {data['total_rooms']} total rooms |
| Year Built: {data['year_built']} |
| Parking: {data['parking']} |
| Size: {data['sq_ft']} sq. ft. |
| Market Value: ₹{data['market_value']} |
| Amenities: {data['amenities']} |
| Nearby Landmarks: {data['nearby_landmarks']} |
| Legal Details: {data['legal_details']} |
| """ |
|
|
| |
| try: |
| description = data['description'] |
| if description and len(description) > 10: |
| text_language = detect(description) |
| if text_language != 'en': |
| translated_description = GoogleTranslator(source=text_language, target='en').translate(description) |
| data['description_translated'] = translated_description |
| else: |
| data['description_translated'] = description |
| else: |
| data['description_translated'] = description |
| except Exception as e: |
| logger.error(f"Error in language detection/translation: {str(e)}") |
| data['description_translated'] = data['description'] |
|
|
| |
| async def run_analyses(): |
| with concurrent.futures.ThreadPoolExecutor() as executor: |
| loop = asyncio.get_event_loop() |
| tasks = [ |
| loop.run_in_executor(executor, generate_property_summary, data), |
| loop.run_in_executor(executor, classify_fraud, consolidated_text, data), |
| loop.run_in_executor(executor, generate_trust_score, consolidated_text, image_analysis, pdf_analysis), |
| loop.run_in_executor(executor, generate_suggestions, consolidated_text, data), |
| loop.run_in_executor(executor, assess_text_quality, data['description_translated']), |
| loop.run_in_executor(executor, verify_address, data), |
| loop.run_in_executor(executor, perform_cross_validation, data), |
| loop.run_in_executor(executor, analyze_location, data), |
| loop.run_in_executor(executor, analyze_price, data), |
| loop.run_in_executor(executor, analyze_legal_details, data['legal_details']), |
| loop.run_in_executor(executor, verify_property_specs, data), |
| loop.run_in_executor(executor, analyze_market_value, data) |
| ] |
| results = await asyncio.gather(*tasks) |
| return results |
|
|
| |
| loop = asyncio.new_event_loop() |
| asyncio.set_event_loop(loop) |
| analysis_results = loop.run_until_complete(run_analyses()) |
| loop.close() |
|
|
| |
| summary, fraud_classification, (trust_score, trust_reasoning), suggestions, quality_assessment, \ |
| address_verification, cross_validation, location_analysis, price_analysis, legal_analysis, \ |
| specs_verification, market_analysis = analysis_results |
|
|
| |
| document_analysis = { |
| 'pdf_count': len(pdf_texts), |
| 'pdf_texts': pdf_texts, |
| 'pdf_analysis': pdf_analysis |
| } |
| image_results = { |
| 'image_count': len(images), |
| 'image_analysis': image_analysis |
| } |
|
|
| report_id = str(uuid.uuid4()) |
|
|
| |
| results = { |
| 'report_id': report_id, |
| 'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), |
| 'summary': summary, |
| 'fraud_classification': fraud_classification, |
| 'trust_score': { |
| 'score': trust_score, |
| 'reasoning': trust_reasoning |
| }, |
| 'suggestions': suggestions, |
| 'quality_assessment': quality_assessment, |
| 'address_verification': address_verification, |
| 'cross_validation': cross_validation, |
| 'location_analysis': location_analysis, |
| 'price_analysis': price_analysis, |
| 'legal_analysis': legal_analysis, |
| 'document_analysis': document_analysis, |
| 'image_analysis': image_results, |
| 'specs_verification': specs_verification, |
| 'market_analysis': market_analysis, |
| 'images': images |
| } |
|
|
| |
| final_verdict = calculate_final_verdict(results) |
| results['final_verdict'] = final_verdict |
|
|
| return jsonify(make_json_serializable(results)) |
|
|
| except Exception as e: |
| logger.error(f"Error in verify_property: {str(e)}") |
| return jsonify({ |
| 'error': 'Server error occurred. Please try again later.', |
| 'status': 'error', |
| 'details': str(e) |
| }), 500 |
|
|
| if __name__ == '__main__': |
| |
| app.run(host='0.0.0.0', port=8000, debug=True, use_reloader=False) |
|
|