Merge branch 'issue-21-groups' into 'master'

Groups

See merge request ngerakines/tavern!6
This commit is contained in:
Nick Gerakines 2020-04-03 20:57:21 +00:00
commit fc01d0ed22
41 changed files with 6431 additions and 196 deletions

1
.dockerignore Normal file
View File

@ -0,0 +1 @@
*_testing

View File

@ -23,6 +23,8 @@ Activity and object pairs are formatted as `Activity/Object`. For example:
| Undo | Follow | Note |
| Accept | Follow | Note |
| Reject | Follow | Note |
| Follow | Group | |
| Undo | Follow | Group |
# Data Retention
@ -59,6 +61,99 @@ Tavern implements the following behavior for inbound `Delete/Actor` activities:
When a server broadcasts a `Delete/Actor` activity, tavern will tombstone the local actor reference as well as all activities associated with the actor.
# Groups
A group is effectively a distribution list with roles.
Groups have followers but do not follow anyone.
## Joining a group
To join a group, an actor sends a `Follow/Group` activity to the group inbox.
Groups may be configured to auto accept followers.
### Configuration
Group owners may configure the default member role for new followers:
* Viewer (can receive announcements but cannot publish)
* Member (can receive announcements and publish activities)
* Owner (can receive announcements, publish activities, and manage the group)
Group owners can also configure the group to automatically accept new followers.
## Leaving a group
To leave a group, an actor sends a `Undo/Follow/Group` activity to the group inbox.
## Group change notifications
When a member joins or leaves a group, an `Update/Collection` activity is created for the group follower's collection and sent to all members of the group. This activity may be throttled to occur less frequently by instance operators.
## Invitations
Persons may be invited to the group. When an `Invite/Person` activity is published to the group's inbox, the group will accept the next follow request from the invited actor.
```json
{
"@context": "https://www.w3.org/ns/activitystreams",
"summary": "",
"type": "Invite",
"actor": "https://tavern.town/users/nick",
"object": "https://tavern.town/groups/hiking",
"target": "https://tavern.town/users/mason",
"to": [
"https://tavern.town/groups/hiking",
"https://tavern.town/users/mason"
]
}
```
When the actor receives that activity, the actor (sender) and object (group) are cached, and then a `Follow/Group` activity sent to the group from the invited actor. The group that has received the invitation for the actor will respond to the invited actor with an `Accept/Follow` activity.
## Publishing to a group
When an actor sends a `Create/Note` or `Announce/Note` activity to a group's inbox, the group will create an `Announce/Note` activity and publish it to all of the followers of the group.
Activities are published only to group followers, regardless of who the activity is sent to (to, cc, bto, or bcc). The public destination is not added to group announcements.
**Note**: Members must have either the "member" or "owner" role within the group to publish to the group, otherwise the activity will be denied as unauthorized.
### Limitations
Groups record their announcements and will not publish `Announce/Note` activities for objects more than once a day (configurable).
## Use Cases
### Public Group
A "public group" is a group that is configured to auto-accept new followers with the "member" role.
This is useful for:
* Providing
### Broadcast Group
A "broadcast group" is a group that has one central creator of activities, and many followers that are only viewers. New followers may be auto-accepted with the "viewer" role.
This is useful for:
* Providing "top down" updates for projects, teams, or organizations
### Working Group
A "working group" (or committee) is a group has a mix of both viewers and contributors. New followers may be auto-accepted with the "viewer" role.
This is useful for:
* Public project groups where discussions are visible, but contributions limited to a select number of individuals.
### Private Group
A "private group" is a group that is limited to contributors who are invited to the group. New followers are kept as "pending" until accepted.
# Public vs Private Activity
Public activities include the https://www.w3.org/ns/activitystreams#Public destination in the `to` and will always be the first element of that list.

View File

@ -68,7 +68,7 @@ func fileCommandAction(cliCtx *cli.Context) error {
}
defer dbClose()
s := storage.DefaultStorage(db, logger)
s := storage.DefaultStorage(storage.LoggingSQLDriver{Driver: db, Logger: logger})
a := Agent{
AssetStorage: FileStorage{Base: "./assets/"},

View File

@ -1 +0,0 @@
package common

View File

@ -8,6 +8,8 @@ import (
"github.com/gofrs/uuid"
)
type FilterFunc func(string) bool
func IntRange(min, max int) []int {
a := make([]int, max-min+1)
for i := range a {
@ -79,3 +81,35 @@ func StringToUUIDMapValues(thing map[string]uuid.UUID) []uuid.UUID {
}
return results
}
func FilterStrings(values []string, includeFilter FilterFunc) []string {
filtered := make([]string, 0, len(values))
for _, value := range values {
if includeFilter(value) {
filtered = append(filtered, value)
}
}
return filtered
}
func StringsIncludeFF(values []string) FilterFunc {
return func(value string) bool {
for _, v := range values {
if v == value {
return true
}
}
return false
}
}
func StringsExcludeFF(values []string) FilterFunc {
return func(value string) bool {
for _, v := range values {
if v == value {
return false
}
}
return true
}
}

View File

@ -4,10 +4,22 @@ import (
"fmt"
)
func DomainPrefix(domain string) string {
return fmt.Sprintf("https://%s/", domain)
}
func GroupActorURLPrefix(domain string) string {
return fmt.Sprintf("https://%s/groups/", domain)
}
func ActorURLPrefix(domain string) string {
return fmt.Sprintf("https://%s/users/", domain)
}
func GroupActorURL(domain string, name interface{}) string {
return fmt.Sprintf("%s%s", GroupActorURLPrefix(domain), name)
}
func ActorURL(domain string, name interface{}) string {
return fmt.Sprintf("%s%s", ActorURLPrefix(domain), name)
}

50
config/groups.go Normal file
View File

@ -0,0 +1,50 @@
package config
import (
"github.com/urfave/cli/v2"
)
type GroupConfig struct {
EnableGroups bool
AllowAutoAcceptGroupFollowers bool
AllowRemoteGroupFollowers bool
DefaultGroupMemberRole int
}
var EnableGroupsFlag = cli.BoolFlag{
Name: "enable-groups",
Usage: "Enable groups",
EnvVars: []string{"ENABLE_GROUPS"},
Value: true,
}
var AllowAutoAcceptGroupFollowersFlag = cli.BoolFlag{
Name: "allow-auto-accept-group-followers",
Usage: "Allow groups to turn on automatically accepting follow requests.",
EnvVars: []string{"ALLOW_AUTO_ACCEPT_GROUP_FOLLOWERS"},
Value: true,
}
var AllowRemoteFollowersFlag = cli.BoolFlag{
Name: "allow-remote-group-followers",
Usage: "Allow non-local users to follow groups.",
EnvVars: []string{"ALLOW_REMOTE_GROUP_FOLLOWERS"},
Value: true,
}
var DefaultGroupMemberRoleFlag = cli.IntFlag{
Name: "default-group-member-role",
Usage: "The default member role for groups.",
EnvVars: []string{"DEFAULT_GROUP_MEMBER_ROLE"},
Value: 1,
}
func NewGroupConfig(cliCtx *cli.Context) (GroupConfig, error) {
cfg := GroupConfig{
EnableGroups: cliCtx.Bool("enable-groups"),
AllowAutoAcceptGroupFollowers: cliCtx.Bool("allow-auto-accept-group-followers"),
AllowRemoteGroupFollowers: cliCtx.Bool("allow-remote-group-followers"),
DefaultGroupMemberRole: cliCtx.Int("default-group-member-role"),
}
return cfg, nil
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -6,7 +6,7 @@
6 ,TAVDAT,DeleteQueryFailed ,The delete query failed ,QueryFailed
# The following was generated by storage_crud.go
# go run errors/storage_crud.go 7 User UserSession UserFeed UserObjectEvent Actor ActorAlias ActorKey NetworkRelationship Object ObjectEvent ObjectReply ObjectTag ObjectThread ObjectBoost Image ImageAlias
# go run errors/storage_crud.go 7 User UserSession UserFeed UserObjectEvent Actor ActorAlias ActorKey NetworkRelationship Object ObjectEvent ObjectReply ObjectTag ObjectThread ObjectBoost Image ImageAlias Group GroupMember
7,TAVDAT,UserQueryFailed,The user query failed.,QueryFailed
8,TAVDAT,UserSelectFailed,The select record operation for user failed.,SelectQueryFailed
9,TAVDAT,UserInsertFailed,The insert record operation for user failed.,InsertQueryFailed
@ -134,4 +134,38 @@
131,TAVDAT,ImageAliasUpdateFailed,The update record operation for image alias failed.,UpdateQueryFailed
132,TAVDAT,ImageAliasDeleteFailed,The delete record operation for image alias failed.,DeleteQueryFailed
133,TAVDAT,InvalidImageAlias,The image alias is invalid.
134,TAVDAT,ImageAliasNotFound,The image alias was not found.,NotFound
134,TAVDAT,ImageAliasNotFound,The image alias was not found.,NotFound
135,TAVDAT,GroupQueryFailed,The group query failed.,QueryFailed
136,TAVDAT,GroupSelectFailed,The select record operation for group failed.,SelectQueryFailed
137,TAVDAT,GroupInsertFailed,The insert record operation for group failed.,InsertQueryFailed
138,TAVDAT,GroupUpsertFailed,The upsert record operation for group failed.,QueryFailed
139,TAVDAT,GroupUpdateFailed,The update record operation for group failed.,UpdateQueryFailed
140,TAVDAT,GroupDeleteFailed,The delete record operation for group failed.,DeleteQueryFailed
141,TAVDAT,InvalidGroup,The group is invalid.
142,TAVDAT,GroupNotFound,The group was not found.,NotFound
143,TAVDAT,GroupMemberQueryFailed,The group member query failed.,QueryFailed
144,TAVDAT,GroupMemberSelectFailed,The select record operation for group member failed.,SelectQueryFailed
145,TAVDAT,GroupMemberInsertFailed,The insert record operation for group member failed.,InsertQueryFailed
146,TAVDAT,GroupMemberUpsertFailed,The upsert record operation for group member failed.,QueryFailed
147,TAVDAT,GroupMemberUpdateFailed,The update record operation for group member failed.,UpdateQueryFailed
148,TAVDAT,GroupMemberDeleteFailed,The delete record operation for group member failed.,DeleteQueryFailed
149,TAVDAT,InvalidGroupMember,The group member is invalid.
150,TAVDAT,GroupMemberNotFound,The group member was not found.,NotFound
# go run errors/storage_crud.go 151 GroupInvitation
151,TAVDAT,GroupInvitationQueryFailed,The group invitation query failed.,QueryFailed
152,TAVDAT,GroupInvitationSelectFailed,The select record operation for group invitation failed.,SelectQueryFailed
153,TAVDAT,GroupInvitationInsertFailed,The insert record operation for group invitation failed.,InsertQueryFailed
154,TAVDAT,GroupInvitationUpsertFailed,The upsert record operation for group invitation failed.,QueryFailed
155,TAVDAT,GroupInvitationUpdateFailed,The update record operation for group invitation failed.,UpdateQueryFailed
156,TAVDAT,GroupInvitationDeleteFailed,The delete record operation for group invitation failed.,DeleteQueryFailed
157,TAVDAT,InvalidGroupInvitation,The group invitation is invalid.
158,TAVDAT,GroupInvitationNotFound,The group invitation was not found.,NotFound
# go run errors/storage_crud.go 159 GroupBoost
159,TAVDAT,GroupBoostQueryFailed,The group boost query failed.,QueryFailed
160,TAVDAT,GroupBoostSelectFailed,The select record operation for group boost failed.,SelectQueryFailed
161,TAVDAT,GroupBoostInsertFailed,The insert record operation for group boost failed.,InsertQueryFailed
162,TAVDAT,GroupBoostUpsertFailed,The upsert record operation for group boost failed.,QueryFailed
163,TAVDAT,GroupBoostUpdateFailed,The update record operation for group boost failed.,UpdateQueryFailed
164,TAVDAT,GroupBoostDeleteFailed,The delete record operation for group boost failed.,DeleteQueryFailed
165,TAVDAT,InvalidGroupBoost,The group boost is invalid.
166,TAVDAT,GroupBoostNotFound,The group boost was not found.,NotFound
1 1 ,TAVDAT,QueryFailed ,The query operation failed.
6 6 ,TAVDAT,DeleteQueryFailed ,The delete query failed ,QueryFailed
7 # The following was generated by storage_crud.go
8 # go run errors/storage_crud.go 7 User UserSession UserFeed UserObjectEvent Actor ActorAlias ActorKey NetworkRelationship Object ObjectEvent ObjectReply ObjectTag ObjectThread ObjectBoost Image ImageAlias # go run errors/storage_crud.go 7 User UserSession UserFeed UserObjectEvent Actor ActorAlias ActorKey NetworkRelationship Object ObjectEvent ObjectReply ObjectTag ObjectThread ObjectBoost Image ImageAlias Group GroupMember
9 7,TAVDAT,UserQueryFailed,The user query failed.,QueryFailed
10 8,TAVDAT,UserSelectFailed,The select record operation for user failed.,SelectQueryFailed
11 9,TAVDAT,UserInsertFailed,The insert record operation for user failed.,InsertQueryFailed
12 10,TAVDAT,UserUpsertFailed,The upsert record operation for user failed.,QueryFailed
134 132,TAVDAT,ImageAliasDeleteFailed,The delete record operation for image alias failed.,DeleteQueryFailed
135 133,TAVDAT,InvalidImageAlias,The image alias is invalid.
136 134,TAVDAT,ImageAliasNotFound,The image alias was not found.,NotFound
137 135,TAVDAT,GroupQueryFailed,The group query failed.,QueryFailed
138 136,TAVDAT,GroupSelectFailed,The select record operation for group failed.,SelectQueryFailed
139 137,TAVDAT,GroupInsertFailed,The insert record operation for group failed.,InsertQueryFailed
140 138,TAVDAT,GroupUpsertFailed,The upsert record operation for group failed.,QueryFailed
141 139,TAVDAT,GroupUpdateFailed,The update record operation for group failed.,UpdateQueryFailed
142 140,TAVDAT,GroupDeleteFailed,The delete record operation for group failed.,DeleteQueryFailed
143 141,TAVDAT,InvalidGroup,The group is invalid.
144 142,TAVDAT,GroupNotFound,The group was not found.,NotFound
145 143,TAVDAT,GroupMemberQueryFailed,The group member query failed.,QueryFailed
146 144,TAVDAT,GroupMemberSelectFailed,The select record operation for group member failed.,SelectQueryFailed
147 145,TAVDAT,GroupMemberInsertFailed,The insert record operation for group member failed.,InsertQueryFailed
148 146,TAVDAT,GroupMemberUpsertFailed,The upsert record operation for group member failed.,QueryFailed
149 147,TAVDAT,GroupMemberUpdateFailed,The update record operation for group member failed.,UpdateQueryFailed
150 148,TAVDAT,GroupMemberDeleteFailed,The delete record operation for group member failed.,DeleteQueryFailed
151 149,TAVDAT,InvalidGroupMember,The group member is invalid.
152 150,TAVDAT,GroupMemberNotFound,The group member was not found.,NotFound
153 # go run errors/storage_crud.go 151 GroupInvitation
154 151,TAVDAT,GroupInvitationQueryFailed,The group invitation query failed.,QueryFailed
155 152,TAVDAT,GroupInvitationSelectFailed,The select record operation for group invitation failed.,SelectQueryFailed
156 153,TAVDAT,GroupInvitationInsertFailed,The insert record operation for group invitation failed.,InsertQueryFailed
157 154,TAVDAT,GroupInvitationUpsertFailed,The upsert record operation for group invitation failed.,QueryFailed
158 155,TAVDAT,GroupInvitationUpdateFailed,The update record operation for group invitation failed.,UpdateQueryFailed
159 156,TAVDAT,GroupInvitationDeleteFailed,The delete record operation for group invitation failed.,DeleteQueryFailed
160 157,TAVDAT,InvalidGroupInvitation,The group invitation is invalid.
161 158,TAVDAT,GroupInvitationNotFound,The group invitation was not found.,NotFound
162 # go run errors/storage_crud.go 159 GroupBoost
163 159,TAVDAT,GroupBoostQueryFailed,The group boost query failed.,QueryFailed
164 160,TAVDAT,GroupBoostSelectFailed,The select record operation for group boost failed.,SelectQueryFailed
165 161,TAVDAT,GroupBoostInsertFailed,The insert record operation for group boost failed.,InsertQueryFailed
166 162,TAVDAT,GroupBoostUpsertFailed,The upsert record operation for group boost failed.,QueryFailed
167 163,TAVDAT,GroupBoostUpdateFailed,The update record operation for group boost failed.,UpdateQueryFailed
168 164,TAVDAT,GroupBoostDeleteFailed,The delete record operation for group boost failed.,DeleteQueryFailed
169 165,TAVDAT,InvalidGroupBoost,The group boost is invalid.
170 166,TAVDAT,GroupBoostNotFound,The group boost was not found.,NotFound
171

View File

@ -44,6 +44,19 @@ func (client ActivityClient) GetSigned(location string, localActor storage.Local
return ldJsonGetSigned(client.HTTPClient, location, signer, localActor.Actor.GetKeyID(), privateKey)
}
func (client ActivityClient) GetSignedWithKey(location, keyID string, privateKey *rsa.PrivateKey) (string, storage.Payload, error) {
client.Logger.Debug("Sending signed activity request", zap.String("url", location))
sigConfig := []httpsig.Algorithm{httpsig.RSA_SHA256}
headersToSign := []string{httpsig.RequestTarget, "date"}
signer, _, err := httpsig.NewSigner(sigConfig, headersToSign, httpsig.Signature)
if err != nil {
return "", nil, err
}
return ldJsonGetSigned(client.HTTPClient, location, signer, keyID, privateKey)
}
func ldJsonGet(client common.HTTPClient, location string) (string, storage.Payload, error) {
request, err := http.NewRequest("GET", location, nil)
if err != nil {

4
go.mod
View File

@ -27,10 +27,10 @@ require (
github.com/sslhound/herr v1.4.1 // indirect
github.com/stretchr/testify v1.4.0
github.com/teacat/noire v1.0.0
github.com/urfave/cli/v2 v2.1.1
github.com/urfave/cli/v2 v2.2.0
github.com/yukimochi/httpsig v0.1.3
go.uber.org/zap v1.13.0
golang.org/x/crypto v0.0.0-20200221170553-0f24fbd83dfb
golang.org/x/tools v0.0.0-20200228224639-71482053b885 // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
gopkg.in/yaml.v2 v2.2.8
)

2
go.sum
View File

@ -415,6 +415,8 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/urfave/cli/v2 v2.1.1 h1:Qt8FeAtxE/vfdrLmR3rxR6JRE0RoVmbXu8+6kZtYU4k=
github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4=
github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.6.0/go.mod h1:FstJa9V+Pj9vQ7OJie2qMHdwemEDaDiSdBnvPM1Su9w=

276
janitor/command.go Normal file
View File

@ -0,0 +1,276 @@
package janitor
import (
"crypto/md5"
"encoding/hex"
"fmt"
"io/ioutil"
"path/filepath"
"strings"
"time"
"github.com/urfave/cli/v2"
"gopkg.in/yaml.v2"
)
var SkeletonCommand = cli.Command{
Name: "skeleton",
Usage: "Generate a skeleton for ",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "destination",
Usage: "The location to write files to.",
Value: "./",
},
&cli.StringFlag{
Name: "prefix",
Usage: "Set the prefix of the site.",
Value: "",
},
&cli.StringFlag{
Name: "domain",
Usage: "Set the domain of the site.",
Required: true,
},
&cli.StringFlag{
Name: "tag",
Usage: "The container tag to use.",
Value: "latest",
},
&cli.IntFlag{
Name: "web-port",
Usage: "The port to run web on",
Value: 9000,
},
&cli.IntFlag{
Name: "publisher-port",
Usage: "The port to run web on",
Value: 9200,
},
&cli.IntFlag{
Name: "svger-port",
Usage: "The port to run svger on",
Value: 9100,
},
},
Action: skeletonCommandAction,
}
func skeletonCommandAction(cliCtx *cli.Context) error {
prefix := cliCtx.String("prefix")
dbEnvFile := fmt.Sprintf("%sdb.env", prefix)
webEnvFile := fmt.Sprintf("%sweb.env", prefix)
initSqlFile := fmt.Sprintf("%sinit.sql", prefix)
dockerComposeFile := fmt.Sprintf("%sdocker-compose.yml", prefix)
webPort := cliCtx.Int("web-port")
publisherPort := cliCtx.Int("publisher-port")
svgerPort := cliCtx.Int("svger-port")
internalNetwork := fmt.Sprintf("%sinternal_network", prefix)
externalNetwork := fmt.Sprintf("%sexternal_network", prefix)
dbServiceName := "db"
svgerServiceName := "svger"
webServiceName := "web"
publisherServiceName := "publisher"
dockerComposeConfig := make(map[string]interface{})
services := make(map[string]interface{})
dbServiceConfig := make(map[string]interface{})
dbServiceConfig["restart"] = "on-failure"
dbServiceConfig["image"] = "postgres:12-alpine"
dbServiceConfig["networks"] = []string{internalNetwork}
dbServiceConfig["volumes"] = []string{
fmt.Sprintf("./%s:/docker-entrypoint-initdb.d/10-init.sql", initSqlFile),
fmt.Sprintf("./%spostgres:/var/lib/postgresql/data", prefix),
}
dbServiceConfig["env_file"] = []string{
fmt.Sprintf("./%s", dbEnvFile),
}
services[dbServiceName] = dbServiceConfig
svgerServiceConfig := make(map[string]interface{})
svgerServiceConfig["restart"] = "on-failure"
svgerServiceConfig["image"] = "ngerakines/svger:1.1.0"
svgerServiceConfig["networks"] = []string{
internalNetwork,
}
svgerServiceConfig["ports"] = []string{
fmt.Sprintf("%d:%d", svgerPort, svgerPort),
}
svgerServiceConfig["environment"] = []string{
fmt.Sprintf("PORT=%d", svgerPort),
}
services[svgerServiceName] = svgerServiceConfig
webServiceConfig := make(map[string]interface{})
webServiceConfig["restart"] = "on-failure"
webServiceConfig["image"] = fmt.Sprintf("ngerakines/tavern:%s", cliCtx.String("tag"))
webServiceConfig["networks"] = []string{
externalNetwork,
internalNetwork,
}
webServiceConfig["ports"] = []string{
fmt.Sprintf("%d:%d", webPort, webPort),
}
webServiceConfig["depends_on"] = []string{
dbServiceName,
svgerServiceName,
publisherServiceName,
}
webServiceConfig["env_file"] = []string{
fmt.Sprintf("./%s", webEnvFile),
}
webServiceConfig["volumes"] = []string{
fmt.Sprintf("./%sassets:/assets", prefix),
}
services[webServiceName] = webServiceConfig
publisherServiceConfig := make(map[string]interface{})
publisherServiceConfig["restart"] = "on-failure"
publisherServiceConfig["image"] = fmt.Sprintf("ngerakines/tavern:%s", cliCtx.String("tag"))
publisherServiceConfig["networks"] = []string{
internalNetwork,
externalNetwork,
}
publisherServiceConfig["command"] = []string{
"publisher",
}
publisherServiceConfig["ports"] = []string{
fmt.Sprintf("%d:%d", publisherPort, publisherPort),
}
publisherServiceConfig["environment"] = []string{
fmt.Sprintf("LISTEN=0.0.0.0:%d", publisherPort),
fmt.Sprintf("PUBLISHER_CALLBACK=http://web:%d/webhooks/publisher", webPort),
}
services[publisherServiceName] = publisherServiceConfig
dockerComposeConfig["services"] = services
dockerComposeConfig["version"] = "3"
dockerComposeConfig["networks"] = map[string]interface{}{
externalNetwork: nil,
internalNetwork: map[string]interface{}{
"internal": true,
},
}
d, err := yaml.Marshal(&dockerComposeConfig)
if err != nil {
return err
}
dbPasswordH := md5.New()
dbPasswordH.Write([]byte(time.Now().String()))
dbPasswordH.Write([]byte("database password"))
webSecretH := md5.New()
webSecretH.Write([]byte(time.Now().String()))
webSecretH.Write([]byte("web secret"))
dbPassword := hex.EncodeToString(dbPasswordH.Sum(nil))
webSecret := hex.EncodeToString(webSecretH.Sum(nil))
var dbEnvFileContent strings.Builder
dbEnvFileContent.WriteString(fmt.Sprintf("POSTGRES_PASSWORD=%s\n", dbPassword))
var webEnvFileContent strings.Builder
for _, line := range []string{
"ENABLE_SENTRY=false",
"ENABLE_SVGER=true",
"ENABLE_PUBLISHER=true",
"ENABLE_GROUPS=true",
fmt.Sprintf("SECRET=%s", webSecret),
fmt.Sprintf("DOMAIN=%s", cliCtx.String("domain")),
fmt.Sprintf("DATABASE=postgresql://postgres:%s@%s:5432/tavern?sslmode=disable", dbPassword, dbServiceName),
fmt.Sprintf("SVGER=http://%s:%d/", svgerServiceName, svgerPort),
fmt.Sprintf("LISTEN=0.0.0.0:%d", webPort),
"ASSET_STORAGE_FILE_BASE=/assets/",
"ASSET_STORAGE_REMOTE_DENY=*",
"ALLOW_REPLY_COLLECTION_UPDATES=true",
fmt.Sprintf("PUBLISHER=http://%s:%d/", publisherServiceName, publisherPort),
"ALLOW_OBJECT_FOLLOW=true",
"ALLOW_AUTO_ACCEPT_FOLLOWERS=true",
"ALLOW_INBOX_FORWARDING=true",
"ENABLE_GROUPS=true",
"ALLOW_AUTO_ACCEPT_GROUP_FOLLOWERS=true",
"ALLOW_REMOTE_GROUP_FOLLOWERS=true",
"DEFAULT_GROUP_MEMBER_ROLE=1",
} {
webEnvFileContent.WriteString(line)
webEnvFileContent.WriteString("\n")
}
var initSqlFileContent strings.Builder
for _, line := range []string{
`CREATE DATABASE tavern;`,
`CREATE EXTENSION IF NOT EXISTS "uuid-ossp";`,
} {
initSqlFileContent.WriteString(line)
initSqlFileContent.WriteString("\n")
}
var runFileContent strings.Builder
for _, line := range []string{
"#!/bin/sh",
"",
fmt.Sprintf(`docker-compose -f %s up -d db svger publisher`, dockerComposeFile),
"sleep 30",
fmt.Sprintf(`docker-compose -f %s run web migrate`, dockerComposeFile),
"sleep 5",
fmt.Sprintf(`docker-compose -f %s run web init --admin-email=nick.gerakines@gmail.com --admin-password=password --admin-name=nick`, dockerComposeFile),
fmt.Sprintf(`docker-compose -f %s up -d`, dockerComposeFile),
fmt.Sprintf(`docker-compose -f %s logs --tail=50 -f`, dockerComposeFile),
} {
runFileContent.WriteString(line)
runFileContent.WriteString("\n")
}
var restartFileContent strings.Builder
for _, line := range []string{
"#!/bin/sh",
"",
fmt.Sprintf(`docker-compose -f %s up -d`, dockerComposeFile),
fmt.Sprintf(`docker-compose -f %s logs --tail=50 -f`, dockerComposeFile),
} {
restartFileContent.WriteString(line)
restartFileContent.WriteString("\n")
}
runLoc := filepath.Join(cliCtx.String("destination"), fmt.Sprintf("%srun.sh", prefix))
restartLoc := filepath.Join(cliCtx.String("destination"), fmt.Sprintf("%srestart.sh", prefix))
dbEnvLoc := filepath.Join(cliCtx.String("destination"), dbEnvFile)
webEnvLoc := filepath.Join(cliCtx.String("destination"), webEnvFile)
initSqlLoc := filepath.Join(cliCtx.String("destination"), initSqlFile)
dcLoc := filepath.Join(cliCtx.String("destination"), dockerComposeFile)
err = ioutil.WriteFile(runLoc, []byte(runFileContent.String()), 0744)
if err != nil {
return err
}
err = ioutil.WriteFile(restartLoc, []byte(restartFileContent.String()), 0744)
if err != nil {
return err
}
err = ioutil.WriteFile(dbEnvLoc, []byte(dbEnvFileContent.String()), 0644)
if err != nil {
return err
}
err = ioutil.WriteFile(webEnvLoc, []byte(webEnvFileContent.String()), 0644)
if err != nil {
return err
}
err = ioutil.WriteFile(initSqlLoc, []byte(initSqlFileContent.String()), 0644)
if err != nil {
return err
}
return ioutil.WriteFile(dcLoc, d, 0644)
}

View File

@ -10,6 +10,7 @@ import (
"github.com/ngerakines/tavern/asset"
"github.com/ngerakines/tavern/g"
"github.com/ngerakines/tavern/janitor"
"github.com/ngerakines/tavern/json"
"github.com/ngerakines/tavern/migrations"
"github.com/ngerakines/tavern/publisher"
@ -53,6 +54,7 @@ func main() {
&migrations.Command,
&json.DebugJSONLDCommand,
&publisher.Command,
&janitor.SkeletonCommand,
}
sort.Sort(cli.FlagsByName(app.Flags))

View File

@ -0,0 +1 @@
DROP TABLE groups, group_members;

View File

@ -0,0 +1,63 @@
create table if not exists public.groups
(
id uuid not null
constraint groups_pk primary key,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
name varchar not null,
public_key text not null,
private_key text not null,
display_name varchar not null,
about text not null,
accept_followers boolean default true not null,
default_member_role integer default 0 not null,
allow_remote boolean default true not null,
owner uuid not null,
actor_id uuid not null,
constraint groups_name_uindex
unique (name)
);
create table if not exists public.group_members
(
id uuid not null
constraint group_followers_pk
primary key,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
group_actor_id uuid not null,
member_actor_id uuid not null,
activity jsonb not null,
relationship_status integer default 0 not null,
member_role integer default 0 not null,
constraint group_members_uindex
unique (group_actor_id, member_actor_id)
);
create table if not exists public.group_invitations
(
id uuid not null
constraint group_invitations_pk
primary key,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
group_actor_id uuid not null,
member_actor_id uuid not null,
activity jsonb not null,
role integer default 0 not null,
constraint group_invitations_uindex
unique (group_actor_id, member_actor_id)
);
create table if not exists public.group_boosts
(
id uuid not null
constraint group_boosts_pk
primary key,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
group_actor_id uuid not null,
object_id uuid not null,
constraint group_boosts_uindex
unique (group_actor_id, object_id)
);

View File

@ -105,7 +105,7 @@ func serverCommandAction(cliCtx *cli.Context) error {
ctx := context.Background()
txErr := storage.TransactionalStorage(ctx, storage.DefaultStorage(db, logger), func(s storage.Storage) error {
txErr := storage.TransactionalStorage(ctx, storage.DefaultStorage(storage.LoggingSQLDriver{Driver: db, Logger: logger}), func(s storage.Storage) error {
name := cliCtx.String("admin-name")
displayName := cliCtx.String("admin-displayname")

View File

@ -29,6 +29,7 @@ type ActorStorage interface {
GetActorByAlias(ctx context.Context, subject string) (*Actor, error)
ActorSubjects(ctx context.Context, actors []uuid.UUID) ([]ActorAlias, error)
ActorAliasSubjectExists(ctx context.Context, alias string) (bool, error)
FilterGroupsByActorID(ctx context.Context, actorIDs []string) ([]string, error)
}
type Actor struct {
@ -151,20 +152,20 @@ func actorAliasesSelectQuery(join string, where []string) string {
}
func (s pgStorage) GetActor(ctx context.Context, id uuid.UUID) (*Actor, error) {
return s.getFirstActor(s.db, ctx, actorsSelectQuery("", []string{"a.id = $1"}), id)
return s.getFirstActor(ctx, actorsSelectQuery("", []string{"a.id = $1"}), id)
}
func (s pgStorage) GetActorByActorID(ctx context.Context, actorID string) (*Actor, error) {
return s.getFirstActor(s.db, ctx, actorsSelectQuery("", []string{"a.actor_id = $1"}), actorID)
return s.getFirstActor(ctx, actorsSelectQuery("", []string{"a.actor_id = $1"}), actorID)
}
func (s pgStorage) GetActorByAlias(ctx context.Context, alias string) (*Actor, error) {
query := actorsSelectQuery("actor_aliases aa ON a.id = aa.actor_id", []string{"aa.alias = $1"})
return s.getFirstActor(s.db, ctx, query, alias)
return s.getFirstActor(ctx, query, alias)
}
func (s pgStorage) getFirstActor(qc QueryExecute, ctx context.Context, query string, args ...interface{}) (*Actor, error) {
results, err := s.getActors(qc, ctx, query, args...)
func (s pgStorage) getFirstActor(ctx context.Context, query string, args ...interface{}) (*Actor, error) {
results, err := s.getActors(ctx, query, args...)
if err != nil {
return nil, err
}
@ -174,9 +175,9 @@ func (s pgStorage) getFirstActor(qc QueryExecute, ctx context.Context, query str
return results[0], nil
}
func (s pgStorage) getActors(qc QueryExecute, ctx context.Context, query string, args ...interface{}) ([]*Actor, error) {
func (s pgStorage) getActors(ctx context.Context, query string, args ...interface{}) ([]*Actor, error) {
results := make([]*Actor, 0)
rows, err := qc.QueryContext(ctx, query, args...)
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
@ -252,7 +253,7 @@ func (s pgStorage) ActorsByActorID(ctx context.Context, actorIDs []string) ([]*A
query := actorsSelectQuery("", []string{
fmt.Sprintf("a.actor_id IN (%s)", valuesPlaceholder),
})
return s.getActors(s.db, ctx, query, common.StringsToInterfaces(actorIDs)...)
return s.getActors(ctx, query, common.StringsToInterfaces(actorIDs)...)
}
func (s pgStorage) ActorRowIDForActorID(ctx context.Context, actorID string) (uuid.UUID, error) {
@ -333,6 +334,42 @@ func ActorFromUserInfo(name, displayName, domain, publicKey string, privateKey *
return actor
}
func ActorFromGroupInfo(name, displayName, domain, publicKey string, privateKey *rsa.PrivateKey) Payload {
actor := EmptyPayload()
actor["@context"] = "https://www.w3.org/ns/activitystreams"
actor["id"] = common.GroupActorURL(domain, name)
actor["inbox"] = fmt.Sprintf("%s/inbox", common.GroupActorURL(domain, name))
actor["outbox"] = fmt.Sprintf("%s/outbox", common.GroupActorURL(domain, name))
actor["name"] = displayName
actor["preferredUsername"] = name
actor["summary"] = ""
actor["type"] = "Group"
actor["url"] = common.GroupActorURL(domain, name)
actor["followers"] = fmt.Sprintf("%s/followers", common.GroupActorURL(domain, name))
actor["following"] = fmt.Sprintf("%s/following", common.GroupActorURL(domain, name))
n := privateKey.PublicKey.N.Bytes()
e := big.NewInt(int64(privateKey.PublicKey.E)).Bytes()
fingerPrint := md5.New()
fingerPrint.Write(n)
fingerPrint.Write(e)
keyID := hex.EncodeToString(fingerPrint.Sum(nil))
key := EmptyPayload()
key["id"] = fmt.Sprintf("%s#%s", common.GroupActorURL(domain, name), keyID)
key["owner"] = common.GroupActorURL(domain, name)
key["publicKeyPem"] = publicKey
actor["publicKey"] = key
return actor
}
func (s pgStorage) ActorAliasSubjectExists(ctx context.Context, alias string) (bool, error) {
return s.wrappedExists(errors.WrapActorAliasQueryFailedError, ctx, `SELECT COUNT(*) FROM actor_aliases WHERE alias = $1 AND alias_type = 0`, alias)
}
func (s pgStorage) FilterGroupsByActorID(ctx context.Context, actorIDs []string) ([]string, error) {
query := fmt.Sprintf(`SELECT actor_id FROM actors WHERE payload->>'type' = 'Group' AND actor_id in (%s)`, strings.Join(common.DollarForEach(len(actorIDs)), ","))
return s.selectStrings(errors.WrapActorQueryFailedError, ctx, query, common.StringsToInterfaces(actorIDs)...)
}

View File

@ -53,7 +53,7 @@ func (s pgStorage) CreateImage(ctx context.Context, location, checksum, blur str
var img ImageAsset
now := s.now()
txErr := runTransactionWithOptions(s.db, func(tx QueryExecute) error {
checksumCount, err := s.rowCount(tx, ctx, `SELECT COUNT(*) FROM images WHERE checksum = $1`, checksum)
checksumCount, err := s.rowCount(ctx, `SELECT COUNT(*) FROM images WHERE checksum = $1`, checksum)
if err != nil {
return errors.WrapImageQueryFailedError(err)
}

66
storage/db.go Normal file
View File

@ -0,0 +1,66 @@
package storage
import (
"context"
"database/sql"
"go.uber.org/zap"
)
type QueryExecute interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
type TransactionSQLDriver struct {
Driver *sql.Tx
}
type LoggingSQLDriver struct {
Driver QueryExecute
Logger *zap.Logger
}
type SQLDriver struct {
Driver *sql.DB
}
func (d LoggingSQLDriver) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
d.Logger.Debug("ExecContext", zap.String("query", query), zap.Reflect("args", args))
return d.Driver.ExecContext(ctx, query, args...)
}
func (d LoggingSQLDriver) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
d.Logger.Debug("QueryContext", zap.String("query", query), zap.Reflect("args", args))
return d.Driver.QueryContext(ctx, query, args...)
}
func (d LoggingSQLDriver) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
d.Logger.Debug("QueryRowContext", zap.String("query", query), zap.Reflect("args", args))
return d.Driver.QueryRowContext(ctx, query, args...)
}
func (d SQLDriver) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
return d.Driver.ExecContext(ctx, query, args...)
}
func (d SQLDriver) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return d.Driver.QueryContext(ctx, query, args...)
}
func (d SQLDriver) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
return d.Driver.QueryRowContext(ctx, query, args...)
}
func (d TransactionSQLDriver) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
return d.Driver.ExecContext(ctx, query, args...)
}
func (d TransactionSQLDriver) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
return d.Driver.QueryContext(ctx, query, args...)
}
func (d TransactionSQLDriver) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
return d.Driver.QueryRowContext(ctx, query, args...)
}

341
storage/groups.go Normal file
View File

@ -0,0 +1,341 @@
package storage
import (
"context"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"fmt"
"strings"
"time"
"github.com/gofrs/uuid"
"github.com/ngerakines/tavern/errors"
)
type GroupStorage interface {
CountGroupMembers(ctx context.Context, groupActorRowID uuid.UUID) (int, error)
GetGroupByID(ctx context.Context, groupRowID uuid.UUID) (Group, error)
GetGroupByName(ctx context.Context, name string) (Group, error)
GetGroupsByOwner(ctx context.Context, userRowID uuid.UUID) ([]Group, error)
GroupActorsForMemberActorRowID(ctx context.Context, groupActorRowID uuid.UUID) ([]*Actor, error)
GroupMemberActorIDs(ctx context.Context, groupActorRowID uuid.UUID) ([]string, error)
GroupMemberActorsForGroupActorID(ctx context.Context, groupActorRowID uuid.UUID) ([]*Actor, error)
GroupMemberCanInvite(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) (bool, error)
GroupMemberCanSubmit(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) (bool, error)
GroupNameAvailable(context.Context, string) (bool, error)
IsActorInGroup(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) (bool, error)
IsActorInvitedToGroup(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) (bool, error)
ListGroups(ctx context.Context) ([]Group, error)
MinutesSinceGroupBoost(ctx context.Context, groupActorRowID, objectRowID uuid.UUID) (int, error)
RecordGroup(ctx context.Context, userRowID, actorRowID uuid.UUID, name, publicKey, privateKey, displayName, about string) (uuid.UUID, error)
RecordGroupAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userRowID, actorRowID uuid.UUID, name, publicKey, privateKey, displayName, about string) (uuid.UUID, error)
RecordGroupBoost(ctx context.Context, groupActorRowID, objectRowID uuid.UUID) (uuid.UUID, error)
RecordGroupBoostAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, groupActorRowID, objectRowID uuid.UUID) (uuid.UUID, error)
RecordGroupInvitation(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) (uuid.UUID, error)
RecordGroupInvitationAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, groupActorRowID, memberActorRowID uuid.UUID) (uuid.UUID, error)
RecordGroupMember(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID, activity Payload, status RelationshipStatus, role GroupRole) (uuid.UUID, error)
RecordGroupMemberAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, groupActorRowID, memberActorRowID uuid.UUID, activity Payload, status RelationshipStatus, role GroupRole) (uuid.UUID, error)
RemoveGroupInvitation(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) error
RemoveGroupMember(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) error
UpdateGroupMemberRole(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID, role GroupRole) error
UpdateGroupMemberStatus(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID, status RelationshipStatus) error
UpdateGroupDefaultRole(ctx context.Context, groupActorRowID uuid.UUID, role GroupRole) error
UpdateGroupAcceptFollowers(ctx context.Context, groupRowID uuid.UUID, acceptFollowers bool) error
UpdateGroupAllowRemote(ctx context.Context, groupRowID uuid.UUID, allowRemote bool) error
}
type Group struct {
ID uuid.UUID
CreatedAt time.Time
UpdatedAt time.Time
Name string
PublicKey string
PrivateKey string
DisplayName string
About string
AcceptFollowers bool
DefaultMemberRole GroupRole
AllowRemote bool
OwnerID uuid.UUID
ActorID uuid.UUID
}
type GroupRole int16
const (
// GroupOwner can manage the group.
GroupOwner GroupRole = 2
// GroupMember can post to the group.
GroupMember GroupRole = 1
// GroupViewer can receive group activities, but cannot create group activities.
GroupViewer GroupRole = 0
)
var groupFields = []string{
"id",
"created_at",
"updated_at",
"name",
"public_key",
"private_key",
"display_name",
"about",
"accept_followers",
"default_member_role",
"allow_remote",
"owner",
"actor_id",
}
func (g Group) GetPrivateKey() (*rsa.PrivateKey, error) {
block, _ := pem.Decode([]byte(g.PrivateKey))
if block == nil {
return nil, errors.New("invalid RSA PEM")
}
key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
return nil, err
}
return key, nil
}
func (g Group) GetDecodedPublicKey() (*rsa.PublicKey, error) {
return DecodePublicKey(g.PublicKey)
}
func (s pgStorage) RecordGroup(ctx context.Context, userRowID, actorRowID uuid.UUID, name, publicKey, privateKey, displayName, about string) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordGroupAll(ctx, rowID, now, now, userRowID, actorRowID, name, publicKey, privateKey, displayName, about)
}
func (s pgStorage) RecordGroupAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, userRowID, actorRowID uuid.UUID, name, publicKey, privateKey, displayName, about string) (uuid.UUID, error) {
query := `INSERT INTO groups (id, created_at, updated_at, name, public_key, private_key, display_name, about, owner, actor_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT ON CONSTRAINT groups_name_uindex DO UPDATE set updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, name, publicKey, privateKey, displayName, about, userRowID, actorRowID).Scan(&id)
return id, errors.WrapGroupUpsertFailedError(err)
}
func (s pgStorage) RecordGroupMember(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID, activity Payload, status RelationshipStatus, role GroupRole) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordGroupMemberAll(ctx, rowID, now, now, groupActorRowID, memberActorRowID, activity, status, role)
}
func (s pgStorage) RecordGroupMemberAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, groupActorRowID, memberActorRowID uuid.UUID, activity Payload, status RelationshipStatus, role GroupRole) (uuid.UUID, error) {
query := `INSERT INTO group_members (id, created_at, updated_at, group_actor_id, member_actor_id, activity, relationship_status, member_role) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT ON CONSTRAINT group_members_uindex DO UPDATE set updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, groupActorRowID, memberActorRowID, activity, status, role).Scan(&id)
return id, errors.WrapGroupMemberUpsertFailedError(err)
}
func (s pgStorage) GetGroupByName(ctx context.Context, name string) (Group, error) {
query := fmt.Sprintf("SELECT %s FROM groups WHERE name = $1", strings.Join(groupFields, ", "))
return s.getFirstGroup(ctx, query, name)
}
func (s pgStorage) GetGroupByID(ctx context.Context, groupRowID uuid.UUID) (Group, error) {
query := fmt.Sprintf("SELECT %s FROM groups WHERE id = $1", strings.Join(groupFields, ", "))
return s.getFirstGroup(ctx, query, groupRowID)
}
func (s pgStorage) GetGroupsByOwner(ctx context.Context, userRowID uuid.UUID) ([]Group, error) {
query := fmt.Sprintf("SELECT %s FROM groups WHERE owner = $1", strings.Join(groupFields, ", "))
return s.getGroups(ctx, query, userRowID)
}
func (s pgStorage) ListGroups(ctx context.Context) ([]Group, error) {
query := fmt.Sprintf("SELECT %s FROM groups ORDER BY name ASC", strings.Join(groupFields, ", "))
return s.getGroups(ctx, query)
}
func (s pgStorage) getFirstGroup(ctx context.Context, query string, args ...interface{}) (Group, error) {
groups, err := s.getGroups(ctx, query, args...)
if err != nil {
return Group{}, err
}
if len(groups) == 0 {
return Group{}, errors.NewGroupNotFoundError(nil)
}
return groups[0], nil
}
func (s pgStorage) getGroups(ctx context.Context, query string, args ...interface{}) ([]Group, error) {
results := make([]Group, 0)
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, errors.NewGroupSelectFailedError(err)
}
defer rows.Close()
for rows.Next() {
var group Group
err = rows.Scan(&group.ID,
&group.CreatedAt,
&group.UpdatedAt,
&group.Name,
&group.PublicKey,
&group.PrivateKey,
&group.DisplayName,
&group.About,
&group.AcceptFollowers,
&group.DefaultMemberRole,
&group.AllowRemote,
&group.OwnerID,
&group.ActorID)
if err != nil {
return nil, errors.NewInvalidGroupError(err)
}
results = append(results, group)
}
return results, nil
}
func (s pgStorage) GroupNameAvailable(ctx context.Context, name string) (bool, error) {
queries := []string{
`SELECT COUNT(*) FROM users WHERE name = $1`,
`SELECT COUNT(*) FROM groups WHERE name = $1`,
}
for _, query := range queries {
count, err := s.wrappedRowCount(errors.WrapUserQueryFailedError, ctx, query, name)
if err != nil {
return false, err
}
if count > 0 {
return false, nil
}
}
return true, nil
}
func (s pgStorage) CountGroupMembers(ctx context.Context, groupActorRowID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapGroupQueryFailedError, ctx, `SELECT COUNT(*) FROM group_members WHERE group_actor_id = $1`, groupActorRowID)
}
func (s pgStorage) GroupMemberActorIDs(ctx context.Context, groupActorRowID uuid.UUID) ([]string, error) {
query := `SELECT a.actor_id FROM group_members gm INNER JOIN actors a ON a.id = gm.member_actor_id WHERE gm.group_actor_id = $1 ORDER BY a.actor_id ASC`
rows, err := s.db.QueryContext(ctx, query, groupActorRowID)
if err != nil {
return nil, errors.NewGroupMemberSelectFailedError(err)
}
defer rows.Close()
var actors []string
for rows.Next() {
var actor string
if err := rows.Scan(&actor); err != nil {
return nil, errors.NewGroupMemberSelectFailedError(errors.NewInvalidGroupMemberError(err))
}
actors = append(actors, actor)
}
return actors, nil
}
func (s pgStorage) GroupActorsForMemberActorRowID(ctx context.Context, memberActorRowID uuid.UUID) ([]*Actor, error) {
query := actorsSelectQuery("group_members gm ON a.id = gm.group_actor_id", []string{"gm.member_actor_id = $1"})
return s.getActors(ctx, query, memberActorRowID)
}
func (s pgStorage) GroupMemberActorsForGroupActorID(ctx context.Context, groupActorRowID uuid.UUID) ([]*Actor, error) {
query := actorsSelectQuery("group_members gm ON a.id = gm.member_actor_id", []string{"gm.group_actor_id = $1"})
return s.getActors(ctx, query, groupActorRowID)
}
func (s pgStorage) UpdateGroupMemberStatus(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID, status RelationshipStatus) error {
query := "UPDATE group_members SET relationship_status = $4, updated_at = $3 WHERE group_actor_id = $1 AND member_actor_id = $2"
now := s.now()
_, err := s.db.ExecContext(ctx, query, groupActorRowID, memberActorRowID, now, status)
return errors.WrapUserUpdateFailedError(err)
}
func (s pgStorage) UpdateGroupMemberRole(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID, role GroupRole) error {
now := s.now()
_, err := s.db.ExecContext(ctx, "UPDATE group_members SET member_role = $4, updated_at = $3 WHERE group_actor_id = $1 AND member_actor_id = $2", groupActorRowID, memberActorRowID, now, role)
return errors.WrapUserUpdateFailedError(err)
}
func (s pgStorage) RemoveGroupMember(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) error {
query := `DELETE FROM group_members WHERE group_actor_id = $1 AND member_actor_id = $2`
_, err := s.db.ExecContext(ctx, query, groupActorRowID, memberActorRowID)
return errors.WrapGroupMemberUpdateFailedError(err)
}
func (s pgStorage) GroupMemberCanSubmit(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) (bool, error) {
query := `SELECT COUNT(*) FROM group_members WHERE group_actor_id = $1 AND member_actor_id = $2 AND member_role IN ($3, $4)`
return s.wrappedExists(errors.WrapGroupMemberQueryFailedError, ctx, query, groupActorRowID, memberActorRowID, GroupMember, GroupOwner)
}
func (s pgStorage) GroupMemberCanInvite(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) (bool, error) {
query := `SELECT COUNT(*) FROM group_members WHERE group_actor_id = $1 AND member_actor_id = $2 AND member_role IN ($3)`
return s.wrappedExists(errors.WrapGroupMemberQueryFailedError, ctx, query, groupActorRowID, memberActorRowID, GroupOwner)
}
func (s pgStorage) RecordGroupInvitation(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordGroupInvitationAll(ctx, rowID, now, now, groupActorRowID, memberActorRowID)
}
func (s pgStorage) RecordGroupInvitationAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, groupActorRowID, memberActorRowID uuid.UUID) (uuid.UUID, error) {
query := `INSERT INTO group_invitations (id, created_at, updated_at, group_actor_id, member_actor_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT group_invitations_uindex DO UPDATE set updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, groupActorRowID, memberActorRowID).Scan(&id)
return id, errors.WrapGroupInvitationUpsertFailedError(err)
}
func (s pgStorage) IsActorInvitedToGroup(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) (bool, error) {
return s.wrappedExists(errors.WrapGroupInvitationQueryFailedError, ctx, `SELECT COUNT(*) FROM group_invitations WHERE group_actor_id = $1 AND member_actor_id = $2`, groupActorRowID, memberActorRowID)
}
func (s pgStorage) RemoveGroupInvitation(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) error {
_, err := s.db.ExecContext(ctx, `DELETE FROM group_invitations WHERE group_actor_id = $1 AND member_actor_id = $2`, groupActorRowID, memberActorRowID)
return errors.WrapGroupInvitationDeleteFailedError(err)
}
func (s pgStorage) IsActorInGroup(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) (bool, error) {
return s.wrappedExists(errors.WrapGroupMemberQueryFailedError, ctx, `SELECT COUNT(*) FROM group_members WHERE group_actor_id = $1 AND member_actor_id = $2`, groupActorRowID, memberActorRowID)
}
func (s pgStorage) RecordGroupBoost(ctx context.Context, groupActorRowID, objectRowID uuid.UUID) (uuid.UUID, error) {
rowID := NewV4()
now := s.now()
return s.RecordGroupInvitationAll(ctx, rowID, now, now, groupActorRowID, objectRowID)
}
func (s pgStorage) RecordGroupBoostAll(ctx context.Context, rowID uuid.UUID, createdAt, updatedAt time.Time, groupActorRowID, objectRowID uuid.UUID) (uuid.UUID, error) {
query := `INSERT INTO group_boosts (id, created_at, updated_at, group_actor_id, object_id) VALUES ($1, $2, $3, $4, $5) ON CONFLICT ON CONSTRAINT group_boosts_uindex DO UPDATE set updated_at = $3 RETURNING id`
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, rowID, createdAt, updatedAt, groupActorRowID, objectRowID).Scan(&id)
return id, errors.WrapGroupInvitationUpsertFailedError(err)
}
func (s pgStorage) MinutesSinceGroupBoost(ctx context.Context, groupActorRowID, objectRowID uuid.UUID) (int, error) {
query := `SELECT (DATE_PART('day', NOW() - updated_at::timestamp) * 24 + DATE_PART('hour', NOW() - updated_at::timestamp)) * 60 + DATE_PART('minute', NOW() - updated_at::timestamp) FROM group_boosts WHERE group_actor_id = $1 AND object_id = $2`
return s.wrappedSelectInt(errors.WrapGroupBoostQueryFailedError, ctx, query, groupActorRowID, objectRowID)
}
func (s pgStorage) UpdateGroupDefaultRole(ctx context.Context, groupRowID uuid.UUID, role GroupRole) error {
now := s.now()
_, err := s.db.ExecContext(ctx, "UPDATE groups SET default_member_role = $3, updated_at = $2 WHERE id = $1", groupRowID, now, role)
return errors.WrapUserUpdateFailedError(err)
}
func (s pgStorage) UpdateGroupAcceptFollowers(ctx context.Context, groupRowID uuid.UUID, acceptFollowers bool) error {
now := s.now()
_, err := s.db.ExecContext(ctx, "UPDATE groups SET accept_followers = $3, updated_at = $2 WHERE id = $1", groupRowID, now, acceptFollowers)
return errors.WrapUserUpdateFailedError(err)
}
func (s pgStorage) UpdateGroupAllowRemote(ctx context.Context, groupRowID uuid.UUID, allowRemote bool) error {
now := s.now()
_, err := s.db.ExecContext(ctx, "UPDATE groups SET allow_remote = $3, updated_at = $2 WHERE id = $1", groupRowID, now, allowRemote)
return errors.WrapUserUpdateFailedError(err)
}

View File

@ -19,29 +19,29 @@ type InstanceStatsStorage interface {
}
func (s pgStorage) CountUserFollowers(ctx context.Context, userID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND relationship_type = $2`, userID, UserFollowedByRelationship)
return s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND relationship_type = $2`, userID, UserFollowedByRelationship)
}
func (s pgStorage) CountUserFollowing(ctx context.Context, userID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND relationship_type = $2`, userID, UserFollowsRelationship)
return s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND relationship_type = $2`, userID, UserFollowsRelationship)
}
func (s pgStorage) CountUsers(ctx context.Context) (int, error) {
return s.wrappedRowCount(errors.WrapUserQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM users`)
return s.wrappedRowCount(errors.WrapUserQueryFailedError, ctx, `SELECT COUNT(*) FROM users`)
}
func (s pgStorage) CountUsersLastMonth(ctx context.Context) (int, error) {
return s.wrappedRowCount(errors.WrapUserQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM users WHERE last_auth_at > NOW() - INTERVAL '31 days'`)
return s.wrappedRowCount(errors.WrapUserQueryFailedError, ctx, `SELECT COUNT(*) FROM users WHERE last_auth_at > NOW() - INTERVAL '31 days'`)
}
func (s pgStorage) CountUsersLastHalfYear(ctx context.Context) (int, error) {
return s.wrappedRowCount(errors.WrapUserQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM users WHERE last_auth_at > NOW() - INTERVAL '183 days'`)
return s.wrappedRowCount(errors.WrapUserQueryFailedError, ctx, `SELECT COUNT(*) FROM users WHERE last_auth_at > NOW() - INTERVAL '183 days'`)
}
func (s pgStorage) CountObjectEvents(ctx context.Context) (int, error) {
return s.wrappedRowCount(errors.WrapObjectEventQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM user_object_events`)
return s.wrappedRowCount(errors.WrapObjectEventQueryFailedError, ctx, `SELECT COUNT(*) FROM user_object_events`)
}
func (s pgStorage) CountUserObjectEvents(ctx context.Context, userID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapUserObjectEventQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM user_object_events WHERE user_id = $1`, userID)
return s.wrappedRowCount(errors.WrapUserObjectEventQueryFailedError, ctx, `SELECT COUNT(*) FROM user_object_events WHERE user_id = $1`, userID)
}

View File

@ -70,6 +70,14 @@ func JSONString(document map[string]interface{}, key string) (string, bool) {
return "", false
}
func FirstJSONString(document map[string]interface{}, key string) (string, bool) {
values, ok := JSONStrings(document, key)
if len(values) > 0 {
return values[0], ok
}
return "", ok
}
func JSONStrings(document map[string]interface{}, key string) ([]string, bool) {
results := make([]string, 0)
value, ok := document[key]

View File

@ -191,24 +191,24 @@ func (s pgStorage) RemoveFollower(ctx context.Context, userID, actorID uuid.UUID
}
func (s pgStorage) IsInNetwork(ctx context.Context, userID, actorID uuid.UUID) (bool, error) {
c, err := s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND actor_id = $2`, userID, actorID)
c, err := s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND actor_id = $2`, userID, actorID)
return c > 0, err
}
func (s pgStorage) IsFollowing(ctx context.Context, userID, actorID uuid.UUID) (bool, error) {
c, err := s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND actor_id = $2 AND relationship_type = $3`, userID, actorID, UserFollowsRelationship)
c, err := s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND actor_id = $2 AND relationship_type = $3`, userID, actorID, UserFollowsRelationship)
return c == 1, err
}
func (s pgStorage) IsFollower(ctx context.Context, userID, actorID uuid.UUID) (bool, error) {
c, err := s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND actor_id = $2 AND relationship_type = $3`, userID, actorID, UserFollowedByRelationship)
c, err := s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND actor_id = $2 AND relationship_type = $3`, userID, actorID, UserFollowedByRelationship)
return c == 1, err
}
func (s pgStorage) CountFollowers(ctx context.Context, userID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND relationship_type = $2`, userID, UserFollowedByRelationship)
return s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND relationship_type = $2`, userID, UserFollowedByRelationship)
}
func (s pgStorage) CountFollowing(ctx context.Context, userID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND relationship_type = $2`, userID, UserFollowsRelationship)
return s.wrappedRowCount(errors.WrapNetworkRelationshipQueryFailedError, ctx, `SELECT COUNT(*) FROM network_graph WHERE user_id = $1 AND relationship_type = $2`, userID, UserFollowsRelationship)
}

View File

@ -8,7 +8,6 @@ import (
"time"
"github.com/gofrs/uuid"
"go.uber.org/zap"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
@ -78,7 +77,7 @@ func (s pgStorage) ListObjectPayloadsByObjectIDs(ctx context.Context, objectIDs
}
func (s pgStorage) CountObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapObjectEventQueryFailedError, s.db, ctx, `SELECT COUNT(oe.payload) FROM user_feed uf INNER JOIN object_events oe on oe.id = uf.activity_id WHERE uf.user_id = $1`, userID)
return s.wrappedRowCount(errors.WrapObjectEventQueryFailedError, ctx, `SELECT COUNT(oe.payload) FROM user_feed uf INNER JOIN object_events oe on oe.id = uf.activity_id WHERE uf.user_id = $1`, userID)
}
func (s pgStorage) ListObjectEventPayloadsInFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
@ -87,7 +86,7 @@ func (s pgStorage) ListObjectEventPayloadsInFeed(ctx context.Context, userID uui
}
func (s pgStorage) CountObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapObjectEventQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM user_object_events uoe WHERE uoe.user_id = $1`, userID)
return s.wrappedRowCount(errors.WrapObjectEventQueryFailedError, ctx, `SELECT COUNT(*) FROM user_object_events uoe WHERE uoe.user_id = $1`, userID)
}
func (s pgStorage) ListObjectEventPayloadsInUserFeed(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
@ -96,7 +95,7 @@ func (s pgStorage) ListObjectEventPayloadsInUserFeed(ctx context.Context, userID
}
func (s pgStorage) CountObjectPayloadsInLocalFeed(ctx context.Context) (int, error) {
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM user_object_events uoe WHERE uoe.public = true`)
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, `SELECT COUNT(*) FROM user_object_events uoe WHERE uoe.public = true`)
}
func (s pgStorage) ListObjectPayloadsInLocalFeed(ctx context.Context, limit int, offset int) ([]Payload, error) {
@ -106,7 +105,7 @@ func (s pgStorage) ListObjectPayloadsInLocalFeed(ctx context.Context, limit int,
func (s pgStorage) CountObjectPayloadsInTagFeed(ctx context.Context, tag string) (int, error) {
query := `SELECT COUNT(o.payload) FROM user_object_events uoe INNER JOIN object_events oe on oe.id = uoe.activity_id INNER JOIN objects o on oe.object_id = o.id INNER JOIN object_tags ot on ot.object_id = o.id WHERE uoe.public = true AND ot.tag = $1`
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, s.db, ctx, query, tag)
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, query, tag)
}
func (s pgStorage) ListObjectPayloadsInTagFeed(ctx context.Context, tag string) ([]Payload, error) {
@ -116,7 +115,7 @@ func (s pgStorage) ListObjectPayloadsInTagFeed(ctx context.Context, tag string)
func (s pgStorage) CountObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID) (int, error) {
query := `SELECT COUNT(o.payload) FROM objects o INNER JOIN object_replies r on o.id = r.object_id WHERE r.parent_object_id = $1`
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, s.db, ctx, query, objectID)
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, query, objectID)
}
func (s pgStorage) ListObjectPayloadsInObjectReplies(ctx context.Context, objectID uuid.UUID, limit int, offset int) ([]Payload, error) {
@ -126,7 +125,7 @@ func (s pgStorage) ListObjectPayloadsInObjectReplies(ctx context.Context, object
func (s pgStorage) CountObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID) (int, error) {
query := `SELECT COUNT(o.payload) FROM objects o INNER JOIN user_object_events uoe ON uoe.object_id = o.id WHERE uoe.user_id = $1 AND uoe.public = true`
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, s.db, ctx, query, userID)
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, query, userID)
}
func (s pgStorage) ListObjectPayloadsInUserOutbox(ctx context.Context, userID uuid.UUID, limit int, offset int) ([]Payload, error) {
@ -135,7 +134,7 @@ func (s pgStorage) ListObjectPayloadsInUserOutbox(ctx context.Context, userID uu
}
func (s pgStorage) CountObjectPayloadsInUserConversation(ctx context.Context, userID, threadID uuid.UUID) (int, error) {
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, s.db, ctx, `SELECT COUNT(*) FROM user_conversations uc WHERE uc.user_id = $1 AND uc.thread_id = $2`, userID, threadID)
return s.wrappedRowCount(errors.WrapObjectQueryFailedError, ctx, `SELECT COUNT(*) FROM user_conversations uc WHERE uc.user_id = $1 AND uc.thread_id = $2`, userID, threadID)
}
func (s pgStorage) ListObjectPayloadsInUserConversation(ctx context.Context, userID, threadID uuid.UUID, limit, offset int) ([]Payload, error) {
@ -149,7 +148,6 @@ func (s pgStorage) ListObjectPayloadsInUserTagFeed(ctx context.Context, userID u
}
func (s pgStorage) objectPayloads(ctx context.Context, query string, args ...interface{}) ([]Payload, error) {
s.logger.Debug("object payloads query", zap.String("query", query), zap.Reflect("args", args))
var results []Payload
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
@ -388,7 +386,7 @@ func (s pgStorage) RecordObjectSubscriptionAll(ctx context.Context, rowID uuid.U
func (s pgStorage) ActorsSubscribedToObject(ctx context.Context, objectRowID uuid.UUID) ([]*Actor, error) {
query := actorsSelectQuery("object_subscriptions os on a.id = os.actor_id", []string{"os.object_id = $1"})
return s.getActors(s.db, ctx, query, objectRowID)
return s.getActors(ctx, query, objectRowID)
}
func (s pgStorage) RemoveObjectSubscription(ctx context.Context, objectRowID, actorRowID uuid.UUID) error {

View File

@ -7,7 +7,6 @@ import (
"github.com/gofrs/uuid"
_ "github.com/lib/pq"
"go.uber.org/zap"
"github.com/ngerakines/tavern/errors"
)
@ -22,15 +21,9 @@ type Storage interface {
ObjectStorage
AssetStorage
InstanceStatsStorage
GroupStorage
GetExecutor() QueryExecute
GetLogger() *zap.Logger
}
type QueryExecute interface {
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}
type Count struct {
@ -38,18 +31,16 @@ type Count struct {
Count int
}
func DefaultStorage(db *sql.DB, logger *zap.Logger) Storage {
func DefaultStorage(db QueryExecute) Storage {
return pgStorage{
db: db,
now: defaultNowFunc,
logger: logger,
db: db,
now: defaultNowFunc,
}
}
type pgStorage struct {
db QueryExecute
now nowFunc
logger *zap.Logger
db QueryExecute
now nowFunc
}
type transactionScopedWork func(db QueryExecute) error
@ -60,28 +51,16 @@ func defaultNowFunc() time.Time {
return time.Now().UTC()
}
// deprecated
func runTransactionWithOptions(db QueryExecute, txBody transactionScopedWork) error {
realDB, ok := db.(*sql.DB)
if !ok {
return txBody(db)
}
tx, err := realDB.Begin()
if err != nil {
return errors.NewDatabaseTransactionFailedError(err)
}
err = txBody(tx)
if err != nil {
if txErr := tx.Rollback(); txErr != nil {
return errors.NewDatabaseTransactionFailedError(txErr)
}
return err
}
return errors.WrapDatabaseTransactionFailedError(tx.Commit())
return txBody(db)
}
func TransactionalStorage(ctx context.Context, storage Storage, txBody transactionScopedStorage) error {
if _, ok := storage.GetExecutor().(TransactionSQLDriver); ok {
return txBody(storage)
}
executor := storage.GetExecutor()
realDB, ok := executor.(*sql.DB)
if !ok {
@ -94,9 +73,8 @@ func TransactionalStorage(ctx context.Context, storage Storage, txBody transacti
}
err = txBody(pgStorage{
db: tx,
now: defaultNowFunc,
logger: storage.GetLogger(),
db: TransactionSQLDriver{Driver: tx},
now: defaultNowFunc,
})
if err != nil {
if txErr := tx.Rollback(); txErr != nil {
@ -111,24 +89,18 @@ func (s pgStorage) GetExecutor() QueryExecute {
return s.db
}
func (s pgStorage) GetLogger() *zap.Logger {
return s.logger
}
func (s pgStorage) RowCount(ctx context.Context, query string, args ...interface{}) (int, error) {
return s.wrappedRowCount(errors.WrapQueryFailedError, s.db, ctx, query, args...)
return s.wrappedRowCount(errors.WrapQueryFailedError, ctx, query, args...)
}
func (s pgStorage) rowCount(qec QueryExecute, ctx context.Context, query string, args ...interface{}) (int, error) {
return s.wrappedRowCount(errors.WrapQueryFailedError, qec, ctx, query, args...)
func (s pgStorage) rowCount(ctx context.Context, query string, args ...interface{}) (int, error) {
return s.wrappedRowCount(errors.WrapQueryFailedError, ctx, query, args...)
}
func (s pgStorage) wrappedRowCount(ew errors.ErrorWrapper, qec QueryExecute, ctx context.Context, query string, args ...interface{}) (int, error) {
s.logger.Debug("row count query", zap.String("query", query), zap.Reflect("args", args))
func (s pgStorage) wrappedRowCount(ew errors.ErrorWrapper, ctx context.Context, query string, args ...interface{}) (int, error) {
var total int
err := qec.QueryRowContext(ctx, query, args...).Scan(&total)
err := s.db.QueryRowContext(ctx, query, args...).Scan(&total)
if err != nil {
return -1, ew(err)
}
@ -136,8 +108,6 @@ func (s pgStorage) wrappedRowCount(ew errors.ErrorWrapper, qec QueryExecute, ctx
}
func (s pgStorage) wrappedExists(ew errors.ErrorWrapper, ctx context.Context, query string, args ...interface{}) (bool, error) {
s.logger.Debug("row count query", zap.String("query", query), zap.Reflect("args", args))
var total int
err := s.db.QueryRowContext(ctx, query, args...).Scan(&total)
if err != nil {
@ -147,8 +117,6 @@ func (s pgStorage) wrappedExists(ew errors.ErrorWrapper, ctx context.Context, qu
}
func (s pgStorage) wrappedSelectUUID(ew errors.ErrorWrapper, ctx context.Context, query string, args ...interface{}) (uuid.UUID, error) {
s.logger.Debug("select UUID query", zap.String("query", query), zap.Reflect("args", args))
var id uuid.UUID
err := s.db.QueryRowContext(ctx, query, args...).Scan(&id)
if err != nil {
@ -160,6 +128,18 @@ func (s pgStorage) wrappedSelectUUID(ew errors.ErrorWrapper, ctx context.Context
return id, nil
}
func (s pgStorage) wrappedSelectInt(ew errors.ErrorWrapper, ctx context.Context, query string, args ...interface{}) (int, error) {
var value int
err := s.db.QueryRowContext(ctx, query, args...).Scan(&value)
if err != nil {
if err == sql.ErrNoRows {
return -1, ew(errors.NewNotFoundError(err))
}
return -1, ew(err)
}
return value, nil
}
func (s pgStorage) keyedCount(ew errors.ErrorWrapper, ctx context.Context, query string, args ...interface{}) ([]Count, error) {
results := make([]Count, 0)
rows, err := s.db.QueryContext(ctx, query, args...)
@ -252,3 +232,23 @@ func (s pgStorage) toUUIDMultiMap(ew errors.ErrorWrapper, ctx context.Context, q
}
return results, nil
}
func (s pgStorage) selectStrings(ew errors.ErrorWrapper, ctx context.Context, query string, args ...interface{}) ([]string, error) {
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, ew(err)
}
defer rows.Close()
var results []string
for rows.Next() {
var value string
if err := rows.Scan(&value); err != nil {
return nil, ew(err)
}
results = append(results, value)
}
return results, nil
}

72
templates/groups.html Normal file
View File

@ -0,0 +1,72 @@
{{define "head"}}{{end}}
{{define "footer_script"}}{{end}}
{{define "content"}}
<div class="row pt-3">
<div class="col">
<p class="lead">
Groups are special actors that announce posts by other members. Groups make it possible to broadcast
content to other actors without everyone in the group knowing about the other actors ahead of time. They
fill the need for providing context, similar to how tags work, audiance, because it rebroadcasts
content, and distribution, similar to traditional mailing lists or user groups.
</p>
<p>
To send a message to a group, make sure the group's inbox is a recipient of the activity. The easy way
to do this is to include the group's network identifier (like <code>@tavern-updates@tavern.town</code>)
in the message, but you can also use the advanced view of the compose note form to include the group in
the to, cc, bto, or bcc of the create activity. Alternatively, you can also create <code>Announce</code>
activities and publish them to the group to have the group do the same.
</p>
<p>
<span class="text-danger"><strong>Important!</strong></span>
Groups do not manage privacy or access controls for content. Please be mindful of who activities are
being sent to when publishing federated content.
</p>
</div>
</div>
<div class="row pt-3">
<div class="col">
<h3>Create Group</h3>
<form method="POST" action="{{ url "groups_create" }}" id="createGroup">
<div class="form-group">
<label for="createGroup">Group Name</label>
<input type="text" class="form-control" id="createGroup" name="name"
placeholder="tavern-updates" required>
<small class="form-text text-muted">
Groups use the same namespace as users. You cannot create a group that has the same "name" as an
existing user, and inversely, you cannot create a user with the same name as an existing group.
</small>
</div>
<input type="submit" class="btn btn-dark" name="submit" value="Create"/>
</form>
</div>
</div>
<div class="row pt-3">
<div class="col">
<p class="lead">
To follow a group, go to the <a href="{{ url "network" }}">Network</a> page and follow the group's
actor.
</p>
</div>
</div>
<div class="row pt-3">
<div class="col">
<h1>Groups You Own</h1>
<table class="table table-striped">
<thead>
<tr>
<th>Group</th>
</tr>
</thead>
<tbody>
{{ range .groups }}
<tr>
<td>
{{ .ActorID }}
</td>
</tr>
{{ end }}
</tbody>
</table>
</div>
</div>
{{end}}

View File

@ -40,6 +40,9 @@
<li class="nav-item">
<a class="nav-link" href="{{ url "network" }}">Network</a>
</li>
<li class="nav-item">
<a class="nav-link" href="{{ url "groups" }}">Groups</a>
</li>
<li class="nav-item">
<a class="nav-link" href="{{ url "profile" (.user.Name) }}">Profile</a>
</li>

View File

@ -48,6 +48,41 @@
</td>
</tr>
{{ end }}
{{ range .groups }}
<tr>
<td><span class="badge badge-success">Group</span> {{ . }}</td>
<td>
<form method="post" action="{{ url "network_unfollow" }}">
<input type="hidden" name="actor" value="{{ . }}"/>
<input class="btn btn-sm btn-danger" type="submit" name="submit" value="Unfollow"/>
</form>
</td>
</tr>
{{ end }}
{{ range .pending_groups }}
<tr>
<td>
<span class="badge badge-success">Group</span>
<span class="badge badge-danger">Pending</span>
{{ . }}
</td>
<td class="d-flex">
<div>
<form method="post" action="{{ url "network_follow" }}">
<input type="hidden" name="actor" value="{{ . }}"/>
<input class="btn btn-sm btn-outline-primary" type="submit" name="submit"
value="Retry"/>
</form>
</div>
<div class="pl-2">
<form method="post" action="{{ url "network_unfollow" }}">
<input type="hidden" name="actor" value="{{ . }}"/>
<input class="btn btn-sm btn-danger" type="submit" name="submit" value="Unfollow"/>
</form>
</div>
</td>
</tr>
{{ end }}
</tbody>
</table>
</div>

View File

@ -684,6 +684,166 @@
"key": "TAVDATAAAAAAD0",
"trans": "The image alias was not found."
},
{
"locale": "en",
"key": "TAVDATAAAAAAD1",
"trans": "The group query failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAD2",
"trans": "The select record operation for group failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAD3",
"trans": "The insert record operation for group failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAD4",
"trans": "The upsert record operation for group failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAD5",
"trans": "The update record operation for group failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAD6",
"trans": "The delete record operation for group failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAD7",
"trans": "The group is invalid."
},
{
"locale": "en",
"key": "TAVDATAAAAAAD8",
"trans": "The group was not found."
},
{
"locale": "en",
"key": "TAVDATAAAAAAD9",
"trans": "The group member query failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEA",
"trans": "The select record operation for group member failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEB",
"trans": "The insert record operation for group member failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEC",
"trans": "The upsert record operation for group member failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAED",
"trans": "The update record operation for group member failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEE",
"trans": "The delete record operation for group member failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEF",
"trans": "The group member is invalid."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEG",
"trans": "The group member was not found."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEH",
"trans": "The group invitation query failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEI",
"trans": "The select record operation for group invitation failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEJ",
"trans": "The insert record operation for group invitation failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEK",
"trans": "The upsert record operation for group invitation failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEL",
"trans": "The update record operation for group invitation failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEM",
"trans": "The delete record operation for group invitation failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEN",
"trans": "The group invitation is invalid."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEO",
"trans": "The group invitation was not found."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEP",
"trans": "The group boost query failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEQ",
"trans": "The select record operation for group boost failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAER",
"trans": "The insert record operation for group boost failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAES",
"trans": "The upsert record operation for group boost failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAET",
"trans": "The update record operation for group boost failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEU",
"trans": "The delete record operation for group boost failed."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEV",
"trans": "The group boost is invalid."
},
{
"locale": "en",
"key": "TAVDATAAAAAAEW",
"trans": "The group boost was not found."
},
{
"locale": "en",
"key": "TAVWEBAAAAAAAB",

View File

@ -71,6 +71,11 @@ var Command = cli.Command{
&config.EnablePublisherFlag,
&config.PublisherLocationFlag,
&config.EnableGroupsFlag,
&config.AllowAutoAcceptGroupFollowersFlag,
&config.AllowRemoteFollowersFlag,
&config.DefaultGroupMemberRoleFlag,
},
Action: serverCommandAction,
}
@ -106,6 +111,11 @@ func serverCommandAction(cliCtx *cli.Context) error {
return err
}
groupConfig, err := config.NewGroupConfig(cliCtx)
if err != nil {
return err
}
publisherConfig := config.NewPublisherConfig(cliCtx)
if sentryConfig.Enabled {
@ -129,7 +139,7 @@ func serverCommandAction(cliCtx *cli.Context) error {
}
defer dbClose()
s := storage.DefaultStorage(db, logger)
s := storage.DefaultStorage(storage.LoggingSQLDriver{Driver: db, Logger: logger})
utrans, err := config.Trans(cliCtx)
if err != nil {
@ -248,6 +258,7 @@ func serverCommandAction(cliCtx *cli.Context) error {
domain: domain,
sentryConfig: sentryConfig,
fedConfig: fedConfig,
groupConfig: groupConfig,
webFingerQueue: webFingerQueue,
crawlQueue: crawlQueue,
assetQueue: assetQueue,
@ -311,13 +322,24 @@ func serverCommandAction(cliCtx *cli.Context) error {
authenticated.GET("/object/:object", h.getObject)
authenticated.GET("/tags/:tag", h.getTaggedObjects)
actor := authenticated.Group("/users")
userActor := authenticated.Group("/users")
{
actor.GET("/:name", h.actorInfo)
actor.POST("/:name/inbox", h.actorInbox)
actor.GET("/:name/outbox", h.actorOutbox)
actor.GET("/:name/following", h.actorFollowing)
actor.GET("/:name/followers", h.actorFollowers)
userActor.GET("/:name", h.userActorInfo)
userActor.POST("/:name/inbox", h.userActorInbox)
userActor.GET("/:name/outbox", h.userActorOutbox)
userActor.GET("/:name/following", h.userActorFollowing)
userActor.GET("/:name/followers", h.userActorFollowers)
}
if groupConfig.EnableGroups {
groupActor := authenticated.Group("/groups")
{
groupActor.GET("/:name", h.groupActorInfo)
groupActor.POST("/:name/inbox", h.groupActorInbox)
groupActor.GET("/:name/outbox", h.groupActorOutbox)
groupActor.GET("/:name/following", h.groupActorFollowing)
groupActor.GET("/:name/followers", h.groupActorFollowers)
}
}
authenticated.GET("/", h.home)
@ -328,11 +350,26 @@ func serverCommandAction(cliCtx *cli.Context) error {
authenticated.GET("/feed/mine", h.viewMyFeed)
authenticated.GET("/feed/local", h.viewLocalFeed)
authenticated.GET("/network", h.dashboardNetwork)
authenticated.POST("/network/follow", h.networkFollow)
authenticated.POST("/network/unfollow", h.networkUnfollow)
authenticated.POST("/network/accept", h.networkAccept)
authenticated.POST("/network/reject", h.networkReject)
manage := authenticated.Group("/manage")
{
manageNetwork := manage.Group("/network")
{
manageNetwork.GET("/", h.dashboardNetwork)
manageNetwork.POST("/follow", h.networkFollow)
manageNetwork.POST("/unfollow", h.networkUnfollow)
manageNetwork.POST("/accept", h.networkAccept)
manageNetwork.POST("/reject", h.networkReject)
}
if groupConfig.EnableGroups {
manageGroups := manage.Group("/groups")
{
manageGroups.GET("/", h.dashboardGroups)
manageGroups.POST("/create", h.dashboardGroupsCreate)
}
}
}
authenticated.GET("/compose", h.compose)
authenticated.POST("/compose/create/note", h.createNote)

View File

@ -19,10 +19,13 @@ import (
)
type handler struct {
storage storage.Storage
logger *zap.Logger
domain string
sentryConfig config.SentryConfig
storage storage.Storage
logger *zap.Logger
domain string
sentryConfig config.SentryConfig
fedConfig config.FedConfig
groupConfig config.GroupConfig
webFingerQueue common.StringQueue
crawlQueue common.StringQueue
adminUser string
@ -30,7 +33,6 @@ type handler struct {
svgConverter SVGConverter
assetStorage asset.Storage
assetQueue common.StringQueue
fedConfig config.FedConfig
httpClient common.HTTPClient
metricFactory promauto.Factory

171
web/handler_groups.go Normal file
View File

@ -0,0 +1,171 @@
package web
import (
"bytes"
"crypto/rand"
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"fmt"
"net/http"
"time"
"github.com/gin-gonic/gin"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
"github.com/ngerakines/tavern/storage"
)
func (h handler) dashboardGroups(c *gin.Context) {
data, user, session, cont := h.loggedIn(c, true)
if !cont {
return
}
ctx := c.Request.Context()
groups, err := h.storage.GroupActorsForMemberActorRowID(ctx, user.ActorID)
if err != nil {
h.hardFail(c, err)
return
}
data["groups"] = groups
data["following"] = []interface{}{}
data["pending_following"] = []interface{}{}
data["invitations"] = []interface{}{}
if err := session.Save(); err != nil {
h.hardFail(c, errors.NewCannotSaveSessionError(err))
return
}
c.HTML(http.StatusOK, "groups", data)
}
func (h handler) dashboardGroupsCreate(c *gin.Context) {
_, localUser, _, cont := h.loggedIn(c, true)
if !cont {
return
}
ctx := c.Request.Context()
name := c.PostForm("name")
now := time.Now().UTC()
publishedAt := now.Format("2006-01-02T15:04:05Z")
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
localUserActor, err := h.storage.GetActor(ctx, localUser.ActorID)
if err != nil {
return err
}
available, err := tx.GroupNameAvailable(ctx, name)
if err != nil {
return err
}
if !available {
return fmt.Errorf("name not available")
}
key, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
return err
}
privateKeyBytes := x509.MarshalPKCS1PrivateKey(key)
var privateKeyBuffer bytes.Buffer
if err := pem.Encode(&privateKeyBuffer, &pem.Block{
Type: "PRIVATE KEY",
Bytes: privateKeyBytes,
}); err != nil {
return err
}
privateKey := string(privateKeyBuffer.Bytes())
publicKeyBytes, err := x509.MarshalPKIXPublicKey(key.Public())
if err != nil {
return err
}
var publicKeyBuffer bytes.Buffer
if err = pem.Encode(&publicKeyBuffer, &pem.Block{
Type: "PUBLIC KEY",
Bytes: publicKeyBytes,
}); err != nil {
return err
}
publicKey := string(publicKeyBuffer.Bytes())
groupActor := storage.ActorFromGroupInfo(name, name, h.domain, publicKey, key)
groupActorID, _ := storage.JSONString(groupActor, "id")
groupKeyID, _ := storage.JSONDeepString(groupActor, "publicKey", "id")
err = tx.CreateActor(ctx, groupActorID, groupActor)
if err != nil {
return err
}
actorRowID, err := tx.ActorRowIDForActorID(ctx, groupActorID)
if err != nil {
return err
}
groupRowID, err := tx.RecordGroup(ctx, localUser.ID, actorRowID, name, publicKey, privateKey, name, "")
if err != nil {
return err
}
err = tx.UpdateGroupAcceptFollowers(ctx, groupRowID, h.groupConfig.AllowAutoAcceptGroupFollowers)
if err != nil {
return err
}
err = tx.UpdateGroupAllowRemote(ctx, groupRowID, h.groupConfig.AllowRemoteGroupFollowers)
if err != nil {
return err
}
err = tx.UpdateGroupDefaultRole(ctx, groupRowID, storage.GroupRole(h.groupConfig.DefaultGroupMemberRole))
if err != nil {
return err
}
err = tx.RecordActorKey(ctx, actorRowID, groupKeyID, publicKey)
if err != nil {
return err
}
if err = tx.RecordActorAlias(ctx, actorRowID, fmt.Sprintf("acct:%s@%s", name, h.domain), storage.ActorAliasSubject); err != nil {
return err
}
if err = tx.RecordActorAlias(ctx, actorRowID, groupActorID, storage.ActorAliasSelf); err != nil {
return err
}
followGroup := storage.EmptyPayload()
followGroup["@context"] = "https://www.w3.org/ns/activitystreams"
followGroup["id"] = common.ActivityURL(h.domain, storage.NewV4())
followGroup["summary"] = ""
followGroup["type"] = "Follow"
followGroup["actor"] = localUserActor.ActorID
followGroup["to"] = []string{localUserActor.ActorID}
followGroup["object"] = groupActorID
followGroup["published"] = publishedAt
_, err = tx.RecordGroupMember(ctx, actorRowID, localUser.ActorID, followGroup, storage.AcceptRelationshipStatus, storage.GroupOwner)
if err != nil {
return err
}
return nil
})
if txErr != nil {
h.flashErrorOrFail(c, h.url("groups"), txErr)
return
}
c.Redirect(http.StatusFound, h.url("groups"))
}

153
web/handler_groups_actor.go Normal file
View File

@ -0,0 +1,153 @@
package web
import (
"net/http"
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
"github.com/ngerakines/tavern/storage"
)
func (h handler) groupActorInfo(c *gin.Context) {
// accepted := common.ParseAccept(c.GetHeader("Accept"))
// if accepted.MatchHTML(true) {
// h.viewUser(c)
// return
// }
name := c.Param("name")
group, err := h.storage.GetGroupByName(c.Request.Context(), name)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
h.notFoundJSON(c, nil)
return
}
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
}
groupActor, err := h.storage.GetActor(c.Request.Context(), group.ActorID)
if err != nil {
h.hardFail(c, err)
return
}
h.writeJSONLDProfile(c, http.StatusOK, groupActor.Payload)
}
func (h handler) groupActorFollowers(c *gin.Context) {
name := c.Param("name")
ctx := c.Request.Context()
group, err := h.storage.GetGroupByName(ctx, name)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
h.notFoundJSON(c, nil)
return
}
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
}
total, err := h.storage.CountGroupMembers(ctx, group.ActorID)
if err != nil {
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
}
response := storage.EmptyPayload()
response["@context"] = "https://www.w3.org/ns/activitystreams"
actorID := storage.ActorID(common.GroupActorURL(h.domain, group.Name))
page := intParam(c, "page", 0)
if page == 0 {
response["id"] = actorID.Followers()
response["type"] = "OrderedCollection"
response["totalItems"] = total
response["first"] = actorID.FollowersPage(1)
h.writeJSONLD(c, http.StatusOK, response)
return
}
offset := (page - 1) * 20
actors, err := h.storage.GroupMemberActorIDs(ctx, group.ActorID)
if err != nil {
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
}
response["id"] = actorID.FollowersPage(1)
response["type"] = "OrderedCollectionPage"
response["totalItems"] = total
response["partOf"] = actorID.Followers()
if offset < total {
response["next"] = actorID.FollowersPage(page + 1)
}
if page > 1 {
response["prev"] = actorID.FollowersPage(page - 1)
}
response["orderedItems"] = actors
h.writeJSONLD(c, http.StatusOK, response)
}
func (h handler) groupActorFollowing(c *gin.Context) {
name := c.Param("name")
ctx := c.Request.Context()
group, err := h.storage.GetGroupByName(ctx, name)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
h.notFoundJSON(c, nil)
return
}
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
}
actorID := storage.ActorID(common.GroupActorURL(h.domain, group.Name))
response := storage.EmptyPayload()
response["@context"] = "https://www.w3.org/ns/activitystreams"
response["id"] = actorID.Following()
response["type"] = "OrderedCollection"
response["totalItems"] = 0
h.writeJSONLD(c, http.StatusOK, response)
}
func (h handler) groupActorOutbox(c *gin.Context) {
ctx := c.Request.Context()
name := c.Param("name")
group, err := h.storage.GetGroupByName(ctx, name)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
h.notFoundJSON(c, nil)
return
}
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
}
actorID := storage.ActorID(common.GroupActorURL(h.domain, group.Name))
response := storage.EmptyPayload()
response["@context"] = "https://www.w3.org/ns/activitystreams"
response["id"] = actorID.Outbox()
response["type"] = "OrderedCollection"
response["totalItems"] = 0
h.writeJSONLD(c, http.StatusOK, response)
}

764
web/handler_groups_inbox.go Normal file
View File

@ -0,0 +1,764 @@
package web
import (
"fmt"
"io"
"io/ioutil"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/gofrs/uuid"
"github.com/kr/pretty"
"go.uber.org/zap"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
"github.com/ngerakines/tavern/fed"
"github.com/ngerakines/tavern/storage"
)
func (h handler) groupActorInbox(c *gin.Context) {
name := c.Param("name")
group, err := h.storage.GetGroupByName(c.Request.Context(), name)
if err != nil {
if errors.Is(err, errors.NewNotFoundError(nil)) {
h.notFoundJSON(c, nil)
return
}
h.internalServerErrorJSON(c, err, zap.String("name", name))
return
}
defer c.Request.Body.Close()
body, err := ioutil.ReadAll(io.LimitReader(c.Request.Body, maxInboxRequestSize+1))
if err != nil {
h.internalServerErrorJSON(c, err)
return
}
if len(body) > maxInboxRequestSize {
h.writeJSONError(c, http.StatusRequestEntityTooLarge, err)
return
}
fmt.Println(string(body))
payload, err := storage.PayloadFromBytes(body)
if err != nil {
h.badRequestJSON(c, err)
return
}
pretty.Println(payload)
if skipActorInbox(payload) {
h.logger.Debug("group actor inbox can ignore message", zap.String("group", name))
c.Status(http.StatusOK)
return
}
payloadType, _ := storage.JSONString(payload, "type")
actor, _ := storage.JSONString(payload, "actor")
if len(actor) > 0 {
err = h.webFingerQueue.Add(actor)
if err != nil {
h.logger.Error("unable to add actor to web finger queue", zap.String("actor", actor))
}
}
switch payloadType {
case "Follow":
h.groupActorInboxFollow(c, group, payload)
case "Undo":
h.groupActorInboxUndo(c, group, payload)
case "Create":
h.groupActorInboxCreate(c, group, payload)
case "Announce":
h.groupActorInboxAnnounce(c, group, payload)
case "Invite":
h.groupActorInboxInvite(c, group, payload)
default:
h.logger.Warn("Group received unexpected payload type", zap.String("type", payloadType), zap.String("user", name))
c.Status(http.StatusOK)
}
}
func (h handler) groupActorInboxInvite(c *gin.Context, group storage.Group, payload storage.Payload) {
if err := h.verifySignature(c); err != nil {
h.unauthorizedJSON(c, err)
return
}
inviter, hasInviter := storage.JSONString(payload, "actor")
invited, hasInvited := storage.JSONString(payload, "object")
targetGroup, hasTargetGroup := storage.FirstJSONString(payload, "target")
if !hasInviter || !hasInvited || !hasTargetGroup {
c.Status(http.StatusOK)
return
}
if targetGroup != common.GroupActorURL(h.domain, group.Name) {
c.Status(http.StatusOK)
return
}
// If remote group followers is not enabled on the server, and the
// invited actor is not local, then ignore the request
if !h.groupConfig.AllowRemoteGroupFollowers && !strings.HasPrefix(invited, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
if !group.AllowRemote && !strings.HasPrefix(invited, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
ctx := c.Request.Context()
actorInvited, err := fed.GetOrFetchActor(ctx, h.storage, h.logger, h.httpClient, invited)
if err != nil {
h.logger.Error("unable to fetch actor", zap.Error(err), zap.String("actor", invited))
h.internalServerErrorJSON(c, err)
return
}
// TODO: Verify actorInvited is a type "Person".
denied := false
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
actorInviter, err := tx.GetActorByActorID(ctx, inviter)
if err != nil {
return err
}
allowed, err := tx.GroupMemberCanInvite(ctx, group.ActorID, actorInviter.ID)
if err != nil {
return err
}
if !allowed {
denied = true
return nil
}
alreadyInvited, err := tx.IsActorInvitedToGroup(ctx, group.ActorID, actorInvited.ID)
if err != nil {
return err
}
if alreadyInvited {
return nil
}
isInGroup, err := tx.IsActorInGroup(ctx, group.ActorID, actorInvited.ID)
if err != nil {
return err
}
if isInGroup {
return nil
}
_, err = tx.RecordGroupInvitation(ctx, group.ActorID, actorInvited.ID)
if err != nil {
return err
}
return nil
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
if denied {
h.unauthorizedJSON(c, nil)
return
}
c.Status(http.StatusOK)
}
func (h handler) groupActorInboxFollow(c *gin.Context, group storage.Group, payload storage.Payload) {
objectID := storage.FirstJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
if strings.HasPrefix(objectID, common.GroupActorURLPrefix(h.domain)) {
h.groupActorInboxFollowActor(c, group, payload)
} else {
c.Status(http.StatusOK)
}
}
func (h handler) groupActorInboxFollowActor(c *gin.Context, group storage.Group, payload storage.Payload) {
ctx := c.Request.Context()
theActorBeingFollowed := storage.FirstJSONDeepStrings(payload, []string{"object"}, []string{"object", "id"})
theActorFollowing, _ := storage.JSONString(payload, "actor")
// If the actor that is doing the following is local, bail.
if strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
h.logger.Debug("group actor inbox follow bailing: actor is local",
zap.String("actor", theActorFollowing),
zap.String("prefix", common.ActorURLPrefix(h.domain)))
c.Status(http.StatusOK)
return
}
// If the actor that is being followed is not local, bail. Assume it is
// forwarded or just informational.
if !strings.HasPrefix(theActorBeingFollowed, common.GroupActorURLPrefix(h.domain)) {
h.logger.Debug("group actor inbox follow bailing: group is not local",
zap.String("object", theActorBeingFollowed),
zap.String("prefix", common.GroupActorURLPrefix(h.domain)))
c.Status(http.StatusOK)
return
}
// If user is provided and the actor being followed is not the user,
// bail. Assume it is forwarded or just informational.
if theActorBeingFollowed != common.GroupActorURL(h.domain, group.Name) {
h.logger.Debug("group actor inbox follow bailing: object does not match a composed group actor id",
zap.String("object", theActorBeingFollowed),
zap.String("group_actor_id", common.GroupActorURL(h.domain, group.Name)))
c.Status(http.StatusOK)
return
}
// If remote group followers is not enabled on the server, and the actor is not local ignore the request
if !h.groupConfig.AllowRemoteGroupFollowers && !strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
h.logger.Debug("group actor inbox follow bailing: server disallows remote followers and actor is not local",
zap.Bool("AllowRemoteGroupFollowers", h.groupConfig.AllowRemoteGroupFollowers),
zap.String("actor", theActorFollowing),
zap.String("prefix", common.ActorURLPrefix(h.domain)))
c.Status(http.StatusOK)
return
}
if !group.AllowRemote && !strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
h.logger.Debug("group actor inbox follow bailing: group disallows remote followers and actor is not local",
zap.Bool("AllowRemoteGroupFollowers", group.AllowRemote),
zap.String("actor", theActorFollowing),
zap.String("prefix", common.ActorURLPrefix(h.domain)))
c.Status(http.StatusOK)
return
}
// This just ensures we have a reference of the actor and it's keys for
// the verify signature step.
theActorFollowingA, err := fed.GetOrFetchActor(ctx, h.storage, h.logger, h.httpClient, theActorFollowing)
if err != nil {
h.logger.Error("unable to fetch actor", zap.Error(err), zap.String("actor", theActorFollowing))
h.internalServerErrorJSON(c, err)
return
}
if err := h.verifySignature(c); err != nil {
h.logger.Debug("signature verification failed", zap.Error(err))
h.unauthorizedJSON(c, err)
return
}
var theActorFollowingRowID uuid.UUID
invited := false
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
theActorFollowingRowID, err = tx.ActorRowIDForActorID(ctx, theActorFollowing)
if err != nil {
return err
}
_, err = tx.RecordGroupMember(ctx, group.ActorID, theActorFollowingRowID, payload, storage.PendingRelationshipStatus, group.DefaultMemberRole)
if err != nil {
return err
}
invited, err = tx.IsActorInvitedToGroup(ctx, group.ActorID, theActorFollowingRowID)
if err != nil {
return err
}
return nil
})
if txErr != nil {
h.logger.Debug("error performing lookup transaction", zap.Error(txErr), zap.Strings("error_chain", errors.ErrorChain(txErr)))
h.internalServerErrorJSON(c, txErr)
return
}
if invited || (h.fedConfig.AllowAutoAcceptFollowers && group.AcceptFollowers) {
var groupActor *storage.Actor
txErr = storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
groupActor, err = tx.GetActor(ctx, group.ActorID)
if err != nil {
return err
}
err = tx.UpdateGroupMemberStatus(ctx, groupActor.ID, theActorFollowingRowID, storage.AcceptRelationshipStatus)
if err != nil {
return err
}
if invited {
err = tx.RemoveGroupInvitation(ctx, groupActor.ID, theActorFollowingRowID)
if err != nil {
return err
}
}
return nil
})
if txErr != nil {
h.logger.Debug("error performing notify transaction", zap.Error(txErr), zap.Strings("error_chain", errors.ErrorChain(txErr)))
h.internalServerErrorJSON(c, txErr)
return
}
if h.publisherClient != nil {
acceptActivity := storage.EmptyPayload()
acceptActivityID := common.ActivityURL(h.domain, storage.NewV4())
acceptActivity["@context"] = "https://www.w3.org/ns/activitystreams"
acceptActivity["actor"] = groupActor.ActorID
acceptActivity["id"] = acceptActivityID
acceptActivity["object"] = payload
acceptActivity["to"] = theActorFollowing
acceptActivity["type"] = "Accept"
acceptActivity["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
acceptPayload := acceptActivity.Bytes()
err = h.publisherClient.Send(ctx, theActorFollowingA.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, acceptActivityID, string(acceptPayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", theActorFollowingA.GetID()), zap.String("activity", acceptActivityID), zap.Error(err))
c.Status(http.StatusOK)
return
}
followers, err := h.storage.GroupMemberActorsForGroupActorID(ctx, group.ID)
if err != nil {
h.logger.Error("failed getting group members", zap.Error(err))
c.Status(http.StatusOK)
return
}
for _, follower := range followers {
joinActivity := storage.EmptyPayload()
joinActivityID := common.ActivityURL(h.domain, storage.NewV4())
joinActivity["@context"] = "https://www.w3.org/ns/activitystreams"
joinActivity["id"] = joinActivityID
joinActivity["summary"] = ""
joinActivity["type"] = "Join"
joinActivity["actor"] = theActorFollowing
joinActivity["object"] = groupActor.ActorID
joinActivity["to"] = follower.ActorID
joinActivity["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
joinPayload := joinActivity.Bytes()
err = h.publisherClient.Send(ctx, follower.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, joinActivityID, string(joinPayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", follower.GetID()), zap.String("activity", joinActivityID), zap.Error(err))
}
}
}
}
c.Status(http.StatusOK)
}
func (h handler) groupActorInboxUndoFollow(c *gin.Context, group storage.Group, payload storage.Payload) {
objectID := storage.FirstJSONDeepStrings(payload, []string{"object", "object"}, []string{"object", "object", "id"})
if strings.HasPrefix(objectID, common.GroupActorURLPrefix(h.domain)) {
h.groupActorInboxUndoFollowActor(c, group, payload)
} else {
c.Status(http.StatusOK)
}
}
func (h handler) groupActorInboxUndoFollowActor(c *gin.Context, group storage.Group, payload storage.Payload) {
ctx := c.Request.Context()
// objectID := storage.FirstJSONDeepStrings(payload, []string{"object", "object"}, []string{"object", "object", "id"})
actorFollowed, _ := storage.JSONDeepString(payload, "object", "object")
actorFollowing, _ := storage.JSONDeepString(payload, "object", "actor")
// If the actor that is doing the following is local, bail.
if strings.HasPrefix(actorFollowing, common.ActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// If the actor that is being followed is not local, bail. Assume it is
// forwarded or just informational.
if !strings.HasPrefix(actorFollowed, common.GroupActorURLPrefix(h.domain)) {
c.Status(http.StatusOK)
return
}
// If user is provided and the actor being followed is not the user,
// bail. Assume it is forwarded or just informational.
if actorFollowed != common.GroupActorURL(h.domain, group.Name) {
c.Status(http.StatusOK)
return
}
if err := h.verifySignature(c); err != nil {
h.unauthorizedJSON(c, err)
return
}
var groupActor *storage.Actor
txErr := storage.TransactionalStorage(c.Request.Context(), h.storage, func(tx storage.Storage) error {
var err error
groupActor, err = tx.GetActor(ctx, group.ActorID)
if err != nil {
return err
}
actorFollowingRowID, err := tx.ActorRowIDForActorID(c.Request.Context(), actorFollowing)
if err != nil {
return err
}
return tx.RemoveGroupMember(ctx, group.ActorID, actorFollowingRowID)
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
if h.publisherClient != nil {
followers, err := h.storage.GroupMemberActorsForGroupActorID(ctx, group.ID)
for _, follower := range followers {
leaveActivity := storage.EmptyPayload()
leaveActivityID := common.ActivityURL(h.domain, storage.NewV4())
leaveActivity["@context"] = "https://www.w3.org/ns/activitystreams"
leaveActivity["summary"] = ""
leaveActivity["type"] = "Leave"
leaveActivity["id"] = leaveActivityID
leaveActivity["actor"] = actorFollowing
leaveActivity["object"] = groupActor.ActorID
leaveActivity["to"] = follower.ActorID
leaveActivity["published"] = time.Now().UTC().Format("2006-01-02T15:04:05Z")
joinPayload := leaveActivity.Bytes()
err = h.publisherClient.Send(ctx, follower.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, leaveActivityID, string(joinPayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", follower.GetID()), zap.String("activity", leaveActivityID), zap.Error(err))
}
}
}
c.Status(http.StatusOK)
}
func (h handler) groupActorInboxUndo(c *gin.Context, group storage.Group, payload storage.Payload) {
innerType, _ := storage.JSONDeepString(payload, "object", "type")
switch innerType {
case "Follow":
h.groupActorInboxUndoFollow(c, group, payload)
default:
h.logger.Warn("User received unexpected undo payload type", zap.String("type", innerType), zap.String("group", group.Name))
c.Status(http.StatusOK)
}
}
func (h handler) groupActorInboxCreate(c *gin.Context, group storage.Group, payload storage.Payload) {
// Because actors must follow the group, it is safe to assume that we
// have actor and actor key records for all valid incoming activities.
if err := h.verifySignature(c); err != nil {
h.unauthorizedJSON(c, err)
return
}
activityID, hasActivityID := storage.JSONString(payload, "id")
actorID, hasActorID := storage.JSONString(payload, "actor")
activityObjectID, hasActivityObjectID := storage.JSONDeepString(payload, "object", "id")
activityObject, hasActivityObject := storage.JSONMap(payload, "object")
// activityObjectType, hasActivityObjectType := storage.JSONDeepString(payload, "object", "type")
if !hasActivityID || !hasActorID || !hasActivityObjectID || !hasActivityObject {
h.logger.Info("ignoring invalid activity",
zap.String("id", activityID),
zap.String("actor", actorID),
zap.String("group", group.Name),
)
c.Status(http.StatusOK)
return
}
ctx := c.Request.Context()
var groupActor *storage.Actor
var notifyActors []*storage.Actor
denied := false
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityObjectID)
if err != nil {
return err
}
activityActorRowID, err := tx.ActorRowIDForActorID(ctx, actorID)
if err != nil {
// This shouldn't actually happen because the actor passed the validate activity phase.
if errors.Is(err, errors.NewNotFoundError(nil)) {
denied = true
return nil
}
return err
}
allowed, err := tx.GroupMemberCanSubmit(ctx, group.ActorID, activityActorRowID)
if err != nil {
return err
}
if !allowed {
denied = true
return nil
}
// If we've seen the activity before, the upsert operations for
// recording the object event, object, user feed, and object reply
// actions should not result in new records being created. However,
// we avoid the inbox-forwarding behavior.
activityObjectRowID, err := tx.RecordObject(ctx, activityObject, activityObjectID)
if err != nil {
return err
}
_, err = tx.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
since, err := tx.MinutesSinceGroupBoost(ctx, group.ActorID, activityObjectRowID)
if err != nil && !errors.Is(err, errors.NewNotFoundError(nil)) {
return err
}
// Get the time in minutes since the object was last boosted. If the
// value is -1, we have no record of boosting the object. If the
// value is between 0 and 30, don't send anything to group followers.
if since != -1 && since < 30 {
return nil
}
if !activityExists {
notifyActors, err = tx.GroupMemberActorsForGroupActorID(ctx, group.ActorID)
if err != nil {
return err
}
groupActor, err = tx.GetActor(ctx, group.ActorID)
if err != nil {
return err
}
}
return nil
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
if denied {
h.unauthorizedJSON(c, nil)
return
}
if len(notifyActors) > 0 {
now := time.Now().UTC()
publishedAt := now.Format("2006-01-02T15:04:05Z")
announceID := common.ActivityURL(h.domain, storage.NewV4())
announce := storage.EmptyPayload()
announce["@context"] = "https://www.w3.org/ns/activitystreams"
announce["id"] = announceID
announce["type"] = "Announce"
announce["actor"] = groupActor.ActorID
announce["published"] = publishedAt
announce["to"] = []string{
storage.ActorID(groupActor.ActorID).Followers(),
}
announce["object"] = activityObjectID
announcePayload := announce.Bytes()
for _, notifyActor := range notifyActors {
if h.publisherClient != nil {
err := h.publisherClient.Send(ctx, notifyActor.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, activityID, string(announcePayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
}
}
}
c.Status(http.StatusOK)
}
func (h handler) groupActorInboxAnnounce(c *gin.Context, group storage.Group, payload storage.Payload) {
if err := h.verifySignature(c); err != nil {
h.unauthorizedJSON(c, err)
return
}
activityID, hasActivityID := storage.JSONString(payload, "id")
actorID, hasActorID := storage.JSONString(payload, "actor")
activityObjectID, hasActivityObjectID := storage.JSONString(payload, "object")
if !hasActivityID || !hasActorID || !hasActivityObjectID {
h.logger.Info("ignoring invalid announce activity",
zap.String("id", activityID),
zap.String("actor", actorID),
zap.String("object_id", activityObjectID),
zap.String("group", group.Name),
)
c.Status(http.StatusOK)
return
}
ctx := c.Request.Context()
var objectPayload storage.Payload
ac := fed.ActivityClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
var groupActor *storage.Actor
var notifyActors []*storage.Actor
denied := false
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
activityActorRowID, err := tx.ActorRowIDForActorID(ctx, actorID)
if err != nil {
// This shouldn't actually happen because the actor passed the validate activity phase.
if errors.Is(err, errors.NewNotFoundError(nil)) {
denied = true
return nil
}
return err
}
allowed, err := tx.GroupMemberCanSubmit(ctx, group.ActorID, activityActorRowID)
if err != nil {
return err
}
if !allowed {
denied = true
return nil
}
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityID)
if err != nil {
return err
}
groupActor, err = tx.GetActor(ctx, group.ActorID)
if err != nil {
return err
}
h.logger.Debug("storing announce", zap.Bool("exists", activityExists), zap.String("activity", activityID))
// Nick: I don't like any of this.
var activityObjectRowID uuid.UUID
activityObjectRowID, err = tx.ObjectRowIDForObjectID(ctx, activityObjectID)
if err != nil {
// If the object is not local, and we don't have it recorded, try to fetch it.
if errors.Is(err, errors.NewNotFoundError(nil)) && !strings.HasPrefix(activityObjectID, common.ObjectURLPrefix(h.domain)) {
h.logger.Debug("object row not found and object is not local", zap.String("activity", activityID), zap.String("object", activityObjectID))
// If we are able to fetch the object and record the object, keep going
privateKey, err := group.GetPrivateKey()
if err != nil {
return err
}
_, objectPayload, err = ac.GetSignedWithKey(activityObjectID, groupActor.GetKeyID(), privateKey)
if err != nil {
return err
}
activityObjectRowID, err = tx.RecordObject(ctx, objectPayload, activityObjectID)
if err != nil {
return err
}
} else {
return err
}
}
since, err := tx.MinutesSinceGroupBoost(ctx, group.ActorID, activityObjectRowID)
if err != nil && !errors.Is(err, errors.NewNotFoundError(nil)) {
return err
}
// Get the time in minutes since the object was last boosted. If the
// value is -1, we have no record of boosting the object. If the
// value is between 0 and 30, don't send anything to group followers.
if since != -1 && since < 30 {
return nil
}
// TODO: Verify that the user owns the object if the object is local.
_, err = tx.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
// If the server has inbox forwarding enabled, the replied to
// object is local, and the activity has never been seen, then get
// a list of all the actors that the object should be forwarded
// to.
// TODO: Move this action to a worker. It could be a big list.
if !activityExists {
notifyActors, err = tx.GroupMemberActorsForGroupActorID(ctx, group.ActorID)
if err != nil {
return err
}
}
return nil
})
if txErr != nil {
h.internalServerErrorJSON(c, txErr)
return
}
if denied {
h.unauthorizedJSON(c, nil)
return
}
// TODO: If set activityIsNew to true and server enabled inbox
// forwarding and actor enabled inbox forwarding then find all
// actors to receive the activity and publish it to them.
if len(notifyActors) > 0 {
now := time.Now().UTC()
publishedAt := now.Format("2006-01-02T15:04:05Z")
announceID := common.ActivityURL(h.domain, storage.NewV4())
announce := storage.EmptyPayload()
announce["@context"] = "https://www.w3.org/ns/activitystreams"
announce["id"] = announceID
announce["type"] = "Announce"
announce["actor"] = groupActor.ActorID
announce["published"] = publishedAt
announce["to"] = []string{
storage.ActorID(groupActor.ActorID).Followers(),
}
announce["object"] = activityObjectID
announcePayload := announce.Bytes()
for _, notifyActor := range notifyActors {
if h.publisherClient != nil {
err := h.publisherClient.Send(ctx, notifyActor.GetInbox(), groupActor.GetKeyID(), group.PrivateKey, activityID, string(announcePayload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", notifyActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
}
}
}
c.Status(http.StatusOK)
}

View File

@ -9,6 +9,7 @@ import (
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
"github.com/ngerakines/tavern/fed"
"github.com/ngerakines/tavern/storage"
@ -85,8 +86,20 @@ func (h handler) dashboardNetwork(c *gin.Context) {
return
}
data["following"] = following
data["pending_following"] = pendingFollowing
groups, err := h.storage.FilterGroupsByActorID(ctx, append(following, pendingFollowing...))
if err != nil {
h.hardFail(c, err)
return
}
followingActors := common.FilterStrings(following, common.StringsExcludeFF(groups))
pendingFollowingActors := common.FilterStrings(pendingFollowing, common.StringsExcludeFF(groups))
data["following"] = followingActors
data["pending_following"] = pendingFollowingActors
data["groups"] = common.FilterStrings(groups, common.StringsIncludeFF(following))
data["pending_groups"] = common.FilterStrings(groups, common.StringsIncludeFF(pendingFollowing))
c.HTML(http.StatusOK, "network", data)
}
@ -126,15 +139,12 @@ func (h handler) networkFollow(c *gin.Context) {
h.flashErrorOrFail(c, h.url("network"), err)
return
}
if isFollowing {
c.Redirect(http.StatusFound, h.url("network"))
return
}
follow := storage.EmptyPayload()
activityID := storage.NewV4()
activityID := common.ActivityURL(h.domain, storage.NewV4())
follow["@context"] = "https://www.w3.org/ns/activitystreams"
follow["actor"] = storage.NewActorID(user.Name, h.domain)
follow["id"] = fmt.Sprintf("https://%s/activity/%s", h.domain, activityID)
follow["id"] = activityID
follow["object"] = actor.GetID()
follow["to"] = actor.GetID()
follow["type"] = "Follow"
@ -142,20 +152,29 @@ func (h handler) networkFollow(c *gin.Context) {
payload := follow.Bytes()
err = h.storage.CreatePendingFollowing(ctx, user.ID, actor.ID, follow)
if err != nil {
h.flashErrorOrFail(c, h.url("network"), err)
return
if !isFollowing {
err = h.storage.CreatePendingFollowing(ctx, user.ID, actor.ID, follow)
if err != nil {
h.flashErrorOrFail(c, h.url("network"), err)
return
}
}
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
err = nc.SendToInbox(ctx, h.userActor(user, userActor), actor, payload)
if err != nil {
h.flashErrorOrFail(c, h.url("network"), err)
return
if h.publisherClient != nil {
err = h.publisherClient.Send(ctx, actor.GetInbox(), userActor.GetKeyID(), user.PrivateKey, activityID, string(payload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", actor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
} else {
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
err = nc.SendToInbox(ctx, h.userActor(user, userActor), actor, payload)
if err != nil {
h.flashErrorOrFail(c, h.url("network"), err)
return
}
}
c.Redirect(http.StatusFound, h.url("network"))
@ -207,10 +226,10 @@ func (h handler) networkUnfollow(c *gin.Context) {
delete(requestActivity, "@context")
undoFollow := storage.EmptyPayload()
activityID := storage.NewV4()
activityID := common.ActivityURL(h.domain, storage.NewV4())
undoFollow["@context"] = "https://www.w3.org/ns/activitystreams"
undoFollow["actor"] = storage.NewActorID(user.Name, h.domain)
undoFollow["id"] = fmt.Sprintf("https://%s/activity/%s", h.domain, activityID)
undoFollow["id"] = activityID
undoFollow["object"] = requestActivity
undoFollow["to"] = to
undoFollow["type"] = "Undo"
@ -218,26 +237,27 @@ func (h handler) networkUnfollow(c *gin.Context) {
payload := undoFollow.Bytes()
actor, err := fed.GetOrFetchActor(ctx, h.storage, h.logger, h.httpClient, to)
if err != nil {
h.flashErrorOrFail(c, h.url("network"), err)
return
}
err = h.storage.RemoveFollowing(ctx, user.ID, targetActor.ID)
if err != nil {
h.flashErrorOrFail(c, h.url("network"), err)
return
}
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
err = nc.SendToInbox(ctx, h.userActor(user, userActor), actor, payload)
if err != nil {
h.flashErrorOrFail(c, h.url("network"), err)
return
if h.publisherClient != nil {
err = h.publisherClient.Send(ctx, targetActor.GetInbox(), userActor.GetKeyID(), user.PrivateKey, activityID, string(payload))
if err != nil {
h.logger.Error("failed sending to actor", zap.String("target", targetActor.GetID()), zap.String("activity", activityID), zap.Error(err))
}
} else {
nc := fed.ActorClient{
HTTPClient: h.httpClient,
Logger: h.logger,
}
err = nc.SendToInbox(ctx, h.userActor(user, userActor), targetActor, payload)
if err != nil {
h.flashErrorOrFail(c, h.url("network"), err)
return
}
}
c.Redirect(http.StatusFound, h.url("network"))

View File

@ -12,7 +12,7 @@ import (
"github.com/ngerakines/tavern/storage"
)
func (h handler) actorInfo(c *gin.Context) {
func (h handler) userActorInfo(c *gin.Context) {
accepted := common.ParseAccept(c.GetHeader("Accept"))
if accepted.MatchHTML(true) {
h.viewUser(c)
@ -40,7 +40,7 @@ func (h handler) actorInfo(c *gin.Context) {
h.writeJSONLDProfile(c, http.StatusOK, userActor.Payload)
}
func (h handler) actorFollowers(c *gin.Context) {
func (h handler) userActorFollowers(c *gin.Context) {
name := c.Param("name")
ctx := c.Request.Context()
@ -101,7 +101,7 @@ func (h handler) actorFollowers(c *gin.Context) {
h.writeJSONLD(c, http.StatusOK, response)
}
func (h handler) actorFollowing(c *gin.Context) {
func (h handler) userActorFollowing(c *gin.Context) {
name := c.Param("name")
ctx := c.Request.Context()
@ -162,7 +162,7 @@ func (h handler) actorFollowing(c *gin.Context) {
h.writeJSONLD(c, http.StatusOK, response)
}
func (h handler) actorOutbox(c *gin.Context) {
func (h handler) userActorOutbox(c *gin.Context) {
ctx := c.Request.Context()
name := c.Param("name")

View File

@ -23,7 +23,7 @@ import (
const maxInboxRequestSize = 1 * 1024 * 1024
func (h handler) actorInbox(c *gin.Context) {
func (h handler) userActorInbox(c *gin.Context) {
name := c.Param("name")
user, err := h.storage.GetUserByName(c.Request.Context(), name)
@ -553,8 +553,8 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
var notifyActors []*storage.Actor
err = storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityExists, err := storage.ActivityExistsByActivityID(ctx, activityObjectID)
err = storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityObjectID)
if err != nil {
return err
}
@ -564,22 +564,22 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
// actions should not result in new records being created. However,
// we avoid the inbox-forwarding behavior.
activityObjectRowID, err := storage.RecordObject(ctx, activityObject, activityObjectID)
activityObjectRowID, err := tx.RecordObject(ctx, activityObject, activityObjectID)
if err != nil {
return err
}
activityRowID, err := storage.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
activityRowID, err := tx.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
_, err = storage.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
_, err = tx.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
if err != nil {
return err
}
if hasInReplyTo {
replyRowID, err := storage.ObjectRowIDForObjectID(ctx, inReplyTo)
replyRowID, err := tx.ObjectRowIDForObjectID(ctx, inReplyTo)
if err == nil {
_, err = storage.RecordObjectReply(ctx, activityObjectRowID, replyRowID)
_, err = tx.RecordObjectReply(ctx, activityObjectRowID, replyRowID)
if err != nil {
return err
}
@ -590,7 +590,7 @@ func (h handler) actorInboxCreate(c *gin.Context, user *storage.User, payload st
// to.
// TODO: Move this action to a worker. It could be a big list.
if h.fedConfig.AllowInboxForwarding && strings.HasPrefix(inReplyTo, common.ObjectURLPrefix(h.domain)) && !activityExists {
notifyActors, err = storage.ActorsSubscribedToObject(ctx, replyRowID)
notifyActors, err = tx.ActorsSubscribedToObject(ctx, replyRowID)
if err != nil {
return err
}
@ -701,18 +701,26 @@ func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload
Logger: h.logger,
}
var senderActor *storage.Actor
var notifyActors []*storage.Actor
txErr := storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
activityExists, err := storage.ActivityExistsByActivityID(ctx, activityID)
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityID)
if err != nil {
return err
}
h.logger.Debug("storing announce", zap.Bool("exists", activityExists), zap.String("activity", activityID))
senderActor, err = tx.GetActorByActorID(ctx, actorID)
if err != nil {
return err
}
actorType, _ := storage.JSONString(senderActor.Payload, "type")
isGroup := actorType == "Group"
// Nick: I don't like any of this.
var activityObjectRowID uuid.UUID
activityObjectRowID, err = storage.ObjectRowIDForObjectID(ctx, activityObjectID)
activityObjectRowID, err = tx.ObjectRowIDForObjectID(ctx, activityObjectID)
if err != nil {
// If the object is not local, and we don't have it recorded, try to fetch it.
if errors.Is(err, errors.NewNotFoundError(nil)) && !strings.HasPrefix(activityObjectID, common.ObjectURLPrefix(h.domain)) {
@ -722,7 +730,7 @@ func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload
if err != nil {
return err
}
activityObjectRowID, err = storage.RecordObject(ctx, objectPayload, activityObjectID)
activityObjectRowID, err = tx.RecordObject(ctx, objectPayload, activityObjectID)
if err != nil {
return err
}
@ -734,11 +742,11 @@ func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload
// TODO: Verify that the user owns the object if the object is local.
activityRowID, err := storage.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
activityRowID, err := tx.RecordObjectEvent(ctx, activityID, activityObjectRowID, payload)
if err != nil {
return err
}
_, err = storage.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
_, err = tx.RecordUserFeed(ctx, activityRowID, activityObjectRowID, user.ID)
if err != nil {
return err
}
@ -747,8 +755,8 @@ func (h handler) actorInboxAnnounce(c *gin.Context, user *storage.User, payload
// a list of all the actors that the object should be forwarded
// to.
// TODO: Move this action to a worker. It could be a big list.
if h.fedConfig.AllowInboxForwarding && !activityExists {
notifyActors, err = storage.ActorsSubscribedToObject(ctx, activityObjectRowID)
if h.fedConfig.AllowInboxForwarding && !activityExists && !isGroup {
notifyActors, err = tx.ActorsSubscribedToObject(ctx, activityObjectRowID)
if err != nil {
return err
}
@ -846,25 +854,25 @@ func (h handler) actorInboxDelete(c *gin.Context, user *storage.User, payload st
var notifyActors []*storage.Actor
txErr := storage.TransactionalStorage(ctx, h.storage, func(storage storage.Storage) error {
txErr := storage.TransactionalStorage(ctx, h.storage, func(tx storage.Storage) error {
activityExists, err := storage.ActivityExistsByActivityID(ctx, activityURL)
activityExists, err := tx.ActivityExistsByActivityID(ctx, activityURL)
if err != nil {
return err
}
h.logger.Debug("storing announce", zap.Bool("exists", activityExists), zap.String("activity", activityURL))
err = storage.UpdateObjectPayload(ctx, objectRowID, tombstone)
err = tx.UpdateObjectPayload(ctx, objectRowID, tombstone)
if err != nil {
return err
}
_, err = storage.RecordObjectEvent(ctx, activityURL, objectRowID, payload)
_, err = tx.RecordObjectEvent(ctx, activityURL, objectRowID, payload)
if err != nil {
return err
}
parentObjectRowID, err := storage.ParentObjectID(ctx, objectRowID)
parentObjectRowID, err := tx.ParentObjectID(ctx, objectRowID)
// If there was an error and it isn't a not-found error, fail.
if err != nil && !errors.Is(err, errors.NewNotFoundError(nil)) {
return err
@ -875,7 +883,7 @@ func (h handler) actorInboxDelete(c *gin.Context, user *storage.User, payload st
// should be forwarded to.
// TODO: Move this action to a worker. It could be a big list.
if h.fedConfig.AllowInboxForwarding && !activityExists && parentObjectRowID != uuid.Nil {
notifyActors, err = storage.ActorsSubscribedToObject(ctx, parentObjectRowID)
notifyActors, err = tx.ActorsSubscribedToObject(ctx, parentObjectRowID)
if err != nil {
return err
}

View File

@ -8,6 +8,7 @@ import (
"github.com/gin-gonic/gin"
"go.uber.org/zap"
"github.com/ngerakines/tavern/common"
"github.com/ngerakines/tavern/errors"
"github.com/ngerakines/tavern/g"
"github.com/ngerakines/tavern/storage"
@ -67,16 +68,17 @@ func (h handler) nodeInfoDetails(c *gin.Context) {
}
func (h handler) webFinger(c *gin.Context) {
username, domain, err := fingerUserDomain(c.Query("resource"), h.domain)
if err != nil {
h.logger.Error("error parsing resource", zap.Error(err))
resource := c.Query("resource")
if !strings.HasPrefix(resource, common.DomainPrefix(h.domain)) && (strings.HasPrefix(resource, "acct:") && !strings.HasSuffix(resource, fmt.Sprintf("@%s", h.domain))) {
h.logger.Debug("request for non-local resource received", zap.String("resource", resource))
h.writeJRD(c, http.StatusNotFound, nil)
return
}
h.logger.Info("webfinger request", zap.String("resource", c.Query("resource")))
user, err := h.storage.GetUserByName(c.Request.Context(), username)
foundActor, err := h.storage.GetActorByAlias(c.Request.Context(), resource)
if err != nil {
if errors.Is(err, errors.NotFoundError{}) {
h.notFoundJSON(c, nil)
@ -87,34 +89,28 @@ func (h handler) webFinger(c *gin.Context) {
}
response := storage.EmptyPayload()
response["subject"] = fmt.Sprintf("acct:%s@%s", user.Name, domain)
response["subject"] = fmt.Sprintf("acct:%s@%s", foundActor.PreferredUsername, h.domain)
response["aliases"] = []string{
fmt.Sprintf("https://%s/users/%s", domain, user.Name),
foundActor.ActorID,
}
self := storage.EmptyPayload()
self["rel"] = "self"
self["type"] = `application/activity+json`
self["href"] = fmt.Sprintf("https://%s/users/%s", domain, user.Name)
self["href"] = foundActor.ActorID
response["links"] = []storage.Payload{self}
icon := storage.EmptyPayload()
icon["type"] = "Image"
icon["mediaType"] = "image/png"
icon["url"] = fmt.Sprintf("https://%s/avatar/png/%s", h.domain, user.Name)
self["icon"] = icon
h.writeJRD(c, http.StatusOK, response)
}
func fingerUserDomain(input, domain string) (string, string, error) {
input = strings.TrimPrefix(input, "acct:")
domainPrefix := fmt.Sprintf("https://%s/users/", domain)
domainPrefix := fmt.Sprintf("https://%s/", domain)
if strings.HasPrefix(input, domainPrefix) {
user := strings.TrimPrefix(input, domainPrefix)
return user, domain, nil
name := strings.TrimPrefix(input, common.GroupActorURLPrefix(domain))
name = strings.TrimPrefix(name, common.ActorURLPrefix(domain))
return name, domain, nil
}
parts := strings.FieldsFunc(input, func(r rune) bool {
@ -126,9 +122,9 @@ func fingerUserDomain(input, domain string) (string, string, error) {
if parts[1] != domain {
return "", "", errors.New("malformed resource parameter")
}
user := strings.TrimSpace(parts[0])
if len(user) == 0 {
name := strings.TrimSpace(parts[0])
if len(name) == 0 {
return "", "", errors.New("malformed resource parameter")
}
return user, domain, nil
return name, domain, nil
}

View File

@ -125,15 +125,33 @@ func tmplUrlGen(siteBase string) func(parts ...interface{}) string {
return fmt.Sprintf("%s/thread/%s", siteBase, parts[1])
case "network":
return fmt.Sprintf("%s/network", siteBase)
return fmt.Sprintf("%s/manage/network", siteBase)
case "network_follow":
return fmt.Sprintf("%s/network/follow", siteBase)
return fmt.Sprintf("%s/manage/network/follow", siteBase)
case "network_unfollow":
return fmt.Sprintf("%s/network/unfollow", siteBase)
return fmt.Sprintf("%s/manage/network/unfollow", siteBase)
case "network_accept":
return fmt.Sprintf("%s/network/accept", siteBase)
return fmt.Sprintf("%s/manage/network/accept", siteBase)
case "network_reject":
return fmt.Sprintf("%s/network/reject", siteBase)
return fmt.Sprintf("%s/manage/network/reject", siteBase)
case "group":
return fmt.Sprintf("%s/groups/%s", siteBase, parts[1])
case "group_manage":
return fmt.Sprintf("%s/groups/%s/manage", siteBase, parts[1])
case "groups":
return fmt.Sprintf("%s/manage/groups", siteBase)
case "groups_create":
return fmt.Sprintf("%s/manage/groups/create", siteBase)
case "groups_accept":
return fmt.Sprintf("%s/manage/groups/accept", siteBase)
case "groups_reject":
return fmt.Sprintf("%s/manage/groups/reject", siteBase)
case "groups_follow":
return fmt.Sprintf("%s/manage/groups/follow", siteBase)
case "groups_unfollow":
return fmt.Sprintf("%s/manage/groups/unfollow", siteBase)
case "profile":
return fmt.Sprintf("%s/users/%s", siteBase, parts[1])