AltelAgent / app.py
futureDoctor's picture
app.py
c047192 verified
import gradio as gr
import pandas as pd
from urllib.parse import urlparse
import time
import requests
from io import BytesIO
import json
import seaborn as sns
import matplotlib.pyplot as plt
# Try to import Plotly, install if missing, fallback gracefully
try:
import plotly.express as px
import plotly.graph_objects as go
PLOTLY_AVAILABLE = True
print("✅ Plotly successfully imported")
except ImportError:
print("⚠️ Plotly not found, attempting to install...")
try:
import subprocess
import sys
subprocess.check_call([sys.executable, "-m", "pip", "install", "plotly", "--quiet"])
import plotly.express as px
import plotly.graph_objects as go
PLOTLY_AVAILABLE = True
print("✅ Plotly installed and imported successfully")
except Exception as e:
print(f"❌ Failed to install Plotly: {e}")
print("📊 Charts will be disabled, but app will work normally")
PLOTLY_AVAILABLE = False
# ===================== URL Validation =====================
def is_instagram_url(url: str) -> bool:
"""Validate if URL is a proper Instagram URL"""
try:
url = url.strip()
if not url:
return False
# Add https if missing
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
parsed = urlparse(url)
domain = parsed.netloc.lower()
# Check if it's Instagram domain
if 'instagram.com' not in domain:
return False
# Check if it has a valid path (not just homepage)
if not parsed.path or parsed.path in ['/', '']:
return False
return True
except Exception as e:
print(f"URL validation error: {e}")
return False
def check_url_accessible(url: str, timeout: int = 5) -> bool:
"""Lenient check for public reachability of the URL (Instagram often blocks bots)."""
try:
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive',
}
response = requests.head(url, allow_redirects=True, timeout=timeout, headers=headers)
print(f"URL check response: {response.status_code}")
return response.status_code in (200, 301, 302, 403, 429) or response.status_code < 500
except requests.exceptions.Timeout:
print("URL check timed out - assuming URL is valid")
return True
except requests.exceptions.ConnectionError:
print("Connection error during URL check - assuming URL is valid")
return True
except Exception as e:
print(f"URL accessibility check failed: {e}")
return True
def test_file_generation():
"""Test function to generate sample files with fake comments."""
try:
sample_data = pd.DataFrame({
'Пользователь': ['user1', 'user2', 'user3'],
'Комментарий': ['Тест 1', 'Тест 2', 'Тест 3'],
'Дата': ['2025-09-20', '2025-09-20', '2025-09-20'],
'Тональность': ['позитивный', 'нейтральный', 'негативный'],
'Категория': ['вопрос', 'отзыв', 'жалоба'],
'Модерация': ['безопасно', 'безопасно', 'безопасно'],
'Автоответ': ['Ответ 1', 'Ответ 2', 'Ответ 3']
})
categories = sample_data['Категория'].value_counts()
sentiments = sample_data['Тональность'].value_counts()
files = create_report_files(sample_data, categories, sentiments)
if files:
return f"✅ Тестовые файлы созданы: {len(files)} файлов", files
else:
return "❌ Ошибка создания тестовых файлов", []
except Exception as e:
return f"❌ Ошибка тестирования: {str(e)}", []
# ===================== Charts with Plotly =====================
def make_charts(categories: pd.Series, sentiments: pd.Series):
"""Create visualization charts with Seaborn"""
try:
# Category pie chart (Matplotlib only, since Seaborn doesn’t have native pie)
fig_cat, ax_cat = plt.subplots()
if not categories.empty:
ax_cat.pie(
categories.values,
labels=categories.index,
autopct='%1.1f%%',
startangle=140,
colors=sns.color_palette("Set3", len(categories))
)
ax_cat.set_title("Распределение по категориям", color="#E6007E")
else:
ax_cat.text(0.5, 0.5, "Нет данных для отображения",
ha="center", va="center")
# Sentiment bar chart
fig_sent, ax_sent = plt.subplots()
if not sentiments.empty:
sns.barplot(
x=sentiments.index,
y=sentiments.values,
palette={
'позитивный': '#28a745',
'негативный': '#dc3545',
'нейтральный': '#6c757d',
'positive': '#28a745',
'negative': '#dc3545',
'neutral': '#6c757d'
},
ax=ax_sent
)
ax_sent.set_title("Распределение по тональности", color="#E6007E")
ax_sent.set_ylabel("Количество")
ax_sent.set_xlabel("Тональность")
else:
ax_sent.text(0.5, 0.5, "Нет данных для отображения",
ha="center", va="center")
return fig_cat, fig_sent
except Exception as e:
print(f"Chart creation error: {e}")
return None, None
# ===================== File Generation =====================
def create_report_files(df: pd.DataFrame, categories: pd.Series, sentiments: pd.Series):
"""Create CSV and Excel report files (robust & portable)"""
import os
import tempfile
# Ensure non-empty frame for saving
if df is None or df.empty:
df = pd.DataFrame({"Сообщение": ["Нет данных для отображения"]})
# Create stable temp files that persist after write
csv_tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
csv_tmp.close()
xlsx_tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx")
xlsx_tmp.close()
csv_path = csv_tmp.name
xlsx_path = xlsx_tmp.name
# Save CSV (UTF-8 with BOM so Excel opens Cyrillic cleanly)
df.to_csv(csv_path, index=False, encoding="utf-8-sig")
# Excel writer: try openpyxl, fall back to xlsxwriter, fall back to very simple save
summary_rows = {
'Метрика': [
'Всего комментариев',
'Уникальных пользователей',
'Уникальных категорий',
'Уникальных тональностей'
],
'Значение': [
int(len(df)),
int(df["Пользователь"].nunique()) if "Пользователь" in df.columns else 0,
int(len(categories)) if isinstance(categories, pd.Series) and not categories.empty else 0,
int(len(sentiments)) if isinstance(sentiments, pd.Series) and not sentiments.empty else 0,
]
}
try:
try:
with pd.ExcelWriter(xlsx_path, engine="openpyxl") as writer:
df.to_excel(writer, sheet_name="Комментарии", index=False)
pd.DataFrame(summary_rows).to_excel(writer, sheet_name="Сводка", index=False)
if isinstance(categories, pd.Series) and not categories.empty:
cat_df = categories.reset_index()
cat_df.columns = ['Категория', 'Количество']
cat_df.to_excel(writer, sheet_name="Категории", index=False)
if isinstance(sentiments, pd.Series) and not sentiments.empty:
sen_df = sentiments.reset_index()
sen_df.columns = ['Тональность', 'Количество']
sen_df.to_excel(writer, sheet_name="Тональности", index=False)
except Exception:
with pd.ExcelWriter(xlsx_path, engine="xlsxwriter") as writer:
df.to_excel(writer, sheet_name="Комментарии", index=False)
pd.DataFrame(summary_rows).to_excel(writer, sheet_name="Сводка", index=False)
except Exception:
# As a very last resort, at least save the main sheet
df.to_excel(xlsx_path, index=False)
files_to_return = [p for p in (csv_path, xlsx_path) if os.path.exists(p) and os.path.getsize(p) > 0]
return files_to_return
# ===================== Main Processing Function =====================
def process_instagram_url(url: str):
"""Main function to process Instagram URL and return analysis results"""
# Initialize empty dataframes for consistent return structure
empty_df = pd.DataFrame(columns=["Пользователь", "Комментарий", "Дата", "Тональность", "Категория", "Модерация", "Автоответ"])
empty_moderation = pd.DataFrame(columns=["Пользователь", "Комментарий", "Модерация"])
empty_answers = pd.DataFrame(columns=["Пользователь", "Комментарий", "Автоответ"])
# Validate URL
if not url or not url.strip():
return (
"❌ Пожалуйста, введите Instagram URL",
empty_df, empty_moderation, empty_answers,
"Введите валидную Instagram ссылку для начала анализа.",
None, None, []
)
if not is_instagram_url(url):
return (
"❌ Это не валидная Instagram ссылка. Пожалуйста, введите корректную ссылку.",
empty_df, empty_moderation, empty_answers,
"Неверный формат ссылки Instagram.",
None, None, []
)
# Check URL accessibility (but don't fail if check fails)
print(f"Checking URL accessibility: {url}")
url_accessible = check_url_accessible(url)
if not url_accessible:
print(f"⚠️ URL accessibility check failed, but continuing anyway...")
# Don't return error here - Instagram often blocks automated checks
# but the API might still work
# Send request to webhook
try:
webhook_url = "https://azamat-m.app.n8n.cloud/webhook/instagram"
payload = {"urls": [url.strip()]}
headers = {
"Content-Type": "application/json",
"User-Agent": "InstagramAnalyzer/1.0"
}
print(f"Sending request to webhook: {payload}")
response = requests.post(webhook_url, json=payload, headers=headers, timeout=200) # Increased timeout to 200 seconds
print(f"Webhook response status: {response.status_code}")
print(f"Response headers: {dict(response.headers)}")
# Log first 200 chars of response for debugging
if hasattr(response, 'text'):
print(f"Response preview: {response.text[:200]}...")
except requests.exceptions.Timeout:
return (
"❌ Превышено время ожидания ответа от сервера (200 сек). Попробуйте позже.",
empty_df, empty_moderation, empty_answers,
"Тайм-аут запроса к серверу.",
None, None, []
)
except requests.exceptions.ConnectionError:
return (
"❌ Ошибка подключения к серверу анализа. Проверьте интернет-соединение.",
empty_df, empty_moderation, empty_answers,
"Ошибка подключения к серверу.",
None, None, []
)
except Exception as e:
return (
f"❌ Ошибка при отправке запроса: {str(e)}",
empty_df, empty_moderation, empty_answers,
f"Ошибка запроса: {str(e)}",
None, None, []
)
# Check response status
if response.status_code != 200:
return (
f"⚠️ Сервер вернул код ошибки {response.status_code}. Попробуйте позже.",
empty_df, empty_moderation, empty_answers,
f"Ошибка сервера: HTTP {response.status_code}",
None, None, []
)
# Parse response
try:
data = response.json()
print(f"Received data type: {type(data)}, length: {len(data) if isinstance(data, list) else 'N/A'}")
except json.JSONDecodeError as e:
return (
"❌ Сервер вернул некорректный ответ. Попробуйте позже.",
empty_df, empty_moderation, empty_answers,
f"Ошибка парсинга ответа: {str(e)}",
None, None, []
)
# Validate data format
if not isinstance(data, list) or len(data) == 0:
return (
"✅ Запрос выполнен успешно, но комментарии не найдены.",
empty_df, empty_moderation, empty_answers,
"Комментарии не найдены. Возможно, пост не содержит комментариев или они скрыты.",
None, None, []
)
# Process data
try:
processed_rows = []
for item in data:
# Extract all available fields
user = item.get("user", "")
comment = item.get("comment", item.get("chatInput", ""))
created_at = item.get("created_at", "")
sentiment = item.get("sentiment", "neutral")
category = item.get("category", "общее")
harmful = item.get("harmful_content", "none")
auto_answer = item.get("output", "")
# Format creation date if available
formatted_date = ""
if created_at:
try:
from datetime import datetime
dt = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
formatted_date = dt.strftime("%Y-%m-%d %H:%M")
except:
formatted_date = created_at
# Translate sentiment values to Russian if needed
sentiment_mapping = {
"positive": "позитивный",
"negative": "негативный",
"neutral": "нейтральный"
}
sentiment_ru = sentiment_mapping.get(sentiment.lower(), sentiment)
# Translate category values to Russian if needed
category_mapping = {
"question": "вопрос",
"complaint": "жалоба",
"review": "отзыв",
"general": "общее"
}
category_ru = category_mapping.get(category.lower(), category)
# Translate harmful_content values
moderation_mapping = {
"none": "безопасно",
"toxic": "токсичный",
"spam": "спам"
}
moderation_ru = moderation_mapping.get(harmful.lower(), harmful)
processed_rows.append({
"Пользователь": user,
"Комментарий": comment,
"Дата": formatted_date,
"Тональность": sentiment_ru,
"Категория": category_ru,
"Модерация": moderation_ru,
"Автоответ": auto_answer
})
# Create dataframes
df_all = pd.DataFrame(processed_rows)
df_moderation = df_all[["Пользователь", "Комментарий", "Модерация"]].copy()
df_answers = df_all[["Пользователь", "Комментарий", "Автоответ"]].copy()
# Calculate statistics
total_comments = len(df_all)
unique_users = df_all["Пользователь"].nunique() if "Пользователь" in df_all.columns else 0
categories = df_all["Категория"].value_counts()
sentiments = df_all["Тональность"].value_counts()
# Create statistics markdown
stats_text = f"""
**📊 Общая статистика:**
- **Всего комментариев:** {total_comments}
- **Уникальных пользователей:** {unique_users}
**📂 По категориям:**
{chr(10).join([f'- **{category}:** {count}' for category, count in categories.items()])}
**💭 По тональности:**
{chr(10).join([f'- **{sentiment}:** {count}' for sentiment, count in sentiments.items()])}
""".strip()
# Create charts
fig_categories, fig_sentiments = make_charts(categories, sentiments)
# Create report files
print("Creating report files...")
report_files = create_report_files(df_all, categories, sentiments)
print(f"Report files created: {report_files}")
success_message = f"✅ Успешно обработано {total_comments} комментариев от {unique_users} пользователей!"
return (
success_message,
df_all,
df_moderation,
df_answers,
stats_text,
fig_categories,
fig_sentiments,
report_files
)
except Exception as e:
print(f"Data processing error: {e}")
return (
f"❌ Ошибка при обработке данных: {str(e)}",
empty_df, empty_moderation, empty_answers,
f"Ошибка обработки: {str(e)}",
None, None, []
)
# ===================== Custom CSS =====================
custom_css = """
/* Altel brand colors and styling */
.gradio-container {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
max-width: 1200px;
margin: 0 auto;
}
/* Headers and titles */
.gradio-container h1, .gradio-container h2, .gradio-container h3 {
color: #E6007E !important;
font-weight: 600;
}
/* Primary buttons */
.gradio-container .primary {
background: linear-gradient(135deg, #E6007E 0%, #C5006C 100%) !important;
color: white !important;
border: none !important;
border-radius: 8px !important;
font-weight: 600 !important;
box-shadow: 0 2px 4px rgba(230, 0, 126, 0.3) !important;
transition: all 0.3s ease !important;
}
.gradio-container .primary:hover {
transform: translateY(-1px) !important;
box-shadow: 0 4px 8px rgba(230, 0, 126, 0.4) !important;
}
/* Tab styling */
.gradio-container .tab-nav button {
color: #E6007E !important;
border-bottom: 2px solid transparent !important;
}
.gradio-container .tab-nav button.selected {
color: #E6007E !important;
border-bottom: 2px solid #E6007E !important;
font-weight: 600 !important;
}
/* Table headers */
.gradio-container table thead th {
background-color: #E6007E !important;
color: white !important;
font-weight: 600 !important;
}
/* Cards and blocks */
.gradio-container .block {
border-radius: 12px !important;
border: 1px solid #f0f0f0 !important;
box-shadow: 0 1px 3px rgba(0,0,0,0.1) !important;
}
/* Status messages */
.gradio-container .textbox textarea[readonly] {
background-color: #f8f9fa !important;
border-left: 4px solid #E6007E !important;
}
"""
# ===================== Gradio Interface =====================
def create_app():
"""Create the Gradio application"""
with gr.Blocks(css=custom_css, title="Instagram Comment Analyzer", theme=gr.themes.Soft()) as app:
# Header
gr.Markdown("""
# 📸 Instagram Comment Analyzer
Анализ комментариев Instagram с помощью ИИ. Получите детальную аналитику тональности,
категоризацию и модерацию контента.
**Как использовать:**
1. Вставьте ссылку на публичный пост Instagram
2. Нажмите "Анализировать комментарии"
3. Просмотрите результаты в различных вкладках
""")
# Input section
with gr.Row():
with gr.Column(scale=4):
url_input = gr.Textbox(
label="🔗 Instagram URL",
placeholder="https://www.instagram.com/p/XXXXXXXXX/",
info="Введите ссылку на пост, рилс или IGTV"
)
with gr.Column(scale=1):
analyze_btn = gr.Button(
"🚀 Анализировать комментарии",
variant="primary",
size="lg"
)
# Status output
status_output = gr.Textbox(
label="📋 Статус обработки",
interactive=False,
lines=2
)
# Results tabs
with gr.Tabs():
with gr.Tab("💬 Все комментарии"):
comments_df = gr.Dataframe(
headers=["Пользователь", "Комментарий", "Дата", "Тональность", "Категория", "Модерация", "Автоответ"],
interactive=False,
wrap=True
)
with gr.Tab("🛡️ Модерация"):
moderation_df = gr.Dataframe(
headers=["Пользователь", "Комментарий", "Модерация"],
interactive=False,
wrap=True
)
with gr.Tab("🤖 Автоответы"):
answers_df = gr.Dataframe(
headers=["Пользователь", "Комментарий", "Автоответ"],
interactive=False,
wrap=True
)
with gr.Tab("📊 Аналитика"):
with gr.Row():
stats_markdown = gr.Markdown("Загрузите Instagram ссылку для просмотра статистики.")
# Use Matplotlib components because make_charts returns matplotlib fig objects
with gr.Row():
with gr.Column():
categories_chart = gr.Plot(label="Распределение по категориям")
with gr.Column():
sentiments_chart = gr.Plot(label="Распределение по тональности")
# with gr.Column():
# categories_chart = gr.Matplotlib(label="Распределение по категориям")
# with gr.Column():
# sentiments_chart = gr.Matplotlib(label="Распределение по тональности")
download_files = gr.File(
label="📁 Скачать отчеты (CSV + Excel)",
file_count="multiple",
file_types=[".csv", ".xlsx"],
interactive=False,
visible=True
)
# Example section
gr.Markdown("""
### 📝 Примеры ссылок:
- `https://www.instagram.com/p/XXXXXXXXX/` - обычный пост
- `https://www.instagram.com/reel/XXXXXXXXX/` - рилс
- `https://www.instagram.com/tv/XXXXXXXXX/` - IGTV
⚠️ **Важно:** Ссылка должна вести на публичный контент
### 🔧 Отладка:
Если файлы не скачиваются, проверьте логи в консоли Hugging Face Spaces.
""")
# Add test file generation button
with gr.Row():
test_files_btn = gr.Button("🧪 Создать тестовые файлы", variant="secondary")
test_status = gr.Textbox(label="Статус теста", interactive=False, visible=False)
test_files_output = gr.File(label="Тестовые файлы", file_count="multiple", visible=False)
# Connect the processing function
analyze_btn.click(
fn=process_instagram_url,
inputs=[url_input],
outputs=[
status_output,
comments_df,
moderation_df,
answers_df,
stats_markdown,
categories_chart,
sentiments_chart,
download_files
]
)
# Connect test function
test_files_btn.click(
fn=test_file_generation,
inputs=[],
outputs=[test_status, test_files_output]
).then(
lambda: (gr.update(visible=True), gr.update(visible=True)),
outputs=[test_status, test_files_output]
)
return app
# ===================== Launch Application =====================
if __name__ == "__main__":
app = create_app()
app.launch(
share=False,
server_name="0.0.0.0",
server_port=7860
)