Leonydis137 commited on
Commit
e0f0929
·
verified ·
1 Parent(s): 33bc96e

Upload 10 files

Browse files
diagnostics.py CHANGED
@@ -1,20 +1,24 @@
1
 
2
- import platform
3
- import psutil
4
- import shutil
5
 
6
- def run_diagnostics():
7
- report = {
8
- "platform": platform.platform(),
9
- "cpu_percent": psutil.cpu_percent(),
10
- "memory": psutil.virtual_memory()._asdict(),
11
- "disk": shutil.disk_usage("/"),
12
- "status": "🟢 All systems functional"
13
- }
14
 
15
- if report["memory"]["percent"] > 90:
16
- report["status"] = "🟡 High memory usage"
17
- if report["cpu_percent"] > 85:
18
- report["status"] = "🟡 High CPU load"
 
 
 
 
 
 
19
 
20
- return report
 
 
 
 
 
1
 
2
+ import traceback
3
+ from datetime import datetime
 
4
 
5
+ class DiagnosticEngine:
6
+ def __init__(self):
7
+ self.error_logs = []
 
 
 
 
 
8
 
9
+ def capture_exception(self, e, context=""):
10
+ tb = traceback.format_exc()
11
+ error_entry = {
12
+ "timestamp": datetime.utcnow().isoformat(),
13
+ "error": str(e),
14
+ "traceback": tb,
15
+ "context": context
16
+ }
17
+ self.error_logs.append(error_entry)
18
+ return error_entry
19
 
20
+ def get_logs(self):
21
+ return self.error_logs
22
+
23
+ def last_error(self):
24
+ return self.error_logs[-1] if self.error_logs else None
external_api_integrations.py ADDED
@@ -0,0 +1,35 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ import requests
3
+
4
+ # Define endpoints for demonstration
5
+ API_ENDPOINTS = {
6
+ "weather": "https://api.open-meteo.com/v1/forecast?latitude=52.52&longitude=13.41&current_weather=true",
7
+ "news": "https://newsapi.org/v2/top-headlines?country=us&apiKey=YOUR_NEWS_API_KEY",
8
+ "stocks": "https://finnhub.io/api/v1/quote?symbol=AAPL&token=YOUR_STOCK_API_KEY"
9
+ }
10
+
11
+ def get_weather():
12
+ try:
13
+ response = requests.get(API_ENDPOINTS["weather"])
14
+ return response.json()
15
+ except Exception as e:
16
+ return {"error": str(e)}
17
+
18
+ def get_news():
19
+ try:
20
+ response = requests.get(API_ENDPOINTS["news"])
21
+ return response.json()
22
+ except Exception as e:
23
+ return {"error": str(e)}
24
+
25
+ def get_stock():
26
+ try:
27
+ response = requests.get(API_ENDPOINTS["stocks"])
28
+ return response.json()
29
+ except Exception as e:
30
+ return {"error": str(e)}
31
+
32
+ if __name__ == "__main__":
33
+ print("🌦️ Weather:", get_weather())
34
+ print("🗞️ News:", get_news())
35
+ print("📈 Stock:", get_stock())
model_switcher.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ import os
3
+ from sentence_transformers import SentenceTransformer
4
+
5
+ class ModelManager:
6
+ def __init__(self, default_model="all-MiniLM-L6-v2"):
7
+ self.current_model_name = default_model
8
+ self.model = SentenceTransformer(default_model)
9
+ self.available_models = [
10
+ "all-MiniLM-L6-v2",
11
+ "paraphrase-MiniLM-L3-v2",
12
+ "multi-qa-MiniLM-L6-cos-v1"
13
+ ]
14
+
15
+ def switch_model(self, model_name):
16
+ if model_name not in self.available_models:
17
+ raise ValueError(f"Model '{model_name}' is not in the list of available models.")
18
+ self.current_model_name = model_name
19
+ self.model = SentenceTransformer(model_name)
20
+ return f"Switched to model: {model_name}"
21
+
22
+ def get_current_model(self):
23
+ return self.current_model_name
24
+
25
+ def list_models(self):
26
+ return self.available_models
model_updater.py ADDED
@@ -0,0 +1,38 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ import json
3
+ import os
4
+ import time
5
+ from datetime import datetime
6
+
7
+ MODEL_VERSION_PATH = "model_version.json"
8
+
9
+ def load_version():
10
+ if os.path.exists(MODEL_VERSION_PATH):
11
+ with open(MODEL_VERSION_PATH, "r") as f:
12
+ return json.load(f)
13
+ return {"version": "1.0.0", "last_updated": None}
14
+
15
+ def update_model_version(new_version: str):
16
+ version_info = {
17
+ "version": new_version,
18
+ "last_updated": datetime.utcnow().isoformat()
19
+ }
20
+ with open(MODEL_VERSION_PATH, "w") as f:
21
+ json.dump(version_info, f, indent=2)
22
+ print(f"✅ Model updated to version {new_version}")
23
+
24
+ def auto_update():
25
+ version_data = load_version()
26
+ current_version = version_data["version"]
27
+ print(f"🔍 Current model version: {current_version}")
28
+
29
+ # Simulate version check and update
30
+ major, minor, patch = map(int, current_version.split('.'))
31
+ patch += 1
32
+ new_version = f"{major}.{minor}.{patch}"
33
+ update_model_version(new_version)
34
+
35
+ if __name__ == "__main__":
36
+ while True:
37
+ auto_update()
38
+ time.sleep(3600) # Update every hour
planner_scheduler.py ADDED
@@ -0,0 +1,36 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ import datetime
3
+ from typing import List, Dict, Optional
4
+
5
+ class Task:
6
+ def __init__(self, description: str, deadline: Optional[str] = None):
7
+ self.description = description
8
+ self.deadline = deadline
9
+ self.created_at = datetime.datetime.now()
10
+ self.status = "scheduled"
11
+
12
+ def to_dict(self) -> Dict:
13
+ return {
14
+ "description": self.description,
15
+ "deadline": self.deadline,
16
+ "created_at": self.created_at.isoformat(),
17
+ "status": self.status
18
+ }
19
+
20
+ class PlannerScheduler:
21
+ def __init__(self):
22
+ self.tasks: List[Task] = []
23
+
24
+ def schedule_task(self, description: str, deadline: Optional[str] = None) -> str:
25
+ task = Task(description, deadline)
26
+ self.tasks.append(task)
27
+ return f"Task '{description}' scheduled."
28
+
29
+ def list_tasks(self, status_filter: Optional[str] = None) -> List[Dict]:
30
+ return [t.to_dict() for t in self.tasks if status_filter is None or t.status == status_filter]
31
+
32
+ def update_task_status(self, index: int, new_status: str) -> str:
33
+ if 0 <= index < len(self.tasks):
34
+ self.tasks[index].status = new_status
35
+ return f"Task '{self.tasks[index].description}' marked as {new_status}."
36
+ return "Invalid task index."
prompt_optimizer.py ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ import random
3
+
4
+ class PromptOptimizer:
5
+ def __init__(self):
6
+ self.prompts = [
7
+ "What is your main objective today?",
8
+ "How can I assist with your current task?",
9
+ "What’s the next priority to focus on?",
10
+ ]
11
+ self.performance_data = {}
12
+
13
+ def get_prompt(self):
14
+ return random.choice(self.prompts)
15
+
16
+ def record_feedback(self, prompt, success=True):
17
+ if prompt not in self.performance_data:
18
+ self.performance_data[prompt] = {"success": 0, "fail": 0}
19
+ if success:
20
+ self.performance_data[prompt]["success"] += 1
21
+ else:
22
+ self.performance_data[prompt]["fail"] += 1
23
+
24
+ def optimize_prompts(self):
25
+ # Basic strategy: remove consistently failing prompts
26
+ self.prompts = [
27
+ p for p in self.prompts
28
+ if self.performance_data.get(p, {}).get("fail", 0) < 3
29
+ ]
secure_api_gateway.py ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ from fastapi import FastAPI, Request, Header, HTTPException
3
+
4
+ app = FastAPI()
5
+
6
+ AUTHORIZED_KEYS = {
7
+ "admin-key-123": "admin",
8
+ "dev-key-456": "developer",
9
+ "user-key-789": "user"
10
+ }
11
+
12
+ @app.middleware("http")
13
+ async def verify_api_key(request: Request, call_next):
14
+ api_key = request.headers.get("x-api-key")
15
+ if not api_key or api_key not in AUTHORIZED_KEYS:
16
+ raise HTTPException(status_code=401, detail="Unauthorized")
17
+ response = await call_next(request)
18
+ return response
19
+
20
+ @app.get("/secure-data")
21
+ async def secure_data(x_api_key: str = Header(...)):
22
+ user_role = AUTHORIZED_KEYS.get(x_api_key, "unknown")
23
+ return {"status": "success", "role": user_role, "data": "🔐 confidential data"}
self_assessment.py ADDED
@@ -0,0 +1,36 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ import time
3
+ import json
4
+ from datetime import datetime
5
+
6
+ class SelfAssessmentLogger:
7
+ def __init__(self, file_path="self_assessment_log.json"):
8
+ self.file_path = file_path
9
+ self.log = []
10
+
11
+ def log_assessment(self, module_name, status, details=""):
12
+ entry = {
13
+ "timestamp": datetime.utcnow().isoformat(),
14
+ "module": module_name,
15
+ "status": status,
16
+ "details": details
17
+ }
18
+ self.log.append(entry)
19
+ self._persist()
20
+
21
+ def assess_module(self, module_name, test_func):
22
+ try:
23
+ result = test_func()
24
+ status = "PASS" if result else "FAIL"
25
+ self.log_assessment(module_name, status)
26
+ return status
27
+ except Exception as e:
28
+ self.log_assessment(module_name, "ERROR", str(e))
29
+ return "ERROR"
30
+
31
+ def _persist(self):
32
+ with open(self.file_path, "w") as f:
33
+ json.dump(self.log, f, indent=2)
34
+
35
+ def get_log(self):
36
+ return self.log
self_evaluator.py ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ import time
3
+ import logging
4
+
5
+ logging.basicConfig(level=logging.INFO)
6
+
7
+ class SelfEvaluator:
8
+ def __init__(self):
9
+ self.evaluation_log = []
10
+
11
+ def evaluate(self, task_output: str):
12
+ quality_score = self.heuristic_score(task_output)
13
+ self.evaluation_log.append({
14
+ "output": task_output,
15
+ "score": quality_score,
16
+ "timestamp": time.time()
17
+ })
18
+ logging.info(f"Self-evaluation completed: Score={quality_score}")
19
+ return quality_score
20
+
21
+ def heuristic_score(self, output: str):
22
+ # Simple evaluation heuristic based on length and keyword presence
23
+ if "error" in output.lower():
24
+ return 0.2
25
+ elif len(output) < 20:
26
+ return 0.5
27
+ else:
28
+ return 0.9
29
+
30
+ def get_log(self):
31
+ return self.evaluation_log
user_feedback.py ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+
2
+ from fastapi import FastAPI, Request
3
+ from pydantic import BaseModel
4
+ from typing import List
5
+ import datetime
6
+
7
+ app = FastAPI()
8
+
9
+ feedback_storage = []
10
+
11
+ class FeedbackEntry(BaseModel):
12
+ user_id: str
13
+ feedback_text: str
14
+ rating: int # Scale 1–5
15
+
16
+ @app.post("/submit_feedback")
17
+ async def submit_feedback(entry: FeedbackEntry):
18
+ timestamped_entry = {
19
+ "user_id": entry.user_id,
20
+ "feedback": entry.feedback_text,
21
+ "rating": entry.rating,
22
+ "timestamp": datetime.datetime.utcnow().isoformat()
23
+ }
24
+ feedback_storage.append(timestamped_entry)
25
+ return {"status": "success", "received": timestamped_entry}
26
+
27
+ @app.get("/feedback_log")
28
+ def get_feedback_log() -> List[dict]:
29
+ return feedback_storage