para.AI_ASSUNTOS_CNJ / app /core /query_builder.py
Carlexxx
para.AI beta
d456722
from __future__ import annotations
def build_aggs(config):
aggs={}
for ff in config.facet_fields:
name=getattr(ff,"field",None) or getattr(ff,"dest",None); size=getattr(ff,"facet_size",10); key=name.replace(".","_")
if getattr(ff,"facet_histogram",False): aggs[f"por_{key}"]={"date_histogram":{"field":name,"calendar_interval":"year","min_doc_count":1}}
else:
af=f"{name}.raw" if ff.es_type=="keyword_text" else name; aggs[f"por_{key}"]={"terms":{"field":af,"size":size}}
return aggs
def build_search_query(q,filters=None,page=1,size=10,com_facets=True,config=None):
fields=config.search_fields_es if config else ["*"]; excl=config.source_excludes if config else []
fc=[]
for campo,valor in (filters or {}).items():
fd=next((f for f in (config.campos_filter if config else []) if getattr(f,"field",None)==campo or getattr(f,"dest",None)==campo),None)
fc.append({"term":{f"{campo}.raw":valor}} if fd and fd.es_type=="keyword_text" else {"term":{campo:valor}})
qb={"from":(page-1)*size,"size":size,"query":{"bool":{"must":{"multi_match":{"query":q,"fields":fields,"type":"best_fields","fuzziness":"AUTO","prefix_length":2}},"filter":fc}},"_source":{"excludes":excl} if excl else {}}
if config:
hl={sf.field:{"number_of_fragments":2,"fragment_size":150} for sf in config.search_fields if any(fd.field==sf.field and fd.es_type in("text","keyword_text") for fd in config.campos_filter)}
if hl: qb["highlight"]={"fields":hl,"pre_tags":["<mark>"],"post_tags":["</mark>"]}
if com_facets and config: qb["aggs"]=build_aggs(config)
return qb
def build_autocomplete_query(q,size,config):
sf=config.suggest_fields or ([config.search_fields[0].field] if config.search_fields else [])
sh=[{"match":{f"{f}.autocomplete":{"query":q,"boost":2.0 if i==0 else 1.0}}} for i,f in enumerate(sf[:3])]
return {"size":size,"_source":sf[:3],"query":{"bool":{"should":sh}}}