Spaces:
Sleeping
Sleeping
File size: 5,617 Bytes
5b29309 2cd7bd9 5b29309 2cd7bd9 5b29309 5e341a9 701b0a2 5e341a9 701b0a2 5e341a9 5b29309 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 | import time
import logging
from typing import List, Dict, Optional
from config import BotConfig, BASE_URL
from browser_session import BrowserSession
from auth_handler import AuthHandler
from studio_navigator import StudioNavigator
from listing_scraper import ListingScraper
from product_scraper import ProductScraper
from export_manager import ExportManager
import queue
# Module-level logger named "bot"; BotRunner.run() uses it to record
# unexpected errors together with their traceback.
logger = logging.getLogger("bot")
class QueueLoggingHandler(logging.Handler):
    """Logging handler that forwards formatted records to a queue.

    Each record is rendered with the handler's formatter and the resulting
    string is put on ``q``, so a consumer can read log lines without touching
    the logging machinery.
    """

    def __init__(self, q: queue.Queue):
        """Create the handler.

        Args:
            q: Queue that receives one formatted string per log record.
        """
        super().__init__()
        self.q = q

    def emit(self, record: logging.LogRecord) -> None:
        """Format ``record`` and put the resulting string on the queue.

        Failures never propagate to the code that issued the log call; they
        are routed through ``handleError`` (the standard ``logging.Handler``
        hook) instead of being silently discarded.
        """
        try:
            # NOTE: Queue.put blocks while a bounded queue is full.
            self.q.put(self.format(record))
        except Exception:
            self.handleError(record)
class BotRunner:
    """Run one scraping job: authenticate, open a studio, scrape, export.

    Progress is reported as plain strings on ``log_queue``. While ``run()``
    is active, records from the ``logging`` module are forwarded to the same
    queue through a ``QueueLoggingHandler``.
    """

    def __init__(
        self,
        username: str,
        password: str,
        studio_input: str,
        min_price: float = 6.0,
        log_queue: Optional[queue.Queue] = None,
        stop_event=None,
    ):
        """Store the job parameters.

        Args:
            username: Account name passed to ``BotConfig`` / ``AuthHandler``.
            password: Account password passed to ``BotConfig`` / ``AuthHandler``.
            studio_input: Either a studio URL / path or a studio name.
            min_price: Price threshold passed to ``BotConfig`` and
                ``ListingScraper``.
            log_queue: Queue receiving progress messages; a new ``queue.Queue``
                is created when omitted.
            stop_event: Optional object exposing ``is_set()``; when set, the
                product loop stops. It is also handed to ``ListingScraper``.
        """
        self.username = username
        self.password = password
        self.studio_input = studio_input
        self.min_price = min_price
        # Compare against None explicitly so a caller-supplied queue is never
        # replaced just because it evaluates as falsy.
        self.log_queue = queue.Queue() if log_queue is None else log_queue
        self.stop_event = stop_event

    def _log(self, msg: str):
        """Put ``msg`` on the log queue; failures are ignored (best effort)."""
        try:
            self.log_queue.put(msg)
        except Exception:
            pass

    def _resolve_studio_url(self, navigator, studio_input: str):
        """Return the studio URL to scan, or ``None`` after logging why not.

        Input that starts with ``http`` or ``/``, or that contains
        ``dvd_search.php``, is treated as a direct URL; anything else is
        looked up by name through the navigator.
        """
        is_direct = (
            studio_input.startswith(("http", "/"))
            or "dvd_search.php" in studio_input
        )
        if not is_direct:
            self._log(f"Studio name detected, searching directory: {studio_input}")
            studio_url = navigator.find_studio_by_name(studio_input)
            if not studio_url:
                self._log(f"Could not find studio: {studio_input}")
                return None
            return studio_url

        if studio_input.startswith("http"):
            studio_url = studio_input
        else:
            studio_url = navigator._make_absolute(studio_input)
        studio_url = navigator._ensure_price_sort(studio_url)
        self._log(f"Direct studio URL detected, navigating directly: {studio_url}")
        if not navigator.navigate_to_studio_url(studio_url):
            self._log("Could not open studio page")
            return None
        return studio_url

    def _scrape_products(self, session, studio_url) -> List[Dict]:
        """Scrape every qualifying product of ``studio_url`` into records."""
        self._log("Phase 3 ▶ Listing scan")
        listing = ListingScraper(
            session=session,
            min_price=self.min_price,
            stop_event=self.stop_event,
            max_pages=10000,  # Allow unlimited pages effectively
            page_timeout=30,  # 30 seconds per page
            total_timeout=3600,  # 1 hour total for entire scan
        )
        scraper = ProductScraper(session=session)
        records: List[Dict] = []
        self._log("Phase 4 ▶ Product details")
        for idx, pinfo in enumerate(listing.iter_qualifying_products(studio_url), 1):
            if self.stop_event and self.stop_event.is_set():
                self._log("Stopped by user")
                break
            self._log(f"Scraping {idx}: {pinfo.get('title','')}")
            record = scraper.scrape_product(pinfo["url"])
            # Always prefer the product page title (H1) over the listing
            # title, which contains extra marketing copy; the listing values
            # are used only as a fallback.
            if not record.get("title") and pinfo.get("title"):
                record["title"] = pinfo["title"]
            if not record.get("price") and pinfo.get("price") is not None:
                record["price"] = f"${pinfo['price']:.2f}"
            records.append(record)
            time.sleep(0.2)
        return records

    def run(self):
        """Execute the whole job and report progress through the log queue.

        Every failure, including one raised while preparing the configuration
        or the browser session, is logged and reported as ``Error: ...`` on
        the queue. The logging handler is always detached afterwards, and the
        browser session is stopped if it was created.
        """
        # Attach a logging handler so module logs appear in the GUI.
        q_handler = QueueLoggingHandler(self.log_queue)
        q_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s: %(message)s"))
        root_logger = logging.getLogger()
        bot_logger = logging.getLogger("bot")
        # Records from "bot" normally propagate to the root logger, so adding
        # the handler to both would deliver each of them twice. Attach to
        # "bot" directly only when propagation has been switched off.
        targets = [root_logger]
        if not bot_logger.propagate:
            targets.append(bot_logger)
        for target in targets:
            target.addHandler(q_handler)

        session = None
        try:
            self._log("Phase 0 ▶ Preparing session")
            # Strip once so the config and the navigation see the same value.
            studio_input = self.studio_input.strip()
            config = BotConfig(
                username=self.username,
                password=self.password,
                studio_url=studio_input if studio_input.startswith("http") else "",
                min_price=self.min_price,
            )
            session = BrowserSession(headless=False, state_dir=config.state_dir)
            session.start()

            self._log("Phase 1 ▶ Authentication")
            auth = AuthHandler(session, config.username, config.password)
            if not auth.ensure_authenticated():
                self._log("Authentication failed")
                return

            self._log("Phase 2 ▶ Studio navigation")
            navigator = StudioNavigator(session)
            studio_url = self._resolve_studio_url(navigator, studio_input)
            if not studio_url:
                return

            records = self._scrape_products(session, studio_url)
            if not records:
                self._log("No qualifying products found")
                return
            self._log(f"Found {len(records)} qualifying product(s)")

            self._log("Phase 5 ▶ Export")
            mgr = ExportManager(output_format="csv")
            out = mgr.save(records)
            if out:
                self._log(f"Export completed → {out}")
        except Exception as e:
            logger.exception("Unexpected error in BotRunner")
            self._log(f"Error: {e}")
        finally:
            try:
                for target in targets:
                    target.removeHandler(q_handler)
            finally:
                # session stays None when setup failed before it was created.
                if session is not None:
                    session.stop()
|