coder/codersdk/provisionerdaemons.go

267 lines
8.9 KiB
Go

package codersdk
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/cookiejar"
"time"
"github.com/google/uuid"
"github.com/hashicorp/yamux"
"golang.org/x/xerrors"
"nhooyr.io/websocket"
"github.com/coder/coder/v2/buildinfo"
"github.com/coder/coder/v2/codersdk/drpc"
"github.com/coder/coder/v2/provisionerd/proto"
"github.com/coder/coder/v2/provisionerd/runner"
)
type LogSource string
type LogLevel string
const (
LogSourceProvisionerDaemon LogSource = "provisioner_daemon"
LogSourceProvisioner LogSource = "provisioner"
LogLevelTrace LogLevel = "trace"
LogLevelDebug LogLevel = "debug"
LogLevelInfo LogLevel = "info"
LogLevelWarn LogLevel = "warn"
LogLevelError LogLevel = "error"
)
type ProvisionerDaemon struct {
ID uuid.UUID `json:"id" format:"uuid"`
CreatedAt time.Time `json:"created_at" format:"date-time"`
LastSeenAt NullTime `json:"last_seen_at,omitempty" format:"date-time"`
Name string `json:"name"`
Version string `json:"version"`
APIVersion string `json:"api_version"`
Provisioners []ProvisionerType `json:"provisioners"`
Tags map[string]string `json:"tags"`
}
// ProvisionerJobStatus represents the at-time state of a job.
type ProvisionerJobStatus string
// Active returns whether the job is still active or not.
// It returns true if canceling as well, since the job isn't
// in an entirely inactive state yet.
func (p ProvisionerJobStatus) Active() bool {
return p == ProvisionerJobPending ||
p == ProvisionerJobRunning ||
p == ProvisionerJobCanceling
}
const (
ProvisionerJobPending ProvisionerJobStatus = "pending"
ProvisionerJobRunning ProvisionerJobStatus = "running"
ProvisionerJobSucceeded ProvisionerJobStatus = "succeeded"
ProvisionerJobCanceling ProvisionerJobStatus = "canceling"
ProvisionerJobCanceled ProvisionerJobStatus = "canceled"
ProvisionerJobFailed ProvisionerJobStatus = "failed"
ProvisionerJobUnknown ProvisionerJobStatus = "unknown"
)
// JobErrorCode defines the error code returned by job runner.
type JobErrorCode string
const (
RequiredTemplateVariables JobErrorCode = "REQUIRED_TEMPLATE_VARIABLES"
)
// JobIsMissingParameterErrorCode returns whether the error is a missing parameter error.
// This can indicate to consumers that they should check parameters.
func JobIsMissingParameterErrorCode(code JobErrorCode) bool {
return string(code) == runner.MissingParameterErrorCode
}
// ProvisionerJob describes the job executed by the provisioning daemon.
type ProvisionerJob struct {
ID uuid.UUID `json:"id" format:"uuid"`
CreatedAt time.Time `json:"created_at" format:"date-time"`
StartedAt *time.Time `json:"started_at,omitempty" format:"date-time"`
CompletedAt *time.Time `json:"completed_at,omitempty" format:"date-time"`
CanceledAt *time.Time `json:"canceled_at,omitempty" format:"date-time"`
Error string `json:"error,omitempty"`
ErrorCode JobErrorCode `json:"error_code,omitempty" enums:"REQUIRED_TEMPLATE_VARIABLES"`
Status ProvisionerJobStatus `json:"status" enums:"pending,running,succeeded,canceling,canceled,failed"`
WorkerID *uuid.UUID `json:"worker_id,omitempty" format:"uuid"`
FileID uuid.UUID `json:"file_id" format:"uuid"`
Tags map[string]string `json:"tags"`
QueuePosition int `json:"queue_position"`
QueueSize int `json:"queue_size"`
}
// ProvisionerJobLog represents the provisioner log entry annotated with source and level.
type ProvisionerJobLog struct {
ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at" format:"date-time"`
Source LogSource `json:"log_source"`
Level LogLevel `json:"log_level" enums:"trace,debug,info,warn,error"`
Stage string `json:"stage"`
Output string `json:"output"`
}
// provisionerJobLogsAfter streams logs that occurred after a specific time.
func (c *Client) provisionerJobLogsAfter(ctx context.Context, path string, after int64) (<-chan ProvisionerJobLog, io.Closer, error) {
afterQuery := ""
if after != 0 {
afterQuery = fmt.Sprintf("&after=%d", after)
}
followURL, err := c.URL.Parse(fmt.Sprintf("%s?follow%s", path, afterQuery))
if err != nil {
return nil, nil, err
}
jar, err := cookiejar.New(nil)
if err != nil {
return nil, nil, xerrors.Errorf("create cookie jar: %w", err)
}
jar.SetCookies(followURL, []*http.Cookie{{
Name: SessionTokenCookie,
Value: c.SessionToken(),
}})
httpClient := &http.Client{
Jar: jar,
Transport: c.HTTPClient.Transport,
}
conn, res, err := websocket.Dial(ctx, followURL.String(), &websocket.DialOptions{
HTTPClient: httpClient,
CompressionMode: websocket.CompressionDisabled,
})
if err != nil {
if res == nil {
return nil, nil, err
}
return nil, nil, ReadBodyAsError(res)
}
logs := make(chan ProvisionerJobLog)
closed := make(chan struct{})
go func() {
defer close(closed)
defer close(logs)
defer conn.Close(websocket.StatusGoingAway, "")
var log ProvisionerJobLog
for {
msgType, msg, err := conn.Read(ctx)
if err != nil {
return
}
if msgType != websocket.MessageText {
return
}
err = json.Unmarshal(msg, &log)
if err != nil {
return
}
select {
case <-ctx.Done():
return
case logs <- log:
}
}
}()
return logs, closeFunc(func() error {
<-closed
return nil
}), nil
}
// ServeProvisionerDaemonRequest are the parameters to call ServeProvisionerDaemon with
// @typescript-ignore ServeProvisionerDaemonRequest
type ServeProvisionerDaemonRequest struct {
// ID is a unique ID for a provisioner daemon.
ID uuid.UUID `json:"id" format:"uuid"`
// Name is the human-readable unique identifier for the daemon.
Name string `json:"name" example:"my-cool-provisioner-daemon"`
// Organization is the organization for the URL. If no orgID is provided,
// then it is assumed to use the default organization.
Organization uuid.UUID `json:"organization" format:"uuid"`
// Provisioners is a list of provisioner types hosted by the provisioner daemon
Provisioners []ProvisionerType `json:"provisioners"`
// Tags is a map of key-value pairs that tag the jobs this provisioner daemon can handle
Tags map[string]string `json:"tags"`
// PreSharedKey is an authentication key to use on the API instead of the normal session token from the client.
PreSharedKey string `json:"pre_shared_key"`
}
// ServeProvisionerDaemon returns the gRPC service for a provisioner daemon
// implementation. The context is during dial, not during the lifetime of the
// client. Client should be closed after use.
func (c *Client) ServeProvisionerDaemon(ctx context.Context, req ServeProvisionerDaemonRequest) (proto.DRPCProvisionerDaemonClient, error) {
orgParam := req.Organization.String()
if req.Organization == uuid.Nil {
orgParam = DefaultOrganization
}
serverURL, err := c.URL.Parse(fmt.Sprintf("/api/v2/organizations/%s/provisionerdaemons/serve", orgParam))
if err != nil {
return nil, xerrors.Errorf("parse url: %w", err)
}
query := serverURL.Query()
query.Add("version", proto.CurrentVersion.String())
query.Add("id", req.ID.String())
query.Add("name", req.Name)
query.Add("version", proto.CurrentVersion.String())
for _, provisioner := range req.Provisioners {
query.Add("provisioner", string(provisioner))
}
for key, value := range req.Tags {
query.Add("tag", fmt.Sprintf("%s=%s", key, value))
}
serverURL.RawQuery = query.Encode()
httpClient := &http.Client{
Transport: c.HTTPClient.Transport,
}
headers := http.Header{}
headers.Set(BuildVersionHeader, buildinfo.Version())
if req.PreSharedKey == "" {
// use session token if we don't have a PSK.
jar, err := cookiejar.New(nil)
if err != nil {
return nil, xerrors.Errorf("create cookie jar: %w", err)
}
jar.SetCookies(serverURL, []*http.Cookie{{
Name: SessionTokenCookie,
Value: c.SessionToken(),
}})
httpClient.Jar = jar
} else {
headers.Set(ProvisionerDaemonPSK, req.PreSharedKey)
}
conn, res, err := websocket.Dial(ctx, serverURL.String(), &websocket.DialOptions{
HTTPClient: httpClient,
// Need to disable compression to avoid a data-race.
CompressionMode: websocket.CompressionDisabled,
HTTPHeader: headers,
})
if err != nil {
if res == nil {
return nil, err
}
return nil, ReadBodyAsError(res)
}
// Align with the frame size of yamux.
conn.SetReadLimit(256 * 1024)
config := yamux.DefaultConfig()
config.LogOutput = io.Discard
// Use background context because caller should close the client.
_, wsNetConn := WebsocketNetConn(context.Background(), conn, websocket.MessageBinary)
session, err := yamux.Client(wsNetConn, config)
if err != nil {
_ = conn.Close(websocket.StatusGoingAway, "")
_ = wsNetConn.Close()
return nil, xerrors.Errorf("multiplex client: %w", err)
}
return proto.NewDRPCProvisionerDaemonClient(drpc.MultiplexedConn(session)), nil
}