|
|
import importlib |
|
|
import sys |
|
|
from dataclasses import dataclass |
|
|
from logging import getLogger |
|
|
from pathlib import Path |
|
|
from typing import Union |
|
|
|
|
|
from rich import print |
|
|
from rich.padding import Padding |
|
|
from rich.panel import Panel |
|
|
from rich.syntax import Syntax |
|
|
from rich.tree import Tree |
|
|
|
|
|
from fastapi_cli.exceptions import FastAPICLIException |
|
|
|
|
|
logger = getLogger(__name__) |
|
|
|
|
|
try: |
|
|
from fastapi import FastAPI |
|
|
except ImportError: |
|
|
FastAPI = None |
|
|
|
|
|
|
|
|
def get_default_path() -> Path: |
|
|
path = Path("main.py") |
|
|
if path.is_file(): |
|
|
return path |
|
|
path = Path("app.py") |
|
|
if path.is_file(): |
|
|
return path |
|
|
path = Path("api.py") |
|
|
if path.is_file(): |
|
|
return path |
|
|
path = Path("app/main.py") |
|
|
if path.is_file(): |
|
|
return path |
|
|
path = Path("app/app.py") |
|
|
if path.is_file(): |
|
|
return path |
|
|
path = Path("app/api.py") |
|
|
if path.is_file(): |
|
|
return path |
|
|
raise FastAPICLIException( |
|
|
"Could not find a default file to run, please provide an explicit path" |
|
|
) |
|
|
|
|
|
|
|
|
@dataclass |
|
|
class ModuleData: |
|
|
module_import_str: str |
|
|
extra_sys_path: Path |
|
|
|
|
|
|
|
|
def get_module_data_from_path(path: Path) -> ModuleData: |
|
|
logger.info( |
|
|
"Searching for package file structure from directories with [blue]__init__.py[/blue] files" |
|
|
) |
|
|
use_path = path.resolve() |
|
|
module_path = use_path |
|
|
if use_path.is_file() and use_path.stem == "__init__": |
|
|
module_path = use_path.parent |
|
|
module_paths = [module_path] |
|
|
extra_sys_path = module_path.parent |
|
|
for parent in module_path.parents: |
|
|
init_path = parent / "__init__.py" |
|
|
if init_path.is_file(): |
|
|
module_paths.insert(0, parent) |
|
|
extra_sys_path = parent.parent |
|
|
else: |
|
|
break |
|
|
logger.info(f"Importing from {extra_sys_path.resolve()}") |
|
|
root = module_paths[0] |
|
|
name = f"π {root.name}" if root.is_file() else f"π {root.name}" |
|
|
root_tree = Tree(name) |
|
|
if root.is_dir(): |
|
|
root_tree.add("[dim]π __init__.py[/dim]") |
|
|
tree = root_tree |
|
|
for sub_path in module_paths[1:]: |
|
|
sub_name = ( |
|
|
f"π {sub_path.name}" if sub_path.is_file() else f"π {sub_path.name}" |
|
|
) |
|
|
tree = tree.add(sub_name) |
|
|
if sub_path.is_dir(): |
|
|
tree.add("[dim]π __init__.py[/dim]") |
|
|
title = "[b green]Python module file[/b green]" |
|
|
if len(module_paths) > 1 or module_path.is_dir(): |
|
|
title = "[b green]Python package file structure[/b green]" |
|
|
panel = Padding( |
|
|
Panel( |
|
|
root_tree, |
|
|
title=title, |
|
|
expand=False, |
|
|
padding=(1, 2), |
|
|
), |
|
|
1, |
|
|
) |
|
|
print(panel) |
|
|
module_str = ".".join(p.stem for p in module_paths) |
|
|
logger.info(f"Importing module [green]{module_str}[/green]") |
|
|
return ModuleData( |
|
|
module_import_str=module_str, extra_sys_path=extra_sys_path.resolve() |
|
|
) |
|
|
|
|
|
|
|
|
def get_app_name(*, mod_data: ModuleData, app_name: Union[str, None] = None) -> str: |
|
|
try: |
|
|
mod = importlib.import_module(mod_data.module_import_str) |
|
|
except (ImportError, ValueError) as e: |
|
|
logger.error(f"Import error: {e}") |
|
|
logger.warning( |
|
|
"Ensure all the package directories have an [blue]__init__.py[/blue] file" |
|
|
) |
|
|
raise |
|
|
if not FastAPI: |
|
|
raise FastAPICLIException( |
|
|
"Could not import FastAPI, try running 'pip install fastapi'" |
|
|
) from None |
|
|
object_names = dir(mod) |
|
|
object_names_set = set(object_names) |
|
|
if app_name: |
|
|
if app_name not in object_names_set: |
|
|
raise FastAPICLIException( |
|
|
f"Could not find app name {app_name} in {mod_data.module_import_str}" |
|
|
) |
|
|
app = getattr(mod, app_name) |
|
|
if not isinstance(app, FastAPI): |
|
|
raise FastAPICLIException( |
|
|
f"The app name {app_name} in {mod_data.module_import_str} doesn't seem to be a FastAPI app" |
|
|
) |
|
|
return app_name |
|
|
for preferred_name in ["app", "api"]: |
|
|
if preferred_name in object_names_set: |
|
|
obj = getattr(mod, preferred_name) |
|
|
if isinstance(obj, FastAPI): |
|
|
return preferred_name |
|
|
for name in object_names: |
|
|
obj = getattr(mod, name) |
|
|
if isinstance(obj, FastAPI): |
|
|
return name |
|
|
raise FastAPICLIException("Could not find FastAPI app in module, try using --app") |
|
|
|
|
|
|
|
|
def get_import_string( |
|
|
*, path: Union[Path, None] = None, app_name: Union[str, None] = None |
|
|
) -> str: |
|
|
if not path: |
|
|
path = get_default_path() |
|
|
logger.info(f"Using path [blue]{path}[/blue]") |
|
|
logger.info(f"Resolved absolute path {path.resolve()}") |
|
|
if not path.exists(): |
|
|
raise FastAPICLIException(f"Path does not exist {path}") |
|
|
mod_data = get_module_data_from_path(path) |
|
|
sys.path.insert(0, str(mod_data.extra_sys_path)) |
|
|
use_app_name = get_app_name(mod_data=mod_data, app_name=app_name) |
|
|
import_example = Syntax( |
|
|
f"from {mod_data.module_import_str} import {use_app_name}", "python" |
|
|
) |
|
|
import_panel = Padding( |
|
|
Panel( |
|
|
import_example, |
|
|
title="[b green]Importable FastAPI app[/b green]", |
|
|
expand=False, |
|
|
padding=(1, 2), |
|
|
), |
|
|
1, |
|
|
) |
|
|
logger.info("Found importable FastAPI app") |
|
|
print(import_panel) |
|
|
import_string = f"{mod_data.module_import_str}:{use_app_name}" |
|
|
logger.info(f"Using import string [b green]{import_string}[/b green]") |
|
|
return import_string |
|
|
|