Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| from transformers import pipeline | |
| import re | |
| import time | |
| # تحميل النموذج | |
| classifier = pipeline("zero-shot-classification", model="cross-encoder/nli-distilroberta-base") | |
| # عنوان التطبيق | |
| st.title("Keyword & URL Analysis App") | |
| # اختيار العملية | |
| operation = st.radio("Choose an operation:", ["Filter Keywords", "Analyze URLs"]) | |
| # إدخال الملف النصي | |
| uploaded_file = st.file_uploader("Upload a text file", type=["txt"]) | |
| if uploaded_file is not None: | |
| # قراءة الملف النصي | |
| content = uploaded_file.read().decode("utf-8") | |
| items = [line.strip() for line in content.splitlines() if line.strip()] | |
| # تحديد الفئات | |
| categories = ["shop", "game", "stream"] | |
| # قوائم لتخزين النتائج (Keywords) | |
| shopping_items = [] | |
| gaming_items = [] | |
| streaming_items = [] | |
| unknown_items = [] | |
| # قوائم لتحليل الروابط | |
| parameters = [] | |
| domains = [] | |
| full_page_types = [] | |
| file_extensions = [] | |
| # متغيرات للتحكم في العملية | |
| progress_bar = st.progress(0) | |
| pause_button = st.button("Pause") | |
| stop_button = st.button("Stop") | |
| continue_button = st.button("Continue") | |
| paused = False | |
| stopped = False | |
| current_index = 0 # مؤشر للعنصر الحالي | |
| batch_size = 10 # عدد العناصر التي يتم معالجتها في الدفعة الواحدة | |
| # دالة تصنيف الكلمات المفتاحية باستخدام الدفعات | |
| def classify_keywords_batch(items, categories, start_index=0): | |
| global paused, stopped, current_index | |
| total_items = len(items) | |
| for i in range(start_index, total_items, batch_size): | |
| if stopped: | |
| break | |
| if paused: | |
| time.sleep(0.5) | |
| continue | |
| # معالجة دفعة من العناصر | |
| batch = items[i:i + batch_size] | |
| results = classifier(batch, categories) | |
| for j, result in enumerate(results): | |
| best_category = result['labels'][0] | |
| score = result['scores'][0] | |
| if best_category == "shop" and score > 0.5: | |
| shopping_items.append(batch[j]) | |
| elif best_category == "game" and score > 0.5: | |
| gaming_items.append(batch[j]) | |
| elif best_category == "stream" and score > 0.5: | |
| streaming_items.append(batch[j]) | |
| else: | |
| unknown_items.append(batch[j]) | |
| # تحديث المؤشر الحالي | |
| current_index = min(i + batch_size, total_items) | |
| # تحديث شريط التقدم | |
| progress = min((current_index) / total_items, 1.0) | |
| progress_bar.progress(progress) | |
| # تحديث النتائج في الوقت الحقيقي | |
| update_results() | |
| # إبطاء العملية قليلاً للسماح بتحديث الواجهة | |
| time.sleep(0.1) | |
| # دالة تحليل الروابط | |
| def analyze_urls(urls, start_index=0): | |
| global paused, stopped, current_index | |
| total_items = len(urls) | |
| for i in range(start_index, total_items, batch_size): | |
| if stopped: | |
| break | |
| if paused: | |
| time.sleep(0.5) | |
| continue | |
| # معالجة دفعة من الروابط | |
| batch = urls[i:i + batch_size] | |
| for url in batch: | |
| # استخراج الباراميترات بدون '=' | |
| params = re.findall(r'(\w+)=', url) | |
| parameters.extend(params) | |
| # استخراج الدومينات فقط (مثل .com, .org) | |
| domain_match = re.search(r'\.([a-zA-Z]{2,})$', url) | |
| if domain_match: | |
| domain = domain_match.group(1) | |
| if domain not in domains: | |
| domains.append(domain) | |
| # استخراج أنماط الصفحات الكاملة (مثل product_detail.php?, index.php?) | |
| page_type_match = re.search(r'(\w+\.[a-z]+)\?', url) | |
| if page_type_match: | |
| page_type = page_type_match.group(1) | |
| if page_type not in full_page_types: | |
| full_page_types.append(page_type) | |
| # استخراج الصيغ (مثل php, phtml, asp) بدون باقي الرابط | |
| extension_match = re.search(r'\.([a-z]+)(\?|$)', url) | |
| if extension_match: | |
| extension = extension_match.group(1) | |
| if extension not in file_extensions: | |
| file_extensions.append(extension) | |
| # تحديث المؤشر الحالي | |
| current_index = min(i + batch_size, total_items) | |
| # تحديث شريط التقدم | |
| progress = min((current_index) / total_items, 1.0) | |
| progress_bar.progress(progress) | |
| # تحديث النتائج في الوقت الحقيقي | |
| update_results() | |
| # إبطاء العملية قليلاً للسماح بتحديث الواجهة | |
| time.sleep(0.1) | |
| # دالة تحديث النتائج | |
| def update_results(): | |
| # تحديث محتوى المربعات النصية للكلمات المفتاحية | |
| st.session_state.shopping_text = "\n".join(shopping_items) | |
| st.session_state.gaming_text = "\n".join(gaming_items) | |
| st.session_state.streaming_text = "\n".join(streaming_items) | |
| st.session_state.unknown_text = "\n".join(unknown_items) | |
| # تحديث محتوى المربعات الخاصة بالروابط | |
| st.session_state.parameters = "\n".join(set(parameters)) | |
| st.session_state.domains = "\n".join(domains) | |
| st.session_state.full_page_types = "\n".join(full_page_types) | |
| st.session_state.file_extensions = "\n".join(file_extensions) | |
| # دالة تصدير النتائج | |
| def export_results(key, filename): | |
| with open(filename, "w") as f: | |
| f.write(st.session_state[key]) | |
| st.success(f"Results exported to {filename}") | |
| # زر البدء | |
| if st.button("Start"): | |
| stopped = False | |
| paused = False | |
| current_index = 0 | |
| if operation == "Filter Keywords": | |
| classify_keywords_batch(items, categories, start_index=current_index) | |
| elif operation == "Analyze URLs": | |
| analyze_urls(items, start_index=current_index) | |
| # زر الإيقاف المؤقت | |
| if pause_button: | |
| paused = True | |
| st.write("Process paused.") | |
| # زر الاستمرار | |
| if continue_button and paused: | |
| paused = False | |
| st.write("Process resumed.") | |
| if operation == "Filter Keywords": | |
| classify_keywords_batch(items, categories, start_index=current_index) | |
| elif operation == "Analyze URLs": | |
| analyze_urls(items, start_index=current_index) | |
| # زر التوقف الكامل | |
| if stop_button: | |
| stopped = True | |
| st.write("Process stopped.") | |
| # عرض النتائج بناءً على الخيار المختار | |
| if operation == "Filter Keywords": | |
| # عرض النتائج للكلمات المفتاحية | |
| st.header("Shopping Keywords") | |
| if 'shopping_text' not in st.session_state: | |
| st.session_state.shopping_text = "" | |
| st.text_area("Copy the shopping keywords here:", value=st.session_state.shopping_text, height=200, key="shopping") | |
| st.button("Export Shopping Keywords", on_click=export_results, args=("shopping_text", "shopping_keywords.txt")) | |
| st.header("Gaming Keywords") | |
| if 'gaming_text' not in st.session_state: | |
| st.session_state.gaming_text = "" | |
| st.text_area("Copy the gaming keywords here:", value=st.session_state.gaming_text, height=200, key="gaming") | |
| st.button("Export Gaming Keywords", on_click=export_results, args=("gaming_text", "gaming_keywords.txt")) | |
| st.header("Streaming Keywords") | |
| if 'streaming_text' not in st.session_state: | |
| st.session_state.streaming_text = "" | |
| st.text_area("Copy the streaming keywords here:", value=st.session_state.streaming_text, height=200, key="streaming") | |
| st.button("Export Streaming Keywords", on_click=export_results, args=("streaming_text", "streaming_keywords.txt")) | |
| st.header("Unknown Keywords") | |
| if 'unknown_text' not in st.session_state: | |
| st.session_state.unknown_text = "" | |
| st.text_area("Copy the unknown keywords here:", value=st.session_state.unknown_text, height=200, key="unknown") | |
| st.button("Export Unknown Keywords", on_click=export_results, args=("unknown_text", "unknown_keywords.txt")) | |
| elif operation == "Analyze URLs": | |
| # عرض النتائج للروابط | |
| st.header("Parameters") | |
| if 'parameters' not in st.session_state: | |
| st.session_state.parameters = "" | |
| st.text_area("Copy the parameters here:", value=st.session_state.parameters, height=200, key="parameters") | |
| st.button("Export Parameters", on_click=export_results, args=("parameters", "parameters.txt")) | |
| st.header("Domains") | |
| if 'domains' not in st.session_state: | |
| st.session_state.domains = "" | |
| st.text_area("Copy the domains here:", value=st.session_state.domains, height=200, key="domains") | |
| st.button("Export Domains", on_click=export_results, args=("domains", "domains.txt")) | |
| st.header("Full PageType") | |
| if 'full_page_types' not in st.session_state: | |
| st.session_state.full_page_types = "" | |
| st.text_area("Copy the full page types here:", value=st.session_state.full_page_types, height=200, key="full_page_types") | |
| st.button("Export Full PageTypes", on_click=export_results, args=("full_page_types", "full_page_types.txt")) | |
| st.header("File Extensions") | |
| if 'file_extensions' not in st.session_state: | |
| st.session_state.file_extensions = "" | |
| st.text_area("Copy the file extensions here:", value=st.session_state.file_extensions, height=200, key="file_extensions") | |
| st.button("Export File Extensions", on_click=export_results, args=("file_extensions", "file_extensions.txt")) | |
| else: | |
| st.warning("Please upload a text file to start analysis.") |