mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 02:45:42 +03:00
Merge pull request #2177 from hashicorp/b-blocking-getallocs
GetAllocs uses a blocking query
This commit is contained in:
@@ -1274,14 +1274,13 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
|
||||
}
|
||||
}
|
||||
|
||||
c.logger.Printf("[DEBUG] client: updated allocations at index %d (pulled %d) (filtered %d)",
|
||||
resp.Index, len(pull), len(filtered))
|
||||
|
||||
// Pull the allocations that passed filtering.
|
||||
allocsResp.Allocs = nil
|
||||
var pulledAllocs map[string]*structs.Allocation
|
||||
if len(pull) != 0 {
|
||||
// Pull the allocations that need to be updated.
|
||||
allocsReq.AllocIDs = pull
|
||||
allocsReq.MinQueryIndex = resp.Index - 1
|
||||
allocsResp = structs.AllocsGetResponse{}
|
||||
if err := c.RPC("Alloc.GetAllocs", &allocsReq, &allocsResp); err != nil {
|
||||
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
|
||||
@@ -1296,6 +1295,28 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure that we received all the allocations we wanted
|
||||
pulledAllocs = make(map[string]*structs.Allocation, len(allocsResp.Allocs))
|
||||
for _, alloc := range allocsResp.Allocs {
|
||||
pulledAllocs[alloc.ID] = alloc
|
||||
}
|
||||
|
||||
for _, desiredID := range pull {
|
||||
if _, ok := pulledAllocs[desiredID]; !ok {
|
||||
// We didn't get everything we wanted. Do not update the
|
||||
// MinQueryIndex, sleep and then retry.
|
||||
wait := c.retryIntv(2 * time.Second)
|
||||
select {
|
||||
case <-time.After(wait):
|
||||
// Wait for the server we contact to receive the
|
||||
// allocations
|
||||
continue
|
||||
case <-c.shutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check for shutdown
|
||||
select {
|
||||
case <-c.shutdownCh:
|
||||
@@ -1304,19 +1325,18 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
|
||||
}
|
||||
}
|
||||
|
||||
c.logger.Printf("[DEBUG] client: updated allocations at index %d (total %d) (pulled %d) (filtered %d)",
|
||||
resp.Index, len(resp.Allocs), len(allocsResp.Allocs), len(filtered))
|
||||
|
||||
// Update the query index.
|
||||
if resp.Index > req.MinQueryIndex {
|
||||
req.MinQueryIndex = resp.Index
|
||||
}
|
||||
|
||||
// Push the updates.
|
||||
pulled := make(map[string]*structs.Allocation, len(allocsResp.Allocs))
|
||||
for _, alloc := range allocsResp.Allocs {
|
||||
pulled[alloc.ID] = alloc
|
||||
}
|
||||
update := &allocUpdates{
|
||||
filtered: filtered,
|
||||
pulled: pulled,
|
||||
pulled: pulledAllocs,
|
||||
}
|
||||
select {
|
||||
case updates <- update:
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
@@ -97,7 +96,7 @@ func (a *Alloc) GetAlloc(args *structs.AllocSpecificRequest,
|
||||
if out != nil {
|
||||
reply.Index = out.ModifyIndex
|
||||
} else {
|
||||
// Use the last index that affected the nodes table
|
||||
// Use the last index that affected the allocs table
|
||||
index, err := snap.Index("allocs")
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -118,32 +117,73 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest,
|
||||
if done, err := a.srv.forward("Alloc.GetAllocs", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_alloc"}, time.Now())
|
||||
defer metrics.MeasureSince([]string{"nomad", "alloc", "get_allocs"}, time.Now())
|
||||
|
||||
// Lookup the allocations
|
||||
snap, err := a.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
// Build the watch
|
||||
items := make([]watch.Item, 0, len(args.AllocIDs))
|
||||
for _, allocID := range args.AllocIDs {
|
||||
items = append(items, watch.Item{Alloc: allocID})
|
||||
}
|
||||
|
||||
allocs := make([]*structs.Allocation, len(args.AllocIDs))
|
||||
for i, alloc := range args.AllocIDs {
|
||||
out, err := snap.AllocByID(alloc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if out == nil {
|
||||
return fmt.Errorf("unknown alloc id %q", alloc)
|
||||
}
|
||||
|
||||
allocs[i] = out
|
||||
if reply.Index < out.ModifyIndex {
|
||||
reply.Index = out.ModifyIndex
|
||||
}
|
||||
// Setup the blocking query. We wait for at least one of the requested
|
||||
// allocations to be above the min query index. This guarantees that the
|
||||
// server has received that index.
|
||||
opts := blockingOptions{
|
||||
queryOpts: &args.QueryOptions,
|
||||
queryMeta: &reply.QueryMeta,
|
||||
watch: watch.NewItems(items...),
|
||||
run: func() error {
|
||||
// Lookup the allocation
|
||||
snap, err := a.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
thresholdMet := false
|
||||
maxIndex := uint64(0)
|
||||
for i, alloc := range args.AllocIDs {
|
||||
out, err := snap.AllocByID(alloc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if out == nil {
|
||||
// We don't have the alloc yet
|
||||
thresholdMet = false
|
||||
break
|
||||
}
|
||||
|
||||
// Store the pointer
|
||||
allocs[i] = out
|
||||
|
||||
// Check if we have passed the minimum index
|
||||
if out.ModifyIndex > args.QueryOptions.MinQueryIndex {
|
||||
thresholdMet = true
|
||||
}
|
||||
|
||||
if maxIndex < out.ModifyIndex {
|
||||
maxIndex = out.ModifyIndex
|
||||
}
|
||||
}
|
||||
|
||||
// Setup the output
|
||||
if thresholdMet {
|
||||
reply.Allocs = allocs
|
||||
reply.Index = maxIndex
|
||||
} else {
|
||||
// Use the last index that affected the nodes table
|
||||
index, err := snap.Index("allocs")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reply.Index = index
|
||||
}
|
||||
|
||||
// Set the query response
|
||||
a.srv.setQueryMeta(&reply.QueryMeta)
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
// Set the response
|
||||
a.srv.setQueryMeta(&reply.QueryMeta)
|
||||
reply.Allocs = allocs
|
||||
return nil
|
||||
return a.srv.blockingRPC(&opts)
|
||||
}
|
||||
|
||||
@@ -248,8 +248,10 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) {
|
||||
|
||||
// Lookup the allocs
|
||||
get := &structs.AllocsGetRequest{
|
||||
AllocIDs: []string{alloc.ID, alloc2.ID},
|
||||
QueryOptions: structs.QueryOptions{Region: "global"},
|
||||
AllocIDs: []string{alloc.ID, alloc2.ID},
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
},
|
||||
}
|
||||
var resp structs.AllocsGetResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
|
||||
@@ -272,3 +274,57 @@ func TestAllocEndpoint_GetAllocs(t *testing.T) {
|
||||
t.Fatalf("expect error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocEndpoint_GetAllocs_Blocking(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
state := s1.fsm.State()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the allocs
|
||||
alloc1 := mock.Alloc()
|
||||
alloc2 := mock.Alloc()
|
||||
|
||||
// First create an unrelated alloc
|
||||
time.AfterFunc(100*time.Millisecond, func() {
|
||||
state.UpsertJobSummary(99, mock.JobSummary(alloc1.JobID))
|
||||
err := state.UpsertAllocs(100, []*structs.Allocation{alloc1})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
// Create the alloc we are watching later
|
||||
time.AfterFunc(200*time.Millisecond, func() {
|
||||
state.UpsertJobSummary(199, mock.JobSummary(alloc2.JobID))
|
||||
err := state.UpsertAllocs(200, []*structs.Allocation{alloc2})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
// Lookup the allocs
|
||||
get := &structs.AllocsGetRequest{
|
||||
AllocIDs: []string{alloc1.ID, alloc2.ID},
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
MinQueryIndex: 150,
|
||||
},
|
||||
}
|
||||
var resp structs.AllocsGetResponse
|
||||
start := time.Now()
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Alloc.GetAllocs", get, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if elapsed := time.Since(start); elapsed < 200*time.Millisecond {
|
||||
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
||||
}
|
||||
if resp.Index != 200 {
|
||||
t.Fatalf("Bad index: %d %d", resp.Index, 200)
|
||||
}
|
||||
if len(resp.Allocs) != 2 {
|
||||
t.Fatalf("bad: %#v", resp.Allocs)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user