# Log RCA worker: pops log filenames from a Redis queue, downloads the file
# from S3, runs the LogBERT RCA pipeline, and also exposes a FastAPI endpoint.
# (Scraped-page residue — "Runtime error" banner, file size, blame hashes and
# line-number gutter — removed; it was not part of the program.)
import os
import asyncio
import tempfile
from logbert_rca_pipeline_api import detect_anomalies_and_explain
import redis
import boto3
from botocore.exceptions import ClientError
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn
# FastAPI application instance; served by uvicorn when launched with "serve".
app = FastAPI()
# Redis client used both to pop work and to push notifications
# (adjust host/port/db for your deployment).
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# Queue from which the worker loop in main() pops incoming log filenames.
REDIS_QUEUE = "log_queue"
# Request model for the /process-log endpoint.
class LogRequest(BaseModel):
    """Request body: the S3 object key of the log file to analyze."""

    filename: str
async def process_log(filename, file_content):
    """Run the RCA pipeline on raw log bytes and return the first result.

    The bytes are written to a temporary file (the pipeline expects a path),
    and the blocking pipeline call runs in the default thread-pool executor
    so the event loop stays responsive.

    Args:
        filename: Original name of the log file (echoed back in the
            no-anomaly result).
        file_content: Raw bytes of the log file.

    Returns:
        The first element of the pipeline's result list, or a dict marking
        "no anomaly" when the pipeline returns nothing.
    """
    with tempfile.NamedTemporaryFile(delete=False, mode="wb", suffix=".log") as tmp:
        tmp.write(file_content)
        tmp_path = tmp.name
    try:
        loop = asyncio.get_running_loop()
        # Pass the pipeline callable directly; no wrapper closure needed.
        results = await loop.run_in_executor(
            None, detect_anomalies_and_explain, tmp_path
        )
    finally:
        # BUGFIX: the original leaked the temp file whenever the pipeline
        # raised, because unlink ran only on the success path.
        os.unlink(tmp_path)
    if results:
        return results[0]
    return {"filename": filename, "anomaly": False, "details": "No anomaly detected."}
# Name of the S3 bucket holding the raw log files.
S3_BUCKET = "your-s3-bucket-name"


async def get_file_from_s3(filename):
    """Download *filename* from S3 and return its bytes, or None on failure.

    The blocking boto3 call runs in the default thread-pool executor so the
    event loop is not blocked. *filename* may be bytes (e.g. straight from
    redis rpop) or str.

    Args:
        filename: S3 object key, as str or bytes.

    Returns:
        The object's content as bytes, or None if the download failed.
    """
    loop = asyncio.get_running_loop()

    def _download():
        # Redis hands back bytes; S3 keys must be str.
        key = filename.decode() if isinstance(filename, bytes) else filename
        s3 = boto3.client("s3")
        try:
            response = s3.get_object(Bucket=S3_BUCKET, Key=key)
            return response["Body"].read()
        except ClientError as e:
            # BUGFIX: the original message printed the literal "(unknown)"
            # instead of the key (garbled f-string placeholder).
            print(f"Error downloading {key} from S3: {e}")
            return None

    return await loop.run_in_executor(None, _download)
# Queue that downstream consumers watch for completed files.
# BUGFIX: main() referenced this constant but it was never defined anywhere
# in the file, so every successful iteration raised NameError.
# TODO(review): confirm the intended queue name with the consumer side.
READY_FOR_RCA_QUEUE = "ready_for_rca_queue"


async def save_rca_to_db(rca_result):
    """Persist an RCA result.

    TODO(review): main() called this but no implementation existed in the
    file (NameError at runtime). This stub only prints the result; replace
    it with real persistence.
    """
    print(f"RCA result (not persisted): {rca_result}")


async def main():
    """Worker loop: pop filenames from Redis, fetch from S3, run RCA.

    Polls REDIS_QUEUE forever; on success, saves the RCA result and pushes
    the filename onto READY_FOR_RCA_QUEUE. Sleeps briefly when idle.
    """
    while True:
        loop = asyncio.get_running_loop()
        # rpop returns bytes, or None when the queue is empty.
        filename = await loop.run_in_executor(None, redis_client.rpop, REDIS_QUEUE)
        if filename:
            # Human-readable name for log messages (rpop yields bytes).
            display_name = filename.decode() if isinstance(filename, bytes) else filename
            file_content = await get_file_from_s3(filename)
            if file_content is not None:
                rca_result = await process_log(filename, file_content)
                await save_rca_to_db(rca_result)
                try:
                    await loop.run_in_executor(
                        None, redis_client.lpush, READY_FOR_RCA_QUEUE, filename
                    )
                    # BUGFIX: original printed the literal "(unknown)" here
                    # instead of the filename (garbled f-string).
                    print(f"Notified {READY_FOR_RCA_QUEUE} for {display_name}")
                except Exception as redis_exc:
                    # Best-effort notification: log and keep the loop alive.
                    print(f"Failed to notify ready-for-rca queue: {redis_exc}")
            else:
                # BUGFIX: same garbled "(unknown)" placeholder repaired.
                print(f"File {display_name} could not be downloaded from S3.")
        else:
            # Queue empty — back off briefly before polling again.
            await asyncio.sleep(2)
# HTTP entry point mirroring the worker loop for a single, named file.
@app.post("/process-log")
async def process_log_endpoint(request: LogRequest):
    """Fetch the requested log from S3, run RCA on it, and return the result.

    Responds 404 when the object cannot be downloaded from the bucket.
    """
    file_content = await get_file_from_s3(request.filename)
    if file_content is not None:
        return await process_log(request.filename, file_content)
    raise HTTPException(status_code=404, detail=f"File {request.filename} not found in S3 bucket.")
if __name__ == "__main__":
    import sys

    # "python <file> serve" starts the HTTP API; any other invocation runs
    # the Redis-polling worker loop.
    serve_requested = len(sys.argv) > 1 and sys.argv[1] == "serve"
    if serve_requested:
        # NOTE(review): assumes this module is importable as "app" — confirm
        # the filename matches the "app:app" import string.
        uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)
    else:
        asyncio.run(main())