diff --git a/agent/agent.go b/agent/agent.go index 0cb2aa2aca..abbe9c8ea4 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -62,7 +62,10 @@ const ( // EnvProcPrioMgmt determines whether we attempt to manage // process CPU and OOM Killer priority. -const EnvProcPrioMgmt = "CODER_PROC_PRIO_MGMT" +const ( + EnvProcPrioMgmt = "CODER_PROC_PRIO_MGMT" + EnvProcOOMScore = "CODER_PROC_OOM_SCORE" +) type Options struct { Filesystem afero.Fs @@ -1575,10 +1578,31 @@ func (a *agent) manageProcessPriorityUntilGracefulShutdown() { a.processManagementTick = ticker.C } + oomScore := unsetOOMScore + if scoreStr, ok := a.environmentVariables[EnvProcOOMScore]; ok { + score, err := strconv.Atoi(strings.TrimSpace(scoreStr)) + if err == nil && score >= -1000 && score <= 1000 { + oomScore = score + } else { + a.logger.Error(ctx, "invalid oom score", + slog.F("min_value", -1000), + slog.F("max_value", 1000), + slog.F("value", scoreStr), + ) + } + } + + debouncer := &logDebouncer{ + logger: a.logger, + messages: map[string]time.Time{}, + interval: time.Minute, + } + for { - procs, err := a.manageProcessPriority(ctx) + procs, err := a.manageProcessPriority(ctx, debouncer, oomScore) + // Avoid spamming the logs too often. if err != nil { - a.logger.Error(ctx, "manage process priority", + debouncer.Error(ctx, "manage process priority", slog.Error(err), ) } @@ -1594,27 +1618,34 @@ func (a *agent) manageProcessPriorityUntilGracefulShutdown() { } } -func (a *agent) manageProcessPriority(ctx context.Context) ([]*agentproc.Process, error) { +// unsetOOMScore is set to an invalid OOM score to imply an unset value. +const unsetOOMScore = 1001 + +func (a *agent) manageProcessPriority(ctx context.Context, debouncer *logDebouncer, oomScore int) ([]*agentproc.Process, error) { const ( niceness = 10 ) + // We fetch the agent score each time because it's possible someone updates the + // value after it is started. + agentScore, err := a.getAgentOOMScore() + if err != nil { + agentScore = unsetOOMScore + } + if oomScore == unsetOOMScore && agentScore != unsetOOMScore { + // If the child score has not been explicitly specified we should + // set it to a score relative to the agent score. + oomScore = childOOMScore(agentScore) + } + procs, err := agentproc.List(a.filesystem, a.syscaller) if err != nil { return nil, xerrors.Errorf("list: %w", err) } - var ( - modProcs = []*agentproc.Process{} - logger slog.Logger - ) + modProcs := []*agentproc.Process{} for _, proc := range procs { - logger = a.logger.With( - slog.F("cmd", proc.Cmd()), - slog.F("pid", proc.PID), - ) - containsFn := func(e string) bool { contains := strings.Contains(proc.Cmd(), e) return contains @@ -1622,14 +1653,16 @@ func (a *agent) manageProcessPriority(ctx context.Context) ([]*agentproc.Process // If the process is prioritized we should adjust // it's oom_score_adj and avoid lowering its niceness. - if slices.ContainsFunc[[]string, string](prioritizedProcs, containsFn) { + if slices.ContainsFunc(prioritizedProcs, containsFn) { continue } - score, err := proc.Niceness(a.syscaller) - if err != nil { - logger.Warn(ctx, "unable to get proc niceness", - slog.Error(err), + score, niceErr := proc.Niceness(a.syscaller) + if niceErr != nil && !xerrors.Is(niceErr, os.ErrPermission) { + debouncer.Warn(ctx, "unable to get proc niceness", + slog.F("cmd", proc.Cmd()), + slog.F("pid", proc.PID), + slog.Error(niceErr), ) continue } @@ -1643,15 +1676,31 @@ func (a *agent) manageProcessPriority(ctx context.Context) ([]*agentproc.Process continue } - err = proc.SetNiceness(a.syscaller, niceness) - if err != nil { - logger.Warn(ctx, "unable to set proc niceness", - slog.F("niceness", niceness), - slog.Error(err), - ) - continue + if niceErr == nil { + err := proc.SetNiceness(a.syscaller, niceness) + if err != nil && !xerrors.Is(err, os.ErrPermission) { + debouncer.Warn(ctx, "unable to set proc niceness", + slog.F("cmd", proc.Cmd()), + slog.F("pid", proc.PID), + slog.F("niceness", niceness), + slog.Error(err), + ) + } } + // If the oom score is valid and it's not already set and isn't a custom value set by another process then it's ok to update it. + if oomScore != unsetOOMScore && oomScore != proc.OOMScoreAdj && !isCustomOOMScore(agentScore, proc) { + oomScoreStr := strconv.Itoa(oomScore) + err := afero.WriteFile(a.filesystem, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), []byte(oomScoreStr), 0o644) + if err != nil && !xerrors.Is(err, os.ErrPermission) { + debouncer.Warn(ctx, "unable to set oom_score_adj", + slog.F("cmd", proc.Cmd()), + slog.F("pid", proc.PID), + slog.F("score", oomScoreStr), + slog.Error(err), + ) + } + } modProcs = append(modProcs, proc) } return modProcs, nil @@ -2005,3 +2054,77 @@ func PrometheusMetricsHandler(prometheusRegistry *prometheus.Registry, logger sl } }) } + +// childOOMScore returns the oom_score_adj for a child process. It is based +// on the oom_score_adj of the agent process. +func childOOMScore(agentScore int) int { + // If the agent has a negative oom_score_adj, we set the child to 0 + // so it's treated like every other process. + if agentScore < 0 { + return 0 + } + + // If the agent is already almost at the maximum then set it to the max. + if agentScore >= 998 { + return 1000 + } + + // If the agent oom_score_adj is >=0, we set the child to slightly + // less than the maximum. If users want a different score they set it + // directly. + return 998 +} + +func (a *agent) getAgentOOMScore() (int, error) { + scoreStr, err := afero.ReadFile(a.filesystem, "/proc/self/oom_score_adj") + if err != nil { + return 0, xerrors.Errorf("read file: %w", err) + } + + score, err := strconv.Atoi(strings.TrimSpace(string(scoreStr))) + if err != nil { + return 0, xerrors.Errorf("parse int: %w", err) + } + + return score, nil +} + +// isCustomOOMScore checks to see if the oom_score_adj is not a value that would +// originate from an agent-spawned process. +func isCustomOOMScore(agentScore int, process *agentproc.Process) bool { + score := process.OOMScoreAdj + return agentScore != score && score != 1000 && score != 0 && score != 998 +} + +// logDebouncer skips writing a log for a particular message if +// it's been emitted within the given interval duration. +// It's a shoddy implementation used in one spot that should be replaced at +// some point. +type logDebouncer struct { + logger slog.Logger + messages map[string]time.Time + interval time.Duration +} + +func (l *logDebouncer) Warn(ctx context.Context, msg string, fields ...any) { + l.log(ctx, slog.LevelWarn, msg, fields...) +} + +func (l *logDebouncer) Error(ctx context.Context, msg string, fields ...any) { + l.log(ctx, slog.LevelError, msg, fields...) +} + +func (l *logDebouncer) log(ctx context.Context, level slog.Level, msg string, fields ...any) { + // This (bad) implementation assumes you wouldn't reuse the same msg + // for different levels. + if last, ok := l.messages[msg]; ok && time.Since(last) < l.interval { + return + } + switch level { + case slog.LevelWarn: + l.logger.Warn(ctx, msg, fields...) + case slog.LevelError: + l.logger.Error(ctx, msg, fields...) + } + l.messages[msg] = time.Now() +} diff --git a/agent/agent_test.go b/agent/agent_test.go index 2813d45125..45ebf7b709 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -2529,11 +2529,11 @@ func TestAgent_ManageProcessPriority(t *testing.T) { logger = slog.Make(sloghuman.Sink(io.Discard)) ) + requireFileWrite(t, fs, "/proc/self/oom_score_adj", "-500") + // Create some processes. for i := 0; i < 4; i++ { - // Create a prioritized process. This process should - // have it's oom_score_adj set to -500 and its nice - // score should be untouched. + // Create a prioritized process. var proc agentproc.Process if i == 0 { proc = agentproctest.GenerateProcess(t, fs, @@ -2551,8 +2551,8 @@ func TestAgent_ManageProcessPriority(t *testing.T) { }, ) - syscaller.EXPECT().SetPriority(proc.PID, 10).Return(nil) syscaller.EXPECT().GetPriority(proc.PID).Return(20, nil) + syscaller.EXPECT().SetPriority(proc.PID, 10).Return(nil) } syscaller.EXPECT(). Kill(proc.PID, syscall.Signal(0)). @@ -2571,6 +2571,9 @@ func TestAgent_ManageProcessPriority(t *testing.T) { }) actualProcs := <-modProcs require.Len(t, actualProcs, len(expectedProcs)-1) + for _, proc := range actualProcs { + requireFileEquals(t, fs, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), "0") + } }) t.Run("IgnoreCustomNice", func(t *testing.T) { @@ -2589,8 +2592,11 @@ func TestAgent_ManageProcessPriority(t *testing.T) { logger = slog.Make(sloghuman.Sink(io.Discard)) ) + err := afero.WriteFile(fs, "/proc/self/oom_score_adj", []byte("0"), 0o644) + require.NoError(t, err) + // Create some processes. - for i := 0; i < 2; i++ { + for i := 0; i < 3; i++ { proc := agentproctest.GenerateProcess(t, fs) syscaller.EXPECT(). Kill(proc.PID, syscall.Signal(0)). @@ -2618,7 +2624,59 @@ func TestAgent_ManageProcessPriority(t *testing.T) { }) actualProcs := <-modProcs // We should ignore the process with a custom nice score. - require.Len(t, actualProcs, 1) + require.Len(t, actualProcs, 2) + for _, proc := range actualProcs { + _, ok := expectedProcs[proc.PID] + require.True(t, ok) + requireFileEquals(t, fs, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), "998") + } + }) + + t.Run("CustomOOMScore", func(t *testing.T) { + t.Parallel() + + if runtime.GOOS != "linux" { + t.Skip("Skipping non-linux environment") + } + + var ( + fs = afero.NewMemMapFs() + ticker = make(chan time.Time) + syscaller = agentproctest.NewMockSyscaller(gomock.NewController(t)) + modProcs = make(chan []*agentproc.Process) + logger = slog.Make(sloghuman.Sink(io.Discard)) + ) + + err := afero.WriteFile(fs, "/proc/self/oom_score_adj", []byte("0"), 0o644) + require.NoError(t, err) + + // Create some processes. + for i := 0; i < 3; i++ { + proc := agentproctest.GenerateProcess(t, fs) + syscaller.EXPECT(). + Kill(proc.PID, syscall.Signal(0)). + Return(nil) + syscaller.EXPECT().GetPriority(proc.PID).Return(20, nil) + syscaller.EXPECT().SetPriority(proc.PID, 10).Return(nil) + } + + _, _, _, _, _ = setupAgent(t, agentsdk.Manifest{}, 0, func(c *agenttest.Client, o *agent.Options) { + o.Syscaller = syscaller + o.ModifiedProcesses = modProcs + o.EnvironmentVariables = map[string]string{ + agent.EnvProcPrioMgmt: "1", + agent.EnvProcOOMScore: "-567", + } + o.Filesystem = fs + o.Logger = logger + o.ProcessManagementTick = ticker + }) + actualProcs := <-modProcs + // We should ignore the process with a custom nice score. + require.Len(t, actualProcs, 3) + for _, proc := range actualProcs { + requireFileEquals(t, fs, fmt.Sprintf("/proc/%d/oom_score_adj", proc.PID), "-567") + } }) t.Run("DisabledByDefault", func(t *testing.T) { @@ -2739,3 +2797,17 @@ func requireEcho(t *testing.T, conn net.Conn) { require.NoError(t, err) require.Equal(t, "test", string(b)) } + +func requireFileWrite(t testing.TB, fs afero.Fs, fp, data string) { + t.Helper() + err := afero.WriteFile(fs, fp, []byte(data), 0o600) + require.NoError(t, err) +} + +func requireFileEquals(t testing.TB, fs afero.Fs, fp, expect string) { + t.Helper() + actual, err := afero.ReadFile(fs, fp) + require.NoError(t, err) + + require.Equal(t, expect, string(actual)) +} diff --git a/agent/agentproc/agentproctest/proc.go b/agent/agentproc/agentproctest/proc.go index c36e04ec1c..4fa1c698b5 100644 --- a/agent/agentproc/agentproctest/proc.go +++ b/agent/agentproc/agentproctest/proc.go @@ -2,6 +2,7 @@ package agentproctest import ( "fmt" + "strconv" "testing" "github.com/spf13/afero" @@ -29,8 +30,9 @@ func GenerateProcess(t *testing.T, fs afero.Fs, muts ...func(*agentproc.Process) cmdline := fmt.Sprintf("%s\x00%s\x00%s", arg1, arg2, arg3) process := agentproc.Process{ - CmdLine: cmdline, - PID: int32(pid), + CmdLine: cmdline, + PID: int32(pid), + OOMScoreAdj: 0, } for _, mut := range muts { @@ -45,5 +47,9 @@ func GenerateProcess(t *testing.T, fs afero.Fs, muts ...func(*agentproc.Process) err = afero.WriteFile(fs, fmt.Sprintf("%s/cmdline", process.Dir), []byte(process.CmdLine), 0o444) require.NoError(t, err) + score := strconv.Itoa(process.OOMScoreAdj) + err = afero.WriteFile(fs, fmt.Sprintf("%s/oom_score_adj", process.Dir), []byte(score), 0o444) + require.NoError(t, err) + return process } diff --git a/agent/agentproc/proc_unix.go b/agent/agentproc/proc_unix.go index f52caed52e..2eeb7d5a22 100644 --- a/agent/agentproc/proc_unix.go +++ b/agent/agentproc/proc_unix.go @@ -5,6 +5,7 @@ package agentproc import ( "errors" + "os" "path/filepath" "strconv" "strings" @@ -50,10 +51,26 @@ func List(fs afero.Fs, syscaller Syscaller) ([]*Process, error) { } return nil, xerrors.Errorf("read cmdline: %w", err) } + + oomScore, err := afero.ReadFile(fs, filepath.Join(defaultProcDir, entry, "oom_score_adj")) + if err != nil { + if xerrors.Is(err, os.ErrPermission) { + continue + } + + return nil, xerrors.Errorf("read oom_score_adj: %w", err) + } + + oom, err := strconv.Atoi(strings.TrimSpace(string(oomScore))) + if err != nil { + return nil, xerrors.Errorf("convert oom score: %w", err) + } + processes = append(processes, &Process{ - PID: int32(pid), - CmdLine: string(cmdline), - Dir: filepath.Join(defaultProcDir, entry), + PID: int32(pid), + CmdLine: string(cmdline), + Dir: filepath.Join(defaultProcDir, entry), + OOMScoreAdj: oom, }) } diff --git a/agent/agentproc/syscaller.go b/agent/agentproc/syscaller.go index 25dc6cfd54..fba3bf32ce 100644 --- a/agent/agentproc/syscaller.go +++ b/agent/agentproc/syscaller.go @@ -14,7 +14,8 @@ type Syscaller interface { const defaultProcDir = "/proc" type Process struct { - Dir string - CmdLine string - PID int32 + Dir string + CmdLine string + PID int32 + OOMScoreAdj int } diff --git a/agent/stats_internal_test.go b/agent/stats_internal_test.go index bfd6a3436d..57b21a655a 100644 --- a/agent/stats_internal_test.go +++ b/agent/stats_internal_test.go @@ -1,7 +1,10 @@ package agent import ( + "bytes" "context" + "encoding/json" + "io" "net/netip" "sync" "testing" @@ -14,6 +17,7 @@ import ( "tailscale.com/types/netlogtype" "cdr.dev/slog" + "cdr.dev/slog/sloggers/slogjson" "cdr.dev/slog/sloggers/slogtest" "github.com/coder/coder/v2/agent/proto" "github.com/coder/coder/v2/testutil" @@ -210,3 +214,58 @@ func newFakeStatsDest() *fakeStatsDest { resps: make(chan *proto.UpdateStatsResponse), } } + +func Test_logDebouncer(t *testing.T) { + t.Parallel() + + var ( + buf bytes.Buffer + logger = slog.Make(slogjson.Sink(&buf)) + ctx = context.Background() + ) + + debouncer := &logDebouncer{ + logger: logger, + messages: map[string]time.Time{}, + interval: time.Minute, + } + + fields := map[string]interface{}{ + "field_1": float64(1), + "field_2": "2", + } + + debouncer.Error(ctx, "my message", "field_1", 1, "field_2", "2") + debouncer.Warn(ctx, "another message", "field_1", 1, "field_2", "2") + // Shouldn't log this. + debouncer.Warn(ctx, "another message", "field_1", 1, "field_2", "2") + + require.Len(t, debouncer.messages, 2) + + type entry struct { + Msg string `json:"msg"` + Level string `json:"level"` + Fields map[string]interface{} `json:"fields"` + } + + assertLog := func(msg string, level string, fields map[string]interface{}) { + line, err := buf.ReadString('\n') + require.NoError(t, err) + + var e entry + err = json.Unmarshal([]byte(line), &e) + require.NoError(t, err) + require.Equal(t, msg, e.Msg) + require.Equal(t, level, e.Level) + require.Equal(t, fields, e.Fields) + } + assertLog("my message", "ERROR", fields) + assertLog("another message", "WARN", fields) + + debouncer.messages["another message"] = time.Now().Add(-2 * time.Minute) + debouncer.Warn(ctx, "another message", "field_1", 1, "field_2", "2") + assertLog("another message", "WARN", fields) + // Assert nothing else was written. + _, err := buf.ReadString('\n') + require.ErrorIs(t, err, io.EOF) +} diff --git a/cli/agent.go b/cli/agent.go index aaef3805e6..1f91f1c98b 100644 --- a/cli/agent.go +++ b/cli/agent.go @@ -283,6 +283,9 @@ func (r *RootCmd) workspaceAgent() *serpent.Command { if v, ok := os.LookupEnv(agent.EnvProcPrioMgmt); ok { environmentVariables[agent.EnvProcPrioMgmt] = v } + if v, ok := os.LookupEnv(agent.EnvProcOOMScore); ok { + environmentVariables[agent.EnvProcOOMScore] = v + } agnt := agent.New(agent.Options{ Client: client,