Merge pull request #6423 from hashicorp/b-direct-node-failure

cli: recover from client ACL lookup failures
This commit is contained in:
Mahmood Ali
2019-10-15 17:09:59 -04:00
committed by GitHub
2 changed files with 69 additions and 129 deletions

View File

@@ -210,12 +210,7 @@ func (a *Allocations) Exec(ctx context.Context,
func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task string, tty bool, command []string,
errCh chan<- error, q *QueryOptions) (sendFn func(*ExecStreamingInput) error, output <-chan *ExecStreamingOutput) {
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
errCh <- err
return nil, nil
}
nodeClient, _ := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if q == nil {
q = &QueryOptions{}
@@ -236,15 +231,17 @@ func (a *Allocations) execFrames(ctx context.Context, alloc *Allocation, task st
reqPath := fmt.Sprintf("/v1/client/allocation/%s/exec", alloc.ID)
conn, _, err := nodeClient.websocket(reqPath, q)
if err != nil {
// There was an error talking directly to the client. Non-network
// errors are fatal, but network errors can attempt to route via RPC.
if _, ok := err.(net.Error); !ok {
var conn *websocket.Conn
if nodeClient != nil {
conn, _, err = nodeClient.websocket(reqPath, q)
if _, ok := err.(net.Error); err != nil && !ok {
errCh <- err
return nil, nil
}
}
if conn == nil {
conn, _, err = a.client.websocket(reqPath, q)
if err != nil {
errCh <- err

179
api/fs.go
View File

@@ -92,72 +92,24 @@ func (a *AllocFS) Stat(alloc *Allocation, path string, q *QueryOptions) (*AllocF
// ReadAt is used to read bytes at a given offset until limit at the given path
// in an allocation directory. If limit is <= 0, there is no limit.
func (a *AllocFS) ReadAt(alloc *Allocation, path string, offset int64, limit int64, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}
if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)
reqPath := fmt.Sprintf("/v1/client/fs/readat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}
// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}
return r, nil
return queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["limit"] = strconv.FormatInt(limit, 10)
})
}
// Cat is used to read contents of a file at the given path in an allocation
// directory
func (a *AllocFS) Cat(alloc *Allocation, path string, q *QueryOptions) (io.ReadCloser, error) {
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if err != nil {
return nil, err
}
if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
q.Params["path"] = path
reqPath := fmt.Sprintf("/v1/client/fs/cat/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
return nil, err
}
// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
return nil, err
}
}
return r, nil
return queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
})
}
// Stream streams the content of a file blocking on EOF.
@@ -172,40 +124,19 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
cancel <-chan struct{}, q *QueryOptions) (<-chan *StreamFrame, <-chan error) {
errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
r, err := queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin
})
if err != nil {
errCh <- err
return nil, errCh
}
if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
q.Params["path"] = path
q.Params["offset"] = strconv.FormatInt(offset, 10)
q.Params["origin"] = origin
reqPath := fmt.Sprintf("/v1/client/fs/stream/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}
// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
}
// Create the output channel
frames := make(chan *StreamFrame, 10)
@@ -244,6 +175,40 @@ func (a *AllocFS) Stream(alloc *Allocation, path, origin string, offset int64,
return frames, errCh
}
func queryClientNode(c *Client, alloc *Allocation, reqPath string, q *QueryOptions, customizeQ func(*QueryOptions)) (io.ReadCloser, error) {
nodeClient, _ := c.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
if customizeQ != nil {
customizeQ(q)
}
var r io.ReadCloser
var err error
if nodeClient != nil {
r, err = nodeClient.rawQuery(reqPath, q)
if _, ok := err.(net.Error); err != nil && !ok {
// found a non networking error talking to client directly
return nil, err
}
}
// failed to query node, access through server directly
// or network error when talking to the client directly
if r == nil {
return c.rawQuery(reqPath, q)
}
return r, err
}
// Logs streams the content of a tasks logs blocking on EOF.
// The parameters are:
// * allocation: the allocation to stream from.
@@ -264,42 +229,20 @@ func (a *AllocFS) Logs(alloc *Allocation, follow bool, task, logType, origin str
errCh := make(chan error, 1)
nodeClient, err := a.client.GetNodeClientWithTimeout(alloc.NodeID, ClientConnTimeout, q)
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := queryClientNode(a.client, alloc, reqPath, q,
func(q *QueryOptions) {
q.Params["follow"] = strconv.FormatBool(follow)
q.Params["task"] = task
q.Params["type"] = logType
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)
})
if err != nil {
errCh <- err
return nil, errCh
}
if q == nil {
q = &QueryOptions{}
}
if q.Params == nil {
q.Params = make(map[string]string)
}
q.Params["follow"] = strconv.FormatBool(follow)
q.Params["task"] = task
q.Params["type"] = logType
q.Params["origin"] = origin
q.Params["offset"] = strconv.FormatInt(offset, 10)
reqPath := fmt.Sprintf("/v1/client/fs/logs/%s", alloc.ID)
r, err := nodeClient.rawQuery(reqPath, q)
if err != nil {
// There was a networking error when talking directly to the client.
if _, ok := err.(net.Error); !ok {
errCh <- err
return nil, errCh
}
// Try via the server
r, err = a.client.rawQuery(reqPath, q)
if err != nil {
errCh <- err
return nil, errCh
}
}
// Create the output channel
frames := make(chan *StreamFrame, 10)