feat: support adjusting child proc oom scores (#12655)

This commit is contained in:
Jon Ayers 2024-04-03 09:42:03 -05:00 committed by GitHub
parent ac8d1c6696
commit 426e9f2b96
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 320 additions and 39 deletions

View File

@ -62,7 +62,10 @@ const (
// EnvProcPrioMgmt determines whether we attempt to manage
// process CPU and OOM Killer priority.
const EnvProcPrioMgmt = "CODER_PROC_PRIO_MGMT"
const (
EnvProcPrioMgmt = "CODER_PROC_PRIO_MGMT"
EnvProcOOMScore = "CODER_PROC_OOM_SCORE"
)
type Options struct {
Filesystem afero.Fs
@ -1575,10 +1578,31 @@ func (a *agent) manageProcessPriorityUntilGracefulShutdown() {
a.processManagementTick = ticker.C
}
oomScore := unsetOOMScore
if scoreStr, ok := a.environmentVariables[EnvProcOOMScore]; ok {
score, err := strconv.Atoi(strings.TrimSpace(scoreStr))
if err == nil && score >= -1000 && score <= 1000 {
oomScore = score
} else {
a.logger.Error(ctx, "invalid oom score",
slog.F("min_value", -1000),
slog.F("max_value", 1000),
slog.F("value", scoreStr),
)
}
}
debouncer := &logDebouncer{
logger: a.logger,
messages: map[string]time.Time{},
interval: time.Minute,
}
for {
procs, err := a.manageProcessPriority(ctx)
procs, err := a.manageProcessPriority(ctx, debouncer, oomScore)
// Avoid spamming the logs too often.
if err != nil {
a.logger.Error(ctx, "manage process priority",
debouncer.Error(ctx, "manage process priority",
slog.Error(err),
)
}
@ -1594,27 +1618,34 @@ func (a *agent) manageProcessPriorityUntilGracefulShutdown() {
}
}
func (a *agent) manageProcessPriority(ctx context.Context) ([]*agentproc.Process, error) {
// unsetOOMScore is set to an invalid OOM score to imply an unset value.
const unsetOOMScore = 1001
func (a *agent) manageProcessPriority(ctx context.Context, debouncer *logDebouncer, oomScore int) ([]*agentproc.Process, error) {
const (
niceness = 10
)
// We fetch the agent score each time because it's possible someone updates the
// value after it is started.
agentScore, err := a.getAgentOOMScore()
if err != nil {
agentScore = unsetOOMScore
}
if oomScore == unsetOOMScore && agentScore != unsetOOMScore {
// If the child score has not been explicitly specified we should
// set it to a score relative to the agent score.
oomScore = childOOMScore(agentScore)
}
procs, err := agentproc.List(a.filesystem, a.syscaller)
if err != nil {
return nil, xerrors.Errorf("list: %w", err)
}
var (
modProcs = []*agentproc.Process{}
logger slog.Logger
)
modProcs := []*agentproc.Process{}
for _, proc := range procs {
logger = a.logger.With(
slog.F("cmd", proc.Cmd()),
slog.F("pid", proc.PID),
)
containsFn := func(e string) bool {
contains := strings.Contains(proc.Cmd(), e)
return contains
@ -1622,14 +1653,16 @@ func (a *agent) manageProcessPriority(ctx context.Context) ([]*agentproc.Process
// If the process is prioritized we should adjust
// it's oom_score_adj and avoid lowering its niceness.
if slices.ContainsFunc[[]string, string](prioritizedProcs, containsFn) {
if slices.ContainsFunc(prioritizedProcs, containsFn) {
continue
}
score, err := proc.Niceness(a.syscaller)
if err != nil {
logger.Warn(ctx, "unable to get proc niceness",
slog.Error(err),
score, niceErr := proc.Niceness(a.syscaller)
if niceErr != nil && !xerrors.Is(niceErr, os.ErrPermission) {
debouncer.Warn(ctx, "unable to get proc niceness",
slog.F("cmd", proc.Cmd()),
slog.F("pid", proc.PID),
slog.Error(niceErr),
)
continue
}
@ -1643,15 +1676,31 @@ func (a *agent) manageProcessPriority(ctx context.Context) ([]*agentproc.Process
continue
}
err = proc.SetNiceness(a.syscaller, niceness)
if err != nil {
logger.Warn(ctx, "unable to set proc niceness",
slog.F("niceness", niceness),
slog.Error(err),
)
continue
if niceErr == nil {
err := proc.SetNiceness(a.syscaller, niceness)
if err != nil && !xerrors.Is(err, os.ErrPermission) {
debouncer.Warn(ctx, "unable to set proc niceness",
slog.F("cmd", proc.Cmd()),
slog.F("pid", proc.PID),
slog.F("niceness", niceness),
slog.Error(err),
)
}
}
// If the oom score is valid and it's not already set and isn't a custom value set by another process then it's ok to update it.
if oomScore != unsetOOMScore && oomScore != proc.OOMScoreAdj && !isCustomOOMScore(agentScore, proc) {
oomScoreStr := strconv.Itoa(oomScore)
err := afero.WriteFile(a.filesystem, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), []byte(oomScoreStr), 0o644)
if err != nil && !xerrors.Is(err, os.ErrPermission) {
debouncer.Warn(ctx, "unable to set oom_score_adj",
slog.F("cmd", proc.Cmd()),
slog.F("pid", proc.PID),
slog.F("score", oomScoreStr),
slog.Error(err),
)
}
}
modProcs = append(modProcs, proc)
}
return modProcs, nil
@ -2005,3 +2054,77 @@ func PrometheusMetricsHandler(prometheusRegistry *prometheus.Registry, logger sl
}
})
}
// childOOMScore returns the oom_score_adj for a child process. It is based
// on the oom_score_adj of the agent process.
func childOOMScore(agentScore int) int {
// If the agent has a negative oom_score_adj, we set the child to 0
// so it's treated like every other process.
if agentScore < 0 {
return 0
}
// If the agent is already almost at the maximum then set it to the max.
if agentScore >= 998 {
return 1000
}
// If the agent oom_score_adj is >=0, we set the child to slightly
// less than the maximum. If users want a different score they set it
// directly.
return 998
}
func (a *agent) getAgentOOMScore() (int, error) {
scoreStr, err := afero.ReadFile(a.filesystem, "/proc/self/oom_score_adj")
if err != nil {
return 0, xerrors.Errorf("read file: %w", err)
}
score, err := strconv.Atoi(strings.TrimSpace(string(scoreStr)))
if err != nil {
return 0, xerrors.Errorf("parse int: %w", err)
}
return score, nil
}
// isCustomOOMScore checks to see if the oom_score_adj is not a value that would
// originate from an agent-spawned process.
func isCustomOOMScore(agentScore int, process *agentproc.Process) bool {
score := process.OOMScoreAdj
return agentScore != score && score != 1000 && score != 0 && score != 998
}
// logDebouncer skips writing a log for a particular message if
// it's been emitted within the given interval duration.
// It's a shoddy implementation used in one spot that should be replaced at
// some point.
type logDebouncer struct {
logger slog.Logger
messages map[string]time.Time
interval time.Duration
}
func (l *logDebouncer) Warn(ctx context.Context, msg string, fields ...any) {
l.log(ctx, slog.LevelWarn, msg, fields...)
}
func (l *logDebouncer) Error(ctx context.Context, msg string, fields ...any) {
l.log(ctx, slog.LevelError, msg, fields...)
}
func (l *logDebouncer) log(ctx context.Context, level slog.Level, msg string, fields ...any) {
// This (bad) implementation assumes you wouldn't reuse the same msg
// for different levels.
if last, ok := l.messages[msg]; ok && time.Since(last) < l.interval {
return
}
switch level {
case slog.LevelWarn:
l.logger.Warn(ctx, msg, fields...)
case slog.LevelError:
l.logger.Error(ctx, msg, fields...)
}
l.messages[msg] = time.Now()
}

View File

@ -2529,11 +2529,11 @@ func TestAgent_ManageProcessPriority(t *testing.T) {
logger = slog.Make(sloghuman.Sink(io.Discard))
)
requireFileWrite(t, fs, "/proc/self/oom_score_adj", "-500")
// Create some processes.
for i := 0; i < 4; i++ {
// Create a prioritized process. This process should
// have it's oom_score_adj set to -500 and its nice
// score should be untouched.
// Create a prioritized process.
var proc agentproc.Process
if i == 0 {
proc = agentproctest.GenerateProcess(t, fs,
@ -2551,8 +2551,8 @@ func TestAgent_ManageProcessPriority(t *testing.T) {
},
)
syscaller.EXPECT().SetPriority(proc.PID, 10).Return(nil)
syscaller.EXPECT().GetPriority(proc.PID).Return(20, nil)
syscaller.EXPECT().SetPriority(proc.PID, 10).Return(nil)
}
syscaller.EXPECT().
Kill(proc.PID, syscall.Signal(0)).
@ -2571,6 +2571,9 @@ func TestAgent_ManageProcessPriority(t *testing.T) {
})
actualProcs := <-modProcs
require.Len(t, actualProcs, len(expectedProcs)-1)
for _, proc := range actualProcs {
requireFileEquals(t, fs, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), "0")
}
})
t.Run("IgnoreCustomNice", func(t *testing.T) {
@ -2589,8 +2592,11 @@ func TestAgent_ManageProcessPriority(t *testing.T) {
logger = slog.Make(sloghuman.Sink(io.Discard))
)
err := afero.WriteFile(fs, "/proc/self/oom_score_adj", []byte("0"), 0o644)
require.NoError(t, err)
// Create some processes.
for i := 0; i < 2; i++ {
for i := 0; i < 3; i++ {
proc := agentproctest.GenerateProcess(t, fs)
syscaller.EXPECT().
Kill(proc.PID, syscall.Signal(0)).
@ -2618,7 +2624,59 @@ func TestAgent_ManageProcessPriority(t *testing.T) {
})
actualProcs := <-modProcs
// We should ignore the process with a custom nice score.
require.Len(t, actualProcs, 1)
require.Len(t, actualProcs, 2)
for _, proc := range actualProcs {
_, ok := expectedProcs[proc.PID]
require.True(t, ok)
requireFileEquals(t, fs, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), "998")
}
})
t.Run("CustomOOMScore", func(t *testing.T) {
t.Parallel()
if runtime.GOOS != "linux" {
t.Skip("Skipping non-linux environment")
}
var (
fs = afero.NewMemMapFs()
ticker = make(chan time.Time)
syscaller = agentproctest.NewMockSyscaller(gomock.NewController(t))
modProcs = make(chan []*agentproc.Process)
logger = slog.Make(sloghuman.Sink(io.Discard))
)
err := afero.WriteFile(fs, "/proc/self/oom_score_adj", []byte("0"), 0o644)
require.NoError(t, err)
// Create some processes.
for i := 0; i < 3; i++ {
proc := agentproctest.GenerateProcess(t, fs)
syscaller.EXPECT().
Kill(proc.PID, syscall.Signal(0)).
Return(nil)
syscaller.EXPECT().GetPriority(proc.PID).Return(20, nil)
syscaller.EXPECT().SetPriority(proc.PID, 10).Return(nil)
}
_, _, _, _, _ = setupAgent(t, agentsdk.Manifest{}, 0, func(c *agenttest.Client, o *agent.Options) {
o.Syscaller = syscaller
o.ModifiedProcesses = modProcs
o.EnvironmentVariables = map[string]string{
agent.EnvProcPrioMgmt: "1",
agent.EnvProcOOMScore: "-567",
}
o.Filesystem = fs
o.Logger = logger
o.ProcessManagementTick = ticker
})
actualProcs := <-modProcs
// We should ignore the process with a custom nice score.
require.Len(t, actualProcs, 3)
for _, proc := range actualProcs {
requireFileEquals(t, fs, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), "-567")
}
})
t.Run("DisabledByDefault", func(t *testing.T) {
@ -2739,3 +2797,17 @@ func requireEcho(t *testing.T, conn net.Conn) {
require.NoError(t, err)
require.Equal(t, "test", string(b))
}
func requireFileWrite(t testing.TB, fs afero.Fs, fp, data string) {
t.Helper()
err := afero.WriteFile(fs, fp, []byte(data), 0o600)
require.NoError(t, err)
}
func requireFileEquals(t testing.TB, fs afero.Fs, fp, expect string) {
t.Helper()
actual, err := afero.ReadFile(fs, fp)
require.NoError(t, err)
require.Equal(t, expect, string(actual))
}

View File

@ -2,6 +2,7 @@ package agentproctest
import (
"fmt"
"strconv"
"testing"
"github.com/spf13/afero"
@ -29,8 +30,9 @@ func GenerateProcess(t *testing.T, fs afero.Fs, muts ...func(*agentproc.Process)
cmdline := fmt.Sprintf("%s\x00%s\x00%s", arg1, arg2, arg3)
process := agentproc.Process{
CmdLine: cmdline,
PID: int32(pid),
CmdLine: cmdline,
PID: int32(pid),
OOMScoreAdj: 0,
}
for _, mut := range muts {
@ -45,5 +47,9 @@ func GenerateProcess(t *testing.T, fs afero.Fs, muts ...func(*agentproc.Process)
err = afero.WriteFile(fs, fmt.Sprintf("%s/cmdline", process.Dir), []byte(process.CmdLine), 0o444)
require.NoError(t, err)
score := strconv.Itoa(process.OOMScoreAdj)
err = afero.WriteFile(fs, fmt.Sprintf("%s/oom_score_adj", process.Dir), []byte(score), 0o444)
require.NoError(t, err)
return process
}

View File

@ -5,6 +5,7 @@ package agentproc
import (
"errors"
"os"
"path/filepath"
"strconv"
"strings"
@ -50,10 +51,26 @@ func List(fs afero.Fs, syscaller Syscaller) ([]*Process, error) {
}
return nil, xerrors.Errorf("read cmdline: %w", err)
}
oomScore, err := afero.ReadFile(fs, filepath.Join(defaultProcDir, entry, "oom_score_adj"))
if err != nil {
if xerrors.Is(err, os.ErrPermission) {
continue
}
return nil, xerrors.Errorf("read oom_score_adj: %w", err)
}
oom, err := strconv.Atoi(strings.TrimSpace(string(oomScore)))
if err != nil {
return nil, xerrors.Errorf("convert oom score: %w", err)
}
processes = append(processes, &Process{
PID: int32(pid),
CmdLine: string(cmdline),
Dir: filepath.Join(defaultProcDir, entry),
PID: int32(pid),
CmdLine: string(cmdline),
Dir: filepath.Join(defaultProcDir, entry),
OOMScoreAdj: oom,
})
}

View File

@ -14,7 +14,8 @@ type Syscaller interface {
const defaultProcDir = "/proc"
type Process struct {
Dir string
CmdLine string
PID int32
Dir string
CmdLine string
PID int32
OOMScoreAdj int
}

View File

@ -1,7 +1,10 @@
package agent
import (
"bytes"
"context"
"encoding/json"
"io"
"net/netip"
"sync"
"testing"
@ -14,6 +17,7 @@ import (
"tailscale.com/types/netlogtype"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogjson"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/agent/proto"
"github.com/coder/coder/v2/testutil"
@ -210,3 +214,58 @@ func newFakeStatsDest() *fakeStatsDest {
resps: make(chan *proto.UpdateStatsResponse),
}
}
func Test_logDebouncer(t *testing.T) {
t.Parallel()
var (
buf bytes.Buffer
logger = slog.Make(slogjson.Sink(&buf))
ctx = context.Background()
)
debouncer := &logDebouncer{
logger: logger,
messages: map[string]time.Time{},
interval: time.Minute,
}
fields := map[string]interface{}{
"field_1": float64(1),
"field_2": "2",
}
debouncer.Error(ctx, "my message", "field_1", 1, "field_2", "2")
debouncer.Warn(ctx, "another message", "field_1", 1, "field_2", "2")
// Shouldn't log this.
debouncer.Warn(ctx, "another message", "field_1", 1, "field_2", "2")
require.Len(t, debouncer.messages, 2)
type entry struct {
Msg string `json:"msg"`
Level string `json:"level"`
Fields map[string]interface{} `json:"fields"`
}
assertLog := func(msg string, level string, fields map[string]interface{}) {
line, err := buf.ReadString('\n')
require.NoError(t, err)
var e entry
err = json.Unmarshal([]byte(line), &e)
require.NoError(t, err)
require.Equal(t, msg, e.Msg)
require.Equal(t, level, e.Level)
require.Equal(t, fields, e.Fields)
}
assertLog("my message", "ERROR", fields)
assertLog("another message", "WARN", fields)
debouncer.messages["another message"] = time.Now().Add(-2 * time.Minute)
debouncer.Warn(ctx, "another message", "field_1", 1, "field_2", "2")
assertLog("another message", "WARN", fields)
// Assert nothing else was written.
_, err := buf.ReadString('\n')
require.ErrorIs(t, err, io.EOF)
}

View File

@ -283,6 +283,9 @@ func (r *RootCmd) workspaceAgent() *serpent.Command {
if v, ok := os.LookupEnv(agent.EnvProcPrioMgmt); ok {
environmentVariables[agent.EnvProcPrioMgmt] = v
}
if v, ok := os.LookupEnv(agent.EnvProcOOMScore); ok {
environmentVariables[agent.EnvProcOOMScore] = v
}
agnt := agent.New(agent.Options{
Client: client,