From 58914c7356d1eeecda312e4d4a644bab6137516b Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Mon, 2 Oct 2017 19:18:33 +0000 Subject: [PATCH 01/10] add MigrateTokens to server response for allocs --- nomad/node_endpoint.go | 19 +++++++++++++ nomad/node_endpoint_test.go | 54 +++++++++++++++++++++++++++++++++++++ nomad/structs/structs.go | 4 +++ 3 files changed, 77 insertions(+) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 675f01d92..d78ade420 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "golang.org/x/crypto/blake2b" "golang.org/x/sync/errgroup" "github.com/armon/go-metrics" @@ -640,6 +641,15 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest, return n.srv.blockingRPC(&opts) } +func generateMigrateToken(allocID, nodeSecretID string) (string, error) { + h, err := blake2b.New512([]byte(nodeSecretID)) + if err != nil { + return "", err + } + h.Write([]byte(allocID)) + return string(h.Sum(nil)), nil +} + // GetClientAllocs is used to request a lightweight list of alloc modify indexes // per allocation. func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, @@ -687,10 +697,19 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, } reply.Allocs = make(map[string]uint64) + reply.MigrateTokens = make(map[string]string) // Setup the output if len(allocs) != 0 { for _, alloc := range allocs { reply.Allocs[alloc.ID] = alloc.AllocModifyIndex + + allocNode, err := state.NodeByID(ws, args.NodeID) + token, err := generateMigrateToken(alloc.ID, allocNode.SecretID) + if err != nil { + return err + } + reply.MigrateTokens[alloc.ID] = token + reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) } } else { diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 74f5feb36..92013381e 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1384,6 +1384,60 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { } } +func TestClientEndpoint_GetClientAllocs_WIthMigrateTokens(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + node.CreateIndex = resp.Index + node.ModifyIndex = resp.Index + + // Inject fake evaluations + alloc := mock.Alloc() + alloc.NodeID = node.ID + state := s1.fsm.State() + state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) + err := state.UpsertAllocs(100, []*structs.Allocation{alloc}) + assert.Nil(err) + + // Lookup the allocs + get := &structs.NodeSpecificRequest{ + NodeID: node.ID, + SecretID: node.SecretID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp2 structs.NodeClientAllocsResponse + + err = msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp2) + assert.Nil(err) + + assert.Equal(resp2.Index, uint64(100)) + assert.Equal(len(resp2.Allocs), 1) + assert.Equal(resp2.Allocs[alloc.ID], uint64(100)) + + // verify the correct migrate token was generated + expectedToken, err := generateMigrateToken(alloc.ID, node.SecretID) + assert.Nil(err) + + assert.Equal(resp2.MigrateTokens[alloc.ID], expectedToken) +} + func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) { t.Parallel() s1 := testServer(t, nil) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 91b1cd07f..4e7cbe011 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -817,6 +817,10 @@ type NodeAllocsResponse struct { // NodeClientAllocsResponse is used to return allocs meta data for a single node type NodeClientAllocsResponse struct { Allocs map[string]uint64 + + // MigrateTokens are used when ACLs are enabled to verify volume access + MigrateTokens map[string]string + QueryMeta } From fba1653057a782d4c8d559f063eacb68d84b9147 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Tue, 3 Oct 2017 13:53:32 -0400 Subject: [PATCH 02/10] Add functionality for authenticated volumes --- client/alloc_runner_test.go | 8 ++++---- client/alloc_watcher.go | 24 +++++++++++++++--------- client/alloc_watcher_test.go | 2 +- client/client.go | 31 ++++++++++++++++++++++++++----- command/agent/alloc_endpoint.go | 9 +++++++-- nomad/node_endpoint.go | 3 +++ 6 files changed, 56 insertions(+), 21 deletions(-) diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 17e869fe4..c374aa81c 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -749,7 +749,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("----- ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2) + prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "") ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() @@ -844,7 +844,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2) + prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "") ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() @@ -959,7 +959,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2) + prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2, "") ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() if err != nil { @@ -1413,7 +1413,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { upd2, ar2 := testAllocRunnerFromAlloc(alloc2, false) // Set prevAlloc like Client does - ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger) + ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger, "") go ar2.Run() defer ar2.Destroy() diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 8b029a7f7..4fb819978 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -51,7 +51,7 @@ type prevAllocWatcher interface { // newAllocWatcher creates a prevAllocWatcher appropriate for whether this // alloc's previous allocation was local or remote. If this alloc has no // previous alloc then a noop implementation is returned. -func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger) prevAllocWatcher { +func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher { if alloc.PreviousAllocation == "" { // No previous allocation, use noop transitioner return noopPrevAlloc{} @@ -75,13 +75,14 @@ func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, } return &remotePrevAlloc{ - allocID: alloc.ID, - prevAllocID: alloc.PreviousAllocation, - tasks: tg.Tasks, - config: config, - migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate, - rpc: rpc, - logger: l, + allocID: alloc.ID, + prevAllocID: alloc.PreviousAllocation, + tasks: tg.Tasks, + config: config, + migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate, + rpc: rpc, + logger: l, + migrateToken: migrateToken, } } @@ -236,6 +237,10 @@ type remotePrevAlloc struct { waitingLock sync.RWMutex logger *log.Logger + + // migrateToken allows a client to migrate data in an ACL-protected remote + // volume + migrateToken string } // IsWaiting returns true if there's a concurrent call inside Wait @@ -423,7 +428,8 @@ func (p *remotePrevAlloc) migrateAllocDir(ctx context.Context, nodeAddr string) } url := fmt.Sprintf("/v1/client/allocation/%v/snapshot", p.prevAllocID) - resp, err := apiClient.Raw().Response(url, nil) + qo := &nomadapi.QueryOptions{SecretID: p.migrateToken} + resp, err := apiClient.Raw().Response(url, qo) if err != nil { prevAllocDir.Destroy() return nil, fmt.Errorf("error getting snapshot from previous alloc %q: %v", p.prevAllocID, err) diff --git a/client/alloc_watcher_test.go b/client/alloc_watcher_test.go index 01e9f1d7e..fb7e93819 100644 --- a/client/alloc_watcher_test.go +++ b/client/alloc_watcher_test.go @@ -29,7 +29,7 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { task.Driver = "mock_driver" task.Config["run_for"] = "500ms" - waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger()) + waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger(), "") // Wait in a goroutine with a context to make sure it exits at the right time ctx, cancel := context.WithCancel(context.Background()) diff --git a/client/client.go b/client/client.go index b8a2bd24a..d501abe0a 100644 --- a/client/client.go +++ b/client/client.go @@ -33,6 +33,7 @@ import ( vaultapi "github.com/hashicorp/vault/api" "github.com/mitchellh/hashstructure" "github.com/shirou/gopsutil/host" + "golang.org/x/crypto/blake2b" ) const ( @@ -525,6 +526,20 @@ func (c *Client) LatestHostStats() *stats.HostStats { return c.hostStatsCollector.Stats() } +func (c *Client) ValidateMigrateToken(allocID, migrateToken string) bool { + if !c.config.ACLEnabled { + return true + } + + h, err := blake2b.New512([]byte(c.secretNodeID())) + if err != nil { + return false + } + h.Write([]byte(allocID)) + expectedMigrateToken := string(h.Sum(nil)) + return expectedMigrateToken == migrateToken +} + // GetAllocFS returns the AllocFS interface for the alloc dir of an allocation func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) { c.allocLock.RLock() @@ -1303,6 +1318,10 @@ type allocUpdates struct { // filtered is the set of allocations that were not pulled because their // AllocModifyIndex didn't change. filtered map[string]struct{} + + // migrateTokens are a list of tokens necessary for when clients pull data + // from authorized volumes + migrateTokens map[string]string } // watchAllocations is used to scan for updates to allocations @@ -1460,8 +1479,9 @@ OUTER: // Push the updates. update := &allocUpdates{ - filtered: filtered, - pulled: pulledAllocs, + filtered: filtered, + pulled: pulledAllocs, + migrateTokens: resp.MigrateTokens, } select { case updates <- update: @@ -1530,7 +1550,8 @@ func (c *Client) runAllocs(update *allocUpdates) { // Start the new allocations for _, add := range diff.added { - if err := c.addAlloc(add); err != nil { + migrateToken := update.migrateTokens[add.ID] + if err := c.addAlloc(add, migrateToken); err != nil { c.logger.Printf("[ERR] client: failed to add alloc '%s': %v", add.ID, err) } @@ -1572,7 +1593,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error { } // addAlloc is invoked when we should add an allocation -func (c *Client) addAlloc(alloc *structs.Allocation) error { +func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error { // Check if we already have an alloc runner c.allocLock.Lock() if _, ok := c.allocs[alloc.ID]; ok { @@ -1589,7 +1610,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error { } c.configLock.RLock() - prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger) + prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken) ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc) c.configLock.RUnlock() diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index e9127a846..36b2d2c63 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -88,11 +88,12 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ return nil, CodedError(404, resourceNotFoundErr) } allocID := tokens[0] + migrateToken := req.Header.Get("X-Nomad-Token") switch tokens[1] { case "stats": return s.allocStats(allocID, resp, req) case "snapshot": - return s.allocSnapshot(allocID, resp, req) + return s.allocSnapshot(allocID, migrateToken, resp, req) case "gc": return s.allocGC(allocID, resp, req) } @@ -134,7 +135,11 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http return nil, s.agent.Client().CollectAllocation(allocID) } -func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) allocSnapshot(allocID, migrateToken string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if !s.agent.Client().ValidateMigrateToken(allocID, migrateToken) { + return nil, fmt.Errorf("invalid migrate token for allocation %q", allocID) + } + allocFS, err := s.agent.Client().GetAllocFS(allocID) if err != nil { return nil, fmt.Errorf(allocNotFoundErr) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index d78ade420..2503fd8b6 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -704,6 +704,9 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply.Allocs[alloc.ID] = alloc.AllocModifyIndex allocNode, err := state.NodeByID(ws, args.NodeID) + if err != nil { + return err + } token, err := generateMigrateToken(alloc.ID, allocNode.SecretID) if err != nil { return err From 38f8217ea0c7b436bbfb191ff3c40d8c52f12c8e Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Wed, 4 Oct 2017 14:46:47 -0400 Subject: [PATCH 03/10] add tests for functionality --- client/client_test.go | 46 ++++++++++++++++++++++++++++ command/agent/alloc_endpoint_test.go | 18 +++++++++++ 2 files changed, 64 insertions(+) diff --git a/client/client_test.go b/client/client_test.go index 850d958b4..b3d36cb50 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -24,6 +24,7 @@ import ( "github.com/hashicorp/nomad/testutil" "github.com/mitchellh/hashstructure" "github.com/stretchr/testify/assert" + "golang.org/x/crypto/blake2b" ctestutil "github.com/hashicorp/nomad/client/testutil" ) @@ -962,3 +963,48 @@ func TestClient_BlockedAllocations(t *testing.T) { <-ar.WaitCh() } } + +func TestClient_ValidateMigrateToken_ValidToken(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + c := testClient(t, func(c *config.Config) { + c.ACLEnabled = true + }) + defer c.Shutdown() + + alloc := mock.Alloc() + h, err := blake2b.New512([]byte(c.secretNodeID())) + assert.Nil(err) + + h.Write([]byte(alloc.ID)) + validToken := string(h.Sum(nil)) + + assert.Equal(c.ValidateMigrateToken(alloc.ID, validToken), true) +} + +func TestClient_ValidateMigrateToken_InvalidToken(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + c := testClient(t, func(c *config.Config) { + c.ACLEnabled = true + }) + defer c.Shutdown() + + assert.Equal(c.ValidateMigrateToken("", ""), false) + + alloc := mock.Alloc() + assert.Equal(c.ValidateMigrateToken(alloc.ID, alloc.ID), false) + assert.Equal(c.ValidateMigrateToken(alloc.ID, ""), false) +} + +func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) { + t.Parallel() + assert := assert.New(t) + + c := testClient(t, func(c *config.Config) {}) + defer c.Shutdown() + + assert.Equal(c.ValidateMigrateToken("", ""), true) +} diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index ab552e1ac..c8556b1c9 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -316,6 +316,24 @@ func TestHTTP_AllocSnapshot(t *testing.T) { }) } +func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { + t.Parallel() + assert := assert.New(t) + httpACLTest(t, nil, func(s *TestAgent) { + // TODO add an allocation, assert it is returned + + // Request without a token succeeds + req, err := http.NewRequest("GET", "/v1/client/allocation/123/snapshot", nil) + assert.Nil(err) + + // Make the unauthorized request + respW := httptest.NewRecorder() + _, err = s.Server.ClientAllocRequest(respW, req) + assert.NotNil(err) + assert.Contains(err.Error(), "invalid migrate token") + }) +} + func TestHTTP_AllocGC(t *testing.T) { t.Parallel() httpTest(t, nil, func(s *TestAgent) { From 919bd2072d8dac8cf2095a87291b07308eb7a784 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Fri, 6 Oct 2017 20:20:30 -0400 Subject: [PATCH 04/10] adding valid case test for http endpoint --- command/agent/alloc_endpoint_test.go | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index c8556b1c9..27cd5d2f9 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/assert" + "golang.org/x/crypto/blake2b" ) func TestHTTP_AllocsList(t *testing.T) { @@ -320,8 +321,6 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { t.Parallel() assert := assert.New(t) httpACLTest(t, nil, func(s *TestAgent) { - // TODO add an allocation, assert it is returned - // Request without a token succeeds req, err := http.NewRequest("GET", "/v1/client/allocation/123/snapshot", nil) assert.Nil(err) @@ -331,6 +330,28 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { _, err = s.Server.ClientAllocRequest(respW, req) assert.NotNil(err) assert.Contains(err.Error(), "invalid migrate token") + + // Create an allocation + state := s.Agent.server.State() + alloc := mock.Alloc() + state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) + + // Set up data to create an authenticated request + h, err := blake2b.New512([]byte(s.Agent.Client().Node().SecretID)) + h.Write([]byte(alloc.ID)) + validMigrateToken, err := string(h.Sum(nil)), nil + assert.Nil(err) + + // Request with a token succeeds + req.Header.Set("X-Nomad-Token", validMigrateToken) + req, err = http.NewRequest("GET", "/v1/client/allocation/123/snapshot", nil) + assert.Nil(err) + + // Make the unauthorized request + respW = httptest.NewRecorder() + _, err = s.Server.ClientAllocRequest(respW, req) + assert.NotNil(err) + assert.Contains(err.Error(), "invalid migrate token") }) } From 236806835566ee6c419ba81dcd6f460b32780391 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Fri, 6 Oct 2017 20:54:09 -0400 Subject: [PATCH 05/10] fixing up code review comments --- client/client.go | 3 +++ command/agent/alloc_endpoint.go | 9 +++++---- nomad/node_endpoint_test.go | 2 +- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/client/client.go b/client/client.go index d501abe0a..7e4fa1ec7 100644 --- a/client/client.go +++ b/client/client.go @@ -526,6 +526,9 @@ func (c *Client) LatestHostStats() *stats.HostStats { return c.hostStatsCollector.Stats() } +// ValidateMigrateToken verifies that a token is for a specific client and +// allocation, and has been created by a trusted party that has privilaged +// knowledge of the client's secret identifier func (c *Client) ValidateMigrateToken(allocID, migrateToken string) bool { if !c.config.ACLEnabled { return true diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 36b2d2c63..2d4613b54 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -88,12 +88,11 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ return nil, CodedError(404, resourceNotFoundErr) } allocID := tokens[0] - migrateToken := req.Header.Get("X-Nomad-Token") switch tokens[1] { case "stats": return s.allocStats(allocID, resp, req) case "snapshot": - return s.allocSnapshot(allocID, migrateToken, resp, req) + return s.allocSnapshot(allocID, resp, req) case "gc": return s.allocGC(allocID, resp, req) } @@ -135,8 +134,10 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http return nil, s.agent.Client().CollectAllocation(allocID) } -func (s *HTTPServer) allocSnapshot(allocID, migrateToken string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { - if !s.agent.Client().ValidateMigrateToken(allocID, migrateToken) { +func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { + var secret string + s.parseToken(req, &secret) + if !s.agent.Client().ValidateMigrateToken(allocID, secret) { return nil, fmt.Errorf("invalid migrate token for allocation %q", allocID) } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 92013381e..729049ed6 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1384,7 +1384,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { } } -func TestClientEndpoint_GetClientAllocs_WIthMigrateTokens(t *testing.T) { +func TestClientEndpoint_GetClientAllocs_WithMigrateTokens(t *testing.T) { t.Parallel() assert := assert.New(t) From 5e0da10bcd9bc293554f225b3bcea2141136e16b Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Fri, 6 Oct 2017 21:45:55 -0400 Subject: [PATCH 06/10] adding migration token validation for gc endpoint --- command/agent/alloc_endpoint_test.go | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 27cd5d2f9..9076d0d24 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -1,6 +1,7 @@ package agent import ( + "fmt" "net/http" "net/http/httptest" "reflect" @@ -317,6 +318,13 @@ func TestHTTP_AllocSnapshot(t *testing.T) { }) } +func createMigrateTokenForClientAndAlloc(allocID, clientSecret string) (string, error) { + h, err := blake2b.New512([]byte(clientSecret)) + h.Write([]byte(allocID)) + validMigrateToken, err := string(h.Sum(nil)), nil + return validMigrateToken, err +} + func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { t.Parallel() assert := assert.New(t) @@ -336,22 +344,20 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { alloc := mock.Alloc() state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) - // Set up data to create an authenticated request - h, err := blake2b.New512([]byte(s.Agent.Client().Node().SecretID)) - h.Write([]byte(alloc.ID)) - validMigrateToken, err := string(h.Sum(nil)), nil + validMigrateToken, err := createMigrateTokenForClientAndAlloc(alloc.ID, s.Agent.Client().Node().SecretID) assert.Nil(err) // Request with a token succeeds - req.Header.Set("X-Nomad-Token", validMigrateToken) - req, err = http.NewRequest("GET", "/v1/client/allocation/123/snapshot", nil) + url := fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID) + req, err = http.NewRequest("GET", url, nil) assert.Nil(err) + req.Header.Set("X-Nomad-Token", validMigrateToken) + // Make the unauthorized request respW = httptest.NewRecorder() _, err = s.Server.ClientAllocRequest(respW, req) - assert.NotNil(err) - assert.Contains(err.Error(), "invalid migrate token") + assert.NotContains(err.Error(), "invalid migrate token") }) } From 76b2c50dbcf11c95ad1df81ad33207d711dca677 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Fri, 6 Oct 2017 21:54:55 -0400 Subject: [PATCH 07/10] fix up build warnings --- client/client.go | 2 +- command/agent/alloc_endpoint_test.go | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/client/client.go b/client/client.go index 7e4fa1ec7..27f1f2c4e 100644 --- a/client/client.go +++ b/client/client.go @@ -527,7 +527,7 @@ func (c *Client) LatestHostStats() *stats.HostStats { } // ValidateMigrateToken verifies that a token is for a specific client and -// allocation, and has been created by a trusted party that has privilaged +// allocation, and has been created by a trusted party that has privileged // knowledge of the client's secret identifier func (c *Client) ValidateMigrateToken(allocID, migrateToken string) bool { if !c.config.ACLEnabled { diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 9076d0d24..2eeef7e22 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -320,6 +320,11 @@ func TestHTTP_AllocSnapshot(t *testing.T) { func createMigrateTokenForClientAndAlloc(allocID, clientSecret string) (string, error) { h, err := blake2b.New512([]byte(clientSecret)) + + if err != nil { + return "", err + } + h.Write([]byte(allocID)) validMigrateToken, err := string(h.Sum(nil)), nil return validMigrateToken, err From 1b6f6e598adc7ef535f00b2db0d9066e73fbc886 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Mon, 9 Oct 2017 20:23:26 -0400 Subject: [PATCH 08/10] fixups from code review change creation of a migrate token to be for a previous allocation --- command/agent/alloc_endpoint.go | 2 +- command/agent/alloc_endpoint_test.go | 4 +--- nomad/node_endpoint.go | 30 ++++++++++++++++++++-------- nomad/node_endpoint_test.go | 23 +++++++++++---------- nomad/structs/structs.go | 3 ++- 5 files changed, 38 insertions(+), 24 deletions(-) diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index 2d4613b54..7011a3cfc 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -138,7 +138,7 @@ func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req var secret string s.parseToken(req, &secret) if !s.agent.Client().ValidateMigrateToken(allocID, secret) { - return nil, fmt.Errorf("invalid migrate token for allocation %q", allocID) + return nil, structs.ErrPermissionDenied } allocFS, err := s.agent.Client().GetAllocFS(allocID) diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 2eeef7e22..e1a6041f2 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -334,7 +334,7 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { t.Parallel() assert := assert.New(t) httpACLTest(t, nil, func(s *TestAgent) { - // Request without a token succeeds + // Request without a token fails req, err := http.NewRequest("GET", "/v1/client/allocation/123/snapshot", nil) assert.Nil(err) @@ -345,9 +345,7 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { assert.Contains(err.Error(), "invalid migrate token") // Create an allocation - state := s.Agent.server.State() alloc := mock.Alloc() - state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID)) validMigrateToken, err := createMigrateTokenForClientAndAlloc(alloc.ID, s.Agent.Client().Node().SecretID) assert.Nil(err) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 2503fd8b6..8114c8314 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -641,6 +641,8 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest, return n.srv.blockingRPC(&opts) } +// generateMigrateToken will create a token for a client to access an +// authenticated volume of another client to migrate data for sticky volumes. func generateMigrateToken(allocID, nodeSecretID string) (string, error) { h, err := blake2b.New512([]byte(nodeSecretID)) if err != nil { @@ -703,15 +705,27 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, for _, alloc := range allocs { reply.Allocs[alloc.ID] = alloc.AllocModifyIndex - allocNode, err := state.NodeByID(ws, args.NodeID) - if err != nil { - return err + // used to create a migrate token for an authenticated volume, using + // the client's secret identifier for authentication. + if alloc.ShouldMigrate() { + prevAllocation, err := state.AllocByID(ws, alloc.PreviousAllocation) + if err != nil { + return err + } + + if prevAllocation.NodeID != alloc.NodeID { + allocNode, err := state.NodeByID(ws, prevAllocation.NodeID) + + if err != nil { + return err + } + token, err := generateMigrateToken(prevAllocation.ID, allocNode.SecretID) + if err != nil { + return err + } + reply.MigrateTokens[alloc.ID] = token + } } - token, err := generateMigrateToken(alloc.ID, allocNode.SecretID) - if err != nil { - return err - } - reply.MigrateTokens[alloc.ID] = token reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) } diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 729049ed6..709242711 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -1384,7 +1384,9 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) { } } -func TestClientEndpoint_GetClientAllocs_WithMigrateTokens(t *testing.T) { +// A MigrateToken should not be created if an allocation shares the same node +// with its previous allocation +func TestClientEndpoint_GetClientAllocs_WithoutMigrateTokens(t *testing.T) { t.Parallel() assert := assert.New(t) @@ -1409,11 +1411,15 @@ func TestClientEndpoint_GetClientAllocs_WithMigrateTokens(t *testing.T) { node.ModifyIndex = resp.Index // Inject fake evaluations + prevAlloc := mock.Alloc() + prevAlloc.NodeID = node.ID alloc := mock.Alloc() alloc.NodeID = node.ID + alloc.PreviousAllocation = prevAlloc.ID + alloc.DesiredStatus = structs.AllocClientStatusComplete state := s1.fsm.State() state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID)) - err := state.UpsertAllocs(100, []*structs.Allocation{alloc}) + err := state.UpsertAllocs(100, []*structs.Allocation{prevAlloc, alloc}) assert.Nil(err) // Lookup the allocs @@ -1427,15 +1433,10 @@ func TestClientEndpoint_GetClientAllocs_WithMigrateTokens(t *testing.T) { err = msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp2) assert.Nil(err) - assert.Equal(resp2.Index, uint64(100)) - assert.Equal(len(resp2.Allocs), 1) - assert.Equal(resp2.Allocs[alloc.ID], uint64(100)) - - // verify the correct migrate token was generated - expectedToken, err := generateMigrateToken(alloc.ID, node.SecretID) - assert.Nil(err) - - assert.Equal(resp2.MigrateTokens[alloc.ID], expectedToken) + assert.Equal(uint64(100), resp2.Index) + assert.Equal(2, len(resp2.Allocs)) + assert.Equal(uint64(100), resp2.Allocs[alloc.ID]) + assert.Equal(0, len(resp2.MigrateTokens)) } func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 4e7cbe011..10cfaa003 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -818,7 +818,8 @@ type NodeAllocsResponse struct { type NodeClientAllocsResponse struct { Allocs map[string]uint64 - // MigrateTokens are used when ACLs are enabled to verify volume access + // MigrateTokens are used when ACLs are enabled to allow cross node, + // authenticated access to sticky volumes MigrateTokens map[string]string QueryMeta From b4ebc69dc77a87c632cdfa8c32248a034c282bdc Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 11 Oct 2017 17:04:09 -0700 Subject: [PATCH 09/10] Small fixes This commit: * Fixes the error checking in migration tests now that we are using the canonical ErrPermissionDenied error * Guard against NPE when looking up objects to generate the migration token * Handle an additional case in ShouldMigrate() --- command/agent/alloc_endpoint_test.go | 4 ++-- nomad/node_endpoint.go | 14 ++++++++++---- nomad/structs/structs.go | 4 ++++ 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index e1a6041f2..102bb5d25 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -342,7 +342,7 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { respW := httptest.NewRecorder() _, err = s.Server.ClientAllocRequest(respW, req) assert.NotNil(err) - assert.Contains(err.Error(), "invalid migrate token") + assert.EqualError(err, structs.ErrPermissionDenied.Error()) // Create an allocation alloc := mock.Alloc() @@ -360,7 +360,7 @@ func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) { // Make the unauthorized request respW = httptest.NewRecorder() _, err = s.Server.ClientAllocRequest(respW, req) - assert.NotContains(err.Error(), "invalid migrate token") + assert.NotContains(err.Error(), structs.ErrPermissionDenied.Error()) }) } diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 8114c8314..3d6b6d04c 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -700,25 +700,31 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply.Allocs = make(map[string]uint64) reply.MigrateTokens = make(map[string]string) + // Setup the output if len(allocs) != 0 { for _, alloc := range allocs { reply.Allocs[alloc.ID] = alloc.AllocModifyIndex - // used to create a migrate token for an authenticated volume, using - // the client's secret identifier for authentication. + // If the allocation is going to do a migration, create a + // migration token so that the client can authenticate with + // the node hosting the previous allocation. if alloc.ShouldMigrate() { prevAllocation, err := state.AllocByID(ws, alloc.PreviousAllocation) if err != nil { return err } - if prevAllocation.NodeID != alloc.NodeID { + if prevAllocation != nil && prevAllocation.NodeID != alloc.NodeID { allocNode, err := state.NodeByID(ws, prevAllocation.NodeID) - if err != nil { return err } + if allocNode == nil { + // Node must have been GC'd so skip the token + continue + } + token, err := generateMigrateToken(prevAllocation.ID, allocNode.SecretID) if err != nil { return err diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 10cfaa003..1c0b6dc71 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4690,6 +4690,10 @@ func (a *Allocation) RanSuccessfully() bool { // ShouldMigrate returns if the allocation needs data migration func (a *Allocation) ShouldMigrate() bool { + if a.PreviousAllocation == "" { + return false + } + if a.DesiredStatus == AllocDesiredStatusStop || a.DesiredStatus == AllocDesiredStatusEvict { return false } From 0bb80816f9de80481d0b26016c475337572b794a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 11 Oct 2017 18:08:37 -0700 Subject: [PATCH 10/10] fix test --- nomad/structs/structs_test.go | 32 ++++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index cdbe73520..d5b4ff497 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -2045,7 +2045,8 @@ func TestTaskArtifact_Validate_Dest(t *testing.T) { func TestAllocation_ShouldMigrate(t *testing.T) { alloc := Allocation{ - TaskGroup: "foo", + PreviousAllocation: "123", + TaskGroup: "foo", Job: &Job{ TaskGroups: []*TaskGroup{ { @@ -2064,7 +2065,8 @@ func TestAllocation_ShouldMigrate(t *testing.T) { } alloc1 := Allocation{ - TaskGroup: "foo", + PreviousAllocation: "123", + TaskGroup: "foo", Job: &Job{ TaskGroups: []*TaskGroup{ { @@ -2080,7 +2082,8 @@ func TestAllocation_ShouldMigrate(t *testing.T) { } alloc2 := Allocation{ - TaskGroup: "foo", + PreviousAllocation: "123", + TaskGroup: "foo", Job: &Job{ TaskGroups: []*TaskGroup{ { @@ -2099,7 +2102,8 @@ func TestAllocation_ShouldMigrate(t *testing.T) { } alloc3 := Allocation{ - TaskGroup: "foo", + PreviousAllocation: "123", + TaskGroup: "foo", Job: &Job{ TaskGroups: []*TaskGroup{ { @@ -2112,6 +2116,26 @@ func TestAllocation_ShouldMigrate(t *testing.T) { if alloc3.ShouldMigrate() { t.Fatalf("bad: %v", alloc) } + + // No previous + alloc4 := Allocation{ + TaskGroup: "foo", + Job: &Job{ + TaskGroups: []*TaskGroup{ + { + Name: "foo", + EphemeralDisk: &EphemeralDisk{ + Migrate: true, + Sticky: true, + }, + }, + }, + }, + } + + if alloc4.ShouldMigrate() { + t.Fatalf("bad: %v", alloc4) + } } func TestTaskArtifact_Validate_Checksum(t *testing.T) {