tavern/publisher/command.go

153 lines
3.5 KiB
Go

package publisher
import (
	"context"
	"net/http"
	"os"
	"os/signal"
	"runtime"
	"syscall"
	"time"

	"github.com/getsentry/sentry-go"
	sentrygin "github.com/getsentry/sentry-go/gin"
	ginzap "github.com/gin-contrib/zap"
	"github.com/gin-gonic/gin"
	"github.com/oklog/run"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/urfave/cli/v2"
	"go.uber.org/zap"

	"github.com/ngerakines/tavern/common"
	"github.com/ngerakines/tavern/config"
	"github.com/ngerakines/tavern/g"
	"github.com/ngerakines/tavern/metrics"
)
// Command is the cli definition of the "publisher" subcommand. It declares
// the flags the subcommand accepts and dispatches to publisherCommandAction
// when invoked.
var Command = cli.Command{
	Name:   "publisher",
	Usage:  "Run the publisher server.",
	Action: publisherCommandAction,
	Flags: []cli.Flag{
		&config.ListenFlag,
		&config.EnvironmentFlag,
		&config.PublisherCallbackLocationFlag,
		&config.EnableSentryFlag,
		&config.SentryFlag,
	},
}
// publisherCommandAction is the Action of the "publisher" command. It builds
// the logger, optionally initialises Sentry, wires up the gin router
// (health check on GET /, submissions on POST /, Prometheus on GET /metrics),
// and then runs three actors in a run.Group until one of them stops:
//
//   - the HTTP server,
//   - whatever RunWorker registers for the publisher,
//   - a listener for SIGINT/SIGTERM that triggers a graceful shutdown.
//
// It returns any setup error, or the error of the first actor to exit
// (nil when shutdown was triggered by a signal).
func publisherCommandAction(cliCtx *cli.Context) error {
	logger, err := config.Logger(cliCtx)
	if err != nil {
		return err
	}

	logger.Info("Starting",
		zap.String("command", cliCtx.Command.Name),
		zap.String("GOOS", runtime.GOOS),
		zap.String("env", cliCtx.String("environment")))

	sentryConfig, err := config.NewSentryConfig(cliCtx)
	if err != nil {
		return err
	}

	if sentryConfig.Enabled {
		err = sentry.Init(sentry.ClientOptions{
			Dsn:         sentryConfig.Key,
			Environment: cliCtx.String("environment"),
			Release:     g.Version(),
		})
		if err != nil {
			return err
		}
		sentry.ConfigureScope(func(scope *sentry.Scope) {
			scope.SetTags(map[string]string{"container": "publisher"})
		})
		// Deferred calls run last-in-first-out: Recover reports a panic
		// first, then Flush gives the client a bounded amount of time to
		// deliver buffered events before the process exits.
		defer sentry.Flush(2 * time.Second)
		defer sentry.Recover()
	}

	r := gin.New()
	if sentryConfig.Enabled {
		r.Use(sentrygin.New(sentrygin.Options{
			Repanic: true,
		}))
	}

	r.Use(ginzap.Ginzap(logger, time.RFC3339, true))

	// Plain-text liveness endpoint.
	r.GET("/", func(i *gin.Context) {
		i.Data(http.StatusOK, "text/plain", []byte("OK"))
	})

	// A dedicated registry is used instead of the global default one.
	registry := prometheus.NewRegistry()
	if err = registry.Register(prometheus.NewGoCollector()); err != nil {
		return err
	}

	fact := promauto.With(registry)

	httpClient := common.InstrumentedDefaultHTTPClient(fact, "publisher", "client")

	// NOTE(review): gin only applies middleware to routes registered after
	// the Use call, so GET / above is not wrapped by mm.Handle. That looks
	// intentional (keeps health checks out of the metrics) - confirm.
	mm := metrics.NewMetricsMiddleware("publisher", "server", fact)
	r.Use(mm.Handle)

	// The HTTP handlers and the publisher share the same queue value.
	q := &queue{}

	p := &publisher{
		q:          q,
		httpClient: httpClient,
		logger:     logger,
	}

	houndCB := tavernCallback{
		location:   cliCtx.String("publisher-callback"),
		httpClient: httpClient,
	}
	p.callback = houndCB.callback

	h := handlers{q: q}
	r.POST("/", h.handleSubmission)

	promhandler := promhttp.InstrumentMetricHandler(
		registry, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}),
	)
	r.GET("/metrics", func(c *gin.Context) {
		promhandler.ServeHTTP(c.Writer, c.Request)
	})

	var group run.Group

	srv := &http.Server{
		Addr:    config.ListenAddress(cliCtx),
		Handler: r,
	}

	group.Add(func() error {
		logger.Info("starting http service", zap.String("address", srv.Addr))
		return srv.ListenAndServe()
	}, func(error) {
		// Give in-flight requests up to 30 seconds to finish.
		httpCancelCtx, httpCancelCtxCancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer httpCancelCtxCancel()
		logger.Info("stopping http service")
		if err := srv.Shutdown(httpCancelCtx); err != nil {
			logger.Error("error stopping http service:", zap.Error(err))
		}
	})

	RunWorker(&group, logger, p)

	// The channel must be buffered: package signal never blocks when
	// delivering, so a signal arriving before the actor below is receiving
	// would otherwise be dropped. SIGTERM is handled alongside SIGINT so a
	// supervisor-initiated stop also goes through graceful shutdown.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt, syscall.SIGTERM)
	group.Add(func() error {
		logger.Info("starting signal listener")
		<-quit
		return nil
	}, func(error) {
		logger.Info("stopping signal listener")
		// Unregister before closing; otherwise a signal delivered after
		// close would make package signal send on a closed channel and
		// panic. Closing then releases the actor if it is still waiting.
		signal.Stop(quit)
		close(quit)
	})

	return group.Run()
}