mirror of https://gitlab.com/ngerakines/tavern.git
765 lines
24 KiB
Go
765 lines
24 KiB
Go
package web
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/gofrs/uuid"
|
|
"github.com/kr/pretty"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/ngerakines/tavern/common"
|
|
"github.com/ngerakines/tavern/errors"
|
|
"github.com/ngerakines/tavern/fed"
|
|
"github.com/ngerakines/tavern/storage"
|
|
)
|
|
|
|
func (h handler) groupActorInbox(c *gin.Context) {
|
|
name := c.Param("name")
|
|
|
|
group, err := h.storage.GetGroupByName(c.Request.Context(), name)
|
|
if err != nil {
|
|
if errors.Is(err, errors.NewNotFoundError(nil)) {
|
|
h.notFoundJSON(c, nil)
|
|
return
|
|
}
|
|
h.internalServerErrorJSON(c, err, zap.String("name", name))
|
|
return
|
|
}
|
|
|
|
defer c.Request.Body.Close()
|
|
|
|
body, err := ioutil.ReadAll(io.LimitReader(c.Request.Body, maxInboxRequestSize+1))
|
|
if err != nil {
|
|
h.internalServerErrorJSON(c, err)
|
|
return
|
|
}
|
|
if len(body) > maxInboxRequestSize {
|
|
h.writeJSONError(c, http.StatusRequestEntityTooLarge, err)
|
|
return
|
|
}
|
|
|
|
fmt.Println(string(body))
|
|
|
|
payload, err := storage.PayloadFromBytes(body)
|
|
if err != nil {
|
|
h.badRequestJSON(c, err)
|
|
return
|
|
}
|
|
|
|
pretty.Println(payload)
|
|
|
|
if skipActorInbox(payload) {
|
|
h.logger.Debug("group actor inbox can ignore message", zap.String("group", name))
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
payloadType, _ := storage.JSONString(payload, "type")
|
|
|
|
actor, _ := storage.JSONString(payload, "actor")
|
|
|
|
if len(actor) > 0 && !allow(c.Request.Context(), h.logger, h.storage, h.serverActorRowID, group.ActorID, actor) {
|
|
h.logger.Debug("actor denied", zap.String("group", name), zap.String("actor", actor))
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
switch payloadType {
|
|
case "Follow":
|
|
h.groupActorInboxFollow(c, group, payload)
|
|
case "Undo":
|
|
h.groupActorInboxUndo(c, group, payload)
|
|
case "Create":
|
|
h.groupActorInboxCreate(c, group, payload)
|
|
case "Announce":
|
|
h.groupActorInboxAnnounce(c, group, payload)
|
|
case "Invite":
|
|
h.groupActorInboxInvite(c, group, payload)
|
|
default:
|
|
h.logger.Warn("Group received unexpected payload type", zap.String("type", payloadType), zap.String("user", name))
|
|
c.Status(http.StatusOK)
|
|
}
|
|
}
|
|
|
|
func (h handler) groupActorInboxInvite(c *gin.Context, group storage.Group, payload storage.Payload) {
|
|
if err := h.verifySignature(c, group.ActorID); err != nil {
|
|
h.unauthorizedJSON(c, err)
|
|
return
|
|
}
|
|
|
|
inviter, hasInviter := storage.JSONString(payload, "actor")
|
|
invited, hasInvited := storage.JSONString(payload, "object")
|
|
targetGroup, hasTargetGroup := storage.FirstJSONString(payload, "target")
|
|
|
|
if !hasInviter || !hasInvited || !hasTargetGroup {
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
if targetGroup != common.GroupActorURL(h.domain, group.Name) {
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
// If remote group followers is not enabled on the server, and the
|
|
// invited actor is not local, then ignore the request
|
|
if !h.groupConfig.AllowRemoteGroupFollowers && !strings.HasPrefix(invited, common.ActorURLPrefix(h.domain)) {
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
if !group.AllowRemote && !strings.HasPrefix(invited, common.ActorURLPrefix(h.domain)) {
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
ctx := c.Request.Context()
|
|
|
|
actorInvited, err := fed.GetOrFetchActor(ctx, h.storage, h.logger, h.httpClient, invited)
|
|
if err != nil {
|
|
h.logger.Error("unable to fetch actor", zap.Error(err), zap.String("actor", invited))
|
|
h.internalServerErrorJSON(c, err)
|
|
return
|
|
}
|
|
|
|
// TODO: Verify actorInvited is a type "Person".
|
|
|
|
denied := false
|
|
|
|
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
|
|
actorInviter, err := tx.GetActorByActorID(ctx, inviter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
allowed, err := tx.GroupMemberCanInvite(ctx, group.ActorID, actorInviter.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !allowed {
|
|
denied = true
|
|
return nil
|
|
}
|
|
|
|
alreadyInvited, err := tx.IsActorInvitedToGroup(ctx, group.ActorID, actorInvited.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyInvited {
|
|
return nil
|
|
}
|
|
|
|
isInGroup, err := tx.IsActorInGroup(ctx, group.ActorID, actorInvited.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if isInGroup {
|
|
return nil
|
|
}
|
|
|
|
_, err = tx.RecordGroupInvitation(ctx, group.ActorID, actorInvited.ID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if txErr != nil {
|
|
h.internalServerErrorJSON(c, txErr)
|
|
return
|
|
}
|
|
if denied {
|
|
h.unauthorizedJSON(c, nil)
|
|
return
|
|
}
|
|
|
|
c.Status(http.StatusOK)
|
|
}
|
|
|
|
func (h handler) groupActorInboxFollow(c *gin.Context, group storage.Group, payload storage.Payload) {
|
|
objectID := storage.FirstJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
|
|
if strings.HasPrefix(objectID, common.GroupActorURLPrefix(h.domain)) {
|
|
h.groupActorInboxFollowActor(c, group, payload)
|
|
} else {
|
|
c.Status(http.StatusOK)
|
|
}
|
|
}
|
|
|
|
func (h handler) groupActorInboxFollowActor(c *gin.Context, group storage.Group, payload storage.Payload) {
|
|
ctx := c.Request.Context()
|
|
|
|
theActorBeingFollowed := storage.FirstJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
|
|
theActorFollowing, _ := storage.JSONString(payload, "actor")
|
|
|
|
// If the actor that is doing the following is local, bail.
|
|
if strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
|
|
h.logger.Debug("group actor inbox follow bailing: actor is local",
|
|
zap.String("actor", theActorFollowing),
|
|
zap.String("prefix", common.ActorURLPrefix(h.domain)))
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
// If the actor that is being followed is not local, bail. Assume it is
|
|
// forwarded or just informational.
|
|
if !strings.HasPrefix(theActorBeingFollowed, common.GroupActorURLPrefix(h.domain)) {
|
|
h.logger.Debug("group actor inbox follow bailing: group is not local",
|
|
zap.String("object", theActorBeingFollowed),
|
|
zap.String("prefix", common.GroupActorURLPrefix(h.domain)))
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
// If user is provided and the actor being followed is not the user,
|
|
// bail. Assume it is forwarded or just informational.
|
|
if theActorBeingFollowed != common.GroupActorURL(h.domain, group.Name) {
|
|
h.logger.Debug("group actor inbox follow bailing: object does not match a composed group actor id",
|
|
zap.String("object", theActorBeingFollowed),
|
|
zap.String("group_actor_id", common.GroupActorURL(h.domain, group.Name)))
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
// If remote group followers is not enabled on the server, and the actor is not local ignore the request
|
|
if !h.groupConfig.AllowRemoteGroupFollowers && !strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
|
|
h.logger.Debug("group actor inbox follow bailing: server disallows remote followers and actor is not local",
|
|
zap.Bool("AllowRemoteGroupFollowers", h.groupConfig.AllowRemoteGroupFollowers),
|
|
zap.String("actor", theActorFollowing),
|
|
zap.String("prefix", common.ActorURLPrefix(h.domain)))
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
if !group.AllowRemote && !strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
|
|
h.logger.Debug("group actor inbox follow bailing: group disallows remote followers and actor is not local",
|
|
zap.Bool("AllowRemoteGroupFollowers", group.AllowRemote),
|
|
zap.String("actor", theActorFollowing),
|
|
zap.String("prefix", common.ActorURLPrefix(h.domain)))
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
// This just ensures we have a reference of the actor and it's keys for
|
|
// the verify signature step.
|
|
theActorFollowingA, err := fed.GetOrFetchActor(ctx, h.storage, h.logger, h.httpClient, theActorFollowing)
|
|
if err != nil {
|
|
h.logger.Error("unable to fetch actor", zap.Error(err), zap.String("actor", theActorFollowing))
|
|
h.internalServerErrorJSON(c, err)
|
|
return
|
|
}
|
|
|
|
if err := h.verifySignature(c, group.ActorID); err != nil {
|
|
h.logger.Debug("signature verification failed", zap.Error(err))
|
|
h.unauthorizedJSON(c, err)
|
|
return
|
|
}
|
|
|
|
var theActorFollowingRowID uuid.UUID
|
|
invited := false
|
|
|
|
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
|
|
theActorFollowingRowID, err = tx.ActorRowIDForActorID(ctx, theActorFollowing)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = tx.RecordGroupMember(ctx, group.ActorID, theActorFollowingRowID, payload, storage.PendingRelationshipStatus, group.DefaultMemberRole)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
invited, err = tx.IsActorInvitedToGroup(ctx, group.ActorID, theActorFollowingRowID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if txErr != nil {
|
|
h.logger.Debug("error performing lookup transaction", zap.Error(txErr), zap.Strings("error_chain", errors.ErrorChain(txErr)))
|
|
h.internalServerErrorJSON(c, txErr)
|
|
return
|
|
}
|
|
|
|
if invited || (h.fedConfig.AllowAutoAcceptFollowers && group.AcceptFollowers) {
|
|
var groupActor *storage.Actor
|
|
|
|
txErr = storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
|
|
groupActor, err = tx.GetActor(ctx, group.ActorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = tx.UpdateGroupMemberStatus(ctx, groupActor.ID, theActorFollowingRowID, storage.AcceptRelationshipStatus)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if invited {
|
|
err = tx.RemoveGroupInvitation(ctx, groupActor.ID, theActorFollowingRowID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if txErr != nil {
|
|
h.logger.Debug("error performing notify transaction", zap.Error(txErr), zap.Strings("error_chain", errors.ErrorChain(txErr)))
|
|
h.internalServerErrorJSON(c, txErr)
|
|
return
|
|
}
|
|
|
|
if h.publisherClient != nil {
|
|
acceptActivity := storage.EmptyPayload()
|
|
acceptActivityID := common.ActivityURL(h.domain, storage.NewV4())
|
|
acceptActivity["@context"] = "https://www.w3.org/ns/activitystreams"
|
|
acceptActivity["actor"] = groupActor.ActorID
|
|
acceptActivity["id"] = acceptActivityID
|
|
acceptActivity["object"] = payload
|
|
acceptActivity["to"] = theActorFollowing
|
|
acceptActivity["type"] = "Accept"
|
|
acceptActivity["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
|
|
acceptPayload := acceptActivity.Bytes()
|
|
|
|
err = h.publisherClient.Send(ctx, theActorFollowingA.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, acceptActivityID, string(acceptPayload))
|
|
if err != nil {
|
|
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
followers, err := h.storage.GroupMemberActorsForGroupActorID(ctx, group.ID)
|
|
if err != nil {
|
|
h.logger.Error("failed getting group members", zap.Error(err))
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
for _, follower := range followers {
|
|
joinActivity := storage.EmptyPayload()
|
|
joinActivityID := common.ActivityURL(h.domain, storage.NewV4())
|
|
joinActivity["@context"] = "https://www.w3.org/ns/activitystreams"
|
|
joinActivity["id"] = joinActivityID
|
|
joinActivity["summary"] = ""
|
|
joinActivity["type"] = "Join"
|
|
joinActivity["actor"] = theActorFollowing
|
|
joinActivity["object"] = groupActor.ActorID
|
|
joinActivity["to"] = follower.ActorID
|
|
joinActivity["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
|
|
joinPayload := joinActivity.Bytes()
|
|
|
|
err = h.publisherClient.Send(ctx, follower.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, joinActivityID, string(joinPayload))
|
|
if err != nil {
|
|
h.logger.Error("failed sending to actor", zap.String("target", follower.GetID()), zap.String("activity", joinActivityID), zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
c.Status(http.StatusOK)
|
|
}
|
|
|
|
func (h handler) groupActorInboxUndoFollow(c *gin.Context, group storage.Group, payload storage.Payload) {
|
|
objectID := storage.FirstJSONDeepStrings(payload, []string{"object", "object"}, []string{"object", "object", "id"})
|
|
if strings.HasPrefix(objectID, common.GroupActorURLPrefix(h.domain)) {
|
|
h.groupActorInboxUndoFollowActor(c, group, payload)
|
|
} else {
|
|
c.Status(http.StatusOK)
|
|
}
|
|
}
|
|
|
|
func (h handler) groupActorInboxUndoFollowActor(c *gin.Context, group storage.Group, payload storage.Payload) {
|
|
ctx := c.Request.Context()
|
|
|
|
// objectID := storage.FirstJSONDeepStrings(payload, []string{"object", "object"}, []string{"object", "object", "id"})
|
|
actorFollowed, _ := storage.JSONDeepString(payload, "object", "object")
|
|
actorFollowing, _ := storage.JSONDeepString(payload, "object", "actor")
|
|
|
|
// If the actor that is doing the following is local, bail.
|
|
if strings.HasPrefix(actorFollowing, common.ActorURLPrefix(h.domain)) {
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
// If the actor that is being followed is not local, bail. Assume it is
|
|
// forwarded or just informational.
|
|
if !strings.HasPrefix(actorFollowed, common.GroupActorURLPrefix(h.domain)) {
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
// If user is provided and the actor being followed is not the user,
|
|
// bail. Assume it is forwarded or just informational.
|
|
if actorFollowed != common.GroupActorURL(h.domain, group.Name) {
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
if err := h.verifySignature(c, group.ActorID); err != nil {
|
|
h.unauthorizedJSON(c, err)
|
|
return
|
|
}
|
|
|
|
var groupActor *storage.Actor
|
|
|
|
txErr := storage.TransactionalStorage(c.Request.Context(), h.storage, func(tx storage.Storage) error {
|
|
var err error
|
|
groupActor, err = tx.GetActor(ctx, group.ActorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
actorFollowingRowID, err := tx.ActorRowIDForActorID(c.Request.Context(), actorFollowing)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return tx.RemoveGroupMember(ctx, group.ActorID, actorFollowingRowID)
|
|
})
|
|
if txErr != nil {
|
|
h.internalServerErrorJSON(c, txErr)
|
|
return
|
|
}
|
|
|
|
if h.publisherClient != nil {
|
|
followers, err := h.storage.GroupMemberActorsForGroupActorID(ctx, group.ID)
|
|
for _, follower := range followers {
|
|
leaveActivity := storage.EmptyPayload()
|
|
leaveActivityID := common.ActivityURL(h.domain, storage.NewV4())
|
|
leaveActivity["@context"] = "https://www.w3.org/ns/activitystreams"
|
|
leaveActivity["summary"] = ""
|
|
leaveActivity["type"] = "Leave"
|
|
leaveActivity["id"] = leaveActivityID
|
|
leaveActivity["actor"] = actorFollowing
|
|
leaveActivity["object"] = groupActor.ActorID
|
|
leaveActivity["to"] = follower.ActorID
|
|
leaveActivity["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
|
|
joinPayload := leaveActivity.Bytes()
|
|
|
|
err = h.publisherClient.Send(ctx, follower.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, leaveActivityID, string(joinPayload))
|
|
if err != nil {
|
|
h.logger.Error("failed sending to actor", zap.String("target", follower.GetID()), zap.String("activity", leaveActivityID), zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
c.Status(http.StatusOK)
|
|
}
|
|
|
|
func (h handler) groupActorInboxUndo(c *gin.Context, group storage.Group, payload storage.Payload) {
|
|
innerType, _ := storage.JSONDeepString(payload, "object", "type")
|
|
switch innerType {
|
|
case "Follow":
|
|
h.groupActorInboxUndoFollow(c, group, payload)
|
|
default:
|
|
h.logger.Warn("User received unexpected undo payload type", zap.String("type", innerType), zap.String("group", group.Name))
|
|
c.Status(http.StatusOK)
|
|
}
|
|
}
|
|
|
|
func (h handler) groupActorInboxCreate(c *gin.Context, group storage.Group, payload storage.Payload) {
|
|
// Because actors must follow the group, it is safe to assume that we
|
|
// have actor and actor key records for all valid incoming activities.
|
|
if err := h.verifySignature(c, group.ActorID); err != nil {
|
|
h.unauthorizedJSON(c, err)
|
|
return
|
|
}
|
|
|
|
activityID, hasActivityID := storage.JSONString(payload, "id")
|
|
actorID, hasActorID := storage.JSONString(payload, "actor")
|
|
|
|
activityObjectID, hasActivityObjectID := storage.JSONDeepString(payload, "object", "id")
|
|
activityObject, hasActivityObject := storage.JSONMap(payload, "object")
|
|
// activityObjectType, hasActivityObjectType := storage.JSONDeepString(payload, "object", "type")
|
|
|
|
if !hasActivityID || !hasActorID || !hasActivityObjectID || !hasActivityObject {
|
|
h.logger.Info("ignoring invalid activity",
|
|
zap.String("id", activityID),
|
|
zap.String("actor", actorID),
|
|
zap.String("group", group.Name),
|
|
)
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
ctx := c.Request.Context()
|
|
|
|
var groupActor *storage.Actor
|
|
var notifyActors []*storage.Actor
|
|
denied := false
|
|
|
|
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
|
|
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityObjectID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
activityActorRowID, err := tx.ActorRowIDForActorID(ctx, actorID)
|
|
if err != nil {
|
|
// This shouldn't actually happen because the actor passed the validate activity phase.
|
|
if errors.Is(err, errors.NewNotFoundError(nil)) {
|
|
denied = true
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
allowed, err := tx.GroupMemberCanSubmit(ctx, group.ActorID, activityActorRowID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !allowed {
|
|
denied = true
|
|
return nil
|
|
}
|
|
|
|
// If we've seen the activity before, the upsert operations for
|
|
// recording the object event, object, user feed, and object reply
|
|
// actions should not result in new records being created. However,
|
|
// we avoid the inbox-forwarding behavior.
|
|
|
|
activityObjectRowID, err := tx.RecordObject(ctx, activityObject, activityObjectID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = tx.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
since, err := tx.MinutesSinceGroupBoost(ctx, group.ActorID, activityObjectRowID)
|
|
if err != nil && !errors.Is(err, errors.NewNotFoundError(nil)) {
|
|
return err
|
|
}
|
|
// Get the time in minutes since the object was last boosted. If the
|
|
// value is -1, we have no record of boosting the object. If the
|
|
// value is between 0 and 30, don't send anything to group followers.
|
|
if since != -1 && since < 30 {
|
|
return nil
|
|
}
|
|
|
|
if !activityExists {
|
|
notifyActors, err = tx.GroupMemberActorsForGroupActorID(ctx, group.ActorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
groupActor, err = tx.GetActor(ctx, group.ActorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if txErr != nil {
|
|
h.internalServerErrorJSON(c, txErr)
|
|
return
|
|
}
|
|
if denied {
|
|
h.unauthorizedJSON(c, nil)
|
|
return
|
|
}
|
|
|
|
if len(notifyActors) > 0 {
|
|
now := time.Now().UTC()
|
|
publishedAt := now.Format("2006-01-02T15:04:05Z")
|
|
|
|
announceID := common.ActivityURL(h.domain, storage.NewV4())
|
|
announce := storage.EmptyPayload()
|
|
announce["@context"] = "https://www.w3.org/ns/activitystreams"
|
|
announce["id"] = announceID
|
|
announce["type"] = "Announce"
|
|
announce["actor"] = groupActor.ActorID
|
|
announce["published"] = publishedAt
|
|
announce["to"] = []string{
|
|
storage.ActorID(groupActor.ActorID).Followers(),
|
|
}
|
|
announce["object"] = activityObjectID
|
|
announcePayload := announce.Bytes()
|
|
|
|
for _, notifyActor := range notifyActors {
|
|
if h.publisherClient != nil {
|
|
err := h.publisherClient.Send(ctx, notifyActor.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, activityID, string(announcePayload))
|
|
if err != nil {
|
|
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
c.Status(http.StatusOK)
|
|
}
|
|
|
|
func (h handler) groupActorInboxAnnounce(c *gin.Context, group storage.Group, payload storage.Payload) {
|
|
if err := h.verifySignature(c, group.ActorID); err != nil {
|
|
h.unauthorizedJSON(c, err)
|
|
return
|
|
}
|
|
|
|
activityID, hasActivityID := storage.JSONString(payload, "id")
|
|
actorID, hasActorID := storage.JSONString(payload, "actor")
|
|
activityObjectID, hasActivityObjectID := storage.JSONString(payload, "object")
|
|
|
|
if !hasActivityID || !hasActorID || !hasActivityObjectID {
|
|
h.logger.Info("ignoring invalid announce activity",
|
|
zap.String("id", activityID),
|
|
zap.String("actor", actorID),
|
|
zap.String("object_id", activityObjectID),
|
|
zap.String("group", group.Name),
|
|
)
|
|
c.Status(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
ctx := c.Request.Context()
|
|
|
|
var objectPayload storage.Payload
|
|
|
|
ac := fed.ActivityClient{
|
|
HTTPClient: h.httpClient,
|
|
Logger: h.logger,
|
|
}
|
|
|
|
var groupActor *storage.Actor
|
|
var notifyActors []*storage.Actor
|
|
|
|
denied := false
|
|
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
|
|
activityActorRowID, err := tx.ActorRowIDForActorID(ctx, actorID)
|
|
if err != nil {
|
|
// This shouldn't actually happen because the actor passed the validate activity phase.
|
|
if errors.Is(err, errors.NewNotFoundError(nil)) {
|
|
denied = true
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
allowed, err := tx.GroupMemberCanSubmit(ctx, group.ActorID, activityActorRowID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !allowed {
|
|
denied = true
|
|
return nil
|
|
}
|
|
|
|
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
groupActor, err = tx.GetActor(ctx, group.ActorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
h.logger.Debug("storing announce", zap.Bool("exists", activityExists), zap.String("activity", activityID))
|
|
|
|
// Nick: I don't like any of this.
|
|
var activityObjectRowID uuid.UUID
|
|
activityObjectRowID, err = tx.ObjectRowIDForObjectID(ctx, activityObjectID)
|
|
if err != nil {
|
|
// If the object is not local, and we don't have it recorded, try to fetch it.
|
|
if errors.Is(err, errors.NewNotFoundError(nil)) && !strings.HasPrefix(activityObjectID, common.ObjectURLPrefix(h.domain)) {
|
|
h.logger.Debug("object row not found and object is not local", zap.String("activity", activityID), zap.String("object", activityObjectID))
|
|
|
|
// If we are able to fetch the object and record the object, keep going
|
|
privateKey, err := group.GetPrivateKey()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, objectPayload, err = ac.GetSignedWithKey(activityObjectID, groupActor.GetKeyID(), privateKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
activityObjectRowID, err = tx.RecordObject(ctx, objectPayload, activityObjectID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
return err
|
|
}
|
|
|
|
}
|
|
|
|
since, err := tx.MinutesSinceGroupBoost(ctx, group.ActorID, activityObjectRowID)
|
|
if err != nil && !errors.Is(err, errors.NewNotFoundError(nil)) {
|
|
return err
|
|
}
|
|
// Get the time in minutes since the object was last boosted. If the
|
|
// value is -1, we have no record of boosting the object. If the
|
|
// value is between 0 and 30, don't send anything to group followers.
|
|
if since != -1 && since < 30 {
|
|
return nil
|
|
}
|
|
|
|
// TODO: Verify that the user owns the object if the object is local.
|
|
|
|
_, err = tx.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// If the server has inbox forwarding enabled, the replied to
|
|
// object is local, and the activity has never been seen, then get
|
|
// a list of all the actors that the object should be forwarded
|
|
// to.
|
|
// TODO: Move this action to a worker. It could be a big list.
|
|
if !activityExists {
|
|
notifyActors, err = tx.GroupMemberActorsForGroupActorID(ctx, group.ActorID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if txErr != nil {
|
|
h.internalServerErrorJSON(c, txErr)
|
|
return
|
|
}
|
|
if denied {
|
|
h.unauthorizedJSON(c, nil)
|
|
return
|
|
}
|
|
|
|
// TODO: If set activityIsNew to true and server enabled inbox
|
|
// forwarding and actor enabled inbox forwarding then find all
|
|
// actors to receive the activity and publish it to them.
|
|
|
|
if len(notifyActors) > 0 {
|
|
now := time.Now().UTC()
|
|
publishedAt := now.Format("2006-01-02T15:04:05Z")
|
|
|
|
announceID := common.ActivityURL(h.domain, storage.NewV4())
|
|
announce := storage.EmptyPayload()
|
|
announce["@context"] = "https://www.w3.org/ns/activitystreams"
|
|
announce["id"] = announceID
|
|
announce["type"] = "Announce"
|
|
announce["actor"] = groupActor.ActorID
|
|
announce["published"] = publishedAt
|
|
announce["to"] = []string{
|
|
storage.ActorID(groupActor.ActorID).Followers(),
|
|
}
|
|
announce["object"] = activityObjectID
|
|
announcePayload := announce.Bytes()
|
|
|
|
for _, notifyActor := range notifyActors {
|
|
if h.publisherClient != nil {
|
|
err := h.publisherClient.Send(ctx, notifyActor.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, activityID, string(announcePayload))
|
|
if err != nil {
|
|
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
c.Status(http.StatusOK)
|
|
}
|