File size: 3,183 Bytes
480ece8
6f2ff70
 
c19ec49
6f2ff70
 
 
480ece8
6f2ff70
 
 
 
 
480ece8
c19ec49
6f2ff70
 
 
 
 
 
 
 
c19ec49
6f2ff70
 
 
480ece8
 
6f2ff70
 
 
 
 
 
c19ec49
6f2ff70
 
 
 
 
 
 
 
480ece8
 
6f2ff70
c19ec49
6f2ff70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c19ec49
6f2ff70
 
480ece8
6f2ff70
 
 
 
480ece8
6f2ff70
480ece8
 
 
 
6f2ff70
480ece8
6f2ff70
 
 
 
 
 
 
 
 
 
 
c19ec49
 
6f2ff70
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100



import os
import asyncio
import tempfile
from logbert_rca_pipeline_api import detect_anomalies_and_explain
import redis
import boto3
from botocore.exceptions import ClientError
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn


# FastAPI app; the HTTP route below is registered on it via its decorator.
app = FastAPI()

# Initialize Redis client (adjust host/port/db as needed).
# NOTE(review): decode_responses is not passed, so values popped from Redis
# presumably arrive as bytes -- confirm against the redis-py defaults.
redis_client = redis.Redis(host='localhost', port=6379, db=0)

# Name of the Redis list the worker loop pops incoming log filenames from.
REDIS_QUEUE = "log_queue"

# Request model
class LogRequest(BaseModel):
    """Request body accepted by the ``/process-log`` endpoint."""
    # Passed to get_file_from_s3 as the key of the object to fetch.
    filename: str


async def process_log(filename, file_content):
    """Run the RCA pipeline over the raw bytes of one log file.

    The content is written to a temporary ``.log`` file because
    ``detect_anomalies_and_explain`` is called with a filesystem path. The
    pipeline call is handed to the loop's default executor so it does not
    block the event loop while it runs.

    Args:
        filename: Name of the log; only used to label the fallback result.
        file_content: Raw bytes of the log file.

    Returns:
        The first element of the pipeline's results, or a dict with
        ``"anomaly": False`` when the pipeline returns nothing.

    Raises:
        Whatever the pipeline or the file write raises; the temporary file
        is removed in either case.
    """
    loop = asyncio.get_running_loop()

    # delete=False so the file survives being closed and the pipeline can
    # reopen it by path; we therefore own its removal.
    tmp = tempfile.NamedTemporaryFile(delete=False, mode="wb", suffix=".log")
    tmp_path = tmp.name
    try:
        with tmp:
            tmp.write(file_content)
        results = await loop.run_in_executor(
            None, detect_anomalies_and_explain, tmp_path
        )
    finally:
        # Runs on success and on failure, so a crashing pipeline no longer
        # leaves stray files behind in the temp directory.
        os.unlink(tmp_path)

    if results:
        return results[0]
    return {"filename": filename, "anomaly": False, "details": "No anomaly detected."}


# Bucket that get_file_from_s3 reads from. Taken from the S3_BUCKET environment
# variable when it is set and non-empty; otherwise the original placeholder.
S3_BUCKET = os.environ.get("S3_BUCKET") or "your-s3-bucket-name"


async def get_file_from_s3(filename):
    """Download one object from ``S3_BUCKET`` and return its content.

    The blocking boto3 calls run in the loop's default executor.

    Args:
        filename: Object key as ``str`` or ``bytes``; bytes are decoded with
            the default codec before use.

    Returns:
        The object's body as bytes, or ``None`` when the S3 call raised
        ``ClientError`` (the error is printed).
    """
    # Normalise once so the S3 request and the error message both use the
    # text key instead of a bytes repr such as b'app.log'.
    key = filename.decode() if isinstance(filename, bytes) else filename

    def _download():
        s3 = boto3.client("s3")
        try:
            response = s3.get_object(Bucket=S3_BUCKET, Key=key)
            return response["Body"].read()
        except ClientError as e:
            print(f"Error downloading {key} from S3: {e}")
            return None

    return await asyncio.get_running_loop().run_in_executor(None, _download)


async def main():
    """Worker loop: pop filenames from ``REDIS_QUEUE`` and run RCA on each.

    For every popped filename the file is fetched from S3, passed through
    ``process_log``, the result is handed to ``save_rca_to_db``, and the
    filename is pushed onto ``READY_FOR_RCA_QUEUE``. When the queue is empty
    the loop sleeps for two seconds before polling again. Never returns.

    NOTE(review): ``save_rca_to_db`` and ``READY_FOR_RCA_QUEUE`` are not
    defined or imported in the visible part of this file. As written, the
    ``save_rca_to_db`` call raises ``NameError`` after the first processed
    file, and the missing queue name is swallowed by the ``except`` below
    and reported as a notify failure. They must be provided before this
    worker can run.
    """
    loop = asyncio.get_running_loop()
    while True:
        raw_name = await loop.run_in_executor(None, redis_client.rpop, REDIS_QUEUE)
        if not raw_name:
            # Queue is empty: back off before polling again.
            await asyncio.sleep(2)
            continue

        # Decode once so the result dict and the log lines carry text rather
        # than a bytes value.
        filename = raw_name.decode() if isinstance(raw_name, bytes) else raw_name

        file_content = await get_file_from_s3(filename)
        if file_content is None:
            print(f"File {filename} could not be downloaded from S3.")
            continue

        rca_result = await process_log(filename, file_content)
        await save_rca_to_db(rca_result)
        try:
            await loop.run_in_executor(None, redis_client.lpush, READY_FOR_RCA_QUEUE, filename)
            print(f"Notified {READY_FOR_RCA_QUEUE} for {filename}")
        except Exception as redis_exc:
            # Notification is best-effort: the RCA result is already saved.
            print(f"Failed to notify ready-for-rca queue: {redis_exc}")


# HTTP entry point: run the RCA pipeline on a single S3-hosted log file
@app.post("/process-log")
async def process_log_endpoint(request: LogRequest):
    """Fetch ``request.filename`` from S3 and return its RCA result.

    Responds with HTTP 404 when ``get_file_from_s3`` yields ``None``.
    """
    log_name = request.filename
    payload = await get_file_from_s3(log_name)
    if payload is not None:
        return await process_log(log_name, payload)
    raise HTTPException(status_code=404, detail=f"File {request.filename} not found in S3 bucket.")

if __name__ == "__main__":
    import sys

    # First CLI argument "serve" starts the HTTP API; anything else (or no
    # argument at all) runs the Redis queue worker.
    wants_server = sys.argv[1:2] == ["serve"]
    if not wants_server:
        asyncio.run(main())
    else:
        uvicorn.run("app:app", host="0.0.0.0", port=8000, reload=True)