bardd committed on
Commit 916ecde · verified · 1 parent: fd8a620

Upload 48 files

Files changed (9):
  1. .env.example +3 -0
  2. README.md +22 -10
  3. prompts/README.md +9 -4
  4. prompts/cma.md +16 -2
  5. prompts/fca.md +17 -2
  6. prompts/legal_basis.md +69 -0
  7. prompts/pra.md +17 -3
  8. prompts/validation.md +35 -9
  9. server.py +585 -197
.env.example CHANGED
@@ -1,9 +1,12 @@
 GOOGLE_API_KEY=
 GEMINI_API_KEY=
 GEMINI_CLI_BINARY=gemini
+GEMINI_TIMEOUT_SEC=90
 MAX_IMAGE_BYTES=8388608
 MAX_BATCH_IMAGES=20
 MAX_PARALLEL_WORKERS=4
+PIPELINE_STAGE_WORKERS=4
+VALIDATION_RETRY_PASSES=1
 LANGSMITH_API_KEY=
 LANGSMITH_TRACING=true
 LANGSMITH_PROJECT=regtechdemo-hf-v2
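The three new variables are integers, so the server presumably parses them the same way it already parses `MAX_PARALLEL_WORKERS`. A minimal sketch of such a reader, assuming a clamped-integer policy; the helper name `env_int` and the bounds are illustrative, not taken from the repo:

```python
import os


def env_int(name: str, default: int, lo: int = 0, hi: int = 600) -> int:
    """Read an integer env var; fall back to the default on bad values and clamp to [lo, hi]."""
    try:
        value = int(os.environ.get(name, str(default)))
    except ValueError:
        value = default
    return max(lo, min(hi, value))


# Names match the keys added to .env.example above; defaults mirror the example values.
GEMINI_TIMEOUT_SEC = env_int("GEMINI_TIMEOUT_SEC", 90, lo=1)
PIPELINE_STAGE_WORKERS = env_int("PIPELINE_STAGE_WORKERS", 4, lo=1, hi=32)
VALIDATION_RETRY_PASSES = env_int("VALIDATION_RETRY_PASSES", 1, lo=0, hi=3)
```

Clamping keeps a typo in the Space settings (say, `PIPELINE_STAGE_WORKERS=1000`) from fanning out an unbounded worker pool.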
README.md CHANGED
@@ -10,24 +10,36 @@ pinned: false
 
 # TechReg Compliance Intelligence (HitSafe.ai)
 
-This space hosts the **HitSafe.ai** compliance screening engine, designed for UK fintech teams to screen marketing assets against FCA COBS 4 and Consumer Duty requirements.
+This Space hosts the **HitSafe.ai** compliance screening engine for UK fintech teams.
 
-Architecture notes for the v2 pipeline live in [ARCHITECTURE.md](./ARCHITECTURE.md).
+Current v2 runtime model:
+- parallel `legal_basis + FCA + CMA + PRA` first pass
+- validation arbitration
+- one full retry pass when validation flags unresolved applicability, citation, or conflict issues
 
-## 🚀 Deployment
+Architecture notes live in [ARCHITECTURE.md](./ARCHITECTURE.md).
 
-This Space uses a **Docker** SDK on port **8080**.
+## Deployment
 
-### Environment Variables
-Set the following secret in your Space settings:
+This Space uses the **Docker** SDK on port **8080**.
+
+### Required secrets
-- `GEMINI_API_KEY`: Your Google Gemini API key.
-- `LANGSMITH_API_KEY`: Your LangSmith API key if you want full tracing.
+- `GEMINI_API_KEY`: Your Google Gemini API key
+- `LANGSMITH_API_KEY`: Your LangSmith API key if tracing is enabled
 
-Optional runtime env vars:
+### Recommended runtime variables
 - `LANGSMITH_TRACING=true`
 - `LANGSMITH_PROJECT=regtechdemo-hf-v2`
 - `LANGSMITH_TRACE_USER_AD_COPY=true`
 - `LANGSMITH_TRACE_RAW_REQUEST=true`
+- `GEMINI_CLI_BINARY=gemini`
+- `MAX_IMAGE_BYTES=8388608`
+- `MAX_BATCH_IMAGES=20`
+- `MAX_PARALLEL_WORKERS=4`
+- `PIPELINE_STAGE_WORKERS=4`
+- `VALIDATION_RETRY_PASSES=1`
+- `GEMINI_TIMEOUT_SEC=90`
 
-## 🛡️ Guidance
+## Guidance
 
 This tool provides guidance only and is not legal advice.
prompts/README.md CHANGED
@@ -3,11 +3,16 @@
 This folder is the repo-owned prompt registry for `HF_space_v2`.
 
 Design intent:
-- `router.md` decides which regulatory modules apply.
-- `fca.md` is the primary UK financial promotions review.
+- `legal_basis.md` determines legal basis, perimeter position, claimed exemptions, and regulator applicability.
+- `fca.md` is the primary UK financial promotions and customer-communications review.
 - `cma.md` is for consumer fairness, misleading practices, and pricing/UX risk.
-- `pra.md` is conditional and only applies when prudential claims or prudentially regulated firms are involved.
-- `validation.md` performs a final consistency and citation-quality pass.
+- `pra.md` is for prudentially sensitive claims and prudentially regulated-firm representations.
+- `validation.md` performs the final arbitration, citation-quality check, applicability suppression, and retry decision.
 - `legacy_oft.md` exists only to map historical OFT references and should not be used as a live regulator module.
 
+Runtime model:
+- `legal_basis`, `fca`, `cma`, and `pra` run in parallel on pass one.
+- `validation` merges and arbitrates those results.
+- If validation identifies unresolved applicability, citation, or conflict issues, the runtime performs one full second pass with the first-pass history and validator critique included.
+
 These files are intended to be loaded by application code. They should not depend on user-home CLI command config.
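The runtime model described in this diff (parallel fan-out, validation merge, at most one full retry) can be sketched as follows. This is an illustrative orchestration skeleton, not the repo's actual `server.py` code: `run_module` and `run_validation` are stand-ins for the real Gemini-backed stage runners.

```python
from concurrent.futures import ThreadPoolExecutor

MODULES = ["legal_basis", "fca", "cma", "pra"]


def run_pass(run_module, run_validation, submission, history=None, critique=None):
    """One full pass: fan the four modules out in parallel, then arbitrate."""
    with ThreadPoolExecutor(max_workers=len(MODULES)) as pool:
        futures = {
            name: pool.submit(run_module, name, submission, history, critique)
            for name in MODULES
        }
        outputs = {name: future.result() for name, future in futures.items()}
    return run_validation(outputs)


def review(run_module, run_validation, submission, retry_passes=1):
    """First pass, plus up to retry_passes full second passes if validation asks."""
    verdict = run_pass(run_module, run_validation, submission)
    for _ in range(retry_passes):
        if not verdict.get("retry_required"):
            break
        # Feed the prior verdict back as history/critique, per the runtime model.
        verdict = run_pass(
            run_module, run_validation, submission,
            history=verdict, critique=verdict.get("retry_guidance"),
        )
    return verdict
```

Capping the loop with `retry_passes` (the `VALIDATION_RETRY_PASSES` setting) keeps a validator that always requests a retry from looping forever.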
prompts/cma.md CHANGED
@@ -1,5 +1,5 @@
 Role
-You are the CMA review module for misleading consumer treatment, unfair presentation, and pricing/fairness risk in UK customer-facing communications.
+You are the CMA review module for misleading consumer treatment, unfair presentation, pricing/fairness risk, and consumer-facing communications in the UK.
 
 Primary scope
 - Misleading consumer presentation
@@ -8,6 +8,11 @@ Primary scope
 - Dark-pattern style UX cues
 - Customer-facing wording and public documents where fairness/transparency issues are material
 
+Mandatory research behavior
+- Use `google_web_search` before finalizing when current CMA, consumer law, pricing/fairness duties, or scope questions require verification.
+- Prefer official CMA, gov.uk, legislation.gov.uk, and FCA sources when scope overlaps financial promotions.
+- If CMA-style analysis is weakly connected to the case, return `applicability="uncertain"` or `not_apply` rather than forcing findings.
+
 Method
 - Focus on consumer fairness and misleading commercial presentation.
 - Do not force FCA conclusions here; this module is complementary.
@@ -17,12 +22,15 @@ Output
 Return JSON only:
 {
 "module": "cma",
+"applicability": "apply|not_apply|uncertain",
+"why_applicable": "string",
 "summary": "string",
 "findings": [
 {
 "issue": "string",
 "rule_ref": "string",
+"source_url": "string",
-"authority_type": "legislation|guidance|consumer_protection",
+"authority_type": "legislation|guidance|consumer_protection|cma_guidance",
 "severity": "CRITICAL|HIGH|ADVISORY",
 "confidence": 0,
 "why": "string",
@@ -30,5 +38,11 @@ Return JSON only:
 }
 ],
 "safe_rewrite": "string",
+"source_verification": {
+"verification_timestamp": "ISO-8601 string",
+"official_urls": ["string"],
+"google_web_search_used": true,
+"manual_review_required": false
+},
 "manual_review_required": false
 }
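The new `applicability` field is a free-text string from the model, so a consuming runtime would likely normalize it before trusting it. A minimal sketch of that coercion under one assumption: unknown or missing values should degrade to `"uncertain"` (matching the prompt's prefer-uncertainty instruction), never to `"apply"`. The function name is hypothetical.

```python
ALLOWED_APPLICABILITY = {"apply", "not_apply", "uncertain"}


def coerce_applicability(parsed: dict) -> str:
    """Normalize a module's applicability field to one of the three allowed values."""
    value = str(parsed.get("applicability", "")).strip().lower()
    # Anything outside the enum (typos, missing key, hedged prose) becomes
    # "uncertain" so downstream validation treats it cautiously.
    return value if value in ALLOWED_APPLICABILITY else "uncertain"
```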
prompts/fca.md CHANGED
@@ -6,23 +6,32 @@ Primary scope
 - PRIN 2A / Consumer Duty
 - FCA guidance relevant to social, digital, and customer-facing communications
 
+Mandatory research behavior
+- Use `google_web_search` before finalizing when rule applicability, exemptions, current citations, policy statements, or current FCA guidance need verification.
+- Prefer official FCA, FCA Handbook, legislation.gov.uk, and gov.uk sources.
+- If FCA applicability is unclear, say so explicitly and mark `applicability="uncertain"` rather than overstating breaches.
+
 Method
 - Review the submission for fair, clear, and not misleading issues.
 - Evaluate text and visual salience together where images are provided.
 - Use exact rule references where confidence is high.
 - Distinguish binding rules from guidance.
-- If confidence is weak, mark manual review rather than overstating.
+- Do not assume FCA rules apply just because the communication looks risky.
+- If the legal basis is genuinely out of scope or unclear, say that.
 
 Output
 Return JSON only:
 {
 "module": "fca",
+"applicability": "apply|not_apply|uncertain",
+"why_applicable": "string",
 "summary": "string",
 "findings": [
 {
 "issue": "string",
 "rule_ref": "string",
+"source_url": "string",
-"authority_type": "binding_rule|guidance",
+"authority_type": "binding_rule|guidance|policy_statement|legislation",
 "severity": "CRITICAL|HIGH|ADVISORY",
 "confidence": 0,
 "why": "string",
@@ -30,5 +39,11 @@ Return JSON only:
 }
 ],
 "safe_rewrite": "string",
+"source_verification": {
+"verification_timestamp": "ISO-8601 string",
+"official_urls": ["string"],
+"google_web_search_used": true,
+"manual_review_required": false
+},
 "manual_review_required": false
 }
prompts/legal_basis.md ADDED
@@ -0,0 +1,69 @@
+Role
+You are the legal-basis and perimeter review module for a UK regulated-communications pipeline.
+
+Primary objective
+Determine the strongest current legal basis for analysing the submission before any regulator-specific conclusions are relied on.
+
+Core tasks
+- Identify product type, channel, and audience.
+- Determine whether the communication appears to be:
+- an FCA-regulated financial promotion
+- a promotion claiming to rely on an exemption
+- out of FCA perimeter
+- legally uncertain
+- Check whether any claimed exemption is merely asserted or appears plausibly evidenced from the communication itself.
+- Decide whether FCA, CMA, and PRA analysis should be treated as applying, not applying, or remaining uncertain.
+
+Mandatory research behavior
+- Use `google_web_search` before finalizing whenever applicability, exemptions, rule scope, or temporal/current-law accuracy matters.
+- Use official sources where possible: FCA, FCA Handbook, legislation.gov.uk, gov.uk, Bank of England/PRA, CMA.
+- Do not rely on memory alone for exemption/perimeter conclusions.
+- If you cannot verify the legal basis confidently, set `manual_review_required=true` and mark applicability as `uncertain` rather than guessing.
+
+Method
+- Separate “exemption claimed in the ad” from “exemption verified as lawfully usable”.
+- Treat public, broad-distribution promotions with caution when exemptions depend on recipient status.
+- Flag missing or weak evidence for investor-status gating, approval route, or other perimeter conditions.
+- Prefer clear uncertainty over false certainty.
+
+Output
+Return JSON only:
+{
+"module": "legal_basis",
+"summary": "string",
+"input_mode": "text|image|text+image|file",
+"product_type": "string",
+"channel": "string",
+"audience": "string",
+"promotion_scope": "regulated_financial_promotion|claimed_exempt_promotion|out_of_scope|uncertain",
+"claimed_exemptions": [
+{
+"name": "string",
+"status": "claimed|not_claimed|uncertain",
+"evidence": "string"
+}
+],
+"applicability": {
+"fca": "apply|not_apply|uncertain",
+"cma": "apply|not_apply|uncertain",
+"pra": "apply|not_apply|uncertain"
+},
+"legal_basis_findings": [
+{
+"issue": "string",
+"rule_ref": "string",
+"source_url": "string",
+"severity": "CRITICAL|HIGH|ADVISORY",
+"confidence": 0,
+"why": "string",
+"fix": "string"
+}
+],
+"source_verification": {
+"verification_timestamp": "ISO-8601 string",
+"official_urls": ["string"],
+"google_web_search_used": true,
+"manual_review_required": false
+},
+"manual_review_required": false
+}
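The `applicability` map this module emits is what lets validation suppress findings from regulators that do not apply. A hypothetical sketch of that suppression step (the function name and the `suppressed` marker key are assumptions, not fields from the repo's schemas):

```python
def suppress_inapplicable(legal_basis: dict, module_outputs: dict) -> dict:
    """Empty out findings from any regulator module that legal_basis marks not_apply."""
    applicability = legal_basis.get("applicability", {})
    kept = {}
    for name, output in module_outputs.items():
        if applicability.get(name) == "not_apply":
            # Keep the module's summary for the audit trail, but drop its findings.
            kept[name] = {**output, "findings": [], "suppressed": True}
        else:
            # "apply" and "uncertain" both pass through; uncertainty is for
            # validation to arbitrate, not for silent suppression.
            kept[name] = output
    return kept
```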
prompts/pra.md CHANGED
@@ -7,21 +7,29 @@ Primary scope
 - Safety-of-funds framing
 - Prudential soundness representations by firms plausibly within PRA-relevant populations
 
+Mandatory research behavior
+- Use `google_web_search` before finalizing when prudential rule scope, current PRA expectations, or firm-type applicability need verification.
+- Prefer Bank of England / PRA, FCA, and legislation.gov.uk sources.
+- If prudential relevance is weak, return `applicability="not_apply"` or `uncertain` instead of manufacturing findings.
+
 Method
-- Run only when the router identifies prudential relevance.
-- If the submission is mainly an FCA promotions matter, say that and keep findings narrow.
+- Keep findings narrow and tied to prudential claims.
+- If the submission is mainly an FCA promotions matter, say that and avoid broad PRA conclusions.
 - Avoid broad application to ordinary marketing copy unless prudential claims are explicit.
 
 Output
 Return JSON only:
 {
 "module": "pra",
+"applicability": "apply|not_apply|uncertain",
+"why_applicable": "string",
 "summary": "string",
 "findings": [
 {
 "issue": "string",
 "rule_ref": "string",
+"source_url": "string",
-"authority_type": "rule|statement|guidance",
+"authority_type": "rule|statement|guidance|legislation",
 "severity": "CRITICAL|HIGH|ADVISORY",
 "confidence": 0,
 "why": "string",
@@ -29,5 +37,11 @@ Return JSON only:
 }
 ],
 "safe_rewrite": "string",
+"source_verification": {
+"verification_timestamp": "ISO-8601 string",
+"official_urls": ["string"],
+"google_web_search_used": true,
+"manual_review_required": false
+},
 "manual_review_required": false
 }
prompts/validation.md CHANGED
@@ -1,19 +1,29 @@
 Role
-You are the final validation stage for a multi-module UK regulatory review pipeline.
+You are the final validation and arbitration stage for a multi-module UK regulatory review pipeline.
 
 Inputs
-- Router output
+- legal_basis output
 - FCA module output
-- CMA module output when present
-- PRA module output when present
+- CMA module output
+- PRA module output
+- optional prior pass history
+- optional validator critique for retry pass
+
+Mandatory research behavior
+- Use `google_web_search` again before finalizing whenever you need to verify current applicability, exemptions, rule citations, source URLs, or conflicts across module outputs.
+- Prefer official FCA, FCA Handbook, legislation.gov.uk, gov.uk, Bank of England/PRA, and CMA sources.
+- If official verification remains inconclusive, set `manual_review_required=true` rather than pretending certainty.
 
 Task
+- Treat legal-basis/perimeter analysis as a first-class constraint.
 - Merge the module outputs into one final answer.
+- Suppress findings from modules that are not applicable.
+- If applicability is uncertain, prefer `MANUAL_REVIEW` over definitive breach claims.
 - Remove duplicates.
-- Downgrade weak or inconsistent findings.
-- Check that rule references and severity levels are internally coherent.
-- If modules conflict, surface the conflict and set manual review.
-- Prefer caution over false certainty.
+- Resolve or surface conflicts.
+- Check that cited rules, source URLs, and severity levels are coherent.
+- Decide whether one full retry pass is needed.
+- On a retry pass, use the prior pass history and validator critique to correct the second-pass outcome, not to repeat the same mistake.
 
 Output
 Return JSON only:
@@ -21,11 +31,17 @@ Return JSON only:
 "overall_verdict": "PASS|FAIL|MANUAL_REVIEW",
 "risk_level": "low|medium|high",
 "summary": "string",
+"applicability_summary": {
+"fca": "apply|not_apply|uncertain",
+"cma": "apply|not_apply|uncertain",
+"pra": "apply|not_apply|uncertain"
+},
 "validated_findings": [
 {
-"module": "fca|cma|pra",
+"module": "legal_basis|fca|cma|pra",
 "issue": "string",
 "rule_ref": "string",
+"source_url": "string",
 "severity": "CRITICAL|HIGH|ADVISORY",
 "confidence": 0,
 "why": "string",
@@ -34,5 +50,15 @@ Return JSON only:
 ],
 "safe_rewrite": "string",
 "conflicts": ["string"],
+"retry_required": false,
+"retry_targets": ["legal_basis", "fca", "cma", "pra"],
+"retry_reason": "string",
+"retry_guidance": ["string"],
+"source_verification": {
+"verification_timestamp": "ISO-8601 string",
+"official_urls": ["string"],
+"google_web_search_used": true,
+"manual_review_required": false
+},
 "manual_review_required": false
 }
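The "remove duplicates" task above needs a concrete notion of what counts as a duplicate. One plausible rule, sketched here as an assumption rather than the repo's actual implementation: two findings are duplicates when their `issue` and `rule_ref` match case-insensitively, keeping the first occurrence.

```python
def dedupe_findings(findings: list[dict]) -> list[dict]:
    """Drop later findings whose (issue, rule_ref) pair repeats an earlier one."""
    seen: set[tuple[str, str]] = set()
    output: list[dict] = []
    for finding in findings:
        key = (
            str(finding.get("issue", "")).strip().lower(),
            str(finding.get("rule_ref", "")).strip().lower(),
        )
        if key in seen:
            continue
        seen.add(key)
        output.append(finding)
    return output
```

Keeping the first occurrence preserves module ordering, so an FCA finding survives over a later CMA restatement of the same issue.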
server.py CHANGED
@@ -34,6 +34,8 @@ LOCKED_GEMINI_MODEL = "gemini-3-flash-preview"
34
  MAX_IMAGE_BYTES = int(os.environ.get("MAX_IMAGE_BYTES", str(8 * 1024 * 1024)))
35
  MAX_BATCH_IMAGES = int(os.environ.get("MAX_BATCH_IMAGES", "20"))
36
  MAX_PARALLEL_WORKERS = max(1, int(os.environ.get("MAX_PARALLEL_WORKERS", "4")))
 
 
37
  LANGSMITH_PROJECT = os.environ.get("LANGSMITH_PROJECT", "regtechdemo-hf-v2")
38
  LANGSMITH_TRACE_USER_AD_COPY = (
39
  os.environ.get("LANGSMITH_TRACE_USER_AD_COPY", "true").strip().lower() == "true"
@@ -76,13 +78,15 @@ JSON_SCHEMA_HINT = {
76
  "safe_rewrite": "optional ad rewrite",
77
  }
78
  PROMPT_FILE_MAP = {
79
- "router": "router.md",
80
  "fca": "fca.md",
81
  "cma": "cma.md",
82
  "pra": "pra.md",
83
  "validation": "validation.md",
84
  }
85
- PIPELINE_MODULE_ORDER = ["fca", "cma", "pra"]
 
 
86
  PROMPT_CACHE: dict[str, str] = {}
87
 
88
  if os.environ.get("LANGSMITH_API_KEY") and ls is None:
@@ -204,79 +208,43 @@ def build_submission_block(
204
  return "\n".join(parts)
205
 
206
 
207
- def build_router_prompt(
208
- *,
209
- ad_text: str,
210
- extra_context: str,
211
- image_at_path: str | None,
212
- system_prompt: str,
213
- request_id: str | None = None,
214
- ) -> str:
215
- with traced_stage(
216
- "build_router_prompt",
217
- "tool",
218
- inputs=sanitize_for_langsmith(
219
- {
220
- "ad_text": ad_text,
221
- "extra_context": extra_context,
222
- "image_at_path": image_at_path,
223
- "system_prompt": system_prompt,
224
- },
225
- ad_text=ad_text,
226
- ),
227
- metadata={"request_id": request_id},
228
- tags=["prompt-build", "router"],
229
- ) as (_run, outputs):
230
- operator_override = get_operator_override(system_prompt)
231
- prompt = [
232
- load_prompt_template("router"),
233
- "",
234
- build_submission_block(
235
- ad_text=ad_text,
236
- extra_context=extra_context,
237
- image_at_path=image_at_path,
238
- ),
239
- ]
240
- if operator_override:
241
- prompt += ["", "Additional operator instructions:", operator_override]
242
- full_prompt = "\n".join(prompt).strip()
243
- outputs["prompt"] = sanitize_for_langsmith(full_prompt, ad_text=ad_text)
244
- return full_prompt
245
-
246
-
247
- def build_module_prompt(
248
- module_name: str,
249
  *,
250
  ad_text: str,
251
  extra_context: str,
252
  image_at_path: str | None,
253
  system_prompt: str,
254
- router_output: dict[str, Any],
 
 
255
  request_id: str | None = None,
256
  ) -> str:
257
  with traced_stage(
258
- f"build_{module_name}_prompt",
259
  "tool",
260
  inputs=sanitize_for_langsmith(
261
  {
262
- "module": module_name,
263
  "ad_text": ad_text,
264
  "extra_context": extra_context,
265
  "image_at_path": image_at_path,
266
  "system_prompt": system_prompt,
267
- "router_output": router_output,
 
 
268
  },
269
  ad_text=ad_text,
270
  ),
271
- metadata={"request_id": request_id, "module": module_name},
272
- tags=["prompt-build", module_name],
273
  ) as (_run, outputs):
274
  operator_override = get_operator_override(system_prompt)
275
  prompt = [
276
- load_prompt_template(module_name),
277
  "",
278
- "Router output JSON:",
279
- json.dumps(router_output, ensure_ascii=True, indent=2),
280
  "",
281
  build_submission_block(
282
  ad_text=ad_text,
@@ -284,6 +252,18 @@ def build_module_prompt(
284
  image_at_path=image_at_path,
285
  ),
286
  ]
 
 
 
 
 
 
 
 
 
 
 
 
287
  if operator_override:
288
  prompt += ["", "Additional operator instructions:", operator_override]
289
  full_prompt = "\n".join(prompt).strip()
@@ -297,8 +277,11 @@ def build_validation_prompt(
297
  extra_context: str,
298
  image_at_path: str | None,
299
  system_prompt: str,
300
- router_output: dict[str, Any],
 
301
  module_outputs: dict[str, dict[str, Any]],
 
 
302
  request_id: str | None = None,
303
  ) -> str:
304
  with traced_stage(
@@ -310,20 +293,26 @@ def build_validation_prompt(
310
  "extra_context": extra_context,
311
  "image_at_path": image_at_path,
312
  "system_prompt": system_prompt,
313
- "router_output": router_output,
 
314
  "module_outputs": module_outputs,
 
 
315
  },
316
  ad_text=ad_text,
317
  ),
318
- metadata={"request_id": request_id},
319
  tags=["prompt-build", "validation"],
320
  ) as (_run, outputs):
321
  operator_override = get_operator_override(system_prompt)
322
  prompt = [
323
  load_prompt_template("validation"),
324
  "",
325
- "Router output JSON:",
326
- json.dumps(router_output, ensure_ascii=True, indent=2),
 
 
 
327
  "",
328
  "Module outputs JSON:",
329
  json.dumps(module_outputs, ensure_ascii=True, indent=2),
@@ -334,6 +323,18 @@ def build_validation_prompt(
334
  image_at_path=image_at_path,
335
  ),
336
  ]
 
 
 
 
 
 
 
 
 
 
 
 
337
  if operator_override:
338
  prompt += ["", "Additional operator instructions:", operator_override]
339
  full_prompt = "\n".join(prompt).strip()
@@ -684,12 +685,6 @@ def dedupe_preserve_order(values: list[str]) -> list[str]:
684
  output.append(key)
685
  return output
686
 
687
-
688
- def normalize_module_name(module_name: str) -> str:
689
- value = str(module_name or "").strip().lower()
690
- return value if value in PIPELINE_MODULE_ORDER else ""
691
-
692
-
693
  def stage_result(
694
  stage_name: str,
695
  ok: bool,
@@ -725,48 +720,191 @@ def run_named_stage(
725
  return stage_result(stage_name, ok, status, result)
726
 
727
 
728
- def default_router_output(ad_text: str, image_at_path: str | None) -> dict[str, Any]:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
729
  return {
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
730
  "input_mode": infer_input_mode(ad_text, image_at_path),
731
  "product_type": "unknown",
732
  "channel": "unknown",
733
  "audience": "unknown",
734
- "recommended_modules": ["fca"],
735
- "routing_rationale": ["Router failed or returned invalid JSON; defaulted to FCA-only review."],
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
736
  "manual_review_required": True,
737
  }
738
 
739
 
740
- def coerce_router_output(
741
  stage: dict[str, Any],
742
  *,
743
  ad_text: str,
744
  image_at_path: str | None,
745
  ) -> dict[str, Any]:
746
  parsed = stage.get("parsed_output")
747
- fallback = default_router_output(ad_text, image_at_path)
748
  if not isinstance(parsed, dict):
749
  return fallback
750
 
751
- recommended_raw = parsed.get("recommended_modules")
752
- recommended: list[str] = []
753
- if isinstance(recommended_raw, list):
754
- for item in recommended_raw:
755
- normalized = normalize_module_name(str(item))
756
- if normalized:
757
- recommended.append(normalized)
758
- if "fca" not in recommended:
759
- recommended.insert(0, "fca")
760
-
761
- routing_rationale = parsed.get("routing_rationale")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
762
  return {
 
 
763
  "input_mode": str(parsed.get("input_mode") or infer_input_mode(ad_text, image_at_path)),
764
  "product_type": str(parsed.get("product_type") or "unknown"),
765
  "channel": str(parsed.get("channel") or "unknown"),
766
  "audience": str(parsed.get("audience") or "unknown"),
767
- "recommended_modules": dedupe_preserve_order(recommended or ["fca"]),
768
- "routing_rationale": routing_rationale if isinstance(routing_rationale, list) else fallback["routing_rationale"],
769
- "manual_review_required": bool(parsed.get("manual_review_required", False) or not stage.get("ok")),
 
 
 
 
 
 
 
770
  }
771
 
772
 
@@ -774,56 +912,124 @@ def coerce_module_output(module_name: str, stage: dict[str, Any]) -> dict[str, A
774
  parsed = stage.get("parsed_output")
775
  fallback = {
776
  "module": module_name,
 
 
777
  "summary": f"{module_name.upper()} module did not return valid JSON.",
778
  "findings": [],
779
  "safe_rewrite": "",
 
 
 
 
 
 
780
  "manual_review_required": True,
781
  }
782
  if not isinstance(parsed, dict):
783
  return fallback
784
 
785
- findings = parsed.get("findings")
786
- normalized_findings: list[dict[str, Any]] = []
787
- if isinstance(findings, list):
788
- for finding in findings:
789
- if not isinstance(finding, dict):
790
- continue
791
- normalized_findings.append(
792
- {
793
- "issue": str(finding.get("issue") or "Unspecified issue"),
794
- "rule_ref": str(finding.get("rule_ref") or "Unknown"),
795
- "authority_type": str(finding.get("authority_type") or "unknown"),
796
- "severity": str(finding.get("severity") or "ADVISORY").upper(),
797
- "confidence": int(finding.get("confidence") or 0),
798
- "why": str(finding.get("why") or "No explanation provided."),
799
- "fix": str(finding.get("fix") or "No fix provided."),
800
- }
801
  )
802
 
 
 
803
  return {
804
  "module": normalize_module_name(str(parsed.get("module") or module_name)) or module_name,
 
 
805
  "summary": str(parsed.get("summary") or f"{module_name.upper()} module completed."),
806
- "findings": normalized_findings,
807
  "safe_rewrite": str(parsed.get("safe_rewrite") or ""),
808
- "manual_review_required": bool(parsed.get("manual_review_required", False) or not stage.get("ok")),
 
 
 
 
 
809
  }
810
 
811
 
812
  def synthesize_validation_output(
813
- router_output: dict[str, Any],
814
  module_outputs: dict[str, dict[str, Any]],
 
 
815
  ) -> dict[str, Any]:
816
  validated_findings: list[dict[str, Any]] = []
 
817
  safe_rewrite = ""
818
- manual_review_required = bool(router_output.get("manual_review_required"))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
819
 
820
- for module_name in PIPELINE_MODULE_ORDER:
821
  module_output = module_outputs.get(module_name)
822
  if not module_output:
823
  continue
824
- manual_review_required = manual_review_required or bool(module_output.get("manual_review_required"))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
825
  if not safe_rewrite and module_output.get("safe_rewrite"):
826
  safe_rewrite = str(module_output.get("safe_rewrite"))
 
827
  for finding in module_output.get("findings", []):
828
  if not isinstance(finding, dict):
829
  continue
@@ -832,25 +1038,82 @@ def synthesize_validation_output(
832
  "module": module_name,
833
  "issue": str(finding.get("issue") or "Unspecified issue"),
834
  "rule_ref": str(finding.get("rule_ref") or "Unknown"),
 
835
  "severity": str(finding.get("severity") or "ADVISORY").upper(),
836
- "confidence": int(finding.get("confidence") or 0),
837
  "why": str(finding.get("why") or "No explanation provided."),
838
  "fix": str(finding.get("fix") or "No fix provided."),
839
  }
840
  )
841
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
842
  has_high = any(severity_rank(item.get("severity", "")) >= 2 for item in validated_findings)
843
- risk_level = "high" if has_high else ("medium" if validated_findings or manual_review_required else "low")
844
- overall_verdict = "MANUAL_REVIEW" if manual_review_required else ("FAIL" if validated_findings else "PASS")
845
- summary = "No material issues identified." if not validated_findings else "Potential issues identified across selected regulatory modules."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
846
 
847
  return {
848
  "overall_verdict": overall_verdict,
849
  "risk_level": risk_level,
850
  "summary": summary,
 
851
  "validated_findings": validated_findings,
852
  "safe_rewrite": safe_rewrite,
853
- "conflicts": [],
 
 
 
 
 
 
 
 
 
 
854
  "manual_review_required": manual_review_required,
855
  }
856
 
@@ -858,27 +1121,35 @@ def synthesize_validation_output(
 def coerce_validation_output(
     stage: dict[str, Any],
     *,
-    router_output: dict[str, Any],
     module_outputs: dict[str, dict[str, Any]],
 ) -> dict[str, Any]:
     parsed = stage.get("parsed_output")
-    fallback = synthesize_validation_output(router_output, module_outputs)
     if not isinstance(parsed, dict):
         return fallback

-    validated_findings_raw = parsed.get("validated_findings")
     validated_findings: list[dict[str, Any]] = []
-    if isinstance(validated_findings_raw, list):
-        for finding in validated_findings_raw:
-            if not isinstance(finding, dict):
-                continue
             validated_findings.append(
                 {
-                    "module": normalize_module_name(str(finding.get("module") or "")) or "fca",
                     "issue": str(finding.get("issue") or "Unspecified issue"),
                     "rule_ref": str(finding.get("rule_ref") or "Unknown"),
                     "severity": str(finding.get("severity") or "ADVISORY").upper(),
-                    "confidence": int(finding.get("confidence") or 0),
                     "why": str(finding.get("why") or "No explanation provided."),
                     "fix": str(finding.get("fix") or "No fix provided."),
                 }
@@ -891,17 +1162,53 @@ def coerce_validation_output(
     if risk_level not in {"low", "medium", "high"}:
         risk_level = fallback["risk_level"]

     conflicts = parsed.get("conflicts")
     return {
         "overall_verdict": str(parsed.get("overall_verdict") or fallback["overall_verdict"]).upper(),
         "risk_level": risk_level,
         "summary": str(parsed.get("summary") or fallback["summary"]),
         "validated_findings": validated_findings,
         "safe_rewrite": str(parsed.get("safe_rewrite") or fallback["safe_rewrite"]),
         "conflicts": conflicts if isinstance(conflicts, list) else fallback["conflicts"],
-        "manual_review_required": bool(
-            parsed.get("manual_review_required", False) or fallback["manual_review_required"]
-        ),
     }

@@ -919,7 +1226,8 @@ def build_legacy_output(validation_output: dict[str, Any]) -> dict[str, Any]:
                 "fix": str(finding.get("fix") or "No fix provided."),
                 "module": str(finding.get("module") or "unknown"),
                 "severity": str(finding.get("severity") or "ADVISORY"),
-                "confidence": int(finding.get("confidence") or 0),
             }
         )

@@ -931,9 +1239,51 @@ def build_legacy_output(validation_output: dict[str, Any]) -> dict[str, Any]:
         "overall_verdict": validation_output.get("overall_verdict", "MANUAL_REVIEW"),
         "manual_review_required": bool(validation_output.get("manual_review_required", False)),
         "conflicts": validation_output.get("conflicts", []),
     }


 def run_review_pipeline(
     *,
     ad_text: str,
@@ -959,95 +1309,133 @@ def run_review_pipeline(
         metadata={"request_id": request_id, **(trace_metadata or {})},
         tags=["review-pipeline"],
     ) as (_run, outputs):
-        router_prompt = build_router_prompt(
-            ad_text=ad_text,
-            extra_context=extra_context,
-            image_at_path=image_at_path,
-            system_prompt=system_prompt,
-            request_id=request_id,
-        )
-        router_stage = run_named_stage(
-            "router",
-            router_prompt,
-            ad_text=ad_text,
-            request_id=request_id,
-            trace_metadata=trace_metadata,
-        )
-        router_output = coerce_router_output(router_stage, ad_text=ad_text, image_at_path=image_at_path)

-        selected_modules = [
-            module_name
-            for module_name in PIPELINE_MODULE_ORDER
-            if module_name in set(router_output.get("recommended_modules", []))
-        ]
-        if "fca" not in selected_modules:
-            selected_modules.insert(0, "fca")
-        selected_modules = dedupe_preserve_order(selected_modules)
-
-        module_stage_results: dict[str, dict[str, Any]] = {}
-        module_outputs: dict[str, dict[str, Any]] = {}
-        for module_name in selected_modules:
-            module_prompt = build_module_prompt(
-                module_name,
                 ad_text=ad_text,
                 extra_context=extra_context,
                 image_at_path=image_at_path,
                 system_prompt=system_prompt,
-                router_output=router_output,
                 request_id=request_id,
             )
-            module_stage = run_named_stage(
-                module_name,
-                module_prompt,
                 ad_text=ad_text,
                 request_id=request_id,
-                trace_metadata={"module": module_name, **(trace_metadata or {})},
             )
-            module_stage_results[module_name] = module_stage
-            module_outputs[module_name] = coerce_module_output(module_name, module_stage)

-        validation_prompt = build_validation_prompt(
-            ad_text=ad_text,
-            extra_context=extra_context,
-            image_at_path=image_at_path,
-            system_prompt=system_prompt,
-            router_output=router_output,
-            module_outputs=module_outputs,
-            request_id=request_id,
-        )
-        validation_stage = run_named_stage(
-            "validation",
-            validation_prompt,
-            ad_text=ad_text,
-            request_id=request_id,
-            trace_metadata=trace_metadata,
-        )
-        validation_output = coerce_validation_output(
-            validation_stage,
-            router_output=router_output,
-            module_outputs=module_outputs,
-        )
-        legacy_output = build_legacy_output(validation_output)

         pipeline_output = {
             "request_id": request_id,
             "input_mode": infer_input_mode(ad_text, image_at_path),
-            "router": {
-                "stage": router_stage,
-                "output": router_output,
-            },
-            "selected_modules": selected_modules,
-            "modules": {
-                module_name: {
-                    "stage": module_stage_results[module_name],
-                    "output": module_outputs[module_name],
-                }
-                for module_name in selected_modules
-            },
-            "validation": {
-                "stage": validation_stage,
-                "output": validation_output,
-            },
             "legacy_output": legacy_output,
         }
         outputs["pipeline_output"] = sanitize_for_langsmith(pipeline_output, ad_text=ad_text)
 MAX_IMAGE_BYTES = int(os.environ.get("MAX_IMAGE_BYTES", str(8 * 1024 * 1024)))
 MAX_BATCH_IMAGES = int(os.environ.get("MAX_BATCH_IMAGES", "20"))
 MAX_PARALLEL_WORKERS = max(1, int(os.environ.get("MAX_PARALLEL_WORKERS", "4")))
+PIPELINE_STAGE_WORKERS = max(1, int(os.environ.get("PIPELINE_STAGE_WORKERS", "4")))
+VALIDATION_RETRY_PASSES = max(0, int(os.environ.get("VALIDATION_RETRY_PASSES", "1")))
 LANGSMITH_PROJECT = os.environ.get("LANGSMITH_PROJECT", "regtechdemo-hf-v2")
 LANGSMITH_TRACE_USER_AD_COPY = (
     os.environ.get("LANGSMITH_TRACE_USER_AD_COPY", "true").strip().lower() == "true"
     "safe_rewrite": "optional ad rewrite",
 }
 PROMPT_FILE_MAP = {
+    "legal_basis": "legal_basis.md",
     "fca": "fca.md",
     "cma": "cma.md",
     "pra": "pra.md",
     "validation": "validation.md",
 }
+PIPELINE_STAGE_ORDER = ["legal_basis", "fca", "cma", "pra"]
+REGULATOR_STAGE_ORDER = ["fca", "cma", "pra"]
+ALL_REVIEW_STAGES = set(PIPELINE_STAGE_ORDER)
 PROMPT_CACHE: dict[str, str] = {}

 if os.environ.get("LANGSMITH_API_KEY") and ls is None:
     return "\n".join(parts)


+def build_parallel_stage_prompt(
+    stage_name: str,
     *,
     ad_text: str,
     extra_context: str,
     image_at_path: str | None,
     system_prompt: str,
+    pass_number: int,
+    prior_passes: list[dict[str, Any]] | None = None,
+    retry_context: dict[str, Any] | None = None,
     request_id: str | None = None,
 ) -> str:
     with traced_stage(
+        f"build_{stage_name}_prompt",
         "tool",
         inputs=sanitize_for_langsmith(
             {
+                "stage": stage_name,
                 "ad_text": ad_text,
                 "extra_context": extra_context,
                 "image_at_path": image_at_path,
                 "system_prompt": system_prompt,
+                "pass_number": pass_number,
+                "prior_passes": prior_passes or [],
+                "retry_context": retry_context or {},
             },
             ad_text=ad_text,
         ),
+        metadata={"request_id": request_id, "stage": stage_name, "pass_number": pass_number},
+        tags=["prompt-build", stage_name],
     ) as (_run, outputs):
         operator_override = get_operator_override(system_prompt)
         prompt = [
+            load_prompt_template(stage_name),
             "",
+            f"Pipeline pass: {pass_number}",
+            "This runtime uses Gemini CLI. When the prompt requires `google_web_search`, you must use it before finalizing if the tool is available.",
             "",
             build_submission_block(
                 ad_text=ad_text,
                 extra_context=extra_context,
                 image_at_path=image_at_path,
             ),
         ]
+        if prior_passes:
+            prompt += [
+                "",
+                "Prior pipeline pass history JSON:",
+                json.dumps(prior_passes, ensure_ascii=True, indent=2),
+            ]
+        if retry_context:
+            prompt += [
+                "",
+                "Validator retry context JSON:",
+                json.dumps(retry_context, ensure_ascii=True, indent=2),
+            ]
         if operator_override:
             prompt += ["", "Additional operator instructions:", operator_override]
         full_prompt = "\n".join(prompt).strip()
     extra_context: str,
     image_at_path: str | None,
     system_prompt: str,
+    pass_number: int,
+    legal_basis_output: dict[str, Any],
     module_outputs: dict[str, dict[str, Any]],
+    prior_passes: list[dict[str, Any]] | None = None,
+    retry_context: dict[str, Any] | None = None,
     request_id: str | None = None,
 ) -> str:
     with traced_stage(

                 "extra_context": extra_context,
                 "image_at_path": image_at_path,
                 "system_prompt": system_prompt,
+                "pass_number": pass_number,
+                "legal_basis_output": legal_basis_output,
                 "module_outputs": module_outputs,
+                "prior_passes": prior_passes or [],
+                "retry_context": retry_context or {},
             },
             ad_text=ad_text,
         ),
+        metadata={"request_id": request_id, "pass_number": pass_number},
         tags=["prompt-build", "validation"],
     ) as (_run, outputs):
         operator_override = get_operator_override(system_prompt)
         prompt = [
             load_prompt_template("validation"),
             "",
+            f"Pipeline pass: {pass_number}",
+            "This runtime uses Gemini CLI. When the prompt requires `google_web_search`, you must use it before finalizing if the tool is available.",
+            "",
+            "Legal basis output JSON:",
+            json.dumps(legal_basis_output, ensure_ascii=True, indent=2),
             "",
             "Module outputs JSON:",
             json.dumps(module_outputs, ensure_ascii=True, indent=2),

                 image_at_path=image_at_path,
             ),
         ]
+        if prior_passes:
+            prompt += [
+                "",
+                "Prior pipeline pass history JSON:",
+                json.dumps(prior_passes, ensure_ascii=True, indent=2),
+            ]
+        if retry_context:
+            prompt += [
+                "",
+                "Validator retry context JSON:",
+                json.dumps(retry_context, ensure_ascii=True, indent=2),
+            ]
         if operator_override:
             prompt += ["", "Additional operator instructions:", operator_override]
         full_prompt = "\n".join(prompt).strip()
         output.append(key)
     return output


 def stage_result(
     stage_name: str,
     ok: bool,
     return stage_result(stage_name, ok, status, result)


+def normalize_stage_name(stage_name: str) -> str:
+    value = str(stage_name or "").strip().lower()
+    return value if value in ALL_REVIEW_STAGES else ""
+
+
+def normalize_module_name(module_name: str) -> str:
+    value = str(module_name or "").strip().lower()
+    return value if value in REGULATOR_STAGE_ORDER else ""
+
+
+def normalize_applicability(value: Any) -> str:
+    normalized = str(value or "").strip().lower()
+    if normalized in {"apply", "not_apply", "uncertain"}:
+        return normalized
+    return "uncertain"
+
+
+def normalize_confidence(value: Any) -> float:
+    try:
+        numeric = float(value)
+    except (TypeError, ValueError):
+        return 0.0
+    if numeric < 0:
+        return 0.0
+    if numeric > 100:
+        return 100.0
+    return round(numeric, 2)
+
+
+def normalize_string_list(value: Any) -> list[str]:
+    if not isinstance(value, list):
+        return []
+    items = [str(item).strip() for item in value if str(item).strip()]
+    return dedupe_preserve_order(items)
+
+
+def normalize_source_verification(value: Any) -> dict[str, Any]:
+    if not isinstance(value, dict):
+        return {
+            "verification_timestamp": "",
+            "official_urls": [],
+            "google_web_search_used": False,
+            "manual_review_required": True,
+        }
+
+    official_urls = normalize_string_list(
+        value.get("official_urls")
+        or value.get("source_urls")
+        or value.get("urls")
+        or []
+    )
+    if not official_urls:
+        official_urls = dedupe_preserve_order(
+            normalize_string_list(value.get("handbook_urls"))
+            + normalize_string_list(value.get("policy_urls"))
+            + normalize_string_list(value.get("policy_statement_urls"))
+            + normalize_string_list(value.get("legislation_urls"))
+        )
+
     return {
+        "verification_timestamp": str(value.get("verification_timestamp") or ""),
+        "official_urls": official_urls,
+        "google_web_search_used": bool(value.get("google_web_search_used", False)),
+        "manual_review_required": bool(value.get("manual_review_required", False)),
+    }
+
+
+def normalize_finding(
+    finding: dict[str, Any],
+    *,
+    default_module: str,
+    default_authority_type: str = "unknown",
+) -> dict[str, Any]:
+    return {
+        "module": default_module,
+        "issue": str(finding.get("issue") or "Unspecified issue"),
+        "rule_ref": str(finding.get("rule_ref") or "Unknown"),
+        "source_url": str(finding.get("source_url") or ""),
+        "authority_type": str(finding.get("authority_type") or default_authority_type),
+        "severity": str(finding.get("severity") or "ADVISORY").upper(),
+        "confidence": normalize_confidence(finding.get("confidence")),
+        "why": str(finding.get("why") or "No explanation provided."),
+        "fix": str(finding.get("fix") or "No fix provided."),
+    }
+
+
+def default_legal_basis_output(ad_text: str, image_at_path: str | None) -> dict[str, Any]:
+    return {
+        "module": "legal_basis",
+        "summary": "Legal basis could not be determined reliably.",
         "input_mode": infer_input_mode(ad_text, image_at_path),
         "product_type": "unknown",
         "channel": "unknown",
         "audience": "unknown",
+        "promotion_scope": "uncertain",
+        "claimed_exemptions": [],
+        "applicability": {
+            "fca": "uncertain",
+            "cma": "uncertain",
+            "pra": "uncertain",
+        },
+        "legal_basis_findings": [
+            {
+                "module": "legal_basis",
+                "issue": "Legal basis could not be verified",
+                "rule_ref": "Perimeter / exemption verification required",
+                "source_url": "",
+                "authority_type": "verification",
+                "severity": "ADVISORY",
+                "confidence": 0.0,
+                "why": "The legal-basis stage failed or returned invalid JSON, so regulator applicability is uncertain.",
+                "fix": "Re-run with verified official sources or escalate to manual review.",
+            }
+        ],
+        "source_verification": {
+            "verification_timestamp": "",
+            "official_urls": [],
+            "google_web_search_used": False,
+            "manual_review_required": True,
+        },
         "manual_review_required": True,
     }


+def coerce_legal_basis_output(
     stage: dict[str, Any],
     *,
     ad_text: str,
     image_at_path: str | None,
 ) -> dict[str, Any]:
     parsed = stage.get("parsed_output")
+    fallback = default_legal_basis_output(ad_text, image_at_path)
     if not isinstance(parsed, dict):
         return fallback

+    claimed_exemptions: list[dict[str, Any]] = []
+    for item in parsed.get("claimed_exemptions", []):
+        if not isinstance(item, dict):
+            continue
+        status = str(item.get("status") or "uncertain").strip().lower()
+        if status not in {"claimed", "not_claimed", "uncertain"}:
+            status = "uncertain"
+        claimed_exemptions.append(
+            {
+                "name": str(item.get("name") or "Unknown"),
+                "status": status,
+                "evidence": str(item.get("evidence") or ""),
+            }
+        )
+
+    legal_basis_findings: list[dict[str, Any]] = []
+    for finding in parsed.get("legal_basis_findings", []):
+        if isinstance(finding, dict):
+            legal_basis_findings.append(
+                normalize_finding(
+                    finding,
+                    default_module="legal_basis",
+                    default_authority_type="verification",
+                )
+            )
+
+    source_verification = normalize_source_verification(parsed.get("source_verification"))
+    manual_review_required = bool(
+        parsed.get("manual_review_required", False)
+        or source_verification.get("manual_review_required", False)
+        or not stage.get("ok")
+    )
+
     return {
+        "module": "legal_basis",
+        "summary": str(parsed.get("summary") or fallback["summary"]),
         "input_mode": str(parsed.get("input_mode") or infer_input_mode(ad_text, image_at_path)),
         "product_type": str(parsed.get("product_type") or "unknown"),
         "channel": str(parsed.get("channel") or "unknown"),
         "audience": str(parsed.get("audience") or "unknown"),
+        "promotion_scope": str(parsed.get("promotion_scope") or "uncertain"),
+        "claimed_exemptions": claimed_exemptions,
+        "applicability": {
+            "fca": normalize_applicability(parsed.get("applicability", {}).get("fca") if isinstance(parsed.get("applicability"), dict) else None),
+            "cma": normalize_applicability(parsed.get("applicability", {}).get("cma") if isinstance(parsed.get("applicability"), dict) else None),
+            "pra": normalize_applicability(parsed.get("applicability", {}).get("pra") if isinstance(parsed.get("applicability"), dict) else None),
+        },
+        "legal_basis_findings": legal_basis_findings or fallback["legal_basis_findings"],
+        "source_verification": source_verification,
+        "manual_review_required": manual_review_required,
     }

     parsed = stage.get("parsed_output")
     fallback = {
         "module": module_name,
+        "applicability": "uncertain",
+        "why_applicable": f"{module_name.upper()} applicability could not be verified.",
         "summary": f"{module_name.upper()} module did not return valid JSON.",
         "findings": [],
         "safe_rewrite": "",
+        "source_verification": {
+            "verification_timestamp": "",
+            "official_urls": [],
+            "google_web_search_used": False,
+            "manual_review_required": True,
+        },
         "manual_review_required": True,
     }
     if not isinstance(parsed, dict):
         return fallback

+    findings: list[dict[str, Any]] = []
+    for finding in parsed.get("findings", []):
+        if isinstance(finding, dict):
+            findings.append(
+                normalize_finding(
+                    finding,
+                    default_module=module_name,
+                )
             )

+    source_verification = normalize_source_verification(parsed.get("source_verification"))
+
     return {
         "module": normalize_module_name(str(parsed.get("module") or module_name)) or module_name,
+        "applicability": normalize_applicability(parsed.get("applicability")),
+        "why_applicable": str(parsed.get("why_applicable") or ""),
         "summary": str(parsed.get("summary") or f"{module_name.upper()} module completed."),
+        "findings": findings,
         "safe_rewrite": str(parsed.get("safe_rewrite") or ""),
+        "source_verification": source_verification,
+        "manual_review_required": bool(
+            parsed.get("manual_review_required", False)
+            or source_verification.get("manual_review_required", False)
+            or not stage.get("ok")
+        ),
     }

 def synthesize_validation_output(
+    legal_basis_output: dict[str, Any],
     module_outputs: dict[str, dict[str, Any]],
+    *,
+    pass_number: int,
 ) -> dict[str, Any]:
     validated_findings: list[dict[str, Any]] = []
+    conflicts: list[str] = []
     safe_rewrite = ""
+    source_urls = list(legal_basis_output.get("source_verification", {}).get("official_urls", []))
+    google_web_search_used = bool(
+        legal_basis_output.get("source_verification", {}).get("google_web_search_used", False)
+    )
+    applicability_summary = {
+        module_name: normalize_applicability(
+            legal_basis_output.get("applicability", {}).get(module_name)
+        )
+        for module_name in REGULATOR_STAGE_ORDER
+    }
+    manual_review_required = bool(legal_basis_output.get("manual_review_required", False))
+
+    for finding in legal_basis_output.get("legal_basis_findings", []):
+        if isinstance(finding, dict):
+            validated_findings.append(
+                {
+                    "module": "legal_basis",
+                    "issue": str(finding.get("issue") or "Unspecified issue"),
+                    "rule_ref": str(finding.get("rule_ref") or "Unknown"),
+                    "source_url": str(finding.get("source_url") or ""),
+                    "severity": str(finding.get("severity") or "ADVISORY").upper(),
+                    "confidence": normalize_confidence(finding.get("confidence")),
+                    "why": str(finding.get("why") or "No explanation provided."),
+                    "fix": str(finding.get("fix") or "No fix provided."),
+                }
+            )

+    for module_name in REGULATOR_STAGE_ORDER:
         module_output = module_outputs.get(module_name)
         if not module_output:
             continue
+
+        module_applicability = normalize_applicability(module_output.get("applicability"))
+        source_verification = module_output.get("source_verification", {})
+        source_urls.extend(source_verification.get("official_urls", []))
+        google_web_search_used = google_web_search_used or bool(source_verification.get("google_web_search_used", False))
+
+        legal_basis_applicability = applicability_summary.get(module_name, "uncertain")
+        effective_applicability = legal_basis_applicability
+        if effective_applicability == "uncertain" and module_applicability != "uncertain":
+            effective_applicability = module_applicability
+        applicability_summary[module_name] = module_applicability
+
+        if (
+            legal_basis_applicability != "uncertain"
+            and module_applicability != "uncertain"
+            and legal_basis_applicability != module_applicability
+        ):
+            conflicts.append(
+                f"{module_name.upper()} applicability conflict: legal_basis={legal_basis_applicability}, module={module_applicability}."
+            )
+            manual_review_required = True
+
+        if effective_applicability != "apply":
+            if module_output.get("findings"):
+                conflicts.append(
+                    f"{module_name.upper()} returned findings while applicability is {effective_applicability}."
+                )
+                manual_review_required = True
+            manual_review_required = manual_review_required or bool(module_output.get("manual_review_required", False))
+            continue
+
         if not safe_rewrite and module_output.get("safe_rewrite"):
             safe_rewrite = str(module_output.get("safe_rewrite"))
+
         for finding in module_output.get("findings", []):
             if not isinstance(finding, dict):
                 continue
             validated_findings.append(
                 {
                     "module": module_name,
                     "issue": str(finding.get("issue") or "Unspecified issue"),
                     "rule_ref": str(finding.get("rule_ref") or "Unknown"),
+                    "source_url": str(finding.get("source_url") or ""),
                     "severity": str(finding.get("severity") or "ADVISORY").upper(),
+                    "confidence": normalize_confidence(finding.get("confidence")),
                     "why": str(finding.get("why") or "No explanation provided."),
                     "fix": str(finding.get("fix") or "No fix provided."),
                 }
             )

+        manual_review_required = manual_review_required or bool(module_output.get("manual_review_required", False))
+
+    deduped_findings: list[dict[str, Any]] = []
+    seen_finding_keys: set[tuple[str, str, str]] = set()
+    for finding in validated_findings:
+        key = (
+            str(finding.get("module") or ""),
+            str(finding.get("issue") or ""),
+            str(finding.get("rule_ref") or ""),
+        )
+        if key in seen_finding_keys:
+            continue
+        seen_finding_keys.add(key)
+        deduped_findings.append(finding)
+
+    validated_findings = deduped_findings
+    source_urls = dedupe_preserve_order([url for url in source_urls if url])
+    applicability_uncertain = any(
+        applicability_summary.get(module_name) == "uncertain" for module_name in REGULATOR_STAGE_ORDER
+    )
+    if applicability_uncertain:
+        manual_review_required = True
+
     has_high = any(severity_rank(item.get("severity", "")) >= 2 for item in validated_findings)
+    if validated_findings:
+        risk_level = "high" if has_high else "medium"
+        overall_verdict = "FAIL"
+        summary = "Validated issues remain after legal-basis and regulator arbitration."
+    elif manual_review_required:
+        risk_level = "medium"
+        overall_verdict = "MANUAL_REVIEW"
+        summary = "No definitive breach set can be returned safely; manual review is required."
+    else:
+        risk_level = "low"
+        overall_verdict = "PASS"
+        summary = "No material issues identified after legal-basis and regulator arbitration."
+
+    retry_required = pass_number <= VALIDATION_RETRY_PASSES and bool(
+        conflicts or applicability_uncertain or not google_web_search_used or not source_urls
+    )
+    retry_guidance: list[str] = []
+    if conflicts:
+        retry_guidance.append("Resolve applicability conflicts between legal basis and regulator modules.")
+    if applicability_uncertain:
+        retry_guidance.append("Verify whether any claimed exemption or perimeter route is actually available.")
+    if not google_web_search_used:
+        retry_guidance.append("Use google_web_search and cite official sources before finalizing.")
+    if not source_urls:
+        retry_guidance.append("Return official source URLs for legal basis and cited rules.")

     return {
         "overall_verdict": overall_verdict,
         "risk_level": risk_level,
         "summary": summary,
+        "applicability_summary": applicability_summary,
         "validated_findings": validated_findings,
         "safe_rewrite": safe_rewrite,
+        "conflicts": dedupe_preserve_order(conflicts),
+        "retry_required": retry_required,
+        "retry_targets": list(PIPELINE_STAGE_ORDER) if retry_required else [],
+        "retry_reason": "; ".join(dedupe_preserve_order(retry_guidance)),
+        "retry_guidance": dedupe_preserve_order(retry_guidance),
+        "source_verification": {
+            "verification_timestamp": "",
+            "official_urls": source_urls,
+            "google_web_search_used": google_web_search_used,
+            "manual_review_required": manual_review_required,
+        },
         "manual_review_required": manual_review_required,
     }

 def coerce_validation_output(
     stage: dict[str, Any],
     *,
+    legal_basis_output: dict[str, Any],
     module_outputs: dict[str, dict[str, Any]],
+    pass_number: int,
 ) -> dict[str, Any]:
     parsed = stage.get("parsed_output")
+    fallback = synthesize_validation_output(legal_basis_output, module_outputs, pass_number=pass_number)
     if not isinstance(parsed, dict):
         return fallback

+    applicability_summary_raw = parsed.get("applicability_summary")
+    applicability_summary = dict(fallback["applicability_summary"])
+    if isinstance(applicability_summary_raw, dict):
+        for module_name in REGULATOR_STAGE_ORDER:
+            applicability_summary[module_name] = normalize_applicability(applicability_summary_raw.get(module_name))
+
     validated_findings: list[dict[str, Any]] = []
+    for finding in parsed.get("validated_findings", []):
+        if isinstance(finding, dict):
+            normalized_module = str(finding.get("module") or "").strip().lower()
+            if normalized_module not in {"legal_basis", *REGULATOR_STAGE_ORDER}:
+                normalized_module = "legal_basis"
             validated_findings.append(
                 {
+                    "module": normalized_module,
                     "issue": str(finding.get("issue") or "Unspecified issue"),
                     "rule_ref": str(finding.get("rule_ref") or "Unknown"),
+                    "source_url": str(finding.get("source_url") or ""),
                     "severity": str(finding.get("severity") or "ADVISORY").upper(),
+                    "confidence": normalize_confidence(finding.get("confidence")),
                     "why": str(finding.get("why") or "No explanation provided."),
                     "fix": str(finding.get("fix") or "No fix provided."),
                 }

     if risk_level not in {"low", "medium", "high"}:
         risk_level = fallback["risk_level"]

+    source_verification = normalize_source_verification(parsed.get("source_verification"))
+    manual_review_required = bool(
+        parsed.get("manual_review_required", False)
+        or fallback["manual_review_required"]
+        or source_verification.get("manual_review_required", False)
+    )
+    retry_required = bool(parsed.get("retry_required", False) or fallback["retry_required"])
+    if pass_number > VALIDATION_RETRY_PASSES:
+        retry_required = False
+
+    retry_targets = [
+        normalize_stage_name(item)
+        for item in parsed.get("retry_targets", [])
+        if normalize_stage_name(item)
+    ]
+    if retry_required and not retry_targets:
+        retry_targets = list(PIPELINE_STAGE_ORDER)
+
     conflicts = parsed.get("conflicts")
+    retry_guidance = parsed.get("retry_guidance")
+
     return {
         "overall_verdict": str(parsed.get("overall_verdict") or fallback["overall_verdict"]).upper(),
         "risk_level": risk_level,
         "summary": str(parsed.get("summary") or fallback["summary"]),
+        "applicability_summary": applicability_summary,
         "validated_findings": validated_findings,
         "safe_rewrite": str(parsed.get("safe_rewrite") or fallback["safe_rewrite"]),
         "conflicts": conflicts if isinstance(conflicts, list) else fallback["conflicts"],
+        "retry_required": retry_required,
+        "retry_targets": retry_targets,
+        "retry_reason": str(parsed.get("retry_reason") or fallback["retry_reason"]),
+        "retry_guidance": retry_guidance if isinstance(retry_guidance, list) else fallback["retry_guidance"],
+        "source_verification": {
+            "verification_timestamp": str(
+                source_verification.get("verification_timestamp")
+                or fallback["source_verification"]["verification_timestamp"]
+            ),
+            "official_urls": source_verification.get("official_urls")
+            or fallback["source_verification"]["official_urls"],
+            "google_web_search_used": bool(
+                source_verification.get("google_web_search_used")
+                or fallback["source_verification"]["google_web_search_used"]
+            ),
+            "manual_review_required": manual_review_required,
+        },
+        "manual_review_required": manual_review_required,
     }

                 "fix": str(finding.get("fix") or "No fix provided."),
                 "module": str(finding.get("module") or "unknown"),
                 "severity": str(finding.get("severity") or "ADVISORY"),
+                "confidence": normalize_confidence(finding.get("confidence")),
+                "source_url": str(finding.get("source_url") or ""),
             }
         )

1239
  "overall_verdict": validation_output.get("overall_verdict", "MANUAL_REVIEW"),
1240
  "manual_review_required": bool(validation_output.get("manual_review_required", False)),
1241
  "conflicts": validation_output.get("conflicts", []),
1242
+ "applicability_summary": validation_output.get("applicability_summary", {}),
1243
+ "source_verification": validation_output.get("source_verification", {}),
1244
  }
1245
 
1246
 
+def execute_parallel_stage_group(
+    stage_prompts: dict[str, str],
+    *,
+    ad_text: str,
+    request_id: str,
+    trace_metadata: dict[str, Any] | None = None,
+) -> dict[str, dict[str, Any]]:
+    stage_results: dict[str, dict[str, Any]] = {}
+    if not stage_prompts:
+        return stage_results
+
+    worker_count = min(PIPELINE_STAGE_WORKERS, len(stage_prompts))
+    with ThreadPoolExecutor(max_workers=worker_count) as executor:
+        future_map = {
+            executor.submit(
+                run_named_stage,
+                stage_name,
+                prompt,
+                ad_text=ad_text,
+                request_id=request_id,
+                trace_metadata={"parallel_group": True, **(trace_metadata or {})},
+            ): stage_name
+            for stage_name, prompt in stage_prompts.items()
+        }
+        for future in as_completed(future_map):
+            stage_name = future_map[future]
+            try:
+                stage_results[stage_name] = future.result()
+            except Exception as err:
+                stage_results[stage_name] = {
+                    "stage": stage_name,
+                    "ok": False,
+                    "status": 500,
+                    "parsed_output": None,
+                    "raw_output": None,
+                    "error": f"Unexpected stage error: {err}",
+                }
+    return stage_results
+
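The new `execute_parallel_stage_group` helper is a standard `ThreadPoolExecutor` fan-out: submit every stage, key each future back to its stage name, and fold any worker exception into a per-stage error record instead of aborting the whole group. A minimal, self-contained sketch of the same pattern (the `run_stage` stub, stage names, and simulated failure are illustrative, not the server's real stages):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_stage(stage_name: str, prompt: str) -> dict:
    # Stand-in for the real stage runner: one stage deliberately fails.
    if stage_name == "cma":
        raise RuntimeError("simulated timeout")
    return {"stage": stage_name, "ok": True, "parsed_output": {"prompt": prompt}}


def execute_group(stage_prompts: dict[str, str], max_workers: int = 4) -> dict[str, dict]:
    results: dict[str, dict] = {}
    if not stage_prompts:
        return results
    with ThreadPoolExecutor(max_workers=min(max_workers, len(stage_prompts))) as executor:
        # Map each future back to its stage name so results can be keyed.
        future_map = {
            executor.submit(run_stage, name, prompt): name
            for name, prompt in stage_prompts.items()
        }
        for future in as_completed(future_map):
            name = future_map[future]
            try:
                results[name] = future.result()
            except Exception as err:
                # A failed stage becomes a structured error record, not an exception.
                results[name] = {"stage": name, "ok": False, "error": str(err)}
    return results


group = execute_group({"fca": "p1", "pra": "p2", "cma": "p3"})
print(sorted(group))       # ['cma', 'fca', 'pra']
print(group["cma"]["ok"])  # False
print(group["fca"]["ok"])  # True
```

Catching the exception inside the `as_completed` loop is what keeps one slow or failing regulator module from sinking the other stages in the same pass.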
 def run_review_pipeline(
     *,
     ad_text: str,

         metadata={"request_id": request_id, **(trace_metadata or {})},
         tags=["review-pipeline"],
     ) as (_run, outputs):
+        passes: list[dict[str, Any]] = []
+        retry_context: dict[str, Any] | None = None
+        final_validation_output: dict[str, Any] | None = None
+
+        for pass_number in range(1, VALIDATION_RETRY_PASSES + 2):
+            stage_prompts = {
+                stage_name: build_parallel_stage_prompt(
+                    stage_name,
+                    ad_text=ad_text,
+                    extra_context=extra_context,
+                    image_at_path=image_at_path,
+                    system_prompt=system_prompt,
+                    pass_number=pass_number,
+                    prior_passes=passes,
+                    retry_context=retry_context,
+                    request_id=request_id,
+                )
+                for stage_name in PIPELINE_STAGE_ORDER
+            }
+            stage_results = execute_parallel_stage_group(
+                stage_prompts,
+                ad_text=ad_text,
+                request_id=request_id,
+                trace_metadata={"pass_number": pass_number, **(trace_metadata or {})},
+            )
+
+            legal_basis_stage = stage_results.get("legal_basis") or {
+                "stage": "legal_basis",
+                "ok": False,
+                "status": 500,
+                "parsed_output": None,
+                "raw_output": None,
+                "error": "Legal basis stage missing.",
+            }
+            legal_basis_output = coerce_legal_basis_output(
+                legal_basis_stage,
+                ad_text=ad_text,
+                image_at_path=image_at_path,
+            )
+
+            module_stage_results: dict[str, dict[str, Any]] = {}
+            module_outputs: dict[str, dict[str, Any]] = {}
+            for module_name in REGULATOR_STAGE_ORDER:
+                module_stage = stage_results.get(module_name) or {
+                    "stage": module_name,
+                    "ok": False,
+                    "status": 500,
+                    "parsed_output": None,
+                    "raw_output": None,
+                    "error": f"{module_name.upper()} stage missing.",
+                }
+                module_stage_results[module_name] = module_stage
+                module_outputs[module_name] = coerce_module_output(module_name, module_stage)
+
+            validation_prompt = build_validation_prompt(
                 ad_text=ad_text,
                 extra_context=extra_context,
                 image_at_path=image_at_path,
                 system_prompt=system_prompt,
+                pass_number=pass_number,
+                legal_basis_output=legal_basis_output,
+                module_outputs=module_outputs,
+                prior_passes=passes,
+                retry_context=retry_context,
                 request_id=request_id,
             )
+            validation_stage = run_named_stage(
+                "validation",
+                validation_prompt,
                 ad_text=ad_text,
                 request_id=request_id,
+                trace_metadata={"pass_number": pass_number, **(trace_metadata or {})},
+            )
+            validation_output = coerce_validation_output(
+                validation_stage,
+                legal_basis_output=legal_basis_output,
+                module_outputs=module_outputs,
+                pass_number=pass_number,
             )
+
+            pass_record = {
+                "pass_number": pass_number,
+                "parallel_stage_order": list(PIPELINE_STAGE_ORDER),
+                "parallel_stages": {
+                    "legal_basis": {
+                        "stage": legal_basis_stage,
+                        "output": legal_basis_output,
+                    },
+                    **{
+                        module_name: {
+                            "stage": module_stage_results[module_name],
+                            "output": module_outputs[module_name],
+                        }
+                        for module_name in REGULATOR_STAGE_ORDER
+                    },
+                },
+                "validation": {
+                    "stage": validation_stage,
+                    "output": validation_output,
+                },
+            }
+            passes.append(pass_record)
+
+            if validation_output.get("retry_required") and pass_number <= VALIDATION_RETRY_PASSES:
+                retry_context = {
+                    "retry_reason": validation_output.get("retry_reason", ""),
+                    "retry_targets": validation_output.get("retry_targets", list(PIPELINE_STAGE_ORDER)),
+                    "retry_guidance": validation_output.get("retry_guidance", []),
+                    "prior_validation_output": validation_output,
+                }
+                continue
+
+            final_validation_output = validation_output
+            break
+
+        if final_validation_output is None:
+            final_validation_output = passes[-1]["validation"]["output"]
+
+        legacy_output = build_legacy_output(final_validation_output)
         pipeline_output = {
             "request_id": request_id,
             "input_mode": infer_input_mode(ad_text, image_at_path),
+            "parallel_stage_order": list(PIPELINE_STAGE_ORDER),
+            "retry_performed": len(passes) > 1,
+            "total_passes": len(passes),
+            "passes": passes,
+            "final_validation": final_validation_output,
             "legacy_output": legacy_output,
         }
         outputs["pipeline_output"] = sanitize_for_langsmith(pipeline_output, ad_text=ad_text)