// Mirror of https://gitlab.com/ngerakines/tavern.git (Go source, 141 lines, 2.5 KiB).

package publisher
|
|
|
|
import (
|
|
"crypto/rsa"
|
|
"crypto/x509"
|
|
"encoding/pem"
|
|
"sync"
|
|
|
|
"github.com/ngerakines/tavern/errors"
|
|
)
|
|
|
|
type queue struct {
|
|
lock sync.Mutex
|
|
front *node
|
|
}
|
|
|
|
// node is one link in the queue's list. It holds the key ID and RSA private
// key recorded for a single destination, together with every payload that has
// been added for that destination.
type node struct {
	// Next is the following node, or nil at the tail of the list.
	Next *node

	// Destination is the value (*node).Add compares against to decide whether
	// a new payload belongs on this node.
	Destination string
	// PrivateKey is the RSA key parsed from the PEM given when the node was
	// created.
	PrivateKey *rsa.PrivateKey
	// KeyID is stored as given when the node was created.
	KeyID string
	// Payloads maps an activity ID to its payload string.
	Payloads map[string]string
}
|
|
|
|
// jobInfo describes a single payload taken from the queue. (*queue).Take
// produces one jobInfo per entry in a node's Payloads map, copying the node's
// Destination, PrivateKey and KeyID into each of them.
type jobInfo struct {
	Destination string          // copied from the node the job was taken from
	PrivateKey  *rsa.PrivateKey // copied from the node the job was taken from
	KeyID       string          // copied from the node the job was taken from
	ActivityID  string          // key of the entry in the node's Payloads map
	Payload     string          // value of the entry in the node's Payloads map
}
|
|
|
|
func (q *queue) Take() []jobInfo {
|
|
q.lock.Lock()
|
|
defer q.lock.Unlock()
|
|
|
|
if q.front == nil {
|
|
return nil
|
|
}
|
|
|
|
jobs := make([]jobInfo, 0)
|
|
for activityID, payload := range q.front.Payloads {
|
|
jobs = append(jobs, jobInfo{
|
|
Destination: q.front.Destination,
|
|
PrivateKey: q.front.PrivateKey,
|
|
KeyID: q.front.KeyID,
|
|
ActivityID: activityID,
|
|
Payload: payload,
|
|
})
|
|
}
|
|
|
|
q.front = q.front.Next
|
|
|
|
return jobs
|
|
}
|
|
|
|
func (q *queue) Add(destination, keyID, privateKeyPEM, activityID, payload string) error {
|
|
q.lock.Lock()
|
|
defer q.lock.Unlock()
|
|
|
|
privateKey, err := decodePrivateKey(privateKeyPEM)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if q.front != nil {
|
|
return q.front.Add(destination, keyID, privateKeyPEM, activityID, payload)
|
|
}
|
|
|
|
q.front = &node{
|
|
Next: nil,
|
|
Destination: destination,
|
|
KeyID: keyID,
|
|
PrivateKey: privateKey,
|
|
Payloads: map[string]string{
|
|
activityID: payload,
|
|
},
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *queue) prune() {
|
|
current := q.front
|
|
|
|
for current != nil {
|
|
current = current.fastForward()
|
|
current = current.Next
|
|
}
|
|
}
|
|
|
|
func (n *node) Add(destination, keyID, privateKeyPEM, activityID, payload string) error {
|
|
if n.Destination == destination {
|
|
n.Payloads[activityID] = payload
|
|
return nil
|
|
}
|
|
if n.Next != nil {
|
|
return n.Next.Add(destination, keyID, privateKeyPEM, activityID, payload)
|
|
}
|
|
|
|
privateKey, err := decodePrivateKey(privateKeyPEM)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
n.Next = &node{
|
|
Next: nil,
|
|
Destination: destination,
|
|
KeyID: keyID,
|
|
PrivateKey: privateKey,
|
|
Payloads: map[string]string{
|
|
activityID: payload,
|
|
},
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (n *node) fastForward() *node {
|
|
if n.Next == nil {
|
|
return nil
|
|
}
|
|
if len(n.Next.Payloads) > 0 {
|
|
return n.Next
|
|
}
|
|
return n.Next.fastForward()
|
|
}
|
|
|
|
// decodePrivateKey parses input as PEM and interprets the bytes of the first
// PEM block as a PKCS#1 RSA private key.
//
// It returns an "invalid RSA PEM" error when input contains no PEM block, and
// the x509 parsing error when the block does not hold a PKCS#1 key. Data
// after the first block is ignored and the block's type is not checked.
func decodePrivateKey(input string) (*rsa.PrivateKey, error) {
	// The second result of pem.Decode is the unparsed remainder of input; it
	// is not needed here.
	pemBlock, _ := pem.Decode([]byte(input))
	if pemBlock == nil {
		return nil, errors.New("invalid RSA PEM")
	}

	return x509.ParsePKCS1PrivateKey(pemBlock.Bytes)
}
|