File size: 5,007 Bytes
a2cbcac
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import asyncio
import sys
from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Tuple

# Add the project root to Python path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))

# Import workflows
from src.workflows.workflow import run_analysis
from src.tools.daily_csv_tool import save_workflow_to_symbol_csv


def get_trading_dates(start_date: str, end_date: str) -> List[str]:
    """Generate list of trading dates (excluding weekends)."""
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    
    dates = []
    current = start
    while current <= end:
        # Skip weekends (Monday=0, Sunday=6)
        if current.weekday() < 5:  # Monday to Friday
            dates.append(current.strftime("%Y-%m-%d"))
        current += timedelta(days=1)
    
    return dates


def print_workflow_summary(result: dict, date: str) -> None:
    """Print summary of workflow results for a specific date."""
    if not result.get('success'):
        print(f"{date} - Analysis failed: {result.get('error', 'Unknown error')}")
        return
        
    print(f"{date} - Analysis completed successfully")


def _prompt(prompt: str) -> str:
    try:
        return input(prompt)
    except EOFError:
        return ""


def prompt_symbol_and_dates() -> Tuple[List[str], str, str]:
    """Prompt the user (in English) for a stock symbol and date range.

    Returns:
        (symbols_list, start_date, end_date)
    """
    # Symbol
    while True:
        sym = _prompt("Enter a stock symbol (e.g., AAPL): ").strip().upper()
        if sym:
            break
        print("Symbol cannot be empty. Example: AAPL")

    # Date parsing helper
    def parse_date(label: str) -> str:
        while True:
            value = _prompt(
                f"Enter {label} date in YYYY-MM-DD format (e.g., 2025-05-28): "
            ).strip()
            try:
                datetime.strptime(value, "%Y-%m-%d")
                return value
            except ValueError:
                print("Invalid date format. Expected YYYY-MM-DD, e.g., 2025-05-28")

    start_date = parse_date("start")
    end_date = parse_date("end")

    # Validate ordering
    while datetime.strptime(start_date, "%Y-%m-%d") > datetime.strptime(end_date, "%Y-%m-%d"):
        print("Start date is after end date. Please re-enter.")
        start_date = parse_date("start")
        end_date = parse_date("end")

    return [sym], start_date, end_date


async def main():
    """Main execution with interactive symbol and date input."""
    try:
        print("PrimoAgent - AI Financial Analysis Platform")
        print("=" * 60)
        
        # Interactive input
        symbols, start_date, end_date = prompt_symbol_and_dates()
        print(f"Symbols: {', '.join(symbols)}")
        print(f"Analysis Period: {start_date} to {end_date}")
        
        # Get trading dates
        trading_dates = get_trading_dates(start_date, end_date)
        print(f"Trading Days: {len(trading_dates)}")
        print(f"Starting workflow execution...")

        if not trading_dates:
            print("No trading days in the selected date range.")
            return
        
        successful_runs = 0
        failed_runs = 0
        
        # Process each trading day
        for i, date in enumerate(trading_dates, 1):
            print(f"\n{'='*60}")
            print(f"Day {i}/{len(trading_dates)}: {date}")
            print(f"{'='*60}")
            
            # Format session ID
            date_formatted = date.replace("-", "_")
            session_id = f"daily_analysis_{date_formatted}"
            
            try:
                # Execute workflow for this date
                result = await run_analysis(symbols, session_id, date)
                
                # Print results summary
                print_workflow_summary(result, date)
                
                if result.get('success'):
                    successful_runs += 1
                    # Save per-symbol CSV in ./data
                    if save_workflow_to_symbol_csv(result, date, data_dir="./output/csv"):
                        print(f"Per-symbol CSV saved in ./output/csv for {date}")
                else:
                    failed_runs += 1
                    
            except Exception as e:
                print(f"{date} - Execution error: {e}")
                failed_runs += 1
        
        # Final Summary
        print(f"\nANALYSIS COMPLETE")
        print("=" * 60)
        print(f"Successful Runs: {successful_runs}")
        print(f"Failed Runs: {failed_runs}")
        success_rate = (successful_runs / len(trading_dates) * 100) if trading_dates else 0
        print(f"Success Rate: {success_rate:.1f}%")
        print(f"Completed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        
    except Exception as e:
        print(f"Main execution error: {e}")

if __name__ == "__main__":
    asyncio.run(main())