Spaces:
Runtime error
Runtime error
| import os | |
| import asyncio | |
| import tempfile | |
| from logbert_rca_pipeline_api import detect_anomalies_and_explain | |
| import redis | |
| import boto3 | |
| from botocore.exceptions import ClientError | |
| from fastapi import FastAPI, HTTPException | |
| from pydantic import BaseModel | |
| import uvicorn | |
| # FastAPI app | |
| app = FastAPI() | |
| # Initialize Redis client (adjust host/port/db as needed) | |
| redis_client = redis.Redis(host='localhost', port=6379, db=0) | |
| # Define the Redis queue name | |
| REDIS_QUEUE = "log_queue" | |
| # Request model | |
| class LogRequest(BaseModel): | |
| filename: str | |
| async def process_log(filename, file_content): | |
| # Save file_content to a temporary file and run RCA pipeline | |
| with tempfile.NamedTemporaryFile(delete=False, mode="wb", suffix=".log") as tmp: | |
| tmp.write(file_content) | |
| tmp_path = tmp.name | |
| loop = asyncio.get_event_loop() | |
| def _run_pipeline(): | |
| return detect_anomalies_and_explain(tmp_path) | |
| results = await loop.run_in_executor(None, _run_pipeline) | |
| os.unlink(tmp_path) | |
| if results and len(results) > 0: | |
| return results[0] | |
| else: | |
| return {"filename": filename, "anomaly": False, "details": "No anomaly detected."} | |
| S3_BUCKET = "your-s3-bucket-name" | |
| async def get_file_from_s3(filename): | |
| loop = asyncio.get_event_loop() | |
| def _download(): | |
| s3 = boto3.client("s3") | |
| try: | |
| response = s3.get_object(Bucket=S3_BUCKET, Key=filename.decode( | |
| ) if isinstance(filename, bytes) else filename) | |
| return response["Body"].read() | |
| except ClientError as e: | |
| print(f"Error downloading {filename} from S3: {e}") | |
| return None | |
| return await loop.run_in_executor(None, _download) | |
| async def main(): | |
| while True: | |
| loop = asyncio.get_event_loop() | |
| filename = await loop.run_in_executor(None, redis_client.rpop, REDIS_QUEUE) | |
| if filename: | |
| file_content = await get_file_from_s3(filename) | |
| if file_content is not None: | |
| rca_result = await process_log(filename, file_content) | |
| await save_rca_to_db(rca_result) | |
| try: | |
| await loop.run_in_executor(None, redis_client.lpush, READY_FOR_RCA_QUEUE, filename) | |
| print(f"Notified {READY_FOR_RCA_QUEUE} for {filename}") | |
| except Exception as redis_exc: | |
| print(f"Failed to notify ready-for-rca queue: {redis_exc}") | |
| else: | |
| print(f"File {filename} could not be downloaded from S3.") | |
| else: | |
| await asyncio.sleep(2) | |
| # FastAPI endpoint to process a log file from S3 | |
| async def process_log_endpoint(request: LogRequest): | |
| file_content = await get_file_from_s3(request.filename) | |
| if file_content is None: | |
| raise HTTPException(status_code=404, detail=f"File {request.filename} not found in S3 bucket.") | |
| result = await process_log(request.filename, file_content) | |
| return result | |
| if __name__ == "__main__": | |
| import sys | |
| if len(sys.argv) > 1 and sys.argv[1] == "serve": | |
| uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True) | |
| else: | |
| asyncio.run(main()) | |