package provisionerdserver

import (
	"context"
	"database/sql"
	"encoding/json"
	"strings"
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/google/uuid"
	"golang.org/x/exp/slices"
	"golang.org/x/xerrors"

	"cdr.dev/slog"
	"github.com/coder/coder/v2/coderd/database"
	"github.com/coder/coder/v2/coderd/database/dbtime"
	"github.com/coder/coder/v2/coderd/database/provisionerjobs"
	"github.com/coder/coder/v2/coderd/database/pubsub"
)

const (
	dbMaxBackoff = 10 * time.Second
	// backupPollDuration is the period for the backup polling described in the
	// Acquirer comment.
	backupPollDuration = 30 * time.Second
)

// Acquirer is shared among multiple routines that need to call
// database.Store.AcquireProvisionerJob. The callers that acquire jobs are called "acquirees". The
// goal is to minimize polling the database (i.e. lower our average query rate) and simplify the
// acquiree's logic by handling retries when no job is available at the time of the call.
//
// When multiple acquirees share a set of provisioner types and tags, we define them as part of the
// same "domain". Only one acquiree from each domain may query the database at a time. If the
// database returns no jobs for that acquiree, the entire domain waits until the Acquirer is
// notified over the pubsub of a new job acceptable to the domain.
//
// As a backup to pubsub notifications, each domain is allowed to query periodically once every 30s.
// This ensures jobs are not stuck permanently if the service that created them fails to publish
// (e.g. a crash).
type Acquirer struct {
	ctx    context.Context
	logger slog.Logger
	store  AcquirerStore
	ps     pubsub.Pubsub

	mu sync.Mutex
	q  map[dKey]domain

	// testing only
	backupPollDuration time.Duration
}

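// For example (informal, not from the original source): two acquirees that
// both want provisioner types {terraform} and tags {"env": "prod"} share a
// single domain, so only one of them queries the database at a time; an
// acquiree wanting tags {"env": "dev"} belongs to a separate domain.
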
type AcquirerOption func(*Acquirer)

func TestingBackupPollDuration(dur time.Duration) AcquirerOption {
	return func(a *Acquirer) {
		a.backupPollDuration = dur
	}
}

// AcquirerStore is the subset of database.Store that the Acquirer needs
type AcquirerStore interface {
	AcquireProvisionerJob(context.Context, database.AcquireProvisionerJobParams) (database.ProvisionerJob, error)
}

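// Because the interface has a single method, a minimal test fake is easy to
// write. A sketch (fakeStore is hypothetical, not part of this package):
//
//	type fakeStore struct{ jobs chan database.ProvisionerJob }
//
//	func (f *fakeStore) AcquireProvisionerJob(
//		_ context.Context, _ database.AcquireProvisionerJobParams,
//	) (database.ProvisionerJob, error) {
//		select {
//		case j := <-f.jobs:
//			return j, nil
//		default:
//			// mimic the real store: sql.ErrNoRows means no job available.
//			return database.ProvisionerJob{}, sql.ErrNoRows
//		}
//	}
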
func NewAcquirer(ctx context.Context, logger slog.Logger, store AcquirerStore, ps pubsub.Pubsub,
	opts ...AcquirerOption,
) *Acquirer {
	a := &Acquirer{
		ctx:                ctx,
		logger:             logger,
		store:              store,
		ps:                 ps,
		q:                  make(map[dKey]domain),
		backupPollDuration: backupPollDuration,
	}
	for _, opt := range opts {
		opt(a)
	}
	a.subscribe()
	return a
}

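// Usage sketch (illustrative only, not part of this package's API): a
// provisioner daemon goroutine typically acquires jobs in a loop. orgID,
// workerID, tags, and process are hypothetical stand-ins supplied by the
// caller, and the provisioner type list is just an example.
//
//	a := NewAcquirer(ctx, logger, store, ps)
//	for {
//		job, err := a.AcquireJob(ctx, orgID, workerID,
//			[]database.ProvisionerType{database.ProvisionerTypeTerraform}, tags)
//		if err != nil {
//			return err
//		}
//		process(job)
//	}
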
// AcquireJob acquires a job with one of the given provisioner types and compatible
// tags from the database. The call blocks until a job is acquired, the context is
// done, or the database returns an error _other_ than that no jobs are available.
// If no jobs are available, this method handles retrying as appropriate.
func (a *Acquirer) AcquireJob(
	ctx context.Context, organization uuid.UUID, worker uuid.UUID, pt []database.ProvisionerType, tags Tags,
) (
	retJob database.ProvisionerJob, retErr error,
) {
	logger := a.logger.With(
		slog.F("organization_id", organization),
		slog.F("worker_id", worker),
		slog.F("provisioner_types", pt),
		slog.F("tags", tags))
	logger.Debug(ctx, "acquiring job")
	dk := domainKey(organization, pt, tags)
	dbTags, err := tags.ToJSON()
	if err != nil {
		return database.ProvisionerJob{}, err
	}
	// buffer of 1 so that cancel doesn't deadlock while writing to the channel
	clearance := make(chan struct{}, 1)
	for {
		a.want(organization, pt, tags, clearance)
		select {
		case <-ctx.Done():
			err := ctx.Err()
			logger.Debug(ctx, "acquiring job canceled", slog.Error(err))
			internalError := a.cancel(dk, clearance)
			if internalError != nil {
				// internalError takes precedence
				return database.ProvisionerJob{}, internalError
			}
			return database.ProvisionerJob{}, err
		case <-clearance:
			logger.Debug(ctx, "got clearance to call database")
			job, err := a.store.AcquireProvisionerJob(ctx, database.AcquireProvisionerJobParams{
				OrganizationID: organization,
				StartedAt: sql.NullTime{
					Time:  dbtime.Now(),
					Valid: true,
				},
				WorkerID: uuid.NullUUID{
					UUID:  worker,
					Valid: true,
				},
				Types: pt,
				Tags:  dbTags,
			})
			if xerrors.Is(err, sql.ErrNoRows) {
				logger.Debug(ctx, "no job available")
				continue
			}
			// we are not going to retry, so signal we are done
			internalError := a.done(dk, clearance)
			if internalError != nil {
				// internal error takes precedence
				return database.ProvisionerJob{}, internalError
			}
			if err != nil {
				logger.Warn(ctx, "error attempting to acquire job", slog.Error(err))
				return database.ProvisionerJob{}, xerrors.Errorf("failed to acquire job: %w", err)
			}
			logger.Debug(ctx, "successfully acquired job")
			return job, nil
		}
	}
}

// want signals that an acquiree wants clearance to query for a job with the given dKey.
func (a *Acquirer) want(organization uuid.UUID, pt []database.ProvisionerType, tags Tags, clearance chan<- struct{}) {
	dk := domainKey(organization, pt, tags)
	a.mu.Lock()
	defer a.mu.Unlock()
	cleared := false
	d, ok := a.q[dk]
	if !ok {
		ctx, cancel := context.WithCancel(a.ctx)
		d = domain{
			ctx:       ctx,
			cancel:    cancel,
			a:         a,
			key:       dk,
			pt:        pt,
			tags:      tags,
			acquirees: make(map[chan<- struct{}]*acquiree),
		}
		a.q[dk] = d
		go d.poll(a.backupPollDuration)
		// this is a new request for this dKey, so it is cleared.
		cleared = true
	}
	w, ok := d.acquirees[clearance]
	if !ok {
		w = &acquiree{clearance: clearance}
		d.acquirees[clearance] = w
	}
	// pending means that we got a job posting for this dKey while we were
	// querying, so we should clear this acquiree to retry another time.
	if w.pending {
		cleared = true
		w.pending = false
	}
	w.inProgress = cleared
	if cleared {
		// this won't block because clearance is buffered.
		clearance <- struct{}{}
	}
}

// cancel signals that an acquiree no longer wants clearance to query. Any error returned is a serious internal error
// indicating that the integrity of the internal state has been corrupted by a code bug.
func (a *Acquirer) cancel(dk dKey, clearance chan<- struct{}) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	d, ok := a.q[dk]
	if !ok {
		// this is a code error, as something removed the domain early, or cancel
		// was called twice.
		err := xerrors.New("cancel for domain that doesn't exist")
		a.logger.Critical(a.ctx, "internal error", slog.Error(err))
		return err
	}
	w, ok := d.acquirees[clearance]
	if !ok {
		// this is a code error, as something removed the acquiree early, or cancel
		// was called twice.
		err := xerrors.New("cancel for an acquiree that doesn't exist")
		a.logger.Critical(a.ctx, "internal error", slog.Error(err))
		return err
	}
	delete(d.acquirees, clearance)
	if w.inProgress && len(d.acquirees) > 0 {
		// this one canceled before querying, so give another acquiree a chance
		// instead
		for _, other := range d.acquirees {
			if other.inProgress {
				err := xerrors.New("more than one acquiree in progress for same key")
				a.logger.Critical(a.ctx, "internal error", slog.Error(err))
				return err
			}
			other.inProgress = true
			other.clearance <- struct{}{}
			break // just one
		}
	}
	if len(d.acquirees) == 0 {
		d.cancel()
		delete(a.q, dk)
	}
	return nil
}

// done signals that the acquiree has completed acquiring a job (usually successfully, but we also get this call if
// there is a database error other than ErrNoRows). Any error returned is a serious internal error indicating that
// the integrity of the internal state has been corrupted by a code bug.
func (a *Acquirer) done(dk dKey, clearance chan struct{}) error {
	a.mu.Lock()
	defer a.mu.Unlock()
	d, ok := a.q[dk]
	if !ok {
		// this is a code error, as something removed the domain early, or done
		// was called twice.
		err := xerrors.New("done for a domain that doesn't exist")
		a.logger.Critical(a.ctx, "internal error", slog.Error(err))
		return err
	}
	w, ok := d.acquirees[clearance]
	if !ok {
		// this is a code error, as something removed the acquiree early, or done
		// was called twice.
		err := xerrors.New("done for an acquiree that doesn't exist")
		a.logger.Critical(a.ctx, "internal error", slog.Error(err))
		return err
	}
	if !w.inProgress {
		err := xerrors.New("done acquiree was not in progress")
		a.logger.Critical(a.ctx, "internal error", slog.Error(err))
		return err
	}
	delete(d.acquirees, clearance)
	if len(d.acquirees) == 0 {
		d.cancel()
		delete(a.q, dk)
		return nil
	}
	// in the mainline, this means that the acquiree successfully got a job.
	// if any others are waiting, clear one of them to try to get a job next so
	// that we process the jobs until there are no more acquirees or the database
	// is empty of jobs meeting our criteria.
	for _, other := range d.acquirees {
		if other.inProgress {
			err := xerrors.New("more than one acquiree in progress for same key")
			a.logger.Critical(a.ctx, "internal error", slog.Error(err))
			return err
		}
		other.inProgress = true
		other.clearance <- struct{}{}
		break // just one
	}
	return nil
}

func (a *Acquirer) subscribe() {
	subscribed := make(chan struct{})
	go func() {
		defer close(subscribed)
		eb := backoff.NewExponentialBackOff()
		eb.MaxElapsedTime = 0 // retry indefinitely
		eb.MaxInterval = dbMaxBackoff
		bkoff := backoff.WithContext(eb, a.ctx)
		var cancel context.CancelFunc
		err := backoff.Retry(func() error {
			cancelFn, err := a.ps.SubscribeWithErr(provisionerjobs.EventJobPosted, a.jobPosted)
			if err != nil {
				a.logger.Warn(a.ctx, "failed to subscribe to job postings", slog.Error(err))
				return err
			}
			cancel = cancelFn
			return nil
		}, bkoff)
		if err != nil {
			if a.ctx.Err() == nil {
				a.logger.Error(a.ctx, "code bug: retry failed before context canceled", slog.Error(err))
			}
			return
		}
		defer cancel()
		bkoff.Reset()
		a.logger.Debug(a.ctx, "subscribed to job postings")

		// unblock the outer function from returning
		subscribed <- struct{}{}

		// hold subscriptions open until context is canceled
		<-a.ctx.Done()
	}()
	<-subscribed
}

func (a *Acquirer) jobPosted(ctx context.Context, message []byte, err error) {
	if xerrors.Is(err, pubsub.ErrDroppedMessages) {
		a.logger.Warn(a.ctx, "pubsub may have dropped job postings")
		a.clearOrPendAll()
		return
	}
	if err != nil {
		a.logger.Warn(a.ctx, "unhandled pubsub error", slog.Error(err))
		return
	}
	posting := provisionerjobs.JobPosting{}
	err = json.Unmarshal(message, &posting)
	if err != nil {
		a.logger.Error(a.ctx, "unable to parse job posting",
			slog.F("message", string(message)),
			slog.Error(err),
		)
		return
	}
	a.logger.Debug(ctx, "got job posting", slog.F("posting", posting))

	a.mu.Lock()
	defer a.mu.Unlock()
	for _, d := range a.q {
		if d.contains(posting) {
			a.clearOrPendLocked(d)
			// we only need to wake up a single domain since there is only one
			// new job available
			return
		}
	}
}

func (a *Acquirer) clearOrPendAll() {
	a.mu.Lock()
	defer a.mu.Unlock()
	for _, d := range a.q {
		a.clearOrPendLocked(d)
	}
}

func (a *Acquirer) clearOrPend(d domain) {
	a.mu.Lock()
	defer a.mu.Unlock()
	if len(d.acquirees) == 0 {
		// this can happen if the domain is removed right around the time the
		// backup poll (which calls this function) triggers. Nothing to do
		// since there are no acquirees.
		return
	}
	a.clearOrPendLocked(d)
}

func (*Acquirer) clearOrPendLocked(d domain) {
	// MUST BE CALLED HOLDING THE a.mu LOCK
	var nominee *acquiree
	for _, w := range d.acquirees {
		if nominee == nil {
			nominee = w
		}
		// acquiree in progress always takes precedence, since we don't want to
		// wake up more than one acquiree per dKey at a time.
		if w.inProgress {
			nominee = w
			break
		}
	}
	if nominee.inProgress {
		nominee.pending = true
		return
	}
	nominee.inProgress = true
	nominee.clearance <- struct{}{}
}

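// Informally: if an acquiree for this domain is mid-query when a posting
// arrives, it is marked pending rather than woken, so a no-rows result leads
// to an immediate retry; otherwise one waiting acquiree is cleared to query.
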
type dKey string

// domainKey generates a canonical map key for the given provisioner types and
// tags. It uses the null byte (0x00) as a delimiter because it is an
// unprintable control character and won't show up in any "reasonable" set of
// string tags, even in non-Latin scripts. It is important that Tags are
// validated not to contain this control character prior to use.
func domainKey(orgID uuid.UUID, pt []database.ProvisionerType, tags Tags) dKey {
	sb := strings.Builder{}
	_, _ = sb.WriteString(orgID.String())
	_ = sb.WriteByte(0x00)

	// make a copy of pt before sorting, so that we don't mutate the original
	// slice or underlying array.
	pts := make([]database.ProvisionerType, len(pt))
	copy(pts, pt)
	slices.Sort(pts)
	for _, t := range pts {
		_, _ = sb.WriteString(string(t))
		_ = sb.WriteByte(0x00)
	}
	_ = sb.WriteByte(0x00)
	var keys []string
	for k := range tags {
		keys = append(keys, k)
	}
	slices.Sort(keys)
	for _, k := range keys {
		_, _ = sb.WriteString(k)
		_ = sb.WriteByte(0x00)
		_, _ = sb.WriteString(tags[k])
		_ = sb.WriteByte(0x00)
	}
	return dKey(sb.String())
}

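// For illustration (informal, not normative): an organization with provisioner
// type "terraform" and tags {"env": "prod"} yields a key laid out as
//
//	<org-uuid> 0x00 "terraform" 0x00 0x00 "env" 0x00 "prod" 0x00
//
// Sorting the types and tag keys makes the key canonical regardless of the
// order in which the caller supplies them.
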
// acquiree represents a specific client of Acquirer that wants to acquire a job
type acquiree struct {
	clearance chan<- struct{}
	// inProgress is true when the acquiree was granted clearance and a query
	// is possibly in progress.
	inProgress bool
	// pending is true if we get a job posting while a query is in progress, so
	// that we know to try again, even if we didn't get a job on the query.
	pending bool
}

// domain represents a set of acquirees with the same provisioner types and
// tags. Acquirees in the same domain are restricted such that only one queries
// the database at a time.
type domain struct {
	ctx       context.Context
	cancel    context.CancelFunc
	a         *Acquirer
	key       dKey
	pt        []database.ProvisionerType
	tags      Tags
	acquirees map[chan<- struct{}]*acquiree
}

func (d domain) contains(p provisionerjobs.JobPosting) bool {
	if !slices.Contains(d.pt, p.ProvisionerType) {
		return false
	}
	for k, v := range p.Tags {
		dv, ok := d.tags[k]
		if !ok {
			return false
		}
		if v != dv {
			return false
		}
	}
	return true
}

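// In other words, a posting's tags must be a subset of the domain's tags. For
// example (informal): a posting tagged {"env": "prod"} matches a domain tagged
// {"env": "prod", "region": "us"}, while a posting tagged {"env": "dev"} does
// not match that domain.
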
func (d domain) poll(dur time.Duration) {
	tkr := time.NewTicker(dur)
	defer tkr.Stop()
	for {
		select {
		case <-d.ctx.Done():
			return
		case <-tkr.C:
			d.a.clearOrPend(d)
		}
	}
}