# Cohere-Streamlit / src/streamlit_app.py
# kgauvin603's picture
# Update src/streamlit_app.py
# fad0863 verified
# src/streamlit_app.py
import os, json, re, base64, configparser
import streamlit as st
# Configure the browser tab (title, icon, wide layout) and render the page header.
# The dash, arrow and emoji below were mis-decoded UTF-8 in the previous revision.
st.set_page_config(page_title="OCI GenAI Agent — Streamlit", page_icon="💬", layout="wide")
st.title("💬 OCI Generative AI Agent — Streamlit")
st.caption("Uses OCI_CONFIG_CONTENT + OCI_KEY_BASE64 by default, rewrites key_file → /tmp/oci_api_key.pem, and dedupes config keys.")
# ───────────────────────────────────────────────────────────────────────────────
# Sidebar: minimal controls
# ───────────────────────────────────────────────────────────────────────────────
def _env_preview(name, width):
    """Return at most `width` leading characters of env var `name` ("" if unset)."""
    value = os.getenv(name, "")
    if len(value) <= width:
        return value
    return value[:width] + "…"


with st.sidebar:
    st.subheader("OCI Settings")

    # Both inputs are read later by the chat section of this script.
    agent_endpoint_ocid = st.text_input(
        "Agent Endpoint OCID",
        value=os.getenv("OCI_GENAI_AGENT_ENDPOINT_OCID", ""),
        placeholder="ocid1.genaiagentendpoint.oc1....",
    )
    runtime_override = st.text_input(
        "Runtime endpoint override (optional)",
        value=os.getenv("OCI_GENAI_AGENT_RUNTIME_ENDPOINT", ""),
        placeholder="https://agent-runtime.generativeai.us-ashburn-1.oci.oraclecloud.com",
        help="Only set if region autodetection fails.",
    )
    st.markdown("---")

    if st.button("Auth self-test"):
        try:
            # NOTE(review): build_config_from_env is defined (and `oci` is imported)
            # further down this script. If the file is executed top to bottom, the
            # name is not bound yet when this handler runs and the resulting
            # NameError is what the except branch below reports. Moving the helper
            # definitions above the sidebar would fix that — confirm and reorder.
            cfg, _ = build_config_from_env()
            st.success("✅ OCI config looks valid.")

            # Never echo secrets: in env mode the config dict carries the PEM private
            # key under "key_content" and possibly a "pass_phrase". Other string
            # values are shortened to a 22-character preview.
            secret_keys = {"key_content", "pass_phrase"}
            masked = {}
            for k, v in cfg.items():
                if k in secret_keys and v:
                    masked[k] = "<redacted>"
                elif isinstance(v, str) and len(v) > 22:
                    masked[k] = v[:22] + "…"
                else:
                    masked[k] = v
            # ensure_ascii=False keeps the ellipsis readable instead of "\u2026".
            st.code(json.dumps(masked, indent=2, ensure_ascii=False))
        except Exception as e:
            st.error(f"❌ Auth/Config error: {e}")

    with st.expander("Show (masked) environment"):
        # Only presence flags or short previews are shown, never full values.
        safe = {
            "OCI_CONFIG_CONTENT": "present" if os.getenv("OCI_CONFIG_CONTENT") else "absent",
            "OCI_KEY_BASE64": "present" if os.getenv("OCI_KEY_BASE64") else "absent",
            "OCI_REGION": os.getenv("OCI_REGION", ""),
            "OCI_TENANCY_OCID": _env_preview("OCI_TENANCY_OCID", 22),
            "OCI_USER_OCID": _env_preview("OCI_USER_OCID", 22),
            "OCI_GENAI_AGENT_ENDPOINT_OCID": _env_preview("OCI_GENAI_AGENT_ENDPOINT_OCID", 26),
        }
        st.code(json.dumps(safe, indent=2, ensure_ascii=False))
# ───────────────────────────────────────────────────────────────────────────────
# OCI Config helpers — prefer OCI_CONFIG_CONTENT + OCI_KEY_BASE64
# ───────────────────────────────────────────────────────────────────────────────
import oci
from oci.exceptions import InvalidConfig
from oci.generative_ai_agent_runtime import GenerativeAiAgentRuntimeClient
import oci.generative_ai_agent_runtime.models as models
# Compiled shape checks for the OCIDs read from the environment, keyed by
# resource kind. Each pattern requires the "ocid1.<kind>." prefix followed by
# at least one more character.
OCID_RE = {
    kind: re.compile(rf"^ocid1\.{kind}\..+")
    for kind in ("tenancy", "user")
}
def _write_files_from_config_content() -> str:
"""
Use OCI_CONFIG_CONTENT + OCI_KEY_BASE64/PEM.
- Deduplicate options in [DEFAULT]
- Force key_file=/tmp/oci_api_key.pem
- Ensure region exists (fallback to OCI_REGION secret if missing)
Returns config path or "" if OCI_CONFIG_CONTENT not provided.
"""
cfg_text = os.getenv("OCI_CONFIG_CONTENT", "").strip()
if not cfg_text:
return ""
# Decode key (prefer base64)
key_b64 = os.getenv("OCI_KEY_BASE64", "").strip()
key_pem = os.getenv("OCI_KEY_PEM", "").strip()
if key_b64:
try:
pem_text = base64.b64decode(key_b64).decode("utf-8")
except Exception as e:
st.error(f"Failed to decode OCI_KEY_BASE64: {e}")
st.stop()
else:
pem_text = key_pem
if "BEGIN PRIVATE KEY" not in pem_text:
st.error("Missing/invalid key: set OCI_KEY_BASE64 (preferred) or OCI_KEY_PEM with PEM headers.")
st.stop()
# Write key with strict perms
key_path = "/tmp/oci_api_key.pem"
with open(key_path, "w", encoding="utf-8") as f:
f.write(pem_text)
os.chmod(key_path, 0o600)
# Parse config with strict=False to tolerate duplicates, then clean
parser = configparser.ConfigParser(interpolation=None, strict=False)
if "[DEFAULT]" not in cfg_text:
cfg_text = "[DEFAULT]\n" + cfg_text
parser.read_string(cfg_text)
if "DEFAULT" not in parser:
st.error("OCI_CONFIG_CONTENT must contain a [DEFAULT] profile.")
st.stop()
# Build a clean DEFAULT dict (first occurrence wins)
clean = {}
for k, v in parser["DEFAULT"].items():
key = k.strip().lower()
if key not in clean:
clean[key] = v.strip()
# Ensure region exists (fall back to OCI_REGION if missing)
if not clean.get("region"):
env_region = os.getenv("OCI_REGION", "").strip()
if not env_region:
st.error("Your OCI_CONFIG_CONTENT is missing 'region=', and OCI_REGION secret is not set.")
st.stop()
clean["region"] = env_region
# Force key_file path to our decoded key
clean["key_file"] = key_path
# Write sanitized config
oci_dir = os.path.expanduser("~/.oci")
os.makedirs(oci_dir, exist_ok=True)
cfg_path = os.path.join(oci_dir, "config")
out = configparser.ConfigParser(interpolation=None)
out["DEFAULT"] = clean
with open(cfg_path, "w", encoding="utf-8") as f:
out.write(f)
return cfg_path
def build_config_from_env():
    """
    Build and validate an OCI SDK config dict from environment secrets.

    Preferred path: OCI_CONFIG_CONTENT + OCI_KEY_BASE64/PEM, sanitized to a
    config file by _write_files_from_config_content() and loaded with
    oci.config.from_file().
    Fallback: discrete env vars OCI_TENANCY_OCID, OCI_USER_OCID,
    OCI_FINGERPRINT, OCI_REGION plus the key (OCI_KEY_BASE64 or OCI_KEY_PEM)
    and an optional OCI_KEY_PASSPHRASE.

    Returns a (config, client_kwargs) tuple; client_kwargs is currently always {}.
    Problems are reported with st.error() and end the run via st.stop().
    """
    cfg_path = _write_files_from_config_content()
    if cfg_path:
        try:
            cfg = oci.config.from_file(cfg_path, profile_name="DEFAULT")
            oci.config.validate_config(cfg)
        except InvalidConfig as e:
            st.error(f"OCI config invalid (from file): {e}")
            st.stop()
        else:
            return cfg, {}

    # Fallback to discrete envs (if you really don't want OCI_CONFIG_CONTENT)
    tenancy = os.getenv("OCI_TENANCY_OCID", "").strip()
    user = os.getenv("OCI_USER_OCID", "").strip()
    fingerprint = os.getenv("OCI_FINGERPRINT", "").strip()
    region = os.getenv("OCI_REGION", "").strip()
    key_b64 = os.getenv("OCI_KEY_BASE64", "").strip()
    key_pem = os.getenv("OCI_KEY_PEM", "").strip()
    passphrase = os.getenv("OCI_KEY_PASSPHRASE", None)

    # Collect every problem first so the user sees them all at once.
    errs = []
    if not tenancy or not OCID_RE["tenancy"].match(tenancy):
        errs.append("OCI_TENANCY_OCID malformed.")
    if not user or not OCID_RE["user"].match(user):
        errs.append("OCI_USER_OCID malformed.")
    if not fingerprint:
        errs.append("OCI_FINGERPRINT required.")
    if not region:
        errs.append("OCI_REGION required (e.g., us-ashburn-1).")

    # Accept PKCS#8, PKCS#1 ("RSA") and encrypted PKCS#8 PEM headers; a plain
    # "BEGIN PRIVATE KEY" substring test rejects the latter two.
    pem_header = re.compile(r"BEGIN (?:RSA |ENCRYPTED )?PRIVATE KEY")

    key_content = ""
    if key_b64:
        try:
            key_content = base64.b64decode(key_b64).decode("utf-8")
        except ValueError as e:
            # Covers binascii.Error (bad base64) and UnicodeDecodeError.
            errs.append(f"OCI_KEY_BASE64 decode failed: {e}")
        else:
            if not pem_header.search(key_content):
                errs.append("OCI_KEY_BASE64 decoded but missing PEM header/footer.")
    elif key_pem:
        key_content = key_pem
        if not pem_header.search(key_content):
            errs.append("OCI_KEY_PEM missing PEM header/footer.")
    else:
        errs.append("Provide OCI_KEY_BASE64 (preferred) or OCI_KEY_PEM.")

    if errs:
        st.error("Config issues:\n- " + "\n- ".join(errs))
        st.stop()

    cfg = {
        "tenancy": tenancy,
        "user": user,
        "fingerprint": fingerprint,
        "region": region,
        "key_content": key_content,
    }
    if passphrase:
        cfg["pass_phrase"] = passphrase

    try:
        oci.config.validate_config(cfg)
    except InvalidConfig as e:
        st.error(f"OCI config invalid (env mode): {e}")
        st.stop()
    return cfg, {}
# ───────────────────────────────────────────────────────────────────────────────
# Client builder — no cache while debugging
# ───────────────────────────────────────────────────────────────────────────────
def get_client(runtime_ep_override: str = "") -> GenerativeAiAgentRuntimeClient:
    """
    Build a GenerativeAiAgentRuntimeClient from the environment-derived config.

    The service endpoint is chosen in this order:
    1. `runtime_ep_override`, if it is non-blank
    2. the OCI_GENAI_AGENT_RUNTIME_ENDPOINT env var, if it is non-blank
    3. none: the client is constructed without a `service_endpoint` argument

    Both candidates are stripped, so a whitespace-only override no longer
    reaches the client as a bogus endpoint.
    """
    cfg, kwargs = build_config_from_env()

    endpoint = (runtime_ep_override or "").strip()
    if not endpoint:
        endpoint = os.getenv("OCI_GENAI_AGENT_RUNTIME_ENDPOINT", "").strip()

    if endpoint:
        kwargs = {**kwargs, "service_endpoint": endpoint}
    return GenerativeAiAgentRuntimeClient(cfg, **kwargs)
def ensure_session(client: GenerativeAiAgentRuntimeClient, endpoint_ocid: str) -> str:
    """
    Return the agent session id for this Streamlit session, creating one if needed.

    The id is cached in st.session_state["agent_session_id"]. The endpoint it was
    created for is remembered in st.session_state["agent_session_endpoint"], so
    that changing the endpoint OCID creates a fresh session instead of reusing
    an id that was issued for a different endpoint.
    """
    cached_id = st.session_state.get("agent_session_id")
    cached_endpoint = st.session_state.get("agent_session_endpoint")

    # A cached id with no recorded endpoint predates this bookkeeping; keep
    # reusing it as before and record the endpoint now.
    if cached_id and cached_endpoint in (None, endpoint_ocid):
        st.session_state["agent_session_endpoint"] = endpoint_ocid
        return cached_id

    details = models.CreateSessionDetails(display_name="hf-session", description="HF Streamlit session")
    resp = client.create_session(details, endpoint_ocid)
    st.session_state["agent_session_id"] = resp.data.id
    st.session_state["agent_session_endpoint"] = endpoint_ocid
    return st.session_state["agent_session_id"]
def send_chat(client, endpoint_ocid: str, session_id: str, text: str) -> str:
    """
    Send one user message to the agent endpoint and return the reply text.

    Returns `resp.data.message.content.text` when that chain resolves to a
    value. If any link of the chain is missing, or the text itself is None,
    falls back to `str(resp.data)` so the caller always receives a string.
    """
    payload = models.ChatDetails(user_message=text, session_id=session_id)
    resp = client.chat(endpoint_ocid, payload)
    try:
        reply = resp.data.message.content.text
    except AttributeError:
        # message or content was absent on the response
        reply = None
    if reply is None:
        return str(resp.data)
    return reply
# ───────────────────────────────────────────────────────────────────────────────
# Chat UI
# ───────────────────────────────────────────────────────────────────────────────
# Per-browser-session state: the rendered transcript and the agent session id.
if "messages" not in st.session_state:
    st.session_state["messages"] = []
if "agent_session_id" not in st.session_state:
    st.session_state["agent_session_id"] = ""

# Session controls live in the narrow right-hand column.
colA, colB = st.columns([3, 1])
with colB:
    if st.button("Start New Session"):
        # Clearing the id makes ensure_session() create a new agent session.
        st.session_state["messages"] = []
        st.session_state["agent_session_id"] = ""
    if st.button("Clear Chat"):
        # Only the transcript is dropped; the agent session id is kept.
        st.session_state["messages"] = []

if st.session_state["agent_session_id"]:
    st.caption(f"Session: `{st.session_state['agent_session_id']}`")

# Replay the transcript, since Streamlit re-executes the script on every interaction.
for m in st.session_state["messages"]:
    with st.chat_message(m["role"]):
        st.markdown(m["content"])

prompt = st.chat_input("Type your message…")
if prompt:
    # Pasted OCIDs often carry stray whitespace; strip before use.
    endpoint_ocid = agent_endpoint_ocid.strip()
    if not endpoint_ocid:
        st.warning("Please provide your Agent Endpoint OCID in the sidebar.")
    else:
        st.session_state["messages"].append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)
        with st.chat_message("assistant"):
            # Placeholder that is filled with either the reply or the error.
            box = st.empty()
            try:
                client = get_client(runtime_override)
                sid = ensure_session(client, endpoint_ocid)
                with st.spinner("Thinking…"):
                    reply = send_chat(client, endpoint_ocid, sid, prompt)
                box.markdown(reply)
                st.session_state["messages"].append({"role": "assistant", "content": reply})
            except Exception as e:
                # Top-level UI boundary: show the failure instead of crashing the page.
                box.error(f"❌ Error: {e}")
                st.exception(e)