Merge pull request #8131 from hashicorp/f-snapshot-restore

Implement snapshot restore
This commit is contained in:
Mahmood Ali
2020-06-15 08:32:34 -04:00
committed by GitHub
19 changed files with 920 additions and 37 deletions

View File

@@ -943,8 +943,15 @@ func decodeBody(resp *http.Response, out interface{}) error {
}
}
// encodeBody is used to encode a request body
// encodeBody prepares the reader to serve as the request body.
//
// Returns the `obj` input if it is a raw io.Reader object; otherwise
// returns a reader of the json format of the passed argument.
func encodeBody(obj interface{}) (io.Reader, error) {
if reader, ok := obj.(io.Reader); ok {
return reader, nil
}
buf := bytes.NewBuffer(nil)
enc := json.NewEncoder(buf)
if err := enc.Encode(obj); err != nil {

View File

@@ -222,6 +222,17 @@ func (op *Operator) Snapshot(q *QueryOptions) (io.ReadCloser, error) {
return cr, nil
}
// SnapshotRestore restores a running nomad cluster to the state captured
// in the snapshot supplied via the in reader.
//
// The snapshot bytes are streamed directly as the request body of a write
// to the operator snapshot endpoint; the returned WriteMeta describes the
// restore write.
func (op *Operator) SnapshotRestore(in io.Reader, q *WriteOptions) (*WriteMeta, error) {
meta, err := op.c.write("/v1/operator/snapshot", in, nil, q)
if err != nil {
return nil, err
}
return meta, nil
}
type License struct {
// The unique identifier of the license
LicenseID string

View File

@@ -14,6 +14,7 @@ import (
"github.com/hashicorp/consul/agent/consul/autopilot"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/nomad/api"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
)
@@ -292,6 +293,8 @@ func (s *HTTPServer) SnapshotRequest(resp http.ResponseWriter, req *http.Request
switch req.Method {
case "GET":
return s.snapshotSaveRequest(resp, req)
case "PUT", "POST":
return s.snapshotRestoreRequest(resp, req)
default:
return nil, CodedError(405, ErrInvalidMethod)
}
@@ -331,7 +334,7 @@ func (s *HTTPServer) snapshotSaveRequest(resp http.ResponseWriter, req *http.Req
httpPipe.Close()
}()
errCh := make(chan HTTPCodedError, 1)
errCh := make(chan HTTPCodedError, 2)
go func() {
defer cancel()
@@ -372,3 +375,91 @@ func (s *HTTPServer) snapshotSaveRequest(resp http.ResponseWriter, req *http.Req
return nil, codedErr
}
// snapshotRestoreRequest bridges an HTTP snapshot-restore request to the
// Operator.SnapshotRestore streaming RPC: it encodes the request header,
// streams the HTTP body to the RPC as StreamErrWrapper frames, and decodes
// the final SnapshotRestoreResponse into an HTTP result.
func (s *HTTPServer) snapshotRestoreRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
args := &structs.SnapshotRestoreRequest{}
s.parseWriteRequest(req, &args.WriteRequest)
// Resolve the streaming RPC handler: served locally when this agent is a
// server, relayed via the remote handler when it is a client.
var handler structs.StreamingRpcHandler
var handlerErr error
if server := s.agent.Server(); server != nil {
handler, handlerErr = server.StreamingRpcHandler("Operator.SnapshotRestore")
} else if client := s.agent.Client(); client != nil {
handler, handlerErr = client.RemoteStreamingRpcHandler("Operator.SnapshotRestore")
} else {
handlerErr = fmt.Errorf("misconfigured connection")
}
if handlerErr != nil {
return nil, CodedError(500, handlerErr.Error())
}
// In-memory pipe: this handler speaks msgpack on httpPipe while the RPC
// handler consumes the other end.
httpPipe, handlerPipe := net.Pipe()
decoder := codec.NewDecoder(httpPipe, structs.MsgpackHandle)
encoder := codec.NewEncoder(httpPipe, structs.MsgpackHandle)
// Create a goroutine that closes the pipe if the connection closes.
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
go func() {
<-ctx.Done()
httpPipe.Close()
}()
// Buffered to 2 so the body-copier goroutine and the response path can each
// report an error without blocking.
errCh := make(chan HTTPCodedError, 2)
go func() {
defer cancel()
// Send the request
if err := encoder.Encode(args); err != nil {
errCh <- CodedError(500, err.Error())
return
}
// Stream the HTTP request body to the RPC in 1KiB chunks. The terminal
// read error (including io.EOF) is forwarded in-band via wrapper.Error
// so the server side knows the stream is done.
go func() {
var wrapper cstructs.StreamErrWrapper
bytes := make([]byte, 1024)
for {
n, err := req.Body.Read(bytes)
if n > 0 {
wrapper.Payload = bytes[:n]
err := encoder.Encode(wrapper)
if err != nil {
errCh <- CodedError(500, err.Error())
return
}
}
if err != nil {
wrapper.Payload = nil
wrapper.Error = &cstructs.RpcError{Message: err.Error()}
err := encoder.Encode(wrapper)
if err != nil {
errCh <- CodedError(500, err.Error())
}
return
}
}
}()
// Wait for the RPC's final response while the body is still streaming.
var res structs.SnapshotRestoreResponse
if err := decoder.Decode(&res); err != nil {
errCh <- CodedError(500, err.Error())
return
}
if res.ErrorMsg != "" {
errCh <- CodedError(res.ErrorCode, res.ErrorMsg)
return
}
errCh <- nil
}()
// Run the RPC handler synchronously; it returns when the stream completes.
handler(handlerPipe)
cancel()
codedErr := <-errCh
return nil, codedErr
}

View File

@@ -11,12 +11,14 @@ import (
"net/http/httptest"
"os"
"path"
"path/filepath"
"strings"
"testing"
"time"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -389,14 +391,17 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
})
}
func TestOperator_SnapshotSaveRequest(t *testing.T) {
func TestOperator_SnapshotRequests(t *testing.T) {
t.Parallel()
////// Nomad clusters topology - not specific to test
dir, err := ioutil.TempDir("", "nomadtest-operator-")
require.NoError(t, err)
defer os.RemoveAll(dir)
snapshotPath := filepath.Join(dir, "snapshot.bin")
job := mock.Job()
// test snapshot generation
httpTest(t, func(c *Config) {
c.Server.BootstrapExpect = 1
c.DevMode = false
@@ -404,10 +409,26 @@ func TestOperator_SnapshotSaveRequest(t *testing.T) {
c.AdvertiseAddrs.HTTP = "127.0.0.1"
c.AdvertiseAddrs.RPC = "127.0.0.1"
c.AdvertiseAddrs.Serf = "127.0.0.1"
// don't actually run the job
c.Client.Enabled = false
}, func(s *TestAgent) {
// make a simple update
jargs := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: structs.DefaultNamespace,
},
}
var jresp structs.JobRegisterResponse
err := s.Agent.RPC("Job.Register", &jargs, &jresp)
require.NoError(t, err)
// now actually snapshot
req, _ := http.NewRequest("GET", "/v1/operator/snapshot", nil)
resp := httptest.NewRecorder()
_, err := s.Server.SnapshotRequest(resp, req)
_, err = s.Server.SnapshotRequest(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code)
@@ -416,11 +437,52 @@ func TestOperator_SnapshotSaveRequest(t *testing.T) {
require.Contains(t, digest, "sha-256=")
hash := sha256.New()
_, err = io.Copy(hash, resp.Body)
f, err := os.Create(snapshotPath)
require.NoError(t, err)
defer f.Close()
_, err = io.Copy(io.MultiWriter(f, hash), resp.Body)
require.NoError(t, err)
expectedChecksum := "sha-256=" + base64.StdEncoding.EncodeToString(hash.Sum(nil))
require.Equal(t, digest, expectedChecksum)
})
// test snapshot restoration
httpTest(t, func(c *Config) {
c.Server.BootstrapExpect = 1
c.DevMode = false
c.DataDir = path.Join(dir, "server2")
c.AdvertiseAddrs.HTTP = "127.0.0.1"
c.AdvertiseAddrs.RPC = "127.0.0.1"
c.AdvertiseAddrs.Serf = "127.0.0.1"
// don't actually run the job
c.Client.Enabled = false
}, func(s *TestAgent) {
jobExists := func() bool {
// check job isn't present
req, _ := http.NewRequest("GET", "/v1/job/"+job.ID, nil)
resp := httptest.NewRecorder()
j, _ := s.Server.jobCRUD(resp, req, job.ID)
return j != nil
}
// job doesn't exist initially
require.False(t, jobExists())
// restore and check that the job exists afterwards
f, err := os.Open(snapshotPath)
require.NoError(t, err)
defer f.Close()
req, _ := http.NewRequest("PUT", "/v1/operator/snapshot", f)
resp := httptest.NewRecorder()
_, err = s.Server.SnapshotRequest(resp, req)
require.NoError(t, err)
require.Equal(t, 200, resp.Code)
require.True(t, jobExists())
})
}

View File

@@ -223,7 +223,8 @@ RETRY:
func (a *TestAgent) start() (*Agent, error) {
if a.LogOutput == nil {
a.LogOutput = testlog.NewWriter(a.T)
prefix := fmt.Sprintf("%v:%v ", a.Config.BindAddr, a.Config.Ports.RPC)
a.LogOutput = testlog.NewPrefixWriter(a.T, prefix)
}
inm := metrics.NewInmemSink(10*time.Second, time.Minute)

View File

@@ -517,6 +517,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"operator snapshot restore": func() (cli.Command, error) {
return &OperatorSnapshotRestoreCommand{
Meta: meta,
}, nil
},
"plan": func() (cli.Command, error) {
return &JobPlanCommand{

View File

@@ -6,6 +6,7 @@ import (
"path/filepath"
"testing"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/command/agent"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
@@ -14,7 +15,7 @@ import (
func TestOperatorSnapshotInspect_Works(t *testing.T) {
t.Parallel()
snapPath := generateSnapshotFile(t)
snapPath := generateSnapshotFile(t, nil)
ui := new(cli.MockUi)
cmd := &OperatorSnapshotInspectCommand{Meta: Meta{Ui: ui}}
@@ -67,14 +68,14 @@ func TestOperatorSnapshotInspect_HandlesFailure(t *testing.T) {
}
func generateSnapshotFile(t *testing.T) string {
func generateSnapshotFile(t *testing.T, prepare func(srv *agent.TestAgent, client *api.Client, url string)) string {
tmpDir, err := ioutil.TempDir("", "nomad-tempdir")
require.NoError(t, err)
t.Cleanup(func() { os.RemoveAll(tmpDir) })
srv, _, url := testServer(t, false, func(c *agent.Config) {
srv, api, url := testServer(t, false, func(c *agent.Config) {
c.DevMode = false
c.DataDir = filepath.Join(tmpDir, "server")
@@ -85,6 +86,10 @@ func generateSnapshotFile(t *testing.T) string {
defer srv.Shutdown()
if prepare != nil {
prepare(srv, api, url)
}
ui := new(cli.MockUi)
cmd := &OperatorSnapshotSaveCommand{Meta: Meta{Ui: ui}}

View File

@@ -0,0 +1,95 @@
package command
import (
"fmt"
"os"
"strings"
"github.com/hashicorp/nomad/api"
"github.com/posener/complete"
)
// OperatorSnapshotRestoreCommand implements the
// "nomad operator snapshot restore" CLI command.
type OperatorSnapshotRestoreCommand struct {
Meta
}
// Help returns the long-form usage text for the snapshot restore command.
func (c *OperatorSnapshotRestoreCommand) Help() string {
helpText := `
Usage: nomad operator snapshot restore [options] FILE
Restores an atomic, point-in-time snapshot of the state of the Nomad servers
which includes jobs, nodes, allocations, periodic jobs, and ACLs.
Restores involve a potentially dangerous low-level Raft operation that is not
designed to handle server failures during a restore. This command is primarily
intended to be used when recovering from a disaster, restoring into a fresh
cluster of Nomad servers.
If ACLs are enabled, a management token must be supplied in order to perform
snapshot operations.
To restore a snapshot from the file "backup.snap":
$ nomad operator snapshot restore backup.snap
General Options:
` + generalOptionsUsage()
return strings.TrimSpace(helpText)
}
// AutocompleteFlags returns the client-level flag predictions for shell
// autocompletion.
func (c *OperatorSnapshotRestoreCommand) AutocompleteFlags() complete.Flags {
return c.Meta.AutocompleteFlags(FlagSetClient)
}
// AutocompleteArgs predicts nothing; the single FILE argument is a local path.
func (c *OperatorSnapshotRestoreCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictNothing
}
// Synopsis returns the one-line description shown in command listings.
func (c *OperatorSnapshotRestoreCommand) Synopsis() string {
return "Restore snapshot of Nomad server state"
}
// Name returns the full command name used for flag-set naming and errors.
func (c *OperatorSnapshotRestoreCommand) Name() string { return "operator snapshot restore" }
// Run parses the command line, opens the snapshot file named by the single
// positional argument, and submits it to the operator snapshot restore API.
// It returns 0 on success and 1 on any error, reporting details via the UI.
func (c *OperatorSnapshotRestoreCommand) Run(args []string) int {
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
flags.Usage = func() { c.Ui.Output(c.Help()) }
if err := flags.Parse(args); err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse args: %v", err))
return 1
}
// Check for misuse
args = flags.Args()
if len(args) != 1 {
c.Ui.Error("This command takes one argument: <filename>")
c.Ui.Error(commandErrorText(c))
return 1
}
snap, err := os.Open(args[0])
if err != nil {
c.Ui.Error(fmt.Sprintf("Error opening snapshot file: %v", err))
return 1
}
defer snap.Close()
// Set up a client.
client, err := c.Meta.Client()
if err != nil {
c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err))
return 1
}
// Call snapshot restore API with backup file.
_, err = client.Operator().SnapshotRestore(snap, &api.WriteOptions{})
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to restore snapshot: %v", err))
return 1
}
c.Ui.Output("Snapshot Restored")
return 0
}

View File

@@ -0,0 +1,95 @@
package command
import (
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/command/agent"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
// TestOperatorSnapshotRestore_Works takes a snapshot of a cluster containing
// a registered job, restores it into a fresh server, and verifies the job
// reappears in the new server's state store.
func TestOperatorSnapshotRestore_Works(t *testing.T) {
t.Parallel()
tmpDir, err := ioutil.TempDir("", "nomad-tempdir")
require.NoError(t, err)
defer os.RemoveAll(tmpDir)
// Generate a snapshot after registering a sample job on the source server.
snapshotPath := generateSnapshotFile(t, func(srv *agent.TestAgent, client *api.Client, url string) {
sampleJob := `
job "snapshot-test-job" {
type = "service"
datacenters = [ "dc1" ]
group "group1" {
count = 1
task "task1" {
driver = "exec"
resources = {
cpu = 1000
memory = 512
}
}
}
}`
ui := new(cli.MockUi)
cmd := &JobRunCommand{Meta: Meta{Ui: ui}}
cmd.JobGetter.testStdin = strings.NewReader(sampleJob)
// -detach: don't wait for the deployment; we only need it registered.
code := cmd.Run([]string{"--address=" + url, "-detach", "-"})
require.Zero(t, code)
})
// Boot a fresh target server that has never seen the job.
srv, _, url := testServer(t, false, func(c *agent.Config) {
c.DevMode = false
c.DataDir = filepath.Join(tmpDir, "server1")
c.AdvertiseAddrs.HTTP = "127.0.0.1"
c.AdvertiseAddrs.RPC = "127.0.0.1"
c.AdvertiseAddrs.Serf = "127.0.0.1"
})
defer srv.Shutdown()
// job is not found before restore
j, err := srv.Agent.Server().State().JobByID(nil, structs.DefaultNamespace, "snapshot-test-job")
require.NoError(t, err)
require.Nil(t, j)
ui := new(cli.MockUi)
cmd := &OperatorSnapshotRestoreCommand{Meta: Meta{Ui: ui}}
code := cmd.Run([]string{"--address=" + url, snapshotPath})
require.Empty(t, ui.ErrorWriter.String())
require.Zero(t, code)
require.Contains(t, ui.OutputWriter.String(), "Snapshot Restored")
// After restore the job from the snapshot must exist on the target.
foundJob, err := srv.Agent.Server().State().JobByID(nil, structs.DefaultNamespace, "snapshot-test-job")
require.NoError(t, err)
require.Equal(t, "snapshot-test-job", foundJob.ID)
}
// TestOperatorSnapshotRestore_Fails covers the command's error paths:
// wrong argument count and a nonexistent snapshot file.
func TestOperatorSnapshotRestore_Fails(t *testing.T) {
t.Parallel()
ui := new(cli.MockUi)
cmd := &OperatorSnapshotRestoreCommand{Meta: Meta{Ui: ui}}
// Fails on misuse
code := cmd.Run([]string{"some", "bad", "args"})
require.Equal(t, 1, code)
require.Contains(t, ui.ErrorWriter.String(), commandErrorText(cmd))
ui.ErrorWriter.Reset()
// Fails when specified file does not exist
code = cmd.Run([]string{"/unicorns/leprechauns"})
require.Equal(t, 1, code)
require.Contains(t, ui.ErrorWriter.String(), "no such file")
}

View File

@@ -49,3 +49,21 @@ func TestOperatorSnapshotSave_Works(t *testing.T) {
require.NoError(t, err)
require.NotZero(t, meta.Index)
}
// TestOperatorSnapshotSave_Fails covers the save command's error paths:
// wrong argument count and a nonexistent destination path.
func TestOperatorSnapshotSave_Fails(t *testing.T) {
t.Parallel()
ui := new(cli.MockUi)
cmd := &OperatorSnapshotSaveCommand{Meta: Meta{Ui: ui}}
// Fails on misuse
code := cmd.Run([]string{"some", "bad", "args"})
require.Equal(t, 1, code)
require.Contains(t, ui.ErrorWriter.String(), commandErrorText(cmd))
ui.ErrorWriter.Reset()
// Fails when specified file does not exist
code = cmd.Run([]string{"/unicorns/leprechauns"})
require.Equal(t, 1, code)
require.Contains(t, ui.ErrorWriter.String(), "no such file")
}

View File

@@ -71,12 +71,15 @@ func (w *prefixStderr) Write(p []byte) (int, error) {
}
// Skip prefix if only writing whitespace
if len(bytes.TrimSpace(p)) > 0 {
_, err := os.Stderr.Write(w.prefix)
if err != nil {
return 0, err
}
if len(bytes.TrimSpace(p)) == 0 {
return os.Stderr.Write(p)
}
return os.Stderr.Write(p)
// decrease likelihood of partial line writes that may mess up test
// success-indicator detection
buf := make([]byte, 0, len(w.prefix)+len(p))
buf = append(buf, w.prefix...)
buf = append(buf, p...)
return os.Stderr.Write(buf)
}

View File

@@ -189,6 +189,26 @@ WAIT:
goto RECONCILE
case member := <-reconcileCh:
s.reconcileMember(member)
case errCh := <-s.reassertLeaderCh:
// Recompute leader state, by asserting leadership and
// repopulating leader states.
// Check first if we are indeed the leaders first. We
// can get into this state when the initial
// establishLeadership has failed.
// Afterwards we will be waiting for the interval to
// trigger a reconciliation and can potentially end up
// here. There is no point to reassert because this
// agent was never leader in the first place.
if !establishedLeader {
errCh <- fmt.Errorf("leadership has not been established")
continue
}
// refresh leadership state
s.revokeLeadership()
err := s.establishLeadership(stopCh)
errCh <- err
}
}
}

View File

@@ -1,14 +1,17 @@
package nomad
import (
"errors"
"fmt"
"io"
"net"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/consul/agent/consul/autopilot"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/snapshot"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/raft"
@@ -23,6 +26,7 @@ type Operator struct {
func (op *Operator) register() {
op.srv.streamingRpcs.Register("Operator.SnapshotSave", op.snapshotSave)
op.srv.streamingRpcs.Register("Operator.SnapshotRestore", op.snapshotRestore)
}
// RaftGetConfiguration is used to retrieve the current Raft configuration.
@@ -459,8 +463,7 @@ func (op *Operator) snapshotSave(conn io.ReadWriteCloser) {
}
defer snap.Close()
enc := codec.NewEncoder(conn, structs.MsgpackHandle)
if err := enc.Encode(&reply); err != nil {
if err := encoder.Encode(&reply); err != nil {
handleFailure(500, fmt.Errorf("failed to encode response: %v", err))
return
}
@@ -470,3 +473,156 @@ func (op *Operator) snapshotSave(conn io.ReadWriteCloser) {
}
}
}
// snapshotRestore implements the Operator.SnapshotRestore streaming RPC.
// It decodes the request header, forwards to the correct region or the
// leader when necessary, enforces management-token ACLs, streams the
// snapshot into raft, and finally kicks the leader loop so in-memory
// leader state is rebuilt from the restored state store.
func (op *Operator) snapshotRestore(conn io.ReadWriteCloser) {
defer conn.Close()
var args structs.SnapshotRestoreRequest
var reply structs.SnapshotRestoreResponse
decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
encoder := codec.NewEncoder(conn, structs.MsgpackHandle)
// handleFailure reports errors in-band as a response message; best effort,
// so the Encode error is deliberately ignored.
handleFailure := func(code int, err error) {
encoder.Encode(&structs.SnapshotRestoreResponse{
ErrorCode: code,
ErrorMsg: err.Error(),
})
}
if err := decoder.Decode(&args); err != nil {
handleFailure(500, err)
return
}
// Forward to appropriate region
if args.Region != op.srv.Region() {
err := op.forwardStreamingRPC(args.Region, "Operator.SnapshotRestore", args, conn)
if err != nil {
handleFailure(500, err)
}
return
}
// forward to leader
remoteServer, err := op.srv.getLeaderForRPC()
if err != nil {
handleFailure(500, err)
return
}
if remoteServer != nil {
err := op.forwardStreamingRPCToServer(remoteServer, "Operator.SnapshotRestore", args, conn)
if err != nil {
handleFailure(500, err)
}
return
}
// Check agent permissions
if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil {
code := 500
// A bad token is a client error, not a server one.
if err == structs.ErrTokenNotFound {
code = 400
}
handleFailure(code, err)
return
} else if aclObj != nil && !aclObj.IsManagement() {
handleFailure(403, structs.ErrPermissionDenied)
return
}
op.srv.setQueryMeta(&reply.QueryMeta)
// Unwrap the StreamErrWrapper frames into a plain reader and feed it to
// the raft restore.
reader, errCh := decodeStreamOutput(decoder)
err = snapshot.Restore(op.logger.Named("snapshot"), reader, op.srv.raft)
if err != nil {
handleFailure(500, fmt.Errorf("failed to restore from snapshot: %v", err))
return
}
// Surface any terminal error the stream decoder hit while reading input.
err = <-errCh
if err != nil {
handleFailure(400, fmt.Errorf("failed to read stream: %v", err))
return
}
// This'll be used for feedback from the leader loop.
timeoutCh := time.After(time.Minute)
lerrCh := make(chan error, 1)
select {
// Reassert leader actions and update all leader related state
// with new state store content.
case op.srv.reassertLeaderCh <- lerrCh:
// We might have lost leadership while waiting to kick the loop.
case <-timeoutCh:
handleFailure(500, fmt.Errorf("timed out waiting to re-run leader actions"))
// Make sure we don't get stuck during shutdown
case <-op.srv.shutdownCh:
}
select {
// Wait for the leader loop to finish up.
case err := <-lerrCh:
if err != nil {
handleFailure(500, err)
return
}
// We might have lost leadership while the loop was doing its
// thing.
case <-timeoutCh:
handleFailure(500, fmt.Errorf("timed out waiting for re-run of leader actions"))
// Make sure we don't get stuck during shutdown
case <-op.srv.shutdownCh:
}
reply.Index, _ = op.srv.State().LatestIndex()
op.srv.setQueryMeta(&reply.QueryMeta)
// Best-effort final response; the connection may already be gone.
encoder.Encode(reply)
}
// decodeStreamOutput adapts a stream of msgpack-encoded StreamErrWrapper
// frames into a plain io.Reader. Payload bytes are written to a pipe; an
// in-band error frame closes the pipe with the corresponding error, with
// the EOF message translated back into a real io.EOF for the reader side.
// The returned channel carries at most one terminal decode/write error and
// is closed when the decoding goroutine exits.
func decodeStreamOutput(decoder *codec.Decoder) (io.Reader, <-chan error) {
pr, pw := io.Pipe()
// Buffered so the goroutine can record its error and exit without a reader.
errCh := make(chan error, 1)
go func() {
defer close(errCh)
for {
var wrapper cstructs.StreamErrWrapper
err := decoder.Decode(&wrapper)
if err != nil {
pw.CloseWithError(fmt.Errorf("failed to decode input: %v", err))
errCh <- err
return
}
if len(wrapper.Payload) != 0 {
_, err = pw.Write(wrapper.Payload)
if err != nil {
pw.CloseWithError(err)
errCh <- err
return
}
}
// An in-band error frame terminates the stream.
if errW := wrapper.Error; errW != nil {
if errW.Message == io.EOF.Error() {
pw.CloseWithError(io.EOF)
} else {
pw.CloseWithError(errors.New(errW.Message))
}
return
}
}
}()
return pr, errCh
}

View File

@@ -12,10 +12,12 @@ import (
"reflect"
"strings"
"testing"
"time"
"github.com/hashicorp/go-msgpack/codec"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/acl"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/snapshot"
"github.com/hashicorp/nomad/helper/uuid"
@@ -714,3 +716,268 @@ func TestOperator_SnapshotSave_ACL(t *testing.T) {
})
}
}
// TestOperator_SnapshotRestore verifies snapshot restore against each RPC
// target: the leader, a non-leader (forwarded to leader), and a server in a
// remote region (forwarded across regions).
func TestOperator_SnapshotRestore(t *testing.T) {
targets := []string{"leader", "non_leader", "remote_region"}
for _, c := range targets {
t.Run(c, func(t *testing.T) {
snap, job := generateSnapshot(t)
// After restore, the job captured in the snapshot must be present.
checkFn := func(t *testing.T, s *Server) {
found, err := s.State().JobByID(nil, job.Namespace, job.ID)
require.NoError(t, err)
require.Equal(t, job.ID, found.ID)
}
var req structs.SnapshotRestoreRequest
req.Region = "global"
testRestoreSnapshot(t, &req, snap, c, checkFn)
})
}
}
// generateSnapshot boots a throwaway single-node server, registers a mock
// job, and captures a raft snapshot containing it. It returns the snapshot
// along with the registered job so callers can assert on restore results.
func generateSnapshot(t *testing.T) (*snapshot.Snapshot, *structs.Job) {
dir, err := ioutil.TempDir("", "nomadtest-operator-")
require.NoError(t, err)
t.Cleanup(func() { os.RemoveAll(dir) })
s, cleanup := TestServer(t, func(c *Config) {
c.BootstrapExpect = 1
c.DevMode = false
c.DataDir = path.Join(dir, "server1")
})
defer cleanup()
job := mock.Job()
jobReq := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var jobResp structs.JobRegisterResponse
codec := rpcClient(t, s)
err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
require.NoError(t, err)
// NOTE(review): this direct upsert looks redundant with the RPC
// registration above — presumably it pins a deterministic state index;
// confirm before removing.
err = s.State().UpsertJob(1000, job)
require.NoError(t, err)
snapshot, err := snapshot.New(s.logger, s.raft)
require.NoError(t, err)
t.Cleanup(func() { snapshot.Close() })
return snapshot, job
}
// testRestoreSnapshot spins up a two-server cluster plus a remote-region
// server, streams the given snapshot to the Operator.SnapshotRestore RPC on
// the chosen target ("leader", "non_leader", or "remote_region"), and runs
// assertionFn against both local servers to verify the restored state.
func testRestoreSnapshot(t *testing.T, req *structs.SnapshotRestoreRequest, snapshot io.Reader, target string,
assertionFn func(t *testing.T, server *Server)) {
////// Nomad clusters topology - not specific to test
dir, err := ioutil.TempDir("", "nomadtest-operator-")
require.NoError(t, err)
defer os.RemoveAll(dir)
server1, cleanupLS := TestServer(t, func(c *Config) {
c.BootstrapExpect = 2
c.DevMode = false
c.DataDir = path.Join(dir, "server1")
// increase times outs to account for I/O operations that
// snapshot restore performs - some of which require sync calls
c.RaftConfig.LeaderLeaseTimeout = 1 * time.Second
c.RaftConfig.HeartbeatTimeout = 1 * time.Second
c.RaftConfig.ElectionTimeout = 1 * time.Second
c.RaftTimeout = 5 * time.Second
})
defer cleanupLS()
server2, cleanupRS := TestServer(t, func(c *Config) {
c.BootstrapExpect = 2
c.DevMode = false
c.DataDir = path.Join(dir, "server2")
// increase times outs to account for I/O operations that
// snapshot restore performs - some of which require sync calls
c.RaftConfig.LeaderLeaseTimeout = 1 * time.Second
c.RaftConfig.HeartbeatTimeout = 1 * time.Second
c.RaftConfig.ElectionTimeout = 1 * time.Second
c.RaftTimeout = 5 * time.Second
})
defer cleanupRS()
remoteRegionServer, cleanupRRS := TestServer(t, func(c *Config) {
c.Region = "two"
c.DevMode = false
c.DataDir = path.Join(dir, "remote_region_server")
})
defer cleanupRRS()
TestJoin(t, server1, server2)
TestJoin(t, server1, remoteRegionServer)
testutil.WaitForLeader(t, server1.RPC)
testutil.WaitForLeader(t, server2.RPC)
testutil.WaitForLeader(t, remoteRegionServer.RPC)
leader, nonLeader := server1, server2
if server2.IsLeader() {
leader, nonLeader = server2, server1
}
///////// Actually run query now
mapping := map[string]*Server{
"leader": leader,
"non_leader": nonLeader,
"remote_region": remoteRegionServer,
}
server := mapping[target]
require.NotNil(t, server, "target not found")
handler, err := server.StreamingRpcHandler("Operator.SnapshotRestore")
require.NoError(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
// start handler
go handler(p2)
var resp structs.SnapshotRestoreResponse
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
err = encoder.Encode(req)
require.NoError(t, err)
// Stream the snapshot as StreamErrWrapper frames; the terminal read error
// (including io.EOF) is sent in-band to mark end of stream.
buf := make([]byte, 1024)
for {
n, err := snapshot.Read(buf)
if n > 0 {
require.NoError(t, encoder.Encode(&cstructs.StreamErrWrapper{Payload: buf[:n]}))
}
if err != nil {
require.NoError(t, encoder.Encode(&cstructs.StreamErrWrapper{Error: &cstructs.RpcError{Message: err.Error()}}))
break
}
}
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
err = decoder.Decode(&resp)
require.NoError(t, err)
require.Empty(t, resp.ErrorMsg)
t.Run("checking leader state", func(t *testing.T) {
assertionFn(t, leader)
})
// BUGFIX: previously asserted against the leader again; the point of this
// subtest is to confirm the restored state replicated to the non-leader.
t.Run("checking nonleader state", func(t *testing.T) {
assertionFn(t, nonLeader)
})
}
// TestOperator_SnapshotRestore_ACL verifies the restore RPC's token checks:
// a management (root) token succeeds, while missing, invalid, or
// insufficiently-privileged tokens produce in-band streaming errors.
func TestOperator_SnapshotRestore_ACL(t *testing.T) {
t.Parallel()
dir, err := ioutil.TempDir("", "nomadtest-operator-")
require.NoError(t, err)
defer os.RemoveAll(dir)
///////// Actually run query now
cases := []struct {
name string
errCode int
err error
}{
{"root", 0, nil},
{"no_permission_token", 403, structs.ErrPermissionDenied},
{"invalid token", 400, structs.ErrTokenNotFound},
{"unauthenticated", 403, structs.ErrPermissionDenied},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
snapshot, _ := generateSnapshot(t)
s, root, cleanupLS := TestACLServer(t, func(cfg *Config) {
cfg.BootstrapExpect = 1
cfg.DevMode = false
cfg.DataDir = path.Join(dir, "server_"+c.name)
})
defer cleanupLS()
testutil.WaitForLeader(t, s.RPC)
// Node-write policy lacks management rights, so this token is denied.
deniedToken := mock.CreatePolicyAndToken(t, s.fsm.State(), 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))
token := ""
switch c.name {
case "root":
token = root.SecretID
case "no_permission_token":
token = deniedToken.SecretID
case "invalid token":
token = uuid.Generate()
case "unauthenticated":
token = ""
default:
t.Fatalf("unexpected case: %v", c.name)
}
handler, err := s.StreamingRpcHandler("Operator.SnapshotRestore")
require.NoError(t, err)
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()
// start handler
go handler(p2)
var req structs.SnapshotRestoreRequest
var resp structs.SnapshotRestoreResponse
req.Region = "global"
req.AuthToken = token
// send request
encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
err = encoder.Encode(&req)
require.NoError(t, err)
// Only stream the snapshot body when the request is expected to pass
// the ACL check; denied requests fail before reading the stream.
if c.err == nil {
buf := make([]byte, 1024)
for {
n, err := snapshot.Read(buf)
if n > 0 {
require.NoError(t, encoder.Encode(&cstructs.StreamErrWrapper{Payload: buf[:n]}))
}
if err != nil {
require.NoError(t, encoder.Encode(&cstructs.StreamErrWrapper{Error: &cstructs.RpcError{Message: err.Error()}}))
break
}
}
}
decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
err = decoder.Decode(&resp)
require.NoError(t, err)
// streaming errors appear as a response rather than a returned error
if c.err != nil {
require.Equal(t, c.err.Error(), resp.ErrorMsg)
require.Equal(t, c.errCode, resp.ErrorCode)
return
}
require.NotZero(t, resp.Index)
// Drain the connection so the handler can finish cleanly.
io.Copy(ioutil.Discard, p1)
})
}
}

View File

@@ -108,6 +108,14 @@ type Server struct {
raftInmem *raft.InmemStore
raftTransport *raft.NetworkTransport
// reassertLeaderCh is used to signal that the leader loop must
// re-establish leadership.
//
// This might be relevant in snapshot restores, where leader in-memory
// state changed significantly such that leader state (e.g. periodic
// jobs, eval brokers) need to be recomputed.
reassertLeaderCh chan chan error
// autopilot is the Autopilot instance for this server.
autopilot *autopilot.Autopilot
@@ -312,22 +320,23 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulACLs consu
// Create the server
s := &Server{
config: config,
consulCatalog: consulCatalog,
connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
tlsWrap: tlsWrap,
rpcServer: rpc.NewServer(),
streamingRpcs: structs.NewStreamingRpcRegistry(),
nodeConns: make(map[string][]*nodeConnState),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reconcileCh: make(chan serf.Member, 32),
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
config: config,
consulCatalog: consulCatalog,
connPool: pool.NewPool(logger, serverRPCCache, serverMaxStreams, tlsWrap),
logger: logger,
tlsWrap: tlsWrap,
rpcServer: rpc.NewServer(),
streamingRpcs: structs.NewStreamingRpcRegistry(),
nodeConns: make(map[string][]*nodeConnState),
peers: make(map[string][]*serverParts),
localPeers: make(map[raft.ServerAddress]*serverParts),
reassertLeaderCh: make(chan chan error),
reconcileCh: make(chan serf.Member, 32),
eventCh: make(chan serf.Event, 256),
evalBroker: evalBroker,
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
}
s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())

View File

@@ -246,3 +246,14 @@ type SnapshotSaveResponse struct {
QueryMeta
}
// SnapshotRestoreRequest is the header message for the
// Operator.SnapshotRestore streaming RPC; the raw snapshot bytes follow it
// on the stream.
type SnapshotRestoreRequest struct {
WriteRequest
}
// SnapshotRestoreResponse reports the outcome of a snapshot restore.
// Streaming RPC failures are carried in-band via ErrorCode/ErrorMsg rather
// than as a Go error return.
type SnapshotRestoreResponse struct {
ErrorCode int `codec:",omitempty"`
ErrorMsg string `codec:",omitempty"`
QueryMeta
}

View File

@@ -4,12 +4,14 @@ import (
"fmt"
"math/rand"
"net"
"os"
"sync/atomic"
"time"
testing "github.com/mitchellh/go-testing-interface"
"github.com/pkg/errors"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/pluginutils/catalog"
@@ -49,6 +51,19 @@ func TestServer(t testing.T, cb func(*Config)) (*Server, func()) {
nodeNum := atomic.AddUint32(&nodeNumber, 1)
config.NodeName = fmt.Sprintf("nomad-%03d", nodeNum)
// configer logger
level := hclog.Trace
if envLogLevel := os.Getenv("NOMAD_TEST_LOG_LEVEL"); envLogLevel != "" {
level = hclog.LevelFromString(envLogLevel)
}
opts := &hclog.LoggerOptions{
Level: level,
Output: testlog.NewPrefixWriter(t, config.NodeName+" "),
IncludeLocation: true,
}
config.Logger = hclog.NewInterceptLogger(opts)
config.LogOutput = opts.Output
// Tighten the Serf timing
config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1"
config.SerfConfig.MemberlistConfig.SuspicionMult = 2
@@ -67,9 +82,6 @@ func TestServer(t testing.T, cb func(*Config)) (*Server, func()) {
f := false
config.VaultConfig.Enabled = &f
// Squelch output when -v isn't specified
config.LogOutput = testlog.NewWriter(t)
// Tighten the autopilot timing
config.AutopilotConfig.ServerStabilizationTime = 100 * time.Millisecond
config.ServerHealthInterval = 50 * time.Millisecond

View File

@@ -945,6 +945,10 @@ func decodeBody(resp *http.Response, out interface{}) error {
// encodeBody is used to encode a request body
func encodeBody(obj interface{}) (io.Reader, error) {
if reader, ok := obj.(io.Reader); ok {
return reader, nil
}
buf := bytes.NewBuffer(nil)
enc := json.NewEncoder(buf)
if err := enc.Encode(obj); err != nil {

View File

@@ -222,6 +222,17 @@ func (op *Operator) Snapshot(q *QueryOptions) (io.ReadCloser, error) {
return cr, nil
}
// SnapshotRestore is used to restore a running nomad cluster to an original
// state.
func (op *Operator) SnapshotRestore(in io.Reader, q *WriteOptions) (*WriteMeta, error) {
wm, err := op.c.write("/v1/operator/snapshot", in, nil, q)
if err != nil {
return nil, err
}
return wm, nil
}
type License struct {
// The unique identifier of the license
LicenseID string