redhairedshanks1 committed on
Commit
dd0b012
·
1 Parent(s): 1eebd15

Update services/agent_crewai.py

Browse files
Files changed (1) hide show
  1. services/agent_crewai.py +555 -526
services/agent_crewai.py CHANGED
@@ -1,526 +1,555 @@
1
- # services/agent_crewai.py
2
- """
3
- CrewAI-based agent for MasterLLM orchestration.
4
- """
5
- import json
6
- import os
7
- from typing import Optional, Dict, Any, List, Generator
8
-
9
- from crewai import Agent, Task, Crew, Process
10
- from crewai.tools import BaseTool
11
- from pydantic import BaseModel, Field
12
-
13
- # Import your remote utilities
14
- from utilities.extract_text import extract_text_remote
15
- from utilities.extract_tables import extract_tables_remote
16
- from utilities.describe_images import describe_images_remote
17
- from utilities.summarizer import summarize_remote
18
- from utilities.classify import classify_remote
19
- from utilities.ner import ner_remote
20
- from utilities.translator import translate_remote
21
- from utilities.signature_verification import signature_verification_remote
22
- from utilities.stamp_detection import stamp_detection_remote
23
-
24
-
25
- # ========================
26
- # TOOL INPUT SCHEMAS
27
- # ========================
28
-
29
- class FileSpanInput(BaseModel):
30
- file_path: str = Field(..., description="Absolute/local path to the uploaded file")
31
- start_page: int = Field(1, description="Start page (1-indexed)")
32
- end_page: int = Field(1, description="End page (inclusive, 1-indexed)")
33
-
34
-
35
- class TextOrFileInput(BaseModel):
36
- text: Optional[str] = Field(None, description="Raw text to process")
37
- file_path: Optional[str] = Field(None, description="Path to a document on disk (PDF/Image)")
38
- start_page: int = Field(1, description="Start page (1-indexed)")
39
- end_page: int = Field(1, description="End page (inclusive, 1-indexed)")
40
-
41
-
42
- class TranslateInput(TextOrFileInput):
43
- target_lang: str = Field(..., description="Target language code or name (e.g., 'es' or 'Spanish')")
44
-
45
-
46
- # ========================
47
- # HELPER FUNCTIONS
48
- # ========================
49
-
50
- def _base_state(file_path: str, start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
51
- """Build the base state your utilities expect."""
52
- filename = os.path.basename(file_path)
53
- return {
54
- "filename": filename,
55
- "temp_files": {filename: file_path},
56
- "start_page": start_page,
57
- "end_page": end_page,
58
- }
59
-
60
-
61
- # ========================
62
- # CREWAI TOOLS
63
- # ========================
64
-
65
- class ExtractTextTool(BaseTool):
66
- name: str = "extract_text"
67
- description: str = """Extract text from a document between start_page and end_page (inclusive).
68
- Use this when the user asks to read, analyze, or summarize document text.
69
- Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""
70
-
71
- def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
72
- state = _base_state(file_path, start_page, end_page)
73
- out = extract_text_remote(state)
74
- text = out.get("text") or out.get("extracted_text") or ""
75
- return json.dumps({"text": text})
76
-
77
-
78
- class ExtractTablesTool(BaseTool):
79
- name: str = "extract_tables"
80
- description: str = """Extract tables from a document between start_page and end_page.
81
- Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""
82
-
83
- def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
84
- state = _base_state(file_path, start_page, end_page)
85
- out = extract_tables_remote(state)
86
- tables = out.get("tables", [])
87
- return json.dumps({"tables": tables, "table_count": len(tables)})
88
-
89
-
90
- class DescribeImagesTool(BaseTool):
91
- name: str = "describe_images"
92
- description: str = """Generate captions/descriptions for images in the specified page range.
93
- Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""
94
-
95
- def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
96
- state = _base_state(file_path, start_page, end_page)
97
- out = describe_images_remote(state)
98
- return json.dumps({"image_descriptions": out.get("image_descriptions", out)})
99
-
100
-
101
- class SummarizeTextTool(BaseTool):
102
- name: str = "summarize_text"
103
- description: str = """Summarize either raw text or a document (by file_path + optional page span).
104
- Input should be a JSON object with: text (optional), file_path (optional), start_page (default 1), end_page (default 1).
105
- At least one of text or file_path must be provided."""
106
-
107
- def _run(
108
- self,
109
- text: Optional[str] = None,
110
- file_path: Optional[str] = None,
111
- start_page: int = 1,
112
- end_page: int = 1,
113
- ) -> str:
114
- state: Dict[str, Any] = {
115
- "text": text,
116
- "start_page": start_page,
117
- "end_page": end_page,
118
- }
119
- if file_path:
120
- state.update(_base_state(file_path, start_page, end_page))
121
- out = summarize_remote(state)
122
- return json.dumps({"summary": out.get("summary", out)})
123
-
124
-
125
- class ClassifyTextTool(BaseTool):
126
- name: str = "classify_text"
127
- description: str = """Classify a text or document content.
128
- Input should be a JSON object with: text (optional), file_path (optional), start_page (default 1), end_page (default 1).
129
- At least one of text or file_path must be provided."""
130
-
131
- def _run(
132
- self,
133
- text: Optional[str] = None,
134
- file_path: Optional[str] = None,
135
- start_page: int = 1,
136
- end_page: int = 1,
137
- ) -> str:
138
- state: Dict[str, Any] = {
139
- "text": text,
140
- "start_page": start_page,
141
- "end_page": end_page,
142
- }
143
- if file_path:
144
- state.update(_base_state(file_path, start_page, end_page))
145
- out = classify_remote(state)
146
- return json.dumps({"classification": out.get("classification", out)})
147
-
148
-
149
- class ExtractEntitesTool(BaseTool):
150
- name: str = "extract_entities"
151
- description: str = """Perform Named Entity Recognition (NER) on text or a document.
152
- Input should be a JSON object with: text (optional), file_path (optional), start_page (default 1), end_page (default 1).
153
- At least one of text or file_path must be provided."""
154
-
155
- def _run(
156
- self,
157
- text: Optional[str] = None,
158
- file_path: Optional[str] = None,
159
- start_page: int = 1,
160
- end_page: int = 1,
161
- ) -> str:
162
- state: Dict[str, Any] = {
163
- "text": text,
164
- "start_page": start_page,
165
- "end_page": end_page,
166
- }
167
- if file_path:
168
- state.update(_base_state(file_path, start_page, end_page))
169
- out = ner_remote(state)
170
- return json.dumps({"ner": out.get("ner", out)})
171
-
172
-
173
- class TranslateTextTool(BaseTool):
174
- name: str = "translate_text"
175
- description: str = """Translate text or a document to target_lang (e.g., 'es', 'fr', 'de', 'Spanish').
176
- Input should be a JSON object with: target_lang (required), text (optional), file_path (optional),
177
- start_page (default 1), end_page (default 1). At least one of text or file_path must be provided."""
178
-
179
- def _run(
180
- self,
181
- target_lang: str,
182
- text: Optional[str] = None,
183
- file_path: Optional[str] = None,
184
- start_page: int = 1,
185
- end_page: int = 1,
186
- ) -> str:
187
- state: Dict[str, Any] = {
188
- "text": text,
189
- "start_page": start_page,
190
- "end_page": end_page,
191
- "target_lang": target_lang,
192
- }
193
- if file_path:
194
- state.update(_base_state(file_path, start_page, end_page))
195
- out = translate_remote(state)
196
- return json.dumps({
197
- "translation": out.get("translation", out),
198
- "target_lang": target_lang
199
- })
200
-
201
-
202
- class SignatureVerificationTool(BaseTool):
203
- name: str = "signature_verification"
204
- description: str = """Verify signatures/stamps presence and authenticity indicators in specified page range.
205
- Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""
206
-
207
- def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
208
- state = _base_state(file_path, start_page, end_page)
209
- out = signature_verification_remote(state)
210
- return json.dumps({"signature_verification": out.get("signature_verification", out)})
211
-
212
-
213
- class StampDetectionTool(BaseTool):
214
- name: str = "stamp_detection"
215
- description: str = """Detect stamps in a document in the specified page range.
216
- Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""
217
-
218
- def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
219
- state = _base_state(file_path, start_page, end_page)
220
- out = stamp_detection_remote(state)
221
- return json.dumps({"stamp_detection": out.get("stamp_detection", out)})
222
-
223
-
224
- # ========================
225
- # TOOL REGISTRY
226
- # ========================
227
-
228
- def get_master_tools() -> List[BaseTool]:
229
- """Export all tools for CrewAI agent binding."""
230
- return [
231
- ExtractTextTool(),
232
- ExtractTablesTool(),
233
- DescribeImagesTool(),
234
- SummarizeTextTool(),
235
- ClassifyTextTool(),
236
- ExtractEntitesTool(),
237
- TranslateTextTool(),
238
- SignatureVerificationTool(),
239
- StampDetectionTool(),
240
- ]
241
-
242
-
243
- # ========================
244
- # AGENT CONFIGURATION
245
- # ========================
246
-
247
- SYSTEM_INSTRUCTIONS = """You are MasterLLM, a precise document processing agent.
248
-
249
- Your responsibilities:
250
- - Use tools for any action (extraction, tables, images, summarization, classification, NER, translation, signature verification, stamp detection).
251
- - If a tool requires file_path and the user didn't provide one, use the provided session_file_path.
252
- - Use page spans when relevant (start_page, end_page).
253
- - Combine results when needed (e.g., extract_text -> summarize_text; tables -> summarize_text).
254
- - If a PLAN is provided, follow it strictly unless it's impossible.
255
- - Keep outputs compact - do not include raw base64 or giant blobs.
256
- - Always return a final JSON result with:
257
- {
258
- "steps_executed": [...],
259
- "outputs": { ... },
260
- "errors": [],
261
- "meta": {
262
- "model": "crewai-gemini",
263
- "notes": "short note if needed"
264
- }
265
- }
266
- """
267
-
268
-
269
- def create_master_agent(session_file_path: str = "", plan_json: str = "{}") -> Agent:
270
- """Create the master document processing agent."""
271
- tools = get_master_tools()
272
-
273
- backstory = f"""{SYSTEM_INSTRUCTIONS}
274
-
275
- Current session file: {session_file_path}
276
- Execution plan: {plan_json}
277
- """
278
-
279
- # Use Google Gemini as the LLM
280
- # Free tier: 15 RPM, 1M TPM, 1500 RPD for gemini-1.5-flash
281
- # CrewAI supports Gemini via "gemini/model-name" format
282
- llm_model = os.getenv("CREWAI_LLM", "gemini/gemini-2.0-flash")
283
-
284
- agent = Agent(
285
- role="Document Processing Specialist",
286
- goal="Process documents according to the given plan using available tools, and return structured JSON results",
287
- backstory=backstory,
288
- tools=tools,
289
- verbose=True,
290
- allow_delegation=False,
291
- max_iter=12,
292
- llm=llm_model,
293
- )
294
-
295
- return agent
296
-
297
-
298
- def create_master_crew(
299
- user_input: str,
300
- session_file_path: str = "",
301
- plan: Optional[Dict[str, Any]] = None,
302
- ) -> Crew:
303
- """Create a crew with the master agent and a task based on user input."""
304
- plan_json = json.dumps(plan or {})
305
- agent = create_master_agent(session_file_path, plan_json)
306
-
307
- task_description = f"""
308
- Execute the following document processing request:
309
-
310
- User Request: {user_input}
311
-
312
- Session File Path: {session_file_path}
313
- Execution Plan: {plan_json}
314
-
315
- Instructions:
316
- 1. Follow the plan steps in order
317
- 2. Use the file path provided for all file-based operations
318
- 3. Combine results from multiple tools when appropriate
319
- 4. Return a comprehensive JSON result with all outputs
320
-
321
- Expected Output Format:
322
- {{
323
- "steps_executed": ["step1", "step2", ...],
324
- "outputs": {{
325
- "text": "...",
326
- "tables": [...],
327
- "summary": "...",
328
- // other outputs based on what was executed
329
- }},
330
- "errors": [],
331
- "meta": {{
332
- "model": "crewai-gemini",
333
- "pipeline": "{plan.get('pipeline', '') if plan else ''}",
334
- "pages_processed": "{plan.get('start_page', 1)}-{plan.get('end_page', 1) if plan else '1-1'}"
335
- }}
336
- }}
337
- """
338
-
339
- task = Task(
340
- description=task_description,
341
- expected_output="A JSON object containing all processed results, executed steps, and any errors",
342
- agent=agent,
343
- )
344
-
345
- crew = Crew(
346
- agents=[agent],
347
- tasks=[task],
348
- process=Process.sequential,
349
- verbose=True,
350
- )
351
-
352
- return crew
353
-
354
-
355
- # ========================
356
- # MAIN ENTRY POINTS
357
- # ========================
358
-
359
- def run_agent(
360
- user_input: str,
361
- session_file_path: Optional[str] = None,
362
- plan: Optional[Dict[str, Any]] = None,
363
- chat_history: Optional[List[Any]] = None,
364
- ) -> Dict[str, Any]:
365
- """
366
- Invokes the CrewAI agent to process the document.
367
- Returns a dict with the processing results.
368
- """
369
- crew = create_master_crew(
370
- user_input=user_input,
371
- session_file_path=session_file_path or "",
372
- plan=plan,
373
- )
374
-
375
- result = crew.kickoff()
376
-
377
- # Parse the result - CrewAI returns a CrewOutput object
378
- try:
379
- if hasattr(result, 'raw'):
380
- raw_output = result.raw
381
- else:
382
- raw_output = str(result)
383
-
384
- # Try to parse as JSON
385
- try:
386
- parsed = json.loads(raw_output)
387
- return {"output": parsed}
388
- except json.JSONDecodeError:
389
- # Try to extract JSON from the response
390
- import re
391
- json_match = re.search(r'\{.*\}', raw_output, re.DOTALL)
392
- if json_match:
393
- try:
394
- parsed = json.loads(json_match.group())
395
- return {"output": parsed}
396
- except json.JSONDecodeError:
397
- pass
398
-
399
- # Return as-is if not JSON
400
- return {"output": {"result": raw_output, "format": "text"}}
401
- except Exception as e:
402
- return {"output": {"error": str(e), "raw_result": str(result)}}
403
-
404
-
405
- def run_agent_streaming(
406
- user_input: str,
407
- session_file_path: Optional[str] = None,
408
- plan: Optional[Dict[str, Any]] = None,
409
- chat_history: Optional[List[Any]] = None,
410
- ) -> Generator[Dict[str, Any], None, None]:
411
- """
412
- Streaming version of run_agent that yields intermediate step updates.
413
- Each yield contains: {"type": "step"|"final", "data": {...}}
414
-
415
- Note: CrewAI doesn't have native streaming like LangChain's AgentExecutor,
416
- so we simulate it by yielding progress updates and then the final result.
417
- """
418
- import threading
419
- import queue
420
- import time
421
-
422
- result_queue: queue.Queue = queue.Queue()
423
-
424
- # Yield initial status
425
- yield {
426
- "type": "step",
427
- "step": 0,
428
- "status": "initializing",
429
- "tool": "crew_setup",
430
- "input_preview": f"Setting up pipeline: {plan.get('pipeline', 'unknown') if plan else 'unknown'}"
431
- }
432
-
433
- def run_crew():
434
- try:
435
- crew = create_master_crew(
436
- user_input=user_input,
437
- session_file_path=session_file_path or "",
438
- plan=plan,
439
- )
440
- result = crew.kickoff()
441
- result_queue.put(("success", result))
442
- except Exception as e:
443
- result_queue.put(("error", str(e)))
444
-
445
- # Start crew execution in a separate thread
446
- thread = threading.Thread(target=run_crew)
447
- thread.start()
448
-
449
- # Yield progress updates while waiting
450
- step_count = 1
451
- pipeline_steps = plan.get("pipeline", "").split("-") if plan else []
452
-
453
- for step_name in pipeline_steps:
454
- yield {
455
- "type": "step",
456
- "step": step_count,
457
- "status": "executing",
458
- "tool": step_name,
459
- "input_preview": f"Processing: {step_name}"
460
- }
461
- step_count += 1
462
-
463
- # Check if result is ready
464
- try:
465
- result_type, result_data = result_queue.get(timeout=2.0)
466
- break
467
- except queue.Empty:
468
- continue
469
-
470
- # Wait for completion if not already done
471
- thread.join(timeout=120) # Max 2 minutes timeout
472
-
473
- # Get final result
474
- try:
475
- if result_queue.empty():
476
- yield {
477
- "type": "error",
478
- "error": "Execution timeout - crew did not complete in time"
479
- }
480
- return
481
-
482
- result_type, result_data = result_queue.get_nowait()
483
-
484
- if result_type == "error":
485
- yield {
486
- "type": "error",
487
- "error": result_data
488
- }
489
- return
490
-
491
- # Parse the result
492
- try:
493
- if hasattr(result_data, 'raw'):
494
- raw_output = result_data.raw
495
- else:
496
- raw_output = str(result_data)
497
-
498
- # Try to parse as JSON
499
- try:
500
- parsed = json.loads(raw_output)
501
- except json.JSONDecodeError:
502
- import re
503
- json_match = re.search(r'\{.*\}', raw_output, re.DOTALL)
504
- if json_match:
505
- try:
506
- parsed = json.loads(json_match.group())
507
- except json.JSONDecodeError:
508
- parsed = {"result": raw_output, "format": "text"}
509
- else:
510
- parsed = {"result": raw_output, "format": "text"}
511
-
512
- yield {
513
- "type": "final",
514
- "data": parsed
515
- }
516
- except Exception as e:
517
- yield {
518
- "type": "final",
519
- "data": {"error": str(e), "raw_result": str(result_data)}
520
- }
521
-
522
- except queue.Empty:
523
- yield {
524
- "type": "error",
525
- "error": "No result received from crew execution"
526
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # services/agent_crewai.py
2
+ """
3
+ CrewAI-based agent for MasterLLM orchestration.
4
+ """
5
+ import json
6
+ import os
7
+ from typing import Optional, Dict, Any, List, Generator
8
+
9
+ from crewai import Agent, Task, Crew, Process
10
+ from crewai.tools import BaseTool
11
+ from pydantic import BaseModel, Field
12
+
13
+ # Import your remote utilities
14
+ from utilities.extract_text import extract_text_remote
15
+ from utilities.extract_tables import extract_tables_remote
16
+ from utilities.describe_images import describe_images_remote
17
+ from utilities.summarizer import summarize_remote
18
+ from utilities.classify import classify_remote
19
+ from utilities.ner import ner_remote
20
+ from utilities.translator import translate_remote
21
+ from utilities.signature_verification import signature_verification_remote
22
+ from utilities.stamp_detection import stamp_detection_remote
23
+
24
+
25
+ # ========================
26
+ # TOOL INPUT SCHEMAS
27
+ # ========================
28
+
29
class FileSpanInput(BaseModel):
    """Input schema for tools that read a file over an inclusive, 1-indexed page span."""
    # NOTE(review): not wired to any tool via args_schema in this module — confirm
    # whether CrewAI should validate tool args against this schema.
    file_path: str = Field(..., description="Absolute/local path to the uploaded file")
    start_page: int = Field(1, description="Start page (1-indexed)")
    end_page: int = Field(1, description="End page (inclusive, 1-indexed)")
33
+
34
+
35
class TextOrFileInput(BaseModel):
    """Input schema for tools that accept either raw text or a document on disk.

    At least one of ``text`` or ``file_path`` is expected by the tools that use
    this shape; the page span applies only when a file is supplied.
    """
    # NOTE(review): not wired to any tool via args_schema in this module — confirm intent.
    text: Optional[str] = Field(None, description="Raw text to process")
    file_path: Optional[str] = Field(None, description="Path to a document on disk (PDF/Image)")
    start_page: int = Field(1, description="Start page (1-indexed)")
    end_page: int = Field(1, description="End page (inclusive, 1-indexed)")
40
+
41
+
42
class TranslateInput(TextOrFileInput):
    """Translation input: text-or-file plus the required target language."""
    target_lang: str = Field(..., description="Target language code or name (e.g., 'es' or 'Spanish')")
44
+
45
+
46
+ # ========================
47
+ # HELPER FUNCTIONS
48
+ # ========================
49
+
50
+ def _base_state(file_path: str, start_page: int = 1, end_page: int = 1) -> Dict[str, Any]:
51
+ """Build the base state your utilities expect."""
52
+ filename = os.path.basename(file_path)
53
+ return {
54
+ "filename": filename,
55
+ "temp_files": {filename: file_path},
56
+ "start_page": start_page,
57
+ "end_page": end_page,
58
+ }
59
+
60
+
61
+ # ========================
62
+ # CREWAI TOOLS
63
+ # ========================
64
+
65
class ExtractTextTool(BaseTool):
    """CrewAI tool wrapping the remote text-extraction utility."""

    name: str = "extract_text"
    description: str = """Extract text from a document between start_page and end_page (inclusive).
Use this when the user asks to read, analyze, or summarize document text.
Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""

    def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
        """Extract text for the page span and return it as a JSON string."""
        result = extract_text_remote(_base_state(file_path, start_page, end_page))
        # The remote may report under either key; fall back to the empty string.
        extracted = result.get("text") or result.get("extracted_text") or ""
        return json.dumps({"text": extracted})
76
+
77
+
78
class ExtractTablesTool(BaseTool):
    """CrewAI tool wrapping the remote table-extraction utility."""

    name: str = "extract_tables"
    description: str = """Extract tables from a document between start_page and end_page.
Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""

    def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
        """Extract tables for the page span; returns JSON with tables and their count."""
        result = extract_tables_remote(_base_state(file_path, start_page, end_page))
        found = result.get("tables", [])
        return json.dumps({"tables": found, "table_count": len(found)})
88
+
89
+
90
class DescribeImagesTool(BaseTool):
    """CrewAI tool wrapping the remote image-captioning utility."""

    name: str = "describe_images"
    description: str = """Generate captions/descriptions for images in the specified page range.
Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""

    def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
        """Describe images for the page span; falls back to the whole remote payload."""
        result = describe_images_remote(_base_state(file_path, start_page, end_page))
        return json.dumps({"image_descriptions": result.get("image_descriptions", result)})
99
+
100
+
101
class SummarizeTextTool(BaseTool):
    """CrewAI tool wrapping the remote summarizer; accepts raw text or a file span."""

    name: str = "summarize_text"
    description: str = """Summarize either raw text or a document (by file_path + optional page span).
Input should be a JSON object with: text (optional), file_path (optional), start_page (default 1), end_page (default 1).
At least one of text or file_path must be provided."""

    def _run(
        self,
        text: Optional[str] = None,
        file_path: Optional[str] = None,
        start_page: int = 1,
        end_page: int = 1,
    ) -> str:
        """Summarize text (or a file span) and return JSON with the summary."""
        payload: Dict[str, Any] = {
            "text": text,
            "start_page": start_page,
            "end_page": end_page,
        }
        # A file path overrides/augments the payload with filename + temp_files keys.
        if file_path:
            payload.update(_base_state(file_path, start_page, end_page))
        result = summarize_remote(payload)
        return json.dumps({"summary": result.get("summary", result)})
123
+
124
+
125
class ClassifyTextTool(BaseTool):
    """CrewAI tool wrapping the remote classifier; accepts raw text or a file span."""

    name: str = "classify_text"
    description: str = """Classify a text or document content.
Input should be a JSON object with: text (optional), file_path (optional), start_page (default 1), end_page (default 1).
At least one of text or file_path must be provided."""

    def _run(
        self,
        text: Optional[str] = None,
        file_path: Optional[str] = None,
        start_page: int = 1,
        end_page: int = 1,
    ) -> str:
        """Classify text (or a file span) and return JSON with the classification."""
        payload: Dict[str, Any] = {
            "text": text,
            "start_page": start_page,
            "end_page": end_page,
        }
        # A file path overrides/augments the payload with filename + temp_files keys.
        if file_path:
            payload.update(_base_state(file_path, start_page, end_page))
        result = classify_remote(payload)
        return json.dumps({"classification": result.get("classification", result)})
147
+
148
+
149
class ExtractEntitesTool(BaseTool):
    """CrewAI tool wrapping the remote NER utility; accepts raw text or a file span.

    NOTE(review): class name is misspelled ("Entites"); kept as-is because it is
    referenced by name in get_master_tools() — renaming would break callers.
    """

    name: str = "extract_entities"
    description: str = """Perform Named Entity Recognition (NER) on text or a document.
Input should be a JSON object with: text (optional), file_path (optional), start_page (default 1), end_page (default 1).
At least one of text or file_path must be provided."""

    def _run(
        self,
        text: Optional[str] = None,
        file_path: Optional[str] = None,
        start_page: int = 1,
        end_page: int = 1,
    ) -> str:
        """Run NER on text (or a file span) and return JSON with the entities."""
        payload: Dict[str, Any] = {
            "text": text,
            "start_page": start_page,
            "end_page": end_page,
        }
        # A file path overrides/augments the payload with filename + temp_files keys.
        if file_path:
            payload.update(_base_state(file_path, start_page, end_page))
        result = ner_remote(payload)
        return json.dumps({"ner": result.get("ner", result)})
171
+
172
+
173
class TranslateTextTool(BaseTool):
    """CrewAI tool wrapping the remote translator; accepts raw text or a file span."""

    name: str = "translate_text"
    description: str = """Translate text or a document to target_lang (e.g., 'es', 'fr', 'de', 'Spanish').
Input should be a JSON object with: target_lang (required), text (optional), file_path (optional),
start_page (default 1), end_page (default 1). At least one of text or file_path must be provided."""

    def _run(
        self,
        target_lang: str,
        text: Optional[str] = None,
        file_path: Optional[str] = None,
        start_page: int = 1,
        end_page: int = 1,
    ) -> str:
        """Translate text (or a file span) into target_lang; returns JSON."""
        payload: Dict[str, Any] = {
            "text": text,
            "start_page": start_page,
            "end_page": end_page,
            "target_lang": target_lang,
        }
        # A file path overrides/augments the payload with filename + temp_files keys.
        if file_path:
            payload.update(_base_state(file_path, start_page, end_page))
        result = translate_remote(payload)
        return json.dumps({
            "translation": result.get("translation", result),
            "target_lang": target_lang
        })
200
+
201
+
202
class SignatureVerificationTool(BaseTool):
    """CrewAI tool wrapping the remote signature-verification utility."""

    name: str = "signature_verification"
    description: str = """Verify signatures/stamps presence and authenticity indicators in specified page range.
Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""

    def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
        """Verify signatures in the page span; returns the remote result as JSON."""
        result = signature_verification_remote(_base_state(file_path, start_page, end_page))
        return json.dumps({"signature_verification": result.get("signature_verification", result)})
211
+
212
+
213
class StampDetectionTool(BaseTool):
    """CrewAI tool wrapping the remote stamp-detection utility."""

    name: str = "stamp_detection"
    description: str = """Detect stamps in a document in the specified page range.
Input should be a JSON object with: file_path (required), start_page (default 1), end_page (default 1)."""

    def _run(self, file_path: str, start_page: int = 1, end_page: int = 1) -> str:
        """Detect stamps in the page span; returns the remote result as JSON."""
        result = stamp_detection_remote(_base_state(file_path, start_page, end_page))
        return json.dumps({"stamp_detection": result.get("stamp_detection", result)})
222
+
223
+
224
+ # ========================
225
+ # TOOL REGISTRY
226
+ # ========================
227
+
228
def get_master_tools() -> List[BaseTool]:
    """Instantiate and return every document-processing tool for agent binding."""
    tool_classes = (
        ExtractTextTool,
        ExtractTablesTool,
        DescribeImagesTool,
        SummarizeTextTool,
        ClassifyTextTool,
        ExtractEntitesTool,
        TranslateTextTool,
        SignatureVerificationTool,
        StampDetectionTool,
    )
    # Fresh instances on every call so agents never share tool state.
    return [cls() for cls in tool_classes]
241
+
242
+
243
+ # ========================
244
+ # AGENT CONFIGURATION
245
+ # ========================
246
+
247
# Prompt fragment injected verbatim into the agent's backstory by
# create_master_agent(). This is runtime-significant text sent to the LLM —
# edit the wording with care.
SYSTEM_INSTRUCTIONS = """You are MasterLLM, a precise document processing agent.

Your responsibilities:
- Use tools for any action (extraction, tables, images, summarization, classification, NER, translation, signature verification, stamp detection).
- If a tool requires file_path and the user didn't provide one, use the provided session_file_path.
- Use page spans when relevant (start_page, end_page).
- Combine results when needed (e.g., extract_text -> summarize_text; tables -> summarize_text).
- If a PLAN is provided, follow it strictly unless it's impossible.
- Keep outputs compact - do not include raw base64 or giant blobs.
- Always return a final JSON result with:
{
  "steps_executed": [...],
  "outputs": { ... },
  "errors": [],
  "meta": {
    "model": "crewai-gemini",
    "notes": "short note if needed"
  }
}
"""
267
+
268
+
269
def create_master_agent(session_file_path: str = "", plan_json: str = "{}") -> Agent:
    """Create the master document processing agent.

    Args:
        session_file_path: Path to the current session's uploaded file ("" if none).
        plan_json: JSON-encoded execution plan injected into the backstory.

    Returns:
        A CrewAI Agent bound to all document-processing tools.
    """
    tools = get_master_tools()

    backstory = f"""{SYSTEM_INSTRUCTIONS}

Current session file: {session_file_path}
Execution plan: {plan_json}
"""

    # Single source of truth for the model id. The previous implementation
    # honored CREWAI_LLM only in the fallback path while the LiteLLM wrapper
    # hard-coded gemini-2.0-flash; both paths now use the configured model.
    default_model = os.getenv("CREWAI_LLM", "gemini/gemini-2.0-flash")

    # Initialize the LLM explicitly via a LiteLLM-backed LangChain wrapper.
    # This works even if google-generativeai is not installed.
    try:
        from litellm import completion
        from langchain.llms.base import LLM

        class LiteLLMWrapper(LLM):
            """Wrapper for LiteLLM to use with CrewAI."""

            model_name: str = default_model

            @property
            def _llm_type(self) -> str:
                return "litellm"

            def _call(
                self,
                prompt: str,
                stop: Optional[List[str]] = None,
                **kwargs: Any,
            ) -> str:
                # GOOGLE_API_KEY takes precedence; GEMINI_API_KEY is the fallback.
                response = completion(
                    model=self.model_name,
                    messages=[{"role": "user", "content": prompt}],
                    api_key=os.getenv("GOOGLE_API_KEY") or os.getenv("GEMINI_API_KEY"),
                )
                return response.choices[0].message.content

        llm = LiteLLMWrapper()
    except Exception as e:
        # Broad on purpose: any wrapper-setup failure (missing litellm/langchain,
        # version incompatibilities) falls back to CrewAI's string-based LLM spec.
        print(f"Warning: Could not initialize LiteLLM wrapper: {e}")
        print("Falling back to string-based LLM (may require google-generativeai)")
        llm = default_model

    agent = Agent(
        role="Document Processing Specialist",
        goal="Process documents according to the given plan using available tools, and return structured JSON results",
        backstory=backstory,
        tools=tools,
        verbose=True,
        allow_delegation=False,
        max_iter=12,
        llm=llm,
    )

    return agent
325
+
326
+
327
def create_master_crew(
    user_input: str,
    session_file_path: str = "",
    plan: Optional[Dict[str, Any]] = None,
) -> Crew:
    """Create a crew with the master agent and a task based on user input.

    Args:
        user_input: The natural-language processing request.
        session_file_path: Path of the uploaded session document ("" if none).
        plan: Optional execution plan (keys such as 'pipeline', 'start_page', 'end_page').

    Returns:
        A single-agent, single-task sequential Crew ready for kickoff().
    """
    plan_json = json.dumps(plan or {})
    agent = create_master_agent(session_file_path, plan_json)

    # Compute meta fields up front. The previous inline f-string evaluated
    # plan.get('start_page', 1) unconditionally, raising AttributeError when
    # plan was None (the `if plan else` only guarded the end_page half).
    if plan:
        pipeline = plan.get('pipeline', '')
        pages_processed = f"{plan.get('start_page', 1)}-{plan.get('end_page', 1)}"
    else:
        pipeline = ''
        pages_processed = '1-1'

    task_description = f"""
Execute the following document processing request:

User Request: {user_input}

Session File Path: {session_file_path}
Execution Plan: {plan_json}

Instructions:
1. Follow the plan steps in order
2. Use the file path provided for all file-based operations
3. Combine results from multiple tools when appropriate
4. Return a comprehensive JSON result with all outputs

Expected Output Format:
{{
  "steps_executed": ["step1", "step2", ...],
  "outputs": {{
    "text": "...",
    "tables": [...],
    "summary": "...",
    // other outputs based on what was executed
  }},
  "errors": [],
  "meta": {{
    "model": "crewai-gemini",
    "pipeline": "{pipeline}",
    "pages_processed": "{pages_processed}"
  }}
}}
"""

    task = Task(
        description=task_description,
        expected_output="A JSON object containing all processed results, executed steps, and any errors",
        agent=agent,
    )

    crew = Crew(
        agents=[agent],
        tasks=[task],
        process=Process.sequential,
        verbose=True,
    )

    return crew
382
+
383
+
384
+ # ========================
385
+ # MAIN ENTRY POINTS
386
+ # ========================
387
+
388
def run_agent(
    user_input: str,
    session_file_path: Optional[str] = None,
    plan: Optional[Dict[str, Any]] = None,
    chat_history: Optional[List[Any]] = None,
) -> Dict[str, Any]:
    """
    Invokes the CrewAI agent to process the document.
    Returns a dict with the processing results.
    """
    crew = create_master_crew(
        user_input=user_input,
        session_file_path=session_file_path or "",
        plan=plan,
    )

    result = crew.kickoff()

    # CrewAI returns a CrewOutput object; its .raw attribute (when present)
    # carries the agent's final answer as text.
    try:
        raw_output = result.raw if hasattr(result, 'raw') else str(result)

        # First attempt: the whole output is already valid JSON.
        try:
            return {"output": json.loads(raw_output)}
        except json.JSONDecodeError:
            pass

        # Second attempt: pull the outermost {...} span out of surrounding prose.
        import re
        match = re.search(r'\{.*\}', raw_output, re.DOTALL)
        if match is not None:
            try:
                return {"output": json.loads(match.group())}
            except json.JSONDecodeError:
                pass

        # Fallback: hand back the raw text untouched.
        return {"output": {"result": raw_output, "format": "text"}}
    except Exception as exc:
        return {"output": {"error": str(exc), "raw_result": str(result)}}
432
+
433
+
434
def run_agent_streaming(
    user_input: str,
    session_file_path: Optional[str] = None,
    plan: Optional[Dict[str, Any]] = None,
    chat_history: Optional[List[Any]] = None,
) -> Generator[Dict[str, Any], None, None]:
    """
    Streaming version of run_agent that yields intermediate step updates.
    Each yield contains: {"type": "step"|"final"|"error", ...}

    Note: CrewAI doesn't have native streaming like LangChain's AgentExecutor,
    so progress events are simulated from the plan's pipeline steps while the
    crew runs in a background thread.
    """
    import threading
    import queue

    result_queue: queue.Queue = queue.Queue()

    # Yield initial status
    yield {
        "type": "step",
        "step": 0,
        "status": "initializing",
        "tool": "crew_setup",
        "input_preview": f"Setting up pipeline: {plan.get('pipeline', 'unknown') if plan else 'unknown'}"
    }

    def run_crew():
        # Worker thread: reports ("success", CrewOutput) or ("error", message)
        # back through result_queue.
        try:
            crew = create_master_crew(
                user_input=user_input,
                session_file_path=session_file_path or "",
                plan=plan,
            )
            result_queue.put(("success", crew.kickoff()))
        except Exception as e:
            result_queue.put(("error", str(e)))

    thread = threading.Thread(target=run_crew)
    thread.start()

    # Yield simulated per-step progress while the crew works in the background.
    outcome = None  # (type, data) once the worker has reported back
    pipeline_steps = plan.get("pipeline", "").split("-") if plan else []

    for step_count, step_name in enumerate(pipeline_steps, start=1):
        yield {
            "type": "step",
            "step": step_count,
            "status": "executing",
            "tool": step_name,
            "input_preview": f"Processing: {step_name}"
        }
        try:
            # Poll briefly so we stop simulating as soon as real work is done.
            outcome = result_queue.get(timeout=2.0)
            break
        except queue.Empty:
            continue

    # BUG FIX: the original re-checked result_queue.empty() here, but a result
    # consumed *inside* the loop above left the queue empty, so successful runs
    # were misreported as timeouts. Only touch the queue again if the worker
    # has not reported yet.
    if outcome is None:
        thread.join(timeout=120)  # Max 2 minutes timeout
        try:
            outcome = result_queue.get_nowait()
        except queue.Empty:
            yield {
                "type": "error",
                "error": "Execution timeout - crew did not complete in time"
            }
            return

    result_type, result_data = outcome

    if result_type == "error":
        yield {
            "type": "error",
            "error": result_data
        }
        return

    # Parse the final crew output, preferring structured JSON.
    try:
        raw_output = result_data.raw if hasattr(result_data, 'raw') else str(result_data)

        # Try to parse as JSON; fall back to extracting an embedded {...} span,
        # and finally to returning the raw text.
        try:
            parsed = json.loads(raw_output)
        except json.JSONDecodeError:
            import re
            parsed = {"result": raw_output, "format": "text"}
            json_match = re.search(r'\{.*\}', raw_output, re.DOTALL)
            if json_match:
                try:
                    parsed = json.loads(json_match.group())
                except json.JSONDecodeError:
                    pass

        yield {
            "type": "final",
            "data": parsed
        }
    except Exception as e:
        yield {
            "type": "final",
            "data": {"error": str(e), "raw_result": str(result_data)}
        }