// Mirror of https://gitlab.com/ngerakines/tavern.git
package fed
|
|
|
|
import (
|
|
"context"
|
|
"crypto/rsa"
|
|
"strings"
|
|
|
|
"github.com/yukimochi/httpsig"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/ngerakines/tavern/asset"
|
|
"github.com/ngerakines/tavern/common"
|
|
"github.com/ngerakines/tavern/config"
|
|
"github.com/ngerakines/tavern/storage"
|
|
)
|
|
|
|
// Crawler walks remote activity objects starting from a seed URL,
// persisting the objects it finds, their reply relationships, the actors
// they reference, and any image attachments (see Start).
type Crawler struct {
	// HTTPClient performs all outbound fetches (objects, actors, media).
	HTTPClient common.HTTPClient
	Logger     *zap.Logger
	// Storage persists crawled objects, reply edges, and actors.
	Storage storage.Storage
	// MaxDepth bounds how many activities and how many actors are
	// processed in a single Start run; media downloads are allowed up to
	// 10x this bound.
	MaxDepth int
	// AssetStorage and AssetStorageConfig back the asset.Agent used to
	// download image attachments.
	AssetStorage       asset.Storage
	AssetStorageConfig config.AssetStorageConfig
	// FedConfig carries federation settings; not referenced in the code
	// visible here — presumably used elsewhere. TODO confirm.
	FedConfig config.FedConfig
}
|
|
|
|
// Signer supplies the RSA key material used to sign outbound HTTP
// requests.
//
// NOTE(review): the Get-prefixed method names are un-idiomatic Go, but
// they appear to mirror methods on project types (Start calls
// user.GetPrivateKey and userActor.GetKeyID), so renaming here would
// break implementers — left as-is.
type Signer interface {
	// GetPrivateKey returns the private key used to produce signatures.
	GetPrivateKey() (*rsa.PrivateKey, error)
	// GetKeyID returns the key identifier advertised alongside signatures.
	GetKeyID() string
}
|
|
|
|
const CrawlerDefaultMaxCount = 30
|
|
|
|
// Start crawls the federated object graph beginning at the seed activity
// URL on behalf of user. It proceeds in three bounded phases: (1) fetch
// and store activity objects, following inReplyTo chains; (2) resolve
// every actor those objects referenced; (3) download queued image
// attachments. It returns the actor IDs and activity IDs that were
// processed (in that order), or the first hard error encountered.
func (c Crawler) Start(user *storage.User, seed string) ([]string, []string, error) {
	c.Logger.Info("crawler starting", zap.String("seed", seed))
	ctx := context.Background()

	// The crawling user's actor record supplies the key ID attached to
	// signed outbound requests.
	userActor, err := c.Storage.GetActor(ctx, user.ActorID)
	if err != nil {
		return nil, nil, err
	}

	// HTTP signature setup: sign the request target and Date header with
	// RSA-SHA256.
	sigConfig := []httpsig.Algorithm{httpsig.RSA_SHA256}
	headersToSign := []string{httpsig.RequestTarget, "date"}
	signer, _, err := httpsig.NewSigner(sigConfig, headersToSign, httpsig.Signature)
	if err != nil {
		return nil, nil, err
	}

	privateKey, err := user.GetPrivateKey()
	if err != nil {
		return nil, nil, err
	}

	// Work queues that remember everything ever enqueued, so each
	// activity/actor/media URL is visited at most once.
	activityQueue := &common.SeenStringQueue{Seen: make(map[string]bool), Queue: make([]string, 0), Taken: make([]string, 0)}
	activityQueue.Add(seed)

	actorQueue := &common.SeenStringQueue{Seen: make(map[string]bool), Queue: make([]string, 0), Taken: make([]string, 0)}

	mediaQueue := &common.SeenStringQueue{Seen: make(map[string]bool), Queue: make([]string, 0), Taken: make([]string, 0)}

	counter := 0

	// replies maps an object ID to the object ID it replies to, so reply
	// edges can be recorded after the crawl loop completes (both rows
	// must exist first).
	replies := make(map[string]string)

	// Phase 1: walk the activity graph.
	// NOTE(review): the guard is `>` rather than `>=`, so this processes
	// up to MaxDepth+1 items — presumably intentional, worth confirming.
	for !activityQueue.Empty() {
		if counter > c.MaxDepth {
			break
		}

		location, ok := activityQueue.Take()
		if !ok {
			break
		}
		c.Logger.Info("crawling activity", zap.String("location", location))

		var payload storage.Payload

		// Only fetch over the network if the object is not already stored;
		// otherwise reload the stored payload.
		existingCount, err := c.Storage.RowCount(ctx, `SELECT COUNT(*) FROM objects WHERE object_id = $1`, location)
		if err != nil {
			return nil, nil, err
		}
		if existingCount == 0 {
			_, payload, err = ldJsonGetSigned(c.HTTPClient, location, signer, userActor.GetKeyID(), privateKey)
			if err != nil {
				return nil, nil, err
			}

			_, err := c.Storage.RecordObject(ctx, payload, location)
			if err != nil {
				return nil, nil, err
			}
		} else {
			payload, err = c.Storage.ObjectPayloadByObjectID(ctx, location)
			if err != nil {
				return nil, nil, err
			}
		}

		// Follow the reply chain upward and remember the relationship.
		inReplyTo, hasInReplyTo := storage.JSONString(payload, "inReplyTo")
		if hasInReplyTo {
			activityQueue.Add(inReplyTo)
			replies[location] = inReplyTo
		}

		// Queue every actor referenced by this object for phase 2.
		allActors := ActorsFromObject(payload)
		for _, activityActor := range allActors {
			actorQueue.Add(activityActor)
		}

		// Queue https-only JPEG/PNG attachments for phase 3 download.
		attachments, ok := storage.JSONMapList(payload, "attachment")
		if ok {
			for _, attachment := range attachments {
				mediaType, hasMediaType := storage.JSONString(attachment, "mediaType")
				url, hasURL := storage.JSONString(attachment, "url")
				if hasMediaType && hasURL && strings.HasPrefix(url, "https://") {
					if mediaType == "image/jpeg" || mediaType == "image/png" {
						mediaQueue.Add(url)
					}
				}
			}
		}

		counter++
	}

	// Record reply edges between objects whose rows both resolved.
	// Best-effort by design: a lookup failure skips the edge, and the
	// RecordObjectReply error is deliberately ignored.
	for obj, parent := range replies {
		objRowID, err := c.Storage.ObjectRowIDForObjectID(ctx, obj)
		if err != nil {
			continue
		}
		parentRowID, err := c.Storage.ObjectRowIDForObjectID(ctx, parent)
		if err != nil {
			continue
		}
		_, _ = c.Storage.RecordObjectReply(ctx, objRowID, parentRowID)
	}

	counter = 0

	// Phase 2: resolve (load or fetch) every referenced actor, bounded by
	// the same MaxDepth counter.
	for !actorQueue.Empty() {
		if counter > c.MaxDepth {
			break
		}

		location, ok := actorQueue.Take()
		if !ok {
			break
		}

		c.Logger.Info("crawling actor", zap.String("location", location))

		_, err := GetOrFetchActor(ctx, c.Storage, c.Logger, c.HTTPClient, location)
		if err != nil {
			return nil, nil, err
		}

		counter++
	}

	counter = 0

	// Phase 3: download queued media through the asset agent. The bound is
	// 10x MaxDepth, presumably because one activity can carry several
	// attachments.
	a := &asset.Agent{
		AssetStorage:       c.AssetStorage,
		DataStorage:        c.Storage,
		HTTPClient:         c.HTTPClient,
		AssetStorageConfig: c.AssetStorageConfig,
	}

	for !mediaQueue.Empty() {
		if counter > (c.MaxDepth * 10) {
			break
		}

		location, ok := mediaQueue.Take()
		if !ok {
			break
		}

		c.Logger.Info("invoking media downloader", zap.String("location", location))

		_, err = a.HandleImage(ctx, location)
		if err != nil {
			return nil, nil, err
		}

		counter++
	}

	// Taken holds everything successfully dequeued during the crawl.
	return actorQueue.Taken, activityQueue.Taken, nil
}
|