| | package p2p |
| |
|
| | import ( |
| | "context" |
| | "errors" |
| | "fmt" |
| | "io" |
| | "net" |
| |
|
| | "github.com/mudler/LocalAI/core/schema" |
| | "github.com/mudler/edgevpn/pkg/node" |
| | "github.com/mudler/xlog" |
| | ) |
| |
|
| | func (f *FederatedServer) Start(ctx context.Context) error { |
| | n, err := NewNode(f.p2ptoken) |
| | if err != nil { |
| | return fmt.Errorf("creating a new node: %w", err) |
| | } |
| | err = n.Start(ctx) |
| | if err != nil { |
| | return fmt.Errorf("creating a new node: %w", err) |
| | } |
| |
|
| | if err := ServiceDiscoverer(ctx, n, f.p2ptoken, f.service, func(servicesID string, tunnel schema.NodeData) { |
| | xlog.Debug("Discovered node", "node", tunnel.ID) |
| | }, false); err != nil { |
| | return err |
| | } |
| |
|
| | return f.proxy(ctx, n) |
| | } |
| |
|
| | func (fs *FederatedServer) proxy(ctx context.Context, node *node.Node) error { |
| |
|
| | xlog.Info("Allocating service", "service", fs.service, "address", fs.listenAddr) |
| | |
| | l, err := net.Listen("tcp", fs.listenAddr) |
| | if err != nil { |
| | xlog.Error("Error listening", "error", err) |
| | return err |
| | } |
| |
|
| | go func() { |
| | <-ctx.Done() |
| | l.Close() |
| | }() |
| |
|
| | nodeAnnounce(ctx, node) |
| |
|
| | defer l.Close() |
| | for { |
| | select { |
| | case <-ctx.Done(): |
| | return errors.New("context canceled") |
| | default: |
| | xlog.Debug("New connection", "address", l.Addr().String()) |
| | |
| | conn, err := l.Accept() |
| | if err != nil { |
| | fmt.Println("Error accepting: ", err.Error()) |
| | continue |
| | } |
| |
|
| | |
| | go func() { |
| | workerID := "" |
| | if fs.workerTarget != "" { |
| | workerID = fs.workerTarget |
| | } else if fs.loadBalanced { |
| | xlog.Debug("Load balancing request") |
| |
|
| | workerID = fs.SelectLeastUsedServer() |
| | if workerID == "" { |
| | xlog.Debug("Least used server not found, selecting random") |
| | workerID = fs.RandomServer() |
| | } |
| | } else { |
| | workerID = fs.RandomServer() |
| | } |
| |
|
| | if workerID == "" { |
| | xlog.Error("No available nodes yet") |
| | fs.sendHTMLResponse(conn, 503, "Sorry, waiting for nodes to connect") |
| | return |
| | } |
| |
|
| | xlog.Debug("Selected node", "node", workerID) |
| | nodeData, exists := GetNode(fs.service, workerID) |
| | if !exists { |
| | xlog.Error("Node not found", "node", workerID) |
| | fs.sendHTMLResponse(conn, 404, "Node not found") |
| | return |
| | } |
| |
|
| | proxyP2PConnection(ctx, node, nodeData.ServiceID, conn) |
| | if fs.loadBalanced { |
| | fs.RecordRequest(workerID) |
| | } |
| | }() |
| | } |
| | } |
| | } |
| |
|
| | |
| | |
| | func (fs *FederatedServer) sendHTMLResponse(conn net.Conn, statusCode int, message string) { |
| | defer conn.Close() |
| |
|
| | |
| | htmlContent := fmt.Sprintf("<html><body><h1>%s</h1></body></html>\r\n", message) |
| |
|
| | |
| | response := fmt.Sprintf( |
| | "HTTP/1.1 %d %s\r\n"+ |
| | "Content-Type: text/html\r\n"+ |
| | "Connection: close\r\n"+ |
| | "\r\n"+ |
| | "%s", |
| | statusCode, getHTTPStatusText(statusCode), htmlContent, |
| | ) |
| |
|
| | |
| | _, writeErr := io.WriteString(conn, response) |
| | if writeErr != nil { |
| | xlog.Error("Error writing response to client", "error", writeErr) |
| | } |
| | } |
| |
|
| | |
| | func getHTTPStatusText(statusCode int) string { |
| | switch statusCode { |
| | case 503: |
| | return "Service Unavailable" |
| | case 404: |
| | return "Not Found" |
| | case 200: |
| | return "OK" |
| | default: |
| | return "Unknown Status" |
| | } |
| | } |
| |
|