mirror of https://gitlab.com/ngerakines/tavern.git
153 lines
3.5 KiB
Go
153 lines
3.5 KiB
Go
package publisher
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"os"
|
|
"os/signal"
|
|
"runtime"
|
|
"time"
|
|
|
|
"github.com/getsentry/sentry-go"
|
|
sentrygin "github.com/getsentry/sentry-go/gin"
|
|
ginzap "github.com/gin-contrib/zap"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/oklog/run"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/urfave/cli/v2"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/ngerakines/tavern/common"
|
|
"github.com/ngerakines/tavern/config"
|
|
"github.com/ngerakines/tavern/g"
|
|
"github.com/ngerakines/tavern/metrics"
|
|
)
|
|
|
|
var Command = cli.Command{
|
|
Name: "publisher",
|
|
Usage: "Run the publisher server.",
|
|
Flags: []cli.Flag{
|
|
&config.ListenFlag,
|
|
&config.EnvironmentFlag,
|
|
&config.PublisherCallbackLocationFlag,
|
|
&config.EnableSentryFlag,
|
|
&config.SentryFlag,
|
|
},
|
|
Action: publisherCommandAction,
|
|
}
|
|
|
|
func publisherCommandAction(cliCtx *cli.Context) error {
|
|
logger, err := config.Logger(cliCtx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
logger.Info("Starting",
|
|
zap.String("command", cliCtx.Command.Name),
|
|
zap.String("GOOS", runtime.GOOS),
|
|
zap.String("env", cliCtx.String("environment")))
|
|
|
|
sentryConfig, err := config.NewSentryConfig(cliCtx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if sentryConfig.Enabled {
|
|
err = sentry.Init(sentry.ClientOptions{
|
|
Dsn: sentryConfig.Key,
|
|
Environment: cliCtx.String("environment"),
|
|
Release: g.Version(),
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sentry.ConfigureScope(func(scope *sentry.Scope) {
|
|
scope.SetTags(map[string]string{"container": "publisher"})
|
|
})
|
|
defer sentry.Recover()
|
|
}
|
|
|
|
r := gin.New()
|
|
if sentryConfig.Enabled {
|
|
r.Use(sentrygin.New(sentrygin.Options{
|
|
Repanic: true,
|
|
}))
|
|
}
|
|
|
|
r.Use(ginzap.Ginzap(logger, time.RFC3339, true))
|
|
|
|
r.GET("/", func(i *gin.Context) {
|
|
i.Data(200, "text/plain", []byte("OK"))
|
|
})
|
|
|
|
registry := prometheus.NewRegistry()
|
|
if err = registry.Register(prometheus.NewGoCollector()); err != nil {
|
|
return err
|
|
}
|
|
fact := promauto.With(registry)
|
|
httpClient := common.InstrumentedDefaultHTTPClient(fact, "publisher", "client")
|
|
|
|
mm := metrics.NewMetricsMiddleware("publisher", "server", promauto.With(registry))
|
|
r.Use(mm.Handle)
|
|
|
|
q := &queue{}
|
|
p := &publisher{
|
|
q: q,
|
|
httpClient: httpClient,
|
|
logger: logger,
|
|
}
|
|
houndCB := tavernCallback{
|
|
location: cliCtx.String("publisher-callback"),
|
|
httpClient: httpClient,
|
|
}
|
|
p.callback = houndCB.callback
|
|
|
|
h := handlers{q: q,}
|
|
|
|
r.POST("/", h.handleSubmission)
|
|
|
|
promhandler := promhttp.InstrumentMetricHandler(
|
|
registry, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}),
|
|
)
|
|
|
|
r.GET("/metrics", func(c *gin.Context) {
|
|
promhandler.ServeHTTP(c.Writer, c.Request)
|
|
})
|
|
|
|
var group run.Group
|
|
|
|
srv := &http.Server{
|
|
Addr: config.ListenAddress(cliCtx),
|
|
Handler: r,
|
|
}
|
|
|
|
group.Add(func() error {
|
|
logger.Info("starting http service", zap.String("address", srv.Addr))
|
|
return srv.ListenAndServe()
|
|
}, func(error) {
|
|
httpCancelCtx, httpCancelCtxCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer httpCancelCtxCancel()
|
|
logger.Info("stopping http service")
|
|
if err := srv.Shutdown(httpCancelCtx); err != nil {
|
|
logger.Info("error stopping http service:", zap.Error(err))
|
|
}
|
|
})
|
|
|
|
RunWorker(&group, logger, p)
|
|
|
|
quit := make(chan os.Signal)
|
|
signal.Notify(quit, os.Interrupt)
|
|
group.Add(func() error {
|
|
logger.Info("starting signal listener")
|
|
<-quit
|
|
return nil
|
|
}, func(error) {
|
|
logger.Info("stopping signal listener")
|
|
close(quit)
|
|
})
|
|
|
|
return group.Run()
|
|
}
|