MukeshKapoor25 commited on
Commit
480ece8
·
1 Parent(s): a9a6245

Implement Redis integration and async database operations for anomaly results

Browse files
Files changed (2) hide show
  1. app.py +37 -8
  2. sql.py +89 -0
app.py CHANGED
@@ -1,30 +1,59 @@
 
1
  import time
2
  import os
3
- import requests
 
 
 
4
 
 
5
# Input and output directories for the log-processing worker.
UPLOAD_DIR = "/data/uploads"
PROCESSED_DIR = "/data/processed"
# Ensure the output directory exists before the worker starts.
os.makedirs(PROCESSED_DIR, exist_ok=True)
8
 
 
 
 
 
 
 
 
 
 
9
def process_log(file_path):
    """Placeholder anomaly detection; swap in the real pipeline here.

    For example, call detect_anomalies_and_explain on the file contents.
    """
    name = os.path.basename(file_path)
    return {"filename": name, "anomaly": True, "details": "Example anomaly detected."}
13
 
 
 
14
def save_rca_to_db(rca_result):
    """Ship an RCA result to the RCA service.

    Stand-in for real persistence (e.g. the RCA API or a direct DB insert).
    """
    rca_endpoint = "http://rca_api:8000/rca/"
    requests.post(rca_endpoint, json=rca_result)
 
 
 
 
17
 
18
def main():
    """Poll UPLOAD_DIR every 5 seconds and process each file exactly once.

    NOTE: processed filenames are tracked only in memory, so the set grows
    for the worker's lifetime and resets on restart.
    """
    processed_files = set()
    while True:
        for filename in os.listdir(UPLOAD_DIR):
            file_path = os.path.join(UPLOAD_DIR, filename)
            is_new = filename not in processed_files
            if is_new and os.path.isfile(file_path):
                save_rca_to_db(process_log(file_path))
                processed_files.add(filename)
        time.sleep(5)
 
 
 
 
 
 
 
 
28
 
29
# Script entry point: start the upload-polling worker.
if __name__ == "__main__":
    main()
 
1
+
2
  import time
3
  import os
4
+ import redis
5
+
6
+ import asyncio
7
+ from sql import insert_rca_result, connect_to_database, disconnect_from_database
8
 
9
# Directories
UPLOAD_DIR = "/data/uploads"
PROCESSED_DIR = "/data/processed"
os.makedirs(PROCESSED_DIR, exist_ok=True)

# Redis configuration (must be set in environment variables)
# NOTE(review): os.environ[...] raises a bare KeyError at import time when
# any of the first three variables is missing — intentional fail-fast, but
# the resulting error message is uninformative.
REDIS_HOST = os.environ["REDIS_HOST"]
REDIS_PORT = int(os.environ["REDIS_PORT"])
REDIS_QUEUE = os.environ["REDIS_QUEUE"]
READY_FOR_RCA_QUEUE = os.environ.get("READY_FOR_RCA_QUEUE", "logbert_ready_for_rca")

# Initialize Redis client
# decode_responses=True makes rpop return str (not bytes), which the path
# join in main() relies on.
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, decode_responses=True)
22
+
23
def process_log(file_path):
    """Dummy anomaly detection stub.

    Replace with the real pipeline, e.g. a call to
    detect_anomalies_and_explain.
    """
    return {
        "filename": os.path.basename(file_path),
        "anomaly": True,
        "details": "Example anomaly detected.",
    }
27
 
28
+
29
+
30
def save_rca_to_db(rca_result):
    """Persist an anomaly result into the rca_results table via sql.py.

    Opens the async DB connection, inserts the record, and ALWAYS
    disconnects — the original skipped the disconnect when the insert
    raised, leaking the connection.

    Args:
        rca_result: dict with keys "filename", "anomaly", and optionally
            "details" (see insert_rca_result in sql.py).
    """
    async def _save():
        await connect_to_database()
        try:
            await insert_rca_result(rca_result)
        finally:
            # Guarantee cleanup even if the insert fails.
            await disconnect_from_database()
    # Run the async save on a fresh event loop per call.
    asyncio.run(_save())
37
 
38
def main():
    """Consume uploaded-log filenames from the Redis queue and process them.

    Pops one filename at a time from REDIS_QUEUE, runs anomaly detection,
    stores the result, and notifies READY_FOR_RCA_QUEUE.
    """
    while True:
        # rpop is NON-blocking: it returns None immediately when the queue
        # is empty, so we poll with a short sleep below. (The original
        # comment claimed this blocks; switch to brpop if true blocking is
        # wanted.)
        filename = redis_client.rpop(REDIS_QUEUE)
        if filename:
            file_path = os.path.join(UPLOAD_DIR, filename)
            if os.path.isfile(file_path):
                rca_result = process_log(file_path)
                save_rca_to_db(rca_result)
                # Notify ready-for-rca queue; failure here is logged but
                # does not undo the DB insert (best-effort notification).
                try:
                    redis_client.lpush(READY_FOR_RCA_QUEUE, filename)
                    # Fix: log the actual filename instead of a placeholder.
                    print(f"Notified {READY_FOR_RCA_QUEUE} for {filename}")
                except Exception as redis_exc:
                    print(f"Failed to notify ready-for-rca queue: {redis_exc}")
            else:
                print(f"File not found: {file_path}")
        else:
            # Queue empty — back off briefly before polling again.
            time.sleep(2)
57
 
58
# Script entry point: start the Redis-queue worker loop.
if __name__ == "__main__":
    main()
sql.py ADDED
@@ -0,0 +1,89 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import logging
3
+ from dotenv import load_dotenv
4
+ import databases
5
+ import sqlalchemy
6
+ from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
7
+ from sqlalchemy.orm import sessionmaker
8
+
9
# Load environment variables from .env file
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Load the DATABASE_URL from environment variables

# Explicitly load the .env file from the current project directory
# NOTE(review): load_dotenv() was already called above with no path; since
# load_dotenv does not override variables that are already set, a .env in
# the current working directory takes precedence over this project-level
# one — confirm that ordering is intended.
ENV_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../.env")
load_dotenv(dotenv_path=ENV_PATH)

DATABASE_URL = os.getenv("DATABASE_URI")

# Fail fast at import time if the connection string is absent.
if not DATABASE_URL:
    logger.error("DATABASE_URI not found in environment variables.")
    raise ValueError("DATABASE_URI not found in environment variables. Ensure it is set in the .env file.")


# Initialize the database connection and metadata
# NOTE(review): databases.Database(...) only constructs the client object —
# no network connection is opened until connect_to_database() runs, so the
# "initialized successfully" log below reports construction, not
# connectivity.
try:
    database = databases.Database(DATABASE_URL)
    metadata = sqlalchemy.MetaData()
    logger.info("Database connection initialized successfully.")
except Exception as e:
    logger.error("Failed to initialize database connection: %s", e)
    raise

# Define rca_results table (schema mirrored by insert_rca_result below).
rca_results = sqlalchemy.Table(
    "rca_results",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True, autoincrement=True),
    sqlalchemy.Column("filename", sqlalchemy.String, nullable=False),
    sqlalchemy.Column("anomaly", sqlalchemy.Boolean, nullable=False),
    sqlalchemy.Column("details", sqlalchemy.Text, nullable=True),
)
50
+
51
# Insert anomaly result into rca_results table
async def insert_rca_result(rca_result):
    """Insert one anomaly record into rca_results.

    Expects keys "filename" and "anomaly"; "details" is optional and
    defaults to the empty string.
    """
    row = {
        "filename": rca_result["filename"],
        "anomaly": rca_result["anomaly"],
        "details": rca_result.get("details", ""),
    }
    await database.execute(rca_results.insert().values(**row))
59
+
60
+ # Create async SQLAlchemy engine and session for PostgreSQL
61
+ ASYNC_DATABASE_URL = DATABASE_URL.replace('postgresql://', 'postgresql+asyncpg://')
62
+ engine = create_async_engine(ASYNC_DATABASE_URL, echo=False, future=True)
63
+ async_session = sessionmaker(
64
+ engine, expire_on_commit=False, class_=AsyncSession
65
+ )
66
+
67
# Helper function to initialize database
async def connect_to_database():
    """Open the shared `database` connection at application startup.

    Logs success; on failure logs the error and re-raises so the caller
    can abort.
    """
    try:
        await database.connect()
        logger.info("Successfully connected to the database.")
    except Exception as exc:
        logger.error("Error connecting to the database: %s", exc)
        raise
78
+
79
# Helper function to disconnect database
async def disconnect_from_database():
    """Close the shared `database` connection at application shutdown.

    Logs success; on failure logs the error and re-raises.
    """
    try:
        await database.disconnect()
        logger.info("Successfully disconnected from the database.")
    except Exception as exc:
        logger.error("Error disconnecting from the database: %s", exc)
        raise