""" FixFlow — Streamlit Frontend Autonomous Bug Resolution Agent powered by GLM 5.1 (Z.ai) """ import time import logging import threading from typing import Optional import streamlit as st from backend.agent import AgentResult, FixFlowAgent, generate_full_report from backend.config import GLM_MODEL, GLM_BASE_URL, GLM_API_KEY, GITHUB_TOKEN from backend.github_client import GitHubClient from backend.llm_client import GLMClient # ── Logging Setup ───────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) logger = logging.getLogger("fixflow.app") # ── Page Config ─────────────────────────────────────────────────────────────── st.set_page_config( page_title="FixFlow — Autonomous Bug Resolution Agent", page_icon="🔧", layout="wide", initial_sidebar_state="collapsed", ) # ── Custom CSS ──────────────────────────────────────────────────────────────── st.markdown(""" """, unsafe_allow_html=True) # ── Session State Init ──────────────────────────────────────────────────────── def init_session(): defaults = { "result": None, "running": False, "step_statuses": {}, "step_messages": {}, "stream_buffer": "", "error": None, "glm_api_key": GLM_API_KEY, "github_token": GITHUB_TOKEN, "model": GLM_MODEL, "run_confidence": True, } for k, v in defaults.items(): if k not in st.session_state: st.session_state[k] = v init_session() # Header st.markdown("""
FixFlow
Autonomous Bug Resolution Agent
⚡ GLM 5.1 by Z.ai
""", unsafe_allow_html=True) # ── Input Section ───────────────────────────────────────────────────────────── st.markdown('
', unsafe_allow_html=True) st.markdown("### 🎯 Analyze a GitHub Issue") st.markdown('
Paste a GitHub issue URL and the repository to analyze. FixFlow will autonomously trace the root cause and generate a fix.
', unsafe_allow_html=True) col1, col2 = st.columns(2) with col1: issue_url = st.text_input( "GitHub Issue URL", placeholder="https://github.com/owner/repo/issues/123", help="Full URL to the GitHub issue you want to fix", key="issue_url_input", ) with col2: repo_url = st.text_input( "Repository URL", placeholder="https://github.com/owner/repo", help="The repository containing the buggy code", key="repo_url_input", ) # Auto-fill repo from issue URL if issue_url and not repo_url: # Try to extract repo from issue URL import re m = re.match(r"(https://github\.com/[^/]+/[^/]+)/issues/\d+", issue_url.strip()) if m: repo_url = m.group(1) # Example buttons st.markdown('
💡 Try with an example:
', unsafe_allow_html=True) ex_col1, ex_col2, ex_col3 = st.columns(3) with ex_col1: if st.button("FastAPI #1234 example", key="ex1", help="Example issue"): st.info("Set issue URL to a real FastAPI issue, e.g.: https://github.com/tiangolo/fastapi/issues/10876") with ex_col2: if st.button("Requests #6710 example", key="ex2", help="Example issue"): st.info("Set issue URL to: https://github.com/psf/requests/issues/6710") with ex_col3: if st.button("Flask #5742 example", key="ex3", help="Example issue"): st.info("Set issue URL to: https://github.com/pallets/flask/issues/5742") st.markdown('
', unsafe_allow_html=True) # ── Analyze Button ──────────────────────────────────────────────────────────── st.markdown("
", unsafe_allow_html=True) btn_col, info_col = st.columns([1, 3]) with btn_col: analyze_clicked = st.button( "🚀 Analyze & Fix", key="analyze_btn", type="primary", disabled=st.session_state.running, use_container_width=True, ) with info_col: if st.session_state.running: st.markdown( '
' '⏳ Analysis in progress... This may take 1-3 minutes depending on repo size.' '
', unsafe_allow_html=True ) elif st.session_state.result: total_time = sum(st.session_state.result.step_timings.values()) st.markdown( f'
' f'✅ Last analysis completed in {total_time:.1f}s
', unsafe_allow_html=True ) # ── Pipeline Execution ──────────────────────────────────────────────────────── STEP_LABELS = { "0_fetch": ("📡", "Fetching GitHub Data"), "1_issue": ("📋", "Analyzing Bug Report"), "2_mapping": ("🗺️", "Mapping Codebase"), "3_analysis": ("🔬", "Root Cause Analysis"), "4_fix": ("🔧", "Generating Fix"), "5_diff": ("📝", "Creating PR Description"), "6_confidence": ("🎯", "Confidence Evaluation"), } def run_agent(): """Execute the FixFlow agent pipeline (runs in main thread for Streamlit).""" st.session_state.running = True st.session_state.result = None st.session_state.error = None st.session_state.step_statuses = {} st.session_state.step_messages = {} st.session_state.stream_buffer = "" def on_status(step: str, status: str, message: str): st.session_state.step_statuses[step] = status st.session_state.step_messages[step] = message def on_stream(chunk: str): st.session_state.stream_buffer += chunk try: llm = GLMClient( api_key=st.session_state.glm_api_key, base_url=GLM_BASE_URL, model=st.session_state.model, ) gh = GitHubClient(token=st.session_state.github_token or None) agent = FixFlowAgent(llm_client=llm, github_client=gh) result = agent.run( issue_url=issue_url.strip(), repo_url=repo_url.strip(), on_status=on_status, stream_callback=on_stream, run_confidence_eval=st.session_state.run_confidence, ) st.session_state.result = result except Exception as e: st.session_state.error = str(e) logger.exception("Agent pipeline error") finally: st.session_state.running = False # Trigger on button click if analyze_clicked: if not st.session_state.glm_api_key: st.error("⚠️ Please enter your GLM API key in the sidebar.") elif not issue_url: st.error("⚠️ Please enter a GitHub Issue URL.") elif not repo_url: st.error("⚠️ Please enter the Repository URL.") else: run_agent() st.rerun() # ── Error Display ───────────────────────────────────────────────────────────── if st.session_state.error: st.error(f"❌ **Error:** {st.session_state.error}") with st.expander("🐛 Debug Information"): st.code(st.session_state.error) # ── Pipeline Progress ───────────────────────────────────────────────────────── if st.session_state.step_statuses or st.session_state.result: st.markdown("---") st.markdown("### ⚡ Pipeline Progress") statuses = st.session_state.step_statuses result: Optional[AgentResult] = st.session_state.result timings = result.step_timings if result else {} status_icons = { "running": "⏳", "complete": "✅", "error": "❌", "info": "ℹ️", } progress_cols = st.columns(min(len(STEP_LABELS), 4)) step_items = list(STEP_LABELS.items()) for i, (step_id, (icon, label)) in enumerate(step_items): status = statuses.get(step_id, "idle") timing = timings.get(step_id) css_class = f"step-{status}" if status != "idle" else "step-idle" status_icon = status_icons.get(status, "⬜") time_str = f"{timing:.1f}s" if timing else "" st.markdown( f'
' f'{status_icon}' f'{icon} {label}' f'{time_str}' f'
', unsafe_allow_html=True, ) # ── Results ─────────────────────────────────────────────────────────────────── if st.session_state.result: result: AgentResult = st.session_state.result st.markdown("---") # ── Summary Stats ───────────────────────────────────────────────────────── total_time = sum(result.step_timings.values()) stats = result.diff_stats st.markdown("### 📊 Analysis Summary") m1, m2, m3, m4 = st.columns(4) with m1: st.markdown( f'
' f'
{len(result.suspect_file_paths)}
' f'
Files Analyzed
' f'
', unsafe_allow_html=True ) with m2: st.markdown( f'
' f'
{stats.get("files_changed", 0)}
' f'
Files Changed
' f'
', unsafe_allow_html=True ) with m3: st.markdown( f'
' f'
+{stats.get("lines_added", 0)}
' f'
Lines Added
' f'
', unsafe_allow_html=True ) with m4: st.markdown( f'
' f'
{total_time:.0f}s
' f'
Total Time
' f'
', unsafe_allow_html=True ) st.markdown("
", unsafe_allow_html=True) # ── Step 1: Bug Summary ─────────────────────────────────────────────────── with st.expander("📋 Step 1: Bug Summary", expanded=True): st.markdown( f'
' f'⏱️ Completed in {result.step_timings.get("1_issue", 0):.1f}s' f'
', unsafe_allow_html=True ) st.markdown(result.bug_summary) # ── Step 2: Relevant Files ──────────────────────────────────────────────── with st.expander("🔍 Step 2: Relevant Files & Codebase Mapping", expanded=False): st.markdown( f'
' f'⏱️ Completed in {result.step_timings.get("2_mapping", 0):.1f}s | ' f'Selected {len(result.suspect_file_paths)} files for deep analysis' f'
', unsafe_allow_html=True ) if result.suspect_file_paths: st.markdown("**🎯 Files Selected for Analysis:**") for i, fp in enumerate(result.suspect_file_paths, 1): st.markdown(f"`{i}.` `{fp}`") st.markdown("---") st.markdown(result.relevant_files_analysis) # ── Step 3: Root Cause Analysis ─────────────────────────────────────────── with st.expander("🔬 Step 3: Root Cause Analysis (Chain-of-Thought)", expanded=True): st.markdown( f'
' f'⏱️ Completed in {result.step_timings.get("3_analysis", 0):.1f}s | ' f'This is the core reasoning chain — read carefully!' f'
', unsafe_allow_html=True ) st.markdown(result.root_cause_analysis) # ── Step 4: Proposed Fix (Diff) ─────────────────────────────────────────── with st.expander("🔧 Step 4: Proposed Fix", expanded=True): st.markdown( f'
' f'⏱️ Completed in {result.step_timings.get("4_fix", 0):.1f}s | ' f'{stats.get("files_changed", 0)} file(s) modified, ' f'+{stats.get("lines_added", 0)} / -{stats.get("lines_removed", 0)} lines' f'
', unsafe_allow_html=True ) if result.diffs: # Syntax-highlighted diff for filepath, diff_content in result.diffs.items(): st.markdown(f"**`{filepath}`**") st.code(diff_content, language="diff") else: st.warning("⚠️ No diffs generated. The LLM may not have proposed direct file changes.") if result.fix_generation_raw: st.markdown("**Raw fix proposal from GLM:**") st.markdown(result.fix_generation_raw) # Copy button for full diff if result.diff_formatted and result.diffs: st.markdown("---") copy_col, pr_col, _ = st.columns([1, 1, 2]) with copy_col: st.download_button( "📋 Download .diff Patch", data=result.diff_formatted, file_name="fixflow.diff", mime="text/plain", use_container_width=True, ) with pr_col: if st.button("🚀 Open GitHub Pull Request", use_container_width=True, type="primary"): if not st.session_state.github_token: st.error("⚠️ A GitHub Token with write access is required to open a PR.") else: with st.spinner("🚀 Creating Pull Request..."): gh = GitHubClient(token=st.session_state.github_token) try: branch_name = f"fixflow-patch-{int(time.time())}" title = f"Fix: {result.issue_data.get('title', 'Issue')}" body = result.fix_explanation + "\n\n---\n*Generated autonomously by FixFlow*" pr_url = gh.create_pull_request( repo_url=result.repo_url, branch_name=branch_name, files_content=result.fixed_files, title=title, body=body ) st.success(f"✅ Created successfully: [View PR]({pr_url})") except Exception as e: st.error(f"Failed to create PR: {e}") # ── Step 5: Fix Explanation ─────────────────────────────────────────────── with st.expander("📝 Step 5: PR Description & Fix Explanation", expanded=True): st.markdown( f'
' f'⏱️ Completed in {result.step_timings.get("5_diff", 0):.1f}s' f'
', unsafe_allow_html=True ) st.markdown(result.fix_explanation) # ── Confidence Eval (optional) ──────────────────────────────────────────── if result.confidence_eval: with st.expander("🎯 Confidence Self-Evaluation", expanded=False): st.markdown(result.confidence_eval) # ── Export Full Report ──────────────────────────────────────────────────── st.markdown("---") st.markdown("### 📤 Export Report") exp_col1, exp_col2, _ = st.columns([1, 1, 2]) full_report = generate_full_report(result) issue_num = result.issue_data.get("number", "0") repo_slug = repo_url.strip().rstrip("/").split("/")[-1] if repo_url else "repo" with exp_col1: st.download_button( "📄 Download Full Report (.md)", data=full_report, file_name=f"fixflow_{repo_slug}_issue_{issue_num}.md", mime="text/markdown", use_container_width=True, ) with exp_col2: if result.diff_formatted and result.diffs: st.download_button( "📦 Download Patch (.diff)", data=result.diff_formatted, file_name=f"fixflow_{repo_slug}_issue_{issue_num}.diff", mime="text/plain", use_container_width=True, ) st.markdown("---") st.markdown( '
' '🔧 FixFlow — Autonomous Bug Resolution · Powered by ' 'GLM 5.1 by Z.ai' '
', unsafe_allow_html=True ) # ── Empty State ─────────────────────────────────────────────────────────────── elif not st.session_state.running and not st.session_state.error: st.markdown("
", unsafe_allow_html=True) col1, col2, col3 = st.columns(3) cards = [ ("🐛", "Bug Report Parsing", "Automatically extracts error messages, reproduction steps, affected components, and technical clues from any GitHub issue."), ("🧠", "Chain-of-Thought Reasoning", "Traces the execution flow step-by-step, citing specific file names, functions, and line numbers to pinpoint the root cause."), ("🔧", "PR-Ready Fixes", "Generates minimal, precise code fixes with unified diffs and a complete pull request description you can copy directly."), ] for col, (icon, title, desc) in zip([col1, col2, col3], cards): with col: st.markdown( f'
' f'
{icon}
' f'
{title}
' f'
{desc}
' f'
', unsafe_allow_html=True, ) st.markdown("
", unsafe_allow_html=True) # How it works st.markdown("### 🔄 How It Works") steps_html = """
""" how_steps = [ ("1", "📡", "Fetch Issue", "Pulls the full GitHub issue: title, body, comments, labels"), ("2", "🗺️", "Map Codebase", "Identifies top 5-10 suspect files from the repo tree"), ("3", "🔬", "Analyze Code", "Deep code reading with chain-of-thought root cause tracing"), ("4", "🔧", "Generate Fix", "Creates corrected file versions with minimal changes"), ("5", "📝", "Write PR", "Produces unified diff + human-readable PR description"), ] for num, icon, title, desc in how_steps: steps_html += f"""
{num}
{icon}
{title}
{desc}
""" steps_html += "
" st.markdown(steps_html, unsafe_allow_html=True)