Kevinger committed
Commit 68ed19a · 1 Parent(s): 55c59f1

Enhance EndpointHandler with model optimizations and caching; add .gitignore for virtual environment

Files changed (2):
  1. .gitignore +3 -0
  2. handler.py +117 -33
.gitignore ADDED
@@ -0,0 +1,3 @@
+
+ .venv/
+
handler.py CHANGED
@@ -1,45 +1,129 @@
  import os
+ import logging
+ import torch
 
- from typing import Any, Dict, List
+ from typing import Any, Dict, List, Union
  from flair.data import Sentence
  from flair.models import SequenceTagger
 
+ # Configure logging
+ logging.basicConfig(level=logging.INFO)
+ logger = logging.getLogger(__name__)
+
 
  class EndpointHandler:
      def __init__(self, path: str):
-         self.tagger = SequenceTagger.load(os.path.join(path, "pytorch_model.bin"))
-
-     def __call__(self, data: Dict[str, Any]) -> List[Dict[str, Any]]:
-         """
-         Args:
-             inputs (:obj:`str`):
-                 a string containing some text
-         Return:
-             A :obj:`list`:. The object returned should be like [{"entity_group": "XXX", "word": "some word", "start": 3, "end": 6, "score": 0.82}] containing :
-             - "entity_group": A string representing what the entity is.
-             - "word": A substring of the original string that was detected as an entity.
-             - "start": the offset within `input` leading to `answer`. context[start:stop] == word
-             - "end": the ending offset within `input` leading to `answer`. context[start:stop] === word
-             - "score": A score between 0 and 1 describing how confident the model is for this entity.
-         """
-         inputs = data.pop("inputs", data)
-         sentence: Sentence = Sentence(inputs)
-
-         # Also show scores for recognized NEs
-         self.tagger.predict(sentence, label_name="predicted")
+         # Log initialization
+         logger.info(f"Initializing Flair endpoint handler from {path}")
+
+         # Load model with performance optimizations
+         model_path = os.path.join(path, "pytorch_model.bin")
+
+         # Check if CUDA is available and enable if possible
+         use_cuda = torch.cuda.is_available()
+         device = torch.device("cuda" if use_cuda else "cpu")
+         logger.info(f"Using device: {device}")
+
+         # Load the model with optimizations
+         self.tagger = SequenceTagger.load(model_path)
+         self.tagger.to(device)
+
+         # Enable model evaluation mode for better inference performance
+         self.tagger.eval()
+
+         # Cache for commonly requested inputs
+         self.cache = {}
+         self.cache_size_limit = 1000  # Adjust based on memory constraints
+
+         logger.info("Model successfully loaded and ready for inference")
 
+     def preprocess(self, text: str) -> Sentence:
+         # Create a sentence with optimized tokenization
+         return Sentence(text)
+
+     def predict_batch(self, sentences: List[Sentence]) -> None:
+         with torch.no_grad():  # Disable gradient calculation for inference
+             self.tagger.predict(sentences, label_name="predicted", mini_batch_size=32)
+
+     def postprocess(self, sentence: Sentence) -> List[Dict[str, Any]]:
          entities = []
-         for span in sentence.get_spans("predicted"):
-             if len(span.tokens) == 0:
-                 continue
-             current_entity = {
-                 "entity_group": span.tag,
-                 "word": span.text,
-                 "start": span.tokens[0].start_position,
-                 "end": span.tokens[-1].end_position,
-                 "score": span.score,
-             }
-
-             entities.append(current_entity)
+
+         try:
+             for span in sentence.get_spans("predicted"):
+                 if len(span.tokens) == 0:
+                     continue
+
+                 current_entity = {
+                     "entity_group": span.tag,
+                     "word": span.text,
+                     "start": span.tokens[0].start_position,
+                     "end": span.tokens[-1].end_position,
+                     "score": float(span.score),  # Ensure score is serializable
+                 }
+                 entities.append(current_entity)
+         except Exception as e:
+             logger.error(f"Error in postprocessing: {str(e)}")
 
          return entities
+
+     def __call__(
+         self, data: Union[Dict[str, Any], List[Dict[str, Any]]]
+     ) -> Union[List[Dict[str, Any]], List[List[Dict[str, Any]]]]:
+         # Handle both single input and batch input cases
+         is_batch_input = isinstance(data, list)
+
+         if not is_batch_input:
+             # Convert single input to batch format temporarily
+             data = [data]
+
+         # Extract inputs from each item in the batch
+         batch_inputs = []
+         for item in data:
+             text = item.pop("inputs", item) if isinstance(item, dict) else item
+
+             # Validate input
+             if not isinstance(text, str):
+                 text = str(text)
+
+             # Check cache for this input
+             if text in self.cache:
+                 batch_inputs.append((text, True))
+             else:
+                 batch_inputs.append((text, False))
+
+         # Process non-cached inputs
+         sentences_to_process = []
+         for text, is_cached in batch_inputs:
+             if not is_cached:
+                 sentences_to_process.append(self.preprocess(text))
+
+         # Batch process sentences if any need processing
+         if sentences_to_process:
+             self.predict_batch(sentences_to_process)
+
+         # Build results, including from cache
+         results = []
+         sentence_idx = 0
+
+         for text, is_cached in batch_inputs:
+             if is_cached:
+                 # Get from cache
+                 result = self.cache[text]
+             else:
+                 # Process the sentence and cache result
+                 sentence = sentences_to_process[sentence_idx]
+                 result = self.postprocess(sentence)
+
+                 # Update cache if not too large
+                 if len(self.cache) < self.cache_size_limit:
+                     self.cache[text] = result
+
+                 sentence_idx += 1
+
+             results.append(result)
+
+         # Return single result if input was single
+         if not is_batch_input:
+             return results[0]
+
+         return results
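
For context, a minimal local smoke test of the updated handler could look like the sketch below. The model directory path "./model" is hypothetical (it only needs to contain pytorch_model.bin), and the inputs are illustrative:

# Hypothetical local smoke test for the updated EndpointHandler.
from handler import EndpointHandler

handler = EndpointHandler(path="./model")  # illustrative path to the model directory

# Single request: a dict with an "inputs" key returns one list of entity dicts.
single = handler({"inputs": "George Washington went to Washington."})
print(single)

# Batch request: a list of dicts returns one list of entity dicts per item.
# A text already seen above is served from self.cache rather than re-predicted.
batch = handler([
    {"inputs": "Berlin is the capital of Germany."},
    {"inputs": "George Washington went to Washington."},
])
print(len(batch))  # 2

One design note on the caching added here: the cache is a plain dict that simply stops growing once cache_size_limit entries are stored, with no eviction, so a long-running endpoint with diverse inputs fills it once and never refreshes it. An LRU policy (e.g. via collections.OrderedDict) would be a natural follow-up if cache hit rates matter.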