Alibrown committed on
Commit
881cf96
·
verified ·
1 Parent(s): 770c01d

Create adi.py

Browse files
Files changed (1) hide show
  1. app/adi.py +496 -0
app/adi.py ADDED
@@ -0,0 +1,496 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # =====================================================================================================
2
+ # Anti-Dump Index (ADI) - FIXED VERSION
3
+ # A mathematical framework for evaluating and filtering low-quality, unproductive text inputs.
4
+ #
5
+ # Copyright 2008 - 2026 S. Volkan Kücükbudak
6
+ # NOTE: This file contains the core logic for calculating the ADI. It is not an application itself.
7
+ # It serves as a library to be integrated into other tools.
8
+ #
9
+ # IF YOU USE THIS CODE, PLEASE READ THE LICENSE FILE.
10
+ # Do not steal free code. Respecting developers' credits ensures that projects like this remain open-source.
11
+ # =====================================================================================================
12
+ # https://github.com/VolkanSah/Anti-Dump-Index
13
+ # =====================================================================================================
14
+ # QUICK USAGE EXAMPLE
15
+ # This section demonstrates how to initialize the analyzer and run it on sample texts.
16
+ # =====================================================================================================
17
+ #
18
+ # analyzer = DumpindexAnalyzer()
19
+ #
20
+ # test_inputs = [
21
+ # "Pls fix my code. Urgent!!!",
22
+ # """I'm trying to implement a login function in Python.
23
+ # When calling auth.login(), I get a TypeError.
24
+ # Here's my code:
25
+ # ```python
26
+ # def login(username, password):
27
+ # return auth.login(username)
28
+ # ```
29
+ # I'm using Python 3.8 and the auth library version 2.1."""
30
+ # ]
31
+ #
32
+ # for input_text in test_inputs:
33
+ # result = analyzer.analyze_input(input_text)
34
+ # print("-" * 50)
35
+ # print(f"Analysis for: {input_text[:50]}...")
36
+ # print(f"ADI: {result['adi']}")
37
+ # print(f"Decision: {result['decision']}")
38
+ # print("Recommendations:")
39
+ # for rec in result['recommendations']:
40
+ # print(f"- {rec}")
41
+ # print("\nMetrics:", result['metrics'])
42
+ # print("-" * 50)
43
+ #
44
+ # =====================================================================================================
45
+ # END OF EXAMPLE
46
+ # =====================================================================================================
47
+
48
import hashlib
import json
import re
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict, Tuple, Optional

import numpy as np
54
+
55
@dataclass
class InputMetrics:
    """Raw metric scores extracted from a single input text.

    Built by DumpindexAnalyzer.analyze_input() from the individual
    calculate_* methods and consumed by DumpindexAnalyzer.calculate_adi().
    """

    noise: float                     # noise words / total words (calculate_noise)
    effort: float                    # structural effort score, capped at 5 (calculate_effort)
    context: float                   # background/context score, capped at 5 (calculate_context)
    details: float                   # technical-detail score, capped at 5 (calculate_details)
    bonus_factors: float             # formatting bonus: code blocks, links, lists (calculate_bonus_factors)
    penalty_factors: float           # caps/punctuation/too-short penalties, capped at 5 (calculate_penalty_factors)
    repetition_penalty: float = 0.0  # keyword-stuffing penalty, capped at 3 (calculate_repetition_penalty)
64
+
65
class DumpindexAnalyzer:
    """
    Anti-Dump Index (ADI) analyzer.

    Extracts quality metrics (noise, effort, context, technical detail,
    formatting bonuses and penalties) from a text input and combines them
    into a single ADI score via calculate_adi().  Negative scores indicate
    high-quality input; scores above 1 are treated as low-effort "dump"
    input (see _make_decision()).
    """

    def __init__(self, weights: Optional[Dict[str, float]] = None,
                 enable_logging: bool = False):
        """
        Initialize the ADI Analyzer.

        Args:
            weights: Custom weight configuration for your use case
                (keys: 'noise', 'effort', 'context', 'details', 'bonus',
                'penalty').  Defaults are used when omitted.
            enable_logging: If True, logs all analyses to adi_logs.jsonl
                for later weight optimization.
        """
        self.weights = weights or {
            'noise': 1.0,
            'effort': 2.0,
            'context': 1.5,
            'details': 1.5,
            'bonus': 0.5,
            'penalty': 1.0
        }

        self.enable_logging = enable_logging
        self.log_file = Path('adi_logs.jsonl')

        # Pattern definitions for metric extraction.
        # !!!! Only demo examples! In production you need your own or get data from vectors !!!!
        # FIX: '!!+' and '??+' previously sat inside \b...\b, which can never
        # match trailing punctuation ('!' and '?' are not word characters, so
        # the closing \b fails at end of string).  The punctuation runs are now
        # alternated outside the word-bounded group.
        self.noise_patterns = {
            'urgency': r'\b(?:urgent|asap|emergency)\b|!{2,}|\?{2,}',
            'informal': r'\b(?:pls|plz|thx|omg|wtf)\b',
            'vague': r'\b(?:something|somehow|maybe|probably)\b'
        }

        self.detail_patterns = {
            'code_elements': r'\b(?:function|class|method|variable|array|object|def|return)\b',
            'technical_terms': r'\b(?:error|exception|bug|issue|crash|fail|traceback|stack)\b',
            'specifics': r'[a-zA-Z_][a-zA-Z0-9_]*\.[a-zA-Z_][a-zA-Z0-9_]*'
        }

        self.context_indicators = {
            'background': r'\b(?:because|since|as|when|while)\b',
            'environment': r'\b(?:using|version|environment|platform|system)\b',
            'goal': r'\b(?:trying to|want to|need to|goal is|attempting to)\b'
        }

    def _has_negation_before(self, text: str, match_pos: int, window_size: int = 50) -> bool:
        """
        Check whether a negation word appears in a window before match_pos.

        Prevents false positives like 'I have no idea when this started'
        counting as context.

        Args:
            text: The full input text.
            match_pos: Position of the matched pattern.
            window_size: Number of characters to look back (default: 50).

        Returns:
            True if a negation word is found in the window, False otherwise.
        """
        window_start = max(0, match_pos - window_size)
        window = text[window_start:match_pos].lower()
        return bool(re.search(r'\b(no|not|never|without|dont|don\'t|doesnt|doesn\'t)\b', window))

    def calculate_repetition_penalty(self, text: str) -> float:
        """
        Penalize keyword stuffing and repetitive wording.

        Combines the non-unique-word ratio with the relative frequency of
        the single most repeated content word (length > 3 only).

        Returns:
            Penalty score in [0, 3]; higher means more repetition.
        """
        words = text.lower().split()
        if not words:
            return 0.0

        # Share of distinct words: 1.0 means no word repeats at all.
        unique_ratio = len(set(words)) / len(words)

        # Count repetitions of content words only; short fillers like
        # 'the' / 'and' are ignored.
        word_counts = {}
        for word in words:
            if len(word) > 3:
                word_counts[word] = word_counts.get(word, 0) + 1

        max_repetition = max(word_counts.values()) if word_counts else 1
        repetition_factor = min(max_repetition / len(words), 0.5)

        penalty = (1 - unique_ratio) * 2 + repetition_factor * 2
        return min(penalty, 3.0)

    def calculate_noise(self, text: str) -> Tuple[float, Dict]:
        """
        Measure the share of noise (urgency markers, informal slang, vague
        filler) in the input.

        Returns:
            Tuple of (noise_count / total_words, {category: matched strings}).
        """
        noise_count = 0
        noise_details = {}

        for category, pattern in self.noise_patterns.items():
            matches = re.findall(pattern, text.lower())
            noise_count += len(matches)
            noise_details[category] = matches

        total_words = len(text.split())
        # max(..., 1) guards against division by zero on empty input.
        return (noise_count / max(total_words, 1), noise_details)

    def calculate_effort(self, text: str) -> float:
        """
        Assess the effort invested in the input's structure.

        Rewards multiple sentences, a sensible average sentence length,
        formatting (code fences, bold markers, paragraph breaks) and
        punctuation.  Empty / sentence-free input scores 0.

        Returns:
            Effort score in [0, 5].
        """
        sentences = [s.strip() for s in re.split(r'[.!?]+', text) if s.strip()]
        if not sentences:
            return 0.0

        avg_sentence_length = np.mean([len(s.split()) for s in sentences])
        has_formatting = bool(re.search(r'```|\*\*|\n\s*\n', text))
        has_punctuation = bool(re.search(r'[.,;:]', text))

        # Weight sentence count AND length, not just the length range.
        sentence_quality = (
            (len(sentences) >= 3) * 1.0 +              # bonus for multiple sentences
            (20 <= avg_sentence_length <= 50) * 2.0 +  # ideal length range
            (avg_sentence_length >= 5) * 0.5           # minimum meaningful length
        )

        effort_score = min(5.0, (
            sentence_quality +
            has_formatting * 1.5 +
            has_punctuation * 1.5
        ))

        return effort_score

    def calculate_context(self, text: str) -> float:
        """
        Measure the presence of background information (reasons, environment,
        stated goals).  Matches preceded by a negation ('no idea when ...')
        are ignored via _has_negation_before().

        Returns:
            Context score in [0, 5]; each category counts at most once.
        """
        context_score = 0.0

        for category, pattern in self.context_indicators.items():
            for match in re.finditer(pattern, text.lower()):
                if not self._has_negation_before(text, match.start()):
                    context_score += 1.0
                    break  # count each category only once

        return min(5.0, context_score)

    def calculate_details(self, text: str) -> Tuple[float, Dict]:
        """
        Quantify technical depth: code-related keywords, error vocabulary and
        dotted identifiers (e.g. 'auth.login'), 0.5 points per match.

        Returns:
            Tuple of (detail score capped at 5, {category: matched strings}).
        """
        detail_score = 0.0
        detail_findings = {}

        for category, pattern in self.detail_patterns.items():
            matches = re.findall(pattern, text.lower())
            detail_findings[category] = matches
            detail_score += len(matches) * 0.5

        # Cap the score so keyword stuffing cannot dominate.
        return (min(5.0, detail_score), detail_findings)

    def calculate_bonus_factors(self, text: str) -> float:
        """
        Reward clarity-improving formatting: fenced code blocks (+1.0),
        markdown links (+0.5) and bulleted lists (+0.5).
        """
        bonus_score = 0.0

        if re.search(r'```[\s\S]*?```', text):
            bonus_score += 1.0
        if re.search(r'\[.*?\]\(.*?\)', text):
            bonus_score += 0.5
        if re.search(r'\n\s*[-*+]\s', text):
            bonus_score += 0.5

        return bonus_score

    def calculate_penalty_factors(self, text: str) -> Tuple[float, Dict]:
        """
        Deduct points for shouting (mostly-uppercase text), redundant
        punctuation runs, and inputs too short to be useful.

        Returns:
            Tuple of (penalty score capped at 5, {penalty name: magnitude}).
        """
        penalties = {}

        # Excessive capitalization (ratio of uppercase to all letters).
        alpha_chars = re.findall(r'[a-zA-Z]', text)
        if alpha_chars:
            caps_ratio = len(re.findall(r'[A-Z]', text)) / len(alpha_chars)
            if caps_ratio > 0.7:
                penalties['excessive_caps'] = caps_ratio

        # Runs of '!!' / '??' and the like.
        excessive_punctuation = len(re.findall(r'[!?]{2,}', text))
        if excessive_punctuation:
            penalties['excessive_punctuation'] = excessive_punctuation

        # Too short to contain a real problem description.
        if len(text.split()) < 10:
            penalties['too_short'] = 1.0

        # sum() over an empty collection is 0, so no conditional is needed.
        penalty_score = sum(penalties.values())
        return (min(5.0, penalty_score), penalty_score and penalties or penalties)[0:2] if False else (min(5.0, penalty_score), penalties)

    def calculate_adi(self, metrics: InputMetrics) -> float:
        """
        Combine the metrics into the final Anti-Dump Index:

            ADI = (w_noise*noise - (w_effort*effort + w_bonus*bonus))
                  / (w_context*context + w_details*details
                     + w_penalty*penalty + repetition_penalty)

        The repetition penalty sits in the denominator to dampen keyword
        stuffing.  Returns +inf on unexpected errors, which lands in the
        REJECT bucket of _make_decision().
        """
        try:
            numerator = (
                self.weights['noise'] * metrics.noise -
                (self.weights['effort'] * metrics.effort +
                 self.weights['bonus'] * metrics.bonus_factors)
            )

            denominator = (
                self.weights['context'] * metrics.context +
                self.weights['details'] * metrics.details +
                self.weights['penalty'] * metrics.penalty_factors +
                metrics.repetition_penalty
            )

            # Floor the denominator so we never divide by zero.
            return numerator / max(denominator, 0.1)

        except Exception as e:
            print(f"Error calculating ADI: {e}")
            return float('inf')

    def analyze_input(self, text: str, user_context: Optional[Dict] = None) -> Dict:
        """
        Main entry point: compute all metrics, the ADI score, a routing
        decision and improvement recommendations.

        Args:
            text: The input text to analyze.
            user_context: Optional dict with 'tier' and 'history_avg' for
                context-aware routing; an 'enterprise' tier and a negative
                (i.e. good) historical ADI each lower the effective score.

        Returns:
            Dict with 'adi', 'adi_adjusted', 'metrics', 'decision',
            'recommendations' and raw pattern 'details'.
        """
        noise_value, noise_details = self.calculate_noise(text)
        effort_value = self.calculate_effort(text)
        context_value = self.calculate_context(text)
        details_value, detail_findings = self.calculate_details(text)
        bonus_value = self.calculate_bonus_factors(text)
        penalty_value, penalty_details = self.calculate_penalty_factors(text)
        repetition_value = self.calculate_repetition_penalty(text)

        metrics = InputMetrics(
            noise=noise_value,
            effort=effort_value,
            context=context_value,
            details=details_value,
            bonus_factors=bonus_value,
            penalty_factors=penalty_value,
            repetition_penalty=repetition_value
        )

        adi = self.calculate_adi(metrics)

        # Context-aware adjustment (only if user info is provided).
        adi_adjusted = adi
        if user_context:
            if user_context.get('tier') == 'enterprise':
                adi_adjusted *= 0.8  # more lenient for paying customers
            if user_context.get('history_avg', 0) < 0:
                adi_adjusted *= 0.9  # boost for users with a good track record

        decision = self._make_decision(adi_adjusted)
        recommendations = self._generate_recommendations(
            metrics, noise_details, detail_findings, penalty_details
        )

        result = {
            'adi': round(adi, 3),
            'adi_adjusted': round(adi_adjusted, 3) if user_context else None,
            'metrics': {
                'noise': round(noise_value, 3),
                'effort': round(effort_value, 3),
                'context': round(context_value, 3),
                'details': round(details_value, 3),
                'bonus_factors': round(bonus_value, 3),
                'penalty_factors': round(penalty_value, 3),
                'repetition_penalty': round(repetition_value, 3)
            },
            'decision': decision,
            'recommendations': recommendations,
            'details': {
                'noise_findings': noise_details,
                'technical_details': detail_findings,
                'penalties': penalty_details
            }
        }

        # Optional logging for later weight optimization.
        if self.enable_logging:
            self._log_analysis(text, adi, metrics)

        return result

    def _make_decision(self, adi: float) -> str:
        """
        Translate the numerical ADI score into a categorical decision:
        > 1 -> REJECT, [0, 1] -> MEDIUM_PRIORITY, < 0 -> HIGH_PRIORITY.
        """
        if adi > 1:
            return "REJECT"
        elif 0 <= adi <= 1:
            return "MEDIUM_PRIORITY"
        else:
            return "HIGH_PRIORITY"

    def _generate_recommendations(self, metrics: InputMetrics,
                                  noise_details: Dict,
                                  detail_findings: Dict,
                                  penalty_details: Dict) -> List[str]:
        """
        Generate actionable suggestions to help the user improve their input.
        """
        recommendations = []

        if metrics.noise > 0.3:
            recommendations.append("Reduce informal or urgent expressions.")

        if metrics.context < 1.0:
            recommendations.append("Provide more context (environment, background, goal).")

        if metrics.details < 1.0:
            recommendations.append("Include specific technical details or error messages.")

        if metrics.effort < 2.0:
            recommendations.append("Improve the structure of your input with proper sentences.")

        if metrics.repetition_penalty > 1.0:
            recommendations.append("Avoid repeating the same keywords excessively.")

        if metrics.penalty_factors > 0:
            if 'excessive_caps' in penalty_details:
                recommendations.append("Avoid excessive capitalization.")
            if 'excessive_punctuation' in penalty_details:
                recommendations.append("Reduce excessive punctuation marks.")
            if 'too_short' in penalty_details:
                recommendations.append("Provide a more detailed description (minimum 10 words).")

        if not recommendations:
            recommendations.append("Your input quality is excellent. No improvements needed.")

        return recommendations

    def _log_analysis(self, text: str, adi: float, metrics: InputMetrics):
        """
        Append one JSON object per analysis (JSONL format) to self.log_file.

        FIX: uses a stable SHA-256 content hash instead of builtin hash(),
        which is salted per process (PYTHONHASHSEED) and therefore useless
        for comparing or deduplicating logs across runs.
        """
        log_entry = {
            'text_hash': hashlib.sha256(text.encode('utf-8')).hexdigest(),
            'text_length': len(text),
            'adi': round(adi, 3),
            'metrics': {
                'noise': round(metrics.noise, 3),
                'effort': round(metrics.effort, 3),
                'context': round(metrics.context, 3),
                'details': round(metrics.details, 3),
                'bonus_factors': round(metrics.bonus_factors, 3),
                'penalty_factors': round(metrics.penalty_factors, 3),
                'repetition_penalty': round(metrics.repetition_penalty, 3)
            }
        }

        with open(self.log_file, 'a') as f:
            f.write(json.dumps(log_entry) + '\n')

    def validate_weights(self, test_cases: List[Tuple[str, str]]) -> float:
        """
        Validate the current weights against manually labeled test cases.

        Args:
            test_cases: List of (input_text, expected_decision) tuples, e.g.
                [("Help pls!", "REJECT"), ("Python KeyError...", "HIGH_PRIORITY")].

        Returns:
            Accuracy score (0.0 to 1.0).

        Raises:
            ValueError: If test_cases is empty.
        """
        if not test_cases:
            raise ValueError("test_cases cannot be empty")

        correct = sum(
            1 for text, expected in test_cases
            if self.analyze_input(text)['decision'] == expected
        )

        accuracy = correct / len(test_cases)
        print(f"Weight Validation: {correct}/{len(test_cases)} correct ({accuracy:.1%})")
        return accuracy
463
+
464
+
465
+ # =====================================================================================================
466
+ # USAGE EXAMPLE
467
+ # =====================================================================================================
468
+ # if __name__ == "__main__":
469
+ # analyzer = DumpindexAnalyzer(enable_logging=False)
470
+ #
471
+ # test_inputs = [
472
+ # "Pls fix my code. Urgent!!!",
473
+ # """I'm trying to implement a login function in Python.
474
+ # When calling auth.login(), I get a TypeError.
475
+ # Here's my code:
476
+ # ```python
477
+ # def login(username, password):
478
+ # # return auth.login(username)
479
+ # ```
480
+ # I'm using Python 3.8 and the auth library version 2.1.""",
481
+ # "error error error bug bug crash crash function method class object variable", # Keyword stuffing test
482
+ # ]
483
+
484
+ # for input_text in test_inputs:
485
+ # result = analyzer.analyze_input(input_text)
486
+ # print("-" * 70)
487
+ # print(f"Input: {input_text[:60]}...")
488
+ # print(f"ADI: {result['adi']}")
489
+ # print(f"Decision: {result['decision']}")
490
+ # print("Recommendations:")
491
+ # for rec in result['recommendations']:
492
+ # print(f" - {rec}")
493
+ # print(f"Metrics: {result['metrics']}")
494
+ # print("-" * 70)
495
+
496
+ # Have fun :) Volkan Sah