Seth commited on
Commit
8f3dcde
·
1 Parent(s): 1695b82
backend/app/database.py CHANGED
@@ -267,6 +267,57 @@ class LinkedinCampaign(Base):
267
  executed_at = Column(DateTime, nullable=True)
268
 
269
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
270
  class UnipileHostedAuthState(Base):
271
  __tablename__ = "unipile_hosted_auth_states"
272
 
@@ -456,6 +507,36 @@ def run_migrations(connection_engine):
456
  if "linkedin_followup_next_email_number" not in ctcols:
457
  conn.execute(text("ALTER TABLE contacts ADD COLUMN linkedin_followup_next_email_number INTEGER"))
458
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
459
 
460
  # Create tables then migrate legacy SQLite schemas
461
  Base.metadata.create_all(bind=engine)
 
267
  executed_at = Column(DateTime, nullable=True)
268
 
269
 
270
+ class FileOutreachExecution(Base):
271
+ """Server-side orchestration for wizard email+LinkedIn sequences (per uploaded CSV file)."""
272
+
273
+ __tablename__ = "file_outreach_executions"
274
+
275
+ id = Column(Integer, primary_key=True, index=True)
276
+ tenant_id = Column(Integer, ForeignKey("tenants.id"), nullable=True, index=True)
277
+ user_id = Column(Integer, ForeignKey("users.id"), nullable=True, index=True)
278
+ file_id = Column(String, index=True, nullable=False)
279
+ status = Column(String, default="idle") # idle | running | paused | completed
280
+ progress_done = Column(Integer, default=0)
281
+ progress_total = Column(Integer, default=0)
282
+ stats_json = Column(Text, nullable=True)
283
+ cursor_json = Column(Text, nullable=True) # delivery_started_at, etc.
284
+ last_error = Column(Text, nullable=True)
285
+ linkedin_followup_interval_hours = Column(Integer, default=72)
286
+ created_at = Column(DateTime, default=datetime.utcnow)
287
+ updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
288
+
289
+ __table_args__ = (UniqueConstraint("tenant_id", "file_id", name="uq_outreach_exec_tenant_file"),)
290
+
291
+
292
+ class OutreachSendReceipt(Base):
293
+ """One row per (file, contact, sequence action slot) send attempt."""
294
+
295
+ __tablename__ = "outreach_send_receipts"
296
+
297
+ id = Column(Integer, primary_key=True, index=True)
298
+ tenant_id = Column(Integer, ForeignKey("tenants.id"), nullable=True, index=True)
299
+ file_id = Column(String, index=True, nullable=False)
300
+ contact_id = Column(Integer, ForeignKey("contacts.id"), nullable=False, index=True)
301
+ step_slot = Column(Integer, nullable=False)
302
+ channel = Column(String, nullable=False)
303
+ action = Column(String, default="")
304
+ status = Column(String, default="pending") # sent | failed | skipped
305
+ detail = Column(Text, nullable=True)
306
+ unipile_message_id = Column(String, nullable=True)
307
+ unipile_tracking_id = Column(String, nullable=True, index=True)
308
+ smtp_message_id = Column(String, nullable=True, index=True)
309
+ tracking_label = Column(String(220), nullable=True, index=True)
310
+ opened_at = Column(DateTime, nullable=True)
311
+ open_count = Column(Integer, default=0)
312
+ replied_at = Column(DateTime, nullable=True)
313
+ bounced_at = Column(DateTime, nullable=True)
314
+ created_at = Column(DateTime, default=datetime.utcnow)
315
+
316
+ __table_args__ = (
317
+ UniqueConstraint("tenant_id", "file_id", "contact_id", "step_slot", name="uq_outreach_receipt_slot"),
318
+ )
319
+
320
+
321
  class UnipileHostedAuthState(Base):
322
  __tablename__ = "unipile_hosted_auth_states"
323
 
 
507
  if "linkedin_followup_next_email_number" not in ctcols:
508
  conn.execute(text("ALTER TABLE contacts ADD COLUMN linkedin_followup_next_email_number INTEGER"))
509
 
510
+ insp = inspect(connection_engine)
511
+ if insp.has_table("file_outreach_executions"):
512
+ ecols = [c["name"] for c in insp.get_columns("file_outreach_executions")]
513
+ if "linkedin_followup_interval_hours" not in ecols:
514
+ conn.execute(
515
+ text(
516
+ "ALTER TABLE file_outreach_executions ADD COLUMN linkedin_followup_interval_hours INTEGER DEFAULT 72"
517
+ )
518
+ )
519
+
520
+ insp = inspect(connection_engine)
521
+ if insp.has_table("outreach_send_receipts"):
522
+ rcols = [c["name"] for c in insp.get_columns("outreach_send_receipts")]
523
+ if "tracking_label" not in rcols:
524
+ conn.execute(text("ALTER TABLE outreach_send_receipts ADD COLUMN tracking_label TEXT"))
525
+ if "unipile_tracking_id" not in rcols:
526
+ conn.execute(text("ALTER TABLE outreach_send_receipts ADD COLUMN unipile_tracking_id TEXT"))
527
+ if "smtp_message_id" not in rcols:
528
+ conn.execute(text("ALTER TABLE outreach_send_receipts ADD COLUMN smtp_message_id TEXT"))
529
+ if "opened_at" not in rcols:
530
+ conn.execute(text("ALTER TABLE outreach_send_receipts ADD COLUMN opened_at DATETIME"))
531
+ if "open_count" not in rcols:
532
+ conn.execute(
533
+ text("ALTER TABLE outreach_send_receipts ADD COLUMN open_count INTEGER DEFAULT 0")
534
+ )
535
+ if "replied_at" not in rcols:
536
+ conn.execute(text("ALTER TABLE outreach_send_receipts ADD COLUMN replied_at DATETIME"))
537
+ if "bounced_at" not in rcols:
538
+ conn.execute(text("ALTER TABLE outreach_send_receipts ADD COLUMN bounced_at DATETIME"))
539
+
540
 
541
  # Create tables then migrate legacy SQLite schemas
542
  Base.metadata.create_all(bind=engine)
backend/app/main.py CHANGED
@@ -18,6 +18,7 @@ import json
18
  import asyncio
19
  import math
20
  import re
 
21
  import requests
22
  from urllib.parse import urlparse, unquote
23
  from datetime import datetime, timedelta
@@ -39,6 +40,7 @@ from .database import (
39
  UnipileAccount,
40
  LinkedinCampaign,
41
  UnipileHostedAuthState,
 
42
  )
43
  from pydantic import ValidationError
44
 
@@ -72,6 +74,8 @@ from .smartlead_client import SmartleadClient
72
  from .auth_routes import router as auth_router
73
  from .tenant_deps import TenantContext, get_tenant_context
74
  from .tenant_routes import router as tenant_router
 
 
75
  from .deal_revenue import build_quarterly_board
76
 
77
  app = FastAPI()
@@ -105,6 +109,7 @@ app.add_middleware(
105
 
106
  app.include_router(auth_router)
107
  app.include_router(tenant_router)
 
108
 
109
  # Create uploads directory
110
  UPLOAD_DIR = Path("/data/uploads")
@@ -113,6 +118,12 @@ UNIPILE_API_BASE = os.getenv("UNIPILE_API_BASE", "").rstrip("/")
113
  UNIPILE_API_KEY = os.getenv("UNIPILE_API_KEY", "")
114
  FRONTEND_ORIGIN = os.getenv("FRONTEND_ORIGIN", "").rstrip("/")
115
  UNIPILE_WEBHOOK_SECRET = os.getenv("UNIPILE_WEBHOOK_SECRET", "").strip()
 
 
 
 
 
 
116
 
117
  # ---- API ----
118
  def _safe_str(value):
@@ -1682,12 +1693,26 @@ async def unipile_linkedin_hosted_callback(request: Request, db: Session = Depen
1682
  return {"ok": True}
1683
 
1684
 
 
 
 
 
 
 
 
 
 
 
1685
  @app.post("/api/webhooks/unipile")
1686
- async def unipile_users_webhook(request: Request, db: Session = Depends(get_db)):
1687
  """
1688
- Receive UniPile USERS webhook events (e.g. new_relation when an invitation is accepted).
1689
- Register `request_url` in UniPile to this path (POST /api/v1/webhooks, source: users).
1690
- See: https://developer.unipile.com/docs/detecting-accepted-invitations
 
 
 
 
1691
  """
1692
  if UNIPILE_WEBHOOK_SECRET:
1693
  auth_h = request.headers.get("Unipile-Auth") or request.headers.get("X-Unipile-Auth") or ""
@@ -1700,51 +1725,215 @@ async def unipile_users_webhook(request: Request, db: Session = Depends(get_db))
1700
  raise HTTPException(status_code=400, detail="Expected JSON body")
1701
 
1702
  event = _safe_str((body or {}).get("event"))
1703
- if event != "new_relation":
1704
- return {"ok": True, "ignored": True, "reason": "unsupported_event"}
 
 
 
 
 
 
 
 
1705
 
1706
- account_id = _safe_str((body or {}).get("account_id"))
1707
- provider_id = _safe_str((body or {}).get("user_provider_id"))
1708
- if not account_id or not provider_id:
1709
- return {"ok": True, "ignored": True, "reason": "missing_ids"}
 
1710
 
1711
- ua = (
1712
- db.query(UnipileAccount)
1713
- .filter(UnipileAccount.unipile_account_id == account_id)
1714
- .first()
1715
- )
1716
- if not ua:
1717
- return {"ok": True, "ignored": True, "reason": "unknown_account"}
1718
 
1719
- now = datetime.utcnow()
1720
- contacts = (
1721
- db.query(Contact)
1722
- .filter(
1723
- Contact.tenant_id == ua.tenant_id,
1724
- Contact.unipile_provider_id == provider_id,
1725
- Contact.source == "linkedin_campaign",
 
1726
  )
1727
- .all()
1728
- )
1729
- updated = 0
1730
- for ct in contacts:
1731
- if ct.linkedin_connection_accepted_at:
1732
- continue
1733
- ct.linkedin_invite_pending = 0
1734
- ct.linkedin_connection_accepted_at = now
1735
- updated += 1
1736
- db.commit()
1737
- return {"ok": True, "updated_contacts": updated}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1738
 
1739
 
1740
  @app.get("/api/unipile/webhook-url-hint")
1741
  async def unipile_webhook_url_hint(request: Request):
1742
- """Public URL your UniPile webhook should POST to (same host as this API)."""
1743
  root = _public_origin_from_request(request)
 
1744
  return {
1745
- "post_url": f"{root}/api/webhooks/unipile",
1746
- "unipile_dashboard": "Create webhook source `users` event new_relation (default for USERS webhook).",
1747
- "optional_auth_header": "Unipile-Auth or X-Unipile-Auth matching UNIPILE_WEBHOOK_SECRET",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1748
  "latency_note": "UniPile may deliver new_relation many hours after acceptance; LinkedIn has no real-time connection API.",
1749
  }
1750
 
 
18
  import asyncio
19
  import math
20
  import re
21
+ import logging
22
  import requests
23
  from urllib.parse import urlparse, unquote
24
  from datetime import datetime, timedelta
 
40
  UnipileAccount,
41
  LinkedinCampaign,
42
  UnipileHostedAuthState,
43
+ OutreachSendReceipt,
44
  )
45
  from pydantic import ValidationError
46
 
 
74
  from .auth_routes import router as auth_router
75
  from .tenant_deps import TenantContext, get_tenant_context
76
  from .tenant_routes import router as tenant_router
77
+ from .outreach_routes import router as outreach_router
78
+ from .outreach_routes import parse_tracking_label, refresh_outreach_execution_stats_for_file, normalize_smtp_message_id
79
  from .deal_revenue import build_quarterly_board
80
 
81
  app = FastAPI()
 
109
 
110
  app.include_router(auth_router)
111
  app.include_router(tenant_router)
112
+ app.include_router(outreach_router)
113
 
114
  # Create uploads directory
115
  UPLOAD_DIR = Path("/data/uploads")
 
118
  UNIPILE_API_KEY = os.getenv("UNIPILE_API_KEY", "")
119
  FRONTEND_ORIGIN = os.getenv("FRONTEND_ORIGIN", "").rstrip("/")
120
  UNIPILE_WEBHOOK_SECRET = os.getenv("UNIPILE_WEBHOOK_SECRET", "").strip()
121
+ UNIPILE_WEBHOOK_LOG_EVENTS = os.getenv("UNIPILE_WEBHOOK_LOG_EVENTS", "").strip().lower() in (
122
+ "1",
123
+ "true",
124
+ "yes",
125
+ )
126
+ _unipile_webhook_log = logging.getLogger("emailout.unipile_webhook")
127
 
128
  # ---- API ----
129
  def _safe_str(value):
 
1693
  return {"ok": True}
1694
 
1695
 
1696
+ def _parse_unipile_webhook_datetime(raw) -> Optional[datetime]:
1697
+ if not raw or not isinstance(raw, str):
1698
+ return None
1699
+ try:
1700
+ s = raw.strip().replace("Z", "")
1701
+ return datetime.fromisoformat(s)
1702
+ except Exception:
1703
+ return None
1704
+
1705
+
1706
  @app.post("/api/webhooks/unipile")
1707
+ async def unipile_webhook(request: Request, db: Session = Depends(get_db)):
1708
  """
1709
+ UniPile webhooks (same URL for multiple UniPile registrations).
1710
+
1711
+ Supported ``event`` values:
1712
+
1713
+ * ``new_relation`` — LinkedIn invite accepted (source: users). Updates pending contacts.
1714
+ * ``mail_opened``, ``mail_link_clicked`` — email tracking (source: ``email_tracking``).
1715
+ * ``mail_received`` — mailbox email (source: ``email`` in UniPile API, “Email” / new-mail webhook in the dashboard). Replies matched via ``in_reply_to`` / ``tracking_id`` / ``label`` / RFC ``message_id``.
1716
  """
1717
  if UNIPILE_WEBHOOK_SECRET:
1718
  auth_h = request.headers.get("Unipile-Auth") or request.headers.get("X-Unipile-Auth") or ""
 
1725
  raise HTTPException(status_code=400, detail="Expected JSON body")
1726
 
1727
  event = _safe_str((body or {}).get("event"))
1728
+ if UNIPILE_WEBHOOK_LOG_EVENTS and isinstance(body, dict):
1729
+ ir = body.get("in_reply_to")
1730
+ _unipile_webhook_log.info(
1731
+ "UniPile webhook event=%s role=%s keys=%s in_reply_to_keys=%s",
1732
+ event,
1733
+ body.get("role"),
1734
+ sorted(body.keys()),
1735
+ sorted(ir.keys()) if isinstance(ir, dict) else None,
1736
+ )
1737
+ now = datetime.utcnow()
1738
 
1739
+ if event == "new_relation":
1740
+ account_id = _safe_str((body or {}).get("account_id"))
1741
+ provider_id = _safe_str((body or {}).get("user_provider_id"))
1742
+ if not account_id or not provider_id:
1743
+ return {"ok": True, "ignored": True, "reason": "missing_ids"}
1744
 
1745
+ ua = (
1746
+ db.query(UnipileAccount)
1747
+ .filter(UnipileAccount.unipile_account_id == account_id)
1748
+ .first()
1749
+ )
1750
+ if not ua:
1751
+ return {"ok": True, "ignored": True, "reason": "unknown_account"}
1752
 
1753
+ contacts = (
1754
+ db.query(Contact)
1755
+ .filter(
1756
+ Contact.tenant_id == ua.tenant_id,
1757
+ Contact.unipile_provider_id == provider_id,
1758
+ Contact.source.in_(("apollo_csv", "linkedin_campaign")),
1759
+ )
1760
+ .all()
1761
  )
1762
+ updated = 0
1763
+ for ct in contacts:
1764
+ if ct.linkedin_connection_accepted_at:
1765
+ continue
1766
+ ct.linkedin_invite_pending = 0
1767
+ ct.linkedin_connection_accepted_at = now
1768
+ updated += 1
1769
+ db.commit()
1770
+ return {"ok": True, "updated_contacts": updated, "event": event}
1771
+
1772
+ if event in ("mail_opened", "mail_link_clicked"):
1773
+ label = _safe_str((body or {}).get("label"))
1774
+ parsed = parse_tracking_label(label)
1775
+ if not parsed:
1776
+ return {"ok": True, "ignored": True, "reason": "no_tracking_label"}
1777
+ tenant_id, file_id, contact_id, step_slot = parsed
1778
+ rec = (
1779
+ db.query(OutreachSendReceipt)
1780
+ .filter(
1781
+ OutreachSendReceipt.tenant_id == tenant_id,
1782
+ OutreachSendReceipt.file_id == file_id,
1783
+ OutreachSendReceipt.contact_id == contact_id,
1784
+ OutreachSendReceipt.step_slot == step_slot,
1785
+ OutreachSendReceipt.channel == "gmail",
1786
+ )
1787
+ .first()
1788
+ )
1789
+ if not rec:
1790
+ return {"ok": True, "ignored": True, "reason": "receipt_not_found"}
1791
+ when = _parse_unipile_webhook_datetime((body or {}).get("date")) or now
1792
+ if not rec.opened_at:
1793
+ rec.opened_at = when
1794
+ try:
1795
+ rec.open_count = int(rec.open_count or 0) + 1
1796
+ except (TypeError, ValueError):
1797
+ rec.open_count = 1
1798
+ db.commit()
1799
+ refresh_outreach_execution_stats_for_file(db, tenant_id, file_id)
1800
+ db.commit()
1801
+ return {"ok": True, "event": event, "updated_receipt_id": rec.id}
1802
+
1803
+ if event == "mail_received":
1804
+ account_id = _safe_str((body or {}).get("account_id"))
1805
+ role = _safe_str((body or {}).get("role"))
1806
+ if role != "inbox":
1807
+ return {"ok": True, "ignored": True, "reason": "not_inbox"}
1808
+
1809
+ ua = (
1810
+ db.query(UnipileAccount)
1811
+ .filter(UnipileAccount.unipile_account_id == account_id)
1812
+ .first()
1813
+ )
1814
+ if not ua:
1815
+ return {"ok": True, "ignored": True, "reason": "unknown_account"}
1816
+
1817
+ tenant_id = int(ua.tenant_id)
1818
+ in_reply = (body or {}).get("in_reply_to") or {}
1819
+ parent_id = _safe_str(in_reply.get("id"))
1820
+ parent_rfc = normalize_smtp_message_id(_safe_str(in_reply.get("message_id")))
1821
+ trk = _safe_str((body or {}).get("tracking_id"))
1822
+ label = _safe_str((body or {}).get("label"))
1823
+
1824
+ rec = None
1825
+ if parent_id:
1826
+ rec = (
1827
+ db.query(OutreachSendReceipt)
1828
+ .filter(
1829
+ OutreachSendReceipt.tenant_id == tenant_id,
1830
+ OutreachSendReceipt.channel == "gmail",
1831
+ OutreachSendReceipt.unipile_message_id == parent_id,
1832
+ )
1833
+ .first()
1834
+ )
1835
+ if not rec and parent_rfc:
1836
+ rec = (
1837
+ db.query(OutreachSendReceipt)
1838
+ .filter(
1839
+ OutreachSendReceipt.tenant_id == tenant_id,
1840
+ OutreachSendReceipt.channel == "gmail",
1841
+ OutreachSendReceipt.smtp_message_id == parent_rfc,
1842
+ )
1843
+ .first()
1844
+ )
1845
+ if not rec and trk:
1846
+ rec = (
1847
+ db.query(OutreachSendReceipt)
1848
+ .filter(
1849
+ OutreachSendReceipt.tenant_id == tenant_id,
1850
+ OutreachSendReceipt.channel == "gmail",
1851
+ or_(
1852
+ OutreachSendReceipt.unipile_tracking_id == trk,
1853
+ OutreachSendReceipt.unipile_message_id == trk,
1854
+ ),
1855
+ )
1856
+ .first()
1857
+ )
1858
+ if not rec and label:
1859
+ p2 = parse_tracking_label(label)
1860
+ if p2:
1861
+ tid2, fid2, cid2, slot2 = p2
1862
+ if tid2 == tenant_id:
1863
+ rec = (
1864
+ db.query(OutreachSendReceipt)
1865
+ .filter(
1866
+ OutreachSendReceipt.tenant_id == tid2,
1867
+ OutreachSendReceipt.file_id == fid2,
1868
+ OutreachSendReceipt.contact_id == cid2,
1869
+ OutreachSendReceipt.step_slot == slot2,
1870
+ OutreachSendReceipt.channel == "gmail",
1871
+ )
1872
+ .first()
1873
+ )
1874
+
1875
+ folders = (body or {}).get("folders") or []
1876
+ subj = (_safe_str((body or {}).get("subject")) or "").lower()
1877
+ bounce_like = any("bounce" in str(f).lower() for f in folders if f) or "undeliverable" in subj
1878
+ has_reply_parent = bool(in_reply and (in_reply.get("id") or in_reply.get("message_id")))
1879
+
1880
+ if rec:
1881
+ when = _parse_unipile_webhook_datetime((body or {}).get("date")) or now
1882
+ if bounce_like and not rec.bounced_at:
1883
+ rec.bounced_at = when
1884
+ elif has_reply_parent and not bounce_like and not rec.replied_at:
1885
+ rec.replied_at = when
1886
+ db.commit()
1887
+ refresh_outreach_execution_stats_for_file(db, rec.tenant_id, rec.file_id)
1888
+ db.commit()
1889
+ return {"ok": True, "event": event, "updated_receipt_id": rec.id}
1890
+
1891
+ return {"ok": True, "ignored": True, "reason": "no_matching_receipt"}
1892
+
1893
+ return {"ok": True, "ignored": True, "reason": "unsupported_event", "event": event or ""}
1894
 
1895
 
1896
  @app.get("/api/unipile/webhook-url-hint")
1897
  async def unipile_webhook_url_hint(request: Request):
1898
+ """Public URL your UniPile webhooks should POST to (same host as this API)."""
1899
  root = _public_origin_from_request(request)
1900
+ post_url = f"{root}/api/webhooks/unipile"
1901
  return {
1902
+ "post_url": post_url,
1903
+ "auth_header": "Set Unipile-Auth or X-Unipile-Auth to UNIPILE_WEBHOOK_SECRET on each webhook registration.",
1904
+ "registrations": [
1905
+ {
1906
+ "source": "users",
1907
+ "events": ["new_relation"],
1908
+ "doc": "https://developer.unipile.com/docs/detecting-accepted-invitations",
1909
+ },
1910
+ {
1911
+ "source": "email_tracking",
1912
+ "events": ["mail_opened", "mail_link_clicked"],
1913
+ "doc": "https://developer.unipile.com/docs/tracking-email",
1914
+ },
1915
+ {
1916
+ "source": "email",
1917
+ "events": ["mail_received", "mail_sent", "mail_moved"],
1918
+ "doc": "https://developer.unipile.com/docs/new-emails-webhook",
1919
+ "dashboard_hint": "Create an **Email** (mailbox) webhook in the UniPile UI — not the **Messaging** (chat) webhook. Use the same callback URL.",
1920
+ "note": "Reply attribution uses mail_received with role inbox and in_reply_to / tracking_id / label; enable mail_received for connected mail accounts.",
1921
+ },
1922
+ {
1923
+ "source": "messaging",
1924
+ "events": [
1925
+ "message_received",
1926
+ "message_read",
1927
+ "message_reaction",
1928
+ "message_edited",
1929
+ "message_deleted",
1930
+ "message_delivered",
1931
+ ],
1932
+ "doc": "https://developer.unipile.com/docs/new-messages-webhook",
1933
+ "note": "LinkedIn/chat channels — not used for Gmail mail_received.",
1934
+ },
1935
+ ],
1936
+ "debug_logging": "Set UNIPILE_WEBHOOK_LOG_EVENTS=1 to log each webhook's event name, role, and JSON keys (no body content).",
1937
  "latency_note": "UniPile may deliver new_relation many hours after acceptance; LinkedIn has no real-time connection API.",
1938
  }
1939
 
backend/app/outreach_routes.py ADDED
@@ -0,0 +1,907 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Orchestrate sending generated email + LinkedIn sequence content via UniPile for wizard campaigns (per file_id).
3
+
4
+ Progress and receipts are persisted so the dashboard can poll execution state while a campaign is running.
5
+ """
6
+
7
+ from __future__ import annotations
8
+
9
+ import json
10
+ import os
11
+ from datetime import datetime, timedelta
12
+ from typing import Any, Dict, List, Optional, Tuple
13
+
14
+ import requests
15
+ from fastapi import APIRouter, Depends, HTTPException, Query
16
+ from sqlalchemy import func, or_
17
+ from sqlalchemy.orm import Session
18
+ from urllib.parse import urlparse, unquote
19
+
20
+ from .database import (
21
+ Contact,
22
+ FileOutreachExecution,
23
+ GeneratedSequence,
24
+ OutreachSendReceipt,
25
+ UnipileAccount,
26
+ UploadedFile,
27
+ User,
28
+ )
29
+ from .tenant_deps import TenantContext, get_tenant_context
30
+
31
+ router = APIRouter(prefix="/api/campaign-files", tags=["campaign-files"])
32
+
33
+
34
+ def _safe_str(value: Any) -> str:
35
+ if value is None:
36
+ return ""
37
+ return str(value).strip()
38
+
39
+
40
+ def _pick_from_raw(raw_data: Dict, aliases: List[str]) -> str:
41
+ if not isinstance(raw_data, dict):
42
+ return ""
43
+ lowered = {str(k).strip().lower(): v for k, v in raw_data.items()}
44
+ for alias in aliases:
45
+ if alias in lowered:
46
+ return _safe_str(lowered[alias])
47
+ return ""
48
+
49
+
50
+ def _pick_linkedin_url(raw_data: Dict) -> str:
51
+ return _pick_from_raw(
52
+ raw_data,
53
+ [
54
+ "linkedin",
55
+ "linkedin url",
56
+ "linkedin profile",
57
+ "person linkedin url",
58
+ "person linkedin profile url",
59
+ "linkedin profile url",
60
+ "linkedin public url",
61
+ "linkedin_url",
62
+ "linkedin profile link",
63
+ ],
64
+ )
65
+
66
+
67
+ def _normalize_linkedin_url(raw: str) -> str:
68
+ s = _safe_str(raw)
69
+ if not s:
70
+ return ""
71
+ if s.startswith("http://"):
72
+ s = "https://" + s[len("http://") :]
73
+ return s
74
+
75
+
76
+ def _linkedin_public_identifier(raw: str) -> str:
77
+ s = _normalize_linkedin_url(raw).strip()
78
+ if not s:
79
+ return ""
80
+ low = s.lower()
81
+ if "linkedin.com" in low:
82
+ try:
83
+ path = urlparse(s).path or ""
84
+ except Exception:
85
+ path = ""
86
+ path = unquote(path).strip("/")
87
+ parts = [p for p in path.split("/") if p]
88
+ for i, seg in enumerate(parts):
89
+ if seg.lower() == "in" and i + 1 < len(parts):
90
+ return parts[i + 1].split("?")[0].strip()
91
+ if seg.lower() == "sales" and i + 2 < len(parts) and parts[i + 1].lower() == "lead":
92
+ return parts[i + 2].split("?")[0].strip()
93
+ return ""
94
+ return s.split("?")[0].strip("/").strip()
95
+
96
+
97
+ def _unipile_env() -> Tuple[str, str]:
98
+ base = os.getenv("UNIPILE_API_BASE", "").rstrip("/")
99
+ key = os.getenv("UNIPILE_API_KEY", "")
100
+ return base, key
101
+
102
+
103
+ def _unipile_post_json(path: str, payload: dict) -> dict:
104
+ base, key = _unipile_env()
105
+ if not base or not key:
106
+ raise HTTPException(status_code=400, detail="UniPile is not configured (UNIPILE_API_BASE / UNIPILE_API_KEY).")
107
+ url = f"{base}{path}"
108
+ resp = requests.post(
109
+ url,
110
+ headers={
111
+ "X-API-KEY": key,
112
+ "accept": "application/json",
113
+ "content-type": "application/json",
114
+ },
115
+ json=payload,
116
+ timeout=60,
117
+ )
118
+ try:
119
+ data = resp.json()
120
+ except Exception:
121
+ data = {"raw": resp.text}
122
+ if resp.status_code >= 400:
123
+ msg = None
124
+ if isinstance(data, dict):
125
+ msg = data.get("message") or data.get("detail") or data.get("error")
126
+ raise HTTPException(
127
+ status_code=502,
128
+ detail=msg or f"UniPile error ({resp.status_code})",
129
+ )
130
+ return data if isinstance(data, dict) else {}
131
+
132
+
133
+ def _unipile_call(method: str, path: str, payload: Optional[dict] = None) -> Tuple[int, Any]:
134
+ base, key = _unipile_env()
135
+ if not base or not key:
136
+ raise HTTPException(status_code=400, detail="UniPile is not configured.")
137
+ url = f"{base}{path}"
138
+ try:
139
+ resp = requests.request(
140
+ method=method.upper(),
141
+ url=url,
142
+ headers={
143
+ "X-API-KEY": key,
144
+ "accept": "application/json",
145
+ "content-type": "application/json",
146
+ },
147
+ json=payload,
148
+ timeout=60,
149
+ )
150
+ except requests.RequestException as e:
151
+ raise HTTPException(status_code=502, detail=f"UniPile request failed: {e}") from e
152
+ try:
153
+ data = resp.json()
154
+ except Exception:
155
+ data = {"raw": resp.text}
156
+ return resp.status_code, data
157
+
158
+
159
+ def _normalize_plan_steps(raw: Any) -> List[dict]:
160
+ if raw is None:
161
+ return []
162
+ if isinstance(raw, dict) and isinstance(raw.get("steps"), list):
163
+ return [x for x in raw["steps"] if isinstance(x, dict)]
164
+ if isinstance(raw, list):
165
+ return [x for x in raw if isinstance(x, dict)]
166
+ return []
167
+
168
+
169
+ def build_ordered_actions(steps: List[dict]) -> List[dict]:
170
+ """Flatten sequence plan to sendable actions with global step_order (matches GeneratedSequence.step_order).
171
+
172
+ ``wait_days_before`` is the sum of wizard ``wait`` steps immediately preceding this action (calendar days
173
+ after the previous slot completes before this slot may send).
174
+ """
175
+ out: List[dict] = []
176
+ ord_global = 0
177
+ wait_accum = 0
178
+ for s in steps:
179
+ if s.get("type") == "wait":
180
+ try:
181
+ d = int(s.get("days") if s.get("days") is not None else 1)
182
+ except (TypeError, ValueError):
183
+ d = 1
184
+ wait_accum += max(0, min(d, 365))
185
+ continue
186
+ if s.get("type") != "action":
187
+ continue
188
+ ord_global += 1
189
+ ch = (s.get("channel") or "").strip()
190
+ act = (s.get("action") or "").strip()
191
+ ref = s.get("unipile_account_ref_id")
192
+ try:
193
+ ref_i = int(ref) if ref is not None and str(ref).strip() != "" else None
194
+ except (TypeError, ValueError):
195
+ ref_i = None
196
+ out.append(
197
+ {
198
+ "order": ord_global,
199
+ "channel": ch,
200
+ "action": act,
201
+ "account_ref_id": ref_i,
202
+ "wait_days_before": int(wait_accum),
203
+ }
204
+ )
205
+ wait_accum = 0
206
+ return out
207
+
208
+
209
+ def compose_tracking_label(tenant_id: int, file_id: str, contact_id: int, step_slot: int) -> str:
210
+ """Stable label for UniPile tracking + webhook attribution (pipe-separated, no pipes in file_id assumed)."""
211
+ raw = f"eo|{tenant_id}|{file_id}|{contact_id}|{step_slot}"
212
+ return raw[:200]
213
+
214
+
215
+ def parse_tracking_label(label: str) -> Optional[Tuple[int, str, int, int]]:
216
+ if not label or not label.startswith("eo|"):
217
+ return None
218
+ parts = label.split("|")
219
+ if len(parts) != 5:
220
+ return None
221
+ try:
222
+ return int(parts[1]), parts[2], int(parts[3]), int(parts[4])
223
+ except (TypeError, ValueError):
224
+ return None
225
+
226
+
227
+ def normalize_smtp_message_id(raw: Optional[str]) -> str:
228
+ """Normalize RFC 5322 Message-ID for comparison (strip angle brackets, lower case)."""
229
+ s = _safe_str(raw)
230
+ if not s:
231
+ return ""
232
+ s = s.strip()
233
+ if len(s) >= 2 and s[0] == "<" and s[-1] == ">":
234
+ s = s[1:-1].strip()
235
+ return s.lower()
236
+
237
+
238
+ def _delivery_started_at_naive(ex: FileOutreachExecution) -> Optional[datetime]:
239
+ if not ex.cursor_json:
240
+ return None
241
+ try:
242
+ cur = json.loads(ex.cursor_json)
243
+ raw = cur.get("delivery_started_at")
244
+ if not raw or not isinstance(raw, str):
245
+ return None
246
+ s = raw.strip().replace("Z", "")
247
+ return datetime.fromisoformat(s)
248
+ except Exception:
249
+ return None
250
+
251
+
252
+ def touch_delivery_anchor(ex: FileOutreachExecution, *, reset_run: bool) -> None:
253
+ cur: Dict[str, Any] = {}
254
+ if ex.cursor_json:
255
+ try:
256
+ cur = json.loads(ex.cursor_json)
257
+ except Exception:
258
+ cur = {}
259
+ if reset_run or not cur.get("delivery_started_at"):
260
+ cur["delivery_started_at"] = datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
261
+ ex.cursor_json = json.dumps(cur)
262
+
263
+
264
+ def _wait_eligible_for_slot(
265
+ ex: FileOutreachExecution,
266
+ act: dict,
267
+ slot: int,
268
+ prev_rec: Optional[OutreachSendReceipt],
269
+ now: datetime,
270
+ ) -> bool:
271
+ try:
272
+ wd = int(act.get("wait_days_before") or 0)
273
+ except (TypeError, ValueError):
274
+ wd = 0
275
+ if wd <= 0:
276
+ return True
277
+ if slot <= 0:
278
+ anchor = _delivery_started_at_naive(ex) or now
279
+ return now >= anchor + timedelta(days=wd)
280
+ if not prev_rec or not prev_rec.created_at:
281
+ return False
282
+ return now >= prev_rec.created_at + timedelta(days=wd)
283
+
284
+
285
+ def _linkedin_dm_interval_eligible(contact: Contact, interval_h: int, now: datetime) -> bool:
286
+ if contact.linkedin_invite_pending and not contact.linkedin_connection_accepted_at:
287
+ return False
288
+ h = max(1, min(int(interval_h or 72), 720))
289
+ if contact.linkedin_connection_accepted_at:
290
+ if contact.linkedin_last_followup_sent_at:
291
+ return now >= contact.linkedin_last_followup_sent_at + timedelta(hours=h)
292
+ return now >= contact.linkedin_connection_accepted_at + timedelta(hours=h)
293
+ if contact.linkedin_last_followup_sent_at:
294
+ return now >= contact.linkedin_last_followup_sent_at + timedelta(hours=h)
295
+ return True
296
+
297
+
298
+ def _resolve_unipile_row(db: Session, tenant_id: int, user_id: int, ref_id: Optional[int]) -> Optional[UnipileAccount]:
299
+ if ref_id is None:
300
+ return None
301
+ return (
302
+ db.query(UnipileAccount)
303
+ .filter(
304
+ UnipileAccount.id == int(ref_id),
305
+ UnipileAccount.tenant_id == tenant_id,
306
+ UnipileAccount.user_id == user_id,
307
+ )
308
+ .first()
309
+ )
310
+
311
+
312
+ def _send_unipile_email(
313
+ account_uid: str,
314
+ to_email: str,
315
+ display_name: str,
316
+ subject: str,
317
+ body: str,
318
+ tracking_label: str,
319
+ ) -> Dict[str, Optional[str]]:
320
+ payload = {
321
+ "account_id": account_uid,
322
+ "subject": subject or "(no subject)",
323
+ "body": body or "",
324
+ "to": [{"identifier": to_email, "display_name": display_name or to_email}],
325
+ "tracking_options": {"opens": True, "links": True, "label": tracking_label[:200]},
326
+ }
327
+ data = _unipile_post_json("/api/v1/emails", payload)
328
+ mid = data.get("id") or data.get("email_id")
329
+ trk = data.get("tracking_id")
330
+ rfc = data.get("message_id") or data.get("internet_message_id") or data.get("smtp_message_id")
331
+ return {
332
+ "message_id": _safe_str(mid) if mid else None,
333
+ "tracking_id": _safe_str(trk) if trk else None,
334
+ "smtp_message_id": _safe_str(rfc) if rfc else None,
335
+ }
336
+
337
+
338
+ def _send_unipile_dm(account_id: str, provider_id: str, text: str) -> None:
339
+ chat = _unipile_post_json(
340
+ "/api/v1/chats",
341
+ {"account_id": account_id, "attendees": [provider_id]},
342
+ )
343
+ chat_id = _safe_str(chat.get("id") if isinstance(chat, dict) else "")
344
+ if not chat_id:
345
+ raise ValueError("UniPile chat id not found")
346
+ _unipile_post_json(
347
+ f"/api/v1/chats/{requests.utils.quote(chat_id, safe='')}/messages",
348
+ {"text": text or ""},
349
+ )
350
+
351
+
352
+ def _linkedin_invite_note(text: str) -> str:
353
+ t = _safe_str(text).replace("\r\n", "\n").strip()
354
+ if len(t) <= 280:
355
+ return t
356
+ return t[:277].rstrip() + "..."
357
+
358
+
359
+ def _aggregate_stats(db: Session, tenant_id: int, file_id: str) -> Dict[str, Any]:
360
+ rows = (
361
+ db.query(OutreachSendReceipt.status, func.count(OutreachSendReceipt.id))
362
+ .filter(OutreachSendReceipt.tenant_id == tenant_id, OutreachSendReceipt.file_id == file_id)
363
+ .group_by(OutreachSendReceipt.status)
364
+ .all()
365
+ )
366
+ counts = {st: int(n or 0) for st, n in rows}
367
+ sent = int(counts.get("sent", 0))
368
+ failed = int(counts.get("failed", 0))
369
+ skipped = int(counts.get("skipped", 0))
370
+ total_done = sent + failed + skipped
371
+
372
+ sent_gmail = (
373
+ db.query(func.count(OutreachSendReceipt.id))
374
+ .filter(
375
+ OutreachSendReceipt.tenant_id == tenant_id,
376
+ OutreachSendReceipt.file_id == file_id,
377
+ OutreachSendReceipt.channel == "gmail",
378
+ OutreachSendReceipt.status == "sent",
379
+ )
380
+ .scalar()
381
+ or 0
382
+ )
383
+ opens = (
384
+ db.query(func.count(OutreachSendReceipt.id))
385
+ .filter(
386
+ OutreachSendReceipt.tenant_id == tenant_id,
387
+ OutreachSendReceipt.file_id == file_id,
388
+ OutreachSendReceipt.channel == "gmail",
389
+ OutreachSendReceipt.status == "sent",
390
+ or_(OutreachSendReceipt.opened_at.isnot(None), func.coalesce(OutreachSendReceipt.open_count, 0) > 0),
391
+ )
392
+ .scalar()
393
+ or 0
394
+ )
395
+ replies = (
396
+ db.query(func.count(OutreachSendReceipt.id))
397
+ .filter(
398
+ OutreachSendReceipt.tenant_id == tenant_id,
399
+ OutreachSendReceipt.file_id == file_id,
400
+ OutreachSendReceipt.channel == "gmail",
401
+ OutreachSendReceipt.status == "sent",
402
+ OutreachSendReceipt.replied_at.isnot(None),
403
+ )
404
+ .scalar()
405
+ or 0
406
+ )
407
+ bounces = (
408
+ db.query(func.count(OutreachSendReceipt.id))
409
+ .filter(
410
+ OutreachSendReceipt.tenant_id == tenant_id,
411
+ OutreachSendReceipt.file_id == file_id,
412
+ OutreachSendReceipt.channel == "gmail",
413
+ OutreachSendReceipt.status == "sent",
414
+ OutreachSendReceipt.bounced_at.isnot(None),
415
+ )
416
+ .scalar()
417
+ or 0
418
+ )
419
+
420
+ open_rate = None
421
+ reply_rate = None
422
+ if sent_gmail and int(sent_gmail) > 0:
423
+ open_rate = round(100.0 * float(opens) / float(sent_gmail), 1)
424
+ reply_rate = round(100.0 * float(replies) / float(sent_gmail), 1)
425
+
426
+ return {
427
+ "sent": sent,
428
+ "failed": failed,
429
+ "skipped": skipped,
430
+ "total_dispatched": total_done,
431
+ "opens": int(opens),
432
+ "replies": int(replies),
433
+ "bounces": int(bounces),
434
+ "sent_gmail": int(sent_gmail),
435
+ "open_rate": open_rate,
436
+ "reply_rate": reply_rate,
437
+ }
438
+
439
+
440
+ def refresh_outreach_execution_stats_for_file(db: Session, tenant_id: int, file_id: str) -> None:
441
+ """Recompute stats_json on the execution row (e.g. after UniPile webhooks update receipts)."""
442
+ ex = (
443
+ db.query(FileOutreachExecution)
444
+ .filter(FileOutreachExecution.tenant_id == tenant_id, FileOutreachExecution.file_id == file_id)
445
+ .first()
446
+ )
447
+ if not ex:
448
+ return
449
+ ex.stats_json = json.dumps(_aggregate_stats(db, tenant_id, file_id))
450
+ ex.updated_at = datetime.utcnow()
451
+
452
+
453
+ def _get_or_create_execution(db: Session, tenant_id: int, user_id: int, file_id: str) -> FileOutreachExecution:
454
+ row = (
455
+ db.query(FileOutreachExecution)
456
+ .filter(FileOutreachExecution.tenant_id == tenant_id, FileOutreachExecution.file_id == file_id)
457
+ .first()
458
+ )
459
+ if row:
460
+ return row
461
+ row = FileOutreachExecution(
462
+ tenant_id=tenant_id,
463
+ user_id=user_id,
464
+ file_id=file_id,
465
+ status="idle",
466
+ progress_done=0,
467
+ progress_total=0,
468
+ )
469
+ db.add(row)
470
+ db.commit()
471
+ db.refresh(row)
472
+ return row
473
+
474
+
475
+ def _receipt_for(db: Session, tenant_id: int, file_id: str, contact_id: int, step_slot: int) -> Optional[OutreachSendReceipt]:
476
+ return (
477
+ db.query(OutreachSendReceipt)
478
+ .filter(
479
+ OutreachSendReceipt.tenant_id == tenant_id,
480
+ OutreachSendReceipt.file_id == file_id,
481
+ OutreachSendReceipt.contact_id == contact_id,
482
+ OutreachSendReceipt.step_slot == step_slot,
483
+ )
484
+ .first()
485
+ )
486
+
487
+
488
+ def _find_next_send(
489
+ db: Session,
490
+ tenant_id: int,
491
+ file_id: str,
492
+ contacts: List[Contact],
493
+ actions: List[dict],
494
+ ex: FileOutreachExecution,
495
+ ) -> Optional[Tuple[Contact, int, dict]]:
496
+ n_actions = len(actions)
497
+ if not contacts or not actions:
498
+ return None
499
+ now = datetime.utcnow()
500
+ try:
501
+ li_h = int(ex.linkedin_followup_interval_hours or 72)
502
+ except (TypeError, ValueError):
503
+ li_h = 72
504
+ for c in contacts:
505
+ for slot in range(n_actions):
506
+ if _receipt_for(db, tenant_id, file_id, c.id, slot):
507
+ continue
508
+ ok = True
509
+ prev_rec: Optional[OutreachSendReceipt] = None
510
+ for ps in range(slot):
511
+ pr = _receipt_for(db, tenant_id, file_id, c.id, ps)
512
+ if not pr or pr.status not in ("sent", "failed", "skipped"):
513
+ ok = False
514
+ break
515
+ if ps == slot - 1:
516
+ prev_rec = pr
517
+ if not ok:
518
+ continue
519
+ act = actions[slot]
520
+ if not _wait_eligible_for_slot(ex, act, slot, prev_rec, now):
521
+ continue
522
+ if act.get("channel") == "linkedin" and act.get("action") == "linkedin_dm":
523
+ if not _linkedin_dm_interval_eligible(c, li_h, now):
524
+ continue
525
+ return (c, slot, act)
526
+ return None
527
+
528
+
529
+ def _fetch_generated(
530
+ db: Session, tenant_id: int, file_id: str, row_index: int, channel: str, step_order: int
531
+ ) -> Optional[GeneratedSequence]:
532
+ return (
533
+ db.query(GeneratedSequence)
534
+ .filter(
535
+ GeneratedSequence.tenant_id == tenant_id,
536
+ GeneratedSequence.file_id == file_id,
537
+ GeneratedSequence.sequence_id == row_index,
538
+ GeneratedSequence.channel == channel,
539
+ GeneratedSequence.step_order == step_order,
540
+ )
541
+ .first()
542
+ )
543
+
544
+
545
+ def _execute_one_send(
546
+ db: Session,
547
+ tc: TenantContext,
548
+ uf: UploadedFile,
549
+ user_row: User,
550
+ contact: Contact,
551
+ slot: int,
552
+ act: dict,
553
+ ) -> OutreachSendReceipt:
554
+ tenant_id = int(tc.tenant_id)
555
+ user_id = int(tc.user_id)
556
+ file_id = uf.file_id
557
+ order = int(act["order"])
558
+ channel = act["channel"]
559
+ action = act["action"]
560
+ ref_id = act.get("account_ref_id")
561
+
562
+ rec = OutreachSendReceipt(
563
+ tenant_id=tenant_id,
564
+ file_id=file_id,
565
+ contact_id=contact.id,
566
+ step_slot=slot,
567
+ channel=channel,
568
+ action=action,
569
+ status="pending",
570
+ )
571
+ db.add(rec)
572
+ db.flush()
573
+
574
+ gen = _fetch_generated(db, tenant_id, file_id, int(contact.row_index or 0), channel, order)
575
+ if not gen:
576
+ rec.status = "failed"
577
+ rec.detail = "No generated content for this step (run generation first)."
578
+ return rec
579
+
580
+ try:
581
+ if channel == "gmail" and action == "email":
582
+ mb_ref = ref_id if ref_id is not None else getattr(user_row, "default_mailbox_unipile_account_ref_id", None)
583
+ acc = _resolve_unipile_row(db, tenant_id, user_id, int(mb_ref) if mb_ref is not None else None)
584
+ if not acc or not acc.unipile_account_id:
585
+ rec.status = "failed"
586
+ rec.detail = "No connected mailbox for this step."
587
+ return rec
588
+ to_email = _safe_str(gen.email or contact.email)
589
+ if not to_email or "@" not in to_email:
590
+ rec.status = "skipped"
591
+ rec.detail = "Missing recipient email."
592
+ return rec
593
+ label = compose_tracking_label(tenant_id, file_id, contact.id, slot)
594
+ rec.tracking_label = label
595
+ display = " ".join(
596
+ p for p in [_safe_str(gen.first_name or contact.first_name), _safe_str(gen.last_name or contact.last_name)] if p
597
+ ).strip()
598
+ send_ids = _send_unipile_email(
599
+ acc.unipile_account_id,
600
+ to_email,
601
+ display or to_email,
602
+ gen.subject or "",
603
+ gen.email_content or "",
604
+ label,
605
+ )
606
+ rec.unipile_message_id = send_ids.get("message_id") or send_ids.get("tracking_id")
607
+ rec.unipile_tracking_id = send_ids.get("tracking_id") or None
608
+ smtp_raw = send_ids.get("smtp_message_id")
609
+ rec.smtp_message_id = normalize_smtp_message_id(smtp_raw) if smtp_raw else None
610
+ rec.status = "sent"
611
+ return rec
612
+
613
+ if channel == "linkedin" and action in ("linkedin_connect", "linkedin_dm"):
614
+ li_ref = ref_id if ref_id is not None else getattr(user_row, "default_unipile_account_ref_id", None)
615
+ li_acc = _resolve_unipile_row(db, tenant_id, user_id, int(li_ref) if li_ref is not None else None)
616
+ if not li_acc or not li_acc.unipile_account_id:
617
+ rec.status = "failed"
618
+ rec.detail = "No connected LinkedIn account for this step."
619
+ return rec
620
+ acc_uid = li_acc.unipile_account_id
621
+ raw_li = _pick_linkedin_url(contact.raw_data or {})
622
+ slug = _linkedin_public_identifier(raw_li)
623
+ if not slug:
624
+ rec.status = "skipped"
625
+ rec.detail = "No LinkedIn profile URL on contact."
626
+ return rec
627
+
628
+ slug_enc = requests.utils.quote(slug, safe="")
629
+ acc_enc = requests.utils.quote(acc_uid, safe="")
630
+ st_prof, profile = _unipile_call("GET", f"/api/v1/users/{slug_enc}?account_id={acc_enc}")
631
+ if st_prof >= 400 or not isinstance(profile, dict):
632
+ rec.status = "failed"
633
+ rec.detail = "UniPile profile lookup failed."
634
+ return rec
635
+ provider_id = _safe_str(profile.get("provider_id"))
636
+ if not provider_id:
637
+ rec.status = "failed"
638
+ rec.detail = "UniPile profile had no provider_id."
639
+ return rec
640
+
641
+ if action == "linkedin_connect":
642
+ note = _linkedin_invite_note(gen.email_content or "")
643
+ st_inv, inv_res = _unipile_call(
644
+ "POST",
645
+ "/api/v1/users/invite",
646
+ {"provider_id": provider_id, "account_id": acc_uid, "message": note},
647
+ )
648
+ if st_inv < 400:
649
+ contact.unipile_provider_id = provider_id
650
+ contact.linkedin_invite_pending = 1
651
+ contact.linkedin_invite_sent_at = datetime.utcnow()
652
+ contact.linkedin_followup_next_email_number = 2
653
+ rec.status = "sent"
654
+ return rec
655
+ inv_err = ""
656
+ if isinstance(inv_res, dict):
657
+ inv_err = _safe_str(
658
+ inv_res.get("message") or inv_res.get("detail") or inv_res.get("error") or ""
659
+ )
660
+ try:
661
+ _send_unipile_dm(acc_uid, provider_id, gen.email_content or "")
662
+ contact.unipile_provider_id = provider_id
663
+ contact.linkedin_invite_pending = 0
664
+ contact.linkedin_connection_accepted_at = datetime.utcnow()
665
+ contact.linkedin_last_followup_sent_at = datetime.utcnow()
666
+ contact.linkedin_followup_next_email_number = 2
667
+ rec.status = "sent"
668
+ rec.detail = f"invite_failed_dm_sent: {inv_err[:200]}"
669
+ return rec
670
+ except Exception as e:
671
+ rec.status = "failed"
672
+ rec.detail = f"invite: {inv_err}; dm: {e}"
673
+ return rec
674
+
675
+ # linkedin_dm
676
+ pid = _safe_str(contact.unipile_provider_id) or provider_id
677
+ if not pid:
678
+ rec.status = "skipped"
679
+ rec.detail = "No provider_id for DM (connection may not be accepted yet)."
680
+ return rec
681
+ _send_unipile_dm(acc_uid, pid, gen.email_content or "")
682
+ rec.status = "sent"
683
+ contact.linkedin_last_followup_sent_at = datetime.utcnow()
684
+ return rec
685
+
686
+ rec.status = "skipped"
687
+ rec.detail = f"Unsupported action {channel}/{action}"
688
+ return rec
689
+ except HTTPException as e:
690
+ rec.status = "failed"
691
+ rec.detail = _safe_str(e.detail)[:2000]
692
+ return rec
693
+ except Exception as e:
694
+ rec.status = "failed"
695
+ rec.detail = str(e)[:2000]
696
+ return rec
697
+
698
+
699
+ def _refresh_execution_counters(db: Session, ex: FileOutreachExecution, tenant_id: int, file_id: str, total: int) -> None:
700
+ stats = _aggregate_stats(db, tenant_id, file_id)
701
+ done = int(stats["total_dispatched"])
702
+ ex.progress_done = min(done, total)
703
+ ex.progress_total = total
704
+ ex.stats_json = json.dumps(stats)
705
+ if total > 0 and ex.progress_done >= total:
706
+ ex.status = "completed"
707
+ ex.updated_at = datetime.utcnow()
708
+
709
+
710
+ @router.get("/{file_id}/execution")
711
+ def get_campaign_execution(file_id: str, tc: TenantContext = Depends(get_tenant_context)):
712
+ db = tc.db
713
+ uf = (
714
+ db.query(UploadedFile)
715
+ .filter(UploadedFile.tenant_id == tc.tenant_id, UploadedFile.file_id == file_id)
716
+ .first()
717
+ )
718
+ if not uf:
719
+ raise HTTPException(status_code=404, detail="File not found")
720
+ ex = _get_or_create_execution(db, int(tc.tenant_id), int(tc.user_id), file_id)
721
+ stats = json.loads(ex.stats_json) if ex.stats_json else _aggregate_stats(db, int(tc.tenant_id), file_id)
722
+ pct = 0
723
+ if ex.progress_total and ex.progress_total > 0:
724
+ pct = min(100, max(0, round(100 * (ex.progress_done or 0) / ex.progress_total)))
725
+ return {
726
+ "file_id": file_id,
727
+ "status": ex.status,
728
+ "progress_done": ex.progress_done,
729
+ "progress_total": ex.progress_total,
730
+ "progress_percent": pct,
731
+ "stats": stats,
732
+ "last_error": ex.last_error,
733
+ }
734
+
735
+
736
+ @router.post("/{file_id}/execution/start")
737
+ def start_campaign_execution(
738
+ file_id: str,
739
+ reset: bool = False,
740
+ linkedin_followup_hours: Optional[int] = Query(None, ge=1, le=720),
741
+ tc: TenantContext = Depends(get_tenant_context),
742
+ ):
743
+ db = tc.db
744
+ uf = (
745
+ db.query(UploadedFile)
746
+ .filter(UploadedFile.tenant_id == tc.tenant_id, UploadedFile.file_id == file_id)
747
+ .first()
748
+ )
749
+ if not uf:
750
+ raise HTTPException(status_code=404, detail="File not found")
751
+
752
+ raw_plan = getattr(uf, "sequence_plan_json", None)
753
+ steps: List[dict] = []
754
+ if raw_plan and str(raw_plan).strip():
755
+ try:
756
+ steps = _normalize_plan_steps(json.loads(raw_plan))
757
+ except Exception:
758
+ steps = []
759
+ actions = build_ordered_actions(steps)
760
+ contacts = (
761
+ db.query(Contact)
762
+ .filter(
763
+ Contact.tenant_id == tc.tenant_id,
764
+ Contact.file_id == file_id,
765
+ Contact.source == "apollo_csv",
766
+ )
767
+ .order_by(Contact.row_index.asc(), Contact.id.asc())
768
+ .all()
769
+ )
770
+ if not actions:
771
+ raise HTTPException(status_code=400, detail="No sendable sequence steps on this file. Save a sequence in the campaign wizard.")
772
+ if not contacts:
773
+ raise HTTPException(status_code=400, detail="No contacts for this file.")
774
+
775
+ gen_n = (
776
+ db.query(func.count(GeneratedSequence.id))
777
+ .filter(
778
+ GeneratedSequence.tenant_id == tc.tenant_id,
779
+ GeneratedSequence.file_id == file_id,
780
+ )
781
+ .scalar()
782
+ or 0
783
+ )
784
+ if int(gen_n) == 0:
785
+ raise HTTPException(status_code=400, detail="Generate sequences before starting the campaign.")
786
+
787
+ tenant_id = int(tc.tenant_id)
788
+ user_id = int(tc.user_id)
789
+
790
+ ex = _get_or_create_execution(db, tenant_id, user_id, file_id)
791
+
792
+ if ex.status == "running" and reset:
793
+ raise HTTPException(status_code=400, detail="Pause the campaign before restarting delivery (reset).")
794
+
795
+ if ex.status == "running" and not reset:
796
+ return get_campaign_execution(file_id, tc)
797
+ if ex.status == "paused" and not reset:
798
+ total = ex.progress_total or len(contacts) * len(actions)
799
+ ex.status = "running"
800
+ ex.progress_total = total
801
+ ex.last_error = None
802
+ if linkedin_followup_hours is not None:
803
+ ex.linkedin_followup_interval_hours = int(linkedin_followup_hours)
804
+ ex.updated_at = datetime.utcnow()
805
+ db.commit()
806
+ return get_campaign_execution(file_id, tc)
807
+
808
+ if ex.status == "completed" and not reset:
809
+ return get_campaign_execution(file_id, tc)
810
+
811
+ if reset:
812
+ db.query(OutreachSendReceipt).filter(
813
+ OutreachSendReceipt.tenant_id == tenant_id,
814
+ OutreachSendReceipt.file_id == file_id,
815
+ ).delete()
816
+
817
+ db.commit()
818
+
819
+ total = len(contacts) * len(actions)
820
+ ex.status = "running"
821
+ ex.progress_total = total
822
+ ex.progress_done = 0
823
+ ex.last_error = None
824
+ touch_delivery_anchor(ex, reset_run=bool(reset))
825
+ if linkedin_followup_hours is not None:
826
+ ex.linkedin_followup_interval_hours = int(linkedin_followup_hours)
827
+ ex.stats_json = json.dumps(_aggregate_stats(db, tenant_id, file_id))
828
+ ex.updated_at = datetime.utcnow()
829
+ db.commit()
830
+ return get_campaign_execution(file_id, tc)
831
+
832
+
833
+ @router.post("/{file_id}/execution/pause")
834
+ def pause_campaign_execution(file_id: str, tc: TenantContext = Depends(get_tenant_context)):
835
+ db = tc.db
836
+ ex = (
837
+ db.query(FileOutreachExecution)
838
+ .filter(FileOutreachExecution.tenant_id == tc.tenant_id, FileOutreachExecution.file_id == file_id)
839
+ .first()
840
+ )
841
+ if not ex:
842
+ raise HTTPException(status_code=404, detail="No execution state for this file")
843
+ if ex.status == "running":
844
+ ex.status = "paused"
845
+ ex.updated_at = datetime.utcnow()
846
+ db.commit()
847
+ return get_campaign_execution(file_id, tc)
848
+
849
+
850
+ @router.post("/{file_id}/execution/tick")
851
+ def tick_campaign_execution(
852
+ file_id: str,
853
+ limit: int = Query(3, ge=1, le=25),
854
+ tc: TenantContext = Depends(get_tenant_context),
855
+ ):
856
+ db = tc.db
857
+ uf = (
858
+ db.query(UploadedFile)
859
+ .filter(UploadedFile.tenant_id == tc.tenant_id, UploadedFile.file_id == file_id)
860
+ .first()
861
+ )
862
+ if not uf:
863
+ raise HTTPException(status_code=404, detail="File not found")
864
+
865
+ ex = _get_or_create_execution(db, int(tc.tenant_id), int(tc.user_id), file_id)
866
+ if ex.status != "running":
867
+ return get_campaign_execution(file_id, tc)
868
+
869
+ steps = []
870
+ raw = getattr(uf, "sequence_plan_json", None)
871
+ if raw and str(raw).strip():
872
+ try:
873
+ steps = _normalize_plan_steps(json.loads(raw))
874
+ except Exception:
875
+ steps = []
876
+ actions = build_ordered_actions(steps)
877
+ contacts = (
878
+ db.query(Contact)
879
+ .filter(
880
+ Contact.tenant_id == tc.tenant_id,
881
+ Contact.file_id == file_id,
882
+ Contact.source == "apollo_csv",
883
+ )
884
+ .order_by(Contact.row_index.asc(), Contact.id.asc())
885
+ .all()
886
+ )
887
+ user_row = db.query(User).filter(User.id == tc.user_id).first()
888
+ if not user_row or not actions or not contacts:
889
+ ex.status = "failed"
890
+ ex.last_error = "Missing user, actions, or contacts."
891
+ db.commit()
892
+ return get_campaign_execution(file_id, tc)
893
+
894
+ n = max(1, min(int(limit or 3), 25))
895
+ for _ in range(n):
896
+ nxt = _find_next_send(db, int(tc.tenant_id), file_id, contacts, actions, ex)
897
+ if not nxt:
898
+ break
899
+ c, slot, act = nxt
900
+ _execute_one_send(db, tc, uf, user_row, c, slot, act)
901
+ db.commit()
902
+ _refresh_execution_counters(db, ex, int(tc.tenant_id), file_id, ex.progress_total or len(contacts) * len(actions))
903
+ db.commit()
904
+
905
+ _refresh_execution_counters(db, ex, int(tc.tenant_id), file_id, ex.progress_total or len(contacts) * len(actions))
906
+ db.commit()
907
+ return get_campaign_execution(file_id, tc)
frontend/src/components/campaigns/CampaignsDashboardTab.jsx CHANGED
@@ -9,6 +9,7 @@ import {
9
  ExternalLink,
10
  Archive,
11
  ArchiveRestore,
 
12
  } from 'lucide-react';
13
  import { Button } from '@/components/ui/button';
14
  import CreateCampaignWizard from '@/components/campaigns/CreateCampaignWizard';
@@ -84,6 +85,12 @@ function normalizeCampaign(c) {
84
  Math.round(c.generationProgressPercent ?? (genDone ? 100 : hasFile ? 0 : 100))
85
  ),
86
  sequence: Array.isArray(c.sequence) ? c.sequence : [],
 
 
 
 
 
 
87
  };
88
  }
89
 
@@ -151,7 +158,7 @@ function statusLabel(status) {
151
  }
152
 
153
  /** Circular ring; hover shows native tooltip with percent (title). */
154
- function CampaignGenerationRing({ percent, complete, fileId, compact = false }) {
155
  if (!fileId || complete) return null;
156
  const p = Math.min(100, Math.max(0, percent));
157
  const r = compact ? 14 : 17;
@@ -160,10 +167,17 @@ function CampaignGenerationRing({ percent, complete, fileId, compact = false })
160
  const cx = size / 2;
161
  const c = 2 * Math.PI * r;
162
  const offset = c * (1 - p / 100);
 
 
 
 
 
 
 
163
  return (
164
  <div
165
  className={cn('relative shrink-0 cursor-default', compact ? 'h-9 w-9' : 'h-11 w-11')}
166
- title={`Content generation: ${Math.round(p)}%`}
167
  >
168
  <svg
169
  width={size}
@@ -178,7 +192,7 @@ function CampaignGenerationRing({ percent, complete, fileId, compact = false })
178
  cy={cx}
179
  r={r}
180
  fill="none"
181
- stroke="#7c3aed"
182
  strokeWidth={stroke}
183
  strokeLinecap="round"
184
  strokeDasharray={c}
@@ -188,7 +202,8 @@ function CampaignGenerationRing({ percent, complete, fileId, compact = false })
188
  </svg>
189
  <span
190
  className={cn(
191
- 'pointer-events-none absolute inset-0 flex items-center justify-center gap-0 px-0.5 text-center font-bold tabular-nums leading-none text-violet-800',
 
192
  compact ? 'text-[9px]' : 'text-[10px]'
193
  )}
194
  aria-hidden
@@ -277,6 +292,70 @@ export default function CampaignsDashboardTab() {
277
  return () => clearInterval(iv);
278
  }, []);
279
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
280
  const tabCounts = useMemo(() => {
281
  const active = campaigns.filter((c) => !c.archived);
282
  const archived = campaigns.filter((c) => c.archived);
@@ -347,7 +426,23 @@ export default function CampaignsDashboardTab() {
347
 
348
  const closeBulkMenu = useCallback(() => setBulkMenuOpen(false), []);
349
 
350
- const bulkResume = useCallback(() => {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
351
  setCampaigns((prev) =>
352
  prev.map((c) =>
353
  selectedSet.has(c.id) && c.status === 'paused' ? { ...c, status: 'running' } : c
@@ -356,7 +451,19 @@ export default function CampaignsDashboardTab() {
356
  closeBulkMenu();
357
  }, [selectedSet, closeBulkMenu]);
358
 
359
- const bulkPause = useCallback(() => {
 
 
 
 
 
 
 
 
 
 
 
 
360
  setCampaigns((prev) =>
361
  prev.map((c) =>
362
  selectedSet.has(c.id) && c.status === 'running' ? { ...c, status: 'paused' } : c
@@ -398,12 +505,68 @@ export default function CampaignsDashboardTab() {
398
  closeBulkMenu();
399
  }, [selectedIds, closeBulkMenu]);
400
 
401
- const toggleStatus = useCallback((id) => {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
402
  setCampaigns((prev) =>
403
  prev.map((c) => {
404
  if (c.id !== id) return c;
405
- if (c.status === 'running') return { ...c, status: 'paused' };
406
- if (c.status === 'paused') return { ...c, status: 'running' };
 
 
 
 
407
  return c;
408
  })
409
  );
@@ -415,6 +578,68 @@ export default function CampaignsDashboardTab() {
415
  setSelectedIds((prev) => prev.filter((x) => x !== id));
416
  }, []);
417
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
418
  const onWizardComplete = useCallback((payload) => {
419
  const contacts = payload.contacts || 0;
420
  const id = payload.campaignId || uid();
@@ -458,6 +683,37 @@ export default function CampaignsDashboardTab() {
458
  return [row, ...prev];
459
  });
460
  setWizardCampaign(null);
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
461
  }, []);
462
 
463
  const onWizardDraftPersist = useCallback((payload) => {
@@ -722,6 +978,24 @@ export default function CampaignsDashboardTab() {
722
  const contentPct = c.generationComplete
723
  ? '100%'
724
  : `${Math.round(c.generationProgressPercent ?? 0)}%`;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
725
  return (
726
  <tr
727
  key={c.id}
@@ -739,26 +1013,42 @@ export default function CampaignsDashboardTab() {
739
  <td className="max-w-[360px] px-3 py-3 align-middle">
740
  <div className="flex items-start gap-3">
741
  <div className="mt-0.5 flex h-9 w-9 shrink-0 items-center justify-center">
742
- {c.fileId && !c.generationComplete ? (
 
 
 
 
 
 
 
743
  <CampaignGenerationRing
744
  percent={c.generationProgressPercent ?? 0}
745
  complete={false}
746
  fileId={c.fileId}
747
  compact
748
  />
749
- ) : c.fileId && c.generationComplete ? (
 
 
 
 
 
 
 
 
 
750
  <span
751
- className="flex h-9 w-9 items-center justify-center rounded-full bg-emerald-50 text-[10px] font-bold text-emerald-700 ring-1 ring-emerald-200"
752
- title="Content generation complete"
753
  >
754
  100%
755
  </span>
756
  ) : (
757
  <span
758
- className="flex h-9 w-9 items-center justify-center rounded-full bg-slate-100 text-[10px] font-medium text-slate-400"
759
- title="No upload"
760
  >
761
-
762
  </span>
763
  )}
764
  </div>
@@ -802,10 +1092,14 @@ export default function CampaignsDashboardTab() {
802
  {contentPct}
803
  </td>
804
  <td className="px-3 py-3 text-right tabular-nums text-slate-500">
805
-
806
  </td>
807
  <td className="px-3 py-3 text-right tabular-nums text-fuchsia-700">
808
- {c.openRate != null ? `— / ${c.openRate}%` : '—'}
 
 
 
 
809
  </td>
810
  <td className="px-3 py-3 text-right tabular-nums text-amber-700">
811
 
@@ -870,6 +1164,16 @@ export default function CampaignsDashboardTab() {
870
  >
871
  Open in wizard
872
  </button>
 
 
 
 
 
 
 
 
 
 
873
  <button
874
  type="button"
875
  className="block w-full px-3 py-2 text-left text-red-600 hover:bg-red-50"
 
9
  ExternalLink,
10
  Archive,
11
  ArchiveRestore,
12
+ RotateCcw,
13
  } from 'lucide-react';
14
  import { Button } from '@/components/ui/button';
15
  import CreateCampaignWizard from '@/components/campaigns/CreateCampaignWizard';
 
85
  Math.round(c.generationProgressPercent ?? (genDone ? 100 : hasFile ? 0 : 100))
86
  ),
87
  sequence: Array.isArray(c.sequence) ? c.sequence : [],
88
+ executionProgressPercent: Math.min(
89
+ 100,
90
+ Math.round(Number(c.executionProgressPercent) || 0)
91
+ ),
92
+ executionDeliveryStatus: c.executionDeliveryStatus || 'idle',
93
+ deliveryStats: c.deliveryStats && typeof c.deliveryStats === 'object' ? c.deliveryStats : null,
94
  };
95
  }
96
 
 
158
  }
159
 
160
  /** Circular ring; hover shows native tooltip with percent (title). */
161
+ function CampaignGenerationRing({ percent, complete, fileId, compact = false, variant = 'generation' }) {
162
  if (!fileId || complete) return null;
163
  const p = Math.min(100, Math.max(0, percent));
164
  const r = compact ? 14 : 17;
 
167
  const cx = size / 2;
168
  const c = 2 * Math.PI * r;
169
  const offset = c * (1 - p / 100);
170
+ const strokeColor = variant === 'delivery' ? '#0369a1' : '#7c3aed';
171
+ const title =
172
+ variant === 'delivery' ? `Delivery: ${Math.round(p)}%` : `Content generation: ${Math.round(p)}%`;
173
+ const textClass =
174
+ variant === 'delivery'
175
+ ? 'text-sky-900'
176
+ : 'text-violet-800';
177
  return (
178
  <div
179
  className={cn('relative shrink-0 cursor-default', compact ? 'h-9 w-9' : 'h-11 w-11')}
180
+ title={title}
181
  >
182
  <svg
183
  width={size}
 
192
  cy={cx}
193
  r={r}
194
  fill="none"
195
+ stroke={strokeColor}
196
  strokeWidth={stroke}
197
  strokeLinecap="round"
198
  strokeDasharray={c}
 
202
  </svg>
203
  <span
204
  className={cn(
205
+ 'pointer-events-none absolute inset-0 flex items-center justify-center gap-0 px-0.5 text-center font-bold tabular-nums leading-none',
206
+ textClass,
207
  compact ? 'text-[9px]' : 'text-[10px]'
208
  )}
209
  aria-hidden
 
292
  return () => clearInterval(iv);
293
  }, []);
294
 
295
+ useEffect(() => {
296
+ const pollDelivery = async () => {
297
+ const list = campaignsRef.current.filter(
298
+ (c) =>
299
+ c.fileId &&
300
+ c.generationComplete &&
301
+ !c.archived &&
302
+ (c.status === 'running' ||
303
+ c.status === 'paused' ||
304
+ c.executionDeliveryStatus === 'completed')
305
+ );
306
+ if (!list.length) return;
307
+ const updates = await Promise.all(
308
+ list.map(async (c) => {
309
+ try {
310
+ if (c.status === 'running' && c.executionDeliveryStatus !== 'completed') {
311
+ await apiFetch(
312
+ `/api/campaign-files/${encodeURIComponent(c.fileId)}/execution/tick?limit=4`,
313
+ { method: 'POST' }
314
+ );
315
+ }
316
+ const res = await apiFetch(
317
+ `/api/campaign-files/${encodeURIComponent(c.fileId)}/execution`
318
+ );
319
+ if (!res.ok) return { id: c.id, skip: true };
320
+ const d = await res.json();
321
+ return {
322
+ id: c.id,
323
+ skip: false,
324
+ executionProgressPercent: d.progress_percent ?? 0,
325
+ executionDeliveryStatus: d.status || 'idle',
326
+ deliveryStats: d.stats || null,
327
+ openRate:
328
+ d.stats?.open_rate != null ? Math.round(Number(d.stats.open_rate)) : c.openRate,
329
+ replyRate:
330
+ d.stats?.reply_rate != null
331
+ ? Math.round(Number(d.stats.reply_rate))
332
+ : c.replyRate,
333
+ };
334
+ } catch {
335
+ return { id: c.id, skip: true };
336
+ }
337
+ })
338
+ );
339
+ setCampaigns((prev) =>
340
+ prev.map((c) => {
341
+ const u = updates.find((x) => x.id === c.id);
342
+ if (!u || u.skip) return c;
343
+ return {
344
+ ...c,
345
+ executionProgressPercent: u.executionProgressPercent,
346
+ executionDeliveryStatus: u.executionDeliveryStatus,
347
+ deliveryStats: u.deliveryStats,
348
+ openRate: u.openRate ?? c.openRate,
349
+ replyRate: u.replyRate ?? c.replyRate,
350
+ };
351
+ })
352
+ );
353
+ };
354
+ const iv = setInterval(pollDelivery, 6000);
355
+ pollDelivery();
356
+ return () => clearInterval(iv);
357
+ }, []);
358
+
359
  const tabCounts = useMemo(() => {
360
  const active = campaigns.filter((c) => !c.archived);
361
  const archived = campaigns.filter((c) => c.archived);
 
426
 
427
  const closeBulkMenu = useCallback(() => setBulkMenuOpen(false), []);
428
 
429
+ const bulkResume = useCallback(async () => {
430
+ const targets = campaignsRef.current.filter(
431
+ (c) => selectedSet.has(c.id) && c.status === 'paused' && c.fileId && c.generationComplete
432
+ );
433
+ for (const c of targets) {
434
+ try {
435
+ await apiFetch(`/api/campaign-files/${encodeURIComponent(c.fileId)}/execution/start`, {
436
+ method: 'POST',
437
+ });
438
+ await apiFetch(
439
+ `/api/campaign-files/${encodeURIComponent(c.fileId)}/execution/tick?limit=5`,
440
+ { method: 'POST' }
441
+ );
442
+ } catch (e) {
443
+ console.error(e);
444
+ }
445
+ }
446
  setCampaigns((prev) =>
447
  prev.map((c) =>
448
  selectedSet.has(c.id) && c.status === 'paused' ? { ...c, status: 'running' } : c
 
451
  closeBulkMenu();
452
  }, [selectedSet, closeBulkMenu]);
453
 
454
+ const bulkPause = useCallback(async () => {
455
+ const targets = campaignsRef.current.filter(
456
+ (c) => selectedSet.has(c.id) && c.status === 'running' && c.fileId && c.generationComplete
457
+ );
458
+ for (const c of targets) {
459
+ try {
460
+ await apiFetch(`/api/campaign-files/${encodeURIComponent(c.fileId)}/execution/pause`, {
461
+ method: 'POST',
462
+ });
463
+ } catch (e) {
464
+ console.error(e);
465
+ }
466
+ }
467
  setCampaigns((prev) =>
468
  prev.map((c) =>
469
  selectedSet.has(c.id) && c.status === 'running' ? { ...c, status: 'paused' } : c
 
505
  closeBulkMenu();
506
  }, [selectedIds, closeBulkMenu]);
507
 
508
+ const toggleStatus = useCallback(async (id) => {
509
+ const cur = campaignsRef.current.find((c) => c.id === id);
510
+ if (!cur) return;
511
+
512
+ if (!cur.fileId || !cur.generationComplete) {
513
+ setCampaigns((prev) =>
514
+ prev.map((c) => {
515
+ if (c.id !== id) return c;
516
+ if (c.status === 'running') return { ...c, status: 'paused' };
517
+ if (c.status === 'paused') return { ...c, status: 'running' };
518
+ if (c.status === 'draft') return { ...c, status: 'running' };
519
+ return c;
520
+ })
521
+ );
522
+ return;
523
+ }
524
+
525
+ if (cur.status === 'running') {
526
+ try {
527
+ const pr = await apiFetch(
528
+ `/api/campaign-files/${encodeURIComponent(cur.fileId)}/execution/pause`,
529
+ { method: 'POST' }
530
+ );
531
+ if (!pr.ok) {
532
+ const err = await pr.json().catch(() => ({}));
533
+ alert(typeof err.detail === 'string' ? err.detail : 'Could not pause campaign send');
534
+ return;
535
+ }
536
+ } catch (e) {
537
+ alert(String(e));
538
+ return;
539
+ }
540
+ } else {
541
+ try {
542
+ const st = await apiFetch(
543
+ `/api/campaign-files/${encodeURIComponent(cur.fileId)}/execution/start`,
544
+ { method: 'POST' }
545
+ );
546
+ if (!st.ok) {
547
+ const err = await st.json().catch(() => ({}));
548
+ alert(typeof err.detail === 'string' ? err.detail : 'Could not start campaign send');
549
+ return;
550
+ }
551
+ await apiFetch(
552
+ `/api/campaign-files/${encodeURIComponent(cur.fileId)}/execution/tick?limit=5`,
553
+ { method: 'POST' }
554
+ );
555
+ } catch (e) {
556
+ alert(String(e));
557
+ return;
558
+ }
559
+ }
560
+
561
  setCampaigns((prev) =>
562
  prev.map((c) => {
563
  if (c.id !== id) return c;
564
+ if (c.status === 'running')
565
+ return { ...c, status: 'paused', executionDeliveryStatus: 'paused' };
566
+ if (c.status === 'paused')
567
+ return { ...c, status: 'running', executionDeliveryStatus: 'running' };
568
+ if (c.status === 'draft')
569
+ return { ...c, status: 'running', executionDeliveryStatus: 'running' };
570
  return c;
571
  })
572
  );
 
578
  setSelectedIds((prev) => prev.filter((x) => x !== id));
579
  }, []);
580
 
581
+ const restartDelivery = useCallback(async (c) => {
582
+ if (!c.fileId || !c.generationComplete) return;
583
+ if (
584
+ !window.confirm(
585
+ 'Restart delivery from scratch? All send receipts for this campaign will be cleared and delivery will run again from step one.'
586
+ )
587
+ ) {
588
+ setMenuOpenId(null);
589
+ return;
590
+ }
591
+ setMenuOpenId(null);
592
+ try {
593
+ if (c.status === 'running') {
594
+ const pr = await apiFetch(
595
+ `/api/campaign-files/${encodeURIComponent(c.fileId)}/execution/pause`,
596
+ { method: 'POST' }
597
+ );
598
+ if (!pr.ok) {
599
+ const err = await pr.json().catch(() => ({}));
600
+ alert(typeof err.detail === 'string' ? err.detail : 'Pause the campaign before restarting.');
601
+ return;
602
+ }
603
+ }
604
+ const st = await apiFetch(
605
+ `/api/campaign-files/${encodeURIComponent(c.fileId)}/execution/start?reset=1`,
606
+ { method: 'POST' }
607
+ );
608
+ if (!st.ok) {
609
+ const err = await st.json().catch(() => ({}));
610
+ alert(typeof err.detail === 'string' ? err.detail : 'Could not restart delivery');
611
+ return;
612
+ }
613
+ await apiFetch(
614
+ `/api/campaign-files/${encodeURIComponent(c.fileId)}/execution/tick?limit=5`,
615
+ { method: 'POST' }
616
+ );
617
+ const r = await apiFetch(`/api/campaign-files/${encodeURIComponent(c.fileId)}/execution`);
618
+ const d = r.ok ? await r.json().catch(() => ({})) : {};
619
+ setCampaigns((prev) =>
620
+ prev.map((row) =>
621
+ row.id === c.id
622
+ ? normalizeCampaign({
623
+ ...row,
624
+ status: 'running',
625
+ executionProgressPercent: d.progress_percent ?? 0,
626
+ executionDeliveryStatus: d.status || 'running',
627
+ deliveryStats: d.stats || null,
628
+ openRate:
629
+ d.stats?.open_rate != null ? Math.round(Number(d.stats.open_rate)) : row.openRate,
630
+ replyRate:
631
+ d.stats?.reply_rate != null
632
+ ? Math.round(Number(d.stats.reply_rate))
633
+ : row.replyRate,
634
+ })
635
+ : row
636
+ )
637
+ );
638
+ } catch (e) {
639
+ alert(String(e));
640
+ }
641
+ }, []);
642
+
643
  const onWizardComplete = useCallback((payload) => {
644
  const contacts = payload.contacts || 0;
645
  const id = payload.campaignId || uid();
 
683
  return [row, ...prev];
684
  });
685
  setWizardCampaign(null);
686
+ if (payload.fileId && payload.generationComplete) {
687
+ const fid = payload.fileId;
688
+ void (async () => {
689
+ try {
690
+ const st = await apiFetch(`/api/campaign-files/${encodeURIComponent(fid)}/execution/start`, {
691
+ method: 'POST',
692
+ });
693
+ if (!st.ok) return;
694
+ await apiFetch(`/api/campaign-files/${encodeURIComponent(fid)}/execution/tick?limit=5`, {
695
+ method: 'POST',
696
+ });
697
+ const r = await apiFetch(`/api/campaign-files/${encodeURIComponent(fid)}/execution`);
698
+ if (!r.ok) return;
699
+ const d = await r.json();
700
+ setCampaigns((prev) =>
701
+ prev.map((row) =>
702
+ row.id === id
703
+ ? normalizeCampaign({
704
+ ...row,
705
+ executionProgressPercent: d.progress_percent ?? 0,
706
+ executionDeliveryStatus: d.status || 'running',
707
+ deliveryStats: d.stats || null,
708
+ })
709
+ : row
710
+ )
711
+ );
712
+ } catch (e) {
713
+ console.error(e);
714
+ }
715
+ })();
716
+ }
717
  }, []);
718
 
719
  const onWizardDraftPersist = useCallback((payload) => {
 
978
  const contentPct = c.generationComplete
979
  ? '100%'
980
  : `${Math.round(c.generationProgressPercent ?? 0)}%`;
981
+ const execStatus = c.executionDeliveryStatus || 'idle';
982
+ const deliveryPct = Math.round(c.executionProgressPercent ?? 0);
983
+ const deliveryDone =
984
+ execStatus === 'completed' ||
985
+ (deliveryPct >= 100 && execStatus === 'running');
986
+ const showDeliveryRing =
987
+ !!c.fileId &&
988
+ c.generationComplete &&
989
+ (c.status === 'running' || c.status === 'paused') &&
990
+ !deliveryDone;
991
+ const sentCount =
992
+ c.deliveryStats && typeof c.deliveryStats.sent === 'number'
993
+ ? c.deliveryStats.sent
994
+ : null;
995
+ const opensCount =
996
+ c.deliveryStats && typeof c.deliveryStats.opens === 'number'
997
+ ? c.deliveryStats.opens
998
+ : null;
999
  return (
1000
  <tr
1001
  key={c.id}
 
1013
  <td className="max-w-[360px] px-3 py-3 align-middle">
1014
  <div className="flex items-start gap-3">
1015
  <div className="mt-0.5 flex h-9 w-9 shrink-0 items-center justify-center">
1016
+ {!c.fileId ? (
1017
+ <span
1018
+ className="flex h-9 w-9 items-center justify-center rounded-full bg-slate-100 text-[10px] font-medium text-slate-400"
1019
+ title="No upload"
1020
+ >
1021
+
1022
+ </span>
1023
+ ) : !c.generationComplete ? (
1024
  <CampaignGenerationRing
1025
  percent={c.generationProgressPercent ?? 0}
1026
  complete={false}
1027
  fileId={c.fileId}
1028
  compact
1029
  />
1030
+ ) : showDeliveryRing ? (
1031
+ <CampaignGenerationRing
1032
+ variant="delivery"
1033
+ percent={deliveryPct}
1034
+ complete={false}
1035
+ fileId={c.fileId}
1036
+ compact
1037
+ />
1038
+ ) : deliveryDone &&
1039
+ (c.status === 'running' || c.status === 'paused') ? (
1040
  <span
1041
+ className="flex h-9 w-9 items-center justify-center rounded-full bg-sky-50 text-[10px] font-bold text-sky-800 ring-1 ring-sky-200"
1042
+ title="Delivery complete"
1043
  >
1044
  100%
1045
  </span>
1046
  ) : (
1047
  <span
1048
+ className="flex h-9 w-9 items-center justify-center rounded-full bg-emerald-50 text-[10px] font-bold text-emerald-700 ring-1 ring-emerald-200"
1049
+ title="Content generation complete"
1050
  >
1051
+ 100%
1052
  </span>
1053
  )}
1054
  </div>
 
1092
  {contentPct}
1093
  </td>
1094
  <td className="px-3 py-3 text-right tabular-nums text-slate-500">
1095
+ {sentCount != null ? sentCount.toLocaleString() : ''}
1096
  </td>
1097
  <td className="px-3 py-3 text-right tabular-nums text-fuchsia-700">
1098
+ {opensCount != null && opensCount > 0
1099
+ ? `${opensCount}`
1100
+ : c.openRate != null
1101
+ ? `— / ${c.openRate}%`
1102
+ : '—'}
1103
  </td>
1104
  <td className="px-3 py-3 text-right tabular-nums text-amber-700">
1105
 
 
1164
  >
1165
  Open in wizard
1166
  </button>
1167
+ {c.fileId && c.generationComplete ? (
1168
+ <button
1169
+ type="button"
1170
+ className="flex w-full items-center gap-2 px-3 py-2 text-left hover:bg-slate-50"
1171
+ onClick={() => restartDelivery(c)}
1172
+ >
1173
+ <RotateCcw className="h-3.5 w-3.5 shrink-0 text-slate-500" />
1174
+ Restart delivery
1175
+ </button>
1176
+ ) : null}
1177
  <button
1178
  type="button"
1179
  className="block w-full px-3 py-2 text-left text-red-600 hover:bg-red-50"