File size: 3,943 Bytes
7f216ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3d1efb
7f216ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3d1efb
7f216ac
 
 
 
 
 
 
 
 
 
 
 
 
d3d1efb
7f216ac
 
 
 
 
d3d1efb
34da818
7f216ac
 
 
 
 
 
 
 
 
64342b9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7f216ac
 
 
 
 
 
 
 
 
 
 
 
 
 
d3d1efb
7f216ac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import datetime
import math
import os
from pathlib import Path
from typing import Optional

import requests
from playwright.sync_api import sync_playwright
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Import Global State
from ..state import get_user_temp_dir

# --- Shared Utilities ---

def create_session(retries: int = 3, backoff_factor: float = 0.5, status_forcelist=(429, 500, 502, 503, 504)) -> requests.Session:
    """Build a ``requests.Session`` whose adapters retry failed requests.

    Args:
        retries: Value passed to ``Retry(total=...)``.
        backoff_factor: Value passed to ``Retry(backoff_factor=...)`` to
            space out successive attempts.
        status_forcelist: HTTP status codes that should trigger a retry.

    Returns:
        A new session with one retrying adapter mounted for both the
        ``https://`` and ``http://`` prefixes.
    """
    # Retries are enabled for every listed verb, including non-idempotent ones.
    retry_policy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        allowed_methods=frozenset({"GET", "POST", "PUT", "DELETE", "HEAD", "OPTIONS"}),
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    session = requests.Session()
    for prefix in ("https://", "http://"):
        session.mount(prefix, retrying_adapter)
    return session

# Module-level session, built once at import time with create_session()'s
# default retry settings.
SESSION = create_session()

def short_num(n: float | int) -> str:
    # Scale large integers into abbreviated strings (K, M, B)
    try:
        n = float(n)
    except Exception:
        return str(n)
    if n >= 1_000_000_000:
        return f"{n/1_000_000_000:.2f}B"
    if n >= 1_000_000:
        return f"{n/1_000_000:.2f}M"
    if n >= 1_000:
        return f"{n/1_000:.2f}K"
    return str(round(n))

def now_str(fmt: str = "%d-%m-%Y %H:%M:%S") -> str:
    """Format the current local time (naive ``datetime.now()``) with *fmt*.

    Args:
        fmt: A ``strftime`` format string; defaults to day-month-year
            followed by a 24-hour clock time.

    Returns:
        The formatted timestamp.
    """
    current = datetime.datetime.now()
    return current.strftime(fmt)

# --- PDF Generation ---

def convert_html_to_pdf(html_content: str, user_id: str) -> Optional[Path]:
    """Render an HTML string to a PDF file using Playwright's Chromium.

    The PDF is named ``<timestamp>-crypto-analysis.pdf`` and written into
    the directory returned by ``get_user_temp_dir(user_id)``.

    Args:
        html_content: The HTML markup to render.
        user_id: Identifier passed to ``get_user_temp_dir`` to pick the
            output directory.

    Returns:
        The path of the generated PDF, or ``None`` when rendering or the
        follow-up size lookup raised. Errors from ``get_user_temp_dir``
        itself are not caught here and propagate to the caller.
    """
    print("\n   Converting to PDF (Using Playwright)...")
    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    pdf_name = f"{stamp}-crypto-analysis.pdf"

    user_dir = get_user_temp_dir(user_id)
    pdf_path = user_dir / pdf_name

    # US Letter, portrait, backgrounds printed, no page margins.
    no_margin = {"top": "0", "bottom": "0", "left": "0", "right": "0"}
    pdf_options = {
        "path": pdf_path,
        "format": "Letter",
        "landscape": False,
        "scale": 1.0,
        "print_background": True,
        "margin": no_margin,
    }

    try:
        with sync_playwright() as playwright:
            browser = playwright.chromium.launch()
            try:
                page = browser.new_page()
                # Wait for the "load" event so styles/assets are in place
                # before the page is printed.
                page.set_content(html_content, wait_until="load")
                page.pdf(**pdf_options)
            finally:
                # Always release the browser, even if rendering failed.
                browser.close()

        size_in_bytes = pdf_path.stat().st_size
        print(f"   PDF created: {pdf_name}")
        print(f"   Size: {size_in_bytes:,} bytes")
        print(f"   Location: {user_dir}")
        return pdf_path
    except Exception as e:
        print(f"   ❌ Playwright Error: {e}")
        return None

# --- File Cleanup ---

def cleanup_after_analysis(spot_file: Optional[Path], futures_file: Optional[Path]) -> int:
    """Delete the given analysis files if they were last modified today.

    Each path that is set and exists is removed only when its modification
    date (local time) equals today's date. Failures while inspecting or
    deleting a file are reported on stdout and do not stop the loop.

    Args:
        spot_file: Path of the spot file, or ``None``.
        futures_file: Path of the futures file, or ``None``.

    Returns:
        The number of files that were actually deleted.
    """
    today = datetime.datetime.now().date()
    removed = 0

    candidates = ((spot_file, "spot"), (futures_file, "futures PDF"))
    for target, label in candidates:
        if not target or not target.exists():
            continue
        try:
            modified_at = datetime.datetime.fromtimestamp(target.stat().st_mtime)
            if modified_at.date() != today:
                # Files from an earlier day are left in place.
                continue
            target.unlink()
            print(f"   🗑️  Cleaned up {label} file: {target.name}")
            removed += 1
        except Exception as e:
            print(f"   ⚠️  Could not remove {label} file: {e}")

    if removed > 0:
        print(f"   ✅ Cleaned up {removed} source files")
    return removed