mirror of https://gitlab.com/ngerakines/tavern.git
Improving transaction handling and logging.
This commit is contained in:
parent
70bde416ed
commit
7aa1e4737a
|
@ -68,7 +68,7 @@ func fileCommandAction(cliCtx *cli.Context) error {
|
|||
}
|
||||
defer dbClose()
|
||||
|
||||
s := storage.DefaultStorage(db, logger)
|
||||
s := storage.DefaultStorage(storage.LoggingSQLDriver{Driver: db, Logger: logger})
|
||||
|
||||
a := Agent{
|
||||
AssetStorage: FileStorage{Base: "./assets/"},
|
||||
|
|
|
@ -8,10 +8,6 @@ type FedConfig struct {
|
|||
AllowFollowObject bool
|
||||
AllowAutoAcceptFollowers bool
|
||||
AllowInboxForwarding bool
|
||||
|
||||
EnableGroups bool
|
||||
AllowAutoAcceptGroupFollowers bool
|
||||
AllowRemoteGroupFollowers bool
|
||||
}
|
||||
|
||||
var AllowFollowObjectFlag = cli.BoolFlag{
|
||||
|
@ -35,37 +31,12 @@ var AllowInboxForwardingFlag = cli.BoolFlag{
|
|||
Value: false,
|
||||
}
|
||||
|
||||
var EnableGroupsFlag = cli.BoolFlag{
|
||||
Name: "enable-groups",
|
||||
Usage: "Enable groups",
|
||||
EnvVars: []string{"ENABLE_GROUPS"},
|
||||
Value: true,
|
||||
}
|
||||
|
||||
var AllowAutoAcceptGroupFollowersFlag = cli.BoolFlag{
|
||||
Name: "allow-auto-accept-group-followers",
|
||||
Usage: "Allow groups to turn on automatically accepting follow requests.",
|
||||
EnvVars: []string{"ALLOW_AUTO_ACCEPT_GROUP_FOLLOWERS"},
|
||||
Value: true,
|
||||
}
|
||||
|
||||
var AllowRemoteFollowersFlag = cli.BoolFlag{
|
||||
Name: "allow-remote-group-followers",
|
||||
Usage: "Allow non-local users to follow groups.",
|
||||
EnvVars: []string{"ALLOW_REMOTE_GROUP_FOLLOWERS"},
|
||||
Value: true,
|
||||
}
|
||||
|
||||
func NewFedConfig(cliCtx *cli.Context) (FedConfig, error) {
|
||||
cfg := FedConfig{
|
||||
AllowAutoAcceptFollowers: cliCtx.Bool("allow-auto-accept-followers"),
|
||||
|
||||
AllowFollowObject: cliCtx.Bool("allow-reply-collection-updates"),
|
||||
AllowInboxForwarding: cliCtx.Bool("allow-inbox-forwarding"),
|
||||
|
||||
EnableGroups: cliCtx.Bool("enable-groups"),
|
||||
AllowAutoAcceptGroupFollowers: cliCtx.Bool("allow-auto-accept-group-followers"),
|
||||
AllowRemoteGroupFollowers: cliCtx.Bool("allow-remote-group-followers"),
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,49 @@
|
|||
package config
|
||||
|
||||
import (
|
||||
"github.com/urfave/cli/v2"
|
||||
)
|
||||
|
||||
type GroupConfig struct {
|
||||
EnableGroups bool
|
||||
AllowAutoAcceptGroupFollowers bool
|
||||
AllowRemoteGroupFollowers bool
|
||||
DefaultGroupMemberRole int
|
||||
}
|
||||
|
||||
var EnableGroupsFlag = cli.BoolFlag{
|
||||
Name: "enable-groups",
|
||||
Usage: "Enable groups",
|
||||
EnvVars: []string{"ENABLE_GROUPS"},
|
||||
Value: true,
|
||||
}
|
||||
|
||||
var AllowAutoAcceptGroupFollowersFlag = cli.BoolFlag{
|
||||
Name: "allow-auto-accept-group-followers",
|
||||
Usage: "Allow groups to turn on automatically accepting follow requests.",
|
||||
EnvVars: []string{"ALLOW_AUTO_ACCEPT_GROUP_FOLLOWERS"},
|
||||
Value: true,
|
||||
}
|
||||
|
||||
var AllowRemoteFollowersFlag = cli.BoolFlag{
|
||||
Name: "allow-remote-group-followers",
|
||||
Usage: "Allow non-local users to follow groups.",
|
||||
EnvVars: []string{"ALLOW_REMOTE_GROUP_FOLLOWERS"},
|
||||
Value: true,
|
||||
}
|
||||
|
||||
var DefaultGroupMemberRoleFlag = cli.IntFlag{
|
||||
Name: "default-group-member-role",
|
||||
Usage: "The default member role for groups.",
|
||||
EnvVars: []string{"DEFAULT_GROUP_MEMBER_ROLE"},
|
||||
Value: 0,
|
||||
}
|
||||
|
||||
func NewGroupConfig(cliCtx *cli.Context) (GroupConfig, error) {
|
||||
cfg := GroupConfig{
|
||||
EnableGroups: cliCtx.Bool("enable-groups"),
|
||||
AllowAutoAcceptGroupFollowers: cliCtx.Bool("allow-auto-accept-group-followers"),
|
||||
AllowRemoteGroupFollowers: cliCtx.Bool("allow-remote-group-followers"),
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
|
@ -105,7 +105,7 @@ func serverCommandAction(cliCtx *cli.Context) error {
|
|||
|
||||
ctx := context.Background()
|
||||
|
||||
txErr := storage.TransactionalStorage(ctx, storage.DefaultStorage(db, logger), func(s storage.Storage) error {
|
||||
txErr := storage.TransactionalStorage(ctx, storage.DefaultStorage(storage.LoggingSQLDriver{Driver: db, Logger: logger}), func(s storage.Storage) error {
|
||||
|
||||
name := cliCtx.String("admin-name")
|
||||
displayName := cliCtx.String("admin-displayname")
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type QueryExecute interface {
|
||||
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
|
||||
}
|
||||
|
||||
type TransactionSQLDriver struct {
|
||||
Driver *sql.Tx
|
||||
}
|
||||
|
||||
type LoggingSQLDriver struct {
|
||||
Driver QueryExecute
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
type SQLDriver struct {
|
||||
Driver *sql.DB
|
||||
}
|
||||
|
||||
func (d LoggingSQLDriver) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
|
||||
d.Logger.Debug("ExecContext", zap.String("query", query), zap.Reflect("args", args))
|
||||
return d.Driver.ExecContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (d LoggingSQLDriver) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
|
||||
d.Logger.Debug("QueryContext", zap.String("query", query), zap.Reflect("args", args))
|
||||
return d.Driver.QueryContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (d LoggingSQLDriver) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
|
||||
d.Logger.Debug("QueryRowContext", zap.String("query", query), zap.Reflect("args", args))
|
||||
return d.Driver.QueryRowContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (d SQLDriver) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
|
||||
return d.Driver.ExecContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (d SQLDriver) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
|
||||
return d.Driver.QueryContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (d SQLDriver) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
|
||||
return d.Driver.QueryRowContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (d TransactionSQLDriver) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
|
||||
return d.Driver.ExecContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (d TransactionSQLDriver) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
|
||||
return d.Driver.QueryContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
func (d TransactionSQLDriver) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
|
||||
return d.Driver.QueryRowContext(ctx, query, args...)
|
||||
}
|
|
@ -246,8 +246,10 @@ func (s pgStorage) GroupMemberActorsForGroupActorID(ctx context.Context, groupAc
|
|||
}
|
||||
|
||||
func (s pgStorage) UpdateGroupMemberStatus(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID, status RelationshipStatus) error {
|
||||
query := "UPDATE group_members SET relationship_status = $4, updated_at = $3 WHERE group_actor_id = $1 AND member_actor_id = $2"
|
||||
now := s.now()
|
||||
_, err := s.db.ExecContext(ctx, "UPDATE group_members SET relationship_status = $4, updated_at = $3 WHERE group_actor_id = $1 AND member_actor_id = $2", groupActorRowID, memberActorRowID, now, status)
|
||||
|
||||
_, err := s.db.ExecContext(ctx, query, groupActorRowID, memberActorRowID, now, status)
|
||||
return errors.WrapUserUpdateFailedError(err)
|
||||
}
|
||||
|
||||
|
@ -258,7 +260,8 @@ func (s pgStorage) UpdateGroupMemberRole(ctx context.Context, groupActorRowID, m
|
|||
}
|
||||
|
||||
func (s pgStorage) RemoveGroupMember(ctx context.Context, groupActorRowID, memberActorRowID uuid.UUID) error {
|
||||
_, err := s.db.ExecContext(ctx, `DELETE FROM group_members WHERE group_actor_id = $1 AND member_actor_id = $2`, groupActorRowID, memberActorRowID)
|
||||
query := `DELETE FROM group_members WHERE group_actor_id = $1 AND member_actor_id = $2`
|
||||
_, err := s.db.ExecContext(ctx, query, groupActorRowID, memberActorRowID)
|
||||
return errors.WrapGroupMemberUpdateFailedError(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -8,7 +8,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/gofrs/uuid"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ngerakines/tavern/common"
|
||||
"github.com/ngerakines/tavern/errors"
|
||||
|
@ -149,7 +148,6 @@ func (s pgStorage) ListObjectPayloadsInUserTagFeed(ctx context.Context, userID u
|
|||
}
|
||||
|
||||
func (s pgStorage) objectPayloads(ctx context.Context, query string, args ...interface{}) ([]Payload, error) {
|
||||
s.logger.Debug("object payloads query", zap.String("query", query), zap.Reflect("args", args))
|
||||
var results []Payload
|
||||
rows, err := s.db.QueryContext(ctx, query, args...)
|
||||
if err != nil {
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
|
||||
"github.com/gofrs/uuid"
|
||||
_ "github.com/lib/pq"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/ngerakines/tavern/errors"
|
||||
)
|
||||
|
@ -25,13 +24,6 @@ type Storage interface {
|
|||
GroupStorage
|
||||
|
||||
GetExecutor() QueryExecute
|
||||
GetLogger() *zap.Logger
|
||||
}
|
||||
|
||||
type QueryExecute interface {
|
||||
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
|
||||
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
|
||||
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
|
||||
}
|
||||
|
||||
type Count struct {
|
||||
|
@ -39,18 +31,16 @@ type Count struct {
|
|||
Count int
|
||||
}
|
||||
|
||||
func DefaultStorage(db *sql.DB, logger *zap.Logger) Storage {
|
||||
func DefaultStorage(db QueryExecute) Storage {
|
||||
return pgStorage{
|
||||
db: db,
|
||||
now: defaultNowFunc,
|
||||
logger: logger,
|
||||
db: db,
|
||||
now: defaultNowFunc,
|
||||
}
|
||||
}
|
||||
|
||||
type pgStorage struct {
|
||||
db QueryExecute
|
||||
now nowFunc
|
||||
logger *zap.Logger
|
||||
db QueryExecute
|
||||
now nowFunc
|
||||
}
|
||||
|
||||
type transactionScopedWork func(db QueryExecute) error
|
||||
|
@ -61,28 +51,16 @@ func defaultNowFunc() time.Time {
|
|||
return time.Now().UTC()
|
||||
}
|
||||
|
||||
// deprecated
|
||||
func runTransactionWithOptions(db QueryExecute, txBody transactionScopedWork) error {
|
||||
realDB, ok := db.(*sql.DB)
|
||||
if !ok {
|
||||
return txBody(db)
|
||||
}
|
||||
|
||||
tx, err := realDB.Begin()
|
||||
if err != nil {
|
||||
return errors.NewDatabaseTransactionFailedError(err)
|
||||
}
|
||||
|
||||
err = txBody(tx)
|
||||
if err != nil {
|
||||
if txErr := tx.Rollback(); txErr != nil {
|
||||
return errors.NewDatabaseTransactionFailedError(txErr)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return errors.WrapDatabaseTransactionFailedError(tx.Commit())
|
||||
return txBody(db)
|
||||
}
|
||||
|
||||
func TransactionalStorage(ctx context.Context, storage Storage, txBody transactionScopedStorage) error {
|
||||
if _, ok := storage.GetExecutor().(TransactionSQLDriver); ok {
|
||||
return txBody(storage)
|
||||
}
|
||||
|
||||
executor := storage.GetExecutor()
|
||||
realDB, ok := executor.(*sql.DB)
|
||||
if !ok {
|
||||
|
@ -95,9 +73,8 @@ func TransactionalStorage(ctx context.Context, storage Storage, txBody transacti
|
|||
}
|
||||
|
||||
err = txBody(pgStorage{
|
||||
db: tx,
|
||||
now: defaultNowFunc,
|
||||
logger: storage.GetLogger(),
|
||||
db: TransactionSQLDriver{Driver: tx},
|
||||
now: defaultNowFunc,
|
||||
})
|
||||
if err != nil {
|
||||
if txErr := tx.Rollback(); txErr != nil {
|
||||
|
@ -112,10 +89,6 @@ func (s pgStorage) GetExecutor() QueryExecute {
|
|||
return s.db
|
||||
}
|
||||
|
||||
func (s pgStorage) GetLogger() *zap.Logger {
|
||||
return s.logger
|
||||
}
|
||||
|
||||
func (s pgStorage) RowCount(ctx context.Context, query string, args ...interface{}) (int, error) {
|
||||
return s.wrappedRowCount(errors.WrapQueryFailedError, ctx, query, args...)
|
||||
}
|
||||
|
@ -125,8 +98,6 @@ func (s pgStorage) rowCount(ctx context.Context, query string, args ...interface
|
|||
}
|
||||
|
||||
func (s pgStorage) wrappedRowCount(ew errors.ErrorWrapper, ctx context.Context, query string, args ...interface{}) (int, error) {
|
||||
s.logger.Debug("row count query", zap.String("query", query), zap.Reflect("args", args))
|
||||
|
||||
var total int
|
||||
|
||||
err := s.db.QueryRowContext(ctx, query, args...).Scan(&total)
|
||||
|
@ -137,8 +108,6 @@ func (s pgStorage) wrappedRowCount(ew errors.ErrorWrapper, ctx context.Context,
|
|||
}
|
||||
|
||||
func (s pgStorage) wrappedExists(ew errors.ErrorWrapper, ctx context.Context, query string, args ...interface{}) (bool, error) {
|
||||
s.logger.Debug("row count query", zap.String("query", query), zap.Reflect("args", args))
|
||||
|
||||
var total int
|
||||
err := s.db.QueryRowContext(ctx, query, args...).Scan(&total)
|
||||
if err != nil {
|
||||
|
@ -148,8 +117,6 @@ func (s pgStorage) wrappedExists(ew errors.ErrorWrapper, ctx context.Context, qu
|
|||
}
|
||||
|
||||
func (s pgStorage) wrappedSelectUUID(ew errors.ErrorWrapper, ctx context.Context, query string, args ...interface{}) (uuid.UUID, error) {
|
||||
s.logger.Debug("select UUID query", zap.String("query", query), zap.Reflect("args", args))
|
||||
|
||||
var id uuid.UUID
|
||||
err := s.db.QueryRowContext(ctx, query, args...).Scan(&id)
|
||||
if err != nil {
|
||||
|
@ -162,8 +129,6 @@ func (s pgStorage) wrappedSelectUUID(ew errors.ErrorWrapper, ctx context.Context
|
|||
}
|
||||
|
||||
func (s pgStorage) wrappedSelectInt(ew errors.ErrorWrapper, ctx context.Context, query string, args ...interface{}) (int, error) {
|
||||
s.logger.Debug("select int query", zap.String("query", query), zap.Reflect("args", args))
|
||||
|
||||
var value int
|
||||
err := s.db.QueryRowContext(ctx, query, args...).Scan(&value)
|
||||
if err != nil {
|
||||
|
|
|
@ -75,6 +75,7 @@ var Command = cli.Command{
|
|||
&config.EnableGroupsFlag,
|
||||
&config.AllowAutoAcceptGroupFollowersFlag,
|
||||
&config.AllowRemoteFollowersFlag,
|
||||
&config.DefaultGroupMemberRoleFlag,
|
||||
},
|
||||
Action: serverCommandAction,
|
||||
}
|
||||
|
@ -110,6 +111,11 @@ func serverCommandAction(cliCtx *cli.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
groupConfig, err := config.NewGroupConfig(cliCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
publisherConfig := config.NewPublisherConfig(cliCtx)
|
||||
|
||||
if sentryConfig.Enabled {
|
||||
|
@ -133,7 +139,7 @@ func serverCommandAction(cliCtx *cli.Context) error {
|
|||
}
|
||||
defer dbClose()
|
||||
|
||||
s := storage.DefaultStorage(db, logger)
|
||||
s := storage.DefaultStorage(storage.LoggingSQLDriver{Driver: db, Logger: logger})
|
||||
|
||||
utrans, err := config.Trans(cliCtx)
|
||||
if err != nil {
|
||||
|
@ -252,6 +258,7 @@ func serverCommandAction(cliCtx *cli.Context) error {
|
|||
domain: domain,
|
||||
sentryConfig: sentryConfig,
|
||||
fedConfig: fedConfig,
|
||||
groupConfig: groupConfig,
|
||||
webFingerQueue: webFingerQueue,
|
||||
crawlQueue: crawlQueue,
|
||||
assetQueue: assetQueue,
|
||||
|
@ -324,7 +331,7 @@ func serverCommandAction(cliCtx *cli.Context) error {
|
|||
userActor.GET("/:name/followers", h.userActorFollowers)
|
||||
}
|
||||
|
||||
if fedConfig.EnableGroups {
|
||||
if groupConfig.EnableGroups {
|
||||
groupActor := authenticated.Group("/groups")
|
||||
{
|
||||
groupActor.GET("/:name", h.groupActorInfo)
|
||||
|
@ -354,7 +361,7 @@ func serverCommandAction(cliCtx *cli.Context) error {
|
|||
manageNetwork.POST("/reject", h.networkReject)
|
||||
}
|
||||
|
||||
if fedConfig.EnableGroups {
|
||||
if groupConfig.EnableGroups {
|
||||
manageGroups := manage.Group("/groups")
|
||||
{
|
||||
manageGroups.GET("/", h.dashboardGroups)
|
||||
|
|
|
@ -19,10 +19,13 @@ import (
|
|||
)
|
||||
|
||||
type handler struct {
|
||||
storage storage.Storage
|
||||
logger *zap.Logger
|
||||
domain string
|
||||
sentryConfig config.SentryConfig
|
||||
storage storage.Storage
|
||||
logger *zap.Logger
|
||||
domain string
|
||||
sentryConfig config.SentryConfig
|
||||
fedConfig config.FedConfig
|
||||
groupConfig config.GroupConfig
|
||||
|
||||
webFingerQueue common.StringQueue
|
||||
crawlQueue common.StringQueue
|
||||
adminUser string
|
||||
|
@ -30,7 +33,6 @@ type handler struct {
|
|||
svgConverter SVGConverter
|
||||
assetStorage asset.Storage
|
||||
assetQueue common.StringQueue
|
||||
fedConfig config.FedConfig
|
||||
|
||||
httpClient common.HTTPClient
|
||||
metricFactory promauto.Factory
|
||||
|
|
|
@ -109,7 +109,7 @@ func (h handler) groupActorInboxInvite(c *gin.Context, group storage.Group, payl
|
|||
|
||||
// If remote group followers is not enabled on the server, and the
|
||||
// invited actor is not local, then ignore the request
|
||||
if !h.fedConfig.AllowRemoteGroupFollowers && !strings.HasPrefix(invited, common.ActorURLPrefix(h.domain)) {
|
||||
if !h.groupConfig.AllowRemoteGroupFollowers && !strings.HasPrefix(invited, common.ActorURLPrefix(h.domain)) {
|
||||
c.Status(http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
@ -217,7 +217,7 @@ func (h handler) groupActorInboxFollowActor(c *gin.Context, group storage.Group,
|
|||
}
|
||||
|
||||
// If remote group followers is not enabled on the server, and the actor is not local ignore the request
|
||||
if !h.fedConfig.AllowRemoteGroupFollowers && !strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
|
||||
if !h.groupConfig.AllowRemoteGroupFollowers && !strings.HasPrefix(theActorFollowing, common.ActorURLPrefix(h.domain)) {
|
||||
c.Status(http.StatusOK)
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue