clientv2: base driver plugin (#4671)

Driver plugin framework to facilitate development of driver plugins.

Implementing plugins only need to implement the DriverPlugin interface.
The framework proxies this interface to the go-plugin GRPC interface generated
from the driver.proto spec.

A testing harness is provided to allow implementing drivers to test the full
lifecycle of the driver plugin. An example use:

func TestMyDriver(t *testing.T) {
    harness := NewDriverHarness(t, &MyDiverPlugin{})
    // The harness implements the DriverPlugin interface and can be used as such
    taskHandle, err := harness.StartTask(...)
}
This commit is contained in:
Nick Ethier
2018-09-26 13:33:37 -04:00
committed by Michael Schurter
parent 78c15dcaa5
commit e2bf0a388e
17 changed files with 1988 additions and 467 deletions

View File

@@ -68,7 +68,7 @@ type Executor interface {
// Version returns the executor API version
Version() (*ExecutorVersion, error)
// Stats fetchs process usage stats for the executor and each pid if availble
// Stats fetchs process usage stats for the executor and each pid if available
Stats() (*cstructs.TaskResourceUsage, error)
// Signal sends the given signal to the user process

View File

@@ -61,3 +61,8 @@ var MsgpackHandle = func() *codec.MsgpackHandle {
func MsgPackDecode(buf []byte, out interface{}) error {
return codec.NewDecoder(bytes.NewReader(buf), MsgpackHandle).Decode(out)
}
// MsgPackEncode is used to encode an object to MsgPack
func MsgPackEncode(b *[]byte, in interface{}) error {
return codec.NewEncoderBytes(b, MsgpackHandle).Encode(in)
}

302
plugins/drivers/client.go Normal file
View File

@@ -0,0 +1,302 @@
package drivers
import (
"errors"
"fmt"
"io"
"time"
"github.com/golang/protobuf/ptypes"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/drivers/proto"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"golang.org/x/net/context"
)
var _ DriverPlugin = &driverPluginClient{}
type driverPluginClient struct {
base.BasePluginClient
client proto.DriverClient
logger hclog.Logger
}
func (d *driverPluginClient) TaskConfigSchema() (*hclspec.Spec, error) {
req := &proto.TaskConfigSchemaRequest{}
resp, err := d.client.TaskConfigSchema(context.Background(), req)
if err != nil {
return nil, err
}
return resp.Spec, nil
}
func (d *driverPluginClient) Capabilities() (*Capabilities, error) {
req := &proto.CapabilitiesRequest{}
resp, err := d.client.Capabilities(context.Background(), req)
if err != nil {
return nil, err
}
caps := &Capabilities{}
if resp.Capabilities != nil {
caps.SendSignals = resp.Capabilities.SendSignals
caps.Exec = resp.Capabilities.Exec
switch resp.Capabilities.FsIsolation {
case proto.DriverCapabilities_NONE:
caps.FSIsolation = FSIsolationNone
case proto.DriverCapabilities_CHROOT:
caps.FSIsolation = FSIsolationChroot
case proto.DriverCapabilities_IMAGE:
caps.FSIsolation = FSIsolationImage
default:
caps.FSIsolation = FSIsolationNone
}
}
return caps, nil
}
// Fingerprint the driver, return a chan that will be pushed to periodically and on changes to health
func (d *driverPluginClient) Fingerprint(ctx context.Context) (<-chan *Fingerprint, error) {
req := &proto.FingerprintRequest{}
stream, err := d.client.Fingerprint(context.Background(), req)
if err != nil {
return nil, err
}
ch := make(chan *Fingerprint)
go d.handleFingerprint(ch, stream)
return ch, nil
}
func (d *driverPluginClient) handleFingerprint(ch chan *Fingerprint, stream proto.Driver_FingerprintClient) {
defer close(ch)
for {
pb, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
d.logger.Error("error receiving stream from Fingerprint driver RPC", "error", err)
ch <- &Fingerprint{Err: fmt.Errorf("error from RPC stream: %v", err)}
break
}
f := &Fingerprint{
Attributes: pb.Attributes,
Health: healthStateFromProto(pb.Health),
HealthDescription: pb.HealthDescription,
}
ch <- f
}
}
// RecoverTask does internal state recovery to be able to control the task of
// the given TaskHandle
func (d *driverPluginClient) RecoverTask(h *TaskHandle) error {
req := &proto.RecoverTaskRequest{Handle: taskHandleToProto(h)}
_, err := d.client.RecoverTask(context.Background(), req)
return err
}
// StartTask starts execution of a task with the given TaskConfig. A TaskHandle
// is returned to the caller that can be used to recover state of the task,
// should the driver crash or exit prematurely.
func (d *driverPluginClient) StartTask(c *TaskConfig) (*TaskHandle, error) {
req := &proto.StartTaskRequest{
Task: taskConfigToProto(c),
}
resp, err := d.client.StartTask(context.Background(), req)
if err != nil {
return nil, err
}
return taskHandleFromProto(resp.Handle), nil
}
// WaitTask returns a channel that will have an ExitResult pushed to it once when the task
// exits on its own or is killed. If WaitTask is called after the task has exited, the channel
// will immedialy return the ExitResult. WaitTask can be called multiple times for
// the same task without issue.
func (d *driverPluginClient) WaitTask(ctx context.Context, id string) (<-chan *ExitResult, error) {
ch := make(chan *ExitResult)
go d.handleWaitTask(ctx, id, ch)
return ch, nil
}
func (d *driverPluginClient) handleWaitTask(ctx context.Context, id string, ch chan *ExitResult) {
defer close(ch)
var result ExitResult
req := &proto.WaitTaskRequest{
TaskId: id,
}
resp, err := d.client.WaitTask(ctx, req)
if err != nil {
result.Err = err
} else {
result.ExitCode = int(resp.Result.ExitCode)
result.Signal = int(resp.Result.Signal)
result.OOMKilled = resp.Result.OomKilled
if len(resp.Err) > 0 {
result.Err = errors.New(resp.Err)
}
}
ch <- &result
}
// StopTask stops the task with the given taskID. A timeout and signal can be
// given to control a graceful termination of the task. The driver will send the
// given signal to the task and wait for the given timeout for it to exit. If the
// task does not exit within the timeout it will be forcefully killed.
func (d *driverPluginClient) StopTask(taskID string, timeout time.Duration, signal string) error {
req := &proto.StopTaskRequest{
TaskId: taskID,
Timeout: ptypes.DurationProto(timeout),
Signal: signal,
}
_, err := d.client.StopTask(context.Background(), req)
return err
}
// DestroyTask removes the task from the driver's in memory state. The task
// cannot be running unless force is set to true. If force is set to true the
// driver will forcefully terminate the task before removing it.
func (d *driverPluginClient) DestroyTask(taskID string, force bool) error {
req := &proto.DestroyTaskRequest{
TaskId: taskID,
Force: force,
}
_, err := d.client.DestroyTask(context.Background(), req)
return err
}
// InspectTask returns status information for a task
func (d *driverPluginClient) InspectTask(taskID string) (*TaskStatus, error) {
req := &proto.InspectTaskRequest{TaskId: taskID}
resp, err := d.client.InspectTask(context.Background(), req)
if err != nil {
return nil, err
}
status, err := taskStatusFromProto(resp.Task)
if err != nil {
return nil, err
}
if resp.Driver != nil {
status.DriverAttributes = resp.Driver.Attributes
}
if resp.NetworkOverride != nil {
status.NetworkOverride = &NetworkOverride{
PortMap: resp.NetworkOverride.PortMap,
Addr: resp.NetworkOverride.Addr,
AutoAdvertise: resp.NetworkOverride.AutoAdvertise,
}
}
return status, nil
}
// TaskStats returns resource usage statistics for the task
func (d *driverPluginClient) TaskStats(taskID string) (*TaskStats, error) {
req := &proto.TaskStatsRequest{TaskId: taskID}
resp, err := d.client.TaskStats(context.Background(), req)
if err != nil {
return nil, err
}
stats, err := taskStatsFromProto(resp.Stats)
if err != nil {
return nil, err
}
return stats, nil
}
// TaskEvents returns a channel that will receive events from the driver about all
// tasks such as lifecycle events, terminal errors, etc.
func (d *driverPluginClient) TaskEvents(ctx context.Context) (<-chan *TaskEvent, error) {
req := &proto.TaskEventsRequest{}
stream, err := d.client.TaskEvents(ctx, req)
if err != nil {
return nil, err
}
ch := make(chan *TaskEvent)
go d.handleTaskEvents(ch, stream)
return ch, nil
}
func (d *driverPluginClient) handleTaskEvents(ch chan *TaskEvent, stream proto.Driver_TaskEventsClient) {
defer close(ch)
for {
ev, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
d.logger.Error("error receiving stream from TaskEvents driver RPC", "error", err)
ch <- &TaskEvent{Err: err}
break
}
timestamp, _ := ptypes.Timestamp(ev.Timestamp)
event := &TaskEvent{
TaskID: ev.TaskId,
Annotations: ev.Annotations,
Message: ev.Message,
Timestamp: timestamp,
}
ch <- event
}
}
// SignalTask will send the given signal to the specified task
func (d *driverPluginClient) SignalTask(taskID string, signal string) error {
req := &proto.SignalTaskRequest{
TaskId: taskID,
Signal: signal,
}
_, err := d.client.SignalTask(context.Background(), req)
return err
}
// ExecTask will run the given command within the execution context of the task.
// The driver will wait for the given timeout for the command to complete before
// terminating it. The stdout and stderr of the command will be return to the caller,
// along with other exit information such as exit code.
func (d *driverPluginClient) ExecTask(taskID string, cmd []string, timeout time.Duration) (*ExecTaskResult, error) {
req := &proto.ExecTaskRequest{
TaskId: taskID,
Command: cmd,
Timeout: ptypes.DurationProto(timeout),
}
resp, err := d.client.ExecTask(context.Background(), req)
if err != nil {
return nil, err
}
result := &ExecTaskResult{
Stdout: resp.Stdout,
Stderr: resp.Stderr,
ExitResult: exitResultFromProto(resp.Result),
}
return result, nil
}

212
plugins/drivers/driver.go Normal file
View File

@@ -0,0 +1,212 @@
package drivers
import (
"fmt"
"path/filepath"
"sort"
"time"
"github.com/hashicorp/nomad/client/allocdir"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"golang.org/x/net/context"
)
// DriverPlugin is the interface with drivers will implement. It is also
// implemented by a plugin client which proxies the calls to go-plugin. See
// the proto/driver.proto file for detailed information about each RPC and
// message structure.
type DriverPlugin interface {
base.BasePlugin
TaskConfigSchema() (*hclspec.Spec, error)
Capabilities() (*Capabilities, error)
Fingerprint(context.Context) (<-chan *Fingerprint, error)
RecoverTask(*TaskHandle) error
StartTask(*TaskConfig) (*TaskHandle, error)
WaitTask(ctx context.Context, taskID string) (<-chan *ExitResult, error)
StopTask(taskID string, timeout time.Duration, signal string) error
DestroyTask(taskID string, force bool) error
InspectTask(taskID string) (*TaskStatus, error)
TaskStats(taskID string) (*TaskStats, error)
TaskEvents(context.Context) (<-chan *TaskEvent, error)
SignalTask(taskID string, signal string) error
ExecTask(taskID string, cmd []string, timeout time.Duration) (*ExecTaskResult, error)
}
// DriverSignalTaskNotSupported can be embedded by drivers which don't support
// the SignalTask RPC. This satisfies the SignalTask func requirement for the
// DriverPlugin interface.
type DriverSignalTaskNotSupported struct{}
func (_ DriverSignalTaskNotSupported) SignalTask(taskID, signal string) error {
return fmt.Errorf("SignalTask is not supported by this driver")
}
// DriverExecTaskNotSupported can be embedded by drivers which don't support
// the ExecTask RPC. This satisfies the ExecTask func requirement of the
// DriverPlugin interface.
type DriverExecTaskNotSupported struct{}
func (_ DriverExecTaskNotSupported) ExecTask(taskID, signal string) error {
return fmt.Errorf("ExecTask is not supported by this driver")
}
type HealthState string
var (
HealthStateUndetected = HealthState("undetected")
HealthStateUnhealthy = HealthState("unhealthy")
HealthStateHealthy = HealthState("healthy")
)
type Fingerprint struct {
Attributes map[string]string
Health HealthState
HealthDescription string
// Err is set by the plugin if an error occured during fingerprinting
Err error
}
type FSIsolation string
var (
FSIsolationNone = FSIsolation("none")
FSIsolationChroot = FSIsolation("chroot")
FSIsolationImage = FSIsolation("image")
)
type Capabilities struct {
// SendSignals marks the driver as being able to send signals
SendSignals bool
// Exec marks the driver as being able to execute arbitrary commands
// such as health checks. Used by the ScriptExecutor interface.
Exec bool
//FSIsolation indicates what kind of filesystem isolation the driver supports.
FSIsolation FSIsolation
}
type TaskConfig struct {
ID string
Name string
Env map[string]string
Resources Resources
Devices []DeviceConfig
Mounts []MountConfig
User string
AllocDir string
rawDriverConfig []byte
}
func (tc *TaskConfig) EnvList() []string {
l := make([]string, 0, len(tc.Env))
for k, v := range tc.Env {
l = append(l, k+"="+v)
}
sort.Strings(l)
return l
}
func (tc *TaskConfig) TaskDir() *allocdir.TaskDir {
taskDir := filepath.Join(tc.AllocDir, tc.Name)
return &allocdir.TaskDir{
Dir: taskDir,
SharedAllocDir: filepath.Join(tc.AllocDir, allocdir.SharedAllocName),
LogDir: filepath.Join(tc.AllocDir, allocdir.SharedAllocName, allocdir.LogDirName),
SharedTaskDir: filepath.Join(taskDir, allocdir.SharedAllocName),
LocalDir: filepath.Join(taskDir, allocdir.TaskLocal),
SecretsDir: filepath.Join(taskDir, allocdir.TaskSecrets),
}
}
func (tc *TaskConfig) DecodeDriverConfig(t interface{}) error {
return base.MsgPackDecode(tc.rawDriverConfig, t)
}
func (tc *TaskConfig) EncodeDriverConfig(t interface{}) error {
return base.MsgPackEncode(&tc.rawDriverConfig, t)
}
type Resources struct {
CPUPeriod int64
CPUQuota int64
CPUShares int64
MemoryLimitBytes int64
OOMScoreAdj int64
CpusetCPUs string
CpusetMems string
}
type DeviceConfig struct {
TaskPath string
HostPath string
Permissions string
}
type MountConfig struct {
TaskPath string
HostPath string
Readonly bool
}
const (
TaskStateUnknown TaskState = "unknown"
TaskStateRunning TaskState = "running"
TaskStateExited TaskState = "exited"
)
type TaskState string
type NetworkOverride struct {
PortMap map[string]int32
Addr string
AutoAdvertise bool
}
type ExitResult struct {
ExitCode int
Signal int
OOMKilled bool
Err error
}
type TaskStatus struct {
ID string
Name string
State TaskState
StartedAt time.Time
CompletedAt time.Time
ExitResult *ExitResult
DriverAttributes map[string]string
NetworkOverride *NetworkOverride
}
type TaskStats struct {
ID string
Timestamp int64
AggResourceUsage *cstructs.ResourceUsage
ResourceUsageByPid map[string]*cstructs.ResourceUsage
}
type TaskEvent struct {
TaskID string
Timestamp time.Time
Message string
Annotations map[string]string
// Err is only used if an error occured while consuming the RPC stream
Err error
}
type ExecTaskResult struct {
Stdout []byte
Stderr []byte
ExitResult *ExitResult
}

View File

@@ -0,0 +1,5 @@
package drivers
import "fmt"
var ErrTaskNotFound = fmt.Errorf("task not found for given id")

41
plugins/drivers/plugin.go Normal file
View File

@@ -0,0 +1,41 @@
package drivers
import (
"context"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/plugins/drivers/proto"
"google.golang.org/grpc"
)
// PluginDriver wraps a DriverPlugin and implements go-plugins GRPCPlugin
// interface to expose the the interface over gRPC
type PluginDriver struct {
plugin.NetRPCUnsupportedPlugin
impl DriverPlugin
logger hclog.Logger
}
func NewDriverPlugin(d DriverPlugin, logger hclog.Logger) plugin.GRPCPlugin {
return &PluginDriver{
impl: d,
logger: logger.Named("driver_plugin"),
}
}
func (p *PluginDriver) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
proto.RegisterDriverServer(s, &driverPluginServer{
impl: p.impl,
broker: broker,
logger: p.logger,
})
return nil
}
func (p *PluginDriver) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
return &driverPluginClient{
client: proto.NewDriverClient(c),
logger: p.logger,
}, nil
}

View File

@@ -0,0 +1,242 @@
package drivers
import (
"bytes"
"sync"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
"github.com/ugorji/go/codec"
"golang.org/x/net/context"
)
type testDriverState struct {
Pid int
Log string
}
func TestBaseDriver_Fingerprint(t *testing.T) {
t.Parallel()
require := require.New(t)
fingerprints := []*Fingerprint{
{
Attributes: map[string]string{"foo": "bar"},
Health: HealthStateUnhealthy,
HealthDescription: "starting up",
},
{
Attributes: map[string]string{"foo": "bar"},
Health: HealthStateHealthy,
HealthDescription: "running",
},
}
var complete bool
impl := &MockDriver{
FingerprintF: func(ctx context.Context) (<-chan *Fingerprint, error) {
ch := make(chan *Fingerprint)
go func() {
defer close(ch)
ch <- fingerprints[0]
time.Sleep(500 * time.Millisecond)
ch <- fingerprints[1]
complete = true
}()
return ch, nil
},
}
harness := NewDriverHarness(t, impl)
defer harness.Kill()
ch, err := harness.Fingerprint(context.Background())
require.NoError(err)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case f := <-ch:
require.Exactly(f, fingerprints[0])
case <-time.After(1 * time.Second):
require.Fail("did not receive fingerprint[0]")
}
select {
case f := <-ch:
require.Exactly(f, fingerprints[1])
case <-time.After(1 * time.Second):
require.Fail("did not receive fingerprint[1]")
}
}()
require.False(complete)
wg.Wait()
require.True(complete)
}
func TestBaseDriver_RecoverTask(t *testing.T) {
t.Parallel()
require := require.New(t)
// build driver state and encode it into proto msg
state := testDriverState{Pid: 1, Log: "foo"}
var buf bytes.Buffer
enc := codec.NewEncoder(&buf, structs.MsgpackHandle)
enc.Encode(state)
// mock the RecoverTask driver call
impl := &MockDriver{
RecoverTaskF: func(h *TaskHandle) error {
var actual testDriverState
require.NoError(h.GetDriverState(&actual))
require.Equal(state, actual)
return nil
},
}
harness := NewDriverHarness(t, impl)
defer harness.Kill()
handle := &TaskHandle{
driverState: buf.Bytes(),
}
err := harness.RecoverTask(handle)
require.NoError(err)
}
func TestBaseDriver_StartTask(t *testing.T) {
t.Parallel()
require := require.New(t)
cfg := &TaskConfig{
ID: "foo",
}
state := &testDriverState{Pid: 1, Log: "log"}
var handle *TaskHandle
impl := &MockDriver{
StartTaskF: func(c *TaskConfig) (*TaskHandle, error) {
handle = NewTaskHandle("test")
handle.Config = c
handle.State = TaskStateRunning
handle.SetDriverState(state)
return handle, nil
},
}
harness := NewDriverHarness(t, impl)
defer harness.Kill()
resp, err := harness.StartTask(cfg)
require.NoError(err)
require.Equal(cfg.ID, resp.Config.ID)
require.Equal(handle.State, resp.State)
var actualState testDriverState
require.NoError(resp.GetDriverState(&actualState))
require.Equal(*state, actualState)
}
func TestBaseDriver_WaitTask(t *testing.T) {
t.Parallel()
require := require.New(t)
result := &ExitResult{ExitCode: 1, Signal: 9}
signalTask := make(chan struct{})
impl := &MockDriver{
WaitTaskF: func(_ context.Context, id string) (<-chan *ExitResult, error) {
ch := make(chan *ExitResult)
go func() {
<-signalTask
ch <- result
}()
return ch, nil
},
}
harness := NewDriverHarness(t, impl)
defer harness.Kill()
var wg sync.WaitGroup
wg.Add(1)
var finished bool
go func() {
defer wg.Done()
ch, err := harness.WaitTask(context.TODO(), "foo")
require.NoError(err)
actualResult := <-ch
finished = true
require.Exactly(result, actualResult)
}()
require.False(finished)
close(signalTask)
wg.Wait()
require.True(finished)
}
func TestBaseDriver_TaskEvents(t *testing.T) {
t.Parallel()
require := require.New(t)
now := time.Now().UTC().Truncate(time.Millisecond)
events := []*TaskEvent{
{
TaskID: "abc",
Timestamp: now,
Annotations: map[string]string{"foo": "bar"},
Message: "starting",
},
{
TaskID: "xyz",
Timestamp: now.Add(2 * time.Second),
Annotations: map[string]string{"foo": "bar"},
Message: "starting",
},
{
TaskID: "xyz",
Timestamp: now.Add(3 * time.Second),
Annotations: map[string]string{"foo": "bar"},
Message: "running",
},
{
TaskID: "abc",
Timestamp: now.Add(4 * time.Second),
Annotations: map[string]string{"foo": "bar"},
Message: "running",
},
}
impl := &MockDriver{
TaskEventsF: func(ctx context.Context) (<-chan *TaskEvent, error) {
ch := make(chan *TaskEvent)
go func() {
defer close(ch)
for _, event := range events {
ch <- event
}
}()
return ch, nil
},
}
harness := NewDriverHarness(t, impl)
defer harness.Kill()
ch, err := harness.TaskEvents(context.Background())
require.NoError(err)
for _, event := range events {
select {
case actual := <-ch:
require.Exactly(actual, event)
case <-time.After(500 * time.Millisecond):
require.Fail("failed to receive event")
}
}
}

View File

@@ -1,5 +1,5 @@
syntax = "proto3";
package hashicorp.nomad.plugins.drivers.base.proto;
package hashicorp.nomad.plugins.drivers.proto;
option go_package = "proto";
import "google/protobuf/duration.proto";
@@ -48,13 +48,9 @@ service Driver {
// DestroyTask removes the task from the driver's internal state and cleans
// up any additional resources created by the driver. It cannot be called
// on a running task.
// on a running task, unless force is set to true.
rpc DestroyTask(DestroyTaskRequest) returns (DestroyTaskResponse) {}
// ListTasks returns a list of summary information of all the tasks the
// driver is tracking.
rpc ListTasks(ListTasksRequest) returns (ListTasksResponse) {}
// InspectTask returns detailed information for the given task
rpc InspectTask(InspectTaskRequest) returns (InspectTaskResponse) {}
@@ -205,18 +201,13 @@ message DestroyTaskRequest {
// TaskId is the ID of the target task
string task_id = 1;
// Force destroys the task even if it is still in a running state
bool force = 2;
}
message DestroyTaskResponse {}
message ListTasksRequest {}
message ListTasksResponse {
// Tasks includes a list of summary information for each task
repeated TaskStatus tasks = 1;
}
message InspectTaskRequest {
// TaskId is the ID of the target task
@@ -471,19 +462,15 @@ message TaskStatus {
// State is the state of the task's execution
TaskState state = 3;
// SizeOnDiskMb is the disk space the driver reports the task is consuming
// in megabytes.
int64 size_on_disk_mb = 4;
// StartedAt is the timestamp when the task was started
google.protobuf.Timestamp started_at = 5;
google.protobuf.Timestamp started_at = 4;
// CompletedAt is the timestamp when the task exited.
// If the task is still running, CompletedAt will not be set
google.protobuf.Timestamp completed_at = 6;
google.protobuf.Timestamp completed_at = 5;
// Result is set when CompletedAt is set.
ExitResult result = 7;
ExitResult result = 6;
}
message TaskDriverStatus {
@@ -548,7 +535,7 @@ message MemoryUsage {
enum Fields {
RSS = 0;
CACHE = 1;
MAX_UASGE = 2;
MAX_USAGE = 2;
KERNEL_USAGE = 3;
KERNEL_MAX_USAGE = 4;
}

254
plugins/drivers/server.go Normal file
View File

@@ -0,0 +1,254 @@
package drivers
import (
"io"
"golang.org/x/net/context"
"github.com/golang/protobuf/ptypes"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/plugins/drivers/proto"
)
type driverPluginServer struct {
broker *plugin.GRPCBroker
impl DriverPlugin
logger hclog.Logger
}
func (b *driverPluginServer) TaskConfigSchema(ctx context.Context, req *proto.TaskConfigSchemaRequest) (*proto.TaskConfigSchemaResponse, error) {
spec, err := b.impl.TaskConfigSchema()
if err != nil {
return nil, err
}
resp := &proto.TaskConfigSchemaResponse{
Spec: spec,
}
return resp, nil
}
func (b *driverPluginServer) Capabilities(ctx context.Context, req *proto.CapabilitiesRequest) (*proto.CapabilitiesResponse, error) {
caps, err := b.impl.Capabilities()
if err != nil {
return nil, err
}
resp := &proto.CapabilitiesResponse{
Capabilities: &proto.DriverCapabilities{
SendSignals: caps.SendSignals,
Exec: caps.Exec,
},
}
switch caps.FSIsolation {
case FSIsolationNone:
resp.Capabilities.FsIsolation = proto.DriverCapabilities_NONE
case FSIsolationChroot:
resp.Capabilities.FsIsolation = proto.DriverCapabilities_CHROOT
case FSIsolationImage:
resp.Capabilities.FsIsolation = proto.DriverCapabilities_IMAGE
}
return resp, nil
}
func (b *driverPluginServer) Fingerprint(req *proto.FingerprintRequest, srv proto.Driver_FingerprintServer) error {
ctx := srv.Context()
ch, err := b.impl.Fingerprint(ctx)
if err != nil {
return err
}
for {
select {
case <-ctx.Done():
return nil
case f, ok := <-ch:
if !ok {
return nil
}
resp := &proto.FingerprintResponse{
Attributes: f.Attributes,
Health: healthStateToProto(f.Health),
HealthDescription: f.HealthDescription,
}
if err := srv.Send(resp); err != nil {
return err
}
}
}
}
func (b *driverPluginServer) RecoverTask(ctx context.Context, req *proto.RecoverTaskRequest) (*proto.RecoverTaskResponse, error) {
err := b.impl.RecoverTask(taskHandleFromProto(req.Handle))
if err != nil {
return nil, err
}
return &proto.RecoverTaskResponse{}, nil
}
func (b *driverPluginServer) StartTask(ctx context.Context, req *proto.StartTaskRequest) (*proto.StartTaskResponse, error) {
handle, err := b.impl.StartTask(taskConfigFromProto(req.Task))
if err != nil {
return nil, err
}
resp := &proto.StartTaskResponse{
Handle: taskHandleToProto(handle),
}
return resp, nil
}
func (b *driverPluginServer) WaitTask(ctx context.Context, req *proto.WaitTaskRequest) (*proto.WaitTaskResponse, error) {
ch, err := b.impl.WaitTask(ctx, req.TaskId)
if err != nil {
return nil, err
}
result := <-ch
var errStr string
if result.Err != nil {
errStr = result.Err.Error()
}
resp := &proto.WaitTaskResponse{
Err: errStr,
Result: &proto.ExitResult{
ExitCode: int32(result.ExitCode),
Signal: int32(result.Signal),
OomKilled: result.OOMKilled,
},
}
return resp, nil
}
func (b *driverPluginServer) StopTask(ctx context.Context, req *proto.StopTaskRequest) (*proto.StopTaskResponse, error) {
timeout, err := ptypes.Duration(req.Timeout)
if err != nil {
return nil, err
}
err = b.impl.StopTask(req.TaskId, timeout, req.Signal)
if err != nil {
return nil, err
}
return &proto.StopTaskResponse{}, nil
}
func (b *driverPluginServer) DestroyTask(ctx context.Context, req *proto.DestroyTaskRequest) (*proto.DestroyTaskResponse, error) {
err := b.impl.DestroyTask(req.TaskId, req.Force)
if err != nil {
return nil, err
}
return &proto.DestroyTaskResponse{}, nil
}
func (b *driverPluginServer) InspectTask(ctx context.Context, req *proto.InspectTaskRequest) (*proto.InspectTaskResponse, error) {
status, err := b.impl.InspectTask(req.TaskId)
if err != nil {
return nil, err
}
protoStatus, err := taskStatusToProto(status)
if err != nil {
return nil, err
}
resp := &proto.InspectTaskResponse{
Task: protoStatus,
Driver: &proto.TaskDriverStatus{
Attributes: status.DriverAttributes,
},
NetworkOverride: &proto.NetworkOverride{
PortMap: status.NetworkOverride.PortMap,
Addr: status.NetworkOverride.Addr,
AutoAdvertise: status.NetworkOverride.AutoAdvertise,
},
}
return resp, nil
}
func (b *driverPluginServer) TaskStats(ctx context.Context, req *proto.TaskStatsRequest) (*proto.TaskStatsResponse, error) {
stats, err := b.impl.TaskStats(req.TaskId)
if err != nil {
return nil, err
}
pb, err := taskStatsToProto(stats)
if err != nil {
return nil, err
}
resp := &proto.TaskStatsResponse{
Stats: pb,
}
return resp, nil
}
func (b *driverPluginServer) ExecTask(ctx context.Context, req *proto.ExecTaskRequest) (*proto.ExecTaskResponse, error) {
timeout, err := ptypes.Duration(req.Timeout)
if err != nil {
return nil, err
}
result, err := b.impl.ExecTask(req.TaskId, req.Command, timeout)
if err != nil {
return nil, err
}
resp := &proto.ExecTaskResponse{
Stdout: result.Stdout,
Stderr: result.Stderr,
Result: exitResultToProto(result.ExitResult),
}
return resp, nil
}
func (b *driverPluginServer) SignalTask(ctx context.Context, req *proto.SignalTaskRequest) (*proto.SignalTaskResponse, error) {
err := b.impl.SignalTask(req.TaskId, req.Signal)
if err != nil {
return nil, err
}
resp := &proto.SignalTaskResponse{}
return resp, nil
}
func (b *driverPluginServer) TaskEvents(req *proto.TaskEventsRequest, srv proto.Driver_TaskEventsServer) error {
ch, err := b.impl.TaskEvents(srv.Context())
if err != nil {
return err
}
for {
event := <-ch
if event == nil {
break
}
pbTimestamp, err := ptypes.TimestampProto(event.Timestamp)
if err != nil {
return err
}
pbEvent := &proto.DriverTaskEvent{
TaskId: event.TaskID,
Timestamp: pbTimestamp,
Message: event.Message,
Annotations: event.Annotations,
}
if err = srv.Send(pbEvent); err == io.EOF {
break
} else if err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,30 @@
package drivers
import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/ugorji/go/codec"
)
// TaskHandle is the state shared between a driver and the client.
// It is returned to the client after starting the task and used
// for recovery of tasks during a driver restart.
type TaskHandle struct {
Driver string
Config *TaskConfig
State TaskState
driverState []byte
}
func NewTaskHandle(driver string) *TaskHandle {
return &TaskHandle{Driver: driver}
}
func (h *TaskHandle) SetDriverState(v interface{}) error {
h.driverState = []byte{}
return codec.NewEncoderBytes(&h.driverState, structs.MsgpackHandle).Encode(v)
}
func (h *TaskHandle) GetDriverState(v interface{}) error {
return codec.NewDecoderBytes(h.driverState, structs.MsgpackHandle).Decode(v)
}

112
plugins/drivers/testing.go Normal file
View File

@@ -0,0 +1,112 @@
package drivers
import (
"io/ioutil"
"os"
"path/filepath"
"time"
"github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
)
type DriverHarness struct {
DriverPlugin
client *plugin.GRPCClient
server *plugin.GRPCServer
t testing.T
}
func NewDriverHarness(t testing.T, d DriverPlugin) *DriverHarness {
client, server := plugin.TestPluginGRPCConn(t, map[string]plugin.Plugin{
base.PluginTypeDriver: &PluginDriver{
impl: d,
logger: testlog.HCLogger(t),
},
})
raw, err := client.Dispense(base.PluginTypeDriver)
if err != nil {
t.Fatalf("err dispensing plugin: %v", err)
}
dClient := raw.(DriverPlugin)
h := &DriverHarness{
client: client,
server: server,
DriverPlugin: dClient,
}
return h
}
func (h *DriverHarness) Kill() {
h.client.Close()
h.server.Stop()
}
// MkAllocDir creates a tempory directory and allocdir structure.
// A cleanup func is returned and should be defered so as to not leak dirs
// between tests.
func (h *DriverHarness) MkAllocDir(t *TaskConfig) func() {
allocDir, err := ioutil.TempDir("", "nomad_driver_harness-")
require.NoError(h.t, err)
require.NoError(h.t, os.Mkdir(filepath.Join(allocDir, t.Name), os.ModePerm))
require.NoError(h.t, os.MkdirAll(filepath.Join(allocDir, "alloc/logs"), os.ModePerm))
t.AllocDir = allocDir
return func() { os.RemoveAll(allocDir) }
}
// MockDriver is used for testing.
// Each function can be set as a closure to make assertions about how data
// is passed through the base plugin layer.
type MockDriver struct {
base.MockPlugin
TaskConfigSchemaF func() (*hclspec.Spec, error)
FingerprintF func(context.Context) (<-chan *Fingerprint, error)
CapabilitiesF func() (*Capabilities, error)
RecoverTaskF func(*TaskHandle) error
StartTaskF func(*TaskConfig) (*TaskHandle, error)
WaitTaskF func(context.Context, string) (<-chan *ExitResult, error)
StopTaskF func(string, time.Duration, string) error
DestroyTaskF func(string, bool) error
InspectTaskF func(string) (*TaskStatus, error)
TaskStatsF func(string) (*TaskStats, error)
TaskEventsF func(context.Context) (<-chan *TaskEvent, error)
SignalTaskF func(string, string) error
ExecTaskF func(string, []string, time.Duration) (*ExecTaskResult, error)
}
func (d *MockDriver) TaskConfigSchema() (*hclspec.Spec, error) { return d.TaskConfigSchemaF() }
func (d *MockDriver) Fingerprint(ctx context.Context) (<-chan *Fingerprint, error) {
return d.FingerprintF(ctx)
}
func (d *MockDriver) Capabilities() (*Capabilities, error) { return d.CapabilitiesF() }
func (d *MockDriver) RecoverTask(h *TaskHandle) error { return d.RecoverTaskF(h) }
func (d *MockDriver) StartTask(c *TaskConfig) (*TaskHandle, error) { return d.StartTaskF(c) }
func (d *MockDriver) WaitTask(ctx context.Context, id string) (<-chan *ExitResult, error) {
return d.WaitTaskF(ctx, id)
}
func (d *MockDriver) StopTask(taskID string, timeout time.Duration, signal string) error {
return d.StopTaskF(taskID, timeout, signal)
}
func (d *MockDriver) DestroyTask(taskID string, force bool) error {
return d.DestroyTaskF(taskID, force)
}
func (d *MockDriver) InspectTask(taskID string) (*TaskStatus, error) { return d.InspectTaskF(taskID) }
func (d *MockDriver) TaskStats(taskID string) (*TaskStats, error) { return d.TaskStats(taskID) }
func (d *MockDriver) TaskEvents(ctx context.Context) (<-chan *TaskEvent, error) {
return d.TaskEventsF(ctx)
}
func (d *MockDriver) SignalTask(taskID string, signal string) error {
return d.SignalTask(taskID, signal)
}
func (d *MockDriver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*ExecTaskResult, error) {
return d.ExecTaskF(taskID, cmd, timeout)
}

View File

@@ -0,0 +1,24 @@
package drivers
import (
"testing"
"github.com/stretchr/testify/require"
)
var _ DriverPlugin = (*MockDriver)(nil)
// Very simple test to ensure the test harness works as expected
func TestDriverHarness(t *testing.T) {
handle := &TaskHandle{Config: &TaskConfig{Name: "mock"}}
d := &MockDriver{
StartTaskF: func(task *TaskConfig) (*TaskHandle, error) {
return handle, nil
},
}
harness := NewDriverHarness(t, d)
defer harness.Kill()
actual, err := harness.StartTask(&TaskConfig{})
require.NoError(t, err)
require.Equal(t, handle.Config.Name, actual.Config.Name)
}

296
plugins/drivers/utils.go Normal file
View File

@@ -0,0 +1,296 @@
package drivers
import (
"strings"
"time"
"github.com/golang/protobuf/ptypes"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/plugins/drivers/proto"
)
var protoTaskStateMap = map[TaskState]proto.TaskState{
TaskStateUnknown: proto.TaskState_UNKNOWN,
TaskStateRunning: proto.TaskState_RUNNING,
TaskStateExited: proto.TaskState_EXITED,
}
func healthStateToProto(health HealthState) proto.FingerprintResponse_HealthState {
switch health {
case HealthStateUndetected:
return proto.FingerprintResponse_UNDETECTED
case HealthStateUnhealthy:
return proto.FingerprintResponse_UNHEALTHY
case HealthStateHealthy:
return proto.FingerprintResponse_HEALTHY
}
return proto.FingerprintResponse_UNDETECTED
}
func healthStateFromProto(pb proto.FingerprintResponse_HealthState) HealthState {
switch pb {
case proto.FingerprintResponse_UNDETECTED:
return HealthStateUndetected
case proto.FingerprintResponse_UNHEALTHY:
return HealthStateUnhealthy
case proto.FingerprintResponse_HEALTHY:
return HealthStateHealthy
}
return HealthStateUndetected
}
func taskConfigFromProto(pb *proto.TaskConfig) *TaskConfig {
if pb == nil {
return &TaskConfig{}
}
return &TaskConfig{
ID: pb.Id,
Name: pb.Name,
Env: pb.Env,
rawDriverConfig: pb.MsgpackDriverConfig,
Resources: Resources{}, //TODO
Devices: []DeviceConfig{}, //TODO
Mounts: []MountConfig{}, //TODO
User: pb.User,
AllocDir: pb.AllocDir,
}
}
func taskConfigToProto(cfg *TaskConfig) *proto.TaskConfig {
if cfg == nil {
return &proto.TaskConfig{}
}
pb := &proto.TaskConfig{
Id: cfg.ID,
Name: cfg.Name,
Env: cfg.Env,
Resources: &proto.Resources{},
Mounts: []*proto.Mount{},
Devices: []*proto.Device{},
User: cfg.User,
AllocDir: cfg.AllocDir,
MsgpackDriverConfig: cfg.rawDriverConfig,
}
return pb
}
func taskHandleFromProto(pb *proto.TaskHandle) *TaskHandle {
if pb == nil {
return &TaskHandle{}
}
return &TaskHandle{
Config: taskConfigFromProto(pb.Config),
State: TaskState(strings.ToLower(pb.State.String())),
driverState: pb.DriverState,
}
}
func taskHandleToProto(handle *TaskHandle) *proto.TaskHandle {
return &proto.TaskHandle{
Config: taskConfigToProto(handle.Config),
State: protoTaskStateMap[handle.State],
DriverState: handle.driverState,
}
}
func exitResultToProto(result *ExitResult) *proto.ExitResult {
return &proto.ExitResult{
ExitCode: int32(result.ExitCode),
Signal: int32(result.Signal),
OomKilled: result.OOMKilled,
}
}
func exitResultFromProto(pb *proto.ExitResult) *ExitResult {
return &ExitResult{
ExitCode: int(pb.ExitCode),
Signal: int(pb.Signal),
OOMKilled: pb.OomKilled,
}
}
func taskStatusToProto(status *TaskStatus) (*proto.TaskStatus, error) {
started, err := ptypes.TimestampProto(status.StartedAt)
if err != nil {
return nil, err
}
completed, err := ptypes.TimestampProto(status.CompletedAt)
if err != nil {
return nil, err
}
return &proto.TaskStatus{
Id: status.ID,
Name: status.Name,
StartedAt: started,
CompletedAt: completed,
Result: exitResultToProto(status.ExitResult),
}, nil
}
func taskStatusFromProto(pb *proto.TaskStatus) (*TaskStatus, error) {
started, err := ptypes.Timestamp(pb.StartedAt)
if err != nil {
return nil, err
}
completed, err := ptypes.Timestamp(pb.CompletedAt)
if err != nil {
return nil, err
}
return &TaskStatus{
ID: pb.Id,
Name: pb.Name,
StartedAt: started,
CompletedAt: completed,
ExitResult: exitResultFromProto(pb.Result),
}, nil
}
func taskStatsToProto(stats *TaskStats) (*proto.TaskStats, error) {
timestamp, err := ptypes.TimestampProto(time.Unix(stats.Timestamp, 0))
if err != nil {
return nil, err
}
pids := map[string]*proto.TaskResourceUsage{}
for pid, ru := range stats.ResourceUsageByPid {
pids[pid] = resourceUsageToProto(ru)
}
return &proto.TaskStats{
Id: stats.ID,
Timestamp: timestamp,
AggResourceUsage: resourceUsageToProto(stats.AggResourceUsage),
ResourceUsageByPid: pids,
}, nil
}
func taskStatsFromProto(pb *proto.TaskStats) (*TaskStats, error) {
timestamp, err := ptypes.Timestamp(pb.Timestamp)
if err != nil {
return nil, err
}
pids := map[string]*cstructs.ResourceUsage{}
for pid, ru := range pb.ResourceUsageByPid {
pids[pid] = resourceUsageFromProto(ru)
}
stats := &TaskStats{
ID: pb.Id,
Timestamp: timestamp.Unix(),
AggResourceUsage: resourceUsageFromProto(pb.AggResourceUsage),
ResourceUsageByPid: pids,
}
return stats, nil
}
func resourceUsageToProto(ru *cstructs.ResourceUsage) *proto.TaskResourceUsage {
cpu := &proto.CPUUsage{}
for _, field := range ru.CpuStats.Measured {
switch field {
case "System Mode":
cpu.SystemMode = ru.CpuStats.SystemMode
cpu.MeasuredFields = append(cpu.MeasuredFields, proto.CPUUsage_SYSTEM_MODE)
case "User Mode":
cpu.UserMode = ru.CpuStats.UserMode
cpu.MeasuredFields = append(cpu.MeasuredFields, proto.CPUUsage_USER_MODE)
case "Total Ticks":
cpu.TotalTicks = ru.CpuStats.TotalTicks
cpu.MeasuredFields = append(cpu.MeasuredFields, proto.CPUUsage_TOTAL_TICKS)
case "Throttled Periods":
cpu.ThrottledPeriods = ru.CpuStats.ThrottledPeriods
cpu.MeasuredFields = append(cpu.MeasuredFields, proto.CPUUsage_THROTTLED_PERIODS)
case "Throttled Time":
cpu.ThrottledTime = ru.CpuStats.ThrottledTime
cpu.MeasuredFields = append(cpu.MeasuredFields, proto.CPUUsage_THROTTLED_TIME)
case "Percent":
cpu.Percent = ru.CpuStats.Percent
cpu.MeasuredFields = append(cpu.MeasuredFields, proto.CPUUsage_PERCENT)
}
}
memory := &proto.MemoryUsage{}
for _, field := range ru.MemoryStats.Measured {
switch field {
case "RSS":
memory.Rss = ru.MemoryStats.RSS
memory.MeasuredFields = append(memory.MeasuredFields, proto.MemoryUsage_RSS)
case "Cache":
memory.Cache = ru.MemoryStats.Cache
memory.MeasuredFields = append(memory.MeasuredFields, proto.MemoryUsage_CACHE)
case "Max Usage":
memory.MaxUsage = ru.MemoryStats.MaxUsage
memory.MeasuredFields = append(memory.MeasuredFields, proto.MemoryUsage_MAX_USAGE)
case "Kernel Usage":
memory.KernelUsage = ru.MemoryStats.KernelUsage
memory.MeasuredFields = append(memory.MeasuredFields, proto.MemoryUsage_KERNEL_USAGE)
case "Kernel Max Usage":
memory.KernelMaxUsage = ru.MemoryStats.KernelMaxUsage
memory.MeasuredFields = append(memory.MeasuredFields, proto.MemoryUsage_KERNEL_MAX_USAGE)
}
}
return &proto.TaskResourceUsage{
Cpu: cpu,
Memory: memory,
}
}
func resourceUsageFromProto(pb *proto.TaskResourceUsage) *cstructs.ResourceUsage {
cpu := cstructs.CpuStats{}
if pb.Cpu != nil {
for _, field := range pb.Cpu.MeasuredFields {
switch field {
case proto.CPUUsage_SYSTEM_MODE:
cpu.SystemMode = pb.Cpu.SystemMode
cpu.Measured = append(cpu.Measured, "System Mode")
case proto.CPUUsage_USER_MODE:
cpu.UserMode = pb.Cpu.UserMode
cpu.Measured = append(cpu.Measured, "User Mode")
case proto.CPUUsage_TOTAL_TICKS:
cpu.TotalTicks = pb.Cpu.TotalTicks
cpu.Measured = append(cpu.Measured, "Total Ticks")
case proto.CPUUsage_THROTTLED_PERIODS:
cpu.ThrottledPeriods = pb.Cpu.ThrottledPeriods
cpu.Measured = append(cpu.Measured, "Throttled Periods")
case proto.CPUUsage_THROTTLED_TIME:
cpu.ThrottledTime = pb.Cpu.ThrottledTime
cpu.Measured = append(cpu.Measured, "Throttled Time")
case proto.CPUUsage_PERCENT:
cpu.Percent = pb.Cpu.Percent
cpu.Measured = append(cpu.Measured, "Percent")
}
}
}
memory := cstructs.MemoryStats{}
if pb.Memory != nil {
for _, field := range pb.Memory.MeasuredFields {
switch field {
case proto.MemoryUsage_RSS:
memory.RSS = pb.Memory.Rss
memory.Measured = append(memory.Measured, "RSS")
case proto.MemoryUsage_CACHE:
memory.Cache = pb.Memory.Cache
memory.Measured = append(memory.Measured, "Cache")
case proto.MemoryUsage_MAX_USAGE:
memory.MaxUsage = pb.Memory.MaxUsage
memory.Measured = append(memory.Measured, "Max Usage")
case proto.MemoryUsage_KERNEL_USAGE:
memory.KernelUsage = pb.Memory.KernelUsage
memory.Measured = append(memory.Measured, "Kernel Usage")
case proto.MemoryUsage_KERNEL_MAX_USAGE:
memory.KernelMaxUsage = pb.Memory.KernelMaxUsage
memory.Measured = append(memory.Measured, "Kernel Max Usage")
}
}
}
return &cstructs.ResourceUsage{
CpuStats: &cpu,
MemoryStats: &memory,
}
}

View File

@@ -0,0 +1,99 @@
package utils
import (
"encoding/json"
"fmt"
"io"
"os"
"os/exec"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/driver/executor"
dstructs "github.com/hashicorp/nomad/client/driver/structs"
"github.com/hashicorp/nomad/helper/discover"
"github.com/hashicorp/nomad/nomad/structs"
)
// CgroupsMounted returns true if the cgroups are mounted on a system otherwise
// returns false
func CgroupsMounted(node *structs.Node) bool {
_, ok := node.Attributes["unique.cgroup.mountpoint"]
return ok
}
// CreateExecutor launches an executor plugin and returns an instance of the
// Executor interface
func CreateExecutor(w io.Writer, level hclog.Level, CMinPort, CMaxPort uint,
executorConfig *dstructs.ExecutorConfig) (executor.Executor, *plugin.Client, error) {
c, err := json.Marshal(executorConfig)
if err != nil {
return nil, nil, fmt.Errorf("unable to create executor config: %v", err)
}
bin, err := discover.NomadExecutable()
if err != nil {
return nil, nil, fmt.Errorf("unable to find the nomad binary: %v", err)
}
config := &plugin.ClientConfig{
Cmd: exec.Command(bin, "executor", string(c)),
}
config.HandshakeConfig = driver.HandshakeConfig
config.Plugins = driver.GetPluginMap(w, level, executorConfig.FSIsolation)
config.MaxPort = CMaxPort
config.MinPort = CMinPort
// setting the setsid of the plugin process so that it doesn't get signals sent to
// the nomad client.
if config.Cmd != nil {
isolateCommand(config.Cmd)
}
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err)
}
raw, err := rpcClient.Dispense("executor")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin := raw.(executor.Executor)
return executorPlugin, executorClient, nil
}
func CreateExecutorWithConfig(config *plugin.ClientConfig, w io.Writer) (executor.Executor, *plugin.Client, error) {
config.HandshakeConfig = driver.HandshakeConfig
// Setting this to DEBUG since the log level at the executor server process
// is already set, and this effects only the executor client.
config.Plugins = driver.GetPluginMap(w, hclog.Debug, false)
executorClient := plugin.NewClient(config)
rpcClient, err := executorClient.Client()
if err != nil {
return nil, nil, fmt.Errorf("error creating rpc client for executor plugin: %v", err)
}
raw, err := rpcClient.Dispense("executor")
if err != nil {
return nil, nil, fmt.Errorf("unable to dispense the executor plugin: %v", err)
}
executorPlugin, ok := raw.(*driver.ExecutorRPC)
if !ok {
return nil, nil, fmt.Errorf("unexpected executor rpc type: %T", raw)
}
return executorPlugin, executorClient, nil
}
// KillProcess kills a process with the given pid
func KillProcess(pid int) error {
proc, err := os.FindProcess(pid)
if err != nil {
return err
}
return proc.Kill()
}

View File

@@ -0,0 +1,18 @@
// +build darwin dragonfly freebsd linux netbsd openbsd solaris
package utils
import (
"os/exec"
"syscall"
)
// isolateCommand sets the setsid flag in exec.Cmd to true so that the process
// becomes the process leader in a new session and doesn't receive signals that
// are sent to the parent process.
func isolateCommand(cmd *exec.Cmd) {
if cmd.SysProcAttr == nil {
cmd.SysProcAttr = &syscall.SysProcAttr{}
}
cmd.SysProcAttr.Setsid = true
}

View File

@@ -0,0 +1,8 @@
package utils
import (
"os/exec"
)
// TODO Figure out if this is needed in Windows
func isolateCommand(cmd *exec.Cmd) {}