cmd 指令&cron表达式

cmd 基础入门

Demo1

package main

import (
"fmt"
"os/exec"
)

func main() {
var (
cmd *exec.Cmd
err error
)

cmd = exec.Command("/bin/bash", "-c", "echo 1;echo2;")

// cmd = exec.Command("C:\\cygwin64\\bin\\bash.exe", "-c", "echo 1")

err = cmd.Run()

fmt.Println(err)
}

在上述代码中,执行shell交互并且得到了结果,呢么本质是怎么样的呢?

  • 任务提交到golang程序;
  • golang会向linux申请pipe资源,供数据传输;
  • golang通过fork子进程,
  • 子进程将标准输入、标准输出、错误输出写入到pipe里面。
  • 子进程基于golang代码进行逻辑控制,比如执行bash命令。
  • 子进程将结果输入到pipe中
  • 又因为pipe一段连接golang程序,所以golang可以得到pipe的输出结果。
  • bash执行完毕,子进程退出,golang回收子进程资源。

Demo2

创建一个子协程,做两秒休眠后打印信息。但在第一秒休眠后将其中断。

package main

import (
"fmt"
"os/exec"
)

func main() {
var (
cmd *exec.Cmd
output []byte
err error
)

// 生成Cmd
cmd = exec.Command("/bin/bash", "-c", "sleep 5;ls -l")

// 执行了命令, 捕获了子进程的输出( pipe )
if output, err = cmd.CombinedOutput(); err != nil {
fmt.Println(err)
return
}

// 打印子进程的输出
fmt.Println(string(output))

}

context中具有一个channel ,通过cancelFunc对该channel进行了关闭行为,select {case <- ctx.done } 监听到后,kill掉相关的子进程pid;

Demo3

在上述代码基础下,使其可以做到子进程的结果告知主进程:

package main

import (
"context"
"fmt"
"os/exec"
"time"
)

type result struct {
err error
output []byte
}

func main() {
// 执行1个cmd, 让它在一个协程里去执行, 让它执行2秒: sleep 2; echo hello;
// 1秒的时候, 我们杀死cmd

var (
ctx context.Context
cancelFunc context.CancelFunc
cmd *exec.Cmd
resultChan chan *result
res *result
)

// 创建了一个结果队列
resultChan = make(chan *result, 1000)

// context: chan byte
// cancelFunc: close(chan byte)
ctx, cancelFunc = context.WithCancel(context.TODO())

go func() {
var (
output []byte
err error
)
cmd = exec.CommandContext(ctx, "/bin/bash", "-c", "sleep 2;echo hello;")

// 执行任务, 捕获输出
output, err = cmd.CombinedOutput()

// 把任务输出结果, 传给main协程
resultChan <- &result{
err: err,
output: output,
}
}()

// 继续往下走
time.Sleep(1 * time.Second)

// 取消上下文
cancelFunc()

// 在main协程里, 等待子协程的退出,并打印任务执行结果
res = <-resultChan

// 打印任务执行结果
fmt.Printf("执行结果为,err:%s,output:%s", res.err, string(res.output))
}

Cron 表达式

*****
星期

每个*代表的意思是每X;

  • 每隔5分钟执行一次: */5 * * * * echo hello > /tmp/x.log ,具体是指每个5分钟的时刻进行一次执行。
  • 第1-5分钟执行5次 :1-5 * * * * echo hello > xxxx ,具体指每个1、2、3、4、5分钟进行一次执行。
  • 每天的10点,22点执行一次:0 10,22 * * * echo bye | tail -1

在上述的定时任务中,指代每30分钟执行。

我们可以基于这样的cron表达式,形成 分钟、小时、日期、月份的枚举。

因为每30分钟执行一次,所以分钟的枚举只有一项 是30 ;
而小时、日、月 在*指代每一的情况下,枚举中包含所有的自然值;

如当前时间是 40分钟 , 10 小时 , 27 日。

  1. 时间从大到小进行判断,比如10小时,在对应的范围内,下来进行判定分;
  2. 那么分钟不在对应的枚举范围内,小时需要进到下一位枚举,既11点,需补时间20分钟;
  3. 此时时间为 00分,11小时,27日。
  4. 那么分钟不在对应的枚举范围内,下一个枚举时间为30分钟,当所有时间都符合枚举的时候,说明cron符合。

Demo1

基于cronexpr 做单个任务调度

package main

import (
"fmt"
"github.com/gorhill/cronexpr"
"time"
)

func main() {
var (
expr *cronexpr.Expression
err error
now time.Time
nextTime time.Time
)

// linux crontab
// 秒粒度, 年配置(2018-2099)
// 哪一分钟(0-59),哪小时(0-23),哪天(1-31),哪月(1-12),星期几(0-6)

// 每隔5分钟执行1次
//if expr, err = cronexpr.Parse("*/5 * * * *"); err != nil {
// fmt.Println(err)
// return
//}
// 0, 5, 10, 15, 20 ... 55
// 0, 6, 12, 18, .. 48..

if expr, err = cronexpr.Parse("*/5 * * * * * *"); err != nil {
fmt.Println(err)
return
}

// 当前时间
now = time.Now()

// 下次调度时间
nextTime = expr.Next(now)
fmt.Println(now, nextTime)

// 等待这个定时器超时
time.AfterFunc(nextTime.Sub(now), func() {
fmt.Println("被调度了:", nextTime)
})

time.Sleep(5 * time.Second)
}

Demo2

执行多个任务

package main

import (
"fmt"
"github.com/gorhill/cronexpr"
"time"
)

// 代表一个任务
type CronJob struct {
expr *cronexpr.Expression
nextTime time.Time // expr.Next(now)
}

func main() {

var (
cronJob *CronJob
expr *cronexpr.Expression
now time.Time
scheduleTable map[string]*CronJob // key: 任务的名字
)

// 需要有1个调度协程, 它定时检查所有的Cron任务, 谁过期了就执行谁
scheduleTable = make(map[string]*CronJob)
now = time.Now()

// 1, 我们定义2个cronjob
expr = cronexpr.MustParse("*/5 * * * * * *")
cronJob = &CronJob{
expr: expr,
nextTime: expr.Next(now),
}
// 任务注册到调度表
scheduleTable["job1"] = cronJob

expr = cronexpr.MustParse("*/5 * * * * * *")
cronJob = &CronJob{
expr: expr,
nextTime: expr.Next(now),
}
// 任务注册到调度表
scheduleTable["job2"] = cronJob

// 启动一个调度协程
go func() {
var (
jobName string
cronJob *CronJob
now time.Time
)

// 定时检查一下任务调度表
for {
now = time.Now()

for jobName, cronJob = range scheduleTable {
// 判断是否过期
if cronJob.nextTime.Before(now) || cronJob.nextTime.Equal(now) {
// 启动一个协程, 执行这个任务
go func(jobName string) {
fmt.Println("执行:", jobName)
}(jobName)

// 计算下一次调度时间
cronJob.nextTime = cronJob.expr.Next(now)
fmt.Println(jobName, "下次执行时间:", cronJob.nextTime)
}
}

// 睡眠100毫秒
// time.Sleep(100 * time.Millisecond)
select {
case <-time.NewTimer(100 * time.Millisecond).C: // 将在100毫秒可读,返回

}
}
}()

time.Sleep(100 * time.Second)
}

etcd入门

etcd功能介绍

  1. 数据存储在集群中的高可用K-V存储。
  2. 允许应用实时监听存储中的K-V变化。
  3. 可以容忍单点故障,并支持网络分区

在传统的存储模型中:

  • 如果存储节点是单点存储,呢么遇到宕机,即刻不可用;
  • 如果是主从架构,当主库不可用的使用,虽然可以继续基于从库来读,单主从同步时延容忍度又是新的问题。

etcd 基于抽屉理论来解决该点,所谓的抽屉理论指:

假如我们有一个30人的班级,我将一个秘密告诉其中的16位同学,呢么随便挑选16个同学中,必然有一个是知道我秘密的同学。呢么假如班里一直会有一半以上的同学正常上课,呢么我这个秘密就能正确获取;

etcd 与 Raft 的关系

  • Raft 是强一致的集群日志同步算法

  • etcd是一个分布式KV存储

  • etcd利用raft算法在集群中同步key-value的

Raft 日志概念、异常安全

名词:

replication: 日志在leader生成,向follower赋值,达到各个节点的日志序列组中一致;
term: 任期,重新选举产生的leader, term会单调递增;
log index: 日支行在日志序列的下标;

选举:

  • raft选举leader需要半数以上的节点参与;
  • 节点commit日志最多的选举为Leader;
  • commit日志同样多,则term、index越大的允许选举为leader;

quorum(大多数) 模型

在这里插入图片描述

该模型要求集群中至少有2N+1个节点;

  • 调用者向leader写入信息后,leader并不会立马同步返回给调用者,而是会follower进行同步。
  • 同步的follower+leader至少占半数以上的时候(既大于等于N+1个节点后),leader完成本地提交,此时才会返回客户端。
  • 随后leader会异步通知follower自己完成提交操作,所以该模型也是两阶段提交。

etcd相关特性

  1. 交互协议支持GRPC,内部基于ProtoBuffer;
  2. 底层存储是按key有序排列,支持顺序遍历;
  3. 因为key有序,所以etcd天然支持按目录结构高效遍历;
  4. 支持复杂事物,提供类型if…then…else…的事务能力;
  5. 基于租约机制实现key的TTL过期;
  6. etcd 支持MVCC多版本控制(提交会在version单调递增,同key维护多个历史版本),以实现watch机制;
  7. 对于多版本控制,可以执行compact命令完成删除。

watch 工作原理

lease 租约

  1. 调用者通过sdk向etcd申请一个单位时长的租约,etcd返回该租约的id;
  2. 随后调用者带着这个租约ID,向etcd申请K-V存储;
  3. K-V存储引擎与租约建立了关联,当该租约过期的时候,便会想K-V存储引擎删除该记录;
  4. 而续租面向的仍旧是租约,需要调用者想租约申请 ‘续租’ ;

etcd 功能实践

安装:为了方便学习,本地安装单机环境;

启动日志如下:

相关ctl 指令介绍 :

基础API:

1: put
在这里插入图片描述
2:get
在这里插入图片描述
3: 根据前缀查询
在这里插入图片描述
4: watch
在这里插入图片描述
第一个ctl watch 指定模糊前缀,第二个ctl 对其进行变更;于是 ctl1 收到了变更。

相关sdk指令case :

直接使用go get github.com/coreos/etcd/clientv3下载依赖

GO 代码启动测试:

package main

import (
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)

func main() {

var (
config clientv3.Config
client *clientv3.Client
err error
)

// 客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}

// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}

client = client
}

PUT操作

//用于读写etcd中的kv对
kv = clientv3.NewKV(client)

//put
if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "helloOld"); err != nil {
fmt.Println(err)
} else {
fmt.Println("revision ", putResp.Header.Revision)
}

//put and get perv
if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "hello", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
} else {
fmt.Println("revision ", putResp.Header.Revision)
if putResp.PrevKv != nil {
fmt.Printf("prevValue : k = %s, v = %s ", string(putResp.PrevKv.Key), string(putResp.PrevKv.Value))
}
}

GET操作

if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1"); err != nil {
fmt.Println(err)
} else {
fmt.Println(getResp.Kvs)
}

kv.Get(context.TODO(), “/cron/jobs/job1” ,OpOption) ;
Opoption中可以下达查询是否要仅查个数、是否要查当前游标下几个元素等。

if getResp, err = kv.Get(context.TODO(), "/cron/jobs/",clientv3.WithPrefix()); err != nil {
fmt.Println(err)
} else {
fmt.Println(getResp.Kvs)
}

DEL操作

if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
} else {
if len(delResp.PrevKvs) != 0 {
for idx, kvPair = range delResp.PrevKvs {
fmt.Printf(" index =%d ,del key =%s del value =%s: ", idx, string(kvPair.Key), string(kvPair.Value))
}
}
}

del中,也同样支持追加Opoption行为,比如从某个Key开始,删除limit个;

租约

Demo1:创建一个简单的KV,并挂接租约

//通过客户端申请租约
lease = clientv3.NewLease(client)
//默认时间是秒
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println(err)
return
}

leaseId = leaseGrantResp.ID

kv = clientv3.NewKV(client)
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "default", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}

fmt.Println("写入成功", putResp.Header.Revision)

for {

if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
fmt.Println(err)
return
}

if getResp.Count == 0 {
fmt.Println("kv 被移除了")
} else {
fmt.Println(getResp.Kvs)
}

time.Sleep(2 * time.Second)

}

Demo2:创建一个简单的KV,并对其进行续约

package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)

func main() {

var (
config clientv3.Config
client *clientv3.Client
err error
lease clientv3.Lease
leaseId clientv3.LeaseID
putResp *clientv3.PutResponse
getResp *clientv3.GetResponse
keepResp *clientv3.LeaseKeepAliveResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
kv clientv3.KV
leaseGrantResp *clientv3.LeaseGrantResponse
)

config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}

// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}

// 申请一个lease(租约)
lease = clientv3.NewLease(client)

// 申请一个10秒的租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
fmt.Println(err)
return
}

// 拿到租约的ID
leaseId = leaseGrantResp.ID

// 5秒后会取消自动续租
// 续租了5秒,停止了续租,此时应还有10秒的生命周期,一共15秒
// ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second)
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
fmt.Println(err)
return
}

// 处理续约应答的协程
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else {
// 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()

// 用于读写etcd的键值对
kv = clientv3.NewKV(client)

// Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}

fmt.Println("写入成功:", putResp.Header.Revision)

// 定时的看一下key过期了没有
for {
if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
fmt.Println("kv过期了")
break
}
fmt.Println("还没过期:", getResp.Kvs)
time.Sleep(2 * time.Second)
}

}

keepRespChan的类型是chan of *clientv3.LeaseKeepAliveResponse,在lease.KeepAlive()的返回值中被初始化过了,keepRespChan一直会是一个地址,KeepAlive函数将会close chan。此时<-keepRespChan返回nil,基于此点判断续约是否成功

监听

package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"time"
)

func main() {

var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
getResp *clientv3.GetResponse
watchStartRevision int64
watcher clientv3.Watcher
watchRespChan <-chan clientv3.WatchResponse
watchResp clientv3.WatchResponse
)

config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}

// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}

// 用于读写etcd的键值对
kv = clientv3.NewKV(client)

// 模拟etcd中KV的变化
go func() {
for {
kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")
kv.Delete(context.TODO(), "/cron/jobs/job7")

time.Sleep(1 * time.Second)
}
}()

// 先GET到当前的值,并监听后续变化
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
fmt.Println(err)
return
}

// 现在key是存在的
if len(getResp.Kvs) != 0 {
fmt.Println("当前值:", string(getResp.Kvs[0].Value))
}

// 当前etcd集群事务ID, 单调递增的
watchStartRevision = getResp.Header.Revision

// 创建一个watcher
watcher = clientv3.NewWatcher(client)

// 启动监听
fmt.Println("从该版本向后监听:", watchStartRevision)

ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5*time.Second, func() {
cancelFunc()
})

watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))

// 处理kv变化事件
for watchResp = range watchRespChan {
for _, event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
}
}
}

}

clientv3.Op Get/Put/Del 操作

同样的, 后面也可以加WithPrefix 等其他op操作。

package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)

func main() {

var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
putOp clientv3.Op
getOp clientv3.Op
opResp clientv3.OpResponse
)

config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}

// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}

// 用于读写etcd的键值对
kv = clientv3.NewKV(client)

// 创建Op: operation
putOp = clientv3.OpPut("/cron/jobs/job8", "123123123758923")

// 执行OP
if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
fmt.Println(err)
return
}

fmt.Println("写入Revision:", opResp.Put().Header.Revision)

// 创建Op
getOp = clientv3.OpGet("/cron/jobs/job8")

// 执行OP
if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
fmt.Println(err)
return
}

// 打印
fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision) // create rev == mod rev
fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
}

乐观锁 case

基于etcd 的乐观锁 与 java+zookeeper/redis 的同步抢锁,整体思路一致,回答清楚下面的问题,剩下的基于上述的case学习,可以独立完成。(以后有时间可以试试写一个tryLock(Timeout time) 的方法试试)

F:若单次加锁时间为1S,但是作业任务超过了1秒,如何保证在接下来的时间仍旧作业的时候,资源依然独占?
A:在Java+redis 的分布式锁中, 可以通过当前作业线程创建相关子线程进行定时重置声明周期。而在etcd中可以通过向租约进行续约,来保证租约不过期;

F: 向redis中进行事务操作依赖lua编排指令成’原子‘执行 ,或者通过redission 才可以做到。呢么etcd如何保证事务呢?
A:天然支持,有蛮好用的API;

F:当任务执行完毕后,如何进行释放lock;
A:java+redis 中,往往是在finally中做del操作; etcd 中可以通过defer 函数,当函数退时进行租约进行取消;

F:这种分布式锁都会确保一个前提,若加锁节点宕机,相关的key或path要如何删除呢?
A:在redis中设置expire和etcd中通过租约可以达成同样的目的。

代码关键部分:

  1. 申请一个租约,并对做到两点。
    a:租约可进行续约,并且当前函数退出的时候,取消续约;
    b:租约可被取消,触发条件为当前函数退出的时候;
  2. 基于txn事务进行相关操作
/*
分布式乐观锁
*/
package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)

func main() {

var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
lease clientv3.Lease
leaseId clientv3.LeaseID
keepResp *clientv3.LeaseKeepAliveResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
leaseGrantResp *clientv3.LeaseGrantResponse
txnResp *clientv3.TxnResponse
)

config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}

// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}

// lease实现锁自动过期:以及续约
// op操作
// txn事务: if else then

// 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
lease = clientv3.NewLease(client)

// 申请一个5秒的租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println(err)
return
}

// 拿到租约的ID
leaseId = leaseGrantResp.ID

// 准备一个用于取消自动续租的context
ctx, cancelFunc := context.WithCancel(context.TODO())

// 确保函数退出后, 自动续租会停止
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId)

// 5秒后会取消自动续租
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
fmt.Println(err)
return
}

// 处理续约应答的协程
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()

// if 不存在key, then 设置它, else 抢锁失败
kv = clientv3.NewKV(client)

txn := kv.Txn(context.TODO())

// 定义事务

// 如果key不存在
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败

// 提交事务
if txnResp, err = txn.Commit(); err != nil {
fmt.Println(err)
return
}

// 判断是否抢到了锁
if !txnResp.Succeeded {
fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}

// 2, 处理业务
fmt.Println("处理任务")
time.Sleep(5 * time.Second)

// 3, 释放锁(取消自动续租, 释放租约)
// defer 会把租约释放掉, 关联的KV就被删除了

}

go-crontab整体架构和设计

实现目标:

实现一个分布式crontab系统。用户可以通过前端页面配置任务和cron表达式和命令来执行定时任务,相比较linux自带的crontab来说,本项目可以方便看到执行结果,且分布式部署可以避免单点问题,用户不用登陆到各个机器去配置任务,操作方便。同时用户可以通过页面查看任务执行的情况。当然,目前做的还比较简单,对任务的执行时间没有超时机制,但提供了手动的删除和强杀正在执行的任务操作。

最终效果

整体架构图

  1. 客户端请求无状态的Master集群,将任务保存到Etcd中,Master可以添加、查询任务,查询任务的执行日志
  2. 然后Etcd将任务同步到Worker集群,所有的worker都拿到全部的任务列表
  3. 通过分布式乐观锁互斥的控制多个worker争抢一个任务
  4. 然后将任务执行的日志保存在MongoDB中

系统架构

主要分为master和worker两个角色。通过etcd来作为服务发现和分布式锁的实现。MongoDB作为数据量存储日志信息,方便查询执行结果。同时也可以通过本地log日志查看模块的执行情况。
master通过跟前端交互获取用户的任务操作信息,通过与etcd交互和mongodb交互来完成建立、删除、编辑、强杀、查看健康woker节点以及查看日志等功能。
woker通过监控etcd的节点变化来执行任务的执行、强杀等操作,同时通过etcd来实现自身服务的注册功能以及吧执行结果写入MongoDB作为日志存储。

  • 利用etcd同步全量任务列表到所有的worker节点
  • 每个worker独立调度全量任务,无需和Master产生直接的RPC,避免网络故障
  • 每个worker利用分布式锁抢占,解决并发调度相同任务的问题

Master功能

  • 任务管理HTTP接口:新建、修改、查看、删除任务
  • 任务日志HTTP接口:查看任务执行历史日志
  • 任务控制HTTP接口:提供强制结束任务的接口
  • 实现web管理页面,前后端分离

任务管理

Etcd结构

/cron/jobs/任务名 -> {
name, // 任务名
command, // shell命令
cronExpr // cron表达式
}

任务日志

MongoDB结构

{
JobName string `bson:"jobName"` // 任务名
Command string `bson:"command"` // shell命令
Err string `bson:"err"` // 脚本错误
Content string `bson:"content"` // 脚本输出
TimePoint TimePoint `bson:"timePoint"` // 执行时间点
}

请求MongoDB,按任务名查看最近的执行日志

任务控制

  1. 向etcd中写入
/cron/killer/任务名 -> ""
  1. worker会监听/cron/killer/目录下的put修改操作
  2. Master将要结束的任务名put在/cron/killer/目录下,触发worker立即结束shell任务

Worker功能

任务同步

监听etcd中/cron/jobs/目录的变化,有变化就说明有添加或者修改任务

任务调度

基于cron表达式计算,触发过期任务

任务执行

协程池并发执行多任务,基于etcd分布式锁抢占

日志捕获

捕获任务执行输出,并保存到MongoDB

监听协程

  • 利用watch API,监听/cron/jobs//cron/killer/目录的变化
  • 将变化事件通过channel推送给调度协程,更新内存中的任务信息

调度协程

  • 监听任务变更event,更细内存中维护的任务列表
  • 检查cron表达式,扫描到期任务,交给执行协程运行
  • 监听任务控制event,强制中断正在执行的子进程
  • 监听任务执行result,更新内存中任务状态,投递执行日志

执行协程

  • 在etcd中抢占分布式乐观锁:/cron/lock/任务名
  • 抢占成功则通过Command类执行shell任务
  • 捕获Command输出并等待子进程结束,将执行结果投递给调度协程

日志协程

  • 监听调度协程发来的执行日志,放入一个batch中
  • 对新batch启动定时器,超时未满自动提交
  • 若batch被放满,那么就立即提交,并取消自动提交定时器

后续优化

有很多地方有待优化,比如

  • 任务执行时间的限制,可以支持配置任务执行的最大时长,超过强杀。
  • master目前虽然支持多机部署但是没有主从机制,可以实现master的选主机制,防止并发问题。只有主才能执行etcd 的”写入操作”
  • 代码结构上有一定的冗余,可以通过复用以实现精简

代码实现

Lemon-CS

参考链接

基于GO语言实现一个分布式定时任务_Lcy-CSDN博客