Job GC endpoint

This commit is contained in:
Alex Dadgar
2016-02-20 15:50:41 -08:00
parent f4432b8853
commit 902c14beb4
12 changed files with 391 additions and 10 deletions

17
api/system.go Normal file
View File

@@ -0,0 +1,17 @@
package api
// System is used to query the system-related endpoints.
type System struct {
	client *Client
}
// System returns a handle on the system endpoints.
func (c *Client) System() *System {
	sys := &System{client: c}
	return sys
}
// GarbageCollect asks the servers to immediately garbage collect eligible
// objects by issuing a write to the /v1/system/gc endpoint.
func (s *System) GarbageCollect() error {
	var args struct{}
	if _, err := s.client.write("/v1/system/gc", &args, nil, nil); err != nil {
		return err
	}
	return nil
}

14
api/system_test.go Normal file
View File

@@ -0,0 +1,14 @@
package api
import (
"testing"
)
// TestSystem_GarbageCollect verifies the GC endpoint can be invoked through
// the API client without error.
func TestSystem_GarbageCollect(t *testing.T) {
	c, s := makeClient(t, nil, nil)
	defer s.Stop()

	sys := c.System()
	err := sys.GarbageCollect()
	if err != nil {
		t.Fatal(err)
	}
}

View File

@@ -118,6 +118,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/status/leader", s.wrap(s.StatusLeaderRequest))
s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest))
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
if enableDebug {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)

View File

@@ -0,0 +1,24 @@
package agent
import (
"net/http"
"github.com/hashicorp/nomad/nomad/structs"
)
// GarbageCollectRequest forces a cluster-wide garbage collection by invoking
// the System.GarbageCollect RPC. Only the PUT method is accepted.
func (s *HTTPServer) GarbageCollectRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
	if req.Method != "PUT" {
		return nil, CodedError(405, ErrInvalidMethod)
	}

	var args structs.GenericRequest
	if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
		return nil, nil
	}

	var out structs.GenericResponse
	err := s.agent.RPC("System.GarbageCollect", &args, &out)
	if err != nil {
		return nil, err
	}
	return nil, nil
}

View File

@@ -0,0 +1,23 @@
package agent
import (
"net/http"
"net/http/httptest"
"testing"
)
// TestHTTP_SystemGarbageCollect exercises the /v1/system/gc HTTP handler.
func TestHTTP_SystemGarbageCollect(t *testing.T) {
	httpTest(t, nil, func(s *TestServer) {
		// Build a PUT request against the GC endpoint
		req, err := http.NewRequest("PUT", "/v1/system/gc", nil)
		if err != nil {
			t.Fatalf("err: %v", err)
		}

		// Invoke the handler directly and verify it succeeds
		respW := httptest.NewRecorder()
		_, err = s.Server.GarbageCollectRequest(respW, req)
		if err != nil {
			t.Fatalf("err: %v", err)
		}
	})
}

View File

@@ -2,6 +2,7 @@ package nomad
import (
"fmt"
"math"
"time"
"github.com/hashicorp/nomad/nomad/state"
@@ -48,10 +49,18 @@ func (c *CoreScheduler) jobGC(eval *structs.Evaluation) error {
return err
}
// Get the time table to calculate GC cutoffs.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
oldThreshold := tt.NearestIndex(cutoff)
var oldThreshold uint64
if eval.TriggeredBy == structs.EvalTriggerForceGC {
// The GC was forced, so set the threshold to its maximum so everything
// will GC.
oldThreshold = math.MaxUint64
c.srv.logger.Println("[DEBUG] sched.core: forced job GC")
} else {
// Get the time table to calculate GC cutoffs.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.JobGCThreshold)
oldThreshold = tt.NearestIndex(cutoff)
}
c.srv.logger.Printf("[DEBUG] sched.core: job GC: scanning before index %d (%v)",
oldThreshold, c.srv.config.JobGCThreshold)
@@ -125,12 +134,20 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
return err
}
// Compute the old threshold limit for GC using the FSM
// time table. This is a rough mapping of a time to the
// Raft index it belongs to.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
oldThreshold := tt.NearestIndex(cutoff)
var oldThreshold uint64
if eval.TriggeredBy == structs.EvalTriggerForceGC {
// The GC was forced, so set the threshold to its maximum so everything
// will GC.
oldThreshold = math.MaxUint64
c.srv.logger.Println("[DEBUG] sched.core: forced eval GC")
} else {
// Compute the old threshold limit for GC using the FSM
// time table. This is a rough mapping of a time to the
// Raft index it belongs to.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
oldThreshold = tt.NearestIndex(cutoff)
}
c.srv.logger.Printf("[DEBUG] sched.core: eval GC: scanning before index %d (%v)",
oldThreshold, c.srv.config.EvalGCThreshold)

View File

@@ -69,6 +69,61 @@ func TestCoreScheduler_EvalGC(t *testing.T) {
}
}
// TestCoreScheduler_EvalGC_Force verifies that a forced eval GC collects a
// terminal eval and its alloc regardless of the GC threshold.
func TestCoreScheduler_EvalGC_Force(t *testing.T) {
	s1 := testServer(t, nil)
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	// Seed the state store with a terminal eval...
	state := s1.fsm.State()
	eval := mock.Eval()
	eval.Status = structs.EvalStatusFailed
	if err := state.UpsertEvals(1000, []*structs.Evaluation{eval}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// ...and a terminal alloc attached to it.
	alloc := mock.Alloc()
	alloc.EvalID = eval.ID
	alloc.DesiredStatus = structs.AllocDesiredStatusFailed
	if err := state.UpsertAllocs(1001, []*structs.Allocation{alloc}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Build a core scheduler over a snapshot of that state.
	snap, err := state.Snapshot()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	core := NewCoreScheduler(s1, snap)

	// Run a forced eval GC.
	gc := s1.forceCoreJobEval(structs.CoreJobEvalGC)
	if err := core.Process(gc); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Both the eval and its alloc should have been collected.
	out, err := state.EvalByID(eval.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != nil {
		t.Fatalf("bad: %v", out)
	}

	outA, err := state.AllocByID(alloc.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if outA != nil {
		t.Fatalf("bad: %v", outA)
	}
}
func TestCoreScheduler_NodeGC(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
@@ -112,6 +167,45 @@ func TestCoreScheduler_NodeGC(t *testing.T) {
}
}
// TestCoreScheduler_NodeGC_Force verifies that a forced node GC collects a
// down node regardless of the GC threshold.
func TestCoreScheduler_NodeGC_Force(t *testing.T) {
	s1 := testServer(t, nil)
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	// Register a node that is already down.
	state := s1.fsm.State()
	node := mock.Node()
	node.Status = structs.NodeStatusDown
	if err := state.UpsertNode(1000, node); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Build a core scheduler over a snapshot of that state.
	snap, err := state.Snapshot()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	core := NewCoreScheduler(s1, snap)

	// Run a forced node GC.
	gc := s1.forceCoreJobEval(structs.CoreJobNodeGC)
	gc.ModifyIndex = 2000
	if err := core.Process(gc); err != nil {
		t.Fatalf("err: %v", err)
	}

	// The down node should have been collected.
	out, err := state.NodeByID(node.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if out != nil {
		t.Fatalf("bad: %v", out)
	}
}
func TestCoreScheduler_JobGC(t *testing.T) {
tests := []struct {
test, evalStatus, allocStatus string
@@ -215,3 +309,104 @@ func TestCoreScheduler_JobGC(t *testing.T) {
}
}
}
// TestCoreScheduler_JobGC_Force tests that a forced job GC ignores the GC
// threshold and collects a job exactly when it has no non-terminal evals or
// allocs attached.
func TestCoreScheduler_JobGC_Force(t *testing.T) {
	tests := []struct {
		test, evalStatus, allocStatus string
		shouldExist                   bool
	}{
		{
			test:        "Terminal",
			evalStatus:  structs.EvalStatusFailed,
			allocStatus: structs.AllocDesiredStatusFailed,
			shouldExist: false,
		},
		{
			test:        "Has Alloc",
			evalStatus:  structs.EvalStatusFailed,
			allocStatus: structs.AllocDesiredStatusRun,
			shouldExist: true,
		},
		{
			test:        "Has Eval",
			evalStatus:  structs.EvalStatusPending,
			allocStatus: structs.AllocDesiredStatusFailed,
			shouldExist: true,
		},
	}

	for _, test := range tests {
		// Run each case in a function literal so the deferred Shutdown fires
		// at the end of the case instead of accumulating live servers until
		// the whole test returns (defer-in-loop bug in the original).
		func() {
			s1 := testServer(t, nil)
			defer s1.Shutdown()
			testutil.WaitForLeader(t, s1.RPC)

			// Insert job.
			state := s1.fsm.State()
			job := mock.Job()
			job.GC = true
			err := state.UpsertJob(1000, job)
			if err != nil {
				t.Fatalf("test(%s) err: %v", test.test, err)
			}

			// Insert eval
			eval := mock.Eval()
			eval.JobID = job.ID
			eval.Status = test.evalStatus
			err = state.UpsertEvals(1001, []*structs.Evaluation{eval})
			if err != nil {
				t.Fatalf("test(%s) err: %v", test.test, err)
			}

			// Insert alloc
			alloc := mock.Alloc()
			alloc.JobID = job.ID
			alloc.EvalID = eval.ID
			alloc.DesiredStatus = test.allocStatus
			err = state.UpsertAllocs(1002, []*structs.Allocation{alloc})
			if err != nil {
				t.Fatalf("test(%s) err: %v", test.test, err)
			}

			// Create a core scheduler
			snap, err := state.Snapshot()
			if err != nil {
				t.Fatalf("test(%s) err: %v", test.test, err)
			}
			core := NewCoreScheduler(s1, snap)

			// Attempt a forced GC
			gc := s1.forceCoreJobEval(structs.CoreJobJobGC)
			gc.ModifyIndex = 2000
			err = core.Process(gc)
			if err != nil {
				t.Fatalf("test(%s) err: %v", test.test, err)
			}

			// The job should exist only when the case says it should.
			out, err := state.JobByID(job.ID)
			if err != nil {
				t.Fatalf("test(%s) err: %v", test.test, err)
			}
			if (test.shouldExist && out == nil) || (!test.shouldExist && out != nil) {
				t.Fatalf("test(%s) bad job: %v", test.test, out)
			}

			outE, err := state.EvalByID(eval.ID)
			if err != nil {
				t.Fatalf("test(%s) err: %v", test.test, err)
			}
			// Bug fix: the original printed `out` (the job) here, hiding the
			// eval that actually caused the mismatch.
			if (test.shouldExist && outE == nil) || (!test.shouldExist && outE != nil) {
				t.Fatalf("test(%s) bad eval: %v", test.test, outE)
			}

			outA, err := state.AllocByID(alloc.ID)
			if err != nil {
				t.Fatalf("test(%s) err: %v", test.test, err)
			}
			if (test.shouldExist && outA == nil) || (!test.shouldExist && outA != nil) {
				t.Fatalf("test(%s) bad alloc: %v", test.test, outA)
			}
		}()
	}
}

View File

@@ -269,6 +269,14 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation {
}
}
// forceCoreJobEval builds an evaluation for the given core job, marked with
// the force-GC trigger so processing ignores the usual GC cutoffs.
func (s *Server) forceCoreJobEval(job string) *structs.Evaluation {
	forced := s.coreJobEval(job)
	forced.TriggeredBy = structs.EvalTriggerForceGC
	return forced
}
// reapFailedEvaluations is used to reap evaluations that
// have reached their delivery limit and should be failed
func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {

View File

@@ -144,6 +144,7 @@ type endpoints struct {
Alloc *Alloc
Region *Region
Periodic *Periodic
System *System
}
// NewServer is used to construct a new Nomad server from the
@@ -380,6 +381,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.endpoints.Alloc = &Alloc{s}
s.endpoints.Region = &Region{s}
s.endpoints.Periodic = &Periodic{s}
s.endpoints.System = &System{s}
// Register the handlers
s.rpcServer.Register(s.endpoints.Status)
@@ -390,6 +392,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.rpcServer.Register(s.endpoints.Alloc)
s.rpcServer.Register(s.endpoints.Region)
s.rpcServer.Register(s.endpoints.Periodic)
s.rpcServer.Register(s.endpoints.System)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {

View File

@@ -2183,6 +2183,7 @@ const (
EvalTriggerPeriodicJob = "periodic-job"
EvalTriggerNodeUpdate = "node-update"
EvalTriggerScheduled = "scheduled"
EvalTriggerForceGC = "force-gc"
EvalTriggerRollingUpdate = "rolling-update"
)

23
nomad/system_endpoint.go Normal file
View File

@@ -0,0 +1,23 @@
package nomad
import (
"github.com/hashicorp/nomad/nomad/structs"
)
// System endpoint is used to invoke system tasks.
type System struct {
	srv *Server
}
// GarbageCollect is used to trigger the system to immediately garbage collect nodes, evals
// and jobs.
func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.GenericResponse) error {
	if done, err := s.srv.forward("System.GarbageCollect", args, args, reply); done {
		return err
	}

	// Enqueue a forced GC evaluation for each collectible object type.
	for _, core := range []string{
		structs.CoreJobEvalGC,
		structs.CoreJobNodeGC,
		structs.CoreJobJobGC,
	} {
		s.srv.evalBroker.Enqueue(s.srv.forceCoreJobEval(core))
	}
	return nil
}

View File

@@ -0,0 +1,54 @@
package nomad
import (
"fmt"
"testing"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
// TestSystemEndpoint_GarbageCollect verifies that the System.GarbageCollect
// RPC forces collection of an eligible job.
func TestSystemEndpoint_GarbageCollect(t *testing.T) {
	s1 := testServer(t, nil)
	defer s1.Shutdown()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Insert a job that can be GC'd
	state := s1.fsm.State()
	job := mock.Job()
	job.GC = true
	if err := state.UpsertJob(0, job); err != nil {
		// Fixed message: this call upserts a job, not allocs.
		t.Fatalf("UpsertJob() failed: %v", err)
	}

	// Make the GC request
	req := &structs.GenericRequest{
		QueryOptions: structs.QueryOptions{
			Region: "global",
		},
	}
	var resp structs.GenericResponse
	if err := msgpackrpc.CallWithCodec(codec, "System.GarbageCollect", req, &resp); err != nil {
		// Fixed message: the RPC is expected to succeed, so report the error
		// rather than the misleading "expect err".
		t.Fatalf("err: %v", err)
	}

	// Wait until the job has been garbage collected
	testutil.WaitForResult(func() (bool, error) {
		exist, err := state.JobByID(job.ID)
		if err != nil {
			return false, err
		}
		if exist != nil {
			return false, fmt.Errorf("job %q wasn't garbage collected", job.ID)
		}
		return true, nil
	}, func(err error) {
		t.Fatalf("err: %s", err)
	})
}