Speed / src /generators /nfe_xml.py
LesterCerioli's picture
Building Speed LLM
d9d7b41
Raw
History Blame Contribute Delete
18.4 kB
from __future__ import annotations
import hashlib
import re
from datetime import datetime, timezone
from decimal import Decimal
from pathlib import Path
from typing import Optional
from xml.etree import ElementTree as ET
from src.fiscal.entities import Empresa, ItemNotaFiscal, NotaFiscal
NS_NFE = "http://www.portalfiscal.inf.br/nfe"
VERSAO_NFE = "4.00"
def _sub(pai: ET.Element, tag: str, texto: str = "") -> ET.Element:
el = ET.SubElement(pai, tag)
if texto:
el.text = str(texto)
return el
def _fmt_dec(v: Decimal, casas: int = 2) -> str:
return f"{v:.{casas}f}"
def _limpar(s: str) -> str:
return re.sub(r"[^\w\s\-.,/@]", "", str(s))[:60]
class GeradorNFeXML:
def __init__(self, nf: NotaFiscal, ambiente: str = "2"):
self.nf = nf
self.ambiente = ambiente # 1=Produção, 2=Homologação
self._cuf_map = {
"AC": "12", "AL": "27", "AP": "16", "AM": "13", "BA": "29",
"CE": "23", "DF": "53", "ES": "32", "GO": "52", "MA": "21",
"MT": "51", "MS": "50", "MG": "31", "PA": "15", "PB": "25",
"PR": "41", "PE": "26", "PI": "22", "RJ": "33", "RN": "24",
"RS": "43", "RO": "11", "RR": "14", "SC": "42", "SP": "35",
"SE": "28", "TO": "17",
}
def _cuf(self, uf: str) -> str:
return self._cuf_map.get(uf.upper(), "35")
def _gerar_chave(self) -> str:
"""Gera a chave de acesso de 44 dígitos da NF-e."""
nf = self.nf
uf = self._cuf(nf.emitente.endereco.uf.value)
aamm = nf.data_emissao.strftime("%y%m")
cnpj = nf.emitente.cnpj.zfill(14)
mod = nf.modelo.zfill(2)
serie = nf.serie.zfill(3)
numero = nf.numero.zfill(9)
tp_emis = "1" # Emissão normal
c_nf = hashlib.md5(f"{cnpj}{numero}".encode()).hexdigest()[:8].upper()
c_nf_digits = "".join(str(int(c, 16) % 10) for c in c_nf)[:8]
chave_sem_dv = f"{uf}{aamm}{cnpj}{mod}{serie}{numero}{tp_emis}{c_nf_digits}"
pesos = [2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4,
5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4, 5, 6, 7, 8, 9, 2, 3, 4]
soma = sum(int(chave_sem_dv[i]) * pesos[i] for i in range(43))
resto = soma % 11
dv = 0 if resto in (0, 1) else 11 - resto
return chave_sem_dv + str(dv)
def _ide(self, root_nfe: ET.Element, chave: str) -> None:
nf = self.nf
ide = _sub(root_nfe, "ide")
_sub(ide, "cUF", self._cuf(nf.emitente.endereco.uf.value))
_sub(ide, "cNF", chave[35:43])
_sub(ide, "natOp", _limpar(nf.natureza_operacao))
_sub(ide, "mod", nf.modelo)
_sub(ide, "serie", nf.serie.zfill(3))
_sub(ide, "nNF", nf.numero.zfill(9))
_sub(ide, "dhEmi", datetime.combine(nf.data_emissao, datetime.min.time()).strftime("%Y-%m-%dT%H:%M:%S") + "-03:00")
_sub(ide, "dhSaiEnt", datetime.combine(nf.data_saida_entrada, datetime.min.time()).strftime("%Y-%m-%dT%H:%M:%S") + "-03:00")
_sub(ide, "tpNF", nf.tipo_operacao)
_sub(ide, "idDest", "1") # 1=Operação interna
_sub(ide, "cMunFG", nf.emitente.endereco.cod_municipio or "3550308")
_sub(ide, "tpImp", "1") # 1=DANFE normal retrato
_sub(ide, "tpEmis", "1") # 1=Emissão normal
_sub(ide, "cDV", chave[-1])
_sub(ide, "tpAmb", self.ambiente)
_sub(ide, "finNFe", "1") # 1=NF-e normal
_sub(ide, "indFinal", "0") # 0=Normal
_sub(ide, "indPres", nf.ind_pagamento)
_sub(ide, "procEmi", "0") # 0=Emissão de NF-e com aplicativo do contribuinte
_sub(ide, "verProc", "1.0.0")
def _endereco(self, pai: ET.Element, emp: Empresa, prefixo: str = "") -> None:
end = emp.endereco
ender = _sub(pai, f"ender{prefixo}")
_sub(ender, "xLgr", _limpar(end.logradouro))
_sub(ender, "nro", end.numero)
if end.complemento:
_sub(ender, "xCpl", _limpar(end.complemento))
_sub(ender, "xBairro", _limpar(end.bairro))
_sub(ender, "cMun", end.cod_municipio or "3550308")
_sub(ender, "xMun", _limpar(end.municipio))
_sub(ender, "UF", end.uf.value)
_sub(ender, "CEP", re.sub(r"\D", "", end.cep))
_sub(ender, "cPais", "1058")
_sub(ender, "xPais", "BRASIL")
if emp.contato.telefone:
_sub(ender, "fone", re.sub(r"\D", "", emp.contato.telefone))
def _emit(self, root_nfe: ET.Element) -> None:
nf = self.nf
emit = _sub(root_nfe, "emit")
_sub(emit, "CNPJ", nf.emitente.cnpj)
_sub(emit, "xNome", _limpar(nf.emitente.razao_social)[:60])
if nf.emitente.nome_fantasia:
_sub(emit, "xFant", _limpar(nf.emitente.nome_fantasia)[:60])
self._endereco(emit, nf.emitente, "Emit")
if nf.emitente.ie:
_sub(emit, "IE", re.sub(r"\D", "", nf.emitente.ie))
_sub(emit, "CRT", nf.emitente.regime_tributario.value[:1])
def _dest(self, root_nfe: ET.Element) -> None:
nf = self.nf
dest = _sub(root_nfe, "dest")
_sub(dest, "CNPJ", nf.destinatario.cnpj)
_sub(dest, "xNome", _limpar(nf.destinatario.razao_social)[:60])
self._endereco(dest, nf.destinatario, "Dest")
if nf.destinatario.ie:
_sub(dest, "IE", re.sub(r"\D", "", nf.destinatario.ie))
_sub(dest, "indIEDest", "1") # 1=Contribuinte ICMS
if nf.destinatario.contato.email:
_sub(dest, "email", nf.destinatario.contato.email[:60])
def _det(self, root_nfe: ET.Element) -> None:
nf = self.nf
for item in nf.itens:
det = _sub(root_nfe, "det")
det.set("nItem", str(item.numero_item))
prod = _sub(det, "prod")
_sub(prod, "cProd", item.produto.codigo[:60])
_sub(prod, "cEAN", item.produto.cod_barras or "SEM GTIN")
_sub(prod, "xProd", _limpar(item.produto.descricao)[:120])
_sub(prod, "NCM", re.sub(r"\D", "", item.produto.ncm)[:8])
if item.produto.cest:
_sub(prod, "CEST", re.sub(r"\D", "", item.produto.cest))
_sub(prod, "CFOP", item.cfop)
_sub(prod, "uCom", item.produto.unidade[:6])
_sub(prod, "qCom", _fmt_dec(item.quantidade, 4))
_sub(prod, "vUnCom", _fmt_dec(item.valor_unitario, 10))
_sub(prod, "vProd", _fmt_dec(item.valor_produtos))
_sub(prod, "cEANTrib", item.produto.cod_barras or "SEM GTIN")
_sub(prod, "uTrib", item.produto.unidade[:6])
_sub(prod, "qTrib", _fmt_dec(item.quantidade, 4))
_sub(prod, "vUnTrib", _fmt_dec(item.valor_unitario, 10))
if item.frete > 0:
_sub(prod, "vFrete", _fmt_dec(item.frete))
if item.seguro > 0:
_sub(prod, "vSeg", _fmt_dec(item.seguro))
if item.desconto > 0:
_sub(prod, "vDesc", _fmt_dec(item.desconto))
if item.outras_despesas > 0:
_sub(prod, "vOutro", _fmt_dec(item.outras_despesas))
_sub(prod, "indTot", "1") # 1=Valor do item compõe o total
imposto = _sub(det, "imposto")
self._icms_item(imposto, item)
self._ipi_item(imposto, item)
self._pis_item(imposto, item)
self._cofins_item(imposto, item)
def _icms_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None:
icms = _sub(imposto, "ICMS")
cst = item.produto.cst_icms
if cst in {"000"}:
grupo = _sub(icms, "ICMS00")
_sub(grupo, "orig", "0")
_sub(grupo, "CST", cst)
_sub(grupo, "modBC", "3")
_sub(grupo, "vBC", _fmt_dec(item.base_icms))
_sub(grupo, "pICMS", _fmt_dec(item.produto.aliq_icms, 4))
_sub(grupo, "vICMS", _fmt_dec(item.valor_icms))
elif cst in {"020"}:
grupo = _sub(icms, "ICMS20")
_sub(grupo, "orig", "0")
_sub(grupo, "CST", cst)
_sub(grupo, "modBC", "3")
_sub(grupo, "pRedBC", _fmt_dec(Decimal("33.33"), 4))
_sub(grupo, "vBC", _fmt_dec(item.base_icms))
_sub(grupo, "pICMS", _fmt_dec(item.produto.aliq_icms, 4))
_sub(grupo, "vICMS", _fmt_dec(item.valor_icms))
elif cst in {"040", "041", "050"}:
grupo = _sub(icms, f"ICMS{cst}")
_sub(grupo, "orig", "0")
_sub(grupo, "CST", cst)
elif cst.startswith("1"): # Simples Nacional
grupo = _sub(icms, "ICMSSN102")
_sub(grupo, "orig", "0")
_sub(grupo, "CSOSN", cst)
else:
grupo = _sub(icms, "ICMS90")
_sub(grupo, "orig", "0")
_sub(grupo, "CST", "090")
_sub(grupo, "modBC", "3")
_sub(grupo, "vBC", _fmt_dec(item.base_icms))
_sub(grupo, "pICMS", _fmt_dec(item.produto.aliq_icms, 4))
_sub(grupo, "vICMS", _fmt_dec(item.valor_icms))
def _ipi_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None:
if item.produto.aliq_ipi == 0 and item.produto.cst_ipi in {"52", "53", "54"}:
ipi = _sub(imposto, "IPI")
_sub(ipi, "cEnq", "999")
ipi_trib = _sub(ipi, "IPITrib")
_sub(ipi_trib, "CST", item.produto.cst_ipi)
_sub(ipi_trib, "vBC", _fmt_dec(Decimal("0")))
_sub(ipi_trib, "pIPI", _fmt_dec(Decimal("0"), 4))
_sub(ipi_trib, "vIPI", _fmt_dec(Decimal("0")))
return
ipi = _sub(imposto, "IPI")
_sub(ipi, "cEnq", "999")
ipi_trib = _sub(ipi, "IPITrib")
_sub(ipi_trib, "CST", item.produto.cst_ipi)
_sub(ipi_trib, "vBC", _fmt_dec(item.base_ipi))
_sub(ipi_trib, "pIPI", _fmt_dec(item.produto.aliq_ipi, 4))
_sub(ipi_trib, "vIPI", _fmt_dec(item.valor_ipi))
def _pis_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None:
pis = _sub(imposto, "PIS")
cst = item.produto.cst_pis
if cst in {"07", "08", "09"}:
pisnt = _sub(pis, "PISNT")
_sub(pisnt, "CST", cst)
else:
pisaliq = _sub(pis, "PISAliq")
_sub(pisaliq, "CST", cst)
_sub(pisaliq, "vBC", _fmt_dec(item.valor_produtos))
_sub(pisaliq, "pPIS", _fmt_dec(Decimal("0.65"), 4))
_sub(pisaliq, "vPIS", _fmt_dec(Decimal("0")))
def _cofins_item(self, imposto: ET.Element, item: ItemNotaFiscal) -> None:
cofins = _sub(imposto, "COFINS")
cst = item.produto.cst_cofins
if cst in {"07", "08", "09"}:
cofinsnt = _sub(cofins, "COFINSNT")
_sub(cofinsnt, "CST", cst)
else:
cofinsaliq = _sub(cofins, "COFINSAliq")
_sub(cofinsaliq, "CST", cst)
_sub(cofinsaliq, "vBC", _fmt_dec(item.valor_produtos))
_sub(cofinsaliq, "pCOFINS", _fmt_dec(Decimal("3.00"), 4))
_sub(cofinsaliq, "vCOFINS", _fmt_dec(Decimal("0")))
def _total(self, root_nfe: ET.Element) -> None:
nf = self.nf
total = _sub(root_nfe, "total")
ict = _sub(total, "ICMSTot")
_sub(ict, "vBC", _fmt_dec(sum(i.base_icms for i in nf.itens)))
_sub(ict, "vICMS", _fmt_dec(nf.valor_icms))
_sub(ict, "vICMSDeson", "0.00")
_sub(ict, "vFCPUFDest", "0.00")
_sub(ict, "vICMSUFDest", "0.00")
_sub(ict, "vICMSUFRemet", "0.00")
_sub(ict, "vFCP", "0.00")
_sub(ict, "vBCST", "0.00")
_sub(ict, "vST", "0.00")
_sub(ict, "vFCPST", "0.00")
_sub(ict, "vFCPSTRet", "0.00")
_sub(ict, "vProd", _fmt_dec(nf.valor_produtos))
_sub(ict, "vFrete", _fmt_dec(nf.valor_frete))
_sub(ict, "vSeg", _fmt_dec(nf.valor_seguro))
_sub(ict, "vDesc", _fmt_dec(nf.valor_desconto))
_sub(ict, "vII", "0.00")
_sub(ict, "vIPI", _fmt_dec(nf.valor_ipi))
_sub(ict, "vIPIDevol", "0.00")
_sub(ict, "vPIS", "0.00")
_sub(ict, "vCOFINS", "0.00")
_sub(ict, "vOutro", _fmt_dec(nf.valor_outras_despesas))
_sub(ict, "vNF", _fmt_dec(nf.valor_total))
_sub(ict, "vTotTrib", "0.00")
def _transp(self, root_nfe: ET.Element) -> None:
transp = _sub(root_nfe, "transp")
_sub(transp, "modFrete", "9") # 9=Sem frete
def _pag(self, root_nfe: ET.Element) -> None:
pag = _sub(root_nfe, "pag")
detPag = _sub(pag, "detPag")
_sub(detPag, "tPag", "01") # 01=Dinheiro
_sub(detPag, "vPag", _fmt_dec(self.nf.valor_total))
def _inf_adic(self, root_nfe: ET.Element) -> None:
nf = self.nf
if nf.info_complementar:
inf = _sub(root_nfe, "infAdic")
_sub(inf, "infCpl", _limpar(nf.info_complementar)[:500])
def gerar_xml(self) -> str:
chave = self.nf.chave_acesso or self._gerar_chave()
nfeProc = ET.Element("nfeProc")
nfeProc.set("xmlns", NS_NFE)
nfeProc.set("versao", VERSAO_NFE)
nfe = _sub(nfeProc, "NFe")
nfe.set("xmlns", NS_NFE)
infNFe = _sub(nfe, "infNFe")
infNFe.set("versao", VERSAO_NFE)
infNFe.set("Id", f"NFe{chave}")
self._ide(infNFe, chave)
self._emit(infNFe)
self._dest(infNFe)
self._det(infNFe)
self._total(infNFe)
self._transp(infNFe)
self._pag(infNFe)
self._inf_adic(infNFe)
ET.indent(nfeProc, space=" ")
return '<?xml version="1.0" encoding="UTF-8"?>\n' + ET.tostring(nfeProc, encoding="unicode")
def salvar(self, diretorio: str | Path = ".") -> Path:
chave = self.nf.chave_acesso or self._gerar_chave()
caminho = Path(diretorio) / f"NFe{chave}.xml"
caminho.parent.mkdir(parents=True, exist_ok=True)
caminho.write_text(self.gerar_xml(), encoding="utf-8")
return caminho
class AssinadorNFe:
"""
Assina digitalmente o XML da NF-e com certificado ICP-Brasil (A1 ou A3).
Requer: cryptography + lxml (ou signxml)
O certificado deve ser um PKCS#12 (.pfx ou .p12) válido emitido por
uma Autoridade Certificadora credenciada pela ICP-Brasil.
"""
def __init__(self, cert_path: str, cert_senha: str):
self.cert_path = cert_path
self.cert_senha = cert_senha
self._privkey = None
self._cert = None
self._carregar_certificado()
def _carregar_certificado(self) -> None:
try:
from cryptography.hazmat.primitives.serialization import pkcs12
dados = Path(self.cert_path).read_bytes()
priv, cert, _ = pkcs12.load_key_and_certificates(
dados, self.cert_senha.encode()
)
self._privkey = priv
self._cert = cert
except ImportError:
raise RuntimeError("Instale 'cryptography': pip install cryptography")
except Exception as e:
raise RuntimeError(f"Erro ao carregar certificado: {e}")
def assinar(self, xml_str: str) -> str:
"""
Assina o XML da NF-e conforme o padrão XMLDSig exigido pela SEFAZ.
Retorna o XML assinado como string.
"""
try:
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
import base64
from lxml import etree
doc = etree.fromstring(xml_str.encode("utf-8"))
ns = {"nfe": NS_NFE}
inf_nfe = doc.find(".//nfe:infNFe", ns)
if inf_nfe is None:
raise ValueError("Elemento infNFe não encontrado no XML")
ref_id = inf_nfe.get("Id", "")
inf_bytes = etree.tostring(inf_nfe, method="c14n", exclusive=True)
digest = hashes.Hash(hashes.SHA1())
digest.update(inf_bytes)
digest_value = base64.b64encode(digest.finalize()).decode()
signed_info_xml = (
'<SignedInfo xmlns="http://www.w3.org/2000/09/xmldsig#">'
'<CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>'
'<SignatureMethod Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1"/>'
f'<Reference URI="#{ref_id}">'
"<Transforms>"
'<Transform Algorithm="http://www.w3.org/2000/09/xmldsig#enveloped-signature"/>'
'<Transform Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>'
"</Transforms>"
'<DigestMethod Algorithm="http://www.w3.org/2000/09/xmldsig#sha1"/>'
f"<DigestValue>{digest_value}</DigestValue>"
"</Reference>"
"</SignedInfo>"
)
si_doc = etree.fromstring(signed_info_xml.encode())
si_bytes = etree.tostring(si_doc, method="c14n", exclusive=True)
sig_bytes = self._privkey.sign(si_bytes, padding.PKCS1v15(), hashes.SHA1())
sig_value = base64.b64encode(sig_bytes).decode()
cert_der = self._cert.public_bytes(serialization.Encoding.DER)
cert_b64 = base64.b64encode(cert_der).decode()
signature_xml = (
'<Signature xmlns="http://www.w3.org/2000/09/xmldsig#">'
+ signed_info_xml
+ f"<SignatureValue>{sig_value}</SignatureValue>"
"<KeyInfo>"
f"<X509Data><X509Certificate>{cert_b64}</X509Certificate></X509Data>"
"</KeyInfo>"
"</Signature>"
)
sig_el = etree.fromstring(signature_xml.encode())
inf_nfe.append(sig_el)
return etree.tostring(doc, xml_declaration=True, encoding="unicode")
except ImportError:
raise RuntimeError("Instale 'lxml' e 'cryptography': pip install lxml cryptography")
def assinar_arquivo(self, xml_path: Path, destino: Optional[Path] = None) -> Path:
xml_str = xml_path.read_text(encoding="utf-8")
assinado = self.assinar(xml_str)
saida = destino or xml_path.with_suffix(".assinado.xml")
saida.write_text(assinado, encoding="utf-8")
return saida