Merge pull request #826 from hashicorp/f-compress-raft

LZW compress raft entries
This commit is contained in:
Alex Dadgar
2016-02-20 16:03:05 -08:00
5 changed files with 187 additions and 86 deletions

View File

@@ -112,6 +112,7 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
ignoreUnknown = true
}
PARSE_TYPE:
switch msgType {
case structs.NodeRegisterRequestType:
return n.applyUpsertNode(buf[1:], log.Index)
@@ -133,6 +134,16 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyAllocUpdate(buf[1:], log.Index)
case structs.AllocClientUpdateRequestType:
return n.applyAllocClientUpdate(buf[1:], log.Index)
case structs.CompressedRequestType:
decomp, err := structs.Uncompress(buf[1:])
if err != nil {
panic(fmt.Errorf("failed to decompress request: %#v", buf))
}
// Store the inner message type and buffer and re-enter switch
msgType = structs.MessageType(decomp[0])
buf = decomp
goto PARSE_TYPE
default:
if ignoreUnknown {
n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)

View File

@@ -73,7 +73,7 @@ func TestFSM_UpsertNode(t *testing.T) {
req := structs.NodeRegisterRequest{
Node: mock.Node(),
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -109,7 +109,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -122,7 +122,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
req2 := structs.NodeDeregisterRequest{
NodeID: node.ID,
}
buf, err = structs.Encode(structs.NodeDeregisterRequestType, req2)
buf, err = structs.EncodeCompressed(structs.NodeDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -150,7 +150,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -169,7 +169,7 @@ func TestFSM_UpdateNodeStatus(t *testing.T) {
NodeID: node.ID,
Status: structs.NodeStatusReady,
}
buf, err = structs.Encode(structs.NodeUpdateStatusRequestType, req2)
buf, err = structs.EncodeCompressed(structs.NodeUpdateStatusRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -207,7 +207,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
req := structs.NodeRegisterRequest{
Node: node,
}
buf, err := structs.Encode(structs.NodeRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.NodeRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -221,7 +221,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
NodeID: node.ID,
Drain: true,
}
buf, err = structs.Encode(structs.NodeUpdateDrainRequestType, req2)
buf, err = structs.EncodeCompressed(structs.NodeUpdateDrainRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -248,7 +248,7 @@ func TestFSM_RegisterJob(t *testing.T) {
req := structs.JobRegisterRequest{
Job: job,
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -295,7 +295,7 @@ func TestFSM_DeregisterJob(t *testing.T) {
req := structs.JobRegisterRequest{
Job: job,
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
buf, err := structs.EncodeCompressed(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -308,7 +308,7 @@ func TestFSM_DeregisterJob(t *testing.T) {
req2 := structs.JobDeregisterRequest{
JobID: job.ID,
}
buf, err = structs.Encode(structs.JobDeregisterRequestType, req2)
buf, err = structs.EncodeCompressed(structs.JobDeregisterRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -349,7 +349,7 @@ func TestFSM_UpdateEval(t *testing.T) {
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{mock.Eval()},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -390,7 +390,7 @@ func TestFSM_UpdateEval_Blocked(t *testing.T) {
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -432,7 +432,7 @@ func TestFSM_DeleteEval(t *testing.T) {
req := structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
buf, err := structs.Encode(structs.EvalUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.EvalUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -445,7 +445,7 @@ func TestFSM_DeleteEval(t *testing.T) {
req2 := structs.EvalDeleteRequest{
Evals: []string{eval.ID},
}
buf, err = structs.Encode(structs.EvalDeleteRequestType, req2)
buf, err = structs.EncodeCompressed(structs.EvalDeleteRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -472,7 +472,7 @@ func TestFSM_UpsertAllocs(t *testing.T) {
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{alloc},
}
buf, err := structs.Encode(structs.AllocUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.AllocUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -500,7 +500,7 @@ func TestFSM_UpsertAllocs(t *testing.T) {
req2 := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{evictAlloc},
}
buf, err = structs.Encode(structs.AllocUpdateRequestType, req2)
buf, err = structs.EncodeCompressed(structs.AllocUpdateRequestType, req2)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -550,7 +550,7 @@ func TestFSM_UpdateAllocFromClient(t *testing.T) {
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{clientAlloc},
}
buf, err := structs.Encode(structs.AllocClientUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.AllocClientUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -597,7 +597,7 @@ func TestFSM_UpdateAllocFromClient_Unblock(t *testing.T) {
req := structs.AllocUpdateRequest{
Alloc: []*structs.Allocation{clientAlloc},
}
buf, err := structs.Encode(structs.AllocClientUpdateRequestType, req)
buf, err := structs.EncodeCompressed(structs.AllocClientUpdateRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}

View File

@@ -242,7 +242,7 @@ func (s *Server) forwardRegion(region, method string, args interface{}, reply in
// raftApplyFuture is used to encode a message, run it through raft, and return the Raft future.
func (s *Server) raftApplyFuture(t structs.MessageType, msg interface{}) (raft.ApplyFuture, error) {
buf, err := structs.Encode(t, msg)
buf, err := structs.EncodeCompressed(t, msg)
if err != nil {
return nil, fmt.Errorf("Failed to encode request: %v", err)
}

View File

@@ -2,6 +2,7 @@ package structs
import (
"bytes"
"compress/lzw"
"crypto/sha1"
"errors"
"fmt"
@@ -38,6 +39,7 @@ const (
EvalDeleteRequestType
AllocUpdateRequestType
AllocClientUpdateRequestType
CompressedRequestType
)
const (
@@ -2500,6 +2502,53 @@ var MsgpackHandle = func() *codec.MsgpackHandle {
return h
}()
// EncodeCompressed encodes and compresses the passed payload. The compressed
// payload is prefixed with the CompressedRequestType header byte.
func EncodeCompressed(t MessageType, msg interface{}) ([]byte, error) {
// Create a buffer that will store in its first byte the compressed
// header type and in its following bytes the compressed payload.
var buf bytes.Buffer
buf.WriteByte(uint8(CompressedRequestType))
// Create the compressed writer to compress the user payload.
compWriter := lzw.NewWriter(&buf, lzw.LSB, 8)
// Encode the input.
encoded, err := Encode(t, msg)
if err != nil {
return nil, err
}
// Compress the encoded data.
if _, err := compWriter.Write(encoded); err != nil {
return nil, err
}
// Close the writer to ensure the data gets flushed.
if err := compWriter.Close(); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Uncompress uncompresses the compressed payload returned by EncodeCompressed
// stripped of the header byte. The decompressed payload can be decoded by
// calling Decode with the original message type.
func Uncompress(buf []byte) ([]byte, error) {
uncomp := lzw.NewReader(bytes.NewReader(buf), lzw.LSB, 8)
defer uncomp.Close()
// Read all the data
var b bytes.Buffer
if _, err := io.Copy(&b, uncomp); err != nil {
return nil, err
}
// Return the uncompressed bytes
return b.Bytes(), nil
}
// Decode is used to decode a MsgPack encoded object
func Decode(buf []byte, out interface{}) error {
return codec.NewDecoder(bytes.NewReader(buf), MsgpackHandle).Decode(out)

View File

@@ -9,6 +9,76 @@ import (
"github.com/hashicorp/go-multierror"
)
func testJob() *Job {
return &Job{
Region: "global",
ID: GenerateUUID(),
Name: "my-job",
Type: JobTypeService,
Priority: 50,
AllAtOnce: false,
Datacenters: []string{"dc1"},
Constraints: []*Constraint{
&Constraint{
LTarget: "$attr.kernel.name",
RTarget: "linux",
Operand: "=",
},
},
Periodic: &PeriodicConfig{
Enabled: false,
},
TaskGroups: []*TaskGroup{
&TaskGroup{
Name: "web",
Count: 10,
RestartPolicy: &RestartPolicy{
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
},
Tasks: []*Task{
&Task{
Name: "web",
Driver: "exec",
Config: map[string]interface{}{
"command": "/bin/date",
},
Env: map[string]string{
"FOO": "bar",
},
Services: []*Service{
{
Name: "${TASK}-frontend",
PortLabel: "http",
},
},
Resources: &Resources{
CPU: 500,
MemoryMB: 256,
Networks: []*NetworkResource{
&NetworkResource{
MBits: 50,
DynamicPorts: []Port{{Label: "http"}},
},
},
},
},
},
Meta: map[string]string{
"elb_check_type": "http",
"elb_check_interval": "30s",
"elb_check_min": "3",
},
},
},
Meta: map[string]string{
"owner": "armon",
},
}
}
func TestJob_Validate(t *testing.T) {
j := &Job{}
err := j.Validate()
@@ -94,73 +164,7 @@ func TestJob_Validate(t *testing.T) {
}
func TestJob_Copy(t *testing.T) {
j := &Job{
Region: "global",
ID: GenerateUUID(),
Name: "my-job",
Type: JobTypeService,
Priority: 50,
AllAtOnce: false,
Datacenters: []string{"dc1"},
Constraints: []*Constraint{
&Constraint{
LTarget: "$attr.kernel.name",
RTarget: "linux",
Operand: "=",
},
},
Periodic: &PeriodicConfig{
Enabled: false,
},
TaskGroups: []*TaskGroup{
&TaskGroup{
Name: "web",
Count: 10,
RestartPolicy: &RestartPolicy{
Attempts: 3,
Interval: 10 * time.Minute,
Delay: 1 * time.Minute,
},
Tasks: []*Task{
&Task{
Name: "web",
Driver: "exec",
Config: map[string]interface{}{
"command": "/bin/date",
},
Env: map[string]string{
"FOO": "bar",
},
Services: []*Service{
{
Name: "${TASK}-frontend",
PortLabel: "http",
},
},
Resources: &Resources{
CPU: 500,
MemoryMB: 256,
Networks: []*NetworkResource{
&NetworkResource{
MBits: 50,
DynamicPorts: []Port{{Label: "http"}},
},
},
},
},
},
Meta: map[string]string{
"elb_check_type": "http",
"elb_check_interval": "30s",
"elb_check_min": "3",
},
},
},
Meta: map[string]string{
"owner": "armon",
},
}
j := testJob()
c := j.Copy()
if !reflect.DeepEqual(j, c) {
t.Fatalf("Copy() returned an unequal Job; got %#v; want %#v", c, j)
@@ -716,3 +720,40 @@ func TestRestartPolicy_Validate(t *testing.T) {
t.Fatalf("expect restart interval error, got: %v", err)
}
}
func TestEncodeCompressed(t *testing.T) {
// Create an input payload
msgType := JobRegisterRequestType
req := JobRegisterRequest{Job: testJob()}
// Encode and compress it
buf, err := EncodeCompressed(msgType, req)
if err != nil {
t.Fatalf("EncodeCompressed(%v, %#v) failed: %v", msgType, req, err)
}
if len(buf) == 0 {
t.Fatalf("EncodeCompressed(%v, %#v) returned empty", msgType, req)
}
// Uncompress and check data
decomp, err := Uncompress(buf[1:])
if err != nil {
t.Fatalf("Uncompress(%#v) errored: %v", decomp, err)
}
if len(decomp) < 2 {
t.Fatalf("Uncompress(%#v) returned too little: %#v", buf, decomp)
}
if act := MessageType(decomp[0]); act != msgType {
t.Fatalf("bad: received incorrect MessageType: %v", act)
}
// Decode payload
var decodedJob JobRegisterRequest
if err := Decode(decomp[1:], &decodedJob); err != nil {
t.Fatalf("Decode of uncompressed payload failed: %v", err)
}
if !reflect.DeepEqual(decodedJob, req) {
t.Fatalf("Decode failed: got %#v; want %#v", decodedJob, req)
}
}