|
|
| from __future__ import annotations |
|
|
| import hashlib |
| import re |
| from datetime import datetime, timezone |
| from decimal import Decimal |
| from pathlib import Path |
| from typing import Optional |
| from xml.etree import ElementTree as ET |
|
|
| from src.fiscal.entities import Empresa, ItemNotaFiscal, NotaFiscal |
|
|
| NS_NFE = "http://www.portalfiscal.inf.br/nfe" |
| VERSAO_NFE = "4.00" |
|
|
|
|
| def _sub(pai: ET.Element, tag: str, texto: str = "") -> ET.Element: |
| el = ET.SubElement(pai, tag) |
| if texto: |
| el.text = str(texto) |
| return el |
|
|
|
|
| def _fmt_dec(v: Decimal, casas: int = 2) -> str: |
| return f"{v:.{casas}f}" |
|
|
|
|
| def _limpar(s: str) -> str: |
| return re.sub(r"[^\w\s\-.,/@]", "", str(s))[:60] |
|
|
|
|
| class GeradorNFeXML: |
| |
| def __init__(self, nf: NotaFiscal, ambiente: str = "2"): |
| self.nf = nf |
| self.ambiente = ambiente |
| self._cuf_map = { |
| "AC": "12", "AL": "27", "AP": "16", "AM": "13", "BA": "29", |
| "CE": "23", "DF": "53", "ES": "32", "GO": "52", "MA": "21", |
| "MT": "51", "MS": "50", "MG": "31", "PA": "15", "PB": "25", |
| "PR": "41", "PE": "26", "PI": "22", "RJ": "33", "RN": "24", |
| "RS": "43", "RO": "11", "RR": "14", "SC": "42", "SP": "35", |
| "SE": "28", "TO": "17", |
| } |
|
|
| def _cuf(self, uf: str) -> str: |
| return self._cuf_map.get(uf.upper(), "35") |
|
|
| def _gerar_chave(self) -> str: |
| """Gera a chave de acesso de 44 dígitos da NF-e.""" |
| nf = self.nf |
| uf = self._cuf(nf.emitente.endereco.uf.value) |
| aamm = nf.data_emissao.strftime("%y%m") |
| cnpj = nf.emitente.cnpj.zfill(14) |
| mod = nf.modelo.zfill(2) |
| serie = nf.serie.zfill(3) |
| numero = nf.numero.zfill(9) |
| tp_emis = "1" |
| c_nf = hashlib.md5(f"{cnpj}{numero}".encode()).hexdigest()[:8].upper() |
| c_nf_digits = "".join(str(int(c, 16) % 10) for c in c_nf)[:8] |
|
|
| chave_sem_dv = f"{uf}{aamm}{cnpj}{mod}{serie}{numero}{tp_emis}{c_nf_digits}" |
|
|
| |
| pesos = [2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, |
| 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4] |
| soma = sum(int(chave_sem_dv[i]) * pesos[i] for i in range(43)) |
| resto = soma % 11 |
| dv = 0 if resto in (0, 1) else 11 - resto |
|
|
| return chave_sem_dv + str(dv) |
|
|
| def _ide(self, root_nfe: ET.Element, chave: str) -> None: |
| nf = self.nf |
| ide = _sub(root_nfe, "ide") |
| _sub(ide, "cUF", self._cuf(nf.emitente.endereco.uf.value)) |
| _sub(ide, "cNF", chave[35:43]) |
| _sub(ide, "natOp", _limpar(nf.natureza_operacao)) |
| _sub(ide, "mod", nf.modelo) |
| _sub(ide, "serie", nf.serie.zfill(3)) |
| _sub(ide, "nNF", nf.numero.zfill(9)) |
| _sub(ide, "dhEmi", datetime.combine(nf.data_emissao, datetime.min.time()).strftime("%Y-%m-%dT%H:%M:%S") + "-03:00") |
| _sub(ide, "dhSaiEnt", datetime.combine(nf.data_saida_entrada, datetime.min.time()).strftime("%Y-%m-%dT%H:%M:%S") + "-03:00") |
| _sub(ide, "tpNF", nf.tipo_operacao) |
| _sub(ide, "idDest", "1") |
| _sub(ide, "cMunFG", nf.emitente.endereco.cod_municipio or "3550308") |
| _sub(ide, "tpImp", "1") |
| _sub(ide, "tpEmis", "1") |
| _sub(ide, "cDV", chave[-1]) |
| _sub(ide, "tpAmb", self.ambiente) |
| _sub(ide, "finNFe", "1") |
| _sub(ide, "indFinal", "0") |
| _sub(ide, "indPres", nf.ind_pagamento) |
| _sub(ide, "procEmi", "0") |
| _sub(ide, "verProc", "1.0.0") |
|
|
| def _endereco(self, pai: ET.Element, emp: Empresa, prefixo: str = "") -> None: |
| end = emp.endereco |
| ender = _sub(pai, f"ender{prefixo}") |
| _sub(ender, "xLgr", _limpar(end.logradouro)) |
| _sub(ender, "nro", end.numero) |
| if end.complemento: |
| _sub(ender, "xCpl", _limpar(end.complemento)) |
| _sub(ender, "xBairro", _limpar(end.bairro)) |
| _sub(ender, "cMun", end.cod_municipio or "3550308") |
| _sub(ender, "xMun", _limpar(end.municipio)) |
| _sub(ender, "UF", end.uf.value) |
| _sub(ender, "CEP", re.sub(r"\D", "", end.cep)) |
| _sub(ender, "cPais", "1058") |
| _sub(ender, "xPais", "BRASIL") |
| if emp.contato.telefone: |
| _sub(ender, "fone", re.sub(r"\D", "", emp.contato.telefone)) |
|
|
| def _emit(self, root_nfe: ET.Element) -> None: |
| nf = self.nf |
| emit = _sub(root_nfe, "emit") |
| _sub(emit, "CNPJ", nf.emitente.cnpj) |
| _sub(emit, "xNome", _limpar(nf.emitente.razao_social)[:60]) |
| if nf.emitente.nome_fantasia: |
| _sub(emit, "xFant", _limpar(nf.emitente.nome_fantasia)[:60]) |
| self._endereco(emit, nf.emitente, "Emit") |
| if nf.emitente.ie: |
| _sub(emit, "IE", re.sub(r"\D", "", nf.emitente.ie)) |
| _sub(emit, "CRT", nf.emitente.regime_tributario.value[:1]) |
|
|
| def _dest(self, root_nfe: ET.Element) -> None: |
| nf = self.nf |
| dest = _sub(root_nfe, "dest") |
| _sub(dest, "CNPJ", nf.destinatario.cnpj) |
| _sub(dest, "xNome", _limpar(nf.destinatario.razao_social)[:60]) |
| self._endereco(dest, nf.destinatario, "Dest") |
| if nf.destinatario.ie: |
| _sub(dest, "IE", re.sub(r"\D", "", nf.destinatario.ie)) |
| _sub(dest, "indIEDest", "1") |
| if nf.destinatario.contato.email: |
| _sub(dest, "email", nf.destinatario.contato.email[:60]) |
|
|
| def _det(self, root_nfe: ET.Element) -> None: |
| nf = self.nf |
| for item in nf.itens: |
| det = _sub(root_nfe, "det") |
| det.set("nItem", str(item.numero_item)) |
|
|
| prod = _sub(det, "prod") |
| _sub(prod, "cProd", item.produto.codigo[:60]) |
| _sub(prod, "cEAN", item.produto.cod_barras or "SEM GTIN") |
| _sub(prod, "xProd", _limpar(item.produto.descricao)[:120]) |
| _sub(prod, "NCM", re.sub(r"\D", "", item.produto.ncm)[:8]) |
| if item.produto.cest: |
| _sub(prod, "CEST", re.sub(r"\D", "", item.produto.cest)) |
| _sub(prod, "CFOP", item.cfop) |
| _sub(prod, "uCom", item.produto.unidade[:6]) |
| _sub(prod, "qCom", _fmt_dec(item.quantidade, 4)) |
| _sub(prod, "vUnCom", _fmt_dec(item.valor_unitario, 10)) |
| _sub(prod, "vProd", _fmt_dec(item.valor_produtos)) |
| _sub(prod, "cEANTrib", item.produto.cod_barras or "SEM GTIN") |
| _sub(prod, "uTrib", item.produto.unidade[:6]) |
| _sub(prod, "qTrib", _fmt_dec(item.quantidade, 4)) |
| _sub(prod, "vUnTrib", _fmt_dec(item.valor_unitario, 10)) |
| if item.frete > 0: |
| _sub(prod, "vFrete", _fmt_dec(item.frete)) |
| if item.seguro > 0: |
| _sub(prod, "vSeg", _fmt_dec(item.seguro)) |
| if item.desconto > 0: |
| _sub(prod, "vDesc", _fmt_dec(item.desconto)) |
| if item.outras_despesas > 0: |
| _sub(prod, "vOutro", _fmt_dec(item.outras_despesas)) |
| _sub(prod, "indTot", "1") |
|
|
| |
| imposto = _sub(det, "imposto") |
| self._icms_item(imposto, item) |
| self._ipi_item(imposto, item) |
| self._pis_item(imposto, item) |
| self._cofins_item(imposto, item) |
|
|
| def _icms_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None: |
| icms = _sub(imposto, "ICMS") |
| cst = item.produto.cst_icms |
|
|
| |
| if cst in {"000"}: |
| grupo = _sub(icms, "ICMS00") |
| _sub(grupo, "orig", "0") |
| _sub(grupo, "CST", cst) |
| _sub(grupo, "modBC", "3") |
| _sub(grupo, "vBC", _fmt_dec(item.base_icms)) |
| _sub(grupo, "pICMS", _fmt_dec(item.produto.aliq_icms, 4)) |
| _sub(grupo, "vICMS", _fmt_dec(item.valor_icms)) |
| elif cst in {"020"}: |
| grupo = _sub(icms, "ICMS20") |
| _sub(grupo, "orig", "0") |
| _sub(grupo, "CST", cst) |
| _sub(grupo, "modBC", "3") |
| _sub(grupo, "pRedBC", _fmt_dec(Decimal("33.33"), 4)) |
| _sub(grupo, "vBC", _fmt_dec(item.base_icms)) |
| _sub(grupo, "pICMS", _fmt_dec(item.produto.aliq_icms, 4)) |
| _sub(grupo, "vICMS", _fmt_dec(item.valor_icms)) |
| elif cst in {"040", "041", "050"}: |
| grupo = _sub(icms, f"ICMS{cst}") |
| _sub(grupo, "orig", "0") |
| _sub(grupo, "CST", cst) |
| elif cst.startswith("1"): |
| grupo = _sub(icms, "ICMSSN102") |
| _sub(grupo, "orig", "0") |
| _sub(grupo, "CSOSN", cst) |
| else: |
| grupo = _sub(icms, "ICMS90") |
| _sub(grupo, "orig", "0") |
| _sub(grupo, "CST", "090") |
| _sub(grupo, "modBC", "3") |
| _sub(grupo, "vBC", _fmt_dec(item.base_icms)) |
| _sub(grupo, "pICMS", _fmt_dec(item.produto.aliq_icms, 4)) |
| _sub(grupo, "vICMS", _fmt_dec(item.valor_icms)) |
|
|
| def _ipi_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None: |
| if item.produto.aliq_ipi == 0 and item.produto.cst_ipi in {"52", "53", "54"}: |
| ipi = _sub(imposto, "IPI") |
| _sub(ipi, "cEnq", "999") |
| ipi_trib = _sub(ipi, "IPITrib") |
| _sub(ipi_trib, "CST", item.produto.cst_ipi) |
| _sub(ipi_trib, "vBC", _fmt_dec(Decimal("0"))) |
| _sub(ipi_trib, "pIPI", _fmt_dec(Decimal("0"), 4)) |
| _sub(ipi_trib, "vIPI", _fmt_dec(Decimal("0"))) |
| return |
|
|
| ipi = _sub(imposto, "IPI") |
| _sub(ipi, "cEnq", "999") |
| ipi_trib = _sub(ipi, "IPITrib") |
| _sub(ipi_trib, "CST", item.produto.cst_ipi) |
| _sub(ipi_trib, "vBC", _fmt_dec(item.base_ipi)) |
| _sub(ipi_trib, "pIPI", _fmt_dec(item.produto.aliq_ipi, 4)) |
| _sub(ipi_trib, "vIPI", _fmt_dec(item.valor_ipi)) |
|
|
| def _pis_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None: |
| pis = _sub(imposto, "PIS") |
| cst = item.produto.cst_pis |
| if cst in {"07", "08", "09"}: |
| pisnt = _sub(pis, "PISNT") |
| _sub(pisnt, "CST", cst) |
| else: |
| pisaliq = _sub(pis, "PISAliq") |
| _sub(pisaliq, "CST", cst) |
| _sub(pisaliq, "vBC", _fmt_dec(item.valor_produtos)) |
| _sub(pisaliq, "pPIS", _fmt_dec(Decimal("0.65"), 4)) |
| _sub(pisaliq, "vPIS", _fmt_dec(Decimal("0"))) |
|
|
| def _cofins_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None: |
| cofins = _sub(imposto, "COFINS") |
| cst = item.produto.cst_cofins |
| if cst in {"07", "08", "09"}: |
| cofinsnt = _sub(cofins, "COFINSNT") |
| _sub(cofinsnt, "CST", cst) |
| else: |
| cofinsaliq = _sub(cofins, "COFINSAliq") |
| _sub(cofinsaliq, "CST", cst) |
| _sub(cofinsaliq, "vBC", _fmt_dec(item.valor_produtos)) |
| _sub(cofinsaliq, "pCOFINS", _fmt_dec(Decimal("3.00"), 4)) |
| _sub(cofinsaliq, "vCOFINS", _fmt_dec(Decimal("0"))) |
|
|
| def _total(self, root_nfe: ET.Element) -> None: |
| nf = self.nf |
| total = _sub(root_nfe, "total") |
| ict = _sub(total, "ICMSTot") |
| _sub(ict, "vBC", _fmt_dec(sum(i.base_icms for i in nf.itens))) |
| _sub(ict, "vICMS", _fmt_dec(nf.valor_icms)) |
| _sub(ict, "vICMSDeson", "0.00") |
| _sub(ict, "vFCPUFDest", "0.00") |
| _sub(ict, "vICMSUFDest", "0.00") |
| _sub(ict, "vICMSUFRemet", "0.00") |
| _sub(ict, "vFCP", "0.00") |
| _sub(ict, "vBCST", "0.00") |
| _sub(ict, "vST", "0.00") |
| _sub(ict, "vFCPST", "0.00") |
| _sub(ict, "vFCPSTRet", "0.00") |
| _sub(ict, "vProd", _fmt_dec(nf.valor_produtos)) |
| _sub(ict, "vFrete", _fmt_dec(nf.valor_frete)) |
| _sub(ict, "vSeg", _fmt_dec(nf.valor_seguro)) |
| _sub(ict, "vDesc", _fmt_dec(nf.valor_desconto)) |
| _sub(ict, "vII", "0.00") |
| _sub(ict, "vIPI", _fmt_dec(nf.valor_ipi)) |
| _sub(ict, "vIPIDevol", "0.00") |
| _sub(ict, "vPIS", "0.00") |
| _sub(ict, "vCOFINS", "0.00") |
| _sub(ict, "vOutro", _fmt_dec(nf.valor_outras_despesas)) |
| _sub(ict, "vNF", _fmt_dec(nf.valor_total)) |
| _sub(ict, "vTotTrib", "0.00") |
|
|
| def _transp(self, root_nfe: ET.Element) -> None: |
| transp = _sub(root_nfe, "transp") |
| _sub(transp, "modFrete", "9") |
|
|
| def _pag(self, root_nfe: ET.Element) -> None: |
| pag = _sub(root_nfe, "pag") |
| detPag = _sub(pag, "detPag") |
| _sub(detPag, "tPag", "01") |
| _sub(detPag, "vPag", _fmt_dec(self.nf.valor_total)) |
|
|
| def _inf_adic(self, root_nfe: ET.Element) -> None: |
| nf = self.nf |
| if nf.info_complementar: |
| inf = _sub(root_nfe, "infAdic") |
| _sub(inf, "infCpl", _limpar(nf.info_complementar)[:500]) |
|
|
| def gerar_xml(self) -> str: |
| |
| chave = self.nf.chave_acesso or self._gerar_chave() |
|
|
| |
| nfeProc = ET.Element("nfeProc") |
| nfeProc.set("xmlns", NS_NFE) |
| nfeProc.set("versao", VERSAO_NFE) |
|
|
| nfe = _sub(nfeProc, "NFe") |
| nfe.set("xmlns", NS_NFE) |
|
|
| infNFe = _sub(nfe, "infNFe") |
| infNFe.set("versao", VERSAO_NFE) |
| infNFe.set("Id", f"NFe{chave}") |
|
|
| self._ide(infNFe, chave) |
| self._emit(infNFe) |
| self._dest(infNFe) |
| self._det(infNFe) |
| self._total(infNFe) |
| self._transp(infNFe) |
| self._pag(infNFe) |
| self._inf_adic(infNFe) |
|
|
| ET.indent(nfeProc, space=" ") |
| return '<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(nfeProc, encoding="unicode") |
|
|
| def salvar(self, diretorio: str | Path = ".") -> Path: |
| chave = self.nf.chave_acesso or self._gerar_chave() |
| caminho = Path(diretorio) / f"NFe{chave}.xml" |
| caminho.parent.mkdir(parents=True, exist_ok=True) |
| caminho.write_text(self.gerar_xml(), encoding="utf-8") |
| return caminho |
|
|
|
|
| class AssinadorNFe: |
| """ |
| Assina digitalmente o XML da NF-e com certificado ICP-Brasil (A1 ou A3). |
| |
| Requer: cryptography + lxml (ou signxml) |
| O certificado deve ser um PKCS#12 (.pfx ou .p12) válido emitido por |
| uma Autoridade Certificadora credenciada pela ICP-Brasil. |
| """ |
|
|
| def __init__(self, cert_path: str, cert_senha: str): |
| self.cert_path = cert_path |
| self.cert_senha = cert_senha |
| self._privkey = None |
| self._cert = None |
| self._carregar_certificado() |
|
|
| def _carregar_certificado(self) -> None: |
| try: |
| from cryptography.hazmat.primitives.serialization import pkcs12 |
| dados = Path(self.cert_path).read_bytes() |
| priv, cert, _ = pkcs12.load_key_and_certificates( |
| dados, self.cert_senha.encode() |
| ) |
| self._privkey = priv |
| self._cert = cert |
| except ImportError: |
| raise RuntimeError("Instale 'cryptography': pip install cryptography") |
| except Exception as e: |
| raise RuntimeError(f"Erro ao carregar certificado: {e}") |
|
|
| def assinar(self, xml_str: str) -> str: |
| """ |
| Assina o XML da NF-e conforme o padrão XMLDSig exigido pela SEFAZ. |
| Retorna o XML assinado como string. |
| """ |
| try: |
| from cryptography.hazmat.primitives import hashes, serialization |
| from cryptography.hazmat.primitives.asymmetric import padding |
| import base64 |
| from lxml import etree |
|
|
| doc = etree.fromstring(xml_str.encode("utf-8")) |
| ns = {"nfe": NS_NFE} |
|
|
| |
| inf_nfe = doc.find(".//nfe:infNFe", ns) |
| if inf_nfe is None: |
| raise ValueError("Elemento infNFe não encontrado no XML") |
|
|
| ref_id = inf_nfe.get("Id", "") |
|
|
| |
| inf_bytes = etree.tostring(inf_nfe, method="c14n", exclusive=True) |
|
|
| |
| digest = hashes.Hash(hashes.SHA1()) |
| digest.update(inf_bytes) |
| digest_value = base64.b64encode(digest.finalize()).decode() |
|
|
| |
| signed_info_xml = ( |
| '<SignedInfo xmlns="http://www.w3.org/2000/09/xmldsig#">' |
| '<CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>' |
| '<SignatureMethod Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1"/>' |
| f'<Reference URI="#{ref_id}">' |
| "<Transforms>" |
| '<Transform Algorithm="http://www.w3.org/2000/09/xmldsig#enveloped-signature"/>' |
| '<Transform Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>' |
| "</Transforms>" |
| '<DigestMethod Algorithm="http://www.w3.org/2000/09/xmldsig#sha1"/>' |
| f"<DigestValue>{digest_value}</DigestValue>" |
| "</Reference>" |
| "</SignedInfo>" |
| ) |
|
|
| si_doc = etree.fromstring(signed_info_xml.encode()) |
| si_bytes = etree.tostring(si_doc, method="c14n", exclusive=True) |
|
|
| |
| sig_bytes = self._privkey.sign(si_bytes, padding.PKCS1v15(), hashes.SHA1()) |
| sig_value = base64.b64encode(sig_bytes).decode() |
|
|
| |
| cert_der = self._cert.public_bytes(serialization.Encoding.DER) |
| cert_b64 = base64.b64encode(cert_der).decode() |
|
|
| |
| signature_xml = ( |
| '<Signature xmlns="http://www.w3.org/2000/09/xmldsig#">' |
| + signed_info_xml |
| + f"<SignatureValue>{sig_value}</SignatureValue>" |
| "<KeyInfo>" |
| f"<X509Data><X509Certificate>{cert_b64}</X509Certificate></X509Data>" |
| "</KeyInfo>" |
| "</Signature>" |
| ) |
|
|
| sig_el = etree.fromstring(signature_xml.encode()) |
| inf_nfe.append(sig_el) |
|
|
| return etree.tostring(doc, xml_declaration=True, encoding="unicode") |
|
|
| except ImportError: |
| raise RuntimeError("Instale 'lxml' e 'cryptography': pip install lxml cryptography") |
|
|
| def assinar_arquivo(self, xml_path: Path, destino: Optional[Path] = None) -> Path: |
| xml_str = xml_path.read_text(encoding="utf-8") |
| assinado = self.assinar(xml_str) |
| saida = destino or xml_path.with_suffix(".assinado.xml") |
| saida.write_text(assinado, encoding="utf-8") |
| return saida |
|
|