OdysseyArena / GUI_Repo_Task.py
beatccjiang's picture
Upload project files to Hugging Face Spaces
907121e verified
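# ==================== Repo Task Module ====================
"""
All functions and UI components related to the Repo task.
Supports concurrent multi-user use: each user session's state is managed with gr.State.
Progress data is stored through the unified progress-management module.
"""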
import json
import os
from typing import List, Tuple, Optional, Dict, Any
import gradio as gr
# 导入统一进度管理模块
import progress_manager
# 导入 Repo 环境
import sys
current_dir = os.path.dirname(os.path.abspath(__file__))
repoenv_path = os.path.join(current_dir, "RepoEnv")
if os.path.exists(repoenv_path):
sys.path.insert(0, repoenv_path)
from RepoEnv_v7 import ComputerEnvSetupInductionEnvV7_5
# ------------------- Constants -------------------
REPO_MAX_STEPS = 120  # step budget per environment; passed as max_steps when an environment is constructed
# ------------------- Example Text -------------------
# Markdown usage walkthrough for the Repo task (user-facing text).
# Sub-bullets are indented so they nest under their numbered step when rendered.
# NOTE(review): the 'pkg6' name in the Step 6 feedback is kept verbatim from the
# original text -- confirm it against the environment's real error message.
REPO_EXAMPLE_TEXT = """
## 📖 Repository Environment Usage Example
### Example Scenario
You need to configure the Python environment and install the correct package versions so that the project can run successfully: python run.py
### Available Commands
- `pip install python==3.10` - Install Python version
- `pip install pkg0==1.2` - Install package (supports version constraints)
- `pip uninstall pkg0` - Uninstall package
- `pip list` - View current environment status
- `repo tree` - View repository structure
- `python run.py` - Run project (success means task completed)
### Example Hidden Rules (Users need to discover these in actual tasks)
- Requires python>=3.10
- Requires pkg1==1.0
- Requires pkg2>=1.2,<=2.0
- Requires pkg3<=1.0
- All version numbers of pkg3 must match pkg1 (including integer and decimal parts)
- Major version number of pkg2 must match pkg1 (integer part)
### Example Steps
1. **Step 1**: Input `pip install python==3.10`, click "Execute Action"
    - Environment feedback: Successfully installed python==3.10
    - Reason: Successfully installed
2. **Step 2**: Input `python run.py`, click "Execute Action"
    - Environment feedback: ModuleNotFoundError: No module named 'pkg1'.
    - Reason: pkg1 not installed
3. **Step 3**: Input `pip install pkg1==1.0`, click "Execute Action"
    - Environment feedback: Successfully installed pkg1==1.0
    - Reason: Successfully installed pkg1==1.0
4. **Step 4**: Input `python run.py`, click "Execute Action"
    - Environment feedback: ModuleNotFoundError: No module named 'pkg2'.
    - Reason: pkg2 not installed
5. **Step 5**: Input `pip install pkg2==2.0`, click "Execute Action"
    - Environment feedback: Successfully installed pkg2==2.0
    - Reason: Successfully installed pkg2==2.0
6. **Step 6**: Input `python run.py`, click "Execute Action"
    - Environment feedback: RuntimeError: ABI mismatch detected between 'pkg6' and dependent packages.
    - Reason: Major version number of pkg2 does not match pkg1
7. **Step 7**: Input `pip install pkg2==1.0`, click "Execute Action"
    - Environment feedback: Successfully installed pkg2==1.0
    - Reason: Successfully installed pkg2==1.0
8. **Step 8**: Input `python run.py`, click "Execute Action"
    - Environment feedback: ModuleNotFoundError: No module named 'pkg3'.
    - Reason: pkg3 not installed
9. **Step 9**: Input `pip install pkg3==0.1`, click "Execute Action"
    - Environment feedback: Successfully installed pkg3==0.1
    - Reason: Successfully installed pkg3==0.1
10. **Step 10**: Input `python run.py`, click "Execute Action"
    - Environment feedback: RuntimeError: tightly-coupled components are out of sync with 'pkg1'.
    - Reason: All version numbers of pkg3 must match pkg1 (including integer and decimal parts)
11. **Step 11**: Input `pip install pkg3==1.0`, click "Execute Action"
    - Environment feedback: Successfully installed pkg3==1.0
    - Reason: Successfully installed pkg3==1.0
12. **Step 12**: Input `python run.py`, click "Execute Action"
    - Environment feedback: 🎉 Task completed! Project runs successfully!
    - Reason: All conditions met
### Tips
- Packages may have dependencies and version conflicts
- Need to carefully handle version constraints
- Maximum 120 steps allowed
### Goal
Successfully run `python run.py` so that the project can execute normally
"""
# ------------------- 状态管理 -------------------
def create_repo_state() -> Dict[str, Any]:
    """Build a fresh, empty Repo-task state for one user session.

    Every call returns a new dict holding new list objects, so separate
    sessions never share mutable state.
    """
    fresh_state: Dict[str, Any] = dict(
        env=None,            # ComputerEnvSetupInductionEnvV7_5 instance once loaded
        test_data=[],        # loaded test data
        current_env_idx=0,   # index of the current environment
        history_records=[],  # action history records
    )
    return fresh_state
# ------------------- 工具函数 -------------------
def format_repo_state(obs: Dict[str, Any]) -> str:
    """Render an environment observation as display text.

    Reads ``obs["python_version"]`` and ``obs["installed"]``. Each version is
    indexed with ``[0]`` and ``[1]`` and shown as ``<first>.<second>``.
    Packages are listed in sorted name order.
    """
    py_version = obs.get("python_version")
    if py_version:
        header = f"Python: {py_version[0]}.{py_version[1]}"
    else:
        header = "Python: <not set>"

    packages = obs.get("installed", {})
    if not packages:
        return "\n".join([header, "\nInstalled packages: <none>"])

    package_lines = [
        f" - {name}=={packages[name][0]}.{packages[name][1]}"
        for name in sorted(packages.keys())
    ]
    return "\n".join([header, "\nInstalled packages:", *package_lines])
def load_repo_test_data(state: Dict[str, Any], current_dir: str) -> Tuple[Dict[str, Any], str]:
    """Load the Repo test environments from JSON into ``state['test_data']``.

    The file is looked up under ``current_dir`` first; if it does not exist
    there, the same relative path is tried against the working directory.

    Returns:
        ``(state, message)`` where ``message`` reports success or the failure
        reason. On failure no exception is raised.
    """
    relative_path = "test_data/repo/test_repo_lite_251217.json"
    data_path = os.path.join(current_dir, relative_path)
    if not os.path.exists(data_path):
        data_path = relative_path

    try:
        with open(data_path, 'r', encoding='utf-8') as handle:
            state['test_data'] = json.load(handle)
        loaded_count = len(state['test_data'])
        return state, f"✅ Successfully loaded {loaded_count} test environments"
    except FileNotFoundError:
        return state, f"❌ File not found: {data_path}"
    except Exception as exc:
        return state, f"❌ Load failed: {str(exc)}"
def repo_save_progress_internal(state: Dict[str, Any], current_user_id: str, save_dir: str) -> str:
    """Persist the current Repo environment's progress via ``progress_manager``.

    A random ``user_<hex>`` ID is generated when ``current_user_id`` is empty.
    Tuples in the observation (python version, package versions) are turned
    into lists so the payload is JSON-compatible.

    Returns:
        A status message. Failures are reported in the message, never raised.
    """
    # Auto-generate user ID if not provided
    if not current_user_id:
        import uuid
        current_user_id = f"user_{uuid.uuid4().hex[:8]}"

    env = state.get('env')
    if env is None:
        return "⚠️ No progress to save"

    try:
        obs = env._get_obs()
        env_index = state.get('current_env_idx', 0)
        records = state.get('history_records', [])

        # Convert versions to a JSON-compatible form (tuple -> list)
        raw_python = obs.get("python_version")
        if not raw_python:
            python_version_json = None
        elif isinstance(raw_python, tuple):
            python_version_json = list(raw_python)
        else:
            python_version_json = raw_python

        installed_json = {
            name: (list(version) if isinstance(version, tuple) else version)
            for name, version in obs.get("installed", {}).items()
        }

        solved = obs.get("success", False)
        # world_spec is no longer stored; it can be looked up from test_data[env_idx]
        payload = {
            "user_id": current_user_id,
            "env_idx": env_index,
            "env_idx_display": env_index + 1,
            "python_version": python_version_json,
            "installed": installed_json,
            "history": records,
            "num_steps": obs.get("step", 0),
            "done": solved,
            "success": solved,
        }
        progress_manager.save_task_environment_progress(
            current_user_id, save_dir, "repo", env_index, payload
        )
        return f"✅ Progress saved (Environment {env_index + 1}, Steps {len(records)})"
    except Exception as exc:
        return f"❌ Save failed: {str(exc)}"
def _repo_env_index_from_display(env_idx_display: Any) -> Optional[int]:
    """Convert a 1-based environment number from the UI into a 0-based index.

    Returns None when the value is missing, non-numeric, or not a whole number.
    """
    try:
        as_float = float(env_idx_display)
    except (TypeError, ValueError, OverflowError):
        return None
    # is_integer() is False for NaN and infinities as well as for fractions
    if not as_float.is_integer():
        return None
    return int(as_float) - 1


def repo_load_environment(state: Dict[str, Any], env_idx_display: int, current_user_id: str, save_dir: str) -> Tuple[Dict[str, Any], str, str, str, str, str, str]:
    """Load a Repo environment, resuming saved progress when it exists.

    Args:
        state: Per-session state dict; ``env``, ``current_env_idx`` and
            ``history_records`` are updated in place.
        env_idx_display: 1-based environment number. Whole-number floats
            (e.g. ``3.0``) are accepted; ``None``, fractions and non-numeric
            values are reported as out of range instead of raising.
        current_user_id: User ID; a random ``user_<hex>`` ID is generated when empty.
        save_dir: Directory handed to ``progress_manager``.

    Returns:
        ``(state, info, state_display, logic, history_display, progress, steps_info)``
    """
    # Auto-generate user ID if not provided
    if not current_user_id:
        import uuid
        current_user_id = f"user_{uuid.uuid4().hex[:8]}"

    test_data = state.get('test_data', [])
    if not test_data:
        return state, "❌ Please load test data first", "", "", "", "Click 'View Uncompleted Problems' button to view progress", "0 / 120"

    env_idx = _repo_env_index_from_display(env_idx_display)
    if env_idx is None or env_idx < 0 or env_idx >= len(test_data):
        return state, f"❌ Environment index out of range (1-{len(test_data)})", "", "", "", "Click 'View Unfinished Problems' button to view progress", "0 / 120"

    env_number = env_idx + 1  # normalised 1-based number for display
    progress_hint = "Click 'View Unfinished Problems' button to view progress"

    # Ask the unified progress manager whether progress was saved for this environment
    saved_progress_data = progress_manager.get_task_environment_progress(
        current_user_id, save_dir, "repo", env_idx
    )

    if saved_progress_data:
        state['current_env_idx'] = env_idx
        state['history_records'] = saved_progress_data.get("history", [])
        num_steps = saved_progress_data.get("num_steps", len(state['history_records']))

        # New saves omit world_spec (it is taken from test_data); older saves
        # may still carry one, which takes precedence for backward compatibility.
        world_spec = saved_progress_data.get("world_spec") or test_data[env_idx]
        if world_spec:
            env = ComputerEnvSetupInductionEnvV7_5(world_spec, max_steps=REPO_MAX_STEPS)
            state['env'] = env
            env.step_count = num_steps

            # JSON stores versions as lists; the environment is given tuples
            py_ver = saved_progress_data.get("python_version")
            if py_ver and isinstance(py_ver, (list, tuple)):
                env.python_version = tuple(py_ver)

            env.installed = {
                pkg: tuple(ver)
                for pkg, ver in saved_progress_data.get("installed", {}).items()
                if isinstance(ver, (list, tuple))
            }
            env.done = saved_progress_data.get("done", False)
            env.success = saved_progress_data.get("success", False)

        obs = state['env']._get_obs()
        state_display = format_repo_state(obs)
        history_display = "\n\n".join(state['history_records']) if state['history_records'] else "No history records"
        info = f"✅ Environment {env_number}/{len(test_data)} loaded\n"
        info += f"Steps: {len(state['history_records'])}"
        steps_info = f"{state['env'].step_count} / {REPO_MAX_STEPS}"
        return state, info, state_display, "", history_display, progress_hint, steps_info

    # No saved progress: initialise a new environment and persist its initial state
    state['current_env_idx'] = env_idx
    state['env'] = ComputerEnvSetupInductionEnvV7_5(test_data[env_idx], max_steps=REPO_MAX_STEPS)
    state['history_records'] = []
    repo_save_progress_internal(state, current_user_id, save_dir)

    obs = state['env']._get_obs()
    state_display = format_repo_state(obs)
    history_display = "Environment initialized (new environment)\n"
    info = f"✅ Environment {env_number}/{len(test_data)} initialized (new environment)\n"
    steps_info = f"{state['env'].step_count} / {REPO_MAX_STEPS}"
    return state, info, state_display, "", history_display, progress_hint, steps_info
def repo_step_environment(state: Dict[str, Any], action_str: str, current_user_id: str, save_dir: str) -> Tuple[Dict[str, Any], str, str, str, bool, str]:
    """Execute one action in the current Repo environment and save progress.

    Args:
        state: Per-session state dict; ``history_records`` is appended to.
        action_str: Command typed by the user. Surrounding whitespace is
            stripped and ``None`` is treated as an empty command.
        current_user_id: User ID; a random ``user_<hex>`` ID is generated when empty.
        save_dir: Directory handed to the progress-saving helper.

    Returns:
        ``(state, feedback, state_display, history_display, done, steps_info)``
    """
    env = state.get('env')
    if env is None:
        return state, "❌ Please initialize environment first", "Please initialize environment first", "", False, "0 / 120"

    # Auto-generate user ID if not provided
    if not current_user_id:
        import uuid
        current_user_id = f"user_{uuid.uuid4().hex[:8]}"

    # Normalise once so the history shows exactly what was executed
    command = (action_str or "").strip()
    obs, _reward, done, _info = env.step(command)
    state_display = format_repo_state(obs)
    feedback_info = obs.get('last_message', '')

    history_records = state.setdefault('history_records', [])
    history_records.append(
        f"Step {len(history_records) + 1}: {command}\nFeedback: {feedback_info}")
    history_display = "\n\n".join(history_records)

    repo_save_progress_internal(state, current_user_id, save_dir)

    if done and env.success:
        feedback_info += "\n🎉 Task completed! Project runs successfully!"

    steps_info = f"{env.step_count} / {REPO_MAX_STEPS}"
    return state, feedback_info, state_display, history_display, done, steps_info
def repo_reset_environment(state: Dict[str, Any], current_user_id: str, save_dir: str) -> Tuple[Dict[str, Any], str, str, str, str, str]:
    """Reset the current Repo environment, clear its history and save the result.

    Returns:
        ``(state, info, state_display, history_display, progress, steps_info)``
    """
    active_env = state.get('env')
    if active_env is None:
        return (
            state,
            "❌ Please initialize environment first",
            "",
            "",
            "Click 'View Uncompleted Problems' button to view progress",
            "0 / 120",
        )

    active_env.reset()
    state['history_records'] = []
    repo_save_progress_internal(state, current_user_id, save_dir)

    state_display = format_repo_state(active_env._get_obs())
    steps_info = f"{active_env.step_count} / {REPO_MAX_STEPS}"
    return (
        state,
        "✅ Environment reset",
        state_display,
        "Environment reset\n",
        "Click 'View Unfinished Problems' button to view progress",
        steps_info,
    )
def get_repo_current_env_idx(state: Dict[str, Any]) -> int:
    """Return the session's current environment index, or 0 when none is stored."""
    current_idx = state.get('current_env_idx', 0)
    return current_idx
def get_repo_test_data(state: Dict[str, Any]) -> List[dict]:
    """Return the session's loaded test data, or an empty list when none is stored."""
    loaded = state.get('test_data', [])
    return loaded
def get_repo_history_records(state: Dict[str, Any]) -> List[str]:
    """Return the session's action history, or an empty list when none is stored."""
    records = state.get('history_records', [])
    return records
def get_repo_progress_summary(state: Dict[str, Any], user_id: str, save_dir: str) -> str:
    """Build a text summary of a user's progress across all Repo environments.

    An environment counts as completed when its saved record has ``success``
    or ``done`` set, or when ``num_steps`` has reached ``REPO_MAX_STEPS``.
    Records whose ``env_idx`` is missing or outside the loaded test data are
    ignored, so completed and incomplete counts always add up to the total.

    Args:
        state: Session state; only ``test_data`` is read.
        user_id: User ID; a random ``user_<hex>`` ID is generated when blank.
        save_dir: Directory handed to ``progress_manager``.

    Returns:
        The formatted summary, or a warning when no test data is loaded.
    """
    # Nothing to summarise without test data, so skip the progress lookup entirely
    test_data = state.get('test_data', [])
    total_envs = len(test_data) if test_data else 0
    if total_envs == 0:
        return "⚠️ Please load test data first"

    # Auto-generate user ID if not provided
    if not user_id or not user_id.strip():
        import uuid
        user_id = f"user_{uuid.uuid4().hex[:8]}"
    user_id = user_id.strip()

    # Load this user's saved progress through the unified progress manager
    task_data = progress_manager.load_task_progress(user_id, save_dir, "repo")
    environments = task_data.get("environments", {})

    all_env_indices = set(range(total_envs))
    completed_envs = set()
    for progress_data in environments.values():
        env_idx = progress_data.get("env_idx", -1)
        if env_idx not in all_env_indices:
            # Stale or malformed record; counting it would skew the totals
            continue
        num_steps = progress_data.get("num_steps", 0) or 0
        if (
            progress_data.get("success", False)
            or progress_data.get("done", False)
            or num_steps >= REPO_MAX_STEPS
        ):
            completed_envs.add(env_idx)

    incomplete_envs = sorted(all_env_indices - completed_envs)

    summary_lines = [
        f"📊 Repo Task - Progress Summary for User {user_id}",
        f"Total environments: {total_envs}",
        f"Completed: {len(completed_envs)}/{total_envs}",
        f"Incomplete: {len(incomplete_envs)}/{total_envs}",
    ]
    if incomplete_envs:
        summary_lines.append("\n❌ Incomplete environments:")
        # Show five 1-based environment numbers per line
        for start in range(0, len(incomplete_envs), 5):
            numbers = [str(idx + 1) for idx in incomplete_envs[start:start + 5]]
            summary_lines.append(" " + ", ".join(numbers))
    else:
        summary_lines.append("\n🎉 Congratulations! All environments are completed!")
    return "\n".join(summary_lines)
def create_repo_interface(current_dir: str, save_dir: str, user_id_input: gr.Textbox) -> Tuple[gr.Row, None, None, None, None, gr.Textbox, gr.Textbox, gr.Textbox, gr.Button, gr.Textbox, gr.Textbox]:
    """Create the Repo task UI components.

    The three parameters are not used in this function body.

    Returns: (repo_interface, repo_env_idx_input, repo_init_btn, repo_reset_btn,
    repo_env_info, repo_state_display, repo_steps_info_text,
    repo_action_input, repo_step_btn, repo_feedback_display, repo_history_display)

    Note: the environment-control components (repo_env_idx_input, repo_init_btn,
    repo_reset_btn, repo_env_info) are not part of repo_interface; they have to be
    added manually in the main interface, below the progress summary.
    To keep the tuple shape stable, None is returned as a placeholder in those four
    positions, and the main interface ignores them.
    """
    # Main interface Row (environment controls not included); created hidden
    with gr.Row(visible=False) as repo_interface:
        # Left side: step counter plus action history and environment feedback
        with gr.Column(scale=1):
            repo_steps_info_text = gr.Textbox(
                label="Steps Info",
                value="0 / 120",
                interactive=False,
                visible=True,
                lines=2
            )
            gr.Markdown("### 📜 Action History & Environment Feedback")
            repo_history_display = gr.Textbox(
                label="Action History & Environment Feedback",
                interactive=False,
                lines=10
            )
        # Right side: Command input
        with gr.Column(scale=1):
            # Temporarily hide environment state display
            gr.Markdown("### 📦 Current State", visible=False)
            repo_state_display = gr.Textbox(
                label="Environment State",
                interactive=False,
                lines=10,
                value="Please load environment first",
                visible=False # Hide environment state display
            )
            gr.Markdown("### 🎯 Command Input")
            repo_action_input = gr.Textbox(
                label="Input Command",
                placeholder="e.g., pip install python==3.10",
                info="Supported: pip install/uninstall, python run.py, pip list, repo tree"
            )
            repo_step_btn = gr.Button("Execute Command", variant="primary")
            gr.Markdown("### 💬 Environment Feedback")
            repo_feedback_display = gr.Textbox(
                label="Feedback Info",
                interactive=False,
                lines=5,
                visible=True
            )
    # Return placeholders (the main interface uses its own environment-control components)
    return (repo_interface, None, None, None,
            None, repo_state_display, repo_steps_info_text,
            repo_action_input, repo_step_btn, repo_feedback_display, repo_history_display)