本文主要介绍定时器作用,实现定时器数据结构选取,并详细介绍了跳表,红黑树,时间轮实现定时器的思路和方法定时器作用定时器在各种场景都需要用到,比如游戏的Buff实现,Redis中的过期任务,Linux中的定时任务等等。
顾名思义,定时器的主要用途是执行定时任务定时器数据结构选取定时器数据结构要求:需要快速找到到期任务,因此,应该具有有序性;其过期执行、插入(添加定时任务)和删除(取消定时任务)的频率比较高,三种操作效率必须保证
以下为各数据结构时间复杂度表现有序链表:插入O(n),删除O(1),过期expire执行O(1)最小堆:插入O(logn),删除O(logn),过期expire执行O(1)红黑树:插入O(logn),删除
O(logn),过期expire执行O(logn)哈希表+链表(时间轮):插入O(1),删除O(1),过期expire平均执行O(1)(最坏为O(n))不同开源框架定时器实现方式不一,如,libuv采用最小堆来实现,
nginx采用红黑树实现,linux内核和skynet采用时间轮算法实现等等定时器接口封装作为定时器,需要封装以下4类接口给用户使用:创建定时器:init_timer添加定时任务:add_timer取消定时任务:
cancel_timer执行到期任务:expire_timer其中执行到期任务有两种工作方式:轮询: 每隔一个时间片去查找哪些任务到期睡眠/唤醒:不停查找deadline最近任务,到期执行,否则sleep;sleep期间,任务有改变,线程会被唤醒
接下来将介绍分别用跳表、红黑树、时间轮来实现定时器跳表实现定时器跳表简介跳表是一种动态的数据结构,采用空间换时间的思想,在有序链表基础上加入多级索引,通过索引进行二分快速查找,支持快速删除、插入和查找操作(平均时间复杂度为。
O(logN),最坏为O(N)),效率可与平衡树媲美,实现比其简单下面通过一张图来简单说明跳表操作跳表的最底层即为基本的有序链表,存储所有的数据,可理解为数据层;往上则为索引层,理想状态下,上一层为下一层节点数的一半。
比如,要查找下图的数据为11的节点,从begin出发,向右走,如果下一个节点大于11则往下走,直到找到目标节点。可见,跳表要比原始链表少比较一些节点,但前提是需要花更多空间存储索引节点。

跳表实现定时器跳表查找,插入,删除(任意节点、头节点)的时间复杂度大概率趋向于O(logn)过期任务查找,只需要跟第一个节点比较,因其第一个节点即为最小节点学会吸取开源框架中优秀数据结构和代码思想,直接采用
redis中跳表结构的实现,取出所需部分,用于实现定时器如下:相关视频推荐红黑树、最小堆、时间轮、跳表多种方式实现定时器游戏服务器框架skynet源码剖析学习地址:c/c++ linux服务器开发/后台架构师。
需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg
等),免费分享

跳表数据结构跳表节点与跳表结构 #define ZSKIPLIST_MAXLEVEL 32 #define ZSKIPPLIST 0.25 typedef struct zskiplistNode zskiplistNode; typedef void (*handler_pt) (zskiplistNode * node); // 跳表节点 struct zskiplistNode { unsigned long score; handler_pt handler; struct zskiplistLevel { struct zskiplistNode **forward; }level[]; }; // 跳表结构 typedef struct zskiplist { struct zskiplistNode * header; int length; int level; }zskiplist;
跳表接口申明具体接口实现细节请移步redis源码 zskiplist *zslCreate(void); void zslFree(zskiplist *zsl); zskiplistNode *zslInsert(zskiplist *zsl, unsigned long score, handler_pt func); void zsklDeleteHead(zskiplist *zsl); void zslDelete(zskiplist *zsl, zskplistNode *zn); void zslPrint(zskiplist *zsl);。
定时器接口实现主要介绍四个接口实现:初始化定时器,添加定时任务,删除/取消定时任务,处理定时任务// test_user.c 封装给用户使用的接口 static uint32_t current_time() { uint32_t t; struct timespec ti; clock_getttime(CLOCK_MONOTONIC, &ti); t = (uint32_t)ti.tv_sec * 1000; t += ti.tv_sec / 1000000; } zskiplist *init_timer() { // 初始化定时器 return zslCreate(); } zskiplistNode *add_timer(zskiplist *zsl, uint32_t msec, handler_pt func) { // 添加定时任务 msec += current_time(); return zslInsert(zsl, msec, func); } void cancel_timer(zskiplist *zsl, zskiplistNode *zn) { // 删除/取消定时任务 zslDelete(zsl, zn); } void expire_timer(zskiplist *zsl){ // 处理定时任务 zskiplistNode *x; uint32_t now = current_time(); for (;;) { x = zslMin(zsl); // 最近节点 if (!x) break; if (x->score > now) break; // 时间未到 x->handler(x); // 执行相关定时任务 zslDeleteHead(zsl); // 执行完删除 } }
红黑树实现定时器红黑树红黑树是一种自平衡的二叉查找树,即,插入和删除操作如果破坏树的平衡时,需要重新调整达到平衡状态因此,是一种比较难的数据结构红黑树五条性质每个节点要么是红色,要么是黑色根节点是黑色每个叶子结点是黑色
每个红节点的两个子节点一定是黑色任意一节点到每个叶子节点的路径都含相同数目的黑结点弄懂红黑树如何调整树的平衡,保证满足这5条性质,是比较麻烦,需要耐心的去推导一遍,此处不展开红黑树实现定时器AVL 树平衡要求太高,维护平衡操作过多,较复杂;红黑树只需维护一个黑高度,效率较高。
红黑树查找,删除,添加时间复杂度为:O(log(n))吸取开源框架中优秀数据结构和代码思想,选用nginx中的红黑树结构红黑树数据结构红黑树节点与红黑树// rbtree.h 红黑树数据结构以及相关接口,具体接口实现同上 #ifndef _NGX_RBTREE_H_INCLUDE_ #define _NGX_RBTREE_H_INCLUDE_ typedef unsigned int ngx_rbtree_key_t; typedef unsigned int ngx_uint_t; typedef int ngx_rbtree_key_int_t; // 红黑树节点 typedef struct ngx_rbtree_node_s ngx_rbtree_node_t; struct ngx_rbtree_node_s { ngx_rbtree_key_t key; ngx_rbtree_node_t *left; ngx_rbtree_node_t *right; ngx_rbtree_node_t *parent; u_char color; // 节点颜色 u_char data; // 节点数据 }; // 插入函数指针 typedef void (*ngx_rbtree_insert_pt) (ngx_rbtree_node_t *root, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel); // 红黑树 typedef struct ngx_rbtree_s ngx_rbtree_t; struct ngx_rbtree_s { ngx_rbtree_node_t *root; ngx_rbtree_node_t *sentinel; ngx_rbtree_insert_pt insert; };
红黑树接口声明// 红黑树初始化 #define ngx_rbtree_init(tree, s, i) \ ngx_rbtree_sentinel_init(s); \ (tree)->root = s; \ (tree)->sentinel = s; \ (tree)->insert = i; // 插入操作 void ngx_rbtree_insert(ngx_rbtree_t *tree, ngx_rbtree_node_t *node); // 删除操作 void ngx_rbtree_delete(ngx_rbtree_t *tree, ngx_rbtree_node_t *node); // 插入value void ngx_rbtree_insert_value(ngx_rbtree_node_t *root, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel); // 插入timer void ngx_rbtree_insert_timer_value(ngx_rbtree_node_t *root, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel); // 获取下一个节点 ngx_rbtree_node_t *ngx_rbtree_next(ngx_rbtree_t *tree, ngx_rbtree_node_t *node); #define ngx_rbt_red(node) ((node)->color = 1) #define ngx_rbt_black(node) ((node)->color = 0) #define ngx_rbt_is_red(node) ((node)->color) #define ngx_rbt_is_black(node) (!ngx_rbt_is_red(node)) #define ngx_rbt_copy_color(n1, n2) (n1->color = n2->color) #define ngx_rbtree_sentinel_init(node) ngx_rbt_black(node) // 找到最小值,一直往左走即可 static inline ngx_rbtree_node_t * ngx_rbtree_min(ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel) { while (node->left != sentinel){ node = node->left; } return node; }
定时器接口实现// test_user.c 封装给用户使用的接口 ngx_rbtree_t timer; static ngx_rbtree_node_t sentinel; typedef struct timer_entry_s timer_entry_t; typedef void (*timer_handler_pt)(timer_entry_t *ev); struct timer_entry_s { ngx_rbtree_node_t timer; timer_handler_pt handler; }; // 初始化 int init_timer() { ngx_rbtree_init(&timer, &sentinel, ngx_rbtree_insert_timer_value); return 0; } // 添加定时任务 void add_timer(timer_entry_t *te, uint32_t msec) { msec += current_time(); te->timer.key = msec; ngx_rbtree_insert(&timer, &te->timer); } // 取消定时 void cancel_timer(timer_entry_t *te) { ngx_rbtree_delete(&timer, &te->timer); } // 执行到期任务 void expire_timer() { timer_entry_t *te; ngx_rbtree_node_t *sentinel, *root, *node; sentinel = timer.sentinel; uint32_t now = current_time(); for(;;){ root = timer.root; if (root == sentinel) break; if (node->key > now) break; te = (timer_entry_t *) ((char *) node - offsetof(timer_entry_t, timer)); te->handler(te); ngx_rbtree_delete(&timer, &te->timer); free(te); } }
以上,为红黑树和跳表实现的定时器,多线程环境下加锁粒度比较大,高并发场景下效率不高,而时间轮适合高并发场景,如下。时间轮实现定时器时间轮可以用于高效的执行大量定时任务,如下为分层时间轮示意图:

时间轮可参考时钟进行理解,秒针(Seconds wheel)转一圈,则分针(Minutes wheel)走一格,分针(Minutes wheel)转一圈,则时针(Hours wheel)走一格随着,时间的流逝,任务不断从上层流下下一层,最终到达秒针轮上,当秒针走到时执行。
如上所示,时间轮大小为8格,秒针1s转动一格,其每一格所指向的链表保存着待执行任务比如,如果当前指针指向1,要添加一个3s后执行的任务,由于1+3=4,即在第4格的链表中添加一个任务节点即可如果要添加一个。
10s后执行的任务,10+1=11,超过了秒针轮范围,因此需要对8取模11 % 8 = 3,即,会把这个任务放到分针轮上3对应的链表上,之后再从分针轮把任务丢到秒针轮上进行处理也即,**秒针轮(Seconds wheel)**即保存着最近将要执行的任务,随着时间的流逝,任务会慢慢的从上层流到秒针轮中进行执行。
优点:加锁粒度较小,只需要加一个格子即可,一个格子对应一串链表;适合高并发场景缺点:不好删除如何解决时间轮定时任务删除?通过引用计数来解决交由业务层处理,将删除标记设为true , 在函数回调中根据这个标记判断是否需要处理
这里介绍两种定时器实现方案,一种是简单实现方案,另一种是skynet较为复杂的实现时间轮实现定时器简单时间轮实现方案功能场景:由心跳包进行超时连接检测,10s未收到则断开连接一般做法:map。
每秒轮询这个结构,检测所有连接是否超时,收到心跳包,记录时间戳缺点:效率很差,每次需要检测所有连接,时间复杂度为O(n)优化:分治大法,只需检测快过期的连接, 采用hash数组+链表形式,数组大小设置成16 :
[0] + [1] + [2] + ... + [15] ,相同过期时间的放入一个数组,因此,每次只需检测最近过期的数组即可,不需要遍历所有数据结构定义以下为定时器节点,增加引用计数ref, 只有当ref
为0时删除连接class CTimerNode { public: CTimerNode(int fd) : id(fd), ref(0) {} void Offline() {this->ref = 0}; bool tryKill() { if (this->ref == 0) return true; DecRef(); if (this->ref == 0){ return true; } return false; } void IncRef() {this->ref++;} protected: void DecRef() {this->ref--;} private: int ref; int id; } // 时间轮数组大小16, (x对16取余)==(x&1111) 落到0-15之间,即落到对应的数组 const int TW_SIZE = 16; const in EXPIRE = 10; // 过期间隔 const int TW_MASK = TW_SIZE - 1; // 掩码, 用于对16取余 static size_t iReadTick = 0; // 滴答时钟 typedef list TimeList; // 数组每一个槽位对应一个list typedef TimeList::iterator TimeListIter; typedef vector TimeWheel; // 时间轮。
定时器接口/ 添加定时 void AddTimeOut(TimerWheel &tw, CTimerNode *p) { if (p) { p->IncRef(); // 找到iRealTick对应数组的idx(槽位) TimeList &le = tw[(iRealTick+EXPIRE) & TW_MASK]; le.push_back(p); // 把时间节点加入list中 } } // 延时调用 void AddTimeOutDelay(TimeWheel &tw, CTimerNode *p, size_t delay) { if (p) { p->IncRef(); TimeList &le = tw[(iRealTick + EXPIRE + delay) & TW_MASK]; le.push_back(p); } } // 时间轮移动 void TimerShift(TimeWheel &tw) { size_t tick = iRealTick; iRealTick++; TimeList &le = tw[tick & TW_MASK]; TimeListIter iter = le.begin(); for (; iter != le.end(); iter++) { CTimerNode *p = *iter; if (p && p->trySkill()){ delete p; } } le.clear(); }
Skynet定时器实现方案skynet中定时器数据结构采用时间轮方式,hash表+链表实现struct timer_node { //时间节点 struct timer_node *next; uint32_t expire; //到期滴答数 }; struct link_list { // 链表 struct timer_node head; struct timer_node *tail; }; struct timer { struct link_list near[256]; // 即将到来的定时器 struct link_list t[4][64]; // 相对较遥远的定时器 struct spinlock lock; uint32_t time; // 记录当前滴答数 uint64_t starttime; uint64_t current; uint64_t current_point; };
其中time为32位无符号整数, 记录时间片对应数组near[256] ,表示即将到来的定时任务, t[4][64],表示较为遥远的定时任务。定时器执行流程
skynet_time_wheelt[3]t[2]t[1]t[0]near26-32位20-26位14-20位8-14位0-8位[2^(8+6x3),2^(8+6x4)-1][2^(8+6x2),2^(8+6x3)-1]
[2^(8+6),2^(8+6x2)-1][2^8,2^(8+6) -1][0,2^8-1]首先检查节点的expire与time的高24位是否相等,相等则将该节点添加到expire低8位值对应数组near
的元素的链表中,不相等则进行下一步检查expire与time的高18位是否相等,相等则将该节点添加到expire低第9位到第14位对应的6位二进制值对应数组t[0]的元素的链表中,否则进行下一步检查expire
与time的高12位是否相等,相等则将该节点添加到expire低第15位到第20位对应的6位二进制值对应数组t[1]的元素的链表中,如果不相等则进行下一步检查expire与time的高6位是否相等,相等则将该节点添加到expire低第21位到第26位对应的6位二进制值对应数组
t[2]的元素的链表中,如果不相等则进行下一步将该节点添加到expire低第27位到第32位对应的6位二进制值对应数组t[3]的元素的链表中以下为具体实现,仅贴出主要接口,具体细节请参考skynet源代码。
定时器初始化// skynet_start.c // skynet 启动入口 void skynet_start(struct skynet_config * config) { ... skynet_timer_init(); ... } // skynet_timer.c void skynet_timer_init(void) { // 创建全局timer结构 TI TI = timer_create_timer(); uint32_t current = 0; systime(&TI->starttime, ¤t); TI->current = current; TI->current_point = gettime(); }
添加定时器通过skynet_server.c中的cmd_timeout调用skynet_timeout添加新的定时器// TI为全局的定时器指针 static struct timer * TI = NULL; int skynet_timeout(uint32_t handle, int time, int session) { ... struct timer_event event; event.handle = handle; // callback eveng.session = session; // 添加新的定时器节点 timer_add(TI, &event, sizeof(event), time); return session; } // 添加新的定时器节点 static void timer_add(struct timer *T, void 8arg, size_t sz, int time) { // 给timer_node指针分配空间,还需要分配timer_node + timer_event大小的空间, // 之后通过node + 1可获得timer_event数据 struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz); memcpy(node+1,arg,sz); SPIN_LOCK(T); node->expire=time+T->time; add_node(T, node); SPIN_UNLOCK(T); } // 添加到定时器链表里,如果定时器的到期滴答数跟当前比较近(near数组里 // 否则根据差值大小添加到对应的T->T[i]中 static void add_node(struct timer *T, struct timer_node *node) { ... }
驱动方式skynet启动时,会创建一个线程专门跑定时器,每帧(0.0025s)调用skynet_updatetime()// skynet_start.c static void * thread_timer(void *p) { struct monitor * m = p; skynet_initthread(THREAD_TIMER); for (;;) { skynet_updatetime(); // 调用timer_update skynet_socket_updatetime(); CHECK_ABORT wakeup(m,m->count-1); usleep(2500); // 2500微秒 = 0.0025s if (SIG) { signal_hup(); SIG = 0; } } ... }
每个定时器设置一个到期滴答数,与当前系统的滴答数(启动时为0,1滴答1滴答往后跳,1滴答==0.01s ) 比较得到差值interval;如果interval比较小(0 <= interval <= 2^8-1),表示定时器即将到来,保存在第一部分前2^8个定时器链表中;否则找到属于第二部分对用的层级中。
// skynet_timer.c void skynet_updatetime(void) { ... uint32_t diff = (uint32_t)(cp - TI->current_point); TI->current_point = cp; TI->current += diff; // diff单位为0.01s for (i = 0; i near是否位空,有就处理到期定时器 timer_shift(T); // 时间片time++,移动高24位的链表 timer_execute(T); SPIN_UNLOCK(T); } // 每帧从T->near中触发到期得定时器 static inline void timer_execute(struct timer *T) { ... } // 遍历处理定时器链表中所有的定时器 static inline void dispatch_list(struct timer_node *current) { ... } // 将高24位对应的4个6位的数组中的各个元素的链表往低位移 static void timer_shift(struct timer *T) { ... } // 将level层的idx位置的定时器链表从当前位置删除,并重新add_node static void move_list(struct timer *T, int level, int idx) { }
最小堆实现定时器最小堆实现例子:boost.asio采用二叉树,go采用四叉树, libuv具体实现略总结本文主要介绍定时器作用,实现定时器数据结构选取,并详细介绍了跳表,红黑树,时间轮实现定时器的思路和方法。