Overhauled object storage and queries. Closes #29 and #30.

This commit is contained in:
Nick Gerakines 2020-03-14 14:04:52 -04:00
parent 26a4e54cc2
commit 04fd776cee
No known key found for this signature in database
GPG Key ID: 33D43D854F96B2E4
20 changed files with 1093 additions and 747 deletions

View File

@ -3,6 +3,7 @@ package common
import (
"fmt"
"strconv"
"strings"
"github.com/gofrs/uuid"
)
@ -60,3 +61,21 @@ func UUIDsToInterfaces(input []uuid.UUID) []interface{} {
}
return results
}
func RemoveTrimmedEmptyStrings(input []string) []string {
results := make([]string, 0)
for _, s := range input {
if trimmed := strings.TrimSpace(s); len(trimmed) > 0 {
results = append(results, trimmed)
}
}
return results
}
func StringToUUIDMapValues(thing map[string]uuid.UUID) []uuid.UUID {
results := make([]uuid.UUID, 0, len(thing))
for _, value := range thing {
results = append(results, value)
}
return results
}

View File

@ -4,7 +4,6 @@ import (
"context"
"crypto/rsa"
"strings"
"time"
"github.com/yukimochi/httpsig"
"go.uber.org/zap"
@ -79,28 +78,17 @@ func (c Crawler) Start(user *storage.User, seed string) ([]string, []string, err
return nil, nil, err
}
if existingCount == 0 {
var body string
body, payload, err = ldJsonGetSigned(c.HTTPClient, location, signer, userActor.GetKeyID(), privateKey)
_, payload, err = ldJsonGetSigned(c.HTTPClient, location, signer, userActor.GetKeyID(), privateKey)
if err != nil {
return nil, nil, err
}
conversation, _ := storage.JSONString(payload, "conversation")
publishedAtStr, hasPublished := storage.JSONString(payload, "published")
publishedAt := time.Now()
if hasPublished {
publishedAt, err = time.Parse("2006-01-02T15:04:05Z", publishedAtStr)
if err != nil {
publishedAt = time.Now()
}
}
err = c.Storage.RecordObject(ctx, location, body, conversation, publishedAt)
_, err := c.Storage.RecordObject(ctx, payload, location)
if err != nil {
return nil, nil, err
}
} else {
payload, err = c.Storage.ObjectPayload(ctx, location)
payload, err = c.Storage.ObjectPayloadByObjectID(ctx, location)
if err != nil {
return nil, nil, err
}

View File

@ -1,54 +1,64 @@
create table if not exists actors
(
id uuid not null
constraint actors_pk primary key,
-- TODO: make actor_id a sha256 value of the actor_id
actor_id varchar(255) not null,
payload text not null,
created_at timestamp with time zone not null,
updated_at timestamp with time zone default now() not null,
-- TODO: find a way to not have the aliases column
aliases varchar(255)[] default '{}'::character varying(255)[] not null,
-- TODO: remove the name and domain columns
name varchar(200) default 'empty'::character varying not null,
domain varchar(200) default 'empty'::character varying not null
-- TODO: create a "subject" column from the webfinger payload.
id uuid not null
constraint actors_pk
primary key,
actor_id varchar not null
constraint actors_actor_id
unique,
payload jsonb not null,
created_at timestamp with time zone not null,
updated_at timestamp with time zone default now() not null
);
create unique index if not exists actors_actor_id_uindex on actors (actor_id);
create table if not exists followers
create table if not exists actor_aliases
(
id uuid not null
constraint followers_pk primary key,
user_id uuid not null,
-- TODO: make actor_id a sha256 value of the actor_id
actor varchar(100) not null,
request_activity text not null,
status integer default 0 not null,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
constraint followers_user_actor unique (user_id, actor)
id uuid not null
constraint actor_subjects_pk
primary key,
actor_id uuid not null,
alias varchar not null,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
alias_type integer default 0 not null
);
create unique index if not exists followers_user_id_actor_uindex on followers (user_id, actor);
create unique index if not exists actor_aliases_alias_uindex
on actor_aliases (alias);
create table if not exists following
create table if not exists network_graph
(
id uuid not null
constraint following_pk primary key,
user_id uuid not null,
-- TODO: rename "actor" to "actor_id"
-- TODO: make actor_id a sha256 value of the actor_id
actor varchar(100) not null,
request_activity text not null,
status integer default 0 not null,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
constraint following_user_actor unique (user_id, actor)
id uuid not null
constraint network_graph_pk
primary key,
user_id uuid not null,
actor_id uuid not null,
activity jsonb not null,
relationship_type integer default 0 not null,
relationship_status integer default 0 not null,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
constraint network_graph_user_actor_rel
unique (user_id, actor_id, relationship_type)
);
create unique index if not exists following_user_id_actor_uindex on following (user_id, actor);
create table if not exists actor_keys
(
id uuid not null
constraint actor_keys_pk
primary key,
actor_id uuid not null,
key_id varchar not null,
pem text not null,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
constraint actor_keys_lookup
unique (actor_id, key_id)
);
create table if not exists image_aliases
(
@ -80,55 +90,6 @@ create unique index if not exists asset_image_checksum_uindex on images (checksu
create unique index if not exists asset_image_location_uindex on images (location);
create table if not exists keys
(
id uuid not null
constraint keys_pk primary key,
key_id varchar(255) not null,
public_key text not null,
created_at timestamp with time zone not null
);
create unique index if not exists keys_key_id_uindex on keys (key_id);
create table if not exists object_events
(
id uuid not null
constraint object_events_pk primary key,
-- TODO: make activity_id a sha256 value of the activity_id value
activity_id varchar(400) not null
constraint object_event_activity unique,
name varchar(50) not null,
-- TODO: make object_id a sha256 value of the object_id value
object_id varchar(400) not null,
created_at timestamp with time zone not null,
-- TODO: make actor_id a sha256 value of the actor_id value
actor_id varchar(200) not null,
published_at timestamp with time zone not null,
-- TODO: Find a way to get rid of the "to" and "cc" columns.
"to" varchar(200)[] default '{}'::character varying[] not null,
cc varchar(200)[] default '{}'::character varying[] not null
);
create unique index if not exists object_events_activity_id_uindex on object_events (activity_id);
create table if not exists objects
(
id uuid not null
constraint objects_pk primary key,
-- TODO: make object_id a sha256 value of the object_id value
object_id varchar(200) not null
constraint objects_id unique,
payload text not null,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
-- TODO: make conversation a sha256 value of the conversation value
conversation varchar(200) default ''::character varying not null,
published_at timestamp with time zone
);
create unique index if not exists objects_object_uri_uindex on objects (object_id);
create table if not exists peers
(
id uuid not null
@ -139,37 +100,6 @@ create table if not exists peers
create unique index if not exists peers_inbox_uindex on peers (inbox);
create table if not exists user_activities
(
id uuid not null
constraint user_activities_pk primary key,
user_id uuid not null,
activity_id uuid not null,
created_at timestamp with time zone not null,
public boolean default true not null,
-- TODO: split this out into a object_tags table
tags varchar(100)[] default '{}'::character varying[] not null
);
create table if not exists user_feed
(
id uuid not null
constraint user_feed_pk primary key,
activity_id uuid not null,
user_id uuid not null,
created_at timestamp with time zone not null
);
create table if not exists user_threads
(
id uuid not null
constraint user_threads_pk primary key,
user_id uuid not null,
-- TODO: make conversation a sha256 value of the conversation value
conversation varchar(500) not null,
created_at timestamp with time zone not null
);
create table if not exists users
(
id uuid not null
@ -191,3 +121,85 @@ create table if not exists users
);
create unique index if not exists users_username_uindex on users (name);
create table objects
(
id uuid
constraint objects_pk
primary key,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
payload jsonb not null,
object_id varchar not null
);
create unique index objects_object_uindex on objects (object_id);
ALTER TABLE objects
ADD CONSTRAINT objects_object_uindex UNIQUE USING INDEX objects_object_uindex;
create table object_events
(
id uuid
constraint object_events_pk primary key,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
activity_id varchar not null,
object_id uuid not null,
payload jsonb not null
);
create unique index object_events_activity_uindex on object_events (activity_id);
ALTER TABLE object_events
ADD CONSTRAINT object_events_activity_uindex UNIQUE USING INDEX object_events_activity_uindex;
create table user_object_events
(
id uuid
constraint user_object_events_pk primary key,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
user_id uuid not null,
activity_id uuid not null,
object_id uuid not null,
public bool not null default false
);
create unique index user_object_events_user_activity_uindex on user_object_events (user_id, activity_id, object_id);
ALTER TABLE user_object_events
ADD CONSTRAINT user_object_events_user_activity_uindex UNIQUE USING INDEX user_object_events_user_activity_uindex;
create table user_object_tags
(
id uuid
constraint user_object_tags_pk primary key,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
user_id uuid not null,
activity_id uuid not null,
object_id uuid not null,
tag varchar not null
);
create unique index user_object_tags_tagged_uindex on user_object_tags (user_id, object_id, tag);
ALTER TABLE user_object_tags
ADD CONSTRAINT user_object_tags_tagged_uindex UNIQUE USING INDEX user_object_tags_tagged_uindex;
create table user_feed
(
id uuid not null
constraint user_feed_pk
primary key,
activity_id uuid not null,
object_id uuid not null,
user_id uuid not null,
created_at timestamp with time zone not null
);
create unique index user_feed_activity_uindex on user_feed (user_id, activity_id, object_id);
ALTER TABLE user_feed
ADD CONSTRAINT user_feed_activity_uindex UNIQUE USING INDEX user_feed_activity_uindex;

View File

@ -162,7 +162,7 @@ func (s pgStorage) GetActorByAlias(ctx context.Context, alias string) (*Actor, e
return s.getFirstActor(s.db, ctx, query, alias)
}
func (s pgStorage) getFirstActor(qc QueryContext, ctx context.Context, query string, args ...interface{}) (*Actor, error) {
func (s pgStorage) getFirstActor(qc QueryExecute, ctx context.Context, query string, args ...interface{}) (*Actor, error) {
results, err := s.getActors(qc, ctx, query, args...)
if err != nil {
return nil, err
@ -173,7 +173,7 @@ func (s pgStorage) getFirstActor(qc QueryContext, ctx context.Context, query str
return results[0], nil
}
func (s pgStorage) getActors(qc QueryContext, ctx context.Context, query string, args ...interface{}) ([]*Actor, error) {
func (s pgStorage) getActors(qc QueryExecute, ctx context.Context, query string, args ...interface{}) ([]*Actor, error) {
results := make([]*Actor, 0)
rows, err := qc.QueryContext(ctx, query, args...)
if err != nil {

View File

@ -2,7 +2,6 @@ package storage
import (
"context"
"database/sql"
"fmt"
"strings"
@ -53,7 +52,7 @@ const (
func (s pgStorage) CreateImage(ctx context.Context, location, checksum, blur string, size, contentType, height, width int, aliases []string) (ImageAsset, error) {
var img ImageAsset
now := s.now()
txErr := runTransactionWithOptions(s.db, func(tx *sql.Tx) error {
txErr := runTransactionWithOptions(s.db, func(tx QueryExecute) error {
checksumCount, err := s.rowCount(tx, ctx, `SELECT COUNT(*) FROM images WHERE checksum = $1`, checksum)
if err != nil {
return errors.WrapInsertQueryFailedError(err)
@ -112,7 +111,7 @@ func (s pgStorage) GetImageByChecksum(ctx context.Context, checksum string) (Ima
return images[0], nil
}
func (s pgStorage) getImage(qc QueryContext, ctx context.Context, id uuid.UUID) (ImageAsset, error) {
func (s pgStorage) getImage(qc QueryExecute, ctx context.Context, id uuid.UUID) (ImageAsset, error) {
images, err := s.getImagesQuery(qc, ctx, `SELECT id, location, checksum, blur, size, content_type, height, width, '' FROM images WHERE id = $1`, id)
if err != nil {
return ImageAsset{}, err
@ -153,7 +152,7 @@ func (s pgStorage) GetImagesByAlias(ctx context.Context, aliases []string) ([]Im
return s.getImagesQuery(s.db, ctx, query, args...)
}
func (s pgStorage) getImagesQuery(qc QueryContext, ctx context.Context, query string, args ...interface{}) ([]ImageAsset, error) {
func (s pgStorage) getImagesQuery(qc QueryExecute, ctx context.Context, query string, args ...interface{}) ([]ImageAsset, error) {
var results []ImageAsset
rows, err := qc.QueryContext(ctx, query, args...)

View File

@ -1,5 +1,9 @@
package storage
import (
"github.com/ngerakines/tavern/common"
)
func JSONDeepString(document map[string]interface{}, keys ...string) (string, bool) {
if len(keys) == 1 {
return JSONString(document, keys[0])
@ -13,6 +17,28 @@ func JSONDeepString(document map[string]interface{}, keys ...string) (string, bo
return "", false
}
func CollectJSONDeepStrings(document map[string]interface{}, keySets ...[]string) []string {
results := make([]string, 0)
for _, keys := range keySets {
values, _ := JSONDeepStrings(document, keys...)
results = append(results, values...)
}
return common.RemoveTrimmedEmptyStrings(results)
}
func JSONDeepStrings(document map[string]interface{}, keys ...string) ([]string, bool) {
if len(keys) == 1 {
return JSONStrings(document, keys[0])
}
if len(keys) > 1 {
inner, ok := JSONMap(document, keys[0])
if ok {
return JSONDeepStrings(inner, keys[1:]...)
}
}
return []string{}, false
}
func JSONMap(document map[string]interface{}, key string) (map[string]interface{}, bool) {
if value, ok := document[key]; ok {
if mapVal, isMap := value.(map[string]interface{}); isMap {

View File

@ -28,6 +28,8 @@ type FollowerStorage interface {
RemoveFollower(ctx context.Context, userID, actorID uuid.UUID) error
IsFollowing(ctx context.Context, userID, actorID uuid.UUID) (bool, error)
IsFollower(ctx context.Context, userID, actorID uuid.UUID) (bool, error)
CountFollowers(ctx context.Context, userID uuid.UUID) (int, error)
CountFollowing(ctx context.Context, userID uuid.UUID) (int, error)
}
type NetworkRelationship struct {
@ -81,7 +83,7 @@ const (
var _ FollowerStorage = &pgStorage{}
func (s pgStorage) networkGraphQuery(qc QueryContext, ctx context.Context, userID uuid.UUID, relationshipType RelationshipType, relationshipStatus RelationshipStatus, limit, offset int) ([]string, error) {
func (s pgStorage) networkGraphQuery(qc QueryExecute, ctx context.Context, userID uuid.UUID, relationshipType RelationshipType, relationshipStatus RelationshipStatus, limit, offset int) ([]string, error) {
query := `SELECT a.actor_id FROM network_graph n INNER JOIN actors a ON a.id = n.actor_id WHERE n.user_id = $3 AND n.relationship_type = $4 AND n.relationship_status = $5 ORDER BY n.created_at ASC LIMIT $1 OFFSET $2`
rows, err := qc.QueryContext(ctx, query, limit, offset, userID, relationshipType, relationshipStatus)
if err != nil {
@ -144,7 +146,7 @@ func (s pgStorage) ActivityForFollower(ctx context.Context, userID, actorID uuid
return s.networkGraphActivity(s.db, ctx, userID, actorID, UserFollowedByRelationship)
}
func (s pgStorage) networkGraphActivity(qc QueryContext, ctx context.Context, userID, actorID uuid.UUID, relationshipType RelationshipType) (Payload, error) {
func (s pgStorage) networkGraphActivity(qc QueryExecute, ctx context.Context, userID, actorID uuid.UUID, relationshipType RelationshipType) (Payload, error) {
var payload Payload
err := qc.QueryRowContext(ctx, `SELECT activity FROM network_graph WHERE user_id = $1 AND actor_id = $2 AND relationship_type = $3`, userID, actorID, relationshipType).
Scan(&payload)
@ -188,3 +190,11 @@ func (s pgStorage) IsFollower(ctx context.Context, userID, actorID uuid.UUID) (b
c, err := s.RowCount(ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND actor_id = $2 AND relationship_type = $3`, userID, actorID, UserFollowedByRelationship)
return c == 1, err
}
func (s pgStorage) CountFollowers(ctx context.Context, userID uuid.UUID) (int, error) {
return s.RowCount(ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND relationship_type = $2`, userID, UserFollowedByRelationship)
}
func (s pgStorage) CountFollowing(ctx context.Context, userID uuid.UUID) (int, error) {
return s.RowCount(ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND relationship_type = $2`, userID, UserFollowsRelationship)
}

View File

@ -8,102 +8,114 @@ import (
"time"
"github.com/gofrs/uuid"
"github.com/lib/pq"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
)
type ObjectStorage interface {
RecordObject(ctx context.Context, objectID, payload, conversation string, publishedAt time.Time) error
UpdateObjectPayload(ctx context.Context, objectID, payload string) error
ObjectPayload(ctx context.Context, objectID string) (Payload, error)
ObjectPayloads(ctx context.Context, objectIDs []string) ([]Payload, error)
ObjectPayloadsByConversation(ctx context.Context, conversation string) ([]Payload, error)
RemoveObject(ctx context.Context, objectID string) error
RecordObjectEvent(ctx context.Context, activityID, objectID, actorID, name string, publishedAt time.Time, to, cc []string) (uuid.UUID, error)
RecordCreateNoteEvent(ctx context.Context, activityID, objectID, actorID, payload, conversation string, publishedAt time.Time, to, cc []string) (uuid.UUID, error)
RecordAnnounceNoteEvent(ctx context.Context, activityID, objectID, actorID, payload, conversation string, publishedAt time.Time, to, cc []string) (uuid.UUID, error)
ObjectPairsByObjectEventRowIDs(ctx context.Context, objectEventIDs []uuid.UUID) ([]ObjectEventPair, error)
UserAnnouncementsByObject(ctx context.Context, actorID string, objectIDs []string) (map[string]bool, error)
ObjectPairsByActivityID(ctx context.Context, objectEventIDs []string) ([]ObjectEventPair, error)
CountObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID) (int, error)
ListObjectPayloadsByObjectIDs(ctx context.Context, objectIDs []string) ([]Payload, error)
ListObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error)
CountObjectPayloadsInLocalFeed(ctx context.Context) (int, error)
ListObjectPayloadsInLocalFeed(ctx context.Context) ([]Payload, error)
ListObjectPayloadsInTagFeed(ctx context.Context, tag string) ([]Payload, error)
CountObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID) (int, error)
ListObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error)
ListObjectPayloadsInUserTagFeed(ctx context.Context, userID uuid.UUID, tag string, limit int, offset int) ([]Payload, error)
CountObjectPayloadsInUserConversation(ctx context.Context, userID uuid.UUID, conversation string) (int, error)
ListObjectPayloadsInUserConversation(ctx context.Context, userID uuid.UUID, conversation string) ([]Payload, error)
CountObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID) (int, error)
ListObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error)
ObjectPayloadByObjectID(ctx context.Context, objectID string) (Payload, error)
ObjectEventPayloadByActivityID(ctx context.Context, activityID string) (Payload, error)
ObjectRowIDForObjectID(ctx context.Context, objectID string) (uuid.UUID, error)
ObjectRowIDsForObjectIDs(ctx context.Context, objectID []string) (map[string]uuid.UUID, error)
RecordObject(ctx context.Context, payload Payload, objectID string) (uuid.UUID, error)
RecordObjectAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, payload Payload, objectID string) (uuid.UUID, error)
RecordObjectEvent(ctx context.Context, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error)
RecordObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error)
RecordUserConversation(ctx context.Context, userID, activityID, objectID uuid.UUID, conversation string) (uuid.UUID, error)
RecordUserConversationAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID, conversation string) (uuid.UUID, error)
RecordUserObjectEvent(ctx context.Context, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error)
RecordUserObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error)
RecordObjectTag(ctx context.Context, objectID uuid.UUID, tag string) (uuid.UUID, error)
RecordObjectTagAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID uuid.UUID, tag string) (uuid.UUID, error)
RecordUserFeed(ctx context.Context, activityID, objectID, userID uuid.UUID) (uuid.UUID, error)
RecordUserFeedAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID, objectID, userID uuid.UUID) (uuid.UUID, error)
RecordUserAnnouncement(ctx context.Context, userID, activityID, objectID uuid.UUID) (uuid.UUID, error)
RecordUserAnnouncementAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID) (uuid.UUID, error)
UserAnnouncementCounts(ctx context.Context, userID uuid.UUID, objectIDs []uuid.UUID) ([]Count, error)
ObjectPayloadByUUID(ctx context.Context, objectIDs []uuid.UUID) (map[uuid.UUID]Payload, error)
}
type Object struct {
ID uuid.UUID
URI string
Payload string
Checksum string
CreatedAt time.Time
UpdatedAt time.Time
}
type ObjectEventPair struct {
ObjectRowID uuid.UUID
ObjectEventRowID uuid.UUID
ActivityID string
ActorID string
Published time.Time
ObjectID string
Name string
Object string
To pq.StringArray
Cc pq.StringArray
Conversation string
}
const (
CreateNoteObjectEvent = "create_note"
AnnounceNoteObjectEvent = "announce_note"
)
var _ ObjectStorage = &pgStorage{}
func (s pgStorage) RecordObject(ctx context.Context, objectID, payload, conversation string, publishedAt time.Time) error {
return s.recordObject(s.db, ctx, objectID, payload, conversation, publishedAt)
}
func (s pgStorage) recordObject(qe QueryExecute, ctx context.Context, objectID, payload, conversation string, publishedAt time.Time) error {
_, err := qe.ExecContext(ctx, "INSERT INTO objects (id, object_id, payload, created_at, updated_at, conversation, published_at) VALUES ($1, $2, $3, $4, $4, $5, $6)", NewV4(), objectID, payload, s.now(), conversation, publishedAt)
return errors.WrapInsertQueryFailedError(err)
}
func (s pgStorage) UpdateObjectPayload(ctx context.Context, objectID, payload string) error {
_, err := s.db.ExecContext(ctx, `UPDATE objects SET updated_at = $2, payload = $3 WHERE object_id = $1`, objectID, s.now(), payload)
return errors.WrapInsertQueryFailedError(err)
}
func (s pgStorage) ObjectPayload(ctx context.Context, objectID string) (Payload, error) {
var payload string
err := s.db.
QueryRowContext(ctx, `SELECT payload FROM objects WHERE object_id = $1`, objectID).
Scan(&payload)
if err != nil {
return nil, errors.WrapQueryFailedError(err)
}
return PayloadFromString(payload)
}
func (s pgStorage) ObjectPayloads(ctx context.Context, objectIDs []string) ([]Payload, error) {
func (s pgStorage) ListObjectPayloadsByObjectIDs(ctx context.Context, objectIDs []string) ([]Payload, error) {
if len(objectIDs) == 0 {
return nil, nil
}
params := make([]string, len(objectIDs))
args := make([]interface{}, len(objectIDs))
for i, id := range objectIDs {
params[i] = fmt.Sprintf("$%d", i+1)
args[i] = id
}
query := fmt.Sprintf("SELECT payload FROM objects WHERE id IN (%s)", strings.Join(params, ", "))
return s.objectPayloads(ctx, query, args...)
query := fmt.Sprintf(`SELECT o.payload FROM objects o WHERE o.object_id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
return s.objectPayloads(ctx, query, common.StringsToInterfaces(objectIDs)...)
}
func (s pgStorage) ObjectPayloadsByConversation(ctx context.Context, conversation string) ([]Payload, error) {
return s.objectPayloads(ctx, `SELECT payload FROM objects WHERE conversation = $1 ORDER BY created_at ASC`, conversation)
func (s pgStorage) CountObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID) (int, error) {
return s.RowCount(ctx, `SELECT COUNT(*) FROM user_feed uf WHERE uf.user_id = $1`, userID)
}
func (s pgStorage) ListObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
query := `SELECT oe.payload FROM user_feed uf INNER JOIN object_events oe on oe.id = uf.activity_id WHERE uf.user_id = $1 ORDER BY uf.created_at ASC LIMIT $2 OFFSET $3`
return s.objectPayloads(ctx, query, userID, limit, offset)
}
func (s pgStorage) CountObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID) (int, error) {
return s.RowCount(ctx, `SELECT COUNT(*) FROM user_object_events uoe WHERE uoe.user_id = $1`, userID)
}
func (s pgStorage) ListObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
query := `SELECT oe.payload FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id WHERE uoe.user_id = $1 ORDER BY uoe.created_at ASC LIMIT $2 OFFSET $3`
return s.objectPayloads(ctx, query, userID, limit, offset)
}
func (s pgStorage) CountObjectPayloadsInLocalFeed(ctx context.Context) (int, error) {
return s.RowCount(ctx, `SELECT COUNT(*) FROM user_object_events uoe WHERE uoe.public = true`)
}
func (s pgStorage) ListObjectPayloadsInLocalFeed(ctx context.Context) ([]Payload, error) {
query := `SELECT o.payload FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id INNER JOIN objects o on oe.object_id = o.id WHERE uoe.public = true`
return s.objectPayloads(ctx, query)
}
func (s pgStorage) ListObjectPayloadsInTagFeed(ctx context.Context, tag string) ([]Payload, error) {
query := `SELECT o.payload FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id INNER JOIN objects o on oe.object_id = o.id INNER JOIN object_tags ot on ot.object_id = o.id WHERE uoe.public = true AND ot.tag = $1`
return s.objectPayloads(ctx, query, tag)
}
func (s pgStorage) CountObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID) (int, error) {
query := `SELECT COUNT(o.*) FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id WHERE uoe.user_id = $1 AND uoe.public = true`
return s.rowCount(s.db, ctx, query, userID)
}
func (s pgStorage) ListObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
query := `SELECT o.payload FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id WHERE uoe.user_id = $1 AND uoe.public = true ORDER BY uoe.created_at ASC LIMIT $2 OFFSET $3`
return s.objectPayloads(ctx, query, userID, limit, offset)
}
func (s pgStorage) CountObjectPayloadsInUserConversation(ctx context.Context, userID uuid.UUID, conversation string) (int, error) {
return s.RowCount(ctx, `SELECT COUNT(*) FROM user_conversations uc WHERE uc.user_id = $1 AND uc.conversation = $2`, userID, conversation)
}
func (s pgStorage) ListObjectPayloadsInUserConversation(ctx context.Context, userID uuid.UUID, conversation string) ([]Payload, error) {
query := `SELECT o.payload FROM objects o INNER JOIN user_conversations uc ON uc.object_id = o.id WHERE uc.user_id = $1 AND uc.conversation = $2`
return s.objectPayloads(ctx, query, userID, conversation)
}
func (s pgStorage) ListObjectPayloadsInUserTagFeed(ctx context.Context, userID uuid.UUID, tag string, limit int, offset int) ([]Payload, error) {
query := `SELECT o.payload FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id INNER JOIN object_tags ot on o.id = ot.object_id WHERE uoe.user_id = $1 AND uoe.public = true AND ot.tag = $2 ORDER BY uoe.created_at ASC LIMIT $3 OFFSET $4`
return s.objectPayloads(ctx, query, userID, tag, limit, offset)
}
func (s pgStorage) objectPayloads(ctx context.Context, query string, args ...interface{}) ([]Payload, error) {
fmt.Println(query)
fmt.Println(args...)
var results []Payload
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
@ -111,161 +123,156 @@ func (s pgStorage) objectPayloads(ctx context.Context, query string, args ...int
}
defer rows.Close()
for rows.Next() {
var data string
var data Payload
if err := rows.Scan(&data); err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
p, err := PayloadFromString(data)
if err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
results = append(results, p)
results = append(results, data)
}
return results, nil
}
func (s pgStorage) RemoveObject(ctx context.Context, objectID string) error {
_, err := s.db.ExecContext(ctx, `DELETE FROM objects WHERE object_id = $1`, objectID)
return errors.WrapDeleteQueryFailedError(err)
func (s pgStorage) ObjectPayloadByObjectID(ctx context.Context, objectID string) (Payload, error) {
return s.objectPayload(ctx, `SELECT payload FROM objects WHERE object_id = $1`, objectID)
}
func (s pgStorage) RecordObjectEvent(ctx context.Context, activityID, objectID, actorID, name string, publishedAt time.Time, to, cc []string) (uuid.UUID, error) {
return s.recordObjectEvent(s.db, ctx, activityID, objectID, actorID, name, publishedAt, to, cc)
func (s pgStorage) ObjectEventPayloadByActivityID(ctx context.Context, activityID string) (Payload, error) {
return s.objectPayload(ctx, `SELECT payload FROM objects WHERE activity_id = $1`, activityID)
}
func (s pgStorage) recordObjectEvent(qe QueryExecute, ctx context.Context, activityID, objectID, actorID, name string, publishedAt time.Time, to, cc []string) (uuid.UUID, error) {
func (s pgStorage) objectPayload(ctx context.Context, query string, args ...interface{}) (Payload, error) {
var data Payload
err := s.db.QueryRowContext(ctx, query, args...).Scan(&data)
if err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
return data, nil
}
func (s pgStorage) RecordObject(ctx context.Context, payload Payload, objectID string) (uuid.UUID, error) {
rowID := NewV4()
_, err := qe.ExecContext(ctx, `INSERT INTO object_events (id, activity_id, object_id, actor_id, name, created_at, published_at, "to", cc) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)`, rowID, activityID, objectID, actorID, name, s.now(), publishedAt, pq.Array(to), pq.Array(cc))
return rowID, errors.WrapInsertQueryFailedError(err)
now := s.now()
return s.RecordObjectAll(ctx, rowID, now, now, payload, objectID)
}
func (s pgStorage) RecordCreateNoteEvent(ctx context.Context, activityID, objectID, actorID, payload, conversation string, publishedAt time.Time, to, cc []string) (uuid.UUID, error) {
return s.recordObjectEventPair(ctx, activityID, objectID, actorID, payload, CreateNoteObjectEvent, conversation, publishedAt, to, cc)
func (s pgStorage) RecordObjectAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, payload Payload, objectID string) (uuid.UUID, error) {
query := `INSERT INTO objects (id, created_at, updated_at, payload, object_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT objects_object_uindex DO UPDATE SET updated_at = $2 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, payload, objectID).Scan(&id)
return id, errors.WrapInsertQueryFailedError(err)
}
func (s pgStorage) RecordAnnounceNoteEvent(ctx context.Context, activityID, objectID, actorID, payload, conversation string, publishedAt time.Time, to, cc []string) (uuid.UUID, error) {
return s.recordObjectEventPair(ctx, activityID, objectID, actorID, payload, AnnounceNoteObjectEvent, conversation, publishedAt, to, cc)
func (s pgStorage) RecordObjectEvent(ctx context.Context, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordObjectEventAll(ctx, rowID, now, now, activityID, objectID, payload)
}
func (s pgStorage) recordObjectEventPair(ctx context.Context, activityID, objectID, actorID, payload, name, conversation string, publishedAt time.Time, to, cc []string) (uuid.UUID, error) {
var objectEventID uuid.UUID
txErr := runTransactionWithOptions(s.db, func(tx *sql.Tx) error {
oCount, err := s.rowCount(s.db, ctx, `SELECT COUNT(*) FROM objects WHERE object_id = $1`, objectID)
if err != nil {
return errors.WrapInsertQueryFailedError(err)
}
if oCount == 0 {
err = s.recordObject(tx, ctx, objectID, payload, conversation, publishedAt)
if err != nil {
return err
}
}
oeCount, err := s.rowCount(tx, ctx, `SELECT COUNT(*) FROM object_events WHERE activity_id = $1 AND object_id = $2 AND actor_id = $3 AND name = $4`, activityID, objectID, actorID, name)
if err != nil {
return errors.WrapInsertQueryFailedError(err)
}
if oeCount == 0 {
objectEventID, err = s.recordObjectEvent(tx, ctx, activityID, objectID, actorID, name, publishedAt, to, cc)
return err
}
return errors.WrapSelectQueryFailedError(
tx.QueryRowContext(ctx, `SELECT id FROM object_events WHERE activity_id = $1 AND object_id = $2 AND actor_id = $3 AND name = $4`, activityID, objectID, actorID, name).Scan(&objectEventID))
})
if txErr != nil {
return uuid.Nil, txErr
}
return objectEventID, nil
func (s pgStorage) RecordObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID string, objectID uuid.UUID, payload Payload) (uuid.UUID, error) {
query := `INSERT INTO object_events (id, created_at, updated_at, activity_id, object_id, payload) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT object_events_activity_uindex DO UPDATE SET updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, activityID, objectID, payload).Scan(&id)
return id, errors.WrapInsertQueryFailedError(err)
}
func (s pgStorage) ObjectPairsByObjectEventRowIDs(ctx context.Context, objectEventIDs []uuid.UUID) ([]ObjectEventPair, error) {
if len(objectEventIDs) == 0 {
return nil, nil
}
params := make([]string, len(objectEventIDs))
args := make([]interface{}, len(objectEventIDs))
for i, id := range objectEventIDs {
params[i] = fmt.Sprintf("$%d", i+1)
args[i] = id
}
func (s pgStorage) RecordUserConversation(ctx context.Context, userID, activityID, objectID uuid.UUID, conversation string) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordUserConversationAll(ctx, rowID, now, now, userID, activityID, objectID, conversation)
}
query := fmt.Sprintf("SELECT o.id, oe.id, oe.activity_id, oe.actor_id, oe.name, oe.published_at, o.object_id, o.payload, oe.to, oe.cc, o.conversation FROM objects o INNER JOIN object_events oe on o.object_id = oe.object_id WHERE oe.id IN (%s)", strings.Join(params, ", "))
func (s pgStorage) RecordUserConversationAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID, conversation string) (uuid.UUID, error) {
query := `INSERT INTO user_conversations (id, created_at, updated_at, user_id, activity_id, object_id, conversation) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT ON CONSTRAINT user_conversations_thread_uindex DO UPDATE SET updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, userID, activityID, objectID, conversation).Scan(&id)
return id, errors.WrapInsertQueryFailedError(err)
}
var pairs []ObjectEventPair
rows, err := s.db.QueryContext(ctx, query, args...)
func (s pgStorage) RecordUserObjectEvent(ctx context.Context, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordUserObjectEventAll(ctx, rowID, now, now, userID, activityID, objectID, public)
}
func (s pgStorage) RecordUserObjectEventAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID, public bool) (uuid.UUID, error) {
query := `INSERT INTO user_object_events (id, created_at, updated_at, user_id, activity_id, object_id, public) VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT ON CONSTRAINT user_object_events_user_activity_uindex DO UPDATE SET updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, userID, activityID, objectID, public).Scan(&id)
return id, errors.WrapInsertQueryFailedError(err)
}
func (s pgStorage) RecordObjectTag(ctx context.Context, objectID uuid.UUID, tag string) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordObjectTagAll(ctx, rowID, now, now, objectID, tag)
}
func (s pgStorage) RecordObjectTagAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, objectID uuid.UUID, tag string) (uuid.UUID, error) {
query := `INSERT INTO object_tags (id, created_at, updated_at, object_id, tag) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT object_tags_tagged_uindex DO UPDATE SET updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, objectID, tag).Scan(&id)
return id, errors.WrapInsertQueryFailedError(err)
}
func (s pgStorage) RecordUserFeed(ctx context.Context, activityID, objectID, userID uuid.UUID) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordUserFeedAll(ctx, rowID, now, now, activityID, objectID, userID)
}
func (s pgStorage) RecordUserFeedAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, activityID, objectID, userID uuid.UUID) (uuid.UUID, error) {
query := `INSERT INTO user_feed (id, created_at, updated_at, activity_id, object_id, user_id) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT ON CONSTRAINT user_feed_activity_uindex DO UPDATE SET updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, activityID, objectID, userID).Scan(&id)
return id, errors.WrapInsertQueryFailedError(err)
}
func (s pgStorage) ObjectRowIDForObjectID(ctx context.Context, objectID string) (uuid.UUID, error) {
var rowID uuid.UUID
err := s.db.QueryRowContext(ctx, `SELECT id FROM objects WHERE object_id = $1`, objectID).Scan(&rowID)
if err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
defer rows.Close()
for rows.Next() {
var pair ObjectEventPair
if err := rows.Scan(&pair.ObjectRowID, &pair.ObjectEventRowID, &pair.ActivityID, &pair.ActorID, &pair.Name, &pair.Published, &pair.ObjectID, &pair.Object, &pair.To, &pair.Cc, &pair.Conversation); err != nil {
return nil, errors.NewSelectQueryFailedError(err)
if err == sql.ErrNoRows {
return uuid.Nil, errors.NewNotFoundError(err)
}
pairs = append(pairs, pair)
return uuid.Nil, errors.NewQueryFailedError(err)
}
return pairs, nil
return rowID, nil
}
func (s pgStorage) ObjectPairsByActivityID(ctx context.Context, objectEventIDs []string) ([]ObjectEventPair, error) {
if len(objectEventIDs) == 0 {
return nil, nil
}
params := make([]string, len(objectEventIDs))
args := make([]interface{}, len(objectEventIDs))
for i, id := range objectEventIDs {
params[i] = fmt.Sprintf("$%d", i+1)
args[i] = id
}
query := fmt.Sprintf("SELECT o.id, oe.id, oe.activity_id, oe.actor_id, oe.name, oe.published_at, o.object_id, o.payload, oe.to, oe.cc, o.conversation FROM objects o INNER JOIN object_events oe on o.object_id = oe.object_id WHERE oe.activity_id IN (%s)", strings.Join(params, ", "))
var pairs []ObjectEventPair
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
defer rows.Close()
for rows.Next() {
var pair ObjectEventPair
if err := rows.Scan(&pair.ObjectRowID, &pair.ObjectEventRowID, &pair.ActivityID, &pair.ActorID, &pair.Name, &pair.Published, &pair.ObjectID, &pair.Object, &pair.To, &pair.Cc, &pair.Conversation); err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
pairs = append(pairs, pair)
}
return pairs, nil
}
func (s pgStorage) UserAnnouncementsByObject(ctx context.Context, actorID string, objectIDs []string) (map[string]bool, error) {
results := make(map[string]bool)
func (s pgStorage) ObjectRowIDsForObjectIDs(ctx context.Context, objectIDs []string) (map[string]uuid.UUID, error) {
if len(objectIDs) == 0 {
return results, nil
return nil, nil
}
params := make([]string, len(objectIDs))
args := make([]interface{}, len(objectIDs)+1)
args[0] = actorID
for i, id := range objectIDs {
params[i] = fmt.Sprintf("$%d", i+2)
args[i+1] = id
results[id] = false
}
query := fmt.Sprintf(`SELECT object_id FROM object_events WHERE actor_id = $1 AND name LIKE 'announce_%%' AND object_id IN (%s)`, strings.Join(params, ", "))
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
defer rows.Close()
for rows.Next() {
var objectID string
if err := rows.Scan(&objectID); err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
results[objectID] = true
}
return results, nil
query := fmt.Sprintf(`SELECT o.object_id, o.id FROM objects o WHERE o.object_id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
return s.keysToUUID(ctx, query, common.StringsToInterfaces(objectIDs)...)
}
func (s pgStorage) ObjectPayloadByUUID(ctx context.Context, objectIDs []uuid.UUID) (map[uuid.UUID]Payload, error) {
if len(objectIDs) == 0 {
return nil, nil
}
query := fmt.Sprintf(`SELECT o.id, o.payload FROM objects o WHERE o.object_id in (%s)`, strings.Join(common.DollarForEach(len(objectIDs)), ","))
return s.uuidsToPayload(ctx, query, common.UUIDsToInterfaces(objectIDs)...)
}
func (s pgStorage) RecordUserAnnouncement(ctx context.Context, userID, activityID, objectID uuid.UUID) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordUserAnnouncementAll(ctx, rowID, now, now, userID, activityID, objectID)
}
func (s pgStorage) RecordUserAnnouncementAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userID, activityID, objectID uuid.UUID) (uuid.UUID, error) {
query := `INSERT INTO user_announcements (id, created_at, updated_at, user_id, activity_id, object_id) VALUES ($1, $2, $3, $4, $5, $6) RETURNING ID`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, userID, activityID, objectID).Scan(&id)
return id, errors.WrapInsertQueryFailedError(err)
}
func (s pgStorage) UserAnnouncementCounts(ctx context.Context, userID uuid.UUID, objectIDs []uuid.UUID) ([]Count, error) {
if len(objectIDs) == 0 {
return []Count{}, nil
}
placeholders := common.MapStrings(common.StringIntRange(2, len(objectIDs)+1), common.Dollar)
query := fmt.Sprintf(`SELECT object_id, COUNT(*) FROM user_announcements WHERE user_id = $1 AND object_id IN (%s) GROUP BY object_id`, strings.Join(placeholders, ","))
return s.keyedCount(ctx, query, append([]interface{}{userID}, common.UUIDsToInterfaces(objectIDs)...)...)
}

View File

@ -3,8 +3,10 @@ package storage
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/gofrs/uuid"
_ "github.com/lib/pq"
"github.com/ngerakines/tavern/errors"
@ -22,21 +24,21 @@ type Storage interface {
ObjectStorage
AssetStorage
GetExecutor() QueryExecute
}
type QueryExecute interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
}
type QueryRowCount interface {
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
type QueryContext interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
type Count struct {
Key string
Count int
}
func DefaultStorage(db *sql.DB) Storage {
return pgStorage{
db: db,
@ -45,19 +47,25 @@ func DefaultStorage(db *sql.DB) Storage {
}
type pgStorage struct {
db *sql.DB
db QueryExecute
now nowFunc
}
type transactionScopedWork func(db *sql.Tx) error
type transactionScopedWork func(db QueryExecute) error
type transactionScopedStorage func(storage Storage) error
type nowFunc func() time.Time
func defaultNowFunc() time.Time {
return time.Now().UTC()
}
func runTransactionWithOptions(db *sql.DB, txBody transactionScopedWork) error {
tx, err := db.Begin()
func runTransactionWithOptions(db QueryExecute, txBody transactionScopedWork) error {
realDB, ok := db.(*sql.DB)
if !ok {
return txBody(db)
}
tx, err := realDB.Begin()
if err != nil {
return errors.NewDatabaseTransactionFailedError(err)
}
@ -72,11 +80,40 @@ func runTransactionWithOptions(db *sql.DB, txBody transactionScopedWork) error {
return errors.WrapDatabaseTransactionFailedError(tx.Commit())
}
func TransactionalStorage(ctx context.Context, storage Storage, txBody transactionScopedStorage) error {
executor := storage.GetExecutor()
realDB, ok := executor.(*sql.DB)
if !ok {
return txBody(storage)
}
tx, err := realDB.BeginTx(ctx, nil)
if err != nil {
return errors.NewDatabaseTransactionFailedError(err)
}
err = txBody(pgStorage{
db: tx,
now: defaultNowFunc,
})
if err != nil {
if txErr := tx.Rollback(); txErr != nil {
return errors.NewDatabaseTransactionFailedError(txErr)
}
return err
}
return errors.WrapDatabaseTransactionFailedError(tx.Commit())
}
func (s pgStorage) GetExecutor() QueryExecute {
return s.db
}
func (s pgStorage) RowCount(ctx context.Context, query string, args ...interface{}) (int, error) {
return s.rowCount(s.db, ctx, query, args...)
}
func (s pgStorage) rowCount(qec QueryRowCount, ctx context.Context, query string, args ...interface{}) (int, error) {
func (s pgStorage) rowCount(qec QueryExecute, ctx context.Context, query string, args ...interface{}) (int, error) {
var total int
err := qec.QueryRowContext(ctx, query, args...).Scan(&total)
@ -86,6 +123,57 @@ func (s pgStorage) rowCount(qec QueryRowCount, ctx context.Context, query string
return total, nil
}
func executeContext(qec QueryExecute, ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
return qec.ExecContext(ctx, query, args...)
func (s pgStorage) keyedCount(ctx context.Context, query string, args ...interface{}) ([]Count, error) {
results := make([]Count, 0)
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
defer rows.Close()
for rows.Next() {
var count Count
if err := rows.Scan(&count.Key, &count.Count); err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
results = append(results, count)
}
return results, nil
}
func (s pgStorage) keysToUUID(ctx context.Context, query string, args ...interface{}) (map[string]uuid.UUID, error) {
fmt.Println(query)
fmt.Println(args...)
results := make(map[string]uuid.UUID)
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
defer rows.Close()
for rows.Next() {
var key string
var id uuid.UUID
if err := rows.Scan(&key, &id); err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
results[key] = id
}
return results, nil
}
func (s pgStorage) uuidsToPayload(ctx context.Context, query string, args ...interface{}) (map[uuid.UUID]Payload, error) {
results := make(map[uuid.UUID]Payload)
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
defer rows.Close()
for rows.Next() {
var key uuid.UUID
var payload Payload
if err := rows.Scan(&key, &payload); err != nil {
return nil, errors.NewSelectQueryFailedError(err)
}
results[key] = payload
}
return results, nil
}

View File

@ -18,60 +18,34 @@ func (h handler) getActivity(c *gin.Context) {
return
}
objectEventID := fmt.Sprintf("https://%s/activity/%s", h.domain, activityUUID)
ctx := c.Request.Context()
pairs, err := h.storage.ObjectPairsByActivityID(c.Request.Context(), []string{objectEventID})
if err != nil || len(pairs) == 0 {
h.notFoundJSON(c, errors.NewNotFoundError(err))
activityID := fmt.Sprintf("https://%s/activity/%s", h.domain, activityUUID)
activityPayload, err := h.storage.ObjectEventPayloadByActivityID(ctx, activityID)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
h.notFoundJSON(c, errors.NewNotFoundError(err))
return
}
h.internalServerErrorJSON(c, err)
return
}
pair := pairs[0]
destinations := storage.CollectJSONDeepStrings(activityPayload,
[]string{"to"}, []string{"cc"}, []string{"object", "to"}, []string{"object", "cc"})
isPublic := false
for _, d := range append(pair.To, pair.Cc...) {
for _, d := range destinations {
if d == "https://www.w3.org/ns/activitystreams#Public" {
isPublic = true
break
}
}
if !isPublic {
h.notFoundJSON(c, errors.NewNotFoundError(fmt.Errorf("activity was not made public")))
h.notFoundJSON(c, errors.NewNotFoundError(fmt.Errorf("activity was not public")))
return
}
response := storage.EmptyPayload()
response["@context"] = "https://www.w3.org/ns/activitystreams"
response["id"] = objectEventID
response["actor"] = pair.ActorID
if len(pair.To) > 0 {
response["to"] = pair.To
}
if len(pair.Cc) > 0 {
response["cc"] = pair.Cc
}
response["published"] = pair.Published.Format("2006-01-02T15:04:05Z")
switch pair.Name {
case "create_note":
response["type"] = "Create"
p, err := storage.PayloadFromString(pair.Object)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
response["object"] = p
case "announce_note":
response["type"] = "Announce"
response["object"] = pair.ObjectID
default:
h.notFoundJSON(c, errors.NewNotFoundError(fmt.Errorf("unexpected activity: %s", pair.Name)))
return
}
h.writeJSONLD(c, http.StatusOK, response)
h.writeJSONLD(c, http.StatusOK, activityPayload)
}

View File

@ -1,12 +1,10 @@
package web
import (
"fmt"
"math"
"net/http"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
"go.uber.org/zap"
"github.com/ngerakines/tavern/errors"
@ -52,7 +50,9 @@ func (h handler) actorInfo(c *gin.Context) {
func (h handler) actorFollowers(c *gin.Context) {
name := c.Param("name")
user, err := h.storage.GetUserByName(c.Request.Context(), name)
ctx := c.Request.Context()
user, err := h.storage.GetUserByName(ctx, name)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
h.notFoundJSON(c, nil)
@ -62,7 +62,7 @@ func (h handler) actorFollowers(c *gin.Context) {
return
}
total, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM followers WHERE user_id = $1`, user.ID)
total, err := h.storage.CountFollowing(ctx, user.ID)
if err != nil {
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
@ -87,7 +87,7 @@ func (h handler) actorFollowers(c *gin.Context) {
offset := (page - 1) * 20
actors, err := h.storage.ListAcceptedFollowers(c.Request.Context(), user.ID, 20, offset)
actors, err := h.storage.ListAcceptedFollowers(ctx, user.ID, 20, offset)
if err != nil {
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
@ -111,7 +111,9 @@ func (h handler) actorFollowers(c *gin.Context) {
func (h handler) actorFollowing(c *gin.Context) {
name := c.Param("name")
user, err := h.storage.GetUserByName(c.Request.Context(), name)
ctx := c.Request.Context()
user, err := h.storage.GetUserByName(ctx, name)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
h.notFoundJSON(c, nil)
@ -121,7 +123,7 @@ func (h handler) actorFollowing(c *gin.Context) {
return
}
total, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM following WHERE user_id = $1`, user.ID)
total, err := h.storage.CountFollowing(ctx, user.ID)
if err != nil {
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
@ -146,7 +148,7 @@ func (h handler) actorFollowing(c *gin.Context) {
offset := (page - 1) * 20
actors, err := h.storage.ListAcceptedFollowing(c.Request.Context(), user.ID, 20, offset)
actors, err := h.storage.ListAcceptedFollowing(ctx, user.ID, 20, offset)
if err != nil {
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
@ -172,7 +174,7 @@ func (h handler) actorOutbox(c *gin.Context) {
name := c.Param("name")
user, err := h.storage.GetUserByName(c.Request.Context(), name)
user, err := h.storage.GetUserByName(ctx, name)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
h.notFoundJSON(c, nil)
@ -187,7 +189,7 @@ func (h handler) actorOutbox(c *gin.Context) {
page := intParam(c, "page", 0)
limit := 50
total, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM user_activities WHERE user_id = $1 AND public = TRUE`, user.ID)
total, err := h.storage.CountObjectPayloadsInUserOutbox(ctx, user.ID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
@ -227,70 +229,14 @@ func (h handler) actorOutbox(c *gin.Context) {
return
}
uf, err := h.storage.PaginatePublicUserActivity(ctx, user.ID, limit, (page-1)*limit)
objects, err := h.storage.ListObjectPayloadsInUserOutbox(ctx, user.ID, limit, (page-1)*limit)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
var objectEventIDs []uuid.UUID
for _, ufi := range uf {
objectEventIDs = append(objectEventIDs, ufi.ObjectEventID)
}
pairs, err := h.storage.ObjectPairsByObjectEventRowIDs(ctx, objectEventIDs)
if err != nil {
h.hardFail(c, err)
return
}
pairsMap := make(map[uuid.UUID]storage.ObjectEventPair)
for _, pair := range pairs {
pairsMap[pair.ObjectEventRowID] = pair
}
items := make([]map[string]interface{}, 0)
for _, i := range uf {
pair, ok := pairsMap[i.ObjectEventID]
if !ok {
continue
}
activity := storage.EmptyPayload()
activity["id"] = i.ObjectEventID
activity["actor"] = pair.ActorID
if len(pair.To) > 0 {
activity["to"] = pair.To
}
if len(pair.Cc) > 0 {
activity["cc"] = pair.Cc
}
activity["published"] = pair.Published.Format("2006-01-02T15:04:05Z")
switch pair.Name {
case "create_note":
activity["type"] = "Create"
p, err := storage.PayloadFromString(pair.Object)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
activity["object"] = p
case "announce_note":
activity["type"] = "Announce"
activity["object"] = pair.ObjectID
default:
h.notFoundJSON(c, errors.NewNotFoundError(fmt.Errorf("unexpected activity: %s", pair.Name)))
return
}
items = append(items, activity)
}
if len(items) > 0 {
response["orderedItems"] = items
if len(objects) > 0 {
response["orderedItems"] = objects
}
if page > 1 {

View File

@ -2,7 +2,6 @@ package web
import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
@ -11,6 +10,7 @@ import (
"time"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
"github.com/yukimochi/httpsig"
"go.uber.org/zap"
@ -312,9 +312,6 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
activityObject, hasActivityObject := storage.JSONMap(payload, "object")
activityObjectType, hasActivityObjectType := storage.JSONDeepString(payload, "object", "type")
to, _ := storage.JSONStrings(payload, "to")
cc, _ := storage.JSONStrings(payload, "cc")
if !hasActivityID || !hasActorID || !hasActivityObjectID || !hasActivityObject || !hasActivityObjectType {
h.logger.Info("ignoring invalid create note activity",
zap.String("id", activityID),
@ -338,30 +335,31 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
return
}
activityPublished, hasActivityPublished := storage.JSONString(payload, "published")
publishedAt := time.Now()
if hasActivityPublished {
publishedAt, err = time.Parse("2006-01-02T15:04:05Z", activityPublished)
conversation, hasConversation := storage.JSONDeepString(payload, "object", "conversation")
ctx := c.Request.Context()
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityObjectRowID, err := storage.RecordObject(ctx, activityObject, activityObjectID)
if err != nil {
publishedAt = time.Now()
return err
}
}
objectPayload, err := json.Marshal(activityObject)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
conversation, _ := storage.JSONDeepString(payload, "object", "conversation")
objectEventID, err := h.storage.RecordCreateNoteEvent(c.Request.Context(), activityID, activityObjectID, actorID, string(objectPayload), conversation, publishedAt, to, cc)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
err = h.storage.AddToUserFeed(c.Request.Context(), user.ID, objectEventID)
activityRowID, err := storage.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
_, err = storage.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
if err != nil {
return err
}
if hasConversation {
_, err = storage.RecordUserConversation(ctx, user.ID, activityRowID, activityObjectRowID, conversation)
if err != nil {
return err
}
}
return nil
})
if err != nil {
h.internalServerErrorJSON(c, err)
return
@ -418,9 +416,6 @@ func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload
actorID, hasActorID := storage.JSONString(payload, "actor")
activityObjectID, hasActivityObjectID := storage.JSONString(payload, "object")
to, _ := storage.JSONStrings(payload, "to")
cc, _ := storage.JSONStrings(payload, "cc")
if !hasActivityID || !hasActorID || !hasActivityObjectID {
h.logger.Info("ignoring invalid announce activity",
zap.String("id", activityID),
@ -432,31 +427,33 @@ func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload
return
}
activityPublished, hasActivityPublished := storage.JSONString(payload, "published")
publishedAt := time.Now()
if hasActivityPublished {
publishedAt, err = time.Parse("2006-01-02T15:04:05Z", activityPublished)
if err != nil {
publishedAt = time.Now()
}
}
ctx := c.Request.Context()
localActivityPrefix := fmt.Sprintf("https://%s/activity/", h.domain)
if strings.HasPrefix(activityObjectID, localActivityPrefix) {
objectEventID, err := h.storage.RecordObjectEvent(c.Request.Context(), activityID, activityObjectID, actorID, storage.AnnounceNoteObjectEvent, publishedAt, to, cc)
activityObjectRowID, err := uuid.FromString(strings.TrimPrefix(activityObjectID, localActivityPrefix))
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
err = h.storage.AddToUserFeed(c.Request.Context(), user.ID, objectEventID)
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityRowID, err := storage.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
_, err = storage.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
if err != nil {
return err
}
return nil
})
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
// activity is local, nothing else to do
c.Status(http.StatusOK)
return
}
@ -466,7 +463,7 @@ func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload
Logger: h.logger,
}
objectBody, objectPayload, err := ac.GetSigned(activityObjectID, h.userActor(user, userActor))
_, objectPayload, err := ac.GetSigned(activityObjectID, h.userActor(user, userActor))
if err != nil {
h.internalServerErrorJSON(c, err)
return
@ -484,15 +481,29 @@ func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload
return
}
conversation, _ := storage.JSONString(objectPayload, "conversation")
conversation, hasConversation := storage.JSONString(objectPayload, "conversation")
objectEventID, err := h.storage.RecordAnnounceNoteEvent(c.Request.Context(), activityID, objectID, actorID, objectBody, conversation, publishedAt, to, cc)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
err = h.storage.AddToUserFeed(c.Request.Context(), user.ID, objectEventID)
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityObjectRowID, err := storage.RecordObject(ctx, objectPayload, activityObjectID)
if err != nil {
return err
}
activityRowID, err := storage.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
_, err = storage.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
if err != nil {
return err
}
if hasConversation {
_, err = storage.RecordUserConversation(ctx, user.ID, activityRowID, activityObjectRowID, conversation)
if err != nil {
return err
}
}
return nil
})
if err != nil {
h.internalServerErrorJSON(c, err)
return

View File

@ -16,12 +16,12 @@ func (h handler) apiV1Instance(c *gin.Context) {
h.internalServerErrorJSON(c, err)
return
}
followers, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM followers WHERE user_id = $1`, user.ID)
followers, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM network_graph WHERE user_id = $1`, user.ID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
following, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM following WHERE user_id = $1`, user.ID)
following, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM network_graph WHERE user_id = $1`, user.ID)
if err != nil {
h.internalServerErrorJSON(c, err)
return
@ -31,12 +31,12 @@ func (h handler) apiV1Instance(c *gin.Context) {
h.internalServerErrorJSON(c, err)
return
}
activities, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM user_activities`)
activities, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM user_object_events`)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
userActivities, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM user_activities WHERE user_id = $1`, user.ID)
userActivities, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM user_object_events WHERE user_id = $1`, user.ID)
if err != nil {
h.internalServerErrorJSON(c, err)
return

View File

@ -37,7 +37,7 @@ func (h handler) compose(c *gin.Context) {
inReplyTo, err := url.QueryUnescape(c.Query("inReplyTo"))
if err == nil && len(inReplyTo) > 0 {
inReplyToPayload, err := h.storage.ObjectPayload(c.Request.Context(), inReplyTo)
inReplyToPayload, err := h.storage.ObjectPayloadByObjectID(c.Request.Context(), inReplyTo)
if err != nil {
h.flashErrorOrFail(c, h.url("compose"), err)
return
@ -310,7 +310,6 @@ func (h handler) createNote(c *gin.Context) {
createNote["object"] = note
payload := createNote.Bytes()
objectPayload := note.Bytes()
if broadcastTo || broadcastCC || broadcastPeers {
h.logger.Debug("broadcasting")
@ -321,7 +320,7 @@ func (h handler) createNote(c *gin.Context) {
return
}
followerTotal, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM followers WHERE user_id = $1`, user.ID)
followerTotal, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1`, user.ID)
if err != nil {
h.flashErrorOrFail(c, errorPage, err)
return
@ -365,21 +364,31 @@ func (h handler) createNote(c *gin.Context) {
return
}
objectEventID, err := h.storage.RecordCreateNoteEvent(c.Request.Context(), activityURL, noteURL, string(userActorID), string(objectPayload), conversation, now, to, cc)
if err != nil {
h.flashErrorOrFail(c, errorPage, err)
return
}
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityObjectRowID, err := storage.RecordObject(ctx, note, noteURL)
if err != nil {
return err
}
err = h.storage.WatchThread(ctx, user.ID, conversation)
if err != nil {
h.flashErrorOrFail(c, errorPage, err)
return
}
activityRowID, err := storage.RecordObjectEvent(ctx, activityURL, activityObjectRowID, createNote)
if err != nil {
return err
}
err = h.storage.RecordUserActivity(ctx, user.ID, objectEventID, isPublic, firstTags)
_, err = storage.RecordUserObjectEvent(ctx, user.ID, activityRowID, activityObjectRowID, isPublic)
if err != nil {
return err
}
_, err = storage.RecordUserConversation(ctx, user.ID, activityRowID, activityObjectRowID, conversation)
if err != nil {
return err
}
return nil
})
if err != nil {
h.flashErrorOrFail(c, errorPage, err)
h.internalServerErrorJSON(c, err)
return
}
@ -474,15 +483,30 @@ func (h handler) announceNote(c *gin.Context) {
announce["object"] = objectID
// announcePayload := announce.Bytes()
objectEventID, err := h.storage.RecordObjectEvent(ctx, announceID, objectID, string(actor), storage.AnnounceNoteObjectEvent, now, to, cc)
if err != nil {
h.flashErrorOrFail(c, h.url("feed_recent"), err)
return
}
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
objectRowID, err := storage.ObjectRowIDForObjectID(ctx, objectID)
if err != nil {
return err
}
err = h.storage.RecordUserActivity(ctx, user.ID, objectEventID, true, []string{})
activityRowID, err := storage.RecordObjectEvent(ctx, announceID, objectRowID, announce)
if err != nil {
return err
}
_, err = storage.RecordUserObjectEvent(ctx, user.ID, activityRowID, objectRowID, true)
if err != nil {
return err
}
_, err = storage.RecordUserAnnouncement(ctx, user.ID, activityRowID, objectRowID)
if err != nil {
return err
}
return nil
})
if err != nil {
h.flashErrorOrFail(c, h.url("feed_recent"), err)
h.internalServerErrorJSON(c, err)
return
}

View File

@ -13,6 +13,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
"github.com/ngerakines/tavern/storage"
)
@ -60,68 +61,82 @@ func (h handler) viewFeed(c *gin.Context) {
}
ctx := c.Request.Context()
data["feed_view"] = "recent"
data["feed_link"] = "feed_recent"
userActorID := storage.NewActorID(user.Name, h.domain)
page := intParam(c, "page", 1)
limit := 20
total, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM user_feed WHERE user_id = $1`, user.ID)
total, err := h.storage.CountObjectEventPayloadsInFeed(ctx, user.ID)
if err != nil {
h.hardFail(c, err)
return
}
uf, err := h.storage.PaginateUserFeed(ctx, user.ID, limit, (page-1)*limit)
objects, err := h.storage.ListObjectEventPayloadsInFeed(ctx, user.ID, limit, (page-1)*limit)
if err != nil {
h.hardFail(c, err)
return
}
pageCount := math.Ceil(float64(total) / float64(limit))
data["page_count"] = int(pageCount)
data["page"] = page
data["limit"] = limit
var objectEventIDs []uuid.UUID
for _, ufi := range uf {
objectEventIDs = append(objectEventIDs, ufi.ObjectEventID)
}
pairs, err := h.storage.ObjectPairsByObjectEventRowIDs(ctx, objectEventIDs)
if err != nil {
h.hardFail(c, err)
return
}
pairsMap := make(map[uuid.UUID]storage.ObjectEventPair)
actorIDs := make([]string, 0)
objectIDs := make([]string, 0)
for _, pair := range pairs {
pairsMap[pair.ObjectEventRowID] = pair
actorIDs = append(actorIDs, pair.ActorID)
objectIDs = append(objectIDs, pair.ObjectID)
conversations := make([]string, 0)
var found []string
for _, object := range objects {
found = storage.CollectJSONDeepStrings(object, []string{"object"}, []string{"object", "id"}, []string{"object", "inReplyTo"})
objectIDs = append(objectIDs, found...)
found = storage.CollectJSONDeepStrings(object, []string{"actor"}, []string{"object", "attributedTo"})
actorIDs = append(actorIDs, found...)
found = storage.CollectJSONDeepStrings(object, []string{"object", "conversation"})
conversations = append(conversations, found...)
}
announcements, err := h.storage.UserAnnouncementsByObject(ctx, string(userActorID), objectIDs)
objectRowIDs, err := h.storage.ObjectRowIDsForObjectIDs(ctx, objectIDs)
if err != nil {
h.hardFail(c, err)
return
}
data["announcements"] = announcements
collectedRowIDs := make([]uuid.UUID, 0, len(objectRowIDs))
for _, id := range objectRowIDs {
collectedRowIDs = append(collectedRowIDs, id)
}
refs, err := h.storage.ObjectPayloadByUUID(ctx, collectedRowIDs)
if err != nil {
h.hardFail(c, err)
return
}
objectsByObjectID := make(map[string]storage.Payload)
for objectID, objectRowID := range objectRowIDs { // map[string]uuid.UUID
payload, hasPayload := refs[objectRowID]
if hasPayload {
objectsByObjectID[objectID] = payload
}
}
announcements, err := h.storage.UserAnnouncementCounts(ctx, user.ID, common.StringToUUIDMapValues(objectRowIDs))
if err != nil {
h.hardFail(c, err)
return
}
announced := make(map[string]bool)
for _, count := range announcements {
if count.Count > 0 {
announced[count.Key] = true
}
}
vf := &viewFeed{storage: h.storage}
err = vf.populate(uf, pairsMap)
err = vf.populateObjectEventPayloads(objects, objectsByObjectID)
if err != nil {
h.hardFail(c, err)
return
}
data["feed"] = vf.feed
if len(uf) > 0 {
data["latest"] = uf[0].CreatedAt.UTC().Unix()
if len(objects) > 0 {
// data["latest"] = objects[0].PublishedAt
}
actors, err := h.storage.ActorsByActorID(ctx, append(actorIDs, vf.actorIDs...))
@ -146,17 +161,23 @@ func (h handler) viewFeed(c *gin.Context) {
pages = append(pages, i)
}
}
data["pages"] = pages
data["actors"] = actorLookup{h.domain, allActors}
ml := &mediaLookup{h.domain, make(map[string]string)}
err = ml.load(ctx, h.storage, vf.mediaURLs)
if err != nil {
h.hardFail(c, err)
return
}
data["feed_view"] = "recent"
data["feed_link"] = "feed_recent"
data["page_count"] = int(pageCount)
data["page"] = page
data["limit"] = limit
data["feed"] = vf.feed
data["pages"] = pages
data["actors"] = actorLookup{h.domain, allActors}
data["media"] = ml
data["announcements"] = announced
if cont = h.saveSession(c, session); !cont {
return
@ -171,68 +192,82 @@ func (h handler) viewMyFeed(c *gin.Context) {
}
ctx := c.Request.Context()
data["feed_view"] = "mine"
data["feed_link"] = "feed_mine"
userActorID := storage.NewActorID(user.Name, h.domain)
page := intParam(c, "page", 1)
limit := 20
total, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM user_activities WHERE user_id = $1`, user.ID)
total, err := h.storage.CountObjectEventPayloadsInUserFeed(ctx, user.ID)
if err != nil {
h.hardFail(c, err)
return
}
uf, err := h.storage.PaginateUserActivity(ctx, user.ID, limit, (page-1)*limit)
objects, err := h.storage.ListObjectEventPayloadsInUserFeed(ctx, user.ID, limit, (page-1)*limit)
if err != nil {
h.hardFail(c, err)
return
}
pageCount := math.Ceil(float64(total) / float64(limit))
data["page_count"] = int(pageCount)
data["page"] = page
data["limit"] = limit
var objectEventIDs []uuid.UUID
for _, ufi := range uf {
objectEventIDs = append(objectEventIDs, ufi.ObjectEventID)
}
pairs, err := h.storage.ObjectPairsByObjectEventRowIDs(ctx, objectEventIDs)
if err != nil {
h.hardFail(c, err)
return
}
pairsMap := make(map[uuid.UUID]storage.ObjectEventPair)
actorIDs := make([]string, 0)
objectIDs := make([]string, 0)
for _, pair := range pairs {
pairsMap[pair.ObjectEventRowID] = pair
actorIDs = append(actorIDs, pair.ActorID)
objectIDs = append(objectIDs, pair.ObjectID)
conversations := make([]string, 0)
var found []string
for _, object := range objects {
found = storage.CollectJSONDeepStrings(object, []string{"object"}, []string{"object", "id"}, []string{"object", "inReplyTo"})
objectIDs = append(objectIDs, found...)
found = storage.CollectJSONDeepStrings(object, []string{"actor"}, []string{"object", "attributedTo"})
actorIDs = append(actorIDs, found...)
found = storage.CollectJSONDeepStrings(object, []string{"object", "conversation"})
conversations = append(conversations, found...)
}
announcements, err := h.storage.UserAnnouncementsByObject(ctx, string(userActorID), objectIDs)
objectRowIDs, err := h.storage.ObjectRowIDsForObjectIDs(ctx, objectIDs)
if err != nil {
h.hardFail(c, err)
return
}
data["announcements"] = announcements
collectedRowIDs := make([]uuid.UUID, 0, len(objectRowIDs))
for _, id := range objectRowIDs {
collectedRowIDs = append(collectedRowIDs, id)
}
refs, err := h.storage.ObjectPayloadByUUID(ctx, collectedRowIDs)
if err != nil {
h.hardFail(c, err)
return
}
objectsByObjectID := make(map[string]storage.Payload)
for objectID, objectRowID := range objectRowIDs { // map[string]uuid.UUID
payload, hasPayload := refs[objectRowID]
if hasPayload {
objectsByObjectID[objectID] = payload
}
}
announcements, err := h.storage.UserAnnouncementCounts(ctx, user.ID, common.StringToUUIDMapValues(objectRowIDs))
if err != nil {
h.hardFail(c, err)
return
}
announced := make(map[string]bool)
for _, count := range announcements {
if count.Count > 0 {
announced[count.Key] = true
}
}
vf := &viewFeed{storage: h.storage}
err = vf.populate(uf, pairsMap)
err = vf.populateObjectEventPayloads(objects, objectsByObjectID)
if err != nil {
h.hardFail(c, err)
return
}
data["feed"] = vf.feed
if len(uf) > 0 {
data["latest"] = uf[0].CreatedAt.UTC().Unix()
if len(objects) > 0 {
// data["latest"] = objects[0].PublishedAt
}
actors, err := h.storage.ActorsByActorID(ctx, append(actorIDs, vf.actorIDs...))
@ -257,16 +292,120 @@ func (h handler) viewMyFeed(c *gin.Context) {
pages = append(pages, i)
}
}
data["pages"] = pages
data["actors"] = actorLookup{h.domain, allActors}
ml := &mediaLookup{h.domain, make(map[string]string)}
err = ml.load(ctx, h.storage, vf.mediaURLs)
if err != nil {
h.hardFail(c, err)
return
}
data["feed_view"] = "mine"
data["feed_link"] = "feed_mine"
data["page_count"] = int(pageCount)
data["page"] = page
data["limit"] = limit
data["feed"] = vf.feed
data["pages"] = pages
data["actors"] = actorLookup{h.domain, allActors}
data["media"] = ml
data["announcements"] = announced
data["feed_view"] = "mine"
data["feed_link"] = "feed_mine"
//
// userActorID := storage.NewActorID(user.Name, h.domain)
//
// page := intParam(c, "page", 1)
// limit := 20
//
// total, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM user_activities WHERE user_id = $1`, user.ID)
// if err != nil {
// h.hardFail(c, err)
// return
// }
//
// uf, err := h.storage.PaginateUserActivity(ctx, user.ID, limit, (page-1)*limit)
// if err != nil {
// h.hardFail(c, err)
// return
// }
//
// pageCount := math.Ceil(float64(total) / float64(limit))
// data["page_count"] = int(pageCount)
// data["page"] = page
// data["limit"] = limit
//
// var objectEventIDs []uuid.UUID
// for _, ufi := range uf {
// objectEventIDs = append(objectEventIDs, ufi.ObjectEventID)
// }
//
// pairs, err := h.storage.ObjectPairsByObjectEventRowIDs(ctx, objectEventIDs)
// if err != nil {
// h.hardFail(c, err)
// return
// }
//
// pairsMap := make(map[uuid.UUID]storage.ObjectEventPair)
// actorIDs := make([]string, 0)
// objectIDs := make([]string, 0)
// for _, pair := range pairs {
// pairsMap[pair.ObjectEventRowID] = pair
// actorIDs = append(actorIDs, pair.ActorID)
// objectIDs = append(objectIDs, pair.ObjectID)
// }
//
// announcements, err := h.storage.UserAnnouncementsByObject(ctx, string(userActorID), objectIDs)
// if err != nil {
// h.hardFail(c, err)
// return
// }
// data["announcements"] = announcements
//
// vf := &viewFeed{storage: h.storage}
// err = vf.populate(uf, pairsMap)
// if err != nil {
// h.hardFail(c, err)
// return
// }
//
// data["feed"] = vf.feed
// if len(uf) > 0 {
// data["latest"] = uf[0].CreatedAt.UTC().Unix()
// }
//
// actors, err := h.storage.ActorsByActorID(ctx, append(actorIDs, vf.actorIDs...))
// if err != nil {
// h.hardFail(c, err)
// return
// }
// var actorRowIDs []uuid.UUID
// for _, actor := range actors {
// actorRowIDs = append(actorRowIDs, actor.ID)
// }
// actorSubjects, err := h.storage.ActorSubjects(ctx, actorRowIDs)
// if err != nil {
// h.hardFail(c, err)
// return
// }
// allActors := h.gatherActors(actors, actorSubjects)
//
// var pages []int
// for i := page - 3; i <= page+3; i++ {
// if i > 0 && i <= int(pageCount) {
// pages = append(pages, i)
// }
// }
// data["pages"] = pages
// data["actors"] = actorLookup{h.domain, allActors}
//
// ml := &mediaLookup{h.domain, make(map[string]string)}
// err = ml.load(ctx, h.storage, vf.mediaURLs)
// if err != nil {
// h.hardFail(c, err)
// }
//
// data["media"] = ml
if cont = h.saveSession(c, session); !cont {
return
@ -281,46 +420,77 @@ func (h handler) viewConversation(c *gin.Context) {
}
ctx := c.Request.Context()
data["feed_view"] = "conversation"
data["feed_link"] = "feed_conversation"
userActorID := storage.NewActorID(user.Name, h.domain)
conversation := c.Query("conversation")
total, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM user_threads WHERE user_id = $1 AND conversation = $2`, user.ID, conversation)
objects, err := h.storage.ListObjectPayloadsInUserConversation(ctx, user.ID, conversation)
if err != nil {
h.hardFail(c, err)
return
}
if total == 0 {
h.notFoundJSON(c, nil)
return
}
objects, err := h.storage.ObjectPayloadsByConversation(ctx, conversation)
actorIDs := make([]string, 0)
objectIDs := make([]string, 0)
conversations := make([]string, 0)
var found []string
for _, object := range objects {
found = storage.CollectJSONDeepStrings(object, []string{"object"}, []string{"object", "id"}, []string{"object", "inReplyTo"})
objectIDs = append(objectIDs, found...)
found = storage.CollectJSONDeepStrings(object, []string{"actor"}, []string{"object", "attributedTo"})
actorIDs = append(actorIDs, found...)
found = storage.CollectJSONDeepStrings(object, []string{"object", "conversation"})
conversations = append(conversations, found...)
}
objectRowIDs, err := h.storage.ObjectRowIDsForObjectIDs(ctx, objectIDs)
if err != nil {
h.hardFail(c, err)
return
}
collectedRowIDs := make([]uuid.UUID, 0, len(objectRowIDs))
for _, id := range objectRowIDs {
collectedRowIDs = append(collectedRowIDs, id)
}
refs, err := h.storage.ObjectPayloadByUUID(ctx, collectedRowIDs)
if err != nil {
h.hardFail(c, err)
return
}
objectsByObjectID := make(map[string]storage.Payload)
for objectID, objectRowID := range objectRowIDs { // map[string]uuid.UUID
payload, hasPayload := refs[objectRowID]
if hasPayload {
objectsByObjectID[objectID] = payload
}
}
announcements, err := h.storage.UserAnnouncementCounts(ctx, user.ID, common.StringToUUIDMapValues(objectRowIDs))
if err != nil {
h.hardFail(c, err)
return
}
announced := make(map[string]bool)
for _, count := range announcements {
if count.Count > 0 {
announced[count.Key] = true
}
}
vf := &viewFeed{storage: h.storage}
err = vf.populateObjectPayloads(objects)
err = vf.populateObjectEventPayloads(objects, objectsByObjectID)
if err != nil {
h.hardFail(c, err)
return
}
data["feed"] = vf.feed
announcements, err := h.storage.UserAnnouncementsByObject(ctx, string(userActorID), vf.objectIDs)
if err != nil {
h.hardFail(c, err)
return
if len(objects) > 0 {
// data["latest"] = objects[0].PublishedAt
}
data["announcements"] = announcements
actors, err := h.storage.ActorsByActorID(ctx, vf.actorIDs)
actors, err := h.storage.ActorsByActorID(ctx, append(actorIDs, vf.actorIDs...))
if err != nil {
h.hardFail(c, err)
return
@ -336,16 +506,86 @@ func (h handler) viewConversation(c *gin.Context) {
}
allActors := h.gatherActors(actors, actorSubjects)
data["actors"] = actorLookup{h.domain, allActors}
ml := &mediaLookup{h.domain, make(map[string]string)}
err = ml.load(ctx, h.storage, vf.mediaURLs)
if err != nil {
h.hardFail(c, err)
return
}
data["media"] = ml
data["feed_view"] = "conversation"
data["feed_link"] = "feed_conversation"
data["page_count"] = 0
data["feed"] = vf.feed
data["actors"] = actorLookup{h.domain, allActors}
data["media"] = ml
data["announcements"] = announced
// data["feed_view"] = "conversation"
// data["feed_link"] = "feed_conversation"
//
// userActorID := storage.NewActorID(user.Name, h.domain)
//
// conversation := c.Query("conversation")
//
// total, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM user_threads WHERE user_id = $1 AND conversation = $2`, user.ID, conversation)
// if err != nil {
// h.hardFail(c, err)
// return
// }
// if total == 0 {
// h.notFoundJSON(c, nil)
// return
// }
//
// objects, err := h.storage.ObjectPayloadsByConversation(ctx, conversation)
// if err != nil {
// h.hardFail(c, err)
// return
// }
//
// vf := &viewFeed{storage: h.storage}
// err = vf.populateObjectPayloads(objects)
// if err != nil {
// h.hardFail(c, err)
// return
// }
//
// data["feed"] = vf.feed
//
// announcements, err := h.storage.UserAnnouncementsByObject(ctx, string(userActorID), vf.objectIDs)
// if err != nil {
// h.hardFail(c, err)
// return
// }
// data["announcements"] = announcements
//
// actors, err := h.storage.ActorsByActorID(ctx, vf.actorIDs)
// if err != nil {
// h.hardFail(c, err)
// return
// }
// var actorRowIDs []uuid.UUID
// for _, actor := range actors {
// actorRowIDs = append(actorRowIDs, actor.ID)
// }
// actorSubjects, err := h.storage.ActorSubjects(ctx, actorRowIDs)
// if err != nil {
// h.hardFail(c, err)
// return
// }
// allActors := h.gatherActors(actors, actorSubjects)
//
// data["actors"] = actorLookup{h.domain, allActors}
//
// ml := &mediaLookup{h.domain, make(map[string]string)}
// err = ml.load(ctx, h.storage, vf.mediaURLs)
// if err != nil {
// h.hardFail(c, err)
// }
//
// data["media"] = ml
// data["page_count"] = 0
if cont = h.saveSession(c, session); !cont {
return

View File

@ -45,17 +45,22 @@ func (h handler) dashboardNetwork(c *gin.Context) {
data["user"] = user
data["authenticated"] = true
totalFollowers, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM followers WHERE user_id = $1`, user.ID)
if err != nil {
h.hardFail(c, err)
return
}
followers, err := h.storage.ListAcceptedFollowers(ctx, user.ID, totalFollowers+1, 0)
if err != nil {
h.hardFail(c, err)
return
}
pendingFollowers, err := h.storage.ListPendingFollowers(ctx, user.ID, totalFollowers+1, 0)
var totalFollowers int
var followers []string
var pendingFollowers []string
err = storage.TransactionalStorage(ctx, h.storage, func(txStorage storage.Storage) error {
totalFollowers, err = txStorage.RowCount(ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1`, user.ID)
followers, err = txStorage.ListAcceptedFollowers(ctx, user.ID, totalFollowers+1, 0)
if err != nil {
return err
}
pendingFollowers, err = txStorage.ListPendingFollowers(ctx, user.ID, totalFollowers+1, 0)
if err != nil {
return err
}
return nil
})
if err != nil {
h.hardFail(c, err)
return
@ -64,7 +69,7 @@ func (h handler) dashboardNetwork(c *gin.Context) {
data["followers"] = followers
data["pending_followers"] = pendingFollowers
totalFollowing, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM following WHERE user_id = $1`, user.ID)
totalFollowing, err := h.storage.RowCount(ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1`, user.ID)
if err != nil {
h.hardFail(c, err)
return

View File

@ -24,7 +24,7 @@ func (h handler) getObject(c *gin.Context) {
}
objectID := fmt.Sprintf("https://%s/object/%s", h.domain, objectUUID)
objectPayload, err := h.storage.ObjectPayload(c.Request.Context(), objectID)
objectPayload, err := h.storage.ObjectPayloadByObjectID(c.Request.Context(), objectID)
if err != nil {
h.notFoundJSON(c, errors.NewNotFoundError(err))
return

View File

@ -6,7 +6,6 @@ import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
"github.com/ngerakines/tavern/storage"
)
@ -64,42 +63,14 @@ func (h handler) getTaggedObjects(c *gin.Context) {
return
}
uf, err := h.storage.PublicUserActivityByTag(ctx, tag, limit, (page-1)*limit)
objects, err := h.storage.ListObjectPayloadsInTagFeed(ctx, tag /*, limit, (page-1)*limit */)
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
var objectEventIDs []uuid.UUID
for _, ufi := range uf {
objectEventIDs = append(objectEventIDs, ufi.ObjectEventID)
}
pairs, err := h.storage.ObjectPairsByObjectEventRowIDs(ctx, objectEventIDs)
if err != nil {
h.hardFail(c, err)
return
}
pairsMap := make(map[uuid.UUID]storage.ObjectEventPair)
for _, pair := range pairs {
pairsMap[pair.ObjectEventRowID] = pair
}
items := make([]map[string]interface{}, 0)
for _, i := range uf {
pair, ok := pairsMap[i.ObjectEventID]
if !ok {
continue
}
p, err := storage.PayloadFromString(pair.Object)
if err != nil {
continue
}
items = append(items, p)
}
if len(items) > 0 {
response["orderedItems"] = items
if len(objects) > 0 {
response["orderedItems"] = objects
}
if page > 1 {

View File

@ -41,7 +41,7 @@ func (h handler) nodeInfoDetails(c *gin.Context) {
return
}
activities, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM user_activities`)
activities, err := h.storage.RowCount(c.Request.Context(), `SELECT COUNT(*) FROM user_object_events`)
if err != nil {
h.internalServerErrorJSON(c, err)
return

View File

@ -1,12 +1,10 @@
package web
import (
"context"
"fmt"
"strings"
"time"
"github.com/gofrs/uuid"
"github.com/microcosm-cc/bluemonday"
"github.com/ngerakines/tavern/storage"
@ -25,13 +23,13 @@ type viewFeedMedia struct {
Blurhash string
}
func (v *viewFeed) populateObjectPayloads(objects []storage.Payload) error {
func (v *viewFeed) populateObjectPayloads(objects []storage.Payload, refs map[string]storage.Payload) error {
if v.feed == nil {
v.feed = make([]map[string]interface{}, 0)
}
for _, object := range objects {
if view := v.createNoteViewFromPayload(object); view != nil {
if view := v.createNoteViewFromPayload(object, refs); view != nil {
v.feed = append(v.feed, map[string]interface{}{
"create_note": view,
})
@ -41,69 +39,98 @@ func (v *viewFeed) populateObjectPayloads(objects []storage.Payload) error {
return nil
}
func (v *viewFeed) populate(userFeed []storage.UserFeed, pairs map[uuid.UUID]storage.ObjectEventPair) error {
func (v *viewFeed) populateObjectEventPayloads(objectEvents []storage.Payload, refs map[string]storage.Payload) error {
if v.feed == nil {
v.feed = make([]map[string]interface{}, 0)
}
for _, uf := range userFeed {
pair, ok := pairs[uf.ObjectEventID]
if !ok {
continue
}
switch pair.Name {
case storage.CreateNoteObjectEvent:
if view := v.createNoteView(pair); view != nil {
for _, objectEvent := range objectEvents {
objectEventType, _ := storage.JSONString(objectEvent, "type")
switch objectEventType {
case "Create":
object, _ := storage.JSONMap(objectEvent, "object")
if view := v.createNoteViewFromPayload(object, refs); view != nil {
v.feed = append(v.feed, map[string]interface{}{
"create_note": view,
})
}
case storage.AnnounceNoteObjectEvent:
if view := v.announceNoteView(pair); view != nil {
case "Note":
// object, _ := storage.JSONMap(objectEvent, "object")
if view := v.createNoteViewFromPayload(objectEvent, refs); view != nil {
v.feed = append(v.feed, map[string]interface{}{
"announce_note": view,
"create_note": view,
})
}
default:
return fmt.Errorf("unexpected activity type: %s", pair.Name)
}
}
return nil
}
func (v *viewFeed) announceNoteView(pair storage.ObjectEventPair) map[string]interface{} {
announceNote := make(map[string]interface{})
//
// func (v *viewFeed) populate(userFeed []storage.UserFeed, pairs map[uuid.UUID]storage.ObjectEventPair) error {
// if v.feed == nil {
// v.feed = make([]map[string]interface{}, 0)
// }
//
// for _, uf := range userFeed {
// pair, ok := pairs[uf.ObjectEventID]
// if !ok {
// continue
// }
//
// switch pair.Name {
// case storage.CreateNoteObjectEvent:
// if view := v.createNoteView(pair); view != nil {
// v.feed = append(v.feed, map[string]interface{}{
// "create_note": view,
// })
// }
// case storage.AnnounceNoteObjectEvent:
// if view := v.announceNoteView(pair); view != nil {
// v.feed = append(v.feed, map[string]interface{}{
// "announce_note": view,
// })
// }
// default:
// return fmt.Errorf("unexpected activity type: %s", pair.Name)
// }
// }
//
// return nil
// }
//
// func (v *viewFeed) announceNoteView(pair storage.ObjectEventPair) map[string]interface{} {
// announceNote := make(map[string]interface{})
//
// announceNote["object_id"] = pair.ObjectID
// v.objectIDs = append(v.objectIDs, pair.ObjectID)
//
// p, err := storage.PayloadFromString(pair.Object)
// if err != nil {
// return nil
// }
//
// announceNote["announcer"] = pair.ActorID
// v.actorIDs = append(v.actorIDs, pair.ActorID)
//
// noteView := v.createNoteViewFromPayload(p)
// if noteView != nil {
// announceNote["note"] = noteView
// }
//
// return announceNote
// }
//
// func (v *viewFeed) createNoteView(pair storage.ObjectEventPair) map[string]interface{} {
// p, err := storage.PayloadFromString(pair.Object)
// if err != nil {
// return nil
// }
// return v.createNoteViewFromPayload(p)
// }
announceNote["object_id"] = pair.ObjectID
v.objectIDs = append(v.objectIDs, pair.ObjectID)
p, err := storage.PayloadFromString(pair.Object)
if err != nil {
return nil
}
announceNote["announcer"] = pair.ActorID
v.actorIDs = append(v.actorIDs, pair.ActorID)
noteView := v.createNoteViewFromPayload(p)
if noteView != nil {
announceNote["note"] = noteView
}
return announceNote
}
func (v *viewFeed) createNoteView(pair storage.ObjectEventPair) map[string]interface{} {
p, err := storage.PayloadFromString(pair.Object)
if err != nil {
return nil
}
return v.createNoteViewFromPayload(p)
}
func (v *viewFeed) createNoteViewFromPayload(p storage.Payload) map[string]interface{} {
func (v *viewFeed) createNoteViewFromPayload(p storage.Payload, refs map[string]storage.Payload) map[string]interface{} {
createNote := make(map[string]interface{})
objectID, _ := storage.JSONString(p, "id")
@ -127,9 +154,8 @@ func (v *viewFeed) createNoteViewFromPayload(p storage.Payload) map[string]inter
if inReplyTo, hasInReplyTo := storage.JSONString(p, "inReplyTo"); hasInReplyTo {
createNote["in_reply_to"] = inReplyTo
// TODO: Batch these queries.
replyObj, err := v.storage.ObjectPayload(context.Background(), inReplyTo)
if err == nil {
replyObj, hasReplyObj := refs[inReplyTo]
if hasReplyObj {
inReplyToAuthor, hasInReplyToAuthor := storage.JSONString(replyObj, "attributedTo")
if hasInReplyToAuthor {
createNote["in_reply_to_author"] = inReplyToAuthor