Merge pull request #3322 from hashicorp/f-authenticated-volumes

Authenticated client volumes
This commit is contained in:
Alex Dadgar
2017-10-11 18:09:21 -07:00
committed by GitHub
11 changed files with 283 additions and 23 deletions

View File

@@ -749,7 +749,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
// Create a new alloc runner
l2 := prefixedTestLogger("----- ar2: ")
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2)
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "")
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
err = ar2.RestoreState()
@@ -844,7 +844,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
// Create a new alloc runner
l2 := prefixedTestLogger("ar2: ")
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2)
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "")
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
err = ar2.RestoreState()
@@ -959,7 +959,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
// Create a new alloc runner
l2 := prefixedTestLogger("ar2: ")
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2)
prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2, "")
ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
err = ar2.RestoreState()
if err != nil {
@@ -1413,7 +1413,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
upd2, ar2 := testAllocRunnerFromAlloc(alloc2, false)
// Set prevAlloc like Client does
ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger)
ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger, "")
go ar2.Run()
defer ar2.Destroy()

View File

@@ -51,7 +51,7 @@ type prevAllocWatcher interface {
// newAllocWatcher creates a prevAllocWatcher appropriate for whether this
// alloc's previous allocation was local or remote. If this alloc has no
// previous alloc then a noop implementation is returned.
func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger) prevAllocWatcher {
func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher {
if alloc.PreviousAllocation == "" {
// No previous allocation, use noop transitioner
return noopPrevAlloc{}
@@ -75,13 +75,14 @@ func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer,
}
return &remotePrevAlloc{
allocID: alloc.ID,
prevAllocID: alloc.PreviousAllocation,
tasks: tg.Tasks,
config: config,
migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate,
rpc: rpc,
logger: l,
allocID: alloc.ID,
prevAllocID: alloc.PreviousAllocation,
tasks: tg.Tasks,
config: config,
migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate,
rpc: rpc,
logger: l,
migrateToken: migrateToken,
}
}
@@ -236,6 +237,10 @@ type remotePrevAlloc struct {
waitingLock sync.RWMutex
logger *log.Logger
// migrateToken allows a client to migrate data in an ACL-protected remote
// volume
migrateToken string
}
// IsWaiting returns true if there's a concurrent call inside Wait
@@ -423,7 +428,8 @@ func (p *remotePrevAlloc) migrateAllocDir(ctx context.Context, nodeAddr string)
}
url := fmt.Sprintf("/v1/client/allocation/%v/snapshot", p.prevAllocID)
resp, err := apiClient.Raw().Response(url, nil)
qo := &nomadapi.QueryOptions{SecretID: p.migrateToken}
resp, err := apiClient.Raw().Response(url, qo)
if err != nil {
prevAllocDir.Destroy()
return nil, fmt.Errorf("error getting snapshot from previous alloc %q: %v", p.prevAllocID, err)

View File

@@ -29,7 +29,7 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) {
task.Driver = "mock_driver"
task.Config["run_for"] = "500ms"
waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger())
waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger(), "")
// Wait in a goroutine with a context to make sure it exits at the right time
ctx, cancel := context.WithCancel(context.Background())

View File

@@ -33,6 +33,7 @@ import (
vaultapi "github.com/hashicorp/vault/api"
"github.com/mitchellh/hashstructure"
"github.com/shirou/gopsutil/host"
"golang.org/x/crypto/blake2b"
)
const (
@@ -525,6 +526,23 @@ func (c *Client) LatestHostStats() *stats.HostStats {
return c.hostStatsCollector.Stats()
}
// ValidateMigrateToken verifies that a token is for a specific client and
// allocation, and has been created by a trusted party that has privileged
// knowledge of the client's secret identifier
func (c *Client) ValidateMigrateToken(allocID, migrateToken string) bool {
if !c.config.ACLEnabled {
return true
}
h, err := blake2b.New512([]byte(c.secretNodeID()))
if err != nil {
return false
}
h.Write([]byte(allocID))
expectedMigrateToken := string(h.Sum(nil))
return expectedMigrateToken == migrateToken
}
// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
c.allocLock.RLock()
@@ -1303,6 +1321,10 @@ type allocUpdates struct {
// filtered is the set of allocations that were not pulled because their
// AllocModifyIndex didn't change.
filtered map[string]struct{}
// migrateTokens are a list of tokens necessary for when clients pull data
// from authorized volumes
migrateTokens map[string]string
}
// watchAllocations is used to scan for updates to allocations
@@ -1460,8 +1482,9 @@ OUTER:
// Push the updates.
update := &allocUpdates{
filtered: filtered,
pulled: pulledAllocs,
filtered: filtered,
pulled: pulledAllocs,
migrateTokens: resp.MigrateTokens,
}
select {
case updates <- update:
@@ -1530,7 +1553,8 @@ func (c *Client) runAllocs(update *allocUpdates) {
// Start the new allocations
for _, add := range diff.added {
if err := c.addAlloc(add); err != nil {
migrateToken := update.migrateTokens[add.ID]
if err := c.addAlloc(add, migrateToken); err != nil {
c.logger.Printf("[ERR] client: failed to add alloc '%s': %v",
add.ID, err)
}
@@ -1572,7 +1596,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
}
// addAlloc is invoked when we should add an allocation
func (c *Client) addAlloc(alloc *structs.Allocation) error {
func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error {
// Check if we already have an alloc runner
c.allocLock.Lock()
if _, ok := c.allocs[alloc.ID]; ok {
@@ -1589,7 +1613,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error {
}
c.configLock.RLock()
prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger)
prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken)
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
c.configLock.RUnlock()

View File

@@ -24,6 +24,7 @@ import (
"github.com/hashicorp/nomad/testutil"
"github.com/mitchellh/hashstructure"
"github.com/stretchr/testify/assert"
"golang.org/x/crypto/blake2b"
ctestutil "github.com/hashicorp/nomad/client/testutil"
)
@@ -962,3 +963,48 @@ func TestClient_BlockedAllocations(t *testing.T) {
<-ar.WaitCh()
}
}
func TestClient_ValidateMigrateToken_ValidToken(t *testing.T) {
t.Parallel()
assert := assert.New(t)
c := testClient(t, func(c *config.Config) {
c.ACLEnabled = true
})
defer c.Shutdown()
alloc := mock.Alloc()
h, err := blake2b.New512([]byte(c.secretNodeID()))
assert.Nil(err)
h.Write([]byte(alloc.ID))
validToken := string(h.Sum(nil))
assert.Equal(c.ValidateMigrateToken(alloc.ID, validToken), true)
}
func TestClient_ValidateMigrateToken_InvalidToken(t *testing.T) {
t.Parallel()
assert := assert.New(t)
c := testClient(t, func(c *config.Config) {
c.ACLEnabled = true
})
defer c.Shutdown()
assert.Equal(c.ValidateMigrateToken("", ""), false)
alloc := mock.Alloc()
assert.Equal(c.ValidateMigrateToken(alloc.ID, alloc.ID), false)
assert.Equal(c.ValidateMigrateToken(alloc.ID, ""), false)
}
func TestClient_ValidateMigrateToken_ACLDisabled(t *testing.T) {
t.Parallel()
assert := assert.New(t)
c := testClient(t, func(c *config.Config) {})
defer c.Shutdown()
assert.Equal(c.ValidateMigrateToken("", ""), true)
}

View File

@@ -135,6 +135,12 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http
}
func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var secret string
s.parseToken(req, &secret)
if !s.agent.Client().ValidateMigrateToken(allocID, secret) {
return nil, structs.ErrPermissionDenied
}
allocFS, err := s.agent.Client().GetAllocFS(allocID)
if err != nil {
return nil, fmt.Errorf(allocNotFoundErr)

View File

@@ -1,6 +1,7 @@
package agent
import (
"fmt"
"net/http"
"net/http/httptest"
"reflect"
@@ -12,6 +13,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"golang.org/x/crypto/blake2b"
)
func TestHTTP_AllocsList(t *testing.T) {
@@ -316,6 +318,52 @@ func TestHTTP_AllocSnapshot(t *testing.T) {
})
}
func createMigrateTokenForClientAndAlloc(allocID, clientSecret string) (string, error) {
h, err := blake2b.New512([]byte(clientSecret))
if err != nil {
return "", err
}
h.Write([]byte(allocID))
validMigrateToken, err := string(h.Sum(nil)), nil
return validMigrateToken, err
}
func TestHTTP_AllocSnapshot_WithMigrateToken(t *testing.T) {
t.Parallel()
assert := assert.New(t)
httpACLTest(t, nil, func(s *TestAgent) {
// Request without a token fails
req, err := http.NewRequest("GET", "/v1/client/allocation/123/snapshot", nil)
assert.Nil(err)
// Make the unauthorized request
respW := httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
assert.NotNil(err)
assert.EqualError(err, structs.ErrPermissionDenied.Error())
// Create an allocation
alloc := mock.Alloc()
validMigrateToken, err := createMigrateTokenForClientAndAlloc(alloc.ID, s.Agent.Client().Node().SecretID)
assert.Nil(err)
// Request with a token succeeds
url := fmt.Sprintf("/v1/client/allocation/%s/snapshot", alloc.ID)
req, err = http.NewRequest("GET", url, nil)
assert.Nil(err)
req.Header.Set("X-Nomad-Token", validMigrateToken)
// Make the unauthorized request
respW = httptest.NewRecorder()
_, err = s.Server.ClientAllocRequest(respW, req)
assert.NotContains(err.Error(), structs.ErrPermissionDenied.Error())
})
}
func TestHTTP_AllocGC(t *testing.T) {
t.Parallel()
httpTest(t, nil, func(s *TestAgent) {

View File

@@ -7,6 +7,7 @@ import (
"sync"
"time"
"golang.org/x/crypto/blake2b"
"golang.org/x/sync/errgroup"
"github.com/armon/go-metrics"
@@ -640,6 +641,17 @@ func (n *Node) GetAllocs(args *structs.NodeSpecificRequest,
return n.srv.blockingRPC(&opts)
}
// generateMigrateToken will create a token for a client to access an
// authenticated volume of another client to migrate data for sticky volumes.
func generateMigrateToken(allocID, nodeSecretID string) (string, error) {
h, err := blake2b.New512([]byte(nodeSecretID))
if err != nil {
return "", err
}
h.Write([]byte(allocID))
return string(h.Sum(nil)), nil
}
// GetClientAllocs is used to request a lightweight list of alloc modify indexes
// per allocation.
func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
@@ -687,10 +699,40 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
}
reply.Allocs = make(map[string]uint64)
reply.MigrateTokens = make(map[string]string)
// Setup the output
if len(allocs) != 0 {
for _, alloc := range allocs {
reply.Allocs[alloc.ID] = alloc.AllocModifyIndex
// If the allocation is going to do a migration, create a
// migration token so that the client can authenticate with
// the node hosting the previous allocation.
if alloc.ShouldMigrate() {
prevAllocation, err := state.AllocByID(ws, alloc.PreviousAllocation)
if err != nil {
return err
}
if prevAllocation != nil && prevAllocation.NodeID != alloc.NodeID {
allocNode, err := state.NodeByID(ws, prevAllocation.NodeID)
if err != nil {
return err
}
if allocNode == nil {
// Node must have been GC'd so skip the token
continue
}
token, err := generateMigrateToken(prevAllocation.ID, allocNode.SecretID)
if err != nil {
return err
}
reply.MigrateTokens[alloc.ID] = token
}
}
reply.Index = maxUint64(reply.Index, alloc.ModifyIndex)
}
} else {

View File

@@ -1384,6 +1384,61 @@ func TestClientEndpoint_GetClientAllocs_Blocking(t *testing.T) {
}
}
// A MigrateToken should not be created if an allocation shares the same node
// with its previous allocation
func TestClientEndpoint_GetClientAllocs_WithoutMigrateTokens(t *testing.T) {
t.Parallel()
assert := assert.New(t)
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
node.CreateIndex = resp.Index
node.ModifyIndex = resp.Index
// Inject fake evaluations
prevAlloc := mock.Alloc()
prevAlloc.NodeID = node.ID
alloc := mock.Alloc()
alloc.NodeID = node.ID
alloc.PreviousAllocation = prevAlloc.ID
alloc.DesiredStatus = structs.AllocClientStatusComplete
state := s1.fsm.State()
state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
err := state.UpsertAllocs(100, []*structs.Allocation{prevAlloc, alloc})
assert.Nil(err)
// Lookup the allocs
get := &structs.NodeSpecificRequest{
NodeID: node.ID,
SecretID: node.SecretID,
QueryOptions: structs.QueryOptions{Region: "global"},
}
var resp2 structs.NodeClientAllocsResponse
err = msgpackrpc.CallWithCodec(codec, "Node.GetClientAllocs", get, &resp2)
assert.Nil(err)
assert.Equal(uint64(100), resp2.Index)
assert.Equal(2, len(resp2.Allocs))
assert.Equal(uint64(100), resp2.Allocs[alloc.ID])
assert.Equal(0, len(resp2.MigrateTokens))
}
func TestClientEndpoint_GetAllocs_Blocking(t *testing.T) {
t.Parallel()
s1 := testServer(t, nil)

View File

@@ -817,6 +817,11 @@ type NodeAllocsResponse struct {
// NodeClientAllocsResponse is used to return allocs meta data for a single node
type NodeClientAllocsResponse struct {
Allocs map[string]uint64
// MigrateTokens are used when ACLs are enabled to allow cross node,
// authenticated access to sticky volumes
MigrateTokens map[string]string
QueryMeta
}
@@ -4685,6 +4690,10 @@ func (a *Allocation) RanSuccessfully() bool {
// ShouldMigrate returns if the allocation needs data migration
func (a *Allocation) ShouldMigrate() bool {
if a.PreviousAllocation == "" {
return false
}
if a.DesiredStatus == AllocDesiredStatusStop || a.DesiredStatus == AllocDesiredStatusEvict {
return false
}

View File

@@ -2045,7 +2045,8 @@ func TestTaskArtifact_Validate_Dest(t *testing.T) {
func TestAllocation_ShouldMigrate(t *testing.T) {
alloc := Allocation{
TaskGroup: "foo",
PreviousAllocation: "123",
TaskGroup: "foo",
Job: &Job{
TaskGroups: []*TaskGroup{
{
@@ -2064,7 +2065,8 @@ func TestAllocation_ShouldMigrate(t *testing.T) {
}
alloc1 := Allocation{
TaskGroup: "foo",
PreviousAllocation: "123",
TaskGroup: "foo",
Job: &Job{
TaskGroups: []*TaskGroup{
{
@@ -2080,7 +2082,8 @@ func TestAllocation_ShouldMigrate(t *testing.T) {
}
alloc2 := Allocation{
TaskGroup: "foo",
PreviousAllocation: "123",
TaskGroup: "foo",
Job: &Job{
TaskGroups: []*TaskGroup{
{
@@ -2099,7 +2102,8 @@ func TestAllocation_ShouldMigrate(t *testing.T) {
}
alloc3 := Allocation{
TaskGroup: "foo",
PreviousAllocation: "123",
TaskGroup: "foo",
Job: &Job{
TaskGroups: []*TaskGroup{
{
@@ -2112,6 +2116,26 @@ func TestAllocation_ShouldMigrate(t *testing.T) {
if alloc3.ShouldMigrate() {
t.Fatalf("bad: %v", alloc)
}
// No previous
alloc4 := Allocation{
TaskGroup: "foo",
Job: &Job{
TaskGroups: []*TaskGroup{
{
Name: "foo",
EphemeralDisk: &EphemeralDisk{
Migrate: true,
Sticky: true,
},
},
},
},
}
if alloc4.ShouldMigrate() {
t.Fatalf("bad: %v", alloc4)
}
}
func TestTaskArtifact_Validate_Checksum(t *testing.T) {