"""
app.py — FastAPI application for Scraping + Sentiment Analysis + WordCloud.
"""
from __future__ import annotations

import base64
import csv
import io
import json
import os
import traceback
from datetime import datetime
from typing import Optional

import uvicorn
from fastapi import FastAPI, File, Form, Request, UploadFile
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

from services.facebook import scrape_facebook
from services.medos import scrape_medos
from services.news import scrape_news
from services.preprocessing import preprocess_text
from services.sentiment import analyze_sentiment
from services.tiktok import scrape_tiktok
from services.wordcloud_service import generate_wordcloud


# ASGI application object; route handlers below register themselves on it.
app = FastAPI(title="Sentiment Analysis Dashboard")

# Expose the local "static" directory under /static. The CSV exports written
# by the analysis pipeline land in static/output and are downloaded this way.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Jinja2 templates are looked up in the local "templates" directory.
templates = Jinja2Templates(directory="templates")


| def _split_targets(raw: str | None) -> list[str]: |
| """Split a newline/comma-separated string into a clean list of non-empty strings.""" |
| if not raw or not raw.strip(): |
| return [] |
| parts = [] |
| for line in raw.replace(",", "\n").splitlines(): |
| s = line.strip() |
| if s: |
| parts.append(s) |
| return parts |
|
|
|
|
| def _is_enabled(flag: str | None) -> bool: |
| """Return True only if the enable flag is explicitly '1'.""" |
| return (flag or "").strip() == "1" |
|
|
|
|
| def _flatten_for_csv(raw_texts: list) -> list[dict]: |
| flat = [] |
| for item in raw_texts: |
| if isinstance(item, str): |
| flat.append({"text": item}) |
| elif isinstance(item, dict): |
| base = {k: v for k, v in item.items() if k != "comments"} |
| comments = item.get("comments", []) |
| if not comments: |
| flat.append(base) |
| else: |
| for c in comments: |
| row = dict(base) |
| if isinstance(c, str): |
| row["comment_text"] = c |
| elif isinstance(c, dict): |
| row["comment_author"] = c.get("author", "") |
| row["comment_text"] = c.get("comment", "") |
| flat.append(row) |
| for r in c.get("replies", []): |
| rep_row = dict(base) |
| rep_row["comment_author"] = r.get("author", "") |
| rep_row["comment_text"] = r.get("comment", "") |
| flat.append(rep_row) |
| continue |
| flat.append(row) |
| return flat |
|
|
| def _extract_texts(raw_texts: list) -> list[str]: |
| extracted = [] |
| for item in raw_texts: |
| if isinstance(item, str): |
| extracted.append(item) |
| elif isinstance(item, dict): |
| if "caption_short" in item: extracted.append(item["caption_short"]) |
| if "caption_detail" in item: extracted.append(item["caption_detail"]) |
| if "caption" in item: extracted.append(item["caption"]) |
| if "judul" in item: extracted.append(item["judul"]) |
| if "isi_berita" in item: extracted.append(item["isi_berita"]) |
| if "tag" in item: extracted.append(item["tag"]) |
| for c in item.get("comments", []): |
| if isinstance(c, str): |
| extracted.append(c) |
| elif isinstance(c, dict): |
| extracted.append(c.get("comment", "")) |
| for r in c.get("replies", []): |
| extracted.append(r.get("comment", "")) |
| return extracted |
|
|
| def _run_pipeline(raw_texts: list) -> dict: |
| """Shared preprocessing β sentiment β wordcloud pipeline.""" |
| if not raw_texts: |
| return { |
| "error": "Tidak ada teks yang berhasil dikumpulkan.", |
| "result": None, |
| "image": None, |
| "total_scraped": 0, |
| "csv_filename": None, |
| } |
|
|
| |
| import os |
| import csv |
| from datetime import datetime |
| os.makedirs("static/output", exist_ok=True) |
| csv_fname = f"scraped_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" |
| csv_path = os.path.join("static", "output", csv_fname) |
| |
| flat_data = _flatten_for_csv(raw_texts) |
| if flat_data: |
| keys = set() |
| for d in flat_data: keys.update(d.keys()) |
| with open(csv_path, "w", newline="", encoding="utf-8-sig") as f: |
| writer = csv.DictWriter(f, fieldnames=list(keys)) |
| writer.writeheader() |
| writer.writerows(flat_data) |
| csv_url = f"/static/output/{csv_fname}" |
| else: |
| csv_url = None |
|
|
| |
| text_list = _extract_texts(raw_texts) |
| |
| total_scraped = len(text_list) |
| print(f"[APP] Total item yg di-ekstrak teksnya: {total_scraped}") |
|
|
| |
| print("[APP] Preprocessingβ¦") |
| clean_texts = preprocess_text(text_list) |
| clean_texts = [t for t in clean_texts if t and t.strip()] |
|
|
| if not clean_texts: |
| return { |
| "error": "Semua teks kosong setelah preprocessing. Coba input yang berbeda.", |
| "result": None, |
| "image": None, |
| "total_scraped": total_scraped, |
| "csv_filename": csv_url, |
| } |
|
|
| |
| print(f"[APP] Analyzing sentiment on {len(clean_texts)} textsβ¦") |
| try: |
| sentiment = analyze_sentiment(clean_texts) |
| except Exception as e: |
| print(f"[APP] Sentiment error: {e}\n{traceback.format_exc()}") |
| sentiment = None |
|
|
| |
| print("[APP] Generating wordcloudβ¦") |
| image_b64 = None |
| try: |
| buf = io.BytesIO() |
| wc_ok = generate_wordcloud(clean_texts, buf) |
| if wc_ok: |
| buf.seek(0) |
| image_b64 = base64.b64encode(buf.read()).decode("utf-8") |
| except Exception as e: |
| print(f"[APP] WordCloud error: {e}") |
|
|
| return { |
| "error": None, |
| "result": sentiment, |
| "image": image_b64, |
| "total_scraped": total_scraped, |
| "csv_filename": csv_url, |
| } |
|
|
|
|
| |
|
|
@app.get("/", response_class=HTMLResponse)
async def home(request: Request):
    """Render the dashboard page (index.html) with no analysis results."""
    return templates.TemplateResponse(request=request, name="index.html")


| @app.post("/process", response_class=HTMLResponse) |
| async def process( |
| request: Request, |
| |
| |
| enable_instagram: str = Form(""), |
| enable_tiktok: str = Form(""), |
| enable_facebook: str = Form(""), |
| enable_news: str = Form(""), |
| |
| |
| ig_username: str = Form(None), |
| ig_password: str = Form(None), |
| target_accounts: str = Form(None), |
| mode: str = Form("all"), |
| |
| |
| tiktok_cookie: str = Form(None), |
| tiktok_targets: str = Form(None), |
| |
| |
| fb_username: str = Form(None), |
| fb_password: str = Form(None), |
| facebook_groups: str = Form(None), |
| |
| |
| news_portals: str = Form(None), |
| news_keyword: str = Form("kabupaten cirebon"), |
| news_pages: int = Form(1), |
| ): |
| raw_texts: list = [] |
|
|
| |
| if _is_enabled(enable_instagram): |
| ig_targets = _split_targets(target_accounts) |
| if not ig_username or not ig_password: |
| print("[APP] Instagram diaktifkan tapi username/password kosong β skip.") |
| elif not ig_targets: |
| print("[APP] Instagram diaktifkan tapi tidak ada target β skip.") |
| else: |
| for tgt in ig_targets: |
| print(f"[APP] Scraping Instagram: {tgt}") |
| try: |
| texts = scrape_medos(ig_username, ig_password, tgt, mode) |
| raw_texts.extend(texts) |
| print(f"[APP] Instagram @{tgt} β {len(texts)} teks") |
| except Exception as e: |
| print(f"[APP] Instagram error ({tgt}): {e}") |
| else: |
| print("[APP] Instagram dinonaktifkan β skip.") |
|
|
| |
| if _is_enabled(enable_tiktok): |
| tt_targets = _split_targets(tiktok_targets) |
| if not tt_targets: |
| print("[APP] TikTok diaktifkan tapi tidak ada target β skip.") |
| else: |
| for tgt in tt_targets: |
| print(f"[APP] Scraping TikTok: {tgt}") |
| try: |
| texts = scrape_tiktok(tiktok_cookie or "", tgt) |
| raw_texts.extend(texts) |
| print(f"[APP] TikTok @{tgt} β {len(texts)} teks") |
| except Exception as e: |
| print(f"[APP] TikTok error ({tgt}): {e}") |
| else: |
| print("[APP] TikTok dinonaktifkan β skip.") |
|
|
| |
| |
| if _is_enabled(enable_facebook): |
| fb_groups = _split_targets(facebook_groups) |
| if not fb_username or not fb_password: |
| print("[APP] Facebook diaktifkan tapi username/password kosong β skip.") |
| elif not fb_groups: |
| print("[APP] Facebook diaktifkan tapi tidak ada URL grup β skip (tidak ada default).") |
| else: |
| print(f"[APP] Scraping Facebook {len(fb_groups)} grupβ¦") |
| try: |
| texts = scrape_facebook(fb_username, fb_password, fb_groups) |
| raw_texts.extend(texts) |
| print(f"[APP] Facebook β {len(texts)} teks") |
| except Exception as e: |
| print(f"[APP] Facebook error: {e}") |
| else: |
| print("[APP] Facebook dinonaktifkan β skip.") |
|
|
| |
| if _is_enabled(enable_news): |
| portals = _split_targets(news_portals) |
| if not portals: |
| print("[APP] News diaktifkan tapi tidak ada portal dipilih β skip.") |
| else: |
| for portal in portals: |
| print(f"[APP] Scraping news: portal={portal}, keyword={news_keyword}, pages={news_pages}") |
| try: |
| texts = scrape_news(portal, news_pages, keyword=news_keyword) |
| raw_texts.extend(texts) |
| print(f"[APP] News ({portal}) β {len(texts)} teks") |
| except Exception as e: |
| print(f"[APP] News error ({portal}): {e}") |
| else: |
| print("[APP] News dinonaktifkan β skip.") |
|
|
| |
| outcome = _run_pipeline(raw_texts) |
|
|
| return templates.TemplateResponse( |
| request=request, |
| name="index.html", |
| context={ |
| "error": outcome["error"], |
| "result": outcome["result"], |
| "image": outcome["image"], |
| "total_scraped": outcome["total_scraped"], |
| "csv_filename": outcome["csv_filename"], |
| "active_tab": "scraping", |
| }, |
| ) |
|
|
|
|
def _decode_upload(content: bytes) -> str:
    """Decode uploaded bytes as UTF-8 (dropping a BOM), else as Latin-1."""
    try:
        # utf-8-sig strips a leading BOM; left in place it would become part
        # of the first CSV header name and break the text_column lookup.
        return content.decode("utf-8-sig")
    except UnicodeDecodeError:
        # Latin-1 maps every byte, so this fallback cannot fail.
        return content.decode("latin-1")


def _non_empty_lines(text: str) -> list:
    """Return the stripped, non-blank lines of ``text``."""
    stripped = (line.strip() for line in text.splitlines())
    return [line for line in stripped if line]


def _parse_dataset(fname: str, content: str, text_column: str) -> list:
    """Turn decoded file content into pipeline items, chosen by file extension.

    ``.csv`` / ``.tsv`` and ``.json`` records contribute the ``text_column``
    value as a string when it is present and non-empty, otherwise the whole
    record dict. Any other extension is read as one text per non-blank line.
    """
    if fname.endswith((".csv", ".tsv")):
        delimiter = "\t" if fname.endswith(".tsv") else ","
        reader = csv.DictReader(io.StringIO(content), delimiter=delimiter)
        has_column = bool(text_column) and text_column in (reader.fieldnames or [])
        items: list = []
        for row in reader:
            if has_column and row.get(text_column):
                items.append(str(row[text_column]))
            else:
                items.append(row)
        return items

    if fname.endswith(".json"):
        try:
            data = json.loads(content)
        except (ValueError, RecursionError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            print(f"[Dataset] JSON parse error: {e}")
            return []
        if not isinstance(data, list):
            # Only a top-level array of strings/objects is supported.
            return []
        items = []
        for entry in data:
            if isinstance(entry, str):
                if entry:
                    items.append(entry)
            elif isinstance(entry, dict):
                if text_column and entry.get(text_column):
                    items.append(str(entry[text_column]))
                else:
                    items.append(entry)
        return items

    return _non_empty_lines(content)


@app.post("/wordcloud-dataset", response_class=HTMLResponse)
async def wordcloud_dataset(
    request: Request,
    dataset_text: Optional[str] = Form(None),
    dataset_file: Optional[UploadFile] = File(None),
    text_column: str = Form("text"),
):
    """Word cloud + sentiment from an uploaded dataset or pasted text.

    An uploaded file (CSV/TSV/JSON, anything else is read as plain text)
    takes precedence; the pasted text is only used when no file was sent.
    ``text_column`` names the CSV column / JSON key that holds the text.
    """
    raw_texts: list = []

    if dataset_file and dataset_file.filename:
        content = _decode_upload(await dataset_file.read())
        raw_texts = _parse_dataset(dataset_file.filename.lower(), content, text_column)
    elif dataset_text and dataset_text.strip():
        raw_texts = _non_empty_lines(dataset_text)

    if raw_texts:
        # The pipeline is blocking; keep it off the event loop.
        outcome = await run_in_threadpool(_run_pipeline, raw_texts)
    else:
        outcome = {
            "error": "Tidak ada teks ditemukan dalam dataset. Pastikan file / teks tidak kosong.",
            "result": None,
            "image": None,
            "total_scraped": 0,
            "csv_filename": None,
        }

    return templates.TemplateResponse(
        request=request,
        name="index.html",
        context={
            "error": outcome["error"],
            "result": outcome["result"],
            "image": outcome["image"],
            "total_scraped": outcome["total_scraped"],
            "csv_filename": outcome["csv_filename"],
            "active_tab": "dataset",
        },
    )


if __name__ == "__main__":
    # Direct-run entry point: serve on every network interface, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)