Implemented object replies endpoint, added reply tracking to compose/inbox/crawl, and added replies to compose note structure. Part of #35.

This commit is contained in:
Nick Gerakines 2020-03-15 16:01:05 -04:00
parent 717fe40001
commit 3d75b3b9b0
10 changed files with 233 additions and 11 deletions

23
common/urls.go Normal file
View File

@ -0,0 +1,23 @@
package common
import (
"fmt"
"github.com/gofrs/uuid"
)
func ActivityURL(domain string, activityRowID uuid.UUID) string {
return fmt.Sprintf("https://%s/activity/%s", domain, activityRowID)
}
func ObjectURL(domain string, objectRowID uuid.UUID) string {
return fmt.Sprintf("https://%s/object/%s", domain, objectRowID)
}
func ObjectRepliesURL(domain string, objectRowID uuid.UUID) string {
return fmt.Sprintf("https://%s/object/%s/replies", domain, objectRowID)
}
func ObjectRepliesPageURL(domain string, objectRowID uuid.UUID, page int) string {
return fmt.Sprintf("https://%s/object/%s/replies?page=%d", domain, objectRowID, page)
}

View File

@ -74,6 +74,37 @@ func ldJsonGet(client common.HTTPClient, location string) (string, storage.Paylo
return string(body), p, nil
}
func JRDJsonGet(client common.HTTPClient, location string) (string, storage.Payload, error) {
request, err := http.NewRequest("GET", location, nil)
if err != nil {
return "", nil, err
}
request.Header.Add("Accept", "application/jrd+json")
request.Header.Set("User-Agent", g.UserAgent())
resp, err := client.Do(request)
if err != nil {
return "", nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return "", nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1*1024*1024))
if err != nil {
return "", nil, err
}
p, err := storage.PayloadFromBytes(body)
if err != nil {
return "", nil, err
}
return string(body), p, nil
}
func ldJsonGetSigned(client common.HTTPClient, location string, signer httpsig.Signer, keyID string, privateKey *rsa.PrivateKey) (string, storage.Payload, error) {
request, err := http.NewRequest("GET", location, nil)
if err != nil {

View File

@ -60,6 +60,8 @@ func (c Crawler) Start(user *storage.User, seed string) ([]string, []string, err
counter := 0
replies := make(map[string]string)
for !activityQueue.Empty() {
if counter > c.MaxDepth {
break
@ -97,6 +99,7 @@ func (c Crawler) Start(user *storage.User, seed string) ([]string, []string, err
inReplyTo, hasInReplyTo := storage.JSONString(payload, "inReplyTo")
if hasInReplyTo {
activityQueue.Add(inReplyTo)
replies[location] = inReplyTo
}
allActors := ActorsFromObject(payload)
@ -120,6 +123,18 @@ func (c Crawler) Start(user *storage.User, seed string) ([]string, []string, err
counter++
}
for obj, parent := range replies {
objRowID, err := c.Storage.ObjectRowIDForObjectID(ctx, obj)
if err != nil {
continue
}
parentRowID, err := c.Storage.ObjectRowIDForObjectID(ctx, parent)
if err != nil {
continue
}
_, _ = c.Storage.RecordObjectReply(ctx, objRowID, parentRowID)
}
counter = 0
for !actorQueue.Empty() {

View File

@ -26,7 +26,7 @@ func (client WebFingerClient) Fetch(location string) (storage.Payload, error) {
}
client.Logger.Debug("Sending webfinger request", zap.String("url", destination))
_, payload, err := ldJsonGet(client.HTTPClient, destination)
_, payload, err := JRDJsonGet(client.HTTPClient, destination)
return payload, err
}

View File

@ -49,6 +49,11 @@ type ObjectStorage interface {
RecordUserObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error)
UserAnnouncementCounts(ctx context.Context, userID uuid.UUID, objectIDs []uuid.UUID) ([]Count, error)
IsUserPartOfConversation(ctx context.Context, userID uuid.UUID, conversation string) (bool, error)
CountObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID) (int, error)
ListObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID, limit int, offset int) ([]Payload, error)
RecordObjectReply(ctx context.Context, objectID, parentObjectID uuid.UUID) (uuid.UUID, error)
RecordObjectReplyAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID, parentObjectID uuid.UUID) (uuid.UUID, error)
}
func (s pgStorage) ListObjectPayloadsByObjectIDs(ctx context.Context, objectIDs []string) ([]Payload, error) {
@ -96,8 +101,18 @@ func (s pgStorage) ListObjectPayloadsInTagFeed(ctx context.Context, tag string)
return s.objectPayloads(ctx, query, tag)
}
func (s pgStorage) CountObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID) (int, error) {
query := `SELECT COUNT(o.payload) FROM objects o INNER JOIN object_replies r on o.id = r.object_id WHERE r.parent_object_id = $1`
return s.rowCount(s.db, ctx, query, objectID)
}
func (s pgStorage) ListObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID, limit int, offset int) ([]Payload, error) {
query := `SELECT o.payload FROM objects o INNER JOIN object_replies r on o.id = r.object_id WHERE r.parent_object_id = $1 ORDER BY r.created_at ASC LIMIT $2 OFFSET $3`
return s.objectPayloads(ctx, query, objectID, limit, offset)
}
func (s pgStorage) CountObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID) (int, error) {
query := `SELECT COUNT(o.*) FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id WHERE uoe.user_id = $1 AND uoe.public = true`
query := `SELECT COUNT(o.payload) FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id WHERE uoe.user_id = $1 AND uoe.public = true`
return s.rowCount(s.db, ctx, query, userID)
}
@ -121,8 +136,6 @@ func (s pgStorage) ListObjectPayloadsInUserTagFeed(ctx context.Context, userID u
}
func (s pgStorage) objectPayloads(ctx context.Context, query string, args ...interface{}) ([]Payload, error) {
fmt.Println(query)
fmt.Println(args...)
var results []Payload
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
@ -291,3 +304,16 @@ func (s pgStorage) IsUserPartOfConversation(ctx context.Context, userID uuid.UUI
}
return total > 0, nil
}
func (s pgStorage) RecordObjectReply(ctx context.Context, objectID, parentObjectID uuid.UUID) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordObjectReplyAll(ctx, rowID, now, now, objectID, parentObjectID)
}
func (s pgStorage) RecordObjectReplyAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID, parentObjectID uuid.UUID) (uuid.UUID, error) {
query := `INSERT INTO object_replies (id, created_at, updated_at, object_id, parent_object_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT object_replies_reply_uindex DO UPDATE SET updated_at = now() RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, objectID, parentObjectID).Scan(&id)
return id, errors.WrapInsertQueryFailedError(err)
}

View File

@ -230,6 +230,7 @@ func serverCommandAction(cliCtx *cli.Context) error {
}
root.GET("/activity/:activity", h.getActivity)
root.GET("/object/:object/replies", h.getObjectReplies)
root.GET("/object/:object", h.getObject)
root.GET("/tags/:tag", h.getTaggedObjects)

View File

@ -11,6 +11,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
"github.com/kr/pretty"
"github.com/yukimochi/httpsig"
"go.uber.org/zap"
@ -47,12 +48,16 @@ func (h handler) actorInbox(c *gin.Context) {
return
}
fmt.Println(string(body))
payload, err := storage.PayloadFromBytes(body)
if err != nil {
h.badRequestJSON(c, err)
return
}
pretty.Println(payload)
if skipActorInbox(payload) {
h.logger.Debug("actor inbox can ignore message", zap.String("user", name))
c.Status(http.StatusOK)
@ -337,6 +342,10 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
conversation, hasConversation := storage.JSONDeepString(payload, "object", "conversation")
inReplyTo, hasInReplyTo := storage.JSONDeepString(payload, "object", "inReplyTo")
localObjectPrefix := fmt.Sprintf("https://%s/object/", h.domain)
ctx := c.Request.Context()
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
@ -358,6 +367,16 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
return err
}
}
if hasInReplyTo && strings.HasPrefix(inReplyTo, localObjectPrefix) {
replyRowID, err := storage.ObjectRowIDForObjectID(ctx, inReplyTo)
if err != nil {
return err
}
_, err = storage.RecordObjectReply(ctx, activityObjectRowID, replyRowID)
if err != nil {
return err
}
}
return nil
})
if err != nil {

View File

@ -6,6 +6,7 @@ import (
"net/http"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"github.com/ngerakines/tavern/avatar"
)
@ -49,6 +50,8 @@ func (h handler) avatar(c *gin.Context) ([]byte, error) {
domain := c.Param("domain")
size := intParam(c, "size", 120)
h.logger.Debug("avatar", zap.String("name", name), zap.String("domain", domain), zap.Int("size", size))
if len(name) == 0 {
name = domain
domain = h.domain

View File

@ -133,6 +133,8 @@ func (h handler) createNote(c *gin.Context) {
inReplyTo := c.PostForm("inReplyTo")
content := c.PostForm("content")
localObjectPrefix := fmt.Sprintf("https://%s/object/", h.domain)
if advanced {
broadcastTo, _ = strconv.ParseBool(c.PostForm("broadcastTo"))
broadcastCC, _ = strconv.ParseBool(c.PostForm("broadcastCc"))
@ -153,7 +155,7 @@ func (h handler) createNote(c *gin.Context) {
return
}
activityURL := fmt.Sprintf("https://%s/activity/%s", h.domain, activityID)
activityURL := common.ActivityURL(h.domain, activityID)
publishedAt := now.Format("2006-01-02T15:04:05Z")
@ -227,7 +229,7 @@ func (h handler) createNote(c *gin.Context) {
// The spec is really vague about how note context can be used.
// note["context"] = conversation
note["conversation"] = conversation
noteURL := fmt.Sprintf("https://%s/object/%s", h.domain, createNoteID)
noteURL := common.ObjectURL(h.domain, createNoteID)
note["id"] = noteURL
note["published"] = publishedAt
note["summary"] = summary
@ -240,6 +242,27 @@ func (h handler) createNote(c *gin.Context) {
note["type"] = "Note"
note["url"] = activityURL
replies := storage.EmptyPayload()
replies["id"] = common.ObjectRepliesURL(h.domain, createNoteID)
replies["id"] = "OrderedCollection"
replies["totalItems"] = 0
replies["published"] = publishedAt
replies["first"] = common.ObjectRepliesPageURL(h.domain, createNoteID, 1)
note["replies"] = replies
/*
"replies": {
"id": "https://tavern.town/objects/799b9617-dace-4510-9b95-7b84f747008a/replies",
"type": "OrderedCollection",
"totalItems": 0,
"first": "https://tavern.town/objects/799b9617-dace-4510-9b95-7b84f747008a/replies?page=1",
"published": "2020-03-12T00:00:00Z",
"updated": "2020-03-12T00:00:00Z"
}
*/
form, err := c.MultipartForm()
if err == nil {
@ -376,6 +399,17 @@ func (h handler) createNote(c *gin.Context) {
return err
}
if len(inReplyTo) > 0 && strings.HasPrefix(inReplyTo, localObjectPrefix) {
replyRowID, err := storage.ObjectRowIDForObjectID(ctx, inReplyTo)
if err != nil {
return err
}
_, err = storage.RecordObjectReply(ctx, activityObjectRowID, replyRowID)
if err != nil {
return err
}
}
_, err = storage.RecordUserObjectEvent(ctx, user.ID, activityRowID, activityObjectRowID, isPublic)
if err != nil {
return err

View File

@ -2,21 +2,18 @@ package web
import (
"fmt"
"math"
"net/http"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
"github.com/ngerakines/tavern/storage"
)
func (h handler) getObject(c *gin.Context) {
if !requireAccept(c, "application/ld+json") {
h.writeJSONError(c, http.StatusNotAcceptable, fmt.Errorf("client does not indicate that they accept application/jrd+json responses"))
return
}
objectUUID, err := uuid.FromString(c.Param("object"))
if err != nil {
h.notFoundJSON(c, err)
@ -54,3 +51,76 @@ func (h handler) getObject(c *gin.Context) {
h.writeJSONLD(c, http.StatusOK, objectPayload)
}
func (h handler) getObjectReplies(c *gin.Context) {
ctx := c.Request.Context()
objectID := common.ObjectURL(h.domain, uuid.FromStringOrNil(c.Param("object")))
objectRowID, err := h.storage.ObjectRowIDForObjectID(ctx, objectID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
page := intParam(c, "page", 0)
limit := 50
total, err := h.storage.CountObjectPayloadsInObjectReplies(ctx, objectRowID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
lastPage := 1
if total > limit {
lastPage = int(math.Ceil(float64(total) / float64(limit)))
}
response := storage.EmptyPayload()
if page == 0 {
response["@context"] = "https://www.w3.org/ns/activitystreams"
response["id"] = common.ObjectRepliesURL(h.domain, objectRowID)
response["type"] = "OrderedCollection"
response["totalItems"] = total
if total > 0 {
response["first"] = common.ObjectRepliesPageURL(h.domain, objectRowID, 1)
response["last"] = common.ObjectRepliesPageURL(h.domain, objectRowID, lastPage)
}
h.writeJSONLD(c, http.StatusOK, response)
return
}
response["id"] = common.ObjectRepliesPageURL(h.domain, objectRowID, page)
response["type"] = "OrderedCollectionPage"
response["totalItems"] = total
response["partOf"] = common.ObjectRepliesURL(h.domain, objectRowID)
if total == 0 || page > lastPage {
response["orderedItems"] = []interface{}{}
h.writeJSONLD(c, http.StatusOK, response)
return
}
objects, err := h.storage.ListObjectPayloadsInObjectReplies(ctx, objectRowID, limit, (page-1)*limit)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if len(objects) > 0 {
response["orderedItems"] = objects
}
if page > 1 {
response["prev"] = common.ObjectRepliesPageURL(h.domain, objectRowID, page-1)
}
if page < lastPage {
response["next"] = common.ObjectRepliesPageURL(h.domain, objectRowID, page+1)
}
h.writeJSONLD(c, http.StatusOK, response)
}