Amlan-109
feat: Initial commit of LocalAI Amlan Edition with premium branding and personalization
750bbe6
package openresponses
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/xlog"
)
// ResponseStore provides thread-safe storage for Open Responses API responses
type ResponseStore struct {
mu sync.RWMutex
responses map[string]*StoredResponse
ttl time.Duration // Time-to-live for stored responses (0 = no expiration)
cleanupCtx context.Context
cleanupCancel context.CancelFunc
}
// StreamedEvent represents a buffered SSE event for streaming resume
type StreamedEvent struct {
SequenceNumber int `json:"sequence_number"`
EventType string `json:"event_type"`
Data []byte `json:"data"` // JSON-serialized event
}
// StoredResponse contains a complete response with its input request and output items
type StoredResponse struct {
Request *schema.OpenResponsesRequest
Response *schema.ORResponseResource
Items map[string]*schema.ORItemField // item_id -> item mapping for quick lookup
StoredAt time.Time
ExpiresAt *time.Time // nil if no expiration
// Background execution support
CancelFunc context.CancelFunc // For cancellation of background tasks
StreamEvents []StreamedEvent // Buffered events for streaming resume
StreamEnabled bool // Was created with stream=true
IsBackground bool // Was created with background=true
EventsChan chan struct{} // Signals new events for live subscribers
mu sync.RWMutex // Protect concurrent access to this response
}
var (
globalStore *ResponseStore
storeOnce sync.Once
)
// GetGlobalStore returns the singleton response store instance
func GetGlobalStore() *ResponseStore {
storeOnce.Do(func() {
globalStore = NewResponseStore(0) // Default: no TTL, will be updated from appConfig
})
return globalStore
}
// SetTTL updates the TTL for the store
// This will affect all new responses stored after this call
func (s *ResponseStore) SetTTL(ttl time.Duration) {
s.mu.Lock()
defer s.mu.Unlock()
// Stop existing cleanup loop if running
if s.cleanupCancel != nil {
s.cleanupCancel()
s.cleanupCancel = nil
s.cleanupCtx = nil
}
s.ttl = ttl
// If TTL > 0, start cleanup loop
if ttl > 0 {
s.cleanupCtx, s.cleanupCancel = context.WithCancel(context.Background())
go s.cleanupLoop(s.cleanupCtx)
}
xlog.Debug("Updated Open Responses store TTL", "ttl", ttl, "cleanup_running", ttl > 0)
}
// NewResponseStore creates a new response store with optional TTL
// If ttl is 0, responses are stored indefinitely
func NewResponseStore(ttl time.Duration) *ResponseStore {
store := &ResponseStore{
responses: make(map[string]*StoredResponse),
ttl: ttl,
}
// Start cleanup goroutine if TTL is set
if ttl > 0 {
store.cleanupCtx, store.cleanupCancel = context.WithCancel(context.Background())
go store.cleanupLoop(store.cleanupCtx)
}
return store
}
// Store stores a response with its request and items
func (s *ResponseStore) Store(responseID string, request *schema.OpenResponsesRequest, response *schema.ORResponseResource) {
s.mu.Lock()
defer s.mu.Unlock()
// Build item index for quick lookup
items := make(map[string]*schema.ORItemField)
for i := range response.Output {
item := &response.Output[i]
if item.ID != "" {
items[item.ID] = item
}
}
stored := &StoredResponse{
Request: request,
Response: response,
Items: items,
StoredAt: time.Now(),
ExpiresAt: nil,
}
// Set expiration if TTL is configured
if s.ttl > 0 {
expiresAt := time.Now().Add(s.ttl)
stored.ExpiresAt = &expiresAt
}
s.responses[responseID] = stored
xlog.Debug("Stored Open Responses response", "response_id", responseID, "items_count", len(items))
}
// Get retrieves a stored response by ID
func (s *ResponseStore) Get(responseID string) (*StoredResponse, error) {
s.mu.RLock()
defer s.mu.RUnlock()
stored, exists := s.responses[responseID]
if !exists {
return nil, fmt.Errorf("response not found: %s", responseID)
}
// Check expiration
if stored.ExpiresAt != nil && time.Now().After(*stored.ExpiresAt) {
// Expired, but we'll return it anyway and let caller handle cleanup
return nil, fmt.Errorf("response expired: %s", responseID)
}
return stored, nil
}
// GetItem retrieves a specific item from a stored response
func (s *ResponseStore) GetItem(responseID, itemID string) (*schema.ORItemField, error) {
stored, err := s.Get(responseID)
if err != nil {
return nil, err
}
item, exists := stored.Items[itemID]
if !exists {
return nil, fmt.Errorf("item not found: %s in response %s", itemID, responseID)
}
return item, nil
}
// FindItem searches for an item across all stored responses
// Returns the item and the response ID it was found in
func (s *ResponseStore) FindItem(itemID string) (*schema.ORItemField, string, error) {
s.mu.RLock()
defer s.mu.RUnlock()
now := time.Now()
for responseID, stored := range s.responses {
// Skip expired responses
if stored.ExpiresAt != nil && now.After(*stored.ExpiresAt) {
continue
}
if item, exists := stored.Items[itemID]; exists {
return item, responseID, nil
}
}
return nil, "", fmt.Errorf("item not found in any stored response: %s", itemID)
}
// Delete removes a response from storage
func (s *ResponseStore) Delete(responseID string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.responses, responseID)
xlog.Debug("Deleted Open Responses response", "response_id", responseID)
}
// Cleanup removes expired responses
func (s *ResponseStore) Cleanup() int {
if s.ttl == 0 {
return 0
}
s.mu.Lock()
defer s.mu.Unlock()
now := time.Now()
count := 0
for id, stored := range s.responses {
if stored.ExpiresAt != nil && now.After(*stored.ExpiresAt) {
delete(s.responses, id)
count++
}
}
if count > 0 {
xlog.Debug("Cleaned up expired Open Responses", "count", count)
}
return count
}
// cleanupLoop runs periodic cleanup of expired responses
func (s *ResponseStore) cleanupLoop(ctx context.Context) {
if s.ttl == 0 {
return
}
ticker := time.NewTicker(s.ttl / 2) // Cleanup at half TTL interval
defer ticker.Stop()
for {
select {
case <-ctx.Done():
xlog.Debug("Stopped Open Responses store cleanup loop")
return
case <-ticker.C:
s.Cleanup()
}
}
}
// Count returns the number of stored responses
func (s *ResponseStore) Count() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.responses)
}
// StoreBackground stores a background response with cancel function and optional streaming support
func (s *ResponseStore) StoreBackground(responseID string, request *schema.OpenResponsesRequest, response *schema.ORResponseResource, cancelFunc context.CancelFunc, streamEnabled bool) {
s.mu.Lock()
defer s.mu.Unlock()
// Build item index for quick lookup
items := make(map[string]*schema.ORItemField)
for i := range response.Output {
item := &response.Output[i]
if item.ID != "" {
items[item.ID] = item
}
}
stored := &StoredResponse{
Request: request,
Response: response,
Items: items,
StoredAt: time.Now(),
ExpiresAt: nil,
CancelFunc: cancelFunc,
StreamEvents: []StreamedEvent{},
StreamEnabled: streamEnabled,
IsBackground: true,
EventsChan: make(chan struct{}, 100), // Buffered channel for event notifications
}
// Set expiration if TTL is configured
if s.ttl > 0 {
expiresAt := time.Now().Add(s.ttl)
stored.ExpiresAt = &expiresAt
}
s.responses[responseID] = stored
xlog.Debug("Stored background Open Responses response", "response_id", responseID, "stream_enabled", streamEnabled)
}
// UpdateStatus updates the status of a stored response
func (s *ResponseStore) UpdateStatus(responseID string, status string, completedAt *int64) error {
s.mu.RLock()
stored, exists := s.responses[responseID]
s.mu.RUnlock()
if !exists {
return fmt.Errorf("response not found: %s", responseID)
}
stored.mu.Lock()
defer stored.mu.Unlock()
stored.Response.Status = status
stored.Response.CompletedAt = completedAt
xlog.Debug("Updated response status", "response_id", responseID, "status", status)
return nil
}
// UpdateResponse updates the entire response object for a stored response
func (s *ResponseStore) UpdateResponse(responseID string, response *schema.ORResponseResource) error {
s.mu.RLock()
stored, exists := s.responses[responseID]
s.mu.RUnlock()
if !exists {
return fmt.Errorf("response not found: %s", responseID)
}
stored.mu.Lock()
defer stored.mu.Unlock()
// Rebuild item index
items := make(map[string]*schema.ORItemField)
for i := range response.Output {
item := &response.Output[i]
if item.ID != "" {
items[item.ID] = item
}
}
stored.Response = response
stored.Items = items
xlog.Debug("Updated response", "response_id", responseID, "status", response.Status, "items_count", len(items))
return nil
}
// AppendEvent appends a streaming event to the buffer for resume support
func (s *ResponseStore) AppendEvent(responseID string, event *schema.ORStreamEvent) error {
s.mu.RLock()
stored, exists := s.responses[responseID]
s.mu.RUnlock()
if !exists {
return fmt.Errorf("response not found: %s", responseID)
}
// Serialize the event
data, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("failed to marshal event: %w", err)
}
stored.mu.Lock()
stored.StreamEvents = append(stored.StreamEvents, StreamedEvent{
SequenceNumber: event.SequenceNumber,
EventType: event.Type,
Data: data,
})
stored.mu.Unlock()
// Notify any subscribers of new event
select {
case stored.EventsChan <- struct{}{}:
default:
// Channel full, subscribers will catch up
}
return nil
}
// GetEventsAfter returns all events with sequence number greater than startingAfter
func (s *ResponseStore) GetEventsAfter(responseID string, startingAfter int) ([]StreamedEvent, error) {
s.mu.RLock()
stored, exists := s.responses[responseID]
s.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("response not found: %s", responseID)
}
stored.mu.RLock()
defer stored.mu.RUnlock()
var result []StreamedEvent
for _, event := range stored.StreamEvents {
if event.SequenceNumber > startingAfter {
result = append(result, event)
}
}
return result, nil
}
// Cancel cancels a background response if it's still in progress
func (s *ResponseStore) Cancel(responseID string) (*schema.ORResponseResource, error) {
s.mu.RLock()
stored, exists := s.responses[responseID]
s.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("response not found: %s", responseID)
}
stored.mu.Lock()
defer stored.mu.Unlock()
// If already in a terminal state, just return the response (idempotent)
status := stored.Response.Status
if status == schema.ORStatusCompleted || status == schema.ORStatusFailed ||
status == schema.ORStatusIncomplete || status == schema.ORStatusCancelled {
xlog.Debug("Response already in terminal state", "response_id", responseID, "status", status)
return stored.Response, nil
}
// Cancel the context if available
if stored.CancelFunc != nil {
stored.CancelFunc()
xlog.Debug("Cancelled background response", "response_id", responseID)
}
// Update status to cancelled
now := time.Now().Unix()
stored.Response.Status = schema.ORStatusCancelled
stored.Response.CompletedAt = &now
return stored.Response, nil
}
// GetEventsChan returns the events notification channel for a response
func (s *ResponseStore) GetEventsChan(responseID string) (chan struct{}, error) {
s.mu.RLock()
stored, exists := s.responses[responseID]
s.mu.RUnlock()
if !exists {
return nil, fmt.Errorf("response not found: %s", responseID)
}
return stored.EventsChan, nil
}
// IsStreamEnabled checks if a response was created with streaming enabled
func (s *ResponseStore) IsStreamEnabled(responseID string) (bool, error) {
s.mu.RLock()
stored, exists := s.responses[responseID]
s.mu.RUnlock()
if !exists {
return false, fmt.Errorf("response not found: %s", responseID)
}
stored.mu.RLock()
defer stored.mu.RUnlock()
return stored.StreamEnabled, nil
}