博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于环状队列和迭代器实现分布式任务RR分配策略
阅读量:6163 次
发布时间:2019-06-21

本文共 7356 字,大约阅读时间需要 24 分钟。

hot3.png

背景

分布式任务分配

在很多运维场景下,我们都会执行一些长时间的任务,比如装机、部署环境、打包镜像等长时间任务, 而通常我们的任务节点数量通常是有限的(排除基于k8s的hpa、或者knative等自动伸缩场景)。

那么当我们有一个任务如何根据当前的worker和corrdinator和任务来进行合理的分配,分配其实也比较复杂,往复杂里面做,可以根据当前系统的负载、每个任务的执行资源消耗、当前集群的任务数量等, 这里我们就搞一个最简单的,基于任务和当前worker的RR算法

系统架构

在worker和任务队列之间,添加一层协调调度层Coordinator, 由它来根据当前集群任务的状态来进行任务的分配,同时感知当前集群worker和task的状态,协调整个集群任务的执行、终止等操作

单机实现

整体设计

members: 表示当前集群中所有的worker tasks: 就是当前的任务 Coordinator: 就是我们的协调者, 负责根据members和tasks进行任务的分配 result: 就是分配的结果

CircularIterator

CircularIterator就是我们的环状对立迭代器, 拥有两个方法, 一个是add添加member, 一个Next返回基于rr的下一个member

// CircularIterator 环状迭代器type CircularIterator struct {	list []interface{}    // 保存所有的成员变量	next int}// Next 返回下一个元素func (c *CircularIterator) Next() interface{} {	item := c.list[c.next]	c.next = (c.next + 1) % len(c.list)	return item}// Add 添加任务func (c *CircularIterator) Add(v interface{}) bool {	for _, item := range c.list {		if v == item {			return false		}	}	c.list = append(c.list, v)	return true}

Member&Task

Member就是负责执行任务的worker, 有一个AddTask方法和Execute方法负责任务的执行和添加任务 Task标识一个任务

// Member 任务组成员type Member struct {	id    int	tasks []*Task}// ID 返回当前memberIDfunc (m *Member) ID() int {	return m.id}// AddTask 为member添加任务func (m *Member) AddTask(t *Task) bool {	for _, task := range m.tasks {		if task == t {			return false		}	}	m.tasks = append(m.tasks, t)	return true}// Execute 执行任务func (m *Member) Execute() {	for _, task := range m.tasks {		fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute())	}}// Task 任务type Task struct {	name string}// Execute 执行task返回结果func (t *Task) Execute() string {	return "Task " + t.name + " run success"}

Coordinator

Coordinator是协调器,负责根据 Member和task进行集群任务的协调调度

// Task 任务type Task struct {	name string}// Execute 执行task返回结果func (t *Task) Execute() string {	return "Task " + t.name + " run success"}// Coordinator 协调者type Coordinator struct {	members []*Member	tasks   []*Task}// TaskAssignments 为member分配任务func (c *Coordinator) TaskAssignments() map[int]*Member {	taskAssignments := make(map[int]*Member)	// 构建迭代器	memberIt := c.getMemberIterator()	for _, task := range c.tasks {		member := memberIt.Next().(*Member)		_, err := taskAssignments[member.ID()]		if err == false {			taskAssignments[member.ID()] = member		}		member.AddTask(task)	}	return taskAssignments}func (c *Coordinator) getMemberIterator() *CircularIterator {	// 通过当前成员, 构造成员队列	members := make([]interface{}, len(c.members))	for index, member := range c.members {		members[index] = member	}	return NewCircularIterftor(members)}// AddMember 添加member组成员func (c *Coordinator) AddMember(m *Member) bool {	for _, member := range c.members {		if member == m {			return false		}	}	c.members = append(c.members, m)	return true}// AddTask 添加任务func (c *Coordinator) AddTask(t *Task) bool {	for _, task := range c.tasks {		if task == t {			return false		}	}	c.tasks = append(c.tasks, t)	return true}

测试

我们首先创建一堆member和task, 然后调用coordinator进行任务分配,执行任务结果

coordinator := NewCoordinator()	for i := 0; i < 10; i++ {		m := &Member{id: i}		coordinator.AddMember(m)	}	for i := 0; i < 30; i++ {		t := &Task{name: fmt.Sprintf("task %d", i)}		coordinator.AddTask(t)	}	result := coordinator.TaskAssignments()	for _, member := range result {		member.Execute()	}

结果

可以看到每个worker均匀的得到任务分配

Member 6 run task Task task 6 run successMember 6 run task Task task 16 run successMember 6 run task Task task 26 run successMember 8 run task Task task 8 run successMember 8 run task Task task 18 run successMember 8 run task Task task 28 run successMember 0 run task Task task 0 run successMember 0 run task Task task 10 run successMember 0 run task Task task 20 run successMember 3 run task Task task 3 run successMember 3 run task Task task 13 run successMember 3 run task Task task 23 run successMember 4 run task Task task 4 run successMember 4 run task Task task 14 run successMember 4 run task Task task 24 run successMember 7 run task Task task 7 run successMember 7 run task Task task 17 run successMember 7 run task Task task 27 run successMember 9 run task Task task 9 run successMember 9 run task Task task 19 run successMember 9 run task Task task 29 run successMember 1 run task Task task 1 run successMember 1 run task Task task 11 run successMember 1 run task Task task 21 run successMember 2 run task Task task 2 run successMember 2 run task Task task 12 run successMember 2 run task Task task 22 run successMember 5 run task Task task 5 run successMember 5 run task Task task 15 run successMember 5 run task Task task 25 run success

完整代码

package mainimport "fmt"// CircularIterator 环状迭代器type CircularIterator struct {	list []interface{}	next int}// Next 返回下一个元素func (c *CircularIterator) Next() interface{} {	item := c.list[c.next]	c.next = (c.next + 1) % len(c.list)	return item}// Add 添加任务func (c *CircularIterator) Add(v interface{}) bool {	for _, item := range c.list {		if v == item {			return false		}	}	c.list = append(c.list, v)	return true}// Member 任务组成员type Member struct {	id    int	tasks []*Task}// ID 返回当前memberIDfunc (m *Member) ID() int {	return m.id}// AddTask 为member添加任务func (m *Member) AddTask(t *Task) bool {	for _, task := range m.tasks {		if task == t {			return false		}	}	m.tasks = append(m.tasks, t)	return true}// Execute 执行任务func (m *Member) Execute() {	for _, task := range m.tasks {		fmt.Printf("Member %d run task %s\n", m.ID(), task.Execute())	}}// Task 任务type Task struct {	name string}// Execute 执行task返回结果func (t *Task) Execute() string {	return "Task " + t.name + " run success"}// Coordinator 协调者type Coordinator struct {	members []*Member	tasks   []*Task}// TaskAssignments 为member分配任务func (c *Coordinator) TaskAssignments() map[int]*Member {	taskAssignments := make(map[int]*Member)	// 构建迭代器	memberIt := c.getMemberIterator()	for _, task := range c.tasks {		member := memberIt.Next().(*Member)		_, err := taskAssignments[member.ID()]		if err == false {			taskAssignments[member.ID()] = member		}		member.AddTask(task)	}	return taskAssignments}func (c *Coordinator) getMemberIterator() *CircularIterator {	// 通过当前成员, 构造成员队列	members := make([]interface{}, len(c.members))	for index, member := range c.members {		members[index] = member	}	return NewCircularIterftor(members)}// AddMember 添加member组成员func (c *Coordinator) AddMember(m *Member) bool {	for _, member := range c.members {		if member == m {			return false		}	}	c.members = append(c.members, m)	return true}// AddTask 添加任务func (c *Coordinator) AddTask(t *Task) bool {	for _, task := range c.tasks {		if task == t {			return false		}	}	c.tasks = append(c.tasks, t)	return true}// NewCircularIterftor 返回迭代器func NewCircularIterftor(list []interface{}) *CircularIterator {	iterator := CircularIterator{}	for _, item := range list {		iterator.Add(item)	}	return &iterator}// NewCoordinator 返回协调器func NewCoordinator() *Coordinator {	c := Coordinator{}	return &c}func main() {	coordinator := NewCoordinator()	for i := 0; i < 10; i++ {		m := &Member{id: i}		coordinator.AddMember(m)	}	for i := 0; i < 30; i++ {		t := &Task{name: fmt.Sprintf("task %d", i)}		coordinator.AddTask(t)	}	result := coordinator.TaskAssignments()	for _, member := range result {		member.Execute()	}}

总结

任务协调是一个非常复杂的事情, 内部的任务平台,虽然实现了基于任务的组合和app化,但是任务调度分配着一块,仍然没有去做,只是简单的根据树形任务去简单的做一些分支任务的执行,未来有时间再做吧,要继续研究下一个模块了

这个调度思想来源于kafka connect的DistributedHerder里面的WorkerCoordinator,感兴趣的可以看看,未完待续

更多文章可以访问

转载于:https://my.oschina.net/u/4131034/blog/3047849

你可能感兴趣的文章
MoSQL
查看>>
Hibernate多对一外键单向关联(Annotation配置)
查看>>
《CLR via C#》读书笔记 之 方法
查看>>
设计模式:组合模式(Composite Pattern)
查看>>
ContentValues 和HashTable区别
查看>>
LogicalDOC 6.6.2 发布,文档管理系统
查看>>
给PowerShell脚本传递参数
查看>>
实战2——Hadoop的日志分析
查看>>
利用FIFO进行文件拷贝一例
查看>>
Ecshop安装过程中的的问题:cls_image::gd_version()和不支持JPEG
查看>>
resmgr:cpu quantum等待事件
查看>>
一个屌丝程序猿的人生(六十六)
查看>>
Java 编码 UTF-8
查看>>
SpringMVC实战(注解)
查看>>
关于静态属性和静态函数
查看>>
进程的基本属性:进程ID、父进程ID、进程组ID、会话和控制终端
查看>>
spring+jotm+ibatis+mysql实现JTA分布式事务
查看>>
MyBatis启动:MapperStatement创建
查看>>
调查问卷相关
查看>>
eclipse启动无响应,老是加载不了revert resources,或停留在Loading workbench状态
查看>>