Batch Deregister RPC

This commit is contained in:
Alex Dadgar
2018-03-14 15:32:18 -07:00
parent 0cc4e3caf0
commit 657ec0a0c4
5 changed files with 625 additions and 312 deletions

View File

@@ -238,6 +238,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(buf[1:], log.Index)
}
// Check enterprise only message types.
@@ -431,14 +433,41 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge); err != nil {
n.logger.Printf("[ERR] nomad.fsm: deregistering job failed: %v", err)
return err
}
return nil
}
func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_job"}, time.Now())
var req structs.JobBatchDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge); err != nil {
n.logger.Printf("[ERR] nomad.fsm: deregistering %v failed: %v", jobNS, err)
return err
}
}
return n.upsertEvals(index, req.Evals)
}
// handleJobDeregister is used to deregister a job.
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool) error {
// If it is periodic remove it from the dispatcher
if err := n.periodicDispatcher.Remove(req.Namespace, req.JobID); err != nil {
if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err)
return err
}
if req.Purge {
if err := n.state.DeleteJob(index, req.Namespace, req.JobID); err != nil {
if purge {
if err := n.state.DeleteJob(index, namespace, jobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err)
return err
}
@@ -446,18 +475,18 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
// We always delete from the periodic launch table because it is possible that
// the job was updated to be non-periodic, thus checking if it is periodic
// doesn't ensure we clean it up properly.
n.state.DeletePeriodicLaunch(index, req.Namespace, req.JobID)
n.state.DeletePeriodicLaunch(index, namespace, jobID)
} else {
// Get the current job and mark it as stopped and re-insert it.
ws := memdb.NewWatchSet()
current, err := n.state.JobByID(ws, req.Namespace, req.JobID)
current, err := n.state.JobByID(ws, namespace, jobID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: JobByID lookup failed: %v", err)
return err
}
if current == nil {
return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", req.JobID, req.Namespace)
return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", jobID, namespace)
}
stopped := current.Copy()

View File

@@ -612,6 +612,84 @@ func TestFSM_DeregisterJob_NoPurge(t *testing.T) {
}
}
func TestFSM_BatchDeregisterJob(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)
job2 := mock.Job()
req2 := structs.JobRegisterRequest{
Job: job2,
WriteRequest: structs.WriteRequest{
Namespace: job2.Namespace,
},
}
buf, err = structs.Encode(structs.JobRegisterRequestType, req2)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
req3 := structs.JobBatchDeregisterRequest{
Jobs: map[structs.NamespacedID]*structs.JobDeregisterOptions{
structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}: &structs.JobDeregisterOptions{},
structs.NamespacedID{
ID: job2.ID,
Namespace: job2.Namespace,
}: &structs.JobDeregisterOptions{
Purge: true,
},
},
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err = structs.Encode(structs.JobBatchDeregisterRequestType, req3)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)
// Verify we are NOT registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
require.Nil(err)
require.NotNil(jobOut)
require.True(jobOut.Stop)
// Verify it was removed from the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
require.NotContains(fsm.periodicDispatcher.tracked, tuple)
// Verify it was not removed from the periodic launch table.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
require.Nil(err)
require.NotNil(launchOut)
// Verify the other jbo was purged
jobOut2, err := fsm.State().JobByID(ws, job2.Namespace, job2.ID)
require.Nil(err)
require.Nil(jobOut2)
}
func TestFSM_UpdateEval(t *testing.T) {
t.Parallel()
fsm := testFSM(t)

View File

@@ -615,8 +615,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was. The scheduler itself also doesn't matter,
// since all should be able to handle deregistration in the same way.
// priority even if the job was.
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
@@ -646,6 +645,87 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return nil
}
// BatchDeregister is used to remove a set of jobs from the cluster.
func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *structs.JobBatchDeregisterResponse) error {
if done, err := j.srv.forward("Job.BatchDeregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "batch_deregister"}, time.Now())
// Resolve the ACL token
aclObj, err := j.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}
// Validate the arguments
if len(args.Jobs) == 0 {
return fmt.Errorf("given no jobs to deregister")
}
if len(args.Evals) != 0 {
return fmt.Errorf("evaluations should not be populated")
}
// Loop through checking for permissions
for jobNS := range args.Jobs {
// Check for submit-job permissions
if aclObj != nil && !aclObj.AllowNsOp(jobNS.Namespace, acl.NamespaceCapabilitySubmitJob) {
return structs.ErrPermissionDenied
}
}
// Loop through to create evals
for jobNS, options := range args.Jobs {
if options == nil {
return fmt.Errorf("no deregister options provided for %v", jobNS)
}
// Lookup the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
job, err := snap.JobByID(nil, jobNS.Namespace, jobNS.ID)
if err != nil {
return err
}
// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
continue
}
priority := structs.JobDefaultPriority
jtype := structs.JobTypeService
if job != nil {
priority = job.Priority
jtype = job.Type
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: jobNS.Namespace,
Priority: priority,
Type: jtype,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: jobNS.ID,
Status: structs.EvalStatusPending,
}
args.Evals = append(args.Evals, eval)
}
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobBatchDeregisterRequestType, args)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: batch deregister failed: %v", err)
return err
}
reply.Index = index
return nil
}
// GetJob is used to request information about a specific job
func (j *Job) GetJob(args *structs.JobSpecificRequest,
reply *structs.SingleJobResponse) error {

File diff suppressed because it is too large Load Diff

View File

@@ -77,6 +77,7 @@ const (
ACLTokenBootstrapRequestType
AutopilotRequestType
UpsertNodeEventsType
JobBatchDeregisterRequestType
)
const (
@@ -379,6 +380,26 @@ type JobDeregisterRequest struct {
WriteRequest
}
// JobBatchDeregisterRequest is used to batch deregister jobs and upsert
// evaluations.
type JobBatchDeregisterRequest struct {
// Jobs is the set of jobs to deregister
Jobs map[NamespacedID]*JobDeregisterOptions
// Evals is the set of evaluations to create.
Evals []*Evaluation
WriteRequest
}
// JobDeregisterOptions configures how a job is deregistered.
type JobDeregisterOptions struct {
// Purge controls whether the deregister purges the job from the system or
// whether the job is just marked as stopped and will be removed by the
// garbage collector
Purge bool
}
// JobEvaluateRequest is used when we just need to re-evaluate a target job
type JobEvaluateRequest struct {
JobID string
@@ -787,6 +808,13 @@ type JobDeregisterResponse struct {
QueryMeta
}
// JobBatchDeregisterResponse is used to respond to a batch job deregistration
type JobBatchDeregisterResponse struct {
// JobEvals maps the job to its created evaluation
JobEvals map[NamespacedID]string
QueryMeta
}
// JobValidateResponse is the response from validate request
type JobValidateResponse struct {
// DriverConfigValidated indicates whether the agent validated the driver