mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 02:45:42 +03:00
dynamic host volumes: initial state store implementation (#24353)
This changeset implements the state store schema for Dynamic Host Volumes, and methods used to query the state for RPCs. Ref: https://hashicorp.atlassian.net/browse/NET-11549
This commit is contained in:
42
nomad/mock/host_volumes.go
Normal file
42
nomad/mock/host_volumes.go
Normal file
@@ -0,0 +1,42 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package mock
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
func HostVolume() *structs.HostVolume {
|
||||
|
||||
volID := uuid.Generate()
|
||||
vol := &structs.HostVolume{
|
||||
Namespace: structs.DefaultNamespace,
|
||||
ID: volID,
|
||||
Name: "example",
|
||||
PluginID: "example-plugin",
|
||||
NodePool: structs.NodePoolDefault,
|
||||
NodeID: uuid.Generate(),
|
||||
Constraints: []*structs.Constraint{
|
||||
{
|
||||
LTarget: "${meta.rack}",
|
||||
RTarget: "r1",
|
||||
Operand: "=",
|
||||
},
|
||||
},
|
||||
RequestedCapacityMin: 100000,
|
||||
RequestedCapacityMax: 200000,
|
||||
Capacity: 150000,
|
||||
RequestedCapabilities: []*structs.HostVolumeCapability{
|
||||
{
|
||||
AttachmentMode: structs.HostVolumeAttachmentModeFilesystem,
|
||||
AccessMode: structs.HostVolumeAccessModeSingleNodeWriter,
|
||||
},
|
||||
},
|
||||
Parameters: map[string]string{"foo": "bar"},
|
||||
HostPath: "/var/data/nomad/alloc_mounts/" + volID,
|
||||
State: structs.HostVolumeStatePending,
|
||||
}
|
||||
return vol
|
||||
}
|
||||
@@ -26,6 +26,7 @@ const (
|
||||
TableACLBindingRules = "acl_binding_rules"
|
||||
TableAllocs = "allocs"
|
||||
TableJobSubmission = "job_submission"
|
||||
TableHostVolumes = "host_volumes"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -41,6 +42,7 @@ const (
|
||||
indexName = "name"
|
||||
indexSigningKey = "signing_key"
|
||||
indexAuthMethod = "auth_method"
|
||||
indexNodePool = "node_pool"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -97,6 +99,7 @@ func init() {
|
||||
aclRolesTableSchema,
|
||||
aclAuthMethodsTableSchema,
|
||||
bindingRulesTableSchema,
|
||||
hostVolumeTableSchema,
|
||||
}...)
|
||||
}
|
||||
|
||||
@@ -161,8 +164,8 @@ func nodeTableSchema() *memdb.TableSchema {
|
||||
Field: "SecretID",
|
||||
},
|
||||
},
|
||||
"node_pool": {
|
||||
Name: "node_pool",
|
||||
indexNodePool: {
|
||||
Name: indexNodePool,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
@@ -844,8 +847,8 @@ func vaultAccessorTableSchema() *memdb.TableSchema {
|
||||
},
|
||||
},
|
||||
|
||||
"node_id": {
|
||||
Name: "node_id",
|
||||
indexNodeID: {
|
||||
Name: indexNodeID,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
@@ -882,8 +885,8 @@ func siTokenAccessorTableSchema() *memdb.TableSchema {
|
||||
},
|
||||
},
|
||||
|
||||
"node_id": {
|
||||
Name: "node_id",
|
||||
indexNodeID: {
|
||||
Name: indexNodeID,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
@@ -1643,3 +1646,61 @@ func bindingRulesTableSchema() *memdb.TableSchema {
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// HostVolumes are identified by id globally, and searchable by namespace+name,
|
||||
// node, or node_pool
|
||||
func hostVolumeTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: TableHostVolumes,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
indexID: {
|
||||
Name: indexID,
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Namespace",
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "ID",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
indexName: {
|
||||
Name: indexName,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Namespace",
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Name",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
indexNodeID: {
|
||||
Name: indexNodeID,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "NodeID",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
indexNodePool: {
|
||||
Name: indexNodePool,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "NodePool",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
188
nomad/state/state_store_host_volumes.go
Normal file
188
nomad/state/state_store_host_volumes.go
Normal file
@@ -0,0 +1,188 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
// HostVolumeByID retrieve a specific host volume
|
||||
func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs bool) (*structs.HostVolume, error) {
|
||||
txn := s.db.ReadTxn()
|
||||
obj, err := txn.First(TableHostVolumes, indexID, ns, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if obj == nil {
|
||||
return nil, nil
|
||||
}
|
||||
vol := obj.(*structs.HostVolume)
|
||||
if !withAllocs {
|
||||
return vol, nil
|
||||
}
|
||||
|
||||
vol = vol.Copy()
|
||||
vol.Allocations = []*structs.AllocListStub{}
|
||||
|
||||
// we can't use AllocsByNodeTerminal because we only want to filter out
|
||||
// allocs that are client-terminal, not server-terminal
|
||||
allocs, err := s.AllocsByNode(nil, vol.NodeID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not query allocs to check for host volume claims: %w", err)
|
||||
}
|
||||
for _, alloc := range allocs {
|
||||
if alloc.ClientTerminalStatus() {
|
||||
continue
|
||||
}
|
||||
for _, volClaim := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Volumes {
|
||||
if volClaim.Type == structs.VolumeTypeHost && volClaim.Source == vol.Name {
|
||||
vol.Allocations = append(vol.Allocations, alloc.Stub(nil))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return vol, nil
|
||||
}
|
||||
|
||||
// UpsertHostVolumes upserts a set of host volumes
|
||||
func (s *StateStore) UpsertHostVolumes(index uint64, volumes []*structs.HostVolume) error {
|
||||
txn := s.db.WriteTxn(index)
|
||||
defer txn.Abort()
|
||||
|
||||
for _, v := range volumes {
|
||||
if exists, err := s.namespaceExists(txn, v.Namespace); err != nil {
|
||||
return err
|
||||
} else if !exists {
|
||||
return fmt.Errorf("host volume %s is in nonexistent namespace %s", v.ID, v.Namespace)
|
||||
}
|
||||
|
||||
obj, err := txn.First(TableHostVolumes, indexID, v.Namespace, v.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if obj != nil {
|
||||
old := obj.(*structs.HostVolume)
|
||||
v.CreateIndex = old.CreateIndex
|
||||
v.CreateTime = old.CreateTime
|
||||
} else {
|
||||
v.CreateIndex = index
|
||||
}
|
||||
|
||||
// If the fingerprint is written from the node before the create RPC
|
||||
// handler completes, we'll never update from the initial pending , so
|
||||
// reconcile that here
|
||||
node, err := s.NodeByID(nil, v.NodeID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if node == nil {
|
||||
return fmt.Errorf("host volume %s has nonexistent node ID %s", v.ID, v.NodeID)
|
||||
}
|
||||
if _, ok := node.HostVolumes[v.Name]; ok {
|
||||
v.State = structs.HostVolumeStateReady
|
||||
}
|
||||
|
||||
// Allocations are denormalized on read, so we don't want these to be
|
||||
// written to the state store.
|
||||
v.Allocations = nil
|
||||
v.ModifyIndex = index
|
||||
|
||||
err = txn.Insert(TableHostVolumes, v)
|
||||
if err != nil {
|
||||
return fmt.Errorf("host volume insert: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Insert(tableIndex, &IndexEntry{TableHostVolumes, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %w", err)
|
||||
}
|
||||
|
||||
return txn.Commit()
|
||||
}
|
||||
|
||||
// DeleteHostVolumes deletes a set of host volumes in the same namespace
|
||||
func (s *StateStore) DeleteHostVolumes(index uint64, ns string, ids []string) error {
|
||||
txn := s.db.WriteTxn(index)
|
||||
defer txn.Abort()
|
||||
|
||||
for _, id := range ids {
|
||||
|
||||
obj, err := txn.First(TableHostVolumes, indexID, ns, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if obj != nil {
|
||||
vol := obj.(*structs.HostVolume)
|
||||
|
||||
allocs, err := s.AllocsByNodeTerminal(nil, vol.NodeID, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not query allocs to check for host volume claims: %w", err)
|
||||
}
|
||||
for _, alloc := range allocs {
|
||||
for _, volClaim := range alloc.Job.LookupTaskGroup(alloc.TaskGroup).Volumes {
|
||||
if volClaim.Type == structs.VolumeTypeHost && volClaim.Name == vol.Name {
|
||||
return fmt.Errorf("could not delete volume %s in use by alloc %s",
|
||||
vol.ID, alloc.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
err = txn.Delete(TableHostVolumes, vol)
|
||||
if err != nil {
|
||||
return fmt.Errorf("host volume delete: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Insert(tableIndex, &IndexEntry{TableHostVolumes, index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %w", err)
|
||||
}
|
||||
|
||||
return txn.Commit()
|
||||
|
||||
}
|
||||
|
||||
// HostVolumes queries all the host volumes and is mostly used for
|
||||
// snapshot/restore
|
||||
func (s *StateStore) HostVolumes(ws memdb.WatchSet, sort SortOption) (memdb.ResultIterator, error) {
|
||||
return s.hostVolumesIter(ws, indexID, sort)
|
||||
}
|
||||
|
||||
// HostVolumesByName retrieves all host volumes of the same name
|
||||
func (s *StateStore) HostVolumesByName(ws memdb.WatchSet, ns, name string, sort SortOption) (memdb.ResultIterator, error) {
|
||||
return s.hostVolumesIter(ws, "name_prefix", sort, ns, name)
|
||||
}
|
||||
|
||||
// HostVolumesByNodeID retrieves all host volumes on the same node
|
||||
func (s *StateStore) HostVolumesByNodeID(ws memdb.WatchSet, nodeID string, sort SortOption) (memdb.ResultIterator, error) {
|
||||
return s.hostVolumesIter(ws, indexNodeID, sort, nodeID)
|
||||
}
|
||||
|
||||
// HostVolumesByNodePool retrieves all host volumes in the same node pool
|
||||
func (s *StateStore) HostVolumesByNodePool(ws memdb.WatchSet, nodePool string, sort SortOption) (memdb.ResultIterator, error) {
|
||||
return s.hostVolumesIter(ws, indexNodePool, sort, nodePool)
|
||||
}
|
||||
|
||||
func (s *StateStore) hostVolumesIter(ws memdb.WatchSet, index string, sort SortOption, args ...any) (memdb.ResultIterator, error) {
|
||||
txn := s.db.ReadTxn()
|
||||
|
||||
var iter memdb.ResultIterator
|
||||
var err error
|
||||
|
||||
switch sort {
|
||||
case SortReverse:
|
||||
iter, err = txn.GetReverse(TableHostVolumes, index, args...)
|
||||
default:
|
||||
iter, err = txn.Get(TableHostVolumes, index, args...)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ws.Add(iter.WatchCh())
|
||||
return iter, nil
|
||||
}
|
||||
165
nomad/state/state_store_host_volumes_test.go
Normal file
165
nomad/state/state_store_host_volumes_test.go
Normal file
@@ -0,0 +1,165 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/shoenig/test/must"
|
||||
)
|
||||
|
||||
func TestStateStore_HostVolumes_CRUD(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
store := testStateStore(t)
|
||||
index, err := store.LatestIndex()
|
||||
must.NoError(t, err)
|
||||
|
||||
nodes := []*structs.Node{
|
||||
mock.Node(),
|
||||
mock.Node(),
|
||||
mock.Node(),
|
||||
}
|
||||
nodes[2].NodePool = "prod"
|
||||
index++
|
||||
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
|
||||
index, nodes[0], NodeUpsertWithNodePool))
|
||||
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
|
||||
index, nodes[1], NodeUpsertWithNodePool))
|
||||
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
|
||||
index, nodes[2], NodeUpsertWithNodePool))
|
||||
|
||||
ns := mock.Namespace()
|
||||
must.NoError(t, store.UpsertNamespaces(index, []*structs.Namespace{ns}))
|
||||
|
||||
vols := []*structs.HostVolume{
|
||||
mock.HostVolume(),
|
||||
mock.HostVolume(),
|
||||
mock.HostVolume(),
|
||||
mock.HostVolume(),
|
||||
}
|
||||
vols[0].NodeID = nodes[0].ID
|
||||
vols[1].NodeID = nodes[1].ID
|
||||
vols[1].Name = "another-example"
|
||||
vols[2].NodeID = nodes[2].ID
|
||||
vols[2].NodePool = nodes[2].NodePool
|
||||
vols[3].Namespace = ns.Name
|
||||
vols[3].NodeID = nodes[2].ID
|
||||
vols[3].NodePool = nodes[2].NodePool
|
||||
|
||||
index++
|
||||
must.NoError(t, store.UpsertHostVolumes(index, vols))
|
||||
|
||||
vol, err := store.HostVolumeByID(nil, vols[0].Namespace, vols[0].ID, true)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, vol)
|
||||
must.Eq(t, vols[0].ID, vol.ID)
|
||||
must.NotNil(t, vol.Allocations)
|
||||
must.Len(t, 0, vol.Allocations)
|
||||
|
||||
vol, err = store.HostVolumeByID(nil, vols[0].Namespace, vols[0].ID, false)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, vol)
|
||||
must.Nil(t, vol.Allocations)
|
||||
|
||||
consumeIter := func(iter memdb.ResultIterator) map[string]*structs.HostVolume {
|
||||
got := map[string]*structs.HostVolume{}
|
||||
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||
vol := raw.(*structs.HostVolume)
|
||||
got[vol.ID] = vol
|
||||
}
|
||||
return got
|
||||
}
|
||||
|
||||
iter, err := store.HostVolumesByName(nil, structs.DefaultNamespace, "example", SortDefault)
|
||||
must.NoError(t, err)
|
||||
got := consumeIter(iter)
|
||||
must.NotNil(t, got[vols[0].ID], must.Sprint("expected vol0"))
|
||||
must.NotNil(t, got[vols[2].ID], must.Sprint("expected vol2"))
|
||||
must.MapLen(t, 2, got, must.Sprint(`expected 2 volumes named "example" in default namespace`))
|
||||
|
||||
iter, err = store.HostVolumesByNodePool(nil, nodes[2].NodePool, SortDefault)
|
||||
must.NoError(t, err)
|
||||
got = consumeIter(iter)
|
||||
must.NotNil(t, got[vols[2].ID], must.Sprint("expected vol2"))
|
||||
must.NotNil(t, got[vols[3].ID], must.Sprint("expected vol3"))
|
||||
must.MapLen(t, 2, got, must.Sprint(`expected 2 volumes in prod node pool`))
|
||||
|
||||
iter, err = store.HostVolumesByNodeID(nil, nodes[2].ID, SortDefault)
|
||||
must.NoError(t, err)
|
||||
got = consumeIter(iter)
|
||||
must.NotNil(t, got[vols[2].ID], must.Sprint("expected vol2"))
|
||||
must.NotNil(t, got[vols[3].ID], must.Sprint("expected vol3"))
|
||||
must.MapLen(t, 2, got, must.Sprint(`expected 2 volumes on node 2`))
|
||||
|
||||
// simulate a node registering one of the volumes
|
||||
nodes[2] = nodes[2].Copy()
|
||||
nodes[2].HostVolumes = map[string]*structs.ClientHostVolumeConfig{"example": {
|
||||
Name: vols[2].Name,
|
||||
Path: vols[2].HostPath,
|
||||
}}
|
||||
index++
|
||||
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, nodes[2]))
|
||||
|
||||
// update all the volumes, which should update the state of vol2 as well
|
||||
for i, vol := range vols {
|
||||
vol = vol.Copy()
|
||||
vol.RequestedCapacityMax = 300000
|
||||
vols[i] = vol
|
||||
}
|
||||
index++
|
||||
must.NoError(t, store.UpsertHostVolumes(index, vols))
|
||||
|
||||
iter, err = store.HostVolumesByName(nil, structs.DefaultNamespace, "example", SortDefault)
|
||||
must.NoError(t, err)
|
||||
got = consumeIter(iter)
|
||||
must.MapLen(t, 2, got, must.Sprint(`expected 2 volumes named "example" in default namespace`))
|
||||
|
||||
vol0 := got[vols[0].ID]
|
||||
must.NotNil(t, vol0)
|
||||
must.Eq(t, index, vol0.ModifyIndex)
|
||||
vol2 := got[vols[2].ID]
|
||||
must.NotNil(t, vol2)
|
||||
must.Eq(t, index, vol2.ModifyIndex)
|
||||
must.Eq(t, structs.HostVolumeStateReady, vol2.State, must.Sprint(
|
||||
"expected volume state to be updated because its been fingerprinted by a node"))
|
||||
|
||||
alloc := mock.AllocForNode(nodes[2])
|
||||
alloc.Job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{"example": {
|
||||
Name: "example",
|
||||
Type: structs.VolumeTypeHost,
|
||||
Source: vols[2].Name,
|
||||
}}
|
||||
index++
|
||||
must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup,
|
||||
index, []*structs.Allocation{alloc}))
|
||||
|
||||
index++
|
||||
err = store.DeleteHostVolumes(index, vol2.Namespace, []string{vols[1].ID, vols[2].ID})
|
||||
must.EqError(t, err, fmt.Sprintf(
|
||||
"could not delete volume %s in use by alloc %s", vols[2].ID, alloc.ID))
|
||||
vol, err = store.HostVolumeByID(nil, vols[1].Namespace, vols[1].ID, true)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, vol, must.Sprint("volume that didn't error should not be deleted"))
|
||||
|
||||
err = store.DeleteHostVolumes(index, vol2.Namespace, []string{vols[1].ID})
|
||||
must.NoError(t, err)
|
||||
vol, err = store.HostVolumeByID(nil, vols[1].Namespace, vols[1].ID, true)
|
||||
must.NoError(t, err)
|
||||
must.Nil(t, vol)
|
||||
|
||||
vol, err = store.HostVolumeByID(nil, vols[2].Namespace, vols[2].ID, true)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, vol)
|
||||
must.Len(t, 1, vol.Allocations)
|
||||
|
||||
iter, err = store.HostVolumes(nil, SortReverse)
|
||||
must.NoError(t, err)
|
||||
got = consumeIter(iter)
|
||||
must.MapLen(t, 3, got, must.Sprint(`expected 3 volumes remain`))
|
||||
}
|
||||
@@ -291,3 +291,11 @@ func (r *StateRestore) JobSubmissionRestore(jobSubmission *structs.JobSubmission
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// HostVolumeRestore restores a single host volume into the host_volumes table
|
||||
func (r *StateRestore) HostVolumeRestore(vol *structs.HostVolume) error {
|
||||
if err := r.txn.Insert(TableHostVolumes, vol); err != nil {
|
||||
return fmt.Errorf("host volume insert failed: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
197
nomad/structs/host_volumes.go
Normal file
197
nomad/structs/host_volumes.go
Normal file
@@ -0,0 +1,197 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package structs
|
||||
|
||||
import (
|
||||
"maps"
|
||||
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
)
|
||||
|
||||
type HostVolume struct {
|
||||
// Namespace is the Nomad namespace for the host volume, which constrains
|
||||
// which jobs can mount it.
|
||||
Namespace string
|
||||
|
||||
// ID is a UUID-like string generated by the server.
|
||||
ID string
|
||||
|
||||
// Name is the name that group.volume will use to identify the volume
|
||||
// source. Not expected to be unique.
|
||||
Name string
|
||||
|
||||
// PluginID is the name of the host volume plugin on the client that will be
|
||||
// used for creating the volume. If omitted, the client will use its default
|
||||
// built-in plugin.
|
||||
PluginID string
|
||||
|
||||
// NodePool is the node pool of the node where the volume is placed. If the
|
||||
// user doesn't provide a node ID, a node will be selected using the
|
||||
// NodePool and Constraints. If the user provides both NodePool and NodeID,
|
||||
// NodePool will be used to validate the request. If omitted, the server
|
||||
// will populate this value in before writing the volume to Raft.
|
||||
NodePool string
|
||||
|
||||
// NodeID is the node where the volume is placed. If the user doesn't
|
||||
// provide a NodeID, one will be selected using the NodePool and
|
||||
// Constraints. If omitted, this field will then be populated by the server
|
||||
// before writing the volume to Raft.
|
||||
NodeID string
|
||||
|
||||
// Constraints are optional. If the NodeID is not provided, the NodePool and
|
||||
// Constraints are used to select a node. If the NodeID is provided,
|
||||
// Constraints are used to validate that the node meets those constraints at
|
||||
// the time of volume creation.
|
||||
Constraints []*Constraint
|
||||
|
||||
// Because storage may allow only specific intervals of size, we accept a
|
||||
// min and max and return the actual capacity when the volume is created or
|
||||
// updated on the client
|
||||
RequestedCapacityMin int64 // bytes
|
||||
RequestedCapacityMax int64 // bytes
|
||||
Capacity int64 // bytes
|
||||
|
||||
// RequestedCapabilities defines the options available to group.volume
|
||||
// blocks. The scheduler checks against the listed capability blocks and
|
||||
// selects a node for placement if *any* capability block works.
|
||||
RequestedCapabilities []*HostVolumeCapability
|
||||
|
||||
// Parameters are an opaque map of parameters for the host volume plugin.
|
||||
Parameters map[string]string
|
||||
|
||||
// HostPath is the path on disk where the volume's mount point was
|
||||
// created. We record this to make debugging easier.
|
||||
HostPath string
|
||||
|
||||
// State represents the overall state of the volume. One of pending, ready,
|
||||
// deleted.
|
||||
State HostVolumeState
|
||||
|
||||
CreateIndex uint64
|
||||
CreateTime int64 // Unix timestamp in nanoseconds since epoch
|
||||
|
||||
ModifyIndex uint64
|
||||
ModifyTime int64 // Unix timestamp in nanoseconds since epoch
|
||||
|
||||
// Allocations is the list of non-client-terminal allocations with claims on
|
||||
// this host volume. They are denormalized on read and this field will be
|
||||
// never written to Raft
|
||||
Allocations []*AllocListStub
|
||||
}
|
||||
|
||||
type HostVolumeState string
|
||||
|
||||
const (
|
||||
HostVolumeStateUnknown HostVolumeState = "" // never write this to Raft
|
||||
HostVolumeStatePending HostVolumeState = "pending"
|
||||
HostVolumeStateReady HostVolumeState = "ready"
|
||||
HostVolumeStateDeleted HostVolumeState = "deleted"
|
||||
)
|
||||
|
||||
func (hv *HostVolume) Copy() *HostVolume {
|
||||
if hv == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
nhv := *hv
|
||||
nhv.Constraints = helper.CopySlice(hv.Constraints)
|
||||
nhv.RequestedCapabilities = helper.CopySlice(hv.RequestedCapabilities)
|
||||
nhv.Parameters = maps.Clone(hv.Parameters)
|
||||
return &nhv
|
||||
}
|
||||
|
||||
func (hv *HostVolume) Stub() *HostVolumeStub {
|
||||
if hv == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &HostVolumeStub{
|
||||
Namespace: hv.Namespace,
|
||||
ID: hv.ID,
|
||||
Name: hv.Name,
|
||||
PluginID: hv.PluginID,
|
||||
NodePool: hv.NodePool,
|
||||
NodeID: hv.NodeID,
|
||||
Capacity: hv.Capacity,
|
||||
State: hv.State,
|
||||
CreateIndex: hv.CreateIndex,
|
||||
CreateTime: hv.CreateTime,
|
||||
ModifyIndex: hv.ModifyIndex,
|
||||
ModifyTime: hv.ModifyTime,
|
||||
}
|
||||
}
|
||||
|
||||
func (hv *HostVolume) Validate(existing *HostVolume) error {
|
||||
// TODO(1.10.0): validate a host volume is validate or that changes to a
|
||||
// host volume are valid
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetNamespace implements the paginator.NamespaceGetter interface
|
||||
func (hv *HostVolume) GetNamespace() string {
|
||||
return hv.Namespace
|
||||
}
|
||||
|
||||
// GetID implements the paginator.IDGetter interface
|
||||
func (hv *HostVolume) GetID() string {
|
||||
return hv.ID
|
||||
}
|
||||
|
||||
// HostVolumeCapability is the requested attachment and access mode for a volume
|
||||
type HostVolumeCapability struct {
|
||||
AttachmentMode HostVolumeAttachmentMode
|
||||
AccessMode HostVolumeAccessMode
|
||||
}
|
||||
|
||||
func (hvc *HostVolumeCapability) Copy() *HostVolumeCapability {
|
||||
if hvc == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
nhvc := *hvc
|
||||
return &nhvc
|
||||
}
|
||||
|
||||
// HostVolumeAttachmentMode chooses the type of storage API that will be used to
|
||||
// interact with the device.
|
||||
type HostVolumeAttachmentMode string
|
||||
|
||||
const (
|
||||
HostVolumeAttachmentModeUnknown HostVolumeAttachmentMode = ""
|
||||
HostVolumeAttachmentModeBlockDevice HostVolumeAttachmentMode = "block-device"
|
||||
HostVolumeAttachmentModeFilesystem HostVolumeAttachmentMode = "file-system"
|
||||
)
|
||||
|
||||
// HostVolumeAccessMode indicates how Nomad should make the volume available to
|
||||
// concurrent allocations.
|
||||
type HostVolumeAccessMode string
|
||||
|
||||
const (
|
||||
HostVolumeAccessModeUnknown HostVolumeAccessMode = ""
|
||||
|
||||
HostVolumeAccessModeSingleNodeReader HostVolumeAccessMode = "single-node-reader-only"
|
||||
HostVolumeAccessModeSingleNodeWriter HostVolumeAccessMode = "single-node-writer"
|
||||
|
||||
HostVolumeAccessModeMultiNodeReader HostVolumeAccessMode = "multi-node-reader-only"
|
||||
HostVolumeAccessModeMultiNodeSingleWriter HostVolumeAccessMode = "multi-node-single-writer"
|
||||
HostVolumeAccessModeMultiNodeMultiWriter HostVolumeAccessMode = "multi-node-multi-writer"
|
||||
)
|
||||
|
||||
// HostVolumeStub is used for responses for the list volumes endpoint
|
||||
type HostVolumeStub struct {
|
||||
Namespace string
|
||||
ID string
|
||||
Name string
|
||||
PluginID string
|
||||
NodePool string
|
||||
NodeID string
|
||||
Capacity int64 // bytes
|
||||
State HostVolumeState
|
||||
|
||||
CreateIndex uint64
|
||||
CreateTime int64
|
||||
|
||||
ModifyIndex uint64
|
||||
ModifyTime int64
|
||||
}
|
||||
55
nomad/structs/host_volumes_test.go
Normal file
55
nomad/structs/host_volumes_test.go
Normal file
@@ -0,0 +1,55 @@
|
||||
// Copyright (c) HashiCorp, Inc.
|
||||
// SPDX-License-Identifier: BUSL-1.1
|
||||
|
||||
package structs
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/shoenig/test/must"
|
||||
)
|
||||
|
||||
func TestHostVolume_Copy(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
|
||||
out := (*HostVolume)(nil).Copy()
|
||||
must.Nil(t, out)
|
||||
|
||||
vol := &HostVolume{
|
||||
Namespace: DefaultNamespace,
|
||||
ID: uuid.Generate(),
|
||||
Name: "example",
|
||||
PluginID: "example-plugin",
|
||||
NodePool: NodePoolDefault,
|
||||
NodeID: uuid.Generate(),
|
||||
Constraints: []*Constraint{{
|
||||
LTarget: "${meta.rack}",
|
||||
RTarget: "r1",
|
||||
Operand: "=",
|
||||
}},
|
||||
Capacity: 150000,
|
||||
RequestedCapabilities: []*HostVolumeCapability{{
|
||||
AttachmentMode: HostVolumeAttachmentModeFilesystem,
|
||||
AccessMode: HostVolumeAccessModeSingleNodeWriter,
|
||||
}},
|
||||
Parameters: map[string]string{"foo": "bar"},
|
||||
}
|
||||
|
||||
out = vol.Copy()
|
||||
must.Eq(t, vol, out)
|
||||
|
||||
out.Allocations = []*AllocListStub{{ID: uuid.Generate()}}
|
||||
out.Constraints[0].LTarget = "${meta.node_class}"
|
||||
out.RequestedCapabilities = append(out.RequestedCapabilities, &HostVolumeCapability{
|
||||
AttachmentMode: HostVolumeAttachmentModeBlockDevice,
|
||||
AccessMode: HostVolumeAccessModeMultiNodeReader,
|
||||
})
|
||||
out.Parameters["foo"] = "baz"
|
||||
|
||||
must.Nil(t, vol.Allocations)
|
||||
must.Eq(t, "${meta.rack}", vol.Constraints[0].LTarget)
|
||||
must.Len(t, 1, vol.RequestedCapabilities)
|
||||
must.Eq(t, "bar", vol.Parameters["foo"])
|
||||
}
|
||||
Reference in New Issue
Block a user