cmd 指令&cron表达式 cmd 基础入门 Demo1 package mainimport ( "fmt" "os/exec" ) func main () { var ( cmd *exec.Cmd err error ) cmd = exec.Command("/bin/bash" , "-c" , "echo 1;echo2;" ) err = cmd.Run() fmt.Println(err) }
在上述代码中,执行shell交互并且得到了结果,呢么本质是怎么样的呢?
任务提交到golang程序; golang会向linux申请pipe资源,供数据传输; golang通过fork子进程, 子进程将标准输入、标准输出、错误输出写入到pipe里面。 子进程基于golang代码进行逻辑控制,比如执行bash命令。 子进程将结果输入到pipe中 又因为pipe一段连接golang程序,所以golang可以得到pipe的输出结果。 bash执行完毕,子进程退出,golang回收子进程资源。
Demo2 创建一个子协程,做两秒休眠后打印信息。但在第一秒休眠后将其中断。
package mainimport ( "fmt" "os/exec" ) func main () { var ( cmd *exec.Cmd output []byte err error ) cmd = exec.Command("/bin/bash" , "-c" , "sleep 5;ls -l" ) if output, err = cmd.CombinedOutput(); err != nil { fmt.Println(err) return } fmt.Println(string (output)) }
context中具有一个channel ,通过cancelFunc对该channel进行了关闭行为,select {case <- ctx.done }
监听到后,kill掉相关的子进程pid;
Demo3 在上述代码基础下,使其可以做到子进程的结果告知主进程:
package mainimport ( "context" "fmt" "os/exec" "time" ) type result struct { err error output []byte } func main () { var ( ctx context.Context cancelFunc context.CancelFunc cmd *exec.Cmd resultChan chan *result res *result ) resultChan = make (chan *result, 1000 ) ctx, cancelFunc = context.WithCancel(context.TODO()) go func () { var ( output []byte err error ) cmd = exec.CommandContext(ctx, "/bin/bash" , "-c" , "sleep 2;echo hello;" ) output, err = cmd.CombinedOutput() resultChan <- &result{ err: err, output: output, } }() time.Sleep(1 * time.Second) cancelFunc() res = <-resultChan fmt.Printf("执行结果为,err:%s,output:%s" , res.err, string (res.output)) }
Cron 表达式 每个*
代表的意思是每X;
每隔5分钟执行一次: */5 * * * * echo hello > /tmp/x.log
,具体是指每个5分钟的时刻进行一次执行。 第1-5分钟执行5次 :1-5 * * * * echo hello > xxxx
,具体指每个1、2、3、4、5分钟进行一次执行。 每天的10点,22点执行一次:0 10,22 * * * echo bye | tail -1
在上述的定时任务中,指代每30分钟执行。
我们可以基于这样的cron表达式,形成 分钟、小时、日期、月份的枚举。
因为每30分钟执行一次,所以分钟的枚举只有一项 是30 ; 而小时、日、月 在*指代每一的情况下,枚举中包含所有的自然值;
如当前时间是 40分钟 , 10 小时 , 27 日。
时间从大到小进行判断,比如10小时,在对应的范围内,下来进行判定分; 那么分钟不在对应的枚举范围内,小时需要进到下一位枚举,既11点,需补时间20分钟; 此时时间为 00分,11小时,27日。 那么分钟不在对应的枚举范围内,下一个枚举时间为30分钟,当所有时间都符合枚举的时候,说明cron符合。 Demo1 基于cronexpr 做单个任务调度
package mainimport ( "fmt" "github.com/gorhill/cronexpr" "time" ) func main () { var ( expr *cronexpr.Expression err error now time.Time nextTime time.Time ) if expr, err = cronexpr.Parse("*/5 * * * * * *" ); err != nil { fmt.Println(err) return } now = time.Now() nextTime = expr.Next(now) fmt.Println(now, nextTime) time.AfterFunc(nextTime.Sub(now), func () { fmt.Println("被调度了:" , nextTime) }) time.Sleep(5 * time.Second) }
Demo2 执行多个任务
package mainimport ( "fmt" "github.com/gorhill/cronexpr" "time" ) type CronJob struct { expr *cronexpr.Expression nextTime time.Time } func main () { var ( cronJob *CronJob expr *cronexpr.Expression now time.Time scheduleTable map [string ]*CronJob ) scheduleTable = make (map [string ]*CronJob) now = time.Now() expr = cronexpr.MustParse("*/5 * * * * * *" ) cronJob = &CronJob{ expr: expr, nextTime: expr.Next(now), } scheduleTable["job1" ] = cronJob expr = cronexpr.MustParse("*/5 * * * * * *" ) cronJob = &CronJob{ expr: expr, nextTime: expr.Next(now), } scheduleTable["job2" ] = cronJob go func () { var ( jobName string cronJob *CronJob now time.Time ) for { now = time.Now() for jobName, cronJob = range scheduleTable { if cronJob.nextTime.Before(now) || cronJob.nextTime.Equal(now) { go func (jobName string ) { fmt.Println("执行:" , jobName) }(jobName) cronJob.nextTime = cronJob.expr.Next(now) fmt.Println(jobName, "下次执行时间:" , cronJob.nextTime) } } select { case <-time.NewTimer(100 * time.Millisecond).C: } } }() time.Sleep(100 * time.Second) }
etcd入门 etcd功能介绍 数据存储在集群中的高可用
K-V存储。 允许应用实时监听
存储中的K-V变化。 可以容忍单点故障
,并支持网络分区
。
在传统的存储模型中:
如果存储节点是单点存储,呢么遇到宕机,即刻不可用; 如果是主从架构,当主库不可用的使用,虽然可以继续基于从库来读,单主从同步时延容忍度又是新的问题。 etcd 基于抽屉理论来解决该点,所谓的抽屉理论指:
假如我们有一个30人的班级,我将一个秘密告诉其中的16位同学,呢么随便挑选16个同学中,必然有一个是知道我秘密的同学。呢么假如班里一直会有一半以上的同学正常上课,呢么我这个秘密就能正确获取;
etcd 与 Raft 的关系 Raft 日志概念、异常安全 名词:
replication
: 日志在leader生成,向follower赋值,达到各个节点的日志序列组中一致;term
: 任期,重新选举产生的leader, term会单调递增; log index
: 日支行在日志序列的下标;
选举:
raft选举leader需要半数以上的节点参与; 节点commit日志最多的选举为Leader; commit日志同样多,则term、index越大的允许选举为leader; quorum(大多数) 模型
该模型要求集群中至少有2N+1
个节点;
调用者向leader写入信息后,leader并不会立马同步返回给调用者,而是会follower进行同步。 同步的follower+leader至少占半数以上的时候(既大于等于N+1个节点后),leader完成本地提交,此时才会返回客户端。 随后leader会异步通知follower自己完成提交操作,所以该模型也是两阶段提交。 etcd相关特性 交互协议支持GRPC,内部基于ProtoBuffer; 底层存储是按key有序排列,支持顺序遍历; 因为key有序,所以etcd天然支持按目录结构高效遍历; 支持复杂事物,提供类型if…then…else…的事务能力; 基于租约机制实现key的TTL过期; etcd 支持MVCC多版本控制(提交会在version单调递增,同key维护多个历史版本),以实现watch机制; 对于多版本控制,可以执行compact命令完成删除。 watch 工作原理
lease 租约
调用者通过sdk向etcd申请一个单位时长的租约,etcd返回该租约的id; 随后调用者带着这个租约ID,向etcd申请K-V存储; K-V存储引擎与租约建立了关联,当该租约过期的时候,便会想K-V存储引擎删除该记录; 而续租面向的仍旧是租约,需要调用者想租约申请 ‘续租’ ; etcd 功能实践 安装:为了方便学习,本地安装单机环境; 启动日志如下:
相关ctl 指令介绍 :
基础API: 1: put 2:get 3: 根据前缀查询 4: watch 第一个ctl watch 指定模糊前缀,第二个ctl 对其进行变更;于是 ctl1 收到了变更。
相关sdk指令case : 直接使用go get github.com/coreos/etcd/clientv3
下载依赖
GO 代码启动测试: package mainimport ( "fmt" "github.com/coreos/etcd/clientv3" "time" ) func main () { var ( config clientv3.Config client *clientv3.Client err error ) config = clientv3.Config{ Endpoints: []string {"127.0.0.1:2379" }, DialTimeout: 5 * time.Second, } if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } client = client }
PUT操作 kv = clientv3.NewKV(client) if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1" , "helloOld" ); err != nil { fmt.Println(err) } else { fmt.Println("revision " , putResp.Header.Revision) } if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1" , "hello" , clientv3.WithPrevKV()); err != nil { fmt.Println(err) } else { fmt.Println("revision " , putResp.Header.Revision) if putResp.PrevKv != nil { fmt.Printf("prevValue : k = %s, v = %s " , string (putResp.PrevKv.Key), string (putResp.PrevKv.Value)) } }
GET操作 if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1" ); err != nil { fmt.Println(err) } else { fmt.Println(getResp.Kvs) }
kv.Get(context.TODO(), “/cron/jobs/job1” ,OpOption) ;
Opoption中可以下达查询是否要仅查个数、是否要查当前游标下几个元素等。
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/" ,clientv3.WithPrefix()); err != nil { fmt.Println(err) } else { fmt.Println(getResp.Kvs) }
DEL操作 if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1" , clientv3.WithPrevKV()); err != nil { fmt.Println(err) } else { if len (delResp.PrevKvs) != 0 { for idx, kvPair = range delResp.PrevKvs { fmt.Printf(" index =%d ,del key =%s del value =%s: " , idx, string (kvPair.Key), string (kvPair.Value)) } } }
del中,也同样支持追加Opoption行为,比如从某个Key开始,删除limit个;
租约 Demo1:创建一个简单的KV,并挂接租约
lease = clientv3.NewLease(client) if leaseGrantResp, err = lease.Grant(context.TODO(), 5 ); err != nil { fmt.Println(err) return } leaseId = leaseGrantResp.ID kv = clientv3.NewKV(client) if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1" , "default" , clientv3.WithLease(leaseId)); err != nil { fmt.Println(err) return } fmt.Println("写入成功" , putResp.Header.Revision) for { if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1" ); err != nil { fmt.Println(err) return } if getResp.Count == 0 { fmt.Println("kv 被移除了" ) } else { fmt.Println(getResp.Kvs) } time.Sleep(2 * time.Second) }
Demo2:创建一个简单的KV,并对其进行续约
package mainimport ( "context" "fmt" "github.com/coreos/etcd/clientv3" "time" ) func main () { var ( config clientv3.Config client *clientv3.Client err error lease clientv3.Lease leaseId clientv3.LeaseID putResp *clientv3.PutResponse getResp *clientv3.GetResponse keepResp *clientv3.LeaseKeepAliveResponse keepRespChan <-chan *clientv3.LeaseKeepAliveResponse kv clientv3.KV leaseGrantResp *clientv3.LeaseGrantResponse ) config = clientv3.Config{ Endpoints: []string {"127.0.0.1:2379" }, DialTimeout: 5 * time.Second, } if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } lease = clientv3.NewLease(client) if leaseGrantResp, err = lease.Grant(context.TODO(), 10 ); err != nil { fmt.Println(err) return } leaseId = leaseGrantResp.ID if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil { fmt.Println(err) return } go func () { for { select { case keepResp = <-keepRespChan: if keepRespChan == nil { fmt.Println("租约已经失效了" ) goto END } else { fmt.Println("收到自动续租应答:" , keepResp.ID) } } } END: }() kv = clientv3.NewKV(client) if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1" , "" , clientv3.WithLease(leaseId)); err != nil { fmt.Println(err) return } fmt.Println("写入成功:" , putResp.Header.Revision) for { if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1" ); err != nil { fmt.Println(err) return } if getResp.Count == 0 { fmt.Println("kv过期了" ) break } fmt.Println("还没过期:" , getResp.Kvs) time.Sleep(2 * time.Second) } }
keepRespChan的类型是chan of *clientv3.LeaseKeepAliveResponse
,在lease.KeepAlive()的返回值中被初始化过了,keepRespChan一直会是一个地址,KeepAlive函数将会close chan。此时<-keepRespChan返回nil,基于此点判断续约是否成功
监听 package mainimport ( "context" "fmt" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" "time" ) func main () { var ( config clientv3.Config client *clientv3.Client err error kv clientv3.KV getResp *clientv3.GetResponse watchStartRevision int64 watcher clientv3.Watcher watchRespChan <-chan clientv3.WatchResponse watchResp clientv3.WatchResponse ) config = clientv3.Config{ Endpoints: []string {"127.0.0.1:2379" }, DialTimeout: 5 * time.Second, } if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } kv = clientv3.NewKV(client) go func () { for { kv.Put(context.TODO(), "/cron/jobs/job7" , "i am job7" ) kv.Delete(context.TODO(), "/cron/jobs/job7" ) time.Sleep(1 * time.Second) } }() if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7" ); err != nil { fmt.Println(err) return } if len (getResp.Kvs) != 0 { fmt.Println("当前值:" , string (getResp.Kvs[0 ].Value)) } watchStartRevision = getResp.Header.Revision watcher = clientv3.NewWatcher(client) fmt.Println("从该版本向后监听:" , watchStartRevision) ctx, cancelFunc := context.WithCancel(context.TODO()) time.AfterFunc(5 *time.Second, func () { cancelFunc() }) watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7" , clientv3.WithRev(watchStartRevision)) for watchResp = range watchRespChan { for _, event := range watchResp.Events { switch event.Type { case mvccpb.PUT: fmt.Println("修改为:" , string (event.Kv.Value), "Revision:" , event.Kv.CreateRevision, event.Kv.ModRevision) case mvccpb.DELETE: fmt.Println("删除了" , "Revision:" , event.Kv.ModRevision) } } } }
clientv3.Op Get/Put/Del 操作 同样的, 后面也可以加WithPrefix 等其他op操作。
package mainimport ( "context" "fmt" "github.com/coreos/etcd/clientv3" "time" ) func main () { var ( config clientv3.Config client *clientv3.Client err error kv clientv3.KV putOp clientv3.Op getOp clientv3.Op opResp clientv3.OpResponse ) config = clientv3.Config{ Endpoints: []string {"127.0.0.1:2379" }, DialTimeout: 5 * time.Second, } if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } kv = clientv3.NewKV(client) putOp = clientv3.OpPut("/cron/jobs/job8" , "123123123758923" ) if opResp, err = kv.Do(context.TODO(), putOp); err != nil { fmt.Println(err) return } fmt.Println("写入Revision:" , opResp.Put().Header.Revision) getOp = clientv3.OpGet("/cron/jobs/job8" ) if opResp, err = kv.Do(context.TODO(), getOp); err != nil { fmt.Println(err) return } fmt.Println("数据Revision:" , opResp.Get().Kvs[0 ].ModRevision) fmt.Println("数据value:" , string (opResp.Get().Kvs[0 ].Value)) }
乐观锁 case 基于etcd 的乐观锁 与 java+zookeeper/redis 的同步抢锁,整体思路一致,回答清楚下面的问题,剩下的基于上述的case学习,可以独立完成。(以后有时间可以试试写一个tryLock(Timeout time) 的方法试试)
F :若单次加锁时间为1S,但是作业任务超过了1秒,如何保证在接下来的时间仍旧作业的时候,资源依然独占?A :在Java+redis 的分布式锁中, 可以通过当前作业线程创建相关子线程进行定时重置声明周期。而在etcd中可以通过向租约进行续约,来保证租约不过期;
F : 向redis中进行事务操作依赖lua编排指令成’原子‘执行 ,或者通过redission 才可以做到。呢么etcd如何保证事务呢?A :天然支持,有蛮好用的API;
F :当任务执行完毕后,如何进行释放lock;A :java+redis 中,往往是在finally中做del操作; etcd 中可以通过defer 函数,当函数退时进行租约进行取消;
F :这种分布式锁都会确保一个前提,若加锁节点宕机,相关的key或path要如何删除呢?A :在redis中设置expire和etcd中通过租约可以达成同样的目的。
代码关键部分:
申请一个租约,并对做到两点。 a:租约可进行续约,并且当前函数退出的时候,取消续约; b:租约可被取消,触发条件为当前函数退出的时候; 基于txn事务进行相关操作 package mainimport ( "context" "fmt" "github.com/coreos/etcd/clientv3" "time" ) func main () { var ( config clientv3.Config client *clientv3.Client err error kv clientv3.KV lease clientv3.Lease leaseId clientv3.LeaseID keepResp *clientv3.LeaseKeepAliveResponse keepRespChan <-chan *clientv3.LeaseKeepAliveResponse leaseGrantResp *clientv3.LeaseGrantResponse txnResp *clientv3.TxnResponse ) config = clientv3.Config{ Endpoints: []string {"127.0.0.1:2379" }, DialTimeout: 5 * time.Second, } if client, err = clientv3.New(config); err != nil { fmt.Println(err) return } lease = clientv3.NewLease(client) if leaseGrantResp, err = lease.Grant(context.TODO(), 5 ); err != nil { fmt.Println(err) return } leaseId = leaseGrantResp.ID ctx, cancelFunc := context.WithCancel(context.TODO()) defer cancelFunc() defer lease.Revoke(context.TODO(), leaseId) if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil { fmt.Println(err) return } go func () { for { select { case keepResp = <-keepRespChan: if keepRespChan == nil { fmt.Println("租约已经失效了" ) goto END } else { fmt.Println("收到自动续租应答:" , keepResp.ID) } } } END: }() kv = clientv3.NewKV(client) txn := kv.Txn(context.TODO()) txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9" ), "=" , 0 )). Then(clientv3.OpPut("/cron/lock/job9" , "xxx" , clientv3.WithLease(leaseId))). Else(clientv3.OpGet("/cron/lock/job9" )) if txnResp, err = txn.Commit(); err != nil { fmt.Println(err) return } if !txnResp.Succeeded { fmt.Println("锁被占用:" , string (txnResp.Responses[0 ].GetResponseRange().Kvs[0 ].Value)) return } fmt.Println("处理任务" ) time.Sleep(5 * time.Second) }
go-crontab整体架构和设计 实现目标: 实现一个分布式crontab系统。用户可以通过前端页面配置任务和cron表达式和命令来执行定时任务,相比较linux自带的crontab来说,本项目可以方便看到执行结果,且分布式部署可以避免单点问题,用户不用登陆到各个机器去配置任务,操作方便。同时用户可以通过页面查看任务执行的情况。当然,目前做的还比较简单,对任务的执行时间没有超时机制,但提供了手动的删除和强杀正在执行的任务操作。
最终效果
整体架构图 客户端请求无状态的Master集群,将任务保存到Etcd中,Master可以添加、查询任务,查询任务的执行日志 然后Etcd将任务同步到Worker集群,所有的worker都拿到全部的任务列表 通过分布式乐观锁互斥的控制多个worker争抢一个任务 然后将任务执行的日志保存在MongoDB中 系统架构 主要分为master和worker两个角色。通过etcd来作为服务发现和分布式锁的实现。MongoDB作为数据量存储日志信息,方便查询执行结果。同时也可以通过本地log日志查看模块的执行情况。 master通过跟前端交互获取用户的任务操作信息,通过与etcd交互和mongodb交互来完成建立、删除、编辑、强杀、查看健康woker节点以及查看日志等功能。 woker通过监控etcd的节点变化来执行任务的执行、强杀等操作,同时通过etcd来实现自身服务的注册功能以及吧执行结果写入MongoDB作为日志存储。
利用etcd同步全量任务列表到所有的worker节点 每个worker独立调度全量任务,无需和Master产生直接的RPC,避免网络故障 每个worker利用分布式锁抢占,解决并发调度相同任务的问题
Master功能 任务管理HTTP接口:新建、修改、查看、删除任务 任务日志HTTP接口:查看任务执行历史日志 任务控制HTTP接口:提供强制结束任务的接口 实现web管理页面,前后端分离
任务管理 Etcd结构
/cron/jobs/任务名 -> { name, command, cronExpr }
任务日志 MongoDB结构
{ JobName string `bson:"jobName" ` Command string `bson:"command" ` Err string `bson:"err" ` Content string `bson:"content" ` TimePoint TimePoint `bson:"timePoint" ` }
请求MongoDB,按任务名查看最近的执行日志
任务控制 向etcd中写入 worker会监听/cron/killer/
目录下的put修改操作 Master将要结束的任务名put在/cron/killer/
目录下,触发worker立即结束shell任务 Worker功能 任务同步 监听etcd中/cron/jobs/
目录的变化,有变化就说明有添加或者修改任务
任务调度 基于cron表达式计算,触发过期任务
任务执行 协程池并发执行多任务,基于etcd分布式锁抢占
日志捕获 捕获任务执行输出,并保存到MongoDB
监听协程 利用watch API,监听/cron/jobs/
和/cron/killer/
目录的变化 将变化事件通过channel推送给调度协程,更新内存中的任务信息 调度协程 监听任务变更event,更细内存中维护的任务列表 检查cron表达式,扫描到期任务,交给执行协程运行 监听任务控制event,强制中断正在执行的子进程 监听任务执行result,更新内存中任务状态,投递执行日志 执行协程 在etcd中抢占分布式乐观锁:/cron/lock/任务名
抢占成功则通过Command类执行shell任务 捕获Command输出并等待子进程结束,将执行结果投递给调度协程 日志协程 监听调度协程发来的执行日志,放入一个batch中 对新batch启动定时器,超时未满自动提交 若batch被放满,那么就立即提交,并取消自动提交定时器 后续优化 有很多地方有待优化,比如
任务执行时间的限制,可以支持配置任务执行的最大时长,超过强杀。 master目前虽然支持多机部署但是没有主从机制,可以实现master的选主机制,防止并发问题。只有主才能执行etcd 的”写入操作” 代码结构上有一定的冗余,可以通过复用以实现精简 代码实现 Lemon-CS
参考链接
基于GO语言实现一个分布式定时任务_Lcy-CSDN博客