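You can use SchedulerX SDK for Golang to connect Golang applications to SchedulerX so that SchedulerX can schedule your implementations of the Processor interface on a regular basis. Golang runs jobs by using goroutines, which cannot be forcibly stopped. To make a job stoppable, implement the Kill method as described in the Stop jobs section.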
Job types
Standalone jobs
Write service code to implement the Processor interface.
type Processor interface {
Process(ctx *processor.JobContext) (*ProcessResult, error)
}
Sample code:
package main
import (
"fmt"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
"time"
)
// HelloWorld is a minimal standalone job processor. It carries no state.
type HelloWorld struct{}

// Compile-time assertion that *HelloWorld satisfies processor.Processor.
var _ processor.Processor = (*HelloWorld)(nil)
// Process handles one trigger of the standalone job. It prints a start
// marker, sleeps for three seconds to stand in for real work, prints an end
// marker, and returns a result whose status is InstanceStatusSucceed.
func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("[Process] Start process my task: Hello world!")

	// Simulated workload.
	const workDuration = 3 * time.Second
	time.Sleep(workDuration)

	result := new(processor.ProcessResult)
	result.SetStatus(processor.InstanceStatusSucceed)

	fmt.Println("[Process] End process my task: Hello world!")
	return result, nil
}
Broadcast jobs
Broadcast jobs work like the sharding broadcast jobs of SchedulerX for Java. The following interfaces are supported:
PreProcess: The master node runs PreProcess once before all the worker nodes run Process.
Process: Results are returned only after all worker nodes run Process.
PostProcess: After all worker nodes run Process, the master node runs PostProcess once to obtain the results of Process on all nodes.
The version of SchedulerX SDK for Golang must be 0.0.2 or later.
Sample code:
package main
import (
"fmt"
"github.com/alibaba/schedulerx-worker-go/processor"
"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
"math/rand"
"strconv"
)
// TestBroadcast is a sample broadcast job. It holds no state; its behavior is
// defined by the Process, PreProcess, and PostProcess methods.
type TestBroadcast struct{}
// Process is executed on every worker machine. It draws a random integer in
// [0, 10), prints it together with the sharding count and sharding ID taken
// from the job context, and reports the integer (as a decimal string) in a
// succeeded result.
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	picked := rand.Intn(10)
	fmt.Printf(
		"Total sharding num=%d, sharding=%d, value=%d\n",
		ctx.ShardingNum(), ctx.ShardingId(), picked,
	)

	result := new(processor.ProcessResult)
	result.SetSucceed()
	result.SetResult(strconv.Itoa(picked))
	return result, nil
}
// PreProcess is executed by only one machine (the master node, per the
// interface description in this document), once, before the worker nodes run
// Process. This sample only prints a marker line and reports no error.
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
fmt.Println("TestBroadcastJob PreProcess")
return nil
}
// PostProcess is executed by only one machine after the worker nodes have run
// Process. It prints every task result, adds up the results of the tasks
// whose status is TaskStatusSucceed, and returns that sum (as a decimal
// string) in a succeeded result.
//
// A result that cannot be parsed as an integer is reported and skipped
// instead of being silently counted as 0, so a malformed result is visible in
// the output.
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("TestBroadcastJob PostProcess")
	allTaskResults := ctx.TaskResults()
	allTaskStatuses := ctx.TaskStatuses()
	num := 0
	for key, val := range allTaskResults {
		fmt.Printf("%v:%v\n", key, val)
		// Only succeeded tasks contribute to the sum.
		if allTaskStatuses[key] != taskstatus.TaskStatusSucceed {
			continue
		}
		valInt, err := strconv.Atoi(val)
		if err != nil {
			// Process in this sample always reports an integer, so this
			// branch signals an unexpected result; surface it and move on.
			fmt.Printf("TestBroadcastJob PostProcess: skip non-numeric result %q of task %v: %v\n", val, key, err)
			continue
		}
		num += valInt
	}
	fmt.Printf("TestBroadcastJob PostProcess(), num=%d\n", num)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(num))
	return ret, nil
}
MapReduce jobs
Map jobs and MapReduce jobs that work like those of SchedulerX for Java are supported.
The version of SchedulerX SDK for Golang must be 0.0.4 or later.
Map jobs
Write service code to implement the
MapJobProcessor
interface.package main import ( "encoding/json" "errors" "fmt" "github.com/alibaba/schedulerx-worker-go/processor" "github.com/alibaba/schedulerx-worker-go/processor/jobcontext" "github.com/alibaba/schedulerx-worker-go/processor/mapjob" "strconv" "time" ) type TestMapJob struct { *mapjob.MapJobProcessor } func (mr *TestMapJob) Kill(jobCtx *jobcontext.JobContext) error { //TODO implement me panic("implement me") } // Process the MapReduce model is used to distributed scan orders for timeout confirmation func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) { var ( num = 10 err error ) taskName := jobCtx.TaskName() if jobCtx.JobParameters() != "" { num, err = strconv.Atoi(jobCtx.JobParameters()) if err != nil { return nil, err } } if mr.IsRootTask(jobCtx) { fmt.Println("start root task") var messageList []interface{} for i := 1; i <= num; i++ { var str = fmt.Sprintf("id_%d", i) messageList = append(messageList, str) } fmt.Println(messageList) return mr.Map(jobCtx, messageList, "Level1Dispatch") } else if taskName == "Level1Dispatch" { var task []byte = jobCtx.Task() var str string err = json.Unmarshal(task, &str) fmt.Printf("str=%s\n", str) time.Sleep(100 * time.Millisecond) fmt.Println("Finish Process...") if str == "id_5" { return processor.NewProcessResult( processor.WithFailed(), processor.WithResult(str), ), errors.New("test") } return processor.NewProcessResult( processor.WithSucceed(), processor.WithResult(str), ), nil } return processor.NewProcessResult(processor.WithFailed()), nil }
Register a Map job on the client.
package main import ( "github.com/alibaba/schedulerx-worker-go" "github.com/alibaba/schedulerx-worker-go/processor/mapjob" ) func main() { // This is just an example, the real configuration needs to be obtained from the platform cfg := &schedulerx.Config{ Endpoint: "acm.aliyun.com", Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af", GroupId: "xueren_sub", AppKey: "xxxxxx", } client, err := schedulerx.GetClient(cfg) if err != nil { panic(err) } task := &TestMapJob{ mapjob.NewMapJobProcessor(), } client.RegisterTask("TestMapJob", task) select {} }
MapReduce jobs
Write service code to implement the
MapReduceJobProcessor
interface.package main import ( "encoding/json" "fmt" "github.com/alibaba/schedulerx-worker-go/processor" "github.com/alibaba/schedulerx-worker-go/processor/jobcontext" "github.com/alibaba/schedulerx-worker-go/processor/mapjob" "github.com/alibaba/schedulerx-worker-go/processor/taskstatus" "strconv" "time" ) type OrderInfo struct { Id string `json:"id"` Value int `json:"value"` } func NewOrderInfo(id string, value int) *OrderInfo { return &OrderInfo{Id: id, Value: value} } type TestMapReduceJob struct { *mapjob.MapReduceJobProcessor } func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error { //TODO implement me panic("implement me") } // Process the MapReduce model is used to distributed scan orders for timeout confirmation func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) { var ( num = 1000 err error ) taskName := jobCtx.TaskName() if jobCtx.JobParameters() != "" { num, err = strconv.Atoi(jobCtx.JobParameters()) if err != nil { return nil, err } } if mr.IsRootTask(jobCtx) { fmt.Println("start root task, taskId=%d", jobCtx.TaskId()) var orderInfos []interface{} for i := 1; i <= num; i++ { orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i)) } return mr.Map(jobCtx, orderInfos, "OrderInfo") } else if taskName == "OrderInfo" { orderInfo := new(OrderInfo) if err := json.Unmarshal(jobCtx.Task(), orderInfo); err != nil { fmt.Printf("task is not OrderInfo, task=%+v\n", jobCtx.Task()) } fmt.Printf("taskId=%d, orderInfo=%+v\n", jobCtx.TaskId(), orderInfo) time.Sleep(1 * time.Millisecond) return processor.NewProcessResult( processor.WithSucceed(), processor.WithResult(strconv.Itoa(orderInfo.Value)), ), nil } return processor.NewProcessResult(processor.WithFailed()), nil } func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) { allTaskResults := jobCtx.TaskResults() allTaskStatuses := jobCtx.TaskStatuses() count := 0 fmt.Printf("reduce: all task 
count=%d\n", len(allTaskResults)) for key, val := range allTaskResults { if key == 0 { continue } if allTaskStatuses[key] == taskstatus.TaskStatusSucceed { num, err := strconv.Atoi(val) if err != nil { return nil, err } count += num } } fmt.Printf("reduce: succeed task count=%d\n", count) return processor.NewProcessResult( processor.WithSucceed(), processor.WithResult(strconv.Itoa(count)), ), nil }
Register a MapReduce job on the client.
package main import ( "github.com/alibaba/schedulerx-worker-go" "github.com/alibaba/schedulerx-worker-go/processor/mapjob" ) func main() { // This is just an example, the real configuration needs to be obtained from the platform cfg := &schedulerx.Config{ Endpoint: "acm.aliyun.com", Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af", GroupId: "xueren_sub", AppKey: "xxxxxx", } client, err := schedulerx.GetClient(cfg) if err != nil { panic(err) } task := &TestMapReduceJob{ mapjob.NewMapReduceJobProcessor(), } client.RegisterTask("TestMapReduceJob", task) select {} }
Stop jobs
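Golang processors run jobs by using goroutines, and a goroutine cannot be forcibly stopped from the outside. To make a job stoppable, implement the KillProcessor interface: its Kill method signals the running Process method, which must check for the signal and return on its own.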
The version of SDK for Golang must be 1.0.2 or later.
Sample code:
package main
import (
	"fmt"
	"sync"
	"time"

	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
)
// Compile-time assertion that *HelloWorld satisfies processor.Processor.
var _ processor.Processor = &HelloWorld{}

// HelloWorld is a sample processor whose run can be cut short through Kill.
type HelloWorld struct{}

// Stop is the stop flag: Kill sets it and Process polls it. Kill can only
// take effect while Process is still running, i.e. from another goroutine, so
// every access goes through setStop/stopRequested, which hold stopMu.
//
// The flag is package-level, so it is shared by every run of this processor
// in the process.
var (
	stopMu sync.Mutex
	Stop   = false
)

// setStop records whether a stop has been requested.
func setStop(requested bool) {
	stopMu.Lock()
	defer stopMu.Unlock()
	Stop = requested
}

// stopRequested reports whether a stop has been requested.
func stopRequested() bool {
	stopMu.Lock()
	defer stopMu.Unlock()
	return Stop
}

// Process prints "Hello<i>" up to ten times, sleeping two seconds after each
// line, and leaves the loop early once Kill has requested a stop. It then
// returns a succeeded result.
func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	// Clear a stop request left over from an earlier run; otherwise a single
	// Kill would cut short every later run as well.
	setStop(false)

	fmt.Println("[Process] Start process my task: Hello world!")
	// mock execute task
	for i := 0; i < 10; i++ {
		fmt.Printf("Hello%d\n", i)
		time.Sleep(2 * time.Second)
		if stopRequested() {
			break
		}
	}
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	fmt.Println("[Process] End process my task: Hello world!")
	return ret, nil
}

// Kill requests that the running Process stop. It only sets the stop flag;
// Process notices it at its next check, after the current two-second sleep.
func (h *HelloWorld) Kill(ctx *jobcontext.JobContext) error {
	fmt.Println("[Kill] Start kill my task: Hello world!")
	setStop(true)
	return nil
}