MoneyPack-Security-Suite / moneypack_security.py
MoneyPack's picture
v5.0 - Zero false positives + Kill/Quarantine threats
d2fc87a verified
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MoneyPack Security Suite v5.0
Advanced Anti-Malware & RAT Detection
Terminal Edition - Zero False Positives - Kill On Sight
Author: MoneyPack
"""
import os
import sys
import time
import json
import math
import hashlib
import shutil
import socket
import sqlite3
import threading
import subprocess
import re
import platform
import datetime
from pathlib import Path
from collections import defaultdict
def install_deps():
    """Make sure the third-party packages used by this tool can be imported.

    Each package is imported by name; when that raises ImportError the
    package is installed quietly through pip, using the interpreter that is
    running this script. A failing pip run propagates
    subprocess.CalledProcessError to the caller.
    """
    for package in ("psutil", "rich"):
        try:
            __import__(package)
        except ImportError:
            print(f"[*] Installing {package}...")
            pip_command = [sys.executable, "-m", "pip", "install", package, "-q"]
            subprocess.check_call(pip_command)
# Runs at import time, before the psutil/rich imports that follow.
install_deps()
import psutil
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn
from rich.text import Text
from rich.align import Align
from rich.rule import Rule
from rich.prompt import Prompt, Confirm
from rich.style import Style
from rich.box import DOUBLE_EDGE, ROUNDED
from rich import box
# Module-wide rich console; the UI code in this file prints through it.
console = Console()
# ============================================================================
# MONEYPACK BRANDING
# ============================================================================
MONEYPACK_LOGO = (
"[bold cyan]\n"
" \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557\n"
" \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u255d\n"
"[/bold cyan][bold bright_green]\n"
" \u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2557 \u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2557 \u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2557 \u2588\u2588\u2557\n"
" \u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2588\u2588\u2551\u2588\u2588\u2554\u2550\u2550\u2550\u2588\u2588\u2557\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2551\u2588\u2588\u2554\u2550\u2550\u2550\u2550\u255d\u255a\u2588\u2588\u2557 \u2588\u2588\u2554\u255d\u2588\u2588\u2554\u2550\u2550\u2588\u2588\u2557\u2588\u2588\u2554\u2550\u2550\u2588\u2588\u2557\u2588\u2588\u2554\u2550\u2550\u2550\u2550\u255d\u2588\u2588\u2551 \u2588\u2588\u2554\u255d\n"
" \u2588\u2588\u2554\u2588\u2588\u2588\u2588\u2554\u2588\u2588\u2551\u2588\u2588\u2551 \u2588\u2588\u2551\u2588\u2588\u2554\u2588\u2588\u2557 \u2588\u2588\u2551\u2588\u2588\u2588\u2588\u2588\u2557 \u255a\u2588\u2588\u2588\u2588\u2554\u255d \u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2551\u2588\u2588\u2551 \u2588\u2588\u2588\u2588\u2588\u2554\u255d\n"
" \u2588\u2588\u2551\u255a\u2588\u2588\u2554\u255d\u2588\u2588\u2551\u2588\u2588\u2551 \u2588\u2588\u2551\u2588\u2588\u2551\u255a\u2588\u2588\u2557\u2588\u2588\u2551\u2588\u2588\u2554\u2550\u2550\u255d \u255a\u2588\u2588\u2554\u255d \u2588\u2588\u2554\u2550\u2550\u2550\u255d \u2588\u2588\u2554\u2550\u2550\u2588\u2588\u2551\u2588\u2588\u2551 \u2588\u2588\u2554\u2550\u2588\u2588\u2557\n"
" \u2588\u2588\u2551 \u255a\u2550\u255d \u2588\u2588\u2551\u255a\u2588\u2588\u2588\u2588\u2588\u2588\u2554\u255d\u2588\u2588\u2551 \u255a\u2588\u2588\u2588\u2588\u2551\u2588\u2588\u2588\u2588\u2588\u2588\u2588\u2557 \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551 \u2588\u2588\u2551\u255a\u2588\u2588\u2588\u2588\u2588\u2588\u2557\u2588\u2588\u2551 \u2588\u2588\u2557\n"
" \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u2550\u2550\u255d\u255a\u2550\u2550\u2550\u2550\u2550\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u255d \u255a\u2550\u2550\u2550\u2550\u2550\u255d\u255a\u2550\u255d \u255a\u2550\u255d\n"
"[/bold bright_green]\n"
"[bold bright_cyan] +================================================+[/bold bright_cyan]\n"
"[bold bright_cyan] |[/bold bright_cyan][bold bright_white] S E C U R I T Y S U I T E v 5 . 0 [/bold bright_white][bold bright_cyan]|[/bold bright_cyan]\n"
"[bold bright_cyan] |[/bold bright_cyan][bold bright_green] Created by MoneyPack [/bold bright_green][bold bright_cyan]|[/bold bright_cyan]\n"
"[bold bright_cyan] |[/bold bright_cyan][dim] Anti-Malware - RAT Detection - Zero FP [/dim][bold bright_cyan]|[/bold bright_cyan]\n"
"[bold bright_cyan] +================================================+[/bold bright_cyan]\n"
)
MINI_LOGO = "[bold bright_green]+ MoneyPack[/bold bright_green] [bold cyan]Security[/bold cyan]"
# ============================================================================
# CONFIGURATION
# ============================================================================
class Config:
    """Static settings, detection lists and the trusted-path registry."""

    APP_NAME = "MoneyPack Security Suite"
    VERSION = "5.0.0"
    AUTHOR = "MoneyPack"

    # Everything the tool persists lives under the user's home directory.
    BASE_DIR = Path.home() / ".moneypack_security"
    QUARANTINE_DIR = BASE_DIR / "quarantine"
    LOG_DIR = BASE_DIR / "logs"
    DB_PATH = BASE_DIR / "moneypack.db"

    # Files larger than this are not scanned.
    MAX_FILE_SIZE_MB = 50

    # Substrings looked for in process names and executable paths.
    RAT_PROCESS_NAMES = [
        "darkcomet", "njrat", "netbus", "subseven", "back_orifice",
        "poison_ivy", "gh0st", "xtreme_rat", "blackshades", "cybergate",
        "havex", "remcos", "quasar", "asyncrat", "nanocore", "orcus",
        "luminosity", "imminent_monitor", "warzone", "revenge_rat",
        "limerat", "dcrat", "xworm", "redline", "raccoon", "vidar",
        "mars_stealer", "aurora_stealer", "stealc",
    ]

    # Remote ports that are reported when a connection targets them.
    SUSPICIOUS_PORTS = [
        1177, 1234, 1243, 1524, 4444, 5555, 6666, 6667, 6669, 7777, 8787,
        9999, 12345, 12346, 31337, 27374, 54321, 65535,
    ]

    # Extensions that get the deep content scan.
    DANGEROUS_EXTENSIONS = {
        ".exe", ".scr", ".pif", ".bat", ".cmd", ".com", ".vbs", ".vbe",
        ".js", ".jse", ".wsh", ".wsf", ".ps1", ".hta", ".cpl", ".msi",
    }

    # SAFE PATHS - Never flag files in these directories
    SAFE_PATHS = []

    @classmethod
    def init_safe_paths(cls, system=None):
        """Build list of trusted system paths that should NEVER be flagged.

        Args:
            system: Platform name as returned by ``platform.system()``
                ("Windows", "Linux" or "Darwin"). Defaults to the running
                platform.

        The result is stored in ``cls.SAFE_PATHS``. Every entry is
        lower-cased, because the lookup lower-cases the candidate path
        before comparing; a mixed-case entry such as "/Library" could
        otherwise never match. Empty and duplicate entries are dropped.
        """
        if system is None:
            system = platform.system()

        # Directory of the running interpreter.
        trusted = [os.path.dirname(sys.executable)]

        # Interpreter library directories. Matching whole path components
        # (rather than the substring "lib") keeps an unrelated sys.path
        # entry such as ".../calibration" from becoming trusted.
        lib_components = {"lib", "lib64", "site-packages", "dist-packages"}
        for entry in sys.path:
            if not isinstance(entry, str):
                continue
            components = entry.lower().replace("\\", "/").split("/")
            if lib_components.intersection(components):
                trusted.append(entry)

        if system == "Windows":
            home = Path.home()
            trusted.extend([
                os.environ.get("WINDIR", r"C:\Windows"),
                os.environ.get("PROGRAMFILES", r"C:\Program Files"),
                os.environ.get("PROGRAMFILES(X86)", r"C:\Program Files (x86)"),
                os.environ.get("PROGRAMDATA", r"C:\ProgramData"),
                (home / "AppData/Local/Programs/Python").as_posix(),
                (home / "AppData/Local/Microsoft").as_posix(),
                (home / "AppData/Local/Programs").as_posix(),
                r"c:\program files\dotnet",
                r"c:\program files\windowsapps",
            ])
        elif system == "Linux":
            trusted.extend(["/usr/lib", "/usr/bin", "/usr/share", "/usr/local", "/lib", "/opt"])
        elif system == "Darwin":
            trusted.extend(["/usr/lib", "/usr/bin", "/Library", "/System", "/Applications"])

        # dict.fromkeys keeps first-seen order while removing duplicates.
        cls.SAFE_PATHS = list(dict.fromkeys(p.lower() for p in trusted if p))
# Fill Config.SAFE_PATHS once, at import time.
Config.init_safe_paths()
# ============================================================================
# DATABASE
# ============================================================================
class Database:
    """SQLite store for detected threats, quarantine records and scan runs.

    One connection is shared between threads (``check_same_thread=False``),
    so every statement, reads included, runs while holding ``self.lock``.
    """

    # Schema for the three tables; created only when missing.
    _SCHEMA = (
        "CREATE TABLE IF NOT EXISTS threats(id INTEGER PRIMARY KEY AUTOINCREMENT,"
        "timestamp TEXT,threat_type TEXT,severity TEXT,description TEXT,file_path TEXT,"
        "process_name TEXT,process_pid INTEGER,action_taken TEXT,hash_sha256 TEXT);"
        "CREATE TABLE IF NOT EXISTS quarantine(id INTEGER PRIMARY KEY AUTOINCREMENT,"
        "timestamp TEXT,original_path TEXT,quarantine_path TEXT,hash_sha256 TEXT,"
        "threat_type TEXT,file_size INTEGER);"
        "CREATE TABLE IF NOT EXISTS scans(id INTEGER PRIMARY KEY AUTOINCREMENT,"
        "timestamp TEXT,scan_type TEXT,target TEXT,files_scanned INTEGER,"
        "threats_found INTEGER,duration REAL);"
    )

    def __init__(self, db_path=None):
        """Open (and if needed create) the database.

        Args:
            db_path: Optional database location, e.g. ``":memory:"``. When
                omitted, the application directories from ``Config`` are
                created and ``Config.DB_PATH`` is used. When given, no
                directories are created.
        """
        if db_path is None:
            for folder in (Config.BASE_DIR, Config.QUARANTINE_DIR, Config.LOG_DIR):
                folder.mkdir(parents=True, exist_ok=True)
            db_path = Config.DB_PATH
        self.conn = sqlite3.connect(str(db_path), check_same_thread=False)
        self.conn.row_factory = sqlite3.Row
        self.lock = threading.Lock()
        self.conn.executescript(self._SCHEMA)
        self.conn.commit()

    def _write(self, sql, params):
        """Run one modifying statement; commit on success, roll back on error."""
        with self.lock:
            # The connection's context manager commits or rolls back.
            with self.conn:
                self.conn.execute(sql, params)

    def _read(self, sql, params=()):
        """Run one query under the lock and return all rows."""
        with self.lock:
            return self.conn.execute(sql, params).fetchall()

    @staticmethod
    def _now():
        """Return the current local time as an ISO-8601 string."""
        return datetime.datetime.now().isoformat()

    def log_threat(self, **kw):
        """Insert one row into ``threats``.

        Recognised keywords: threat_type, severity, description, file_path,
        process_name, process_pid, action_taken, hash_sha256. Missing ones
        are stored as NULL.
        """
        columns = (
            "threat_type", "severity", "description", "file_path",
            "process_name", "process_pid", "action_taken", "hash_sha256",
        )
        self._write(
            "INSERT INTO threats(timestamp,threat_type,severity,description,file_path,"
            "process_name,process_pid,action_taken,hash_sha256) VALUES(?,?,?,?,?,?,?,?,?)",
            (self._now(), *(kw.get(column) for column in columns)),
        )

    def get_threats(self, limit=50):
        """Return up to *limit* threat rows as dicts, newest first."""
        rows = self._read("SELECT * FROM threats ORDER BY timestamp DESC LIMIT ?", (limit,))
        return [dict(row) for row in rows]

    def get_threat_count(self):
        """Return the total number of rows in ``threats``."""
        return self._read("SELECT COUNT(*) as c FROM threats")[0]["c"]

    def get_today_count(self):
        """Return the number of threat rows timestamped today or later."""
        # ISO timestamps sort lexicographically, so a string comparison
        # against today's date selects today's rows.
        today = datetime.date.today().isoformat()
        return self._read("SELECT COUNT(*) as c FROM threats WHERE timestamp >= ?", (today,))[0]["c"]

    def log_scan(self, scan_type, target, files, threats, duration):
        """Insert one row into ``scans`` describing a finished scan."""
        self._write(
            "INSERT INTO scans(timestamp,scan_type,target,files_scanned,threats_found,duration) "
            "VALUES(?,?,?,?,?,?)",
            (self._now(), scan_type, target, files, threats, duration),
        )

    def log_quarantine(self, original, qpath, h, ttype, size):
        """Insert one row into ``quarantine`` for a file moved to the vault."""
        self._write(
            "INSERT INTO quarantine(timestamp,original_path,quarantine_path,hash_sha256,"
            "threat_type,file_size) VALUES(?,?,?,?,?,?)",
            (self._now(), original, qpath, h, ttype, size),
        )

    def get_quarantined(self):
        """Return every quarantine row as a dict, newest first."""
        rows = self._read("SELECT * FROM quarantine ORDER BY timestamp DESC")
        return [dict(row) for row in rows]
# ============================================================================
# SCANNER - ZERO FALSE POSITIVES
# ============================================================================
def is_safe_path(filepath, safe_paths=None):
    """Check if file is in a trusted system directory.

    Args:
        filepath: Path (str or path-like) of the file being examined.
        safe_paths: Optional iterable of trusted directories. Defaults to
            ``Config.SAFE_PATHS``.

    Returns:
        True when *filepath* is one of the trusted directories or lies
        below one of them, otherwise False.

    Both sides are lower-cased and compared with forward slashes. The match
    stops at a path-component boundary, so a trusted "/lib" does not also
    trust "/libevil/payload.exe".
    """
    if safe_paths is None:
        safe_paths = Config.SAFE_PATHS
    candidate = str(filepath).lower().replace("\\", "/")
    for safe in safe_paths:
        if not safe:
            continue
        # Strip a trailing slash so "/usr/lib/" and "/usr/lib" behave alike.
        prefix = safe.lower().replace("\\", "/").rstrip("/")
        if candidate == prefix or candidate.startswith(prefix + "/"):
            return True
    return False
def is_legit_dotnet_dll(filename):
    """Report whether *filename* looks like a .NET/system library name.

    The check is a case-insensitive substring search: the name is accepted
    as soon as it contains one of the namespace-style fragments below
    (``Namespace.SubNamespace.dll`` convention) or one of the vendor
    prefixes. Only the name text is inspected.
    """
    namespace_markers = (
        # Dotted namespace segments
        ".net.", ".core.", ".runtime.", ".compiler.", ".service.",
        ".client.", ".server.", ".library.", ".authentication.",
        ".authorization.", ".extensions.", ".configuration.",
        ".abstractions.", ".primitives.", ".interop.", ".resources.",
        ".drawing.", ".data.", ".web.", ".http.", ".security.",
        ".diagnostics.", ".collections.", ".threading.", ".io.",
        ".text.", ".linq.", ".xml.", ".json.", ".spatial.",
        # Vendor / root namespaces
        "system.", "microsoft.", "newtonsoft.", "nuget.",
    )
    lowered = filename.lower()
    return any(marker in lowered for marker in namespace_markers)
class Scanner:
    """Heuristic file scanner: filename-disguise check plus byte-pattern scoring.

    Findings of a directory scan are written to the database object passed
    to the constructor.
    """

    # Directory names a directory walk never descends into.
    _SKIPPED_DIRS = frozenset(
        {".git", "node_modules", "__pycache__", ".venv", "venv", ".moneypack_security"}
    )
    # Upper bound on the bytes read from one file for pattern matching.
    _CONTENT_READ_LIMIT = 2 * 1024 * 1024
    # Number of leading bytes used for the entropy estimate.
    _ENTROPY_SAMPLE_BYTES = 4096
    # Indicator score from which a file is reported as malware.
    _MALWARE_SCORE = 3

    def __init__(self, db):
        """Remember the database used for logging; start in the running state."""
        self.db = db
        self._stop = False
        self._user_roots = None  # filled lazily by _in_user_dir()

    def stop(self):
        """Ask a running directory scan to finish early."""
        self._stop = True

    def reset(self):
        """Clear a previous stop request."""
        self._stop = False

    def hash_file(self, path):
        """Return the SHA-256 hex digest of *path*, or None if it cannot be read."""
        sha256 = hashlib.sha256()
        try:
            with open(path, "rb") as f:
                while chunk := f.read(65536):
                    sha256.update(chunk)
            return sha256.hexdigest()
        except OSError:
            return None

    @staticmethod
    def _shannon_entropy(sample):
        """Return the Shannon entropy of *sample* in bits per byte (0.0 when empty).

        Probabilities use the real sample length; dividing by a fixed 4096
        gives wrong values for inputs shorter than that.
        """
        total = len(sample)
        if total == 0:
            return 0.0
        entropy = 0.0
        for value in range(256):
            occurrences = sample.count(value)
            if occurrences:
                probability = occurrences / total
                entropy -= probability * math.log2(probability)
        return entropy

    @staticmethod
    def _score_content(content):
        """Score *content* (bytes) against the indicator combinations.

        Returns:
            ``(score, descriptions)`` - the summed indicator weights and a
            human-readable description for every indicator that matched.
        """
        lowered = content.lower()  # computed once; several checks need it
        score = 0
        descriptions = []

        def flag(points, description):
            nonlocal score
            score += points
            descriptions.append(description)

        # Encoded / hidden PowerShell
        if b"powershell" in lowered:
            if b"-enc" in lowered:
                flag(3, "Encoded PowerShell execution")
            if b"hidden" in lowered and b"bypass" in lowered:
                flag(3, "Hidden PowerShell with bypass")
        # Download and execute
        if (b"DownloadString" in content or b"DownloadFile" in content) and b"Invoke" in content:
            flag(3, "Downloads and executes remote code")
        if b"IEX(New-Object Net.WebClient)" in content:
            flag(4, "PowerShell download cradle")
        # Registry persistence together with network strings
        if b"CurrentVersion\\Run" in content and (b"socket" in lowered or b"connect" in lowered):
            flag(3, "Registry persistence with network capability")
        # Keylogging APIs
        if b"GetAsyncKeyState" in content and b"SetWindowsHookEx" in content:
            flag(4, "Keylogger APIs detected")
        # Process injection API triple
        if (b"VirtualAllocEx" in content and b"WriteProcessMemory" in content
                and b"CreateRemoteThread" in content):
            flag(4, "Process injection technique")
        # Screen/webcam capture together with a send call
        if (b"screenCapture" in content or b"webcamCapture" in content) and b"send" in lowered:
            flag(3, "Screen/webcam capture with exfiltration")
        # Ransomware strings
        if b"encrypt" in lowered and b"bitcoin" in lowered:
            flag(4, "Ransomware indicators (encryption + bitcoin)")
        if b"vssadmin" in lowered and b"delete" in lowered and b"shadows" in lowered:
            flag(4, "Deletes shadow copies (ransomware)")
        # Cryptominer strings
        if b"stratum+tcp" in lowered or b"xmrig" in lowered or b"monero" in lowered:
            flag(4, "Cryptocurrency miner detected")
        # High entropy only counts for PE files that already matched something.
        if content[:2] == b"MZ" and score >= 1:
            entropy = Scanner._shannon_entropy(content[:Scanner._ENTROPY_SAMPLE_BYTES])
            if entropy > 7.8:
                flag(2, f"Packed/encrypted binary (entropy: {entropy:.1f})")
        return score, descriptions

    def _in_user_dir(self, filepath):
        """Return True when *filepath* starts with a user download/temp directory."""
        if getattr(self, "_user_roots", None) is None:
            home = Path.home()
            roots = [
                str(home / "Downloads").lower(),
                str(home / "Desktop").lower(),
                str(home / "Documents").lower(),
            ]
            if platform.system() == "Windows":
                roots.append(str(home / "AppData/Local/Temp").lower())
            else:
                roots.extend(["/tmp", "/dev/shm"])
            # Built once per scanner instead of once per scanned file.
            self._user_roots = tuple(roots)
        return str(filepath).lower().startswith(self._user_roots)

    def scan_file(self, filepath):
        """Scan a file - ONLY flags genuinely suspicious files, never system files.

        Returns:
            dict with keys ``path`` (str), ``clean`` (bool), ``threats``
            (list of dicts with ``type``/``severity``/``desc``) and ``hash``
            (SHA-256 hex digest, set only for files that got the content
            scan). Files that cannot be read are returned as clean.
        """
        result = {"path": str(filepath), "clean": True, "threats": [], "hash": None}
        try:
            # RULE 1: Skip files in trusted system directories
            if is_safe_path(filepath):
                return result
            size = os.path.getsize(filepath)
            if size == 0 or size > Config.MAX_FILE_SIZE_MB * 1024 * 1024:
                return result
            filename = os.path.basename(filepath)
            ext = os.path.splitext(filename)[1].lower()
            dangerous = ext in Config.DANGEROUS_EXTENSIONS

            # RULE 2: Outside the user directories only dangerous types are looked at
            if not dangerous and not self._in_user_dir(filepath):
                return result

            # RULE 3: Double extension - only flag if NOT a known .NET/system naming pattern
            # NOTE(review): any name with three or more dot-separated parts and a
            # dangerous final extension is reported here, which includes names
            # such as "jquery.min.js" or "setup.1.2.exe" - confirm this is intended.
            parts = filename.split(".")
            if len(parts) >= 3:
                real_ext = "." + parts[-1].lower()
                if real_ext in Config.DANGEROUS_EXTENSIONS and not is_legit_dotnet_dll(filename):
                    result["clean"] = False
                    result["threats"].append({
                        "type": "double_extension",
                        "severity": "CRITICAL",
                        "desc": f"DISGUISED FILE: appears as .{parts[-2]} but is actually {real_ext}"
                    })

            # RULE 4: For executables, do deep content scan
            if dangerous:
                result["hash"] = self.hash_file(filepath)
                with open(filepath, "rb") as f:
                    content = f.read(min(size, self._CONTENT_READ_LIMIT))
                score, descriptions = self._score_content(content)
                if score >= self._MALWARE_SCORE:
                    result["clean"] = False
                    result["threats"].append({
                        "type": "malware",
                        "severity": "CRITICAL",
                        "desc": " | ".join(descriptions)
                    })
        except OSError:
            # Unreadable or vanished files are deliberately treated as clean.
            pass
        return result

    def _collect_files(self, directory):
        """Return the file paths to scan for *directory* (a directory or one file)."""
        if os.path.isfile(directory):
            # A single file was given; os.walk() would yield nothing for it.
            return [str(directory)]
        files = []
        for root, dirs, fnames in os.walk(directory):
            if self._stop:
                break
            # Prune in place so os.walk() does not descend into skipped folders.
            dirs[:] = [d for d in dirs if d not in self._SKIPPED_DIRS]
            files.extend(os.path.join(root, fname) for fname in fnames)
        return files

    def scan_directory(self, directory, progress=None):
        """Scan every file below *directory* and log the findings.

        Args:
            directory: Directory to walk; a path to a single file is also
                accepted.
            progress: Optional callback invoked after each file as
                ``progress(files_done, files_total, threats_so_far)``.

        Returns:
            dict with ``threats`` (scan_file results that were not clean),
            ``scanned``, ``threat_count``, ``duration`` (seconds, rounded to
            two decimals) and ``total`` (files discovered).
        """
        self._stop = False
        results = {"threats": [], "scanned": 0, "threat_count": 0}
        start = time.time()
        files = self._collect_files(directory)
        total = len(files)
        for done, path in enumerate(files, 1):
            if self._stop:
                break
            outcome = self.scan_file(path)
            results["scanned"] += 1
            if not outcome["clean"]:
                results["threats"].append(outcome)
                results["threat_count"] += 1
                for threat in outcome["threats"]:
                    self.db.log_threat(
                        threat_type=threat["type"],
                        severity=threat["severity"],
                        description=threat["desc"],
                        file_path=outcome["path"],
                        hash_sha256=outcome["hash"],
                    )
            if progress:
                progress(done, total, results["threat_count"])
        duration = time.time() - start
        self.db.log_scan("directory", str(directory), results["scanned"], results["threat_count"], duration)
        results["duration"] = round(duration, 2)
        results["total"] = total
        return results
# ============================================================================
# PROCESS MONITOR
# ============================================================================
class ProcessMonitor:
    """Inspects running processes for RAT/malware indicators and terminates them."""

    # (compiled regex, alert text) pairs matched against the lower-cased
    # command line. Compiled once instead of once per inspected process.
    _CMDLINE_PATTERNS = tuple(
        (re.compile(pattern), description)
        for pattern, description in (
            (r"powershell.*-enc", "Encoded PowerShell"),
            (r"powershell.*hidden.*bypass", "Hidden PowerShell bypass"),
            (r"certutil.*-decode", "Certutil decode"),
            (r"mshta.*http", "MSHTA remote execution"),
            (r"vssadmin.*delete.*shadows", "Shadow copy deletion"),
        )
    )
    # Path fragments that mark an executable as running from a temp directory.
    _TEMP_MARKERS = ("\\temp\\", "\\appdata\\local\\temp\\", "/tmp/", "/dev/shm/")

    def __init__(self, db):
        """Remember the database used for logging kills."""
        self.db = db

    @staticmethod
    def _connections(proc):
        """Return the connections of *proc*, or an empty list if unavailable.

        psutil 6.0 renamed ``Process.connections()`` to
        ``Process.net_connections()``; whichever one exists is used so the
        port check also works with older psutil releases.
        """
        getter = getattr(proc, "net_connections", None) or getattr(proc, "connections", None)
        if getter is None:
            return []
        try:
            return getter()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return []

    def _inspect(self, proc):
        """Return a report dict for *proc* when it raises alerts, else None."""
        info = proc.info
        alerts = []
        severity = "LOW"
        name = (info.get('name') or '').lower()
        exe = (info.get('exe') or '').lower()
        cmd = ' '.join(info.get('cmdline') or []).lower()

        # NOTE(review): this is a substring match against the whole executable
        # path, so an unrelated folder name containing e.g. "warzone" matches too.
        for rat in Config.RAT_PROCESS_NAMES:
            if rat in name or rat in exe:
                alerts.append(f"Known RAT: {rat}")
                severity = "CRITICAL"

        for pattern, description in self._CMDLINE_PATTERNS:
            if pattern.search(cmd):
                alerts.append(description)
                severity = "CRITICAL"

        # Only flag temp-dir processes if they also have other indicators
        if alerts and info.get('exe') and any(marker in exe for marker in self._TEMP_MARKERS):
            alerts.append("Running from temp directory")

        for conn in self._connections(proc):
            if conn.status == 'ESTABLISHED' and conn.raddr:
                if conn.raddr.port in Config.SUSPICIOUS_PORTS:
                    alerts.append(f"Connected to RAT port: {conn.raddr.ip}:{conn.raddr.port}")
                    severity = "CRITICAL"

        if not alerts:
            return None
        mem = info.get('memory_info')
        return {
            "pid": info['pid'],
            "name": info.get('name', '?'),
            "exe": info.get('exe') or 'N/A',
            "user": info.get('username', 'N/A'),
            "alerts": alerts,
            "severity": severity,
            "mem_mb": round(mem.rss / 1048576, 1) if mem else 0,
        }

    def get_suspicious(self):
        """Return a report dict for every running process that raises an alert.

        Each dict has the keys ``pid``, ``name``, ``exe``, ``user``,
        ``alerts`` (list of str), ``severity`` and ``mem_mb``. Processes
        that disappear or cannot be inspected are skipped.
        """
        suspicious = []
        attrs = ['pid', 'name', 'exe', 'cmdline', 'username', 'memory_info']
        for proc in psutil.process_iter(attrs):
            try:
                report = self._inspect(proc)
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
            if report is not None:
                suspicious.append(report)
        return suspicious

    def kill(self, pid):
        """Terminate the process *pid* and log the action.

        A polite terminate is tried first; if the process is still alive
        after five seconds it is force-killed.

        Returns:
            ``(ok, message)`` - *ok* is True when the process was stopped.
        """
        try:
            p = psutil.Process(pid)
            n = p.name()
            p.terminate()
            try:
                p.wait(timeout=5)
                action, message = "terminated", f"Terminated {n} (PID {pid})"
            except psutil.TimeoutExpired:
                # The process ignored the terminate request: escalate.
                p.kill()
                p.wait(timeout=5)
                action, message = "force-killed", f"Force-killed {n} (PID {pid})"
            self.db.log_threat(threat_type="process_killed", severity="HIGH",
                               description=f"Killed: {n} (PID {pid})",
                               process_name=n, process_pid=pid, action_taken=action)
            return True, message
        except psutil.NoSuchProcess:
            return False, "Process already gone"
        except psutil.AccessDenied:
            try:
                psutil.Process(pid).kill()
            except Exception as e:
                return False, f"Access denied: {e}"
            # Record the forced kill as well so the audit trail stays complete.
            self.db.log_threat(threat_type="process_killed", severity="HIGH",
                               description=f"Killed: PID {pid}",
                               process_pid=pid, action_taken="force-killed")
            return True, f"Force-killed PID {pid}"
        except Exception as e:
            return False, str(e)
# ============================================================================
# NETWORK MONITOR
# ============================================================================
class NetworkMonitor:
    """Looks through the system's inet connections for suspicious peers."""

    def __init__(self, db):
        """Keep a reference to the database object."""
        self.db = db

    @staticmethod
    def _owner_name(pid):
        """Resolve *pid* to a process name ("System" without pid, "Unknown" on error)."""
        if not pid:
            return "System"
        try:
            return psutil.Process(pid).name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            return "Unknown"

    def analyze(self):
        """Return one report dict per connection that raises an alert.

        A connection is reported when its remote port is listed in
        ``Config.SUSPICIOUS_PORTS`` or when an established connection belongs
        to a process named like a shell. Connections without a remote
        address are ignored. Each report has the keys ``pid``, ``process``,
        ``remote``, ``alerts`` and ``severity`` (always "CRITICAL").
        """
        shell_names = ("bash", "sh", "cmd.exe", "powershell.exe")
        findings = []
        for conn in psutil.net_connections(kind='inet'):
            remote = conn.raddr
            if not remote:
                continue
            owner = self._owner_name(conn.pid)
            reasons = []
            if remote.port in Config.SUSPICIOUS_PORTS:
                reasons.append(f"Known RAT port: {remote.port}")
            if owner.lower() in shell_names and conn.status == "ESTABLISHED":
                reasons.append("POSSIBLE REVERSE SHELL")
            if not reasons:
                continue
            findings.append({
                "pid": conn.pid or 0,
                "process": owner,
                "remote": f"{remote.ip}:{remote.port}",
                "alerts": reasons,
                "severity": "CRITICAL",
            })
        return findings
# ============================================================================
# QUARANTINE
# ============================================================================
class Quarantine:
    """Moves files into the quarantine vault, deletes them, and lists the vault."""

    def __init__(self, db):
        """Remember the database used to record quarantined files."""
        self.db = db

    def quarantine_file(self, filepath):
        """Move *filepath* into the quarantine directory.

        The file is hashed in chunks, moved under a timestamped name ending
        in ``.quarantined``, stripped of all permissions, described by a
        JSON side-car file, and recorded in the database.

        Returns:
            ``(ok, message)``. *ok* is False only when the file could not be
            moved. Once the move has succeeded the call reports success; a
            failure of a follow-up step is appended to the message as a
            warning, because the file is already out of its original place.
        """
        fp = Path(filepath)
        if not fp.exists():
            return False, "File not found"
        try:
            digest = hashlib.sha256()
            with fp.open("rb") as fh:
                # Chunked read: the file may be large.
                while chunk := fh.read(65536):
                    digest.update(chunk)
            h = digest.hexdigest()
            size = fp.stat().st_size
            ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            qpath = Config.QUARANTINE_DIR / f"{ts}_{h[:12]}_{fp.name}.quarantined"
            shutil.move(str(fp), str(qpath))
        except Exception as e:
            return False, str(e)

        warnings = []
        try:
            os.chmod(str(qpath), 0o000)
        except OSError as e:
            warnings.append(f"permissions not removed: {e}")
        meta = {"original": str(fp), "hash": h, "time": datetime.datetime.now().isoformat(), "size": size}
        try:
            qpath.with_suffix(".quarantined.json").write_text(json.dumps(meta, indent=2))
        except OSError as e:
            warnings.append(f"metadata not written: {e}")
        try:
            self.db.log_quarantine(str(fp), str(qpath), h, "malware", size)
        except Exception as e:
            warnings.append(f"database record failed: {e}")

        message = f"Quarantined: {fp.name}"
        if warnings:
            message += " (warning: " + "; ".join(warnings) + ")"
        return True, message

    def delete_file(self, filepath):
        """Permanently delete a malicious file.

        Returns:
            ``(ok, message)`` - *message* is ``"DELETED: <name>"`` on success.

        A symbolic link is removed itself and its target is left untouched.
        In particular the permission change is skipped for links, because
        ``os.chmod`` follows them and would alter the target.
        """
        fp = Path(filepath)
        is_link = fp.is_symlink()
        # exists() follows links, so a dangling link needs the extra check.
        if not (is_link or fp.exists()):
            return False, "File not found"
        try:
            if not is_link:
                # Owner write permission is enough to clear a read-only flag.
                os.chmod(str(fp), 0o600)
            fp.unlink()
            return True, f"DELETED: {fp.name}"
        except Exception as e:
            return False, str(e)

    def list_all(self):
        """Return one dict per quarantined file found in the vault directory.

        Each dict has the keys ``name``, ``original``, ``time`` (first 19
        characters) and ``size``; values missing from the side-car JSON
        fall back to "?" / 0. A missing vault directory yields an empty list.
        """
        if not Config.QUARANTINE_DIR.is_dir():
            return []
        items = []
        for entry in Config.QUARANTINE_DIR.iterdir():
            if entry.suffix != ".quarantined":
                continue
            meta = {}
            meta_path = entry.with_suffix(".quarantined.json")
            if meta_path.exists():
                try:
                    loaded = json.loads(meta_path.read_text())
                except (OSError, ValueError):
                    loaded = None  # unreadable or malformed side-car: use fallbacks
                if isinstance(loaded, dict):
                    meta = loaded
            items.append({
                "name": entry.name,
                "original": meta.get("original", "?"),
                "time": str(meta.get("time", "?"))[:19],
                "size": meta.get("size", 0),
            })
        return items
# ============================================================================
# MAIN APP
# ============================================================================
class MoneyPackApp:
def __init__(self):
    """Open the database and hand it to every subsystem."""
    database = Database()
    self.db = database
    self.scanner = Scanner(database)
    self.proc_mon = ProcessMonitor(database)
    self.net_mon = NetworkMonitor(database)
    self.quarantine = Quarantine(database)
def clear(self):
    """Clear the terminal with the shell command for the current OS."""
    command = 'clear'
    if os.name == 'nt':
        command = 'cls'
    os.system(command)
def show_banner(self):
    """Clear the screen, then print the full-size logo."""
    self.clear()
    banner = MONEYPACK_LOGO
    console.print(banner)
def show_status_bar(self):
    """Print a three-cell status line: state, today's count and overall count.

    The state reads "PROTECTED" while no threat row carries today's date
    and "THREATS ACTIVE" otherwise. Both numbers come from the threats
    table of the database.
    """
    today_total = self.db.get_today_count()
    overall_total = self.db.get_threat_count()
    if today_total == 0:
        status = "[bold bright_green]* PROTECTED[/bold bright_green]"
    else:
        status = "[bold red]* THREATS ACTIVE[/bold red]"

    bar = Table(
        box=box.SIMPLE_HEAVY,
        show_header=False,
        style="bright_cyan",
        border_style="bright_cyan",
        pad_edge=False,
    )
    for _ in range(3):
        bar.add_column(ratio=1)
    cells = (
        f" {status}",
        f"[bright_cyan]Threats Today:[/bright_cyan] [bold white]{today_total}[/bold white]",
        f"[bright_cyan]Total Killed:[/bright_cyan] [bold white]{overall_total}[/bold white]",
    )
    bar.add_row(*cells)
    console.print(bar)
    console.print()
def main_menu(self):
    """Print the command-center table listing every menu key."""
    entries = (
        ("1", "Quick Scan", "Scan Downloads, Desktop, Temp"),
        ("2", "Full Scan", "Deep scan any directory"),
        ("3", "Process Hunter", "Find & kill RATs/malware processes"),
        ("4", "Network Guard", "Detect C2/reverse shells"),
        ("5", "Threat Log", "View detected threats"),
        ("6", "Quarantine", "View isolated files"),
        ("7", "Real-Time Guard", "Continuous monitoring (Ctrl+C to stop)"),
        ("8", "System Status", "Full security overview"),
        ("0", "Exit", "Shutdown MoneyPack"),
    )
    heading = f"\n{MINI_LOGO} [dim]|[/dim] [bold bright_cyan]Command Center[/bold bright_cyan]"
    menu = Table(
        title=heading,
        box=DOUBLE_EDGE,
        border_style="bright_cyan",
        title_style="bold",
        show_header=True,
        header_style="bold bright_green",
    )
    menu.add_column("Key", style="bold bright_green", width=6, justify="center")
    menu.add_column("Action", style="bold white", width=28)
    menu.add_column("Description", style="dim", width=45)
    for entry in entries:
        menu.add_row(*entry)
    console.print(menu)
    console.print()
def run(self):
    """Run the interactive menu loop until the user picks "0".

    Every pass redraws banner, status bar and menu, asks for a menu key
    (default "1"), runs the matching screen and then waits for Enter.
    """
    # Menu key -> name of the method implementing that screen.
    screens = {
        "1": "_quick_scan",
        "2": "_full_scan",
        "3": "_process_hunter",
        "4": "_network_guard",
        "5": "_threat_log",
        "6": "_quarantine_view",
        "7": "_realtime_guard",
        "8": "_system_status",
    }
    while True:
        self.show_banner()
        self.show_status_bar()
        self.main_menu()
        choice = Prompt.ask(
            "[bold bright_green]MoneyPack[/bold bright_green] [bold cyan]>[/bold cyan]",
            choices=["0", "1", "2", "3", "4", "5", "6", "7", "8"],
            default="1",
        )
        if choice == "0":
            self.clear()
            farewell = Panel(
                "[bold bright_green]MoneyPack Security Suite[/bold bright_green]\n\n[bright_cyan]Stay safe. - MoneyPack[/bright_cyan]",
                border_style="bright_green",
                box=DOUBLE_EDGE,
            )
            console.print(farewell)
            return
        screen = screens.get(choice)
        if screen is not None:
            getattr(self, screen)()
        console.print()
        Prompt.ask("[dim]Press Enter to continue[/dim]")
def _handle_threats(self, threats):
    """Show threats and offer to KILL/QUARANTINE them.

    Args:
        threats: List of scan results, each a dict with a ``path`` and a
            ``threats`` list whose items carry a ``desc``.

    The user chooses between deleting everything, quarantining everything,
    handling one entry by number, or doing nothing. Every successful
    deletion is written to the threat log. File names and messages are
    escaped before printing, because they come from scanned files and
    could otherwise be interpreted as console markup.
    """
    if not threats:
        return
    # Local import: keeps this method self-contained within the file.
    from rich.markup import escape

    def delete(threat):
        """Delete one threat's file; log the deletion when it worked."""
        ok, msg = self.quarantine.delete_file(threat["path"])
        if ok:
            self.db.log_threat(threat_type="killed", severity="CRITICAL",
                               description=f"Deleted: {threat['path']}",
                               file_path=threat["path"], action_taken="deleted")
        return ok, msg

    console.print(f"\n[bold red]{'='*60}[/bold red]")
    console.print(f"[bold red] {len(threats)} CONFIRMED THREAT(S) FOUND[/bold red]")
    console.print(f"[bold red]{'='*60}[/bold red]\n")
    table = Table(box=ROUNDED, border_style="red")
    table.add_column("#", style="bold white", width=4)
    table.add_column("File", style="bold white")
    table.add_column("Threat", style="bold red")
    table.add_column("Path", style="dim")
    for i, t in enumerate(threats, 1):
        desc = t["threats"][0]["desc"] if t["threats"] else "Unknown"
        table.add_row(str(i), escape(os.path.basename(t["path"])), escape(desc[:40]), escape(t["path"][:50]))
    console.print(table)
    console.print("\n[bold bright_cyan]What do you want to do?[/bold bright_cyan]")
    console.print("[bold red] [K] KILL ALL[/bold red] - Delete every threat permanently")
    console.print("[bold yellow] [Q] QUARANTINE ALL[/bold yellow] - Isolate (can restore later)")
    console.print("[bold white] [S] SELECT[/bold white] - Choose one by number")
    console.print("[dim] [N] Nothing - Leave them[/dim]")
    action = Prompt.ask("\n[bold bright_green]MoneyPack[/bold bright_green] [bold cyan]action[/bold cyan]", choices=["k", "q", "s", "n"], default="q")

    if action == "k":
        failed = 0
        for t in threats:
            ok, msg = delete(t)
            if ok:
                console.print(f" [bold red]KILLED:[/bold red] {escape(os.path.basename(t['path']))}")
            else:
                failed += 1
                console.print(f" [bold yellow]FAILED:[/bold yellow] {escape(msg)}")
        # Only claim success when every deletion actually worked.
        if failed:
            console.print(f"\n[bold yellow]{failed} of {len(threats)} threat(s) could not be deleted.[/bold yellow]")
        else:
            console.print("\n[bold bright_green]All threats eliminated.[/bold bright_green]")
    elif action == "q":
        failed = 0
        for t in threats:
            ok, msg = self.quarantine.quarantine_file(t["path"])
            if ok:
                console.print(f" [bold yellow]QUARANTINED:[/bold yellow] {escape(os.path.basename(t['path']))}")
            else:
                failed += 1
                console.print(f" [bold red]FAILED:[/bold red] {escape(msg)}")
        if failed:
            console.print(f"\n[bold yellow]{failed} of {len(threats)} threat(s) could not be quarantined.[/bold yellow]")
        else:
            console.print("\n[bold bright_green]Threats isolated in quarantine vault.[/bold bright_green]")
    elif action == "s":
        raw = Prompt.ask("[bright_cyan]Enter threat number[/bright_cyan]")
        try:
            idx = int(raw) - 1
        except ValueError:
            # Same handling as the PID prompts elsewhere in this class.
            console.print("[red]Invalid number[/red]")
            return
        if not 0 <= idx < len(threats):
            console.print(f"[red]No threat with that number (1-{len(threats)})[/red]")
            return
        sub = Prompt.ask("[bright_cyan][K]ill or [Q]uarantine?[/bright_cyan]", choices=["k", "q"], default="k")
        t = threats[idx]
        if sub == "k":
            ok, msg = delete(t)
            console.print(f" [bold red]{'KILLED' if ok else 'FAILED'}:[/bold red] {escape(msg)}")
        else:
            ok, msg = self.quarantine.quarantine_file(t["path"])
            console.print(f" [bold yellow]{'QUARANTINED' if ok else 'FAILED'}:[/bold yellow] {escape(msg)}")
def _quick_scan(self):
    """Scan the user's Downloads/Desktop/Documents and temp folders.

    A progress bar is shown per existing folder. Afterwards either a
    "clean" panel is printed or the collected threats are handed to
    ``_handle_threats``.
    """
    self.clear()
    console.print(Panel("[bold bright_green]Quick Scan[/bold bright_green]\n[dim]Scanning user directories only - zero false positives[/dim]", border_style="bright_green", box=DOUBLE_EDGE))

    home = Path.home()
    targets = [home / "Downloads", home / "Desktop", home / "Documents"]
    system = platform.system()
    if system == "Windows":
        targets.append(home / "AppData/Local/Temp")
    elif system == "Linux":
        targets.extend([Path("/tmp"), Path("/dev/shm")])

    found = []
    columns = (
        SpinnerColumn("dots", style="bright_green"),
        TextColumn("[bold bright_cyan]{task.description}[/bold bright_cyan]"),
        BarColumn(bar_width=40, complete_style="bright_green"),
        TextColumn("[bright_green]{task.percentage:>3.0f}%[/bright_green]"),
        TimeElapsedColumn(),
    )
    with Progress(*columns, console=console) as progress:
        for target in targets:
            if not target.exists():
                continue
            task = progress.add_task(f"Scanning {target.name}...", total=100)

            def on_progress(done, total, threats):
                # Translate "files done" into a 0-100 bar position.
                if total > 0:
                    progress.update(task, completed=int(done / total * 100))

            outcome = self.scanner.scan_directory(str(target), progress=on_progress)
            progress.update(task, completed=100)
            found.extend(outcome["threats"])
    console.print()
    if found:
        self._handle_threats(found)
    else:
        console.print(Panel("[bold bright_green]SYSTEM CLEAN[/bold bright_green]\n[bright_green]No threats found. You're safe.[/bright_green]", border_style="bright_green", box=DOUBLE_EDGE))
def _full_scan(self):
    """Ask for a path and scan it, showing live progress.

    The prompt defaults to the home directory. A path that does not exist
    ends the screen with an error line. After the scan a summary is
    printed, followed by a "clean" panel or the threat-handling dialog.
    """
    self.clear()
    console.print(Panel("[bold bright_cyan]Full Scan[/bold bright_cyan]", border_style="bright_cyan", box=DOUBLE_EDGE))
    target = Prompt.ask("[bright_cyan]Path to scan[/bright_cyan]", default=str(Path.home()))
    if not os.path.exists(target):
        console.print("[bold red]Path not found.[/bold red]")
        return
    console.print(f"[bright_cyan]Scanning:[/bright_cyan] {target}")
    console.print(Rule(style="bright_cyan"))

    columns = (
        SpinnerColumn("dots12", style="bright_cyan"),
        TextColumn("[bold bright_cyan]{task.description}[/bold bright_cyan]"),
        BarColumn(bar_width=50, complete_style="bright_green"),
        TextColumn("[bright_green]{task.percentage:>3.0f}%[/bright_green]"),
        TextColumn("[dim]{task.fields[info]}[/dim]"),
        TimeElapsedColumn(),
    )
    with Progress(*columns, console=console) as progress:
        task = progress.add_task("Scanning...", total=100, info="")

        def on_progress(done, total, threats):
            # Bar position plus a "done/total | threats" counter.
            if total > 0:
                percent = int(done / total * 100)
                progress.update(task, completed=percent, info=f"{done}/{total} | {threats} threats")

        self.scanner.reset()
        result = self.scanner.scan_directory(target, progress=on_progress)
        progress.update(task, completed=100, info="Done")
    console.print()
    console.print(f"[bright_cyan]Scanned:[/bright_cyan] {result['scanned']} files in {result['duration']}s")
    found = result["threats"]
    if found:
        self._handle_threats(found)
    else:
        console.print(Panel("[bold bright_green]ALL CLEAN - No threats found[/bold bright_green]", border_style="bright_green", box=DOUBLE_EDGE))
def _process_hunter(self):
    """List suspicious processes and optionally kill one chosen by PID.

    One panel is printed per finding (red for CRITICAL, yellow otherwise).
    The PID typed by the user is passed to the process monitor; input that
    is not a number is reported as invalid.
    """
    self.clear()
    console.print(Panel("[bold bright_cyan]Process Hunter[/bold bright_cyan]\n[dim]Finding RATs and malware processes...[/dim]", border_style="bright_cyan", box=DOUBLE_EDGE))
    with console.status("[bright_cyan]Hunting...[/bright_cyan]", spinner="dots12"):
        suspicious = self.proc_mon.get_suspicious()
        time.sleep(1)
    if not suspicious:
        console.print(Panel("[bold bright_green]NO MALICIOUS PROCESSES[/bold bright_green]\n[bright_green]All processes are clean.[/bright_green]", border_style="bright_green", box=DOUBLE_EDGE))
        return
    console.print(f"\n[bold red]{len(suspicious)} THREAT(S) FOUND:[/bold red]\n")
    for proc in suspicious:
        if proc["severity"] == "CRITICAL":
            color = "red"
        else:
            color = "yellow"
        header = f"[white]PID:[/white] {proc['pid']} [white]User:[/white] {proc['user']} [white]Mem:[/white] {proc['mem_mb']}MB\n[white]Path:[/white] {proc['exe']}\n"
        alert_lines = "".join(f"\n [bold {color}]! {alert}[/bold {color}]" for alert in proc["alerts"])
        title = f"[bold {color}][{proc['severity']}] {proc['name']}[/bold {color}]"
        console.print(Panel(header + alert_lines, title=title, border_style=color, box=ROUNDED))
    console.print()
    if not Confirm.ask("[bold red]KILL a process?[/bold red]"):
        return
    pid_str = Prompt.ask("[bright_cyan]Enter PID to kill[/bright_cyan]")
    try:
        ok, msg = self.proc_mon.kill(int(pid_str))
        tone = 'bright_green' if ok else 'red'
        console.print(f"[bold {tone}]{msg}[/bold {tone}]")
    except ValueError:
        console.print("[red]Invalid PID[/red]")
def _network_guard(self):
    """Show suspicious network connections and optionally kill a process by PID.

    Asks ``self.net_mon.analyze()`` for findings. When there are none, prints
    a "network clean" panel. Otherwise prints one red panel per connection
    with its process, PID, remote endpoint and alerts, then offers to kill a
    process whose PID the user types in.
    """
    # Process names, remote endpoints and alert text come from the inspected
    # system; escape them so bracketed content cannot be parsed as Rich
    # markup (an unmatched closing tag would raise MarkupError).
    from rich.markup import escape

    def safe(value):
        """Return *value* as literal text that Rich will not parse as markup."""
        return escape(str(value))

    self.clear()
    console.print(Panel(
        "[bold bright_cyan]Network Guard[/bold bright_cyan]\n[dim]Checking for C2 callbacks & reverse shells...[/dim]",
        border_style="bright_cyan",
        box=DOUBLE_EDGE,
    ))
    with console.status("[bright_cyan]Analyzing...[/bright_cyan]", spinner="dots12"):
        suspicious = self.net_mon.analyze()
        time.sleep(0.5)
    if not suspicious:
        console.print(Panel(
            "[bold bright_green]NETWORK CLEAN[/bold bright_green]\n[bright_green]No suspicious connections.[/bright_green]",
            border_style="bright_green",
            box=DOUBLE_EDGE,
        ))
    else:
        for c in suspicious:
            alerts = " | ".join(safe(a) for a in c["alerts"])
            console.print(Panel(
                f"[white]Process:[/white] {safe(c['process'])} (PID {c['pid']})\n"
                f"[white]Remote:[/white] {safe(c['remote'])}\n"
                f"[bold red]{alerts}[/bold red]",
                title="[bold red]CRITICAL[/bold red]",
                border_style="red",
                box=ROUNDED,
            ))
        if Confirm.ask("\n[bold red]Kill the process?[/bold red]"):
            pid_str = Prompt.ask("[bright_cyan]PID[/bright_cyan]")
            # The kill call stays inside the try so that a ValueError raised
            # while killing is reported like non-numeric input.
            try:
                ok, msg = self.proc_mon.kill(int(pid_str))
                style = "bright_green" if ok else "red"
                console.print(f"[bold {style}]{safe(msg)}[/bold {style}]")
            except ValueError:
                console.print("[red]Invalid[/red]")
def _threat_log(self):
    """Print a table of up to 30 threats returned by ``self.db.get_threats(30)``.

    Prints a short notice and returns when nothing is recorded. Severity is
    coloured red for ``CRITICAL``, yellow for ``HIGH`` and white otherwise;
    the timestamp is cut to 19 characters and the description to 40.
    """
    # Descriptions are stored text derived from scanned processes/files, so
    # they are escaped before Rich renders the cell as markup.
    from rich.markup import escape

    self.clear()
    console.print(Panel("[bold bright_cyan]Threat Log[/bold bright_cyan]", border_style="bright_cyan", box=DOUBLE_EDGE))
    threats = self.db.get_threats(30)
    if not threats:
        console.print("[dim]No threats recorded.[/dim]")
        return
    table = Table(box=ROUNDED, border_style="bright_cyan")
    table.add_column("Time", style="dim", width=19)
    table.add_column("Severity", width=10)
    table.add_column("Type", style="bright_cyan")
    table.add_column("Description", style="white")
    table.add_column("Action", style="bold bright_green")
    for t in threats:
        sev = t.get("severity", "?")
        color = "red" if sev == "CRITICAL" else "yellow" if sev == "HIGH" else "white"
        # Coerce before slicing so a row whose timestamp is None does not
        # raise TypeError; truncate the description first, then escape it so
        # the cut can never split an escape sequence.
        timestamp = str(t.get("timestamp") or "")[:19]
        description = escape(str(t.get("description", ""))[:40])
        table.add_row(
            timestamp,
            f"[{color}]{sev}[/{color}]",
            t.get("threat_type", ""),
            description,
            t.get("action_taken", "detected"),
        )
    console.print(table)
def _quarantine_view(self):
    """Print a numbered table of the entries from ``self.quarantine.list_all()``.

    Each row shows a 1-based index, the base name of the entry's
    ``"original"`` path and its ``"time"`` value. Prints a short notice and
    returns when the list is empty.
    """
    # File names of quarantined items are chosen by whoever created the file;
    # escape them so brackets in a name are shown literally instead of being
    # parsed as Rich markup (an unmatched "[/tag]" would raise MarkupError).
    from rich.markup import escape

    self.clear()
    console.print(Panel("[bold bright_cyan]Quarantine Vault[/bold bright_cyan]", border_style="bright_cyan", box=DOUBLE_EDGE))
    items = self.quarantine.list_all()
    if not items:
        console.print("[dim]Empty - no quarantined files.[/dim]")
        return
    table = Table(box=ROUNDED, border_style="bright_cyan")
    table.add_column("#", width=4)
    table.add_column("File", style="white")
    table.add_column("Date", style="dim")
    for i, item in enumerate(items, 1):
        file_name = escape(os.path.basename(item["original"]))
        table.add_row(str(i), file_name, item["time"])
    console.print(table)
def _realtime_guard(self):
    """Poll process and network monitors every 5 seconds until Ctrl+C.

    On each pass every suspicious process is printed with its first alert and
    recorded through ``self.db.log_threat``; every suspicious connection is
    printed. When both monitors return nothing a dim "Clear" line is printed.
    ``KeyboardInterrupt`` ends the loop with a "Guard stopped." message.
    """
    # Names, endpoints and alert text come from the monitored system. They
    # are escaped for display only, so one crafted process name cannot raise
    # MarkupError and stop the guard; the database receives the raw values.
    from rich.markup import escape

    def safe(value):
        """Return *value* as literal text that Rich will not parse as markup."""
        return escape(str(value))

    self.clear()
    console.print(Panel(
        "[bold bright_green]Real-Time Guard ACTIVE[/bold bright_green]\n[dim]Ctrl+C to stop[/dim]",
        border_style="bright_green",
        box=DOUBLE_EDGE,
    ))
    try:
        while True:
            ts = datetime.datetime.now().strftime("%H:%M:%S")
            sp = self.proc_mon.get_suspicious()
            for p in sp:
                first_alert = p["alerts"][0]
                console.print(f"[bold red][{ts}] THREAT: {safe(p['name'])} PID:{p['pid']} - {safe(first_alert)}[/bold red]")
                self.db.log_threat(
                    threat_type="realtime",
                    severity="CRITICAL",
                    description=first_alert,
                    process_name=p["name"],
                    process_pid=p["pid"],
                )
            sn = self.net_mon.analyze()
            for c in sn:
                console.print(f"[bold red][{ts}] NETWORK: {safe(c['process'])} -> {safe(c['remote'])}[/bold red]")
            if not sp and not sn:
                console.print(f"[dim][{ts}] Clear[/dim]")
            time.sleep(5)
    except KeyboardInterrupt:
        console.print("\n[bold bright_green]Guard stopped.[/bold bright_green]")
def _system_status(self):
    """Print host statistics and a one-line verdict for processes and network.

    The table shows OS, CPU usage (red above 80%, green otherwise), RAM usage
    in whole GB, process count and today's threat count from
    ``self.db.get_today_count()``. Below it, each monitor is reported as
    THREAT when it returns any finding and CLEAN otherwise.
    """
    self.clear()
    console.print(Panel("[bold bright_cyan]System Status[/bold bright_cyan]", border_style="bright_cyan", box=DOUBLE_EDGE))
    cpu = psutil.cpu_percent(interval=1)
    mem = psutil.virtual_memory()
    bytes_per_gb = 1024 ** 3

    st = Table(box=ROUNDED, border_style="bright_cyan")
    st.add_column("", style="bright_cyan", width=20)
    st.add_column("", style="bold white")
    st.add_row("OS", f"{platform.system()} {platform.release()}")
    st.add_row("CPU", f"{'[red]' if cpu > 80 else '[bright_green]'}{cpu}%")
    st.add_row("RAM", f"{mem.percent}% ({mem.used // bytes_per_gb}/{mem.total // bytes_per_gb} GB)")
    st.add_row("Processes", str(len(psutil.pids())))
    # Read the counter once: the same value decides the branch and is shown,
    # instead of issuing two separate lookups that could disagree.
    threats_today = self.db.get_today_count()
    if threats_today:
        today_cell = f"[bold red]{threats_today}[/bold red]"
    else:
        today_cell = "[bright_green]0[/bright_green]"
    st.add_row("Threats Today", today_cell)
    console.print(st)
    console.print()

    sp = self.proc_mon.get_suspicious()
    sn = self.net_mon.analyze()
    console.print(f" Process Security: {'[bold red]THREAT' if sp else '[bold bright_green]CLEAN'}[/bold {'red' if sp else 'bright_green'}]")
    console.print(f" Network Security: {'[bold red]THREAT' if sn else '[bold bright_green]CLEAN'}[/bold {'red' if sn else 'bright_green'}]")
    console.print()
    console.print(Align.center(Text(f"MoneyPack Security v{Config.VERSION}", style="bold bright_green")))
# Script entry point: run the interactive app, exit quietly on Ctrl+C and
# report any other failure with exit status 1.
if __name__ == "__main__":
    try:
        app = MoneyPackApp()
        app.run()
    except KeyboardInterrupt:
        console.print("\n[bold bright_green]MoneyPack - Shutdown[/bold bright_green]")
    except Exception as e:
        # Render through Text rather than a markup string: exception messages
        # can contain bracketed text (Rich's own MarkupError quotes the bad
        # tag), and parsing that as markup would raise again in this handler.
        console.print(Text(f"Error: {e}", style="bold red"))
        sys.exit(1)