Server stat/list impl

Alex Dadgar
2018-02-01 11:28:52 -08:00
parent 3c689ba97f
commit a1eff9dda6
3 changed files with 623 additions and 5 deletions

View File

@@ -17,7 +17,6 @@ import (
)
// TODO a Single RPC for "Cat", "ReadAt", "Stream" endpoints
// TODO all the non-streaming RPC endpoints
// FileSystem endpoint is used for accessing the logs and filesystem of
// allocations from a Node.
@@ -25,8 +24,8 @@ type FileSystem struct {
srv *Server
}
func (f *FileSystem) Register() {
f.srv.streamingRpcs.Register("FileSystem.Logs", f.Logs)
func (f *FileSystem) register() {
f.srv.streamingRpcs.Register("FileSystem.Logs", f.logs)
}
func (f *FileSystem) handleStreamResultError(err error, code *int64, encoder *codec.Encoder) {
@@ -41,8 +40,144 @@ func (f *FileSystem) handleStreamResultError(err error, code *int64, encoder *co
})
}
// List is used to list the contents of an allocation's directory.
func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListResponse) error {
// We only allow stale reads since the only potentially stale information is
// the Node registration and the cost is fairly high for adding another hop
// in the forwarding chain.
args.QueryOptions.AllowStale = true
// Potentially forward to a different region.
if done, err := f.srv.forward("FileSystem.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "file_system", "list"}, time.Now())
// Check filesystem read permissions
if aclObj, err := f.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) {
return structs.ErrPermissionDenied
}
// Verify the arguments.
if args.AllocID == "" {
return errors.New("missing allocation ID")
}
// Lookup the allocation
snap, err := f.srv.State().Snapshot()
if err != nil {
return err
}
alloc, err := snap.AllocByID(nil, args.AllocID)
if err != nil {
return err
}
if alloc == nil {
return fmt.Errorf("unknown allocation %q", args.AllocID)
}
// Get the connection to the client
state, ok := f.srv.getNodeConn(alloc.NodeID)
if !ok {
node, err := snap.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("Unknown node %q", alloc.NodeID)
}
// Determine the Server that has a connection to the node.
srv, err := f.srv.serverWithNodeConn(alloc.NodeID, f.srv.Region())
if err != nil {
return err
}
if srv == nil {
return structs.ErrNoNodeConn
}
return f.srv.forwardServer(srv, "FileSystem.List", args, reply)
}
// Make the RPC
return NodeRpc(state.Session, "FileSystem.List", args, reply)
}
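
For orientation, here is a minimal sketch of the wire types exercised by List, reconstructed from their usage in this diff (AllocID, Path and QueryOptions on the request, Files on the response). The canonical definitions live in the client structs package and embed the usual QueryOptions/QueryMeta, which are elided here, and the AllocFileInfo field set is an assumption for illustration only. Stat below follows the same pattern with a single Info entry instead of a slice.

package sketch

import "time"

// AllocFileInfo stands in for the per-entry metadata; field set assumed.
type AllocFileInfo struct {
	Name    string
	IsDir   bool
	Size    int64
	ModTime time.Time
}

// FsListRequest names the allocation and the path within its directory to
// list, e.g. Path: "/". The real type also embeds structs.QueryOptions.
type FsListRequest struct {
	AllocID string
	Path    string
}

// FsListResponse carries one entry per file or directory under Path.
type FsListResponse struct {
	Files []*AllocFileInfo
}
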
// Stat is used to stat a file in the allocation's directory.
func (f *FileSystem) Stat(args *cstructs.FsStatRequest, reply *cstructs.FsStatResponse) error {
// We only allow stale reads since the only potentially stale information is
// the Node registration and the cost is fairly high for adding another hop
// in the forwarding chain.
args.QueryOptions.AllowStale = true
// Potentially forward to a different region.
if done, err := f.srv.forward("FileSystem.Stat", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "file_system", "stat"}, time.Now())
// Check filesystem read permissions
if aclObj, err := f.srv.ResolveToken(args.AuthToken); err != nil {
return err
} else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityReadFS) {
return structs.ErrPermissionDenied
}
// Verify the arguments.
if args.AllocID == "" {
return errors.New("missing allocation ID")
}
// Lookup the allocation
snap, err := f.srv.State().Snapshot()
if err != nil {
return err
}
alloc, err := snap.AllocByID(nil, args.AllocID)
if err != nil {
return err
}
if alloc == nil {
return fmt.Errorf("unknown allocation %q", args.AllocID)
}
// Get the connection to the client
state, ok := f.srv.getNodeConn(alloc.NodeID)
if !ok {
node, err := snap.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("Unknown node %q", alloc.NodeID)
}
// Determine the Server that has a connection to the node.
srv, err := f.srv.serverWithNodeConn(alloc.NodeID, f.srv.Region())
if err != nil {
return err
}
if srv == nil {
return structs.ErrNoNodeConn
}
return f.srv.forwardServer(srv, "FileSystem.Stat", args, reply)
}
// Make the RPC
return NodeRpc(state.Session, "FileSystem.Stat", args, reply)
}
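
List and Stat share the same resolution sequence: look the allocation up in a state snapshot, use a direct node connection if this server holds one, otherwise forward to the server that does. A possible follow-up, not part of this commit, would fold that into a single helper; the sketch below uses only the calls already exercised above.

// nodeRPCForAlloc is a hypothetical helper (not in this commit) that
// resolves an allocation to its node and issues the given RPC either
// directly over the node connection or via the server that holds one.
func (f *FileSystem) nodeRPCForAlloc(allocID, method string, args, reply interface{}) error {
	snap, err := f.srv.State().Snapshot()
	if err != nil {
		return err
	}

	alloc, err := snap.AllocByID(nil, allocID)
	if err != nil {
		return err
	}
	if alloc == nil {
		return fmt.Errorf("unknown allocation %q", allocID)
	}

	// Direct connection to the node from this server.
	if state, ok := f.srv.getNodeConn(alloc.NodeID); ok {
		return NodeRpc(state.Session, method, args, reply)
	}

	// No local connection: verify the node exists, then forward to the
	// server that does have a connection to it.
	node, err := snap.NodeByID(nil, alloc.NodeID)
	if err != nil {
		return err
	}
	if node == nil {
		return fmt.Errorf("unknown node %q", alloc.NodeID)
	}

	srv, err := f.srv.serverWithNodeConn(alloc.NodeID, f.srv.Region())
	if err != nil {
		return err
	}
	if srv == nil {
		return structs.ErrNoNodeConn
	}
	return f.srv.forwardServer(srv, method, args, reply)
}
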
// logs is used to stream an allocation's logs.
func (f *FileSystem) Logs(conn io.ReadWriteCloser) {
func (f *FileSystem) logs(conn io.ReadWriteCloser) {
defer conn.Close()
defer metrics.MeasureSince([]string{"nomad", "file_system", "logs"}, time.Now())

View File

@@ -8,6 +8,7 @@ import (
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
@@ -20,6 +21,482 @@ import (
codec "github.com/ugorji/go/codec"
)
func TestClientFS_List_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s := TestServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
// Force an allocation onto the node
a := mock.Alloc()
a.Job.Type = structs.JobTypeBatch
a.NodeID = c.NodeID()
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
}
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
// Upsert the allocation
state := s.State()
require.Nil(state.UpsertJob(999, a.Job))
require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state.AllocByID(nil, a.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
if alloc.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
})
// Make the request without having a node-id
req := &cstructs.FsListRequest{
Path: "/",
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp cstructs.FsListResponse
err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), "missing")
// Fetch the response setting the alloc id
req.AllocID = a.ID
var resp2 cstructs.FsListResponse
err = msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp2)
require.Nil(err)
require.NotEmpty(resp2.Files)
}
func TestClientFS_List_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server
s, root := TestACLServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Create a bad token
policyBad := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDeny})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS})
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
cases := []struct {
Name string
Token string
ExpectedError string
}{
{
Name: "bad token",
Token: tokenBad.SecretID,
ExpectedError: structs.ErrPermissionDenied.Error(),
},
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "unknown allocation",
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "unknown allocation",
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
// Make the request
req := &cstructs.FsListRequest{
AllocID: uuid.Generate(),
Path: "/",
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
AuthToken: c.Token,
},
}
// Fetch the response
var resp cstructs.FsListResponse
err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), c.ExpectedError)
})
}
}
func TestClientFS_List_Remote(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s1)
c := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
})
defer c.Shutdown()
// Force an allocation onto the node
a := mock.Alloc()
a.Job.Type = structs.JobTypeBatch
a.NodeID = c.NodeID()
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
}
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
// Upsert the allocation
state1 := s1.State()
state2 := s2.State()
require.Nil(state1.UpsertJob(999, a.Job))
require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a}))
require.Nil(state2.UpsertJob(999, a.Job))
require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a}))
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state2.AllocByID(nil, a.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
if alloc.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
})
// Force remove the connection locally in case it exists
s1.nodeConnsLock.Lock()
delete(s1.nodeConns, c.NodeID())
s1.nodeConnsLock.Unlock()
// Make the request without having a node-id
req := &cstructs.FsListRequest{
AllocID: a.ID,
Path: "/",
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp cstructs.FsListResponse
err := msgpackrpc.CallWithCodec(codec, "FileSystem.List", req, &resp)
require.Nil(err)
require.NotEmpty(resp.Files)
}
func TestClientFS_Stat_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s := TestServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer c.Shutdown()
// Force an allocation onto the node
a := mock.Alloc()
a.Job.Type = structs.JobTypeBatch
a.NodeID = c.NodeID()
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
}
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
// Upsert the allocation
state := s.State()
require.Nil(state.UpsertJob(999, a.Job))
require.Nil(state.UpsertAllocs(1003, []*structs.Allocation{a}))
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state.AllocByID(nil, a.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
if alloc.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
})
// Make the request without having a node-id
req := &cstructs.FsStatRequest{
Path: "/",
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp cstructs.FsStatResponse
err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), "missing")
// Fetch the response setting the alloc id
req.AllocID = a.ID
var resp2 cstructs.FsStatResponse
err = msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp2)
require.Nil(err)
require.NotNil(resp2.Info)
}
func TestClientFS_Stat_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server
s, root := TestACLServer(t, nil)
defer s.Shutdown()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Create a bad token
policyBad := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDeny})
tokenBad := mock.CreatePolicyAndToken(t, s.State(), 1005, "invalid", policyBad)
policyGood := mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityReadFS})
tokenGood := mock.CreatePolicyAndToken(t, s.State(), 1009, "valid2", policyGood)
cases := []struct {
Name string
Token string
ExpectedError string
}{
{
Name: "bad token",
Token: tokenBad.SecretID,
ExpectedError: structs.ErrPermissionDenied.Error(),
},
{
Name: "good token",
Token: tokenGood.SecretID,
ExpectedError: "unknown allocation",
},
{
Name: "root token",
Token: root.SecretID,
ExpectedError: "unknown allocation",
},
}
for _, c := range cases {
t.Run(c.Name, func(t *testing.T) {
// Make the request
req := &cstructs.FsStatRequest{
AllocID: uuid.Generate(),
Path: "/",
QueryOptions: structs.QueryOptions{
Region: "global",
Namespace: structs.DefaultNamespace,
AuthToken: c.Token,
},
}
// Fetch the response
var resp cstructs.FsStatResponse
err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
require.NotNil(err)
require.Contains(err.Error(), c.ExpectedError)
})
}
}
func TestClientFS_Stat_Remote(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s1 := TestServer(t, nil)
defer s1.Shutdown()
s2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s1)
c := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
})
defer c.Shutdown()
// Force an allocation onto the node
a := mock.Alloc()
a.Job.Type = structs.JobTypeBatch
a.NodeID = c.NodeID()
a.Job.TaskGroups[0].Count = 1
a.Job.TaskGroups[0].Tasks[0] = &structs.Task{
Name: "web",
Driver: "mock_driver",
Config: map[string]interface{}{
"run_for": "2s",
},
LogConfig: structs.DefaultLogConfig(),
Resources: &structs.Resources{
CPU: 500,
MemoryMB: 256,
},
}
// Wait for the client to connect
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
t.Fatalf("should have a clients")
})
// Upsert the allocation
state1 := s1.State()
state2 := s2.State()
require.Nil(state1.UpsertJob(999, a.Job))
require.Nil(state1.UpsertAllocs(1003, []*structs.Allocation{a}))
require.Nil(state2.UpsertJob(999, a.Job))
require.Nil(state2.UpsertAllocs(1003, []*structs.Allocation{a}))
// Wait for the client to run the allocation
testutil.WaitForResult(func() (bool, error) {
alloc, err := state2.AllocByID(nil, a.ID)
if err != nil {
return false, err
}
if alloc == nil {
return false, fmt.Errorf("unknown alloc")
}
if alloc.ClientStatus != structs.AllocClientStatusComplete {
return false, fmt.Errorf("alloc client status: %v", alloc.ClientStatus)
}
return true, nil
}, func(err error) {
t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
})
// Force remove the connection locally in case it exists
s1.nodeConnsLock.Lock()
delete(s1.nodeConns, c.NodeID())
s1.nodeConnsLock.Unlock()
// Make the request without having a node-id
req := &cstructs.FsStatRequest{
AllocID: a.ID,
Path: "/",
QueryOptions: structs.QueryOptions{Region: "global"},
}
// Fetch the response
var resp cstructs.FsStatResponse
err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
require.Nil(err)
require.NotNil(resp.Info)
}
func TestClientFS_Logs_NoAlloc(t *testing.T) {
t.Parallel()
require := require.New(t)
@@ -693,6 +1170,11 @@ func TestClientFS_Logs_Remote_Region(t *testing.T) {
t.Fatalf("Alloc on node %q not finished: %v", c.NodeID(), err)
})
// Force remove the connection locally in case it exists
s1.nodeConnsLock.Lock()
delete(s1.nodeConns, c.NodeID())
s1.nodeConnsLock.Unlock()
// Make the request
req := &cstructs.FsLogsRequest{
AllocID: a.ID,

View File

@@ -930,7 +930,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
// Streaming endpoints
s.staticEndpoints.FileSystem = &FileSystem{s}
s.staticEndpoints.FileSystem.Register()
s.staticEndpoints.FileSystem.register()
}
// Register the static handlers
@@ -948,6 +948,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
server.Register(s.staticEndpoints.Search)
s.staticEndpoints.Enterprise.Register(server)
server.Register(s.staticEndpoints.ClientStats)
server.Register(s.staticEndpoints.FileSystem)
// Create new dynamic endpoints and add them to the RPC server.
node := &Node{srv: s, ctx: ctx}