All Products
Search
Document Center

SchedulerX:Golang jobs

Last Updated:Sep 10, 2024

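You can use the SchedulerX SDK for Golang to connect Golang applications to SchedulerX, so that SchedulerX runs your implementations of the Processor interface on a schedule. Golang runs each job in a goroutine, and a goroutine cannot be forcibly terminated from outside. A running job can therefore be stopped only if its processor also implements the Kill method, as described in the Stop jobs section.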

Job types

Standalone jobs

Write service code to implement the Processor interface.

type Processor interface {
    Process(ctx *processor.JobContext) (*ProcessResult, error)
}

Sample code:

package main

import (
	"fmt"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"time"
)

// Compile-time check that *HelloWorld satisfies processor.Processor.
var _ processor.Processor = &HelloWorld{}

// HelloWorld is a stateless sample processor for a standalone job.
type HelloWorld struct{}

// Process simulates one unit of work: it prints a start message, sleeps for
// three seconds in place of real business logic, prints an end message, and
// reports the run as succeeded. It never returns an error.
func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	const mockDuration = 3 * time.Second

	fmt.Println("[Process] Start process my task: Hello world!")
	time.Sleep(mockDuration) // stand-in for the real task

	result := &processor.ProcessResult{}
	result.SetStatus(processor.InstanceStatusSucceed)

	fmt.Println("[Process] End process my task: Hello world!")
	return result, nil
}

Broadcast jobs

Broadcast (sharding) jobs are supported in the same way as in the SDK for Java. The following interfaces are supported:

  • PreProcess: The master node runs PreProcess once before all the worker nodes run Process.

  • Process: Results are returned only after all worker nodes run Process.

  • PostProcess: After all worker nodes run Process, the master node runs PostProcess once to obtain the results of Process on all nodes.

Note

The version of SchedulerX SDK for Golang must be 0.0.2 or later.

Sample code:

package main

import (
	"fmt"
	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
	"math/rand"
	"strconv"
)

// TestBroadcast is a stateless sample processor for a broadcast job.
type TestBroadcast struct{}

// Process is executed by every machine. It draws a random integer in [0, 10),
// prints it together with the sharding count and this machine's sharding ID,
// and returns the integer (as a decimal string) in a succeeded result.
func (t TestBroadcast) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	picked := rand.Intn(10)
	fmt.Printf("Total sharding num=%d, sharding=%d, value=%d\n", ctx.ShardingNum(), ctx.ShardingId(), picked)

	result := &processor.ProcessResult{}
	result.SetSucceed()
	result.SetResult(strconv.Itoa(picked))
	return result, nil
}

// PreProcess is executed by only one machine. This sample just prints a
// marker line and always returns nil.
func (t TestBroadcast) PreProcess(ctx *jobcontext.JobContext) error {
	const marker = "TestBroadcastJob PreProcess"
	fmt.Println(marker)
	return nil
}

// PostProcess is executed by only one machine. It prints every task result,
// sums the results of the tasks whose status is TaskStatusSucceed, and returns
// that sum (as a decimal string) in a succeeded result.
//
// A succeeded task whose result is not a valid integer is reported and left
// out of the sum instead of being silently counted as 0.
func (t TestBroadcast) PostProcess(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("TestBroadcastJob PostProcess")
	allTaskResults := ctx.TaskResults()
	allTaskStatuses := ctx.TaskStatuses()
	num := 0

	for key, val := range allTaskResults {
		fmt.Printf("%v:%v\n", key, val)
		if allTaskStatuses[key] != taskstatus.TaskStatusSucceed {
			continue
		}
		valInt, err := strconv.Atoi(val)
		if err != nil {
			// Keep aggregation best-effort, but make the bad input visible.
			fmt.Printf("TestBroadcastJob PostProcess(), skip non-numeric result %q of task %v: %v\n", val, key, err)
			continue
		}
		num += valInt
	}

	fmt.Printf("TestBroadcastJob PostProcess(), num=%d\n", num)
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	ret.SetResult(strconv.Itoa(num))
	return ret, nil
}

MapReduce jobs

Map jobs and MapReduce jobs are supported in the same way as in the SDK for Java.

Note

The version of SchedulerX SDK for Golang must be 0.0.4 or later.

Map jobs

  1. Write service code to implement the MapJobProcessor interface.

    package main
    
    import (
    	"encoding/json"
    	"errors"
    	"fmt"
    	"github.com/alibaba/schedulerx-worker-go/processor"
    	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    	"strconv"
    	"time"
    )
    
    // TestMapJob is a sample Map job processor. It embeds
    // *mapjob.MapJobProcessor; the IsRootTask and Map methods that Process
    // calls on the receiver are not defined in this sample, so they are
    // presumably promoted from that embedded type.
    type TestMapJob struct {
    	*mapjob.MapJobProcessor
    }
    
    // Kill is the hook for stopping the job. This sample has no cancellable
    // work, so it returns an error saying so instead of panicking inside the
    // worker.
    // NOTE(review): how the SDK reacts to a non-nil error from Kill is not
    // shown on this page; confirm before relying on it.
    func (mr *TestMapJob) Kill(jobCtx *jobcontext.JobContext) error {
    	// TODO: signal the running Process to stop, then return nil.
    	return errors.New("TestMapJob: kill is not implemented")
    }
    
    // Process implements a two-level Map job.
    //
    // The root task builds the string IDs "id_1" .. "id_<num>" and distributes
    // them as subtasks named "Level1Dispatch". num defaults to 10 and is
    // replaced by the job parameters when they are non-empty; parameters that
    // are not a valid integer make Process return the parse error.
    //
    // A "Level1Dispatch" subtask JSON-decodes its ID, prints it, sleeps 100 ms
    // to simulate work, and returns the ID as its result. The subtask "id_5"
    // deliberately returns a failed result plus an error to demonstrate failure
    // handling. A payload that cannot be decoded is reported as failed.
    //
    // Any other task name yields a failed result with a nil error.
    func (mr *TestMapJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	var (
    		num = 10
    		err error
    	)
    	taskName := jobCtx.TaskName()
    
    	if jobCtx.JobParameters() != "" {
    		num, err = strconv.Atoi(jobCtx.JobParameters())
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	switch {
    	case mr.IsRootTask(jobCtx):
    		fmt.Println("start root task")
    		var messageList []interface{}
    		for i := 1; i <= num; i++ {
    			messageList = append(messageList, fmt.Sprintf("id_%d", i))
    		}
    		fmt.Println(messageList)
    		return mr.Map(jobCtx, messageList, "Level1Dispatch")
    
    	case taskName == "Level1Dispatch":
    		var str string
    		// Do not continue with an empty ID when the payload is malformed.
    		if err := json.Unmarshal(jobCtx.Task(), &str); err != nil {
    			return processor.NewProcessResult(processor.WithFailed()),
    				fmt.Errorf("decode Level1Dispatch task: %w", err)
    		}
    		fmt.Printf("str=%s\n", str)
    		time.Sleep(100 * time.Millisecond)
    		fmt.Println("Finish Process...")
    		if str == "id_5" {
    			// Intentional failure used to demonstrate a failed subtask.
    			return processor.NewProcessResult(
    				processor.WithFailed(),
    				processor.WithResult(str),
    			), errors.New("test")
    		}
    		return processor.NewProcessResult(
    			processor.WithSucceed(),
    			processor.WithResult(str),
    		), nil
    	}
    
    	return processor.NewProcessResult(processor.WithFailed()), nil
    }
    
  2. Register a Map job on the client.

    package main
    
    import (
    	"github.com/alibaba/schedulerx-worker-go"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    )
    
    // main creates a SchedulerX client from a placeholder configuration,
    // registers the TestMapJob processor under the name "TestMapJob", and then
    // blocks forever to keep the process alive. It panics if the client cannot
    // be created.
    func main() {
    	// This is just an example, the real configuration needs to be obtained from the platform
    	conf := new(schedulerx.Config)
    	conf.Endpoint = "acm.aliyun.com"
    	conf.Namespace = "433d8b23-xxx-xxx-xxx-90d4d1b9a4af"
    	conf.GroupId = "xueren_sub"
    	conf.AppKey = "xxxxxx"
    
    	client, err := schedulerx.GetClient(conf)
    	if err != nil {
    		panic(err)
    	}
    
    	job := &TestMapJob{MapJobProcessor: mapjob.NewMapJobProcessor()}
    	client.RegisterTask("TestMapJob", job)
    
    	// Block the main goroutine indefinitely.
    	select {}
    }
    

MapReduce jobs

  1. Write service code to implement the MapReduceJobProcessor interface.

    package main
    
    import (
    	"encoding/json"
    	"fmt"
    	"github.com/alibaba/schedulerx-worker-go/processor"
    	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    	"github.com/alibaba/schedulerx-worker-go/processor/taskstatus"
    	"strconv"
    	"time"
    )
    
    // OrderInfo is the payload of one mapped subtask. It is serialized as
    // JSON with the keys "id" and "value".
    type OrderInfo struct {
    	Id    string `json:"id"`
    	Value int    `json:"value"`
    }
    
    // NewOrderInfo allocates an OrderInfo holding the given id and value and
    // returns a pointer to it.
    func NewOrderInfo(id string, value int) *OrderInfo {
    	info := new(OrderInfo)
    	info.Id = id
    	info.Value = value
    	return info
    }
    
    // TestMapReduceJob is a sample MapReduce job processor. It embeds
    // *mapjob.MapReduceJobProcessor; the IsRootTask and Map methods that
    // Process calls on the receiver are not defined in this sample, so they
    // are presumably promoted from that embedded type.
    type TestMapReduceJob struct {
    	*mapjob.MapReduceJobProcessor
    }
    
    // Kill is the hook for stopping the job. This sample has no cancellable
    // work, so it returns an error saying so instead of panicking inside the
    // worker.
    // NOTE(review): how the SDK reacts to a non-nil error from Kill is not
    // shown on this page; confirm before relying on it.
    func (mr *TestMapReduceJob) Kill(jobCtx *jobcontext.JobContext) error {
    	// TODO: signal the running Process to stop, then return nil.
    	return fmt.Errorf("%T: kill is not implemented", mr)
    }
    
    // Process implements the map phase of the sample MapReduce job.
    //
    // The root task creates num OrderInfo values ("id_1" .. "id_<num>", with
    // Value equal to the index) and distributes them as subtasks named
    // "OrderInfo". num defaults to 1000 and is replaced by the job parameters
    // when they are non-empty; parameters that are not a valid integer make
    // Process return the parse error.
    //
    // An "OrderInfo" subtask JSON-decodes its payload, prints it, sleeps 1 ms
    // to simulate work, and returns the order's Value (as a decimal string) in
    // a succeeded result. A payload that cannot be decoded is reported as
    // failed, so it does not contribute a bogus 0 to the reduce phase.
    //
    // Any other task name yields a failed result with a nil error.
    func (mr *TestMapReduceJob) Process(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	var (
    		num = 1000
    		err error
    	)
    	taskName := jobCtx.TaskName()
    
    	if jobCtx.JobParameters() != "" {
    		num, err = strconv.Atoi(jobCtx.JobParameters())
    		if err != nil {
    			return nil, err
    		}
    	}
    
    	switch {
    	case mr.IsRootTask(jobCtx):
    		// Printf, not Println: the message contains a %d verb.
    		fmt.Printf("start root task, taskId=%d\n", jobCtx.TaskId())
    		var orderInfos []interface{}
    		for i := 1; i <= num; i++ {
    			orderInfos = append(orderInfos, NewOrderInfo(fmt.Sprintf("id_%d", i), i))
    		}
    		return mr.Map(jobCtx, orderInfos, "OrderInfo")
    
    	case taskName == "OrderInfo":
    		orderInfo := new(OrderInfo)
    		if err := json.Unmarshal(jobCtx.Task(), orderInfo); err != nil {
    			fmt.Printf("task is not OrderInfo, task=%+v\n", jobCtx.Task())
    			return processor.NewProcessResult(processor.WithFailed()), err
    		}
    		fmt.Printf("taskId=%d, orderInfo=%+v\n", jobCtx.TaskId(), orderInfo)
    		time.Sleep(1 * time.Millisecond)
    		return processor.NewProcessResult(
    			processor.WithSucceed(),
    			processor.WithResult(strconv.Itoa(orderInfo.Value)),
    		), nil
    	}
    
    	return processor.NewProcessResult(processor.WithFailed()), nil
    }
    
    // Reduce aggregates the subtask results. It skips the entry with key 0
    // (presumably the root task; confirm against the SDK), parses the result
    // of every task whose status is TaskStatusSucceed as an integer, and
    // returns the sum (as a decimal string) in a succeeded result. A result
    // that is not a valid integer aborts the reduction with the parse error.
    func (mr *TestMapReduceJob) Reduce(jobCtx *jobcontext.JobContext) (*processor.ProcessResult, error) {
    	results := jobCtx.TaskResults()
    	statuses := jobCtx.TaskStatuses()
    	fmt.Printf("reduce: all task count=%d\n", len(results))
    
    	total := 0
    	for taskID, raw := range results {
    		if taskID == 0 || statuses[taskID] != taskstatus.TaskStatusSucceed {
    			continue
    		}
    		parsed, err := strconv.Atoi(raw)
    		if err != nil {
    			return nil, err
    		}
    		total += parsed
    	}
    
    	fmt.Printf("reduce: succeed task count=%d\n", total)
    	summary := strconv.Itoa(total)
    	return processor.NewProcessResult(processor.WithSucceed(), processor.WithResult(summary)), nil
    }
  2. Register a MapReduce job on the client.

    package main
    
    import (
    	"github.com/alibaba/schedulerx-worker-go"
    	"github.com/alibaba/schedulerx-worker-go/processor/mapjob"
    )
    
    // main creates a SchedulerX client from a placeholder configuration,
    // registers the TestMapReduceJob processor under the name
    // "TestMapReduceJob", and then blocks forever to keep the process alive.
    // It panics if the client cannot be created.
    func main() {
    	// This is just an example, the real configuration needs to be obtained from the platform
    	settings := schedulerx.Config{
    		Endpoint:  "acm.aliyun.com",
    		Namespace: "433d8b23-xxx-xxx-xxx-90d4d1b9a4af",
    		GroupId:   "xueren_sub",
    		AppKey:    "xxxxxx",
    	}
    
    	client, err := schedulerx.GetClient(&settings)
    	if err != nil {
    		panic(err)
    	}
    
    	job := new(TestMapReduceJob)
    	job.MapReduceJobProcessor = mapjob.NewMapReduceJobProcessor()
    	client.RegisterTask("TestMapReduceJob", job)
    
    	// Block the main goroutine indefinitely.
    	select {}
    }
    

Stop jobs

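Golang processors run jobs in goroutines, and a goroutine cannot be forcibly stopped from outside. To make a job stoppable, implement the KillProcessor interface: SchedulerX calls its Kill method when the job is stopped, and your Process code must check for that signal and return on its own.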

Note

The version of SDK for Golang must be 1.0.2 or later.

Sample code:

package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"github.com/alibaba/schedulerx-worker-go/processor"
	"github.com/alibaba/schedulerx-worker-go/processor/jobcontext"
)

// Compile-time check that *HelloWorld satisfies processor.Processor.
var _ processor.Processor = &HelloWorld{}

// HelloWorld is a sample processor that can be stopped cooperatively.
type HelloWorld struct{}

// Stop is the stop request flag: Kill sets it and Process polls it. It is
// atomic because Kill is called while Process is still running in its own
// goroutine, so a plain bool would be a data race.
var Stop atomic.Bool

// Process prints "Hello<i>" up to ten times, sleeping two seconds after each
// line, and ends the loop early once a stop has been requested through Kill.
// It always returns a succeeded result and a nil error.
func (h *HelloWorld) Process(ctx *jobcontext.JobContext) (*processor.ProcessResult, error) {
	fmt.Println("[Process] Start process my task: Hello world!")
	// Clear a stop request left over from an earlier run; otherwise every run
	// after the first kill would exit after a single iteration.
	// NOTE(review): the flag is package-wide, so concurrent runs of this
	// processor share it.
	Stop.Store(false)
	// mock execute task
	for i := 0; i < 10; i++ {
		fmt.Printf("Hello%d\n", i)
		time.Sleep(2 * time.Second)
		if Stop.Load() {
			break
		}
	}
	ret := new(processor.ProcessResult)
	ret.SetSucceed()
	fmt.Println("[Process] End process my task: Hello world!")
	return ret, nil
}

// Kill requests that the running Process stop. It only raises the flag;
// Process notices it after its current two-second sleep. It always returns nil.
func (h *HelloWorld) Kill(ctx *jobcontext.JobContext) error {
	fmt.Println("[Kill] Start kill my task: Hello world!")
	Stop.Store(true)
	return nil
}

References