diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 17e869fe4..c374aa81c 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -749,7 +749,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("----- ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2) + prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "") ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() @@ -844,7 +844,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2) + prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "") ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() @@ -959,7 +959,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2) + prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2, "") ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() if err != nil { @@ -1413,7 +1413,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { upd2, ar2 := testAllocRunnerFromAlloc(alloc2, false) // Set prevAlloc like Client does - ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger) + ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger, "") go ar2.Run() defer ar2.Destroy() diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 8b029a7f7..4fb819978 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -51,7 +51,7 @@ type prevAllocWatcher interface { // newAllocWatcher creates a prevAllocWatcher appropriate for whether this // alloc's previous allocation was local or remote. If this alloc has no // previous alloc then a noop implementation is returned. -func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger) prevAllocWatcher { +func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher { if alloc.PreviousAllocation == "" { // No previous allocation, use noop transitioner return noopPrevAlloc{} @@ -75,13 +75,14 @@ func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, } return &remotePrevAlloc{ - allocID: alloc.ID, - prevAllocID: alloc.PreviousAllocation, - tasks: tg.Tasks, - config: config, - migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate, - rpc: rpc, - logger: l, + allocID: alloc.ID, + prevAllocID: alloc.PreviousAllocation, + tasks: tg.Tasks, + config: config, + migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate, + rpc: rpc, + logger: l, + migrateToken: migrateToken, } } @@ -236,6 +237,10 @@ type remotePrevAlloc struct { waitingLock sync.RWMutex logger *log.Logger + + // migrateToken allows a client to migrate data in an ACL-protected remote + // volume + migrateToken string } // IsWaiting returns true if there's a concurrent call inside Wait @@ -423,7 +428,8 @@ func (p *remotePrevAlloc) migrateAllocDir(ctx context.Context, nodeAddr string) } url := fmt.Sprintf("/v1/client/allocation/%v/snapshot", p.prevAllocID) - resp, err := apiClient.Raw().Response(url, nil) + qo := &nomadapi.QueryOptions{SecretID: p.migrateToken} + resp, err := apiClient.Raw().Response(url, qo) if err != nil { prevAllocDir.Destroy() return nil, fmt.Errorf("error getting snapshot from previous alloc %q: %v", p.prevAllocID, err) diff --git a/client/alloc_watcher_test.go b/client/alloc_watcher_test.go index 01e9f1d7e..fb7e93819 100644 --- a/client/alloc_watcher_test.go +++ b/client/alloc_watcher_test.go @@ -29,7 +29,7 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { task.Driver = "mock_driver" task.Config["run_for"] = "500ms" - waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger()) + waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger(), "") // Wait in a goroutine with a context to make sure it exits at the right time ctx, cancel := context.WithCancel(context.Background()) diff --git a/client/client.go b/client/client.go index b8a2bd24a..27f1f2c4e 100644 --- a/client/client.go +++ b/client/client.go @@ -33,6 +33,7 @@ import ( vaultapi "github.com/hashicorp/vault/api" "github.com/mitchellh/hashstructure" "github.com/shirou/gopsutil/host" + "golang.org/x/crypto/blake2b" ) const ( @@ -525,6 +526,23 @@ func (c *Client) LatestHostStats() *stats.HostStats { return c.hostStatsCollector.Stats() } +// ValidateMigrateToken verifies that a token is for a specific client and +// allocation, and has been created by a trusted party that has privileged +// knowledge of the client's secret identifier +func (c *Client) ValidateMigrateToken(allocID, migrateToken string) bool { + if !c.config.ACLEnabled { + return true + } + + h, err := blake2b.New512([]byte(c.secretNodeID())) + if err != nil { + return false + } + h.Write([]byte(allocID)) + expectedMigrateToken := string(h.Sum(nil)) + return expectedMigrateToken == migrateToken +} + // GetAllocFS returns the AllocFS interface for the alloc dir of an allocation func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) { c.allocLock.RLock() @@ -1303,6 +1321,10 @@ type allocUpdates struct { // filtered is the set of allocations that were not pulled because their // AllocModifyIndex didn't change. filtered map[string]struct{} + + // migrateTokens are a list of tokens necessary for when clients pull data + // from authorized volumes + migrateTokens map[string]string } // watchAllocations is used to scan for updates to allocations @@ -1460,8 +1482,9 @@ OUTER: // Push the updates. update := &allocUpdates{ - filtered: filtered, - pulled: pulledAllocs, + filtered: filtered, + pulled: pulledAllocs, + migrateTokens: resp.MigrateTokens, } select { case updates <- update: @@ -1530,7 +1553,8 @@ func (c *Client) runAllocs(update *allocUpdates) { // Start the new allocations for _, add := range diff.added { - if err := c.addAlloc(add); err != nil { + migrateToken := update.migrateTokens[add.ID] + if err := c.addAlloc(add, migrateToken); err != nil { c.logger.Printf("[ERR] client: failed to add alloc '%s': %v", add.ID, err) } @@ -1572,7 +1596,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error { } // addAlloc is invoked when we should add an allocation -func (c *Client) addAlloc(alloc *structs.Allocation) error { +func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error { // Check if we already have an alloc runner c.allocLock.Lock() if _, ok := c.allocs[alloc.ID]; ok { @@ -1589,7 +1613,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error { } c.configLock.RLock() - prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger) + prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken) ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc) c.configLock.RUnlock() diff --git a/client/client_test.go b/client/client_test.go index 850d958b4..b3d36cb50 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/hashicorp/nomad/testutil" "github.com/mitchellh/hashstructure" "github.com/stretchr/testify/assert" + "golang.org/x/crypto/blake2b" ctestutil "github.com/hashicorp/nomad/client/testutil" ) @@ -962,3 +963,48 @@ func TestClient_BlockedAllocations(t *testing.T) { <-ar.WaitCh() } } + +func TestClient_ValidateMigrateToken_ValidToken(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + c := testClient(t, func(c *config.Config) { + c.ACLEnabled = true + }) + defer c.Shutdown() + + alloc := mock.Alloc() + h, err := blake2b.New512([]byte(c.secretNodeID())) + assert.Nil(err) + + h.Write([]byte(alloc.ID)) + validToken := string(h.Sum(nil)) + + assert.Equal(c.ValidateMigrateToken(alloc.ID, validToken), true) +} + +func TestClient_ValidateMigrateToken_InvalidToken(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + c := testClient(t, func(c *config.Config) { + c.ACLEnabled = true + }) + defer c.Shutdown() + + assert.Equal(c.ValidateMigrateToken("", ""), false) + + alloc := mock.Alloc() + assert.Equal(c.ValidateMigrateToken(alloc.ID, alloc.ID), false) + assert.Equal(c.ValidateMigrateToken(alloc.ID, ""), false) +} + +func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + c := testClient(t, func(c *config.Config) {}) + defer c.Shutdown() + + assert.Equal(c.ValidateMigrateToken("", ""), true) +} diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index e9127a846..7011a3cfc 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -135,6 +135,12 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http } func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var secret string + s.parseToken(req, &secret) + if !s.agent.Client().ValidateMigrateToken(allocID, secret) { + return nil, structs.ErrPermissionDenied + } + allocFS, err := s.agent.Client().GetAllocFS(allocID) if err != nil { return nil, fmt.Errorf(allocNotFoundErr) diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index ab552e1ac..102bb5d25 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -1,6 +1,7 @@ package agent import ( + "fmt" "net/http" "net/http/httptest" "reflect" @@ -12,6 +13,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/assert" + "golang.org/x/crypto/blake2b" ) func TestHTTP_AllocsList(t *testing.T) { @@ -316,6 +318,52 @@ func TestHTTP_AllocSnapshot(t *testing.T) { }) } +func createMigrateTokenForClientAndAlloc(allocID, clientSecret string) (string, error) { + h, err := blake2b.New512([]byte(clientSecret)) + + if err != nil { + return "", err + } + + h.Write([]byte(allocID)) + validMigrateToken, err := string(h.Sum(nil)), nil + return validMigrateToken, err +} + +func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { + t.Parallel() + assert := assert.New(t) + httpACLTest(t, nil, func(s *TestAgent) { + // Request without a token fails + req, err := http.NewRequest("GET", "/v1/client/allocation/123/snapshot", nil) + assert.Nil(err) + + // Make the unauthorized request + respW := httptest.NewRecorder() + _, err = s.Server.ClientAllocRequest(respW, req) + assert.NotNil(err) + assert.EqualError(err, structs.ErrPermissionDenied.Error()) + + // Create an allocation + alloc := mock.Alloc() + + validMigrateToken, err := createMigrateTokenForClientAndAlloc(alloc.ID, s.Agent.Client().Node().SecretID) + assert.Nil(err) + + // Request with a token succeeds + url := fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID) + req, err = http.NewRequest("GET", url, nil) + assert.Nil(err) + + req.Header.Set("X-Nomad-Token", validMigrateToken) + + // Make the unauthorized request + respW = httptest.NewRecorder() + _, err = s.Server.ClientAllocRequest(respW, req) + assert.NotContains(err.Error(), structs.ErrPermissionDenied.Error()) + }) +} + func TestHTTP_AllocGC(t *testing.T) { t.Parallel() httpTest(t, nil, func(s *TestAgent) { diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 675f01d92..3d6b6d04c 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "golang.org/x/crypto/blake2b" "golang.org/x/sync/errgroup" "github.com/armon/go-metrics" @@ -640,6 +641,17 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest, return n.srv.blockingRPC(&opts) } +// generateMigrateToken will create a token for a client to access an +// authenticated volume of another client to migrate data for sticky volumes. +func generateMigrateToken(allocID, nodeSecretID string) (string, error) { + h, err := blake2b.New512([]byte(nodeSecretID)) + if err != nil { + return "", err + } + h.Write([]byte(allocID)) + return string(h.Sum(nil)), nil +} + // GetClientAllocs is used to request a lightweight list of alloc modify indexes // per allocation. func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, @@ -687,10 +699,40 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, } reply.Allocs = make(map[string]uint64) + reply.MigrateTokens = make(map[string]string) + // Setup the output if len(allocs) != 0 { for _, alloc := range allocs { reply.Allocs[alloc.ID] = alloc.AllocModifyIndex + + // If the allocation is going to do a migration, create a + // migration token so that the client can authenticate with + // the node hosting the previous allocation. + if alloc.ShouldMigrate() { + prevAllocation, err := state.AllocByID(ws, alloc.PreviousAllocation) + if err != nil { + return err + } + + if prevAllocation != nil && prevAllocation.NodeID != alloc.NodeID { + allocNode, err := state.NodeByID(ws, prevAllocation.NodeID) + if err != nil { + return err + } + if allocNode == nil { + // Node must have been GC'd so skip the token + continue + } + + token, err := generateMigrateToken(prevAllocation.ID, allocNode.SecretID) + if err != nil { + return err + } + reply.MigrateTokens[alloc.ID] = token + } + } + reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) } } else { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 74f5feb36..709242711 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1384,6 +1384,61 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { } } +// A MigrateToken should not be created if an allocation shares the same node +// with its previous allocation +func TestClientEndpoint_GetClientAllocs_WithoutMigrateTokens(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + node.CreateIndex = resp.Index + node.ModifyIndex = resp.Index + + // Inject fake evaluations + prevAlloc := mock.Alloc() + prevAlloc.NodeID = node.ID + alloc := mock.Alloc() + alloc.NodeID = node.ID + alloc.PreviousAllocation = prevAlloc.ID + alloc.DesiredStatus = structs.AllocClientStatusComplete + state := s1.fsm.State() + state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) + err := state.UpsertAllocs(100, []*structs.Allocation{prevAlloc, alloc}) + assert.Nil(err) + + // Lookup the allocs + get := &structs.NodeSpecificRequest{ + NodeID: node.ID, + SecretID: node.SecretID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp2 structs.NodeClientAllocsResponse + + err = msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp2) + assert.Nil(err) + + assert.Equal(uint64(100), resp2.Index) + assert.Equal(2, len(resp2.Allocs)) + assert.Equal(uint64(100), resp2.Allocs[alloc.ID]) + assert.Equal(0, len(resp2.MigrateTokens)) +} + func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) { t.Parallel() s1 := testServer(t, nil) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 91b1cd07f..1c0b6dc71 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -817,6 +817,11 @@ type NodeAllocsResponse struct { // NodeClientAllocsResponse is used to return allocs meta data for a single node type NodeClientAllocsResponse struct { Allocs map[string]uint64 + + // MigrateTokens are used when ACLs are enabled to allow cross node, + // authenticated access to sticky volumes + MigrateTokens map[string]string + QueryMeta } @@ -4685,6 +4690,10 @@ func (a *Allocation) RanSuccessfully() bool { // ShouldMigrate returns if the allocation needs data migration func (a *Allocation) ShouldMigrate() bool { + if a.PreviousAllocation == "" { + return false + } + if a.DesiredStatus == AllocDesiredStatusStop || a.DesiredStatus == AllocDesiredStatusEvict { return false } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index cdbe73520..d5b4ff497 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -2045,7 +2045,8 @@ func TestTaskArtifact_Validate_Dest(t *testing.T) { func TestAllocation_ShouldMigrate(t *testing.T) { alloc := Allocation{ - TaskGroup: "foo", + PreviousAllocation: "123", + TaskGroup: "foo", Job: &Job{ TaskGroups: []*TaskGroup{ { @@ -2064,7 +2065,8 @@ func TestAllocation_ShouldMigrate(t *testing.T) { } alloc1 := Allocation{ - TaskGroup: "foo", + PreviousAllocation: "123", + TaskGroup: "foo", Job: &Job{ TaskGroups: []*TaskGroup{ { @@ -2080,7 +2082,8 @@ func TestAllocation_ShouldMigrate(t *testing.T) { } alloc2 := Allocation{ - TaskGroup: "foo", + PreviousAllocation: "123", + TaskGroup: "foo", Job: &Job{ TaskGroups: []*TaskGroup{ { @@ -2099,7 +2102,8 @@ func TestAllocation_ShouldMigrate(t *testing.T) { } alloc3 := Allocation{ - TaskGroup: "foo", + PreviousAllocation: "123", + TaskGroup: "foo", Job: &Job{ TaskGroups: []*TaskGroup{ { @@ -2112,6 +2116,26 @@ func TestAllocation_ShouldMigrate(t *testing.T) { if alloc3.ShouldMigrate() { t.Fatalf("bad: %v", alloc) } + + // No previous + alloc4 := Allocation{ + TaskGroup: "foo", + Job: &Job{ + TaskGroups: []*TaskGroup{ + { + Name: "foo", + EphemeralDisk: &EphemeralDisk{ + Migrate: true, + Sticky: true, + }, + }, + }, + }, + } + + if alloc4.ShouldMigrate() { + t.Fatalf("bad: %v", alloc4) + } } func TestTaskArtifact_Validate_Checksum(t *testing.T) {