from __future__ import annotations


def build_aggs(config) -> dict:
    """Build the ``aggs`` section of a search request from the facet config.

    Each entry of ``config.facet_fields`` yields one aggregation keyed
    ``por_<name>``, where ``<name>`` is the facet's ``field`` (or ``dest`` when
    ``field`` is missing/empty) with dots replaced by underscores.

    * Facets with a truthy ``facet_histogram`` attribute become a yearly
      ``date_histogram`` with ``min_doc_count`` 1.
    * All other facets become a ``terms`` aggregation of ``facet_size``
      buckets (default 10); ``keyword_text`` facets aggregate on the
      ``<name>.raw`` sub-field.

    Args:
        config: Object exposing ``facet_fields``, an iterable of facet
            descriptors.

    Returns:
        Mapping of aggregation name to aggregation body.
    """
    aggs = {}
    for facet in config.facet_fields:
        name = getattr(facet, "field", None) or getattr(facet, "dest", None)
        if not name:
            # A descriptor with neither ``field`` nor ``dest`` has nothing to
            # aggregate on; skip it instead of failing on ``None.replace``.
            continue

        agg_key = f"por_{name.replace('.', '_')}"
        if getattr(facet, "facet_histogram", False):
            aggs[agg_key] = {
                "date_histogram": {
                    "field": name,
                    "calendar_interval": "year",
                    "min_doc_count": 1,
                }
            }
        else:
            agg_field = f"{name}.raw" if facet.es_type == "keyword_text" else name
            aggs[agg_key] = {
                "terms": {
                    "field": agg_field,
                    "size": getattr(facet, "facet_size", 10),
                }
            }
    return aggs


def build_search_query(
    q: str,
    filters: dict | None = None,
    page: int = 1,
    size: int = 10,
    com_facets: bool = True,
    config=None,
) -> dict:
    """Build the full-text search request body.

    The query is a ``bool`` with a fuzzy ``multi_match`` (``best_fields``,
    fuzziness ``AUTO``, ``prefix_length`` 2) in ``must`` and one ``term``
    clause per entry of ``filters`` in ``filter``.

    Args:
        q: Text to search for.
        filters: Optional mapping of field name to exact value. A field whose
            descriptor in ``config.campos_filter`` has ``es_type`` equal to
            ``"keyword_text"`` is filtered on its ``.raw`` sub-field.
        page: 1-based page number. Values below 1 are treated as 1 so the
            computed ``from`` offset is never negative.
        size: Page size, also used to compute the ``from`` offset.
        com_facets: When true and ``config`` is given, add the aggregations
            produced by :func:`build_aggs`.
        config: Optional search configuration. Without it the query searches
            all fields (``["*"]``), excludes nothing from ``_source`` and adds
            neither highlighting nor aggregations.

    Returns:
        The request body as a dict.
    """
    fields = config.search_fields_es if config else ["*"]
    excludes = config.source_excludes if config else []

    filter_clauses = []
    for campo, valor in (filters or {}).items():
        descriptor = next(
            (
                fd
                for fd in (config.campos_filter if config else [])
                if getattr(fd, "field", None) == campo
                or getattr(fd, "dest", None) == campo
            ),
            None,
        )
        if descriptor and descriptor.es_type == "keyword_text":
            filter_clauses.append({"term": {f"{campo}.raw": valor}})
        else:
            filter_clauses.append({"term": {campo: valor}})

    # Clamp the page so ``from`` never goes negative (page <= 0 -> first page).
    offset = (max(page, 1) - 1) * size

    qb = {
        "from": offset,
        "size": size,
        "query": {
            "bool": {
                "must": {
                    "multi_match": {
                        "query": q,
                        "fields": fields,
                        "type": "best_fields",
                        "fuzziness": "AUTO",
                        "prefix_length": 2,
                    }
                },
                "filter": filter_clauses,
            }
        },
        "_source": {"excludes": excludes} if excludes else {},
    }

    if config:
        # Highlight only search fields that have a text-like descriptor.
        # ``getattr`` mirrors the filter lookup above: descriptors that only
        # define ``dest`` must not break highlight construction.
        highlight_fields = {
            sf.field: {"number_of_fragments": 2, "fragment_size": 150}
            for sf in config.search_fields
            if any(
                getattr(fd, "field", None) == sf.field
                and fd.es_type in ("text", "keyword_text")
                for fd in config.campos_filter
            )
        }
        if highlight_fields:
            qb["highlight"] = {
                "fields": highlight_fields,
                "pre_tags": [""],
                "post_tags": [""],
            }

    if com_facets and config:
        qb["aggs"] = build_aggs(config)

    return qb


def build_autocomplete_query(q: str, size: int, config) -> dict:
    """Build the autocomplete request body.

    Uses at most the first three entries of ``config.suggest_fields``; when
    that is empty it falls back to the first entry of ``config.search_fields``
    (if any). Each field is matched on its ``.autocomplete`` sub-field, with
    the first field boosted to 2.0 and the others at 1.0.

    Args:
        q: Partial text typed by the user.
        size: Maximum number of hits to return.
        config: Object exposing ``suggest_fields`` and ``search_fields``.

    Returns:
        The request body as a dict. ``_source`` is limited to the fields used.
    """
    suggest_fields = config.suggest_fields or (
        [config.search_fields[0].field] if config.search_fields else []
    )
    used_fields = suggest_fields[:3]
    should_clauses = [
        {
            "match": {
                f"{field}.autocomplete": {
                    "query": q,
                    "boost": 2.0 if position == 0 else 1.0,
                }
            }
        }
        for position, field in enumerate(used_fields)
    ]
    return {
        "size": size,
        "_source": used_fields,
        "query": {"bool": {"should": should_clauses}},
    }