File size: 34,268 Bytes
b93364a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
#!/usr/bin/env python
"""Main CLI Module."""

import argparse
import contextlib
import difflib
import os
import re
import sys
import time
import webbrowser
from datetime import datetime
from functools import partial, update_wrapper
from pathlib import Path
from types import MethodType
from typing import Any, Dict, List, Optional

import pandas as pd
import requests
from openbb import obb
from openbb_cli.config import constants
from openbb_cli.config.constants import (
    ASSETS_DIRECTORY,
    ENV_FILE_SETTINGS,
    HOME_DIRECTORY,
    REPOSITORY_DIRECTORY,
)
from openbb_cli.config.menu_text import MenuText
from openbb_cli.controllers.base_controller import BaseController
from openbb_cli.controllers.platform_controller_factory import (
    PlatformControllerFactory,
)
from openbb_cli.controllers.script_parser import is_reset, parse_openbb_script
from openbb_cli.controllers.utils import (
    bootup,
    first_time_user,
    get_flair_and_username,
    parse_and_split_input,
    print_goodbye,
    print_rich_table,
    reset,
    suppress_stdout,
    welcome_message,
)
from openbb_cli.session import Session
from prompt_toolkit.formatted_text import HTML
from prompt_toolkit.styles import Style
from pydantic import BaseModel

PLATFORM_ROUTERS = {
    d: "menu" if not isinstance(getattr(obb, d), BaseModel) else "command"
    for d in dir(obb)
    if "_" not in d
}
NON_DATA_ROUTERS = ["coverage", "account", "reference", "system", "user"]
DATA_PROCESSING_ROUTERS = ["technical", "quantitative", "econometrics"]

# pylint: disable=too-many-public-methods,import-outside-toplevel, too-many-function-args
# pylint: disable=too-many-branches,no-member,C0302,too-many-return-statements, inconsistent-return-statements

env_file = str(ENV_FILE_SETTINGS)
session = Session()


class CLIController(BaseController):
    """CLI Controller class."""

    CHOICES_COMMANDS = ["record", "stop", "exe", "results"]
    CHOICES_MENUS = [
        "settings",
    ]

    for router, value in PLATFORM_ROUTERS.items():
        if value == "menu":
            CHOICES_MENUS.append(router)
        else:
            CHOICES_COMMANDS.append(router)

    PATH = "/"
    CHOICES_GENERATION = False

    def __init__(self, jobs_cmds: Optional[List[str]] = None):
        """Construct CLI controller."""
        self.ROUTINE_FILES: Dict[str, str] = dict()
        self.ROUTINE_DEFAULT_FILES: Dict[str, str] = dict()
        self.ROUTINE_PERSONAL_FILES: Dict[str, str] = dict()
        self.ROUTINE_CHOICES: Dict[str, Any] = dict()

        super().__init__(jobs_cmds)

        self.queue: List[str] = list()

        if jobs_cmds:
            self.queue = parse_and_split_input(
                an_input=" ".join(jobs_cmds), custom_filters=[]
            )

        self.update_success = False

        self._generate_platform_commands()

        self.update_runtime_choices()

    def _generate_platform_commands(self):
        """Generate Platform based commands/menus."""

        def method_call_class(self, _, controller, name, parent_path, target):
            self.queue = self.load_class(
                controller, name, parent_path, target, self.queue
            )

        # pylint: disable=unused-argument
        def method_call_command(self, _, router: str):
            """Call command."""
            mdl = getattr(obb, router)
            df = pd.DataFrame.from_dict(mdl.model_dump(), orient="index")
            if isinstance(df.columns, pd.RangeIndex):
                df.columns = [str(i) for i in df.columns]
            return print_rich_table(df, show_index=True)

        for router, value in PLATFORM_ROUTERS.items():
            target = getattr(obb, router)

            if value == "menu":
                pcf = PlatformControllerFactory(
                    target, reference=obb.reference["paths"]  # type: ignore
                )
                DynamicController = pcf.create()

                # Bind the method to the class
                bound_method = MethodType(method_call_class, self)

                # Update the wrapper and set the attribute
                bound_method = update_wrapper(  # type: ignore
                    partial(
                        bound_method,
                        controller=DynamicController,
                        name=router,
                        target=target,
                        parent_path=self.path,
                    ),
                    method_call_class,
                )
            else:
                bound_method = MethodType(method_call_command, self)
                bound_method = update_wrapper(  # type: ignore
                    partial(bound_method, router=router),
                    method_call_command,
                )

            setattr(self, f"call_{router}", bound_method)

    def update_runtime_choices(self):
        """Update runtime choices."""
        routines_directory = Path(session.user.preferences.export_directory, "routines")

        if session.prompt_session and session.settings.USE_PROMPT_TOOLKIT:
            # choices: dict = self.choices_default
            choices: dict = {c: {} for c in self.controller_choices}  # type: ignore

            self.ROUTINE_FILES = {
                filepath.name: filepath  # type: ignore
                for filepath in routines_directory.rglob("*.openbb")
            }
            self.ROUTINE_DEFAULT_FILES = {
                filepath.name: filepath  # type: ignore
                for filepath in Path(routines_directory / "hub" / "default").rglob(
                    "*.openbb"
                )
            }
            self.ROUTINE_PERSONAL_FILES = {
                filepath.name: filepath  # type: ignore
                for filepath in Path(routines_directory / "hub" / "personal").rglob(
                    "*.openbb"
                )
            }

            choices["exe"] = {
                "--file": {
                    filename: {} for filename in list(self.ROUTINE_FILES.keys())
                },
                "-f": "--file",
                "--example": None,
                "-e": "--example",
                "--input": None,
                "-i": "--input",
                "--url": None,
                "--help": None,
                "-h": "--help",
            }
            choices["record"] = {
                "--name": None,
                "-n": "--name",
                "--description": None,
                "-d": "--description",
                "--public": None,
                "-p": "--public",
                "--tag1": {c: None for c in constants.SCRIPT_TAGS},
                "--tag2": {c: None for c in constants.SCRIPT_TAGS},
                "--tag3": {c: None for c in constants.SCRIPT_TAGS},
                "--help": None,
                "-h": "--help",
            }
            choices["stop"] = {"--help": None, "-h": "--help"}
            choices["results"] = {
                "--help": None,
                "-h": "--help",
                "--export": {c: None for c in ["csv", "json", "xlsx", "png", "jpg"]},
                "--index": None,
                "--key": None,
                "--chart": None,
                "--sheet_name": None,
            }

            self.update_completer(choices)

    def print_help(self):
        """Print help."""
        mt = MenuText("")

        mt.add_info("Configure the platform and manage your account")
        for router, value in PLATFORM_ROUTERS.items():
            if router not in NON_DATA_ROUTERS or router in ["reference", "coverage"]:
                continue
            if value == "menu":
                menu_description = (
                    obb.reference["routers"]  # type: ignore
                    .get(f"{self.PATH}{router}", {})
                    .get("description")
                ) or ""
                mt.add_menu(
                    name=router,
                    description=menu_description.split(".")[0].lower(),
                )
            else:
                mt.add_cmd(router)

        mt.add_info("\nConfigure your CLI")
        mt.add_menu(
            "settings",
            description="enable and disable feature flags, preferences and settings",
        )
        mt.add_raw("\n")
        mt.add_info("Record and execute your own .openbb routine scripts")
        mt.add_cmd("record", description="start recording current session")
        mt.add_cmd(
            "stop", description="stop session recording and convert to .openbb routine"
        )
        mt.add_cmd(
            "exe",
            description="execute .openbb routine scripts (use exe --example for an example)",
        )
        mt.add_raw("\n")
        mt.add_info("Retrieve data from different asset classes and providers")

        for router, value in PLATFORM_ROUTERS.items():
            if router in NON_DATA_ROUTERS or router in DATA_PROCESSING_ROUTERS:
                continue
            if value == "menu":
                menu_description = (
                    obb.reference["routers"]  # type: ignore
                    .get(f"{self.PATH}{router}", {})
                    .get("description")
                ) or ""
                mt.add_menu(
                    name=router,
                    description=menu_description.split(".")[0].lower(),
                )
            else:
                mt.add_cmd(router)

        if any(router in PLATFORM_ROUTERS for router in DATA_PROCESSING_ROUTERS):
            mt.add_info("\nAnalyze and process previously obtained data")

            for router, value in PLATFORM_ROUTERS.items():
                if router not in DATA_PROCESSING_ROUTERS:
                    continue
                if value == "menu":
                    menu_description = (
                        obb.reference["routers"]  # type: ignore
                        .get(f"{self.PATH}{router}", {})
                        .get("description")
                    ) or ""
                    mt.add_menu(
                        name=router,
                        description=menu_description.split(".")[0].lower(),
                    )
                else:
                    mt.add_cmd(router)

        mt.add_raw("\n")
        mt.add_cmd("results")
        if session.obbject_registry.obbjects:
            mt.add_info("\nCached Results")
            for key, value in list(session.obbject_registry.all.items())[  # type: ignore
                : session.settings.N_TO_DISPLAY_OBBJECT_REGISTRY
            ]:
                mt.add_raw(
                    f"[yellow]OBB{key}[/yellow]: {value['command']}",  # type: ignore[index]
                    left_spacing=True,
                )

        session.console.print(text=mt.menu_text, menu="Home")
        self.update_runtime_choices()

    def parse_input(self, an_input: str) -> List:
        """Overwrite the BaseController parse_input for `askobb` and 'exe'.

        This will allow us to search for something like "P/E" ratio.
        """
        # Filtering out sorting parameters with forward slashes like P/E
        sort_filter = r"((\ -q |\ --question|\ ).*?(/))"
        # Filter out urls
        url = r"(exe (--url )?(https?://)?my\.openbb\.(dev|co)/u/.*/routine/.*)"
        custom_filters = [sort_filter, url]
        return parse_and_split_input(an_input=an_input, custom_filters=custom_filters)

    def call_settings(self, _):
        """Process settings command."""
        from openbb_cli.controllers.settings_controller import (
            SettingsController,
        )

        self.queue = self.load_class(SettingsController, self.queue)

    def call_exe(self, other_args: List[str]):
        """Process exe command."""
        # Merge rest of string path to other_args and remove queue since it is a dir
        other_args += self.queue

        if not other_args:
            session.console.print(
                "[info]Provide a path to the routine you wish to execute. For an example, please use "
                "`exe --example`.\n[/info]"
            )
            return
        parser = argparse.ArgumentParser(
            add_help=False,
            formatter_class=argparse.ArgumentDefaultsHelpFormatter,
            prog="exe",
            description="Execute automated routine script. For an example, please use "
            "`exe --example`.",
        )
        parser.add_argument(
            "--file",
            "-f",
            help="The path or .openbb file to run.",
            dest="file",
            required="-h" not in other_args
            and "--help" not in other_args
            and "-e" not in other_args
            and "--example" not in other_args
            and "--url" not in other_args
            and "my.openbb" not in other_args[0],
            type=str,
            nargs="+",
        )
        parser.add_argument(
            "-i",
            "--input",
            help="Select multiple inputs to be replaced in the routine and separated by commas. E.g. GME,AMC,BTC-USD",
            dest="routine_args",
            type=str,
        )
        parser.add_argument(
            "-e",
            "--example",
            help="Run an example script to understand how routines can be used.",
            dest="example",
            action="store_true",
            default=False,
        )
        parser.add_argument(
            "--url", help="URL to run openbb script from.", dest="url", type=str
        )
        if other_args and "-" not in other_args[0][0]:
            if other_args[0].startswith("my.") or other_args[0].startswith("http"):
                other_args.insert(0, "--url")
            else:
                other_args.insert(0, "--file")
        ns_parser = self.parse_known_args_and_warn(parser, other_args)
        if ns_parser:
            if ns_parser.example:
                routine_path = ASSETS_DIRECTORY / "routines" / "routine_example.openbb"
                session.console.print(  # TODO: Point to docs when ready
                    "[info]Executing an example, please visit our docs "
                    "to learn how to create your own script.[/info]\n"
                )
                time.sleep(3)
            elif ns_parser.url:
                if not ns_parser.url.startswith(
                    "https"
                ) and not ns_parser.url.startswith("http:"):
                    url = "https://" + ns_parser.url
                elif ns_parser.url.startswith("http://"):
                    url = ns_parser.url.replace("http://", "https://")
                else:
                    url = ns_parser.url
                username = url.split("/")[-3]
                script_name = url.split("/")[-1]
                file_name = f"{username}_{script_name}.openbb"
                final_url = f"{url}?raw=true"
                response = requests.get(final_url, timeout=10)
                if response.status_code != 200:
                    session.console.print(
                        "[red]Could not find the requested script.[/red]"
                    )
                    return
                routine_text = response.json()["script"]
                file_path = Path(session.user.preferences.export_directory, "routines")
                routine_path = file_path / file_name
                with open(routine_path, "w") as file:
                    file.write(routine_text)
                self.update_runtime_choices()

            elif ns_parser.file:
                file_path = " ".join(ns_parser.file)  # type: ignore
                # if string is not in this format "default/file.openbb" then check for files in ROUTINE_FILES
                full_path = file_path
                hub_routine = file_path.split("/")  # type: ignore
                # Change with: my.openbb.co
                if hub_routine[0] == "default":
                    routine_path = Path(
                        self.ROUTINE_DEFAULT_FILES.get(hub_routine[1], full_path)
                    )
                elif hub_routine[0] == "personal":
                    routine_path = Path(
                        self.ROUTINE_PERSONAL_FILES.get(hub_routine[1], full_path)
                    )
                else:
                    routine_path = Path(self.ROUTINE_FILES.get(file_path, full_path))  # type: ignore
            else:
                return

            try:
                with open(routine_path) as fp:
                    raw_lines = list(fp)

                script_inputs = []
                # Capture ARGV either as list if args separated by commas or as single value
                if routine_args := ns_parser.routine_args:
                    pattern = r"\[(.*?)\]"
                    matches = re.findall(pattern, routine_args)

                    for match in matches:
                        routine_args = routine_args.replace(f"[{match}]", "")
                        script_inputs.append(match)

                    script_inputs.extend(
                        [val for val in routine_args.split(",") if val]
                    )

                err, parsed_script = parse_openbb_script(
                    raw_lines=raw_lines, script_inputs=script_inputs
                )

                # If there err output is not an empty string then it means there was an
                # issue in parsing the routine and therefore we don't want to feed it
                # to the terminal
                if err:
                    session.console.print(err)
                    return

                self.queue = [
                    val
                    for val in parse_and_split_input(
                        an_input=parsed_script, custom_filters=[]
                    )
                    if val
                ]

                if "export" in self.queue[0]:
                    export_path = self.queue[0].split(" ")[1]
                    # If the path selected does not start from the user root, give relative location from root
                    if export_path[0] == "~":
                        export_path = export_path.replace(
                            "~", HOME_DIRECTORY.as_posix()
                        )
                    elif export_path[0] != "/":
                        export_path = os.path.join(
                            os.path.dirname(os.path.abspath(__file__)), export_path
                        )

                    # Check if the directory exists
                    if os.path.isdir(export_path):
                        session.console.print(
                            f"Export data to be saved in the selected folder: '{export_path}'"
                        )
                    else:
                        os.makedirs(export_path)
                        session.console.print(
                            f"[green]Folder '{export_path}' successfully created.[/green]"
                        )
                    self.queue = self.queue[1:]

            except FileNotFoundError:
                session.console.print(
                    f"[red]File '{routine_path}' doesn't exist.[/red]"
                )
                return


def handle_job_cmds(jobs_cmds: Optional[List[str]]) -> Optional[List[str]]:
    """Handle job commands."""
    export_path = ""
    if jobs_cmds and "export" in jobs_cmds[0]:
        commands = jobs_cmds[0].split("/")
        first_split = commands[0].split(" ")
        if len(first_split) > 1:
            export_path = first_split[1]
        jobs_cmds = ["/".join(commands[1:])]
    if not export_path:
        return jobs_cmds
    if export_path[0] == "~":
        export_path = export_path.replace("~", HOME_DIRECTORY.as_posix())
    elif export_path[0] != "/":
        export_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), export_path
        )

    # Check if the directory exists
    if os.path.isdir(export_path):
        session.console.print(
            f"Export data to be saved in the selected folder: '{export_path}'"
        )
    else:
        os.makedirs(export_path)
        session.console.print(
            f"[green]Folder '{export_path}' successfully created.[/green]"
        )
    return jobs_cmds


# pylint: disable=unused-argument
def run_cli(jobs_cmds: Optional[List[str]] = None, test_mode=False):
    """Run the CLI menu."""
    ret_code = 1
    t_controller = CLIController(jobs_cmds)
    an_input = ""

    jobs_cmds = handle_job_cmds(jobs_cmds)

    bootup()
    if not jobs_cmds:
        welcome_message()

        if first_time_user():
            with contextlib.suppress(EOFError):
                webbrowser.open("https://docs.openbb.co/cli")

        t_controller.print_help()

    while ret_code:
        # There is a command in the queue
        if t_controller.queue and len(t_controller.queue) > 0:
            # If the command is quitting the menu we want to return in here
            if t_controller.queue[0] in ("q", "..", "quit"):
                print_goodbye()
                break

            # Consume 1 element from the queue
            an_input = t_controller.queue[0]
            t_controller.queue = t_controller.queue[1:]

            # Print the current location because this was an instruction and we want user to know what was the action
            if an_input and an_input.split(" ")[0] in t_controller.CHOICES_COMMANDS:
                session.console.print(f"{get_flair_and_username()} / $ {an_input}")

        # Get input command from user
        else:
            try:
                # Get input from user using auto-completion
                if session.prompt_session and session.settings.USE_PROMPT_TOOLKIT:
                    # Check if toolbar hint was enabled
                    if session.settings.TOOLBAR_HINT:
                        an_input = session.prompt_session.prompt(  # type: ignore[union-attr]
                            f"{get_flair_and_username()} / $ ",
                            completer=t_controller.completer,
                            search_ignore_case=True,
                            bottom_toolbar=HTML(
                                '<style bg="ansiblack" fg="ansiwhite">[h]</style> help menu    '
                                '<style bg="ansiblack" fg="ansiwhite">[q]</style> return to previous menu    '
                                '<style bg="ansiblack" fg="ansiwhite">[e]</style> exit the program    '
                                '<style bg="ansiblack" fg="ansiwhite">[cmd -h]</style> '
                                "see usage and available options    "
                            ),
                            style=Style.from_dict(
                                {
                                    "bottom-toolbar": "#ffffff bg:#333333",
                                }
                            ),
                        )
                    else:
                        an_input = session.prompt_session.prompt(  # type: ignore[union-attr]
                            f"{get_flair_and_username()} / $ ",
                            completer=t_controller.completer,
                            search_ignore_case=True,
                        )

                # Get input from user without auto-completion
                else:
                    an_input = input(f"{get_flair_and_username()} / $ ")

            except (KeyboardInterrupt, EOFError):
                print_goodbye()
                break

        try:
            # Process the input command
            t_controller.queue = t_controller.switch(an_input)

            if an_input in ("q", "quit", "..", "exit", "e"):
                print_goodbye()
                break

            # Check if the user wants to reset application
            if an_input in ("r", "reset") or t_controller.update_success:
                reset(t_controller.queue if t_controller.queue else [])
                break

        except SystemExit:
            session.console.print(
                f"[red]The command '{an_input}' doesn't exist on the / menu.[/red]\n",
            )
            similar_cmd = difflib.get_close_matches(
                an_input.split(" ")[0] if " " in an_input else an_input,
                t_controller.controller_choices,
                n=1,
                cutoff=0.7,
            )
            if similar_cmd:
                an_input = similar_cmd[0]
                if " " in an_input:
                    candidate_input = (
                        f"{similar_cmd[0]} {' '.join(an_input.split(' ')[1:])}"
                    )
                    if candidate_input == an_input:
                        an_input = ""
                        t_controller.queue = []
                        session.console.print("\n")
                        continue
                    an_input = candidate_input

                session.console.print(f"[green]Replacing by '{an_input}'.[/green]")
                t_controller.queue.insert(0, an_input)


def insert_start_slash(cmds: List[str]) -> List[str]:
    """Insert a slash at the beginning of a command sequence."""
    if not cmds[0].startswith("/"):
        cmds[0] = f"/{cmds[0]}"
    if cmds[0].startswith("/home"):
        cmds[0] = f"/{cmds[0][5:]}"
    return cmds


def run_scripts(
    path: Path,
    test_mode: bool = False,
    verbose: bool = False,
    routines_args: Optional[List[str]] = None,
    special_arguments: Optional[Dict[str, str]] = None,
    output: bool = True,
):
    """Run given .openbb scripts.

    Parameters
    ----------
    path : str
        The location of the .openbb file
    test_mode : bool
        Whether the CLI is in test mode
    verbose : bool
        Whether to run tests in verbose mode
    routines_args : List[str]
        One or multiple inputs to be replaced in the routine and separated by commas.
        E.g. GME,AMC,BTC-USD
    special_arguments: Optional[Dict[str, str]]
        Replace `${key=default}` with `value` for every key in the dictionary
    output: bool
        Whether to log tests to txt files
    """
    if not path.exists():
        session.console.print(f"File '{path}' doesn't exist. Launching base CLI.\n")
        if not test_mode:
            run_cli()

    # THIS NEEDS TO BE REFACTORED!!! - ITS USED FOR TESTING
    with path.open() as fp:
        raw_lines = [x for x in fp if (not is_reset(x)) and ("#" not in x) and x]
        raw_lines = [
            raw_line.strip("\n") for raw_line in raw_lines if raw_line.strip("\n")
        ]

        if routines_args:
            lines = []
            for rawline in raw_lines:
                templine = rawline
                for i, arg in enumerate(routines_args):
                    templine = templine.replace(f"$ARGV[{i}]", arg)
                lines.append(templine)
        # Handle new testing arguments:
        elif special_arguments:
            lines = []
            for line in raw_lines:
                new_line = re.sub(
                    r"\${[^{]+=[^{]+}",
                    lambda x: replace_dynamic(x, special_arguments),  # type: ignore
                    line,
                )
                lines.append(new_line)

        else:
            lines = raw_lines

        if test_mode and "exit" not in lines[-1]:
            lines.append("exit")

        # Deals with the export with a path with "/" in it
        export_folder = ""
        if "export" in lines[0]:
            export_folder = lines[0].split("export ")[1].rstrip()
            lines = lines[1:]

        simulate_argv = f"/{'/'.join([line.rstrip() for line in lines])}"
        file_cmds = simulate_argv.replace("//", "/home/").split()
        file_cmds = insert_start_slash(file_cmds) if file_cmds else file_cmds
        file_cmds = (
            [f"export {export_folder}{' '.join(file_cmds)}"]
            if export_folder
            else [" ".join(file_cmds)]
        )

        if not test_mode or verbose:
            run_cli(file_cmds, test_mode=True)
        else:
            with suppress_stdout():
                session.console.print(f"To ensure: {output}")
                if output:
                    timestamp = datetime.now().timestamp()
                    stamp_str = str(timestamp).replace(".", "")
                    whole_path = Path(REPOSITORY_DIRECTORY / "integration_test_output")
                    whole_path.mkdir(parents=True, exist_ok=True)
                    first_cmd = file_cmds[0].split("/")[1]
                    with open(
                        whole_path / f"{stamp_str}_{first_cmd}_output.txt", "w"
                    ) as output_file, contextlib.redirect_stdout(output_file):
                        run_cli(file_cmds, test_mode=True)
                else:
                    run_cli(file_cmds, test_mode=True)


def replace_dynamic(match: re.Match, special_arguments: Dict[str, str]) -> str:
    """Replace ${key=default} with value in special_arguments if it exists, else with default.

    Parameters
    ----------
    match: re.Match[str]
        The match object
    special_arguments: Dict[str, str]
        The key value pairs to replace in the scripts

    Returns
    ----------
    str
        The new string
    """
    cleaned = match[0].replace("{", "").replace("}", "").replace("$", "")
    key, default = cleaned.split("=")
    dict_value = special_arguments.get(key, default)
    if dict_value:
        return dict_value
    return default


def run_routine(file: str, routines_args=Optional[str]):
    """Execute command routine from .openbb file."""
    user_routine_path = Path(session.user.preferences.export_directory, "routines")
    default_routine_path = ASSETS_DIRECTORY / "routines" / file

    if user_routine_path.exists():
        run_scripts(path=user_routine_path, routines_args=routines_args)
    elif default_routine_path.exists():
        run_scripts(path=default_routine_path, routines_args=routines_args)
    else:
        session.console.print(
            f"Routine not found, please put your `.openbb` file into : {user_routine_path}."
        )


# pylint: disable=unused-argument
def main(
    debug: bool,
    dev: bool,
    path_list: List[str],
    routines_args: Optional[List[str]] = None,
    **kwargs,
):
    """Run the CLI with various options.

    Parameters
    ----------
    debug : bool
        Whether to run the CLI in debug mode
    dev:
        Points backend towards development environment instead of production
    test : bool
        Whether to run the CLI in integrated test mode
    filtert : str
        Filter test files with given string in name
    paths : List[str]
        The paths to run for scripts or to test
    verbose : bool
        Whether to show output from tests
    routines_args : List[str]
        One or multiple inputs to be replaced in the routine and separated by commas.
        E.g. GME,AMC,BTC-USD
    """
    if debug:
        session.settings.DEBUG_MODE = True

    if dev:
        session.settings.DEV_BACKEND = True
        session.settings.BASE_URL = "https://payments.openbb.dev/"
        session.settings.HUB_URL = "https://my.openbb.dev"

    if isinstance(path_list, list) and path_list[0].endswith(".openbb"):
        run_routine(file=path_list[0], routines_args=routines_args)
    elif path_list:
        argv_cmds = list([" ".join(path_list).replace(" /", "/home/")])
        argv_cmds = insert_start_slash(argv_cmds) if argv_cmds else argv_cmds
        run_cli(argv_cmds)
    else:
        run_cli()


def parse_args_and_run():
    """Parse input arguments and run CLI."""
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        prog="cli",
        description="The OpenBB Platform CLI.",
    )
    parser.add_argument(
        "-d",
        "--debug",
        dest="debug",
        action="store_true",
        default=False,
        help="Runs the CLI in debug mode.",
    )
    parser.add_argument(
        "--dev",
        dest="dev",
        action="store_true",
        default=False,
        help="Points backend towards development environment instead of production",
    )
    parser.add_argument(
        "--file",
        help="The path or .openbb file to run.",
        dest="path",
        nargs="+",
        default="",
        type=str,
    )
    parser.add_argument(
        "-i",
        "--input",
        help=(
            "Select multiple inputs to be replaced in the routine and separated by commas."
            "E.g. GME,AMC,BTC-USD"
        ),
        dest="routine_args",
        type=lambda s: [str(item) for item in s.split(",")],
        default=None,
    )
    parser.add_argument(
        "-t",
        "--test",
        action="store_true",
        help=(
            "Run the CLI in testing mode. Also run this option and '-h'"
            " to see testing argument options."
        ),
    )
    # The args -m, -f and --HistoryManager.hist_file are used only in reports menu
    # by papermill and that's why they have suppress help.
    parser.add_argument(
        "-m",
        help=argparse.SUPPRESS,
        dest="module",
        default="",
        type=str,
    )
    parser.add_argument(
        "-f",
        help=argparse.SUPPRESS,
        dest="module_file",
        default="",
        type=str,
    )
    parser.add_argument(
        "--HistoryManager.hist_file",
        help=argparse.SUPPRESS,
        dest="module_hist_file",
        default="",
        type=str,
    )
    if sys.argv[1:] and "-" not in sys.argv[1][0]:
        sys.argv.insert(1, "--file")
    ns_parser, unknown = parser.parse_known_args()

    # This ensures that if cli.py receives unknown args it will not start.
    # Use -d flag if you want to see the unknown args.
    if unknown:
        if ns_parser.debug:
            session.console.print(unknown)
        else:
            sys.exit(-1)

    main(
        ns_parser.debug,
        ns_parser.dev,
        ns_parser.path,
        ns_parser.routine_args,
        module=ns_parser.module,
        module_file=ns_parser.module_file,
        module_hist_file=ns_parser.module_hist_file,
    )


def launch(
    debug: bool = False, dev: bool = False, queue: Optional[List[str]] = None
) -> None:
    """Launch CLI."""
    if queue:
        main(debug, dev, queue, module="")
    else:
        parse_args_and_run()


if __name__ == "__main__":
    parse_args_and_run()