# filesai2 / src / streamlit_app.py
# suprimedev's picture
# Update src/streamlit_app.py
# a09cc33 verified
import streamlit as st
import openai
import os
import tempfile
import sys
import io
import subprocess
import importlib.util
import re
from contextlib import redirect_stdout
import textwrap
import shutil
import traceback
import json
from typing import List, Tuple, Optional, Dict
import requests # Added for downloading from URLs
# Clean up any existing temp files on startup to save space.
# NOTE(review): this runs at import time and removes every entry in the system
# temp directory whose name starts with "tmp" or ends with ".py", not only
# files created by this app -- confirm that is acceptable on a shared host.
try:
    tempdir = tempfile.gettempdir()
    for item in os.listdir(tempdir):
        item_path = os.path.join(tempdir, item)
        if item.startswith('tmp') or item.endswith('.py'):
            try:
                if os.path.isfile(item_path):
                    os.unlink(item_path)
                elif os.path.isdir(item_path):
                    shutil.rmtree(item_path)
            except OSError:
                # Best effort: an entry that is busy or not ours to delete
                # must not prevent the app from starting.
                pass
    # Clean pip cache if exists
    pip_cache = os.path.expanduser('~/.cache/pip')
    if os.path.exists(pip_cache):
        try:
            shutil.rmtree(pip_cache)
        except OSError:
            pass
except OSError:
    # Only filesystem failures are ignored; KeyboardInterrupt / SystemExit
    # are no longer swallowed the way a bare ``except:`` would.
    pass
# Use OpenRouter API (OpenAI-compatible)
# The key is read from the environment once, at import time; os.getenv()
# yields None when OPENROUTER_API_KEY is not set.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
# Module-level OpenAI SDK client whose base URL is pointed at OpenRouter.
client = openai.OpenAI(
    api_key=OPENROUTER_API_KEY,
    base_url="https://openrouter.ai/api/v1"
)
# Model identifier passed as `model=` when a chat completion is requested.
MODEL_NAME = "x-ai/grok-4-fast:free"
class ErrorAnalyzer:
    """Analyze errors and suggest fixes"""

    # Import names whose pip distribution is published under another name.
    _PIP_NAMES = {
        'cv2': 'opencv-python',
        'sklearn': 'scikit-learn',
        'PIL': 'Pillow',
        'bs4': 'beautifulsoup4',
        'yaml': 'PyYAML',
        'psycopg2': 'psycopg2-binary',
    }

    @staticmethod
    def analyze_error(error_message: str, code: str) -> Dict:
        """Classify an error message and propose a fix strategy.

        Args:
            error_message: Text of the failure (typically captured stderr).
            code: The code that produced the error. Currently not inspected;
                kept so callers can pass it.

        Returns:
            A dict with the keys:
              * ``error_type``: one of ``import_error``, ``permission_error``,
                ``memory_error``, ``file_error``, ``syntax_error``,
                ``attribute_error``, ``type_error``, ``value_error``,
                ``network_error`` or ``unknown`` (first matching rule wins).
              * ``suggestions``: human-readable hints.
              * ``packages``: pip distribution names worth installing,
                without duplicates.
              * ``original_error``: ``error_message`` unchanged.
        """
        error_type = "unknown"
        suggestions = []
        packages_to_install = []

        def add_package(name):
            # Keep insertion order but never list a package twice.
            if name not in packages_to_install:
                packages_to_install.append(name)

        # Import errors
        if "No module named" in error_message or "ModuleNotFoundError" in error_message:
            error_type = "import_error"
            module_match = re.search(r"No module named ['\"]([^'\"]+)['\"]", error_message)
            if module_match:
                # Only the top-level name is installable: for a missing
                # 'scipy.stats' the thing to install is 'scipy'.
                module = module_match.group(1).split('.')[0]
                package = ErrorAnalyzer._PIP_NAMES.get(module, module)
                add_package(package)
                suggestions.append(f"Install missing module: {package}")
        # Permission errors
        elif "Permission denied" in error_message or "PermissionError" in error_message:
            error_type = "permission_error"
            suggestions.append("Use temp directory for file operations")
            suggestions.append("Avoid system directories")
        # Memory errors
        elif "MemoryError" in error_message or "killed" in error_message.lower():
            error_type = "memory_error"
            suggestions.append("Reduce data size or use chunks")
            suggestions.append("Process data in smaller batches")
        # File not found
        elif "FileNotFoundError" in error_message or "No such file" in error_message:
            error_type = "file_error"
            suggestions.append("Check file paths")
            suggestions.append("Create directory if needed")
        # Syntax errors
        elif "SyntaxError" in error_message:
            error_type = "syntax_error"
            suggestions.append("Fix syntax issues")
            suggestions.append("Check indentation")
        # Attribute errors
        elif "AttributeError" in error_message:
            error_type = "attribute_error"
            suggestions.append("Check method/attribute names")
            suggestions.append("Verify object types")
        # Type errors
        elif "TypeError" in error_message:
            error_type = "type_error"
            suggestions.append("Check data types")
            suggestions.append("Add type conversions")
        # Value errors
        elif "ValueError" in error_message:
            error_type = "value_error"
            suggestions.append("Validate input data")
            suggestions.append("Add error handling")
        # Network errors
        elif "URLError" in error_message or "ConnectionError" in error_message:
            error_type = "network_error"
            suggestions.append("Check internet connection")
            suggestions.append("Add retry logic")

        # Package specific errors (checked regardless of the error type)
        if "openpyxl" in error_message or "xlrd" in error_message:
            add_package("openpyxl")
            suggestions.append("Install Excel support: openpyxl")
        if "PIL" in error_message or "Pillow" in error_message:
            add_package("Pillow")
            suggestions.append("Install image processing: Pillow")

        return {
            "error_type": error_type,
            "suggestions": suggestions,
            "packages": packages_to_install,
            "original_error": error_message
        }
def indent_code(code, spaces=4):
    """Shift every non-blank line of *code* right by *spaces* spaces.

    Lines that are empty or contain only whitespace are passed through
    unchanged. Lines are split on, and re-joined with, ``'\\n'``.
    """
    padding = ' ' * spaces
    return '\n'.join(
        padding + row if row.strip() else row
        for row in code.split('\n')
    )
# Pre-installed packages from requirements.txt, stored in normalized form
# (lowercase, dash-separated) so 'Pillow' and 'pillow' compare equal.
_PRE_INSTALLED_PACKAGES = frozenset({
    'gradio', 'openai', 'pillow', 'rembg', 'numpy', 'opencv-python', 'scikit-learn',
    'tensorflow', 'torch', 'lxml', 'requests', 'matplotlib', 'seaborn', 'onnxruntime',
    'proglog', 'openpyxl', 'moviepy',
})

# Import name -> pip distribution name.
_IMPORT_TO_PACKAGE = {
    'cv2': 'opencv-python',
    'sklearn': 'scikit-learn',
    'tf': 'tensorflow',
    'torch': 'torch',
    'numpy': 'numpy',
    'pandas': 'pandas',
    'PIL': 'Pillow',
    'pillow': 'Pillow',
    'matplotlib': 'matplotlib',
    'seaborn': 'seaborn',
    'onnxruntime': 'onnxruntime',
    'rembg': 'rembg',
    'requests': 'requests',
    'openpyxl': 'openpyxl',
    'xlrd': 'xlrd',
    'xlwt': 'xlwt',
    'xlsxwriter': 'xlsxwriter',
    'pyarrow': 'pyarrow',
    'sqlalchemy': 'sqlalchemy',
    'psycopg2': 'psycopg2-binary',
    'pymongo': 'pymongo',
    'redis': 'redis',
    'beautifulsoup4': 'beautifulsoup4',
    'bs4': 'beautifulsoup4',
    'scrapy': 'scrapy',
    'selenium': 'selenium',
    'flask': 'flask',
    'fastapi': 'fastapi',
    'django': 'django',
    'pytest': 'pytest',
    'hypothesis': 'hypothesis',
    'faker': 'faker',
    'yaml': 'PyYAML',
    'dateutil': 'python-dateutil',
}

# Standard-library modules never need installing. Prefer the interpreter's
# own list (Python 3.10+) and fall back to a hand-maintained one otherwise.
_STDLIB_MODULES = frozenset(getattr(sys, 'stdlib_module_names', ())) | frozenset({
    '__future__', 'typing',
    'os', 'sys', 'io', 'json', 're', 'time', 'math', 'random', 'collections',
    'itertools', 'functools', 'operator', 'string', 'pathlib', 'tempfile',
    'subprocess', 'logging', 'argparse', 'csv', 'xml', 'html', 'base64',
    'hashlib', 'urllib', 'http', 'threading', 'multiprocessing', 'socket',
    'asyncio', 'concurrent', 'abc', 'enum', 'dataclasses', 'zipfile',
    'datetime', 'calendar', 'copy', 'pickle', 'struct', 'binascii', 'codecs',
    'shutil', 'glob', 'traceback', 'textwrap', 'uuid', 'warnings', 'statistics',
    'sqlite3', 'decimal', 'fractions', 'platform', 'gzip', 'tarfile', 'secrets',
    'types', 'inspect', 'contextlib', 'importlib', 'unittest', 'email',
    'smtplib', 'ssl', 'signal', 'queue', 'heapq', 'bisect', 'array', 'pprint',
    'getpass', 'locale', 'shlex', 'fnmatch', 'configparser', 'tkinter',
})

# Usage patterns that require additional packages
_USAGE_PATTERNS = {
    r'\.to_excel': 'openpyxl',
    r'\.read_excel': 'openpyxl',
    r'\.ExcelWriter': 'openpyxl',
    r'\.to_parquet': 'pyarrow',
    r'\.read_parquet': 'pyarrow',
    r'\.to_sql': 'sqlalchemy',
    r'\.read_sql': 'sqlalchemy',
    r'\.to_feather': 'pyarrow',
    r'\.read_feather': 'pyarrow',
    r'\.to_stata': 'statsmodels',
    r'\.read_stata': 'statsmodels',
    r'\.to_clipboard': 'pyperclip',
    r'xlsxwriter': 'xlsxwriter',
    r'openpyxl': 'openpyxl',
}

# "pip install ..." either inside a comment or as the first thing on a line
# (optionally prefixed by the notebook markers "!" / "%"). The group captures
# everything up to the next "#" or the end of the line.
_PIP_INSTALL_RE = re.compile(
    r'(?:^[ \t]*[!%]?|#[^\n]*?\b)pip3?[ \t]+install[ \t]+([^#\n]+)',
    re.IGNORECASE | re.MULTILINE,
)

# One requirement token: name, optional [extras], optional version specifier.
_REQUIREMENT_RE = re.compile(
    r'([A-Za-z0-9](?:[A-Za-z0-9._-]*[A-Za-z0-9])?)(?:\[[^\]]*\])?(?:[=<>!~].*)?'
)

# pip options whose following token is a value, not a package name.
_PIP_OPTIONS_WITH_VALUE = frozenset({
    '-r', '--requirement', '-c', '--constraint', '-e', '--editable',
    '-i', '--index-url', '--extra-index-url', '-f', '--find-links',
    '-t', '--target', '--prefix', '--root',
})

_FROM_IMPORT_RE = re.compile(r'from\s+([A-Za-z_]\w*)(?:\.\w+)*\s+import\b')


def _normalize_package_name(name):
    """Return *name* lowercased with runs of '-', '_' and '.' folded to '-'."""
    return re.sub(r'[-_.]+', '-', name).lower()


def _imported_top_level_modules(code):
    """Yield the top-level module name of every absolute import in *code*."""
    for raw_line in code.split('\n'):
        line = raw_line.strip()
        from_match = _FROM_IMPORT_RE.match(line)
        if from_match:
            yield from_match.group(1)
        elif line.startswith('import '):
            # Drop trailing comments / chained statements, then handle
            # "import a.b as c, d" by looking at each comma-separated part.
            names = line[len('import '):].split('#', 1)[0].split(';', 1)[0]
            for part in names.split(','):
                words = part.split()
                if not words:
                    continue
                module = words[0].split('.')[0]
                if module.isidentifier():
                    yield module


def detect_required_packages(code):
    """Detect pip packages that *code* needs beyond the pre-installed set.

    Three sources are combined:
      1. ``pip install ...`` hints in comments (or at the start of a line);
         every package on the line is taken, options such as ``-U`` are
         skipped, and version specifiers / extras are dropped.
      2. ``import x`` / ``from x import y`` statements, mapped from import
         name to distribution name; standard-library modules are ignored.
      3. Method names such as ``.to_parquet`` that imply an optional
         dependency.

    Args:
        code: Python source text to scan.

    Returns:
        A sorted list of distribution names, excluding pre-installed ones.
    """
    required_packages = set()

    def consider(package):
        if package and _normalize_package_name(package) not in _PRE_INSTALLED_PACKAGES:
            required_packages.add(package)

    # 1. pip install hints
    for hint in _PIP_INSTALL_RE.finditer(code):
        tokens = iter(hint.group(1).split())
        for token in tokens:
            if token.startswith('-'):
                if token in _PIP_OPTIONS_WITH_VALUE:
                    next(tokens, None)  # skip the option's value as well
                continue
            requirement = _REQUIREMENT_RE.fullmatch(token.rstrip(',;'))
            if requirement is None:
                # Not a requirement specifier: the rest of the line is
                # free text, so stop instead of guessing.
                break
            consider(requirement.group(1))

    # 2. imports
    for module in _imported_top_level_modules(code):
        if module in _STDLIB_MODULES:
            continue
        consider(_IMPORT_TO_PACKAGE.get(module, module))

    # 3. usage patterns that need additional packages
    for pattern, package in _USAGE_PATTERNS.items():
        if re.search(pattern, code, re.IGNORECASE):
            consider(package)

    return sorted(required_packages)
def install_package(package_name):
    """Install a package using pip if it's not already installed.

    The package is considered installed when its import name can be located
    with ``importlib.util.find_spec``; otherwise ``pip install`` is run with
    the current interpreter.

    Args:
        package_name: pip distribution name (e.g. ``'opencv-python'``).

    Returns:
        True if the package is already importable or pip succeeded, False if
        the name is unusable, pip failed, or an unexpected error occurred.
    """
    # Distribution name (lowercase) -> importable module name, for packages
    # where the two differ by more than '-' vs '_'.
    import_names = {
        'opencv-python': 'cv2',
        'scikit-learn': 'sklearn',
        'pillow': 'PIL',
        'beautifulsoup4': 'bs4',
        'psycopg2-binary': 'psycopg2',
        'pyyaml': 'yaml',
    }
    try:
        # Names come from generated code; anything that looks like a
        # command-line option must never reach the pip argument list.
        if not package_name or package_name.startswith('-'):
            print(f"❌ Error checking/installing {package_name}: invalid package name")
            return False

        import_name = import_names.get(
            package_name.lower(), package_name.replace('-', '_')
        )
        try:
            spec = importlib.util.find_spec(import_name)
        except (ImportError, ValueError):
            # Raised for dotted names whose parent is missing or for
            # malformed names; treat as "not installed" and let pip decide.
            spec = None

        if spec is not None:
            print(f"✅ {package_name} already installed.")
            return True

        print(f"Installing package: {package_name}")
        result = subprocess.run([
            sys.executable, "-m", "pip", "install", "--quiet", "--no-cache-dir", package_name
        ], capture_output=True, text=True)
        if result.returncode == 0:
            print(f"✅ {package_name} installed successfully.")
            return True
        print(f"❌ Failed to install {package_name}: {result.stderr}")
        return False
    except Exception as e:
        print(f"❌ Error checking/installing {package_name}: {str(e)}")
        return False
def install_packages_if_needed(packages):
    """Install every named package and report whether all of them succeeded.

    Falsy entries are skipped. Returns True when *packages* is empty or no
    installation failed, False otherwise.
    """
    if not packages:
        print("No additional packages to install.")
        return True

    # install_package() is invoked once per truthy entry, in input order.
    failed_packages = [
        name for name in packages
        if name and not install_package(name)
    ]
    attempted = sum(1 for name in packages if name)
    success_count = attempted - len(failed_packages)

    print(f"✅ Installed/checked {success_count}/{len(packages)} packages.")
    if failed_packages:
        print(f"⚠️ Failed to install: {', '.join(failed_packages)}")
    return not failed_packages
def download_file_from_url(url: str, temp_dir: str) -> Optional[str]:
    """Download a file from URL to temp_dir and return local path.

    The local file name is taken from the last segment of the URL path
    (query string and fragment ignored, percent-escapes decoded). A name
    without an extension gets ``.txt``; if the name is already taken inside
    *temp_dir*, a numeric suffix is added so earlier downloads are kept.

    Args:
        url: Address to fetch.
        temp_dir: Existing directory that receives the file.

    Returns:
        Path of the downloaded file, or None if anything went wrong (the
        failure is printed and any partially written file is removed).
    """
    from urllib.parse import unquote, urlparse

    local_path = None
    file_created = False
    try:
        url_path = unquote(urlparse(url).path).replace('\\', '/')
        filename = os.path.basename(url_path)
        if filename in ('', '.', '..'):
            # Nothing usable in the URL (or a directory reference).
            filename = 'downloaded_file'
        if '.' not in filename:
            filename += '.txt'  # Default extension

        local_path = os.path.join(temp_dir, filename)
        stem, extension = os.path.splitext(filename)
        counter = 1
        while os.path.exists(local_path):
            local_path = os.path.join(temp_dir, f"{stem}_{counter}{extension}")
            counter += 1

        # The context manager releases the streamed connection even when
        # the transfer is interrupted.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(local_path, 'wb') as f:
                file_created = True
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        print(f"Downloaded {url} to {local_path}")
        return local_path
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        if file_created:
            # Do not leave a truncated file behind for later steps to pick up.
            try:
                os.remove(local_path)
            except OSError:
                pass
        return None
def _strip_markdown_fences(text):
    """Remove one leading ```lang fence line and one trailing ``` fence.

    Only fences at the very start / very end of the reply are touched, so
    backticks that appear inside the generated code itself are left alone.
    """
    text = text.strip()
    text = re.sub(r'\A```[\w+-]*[ \t]*\r?\n?', '', text)
    text = re.sub(r'\r?\n?```\s*\Z', '', text)
    return text.strip()


def generate_code_with_openrouter(instruction, file_paths, previous_errors=None, attempt=1):
    """Generate Python code using OpenRouter API with error awareness.

    Args:
        instruction: Natural-language task description.
        file_paths: Local paths of input files (may be empty).
        previous_errors: Optional list of dicts with ``error_type``,
            ``original_error`` and ``suggestions`` keys describing earlier
            failed attempts; they are summarised in the prompt.
        attempt: 1-based attempt number. Attempts after the first add
            "use a different approach" guidance and raise the temperature.

    Returns:
        The generated script as a string with surrounding Markdown fences
        removed. If the model returns nothing, or the request raises, a tiny
        fallback script that prints an ``OUTPUT_TEXT:`` message is returned
        instead.
    """
    error_context = ""
    if previous_errors:
        parts = ["\n\nPREVIOUS ERRORS TO AVOID:\n"]
        for err in previous_errors:
            parts.append(f"- Error type: {err.get('error_type', 'unknown')}\n")
            parts.append(f" Details: {err.get('original_error', '')[:200]}\n")
            parts.append(f" Suggestions: {', '.join(err.get('suggestions', []))}\n")
        error_context = "".join(parts)

    alternative_approaches = ""
    if attempt > 1:
        hints = [f"\n\nThis is attempt {attempt}. Please use a different approach:"]
        if attempt == 2:
            hints += [
                "\n- Use simpler libraries if possible",
                "\n- Add more error handling",
                "\n- Check file paths carefully",
            ]
        else:  # attempt >= 3
            hints += [
                "\n- Use only standard library if possible",
                "\n- Implement fallback solutions",
                "\n- Generate mock data if files are problematic",
            ]
        alternative_approaches = "".join(hints)

    # The embedded example is indented so that it is valid Python; doubled
    # braces are literal braces that survive str.format().
    prompt_template = textwrap.dedent("""
        You are a Python expert. Instruction: "{instruction}"
        Input files: {file_paths_str} (use file_paths[0] for first file, iterate for multiple; if empty, generate based on instruction alone).
        {error_context}
        {alternative_approaches}
        Write a complete Python script that:
        1. Import all necessary libraries at the top.
        2. Add "# pip install package_name" comments after imports for all needed libraries.
        3. Define file_paths = {file_paths_list}
        4. Add comprehensive error handling with try-except blocks
        5. Create output directory if needed using os.makedirs(exist_ok=True)
        6. Save output to a temp directory using tempfile.mkdtemp()
        7. Print "OUTPUT_FILE_PATH: /full/path/to/output" at the end using os.path.abspath()
        8. If file operations fail, try alternative approaches
        Important rules:
        - For pandas Excel operations, always add: # pip install openpyxl
        - Always use absolute paths with os.path.abspath()
        - Create directories before saving files
        - Handle common errors (FileNotFoundError, PermissionError, etc.)
        - If a library fails, try alternatives (e.g., csv instead of pandas)
        - No __name__ == '__main__', no functions, just direct code
        - Add detailed error messages
        Example with error handling:
        import os
        import tempfile
        try:
            import pandas as pd
            # pip install pandas openpyxl
        except ImportError:
            print("Pandas not available, using csv module")
            import csv
        file_paths = {file_paths_list}
        try:
            temp_dir = tempfile.mkdtemp()
            os.makedirs(temp_dir, exist_ok=True)
            output_path = os.path.join(temp_dir, 'output.xlsx')
            # Your main logic here with error handling
            print(f"OUTPUT_FILE_PATH: {{os.path.abspath(output_path)}}")
        except Exception as e:
            print(f"ERROR: {{e}}")
            # Fallback solution
            try:
                # Alternative approach
                pass
            except:
                print("All approaches failed")
        Output ONLY Python code, no markdown.
    """)

    file_paths_list = str(file_paths)
    if file_paths:
        file_paths_str = ', '.join(
            os.path.basename(p) if p else 'None' for p in file_paths
        )
    else:
        file_paths_str = 'None (generate from scratch)'

    prompt = prompt_template.format(
        instruction=instruction or "No instruction provided",
        file_paths_str=file_paths_str,
        file_paths_list=file_paths_list,
        error_context=error_context,
        alternative_approaches=alternative_approaches
    )

    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": "Output only clean executable Python code with comprehensive error handling."},
                {"role": "user", "content": prompt}
            ],
            max_tokens=4000,
            temperature=0.1 if attempt == 1 else 0.3  # Increase creativity on retries
        )
        # The content may be None when the model produced no text.
        raw_reply = response.choices[0].message.content or ""
        generated_code = _strip_markdown_fences(raw_reply)
        return generated_code if generated_code else "import sys\nprint('OUTPUT_TEXT: No code generated')\nsys.exit(0)"
    except Exception as api_error:
        error_msg = f"API Error: {api_error}"
        print(error_msg)
        # Return simple fallback
        return """import sys
print("OUTPUT_TEXT: Code generation failed due to API error")
sys.exit(0)"""
def execute_code_with_retry(code: str, max_attempts: int = 3) -> Tuple[bool, str, Optional[str]]:
    """Execute code with retry logic and error recovery.

    Each attempt installs the packages the code appears to need, wraps the
    code in a ``try``/``except`` that exits non-zero on error, checks that it
    compiles, writes it to a temporary ``.py`` file and runs that file with
    the current interpreter (60 s timeout).

    The same code is run again only when it failed, attempts remain, and at
    least one package named by the error analysis could be installed. Every
    other failure is returned immediately so the caller can regenerate code.

    NOTE(review): *code* is model-generated and is executed with this
    process's privileges; no sandboxing is applied here.

    Args:
        code: Python source to execute.
        max_attempts: Maximum number of runs of this code.

    Returns:
        ``(success, output, output_path)``. On success *output* is stdout (or
        the text after ``OUTPUT_TEXT:``) and *output_path* is the existing
        file announced via ``OUTPUT_FILE_PATH:``, if any. On failure *output*
        is an error description and *output_path* is None.
    """
    for attempt in range(1, max_attempts + 1):
        print(f"\n=== Execution attempt {attempt}/{max_attempts} ===")
        tf_path = None
        try:
            # Step 1: Detect and install packages
            print("Detecting packages...")
            required_packages = detect_required_packages(code)
            print("Detected packages:", required_packages)
            install_packages_if_needed(required_packages)

            # Step 2: Wrap code
            indented = indent_code(code)
            wrapped_code = f"try:\n{indented}\nexcept Exception as e:\n print(f'ERROR: {{e}}')\n import traceback; traceback.print_exc()\n import sys; sys.exit(1)"

            # Step 3: Compile check
            print("Compiling code...")
            try:
                compile(wrapped_code, '<string>', 'exec')
                print("Compile OK.")
            except SyntaxError as se:
                error_msg = f"Syntax Error: {se}"
                if attempt < max_attempts:
                    print(f"Syntax error on attempt {attempt}, will regenerate code")
                # Re-running identical source cannot fix a syntax error.
                return False, error_msg, None

            # Step 4: Create temp file (explicit UTF-8: the generated code
            # may contain non-ASCII text)
            print("Creating temp file...")
            with tempfile.NamedTemporaryFile(
                mode='w', suffix='.py', delete=False, encoding='utf-8'
            ) as tf:
                tf.write(wrapped_code)
                tf_path = tf.name
            print(f"Temp file: {tf_path}")

            # Step 5: Execute
            print(f"Executing...")
            result = subprocess.run(
                [sys.executable, tf_path],
                capture_output=True,
                text=True,
                timeout=60
            )
            stdout, stderr = result.stdout, result.stderr
            rc = result.returncode

            if rc != 0:
                error_msg = f"Execution failed (RC {rc}):\nStderr: {stderr}"
                print(error_msg)
                if attempt >= max_attempts:
                    return False, error_msg, None
                # Analyze error; a missing package is the one failure that
                # re-running the same code can fix.
                error_analysis = ErrorAnalyzer.analyze_error(stderr, code)
                missing_packages = error_analysis['packages']
                if missing_packages:
                    print(f"Attempting to install missing packages: {missing_packages}")
                    installed_any = False
                    for pkg in missing_packages:
                        installed_any = install_package(pkg) or installed_any
                    if installed_any:
                        continue
                return False, stderr, None

            # Extract output
            output_path_match = re.search(r'OUTPUT_FILE_PATH:\s*(.+)', stdout, re.I)
            output_text_match = re.search(r'OUTPUT_TEXT:\s*(.+)', stdout, re.I | re.DOTALL)
            if output_path_match:
                output_path = output_path_match.group(1).strip()
                if os.path.exists(output_path):
                    return True, stdout, output_path
                error_msg = f"Output path not found: {output_path}"
                if attempt < max_attempts:
                    print(error_msg)
                return False, error_msg, None
            if output_text_match:
                return True, output_text_match.group(1).strip(), None
            if stdout.strip():
                return True, stdout, None
            return False, "No output generated", None
        except subprocess.TimeoutExpired:
            error_msg = "Timeout: Code execution took too long"
            if attempt < max_attempts:
                print(error_msg)
            return False, error_msg, None
        except Exception as e:
            error_msg = f"Execution error: {str(e)}"
            if attempt < max_attempts:
                print(error_msg)
            return False, error_msg, None
        finally:
            # Clean up temp file on every exit path, including timeouts.
            if tf_path and os.path.exists(tf_path):
                try:
                    os.unlink(tf_path)
                except OSError:
                    pass
    return False, "Max attempts reached", None
def process_request(instruction, files, urls_input):
    """Main processing function with self-correction, supporting URLs.

    Generates code for *instruction*, executes it, and on failure feeds an
    analysis of the error into the next generation, for up to three attempts.

    Args:
        instruction: Task description; empty, blank or None is rejected.
        files: Optional iterable of objects exposing a ``name`` attribute
            holding a local file path.
        urls_input: Optional comma-separated URLs; each is downloaded into a
            fresh temp directory and appended to the input file list.

    Returns:
        ``(message, output_file_path)``. The path is None unless the executed
        code produced an output file.
    """
    max_attempts = 3
    try:
        if not instruction or not instruction.strip():
            return "لطفاً دستور را وارد کنید. (فایل‌ها و لینک‌ها اختیاری هستند)", None

        # Handle uploaded files
        file_paths = [f.name for f in files] if files else []

        # Handle URLs: download to temp dir
        if urls_input and urls_input.strip():
            temp_dir_for_downloads = tempfile.mkdtemp(prefix='url_downloads_')
            urls = [url.strip() for url in urls_input.split(',') if url.strip()]
            downloaded_paths = []
            for url in urls:
                local_path = download_file_from_url(url, temp_dir_for_downloads)
                if local_path:
                    downloaded_paths.append(local_path)
            file_paths.extend(downloaded_paths)
            print(f"Downloaded {len(downloaded_paths)} files from URLs")
            # Clean up note: Temp dirs will be cleaned on shutdown or manually if needed

        # Track errors for learning
        previous_errors = []
        generated_code = ""

        # Main retry loop
        for attempt in range(1, max_attempts + 1):
            print(f"\n{'='*50}")
            print(f"MAIN ATTEMPT {attempt}/{max_attempts}")
            print(f"{'='*50}")

            # Generate code
            print("Generating code...")
            generated_code = generate_code_with_openrouter(
                instruction,
                file_paths,
                previous_errors=previous_errors if attempt > 1 else None,
                attempt=attempt
            )
            if len(generated_code) < 20:
                return f"کد ضعیف تولید شد: {generated_code}", None
            print(f"Generated code preview: {generated_code[:200]}...")

            # Try to execute
            success, output, file_path = execute_code_with_retry(generated_code, max_attempts=2)
            if success:
                result_text = f"✅ Success on attempt {attempt}!\n\n"
                result_text += f"Generated Code:\n```python\n{generated_code}\n```\n\n"
                result_text += f"Output:\n{output}"
                return result_text, file_path

            # Analyze error so the next generation can avoid it
            print(f"\n❌ Attempt {attempt} failed")
            error_analysis = ErrorAnalyzer.analyze_error(output, generated_code)
            previous_errors.append(error_analysis)
            print(f"Error type: {error_analysis['error_type']}")
            print(f"Suggestions: {', '.join(error_analysis['suggestions'])}")

        # Every attempt failed: report the full error history.
        report_parts = [
            f"❌ Failed after {max_attempts} attempts.\n\n",
            "Error History:\n",
        ]
        for i, err in enumerate(previous_errors, 1):
            report_parts.append(f"\nAttempt {i}:\n")
            report_parts.append(f"- Error type: {err['error_type']}\n")
            report_parts.append(f"- Details: {err['original_error'][:200]}...\n")
        report_parts.append(f"\n\nLast generated code:\n```python\n{generated_code}\n```")
        return "".join(report_parts), None
    except Exception as e:
        error_msg = f"General error: {type(e).__name__}: {e}\nFull traceback: {traceback.format_exc()}"
        print(error_msg)
        return error_msg, None
# Streamlit Interface
def _load_example(instruction_text, urls_text):
    """Button callback: copy an example into the two input widgets.

    Writing to the widgets' session-state keys from a callback is what makes
    the example text appear in the inputs on the rerun that follows.
    """
    st.session_state["instruction_input"] = instruction_text
    st.session_state["urls_input"] = urls_text


def main():
    """Render the Streamlit page and run a request when the button is pressed."""
    from types import SimpleNamespace

    st.set_page_config(page_title="AI File Processor - Self Correcting", page_icon="🤖", layout="wide")
    st.title("🤖 AI File Processor - Self Correcting Edition")
    # Blank lines separate the list from the bold headings so Markdown does
    # not fold the headings into the last list item.
    st.markdown("""
این سیستم می‌تواند:
- کد Python تولید کند
- خطاها را تشخیص و تحلیل کند
- به طور خودکار مشکلات را برطرف کند
- تا 3 بار با رویکردهای مختلف تلاش کند

**مثال دستورات:**
- "یک فایل اکسل با 1000 نام و شماره تلفن ایرانی بساز"
- "پس‌زمینه این تصویر را حذف کن"
- "این فایل CSV را به JSON تبدیل کن"

**نکته:** می‌توانید فایل‌ها را آپلود کنید یا لینک‌های فایل را (جدا شده با کاما) وارد کنید.
""")

    # Inputs (keyed so the example buttons can fill them in)
    instruction = st.text_area(
        "دستور",
        height=100,
        placeholder="مثال: یک نمودار دایره‌ای از داده‌های فروش بکش",
        key="instruction_input"
    )
    col1, col2 = st.columns(2)
    with col1:
        uploaded_files = st.file_uploader(
            "فایل‌های آپلود شده (اختیاری)",
            accept_multiple_files=True
        )
    with col2:
        urls_input = st.text_input(
            "لینک فایل‌ها (جدا با کاما، اختیاری)",
            placeholder="https://example.com/file1.csv, https://example.com/file2.jpg",
            help="لینک‌ها دانلود خواهند شد و حجم فایل چک نمی‌شود.",
            key="urls_input"
        )

    # Process button
    if st.button("🚀 اجرا", type="primary"):
        if not instruction.strip():
            st.error("لطفاً دستور را وارد کنید.")
        else:
            with st.spinner("در حال پردازش..."):
                # process_request() only reads `.name`, so each upload is
                # saved to disk and represented by an object carrying its path.
                files = []
                for uploaded_file in uploaded_files or []:
                    temp_dir = tempfile.mkdtemp()
                    # Keep only the final path component of the client-supplied name.
                    safe_name = os.path.basename(uploaded_file.name) or "uploaded_file"
                    file_path = os.path.join(temp_dir, safe_name)
                    with open(file_path, "wb") as f:
                        f.write(uploaded_file.getbuffer())
                    files.append(SimpleNamespace(name=file_path))

                # Process request
                result_text, output_file = process_request(instruction, files, urls_input)

            # Display results
            st.subheader("نتایج")
            st.text_area("خروجی", result_text, height=300)

            # Download button for output file
            if output_file and os.path.exists(output_file):
                with open(output_file, "rb") as f:
                    st.download_button(
                        label="📥 دانلود فایل خروجی",
                        data=f,
                        file_name=os.path.basename(output_file),
                        mime="application/octet-stream"
                    )

    # Examples: (instruction, unused, url)
    st.subheader("مثال‌ها")
    examples = [
        ["یک فایل اکسل با 100 محصول فروشگاهی شامل نام، قیمت و موجودی بساز", None, None],
        ["یک نمودار میله‌ای از داده‌های تصادفی رسم کن", None, None],
        ["یک تصویر 500x500 پیکسل با رنگ‌های تصادفی بساز", None, None],
        ["این فایل را به فرمت JSON تبدیل کن", None, "https://example.com/sample.csv"],
    ]
    for i, example in enumerate(examples):
        st.button(
            f"مثال {i+1}: {example[0][:30]}...",
            on_click=_load_example,
            args=(example[0], example[2] if example[2] else "")
        )


if __name__ == "__main__":
    main()