From fba1653057a782d4c8d559f063eacb68d84b9147 Mon Sep 17 00:00:00 2001 From: Chelsea Holland Komlo Date: Tue, 3 Oct 2017 13:53:32 -0400 Subject: [PATCH] Add functionality for authenticated volumes --- client/alloc_runner_test.go | 8 ++++---- client/alloc_watcher.go | 24 +++++++++++++++--------- client/alloc_watcher_test.go | 2 +- client/client.go | 31 ++++++++++++++++++++++++++----- command/agent/alloc_endpoint.go | 9 +++++++-- nomad/node_endpoint.go | 3 +++ 6 files changed, 56 insertions(+), 21 deletions(-) diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 17e869fe4..c374aa81c 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -749,7 +749,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("----- ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2) + prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "") ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() @@ -844,7 +844,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2) + prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "") ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() @@ -959,7 +959,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) { // Create a new alloc runner l2 := prefixedTestLogger("ar2: ") alloc2 := &structs.Allocation{ID: ar.alloc.ID} - prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2) + prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2, "") ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc) err = ar2.RestoreState() if err != nil { @@ -1413,7 +1413,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) { upd2, ar2 := testAllocRunnerFromAlloc(alloc2, false) // Set prevAlloc like Client does - ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger) + ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger, "") go ar2.Run() defer ar2.Destroy() diff --git a/client/alloc_watcher.go b/client/alloc_watcher.go index 8b029a7f7..4fb819978 100644 --- a/client/alloc_watcher.go +++ b/client/alloc_watcher.go @@ -51,7 +51,7 @@ type prevAllocWatcher interface { // newAllocWatcher creates a prevAllocWatcher appropriate for whether this // alloc's previous allocation was local or remote. If this alloc has no // previous alloc then a noop implementation is returned. -func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger) prevAllocWatcher { +func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher { if alloc.PreviousAllocation == "" { // No previous allocation, use noop transitioner return noopPrevAlloc{} @@ -75,13 +75,14 @@ func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, } return &remotePrevAlloc{ - allocID: alloc.ID, - prevAllocID: alloc.PreviousAllocation, - tasks: tg.Tasks, - config: config, - migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate, - rpc: rpc, - logger: l, + allocID: alloc.ID, + prevAllocID: alloc.PreviousAllocation, + tasks: tg.Tasks, + config: config, + migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate, + rpc: rpc, + logger: l, + migrateToken: migrateToken, } } @@ -236,6 +237,10 @@ type remotePrevAlloc struct { waitingLock sync.RWMutex logger *log.Logger + + // migrateToken allows a client to migrate data in an ACL-protected remote + // volume + migrateToken string } // IsWaiting returns true if there's a concurrent call inside Wait @@ -423,7 +428,8 @@ func (p *remotePrevAlloc) migrateAllocDir(ctx context.Context, nodeAddr string) } url := fmt.Sprintf("/v1/client/allocation/%v/snapshot", p.prevAllocID) - resp, err := apiClient.Raw().Response(url, nil) + qo := &nomadapi.QueryOptions{SecretID: p.migrateToken} + resp, err := apiClient.Raw().Response(url, qo) if err != nil { prevAllocDir.Destroy() return nil, fmt.Errorf("error getting snapshot from previous alloc %q: %v", p.prevAllocID, err) diff --git a/client/alloc_watcher_test.go b/client/alloc_watcher_test.go index 01e9f1d7e..fb7e93819 100644 --- a/client/alloc_watcher_test.go +++ b/client/alloc_watcher_test.go @@ -29,7 +29,7 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) { task.Driver = "mock_driver" task.Config["run_for"] = "500ms" - waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger()) + waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger(), "") // Wait in a goroutine with a context to make sure it exits at the right time ctx, cancel := context.WithCancel(context.Background()) diff --git a/client/client.go b/client/client.go index b8a2bd24a..d501abe0a 100644 --- a/client/client.go +++ b/client/client.go @@ -33,6 +33,7 @@ import ( vaultapi "github.com/hashicorp/vault/api" "github.com/mitchellh/hashstructure" "github.com/shirou/gopsutil/host" + "golang.org/x/crypto/blake2b" ) const ( @@ -525,6 +526,20 @@ func (c *Client) LatestHostStats() *stats.HostStats { return c.hostStatsCollector.Stats() } +func (c *Client) ValidateMigrateToken(allocID, migrateToken string) bool { + if !c.config.ACLEnabled { + return true + } + + h, err := blake2b.New512([]byte(c.secretNodeID())) + if err != nil { + return false + } + h.Write([]byte(allocID)) + expectedMigrateToken := string(h.Sum(nil)) + return expectedMigrateToken == migrateToken +} + // GetAllocFS returns the AllocFS interface for the alloc dir of an allocation func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) { c.allocLock.RLock() @@ -1303,6 +1318,10 @@ type allocUpdates struct { // filtered is the set of allocations that were not pulled because their // AllocModifyIndex didn't change. filtered map[string]struct{} + + // migrateTokens are a list of tokens necessary for when clients pull data + // from authorized volumes + migrateTokens map[string]string } // watchAllocations is used to scan for updates to allocations @@ -1460,8 +1479,9 @@ OUTER: // Push the updates. update := &allocUpdates{ - filtered: filtered, - pulled: pulledAllocs, + filtered: filtered, + pulled: pulledAllocs, + migrateTokens: resp.MigrateTokens, } select { case updates <- update: @@ -1530,7 +1550,8 @@ func (c *Client) runAllocs(update *allocUpdates) { // Start the new allocations for _, add := range diff.added { - if err := c.addAlloc(add); err != nil { + migrateToken := update.migrateTokens[add.ID] + if err := c.addAlloc(add, migrateToken); err != nil { c.logger.Printf("[ERR] client: failed to add alloc '%s': %v", add.ID, err) } @@ -1572,7 +1593,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error { } // addAlloc is invoked when we should add an allocation -func (c *Client) addAlloc(alloc *structs.Allocation) error { +func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error { // Check if we already have an alloc runner c.allocLock.Lock() if _, ok := c.allocs[alloc.ID]; ok { @@ -1589,7 +1610,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error { } c.configLock.RLock() - prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger) + prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken) ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc) c.configLock.RUnlock() diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index e9127a846..36b2d2c63 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -88,11 +88,12 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ return nil, CodedError(404, resourceNotFoundErr) } allocID := tokens[0] + migrateToken := req.Header.Get("X-Nomad-Token") switch tokens[1] { case "stats": return s.allocStats(allocID, resp, req) case "snapshot": - return s.allocSnapshot(allocID, resp, req) + return s.allocSnapshot(allocID, migrateToken, resp, req) case "gc": return s.allocGC(allocID, resp, req) } @@ -134,7 +135,11 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http return nil, s.agent.Client().CollectAllocation(allocID) } -func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) allocSnapshot(allocID, migrateToken string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if !s.agent.Client().ValidateMigrateToken(allocID, migrateToken) { + return nil, fmt.Errorf("invalid migrate token for allocation %q", allocID) + } + allocFS, err := s.agent.Client().GetAllocFS(allocID) if err != nil { return nil, fmt.Errorf(allocNotFoundErr) diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index d78ade420..2503fd8b6 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -704,6 +704,9 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, reply.Allocs[alloc.ID] = alloc.AllocModifyIndex allocNode, err := state.NodeByID(ws, args.NodeID) + if err != nil { + return err + } token, err := generateMigrateToken(alloc.ID, allocNode.SecretID) if err != nil { return err