goalpost is an embeddable, durable worker queue for golang
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
This repo is archived. You can view files and clone it, but cannot push or open issues/pull-requests.

301 lines
8.8 KiB

package goalpost
import (
"bytes"
"context"
"encoding/binary"
"encoding/gob"
"errors"
"fmt"
"log"
"sync"
"time"
bolt "go.etcd.io/bbolt"
)
const (
jobsBucketName = "Jobs"
completedJobsBucketName = "CompletedJobs"
)
// Queue represents a queue
type Queue struct {
//ID is a unique identifier for a Queue
ID string
//db represents a handle to a bolt db
db *bolt.DB
//notifier is a chan used to signal workers there is a job to begin working
notifier chan uint64
//workeres is a list of *Workers
workers []*Worker
//shutdownFuncs are context.CancleFuncs used to signal graceful shutdown
shutdownFuncs []context.CancelFunc
//wg is used to help gracefully shutdown workers
wg *sync.WaitGroup
//PollRate the duration to Sleep each worker before checking the queue for jobs again
//queue for jobs again.
//Default: 500 milliseconds
PollRate time.Duration
}
//Init creates a connection to the internal database and initializes the Queue type
//filepath must be a valid path to a file. It cannot be shared between instances of
//a Queue. If the file cannot be opened r/w, an error is returned.
func Init(filepath string) (*Queue, error) {
q := &Queue{ID: filepath, PollRate: time.Duration(500 * time.Millisecond)}
db, err := bolt.Open(filepath+".db", 0600, nil)
if err != nil {
log.Print(err)
return nil, err
}
q.db = db
//Create bucket for jobs if it doesn't already exist
db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists([]byte(jobsBucketName))
if err != nil {
return fmt.Errorf("create bucket: %s", err)
}
// A second bucket so we can move completed jobs to a "archived" bucket
// to prevent unecessary scanning of old jobs when resuming from previously used queue
_, err = tx.CreateBucketIfNotExists([]byte(completedJobsBucketName))
if err != nil {
return fmt.Errorf("create bucket: %s", err)
}
return nil
})
// Make notification channels
c := make(chan uint64, 1000) //TODO: channel probably isn't the best way to handle the queue buffer
q.notifier = c
q.workers = make([]*Worker, 0)
q.shutdownFuncs = make([]context.CancelFunc, 0)
var wg sync.WaitGroup
q.wg = &wg
//resume stopped jobs
err = q.resumeUnackedJobs()
if err != nil {
log.Printf("Unable to resume jobs from bucket: %s", err)
//Don't fail out, this isn't really fatal. But maybe it should be?
}
return q, nil
}
//Close attempts to gracefull shutdown all workers in a queue and shutdown the db connection
func (q *Queue) Close() error {
for _, f := range q.shutdownFuncs {
f()
}
q.wg.Wait()
q.notifier = nil
q.workers = nil
q.shutdownFuncs = nil
return q.db.Close()
}
//registerWorkerWithContext contains the main loop for all Workers.
func (q *Queue) registerWorkerWithContext(ctx context.Context, w Worker) {
q.workers = append(q.workers, &w)
q.wg.Add(1)
log.Printf("Registering worker with ID: %s", w.ID())
//The big __main loop__ for workers.
go func() {
log.Printf("Starting up new worker...")
var jobID uint64
for {
// receive a notification from the queue chan
select {
case <-ctx.Done():
log.Printf("Received signal to shutdown worker. Exiting.")
q.wg.Done()
return
case jobID = <-q.notifier:
log.Printf("Receive job ID %d", jobID)
err := q.updateJobStatus(jobID, Uack, fmt.Sprintf("Picked up by %s", w.ID()))
if err != nil {
log.Printf("Unable to update job status: %s", err)
continue
}
//If subsequent calls to updateJobStatus fail, the whole thing is probably hosed and
//it should probably do something more drastic for error handling.
job, err := q.GetJobByID(jobID)
if err != nil {
log.Printf("Error processing job: %s", err)
q.updateJobStatus(jobID, Failed, err.Error())
continue
}
// Call the worker func handling this job
err = w.DoWork(ctx, job)
if err != nil {
_, ok := err.(RecoverableWorkerError)
if ok {
//temporary error, retry
log.Printf("Received temporary error: %s. Retrying...", err.Error())
q.updateJobStatus(jobID, Nack, err.Error())
} else {
log.Printf("Permanent error received from worker: %s", err)
//permanent error, mark as failed
q.updateJobStatus(jobID, Failed, err.Error())
}
} else {
q.updateJobStatus(jobID, Ack, "Complete")
}
log.Printf("Finished processing job %d", jobID)
default:
//log.Printf("Worker: %s. No message to queue. Sleeping 500ms", w.ID())
time.Sleep(q.PollRate)
}
}
}()
}
//RegisterWorker registers a Worker to handle queued Jobs
func (q *Queue) RegisterWorker(w Worker) {
baseCtx := context.Background()
ctx, cancelFunc := context.WithCancel(baseCtx)
q.shutdownFuncs = append(q.shutdownFuncs, cancelFunc)
q.registerWorkerWithContext(ctx, w)
}
//PushBytes wraps arbitrary binary data in a job and pushes it onto the queue
func (q *Queue) PushBytes(d []byte) (uint64, error) {
job := &Job{
Status: Uack,
Data: d,
RetryCount: 0,
}
return q.PushJob(job)
}
//PushJob pushes a job to the queue and notifies workers
// Job.ID is always overwritten
func (q *Queue) PushJob(j *Job) (uint64, error) {
var jobID uint64
err := q.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(jobsBucketName))
jobID, _ = b.NextSequence()
j.ID = jobID
log.Printf("Storing job %d for processing", jobID)
err := b.Put(intToByteArray(jobID), j.Bytes())
return err
})
if err != nil {
log.Printf("Unable to push job to queue: %s", err)
return 0, err
}
q.notifier <- jobID
return jobID, nil
}
//GetJobByID returns a pointer to a Job based on the primary key identifier id
//It first checks active jobs, if it doesn't find the bucket for active jobs
//it searches in the completed jobs bucket.
func (q *Queue) GetJobByID(id uint64) (*Job, error) {
job, err := q.getJobInBucketByID(id, jobsBucketName)
if err != nil {
return nil, err
}
if job == nil {
log.Printf("Job not found in active jobs bucket, checking complete")
job, err = q.getJobInBucketByID(id, completedJobsBucketName)
}
return job, err
}
func (q *Queue) getJobInBucketByID(id uint64, bucketName string) (*Job, error) {
var job *Job
err := q.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(bucketName))
if b == nil {
return errors.New("Invalid bucket")
}
jobBytes := b.Get(intToByteArray(id))
if jobBytes == nil {
log.Printf("Unknown key: %d", id)
return nil
}
job = DecodeJob(jobBytes)
return nil
})
return job, err
}
//updateJobStatus updates the processing status of a job
func (q *Queue) updateJobStatus(id uint64, status JobStatus, message string) error {
err := q.db.Update(func(tx *bolt.Tx) error {
jobsBucket := tx.Bucket([]byte(jobsBucketName))
completedJobsBucket := tx.Bucket([]byte(completedJobsBucketName))
jobBytes := jobsBucket.Get(intToByteArray(id))
job := DecodeJob(jobBytes)
job.Status = status
job.Message = message
// Move the job to the "completed" bucket if it's failed or Acked
if status == Ack || status == Failed {
log.Printf("Job is complete or failed. Moving to completed jobs bucket")
err := completedJobsBucket.Put(intToByteArray(id), job.Bytes())
if err != nil {
log.Printf("Unable to add job %d to completedJobsBucket: ", err)
// Don't return this, it's non-fatal
}
// remove the job from the 'active' bucket
err = jobsBucket.Delete(intToByteArray(id))
if err != nil {
log.Printf("Unable to remove job from activie jobs bucket: %s", err)
}
} else if status == Nack {
job.RetryCount = job.RetryCount + 1
log.Printf("Nack: retry count: %d", job.RetryCount)
err := jobsBucket.Put(intToByteArray(id), job.Bytes())
if err != nil {
log.Printf("Unable to update job in active jobs bucket: %s", err)
return err
}
} else {
err := jobsBucket.Put(intToByteArray(id), job.Bytes())
if err != nil {
log.Printf("Unable to update job in active jobs bucket: %s", err)
return err
}
}
return nil
})
//Put this job back on the queue if it was a Nack
if status == Nack && err == nil {
q.notifier <- id
}
return err
}
//resumeUnackedJobs loops through all jobs in the main job bucket and sends
//channel notifications if any are uacked or nacked
func (q *Queue) resumeUnackedJobs() error {
return q.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(jobsBucketName))
c := b.Cursor()
j := &Job{}
for k, v := c.First(); k != nil; k, v = c.Next() {
buffer := bytes.NewBuffer(v)
decoder := gob.NewDecoder(buffer)
_, err := buffer.Write(v)
if err != nil {
log.Printf("Error writing bytes to buffer: %s", err)
return err
}
decoder.Decode(j)
if j.Status == Uack || j.Status == Nack {
log.Printf("Job %d not processed. Retrying...", j.ID)
q.notifier <- j.ID
}
}
return nil
})
}
//intToByteArray small helper function for dealing with bucket record IDs
func intToByteArray(i uint64) []byte {
bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, i)
return bs
}