File size: 3,286 Bytes
0f07ba7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package worker

import (
	"context"
	"fmt"
	"os"
	"os/exec"
	"strings"
	"time"

	cliContext "github.com/mudler/LocalAI/core/cli/context"
	"github.com/mudler/LocalAI/core/p2p"
	"github.com/mudler/LocalAI/pkg/signals"
	"github.com/mudler/LocalAI/pkg/system"
	"github.com/mudler/xlog"
	"github.com/phayes/freeport"
)

// P2P holds the options of the P2P worker command, whose Run method exposes a
// llama-cpp-rpc-server endpoint on the P2P network.
//
// Field overview (as used by Run):
//   - WorkerFlags: embedded shared worker options; Run reads BackendsPath,
//     BackendsSystemPath, BackendGalleries and ExtraLLamaCPPArgs from it.
//   - Token: P2P token; Run fails when it is empty.
//   - NoRunner: when true, Run does not spawn llama-cpp-rpc-server itself and
//     only exposes the configured address/port.
//   - RunnerAddress, RunnerPort: override the exposed address/port; they are
//     only consulted when NoRunner is set.
//   - Peer2PeerNetworkID: passed to p2p.NetworkID to derive the network the
//     service is exposed on.
//
// NOTE(review): the struct tags (embed/env/help/group) look like kong-style CLI
// annotations; the parser is not visible in this file — confirm.
type P2P struct {
	WorkerFlags        `embed:""`
	Token              string `env:"LOCALAI_TOKEN,LOCALAI_P2P_TOKEN,TOKEN" help:"P2P token to use"`
	NoRunner           bool   `env:"LOCALAI_NO_RUNNER,NO_RUNNER" help:"Do not start the llama-cpp-rpc-server"`
	RunnerAddress      string `env:"LOCALAI_RUNNER_ADDRESS,RUNNER_ADDRESS" help:"Address of the llama-cpp-rpc-server"`
	RunnerPort         string `env:"LOCALAI_RUNNER_PORT,RUNNER_PORT" help:"Port of the llama-cpp-rpc-server"`
	Peer2PeerNetworkID string `env:"LOCALAI_P2P_NETWORK_ID,P2P_NETWORK_ID" help:"Network ID for P2P mode, can be set arbitrarly by the user for grouping a set of instances" group:"p2p"`
}

// Run exposes a llama-cpp-rpc-server on the P2P network and blocks until the
// graceful-termination handler cancels the internal context.
//
// With NoRunner set, no server process is started: the address/port given by
// RunnerAddress/RunnerPort (defaulting to 127.0.0.1 and a free local port) is
// exposed and the user is expected to run llama-cpp-rpc-server there.
// Otherwise the server binary is resolved via findLLamaCPPBackend and kept
// running in a background goroutine bound to 127.0.0.1 on a free port, which
// is then exposed.
//
// It returns an error when the system state cannot be built, when Token is
// empty, when no free port is available, or when exposing the service fails.
// It returns nil once the context has been cancelled.
func (r *P2P) Run(ctx *cliContext.Context) error {
	systemState, err := system.GetSystemState(
		system.WithBackendPath(r.BackendsPath),
		system.WithBackendSystemPath(r.BackendsSystemPath),
	)
	if err != nil {
		return err
	}

	// Check if the token is set
	// as we always need it.
	if r.Token == "" {
		return fmt.Errorf("Token is required")
	}

	port, err := freeport.GetFreePort()
	if err != nil {
		return err
	}

	address := "127.0.0.1"

	c, cancel := context.WithCancel(context.Background())
	defer cancel()

	if r.NoRunner {
		// Let override which port and address to bind if the user
		// configure the llama-cpp service on its own
		p := fmt.Sprint(port)
		if r.RunnerAddress != "" {
			address = r.RunnerAddress
		}
		if r.RunnerPort != "" {
			p = r.RunnerPort
		}

		_, err = p2p.ExposeService(c, address, p, r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
		if err != nil {
			return err
		}
		xlog.Info("You need to start llama-cpp-rpc-server", "address", address, "port", p)
	} else {
		// Start llama.cpp directly from the version we have pre-packaged and
		// restart it whenever it exits, until the context is cancelled.
		go func() {
			for c.Err() == nil {
				xlog.Info("Starting llama-cpp-rpc-server", "address", address, "port", port)

				grpcProcess, err := findLLamaCPPBackend(r.BackendGalleries, systemState)
				if err != nil {
					xlog.Error("Failed to find llama-cpp-rpc-server", "error", err)
					return
				}

				// strings.Fields (unlike Split on " ") never yields empty
				// arguments for repeated, leading or trailing whitespace.
				extraArgs := strings.Fields(r.ExtraLLamaCPPArgs)
				args := append([]string{"--host", address, "--port", fmt.Sprint(port)}, extraArgs...)
				xlog.Debug("Starting llama-cpp-rpc-server", "address", address, "port", port, "args", args, "argCount", len(args))

				// Tie the child to the context so cancellation terminates it.
				cmd := exec.CommandContext(c, grpcProcess, args...)

				cmd.Env = os.Environ()

				cmd.Stderr = os.Stdout
				cmd.Stdout = os.Stdout

				if err := cmd.Start(); err != nil {
					xlog.Error("Failed to start llama-cpp-rpc-server", "error", err, "grpcProcess", grpcProcess, "args", args)
				} else if err := cmd.Wait(); err != nil && c.Err() == nil {
					// Only report exits that were not caused by our own shutdown.
					xlog.Error("llama-cpp-rpc-server exited", "error", err, "grpcProcess", grpcProcess)
				}

				// Pause before restarting so a binary that cannot start (or
				// crashes immediately) does not turn into a hot loop.
				select {
				case <-c.Done():
					return
				case <-time.After(1 * time.Second):
				}
			}
		}()

		_, err = p2p.ExposeService(c, address, fmt.Sprint(port), r.Token, p2p.NetworkID(r.Peer2PeerNetworkID, p2p.WorkerID))
		if err != nil {
			return err
		}
	}

	signals.RegisterGracefulTerminationHandler(func() {
		cancel()
	})

	// Block until shutdown is requested instead of polling with Sleep.
	<-c.Done()
	return nil
}