Go client for HAProxy configuration and runtime API
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

223 lines
5.6 KiB

// Copyright 2019 HAProxy Technologies
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
package runtime
import (
"context"
"fmt"
"net"
"strings"
"time"
"github.com/haproxytech/client-native/v6/runtime/options"
)
// socketType names the kind of HAProxy socket a command is addressed to.
type socketType string

// taskTimeout is used both as the dial timeout and as the deadline for the
// whole exchange on a connection.
const taskTimeout = 30 * time.Second

// Socket kinds recognised when a command is framed for sending.
const (
	statsSocket  socketType = "stats"
	masterSocket socketType = "master"
)
// TaskResponse is the outcome of one Task as reported by the worker goroutine:
// handleIncomingJobs fills in either result or err, never both.
type TaskResponse struct {
// result is the text readFromSocket returned for the command.
result string
// err is the error readFromSocket returned; result is left empty in that case.
err error
}
// Task has command to execute on runtime api, and response channel for result
type Task struct {
// command is the runtime API command text handed to readFromSocket.
command string
// response receives one TaskResponse once the worker has processed the command.
response chan TaskResponse
// socket selects how the command is framed before sending (statsSocket or masterSocket).
socket socketType
}
// SingleRuntime handles one runtime API. All of its fields are assigned by Init.
type SingleRuntime struct {
// jobs is the unbuffered queue that executeRaw sends on and handleIncomingJobs drains.
jobs chan Task
// socketPath is the unix socket path dialed for every command.
socketPath string
// worker, when greater than zero, makes stats commands carry the "@<worker>" prefix.
worker int
// process is the process number given to Init; nothing in this part of the file reads it.
process int
}
// Init must be given path to runtime socket and worker number. If in master-worker mode,
// give the path to the master socket path, and non 0 number for workers. Process is for
// nbproc > 1. In master-worker mode it's the same as the worker number, but when having
// multiple stats socket lines bound to processes then use the correct process number.
//
// Init stores these values, creates the job queue and starts the goroutine that
// serves it until ctx is done. Only the first element of opt is used.
//
// Unless DoNotCheckRuntimeOnInit is set, the socket is probed with a "help"
// command. When AllowDelayedStartMax is set the probe is repeated every
// AllowDelayedStartTick until it succeeds or AllowDelayedStartMax has elapsed.
//
// It returns the probe error, an error wrapping the last probe error when the
// delayed start window is exhausted or no tick was configured, or an error
// wrapping ctx.Err() when ctx ends while waiting.
func (s *SingleRuntime) Init(ctx context.Context, socketPath string, worker int, process int, opt ...options.RuntimeOptions) error {
	var runtimeOptions options.RuntimeOptions
	if len(opt) > 0 {
		runtimeOptions = opt[0]
	}

	s.socketPath = socketPath
	s.jobs = make(chan Task)
	s.worker = worker
	s.process = process
	go s.handleIncomingJobs(ctx)

	if runtimeOptions.DoNotCheckRuntimeOnInit {
		return nil
	}

	if runtimeOptions.AllowDelayedStartMax == nil {
		// check if we have a valid socket
		_, err := s.ExecuteRaw("help")
		return err
	}

	maxWait := *runtimeOptions.AllowDelayedStartMax
	start := time.Now()
	for {
		// The worker goroutine exits together with ctx; probing after that
		// would block forever on the job queue, so give up instead.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return fmt.Errorf("waiting for runtime API %s: %w", socketPath, ctxErr)
		}

		_, err := s.ExecuteRaw("help")
		if err == nil {
			return nil
		}

		// A retry needs an interval; report the probe failure rather than
		// dereferencing a nil tick.
		if runtimeOptions.AllowDelayedStartTick == nil {
			return fmt.Errorf("cannot connect to runtime API %s: no retry interval configured: %w", socketPath, err)
		}

		timer := time.NewTimer(*runtimeOptions.AllowDelayedStartTick)
		select {
		case <-ctx.Done():
			timer.Stop()
			return fmt.Errorf("waiting for runtime API %s: %w", socketPath, ctx.Err())
		case <-timer.C:
		}

		if time.Since(start) > maxWait {
			return fmt.Errorf("cannot connect to runtime API %s within %s: %w", socketPath, maxWait, err)
		}
	}
}
// handleIncomingJobs is the worker loop started by Init. It takes tasks off
// s.jobs one at a time, runs each through readFromSocket and sends the outcome
// on the task's response channel. The loop ends when s.jobs is closed or when
// ctx is done.
func (s *SingleRuntime) handleIncomingJobs(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case task, open := <-s.jobs:
			if !open {
				return
			}
			output, err := s.readFromSocket(task.command, task.socket)
			// Report either the output or the error, never both.
			reply := TaskResponse{result: output}
			if err != nil {
				reply = TaskResponse{err: err}
			}
			task.response <- reply
		}
	}
}
// readFromSocket performs one command exchange over a fresh connection to
// s.socketPath. Dialing and the whole exchange are each limited by taskTimeout.
//
// The command is framed according to socket:
//   - statsSocket: prefixed with "set severity-output number;"; when s.worker
//     is greater than zero both parts are addressed with "@<worker>" and
//     ";quit" is appended.
//   - masterSocket: sent as "<command>;quit".
//
// Everything read until the first read error (end of stream or deadline) is
// returned, with a trailing "\n> " and then a trailing "\n" removed. Dial,
// deadline and write failures are returned as errors; a read error only ends
// the read and is not reported.
func (s *SingleRuntime) readFromSocket(command string, socket socketType) (string, error) {
	api, err := net.DialTimeout("unix", s.socketPath, taskTimeout)
	if err != nil {
		return "", err
	}
	defer func() {
		// best effort: the exchange is over by the time we close
		_ = api.Close()
	}()
	if err = api.SetDeadline(time.Now().Add(taskTimeout)); err != nil {
		return "", err
	}

	var fullCommand string
	switch socket {
	case statsSocket:
		fullCommand = fmt.Sprintf("set severity-output number;%s\n", command)
		if s.worker > 0 {
			fullCommand = fmt.Sprintf("@%v set severity-output number;@%v %s;quit\n", s.worker, s.worker, command)
		}
	case masterSocket:
		// NOTE(review): unlike the stats variants this command carries no
		// trailing newline — confirm that is intended.
		fullCommand = fmt.Sprintf("%s;quit", command)
	}
	if _, err = api.Write([]byte(fullCommand)); err != nil {
		return "", err
	}

	buf := make([]byte, 1024)
	var data strings.Builder
	for {
		n, readErr := api.Read(buf)
		// A Read may return data together with an error; keep those bytes
		// before deciding to stop.
		data.Write(buf[:n])
		if readErr != nil {
			break
		}
	}

	result := strings.TrimSuffix(data.String(), "\n> ")
	result = strings.TrimSuffix(result, "\n")
	return result, nil //nolint:nilerr
}
// ExecuteRaw executes command on runtime API using the stats socket framing
// and returns the response exactly as executeRaw delivers it.
func (s *SingleRuntime) ExecuteRaw(command string) (string, error) {
	// one extra attempt covers a connection that breaks temporarily
	const retries = 1
	return s.executeRaw(command, retries, statsSocket)
}
// Execute executes command on runtime API and discards the response text.
//
// It fails under exactly the same conditions as ExecuteWithResponse: when the
// command cannot be executed, or when the response starts with a "[0]:" to
// "[3]:" prefix after its first byte. Delegating keeps that error handling in
// a single place.
func (s *SingleRuntime) Execute(command string) error {
	_, err := s.ExecuteWithResponse(command)
	return err
}
// ExecuteWithResponse executes command on runtime API and returns the response
// with its first byte removed.
//
// An execution error is returned wrapped together with the command. A response
// longer than five bytes whose bytes 1 to 4 read "[N]:" with N between 0 and 3
// is turned into an error carrying N, the remaining text and the command
// (presumably the severity prefix enabled by "set severity-output number").
// A response of at most one byte yields an empty string.
func (s *SingleRuntime) ExecuteWithResponse(command string) (string, error) {
	raw, err := s.ExecuteRaw(command)
	if err != nil {
		return "", fmt.Errorf("%w [%s]", err, command)
	}

	flagged := len(raw) > 5 &&
		raw[1] == '[' &&
		raw[2] >= '0' && raw[2] <= '3' &&
		raw[3] == ']' &&
		raw[4] == ':'
	if flagged {
		return "", fmt.Errorf("[%c] %s [%s]", raw[2], raw[5:], command)
	}

	if len(raw) <= 1 {
		return "", nil
	}
	return raw[1:], nil
}
// ExecuteMaster executes command using the master socket framing and returns
// the response exactly as executeRaw delivers it.
func (s *SingleRuntime) ExecuteMaster(command string) (string, error) {
	// one extra attempt covers a connection that breaks temporarily
	const retries = 1
	return s.executeRaw(command, retries, masterSocket)
}
// executeRaw queues command for the worker goroutine and waits for its answer.
// A failed attempt is repeated while retry is greater than zero, decrementing
// retry each time; the result and error of the last attempt are returned.
//
// Both the send on s.jobs and the wait for the answer block without a timeout,
// so the call does not return if nothing is receiving from s.jobs (for example
// after the worker goroutine has stopped).
func (s *SingleRuntime) executeRaw(command string, retry int, socket socketType) (string, error) {
	for {
		// A fresh channel per attempt, matching one response per task.
		reply := make(chan TaskResponse)
		s.jobs <- Task{
			command:  command,
			response: reply,
			socket:   socket,
		}
		outcome := <-reply
		if outcome.err == nil || retry <= 0 {
			return outcome.result, outcome.err
		}
		retry--
	}
}