# NOTE: removed extraction artifact — the hosting-status text
# "Spaces: Sleeping" was prepended to this Python module and is
# not part of the source code.
| import os | |
| import json | |
| import io | |
| import datetime | |
| import re | |
| import base64 | |
| import subprocess | |
| import time | |
| import secrets | |
| import traceback | |
| import logging | |
| import csv | |
| from functools import wraps | |
| from threading import Thread | |
| from flask import Flask, request, jsonify, render_template, Response, redirect, url_for, flash, session | |
| from flask_sqlalchemy import SQLAlchemy | |
| from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user | |
| from werkzeug.security import generate_password_hash, check_password_hash | |
| from werkzeug.middleware.proxy_fix import ProxyFix | |
| import requests | |
| import PyPDF2 | |
| import fitz | |
| from PIL import Image, ImageOps | |
| import pytesseract | |
| import docx | |
| from docx import Document | |
| from geopy.geocoders import Nominatim | |
| import folium | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| try: | |
| from office365.runtime.auth.client_credential import ClientCredential | |
| from office365.sharepoint.client_context import ClientContext | |
| HAS_SP = True | |
| except ImportError: | |
| HAS_SP = False | |
| app = Flask(__name__, template_folder='templates') | |
| app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1) | |
| app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'v178_master_key') | |
| app.config['MAX_CONTENT_LENGTH'] = 128 * 1024 * 1024 | |
| app.config['SESSION_COOKIE_SAMESITE'] = 'None' | |
| app.config['SESSION_COOKIE_SECURE'] = True | |
| db_path = '/data/users.db' if os.path.exists('/data') else os.path.join(os.getcwd(), 'users.db') | |
| app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}' | |
| app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False | |
| db = SQLAlchemy(app) | |
| login_manager = LoginManager() | |
| login_manager.init_app(app) | |
| login_manager.login_view = 'login' | |
| USER_CONTEXTS = {} | |
| API_KEY = os.environ.get("GOOGLE_API_KEY") | |
| SP_SITE = os.environ.get('SP_SITE_URL') | |
| SP_ID = os.environ.get('SP_CLIENT_ID') | |
| SP_SECRET = os.environ.get('SP_CLIENT_SECRET') | |
| try: subprocess.check_output(["tesseract", "--version"]); HAS_OCR = True | |
| except: HAS_OCR = False | |
| class User(UserMixin, db.Model): | |
| id = db.Column(db.Integer, primary_key=True) | |
| username = db.Column(db.String(150), unique=True, nullable=False) | |
| password = db.Column(db.String(150), nullable=False) | |
| api_token = db.Column(db.String(100), unique=True, nullable=False) | |
| class AnalysisHistory(db.Model): | |
| id = db.Column(db.Integer, primary_key=True) | |
| user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False) | |
| fmi = db.Column(db.String(50)) | |
| municipio = db.Column(db.String(100)) | |
| date_created = db.Column(db.String(50)) | |
| json_data = db.Column(db.Text) | |
| def load_user(user_id): return User.query.get(int(user_id)) | |
| class SharePointManager: | |
| def ctx(self): | |
| if not (HAS_SP and SP_SITE and SP_ID): return None | |
| try: return ClientContext(SP_SITE).with_credentials(ClientCredential(SP_ID, SP_SECRET)) | |
| except: return None | |
| def get_user_from_sp(self, username): | |
| ctx = self.ctx() | |
| if not ctx: return None | |
| try: | |
| items = ctx.web.lists.get_by_title("InmoGuard_Usuarios").items.filter(f"Title eq '{username}'").get().execute_query() | |
| if len(items) > 0: return items[0].properties | |
| except: pass | |
| return None | |
| def create_user_in_sp(self, username, password, token): | |
| def _task(): | |
| ctx = self.ctx() | |
| if not ctx: return | |
| try: ctx.web.lists.get_by_title("InmoGuard_Usuarios").add_item({"Title": username, "Password": password, "ApiToken": token}).execute_query() | |
| except: pass | |
| Thread(target=_task).start() | |
| def save_history_to_sp(self, username, fmi, muni, data_json): | |
| def _task(): | |
| ctx = self.ctx() | |
| if not ctx: return | |
| try: | |
| safe_json = json.dumps(data_json) | |
| if len(safe_json) > 60000: safe_json = safe_json[:60000] | |
| ctx.web.lists.get_by_title("InmoGuard_Historial").add_item({ | |
| "Title": fmi, "Usuario": username, "Municipio": muni, | |
| "Fecha": datetime.datetime.now().strftime("%Y-%m-%d"), "JsonData": safe_json | |
| }).execute_query() | |
| except: pass | |
| Thread(target=_task).start() | |
| def fetch_history_from_sp(self, username): | |
| ctx = self.ctx() | |
| if not ctx: return None | |
| try: | |
| items = ctx.web.lists.get_by_title("InmoGuard_Historial").items.filter(f"Usuario eq '{username}'").order_by("ID", False).top(15).get().execute_query() | |
| data = [] | |
| for i in items: | |
| try: | |
| data.append({ | |
| "fmi": i.properties.get('Title'), "municipio": i.properties.get('Municipio'), | |
| "date": i.properties.get('Fecha'), "data": json.loads(i.properties.get('JsonData', '{}')) | |
| }) | |
| except: pass | |
| return data | |
| except: return None | |
| sp_manager = SharePointManager() | |
| # --- CONEXIÓN V178: CAZADOR DE MODELOS --- | |
| CURRENT_MODEL = None | |
| def get_best_model(): | |
| global CURRENT_MODEL | |
| if CURRENT_MODEL: return CURRENT_MODEL | |
| if not API_KEY: return None | |
| try: | |
| url = f"https://generativelanguage.googleapis.com/v1beta/models?key={API_KEY}" | |
| resp = requests.get(url, timeout=10) | |
| if resp.status_code == 200: | |
| data = resp.json() | |
| available = [m['name'].replace('models/', '') for m in data.get('models', []) if 'generateContent' in m.get('supportedGenerationMethods', [])] | |
| logger.info(f"Modelos API: {available}") | |
| for pref in ['gemini-1.5-flash', 'gemini-1.5-flash-latest', 'gemini-1.5-pro']: | |
| for m in available: | |
| if pref in m: | |
| CURRENT_MODEL = m | |
| return m | |
| if available: return available[0] | |
| except: pass | |
| return "gemini-1.5-flash" | |
| def call_gemini_api(payload): | |
| if not API_KEY: return {"error": True, "msg": "Falta API Key"} | |
| model = get_best_model() | |
| url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={API_KEY}" | |
| if "safetySettings" not in payload: | |
| payload["safetySettings"] = [{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}] | |
| for i in range(3): | |
| try: | |
| res = requests.post(url, json=payload, headers={'Content-Type': 'application/json'}, timeout=180) | |
| if res.status_code == 200: return res.json() | |
| elif res.status_code == 429: time.sleep(5) | |
| elif res.status_code == 404: | |
| global CURRENT_MODEL; CURRENT_MODEL = None; model = get_best_model(); | |
| url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={API_KEY}" | |
| continue | |
| else: return {"error": True, "msg": f"Google Error {res.status_code}"} | |
| except: time.sleep(2) | |
| return {"error": True, "msg": "Servidor saturado."} | |
| def repair_json(raw): | |
| try: | |
| start = raw.find('{'); end = raw.rfind('}') + 1 | |
| if start == -1 or end == 0: return {} | |
| clean = raw[start:end].replace('```json', '').replace('```', '') | |
| return json.loads(clean) | |
| except: return {} | |
| ORIP_MAP = { "50N":"Bogotá Norte", "50S":"Bogotá Sur", "50C":"Bogotá Centro", "040":"Barranquilla", "370":"Cali", "001":"Medellín" } | |
| def identify_orip(fmi): | |
| if not fmi or '-' not in fmi: return "N/A" | |
| return ORIP_MAP.get(fmi.split('-')[0].strip(), "Oficina Registro") | |
| def extract_patterns(text): | |
| data = { "nupre": "No detectado", "fmis": [], "chips": [], "cedula": "No detectada" } | |
| try: | |
| fmis = re.findall(r'\b\d{3}-\d+\b', text) | |
| data["fmis"] = list(set(fmis)) | |
| nupre = re.search(r'\b[A-Z]{3}-?\d{4}-?[A-Z]{4}\b', text) | |
| if nupre: data["nupre"] = nupre.group(0) | |
| chip = re.search(r'\b[A-Z]{3}\s*\d{5,}\b', text) | |
| if chip: data["chips"] = [chip.group(0).replace(" ", "")] | |
| ced = re.search(r'Cédula Catastral[:\s]+([\d\w-]+)', text, re.IGNORECASE) | |
| if ced: data["cedula"] = ced.group(1) | |
| except: pass | |
| return data | |
| def analyze_metadata(file_stream): | |
| try: | |
| pdf = PyPDF2.PdfReader(file_stream) | |
| meta = str(pdf.metadata).lower() | |
| if 'ilovepdf' in meta: return {"riesgo": "ALTO", "alerta": "⚠️ Modificado: iLovePDF"} | |
| return {"riesgo": "BAJO", "alerta": "✅ Original"} | |
| except: return {"riesgo": "BAJO", "alerta": "✅ Válido"} | |
| def extract_multimodal(files): | |
| text_parts = [] | |
| image_parts = [] | |
| forensic = [] | |
| for f in files: | |
| try: | |
| fname = f.filename.lower() | |
| content = f.read(); f.seek(0) | |
| if fname.endswith('.pdf'): | |
| forensic.append({"archivo": fname, "datos": analyze_metadata(io.BytesIO(content))}) | |
| try: | |
| pdf = PyPDF2.PdfReader(io.BytesIO(content)) | |
| t = "" | |
| for p in pdf.pages: t += (p.extract_text() or "") + "\n" | |
| text_parts.append(f"--- TEXTO PDF {fname} ---\n{t}") | |
| except: pass | |
| elif fname.endswith(('.jpg', '.jpeg', '.png')): | |
| b64_img = base64.b64encode(content).decode('utf-8') | |
| mime = "image/png" if fname.endswith('.png') else "image/jpeg" | |
| image_parts.append({"inline_data": {"mime_type": mime, "data": b64_img}}) | |
| if HAS_OCR: | |
| t = pytesseract.image_to_string(Image.open(io.BytesIO(content))) | |
| text_parts.append(f"--- OCR IMAGEN {fname} ---\n{t}") | |
| except: pass | |
| full_text = "\n".join(text_parts) | |
| return full_text, image_parts, forensic | |
| def get_geo_data(text_full, address_ia, muni_ia): | |
| try: | |
| geo = Nominatim(user_agent=f"actidata_v178_{int(time.time())}", timeout=5) | |
| target = muni_ia if muni_ia else "Colombia" | |
| addr = address_ia.lower().replace('apto','').strip() if address_ia else "" | |
| addr = re.sub(r'int\s*\d+', '', addr) | |
| addr = re.sub(r'piso\s*\d+', '', addr) | |
| loc = None | |
| if addr and len(addr)>5: | |
| try: loc = geo.geocode(f"{addr}, {target}, Colombia") | |
| except: pass | |
| if not loc: | |
| try: loc = geo.geocode(f"{target}, Colombia") | |
| except: pass | |
| if loc: | |
| # V178: SOLO DATOS (El mapa lo hace el JS) | |
| g_link = f"http://googleusercontent.com/maps.google.com/maps?q={loc.latitude},{loc.longitude}&layer=c&cbll={loc.latitude},{loc.longitude}" | |
| return {"lat": loc.latitude, "lon": loc.longitude}, target, g_link | |
| except: pass | |
| return None, "No ubicado", "" | |
| def calculate_inmoscore(data): | |
| score = 100 | |
| details = [] | |
| has_sae = data.get('sae_flags', {}).get('tiene_embargo_sae', False) | |
| has_env = data.get('sae_flags', {}).get('tiene_problemas_ambientales', False) | |
| label = "VIABLE" | |
| if has_sae: | |
| if not has_env: | |
| score -= 5; details.append("ACTIVO SAE (Viable Exclusivo)") | |
| label = "VIABLE COMERCIALIZACIÓN EXCLUSIVA SAE" | |
| # PRESERVAR TEXTO IA | |
| if "SAE" not in data.get('dic', {}).get('txt', ''): | |
| data['dic']['txt'] = (data.get('dic', {}).get('txt', '') + "\n\nNOTA: Activo SAE sin restricciones ambientales. VIABLE.") | |
| else: | |
| score -= 80; details.append("ACTIVO SAE (Bloqueo Ambiental)") | |
| label = "NO VIABLE - RESTRICCIÓN AMBIENTAL" | |
| if "falsa" in str(data.get('vur')).lower(): score -= 30; details.append("Falsa Tradición") | |
| data.setdefault('dic', {})['res'] = label | |
| tag = "EXCELENTE" if score >= 90 else ("BUENO" if score >= 70 else "REGULAR") | |
| return {"puntaje": max(0, score), "detalles": details, "etiqueta": tag} | |
| # ========================================== | |
| # CEREBRO IA V178 (FULL DETAIL) | |
| # ========================================== | |
| def analyze_logic(text, imgs, forensic): | |
| patterns = extract_patterns(text) | |
| final_data = { | |
| "sae_flags": {"tiene_embargo_sae": False, "tiene_problemas_ambientales": False}, | |
| "meta": { | |
| 'cedula_catastral': patterns['cedula'], 'nupre': patterns['nupre'], | |
| 'chip': patterns['chips'][0] if patterns['chips'] else "", | |
| 'fmis_detected': patterns['fmis'], | |
| 'orip_location': identify_orip(patterns['fmis'][0] if patterns['fmis'] else ""), | |
| 'municipio': 'Colombia', 'dir_legal': '' | |
| }, | |
| "propietarios_actuales": [], "propietarios_anteriores": [], "historial_propiedad": [], | |
| "vur": {"anotaciones_detalle": [], "falsa_tradicion": "No detectada"}, | |
| "marketing": {"commercial_pitch": "Pendiente.", "descripcion_venta": "-", "puntos_fuertes": []}, | |
| "inspeccion_visual": {}, "negociacion": {}, "rentabilidad": {}, | |
| "cruce_documentos": {"inconsistencias": "Pendiente"}, | |
| "analisis_sae_frisco": {}, "analisis_ambiental": {}, "semaforo_riesgos": {}, | |
| "dic": {"res": "EN PROCESO", "txt": "..."} | |
| } | |
| # PROMPT V178: MÁXIMO DETALLE | |
| prompt_text = ( | |
| "Actúa como Abogado Senior, Arquitecto y Corredor de Lujo. Analiza texto e imágenes." | |
| f"\nDatos: NUPRE={patterns['nupre']}, FMI={patterns['fmis']}." | |
| "\n\n--- TAREAS OBLIGATORIAS ---" | |
| "1. VISUAL: Describe pisos (mármol/madera?), luz, cocina, baños. ¡Detalle de Perito!" | |
| "2. HISTORIAL: Extrae TODA la cadena de dueños anteriores (Nombre, Fecha, Acto)." | |
| "3. JURIDICO: Redacta un concepto legal extenso (3 párrafos). Cita gravámenes." | |
| "4. AUDITORIA: Compara áreas físicas vs jurídicas en 'cruce_documentos'." | |
| "5. MARKETING: Redacta una descripción de venta de 500 palabras, estilo revista." | |
| "6. SAE: Si hay SAE y no problemas ambientales -> 'VIABLE COMERCIALIZACIÓN EXCLUSIVA SAE'." | |
| "\n\n--- JSON OBLIGATORIO ---" | |
| """ | |
| { | |
| "meta": { "cedula_catastral": "...", "chip": "...", "dir_legal": "...", "municipio": "..." }, | |
| "propietarios_actuales": [{"nombre": "...", "porcentaje": "..."}], | |
| "propietarios_anteriores": [{"nombre": "...", "periodo": "..."}], | |
| "historial_propiedad": [{"fecha": "YYYY", "acto": "...", "detalles": "..."}], | |
| "marketing": { "commercial_pitch": "TEXTO LARGO...", "puntos_fuertes": ["..."] }, | |
| "inspeccion_visual": { "estado_fisico": "...", "observaciones": "..." }, | |
| "negociacion": { "estrategia_sugerida": "...", "descuento_sugerido": "..." }, | |
| "rentabilidad": { "valor_venta": "...", "valor_renta": "..." }, | |
| "cruce_documentos": { "inconsistencias": "Diferencia áreas...", "analisis_linderos": "..." }, | |
| "analisis_sae_frisco": { "viabilidad_venta": "VIABLE/NO VIABLE", "detalles": "..." }, | |
| "analisis_ambiental": { "ronda_hidrica": "SI/NO", "reserva_forestal": "SI/NO", "comunidades_etnicas": "SI/NO", "concepto": "..." }, | |
| "semaforo_riesgos": { "juridico": "ALTO/MEDIO/BAJO", "fisico": "..." }, | |
| "vur": { "anotaciones_detalle": [{"nro":"...", "desc":"...", "estado":"..."}], "falsa_tradicion": "..." }, | |
| "sae_flags": { "tiene_embargo_sae": true/false, "tiene_problemas_ambientales": true/false }, | |
| "dic": { "res": "VIABLE / NO VIABLE", "txt": "CONCEPTUALIZACIÓN DETALLADA..." } | |
| } | |
| """ | |
| ) | |
| content_parts = [{"text": prompt_text}, {"text": f"CTX TEXTO:\n{text[:300000]}"}] | |
| if imgs: | |
| for img in imgs[:5]: content_parts.append(img) | |
| res = call_gemini_api({"contents": [{"parts": content_parts}]}) | |
| ai_data = {} | |
| if "candidates" in res and len(res["candidates"]) > 0: | |
| content_parts_resp = res['candidates'][0].get('content', {}).get('parts', []) | |
| if content_parts_resp: ai_data = repair_json(content_parts_resp[0]['text']) | |
| final_data.update(ai_data) | |
| final_data['meta'].update({'cedula_catastral': patterns['cedula'], 'nupre': patterns['nupre'], 'chip': patterns['chips'][0] if patterns['chips'] else "", 'fmis_detected': patterns['fmis'], 'orip_location': identify_orip(patterns['fmis'][0] if patterns['fmis'] else "")}) | |
| final_data['forense_digital'] = forensic | |
| # GEO DATA PARA LEAFLET | |
| coords, loc, g_link = get_geo_data(text, final_data['meta'].get('dir_legal'), final_data['meta'].get('municipio')) | |
| final_data['coords'] = coords; final_data['g_maps_link'] = g_link | |
| final_data['inmoscore'] = calculate_inmoscore(final_data) | |
| return final_data | |
| # ========================================== | |
| # RUTAS | |
| # ========================================== | |
| def login(): | |
| if request.method == 'POST': | |
| u = User.query.filter_by(username=request.form.get('username')).first() | |
| if u and check_password_hash(u.password, request.form.get('password')): | |
| login_user(u); return redirect(url_for('index')) | |
| sp_data = sp_manager.get_user_from_sp(request.form.get('username')) | |
| if sp_data and check_password_hash(sp_data['Password'], request.form.get('password')): | |
| new_u = User(username=request.form.get('username'), password=sp_data['Password'], api_token=sp_data['ApiToken']) | |
| db.session.add(new_u); db.session.commit() | |
| login_user(new_u); return redirect(url_for('index')) | |
| flash('Datos incorrectos') | |
| return render_template('login.html') | |
| def register(): | |
| if request.method == 'POST': | |
| uname = request.form.get('username') | |
| if User.query.filter_by(username=uname).first(): flash('Existe') | |
| else: | |
| new = User(username=request.form.get('username'), password=generate_password_hash(request.form.get('password'), method='scrypt'), api_token=secrets.token_hex(16)) | |
| db.session.add(new); db.session.commit() | |
| sp_manager.create_user_in_sp(new.username, new.password, new.api_token) | |
| return redirect(url_for('login')) | |
| return render_template('register.html') | |
| def logout(): logout_user(); return redirect(url_for('login')) | |
| def index(): return render_template('index.html', user=current_user) | |
| def analyze_route(): | |
| try: | |
| files = request.files.getlist('files') | |
| txt, imgs, forensic = extract_multimodal(files) | |
| USER_CONTEXTS[current_user.id] = {"text": txt} | |
| data = analyze_logic(txt, imgs, forensic) | |
| hist = AnalysisHistory( | |
| user_id=current_user.id, | |
| fmi=data.get('meta', {}).get('fmis_detected', ['S/N'])[0], | |
| municipio=data.get('meta', {}).get('municipio', '-'), | |
| date_created=datetime.datetime.now().strftime("%Y-%m-%d"), | |
| json_data=json.dumps(data) | |
| ) | |
| db.session.add(hist); db.session.commit() | |
| sp_manager.save_history_to_sp(current_user.username, hist.fmi, hist.municipio, data) | |
| return jsonify(data) | |
| except Exception as e: | |
| logger.error(f"CRITICO: {e}") | |
| return jsonify({"error":True, "msg":str(e)}) | |
| def get_history_api(): | |
| sp_data = sp_manager.fetch_history_from_sp(current_user.username) | |
| if sp_data: return jsonify(sp_data) | |
| hs = AnalysisHistory.query.filter_by(user_id=current_user.id).order_by(AnalysisHistory.id.desc()).limit(15).all() | |
| return jsonify([{"fmi":h.fmi, "municipio":h.municipio, "date":h.date_created, "data":json.loads(h.json_data)} for h in hs]) | |
| def test_sp(): return jsonify({"status": "OK", "sp_enabled": HAS_SP}) | |
| def ask_route(): | |
| ctx_text = "" | |
| if current_user.id in USER_CONTEXTS: ctx_text = USER_CONTEXTS[current_user.id]['text'] | |
| else: | |
| last = AnalysisHistory.query.filter_by(user_id=current_user.id).order_by(AnalysisHistory.id.desc()).first() | |
| if last: ctx_text = str(json.loads(last.json_data)) | |
| if not ctx_text: return jsonify({"answer": "No hay contexto."}) | |
| q = request.json.get('question') | |
| res = call_gemini_api({"contents": [{"parts": [{"text": f"Pregunta: {q}\nContexto:\n{ctx_text[:30000]}"}]}]}) | |
| ans = "Error" | |
| if "candidates" in res and len(res["candidates"]) > 0: ans = res['candidates'][0]['content']['parts'][0]['text'] | |
| return jsonify({"answer": ans}) | |
| # Rutas Descarga | |
| def dbro(): | |
| try: return Response(create_brochure_doc(request.json).read(), mimetype='application/msword', headers={'Content-Disposition': 'attachment;filename=Ficha.docx'}) | |
| except: return "Error", 500 | |
| def dw(): | |
| try: return Response(create_word_doc(request.json).read(), mimetype='application/msword', headers={'Content-Disposition': 'attachment;filename=Concepto.docx'}) | |
| except: return "Error", 500 | |
| def dprom(): | |
| try: return Response(create_promesa_doc(request.json).read(), mimetype='application/msword', headers={'Content-Disposition': 'attachment;filename=Promesa.docx'}) | |
| except: return "Error", 500 | |
| def dj(): return Response(json.dumps(request.json), mimetype='application/json', headers={'Content-Disposition': 'attachment;filename=data.json'}) | |
| def dcsv(): | |
| data = request.json | |
| si = io.StringIO(); cw = csv.writer(si) | |
| cw.writerow(['FMI', 'Municipio', 'Valor', 'Score']) | |
| cw.writerow([", ".join(data.get('meta', {}).get('fmis_detected', [])), data.get('meta', {}).get('municipio', ''), data.get('rentabilidad', {}).get('valor_venta', ''), data.get('inmoscore', {}).get('puntaje', '')]) | |
| return Response(si.getvalue(), mimetype='text/csv', headers={'Content-Disposition': 'attachment;filename=data.csv'}) | |
| with app.app_context(): | |
| db.create_all() | |
| if not User.query.filter_by(username='admin').first(): | |
| db.session.add(User(username='admin', password=generate_password_hash('admin', method='scrypt'), api_token='master-key')) | |
| db.session.commit() | |
| if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=7860) |