import streamlit as st import os import io import json import csv import numpy as np import faiss import uuid import time import sys from typing import List, Dict, Any import base64 # === HuggingFace 模型相關套件 (替換為 InferenceClient) === try: from huggingface_hub import InferenceClient except ImportError: st.error("請檢查是否安裝了所有 Hugging Face 相關依賴:pip install huggingface-hub") # === LangChain/RAG 相關套件 (保持不變) === from langchain_community.embeddings import HuggingFaceEmbeddings from langchain_core.documents import Document from langchain_community.vectorstores import FAISS from langchain_community.vectorstores.utils import DistanceStrategy from langchain_community.docstore.in_memory import InMemoryDocstore import pandas as pd # 嘗試匯入 pypdftry: try: import pypdf except ImportError: pypdf = None # --- 頁面設定 --- st.set_page_config(page_title="Cybersecurity AI Assistant (Hugging Face RAG & IP Correlated Analysis)", page_icon="🛡️", layout="wide") st.title("🛡️ LLM with FAISS RAG & IP Correlated Analysis (Inference Client)") st.markdown("已啟用:**IndexFlatIP** + **L2 正規化** + **Hugging Face Inference Client (API)**。支援 JSON/CSV/TXT/**W3C Log** 執行**IP 關聯批量分析**。") # --- Streamlit Session State 初始化 (修正強化,確保所有變數都有初始值) --- if 'execute_batch_analysis' not in st.session_state: st.session_state.execute_batch_analysis = False if 'batch_results' not in st.session_state: st.session_state.batch_results = [] # 確保初始化為空列表 if 'rag_current_file_key' not in st.session_state: st.session_state.rag_current_file_key = None if 'batch_current_file_key' not in st.session_state: st.session_state.batch_current_file_key = None if 'vector_store' not in st.session_state: st.session_state.vector_store = None if 'json_data_for_batch' not in st.session_state: st.session_state.json_data_for_batch = None # 保持 None,因為可能檔案沒上傳 # --- 定義模型列表 --- MODEL_OPTIONS = { "OpenAI GPT-OSS 20B (Hugging Face)": "openai/gpt-oss-20b", "secgpt (Hugging Face)": "clouditera/secgpt", "RNJ (Hugging Face)": "EssentialAI/rnj-1-instruct", "Meta Llama 3.1 8B Instruct (Hugging Face)": "meta-llama/Llama-3.1-8B-Instruct", "Meta Llama 3.3 70B Instruct (Hugging Face)": "meta-llama/Llama-3.3-70B-Instruct", "fdtn-ai Foundation-Sec 8B Instruct (Hugging Face)": "fdtn-ai/Foundation-Sec-8B-Instruct", "Qwen (Hugging Face)": "Qwen/Qwen2.5-7B-Instruct", "Gemma 3 27B Instruct (Hugging Face)": "google/gemma-3-27b-it", "Kimi K2 Instruct (Hugging Face)": "moonshotai/Kimi-K2-Instruct-0905" } WINDOW_SIZE = 10 # 關聯 Log 的最大數量 (包含當前 Log) # === W3C Log 專屬解析器 (新增) === def parse_w3c_log(log_content: str) -> List[Dict[str, Any]]: """ 解析 W3C Extended Log File Format (如 IIS Log),包括提取 #Fields:。 Args: log_content (str): Log 檔案的字串內容。 Returns: List[Dict[str, Any]]: 轉換後的 JSON 物件列表。 """ lines = log_content.splitlines() field_names = None data_lines = [] for line in lines: line = line.strip() if not line: continue if line.startswith("#Fields:"): # 找到欄位定義,例如 "#Fields: date time s-ip cs-method ..." field_names = line.split()[1:] # 跳過 "#Fields:" 本身 elif not line.startswith("#"): # 這是實際的資料行 data_lines.append(line) if not field_names: # 如果沒有找到 #Fields,則退回到原始 Log 條目模式 # st.warning("未檢測到 W3C Log 的 #Fields: 標頭,退回原始 Log 條目模式。") return [{"raw_log_entry": line} for line in lines if line.strip() and not line.startswith("#")] json_data = [] # 定義需要轉換為數字的欄位名稱 (可根據您的需求擴充,使用底線版本) numeric_fields = ['sc_status', 'time_taken', 'bytes', 'resp_len', 'req_size'] for data_line in data_lines: # W3C Log 預設使用空格分隔。這裡使用 split() values = data_line.split(' ') # 簡易的欄位數量檢查 if len(values) != len(field_names): # 如果欄位數量不匹配,將該行視為原始 Log 條目 json_data.append({"raw_log_entry": data_line}) continue record = {} for key, value in zip(field_names, values): # 將 W3C 欄位名稱中的 '-' 替換成 Python 友好的 '_' key = key.strip().replace('-', '_') value = value.strip() if value else "" # 處理數字轉換 if key in numeric_fields: try: record[key] = int(value) except ValueError: try: record[key] = float(value) except ValueError: record[key] = value else: record[key] = value if record: json_data.append(record) return json_data # === 核心檔案轉換函式 (CSV/TXT -> JSON List) (保留並微調) === def convert_csv_txt_to_json_list(file_content: bytes, file_type: str) -> List[Dict[str, Any]]: """ 將 CSV 或 TXT 檔案內容 (假定為 CSV 格式,含標頭) 轉換為 JSON 物件列表。 """ log_content = file_content.decode("utf-8").strip() if not log_content: return [] string_io = io.StringIO(log_content) # 嘗試使用 csv.DictReader 自動將第一行視為 Key try: reader = csv.DictReader(string_io) except Exception: # 如果失敗,退回每行一個原始 Log 條目 return [{"raw_log_entry": line.strip()} for line in log_content.splitlines() if line.strip()] json_data = [] if reader and reader.fieldnames: # 使用者可能使用的數值欄位名稱 numeric_fields = ['sc-status', 'time-taken', 'bytes', 'resp-len', 'req-size', 'status_code', 'size', 'duration'] for row in reader: record = {} for key, value in row.items(): if key is None: continue key = key.strip() value = value.strip() if value else "" # 處理數字轉換 if key in numeric_fields: try: record[key] = int(value) except ValueError: try: record[key] = float(value) except ValueError: record[key] = value else: record[key] = value if record: json_data.append(record) # 再次檢查是否為空,如果是空,可能不是標準 CSV/JSON if not json_data: string_io.seek(0) lines = string_io.readlines() return [{"raw_log_entry": line.strip()} for line in lines if line.strip()] return json_data # === 檔案類型分發器 (已修改) === def convert_uploaded_file_to_json_list(uploaded_file) -> List[Dict[str, Any]]: """根據檔案類型,將上傳的檔案內容轉換為 Log JSON 列表。""" file_bytes = uploaded_file.getvalue() file_name_lower = uploaded_file.name.lower() # --- Case 1: JSON --- if file_name_lower.endswith('.json'): stringio = io.StringIO(file_bytes.decode("utf-8")) parsed_data = json.load(stringio) if isinstance(parsed_data, dict): # 處理包裹在 'alerts' 或 'logs' 鍵中的列表 if 'alerts' in parsed_data and isinstance(parsed_data['alerts'], list): return parsed_data['alerts'] elif 'logs' in parsed_data and isinstance(parsed_data['logs'], list): return parsed_data['logs'] else: return [parsed_data] # 單一字典視為單一 Log elif isinstance(parsed_data, list): return parsed_data # 列表直接返回 else: raise ValueError("JSON 檔案格式不支援 (非 List 或 Dict)。") # --- Case 2, 3, & 4: CSV/TXT/LOG --- elif file_name_lower.endswith(('.csv', '.txt', '.log')): file_type = 'csv' if file_name_lower.endswith('.csv') else ('log' if file_name_lower.endswith('.log') else 'txt') if file_type == 'log': # 針對 .log 檔案,嘗試使用 W3C 解析器 log_content = file_bytes.decode("utf-8").strip() if not log_content: return [] return parse_w3c_log(log_content) else: # CSV 和 TXT 保持使用原來的 csv.DictReader 邏輯 return convert_csv_txt_to_json_list(file_bytes, file_type) else: raise ValueError("不支援的檔案類型。") # --- 側邊欄設定 (已更新 'type' 參數) --- with st.sidebar: st.header("⚙️ 設定") # --- 新增模型選單 --- selected_model_name = st.selectbox( "選擇 LLM 模型", list(MODEL_OPTIONS.keys()), index=0 # 預設選擇第一個 ) MODEL_ID = MODEL_OPTIONS[selected_model_name] # 更新 MODEL_ID if not os.environ.get("HF_TOKEN"): st.error("環境變數 **HF_TOKEN** 未設定。請設定後重新啟動應用程式。") st.info(f"LLM 模型:**{MODEL_ID}** (Hugging Face Inference API)") st.warning("⚠️ **注意**: 該模型使用 Inference API 呼叫,請確保您的 HF Token 具有存取權限。") st.divider() st.subheader("📂 檔案上傳") # === 1. 批量分析檔案 (支援多種格式) === batch_uploaded_file = st.file_uploader( "1️⃣ 上傳 **Log 檔案** (用於批量分析)", type=['json', 'csv', 'txt', 'log'], # <--- 這裡增加了 'log' key="batch_uploader", help="支援 JSON (Array), CSV (含標題), TXT/LOG (視為 W3C 或一般 Log)" ) # === 2. RAG 知識庫檔案 === rag_uploaded_file = st.file_uploader( "2️⃣ 上傳 **RAG 參考知識庫** (Logs/PDF/Code 等)", type=['txt', 'py', 'log', 'csv', 'md', 'pdf'], # <--- 這裡增加了 'log' key="rag_uploader" ) st.divider() st.subheader("💡 批量分析指令") analysis_prompt = st.text_area( "針對每個 Log 執行的指令", value="You are a security expert tasked with analyzing logs related to Initial Access, Establish Foothold & Reconnaissance, Lateral Movement, Targeting & Data Exfiltration, Malware Deployment & Execution and Ransom & Negotiation. Respond with a clear, structured analysis using the following mandatory sections: \n\n- Priority: Provide the overall priority level. (Answer High-risk detected!, Medium-risk detected!, or Normal-Behavior detected! only) \n- Explanation: If this log is not normal behavior, explain the potential impact and why this specific log requires attention. If not, **omit the explanation section**. \n- Action Plan: If this log is not normal behavior, What should be the immediate steps to address this specific log? If not, **omit the action plan section**.", height=200 ) st.markdown("此指令將對檔案中的**每一個 Log 條目**執行一次獨立分析 (使用 **IP 關聯視窗**)。") if batch_uploaded_file: if st.button("🚀 執行批量分析"): if not os.environ.get("HF_TOKEN"): st.error("無法執行,環境變數 **HF_TOKEN** 未設定。") else: # 確保檔案已解析成功 if st.session_state.get('json_data_for_batch'): st.session_state.execute_batch_analysis = True else: st.error("請先等待 Log 檔案解析完成。") else: st.info("請上傳 Log 檔案以啟用批量分析按鈕。") st.divider() st.subheader("🔍 RAG 檢索設定") similarity_threshold = st.slider("📐 Cosine Similarity 門檻", 0.0, 1.0, 0.4, 0.01) st.divider() st.subheader("模型參數") system_prompt = st.text_area("System Prompt", value="You are a Senior Security Analyst, named Ernest. You provide expert, authoritative, and concise advice on Information Security. Your analysis must be based strictly on the provided context.", height=100) max_output_tokens = st.slider("Max Output Tokens", 128, 4096, 2048, 128) temperature = st.slider("Temperature", 0.0, 1.0, 0.1, 0.1) top_p = st.slider("Top P", 0.1, 1.0, 0.95, 0.05) st.divider() if st.button("🗑️ 清除所有紀錄"): # 僅清除動態狀態,保留 HF_TOKEN for key in list(st.session_state.keys()): if key not in ['HF_TOKEN']: del st.session_state[key] st.rerun() # --- 初始化 Hugging Face LLM Client (已更新,MODEL_ID 作為參數) --- # 確保 load_inference_client 接受 model_id 作為參數,以利用 Streamlit 的快取機制。 @st.cache_resource def load_inference_client(model_id): if not os.environ.get("HF_TOKEN"): return None try: client = InferenceClient(model_id, token=os.environ.get("HF_TOKEN")) st.success(f"Hugging Face Inference Client **{model_id}** 載入成功。") return client except Exception as e: st.error(f"Hugging Face Inference Client 載入失敗: {e}") return None inference_client = None if os.environ.get("HF_TOKEN"): with st.spinner(f"正在連線到 Inference Client: {MODEL_ID}..."): # 傳遞 MODEL_ID inference_client = load_inference_client(MODEL_ID) if inference_client is None and os.environ.get("HF_TOKEN"): st.warning(f"Hugging Face Inference Client **{MODEL_ID}** 無法連線。") elif not os.environ.get("HF_TOKEN"): st.error("請在環境變數中設定 HF_TOKEN。") # === Embedding 模型 (保持不變) === @st.cache_resource def load_embedding_model(): model_kwargs = {'device': 'cpu', 'trust_remote_code': True} encode_kwargs = {'normalize_embeddings': False} return HuggingFaceEmbeddings(model_name="BAAI/bge-large-zh-v1.5", model_kwargs=model_kwargs, encode_kwargs=encode_kwargs) with st.spinner("正在載入 Embedding 模型..."): embedding_model = load_embedding_model() # === 建立向量庫 / Search 函數 (保持不變) === def process_file_to_faiss(uploaded_file): text_content = "" try: if uploaded_file.type == "application/pdf": if pypdf: pdf_reader = pypdf.PdfReader(uploaded_file) for page in pdf_reader.pages: text_content += page.extract_text() + "\n" else: return None, "PDF library missing" else: stringio = io.StringIO(uploaded_file.getvalue().decode("utf-8")) text_content = stringio.read() if not text_content.strip(): return None, "File is empty" # 這裡將文件內容按行分割為 Document,每行一個 Document events = [line for line in text_content.splitlines() if line.strip()] docs = [Document(page_content=e) for e in events] if not docs: return None, "No documents created" # 進行 Embedding 和 FAISS 初始化 (IndexFlatIP + L2 normalization) embeddings = embedding_model.embed_documents([d.page_content for d in docs]) embeddings_np = np.array(embeddings).astype("float32") faiss.normalize_L2(embeddings_np) dimension = embeddings_np.shape[1] index = faiss.IndexFlatIP(dimension) # 使用內積 (Inner Product) index.add(embeddings_np) doc_ids = [str(uuid.uuid4()) for _ in range(len(docs))] docstore = InMemoryDocstore({_id: doc for _id, doc in zip(doc_ids, docs)}) index_to_docstore_id = {i: _id for i, _id in enumerate(doc_ids)} # 使用 Cosine 距離策略,配合 IndexFlatIP 和 L2 normalization 達到 Cosine Similarity vector_store = FAISS(embedding_function=embedding_model, index=index, docstore=docstore, index_to_docstore_id=index_to_docstore_id, distance_strategy=DistanceStrategy.COSINE) return vector_store, f"{len(docs)} chunks created." except Exception as e: return None, f"Error: {str(e)}" def faiss_cosine_search_all(vector_store, query, threshold): q_emb = embedding_model.embed_query(query) q_emb = np.array([q_emb]).astype("float32") faiss.normalize_L2(q_emb) index = vector_store.index D, I = index.search(q_emb, k=index.ntotal) selected = [] # Cosine Similarity = D (IndexFlatIP + L2 normalization) for score, idx in zip(D[0], I[0]): if idx == -1: continue # 檢測相似度是否超過門檻 if score >= threshold: doc_id = vector_store.index_to_docstore_id[idx] doc = vector_store.docstore.search(doc_id) selected.append((doc, score)) selected.sort(key=lambda x: x[1], reverse=True) return selected # === Hugging Face 生成單一 Log 分析回答 (保持不變) === def generate_rag_response_hf_for_log(client, model_id, log_sequence_text, user_prompt, sys_prompt, vector_store, threshold, max_output_tokens, temperature, top_p): if client is None: return "ERROR: Client Error", "" context_text = "" # RAG 檢索邏輯 if vector_store: selected = faiss_cosine_search_all(vector_store, log_sequence_text, threshold) if selected: # 只取前 5 個最相關的片段 retrieved_contents = [f"--- Reference Chunk (sim={score:.3f}) ---\n{doc.page_content}" for i, (doc, score) in enumerate(selected[:5])] context_text = "\n".join(retrieved_contents) rag_instruction = f"""=== RETRIEVED REFERENCE CONTEXT (Cosine ≥ {threshold}) ==={context_text if context_text else 'No relevant reference context found.'}=== END REFERENCE CONTEXT ===ANALYSIS INSTRUCTION: {user_prompt}Based on the provided LOG SEQUENCE and REFERENCE CONTEXT, you must analyze the **entire sequence** to detect any continuous attack chains or evolving threats.""" log_content_section = f"""=== CURRENT LOG SEQUENCE TO ANALYZE (Window Size: Max {WINDOW_SIZE} logs associated by IP) ==={log_sequence_text}=== END LOG SEQUENCE ===""" messages = [ {"role": "system", "content": sys_prompt}, {"role": "user", "content": f"{rag_instruction}\n\n{log_content_section}"} ] try: # 使用 chat_completion 進行模型呼叫 response_stream = client.chat_completion( messages, max_tokens=max_output_tokens, temperature=temperature, top_p=top_p, stream=False # 這裡使用非串流 ) if response_stream and response_stream.choices: return response_stream.choices[0].message.content.strip(), context_text else: return "Format Error: Model returned empty response or invalid format.", context_text except Exception as e: return f"Model Error: {str(e)}", context_text # ======================================================================= # === 檔案處理區塊 (RAG 檔案) - 保持不變 === if rag_uploaded_file: file_key = f"vs_{rag_uploaded_file.name}_{rag_uploaded_file.size}" if st.session_state.rag_current_file_key != file_key or 'vector_store' not in st.session_state: # 清除舊的 vector store 以節省內存 if 'vector_store' in st.session_state: del st.session_state.vector_store with st.spinner(f"正在建立 RAG 參考知識庫 ({rag_uploaded_file.name})..."): vs, msg = process_file_to_faiss(rag_uploaded_file) if vs: st.session_state.vector_store = vs st.session_state.rag_current_file_key = file_key st.toast(f"RAG 參考知識庫已更新!{msg}", icon="✅") else: st.session_state.rag_current_file_key = None st.error(msg) elif 'vector_store' in st.session_state: del st.session_state.vector_store del st.session_state.rag_current_file_key st.info("目前沒有 RAG 檔案,你可以上傳 RAG 作為參考知識庫。") # === 檔案處理區塊 (批量分析檔案 - **已更新** ) === if batch_uploaded_file: batch_file_key = f"batch_{batch_uploaded_file.name}_{batch_uploaded_file.size}" if st.session_state.batch_current_file_key != batch_file_key or 'json_data_for_batch' not in st.session_state: try: # 清除舊的數據 if 'json_data_for_batch' in st.session_state: del st.session_state.json_data_for_batch if 'batch_results' in st.session_state: del st.session_state.batch_results # 使用新的統一解析函式 parsed_data = convert_uploaded_file_to_json_list(batch_uploaded_file) if not parsed_data: raise ValueError(f"{batch_uploaded_file.name} 檔案載入失敗或內容為空。") # 儲存處理後的數據 st.session_state.json_data_for_batch = parsed_data st.session_state.batch_current_file_key = batch_file_key st.toast(f"檔案已解析並轉換為 {len(parsed_data)} 個 Log 條目。", icon="✅") except Exception as e: st.error(f"檔案解析錯誤: {e}") if 'json_data_for_batch' in st.session_state: del st.session_state.json_data_for_batch st.session_state.batch_current_file_key = None # 設置為 None 避免錯誤的 Key elif 'json_data_for_batch' in st.session_state: # 檔案被移除,清除相關數據 del st.session_state.json_data_for_batch del st.session_state.batch_current_file_key # 確保 batch_results 被清除,避免 'NoneType' 錯誤 if "batch_results" in st.session_state: del st.session_state.batch_results st.info("目前沒有批量分析檔案,請上傳日誌檔案以分析結果。") # === 執行批量分析邏輯 (已修改為 IP 關聯視窗) === if st.session_state.execute_batch_analysis and 'json_data_for_batch' in st.session_state and st.session_state.json_data_for_batch is not None: st.session_state.execute_batch_analysis = False start_time = time.time() # 這裡必須確保 st.session_state.batch_results 是 List,而不是 None if 'batch_results' not in st.session_state or st.session_state.batch_results is None: st.session_state.batch_results = [] st.session_state.batch_results = [] if inference_client is None: st.error("Client 未連線,無法執行。") else: logs_list = st.session_state.json_data_for_batch if logs_list: vs = st.session_state.get("vector_store", None) # 將 Log 條目轉換為 JSON 字串,用於 LLM 輸入 formatted_logs = [json.dumps(log, indent=2, ensure_ascii=False) for log in logs_list] analysis_sequences = [] # --- 核心修改:基於 IP 關聯的 Log Sequence 建構 --- for i in range(len(formatted_logs)): current_log_entry = logs_list[i] current_log_str = formatted_logs[i] # 嘗試從當前 Log 條目中提取 IP 地址 (優先 W3C 格式,然後是一般日誌格式) # 使用者可以根據自己的日誌格式調整這裡的 Key target_ip = current_log_entry.get('c_ip') or current_log_entry.get('c-ip') or current_log_entry.get('remote_addr') or current_log_entry.get('source_ip') sequence_text = [] correlated_logs = [] if target_ip and target_ip != "-": # 假設 '-' 是 W3C 中的空值 # 篩選過去的 Log,最多 WINDOW_SIZE - 1 個,且 IP 必須匹配 # 從 i-1 倒序檢查到 0 for j in range(i - 1, -1, -1): prior_log_entry = logs_list[j] prior_ip = prior_log_entry.get('c_ip') or prior_log_entry.get('c-ip') or prior_log_entry.get('remote_addr') or prior_log_entry.get('source_ip') # 檢查 IP 是否匹配 if prior_ip == target_ip: # 插入到最前面,保持時間順序 correlated_logs.insert(0, formatted_logs[j]) # 限制累積的 Log 數量(不包含當前 Log) if len(correlated_logs) >= WINDOW_SIZE - 1: break # 1. 加入相關聯的 Log (時間較早的) for j, log_str in enumerate(correlated_logs): # log_idx 是這些 Log 在 logs_list 中的原始索引 (不完全準確,但提供參考) sequence_text.append(f"--- Correlated Log Index (IP:{target_ip}) ---\n{log_str}") else: # 如果沒有找到 IP,只分析當前 Log (確保 sequence_text 不是空的) st.warning(f"Log #{i+1} 找不到 IP 欄位 ({target_ip}),僅分析當前 Log 條目。") # 2. 加入當前的目標 Log sequence_text.append(f"--- TARGET LOG TO ANALYZE (Index {i+1}) ---\n{current_log_str}") analysis_sequences.append({ "sequence_text": "\n\n".join(sequence_text), "target_log_id": i + 1, "original_log_entry": logs_list[i] }) # --- LLM 執行迴圈 --- total_sequences = len(analysis_sequences) st.header(f"⚡ 批量分析執行中 (IP 關聯視窗 $N={WINDOW_SIZE}$)...") progress_bar = st.progress(0, text=f"準備處理 {total_sequences} 個序列...") results_container = st.container() for i, seq_data in enumerate(analysis_sequences): log_id = seq_data["target_log_id"] progress_bar.progress((i + 1) / total_sequences, text=f"Processing {i + 1}/{total_sequences} (Log #{log_id})...") try: response, retrieved_ctx = generate_rag_response_hf_for_log( client=inference_client, model_id=MODEL_ID, log_sequence_text=seq_data["sequence_text"], user_prompt=analysis_prompt, sys_prompt=system_prompt, vector_store=vs, threshold=similarity_threshold, max_output_tokens=max_output_tokens, temperature=temperature, top_p=top_p ) item = { "log_id": log_id, "log_content": seq_data["original_log_entry"], "sequence_analyzed": seq_data["sequence_text"], "analysis_result": response, "context": retrieved_ctx } st.session_state.batch_results.append(item) csv_string = pd.json_normalize(item) csv_string = csv_string.to_csv(header=True, index=False, encoding='utf-8') with results_container: # 呈現 LLM 分析結果 is_high = any(x in response.lower() for x in ['high-risk detected!']) is_medium = any(x in response.lower() for x in ['medium-risk detected!']) if is_high: b64 = base64.b64encode(csv_string.encode('utf-8')) st.markdown( f"""