IOI-RUN / audit_streamlit_project.py
Roudrigus's picture
Upload 82 files
0f0ef8d verified
# -*- coding: utf-8 -*-
"""
Auditor de projeto Streamlit — chaves duplicadas, estrutura e relacionamentos.
Verifica:
1) Chaves duplicadas em st.form/st.button/st.download_button.
2) Widgets sem 'key' (risco em loops).
3) Imports faltantes no app.py para módulos usados no roteamento.
4) Cobertura MODULES ↔ Roteamento (entries sem rota e rotas sem entry).
5) Arquivos de módulos inexistentes e módulos sem main().
6) Imports não usados.
7) Ciclos de importação entre arquivos .py (somente locais).
8) Emite relatório em console e JSON.
Uso:
python audit_streamlit_project.py
python audit_streamlit_project.py --root . --app app.py --modules modules_map.py --exclude venv .venv .git
Saída JSON:
.audit_report.json (na raiz especificada)
"""
import os
import re
import ast
import json
import argparse
from collections import defaultdict
# -----------------------
# Util — File discovery
# -----------------------
def find_python_files(root, exclude_dirs=None):
    """Yield the path of every ``.py`` file found while walking *root*.

    A sub-directory is pruned from the walk when its bare name appears in
    *exclude_dirs*, or when its joined path equals
    ``os.path.join(root, entry)`` for some entry of *exclude_dirs*.
    """
    skipped_names = set(exclude_dirs or [])
    skipped_paths = {os.path.join(root, name) for name in skipped_names}

    def is_kept(parent, dirname):
        if dirname in skipped_names:
            return False
        return os.path.join(parent, dirname) not in skipped_paths

    for current_dir, subdirs, files in os.walk(root):
        # In-place slice assignment is what makes os.walk skip the pruned dirs.
        subdirs[:] = [name for name in subdirs if is_kept(current_dir, name)]
        yield from (
            os.path.join(current_dir, filename)
            for filename in files
            if filename.endswith(".py")
        )
def read_text(path):
    """Return the text content of *path*, or ``""`` when it cannot be read.

    The file is decoded as UTF-8 first; if that attempt raises, it is retried
    as latin-1. Any failure of both attempts yields an empty string.
    """
    for encoding in ("utf-8", "latin-1"):
        try:
            with open(path, "r", encoding=encoding) as handle:
                return handle.read()
        except Exception:
            # Best-effort reader: fall through to the next encoding.
            continue
    return ""
def parse_ast(path):
    """Read *path* and parse it as Python source.

    Returns a ``(tree, source)`` pair:
    - ``(None, "")`` when the file is empty or unreadable;
    - ``(None, source)`` when the text was read but ``ast.parse`` raised;
    - ``(tree, source)`` on success.
    """
    source = read_text(path)
    if source:
        try:
            return ast.parse(source, filename=path), source
        except Exception:
            # Keep the raw text available even when it is not valid Python.
            return None, source
    return None, ""
# -----------------------
# Scan — Streamlit keys
# -----------------------
# Each pattern captures the literal key string in group 1.
KEY_PATTERNS = {
    "form_literal": re.compile(r'st\.form\(\s*[\'"]([^\'"]+)[\'"]'),
    "button_key": re.compile(r'st\.button\([^)]*key\s*=\s*[\'"]([^\'"]+)[\'"]'),
    "download_key": re.compile(r'st\.download_button\([^)]*key\s*=\s*[\'"]([^\'"]+)[\'"]'),
}
# Widgets without a key (reported as warnings). The negative lookahead only
# inspects the text up to the first ")" on the same line.
MISSING_KEY_PATTERNS = {
    "button_no_key": re.compile(r'st\.button\((?![^)]*key\s*=)'),
    "download_no_key": re.compile(r'st\.download_button\((?![^)]*key\s*=)'),
}


def scan_duplicate_and_missing_keys(file_path):
    """Scan one file, line by line, for duplicated literal keys and key-less widgets.

    Returns a pair ``(duplicates, missing)``:
    - ``duplicates``: dict mapping a literal key to the list of line numbers
      where it was matched, restricted to keys matched more than once;
    - ``missing``: defaultdict mapping a ``MISSING_KEY_PATTERNS`` name to the
      line numbers whose text matched that pattern.

    The scan is purely textual and per line, so calls spanning several lines
    are only seen through their first line. A file that cannot be opened
    yields empty results instead of raising.
    """
    occurrences = defaultdict(list)
    missing = defaultdict(list)
    try:
        # errors="replace" keeps the scan going on files that are not valid
        # UTF-8 instead of silently dropping the rest of the file.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            for lineno, line in enumerate(f, 1):
                for pattern in KEY_PATTERNS.values():
                    for match in pattern.finditer(line):
                        occurrences[match.group(1)].append(lineno)
                for name, pattern in MISSING_KEY_PATTERNS.items():
                    if pattern.search(line):
                        missing[name].append(lineno)
    except OSError:
        # Best-effort: an unreadable file contributes whatever was collected.
        pass
    duplicates = {key: lines for key, lines in occurrences.items() if len(lines) > 1}
    return duplicates, missing
# -----------------------
# AST helpers — imports
# -----------------------
def extract_imports_defs_calls(tree):
    """Collect import, definition and ``.main`` usage facts from an AST.

    Returns a 4-tuple ``(imports, used_names, defs, calls_main)``:
    - ``imports``: dict mapping each bound name (alias, or first dotted
      component for ``import a.b``) to the first component of the module it
      comes from. For ``from . import x`` (no module name) the imported name
      itself is used as the base. A star import is stored under ``"*"``.
    - ``used_names``: set of every ``ast.Name`` identifier in the tree.
    - ``defs``: set of names of all ``def`` / ``async def`` functions,
      including nested functions and methods.
    - ``calls_main``: set of root names ``X`` for every loaded attribute
      access of the form ``X.main`` or ``X.a.b.main``.

    When *tree* is falsy (e.g. the file failed to parse) all four
    collections are returned empty.
    """
    imports = {}  # bound name -> base module
    used_names = set()
    defs = set()
    calls_main = set()

    class _Collector(ast.NodeVisitor):
        def visit_Import(self, node):
            for alias in node.names:
                bound = (alias.asname or alias.name).split(".")[0]
                imports[bound] = alias.name.split(".")[0]

        def visit_ImportFrom(self, node):
            for alias in node.names:
                # ``from . import x`` has no module name: the imported name
                # is the only available hint of what is being imported.
                base = node.module.split(".")[0] if node.module else alias.name
                imports[alias.asname or alias.name] = base

        def visit_FunctionDef(self, node):
            defs.add(node.name)
            self.generic_visit(node)

        # ``async def`` uses a distinct node type; record it the same way.
        visit_AsyncFunctionDef = visit_FunctionDef

        def visit_Name(self, node):
            used_names.add(node.id)

        def visit_Attribute(self, node):
            if isinstance(node.ctx, ast.Load) and node.attr == "main":
                # pkg.sub.main -> walk down to the left-most name.
                root = node.value
                while isinstance(root, ast.Attribute):
                    root = root.value
                if isinstance(root, ast.Name):
                    calls_main.add(root.id)
            self.generic_visit(node)

    if tree:
        _Collector().visit(tree)
    return imports, used_names, defs, calls_main
# -----------------------
# modules_map.py — parse
# -----------------------
def load_modules_map(modules_map_path):
    """Extract route keys and internal keys from the modules map file by regex.

    Returns ``(route_keys, internal_keys)``:
    - ``route_keys``: every double-quoted name that starts a line (after
      optional spaces/tabs) and is followed by ``: {``. This approximates
      the top-level keys of the MODULES dict; nested dict entries written
      the same way are matched too.
    - ``internal_keys``: every value of a ``"key": "value"`` pair.

    Both sets are empty when the file is empty or unreadable.
    """
    text = read_text(modules_map_path)
    if not text:
        return set(), set()

    entry_header = re.compile(r'^[ \t]*"([^"]+)"\s*:\s*\{', re.MULTILINE)
    key_field = re.compile(r'"key"\s*:\s*"([^"]+)"')

    route_keys = set(entry_header.findall(text))
    internal_keys = set(key_field.findall(text))
    return route_keys, internal_keys
# -----------------------
# Roteamento em app.py
# -----------------------
def extract_routing(app_src):
    """Find ``pagina_id`` routing branches in the app source text.

    Looks for patterns such as::

        if pagina_id == "consulta":
            consulta.main()
        elif pagina_id == "operacao":
            operacao.main()

    Returns a list of ``(route_key, called_module_name)`` tuples in source
    order. ``called_module_name`` is the identifier immediately before the
    first ``.main(`` found in the branch body, or ``None`` when the body
    contains no such call.

    A branch body ends at the next line that starts (after whitespace) with
    ``elif`` or ``#``, or at the end of the text.
    """
    # The terminator is a lookahead so the following ``elif`` is not consumed
    # and remains available as the start of the next match.
    branch_re = re.compile(
        r'\b(?:el)?if\s+pagina_id\s*==\s*[\'"]([^\'"]+)[\'"]\s*:'
        r'(.*?)(?=\n\s*(?:elif\b|#)|\Z)',
        re.DOTALL,
    )
    main_call_re = re.compile(r'([A-Za-z_][A-Za-z0-9_]*)\s*\.\s*main\s*\(')

    routes = []
    for branch in branch_re.finditer(app_src):
        call = main_call_re.search(branch.group(2))
        routes.append((branch.group(1), call.group(1) if call else None))
    return routes
# -----------------------
# Import graph & cycles
# -----------------------
def build_local_import_graph(py_files):
    """Build the graph of imports between the given files.

    Files are identified by their base name (file name without extension).
    Returns a ``defaultdict(set)`` mapping a file's base name to the base
    names of the other given files it imports. An import counts as local
    when either its bound alias or its base module matches a base name; the
    alias is checked first. Self-references are not recorded.
    """
    def stem(path):
        return os.path.splitext(os.path.basename(path))[0]

    local_modules = {stem(path): path for path in py_files}

    graph = defaultdict(set)
    for path in py_files:
        importer = stem(path)
        tree, _ = parse_ast(path)
        imports = extract_imports_defs_calls(tree)[0]
        for alias, base_mod in imports.items():
            if alias in local_modules:
                target = alias
            elif base_mod in local_modules:
                target = base_mod
            else:
                target = None
            if target and target != importer:
                graph[importer].add(target)
    return graph
def find_cycles(graph):
    """Detect cycles in a directed graph given as ``{node: set_of_successors}``.

    Returns a list of cycles, each written as a list of nodes that starts and
    ends on the same node (e.g. ``["a", "b", "a"]``). Cycles that are
    rotations of one another are reported once.

    Nodes and successors are visited in sorted order so that the result is
    the same on every run, independent of set iteration order; nodes must
    therefore be mutually orderable (they are module base names here).
    The input mapping is never mutated, even when it is a ``defaultdict``.
    """
    # Static node set: sources plus destinations.
    nodes = set(graph)
    for successors in graph.values():
        nodes.update(successors)

    visited = set()
    on_stack = set()
    path = []
    cycles = []

    def dfs(u):
        visited.add(u)
        on_stack.add(u)
        path.append(u)
        # .get avoids creating new keys on a defaultdict.
        for v in sorted(graph.get(u, ())):
            if v not in visited:
                dfs(v)
            elif v in on_stack:
                # Back edge: the cycle is the path from v to here, closed on v.
                # Every node appears at most once on the path, so index() is exact.
                cycles.append(path[path.index(v):] + [v])
        on_stack.remove(u)
        path.pop()

    for node in sorted(nodes):
        if node not in visited:
            dfs(node)

    def canonical(cycle):
        """Return the smallest rotation of the cycle without its closing node."""
        core = cycle[:-1]
        if not core:
            return ()
        return min(tuple(core[i:] + core[:i]) for i in range(len(core)))

    seen = set()
    unique = []
    for cycle in cycles:
        key = canonical(cycle)
        if key and key not in seen:
            seen.add(key)
            unique.append(cycle)
    return unique
# -----------------------
# Unused imports (aprox)
# -----------------------
def find_unused_imports(tree, imports, used_names):
    """Return the imported aliases that never appear in *used_names*.

    This is an approximation meant as a starting point: uses through
    ``getattr`` or other reflection are not detected.

    Two kinds of entry are never reported because they cannot show up as a
    referenced name: the ``"*"`` entry recorded for a star import, and names
    imported from ``__future__``.

    *tree* is accepted for interface compatibility and is not inspected.
    *imports* maps alias -> base module; *used_names* is a set of names.
    """
    return [
        alias
        for alias, base_module in imports.items()
        if alias != "*"
        and base_module != "__future__"
        and alias not in used_names
    ]
# -----------------------
# Auditor principal
# -----------------------
def _print_report(report):
    """Print the console summary of an audit *report* dict, section by section."""
    print("\n=== RELATÓRIO DE AUDITORIA — Streamlit Project ===")

    # Duplicate keys.
    print("\n[Chaves duplicadas]")
    if not report["duplicate_keys"]:
        print(" ✔ Nenhuma chave duplicada literal encontrada.")
    else:
        for file, dups in report["duplicate_keys"].items():
            print(f" - {file}")
            for key, lines in dups.items():
                print(f" * key='{key}' duplicada em linhas {lines}")

    # Widgets without a key.
    print("\n[Widgets sem 'key' (atenção em loops)]")
    if not report["widgets_without_key"]:
        print(" ✔ Nenhum potencial widget sem key encontrado.")
    else:
        for file, miss in report["widgets_without_key"].items():
            print(f" - {file}")
            for kind, lines in miss.items():
                print(f" * {kind}: linhas {lines}")

    # Missing imports and module files.
    print("\n[Imports faltantes no app e módulos]")
    if not report["missing_imports_in_app"]:
        print(" ✔ Nenhum import faltante detectado no app.py (para rotas).")
    else:
        for route_key, called_module, reason in report["missing_imports_in_app"]:
            print(f" - rota='{route_key}' -> módulo='{called_module}' • {reason}")
    if not report["module_files_missing"]:
        print(" ✔ Nenhum arquivo de módulo ausente detectado.")
    else:
        print(" Arquivos de módulo não encontrados:", report["module_files_missing"])
    if not report["modules_without_main"]:
        print(" ✔ Todos os módulos localizados possuem main().")
    else:
        print(" Módulos sem main():", report["modules_without_main"])

    # MODULES <-> routing coverage.
    print("\n[Consistência: MODULES x Roteamento]")
    rwm = report["routing_vs_modules"]
    if not rwm["routes_without_modules_entry"]:
        print(" ✔ Todas as rotas possuem entrada em modules_map.py (ou 'key' interna).")
    else:
        print(" Rotas sem entrada no modules_map.py:", rwm["routes_without_modules_entry"])
    if not rwm["modules_entry_without_route"]:
        print(" ✔ Todas as entradas do modules_map.py possuem rota no app.py.")
    else:
        print(" Entradas do modules_map.py sem rota no app.py:", rwm["modules_entry_without_route"])

    # Unused imports.
    print("\n[Imports não usados (aprox.)]")
    if not report["unused_imports"]:
        print(" ✔ Nenhum import potencialmente não usado encontrado.")
    else:
        for file, unused in report["unused_imports"].items():
            print(f" - {file}: {unused}")

    # Import cycles.
    print("\n[Ciclos de importação]")
    if not report["import_cycles"]:
        print(" ✔ Nenhum ciclo de importação detectado.")
    else:
        for cyc in report["import_cycles"]:
            print(" - ciclo:", " -> ".join(cyc))


def audit(root, app_path, modules_map_path, exclude_dirs=None, output_json=".audit_report.json"):
    """Run every check on the project under *root*, print and save the report.

    Parameters:
        root: project root directory to walk.
        app_path: path of the app file, relative to *root*.
        modules_map_path: path of the modules map file, relative to *root*.
        exclude_dirs: directory names/paths skipped by the file walk.
        output_json: name of the JSON report written under *root*.

    Returns the report dict, whose keys are ``duplicate_keys``,
    ``widgets_without_key``, ``missing_imports_in_app``,
    ``routing_vs_modules``, ``module_files_missing``,
    ``modules_without_main``, ``unused_imports`` and ``import_cycles``.

    Side effects: prints the summary to stdout and writes the JSON file.
    """
    report = {
        "duplicate_keys": {},          # file -> {key: [lines]}
        "widgets_without_key": {},     # file -> {pattern: [lines]}
        "missing_imports_in_app": [],  # [(route_key, called_module, reason)]
        "routing_vs_modules": {
            "routes_without_modules_entry": [],  # [route_key]
            "modules_entry_without_route": [],   # [modules_map_key]
        },
        "module_files_missing": [],    # [module_name]
        "modules_without_main": [],    # [module_name]
        "unused_imports": {},          # file -> [alias]
        "import_cycles": [],           # [[mod_a, mod_b, ..., mod_a]]
    }

    # 1) Discover files; map base name -> file path.
    py_files = list(find_python_files(root, exclude_dirs=exclude_dirs))
    base_to_file = {os.path.splitext(os.path.basename(f))[0]: f for f in py_files}

    # 2) Duplicate keys and widgets without a key.
    for f in py_files:
        dups, missing = scan_duplicate_and_missing_keys(f)
        if dups:
            report["duplicate_keys"][f] = dups
        if any(missing.values()):
            report["widgets_without_key"][f] = {k: v for k, v in missing.items() if v}

    # 3) Load the app file: routes and imports.
    app_full = os.path.join(root, app_path)
    modules_map_full = os.path.join(root, modules_map_path)
    app_tree, app_src = parse_ast(app_full)
    routes = extract_routing(app_src) if app_src else []
    app_imports = extract_imports_defs_calls(app_tree)[0]
    imported_aliases = set(app_imports)

    # 4) Modules map.
    route_keys_in_map, internal_keys_in_map = load_modules_map(modules_map_full)

    # 5) Per route: is the module imported, does its file exist, does it define main()?
    routes_set = set()
    for route_key, called_module in routes:
        routes_set.add(route_key)
        if not called_module:
            report["missing_imports_in_app"].append(
                (route_key, None, "Bloco da rota não chama *.main()"))
            continue
        if called_module not in imported_aliases:
            report["missing_imports_in_app"].append(
                (route_key, called_module, "Módulo não importado no app.py"))
        # The called name may be an import alias: fall back to its base
        # module so that aliased modules also get their main() checked.
        module_file = base_to_file.get(called_module)
        if module_file is None:
            module_file = base_to_file.get(app_imports.get(called_module))
        if module_file is None:
            report["module_files_missing"].append(called_module)
            continue
        module_tree, _ = parse_ast(module_file)
        module_defs = extract_imports_defs_calls(module_tree)[2]
        if "main" not in module_defs:
            report["modules_without_main"].append(called_module)

    # 6) Coverage between app routes and modules map entries.
    coverage = report["routing_vs_modules"]
    for r in routes_set:
        if r not in route_keys_in_map and r not in internal_keys_in_map:
            coverage["routes_without_modules_entry"].append(r)
    for m in route_keys_in_map:
        if m not in routes_set:
            coverage["modules_entry_without_route"].append(m)

    # 7) Unused imports per file.
    for f in py_files:
        t, _ = parse_ast(f)
        imp, used, _, _ = extract_imports_defs_calls(t)
        unused = find_unused_imports(t, imp, used)
        if unused:
            report["unused_imports"][f] = unused

    # 8) Local import cycles.
    report["import_cycles"] = find_cycles(build_local_import_graph(py_files))

    # 9) De-duplicate the list-valued entries (order-preserving for tuples,
    #    sorted for plain names).
    report["missing_imports_in_app"] = list(dict.fromkeys(report["missing_imports_in_app"]))
    report["module_files_missing"] = sorted(set(report["module_files_missing"]))
    report["modules_without_main"] = sorted(set(report["modules_without_main"]))
    coverage["routes_without_modules_entry"] = sorted(
        set(coverage["routes_without_modules_entry"]))
    coverage["modules_entry_without_route"] = sorted(
        set(coverage["modules_entry_without_route"]))

    # 10) Output: console summary, then JSON file.
    _print_report(report)
    out_path = os.path.join(root, output_json)
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"\n📄 Relatório JSON salvo em: {out_path}")
    return report
# -----------------------
# CLI
# -----------------------
def cli():
    """Parse the command-line options and run ``audit`` with them."""
    parser = argparse.ArgumentParser(description="Auditor de projeto Streamlit")
    option_specs = (
        ("--root", {"default": ".", "help": "Raiz do projeto (default: .)"}),
        ("--app", {"default": "app.py", "help": "Caminho do app.py (relativo à raiz)"}),
        ("--modules", {"default": "modules_map.py",
                       "help": "Caminho do modules_map.py (relativo à raiz)"}),
        ("--exclude", {"nargs": "*",
                       "default": [".git", ".venv", "venv", "__pycache__", ".streamlit"],
                       "help": "Pastas a excluir da varredura"}),
        ("--json", {"default": ".audit_report.json",
                    "help": "Nome do arquivo JSON de saída"}),
    )
    for flag, settings in option_specs:
        parser.add_argument(flag, **settings)

    options = parser.parse_args()
    audit(
        options.root,
        options.app,
        options.modules,
        exclude_dirs=options.exclude,
        output_json=options.json,
    )


if __name__ == "__main__":
    cli()