tavern/storage/objects.go

428 lines
25 KiB
Go

package storage
import (
"context"
"database/sql"
"fmt"
"strings"
"time"
"github.com/gofrs/uuid"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
)
type ObjectStorage interface {
CountObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID) (int, error)
CountObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID) (int, error)
CountObjectPayloadsInLocalFeed(ctx context.Context) (int, error)
CountObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID) (int, error)
CountObjectPayloadsInTagFeed(ctx context.Context, tag string) (int, error)
CountObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID) (int, error)
ListObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error)
ListObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error)
ListObjectPayloadsByObjectIDs(ctx context.Context, objectIDs []string) ([]Payload, error)
ListObjectPayloadsInLocalFeed(ctx context.Context, limit int, offset int) ([]Payload, error)
ListObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID, limit int, offset int) ([]Payload, error)
ListObjectPayloadsInTagFeed(ctx context.Context, tag string) ([]Payload, error)
ListObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error)
ListObjectPayloadsInUserTagFeed(ctx context.Context, userID uuid.UUID, tag string, limit int, offset int) ([]Payload, error)
ObjectEventPayloadByActivityID(ctx context.Context, activityID string) (Payload, error)
ObjectPayloadByObjectID(ctx context.Context, objectID string) (Payload, error)
ObjectPayloadByObjectRowID(ctx context.Context, objectRowID uuid.UUID) (Payload, error)
ObjectPayloads(ctx context.Context, objectIDs []uuid.UUID) ([]Payload, error)
ObjectPayloadByUUID(ctx context.Context, objectIDs []uuid.UUID) (map[uuid.UUID]Payload, error)
ObjectRowIDForObjectID(ctx context.Context, objectID string) (uuid.UUID, error)
ObjectRowIDsForObjectIDs(ctx context.Context, objectID []string) (map[string]uuid.UUID, error)
RecordObject(ctx context.Context, payload Payload, objectID string) (uuid.UUID, error)
RecordObjectAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, payload Payload, objectID string) (uuid.UUID, error)
RecordObjectEvent(ctx context.Context, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error)
RecordObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error)
RecordObjectReply(ctx context.Context, objectID, parentObjectID uuid.UUID) (uuid.UUID, error)
RecordObjectReplyAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID, parentObjectID uuid.UUID) (uuid.UUID, error)
RecordObjectTag(ctx context.Context, objectID uuid.UUID, tag string) (uuid.UUID, error)
RecordObjectTagAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID uuid.UUID, tag string) (uuid.UUID, error)
RecordObjectAnnouncement(ctx context.Context, actorID, activityID, objectID uuid.UUID) (uuid.UUID, error)
RecordObjectAnnouncementAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, actorID, activityID, objectID uuid.UUID) (uuid.UUID, error)
RecordUserFeed(ctx context.Context, activityID, objectID, userID uuid.UUID) (uuid.UUID, error)
RecordUserFeedAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID, objectID, userID uuid.UUID) (uuid.UUID, error)
RecordUserObjectEvent(ctx context.Context, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error)
RecordUserObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error)
UpdateObjectPayload(ctx context.Context, objectRowID uuid.UUID, payload Payload) error
CountObjectBoostsByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error)
CountObjectBoostsByActorObjectIDs(ctx context.Context, actorRowID uuid.UUID, objectRowIDs []uuid.UUID) ([]Count, error)
CountObjectRepliesByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error)
CountObjectLikesByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error)
ExistsObjectInUserFeedByObjectID(ctx context.Context, objectID string) (bool, error)
ObjectParentsByObjectID(ctx context.Context, objectRowID uuid.UUID) (map[uuid.UUID][]uuid.UUID, error)
ObjectChildrenByObjectID(ctx context.Context, objectRowID uuid.UUID) (map[uuid.UUID][]uuid.UUID, error)
RecordObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error)
RecordObjectSubscriptionAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error)
ActorsSubscribedToObject(ctx context.Context, objectRowID uuid.UUID) ([]*Actor, error)
RemoveObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) error
ObjectExistsByObjectID(ctx context.Context, objectID string) (bool, error)
ActivityExistsByActivityID(ctx context.Context, activityID string) (bool, error)
ParentObjectID(context.Context, uuid.UUID) (uuid.UUID, error)
}
func (s pgStorage) ListObjectPayloadsByObjectIDs(ctx context.Context, objectIDs []string) ([]Payload, error) {
if len(objectIDs) == 0 {
return nil, nil
}
query := fmt.Sprintf(`SELECT o.payload FROM objects o WHERE o.object_id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
return s.objectPayloads(ctx, query, common.StringsToInterfaces(objectIDs)...)
}
func (s pgStorage) CountObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapObjectEventQueryFailedError, ctx, `SELECT COUNT(oe.payload) FROM user_feed uf INNER JOIN object_events oe on oe.id = uf.activity_id WHERE uf.user_id = $1`, userID)
}
func (s pgStorage) ListObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
query := `SELECT oe.payload FROM user_feed uf INNER JOIN object_events oe on oe.id = uf.activity_id WHERE uf.user_id = $1 ORDER BY uf.created_at DESC LIMIT $2 OFFSET $3`
return s.objectPayloads(ctx, query, userID, limit, offset)
}
func (s pgStorage) CountObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapObjectEventQueryFailedError, ctx, `SELECT COUNT(*) FROM user_object_events uoe WHERE uoe.user_id = $1`, userID)
}
func (s pgStorage) ListObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
query := `SELECT oe.payload FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id WHERE uoe.user_id = $1 ORDER BY uoe.created_at DESC LIMIT $2 OFFSET $3`
return s.objectPayloads(ctx, query, userID, limit, offset)
}
func (s pgStorage) CountObjectPayloadsInLocalFeed(ctx context.Context) (int, error) {
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, `SELECT COUNT(*) FROM user_object_events uoe WHERE uoe.public = true`)
}
func (s pgStorage) ListObjectPayloadsInLocalFeed(ctx context.Context, limit int, offset int) ([]Payload, error) {
query := `SELECT o.payload FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id INNER JOIN objects o on oe.object_id = o.id WHERE uoe.public = true ORDER BY uoe.created_at DESC LIMIT $1 OFFSET $2`
return s.objectPayloads(ctx, query, limit, offset)
}
func (s pgStorage) CountObjectPayloadsInTagFeed(ctx context.Context, tag string) (int, error) {
query := `SELECT COUNT(o.payload) FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id INNER JOIN objects o on oe.object_id = o.id INNER JOIN object_tags ot on ot.object_id = o.id WHERE uoe.public = true AND ot.tag = $1`
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, query, tag)
}
func (s pgStorage) ListObjectPayloadsInTagFeed(ctx context.Context, tag string) ([]Payload, error) {
query := `SELECT o.payload FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id INNER JOIN objects o on oe.object_id = o.id INNER JOIN object_tags ot on ot.object_id = o.id WHERE uoe.public = true AND ot.tag = $1`
return s.objectPayloads(ctx, query, tag)
}
func (s pgStorage) CountObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID) (int, error) {
query := `SELECT COUNT(o.payload) FROM objects o INNER JOIN object_replies r on o.id = r.object_id WHERE r.parent_object_id = $1`
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, query, objectID)
}
func (s pgStorage) ListObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID, limit int, offset int) ([]Payload, error) {
query := `SELECT o.payload FROM objects o INNER JOIN object_replies r on o.id = r.object_id WHERE r.parent_object_id = $1 ORDER BY r.created_at DESC LIMIT $2 OFFSET $3`
return s.objectPayloads(ctx, query, objectID, limit, offset)
}
func (s pgStorage) CountObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID) (int, error) {
query := `SELECT COUNT(o.payload) FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id WHERE uoe.user_id = $1 AND uoe.public = true`
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, query, userID)
}
func (s pgStorage) ListObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
query := `SELECT o.payload FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id WHERE uoe.user_id = $1 AND uoe.public = true ORDER BY uoe.created_at DESC LIMIT $2 OFFSET $3`
return s.objectPayloads(ctx, query, userID, limit, offset)
}
func (s pgStorage) CountObjectPayloadsInUserConversation(ctx context.Context, userID, threadID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, `SELECT COUNT(*) FROM user_conversations uc WHERE uc.user_id = $1 AND uc.thread_id = $2`, userID, threadID)
}
func (s pgStorage) ListObjectPayloadsInUserConversation(ctx context.Context, userID, threadID uuid.UUID, limit, offset int) ([]Payload, error) {
query := `SELECT o.payload FROM objects o INNER JOIN user_conversations uc ON uc.object_id = o.id WHERE uc.user_id = $1 AND uc.thread_id = $2 ORDER BY uc.created_at DESC LIMIT $3 OFFSET $4`
return s.objectPayloads(ctx, query, userID, threadID, limit, offset)
}
func (s pgStorage) ListObjectPayloadsInUserTagFeed(ctx context.Context, userID uuid.UUID, tag string, limit int, offset int) ([]Payload, error) {
query := `SELECT o.payload FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id INNER JOIN object_tags ot on o.id = ot.object_id WHERE uoe.user_id = $1 AND uoe.public = true AND ot.tag = $2 ORDER BY uoe.created_at DESC LIMIT $3 OFFSET $4`
return s.objectPayloads(ctx, query, userID, tag, limit, offset)
}
func (s pgStorage) objectPayloads(ctx context.Context, query string, args ...interface{}) ([]Payload, error) {
var results []Payload
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, errors.NewObjectSelectFailedError(err)
}
defer rows.Close()
for rows.Next() {
var data Payload
if err := rows.Scan(&data); err != nil {
return nil, errors.NewInvalidObjectError(err)
}
results = append(results, data)
}
return results, nil
}
func (s pgStorage) ObjectPayloadByObjectID(ctx context.Context, objectID string) (Payload, error) {
return s.objectPayload(ctx, `SELECT payload FROM objects WHERE object_id = $1`, objectID)
}
func (s pgStorage) ObjectPayloadByObjectRowID(ctx context.Context, objectRowID uuid.UUID) (Payload, error) {
return s.objectPayload(ctx, `SELECT payload FROM objects WHERE id = $1`, objectRowID)
}
func (s pgStorage) ObjectEventPayloadByActivityID(ctx context.Context, activityID string) (Payload, error) {
return s.objectPayload(ctx, `SELECT payload FROM object_events WHERE activity_id = $1`, activityID)
}
func (s pgStorage) objectPayload(ctx context.Context, query string, args ...interface{}) (Payload, error) {
var data Payload
err := s.db.QueryRowContext(ctx, query, args...).Scan(&data)
return data, errors.WrapObjectSelectFailedError(err)
}
func (s pgStorage) RecordObject(ctx context.Context, payload Payload, objectID string) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordObjectAll(ctx, rowID, now, now, payload, objectID)
}
func (s pgStorage) RecordObjectAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, payload Payload, objectID string) (uuid.UUID, error) {
query := `INSERT INTO objects (id, created_at, updated_at, payload, object_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT objects_object_uindex DO UPDATE SET updated_at = $2 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, payload, objectID).Scan(&id)
return id, errors.WrapObjectUpsertFailedError(err)
}
func (s pgStorage) RecordObjectEvent(ctx context.Context, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordObjectEventAll(ctx, rowID, now, now, activityID, objectID, payload)
}
func (s pgStorage) RecordObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error) {
query := `INSERT INTO object_events (id, created_at, updated_at, activity_id, object_id, payload) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT object_events_activity_uindex DO UPDATE SET updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, activityID, objectID, payload).Scan(&id)
return id, errors.WrapObjectEventUpsertFailedError(err)
}
func (s pgStorage) RecordUserObjectEvent(ctx context.Context, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordUserObjectEventAll(ctx, rowID, now, now, userID, activityID, objectID, public)
}
func (s pgStorage) RecordUserObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error) {
query := `INSERT INTO user_object_events (id, created_at, updated_at, user_id, activity_id, object_id, public) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT ON CONSTRAINT user_object_events_user_activity_uindex DO UPDATE SET updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, userID, activityID, objectID, public).Scan(&id)
return id, errors.WrapUserObjectEventUpsertFailedError(err)
}
func (s pgStorage) RecordObjectTag(ctx context.Context, objectID uuid.UUID, tag string) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordObjectTagAll(ctx, rowID, now, now, objectID, tag)
}
func (s pgStorage) RecordObjectTagAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID uuid.UUID, tag string) (uuid.UUID, error) {
query := `INSERT INTO object_tags (id, created_at, updated_at, object_id, tag) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT object_tags_tagged_uindex DO UPDATE SET updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, objectID, tag).Scan(&id)
return id, errors.WrapObjectTagUpsertFailedError(err)
}
func (s pgStorage) RecordUserFeed(ctx context.Context, activityID, objectID, userID uuid.UUID) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordUserFeedAll(ctx, rowID, now, now, activityID, objectID, userID)
}
func (s pgStorage) RecordUserFeedAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID, objectID, userID uuid.UUID) (uuid.UUID, error) {
query := `INSERT INTO user_feed (id, created_at, updated_at, activity_id, object_id, user_id) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT user_feed_activity_uindex DO UPDATE SET updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, activityID, objectID, userID).Scan(&id)
return id, errors.WrapUserFeedUpsertFailedError(err)
}
func (s pgStorage) ObjectRowIDForObjectID(ctx context.Context, objectID string) (uuid.UUID, error) {
var rowID uuid.UUID
err := s.db.QueryRowContext(ctx, `SELECT id FROM objects WHERE object_id = $1`, objectID).Scan(&rowID)
if err != nil {
if err == sql.ErrNoRows {
return uuid.Nil, errors.NewObjectNotFoundError(errors.NewObjectQueryFailedError(err))
}
return uuid.Nil, errors.NewObjectQueryFailedError(err)
}
return rowID, nil
}
func (s pgStorage) ObjectRowIDsForObjectIDs(ctx context.Context, objectIDs []string) (map[string]uuid.UUID, error) {
if len(objectIDs) == 0 {
return nil, nil
}
query := fmt.Sprintf(`SELECT o.object_id, o.id FROM objects o WHERE o.object_id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
return s.keysToUUID(errors.WrapObjectQueryFailedError, ctx, query, common.StringsToInterfaces(objectIDs)...)
}
func (s pgStorage) ObjectPayloadByUUID(ctx context.Context, objectIDs []uuid.UUID) (map[uuid.UUID]Payload, error) {
if len(objectIDs) == 0 {
return nil, nil
}
query := fmt.Sprintf(`SELECT o.id, o.payload FROM objects o WHERE o.id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
return s.uuidsToPayload(errors.WrapObjectQueryFailedError, ctx, query, common.UUIDsToInterfaces(objectIDs)...)
}
func (s pgStorage) ObjectPayloads(ctx context.Context, objectIDs []uuid.UUID) ([]Payload, error) {
if len(objectIDs) == 0 {
return nil, nil
}
query := fmt.Sprintf(`SELECT o.payload FROM objects o WHERE o.id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
return s.objectPayloads(ctx, query, common.UUIDsToInterfaces(objectIDs)...)
}
func (s pgStorage) RecordObjectAnnouncement(ctx context.Context, actorID, activityID, objectID uuid.UUID) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordObjectAnnouncementAll(ctx, rowID, now, now, actorID, activityID, objectID)
}
func (s pgStorage) RecordObjectAnnouncementAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, actorID, activityID, objectID uuid.UUID) (uuid.UUID, error) {
query := `INSERT INTO object_boosts (id, created_at, updated_at, actor_id, activity_id, object_id) VALUES ($1, $2, $3, $4, $5, $6) RETURNING ID`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, actorID, activityID, objectID).Scan(&id)
return id, errors.WrapObjectBoostInsertFailedError(err)
}
func (s pgStorage) CountObjectBoostsByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error) {
if len(objectIDs) == 0 {
return []Count{}, nil
}
placeholders := common.DollarForEach(len(objectIDs))
query := fmt.Sprintf(`SELECT object_id, COUNT(*) FROM object_boosts WHERE object_id IN (%s) GROUP BY object_id`, strings.Join(placeholders, ","))
return s.keyedCount(errors.WrapObjectBoostQueryFailedError, ctx, query, common.UUIDsToInterfaces(objectIDs)...)
}
func (s pgStorage) CountObjectLikesByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error) {
if len(objectIDs) == 0 {
return []Count{}, nil
}
placeholders := common.DollarForEach(len(objectIDs))
query := fmt.Sprintf(`SELECT object_id, COUNT(*) FROM object_likes WHERE object_id IN (%s) GROUP BY object_id`, strings.Join(placeholders, ","))
return s.keyedCount(errors.WrapObjectBoostQueryFailedError, ctx, query, common.UUIDsToInterfaces(objectIDs)...)
}
func (s pgStorage) CountObjectRepliesByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error) {
if len(objectIDs) == 0 {
return []Count{}, nil
}
placeholders := common.DollarForEach(len(objectIDs))
query := fmt.Sprintf(`SELECT parent_object_id, COUNT(*) FROM object_replies WHERE parent_object_id IN (%s) GROUP BY parent_object_id`, strings.Join(placeholders, ","))
return s.keyedCount(errors.WrapObjectReplyQueryFailedError, ctx, query, common.UUIDsToInterfaces(objectIDs)...)
}
func (s pgStorage) CountObjectBoostsByActorObjectIDs(ctx context.Context, actorRowID uuid.UUID, objectRowIDs []uuid.UUID) ([]Count, error) {
if len(objectRowIDs) == 0 {
return []Count{}, nil
}
placeholders := common.MapStrings(common.StringIntRange(2, len(objectRowIDs)+1), common.Dollar)
query := fmt.Sprintf(`SELECT object_id, COUNT(*) FROM object_boosts WHERE actor_id = $1 AND object_id IN (%s) GROUP BY object_id`, strings.Join(placeholders, ","))
return s.keyedCount(errors.WrapObjectBoostQueryFailedError, ctx, query, append([]interface{}{actorRowID}, common.UUIDsToInterfaces(objectRowIDs)...)...)
}
func (s pgStorage) RecordObjectReply(ctx context.Context, objectID, parentObjectID uuid.UUID) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordObjectReplyAll(ctx, rowID, now, now, objectID, parentObjectID)
}
func (s pgStorage) RecordObjectReplyAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID, parentObjectID uuid.UUID) (uuid.UUID, error) {
query := `INSERT INTO object_replies (id, created_at, updated_at, object_id, parent_object_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT object_replies_reply_uindex DO UPDATE SET updated_at = now() RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, objectID, parentObjectID).Scan(&id)
return id, errors.WrapObjectReplyUpsertFailedError(err)
}
func (s pgStorage) UpdateObjectPayload(ctx context.Context, objectRowID uuid.UUID, payload Payload) error {
now := s.now()
_, err := s.db.ExecContext(ctx, "UPDATE objects SET payload = $3, updated_at = $2 WHERE id = $1", objectRowID, now, payload)
return errors.WrapUpdateQueryFailedError(err)
}
func (s pgStorage) ObjectParentsByObjectID(ctx context.Context, objectRowID uuid.UUID) (map[uuid.UUID][]uuid.UUID, error) {
query := `WITH RECURSIVE object_parents(n) AS (
SELECT 1, o.object_id, o.parent_object_id
FROM object_replies o
WHERE o.object_id = $1
UNION
SELECT n + 1, r.object_id, r.parent_object_id
FROM object_replies r
INNER JOIN object_parents p ON p.parent_object_id = r.object_id
WHERE n < 10
)
SELECT object_id, parent_object_id
FROM object_parents`
return s.toUUIDMultiMap(errors.WrapObjectReplySelectFailedError, ctx, query, objectRowID)
}
func (s pgStorage) ObjectChildrenByObjectID(ctx context.Context, objectRowID uuid.UUID) (map[uuid.UUID][]uuid.UUID, error) {
query := `WITH RECURSIVE object_children(n) AS (
SELECT 1, o.parent_object_id, o.object_id
FROM object_replies o
WHERE o.parent_object_id = $1
UNION
SELECT n + 1, r.parent_object_id, r.object_id
FROM object_replies r
INNER JOIN object_children p ON p.object_id = r.parent_object_id
WHERE n < 10
)
SELECT parent_object_id, object_id
FROM object_children`
return s.toUUIDMultiMap(errors.WrapObjectReplySelectFailedError, ctx, query, objectRowID)
}
func (s pgStorage) ExistsObjectInUserFeedByObjectID(ctx context.Context, objectID string) (bool, error) {
query := `SELECT COUNT(o.*) FROM objects o INNER JOIN user_feed uf on o.id = uf.object_id WHERE o.object_id = $1`
return s.wrappedExists(errors.WrapObjectQueryFailedError, ctx, query, objectID)
}
func (s pgStorage) RecordObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordObjectSubscriptionAll(ctx, rowID, now, now, objectRowID, actorRowID)
}
func (s pgStorage) RecordObjectSubscriptionAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error) {
query := `INSERT INTO object_subscriptions (id, created_at, updated_at, object_id, actor_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT object_subscriptions_uindex DO UPDATE SET updated_at = now() RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, objectRowID, actorRowID).Scan(&id)
return id, errors.WrapObjectReplyUpsertFailedError(err)
}
func (s pgStorage) ActorsSubscribedToObject(ctx context.Context, objectRowID uuid.UUID) ([]*Actor, error) {
query := actorsSelectQuery("object_subscriptions os on a.id = os.actor_id", []string{"os.object_id = $1"})
return s.getActors(ctx, query, objectRowID)
}
func (s pgStorage) RemoveObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) error {
_, err := s.db.ExecContext(ctx, `DELETE FROM object_subscriptions WHERE actor_id = $1 AND object_id = $2`, actorRowID, objectRowID)
return errors.WrapNetworkRelationshipUpdateFailedError(err)
}
func (s pgStorage) ObjectExistsByObjectID(ctx context.Context, objectID string) (bool, error) {
query := `SELECT COUNT(*) FROM objects WHERE object_id = $1`
return s.wrappedExists(errors.WrapObjectQueryFailedError, ctx, query, objectID)
}
func (s pgStorage) ActivityExistsByActivityID(ctx context.Context, activityID string) (bool, error) {
query := `SELECT COUNT(*) FROM object_events WHERE activity_id = $1`
return s.wrappedExists(errors.WrapObjectQueryFailedError, ctx, query, activityID)
}
func (s pgStorage) ParentObjectID(ctx context.Context, objectRowID uuid.UUID) (uuid.UUID, error) {
return s.wrappedSelectUUID(errors.WrapObjectReplySelectFailedError, ctx, `SELECT parent_object_id FROM object_replies WHERE object_id = $1`, objectRowID)
}