|
|
package application |
|
|
|
|
|
import ( |
|
|
"context" |
|
|
"fmt" |
|
|
"net" |
|
|
"slices" |
|
|
"time" |
|
|
|
|
|
"github.com/google/uuid" |
|
|
"github.com/mudler/LocalAI/core/gallery" |
|
|
"github.com/mudler/LocalAI/core/p2p" |
|
|
"github.com/mudler/LocalAI/core/schema" |
|
|
"github.com/mudler/LocalAI/core/services" |
|
|
|
|
|
"github.com/mudler/edgevpn/pkg/node" |
|
|
"github.com/mudler/xlog" |
|
|
) |
|
|
|
|
|
func (a *Application) StopP2P() error { |
|
|
if a.p2pCancel != nil { |
|
|
a.p2pCancel() |
|
|
a.p2pCancel = nil |
|
|
a.p2pCtx = nil |
|
|
|
|
|
time.Sleep(200 * time.Millisecond) |
|
|
} |
|
|
return nil |
|
|
} |
|
|
|
|
|
func (a *Application) StartP2P() error { |
|
|
|
|
|
if a.applicationConfig.P2PToken == "" { |
|
|
return fmt.Errorf("P2P token is not set") |
|
|
} |
|
|
|
|
|
networkID := a.applicationConfig.P2PNetworkID |
|
|
|
|
|
ctx, cancel := context.WithCancel(a.ApplicationConfig().Context) |
|
|
a.p2pCtx = ctx |
|
|
a.p2pCancel = cancel |
|
|
|
|
|
var n *node.Node |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if a.applicationConfig.Federated { |
|
|
_, port, err := net.SplitHostPort(a.applicationConfig.APIAddress) |
|
|
if err != nil { |
|
|
return err |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
node, err := p2p.ExposeService(ctx, "localhost", port, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID)) |
|
|
if err != nil { |
|
|
return err |
|
|
} |
|
|
|
|
|
if err := p2p.ServiceDiscoverer(ctx, node, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.FederatedID), nil, false); err != nil { |
|
|
return err |
|
|
} |
|
|
|
|
|
n = node |
|
|
|
|
|
if err := a.p2pSync(ctx, node); err != nil { |
|
|
return err |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
if n == nil { |
|
|
node, err := p2p.NewNode(a.applicationConfig.P2PToken) |
|
|
if err != nil { |
|
|
return err |
|
|
} |
|
|
err = node.Start(ctx) |
|
|
if err != nil { |
|
|
return fmt.Errorf("starting new node: %w", err) |
|
|
} |
|
|
n = node |
|
|
} |
|
|
|
|
|
|
|
|
xlog.Info("Starting P2P server discovery...") |
|
|
if err := p2p.ServiceDiscoverer(ctx, n, a.applicationConfig.P2PToken, p2p.NetworkID(networkID, p2p.WorkerID), func(serviceID string, node schema.NodeData) { |
|
|
var tunnelAddresses []string |
|
|
for _, v := range p2p.GetAvailableNodes(p2p.NetworkID(networkID, p2p.WorkerID)) { |
|
|
if v.IsOnline() { |
|
|
tunnelAddresses = append(tunnelAddresses, v.TunnelAddress) |
|
|
} else { |
|
|
xlog.Info("Node is offline", "node", v.ID) |
|
|
} |
|
|
} |
|
|
if a.applicationConfig.TunnelCallback != nil { |
|
|
a.applicationConfig.TunnelCallback(tunnelAddresses) |
|
|
} |
|
|
}, true); err != nil { |
|
|
return err |
|
|
} |
|
|
|
|
|
return nil |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (a *Application) RestartP2P() error { |
|
|
a.p2pMutex.Lock() |
|
|
defer a.p2pMutex.Unlock() |
|
|
|
|
|
|
|
|
if a.p2pCancel != nil { |
|
|
a.p2pCancel() |
|
|
a.p2pCancel = nil |
|
|
a.p2pCtx = nil |
|
|
|
|
|
time.Sleep(200 * time.Millisecond) |
|
|
} |
|
|
|
|
|
appConfig := a.ApplicationConfig() |
|
|
|
|
|
|
|
|
if appConfig.P2PToken == "" { |
|
|
return fmt.Errorf("P2P token is not set") |
|
|
} |
|
|
|
|
|
|
|
|
ctx, cancel := context.WithCancel(appConfig.Context) |
|
|
a.p2pCtx = ctx |
|
|
a.p2pCancel = cancel |
|
|
|
|
|
|
|
|
address := appConfig.APIAddress |
|
|
if address == "" { |
|
|
address = "127.0.0.1:8080" |
|
|
} |
|
|
|
|
|
|
|
|
go func() { |
|
|
if err := a.StartP2P(); err != nil { |
|
|
xlog.Error("Failed to start P2P stack", "error", err) |
|
|
cancel() |
|
|
} |
|
|
}() |
|
|
xlog.Info("P2P stack restarted with new settings") |
|
|
|
|
|
return nil |
|
|
} |
|
|
|
|
|
func syncState(ctx context.Context, n *node.Node, app *Application) error { |
|
|
xlog.Debug("[p2p-sync] Syncing state") |
|
|
|
|
|
whatWeHave := []string{} |
|
|
for _, model := range app.ModelConfigLoader().GetAllModelsConfigs() { |
|
|
whatWeHave = append(whatWeHave, model.Name) |
|
|
} |
|
|
|
|
|
ledger, _ := n.Ledger() |
|
|
currentData := ledger.CurrentData() |
|
|
xlog.Debug("[p2p-sync] Current data", "data", currentData) |
|
|
data, exists := ledger.GetKey("shared_state", "models") |
|
|
if !exists { |
|
|
ledger.AnnounceUpdate(ctx, time.Minute, "shared_state", "models", whatWeHave) |
|
|
xlog.Debug("No models found in the ledger, announced our models", "models", whatWeHave) |
|
|
} |
|
|
|
|
|
models := []string{} |
|
|
if err := data.Unmarshal(&models); err != nil { |
|
|
xlog.Warn("error unmarshalling models", "error", err) |
|
|
return nil |
|
|
} |
|
|
|
|
|
xlog.Debug("[p2p-sync] Models comparison", "ourModels", whatWeHave, "ledgerModels", models) |
|
|
|
|
|
|
|
|
whatIsNotThere := []string{} |
|
|
for _, model := range whatWeHave { |
|
|
if !slices.Contains(models, model) { |
|
|
whatIsNotThere = append(whatIsNotThere, model) |
|
|
} |
|
|
} |
|
|
if len(whatIsNotThere) > 0 { |
|
|
xlog.Debug("[p2p-sync] Announcing our models", "models", append(models, whatIsNotThere...)) |
|
|
ledger.AnnounceUpdate( |
|
|
ctx, |
|
|
1*time.Minute, |
|
|
"shared_state", |
|
|
"models", |
|
|
append(models, whatIsNotThere...), |
|
|
) |
|
|
} |
|
|
|
|
|
|
|
|
for _, model := range models { |
|
|
if slices.Contains(whatWeHave, model) { |
|
|
xlog.Debug("[p2p-sync] Model is already present in this instance", "model", model) |
|
|
continue |
|
|
} |
|
|
|
|
|
|
|
|
xlog.Info("[p2p-sync] Installing model which is not present in this instance", "model", model) |
|
|
|
|
|
uuid, err := uuid.NewUUID() |
|
|
if err != nil { |
|
|
xlog.Error("error generating UUID", "error", err) |
|
|
continue |
|
|
} |
|
|
|
|
|
app.GalleryService().ModelGalleryChannel <- services.GalleryOp[gallery.GalleryModel, gallery.ModelConfig]{ |
|
|
ID: uuid.String(), |
|
|
GalleryElementName: model, |
|
|
Galleries: app.ApplicationConfig().Galleries, |
|
|
BackendGalleries: app.ApplicationConfig().BackendGalleries, |
|
|
} |
|
|
} |
|
|
|
|
|
return nil |
|
|
} |
|
|
|
|
|
func (a *Application) p2pSync(ctx context.Context, n *node.Node) error { |
|
|
go func() { |
|
|
for { |
|
|
select { |
|
|
case <-ctx.Done(): |
|
|
return |
|
|
case <-time.After(1 * time.Minute): |
|
|
if err := syncState(ctx, n, a); err != nil { |
|
|
xlog.Error("error syncing state", "error", err) |
|
|
} |
|
|
} |
|
|
|
|
|
} |
|
|
}() |
|
|
return nil |
|
|
} |
|
|
|