From 9bd696e2de75fa2b06801216d63a38084a2338ea Mon Sep 17 00:00:00 2001 From: Nick Ethier Date: Fri, 12 Oct 2018 14:31:33 -0400 Subject: [PATCH] drivers/mock: start mock driver implementation --- drivers/mock/driver.go | 370 +++++++++++++++++++++++++++++++++++++++++ drivers/mock/handle.go | 88 ++++++++++ drivers/mock/state.go | 33 ++++ 3 files changed, 491 insertions(+) create mode 100644 drivers/mock/driver.go create mode 100644 drivers/mock/handle.go create mode 100644 drivers/mock/state.go diff --git a/drivers/mock/driver.go b/drivers/mock/driver.go new file mode 100644 index 000000000..141049865 --- /dev/null +++ b/drivers/mock/driver.go @@ -0,0 +1,370 @@ +package mock + +import ( + "context" + "errors" + "fmt" + "strconv" + "strings" + "time" + + hclog "github.com/hashicorp/go-hclog" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/drivers/shared/eventer" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/plugins/base" + "github.com/hashicorp/nomad/plugins/drivers" + "github.com/hashicorp/nomad/plugins/shared/hclspec" +) + +const ( + // pluginName is the name of the plugin + pluginName = "mock" + + // fingerprintPeriod is the interval at which the driver will send fingerprint responses + fingerprintPeriod = 30 * time.Second +) + +var ( + // pluginInfo is the response returned for the PluginInfo RPC + pluginInfo = &base.PluginInfoResponse{ + Type: base.PluginTypeDriver, + PluginApiVersion: "0.0.1", + PluginVersion: "0.1.0", + Name: pluginName, + } + + // configSpec is the hcl specification returned by the ConfigSchema RPC + configSpec = hclspec.NewObject(map[string]*hclspec.Spec{ + "shutdown_periodic_after": hclspec.NewDefault( + hclspec.NewAttr("shutdown_periodic_after", "bool", false), + hclspec.NewLiteral("false"), + ), + "shutdown_periodic_duration": hclspec.NewAttr("shutdown_periodic_duration", "number", false), + }) + + // taskConfigSpec is the hcl specification for the driver config section of + // a task within a job. It is returned in the TaskConfigSchema RPC + taskConfigSpec = hclspec.NewObject(map[string]*hclspec.Spec{ + "start_error": hclspec.NewAttr("start_error", "string", false), + "start_error_recoverable": hclspec.NewAttr("start_error_recoverable", "bool", false), + "start_block_for": hclspec.NewAttr("start_block_for", "number", false), + "kill_after": hclspec.NewAttr("kill_after", "number", false), + "run_for": hclspec.NewAttr("run_for", "number", false), + "exit_code": hclspec.NewAttr("exit_code", "number", false), + "exit_signal": hclspec.NewAttr("exit_signal", "number", false), + "exit_err_msg": hclspec.NewAttr("exit_err_msg", "string", false), + "signal_err": hclspec.NewAttr("signal_err", "string", false), + "driver_ip": hclspec.NewAttr("driver_ip", "string", false), + "driver_advertise": hclspec.NewAttr("driver_advertise", "bool", false), + "driver_port_map": hclspec.NewAttr("driver_port_map", "string", false), + "stdout_string": hclspec.NewAttr("stdout_string", "string", false), + "stdout_repeat": hclspec.NewAttr("stdout_repeat", "number", false), + "stdout_repeat_duration": hclspec.NewAttr("stdout_repeat_duration", "number", false), + }) + + // capabilities is returned by the Capabilities RPC and indicates what + // optional features this driver supports + capabilities = &drivers.Capabilities{ + SendSignals: true, + Exec: true, + FSIsolation: cstructs.FSIsolationNone, + } +) + +// Driver is a mock DriverPlugin implementation +type Driver struct { + // eventer is used to handle multiplexing of TaskEvents calls such that an + // event can be broadcast to all callers + eventer *eventer.Eventer + + // config is the driver configuration set by the SetConfig RPC + config *Config + + // tasks is the in memory datastore mapping taskIDs to mockDriverHandles + tasks *taskStore + + // ctx is the context for the driver. It is passed to other subsystems to + // coordinate shutdown + ctx context.Context + + // signalShutdown is called when the driver is shutting down and cancels the + // ctx passed to any subsystems + signalShutdown context.CancelFunc + + shutdownFingerprintTime time.Time + + // logger will log to the plugin output which is usually an 'executor.out' + // file located in the root of the TaskDir + logger hclog.Logger +} + +// Config is the configuration for the driver that applies to all tasks +type Config struct { + // ShutdownPeriodicAfter is a toggle that can be used during tests to + // "stop" a previously-functioning driver, allowing for testing of periodic + // drivers and fingerprinters + ShutdownPeriodicAfter bool `cty:"shutdown_periodic_after"` + + // ShutdownPeriodicDuration is a option that can be used during tests + // to "stop" a previously functioning driver after the specified duration + // for testing of periodic drivers and fingerprinters. + ShutdownPeriodicDuration time.Duration `cty:"shutdown_periodic_duration"` +} + +// TaskConfig is the driver configuration of a task within a job +type TaskConfig struct { + + // StartErr specifies the error that should be returned when starting the + // mock driver. + StartErr string `cty:"start_error"` + + // StartErrRecoverable marks the error returned is recoverable + StartErrRecoverable bool `cty:"start_error_recoverable"` + + // StartBlockFor specifies a duration in which to block before returning + StartBlockFor time.Duration `cty:"start_block_for"` + + // KillAfter is the duration after which the mock driver indicates the task + // has exited after getting the initial SIGINT signal + KillAfter time.Duration `cty:"kill_after"` + + // RunFor is the duration for which the fake task runs for. After this + // period the MockDriver responds to the task running indicating that the + // task has terminated + RunFor time.Duration `cty:"run_for"` + + // ExitCode is the exit code with which the MockDriver indicates the task + // has exited + ExitCode int `cty:"exit_code"` + + // ExitSignal is the signal with which the MockDriver indicates the task has + // been killed + ExitSignal int `cty:"exit_signal"` + + // ExitErrMsg is the error message that the task returns while exiting + ExitErrMsg string `cty:"exit_err_msg"` + + // SignalErr is the error message that the task returns if signalled + SignalErr string `cty:"signal_error"` + + // DriverIP will be returned as the DriverNetwork.IP from Start() + DriverIP string `cty:"driver_ip"` + + // DriverAdvertise will be returned as DriverNetwork.AutoAdvertise from + // Start(). + DriverAdvertise bool `cty:"driver_advertise"` + + // DriverPortMap will parse a label:number pair and return it in + // DriverNetwork.PortMap from Start(). + DriverPortMap string `cty:"driver_port_map"` + + // StdoutString is the string that should be sent to stdout + StdoutString string `cty:"stdout_string"` + + // StdoutRepeat is the number of times the output should be sent. + StdoutRepeat int `cty:"stdout_repeat"` + + // StdoutRepeatDur is the duration between repeated outputs. + StdoutRepeatDur time.Duration `cty:"stdout_repeat_duration"` +} + +type MockTaskState struct { + TaskConfig *drivers.TaskConfig + StartedAt time.Time +} + +func (d *Driver) PluginInfo() (*base.PluginInfoResponse, error) { + return pluginInfo, nil +} + +func (d *Driver) ConfigSchema() (*hclspec.Spec, error) { + return configSpec, nil +} + +func (d *Driver) SetConfig(data []byte) error { + var config Config + if err := base.MsgPackDecode(data, &config); err != nil { + return err + } + + d.config = &config + if d.config.ShutdownPeriodicAfter { + d.shutdownFingerprintTime = time.Now().Add(d.config.ShutdownPeriodicDuration) + } + return nil +} + +func (d *Driver) TaskConfigSchema() (*hclspec.Spec, error) { + return taskConfigSpec, nil +} + +func (d *Driver) Capabilities() (*drivers.Capabilities, error) { + return capabilities, nil +} + +func (d *Driver) Fingerprint(ctx context.Context) (<-chan *drivers.Fingerprint, error) { + ch := make(chan *drivers.Fingerprint) + go d.handleFingerprint(ctx, ch) + return ch, nil +} + +func (d *Driver) handleFingerprint(ctx context.Context, ch chan *drivers.Fingerprint) { + ticker := time.NewTimer(0) + for { + select { + case <-ctx.Done(): + return + case <-d.ctx.Done(): + return + case <-ticker.C: + ticker.Reset(fingerprintPeriod) + ch <- d.buildFingerprint() + } + } +} + +func (d *Driver) buildFingerprint() *drivers.Fingerprint { + var health drivers.HealthState + var desc string + attrs := map[string]string{} + if !d.shutdownFingerprintTime.IsZero() && time.Now().After(d.shutdownFingerprintTime) { + health = drivers.HealthStateUndetected + desc = "mock disabled" + } else { + health = drivers.HealthStateHealthy + attrs["driver.mock"] = "1" + desc = "mock enabled" + } + + return &drivers.Fingerprint{ + Attributes: map[string]string{}, + Health: health, + HealthDescription: desc, + } +} + +func (d *Driver) RecoverTask(*drivers.TaskHandle) error { + panic("not implemented") +} + +func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *cstructs.DriverNetwork, error) { + var driverConfig TaskConfig + if err := cfg.DecodeDriverConfig(&driverConfig); err != nil { + return nil, nil, err + } + + if driverConfig.StartBlockFor != 0 { + time.Sleep(driverConfig.StartBlockFor) + } + + if driverConfig.StartErr != "" { + return nil, nil, structs.NewRecoverableError(errors.New(driverConfig.StartErr), driverConfig.StartErrRecoverable) + } + + // Create the driver network + net := &cstructs.DriverNetwork{ + IP: driverConfig.DriverIP, + AutoAdvertise: driverConfig.DriverAdvertise, + } + if raw := driverConfig.DriverPortMap; len(raw) > 0 { + parts := strings.Split(raw, ":") + if len(parts) != 2 { + return nil, nil, fmt.Errorf("malformed port map: %q", raw) + } + port, err := strconv.Atoi(parts[1]) + if err != nil { + return nil, nil, fmt.Errorf("malformed port map: %q -- error: %v", raw, err) + } + net.PortMap = map[string]int{parts[0]: port} + } + + h := &mockTaskHandle{ + task: cfg, + runFor: driverConfig.RunFor, + killAfter: driverConfig.KillAfter, + exitCode: driverConfig.ExitCode, + exitSignal: driverConfig.ExitSignal, + stdoutString: driverConfig.StdoutString, + stdoutRepeat: driverConfig.StdoutRepeat, + stdoutRepeatDur: driverConfig.StdoutRepeatDur, + logger: d.logger, + doneCh: make(chan struct{}), + } + if driverConfig.ExitErrMsg != "" { + h.exitErr = errors.New(driverConfig.ExitErrMsg) + } + if driverConfig.SignalErr != "" { + h.signalErr = fmt.Errorf(driverConfig.SignalErr) + } + + driverState := MockTaskState{ + TaskConfig: cfg, + StartedAt: h.startedAt, + } + handle := drivers.NewTaskHandle(pluginName) + handle.Config = cfg + if err := handle.SetDriverState(&driverState); err != nil { + d.logger.Error("failed to start task, error setting driver state", "error", err) + return nil, nil, fmt.Errorf("failed to set driver state: %v", err) + } + + d.tasks.Set(cfg.ID, h) + + d.logger.Debug("starting task", "name", cfg.Name) + go h.run() + return handle, net, nil + +} + +func (d *Driver) WaitTask(ctx context.Context, taskID string) (<-chan *drivers.ExitResult, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + ch := make(chan *drivers.ExitResult) + go d.handleWait(ctx, handle, ch) + + return ch, nil + +} +func (d *Driver) handleWait(ctx context.Context, handle *mockTaskHandle, ch chan *drivers.ExitResult) { + defer close(ch) + + select { + case <-ctx.Done(): + return + case <-d.ctx.Done(): + return + case <-handle.doneCh: + ch <- handle.exitResult + } +} +func (d *Driver) StopTask(taskID string, timeout time.Duration, signal string) error { + panic("not implemented") +} + +func (d *Driver) DestroyTask(taskID string, force bool) error { + panic("not implemented") +} + +func (d *Driver) InspectTask(taskID string) (*drivers.TaskStatus, error) { + panic("not implemented") +} + +func (d *Driver) TaskStats(taskID string) (*cstructs.TaskResourceUsage, error) { + panic("not implemented") +} + +func (d *Driver) TaskEvents(context.Context) (<-chan *drivers.TaskEvent, error) { + panic("not implemented") +} + +func (d *Driver) SignalTask(taskID string, signal string) error { + panic("not implemented") +} + +func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { + panic("not implemented") +} diff --git a/drivers/mock/handle.go b/drivers/mock/handle.go new file mode 100644 index 000000000..2296610a1 --- /dev/null +++ b/drivers/mock/handle.go @@ -0,0 +1,88 @@ +package mock + +import ( + "io" + "time" + + hclog "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/client/lib/fifo" + "github.com/hashicorp/nomad/plugins/drivers" +) + +// mockTaskHandle is a task handler which supervises a mock task +type mockTaskHandle struct { + logger hclog.Logger + + runFor time.Duration + killAfter time.Duration + killTimeout time.Duration + exitCode int + exitSignal int + exitErr error + signalErr error + stdoutString string + stdoutRepeat int + stdoutRepeatDur time.Duration + + doneCh chan struct{} + + task *drivers.TaskConfig + procState drivers.TaskState + startedAt time.Time + completedAt time.Time + exitResult *drivers.ExitResult +} + +func (h *mockTaskHandle) run() { + + // Setup logging output + if h.stdoutString != "" { + go h.handleLogging() + } + + timer := time.NewTimer(h.runFor) + defer timer.Stop() + for { + select { + case <-timer.C: + select { + case <-h.doneCh: + // already closed + default: + close(h.doneCh) + } + case <-h.doneCh: + h.logger.Debug("finished running task", "name", h.task.Name) + h.exitResult = &drivers.ExitResult{ + ExitCode: h.exitCode, + Signal: h.exitSignal, + Err: h.exitErr, + } + return + } + } +} + +func (h *mockTaskHandle) handleLogging() { + stdout, err := fifo.Open(h.task.StdoutPath) + if err != nil { + h.exitErr = err + close(h.doneCh) + h.logger.Error("failed to write to stdout: %v", err) + return + } + + for i := 0; i < h.stdoutRepeat; i++ { + select { + case <-h.doneCh: + return + case <-time.After(h.stdoutRepeatDur): + if _, err := io.WriteString(stdout, h.stdoutString); err != nil { + h.exitErr = err + close(h.doneCh) + h.logger.Error("failed to write to stdout", "error", err) + return + } + } + } +} diff --git a/drivers/mock/state.go b/drivers/mock/state.go new file mode 100644 index 000000000..b3da44016 --- /dev/null +++ b/drivers/mock/state.go @@ -0,0 +1,33 @@ +package mock + +import ( + "sync" +) + +type taskStore struct { + store map[string]*mockTaskHandle + lock sync.RWMutex +} + +func newTaskStore() *taskStore { + return &taskStore{store: map[string]*mockTaskHandle{}} +} + +func (ts *taskStore) Set(id string, handle *mockTaskHandle) { + ts.lock.Lock() + defer ts.lock.Unlock() + ts.store[id] = handle +} + +func (ts *taskStore) Get(id string) (*mockTaskHandle, bool) { + ts.lock.RLock() + defer ts.lock.RUnlock() + t, ok := ts.store[id] + return t, ok +} + +func (ts *taskStore) Delete(id string) { + ts.lock.Lock() + defer ts.lock.Unlock() + delete(ts.store, id) +}