// Source: coder/coderd/provisionerdserver/acquirer_test.go (751 lines, 25 KiB, Go)

package provisionerdserver_test
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"sync"
"testing"
"time"
"github.com/google/uuid"
"github.com/sqlc-dev/pqtype"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"golang.org/x/exp/slices"
"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"
"github.com/coder/coder/v2/coderd/database"
"github.com/coder/coder/v2/coderd/database/dbmem"
"github.com/coder/coder/v2/coderd/database/dbtestutil"
"github.com/coder/coder/v2/coderd/database/dbtime"
"github.com/coder/coder/v2/coderd/database/provisionerjobs"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/coderd/provisionerdserver"
"github.com/coder/coder/v2/testutil"
)
// TestMain is the package's test entry point. It hands control to
// goleak.VerifyTestMain, which (per goleak's documentation) runs the tests and
// then checks for leaked goroutines.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
// TestAcquirer_Store tests that a database.Store is accepted as a provisionerdserver.AcquirerStore
func TestAcquirer_Store(t *testing.T) {
t.Parallel()
db := dbmem.New()
ps := pubsub.NewInMemory()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
_ = provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), db, ps)
}
func TestAcquirer_Single(t *testing.T) {
t.Parallel()
fs := newFakeOrderedStore()
ps := pubsub.NewInMemory()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
uut := provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), fs, ps)
orgID := uuid.New()
workerID := uuid.New()
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
tags := provisionerdserver.Tags{
"environment": "on-prem",
}
acquiree := newTestAcquiree(t, orgID, workerID, pt, tags)
jobID := uuid.New()
err := fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
require.NoError(t, err)
acquiree.startAcquire(ctx, uut)
job := acquiree.success(ctx)
require.Equal(t, jobID, job.ID)
require.Len(t, fs.params, 1)
require.Equal(t, workerID, fs.params[0].WorkerID.UUID)
}
// TestAcquirer_MultipleSameDomain tests multiple acquirees with the same provisioners and tags
func TestAcquirer_MultipleSameDomain(t *testing.T) {
t.Parallel()
fs := newFakeOrderedStore()
ps := pubsub.NewInMemory()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
uut := provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), fs, ps)
acquirees := make([]*testAcquiree, 0, 10)
jobIDs := make(map[uuid.UUID]bool)
workerIDs := make(map[uuid.UUID]bool)
orgID := uuid.New()
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
tags := provisionerdserver.Tags{
"environment": "on-prem",
}
for i := 0; i < 10; i++ {
wID := uuid.New()
workerIDs[wID] = true
a := newTestAcquiree(t, orgID, wID, pt, tags)
acquirees = append(acquirees, a)
a.startAcquire(ctx, uut)
}
for i := 0; i < 10; i++ {
jobID := uuid.New()
jobIDs[jobID] = true
err := fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
require.NoError(t, err)
}
gotJobIDs := make(map[uuid.UUID]bool)
for i := 0; i < 10; i++ {
j := acquirees[i].success(ctx)
gotJobIDs[j.ID] = true
}
require.Equal(t, jobIDs, gotJobIDs)
require.Len(t, fs.overlaps, 0)
gotWorkerCalls := make(map[uuid.UUID]bool)
for _, params := range fs.params {
gotWorkerCalls[params.WorkerID.UUID] = true
}
require.Equal(t, workerIDs, gotWorkerCalls)
}
// TestAcquirer_WaitsOnNoJobs tests that after a call that returns no jobs, Acquirer waits for a new
// job posting before retrying
func TestAcquirer_WaitsOnNoJobs(t *testing.T) {
t.Parallel()
fs := newFakeOrderedStore()
ps := pubsub.NewInMemory()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
uut := provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), fs, ps)
orgID := uuid.New()
workerID := uuid.New()
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
tags := provisionerdserver.Tags{
"environment": "on-prem",
}
acquiree := newTestAcquiree(t, orgID, workerID, pt, tags)
jobID := uuid.New()
err := fs.sendCtx(ctx, database.ProvisionerJob{}, sql.ErrNoRows)
require.NoError(t, err)
err = fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
require.NoError(t, err)
acquiree.startAcquire(ctx, uut)
require.Eventually(t, func() bool {
fs.mu.Lock()
defer fs.mu.Unlock()
return len(fs.params) == 1
}, testutil.WaitShort, testutil.IntervalFast)
acquiree.requireBlocked()
// First send in some with incompatible tags & types
postJob(t, ps, database.ProvisionerTypeEcho, provisionerdserver.Tags{
"cool": "tapes",
"strong": "bad",
})
postJob(t, ps, database.ProvisionerTypeEcho, provisionerdserver.Tags{
"environment": "fighters",
})
postJob(t, ps, database.ProvisionerTypeTerraform, provisionerdserver.Tags{
"environment": "on-prem",
})
acquiree.requireBlocked()
// compatible tags
postJob(t, ps, database.ProvisionerTypeEcho, provisionerdserver.Tags{})
job := acquiree.success(ctx)
require.Equal(t, jobID, job.ID)
}
// TestAcquirer_RetriesPending tests that if we get a job posting while a db call is in progress
// we retry to acquire a job immediately, even if the first call returned no jobs. We want this
// behavior since the query that found no jobs could have resolved before the job was posted, but
// the query result could reach us later than the posting over the pubsub.
func TestAcquirer_RetriesPending(t *testing.T) {
t.Parallel()
fs := newFakeOrderedStore()
ps := pubsub.NewInMemory()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
uut := provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), fs, ps)
orgID := uuid.New()
workerID := uuid.New()
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
tags := provisionerdserver.Tags{
"environment": "on-prem",
}
acquiree := newTestAcquiree(t, orgID, workerID, pt, tags)
jobID := uuid.New()
acquiree.startAcquire(ctx, uut)
require.Eventually(t, func() bool {
fs.mu.Lock()
defer fs.mu.Unlock()
return len(fs.params) == 1
}, testutil.WaitShort, testutil.IntervalFast)
// First call to DB is in progress. Send in posting
postJob(t, ps, database.ProvisionerTypeEcho, provisionerdserver.Tags{})
// there is a race between the posting being processed and the DB call
// returning. In either case we should retry, but we're trying to hit the
// case where the posting is processed first, so sleep a little bit to give
// it a chance.
time.Sleep(testutil.IntervalMedium)
// Now, when first DB call returns ErrNoRows we retry.
err := fs.sendCtx(ctx, database.ProvisionerJob{}, sql.ErrNoRows)
require.NoError(t, err)
err = fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
require.NoError(t, err)
job := acquiree.success(ctx)
require.Equal(t, jobID, job.ID)
}
// TestAcquirer_DifferentDomains tests that acquirees with different tags don't block each other
func TestAcquirer_DifferentDomains(t *testing.T) {
t.Parallel()
fs := newFakeTaggedStore(t)
ps := pubsub.NewInMemory()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
orgID := uuid.New()
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
worker0 := uuid.New()
tags0 := provisionerdserver.Tags{
"worker": "0",
}
acquiree0 := newTestAcquiree(t, orgID, worker0, pt, tags0)
worker1 := uuid.New()
tags1 := provisionerdserver.Tags{
"worker": "1",
}
acquiree1 := newTestAcquiree(t, orgID, worker1, pt, tags1)
jobID := uuid.New()
fs.jobs = []database.ProvisionerJob{
{ID: jobID, Provisioner: database.ProvisionerTypeEcho, Tags: database.StringMap{"worker": "1"}},
}
uut := provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), fs, ps)
ctx0, cancel0 := context.WithCancel(ctx)
defer cancel0()
acquiree0.startAcquire(ctx0, uut)
select {
case params := <-fs.params:
require.Equal(t, worker0, params.WorkerID.UUID)
case <-ctx.Done():
t.Fatal("timed out waiting for call to database from worker0")
}
acquiree0.requireBlocked()
// worker1 should not be blocked by worker0, as they are different tags
acquiree1.startAcquire(ctx, uut)
job := acquiree1.success(ctx)
require.Equal(t, jobID, job.ID)
cancel0()
acquiree0.requireCanceled(ctx)
}
func TestAcquirer_BackupPoll(t *testing.T) {
t.Parallel()
fs := newFakeOrderedStore()
ps := pubsub.NewInMemory()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
uut := provisionerdserver.NewAcquirer(
ctx, logger.Named("acquirer"), fs, ps,
provisionerdserver.TestingBackupPollDuration(testutil.IntervalMedium),
)
workerID := uuid.New()
orgID := uuid.New()
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
tags := provisionerdserver.Tags{
"environment": "on-prem",
}
acquiree := newTestAcquiree(t, orgID, workerID, pt, tags)
jobID := uuid.New()
err := fs.sendCtx(ctx, database.ProvisionerJob{}, sql.ErrNoRows)
require.NoError(t, err)
err = fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
require.NoError(t, err)
acquiree.startAcquire(ctx, uut)
job := acquiree.success(ctx)
require.Equal(t, jobID, job.ID)
}
// TestAcquirer_UnblockOnCancel tests that a canceled call doesn't block a call
// from the same domain.
func TestAcquirer_UnblockOnCancel(t *testing.T) {
t.Parallel()
fs := newFakeOrderedStore()
ps := pubsub.NewInMemory()
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()
logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
pt := []database.ProvisionerType{database.ProvisionerTypeEcho}
orgID := uuid.New()
worker0 := uuid.New()
tags := provisionerdserver.Tags{
"environment": "on-prem",
}
acquiree0 := newTestAcquiree(t, orgID, worker0, pt, tags)
worker1 := uuid.New()
acquiree1 := newTestAcquiree(t, orgID, worker1, pt, tags)
jobID := uuid.New()
uut := provisionerdserver.NewAcquirer(ctx, logger.Named("acquirer"), fs, ps)
// queue up 2 responses --- we may not need both, since acquiree0 will
// usually cancel before calling, but cancel is async, so it might call.
for i := 0; i < 2; i++ {
err := fs.sendCtx(ctx, database.ProvisionerJob{ID: jobID}, nil)
require.NoError(t, err)
}
ctx0, cancel0 := context.WithCancel(ctx)
cancel0()
acquiree0.startAcquire(ctx0, uut)
acquiree1.startAcquire(ctx, uut)
job := acquiree1.success(ctx)
require.Equal(t, jobID, job.ID)
}
// TestAcquirer_MatchTags checks tag and organization matching against a store
// obtained from dbtestutil.NewDB rather than a fake. Each case inserts one
// provisioner job carrying provisionerJobTags and then calls AcquireJob with
// acquireJobTags (the tags of the acquiring provisioner). Cases that expect a
// match must get the inserted job back; the others must end with
// context.DeadlineExceeded and an empty job.
//
// The "GenTable" subtest renders the same table as markdown in the test log so
// it can be pasted into docs/admin/provisioners.md.
func TestAcquirer_MatchTags(t *testing.T) {
	t.Parallel()
	if testing.Short() {
		t.Skip("skipping this test due to -short")
	}

	testCases := []struct {
		name               string
		provisionerJobTags map[string]string
		acquireJobTags     map[string]string
		unmatchedOrg       bool // acquire will use a random org id
		expectAcquire      bool
	}{
		{
			name:               "untagged provisioner and untagged job",
			provisionerJobTags: map[string]string{"scope": "organization", "owner": ""},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": ""},
			expectAcquire:      true,
		},
		{
			name:               "tagged provisioner and tagged job",
			provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "environment": "on-prem"},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": "", "environment": "on-prem"},
			expectAcquire:      true,
		},
		{
			name:               "double-tagged provisioner and tagged job",
			provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "environment": "on-prem"},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": "", "environment": "on-prem", "datacenter": "chicago"},
			expectAcquire:      true,
		},
		{
			name:               "double-tagged provisioner and double-tagged job",
			provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "environment": "on-prem", "datacenter": "chicago"},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": "", "environment": "on-prem", "datacenter": "chicago"},
			expectAcquire:      true,
		},
		{
			name:               "user-scoped provisioner and user-scoped job",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "aaa"},
			acquireJobTags:     map[string]string{"scope": "user", "owner": "aaa"},
			expectAcquire:      true,
		},
		{
			name:               "user-scoped provisioner with tags and user-scoped job",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "aaa"},
			acquireJobTags:     map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem"},
			expectAcquire:      true,
		},
		{
			name:               "user-scoped provisioner with tags and user-scoped job with tags",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem"},
			acquireJobTags:     map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem"},
			expectAcquire:      true,
		},
		{
			name:               "user-scoped provisioner with multiple tags and user-scoped job with tags",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem"},
			acquireJobTags:     map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem", "datacenter": "chicago"},
			expectAcquire:      true,
		},
		{
			name:               "user-scoped provisioner with multiple tags and user-scoped job with multiple tags",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem", "datacenter": "chicago"},
			acquireJobTags:     map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem", "datacenter": "chicago"},
			expectAcquire:      true,
		},
		{
			name:               "untagged provisioner and tagged job",
			provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "environment": "on-prem"},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": ""},
			expectAcquire:      false,
		},
		{
			name:               "tagged provisioner and untagged job",
			provisionerJobTags: map[string]string{"scope": "organization", "owner": ""},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": "", "environment": "on-prem"},
			expectAcquire:      false,
		},
		{
			name:               "tagged provisioner and double-tagged job",
			provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "environment": "on-prem", "datacenter": "chicago"},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": "", "environment": "on-prem"},
			expectAcquire:      false,
		},
		{
			name:               "double-tagged provisioner and double-tagged job with differing tags",
			provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "environment": "on-prem", "datacenter": "chicago"},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": "", "environment": "on-prem", "datacenter": "new_york"},
			expectAcquire:      false,
		},
		{
			name:               "user-scoped provisioner and untagged job",
			provisionerJobTags: map[string]string{"scope": "organization", "owner": ""},
			acquireJobTags:     map[string]string{"scope": "user", "owner": "aaa"},
			expectAcquire:      false,
		},
		{
			name:               "user-scoped provisioner and different user-scoped job",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "bbb"},
			acquireJobTags:     map[string]string{"scope": "user", "owner": "aaa"},
			expectAcquire:      false,
		},
		{
			name:               "org-scoped provisioner and user-scoped job",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "aaa"},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": ""},
			expectAcquire:      false,
		},
		{
			name:               "user-scoped provisioner and org-scoped job with tags",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem"},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": ""},
			expectAcquire:      false,
		},
		{
			name:               "user-scoped provisioner and user-scoped job with tags",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem"},
			acquireJobTags:     map[string]string{"scope": "user", "owner": "aaa"},
			expectAcquire:      false,
		},
		{
			name:               "user-scoped provisioner with tags and user-scoped job with multiple tags",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem", "datacenter": "chicago"},
			acquireJobTags:     map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem"},
			expectAcquire:      false,
		},
		{
			name:               "user-scoped provisioner with tags and user-scoped job with differing tags",
			provisionerJobTags: map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem", "datacenter": "new_york"},
			acquireJobTags:     map[string]string{"scope": "user", "owner": "aaa", "environment": "on-prem", "datacenter": "chicago"},
			expectAcquire:      false,
		},
		{
			name:               "matching tags with unmatched org",
			provisionerJobTags: map[string]string{"scope": "organization", "owner": "", "environment": "on-prem"},
			acquireJobTags:     map[string]string{"scope": "organization", "owner": "", "environment": "on-prem"},
			expectAcquire:      false,
			unmatchedOrg:       true,
		},
	}
	for _, tt := range testCases {
		tt := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()

			ctx := testutil.Context(t, testutil.WaitShort)
			// NOTE: explicitly not using fake store for this test.
			db, ps := dbtestutil.NewDB(t)
			log := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
			org, err := db.InsertOrganization(ctx, database.InsertOrganizationParams{
				ID:          uuid.New(),
				Name:        "test org",
				Description: "the organization of testing",
				CreatedAt:   dbtime.Now(),
				UpdatedAt:   dbtime.Now(),
			})
			require.NoError(t, err)
			pj, err := db.InsertProvisionerJob(ctx, database.InsertProvisionerJobParams{
				ID:             uuid.New(),
				CreatedAt:      dbtime.Now(),
				UpdatedAt:      dbtime.Now(),
				OrganizationID: org.ID,
				InitiatorID:    uuid.New(),
				Provisioner:    database.ProvisionerTypeEcho,
				StorageMethod:  database.ProvisionerStorageMethodFile,
				FileID:         uuid.New(),
				Type:           database.ProvisionerJobTypeWorkspaceBuild,
				Input:          []byte("{}"),
				Tags:           tt.provisionerJobTags,
				TraceMetadata:  pqtype.NullRawMessage{},
			})
			require.NoError(t, err)
			ptypes := []database.ProvisionerType{database.ProvisionerTypeEcho}
			acq := provisionerdserver.NewAcquirer(ctx, log, db, ps)

			// Acquire from the job's own organization unless the case asks
			// for a mismatch, in which case a fresh random org ID is used.
			acquireOrgID := org.ID
			if tt.unmatchedOrg {
				acquireOrgID = uuid.New()
			}
			aj, err := acq.AcquireJob(ctx, acquireOrgID, uuid.New(), ptypes, tt.acquireJobTags)
			if tt.expectAcquire {
				assert.NoError(t, err)
				assert.Equal(t, pj.ID, aj.ID)
			} else {
				// Nothing matches, so AcquireJob is expected to keep waiting
				// until ctx's deadline passes.
				assert.Empty(t, aj, "should not have acquired job")
				assert.ErrorIs(t, err, context.DeadlineExceeded, "should have timed out")
			}
		})
	}

	t.Run("GenTable", func(t *testing.T) {
		t.Parallel()
		// Generate a table that can be copy-pasted into docs/admin/provisioners.md
		lines := []string{
			"\n",
			"| Provisioner Tags | Job Tags | Can Run Job? |",
			"|------------------|----------|--------------|",
		}
		// turn the JSON map into k=v for readability
		kvs := func(m map[string]string) string {
			ss := make([]string, 0, len(m))
			// ensure consistent ordering of tags
			for _, k := range []string{"scope", "owner", "environment", "datacenter"} {
				if v, found := m[k]; found {
					ss = append(ss, k+"="+v)
				}
			}
			return strings.Join(ss, " ")
		}
		for _, tt := range testCases {
			acquire := "✅"
			if !tt.expectAcquire {
				acquire = "❌"
			}
			// acquireJobTags are the acquiring provisioner's tags, so they
			// fill the "Provisioner Tags" column.
			s := fmt.Sprintf("| %s | %s | %s |", kvs(tt.acquireJobTags), kvs(tt.provisionerJobTags), acquire)
			lines = append(lines, s)
		}
		// Use Log rather than Logf: the table is data, not a format string, so
		// any '%' in it must not be interpreted as a formatting verb.
		t.Log("You can paste this into docs/admin/provisioners.md")
		t.Log(strings.Join(lines, "\n"))
	})
}
// postJob publishes a provisionerjobs.JobPosting for the given provisioner type
// and tags on ps under provisionerjobs.EventJobPosted. A marshal or publish
// error fails the test immediately.
func postJob(t *testing.T, ps pubsub.Pubsub, pt database.ProvisionerType, tags provisionerdserver.Tags) {
	t.Helper()

	posting := provisionerjobs.JobPosting{ProvisionerType: pt, Tags: tags}
	payload, marshalErr := json.Marshal(posting)
	require.NoError(t, marshalErr)

	require.NoError(t, ps.Publish(provisionerjobs.EventJobPosted, payload))
}
// fakeOrderedStore is a fake store that lets tests send AcquireProvisionerJob
// results in order over a channel, and tests for overlapped calls.
type fakeOrderedStore struct {
// jobs and errors carry the queued results: sendCtx pushes one value onto
// each, and every AcquireProvisionerJob call pops one value from each.
jobs chan database.ProvisionerJob
errors chan error
// mu is held by AcquireProvisionerJob while it updates params, inflight and
// overlaps.
mu sync.Mutex
// params records the arguments of every AcquireProvisionerJob call, in the
// order the calls arrived.
params []database.AcquireProvisionerJobParams
// inflight and overlaps track whether any calls from workers overlap with
// one another. inflight is keyed by the worker ID of each call that has not
// returned yet; overlaps gets one {already-inflight, new} worker ID pair for
// every call that starts while another is still in flight.
inflight map[uuid.UUID]bool
overlaps [][]uuid.UUID
}
// newFakeOrderedStore returns an empty fakeOrderedStore. The result channels
// are buffered so that a test can queue up many responses ahead of time and
// have them consumed nearly simultaneously.
func newFakeOrderedStore() *fakeOrderedStore {
	const responseBuffer = 100

	store := new(fakeOrderedStore)
	store.jobs = make(chan database.ProvisionerJob, responseBuffer)
	store.errors = make(chan error, responseBuffer)
	store.inflight = make(map[uuid.UUID]bool)
	return store
}
// AcquireProvisionerJob records the call, notes any overlap with calls that are
// still in flight, and then blocks until a job and an error have been queued
// via sendCtx. The context argument is ignored.
func (s *fakeOrderedStore) AcquireProvisionerJob(
	_ context.Context, params database.AcquireProvisionerJobParams,
) (
	database.ProvisionerJob, error,
) {
	caller := params.WorkerID.UUID

	// Bookkeeping on entry: remember the params, pair this caller with every
	// worker that already has a call in flight, and mark it in flight.
	func() {
		s.mu.Lock()
		defer s.mu.Unlock()

		s.params = append(s.params, params)
		for other := range s.inflight {
			s.overlaps = append(s.overlaps, []uuid.UUID{other, caller})
		}
		s.inflight[caller] = true
	}()

	// Wait for the queued response without holding the lock; the job is
	// received first and the error second.
	job := <-s.jobs
	err := <-s.errors

	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.inflight, caller)
	return job, err
}
// sendCtx queues one AcquireProvisionerJob result: job goes onto the jobs
// channel first, then err onto the errors channel. If ctx is done before either
// send completes, ctx.Err() is returned; otherwise the result is nil.
func (s *fakeOrderedStore) sendCtx(ctx context.Context, job database.ProvisionerJob, err error) error {
	select {
	case s.jobs <- job:
		// job queued, now queue its error
	case <-ctx.Done():
		return ctx.Err()
	}

	select {
	case s.errors <- err:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// fakeTaggedStore is a test store that allows tests to specify which jobs are
// available, and returns them to callers with the appropriate provisioner type
// and tags. It doesn't care about the order.
type fakeTaggedStore struct {
// t is used to report a failure when the caller's tags cannot be decoded.
t *testing.T
// mu is held by AcquireProvisionerJob while it scans and modifies jobs.
mu sync.Mutex
// jobs is the list of available jobs; a job is removed once it has been
// handed out.
jobs []database.ProvisionerJob
// params receives the arguments of each AcquireProvisionerJob call as that
// call returns.
params chan database.AcquireProvisionerJobParams
}
// newFakeTaggedStore returns a fakeTaggedStore with no jobs and a buffered
// params channel, so that store calls can report their arguments without a
// reader being ready.
func newFakeTaggedStore(t *testing.T) *fakeTaggedStore {
	const paramsBuffer = 100

	store := new(fakeTaggedStore)
	store.t = t
	store.params = make(chan database.AcquireProvisionerJobParams, paramsBuffer)
	return store
}
// AcquireProvisionerJob hands out the first available job whose provisioner
// type is among params.Types and whose tags all appear, with equal values, in
// the caller's tags. The job is removed from the store. When nothing matches,
// sql.ErrNoRows is returned. The call's params are pushed onto s.params as the
// call returns, whatever the outcome. The context argument is ignored.
func (s *fakeTaggedStore) AcquireProvisionerJob(
	_ context.Context, params database.AcquireProvisionerJobParams,
) (
	database.ProvisionerJob, error,
) {
	// Registered first so that it runs last, after the mutex is released.
	defer func() { s.params <- params }()

	var callerTags provisionerdserver.Tags
	if err := json.Unmarshal(params.Tags, &callerTags); !assert.NoError(s.t, err) {
		return database.ProvisionerJob{}, err
	}

	// eligible reports whether the caller is allowed to run the given job.
	eligible := func(job database.ProvisionerJob) bool {
		if !slices.Contains(params.Types, job.Provisioner) {
			return false
		}
		for key, want := range job.Tags {
			if got, ok := callerTags[key]; !ok || got != want {
				return false
			}
		}
		return true
	}

	s.mu.Lock()
	defer s.mu.Unlock()
	for idx := range s.jobs {
		if !eligible(s.jobs[idx]) {
			continue
		}
		// found a job! Copy it out before removing it from the list.
		found := s.jobs[idx]
		s.jobs = append(s.jobs[:idx], s.jobs[idx+1:]...)
		return found, nil
	}
	return database.ProvisionerJob{}, sql.ErrNoRows
}
// testAcquiree is a helper type that handles asynchronously calling AcquireJob
// and asserting whether or not it returns, blocks, or is canceled.
type testAcquiree struct {
// t is the test that assertion failures are reported against.
t *testing.T
// orgID, workerID, pt and tags are passed to AcquireJob unchanged.
orgID uuid.UUID
workerID uuid.UUID
pt []database.ProvisionerType
tags provisionerdserver.Tags
// ec and jc receive the error and the job returned by AcquireJob; the
// goroutine started by startAcquire sends the error first, then the job.
ec chan error
jc chan database.ProvisionerJob
}
// newTestAcquiree builds a testAcquiree that will call AcquireJob with the
// given organization, worker ID, provisioner types and tags. Its result
// channels have room for one value each, so the goroutine started by a single
// startAcquire can deliver its result without a reader being ready.
func newTestAcquiree(t *testing.T, orgID uuid.UUID, workerID uuid.UUID, pt []database.ProvisionerType, tags provisionerdserver.Tags) *testAcquiree {
	acquiree := new(testAcquiree)
	acquiree.t = t
	acquiree.orgID = orgID
	acquiree.workerID = workerID
	acquiree.pt = pt
	acquiree.tags = tags
	acquiree.ec = make(chan error, 1)
	acquiree.jc = make(chan database.ProvisionerJob, 1)
	return acquiree
}
// startAcquire calls uut.AcquireJob on a new goroutine using the acquiree's
// org, worker ID, provisioner types and tags. When the call returns, its error
// is sent on a.ec and then its job on a.jc.
func (a *testAcquiree) startAcquire(ctx context.Context, uut *provisionerdserver.Acquirer) {
	acquire := func() {
		job, err := uut.AcquireJob(ctx, a.orgID, a.workerID, a.pt, a.tags)
		a.ec <- err
		a.jc <- job
	}
	go acquire()
}
// success waits for the AcquireJob call started by startAcquire to finish,
// requires that it returned no error, and returns the acquired job. The test
// fails if ctx is done before both the error and the job have been received.
func (a *testAcquiree) success(ctx context.Context) database.ProvisionerJob {
	// Mark as a helper so failures are reported at the calling test's line,
	// as postJob already does.
	a.t.Helper()

	select {
	case <-ctx.Done():
		a.t.Fatal("timeout waiting for AcquireJob error")
	case err := <-a.ec:
		require.NoError(a.t, err)
	}
	select {
	case <-ctx.Done():
		a.t.Fatal("timeout waiting for AcquireJob job")
	case job := <-a.jc:
		return job
	}
	// unhittable
	return database.ProvisionerJob{}
}
// requireBlocked fails the test if the AcquireJob call has already delivered
// its error. The check is a single non-blocking receive, so it only reflects
// the state at the moment it is called.
func (a *testAcquiree) requireBlocked() {
	// Mark as a helper so failures are reported at the calling test's line,
	// as postJob already does.
	a.t.Helper()

	select {
	case <-a.ec:
		a.t.Fatal("AcquireJob should block")
	default:
		// OK
	}
}
// requireCanceled waits for the AcquireJob call to finish and requires that it
// returned context.Canceled together with a job whose ID is uuid.Nil. The test
// fails if ctx is done before both results have been received.
func (a *testAcquiree) requireCanceled(ctx context.Context) {
	// Mark as a helper so failures are reported at the calling test's line,
	// as postJob already does.
	a.t.Helper()

	select {
	case err := <-a.ec:
		require.ErrorIs(a.t, err, context.Canceled)
	case <-ctx.Done():
		a.t.Fatal("timed out waiting for AcquireJob")
	}
	select {
	case job := <-a.jc:
		require.Equal(a.t, uuid.Nil, job.ID)
	case <-ctx.Done():
		a.t.Fatal("timed out waiting for AcquireJob")
	}
}