mirror of https://gitlab.com/ngerakines/tavern.git
428 lines
25 KiB
Go
428 lines
25 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gofrs/uuid"
|
|
|
|
"github.com/ngerakines/tavern/common"
|
|
"github.com/ngerakines/tavern/errors"
|
|
)
|
|
|
|
type ObjectStorage interface {
|
|
CountObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID) (int, error)
|
|
CountObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID) (int, error)
|
|
CountObjectPayloadsInLocalFeed(ctx context.Context) (int, error)
|
|
CountObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID) (int, error)
|
|
CountObjectPayloadsInTagFeed(ctx context.Context, tag string) (int, error)
|
|
CountObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID) (int, error)
|
|
ListObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error)
|
|
ListObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error)
|
|
ListObjectPayloadsByObjectIDs(ctx context.Context, objectIDs []string) ([]Payload, error)
|
|
ListObjectPayloadsInLocalFeed(ctx context.Context, limit int, offset int) ([]Payload, error)
|
|
ListObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID, limit int, offset int) ([]Payload, error)
|
|
ListObjectPayloadsInTagFeed(ctx context.Context, tag string) ([]Payload, error)
|
|
ListObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error)
|
|
ListObjectPayloadsInUserTagFeed(ctx context.Context, userID uuid.UUID, tag string, limit int, offset int) ([]Payload, error)
|
|
ObjectEventPayloadByActivityID(ctx context.Context, activityID string) (Payload, error)
|
|
ObjectPayloadByObjectID(ctx context.Context, objectID string) (Payload, error)
|
|
ObjectPayloadByObjectRowID(ctx context.Context, objectRowID uuid.UUID) (Payload, error)
|
|
ObjectPayloads(ctx context.Context, objectIDs []uuid.UUID) ([]Payload, error)
|
|
ObjectPayloadByUUID(ctx context.Context, objectIDs []uuid.UUID) (map[uuid.UUID]Payload, error)
|
|
ObjectRowIDForObjectID(ctx context.Context, objectID string) (uuid.UUID, error)
|
|
ObjectRowIDsForObjectIDs(ctx context.Context, objectID []string) (map[string]uuid.UUID, error)
|
|
RecordObject(ctx context.Context, payload Payload, objectID string) (uuid.UUID, error)
|
|
RecordObjectAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, payload Payload, objectID string) (uuid.UUID, error)
|
|
RecordObjectEvent(ctx context.Context, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error)
|
|
RecordObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error)
|
|
RecordObjectReply(ctx context.Context, objectID, parentObjectID uuid.UUID) (uuid.UUID, error)
|
|
RecordObjectReplyAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID, parentObjectID uuid.UUID) (uuid.UUID, error)
|
|
RecordObjectTag(ctx context.Context, objectID uuid.UUID, tag string) (uuid.UUID, error)
|
|
RecordObjectTagAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID uuid.UUID, tag string) (uuid.UUID, error)
|
|
RecordObjectAnnouncement(ctx context.Context, actorID, activityID, objectID uuid.UUID) (uuid.UUID, error)
|
|
RecordObjectAnnouncementAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, actorID, activityID, objectID uuid.UUID) (uuid.UUID, error)
|
|
RecordUserFeed(ctx context.Context, activityID, objectID, userID uuid.UUID) (uuid.UUID, error)
|
|
RecordUserFeedAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID, objectID, userID uuid.UUID) (uuid.UUID, error)
|
|
RecordUserObjectEvent(ctx context.Context, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error)
|
|
RecordUserObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error)
|
|
UpdateObjectPayload(ctx context.Context, objectRowID uuid.UUID, payload Payload) error
|
|
CountObjectBoostsByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error)
|
|
CountObjectBoostsByActorObjectIDs(ctx context.Context, actorRowID uuid.UUID, objectRowIDs []uuid.UUID) ([]Count, error)
|
|
CountObjectRepliesByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error)
|
|
CountObjectLikesByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error)
|
|
ExistsObjectInUserFeedByObjectID(ctx context.Context, objectID string) (bool, error)
|
|
ObjectParentsByObjectID(ctx context.Context, objectRowID uuid.UUID) (map[uuid.UUID][]uuid.UUID, error)
|
|
ObjectChildrenByObjectID(ctx context.Context, objectRowID uuid.UUID) (map[uuid.UUID][]uuid.UUID, error)
|
|
|
|
RecordObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error)
|
|
RecordObjectSubscriptionAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error)
|
|
ActorsSubscribedToObject(ctx context.Context, objectRowID uuid.UUID) ([]*Actor, error)
|
|
RemoveObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) error
|
|
|
|
ObjectExistsByObjectID(ctx context.Context, objectID string) (bool, error)
|
|
ActivityExistsByActivityID(ctx context.Context, activityID string) (bool, error)
|
|
|
|
ParentObjectID(context.Context, uuid.UUID) (uuid.UUID, error)
|
|
}
|
|
|
|
func (s pgStorage) ListObjectPayloadsByObjectIDs(ctx context.Context, objectIDs []string) ([]Payload, error) {
|
|
if len(objectIDs) == 0 {
|
|
return nil, nil
|
|
}
|
|
query := fmt.Sprintf(`SELECT o.payload FROM objects o WHERE o.object_id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
|
|
return s.objectPayloads(ctx, query, common.StringsToInterfaces(objectIDs)...)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID) (int, error) {
|
|
return s.wrappedRowCount(errors.WrapObjectEventQueryFailedError, ctx, `SELECT COUNT(oe.payload) FROM user_feed uf INNER JOIN object_events oe on oe.id = uf.activity_id WHERE uf.user_id = $1`, userID)
|
|
}
|
|
|
|
func (s pgStorage) ListObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
|
|
query := `SELECT oe.payload FROM user_feed uf INNER JOIN object_events oe on oe.id = uf.activity_id WHERE uf.user_id = $1 ORDER BY uf.created_at DESC LIMIT $2 OFFSET $3`
|
|
return s.objectPayloads(ctx, query, userID, limit, offset)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID) (int, error) {
|
|
return s.wrappedRowCount(errors.WrapObjectEventQueryFailedError, ctx, `SELECT COUNT(*) FROM user_object_events uoe WHERE uoe.user_id = $1`, userID)
|
|
}
|
|
|
|
func (s pgStorage) ListObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
|
|
query := `SELECT oe.payload FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id WHERE uoe.user_id = $1 ORDER BY uoe.created_at DESC LIMIT $2 OFFSET $3`
|
|
return s.objectPayloads(ctx, query, userID, limit, offset)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectPayloadsInLocalFeed(ctx context.Context) (int, error) {
|
|
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, `SELECT COUNT(*) FROM user_object_events uoe WHERE uoe.public = true`)
|
|
}
|
|
|
|
func (s pgStorage) ListObjectPayloadsInLocalFeed(ctx context.Context, limit int, offset int) ([]Payload, error) {
|
|
query := `SELECT o.payload FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id INNER JOIN objects o on oe.object_id = o.id WHERE uoe.public = true ORDER BY uoe.created_at DESC LIMIT $1 OFFSET $2`
|
|
return s.objectPayloads(ctx, query, limit, offset)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectPayloadsInTagFeed(ctx context.Context, tag string) (int, error) {
|
|
query := `SELECT COUNT(o.payload) FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id INNER JOIN objects o on oe.object_id = o.id INNER JOIN object_tags ot on ot.object_id = o.id WHERE uoe.public = true AND ot.tag = $1`
|
|
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, query, tag)
|
|
}
|
|
|
|
func (s pgStorage) ListObjectPayloadsInTagFeed(ctx context.Context, tag string) ([]Payload, error) {
|
|
query := `SELECT o.payload FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id INNER JOIN objects o on oe.object_id = o.id INNER JOIN object_tags ot on ot.object_id = o.id WHERE uoe.public = true AND ot.tag = $1`
|
|
return s.objectPayloads(ctx, query, tag)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID) (int, error) {
|
|
query := `SELECT COUNT(o.payload) FROM objects o INNER JOIN object_replies r on o.id = r.object_id WHERE r.parent_object_id = $1`
|
|
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, query, objectID)
|
|
}
|
|
|
|
func (s pgStorage) ListObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID, limit int, offset int) ([]Payload, error) {
|
|
query := `SELECT o.payload FROM objects o INNER JOIN object_replies r on o.id = r.object_id WHERE r.parent_object_id = $1 ORDER BY r.created_at DESC LIMIT $2 OFFSET $3`
|
|
return s.objectPayloads(ctx, query, objectID, limit, offset)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID) (int, error) {
|
|
query := `SELECT COUNT(o.payload) FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id WHERE uoe.user_id = $1 AND uoe.public = true`
|
|
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, query, userID)
|
|
}
|
|
|
|
func (s pgStorage) ListObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
|
|
query := `SELECT o.payload FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id WHERE uoe.user_id = $1 AND uoe.public = true ORDER BY uoe.created_at DESC LIMIT $2 OFFSET $3`
|
|
return s.objectPayloads(ctx, query, userID, limit, offset)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectPayloadsInUserConversation(ctx context.Context, userID, threadID uuid.UUID) (int, error) {
|
|
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, `SELECT COUNT(*) FROM user_conversations uc WHERE uc.user_id = $1 AND uc.thread_id = $2`, userID, threadID)
|
|
}
|
|
|
|
func (s pgStorage) ListObjectPayloadsInUserConversation(ctx context.Context, userID, threadID uuid.UUID, limit, offset int) ([]Payload, error) {
|
|
query := `SELECT o.payload FROM objects o INNER JOIN user_conversations uc ON uc.object_id = o.id WHERE uc.user_id = $1 AND uc.thread_id = $2 ORDER BY uc.created_at DESC LIMIT $3 OFFSET $4`
|
|
return s.objectPayloads(ctx, query, userID, threadID, limit, offset)
|
|
}
|
|
|
|
func (s pgStorage) ListObjectPayloadsInUserTagFeed(ctx context.Context, userID uuid.UUID, tag string, limit int, offset int) ([]Payload, error) {
|
|
query := `SELECT o.payload FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id INNER JOIN object_tags ot on o.id = ot.object_id WHERE uoe.user_id = $1 AND uoe.public = true AND ot.tag = $2 ORDER BY uoe.created_at DESC LIMIT $3 OFFSET $4`
|
|
return s.objectPayloads(ctx, query, userID, tag, limit, offset)
|
|
}
|
|
|
|
func (s pgStorage) objectPayloads(ctx context.Context, query string, args ...interface{}) ([]Payload, error) {
|
|
var results []Payload
|
|
rows, err := s.db.QueryContext(ctx, query, args...)
|
|
if err != nil {
|
|
return nil, errors.NewObjectSelectFailedError(err)
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
var data Payload
|
|
if err := rows.Scan(&data); err != nil {
|
|
return nil, errors.NewInvalidObjectError(err)
|
|
}
|
|
results = append(results, data)
|
|
}
|
|
return results, nil
|
|
}
|
|
|
|
func (s pgStorage) ObjectPayloadByObjectID(ctx context.Context, objectID string) (Payload, error) {
|
|
return s.objectPayload(ctx, `SELECT payload FROM objects WHERE object_id = $1`, objectID)
|
|
}
|
|
|
|
func (s pgStorage) ObjectPayloadByObjectRowID(ctx context.Context, objectRowID uuid.UUID) (Payload, error) {
|
|
return s.objectPayload(ctx, `SELECT payload FROM objects WHERE id = $1`, objectRowID)
|
|
}
|
|
|
|
func (s pgStorage) ObjectEventPayloadByActivityID(ctx context.Context, activityID string) (Payload, error) {
|
|
return s.objectPayload(ctx, `SELECT payload FROM object_events WHERE activity_id = $1`, activityID)
|
|
}
|
|
|
|
func (s pgStorage) objectPayload(ctx context.Context, query string, args ...interface{}) (Payload, error) {
|
|
var data Payload
|
|
err := s.db.QueryRowContext(ctx, query, args...).Scan(&data)
|
|
return data, errors.WrapObjectSelectFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) RecordObject(ctx context.Context, payload Payload, objectID string) (uuid.UUID, error) {
|
|
rowID := NewV4()
|
|
now := s.now()
|
|
return s.RecordObjectAll(ctx, rowID, now, now, payload, objectID)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, payload Payload, objectID string) (uuid.UUID, error) {
|
|
query := `INSERT INTO objects (id, created_at, updated_at, payload, object_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT objects_object_uindex DO UPDATE SET updated_at = $2 RETURNING id`
|
|
var id uuid.UUID
|
|
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, payload, objectID).Scan(&id)
|
|
return id, errors.WrapObjectUpsertFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectEvent(ctx context.Context, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error) {
|
|
rowID := NewV4()
|
|
now := s.now()
|
|
return s.RecordObjectEventAll(ctx, rowID, now, now, activityID, objectID, payload)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error) {
|
|
query := `INSERT INTO object_events (id, created_at, updated_at, activity_id, object_id, payload) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT object_events_activity_uindex DO UPDATE SET updated_at = $3 RETURNING id`
|
|
var id uuid.UUID
|
|
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, activityID, objectID, payload).Scan(&id)
|
|
return id, errors.WrapObjectEventUpsertFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) RecordUserObjectEvent(ctx context.Context, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error) {
|
|
rowID := NewV4()
|
|
now := s.now()
|
|
return s.RecordUserObjectEventAll(ctx, rowID, now, now, userID, activityID, objectID, public)
|
|
}
|
|
|
|
func (s pgStorage) RecordUserObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error) {
|
|
query := `INSERT INTO user_object_events (id, created_at, updated_at, user_id, activity_id, object_id, public) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT ON CONSTRAINT user_object_events_user_activity_uindex DO UPDATE SET updated_at = $3 RETURNING id`
|
|
var id uuid.UUID
|
|
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, userID, activityID, objectID, public).Scan(&id)
|
|
return id, errors.WrapUserObjectEventUpsertFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectTag(ctx context.Context, objectID uuid.UUID, tag string) (uuid.UUID, error) {
|
|
rowID := NewV4()
|
|
now := s.now()
|
|
return s.RecordObjectTagAll(ctx, rowID, now, now, objectID, tag)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectTagAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID uuid.UUID, tag string) (uuid.UUID, error) {
|
|
query := `INSERT INTO object_tags (id, created_at, updated_at, object_id, tag) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT object_tags_tagged_uindex DO UPDATE SET updated_at = $3 RETURNING id`
|
|
var id uuid.UUID
|
|
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, objectID, tag).Scan(&id)
|
|
return id, errors.WrapObjectTagUpsertFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) RecordUserFeed(ctx context.Context, activityID, objectID, userID uuid.UUID) (uuid.UUID, error) {
|
|
rowID := NewV4()
|
|
now := s.now()
|
|
return s.RecordUserFeedAll(ctx, rowID, now, now, activityID, objectID, userID)
|
|
}
|
|
|
|
func (s pgStorage) RecordUserFeedAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID, objectID, userID uuid.UUID) (uuid.UUID, error) {
|
|
query := `INSERT INTO user_feed (id, created_at, updated_at, activity_id, object_id, user_id) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT user_feed_activity_uindex DO UPDATE SET updated_at = $3 RETURNING id`
|
|
var id uuid.UUID
|
|
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, activityID, objectID, userID).Scan(&id)
|
|
return id, errors.WrapUserFeedUpsertFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) ObjectRowIDForObjectID(ctx context.Context, objectID string) (uuid.UUID, error) {
|
|
var rowID uuid.UUID
|
|
err := s.db.QueryRowContext(ctx, `SELECT id FROM objects WHERE object_id = $1`, objectID).Scan(&rowID)
|
|
if err != nil {
|
|
if err == sql.ErrNoRows {
|
|
return uuid.Nil, errors.NewObjectNotFoundError(errors.NewObjectQueryFailedError(err))
|
|
}
|
|
return uuid.Nil, errors.NewObjectQueryFailedError(err)
|
|
}
|
|
return rowID, nil
|
|
}
|
|
|
|
func (s pgStorage) ObjectRowIDsForObjectIDs(ctx context.Context, objectIDs []string) (map[string]uuid.UUID, error) {
|
|
if len(objectIDs) == 0 {
|
|
return nil, nil
|
|
}
|
|
query := fmt.Sprintf(`SELECT o.object_id, o.id FROM objects o WHERE o.object_id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
|
|
return s.keysToUUID(errors.WrapObjectQueryFailedError, ctx, query, common.StringsToInterfaces(objectIDs)...)
|
|
}
|
|
|
|
func (s pgStorage) ObjectPayloadByUUID(ctx context.Context, objectIDs []uuid.UUID) (map[uuid.UUID]Payload, error) {
|
|
if len(objectIDs) == 0 {
|
|
return nil, nil
|
|
}
|
|
query := fmt.Sprintf(`SELECT o.id, o.payload FROM objects o WHERE o.id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
|
|
return s.uuidsToPayload(errors.WrapObjectQueryFailedError, ctx, query, common.UUIDsToInterfaces(objectIDs)...)
|
|
}
|
|
|
|
func (s pgStorage) ObjectPayloads(ctx context.Context, objectIDs []uuid.UUID) ([]Payload, error) {
|
|
if len(objectIDs) == 0 {
|
|
return nil, nil
|
|
}
|
|
query := fmt.Sprintf(`SELECT o.payload FROM objects o WHERE o.id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
|
|
return s.objectPayloads(ctx, query, common.UUIDsToInterfaces(objectIDs)...)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectAnnouncement(ctx context.Context, actorID, activityID, objectID uuid.UUID) (uuid.UUID, error) {
|
|
rowID := NewV4()
|
|
now := s.now()
|
|
return s.RecordObjectAnnouncementAll(ctx, rowID, now, now, actorID, activityID, objectID)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectAnnouncementAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, actorID, activityID, objectID uuid.UUID) (uuid.UUID, error) {
|
|
query := `INSERT INTO object_boosts (id, created_at, updated_at, actor_id, activity_id, object_id) VALUES ($1, $2, $3, $4, $5, $6) RETURNING ID`
|
|
var id uuid.UUID
|
|
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, actorID, activityID, objectID).Scan(&id)
|
|
return id, errors.WrapObjectBoostInsertFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectBoostsByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error) {
|
|
if len(objectIDs) == 0 {
|
|
return []Count{}, nil
|
|
}
|
|
placeholders := common.DollarForEach(len(objectIDs))
|
|
query := fmt.Sprintf(`SELECT object_id, COUNT(*) FROM object_boosts WHERE object_id IN (%s) GROUP BY object_id`, strings.Join(placeholders, ","))
|
|
return s.keyedCount(errors.WrapObjectBoostQueryFailedError, ctx, query, common.UUIDsToInterfaces(objectIDs)...)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectLikesByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error) {
|
|
if len(objectIDs) == 0 {
|
|
return []Count{}, nil
|
|
}
|
|
placeholders := common.DollarForEach(len(objectIDs))
|
|
query := fmt.Sprintf(`SELECT object_id, COUNT(*) FROM object_likes WHERE object_id IN (%s) GROUP BY object_id`, strings.Join(placeholders, ","))
|
|
return s.keyedCount(errors.WrapObjectBoostQueryFailedError, ctx, query, common.UUIDsToInterfaces(objectIDs)...)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectRepliesByObjectIDs(ctx context.Context, objectIDs []uuid.UUID) ([]Count, error) {
|
|
if len(objectIDs) == 0 {
|
|
return []Count{}, nil
|
|
}
|
|
placeholders := common.DollarForEach(len(objectIDs))
|
|
query := fmt.Sprintf(`SELECT parent_object_id, COUNT(*) FROM object_replies WHERE parent_object_id IN (%s) GROUP BY parent_object_id`, strings.Join(placeholders, ","))
|
|
return s.keyedCount(errors.WrapObjectReplyQueryFailedError, ctx, query, common.UUIDsToInterfaces(objectIDs)...)
|
|
}
|
|
|
|
func (s pgStorage) CountObjectBoostsByActorObjectIDs(ctx context.Context, actorRowID uuid.UUID, objectRowIDs []uuid.UUID) ([]Count, error) {
|
|
if len(objectRowIDs) == 0 {
|
|
return []Count{}, nil
|
|
}
|
|
placeholders := common.MapStrings(common.StringIntRange(2, len(objectRowIDs)+1), common.Dollar)
|
|
query := fmt.Sprintf(`SELECT object_id, COUNT(*) FROM object_boosts WHERE actor_id = $1 AND object_id IN (%s) GROUP BY object_id`, strings.Join(placeholders, ","))
|
|
return s.keyedCount(errors.WrapObjectBoostQueryFailedError, ctx, query, append([]interface{}{actorRowID}, common.UUIDsToInterfaces(objectRowIDs)...)...)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectReply(ctx context.Context, objectID, parentObjectID uuid.UUID) (uuid.UUID, error) {
|
|
rowID := NewV4()
|
|
now := s.now()
|
|
return s.RecordObjectReplyAll(ctx, rowID, now, now, objectID, parentObjectID)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectReplyAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID, parentObjectID uuid.UUID) (uuid.UUID, error) {
|
|
query := `INSERT INTO object_replies (id, created_at, updated_at, object_id, parent_object_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT object_replies_reply_uindex DO UPDATE SET updated_at = now() RETURNING id`
|
|
var id uuid.UUID
|
|
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, objectID, parentObjectID).Scan(&id)
|
|
return id, errors.WrapObjectReplyUpsertFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) UpdateObjectPayload(ctx context.Context, objectRowID uuid.UUID, payload Payload) error {
|
|
now := s.now()
|
|
_, err := s.db.ExecContext(ctx, "UPDATE objects SET payload = $3, updated_at = $2 WHERE id = $1", objectRowID, now, payload)
|
|
return errors.WrapUpdateQueryFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) ObjectParentsByObjectID(ctx context.Context, objectRowID uuid.UUID) (map[uuid.UUID][]uuid.UUID, error) {
|
|
query := `WITH RECURSIVE object_parents(n) AS (
|
|
SELECT 1, o.object_id, o.parent_object_id
|
|
FROM object_replies o
|
|
WHERE o.object_id = $1
|
|
UNION
|
|
SELECT n + 1, r.object_id, r.parent_object_id
|
|
FROM object_replies r
|
|
INNER JOIN object_parents p ON p.parent_object_id = r.object_id
|
|
WHERE n < 10
|
|
)
|
|
SELECT object_id, parent_object_id
|
|
FROM object_parents`
|
|
return s.toUUIDMultiMap(errors.WrapObjectReplySelectFailedError, ctx, query, objectRowID)
|
|
}
|
|
|
|
func (s pgStorage) ObjectChildrenByObjectID(ctx context.Context, objectRowID uuid.UUID) (map[uuid.UUID][]uuid.UUID, error) {
|
|
query := `WITH RECURSIVE object_children(n) AS (
|
|
SELECT 1, o.parent_object_id, o.object_id
|
|
FROM object_replies o
|
|
WHERE o.parent_object_id = $1
|
|
UNION
|
|
SELECT n + 1, r.parent_object_id, r.object_id
|
|
FROM object_replies r
|
|
INNER JOIN object_children p ON p.object_id = r.parent_object_id
|
|
WHERE n < 10
|
|
)
|
|
SELECT parent_object_id, object_id
|
|
FROM object_children`
|
|
return s.toUUIDMultiMap(errors.WrapObjectReplySelectFailedError, ctx, query, objectRowID)
|
|
}
|
|
|
|
func (s pgStorage) ExistsObjectInUserFeedByObjectID(ctx context.Context, objectID string) (bool, error) {
|
|
query := `SELECT COUNT(o.*) FROM objects o INNER JOIN user_feed uf on o.id = uf.object_id WHERE o.object_id = $1`
|
|
return s.wrappedExists(errors.WrapObjectQueryFailedError, ctx, query, objectID)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error) {
|
|
rowID := NewV4()
|
|
now := s.now()
|
|
return s.RecordObjectSubscriptionAll(ctx, rowID, now, now, objectRowID, actorRowID)
|
|
}
|
|
|
|
func (s pgStorage) RecordObjectSubscriptionAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectRowID, actorRowID uuid.UUID) (uuid.UUID, error) {
|
|
query := `INSERT INTO object_subscriptions (id, created_at, updated_at, object_id, actor_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT object_subscriptions_uindex DO UPDATE SET updated_at = now() RETURNING id`
|
|
var id uuid.UUID
|
|
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, objectRowID, actorRowID).Scan(&id)
|
|
return id, errors.WrapObjectReplyUpsertFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) ActorsSubscribedToObject(ctx context.Context, objectRowID uuid.UUID) ([]*Actor, error) {
|
|
query := actorsSelectQuery("object_subscriptions os on a.id = os.actor_id", []string{"os.object_id = $1"})
|
|
return s.getActors(ctx, query, objectRowID)
|
|
}
|
|
|
|
func (s pgStorage) RemoveObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) error {
|
|
_, err := s.db.ExecContext(ctx, `DELETE FROM object_subscriptions WHERE actor_id = $1 AND object_id = $2`, actorRowID, objectRowID)
|
|
return errors.WrapNetworkRelationshipUpdateFailedError(err)
|
|
}
|
|
|
|
func (s pgStorage) ObjectExistsByObjectID(ctx context.Context, objectID string) (bool, error) {
|
|
query := `SELECT COUNT(*) FROM objects WHERE object_id = $1`
|
|
return s.wrappedExists(errors.WrapObjectQueryFailedError, ctx, query, objectID)
|
|
}
|
|
|
|
func (s pgStorage) ActivityExistsByActivityID(ctx context.Context, activityID string) (bool, error) {
|
|
query := `SELECT COUNT(*) FROM object_events WHERE activity_id = $1`
|
|
return s.wrappedExists(errors.WrapObjectQueryFailedError, ctx, query, activityID)
|
|
}
|
|
|
|
func (s pgStorage) ParentObjectID(ctx context.Context, objectRowID uuid.UUID) (uuid.UUID, error) {
|
|
return s.wrappedSelectUUID(errors.WrapObjectReplySelectFailedError, ctx, `SELECT parent_object_id FROM object_replies WHERE object_id = $1`, objectRowID)
|
|
}
|