Spaces:
Sleeping
Sleeping
| import os | |
| import csv | |
| import platform | |
| import uuid | |
| import time | |
| import threading | |
| from urllib.request import urlopen | |
| from urllib.error import URLError | |
| from datetime import datetime | |
| from postgrest import SyncPostgrestClient | |
| from dotenv import load_dotenv | |
| # Carrega .env se existir (útil localmente) | |
| load_dotenv() | |
| # --- Configuração de Versão --- | |
| VERSAO = "v1.0.HF-20260206" | |
| # --- Conexão Supabase / Postgrest --- | |
| SUPABASE_URL = os.environ.get("SUPABASE_URL") | |
| SUPABASE_KEY = os.environ.get("SUPABASE_KEY") | |
| supabase = None | |
| if SUPABASE_URL and SUPABASE_KEY: | |
| try: | |
| url = f"{SUPABASE_URL.strip('/')}/rest/v1" | |
| headers = { | |
| "apikey": SUPABASE_KEY, | |
| "Authorization": f"Bearer {SUPABASE_KEY}", | |
| "Content-Type": "application/json", | |
| "Prefer": "return=minimal,resolution=merge-duplicates" | |
| } | |
| supabase = SyncPostgrestClient(url, headers=headers) | |
| print("Estado: Gerenciador de Banco de Dados pronto.") | |
| except Exception as e: | |
| print(f"Aviso: Conexão com banco de dados falhou: {e}") | |
| # --- Estrutura de Dados --- | |
| HEADERS = [ | |
| "T0_1_Device", "T0_2_Data", "T0_3_Hora", "T0_4_Pesquisador", "T0_5_Posto", "T0_6_Sentido", "T0_7_Clima", "T0_8_ID", | |
| "T1_1_UF_Orig", "T1_2_Cid_Orig", "T1_3_Loc_Orig", "T1_4_UF_Dest", "T1_5_Cid_Dest", "T1_6_Loc_Dest", "T1_7_Classe", "T1_8_Categ", "T1_9_ModoID", | |
| "T2_1_Ocupantes", "T2_2_Freq", "T2_3_DuraH", "T2_4_DuraM", "T2_5_Custo", | |
| "T3_0_Bloco", "T3_1_Cenario1", "T3_2_Cenario2", "T3_3_Cenario3", "T3_4_Cenario4", "T3_5_Cenario5", "T3_6_Cenario6", "T3_7_Cenario7", | |
| "T4_1_Acesso", "T4_2_Saida", "T4_3_Transf", "T4_4_Estac", "T4_5_AtivExtra", "T4_6_Escolaridade", | |
| "T5_1_Sexo", "T5_2_Idade", "T5_3_Renda", "T5_4_Obs", "T0_9_Versao", "T0_10_RealizaViagem", "T0_11_IP", | |
| "T3_1_TAcesso", "T3_1_TViagem", "T3_1_Custo", | |
| "T3_2_TAcesso", "T3_2_TViagem", "T3_2_Custo", | |
| "T3_3_TAcesso", "T3_3_TViagem", "T3_3_Custo", | |
| "T3_4_TAcesso", "T3_4_TViagem", "T3_4_Custo", | |
| "T3_5_TAcesso", "T3_5_TViagem", "T3_5_Custo", | |
| "T3_6_TAcesso", "T3_6_TViagem", "T3_6_Custo", | |
| "T3_7_TAcesso", "T3_7_TViagem", "T3_7_Custo", | |
| "V_Realizada", "V_Valida", "T1_Hora_Orig", "T5_Hora_Fim" | |
| ] | |
| # --- Gerenciamento de Estado (Sessão) --- | |
| def get_state(page): | |
| """Recupera o estado da sessão de forma compatível com todas as versões do Flet.""" | |
| try: | |
| # 1. Tenta recuperar usando dicionário (Padrão Flet moderno) | |
| if hasattr(page, "session") and page.session: | |
| try: | |
| if page.session.contains_key("app_state"): | |
| state = page.session["app_state"] | |
| if state: return state | |
| except: | |
| # Fallback para versões muito antigas ou comportamentos inesperados | |
| pass | |
| # 2. Atributo direto na página (Persistência na execução atual) | |
| if hasattr(page, "_app_state_f"): | |
| return page._app_state_f | |
| # 3. Inicializa se nada foi encontrado | |
| device_id = str(uuid.uuid4())[:8].upper() | |
| entrevista_id = f"{device_id}{int(time.time())}" | |
| initial_state = { | |
| "respostas": {}, | |
| "tela_atual": "setup", | |
| "setup_cache": { | |
| "T0_1_Device": device_id, | |
| "T0_2_Data": datetime.now().strftime("%Y-%m-%d"), | |
| "T0_3_Hora": datetime.now().strftime("%H:%M"), | |
| "T0_4_Pesquisador": "", | |
| "T0_5_Posto": "", | |
| "T0_6_Sentido": "Interior", | |
| "T0_7_Clima": "Sol", | |
| "T0_8_ID": entrevista_id, | |
| "T0_11_IP": obter_ip_publico() or getattr(page, "client_ip", "0.0.0.0"), | |
| } | |
| } | |
| set_state(page, initial_state) | |
| return initial_state | |
| except Exception as e: | |
| print(f"Erro ao recuperar estado: {e}") | |
| # Emergência: Retorna um objeto mínimo para não crashar | |
| return {"respostas": {"T0_8_ID": "TEMP" + str(int(time.time()))}, "tela_atual": "t1", "setup_cache": {}} | |
| def set_state(page, state): | |
| """Salva o estado da sessão de forma robusta.""" | |
| try: | |
| if hasattr(page, "session") and page.session: | |
| try: | |
| page.session["app_state"] = state | |
| except: | |
| pass | |
| page._app_state_f = state | |
| except Exception as e: | |
| print(f"Erro ao salvar estado: {e}") | |
| def finalizar_e_limpar(page): | |
| state = get_state(page) | |
| state["respostas"] = {} | |
| device_id = state["setup_cache"].get("T0_1_Device", str(uuid.uuid4())[:8].upper()) | |
| entrevista_id = f"{device_id}{int(time.time())}" | |
| state["setup_cache"]["T0_3_Hora"] = datetime.now().strftime("%H:%M") | |
| state["setup_cache"]["T0_8_ID"] = entrevista_id | |
| set_state(page, state) | |
| # --- Persistência de Dados --- | |
| def persistir(page, is_final=False): | |
| state = get_state(page) | |
| respostas = state.get("respostas", {}) | |
| setup = state.get("setup_cache", {}) | |
| # Captura o horário de término IMEDIATAMENTE (antes da thread) | |
| if is_final and not respostas.get("T5_Hora_Fim"): | |
| respostas["T5_Hora_Fim"] = datetime.now().strftime("%H:%M:%S") | |
| total_dados = {**setup, "T0_9_Versao": VERSAO} | |
| total_dados.update(respostas) | |
| if not total_dados.get("T0_8_ID"): | |
| total_dados["T0_8_ID"] = f"ERR_{int(time.time())}" | |
| linha = {h: str(total_dados.get(h, "")) for h in HEADERS} | |
| def logica(): | |
| try: | |
| print(f"Estado: Peristindo dados... ID {total_dados.get('T0_8_ID')} ({len(respostas)} campos)") | |
| if supabase: | |
| try: | |
| # Envia como lista [linha] e reforça o on_conflict | |
| res = supabase.from_("rmbs_pd").upsert([linha], on_conflict="T0_8_ID").execute() | |
| except Exception as db_err: | |
| print(f"Erro banco de dados (Supabase): {db_err}") | |
| try: | |
| nome_csv = "backup_pesquisas.csv" | |
| existe = os.path.exists(nome_csv) | |
| with open(nome_csv, "a", encoding="utf-8-sig", newline="") as f: | |
| writer = csv.DictWriter(f, fieldnames=HEADERS, extrasaction='ignore') | |
| if not existe: writer.writeheader() | |
| writer.writerow(linha) | |
| except Exception as csv_err: | |
| print(f"Erro backup CSV local: {csv_err}") | |
| except Exception as e: | |
| print(f"Erro persistência background: {e}") | |
| threading.Thread(target=logica, daemon=True).start() | |
| def obter_caminho_base(): | |
| return os.getcwd() | |
| def obter_ip_publico(): | |
| """Tenta obter o IP público real via serviço externo.""" | |
| try: | |
| # Usa o ipify (serviço gratuito e rápido) | |
| with urlopen('https://api.ipify.org', timeout=5) as response: | |
| return response.read().decode('utf-8') | |
| except (URLError, Exception) as e: | |
| print(f"Aviso: Não foi possível obter IP público real: {e}") | |
| return None | |