aifiles / app.py
suprimedev's picture
Update app.py
2e5d4d1 verified
import gradio as gr
import openai
import os
import tempfile
import sys
import io
import subprocess
import importlib.util
import re
from contextlib import redirect_stdout
import textwrap
import shutil
import traceback
import json
from typing import List, Tuple, Optional, Dict
import requests
import threading
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# Clean up any existing temp files on startup to save space
try:
tempdir = tempfile.gettempdir()
for item in os.listdir(tempdir):
item_path = os.path.join(tempdir, item)
if item.startswith('tmp') or item.endswith('.py'):
try:
if os.path.isfile(item_path):
os.unlink(item_path)
elif os.path.isdir(item_path):
shutil.rmtree(item_path)
except:
pass
# Clean pip cache if exists
pip_cache = os.path.expanduser('~/.cache/pip')
if os.path.exists(pip_cache):
try:
shutil.rmtree(pip_cache)
except:
pass
except:
pass
# Use OpenRouter API (OpenAI-compatible)
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
client = openai.OpenAI(
api_key=OPENROUTER_API_KEY,
base_url="https://openrouter.ai/api/v1"
)
MODEL_NAME = "x-ai/grok-4.1-fast"
# Global thread pool for concurrent execution
executor = ThreadPoolExecutor(max_workers=5) # حداکثر ۵ درخواست همزمان
class SessionManager:
"""مدیریت سشن‌های مستقل برای هر کاربر"""
def __init__(self):
self.sessions = {}
self.lock = threading.Lock()
def create_session(self):
"""ایجاد یک سشن جدید"""
session_id = str(uuid.uuid4())
with self.lock:
self.sessions[session_id] = {
'created_at': time.time(),
'temp_dir': tempfile.mkdtemp(prefix=f'session_{session_id[:8]}_')
}
return session_id
def get_session_temp_dir(self, session_id):
"""دریافت دایرکتوری موقت سشن"""
with self.lock:
if session_id in self.sessions:
return self.sessions[session_id]['temp_dir']
return None
def cleanup_session(self, session_id):
"""پاک‌سازی سشن"""
with self.lock:
if session_id in self.sessions:
try:
temp_dir = self.sessions[session_id]['temp_dir']
if os.path.exists(temp_dir):
shutil.rmtree(temp_dir)
del self.sessions[session_id]
except:
pass
def cleanup_old_sessions(self, max_age=3600):
"""پاک‌سازی سشن‌های قدیمی"""
current_time = time.time()
with self.lock:
expired_sessions = [
session_id for session_id, data in self.sessions.items()
if current_time - data['created_at'] > max_age
]
for session_id in expired_sessions:
self.cleanup_session(session_id)
# مدیر سشن گلوبال
session_manager = SessionManager()
class ErrorAnalyzer:
"""Analyze errors and suggest fixes"""
@staticmethod
def analyze_error(error_message: str, code: str) -> Dict:
"""Analyze error and return fix strategy"""
error_type = "unknown"
suggestions = []
packages_to_install = []
# Import errors
if "No module named" in error_message or "ModuleNotFoundError" in error_message:
error_type = "import_error"
module_match = re.search(r"No module named ['\"]([^'\"]+)['\"]", error_message)
if module_match:
module = module_match.group(1)
packages_to_install.append(module)
suggestions.append(f"Install missing module: {module}")
# Permission errors
elif "Permission denied" in error_message or "PermissionError" in error_message:
error_type = "permission_error"
suggestions.append("Use temp directory for file operations")
suggestions.append("Avoid system directories")
# Memory errors
elif "MemoryError" in error_message or "killed" in error_message.lower():
error_type = "memory_error"
suggestions.append("Reduce data size or use chunks")
suggestions.append("Process data in smaller batches")
# File not found
elif "FileNotFoundError" in error_message or "No such file" in error_message:
error_type = "file_error"
suggestions.append("Check file paths")
suggestions.append("Create directory if needed")
# Syntax errors
elif "SyntaxError" in error_message:
error_type = "syntax_error"
suggestions.append("Fix syntax issues")
suggestions.append("Check indentation")
# Attribute errors
elif "AttributeError" in error_message:
error_type = "attribute_error"
suggestions.append("Check method/attribute names")
suggestions.append("Verify object types")
# Type errors
elif "TypeError" in error_message:
error_type = "type_error"
suggestions.append("Check data types")
suggestions.append("Add type conversions")
# Value errors
elif "ValueError" in error_message:
error_type = "value_error"
suggestions.append("Validate input data")
suggestions.append("Add error handling")
# Network errors
elif "URLError" in error_message or "ConnectionError" in error_message:
error_type = "network_error"
suggestions.append("Check internet connection")
suggestions.append("Add retry logic")
# Package specific errors
if "openpyxl" in error_message or "xlrd" in error_message:
packages_to_install.append("openpyxl")
suggestions.append("Install Excel support: openpyxl")
if "PIL" in error_message or "Pillow" in error_message:
packages_to_install.append("Pillow")
suggestions.append("Install image processing: Pillow")
return {
"error_type": error_type,
"suggestions": suggestions,
"packages": packages_to_install,
"original_error": error_message
}
def indent_code(code, spaces=4):
"""Indent the code by the specified number of spaces."""
indented_lines = []
for line in code.split('\n'):
if line.strip(): # Only indent non-empty lines
indented_lines.append(' ' * spaces + line)
else:
indented_lines.append(line)
return '\n'.join(indented_lines)
def detect_required_packages(code):
"""Detect required packages from Python code (optimized for accuracy)."""
required_packages = set()
# Pre-installed packages from requirements.txt
pre_installed = {
'gradio', 'openai', 'pillow', 'rembg', 'numpy', 'opencv-python', 'scikit-learn',
'tensorflow', 'torch', 'lxml', 'requests', 'matplotlib', 'seaborn', 'onnxruntime',
'proglog', 'openpyxl', 'moviepy'
}
# Import patterns
import_patterns = [
r'^(?:import|from)\s+(\w+)(?:\.\w+)*',
]
# Patterns for pip install in code/comments
pip_patterns = [
r'#?\s*pip\s+install\s+([^\s#]+)',
r'#?\s*install\s+([^\s#]+)'
]
# Usage patterns that require additional packages
usage_patterns = {
r'\.to_excel': 'openpyxl',
r'\.read_excel': 'openpyxl',
r'\.ExcelWriter': 'openpyxl',
r'\.to_parquet': 'pyarrow',
r'\.read_parquet': 'pyarrow',
r'\.to_sql': 'sqlalchemy',
r'\.read_sql': 'sqlalchemy',
r'\.to_feather': 'pyarrow',
r'\.read_feather': 'pyarrow',
r'\.to_stata': 'statsmodels',
r'\.read_stata': 'statsmodels',
r'\.to_clipboard': 'pyperclip',
r'xlsxwriter': 'xlsxwriter',
r'openpyxl': 'openpyxl',
}
# Check for pip install comments
for pattern in pip_patterns:
matches = re.findall(pattern, code, re.IGNORECASE | re.MULTILINE)
for match in matches:
if match and not match.startswith(('"', "'")):
pkg = match.split('==')[0].split('>')[0].split('<')[0].strip()
if pkg and pkg not in pre_installed:
required_packages.add(pkg)
# Check imports with mapping
for line in code.split('\n'):
line = line.strip()
if line.startswith(('import ', 'from ')):
match = re.search(import_patterns[0], line)
if match:
module = match.group(1)
if module and module not in {'__future__', 'typing'}:
module_map = {
'cv2': 'opencv-python',
'sklearn': 'scikit-learn',
'tf': 'tensorflow',
'torch': 'torch',
'numpy': 'numpy',
'pandas': 'pandas',
'PIL': 'Pillow',
'pillow': 'Pillow',
'matplotlib': 'matplotlib',
'seaborn': 'seaborn',
'onnxruntime': 'onnxruntime',
'rembg': 'rembg',
'requests': 'requests',
'openpyxl': 'openpyxl',
'xlrd': 'xlrd',
'xlwt': 'xlwt',
'xlsxwriter': 'xlsxwriter',
'pyarrow': 'pyarrow',
'sqlalchemy': 'sqlalchemy',
'psycopg2': 'psycopg2-binary',
'pymongo': 'pymongo',
'redis': 'redis',
'beautifulsoup4': 'beautifulsoup4',
'bs4': 'beautifulsoup4',
'scrapy': 'scrapy',
'selenium': 'selenium',
'flask': 'flask',
'fastapi': 'fastapi',
'django': 'django',
'pytest': 'pytest',
'hypothesis': 'hypothesis',
'faker': 'faker',
}
if module in module_map:
pkg = module_map[module]
if pkg not in pre_installed:
required_packages.add(pkg)
elif module not in {
'os', 'sys', 'io', 'json', 're', 'time', 'math', 'random', 'collections',
'itertools', 'functools', 'operator', 'string', 'pathlib', 'tempfile',
'subprocess', 'logging', 'argparse', 'csv', 'xml', 'html', 'base64',
'hashlib', 'urllib', 'http', 'threading', 'multiprocessing', 'socket',
'asyncio', 'concurrent', 'abc', 'enum', 'dataclasses', 'zipfile',
'datetime', 'calendar', 'copy', 'pickle', 'struct', 'binascii', 'codecs'
}:
if module not in pre_installed:
required_packages.add(module)
# Check for usage patterns that need additional packages
for pattern, package in usage_patterns.items():
if re.search(pattern, code, re.IGNORECASE) and package not in pre_installed:
required_packages.add(package)
# Special case: if pandas is imported and Excel operations detected
if 'pandas' in code and any(pattern in code for pattern in ['.to_excel', '.read_excel', 'ExcelWriter']):
if 'openpyxl' not in pre_installed:
required_packages.add('openpyxl')
# Remove pre-installed packages
required_packages = required_packages - pre_installed
return list(required_packages)
def install_package(package_name):
"""Install a package using pip if it's not already installed."""
try:
# Special handling for some packages
import_name = package_name
if package_name == 'opencv-python':
import_name = 'cv2'
elif package_name == 'scikit-learn':
import_name = 'sklearn'
elif package_name == 'pillow' or package_name == 'Pillow':
import_name = 'PIL'
elif package_name == 'beautifulsoup4':
import_name = 'bs4'
elif package_name == 'psycopg2-binary':
import_name = 'psycopg2'
else:
import_name = package_name.replace('-', '_')
spec = importlib.util.find_spec(import_name)
if spec is None:
print(f"Installing package: {package_name}")
result = subprocess.run([
sys.executable, "-m", "pip", "install", "--quiet", "--no-cache-dir", package_name
], capture_output=True, text=True)
if result.returncode == 0:
print(f"✅ {package_name} installed successfully.")
return True
else:
print(f"❌ Failed to install {package_name}: {result.stderr}")
return False
else:
print(f"✅ {package_name} already installed.")
return True
except Exception as e:
print(f"❌ Error checking/installing {package_name}: {str(e)}")
return False
def install_packages_if_needed(packages):
"""Install required packages."""
if not packages:
print("No additional packages to install.")
return True
success_count = 0
failed_packages = []
for package in packages:
if package:
if install_package(package):
success_count += 1
else:
failed_packages.append(package)
print(f"✅ Installed/checked {success_count}/{len(packages)} packages.")
if failed_packages:
print(f"⚠️ Failed to install: {', '.join(failed_packages)}")
return len(failed_packages) == 0
def download_file_from_url(url: str, temp_dir: str) -> Optional[str]:
"""Download a file from URL to temp_dir and return local path."""
try:
filename = url.split('/')[-1].split('?')[0] or 'downloaded_file'
if '.' not in filename:
filename += '.txt' # Default extension
local_path = os.path.join(temp_dir, filename)
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status()
with open(local_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"Downloaded {url} to {local_path}")
return local_path
except Exception as e:
print(f"Failed to download {url}: {e}")
return None
def generate_code_with_openrouter(instruction, file_paths, previous_errors=None, attempt=1):
"""Generate Python code using OpenRouter API with error awareness."""
error_context = ""
if previous_errors:
error_context = "\n\nPREVIOUS ERRORS TO AVOID:\n"
for err in previous_errors:
error_context += f"- Error type: {err.get('error_type', 'unknown')}\n"
error_context += f" Details: {err.get('original_error', '')[:200]}\n"
error_context += f" Suggestions: {', '.join(err.get('suggestions', []))}\n"
alternative_approaches = ""
if attempt > 1:
alternative_approaches = f"\n\nThis is attempt {attempt}. Please use a different approach:"
if attempt == 2:
alternative_approaches += "\n- Use simpler libraries if possible"
alternative_approaches += "\n- Add more error handling"
alternative_approaches += "\n- Check file paths carefully"
elif attempt >= 3:
alternative_approaches += "\n- Use only standard library if possible"
alternative_approaches += "\n- Implement fallback solutions"
alternative_approaches += "\n- Generate mock data if files are problematic"
prompt_template = textwrap.dedent("""
You are a Python expert. Instruction: "{instruction}"
Input files: {file_paths_str} (use file_paths[0] for first file, iterate for multiple; if empty, generate based on instruction alone).
{error_context}
{alternative_approaches}
Write a complete Python script that:
1. Import all necessary libraries at the top.
2. Add "# pip install package_name" comments after imports for all needed libraries.
3. Define file_paths = {file_paths_list}
4. Add comprehensive error handling with try-except blocks
5. Create output directory if needed using os.makedirs(exist_ok=True)
6. Save output to a temp directory using tempfile.mkdtemp()
7. Print "OUTPUT_FILE_PATH: /full/path/to/output" at the end using os.path.abspath()
8. If file operations fail, try alternative approaches
Important rules:
- For pandas Excel operations, always add: # pip install openpyxl
- Always use absolute paths with os.path.abspath()
- Create directories before saving files
- Handle common errors (FileNotFoundError, PermissionError, etc.)
- If a library fails, try alternatives (e.g., csv instead of pandas)
- No __name__ == '__main__', no functions, just direct code
- Add detailed error messages
Example with error handling:
import os
import tempfile
try:
import pandas as pd
# pip install pandas openpyxl
except ImportError:
print("Pandas not available, using csv module")
import csv
file_paths = {file_paths_list}
try:
temp_dir = tempfile.mkdtemp()
os.makedirs(temp_dir, exist_ok=True)
output_path = os.path.join(temp_dir, 'output.xlsx')
# Your main logic here with error handling
print(f"OUTPUT_FILE_PATH: {{os.path.abspath(output_path)}}")
except Exception as e:
print(f"ERROR: {{e}}")
# Fallback solution
try:
# Alternative approach
pass
except:
print("All approaches failed")
Output ONLY Python code, no markdown.
""")
file_paths_list = str(file_paths)
file_paths_str = ', '.join([os.path.basename(p) if p else 'None' for p in file_paths]) if file_paths else 'None (generate from scratch)'
prompt = prompt_template.format(
instruction=instruction or "No instruction provided",
file_paths_str=file_paths_str,
file_paths_list=file_paths_list,
error_context=error_context,
alternative_approaches=alternative_approaches
)
try:
response = client.chat.completions.create(
model=MODEL_NAME,
messages=[
{"role": "system", "content": "Output only clean executable Python code with comprehensive error handling."},
{"role": "user", "content": prompt}
],
temperature=0.1 if attempt == 1 else 0.3 # Increase creativity on retries
)
generated_code = response.choices[0].message.content.strip()
# Clean code blocks
generated_code = re.sub(r'^```python\s*|\s*```$', '', generated_code, flags=re.MULTILINE).strip()
return generated_code if generated_code else "import sys\nprint('OUTPUT_TEXT: No code generated')\nsys.exit(0)"
except Exception as api_error:
error_msg = f"API Error: {api_error}"
print(error_msg)
# Return simple fallback
return """import sys
print("OUTPUT_TEXT: Code generation failed due to API error")
sys.exit(0)"""
def execute_code_with_retry(code: str, session_id: str, max_attempts: int = 3) -> Tuple[bool, str, Optional[str]]:
"""Execute code with retry logic and error recovery"""
tf_path = None
attempt = 0
while attempt < max_attempts:
attempt += 1
print(f"\n=== Execution attempt {attempt}/{max_attempts} (Session: {session_id}) ===")
try:
# Step 1: Detect and install packages
print("Detecting packages...")
required_packages = detect_required_packages(code)
print("Detected packages:", required_packages)
install_packages_if_needed(required_packages)
# Step 2: Wrap code
indented = indent_code(code)
wrapped_code = f"try:\n{indented}\nexcept Exception as e:\n print(f'ERROR: {{e}}')\n import traceback; traceback.print_exc()\n import sys; sys.exit(1)"
# Step 3: Compile check
print("Compiling code...")
try:
compile(wrapped_code, '<string>', 'exec')
print("Compile OK.")
except SyntaxError as se:
error_msg = f"Syntax Error: {se}"
if attempt < max_attempts:
print(f"Syntax error on attempt {attempt}, will regenerate code")
return False, error_msg, None
else:
return False, error_msg, None
# Step 4: Create temp file
print("Creating temp file...")
session_temp_dir = session_manager.get_session_temp_dir(session_id)
if not session_temp_dir:
session_temp_dir = tempfile.mkdtemp()
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, dir=session_temp_dir) as tf:
tf.write(wrapped_code)
tf_path = tf.name
print(f"Temp file: {tf_path}")
# Step 5: Execute
print(f"Executing...")
result = subprocess.run(
[sys.executable, tf_path],
capture_output=True,
text=True,
timeout=60
)
stdout, stderr = result.stdout, result.stderr
rc = result.returncode
# Clean up temp file
if tf_path and os.path.exists(tf_path):
try:
os.unlink(tf_path)
except:
pass
if rc != 0:
error_msg = f"Execution failed (RC {rc}):\nStderr: {stderr}"
print(error_msg)
# Check if we should retry
if attempt < max_attempts:
# Analyze error for next attempt
error_analysis = ErrorAnalyzer.analyze_error(stderr, code)
# Try to fix by installing missing packages
if error_analysis['packages']:
print(f"Attempting to install missing packages: {error_analysis['packages']}")
for pkg in error_analysis['packages']:
install_package(pkg)
return False, stderr, None
else:
return False, error_msg, None
# Extract output
output_path_match = re.search(r'OUTPUT_FILE_PATH:\s*(.+)', stdout, re.I)
output_text_match = re.search(r'OUTPUT_TEXT:\s*(.+)', stdout, re.I | re.DOTALL)
if output_path_match:
output_path = output_path_match.group(1).strip()
if os.path.exists(output_path):
return True, stdout, output_path
else:
error_msg = f"Output path not found: {output_path}"
if attempt < max_attempts:
print(error_msg)
return False, error_msg, None
else:
return False, error_msg, None
elif output_text_match:
return True, output_text_match.group(1).strip(), None
else:
if stdout.strip():
return True, stdout, None
else:
return False, "No output generated", None
except subprocess.TimeoutExpired:
error_msg = "Timeout: Code execution took too long"
if attempt < max_attempts:
print(error_msg)
return False, error_msg, None
else:
return False, error_msg, None
except Exception as e:
error_msg = f"Execution error: {str(e)}"
if attempt < max_attempts:
print(error_msg)
return False, error_msg, None
else:
return False, error_msg, None
return False, "Max attempts reached", None
def process_request_async(instruction, files, urls_input):
"""تابع اصلی پردازش درخواست به صورت همزمان"""
session_id = session_manager.create_session()
try:
if not instruction.strip():
return "لطفاً دستور را وارد کنید. (فایل‌ها و لینک‌ها اختیاری هستند)", None
file_paths = []
# Handle uploaded files
if files:
file_paths = [f.name for f in files]
# Handle URLs: download to session temp dir
if urls_input and urls_input.strip():
session_temp_dir = session_manager.get_session_temp_dir(session_id)
urls = [url.strip() for url in urls_input.split(',') if url.strip()]
downloaded_paths = []
for url in urls:
local_path = download_file_from_url(url, session_temp_dir)
if local_path:
downloaded_paths.append(local_path)
file_paths.extend(downloaded_paths)
print(f"Downloaded {len(downloaded_paths)} files from URLs for session {session_id}")
# Track errors for learning
previous_errors = []
generated_codes = []
# Main retry loop
for attempt in range(1, 4): # 3 attempts
print(f"\n{'='*50}")
print(f"MAIN ATTEMPT {attempt}/3 (Session: {session_id})")
print(f"{'='*50}")
# Generate code
print("Generating code...")
generated_code = generate_code_with_openrouter(
instruction,
file_paths,
previous_errors=previous_errors if attempt > 1 else None,
attempt=attempt
)
if len(generated_code) < 20:
return f"کد ضعیف تولید شد: {generated_code}", None
generated_codes.append(generated_code)
print(f"Generated code preview: {generated_code[:200]}...")
# Try to execute
success, output, file_path = execute_code_with_retry(generated_code, session_id, max_attempts=2)
if success:
# Success!
result_text = f"✅ Success on attempt {attempt}!\n\n"
result_text += f"Generated Code:\n```python\n{generated_code}\n```\n\n"
result_text += f"Output:\n{output}"
session_manager.cleanup_session(session_id) # Cleanup on success
return result_text, file_path
else:
# Analyze error
print(f"\n❌ Attempt {attempt} failed")
error_analysis = ErrorAnalyzer.analyze_error(output, generated_code)
previous_errors.append(error_analysis)
print(f"Error type: {error_analysis['error_type']}")
print(f"Suggestions: {', '.join(error_analysis['suggestions'])}")
# If this was the last attempt
if attempt == 3:
error_report = f"❌ Failed after {attempt} attempts.\n\n"
error_report += "Error History:\n"
for i, err in enumerate(previous_errors, 1):
error_report += f"\nAttempt {i}:\n"
error_report += f"- Error type: {err['error_type']}\n"
error_report += f"- Details: {err['original_error'][:200]}...\n"
error_report += f"\n\nLast generated code:\n```python\n{generated_code}\n```"
session_manager.cleanup_session(session_id) # Cleanup on failure
return error_report, None
session_manager.cleanup_session(session_id)
return "Unexpected end of retry loop", None
except Exception as e:
error_msg = f"General error: {type(e).__name__}: {e}\nFull traceback: {traceback.format_exc()}"
print(error_msg)
session_manager.cleanup_session(session_id)
return error_msg, None
def process_request_wrapper(instruction, files, urls_input):
"""Wrapper function for Gradio to handle async execution"""
# Submit task to thread pool
future = executor.submit(process_request_async, instruction, files, urls_input)
try:
# Wait for result with timeout
result = future.result(timeout=300) # 5 minutes timeout
return result
except Exception as e:
return f"Error processing request: {str(e)}", None
# Gradio Interface
with gr.Blocks(title="AI File Processor - Self Correcting - Concurrent") as demo:
gr.Markdown("""
# 🤖 AI File Processor - Self Correcting Edition
""")
with gr.Row():
instruction = gr.Textbox(
label="دستور",
lines=2,
placeholder="مثال: یک نمودار دایره‌ای از داده‌های فروش بکش",
rtl=True
)
with gr.Row():
files = gr.File(file_count="multiple", label="فایل‌های آپلود شده (اختیاری)")
urls = gr.Textbox(
label="لینک فایل‌ها (جدا با کاما، اختیاری)",
placeholder="https://example.com/file1.csv, https://example.com/file2.jpg",
info="لینک‌ها دانلود خواهند شد و حجم فایل چک نمی‌شود."
)
btn = gr.Button("🚀 اجرا", variant="primary")
with gr.Row():
output = gr.Textbox(label="نتیجه", lines=15)
file_out = gr.File(label="فایل خروجی (در صورت تولید)")
# Examples
gr.Examples(
examples=[
["یک فایل اکسل با 100 محصول فروشگاهی شامل نام، قیمت و موجودی بساز", None, None],
["یک نمودار میله‌ای از داده‌های تصادفی رسم کن", None, None],
["یک تصویر 500x500 پیکسل با رنگ‌های تصادفی بساز", None, None],
["این فایل را به فرمت JSON تبدیل کن", None, "https://example.com/sample.csv"],
],
inputs=[instruction, files, urls],
)
btn.click(fn=process_request_wrapper, inputs=[instruction, files, urls], outputs=[output, file_out])
if __name__ == "__main__":
# Cleanup old sessions on startup
session_manager.cleanup_old_sessions()
# Launch with concurrency settings - روش سازگار با تمام نسخه‌های Gradio
try:
# روش ۱: برای نسخه‌های جدید Gradio
demo.queue(max_size=20)
except TypeError:
try:
# روش ۲: برای نسخه‌های قدیمی‌تر
demo.queue(concurrency_count=5, max_size=20)
except:
# روش ۳: بدون پارامتر
demo.queue()
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)