"""
RegTech BR — Hugging Face Space
================================
Compliance analyzer for Brazilian crypto asset regulation.
Uses FAISS RAG + Claude Sonnet 4.6 to produce structured risk assessments.

Setup:
    1. Upload faiss_index.bin, embeddings.npy, chunks_meta.jsonl to the
       Space root folder
    2. Add ANTHROPIC_API_KEY as a Space Secret
    3. Deploy
"""

import os
import sys
import json
import re
import time
import types
import html
import warnings
import unicodedata
from pathlib import Path

import numpy as np
import requests

# ============================================================
# Python 3.12/3.13 / Gradio / pydub compatibility fix
# ============================================================
# This shim must run before `import gradio`: it registers an `audioop` /
# `pyaudioop` entry in sys.modules so that a later import of either name
# succeeds even when the real module is unavailable.

warnings.filterwarnings(
    "ignore",
    category=DeprecationWarning,
    message=".*audioop.*",
)

try:
    import audioop as _audioop
    sys.modules.setdefault("pyaudioop", _audioop)
except ImportError:
    dummy_audioop = types.ModuleType("audioop")
    sys.modules.setdefault("audioop", dummy_audioop)
    sys.modules.setdefault("pyaudioop", dummy_audioop)

import gradio as gr

# ============================================================
# Paths
# ============================================================
# The index files live in the Space root folder:
#   chunks_meta.jsonl
#   embeddings.npy
#   faiss_index.bin

INDEX_DIR = Path(".")

print("Loading RegTech BR index...", flush=True)
print(f"Current working directory: {Path.cwd()}", flush=True)
print(f"Index directory: {INDEX_DIR.resolve()}", flush=True)

required_files = [
    INDEX_DIR / "chunks_meta.jsonl",
    INDEX_DIR / "embeddings.npy",
    INDEX_DIR / "faiss_index.bin",
]

missing_files = [str(p) for p in required_files if not p.exists()]

if missing_files:
    # List what *is* present so the Space logs show what was uploaded.
    print("Files currently available in Space root:", flush=True)
    for p in sorted(INDEX_DIR.iterdir()):
        if p.is_file():
            print(f" - {p.name} | {p.stat().st_size / 1024:.2f} KB", flush=True)
    raise FileNotFoundError(
        "Missing required index files. "
        "Upload these files to the Space root folder:\n"
        + "\n".join(missing_files)
    )

# Heavy imports are deferred until the required files are known to exist.
import faiss
from sentence_transformers import SentenceTransformer

CHUNKS: list[dict] = []

print("Loading chunks_meta.jsonl...", flush=True)
with open(INDEX_DIR / "chunks_meta.jsonl", encoding="utf-8") as f:
    for line in f:
        line = line.strip()
        if line:
            CHUNKS.append(json.loads(line))
print(f"Loaded chunks: {len(CHUNKS)}", flush=True)

print("Loading embeddings.npy...", flush=True)
EMBEDDINGS = np.load(INDEX_DIR / "embeddings.npy")
print(f"Embeddings shape: {EMBEDDINGS.shape}", flush=True)

print("Loading faiss_index.bin...", flush=True)
INDEX = faiss.read_index(str(INDEX_DIR / "faiss_index.bin"))
print(f"FAISS vectors: {INDEX.ntotal}", flush=True)

if INDEX.ntotal != len(CHUNKS):
    # retrieve() maps index positions onto CHUNKS; a mismatch means some hits
    # cannot be resolved. Warn instead of failing so the Space still starts.
    print(
        f"WARNING: FAISS index holds {INDEX.ntotal} vectors but "
        f"{len(CHUNKS)} chunks were loaded; unresolvable hits will be skipped.",
        flush=True,
    )

print("Loading sentence-transformer model...", flush=True)
MODEL = SentenceTransformer("paraphrase-multilingual-MiniLM-L12-v2")

print(f"Index ready — {len(CHUNKS)} chunks, {INDEX.ntotal} vectors", flush=True)


# ============================================================
# Normalization and routing
# ============================================================

def normalize(text: str) -> str:
    """Return *text* lower-cased, accent-free and reduced to ``[a-z0-9 ]``.

    Combining marks are removed after NFD decomposition, every run of
    characters outside ``a-z0-9`` becomes a single space, and the result is
    stripped. ``None`` or an empty string yields ``""``.
    """
    text = unicodedata.normalize("NFD", text or "")
    text = "".join(c for c in text if unicodedata.category(c) != "Mn")
    text = text.lower()
    text = re.sub(r"[^a-z0-9]+", " ", text)
    return re.sub(r"\s+", " ", text).strip()


# Keywords (matched as substrings of the normalized query) that count as a
# hit for each regulatory authority.
AUTHORITY_KW = {
    "BCB": [
        "banco central",
        "bcb",
        "bacen",
        "psav",
        "psaav",
        "prestadora de servicos de ativos virtuais",
        "prestadores de servicos de ativos virtuais",
        "ativo virtual",
        "ativos virtuais",
        "criptoativo",
        "criptoativos",
        "autorizacao",
        "autorizar",
        "autorizada",
        "cambio",
        "capital internacional",
        "mercado de cambio",
        "segregacao patrimonial",
        "segregacao",
        "patrimonial",
        "ativos dos clientes",
        "ativos de clientes",
        "ativos de titularidade dos clientes",
        "circular",
        "resolucao bcb",
        "instrucao normativa bcb",
    ],
    "CVM": [
        "cvm",
        "comissao de valores",
        "valores mobiliarios",
        "valor mobiliario",
        "token",
        "tokens",
        "security token",
        "oferta publica",
        "oferta publica de distribuicao",
        "investidor",
        "dividendos",
        "receita da plataforma",
        "direito de voto",
        "captar",
        "captacao",
        "rwa",
        "parecer",
        "parecer de orientacao",
    ],
    "COAF": [
        "coaf",
        "pessoa exposta politicamente",
        "pessoas expostas politicamente",
        "pep",
        "politically exposed",
        "kyc",
        "conheca seu cliente",
        "identificacao do cliente",
        "cliente anonimo",
        "anonimo",
        "anonima",
        "prevencao",
        "lavagem",
        "terrorismo",
        "pld",
        "ftp",
        "pld ftp",
        "aml",
        "cft",
    ],
}

# Keywords that select a source-id fragment; the fragment is later matched
# against each chunk's normalized ``source_id`` in route_boost().
SOURCE_KW = {
    "lei_14478": [
        "lei 14 478",
        "lei 14478",
        "lei n 14 478",
        "marco legal",
        "lei dos criptoativos",
    ],
    "decreto_11563": [
        "decreto 11 563",
        "decreto 11563",
        "decreto n 11 563",
    ],
    "bcb_circular_3978": [
        "circular 3978",
        "circular n 3978",
        "pld",
        "ftp",
        "lavagem",
        "terrorismo",
        "kyc",
        "identificacao do cliente",
        "anonimo",
        "anonima",
    ],
    "bcb_in701": [
        "instrucao normativa 701",
        "instrucao normativa bcb 701",
        "certificacao tecnica",
        "segregacao",
        "segregacao patrimonial",
        "ativos dos clientes",
        "ativos virtuais de titularidade dos clientes",
    ],
    "bcb_res548": [
        "resolucao 548",
        "resolucao bcb 548",
        "autorizacao psav",
        "autorizacao",
        "psav",
    ],
    "cvm": [
        "cvm",
        "valores mobiliarios",
        "valor mobiliario",
        "token",
        "tokens",
        "oferta publica",
        "dividendos",
        "direito de voto",
    ],
    "coaf": [
        "coaf",
        "pep",
        "pessoa exposta politicamente",
        "pessoas expostas politicamente",
    ],
}


def _mentions_any(query_norm: str, keywords: list[str]) -> bool:
    """Return True when any normalized keyword is a substring of *query_norm*."""
    return any(normalize(k) in query_norm for k in keywords)


def detect_route(query: str) -> dict:
    """Derive retrieval hints from keywords found in *query*.

    Returns a dict with three de-duplicated, order-preserving lists:

    * ``authority_filters``  – authority codes whose chunks get a boost;
    * ``source_id_contains`` – source-id fragments whose chunks get a boost;
    * ``query_expansion``    – extra phrases appended to the query before
      it is embedded.

    Matching is substring-based on the normalized query, so ``"anonima"``
    also matches ``"anonimas"``.
    """
    q = normalize(query)

    route = {
        "authority_filters": [],
        "source_id_contains": [],
        "query_expansion": [],
    }

    if _mentions_any(q, SOURCE_KW["lei_14478"]):
        route["source_id_contains"].append("lei_14478")
        route["query_expansion"].extend([
            "Lei 14.478 de 2022",
            "marco legal dos criptoativos",
            "prestadora de serviços de ativos virtuais",
        ])

    if _mentions_any(q, SOURCE_KW["decreto_11563"]):
        route["source_id_contains"].append("decreto_11563")
        route["query_expansion"].extend([
            "Decreto 11.563 de 2023",
            "Banco Central do Brasil",
            "prestadoras de serviços de ativos virtuais",
        ])

    # Keep the authority with the most keyword hits, plus any authority
    # that reached at least two hits.
    hits = {
        authority: sum(1 for k in kws if normalize(k) in q)
        for authority, kws in AUTHORITY_KW.items()
    }
    max_hits = max(hits.values()) if hits else 0
    if max_hits:
        route["authority_filters"] = [
            authority
            for authority, h in hits.items()
            if h == max_hits or h >= 2
        ]

    for key, kws in SOURCE_KW.items():
        if _mentions_any(q, kws):
            route["source_id_contains"].append(key)

    if any(term in q for term in ["segregacao", "patrimonial", "ativos dos clientes"]):
        route["authority_filters"].append("BCB")
        route["source_id_contains"].extend(["bcb_in701", "bcb_res548"])
        route["query_expansion"].extend([
            "segregação patrimonial de ativos virtuais",
            "ativos de titularidade da instituição",
            "ativos de titularidade de clientes e usuários",
            "prestadora de serviços de ativos virtuais",
        ])

    if any(term in q for term in ["sem autorizacao", "autorizacao", "banco central", "psav"]):
        route["authority_filters"].append("BCB")
        route["source_id_contains"].extend(
            ["bcb_res548", "bcb_in701", "lei_14478", "decreto_11563"]
        )
        route["query_expansion"].extend([
            "autorização para prestadora de serviços de ativos virtuais",
            "Banco Central do Brasil",
            "PSAV",
            "Resolução BCB 548",
        ])

    if any(term in q for term in ["kyc", "anonimo", "anonima", "identificacao", "lavagem"]):
        route["authority_filters"].extend(["BCB", "COAF"])
        route["source_id_contains"].extend(["bcb_circular_3978", "coaf_res036"])
        route["query_expansion"].extend([
            "identificação e qualificação de clientes",
            "cliente anônimo",
            "prevenção à lavagem de dinheiro",
            "financiamento do terrorismo",
        ])

    # The rules above may add the same entry more than once; dict.fromkeys
    # removes duplicates while keeping first-seen order.
    for key in ["authority_filters", "source_id_contains", "query_expansion"]:
        route[key] = list(dict.fromkeys(route[key]))

    return route


def route_boost(chunk: dict, route: dict) -> float:
    """Return the score bonus *chunk* earns from the routing hints.

    Adds 0.35 for every routed source-id fragment contained in the chunk's
    normalized ``source_id`` and 0.08 when the chunk's ``authority`` is one
    of the routed authorities. The total is capped at 0.55.
    """
    boost = 0.0
    source_id = normalize(str(chunk.get("source_id", "")))
    authority = chunk.get("authority")

    for token in route.get("source_id_contains", []):
        token_norm = normalize(str(token))
        if token_norm and token_norm in source_id:
            boost += 0.35

    if authority in route.get("authority_filters", []):
        boost += 0.08

    return min(boost, 0.55)


def lexical_boost(query_norm: str, chunk: dict) -> float:
    """Return a bonus in ``[0, 0.12]`` for query terms found in *chunk*.

    *query_norm* must already be normalized. Only distinct terms of four or
    more characters are considered; each is matched as a substring of the
    chunk's normalized source id, label, authority, tags and text.
    """
    tags = chunk.get("tags") or []  # a missing or null tag list adds no text
    if not isinstance(tags, list):
        tags = [str(tags)]

    parts = " ".join([
        str(chunk.get("source_id", "")),
        str(chunk.get("source_label", "")),
        str(chunk.get("authority", "")),
        " ".join(str(t) for t in tags),
        str(chunk.get("text", "")),
    ])
    chunk_norm = normalize(parts)

    terms = [t for t in query_norm.split() if len(t) >= 4]
    if not terms:
        return 0.0

    unique_terms = list(dict.fromkeys(terms))
    hits = sum(1 for t in unique_terms if t in chunk_norm)

    # Dividing by at least 6 keeps very short queries from reaching the cap
    # on a single matching term.
    return min(0.12, hits / max(6, len(unique_terms)) * 0.12)


# Candidates whose raw index score is below this value are discarded.
MIN_SEMANTIC_SCORE = 0.20


def retrieve(query: str, top_k: int = 6) -> list[dict]:
    """Return up to *top_k* chunks for *query*, best first.

    The query is expanded with the phrases from detect_route(), embedded
    with MODEL and searched in INDEX. Each surviving candidate is a copy of
    its CHUNKS entry with two extra keys: ``_score`` (the raw index score)
    and ``_final`` (raw score + lexical_boost + route_boost). Results are
    sorted by ``_final`` and de-duplicated on ``chunk_id`` (falling back to
    ``source_id`` when a chunk has no ``chunk_id``).

    Returns an empty list when *top_k* is not positive or no chunks are
    loaded.
    """
    if top_k <= 0 or not CHUNKS:
        return []

    route = detect_route(query)
    expanded = query + "\n" + "\n".join(route.get("query_expansion", []))

    q_vec = MODEL.encode(
        [expanded],
        normalize_embeddings=True,
        convert_to_numpy=True,
    ).astype(np.float32)

    # Over-fetch so that score filtering, boosting and de-duplication still
    # leave enough candidates.
    k_search = min(len(CHUNKS), max(top_k * 20, 30))
    scores, indices = INDEX.search(q_vec, k_search)

    query_norm = normalize(expanded)
    n_chunks = len(CHUNKS)
    ranked = []

    for score, idx in zip(scores[0], indices[0]):
        idx = int(idx)
        # Negative positions mark empty result slots; positions beyond
        # CHUNKS occur when the index and the metadata file are out of sync.
        if idx < 0 or idx >= n_chunks:
            continue

        semantic_score = float(score)
        if semantic_score < MIN_SEMANTIC_SCORE:
            continue

        chunk = CHUNKS[idx].copy()
        final_score = (
            semantic_score
            + lexical_boost(query_norm, chunk)
            + route_boost(chunk, route)
        )
        chunk["_score"] = semantic_score
        chunk["_final"] = final_score
        ranked.append(chunk)

    ranked.sort(key=lambda r: float(r.get("_final", 0.0)), reverse=True)

    seen = set()
    unique = []
    for r in ranked:
        raw_cid = r.get("chunk_id", r.get("source_id", "unknown"))
        # Serialise the id so unhashable values (lists, dicts) can be
        # tracked in a set.
        try:
            cid = json.dumps(raw_cid, sort_keys=True, ensure_ascii=False, default=str)
        except Exception:
            cid = str(raw_cid)

        if cid not in seen:
            seen.add(cid)
            unique.append(r)

    return unique[:top_k]


def format_context(results: list[dict], max_chars: int = 700) -> str:
    """Render retrieved chunks as the text block sent to the model.

    Each chunk becomes a ``[SOURCE n]`` entry with its label, optional
    article / normative-reference hints, source id, authority, final score
    and at most *max_chars* characters of text. ``...`` is appended only
    when the text was actually cut. Entries are joined by ``---`` lines.
    """
    lines = []

    for i, r in enumerate(results, 1):
        article = f" — {r['article_hint']}" if r.get("article_hint") else ""
        norm = (
            f" [{r['normative_reference_hint']}]"
            if r.get("normative_reference_hint")
            else ""
        )

        text = str(r.get("text", ""))
        excerpt = text[:max_chars] + "..." if len(text) > max_chars else text

        lines.append(
            f"[SOURCE {i}] {r.get('source_label', '')}{article}{norm}\n"
            f"Source ID: {r.get('source_id', '?')} | "
            f"Authority: {r.get('authority', '?')} | "
            f"Score: {float(r.get('_final', 0.0)):.3f}\n"
            f"{excerpt}"
        )

    return "\n\n---\n\n".join(lines)


# ============================================================
# Claude API
# ============================================================

SYSTEM = """You are RegTech BR, a specialist AI in Brazilian crypto asset regulation.
Analyze the compliance query and produce a structured JSON assessment.
Respond ONLY with valid JSON — no markdown fences.

Schema:
{
  "risk_level": "LOW | MEDIUM | HIGH | UNCLEAR",
  "compliance_status": "COMPLIANT | NON-COMPLIANT | REQUIRES_REVIEW | INSUFFICIENT_INFO",
  "applicable_regulations": ["list"],
  "relevant_articles": ["list"],
  "finding": "2-5 sentence assessment",
  "corrective_action": "specific steps or 'No action required'",
  "confidence": "HIGH | MEDIUM | LOW",
  "authority": "BCB | CVM | COAF | mixed | federal"
}

Rules:
- If the query describes operating without required authorization, flag high risk.
- If the query describes weak KYC or anonymous transactions, flag high risk.
- If the query describes no segregation of client assets, flag high risk.
- If the query describes tokens with dividends, voting rights, or public fundraising, flag CVM securities risk.
- Base the answer strictly on the retrieved regulatory context.
"""


def extract_json_object(raw: str) -> str:
    """Return the JSON-object portion of a model reply.

    Strips a leading/trailing Markdown code fence, then returns the text
    unchanged when it already starts with ``{`` and ends with ``}``;
    otherwise returns the span from the first ``{`` to the last ``}``.
    When no such span exists the stripped text is returned as is, so the
    caller's JSON parser reports the error.
    """
    raw = (raw or "").strip()
    raw = re.sub(r"^```(?:json)?", "", raw, flags=re.IGNORECASE).strip()
    raw = re.sub(r"```$", "", raw).strip()

    if raw.startswith("{") and raw.endswith("}"):
        return raw

    start = raw.find("{")
    end = raw.rfind("}")
    if start >= 0 and end > start:
        return raw[start:end + 1]

    return raw


def call_claude(query: str, context: str) -> dict | None:
    """Ask the Anthropic Messages API for a structured assessment.

    Reads the key from the ``ANTHROPIC_API_KEY`` environment variable and
    sends *query* plus the formatted *context* with SYSTEM as the system
    prompt.

    Returns the parsed JSON object, or ``None`` when the key is missing,
    the HTTP call fails, or the reply is not a JSON object. Every failure
    is printed to stdout; nothing is raised to the caller.
    """
    api_key = os.environ.get("ANTHROPIC_API_KEY", "")
    if not api_key:
        print("Missing ANTHROPIC_API_KEY.", flush=True)
        return None

    prompt = (
        f"COMPLIANCE QUERY:\n{query}\n\n"
        f"REGULATORY CONTEXT:\n\n{context}\n\n"
        f"Produce a structured compliance assessment."
    )

    try:
        response = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "Content-Type": "application/json",
                "x-api-key": api_key,
                "anthropic-version": "2023-06-01",
            },
            json={
                "model": "claude-sonnet-4-6",
                "max_tokens": 1200,
                "system": SYSTEM,
                "messages": [
                    {
                        "role": "user",
                        "content": prompt,
                    }
                ],
            },
            timeout=90,
        )
        response.raise_for_status()
        payload = response.json()
    except (requests.RequestException, ValueError) as exc:
        # Transport errors, non-2xx statuses and non-JSON response bodies.
        print(f"Claude error: {type(exc).__name__}: {exc}", flush=True)
        return None

    blocks = payload.get("content", []) if isinstance(payload, dict) else []
    if not isinstance(blocks, list):
        blocks = []

    raw = "".join(
        str(block.get("text") or "")
        for block in blocks
        if isinstance(block, dict) and block.get("type") == "text"
    )

    try:
        parsed = json.loads(extract_json_object(raw))
    except ValueError as exc:  # json.JSONDecodeError is a ValueError
        print(f"Claude error: {type(exc).__name__}: {exc}", flush=True)
        return None

    if not isinstance(parsed, dict):
        # render_report() calls .get() on the result, so a bare list or
        # string is treated as a failed call.
        print(
            f"Claude error: expected a JSON object, got {type(parsed).__name__}",
            flush=True,
        )
        return None

    return parsed


# ============================================================
# HTML rendering
# ============================================================

RISK_COLOR = {
    "HIGH": "#dc2626",
    "MEDIUM": "#d97706",
    "LOW": "#16a34a",
    "UNCLEAR": "#6b7280",
}

STATUS_ICON = {
    "NON-COMPLIANT": "⛔",
    "COMPLIANT": "✅",
    "REQUIRES_REVIEW": "⚠️",
    "INSUFFICIENT_INFO": "❓",
}


def as_list(value) -> list[str]:
    """Coerce *value* to a list of strings (``None`` becomes ``[]``)."""
    if value is None:
        return []
    if isinstance(value, list):
        return [str(v) for v in value]
    return [str(value)]


def esc(value) -> str:
    """HTML-escape *value*; ``None`` becomes the empty string."""
    return html.escape("" if value is None else str(value))


def render_report(report: dict, query: str, results: list[dict]) -> str:
    """Render the model's assessment as an HTML card.

    *report* is the dict returned by call_claude(); missing keys fall back
    to ``UNCLEAR`` / ``INSUFFICIENT_INFO`` / ``LOW`` / ``?`` / ``—``.
    *results* supplies the source ids listed under "Sources retrieved".
    *query* is accepted for interface compatibility and is not rendered.
    Every model- or chunk-derived value is passed through esc().
    """
    # NOTE(review): the tag structure and inline styles of this template are
    # a reconstruction around the surviving text labels — confirm against the
    # deployed design.
    risk = str(report.get("risk_level", "UNCLEAR")).upper()
    status = str(report.get("compliance_status", "INSUFFICIENT_INFO")).upper()
    confidence = str(report.get("confidence", "LOW")).upper()
    authority = str(report.get("authority", "?"))

    color = RISK_COLOR.get(risk, "#6b7280")
    icon = STATUS_ICON.get(status, "❓")

    regs = "".join(
        f"<li>{esc(item)}</li>"
        for item in as_list(report.get("applicable_regulations", []))
    )
    arts = "".join(
        f"<li>{esc(item)}</li>"
        for item in as_list(report.get("relevant_articles", []))
    )
    srcs = "".join(
        f'<span style="display:inline-block;background:#1e293b;color:#94a3b8;'
        f"font-family:'IBM Plex Mono',monospace;font-size:0.72rem;"
        f'padding:0.15rem 0.5rem;border-radius:4px;margin:0 0.3rem 0.3rem 0;">'
        f'{esc(r.get("source_id", ""))}</span>'
        for r in results
    )

    finding = esc(report.get("finding", "—"))
    corrective_action = esc(report.get("corrective_action", "—"))

    heading = (
        "font-family:'IBM Plex Mono',monospace;font-size:0.75rem;"
        "letter-spacing:0.05em;text-transform:uppercase;color:#38bdf8;"
        "margin:1rem 0 0.35rem;"
    )
    body = "margin:0;line-height:1.5;color:#e2e8f0;font-size:0.9rem;"
    listing = "margin:0;padding-left:1.2rem;color:#cbd5e1;font-size:0.85rem;"

    return f"""
<div style="font-family:'IBM Plex Sans',sans-serif;background:#0f172a;border:1px solid #1e3a5f;border-left:4px solid {color};border-radius:8px;padding:1.25rem;">
  <div style="display:flex;flex-wrap:wrap;align-items:center;gap:0.75rem;">
    <span style="background:{color};color:#fff;font-family:'IBM Plex Mono',monospace;font-weight:700;font-size:0.8rem;padding:0.25rem 0.6rem;border-radius:4px;">{esc(risk)} RISK</span>
    <span style="color:#e2e8f0;font-weight:600;font-size:0.9rem;">{icon} {esc(status)}</span>
    <span style="color:#64748b;font-size:0.8rem;">confidence: {esc(confidence)} · authority: {esc(authority)}</span>
  </div>
  <div style="{heading}">Finding</div>
  <p style="{body}">{finding}</p>
  <div style="{heading}">Corrective Action</div>
  <p style="{body}">{corrective_action}</p>
  <div style="{heading}">Applicable Regulations</div>
  <ul style="{listing}">
    {regs}
  </ul>
  <div style="{heading}">Relevant Articles</div>
  <ul style="{listing}">
    {arts}
  </ul>
  <div style="{heading}">Sources retrieved</div>
  <div>{srcs}</div>
</div>
"""


def render_error(msg: str) -> str:
    """Render *msg* (HTML-escaped) as a warning box."""
    return f"""
<div style="font-family:'IBM Plex Sans',sans-serif;background:#0f172a;border:1px solid #d97706;border-radius:8px;padding:1rem;color:#fbbf24;font-size:0.9rem;">
  ⚠️ {esc(msg)}
</div>
"""


# ============================================================
# Gradio app
# ============================================================

EXAMPLES = [
    "Nossa plataforma permite compra e venda de criptoativos sem autorização formal do Banco Central.",
    "Realizamos KYC apenas para transações acima de R$100.000. Transações menores são anônimas.",
    "Nosso token REV distribui dividendos e dá direito de voto. Será ofertado publicamente por R$30M.",
    "Nossa exchange não realiza segregação patrimonial dos ativos dos clientes.",
    "Não possuímos diretor responsável formalmente designado para PLD/FTP.",
    "Qual é o marco legal dos criptoativos no Brasil segundo a Lei 14.478 de 2022?",
    "Qual órgão foi designado pelo Decreto 11.563/2023 para regular as PSAVs?",
]


def analyze(query: str) -> tuple[str, str]:
    """Run retrieval and the model call for one query.

    Returns ``(report_html, context_text)``. On empty input or when nothing
    is retrieved the context is ``""``; when the model call fails the
    retrieved context is still returned next to the error box.
    """
    if not query or not query.strip():
        return render_error("Please enter a compliance query."), ""

    query = query.strip()
    results = retrieve(query)

    if not results:
        return render_error(
            "No relevant regulatory chunks found. Try rephrasing your query."
        ), ""

    context = format_context(results)
    report = call_claude(query, context)

    if not report:
        # call_claude() returns None for a missing key, an HTTP failure or
        # an unparsable reply, so the message points at both the secret and
        # the logs.
        return render_error(
            "Could not obtain a valid assessment from the Claude API. "
            "Check that ANTHROPIC_API_KEY is set as a Space Secret and "
            "review the Space logs for details."
        ), context

    return render_report(report, query, results), context


CSS = """
@import url('https://fonts.googleapis.com/css2?family=IBM+Plex+Mono:wght@400;500;700&family=IBM+Plex+Sans:wght@300;400;600&display=swap');

body {
    background: #0a0f1e !important;
}

.gradio-container {
    background: #0a0f1e !important;
    font-family: 'IBM Plex Sans', sans-serif !important;
    max-width: 900px !important;
    margin: 0 auto !important;
}

#header {
    text-align: center;
    padding: 2rem 0 1rem;
    border-bottom: 1px solid #1e3a5f;
    margin-bottom: 1.5rem;
}

#header h1 {
    font-family: 'IBM Plex Mono', monospace;
    font-size: 1.8rem;
    color: #38bdf8;
    letter-spacing: -0.02em;
    margin: 0 0 0.3rem;
}

#header p {
    color: #64748b;
    font-size: 0.85rem;
    margin: 0;
}

.query-box textarea {
    background: #0f172a !important;
    border: 1px solid #1e3a5f !important;
    color: #e2e8f0 !important;
    font-family: 'IBM Plex Mono', monospace !important;
    font-size: 0.9rem !important;
    border-radius: 8px !important;
}

.query-box textarea:focus {
    border-color: #38bdf8 !important;
    box-shadow: 0 0 0 2px rgba(56,189,248,0.15) !important;
}

.analyze-btn {
    background: #0369a1 !important;
    color: #fff !important;
    font-family: 'IBM Plex Mono', monospace !important;
    font-weight: 600 !important;
    letter-spacing: 0.05em !important;
    border: none !important;
    border-radius: 8px !important;
    height: 44px !important;
    font-size: 0.85rem !important;
    transition: background 0.2s !important;
}

.analyze-btn:hover {
    background: #0284c7 !important;
}

.context-box textarea {
    background: #0f172a !important;
    border: 1px solid #1e293b !important;
    color: #64748b !important;
    font-family: 'IBM Plex Mono', monospace !important;
    font-size: 0.75rem !important;
}

label {
    color: #94a3b8 !important;
    font-size: 0.8rem !important;
}
"""

with gr.Blocks(css=CSS, title="RegTech BR") as demo:
    # NOTE(review): header/footer markup reconstructed to match the #header
    # rules in CSS and the surviving footer text — confirm wording.
    gr.HTML("""
    <div id="header">
        <h1>RegTech BR</h1>
        <p>Compliance analyzer for Brazilian crypto asset regulation</p>
    </div>
    """)

    with gr.Row():
        with gr.Column(scale=4):
            query_box = gr.Textbox(
                label="Compliance query or document excerpt",
                placeholder="Describe your policy, product, or compliance question in Portuguese or English...",
                lines=4,
                elem_classes=["query-box"],
            )
        with gr.Column(scale=1, min_width=120):
            analyze_btn = gr.Button("Analyze →", elem_classes=["analyze-btn"])

    example_dropdown = gr.Dropdown(
        label="Example queries",
        choices=EXAMPLES,
        value=None,
        interactive=True,
    )

    use_example_btn = gr.Button(
        "Use selected example",
        elem_classes=["analyze-btn"],
    )

    report_html = gr.HTML(label="Compliance Assessment")

    with gr.Accordion("Retrieved regulatory context", open=False):
        context_box = gr.Textbox(
            label="Raw chunks retrieved by RAG",
            lines=10,
            interactive=False,
            elem_classes=["context-box"],
        )

    gr.HTML("""
    <div style="text-align:center;color:#475569;font-size:0.75rem;margin-top:1.5rem;line-height:1.6;">
        ⚠ Experimental pipeline. Not legal advice. Results require review by qualified professionals.<br>
        RegTech BR · Fernando Rodrigues · Kaggle: fernandosr85
    </div>
    """)

    # Copy the selected example into the query box (empty when none chosen).
    use_example_btn.click(
        fn=lambda example: example or "",
        inputs=[example_dropdown],
        outputs=[query_box],
    )

    analyze_btn.click(
        fn=analyze,
        inputs=[query_box],
        outputs=[report_html, context_box],
    )

    query_box.submit(
        fn=analyze,
        inputs=[query_box],
        outputs=[report_html, context_box],
    )


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 7860))
    # NOTE(review): share=True requests a public tunnel on local runs and is
    # reportedly ignored by Gradio inside a Space — confirm it is intended.
    demo.queue().launch(
        server_name="0.0.0.0",
        server_port=port,
        share=True,
        show_api=False,
    )