Merge branch 'master' of github.com:hashicorp/nomad

Author: Alex Dadgar
Date: 2016-02-22 10:38:15 -08:00
12 changed files with 318 additions and 92 deletions

@@ -7,11 +7,11 @@ services:
language: go
go:
- 1.5.3
- 1.6
- tip
env:
- DOCKER_VERSION=1.9.1 GO15VENDOREXPERIMENT=1
- DOCKER_VERSION=1.9.1
matrix:
allow_failures:

Godeps/Godeps.json (generated)
@@ -164,7 +164,7 @@
},
{
"ImportPath": "github.com/hashicorp/go-immutable-radix",
"Rev": "12e90058b2897552deea141eff51bb7a07a09e63"
"Rev": "8e8ed81f8f0bf1bdd829593fdd5c29922c1ea990"
},
{
"ImportPath": "github.com/hashicorp/go-memdb",

@@ -16,10 +16,6 @@ import (
)
const (
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 15 * time.Second
// taskReceivedSyncLimit is how long the client will wait before sending
// that a task was received to the server. The client does not immediately
// send that the task was received to the server because another transition
@@ -30,7 +26,7 @@ const (
)
// AllocStateUpdater is used to update the status of an allocation
type AllocStateUpdater func(alloc *structs.Allocation) error
type AllocStateUpdater func(alloc *structs.Allocation)
// AllocRunner is used to wrap an allocation and provide the execution context.
type AllocRunner struct {
@@ -262,9 +258,12 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
} else if pending {
alloc.ClientStatus = structs.AllocClientStatusPending
} else if dead {
alloc.ClientStatus = structs.AllocClientStatusDead
}
return alloc
}
@@ -273,42 +272,19 @@ func (r *AllocRunner) dirtySyncState() {
for {
select {
case <-r.dirtyCh:
r.retrySyncState(r.destroyCh)
r.syncStatus()
case <-r.destroyCh:
return
}
}
}
// retrySyncState is used to retry the state sync until success
func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {
for {
if err := r.syncStatus(); err == nil {
// The Alloc State might have been re-computed so we are
// snapshotting only the alloc runner
r.saveAllocRunnerState()
return
}
select {
case <-time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv)):
case <-stopCh:
return
}
}
}
// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Get a copy of our alloc.
// Get a copy of our alloc, update status server side and sync to disk
alloc := r.Alloc()
// Attempt to update the status
if err := r.updater(alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
alloc.ID, alloc.ClientStatus, err)
return err
}
return nil
r.updater(alloc)
return r.saveAllocRunnerState()
}
// setStatus is used to update the allocation status
@@ -475,7 +451,7 @@ OUTER:
r.taskLock.Unlock()
// Final state sync
r.retrySyncState(nil)
r.syncStatus()
// Block until we should destroy the state of the alloc
r.handleDestroy()
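
The reordered status branches earlier in this file's diff matter: an allocation whose tasks have been received but not yet started is neither running nor failed, so pending has to be checked before dead or a freshly restored allocation would report as dead. A minimal standalone sketch of that precedence, using plain strings instead of the structs constants:

package main

import "fmt"

// deriveClientStatus is an illustrative stand-in for the branch ordering in
// AllocRunner.Alloc: failed wins, then running, then pending, and dead is
// only reported once nothing is pending or running.
func deriveClientStatus(failed, running, pending, dead bool) string {
    switch {
    case failed:
        return "failed"
    case running:
        return "running"
    case pending:
        return "pending"
    case dead:
        return "dead"
    }
    return "pending"
}

func main() {
    // Tasks received but not yet started: pending, not dead.
    fmt.Println(deriveClientStatus(false, false, true, true))
}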

@@ -16,13 +16,11 @@ import (
type MockAllocStateUpdater struct {
Count int
Allocs []*structs.Allocation
Err error
}
func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) error {
func (m *MockAllocStateUpdater) Update(alloc *structs.Allocation) {
m.Count += 1
m.Allocs = append(m.Allocs, alloc)
return m.Err
}
func testAllocRunner(restarts bool) (*MockAllocStateUpdater, *AllocRunner) {

@@ -58,6 +58,14 @@ const (
// nodeUpdateRetryIntv is how often the client checks for updates to the
// node attributes or meta map.
nodeUpdateRetryIntv = 5 * time.Second
// allocSyncIntv is the batching period of allocation updates before they
// are synced with the server.
allocSyncIntv = 200 * time.Millisecond
// allocSyncRetryIntv is the interval on which we retry updating
// the status of the allocation
allocSyncRetryIntv = 5 * time.Second
)
// DefaultConfig returns the default configuration
@@ -100,6 +108,9 @@ type Client struct {
allocs map[string]*AllocRunner
allocLock sync.RWMutex
// allocUpdates stores allocations that need to be synced to the server.
allocUpdates chan *structs.Allocation
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
@@ -112,12 +123,13 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Create the client
c := &Client{
config: cfg,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
shutdownCh: make(chan struct{}),
config: cfg,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
allocs: make(map[string]*AllocRunner),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
}
// Setup the Consul Service
@@ -166,6 +178,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Begin periodic snapshotting of state.
go c.periodicSnapshot()
// Begin syncing allocations to the server
go c.allocSync()
// Start the client!
go c.run()
@@ -816,19 +831,66 @@ func (c *Client) updateNodeStatus() error {
}
// updateAllocStatus is used to update the status of an allocation
func (c *Client) updateAllocStatus(alloc *structs.Allocation) error {
args := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
WriteRequest: structs.WriteRequest{Region: c.config.Region},
}
var resp structs.GenericResponse
err := c.RPC("Node.UpdateAlloc", &args, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to update allocation: %v", err)
return err
func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
// Only send the fields that are updatable by the client.
stripped := new(structs.Allocation)
stripped.ID = alloc.ID
stripped.TaskStates = alloc.TaskStates
stripped.ClientStatus = alloc.ClientStatus
stripped.ClientDescription = alloc.ClientDescription
select {
case c.allocUpdates <- stripped:
case <-c.shutdownCh:
}
}
return nil
// allocSync is a long-lived function that batches allocation updates to the
// server.
func (c *Client) allocSync() {
staggered := false
syncTicker := time.NewTicker(allocSyncIntv)
updates := make(map[string]*structs.Allocation)
for {
select {
case <-c.shutdownCh:
syncTicker.Stop()
return
case alloc := <-c.allocUpdates:
// Batch the allocation updates until the timer triggers.
updates[alloc.ID] = alloc
case <-syncTicker.C:
// Fast path if there are no updates
if len(updates) == 0 {
continue
}
sync := make([]*structs.Allocation, 0, len(updates))
for _, alloc := range updates {
sync = append(sync, alloc)
}
// Send to server.
args := structs.AllocUpdateRequest{
Alloc: sync,
WriteRequest: structs.WriteRequest{Region: c.config.Region},
}
var resp structs.GenericResponse
if err := c.RPC("Node.UpdateAlloc", &args, &resp); err != nil {
c.logger.Printf("[ERR] client: failed to update allocations: %v", err)
syncTicker.Stop()
syncTicker = time.NewTicker(c.retryIntv(allocSyncRetryIntv))
staggered = true
} else {
updates = make(map[string]*structs.Allocation)
if staggered {
syncTicker.Stop()
syncTicker = time.NewTicker(allocSyncIntv)
staggered = false
}
}
}
}
}
// allocUpdates holds the results of receiving updated allocations from the
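
Taken together, the client side now works like this: each AllocRunner hands a stripped-down allocation to the buffered allocUpdates channel (abandoning the send only on shutdown), and allocSync coalesces those updates by allocation ID and flushes them on a ticker, stretching the interval when the RPC fails. A self-contained sketch of that batch-and-flush loop under those assumptions, with the Node.UpdateAlloc RPC replaced by a hypothetical push callback:

package main

import (
    "fmt"
    "time"
)

type update struct {
    ID     string
    Status string
}

// batchLoop coalesces updates by ID and flushes them every interval via push.
// On a push error it keeps the pending updates and retries on the next tick
// (the real client above also stretches the ticker to back off).
func batchLoop(in <-chan update, shutdown <-chan struct{}, interval time.Duration,
    push func([]update) error) {

    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    pending := make(map[string]update)

    for {
        select {
        case <-shutdown:
            return
        case u := <-in:
            // Later updates for the same allocation overwrite earlier ones.
            pending[u.ID] = u
        case <-ticker.C:
            if len(pending) == 0 {
                continue
            }
            batch := make([]update, 0, len(pending))
            for _, u := range pending {
                batch = append(batch, u)
            }
            if err := push(batch); err != nil {
                fmt.Println("push failed, will retry:", err)
                continue
            }
            pending = make(map[string]update)
        }
    }
}

func main() {
    in := make(chan update, 64)
    shutdown := make(chan struct{})

    go batchLoop(in, shutdown, 50*time.Millisecond, func(b []update) error {
        fmt.Printf("flushed %d update(s)\n", len(b))
        return nil
    })

    // Two updates for the same allocation coalesce into one flush.
    in <- update{ID: "alloc-1", Status: "pending"}
    in <- update{ID: "alloc-1", Status: "running"}
    time.Sleep(100 * time.Millisecond)
    close(shutdown)
}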

@@ -324,27 +324,27 @@ func TestClient_UpdateAllocStatus(t *testing.T) {
alloc := mock.Alloc()
alloc.NodeID = c1.Node().ID
originalStatus := "foo"
alloc.ClientStatus = originalStatus
state := s1.State()
state.UpsertAllocs(100, []*structs.Allocation{alloc})
newAlloc := new(structs.Allocation)
*newAlloc = *alloc
newAlloc.ClientStatus = structs.AllocClientStatusRunning
err := c1.updateAllocStatus(newAlloc)
if err != nil {
testutil.WaitForResult(func() (bool, error) {
out, err := state.AllocByID(alloc.ID)
if err != nil {
return false, err
}
if out == nil {
return false, fmt.Errorf("no such alloc")
}
if out.ClientStatus == originalStatus {
return false, fmt.Errorf("Alloc client status not updated; got %v", out.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
}
out, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil || out.ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", out)
}
})
}
func TestClient_WatchAllocs(t *testing.T) {
@@ -440,8 +440,7 @@ func TestClient_SaveRestoreState(t *testing.T) {
task.Config["args"] = []string{"10"}
state := s1.State()
err := state.UpsertAllocs(100,
[]*structs.Allocation{alloc1})
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -470,12 +469,20 @@ func TestClient_SaveRestoreState(t *testing.T) {
defer c2.Shutdown()
// Ensure the allocation is running
c2.allocLock.RLock()
ar := c2.allocs[alloc1.ID]
c2.allocLock.RUnlock()
if ar.Alloc().ClientStatus != structs.AllocClientStatusRunning {
t.Fatalf("bad: %#v", ar.Alloc())
}
testutil.WaitForResult(func() (bool, error) {
c2.allocLock.RLock()
ar := c2.allocs[alloc1.ID]
c2.allocLock.RUnlock()
status := ar.Alloc().ClientStatus
alive := status == structs.AllocClientStatusRunning ||
status == structs.AllocClientStatusPending
if !alive {
return false, fmt.Errorf("incorrect client status: %#v", ar.Alloc())
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
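
Because client status updates are now queued and batched rather than applied synchronously, these tests poll with testutil.WaitForResult until the server catches up instead of asserting immediately. A rough sketch of what such a polling helper does; the retry count and delay below are assumptions, not testutil's actual values:

package main

import (
    "errors"
    "fmt"
    "time"
)

// waitForResult polls test until it returns true or the attempts run out,
// then hands the last error to onFail. This mirrors the shape of the
// testutil.WaitForResult calls in the tests above.
func waitForResult(test func() (bool, error), onFail func(error)) {
    var err error
    for i := 0; i < 50; i++ {
        var ok bool
        ok, err = test()
        if ok {
            return
        }
        time.Sleep(20 * time.Millisecond)
    }
    onFail(err)
}

func main() {
    start := time.Now()
    waitForResult(func() (bool, error) {
        if time.Since(start) < 100*time.Millisecond {
            return false, errors.New("not ready yet")
        }
        return true, nil
    }, func(err error) {
        fmt.Println("gave up:", err)
    })
    fmt.Println("condition met after", time.Since(start))
}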
func TestClient_Init(t *testing.T) {

@@ -2,6 +2,7 @@ package nomad
import (
"fmt"
"sync"
"time"
"github.com/armon/go-metrics"
@@ -10,9 +11,29 @@ import (
"github.com/hashicorp/nomad/nomad/watch"
)
const (
// batchUpdateInterval is how long we wait to batch updates
batchUpdateInterval = 50 * time.Millisecond
)
// Node endpoint is used for client interactions
type Node struct {
srv *Server
// updates holds pending client status updates for allocations
updates []*structs.Allocation
// updateFuture is used to wait for the pending batch update
// to complete. This may be nil if no batch is pending.
updateFuture *batchFuture
// updateTimer is the timer that will trigger the next batch
// update, and may be nil if there is no batch pending.
updateTimer *time.Timer
// updatesLock synchronizes access to the updates list,
// the future and the timer.
updatesLock sync.Mutex
}
// Register is used to upsert a client that is available for scheduling
@@ -456,18 +477,59 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
return fmt.Errorf("must update at least one allocation")
}
// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, args)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: alloc update failed: %v", err)
// Add this to the batch
n.updatesLock.Lock()
n.updates = append(n.updates, args.Alloc...)
// Start a new batch if none
future := n.updateFuture
if future == nil {
future = NewBatchFuture()
n.updateFuture = future
n.updateTimer = time.AfterFunc(batchUpdateInterval, func() {
// Get the pending updates
n.updatesLock.Lock()
updates := n.updates
future := n.updateFuture
n.updates = nil
n.updateFuture = nil
n.updateTimer = nil
n.updatesLock.Unlock()
// Perform the batch update
n.batchUpdate(future, updates)
})
}
n.updatesLock.Unlock()
// Wait for the future
if err := future.Wait(); err != nil {
return err
}
// Setup the response
reply.Index = index
reply.Index = future.Index()
return nil
}
// batchUpdate is used to update all the allocations
func (n *Node) batchUpdate(future *batchFuture, updates []*structs.Allocation) {
// Prepare the batch update
batch := &structs.AllocUpdateRequest{
Alloc: updates,
WriteRequest: structs.WriteRequest{Region: n.srv.config.Region},
}
// Commit this update via Raft
_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, batch)
if err != nil {
n.srv.logger.Printf("[ERR] nomad.client: alloc update failed: %v", err)
}
// Respond to the future
future.Respond(index, err)
}
// List is used to list the available nodes
func (n *Node) List(args *structs.NodeListRequest,
reply *structs.NodeListResponse) error {
@@ -617,3 +679,35 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
}
return evalIDs, evalIndex, nil
}
// batchFuture is used to wait on a batch update to complete
type batchFuture struct {
doneCh chan struct{}
err error
index uint64
}
// NewBatchFuture creates a new batch future
func NewBatchFuture() *batchFuture {
return &batchFuture{
doneCh: make(chan struct{}),
}
}
// Wait is used to block for the future to complete and returns the error
func (b *batchFuture) Wait() error {
<-b.doneCh
return b.err
}
// Index is used to return the index of the batch, only after Wait()
func (b *batchFuture) Index() uint64 {
return b.index
}
// Respond is used to unblock the future
func (b *batchFuture) Respond(index uint64, err error) {
b.index = index
b.err = err
close(b.doneCh)
}
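
The server mirrors the client's batching: the first UpdateAlloc call in a window creates a shared batchFuture and arms a time.AfterFunc, later calls append to the pending slice and wait on the same future, and a single Raft apply answers every caller. A compact sketch of that coalescing idiom, with a hypothetical commit function standing in for raftApply:

package main

import (
    "fmt"
    "sync"
    "time"
)

// future lets many callers wait on one shared result.
type future struct {
    doneCh chan struct{}
    index  uint64
    err    error
}

func newFuture() *future { return &future{doneCh: make(chan struct{})} }

func (f *future) wait() error {
    <-f.doneCh
    return f.err
}

func (f *future) respond(index uint64, err error) {
    f.index = index
    f.err = err
    close(f.doneCh)
}

// batcher coalesces submissions made within one 50ms window into a single
// commit call; commit is a stand-in for something like raftApply.
type batcher struct {
    mu      sync.Mutex
    pending []string
    fut     *future
    commit  func([]string) (uint64, error)
}

// submit queues an item and blocks until the shared batch has been committed,
// returning the index that every caller in the batch shares.
func (b *batcher) submit(item string) (uint64, error) {
    b.mu.Lock()
    b.pending = append(b.pending, item)
    f := b.fut
    if f == nil {
        // First caller in this window: create the shared future and arm
        // the timer that will flush the whole batch.
        f = newFuture()
        b.fut = f
        time.AfterFunc(50*time.Millisecond, func() {
            b.mu.Lock()
            items, fut := b.pending, b.fut
            b.pending, b.fut = nil, nil
            b.mu.Unlock()
            index, err := b.commit(items)
            fut.respond(index, err)
        })
    }
    b.mu.Unlock()

    if err := f.wait(); err != nil {
        return 0, err
    }
    return f.index, nil
}

func main() {
    b := &batcher{commit: func(items []string) (uint64, error) {
        fmt.Printf("committed %d item(s) in one apply\n", len(items))
        return 1000, nil
    }}

    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        i := i
        wg.Add(1)
        go func() {
            defer wg.Done()
            index, _ := b.submit(fmt.Sprintf("alloc-%d", i))
            fmt.Println("caller", i, "got index", index)
        }()
    }
    wg.Wait()
}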

@@ -1,6 +1,7 @@
package nomad
import (
"fmt"
"reflect"
"testing"
"time"
@@ -817,12 +818,70 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp2 structs.NodeAllocsResponse
start := time.Now()
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index == 0 {
t.Fatalf("Bad index: %d", resp2.Index)
}
if diff := time.Since(start); diff < batchUpdateInterval {
t.Fatalf("too fast: %v", diff)
}
// Lookup the alloc
out, err := state.AllocByID(alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out.ClientStatus != structs.AllocClientStatusFailed {
t.Fatalf("Bad: %#v", out)
}
}
func TestClientEndpoint_BatchUpdate(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
node := mock.Node()
reg := &structs.NodeRegisterRequest{
Node: node,
WriteRequest: structs.WriteRequest{Region: "global"},
}
// Fetch the response
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Inject fake evaluations
alloc := mock.Alloc()
alloc.NodeID = node.ID
state := s1.fsm.State()
err := state.UpsertAllocs(100, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
// Attempt update
clientAlloc := new(structs.Allocation)
*clientAlloc = *alloc
clientAlloc.ClientStatus = structs.AllocClientStatusFailed
// Call to do the batch update
bf := NewBatchFuture()
endpoint := s1.endpoints.Node
endpoint.batchUpdate(bf, []*structs.Allocation{clientAlloc})
if err := bf.Wait(); err != nil {
t.Fatalf("err: %v", err)
}
if bf.Index() == 0 {
t.Fatalf("Bad index: %d", bf.Index())
}
// Lookup the alloc
out, err := state.AllocByID(alloc.ID)
@@ -1168,3 +1227,30 @@ func TestClientEndpoint_ListNodes_Blocking(t *testing.T) {
t.Fatalf("bad: %#v", resp4.Nodes)
}
}
func TestBatchFuture(t *testing.T) {
bf := NewBatchFuture()
// Async respond to the future
expect := fmt.Errorf("testing")
go func() {
time.Sleep(10 * time.Millisecond)
bf.Respond(1000, expect)
}()
// Block for the result
start := time.Now()
err := bf.Wait()
diff := time.Since(start)
if diff < 5*time.Millisecond {
t.Fatalf("too fast")
}
// Check the results
if err != expect {
t.Fatalf("bad: %s", err)
}
if bf.Index() != 1000 {
t.Fatalf("bad: %d", bf.Index())
}
}

View File

@@ -374,7 +374,7 @@ func (s *Server) Leave() error {
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
// Create endpoints
s.endpoints.Status = &Status{s}
s.endpoints.Node = &Node{s}
s.endpoints.Node = &Node{srv: s}
s.endpoints.Job = &Job{s}
s.endpoints.Eval = &Eval{s}
s.endpoints.Plan = &Plan{s}

@@ -3,8 +3,6 @@
# This script builds the application from source for multiple platforms.
set -e
export GO15VENDOREXPERIMENT=1
# Get the parent directory of where this script is.
SOURCE="${BASH_SOURCE[0]}"
while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done

@@ -1,7 +1,5 @@
#!/usr/bin/env bash
export GO15VENDOREXPERIMENT=1
# Create a temp dir and clean it up on exit
TEMPDIR=`mktemp -d -t nomad-test.XXX`
trap "rm -rf $TEMPDIR" EXIT HUP INT QUIT TERM

@@ -41,8 +41,15 @@ func (n *Node) isLeaf() bool {
}
func (n *Node) addEdge(e edge) {
num := len(n.edges)
idx := sort.Search(num, func(i int) bool {
return n.edges[i].label >= e.label
})
n.edges = append(n.edges, e)
n.edges.Sort()
if idx != num {
copy(n.edges[idx+1:], n.edges[idx:num])
n.edges[idx] = e
}
}
func (n *Node) replaceEdge(e edge) {
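
The vendored go-immutable-radix change above replaces append-then-Sort with a binary-search insert: sort.Search finds the insertion index in O(log n) and a single copy shifts the tail, so addEdge costs O(n) instead of an O(n log n) sort on every insertion. An equivalent standalone sketch on a plain byte slice:

package main

import (
    "fmt"
    "sort"
)

// insertSorted inserts label into an already-sorted slice, keeping it sorted.
// sort.Search returns the first index whose element is >= label; appending
// and then shifting the tail with copy leaves a hole at idx for the new value.
func insertSorted(labels []byte, label byte) []byte {
    num := len(labels)
    idx := sort.Search(num, func(i int) bool {
        return labels[i] >= label
    })
    labels = append(labels, label)
    if idx != num {
        copy(labels[idx+1:], labels[idx:num])
        labels[idx] = label
    }
    return labels
}

func main() {
    labels := []byte{'a', 'c', 'd'}
    labels = insertSorted(labels, 'b')
    fmt.Println(string(labels)) // abcd
}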