mirror of https://gitlab.com/ngerakines/tavern.git
119 lines
2.2 KiB
Go
119 lines
2.2 KiB
Go
package job
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/ngerakines/tavern/fed"
|
|
"github.com/ngerakines/tavern/storage"
|
|
)
|
|
|
|
type webfinger struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
logger *zap.Logger
|
|
queue storage.StringQueue
|
|
storage storage.Storage
|
|
httpClient HTTPClient
|
|
}
|
|
|
|
func NewWebFingerWorker(logger *zap.Logger, queue storage.StringQueue, storage storage.Storage) Job {
|
|
return &webfinger{
|
|
logger: logger,
|
|
queue: queue,
|
|
storage: storage,
|
|
httpClient: DefaultHTTPClient(),
|
|
}
|
|
}
|
|
|
|
func (job *webfinger) Run(parent context.Context) error {
|
|
job.ctx, job.cancel = context.WithCancel(parent)
|
|
defer job.cancel()
|
|
|
|
for {
|
|
select {
|
|
case <-time.After(time.Second):
|
|
err := job.work()
|
|
if err != nil {
|
|
job.logger.Error("error processing work", zap.Error(err))
|
|
return err
|
|
}
|
|
case <-job.ctx.Done():
|
|
return ignoreCanceled(job.ctx.Err())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (job *webfinger) Shutdown(parent context.Context) error {
|
|
job.cancel()
|
|
select {
|
|
case <-parent.Done():
|
|
return parent.Err()
|
|
case <-job.ctx.Done():
|
|
return job.ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (job *webfinger) work() error {
|
|
work, err := job.queue.Take()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(work) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var actorID string
|
|
|
|
if strings.HasPrefix(work, "https://") {
|
|
actorID = work
|
|
}
|
|
|
|
if len(actorID) == 0 {
|
|
wfc := fed.WebFingerClient{
|
|
HTTPClient: job.httpClient,
|
|
Logger: job.logger,
|
|
}
|
|
|
|
wfp, err := wfc.Fetch(work)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
actorID, err = fed.ActorIDFromWebFingerPayload(wfp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
job.logger.Debug("parsed actor id from webfinger payload", zap.String("actor", actorID))
|
|
}
|
|
|
|
count, err := job.storage.RowCount(job.ctx, `SELECT COUNT(*) FROM actors WHERE actor_id = $1`, actorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if count > 0 {
|
|
return nil
|
|
}
|
|
|
|
ac := fed.ActorClient{
|
|
HTTPClient: job.httpClient,
|
|
Logger: job.logger,
|
|
}
|
|
|
|
actorBody, actorPayload, err := ac.Get(actorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
keyID, keyPEM, err := storage.KeyFromActor(actorPayload)
|
|
|
|
actorRowID := storage.NewV4()
|
|
keyRowID := storage.NewV4()
|
|
|
|
return job.storage.CreateActor(context.Background(), actorRowID, keyRowID, actorID, actorBody, keyID, keyPEM)
|
|
}
|