Browse Source

Adds support for changing Sleep duration

Adds a time.Duration field to the Queue struct that's used for Sleeping
workers as they poll the queue. The default is the same as before,
500 milliseconds. It's optional and does not need to be changed.
master
Noah Pederson 2 years ago
parent
commit
9b441acc87
  1. 46
      queue.go

46
queue.go

@ -21,17 +21,30 @@ const (
// Queue represents a queue
type Queue struct {
ID string
db *bolt.DB
notifier chan uint64
workers []*Worker
//ID is a unique identifier for a Queue
ID string
//db represents a handle to a bolt db
db *bolt.DB
//notifier is a chan used to signal workers there is a job to begin working
notifier chan uint64
//workeres is a list of *Workers
workers []*Worker
//shutdownFuncs are context.CancleFuncs used to signal graceful shutdown
shutdownFuncs []context.CancelFunc
wg *sync.WaitGroup
//wg is used to help gracefully shutdown workers
wg *sync.WaitGroup
//PollRate the duration to Sleep each worker before checking the queue for jobs again
//queue for jobs again.
//Default: 500 milliseconds
PollRate time.Duration
}
//Init creates a connection to the internal database and initializes the Queue type
//filepath must be a valid path to a file. It cannot be shared between instances of
//a Queue. If the file cannot be opened r/w, an error is returned.
func Init(filepath string) (*Queue, error) {
q := &Queue{ID: filepath}
q := &Queue{ID: filepath, PollRate: time.Duration(500 * time.Millisecond)}
db, err := bolt.Open(filepath+".db", 0600, nil)
if err != nil {
log.Print(err)
@ -82,6 +95,7 @@ func (q *Queue) Close() error {
return q.db.Close()
}
//registerWorkerWithContext contains the main loop for all Workers.
func (q *Queue) registerWorkerWithContext(ctx context.Context, w Worker) {
q.workers = append(q.workers, &w)
q.wg.Add(1)
@ -99,17 +113,17 @@ func (q *Queue) registerWorkerWithContext(ctx context.Context, w Worker) {
return
case jobID = <-q.notifier:
log.Printf("Receive job ID %d", jobID)
err := q.UpdateJobStatus(jobID, Uack, fmt.Sprintf("Picked up by %s", w.ID()))
err := q.updateJobStatus(jobID, Uack, fmt.Sprintf("Picked up by %s", w.ID()))
if err != nil {
log.Printf("Unable to update job status: %s", err)
continue
}
//If subsequent calls to UpdateJobStatus fail, the whole thing is probably hosed and
//If subsequent calls to updateJobStatus fail, the whole thing is probably hosed and
//it should probably do something more drastic for error handling.
job, err := q.GetJobByID(jobID)
if err != nil {
log.Printf("Error processing job: %s", err)
q.UpdateJobStatus(jobID, Failed, err.Error())
q.updateJobStatus(jobID, Failed, err.Error())
continue
}
// Call the worker func handling this job
@ -119,19 +133,19 @@ func (q *Queue) registerWorkerWithContext(ctx context.Context, w Worker) {
if ok {
//temporary error, retry
log.Printf("Received temporary error: %s. Retrying...", err.Error())
q.UpdateJobStatus(jobID, Nack, err.Error())
q.updateJobStatus(jobID, Nack, err.Error())
} else {
log.Printf("Permanent error received from worker: %s", err)
//permanent error, mark as failed
q.UpdateJobStatus(jobID, Failed, err.Error())
q.updateJobStatus(jobID, Failed, err.Error())
}
} else {
q.UpdateJobStatus(jobID, Ack, "Complete")
q.updateJobStatus(jobID, Ack, "Complete")
}
log.Printf("Finished processing job %d", jobID)
default:
//log.Printf("Worker: %s. No message to queue. Sleeping 500ms", w.ID())
time.Sleep(500 * time.Millisecond)
time.Sleep(q.PollRate)
}
}
}()
@ -176,6 +190,8 @@ func (q *Queue) PushJob(j *Job) (uint64, error) {
}
//GetJobByID returns a pointer to a Job based on the primary key identifier id
//It first checks active jobs, if it doesn't find the bucket for active jobs
//it searches in the completed jobs bucket.
func (q *Queue) GetJobByID(id uint64) (*Job, error) {
job, err := q.getJobInBucketByID(id, jobsBucketName)
if err != nil {
@ -206,8 +222,8 @@ func (q *Queue) getJobInBucketByID(id uint64, bucketName string) (*Job, error) {
return job, err
}
//UpdateJobStatus updates the processing status of a job
func (q *Queue) UpdateJobStatus(id uint64, status JobStatus, message string) error {
//updateJobStatus updates the processing status of a job
func (q *Queue) updateJobStatus(id uint64, status JobStatus, message string) error {
err := q.db.Update(func(tx *bolt.Tx) error {
jobsBucket := tx.Bucket([]byte(jobsBucketName))
completedJobsBucket := tx.Bucket([]byte(completedJobsBucketName))