File size: 6,467 Bytes
f56729d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
45ac00a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
"""
Utility functions for the Folio CLI.

This module provides helper functions used across the CLI.
"""

import os
from pathlib import Path

import pandas as pd

from src.folio.portfolio import process_portfolio_data


def load_portfolio(path, state, console=None):
    """Load a portfolio from a CSV file.

    Args:
        path: Path to the portfolio CSV file
        state: Application state dictionary
        console: Rich console for output

    Returns:
        Tuple of (groups, summary)
    """
    from rich.console import Console

    if console is None:
        console = Console()

    # Resolve the path
    if not os.path.isabs(path):
        # Try relative to current directory
        resolved_path = Path(os.getcwd()) / path
        if not resolved_path.exists():
            # Try relative to project root
            project_root = Path(__file__).parent.parent.parent
            resolved_path = project_root / path
    else:
        resolved_path = Path(path)

    # Check if the file exists
    if not resolved_path.exists():
        raise FileNotFoundError(f"Portfolio file not found: {path}")

    # Load the portfolio
    console.print(f"Loading portfolio from [cyan]{resolved_path}[/cyan]...")

    try:
        df = pd.read_csv(resolved_path)
        groups, summary, _ = process_portfolio_data(df, update_prices=True)

        # Update the state
        state["portfolio_groups"] = groups
        state["portfolio_summary"] = summary
        state["loaded_portfolio"] = str(resolved_path)

        console.print(
            f"Loaded portfolio with [green]{len(groups)}[/green] position groups."
        )
        return groups, summary
    except Exception as e:
        raise RuntimeError(f"Error loading portfolio: {e!s}") from e


def find_position_group(ticker, portfolio_groups):
    """Find a position group by ticker.

    Args:
        ticker: Ticker symbol to find
        portfolio_groups: List of portfolio groups

    Returns:
        PortfolioGroup if found, None otherwise
    """
    if not portfolio_groups:
        return None

    # Normalize the ticker
    ticker = ticker.upper()

    # Find the group
    for group in portfolio_groups:
        if group.ticker == ticker:
            return group

    return None


def parse_args(args, arg_specs):
    """Parse command arguments according to specifications.

    Args:
        args: List of argument strings
        arg_specs: Dictionary mapping argument names to specifications
            Each specification is a dictionary with:
                - type: Type to convert to (float, int, str, bool)
                - default: Default value
                - help: Help text
                - aliases: List of aliases (e.g., ['-r', '--range'])

    Returns:
        Dictionary of parsed arguments
    """
    # Initialize with default values
    result = {name: spec.get("default") for name, spec in arg_specs.items()}

    # Parse arguments
    i = 0
    while i < len(args):
        arg = args[i]

        # Check if this is a flag/option
        if arg.startswith("-"):
            # Find the matching argument
            found = False
            for name, spec in arg_specs.items():
                aliases = spec.get("aliases", [])
                if arg in aliases:
                    # This is a match
                    found = True

                    # Handle boolean flags
                    if spec.get("type") is bool:
                        result[name] = True
                        i += 1
                        break

                    # Handle value arguments
                    if i + 1 < len(args):
                        try:
                            # Convert to the specified type
                            value = args[i + 1]
                            if spec.get("type") is float:
                                result[name] = float(value)
                            elif spec.get("type") is int:
                                result[name] = int(value)
                            else:
                                result[name] = value
                            i += 2
                            break
                        except ValueError as ve:
                            raise ValueError(
                                f"Invalid value for {arg}: {args[i + 1]}"
                            ) from ve
                    else:
                        raise ValueError(f"Missing value for {arg}")

            if not found:
                raise ValueError(f"Unknown argument: {arg}")
        else:
            # This is a positional argument
            # For now, we'll just skip it
            i += 1

    return result


def filter_portfolio_groups(portfolio_groups, filter_criteria=None):
    """Filter portfolio groups based on criteria.

    Args:
        portfolio_groups: List of portfolio groups
        filter_criteria: Dictionary of filter criteria
            - tickers: List of tickers to include
            - min_value: Minimum position value
            - max_value: Maximum position value
            - has_options: Whether to include positions with options
            - has_stock: Whether to include positions with stock

    Returns:
        Filtered list of portfolio groups
    """
    if not filter_criteria:
        return portfolio_groups

    filtered_groups = portfolio_groups

    # Filter by tickers
    if filter_criteria.get("tickers"):
        tickers = [t.upper() for t in filter_criteria["tickers"]]
        filtered_groups = [g for g in filtered_groups if g.ticker in tickers]

    # Filter by value
    if filter_criteria.get("min_value") is not None:
        filtered_groups = [
            g for g in filtered_groups if g.net_exposure >= filter_criteria["min_value"]
        ]

    if filter_criteria.get("max_value") is not None:
        filtered_groups = [
            g for g in filtered_groups if g.net_exposure <= filter_criteria["max_value"]
        ]

    # Filter by position type
    if filter_criteria.get("has_options") is not None:
        has_options = filter_criteria["has_options"]
        filtered_groups = [
            g for g in filtered_groups if bool(g.option_positions) == has_options
        ]

    if filter_criteria.get("has_stock") is not None:
        has_stock = filter_criteria["has_stock"]
        filtered_groups = [
            g for g in filtered_groups if bool(g.stock_position) == has_stock
        ]

    return filtered_groups