Spaces:
Sleeping
Sleeping
| # src/streamlit_app.py | |
| import os, json, re, base64, configparser | |
| import streamlit as st | |
| st.set_page_config(page_title="OCI GenAI Agent β Streamlit", page_icon="π¬", layout="wide") | |
| st.title("π¬ OCI Generative AI Agent β Streamlit") | |
| st.caption("Uses OCI_CONFIG_CONTENT + OCI_KEY_BASE64 by default, rewrites key_file β /tmp/oci_api_key.pem, and dedupes config keys.") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Sidebar: minimal controls | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| with st.sidebar: | |
| st.subheader("OCI Settings") | |
| agent_endpoint_ocid = st.text_input( | |
| "Agent Endpoint OCID", | |
| value=os.getenv("OCI_GENAI_AGENT_ENDPOINT_OCID", ""), | |
| placeholder="ocid1.genaiagentendpoint.oc1....", | |
| ) | |
| runtime_override = st.text_input( | |
| "Runtime endpoint override (optional)", | |
| value=os.getenv("OCI_GENAI_AGENT_RUNTIME_ENDPOINT", ""), | |
| placeholder="https://agent-runtime.generativeai.us-ashburn-1.oci.oraclecloud.com", | |
| help="Only set if region autodetection fails.", | |
| ) | |
| st.markdown("---") | |
| if st.button("Auth self-test"): | |
| try: | |
| cfg, _ = None, None | |
| cfg, _ = None, None # avoid lint noise | |
| cfg, _ = (lambda: build_config_from_env())() | |
| st.success("β OCI config looks valid.") | |
| masked = {k: (v[:22] + "β¦") if isinstance(v, str) and k != "key_content" else v for k, v in cfg.items()} | |
| st.code(json.dumps(masked, indent=2)) | |
| except Exception as e: | |
| st.error(f"β Auth/Config error: {e}") | |
| with st.expander("Show (masked) environment"): | |
| safe = { | |
| "OCI_CONFIG_CONTENT": "present" if os.getenv("OCI_CONFIG_CONTENT") else "absent", | |
| "OCI_KEY_BASE64": "present" if os.getenv("OCI_KEY_BASE64") else "absent", | |
| "OCI_REGION": os.getenv("OCI_REGION", ""), | |
| "OCI_TENANCY_OCID": (os.getenv("OCI_TENANCY_OCID","")[:22] + "β¦") if os.getenv("OCI_TENANCY_OCID") else "", | |
| "OCI_USER_OCID": (os.getenv("OCI_USER_OCID","")[:22] + "β¦") if os.getenv("OCI_USER_OCID") else "", | |
| "OCI_GENAI_AGENT_ENDPOINT_OCID": (os.getenv("OCI_GENAI_AGENT_ENDPOINT_OCID","")[:26] + "β¦") if os.getenv("OCI_GENAI_AGENT_ENDPOINT_OCID") else "", | |
| } | |
| st.code(json.dumps(safe, indent=2)) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # OCI Config helpers β prefer OCI_CONFIG_CONTENT + OCI_KEY_BASE64 | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| import oci | |
| from oci.exceptions import InvalidConfig | |
| from oci.generative_ai_agent_runtime import GenerativeAiAgentRuntimeClient | |
| import oci.generative_ai_agent_runtime.models as models | |
| OCID_RE = { | |
| "tenancy": re.compile(r"^ocid1\.tenancy\..+"), | |
| "user": re.compile(r"^ocid1\.user\..+"), | |
| } | |
| def _write_files_from_config_content() -> str: | |
| """ | |
| Use OCI_CONFIG_CONTENT + OCI_KEY_BASE64/PEM. | |
| - Deduplicate options in [DEFAULT] | |
| - Force key_file=/tmp/oci_api_key.pem | |
| - Ensure region exists (fallback to OCI_REGION secret if missing) | |
| Returns config path or "" if OCI_CONFIG_CONTENT not provided. | |
| """ | |
| cfg_text = os.getenv("OCI_CONFIG_CONTENT", "").strip() | |
| if not cfg_text: | |
| return "" | |
| # Decode key (prefer base64) | |
| key_b64 = os.getenv("OCI_KEY_BASE64", "").strip() | |
| key_pem = os.getenv("OCI_KEY_PEM", "").strip() | |
| if key_b64: | |
| try: | |
| pem_text = base64.b64decode(key_b64).decode("utf-8") | |
| except Exception as e: | |
| st.error(f"Failed to decode OCI_KEY_BASE64: {e}") | |
| st.stop() | |
| else: | |
| pem_text = key_pem | |
| if "BEGIN PRIVATE KEY" not in pem_text: | |
| st.error("Missing/invalid key: set OCI_KEY_BASE64 (preferred) or OCI_KEY_PEM with PEM headers.") | |
| st.stop() | |
| # Write key with strict perms | |
| key_path = "/tmp/oci_api_key.pem" | |
| with open(key_path, "w", encoding="utf-8") as f: | |
| f.write(pem_text) | |
| os.chmod(key_path, 0o600) | |
| # Parse config with strict=False to tolerate duplicates, then clean | |
| parser = configparser.ConfigParser(interpolation=None, strict=False) | |
| if "[DEFAULT]" not in cfg_text: | |
| cfg_text = "[DEFAULT]\n" + cfg_text | |
| parser.read_string(cfg_text) | |
| if "DEFAULT" not in parser: | |
| st.error("OCI_CONFIG_CONTENT must contain a [DEFAULT] profile.") | |
| st.stop() | |
| # Build a clean DEFAULT dict (first occurrence wins) | |
| clean = {} | |
| for k, v in parser["DEFAULT"].items(): | |
| key = k.strip().lower() | |
| if key not in clean: | |
| clean[key] = v.strip() | |
| # Ensure region exists (fall back to OCI_REGION if missing) | |
| if not clean.get("region"): | |
| env_region = os.getenv("OCI_REGION", "").strip() | |
| if not env_region: | |
| st.error("Your OCI_CONFIG_CONTENT is missing 'region=', and OCI_REGION secret is not set.") | |
| st.stop() | |
| clean["region"] = env_region | |
| # Force key_file path to our decoded key | |
| clean["key_file"] = key_path | |
| # Write sanitized config | |
| oci_dir = os.path.expanduser("~/.oci") | |
| os.makedirs(oci_dir, exist_ok=True) | |
| cfg_path = os.path.join(oci_dir, "config") | |
| out = configparser.ConfigParser(interpolation=None) | |
| out["DEFAULT"] = clean | |
| with open(cfg_path, "w", encoding="utf-8") as f: | |
| out.write(f) | |
| return cfg_path | |
| def build_config_from_env(): | |
| """ | |
| Preferred path: OCI_CONFIG_CONTENT + OCI_KEY_BASE64/PEM (sanitized) β from_file(). | |
| Fallback: discrete env vars (TENANCY/USER/FINGERPRINT/REGION + key). | |
| """ | |
| cfg_path = _write_files_from_config_content() | |
| if cfg_path: | |
| try: | |
| cfg = oci.config.from_file(cfg_path, profile_name="DEFAULT") | |
| oci.config.validate_config(cfg) | |
| return cfg, {} | |
| except InvalidConfig as e: | |
| st.error(f"OCI config invalid (from file): {e}") | |
| st.stop() | |
| # Fallback to discrete envs (if you really don't want OCI_CONFIG_CONTENT) | |
| tenancy = os.getenv("OCI_TENANCY_OCID", "").strip() | |
| user = os.getenv("OCI_USER_OCID", "").strip() | |
| fingerprint = os.getenv("OCI_FINGERPRINT", "").strip() | |
| region = os.getenv("OCI_REGION", "").strip() | |
| key_b64 = os.getenv("OCI_KEY_BASE64", "").strip() | |
| key_pem = os.getenv("OCI_KEY_PEM", "").strip() | |
| passphrase = os.getenv("OCI_KEY_PASSPHRASE", None) | |
| errs = [] | |
| if not tenancy or not OCID_RE["tenancy"].match(tenancy): errs.append("OCI_TENANCY_OCID malformed.") | |
| if not user or not OCID_RE["user"].match(user): errs.append("OCI_USER_OCID malformed.") | |
| if not fingerprint: errs.append("OCI_FINGERPRINT required.") | |
| if not region: errs.append("OCI_REGION required (e.g., us-ashburn-1).") | |
| key_content = "" | |
| if key_b64: | |
| try: | |
| key_content = base64.b64decode(key_b64).decode("utf-8") | |
| if "BEGIN PRIVATE KEY" not in key_content: | |
| errs.append("OCI_KEY_BASE64 decoded but missing PEM header/footer.") | |
| except Exception as e: | |
| errs.append(f"OCI_KEY_BASE64 decode failed: {e}") | |
| elif key_pem: | |
| key_content = key_pem | |
| if "BEGIN PRIVATE KEY" not in key_content: | |
| errs.append("OCI_KEY_PEM missing PEM header/footer.") | |
| else: | |
| errs.append("Provide OCI_KEY_BASE64 (preferred) or OCI_KEY_PEM.") | |
| if errs: | |
| st.error("Config issues:\n- " + "\n- ".join(errs)) | |
| st.stop() | |
| cfg = { | |
| "tenancy": tenancy, | |
| "user": user, | |
| "fingerprint": fingerprint, | |
| "region": region, | |
| "key_content": key_content, | |
| } | |
| if passphrase: | |
| cfg["pass_phrase"] = passphrase | |
| try: | |
| oci.config.validate_config(cfg) | |
| except InvalidConfig as e: | |
| st.error(f"OCI config invalid (env mode): {e}") | |
| st.stop() | |
| return cfg, {} | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Client builder β no cache while debugging | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_client(runtime_ep_override: str = "") -> GenerativeAiAgentRuntimeClient: | |
| cfg, kwargs = build_config_from_env() | |
| if runtime_ep_override: | |
| return GenerativeAiAgentRuntimeClient(cfg, service_endpoint=runtime_ep_override, **kwargs) | |
| env_ep = os.getenv("OCI_GENAI_AGENT_RUNTIME_ENDPOINT", "").strip() | |
| if env_ep: | |
| return GenerativeAiAgentRuntimeClient(cfg, service_endpoint=env_ep, **kwargs) | |
| return GenerativeAiAgentRuntimeClient(cfg, **kwargs) | |
| def ensure_session(client: GenerativeAiAgentRuntimeClient, endpoint_ocid: str) -> str: | |
| if st.session_state.get("agent_session_id"): | |
| return st.session_state["agent_session_id"] | |
| details = models.CreateSessionDetails(display_name="hf-session", description="HF Streamlit session") | |
| resp = client.create_session(details, endpoint_ocid) | |
| st.session_state["agent_session_id"] = resp.data.id | |
| return st.session_state["agent_session_id"] | |
| def send_chat(client, endpoint_ocid: str, session_id: str, text: str) -> str: | |
| payload = models.ChatDetails(user_message=text, session_id=session_id) | |
| resp = client.chat(endpoint_ocid, payload) | |
| try: | |
| return resp.data.message.content.text | |
| except Exception: | |
| return str(resp.data) | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # Chat UI | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| if "messages" not in st.session_state: | |
| st.session_state["messages"] = [] | |
| if "agent_session_id" not in st.session_state: | |
| st.session_state["agent_session_id"] = "" | |
| colA, colB = st.columns([3,1]) | |
| with colB: | |
| if st.button("Start New Session"): | |
| st.session_state["messages"] = [] | |
| st.session_state["agent_session_id"] = "" | |
| if st.button("Clear Chat"): | |
| st.session_state["messages"] = [] | |
| if st.session_state["agent_session_id"]: | |
| st.caption(f"Session: `{st.session_state['agent_session_id']}`") | |
| for m in st.session_state["messages"]: | |
| with st.chat_message(m["role"]): | |
| st.markdown(m["content"]) | |
| prompt = st.chat_input("Type your messageβ¦") | |
| if prompt: | |
| if not agent_endpoint_ocid: | |
| st.warning("Please provide your Agent Endpoint OCID in the sidebar.") | |
| else: | |
| st.session_state["messages"].append({"role": "user", "content": prompt}) | |
| with st.chat_message("user"): | |
| st.markdown(prompt) | |
| with st.chat_message("assistant"): | |
| box = st.empty() | |
| try: | |
| client = get_client(runtime_override) | |
| sid = ensure_session(client, agent_endpoint_ocid) | |
| with st.spinner("Thinkingβ¦"): | |
| reply = send_chat(client, agent_endpoint_ocid, sid, prompt) | |
| box.markdown(reply) | |
| st.session_state["messages"].append({"role": "assistant", "content": reply}) | |
| except Exception as e: | |
| box.error(f"β Error: {e}") | |
| st.exception(e) | |