"""AI File Processor — self-correcting, concurrent Gradio app.

Takes a natural-language instruction (plus optional uploaded files / URLs),
asks an OpenRouter-hosted model to generate a Python script, executes it in a
sandboxed subprocess, and retries with error feedback when execution fails.
"""

import gradio as gr
import openai
import os
import tempfile
import sys
import io
import subprocess
import importlib.util
import re
from contextlib import redirect_stdout
import textwrap
import shutil
import traceback
import json
from typing import List, Tuple, Optional, Dict
import requests
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

# Clean up any existing temp files on startup to save space.
# Best-effort: individual failures (files in use, permissions) are ignored.
try:
    tempdir = tempfile.gettempdir()
    for item in os.listdir(tempdir):
        item_path = os.path.join(tempdir, item)
        if item.startswith('tmp') or item.endswith('.py'):
            try:
                if os.path.isfile(item_path):
                    os.unlink(item_path)
                elif os.path.isdir(item_path):
                    shutil.rmtree(item_path)
            except Exception:
                pass
    # Clean pip cache if it exists (reclaims disk on constrained hosts)
    pip_cache = os.path.expanduser('~/.cache/pip')
    if os.path.exists(pip_cache):
        try:
            shutil.rmtree(pip_cache)
        except Exception:
            pass
except Exception:
    pass

# Use OpenRouter API (OpenAI-compatible)
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
client = openai.OpenAI(
    api_key=OPENROUTER_API_KEY,
    base_url="https://openrouter.ai/api/v1"
)
MODEL_NAME = "x-ai/grok-4.1-fast"

# Global thread pool for concurrent execution (at most 5 requests in parallel)
executor = ThreadPoolExecutor(max_workers=5)


class SessionManager:
    """Manage independent, isolated sessions (one per user request).

    Each session owns a private temp directory; all access to the session
    table is serialized through a (non-reentrant) lock.
    """

    def __init__(self):
        self.sessions = {}  # session_id -> {'created_at': float, 'temp_dir': str}
        self.lock = threading.Lock()

    def create_session(self):
        """Create a new session with its own temp directory; return its id."""
        session_id = str(uuid.uuid4())
        with self.lock:
            self.sessions[session_id] = {
                'created_at': time.time(),
                'temp_dir': tempfile.mkdtemp(prefix=f'session_{session_id[:8]}_')
            }
        return session_id

    def get_session_temp_dir(self, session_id):
        """Return the session's temp directory, or None if unknown."""
        with self.lock:
            if session_id in self.sessions:
                return self.sessions[session_id]['temp_dir']
        return None

    def cleanup_session(self, session_id):
        """Remove a session and delete its temp directory (best-effort)."""
        with self.lock:
            if session_id in self.sessions:
                try:
                    temp_dir = self.sessions[session_id]['temp_dir']
                    if os.path.exists(temp_dir):
                        shutil.rmtree(temp_dir)
                    del self.sessions[session_id]
                except Exception:
                    pass

    def cleanup_old_sessions(self, max_age=3600):
        """Remove sessions older than max_age seconds.

        FIX: the expired-id list is built under the lock, but cleanup_session
        is called only AFTER releasing it — cleanup_session re-acquires the
        same non-reentrant Lock, so invoking it while still holding the lock
        would deadlock.
        """
        current_time = time.time()
        with self.lock:
            expired_sessions = [
                session_id for session_id, data in self.sessions.items()
                if current_time - data['created_at'] > max_age
            ]
        for session_id in expired_sessions:
            self.cleanup_session(session_id)


# Global session manager shared by all requests
session_manager = SessionManager()


class ErrorAnalyzer:
    """Analyze errors and suggest fixes"""

    @staticmethod
    def analyze_error(error_message: str, code: str) -> Dict:
        """Classify an execution error and return a fix strategy.

        Returns a dict with keys: error_type, suggestions (list of hints fed
        back into the next code-generation prompt), packages (pip names to
        install), original_error.
        """
        error_type = "unknown"
        suggestions = []
        packages_to_install = []

        # Import errors — extract the missing module name so it can be installed
        if "No module named" in error_message or "ModuleNotFoundError" in error_message:
            error_type = "import_error"
            module_match = re.search(r"No module named ['\"]([^'\"]+)['\"]", error_message)
            if module_match:
                module = module_match.group(1)
                packages_to_install.append(module)
                suggestions.append(f"Install missing module: {module}")
        # Permission errors
        elif "Permission denied" in error_message or "PermissionError" in error_message:
            error_type = "permission_error"
            suggestions.append("Use temp directory for file operations")
            suggestions.append("Avoid system directories")
        # Memory errors (OOM kills show up as "killed" in stderr)
        elif "MemoryError" in error_message or "killed" in error_message.lower():
            error_type = "memory_error"
            suggestions.append("Reduce data size or use chunks")
            suggestions.append("Process data in smaller batches")
        # File not found
        elif "FileNotFoundError" in error_message or "No such file" in error_message:
            error_type = "file_error"
            suggestions.append("Check file paths")
            suggestions.append("Create directory if needed")
        # Syntax errors
        elif "SyntaxError" in error_message:
            error_type = "syntax_error"
            suggestions.append("Fix syntax issues")
            suggestions.append("Check indentation")
        # Attribute errors
        elif "AttributeError" in error_message:
            error_type = "attribute_error"
            suggestions.append("Check method/attribute names")
            suggestions.append("Verify object types")
        # Type errors
        elif "TypeError" in error_message:
            error_type = "type_error"
            suggestions.append("Check data types")
            suggestions.append("Add type conversions")
        # Value errors
        elif "ValueError" in error_message:
            error_type = "value_error"
            suggestions.append("Validate input data")
            suggestions.append("Add error handling")
        # Network errors
        elif "URLError" in error_message or "ConnectionError" in error_message:
            error_type = "network_error"
            suggestions.append("Check internet connection")
            suggestions.append("Add retry logic")

        # Package-specific errors (checked in addition to the class above)
        if "openpyxl" in error_message or "xlrd" in error_message:
            packages_to_install.append("openpyxl")
            suggestions.append("Install Excel support: openpyxl")
        if "PIL" in error_message or "Pillow" in error_message:
            packages_to_install.append("Pillow")
            suggestions.append("Install image processing: Pillow")

        return {
            "error_type": error_type,
            "suggestions": suggestions,
            "packages": packages_to_install,
            "original_error": error_message
        }


def indent_code(code, spaces=4):
    """Indent the code by the specified number of spaces (empty lines untouched)."""
    indented_lines = []
    for line in code.split('\n'):
        if line.strip():  # Only indent non-empty lines
            indented_lines.append(' ' * spaces + line)
        else:
            indented_lines.append(line)
    return '\n'.join(indented_lines)


def detect_required_packages(code):
    """Detect required pip packages from Python code (optimized for accuracy).

    Combines three heuristics: explicit "pip install" comments, import
    statements (mapped to their pip distribution names), and API-usage
    patterns that imply optional dependencies (e.g. pandas Excel I/O needs
    openpyxl). Packages already listed in requirements.txt are excluded.
    """
    required_packages = set()

    # Pre-installed packages from requirements.txt
    pre_installed = {
        'gradio', 'openai', 'pillow', 'rembg', 'numpy', 'opencv-python',
        'scikit-learn', 'tensorflow', 'torch', 'lxml', 'requests',
        'matplotlib', 'seaborn', 'onnxruntime', 'proglog', 'openpyxl',
        'moviepy'
    }

    # Import patterns
    import_patterns = [
        r'^(?:import|from)\s+(\w+)(?:\.\w+)*',
    ]
    # Patterns for pip install mentions in code/comments
    pip_patterns = [
        r'#?\s*pip\s+install\s+([^\s#]+)',
        r'#?\s*install\s+([^\s#]+)'
    ]
    # Usage patterns that require additional packages
    usage_patterns = {
        r'\.to_excel': 'openpyxl',
        r'\.read_excel': 'openpyxl',
        r'\.ExcelWriter': 'openpyxl',
        r'\.to_parquet': 'pyarrow',
        r'\.read_parquet': 'pyarrow',
        r'\.to_sql': 'sqlalchemy',
        r'\.read_sql': 'sqlalchemy',
        r'\.to_feather': 'pyarrow',
        r'\.read_feather': 'pyarrow',
        r'\.to_stata': 'statsmodels',
        r'\.read_stata': 'statsmodels',
        r'\.to_clipboard': 'pyperclip',
        r'xlsxwriter': 'xlsxwriter',
        r'openpyxl': 'openpyxl',
    }

    # Check for pip install comments; strip any version specifier
    for pattern in pip_patterns:
        matches = re.findall(pattern, code, re.IGNORECASE | re.MULTILINE)
        for match in matches:
            if match and not match.startswith(('"', "'")):
                pkg = match.split('==')[0].split('>')[0].split('<')[0].strip()
                if pkg and pkg not in pre_installed:
                    required_packages.add(pkg)

    # Check imports, mapping import names to pip distribution names
    for line in code.split('\n'):
        line = line.strip()
        if line.startswith(('import ', 'from ')):
            match = re.search(import_patterns[0], line)
            if match:
                module = match.group(1)
                if module and module not in {'__future__', 'typing'}:
                    module_map = {
                        'cv2': 'opencv-python',
                        'sklearn': 'scikit-learn',
                        'tf': 'tensorflow',
                        'torch': 'torch',
                        'numpy': 'numpy',
                        'pandas': 'pandas',
                        'PIL': 'Pillow',
                        'pillow': 'Pillow',
                        'matplotlib': 'matplotlib',
                        'seaborn': 'seaborn',
                        'onnxruntime': 'onnxruntime',
                        'rembg': 'rembg',
                        'requests': 'requests',
                        'openpyxl': 'openpyxl',
                        'xlrd': 'xlrd',
                        'xlwt': 'xlwt',
                        'xlsxwriter': 'xlsxwriter',
                        'pyarrow': 'pyarrow',
                        'sqlalchemy': 'sqlalchemy',
                        'psycopg2': 'psycopg2-binary',
                        'pymongo': 'pymongo',
                        'redis': 'redis',
                        'beautifulsoup4': 'beautifulsoup4',
                        'bs4': 'beautifulsoup4',
                        'scrapy': 'scrapy',
                        'selenium': 'selenium',
                        'flask': 'flask',
                        'fastapi': 'fastapi',
                        'django': 'django',
                        'pytest': 'pytest',
                        'hypothesis': 'hypothesis',
                        'faker': 'faker',
                    }
                    if module in module_map:
                        pkg = module_map[module]
                        if pkg not in pre_installed:
                            required_packages.add(pkg)
                    elif module not in {
                        # Standard-library modules: never pip-installed
                        'os', 'sys', 'io', 'json', 're', 'time', 'math',
                        'random', 'collections', 'itertools', 'functools',
                        'operator', 'string', 'pathlib', 'tempfile',
                        'subprocess', 'logging', 'argparse', 'csv', 'xml',
                        'html', 'base64', 'hashlib', 'urllib', 'http',
                        'threading', 'multiprocessing', 'socket', 'asyncio',
                        'concurrent', 'abc', 'enum', 'dataclasses',
                        'zipfile', 'datetime', 'calendar', 'copy', 'pickle',
                        'struct', 'binascii', 'codecs'
                    }:
                        if module not in pre_installed:
                            required_packages.add(module)

    # Check for usage patterns that need additional packages
    for pattern, package in usage_patterns.items():
        if re.search(pattern, code, re.IGNORECASE) and package not in pre_installed:
            required_packages.add(package)

    # Special case: pandas + Excel operations detected.
    # NOTE(review): openpyxl is in pre_installed, so this branch never adds
    # anything today; kept as a safety net if the pre-installed set changes.
    if 'pandas' in code and any(pattern in code for pattern in ['.to_excel', '.read_excel', 'ExcelWriter']):
        if 'openpyxl' not in pre_installed:
            required_packages.add('openpyxl')

    # Remove pre-installed packages
    required_packages = required_packages - pre_installed
    return list(required_packages)


def install_package(package_name):
    """Install a package with pip if it is not already importable.

    Returns True if the package is available after the call (already
    installed, or installed successfully), False otherwise.
    """
    try:
        # Map pip distribution names to their import names where they differ
        import_name = package_name
        if package_name == 'opencv-python':
            import_name = 'cv2'
        elif package_name == 'scikit-learn':
            import_name = 'sklearn'
        elif package_name == 'pillow' or package_name == 'Pillow':
            import_name = 'PIL'
        elif package_name == 'beautifulsoup4':
            import_name = 'bs4'
        elif package_name == 'psycopg2-binary':
            import_name = 'psycopg2'
        else:
            import_name = package_name.replace('-', '_')

        spec = importlib.util.find_spec(import_name)
        if spec is None:
            print(f"Installing package: {package_name}")
            # --no-cache-dir keeps disk usage down on constrained hosts
            result = subprocess.run([
                sys.executable, "-m", "pip", "install",
                "--quiet", "--no-cache-dir", package_name
            ], capture_output=True, text=True)
            if result.returncode == 0:
                print(f"✅ {package_name} installed successfully.")
                return True
            else:
                print(f"❌ Failed to install {package_name}: {result.stderr}")
                return False
        else:
            print(f"✅ {package_name} already installed.")
            return True
    except Exception as e:
        print(f"❌ Error checking/installing {package_name}: {str(e)}")
        return False


def install_packages_if_needed(packages):
    """Install required packages; return True only if all succeeded."""
    if not packages:
        print("No additional packages to install.")
        return True
    success_count = 0
    failed_packages = []
    for package in packages:
        if package:
            if install_package(package):
                success_count += 1
            else:
                failed_packages.append(package)
    print(f"✅ Installed/checked {success_count}/{len(packages)} packages.")
    if failed_packages:
        print(f"⚠️ Failed to install: {', '.join(failed_packages)}")
    return len(failed_packages) == 0


def download_file_from_url(url: str, temp_dir: str) -> Optional[str]:
    """Download a file from URL to temp_dir and return local path (None on failure)."""
    try:
        # Derive a filename from the URL path, ignoring the query string
        filename = url.split('/')[-1].split('?')[0] or 'downloaded_file'
        if '.' not in filename:
            filename += '.txt'  # Default extension
        local_path = os.path.join(temp_dir, filename)
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()
        with open(local_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"Downloaded {url} to {local_path}")
        return local_path
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        return None


def generate_code_with_openrouter(instruction, file_paths, previous_errors=None, attempt=1):
    """Generate Python code using the OpenRouter API with error awareness.

    previous_errors: list of dicts from ErrorAnalyzer.analyze_error, injected
    into the prompt so the model avoids repeating earlier failures.
    attempt: retry number; later attempts get alternative-approach hints and
    a higher sampling temperature.
    """
    error_context = ""
    if previous_errors:
        error_context = "\n\nPREVIOUS ERRORS TO AVOID:\n"
        for err in previous_errors:
            error_context += f"- Error type: {err.get('error_type', 'unknown')}\n"
            error_context += f"  Details: {err.get('original_error', '')[:200]}\n"
            error_context += f"  Suggestions: {', '.join(err.get('suggestions', []))}\n"

    alternative_approaches = ""
    if attempt > 1:
        alternative_approaches = f"\n\nThis is attempt {attempt}. Please use a different approach:"
        if attempt == 2:
            alternative_approaches += "\n- Use simpler libraries if possible"
            alternative_approaches += "\n- Add more error handling"
            alternative_approaches += "\n- Check file paths carefully"
        elif attempt >= 3:
            alternative_approaches += "\n- Use only standard library if possible"
            alternative_approaches += "\n- Implement fallback solutions"
            alternative_approaches += "\n- Generate mock data if files are problematic"

    # Doubled braces ({{...}}) survive str.format and become literal f-string
    # placeholders in the generated example code.
    prompt_template = textwrap.dedent("""
        You are a Python expert.
        Instruction: "{instruction}"
        Input files: {file_paths_str} (use file_paths[0] for first file, iterate for multiple; if empty, generate based on instruction alone).
        {error_context}
        {alternative_approaches}

        Write a complete Python script that:
        1. Import all necessary libraries at the top.
        2. Add "# pip install package_name" comments after imports for all needed libraries.
        3. Define file_paths = {file_paths_list}
        4. Add comprehensive error handling with try-except blocks
        5. Create output directory if needed using os.makedirs(exist_ok=True)
        6. Save output to a temp directory using tempfile.mkdtemp()
        7. Print "OUTPUT_FILE_PATH: /full/path/to/output" at the end using os.path.abspath()
        8. If file operations fail, try alternative approaches

        Important rules:
        - For pandas Excel operations, always add: # pip install openpyxl
        - Always use absolute paths with os.path.abspath()
        - Create directories before saving files
        - Handle common errors (FileNotFoundError, PermissionError, etc.)
        - If a library fails, try alternatives (e.g., csv instead of pandas)
        - No __name__ == '__main__', no functions, just direct code
        - Add detailed error messages

        Example with error handling:
        import os
        import tempfile
        try:
            import pandas as pd  # pip install pandas openpyxl
        except ImportError:
            print("Pandas not available, using csv module")
            import csv

        file_paths = {file_paths_list}
        try:
            temp_dir = tempfile.mkdtemp()
            os.makedirs(temp_dir, exist_ok=True)
            output_path = os.path.join(temp_dir, 'output.xlsx')
            # Your main logic here with error handling
            print(f"OUTPUT_FILE_PATH: {{os.path.abspath(output_path)}}")
        except Exception as e:
            print(f"ERROR: {{e}}")
            # Fallback solution
            try:
                # Alternative approach
                pass
            except:
                print("All approaches failed")

        Output ONLY Python code, no markdown.
    """)

    file_paths_list = str(file_paths)
    file_paths_str = ', '.join(
        [os.path.basename(p) if p else 'None' for p in file_paths]
    ) if file_paths else 'None (generate from scratch)'

    prompt = prompt_template.format(
        instruction=instruction or "No instruction provided",
        file_paths_str=file_paths_str,
        file_paths_list=file_paths_list,
        error_context=error_context,
        alternative_approaches=alternative_approaches
    )

    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": "Output only clean executable Python code with comprehensive error handling."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.1 if attempt == 1 else 0.3  # Increase creativity on retries
        )
        generated_code = response.choices[0].message.content.strip()
        # Strip markdown code fences the model may have added anyway
        generated_code = re.sub(r'^```python\s*|\s*```$', '', generated_code, flags=re.MULTILINE).strip()
        return generated_code if generated_code else "import sys\nprint('OUTPUT_TEXT: No code generated')\nsys.exit(0)"
    except Exception as api_error:
        error_msg = f"API Error: {api_error}"
        print(error_msg)
        # Return simple fallback so the caller still gets runnable code
        return """import sys
print("OUTPUT_TEXT: Code generation failed due to API error")
sys.exit(0)"""


def execute_code_with_retry(code: str, session_id: str, max_attempts: int = 3) -> Tuple[bool, str, Optional[str]]:
    """Execute generated code in a subprocess with retry and error recovery.

    Returns (success, output_or_error, output_file_path). On success the
    second element is stdout (or extracted OUTPUT_TEXT); on failure it is an
    error message suitable for ErrorAnalyzer.

    FIX: previously every failure path returned immediately, so the retry
    loop could never run twice. Now, when missing packages are detected and
    installed on a non-final attempt, execution is retried.
    """
    tf_path = None
    attempt = 0
    while attempt < max_attempts:
        attempt += 1
        print(f"\n=== Execution attempt {attempt}/{max_attempts} (Session: {session_id}) ===")
        try:
            # Step 1: Detect and install packages
            print("Detecting packages...")
            required_packages = detect_required_packages(code)
            print("Detected packages:", required_packages)
            install_packages_if_needed(required_packages)

            # Step 2: Wrap code so uncaught errors produce an ERROR line and RC 1
            indented = indent_code(code)
            wrapped_code = (
                "try:\n"
                f"{indented}\n"
                "except Exception as e:\n"
                "    print(f'ERROR: {e}')\n"
                "    import traceback; traceback.print_exc()\n"
                "    import sys; sys.exit(1)"
            )

            # Step 3: Compile check — syntax errors can only be fixed by
            # regenerating the code, so return to the caller immediately.
            print("Compiling code...")
            try:
                compile(wrapped_code, '', 'exec')
                print("Compile OK.")
            except SyntaxError as se:
                error_msg = f"Syntax Error: {se}"
                if attempt < max_attempts:
                    print(f"Syntax error on attempt {attempt}, will regenerate code")
                return False, error_msg, None

            # Step 4: Create temp file inside the session's directory
            print("Creating temp file...")
            session_temp_dir = session_manager.get_session_temp_dir(session_id)
            if not session_temp_dir:
                session_temp_dir = tempfile.mkdtemp()
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, dir=session_temp_dir) as tf:
                tf.write(wrapped_code)
                tf_path = tf.name
            print(f"Temp file: {tf_path}")

            # Step 5: Execute with a hard timeout
            print(f"Executing...")
            result = subprocess.run(
                [sys.executable, tf_path],
                capture_output=True,
                text=True,
                timeout=60
            )
            stdout, stderr = result.stdout, result.stderr
            rc = result.returncode

            # Clean up temp file
            if tf_path and os.path.exists(tf_path):
                try:
                    os.unlink(tf_path)
                except Exception:
                    pass

            if rc != 0:
                error_msg = f"Execution failed (RC {rc}):\nStderr: {stderr}"
                print(error_msg)
                if attempt < max_attempts:
                    # Analyze error; if missing packages were installed,
                    # retry the same code — otherwise let the caller
                    # regenerate with the error context.
                    error_analysis = ErrorAnalyzer.analyze_error(stderr, code)
                    if error_analysis['packages']:
                        print(f"Attempting to install missing packages: {error_analysis['packages']}")
                        installed_any = False
                        for pkg in error_analysis['packages']:
                            if install_package(pkg):
                                installed_any = True
                        if installed_any:
                            continue  # retry execution with packages now present
                    return False, stderr, None
                else:
                    return False, error_msg, None

            # Extract output markers printed by the generated script
            output_path_match = re.search(r'OUTPUT_FILE_PATH:\s*(.+)', stdout, re.I)
            output_text_match = re.search(r'OUTPUT_TEXT:\s*(.+)', stdout, re.I | re.DOTALL)

            if output_path_match:
                output_path = output_path_match.group(1).strip()
                if os.path.exists(output_path):
                    return True, stdout, output_path
                else:
                    error_msg = f"Output path not found: {output_path}"
                    if attempt < max_attempts:
                        print(error_msg)
                    return False, error_msg, None
            elif output_text_match:
                return True, output_text_match.group(1).strip(), None
            else:
                if stdout.strip():
                    return True, stdout, None
                else:
                    return False, "No output generated", None

        except subprocess.TimeoutExpired:
            error_msg = "Timeout: Code execution took too long"
            if attempt < max_attempts:
                print(error_msg)
            return False, error_msg, None
        except Exception as e:
            error_msg = f"Execution error: {str(e)}"
            if attempt < max_attempts:
                print(error_msg)
            return False, error_msg, None

    return False, "Max attempts reached", None


def process_request_async(instruction, files, urls_input):
    """Main request handler: generate code, execute, retry with error feedback.

    Runs inside the worker thread pool; each call gets its own isolated
    session (temp dir) which is always cleaned up before returning.
    Returns (result_text, output_file_path_or_None) for the Gradio outputs.
    """
    session_id = session_manager.create_session()
    try:
        if not instruction.strip():
            session_manager.cleanup_session(session_id)
            return "لطفاً دستور را وارد کنید. (فایل‌ها و لینک‌ها اختیاری هستند)", None

        file_paths = []
        # Handle uploaded files
        if files:
            file_paths = [f.name for f in files]
        # Handle URLs: download each into the session temp dir
        if urls_input and urls_input.strip():
            session_temp_dir = session_manager.get_session_temp_dir(session_id)
            urls = [url.strip() for url in urls_input.split(',') if url.strip()]
            downloaded_paths = []
            for url in urls:
                local_path = download_file_from_url(url, session_temp_dir)
                if local_path:
                    downloaded_paths.append(local_path)
            file_paths.extend(downloaded_paths)
            print(f"Downloaded {len(downloaded_paths)} files from URLs for session {session_id}")

        # Track errors across attempts so the model can learn from them
        previous_errors = []
        generated_codes = []

        # Main retry loop: regenerate code up to 3 times
        for attempt in range(1, 4):
            print(f"\n{'='*50}")
            print(f"MAIN ATTEMPT {attempt}/3 (Session: {session_id})")
            print(f"{'='*50}")

            print("Generating code...")
            generated_code = generate_code_with_openrouter(
                instruction,
                file_paths,
                previous_errors=previous_errors if attempt > 1 else None,
                attempt=attempt
            )
            if len(generated_code) < 20:
                # FIX: previously this early return leaked the session temp dir
                session_manager.cleanup_session(session_id)
                return f"کد ضعیف تولید شد: {generated_code}", None
            generated_codes.append(generated_code)
            print(f"Generated code preview: {generated_code[:200]}...")

            # Try to execute (inner loop handles package-install retries)
            success, output, file_path = execute_code_with_retry(generated_code, session_id, max_attempts=2)

            if success:
                result_text = f"✅ Success on attempt {attempt}!\n\n"
                result_text += f"Generated Code:\n```python\n{generated_code}\n```\n\n"
                result_text += f"Output:\n{output}"
                session_manager.cleanup_session(session_id)  # Cleanup on success
                return result_text, file_path
            else:
                print(f"\n❌ Attempt {attempt} failed")
                error_analysis = ErrorAnalyzer.analyze_error(output, generated_code)
                previous_errors.append(error_analysis)
                print(f"Error type: {error_analysis['error_type']}")
                print(f"Suggestions: {', '.join(error_analysis['suggestions'])}")

                # If this was the last attempt, build a full failure report
                if attempt == 3:
                    error_report = f"❌ Failed after {attempt} attempts.\n\n"
                    error_report += "Error History:\n"
                    for i, err in enumerate(previous_errors, 1):
                        error_report += f"\nAttempt {i}:\n"
                        error_report += f"- Error type: {err['error_type']}\n"
                        error_report += f"- Details: {err['original_error'][:200]}...\n"
                    error_report += f"\n\nLast generated code:\n```python\n{generated_code}\n```"
                    session_manager.cleanup_session(session_id)  # Cleanup on failure
                    return error_report, None

        session_manager.cleanup_session(session_id)
        return "Unexpected end of retry loop", None

    except Exception as e:
        error_msg = f"General error: {type(e).__name__}: {e}\nFull traceback: {traceback.format_exc()}"
        print(error_msg)
        session_manager.cleanup_session(session_id)
        return error_msg, None


def process_request_wrapper(instruction, files, urls_input):
    """Gradio entry point: run the request on the thread pool with a timeout."""
    future = executor.submit(process_request_async, instruction, files, urls_input)
    try:
        result = future.result(timeout=300)  # 5 minutes timeout
        return result
    except Exception as e:
        return f"Error processing request: {str(e)}", None


# Gradio Interface
with gr.Blocks(title="AI File Processor - Self Correcting - Concurrent") as demo:
    gr.Markdown("""
    # 🤖 AI File Processor - Self Correcting Edition
    """)
    with gr.Row():
        instruction = gr.Textbox(
            label="دستور",
            lines=2,
            placeholder="مثال: یک نمودار دایره‌ای از داده‌های فروش بکش",
            rtl=True
        )
    with gr.Row():
        files = gr.File(file_count="multiple", label="فایل‌های آپلود شده (اختیاری)")
        urls = gr.Textbox(
            label="لینک فایل‌ها (جدا با کاما، اختیاری)",
            placeholder="https://example.com/file1.csv, https://example.com/file2.jpg",
            info="لینک‌ها دانلود خواهند شد و حجم فایل چک نمی‌شود."
        )
    btn = gr.Button("🚀 اجرا", variant="primary")
    with gr.Row():
        output = gr.Textbox(label="نتیجه", lines=15)
        file_out = gr.File(label="فایل خروجی (در صورت تولید)")

    # Examples
    gr.Examples(
        examples=[
            ["یک فایل اکسل با 100 محصول فروشگاهی شامل نام، قیمت و موجودی بساز", None, None],
            ["یک نمودار میله‌ای از داده‌های تصادفی رسم کن", None, None],
            ["یک تصویر 500x500 پیکسل با رنگ‌های تصادفی بساز", None, None],
            ["این فایل را به فرمت JSON تبدیل کن", None, "https://example.com/sample.csv"],
        ],
        inputs=[instruction, files, urls],
    )
    btn.click(fn=process_request_wrapper, inputs=[instruction, files, urls], outputs=[output, file_out])


if __name__ == "__main__":
    # Cleanup old sessions on startup
    session_manager.cleanup_old_sessions()

    # Launch with concurrency settings — compatible with all Gradio versions
    try:
        # Option 1: newer Gradio versions
        demo.queue(max_size=20)
    except TypeError:
        try:
            # Option 2: older versions (concurrency_count was removed later)
            demo.queue(concurrency_count=5, max_size=20)
        except Exception:
            # Option 3: no arguments at all
            demo.queue()

    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )