mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 01:45:44 +03:00
Add functionality for authenticated volumes
This commit is contained in:
committed by
Alex Dadgar
parent
58914c7356
commit
fba1653057
@@ -749,7 +749,7 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
|
||||
// Create a new alloc runner
|
||||
l2 := prefixedTestLogger("----- ar2: ")
|
||||
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
|
||||
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2)
|
||||
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "")
|
||||
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
|
||||
alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
|
||||
err = ar2.RestoreState()
|
||||
@@ -844,7 +844,7 @@ func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
|
||||
// Create a new alloc runner
|
||||
l2 := prefixedTestLogger("ar2: ")
|
||||
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
|
||||
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2)
|
||||
prevAlloc := newAllocWatcher(alloc2, ar, nil, ar.config, l2, "")
|
||||
ar2 := NewAllocRunner(l2, ar.config, ar.stateDB, upd.Update,
|
||||
alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
|
||||
err = ar2.RestoreState()
|
||||
@@ -959,7 +959,7 @@ func TestAllocRunner_SaveRestoreState_Upgrade(t *testing.T) {
|
||||
// Create a new alloc runner
|
||||
l2 := prefixedTestLogger("ar2: ")
|
||||
alloc2 := &structs.Allocation{ID: ar.alloc.ID}
|
||||
prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2)
|
||||
prevAlloc := newAllocWatcher(alloc2, ar, nil, origConfig, l2, "")
|
||||
ar2 := NewAllocRunner(l2, origConfig, ar.stateDB, upd.Update, alloc2, ar.vaultClient, ar.consulClient, prevAlloc)
|
||||
err = ar2.RestoreState()
|
||||
if err != nil {
|
||||
@@ -1413,7 +1413,7 @@ func TestAllocRunner_MoveAllocDir(t *testing.T) {
|
||||
upd2, ar2 := testAllocRunnerFromAlloc(alloc2, false)
|
||||
|
||||
// Set prevAlloc like Client does
|
||||
ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger)
|
||||
ar2.prevAlloc = newAllocWatcher(alloc2, ar, nil, ar2.config, ar2.logger, "")
|
||||
|
||||
go ar2.Run()
|
||||
defer ar2.Destroy()
|
||||
|
||||
@@ -51,7 +51,7 @@ type prevAllocWatcher interface {
|
||||
// newAllocWatcher creates a prevAllocWatcher appropriate for whether this
|
||||
// alloc's previous allocation was local or remote. If this alloc has no
|
||||
// previous alloc then a noop implementation is returned.
|
||||
func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger) prevAllocWatcher {
|
||||
func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer, config *config.Config, l *log.Logger, migrateToken string) prevAllocWatcher {
|
||||
if alloc.PreviousAllocation == "" {
|
||||
// No previous allocation, use noop transitioner
|
||||
return noopPrevAlloc{}
|
||||
@@ -75,13 +75,14 @@ func newAllocWatcher(alloc *structs.Allocation, prevAR *AllocRunner, rpc rpcer,
|
||||
}
|
||||
|
||||
return &remotePrevAlloc{
|
||||
allocID: alloc.ID,
|
||||
prevAllocID: alloc.PreviousAllocation,
|
||||
tasks: tg.Tasks,
|
||||
config: config,
|
||||
migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate,
|
||||
rpc: rpc,
|
||||
logger: l,
|
||||
allocID: alloc.ID,
|
||||
prevAllocID: alloc.PreviousAllocation,
|
||||
tasks: tg.Tasks,
|
||||
config: config,
|
||||
migrate: tg.EphemeralDisk != nil && tg.EphemeralDisk.Migrate,
|
||||
rpc: rpc,
|
||||
logger: l,
|
||||
migrateToken: migrateToken,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -236,6 +237,10 @@ type remotePrevAlloc struct {
|
||||
waitingLock sync.RWMutex
|
||||
|
||||
logger *log.Logger
|
||||
|
||||
// migrateToken allows a client to migrate data in an ACL-protected remote
|
||||
// volume
|
||||
migrateToken string
|
||||
}
|
||||
|
||||
// IsWaiting returns true if there's a concurrent call inside Wait
|
||||
@@ -423,7 +428,8 @@ func (p *remotePrevAlloc) migrateAllocDir(ctx context.Context, nodeAddr string)
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("/v1/client/allocation/%v/snapshot", p.prevAllocID)
|
||||
resp, err := apiClient.Raw().Response(url, nil)
|
||||
qo := &nomadapi.QueryOptions{SecretID: p.migrateToken}
|
||||
resp, err := apiClient.Raw().Response(url, qo)
|
||||
if err != nil {
|
||||
prevAllocDir.Destroy()
|
||||
return nil, fmt.Errorf("error getting snapshot from previous alloc %q: %v", p.prevAllocID, err)
|
||||
|
||||
@@ -29,7 +29,7 @@ func TestPrevAlloc_LocalPrevAlloc(t *testing.T) {
|
||||
task.Driver = "mock_driver"
|
||||
task.Config["run_for"] = "500ms"
|
||||
|
||||
waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger())
|
||||
waiter := newAllocWatcher(newAlloc, prevAR, nil, nil, testLogger(), "")
|
||||
|
||||
// Wait in a goroutine with a context to make sure it exits at the right time
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
@@ -33,6 +33,7 @@ import (
|
||||
vaultapi "github.com/hashicorp/vault/api"
|
||||
"github.com/mitchellh/hashstructure"
|
||||
"github.com/shirou/gopsutil/host"
|
||||
"golang.org/x/crypto/blake2b"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -525,6 +526,20 @@ func (c *Client) LatestHostStats() *stats.HostStats {
|
||||
return c.hostStatsCollector.Stats()
|
||||
}
|
||||
|
||||
func (c *Client) ValidateMigrateToken(allocID, migrateToken string) bool {
|
||||
if !c.config.ACLEnabled {
|
||||
return true
|
||||
}
|
||||
|
||||
h, err := blake2b.New512([]byte(c.secretNodeID()))
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
h.Write([]byte(allocID))
|
||||
expectedMigrateToken := string(h.Sum(nil))
|
||||
return expectedMigrateToken == migrateToken
|
||||
}
|
||||
|
||||
// GetAllocFS returns the AllocFS interface for the alloc dir of an allocation
|
||||
func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
|
||||
c.allocLock.RLock()
|
||||
@@ -1303,6 +1318,10 @@ type allocUpdates struct {
|
||||
// filtered is the set of allocations that were not pulled because their
|
||||
// AllocModifyIndex didn't change.
|
||||
filtered map[string]struct{}
|
||||
|
||||
// migrateTokens are a list of tokens necessary for when clients pull data
|
||||
// from authorized volumes
|
||||
migrateTokens map[string]string
|
||||
}
|
||||
|
||||
// watchAllocations is used to scan for updates to allocations
|
||||
@@ -1460,8 +1479,9 @@ OUTER:
|
||||
|
||||
// Push the updates.
|
||||
update := &allocUpdates{
|
||||
filtered: filtered,
|
||||
pulled: pulledAllocs,
|
||||
filtered: filtered,
|
||||
pulled: pulledAllocs,
|
||||
migrateTokens: resp.MigrateTokens,
|
||||
}
|
||||
select {
|
||||
case updates <- update:
|
||||
@@ -1530,7 +1550,8 @@ func (c *Client) runAllocs(update *allocUpdates) {
|
||||
|
||||
// Start the new allocations
|
||||
for _, add := range diff.added {
|
||||
if err := c.addAlloc(add); err != nil {
|
||||
migrateToken := update.migrateTokens[add.ID]
|
||||
if err := c.addAlloc(add, migrateToken); err != nil {
|
||||
c.logger.Printf("[ERR] client: failed to add alloc '%s': %v",
|
||||
add.ID, err)
|
||||
}
|
||||
@@ -1572,7 +1593,7 @@ func (c *Client) updateAlloc(exist, update *structs.Allocation) error {
|
||||
}
|
||||
|
||||
// addAlloc is invoked when we should add an allocation
|
||||
func (c *Client) addAlloc(alloc *structs.Allocation) error {
|
||||
func (c *Client) addAlloc(alloc *structs.Allocation, migrateToken string) error {
|
||||
// Check if we already have an alloc runner
|
||||
c.allocLock.Lock()
|
||||
if _, ok := c.allocs[alloc.ID]; ok {
|
||||
@@ -1589,7 +1610,7 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error {
|
||||
}
|
||||
|
||||
c.configLock.RLock()
|
||||
prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger)
|
||||
prevAlloc := newAllocWatcher(alloc, prevAR, c, c.configCopy, c.logger, migrateToken)
|
||||
|
||||
ar := NewAllocRunner(c.logger, c.configCopy, c.stateDB, c.updateAllocStatus, alloc, c.vaultClient, c.consulService, prevAlloc)
|
||||
c.configLock.RUnlock()
|
||||
|
||||
@@ -88,11 +88,12 @@ func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Requ
|
||||
return nil, CodedError(404, resourceNotFoundErr)
|
||||
}
|
||||
allocID := tokens[0]
|
||||
migrateToken := req.Header.Get("X-Nomad-Token")
|
||||
switch tokens[1] {
|
||||
case "stats":
|
||||
return s.allocStats(allocID, resp, req)
|
||||
case "snapshot":
|
||||
return s.allocSnapshot(allocID, resp, req)
|
||||
return s.allocSnapshot(allocID, migrateToken, resp, req)
|
||||
case "gc":
|
||||
return s.allocGC(allocID, resp, req)
|
||||
}
|
||||
@@ -134,7 +135,11 @@ func (s *HTTPServer) allocGC(allocID string, resp http.ResponseWriter, req *http
|
||||
return nil, s.agent.Client().CollectAllocation(allocID)
|
||||
}
|
||||
|
||||
func (s *HTTPServer) allocSnapshot(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
func (s *HTTPServer) allocSnapshot(allocID, migrateToken string, resp http.ResponseWriter, req *http.Request) (interface{}, error) {
|
||||
if !s.agent.Client().ValidateMigrateToken(allocID, migrateToken) {
|
||||
return nil, fmt.Errorf("invalid migrate token for allocation %q", allocID)
|
||||
}
|
||||
|
||||
allocFS, err := s.agent.Client().GetAllocFS(allocID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(allocNotFoundErr)
|
||||
|
||||
@@ -704,6 +704,9 @@ func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
|
||||
reply.Allocs[alloc.ID] = alloc.AllocModifyIndex
|
||||
|
||||
allocNode, err := state.NodeByID(ws, args.NodeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
token, err := generateMigrateToken(alloc.ID, allocNode.SecretID)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user