small fixes

This commit is contained in:
Alex Dadgar
2018-09-15 16:42:38 -07:00
parent 260b566c91
commit 32f9da9e07
8 changed files with 49 additions and 67 deletions

View File

@@ -134,7 +134,7 @@ func testTaskRunnerFromAlloc(t *testing.T, restarts bool, alloc *structs.Allocat
vclient := vaultclient.NewMockVaultClient()
cclient := consul.NewMockAgent()
serviceClient := consul.NewServiceClient(cclient, logger, true)
serviceClient := consul.NewServiceClient(cclient, testlog.HCLogger(t), true)
go serviceClient.Run()
tr := NewTaskRunner(logger, conf, db, upd.Update, taskDir, alloc, task, vclient, serviceClient)
if !restarts {
@@ -633,7 +633,7 @@ func TestTaskRunner_UnregisterConsul_Retries(t *testing.T) {
ctx := testTaskRunnerFromAlloc(t, true, alloc)
// Use mockConsulServiceClient
consul := consulApi.NewMockConsulServiceClient(t)
consul := consulApi.NewMockConsulServiceClient(t, testlog.HCLogger(t))
ctx.tr.consul = consul
ctx.tr.MarkReceived()
@@ -1851,7 +1851,7 @@ func TestTaskRunner_CheckWatcher_Restart(t *testing.T) {
// backed by a mock consul whose checks are always unhealthy.
consulAgent := consul.NewMockAgent()
consulAgent.SetStatus("critical")
consulClient := consul.NewServiceClient(consulAgent, ctx.tr.logger, true)
consulClient := consul.NewServiceClient(consulAgent, testlog.HCLogger(t), true)
go consulClient.Run()
defer consulClient.Shutdown()

View File

@@ -533,7 +533,7 @@ func (c *ServiceClient) sync() error {
}
}
c.logger.Debug("sync complete", "registed_services", sreg, "deregistered_services", sdereg,
c.logger.Debug("sync complete", "registered_services", sreg, "deregistered_services", sdereg,
"registered_checks", creg, "deregistered_checks", cdereg)
return nil
}

View File

@@ -4,7 +4,6 @@ import (
"testing"
"time"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -15,14 +14,7 @@ import (
// testDrainingNode creates a *drainingNode with a 1h deadline but no allocs
func testDrainingNode(t *testing.T) *drainingNode {
t.Helper()
sconfig := &state.StateStoreConfig{
LogOutput: testlog.NewWriter(t),
Region: "global",
}
state, err := state.NewStateStore(sconfig)
require.Nil(t, err)
state := state.TestStateStore(t)
node := mock.Node()
node.DrainStrategy = &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{

View File

@@ -16,16 +16,7 @@ import (
func testNodeDrainWatcher(t *testing.T) (*nodeDrainWatcher, *state.StateStore, *MockNodeTracker) {
t.Helper()
sconfig := &state.StateStoreConfig{
LogOutput: testlog.NewWriter(t),
Region: "global",
}
state, err := state.NewStateStore(sconfig)
if err != nil {
t.Fatalf("failed to create state store: %v", err)
}
state := state.TestStateStore(t)
limiter := rate.NewLimiter(100.0, 100)
logger := testlog.HCLogger(t)
m := NewMockNodeTracker()

View File

@@ -267,7 +267,7 @@ func (n *nomadFSM) applyUpsertNode(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_node"}, time.Now())
var req structs.NodeRegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
// Handle upgrade paths
@@ -291,7 +291,7 @@ func (n *nomadFSM) applyDeregisterNode(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_node"}, time.Now())
var req structs.NodeDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteNode(index, req.NodeID); err != nil {
@@ -305,7 +305,7 @@ func (n *nomadFSM) applyStatusUpdate(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_status_update"}, time.Now())
var req structs.NodeUpdateStatusRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateNodeStatus(index, req.NodeID, req.Status, req.NodeEvent); err != nil {
@@ -333,7 +333,7 @@ func (n *nomadFSM) applyDrainUpdate(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_drain_update"}, time.Now())
var req structs.NodeUpdateDrainRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
// COMPAT Remove in version 0.10
@@ -361,7 +361,7 @@ func (n *nomadFSM) applyBatchDrainUpdate(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_node_drain_update"}, time.Now())
var req structs.BatchNodeUpdateDrainRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.BatchUpdateNodeDrain(index, req.Updates, req.NodeEvents); err != nil {
@@ -375,7 +375,7 @@ func (n *nomadFSM) applyNodeEligibilityUpdate(buf []byte, index uint64) interfac
defer metrics.MeasureSince([]string{"nomad", "fsm", "node_eligibility_update"}, time.Now())
var req structs.NodeUpdateEligibilityRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
// Lookup the existing node
@@ -404,7 +404,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "register_job"}, time.Now())
var req structs.JobRegisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
/* Handle upgrade paths:
@@ -426,7 +426,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// tracking it.
if err := n.periodicDispatcher.Add(req.Job); err != nil {
n.logger.Error("periodicDispatcher.Add failed", "error", err)
return fmt.Errorf("failed adding job to periodic dispatcher", "error", err)
return fmt.Errorf("failed adding job to periodic dispatcher: %v", err)
}
// Create a watch set
@@ -497,7 +497,7 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_job"}, time.Now())
var req structs.JobDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge); err != nil {
@@ -512,7 +512,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{}
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_job"}, time.Now())
var req structs.JobBatchDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
for jobNS, options := range req.Jobs {
@@ -572,7 +572,7 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "update_eval"}, time.Now())
var req structs.EvalUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
return n.upsertEvals(index, req.Evals)
}
@@ -617,7 +617,7 @@ func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "delete_eval"}, time.Now())
var req structs.EvalDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteEval(index, req.Evals, req.Allocs); err != nil {
@@ -631,7 +631,7 @@ func (n *nomadFSM) applyAllocUpdate(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update"}, time.Now())
var req structs.AllocUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
// Attach the job to all the allocations. It is pulled out in the
@@ -675,7 +675,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{}
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_client_update"}, time.Now())
var req structs.AllocUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if len(req.Alloc) == 0 {
return nil
@@ -739,7 +739,7 @@ func (n *nomadFSM) applyAllocUpdateDesiredTransition(buf []byte, index uint64) i
defer metrics.MeasureSince([]string{"nomad", "fsm", "alloc_update_desired_transition"}, time.Now())
var req structs.AllocUpdateDesiredTransitionRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateAllocsDesiredTransitions(index, req.Allocs, req.Evals); err != nil {
@@ -764,8 +764,7 @@ func (n *nomadFSM) applyUpsertNodeEvent(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_node_events"}, time.Now())
var req structs.EmitNodeEventsRequest
if err := structs.Decode(buf, &req); err != nil {
n.logger.Error("failed to decode EmitNodeEventsRequest", "error", err)
return err
panic(fmt.Errorf("failed to decode EmitNodeEventsRequest: %v", err))
}
if err := n.state.UpsertNodeEvents(index, req.NodeEvents); err != nil {
@@ -782,7 +781,7 @@ func (n *nomadFSM) applyUpsertVaultAccessor(buf []byte, index uint64) interface{
defer metrics.MeasureSince([]string{"nomad", "fsm", "upsert_vault_accessor"}, time.Now())
var req structs.VaultAccessorsRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertVaultAccessor(index, req.Accessors); err != nil {
@@ -798,7 +797,7 @@ func (n *nomadFSM) applyDeregisterVaultAccessor(buf []byte, index uint64) interf
defer metrics.MeasureSince([]string{"nomad", "fsm", "deregister_vault_accessor"}, time.Now())
var req structs.VaultAccessorsRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteVaultAccessors(index, req.Accessors); err != nil {
@@ -814,7 +813,7 @@ func (n *nomadFSM) applyPlanResults(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_plan_results"}, time.Now())
var req structs.ApplyPlanResultsRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertPlanResults(index, &req); err != nil {
@@ -831,7 +830,7 @@ func (n *nomadFSM) applyDeploymentStatusUpdate(buf []byte, index uint64) interfa
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_status_update"}, time.Now())
var req structs.DeploymentStatusUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateDeploymentStatus(index, &req); err != nil {
@@ -848,7 +847,7 @@ func (n *nomadFSM) applyDeploymentPromotion(buf []byte, index uint64) interface{
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_promotion"}, time.Now())
var req structs.ApplyDeploymentPromoteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateDeploymentPromotion(index, &req); err != nil {
@@ -866,7 +865,7 @@ func (n *nomadFSM) applyDeploymentAllocHealth(buf []byte, index uint64) interfac
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_alloc_health"}, time.Now())
var req structs.ApplyDeploymentAllocHealthRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateDeploymentAllocHealth(index, &req); err != nil {
@@ -883,7 +882,7 @@ func (n *nomadFSM) applyDeploymentDelete(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_deployment_delete"}, time.Now())
var req structs.DeploymentDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteDeployment(index, req.Deployments); err != nil {
@@ -899,7 +898,7 @@ func (n *nomadFSM) applyJobStability(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_job_stability"}, time.Now())
var req structs.JobStabilityRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpdateJobStability(index, req.Namespace, req.JobID, req.JobVersion, req.Stable); err != nil {
@@ -915,7 +914,7 @@ func (n *nomadFSM) applyACLPolicyUpsert(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_policy_upsert"}, time.Now())
var req structs.ACLPolicyUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertACLPolicies(index, req.Policies); err != nil {
@@ -930,7 +929,7 @@ func (n *nomadFSM) applyACLPolicyDelete(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_policy_delete"}, time.Now())
var req structs.ACLPolicyDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteACLPolicies(index, req.Names); err != nil {
@@ -945,7 +944,7 @@ func (n *nomadFSM) applyACLTokenUpsert(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_token_upsert"}, time.Now())
var req structs.ACLTokenUpsertRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.UpsertACLTokens(index, req.Tokens); err != nil {
@@ -960,7 +959,7 @@ func (n *nomadFSM) applyACLTokenDelete(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_token_delete"}, time.Now())
var req structs.ACLTokenDeleteRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.DeleteACLTokens(index, req.AccessorIDs); err != nil {
@@ -975,7 +974,7 @@ func (n *nomadFSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{}
defer metrics.MeasureSince([]string{"nomad", "fsm", "apply_acl_token_bootstrap"}, time.Now())
var req structs.ACLTokenBootstrapRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := n.state.BootstrapACLTokens(index, req.ResetIndex, req.Token); err != nil {
@@ -988,7 +987,7 @@ func (n *nomadFSM) applyACLTokenBootstrap(buf []byte, index uint64) interface{}
func (n *nomadFSM) applyAutopilotUpdate(buf []byte, index uint64) interface{} {
var req structs.AutopilotSetConfigRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request", "error", err))
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"nomad", "fsm", "autopilot"}, time.Now())
@@ -1061,7 +1060,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
switch snapType {
case TimeTableSnapshot:
if err := n.timetable.Deserialize(dec); err != nil {
return fmt.Errorf("time table deserialize failed", "error", err)
return fmt.Errorf("time table deserialize failed: %v", err)
}
case NodeSnapshot:
@@ -1246,7 +1245,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
// remove this block.
index, err := newState.Index("job_summary")
if err != nil {
return fmt.Errorf("couldn't fetch index of job summary table", "error", err)
return fmt.Errorf("couldn't fetch index of job summary table: %v", err)
}
// If the index is 0 that means there is no job summary in the snapshot so
@@ -1258,7 +1257,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
return fmt.Errorf("unable to query latest index: %v", index)
}
if err := newState.ReconcileJobSummaries(latestIndex); err != nil {
return fmt.Errorf("error reconciling summaries", "error", err)
return fmt.Errorf("error reconciling summaries: %v", err)
}
}
@@ -1291,12 +1290,12 @@ func (n *nomadFSM) failLeakedDeployments(state *state.StateStore) error {
// and thus the older deployment leaks and then the job is removed.
iter, err := state.Deployments(nil)
if err != nil {
return fmt.Errorf("failed to query deployments", "error", err)
return fmt.Errorf("failed to query deployments: %v", err)
}
dindex, err := state.Index("deployment")
if err != nil {
return fmt.Errorf("couldn't fetch index of deployments table", "error", err)
return fmt.Errorf("couldn't fetch index of deployments table: %v", err)
}
for {
@@ -1348,7 +1347,7 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
snap, err := n.state.Snapshot()
if err != nil {
return fmt.Errorf("unable to create snapshot", "error", err)
return fmt.Errorf("unable to create snapshot: %v", err)
}
// Invoking the scheduler for every job so that we can populate the number

View File

@@ -931,7 +931,7 @@ func (s *Server) replicateACLPolicies(stopCh chan struct{}) {
},
}
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
s.logger.Debug("starting ACL policy replication from authoritative region", "authorative_region", req.Region)
s.logger.Debug("starting ACL policy replication from authoritative region", "authoritative_region", req.Region)
START:
for {
@@ -1073,7 +1073,7 @@ func (s *Server) replicateACLTokens(stopCh chan struct{}) {
},
}
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
s.logger.Debug("starting ACL token replication from authoritative region", "authorative_region", req.Region)
s.logger.Debug("starting ACL token replication from authoritative region", "authoritative_region", req.Region)
START:
for {

View File

@@ -365,7 +365,7 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error)
// Setup the Consul syncer
if err := s.setupConsulSyncer(); err != nil {
s.logger.Error("failed to create server consul syncer", "error", err)
return nil, fmt.Errorf("failed to create server Consul syncer: %v", "error", err)
return nil, fmt.Errorf("failed to create server Consul syncer: %v", err)
}
// Setup the deployment watcher.

View File

@@ -236,7 +236,7 @@ func TestPluginLoader_External_Config_Bad(t *testing.T) {
Config: map[string]interface{}{
"foo": "1",
"bar": "2",
"non-existant": "3",
"non-existent": "3",
},
},
},
@@ -244,7 +244,7 @@ func TestPluginLoader_External_Config_Bad(t *testing.T) {
_, err := NewPluginLoader(lconfig)
require.Error(err)
require.Contains(err.Error(), "No argument or block type is named \"non-existant\"")
require.Contains(err.Error(), "No argument or block type is named \"non-existent\"")
}
func TestPluginLoader_External_VersionOverlap(t *testing.T) {
@@ -452,7 +452,7 @@ func TestPluginLoader_Internal_Config_Bad(t *testing.T) {
Config: map[string]interface{}{
"foo": "1",
"bar": "2",
"non-existant": "3",
"non-existent": "3",
},
},
},
@@ -460,7 +460,7 @@ func TestPluginLoader_Internal_Config_Bad(t *testing.T) {
_, err := NewPluginLoader(lconfig)
require.Error(err)
require.Contains(err.Error(), "No argument or block type is named \"non-existant\"")
require.Contains(err.Error(), "No argument or block type is named \"non-existent\"")
}
func TestPluginLoader_InternalOverrideExternal(t *testing.T) {