| | from io import StringIO |
| | import streamlit as st |
| |
|
| | from smem_token_parser import SMEM_Parser, read_tokens_from_lines |
| | from smem_obj import SMEM_Obj, ObjType |
| |
|
# Lower bound on the number of side-by-side columns laid out on the page.
MIN_COLS = 4

# Page-level configuration must be issued before any other Streamlit call;
# the visible heading follows it.
st.set_page_config(layout="wide", page_title="Soar Agent Memory Inspector")
st.title("Logic Inspector")
| |
|
| |
|
def get_smem_root_from_file(smem_file):
    """Parse an uploaded memory dump and return the root of its context tree.

    Args:
        smem_file: Source handed unchanged to ``read_tokens_from_lines``
            (the caller in this file passes a ``StringIO``).

    Returns:
        The result of ``SMEM_Parser.get_context_root()`` after parsing the
        tokens, or ``None`` when ``read_tokens_from_lines`` returned ``None``
        (in which case an error message is shown on the page).
    """
    tokens = read_tokens_from_lines(smem_file)
    # Identity test: ``== None`` would defer to any custom ``__eq__`` defined
    # on the token container.
    if tokens is None:
        st.error(f"Error reading file: '{smem_file!s}'")
        return None

    parser = SMEM_Parser()
    parser.parse_file(tokens)
    return parser.get_context_root()
| |
|
def reset_cols():
    """Blank out every column slot except the first one.

    The first entry of ``st.session_state.col_obj_list`` is kept, every later
    entry becomes ``None``, and the list length is unchanged.
    """
    current = st.session_state.col_obj_list
    padding = [None] * (len(current) - 1)
    st.session_state.col_obj_list = current[:1] + padding
| |
|
| | |
| |
|
# Column state: ``None`` until a file has been parsed; afterwards a list whose
# first entry is the parsed root object.
if "col_obj_list" not in st.session_state:
    st.session_state["col_obj_list"] = None

# The upload widget sits in an expander that starts open only while nothing
# has been loaded yet.
file_upload_expander = st.expander(
    label="Select a file to inspect",
    expanded=st.session_state.col_obj_list is None,
)
file = file_upload_expander.file_uploader(" ")

if file is not None:
    if st.session_state.col_obj_list is None:
        root = None
        try:
            text = file.getvalue().decode("utf-8")
        except UnicodeDecodeError:
            # A non-UTF-8 upload is reported on the page instead of aborting
            # the script with a traceback.
            st.error("Error reading file: '" + str(file.name) + "'")
        else:
            root = get_smem_root_from_file(StringIO(text))
        if root:
            st.session_state["col_obj_list"] = [None] * MIN_COLS
            st.session_state.col_obj_list[0] = root
            # Re-run so the expander above is drawn collapsed.
            st.experimental_rerun()
else:
    # No file selected (or the upload was removed): forget any loaded tree.
    st.session_state["col_obj_list"] = None

# Nothing to inspect yet, so do not render the rest of the page.
if st.session_state.col_obj_list is None:
    st.stop()
| |
|
| |
|
| | |
@st.cache(show_spinner=False, hash_funcs={SMEM_Obj: id})
def get_filter_features_dict(root_obj):
    """Return ``root_obj.get_referenced_features()``, cached by Streamlit.

    ``SMEM_Obj`` arguments are hashed with ``id``, so the cache is keyed on
    object identity rather than on the object's contents.
    """
    referenced_features = root_obj.get_referenced_features()
    return referenced_features
| |
|
| | |
# Build one multiselect per referenced feature, laid out across at most five
# columns inside a "Filters" expander.
features_dict = get_filter_features_dict(st.session_state.col_obj_list[0])
filters_expander = st.expander(label="Filters")
# Streamlit rejects a column count of zero, so request at least one column
# even when the loaded file references no features at all.
filters_cols = filters_expander.columns(max(1, min(len(features_dict), 5)))
# Maps each feature name to the list of values currently selected for it.
filters_dict = {}
for i, key in enumerate(features_dict):
    # Wrap around so features beyond the column count stack vertically.
    col = filters_cols[i % len(filters_cols)]
    filters_dict[key] = col.multiselect(
        label=key,
        options=["(none)"] + sorted(features_dict[key]),
        on_change=reset_cols,
    )
| |
|
| |
|
| | |
| |
|
def add_col(index, obj):
    """Show *obj* in the column immediately right of column *index*.

    Everything after position *index* in ``st.session_state.col_obj_list`` is
    dropped, *obj* is appended, and a script re-run is requested.
    """
    kept = st.session_state.col_obj_list[: index + 1]
    st.session_state.col_obj_list = [*kept, obj]
    st.experimental_rerun()
| |
|
def get_header_str(obj_type):
    """Return the column heading text for an object type.

    Types without a dedicated heading yield a single space.
    """
    headings = (
        (ObjType.CONTEXT, "START"),
        (ObjType.COND_CONJ, "CONDITION"),
        (ObjType.OP, "CHOICE"),
        (ObjType.ACT_GROUP, "RESULT"),
    )
    # Scan in the fixed order above and compare with ``==``.
    for kind, heading in headings:
        if obj_type == kind:
            return heading
    return " "
| |
|
def get_tested_wmes_from_obj_str(obj_str):
    """Split a condition description into parallel attribute/value lists.

    The text form of *obj_str* is split on ``" AND "``. Parentheses are
    removed from each clause, and each clause is split once on ``" is "``
    into an attribute and a value.

    Clauses that contain no ``" is "`` separator are skipped rather than
    raising ``ValueError``; in particular ``None`` (whose text form is
    ``"None"``) yields two empty lists.

    Args:
        obj_str: Description to parse; converted with ``str()`` first.

    Returns:
        A ``(attr_list, val_list)`` tuple of equally long lists, where
        ``val_list[k]`` is the value tested for ``attr_list[k]``.
    """
    attr_list = []
    val_list = []
    for clause in str(obj_str).split(" AND "):
        bare = clause.replace("(", "").replace(")", "")
        # partition() splits on the first " is " only, so values that
        # themselves contain " is " stay intact.
        attr, sep, val = bare.partition(" is ")
        if not sep:
            continue
        attr_list.append(attr)
        val_list.append(val)
    return attr_list, val_list
| |
|
def _passes_filters(desc, filters):
    """Return True when a condition description is allowed by the filters.

    Args:
        desc: Condition description text, or ``None`` when there is none.
        filters: Mapping of attribute name to the list of selected values.

    A filter with no selected values, or one whose attribute the description
    does not test, never rejects the condition.
    """
    if desc is None:
        # Nothing to parse, so no filter can apply.
        return True
    attr_list, val_list = get_tested_wmes_from_obj_str(desc)
    for attr, selected in filters.items():
        if not selected or attr not in attr_list:
            continue
        # Only the first clause testing this attribute is consulted.
        if val_list[attr_list.index(attr)] not in selected:
            return False
    return True


st.subheader("Click the buttons below to select conditions and to inspect resulting actions.")

col_objs = st.session_state.col_obj_list
cols = st.columns(max(MIN_COLS, len(col_objs)))

for i, col in enumerate(cols):
    # The state list can be shorter than the number of columns drawn, and
    # unused slots hold None; stop at the first column with nothing to show.
    if i >= len(col_objs) or col_objs[i] is None:
        break

    obj = col_objs[i]
    obj_str, obj_label, obj_desc = obj.to_string()
    # Children are listed in order of their label text.
    sub_objs = list(obj.get_child_objects(compact=True))
    sub_objs.sort(key=lambda x: x.to_string()[1])

    # The heading is chosen from the type of the first child.
    if len(sub_objs) > 0:
        col.markdown("**" + get_header_str(sub_objs[0].obj_type) + "**")
        col.markdown("---")

    col.text(obj_str)
    if obj_desc is not None:
        col.markdown("*" + obj_desc + "*")

    for j, sub in enumerate(sub_objs):
        sub_str, sub_label, sub_desc = sub.to_string()
        # Prefer the description as button caption, falling back to the label.
        button_text = sub_desc if sub_desc is not None else sub_label

        # Condition children are hidden when the active filters exclude them.
        if sub.obj_type == ObjType.COND_PRIM or sub.obj_type == ObjType.COND_CONJ:
            if not _passes_filters(sub_desc, filters_dict):
                continue

        if sub.obj_type == ObjType.OP:
            # Show the label of the first child as a bullet list above the
            # button.
            subsub = list(sub.get_child_objects(compact=True))[0]
            _, group_string, _ = subsub.to_string()
            # NOTE(review): this replaces every "AND" substring, not only the
            # " AND " separator - confirm labels never contain "AND" inside
            # a word.
            group_string = "\n * " + str(group_string).replace("AND", "\n * ")
            col.markdown(group_string)
            if col.button(button_text, key="button" + str(i) + "-" + str(j)):
                add_col(i, sub)
        elif sub.obj_type == ObjType.ACT_GROUP:
            # These children are rendered as text only; no button is drawn.
            col.markdown("*Output Details:* ")
            col.markdown("\n * " + str(sub_str).replace(") AND", ")\n * "))
        elif col.button(button_text, key="button" + str(i) + "-" + str(j)):
            add_col(i, sub)
| |
|