File size: 22,791 Bytes
8a8a771
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
import requests
import json
import time
from typing import Dict, List, Optional, Tuple

class GraniteAgent:
    def __init__(self, config):
        self.config = config
        self.base_url = config.OLLAMA_BASE_URL
        self.model = config.GRANITE_MODEL
        self.max_retries = 3
        self.retry_delay = 2  # seconds
        
    def assign_wellness_tasks(self, user_profile: Dict, mental_health_assessment: str, risk_level: int) -> List[Dict]:
        """Assign wellness tasks based on mental health assessment using Granite AI"""
        
        # First attempt: Comprehensive AI generation
        tasks = self._generate_ai_tasks(user_profile, mental_health_assessment, risk_level)
        
        if tasks:
            return tasks
            
        # Second attempt: Simplified AI generation with retry
        tasks = self._generate_simplified_tasks(user_profile, risk_level)
        
        if tasks:
            return tasks
            
        # Final fallback: Basic AI generation with minimal context
        tasks = self._generate_basic_tasks(risk_level)
        
        if tasks:
            return tasks
            
        # Emergency fallback: Only if AI is completely unavailable
        print("WARNING: Granite AI completely unavailable, using emergency preset tasks")
        return self._get_emergency_preset_tasks(risk_level)
    
    def _generate_ai_tasks(self, user_profile: Dict, mental_health_assessment: str, risk_level: int) -> Optional[List[Dict]]:
        """Primary method: Generate comprehensive personalized tasks using Granite AI"""
        
        prompt = self._build_comprehensive_prompt(user_profile, mental_health_assessment, risk_level)
        
        for attempt in range(self.max_retries):
            try:
                response = self._call_granite_api(prompt, temperature=0.5)
                
                if response:
                    tasks = self._extract_and_validate_tasks(response, risk_level)
                    if tasks and len(tasks) >= 3:  # Ensure we have enough tasks
                        print(f"βœ“ Generated {len(tasks)} comprehensive AI tasks")
                        return tasks
                        
            except Exception as e:
                print(f"Attempt {attempt + 1} failed for comprehensive generation: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
        
        return None
    
    def _generate_simplified_tasks(self, user_profile: Dict, risk_level: int) -> Optional[List[Dict]]:
        """Fallback method: Generate simplified tasks with reduced context"""
        
        prompt = self._build_simplified_prompt(user_profile, risk_level)
        
        for attempt in range(self.max_retries):
            try:
                response = self._call_granite_api(prompt, temperature=0.4)
                
                if response:
                    tasks = self._extract_and_validate_tasks(response, risk_level)
                    if tasks and len(tasks) >= 2:
                        print(f"βœ“ Generated {len(tasks)} simplified AI tasks")
                        return tasks
                        
            except Exception as e:
                print(f"Attempt {attempt + 1} failed for simplified generation: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
        
        return None
    
    def _generate_basic_tasks(self, risk_level: int) -> Optional[List[Dict]]:
        """Secondary fallback: Generate basic tasks with minimal context"""
        
        prompt = self._build_basic_prompt(risk_level)
        
        for attempt in range(self.max_retries):
            try:
                response = self._call_granite_api(prompt, temperature=0.3)
                
                if response:
                    tasks = self._extract_and_validate_tasks(response, risk_level, min_tasks=1)
                    if tasks:
                        print(f"βœ“ Generated {len(tasks)} basic AI tasks")
                        return tasks
                        
            except Exception as e:
                print(f"Attempt {attempt + 1} failed for basic generation: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.retry_delay)
        
        return None
    
    def _build_comprehensive_prompt(self, user_profile: Dict, mental_health_assessment: str, risk_level: int) -> str:
        """Build detailed prompt for comprehensive task generation"""
        
        return f"""

You are a professional wellness coach. Create personalized wellness tasks based on user data.



GUIDELINES:

- Use professional, clear language

- Focus on evidence-based wellness practices

- Ensure all recommendations are safe and appropriate

- Use inclusive, respectful language



USER CONTEXT:

Risk Level: {risk_level}/10 (10 = highest risk)

Assessment: {mental_health_assessment}



USER PROFILE:

- Stress Level: {user_profile.get('Stress_Level', 'Not specified')}

- Sleep: {user_profile.get('Sleep_Hours', 'Not specified')} hours/night, quality: {user_profile.get('Sleep_Quality', 'Not specified')}

- Work: {user_profile.get('Work_Hours', 'Not specified')} hours/week

- Exercise: {user_profile.get('Physical_Activity_Hours', 'Not specified')} hours/week

- Occupation: {user_profile.get('Occupation', 'Not specified')}

- Age: {user_profile.get('Age', 'Not specified')}

- Mood: {user_profile.get('Mood', 'Not specified')}

- Anxiety: {user_profile.get('Anxiety_Frequency', 'Not specified')}

- Energy: {user_profile.get('Energy_Level', 'Not specified')}



TASK REQUIREMENTS:

{self._get_risk_specific_requirements(risk_level)}



Generate 4-6 personalized wellness tasks. Each task must be:

1. Relevant to their specific situation

2. Practical and achievable

3. Evidence-based for mental health improvement

4. Appropriate for their risk level

5. Include clear, actionable instructions



MANDATORY JSON FORMAT (return ONLY valid JSON):

[

    {{

        "task_type": "select from: meditation, exercise, sleep_schedule, social_connection, journaling, breathing_exercise, nature_walk, healthy_meal, screen_break, gratitude_practice, professional_help, mindfulness, stress_management, routine_building, creative_activity, relaxation_technique",

        "title": "Clear, engaging title",

        "description": "Brief description explaining the benefits",

        "duration_days": appropriate_number,

        "difficulty": "easy/medium/hard",

        "instructions": "Step-by-step instructions tailored to their profile",

        "completion_criteria": "Clear, measurable success criteria",

        "personalization_notes": "Why this task fits their specific situation"

    }}

]

"""
    
    def _build_simplified_prompt(self, user_profile: Dict, risk_level: int) -> str:
        """Build simplified prompt when comprehensive generation fails"""
        
        key_factors = []
        if user_profile.get('Stress_Level'):
            key_factors.append(f"Stress: {user_profile['Stress_Level']}")
        if user_profile.get('Sleep_Hours'):
            key_factors.append(f"Sleep: {user_profile['Sleep_Hours']}h")
        if user_profile.get('Work_Hours'):
            key_factors.append(f"Work: {user_profile['Work_Hours']}h/week")
        
        factors_str = ", ".join(key_factors) if key_factors else "Limited profile data"
        
        return f"""

You are a wellness coach. Create practical wellness tasks.



USER: {factors_str}

Risk Level: {risk_level}/10



{self._get_risk_specific_requirements(risk_level)}



Generate 3-4 practical wellness tasks as JSON array:

[

    {{

        "task_type": "meditation|exercise|sleep_schedule|journaling|breathing_exercise|professional_help|stress_management",

        "title": "Clear task title",

        "description": "Brief helpful description",

        "duration_days": number,

        "difficulty": "easy|medium|hard",

        "instructions": "Step-by-step instructions",

        "completion_criteria": "How to measure success"

    }}

]

"""
    
    def _build_basic_prompt(self, risk_level: int) -> str:
        """Build minimal prompt for basic task generation"""
        
        return f"""

You are a wellness coach. Generate wellness tasks for risk level {risk_level}/10.



{self._get_risk_specific_requirements(risk_level)}



Return 2-3 tasks as JSON:

[

    {{

        "task_type": "breathing_exercise|meditation|professional_help|journaling",

        "title": "Task title",

        "description": "What this helps with",

        "duration_days": 1-7,

        "difficulty": "easy|medium",

        "instructions": "Clear instructions",

        "completion_criteria": "Success measure"

    }}

]

"""
    
    def _get_risk_specific_requirements(self, risk_level: int) -> str:
        """Get specific requirements based on risk level"""
        
        if risk_level >= 8:
            return """

CRITICAL PRIORITY:

- MUST include immediate professional help seeking

- Focus on crisis intervention and safety

- Include emergency resources and contacts

- Tasks should provide immediate coping mechanisms

- Maximum task duration: 1-2 days

"""
        elif risk_level >= 6:
            return """

HIGH PRIORITY:

- Strongly recommend professional consultation within 1 week

- Include daily anxiety/stress management techniques

- Focus on stabilization and routine building

- Provide structured, manageable activities

- Task duration: 2-7 days

"""
        elif risk_level >= 4:
            return """

MODERATE PRIORITY:

- Include both self-care and gradual improvement activities

- Balance mental and physical wellness approaches

- Encourage social connection and support

- Build sustainable, healthy habits

- Task duration: 5-14 days

"""
        else:
            return """

MAINTENANCE/PREVENTION:

- Focus on wellness enhancement and prevention

- Include enjoyable, engaging activities

- Support long-term habit building

- Promote overall life satisfaction

- Task duration: 7-21 days

"""
    
    def _extract_and_validate_tasks(self, response: str, risk_level: int, min_tasks: int = 2) -> Optional[List[Dict]]:
        """Extract and validate tasks from AI response"""
        
        try:
            # Clean the response
            cleaned_response = self._clean_json_string(response)
            if not cleaned_response:
                return None
            
            # Try to parse JSON
            try:
                tasks = json.loads(cleaned_response)
            except json.JSONDecodeError:
                # Try to extract individual task objects
                tasks = self._extract_individual_tasks(response)
                if not tasks:
                    return None
            
            # Validate task structure
            validated_tasks = self._validate_task_structure(tasks, risk_level)
            
            if validated_tasks and len(validated_tasks) >= min_tasks:
                return validated_tasks
            
        except Exception as e:
            print(f"Error extracting tasks: {e}")
        
        return None
    
    def _clean_json_string(self, json_str: str) -> Optional[str]:
        """Clean malformed JSON string"""
        try:
            # Remove common issues
            cleaned = json_str.strip()
            
            # Find JSON array boundaries
            start_idx = cleaned.find('[')
            end_idx = cleaned.rfind(']')
            
            if start_idx == -1 or end_idx == -1:
                return None
            
            cleaned = cleaned[start_idx:end_idx + 1]
            
            # Remove newlines and extra whitespace
            cleaned = cleaned.replace('\n', ' ').replace('\r', '')
            cleaned = ' '.join(cleaned.split())
            
            # Fix common JSON issues
            cleaned = cleaned.replace(',}', '}')  # Remove trailing commas
            cleaned = cleaned.replace(',]', ']')
            
            return cleaned
            
        except Exception:
            return None
    
    def _extract_individual_tasks(self, response: str) -> Optional[List[Dict]]:
        """Extract individual task objects when array parsing fails"""
        tasks = []
        
        # Find individual task objects
        import re
        task_pattern = r'\{[^{}]*"task_type"[^{}]*\}'
        matches = re.findall(task_pattern, response, re.DOTALL)
        
        for match in matches:
            try:
                # Clean the match
                clean_match = match.replace('\n', ' ').replace('\r', '')
                clean_match = ' '.join(clean_match.split())
                clean_match = clean_match.replace(',}', '}')
                
                task = json.loads(clean_match)
                tasks.append(task)
            except:
                continue
                
        return tasks if tasks else None
    
    def _validate_task_structure(self, tasks: List[Dict], risk_level: int) -> Optional[List[Dict]]:
        """Validate and sanitize task structure"""
        
        if not isinstance(tasks, list):
            return None
            
        valid_tasks = []
        required_fields = ['task_type', 'title', 'description', 'duration_days', 'difficulty', 'instructions', 'completion_criteria']
        
        valid_task_types = {
            'meditation', 'exercise', 'sleep_schedule', 'social_connection', 
            'journaling', 'breathing_exercise', 'nature_walk', 'healthy_meal', 
            'screen_break', 'gratitude_practice', 'professional_help', 
            'mindfulness', 'stress_management', 'routine_building', 
            'creative_activity', 'relaxation_technique'
        }
        
        # Ensure high-risk users get professional help
        has_professional_help = False
        
        for task in tasks:
            if not isinstance(task, dict):
                continue
                
            # Check required fields
            if not all(field in task and str(task[field]).strip() for field in required_fields):
                continue
                
            # Validate and clean task
            validated_task = self._clean_and_validate_task(task, valid_task_types)
            if validated_task:
                valid_tasks.append(validated_task)
                
                if validated_task['task_type'] == 'professional_help':
                    has_professional_help = True
        
        # Ensure high-risk users have professional help task
        if risk_level >= 7 and not has_professional_help and len(valid_tasks) > 0:
            professional_task = self._generate_professional_help_task(risk_level)
            valid_tasks.insert(0, professional_task)
        
        return valid_tasks if len(valid_tasks) >= 1 else None
    
    def _clean_and_validate_task(self, task: Dict, valid_task_types: set) -> Optional[Dict]:
        """Clean and validate individual task"""
        
        try:
            # Validate task_type
            if task.get('task_type') not in valid_task_types:
                return None
                
            # Clean and validate duration
            try:
                duration = int(task.get('duration_days', 7))
                duration = max(1, min(30, duration))  # Clamp between 1-30 days
            except (ValueError, TypeError):
                duration = 7
                
            # Validate difficulty
            difficulty = task.get('difficulty', 'medium').lower()
            if difficulty not in ['easy', 'medium', 'hard']:
                difficulty = 'medium'
                
            # Clean text fields
            cleaned_task = {
                'task_type': task['task_type'],
                'title': str(task['title']).strip()[:100],
                'description': str(task['description']).strip()[:300],
                'duration_days': duration,
                'difficulty': difficulty,
                'instructions': str(task['instructions']).strip()[:1000],
                'completion_criteria': str(task['completion_criteria']).strip()[:200]
            }
            
            # Add optional fields if present
            if 'personalization_notes' in task:
                cleaned_task['personalization_notes'] = str(task['personalization_notes']).strip()[:200]
                
            return cleaned_task
            
        except Exception as e:
            print(f"Error cleaning task: {e}")
            return None
    
    def _generate_professional_help_task(self, risk_level: int) -> Dict:
        """Generate professional help task for high-risk users"""
        
        urgency = "immediately" if risk_level >= 8 else "within 1-2 days"
        
        return {
            "task_type": "professional_help",
            "title": "Seek Professional Mental Health Support",
            "description": f"Contact a mental health professional {urgency} for assessment and support",
            "duration_days": 2,
            "difficulty": "medium",
            "instructions": f"Contact your healthcare provider, call a mental health helpline, or visit a mental health clinic {urgency}. If in immediate crisis, call emergency services (911) or go to the nearest emergency room.",
            "completion_criteria": "Make contact with a mental health professional or crisis support service"
        }
    
    def _get_emergency_preset_tasks(self, risk_level: int) -> List[Dict]:
        """ONLY used when Granite AI is completely unavailable - minimal preset tasks"""
        
        print("EMERGENCY: Using preset tasks - Granite AI unavailable")
        
        if risk_level >= 7:
            return [
                {
                    "task_type": "professional_help",
                    "title": "Emergency Professional Help",
                    "description": "Seek immediate professional mental health support",
                    "duration_days": 1,
                    "difficulty": "medium",
                    "instructions": "Contact emergency mental health services, your doctor, or call a crisis helpline immediately.",
                    "completion_criteria": "Make contact with professional help"
                }
            ]
        elif risk_level >= 4:
            return [
                {
                    "task_type": "breathing_exercise",
                    "title": "Daily Breathing Practice",
                    "description": "Use breathing exercises to manage stress and anxiety",
                    "duration_days": 7,
                    "difficulty": "easy",
                    "instructions": "Practice 4-7-8 breathing: Inhale for 4 counts, hold for 7 counts, exhale for 8 counts. Repeat 4 times, twice daily.",
                    "completion_criteria": "Complete breathing exercise twice daily for one week"
                },
                {
                    "task_type": "sleep_schedule",
                    "title": "Improve Sleep Routine",
                    "description": "Establish a consistent sleep schedule for better rest",
                    "duration_days": 14,
                    "difficulty": "medium",
                    "instructions": "Go to bed and wake up at the same time daily. Create a 30-minute wind-down routine before bed.",
                    "completion_criteria": "Maintain consistent sleep schedule for 2 weeks"
                }
            ]
        else:
            return [
                {
                    "task_type": "gratitude_practice",
                    "title": "Daily Gratitude Journal",
                    "description": "Practice gratitude to boost mood and well-being",
                    "duration_days": 14,
                    "difficulty": "easy",
                    "instructions": "Write down 3 things you're grateful for each morning. Be specific and reflect on why you appreciate them.",
                    "completion_criteria": "Complete gratitude entries for 14 consecutive days"
                },
                {
                    "task_type": "nature_walk",
                    "title": "Weekly Nature Walks",
                    "description": "Connect with nature to reduce stress and improve mood",
                    "duration_days": 21,
                    "difficulty": "easy",
                    "instructions": "Take a 20-30 minute walk in a park, garden, or natural area twice per week. Focus on your surroundings.",
                    "completion_criteria": "Complete 6 nature walks over 3 weeks"
                }
            ]
    
    def _call_granite_api(self, prompt: str, temperature: float = 0.3) -> Optional[str]:
        """Enhanced API call with better error handling and logging"""
        
        data = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            "options": {
                "temperature": temperature,
                "top_p": 0.9,
                "num_predict": 2000,  # Increased for better task generation
                "repeat_penalty": 1.1,
                "stop": ["Human:", "Assistant:", "\n\n---"]
            }
        }
        
        try:
            print(f"πŸ€– Calling Granite API (temp={temperature})...")
            response = requests.post(self.base_url, json=data, timeout=90)
            
            if response.status_code == 200:
                result = response.json()
                ai_response = result.get('response', '').strip()
                
                if ai_response:
                    print(f"βœ“ Granite API responded ({len(ai_response)} chars)")
                    return ai_response
                else:
                    print("⚠ Granite API returned empty response")
                    return None
            else:
                print(f"❌ Granite API error: {response.status_code}")
                return None
                
        except requests.exceptions.ConnectionError:
            print("❌ Cannot connect to Granite (Ollama not running?)")
            return None
        except requests.exceptions.Timeout:
            print("❌ Granite API timeout")
            return None
        except Exception as e:
            print(f"❌ Granite API error: {e}")
            return None