Gradi02 commited on
Commit
704e5bb
·
unverified ·
2 Parent(s): 5870f43e8d43fa

Merge pull request #14 from Tobkubos/backend-setup

Browse files
backend/app/api/routes.py CHANGED
@@ -1,7 +1,11 @@
 
 
1
  import logging
2
  from fastapi import APIRouter, HTTPException, Request, status
 
3
  from slowapi.errors import RateLimitExceeded
4
  from limits import parse
 
5
 
6
  from app.models.schemas import (
7
  AnalysisRequest,
@@ -23,6 +27,7 @@ from app.config_manager import _load_all_configs, save_guild_config
23
  logger = logging.getLogger(__name__)
24
 
25
  router = APIRouter()
 
26
 
27
  @router.get(
28
  "/",
@@ -109,26 +114,8 @@ async def get_discord_guild_config(guild_id: str):
109
  "log_channel_id": guild_config.get("log_channel_id", None)
110
  }
111
 
112
- @router.post(
113
- "/analyze",
114
- response_model=AnalysisResponse,
115
- responses={
116
- 400: {"model": ErrorResponse, "description": "Bad request"},
117
- 408: {"model": ErrorResponse, "description": "Request timeout"},
118
- 415: {"model": ErrorResponse, "description": "Unsupported media type"},
119
- 429: {"model": ErrorResponse, "description": "Too many requests"},
120
- 500: {"model": ErrorResponse, "description": "Internal server error"},
121
- },
122
- tags=["Analysis"],
123
- summary="Analyze content for deepfake detection",
124
- )
125
- async def analyze(request: Request, payload: AnalysisRequest) -> AnalysisResponse:
126
- guild_id = payload.guild_id
127
- limit_item = parse("1/5seconds")
128
-
129
- if not limiter.limiter.hit(limit_item, f"analyze:{guild_id}"):
130
- raise HTTPException(status_code=429, detail="Rate limit exceeded for this guild")
131
-
132
  if isinstance(payload, TextAnalysisRequest):
133
  content_type = "text"
134
  elif isinstance(payload, ImageAnalysisRequest):
@@ -136,28 +123,29 @@ async def analyze(request: Request, payload: AnalysisRequest) -> AnalysisRespons
136
  else:
137
  raise HTTPException(
138
  status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
139
- detail="Unsupported file/content type. Only text and image are currently supported."
140
  )
141
 
142
- settings = get_settings()
143
-
144
  try:
145
  if content_type == "text":
146
  if len(payload.text) > settings.MAX_CONTENT_SIZES["text"]:
147
- raise ValueError(f"Text content exceeds maximum length of {settings.MAX_CONTENT_SIZES['text']} characters")
148
  if len(payload.text) < 50:
149
  raise ValueError("Text content must be at least 50 characters")
150
 
151
- analysis_result = await analyze_text(payload.text, guild_id)
152
 
153
  elif content_type == "image":
154
  image_bytes = await download_file(str(payload.image_url))
155
  if not image_bytes:
156
  raise ValueError("Failed to download image")
157
  if len(image_bytes) > settings.MAX_CONTENT_SIZES["image"]:
158
- raise ValueError(f"Image size exceeds maximum of {settings.MAX_CONTENT_SIZES['image']} bytes")
159
 
160
- analysis_result = await analyze_image(image_bytes, guild_id)
 
 
 
161
 
162
  except ValueError as e:
163
  raise HTTPException(status_code=400, detail=str(e))
@@ -166,10 +154,66 @@ async def analyze(request: Request, payload: AnalysisRequest) -> AnalysisRespons
166
  except DeepfakeDetectionError as e:
167
  raise HTTPException(status_code=e.status_code, detail=e.message)
168
  except Exception as e:
169
- logger.error(f"{content_type.capitalize()} analysis error: {str(e)}", exc_info=True)
170
  raise HTTPException(status_code=500, detail=f"Failed to analyze {content_type}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
171
 
 
 
172
  logger.info(f"{content_type.capitalize()} analysis completed. Result: {analysis_result}")
 
173
  used_model = analysis_result.get("used_model", settings.AVAILABLE_MODELS.get(content_type)[0])
174
 
175
  return AnalysisResponse(
 
1
+ import asyncio
2
+ from collections import defaultdict
3
  import logging
4
  from fastapi import APIRouter, HTTPException, Request, status
5
+ from app.services.queue import get_queue_service
6
  from slowapi.errors import RateLimitExceeded
7
  from limits import parse
8
+ from redis.exceptions import LockError
9
 
10
  from app.models.schemas import (
11
  AnalysisRequest,
 
27
  logger = logging.getLogger(__name__)
28
 
29
  router = APIRouter()
30
+ local_guild_locks = defaultdict(asyncio.Lock)
31
 
32
  @router.get(
33
  "/",
 
114
  "log_channel_id": guild_config.get("log_channel_id", None)
115
  }
116
 
117
+ async def _execute_analysis(payload: AnalysisRequest, guild_id: str, settings) -> dict:
118
+ """Funkcja pomocnicza wykonująca właściwy proces pobierania i analizy."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
119
  if isinstance(payload, TextAnalysisRequest):
120
  content_type = "text"
121
  elif isinstance(payload, ImageAnalysisRequest):
 
123
  else:
124
  raise HTTPException(
125
  status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
126
+ detail="Unsupported file/content type."
127
  )
128
 
 
 
129
  try:
130
  if content_type == "text":
131
  if len(payload.text) > settings.MAX_CONTENT_SIZES["text"]:
132
+ raise ValueError(f"Text content exceeds maximum length.")
133
  if len(payload.text) < 50:
134
  raise ValueError("Text content must be at least 50 characters")
135
 
136
+ result = await analyze_text(payload.text, guild_id)
137
 
138
  elif content_type == "image":
139
  image_bytes = await download_file(str(payload.image_url))
140
  if not image_bytes:
141
  raise ValueError("Failed to download image")
142
  if len(image_bytes) > settings.MAX_CONTENT_SIZES["image"]:
143
+ raise ValueError(f"Image size exceeds maximum.")
144
 
145
+ result = await analyze_image(image_bytes, guild_id)
146
+
147
+ result["content_type"] = content_type
148
+ return result
149
 
150
  except ValueError as e:
151
  raise HTTPException(status_code=400, detail=str(e))
 
154
  except DeepfakeDetectionError as e:
155
  raise HTTPException(status_code=e.status_code, detail=e.message)
156
  except Exception as e:
157
+ logger.error(f"Analysis error: {str(e)}", exc_info=True)
158
  raise HTTPException(status_code=500, detail=f"Failed to analyze {content_type}")
159
+
160
+ @router.post(
161
+ "/analyze",
162
+ response_model=AnalysisResponse,
163
+ responses={
164
+ 400: {"model": ErrorResponse, "description": "Bad request"},
165
+ 408: {"model": ErrorResponse, "description": "Request timeout"},
166
+ 415: {"model": ErrorResponse, "description": "Unsupported media type"},
167
+ 429: {"model": ErrorResponse, "description": "Too many requests"},
168
+ 500: {"model": ErrorResponse, "description": "Internal server error"},
169
+ },
170
+ tags=["Analysis"],
171
+ summary="Analyze content for deepfake detection",
172
+ )
173
+ async def analyze(request: Request, payload: AnalysisRequest) -> AnalysisResponse:
174
+ guild_id = payload.guild_id
175
+ user_id = payload.user_id
176
+
177
+ limit_item = parse("1/5seconds")
178
+ if not limiter.limiter.hit(limit_item, f"analyze:user:{user_id}"):
179
+ logger.warning(f"Użytkownik {user_id} przekroczył limit zapytań (1/5s).")
180
+ raise HTTPException(
181
+ status_code=429,
182
+ detail="Przekroczyłeś limit zapytań. Możesz wykonać tylko 1 analizę na 5 sekund."
183
+ )
184
+
185
+ settings = get_settings()
186
+ queue_service = get_queue_service()
187
+
188
+ # 2. Sprawdzamy, czy Redis jest aktywny w pliku konfiguracyjnym oraz czy klient został zainicjalizowany
189
+ if settings.REDIS_ENABLED and queue_service.redis_client is not None:
190
+ logger.info(f"Używam rozproszonej blokady Redis dla użytkownika {user_id}")
191
+
192
+ # Tworzymy blokadę przy użyciu klienta z QueueService [1.2.6]
193
+ redis_lock = queue_service.redis_client.lock(
194
+ "global_analysis_queue_lock", timeout=60, blocking_timeout=120
195
+ )
196
+ try:
197
+ async with redis_lock:
198
+ analysis_result = await _execute_analysis(payload, guild_id, settings)
199
+ except LockError:
200
+ logger.error(f"Użytkownik {user_id} odrzucony z kolejki Redis z powodu timeoutu.")
201
+ raise HTTPException(
202
+ status_code=503,
203
+ detail="Serwer jest zbyt zajęty (kolejka Redis przepełniona). Spróbuj ponownie za chwilę."
204
+ )
205
+ else:
206
+ # FALLBACK: Jeśli Redis jest wyłączony, aplikacja automatycznie używa kolejki in-memory
207
+ logger.info(f"Redis jest wyłączony. Używam lokalnej blokady in-memory dla gildii {guild_id}")
208
+
209
+ local_lock = local_guild_locks[guild_id]
210
+ async with local_lock:
211
+ analysis_result = await _execute_analysis(payload, guild_id, settings)
212
 
213
+ # 3. Zwrócenie wyniku analizy
214
+ content_type = analysis_result["content_type"]
215
  logger.info(f"{content_type.capitalize()} analysis completed. Result: {analysis_result}")
216
+
217
  used_model = analysis_result.get("used_model", settings.AVAILABLE_MODELS.get(content_type)[0])
218
 
219
  return AnalysisResponse(
backend/app/models/schemas.py CHANGED
@@ -6,6 +6,7 @@ class TextAnalysisRequest(BaseModel):
6
  content_type: Literal["text"]
7
  text: str = Field(..., description="Text content to analyze for deepfake detection")
8
  guild_id: str = Field(..., description="ID serwera Discord, z którego pochodzi żądanie")
 
9
 
10
  class Config:
11
  json_schema_extra = {
@@ -20,6 +21,7 @@ class ImageAnalysisRequest(BaseModel):
20
  content_type: Literal["image"]
21
  image_url: HttpUrl = Field(..., description="URL of the image to analyze")
22
  guild_id: str = Field(..., description="ID serwera Discord, z którego pochodzi żądanie")
 
23
 
24
  class Config:
25
  json_schema_extra = {
 
6
  content_type: Literal["text"]
7
  text: str = Field(..., description="Text content to analyze for deepfake detection")
8
  guild_id: str = Field(..., description="ID serwera Discord, z którego pochodzi żądanie")
9
+ user_id: str = Field(..., description="ID użytkownika Discord, który wywołał analizę")
10
 
11
  class Config:
12
  json_schema_extra = {
 
21
  content_type: Literal["image"]
22
  image_url: HttpUrl = Field(..., description="URL of the image to analyze")
23
  guild_id: str = Field(..., description="ID serwera Discord, z którego pochodzi żądanie")
24
+ user_id: str = Field(..., description="ID użytkownika Discord, który wywołał analizę")
25
 
26
  class Config:
27
  json_schema_extra = {
backend/app/services/queue.py CHANGED
@@ -1,6 +1,7 @@
1
  import logging
2
  from typing import Optional, Any, Dict
3
  import json
 
4
 
5
  from app.core.config import get_settings
6
 
@@ -17,17 +18,23 @@ class QueueService:
17
  def __init__(self):
18
  """Initialize the queue service."""
19
  self.settings = get_settings()
20
- self.redis_client = None
21
 
22
  if self.settings.REDIS_ENABLED:
23
  self._initialize_redis()
24
 
25
  def _initialize_redis(self):
26
- """Initialize Redis connection (future implementation)."""
27
- # This will be implemented when Redis support is added
28
- logger.info(
29
- f"Redis queue service initialized: {self.settings.REDIS_URL}"
30
- )
 
 
 
 
 
 
31
 
32
  async def enqueue_analysis(
33
  self,
@@ -36,15 +43,7 @@ class QueueService:
36
  task_id: str,
37
  ) -> bool:
38
  """
39
- Enqueue an analysis task.
40
-
41
- Args:
42
- file_url: URL of the file to analyze
43
- model: Detector model to use
44
- task_id: Unique task identifier
45
-
46
- Returns:
47
- True if successful, False otherwise
48
  """
49
  task_data = {
50
  "task_id": task_id,
@@ -54,33 +53,24 @@ class QueueService:
54
 
55
  logger.info(f"Enqueuing analysis task: {task_id}")
56
 
57
- if self.settings.REDIS_ENABLED:
58
- # Future: Push to Redis queue
59
- # await self.redis_client.lpush(
60
- # self.settings.REDIS_QUEUE_NAME,
61
- # json.dumps(task_data)
62
- # )
63
- pass
64
 
65
- return True
66
 
67
  async def get_task_result(self, task_id: str) -> Optional[Dict[str, Any]]:
68
  """
69
  Get analysis result for a task.
70
-
71
- Args:
72
- task_id: Task identifier
73
-
74
- Returns:
75
- Analysis result or None if not found
76
  """
77
  logger.info(f"Retrieving result for task: {task_id}")
78
 
79
- if self.settings.REDIS_ENABLED:
80
- # Future: Get from Redis
81
- # result = await self.redis_client.get(f"result:{task_id}")
82
- # return json.loads(result) if result else None
83
- pass
84
 
85
  return None
86
 
 
1
  import logging
2
  from typing import Optional, Any, Dict
3
  import json
4
+ import redis.asyncio as redis
5
 
6
  from app.core.config import get_settings
7
 
 
18
  def __init__(self):
19
  """Initialize the queue service."""
20
  self.settings = get_settings()
21
+ self.redis_client = Optional[redis.Redis] = None
22
 
23
  if self.settings.REDIS_ENABLED:
24
  self._initialize_redis()
25
 
26
  def _initialize_redis(self):
27
+
28
+ try:
29
+ self.redis_client = redis.from_url(
30
+ self.settings.REDIS_URL, decode_responses=True
31
+ )
32
+ logger.info(
33
+ f"Redis queue service initialized successfully: {self.settings.REDIS_URL}"
34
+ )
35
+ except Exception as e:
36
+ logger.error(f"Failed to initialize Redis client: {e}")
37
+ self.redis_client = None
38
 
39
  async def enqueue_analysis(
40
  self,
 
43
  task_id: str,
44
  ) -> bool:
45
  """
46
+ Enqueue an analysis task (future background worker implementation).
 
 
 
 
 
 
 
 
47
  """
48
  task_data = {
49
  "task_id": task_id,
 
53
 
54
  logger.info(f"Enqueuing analysis task: {task_id}")
55
 
56
+ if self.settings.REDIS_ENABLED and self.redis_client:
57
+ await self.redis_client.lpush(
58
+ self.settings.REDIS_QUEUE_NAME,
59
+ json.dumps(task_data)
60
+ )
61
+ return True
 
62
 
63
+ return False
64
 
65
  async def get_task_result(self, task_id: str) -> Optional[Dict[str, Any]]:
66
  """
67
  Get analysis result for a task.
 
 
 
 
 
 
68
  """
69
  logger.info(f"Retrieving result for task: {task_id}")
70
 
71
+ if self.settings.REDIS_ENABLED and self.redis_client:
72
+ result = await self.redis_client.get(f"result:{task_id}")
73
+ return json.loads(result) if result else None
 
 
74
 
75
  return None
76
 
backend/requirements.txt CHANGED
@@ -12,4 +12,5 @@ protobuf
12
  Pillow
13
  slowapi
14
  pytest==7.4.3
15
- pytest-asyncio==0.21.1
 
 
12
  Pillow
13
  slowapi
14
  pytest==7.4.3
15
+ pytest-asyncio==0.21.1
16
+ redis
index.js CHANGED
@@ -233,6 +233,7 @@ async function handleAnalysis(interaction, userContent, targetMessage = null, ex
233
  try {
234
  const { type, payload } = preparePayload(userContent, explicitContentType);
235
  payload.guild_id = interaction.guildId;
 
236
 
237
  console.log(`Wysyłanie zapytania typu: ${type} do API z modelem: ${payload.model || "domyślny"}...`);
238
 
 
233
  try {
234
  const { type, payload } = preparePayload(userContent, explicitContentType);
235
  payload.guild_id = interaction.guildId;
236
+ payload.user_id = interaction.user.id;
237
 
238
  console.log(`Wysyłanie zapytania typu: ${type} do API z modelem: ${payload.model || "domyślny"}...`);
239