// mirror of https://gitlab.com/ngerakines/tavern.git
// 109 lines · 2.5 KiB · Go
package job

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/gofrs/uuid"
	"go.uber.org/zap"

	"github.com/ngerakines/tavern/asset"
	"github.com/ngerakines/tavern/common"
	"github.com/ngerakines/tavern/config"
	"github.com/ngerakines/tavern/fed"
	"github.com/ngerakines/tavern/storage"
)

// crawl is a background Job that periodically takes crawl requests off a
// queue and hands them to a fed.Crawler on behalf of a user.
type crawl struct {
	// ctx and cancel are assigned in Run and scope the worker's lifetime;
	// Shutdown uses cancel to stop the loop.
	ctx    context.Context
	cancel context.CancelFunc

	logger *zap.Logger

	// queue supplies work items encoded as "<user uuid>,<location>"; see work().
	queue common.StringQueue

	storage    storage.Storage
	httpClient common.HTTPClient

	// Asset handling and federation settings are passed through verbatim to
	// each fed.Crawler constructed in work().
	assetStorage       asset.Storage
	assetStorageConfig config.AssetStorageConfig
	fedConfig          config.FedConfig
}
func NewCrawlWorker(logger *zap.Logger, queue common.StringQueue, storage storage.Storage, assetStorage asset.Storage, config config.AssetStorageConfig, fedConfig config.FedConfig, httpClient *http.Client) Job {
|
|
return &crawl{
|
|
logger: logger,
|
|
queue: queue,
|
|
storage: storage,
|
|
assetStorage: assetStorage,
|
|
httpClient: httpClient,
|
|
assetStorageConfig: config,
|
|
fedConfig: fedConfig,
|
|
}
|
|
}
func (job *crawl) Run(parent context.Context) error {
|
|
job.ctx, job.cancel = context.WithCancel(parent)
|
|
defer job.cancel()
|
|
|
|
for {
|
|
select {
|
|
case <-time.After(time.Second):
|
|
err := job.work()
|
|
if err != nil {
|
|
job.logger.Error("error processing work", zap.Error(err))
|
|
return err
|
|
}
|
|
case <-job.ctx.Done():
|
|
return ignoreCanceled(job.ctx.Err())
|
|
}
|
|
}
|
|
}
func (job *crawl) Shutdown(parent context.Context) error {
|
|
job.cancel()
|
|
select {
|
|
case <-parent.Done():
|
|
return parent.Err()
|
|
case <-job.ctx.Done():
|
|
return job.ctx.Err()
|
|
}
|
|
}
func (job *crawl) work() error {
|
|
work, err := job.queue.Take()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(work) == 0 {
|
|
return nil
|
|
}
|
|
|
|
parts := strings.SplitN(work, ",", 2)
|
|
if len(parts) != 2 {
|
|
return fmt.Errorf("invalid work: %s", work)
|
|
}
|
|
userID, err := uuid.FromString(parts[0])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
user, err := job.storage.GetUser(job.ctx, userID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
crawler := &fed.Crawler{
|
|
HTTPClient: job.httpClient,
|
|
Logger: job.logger,
|
|
Storage: job.storage,
|
|
MaxDepth: fed.CrawlerDefaultMaxCount,
|
|
AssetStorage: job.assetStorage,
|
|
AssetStorageConfig: job.assetStorageConfig,
|
|
FedConfig: job.fedConfig,
|
|
}
|
|
if _, _, err = crawler.Start(user, parts[1]); err != nil {
|
|
job.logger.Warn("error crawling", zap.Error(err), zap.String("location", parts[1]))
|
|
}
|
|
return nil
|
|
}