from __future__ import annotations import hashlib import re from datetime import datetime, timezone from decimal import Decimal from pathlib import Path from typing import Optional from xml.etree import ElementTree as ET from src.fiscal.entities import Empresa, ItemNotaFiscal, NotaFiscal NS_NFE = "http://www.portalfiscal.inf.br/nfe" VERSAO_NFE = "4.00" def _sub(pai: ET.Element, tag: str, texto: str = "") -> ET.Element: el = ET.SubElement(pai, tag) if texto: el.text = str(texto) return el def _fmt_dec(v: Decimal, casas: int = 2) -> str: return f"{v:.{casas}f}" def _limpar(s: str) -> str: return re.sub(r"[^\w\s\-.,/@]", "", str(s))[:60] class GeradorNFeXML: def __init__(self, nf: NotaFiscal, ambiente: str = "2"): self.nf = nf self.ambiente = ambiente # 1=Produção, 2=Homologação self._cuf_map = { "AC": "12", "AL": "27", "AP": "16", "AM": "13", "BA": "29", "CE": "23", "DF": "53", "ES": "32", "GO": "52", "MA": "21", "MT": "51", "MS": "50", "MG": "31", "PA": "15", "PB": "25", "PR": "41", "PE": "26", "PI": "22", "RJ": "33", "RN": "24", "RS": "43", "RO": "11", "RR": "14", "SC": "42", "SP": "35", "SE": "28", "TO": "17", } def _cuf(self, uf: str) -> str: return self._cuf_map.get(uf.upper(), "35") def _gerar_chave(self) -> str: """Gera a chave de acesso de 44 dígitos da NF-e.""" nf = self.nf uf = self._cuf(nf.emitente.endereco.uf.value) aamm = nf.data_emissao.strftime("%y%m") cnpj = nf.emitente.cnpj.zfill(14) mod = nf.modelo.zfill(2) serie = nf.serie.zfill(3) numero = nf.numero.zfill(9) tp_emis = "1" # Emissão normal c_nf = hashlib.md5(f"{cnpj}{numero}".encode()).hexdigest()[:8].upper() c_nf_digits = "".join(str(int(c, 16) % 10) for c in c_nf)[:8] chave_sem_dv = f"{uf}{aamm}{cnpj}{mod}{serie}{numero}{tp_emis}{c_nf_digits}" pesos = [2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4] soma = sum(int(chave_sem_dv[i]) * pesos[i] for i in range(43)) resto = soma % 11 dv = 0 if resto in (0, 1) else 11 - resto return chave_sem_dv + str(dv) def _ide(self, root_nfe: ET.Element, chave: str) -> None: nf = self.nf ide = _sub(root_nfe, "ide") _sub(ide, "cUF", self._cuf(nf.emitente.endereco.uf.value)) _sub(ide, "cNF", chave[35:43]) _sub(ide, "natOp", _limpar(nf.natureza_operacao)) _sub(ide, "mod", nf.modelo) _sub(ide, "serie", nf.serie.zfill(3)) _sub(ide, "nNF", nf.numero.zfill(9)) _sub(ide, "dhEmi", datetime.combine(nf.data_emissao, datetime.min.time()).strftime("%Y-%m-%dT%H:%M:%S") + "-03:00") _sub(ide, "dhSaiEnt", datetime.combine(nf.data_saida_entrada, datetime.min.time()).strftime("%Y-%m-%dT%H:%M:%S") + "-03:00") _sub(ide, "tpNF", nf.tipo_operacao) _sub(ide, "idDest", "1") # 1=Operação interna _sub(ide, "cMunFG", nf.emitente.endereco.cod_municipio or "3550308") _sub(ide, "tpImp", "1") # 1=DANFE normal retrato _sub(ide, "tpEmis", "1") # 1=Emissão normal _sub(ide, "cDV", chave[-1]) _sub(ide, "tpAmb", self.ambiente) _sub(ide, "finNFe", "1") # 1=NF-e normal _sub(ide, "indFinal", "0") # 0=Normal _sub(ide, "indPres", nf.ind_pagamento) _sub(ide, "procEmi", "0") # 0=Emissão de NF-e com aplicativo do contribuinte _sub(ide, "verProc", "1.0.0") def _endereco(self, pai: ET.Element, emp: Empresa, prefixo: str = "") -> None: end = emp.endereco ender = _sub(pai, f"ender{prefixo}") _sub(ender, "xLgr", _limpar(end.logradouro)) _sub(ender, "nro", end.numero) if end.complemento: _sub(ender, "xCpl", _limpar(end.complemento)) _sub(ender, "xBairro", _limpar(end.bairro)) _sub(ender, "cMun", end.cod_municipio or "3550308") _sub(ender, "xMun", _limpar(end.municipio)) _sub(ender, "UF", end.uf.value) _sub(ender, "CEP", re.sub(r"\D", "", end.cep)) _sub(ender, "cPais", "1058") _sub(ender, "xPais", "BRASIL") if emp.contato.telefone: _sub(ender, "fone", re.sub(r"\D", "", emp.contato.telefone)) def _emit(self, root_nfe: ET.Element) -> None: nf = self.nf emit = _sub(root_nfe, "emit") _sub(emit, "CNPJ", nf.emitente.cnpj) _sub(emit, "xNome", _limpar(nf.emitente.razao_social)[:60]) if nf.emitente.nome_fantasia: _sub(emit, "xFant", _limpar(nf.emitente.nome_fantasia)[:60]) self._endereco(emit, nf.emitente, "Emit") if nf.emitente.ie: _sub(emit, "IE", re.sub(r"\D", "", nf.emitente.ie)) _sub(emit, "CRT", nf.emitente.regime_tributario.value[:1]) def _dest(self, root_nfe: ET.Element) -> None: nf = self.nf dest = _sub(root_nfe, "dest") _sub(dest, "CNPJ", nf.destinatario.cnpj) _sub(dest, "xNome", _limpar(nf.destinatario.razao_social)[:60]) self._endereco(dest, nf.destinatario, "Dest") if nf.destinatario.ie: _sub(dest, "IE", re.sub(r"\D", "", nf.destinatario.ie)) _sub(dest, "indIEDest", "1") # 1=Contribuinte ICMS if nf.destinatario.contato.email: _sub(dest, "email", nf.destinatario.contato.email[:60]) def _det(self, root_nfe: ET.Element) -> None: nf = self.nf for item in nf.itens: det = _sub(root_nfe, "det") det.set("nItem", str(item.numero_item)) prod = _sub(det, "prod") _sub(prod, "cProd", item.produto.codigo[:60]) _sub(prod, "cEAN", item.produto.cod_barras or "SEM GTIN") _sub(prod, "xProd", _limpar(item.produto.descricao)[:120]) _sub(prod, "NCM", re.sub(r"\D", "", item.produto.ncm)[:8]) if item.produto.cest: _sub(prod, "CEST", re.sub(r"\D", "", item.produto.cest)) _sub(prod, "CFOP", item.cfop) _sub(prod, "uCom", item.produto.unidade[:6]) _sub(prod, "qCom", _fmt_dec(item.quantidade, 4)) _sub(prod, "vUnCom", _fmt_dec(item.valor_unitario, 10)) _sub(prod, "vProd", _fmt_dec(item.valor_produtos)) _sub(prod, "cEANTrib", item.produto.cod_barras or "SEM GTIN") _sub(prod, "uTrib", item.produto.unidade[:6]) _sub(prod, "qTrib", _fmt_dec(item.quantidade, 4)) _sub(prod, "vUnTrib", _fmt_dec(item.valor_unitario, 10)) if item.frete > 0: _sub(prod, "vFrete", _fmt_dec(item.frete)) if item.seguro > 0: _sub(prod, "vSeg", _fmt_dec(item.seguro)) if item.desconto > 0: _sub(prod, "vDesc", _fmt_dec(item.desconto)) if item.outras_despesas > 0: _sub(prod, "vOutro", _fmt_dec(item.outras_despesas)) _sub(prod, "indTot", "1") # 1=Valor do item compõe o total imposto = _sub(det, "imposto") self._icms_item(imposto, item) self._ipi_item(imposto, item) self._pis_item(imposto, item) self._cofins_item(imposto, item) def _icms_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None: icms = _sub(imposto, "ICMS") cst = item.produto.cst_icms if cst in {"000"}: grupo = _sub(icms, "ICMS00") _sub(grupo, "orig", "0") _sub(grupo, "CST", cst) _sub(grupo, "modBC", "3") _sub(grupo, "vBC", _fmt_dec(item.base_icms)) _sub(grupo, "pICMS", _fmt_dec(item.produto.aliq_icms, 4)) _sub(grupo, "vICMS", _fmt_dec(item.valor_icms)) elif cst in {"020"}: grupo = _sub(icms, "ICMS20") _sub(grupo, "orig", "0") _sub(grupo, "CST", cst) _sub(grupo, "modBC", "3") _sub(grupo, "pRedBC", _fmt_dec(Decimal("33.33"), 4)) _sub(grupo, "vBC", _fmt_dec(item.base_icms)) _sub(grupo, "pICMS", _fmt_dec(item.produto.aliq_icms, 4)) _sub(grupo, "vICMS", _fmt_dec(item.valor_icms)) elif cst in {"040", "041", "050"}: grupo = _sub(icms, f"ICMS{cst}") _sub(grupo, "orig", "0") _sub(grupo, "CST", cst) elif cst.startswith("1"): # Simples Nacional grupo = _sub(icms, "ICMSSN102") _sub(grupo, "orig", "0") _sub(grupo, "CSOSN", cst) else: grupo = _sub(icms, "ICMS90") _sub(grupo, "orig", "0") _sub(grupo, "CST", "090") _sub(grupo, "modBC", "3") _sub(grupo, "vBC", _fmt_dec(item.base_icms)) _sub(grupo, "pICMS", _fmt_dec(item.produto.aliq_icms, 4)) _sub(grupo, "vICMS", _fmt_dec(item.valor_icms)) def _ipi_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None: if item.produto.aliq_ipi == 0 and item.produto.cst_ipi in {"52", "53", "54"}: ipi = _sub(imposto, "IPI") _sub(ipi, "cEnq", "999") ipi_trib = _sub(ipi, "IPITrib") _sub(ipi_trib, "CST", item.produto.cst_ipi) _sub(ipi_trib, "vBC", _fmt_dec(Decimal("0"))) _sub(ipi_trib, "pIPI", _fmt_dec(Decimal("0"), 4)) _sub(ipi_trib, "vIPI", _fmt_dec(Decimal("0"))) return ipi = _sub(imposto, "IPI") _sub(ipi, "cEnq", "999") ipi_trib = _sub(ipi, "IPITrib") _sub(ipi_trib, "CST", item.produto.cst_ipi) _sub(ipi_trib, "vBC", _fmt_dec(item.base_ipi)) _sub(ipi_trib, "pIPI", _fmt_dec(item.produto.aliq_ipi, 4)) _sub(ipi_trib, "vIPI", _fmt_dec(item.valor_ipi)) def _pis_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None: pis = _sub(imposto, "PIS") cst = item.produto.cst_pis if cst in {"07", "08", "09"}: pisnt = _sub(pis, "PISNT") _sub(pisnt, "CST", cst) else: pisaliq = _sub(pis, "PISAliq") _sub(pisaliq, "CST", cst) _sub(pisaliq, "vBC", _fmt_dec(item.valor_produtos)) _sub(pisaliq, "pPIS", _fmt_dec(Decimal("0.65"), 4)) _sub(pisaliq, "vPIS", _fmt_dec(Decimal("0"))) def _cofins_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None: cofins = _sub(imposto, "COFINS") cst = item.produto.cst_cofins if cst in {"07", "08", "09"}: cofinsnt = _sub(cofins, "COFINSNT") _sub(cofinsnt, "CST", cst) else: cofinsaliq = _sub(cofins, "COFINSAliq") _sub(cofinsaliq, "CST", cst) _sub(cofinsaliq, "vBC", _fmt_dec(item.valor_produtos)) _sub(cofinsaliq, "pCOFINS", _fmt_dec(Decimal("3.00"), 4)) _sub(cofinsaliq, "vCOFINS", _fmt_dec(Decimal("0"))) def _total(self, root_nfe: ET.Element) -> None: nf = self.nf total = _sub(root_nfe, "total") ict = _sub(total, "ICMSTot") _sub(ict, "vBC", _fmt_dec(sum(i.base_icms for i in nf.itens))) _sub(ict, "vICMS", _fmt_dec(nf.valor_icms)) _sub(ict, "vICMSDeson", "0.00") _sub(ict, "vFCPUFDest", "0.00") _sub(ict, "vICMSUFDest", "0.00") _sub(ict, "vICMSUFRemet", "0.00") _sub(ict, "vFCP", "0.00") _sub(ict, "vBCST", "0.00") _sub(ict, "vST", "0.00") _sub(ict, "vFCPST", "0.00") _sub(ict, "vFCPSTRet", "0.00") _sub(ict, "vProd", _fmt_dec(nf.valor_produtos)) _sub(ict, "vFrete", _fmt_dec(nf.valor_frete)) _sub(ict, "vSeg", _fmt_dec(nf.valor_seguro)) _sub(ict, "vDesc", _fmt_dec(nf.valor_desconto)) _sub(ict, "vII", "0.00") _sub(ict, "vIPI", _fmt_dec(nf.valor_ipi)) _sub(ict, "vIPIDevol", "0.00") _sub(ict, "vPIS", "0.00") _sub(ict, "vCOFINS", "0.00") _sub(ict, "vOutro", _fmt_dec(nf.valor_outras_despesas)) _sub(ict, "vNF", _fmt_dec(nf.valor_total)) _sub(ict, "vTotTrib", "0.00") def _transp(self, root_nfe: ET.Element) -> None: transp = _sub(root_nfe, "transp") _sub(transp, "modFrete", "9") # 9=Sem frete def _pag(self, root_nfe: ET.Element) -> None: pag = _sub(root_nfe, "pag") detPag = _sub(pag, "detPag") _sub(detPag, "tPag", "01") # 01=Dinheiro _sub(detPag, "vPag", _fmt_dec(self.nf.valor_total)) def _inf_adic(self, root_nfe: ET.Element) -> None: nf = self.nf if nf.info_complementar: inf = _sub(root_nfe, "infAdic") _sub(inf, "infCpl", _limpar(nf.info_complementar)[:500]) def gerar_xml(self) -> str: chave = self.nf.chave_acesso or self._gerar_chave() nfeProc = ET.Element("nfeProc") nfeProc.set("xmlns", NS_NFE) nfeProc.set("versao", VERSAO_NFE) nfe = _sub(nfeProc, "NFe") nfe.set("xmlns", NS_NFE) infNFe = _sub(nfe, "infNFe") infNFe.set("versao", VERSAO_NFE) infNFe.set("Id", f"NFe{chave}") self._ide(infNFe, chave) self._emit(infNFe) self._dest(infNFe) self._det(infNFe) self._total(infNFe) self._transp(infNFe) self._pag(infNFe) self._inf_adic(infNFe) ET.indent(nfeProc, space=" ") return '\n' + ET.tostring(nfeProc, encoding="unicode") def salvar(self, diretorio: str | Path = ".") -> Path: chave = self.nf.chave_acesso or self._gerar_chave() caminho = Path(diretorio) / f"NFe{chave}.xml" caminho.parent.mkdir(parents=True, exist_ok=True) caminho.write_text(self.gerar_xml(), encoding="utf-8") return caminho class AssinadorNFe: """ Assina digitalmente o XML da NF-e com certificado ICP-Brasil (A1 ou A3). Requer: cryptography + lxml (ou signxml) O certificado deve ser um PKCS#12 (.pfx ou .p12) válido emitido por uma Autoridade Certificadora credenciada pela ICP-Brasil. """ def __init__(self, cert_path: str, cert_senha: str): self.cert_path = cert_path self.cert_senha = cert_senha self._privkey = None self._cert = None self._carregar_certificado() def _carregar_certificado(self) -> None: try: from cryptography.hazmat.primitives.serialization import pkcs12 dados = Path(self.cert_path).read_bytes() priv, cert, _ = pkcs12.load_key_and_certificates( dados, self.cert_senha.encode() ) self._privkey = priv self._cert = cert except ImportError: raise RuntimeError("Instale 'cryptography': pip install cryptography") except Exception as e: raise RuntimeError(f"Erro ao carregar certificado: {e}") def assinar(self, xml_str: str) -> str: """ Assina o XML da NF-e conforme o padrão XMLDSig exigido pela SEFAZ. Retorna o XML assinado como string. """ try: from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding import base64 from lxml import etree doc = etree.fromstring(xml_str.encode("utf-8")) ns = {"nfe": NS_NFE} inf_nfe = doc.find(".//nfe:infNFe", ns) if inf_nfe is None: raise ValueError("Elemento infNFe não encontrado no XML") ref_id = inf_nfe.get("Id", "") inf_bytes = etree.tostring(inf_nfe, method="c14n", exclusive=True) digest = hashes.Hash(hashes.SHA1()) digest.update(inf_bytes) digest_value = base64.b64encode(digest.finalize()).decode() signed_info_xml = ( '' '' '' f'' "" '' '' "" '' f"{digest_value}" "" "" ) si_doc = etree.fromstring(signed_info_xml.encode()) si_bytes = etree.tostring(si_doc, method="c14n", exclusive=True) sig_bytes = self._privkey.sign(si_bytes, padding.PKCS1v15(), hashes.SHA1()) sig_value = base64.b64encode(sig_bytes).decode() cert_der = self._cert.public_bytes(serialization.Encoding.DER) cert_b64 = base64.b64encode(cert_der).decode() signature_xml = ( '' + signed_info_xml + f"{sig_value}" "" f"{cert_b64}" "" "" ) sig_el = etree.fromstring(signature_xml.encode()) inf_nfe.append(sig_el) return etree.tostring(doc, xml_declaration=True, encoding="unicode") except ImportError: raise RuntimeError("Instale 'lxml' e 'cryptography': pip install lxml cryptography") def assinar_arquivo(self, xml_path: Path, destino: Optional[Path] = None) -> Path: xml_str = xml_path.read_text(encoding="utf-8") assinado = self.assinar(xml_str) saida = destino or xml_path.with_suffix(".assinado.xml") saida.write_text(assinado, encoding="utf-8") return saida