Updating group inbox to publish Join and Leave events.

This commit is contained in:
Nick Gerakines 2020-04-03 16:03:08 -04:00
parent 50316102fc
commit 7a24d770f2
No known key found for this signature in database
GPG Key ID: 33D43D854F96B2E4
2 changed files with 84 additions and 28 deletions

View File

@ -320,24 +320,50 @@ func (h handler) groupActorInboxFollowActor(c *gin.Context, group storage.Group,
return
}
response := storage.EmptyPayload()
acceptActivityID := common.ActivityURL(h.domain, storage.NewV4())
response["@context"] = "https://www.w3.org/ns/activitystreams"
response["actor"] = groupActor.ActorID
response["id"] = acceptActivityID
response["object"] = payload
response["to"] = theActorFollowing
response["type"] = "Accept"
response["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
responsePayload := response.Bytes()
if h.publisherClient != nil {
err = h.publisherClient.Send(ctx, theActorFollowingA.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, acceptActivityID, string(responsePayload))
acceptActivity := storage.EmptyPayload()
acceptActivityID := common.ActivityURL(h.domain, storage.NewV4())
acceptActivity["@context"] = "https://www.w3.org/ns/activitystreams"
acceptActivity["actor"] = groupActor.ActorID
acceptActivity["id"] = acceptActivityID
acceptActivity["object"] = payload
acceptActivity["to"] = theActorFollowing
acceptActivity["type"] = "Accept"
acceptActivity["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
acceptPayload := acceptActivity.Bytes()
err = h.publisherClient.Send(ctx, theActorFollowingA.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, acceptActivityID, string(acceptPayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
c.Status(http.StatusOK)
return
}
followers, err := h.storage.GroupMemberActorsForGroupActorID(ctx, group.ID)
if err != nil {
h.logger.Error("failed getting group members", zap.Error(err))
c.Status(http.StatusOK)
return
}
for _, follower := range followers {
joinActivity := storage.EmptyPayload()
joinActivityID := common.ActivityURL(h.domain, storage.NewV4())
joinActivity["@context"] = "https://www.w3.org/ns/activitystreams"
joinActivity["id"] = joinActivityID
joinActivity["summary"] = ""
joinActivity["type"] = "Join"
joinActivity["actor"] = theActorFollowing
joinActivity["object"] = groupActor.ActorID
joinActivity["to"] = follower.ActorID
joinActivity["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
joinPayload := joinActivity.Bytes()
err = h.publisherClient.Send(ctx, follower.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, joinActivityID, string(joinPayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", follower.GetID()), zap.String("activity", joinActivityID), zap.Error(err))
}
}
}
// TODO: Implement non-publisherClient send.
}
@ -385,7 +411,15 @@ func (h handler) groupActorInboxUndoFollowActor(c *gin.Context, group storage.Gr
return
}
var groupActor *storage.Actor
txErr := storage.TransactionalStorage(c.Request.Context(), h.storage, func(tx storage.Storage) error {
var err error
groupActor, err = tx.GetActor(ctx, group.ActorID)
if err != nil {
return err
}
actorFollowingRowID, err := tx.ActorRowIDForActorID(c.Request.Context(), actorFollowing)
if err != nil {
return err
@ -398,6 +432,28 @@ func (h handler) groupActorInboxUndoFollowActor(c *gin.Context, group storage.Gr
return
}
if h.publisherClient != nil {
followers, err := h.storage.GroupMemberActorsForGroupActorID(ctx, group.ID)
for _, follower := range followers {
leaveActivity := storage.EmptyPayload()
leaveActivityID := common.ActivityURL(h.domain, storage.NewV4())
leaveActivity["@context"] = "https://www.w3.org/ns/activitystreams"
leaveActivity["summary"] = ""
leaveActivity["type"] = "Leave"
leaveActivity["id"] = leaveActivityID
leaveActivity["actor"] = actorFollowing
leaveActivity["object"] = groupActor.ActorID
leaveActivity["to"] = follower.ActorID
leaveActivity["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
joinPayload := leaveActivity.Bytes()
err = h.publisherClient.Send(ctx, follower.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, leaveActivityID, string(joinPayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", follower.GetID()), zap.String("activity", leaveActivityID), zap.Error(err))
}
}
}
c.Status(http.StatusOK)
}
@ -603,7 +659,7 @@ func (h handler) groupActorInboxAnnounce(c *gin.Context, group storage.Group, pa
return err
}
groupActor, err = h.storage.GetActor(ctx, group.ActorID)
groupActor, err = tx.GetActor(ctx, group.ActorID)
if err != nil {
return err
}

View File

@ -553,8 +553,8 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
var notifyActors []*storage.Actor
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityExists, err := storage.ActivityExistsByActivityID(ctx, activityObjectID)
err = storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityObjectID)
if err != nil {
return err
}
@ -564,22 +564,22 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
// actions should not result in new records being created. However,
// we avoid the inbox-forwarding behavior.
activityObjectRowID, err := storage.RecordObject(ctx, activityObject, activityObjectID)
activityObjectRowID, err := tx.RecordObject(ctx, activityObject, activityObjectID)
if err != nil {
return err
}
activityRowID, err := storage.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
activityRowID, err := tx.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
_, err = storage.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
_, err = tx.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
if err != nil {
return err
}
if hasInReplyTo {
replyRowID, err := storage.ObjectRowIDForObjectID(ctx, inReplyTo)
replyRowID, err := tx.ObjectRowIDForObjectID(ctx, inReplyTo)
if err == nil {
_, err = storage.RecordObjectReply(ctx, activityObjectRowID, replyRowID)
_, err = tx.RecordObjectReply(ctx, activityObjectRowID, replyRowID)
if err != nil {
return err
}
@ -590,7 +590,7 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
// to.
// TODO: Move this action to a worker. It could be a big list.
if h.fedConfig.AllowInboxForwarding && strings.HasPrefix(inReplyTo, common.ObjectURLPrefix(h.domain)) && !activityExists {
notifyActors, err = storage.ActorsSubscribedToObject(ctx, replyRowID)
notifyActors, err = tx.ActorsSubscribedToObject(ctx, replyRowID)
if err != nil {
return err
}
@ -854,25 +854,25 @@ func (h handler) actorInboxDelete(c *gin.Context, user *storage.User, payload st
var notifyActors []*storage.Actor
txErr := storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
activityExists, err := storage.ActivityExistsByActivityID(ctx, activityURL)
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityURL)
if err != nil {
return err
}
h.logger.Debug("storing announce", zap.Bool("exists", activityExists), zap.String("activity", activityURL))
err = storage.UpdateObjectPayload(ctx, objectRowID, tombstone)
err = tx.UpdateObjectPayload(ctx, objectRowID, tombstone)
if err != nil {
return err
}
_, err = storage.RecordObjectEvent(ctx, activityURL, objectRowID, payload)
_, err = tx.RecordObjectEvent(ctx, activityURL, objectRowID, payload)
if err != nil {
return err
}
parentObjectRowID, err := storage.ParentObjectID(ctx, objectRowID)
parentObjectRowID, err := tx.ParentObjectID(ctx, objectRowID)
// If there was an error and it isn't a not-found error, fail.
if err != nil && !errors.Is(err, errors.NewNotFoundError(nil)) {
return err
@ -883,7 +883,7 @@ func (h handler) actorInboxDelete(c *gin.Context, user *storage.User, payload st
// should be forwarded to.
// TODO: Move this action to a worker. It could be a big list.
if h.fedConfig.AllowInboxForwarding && !activityExists && parentObjectRowID != uuid.Nil {
notifyActors, err = storage.ActorsSubscribedToObject(ctx, parentObjectRowID)
notifyActors, err = tx.ActorsSubscribedToObject(ctx, parentObjectRowID)
if err != nil {
return err
}