#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ MoneyPack Security Suite v5.0 Advanced Anti-Malware & RAT Detection Terminal Edition - Zero False Positives - Kill On Sight Author: MoneyPack """ import os import sys import time import json import math import hashlib import shutil import socket import sqlite3 import threading import subprocess import re import platform import datetime from pathlib import Path from collections import defaultdict def install_deps(): required = ["psutil", "rich"] for pkg in required: try: __import__(pkg) except ImportError: print(f"[*] Installing {pkg}...") subprocess.check_call([sys.executable, "-m", "pip", "install", pkg, "-q"]) install_deps() import psutil from rich.console import Console from rich.panel import Panel from rich.table import Table from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn from rich.text import Text from rich.align import Align from rich.rule import Rule from rich.prompt import Prompt, Confirm from rich.style import Style from rich.box import DOUBLE_EDGE, ROUNDED from rich import box console = Console() # ============================================================================ # MONEYPACK BRANDING # ============================================================================ MONEYPACK_LOGO = ( "[bold cyan]\n" " \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557\n" " \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u255d\n" "[/bold cyan][bold bright_green]\n" " \u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2557 \u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2557 \u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2557 \u2588\u2588\u2557\n" " \u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2551\u2588\u2588\u2554\u2550\u2550\u2550\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2551\u2588\u2588\u2554\u2550\u2550\u2550\u2550\u255d\u255a\u2588\u2588\u2557 \u2588\u2588\u2554\u255d\u2588\u2588\u2554\u2550\u2550\u2588\u2588\u2557\u2588\u2588\u2554\u2550\u2550\u2588\u2588\u2557\u2588\u2588\u2554\u2550\u2550\u2550\u2550\u255d\u2588\u2588\u2551 \u2588\u2588\u2554\u255d\n" " \u2588\u2588\u2554\u2588\u2588\u2588\u2588\u2554\u2588\u2588\u2551\u2588\u2588\u2551 \u2588\u2588\u2551\u2588\u2588\u2554\u2588\u2588\u2557 \u2588\u2588\u2551\u2588\u2588\u2588\u2588\u2588\u2557 \u255a\u2588\u2588\u2588\u2588\u2554\u255d \u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2551\u2588\u2588\u2551 \u2588\u2588\u2588\u2588\u2588\u2554\u255d\n" " \u2588\u2588\u2551\u255a\u2588\u2588\u2554\u255d\u2588\u2588\u2551\u2588\u2588\u2551 \u2588\u2588\u2551\u2588\u2588\u2551\u255a\u2588\u2588\u2557\u2588\u2588\u2551\u2588\u2588\u2554\u2550\u2550\u255d \u255a\u2588\u2588\u2554\u255d \u2588\u2588\u2554\u2550\u2550\u2550\u255d \u2588\u2588\u2554\u2550\u2550\u2588\u2588\u2551\u2588\u2588\u2551 \u2588\u2588\u2554\u2550\u2588\u2588\u2557\n" " \u2588\u2588\u2551 \u255a\u2550\u255d \u2588\u2588\u2551\u255a\u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2551 \u255a\u2588\u2588\u2588\u2588\u2551\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551\u255a\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2551 \u2588\u2588\u2557\n" " \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u255d \u255a\u2550\u255d\n" "[/bold bright_green]\n" "[bold bright_cyan] +================================================+[/bold bright_cyan]\n" "[bold bright_cyan] |[/bold bright_cyan][bold bright_white] S E C U R I T Y S U I T E v 5 . 0 [/bold bright_white][bold bright_cyan]|[/bold bright_cyan]\n" "[bold bright_cyan] |[/bold bright_cyan][bold bright_green] Created by MoneyPack [/bold bright_green][bold bright_cyan]|[/bold bright_cyan]\n" "[bold bright_cyan] |[/bold bright_cyan][dim] Anti-Malware - RAT Detection - Zero FP [/dim][bold bright_cyan]|[/bold bright_cyan]\n" "[bold bright_cyan] +================================================+[/bold bright_cyan]\n" ) MINI_LOGO = "[bold bright_green]+ MoneyPack[/bold bright_green] [bold cyan]Security[/bold cyan]" # ============================================================================ # CONFIGURATION # ============================================================================ class Config: APP_NAME = "MoneyPack Security Suite" VERSION = "5.0.0" AUTHOR = "MoneyPack" BASE_DIR = Path.home() / ".moneypack_security" QUARANTINE_DIR = BASE_DIR / "quarantine" LOG_DIR = BASE_DIR / "logs" DB_PATH = BASE_DIR / "moneypack.db" MAX_FILE_SIZE_MB = 50 RAT_PROCESS_NAMES = ["darkcomet","njrat","netbus","subseven","back_orifice","poison_ivy","gh0st","xtreme_rat","blackshades","cybergate","havex","remcos","quasar","asyncrat","nanocore","orcus","luminosity","imminent_monitor","warzone","revenge_rat","limerat","dcrat","xworm","redline","raccoon","vidar","mars_stealer","aurora_stealer","stealc"] SUSPICIOUS_PORTS = [1177,1234,1243,1524,4444,5555,6666,6667,6669,7777,8787,9999,12345,12346,31337,27374,54321,65535] DANGEROUS_EXTENSIONS = {".exe",".scr",".pif",".bat",".cmd",".com",".vbs",".vbe",".js",".jse",".wsh",".wsf",".ps1",".hta",".cpl",".msi"} # SAFE PATHS - Never flag files in these directories SAFE_PATHS = [] @classmethod def init_safe_paths(cls): """Build list of trusted system paths that should NEVER be flagged.""" cls.SAFE_PATHS = [] # Python installation python_dir = os.path.dirname(sys.executable) cls.SAFE_PATHS.append(python_dir.lower()) # Site-packages for p in sys.path: if "site-packages" in p or "lib" in p.lower(): cls.SAFE_PATHS.append(p.lower()) if platform.system() == "Windows": cls.SAFE_PATHS.extend([ os.environ.get("WINDIR", r"C:\Windows").lower(), os.environ.get("PROGRAMFILES", r"C:\Program Files").lower(), os.environ.get("PROGRAMFILES(X86)", r"C:\Program Files (x86)").lower(), os.environ.get("PROGRAMDATA", r"C:\ProgramData").lower(), (Path.home() / "AppData/Local/Programs/Python").as_posix().lower(), (Path.home() / "AppData/Local/Microsoft").as_posix().lower(), (Path.home() / "AppData/Local/Programs").as_posix().lower(), r"c:\program files\dotnet".lower(), r"c:\program files\windowsapps".lower(), ]) elif platform.system() == "Linux": cls.SAFE_PATHS.extend(["/usr/lib","/usr/bin","/usr/share","/usr/local","/lib","/opt"]) elif platform.system() == "Darwin": cls.SAFE_PATHS.extend(["/usr/lib","/usr/bin","/Library","/System","/Applications"]) # Filter empty cls.SAFE_PATHS = [p for p in cls.SAFE_PATHS if p] Config.init_safe_paths() # ============================================================================ # DATABASE # ============================================================================ class Database: def __init__(self): Config.BASE_DIR.mkdir(parents=True, exist_ok=True) Config.QUARANTINE_DIR.mkdir(parents=True, exist_ok=True) Config.LOG_DIR.mkdir(parents=True, exist_ok=True) self.conn = sqlite3.connect(str(Config.DB_PATH), check_same_thread=False) self.conn.row_factory = sqlite3.Row self.lock = threading.Lock() self.conn.executescript("CREATE TABLE IF NOT EXISTS threats(id INTEGER PRIMARY KEY AUTOINCREMENT,timestamp TEXT,threat_type TEXT,severity TEXT,description TEXT,file_path TEXT,process_name TEXT,process_pid INTEGER,action_taken TEXT,hash_sha256 TEXT);CREATE TABLE IF NOT EXISTS quarantine(id INTEGER PRIMARY KEY AUTOINCREMENT,timestamp TEXT,original_path TEXT,quarantine_path TEXT,hash_sha256 TEXT,threat_type TEXT,file_size INTEGER);CREATE TABLE IF NOT EXISTS scans(id INTEGER PRIMARY KEY AUTOINCREMENT,timestamp TEXT,scan_type TEXT,target TEXT,files_scanned INTEGER,threats_found INTEGER,duration REAL);") self.conn.commit() def log_threat(self, **kw): with self.lock: self.conn.execute("INSERT INTO threats(timestamp,threat_type,severity,description,file_path,process_name,process_pid,action_taken,hash_sha256) VALUES(?,?,?,?,?,?,?,?,?)",(datetime.datetime.now().isoformat(),kw.get("threat_type"),kw.get("severity"),kw.get("description"),kw.get("file_path"),kw.get("process_name"),kw.get("process_pid"),kw.get("action_taken"),kw.get("hash_sha256"))) self.conn.commit() def get_threats(self, limit=50): return [dict(r) for r in self.conn.execute("SELECT * FROM threats ORDER BY timestamp DESC LIMIT ?",(limit,)).fetchall()] def get_threat_count(self): return self.conn.execute("SELECT COUNT(*) as c FROM threats").fetchone()["c"] def get_today_count(self): return self.conn.execute("SELECT COUNT(*) as c FROM threats WHERE timestamp >= ?",(datetime.date.today().isoformat(),)).fetchone()["c"] def log_scan(self, scan_type, target, files, threats, duration): with self.lock: self.conn.execute("INSERT INTO scans(timestamp,scan_type,target,files_scanned,threats_found,duration) VALUES(?,?,?,?,?,?)",(datetime.datetime.now().isoformat(),scan_type,target,files,threats,duration)) self.conn.commit() def log_quarantine(self, original, qpath, h, ttype, size): with self.lock: self.conn.execute("INSERT INTO quarantine(timestamp,original_path,quarantine_path,hash_sha256,threat_type,file_size) VALUES(?,?,?,?,?,?)",(datetime.datetime.now().isoformat(),original,qpath,h,ttype,size)) self.conn.commit() def get_quarantined(self): return [dict(r) for r in self.conn.execute("SELECT * FROM quarantine ORDER BY timestamp DESC").fetchall()] # ============================================================================ # SCANNER - ZERO FALSE POSITIVES # ============================================================================ def is_safe_path(filepath): """Check if file is in a trusted system directory.""" fp_lower = str(filepath).lower().replace("\\", "/") for safe in Config.SAFE_PATHS: safe_normalized = safe.replace("\\", "/") if fp_lower.startswith(safe_normalized): return True return False def is_legit_dotnet_dll(filename): """Check if a .dll file is a legitimate .NET/system library.""" name_lower = filename.lower() # .NET naming convention: Namespace.SubNamespace.dll # These are NEVER malware double-extensions legit_patterns = [ ".net.", ".core.", ".runtime.", ".compiler.", ".service.", ".client.", ".server.", ".library.", ".authentication.", ".authorization.", ".extensions.", ".configuration.", ".abstractions.", ".primitives.", ".interop.", ".resources.", ".drawing.", ".data.", ".web.", ".http.", ".security.", ".diagnostics.", ".collections.", ".threading.", ".io.", ".text.", ".linq.", ".xml.", ".json.", ".spatial.", "system.", "microsoft.", "newtonsoft.", "nuget.", ] for pattern in legit_patterns: if pattern in name_lower: return True return False class Scanner: def __init__(self, db): self.db = db self._stop = False def stop(self): self._stop = True def reset(self): self._stop = False def hash_file(self, path): sha256 = hashlib.sha256() try: with open(path, "rb") as f: while chunk := f.read(65536): sha256.update(chunk) return sha256.hexdigest() except (PermissionError, OSError): return None def scan_file(self, filepath): """Scan a file - ONLY flags genuinely suspicious files, never system files.""" result = {"path": str(filepath), "clean": True, "threats": [], "hash": None} try: # RULE 1: Skip files in trusted system directories if is_safe_path(filepath): return result size = os.path.getsize(filepath) if size == 0 or size > Config.MAX_FILE_SIZE_MB * 1024 * 1024: return result filename = os.path.basename(filepath) ext = os.path.splitext(filename)[1].lower() # RULE 2: Only scan dangerous file types # Don't scan .py, .txt, .dll in system folders, .json, etc. if ext not in Config.DANGEROUS_EXTENSIONS: # Exception: scan unknown files in user directories (Downloads, Desktop, Temp) user_dirs = [ str(Path.home() / "Downloads").lower(), str(Path.home() / "Desktop").lower(), str(Path.home() / "Documents").lower(), ] if platform.system() == "Windows": user_dirs.append(str(Path.home() / "AppData/Local/Temp").lower()) else: user_dirs.extend(["/tmp", "/dev/shm"]) in_user_dir = any(str(filepath).lower().startswith(d) for d in user_dirs) if not in_user_dir: return result # RULE 3: Double extension - only flag if NOT a known .NET/system naming pattern parts = filename.split(".") if len(parts) >= 3: real_ext = "." + parts[-1].lower() if real_ext in Config.DANGEROUS_EXTENSIONS: # Check if it's legit .NET naming if not is_legit_dotnet_dll(filename): # This IS suspicious - something.pdf.exe type trick result["clean"] = False result["threats"].append({ "type": "double_extension", "severity": "CRITICAL", "desc": f"DISGUISED FILE: appears as .{parts[-2]} but is actually {real_ext}" }) # RULE 4: For executables, do deep content scan if ext in Config.DANGEROUS_EXTENSIONS: fhash = self.hash_file(filepath) result["hash"] = fhash with open(filepath, "rb") as f: content = f.read(min(size, 2 * 1024 * 1024)) # Only flag COMBINATIONS of suspicious indicators (not single patterns) # A single pattern like "cmd.exe /c" is normal in many tools # We need MULTIPLE red flags together red_flags = 0 descriptions = [] # Encoded PowerShell (almost always malicious) if b"powershell" in content.lower() and b"-enc" in content.lower(): red_flags += 3 descriptions.append("Encoded PowerShell execution") if b"powershell" in content.lower() and b"hidden" in content.lower() and b"bypass" in content.lower(): red_flags += 3 descriptions.append("Hidden PowerShell with bypass") # Download and execute pattern (very suspicious) if (b"DownloadString" in content or b"DownloadFile" in content) and b"Invoke" in content: red_flags += 3 descriptions.append("Downloads and executes remote code") if b"IEX(New-Object Net.WebClient)" in content: red_flags += 4 descriptions.append("PowerShell download cradle") # Registry persistence + network = RAT if b"CurrentVersion\\Run" in content and (b"socket" in content.lower() or b"connect" in content.lower()): red_flags += 3 descriptions.append("Registry persistence with network capability") # Keylogging indicators if b"GetAsyncKeyState" in content and b"SetWindowsHookEx" in content: red_flags += 4 descriptions.append("Keylogger APIs detected") # Process injection if b"VirtualAllocEx" in content and b"WriteProcessMemory" in content and b"CreateRemoteThread" in content: red_flags += 4 descriptions.append("Process injection technique") # Screen/webcam capture + network if (b"screenCapture" in content or b"webcamCapture" in content) and b"send" in content.lower(): red_flags += 3 descriptions.append("Screen/webcam capture with exfiltration") # Ransomware indicators if b"encrypt" in content.lower() and b"bitcoin" in content.lower(): red_flags += 4 descriptions.append("Ransomware indicators (encryption + bitcoin)") if b"vssadmin" in content.lower() and b"delete" in content.lower() and b"shadows" in content.lower(): red_flags += 4 descriptions.append("Deletes shadow copies (ransomware)") # Cryptominer if b"stratum+tcp" in content.lower() or b"xmrig" in content.lower() or b"monero" in content.lower(): red_flags += 4 descriptions.append("Cryptocurrency miner detected") # PE packed with high entropy (only flag if ALSO has other indicators) if content[:2] == b"MZ" and red_flags >= 1: freq = defaultdict(int) for b in content[:4096]: freq[b] += 1 entropy = -sum((c/4096)*math.log2(c/4096) for c in freq.values() if c > 0) if entropy > 7.8: red_flags += 2 descriptions.append(f"Packed/encrypted binary (entropy: {entropy:.1f})") # VERDICT: Need 3+ red flags to confirm as malware if red_flags >= 3: result["clean"] = False result["threats"].append({ "type": "malware", "severity": "CRITICAL", "desc": " | ".join(descriptions) }) except (PermissionError, OSError): pass return result def scan_directory(self, directory, progress=None): self._stop = False results = {"threats": [], "scanned": 0, "threat_count": 0} start = time.time() files = [] for root, dirs, fnames in os.walk(directory): if self._stop: break dirs[:] = [d for d in dirs if d not in {".git","node_modules","__pycache__",".venv","venv",".moneypack_security"}] for f in fnames: files.append(os.path.join(root, f)) total = len(files) for i, fp in enumerate(files): if self._stop: break r = self.scan_file(fp) results["scanned"] += 1 if not r["clean"]: results["threats"].append(r) results["threat_count"] += 1 for t in r["threats"]: self.db.log_threat(threat_type=t["type"],severity=t["severity"],description=t["desc"],file_path=r["path"],hash_sha256=r["hash"]) if progress: progress(i+1, total, results["threat_count"]) duration = time.time() - start self.db.log_scan("directory", str(directory), results["scanned"], results["threat_count"], duration) results["duration"] = round(duration, 2) results["total"] = total return results # ============================================================================ # PROCESS MONITOR # ============================================================================ class ProcessMonitor: def __init__(self, db): self.db = db def get_suspicious(self): suspicious = [] for proc in psutil.process_iter(['pid','name','exe','cmdline','username','memory_info']): try: info = proc.info alerts = [] severity = "LOW" name = (info.get('name') or '').lower() exe = (info.get('exe') or '').lower() cmd = ' '.join(info.get('cmdline') or []).lower() for rat in Config.RAT_PROCESS_NAMES: if rat in name or rat in exe: alerts.append(f"Known RAT: {rat}") severity = "CRITICAL" patterns = [(r"powershell.*-enc","Encoded PowerShell"),(r"powershell.*hidden.*bypass","Hidden PowerShell bypass"),(r"certutil.*-decode","Certutil decode"),(r"mshta.*http","MSHTA remote execution"),(r"vssadmin.*delete.*shadows","Shadow copy deletion")] for pat, desc in patterns: if re.search(pat, cmd): alerts.append(desc) severity = "CRITICAL" # Only flag temp-dir processes if they also have other indicators if alerts and info.get('exe'): temp = ["\\temp\\","\\appdata\\local\\temp\\","/tmp/","/dev/shm/"] for t in temp: if t in exe: alerts.append("Running from temp directory") break try: for conn in proc.net_connections(): if conn.status == 'ESTABLISHED' and conn.raddr: if conn.raddr.port in Config.SUSPICIOUS_PORTS: alerts.append(f"Connected to RAT port: {conn.raddr.ip}:{conn.raddr.port}") severity = "CRITICAL" except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError): pass if alerts: mem = info.get('memory_info') suspicious.append({"pid":info['pid'],"name":info.get('name','?'),"exe":info.get('exe') or 'N/A',"user":info.get('username','N/A'),"alerts":alerts,"severity":severity,"mem_mb":round(mem.rss/1048576,1) if mem else 0}) except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): continue return suspicious def kill(self, pid): try: p = psutil.Process(pid) n = p.name() p.terminate() p.wait(timeout=5) self.db.log_threat(threat_type="process_killed",severity="HIGH",description=f"Killed: {n} (PID {pid})",process_name=n,process_pid=pid,action_taken="terminated") return True, f"Terminated {n} (PID {pid})" except psutil.NoSuchProcess: return False, "Process already gone" except psutil.AccessDenied: try: psutil.Process(pid).kill() return True, f"Force-killed PID {pid}" except Exception as e: return False, f"Access denied: {e}" except Exception as e: return False, str(e) # ============================================================================ # NETWORK MONITOR # ============================================================================ class NetworkMonitor: def __init__(self, db): self.db = db def analyze(self): suspicious = [] for c in psutil.net_connections(kind='inet'): if not c.raddr: continue try: proc = psutil.Process(c.pid) if c.pid else None name = proc.name() if proc else "System" except (psutil.NoSuchProcess, psutil.AccessDenied): name = "Unknown" alerts = [] if c.raddr.port in Config.SUSPICIOUS_PORTS: alerts.append(f"Known RAT port: {c.raddr.port}") if name.lower() in ("bash","sh","cmd.exe","powershell.exe") and c.status == "ESTABLISHED": alerts.append("POSSIBLE REVERSE SHELL") if alerts: suspicious.append({"pid":c.pid or 0,"process":name,"remote":f"{c.raddr.ip}:{c.raddr.port}","alerts":alerts,"severity":"CRITICAL"}) return suspicious # ============================================================================ # QUARANTINE # ============================================================================ class Quarantine: def __init__(self, db): self.db = db def quarantine_file(self, filepath): fp = Path(filepath) if not fp.exists(): return False, "File not found" try: h = hashlib.sha256(fp.read_bytes()).hexdigest() size = fp.stat().st_size ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") qname = f"{ts}_{h[:12]}_{fp.name}.quarantined" qpath = Config.QUARANTINE_DIR / qname shutil.move(str(fp), str(qpath)) os.chmod(str(qpath), 0o000) meta = {"original":str(fp),"hash":h,"time":datetime.datetime.now().isoformat(),"size":size} (qpath.with_suffix(".quarantined.json")).write_text(json.dumps(meta,indent=2)) self.db.log_quarantine(str(fp),str(qpath),h,"malware",size) return True, f"Quarantined: {fp.name}" except Exception as e: return False, str(e) def delete_file(self, filepath): """Permanently delete a malicious file.""" fp = Path(filepath) if not fp.exists(): return False, "File not found" try: os.chmod(str(fp), 0o777) fp.unlink() return True, f"DELETED: {fp.name}" except Exception as e: return False, str(e) def list_all(self): items = [] for f in Config.QUARANTINE_DIR.iterdir(): if f.suffix == ".quarantined": meta_path = f.with_suffix(".quarantined.json") meta = {} if meta_path.exists(): try: meta = json.loads(meta_path.read_text()) except Exception: pass items.append({"name":f.name,"original":meta.get("original","?"),"time":meta.get("time","?")[:19],"size":meta.get("size",0)}) return items # ============================================================================ # MAIN APP # ============================================================================ class MoneyPackApp: def __init__(self): self.db = Database() self.scanner = Scanner(self.db) self.proc_mon = ProcessMonitor(self.db) self.net_mon = NetworkMonitor(self.db) self.quarantine = Quarantine(self.db) def clear(self): os.system('cls' if os.name == 'nt' else 'clear') def show_banner(self): self.clear() console.print(MONEYPACK_LOGO) def show_status_bar(self): threats = self.db.get_today_count() total = self.db.get_threat_count() status = "[bold bright_green]* PROTECTED[/bold bright_green]" if threats == 0 else "[bold red]* THREATS ACTIVE[/bold red]" bar = Table(box=box.SIMPLE_HEAVY,show_header=False,style="bright_cyan",border_style="bright_cyan",pad_edge=False) bar.add_column(ratio=1) bar.add_column(ratio=1) bar.add_column(ratio=1) bar.add_row(f" {status}",f"[bright_cyan]Threats Today:[/bright_cyan] [bold white]{threats}[/bold white]",f"[bright_cyan]Total Killed:[/bright_cyan] [bold white]{total}[/bold white]") console.print(bar) console.print() def main_menu(self): menu = Table(title=f"\n{MINI_LOGO} [dim]|[/dim] [bold bright_cyan]Command Center[/bold bright_cyan]",box=DOUBLE_EDGE,border_style="bright_cyan",title_style="bold",show_header=True,header_style="bold bright_green") menu.add_column("Key",style="bold bright_green",width=6,justify="center") menu.add_column("Action",style="bold white",width=28) menu.add_column("Description",style="dim",width=45) for k,a,d in [("1","Quick Scan","Scan Downloads, Desktop, Temp"),("2","Full Scan","Deep scan any directory"),("3","Process Hunter","Find & kill RATs/malware processes"),("4","Network Guard","Detect C2/reverse shells"),("5","Threat Log","View detected threats"),("6","Quarantine","View isolated files"),("7","Real-Time Guard","Continuous monitoring (Ctrl+C to stop)"),("8","System Status","Full security overview"),("0","Exit","Shutdown MoneyPack")]: menu.add_row(k,a,d) console.print(menu) console.print() def run(self): while True: self.show_banner() self.show_status_bar() self.main_menu() choice = Prompt.ask("[bold bright_green]MoneyPack[/bold bright_green] [bold cyan]>[/bold cyan]",choices=["0","1","2","3","4","5","6","7","8"],default="1") if choice == "0": self.clear() console.print(Panel("[bold bright_green]MoneyPack Security Suite[/bold bright_green]\n\n[bright_cyan]Stay safe. - MoneyPack[/bright_cyan]",border_style="bright_green",box=DOUBLE_EDGE)) break elif choice == "1": self._quick_scan() elif choice == "2": self._full_scan() elif choice == "3": self._process_hunter() elif choice == "4": self._network_guard() elif choice == "5": self._threat_log() elif choice == "6": self._quarantine_view() elif choice == "7": self._realtime_guard() elif choice == "8": self._system_status() if choice != "0": console.print() Prompt.ask("[dim]Press Enter to continue[/dim]") def _handle_threats(self, threats): """Show threats and offer to KILL/QUARANTINE them.""" if not threats: return console.print(f"\n[bold red]{'='*60}[/bold red]") console.print(f"[bold red] {len(threats)} CONFIRMED THREAT(S) FOUND[/bold red]") console.print(f"[bold red]{'='*60}[/bold red]\n") table = Table(box=ROUNDED,border_style="red") table.add_column("#",style="bold white",width=4) table.add_column("File",style="bold white") table.add_column("Threat",style="bold red") table.add_column("Path",style="dim") for i, t in enumerate(threats, 1): desc = t["threats"][0]["desc"] if t["threats"] else "Unknown" table.add_row(str(i), os.path.basename(t["path"]), desc[:40], t["path"][:50]) console.print(table) console.print("\n[bold bright_cyan]What do you want to do?[/bold bright_cyan]") console.print("[bold red] [K] KILL ALL[/bold red] - Delete every threat permanently") console.print("[bold yellow] [Q] QUARANTINE ALL[/bold yellow] - Isolate (can restore later)") console.print("[bold white] [S] SELECT[/bold white] - Choose one by number") console.print("[dim] [N] Nothing - Leave them[/dim]") action = Prompt.ask("\n[bold bright_green]MoneyPack[/bold bright_green] [bold cyan]action[/bold cyan]",choices=["k","q","s","n"],default="q") if action == "k": for t in threats: ok, msg = self.quarantine.delete_file(t["path"]) if ok: console.print(f" [bold red]KILLED:[/bold red] {os.path.basename(t['path'])}") self.db.log_threat(threat_type="killed",severity="CRITICAL",description=f"Deleted: {t['path']}",file_path=t["path"],action_taken="deleted") else: console.print(f" [bold yellow]FAILED:[/bold yellow] {msg}") console.print(f"\n[bold bright_green]All threats eliminated.[/bold bright_green]") elif action == "q": for t in threats: ok, msg = self.quarantine.quarantine_file(t["path"]) if ok: console.print(f" [bold yellow]QUARANTINED:[/bold yellow] {os.path.basename(t['path'])}") else: console.print(f" [bold red]FAILED:[/bold red] {msg}") console.print(f"\n[bold bright_green]Threats isolated in quarantine vault.[/bold bright_green]") elif action == "s": idx = int(Prompt.ask("[bright_cyan]Enter threat number[/bright_cyan]")) - 1 if 0 <= idx < len(threats): sub = Prompt.ask("[bright_cyan][K]ill or [Q]uarantine?[/bright_cyan]",choices=["k","q"],default="k") t = threats[idx] if sub == "k": ok, msg = self.quarantine.delete_file(t["path"]) console.print(f" [bold red]{'KILLED' if ok else 'FAILED'}:[/bold red] {msg}") else: ok, msg = self.quarantine.quarantine_file(t["path"]) console.print(f" [bold yellow]{'QUARANTINED' if ok else 'FAILED'}:[/bold yellow] {msg}") def _quick_scan(self): self.clear() console.print(Panel("[bold bright_green]Quick Scan[/bold bright_green]\n[dim]Scanning user directories only - zero false positives[/dim]",border_style="bright_green",box=DOUBLE_EDGE)) paths = [Path.home()/"Downloads",Path.home()/"Desktop",Path.home()/"Documents"] if platform.system() == "Windows": paths.append(Path.home()/"AppData/Local/Temp") elif platform.system() == "Linux": paths.extend([Path("/tmp"),Path("/dev/shm")]) all_threats = [] with Progress(SpinnerColumn("dots",style="bright_green"),TextColumn("[bold bright_cyan]{task.description}[/bold bright_cyan]"),BarColumn(bar_width=40,complete_style="bright_green"),TextColumn("[bright_green]{task.percentage:>3.0f}%[/bright_green]"),TimeElapsedColumn(),console=console) as progress: for path in paths: if path.exists(): task = progress.add_task(f"Scanning {path.name}...",total=100) def cb(done,total,threats): if total > 0: progress.update(task,completed=int(done/total*100)) result = self.scanner.scan_directory(str(path),progress=cb) progress.update(task,completed=100) all_threats.extend(result["threats"]) console.print() if not all_threats: console.print(Panel("[bold bright_green]SYSTEM CLEAN[/bold bright_green]\n[bright_green]No threats found. You're safe.[/bright_green]",border_style="bright_green",box=DOUBLE_EDGE)) else: self._handle_threats(all_threats) def _full_scan(self): self.clear() console.print(Panel("[bold bright_cyan]Full Scan[/bold bright_cyan]",border_style="bright_cyan",box=DOUBLE_EDGE)) target = Prompt.ask("[bright_cyan]Path to scan[/bright_cyan]",default=str(Path.home())) if not os.path.exists(target): console.print("[bold red]Path not found.[/bold red]") return console.print(f"[bright_cyan]Scanning:[/bright_cyan] {target}") console.print(Rule(style="bright_cyan")) with Progress(SpinnerColumn("dots12",style="bright_cyan"),TextColumn("[bold bright_cyan]{task.description}[/bold bright_cyan]"),BarColumn(bar_width=50,complete_style="bright_green"),TextColumn("[bright_green]{task.percentage:>3.0f}%[/bright_green]"),TextColumn("[dim]{task.fields[info]}[/dim]"),TimeElapsedColumn(),console=console) as progress: task = progress.add_task("Scanning...",total=100,info="") def cb(done,total,threats): if total > 0: progress.update(task,completed=int(done/total*100),info=f"{done}/{total} | {threats} threats") self.scanner.reset() result = self.scanner.scan_directory(target,progress=cb) progress.update(task,completed=100,info="Done") console.print() console.print(f"[bright_cyan]Scanned:[/bright_cyan] {result['scanned']} files in {result['duration']}s") if not result["threats"]: console.print(Panel("[bold bright_green]ALL CLEAN - No threats found[/bold bright_green]",border_style="bright_green",box=DOUBLE_EDGE)) else: self._handle_threats(result["threats"]) def _process_hunter(self): self.clear() console.print(Panel("[bold bright_cyan]Process Hunter[/bold bright_cyan]\n[dim]Finding RATs and malware processes...[/dim]",border_style="bright_cyan",box=DOUBLE_EDGE)) with console.status("[bright_cyan]Hunting...[/bright_cyan]",spinner="dots12"): suspicious = self.proc_mon.get_suspicious() time.sleep(1) if not suspicious: console.print(Panel("[bold bright_green]NO MALICIOUS PROCESSES[/bold bright_green]\n[bright_green]All processes are clean.[/bright_green]",border_style="bright_green",box=DOUBLE_EDGE)) return console.print(f"\n[bold red]{len(suspicious)} THREAT(S) FOUND:[/bold red]\n") for proc in suspicious: color = "red" if proc["severity"]=="CRITICAL" else "yellow" content = f"[white]PID:[/white] {proc['pid']} [white]User:[/white] {proc['user']} [white]Mem:[/white] {proc['mem_mb']}MB\n[white]Path:[/white] {proc['exe']}\n" for a in proc["alerts"]: content += f"\n [bold {color}]! {a}[/bold {color}]" console.print(Panel(content,title=f"[bold {color}][{proc['severity']}] {proc['name']}[/bold {color}]",border_style=color,box=ROUNDED)) console.print() if Confirm.ask("[bold red]KILL a process?[/bold red]"): pid_str = Prompt.ask("[bright_cyan]Enter PID to kill[/bright_cyan]") try: ok, msg = self.proc_mon.kill(int(pid_str)) console.print(f"[bold {'bright_green' if ok else 'red'}]{msg}[/bold {'bright_green' if ok else 'red'}]") except ValueError: console.print("[red]Invalid PID[/red]") def _network_guard(self): self.clear() console.print(Panel("[bold bright_cyan]Network Guard[/bold bright_cyan]\n[dim]Checking for C2 callbacks & reverse shells...[/dim]",border_style="bright_cyan",box=DOUBLE_EDGE)) with console.status("[bright_cyan]Analyzing...[/bright_cyan]",spinner="dots12"): suspicious = self.net_mon.analyze() time.sleep(0.5) if not suspicious: console.print(Panel("[bold bright_green]NETWORK CLEAN[/bold bright_green]\n[bright_green]No suspicious connections.[/bright_green]",border_style="bright_green",box=DOUBLE_EDGE)) else: for c in suspicious: console.print(Panel(f"[white]Process:[/white] {c['process']} (PID {c['pid']})\n[white]Remote:[/white] {c['remote']}\n[bold red]{' | '.join(c['alerts'])}[/bold red]",title="[bold red]CRITICAL[/bold red]",border_style="red",box=ROUNDED)) if Confirm.ask("\n[bold red]Kill the process?[/bold red]"): pid_str = Prompt.ask("[bright_cyan]PID[/bright_cyan]") try: ok, msg = self.proc_mon.kill(int(pid_str)) console.print(f"[bold {'bright_green' if ok else 'red'}]{msg}[/bold {'bright_green' if ok else 'red'}]") except ValueError: console.print("[red]Invalid[/red]") def _threat_log(self): self.clear() console.print(Panel("[bold bright_cyan]Threat Log[/bold bright_cyan]",border_style="bright_cyan",box=DOUBLE_EDGE)) threats = self.db.get_threats(30) if not threats: console.print("[dim]No threats recorded.[/dim]") return table = Table(box=ROUNDED,border_style="bright_cyan") table.add_column("Time",style="dim",width=19) table.add_column("Severity",width=10) table.add_column("Type",style="bright_cyan") table.add_column("Description",style="white") table.add_column("Action",style="bold bright_green") for t in threats: sev = t.get("severity","?") color = "red" if sev=="CRITICAL" else "yellow" if sev=="HIGH" else "white" table.add_row(t.get("timestamp","")[:19],f"[{color}]{sev}[/{color}]",t.get("threat_type",""),str(t.get("description",""))[:40],t.get("action_taken","detected")) console.print(table) def _quarantine_view(self): self.clear() console.print(Panel("[bold bright_cyan]Quarantine Vault[/bold bright_cyan]",border_style="bright_cyan",box=DOUBLE_EDGE)) items = self.quarantine.list_all() if not items: console.print("[dim]Empty - no quarantined files.[/dim]") return table = Table(box=ROUNDED,border_style="bright_cyan") table.add_column("#",width=4) table.add_column("File",style="white") table.add_column("Date",style="dim") for i, item in enumerate(items, 1): table.add_row(str(i),os.path.basename(item["original"]),item["time"]) console.print(table) def _realtime_guard(self): self.clear() console.print(Panel("[bold bright_green]Real-Time Guard ACTIVE[/bold bright_green]\n[dim]Ctrl+C to stop[/dim]",border_style="bright_green",box=DOUBLE_EDGE)) try: while True: ts = datetime.datetime.now().strftime("%H:%M:%S") sp = self.proc_mon.get_suspicious() for p in sp: console.print(f"[bold red][{ts}] THREAT: {p['name']} PID:{p['pid']} - {p['alerts'][0]}[/bold red]") self.db.log_threat(threat_type="realtime",severity="CRITICAL",description=p["alerts"][0],process_name=p["name"],process_pid=p["pid"]) sn = self.net_mon.analyze() for c in sn: console.print(f"[bold red][{ts}] NETWORK: {c['process']} -> {c['remote']}[/bold red]") if not sp and not sn: console.print(f"[dim][{ts}] Clear[/dim]") time.sleep(5) except KeyboardInterrupt: console.print("\n[bold bright_green]Guard stopped.[/bold bright_green]") def _system_status(self): self.clear() console.print(Panel("[bold bright_cyan]System Status[/bold bright_cyan]",border_style="bright_cyan",box=DOUBLE_EDGE)) cpu = psutil.cpu_percent(interval=1) mem = psutil.virtual_memory() st = Table(box=ROUNDED,border_style="bright_cyan") st.add_column("",style="bright_cyan",width=20) st.add_column("",style="bold white") st.add_row("OS",f"{platform.system()} {platform.release()}") st.add_row("CPU",f"{'[red]' if cpu>80 else '[bright_green]'}{cpu}%") st.add_row("RAM",f"{mem.percent}% ({mem.used//1024//1024//1024}/{mem.total//1024//1024//1024} GB)") st.add_row("Processes",str(len(psutil.pids()))) st.add_row("Threats Today",f"[bold red]{self.db.get_today_count()}[/bold red]" if self.db.get_today_count() else "[bright_green]0[/bright_green]") console.print(st) console.print() sp = self.proc_mon.get_suspicious() sn = self.net_mon.analyze() console.print(f" Process Security: {'[bold red]THREAT' if sp else '[bold bright_green]CLEAN'}[/bold {'red' if sp else 'bright_green'}]") console.print(f" Network Security: {'[bold red]THREAT' if sn else '[bold bright_green]CLEAN'}[/bold {'red' if sn else 'bright_green'}]") console.print() console.print(Align.center(Text(f"MoneyPack Security v{Config.VERSION}",style="bold bright_green"))) if __name__ == "__main__": try: app = MoneyPackApp() app.run() except KeyboardInterrupt: console.print("\n[bold bright_green]MoneyPack - Shutdown[/bold bright_green]") except Exception as e: console.print(f"[bold red]Error: {e}[/bold red]") sys.exit(1)