brainsqueeze committed on
Commit
d916808
·
verified ·
1 Parent(s): fc7f1bb

Delete ask_candid/tools/elastic

Browse files
ask_candid/tools/elastic/__init__.py DELETED
File without changes
ask_candid/tools/elastic/index_data_tool.py DELETED
from typing import Type, Optional
import logging

from pydantic import BaseModel, Field

from elasticsearch import Elasticsearch

from langchain.callbacks.manager import CallbackManagerForToolRun
from langchain.tools.base import BaseTool
from ask_candid.base.config.connections import SEMANTIC_ELASTIC_QA

# NOTE(review): basicConfig at import time configures the root logger for the
# whole process; consider moving this to the application entry point.
logging.basicConfig(level="INFO")
logger = logging.getLogger("elasticsearch_playground")
# Module-level Elasticsearch client shared by the tools below
# (Elastic Cloud connection, credentials from SEMANTIC_ELASTIC_QA).
es = Elasticsearch(
    cloud_id=SEMANTIC_ELASTIC_QA.cloud_id,
    api_key=SEMANTIC_ELASTIC_QA.api_key,
    verify_certs=True,
    request_timeout=60 * 3  # 3-minute request timeout for slow queries
)
class IndexShowDataInput(BaseModel):
    """Arguments accepted by the index show data tool."""

    # Index to pull a sample of documents from.
    index_name: str = Field(
        ..., description="The name of the index for which the data is to be retrieved"
    )
class IndexShowDataTool(BaseTool):
    """Tool for getting a list of entries from an ElasticSearch index, helpful to figure out what data is available."""

    name: str = "elastic_index_show_data"
    description: str = (
        "Input is an index name, output is a JSON based string with an extract of the data of the index"
    )
    args_schema: Optional[Type[BaseModel]] = IndexShowDataInput

    def _run(
        self,
        index_name: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Return a sample of up to 20 documents from ``index_name``.

        Parameters
        ----------
        index_name : str
            Name of the Elasticsearch index to sample.
        run_manager : Optional[CallbackManagerForToolRun]
            LangChain callback manager (unused here).

        Returns
        -------
        str
            ``str(res["hits"])`` of a match-all query, or ``""`` when the
            search fails for any reason (the error is logged).
        """
        # Fixed: the previous docstring wrongly said "Get all indices";
        # this method samples data from a single index.
        try:
            res = es.search(
                index=index_name,
                from_=0,
                size=20,
                query={"match_all": {}},
            )
            return str(res["hits"])
        except Exception:
            # Fixed: removed stray print(e); logger.exception already records
            # the message and full traceback. Broad catch is deliberate:
            # the tool degrades to an empty answer instead of crashing.
            logger.exception("Could not fetch index data for %s", index_name)
            return ""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ask_candid/tools/elastic/index_details_tool.py DELETED
from typing import Type, Optional
import logging

from pydantic import BaseModel, Field

from elasticsearch import Elasticsearch

from langchain.callbacks.manager import (
    AsyncCallbackManagerForToolRun,
    CallbackManagerForToolRun,
)
from langchain.tools.base import BaseTool
from ask_candid.base.config.connections import SEMANTIC_ELASTIC_QA


# NOTE(review): basicConfig at import time configures the root logger for the
# whole process; consider moving this to the application entry point.
logging.basicConfig(level="INFO")
logger = logging.getLogger("elasticsearch_playground")
# Module-level Elasticsearch client shared by the tools below
# (Elastic Cloud connection, credentials from SEMANTIC_ELASTIC_QA).
es = Elasticsearch(
    cloud_id=SEMANTIC_ELASTIC_QA.cloud_id,
    api_key=SEMANTIC_ELASTIC_QA.api_key,
    verify_certs=True,
    request_timeout=60 * 3,  # 3-minute request timeout for slow queries
)
class IndexDetailsInput(BaseModel):
    """Arguments accepted by the list index details tool."""

    # Index whose aliases, field mappings and settings should be fetched.
    index_name: str = Field(
        ...,
        description="The name of the index for which the details are to be retrieved",
    )
class IndexDetailsTool(BaseTool):
    """Tool for getting information about a single ElasticSearch index."""

    name: str = "elastic_index_show_details"
    description: str = (
        "Input is an index name, output is a JSON-based string with the aliases, mappings containing the field names, and settings of an index."
    )
    args_schema: Optional[Type[BaseModel]] = IndexDetailsInput

    def _run(
        self,
        index_name: str,
        run_manager: Optional[CallbackManagerForToolRun] = None,
    ) -> str:
        """Return aliases, field mappings and settings for ``index_name``.

        Parameters
        ----------
        index_name : str
            Name of the Elasticsearch index to describe.
        run_manager : Optional[CallbackManagerForToolRun]
            LangChain callback manager (unused here).

        Returns
        -------
        str
            Stringified dict with ``alias``, ``field_mappings`` and
            ``settings`` keys, or ``""`` when any lookup fails
            (the error is logged with its traceback).
        """
        try:
            alias = es.indices.get_alias(index=index_name)
            field_mappings = es.indices.get_field_mapping(index=index_name, fields="*")
            field_settings = es.indices.get_settings(index=index_name)
            return str(
                {
                    "alias": alias[index_name],
                    "field_mappings": field_mappings[index_name],
                    "settings": field_settings[index_name],
                }
            )
        except Exception:
            # Fixed: logger.exception already appends the traceback, so the
            # redundant "%s", e argument was dropped.
            logger.exception("Could not fetch index information for %s", index_name)
            return ""

    async def _arun(
        self,
        index_name: str = "",
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        """Async execution is not supported for this tool."""
        raise NotImplementedError("IndexDetailsTool does not support async operations")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ask_candid/tools/elastic/index_search_tool.py DELETED
import logging
import json

import tiktoken
from elasticsearch import Elasticsearch

# from pydantic.v1 import BaseModel, Field # <-- Uses v1 namespace
from pydantic import BaseModel, Field
from langchain.tools import StructuredTool

from ask_candid.base.config.connections import SEMANTIC_ELASTIC_QA

# NOTE(review): basicConfig at import time configures the root logger for the
# whole process; consider moving this to the application entry point.
logging.basicConfig(level="INFO")
logger = logging.getLogger("elasticsearch_playground")
# Module-level Elasticsearch client shared by the search tool below
# (Elastic Cloud connection, credentials from SEMANTIC_ELASTIC_QA).
es = Elasticsearch(
    cloud_id=SEMANTIC_ELASTIC_QA.cloud_id,
    api_key=SEMANTIC_ELASTIC_QA.api_key,
    verify_certs=True,
    request_timeout=60 * 3,  # 3-minute request timeout for slow queries
)
class SearchToolInput(BaseModel):
    """Arguments accepted by the Elasticsearch search tool."""

    # Target index for the search request.
    index_name: str = Field(
        ..., description="The name of the index for which the data is to be retrieved"
    )
    # Raw JSON request body; may hold query/aggs/sort sections.
    query: str = Field(
        ...,
        description="The ElasticSearch JSON query used to filter all hits. Should use the _source field if possible to specify required fields.",
    )
    # Pagination offset of the first hit.
    from_: int = Field(
        ..., description="The record index from which the query will start"
    )
    # Requested number of hits per page.
    size: int = Field(
        ...,
        description="How many records will be retrieved from the ElasticSearch query",
    )
def elastic_search(
    pcs_codes: dict,
    index_name: str,
    query: str,
    from_: int = 0,
    size: int = 20,
):
    """Execute a query on an ElasticSearch index and return hits and/or aggregation results.

    Parameters
    ----------
    pcs_codes : dict
        Currently unused; kept for interface compatibility with callers
        (bound in by ``create_search_tool``).
    index_name : str
        Target index name.
    query : str
        JSON-encoded Elasticsearch request body. May contain ``query``,
        ``aggs`` and/or ``sort`` sections; a bare clause is treated as the
        ``query`` section.
    from_ : int
        Offset of the first record to return.
    size : int
        Requested number of hits (capped at 50).

    Returns
    -------
    str
        Stringified hits and/or aggregations; the exception message when the
        request fails; or the last (possibly oversized) result when the
        token budget could not be met within the retry limit.
    """
    size = min(50, size)
    encoding = tiktoken.encoding_for_model("gpt-4")
    try:
        full_dict: dict = json.loads(query)
        query_dict = None
        aggs_dict = None
        sort_dict = None
        if "query" in full_dict:
            query_dict = full_dict["query"]
        if "aggs" in full_dict:
            aggs_dict = full_dict["aggs"]
        if "sort" in full_dict:
            sort_dict = full_dict["sort"]
        if query_dict is None and aggs_dict is None and sort_dict is None:
            # Assume that there is a query but that the query part was omitted.
            query_dict = full_dict
        if query_dict is None and aggs_dict is not None:
            # Aggregation-only query: hits are not the payload of interest.
            # NOTE(review): size is set to 200 here despite the original
            # comment saying "suppress the hits" — behavior kept as-is.
            size = 200
        logger.info(query)

        final_res = ""
        retries = 0
        while retries < 100:
            res = es.search(
                index=index_name,
                from_=from_,
                size=size,
                query=query_dict,
                aggs=aggs_dict,
                sort=sort_dict,
            )
            if query_dict is None and aggs_dict is not None:
                # When a result has aggregations, just return that and ignore the rest
                final_res = str(res["aggregations"])
            elif query_dict is not None and aggs_dict is not None:
                # Return both hits and aggregations
                final_res = str(
                    {
                        "hits": res.get("hits", {}),
                        "aggregations": res.get("aggregations", {}),
                    }
                )
            else:
                final_res = str(res["hits"])

            # Shrink the page size until the stringified result fits the
            # downstream LLM context budget (~6000 GPT-4 tokens).
            tokens = encoding.encode(final_res)
            retries += 1
            if len(tokens) > 6000:
                size -= 1
            else:
                return final_res
        # Fixed: previously the loop fell through and the function implicitly
        # returned None when the retry budget was exhausted; return the last
        # computed result instead so callers always get a str.
        return final_res

    except Exception as e:
        logger.exception("Could not execute query %s", query)
        return str(e)
def create_search_tool(pcs_codes):
    """Build a StructuredTool that runs Elasticsearch queries with ``pcs_codes`` bound in."""

    def _search(index_name, query, from_, size):
        # Close over pcs_codes so the exposed tool signature matches
        # SearchToolInput exactly.
        return elastic_search(
            pcs_codes=pcs_codes,
            index_name=index_name,
            query=query,
            from_=from_,
            size=size,
        )

    return StructuredTool.from_function(
        func=_search,
        name="elastic_index_search_tool",
        description=(
            """This tool allows executing queries on an Elasticsearch index efficiently. Provide:
            1. index_name (string): The target Elasticsearch index.
            2. query (dictionary): Defines the query structure, supporting:
            a. Filters: For precise data retrieval (e.g., match, term, range).
            b. Aggregations: For statistical summaries and grouping (e.g., sum, average, histogram).
            c. Full-text search: For analyzing and ranking text-based results (e.g., match, multi-match, query_string).
            """
        ),
        args_schema=SearchToolInput,
    )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ask_candid/tools/elastic/list_indices_tool.py DELETED
from typing import Type, Optional, List
import logging

from pydantic import BaseModel, Field

from elasticsearch import Elasticsearch

from langchain.callbacks.manager import AsyncCallbackManagerForToolRun
from langchain.tools.base import BaseTool

from ask_candid.base.config.connections import SEMANTIC_ELASTIC_QA

# NOTE(review): basicConfig at import time configures the root logger for the
# whole process; consider moving this to the application entry point.
logging.basicConfig(level="INFO")
logger = logging.getLogger("elasticsearch_playground")
# Module-level Elasticsearch client shared by the tool below
# (Elastic Cloud connection, credentials from SEMANTIC_ELASTIC_QA).
es = Elasticsearch(
    cloud_id=SEMANTIC_ELASTIC_QA.cloud_id,
    api_key=SEMANTIC_ELASTIC_QA.api_key,
    verify_certs=True,
    request_timeout=60 * 3  # 3-minute request timeout for slow queries
)
class ListIndicesInput(BaseModel):
    """Arguments accepted by the list indices tool."""

    # Delimiter inserted between index names in the tool's output.
    separator: str = Field(..., description="Separator for the list of indices")
class ListIndicesTool(BaseTool):
    """Tool for getting all ElasticSearch indices."""

    name: str = "elastic_list_indices"
    description: str = (
        "Input is a delimiter like comma or new line. Output is a separated list of indices in the database. "
        "Always use this tool to get to know the indices in the ElasticSearch cluster."
    )
    args_schema: Optional[Type[BaseModel]] = ListIndicesInput

    def _run(self, separator: str) -> str:
        """Return all non-hidden index names joined by ``separator``.

        Parameters
        ----------
        separator : str
            Delimiter placed between index names.

        Returns
        -------
        str
            Joined index names (indices starting with a dot are excluded),
            or ``""`` when the cluster cannot be queried (error is logged).
        """
        try:
            indices: List[str] = es.cat.indices(h="index", s="index").split()
            # Hidden/system indices start with a dot; exclude them.
            return separator.join(
                [index for index in indices if not index.startswith(".")]
            )
        except Exception:
            # Fixed: logger.exception already appends the traceback, so the
            # redundant "%s", e argument was dropped.
            logger.exception("Could not list indices")
            return ""

    async def _arun(
        self,
        separator: str = "",
        run_manager: Optional[AsyncCallbackManagerForToolRun] = None,
    ) -> str:
        """Async execution is not supported for this tool."""
        raise NotImplementedError("ListIndicesTool does not support async operations")