From 93d5187e7be4ed9b8f3e1dcb3104d0cab9cbb7cb Mon Sep 17 00:00:00 2001 From: Chris Baker <1675087+cgbaker@users.noreply.github.com> Date: Thu, 11 Feb 2021 15:40:59 +0000 Subject: [PATCH] removed deprecated fields from Drain structs and API node drain: use msgtype on txn so that events are emitted wip: encoding extension to add Node.Drain field back to API responses new approach for hiding Node.SecretID in the API, using `json` tag documented this approach in the contributing guide refactored the JSON handlers with extensions modified event stream encoding to use the go-msgpack encoders with the extensions --- api/nodes_test.go | 22 +++----- command/agent/http.go | 5 +- command/agent/node_endpoint.go | 30 +--------- command/agent/node_endpoint_test.go | 7 +-- contributing/checklist-jobspec.md | 4 +- nomad/fsm.go | 14 ----- nomad/fsm_test.go | 86 ++--------------------------- nomad/mock/mock.go | 8 +++ nomad/node_endpoint.go | 14 +---- nomad/node_endpoint_test.go | 3 +- nomad/plan_apply.go | 2 +- nomad/plan_apply_test.go | 7 ++- nomad/state/events.go | 16 ++---- nomad/state/events_test.go | 10 ++-- nomad/state/state_store.go | 8 +-- nomad/state/state_store_test.go | 3 - nomad/stream/ndjson.go | 10 +++- nomad/structs/json_encoding.go | 36 ++++++++++++ nomad/structs/structs.go | 51 +++++++---------- nomad/structs/structs_test.go | 6 +- scheduler/generic_sched_test.go | 23 +++----- scheduler/reconcile_test.go | 16 ++---- scheduler/reconcile_util_test.go | 5 +- scheduler/system_sched_test.go | 9 +-- scheduler/util.go | 4 +- scheduler/util_test.go | 12 ++-- 26 files changed, 147 insertions(+), 264 deletions(-) create mode 100644 nomad/structs/json_encoding.go diff --git a/api/nodes_test.go b/api/nodes_test.go index a1d41cda2..1b3835e66 100644 --- a/api/nodes_test.go +++ b/api/nodes_test.go @@ -206,9 +206,7 @@ func TestNodes_ToggleDrain(t *testing.T) { // Check for drain mode out, _, err := nodes.Info(nodeID, nil) require.Nil(err) - if out.Drain { - t.Fatalf("drain mode should be off") - } + require.False(out.Drain) // Toggle it on spec := &DrainSpec{ @@ -221,9 +219,9 @@ func TestNodes_ToggleDrain(t *testing.T) { // Check again out, _, err = nodes.Info(nodeID, nil) require.Nil(err) - if out.SchedulingEligibility != NodeSchedulingIneligible { - t.Fatalf("bad eligibility: %v vs %v", out.SchedulingEligibility, NodeSchedulingIneligible) - } + // NOTE: this is potentially flaky; drain may have already completed; if problems occur, switch to event stream + require.True(out.Drain) + require.Equal(NodeSchedulingIneligible, out.SchedulingEligibility) // Toggle off again drainOut, err = nodes.UpdateDrain(nodeID, nil, true, nil) @@ -233,15 +231,9 @@ func TestNodes_ToggleDrain(t *testing.T) { // Check again out, _, err = nodes.Info(nodeID, nil) require.Nil(err) - if out.Drain { - t.Fatalf("drain mode should be off") - } - if out.DrainStrategy != nil { - t.Fatalf("drain strategy should be unset") - } - if out.SchedulingEligibility != NodeSchedulingEligible { - t.Fatalf("should be eligible") - } + require.False(out.Drain) + require.Nil(out.DrainStrategy) + require.Equal(NodeSchedulingEligible, out.SchedulingEligibility) } func TestNodes_ToggleEligibility(t *testing.T) { diff --git a/command/agent/http.go b/command/agent/http.go index 8853a0e81..320a4518a 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -20,10 +20,11 @@ import ( "github.com/hashicorp/go-connlimit" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-msgpack/codec" + "github.com/rs/cors" + "github.com/hashicorp/nomad/helper/noxssrw" "github.com/hashicorp/nomad/helper/tlsutil" "github.com/hashicorp/nomad/nomad/structs" - "github.com/rs/cors" ) const ( @@ -500,7 +501,7 @@ func (s *HTTPServer) wrap(handler func(resp http.ResponseWriter, req *http.Reque buf.Write([]byte("\n")) } } else { - enc := codec.NewEncoder(&buf, structs.JsonHandle) + enc := codec.NewEncoder(&buf, structs.JsonHandleWithExtensions) err = enc.Encode(obj) } if err != nil { diff --git a/command/agent/node_endpoint.go b/command/agent/node_endpoint.go index 6498151a9..c5ff81d81 100644 --- a/command/agent/node_endpoint.go +++ b/command/agent/node_endpoint.go @@ -2,9 +2,7 @@ package agent import ( "net/http" - "strconv" "strings" - "time" "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/nomad/structs" @@ -119,31 +117,9 @@ func (s *HTTPServer) nodeToggleDrain(resp http.ResponseWriter, req *http.Request var drainRequest api.NodeUpdateDrainRequest - // COMPAT: Remove in 0.10. Allow the old style enable query param. - // Get the enable parameter - enableRaw := req.URL.Query().Get("enable") - var enable bool - if enableRaw != "" { - var err error - enable, err = strconv.ParseBool(enableRaw) - if err != nil { - return nil, CodedError(400, "invalid enable value") - } - - // Use the force drain to have it keep the same behavior as old clients. - if enable { - drainRequest.DrainSpec = &api.DrainSpec{ - Deadline: -1 * time.Second, - } - } else { - // If drain is disabled on an old client, mark the node as eligible for backwards compatibility - drainRequest.MarkEligible = true - } - } else { - err := decodeBody(req, &drainRequest) - if err != nil { - return nil, CodedError(400, err.Error()) - } + err := decodeBody(req, &drainRequest) + if err != nil { + return nil, CodedError(400, err.Error()) } args := structs.NodeUpdateDrainRequest{ diff --git a/command/agent/node_endpoint_test.go b/command/agent/node_endpoint_test.go index ecdd74048..bfb52e81c 100644 --- a/command/agent/node_endpoint_test.go +++ b/command/agent/node_endpoint_test.go @@ -284,11 +284,9 @@ func TestHTTP_NodeDrain(t *testing.T) { out, err := state.NodeByID(nil, node.ID) require.Nil(err) - // the node must either be in drain mode or in elligible + // the node must either be in drain mode or ineligible // once the node is recognize as not having any running allocs - if out.Drain { - require.True(out.Drain) - require.NotNil(out.DrainStrategy) + if out.DrainStrategy != nil { require.Equal(10*time.Second, out.DrainStrategy.Deadline) } else { require.Equal(structs.NodeSchedulingIneligible, out.SchedulingEligibility) @@ -307,7 +305,6 @@ func TestHTTP_NodeDrain(t *testing.T) { out, err = state.NodeByID(nil, node.ID) require.Nil(err) - require.False(out.Drain) require.Nil(out.DrainStrategy) }) } diff --git a/contributing/checklist-jobspec.md b/contributing/checklist-jobspec.md index 146afdb39..75c951cf4 100644 --- a/contributing/checklist-jobspec.md +++ b/contributing/checklist-jobspec.md @@ -15,10 +15,12 @@ * Implement and test other logical methods * [ ] Add conversion between `api/` and `nomad/structs` in `command/agent/job_endpoint.go` * Add test for conversion + * msgpack [encoding](http://ugorji.net/blog/go-codec-primer#drop-in-replacement-for-encoding-json-json-key-in-struct-tag-supported) only uses the [`codec` tag](https://github.com/hashicorp/nomad/blob/v1.0.0/nomad/structs/structs.go#L10557-L10558); + the `json` tag is available for customizing API output when encoding `structs` objects * [ ] Implement diff logic for new structs/fields in `nomad/structs/diff.go` * Note that fields must be listed in alphabetical order in `FieldDiff` slices in `nomad/structs/diff_test.go` * Add test for diff of new structs/fields -* [ ] Add change detection for new structs/feilds in `scheduler/util.go/tasksUpdated` +* [ ] Add change detection for new structs/fields in `scheduler/util.go/tasksUpdated` * Might be covered by `.Equals` but might not be, check. * Should return true if the task must be replaced as a result of the change. diff --git a/nomad/fsm.go b/nomad/fsm.go index 17c427a01..e3f6474cf 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -429,20 +429,6 @@ func (n *nomadFSM) applyDrainUpdate(reqType structs.MessageType, buf []byte, ind panic(fmt.Errorf("failed to decode request: %v", err)) } - // COMPAT Remove in version 0.10 - // As part of Nomad 0.8 we have deprecated the drain boolean in favor of a - // drain strategy but we need to handle the upgrade path where the Raft log - // contains drain updates with just the drain boolean being manipulated. - if req.Drain && req.DrainStrategy == nil { - // Mark the drain strategy as a force to imitate the old style drain - // functionality. - req.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: -1 * time.Second, - }, - } - } - if err := n.state.UpdateNodeDrain(reqType, index, req.NodeID, req.DrainStrategy, req.MarkEligible, req.UpdatedAt, req.NodeEvent); err != nil { n.logger.Error("UpdateNodeDrain failed", "error", err) return err diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index b4c73d5c0..f4a1949dd 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -180,35 +180,6 @@ func TestFSM_UpsertNode(t *testing.T) { } -func TestFSM_UpsertNode_Canonicalize(t *testing.T) { - t.Parallel() - require := require.New(t) - - fsm := testFSM(t) - fsm.blockedEvals.SetEnabled(true) - - // Setup a node without eligibility - node := mock.Node() - node.SchedulingEligibility = "" - - req := structs.NodeRegisterRequest{ - Node: node, - } - buf, err := structs.Encode(structs.NodeRegisterRequestType, req) - require.Nil(err) - - resp := fsm.Apply(makeLog(buf)) - require.Nil(resp) - - // Verify we are registered - ws := memdb.NewWatchSet() - n, err := fsm.State().NodeByID(ws, req.Node.ID) - require.Nil(err) - require.NotNil(n) - require.EqualValues(1, n.CreateIndex) - require.Equal(structs.NodeSchedulingEligible, n.SchedulingEligibility) -} - func TestFSM_DeregisterNode(t *testing.T) { t.Parallel() fsm := testFSM(t) @@ -353,7 +324,6 @@ func TestFSM_BatchUpdateNodeDrain(t *testing.T) { ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) require.Nil(err) - require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) require.Len(node.Events, 2) } @@ -397,46 +367,10 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { ws := memdb.NewWatchSet() node, err = fsm.State().NodeByID(ws, req.Node.ID) require.Nil(err) - require.True(node.Drain) require.Equal(node.DrainStrategy, strategy) require.Len(node.Events, 2) } -func TestFSM_UpdateNodeDrain_Pre08_Compatibility(t *testing.T) { - t.Parallel() - require := require.New(t) - fsm := testFSM(t) - - // Force a node into the state store without eligiblity - node := mock.Node() - node.SchedulingEligibility = "" - require.Nil(fsm.State().UpsertNode(structs.MsgTypeTestSetup, 1, node)) - - // Do an old style drain - req := structs.NodeUpdateDrainRequest{ - NodeID: node.ID, - Drain: true, - } - buf, err := structs.Encode(structs.NodeUpdateDrainRequestType, req) - require.Nil(err) - - resp := fsm.Apply(makeLog(buf)) - require.Nil(resp) - - // Verify we have upgraded to a force drain - ws := memdb.NewWatchSet() - node, err = fsm.State().NodeByID(ws, req.NodeID) - require.Nil(err) - require.True(node.Drain) - - expected := &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: -1 * time.Second, - }, - } - require.Equal(expected, node.DrainStrategy) -} - func TestFSM_UpdateNodeEligibility(t *testing.T) { t.Parallel() require := require.New(t) @@ -2495,25 +2429,15 @@ func TestFSM_SnapshotRestore_Nodes(t *testing.T) { // Add some state fsm := testFSM(t) state := fsm.State() - node1 := mock.Node() - state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1) - - // Upgrade this node - node2 := mock.Node() - node2.SchedulingEligibility = "" - state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2) + node := mock.Node() + state.UpsertNode(structs.MsgTypeTestSetup, 1000, node) // Verify the contents fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() - out1, _ := state2.NodeByID(nil, node1.ID) - out2, _ := state2.NodeByID(nil, node2.ID) - node2.SchedulingEligibility = structs.NodeSchedulingEligible - if !reflect.DeepEqual(node1, out1) { - t.Fatalf("bad: \n%#v\n%#v", out1, node1) - } - if !reflect.DeepEqual(node2, out2) { - t.Fatalf("bad: \n%#v\n%#v", out2, node2) + out, _ := state2.NodeByID(nil, node.ID) + if !reflect.DeepEqual(node, out) { + t.Fatalf("bad: \n%#v\n%#v", out, node) } } diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index a3fe09054..53cb91241 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -118,6 +118,14 @@ func Node() *structs.Node { return node } +func DrainNode() *structs.Node { + node := Node() + node.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{}, + } + return node +} + // NvidiaNode returns a node with two instances of an Nvidia GPU func NvidiaNode() *structs.Node { n := Node() diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index 36a18a26f..08907a307 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -548,16 +548,6 @@ func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, // Update the timestamp of when the node status was updated args.UpdatedAt = now.Unix() - // COMPAT: Remove in 0.9. Attempt to upgrade the request if it is of the old - // format. - if args.Drain && args.DrainStrategy == nil { - args.DrainStrategy = &structs.DrainStrategy{ - DrainSpec: structs.DrainSpec{ - Deadline: -1 * time.Second, // Force drain - }, - } - } - // Setup drain strategy if args.DrainStrategy != nil { // Mark start time for the drain @@ -811,9 +801,7 @@ func (n *Node) GetNode(args *structs.NodeSpecificRequest, // Setup the output if out != nil { - // Clear the secret ID - reply.Node = out.Copy() - reply.Node.SecretID = "" + reply.Node = out reply.Index = out.ModifyIndex } else { // Use the last index that affected the nodes table diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index fbc82d8b7..ebcacf98e 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -914,7 +914,7 @@ func TestClientEndpoint_UpdateDrain(t *testing.T) { ws := memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.True(out.Drain) + require.NotNil(out.DrainStrategy) require.Equal(strategy.Deadline, out.DrainStrategy.Deadline) require.Len(out.Events, 2) require.Equal(NodeDrainEventDrainSet, out.Events[1].Message) @@ -1314,7 +1314,6 @@ func TestClientEndpoint_GetNode(t *testing.T) { // Update the status updated at value node.StatusUpdatedAt = resp2.Node.StatusUpdatedAt - node.SecretID = "" node.Events = resp2.Node.Events if !reflect.DeepEqual(node, resp2.Node) { t.Fatalf("bad: %#v \n %#v", node, resp2.Node) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f7a3f19fe..d8c93b2be 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -648,7 +648,7 @@ func evaluateNodePlan(snap *state.StateSnapshot, plan *structs.Plan, nodeID stri return false, "node is not ready for placements", nil } else if node.SchedulingEligibility == structs.NodeSchedulingIneligible { return false, "node is not eligible for draining", nil - } else if node.Drain { + } else if node.DrainStrategy != nil { // Deprecate in favor of scheduling eligibility and remove post-0.8 return false, "node is draining", nil } diff --git a/nomad/plan_apply_test.go b/nomad/plan_apply_test.go index 30036237b..7c69d150b 100644 --- a/nomad/plan_apply_test.go +++ b/nomad/plan_apply_test.go @@ -715,7 +715,12 @@ func TestPlanApply_EvalNodePlan_NodeDrain(t *testing.T) { t.Parallel() state := testStateStore(t) node := mock.Node() - node.Drain = true + node.DrainStrategy = &structs.DrainStrategy{ + DrainSpec: structs.DrainSpec{ + Deadline: 0, + IgnoreSystemJobs: false, + }, + } state.UpsertNode(structs.MsgTypeTestSetup, 1000, node) snap, _ := state.Snapshot() diff --git a/nomad/state/events.go b/nomad/state/events.go index ab4a086f5..626bc4f40 100644 --- a/nomad/state/events.go +++ b/nomad/state/events.go @@ -80,15 +80,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{}, false } - // Node secret ID should not be included - node := before.Copy() - node.SecretID = "" - return structs.Event{ Topic: structs.TopicNode, - Key: node.ID, + Key: before.ID, Payload: &structs.NodeStreamEvent{ - Node: node, + Node: before, }, }, true } @@ -179,15 +175,11 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) { return structs.Event{}, false } - // Node secret ID should not be included - node := after.Copy() - node.SecretID = "" - return structs.Event{ Topic: structs.TopicNode, - Key: node.ID, + Key: after.ID, Payload: &structs.NodeStreamEvent{ - Node: node, + Node: after, }, }, true case "deployment": diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 3eba439d9..132712579 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -120,9 +120,10 @@ func TestEventFromChange_NodeSecretID(t *testing.T) { out := eventsFromChanges(s.db.ReadTxn(), changes) require.Len(t, out.Events, 1) - nodeEvent, ok := out.Events[0].Payload.(*structs.NodeStreamEvent) + _, ok := out.Events[0].Payload.(*structs.NodeStreamEvent) require.True(t, ok) - require.Empty(t, nodeEvent.Node.SecretID) + // TODO: cgbaker: do we really want to remove this check? + // require.Empty(t, nodeEvent.Node.SecretID) // Delete changes = Changes{ @@ -140,9 +141,10 @@ func TestEventFromChange_NodeSecretID(t *testing.T) { out2 := eventsFromChanges(s.db.ReadTxn(), changes) require.Len(t, out2.Events, 1) - nodeEvent2, ok := out2.Events[0].Payload.(*structs.NodeStreamEvent) + _, ok = out2.Events[0].Payload.(*structs.NodeStreamEvent) require.True(t, ok) - require.Empty(t, nodeEvent2.Node.SecretID) + // TODO: cgbaker: do we really want to remove this check? + // require.Empty(t, nodeEvent2.Node.SecretID) } func TestEventsFromChanges_DeploymentUpdate(t *testing.T) { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index d85467edd..3132673ce 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -832,7 +832,6 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error { SetTimestamp(time.Unix(node.StatusUpdatedAt, 0))}) } - node.Drain = exist.Drain // Retain the drain mode node.SchedulingEligibility = exist.SchedulingEligibility // Retain the eligibility node.DrainStrategy = exist.DrainStrategy // Retain the drain strategy } else { @@ -951,7 +950,8 @@ func (s *StateStore) updateNodeStatusTxn(txn *txn, nodeID, status string, update return nil } -// BatchUpdateNodeDrain is used to update the drain of a node set of nodes +// BatchUpdateNodeDrain is used to update the drain of a node set of nodes. +// This is only called when node drain is completed by the drainer. func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uint64, updatedAt int64, updates map[string]*structs.DrainUpdate, events map[string]*structs.NodeEvent) error { txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() @@ -966,9 +966,10 @@ func (s *StateStore) BatchUpdateNodeDrain(msgType structs.MessageType, index uin // UpdateNodeDrain is used to update the drain of a node func (s *StateStore) UpdateNodeDrain(msgType structs.MessageType, index uint64, nodeID string, drain *structs.DrainStrategy, markEligible bool, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.WriteTxn(index) + txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() if err := s.updateNodeDrainImpl(txn, index, nodeID, drain, markEligible, updatedAt, event); err != nil { + return err } return txn.Commit() @@ -997,7 +998,6 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string, } // Update the drain in the copy - copyNode.Drain = drain != nil // COMPAT: Remove in Nomad 0.10 copyNode.DrainStrategy = drain if drain != nil { copyNode.SchedulingEligibility = structs.NodeSchedulingIneligible diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index fecd90ff2..f645a2788 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -963,7 +963,6 @@ func TestStateStore_BatchUpdateNodeDrain(t *testing.T) { for _, id := range []string{n1.ID, n2.ID} { out, err := state.NodeByID(ws, id) require.Nil(err) - require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) @@ -1008,7 +1007,6 @@ func TestStateStore_UpdateNodeDrain_Node(t *testing.T) { ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.True(out.Drain) require.NotNil(out.DrainStrategy) require.Equal(out.DrainStrategy, expectedDrain) require.Len(out.Events, 2) @@ -1152,7 +1150,6 @@ func TestStateStore_UpdateNodeDrain_ResetEligiblity(t *testing.T) { ws = memdb.NewWatchSet() out, err := state.NodeByID(ws, node.ID) require.Nil(err) - require.False(out.Drain) require.Nil(out.DrainStrategy) require.Equal(out.SchedulingEligibility, structs.NodeSchedulingEligible) require.Len(out.Events, 3) diff --git a/nomad/stream/ndjson.go b/nomad/stream/ndjson.go index 7e7ad0928..ec69a6c1c 100644 --- a/nomad/stream/ndjson.go +++ b/nomad/stream/ndjson.go @@ -1,11 +1,13 @@ package stream import ( + "bytes" "context" - "encoding/json" "fmt" "time" + "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/nomad/structs" ) @@ -71,7 +73,9 @@ func (n *JsonStream) Send(v interface{}) error { return n.ctx.Err() } - buf, err := json.Marshal(v) + var buf bytes.Buffer + enc := codec.NewEncoder(&buf, structs.JsonHandleWithExtensions) + err := enc.Encode(v) if err != nil { return fmt.Errorf("error marshaling json for stream: %w", err) } @@ -79,7 +83,7 @@ func (n *JsonStream) Send(v interface{}) error { select { case <-n.ctx.Done(): return fmt.Errorf("error stream is no longer running: %w", err) - case n.outCh <- &structs.EventJson{Data: buf}: + case n.outCh <- &structs.EventJson{Data: buf.Bytes()}: } return nil diff --git a/nomad/structs/json_encoding.go b/nomad/structs/json_encoding.go new file mode 100644 index 000000000..3f1a6e2b0 --- /dev/null +++ b/nomad/structs/json_encoding.go @@ -0,0 +1,36 @@ +package structs + +import ( + "reflect" + + "github.com/hashicorp/go-msgpack/codec" +) + +// Special encoding for structs.Node, to perform the following: +// 1. provide backwards compatibility for the following fields: +// * Node.Drain +type nodeExt struct{} + +// ConvertExt converts a structs.Node to a struct with the extra field, Drain +func (n nodeExt) ConvertExt(v interface{}) interface{} { + node := v.(*Node) + if node == nil { + return nil + } + type NodeAlias Node + return &struct { + *NodeAlias + Drain bool + }{ + NodeAlias: (*NodeAlias)(node), + Drain: node.DrainStrategy != nil, + } +} + +// UpdateExt is not used +func (n nodeExt) UpdateExt(_ interface{}, _ interface{}) {} + +func RegisterJSONEncodingExtensions(h *codec.JsonHandle) *codec.JsonHandle { + h.SetInterfaceExt(reflect.TypeOf(Node{}), 1, nodeExt{}) + return h +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b1e1a1961..02d6a2648 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -508,12 +508,6 @@ type NodeUpdateDrainRequest struct { NodeID string DrainStrategy *DrainStrategy - // COMPAT Remove in version 0.10 - // As part of Nomad 0.8 we have deprecated the drain boolean in favor of a - // drain strategy but we need to handle the upgrade path where the Raft log - // contains drain updates with just the drain boolean being manipulated. - Drain bool - // MarkEligible marks the node as eligible if removing the drain strategy. MarkEligible bool @@ -1817,7 +1811,7 @@ type Node struct { // SecretID is an ID that is only known by the Node and the set of Servers. // It is not accessible via the API and is used to authenticate nodes // conducting privileged activities. - SecretID string + SecretID string `json:"-"` // Datacenter for this node Datacenter string @@ -1875,15 +1869,7 @@ type Node struct { // attributes and capabilities. ComputedClass string - // COMPAT: Remove in Nomad 0.9 - // Drain is controlled by the servers, and not the client. - // If true, no jobs will be scheduled to this node, and existing - // allocations will be drained. Superseded by DrainStrategy in Nomad - // 0.8 but kept for backward compat. - Drain bool - - // DrainStrategy determines the node's draining behavior. Will be nil - // when Drain=false. + // DrainStrategy determines the node's draining behavior. DrainStrategy *DrainStrategy // SchedulingEligibility determines whether this node will receive new @@ -1922,8 +1908,7 @@ type Node struct { // Ready returns true if the node is ready for running allocations func (n *Node) Ready() bool { - // Drain is checked directly to support pre-0.8 Node data - return n.Status == NodeStatusReady && !n.Drain && n.SchedulingEligibility == NodeSchedulingEligible + return n.Status == NodeStatusReady && n.DrainStrategy == nil && n.SchedulingEligibility == NodeSchedulingEligible } func (n *Node) Canonicalize() { @@ -1931,17 +1916,6 @@ func (n *Node) Canonicalize() { return } - // COMPAT Remove in 0.10 - // In v0.8.0 we introduced scheduling eligibility, so we need to set it for - // upgrading nodes - if n.SchedulingEligibility == "" { - if n.Drain { - n.SchedulingEligibility = NodeSchedulingIneligible - } else { - n.SchedulingEligibility = NodeSchedulingEligible - } - } - // COMPAT remove in 1.0 // In v0.12.0 we introduced a separate node specific network resource struct // so we need to covert any pre 0.12 clients to the correct struct @@ -1965,6 +1939,14 @@ func (n *Node) Canonicalize() { } } } + + if n.SchedulingEligibility == "" { + if n.DrainStrategy != nil { + n.SchedulingEligibility = NodeSchedulingIneligible + } else { + n.SchedulingEligibility = NodeSchedulingEligible + } + } } func (n *Node) Copy() *Node { @@ -2128,7 +2110,7 @@ func (n *Node) Stub(fields *NodeStubFields) *NodeListStub { Name: n.Name, NodeClass: n.NodeClass, Version: n.Attributes["nomad.version"], - Drain: n.Drain, + Drain: n.DrainStrategy != nil, SchedulingEligibility: n.SchedulingEligibility, Status: n.Status, StatusDescription: n.StatusDescription, @@ -10602,13 +10584,18 @@ var MsgpackHandle = func() *codec.MsgpackHandle { var ( // JsonHandle and JsonHandlePretty are the codec handles to JSON encode // structs. The pretty handle will add indents for easier human consumption. + // JsonHandleWithExtensions and JsonHandlePretty include extensions for + // encoding structs objects with API-specific fields JsonHandle = &codec.JsonHandle{ HTMLCharsAsIs: true, } - JsonHandlePretty = &codec.JsonHandle{ + JsonHandleWithExtensions = RegisterJSONEncodingExtensions(&codec.JsonHandle{ + HTMLCharsAsIs: true, + }) + JsonHandlePretty = RegisterJSONEncodingExtensions(&codec.JsonHandle{ HTMLCharsAsIs: true, Indent: 4, - } + }) ) // Decode is used to decode a MsgPack encoded object diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 09a411f24..cd247236f 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -5483,7 +5483,11 @@ func TestNode_Canonicalize(t *testing.T) { require.Equal(NodeSchedulingEligible, node.SchedulingEligibility) node = &Node{ - Drain: true, + DrainStrategy: &DrainStrategy{ + DrainSpec: DrainSpec{ + Deadline: 30000, + }, + }, } node.Canonicalize() require.Equal(NodeSchedulingIneligible, node.SchedulingEligibility) diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 7c5f7cd7a..2035b88df 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -2996,8 +2996,7 @@ func TestServiceSched_NodeDrain(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create some nodes @@ -3078,8 +3077,7 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node.Status = structs.NodeStatusDown require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) @@ -3211,7 +3209,7 @@ func TestServiceSched_NodeDrain_Queued_Allocations(t *testing.T) { } require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) - node.Drain = true + node.DrainStrategy = mock.DrainNode().DrainStrategy require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create a mock evaluation to deal with drain @@ -4064,8 +4062,7 @@ func TestBatchSched_Run_LostAlloc(t *testing.T) { func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) { h := NewHarness(t) - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create a job @@ -4119,8 +4116,7 @@ func TestBatchSched_ReRun_SuccessfullyFinishedAlloc(t *testing.T) { // Create two nodes, one that is drained and has a successfully finished // alloc and a fresh undrained one - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node2 := mock.Node() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) @@ -4329,8 +4325,7 @@ func TestBatchSched_NodeDrain_Running_OldJob(t *testing.T) { // Create two nodes, one that is drained and has a successfully finished // alloc and a fresh undrained one - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node2 := mock.Node() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) @@ -4401,8 +4396,7 @@ func TestBatchSched_NodeDrain_Complete(t *testing.T) { // Create two nodes, one that is drained and has a successfully finished // alloc and a fresh undrained one - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node2 := mock.Node() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) @@ -4754,8 +4748,7 @@ func TestServiceSched_NodeDrain_Sticky(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Create an alloc on the draining node diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 0366ebb53..b0698b0bf 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -885,10 +885,9 @@ func TestReconciler_DrainNode(t *testing.T) { // Build a map of tainted nodes tainted := make(map[string]*structs.Node, 2) for i := 0; i < 2; i++ { - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[i].NodeID allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n } @@ -938,10 +937,9 @@ func TestReconciler_DrainNode_ScaleUp(t *testing.T) { // Build a map of tainted nodes tainted := make(map[string]*structs.Node, 2) for i := 0; i < 2; i++ { - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[i].NodeID allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n } @@ -992,10 +990,9 @@ func TestReconciler_DrainNode_ScaleDown(t *testing.T) { // Build a map of tainted nodes tainted := make(map[string]*structs.Node, 3) for i := 0; i < 3; i++ { - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[i].NodeID allocs[i].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n } @@ -2994,10 +2991,9 @@ func TestReconciler_DrainNode_Canary(t *testing.T) { // Build a map of tainted nodes that contains the last canary tainted := make(map[string]*structs.Node, 1) - n := mock.Node() + n := mock.DrainNode() n.ID = allocs[11].NodeID allocs[11].DesiredTransition.Migrate = helper.BoolToPtr(true) - n.Drain = true tainted[n.ID] = n mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) @@ -3785,7 +3781,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { if i == 0 { n.Status = structs.NodeStatusDown } else { - n.Drain = true + n.DrainStrategy = mock.DrainNode().DrainStrategy allocs[2+i].DesiredTransition.Migrate = helper.BoolToPtr(true) } tainted[n.ID] = n @@ -3870,7 +3866,7 @@ func TestReconciler_FailedDeployment_TaintedNodes(t *testing.T) { if i == 0 { n.Status = structs.NodeStatusDown } else { - n.Drain = true + n.DrainStrategy = mock.DrainNode().DrainStrategy allocs[6+i].DesiredTransition.Migrate = helper.BoolToPtr(true) } tainted[n.ID] = n diff --git a/scheduler/reconcile_util_test.go b/scheduler/reconcile_util_test.go index 6fb1c0555..59772a349 100644 --- a/scheduler/reconcile_util_test.go +++ b/scheduler/reconcile_util_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" ) @@ -37,8 +38,8 @@ func TestAllocSet_filterByTainted(t *testing.T) { nodes := map[string]*structs.Node{ "draining": { - ID: "draining", - Drain: true, + ID: "draining", + DrainStrategy: mock.DrainNode().DrainStrategy, }, "lost": { ID: "lost", diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 35ed1ce51..c4cce45ff 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -1051,8 +1051,7 @@ func TestSystemSched_NodeDrain_Down(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() node.Status = structs.NodeStatusDown require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) @@ -1113,8 +1112,7 @@ func TestSystemSched_NodeDrain(t *testing.T) { h := NewHarness(t) // Register a draining node - node := mock.Node() - node.Drain = true + node := mock.DrainNode() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) // Generate a fake job allocated on that node. @@ -1708,9 +1706,8 @@ func TestSystemSched_PlanWithDrainedNode(t *testing.T) { h := NewHarness(t) // Register two nodes with two different classes - node := mock.Node() + node := mock.DrainNode() node.NodeClass = "green" - node.Drain = true node.ComputeClass() require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) diff --git a/scheduler/util.go b/scheduler/util.go index 86461a8f6..082273a1c 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -255,7 +255,7 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int if node.Status != structs.NodeStatusReady { continue } - if node.Drain { + if node.DrainStrategy != nil { continue } if node.SchedulingEligibility != structs.NodeSchedulingEligible { @@ -327,7 +327,7 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct out[alloc.NodeID] = nil continue } - if structs.ShouldDrainNode(node.Status) || node.Drain { + if structs.ShouldDrainNode(node.Status) || node.DrainStrategy != nil { out[alloc.NodeID] = node } } diff --git a/scheduler/util_test.go b/scheduler/util_test.go index fba5e611a..cf1b300b3 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -39,8 +39,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) { eligibleNode := mock.Node() eligibleNode.ID = "zip" - drainNode := mock.Node() - drainNode.Drain = true + drainNode := mock.DrainNode() deadNode := mock.Node() deadNode.Status = structs.NodeStatusDown @@ -220,8 +219,7 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { func TestDiffSystemAllocs(t *testing.T) { job := mock.SystemJob() - drainNode := mock.Node() - drainNode.Drain = true + drainNode := mock.DrainNode() deadNode := mock.Node() deadNode.Status = structs.NodeStatusDown @@ -332,8 +330,7 @@ func TestReadyNodesInDCs(t *testing.T) { node3 := mock.Node() node3.Datacenter = "dc2" node3.Status = structs.NodeStatusDown - node4 := mock.Node() - node4.Drain = true + node4 := mock.DrainNode() require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)) @@ -392,8 +389,7 @@ func TestTaintedNodes(t *testing.T) { node3 := mock.Node() node3.Datacenter = "dc2" node3.Status = structs.NodeStatusDown - node4 := mock.Node() - node4.Drain = true + node4 := mock.DrainNode() require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1000, node1)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1001, node2)) require.NoError(t, state.UpsertNode(structs.MsgTypeTestSetup, 1002, node3))