Implemented inbox forwarding. Added follow/note and undo/follow/note in actor inbox. Added activity forwarding to create/note and announce/note in actor inbox. Added migrations to create table used to track object subscriptions and storage interface functions to add, remove, and list actor object subscriptions.

This commit is contained in:
Nick Gerakines 2020-03-28 16:18:57 -04:00
parent 2f0736a509
commit d5dc4ef3da
No known key found for this signature in database
GPG Key ID: 33D43D854F96B2E4
13 changed files with 8244 additions and 7875 deletions

View File

@ -4,16 +4,28 @@ import (
"fmt"
)
func ActorURLPrefix(domain string) string {
return fmt.Sprintf("https://%s/users/", domain)
}
func ActorURL(domain string, name interface{}) string {
return fmt.Sprintf("https://%s/users/%s", domain, name)
return fmt.Sprintf("%s%s", ActorURLPrefix(domain), name)
}
func ActivityURLPrefix(domain string) string {
return fmt.Sprintf("https://%s/activity/", domain)
}
func ActivityURL(domain string, activityID interface{}) string {
return fmt.Sprintf("https://%s/activity/%s", domain, activityID)
return fmt.Sprintf("%s%s", ActivityURLPrefix(domain), activityID)
}
func ObjectURLPrefix(domain string) string {
return fmt.Sprintf("https://%s/object/", domain)
}
func ObjectURL(domain string, objectID interface{}) string {
return fmt.Sprintf("https://%s/object/%s", domain, objectID)
return fmt.Sprintf("%s%s", ObjectURLPrefix(domain), objectID)
}
func ObjectRepliesURL(domain string, objectID interface{}) string {

View File

@ -5,14 +5,15 @@ import (
)
type FedConfig struct {
AllowReplyCollectionUpdates bool
AllowAutoAcceptFollowers bool
AllowFollowObject bool
AllowAutoAcceptFollowers bool
AllowInboxForwarding bool
}
var AllowReplyCollectionUpdatesFlag = cli.BoolFlag{
Name: "allow-reply-collection-updates",
Usage: "Allow users to turn on object reply collection updates.",
EnvVars: []string{"ALLOW_REPLY_COLLECTION_UPDATES"},
var AllowFollowObjectFlag = cli.BoolFlag{
Name: "allow-follow-object",
Usage: "Allow actors to follow objects",
EnvVars: []string{"ALLOW_OBJECT_FOLLOW"},
Value: false,
}
@ -23,10 +24,19 @@ var AllowAutoAcceptFollowersFlag = cli.BoolFlag{
Value: true,
}
var AllowInboxForwardingFlag = cli.BoolFlag{
Name: "allow-inbox-forwarding",
Usage: "Allow messages sent to your inbox to be forwarded to other inboxes.",
EnvVars: []string{"ALLOW_INBOX_FORWARDING"},
Value: false,
}
func NewFedConfig(cliCtx *cli.Context) (FedConfig, error) {
cfg := FedConfig{
AllowReplyCollectionUpdates: cliCtx.Bool("allow-reply-collection-updates"),
AllowAutoAcceptFollowers: cliCtx.Bool("allow-auto-accept-followers"),
AllowAutoAcceptFollowers: cliCtx.Bool("allow-auto-accept-followers"),
AllowFollowObject: cliCtx.Bool("allow-reply-collection-updates"),
AllowInboxForwarding: cliCtx.Bool("allow-inbox-forwarding"),
}
return cfg, nil
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

1
go.mod
View File

@ -24,6 +24,7 @@ require (
github.com/piprate/json-gold v0.3.0
github.com/prometheus/client_golang v1.5.1
github.com/russross/blackfriday/v2 v2.0.1
github.com/sslhound/herr v1.4.1 // indirect
github.com/stretchr/testify v1.4.0
github.com/teacat/noire v1.0.0
github.com/urfave/cli/v2 v2.1.1

2
go.sum
View File

@ -396,6 +396,8 @@ github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tL
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/sslhound/herr v1.4.1 h1:7EBdK2gDkT7lFve3KXC1PGJUasgeQRVYbZbI3qcpl0c=
github.com/sslhound/herr v1.4.1/go.mod h1:3zw8Zr8bwddppECO/GycmavbbGYyT6oCm0urxmSXfR0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=

View File

@ -0,0 +1 @@
DROP TABLE object_subscriptions;

View File

@ -0,0 +1,11 @@
create table if not exists public.object_subscriptions
(
id uuid not null
constraint object_subscriptions_pk primary key,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
object_id uuid not null,
actor_id uuid not null,
constraint object_subscriptions_uindex
unique (object_id, actor_id)
);

View File

@ -192,7 +192,7 @@ func (s pgStorage) RemoveFollower(ctx context.Context, userID, actorID uuid.UUID
func (s pgStorage) IsInNetwork(ctx context.Context, userID, actorID uuid.UUID) (bool, error) {
c, err := s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND actor_id = $2`, userID, actorID)
return c == 1, err
return c > 0, err
}
func (s pgStorage) IsFollowing(ctx context.Context, userID, actorID uuid.UUID) (bool, error) {

View File

@ -31,6 +31,7 @@ type ObjectStorage interface {
ListObjectPayloadsInUserTagFeed(ctx context.Context, userID uuid.UUID, tag string, limit int, offset int) ([]Payload, error)
ObjectEventPayloadByActivityID(ctx context.Context, activityID string) (Payload, error)
ObjectPayloadByObjectID(ctx context.Context, objectID string) (Payload, error)
ObjectPayloadByObjectRowID(ctx context.Context, objectRowID uuid.UUID) (Payload, error)
ObjectPayloadByUUID(ctx context.Context, objectIDs []uuid.UUID) (map[uuid.UUID]Payload, error)
ObjectRowIDForObjectID(ctx context.Context, objectID string) (uuid.UUID, error)
ObjectRowIDsForObjectIDs(ctx context.Context, objectID []string) (map[string]uuid.UUID, error)
@ -56,6 +57,14 @@ type ObjectStorage interface {
ExistsObjectInUserFeedByObjectID(ctx context.Context, objectID string) (bool, error)
ObjectParentsByObjectID(ctx context.Context, objectRowID uuid.UUID) (map[uuid.UUID][]uuid.UUID, error)
ObjectChildrenByObjectID(ctx context.Context, objectRowID uuid.UUID) (map[uuid.UUID][]uuid.UUID, error)
RecordObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error)
RecordObjectSubscriptionAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error)
ActorsSubscribedToObject(ctx context.Context, objectRowID uuid.UUID) ([]*Actor, error)
RemoveObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) error
ObjectExistsByObjectID(ctx context.Context, objectID string) (bool, error)
ActivityExistsByActivityID(ctx context.Context, activityID string) (bool, error)
}
func (s pgStorage) ListObjectPayloadsByObjectIDs(ctx context.Context, objectIDs []string) ([]Payload, error) {
@ -159,6 +168,10 @@ func (s pgStorage) ObjectPayloadByObjectID(ctx context.Context, objectID string)
return s.objectPayload(ctx, `SELECT payload FROM objects WHERE object_id = $1`, objectID)
}
func (s pgStorage) ObjectPayloadByObjectRowID(ctx context.Context, objectRowID uuid.UUID) (Payload, error) {
return s.objectPayload(ctx, `SELECT payload FROM objects WHERE id = $1`, objectRowID)
}
func (s pgStorage) ObjectEventPayloadByActivityID(ctx context.Context, activityID string) (Payload, error) {
return s.objectPayload(ctx, `SELECT payload FROM object_events WHERE activity_id = $1`, activityID)
}
@ -355,5 +368,38 @@ FROM object_children`
func (s pgStorage) ExistsObjectInUserFeedByObjectID(ctx context.Context, objectID string) (bool, error) {
query := `SELECT COUNT(o.*) FROM objects o INNER JOIN user_feed uf on o.id = uf.object_id WHERE o.object_id = $1`
return s.wrappedExists(errors.WrapActorAliasQueryFailedError, ctx, query, objectID)
return s.wrappedExists(errors.WrapObjectQueryFailedError, ctx, query, objectID)
}
func (s pgStorage) RecordObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordObjectSubscriptionAll(ctx, rowID, now, now, objectRowID, actorRowID)
}
func (s pgStorage) RecordObjectSubscriptionAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error) {
query := `INSERT INTO object_subscriptions (id, created_at, updated_at, object_id, actor_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT object_subscriptions_uindex DO UPDATE SET updated_at = now() RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, objectRowID, actorRowID).Scan(&id)
return id, errors.WrapObjectReplyUpsertFailedError(err)
}
func (s pgStorage) ActorsSubscribedToObject(ctx context.Context, objectRowID uuid.UUID) ([]*Actor, error) {
query := actorsSelectQuery("object_subscriptions os on a.id = os.actor_id", []string{"os.object_id $1"})
return s.getActors(s.db, ctx, query, objectRowID)
}
func (s pgStorage) RemoveObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) error {
_, err := s.db.ExecContext(ctx, `DELETE FROM object_subscriptions WHERE actor_id = $1 AND object_id = $2`, actorRowID, objectRowID)
return errors.WrapNetworkRelationshipUpdateFailedError(err)
}
func (s pgStorage) ObjectExistsByObjectID(ctx context.Context, objectID string) (bool, error) {
query := `SELECT COUNT(*) FROM objects WHERE object_id = $1`
return s.wrappedExists(errors.WrapObjectQueryFailedError, ctx, query, objectID)
}
func (s pgStorage) ActivityExistsByActivityID(ctx context.Context, activityID string) (bool, error) {
query := `SELECT COUNT(*) FROM object_events WHERE activity_id = $1`
return s.wrappedExists(errors.WrapObjectQueryFailedError, ctx, query, activityID)
}

View File

@ -65,8 +65,9 @@ var Command = cli.Command{
&config.AssetStorageRemoteDenyFlag,
&config.AssetStorageRemoteMaxFlag,
&config.AllowReplyCollectionUpdatesFlag,
&config.AllowAutoAcceptFollowersFlag,
&config.AllowFollowObjectFlag,
&config.AllowInboxForwardingFlag,
&config.EnablePublisherFlag,
&config.PublisherLocationFlag,

View File

@ -15,6 +15,7 @@ import (
"github.com/yukimochi/httpsig"
"go.uber.org/zap"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
"github.com/ngerakines/tavern/fed"
"github.com/ngerakines/tavern/storage"
@ -75,7 +76,7 @@ func (h handler) actorInbox(c *gin.Context) {
switch payloadType {
case "Follow":
h.actorInboxFollow(c, user, payload, body)
h.actorInboxFollow(c, user, payload)
case "Undo":
h.actorInboxUndo(c, user, payload)
case "Accept":
@ -83,9 +84,9 @@ func (h handler) actorInbox(c *gin.Context) {
case "Reject":
h.actorInboxReject(c, user, payload)
case "Create":
h.actorInboxCreate(c, user, payload)
h.actorInboxCreate(c, user, payload, body)
case "Announce":
h.actorInboxAnnounce(c, user, payload)
h.actorInboxAnnounce(c, user, payload, body)
case "Delete":
h.actorInboxDelete(c, user, payload)
default:
@ -167,32 +168,48 @@ func (h handler) actorInboxReject(c *gin.Context, user *storage.User, payload st
c.Status(http.StatusOK)
}
func (h handler) actorInboxFollow(c *gin.Context, user *storage.User, payload storage.Payload, body []byte) {
func (h handler) actorInboxFollow(c *gin.Context, user *storage.User, payload storage.Payload) {
objectID := storage.FirstJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
if strings.HasPrefix(objectID, common.ActorURLPrefix(h.domain)) {
h.actorInboxFollowActor(c, user, payload)
} else if h.fedConfig.AllowFollowObject && strings.HasPrefix(objectID, common.ObjectURLPrefix(h.domain)) {
h.actorInboxFollowNote(c, user, payload)
} else {
c.Status(http.StatusOK)
}
}
func (h handler) actorInboxFollowActor(c *gin.Context, user *storage.User, payload storage.Payload) {
ctx := c.Request.Context()
userActor, err := h.storage.GetActor(ctx, user.ActorID)
if err != nil {
h.hardFail(c, err)
return
}
theActorBeingFollowed := storage.FirstJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
theActorFollowing, _ := storage.JSONString(payload, "actor")
userActorID := storage.NewActorID(user.Name, h.domain)
self, hasActor := storage.JSONString(payload, "object")
target, hasTarget := storage.JSONString(payload, "actor")
if !hasActor || !hasTarget || self != string(userActorID) {
h.logger.Warn("unable to process follow request",
zap.String("object", self),
zap.String("actor", target),
zap.String("user", user.Name),
)
// If the actor that is doing the following is local, bail.
if strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
followerActor, err := fed.GetOrFetchActor(ctx, h.storage, h.logger, h.httpClient, target)
// If the actor that is being followed is not local, bail. Assume it is
// forwarded or just informational.
if !strings.HasPrefix(theActorBeingFollowed, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// If user is provided and the actor being followed is not the user,
// bail. Assume it is forwarded or just informational.
if theActorBeingFollowed != common.ActorURL(h.domain, user.Name) {
c.Status(http.StatusOK)
return
}
// This just ensures we have a reference of the actor and it's keys for
// the verify signature step.
theActorFollowingA, err := fed.GetOrFetchActor(ctx, h.storage, h.logger, h.httpClient, theActorFollowing)
if err != nil {
h.logger.Error("unable to fetch actor", zap.Error(err), zap.String("actor", theActorFollowing))
h.internalServerErrorJSON(c, err)
return
}
@ -202,65 +219,195 @@ func (h handler) actorInboxFollow(c *gin.Context, user *storage.User, payload st
return
}
remoteActorID, err := h.storage.ActorRowIDForActorID(c.Request.Context(), target)
theActorFollowingRowID, err := h.storage.ActorRowIDForActorID(ctx, theActorFollowing)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
err = h.storage.CreatePendingFollower(ctx, user.ID, remoteActorID, payload)
err = h.storage.CreatePendingFollower(ctx, user.ID, theActorFollowingRowID, payload)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if user.AcceptFollowers {
if h.fedConfig.AllowAutoAcceptFollowers && user.AcceptFollowers {
response := storage.EmptyPayload()
acceptActivityID := storage.NewV4()
acceptActivityID := common.ActivityURL(h.domain, storage.NewV4())
response["@context"] = "https://www.w3.org/ns/activitystreams"
response["actor"] = storage.NewActorID(user.Name, h.domain)
response["id"] = fmt.Sprintf("https://%s/activity/%s", h.domain, acceptActivityID)
response["id"] = acceptActivityID
response["object"] = payload
response["to"] = followerActor.GetID()
response["to"] = theActorFollowing
response["type"] = "Accept"
response["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
responsePayload := response.Bytes()
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
err = nc.SendToInbox(ctx, h.userActor(user, userActor), followerActor, responsePayload)
err = h.storage.UpdateFollowerApproved(ctx, user.ID, theActorFollowingRowID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
err = h.storage.UpdateFollowerApproved(ctx, user.ID, remoteActorID)
userActor, err := h.storage.GetActor(ctx, user.ActorID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if h.publisherClient != nil {
err = h.publisherClient.Send(context.Background(), theActorFollowingA.GetInbox(), userActor.GetKeyID(), user.PrivateKey, acceptActivityID, string(responsePayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
}
} else {
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
err = nc.SendToInbox(ctx, h.userActor(user, userActor), theActorFollowingA, responsePayload)
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
}
}
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxFollowNote(c *gin.Context, user *storage.User, payload storage.Payload) {
ctx := c.Request.Context()
theObjectBeingFollowed := storage.FirstJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
theActorFollowing, _ := storage.JSONString(payload, "actor")
// If the actor that is doing the following is local, bail.
if strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// This just ensures we have a reference of the actor and it's keys for
// the verify signature step.
theActorFollowingA, err := fed.GetOrFetchActor(ctx, h.storage, h.logger, h.httpClient, theActorFollowing)
if err != nil {
h.logger.Error("unable to fetch actor", zap.Error(err), zap.String("actor", theActorFollowing))
h.internalServerErrorJSON(c, err)
return
}
if err := h.verifySignature(c); err != nil {
h.unauthorizedJSON(c, err)
return
}
var theObjectBeingFollowedRowID uuid.UUID
var userActor *storage.Actor
bail := false
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
theObjectBeingFollowedRowID, err = tx.ObjectRowIDForObjectID(ctx, theObjectBeingFollowed)
if err != nil {
return err
}
objectPayload, err := tx.ObjectPayloadByObjectRowID(ctx, theObjectBeingFollowedRowID)
if err != nil {
return err
}
// Verify the object's attributedTo actor is the same as the owner of the
// inbox this follow activity was sent to. Otherwise assume it is
// informational.
attributedTo, hasAttributedTo := storage.JSONString(objectPayload, "attributedTo")
if !hasAttributedTo || attributedTo != common.ActorURL(h.domain, user.Name) {
bail = true
return nil
}
_, err = tx.RecordObjectSubscription(ctx, theObjectBeingFollowedRowID, theActorFollowingA.ID)
if err != nil {
return err
}
userActor, err = tx.GetActor(ctx, user.ActorID)
if err != nil {
return err
}
return nil
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
if bail {
c.Status(http.StatusOK)
return
}
response := storage.EmptyPayload()
acceptActivityID := fmt.Sprintf("https://%s/activity/%s", h.domain, storage.NewV4())
response["@context"] = "https://www.w3.org/ns/activitystreams"
response["actor"] = storage.NewActorID(user.Name, h.domain)
response["id"] = acceptActivityID
response["object"] = payload
response["to"] = theActorFollowingA.GetID()
response["type"] = "Accept"
response["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
responsePayload := response.Bytes()
if h.publisherClient != nil {
err = h.publisherClient.Send(context.Background(), theActorFollowingA.GetInbox(), userActor.GetKeyID(), user.PrivateKey, acceptActivityID, string(responsePayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
}
} else {
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
err = nc.SendToInbox(ctx, h.userActor(user, userActor), theActorFollowingA, responsePayload)
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
}
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxUndoFollow(c *gin.Context, user *storage.User, payload storage.Payload) {
objectID := storage.FirstJSONDeepStrings(payload, []string{"object", "object"}, []string{"object", "object", "id"})
if strings.HasPrefix(objectID, common.ActorURLPrefix(h.domain)) {
h.actorInboxUndoFollowActor(c, user, payload)
} else if h.fedConfig.AllowFollowObject && strings.HasPrefix(objectID, common.ObjectURLPrefix(h.domain)) {
h.actorInboxUndoFollowNote(c, user, payload)
} else {
c.Status(http.StatusOK)
}
}
func (h handler) actorInboxUndoFollowActor(c *gin.Context, user *storage.User, payload storage.Payload) {
ctx := c.Request.Context()
userActorID := storage.NewActorID(user.Name, h.domain)
// objectID := storage.FirstJSONDeepStrings(payload, []string{"object", "object"}, []string{"object", "object", "id"})
actorFollowed, _ := storage.JSONDeepString(payload, "object", "object")
actorFollowing, _ := storage.JSONDeepString(payload, "object", "actor")
self, hasActor := storage.JSONDeepString(payload, "object", "object")
target, hasTarget := storage.JSONDeepString(payload, "object", "actor")
// If the actor that is doing the following is local, bail.
if strings.HasPrefix(actorFollowing, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
if !hasActor || !hasTarget || self != string(userActorID) {
h.logger.Warn("unable to process follow request",
zap.String("object", self),
zap.String("actor", target),
zap.String("user", user.Name),
)
// If the actor that is being followed is not local, bail. Assume it is
// forwarded or just informational.
if !strings.HasPrefix(actorFollowed, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// If user is provided and the actor being followed is not the user,
// bail. Assume it is forwarded or just informational.
if actorFollowed != common.ActorURL(h.domain, user.Name) {
c.Status(http.StatusOK)
return
}
@ -270,15 +417,74 @@ func (h handler) actorInboxUndoFollow(c *gin.Context, user *storage.User, payloa
return
}
remoteActorID, err := h.storage.ActorRowIDForActorID(c.Request.Context(), target)
if err != nil {
h.internalServerErrorJSON(c, err)
txErr := storage.TransactionalStorage(c.Request.Context(), h.storage, func(tx storage.Storage) error {
actorFollowingRowID, err := tx.ActorRowIDForActorID(c.Request.Context(), actorFollowing)
if err != nil {
return err
}
return tx.RemoveFollower(ctx, user.ID, actorFollowingRowID)
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
err = h.storage.RemoveFollower(ctx, user.ID, remoteActorID)
if err != nil {
h.internalServerErrorJSON(c, err)
c.Status(http.StatusOK)
}
func (h handler) actorInboxUndoFollowNote(c *gin.Context, user *storage.User, payload storage.Payload) {
ctx := c.Request.Context()
objectID := storage.FirstJSONDeepStrings(payload, []string{"object", "object"}, []string{"object", "object", "id"})
actorFollowing, _ := storage.JSONDeepString(payload, "object", "actor")
// If the actor that is doing the following is local, bail.
if strings.HasPrefix(actorFollowing, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// If the actor that is being followed is not local, bail. Assume it is
// forwarded or just informational.
if !strings.HasPrefix(objectID, common.ObjectURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
if err := h.verifySignature(c); err != nil {
h.unauthorizedJSON(c, err)
return
}
txErr := storage.TransactionalStorage(c.Request.Context(), h.storage, func(tx storage.Storage) error {
actorFollowingRowID, err := tx.ActorRowIDForActorID(c.Request.Context(), actorFollowing)
if err != nil {
return err
}
objectRowID, err := tx.ObjectRowIDForObjectID(c.Request.Context(), objectID)
if err != nil {
return err
}
objectPayload, err := tx.ObjectPayloadByObjectRowID(ctx, objectRowID)
if err != nil {
return err
}
// Verify the object's attributedTo actor is the same as the owner of the
// inbox this follow activity was sent to. Otherwise assume it is
// informational.
attributedTo, hasAttributedTo := storage.JSONString(objectPayload, "attributedTo")
if !hasAttributedTo || attributedTo != common.ActorURL(h.domain, user.Name) {
return nil
}
return tx.RemoveObjectSubscription(ctx, objectRowID, actorFollowingRowID)
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
@ -296,7 +502,7 @@ func (h handler) actorInboxUndo(c *gin.Context, user *storage.User, payload stor
}
}
func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload storage.Payload) {
func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload storage.Payload, raw []byte) {
isRelevant, err := h.isActivityRelevant(c.Request.Context(), payload, user)
if err != nil {
h.internalServerErrorJSON(c, err)
@ -343,11 +549,21 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
inReplyTo, hasInReplyTo := storage.JSONDeepString(payload, "object", "inReplyTo")
// localObjectPrefix := fmt.Sprintf("https://%s/object/", h.domain)
ctx := c.Request.Context()
var notifyActors []*storage.Actor
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityExists, err := storage.ActivityExistsByActivityID(ctx, activityObjectID)
if err != nil {
return err
}
// If we've seen the activity before, the upsert operations for
// recording the object event, object, user feed, and object reply
// actions should not result in new records being created. However,
// we avoid the inbox-forwarding behavior.
activityObjectRowID, err := storage.RecordObject(ctx, activityObject, activityObjectID)
if err != nil {
return err
@ -368,7 +584,17 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
return err
}
}
// If the server has inbox forwarding enabled, the replied to
// object is local, and the activity has never been seen, then get
// a list of all the actors that the object should be forwarded
// to.
// TODO: Move this action to a worker. It could be a big list.
if h.fedConfig.AllowInboxForwarding && strings.HasPrefix(inReplyTo, common.ObjectURLPrefix(h.domain)) && !activityExists {
notifyActors, err = storage.ActorsSubscribedToObject(ctx, replyRowID)
if err != nil {
return err
}
}
}
return nil
})
@ -399,16 +625,43 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
}
}
if len(notifyActors) > 0 {
userActor, err := h.storage.GetActor(ctx, user.ActorID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
for _, notifyActor := range notifyActors {
if h.publisherClient != nil {
err = h.publisherClient.Send(context.Background(), notifyActor.GetInbox(), userActor.GetKeyID(), user.PrivateKey, activityID, string(raw))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
} else {
err = nc.SendToInbox(ctx, h.userActor(user, userActor), notifyActor, raw)
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
}
}
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload storage.Payload) {
func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload storage.Payload, raw []byte) {
isRelevant, err := h.isActivityRelevant(c.Request.Context(), payload, user)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if !isRelevant {
h.logger.Debug("ignoring announce that isn't relevant.")
c.Status(http.StatusOK)
return
}
@ -441,62 +694,46 @@ func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload
ctx := c.Request.Context()
localActivityPrefix := fmt.Sprintf("https://%s/activity/", h.domain)
if strings.HasPrefix(activityObjectID, localActivityPrefix) {
activityObjectRowID, err := uuid.FromString(strings.TrimPrefix(activityObjectID, localActivityPrefix))
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityRowID, err := storage.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
_, err = storage.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
if err != nil {
return err
}
return nil
})
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
c.Status(http.StatusOK)
return
}
var objectPayload storage.Payload
ac := fed.ActivityClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
_, objectPayload, err := ac.GetSigned(activityObjectID, h.userActor(user, userActor))
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
var notifyActors []*storage.Actor
objectID, hasObjectID := storage.JSONString(objectPayload, "id")
if !hasObjectID || objectID != activityObjectID {
h.logger.Info("ignoring invalid announce activity",
zap.String("id", activityID),
zap.String("actor", actorID),
zap.Strings("object_id", []string{activityObjectID, objectID}),
zap.String("user", user.Name),
)
c.Status(http.StatusOK)
return
}
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityObjectRowID, err := storage.RecordObject(ctx, objectPayload, activityObjectID)
txErr := storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityExists, err := storage.ActivityExistsByActivityID(ctx, activityID)
if err != nil {
return err
}
h.logger.Debug("storing announce", zap.Bool("exists", activityExists), zap.String("activity", activityID))
// Nick: I don't like any of this.
var activityObjectRowID uuid.UUID
activityObjectRowID, err = storage.ObjectRowIDForObjectID(ctx, activityObjectID)
if err != nil {
// If the object is not local, and we don't have it recorded, try to fetch it.
if errors.Is(err, errors.NewNotFoundError(nil)) && !strings.HasPrefix(activityObjectID, common.ObjectURLPrefix(h.domain)) {
h.logger.Debug("object row not found and object is not local", zap.String("activity", activityID), zap.String("object", activityObjectID))
// If we are able to fetch the object and record the object, keep going
_, objectPayload, err = ac.GetSigned(activityObjectID, h.userActor(user, userActor))
if err != nil {
return err
}
activityObjectRowID, err = storage.RecordObject(ctx, objectPayload, activityObjectID)
if err != nil {
return err
}
} else {
return err
}
}
// TODO: Verify that the user owns the object if the object is local.
activityRowID, err := storage.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
@ -505,14 +742,56 @@ func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload
if err != nil {
return err
}
// If the server has inbox forwarding enabled, the replied to
// object is local, and the activity has never been seen, then get
// a list of all the actors that the object should be forwarded
// to.
// TODO: Move this action to a worker. It could be a big list.
if h.fedConfig.AllowInboxForwarding && !activityExists {
notifyActors, err = storage.ActorsSubscribedToObject(ctx, activityObjectRowID)
if err != nil {
return err
}
}
return nil
})
if err != nil {
h.internalServerErrorJSON(c, err)
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
if err = h.crawlQueue.Add(strings.Join([]string{user.ID.String(), objectID}, ",")); err != nil {
// TODO: If set activityIsNew to true and server enabled inbox
// forwarding and actor enabled inbox forwarding then find all
// actors to receive the activity and publish it to them.
if h.fedConfig.AllowInboxForwarding && len(notifyActors) > 0 {
userActor, err := h.storage.GetActor(ctx, user.ActorID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
for _, notifyActor := range notifyActors {
if h.publisherClient != nil {
err = h.publisherClient.Send(context.Background(), notifyActor.GetInbox(), userActor.GetKeyID(), user.PrivateKey, activityID, string(raw))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
} else {
err = nc.SendToInbox(ctx, h.userActor(user, userActor), notifyActor, raw)
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
}
}
}
// Crawl last so that we don't introduce weird race conditions between the crawler and inbox processing.
if err = h.crawlQueue.Add(strings.Join([]string{user.ID.String(), activityObjectID}, ",")); err != nil {
h.logger.Warn("error queueing crawl work", zap.Error(err))
}

View File

@ -43,7 +43,7 @@ func (h handler) configure(c *gin.Context) {
data["authenticated"] = true
data["fed_config"] = map[string]bool{
"reply_collection_updates": h.fedConfig.AllowReplyCollectionUpdates,
"reply_collection_updates": h.fedConfig.AllowFollowObject,
"auto_accept_followers": h.fedConfig.AllowAutoAcceptFollowers,
}