Jobsforce commited on
Commit
d17c2ae
·
verified ·
1 Parent(s): 785e0b8

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +51 -60
app.py CHANGED
@@ -1,92 +1,83 @@
1
- from fastapi import FastAPI, Request, BackgroundTasks
2
- from fastapi.responses import StreamingResponse, JSONResponse
3
  from transformers import pipeline
4
  from nltk.tokenize import sent_tokenize
5
- import asyncio
6
- import uuid
7
- import logging
8
  import json
9
- from typing import Dict
10
- from collections import deque
 
 
11
 
12
  app = FastAPI()
13
-
14
- # Logging
15
- logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
16
  logger = logging.getLogger(__name__)
17
 
18
- # Load pipeline once at startup
19
  classifier = pipeline("text-classification", model="priyabrat/AI.or.Human.text.classification")
20
- sessions: Dict[str, Dict] = {}
21
- queues: Dict[str, deque] = {}
 
 
 
 
22
 
23
  @app.get("/health")
24
- async def health():
25
  return {"status": "healthy"}
26
 
27
  @app.post("/start-session")
28
- async def start_session(request: Request, background_tasks: BackgroundTasks):
29
  data = await request.json()
30
  user_id = data.get("user_id")
31
  text = data.get("text")
32
 
33
  if not user_id or not text:
34
- return JSONResponse(content={"error": "user_id and text are required"}, status_code=400)
35
 
36
  if user_id in sessions:
37
- return JSONResponse(content={"message": "Session already exists", "status": sessions[user_id]["status"]}, status_code=409)
38
-
39
- sessions[user_id] = {"status": "processing"}
40
- queues[user_id] = deque()
41
-
42
- background_tasks.add_task(process_text, user_id, text)
43
- return {"message": "Session started", "status": "processing"}
44
-
45
- async def process_text(user_id: str, text: str):
46
- try:
47
- lines = sent_tokenize(text) if '\n' not in text else [l.strip() for l in text.strip().split('\n') if l.strip()]
48
- for idx, line in enumerate(lines, 1):
49
- result = classifier(line)[0]
50
- label = result['label']
51
- confidence = round(result['score'] * 100, 2)
52
- payload = {
53
- "line": idx,
54
- "text": line,
55
- "label": "AI-generated" if label == "LABEL_0" else "Human-written",
56
- "confidence": confidence
57
- }
58
- queues[user_id].append(f"data: {json.dumps(payload)}\n\n")
59
- await asyncio.sleep(0.1)
60
-
61
- queues[user_id].append("event: done\ndata: Session complete\n\n")
62
- except Exception as e:
63
- logger.error(f"Error: {e}")
64
- queues[user_id].append(f"event: error\ndata: {str(e)}\n\n")
65
- finally:
66
- sessions[user_id]['status'] = "done"
67
- await asyncio.sleep(1)
68
- sessions.pop(user_id, None)
69
- queues.pop(user_id, None)
70
 
71
  @app.get("/stream/{user_id}")
72
  async def stream(user_id: str):
73
  if user_id not in sessions:
74
- return JSONResponse(content={"error": "No active session"}, status_code=404)
75
 
76
- async def event_stream():
77
  while True:
78
- if queues[user_id]:
79
- yield queues[user_id].popleft()
80
- elif sessions[user_id]['status'] == 'done':
 
 
 
 
81
  break
82
- await asyncio.sleep(0.1)
83
 
84
  return StreamingResponse(event_stream(), media_type="text/event-stream")
85
 
86
  @app.get("/status/{user_id}")
87
- async def status(user_id: str):
88
  return {"status": sessions.get(user_id, {}).get("status", "no_session")}
89
-
90
- @app.get("/")
91
- async def index():
92
- return {"message": "Server is running"}
 
1
+ from fastapi import FastAPI, Request
2
+ from fastapi.responses import JSONResponse, StreamingResponse, PlainTextResponse
3
  from transformers import pipeline
4
  from nltk.tokenize import sent_tokenize
5
+ import time
 
 
6
  import json
7
+ import threading
8
+ import queue
9
+ import logging
10
+ import os
11
 
12
  app = FastAPI()
13
+ logging.basicConfig(level=logging.INFO)
 
 
14
  logger = logging.getLogger(__name__)
15
 
 
16
  classifier = pipeline("text-classification", model="priyabrat/AI.or.Human.text.classification")
17
+ sessions = {}
18
+ queues = {}
19
+
20
+ @app.get("/")
21
+ async def index():
22
+ return PlainTextResponse("✅ FastAPI server running on Hugging Face Spaces!")
23
 
24
  @app.get("/health")
25
+ async def health_check():
26
  return {"status": "healthy"}
27
 
28
  @app.post("/start-session")
29
+ async def start_session(request: Request):
30
  data = await request.json()
31
  user_id = data.get("user_id")
32
  text = data.get("text")
33
 
34
  if not user_id or not text:
35
+ return JSONResponse({"error": "user_id and text required"}, status_code=400)
36
 
37
  if user_id in sessions:
38
+ return JSONResponse({"message": "Session exists", "status": sessions[user_id]["status"]}, status_code=409)
39
+
40
+ sessions[user_id] = {"status": "pending"}
41
+ queues[user_id] = queue.Queue()
42
+
43
+ def worker():
44
+ try:
45
+ sessions[user_id]["status"] = "processing"
46
+ lines = sent_tokenize(text) if '\n' not in text else [l.strip() for l in text.split('\n') if l.strip()]
47
+ for i, line in enumerate(lines, 1):
48
+ result = classifier(line)[0]
49
+ queues[user_id].put(f"data: {json.dumps({'line': i, 'text': line, 'label': result['label'], 'confidence': round(result['score']*100,2)})}\n\n")
50
+ time.sleep(0.1)
51
+ queues[user_id].put("event: done\ndata: Session complete\n\n")
52
+ except Exception as e:
53
+ queues[user_id].put(f"event: error\ndata: {str(e)}\n\n")
54
+ finally:
55
+ sessions[user_id]["status"] = "done"
56
+ time.sleep(1)
57
+ del sessions[user_id]
58
+ del queues[user_id]
59
+
60
+ threading.Thread(target=worker, daemon=True).start()
61
+ return {"message": "Session started", "status": "pending"}
 
 
 
 
 
 
 
 
 
62
 
63
  @app.get("/stream/{user_id}")
64
  async def stream(user_id: str):
65
  if user_id not in sessions:
66
+ return JSONResponse({"error": "No active session"}, status_code=404)
67
 
68
+ def event_stream():
69
  while True:
70
+ try:
71
+ msg = queues[user_id].get(timeout=30)
72
+ yield msg
73
+ if "event: done" in msg or "event: error" in msg:
74
+ break
75
+ except queue.Empty:
76
+ yield "event: timeout\ndata: No activity\n\n"
77
  break
 
78
 
79
  return StreamingResponse(event_stream(), media_type="text/event-stream")
80
 
81
  @app.get("/status/{user_id}")
82
+ async def session_status(user_id: str):
83
  return {"status": sessions.get(user_id, {}).get("status", "no_session")}