nomad: endpoint to force re-evaluation of job

This commit is contained in:
Armon Dadgar
2015-08-15 18:11:26 -07:00
parent d36d420a71
commit e4ad49940d
3 changed files with 130 additions and 0 deletions

View File

@@ -83,6 +83,61 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
return nil
}
// Evaluate is used to force a job for re-evaluation
func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error {
if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "evaluate"}, time.Now())
// Validate the arguments
if args.JobID == "" {
return fmt.Errorf("missing job ID for evaluation")
}
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
job, err := snap.GetJobByID(args.JobID)
if err != nil {
return err
}
if job == nil {
return fmt.Errorf("job not found")
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: generateUUID(),
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
JobModifyIndex: job.ModifyIndex,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
WriteRequest: structs.WriteRequest{Region: args.Region},
}
// Commit this evaluation via Raft
_, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Eval create failed: %v", err)
return err
}
// Setup the reply
reply.EvalID = eval.ID
reply.EvalCreateIndex = evalIndex
reply.JobModifyIndex = job.ModifyIndex
reply.Index = evalIndex
return nil
}
// Deregister is used to remove a job the cluster.
func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobDeregisterResponse) error {
if done, err := j.srv.forward("Job.Deregister", args, args, reply); done {

View File

@@ -161,6 +161,75 @@ func TestJobEndpoint_Register_Existing(t *testing.T) {
}
}
func TestJobEndpoint_Evaluate(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
// Fetch the response
var resp structs.JobRegisterResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Force a re-evaluation
reEval := &structs.JobEvaluateRequest{
JobID: job.ID,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
// Fetch the response
if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluate", reEval, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Lookup the evaluation
state := s1.fsm.State()
eval, err := state.GetEvalByID(resp.EvalID)
if err != nil {
t.Fatalf("err: %v", err)
}
if eval == nil {
t.Fatalf("expected eval")
}
if eval.CreateIndex != resp.EvalCreateIndex {
t.Fatalf("index mis-match")
}
if eval.Priority != job.Priority {
t.Fatalf("bad: %#v", eval)
}
if eval.Type != job.Type {
t.Fatalf("bad: %#v", eval)
}
if eval.TriggeredBy != structs.EvalTriggerJobRegister {
t.Fatalf("bad: %#v", eval)
}
if eval.JobID != job.ID {
t.Fatalf("bad: %#v", eval)
}
if eval.JobModifyIndex != resp.JobModifyIndex {
t.Fatalf("bad: %#v", eval)
}
if eval.Status != structs.EvalStatusPending {
t.Fatalf("bad: %#v", eval)
}
}
func TestJobEndpoint_Deregister(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()

View File

@@ -155,6 +155,12 @@ type JobDeregisterRequest struct {
WriteRequest
}
// JobEvaluateRequest is used when we just need to re-evaluate a target job
type JobEvaluateRequest struct {
JobID string
WriteRequest
}
// JobSpecificRequest is used when we just need to specify a target job
type JobSpecificRequest struct {
JobID string