tavern/web/handler_user_inbox.go

1153 lines
34 KiB
Go

package web
import (
"context"
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
"github.com/kr/pretty"
"github.com/yukimochi/httpsig"
"go.uber.org/zap"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
"github.com/ngerakines/tavern/fed"
"github.com/ngerakines/tavern/storage"
)
const maxInboxRequestSize = 1 * 1024 * 1024
func (h handler) userActorInbox(c *gin.Context) {
name := c.Param("name")
user, err := h.storage.GetUserByName(c.Request.Context(), name)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
h.notFoundJSON(c, nil)
return
}
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
}
defer c.Request.Body.Close()
body, err := ioutil.ReadAll(io.LimitReader(c.Request.Body, maxInboxRequestSize+1))
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if len(body) > maxInboxRequestSize {
h.writeJSONError(c, http.StatusRequestEntityTooLarge, err)
return
}
fmt.Println(string(body))
payload, err := storage.PayloadFromBytes(body)
if err != nil {
h.badRequestJSON(c, err)
return
}
pretty.Println(payload)
if skipActorInbox(payload) {
h.logger.Debug("actor inbox can ignore message", zap.String("user", name))
c.Status(http.StatusOK)
return
}
payloadType, _ := storage.JSONString(payload, "type")
actor, _ := storage.JSONString(payload, "actor")
if len(actor) > 0 && !allow(c.Request.Context(), h.logger, h.storage, h.serverActorRowID, user.ActorID, actor) {
h.logger.Debug("actor denied", zap.String("user", name), zap.String("actor", actor))
c.Status(http.StatusOK)
return
}
switch payloadType {
case "Follow":
h.actorInboxFollow(c, user, payload)
case "Undo":
h.actorInboxUndo(c, user, payload)
case "Accept":
h.actorInboxAccept(c, user, payload)
case "Reject":
h.actorInboxReject(c, user, payload)
case "Create":
h.actorInboxCreate(c, user, payload, body)
case "Announce":
h.actorInboxAnnounce(c, user, payload, body)
case "Delete":
h.actorInboxDelete(c, user, payload, body)
case "Like":
h.actorInboxLike(c, user, payload)
default:
h.logger.Warn("User received unexpected payload type", zap.String("type", payloadType), zap.String("user", name))
c.Status(http.StatusOK)
}
}
func (h handler) actorInboxLike(c *gin.Context, user *storage.User, payload storage.Payload) {
if err := h.verifySignature(c, user.ActorID); err != nil {
h.unauthorizedJSON(c, err)
return
}
actor, hasActor := storage.JSONString(payload, "actor")
object, hasObject := storage.JSONString(payload, "object")
if !hasActor || !hasObject {
h.logger.Warn("unable to process like activity",
zap.String("actor", actor),
zap.String("object", object),
zap.String("user", user.Name),
)
c.Status(http.StatusOK)
return
}
ctx := c.Request.Context()
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
objectRowID, err := tx.ObjectRowIDForObjectID(ctx, object)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
return nil
}
return err
}
actorRowID, err := tx.ActorRowIDForActorID(ctx, actor)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
return nil
}
return err
}
_, err = tx.RecordLike(ctx, actorRowID, objectRowID, payload)
return err
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxAccept(c *gin.Context, user *storage.User, payload storage.Payload) {
if err := h.verifySignature(c, user.ActorID); err != nil {
h.unauthorizedJSON(c, err)
return
}
userActorID := storage.NewActorID(user.Name, h.domain)
self, hasActor := storage.JSONDeepString(payload, "object", "actor")
target, hasTarget := storage.JSONDeepString(payload, "object", "object")
if !hasActor || !hasTarget || self != string(userActorID) {
h.logger.Warn("unable to process accept request",
zap.String("self", self),
zap.String("actor", target),
zap.String("user", user.Name),
)
c.Status(http.StatusOK)
return
}
remoteActorID, err := h.storage.ActorRowIDForActorID(c.Request.Context(), target)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
err = h.storage.UpdateFollowingAccepted(c.Request.Context(), user.ID, remoteActorID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxReject(c *gin.Context, user *storage.User, payload storage.Payload) {
if err := h.verifySignature(c, user.ActorID); err != nil {
h.unauthorizedJSON(c, err)
return
}
userActorID := storage.NewActorID(user.Name, h.domain)
obj, hasObject := storage.JSONMap(payload, "object")
self, hasActor := storage.JSONString(obj, "actor")
target, hasTarget := storage.JSONString(obj, "object")
if !hasObject || !hasActor || !hasTarget || self != string(userActorID) {
h.logger.Warn("unable to process reject request",
zap.String("self", self),
zap.String("actor", target),
zap.String("user", user.Name),
)
c.Status(http.StatusOK)
return
}
remoteActorID, err := h.storage.ActorRowIDForActorID(c.Request.Context(), target)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
err = h.storage.UpdateFollowingRejected(c.Request.Context(), user.ID, remoteActorID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxFollow(c *gin.Context, user *storage.User, payload storage.Payload) {
objectID := storage.FirstJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
if strings.HasPrefix(objectID, common.ActorURLPrefix(h.domain)) {
h.actorInboxFollowActor(c, user, payload)
} else if h.fedConfig.AllowFollowObject && strings.HasPrefix(objectID, common.ObjectURLPrefix(h.domain)) {
h.actorInboxFollowNote(c, user, payload)
} else {
c.Status(http.StatusOK)
}
}
func (h handler) actorInboxFollowActor(c *gin.Context, user *storage.User, payload storage.Payload) {
ctx := c.Request.Context()
theActorBeingFollowed := storage.FirstJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
theActorFollowing, _ := storage.JSONString(payload, "actor")
// If the actor that is doing the following is local, bail.
if strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// If the actor that is being followed is not local, bail. Assume it is
// forwarded or just informational.
if !strings.HasPrefix(theActorBeingFollowed, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// If user is provided and the actor being followed is not the user,
// bail. Assume it is forwarded or just informational.
if theActorBeingFollowed != common.ActorURL(h.domain, user.Name) {
c.Status(http.StatusOK)
return
}
// This just ensures we have a reference of the actor and it's keys for
// the verify signature step.
theActorFollowingA, err := fed.GetOrFetchActor(ctx, h.storage, h.logger, h.httpClient, theActorFollowing)
if err != nil {
h.logger.Error("unable to fetch actor", zap.Error(err), zap.String("actor", theActorFollowing))
h.internalServerErrorJSON(c, err)
return
}
if err := h.verifySignature(c, user.ActorID); err != nil {
h.unauthorizedJSON(c, err)
return
}
theActorFollowingRowID, err := h.storage.ActorRowIDForActorID(ctx, theActorFollowing)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
err = h.storage.CreatePendingFollower(ctx, user.ID, theActorFollowingRowID, payload)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if h.fedConfig.AllowAutoAcceptFollowers && user.AcceptFollowers {
response := storage.EmptyPayload()
acceptActivityID := common.ActivityURL(h.domain, storage.NewV4())
response["@context"] = "https://www.w3.org/ns/activitystreams"
response["actor"] = storage.NewActorID(user.Name, h.domain)
response["id"] = acceptActivityID
response["object"] = payload
response["to"] = theActorFollowing
response["type"] = "Accept"
response["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
responsePayload := response.Bytes()
err = h.storage.UpdateFollowerApproved(ctx, user.ID, theActorFollowingRowID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
userActor, err := h.storage.GetActor(ctx, user.ActorID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if h.publisherClient != nil {
err = h.publisherClient.Send(context.Background(), theActorFollowingA.GetInbox(), userActor.GetKeyID(), user.PrivateKey, acceptActivityID, string(responsePayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
}
} else {
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
err = nc.SendToInbox(ctx, h.userActor(user, userActor), theActorFollowingA, responsePayload)
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
}
}
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxFollowNote(c *gin.Context, user *storage.User, payload storage.Payload) {
ctx := c.Request.Context()
theObjectBeingFollowed := storage.FirstJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
theActorFollowing, _ := storage.JSONString(payload, "actor")
// If the actor that is doing the following is local, bail.
if strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// This just ensures we have a reference of the actor and it's keys for
// the verify signature step.
theActorFollowingA, err := fed.GetOrFetchActor(ctx, h.storage, h.logger, h.httpClient, theActorFollowing)
if err != nil {
h.logger.Error("unable to fetch actor", zap.Error(err), zap.String("actor", theActorFollowing))
h.internalServerErrorJSON(c, err)
return
}
if err := h.verifySignature(c, user.ActorID); err != nil {
h.unauthorizedJSON(c, err)
return
}
var theObjectBeingFollowedRowID uuid.UUID
var userActor *storage.Actor
bail := false
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
theObjectBeingFollowedRowID, err = tx.ObjectRowIDForObjectID(ctx, theObjectBeingFollowed)
if err != nil {
return err
}
objectPayload, err := tx.ObjectPayloadByObjectRowID(ctx, theObjectBeingFollowedRowID)
if err != nil {
return err
}
// Verify the object's attributedTo actor is the same as the owner of the
// inbox this follow activity was sent to. Otherwise assume it is
// informational.
attributedTo, hasAttributedTo := storage.JSONString(objectPayload, "attributedTo")
if !hasAttributedTo || attributedTo != common.ActorURL(h.domain, user.Name) {
bail = true
return nil
}
_, err = tx.RecordObjectSubscription(ctx, theObjectBeingFollowedRowID, theActorFollowingA.ID)
if err != nil {
return err
}
userActor, err = tx.GetActor(ctx, user.ActorID)
if err != nil {
return err
}
return nil
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
if bail {
c.Status(http.StatusOK)
return
}
response := storage.EmptyPayload()
acceptActivityID := fmt.Sprintf("https://%s/activity/%s", h.domain, storage.NewV4())
response["@context"] = "https://www.w3.org/ns/activitystreams"
response["actor"] = storage.NewActorID(user.Name, h.domain)
response["id"] = acceptActivityID
response["object"] = payload
response["to"] = theActorFollowingA.GetID()
response["type"] = "Accept"
response["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
responsePayload := response.Bytes()
if h.publisherClient != nil {
err = h.publisherClient.Send(context.Background(), theActorFollowingA.GetInbox(), userActor.GetKeyID(), user.PrivateKey, acceptActivityID, string(responsePayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
}
} else {
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
err = nc.SendToInbox(ctx, h.userActor(user, userActor), theActorFollowingA, responsePayload)
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
}
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxUndoFollow(c *gin.Context, user *storage.User, payload storage.Payload) {
objectID := storage.FirstJSONDeepStrings(payload, []string{"object", "object"}, []string{"object", "object", "id"})
if strings.HasPrefix(objectID, common.ActorURLPrefix(h.domain)) {
h.actorInboxUndoFollowActor(c, user, payload)
} else if h.fedConfig.AllowFollowObject && strings.HasPrefix(objectID, common.ObjectURLPrefix(h.domain)) {
h.actorInboxUndoFollowNote(c, user, payload)
} else {
c.Status(http.StatusOK)
}
}
func (h handler) actorInboxUndoLike(c *gin.Context, user *storage.User, payload storage.Payload) {
if err := h.verifySignature(c, user.ActorID); err != nil {
h.unauthorizedJSON(c, err)
return
}
actor, hasActor := storage.JSONString(payload, "actor")
object, hasObject := storage.JSONDeepString(payload, "object", "object")
if !hasActor || !hasObject {
h.logger.Warn("unable to process undo like activity",
zap.String("actor", actor),
zap.String("object", object),
zap.String("user", user.Name),
)
c.Status(http.StatusOK)
return
}
ctx := c.Request.Context()
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
objectRowID, err := tx.ObjectRowIDForObjectID(ctx, object)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
return nil
}
return err
}
actorRowID, err := tx.ActorRowIDForActorID(ctx, actor)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
return nil
}
return err
}
return tx.RemoveObjectLike(ctx, actorRowID, objectRowID)
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxUndoFollowActor(c *gin.Context, user *storage.User, payload storage.Payload) {
ctx := c.Request.Context()
// objectID := storage.FirstJSONDeepStrings(payload, []string{"object", "object"}, []string{"object", "object", "id"})
actorFollowed, _ := storage.JSONDeepString(payload, "object", "object")
actorFollowing, _ := storage.JSONDeepString(payload, "object", "actor")
// If the actor that is doing the following is local, bail.
if strings.HasPrefix(actorFollowing, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// If the actor that is being followed is not local, bail. Assume it is
// forwarded or just informational.
if !strings.HasPrefix(actorFollowed, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// If user is provided and the actor being followed is not the user,
// bail. Assume it is forwarded or just informational.
if actorFollowed != common.ActorURL(h.domain, user.Name) {
c.Status(http.StatusOK)
return
}
if err := h.verifySignature(c, user.ActorID); err != nil {
h.unauthorizedJSON(c, err)
return
}
txErr := storage.TransactionalStorage(c.Request.Context(), h.storage, func(tx storage.Storage) error {
actorFollowingRowID, err := tx.ActorRowIDForActorID(c.Request.Context(), actorFollowing)
if err != nil {
return err
}
return tx.RemoveFollower(ctx, user.ID, actorFollowingRowID)
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxUndoFollowNote(c *gin.Context, user *storage.User, payload storage.Payload) {
ctx := c.Request.Context()
objectID := storage.FirstJSONDeepStrings(payload, []string{"object", "object"}, []string{"object", "object", "id"})
actorFollowing, _ := storage.JSONDeepString(payload, "object", "actor")
// If the actor that is doing the following is local, bail.
if strings.HasPrefix(actorFollowing, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// If the actor that is being followed is not local, bail. Assume it is
// forwarded or just informational.
if !strings.HasPrefix(objectID, common.ObjectURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
if err := h.verifySignature(c, user.ActorID); err != nil {
h.unauthorizedJSON(c, err)
return
}
txErr := storage.TransactionalStorage(c.Request.Context(), h.storage, func(tx storage.Storage) error {
actorFollowingRowID, err := tx.ActorRowIDForActorID(c.Request.Context(), actorFollowing)
if err != nil {
return err
}
objectRowID, err := tx.ObjectRowIDForObjectID(c.Request.Context(), objectID)
if err != nil {
return err
}
objectPayload, err := tx.ObjectPayloadByObjectRowID(ctx, objectRowID)
if err != nil {
return err
}
// Verify the object's attributedTo actor is the same as the owner of the
// inbox this follow activity was sent to. Otherwise assume it is
// informational.
attributedTo, hasAttributedTo := storage.JSONString(objectPayload, "attributedTo")
if !hasAttributedTo || attributedTo != common.ActorURL(h.domain, user.Name) {
return nil
}
return tx.RemoveObjectSubscription(ctx, objectRowID, actorFollowingRowID)
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxUndo(c *gin.Context, user *storage.User, payload storage.Payload) {
innerType, _ := storage.JSONDeepString(payload, "object", "type")
switch innerType {
case "Follow":
h.actorInboxUndoFollow(c, user, payload)
case "Like":
h.actorInboxUndoLike(c, user, payload)
default:
h.logger.Warn("User received unexpected undo payload type", zap.String("type", innerType), zap.String("user", user.Name))
c.Status(http.StatusOK)
}
}
func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload storage.Payload, raw []byte) {
isRelevant, err := h.isActivityRelevant(c.Request.Context(), payload, user)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if !isRelevant {
c.Status(http.StatusOK)
return
}
if err = h.verifySignature(c, user.ActorID); err != nil {
h.unauthorizedJSON(c, err)
return
}
activityID, hasActivityID := storage.JSONString(payload, "id")
actorID, hasActorID := storage.JSONString(payload, "actor")
activityObjectID, hasActivityObjectID := storage.JSONDeepString(payload, "object", "id")
activityObject, hasActivityObject := storage.JSONMap(payload, "object")
activityObjectType, hasActivityObjectType := storage.JSONDeepString(payload, "object", "type")
if !hasActivityID || !hasActorID || !hasActivityObjectID || !hasActivityObject || !hasActivityObjectType {
h.logger.Info("ignoring invalid create note activity",
zap.String("id", activityID),
zap.String("actor", actorID),
zap.String("object_id", activityObjectID),
zap.String("object_type", activityObjectType),
zap.String("user", user.Name),
)
c.Status(http.StatusOK)
return
}
if activityObjectType != "Note" {
h.logger.Warn("unknown create object type",
zap.String("id", activityID),
zap.String("actor", actorID),
zap.String("object_id", activityObjectID),
zap.String("object_type", activityObjectType),
zap.String("user", user.Name),
)
c.Status(http.StatusOK)
return
}
inReplyTo, hasInReplyTo := storage.JSONDeepString(payload, "object", "inReplyTo")
ctx := c.Request.Context()
var notifyActors []*storage.Actor
err = storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityObjectID)
if err != nil {
return err
}
// If we've seen the activity before, the upsert operations for
// recording the object event, object, user feed, and object reply
// actions should not result in new records being created. However,
// we avoid the inbox-forwarding behavior.
activityObjectRowID, err := tx.RecordObject(ctx, activityObject, activityObjectID)
if err != nil {
return err
}
activityRowID, err := tx.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
_, err = tx.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
if err != nil {
return err
}
if hasInReplyTo {
replyRowID, err := tx.ObjectRowIDForObjectID(ctx, inReplyTo)
if err == nil {
_, err = tx.RecordObjectReply(ctx, activityObjectRowID, replyRowID)
if err != nil {
return err
}
}
// If the server has inbox forwarding enabled, the replied to
// object is local, and the activity has never been seen, then get
// a list of all the actors that the object should be forwarded
// to.
// TODO: Move this action to a worker. It could be a big list.
if h.fedConfig.AllowInboxForwarding && strings.HasPrefix(inReplyTo, common.ObjectURLPrefix(h.domain)) && !activityExists {
notifyActors, err = tx.ActorsSubscribedToObject(ctx, replyRowID)
if err != nil {
return err
}
}
}
return nil
})
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if err = h.crawlQueue.Add(strings.Join([]string{user.ID.String(), activityObjectID}, ",")); err != nil {
h.logger.Warn("error queueing crawl work", zap.Error(err))
}
obj, ok := storage.JSONMap(activityObject, "object")
if ok {
attachments, ok := storage.JSONMapList(obj, "attachment")
if ok {
for _, attachment := range attachments {
mediaType, hasMediaType := storage.JSONString(attachment, "mediaType")
url, hasURL := storage.JSONString(attachment, "url")
if hasMediaType && hasURL && strings.HasPrefix(url, "https://") {
if mediaType == "image/jpeg" || mediaType == "image/png" {
if err = h.assetQueue.Add(url); err != nil {
h.logger.Warn("error queueing asset download work", zap.Error(err))
}
}
}
}
}
}
if len(notifyActors) > 0 {
userActor, err := h.storage.GetActor(ctx, user.ActorID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
for _, notifyActor := range notifyActors {
if h.publisherClient != nil {
err = h.publisherClient.Send(context.Background(), notifyActor.GetInbox(), userActor.GetKeyID(), user.PrivateKey, activityID, string(raw))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
} else {
err = nc.SendToInbox(ctx, h.userActor(user, userActor), notifyActor, raw)
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
}
}
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload storage.Payload, raw []byte) {
isRelevant, err := h.isActivityRelevant(c.Request.Context(), payload, user)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if !isRelevant {
h.logger.Debug("ignoring announce that isn't relevant.")
c.Status(http.StatusOK)
return
}
if err = h.verifySignature(c, user.ActorID); err != nil {
h.unauthorizedJSON(c, err)
return
}
userActor, err := h.storage.GetActor(c.Request.Context(), user.ActorID)
if err != nil {
h.hardFail(c, err)
return
}
activityID, hasActivityID := storage.JSONString(payload, "id")
actorID, hasActorID := storage.JSONString(payload, "actor")
activityObjectID, hasActivityObjectID := storage.JSONString(payload, "object")
if !hasActivityID || !hasActorID || !hasActivityObjectID {
h.logger.Info("ignoring invalid announce activity",
zap.String("id", activityID),
zap.String("actor", actorID),
zap.String("object_id", activityObjectID),
zap.String("user", user.Name),
)
c.Status(http.StatusOK)
return
}
ctx := c.Request.Context()
var objectPayload storage.Payload
ac := fed.ActivityClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
var senderActor *storage.Actor
var notifyActors []*storage.Actor
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityID)
if err != nil {
return err
}
h.logger.Debug("storing announce", zap.Bool("exists", activityExists), zap.String("activity", activityID))
senderActor, err = tx.GetActorByActorID(ctx, actorID)
if err != nil {
return err
}
actorType, _ := storage.JSONString(senderActor.Payload, "type")
isGroup := actorType == "Group"
// Nick: I don't like any of this.
var activityObjectRowID uuid.UUID
activityObjectRowID, err = tx.ObjectRowIDForObjectID(ctx, activityObjectID)
if err != nil {
// If the object is not local, and we don't have it recorded, try to fetch it.
if errors.Is(err, errors.NewNotFoundError(nil)) && !strings.HasPrefix(activityObjectID, common.ObjectURLPrefix(h.domain)) {
h.logger.Debug("object row not found and object is not local", zap.String("activity", activityID), zap.String("object", activityObjectID))
// If we are able to fetch the object and record the object, keep going
_, objectPayload, err = ac.GetSigned(activityObjectID, h.userActor(user, userActor))
if err != nil {
return err
}
activityObjectRowID, err = tx.RecordObject(ctx, objectPayload, activityObjectID)
if err != nil {
return err
}
} else {
return err
}
}
// TODO: Verify that the user owns the object if the object is local.
activityRowID, err := tx.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
_, err = tx.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
if err != nil {
return err
}
// If the server has inbox forwarding enabled, the replied to
// object is local, and the activity has never been seen, then get
// a list of all the actors that the object should be forwarded
// to.
// TODO: Move this action to a worker. It could be a big list.
if h.fedConfig.AllowInboxForwarding && !activityExists && !isGroup {
notifyActors, err = tx.ActorsSubscribedToObject(ctx, activityObjectRowID)
if err != nil {
return err
}
}
return nil
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
// TODO: If set activityIsNew to true and server enabled inbox
// forwarding and actor enabled inbox forwarding then find all
// actors to receive the activity and publish it to them.
if h.fedConfig.AllowInboxForwarding && len(notifyActors) > 0 {
userActor, err := h.storage.GetActor(ctx, user.ActorID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
for _, notifyActor := range notifyActors {
if h.publisherClient != nil {
err = h.publisherClient.Send(context.Background(), notifyActor.GetInbox(), userActor.GetKeyID(), user.PrivateKey, activityID, string(raw))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
} else {
err = nc.SendToInbox(ctx, h.userActor(user, userActor), notifyActor, raw)
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
}
}
}
// Crawl last so that we don't introduce weird race conditions between the crawler and inbox processing.
if err = h.crawlQueue.Add(strings.Join([]string{user.ID.String(), activityObjectID}, ",")); err != nil {
h.logger.Warn("error queueing crawl work", zap.Error(err))
}
c.Status(http.StatusOK)
}
func (h handler) actorInboxDelete(c *gin.Context, user *storage.User, payload storage.Payload, raw []byte) {
err := h.verifySignature(c, user.ActorID)
if err != nil {
h.unauthorizedJSON(c, err)
return
}
ctx := c.Request.Context()
var objectID string
var objectRowID uuid.UUID
foundObjectID := false
objectIDs := storage.CollectJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
for _, objectID = range objectIDs {
objectRowID, err = h.storage.ObjectRowIDForObjectID(ctx, objectID)
if err == nil {
foundObjectID = true
break
}
}
if !foundObjectID {
h.logger.Warn("unable to process delete request",
zap.String("user", user.Name),
)
c.Status(http.StatusOK)
return
}
activityURL, _ := storage.JSONString(payload, "id")
var tombstone storage.Payload
if object, hasObject := storage.JSONMap(payload, "object"); hasObject {
tombstone = object
}
if tombstone == nil {
// TODO: Replace this with a fetch of the object to get a proper tombstone from the remote instance.
tombstone = storage.EmptyPayload()
tombstone["type"] = "Tombstone"
// TODO: Ensure that the object is, in fact, a note.
tombstone["formerType"] = "Note"
tombstone["id"] = objectID
now := time.Now().UTC()
deletedAt := now.Format("2006-01-02T15:04:05Z")
tombstone["deleted"] = deletedAt
}
var notifyActors []*storage.Actor
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityURL)
if err != nil {
return err
}
h.logger.Debug("storing announce", zap.Bool("exists", activityExists), zap.String("activity", activityURL))
err = tx.UpdateObjectPayload(ctx, objectRowID, tombstone)
if err != nil {
return err
}
_, err = tx.RecordObjectEvent(ctx, activityURL, objectRowID, payload)
if err != nil {
return err
}
parentObjectRowID, err := tx.ParentObjectID(ctx, objectRowID)
// If there was an error and it isn't a not-found error, fail.
if err != nil && !errors.Is(err, errors.NewNotFoundError(nil)) {
return err
}
// If the server has inbox forwarding enabled and the activity has
// never been seen, then get a list of all the actors that the object
// should be forwarded to.
// TODO: Move this action to a worker. It could be a big list.
if h.fedConfig.AllowInboxForwarding && !activityExists && parentObjectRowID != uuid.Nil {
notifyActors, err = tx.ActorsSubscribedToObject(ctx, parentObjectRowID)
if err != nil {
return err
}
}
return nil
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
if h.fedConfig.AllowInboxForwarding && len(notifyActors) > 0 {
userActor, err := h.storage.GetActor(ctx, user.ActorID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
for _, notifyActor := range notifyActors {
if h.publisherClient != nil {
err = h.publisherClient.Send(context.Background(), notifyActor.GetInbox(), userActor.GetKeyID(), user.PrivateKey, activityURL, string(raw))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityURL), zap.Error(err))
}
} else {
err = nc.SendToInbox(ctx, h.userActor(user, userActor), notifyActor, raw)
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityURL), zap.Error(err))
}
}
}
}
c.Status(http.StatusOK)
}
func skipActorInbox(j storage.Payload) bool {
t, _ := storage.JSONString(j, "type")
a, _ := storage.JSONString(j, "actor")
o, _ := storage.JSONString(j, "object")
if t == "Delete" && a == o {
return true
}
return false
}
func (h handler) isActivityRelevant(ctx context.Context, activity storage.Payload, user *storage.User) (bool, error) {
actor, ok := storage.JSONString(activity, "actor")
if ok {
if err := h.webFingerQueue.Add(actor); err != nil {
h.logger.Warn("unable to add actor to webfinger queue",
zap.Error(err),
zap.String("actor", actor))
}
remoteActorID, err := h.storage.ActorRowIDForActorID(ctx, actor)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
return false, nil
}
h.logger.Debug("error looking up actor to see if activity is relevant",
zap.String("user", user.Name),
zap.String("actor", actor),
zap.Error(err))
return false, err
}
isFollowing, err := h.storage.IsInNetwork(ctx, user.ID, remoteActorID)
if err != nil {
h.logger.Debug("error determining if user follows actor",
zap.String("user", user.Name),
zap.String("actor", actor),
zap.Error(err))
return false, err
}
if isFollowing {
return true, nil
}
}
actorID := storage.NewActorID(user.Name, h.domain)
obj, ok := storage.JSONMap(activity, "object")
if ok {
to, ok := storage.JSONStrings(obj, "to")
if ok {
for _, i := range to {
if i == string(actorID) {
return true, nil
}
}
}
cc, ok := storage.JSONStrings(obj, "cc")
if ok {
for _, i := range cc {
if i == string(actorID) {
return true, nil
}
}
}
tag, ok := storage.JSONMapList(obj, "tag")
if ok {
for _, i := range tag {
tagType, hasTagType := storage.JSONString(i, "type")
href, hasHref := storage.JSONString(i, "href")
if hasTagType && hasHref && tagType == "Mention" && href == string(actorID) {
return true, nil
}
}
}
inReplyTo, ok := storage.JSONString(obj, "inReplyTo")
if ok {
exists, err := h.storage.ExistsObjectInUserFeedByObjectID(ctx, inReplyTo)
if err == nil && exists {
return true, nil
}
}
}
return false, nil
}
func (h handler) verifySignature(c *gin.Context, scope uuid.UUID) error {
// Host header isn't set for some reason.
c.Request.Header.Add("Host", c.Request.Host)
verifier, err := httpsig.NewVerifier(c.Request)
if err != nil {
return err
}
ctx := c.Request.Context()
var key *storage.Key
var actorID string
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
var err error
key, err = h.storage.GetKey(c.Request.Context(), verifier.KeyId())
if err != nil {
return err
}
actorID, err = tx.ActorIDForActorRowID(ctx, key.Actor)
if err != nil {
return err
}
return nil
})
if txErr != nil {
return txErr
}
if !allow(c.Request.Context(), h.logger, h.storage, h.serverActorRowID, scope, actorID) {
return errors.NewAccessDeniedError(nil)
}
publicKey, err := key.GetDecodedPublicKey()
if err != nil {
return err
}
if err = verifier.Verify(publicKey, httpsig.RSA_SHA256); err != nil {
return err
}
return nil
}