|
|
import asyncio |
|
|
import json |
|
|
import os |
|
|
import re |
|
|
import warnings |
|
|
from contextlib import asynccontextmanager |
|
|
from http import HTTPStatus |
|
|
from pathlib import Path |
|
|
from urllib.parse import urlencode |
|
|
|
|
|
from fastapi import FastAPI, HTTPException, Request, Response, status |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import FileResponse, JSONResponse |
|
|
from fastapi.staticfiles import StaticFiles |
|
|
from loguru import logger |
|
|
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor |
|
|
from pydantic import PydanticDeprecatedSince20 |
|
|
from pydantic_core import PydanticSerializationError |
|
|
from rich import print as rprint |
|
|
from starlette.middleware.base import BaseHTTPMiddleware |
|
|
|
|
|
from langflow.api import health_check_router, log_router, router |
|
|
from langflow.initial_setup.setup import ( |
|
|
create_or_update_starter_projects, |
|
|
initialize_super_user_if_needed, |
|
|
load_flows_from_directory, |
|
|
) |
|
|
from langflow.interface.types import get_and_cache_all_types_dict |
|
|
from langflow.interface.utils import setup_llm_caching |
|
|
from langflow.logging.logger import configure |
|
|
from langflow.middleware import ContentSizeLimitMiddleware |
|
|
from langflow.services.deps import get_settings_service, get_telemetry_service |
|
|
from langflow.services.utils import initialize_services, teardown_services |
|
|
|
|
|
|
|
|
# Suppress every warning of category ``PydanticDeprecatedSince20`` process-wide.
warnings.filterwarnings(action="ignore", category=PydanticDeprecatedSince20)
|
|
|
|
|
|
|
|
# Upper bound used when validating port numbers (largest 16-bit value).
MAX_PORT = 2**16 - 1
|
|
|
|
|
|
|
|
class RequestCancelledMiddleware(BaseHTTPMiddleware):
    """Stop processing a request once the client has disconnected.

    The downstream handler is raced against a poller that watches for a client
    disconnect; whichever finishes first decides the response.
    """

    def __init__(self, app) -> None:
        super().__init__(app)

    async def dispatch(self, request: Request, call_next):
        """Run the handler, answering with status 499 if the client disconnects first."""
        disconnected = object()

        async def watch_for_disconnect():
            # Poll the connection every 100 ms until the client goes away.
            while not await request.is_disconnected():
                await asyncio.sleep(0.1)
            return disconnected

        response_task = asyncio.create_task(call_next(request))
        watcher_task = asyncio.create_task(watch_for_disconnect())

        finished, unfinished = await asyncio.wait(
            [response_task, watcher_task],
            return_when=asyncio.FIRST_COMPLETED,
        )

        # Whichever task lost the race is no longer needed.
        for leftover in unfinished:
            leftover.cancel()

        if watcher_task not in finished:
            return await response_task
        return Response("Request was cancelled", status_code=499)
|
|
|
|
|
|
|
|
class JavaScriptMIMETypeMiddleware(BaseHTTPMiddleware):
    """Force the ``text/javascript`` content type on successful ``.js`` responses.

    Serialization failures raised by the downstream handler are converted into
    an HTTP 500 whose detail is a JSON list of messages.
    """

    async def dispatch(self, request: Request, call_next):
        """Call the next handler and patch the Content-Type of ``.js`` responses."""
        try:
            response = await call_next(request)
        except PydanticSerializationError as exc:
            # Only serialization errors are translated; everything else propagates untouched.
            message = (
                "Something went wrong while serializing the response. "
                "Please share this error on our GitHub repository."
            )
            error_messages = json.dumps([message, str(exc)])
            raise HTTPException(status_code=HTTPStatus.INTERNAL_SERVER_ERROR, detail=error_messages) from exc

        url_path = request.url.path
        # Paths containing "files/" are skipped even when they end in ".js".
        is_js_asset = "files/" not in url_path and url_path.endswith(".js")
        if is_js_asset and response.status_code == HTTPStatus.OK:
            response.headers["Content-Type"] = "text/javascript"
        return response
|
|
|
|
|
|
|
|
def get_lifespan(*, fix_migration=False, version=None):
    """Build the lifespan context manager handed to ``FastAPI``.

    Args:
        fix_migration: Forwarded to ``initialize_services``.
        version: Optional version string shown in the startup banner.

    Returns:
        An async context manager that runs the startup steps before yielding
        and tears the services down afterwards.
    """
    telemetry_service = get_telemetry_service()

    @asynccontextmanager
    async def lifespan(_app: FastAPI):
        configure(async_file=True)

        # Startup banner, with the version when one was supplied.
        startup_banner = (
            f"[bold green]Starting Langflow v{version}...[/bold green]"
            if version
            else "[bold green]Starting Langflow...[/bold green]"
        )
        rprint(startup_banner)

        try:
            await initialize_services(fix_migration=fix_migration)
            setup_llm_caching()
            await initialize_super_user_if_needed()
            settings_service = get_settings_service()
            types_dict = await get_and_cache_all_types_dict(settings_service)
            # create_or_update_starter_projects is run in a worker thread.
            await asyncio.to_thread(create_or_update_starter_projects, types_dict)
            telemetry_service.start()
            await load_flows_from_directory()
            yield
        except Exception as err:
            # Errors that mention the migration fix command are re-raised without being logged.
            needs_logging = "langflow migration --fix" not in str(err)
            if needs_logging:
                logger.exception(err)
            raise
        finally:
            # Runs on every exit path: normal shutdown and failure alike.
            logger.info("Cleaning up resources...")
            await teardown_services()
            await logger.complete()
            rprint("[bold red]Langflow shutdown complete[/bold red]")

    return lifespan
|
|
|
|
|
|
|
|
def create_app():
    """Create the FastAPI app and include the router.

    Builds the application with its lifespan handler, registers the middleware
    (content-size limit, Sentry setup, CORS, JavaScript MIME type, multipart
    boundary validation, query-string flattening), optionally starts the
    Prometheus metrics server, includes the API routers, installs a catch-all
    exception handler and instruments the app with OpenTelemetry.

    Returns:
        FastAPI: The configured application.

    Raises:
        ValueError: If ``LANGFLOW_PROMETHEUS_PORT`` is set but is not an
            integer in the range ``1..MAX_PORT``.
    """
    from langflow.utils.version import get_version_info

    __version__ = get_version_info()["version"]

    configure()
    lifespan = get_lifespan(version=__version__)
    app = FastAPI(lifespan=lifespan, title="Langflow", version=__version__)
    app.add_middleware(
        ContentSizeLimitMiddleware,
    )

    setup_sentry(app)
    origins = ["*"]

    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    app.add_middleware(JavaScriptMIMETypeMiddleware)

    @app.middleware("http")
    async def check_boundary(request: Request, call_next):
        """Reject file uploads whose multipart framing is malformed."""
        if "/api/v1/files/upload" in request.url.path:
            content_type = request.headers.get("Content-Type")

            if not content_type or "multipart/form-data" not in content_type or "boundary=" not in content_type:
                return JSONResponse(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    content={"detail": "Content-Type header must be 'multipart/form-data' with a boundary parameter."},
                )

            boundary = content_type.split("boundary=")[-1].strip()

            if not re.match(r"^[\w\-]{1,70}$", boundary):
                return JSONResponse(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    content={"detail": "Invalid boundary format"},
                )

            body = await request.body()

            boundary_start = f"--{boundary}".encode()
            # The multipart grammar makes the CRLF after the close delimiter
            # optional, so accept the body with or without it.
            boundary_end = f"--{boundary}--\r\n".encode()
            boundary_end_no_newline = f"--{boundary}--".encode()

            if not body.startswith(boundary_start) or not body.endswith((boundary_end, boundary_end_no_newline)):
                return JSONResponse(
                    status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                    content={"detail": "Invalid multipart formatting"},
                )

        return await call_next(request)

    @app.middleware("http")
    async def flatten_query_string_lists(request: Request, call_next):
        """Expand comma-separated query values into repeated ``key=value`` pairs."""
        flattened: list[tuple[str, str]] = []
        for key, value in request.query_params.multi_items():
            flattened.extend((key, entry) for entry in value.split(","))

        request.scope["query_string"] = urlencode(flattened, doseq=True).encode("utf-8")

        return await call_next(request)

    settings = get_settings_service().settings
    if prome_port_str := os.environ.get("LANGFLOW_PROMETHEUS_PORT"):
        prome_port = int(prome_port_str)
        # Both bounds must hold; the previous `or` made this check always true,
        # so out-of-range ports were silently accepted.
        if 0 < prome_port <= MAX_PORT:
            rprint(f"[bold green]Starting Prometheus server on port {prome_port}...[/bold green]")
            settings.prometheus_enabled = True
            settings.prometheus_port = prome_port
        else:
            msg = f"Invalid port number {prome_port_str}"
            raise ValueError(msg)

    if settings.prometheus_enabled:
        from prometheus_client import start_http_server

        start_http_server(settings.prometheus_port)

    app.include_router(router)
    app.include_router(health_check_router)
    app.include_router(log_router)

    @app.exception_handler(Exception)
    async def exception_handler(_request: Request, exc: Exception):
        """Log the exception and turn it into a JSON error response."""
        # loguru has no `exc_info` keyword: passing any kwarg makes it run
        # str.format on the message, which fails when the exception text
        # contains braces. `opt(exception=...)` attaches the traceback and the
        # "{}" placeholder keeps the exception text out of the format string.
        if isinstance(exc, HTTPException):
            logger.opt(exception=exc).error("HTTPException: {}", exc)
            return JSONResponse(
                status_code=exc.status_code,
                content={"message": str(exc.detail)},
            )
        logger.opt(exception=exc).error("unhandled error: {}", exc)
        return JSONResponse(
            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
            content={"message": str(exc)},
        )

    FastAPIInstrumentor.instrument_app(app)

    return app
|
|
|
|
|
|
|
|
def setup_sentry(app: FastAPI) -> None:
    """Initialise Sentry and add its ASGI middleware when a DSN is configured.

    Args:
        app (FastAPI): Application that receives the Sentry middleware.
    """
    settings = get_settings_service().settings
    if not settings.sentry_dsn:
        # Nothing to do without a DSN; the Sentry SDK is not even imported.
        return

    import sentry_sdk
    from sentry_sdk.integrations.asgi import SentryAsgiMiddleware

    sentry_sdk.init(
        dsn=settings.sentry_dsn,
        traces_sample_rate=settings.sentry_traces_sample_rate,
        profiles_sample_rate=settings.sentry_profiles_sample_rate,
    )
    app.add_middleware(SentryAsgiMiddleware)
|
|
|
|
|
|
|
|
def setup_static_files(app: FastAPI, static_files_dir: Path) -> None:
    """Mount the static files and register a 404 handler that serves index.html.

    Args:
        app (FastAPI): FastAPI app.
        static_files_dir (Path): Path to the static files directory.
    """
    static_app = StaticFiles(directory=static_files_dir, html=True)
    app.mount("/", static_app, name="static")

    @app.exception_handler(404)
    async def custom_404_handler(_request, _exc):
        # Every 404 is answered with index.html from the static directory.
        index_file = static_files_dir / "index.html"
        if index_file.exists():
            return FileResponse(index_file)
        msg = f"File at path {index_file} does not exist."
        raise RuntimeError(msg)
|
|
|
|
|
|
|
|
def get_static_files_dir():
    """Return the ``frontend`` directory located next to this module's file."""
    module_dir = Path(__file__).parent
    return module_dir.joinpath("frontend")
|
|
|
|
|
|
|
|
def setup_app(static_files_dir: Path | None = None, *, backend_only: bool = False) -> FastAPI:
    """Create the app and, unless running backend-only, mount the frontend.

    Args:
        static_files_dir: Directory holding the static frontend files; when
            omitted, the result of ``get_static_files_dir()`` is used.
        backend_only: When true, the static files are neither checked nor mounted.

    Returns:
        FastAPI: The configured application.

    Raises:
        RuntimeError: If the frontend is wanted but the static files directory
            does not exist.
    """
    # Logged before the default is applied, so this may report ``None``.
    logger.info(f"Setting up app with static files directory {static_files_dir}")
    serve_frontend = not backend_only
    static_files_dir = static_files_dir or get_static_files_dir()

    if serve_frontend and not (static_files_dir and static_files_dir.exists()):
        msg = f"Static files directory {static_files_dir} does not exist."
        raise RuntimeError(msg)

    app = create_app()
    if serve_frontend and static_files_dir is not None:
        setup_static_files(app, static_files_dir)
    return app
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Development entry point: serve the app locally with auto-reload.
    import uvicorn

    from langflow.__main__ import get_number_of_workers

    configure()
    uvicorn.run(
        "langflow.main:create_app",
        # `create_app` is a factory returning the ASGI app, not the app itself;
        # say so explicitly instead of relying on uvicorn's implicit detection.
        factory=True,
        host="127.0.0.1",
        port=7860,
        workers=get_number_of_workers(),
        log_level="error",
        reload=True,
        loop="asyncio",
    )
|
|
|