diff --git a/nomad/deployment_endpoint.go b/nomad/deployment_endpoint.go index af7d82746..07e985983 100644 --- a/nomad/deployment_endpoint.go +++ b/nomad/deployment_endpoint.go @@ -14,6 +14,56 @@ type Deployment struct { srv *Server } +// TODO http endpoint and api +// List returns the list of deployments in the system +func (d *Deployment) List(args *structs.DeploymentListRequest, reply *structs.DeploymentListResponse) error { + if done, err := d.srv.forward("Deployment.List", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "deployment", "List"}, time.Now()) + + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + run: func(ws memdb.WatchSet, state *state.StateStore) error { + // Capture all the deployments + var err error + var iter memdb.ResultIterator + if prefix := args.QueryOptions.Prefix; prefix != "" { + iter, err = state.DeploymentsByIDPrefix(ws, prefix) + } else { + iter, err = state.Deployments(ws) + } + if err != nil { + return err + } + + var deploys []*structs.Deployment + for { + raw := iter.Next() + if raw == nil { + break + } + deploy := raw.(*structs.Deployment) + deploys = append(deploys, deploy) + } + reply.Deployments = deploys + + // Use the last index that affected the jobs table + index, err := state.Index("deployment") + if err != nil { + return err + } + reply.Index = index + + // Set the query response + d.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return d.srv.blockingRPC(&opts) +} + // TODO http endpoint and api // Allocations returns the list of allocations that are a part of the deployment func (d *Deployment) Allocations(args *structs.DeploymentSpecificRequest, reply *structs.AllocListResponse) error { diff --git a/nomad/deployment_endpoint_test.go b/nomad/deployment_endpoint_test.go index d03c752eb..f249940cc 100644 --- a/nomad/deployment_endpoint_test.go +++ b/nomad/deployment_endpoint_test.go @@ -10,6 +10,127 @@ import ( "github.com/hashicorp/nomad/testutil" ) +func TestDeploymentEndpoint_List(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + deployment := mock.Deployment() + state := s1.fsm.State() + + if err := state.UpsertDeployment(1000, deployment, false); err != nil { + t.Fatalf("err: %v", err) + } + + // Lookup the deployments + get := &structs.DeploymentListRequest{ + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp structs.DeploymentListResponse + if err := msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index != 1000 { + t.Fatalf("Bad index: %d %d", resp.Index, 1000) + } + + if len(resp.Deployments) != 1 { + t.Fatalf("bad: %#v", resp.Deployments) + } + if resp.Deployments[0].ID != deployment.ID { + t.Fatalf("bad: %#v", resp.Deployments[0]) + } + + // Lookup the deploys by prefix + get = &structs.DeploymentListRequest{ + QueryOptions: structs.QueryOptions{Region: "global", Prefix: deployment.ID[:4]}, + } + + var resp2 structs.DeploymentListResponse + if err := msgpackrpc.CallWithCodec(codec, "Deployment.List", get, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + if resp2.Index != 1000 { + t.Fatalf("Bad index: %d %d", resp2.Index, 1000) + } + + if len(resp2.Deployments) != 1 { + t.Fatalf("bad: %#v", resp2.Deployments) + } + if resp2.Deployments[0].ID != deployment.ID { + t.Fatalf("bad: %#v", resp2.Deployments[0]) + } +} + +func TestDeploymentEndpoint_List_Blocking(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the deployment + deployment := mock.Deployment() + + // Upsert alloc triggers watches + time.AfterFunc(100*time.Millisecond, func() { + if err := state.UpsertDeployment(3, deployment, false); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req := &structs.DeploymentListRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 1, + }, + } + start := time.Now() + var resp structs.DeploymentListResponse + if err := msgpackrpc.CallWithCodec(codec, "Deployment.List", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp.Index != 3 { + t.Fatalf("Bad index: %d %d", resp.Index, 3) + } + if len(resp.Deployments) != 1 || resp.Deployments[0].ID != deployment.ID { + t.Fatalf("bad: %#v", resp.Deployments) + } + + // Deployment updates trigger watches + deployment2 := deployment.Copy() + deployment2.Status = structs.DeploymentStatusPaused + time.AfterFunc(100*time.Millisecond, func() { + if err := state.UpsertDeployment(5, deployment2, false); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req.MinQueryIndex = 3 + start = time.Now() + var resp2 structs.DeploymentListResponse + if err := msgpackrpc.CallWithCodec(codec, "Deployment.List", req, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Since(start); elapsed < 100*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp2) + } + if resp2.Index != 5 { + t.Fatalf("Bad index: %d %d", resp2.Index, 5) + } + if len(resp2.Deployments) != 1 || resp.Deployments[0].ID != deployment2.ID || + resp2.Deployments[0].Status != structs.DeploymentStatusPaused { + t.Fatalf("bad: %#v", resp2.Deployments) + } +} + func TestDeploymentEndpoint_Allocations(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 41668ee68..fda10de47 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -312,6 +312,19 @@ func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error return iter, nil } +func (s *StateStore) DeploymentsByIDPrefix(ws memdb.WatchSet, deploymentID string) (memdb.ResultIterator, error) { + txn := s.db.Txn(false) + + // Walk the entire deployments table + iter, err := txn.Get("deployment", "id_prefix", deploymentID) + if err != nil { + return nil, err + } + + ws.Add(iter.WatchCh()) + return iter, nil +} + func (s *StateStore) DeploymentByID(ws memdb.WatchSet, deploymentID string) (*structs.Deployment, error) { txn := s.db.Txn(false) return s.deploymentByIDImpl(ws, deploymentID, txn) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index aac9f5a65..c2134c298 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -511,6 +511,92 @@ func TestStateStore_Deployments(t *testing.T) { } } +func TestStateStore_DeploymentsByIDPrefix(t *testing.T) { + state := testStateStore(t) + deploy := mock.Deployment() + + deploy.ID = "11111111-662e-d0ab-d1c9-3e434af7bdb4" + err := state.UpsertDeployment(1000, deploy, false) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create a watchset so we can test that getters don't cause it to fire + ws := memdb.NewWatchSet() + iter, err := state.DeploymentsByIDPrefix(ws, deploy.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + gatherDeploys := func(iter memdb.ResultIterator) []*structs.Deployment { + var deploys []*structs.Deployment + for { + raw := iter.Next() + if raw == nil { + break + } + deploy := raw.(*structs.Deployment) + deploys = append(deploys, deploy) + } + return deploys + } + + deploys := gatherDeploys(iter) + if len(deploys) != 1 { + t.Fatalf("err: %v", err) + } + + if watchFired(ws) { + t.Fatalf("bad") + } + + iter, err = state.DeploymentsByIDPrefix(ws, "11") + if err != nil { + t.Fatalf("err: %v", err) + } + + deploys = gatherDeploys(iter) + if len(deploys) != 1 { + t.Fatalf("err: %v", err) + } + + deploy = mock.Deployment() + deploy.ID = "11222222-662e-d0ab-d1c9-3e434af7bdb4" + err = state.UpsertDeployment(1001, deploy, false) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !watchFired(ws) { + t.Fatalf("bad") + } + + ws = memdb.NewWatchSet() + iter, err = state.DeploymentsByIDPrefix(ws, "11") + if err != nil { + t.Fatalf("err: %v", err) + } + + deploys = gatherDeploys(iter) + if len(deploys) != 2 { + t.Fatalf("err: %v", err) + } + + iter, err = state.DeploymentsByIDPrefix(ws, "1111") + if err != nil { + t.Fatalf("err: %v", err) + } + + deploys = gatherDeploys(iter) + if len(deploys) != 1 { + t.Fatalf("err: %v", err) + } + + if watchFired(ws) { + t.Fatalf("bad") + } +} + func TestStateStore_UpsertNode_Node(t *testing.T) { state := testStateStore(t) node := mock.Node() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index fdde1e2c4..09e19b2d7 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -492,6 +492,11 @@ type GenericRequest struct { QueryOptions } +// DeploymentListRequest is used to list the deployments +type DeploymentListRequest struct { + QueryOptions +} + // DeploymentStatusUpdateRequest is used to update the status of a deployment as // well as optionally creating an evaluation atomically. type DeploymentStatusUpdateRequest struct { @@ -791,6 +796,12 @@ type AllocListResponse struct { QueryMeta } +// DeploymentListResponse is used for a list request +type DeploymentListResponse struct { + Deployments []*Deployment + QueryMeta +} + // EvalListResponse is used for a list request type EvalListResponse struct { Evaluations []*Evaluation