From a1eff9dda603919fbe76d461a1b924ca3b64fab1 Mon Sep 17 00:00:00 2001
From: Alex Dadgar
Date: Thu, 1 Feb 2018 11:28:52 -0800
Subject: [PATCH] Server stat/list impl

---
 nomad/client_fs_endpoint.go      | 143 ++++++++-
 nomad/client_fs_endpoint_test.go | 482 +++++++++++++++++++++++++++++++
 nomad/server.go                  |   3 +-
 3 files changed, 623 insertions(+), 5 deletions(-)

diff --git a/nomad/client_fs_endpoint.go b/nomad/client_fs_endpoint.go
index 6d5081f16..8a105793a 100644
--- a/nomad/client_fs_endpoint.go
+++ b/nomad/client_fs_endpoint.go
@@ -17,7 +17,6 @@ import (
 )
 
 // TODO a Single RPC for "Cat", "ReadAt", "Stream" endpoints
-// TODO all the non-streaming RPC endpoints
 
 // FileSystem endpoint is used for accessing the logs and filesystem of
 // allocations from a Node.
@@ -25,8 +24,8 @@ type FileSystem struct {
 	srv *Server
 }
 
-func (f *FileSystem) Register() {
-	f.srv.streamingRpcs.Register("FileSystem.Logs", f.Logs)
+func (f *FileSystem) register() {
+	f.srv.streamingRpcs.Register("FileSystem.Logs", f.logs)
 }
 
 func (f *FileSystem) handleStreamResultError(err error, code *int64, encoder *codec.Encoder) {
@@ -41,8 +40,144 @@ func (f *FileSystem) handleStreamResultError(err error, code *int64, encoder *co
 	})
 }
 
+// List is used to list the contents of an allocation's directory.
+func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListResponse) error {
+	// We only allow stale reads since the only potentially stale information is
+	// the Node registration and the cost is fairly high for adding another hop
+	// in the forwarding chain.
+	args.QueryOptions.AllowStale = true
+
+	// Potentially forward to a different region.
+	if done, err := f.srv.forward("FileSystem.List", args, args, reply); done {
+		return err
+	}
+	defer metrics.MeasureSince([]string{"nomad", "file_system", "list"}, time.Now())
+
+	// Check filesystem read permissions
+	if aclObj, err := f.srv.ResolveToken(args.AuthToken); err != nil {
+		return err
+	} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) {
+		return structs.ErrPermissionDenied
+	}
+
+	// Verify the arguments.
+	if args.AllocID == "" {
+		return errors.New("missing allocation ID")
+	}
+
+	// Look up the allocation
+	snap, err := f.srv.State().Snapshot()
+	if err != nil {
+		return err
+	}
+
+	alloc, err := snap.AllocByID(nil, args.AllocID)
+	if err != nil {
+		return err
+	}
+	if alloc == nil {
+		return fmt.Errorf("unknown allocation %q", args.AllocID)
+	}
+
+	// Get the connection to the client
+	state, ok := f.srv.getNodeConn(alloc.NodeID)
+	if !ok {
+		node, err := snap.NodeByID(nil, alloc.NodeID)
+		if err != nil {
+			return err
+		}
+
+		if node == nil {
+			return fmt.Errorf("unknown node %q", alloc.NodeID)
+		}
+
+		// Determine the Server that has a connection to the node.
+		srv, err := f.srv.serverWithNodeConn(alloc.NodeID, f.srv.Region())
+		if err != nil {
+			return err
+		}
+
+		if srv == nil {
+			return structs.ErrNoNodeConn
+		}
+
+		return f.srv.forwardServer(srv, "FileSystem.List", args, reply)
+	}
+
+	// Make the RPC
+	return NodeRpc(state.Session, "FileSystem.List", args, reply)
+}
+
+// Stat is used to stat a file in the allocation's directory.
+func (f *FileSystem) Stat(args *cstructs.FsStatRequest, reply *cstructs.FsStatResponse) error {
+	// We only allow stale reads since the only potentially stale information is
+	// the Node registration and the cost is fairly high for adding another hop
+	// in the forwarding chain.
+	args.QueryOptions.AllowStale = true
+
+	// Potentially forward to a different region.
+	if done, err := f.srv.forward("FileSystem.Stat", args, args, reply); done {
+		return err
+	}
+	defer metrics.MeasureSince([]string{"nomad", "file_system", "stat"}, time.Now())
+
+	// Check filesystem read permissions
+	if aclObj, err := f.srv.ResolveToken(args.AuthToken); err != nil {
+		return err
+	} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) {
+		return structs.ErrPermissionDenied
+	}
+
+	// Verify the arguments.
+	if args.AllocID == "" {
+		return errors.New("missing allocation ID")
+	}
+
+	// Look up the allocation
+	snap, err := f.srv.State().Snapshot()
+	if err != nil {
+		return err
+	}
+
+	alloc, err := snap.AllocByID(nil, args.AllocID)
+	if err != nil {
+		return err
+	}
+	if alloc == nil {
+		return fmt.Errorf("unknown allocation %q", args.AllocID)
+	}
+
+	// Get the connection to the client
+	state, ok := f.srv.getNodeConn(alloc.NodeID)
+	if !ok {
+		node, err := snap.NodeByID(nil, alloc.NodeID)
+		if err != nil {
+			return err
+		}
+
+		if node == nil {
+			return fmt.Errorf("unknown node %q", alloc.NodeID)
+		}
+
+		// Determine the Server that has a connection to the node.
+		srv, err := f.srv.serverWithNodeConn(alloc.NodeID, f.srv.Region())
+		if err != nil {
+			return err
+		}
+
+		if srv == nil {
+			return structs.ErrNoNodeConn
+		}
+
+		return f.srv.forwardServer(srv, "FileSystem.Stat", args, reply)
+	}
+
+	// Make the RPC
+	return NodeRpc(state.Session, "FileSystem.Stat", args, reply)
+}
+
-// Stats is used to retrieve the Clients stats.
-func (f *FileSystem) Logs(conn io.ReadWriteCloser) {
+// logs is used to stream the logs of an allocation's task.
+func (f *FileSystem) logs(conn io.ReadWriteCloser) {
 	defer conn.Close()
 	defer metrics.MeasureSince([]string{"nomad", "file_system", "logs"}, time.Now())
 
diff --git a/nomad/client_fs_endpoint_test.go b/nomad/client_fs_endpoint_test.go
index 80d7cb4e5..a1c10ebff 100644
--- a/nomad/client_fs_endpoint_test.go
+++ b/nomad/client_fs_endpoint_test.go
@@ -8,6 +8,7 @@ import (
 	"testing"
 	"time"
 
+	msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
 	"github.com/hashicorp/nomad/acl"
 	"github.com/hashicorp/nomad/client"
 	"github.com/hashicorp/nomad/client/config"
@@ -20,6 +21,482 @@ import (
 	codec "github.com/ugorji/go/codec"
 )
 
+func TestClientFS_List_Local(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	// Start a server and client
+	s := TestServer(t, nil)
+	defer s.Shutdown()
+	codec := rpcClient(t, s)
+	testutil.WaitForLeader(t, s.RPC)
+
+	c := client.TestClient(t, func(c *config.Config) {
+		c.Servers = []string{s.config.RPCAddr.String()}
+	})
+	defer c.Shutdown()
+
+	// Force an allocation onto the node
+	a := mock.Alloc()
+	a.Job.Type = structs.JobTypeBatch
+	a.NodeID = c.NodeID()
+	a.Job.TaskGroups[0].Count = 1
+	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
+		Name:   "web",
+		Driver: "mock_driver",
+		Config: map[string]interface{}{
+			"run_for": "2s",
+		},
+		LogConfig: structs.DefaultLogConfig(),
+		Resources: &structs.Resources{
+			CPU:      500,
+			MemoryMB: 256,
+		},
+	}
+
+	// Wait for the client to connect
+	testutil.WaitForResult(func() (bool, error) {
+		nodes := s.connectedNodes()
+		return len(nodes) == 1, nil
+	}, func(err error) {
+		t.Fatalf("should have a client")
+	})
+
+	// Upsert the allocation
+	state := s.State()
+	require.Nil(state.UpsertJob(999, a.Job))
+	require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
+
+	// Wait for the client to run the allocation
+	testutil.WaitForResult(func() (bool, error) {
+		alloc, err := state.AllocByID(nil, a.ID)
+		if err != nil {
+			return false, err
+		}
+		if alloc == nil {
+			return false, fmt.Errorf("unknown alloc")
fmt.Errorf("unknown alloc") + } + if alloc.ClientStatus != structs.AllocClientStatusComplete { + return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus) + } + + return true, nil + }, func(err error) { + t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err) + }) + + // Make the request without having a node-id + req := &cstructs.FsListRequest{ + Path: "/", + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Fetch the response + var resp cstructs.FsListResponse + err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp) + require.NotNil(err) + require.Contains(err.Error(), "missing") + + // Fetch the response setting the alloc id + req.AllocID = a.ID + var resp2 cstructs.FsListResponse + err = msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp2) + require.Nil(err) + require.NotEmpty(resp2.Files) +} + +func TestClientFS_List_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server + s, root := TestACLServer(t, nil) + defer s.Shutdown() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Create a bad token + policyBad := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDeny}) + tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad) + + policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS}) + tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood) + + cases := []struct { + Name string + Token string + ExpectedError string + }{ + { + Name: "bad token", + Token: tokenBad.SecretID, + ExpectedError: structs.ErrPermissionDenied.Error(), + }, + { + Name: "good token", + Token: tokenGood.SecretID, + ExpectedError: "unknown allocation", + }, + { + Name: "root token", + Token: root.SecretID, + ExpectedError: "unknown allocation", + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + + // Make the request + req := &cstructs.FsListRequest{ + AllocID: uuid.Generate(), + Path: "/", + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: structs.DefaultNamespace, + AuthToken: c.Token, + }, + } + + // Fetch the response + var resp cstructs.FsListResponse + err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp) + require.NotNil(err) + require.Contains(err.Error(), c.ExpectedError) + }) + } +} + +func TestClientFS_List_Remote(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and client + s1 := TestServer(t, nil) + defer s1.Shutdown() + s2 := TestServer(t, func(c *Config) { + c.DevDisableBootstrap = true + }) + defer s2.Shutdown() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + codec := rpcClient(t, s2) + + c := client.TestClient(t, func(c *config.Config) { + c.Servers = []string{s2.config.RPCAddr.String()} + }) + defer c.Shutdown() + + // Force an allocation onto the node + a := mock.Alloc() + a.Job.Type = structs.JobTypeBatch + a.NodeID = c.NodeID() + a.Job.TaskGroups[0].Count = 1 + a.Job.TaskGroups[0].Tasks[0] = &structs.Task{ + Name: "web", + Driver: "mock_driver", + Config: map[string]interface{}{ + "run_for": "2s", + }, + LogConfig: structs.DefaultLogConfig(), + Resources: &structs.Resources{ + CPU: 500, + MemoryMB: 256, + }, + } + + // Wait for the client to connect + testutil.WaitForResult(func() (bool, error) { + nodes := s2.connectedNodes() + return len(nodes) == 1, nil + }, func(err error) { + t.Fatalf("should have a clients") + }) 
+
+	// Upsert the allocation
+	state1 := s1.State()
+	state2 := s2.State()
+	require.Nil(state1.UpsertJob(999, a.Job))
+	require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a}))
+	require.Nil(state2.UpsertJob(999, a.Job))
+	require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a}))
+
+	// Wait for the client to run the allocation
+	testutil.WaitForResult(func() (bool, error) {
+		alloc, err := state2.AllocByID(nil, a.ID)
+		if err != nil {
+			return false, err
+		}
+		if alloc == nil {
+			return false, fmt.Errorf("unknown alloc")
+		}
+		if alloc.ClientStatus != structs.AllocClientStatusComplete {
+			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
+		}
+
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
+	})
+
+	// Force remove the connection locally in case it exists
+	s1.nodeConnsLock.Lock()
+	delete(s1.nodeConns, c.NodeID())
+	s1.nodeConnsLock.Unlock()
+
+	// Make the request
+	req := &cstructs.FsListRequest{
+		AllocID:      a.ID,
+		Path:         "/",
+		QueryOptions: structs.QueryOptions{Region: "global"},
+	}
+
+	// Fetch the response
+	var resp cstructs.FsListResponse
+	err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp)
+	require.Nil(err)
+	require.NotEmpty(resp.Files)
+}
+
+func TestClientFS_Stat_Local(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	// Start a server and client
+	s := TestServer(t, nil)
+	defer s.Shutdown()
+	codec := rpcClient(t, s)
+	testutil.WaitForLeader(t, s.RPC)
+
+	c := client.TestClient(t, func(c *config.Config) {
+		c.Servers = []string{s.config.RPCAddr.String()}
+	})
+	defer c.Shutdown()
+
+	// Force an allocation onto the node
+	a := mock.Alloc()
+	a.Job.Type = structs.JobTypeBatch
+	a.NodeID = c.NodeID()
+	a.Job.TaskGroups[0].Count = 1
+	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
+		Name:   "web",
+		Driver: "mock_driver",
+		Config: map[string]interface{}{
+			"run_for": "2s",
+		},
+		LogConfig: structs.DefaultLogConfig(),
+		Resources: &structs.Resources{
+			CPU:      500,
+			MemoryMB: 256,
+		},
+	}
+
+	// Wait for the client to connect
+	testutil.WaitForResult(func() (bool, error) {
+		nodes := s.connectedNodes()
+		return len(nodes) == 1, nil
+	}, func(err error) {
+		t.Fatalf("should have a client")
+	})
+
+	// Upsert the allocation
+	state := s.State()
+	require.Nil(state.UpsertJob(999, a.Job))
+	require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
+
+	// Wait for the client to run the allocation
+	testutil.WaitForResult(func() (bool, error) {
+		alloc, err := state.AllocByID(nil, a.ID)
+		if err != nil {
+			return false, err
+		}
+		if alloc == nil {
+			return false, fmt.Errorf("unknown alloc")
+		}
+		if alloc.ClientStatus != structs.AllocClientStatusComplete {
+			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
+		}
+
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
+	})
+
+	// Make the request without an alloc ID
+	req := &cstructs.FsStatRequest{
+		Path:         "/",
+		QueryOptions: structs.QueryOptions{Region: "global"},
+	}
+
+	// Fetch the response
+	var resp cstructs.FsStatResponse
+	err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
+	require.NotNil(err)
+	require.Contains(err.Error(), "missing")
+
+	// Fetch the response setting the alloc ID
+	req.AllocID = a.ID
+	var resp2 cstructs.FsStatResponse
+	err = msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp2)
+	require.Nil(err)
+	require.NotNil(resp2.Info)
+}
+
+func TestClientFS_Stat_ACL(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	// Start a server
+	s, root := TestACLServer(t, nil)
+	defer s.Shutdown()
+	codec := rpcClient(t, s)
+	testutil.WaitForLeader(t, s.RPC)
+
+	// Create bad and good tokens
+	policyBad := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDeny})
+	tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
+
+	policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS})
+	tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
+
+	cases := []struct {
+		Name          string
+		Token         string
+		ExpectedError string
+	}{
+		{
+			Name:          "bad token",
+			Token:         tokenBad.SecretID,
+			ExpectedError: structs.ErrPermissionDenied.Error(),
+		},
+		{
+			Name:          "good token",
+			Token:         tokenGood.SecretID,
+			ExpectedError: "unknown allocation",
+		},
+		{
+			Name:          "root token",
+			Token:         root.SecretID,
+			ExpectedError: "unknown allocation",
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.Name, func(t *testing.T) {
+
+			// Make the request
+			req := &cstructs.FsStatRequest{
+				AllocID: uuid.Generate(),
+				Path:    "/",
+				QueryOptions: structs.QueryOptions{
+					Region:    "global",
+					Namespace: structs.DefaultNamespace,
+					AuthToken: c.Token,
+				},
+			}
+
+			// Fetch the response
+			var resp cstructs.FsStatResponse
+			err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
+			require.NotNil(err)
+			require.Contains(err.Error(), c.ExpectedError)
+		})
+	}
+}
+
+func TestClientFS_Stat_Remote(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	// Start a server and client
+	s1 := TestServer(t, nil)
+	defer s1.Shutdown()
+	s2 := TestServer(t, func(c *Config) {
+		c.DevDisableBootstrap = true
+	})
+	defer s2.Shutdown()
+	TestJoin(t, s1, s2)
+	testutil.WaitForLeader(t, s1.RPC)
+	testutil.WaitForLeader(t, s2.RPC)
+	codec := rpcClient(t, s2)
+
+	c := client.TestClient(t, func(c *config.Config) {
+		c.Servers = []string{s2.config.RPCAddr.String()}
+	})
+	defer c.Shutdown()
+
+	// Force an allocation onto the node
+	a := mock.Alloc()
+	a.Job.Type = structs.JobTypeBatch
+	a.NodeID = c.NodeID()
+	a.Job.TaskGroups[0].Count = 1
+	a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
+		Name:   "web",
+		Driver: "mock_driver",
+		Config: map[string]interface{}{
+			"run_for": "2s",
+		},
+		LogConfig: structs.DefaultLogConfig(),
+		Resources: &structs.Resources{
+			CPU:      500,
+			MemoryMB: 256,
+		},
+	}
+
+	// Wait for the client to connect
+	testutil.WaitForResult(func() (bool, error) {
+		nodes := s2.connectedNodes()
+		return len(nodes) == 1, nil
+	}, func(err error) {
+		t.Fatalf("should have a client")
+	})
+
+	// Upsert the allocation
+	state1 := s1.State()
+	state2 := s2.State()
+	require.Nil(state1.UpsertJob(999, a.Job))
+	require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a}))
+	require.Nil(state2.UpsertJob(999, a.Job))
+	require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a}))
+
+	// Wait for the client to run the allocation
+	testutil.WaitForResult(func() (bool, error) {
+		alloc, err := state2.AllocByID(nil, a.ID)
+		if err != nil {
+			return false, err
+		}
+		if alloc == nil {
+			return false, fmt.Errorf("unknown alloc")
+		}
+		if alloc.ClientStatus != structs.AllocClientStatusComplete {
+			return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
+		}
+
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
+	})
+
+	// Force remove the connection locally in case it exists
+	s1.nodeConnsLock.Lock()
+	delete(s1.nodeConns, c.NodeID())
+	s1.nodeConnsLock.Unlock()
+
+	// Make the request
+	req := &cstructs.FsStatRequest{
+		AllocID:      a.ID,
+		Path:         "/",
+		QueryOptions: structs.QueryOptions{Region: "global"},
+	}
+
+	// Fetch the response
+	var resp cstructs.FsStatResponse
+	err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
+	require.Nil(err)
+	require.NotNil(resp.Info)
+}
+
 func TestClientFS_Logs_NoAlloc(t *testing.T) {
 	t.Parallel()
 	require := require.New(t)
@@ -693,6 +1170,11 @@ func TestClientFS_Logs_Remote_Region(t *testing.T) {
 		t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
 	})
 
+	// Force remove the connection locally in case it exists
+	s1.nodeConnsLock.Lock()
+	delete(s1.nodeConns, c.NodeID())
+	s1.nodeConnsLock.Unlock()
+
 	// Make the request
 	req := &cstructs.FsLogsRequest{
 		AllocID: a.ID,
diff --git a/nomad/server.go b/nomad/server.go
index 704e50a3b..99fcc0ba3 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -930,7 +930,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
 
 		// Streaming endpoints
 		s.staticEndpoints.FileSystem = &FileSystem{s}
-		s.staticEndpoints.FileSystem.Register()
+		s.staticEndpoints.FileSystem.register()
 	}
 
 	// Register the static handlers
@@ -948,6 +948,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
 	server.Register(s.staticEndpoints.Search)
 	s.staticEndpoints.Enterprise.Register(server)
 	server.Register(s.staticEndpoints.ClientStats)
+	server.Register(s.staticEndpoints.FileSystem)
 
 	// Create new dynamic endpoints and add them to the RPC server.
 	node := &Node{srv: s, ctx: ctx}