# src/streamlit_app.py import os, json, re, base64, configparser import streamlit as st st.set_page_config(page_title="OCI GenAI Agent β€” Streamlit", page_icon="πŸ’¬", layout="wide") st.title("πŸ’¬ OCI Generative AI Agent β€” Streamlit") st.caption("Uses OCI_CONFIG_CONTENT + OCI_KEY_BASE64 by default, rewrites key_file β†’ /tmp/oci_api_key.pem, and dedupes config keys.") # ─────────────────────────────────────────────────────────────────────────────── # Sidebar: minimal controls # ─────────────────────────────────────────────────────────────────────────────── with st.sidebar: st.subheader("OCI Settings") agent_endpoint_ocid = st.text_input( "Agent Endpoint OCID", value=os.getenv("OCI_GENAI_AGENT_ENDPOINT_OCID", ""), placeholder="ocid1.genaiagentendpoint.oc1....", ) runtime_override = st.text_input( "Runtime endpoint override (optional)", value=os.getenv("OCI_GENAI_AGENT_RUNTIME_ENDPOINT", ""), placeholder="https://agent-runtime.generativeai.us-ashburn-1.oci.oraclecloud.com", help="Only set if region autodetection fails.", ) st.markdown("---") if st.button("Auth self-test"): try: cfg, _ = None, None cfg, _ = None, None # avoid lint noise cfg, _ = (lambda: build_config_from_env())() st.success("βœ… OCI config looks valid.") masked = {k: (v[:22] + "…") if isinstance(v, str) and k != "key_content" else v for k, v in cfg.items()} st.code(json.dumps(masked, indent=2)) except Exception as e: st.error(f"❌ Auth/Config error: {e}") with st.expander("Show (masked) environment"): safe = { "OCI_CONFIG_CONTENT": "present" if os.getenv("OCI_CONFIG_CONTENT") else "absent", "OCI_KEY_BASE64": "present" if os.getenv("OCI_KEY_BASE64") else "absent", "OCI_REGION": os.getenv("OCI_REGION", ""), "OCI_TENANCY_OCID": (os.getenv("OCI_TENANCY_OCID","")[:22] + "…") if os.getenv("OCI_TENANCY_OCID") else "", "OCI_USER_OCID": (os.getenv("OCI_USER_OCID","")[:22] + "…") if os.getenv("OCI_USER_OCID") else "", "OCI_GENAI_AGENT_ENDPOINT_OCID": (os.getenv("OCI_GENAI_AGENT_ENDPOINT_OCID","")[:26] + "…") if os.getenv("OCI_GENAI_AGENT_ENDPOINT_OCID") else "", } st.code(json.dumps(safe, indent=2)) # ─────────────────────────────────────────────────────────────────────────────── # OCI Config helpers β€” prefer OCI_CONFIG_CONTENT + OCI_KEY_BASE64 # ─────────────────────────────────────────────────────────────────────────────── import oci from oci.exceptions import InvalidConfig from oci.generative_ai_agent_runtime import GenerativeAiAgentRuntimeClient import oci.generative_ai_agent_runtime.models as models OCID_RE = { "tenancy": re.compile(r"^ocid1\.tenancy\..+"), "user": re.compile(r"^ocid1\.user\..+"), } def _write_files_from_config_content() -> str: """ Use OCI_CONFIG_CONTENT + OCI_KEY_BASE64/PEM. - Deduplicate options in [DEFAULT] - Force key_file=/tmp/oci_api_key.pem - Ensure region exists (fallback to OCI_REGION secret if missing) Returns config path or "" if OCI_CONFIG_CONTENT not provided. """ cfg_text = os.getenv("OCI_CONFIG_CONTENT", "").strip() if not cfg_text: return "" # Decode key (prefer base64) key_b64 = os.getenv("OCI_KEY_BASE64", "").strip() key_pem = os.getenv("OCI_KEY_PEM", "").strip() if key_b64: try: pem_text = base64.b64decode(key_b64).decode("utf-8") except Exception as e: st.error(f"Failed to decode OCI_KEY_BASE64: {e}") st.stop() else: pem_text = key_pem if "BEGIN PRIVATE KEY" not in pem_text: st.error("Missing/invalid key: set OCI_KEY_BASE64 (preferred) or OCI_KEY_PEM with PEM headers.") st.stop() # Write key with strict perms key_path = "/tmp/oci_api_key.pem" with open(key_path, "w", encoding="utf-8") as f: f.write(pem_text) os.chmod(key_path, 0o600) # Parse config with strict=False to tolerate duplicates, then clean parser = configparser.ConfigParser(interpolation=None, strict=False) if "[DEFAULT]" not in cfg_text: cfg_text = "[DEFAULT]\n" + cfg_text parser.read_string(cfg_text) if "DEFAULT" not in parser: st.error("OCI_CONFIG_CONTENT must contain a [DEFAULT] profile.") st.stop() # Build a clean DEFAULT dict (first occurrence wins) clean = {} for k, v in parser["DEFAULT"].items(): key = k.strip().lower() if key not in clean: clean[key] = v.strip() # Ensure region exists (fall back to OCI_REGION if missing) if not clean.get("region"): env_region = os.getenv("OCI_REGION", "").strip() if not env_region: st.error("Your OCI_CONFIG_CONTENT is missing 'region=', and OCI_REGION secret is not set.") st.stop() clean["region"] = env_region # Force key_file path to our decoded key clean["key_file"] = key_path # Write sanitized config oci_dir = os.path.expanduser("~/.oci") os.makedirs(oci_dir, exist_ok=True) cfg_path = os.path.join(oci_dir, "config") out = configparser.ConfigParser(interpolation=None) out["DEFAULT"] = clean with open(cfg_path, "w", encoding="utf-8") as f: out.write(f) return cfg_path def build_config_from_env(): """ Preferred path: OCI_CONFIG_CONTENT + OCI_KEY_BASE64/PEM (sanitized) β†’ from_file(). Fallback: discrete env vars (TENANCY/USER/FINGERPRINT/REGION + key). """ cfg_path = _write_files_from_config_content() if cfg_path: try: cfg = oci.config.from_file(cfg_path, profile_name="DEFAULT") oci.config.validate_config(cfg) return cfg, {} except InvalidConfig as e: st.error(f"OCI config invalid (from file): {e}") st.stop() # Fallback to discrete envs (if you really don't want OCI_CONFIG_CONTENT) tenancy = os.getenv("OCI_TENANCY_OCID", "").strip() user = os.getenv("OCI_USER_OCID", "").strip() fingerprint = os.getenv("OCI_FINGERPRINT", "").strip() region = os.getenv("OCI_REGION", "").strip() key_b64 = os.getenv("OCI_KEY_BASE64", "").strip() key_pem = os.getenv("OCI_KEY_PEM", "").strip() passphrase = os.getenv("OCI_KEY_PASSPHRASE", None) errs = [] if not tenancy or not OCID_RE["tenancy"].match(tenancy): errs.append("OCI_TENANCY_OCID malformed.") if not user or not OCID_RE["user"].match(user): errs.append("OCI_USER_OCID malformed.") if not fingerprint: errs.append("OCI_FINGERPRINT required.") if not region: errs.append("OCI_REGION required (e.g., us-ashburn-1).") key_content = "" if key_b64: try: key_content = base64.b64decode(key_b64).decode("utf-8") if "BEGIN PRIVATE KEY" not in key_content: errs.append("OCI_KEY_BASE64 decoded but missing PEM header/footer.") except Exception as e: errs.append(f"OCI_KEY_BASE64 decode failed: {e}") elif key_pem: key_content = key_pem if "BEGIN PRIVATE KEY" not in key_content: errs.append("OCI_KEY_PEM missing PEM header/footer.") else: errs.append("Provide OCI_KEY_BASE64 (preferred) or OCI_KEY_PEM.") if errs: st.error("Config issues:\n- " + "\n- ".join(errs)) st.stop() cfg = { "tenancy": tenancy, "user": user, "fingerprint": fingerprint, "region": region, "key_content": key_content, } if passphrase: cfg["pass_phrase"] = passphrase try: oci.config.validate_config(cfg) except InvalidConfig as e: st.error(f"OCI config invalid (env mode): {e}") st.stop() return cfg, {} # ─────────────────────────────────────────────────────────────────────────────── # Client builder β€” no cache while debugging # ─────────────────────────────────────────────────────────────────────────────── def get_client(runtime_ep_override: str = "") -> GenerativeAiAgentRuntimeClient: cfg, kwargs = build_config_from_env() if runtime_ep_override: return GenerativeAiAgentRuntimeClient(cfg, service_endpoint=runtime_ep_override, **kwargs) env_ep = os.getenv("OCI_GENAI_AGENT_RUNTIME_ENDPOINT", "").strip() if env_ep: return GenerativeAiAgentRuntimeClient(cfg, service_endpoint=env_ep, **kwargs) return GenerativeAiAgentRuntimeClient(cfg, **kwargs) def ensure_session(client: GenerativeAiAgentRuntimeClient, endpoint_ocid: str) -> str: if st.session_state.get("agent_session_id"): return st.session_state["agent_session_id"] details = models.CreateSessionDetails(display_name="hf-session", description="HF Streamlit session") resp = client.create_session(details, endpoint_ocid) st.session_state["agent_session_id"] = resp.data.id return st.session_state["agent_session_id"] def send_chat(client, endpoint_ocid: str, session_id: str, text: str) -> str: payload = models.ChatDetails(user_message=text, session_id=session_id) resp = client.chat(endpoint_ocid, payload) try: return resp.data.message.content.text except Exception: return str(resp.data) # ─────────────────────────────────────────────────────────────────────────────── # Chat UI # ─────────────────────────────────────────────────────────────────────────────── if "messages" not in st.session_state: st.session_state["messages"] = [] if "agent_session_id" not in st.session_state: st.session_state["agent_session_id"] = "" colA, colB = st.columns([3,1]) with colB: if st.button("Start New Session"): st.session_state["messages"] = [] st.session_state["agent_session_id"] = "" if st.button("Clear Chat"): st.session_state["messages"] = [] if st.session_state["agent_session_id"]: st.caption(f"Session: `{st.session_state['agent_session_id']}`") for m in st.session_state["messages"]: with st.chat_message(m["role"]): st.markdown(m["content"]) prompt = st.chat_input("Type your message…") if prompt: if not agent_endpoint_ocid: st.warning("Please provide your Agent Endpoint OCID in the sidebar.") else: st.session_state["messages"].append({"role": "user", "content": prompt}) with st.chat_message("user"): st.markdown(prompt) with st.chat_message("assistant"): box = st.empty() try: client = get_client(runtime_override) sid = ensure_session(client, agent_endpoint_ocid) with st.spinner("Thinking…"): reply = send_chat(client, agent_endpoint_ocid, sid, prompt) box.markdown(reply) st.session_state["messages"].append({"role": "assistant", "content": reply}) except Exception as e: box.error(f"❌ Error: {e}") st.exception(e)