lsb / app.py
gkc55's picture
Fix: added import os for environment variable access
eab12d2
from flask import Flask, render_template, request, jsonify
import torch
import torch.nn as nn
from PIL import Image
import numpy as np
import io
import os
import pickle
from scipy.stats import entropy
app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 5 * 1024 * 1024
# --------------------
# Model definitions
# --------------------
class BinaryDNN(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(196614, 384), nn.BatchNorm1d(384), nn.ReLU(), nn.Dropout(0.45),
nn.Linear(384, 192), nn.BatchNorm1d(192), nn.ReLU(), nn.Dropout(0.35),
nn.Linear(192, 96), nn.BatchNorm1d(96), nn.ReLU(), nn.Dropout(0.25),
nn.Linear(96, 2))
def forward(self, x): return self.net(x)
class MultiDNN(nn.Module):
def __init__(self):
super().__init__()
self.net = nn.Sequential(
nn.Linear(196614, 384), nn.BatchNorm1d(384), nn.ReLU(), nn.Dropout(0.45),
nn.Linear(384, 192), nn.BatchNorm1d(192), nn.ReLU(), nn.Dropout(0.35),
nn.Linear(192, 96), nn.BatchNorm1d(96), nn.ReLU(), nn.Dropout(0.25),
nn.Linear(96, 4))
def forward(self, x): return self.net(x)
# --------------------
# Load artifacts ONCE
# --------------------
with open('final_scaler.pkl', 'rb') as f:
scaler = pickle.load(f)
with open('final_bin_le.pkl', 'rb') as f:
binary_le = pickle.load(f)
with open('final_multi_le.pkl', 'rb') as f:
multi_le = pickle.load(f)
device = torch.device('cpu')
binary_model = BinaryDNN()
binary_model.load_state_dict(torch.load('final_bin_best.pth', map_location=device))
binary_model.eval()
multi_model = MultiDNN()
multi_model.load_state_dict(torch.load('final_multi_best.pth', map_location=device))
multi_model.eval()
# --------------------
# V3 Feature extraction
# --------------------
def extract_lsb_feature(img, k=2):
mask = (1 << k) - 1
return (img & mask).flatten()
def extract_randomness_features(lsb_feat):
chunk_size = 512
lsb_reshaped = lsb_feat.reshape(-1, chunk_size)
ent_values = []
for block in lsb_reshaped[::10]:
hist = np.bincount(block.astype(np.int32), minlength=4)
hist = hist / max(hist.sum(), 1)
ent = entropy(hist + 1e-10)
ent_values.append(ent)
mean_entropy = np.mean(ent_values)
sample = lsb_feat[:5000].astype(np.int32)
transitions = np.sum(np.abs(np.diff(sample)))
transition_rate = transitions / max(len(sample) - 1, 1)
hist_full = np.bincount(lsb_feat.astype(np.int32), minlength=4)
expected = len(lsb_feat) / 4
chi2_stat = np.sum((hist_full - expected) ** 2 / expected) if expected > 0 else 0
return np.array([mean_entropy, transition_rate, chi2_stat], dtype=np.float32)
def extract_url_pattern_features(lsb_feat):
chunk_size = 4
num_chunks = len(lsb_feat) // chunk_size
lsb_trimmed = lsb_feat[:num_chunks * chunk_size].astype(np.int32)
byte_values = []
for i in range(0, len(lsb_trimmed), chunk_size):
val = (lsb_trimmed[i] << 6) | (lsb_trimmed[i+1] << 4) | (lsb_trimmed[i+2] << 2) | lsb_trimmed[i+3]
byte_values.append(val)
byte_values = np.array(byte_values)
url_char_count = np.sum((byte_values >= 10) & (byte_values <= 15))
html_char_count = np.sum((byte_values >= 0) & (byte_values <= 5))
unique_ratio = len(np.unique(byte_values)) / max(len(byte_values), 1)
return np.array([
url_char_count / max(len(byte_values), 1),
html_char_count / max(len(byte_values), 1),
unique_ratio
], dtype=np.float32)
def extract_features(image, k=2):
image = image.resize((256, 256), Image.LANCZOS)
if image.mode != 'RGB':
image = image.convert('RGB')
img_array = np.array(image)
lsb_feat = extract_lsb_feature(img_array, k=k)
rand_feat = extract_randomness_features(lsb_feat)
url_feat = extract_url_pattern_features(lsb_feat)
combined = np.concatenate([lsb_feat.astype(np.float32), rand_feat, url_feat])
return combined
# --------------------
# Flask endpoints
# --------------------
@app.route('/')
def index():
return render_template('index.html')
@app.route('/predict', methods=['POST'])
def predict():
try:
file = request.files['file']
image = Image.open(io.BytesIO(file.read()))
features = extract_features(image)
features_normalized = scaler.transform(features.reshape(1, -1)).astype(np.float32)
features_tensor = torch.FloatTensor(features_normalized)
with torch.no_grad():
binary_output = binary_model(features_tensor)
binary_probs = torch.softmax(binary_output, dim=1).cpu().numpy()[0]
binary_pred_idx = int(np.argmax(binary_probs))
binary_class = binary_le.classes_[binary_pred_idx]
if binary_class == 'stego':
with torch.no_grad():
multi_output = multi_model(features_tensor)
multi_probs = torch.softmax(multi_output, dim=1).cpu().numpy()[0]
multi_pred_idx = int(np.argmax(multi_probs))
payload_type = multi_le.classes_[multi_pred_idx]
else:
payload_type = 'clean'
multi_probs = np.zeros(len(multi_le.classes_))
multi_probs[multi_le.transform(['clean'])[0]] = 1.0
result = {
'binary_prediction': {
'class': binary_class,
'confidence': float(binary_probs[binary_pred_idx]),
'probabilities': {c: float(binary_probs[binary_le.transform([c])[0]]) for c in binary_le.classes_}
},
'multiclass_prediction': {
'payload_type': payload_type,
'probabilities': {c: float(multi_probs[i]) for i, c in enumerate(multi_le.classes_)}
}
}
return jsonify(result)
except Exception as e:
import traceback
print('Error:', str(e))
traceback.print_exc()
return jsonify({'error': str(e)}), 500
@app.errorhandler(413)
def request_entity_too_large(error):
return jsonify({'error': 'File size exceeds 5MB'}), 413
if __name__ == "__main__":
port = int(os.environ.get("PORT", 7860))
app.run(debug=True, host="0.0.0.0", port=port)