Spaces:
Running
Running
Create pipeline/stages/s2_classify.py
Browse files- pipeline/stages/s2_classify.py +124 -0
pipeline/stages/s2_classify.py
ADDED
|
@@ -0,0 +1,124 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""
|
| 2 |
+
S2: CLASSIFY — Technical vs non-technical classification.
|
| 3 |
+
|
| 4 |
+
Per SPEC-PIPELINE-001 Part B.2:
|
| 5 |
+
Rule-based classifier first (keyword density, channel_b markers).
|
| 6 |
+
High-confidence → S3. Ambiguous → oracle queue.
|
| 7 |
+
|
| 8 |
+
Invariant: Every segment is classified. No segment is unclassified.
|
| 9 |
+
"""
|
| 10 |
+
|
| 11 |
+
from __future__ import annotations
|
| 12 |
+
|
| 13 |
+
import re
|
| 14 |
+
from dataclasses import dataclass
|
| 15 |
+
from enum import Enum
|
| 16 |
+
from typing import Optional
|
| 17 |
+
|
| 18 |
+
from pipeline.stages.s1_segment import Segment
|
| 19 |
+
from pipeline.types import ExclusionReason
|
| 20 |
+
|
| 21 |
+
|
| 22 |
+
class Classification(str, Enum):
    """Label the S2 classifier assigns to a segment.

    Each member is also a ``str``, so it compares equal to its plain
    string value.
    """

    TECHNICAL = "TECHNICAL"
    NON_TECHNICAL = "NON_TECHNICAL"
    # No rule was decisive; such segments go to the oracle queue.
    AMBIGUOUS = "AMBIGUOUS"
|
| 26 |
+
|
| 27 |
+
|
| 28 |
+
@dataclass
class ClassifiedSegment:
    """A segment together with the verdict the classifier reached for it."""

    # The segment that was classified.
    segment: Segment
    # Label assigned to the segment.
    classification: Classification
    # Classifier confidence, intended to lie in the range 0.0 - 1.0.
    confidence: float
    # Reason for exclusion; meant for NON_TECHNICAL results, otherwise None.
    exclusion_reason: Optional[ExclusionReason] = None
|
| 34 |
+
|
| 35 |
+
|
| 36 |
+
# Lower-case vocabulary for the rule-based classifier; a segment's keyword
# density is measured against this set (listed alphabetically).
_TECHNICAL_KEYWORDS = {
    "always", "assert", "assign", "assume",
    "begin",
    "case", "constraint", "cover",
    "else", "end", "endmodule", "equivalence",
    "formal", "function",
    "generate",
    "if", "input", "invariant",
    "localparam",
    "module", "must",
    "negedge", "netlist",
    "output",
    "parameter", "posedge", "property",
    "reg", "requirement",
    "shall", "specification", "synthesis",
    "task",
    "verification",
    "wire",
}
|
| 45 |
+
|
| 46 |
+
# Ordered (compiled pattern, exclusion reason) pairs. classify() searches
# them in this order and the first pattern found anywhere in the segment
# text decides the exclusion reason.
_NON_TECHNICAL_PATTERNS = [
    (re.compile(r"(?i)copyright|license|all rights reserved|permission is hereby"),
     ExclusionReason.LICENSE_HEADER),
    (re.compile(r"(?i)changelog|release notes|version history|what.s new"),
     ExclusionReason.CHANGELOG),
    (re.compile(r"(?i)table of contents|page \d+|index$"),
     ExclusionReason.FORMATTING),
    (re.compile(r"(?i)acknowledgments?|thanks to|we would like to"),
     ExclusionReason.EDITORIAL),
    # Word boundaries stop the "as is" alternative from firing inside
    # ordinary prose such as "has issues" or "was issued".
    (re.compile(r"(?i)disclaimer|no warranty|\bas.is\b"),
     ExclusionReason.BOILERPLATE),
    # Platform injection — ChatGPT filecite metadata, not user content
    (re.compile(r"fileciteturn\d+file\d+"),
     ExclusionReason.BOILERPLATE),
]
|
| 61 |
+
|
| 62 |
+
|
| 63 |
+
def classify(seg: Segment) -> ClassifiedSegment:
    """Classify a segment as TECHNICAL, NON_TECHNICAL, or AMBIGUOUS.

    Rules are applied in order and the first one that fires wins:

    1. Whitespace-only text -> NON_TECHNICAL (WHITESPACE), confidence 1.0.
    2. Any ``_NON_TECHNICAL_PATTERNS`` hit -> NON_TECHNICAL with that
       pattern's reason, confidence 0.9.
    3. A Verilog keyword or an ``assert/assume/cover property`` statement
       -> TECHNICAL, confidence 0.95.
    4. Keyword density above 0.15, or formal-spec vocabulary -> TECHNICAL,
       confidence ``0.7 + density`` capped at 1.0.
    5. Keyword density below 0.05 -> NON_TECHNICAL (BOILERPLATE),
       confidence 0.7.
    6. Otherwise AMBIGUOUS, with the density as the confidence.

    Args:
        seg: Segment whose ``text`` attribute is inspected.

    Returns:
        A ClassifiedSegment wrapping ``seg``. Every path returns a result,
        so no segment is left unclassified.
    """
    text = seg.text

    # Whitespace-only check
    if not text.strip():
        return ClassifiedSegment(
            segment=seg,
            classification=Classification.NON_TECHNICAL,
            confidence=1.0,
            exclusion_reason=ExclusionReason.WHITESPACE,
        )

    # Non-technical pattern matching (first matching pattern wins)
    for pattern, reason in _NON_TECHNICAL_PATTERNS:
        if pattern.search(text):
            return ClassifiedSegment(
                segment=seg,
                classification=Classification.NON_TECHNICAL,
                confidence=0.9,
                exclusion_reason=reason,
            )

    # Technical keyword density: distinct keywords hit / distinct words.
    words = set(re.findall(r"\w+", text.lower()))
    tech_count = len(words & _TECHNICAL_KEYWORDS)
    total_words = max(len(words), 1)  # guard against division by zero
    tech_density = tech_count / total_words

    # Channel B markers. The Verilog and assertion checks are
    # case-sensitive; only the formal-vocabulary check ignores case.
    has_verilog = bool(re.search(r"\b(module|endmodule|always|assign)\b", text))
    has_assertion = bool(re.search(r"\b(assert|assume|cover)\s+property", text))
    has_formal = bool(re.search(r"\b(invariant|specification|requirement)\b", text, re.I))

    if has_verilog or has_assertion:
        return ClassifiedSegment(
            segment=seg,
            classification=Classification.TECHNICAL,
            confidence=0.95,
        )

    if tech_density > 0.15 or has_formal:
        return ClassifiedSegment(
            segment=seg,
            classification=Classification.TECHNICAL,
            # Density can reach 1.0, so cap the sum to keep confidence
            # inside the 0.0 - 1.0 range.
            confidence=min(1.0, 0.7 + tech_density),
        )

    if tech_density < 0.05:
        return ClassifiedSegment(
            segment=seg,
            classification=Classification.NON_TECHNICAL,
            confidence=0.7,
            exclusion_reason=ExclusionReason.BOILERPLATE,
        )

    # Ambiguous — send to oracle
    return ClassifiedSegment(
        segment=seg,
        classification=Classification.AMBIGUOUS,
        confidence=tech_density,
    )
|