熟悉 ruby 的开发同学一定对 rails 以及 sidekiq 印象颇深,前者作为 ruby 语言开发的网页程序框架,提供了许多脚手架工具以及语法糖,充分提高了开发者的开发效率。而后者则作为异步任务处理系统,依赖 redis 存储任务元数据,并且按照 FIFO 的策略进行任务执行。通过 sidekiq,能够在请求处理的过程中将繁重的逻辑计算放在异步任务中,减少 IO 阻塞,提升响应速率。
而我们今天要介绍的 gocraft/work 则号称是 golang 版本的 sidekiq。与 sidekiq 类似的是,gocraft/work 也是依赖于 redis 存储任务元数据,并且也支持 schedule job 与 cron job。
快速上手
我们将用 gocraft/work 开发一个 demo,从而了解 gocraft/work 的特性。
package main
import (
"fmt"
"log"
"github.com/garyburd/redigo/redis"
"github.com/gocraft/work"
)
var redisPool = &redis.Pool{
MaxActive: 5,
MaxIdle: 5,
Wait: true,
Dial: func() (redis.Conn, error) {
return redis.Dial("tcp", ":6379")
},
}
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)
func enqueueJob(job string, payload work.Q) {
_, err := enqueuer.Enqueue(job, payload)
if err != nil {
log.Fatal(err)
}
fmt.Println("Enqueued:", job, "with Paylod:", payload)
}
func enqueueEmail() {
enqueueJob(
"send_email",
work.Q{"address": "[email protected]", "subject": "hello world", "customer_id": 4},
)
}
func enqueueS3() {
enqueueJob(
"upload_s3",
work.Q{"bucket": "my-s3-bucket"},
)
}
在 enqueue.go 文件里,我们先实例化了一个 Enqueuer.在实例化 enqueuer 的时候我们要提供 redis 的 namespace 以及连接池.我们可以通过调用 enqueuer.Enqueue 将任务入队,在将任务入队时,我们则要提供任务的名称以及元数据。
package main
import (
"fmt"
"os"
"os/signal"
"github.com/gocraft/work"
)
type Context struct {
customerID int64
}
func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
fmt.Println("Starting job:", job.Name)
return next()
}
func (c *Context) FindCustomer(job *work.Job, next work.NextMiddlewareFunc) error {
// If there's a customer_id param, set it in the context for future middleware and handlers to use.
if _, ok := job.Args["customer_id"]; ok {
c.customerID = job.ArgInt64("customer_id")
if err := job.ArgError(); err != nil {
return err
}
}
return next()
}
func (c *Context) SendEmail(job *work.Job) error {
// Extract arguments:
addr := job.ArgString("address")
subject := job.ArgString("subject")
if err := job.ArgError(); err != nil {
return err
}
// Go ahead and send the email...
fmt.Println("Send email to:", addr, "with subject:", subject, "and customer id:", c.customerID)
return nil
}
func (c *Context) Export(job *work.Job) error {
bucket := job.ArgString("bucket")
fmt.Println("Upload:", bucket)
return nil
}
func startProcess() {
// Make a new pool. Arguments:
// Context{} is a struct that will be the context for the request.
// 10 is the max concurrency
// "my_app_namespace" is the Redis namespace
// redisPool is a Redis pool
pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)
// Add middleware that will be executed for each job
pool.Middleware((*Context).Log)
pool.Middleware((*Context).FindCustomer)
// Map the name of jobs to handler functions
pool.Job("send_email", (*Context).SendEmail)
// Customize options:
pool.JobWithOptions("upload_s3", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)
// Start processing jobs
pool.Start()
// Wait for a signal to quit:
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
<-signalChan
// Stop the pool
pool.Stop()
}
在 process. go 文件里,我们声明了 middleware function。Middleware function 在每个 job 都会被执行。job work.Job 则声明了任务处理器,通过 pool.Job(“send_email”, (Context).SendEmail) 将任务处理器与对应路由注册。