# src/app.py — Iran Legal Information Dashboard (Hugging Face Spaces deployment)
#!/usr/bin/env python3
"""
Iran Legal Information Dashboard - Hugging Face Spaces Version
============================================================
Complete Legal Document Management System with OCR, AI Analysis, and Automated Web Scraping
Optimized for Hugging Face Spaces deployment
"""
import asyncio
import hashlib
import io
import json
import logging
import os
import re
import sqlite3
import tempfile
import time
import warnings
from contextlib import contextmanager
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from urllib.parse import urlparse, urljoin

import cv2
import fitz  # PyMuPDF
import numpy as np
import pandas as pd
import plotly.express as px
import requests
import streamlit as st
from bs4 import BeautifulSoup
from PIL import Image
from sklearn.feature_extraction.text import TfidfVectorizer
# Silence noisy third-party warnings (sklearn, PyMuPDF, etc.).
warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Page configuration — st.set_page_config must be the first Streamlit call in the script.
st.set_page_config(
    page_title="داشبورد اطلاعات حقوقی ایران",
    page_icon="⚖️",
    layout="wide",
    initial_sidebar_state="expanded"
)
# Enhanced CSS with Persian Typography and Fixed Sidebar
def load_css():
    """Inject the app-wide CSS (Persian fonts, RTL layout, fixed gradient sidebar,
    metric/feature cards and responsive breakpoints) into the Streamlit page.

    Must be called once per rerun, before any widgets are rendered, so every
    element picks up the custom classes used by the page functions below.
    NOTE(review): the `.css-1d391kg` selectors target Streamlit's generated
    class names, which can change between Streamlit versions — verify on upgrade.
    """
    st.markdown("""
<style>
@import url('https://fonts.googleapis.com/css2?family=Vazirmatn:wght@300;400;500;600;700;800&display=swap');
@import url('https://fonts.googleapis.com/css2?family=Yekan+Bakh:wght@300;400;500;600;700&display=swap');
:root {
--primary-gradient: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
--secondary-gradient: linear-gradient(135deg, #f8fafc 0%, #e2e8f0 100%);
--text-primary: #1a202c;
--text-secondary: #4a5568;
--text-light: #718096;
--white: #ffffff;
--shadow-light: 0 4px 6px rgba(0, 0, 0, 0.05);
--shadow-medium: 0 10px 25px rgba(0, 0, 0, 0.1);
--shadow-heavy: 0 20px 40px rgba(102, 126, 234, 0.2);
--border-radius: 16px;
--transition: all 0.3s cubic-bezier(0.4, 0, 0.2, 1);
--sidebar-width: 280px;
}
* {
font-family: 'Vazirmatn', 'Yekan Bakh', 'Tahoma', 'Arial', sans-serif !important;
font-feature-settings: "kern" 1, "liga" 1;
text-rendering: optimizeLegibility;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
.main {
direction: rtl;
text-align: right;
background: var(--secondary-gradient);
min-height: 100vh;
padding: 1.5rem 1.5rem 1.5rem var(--sidebar-width);
line-height: 1.8;
font-weight: 400;
transition: var(--transition);
}
.css-1d391kg {
position: fixed !important;
top: 0;
left: 0;
height: 100vh;
width: var(--sidebar-width);
background: var(--primary-gradient) !important;
z-index: 999999;
overflow-y: auto;
box-shadow: 4px 0 20px rgba(0, 0, 0, 0.15);
border-right: 1px solid rgba(255, 255, 255, 0.1);
backdrop-filter: blur(20px);
}
.css-1d391kg .css-17eq0hr {
background: transparent !important;
padding: 0 !important;
}
.sidebar .sidebar-content {
background: transparent !important;
color: white !important;
padding: 2rem 1.5rem;
height: 100%;
overflow-y: auto;
}
.css-1d391kg .stRadio > div {
background: transparent !important;
}
.css-1d391kg .stRadio label {
background: rgba(255, 255, 255, 0.1) !important;
color: white !important;
border: 1px solid rgba(255, 255, 255, 0.2) !important;
border-radius: 12px !important;
padding: 0.8rem 1.2rem !important;
margin: 0.3rem 0 !important;
font-weight: 500 !important;
font-size: 0.95rem !important;
text-align: right !important;
direction: rtl !important;
transition: all 0.3s ease !important;
backdrop-filter: blur(10px) !important;
cursor: pointer !important;
display: block !important;
width: 100% !important;
}
.css-1d391kg .stRadio label:hover {
background: rgba(255, 255, 255, 0.2) !important;
transform: translateX(-5px) !important;
box-shadow: 0 5px 15px rgba(0, 0, 0, 0.2) !important;
}
.css-1d391kg .stRadio input[type="radio"]:checked + label {
background: rgba(255, 255, 255, 0.25) !important;
border-color: rgba(255, 255, 255, 0.4) !important;
box-shadow: 0 0 0 2px rgba(255, 255, 255, 0.3) !important;
font-weight: 600 !important;
}
.css-1d391kg .stRadio input[type="radio"] {
display: none !important;
}
.css-1d391kg .stMarkdown, .css-1d391kg h1, .css-1d391kg h2, .css-1d391kg h3,
.css-1d391kg p, .css-1d391kg div {
color: white !important;
text-align: right !important;
direction: rtl !important;
}
.css-1d391kg .stMarkdown h3 {
font-size: 1.1rem !important;
font-weight: 600 !important;
margin: 1.5rem 0 1rem 0 !important;
text-align: center !important;
border-bottom: 2px solid rgba(255, 255, 255, 0.2) !important;
padding-bottom: 0.8rem !important;
}
.sidebar-header {
text-align: center;
padding: 1.5rem 1rem;
border-bottom: 2px solid rgba(255, 255, 255, 0.15);
margin-bottom: 2rem;
}
.sidebar-header h2 {
color: white !important;
font-size: 1.4rem !important;
font-weight: 700 !important;
margin: 0 !important;
line-height: 1.4 !important;
text-shadow: 2px 2px 4px rgba(0, 0, 0, 0.3);
}
.sidebar-header p {
color: rgba(255, 255, 255, 0.8) !important;
font-size: 0.85rem !important;
margin: 0.5rem 0 0 0 !important;
font-weight: 400 !important;
}
.sidebar-stats {
background: rgba(255, 255, 255, 0.1);
border-radius: 12px;
padding: 1.5rem;
margin: 2rem 0;
backdrop-filter: blur(10px);
border: 1px solid rgba(255, 255, 255, 0.2);
}
.sidebar-stats h3 {
color: white !important;
font-size: 1.1rem !important;
font-weight: 600 !important;
margin-bottom: 1rem !important;
text-align: center !important;
}
.stat-item {
display: flex;
justify-content: space-between;
align-items: center;
padding: 0.5rem 0;
border-bottom: 1px solid rgba(255, 255, 255, 0.1);
direction: rtl;
}
.stat-item:last-child {
border-bottom: none;
}
.stat-label {
color: rgba(255, 255, 255, 0.8) !important;
font-size: 0.9rem;
font-weight: 400;
}
.stat-value {
color: white !important;
font-size: 1rem;
font-weight: 600;
}
.main-header {
background: var(--primary-gradient);
padding: 3rem 2.5rem;
border-radius: var(--border-radius);
color: var(--white);
margin-bottom: 2.5rem;
text-align: center;
box-shadow: var(--shadow-heavy);
position: relative;
overflow: hidden;
}
.main-header::before {
content: '';
position: absolute;
top: -50%;
left: -50%;
width: 200%;
height: 200%;
background: linear-gradient(45deg, transparent, rgba(255,255,255,0.1), transparent);
transform: rotate(45deg);
animation: shimmer 4s infinite;
}
@keyframes shimmer {
0% { transform: translateX(-100%) translateY(-100%) rotate(45deg); }
100% { transform: translateX(100%) translateY(100%) rotate(45deg); }
}
.main-header h1 {
font-family: 'Yekan Bakh', 'Vazirmatn', sans-serif !important;
font-size: 2.5rem;
margin-bottom: 1rem;
font-weight: 700;
text-shadow: 2px 2px 4px rgba(0,0,0,0.3);
line-height: 1.3;
}
.main-header p {
font-size: 1.2rem;
opacity: 0.9;
margin: 0;
line-height: 1.6;
font-weight: 400;
}
h1, h2, h3, h4, h5, h6 {
font-family: 'Yekan Bakh', 'Vazirmatn', sans-serif !important;
font-weight: 600;
line-height: 1.4;
color: var(--text-primary);
margin-bottom: 1.2rem;
}
h1 { font-size: 2.2rem; font-weight: 700; }
h2 { font-size: 1.8rem; font-weight: 600; }
h3 { font-size: 1.5rem; font-weight: 600; }
h4 { font-size: 1.3rem; font-weight: 500; }
p, div, span, li {
font-family: 'Vazirmatn', sans-serif !important;
line-height: 1.8;
color: var(--text-secondary);
font-weight: 400;
}
strong, b {
font-weight: 600 !important;
color: var(--text-primary);
}
.metric-card {
background: var(--primary-gradient);
padding: 2.5rem 2rem;
border-radius: var(--border-radius);
color: var(--white);
text-align: center;
margin: 1rem 0;
box-shadow: var(--shadow-medium);
transition: var(--transition);
position: relative;
overflow: hidden;
border: 1px solid rgba(255, 255, 255, 0.1);
}
.metric-card::before {
content: '';
position: absolute;
top: 0;
left: -100%;
width: 100%;
height: 100%;
background: linear-gradient(90deg, transparent, rgba(255,255,255,0.2), transparent);
transition: left 0.6s ease;
}
.metric-card:hover::before {
left: 100%;
}
.metric-card:hover {
transform: translateY(-8px) scale(1.02);
box-shadow: 0 20px 40px rgba(102, 126, 234, 0.3);
}
.metric-value {
font-family: 'Yekan Bakh', sans-serif !important;
font-size: 3rem;
font-weight: 800;
margin: 0.8rem 0;
text-shadow: 2px 2px 4px rgba(0,0,0,0.3);
line-height: 1;
}
.metric-label {
font-size: 1rem;
opacity: 0.9;
font-weight: 500;
letter-spacing: 0.5px;
line-height: 1.4;
}
.metric-subtitle {
font-size: 0.85rem;
opacity: 0.7;
font-weight: 400;
margin-top: 0.5rem;
}
.feature-card {
background: rgba(255, 255, 255, 0.95);
backdrop-filter: blur(20px);
padding: 2.5rem;
border-radius: var(--border-radius);
box-shadow: var(--shadow-light);
margin: 2rem 0;
border-right: 4px solid #667eea;
transition: var(--transition);
position: relative;
border: 1px solid rgba(102, 126, 234, 0.1);
}
.feature-card:hover {
transform: translateY(-5px);
box-shadow: var(--shadow-medium);
border-right-width: 6px;
}
.feature-card::before {
content: '';
position: absolute;
top: 0;
left: 0;
right: 0;
height: 3px;
background: var(--primary-gradient);
border-radius: var(--border-radius) var(--border-radius) 0 0;
}
.status-indicator {
display: inline-flex;
align-items: center;
padding: 0.5rem 1.2rem;
border-radius: 25px;
font-size: 0.9rem;
font-weight: 500;
margin: 0.4rem 0.3rem;
box-shadow: var(--shadow-light);
transition: var(--transition);
font-family: 'Vazirmatn', sans-serif !important;
}
.status-indicator:hover {
transform: translateY(-2px);
box-shadow: var(--shadow-medium);
}
.status-success { background: #10b981; color: white; }
.status-warning { background: #f59e0b; color: white; }
.status-info { background: #3b82f6; color: white; }
.status-error { background: #ef4444; color: white; }
.stButton > button {
background: var(--primary-gradient) !important;
color: white !important;
border: none !important;
border-radius: 12px !important;
padding: 0.8rem 2.5rem !important;
font-weight: 600 !important;
font-family: 'Vazirmatn', sans-serif !important;
font-size: 1rem !important;
transition: var(--transition) !important;
box-shadow: var(--shadow-light) !important;
line-height: 1.4 !important;
text-align: center !important;
}
.stButton > button:hover {
transform: translateY(-3px) !important;
box-shadow: var(--shadow-medium) !important;
}
.stButton > button:active {
transform: translateY(-1px) !important;
}
@media (max-width: 1024px) {
:root { --sidebar-width: 250px; }
.main { padding-left: var(--sidebar-width); }
.css-1d391kg { width: var(--sidebar-width); }
}
@media (max-width: 768px) {
:root { --sidebar-width: 0px; }
.main { padding-left: 1rem !important; padding-right: 1rem !important; }
.css-1d391kg { position: relative !important; width: 100% !important; height: auto !important; margin-bottom: 1rem; }
.main-header h1 { font-size: 1.8rem; }
.main-header p { font-size: 1rem; }
.metric-value { font-size: 2.2rem; }
}
#MainMenu { visibility: hidden; }
footer { visibility: hidden; }
header { visibility: hidden; }
.stDeployButton { display: none; }
::-webkit-scrollbar {
width: 8px;
height: 8px;
}
::-webkit-scrollbar-track {
background: rgba(255, 255, 255, 0.1);
border-radius: 4px;
}
::-webkit-scrollbar-thumb {
background: rgba(255, 255, 255, 0.3);
border-radius: 4px;
}
::-webkit-scrollbar-thumb:hover {
background: rgba(255, 255, 255, 0.5);
}
.stSpinner > div {
border-top-color: #667eea !important;
}
.stAlert {
direction: rtl !important;
text-align: right !important;
font-family: 'Vazirmatn', sans-serif !important;
border-radius: 12px !important;
font-weight: 500 !important;
}
.sidebar-footer {
position: absolute;
bottom: 1rem;
left: 1rem;
right: 1rem;
text-align: center;
padding: 1rem;
background: rgba(255, 255, 255, 0.1);
border-radius: 12px;
backdrop-filter: blur(10px);
}
.sidebar-footer p {
color: rgba(255, 255, 255, 0.7) !important;
font-size: 0.8rem !important;
margin: 0 !important;
line-height: 1.4 !important;
}
</style>
""", unsafe_allow_html=True)
# Database Manager
class DatabaseManager:
    """SQLite persistence layer for legal documents and scraped web content.

    Creates the schema on first use, seeds an empty database with sample rows,
    and falls back to a shared in-memory database when the configured path is
    not usable (e.g. a read-only container filesystem).
    """

    def __init__(self, db_path: str = None):
        """Open (or create) the database at *db_path*, defaulting to /tmp."""
        self.logger = logging.getLogger(__name__)
        self.db_path = db_path or "/tmp/iran_legal.db"
        # Persistent connection used only for the :memory: fallback.  Every
        # sqlite3.connect(":memory:") call creates a *fresh, empty* database,
        # so the fallback must keep a single connection alive (see
        # initialize_database / get_connection).
        self._memory_conn = None
        self.logger.info(f"🗄️ Using database path: {self.db_path}")
        self.initialize_database()

    def _find_writable_path(self, paths):
        """Return the first path in *paths* that is writable, else a /tmp fallback.

        Probes each candidate by writing and removing a small test file.
        """
        for path in paths:
            try:
                directory = os.path.dirname(path)
                if directory and not os.path.exists(directory):
                    os.makedirs(directory, exist_ok=True)
                test_file = path + ".test"
                with open(test_file, 'w') as f:
                    f.write("test")
                os.remove(test_file)
                self.logger.info(f"✅ Found writable path: {path}")
                return path
            except (OSError, PermissionError) as e:
                self.logger.warning(f"⚠️ Cannot write to {path}: {e}")
        fallback_path = f"/tmp/iran_legal_{int(time.time())}.db"
        self.logger.warning(f"⚠️ Using fallback path: {fallback_path}")
        return fallback_path

    def initialize_database(self):
        """Create tables (and sample data when empty); fall back to :memory: on failure."""
        try:
            with sqlite3.connect(self.db_path, timeout=30.0) as conn:
                # WAL improves concurrent read behaviour for the Streamlit reruns.
                conn.execute("PRAGMA journal_mode=WAL;")
                conn.execute("PRAGMA foreign_keys = ON;")
                cursor = conn.cursor()
                cursor.execute("SELECT sqlite_version();")
                version = cursor.fetchone()[0]
                self.logger.info(f"📊 SQLite version: {version}")
                self._create_tables(conn)
                cursor.execute("SELECT COUNT(*) FROM documents")
                if cursor.fetchone()[0] == 0:
                    self._add_sample_data(conn)
                    self.logger.info("📝 Added sample data to empty database")
        except Exception as e:
            self.logger.error(f"❌ Database initialization failed: {e}")
            # Fix: the original set db_path=":memory:" and reconnected per
            # query, but each new connection to ":memory:" is an empty
            # database with no tables — every later query failed.  Keep one
            # live connection and reuse it from get_connection() instead.
            self.db_path = ":memory:"
            self._memory_conn = sqlite3.connect(self.db_path, check_same_thread=False)
            self._memory_conn.row_factory = sqlite3.Row
            self._create_tables(self._memory_conn)
            self._add_minimal_sample_data(self._memory_conn)
            self.logger.info("🆘 Created fallback in-memory database")

    def _create_tables(self, conn):
        """Create the documents / scraped_items tables and their indexes (idempotent)."""
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS documents (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                title TEXT NOT NULL,
                content TEXT NOT NULL,
                source TEXT,
                category TEXT,
                ai_score REAL DEFAULT 0.0,
                ocr_confidence REAL DEFAULT 0.0,
                file_size INTEGER DEFAULT 0,
                mime_type TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                language TEXT DEFAULT 'fa',
                keywords TEXT
            )
        """)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS scraped_items (
                id TEXT PRIMARY KEY,
                url TEXT NOT NULL,
                title TEXT,
                content TEXT,
                domain TEXT,
                rating_score REAL DEFAULT 0.0,
                word_count INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                status TEXT DEFAULT 'completed'
            )
        """)
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_docs_category ON documents(category);")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_docs_score ON documents(ai_score);")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_scraped_domain ON scraped_items(domain);")
        conn.commit()

    def _add_sample_data(self, conn):
        """Seed an empty database with three sample documents and one scraped item."""
        sample_documents = [
            {
                'title': 'قانون اساسی جمهوری اسلامی ایران - اصول کلی',
                'content': 'فصل اول - اصول کلی\nاصل یکم: حکومت ایران، جمهوری اسلامی است...',
                'source': 'قانون اساسی ج.ا.ایران',
                'category': 'قانون',
                'ai_score': 0.95,
                'keywords': json.dumps(['قانون اساسی', 'جمهوری اسلامی', 'حاکمیت']),
                'file_size': 2450,
                'language': 'fa',
                'mime_type': 'text/plain'
            },
            {
                'title': 'قانون مجازات اسلامی - مقدمات',
                'content': 'باب اول - احکام عمومی\nماده ۱- مجازات‌ها به اعتبار کیفیت به سه دسته تقسیم می‌شوند...',
                'source': 'قانون مجازات اسلامی',
                'category': 'قانون',
                'ai_score': 0.88,
                'keywords': json.dumps(['مجازات', 'حدود', 'قصاص', 'تعزیرات']),
                'file_size': 1850,
                'language': 'fa',
                'mime_type': 'text/plain'
            },
            {
                'title': 'نمونه قرارداد خرید و فروش',
                'content': 'قرارداد خرید و فروش\nطرفین قرارداد:\nفروشنده:...\nخریدار:...',
                'source': 'نمونه قرارداد',
                'category': 'قرارداد',
                'ai_score': 0.75,
                'keywords': json.dumps(['قرارداد', 'خرید', 'فروش', 'طرفین']),
                'file_size': 1200,
                'language': 'fa',
                'mime_type': 'text/plain'
            }
        ]
        for doc in sample_documents:
            conn.execute("""
                INSERT INTO documents (title, content, source, category, ai_score, keywords, file_size, language, mime_type)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                doc['title'], doc['content'], doc['source'], doc['category'],
                doc['ai_score'], doc['keywords'], doc['file_size'], doc['language'], doc['mime_type']
            ))
        sample_scraped = [
            {
                'id': 'sample_hf_001',
                'url': 'https://dastour.ir/sample',
                'title': 'نمونه محتوای قانونی',
                'content': 'این یک نمونه محتوای قانونی است که از وب‌سایت‌های معتبر جمع‌آوری شده است.',
                'domain': 'dastour.ir',
                'rating_score': 0.85,
                'word_count': 25,
                'status': 'completed'
            }
        ]
        for item in sample_scraped:
            conn.execute("""
                INSERT INTO scraped_items (id, url, title, content, domain, rating_score, word_count, status)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                item['id'], item['url'], item['title'], item['content'],
                item['domain'], item['rating_score'], item['word_count'], item['status']
            ))
        conn.commit()

    def _add_minimal_sample_data(self, conn):
        """Insert a single placeholder document (used for the in-memory fallback)."""
        conn.execute("""
            INSERT INTO documents (title, content, source, category, ai_score, keywords, file_size, language, mime_type)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            'نمونه سند قانونی',
            'این یک نمونه سند قانونی برای نمایش عملکرد سیستم است.',
            'نمونه',
            'قانون',
            0.5,
            json.dumps(['نمونه', 'قانون']),
            100,
            'fa',
            'text/plain'
        ))
        conn.commit()

    @contextmanager
    def get_connection(self):
        """Yield a connection with dict-style row access.

        File-backed connections are opened and closed per call; the shared
        in-memory fallback connection is reused and never closed here.
        """
        if self._memory_conn is not None:
            yield self._memory_conn
            return
        conn = None
        try:
            conn = sqlite3.connect(self.db_path, timeout=10.0)
            conn.row_factory = sqlite3.Row
            yield conn
        except Exception as e:
            self.logger.error(f"Database connection error: {e}")
            raise
        finally:
            if conn:
                conn.close()

    def get_statistics(self) -> Dict:
        """Return aggregate counts/averages; zeroed defaults when queries fail."""
        stats = {
            'total_documents': 0,
            'total_scraped': 0,
            'avg_ai_score': 0.0,
            'avg_rating': 0.0,
            'categories': {}
        }
        try:
            with self.get_connection() as conn:
                cursor = conn.execute("SELECT COUNT(*) FROM documents")
                stats['total_documents'] = cursor.fetchone()[0]
                # Averages exclude unscored (0.0) rows; AVG over no rows is NULL -> 0.0.
                cursor = conn.execute("SELECT AVG(ai_score) FROM documents WHERE ai_score > 0")
                stats['avg_ai_score'] = cursor.fetchone()[0] or 0.0
                cursor = conn.execute("SELECT COUNT(*) FROM scraped_items")
                stats['total_scraped'] = cursor.fetchone()[0]
                cursor = conn.execute("SELECT AVG(rating_score) FROM scraped_items WHERE rating_score > 0")
                stats['avg_rating'] = cursor.fetchone()[0] or 0.0
                cursor = conn.execute("SELECT category, COUNT(*) FROM documents WHERE category IS NOT NULL GROUP BY category")
                stats['categories'] = dict(cursor.fetchall())
        except Exception as e:
            self.logger.error(f"Error getting statistics: {e}")
        return stats

    def get_documents(self, limit: int = 100) -> List[Dict]:
        """Return up to *limit* documents, newest first; [] on error."""
        try:
            with self.get_connection() as conn:
                cursor = conn.execute("SELECT * FROM documents ORDER BY created_at DESC LIMIT ?", (limit,))
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            self.logger.error(f"Error getting documents: {e}")
            return []

    def get_scraped_items(self, limit: int = 100) -> List[Dict]:
        """Return up to *limit* scraped items, newest first; [] on error."""
        try:
            with self.get_connection() as conn:
                cursor = conn.execute("SELECT * FROM scraped_items ORDER BY created_at DESC LIMIT ?", (limit,))
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            self.logger.error(f"Error getting scraped items: {e}")
            return []

    def create_document(self, document_data: Dict[str, Any]) -> int:
        """Insert a document and return its new row id.

        Requires title/content/source/category/ai_score; other fields default.
        Raises on database errors (after logging).
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO documents (title, content, source, category, ai_score, ocr_confidence, file_size, mime_type, language, keywords)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    document_data['title'],
                    document_data['content'],
                    document_data['source'],
                    document_data['category'],
                    document_data['ai_score'],
                    document_data.get('ocr_confidence', 0.0),
                    document_data.get('file_size', 0),
                    document_data.get('mime_type', ''),
                    document_data.get('language', 'fa'),
                    json.dumps(document_data.get('keywords', []))
                ))
                document_id = cursor.lastrowid
                conn.commit()
                return document_id
        except Exception as e:
            self.logger.error(f"Error creating document: {e}")
            raise

    def create_scraped_item(self, item_data: Dict[str, Any]) -> str:
        """Insert a scraped item and return its (caller-supplied) id.

        Raises on database errors (e.g. duplicate primary key), after logging.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO scraped_items (id, url, title, content, domain, rating_score, word_count, status)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    item_data['id'],
                    item_data['url'],
                    item_data['title'],
                    item_data['content'],
                    item_data['domain'],
                    item_data['rating_score'],
                    item_data['word_count'],
                    item_data['status']
                ))
                conn.commit()
                return item_data['id']
        except Exception as e:
            self.logger.error(f"Error creating scraped item: {e}")
            raise
# OCR Pipeline
class OCRPipeline:
    """Tesseract-based OCR for PDF documents (Persian + English).

    pytesseract is imported lazily in initialize() so the app can start even
    before Tesseract is available; extract_text_from_pdf() reports failures
    through its result dict rather than raising.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        # --oem 3: default engine mode; --psm 6: assume a uniform text block;
        # -l fas+eng: Persian plus English language models.
        self.tesseract_config = r'--oem 3 --psm 6 -l fas+eng'
        self.initialized = False
        self.ocr_engine = None

    def initialize(self):
        """Import pytesseract on first use; raises if the import fails."""
        if not self.initialized:
            try:
                import pytesseract
                self.ocr_engine = pytesseract
                self.initialized = True
                self.logger.info("✅ OCR pipeline initialized successfully")
            except Exception as e:
                self.logger.error(f"❌ OCR initialization failed: {e}")
                raise

    def extract_text_from_pdf(self, pdf_path: str) -> Dict[str, Any]:
        """Run OCR over every page of the PDF at *pdf_path*.

        Returns a dict with 'success'; on success also 'extracted_text',
        'confidence', 'processing_time', 'page_count' and 'language_detected',
        otherwise 'error_message'.
        """
        doc = None
        try:
            # Fix: the original dereferenced self.ocr_engine (None) when
            # initialize() had not been called first — initialize lazily here.
            self.initialize()
            start_time = time.time()
            doc = fitz.open(pdf_path)
            extracted_text = []
            total_confidence = 0.0
            page_count = len(doc)
            for page in doc:
                # Render at 2x scale for better OCR accuracy.
                pix = page.get_pixmap(matrix=fitz.Matrix(2, 2))
                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                img_array = np.array(img)
                # Binarize: grayscale + fixed threshold before feeding Tesseract.
                gray = cv2.cvtColor(img_array, cv2.COLOR_RGB2GRAY)
                _, thresh = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY)
                text = self.ocr_engine.image_to_string(thresh, config=self.tesseract_config)
                extracted_text.append(text.strip())
                confidence = 0.9  # Placeholder for actual OCR confidence
                total_confidence += confidence
            processing_time = time.time() - start_time
            avg_confidence = total_confidence / page_count if page_count > 0 else 0.0
            return {
                'success': True,
                'extracted_text': '\n'.join(extracted_text),
                'confidence': avg_confidence,
                'processing_time': processing_time,
                'page_count': page_count,
                'language_detected': 'fa'  # assumed Persian; no detection is performed
            }
        except Exception as e:
            self.logger.error(f"Error in OCR processing: {e}")
            return {
                'success': False,
                'error_message': str(e)
            }
        finally:
            # Fix: close the document even when OCR raises mid-page
            # (the original leaked the handle on any per-page exception).
            if doc is not None:
                doc.close()
# AI Analysis Engine
class AIAnalysisEngine:
    """Heuristic analyser for Persian legal text.

    Scores text quality, predicts a coarse category from keyword hits, and
    extracts representative keywords (TF-IDF with a frequency-based fallback).
    """

    def __init__(self):
        # Instance logger, consistent with DatabaseManager — also keeps the
        # keyword-extraction fallback self-contained (the original exception
        # handler depended on the module-level `logger`).
        self.logger = logging.getLogger(__name__)
        # Category name -> indicator terms used for both scoring and classification.
        self.legal_keywords = {
            'قانون': ['قانون', 'ماده', 'تبصره', 'بند', 'فصل', 'باب'],
            'قرارداد': ['قرارداد', 'عقد', 'طرفین', 'متعاهدین'],
            'حکم': ['حکم', 'رای', 'دادگاه', 'قاضی'],
            'اداری': ['اداره', 'سازمان', 'وزارت', 'دولت']
        }

    def analyze_text(self, text: str, title: str = "") -> Dict:
        """Return ai_score, category, keywords, word_count and char_count for *text*.

        Empty/None text yields a zeroed result with category 'نامشخص'.
        """
        if not text:
            return {'ai_score': 0.0, 'category': 'نامشخص', 'keywords': [], 'word_count': 0, 'char_count': 0}
        quality_score = self._calculate_quality_score(text)
        category = self._predict_category(text + " " + title)
        keywords = self._extract_keywords(text)
        return {
            'ai_score': quality_score,
            'category': category,
            'keywords': keywords,
            'word_count': len(text.split()),
            'char_count': len(text)
        }

    def _calculate_quality_score(self, text: str) -> float:
        """Score text in [0, 1] from length, legal-term density and Persian-character ratio."""
        score = 0.0
        word_count = len(text.split())
        # Length: 50-5000 words is the sweet spot; >= 20 gets partial credit.
        if 50 <= word_count <= 5000:
            score += 0.3
        elif word_count >= 20:
            score += 0.1
        # Legal vocabulary density (substring counts across every category's terms).
        legal_term_count = 0
        for category_terms in self.legal_keywords.values():
            for term in category_terms:
                legal_term_count += text.count(term)
        if legal_term_count >= 3:
            score += 0.4
        elif legal_term_count >= 1:
            score += 0.2
        # Share of Arabic-block (Persian) characters in the raw text.
        persian_ratio = len(re.findall(r'[\u0600-\u06FF]', text)) / max(len(text), 1)
        if persian_ratio > 0.5:
            score += 0.3
        return min(score, 1.0)

    def _predict_category(self, text: str) -> str:
        """Return the category with the most keyword hits; 'عمومی' when nothing matches."""
        text_lower = text.lower()
        category_scores = {}
        for category, keywords in self.legal_keywords.items():
            score = sum(text_lower.count(keyword) for keyword in keywords)
            category_scores[category] = score
        if category_scores:
            best_category = max(category_scores, key=category_scores.get)
            return best_category if category_scores[best_category] > 0 else 'عمومی'
        return 'عمومی'

    def _extract_keywords(self, text: str, max_keywords: int = 5) -> List[str]:
        """Extract up to *max_keywords* keywords: TF-IDF first, frequency fallback."""
        try:
            tfidf = TfidfVectorizer(max_features=max_keywords, stop_words=None)
            tfidf.fit_transform([text])
            return list(tfidf.get_feature_names_out())
        except Exception as e:
            self.logger.error(f"Error extracting keywords: {e}")
            # Fallback: most frequent Persian words of length >= 3.
            words = re.findall(r'[\u0600-\u06FF]{3,}', text)
            word_freq = {word: text.count(word) for word in set(words) if len(word) > 2}
            sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)
            return [word[0] for word in sorted_words[:max_keywords]]
# Automated Web Scraping Service
class ScrapingService:
    """Breadth-first crawler for Iranian legal websites.

    Fetches pages, scores them with AIAnalysisEngine, persists results via
    DatabaseManager, and only follows links whose host ends with one of the
    whitelisted legal domains.
    """

    def __init__(self, db: DatabaseManager, ai_engine: AIAnalysisEngine):
        self.db = db
        self.ai_engine = ai_engine
        self.logger = logging.getLogger(__name__)
        self.user_agent = "LegalDashboardBot/1.0 (+http://example.com)"
        self.legal_domains = ['.ir', '.gov.ir', '.org.ir', 'dastour.ir', 'qavanin.ir']  # Add more as needed

    async def crawl_website(self, start_url: str, max_pages: int = 10, delay: float = 1.0) -> List[Dict]:
        """Crawl from *start_url*, returning up to *max_pages* result dicts.

        Each successful page is stored in the database with status 'completed';
        failed fetches are returned (not stored) with status 'failed'.
        *delay* seconds of politeness sleep separate successive requests.

        Fix: the original used aiohttp, which is never imported anywhere in
        this module, so every crawl crashed with NameError.  The already
        imported `requests` is used instead, dispatched to the default
        executor so the event loop stays responsive.
        """
        loop = asyncio.get_running_loop()
        headers = {'User-Agent': self.user_agent}
        visited_urls = set()
        to_visit = [start_url]
        results = []
        while to_visit and len(results) < max_pages:
            url = to_visit.pop(0)
            if url in visited_urls:
                continue
            visited_urls.add(url)
            try:
                response = await loop.run_in_executor(
                    None, lambda: requests.get(url, headers=headers, timeout=30)
                )
                if response.status_code == 200:
                    soup = BeautifulSoup(response.text, 'html.parser')
                    title = soup.title.string if soup.title else "بدون عنوان"
                    # Concatenate paragraph text; skip pages with no prose.
                    content = ' '.join(p.get_text(strip=True) for p in soup.find_all('p'))
                    if not content.strip():
                        continue
                    analysis = self.ai_engine.analyze_text(content, title)
                    item = {
                        'id': hashlib.md5(url.encode()).hexdigest(),
                        'url': url,
                        'title': title[:100],
                        'content': content[:5000],
                        'domain': urlparse(url).netloc,
                        'rating_score': analysis['ai_score'],
                        'word_count': analysis['word_count'],
                        'status': 'completed'
                    }
                    self.db.create_scraped_item(item)
                    results.append(item)
                    # Enqueue in-scope links (whitelisted legal domains only).
                    links = [urljoin(url, a.get('href')) for a in soup.find_all('a', href=True)]
                    for link in links:
                        parsed_link = urlparse(link)
                        if parsed_link.netloc and any(parsed_link.netloc.endswith(domain) for domain in self.legal_domains):
                            if link not in visited_urls and link not in to_visit:
                                to_visit.append(link)
                # Politeness delay between requests.
                await asyncio.sleep(delay)
            except Exception as e:
                self.logger.error(f"Error scraping {url}: {e}")
                results.append({
                    'id': hashlib.md5(url.encode()).hexdigest(),
                    'url': url,
                    'title': 'خطا در استخراج',
                    'content': '',
                    'domain': urlparse(url).netloc,
                    'rating_score': 0.0,
                    'word_count': 0,
                    'status': 'failed'
                })
        return results
# UI Helper Functions
def show_status_message(message: str, status_type: str = "info"):
    """Render *message* as a colored status badge.

    *status_type* selects the CSS class: "success", "warning", "info" or "error".
    """
    badge_html = '<div class="status-indicator status-{}">{}</div>'.format(status_type, message)
    st.markdown(badge_html, unsafe_allow_html=True)
def create_metric_card(title: str, value: str, subtitle: str = ""):
    """Render a gradient metric card (styled by the .metric-card CSS classes).

    The optional *subtitle* row is only emitted when non-empty.
    """
    return st.markdown(f"""
    <div class="metric-card">
        <div class="metric-label">{title}</div>
        <div class="metric-value">{value}</div>
        {f'<div class="metric-subtitle">{subtitle}</div>' if subtitle else ''}
    </div>
    """, unsafe_allow_html=True)
# Initialize Services
@st.cache_resource
def initialize_services():
    """Build and cache the shared service singletons.

    Returns (DatabaseManager, AIAnalysisEngine, OCRPipeline, ScrapingService).
    Re-raises any construction failure after showing an error message.

    NOTE(review): @st.cache_resource means this body runs only when the cache
    is cold, so the status messages below appear on the first run only —
    confirm that is the intended UX.
    """
    try:
        db_manager = DatabaseManager()
        ai_engine = AIAnalysisEngine()
        ocr_pipeline = OCRPipeline()
        scraping_service = ScrapingService(db_manager, ai_engine)
        stats = db_manager.get_statistics()
        if stats['total_documents'] > 0:
            show_status_message(f"دیتابیس با موفقیت بارگذاری شد ({stats['total_documents']} سند)", "success")
        else:
            show_status_message("دیتابیس خالی است، داده‌های نمونه استفاده می‌شود", "warning")
        return db_manager, ai_engine, ocr_pipeline, scraping_service
    except Exception as e:
        logger.error(f"Error initializing services: {e}")
        show_status_message(f"خطا در راه‌اندازی سرویس‌ها: {str(e)}", "error")
        raise
# Page Functions
def show_dashboard(db: DatabaseManager, ai_engine: AIAnalysisEngine):
    """Render the main dashboard: header banner, four summary metrics and two charts.

    *ai_engine* is currently unused but kept for interface compatibility with main().
    """
    st.markdown("""
    <div class="main-header">
        <h1>⚖️ داشبورد اطلاعات حقوقی ایران</h1>
        <p>سامانه هوشمند مدیریت اسناد حقوقی با قابلیت OCR و استخراج خودکار اطلاعات</p>
    </div>
    """, unsafe_allow_html=True)
    stats = db.get_statistics()
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        create_metric_card("کل اسناد", str(stats['total_documents']), "تعداد اسناد ثبت‌شده")
    with col2:
        create_metric_card("میانگین امتیاز AI", f"{stats['avg_ai_score']:.2f}", "کیفیت محتوا")
    with col3:
        create_metric_card("محتوای استخراج‌شده", str(stats['total_scraped']), "از وب‌سایت‌ها")
    with col4:
        create_metric_card("دسته‌بندی‌ها", str(len(stats['categories'])), "انواع اسناد")
    col1, col2 = st.columns(2)
    with col1:
        st.subheader("📊 توزیع دسته‌بندی اسناد")
        categories = stats.get('categories', {})
        if categories:
            fig = px.pie(names=list(categories.keys()), values=list(categories.values()), title="توزیع اسناد بر اساس دسته‌بندی")
            fig.update_traces(textposition='inside', textinfo='percent+label')
            fig.update_layout(template="plotly_white")
            st.plotly_chart(fig, use_container_width=True)
        else:
            show_status_message("داده‌ای برای نمایش وجود ندارد", "info")
    with col2:
        st.subheader("📈 روند ثبت اسناد")
        documents = db.get_documents(limit=1000)
        if documents:
            df_docs = pd.DataFrame(documents)
            df_docs['created_at'] = pd.to_datetime(df_docs['created_at']).dt.date
            # Fix: the chart title promises the last 30 days, but the original
            # plotted the full history — filter to the advertised window.
            cutoff = (datetime.now() - timedelta(days=30)).date()
            df_docs = df_docs[df_docs['created_at'] >= cutoff]
            df_trend = df_docs.groupby('created_at').size().reset_index(name='تعداد اسناد')
            fig = px.line(df_trend, x='created_at', y='تعداد اسناد', title="روند ثبت اسناد (30 روز اخیر)")
            fig.update_layout(xaxis_title="تاریخ", yaxis_title="تعداد اسناد", template="plotly_white")
            st.plotly_chart(fig, use_container_width=True)
        else:
            show_status_message("هیچ سندی برای نمایش وجود ندارد", "info")
def show_upload_page(db: DatabaseManager, ocr_pipeline: OCRPipeline, ai_engine: AIAnalysisEngine):
    """Render the PDF upload page: file picker, processing options, OCR run and results."""
    st.title("📤 آپلود و پردازش اسناد")
    st.markdown("""
    <div class="feature-card">
        <h3>📁 آپلود فایل PDF</h3>
        <p>فایل‌های PDF خود را برای پردازش OCR و تحلیل هوش مصنوعی بارگذاری کنید</p>
    </div>
    """, unsafe_allow_html=True)
    uploaded_files = st.file_uploader("انتخاب فایل‌های PDF", type=['pdf'], accept_multiple_files=True)
    if uploaded_files:
        col1, col2 = st.columns(2)
        with col1:
            source = st.text_input("منبع سند:", value="آپلود کاربر")
            category = st.selectbox("دسته‌بندی:", ["خودکار", "قانون", "قرارداد", "حکم", "اداری", "عمومی"])
        with col2:
            # NOTE(review): process_mode is collected but never used below — confirm intended.
            process_mode = st.selectbox("حالت پردازش:", ["استاندارد", "دقیق", "سریع"])
        if st.button("🚀 شروع پردازش", type="primary"):
            ocr_pipeline.initialize()
            progress_bar = st.progress(0)
            status_text = st.empty()
            results = []
            for i, uploaded_file in enumerate(uploaded_files):
                status_text.text(f"در حال پردازش {uploaded_file.name}...")
                # Spool the upload to a temp file so the OCR pipeline can open a path.
                with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
                    tmp_file.write(uploaded_file.read())
                    tmp_file_path = tmp_file.name
                try:
                    ocr_result = ocr_pipeline.extract_text_from_pdf(tmp_file_path)
                    if ocr_result.get('success'):
                        analysis = ai_engine.analyze_text(ocr_result['extracted_text'], uploaded_file.name)
                        document_data = {
                            'title': uploaded_file.name.replace('.pdf', ''),
                            'content': ocr_result['extracted_text'],
                            'source': source,
                            # "خودکار" (automatic) defers to the AI-predicted category.
                            'category': analysis['category'] if category == "خودکار" else category,
                            'ai_score': analysis['ai_score'],
                            'ocr_confidence': ocr_result['confidence'],
                            'file_size': uploaded_file.size,
                            'mime_type': 'application/pdf',
                            'language': ocr_result['language_detected'],
                            'keywords': analysis['keywords']
                        }
                        doc_id = db.create_document(document_data)
                        results.append({
                            'filename': uploaded_file.name,
                            'status': 'موفق',
                            'document_id': doc_id,
                            'ai_score': analysis['ai_score']
                        })
                    else:
                        results.append({
                            'filename': uploaded_file.name,
                            'status': 'خطا',
                            'error': ocr_result.get('error_message', 'خطای نامشخص')
                        })
                except Exception as e:
                    results.append({
                        'filename': uploaded_file.name,
                        'status': 'خطا',
                        'error': str(e)
                    })
                finally:
                    # Always remove the temp file, even on failure.
                    if os.path.exists(tmp_file_path):
                        os.unlink(tmp_file_path)
                progress_bar.progress((i + 1) / len(uploaded_files))
            status_text.text("پردازش کامل شد!")
            successful = len([r for r in results if r['status'] == 'موفق'])
            st.markdown(f"""
            <div class="feature-card">
                <h3>نتایج پردازش</h3>
                <p>موفق: {successful} | ناموفق: {len(results) - successful}</p>
            </div>
            """, unsafe_allow_html=True)
            # One expander per processed file with its outcome.
            for result in results:
                with st.expander(f"{'✅' if result['status'] == 'موفق' else '❌'} {result['filename']}"):
                    if result['status'] == 'موفق':
                        st.write(f"**شناسه سند:** {result['document_id']}")
                        st.write(f"**امتیاز AI:** {result['ai_score']:.2f}")
                    else:
                        st.write(f"**خطا:** {result['error']}")
def show_scraping_page(db: DatabaseManager, scraping_service: ScrapingService):
    """Render the web-scraping page: crawl settings form, crawl run and results."""
    st.title("🌐 استخراج خودکار محتوا")
    st.markdown("""
    <div class="feature-card">
        <h3>🌐 استخراج محتوای حقوقی</h3>
        <p>وارد کردن آدرس وب‌سایت برای استخراج خودکار محتوای حقوقی</p>
    </div>
    """, unsafe_allow_html=True)
    col1, col2 = st.columns(2)
    with col1:
        start_url = st.text_input("آدرس وب‌سایت شروع:", placeholder="https://example.ir")
        max_pages = st.number_input("حداکثر صفحات:", 1, 50, 10)
    with col2:
        delay = st.slider("تأخیر بین درخواست‌ها (ثانیه):", 0.5, 5.0, 1.0)
    if st.button("🚀 شروع استخراج", type="primary"):
        if start_url:
            with st.spinner("در حال استخراج..."):
                # Drive the async crawler to completion on a fresh event loop;
                # requires `asyncio` to be imported at module level.
                results = asyncio.run(scraping_service.crawl_website(start_url, max_pages, delay))
            successful = len([r for r in results if r['status'] == 'completed'])
            st.markdown(f"""
            <div class="feature-card">
                <h3>نتایج استخراج</h3>
                <p>موفق: {successful} | ناموفق: {len(results) - successful}</p>
            </div>
            """, unsafe_allow_html=True)
            # One expander per crawled page with its metadata and a content preview.
            for result in results:
                with st.expander(f"{'✅' if result['status'] == 'completed' else '❌'} {result['title']}"):
                    st.write(f"**آدرس:** {result['url']}")
                    st.write(f"**دامنه:** {result['domain']}")
                    st.write(f"**امتیاز کیفیت:** {result['rating_score']:.2f}")
                    st.text_area("محتوا:", result['content'][:1000], height=100, disabled=True)
        else:
            show_status_message("لطفاً یک آدرس معتبر وارد کنید", "error")
def show_analytics_page(db: DatabaseManager):
    """Render the analytics page: summary metrics plus category and AI-score charts."""
    st.title("📊 تحلیل و گزارش")
    stats = db.get_statistics()
    # Top row: four summary metric cards.
    metric_columns = st.columns(4)
    metric_values = [
        ("کل اسناد", str(stats['total_documents'])),
        ("میانگین امتیاز AI", f"{stats['avg_ai_score']:.2f}"),
        ("محتوای استخراج‌شده", str(stats['total_scraped'])),
        ("دسته‌بندی‌ها", str(len(stats['categories']))),
    ]
    for column, (label, value) in zip(metric_columns, metric_values):
        with column:
            create_metric_card(label, value)
    chart_left, chart_right = st.columns(2)
    with chart_left:
        st.subheader("📊 توزیع دسته‌بندی")
        categories = stats.get('categories', {})
        if not categories:
            show_status_message("داده‌ای برای نمایش وجود ندارد", "info")
        else:
            bar_fig = px.bar(x=list(categories.keys()), y=list(categories.values()), title="تعداد اسناد در هر دسته‌بندی")
            bar_fig.update_layout(xaxis_title="دسته‌بندی", yaxis_title="تعداد اسناد", template="plotly_white")
            st.plotly_chart(bar_fig, use_container_width=True)
    with chart_right:
        st.subheader("🎯 توزیع امتیاز AI")
        documents = db.get_documents()
        if documents:
            scores = [doc['ai_score'] for doc in documents]
            # Bucket the scores into four quality bands (dict order = legend order).
            score_ranges = {
                'عالی (0.8-1.0)': sum(1 for s in scores if s >= 0.8),
                'خوب (0.6-0.8)': sum(1 for s in scores if 0.6 <= s < 0.8),
                'متوسط (0.4-0.6)': sum(1 for s in scores if 0.4 <= s < 0.6),
                'ضعیف (0.0-0.4)': sum(1 for s in scores if s < 0.4),
            }
            pie_fig = px.pie(values=list(score_ranges.values()), names=list(score_ranges.keys()), title="توزیع کیفیت اسناد")
            pie_fig.update_traces(textposition='inside', textinfo='percent+label')
            pie_fig.update_layout(template="plotly_white")
            st.plotly_chart(pie_fig, use_container_width=True)
def main():
    """Application entry point: inject CSS, build the sidebar and route to a page."""
    load_css()
    # Services are cached across Streamlit reruns via st.cache_resource.
    db, ai_engine, ocr_pipeline, scraping_service = initialize_services()
    st.sidebar.markdown("""
    <div class="sidebar-header">
        <h2>⚖️ داشبورد حقوقی</h2>
        <p>مدیریت هوشمند اسناد حقوقی</p>
    </div>
    """, unsafe_allow_html=True)
    # Live system statistics shown in the sidebar.
    stats = db.get_statistics()
    st.sidebar.markdown("""
    <div class="sidebar-stats">
        <h3>آمار سیستم</h3>
        <div class="stat-item"><span class="stat-label">کل اسناد:</span><span class="stat-value">{}</span></div>
        <div class="stat-item"><span class="stat-label">محتوای استخراج‌شده:</span><span class="stat-value">{}</span></div>
        <div class="stat-item"><span class="stat-label">میانگین امتیاز AI:</span><span class="stat-value">{:.2f}</span></div>
    </div>
    """.format(stats['total_documents'], stats['total_scraped'], stats['avg_ai_score']), unsafe_allow_html=True)
    # Page router (radio labels are the routing keys below).
    page = st.sidebar.radio(
        "صفحه:",
        ["داشبورد", "آپلود اسناد", "استخراج وب", "تحلیل"],
        index=0
    )
    if page == "داشبورد":
        show_dashboard(db, ai_engine)
    elif page == "آپلود اسناد":
        show_upload_page(db, ocr_pipeline, ai_engine)
    elif page == "استخراج وب":
        show_scraping_page(db, scraping_service)
    elif page == "تحلیل":
        show_analytics_page(db)
    st.sidebar.markdown("""
    <div class="sidebar-footer">
        <p>نسخه 1.0.0 | توسعه‌یافته برای فضاهای Hugging Face</p>
    </div>
    """, unsafe_allow_html=True)
# Script entry point (Streamlit re-executes the module top-to-bottom on each rerun).
if __name__ == "__main__":
    main()