mirror of https://gitlab.com/ngerakines/tavern.git
Added publisher command and integrations from server to publisher. Closes #66.
This commit is contained in:
parent
8a9fbabc29
commit
26a0811a3f
|
@ -0,0 +1,38 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
var EnablePublisherFlag = cli.BoolFlag{
|
||||
Name: "enable-publisher",
|
||||
Usage: "Enable publisher integration",
|
||||
EnvVars: []string{"ENABLE_PUBLISHER"},
|
||||
Value: false,
|
||||
}
|
||||
|
||||
var PublisherLocationFlag = cli.StringFlag{
|
||||
Name: "publisher-location",
|
||||
Usage: "The publisher to interact with.",
|
||||
EnvVars: []string{"PUBLISHER"},
|
||||
Value: "http://localhost:9200/",
|
||||
}
|
||||
|
||||
var PublisherCallbackLocationFlag = cli.StringFlag{
|
||||
Name: "publisher-callback",
|
||||
Usage: "The publisher to interact with.",
|
||||
EnvVars: []string{"PUBLISHER_CALLBACK"},
|
||||
Value: "http://localhost:8000/webhooks/publisher",
|
||||
}
|
||||
|
||||
type PublisherConfig struct {
|
||||
Enabled bool
|
||||
Location string
|
||||
}
|
||||
|
||||
func NewPublisherConfig(cliCtx *cli.Context) PublisherConfig {
|
||||
return PublisherConfig{
|
||||
Enabled: cliCtx.Bool("enable-publisher"),
|
||||
Location: cliCtx.String("publisher-location"),
|
||||
}
|
||||
}
|
|
@ -20,27 +20,6 @@ type hasInbox interface {
|
|||
GetInbox() string
|
||||
}
|
||||
|
||||
type fakeInbox string
|
||||
|
||||
func (f fakeInbox) GetInbox() string {
|
||||
return string(f)
|
||||
}
|
||||
|
||||
// func (client ActorClient) Broadcast(ctx context.Context, store storage.Storage, localActor storage.LocalActor, payload []byte) error {
|
||||
// // TODO: Remove this hard-coded limit of 100 followers to broadcast to.
|
||||
// followers, err := store.ListAcceptedFollowers(ctx, localActor.User.ID, 100, 0)
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// actors, err := store.ActorsByActorID(ctx, followers)
|
||||
//
|
||||
// destinations := make([]hasInbox, 0)
|
||||
// for _, actor := range actors {
|
||||
// destinations = append(destinations, actor)
|
||||
// }
|
||||
// return client.SendToInboxes(ctx, localActor, destinations, payload)
|
||||
// }
|
||||
|
||||
func (client ActorClient) SendToInboxes(ctx context.Context, localActor storage.LocalActor, actors []hasInbox, payload []byte) error {
|
||||
for _, actor := range actors {
|
||||
client.Logger.Info("sending payload to follower", zap.String("inbox", actor.GetInbox()))
|
||||
|
|
2
main.go
2
main.go
|
@ -12,6 +12,7 @@ import (
|
|||
"github.com/ngerakines/tavern/g"
|
||||
"github.com/ngerakines/tavern/json"
|
||||
"github.com/ngerakines/tavern/migrations"
|
||||
"github.com/ngerakines/tavern/publisher"
|
||||
"github.com/ngerakines/tavern/start"
|
||||
"github.com/ngerakines/tavern/web"
|
||||
|
||||
|
@ -51,6 +52,7 @@ func main() {
|
|||
&asset.Command,
|
||||
&migrations.Command,
|
||||
&json.DebugJSONLDCommand,
|
||||
&publisher.Command,
|
||||
}
|
||||
|
||||
sort.Sort(cli.FlagsByName(app.Flags))
|
||||
|
|
|
@ -1,30 +1,33 @@
|
|||
package web
|
||||
package metrics
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
)
|
||||
|
||||
type metricsMiddleware struct {
|
||||
requests *prometheus.CounterVec
|
||||
totalTime *prometheus.HistogramVec
|
||||
requestSize *prometheus.SummaryVec
|
||||
responseSize *prometheus.SummaryVec
|
||||
type MetricsMiddleware struct {
|
||||
Requests *prometheus.CounterVec
|
||||
TotalTime *prometheus.HistogramVec
|
||||
RequestSize *prometheus.SummaryVec
|
||||
ResponseSize *prometheus.SummaryVec
|
||||
}
|
||||
|
||||
func newMetricsMiddleware(namespace, subsystem string, metricFactory promauto.Factory) metricsMiddleware {
|
||||
return metricsMiddleware{
|
||||
requests: metricFactory.NewCounterVec(
|
||||
func NewMetricsMiddleware(namespace, subsystem string, metricFactory promauto.Factory) MetricsMiddleware {
|
||||
return MetricsMiddleware{
|
||||
Requests: metricFactory.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
Name: "request_total",
|
||||
Help: "Total number of HTTP requests made.",
|
||||
Help: "Total number of HTTP Requests made.",
|
||||
}, []string{"status", "endpoint", "method"},
|
||||
),
|
||||
totalTime: metricFactory.NewHistogramVec(
|
||||
TotalTime: metricFactory.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
|
@ -32,7 +35,7 @@ func newMetricsMiddleware(namespace, subsystem string, metricFactory promauto.Fa
|
|||
Help: "HTTP request latencies in seconds.",
|
||||
}, []string{"status", "endpoint", "method"},
|
||||
),
|
||||
requestSize: metricFactory.NewSummaryVec(
|
||||
RequestSize: metricFactory.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
|
@ -40,7 +43,7 @@ func newMetricsMiddleware(namespace, subsystem string, metricFactory promauto.Fa
|
|||
Help: "HTTP request sizes in bytes.",
|
||||
}, []string{"status", "endpoint", "method"},
|
||||
),
|
||||
responseSize: metricFactory.NewSummaryVec(
|
||||
ResponseSize: metricFactory.NewSummaryVec(
|
||||
prometheus.SummaryOpts{
|
||||
Namespace: namespace,
|
||||
Subsystem: subsystem,
|
||||
|
@ -51,8 +54,24 @@ func newMetricsMiddleware(namespace, subsystem string, metricFactory promauto.Fa
|
|||
}
|
||||
}
|
||||
|
||||
// calcRequestSize returns the size of request object.
|
||||
func calcRequestSize(r *http.Request) float64 {
|
||||
func (mm MetricsMiddleware) Handle(c *gin.Context) {
|
||||
start := time.Now()
|
||||
c.Next()
|
||||
|
||||
status := fmt.Sprintf("%d", c.Writer.Status())
|
||||
endpoint := c.FullPath()
|
||||
method := c.Request.Method
|
||||
|
||||
lvs := []string{status, endpoint, method}
|
||||
|
||||
mm.Requests.WithLabelValues(lvs...).Inc()
|
||||
mm.TotalTime.WithLabelValues(lvs...).Observe(time.Since(start).Seconds())
|
||||
mm.RequestSize.WithLabelValues(lvs...).Observe(CalcRequestSize(c.Request))
|
||||
mm.ResponseSize.WithLabelValues(lvs...).Observe(float64(c.Writer.Size()))
|
||||
}
|
||||
|
||||
// CalcRequestSize returns the size of request object.
|
||||
func CalcRequestSize(r *http.Request) float64 {
|
||||
size := 0
|
||||
if r.URL != nil {
|
||||
size = len(r.URL.String())
|
|
@ -0,0 +1,61 @@
|
|||
package publisher
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ngerakines/tavern/common"
|
||||
)
|
||||
|
||||
type tavernCallback struct {
|
||||
location string
|
||||
httpClient common.HTTPClient
|
||||
}
|
||||
|
||||
type callback func(string, string, error) error
|
||||
|
||||
func (cb tavernCallback) callback(activityID, destination string, err error) error {
|
||||
obj := map[string]string{
|
||||
"destination": destination,
|
||||
"activity": activityID,
|
||||
"status": "ok",
|
||||
}
|
||||
if err != nil {
|
||||
obj["status"] = "error"
|
||||
obj["error"] = err.Error()
|
||||
}
|
||||
|
||||
data, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("POST", cb.location, bytes.NewReader(data))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Add("content-type", "application/json")
|
||||
req.Header.Add("date", time.Now().UTC().Format(http.TimeFormat))
|
||||
|
||||
resp, err := cb.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 500000))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,149 @@
|
|||
package publisher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/getsentry/sentry-go"
|
||||
sentrygin "github.com/getsentry/sentry-go/gin"
|
||||
ginzap "github.com/gin-contrib/zap"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/oklog/run"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"github.com/urfave/cli/v2"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ngerakines/tavern/common"
|
||||
"github.com/ngerakines/tavern/config"
|
||||
"github.com/ngerakines/tavern/g"
|
||||
"github.com/ngerakines/tavern/metrics"
|
||||
)
|
||||
|
||||
var Command = cli.Command{
|
||||
Name: "publisher",
|
||||
Usage: "Run the publisher server.",
|
||||
Flags: []cli.Flag{
|
||||
&config.ListenFlag,
|
||||
&config.EnvironmentFlag,
|
||||
&config.PublisherCallbackLocationFlag,
|
||||
},
|
||||
Action: publisherCommandAction,
|
||||
}
|
||||
|
||||
func publisherCommandAction(cliCtx *cli.Context) error {
|
||||
logger, err := config.Logger(cliCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("Starting",
|
||||
zap.String("command", cliCtx.Command.Name),
|
||||
zap.String("GOOS", runtime.GOOS),
|
||||
zap.String("env", cliCtx.String("environment")))
|
||||
|
||||
sentryConfig, err := config.NewSentryConfig(cliCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if sentryConfig.Enabled {
|
||||
err = sentry.Init(sentry.ClientOptions{
|
||||
Dsn: sentryConfig.Key,
|
||||
Environment: cliCtx.String("environment"),
|
||||
Release: g.Version(),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sentry.ConfigureScope(func(scope *sentry.Scope) {
|
||||
scope.SetTags(map[string]string{"container": "publisher"})
|
||||
})
|
||||
defer sentry.Recover()
|
||||
}
|
||||
|
||||
r := gin.New()
|
||||
if sentryConfig.Enabled {
|
||||
r.Use(sentrygin.New(sentrygin.Options{
|
||||
Repanic: true,
|
||||
}))
|
||||
}
|
||||
|
||||
r.Use(ginzap.Ginzap(logger, time.RFC3339, true))
|
||||
|
||||
r.GET("/", func(i *gin.Context) {
|
||||
i.Data(200, "text/plain", []byte("OK"))
|
||||
})
|
||||
|
||||
registry := prometheus.NewRegistry()
|
||||
if err = registry.Register(prometheus.NewGoCollector()); err != nil {
|
||||
return err
|
||||
}
|
||||
fact := promauto.With(registry)
|
||||
httpClient := common.InstrumentedDefaultHTTPClient(fact, "publisher", "client")
|
||||
|
||||
mm := metrics.NewMetricsMiddleware("publisher", "server", promauto.With(registry))
|
||||
r.Use(mm.Handle)
|
||||
|
||||
q := &queue{}
|
||||
p := &publisher{
|
||||
q: q,
|
||||
httpClient: httpClient,
|
||||
}
|
||||
houndCB := tavernCallback{
|
||||
location: cliCtx.String("publisher-callback"),
|
||||
httpClient: httpClient,
|
||||
}
|
||||
p.callback = houndCB.callback
|
||||
|
||||
h := handlers{q: q,}
|
||||
|
||||
r.POST("/", h.handleSubmission)
|
||||
|
||||
promhandler := promhttp.InstrumentMetricHandler(
|
||||
registry, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}),
|
||||
)
|
||||
|
||||
r.GET("/metrics", func(c *gin.Context) {
|
||||
promhandler.ServeHTTP(c.Writer, c.Request)
|
||||
})
|
||||
|
||||
var group run.Group
|
||||
|
||||
srv := &http.Server{
|
||||
Addr: config.ListenAddress(cliCtx),
|
||||
Handler: r,
|
||||
}
|
||||
|
||||
group.Add(func() error {
|
||||
logger.Info("starting http service", zap.String("address", srv.Addr))
|
||||
return srv.ListenAndServe()
|
||||
}, func(error) {
|
||||
httpCancelCtx, httpCancelCtxCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer httpCancelCtxCancel()
|
||||
logger.Info("stopping http service")
|
||||
if err := srv.Shutdown(httpCancelCtx); err != nil {
|
||||
logger.Info("error stopping http service:", zap.Error(err))
|
||||
}
|
||||
})
|
||||
|
||||
RunWorker(&group, logger, p)
|
||||
|
||||
quit := make(chan os.Signal)
|
||||
signal.Notify(quit, os.Interrupt)
|
||||
group.Add(func() error {
|
||||
logger.Info("starting signal listener")
|
||||
<-quit
|
||||
return nil
|
||||
}, func(error) {
|
||||
logger.Info("stopping signal listener")
|
||||
close(quit)
|
||||
})
|
||||
|
||||
return group.Run()
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package publisher
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type handlers struct {
|
||||
q *queue
|
||||
}
|
||||
|
||||
type submission struct {
|
||||
Key string `form:"key" binding:"required"`
|
||||
KeyID string `form:"key_id" binding:"required"`
|
||||
Destination string `form:"destination" binding:"required"`
|
||||
ActivityID string `form:"activity_id" binding:"required"`
|
||||
Payload string `form:"payload" binding:"required"`
|
||||
}
|
||||
|
||||
func (h handlers) handleSubmission(c *gin.Context) {
|
||||
var s submission
|
||||
if err := c.ShouldBind(&s); err != nil {
|
||||
c.String(http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
err := h.q.Add(s.Destination, s.KeyID, s.Key, s.ActivityID, s.Payload)
|
||||
if err != nil {
|
||||
c.String(http.StatusInternalServerError, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
c.Status(http.StatusAccepted)
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
package publisher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/yukimochi/httpsig"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ngerakines/tavern/common"
|
||||
)
|
||||
|
||||
type publisher struct {
|
||||
logger *zap.Logger
|
||||
q *queue
|
||||
callback callback
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
httpClient common.HTTPClient
|
||||
}
|
||||
|
||||
func (r *publisher) Run() error {
|
||||
r.ctx, r.cancel = context.WithCancel(context.Background())
|
||||
defer r.cancel()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-time.After(time.Second):
|
||||
if err := r.work(); err != nil {
|
||||
r.logger.Error("error processing work", zap.Error(err))
|
||||
}
|
||||
case <-r.ctx.Done():
|
||||
return ignoreCanceled(r.ctx.Err())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *publisher) Shutdown(parent context.Context) error {
|
||||
r.cancel()
|
||||
select {
|
||||
case <-parent.Done():
|
||||
return parent.Err()
|
||||
case <-r.ctx.Done():
|
||||
return r.ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *publisher) work() error {
|
||||
jobs := r.q.Take()
|
||||
if len(jobs) == 0 {
|
||||
return nil
|
||||
}
|
||||
destination := jobs[0].Destination
|
||||
activityID := jobs[0].ActivityID
|
||||
|
||||
sigConfig := []httpsig.Algorithm{httpsig.RSA_SHA256}
|
||||
headersToSign := []string{httpsig.RequestTarget, "date" ,"digest", "content-type"}
|
||||
signer, algo, err := httpsig.NewSigner(sigConfig, headersToSign, httpsig.Signature)
|
||||
if err != nil {
|
||||
return r.callback(activityID, destination, err)
|
||||
}
|
||||
|
||||
dateHeader := time.Now().UTC().Format(http.TimeFormat)
|
||||
for i, job := range jobs {
|
||||
if i > 0 {
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
checksum := sha256.Sum256([]byte(job.Payload))
|
||||
digest := base64.StdEncoding.EncodeToString(checksum[:])
|
||||
|
||||
request, err := http.NewRequest("POST", job.Destination, strings.NewReader(job.Payload))
|
||||
if err != nil {
|
||||
return r.callback(activityID, destination, err)
|
||||
}
|
||||
request.Header.Add("content-type", "application/json")
|
||||
request.Header.Add("date", dateHeader)
|
||||
request.Header.Add("digest", fmt.Sprintf("SHA-256=%s", digest))
|
||||
request.Header.Add("X-ALG", string(algo))
|
||||
if err = signer.SignRequest(job.PrivateKey, job.KeyID, request); err != nil {
|
||||
return r.callback(activityID, destination, err)
|
||||
}
|
||||
resp, err := r.httpClient.Do(request)
|
||||
if err != nil {
|
||||
return r.callback(activityID, destination, err)
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
||||
continue
|
||||
}
|
||||
|
||||
lr := io.LimitReader(resp.Body, 500000)
|
||||
|
||||
data, err := ioutil.ReadAll(lr)
|
||||
if err != nil {
|
||||
return r.callback(activityID, destination, err)
|
||||
}
|
||||
|
||||
return r.callback(activityID, destination, fmt.Errorf("unexpected response: %d %s", resp.StatusCode, string(data)))
|
||||
}
|
||||
|
||||
return r.callback(activityID, destination, nil)
|
||||
}
|
|
@ -0,0 +1,140 @@
|
|||
package publisher
|
||||
|
||||
import (
|
||||
"crypto/rsa"
|
||||
"crypto/x509"
|
||||
"encoding/pem"
|
||||
"sync"
|
||||
|
||||
"github.com/ngerakines/tavern/errors"
|
||||
)
|
||||
|
||||
type queue struct {
|
||||
lock sync.Mutex
|
||||
front *node
|
||||
}
|
||||
|
||||
type node struct {
|
||||
Next *node
|
||||
|
||||
Destination string
|
||||
PrivateKey *rsa.PrivateKey
|
||||
KeyID string
|
||||
Payloads map[string]string
|
||||
}
|
||||
|
||||
type jobInfo struct {
|
||||
Destination string
|
||||
PrivateKey *rsa.PrivateKey
|
||||
KeyID string
|
||||
ActivityID string
|
||||
Payload string
|
||||
}
|
||||
|
||||
func (q *queue) Take() []jobInfo {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
if q.front == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
jobs := make([]jobInfo, 0)
|
||||
for activityID, payload := range q.front.Payloads {
|
||||
jobs = append(jobs, jobInfo{
|
||||
Destination: q.front.Destination,
|
||||
PrivateKey: q.front.PrivateKey,
|
||||
KeyID: q.front.KeyID,
|
||||
ActivityID: activityID,
|
||||
Payload: payload,
|
||||
})
|
||||
}
|
||||
|
||||
q.front = q.front.Next
|
||||
|
||||
return jobs
|
||||
}
|
||||
|
||||
func (q *queue) Add(destination, keyID, privateKeyPEM, activityID, payload string) error {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
|
||||
privateKey, err := decodePrivateKey(privateKeyPEM)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if q.front != nil {
|
||||
return q.front.Add(destination, keyID, privateKeyPEM, activityID, payload)
|
||||
}
|
||||
|
||||
q.front = &node{
|
||||
Next: nil,
|
||||
Destination: destination,
|
||||
KeyID: keyID,
|
||||
PrivateKey: privateKey,
|
||||
Payloads: map[string]string{
|
||||
activityID: payload,
|
||||
},
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (q *queue) prune() {
|
||||
current := q.front
|
||||
|
||||
for current != nil {
|
||||
current = current.fastForward()
|
||||
current = current.Next
|
||||
}
|
||||
}
|
||||
|
||||
func (n *node) Add(destination, keyID, privateKeyPEM, activityID, payload string) error {
|
||||
if n.Destination == destination {
|
||||
n.Payloads[activityID] = payload
|
||||
return nil
|
||||
}
|
||||
if n.Next != nil {
|
||||
return n.Next.Add(destination, keyID, privateKeyPEM, activityID, payload)
|
||||
}
|
||||
|
||||
privateKey, err := decodePrivateKey(privateKeyPEM)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n.Next = &node{
|
||||
Next: nil,
|
||||
Destination: destination,
|
||||
KeyID: keyID,
|
||||
PrivateKey: privateKey,
|
||||
Payloads: map[string]string{
|
||||
activityID: payload,
|
||||
},
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *node) fastForward() *node {
|
||||
if n.Next == nil {
|
||||
return nil
|
||||
}
|
||||
if len(n.Next.Payloads) > 0 {
|
||||
return n.Next
|
||||
}
|
||||
return n.Next.fastForward()
|
||||
}
|
||||
|
||||
func decodePrivateKey(input string) (*rsa.PrivateKey, error) {
|
||||
block, _ := pem.Decode([]byte(input))
|
||||
if block == nil {
|
||||
return nil, errors.New("invalid RSA PEM")
|
||||
}
|
||||
|
||||
key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return key, nil
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
package publisher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/oklog/run"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Worker interface {
|
||||
Run() error
|
||||
Shutdown(context.Context) error
|
||||
}
|
||||
|
||||
func RunWorker(group *run.Group, logger *zap.Logger, w Worker) {
|
||||
group.Add(func() error {
|
||||
logger.Info("starting publisher")
|
||||
return ignoreCanceled(w.Run())
|
||||
}, func(error) {
|
||||
shutdownCtx, shutdownCtxCancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer shutdownCtxCancel()
|
||||
logger.Info("stopping publisher")
|
||||
err := ignoreCanceled(w.Shutdown(shutdownCtx))
|
||||
if err != nil {
|
||||
logger.Error("error stopping publisher", zap.Error(err))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func ignoreCanceled(err error) error {
|
||||
if err == nil || err == context.Canceled {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
|
@ -35,6 +35,7 @@ import (
|
|||
"github.com/ngerakines/tavern/config"
|
||||
"github.com/ngerakines/tavern/g"
|
||||
"github.com/ngerakines/tavern/job"
|
||||
"github.com/ngerakines/tavern/metrics"
|
||||
"github.com/ngerakines/tavern/storage"
|
||||
)
|
||||
|
||||
|
@ -66,6 +67,9 @@ var Command = cli.Command{
|
|||
|
||||
&config.AllowReplyCollectionUpdatesFlag,
|
||||
&config.AllowAutoAcceptFollowersFlag,
|
||||
|
||||
&config.EnablePublisherFlag,
|
||||
&config.PublisherLocationFlag,
|
||||
},
|
||||
Action: serverCommandAction,
|
||||
}
|
||||
|
@ -101,6 +105,8 @@ func serverCommandAction(cliCtx *cli.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
publisherConfig := config.NewPublisherConfig(cliCtx)
|
||||
|
||||
if sentryConfig.Enabled {
|
||||
err = sentry.Init(sentry.ClientOptions{
|
||||
Dsn: sentryConfig.Key,
|
||||
|
@ -236,41 +242,27 @@ func serverCommandAction(cliCtx *cli.Context) error {
|
|||
}
|
||||
|
||||
h := handler{
|
||||
storage: s,
|
||||
logger: logger,
|
||||
domain: domain,
|
||||
sentryConfig: sentryConfig,
|
||||
fedConfig: fedConfig,
|
||||
webFingerQueue: webFingerQueue,
|
||||
crawlQueue: crawlQueue,
|
||||
assetQueue: assetQueue,
|
||||
adminUser: cliCtx.String("admin-name"),
|
||||
url: tmplUrlGen(siteBase),
|
||||
svgConverter: svgConv,
|
||||
assetStorage: assetStorage,
|
||||
httpClient: httpClient,
|
||||
metricFactory: promauto.With(registry),
|
||||
storage: s,
|
||||
logger: logger,
|
||||
domain: domain,
|
||||
sentryConfig: sentryConfig,
|
||||
fedConfig: fedConfig,
|
||||
webFingerQueue: webFingerQueue,
|
||||
crawlQueue: crawlQueue,
|
||||
assetQueue: assetQueue,
|
||||
adminUser: cliCtx.String("admin-name"),
|
||||
url: tmplUrlGen(siteBase),
|
||||
svgConverter: svgConv,
|
||||
assetStorage: assetStorage,
|
||||
httpClient: httpClient,
|
||||
metricFactory: promauto.With(registry),
|
||||
publisherClient: newPublisherClient(logger, httpClient, publisherConfig),
|
||||
}
|
||||
|
||||
configI18nMiddleware(sentryConfig, logger, utrans, domain, r)
|
||||
|
||||
mm := newMetricsMiddleware("web", "server", promauto.With(registry))
|
||||
|
||||
r.Use(func(c *gin.Context) {
|
||||
start := time.Now()
|
||||
c.Next()
|
||||
|
||||
status := fmt.Sprintf("%d", c.Writer.Status())
|
||||
endpoint := c.FullPath()
|
||||
method := c.Request.Method
|
||||
|
||||
lvs := []string{status, endpoint, method}
|
||||
|
||||
mm.requests.WithLabelValues(lvs...).Inc()
|
||||
mm.totalTime.WithLabelValues(lvs...).Observe(time.Since(start).Seconds())
|
||||
mm.requestSize.WithLabelValues(lvs...).Observe(calcRequestSize(c.Request))
|
||||
mm.responseSize.WithLabelValues(lvs...).Observe(float64(c.Writer.Size()))
|
||||
})
|
||||
mm := metrics.NewMetricsMiddleware("web", "server", promauto.With(registry))
|
||||
r.Use(mm.Handle)
|
||||
|
||||
root := r.Group("/")
|
||||
{
|
||||
|
@ -293,6 +285,10 @@ func serverCommandAction(cliCtx *cli.Context) error {
|
|||
root.GET("/asset/thumbnail/:checksum", h.viewThumbnail)
|
||||
root.GET("/asset/blurhash/:blurHash", h.viewBlur)
|
||||
|
||||
if publisherConfig.Enabled {
|
||||
r.POST("/webhooks/publisher", h.publisherWebhook)
|
||||
}
|
||||
|
||||
promhandler := promhttp.InstrumentMetricHandler(
|
||||
registry, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}),
|
||||
)
|
||||
|
|
|
@ -34,6 +34,8 @@ type handler struct {
|
|||
|
||||
httpClient common.HTTPClient
|
||||
metricFactory promauto.Factory
|
||||
|
||||
publisherClient *publisherClient
|
||||
}
|
||||
|
||||
func (h handler) hardFail(ctx *gin.Context, err error, fields ...zap.Field) {
|
||||
|
|
|
@ -441,9 +441,16 @@ func (h handler) createNote(c *gin.Context) {
|
|||
h.logger.Error("unable to get or fetch actor", zap.Error(err), zap.String("actor", dest))
|
||||
continue
|
||||
}
|
||||
err = nc.SendToInbox(ctx, localActor, foundActor, payload)
|
||||
if err != nil {
|
||||
h.logger.Error("failed sending to actor", zap.String("target", foundActor.GetID()), zap.String("activity", activityURL))
|
||||
if h.publisherClient != nil {
|
||||
err = h.publisherClient.Send(context.Background(), foundActor.GetInbox(), userActor.GetKeyID(), user.PrivateKey, activityURL, string(payload))
|
||||
if err != nil {
|
||||
h.logger.Error("failed sending to actor", zap.String("target", foundActor.GetID()), zap.String("activity", activityURL), zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
err = nc.SendToInbox(ctx, localActor, foundActor, payload)
|
||||
if err != nil {
|
||||
h.logger.Error("failed sending to actor", zap.String("target", foundActor.GetID()), zap.String("activity", activityURL), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -455,9 +462,16 @@ func (h handler) createNote(c *gin.Context) {
|
|||
h.logger.Error("unable to get or fetch actor", zap.Error(err), zap.String("actor", dest))
|
||||
continue
|
||||
}
|
||||
err = nc.SendToInbox(ctx, localActor, foundActor, payload)
|
||||
if err != nil {
|
||||
h.logger.Error("failed sending to actor", zap.String("target", foundActor.GetID()), zap.String("activity", activityURL))
|
||||
if h.publisherClient != nil {
|
||||
err = h.publisherClient.Send(context.Background(), foundActor.GetInbox(), userActor.GetKeyID(), user.PrivateKey, activityURL, string(payload))
|
||||
if err != nil {
|
||||
h.logger.Error("failed sending to actor", zap.String("target", foundActor.GetID()), zap.String("activity", activityURL), zap.Error(err))
|
||||
}
|
||||
} else {
|
||||
err = nc.SendToInbox(ctx, localActor, foundActor, payload)
|
||||
if err != nil {
|
||||
h.logger.Error("failed sending to actor", zap.String("target", foundActor.GetID()), zap.String("activity", activityURL), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
package web
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type publisherEvent struct {
|
||||
Activity string `json:"activity"`
|
||||
Destination string `json:"destination"`
|
||||
Status string `json:"status"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
func (h handler) publisherWebhook(c *gin.Context) {
|
||||
var e publisherEvent
|
||||
err := c.ShouldBindJSON(&e)
|
||||
if err != nil {
|
||||
h.logger.Warn("unable to parse publisher webhook event", zap.Error(err))
|
||||
c.Status(http.StatusOK)
|
||||
} else {
|
||||
h.logger.Debug("publisher webhook event received", zap.Reflect("event", e))
|
||||
}
|
||||
|
||||
c.Status(http.StatusOK)
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package web
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ngerakines/tavern/common"
|
||||
"github.com/ngerakines/tavern/config"
|
||||
"github.com/ngerakines/tavern/g"
|
||||
)
|
||||
|
||||
type publisherClient struct {
|
||||
logger *zap.Logger
|
||||
httpClient common.HTTPClient
|
||||
location string
|
||||
}
|
||||
|
||||
func newPublisherClient(logger *zap.Logger, httpClient common.HTTPClient, config config.PublisherConfig) *publisherClient {
|
||||
if !config.Enabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &publisherClient{
|
||||
logger: logger,
|
||||
httpClient: httpClient,
|
||||
location: config.Location,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *publisherClient) Send(ctx context.Context, destination, keyID, keyPEM, activityID, payload string) error {
|
||||
|
||||
form := url.Values{}
|
||||
form.Add("destination", destination)
|
||||
form.Add("key_id", keyID)
|
||||
form.Add("key", keyPEM)
|
||||
form.Add("activity_id", activityID)
|
||||
form.Add("payload", payload)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", c.location, strings.NewReader(form.Encode()))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||
req.Header.Set("User-Agent", g.UserAgent())
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if closeErr := resp.Body.Close(); closeErr != nil {
|
||||
c.logger.Error("unable to close response body", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
return fmt.Errorf("unexpected status code: %d %s", resp.StatusCode, body)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue