From eedbd36fef471c082a220450db57579e66c365a4 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Mon, 1 Jul 2024 11:41:10 -0400
Subject: [PATCH] qemu: pass task resources into driver for cgroup setup (#23466)

As part of the work for 1.7.0 we moved portions of the task cgroup setup
down into the executor. This requires that the executor constructor get the
`TaskConfig.Resources` struct, and this was missing from the `qemu` driver.
We fixed a panic caused by this change in #19089 before we shipped, but
this fix was effectively undone after we added plumbing for custom cgroups
for `raw_exec` in 1.8.0. As a result, `qemu` tasks always fail on Linux.

This was undetected in testing because our CI environment doesn't have QEMU
installed. I've got all the unit tests running locally again and have added
QEMU installation to CI when we're running the drivers tests.

Fixes: https://github.com/hashicorp/nomad/issues/23250
---
 .changelog/23466.txt             |   3 +
 .github/workflows/test-core.yaml |   5 +
 drivers/qemu/driver.go           |   1 +
 drivers/qemu/driver_test.go      | 226 +++++++++++++++++--------------
 4 files changed, 130 insertions(+), 105 deletions(-)
 create mode 100644 .changelog/23466.txt

diff --git a/.changelog/23466.txt b/.changelog/23466.txt
new file mode 100644
index 000000000..74f048c29
--- /dev/null
+++ b/.changelog/23466.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+qemu: Fixed a bug that prevented `qemu` tasks from running on Linux
+```
diff --git a/.github/workflows/test-core.yaml b/.github/workflows/test-core.yaml
index 2b24713de..e8d40891f 100644
--- a/.github/workflows/test-core.yaml
+++ b/.github/workflows/test-core.yaml
@@ -114,6 +114,11 @@ jobs:
     steps:
       - uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1
       - uses: hashicorp/setup-golang@v3
+
+      - name: Install optional dependencies
+        if: ${{ matrix.groups == 'drivers' }}
+        run: sudo apt install qemu-system
+
       - name: Run Matrix Tests
         env:
           GOTEST_GROUP: ${{matrix.groups}}
diff --git a/drivers/qemu/driver.go b/drivers/qemu/driver.go
index 57c99522a..4fb6e8174 100644
--- a/drivers/qemu/driver.go
+++ b/drivers/qemu/driver.go
@@ -585,6 +585,7 @@ func (d *Driver) StartTask(cfg *drivers.TaskConfig) (*drivers.TaskHandle, *drive
 		StdoutPath:       cfg.StdoutPath,
 		StderrPath:       cfg.StderrPath,
 		NetworkIsolation: cfg.NetworkIsolation,
+		Resources:        cfg.Resources.Copy(),
 	}
 	ps, err := execImpl.Launch(execCmd)
 	if err != nil {
diff --git a/drivers/qemu/driver_test.go b/drivers/qemu/driver_test.go
index 9454bcbf1..7148760ae 100644
--- a/drivers/qemu/driver_test.go
+++ b/drivers/qemu/driver_test.go
@@ -12,15 +12,18 @@ import (
 	"time"
 
 	"github.com/hashicorp/nomad/ci"
+	"github.com/hashicorp/nomad/client/lib/cgroupslib"
+	"github.com/hashicorp/nomad/client/lib/numalib"
 	ctestutil "github.com/hashicorp/nomad/client/testutil"
 	"github.com/hashicorp/nomad/helper/pluginutils/hclutils"
 	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/plugins/base"
 	"github.com/hashicorp/nomad/plugins/drivers"
 	dtestutil "github.com/hashicorp/nomad/plugins/drivers/testutils"
 	"github.com/hashicorp/nomad/testutil"
-	"github.com/stretchr/testify/require"
+	"github.com/shoenig/test/must"
 )
 
 // TODO(preetha) - tests remaining
@@ -30,32 +33,23 @@
 func TestQemuDriver_Start_Wait_Stop(t *testing.T) {
 	ci.Parallel(t)
 	ctestutil.QemuCompatible(t)
+	ctestutil.CgroupsCompatible(t)
 
-	require := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+	topology := numalib.Scan(numalib.PlatformScanners())
 	d := NewQemuDriver(ctx, testlog.HCLogger(t))
+	d.(*Driver).nomadConfig = &base.ClientDriverConfig{Topology: topology}
 	harness := dtestutil.NewDriverHarness(t, d)
 
+	allocID := uuid.Generate()
+	harness.MakeTaskCgroup(allocID, "linux")
 	task := &drivers.TaskConfig{
-		ID:   uuid.Generate(),
-		Name: "linux",
-		Resources: &drivers.Resources{
-			NomadResources: &structs.AllocatedTaskResources{
-				Memory: structs.AllocatedMemoryResources{
-					MemoryMB: 512,
-				},
-				Cpu: structs.AllocatedCpuResources{
-					CpuShares: 100,
-				},
-				Networks: []*structs.NetworkResource{
-					{
-						ReservedPorts: []structs.Port{{Label: "main", Value: 22000}, {Label: "web", Value: 80}},
-					},
-				},
-			},
-		},
+		AllocID:   allocID,
+		ID:        uuid.Generate(),
+		Name:      "linux",
+		Resources: testResources(allocID, "linux"),
 	}
 
 	tc := &TaskConfig{
@@ -66,9 +60,9 @@ func TestQemuDriver_Start_Wait_Stop(t *testing.T) {
 			"main": 22,
 			"web":  8080,
 		},
-		Args: []string{"-nodefconfig", "-nodefaults"},
+		Args: []string{"-nodefaults"},
 	}
-	require.NoError(task.EncodeConcreteDriverConfig(&tc))
+	must.NoError(t, task.EncodeConcreteDriverConfig(&tc))
 
 	cleanup := harness.MkAllocDir(task, true)
 	defer cleanup()
@@ -77,15 +71,15 @@ func TestQemuDriver_Start_Wait_Stop(t *testing.T) {
 	copyFile("./test-resources/linux-0.2.img", filepath.Join(taskDir, "linux-0.2.img"), t)
 
 	handle, _, err := harness.StartTask(task)
-	require.NoError(err)
+	must.NoError(t, err)
 
-	require.NotNil(handle)
+	must.NotNil(t, handle)
 
 	// Ensure that sending a Signal returns an error
 	err = d.SignalTask(task.ID, "SIGINT")
-	require.NotNil(err)
+	must.EqError(t, err, "QEMU driver can't signal commands")
 
-	require.NoError(harness.DestroyTask(task.ID, true))
+	must.NoError(t, harness.DestroyTask(task.ID, true))
 }
@@ -117,33 +111,23 @@ func copyFile(src, dst string, t *testing.T) {
 func TestQemuDriver_User(t *testing.T) {
 	ci.Parallel(t)
 	ctestutil.QemuCompatible(t)
+	ctestutil.CgroupsCompatible(t)
 
-	require := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+	topology := numalib.Scan(numalib.PlatformScanners())
 
 	d := NewQemuDriver(ctx, testlog.HCLogger(t))
+	d.(*Driver).nomadConfig = &base.ClientDriverConfig{Topology: topology}
 	harness := dtestutil.NewDriverHarness(t, d)
 
+	allocID := uuid.Generate()
+	harness.MakeTaskCgroup(allocID, "linux")
 	task := &drivers.TaskConfig{
-		ID:   uuid.Generate(),
-		Name: "linux",
-		User: "alice",
-		Resources: &drivers.Resources{
-			NomadResources: &structs.AllocatedTaskResources{
-				Memory: structs.AllocatedMemoryResources{
-					MemoryMB: 512,
-				},
-				Cpu: structs.AllocatedCpuResources{
-					CpuShares: 100,
-				},
-				Networks: []*structs.NetworkResource{
-					{
-						ReservedPorts: []structs.Port{{Label: "main", Value: 22000}, {Label: "web", Value: 80}},
-					},
-				},
-			},
-		},
+		ID:        uuid.Generate(),
+		Name:      "linux",
+		User:      "alice",
+		Resources: testResources(allocID, "linux"),
 	}
 
 	tc := &TaskConfig{
@@ -154,9 +138,9 @@ func TestQemuDriver_User(t *testing.T) {
 			"main": 22,
 			"web":  8080,
 		},
-		Args: []string{"-nodefconfig", "-nodefaults"},
+		Args: []string{"-nodefaults"},
 	}
-	require.NoError(task.EncodeConcreteDriverConfig(&tc))
+	must.NoError(t, task.EncodeConcreteDriverConfig(&tc))
 
 	cleanup := harness.MkAllocDir(task, true)
 	defer cleanup()
@@ -165,43 +149,30 @@ func TestQemuDriver_User(t *testing.T) {
 	copyFile("./test-resources/linux-0.2.img", filepath.Join(taskDir, "linux-0.2.img"), t)
 
 	_, _, err := harness.StartTask(task)
-	require.Error(err)
-	require.Contains(err.Error(), "unknown user alice", err.Error())
-
+	must.ErrorContains(t, err, "unknown user alice")
 }
 
-// Verifies getting resource usage stats
-//
-// TODO(preetha) this test needs random sleeps to pass
+// TestQemuDriver_Stats verifies we can get resources usage stats
 func TestQemuDriver_Stats(t *testing.T) {
 	ci.Parallel(t)
 	ctestutil.QemuCompatible(t)
+	ctestutil.CgroupsCompatible(t)
 
-	require := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+	topology := numalib.Scan(numalib.PlatformScanners())
 
 	d := NewQemuDriver(ctx, testlog.HCLogger(t))
+	d.(*Driver).nomadConfig = &base.ClientDriverConfig{Topology: topology}
 	harness := dtestutil.NewDriverHarness(t, d)
 
+	allocID := uuid.Generate()
+	harness.MakeTaskCgroup(allocID, "linux")
 	task := &drivers.TaskConfig{
-		ID:   uuid.Generate(),
-		Name: "linux",
-		Resources: &drivers.Resources{
-			NomadResources: &structs.AllocatedTaskResources{
-				Memory: structs.AllocatedMemoryResources{
-					MemoryMB: 512,
-				},
-				Cpu: structs.AllocatedCpuResources{
-					CpuShares: 100,
-				},
-				Networks: []*structs.NetworkResource{
-					{
-						ReservedPorts: []structs.Port{{Label: "main", Value: 22000}, {Label: "web", Value: 80}},
-					},
-				},
-			},
-		},
+		AllocID:   allocID,
+		ID:        uuid.Generate(),
+		Name:      "linux",
+		Resources: testResources(allocID, "linux"),
 	}
 
 	tc := &TaskConfig{
@@ -212,9 +183,9 @@ func TestQemuDriver_Stats(t *testing.T) {
 			"main": 22,
 			"web":  8080,
 		},
-		Args: []string{"-nodefconfig", "-nodefaults"},
+		Args: []string{"-nodefaults"},
 	}
-	require.NoError(task.EncodeConcreteDriverConfig(&tc))
+	must.NoError(t, task.EncodeConcreteDriverConfig(&tc))
 
 	cleanup := harness.MkAllocDir(task, true)
 	defer cleanup()
@@ -223,36 +194,47 @@ func TestQemuDriver_Stats(t *testing.T) {
 	copyFile("./test-resources/linux-0.2.img", filepath.Join(taskDir, "linux-0.2.img"), t)
 
 	handle, _, err := harness.StartTask(task)
-	require.NoError(err)
-
-	require.NotNil(handle)
+	must.NoError(t, err)
+	must.NotNil(t, handle)
 
 	// Wait for task to start
-	_, err = harness.WaitTask(context.Background(), handle.Config.ID)
-	require.NoError(err)
+	exitCh, err := harness.WaitTask(context.Background(), handle.Config.ID)
+	must.NoError(t, err)
 
 	// Wait until task started
-	require.NoError(harness.WaitUntilStarted(task.ID, 1*time.Second))
-	time.Sleep(30 * time.Second)
-	statsCtx, cancel := context.WithCancel(context.Background())
+	must.NoError(t, harness.WaitUntilStarted(task.ID, 1*time.Second))
+
+	t.Cleanup(func() { harness.DestroyTask(task.ID, true) })
+
+	timeout := 30 * time.Second
+	statsCtx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 
-	statsCh, err := harness.TaskStats(statsCtx, task.ID, time.Second*10)
-	require.NoError(err)
+	statsCh, err := harness.TaskStats(statsCtx, task.ID, time.Second*1)
+	must.NoError(t, err)
 
-	select {
-	case stats := <-statsCh:
-		t.Logf("CPU:%+v Memory:%+v\n", stats.ResourceUsage.CpuStats, stats.ResourceUsage.MemoryStats)
-		require.NotZero(stats.ResourceUsage.MemoryStats.RSS)
-		require.NoError(harness.DestroyTask(task.ID, true))
-	case <-time.After(time.Second * 1):
-		require.Fail("timeout receiving from stats")
+	for {
+		select {
+		case exitCode := <-exitCh:
+			t.Fatalf("should not have exited: %+v", exitCode)
+		case stats := <-statsCh:
+			if stats == nil {
+				continue // receiving empty stats races with ctx.Done
+			}
+			t.Logf("CPU:%+v Memory:%+v\n",
+				stats.ResourceUsage.CpuStats, stats.ResourceUsage.MemoryStats)
+			if stats.ResourceUsage.MemoryStats.RSS != 0 {
+				must.NonZero(t, stats.ResourceUsage.MemoryStats.RSS)
+				must.NoError(t, harness.DestroyTask(task.ID, true))
+				return
+			}
+		case <-ctx.Done():
+			t.Fatal("timed out before receiving non-zero stats")
+		}
 	}
-
 }
 
 func TestQemuDriver_Fingerprint(t *testing.T) {
 	ci.Parallel(t)
-	require := require.New(t)
 
 	ctestutil.QemuCompatible(t)
@@ -263,13 +245,14 @@ func TestQemuDriver_Fingerprint(t *testing.T) {
 	harness := dtestutil.NewDriverHarness(t, d)
 
 	fingerCh, err := harness.Fingerprint(context.Background())
-	require.NoError(err)
+	must.NoError(t, err)
 
 	select {
 	case finger := <-fingerCh:
-		require.Equal(drivers.HealthStateHealthy, finger.Health)
-		require.True(finger.Attributes["driver.qemu"].GetBool())
+		must.Eq(t, drivers.HealthStateHealthy, finger.Health)
+		ok, _ := finger.Attributes["driver.qemu"].GetBool()
+		must.True(t, ok)
 	case <-time.After(time.Duration(testutil.TestMultiplier()*5) * time.Second):
-		require.Fail("timeout receiving fingerprint")
+		t.Fatal("timeout receiving fingerprint")
 	}
 }
@@ -303,8 +286,7 @@ config {
 
 	var tc *TaskConfig
 	hclutils.NewConfigParser(taskConfigSpec).ParseHCL(t, cfgStr, &tc)
-
-	require.EqualValues(t, expected, tc)
+	must.Eq(t, expected, tc)
 }
@@ -312,11 +294,13 @@ func TestIsAllowedDriveInterface(t *testing.T) {
 	invalidInterfaces := []string{"foo", "virtio-foo"}
 
 	for _, i := range validInterfaces {
-		require.Truef(t, isAllowedDriveInterface(i), "drive_interface should be allowed: %v", i)
+		must.True(t, isAllowedDriveInterface(i),
+			must.Sprintf("drive_interface should be allowed: %v", i))
 	}
 
 	for _, i := range invalidInterfaces {
-		require.Falsef(t, isAllowedDriveInterface(i), "drive_interface should be not allowed: %v", i)
+		must.False(t, isAllowedDriveInterface(i),
+			must.Sprintf("drive_interface should not be allowed: %v", i))
 	}
 }
@@ -343,11 +327,13 @@ func TestIsAllowedImagePath(t *testing.T) {
 	}
 
 	for _, p := range validPaths {
-		require.Truef(t, isAllowedImagePath(allowedPaths, allocDir, p), "path should be allowed: %v", p)
+		must.True(t, isAllowedImagePath(allowedPaths, allocDir, p),
+			must.Sprintf("path should be allowed: %v", p))
 	}
 
 	for _, p := range invalidPaths {
-		require.Falsef(t, isAllowedImagePath(allowedPaths, allocDir, p), "path should be not allowed: %v", p)
+		must.False(t, isAllowedImagePath(allowedPaths, allocDir, p),
+			must.Sprintf("path should be not allowed: %v", p))
 	}
 }
@@ -370,13 +356,43 @@ func TestArgsAllowList(t *testing.T) {
 	}
 
 	for _, args := range validArgs {
-		require.NoError(t, validateArgs(pluginConfigAllowList, args))
-		require.NoError(t, validateArgs([]string{}, args))
+		must.NoError(t, validateArgs(pluginConfigAllowList, args))
+		must.NoError(t, validateArgs([]string{}, args))
 	}
 
 	for _, args := range invalidArgs {
-		require.Error(t, validateArgs(pluginConfigAllowList, args))
-		require.NoError(t, validateArgs([]string{}, args))
+		must.Error(t, validateArgs(pluginConfigAllowList, args))
+		must.NoError(t, validateArgs([]string{}, args))
 	}
 }
+
+func testResources(allocID, task string) *drivers.Resources {
+	if allocID == "" || task == "" {
+		panic("must be set")
+	}
+
+	r := &drivers.Resources{
+		NomadResources: &structs.AllocatedTaskResources{
+			Memory: structs.AllocatedMemoryResources{
+				MemoryMB: 128,
+			},
+			Cpu: structs.AllocatedCpuResources{
+				CpuShares: 100,
+			},
+			Networks: []*structs.NetworkResource{
+				{
+					ReservedPorts: []structs.Port{
+						{Label: "main", Value: 22000}, {Label: "web", Value: 8888}},
+				},
+			},
+		},
+		LinuxResources: &drivers.LinuxResources{
+			MemoryLimitBytes: 134217728,
+			CPUShares:        100,
+			CpusetCgroupPath: cgroupslib.LinuxResourcesPath(allocID, task, false),
+		},
+	}
+
+	return r
+}