import importlib.util
import io
import json
import os
import re
import shutil
import subprocess
import sys
import tempfile
import textwrap
import threading
import time
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FutureTimeoutError
from contextlib import redirect_stdout
from typing import List, Tuple, Optional, Dict
from urllib.parse import urlparse

import gradio as gr
import openai
import requests
|
|
| |
# Best-effort startup cleanup of leftover temp artifacts and the pip cache.
# WARNING(review): this removes EVERY entry of the system temp directory whose
# name starts with 'tmp' or ends with '.py', including entries created by
# other processes. Confirm this is acceptable for the deployment host.
try:
    tempdir = tempfile.gettempdir()
    for item in os.listdir(tempdir):
        if not (item.startswith('tmp') or item.endswith('.py')):
            continue
        item_path = os.path.join(tempdir, item)
        try:
            if os.path.isfile(item_path):
                os.unlink(item_path)
            elif os.path.isdir(item_path):
                shutil.rmtree(item_path)
        except OSError:
            # In use, already gone, or not ours to delete: skip this entry.
            pass

    pip_cache = os.path.expanduser('~/.cache/pip')
    if os.path.exists(pip_cache):
        try:
            shutil.rmtree(pip_cache)
        except OSError:
            pass
except Exception as cleanup_error:
    # Cleanup is optional and must never prevent the application from starting.
    print(f"Startup cleanup skipped: {cleanup_error}")
|
|
| |
# API key for OpenRouter, read from the environment at import time.
# NOTE(review): os.getenv returns None when OPENROUTER_API_KEY is unset and
# that None is passed straight to the client - confirm how the openai client
# behaves in that case.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
# OpenAI-compatible client pointed at the OpenRouter endpoint.
client = openai.OpenAI(
    api_key=OPENROUTER_API_KEY,
    base_url="https://openrouter.ai/api/v1"
)


# Model identifier passed to every chat-completion request.
MODEL_NAME = "openrouter/owl-alpha"


# Thread pool on which process_request_wrapper runs each request.
executor = ThreadPoolExecutor(max_workers=5)
|
|
class SessionManager:
    """Manage an isolated temporary directory for each user session.

    ``self.sessions`` maps a session id (a UUID4 string) to a dict with
    ``'created_at'`` (``time.time()`` at creation) and ``'temp_dir'`` (the
    directory created for that session). The mapping is only touched while
    ``self.lock`` is held; directory removal happens outside the lock so that
    slow filesystem work never blocks other threads and the non-reentrant
    lock is never acquired twice by the same thread.
    """

    def __init__(self):
        self.sessions = {}
        self.lock = threading.Lock()

    def create_session(self):
        """Create a new session with its own temp directory.

        Returns:
            The id of the new session.
        """
        session_id = str(uuid.uuid4())
        session = {
            'created_at': time.time(),
            'temp_dir': tempfile.mkdtemp(prefix=f'session_{session_id[:8]}_'),
        }
        with self.lock:
            self.sessions[session_id] = session
        return session_id

    def get_session_temp_dir(self, session_id):
        """Return the temp directory of ``session_id``, or None if unknown."""
        with self.lock:
            session = self.sessions.get(session_id)
        return session['temp_dir'] if session else None

    def cleanup_session(self, session_id):
        """Forget ``session_id`` and delete its temp directory.

        Unknown ids are ignored, so the call is safe to repeat. The session is
        unregistered even if the directory cannot be fully removed, which
        keeps stale entries from accumulating.
        """
        with self.lock:
            session = self.sessions.pop(session_id, None)
        if session is None:
            return
        # Best effort: a directory that is busy or already gone is not an error.
        shutil.rmtree(session['temp_dir'], ignore_errors=True)

    def cleanup_old_sessions(self, max_age=3600):
        """Clean up every session older than ``max_age`` seconds."""
        current_time = time.time()
        with self.lock:
            expired_sessions = [
                session_id for session_id, data in self.sessions.items()
                if current_time - data['created_at'] > max_age
            ]
        # The lock is released here: cleanup_session takes it again itself.
        for session_id in expired_sessions:
            self.cleanup_session(session_id)


# Module-wide session registry used by the request handlers.
session_manager = SessionManager()
|
|
class ErrorAnalyzer:
    """Classify an error message and propose recovery steps.

    The analysis is plain substring / regex matching on the error text.
    """

    # Import names whose pip distribution is published under another name
    # (the same pairs install_package knows about).
    _PIP_NAMES = {
        'PIL': 'Pillow',
        'cv2': 'opencv-python',
        'sklearn': 'scikit-learn',
        'bs4': 'beautifulsoup4',
        'psycopg2': 'psycopg2-binary',
    }

    # (error_type, case-sensitive markers, lower-case markers, suggestions).
    # Rules are tried in order and the first match wins.
    _RULES = (
        ("import_error", ("No module named", "ModuleNotFoundError"), (), ()),
        ("permission_error", ("Permission denied", "PermissionError"), (),
         ("Use temp directory for file operations", "Avoid system directories")),
        ("memory_error", ("MemoryError",), ("killed",),
         ("Reduce data size or use chunks", "Process data in smaller batches")),
        ("file_error", ("FileNotFoundError", "No such file"), (),
         ("Check file paths", "Create directory if needed")),
        ("syntax_error", ("SyntaxError",), (),
         ("Fix syntax issues", "Check indentation")),
        ("attribute_error", ("AttributeError",), (),
         ("Check method/attribute names", "Verify object types")),
        ("type_error", ("TypeError",), (),
         ("Check data types", "Add type conversions")),
        ("value_error", ("ValueError",), (),
         ("Validate input data", "Add error handling")),
        ("network_error", ("URLError", "ConnectionError"), (),
         ("Check internet connection", "Add retry logic")),
    )

    @staticmethod
    def analyze_error(error_message: str, code: str) -> Dict:
        """Analyze an error message and return a fix strategy.

        Args:
            error_message: Error text (typically stderr of the failed run).
                ``None`` is treated as an empty message.
            code: The code that failed. Accepted for interface compatibility;
                it is not inspected.

        Returns:
            Dict with keys ``error_type`` (category string, ``"unknown"`` when
            nothing matches), ``suggestions`` (list of hint strings),
            ``packages`` (pip package names to install, without duplicates)
            and ``original_error`` (the analysed message).
        """
        message = error_message or ""
        lowered = message.lower()
        error_type = "unknown"
        suggestions = []
        packages_to_install = []

        def add_package(name):
            if name and name not in packages_to_install:
                packages_to_install.append(name)

        for rule_type, markers, lowered_markers, hints in ErrorAnalyzer._RULES:
            if any(m in message for m in markers) or any(m in lowered for m in lowered_markers):
                error_type = rule_type
                suggestions.extend(hints)
                break

        if error_type == "import_error":
            module_match = re.search(r"No module named ['\"]([^'\"]+)['\"]", message)
            if module_match:
                module = module_match.group(1)
                # pip installs distributions, not submodules: use the top-level
                # name and translate it when the distribution is named differently.
                top_level = module.split('.')[0]
                add_package(ErrorAnalyzer._PIP_NAMES.get(top_level, top_level))
                suggestions.append(f"Install missing module: {module}")

        # Library-specific hints apply regardless of the error category.
        if "openpyxl" in message or "xlrd" in message:
            add_package("openpyxl")
            suggestions.append("Install Excel support: openpyxl")

        if "PIL" in message or "Pillow" in message:
            add_package("Pillow")
            suggestions.append("Install image processing: Pillow")

        return {
            "error_type": error_type,
            "suggestions": suggestions,
            "packages": packages_to_install,
            "original_error": message
        }
|
|
def indent_code(code, spaces=4):
    """Return ``code`` with each non-blank line shifted right by ``spaces``.

    Lines that are empty or contain only whitespace are passed through
    unchanged. Lines are split and re-joined on ``'\\n'``.
    """
    prefix = ' ' * spaces
    return '\n'.join(
        prefix + row if row.strip() else row
        for row in code.split('\n')
    )
|
|
def detect_required_packages(code):
    """Detect pip packages that ``code`` needs and that are not pre-installed.

    Three sources are combined:

    1. ``pip install <name>`` hints anywhere in the text (first package of
       each hint; option flags and version specifiers are ignored).
    2. ``import`` / ``from ... import`` statements, including comma-separated
       imports. Standard-library modules are skipped and well-known import
       names are translated to their pip name.
    3. Usage patterns such as ``.to_parquet`` that imply an optional backend.

    Args:
        code: Python source text to scan.

    Returns:
        Sorted list of pip package names, excluding everything listed as
        pre-installed (compared case-insensitively).
    """
    # Packages assumed to be present on the host (all lower-case).
    pre_installed = {
        'gradio', 'openai', 'pillow', 'rembg', 'numpy', 'opencv-python', 'scikit-learn',
        'tensorflow', 'torch', 'lxml', 'requests', 'matplotlib', 'seaborn', 'onnxruntime',
        'proglog', 'openpyxl', 'moviepy'
    }

    # Import name -> pip distribution name.
    module_map = {
        'cv2': 'opencv-python',
        'sklearn': 'scikit-learn',
        'tf': 'tensorflow',
        'torch': 'torch',
        'numpy': 'numpy',
        'pandas': 'pandas',
        'PIL': 'Pillow',
        'pillow': 'Pillow',
        'matplotlib': 'matplotlib',
        'seaborn': 'seaborn',
        'onnxruntime': 'onnxruntime',
        'rembg': 'rembg',
        'requests': 'requests',
        'openpyxl': 'openpyxl',
        'xlrd': 'xlrd',
        'xlwt': 'xlwt',
        'xlsxwriter': 'xlsxwriter',
        'pyarrow': 'pyarrow',
        'sqlalchemy': 'sqlalchemy',
        'psycopg2': 'psycopg2-binary',
        'pymongo': 'pymongo',
        'redis': 'redis',
        'beautifulsoup4': 'beautifulsoup4',
        'bs4': 'beautifulsoup4',
        'scrapy': 'scrapy',
        'selenium': 'selenium',
        'flask': 'flask',
        'fastapi': 'fastapi',
        'django': 'django',
        'pytest': 'pytest',
        'hypothesis': 'hypothesis',
        'faker': 'faker',
    }

    # Used as-is on interpreters without sys.stdlib_module_names (< 3.10).
    fallback_stdlib = {
        '__future__', 'typing',
        'os', 'sys', 'io', 'json', 're', 'time', 'math', 'random', 'collections',
        'itertools', 'functools', 'operator', 'string', 'pathlib', 'tempfile',
        'subprocess', 'logging', 'argparse', 'csv', 'xml', 'html', 'base64',
        'hashlib', 'urllib', 'http', 'threading', 'multiprocessing', 'socket',
        'asyncio', 'concurrent', 'abc', 'enum', 'dataclasses', 'zipfile',
        'datetime', 'calendar', 'copy', 'pickle', 'struct', 'binascii', 'codecs',
        'shutil', 'traceback', 'textwrap', 'uuid', 'glob', 'statistics', 'decimal',
        'fractions', 'warnings', 'contextlib', 'importlib', 'inspect', 'types',
        'unittest', 'sqlite3', 'gzip', 'tarfile', 'heapq', 'bisect', 'secrets',
        'platform', 'queue', 'signal', 'email', 'ssl', 'pprint', 'array',
    }
    stdlib_modules = fallback_stdlib | set(getattr(sys, 'stdlib_module_names', ()))

    # Code fragment (regex) -> package the fragment implies.
    usage_patterns = {
        r'\.to_excel': 'openpyxl',
        r'\.read_excel': 'openpyxl',
        r'\.ExcelWriter': 'openpyxl',
        r'\.to_parquet': 'pyarrow',
        r'\.read_parquet': 'pyarrow',
        r'\.to_sql': 'sqlalchemy',
        r'\.read_sql': 'sqlalchemy',
        r'\.to_feather': 'pyarrow',
        r'\.read_feather': 'pyarrow',
        r'\.to_stata': 'statsmodels',
        r'\.read_stata': 'statsmodels',
        r'\.to_clipboard': 'pyperclip',
        r'xlsxwriter': 'xlsxwriter',
        r'openpyxl': 'openpyxl',
    }

    package_name_re = re.compile(r'[A-Za-z0-9][A-Za-z0-9._-]*')
    required_packages = set()

    def add(package):
        if package and package.lower() not in pre_installed:
            required_packages.add(package)

    # 1. Explicit "pip install <pkg>" hints. Only the first package of each
    #    hint is taken so that trailing prose is never mistaken for a package.
    for remainder in re.findall(r'\bpip3?\s+install\s+([^\n#]*)', code, re.IGNORECASE):
        for token in remainder.split():
            if token.startswith('-'):
                continue  # option flag such as -U / --upgrade
            name = re.split(r'[=<>!~;\[]', token, maxsplit=1)[0]
            if package_name_re.fullmatch(name):
                add(name)
            break

    # 2. Import statements.
    for raw_line in code.split('\n'):
        line = raw_line.strip()
        if line.startswith('from '):
            match = re.match(r'from\s+(\w+)', line)  # relative imports do not match
            modules = [match.group(1)] if match else []
        elif line.startswith('import '):
            clause = re.split(r'[#;]', line[len('import '):], maxsplit=1)[0]
            modules = [
                part.split()[0].split('.')[0]
                for part in clause.split(',') if part.split()
            ]
        else:
            continue

        for module in modules:
            if not module.isidentifier() or module in stdlib_modules:
                continue
            add(module_map.get(module, module))

    # 3. API usage that implies an optional backend.
    for pattern, package in usage_patterns.items():
        if re.search(pattern, code, re.IGNORECASE):
            add(package)

    return sorted(required_packages)
|
|
def install_package(package_name):
    """Install ``package_name`` with pip unless it is already importable.

    The name is validated before it reaches the pip command line because it
    can originate from model-generated text: anything that is not a plain
    distribution name (optionally with ``[extras]``) is rejected, which also
    rules out values that pip would parse as options (e.g. ``-r``).

    Args:
        package_name: pip distribution name, e.g. ``"opencv-python"``.

    Returns:
        True if the package is importable or was installed successfully,
        False if the name is invalid or installation failed.
    """
    # Distributions whose import name differs from the pip name.
    import_aliases = {
        'opencv-python': 'cv2',
        'scikit-learn': 'sklearn',
        'pillow': 'PIL',
        'Pillow': 'PIL',
        'beautifulsoup4': 'bs4',
        'psycopg2-binary': 'psycopg2',
    }
    valid_name = r'[A-Za-z0-9][A-Za-z0-9._-]*(\[[A-Za-z0-9._,-]+\])?'

    try:
        if not isinstance(package_name, str) or not re.fullmatch(valid_name, package_name):
            print(f"❌ Error checking/installing {package_name}: invalid package name")
            return False

        base_name = package_name.split('[', 1)[0]
        import_name = import_aliases.get(base_name, base_name.replace('-', '_'))

        try:
            spec = importlib.util.find_spec(import_name)
        except (ImportError, ValueError):
            # Dotted names make find_spec import the parent package, which
            # raises when that parent is missing: treat as "not installed".
            spec = None

        if spec is not None:
            print(f"✅ {package_name} already installed.")
            return True

        print(f"Installing package: {package_name}")
        result = subprocess.run([
            sys.executable, "-m", "pip", "install", "--quiet", "--no-cache-dir", package_name
        ], capture_output=True, text=True)

        if result.returncode == 0:
            print(f"✅ {package_name} installed successfully.")
            return True

        print(f"❌ Failed to install {package_name}: {result.stderr}")
        return False
    except Exception as e:
        print(f"❌ Error checking/installing {package_name}: {str(e)}")
        return False
|
|
def install_packages_if_needed(packages):
    """Install every package in ``packages`` via ``install_package``.

    Falsy entries are skipped. A summary line is printed at the end, followed
    by the list of failures if there were any.

    Returns:
        True when nothing failed (including when ``packages`` is empty),
        False otherwise.
    """
    if not packages:
        print("No additional packages to install.")
        return True

    failed_packages = []
    success_count = 0

    for name in packages:
        if not name:
            continue
        if install_package(name):
            success_count += 1
        else:
            failed_packages.append(name)

    print(f"✅ Installed/checked {success_count}/{len(packages)} packages.")

    if failed_packages:
        print(f"⚠️ Failed to install: {', '.join(failed_packages)}")

    return not failed_packages
|
|
def download_file_from_url(url: str, temp_dir: str) -> Optional[str]:
    """Download ``url`` into ``temp_dir`` and return the local file path.

    The file name is the last segment of the URL path (query string and
    fragment are ignored). ``downloaded_file`` is used when the path has no
    usable last segment, and ``.txt`` is appended when the name has no dot.

    Args:
        url: Address of the file to fetch.
        temp_dir: Existing directory that receives the file.

    Returns:
        Path of the downloaded file, or None if anything failed (the failure
        is printed, not raised).
    """
    try:
        url_path = urlparse(url).path
        # basename guarantees the name cannot point outside temp_dir.
        filename = os.path.basename(url_path.replace('\\', '/'))
        if filename in ('', '.', '..'):
            filename = 'downloaded_file'
        if '.' not in filename:
            filename += '.txt'
        local_path = os.path.join(temp_dir, filename)

        # With stream=True the connection stays open until the response is
        # closed; the context manager closes it even when an error occurs.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()

            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        print(f"Downloaded {url} to {local_path}")
        return local_path
    except Exception as e:
        print(f"Failed to download {url}: {e}")
        return None
|
|
def generate_code_with_openrouter(instruction, file_paths, previous_errors=None, attempt=1):
    """Generate a Python script for ``instruction`` through the OpenRouter API.

    Args:
        instruction: Task description from the user.
        file_paths: Local paths of the input files (may be empty or None).
        previous_errors: Optional list of ``ErrorAnalyzer.analyze_error``
            results from earlier attempts; they are summarised in the prompt.
        attempt: 1-based attempt number. Attempts above 1 add "use a different
            approach" hints and raise the sampling temperature.

    Returns:
        The generated source code with Markdown fences removed. If the model
        returns nothing, or the API call fails, a tiny script that prints an
        ``OUTPUT_TEXT:`` message and exits with status 0 is returned instead.
    """
    error_context = ""
    if previous_errors:
        error_context = "\n\nPREVIOUS ERRORS TO AVOID:\n"
        for err in previous_errors:
            # Tracebacks end with the actual exception line, so keep the tail
            # of the message rather than its head.
            details = (err.get('original_error') or '').strip()[-200:]
            error_context += f"- Error type: {err.get('error_type', 'unknown')}\n"
            error_context += f" Details: {details}\n"
            error_context += f" Suggestions: {', '.join(err.get('suggestions', []))}\n"

    alternative_approaches = ""
    if attempt > 1:
        alternative_approaches = f"\n\nThis is attempt {attempt}. Please use a different approach:"
        if attempt == 2:
            alternative_approaches += "\n- Use simpler libraries if possible"
            alternative_approaches += "\n- Add more error handling"
            alternative_approaches += "\n- Check file paths carefully"
        elif attempt >= 3:
            alternative_approaches += "\n- Use only standard library if possible"
            alternative_approaches += "\n- Implement fallback solutions"
            alternative_approaches += "\n- Generate mock data if files are problematic"

    # Literal braces meant for the generated script are doubled because the
    # template goes through str.format below.
    prompt_template = textwrap.dedent("""
    You are a Python expert. Instruction: "{instruction}"
    Input files: {file_paths_str} (use file_paths[0] for first file, iterate for multiple; if empty, generate based on instruction alone).
    {error_context}
    {alternative_approaches}

    Write a complete Python script that:
    1. Import all necessary libraries at the top.
    2. Add "# pip install package_name" comments after imports for all needed libraries.
    3. Define file_paths = {file_paths_list}
    4. Add comprehensive error handling with try-except blocks
    5. Create output directory if needed using os.makedirs(exist_ok=True)
    6. Save output to a temp directory using tempfile.mkdtemp()
    7. Print "OUTPUT_FILE_PATH: /full/path/to/output" at the end using os.path.abspath()
    8. If file operations fail, try alternative approaches

    Important rules:
    - For pandas Excel operations, always add: # pip install openpyxl
    - Always use absolute paths with os.path.abspath()
    - Create directories before saving files
    - Handle common errors (FileNotFoundError, PermissionError, etc.)
    - If a library fails, try alternatives (e.g., csv instead of pandas)
    - No __name__ == '__main__', no functions, just direct code
    - Add detailed error messages

    Example with error handling:
    import os
    import tempfile
    try:
        import pandas as pd
        # pip install pandas openpyxl
    except ImportError:
        print("Pandas not available, using csv module")
        import csv

    file_paths = {file_paths_list}

    try:
        temp_dir = tempfile.mkdtemp()
        os.makedirs(temp_dir, exist_ok=True)
        output_path = os.path.join(temp_dir, 'output.xlsx')

        # Your main logic here with error handling

        print(f"OUTPUT_FILE_PATH: {{os.path.abspath(output_path)}}")
    except Exception as e:
        print(f"ERROR: {{e}}")
        # Fallback solution
        try:
            # Alternative approach
            pass
        except:
            print("All approaches failed")

    Output ONLY Python code, no markdown.
    """)

    file_paths_list = str(file_paths)
    if file_paths:
        file_paths_str = ', '.join(os.path.basename(p) if p else 'None' for p in file_paths)
    else:
        file_paths_str = 'None (generate from scratch)'

    prompt = prompt_template.format(
        instruction=instruction or "No instruction provided",
        file_paths_str=file_paths_str,
        file_paths_list=file_paths_list,
        error_context=error_context,
        alternative_approaches=alternative_approaches
    )

    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": "Output only clean executable Python code with comprehensive error handling."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.1 if attempt == 1 else 0.3
        )

        # The message content can be None; treat that as "no code generated"
        # instead of letting it surface as an API error.
        generated_code = (response.choices[0].message.content or "").strip()

        # Drop Markdown code fences the model may add despite the instructions.
        generated_code = re.sub(r'^```python\s*|\s*```$', '', generated_code, flags=re.MULTILINE).strip()

        return generated_code if generated_code else "import sys\nprint('OUTPUT_TEXT: No code generated')\nsys.exit(0)"

    except Exception as api_error:
        error_msg = f"API Error: {api_error}"
        print(error_msg)

        return """import sys
print("OUTPUT_TEXT: Code generation failed due to API error")
sys.exit(0)"""
|
|
def execute_code_with_retry(code: str, session_id: str, max_attempts: int = 3) -> Tuple[bool, str, Optional[str]]:
    """Run ``code`` in a subprocess, re-running it after installing missing packages.

    Each attempt installs the packages detected in the code, wraps the code in
    a ``try/except`` that exits with status 1 on any exception, checks that it
    compiles, writes it to a temporary ``.py`` file (inside the session's temp
    directory when the session is known) and executes it with a 60 second
    timeout. The temporary script is always deleted afterwards.

    The same code is only re-run when a failed run pointed at missing packages
    and at least one ``install_package`` call for them reported success. All
    other failures return immediately, because re-running identical code
    cannot change the outcome; the caller is expected to regenerate the code.

    Args:
        code: Python source to execute.
        session_id: Session whose temp directory hosts the script file.
        max_attempts: Upper bound on executions of ``code``.

    Returns:
        ``(success, output, output_path)``. On success ``output`` is the
        script's stdout (or the text after ``OUTPUT_TEXT:``) and
        ``output_path`` is the existing path printed after
        ``OUTPUT_FILE_PATH:``, or None. On failure ``output`` describes the
        error and ``output_path`` is None.
    """
    attempt = 0

    while attempt < max_attempts:
        attempt += 1
        print(f"\n=== Execution attempt {attempt}/{max_attempts} (Session: {session_id}) ===")

        try:
            print("Detecting packages...")
            required_packages = detect_required_packages(code)
            print("Detected packages:", required_packages)
            install_packages_if_needed(required_packages)

            indented = indent_code(code)
            wrapped_code = f"try:\n{indented}\nexcept Exception as e:\n print(f'ERROR: {{e}}')\n import traceback; traceback.print_exc()\n import sys; sys.exit(1)"

            print("Compiling code...")
            try:
                compile(wrapped_code, '<string>', 'exec')
            except SyntaxError as se:
                if attempt < max_attempts:
                    print(f"Syntax error on attempt {attempt}, will regenerate code")
                return False, f"Syntax Error: {se}", None
            print("Compile OK.")

            print("Creating temp file...")
            # dir=None falls back to the system temp directory, so an unknown
            # session no longer leaves an extra directory behind.
            session_temp_dir = session_manager.get_session_temp_dir(session_id) or None

            # Python source files are UTF-8 by default; writing with the
            # locale encoding could fail on non-ASCII generated code.
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False,
                                             dir=session_temp_dir, encoding='utf-8') as tf:
                tf.write(wrapped_code)
                tf_path = tf.name
            print(f"Temp file: {tf_path}")

            print(f"Executing...")
            try:
                result = subprocess.run(
                    [sys.executable, tf_path],
                    capture_output=True,
                    text=True,
                    timeout=60
                )
            finally:
                # Remove the script on success, failure and timeout alike.
                try:
                    os.unlink(tf_path)
                except OSError:
                    pass

            stdout, stderr = result.stdout, result.stderr
            rc = result.returncode

            if rc != 0:
                error_msg = f"Execution failed (RC {rc}):\nStderr: {stderr}"
                print(error_msg)

                if attempt >= max_attempts:
                    return False, error_msg, None

                error_analysis = ErrorAnalyzer.analyze_error(stderr, code)
                if error_analysis['packages']:
                    print(f"Attempting to install missing packages: {error_analysis['packages']}")
                    install_results = [install_package(pkg) for pkg in error_analysis['packages']]
                    if any(install_results):
                        # Missing packages were handled: run the same code again.
                        continue

                return False, stderr, None

            output_path_match = re.search(r'OUTPUT_FILE_PATH:\s*(.+)', stdout, re.I)
            output_text_match = re.search(r'OUTPUT_TEXT:\s*(.+)', stdout, re.I | re.DOTALL)

            if output_path_match:
                output_path = output_path_match.group(1).strip()
                if os.path.exists(output_path):
                    return True, stdout, output_path
                error_msg = f"Output path not found: {output_path}"
                if attempt < max_attempts:
                    print(error_msg)
                return False, error_msg, None

            if output_text_match:
                return True, output_text_match.group(1).strip(), None

            if stdout.strip():
                return True, stdout, None

            return False, "No output generated", None

        except subprocess.TimeoutExpired:
            error_msg = "Timeout: Code execution took too long"
            if attempt < max_attempts:
                print(error_msg)
            return False, error_msg, None
        except Exception as e:
            error_msg = f"Execution error: {str(e)}"
            if attempt < max_attempts:
                print(error_msg)
            return False, error_msg, None

    return False, "Max attempts reached", None
|
|
def _detach_output_from_session(file_path, session_dir):
    """Move ``file_path`` out of ``session_dir`` so session cleanup cannot delete it."""
    if not file_path or not session_dir:
        return file_path
    try:
        output_abs = os.path.abspath(file_path)
        session_abs = os.path.abspath(session_dir)
        if os.path.commonpath([output_abs, session_abs]) != session_abs:
            return file_path
        keep_dir = tempfile.mkdtemp(prefix='output_')
        return shutil.move(output_abs, os.path.join(keep_dir, os.path.basename(output_abs)))
    except (OSError, ValueError) as move_error:
        # ValueError: paths on different drives, i.e. not inside the session.
        print(f"Could not move output out of session directory: {move_error}")
        return file_path


def process_request_async(instruction, files, urls_input):
    """Process one request: collect inputs, generate code, run it, report.

    Up to three code-generation attempts are made; the analysed errors of the
    failed attempts are passed to the next generation call. The session
    created for the request is always cleaned up before returning.

    Args:
        instruction: Task description from the user.
        files: Uploaded files from Gradio (objects with a ``name`` attribute
            or plain path strings), or None.
        urls_input: Comma-separated URLs to download as extra inputs, or None.

    Returns:
        ``(text, file_path)``: the text report for the UI and the path of the
        produced file, or None when no file was produced.
    """
    # Validate before allocating a session so nothing is left to clean up.
    if not instruction or not instruction.strip():
        return "لطفاً دستور را وارد کنید. (فایلها و لینکها اختیاری هستند)", None

    session_id = session_manager.create_session()

    try:
        session_temp_dir = session_manager.get_session_temp_dir(session_id)
        file_paths = []

        if files:
            # Depending on the Gradio version, uploads arrive as temp-file
            # wrappers exposing .name or as plain path strings.
            file_paths = [getattr(f, 'name', f) for f in files]

        if urls_input and urls_input.strip():
            urls = [url.strip() for url in urls_input.split(',') if url.strip()]
            downloaded_paths = []
            for url in urls:
                local_path = download_file_from_url(url, session_temp_dir)
                if local_path:
                    downloaded_paths.append(local_path)
            file_paths.extend(downloaded_paths)
            print(f"Downloaded {len(downloaded_paths)} files from URLs for session {session_id}")

        previous_errors = []

        for attempt in range(1, 4):
            print(f"\n{'='*50}")
            print(f"MAIN ATTEMPT {attempt}/3 (Session: {session_id})")
            print(f"{'='*50}")

            print("Generating code...")
            generated_code = generate_code_with_openrouter(
                instruction,
                file_paths,
                previous_errors=previous_errors if attempt > 1 else None,
                attempt=attempt
            )

            if len(generated_code) < 20:
                return f"کد ضعیف تولید شد: {generated_code}", None

            print(f"Generated code preview: {generated_code[:200]}...")

            success, output, file_path = execute_code_with_retry(generated_code, session_id, max_attempts=2)

            if success:
                result_text = f"✅ Success on attempt {attempt}!\n\n"
                result_text += f"Generated Code:\n```python\n{generated_code}\n```\n\n"
                result_text += f"Output:\n{output}"
                # An output written next to a downloaded input lives in the
                # session directory, which is deleted when this function exits.
                return result_text, _detach_output_from_session(file_path, session_temp_dir)

            print(f"\n❌ Attempt {attempt} failed")
            error_analysis = ErrorAnalyzer.analyze_error(output, generated_code)
            previous_errors.append(error_analysis)

            print(f"Error type: {error_analysis['error_type']}")
            print(f"Suggestions: {', '.join(error_analysis['suggestions'])}")

            if attempt == 3:
                error_report = f"❌ Failed after {attempt} attempts.\n\n"
                error_report += "Error History:\n"
                for i, err in enumerate(previous_errors, 1):
                    error_report += f"\nAttempt {i}:\n"
                    error_report += f"- Error type: {err['error_type']}\n"
                    error_report += f"- Details: {err['original_error'][:200]}...\n"

                error_report += f"\n\nLast generated code:\n```python\n{generated_code}\n```"
                return error_report, None

        return "Unexpected end of retry loop", None

    except Exception as e:
        error_msg = f"General error: {type(e).__name__}: {e}\nFull traceback: {traceback.format_exc()}"
        print(error_msg)
        return error_msg, None
    finally:
        # Single cleanup point covering every return path above.
        session_manager.cleanup_session(session_id)
|
|
def process_request_wrapper(instruction, files, urls_input):
    """Gradio click handler: run the request on the shared pool with a time limit.

    Args:
        instruction: Text of the instruction box.
        files: Uploaded files, or None.
        urls_input: Comma-separated URL string, or None.

    Returns:
        ``(text, file_path)`` from ``process_request_async``, or an error text
        and None when the worker raised or did not finish within 300 seconds.
    """
    future = executor.submit(process_request_async, instruction, files, urls_input)

    try:
        return future.result(timeout=300)
    except FutureTimeoutError:
        # str() of the timeout exception is empty, so report it explicitly.
        # cancel() only has an effect if the task has not started running yet.
        future.cancel()
        return "Error processing request: timed out after 300 seconds", None
    except Exception as e:
        return f"Error processing request: {str(e)}", None
|
|
| |
# Gradio UI definition. Components are laid out in creation order: the
# instruction box, the file/URL inputs, the run button, then the outputs.
with gr.Blocks(title="AI File Processor - Self Correcting - Concurrent") as demo:
    gr.Markdown("""
    # 🤖 AI File Processor - Self Correcting Edition
    """)

    with gr.Row():
        # Free-text instruction (label and placeholder are Persian, shown RTL).
        instruction = gr.Textbox(
            label="دستور",
            lines=2,
            placeholder="مثال: یک نمودار دایرهای از دادههای فروش بکش",
            rtl=True
        )

    with gr.Row():
        # Optional inputs: uploaded files and/or comma-separated URLs.
        files = gr.File(file_count="multiple", label="فایلهای آپلود شده (اختیاری)")
        urls = gr.Textbox(
            label="لینک فایلها (جدا با کاما، اختیاری)",
            placeholder="https://example.com/file1.csv, https://example.com/file2.jpg",
            info="لینکها دانلود خواهند شد و حجم فایل چک نمیشود."
        )

    btn = gr.Button("🚀 اجرا", variant="primary")

    with gr.Row():
        # Text report and, when the generated code produced one, the output file.
        output = gr.Textbox(label="نتیجه", lines=15)
        file_out = gr.File(label="فایل خروجی (در صورت تولید)")


    # Each example row supplies values for [instruction, files, urls].
    gr.Examples(
        examples=[
            ["یک فایل اکسل با 100 محصول فروشگاهی شامل نام، قیمت و موجودی بساز", None, None],
            ["یک نمودار میلهای از دادههای تصادفی رسم کن", None, None],
            ["یک تصویر 500x500 پیکسل با رنگهای تصادفی بساز", None, None],
            ["این فایل را به فرمت JSON تبدیل کن", None, "https://example.com/sample.csv"],
        ],
        inputs=[instruction, files, urls],
    )

    # The handler returns (text, file path), mapped onto the two outputs.
    btn.click(fn=process_request_wrapper, inputs=[instruction, files, urls], outputs=[output, file_out])
|
|
if __name__ == "__main__":
    # Drop sessions older than the default max age before serving.
    session_manager.cleanup_old_sessions()

    # Try queue() call shapes in turn, presumably to cope with signatures
    # that differ between Gradio versions - TODO confirm supported versions.
    try:
        demo.queue(max_size=20)
    except TypeError:
        try:
            demo.queue(concurrency_count=5, max_size=20)
        except Exception:
            demo.queue()

    # WARNING(review): this app executes model-generated code on the host.
    # share=True together with server_name="0.0.0.0" makes it reachable from
    # outside the machine - confirm that this exposure is intended.
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )