tavern/publisher/queue.go

141 lines
2.5 KiB
Go

package publisher
import (
"crypto/rsa"
"crypto/x509"
"encoding/pem"
"sync"
"github.com/ngerakines/tavern/errors"
)
// queue is a singly linked list of pending deliveries, one node per
// destination. Take and Add hold lock while they read or modify the list.
type queue struct {
// lock is acquired by Take and Add for the duration of each call.
lock sync.Mutex
// front is the head of the list; nil means the queue is empty.
front *node
}
// node is one entry in the queue's linked list. It groups every pending
// payload for a single destination together with the key material that was
// supplied when the node was created.
type node struct {
// Next is the following node in the list, or nil at the tail.
Next *node
// Destination identifies the node; Add merges payloads into an existing
// node whose Destination matches.
Destination string
// PrivateKey is the RSA key parsed from the PEM given when the node was created.
PrivateKey *rsa.PrivateKey
// KeyID is the key identifier given when the node was created.
KeyID string
// Payloads maps activity ID to payload; adding the same activity ID again
// overwrites the earlier payload.
Payloads map[string]string
}
// jobInfo is a single unit of work produced by queue.Take: one payload for one
// destination, together with the key material copied from the node it came from.
type jobInfo struct {
// Destination, PrivateKey and KeyID are copied from the source node.
Destination string
PrivateKey *rsa.PrivateKey
KeyID string
// ActivityID is the key and Payload the value of one entry in the node's
// Payloads map.
ActivityID string
Payload string
}
// Take removes the node at the front of the queue and returns one jobInfo for
// each payload stored on it.
//
// It returns nil when the queue is empty. If the front node exists but holds
// no payloads, the node is still removed and an empty, non-nil slice is
// returned. The order of the returned jobs is unspecified because it follows
// map iteration order.
func (q *queue) Take() []jobInfo {
	q.lock.Lock()
	defer q.lock.Unlock()

	head := q.front
	if head == nil {
		return nil
	}
	// Detach the head up front; everything below only reads from it.
	q.front = head.Next

	// The number of jobs is known, so allocate the result once.
	jobs := make([]jobInfo, 0, len(head.Payloads))
	for activityID, payload := range head.Payloads {
		jobs = append(jobs, jobInfo{
			Destination: head.Destination,
			PrivateKey:  head.PrivateKey,
			KeyID:       head.KeyID,
			ActivityID:  activityID,
			Payload:     payload,
		})
	}
	return jobs
}
// Add queues payload under activityID for destination.
//
// privateKeyPEM is always parsed, and a parse failure is returned without
// modifying the queue. If a node for destination already exists, the payload is
// stored on it (replacing any earlier payload with the same activityID) and the
// node keeps the key and keyID it was created with. Otherwise a new node is
// appended to the end of the list.
func (q *queue) Add(destination, keyID, privateKeyPEM, activityID, payload string) error {
	// Parse exactly once, and before taking the lock: decoding does not touch
	// the list, so there is no reason to block other callers while it runs.
	privateKey, err := decodePrivateKey(privateKeyPEM)
	if err != nil {
		return err
	}

	q.lock.Lock()
	defer q.lock.Unlock()

	// link always points at the pointer that would have to change to append a
	// node: first q.front, then each node's Next. This handles the empty queue
	// and the tail with the same code.
	link := &q.front
	for *link != nil {
		if (*link).Destination == destination {
			(*link).Payloads[activityID] = payload
			return nil
		}
		link = &(*link).Next
	}

	*link = &node{
		Destination: destination,
		KeyID:       keyID,
		PrivateKey:  privateKey,
		Payloads:    map[string]string{activityID: payload},
	}
	return nil
}
// prune unlinks every node whose Payloads map is empty, including empty nodes
// at the front of the queue.
//
// prune does not acquire q.lock itself.
func (q *queue) prune() {
	// Skip over empty nodes at the head so front is either nil or non-empty.
	for q.front != nil && len(q.front.Payloads) == 0 {
		q.front = q.front.Next
	}

	// fastForward returns the next non-empty node (or nil at the end of the
	// list); relinking Next to it drops any empty nodes in between. The result
	// must be stored before advancing, otherwise nothing is removed and a nil
	// result would be dereferenced.
	for current := q.front; current != nil; current = current.Next {
		current.Next = current.fastForward()
	}
}
// Add stores payload under activityID on the first node, starting at n, whose
// Destination equals destination. In that case keyID and privateKeyPEM are not
// used and the PEM is not parsed.
//
// When no node matches, privateKeyPEM is parsed and a new node is linked after
// the last node of the list; a parse error is returned and nothing is appended.
func (n *node) Add(destination, keyID, privateKeyPEM, activityID, payload string) error {
	// Walk to either the matching node or the tail, whichever comes first.
	cursor := n
	for {
		if cursor.Destination == destination {
			cursor.Payloads[activityID] = payload
			return nil
		}
		if cursor.Next == nil {
			break
		}
		cursor = cursor.Next
	}

	// cursor is now the tail and no node matched: create a fresh one.
	parsedKey, parseErr := decodePrivateKey(privateKeyPEM)
	if parseErr != nil {
		return parseErr
	}

	appended := &node{
		Destination: destination,
		KeyID:       keyID,
		PrivateKey:  parsedKey,
		Payloads:    map[string]string{activityID: payload},
	}
	cursor.Next = appended
	return nil
}
// fastForward returns the first node after n that has at least one payload,
// or nil when every remaining node is empty or n is the last node. It does not
// modify the list.
func (n *node) fastForward() *node {
	for candidate := n.Next; candidate != nil; candidate = candidate.Next {
		if len(candidate.Payloads) > 0 {
			return candidate
		}
	}
	return nil
}
// decodePrivateKey parses the first PEM block in input as an RSA private key.
//
// The block's DER bytes are tried as PKCS#1 first and, if that fails, as
// PKCS#8 holding an RSA key. The PEM block type and any data after the first
// block are ignored.
//
// It returns an "invalid RSA PEM" error when input contains no PEM block, and
// the PKCS#1 parse error when the block is not an RSA private key in either
// encoding.
func decodePrivateKey(input string) (*rsa.PrivateKey, error) {
	block, _ := pem.Decode([]byte(input))
	if block == nil {
		return nil, errors.New("invalid RSA PEM")
	}

	key, err := x509.ParsePKCS1PrivateKey(block.Bytes)
	if err == nil {
		return key, nil
	}

	// RSA keys are also commonly serialized as PKCS#8. Accept those too, but
	// keep reporting the original PKCS#1 error for anything that is still not
	// an RSA key so existing failure behavior is unchanged.
	parsed, pkcs8Err := x509.ParsePKCS8PrivateKey(block.Bytes)
	if pkcs8Err != nil {
		return nil, err
	}
	rsaKey, ok := parsed.(*rsa.PrivateKey)
	if !ok {
		return nil, err
	}
	return rsaKey, nil
}