// tavern/job/crawler.go
package job
import (
"context"
"fmt"
"net/http"
"strings"
"time"
"github.com/gofrs/uuid"
"go.uber.org/zap"
"github.com/ngerakines/tavern/asset"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/config"
"github.com/ngerakines/tavern/fed"
"github.com/ngerakines/tavern/storage"
)
// crawl is a background worker that drains a queue of "userID,location"
// work items and runs a federation crawl for each one. It implements the
// Job interface (Run/Shutdown) defined in this package.
//
// NOTE(review): storing ctx/cancel on the struct couples Run and Shutdown;
// idiomatic Go passes contexts as arguments, but the Job interface's
// Run/Shutdown split appears to require this shape — confirm before changing.
type crawl struct {
	ctx                context.Context    // set by Run; governs the worker loop
	cancel             context.CancelFunc // cancels ctx; invoked by Shutdown
	logger             *zap.Logger
	queue              common.StringQueue // source of "userID,location" work items
	storage            storage.Storage
	httpClient         common.HTTPClient
	assetStorage       asset.Storage
	assetStorageConfig config.AssetStorageConfig
	fedConfig          config.FedConfig
}
// NewCrawlWorker constructs a crawl Job that processes queued crawl
// requests using the given storage backends and federation settings.
// The returned Job's ctx/cancel fields are populated later by Run.
//
// The asset-storage parameter was previously named "config", which
// shadowed the imported config package inside the body; renamed to
// avoid the shadow (parameter names are not part of a Go call site,
// so callers are unaffected).
func NewCrawlWorker(logger *zap.Logger, queue common.StringQueue, storage storage.Storage, assetStorage asset.Storage, assetStorageConfig config.AssetStorageConfig, fedConfig config.FedConfig, httpClient *http.Client) Job {
	return &crawl{
		logger:             logger,
		queue:              queue,
		storage:            storage,
		assetStorage:       assetStorage,
		httpClient:         httpClient,
		assetStorageConfig: assetStorageConfig,
		fedConfig:          fedConfig,
	}
}
// Run drives the worker loop: roughly once per second it takes an item
// from the queue and processes it, until the parent context is canceled
// or processing fails. It returns nil when canceled (via ignoreCanceled)
// and the processing error otherwise.
//
// A single reusable timer replaces the previous per-iteration
// time.After, avoiding a timer allocation on every loop. The timer is
// re-armed only after work completes, so iterations are spaced by
// (1s + work duration) — the same cadence time.After produced.
func (job *crawl) Run(parent context.Context) error {
	job.ctx, job.cancel = context.WithCancel(parent)
	defer job.cancel()

	timer := time.NewTimer(time.Second)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			if err := job.work(); err != nil {
				// A queue/storage failure stops the worker; whoever owns
				// this Job decides whether to restart it.
				job.logger.Error("error processing work", zap.Error(err))
				return err
			}
			timer.Reset(time.Second)
		case <-job.ctx.Done():
			return ignoreCanceled(job.ctx.Err())
		}
	}
}
// Shutdown cancels the worker loop and waits for it to wind down, or
// for the caller-supplied context to expire, whichever comes first.
func (job *crawl) Shutdown(parent context.Context) error {
	// Guard against Shutdown being called before Run has initialized
	// ctx/cancel: previously this panicked on the nil cancel func and
	// would then have blocked forever on a nil ctx's Done channel.
	if job.cancel == nil {
		return nil
	}
	job.cancel()
	select {
	case <-parent.Done():
		return parent.Err()
	case <-job.ctx.Done():
		return job.ctx.Err()
	}
}
// work takes one item from the queue and processes it. Items are
// formatted "userID,location". An empty take (no work available) is not
// an error. Crawl failures are best-effort: they are logged and
// swallowed so one bad location does not kill the worker loop; only
// queue/parse/storage errors propagate.
func (job *crawl) work() error {
	item, err := job.queue.Take()
	if err != nil {
		return fmt.Errorf("taking work from queue: %w", err)
	}
	if len(item) == 0 {
		// Queue is empty; nothing to do this tick.
		return nil
	}
	parts := strings.SplitN(item, ",", 2)
	if len(parts) != 2 {
		// %q (rather than %s) makes empty or whitespace-only input visible.
		return fmt.Errorf("invalid work: %q", item)
	}
	userID, err := uuid.FromString(parts[0])
	if err != nil {
		return fmt.Errorf("parsing user id %q: %w", parts[0], err)
	}
	user, err := job.storage.GetUser(job.ctx, userID)
	if err != nil {
		return fmt.Errorf("loading user %s: %w", userID, err)
	}
	crawler := &fed.Crawler{
		HTTPClient: job.httpClient,
		Logger:     job.logger,
		Storage:    job.storage,
		// NOTE(review): a depth limit is set from a "MaxCount" constant —
		// confirm the constant really denotes depth.
		MaxDepth:           fed.CrawlerDefaultMaxCount,
		AssetStorage:       job.assetStorage,
		AssetStorageConfig: job.assetStorageConfig,
		FedConfig:          job.fedConfig,
	}
	if _, _, err = crawler.Start(user, parts[1]); err != nil {
		job.logger.Warn("error crawling", zap.Error(err), zap.String("location", parts[1]))
	}
	return nil
}