File size: 3,286 Bytes
0f07ba7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 |
package worker
import (
"context"
"fmt"
"os"
"os/exec"
"strings"
"time"
cliContext "github.com/mudler/LocalAI/core/cli/context"
"github.com/mudler/LocalAI/core/p2p"
"github.com/mudler/LocalAI/pkg/signals"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
"github.com/phayes/freeport"
)
// P2P holds the options of the p2p worker command; Run consumes them to expose
// a llama-cpp-rpc-server endpoint through the p2p package.
type P2P struct {
// WorkerFlags is embedded; Run reads BackendsPath, BackendsSystemPath,
// BackendGalleries and ExtraLLamaCPPArgs, none of which are declared here,
// so they are promoted from this embedded struct.
WorkerFlags `embed:""`
// Token is mandatory: Run returns an error when it is empty. It is handed
// to p2p.ExposeService.
Token string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
// NoRunner makes Run skip spawning llama-cpp-rpc-server and only expose the
// address/port, leaving it to the user to start the server.
NoRunner bool `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
// RunnerAddress overrides the default "127.0.0.1" address. Run only
// consults it when NoRunner is set.
RunnerAddress string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
// RunnerPort overrides the automatically picked free port. Run only
// consults it when NoRunner is set.
RunnerPort string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
// Peer2PeerNetworkID is combined with p2p.WorkerID via p2p.NetworkID to
// build the service name passed to p2p.ExposeService.
// NOTE(review): the help text below misspells "arbitrarily" ("arbitrarly");
// it is user-facing output, so fix it in a dedicated change.
Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
}
// Run exposes a llama-cpp-rpc-server endpoint on the P2P network and then
// blocks for the lifetime of the process.
//
// It fails early when the system state cannot be resolved, when r.Token is
// empty, when no free port can be reserved, or when p2p.ExposeService returns
// an error.
//
// With r.NoRunner set, nothing is spawned: the address (default "127.0.0.1",
// overridable with r.RunnerAddress) and port (a free port, overridable with
// r.RunnerPort) are only exposed and the user is expected to start the server.
// Otherwise a background goroutine launches the pre-packaged
// llama-cpp-rpc-server and relaunches it whenever it exits, until the internal
// context is cancelled.
//
// The ctx parameter is currently unused.
func (r *P2P) Run(ctx *cliContext.Context) error {
	// Pause between two launch attempts so that a binary which cannot start,
	// or which exits immediately, does not turn the supervisor into a hot loop.
	const restartDelay = 1 * time.Second

	systemState, err := system.GetSystemState(
		system.WithBackendPath(r.BackendsPath),
		system.WithBackendSystemPath(r.BackendsSystemPath),
	)
	if err != nil {
		return err
	}

	// Check if the token is set
	// as we always need it.
	if r.Token == "" {
		return fmt.Errorf("Token is required")
	}

	port, err := freeport.GetFreePort()
	if err != nil {
		return err
	}

	address := "127.0.0.1"

	// c bounds both the exposed service and the supervised child process.
	c, cancel := context.WithCancel(context.Background())
	defer cancel()

	if r.NoRunner {
		// Let override which port and address to bind if the user
		// configure the llama-cpp service on its own
		p := fmt.Sprint(port)
		if r.RunnerAddress != "" {
			address = r.RunnerAddress
		}
		if r.RunnerPort != "" {
			p = r.RunnerPort
		}

		_, err = p2p.ExposeService(c, address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
		if err != nil {
			return err
		}
		xlog.Info("You need to start llama-cpp-rpc-server", "address", address, "port", p)
	} else {
		// Start llama.cpp directly from the version we have pre-packaged
		go func() {
			for {
				// Stop supervising once Run has returned or termination
				// was requested; otherwise the child would be respawned
				// forever.
				if c.Err() != nil {
					return
				}

				xlog.Info("Starting llama-cpp-rpc-server", "address", address, "port", port)

				grpcProcess, err := findLLamaCPPBackend(r.BackendGalleries, systemState)
				if err != nil {
					xlog.Error("Failed to find llama-cpp-rpc-server", "error", err)
					return
				}

				// Fields (unlike Split on " ") drops the empty arguments that
				// repeated, leading or trailing whitespace would produce.
				// Quoted arguments containing spaces are still not supported.
				extraArgs := strings.Fields(r.ExtraLLamaCPPArgs)
				args := append([]string{"--host", address, "--port", fmt.Sprint(port)}, extraArgs...)
				xlog.Debug("Starting llama-cpp-rpc-server", "address", address, "port", port, "args", args, "argCount", len(args))

				// CommandContext kills the child when c is cancelled, so it
				// does not outlive the worker.
				cmd := exec.CommandContext(c, grpcProcess, args...)
				cmd.Env = os.Environ()
				cmd.Stderr = os.Stdout
				cmd.Stdout = os.Stdout

				if err := cmd.Start(); err != nil {
					xlog.Error("Failed to start llama-cpp-rpc-server", "error", err, "grpcProcess", grpcProcess, "args", args)
				} else if err := cmd.Wait(); err != nil && c.Err() == nil {
					// Only report exits that were not caused by our own
					// cancellation.
					xlog.Error("llama-cpp-rpc-server exited", "error", err, "grpcProcess", grpcProcess)
				}

				select {
				case <-c.Done():
					return
				case <-time.After(restartDelay):
				}
			}
		}()

		_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
		if err != nil {
			return err
		}
	}

	signals.RegisterGracefulTerminationHandler(func() {
		cancel()
	})

	// Keep the command alive; process shutdown is driven by the termination
	// handler registered above.
	for {
		time.Sleep(1 * time.Second)
	}
}
|