docker: started work on porting docker driver to new plugin framework

This commit is contained in:
Nick Ethier
2018-11-06 00:39:48 -05:00
parent 9d8d3e158a
commit 98b295d617
18 changed files with 5940 additions and 1 deletion

View File

@@ -650,12 +650,16 @@ func (tr *TaskRunner) persistLocalState() error {
// buildTaskConfig builds a drivers.TaskConfig with a unique ID for the task.
// The ID is consistently built from the alloc ID, task name and restart attempt.
func (tr *TaskRunner) buildTaskConfig() *drivers.TaskConfig {
return &drivers.TaskConfig{
ID: fmt.Sprintf("%s/%s/%d", tr.allocID, tr.taskName, tr.restartTracker.GetCount()),
Name: tr.task.Name,
Resources: &drivers.Resources{
NomadResources: tr.task.Resources,
//TODO Calculate the LinuxResources
LinuxResources: &drivers.LinuxResources{
MemoryLimitBytes: int64(tr.Task().Resources.MemoryMB) * 1024 * 1024,
CPUShares: int64(tr.Task().Resources.CPU),
},
},
Env: tr.envBuilder.Build().Map(),
User: tr.task.User,

View File

@@ -0,0 +1,416 @@
package docker
import (
"context"
"fmt"
"regexp"
"sync"
"time"
docker "github.com/fsouza/go-dockerclient"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/nomad/structs"
)
var (
// createCoordinator allows us to only create a single coordinator
createCoordinator sync.Once
// globalCoordinator is the shared coordinator and should only be retrieved
// using the GetDockerCoordinator() method.
globalCoordinator *dockerCoordinator
// imageNotFoundMatcher is a regular expression that matches the image not
// found error Docker returns.
imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)
// pullFuture is a sharable future for retrieving a pulled image's ID and any
// error that may have occurred during the pull.
type pullFuture struct {
waitCh chan struct{}
err error
imageID string
}
// newPullFuture returns a new pull future
func newPullFuture() *pullFuture {
return &pullFuture{
waitCh: make(chan struct{}),
}
}
// wait blocks until the future has a result
func (p *pullFuture) wait() *pullFuture {
<-p.waitCh
return p
}
// result returns the results of the future and should only ever be called after
// wait returns.
func (p *pullFuture) result() (imageID string, err error) {
return p.imageID, p.err
}
// set is used to set the results and unblock any waiter. This may only be
// called once.
func (p *pullFuture) set(imageID string, err error) {
p.imageID = imageID
p.err = err
close(p.waitCh)
}
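The pullFuture above gives the coordinator a single-flight guarantee: the first caller for a given image starts the pull, and every later caller blocks on the same waitCh. A minimal, self-contained sketch of the same pattern (illustrative names only, not part of this commit):

package main

import (
    "fmt"
    "sync"
)

type future struct {
    waitCh chan struct{}
    result string
}

// set publishes the result and unblocks all waiters; call exactly once.
func (f *future) set(result string) {
    f.result = result
    close(f.waitCh)
}

func main() {
    var (
        mu      sync.Mutex
        futures = map[string]*future{}
        wg      sync.WaitGroup
    )
    fetch := func(key string) string {
        mu.Lock()
        f, ok := futures[key]
        if !ok {
            // First caller: create the future and start the work.
            f = &future{waitCh: make(chan struct{})}
            futures[key] = f
            go f.set("image-id-for-" + key)
        }
        mu.Unlock()
        <-f.waitCh // later callers simply wait on the shared channel
        return f.result
    }
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            fmt.Println(fetch("redis:latest"))
        }()
    }
    wg.Wait()
}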
// DockerImageClient provides the methods required to do CRUD operations on
// Docker images
type DockerImageClient interface {
PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error
InspectImage(id string) (*docker.Image, error)
RemoveImage(id string) error
}
// LogEventFn is a callback which allows Drivers to emit task events.
type LogEventFn func(message string, annotations map[string]string)
// dockerCoordinatorConfig is used to configure the Docker coordinator.
type dockerCoordinatorConfig struct {
// logger is the logger the coordinator should use
logger hclog.Logger
// cleanup marks whether images should be deleted when the reference count
// is zero
cleanup bool
// client is the Docker client to use for communicating with Docker
client DockerImageClient
// removeDelay is the delay between an image's reference count going to
// zero and the image actually being deleted.
removeDelay time.Duration
}
// dockerCoordinator is used to coordinate actions against images to prevent
// racy deletions. It can be thought of as a reference counter on images.
type dockerCoordinator struct {
*dockerCoordinatorConfig
// imageLock is used to lock access to all images
imageLock sync.Mutex
// pullFutures is used to allow multiple callers to pull the same image but
// only have one request be sent to Docker
pullFutures map[string]*pullFuture
// pullLoggers is used to track the LogEventFn for each alloc pulling an image.
// If multiple allocs are attempting to pull the same image, each will need
// to register its own LogEventFn with the coordinator.
pullLoggers map[string][]LogEventFn
// pullLoggerLock is used to sync access to the pullLoggers map
pullLoggerLock sync.RWMutex
// imageRefCount is the reference count of image IDs
imageRefCount map[string]map[string]struct{}
// deleteFuture is keyed by image ID and stores the cancel function for that
// image's pending delete
deleteFuture map[string]context.CancelFunc
}
// NewDockerCoordinator returns a new Docker coordinator
func NewDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
if config.client == nil {
return nil
}
return &dockerCoordinator{
dockerCoordinatorConfig: config,
pullFutures: make(map[string]*pullFuture),
pullLoggers: make(map[string][]LogEventFn),
imageRefCount: make(map[string]map[string]struct{}),
deleteFuture: make(map[string]context.CancelFunc),
}
}
// GetDockerCoordinator returns the shared dockerCoordinator instance
func GetDockerCoordinator(config *dockerCoordinatorConfig) *dockerCoordinator {
createCoordinator.Do(func() {
globalCoordinator = NewDockerCoordinator(config)
})
return globalCoordinator
}
// PullImage is used to pull an image. It returns the pulled image's ID or any
// error that occurred during the pull.
func (d *dockerCoordinator) PullImage(image string, authOptions *docker.AuthConfiguration, callerID string, emitFn LogEventFn) (imageID string, err error) {
// Get the future
d.imageLock.Lock()
future, ok := d.pullFutures[image]
d.registerPullLogger(image, emitFn)
if !ok {
// Make the future
future = newPullFuture()
d.pullFutures[image] = future
go d.pullImageImpl(image, authOptions, future)
}
d.imageLock.Unlock()
// We unlock while we wait since this can take a while
id, err := future.wait().result()
d.imageLock.Lock()
defer d.imageLock.Unlock()
// Delete the future since we don't need it and we don't want to cache an
// image being there if it has possibly been manually deleted (outside of
// Nomad).
if _, ok := d.pullFutures[image]; ok {
delete(d.pullFutures, image)
}
// If we are cleaning up, we increment the reference count on the image
if err == nil && d.cleanup {
d.incrementImageReferenceImpl(id, image, callerID)
}
return id, err
}
// pullImageImpl is the implementation of pulling an image. The results are
// returned via the passed future
func (d *dockerCoordinator) pullImageImpl(image string, authOptions *docker.AuthConfiguration, future *pullFuture) {
defer d.clearPullLogger(image)
// Parse the repo and tag
repo, tag := parseDockerImage(image)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pm := newImageProgressManager(image, cancel, d.handlePullInactivity,
d.handlePullProgressReport, d.handleSlowPullProgressReport)
defer pm.stop()
pullOptions := docker.PullImageOptions{
Repository: repo,
Tag: tag,
OutputStream: pm,
RawJSONStream: true,
Context: ctx,
}
// Attempt to pull the image
var auth docker.AuthConfiguration
if authOptions != nil {
auth = *authOptions
}
err := d.client.PullImage(pullOptions, auth)
if ctxErr := ctx.Err(); ctxErr == context.DeadlineExceeded {
d.logger.Error("timeout pulling container", "image_ref", dockerImageRef(repo, tag))
future.set("", recoverablePullError(ctxErr, image))
return
}
if err != nil {
d.logger.Error("failed pulling container", "image_ref", dockerImageRef(repo, tag),
"error", err)
future.set("", recoverablePullError(err, image))
return
}
d.logger.Debug("docker pull succeeded", "image_ref", dockerImageRef(repo, tag))
dockerImage, err := d.client.InspectImage(image)
if err != nil {
d.logger.Error("failed getting image id", "image_name", image, "error", err)
future.set("", recoverableErrTimeouts(err))
return
}
future.set(dockerImage.ID, nil)
return
}
// IncrementImageReference is used to increment an image reference count
func (d *dockerCoordinator) IncrementImageReference(imageID, imageName, callerID string) {
d.imageLock.Lock()
defer d.imageLock.Unlock()
if d.cleanup {
d.incrementImageReferenceImpl(imageID, imageName, callerID)
}
}
// incrementImageReferenceImpl assumes the lock is held
func (d *dockerCoordinator) incrementImageReferenceImpl(imageID, imageName, callerID string) {
// Cancel any pending delete
if cancel, ok := d.deleteFuture[imageID]; ok {
d.logger.Debug("cancelling removal of container image", "image_name", imageName)
cancel()
delete(d.deleteFuture, imageID)
}
// Increment the reference
references, ok := d.imageRefCount[imageID]
if !ok {
references = make(map[string]struct{})
d.imageRefCount[imageID] = references
}
if _, ok := references[callerID]; !ok {
references[callerID] = struct{}{}
d.logger.Debug("image reference count incremented", "image_name", imageName, "image_id", imageID, "references", len(references))
}
}
// RemoveImage removes the given image. If there are any errors removing the
// image, the remove is retried internally.
func (d *dockerCoordinator) RemoveImage(imageID, callerID string) {
d.imageLock.Lock()
defer d.imageLock.Unlock()
if !d.cleanup {
return
}
references, ok := d.imageRefCount[imageID]
if !ok {
d.logger.Warn("RemoveImage on non-referenced counted image id", "image_id", imageID)
return
}
// Decrement the reference count
delete(references, callerID)
count := len(references)
d.logger.Debug("image id reference count decremented", "image_id", imageID, "references", count)
// Nothing to do
if count != 0 {
return
}
// This should never be the case, but we guard against it so we don't leak
// a cancel.
if cancel, ok := d.deleteFuture[imageID]; ok {
d.logger.Error("image id has lingering delete future", "image_id", imageID)
cancel()
}
// Setup a future to delete the image
ctx, cancel := context.WithCancel(context.Background())
d.deleteFuture[imageID] = cancel
go d.removeImageImpl(imageID, ctx)
// Delete the key from the reference count
delete(d.imageRefCount, imageID)
}
// removeImageImpl is used to remove an image. It will wait the specified
// remove delay before removing the image. If the context is cancelled before
// then, the removal is aborted.
func (d *dockerCoordinator) removeImageImpl(id string, ctx context.Context) {
// Wait for the delay or a cancellation event
select {
case <-ctx.Done():
// We have been cancelled
return
case <-time.After(d.removeDelay):
}
// Ensure we are supposed to delete. Do a short check while holding the lock
// so there can't be interleaving. There is still the smallest chance that
// the delete occurs after the image has been pulled but before it has been
// incremented. For handling that we just treat it as a recoverable error in
// the docker driver.
d.imageLock.Lock()
select {
case <-ctx.Done():
d.imageLock.Unlock()
return
default:
}
d.imageLock.Unlock()
for i := 0; i < 3; i++ {
err := d.client.RemoveImage(id)
if err == nil {
break
}
if err == docker.ErrNoSuchImage {
d.logger.Debug("unable to cleanup image, does not exist", "image_id", id)
return
}
if derr, ok := err.(*docker.Error); ok && derr.Status == 409 {
d.logger.Debug("unable to cleanup image, still in use", "image_id", id)
return
}
// Retry on unknown errors
d.logger.Debug("failed to remove image", "image_id", id, "attempt", i+1, "error", err)
select {
case <-ctx.Done():
// We have been cancelled
return
case <-time.After(3 * time.Second):
}
}
d.logger.Debug("cleanup removed downloaded image", "image_id", id)
// Cleanup the future from the map and free the context by cancelling it
d.imageLock.Lock()
if cancel, ok := d.deleteFuture[id]; ok {
delete(d.deleteFuture, id)
cancel()
}
d.imageLock.Unlock()
}
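The scheme above — a reference count per image plus a cancelable, delayed delete — is worth seeing in isolation. A compact, runnable sketch under illustrative names (not the driver's API):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// refGC deletes a key only after its reference count has been zero for a
// grace period; re-acquiring within the window cancels the pending delete.
type refGC struct {
    mu      sync.Mutex
    refs    map[string]int
    pending map[string]context.CancelFunc
    delay   time.Duration
}

func (g *refGC) acquire(key string) {
    g.mu.Lock()
    defer g.mu.Unlock()
    if cancel, ok := g.pending[key]; ok {
        cancel() // cancel the in-flight delayed delete
        delete(g.pending, key)
    }
    g.refs[key]++
}

func (g *refGC) release(key string, deleteFn func(string)) {
    g.mu.Lock()
    defer g.mu.Unlock()
    if g.refs[key]--; g.refs[key] > 0 {
        return
    }
    delete(g.refs, key)
    ctx, cancel := context.WithCancel(context.Background())
    g.pending[key] = cancel
    go func() {
        select {
        case <-ctx.Done(): // cancelled by a re-acquire
        case <-time.After(g.delay):
            deleteFn(key)
        }
    }()
}

func main() {
    g := &refGC{refs: map[string]int{}, pending: map[string]context.CancelFunc{}, delay: 50 * time.Millisecond}
    g.acquire("img")
    g.release("img", func(k string) { fmt.Println("deleted", k) })
    g.acquire("img")                   // re-reference within the delay: the delete is cancelled
    time.Sleep(100 * time.Millisecond) // "deleted img" is never printed
    fmt.Println("refs:", g.refs["img"])
}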
func (d *dockerCoordinator) registerPullLogger(image string, logger LogEventFn) {
d.pullLoggerLock.Lock()
defer d.pullLoggerLock.Unlock()
if _, ok := d.pullLoggers[image]; !ok {
d.pullLoggers[image] = []LogEventFn{}
}
d.pullLoggers[image] = append(d.pullLoggers[image], logger)
}
func (d *dockerCoordinator) clearPullLogger(image string) {
d.pullLoggerLock.Lock()
defer d.pullLoggerLock.Unlock()
delete(d.pullLoggers, image)
}
func (d *dockerCoordinator) emitEvent(image, message string, annotations map[string]string) {
d.pullLoggerLock.RLock()
defer d.pullLoggerLock.RUnlock()
for i := range d.pullLoggers[image] {
go d.pullLoggers[image][i](message, annotations)
}
}
func (d *dockerCoordinator) handlePullInactivity(image, msg string, timestamp time.Time) {
d.logger.Error("image pull aborted due to inactivity", "image_name", image,
"last_event_timestamp", timestamp.String(), "last_event", msg)
}
func (d *dockerCoordinator) handlePullProgressReport(image, msg string, _ time.Time) {
d.logger.Debug("image pull progress", "image_name", image, "message", msg)
}
func (d *dockerCoordinator) handleSlowPullProgressReport(image, msg string, _ time.Time) {
d.emitEvent(image, fmt.Sprintf("Docker image pull progress: %s", msg), map[string]string{
"image": image,
})
}
// recoverablePullError wraps the error returned when trying to pull an
// image, marking it recoverable unless the image was not found.
func recoverablePullError(err error, image string) error {
recoverable := true
if imageNotFoundMatcher.MatchString(err.Error()) {
recoverable = false
}
return structs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
}

View File

@@ -0,0 +1,239 @@
package docker
import (
"fmt"
"testing"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/testutil"
)
type mockImageClient struct {
pulled map[string]int
idToName map[string]string
removed map[string]int
pullDelay time.Duration
}
func newMockImageClient(idToName map[string]string, pullDelay time.Duration) *mockImageClient {
return &mockImageClient{
pulled: make(map[string]int),
removed: make(map[string]int),
idToName: idToName,
pullDelay: pullDelay,
}
}
func (m *mockImageClient) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) error {
time.Sleep(m.pullDelay)
m.pulled[opts.Repository]++
return nil
}
func (m *mockImageClient) InspectImage(id string) (*docker.Image, error) {
return &docker.Image{
ID: m.idToName[id],
}, nil
}
func (m *mockImageClient) RemoveImage(id string) error {
m.removed[id]++
return nil
}
func TestDockerCoordinator_ConcurrentPulls(t *testing.T) {
t.Parallel()
image := "foo"
imageID := uuid.Generate()
mapping := map[string]string{imageID: image}
// Add a delay so we can get multiple queued up
mock := newMockImageClient(mapping, 10*time.Millisecond)
config := &dockerCoordinatorConfig{
logger: testlog.HCLogger(t),
cleanup: true,
client: mock,
removeDelay: 100 * time.Millisecond,
}
// Create a coordinator
coordinator := NewDockerCoordinator(config)
id := ""
for i := 0; i < 10; i++ {
go func() {
id, _ = coordinator.PullImage(image, nil, uuid.Generate(), nil)
}()
}
testutil.WaitForResult(func() (bool, error) {
p := mock.pulled[image]
if p >= 10 {
return false, fmt.Errorf("Wrong number of pulls: %d", p)
}
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 10 {
return false, fmt.Errorf("Got reference count %d; want %d", len(references), 10)
}
// Ensure there is no pull future
if len(coordinator.pullFutures) != 0 {
return false, fmt.Errorf("Pull future exists after pull finished")
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestDockerCoordinator_Pull_Remove(t *testing.T) {
t.Parallel()
image := "foo"
imageID := uuid.Generate()
mapping := map[string]string{imageID: image}
// Add a delay so we can get multiple queued up
mock := newMockImageClient(mapping, 10*time.Millisecond)
config := &dockerCoordinatorConfig{
logger: testlog.HCLogger(t),
cleanup: true,
client: mock,
removeDelay: 1 * time.Millisecond,
}
// Create a coordinator
coordinator := NewDockerCoordinator(config)
id := ""
callerIDs := make([]string, 10)
for i := 0; i < 10; i++ {
callerIDs[i] = uuid.Generate()
id, _ = coordinator.PullImage(image, nil, callerIDs[i], nil)
}
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 10 {
t.Fatalf("Got reference count %d; want %d", len(references), 10)
}
// Remove some
for i := 0; i < 8; i++ {
coordinator.RemoveImage(id, callerIDs[i])
}
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 2 {
t.Fatalf("Got reference count %d; want %d", len(references), 2)
}
// Remove all
for i := 8; i < 10; i++ {
coordinator.RemoveImage(id, callerIDs[i])
}
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 0 {
t.Fatalf("Got reference count %d; want %d", len(references), 0)
}
// Check that only one delete happened
testutil.WaitForResult(func() (bool, error) {
removes := mock.removed[id]
return removes == 1, fmt.Errorf("Wrong number of removes: %d", removes)
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Make sure there is no future still
if _, ok := coordinator.deleteFuture[id]; ok {
t.Fatal("Got delete future")
}
}
func TestDockerCoordinator_Remove_Cancel(t *testing.T) {
t.Parallel()
image := "foo"
imageID := uuid.Generate()
mapping := map[string]string{imageID: image}
mock := newMockImageClient(mapping, 1*time.Millisecond)
config := &dockerCoordinatorConfig{
logger: testlog.HCLogger(t),
cleanup: true,
client: mock,
removeDelay: 100 * time.Millisecond,
}
// Create a coordinator
coordinator := NewDockerCoordinator(config)
callerID := uuid.Generate()
// Pull image
id, _ := coordinator.PullImage(image, nil, callerID, nil)
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
t.Fatalf("Got reference count %d; want %d", len(references), 1)
}
// Remove image
coordinator.RemoveImage(id, callerID)
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 0 {
t.Fatalf("Got reference count %d; want %d", len(references), 0)
}
// Pull image again within delay
id, _ = coordinator.PullImage(image, nil, callerID, nil)
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 1 {
t.Fatalf("Got reference count %d; want %d", len(references), 1)
}
// Check that no delete happened
if removes := mock.removed[id]; removes != 0 {
t.Fatalf("Image deleted when it shouldn't have")
}
}
func TestDockerCoordinator_No_Cleanup(t *testing.T) {
t.Parallel()
image := "foo"
imageID := uuid.Generate()
mapping := map[string]string{imageID: image}
mock := newMockImageClient(mapping, 1*time.Millisecond)
config := &dockerCoordinatorConfig{
logger: testlog.HCLogger(t),
cleanup: false,
client: mock,
removeDelay: 1 * time.Millisecond,
}
// Create a coordinator
coordinator := NewDockerCoordinator(config)
callerID := uuid.Generate()
// Pull image
id, _ := coordinator.PullImage(image, nil, callerID, nil)
// Check the reference count
if references := coordinator.imageRefCount[id]; len(references) != 0 {
t.Fatalf("Got reference count %d; want %d", len(references), 0)
}
// Remove image
coordinator.RemoveImage(id, callerID)
// Check that no delete happened
if removes := mock.removed[id]; removes != 0 {
t.Fatalf("Image deleted when it shouldn't have")
}
}

1556
drivers/docker/driver.go Normal file

File diff suppressed because it is too large

View File

@@ -0,0 +1,36 @@
// +build !windows
package docker
import (
docker "github.com/fsouza/go-dockerclient"
"github.com/moby/moby/daemon/caps"
)
const (
// defaultNetworkMode is the default network mode for containers on non-Windows hosts
defaultNetworkMode = "bridge"
)
func getPortBinding(ip string, port string) []docker.PortBinding {
return []docker.PortBinding{{HostIP: ip, HostPort: port}}
}
func tweakCapabilities(basics, adds, drops []string) ([]string, error) {
// Moby mixes two different capability formats: prefixed with "CAP_"
// and not. We do the conversion here to have a consistent,
// non-prefixed format on the Nomad side.
for i, cap := range basics {
basics[i] = "CAP_" + cap
}
effectiveCaps, err := caps.TweakCapabilities(basics, adds, drops)
if err != nil {
return effectiveCaps, err
}
for i, cap := range effectiveCaps {
effectiveCaps[i] = cap[len("CAP_"):]
}
return effectiveCaps, nil
}
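A self-contained sketch of just the prefix normalization above (the adds/drops are applied directly here instead of delegating to moby's caps.TweakCapabilities whitelist logic):

package main

import (
    "fmt"
    "sort"
    "strings"
)

// normalizeCaps mimics the prefix handling in tweakCapabilities: work in the
// "CAP_"-prefixed form internally, return the un-prefixed form Nomad uses.
func normalizeCaps(basics, adds, drops []string) []string {
    set := map[string]bool{}
    for _, c := range append(append([]string{}, basics...), adds...) {
        set["CAP_"+strings.ToUpper(c)] = true
    }
    for _, c := range drops {
        delete(set, "CAP_"+strings.ToUpper(c))
    }
    out := make([]string, 0, len(set))
    for c := range set {
        out = append(out, strings.TrimPrefix(c, "CAP_"))
    }
    sort.Strings(out)
    return out
}

func main() {
    // Prints [CHOWN SYS_TIME]
    fmt.Println(normalizeCaps(
        []string{"CHOWN", "KILL"}, // basics
        []string{"SYS_TIME"},      // cap_add
        []string{"KILL"},          // cap_drop
    ))
}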

View File

@@ -0,0 +1,99 @@
package docker
import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/client/testutil"
tu "github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
func TestDockerDriver_authFromHelper(t *testing.T) {
dir, err := ioutil.TempDir("", "test-docker-driver_authfromhelper")
require.NoError(t, err)
defer os.RemoveAll(dir)
helperPayload := "{\"Username\":\"hashi\",\"Secret\":\"nomad\"}"
helperContent := []byte(fmt.Sprintf("#!/bin/sh\ncat > %s/helper-$1.out;echo '%s'", dir, helperPayload))
helperFile := filepath.Join(dir, "docker-credential-testnomad")
err = ioutil.WriteFile(helperFile, helperContent, 0777)
require.NoError(t, err)
path := os.Getenv("PATH")
os.Setenv("PATH", fmt.Sprintf("%s:%s", path, dir))
defer os.Setenv("PATH", path)
helper := authFromHelper("testnomad")
creds, err := helper("registry.local:5000/repo/image")
require.NoError(t, err)
require.NotNil(t, creds)
require.Equal(t, "hashi", creds.Username)
require.Equal(t, "nomad", creds.Password)
if _, err := os.Stat(filepath.Join(dir, "helper-get.out")); os.IsNotExist(err) {
t.Fatalf("Expected helper-get.out to exist")
}
content, err := ioutil.ReadFile(filepath.Join(dir, "helper-get.out"))
require.NoError(t, err)
require.Equal(t, []byte("https://registry.local:5000"), content)
}
func TestDockerDriver_PidsLimit(t *testing.T) {
if !tu.IsTravis() {
t.Parallel()
}
if !testutil.DockerIsConnected(t) {
t.Skip("Docker not connected")
}
task, _, _ := dockerTask(t)
task.Config["pids_limit"] = "1"
task.Config["command"] = "/bin/sh"
task.Config["args"] = []string{"-c", "sleep 2 & sleep 2"}
ctx := testDockerDriverContexts(t, task)
defer ctx.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
// Copy the image into the task's directory
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
resp, err := d.Start(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
defer resp.Handle.Kill()
select {
case res := <-resp.Handle.WaitCh():
if res.Successful() {
t.Fatalf("expected error, but container exited successful")
}
case <-time.After(time.Duration(tu.TestMultiplier()*5) * time.Second):
t.Fatalf("timeout")
}
// XXX Logging doesn't work on OSX so just test on Linux
// Check that data was written to the directory.
outputFile := filepath.Join(ctx.ExecCtx.TaskDir.LogDir, "redis-demo.stderr.0")
act, err := ioutil.ReadFile(outputFile)
if err != nil {
t.Fatalf("Couldn't read expected output: %v", err)
}
exp := "can't fork"
if !strings.Contains(string(act), exp) {
t.Fatalf("Expected failed fork: %q", act)
}
}

File diff suppressed because it is too large

View File

@@ -0,0 +1,105 @@
// +build !windows
package docker
import (
"io/ioutil"
"path/filepath"
"strings"
"syscall"
"testing"
"time"
"github.com/hashicorp/nomad/client/testutil"
"github.com/hashicorp/nomad/nomad/structs"
tu "github.com/hashicorp/nomad/testutil"
)
func TestDockerDriver_Signal(t *testing.T) {
if !tu.IsTravis() {
t.Parallel()
}
if !testutil.DockerIsConnected(t) {
t.Skip("Docker not connected")
}
task := &structs.Task{
Name: "redis-demo",
Driver: "docker",
Config: map[string]interface{}{
"image": "busybox",
"load": "busybox.tar",
"command": "/bin/sh",
"args": []string{"local/test.sh"},
},
Resources: &structs.Resources{
MemoryMB: 256,
CPU: 512,
},
LogConfig: &structs.LogConfig{
MaxFiles: 10,
MaxFileSizeMB: 10,
},
}
ctx := testDockerDriverContexts(t, task)
defer ctx.Destroy()
d := NewDockerDriver(ctx.DriverCtx)
// Copy the image into the task's directory
copyImage(t, ctx.ExecCtx.TaskDir, "busybox.tar")
testFile := filepath.Join(ctx.ExecCtx.TaskDir.LocalDir, "test.sh")
testData := []byte(`
at_term() {
echo 'Terminated.' > $NOMAD_TASK_DIR/output
exit 3
}
trap at_term INT
while true; do
echo 'sleeping'
sleep 0.2
done
`)
if err := ioutil.WriteFile(testFile, testData, 0777); err != nil {
t.Fatalf("Failed to write data: %v", err)
}
_, err := d.Prestart(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("error in prestart: %v", err)
}
resp, err := d.Start(ctx.ExecCtx, task)
if err != nil {
t.Fatalf("err: %v", err)
}
defer resp.Handle.Kill()
waitForExist(t, resp.Handle.(*DockerHandle).client, resp.Handle.(*DockerHandle))
time.Sleep(1 * time.Second)
if err := resp.Handle.Signal(syscall.SIGINT); err != nil {
t.Fatalf("Signal returned an error: %v", err)
}
select {
case res := <-resp.Handle.WaitCh():
if res.Successful() {
t.Fatalf("should err: %v", res)
}
case <-time.After(time.Duration(tu.TestMultiplier()*5) * time.Second):
t.Fatalf("timeout")
}
// Check the log file to see it exited because of the signal
outputFile := filepath.Join(ctx.ExecCtx.TaskDir.LocalDir, "output")
act, err := ioutil.ReadFile(outputFile)
if err != nil {
t.Fatalf("Couldn't read expected output: %v", err)
}
exp := "Terminated."
if strings.TrimSpace(string(act)) != exp {
t.Fatalf("Command outputted %v; want %v", act, exp)
}
}

View File

@@ -0,0 +1,17 @@
package docker
import docker "github.com/fsouza/go-dockerclient"
const (
// Default network mode for windows containers is nat
defaultNetworkMode = "nat"
)
// Currently, Windows containers don't support a host IP in port bindings.
func getPortBinding(ip string, port string) []docker.PortBinding {
return []docker.PortBinding{{HostIP: "", HostPort: port}}
}
func tweakCapabilities(basics, adds, drops []string) ([]string, error) {
return nil, nil
}

278
drivers/docker/handle.go Normal file
View File

@@ -0,0 +1,278 @@
package docker
import (
"fmt"
"os"
"runtime"
"strings"
"sync"
"syscall"
"time"
"github.com/armon/circbuf"
metrics "github.com/armon/go-metrics"
docker "github.com/fsouza/go-dockerclient"
hclog "github.com/hashicorp/go-hclog"
plugin "github.com/hashicorp/go-plugin"
"github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/docker/docklog"
"github.com/hashicorp/nomad/helper/stats"
"github.com/hashicorp/nomad/plugins/drivers"
"golang.org/x/net/context"
)
type taskHandle struct {
client *docker.Client
waitClient *docker.Client
logger hclog.Logger
dlogger docklog.DockerLogger
dloggerPluginClient *plugin.Client
task *drivers.TaskConfig
container *docker.Container
resourceUsageLock sync.RWMutex
resourceUsage *structs.TaskResourceUsage
doneCh chan bool
waitCh chan struct{}
removeContainerOnExit bool
net *structs.DriverNetwork
startedAt time.Time
completedAt time.Time
exitResult *drivers.ExitResult
}
func (h *taskHandle) Exec(ctx context.Context, cmd string, args []string) (*drivers.ExecTaskResult, error) {
fullCmd := make([]string, len(args)+1)
fullCmd[0] = cmd
copy(fullCmd[1:], args)
createExecOpts := docker.CreateExecOptions{
AttachStdin: false,
AttachStdout: true,
AttachStderr: true,
Tty: false,
Cmd: fullCmd,
Container: h.container.ID,
Context: ctx,
}
exec, err := h.client.CreateExec(createExecOpts)
if err != nil {
return nil, err
}
execResult := &drivers.ExecTaskResult{ExitResult: &drivers.ExitResult{}}
stdout, _ := circbuf.NewBuffer(int64(drivers.CheckBufSize))
stderr, _ := circbuf.NewBuffer(int64(drivers.CheckBufSize))
startOpts := docker.StartExecOptions{
Detach: false,
Tty: false,
OutputStream: stdout,
ErrorStream: stderr,
Context: ctx,
}
if err := h.client.StartExec(exec.ID, startOpts); err != nil {
return nil, err
}
execResult.Stdout = stdout.Bytes()
execResult.Stderr = stderr.Bytes()
res, err := h.client.InspectExec(exec.ID)
if err != nil {
return execResult, err
}
execResult.ExitResult.ExitCode = res.ExitCode
return execResult, nil
}
func (h *taskHandle) Signal(s os.Signal) error {
// Convert types
sysSig, ok := s.(syscall.Signal)
if !ok {
return fmt.Errorf("Failed to determine signal number")
}
// TODO When we expose signals we will need a mapping layer that converts
// MacOS signals to the correct signal number for docker. Or we change the
// interface to take a signal string and leave it up to driver to map?
dockerSignal := docker.Signal(sysSig)
opts := docker.KillContainerOptions{
ID: h.container.ID,
Signal: dockerSignal,
}
return h.client.KillContainer(opts)
}
// Kill is used to terminate the task.
func (h *taskHandle) Kill(killTimeout time.Duration, signal os.Signal) error {
// Only send signal if killTimeout is set, otherwise stop container
if killTimeout > 0 {
if err := h.Signal(signal); err != nil {
return err
}
select {
case <-h.waitCh:
return nil
case <-time.After(killTimeout):
}
}
// Stop the container
err := h.client.StopContainer(h.container.ID, 0)
if err != nil {
// Container has already been removed.
if strings.Contains(err.Error(), NoSuchContainerError) {
h.logger.Debug("attempted to stop nonexistent container")
return nil
}
h.logger.Error("failed to stop container", "error", err)
return fmt.Errorf("Failed to stop container %s: %s", h.container.ID, err)
}
h.logger.Info("stopped container")
return nil
}
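Kill follows the common graceful-shutdown shape: signal, wait up to killTimeout, then force-stop. A generic, runnable sketch of that shape (names are illustrative, not the driver's API):

package main

import (
    "fmt"
    "time"
)

// gracefulStop signals a worker, waits up to timeout for it to exit on its
// own (observed via done), and falls back to a hard stop otherwise.
func gracefulStop(signalFn, stopFn func() error, done <-chan struct{}, timeout time.Duration) error {
    if timeout > 0 {
        if err := signalFn(); err != nil {
            return err
        }
        select {
        case <-done:
            return nil // exited gracefully; no hard stop needed
        case <-time.After(timeout):
        }
    }
    return stopFn()
}

func main() {
    done := make(chan struct{})
    go func() { time.Sleep(10 * time.Millisecond); close(done) }()
    err := gracefulStop(
        func() error { fmt.Println("signal sent"); return nil },
        func() error { fmt.Println("hard stop"); return nil },
        done, time.Second,
    )
    fmt.Println("err =", err)
}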
func (h *taskHandle) Stats() (*structs.TaskResourceUsage, error) {
h.resourceUsageLock.RLock()
defer h.resourceUsageLock.RUnlock()
var err error
if h.resourceUsage == nil {
err = fmt.Errorf("stats collection hasn't started yet")
}
return h.resourceUsage, err
}
func (h *taskHandle) run() {
exitCode, werr := h.waitClient.WaitContainer(h.container.ID)
if werr != nil {
h.logger.Error("failed to wait for container; already terminated")
}
if exitCode != 0 {
werr = fmt.Errorf("Docker container exited with non-zero exit code: %d", exitCode)
}
container, ierr := h.waitClient.InspectContainer(h.container.ID)
oom := false
if ierr != nil {
h.logger.Error("failed to inspect container", "error", ierr)
} else if container.State.OOMKilled {
oom = true
werr = fmt.Errorf("OOM Killed")
labels := []metrics.Label{
{
Name: "job",
Value: h.task.JobName,
},
{
Name: "task_group",
Value: h.task.TaskGroupName,
},
{
Name: "task",
Value: h.task.Name,
},
}
metrics.IncrCounterWithLabels([]string{"driver", "docker", "oom"}, 1, labels)
}
close(h.doneCh)
// Stop the container just in case the docker daemon's wait returned
// incorrectly.
if err := h.client.StopContainer(h.container.ID, 0); err != nil {
_, noSuchContainer := err.(*docker.NoSuchContainer)
_, containerNotRunning := err.(*docker.ContainerNotRunning)
if !containerNotRunning && !noSuchContainer {
h.logger.Error("error stopping container", "error", err)
}
}
// Remove the container
if h.removeContainerOnExit {
if err := h.client.RemoveContainer(docker.RemoveContainerOptions{ID: h.container.ID, RemoveVolumes: true, Force: true}); err != nil {
h.logger.Error("error removing container", "error", err)
}
} else {
h.logger.Debug("not removing container due to config")
}
// Set the result
h.exitResult = &drivers.ExitResult{
ExitCode: exitCode,
Signal: 0,
OOMKilled: oom,
}
close(h.waitCh)
}
// collectStats starts collecting resource usage stats of a docker container
func (h *taskHandle) collectStats() {
statsCh := make(chan *docker.Stats)
statsOpts := docker.StatsOptions{ID: h.container.ID, Done: h.doneCh, Stats: statsCh, Stream: true}
go func() {
//TODO handle Stats error
if err := h.waitClient.Stats(statsOpts); err != nil {
h.logger.Debug("error collecting stats from container", "error", err)
}
}()
numCores := runtime.NumCPU()
for {
select {
case s := <-statsCh:
if s != nil {
ms := &structs.MemoryStats{
RSS: s.MemoryStats.Stats.Rss,
Cache: s.MemoryStats.Stats.Cache,
Swap: s.MemoryStats.Stats.Swap,
MaxUsage: s.MemoryStats.MaxUsage,
Measured: DockerMeasuredMemStats,
}
cs := &structs.CpuStats{
ThrottledPeriods: s.CPUStats.ThrottlingData.ThrottledPeriods,
ThrottledTime: s.CPUStats.ThrottlingData.ThrottledTime,
Measured: DockerMeasuredCpuStats,
}
// Calculate percentage
cs.Percent = calculatePercent(
s.CPUStats.CPUUsage.TotalUsage, s.PreCPUStats.CPUUsage.TotalUsage,
s.CPUStats.SystemCPUUsage, s.PreCPUStats.SystemCPUUsage, numCores)
cs.SystemMode = calculatePercent(
s.CPUStats.CPUUsage.UsageInKernelmode, s.PreCPUStats.CPUUsage.UsageInKernelmode,
s.CPUStats.CPUUsage.TotalUsage, s.PreCPUStats.CPUUsage.TotalUsage, numCores)
cs.UserMode = calculatePercent(
s.CPUStats.CPUUsage.UsageInUsermode, s.PreCPUStats.CPUUsage.UsageInUsermode,
s.CPUStats.CPUUsage.TotalUsage, s.PreCPUStats.CPUUsage.TotalUsage, numCores)
cs.TotalTicks = (cs.Percent / 100) * stats.TotalTicksAvailable() / float64(numCores)
h.resourceUsageLock.Lock()
h.resourceUsage = &structs.TaskResourceUsage{
ResourceUsage: &structs.ResourceUsage{
MemoryStats: ms,
CpuStats: cs,
},
Timestamp: s.Read.UTC().UnixNano(),
}
h.resourceUsageLock.Unlock()
}
case <-h.doneCh:
return
}
}
}
func calculatePercent(newSample, oldSample, newTotal, oldTotal uint64, cores int) float64 {
numerator := newSample - oldSample
denom := newTotal - oldTotal
if numerator <= 0 || denom <= 0 {
return 0.0
}
return (float64(numerator) / float64(denom)) * float64(cores) * 100.0
}
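As a quick numeric check of the formula (the function body is copied from above): a sample delta of 200 over a total delta of 1000, scaled by 4 cores, yields 200/1000 × 4 × 100 = 80 percent.

package main

import "fmt"

func calculatePercent(newSample, oldSample, newTotal, oldTotal uint64, cores int) float64 {
    numerator := newSample - oldSample
    denom := newTotal - oldTotal
    if numerator <= 0 || denom <= 0 {
        return 0.0
    }
    return (float64(numerator) / float64(denom)) * float64(cores) * 100.0
}

func main() {
    // Sample delta 200 over total delta 1000 on 4 cores -> 80 (percent).
    fmt.Println(calculatePercent(1200, 1000, 11000, 10000, 4))
}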

289
drivers/docker/progress.go Normal file
View File

@@ -0,0 +1,289 @@
package docker
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"strings"
"sync"
"time"
"github.com/docker/docker/pkg/jsonmessage"
units "github.com/docker/go-units"
)
const (
// dockerPullActivityDeadline is the default value set in the imageProgressManager
// when newImageProgressManager is called
dockerPullActivityDeadline = 2 * time.Minute
// dockerImageProgressReportInterval is the default value set in the
// imageProgressManager when newImageProgressManager is called
dockerImageProgressReportInterval = 10 * time.Second
// dockerImageSlowProgressReportInterval is the default value set in the
// imageProgressManager when newImageProgressManager is called
dockerImageSlowProgressReportInterval = 2 * time.Minute
)
// layerProgress tracks the state and downloaded bytes of a single layer within
// a docker image
type layerProgress struct {
id string
status layerProgressStatus
currentBytes int64
totalBytes int64
}
type layerProgressStatus int
const (
layerProgressStatusUnknown layerProgressStatus = iota
layerProgressStatusStarting
layerProgressStatusWaiting
layerProgressStatusDownloading
layerProgressStatusVerifying
layerProgressStatusDownloaded
layerProgressStatusExtracting
layerProgressStatusComplete
layerProgressStatusExists
)
func lpsFromString(status string) layerProgressStatus {
switch status {
case "Pulling fs layer":
return layerProgressStatusStarting
case "Waiting":
return layerProgressStatusWaiting
case "Downloading":
return layerProgressStatusDownloading
case "Verifying Checksum":
return layerProgressStatusVerifying
case "Download complete":
return layerProgressStatusDownloaded
case "Extracting":
return layerProgressStatusExtracting
case "Pull complete":
return layerProgressStatusComplete
case "Already exists":
return layerProgressStatusExists
default:
return layerProgressStatusUnknown
}
}
// imageProgress tracks the status of each child layer as it is pulled from
// a docker image repo
type imageProgress struct {
sync.RWMutex
lastMessage *jsonmessage.JSONMessage
timestamp time.Time
layers map[string]*layerProgress
pullStart time.Time
}
// get returns a status message and the timestamp of the last status update
func (p *imageProgress) get() (string, time.Time) {
p.RLock()
defer p.RUnlock()
if p.lastMessage == nil {
return "No progress", p.timestamp
}
var pulled, pulling, waiting int
for _, l := range p.layers {
switch {
case l.status == layerProgressStatusStarting ||
l.status == layerProgressStatusWaiting:
waiting++
case l.status == layerProgressStatusDownloading ||
l.status == layerProgressStatusVerifying:
pulling++
case l.status >= layerProgressStatusDownloaded:
pulled++
}
}
elapsed := time.Now().Sub(p.pullStart)
cur := p.currentBytes()
total := p.totalBytes()
var est int64
if cur != 0 {
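// Linear extrapolation: cur bytes took elapsed nanoseconds, so total
// bytes should take elapsed*total/cur; est is the remaining portion.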
est = (elapsed.Nanoseconds() / cur * total) - elapsed.Nanoseconds()
}
var msg strings.Builder
fmt.Fprintf(&msg, "Pulled %d/%d (%s/%s) layers: %d waiting/%d pulling",
pulled, len(p.layers), units.BytesSize(float64(cur)), units.BytesSize(float64(total)),
waiting, pulling)
if est > 0 {
fmt.Fprintf(&msg, " - est %.1fs remaining", time.Duration(est).Seconds())
}
return msg.String(), p.timestamp
}
// set takes a status message received from the docker engine api during an image
// pull and updates the status of the corresponding layer
func (p *imageProgress) set(msg *jsonmessage.JSONMessage) {
p.Lock()
defer p.Unlock()
p.lastMessage = msg
p.timestamp = time.Now()
lps := lpsFromString(msg.Status)
if lps == layerProgressStatusUnknown {
return
}
layer, ok := p.layers[msg.ID]
if !ok {
layer = &layerProgress{id: msg.ID}
p.layers[msg.ID] = layer
}
layer.status = lps
if msg.Progress != nil && lps == layerProgressStatusDownloading {
layer.currentBytes = msg.Progress.Current
layer.totalBytes = msg.Progress.Total
} else if lps == layerProgressStatusDownloaded {
layer.currentBytes = layer.totalBytes
}
}
// currentBytes iterates through all image layers and sums the total of
// current bytes. The caller is responsible for acquiring a read lock on the
// imageProgress struct
func (p *imageProgress) currentBytes() int64 {
var b int64
for _, l := range p.layers {
b += l.currentBytes
}
return b
}
// totalBytes iterates through all image layers and sums the total of
// total bytes. The caller is responsible for acquiring a read lock on the
// imageProgress struct
func (p *imageProgress) totalBytes() int64 {
var b int64
for _, l := range p.layers {
b += l.totalBytes
}
return b
}
// progressReporterFunc defines the method for handling inactivity and report
// events from the imageProgressManager. The image name, current status message
// and timestamp of last received status update are passed in.
type progressReporterFunc func(image string, msg string, timestamp time.Time)
// imageProgressManager tracks the progress of pulling a docker image from an
// image repository.
// It also implements the io.Writer interface so it can be passed to the
// docker client's pull-image method in order to receive status updates from
// the docker engine API.
type imageProgressManager struct {
imageProgress *imageProgress
image string
activityDeadline time.Duration
inactivityFunc progressReporterFunc
reportInterval time.Duration
reporter progressReporterFunc
slowReportInterval time.Duration
slowReporter progressReporterFunc
lastSlowReport time.Time
cancel context.CancelFunc
stopCh chan struct{}
buf bytes.Buffer
}
func newImageProgressManager(
image string, cancel context.CancelFunc,
inactivityFunc, reporter, slowReporter progressReporterFunc) *imageProgressManager {
pm := &imageProgressManager{
image: image,
activityDeadline: dockerPullActivityDeadline,
inactivityFunc: inactivityFunc,
reportInterval: dockerImageProgressReportInterval,
reporter: reporter,
slowReportInterval: dockerImageSlowProgressReportInterval,
slowReporter: slowReporter,
imageProgress: &imageProgress{
timestamp: time.Now(),
layers: make(map[string]*layerProgress),
},
cancel: cancel,
stopCh: make(chan struct{}),
}
pm.start()
return pm
}
// start initiates the ticker that triggers the inactivity and reporter handlers
func (pm *imageProgressManager) start() {
now := time.Now()
pm.imageProgress.pullStart = now
pm.lastSlowReport = now
go func() {
ticker := time.NewTicker(dockerImageProgressReportInterval)
for {
select {
case <-ticker.C:
msg, lastStatusTime := pm.imageProgress.get()
t := time.Now()
if t.Sub(lastStatusTime) > pm.activityDeadline {
pm.inactivityFunc(pm.image, msg, lastStatusTime)
pm.cancel()
return
}
if t.Sub(pm.lastSlowReport) > pm.slowReportInterval {
pm.slowReporter(pm.image, msg, lastStatusTime)
pm.lastSlowReport = t
}
pm.reporter(pm.image, msg, lastStatusTime)
case <-pm.stopCh:
return
}
}
}()
}
func (pm *imageProgressManager) stop() {
close(pm.stopCh)
}
func (pm *imageProgressManager) Write(p []byte) (n int, err error) {
n, err = pm.buf.Write(p)
var msg jsonmessage.JSONMessage
for {
line, err := pm.buf.ReadBytes('\n')
if err == io.EOF {
// Partial write of line; push back onto buffer and break until full line
pm.buf.Write(line)
break
}
if err != nil {
return n, err
}
err = json.Unmarshal(line, &msg)
if err != nil {
return n, err
}
if msg.Error != nil {
// error received from the docker engine api
return n, msg.Error
}
pm.imageProgress.set(&msg)
}
return
}

View File

@@ -0,0 +1,52 @@
package docker
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func Test_DockerImageProgressManager(t *testing.T) {
pm := &imageProgressManager{
imageProgress: &imageProgress{
timestamp: time.Now(),
layers: make(map[string]*layerProgress),
},
}
_, err := pm.Write([]byte(`{"status":"Pulling from library/golang","id":"1.9.5"}
{"status":"Pulling fs layer","progressDetail":{},"id":"c73ab1c6897b"}
{"status":"Pulling fs layer","progressDetail":{},"id":"1ab373b3deae"}
`))
require.NoError(t, err)
require.Equal(t, 2, len(pm.imageProgress.layers), "number of layers should be 2")
cur := pm.imageProgress.currentBytes()
require.Zero(t, cur)
tot := pm.imageProgress.totalBytes()
require.Zero(t, tot)
_, err = pm.Write([]byte(`{"status":"Pulling fs layer","progress`))
require.NoError(t, err)
require.Equal(t, 2, len(pm.imageProgress.layers), "number of layers should be 2")
_, err = pm.Write([]byte(`Detail":{},"id":"b542772b4177"}` + "\n"))
require.NoError(t, err)
require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3")
_, err = pm.Write([]byte(`{"status":"Downloading","progressDetail":{"current":45800,"total":4335495},"progress":"[\u003e ] 45.8kB/4.335MB","id":"b542772b4177"}
{"status":"Downloading","progressDetail":{"current":113576,"total":11108010},"progress":"[\u003e ] 113.6kB/11.11MB","id":"1ab373b3deae"}
{"status":"Downloading","progressDetail":{"current":694257,"total":4335495},"progress":"[========\u003e ] 694.3kB/4.335MB","id":"b542772b4177"}` + "\n"))
require.NoError(t, err)
require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3")
require.Equal(t, int64(807833), pm.imageProgress.currentBytes())
require.Equal(t, int64(15443505), pm.imageProgress.totalBytes())
_, err = pm.Write([]byte(`{"status":"Download complete","progressDetail":{},"id":"b542772b4177"}` + "\n"))
require.NoError(t, err)
require.Equal(t, 3, len(pm.imageProgress.layers), "number of layers should be 3")
require.Equal(t, int64(4449071), pm.imageProgress.currentBytes())
require.Equal(t, int64(15443505), pm.imageProgress.totalBytes())
}

33
drivers/docker/state.go Normal file
View File

@@ -0,0 +1,33 @@
package docker
import (
"sync"
)
type taskStore struct {
store map[string]*taskHandle
lock sync.RWMutex
}
func newTaskStore() *taskStore {
return &taskStore{store: map[string]*taskHandle{}}
}
func (ts *taskStore) Set(id string, handle *taskHandle) {
ts.lock.Lock()
defer ts.lock.Unlock()
ts.store[id] = handle
}
func (ts *taskStore) Get(id string) (*taskHandle, bool) {
ts.lock.RLock()
defer ts.lock.RUnlock()
t, ok := ts.store[id]
return t, ok
}
func (ts *taskStore) Delete(id string) {
ts.lock.Lock()
defer ts.lock.Unlock()
delete(ts.store, id)
}

190
drivers/docker/utils.go Normal file
View File

@@ -0,0 +1,190 @@
package docker
import (
"encoding/json"
"fmt"
"os"
"os/exec"
"strings"
"github.com/docker/cli/cli/config/configfile"
"github.com/docker/distribution/reference"
"github.com/docker/docker/registry"
docker "github.com/fsouza/go-dockerclient"
)
func parseDockerImage(image string) (repo, tag string) {
repo, tag = docker.ParseRepositoryTag(image)
if tag != "" {
return repo, tag
}
if i := strings.IndexRune(image, '@'); i > -1 { // Has digest (@sha256:...)
// when pulling images with a digest, the repository contains the sha hash, and the tag is empty
// see: https://github.com/fsouza/go-dockerclient/blob/master/image_test.go#L471
repo = image
} else {
tag = "latest"
}
return repo, tag
}
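A test-style sketch (not part of the commit) of parseDockerImage's contract; the digest case assumes go-dockerclient's ParseRepositoryTag leaves the tag empty when an @digest is present, per the comment above:

package docker

import "testing"

func TestParseDockerImage_Sketch(t *testing.T) {
    cases := []struct{ input, repo, tag string }{
        {"redis:3.2", "redis", "3.2"},                      // explicit tag
        {"redis", "redis", "latest"},                       // default tag
        {"redis@sha256:abc123", "redis@sha256:abc123", ""}, // digest: repo keeps the hash, tag stays empty
    }
    for _, c := range cases {
        repo, tag := parseDockerImage(c.input)
        if repo != c.repo || tag != c.tag {
            t.Fatalf("parseDockerImage(%q) = (%q, %q); want (%q, %q)", c.input, repo, tag, c.repo, c.tag)
        }
    }
}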
func dockerImageRef(repo string, tag string) string {
if tag == "" {
return repo
}
return fmt.Sprintf("%s:%s", repo, tag)
}
// loadDockerConfig loads the docker config at the specified path, returning an
// error if it couldn't be read.
func loadDockerConfig(file string) (*configfile.ConfigFile, error) {
f, err := os.Open(file)
if err != nil {
return nil, fmt.Errorf("Failed to open auth config file: %v, error: %v", file, err)
}
defer f.Close()
cfile := new(configfile.ConfigFile)
if err = cfile.LoadFromReader(f); err != nil {
return nil, fmt.Errorf("Failed to parse auth config file: %v", err)
}
return cfile, nil
}
// parseRepositoryInfo takes a repo and returns the Docker RepositoryInfo. This
// is useful for interacting with a Docker config object.
func parseRepositoryInfo(repo string) (*registry.RepositoryInfo, error) {
name, err := reference.ParseNamed(repo)
if err != nil {
return nil, fmt.Errorf("Failed to parse named repo %q: %v", repo, err)
}
repoInfo, err := registry.ParseRepositoryInfo(name)
if err != nil {
return nil, fmt.Errorf("Failed to parse repository: %v", err)
}
return repoInfo, nil
}
// firstValidAuth tries a list of auth backends, returning the first error or valid AuthConfiguration
func firstValidAuth(repo string, backends []authBackend) (*docker.AuthConfiguration, error) {
for _, backend := range backends {
auth, err := backend(repo)
if auth != nil || err != nil {
return auth, err
}
}
return nil, nil
}
// authFromTaskConfig generates an authBackend for any auth given in the task configuration
func authFromTaskConfig(driverConfig *TaskConfig) authBackend {
return func(string) (*docker.AuthConfiguration, error) {
if len(driverConfig.Auth.Email) == 0 {
return nil, nil
}
return &docker.AuthConfiguration{
Username: driverConfig.Auth.Username,
Password: driverConfig.Auth.Password,
Email: driverConfig.Auth.Email,
ServerAddress: driverConfig.Auth.ServerAddr,
}, nil
}
}
// authFromDockerConfig generates an authBackend for a dockercfg-compatible
// file. The authBackend can source credentials either from explicit auth
// definitions or via credential helpers.
func authFromDockerConfig(file string) authBackend {
return func(repo string) (*docker.AuthConfiguration, error) {
if file == "" {
return nil, nil
}
repoInfo, err := parseRepositoryInfo(repo)
if err != nil {
return nil, err
}
cfile, err := loadDockerConfig(file)
if err != nil {
return nil, err
}
return firstValidAuth(repo, []authBackend{
func(string) (*docker.AuthConfiguration, error) {
dockerAuthConfig := registry.ResolveAuthConfig(cfile.AuthConfigs, repoInfo.Index)
auth := &docker.AuthConfiguration{
Username: dockerAuthConfig.Username,
Password: dockerAuthConfig.Password,
Email: dockerAuthConfig.Email,
ServerAddress: dockerAuthConfig.ServerAddress,
}
if authIsEmpty(auth) {
return nil, nil
}
return auth, nil
},
authFromHelper(cfile.CredentialHelpers[registry.GetAuthConfigKey(repoInfo.Index)]),
authFromHelper(cfile.CredentialsStore),
})
}
}
// authFromHelper generates an authBackend for a docker credential helper:
// a script that takes the requested domain on stdin and outputs JSON with
// "Username" and "Secret" fields.
func authFromHelper(helperName string) authBackend {
return func(repo string) (*docker.AuthConfiguration, error) {
if helperName == "" {
return nil, nil
}
helper := dockerAuthHelperPrefix + helperName
cmd := exec.Command(helper, "get")
repoInfo, err := parseRepositoryInfo(repo)
if err != nil {
return nil, err
}
// Ensure that the HTTPS prefix exists
repoAddr := fmt.Sprintf("https://%s", repoInfo.Index.Name)
cmd.Stdin = strings.NewReader(repoAddr)
output, err := cmd.Output()
if err != nil {
switch err.(type) {
default:
return nil, err
case *exec.ExitError:
return nil, fmt.Errorf("%s with input %q failed with stderr: %s", helper, repo, output)
}
}
var response map[string]string
if err := json.Unmarshal(output, &response); err != nil {
return nil, err
}
auth := &docker.AuthConfiguration{
Username: response["Username"],
Password: response["Secret"],
}
if authIsEmpty(auth) {
return nil, nil
}
return auth, nil
}
}
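The helper protocol itself is small enough to demonstrate directly. A hedged, standalone sketch of what authFromHelper does under the hood, reusing the hypothetical "testnomad" helper that the test file above installs:

package main

import (
    "encoding/json"
    "fmt"
    "os/exec"
    "strings"
)

func main() {
    // `docker-credential-<name> get` reads the registry URL on stdin and
    // prints {"Username": ..., "Secret": ...} on stdout.
    cmd := exec.Command("docker-credential-testnomad", "get")
    cmd.Stdin = strings.NewReader("https://registry.local:5000")
    out, err := cmd.Output()
    if err != nil {
        fmt.Println("helper failed:", err) // e.g. helper not on PATH
        return
    }
    var resp map[string]string
    if err := json.Unmarshal(out, &resp); err != nil {
        fmt.Println("bad helper output:", err)
        return
    }
    fmt.Println("user:", resp["Username"], "secret:", resp["Secret"])
}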
// authIsEmpty returns true if the given (non-nil) auth structure has no fields set
func authIsEmpty(auth *docker.AuthConfiguration) bool {
if auth == nil {
return false
}
return auth.Username == "" &&
auth.Password == "" &&
auth.Email == "" &&
auth.ServerAddress == ""
}

View File

@@ -17,6 +17,11 @@ import (
"github.com/zclconf/go-cty/cty/msgpack"
)
const (
// CheckBufSize is the size of the check output result
CheckBufSize = 4 * 1024
)
// DriverPlugin is the interface that drivers will implement. It is also
// implemented by a plugin client which proxies the calls to go-plugin. See
// the proto/driver.proto file for detailed information about each RPC and
@@ -98,6 +103,8 @@ type Capabilities struct {
type TaskConfig struct {
ID string
JobName string
TaskGroupName string
Name string
Env map[string]string
Resources *Resources
@@ -157,6 +164,17 @@ func (tc *TaskConfig) EncodeDriverConfig(val cty.Value) error {
return nil
}
func (tc *TaskConfig) EncodeConcreteDriverConfig(t interface{}) error {
data := []byte{}
err := base.MsgPackEncode(&data, t)
if err != nil {
return err
}
tc.rawDriverConfig = data
return nil
}
type Resources struct {
NomadResources *structs.Resources
LinuxResources *LinuxResources

View File

@@ -331,6 +331,12 @@ message TaskConfig {
// StderrPath is the path to the file to open and write task stderr to
string stderr_path = 11;
// TaskGroupName is the name of the task group which this task is a member of
string task_group_name = 12;
// JobName is the name of the job this task is part of
string job_name = 13;
}
message Resources {

View File

@@ -51,6 +51,8 @@ func taskConfigFromProto(pb *proto.TaskConfig) *TaskConfig {
}
return &TaskConfig{
ID: pb.Id,
JobName: pb.JobName,
TaskGroupName: pb.TaskGroupName,
Name: pb.Name,
Env: pb.Env,
rawDriverConfig: pb.MsgpackDriverConfig,
@@ -70,6 +72,8 @@ func taskConfigToProto(cfg *TaskConfig) *proto.TaskConfig {
}
pb := &proto.TaskConfig{
Id: cfg.ID,
JobName: cfg.JobName,
TaskGroupName: cfg.TaskGroupName,
Name: cfg.Name,
Env: cfg.Env,
Resources: resourcesToProto(cfg.Resources),

View File

@@ -1,6 +1,7 @@
package catalog
import (
"github.com/hashicorp/nomad/drivers/docker"
"github.com/hashicorp/nomad/drivers/exec"
"github.com/hashicorp/nomad/drivers/java"
"github.com/hashicorp/nomad/drivers/qemu"
@@ -15,4 +16,5 @@ func init() {
Register(exec.PluginID, exec.PluginConfig)
Register(qemu.PluginID, qemu.PluginConfig)
Register(java.PluginID, java.PluginConfig)
RegisterDeferredConfig(docker.PluginID, docker.PluginConfig, docker.PluginLoader)
}