# ActiDataVURv3 / app.py
# (Hugging Face Space file header — commit 1d8e93c, "Update app.py" by jcalbornoz)
import os
import json
import io
import datetime
import re
import base64
import subprocess
import time
import secrets
import traceback
import logging
import csv
from functools import wraps
from threading import Thread
from flask import Flask, request, jsonify, render_template, Response, redirect, url_for, flash, session
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.middleware.proxy_fix import ProxyFix
import requests
import PyPDF2
import fitz
from PIL import Image, ImageOps
import pytesseract
import docx
from docx import Document
from geopy.geocoders import Nominatim
import folium
# Application-wide logging: timestamped INFO messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)

# Optional SharePoint integration — the office365 package may not be installed.
try:
    from office365.runtime.auth.client_credential import ClientCredential
    from office365.sharepoint.client_context import ClientContext
    HAS_SP = True
except ImportError:
    HAS_SP = False

app = Flask(__name__, template_folder='templates')
# Honor X-Forwarded-Proto/Host when running behind a reverse proxy.
app.wsgi_app = ProxyFix(app.wsgi_app, x_proto=1, x_host=1)
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'v178_master_key')  # NOTE(review): weak fallback — set SECRET_KEY in production
app.config['MAX_CONTENT_LENGTH'] = 128 * 1024 * 1024  # 128 MiB upload cap
# Cross-site session cookies (SameSite=None requires Secure) — needed when embedded in an iframe.
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True
# Persist the SQLite DB under /data when that volume exists, else the working dir.
db_path = '/data/users.db' if os.path.exists('/data') else os.path.join(os.getcwd(), 'users.db')
app.config['SQLALCHEMY_DATABASE_URI'] = f'sqlite:///{db_path}'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db = SQLAlchemy(app)

login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'

# Per-user in-memory analysis context (user_id -> {"text": ...}); lost on restart.
USER_CONTEXTS = {}
# External service credentials, all environment-provided.
API_KEY = os.environ.get("GOOGLE_API_KEY")
SP_SITE = os.environ.get('SP_SITE_URL')
SP_ID = os.environ.get('SP_CLIENT_ID')
SP_SECRET = os.environ.get('SP_CLIENT_SECRET')
# Probe for the tesseract binary to decide whether OCR is available.
try: subprocess.check_output(["tesseract", "--version"]); HAS_OCR = True
except: HAS_OCR = False
class User(UserMixin, db.Model):
    """Application account; mirrored to SharePoint when configured."""
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(150), unique=True, nullable=False)
    # Holds a werkzeug scrypt hash ("scrypt:N:r:p$salt$hex"), which is longer
    # than the previous 150-char limit. SQLite ignores VARCHAR length, but the
    # declared size matters if the app ever moves to a stricter backend.
    password = db.Column(db.String(256), nullable=False)
    api_token = db.Column(db.String(100), unique=True, nullable=False)
class AnalysisHistory(db.Model):
    """One saved analysis result per row, serialized as JSON text."""
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    fmi = db.Column(db.String(50))           # folio de matrícula inmobiliaria (record key)
    municipio = db.Column(db.String(100))
    date_created = db.Column(db.String(50))  # "YYYY-MM-DD" string, not a Date column
    json_data = db.Column(db.Text)           # full analysis payload (json.dumps)
@login_manager.user_loader
def load_user(user_id):
    """Resolve a session's stored user id back to a User row for Flask-Login."""
    return User.query.get(int(user_id))
class SharePointManager:
    """Best-effort mirror of users and analysis history to SharePoint lists.

    Every method degrades gracefully: when the office365 package or the SP_*
    credentials are missing it returns None / does nothing, and all writes run
    on daemon background threads so they never block a request or shutdown.
    """

    def ctx(self):
        """Return an authenticated ClientContext, or None when unavailable."""
        # SP_SECRET is required too — without it ClientCredential would be
        # built with a None secret and every subsequent call would fail.
        if not (HAS_SP and SP_SITE and SP_ID and SP_SECRET):
            return None
        try:
            return ClientContext(SP_SITE).with_credentials(ClientCredential(SP_ID, SP_SECRET))
        except Exception:
            logger.debug("SharePoint context unavailable", exc_info=True)
            return None

    def get_user_from_sp(self, username):
        """Look up a user item in 'InmoGuard_Usuarios' by Title; None if absent."""
        ctx = self.ctx()
        if not ctx:
            return None
        try:
            items = ctx.web.lists.get_by_title("InmoGuard_Usuarios").items.filter(
                f"Title eq '{username}'").get().execute_query()
            if len(items) > 0:
                return items[0].properties
        except Exception:
            logger.debug("SharePoint user lookup failed", exc_info=True)
        return None

    def create_user_in_sp(self, username, password, token):
        """Fire-and-forget creation of a user item (password is already hashed)."""
        def _task():
            ctx = self.ctx()
            if not ctx:
                return
            try:
                ctx.web.lists.get_by_title("InmoGuard_Usuarios").add_item(
                    {"Title": username, "Password": password, "ApiToken": token}).execute_query()
            except Exception:
                logger.debug("SharePoint user creation failed", exc_info=True)
        # daemon=True: a best-effort write must never keep the process alive.
        Thread(target=_task, daemon=True).start()

    def save_history_to_sp(self, username, fmi, muni, data_json):
        """Fire-and-forget save of one analysis into 'InmoGuard_Historial'."""
        def _task():
            ctx = self.ctx()
            if not ctx:
                return
            try:
                safe_json = json.dumps(data_json)
                # SharePoint multi-line text fields cap around 64k; truncate
                # defensively (the row may then hold invalid JSON — readers skip it).
                if len(safe_json) > 60000:
                    safe_json = safe_json[:60000]
                ctx.web.lists.get_by_title("InmoGuard_Historial").add_item({
                    "Title": fmi, "Usuario": username, "Municipio": muni,
                    "Fecha": datetime.datetime.now().strftime("%Y-%m-%d"), "JsonData": safe_json
                }).execute_query()
            except Exception:
                logger.debug("SharePoint history save failed", exc_info=True)
        Thread(target=_task, daemon=True).start()

    def fetch_history_from_sp(self, username):
        """Return up to 15 newest history rows for a user, or None on failure."""
        ctx = self.ctx()
        if not ctx:
            return None
        try:
            items = ctx.web.lists.get_by_title("InmoGuard_Historial").items.filter(
                f"Usuario eq '{username}'").order_by("ID", False).top(15).get().execute_query()
            data = []
            for i in items:
                try:
                    data.append({
                        "fmi": i.properties.get('Title'), "municipio": i.properties.get('Municipio'),
                        "date": i.properties.get('Fecha'),
                        "data": json.loads(i.properties.get('JsonData', '{}'))
                    })
                except Exception:
                    # Skip rows whose JsonData was truncated into invalid JSON.
                    continue
            return data
        except Exception:
            logger.debug("SharePoint history fetch failed", exc_info=True)
            return None
# Singleton used by the routes below.
sp_manager = SharePointManager()
# --- CONEXIÓN V178: CAZADOR DE MODELOS ---
# Cached Gemini model name resolved by get_best_model(); reset to None on 404.
CURRENT_MODEL = None
def get_best_model():
    """Pick a usable Gemini model name, caching the result in CURRENT_MODEL.

    Queries the ListModels endpoint and prefers the 1.5 flash/pro family,
    then falls back to the first model supporting generateContent, and
    finally to a hard-coded default when the listing itself fails.
    Returns None when no API key is configured.
    """
    global CURRENT_MODEL
    if CURRENT_MODEL:
        return CURRENT_MODEL
    if not API_KEY:
        return None
    try:
        url = f"https://generativelanguage.googleapis.com/v1beta/models?key={API_KEY}"
        resp = requests.get(url, timeout=10)
        if resp.status_code == 200:
            data = resp.json()
            available = [m['name'].replace('models/', '')
                         for m in data.get('models', [])
                         if 'generateContent' in m.get('supportedGenerationMethods', [])]
            logger.info(f"Modelos API: {available}")
            for pref in ['gemini-1.5-flash', 'gemini-1.5-flash-latest', 'gemini-1.5-pro']:
                for m in available:
                    if pref in m:
                        CURRENT_MODEL = m
                        return m
            if available:
                # Cache the fallback too — previously only preferred matches
                # were cached, so this path re-listed models on every call.
                CURRENT_MODEL = available[0]
                return CURRENT_MODEL
    except Exception:
        # Network/JSON failure: log it instead of silently swallowing.
        logger.warning("Model discovery failed", exc_info=True)
    return "gemini-1.5-flash"
def call_gemini_api(payload):
    """POST a generateContent payload to Gemini with retry/backoff.

    Retries up to 3 times on rate limiting (429) and transport errors, and
    re-resolves the model on 404 (stale cached model name). Returns the
    decoded JSON response, or {"error": True, "msg": ...} on failure.
    """
    if not API_KEY:
        return {"error": True, "msg": "Falta API Key"}
    model = get_best_model()
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={API_KEY}"
    if "safetySettings" not in payload:
        # Legal/property documents can trip the "dangerous content" filter.
        payload["safetySettings"] = [{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}]
    for _attempt in range(3):
        try:
            res = requests.post(url, json=payload, headers={'Content-Type': 'application/json'}, timeout=180)
            if res.status_code == 200:
                return res.json()
            elif res.status_code == 429:
                time.sleep(5)  # rate limited: back off and retry
            elif res.status_code == 404:
                # Cached model no longer exists: re-discover and retry.
                global CURRENT_MODEL
                CURRENT_MODEL = None
                model = get_best_model()
                url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={API_KEY}"
                continue
            else:
                return {"error": True, "msg": f"Google Error {res.status_code}"}
        except Exception:
            # Previously a bare except hid the cause entirely; log it.
            logger.warning("Gemini request failed", exc_info=True)
            time.sleep(2)
    return {"error": True, "msg": "Servidor saturado."}
def repair_json(raw):
    """Extract and parse the first JSON object embedded in LLM output.

    Strips markdown code fences, slices from the first '{' to the last '}',
    and returns {} when no parseable object is found (or input is not a str).
    """
    try:
        # Strip fences BEFORE slicing so they can never sit inside the span.
        clean = raw.replace('```json', '').replace('```', '')
        start = clean.find('{')
        end = clean.rfind('}') + 1
        if start == -1 or end == 0:
            return {}
        return json.loads(clean[start:end])
    except (AttributeError, TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; Attribute/TypeError cover
        # non-string input (e.g. None from a blocked model response).
        return {}
# Registry-office (ORIP) names keyed by the FMI prefix before the dash.
ORIP_MAP = { "50N":"Bogotá Norte", "50S":"Bogotá Sur", "50C":"Bogotá Centro", "040":"Barranquilla", "370":"Cali", "001":"Medellín" }

def identify_orip(fmi):
    """Map an FMI like '50N-123456' to its registry-office name."""
    if not fmi or '-' not in fmi:
        return "N/A"
    prefix, _sep, _rest = fmi.partition('-')
    return ORIP_MAP.get(prefix.strip(), "Oficina Registro")
def extract_patterns(text):
    """Regex-extract registry identifiers (FMI, NUPRE, CHIP, cédula catastral).

    Returns a dict of best-effort matches; each field keeps its
    "No detectado/a" default when nothing matches or input is not a string.
    """
    data = { "nupre": "No detectado", "fmis": [], "chips": [], "cedula": "No detectada" }
    try:
        fmis = re.findall(r'\b\d{3}-\d+\b', text)
        # De-duplicate while preserving first-seen order: callers use fmis[0]
        # as the record key, and list(set(...)) made that nondeterministic.
        data["fmis"] = list(dict.fromkeys(fmis))
        nupre = re.search(r'\b[A-Z]{3}-?\d{4}-?[A-Z]{4}\b', text)
        if nupre: data["nupre"] = nupre.group(0)
        chip = re.search(r'\b[A-Z]{3}\s*\d{5,}\b', text)
        if chip: data["chips"] = [chip.group(0).replace(" ", "")]
        ced = re.search(r'Cédula Catastral[:\s]+([\d\w-]+)', text, re.IGNORECASE)
        if ced: data["cedula"] = ced.group(1)
    except TypeError:
        # Non-string input (e.g. None): fall through with the defaults.
        pass
    return data
def analyze_metadata(file_stream):
    """Flag PDFs whose metadata suggests post-editing (currently: iLovePDF).

    Best effort: an unreadable/corrupt PDF is reported as low risk instead of
    failing the whole analysis pipeline.
    """
    try:
        pdf = PyPDF2.PdfReader(file_stream)
        meta = str(pdf.metadata).lower()  # str(None) -> "none" when metadata is absent
        if 'ilovepdf' in meta:
            return {"riesgo": "ALTO", "alerta": "⚠️ Modificado: iLovePDF"}
        return {"riesgo": "BAJO", "alerta": "✅ Original"}
    except Exception:
        # Narrowed from a bare except; still deliberately best-effort.
        return {"riesgo": "BAJO", "alerta": "✅ Válido"}
def extract_multimodal(files):
    """Pull text, base64 image parts, and forensic metadata from uploads.

    Returns (full_text, image_parts, forensic): image_parts are Gemini
    inline_data dicts, forensic is a per-PDF metadata risk report.
    Processing is best-effort — a broken file is skipped, never fatal.
    """
    text_parts = []
    image_parts = []
    forensic = []
    for f in files:
        try:
            fname = f.filename.lower()
            content = f.read(); f.seek(0)  # rewind so the stream stays reusable
            if fname.endswith('.pdf'):
                forensic.append({"archivo": fname, "datos": analyze_metadata(io.BytesIO(content))})
                try:
                    pdf = PyPDF2.PdfReader(io.BytesIO(content))
                    t = ""
                    for p in pdf.pages: t += (p.extract_text() or "") + "\n"
                    text_parts.append(f"--- TEXTO PDF {fname} ---\n{t}")
                except: pass  # scanned/encrypted PDFs may have no extractable text
            elif fname.endswith(('.jpg', '.jpeg', '.png')):
                b64_img = base64.b64encode(content).decode('utf-8')
                mime = "image/png" if fname.endswith('.png') else "image/jpeg"
                image_parts.append({"inline_data": {"mime_type": mime, "data": b64_img}})
                if HAS_OCR:
                    # OCR text complements the raw image that is sent to the model.
                    t = pytesseract.image_to_string(Image.open(io.BytesIO(content)))
                    text_parts.append(f"--- OCR IMAGEN {fname} ---\n{t}")
        except: pass  # best-effort: skip unreadable uploads entirely
    full_text = "\n".join(text_parts)
    return full_text, image_parts, forensic
def get_geo_data(text_full, address_ia, muni_ia):
    """Geocode the AI-extracted address, falling back to the municipality.

    Returns (coords_dict_or_None, resolved_place_name, google_maps_link).
    NOTE(review): text_full is currently unused — kept for interface stability.
    """
    try:
        # Fresh user_agent per call keeps Nominatim from rejecting the client.
        geo = Nominatim(user_agent=f"actidata_v178_{int(time.time())}", timeout=5)
        target = muni_ia if muni_ia else "Colombia"
        # Strip apartment/floor qualifiers that confuse the geocoder.
        addr = address_ia.lower().replace('apto','').strip() if address_ia else ""
        addr = re.sub(r'int\s*\d+', '', addr)
        addr = re.sub(r'piso\s*\d+', '', addr)
        loc = None
        if addr and len(addr)>5:
            try: loc = geo.geocode(f"{addr}, {target}, Colombia")
            except: pass
        if not loc:
            # Fall back to the municipality centroid.
            try: loc = geo.geocode(f"{target}, Colombia")
            except: pass
        if loc:
            # V178: data only — the map itself is rendered by the JS frontend.
            g_link = f"http://googleusercontent.com/maps.google.com/maps?q={loc.latitude},{loc.longitude}&layer=c&cbll={loc.latitude},{loc.longitude}"
            return {"lat": loc.latitude, "lon": loc.longitude}, target, g_link
    except: pass
    return None, "No ubicado", ""
def calculate_inmoscore(data):
    """Score (0-100) an analysis result and stamp a viability label.

    Mutates `data`: ensures data['dic'] exists, writes the label into
    data['dic']['res'], and may append a SAE note to data['dic']['txt'].
    Returns {"puntaje", "detalles", "etiqueta"}.
    """
    score = 100
    details = []
    has_sae = data.get('sae_flags', {}).get('tiene_embargo_sae', False)
    has_env = data.get('sae_flags', {}).get('tiene_problemas_ambientales', False)
    label = "VIABLE"
    # Create the 'dic' bucket up front: the SAE branch below writes into it,
    # which used to raise KeyError when the model omitted 'dic'.
    dic = data.setdefault('dic', {})
    if has_sae:
        if not has_env:
            score -= 5; details.append("ACTIVO SAE (Viable Exclusivo)")
            label = "VIABLE COMERCIALIZACIÓN EXCLUSIVA SAE"
            # Preserve the AI's legal text; append the SAE note only once.
            if "SAE" not in dic.get('txt', ''):
                dic['txt'] = dic.get('txt', '') + "\n\nNOTA: Activo SAE sin restricciones ambientales. VIABLE."
        else:
            score -= 80; details.append("ACTIVO SAE (Bloqueo Ambiental)")
            label = "NO VIABLE - RESTRICCIÓN AMBIENTAL"
    # Inspect the falsa_tradicion VALUE only. Stringifying the whole vur dict
    # matched the literal key name 'falsa_tradicion', so EVERY analysis that
    # carried the default vur skeleton was penalized 30 points.
    vur = data.get('vur')
    falsa_val = vur.get('falsa_tradicion', '') if isinstance(vur, dict) else vur
    if "falsa" in str(falsa_val).lower():
        score -= 30; details.append("Falsa Tradición")
    dic['res'] = label
    tag = "EXCELENTE" if score >= 90 else ("BUENO" if score >= 70 else "REGULAR")
    return {"puntaje": max(0, score), "detalles": details, "etiqueta": tag}
# ==========================================
# CEREBRO IA V178 (FULL DETAIL)
# ==========================================
def analyze_logic(text, imgs, forensic):
    """Run the full property analysis: regex patterns + Gemini + geo + score.

    Builds a default result skeleton, overlays the model's JSON answer, then
    re-asserts the regex-derived identifiers (the model must not override
    them), attaches digital forensics and geo data, and computes the InmoScore.
    """
    patterns = extract_patterns(text)
    # Default skeleton: every key the frontend expects, pre-filled so missing
    # model fields never break rendering.
    final_data = {
        "sae_flags": {"tiene_embargo_sae": False, "tiene_problemas_ambientales": False},
        "meta": {
            'cedula_catastral': patterns['cedula'], 'nupre': patterns['nupre'],
            'chip': patterns['chips'][0] if patterns['chips'] else "",
            'fmis_detected': patterns['fmis'],
            'orip_location': identify_orip(patterns['fmis'][0] if patterns['fmis'] else ""),
            'municipio': 'Colombia', 'dir_legal': ''
        },
        "propietarios_actuales": [], "propietarios_anteriores": [], "historial_propiedad": [],
        "vur": {"anotaciones_detalle": [], "falsa_tradicion": "No detectada"},
        "marketing": {"commercial_pitch": "Pendiente.", "descripcion_venta": "-", "puntos_fuertes": []},
        "inspeccion_visual": {}, "negociacion": {}, "rentabilidad": {},
        "cruce_documentos": {"inconsistencias": "Pendiente"},
        "analisis_sae_frisco": {}, "analisis_ambiental": {}, "semaforo_riesgos": {},
        "dic": {"res": "EN PROCESO", "txt": "..."}
    }
    # Prompt V178: maximum-detail instructions plus the mandatory JSON schema.
    prompt_text = (
        "Actúa como Abogado Senior, Arquitecto y Corredor de Lujo. Analiza texto e imágenes."
        f"\nDatos: NUPRE={patterns['nupre']}, FMI={patterns['fmis']}."
        "\n\n--- TAREAS OBLIGATORIAS ---"
        "1. VISUAL: Describe pisos (mármol/madera?), luz, cocina, baños. ¡Detalle de Perito!"
        "2. HISTORIAL: Extrae TODA la cadena de dueños anteriores (Nombre, Fecha, Acto)."
        "3. JURIDICO: Redacta un concepto legal extenso (3 párrafos). Cita gravámenes."
        "4. AUDITORIA: Compara áreas físicas vs jurídicas en 'cruce_documentos'."
        "5. MARKETING: Redacta una descripción de venta de 500 palabras, estilo revista."
        "6. SAE: Si hay SAE y no problemas ambientales -> 'VIABLE COMERCIALIZACIÓN EXCLUSIVA SAE'."
        "\n\n--- JSON OBLIGATORIO ---"
        """
{
"meta": { "cedula_catastral": "...", "chip": "...", "dir_legal": "...", "municipio": "..." },
"propietarios_actuales": [{"nombre": "...", "porcentaje": "..."}],
"propietarios_anteriores": [{"nombre": "...", "periodo": "..."}],
"historial_propiedad": [{"fecha": "YYYY", "acto": "...", "detalles": "..."}],
"marketing": { "commercial_pitch": "TEXTO LARGO...", "puntos_fuertes": ["..."] },
"inspeccion_visual": { "estado_fisico": "...", "observaciones": "..." },
"negociacion": { "estrategia_sugerida": "...", "descuento_sugerido": "..." },
"rentabilidad": { "valor_venta": "...", "valor_renta": "..." },
"cruce_documentos": { "inconsistencias": "Diferencia áreas...", "analisis_linderos": "..." },
"analisis_sae_frisco": { "viabilidad_venta": "VIABLE/NO VIABLE", "detalles": "..." },
"analisis_ambiental": { "ronda_hidrica": "SI/NO", "reserva_forestal": "SI/NO", "comunidades_etnicas": "SI/NO", "concepto": "..." },
"semaforo_riesgos": { "juridico": "ALTO/MEDIO/BAJO", "fisico": "..." },
"vur": { "anotaciones_detalle": [{"nro":"...", "desc":"...", "estado":"..."}], "falsa_tradicion": "..." },
"sae_flags": { "tiene_embargo_sae": true/false, "tiene_problemas_ambientales": true/false },
"dic": { "res": "VIABLE / NO VIABLE", "txt": "CONCEPTUALIZACIÓN DETALLADA..." }
}
"""
    )
    # Context text is truncated to keep the request within model limits.
    content_parts = [{"text": prompt_text}, {"text": f"CTX TEXTO:\n{text[:300000]}"}]
    if imgs:
        for img in imgs[:5]: content_parts.append(img)  # cap images sent to the model
    res = call_gemini_api({"contents": [{"parts": content_parts}]})
    ai_data = {}
    if "candidates" in res and len(res["candidates"]) > 0:
        content_parts_resp = res['candidates'][0].get('content', {}).get('parts', [])
        if content_parts_resp: ai_data = repair_json(content_parts_resp[0]['text'])
    # NOTE: shallow update — the model's 'meta' replaces the skeleton's dict,
    # which is why the regex-derived fields are re-applied just below.
    final_data.update(ai_data)
    final_data['meta'].update({'cedula_catastral': patterns['cedula'], 'nupre': patterns['nupre'], 'chip': patterns['chips'][0] if patterns['chips'] else "", 'fmis_detected': patterns['fmis'], 'orip_location': identify_orip(patterns['fmis'][0] if patterns['fmis'] else "")})
    final_data['forense_digital'] = forensic
    # Geo data for the Leaflet map rendered by the frontend.
    coords, loc, g_link = get_geo_data(text, final_data['meta'].get('dir_legal'), final_data['meta'].get('municipio'))
    final_data['coords'] = coords; final_data['g_maps_link'] = g_link
    final_data['inmoscore'] = calculate_inmoscore(final_data)
    return final_data
# ==========================================
# RUTAS
# ==========================================
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Log in against the local DB, falling back to the SharePoint user list.

    An account found only in SharePoint is copied into the local DB (its
    password arrives already hashed) so subsequent logins are local.
    """
    if request.method == 'POST':
        username = request.form.get('username')
        password = request.form.get('password')
        # Guard: check_password_hash raises on a None password, turning a
        # missing form field into a 500.
        if username and password:
            u = User.query.filter_by(username=username).first()
            if u and check_password_hash(u.password, password):
                login_user(u)
                return redirect(url_for('index'))
            sp_data = sp_manager.get_user_from_sp(username)
            # .get() instead of [] — the SP item may lack these columns.
            sp_hash = (sp_data or {}).get('Password')
            if sp_hash and check_password_hash(sp_hash, password):
                new_u = User(username=username, password=sp_hash,
                             api_token=(sp_data.get('ApiToken') or secrets.token_hex(16)))
                db.session.add(new_u)
                db.session.commit()
                login_user(new_u)
                return redirect(url_for('index'))
        flash('Datos incorrectos')
    return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Create a local account (scrypt-hashed password) and mirror it to SP."""
    if request.method == 'POST':
        uname = request.form.get('username')
        pwd = request.form.get('password')
        if not uname or not pwd:
            # Guard: generate_password_hash(None) would raise a 500.
            flash('Datos incompletos')
        elif User.query.filter_by(username=uname).first():
            flash('Existe')
        else:
            new = User(username=uname,
                       password=generate_password_hash(pwd, method='scrypt'),
                       api_token=secrets.token_hex(16))
            db.session.add(new)
            db.session.commit()
            # The *hashed* password is mirrored; plaintext never leaves the request.
            sp_manager.create_user_in_sp(new.username, new.password, new.api_token)
            return redirect(url_for('login'))
    return render_template('register.html')
@app.route('/logout')
@login_required
def logout():
    """End the current session and send the user back to the login page."""
    logout_user()
    return redirect(url_for('login'))
@app.route('/')
@login_required
def index():
    """Render the main single-page UI for the authenticated user."""
    return render_template('index.html', user=current_user)
@app.route('/analyze', methods=['POST'])
@login_required
def analyze_route():
    """Analyze the uploaded files and persist the result (local DB + SharePoint)."""
    try:
        files = request.files.getlist('files')
        txt, imgs, forensic = extract_multimodal(files)
        # Keep the raw text around for follow-up /ask questions.
        USER_CONTEXTS[current_user.id] = {"text": txt}
        data = analyze_logic(txt, imgs, forensic)
        # fmis_detected may exist but be an EMPTY list, in which case [0] used
        # to raise IndexError and a successful analysis came back as an error.
        fmis = data.get('meta', {}).get('fmis_detected') or ['S/N']
        hist = AnalysisHistory(
            user_id=current_user.id,
            fmi=fmis[0],
            municipio=data.get('meta', {}).get('municipio', '-'),
            date_created=datetime.datetime.now().strftime("%Y-%m-%d"),
            json_data=json.dumps(data)
        )
        db.session.add(hist)
        db.session.commit()
        sp_manager.save_history_to_sp(current_user.username, hist.fmi, hist.municipio, data)
        return jsonify(data)
    except Exception as e:
        # logger.exception keeps the traceback, not just the message.
        logger.exception("CRITICO: %s", e)
        return jsonify({"error": True, "msg": str(e)})
@app.route('/api/history', methods=['GET'])
@login_required
def get_history_api():
    """Return the user's recent analyses, preferring the SharePoint copy."""
    sp_data = sp_manager.fetch_history_from_sp(current_user.username)
    if sp_data:
        return jsonify(sp_data)
    # Local fallback: newest 15 rows for this user.
    rows = (AnalysisHistory.query
            .filter_by(user_id=current_user.id)
            .order_by(AnalysisHistory.id.desc())
            .limit(15)
            .all())
    payload = []
    for row in rows:
        payload.append({
            "fmi": row.fmi,
            "municipio": row.municipio,
            "date": row.date_created,
            "data": json.loads(row.json_data),
        })
    return jsonify(payload)
@app.route('/test-sharepoint', methods=['GET'])
def test_sp():
    """Unauthenticated health probe for the SharePoint integration."""
    return jsonify({"status": "OK", "sp_enabled": HAS_SP})
@app.route('/ask', methods=['POST'])
@login_required
def ask_route():
    """Answer a follow-up question using the user's latest analysis as context."""
    ctx_text = ""
    if current_user.id in USER_CONTEXTS:
        ctx_text = USER_CONTEXTS[current_user.id]['text']
    else:
        # After a restart the in-memory context is gone; use the last saved row.
        last = AnalysisHistory.query.filter_by(user_id=current_user.id).order_by(AnalysisHistory.id.desc()).first()
        if last:
            ctx_text = str(json.loads(last.json_data))
    if not ctx_text:
        return jsonify({"answer": "No hay contexto."})
    # request.json raises/returns None for non-JSON bodies; don't 500 on that.
    q = (request.get_json(silent=True) or {}).get('question')
    res = call_gemini_api({"contents": [{"parts": [{"text": f"Pregunta: {q}\nContexto:\n{ctx_text[:30000]}"}]}]})
    ans = "Error"
    candidates = res.get("candidates") if isinstance(res, dict) else None
    if candidates:
        # Guard every level: blocked responses can omit content/parts/text.
        parts = candidates[0].get('content', {}).get('parts', [])
        if parts:
            ans = parts[0].get('text', ans)
    return jsonify({"answer": ans})
# Rutas Descarga
@app.route('/download-brochure', methods=['POST'])
@login_required
def dbro():
    """Build and stream the sales brochure as a .docx attachment.

    NOTE(review): create_brochure_doc is not defined anywhere in this file —
    confirm it is provided elsewhere; otherwise this route always returns 500.
    """
    try: return Response(create_brochure_doc(request.json).read(), mimetype='application/msword', headers={'Content-Disposition': 'attachment;filename=Ficha.docx'})
    except: return "Error", 500
@app.route('/download-word', methods=['POST'])
@login_required
def dw():
    """Build and stream the legal concept document as a .docx attachment.

    NOTE(review): create_word_doc is not defined anywhere in this file —
    confirm it is provided elsewhere; otherwise this route always returns 500.
    """
    try: return Response(create_word_doc(request.json).read(), mimetype='application/msword', headers={'Content-Disposition': 'attachment;filename=Concepto.docx'})
    except: return "Error", 500
@app.route('/download-promesa', methods=['POST'])
@login_required
def dprom():
    """Build and stream the purchase-promise document as a .docx attachment.

    NOTE(review): create_promesa_doc is not defined anywhere in this file —
    confirm it is provided elsewhere; otherwise this route always returns 500.
    """
    try: return Response(create_promesa_doc(request.json).read(), mimetype='application/msword', headers={'Content-Disposition': 'attachment;filename=Promesa.docx'})
    except: return "Error", 500
@app.route('/download-json', methods=['POST'])
@login_required
def dj():
    """Echo the posted analysis back as a downloadable JSON file."""
    body = json.dumps(request.json)
    headers = {'Content-Disposition': 'attachment;filename=data.json'}
    return Response(body, mimetype='application/json', headers=headers)
@app.route('/download-csv', methods=['POST'])
@login_required
def dcsv():
    """Flatten the posted analysis into a one-row CSV download."""
    data = request.json
    meta = data.get('meta', {})
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(['FMI', 'Municipio', 'Valor', 'Score'])
    writer.writerow([
        ", ".join(meta.get('fmis_detected', [])),
        meta.get('municipio', ''),
        data.get('rentabilidad', {}).get('valor_venta', ''),
        data.get('inmoscore', {}).get('puntaje', ''),
    ])
    headers = {'Content-Disposition': 'attachment;filename=data.csv'}
    return Response(buf.getvalue(), mimetype='text/csv', headers=headers)
# Bootstrap: create tables and seed a default admin account on first run.
with app.app_context():
    db.create_all()
    if not User.query.filter_by(username='admin').first():
        # NOTE(review): default admin/admin credentials and a static api_token —
        # change these in any production deployment.
        db.session.add(User(username='admin', password=generate_password_hash('admin', method='scrypt'), api_token='master-key'))
        db.session.commit()
if __name__ == '__main__':
    # debug=True exposes the Werkzeug interactive debugger (remote code
    # execution) — enable it only when explicitly requested via the env.
    app.run(debug=os.environ.get('FLASK_DEBUG') == '1', host='0.0.0.0', port=7860)