您的位置:金沙游乐场85155 > 大数据库 > 浅析Redis分布式锁

浅析Redis分布式锁

发布时间:2020-04-15 07:02编辑:大数据库浏览(95)

    近期工作遇到需要业务场景如下,需要每天定时推送给另一系统一批数据,但是由于系统是集群部署的,会造成统一情况下任务争用的情况,所以需要增加分布式锁来保证一定时间范围内有一个Job来完成定时任务. 前期考虑的方案有采用ZooKeeper分布式任务,Quartz分布式任务调度,但是由于Zookeeper需要增加额外组件,Quartz需要增加表,并且项目中现在已经有Redis这一组件存在,所以考虑采用Redis分布式锁的情况来完成分布式任务抢占这一功能

    日常的业务系统中经常使用到redis,平时也会研究下redis的设计文档和源码,对redis的使用场景、实现方案、运维要点这些常规知识点都有所了解,但是零零碎碎总感觉不够系统,这里结合源码对自己使用redis过程中的一些经验、疑惑、思考进行归纳和总结。

    记录一下走过的弯路.

    主要有以下内容:

    第一版本:

    • Redis的线程结构,单线程结构、异步化组件、最佳线程数、性能瓶颈、阻塞场景
    • 过期和淘汰策略,令人困惑的expire、单线程中的定时器、淘汰策略和时机
    • 数据结构的选择,内部数据结构实现、hash/B+/LSM的特点和场景对比
    • 高可用和集群部署的方式
    @Overridepublic T Long set(String key,T value, Long cacheSeconds) {if (value instanceof HashMap) {BoundHashOperations valueOperations = redisTemplate.boundHashOps(key);valueOperations.putAll((Map) value);valueOperations.expire(cacheSeconds, TimeUnit.SECONDS);}else{//使用map存储BoundHashOperations valueOperations = redisTemplate.boundHashOps(key);valueOperations.put(key, value);//秒valueOperations.expire(cacheSeconds, TimeUnit.SECONDS);}return null;}@Overridepublic void del(String key) {redisTemplate.delete(key);}
    

    1、Redis的线程结构


    Redis在设计上最大的亮点是其单线程结构,并且还能提供极其强大的并发处理能力和丰富的数据结构,这点让我很激动也很是困惑的。

    激动的是redis强大的并发处理能力,以及其丰富的api接口,让日常的业务需要可以更爽的完成。更让我惊叹的是单线程的设计导致redis的代码非常的小巧,整个源码大约5w行,而且不需要处理多线程引入的并发问题,整个代码理解起来也很顺畅。

    困惑的是单线程的设计结构为什么能支持这么大的并发量,这一点和我们常规处理大并发的习惯性思维不同。一般在面对大并发的请求,首先想到的是用多个线程来处理,io线程和业务线程分开,业务线程使用线程池来避免频繁创建和销毁线程,即便是一次请求阻塞了也不会影响到其他请求。为什么redis会选择反其道而行之,这么做是否会局限redis的使用,在使用redis时有没有特别需要注意的点?

    采用set 和 del 完成锁的占用与释放,后经测试得知,set不是线程安全,在并发情况下常常会导致数据不一致.

    1.1 IO/业务单线程

    准确的来说,Redis的单线程结构是指其主线程是单线程的,这里主线程包括IO事件的处理,以及IO对应的相关请求的业务处理,此外主线程还负责过期键的处理、复制协调、集群协调等等,这些除了IO事件之外的逻辑会被封装成周期性的任务由主线程周期性的处理。

    正因为采用单线程的设计,对于客户端的所有读写请求,都由一个主线程串行地处理,因此多个客户端同时对一个键进行写操作不会有并发问题,避免了频繁的上下文切换和锁竞争。

    并且在网络上使用epoll,利用epoll的非阻塞多路复用特性,不在IO上浪费时间。

    第二版本:

    1.2 异步化组件

    但是除了客户端读写请求之外还有一些比较耗时的操作,如持久化RDB文件,持久化AOF文件等等,这些操作不能放在主线程里面处理,因此Redis会在适当的时候fork子进程来异步的处理这种任务。除了这些,Redis还有一组异步任务处理线程,用于处理不需要主线程同步处理的工作,总体上Redis的线程体系结构大致如下图:

    图片 1

    Redis线程结构

    上图中间蓝色的部分代表主线程,最左边虚线代表通过fork得到的子进程,用来处理RDB持久化以及AOF持久化等任务,最右边橙色部分代表一组异步任务处理线程,下面会详细介绍这组异步任务处理线程,即Redis异步化组件——BIO组件。

    在Redis中,异步任务处理线程组被封装在BIO组件中,源文件为bio.h和bio.c。bio异步线程启动时在main方法调用,会生成BIO_NUM_OPS(3)个线程,线程函数为bioProcessBackgroundJobs。

    void bioInit(void) {
        pthread_attr_t attr;
        pthread_t thread;
        size_t stacksize;
        int j;
    
        for (j = 0; j < BIO_NUM_OPS; j++) {
            pthread_mutex_init(&bio_mutex[j],NULL);
            pthread_cond_init(&bio_newjob_cond[j],NULL);
            pthread_cond_init(&bio_step_cond[j],NULL);
            bio_jobs[j] = listCreate();
            bio_pending[j] = 0;
        }
    
        pthread_attr_init(&attr);
        pthread_attr_getstacksize(&attr,&stacksize);
    
        if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
        while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
        pthread_attr_setstacksize(&attr, stacksize);
    
        for (j = 0; j < BIO_NUM_OPS; j++) {
            void *arg = (void*)(unsigned long) j;
            if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
                exit(1);
            }
    
            bio_threads[j] = thread;
        }
    }
    

    bioProcessBackgroundJobs的处理过程如下:

    void *bioProcessBackgroundJobs(void *arg) {
    
        pthread_mutex_lock(&bio_mutex[type]);
    
        //...
    
        while(1) {
            listNode *ln;
    
            /* Pop the job from the queue. */
            ln = listFirst(bio_jobs[type]);
            job = ln->value;
    
            pthread_mutex_unlock(&bio_mutex[type]);
    
            /* Process the job accordingly to its type. */
            if (type == BIO_CLOSE_FILE) {
                close((long)job->arg1);
            } else if (type == BIO_AOF_FSYNC) {
                aof_fsync((long)job->arg1);
            } else if (type == BIO_LAZY_FREE) {
                if (job->arg1)
                    lazyfreeFreeObjectFromBioThread(job->arg1);
                else if (job->arg2 && job->arg3)
                    lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
                else if (job->arg3)
                    lazyfreeFreeSlotsMapFromBioThread(job->arg3);
            } else {
                serverPanic("Wrong job type in bioProcessBackgroundJobs().");
            }
    
            zfree(job);
    
            //...
            pthread_mutex_lock(&bio_mutex[type]);
        }
    }
    

    BIO线程目前包括三个线程,处理三种类型的任务:

    • 文件句柄关闭任务

      文件句柄的释放(close)对于操作系统来说是一个比较重的操作,在Redis中,当需要重新创建新的文件句柄,废弃的文件句柄失效的时候,这个废弃的文件句柄将由异步任务处理线程来关闭。

    • AOF持久化任务,Redis对于AOF文件的持久化有三种策略:

      • 关闭AOF功能
      • aof_fsync_everysec策略,即每秒一次,实际上并不是一定一秒钟一次
      • aof_fsync_always策略,即每次IO事件处理完毕,都将AOF持久化

      这三种策略分别对应不同的业务场景和用户需求,默认的策略为aof_fsync_everysec,这个时候对于aof缓冲区内容持久化工作会交给异步任务处理线程来处理。

    • 内存的释放也是比较重的操作,这部分工作可以交给异步任务处理线程来处理,Redis中通过一部任务释放的空间主要包括三种:

      • 对象空间的释放,当当前内存够用的时候,主线程不会同步释放废弃对象的内存,交给异步任务处理线程来释放,当然需要配合lazyfree_lazy_server_del参数使用。
      • DB空间的异步释放,当需要删除DB的时候,Redis会申请一个新的哈希表作为新的DB,将DB内存的释放的工作交给异步任务处理线程来处理。
      • slots-leys空间释放,在Redis Cluster模式下,slots-keys是由Redis实现的跳跃表数据结构支撑的,当Redis Cluster需要刷新slots-keys的时候,首先创建一个新的跳跃表结构作为新的slots-keys,然后将老的slots-keys结构释放的工作交给异步任务处理线程来处理。
    /** * 分布式锁 * @param range 锁的长度 允许有多少个请求抢占资源 * @param key * @return */ public boolean getLock(int range, String key) { ValueOperationsString, Integer valueOper1 = template.opsForValue(); return valueOper1.increment(key, 1) = range; } /** * 初始化锁, 设置等于0 * @param key * @param expireSeconds * @return */ public void initLock(String key, Long expireSeconds) { ValueOperationsString, Integer operations = template.opsForValue(); template.setKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); operations.set(key, 0, expireSeconds * 1000); } /** * 释放锁 * @param key */ public void releaseLock(String key) { ValueOperationsString, Integer operations = template.opsForValue(); template.setKeySerializer(new GenericJackson2JsonRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); template.delete(key); }
    

    1.3 最佳线程数 —— 单线程

    这里讨论下Redis的最佳线程数。

    关于服务器性能的思考中,可以看到一个应用系统的最佳线程数的计算公式:

    图片 2

    最佳线程数

    其中Tic是指一次请求过程中cpu计算耗时,Tiw是指请求处理过程中的等待耗时,这里的等待耗时可能由io、锁导致。但是redis是纯内存数据库,没有io操作,所以其Tiw为0,那么redis的最佳线程数就很明显了,就是一个线程。

    再来看看,线程数对系统性能的影响:

    图片 3

    线程数对系统性能的影响

    从压测的结果来看,系统的性能表现并不是线程越多越好,而是会在起最佳线程数附件产生性能拐点,过多的线程带来的上下文切换会对系统的性能表现带来负面影响。

    通过对系统最佳线程数的讨论,我们可以明确为什么Redis采用单线程依然可以达到超高的并发处理能力,因为所有的请求都在内存中完成,根本不需要去等待io,只要机器的cpu没有达到瓶颈,再多的请求也不怕。

    但是cpu会成为Redis的性能瓶颈吗,而且在多核cpu流行的今天,只有一个线程,只用一个核多少让人感觉很浪费,万一cpu的单核计算能力成为系统的性能瓶颈,又没法利用cpu其他的计算能力,那不是得不偿失?

    采用redis的 increament操作完成锁的抢占.但是释放锁时,是每个线程都可以删除redis中的key值. 并且initLock会降上一次的操作给覆盖掉,所以也废弃掉此方法

    1.4 瓶颈 —— cpu or 网络?

    对cpu会成为Redis的性能瓶颈的担忧是可以理解的,但是在实际使用过程中cpu或许不是制约Redis的关键因素,网络io可能才是最大的瓶颈。

    看一组Redis的性能测试数据对比,Redis版本2.8.19,测试环境如下:

    • KVStore(阿里云Redis):E5-2682 2.5GHz 1G内存 1G网卡
    • Redis on ECS SSD: E5-2682 2.5GHz 1G内存 1G网
    • Redis on ECS 硬盘: E5-2682 2.5GHz 1G内存 1G网

    图片 4

    set接口测试

    图片 5

    get接口测试

    从对比的测试结果来看,Redis的性能还是很强悍的,单机qps最低也在5W以上,但是相同条件下KVStore比Redis on ECS要高出一倍。

    分析对比结果,Redis on ECS SSD和Redis on ECS 硬盘在性能上差不多,说明磁盘并不是Redis的性能瓶颈。而Redis on ECS之所以低于KVStore主要是受限于ECS的网络io性能,并没有跑满cpu,导致并发处理上不去。

    Redis性能分析官方文档中,也对影响Redis性能的因素进行了分析和对比测试,大部分情况下网络依然是制约其性能的首要因素。

    但是毕竟Redis的单线程模型对多核cpu没有完全利用,如果有这样的担心,那么在网络io没有成为瓶颈时,可以在一台机器上多部署几个Redis的实例,充分利用cpu和网卡的能力。

    最终版本:

    1.5 噩梦 —— 阻塞

    基于单线程设计,Redis整体上设计精良并且运行良好。但是凡事有利有弊,正因为主线程只有一个,所有的读写请求都在主线程中完成,如果出现请求阻塞,哪怕是很短的时间,对应应用来说都是噩梦。

    Redis官方文档对绝大多数阻塞问题进行了分类说明,归纳起来导致阻塞问题的场景大致可以分为内在和外在两个原因。

    内在原因

    • 不合理使用API和数据结构

      通常Redis的执行速度都非常快,但是如果对一个包含上万个元素的hash结构执行hgetall操作,由于数据量比较大而且算法复杂度是O(n),这条命令执行速度必然很慢。

      对于高并发的场景我们应该尽量避免在大对象上执行算法复杂度超过O(n)的命令。在使用redis的命令时需要密切注意文档中算法复杂度为O(n)的api。

      Redis原生提供慢查询统计,执行slowlog get {n}命令可以获取最近n条慢查询命令。

    • CPU饱和的问题

      单线程的Redis处理命令时只能使用一个CPU,如果Redis把单核CPU使用率跑到接近100%,则会导致CPU饱和问题,这时Redis将无法处理更多的命令,严重影响吞吐量和系统的稳定性。

      对于CPU饱和的问题,首先需要确认当前Redis的并发处理量是否达到极限,如果只有几百或几千qps的Redis实例其cpu使用率就达到饱和是不正常,这个时候需要排查是否是使用了高算法复杂度的命令,或者是否是对内存过度优化。如果qps确实很高,则需要考虑集群化水平扩展来分摊qps压力。

    • 持久化相关的阻塞

      对于开启了持久化功能的Redis节点,需要排查是否是持久化导致的阻塞。持久化引起的主线程阻塞的操作有:fork阻塞、AOF刷盘阻塞、HugePage写操作阻塞。

      • fork阻塞:fork操作发生在RDB和AOF重写时,Redis主线程调用fork操作产生共享内存的子进程,由子进程完成持久化文件重写工作,如果fork操作本身耗时很长,必然会导致主线程阻塞。
      • AOF刷盘阻塞:在开启AOF持久化功能时,文件刷盘一般采用一秒一次,后台线程每秒对AOF文件做fsync操作,当硬盘压力过大时,fsync操作需要等待,直到写入完成。如果主线程发现距离上一次的fsync成功超过2秒,为了数据安全性它会阻塞直到后台线程执行fsync操作完成。
      • 子进程在执行重新期间利用linux写时复制技术降低内存开销,因此只有写操作时Redis才复制需要修改的内存页,对于开启Transparent HugePages的操作系统,每次写命令引起的复制内存页单位由4K变为2M,放大了512倍,会拖慢写操作的执行时间,导致大量写操作慢查询。

    外在原因

    • CPU竞争

      Redis是典型的cpu密集型应用,不建议和其他多核cpu密集型服务部署在一起,当其他进程过度消耗cpu时,将严重影响Redis的吞吐量。

    • 内存交换

      内存交换(swap)对于Redis来说是非常致命的,Redis保证高性能的一个重要前提是所有的数据都在内存中,如果操作系统把Redis使用的部分内存换出到硬盘,由于内存和硬盘读写速度相差几个数量级,会导致发生交换之后的Redis性能急剧下降。

      预防内存交换的方法有:保证机器有充足的可用内存,确保Redis实例设置了最大可用内存,防止极端情况下Redis内存不可控增长。

    • 网络问题

      网络问题也会引起Redis阻塞,常见的网络问题主要有:链接拒绝、网络延迟、网卡软中断。

    import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.connection.jedis.JedisConnection;import org.springframework.stereotype.Service;import org.springframework.util.ReflectionUtils;import redis.clients.jedis.Jedis;import java.lang.reflect.Field;import java.util.Collections;@Servicepublic class RedisLock { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; private static final Long RELEASE_SUCCESS = 1L; @Autowired private RedisConnectionFactory connectionFactory; /** * 尝试获取分布式锁 * @param lockKey 锁 * @param requestId 请求标识 * @param expireTime 超期时间 * @return 是否获取成功 */ public boolean lock(String lockKey, String requestId, int expireTime) { Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis"); ReflectionUtils.makeAccessible(jedisField); Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection()); String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } /** * 释放分布式锁 * @param lockKey 锁 * @param requestId 请求标识 * @return 是否释放成功 */ public boolean releaseLock(String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = getJedis().eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } public Jedis getJedis() { Field jedisField = ReflectionUtils.findField(JedisConnection.class, "jedis"); ReflectionUtils.makeAccessible(jedisField); Jedis jedis = (Jedis) ReflectionUtils.getField(jedisField, connectionFactory.getConnection()); return jedis; }}
    

    2、Redis的数据过期及淘汰策略


    Redis的数据过期策略和实现一直是让我非常困惑,Redis的文档也没有清晰的说明,下面结合源码一起分析下Redis的数据过期及淘汰策略。

    2.1 令人困惑的expire

    Redis文档中对于过期key的处理方式的描述有两种:被动和主动方式。

    当一些客户端尝试访问它时,key会被发现并主动的过期。
    但是这是不够的,因为有些过期的keys,永远不会访问他们。 无论如何,这些keys应该过期,所以定时随机测试设置keys的过期时间,并将过期的keys进行删除。
    具体就是Redis每秒10次做的事情:

    • 测试随机的20个keys进行相关过期检测。
    • 删除所有已经过期的keys。
    • 如果有多于25%的keys过期,重复步奏1。

    但是Redis的主线程是单线程,并没有一个专门的线程来负责定时对过期数据进行清理,Redis如何具体完成过期key的查找、定时任务如何设置、对过期keys删除的效果如何?Redis的文档并没有明确的说明,需要从源码中查找。

    2.2 定时器?

    Redis的文档中多处提到定时处理的逻辑,如过期key的定期清理,aof定时写文件,但是如何在单线程中实现一个定时器呢?

    在Redis中所有的IO事件都会被封装成redisServer.aeEventLoop,在Redis的启动函数中,会进行aeEventLoop事件的定时处理回调(serverCron)的注册:

    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    

    定时事件的注册过程如下:

    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
    {
        long long id = eventLoop->timeEventNextId++;
        aeTimeEvent *te;
    
        te = zmalloc(sizeof(*te));
        if (te == NULL) return AE_ERR;
    
        te->id = id;
        aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
        te->timeProc = proc;
        te->finalizerProc = finalizerProc;
        te->clientData = clientData;
        te->next = eventLoop->timeEventHead;
        eventLoop->timeEventHead = te;
        return id;
    }
    

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) 是定时处理逻辑的回调,这里会处理过期key的清理、统计信息更新、对不合理的数据库进行大小调整、关闭和清理连接生效的客户端、尝试进行AOF或RDB持久化操作等。

    int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    
        //...
    
        clientsCron();
    
        /* Handle background operations on Redis databases. */
        databasesCron();
    
        rewriteAppendOnlyFileBackground();
    
        backgroundSaveDoneHandler(exitcode,bysignal);
    
        freeClientsInAsyncFreeQueue();
    
        clientsArePaused();
    
        /* Replication cron function -- used to reconnect to master,
        * detect transfer failures, start background RDB transfers and so forth. */
        run_with_period(1000) replicationCron();
    
        /* Run the Redis Cluster cron. */
        run_with_period(100) {
            if (server.cluster_enabled) clusterCron();
        }
    
        /* Run the Sentinel timer if we are in sentinel mode. */
        run_with_period(100) {
            if (server.sentinel_mode) sentinelTimer();
        }
    
        /* Cleanup expired MIGRATE cached sockets. */
        run_with_period(1000) {migrateCloseTimedoutSockets();}
    
        //...
    
        return 1000/server.hz;
    }
    

    定时回调serverCron具体的处理业务后面再研究,先看看serverCron什么时候会被触发调用。
    在Redis事件循环中aeProcessEvents会调用processTimeEvents,从名字上看出是处理定时事件。

    static int processTimeEvents(aeEventLoop *eventLoop) {
        int processed = 0;
        aeTimeEvent *te, *prev;
        long long maxId;
        time_t now = time(NULL);
    
        /**
        * 如果系统时间被调整到将来某段时间然后又被设置回正确的时间,
        * 这种情况下链表中的timeEvent有可能会被随机的延迟执行,因
        * 此在这个情况下把所有的timeEvent的触发时间设置为0表示及执行
        */
        if (now < eventLoop->lastTime) {
            te = eventLoop->timeEventHead;
            while(te) {
                te->when_sec = 0;
                te = te->next;
            }
        }
    
        eventLoop->lastTime = now; // 设置上次运行时间为now
    
        prev = NULL;
        te = eventLoop->timeEventHead;
        maxId = eventLoop->timeEventNextId-1;
        while(te) {
            long now_sec, now_ms;
            long long id;
    
            /**
            * 删除已经被标志位 删除 的时间事件
            */
            if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            if (te->finalizerProc)
                // 在时间事件节点被删除前调用finlizerProce()方法
                te->finalizerProc(eventLoop, te->clientData);
                zfree(te);
                te = next;
                continue;
            }
    
            if (te->id > maxId) {
                /**
                * te->id > maxId 表示当前te指向的timeEvent为当前循环中新添加的,
                * 对于新添加的节点在本次循环中不作处理。
                * PS:为什么会出现这种情况呢?有可能是在timeProc()里面会注册新的timeEvent节点?
                * 对于当前的Redis版本中不会出现te->id > maxId这种情况
                */
                te = te->next;
                continue;
            }
    
            aeGetTime(&now_sec, &now_ms);
            if (now_sec > te->when_sec ||
                (now_sec == te->when_sec && now_ms >= te->when_ms))
            {
                int retval;
                id = te->id;
    
                // 如果当前时间已经超过了对应的timeEvent节点设置的触发时间,
                // 则调用timeProc()方法执行对应的任务,即serverCron
                retval = te->timeProc(eventLoop, id, te->clientData);
    
                processed++;
                if (retval != AE_NOMORE) {
                    // 要执行多次,则计算下次执行时间
                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
                } else {
                    // 如果只需要执行一次,则把id设置为-1,再下次循环中删除
                    te->id = AE_DELETED_EVENT_ID;
                }
            }
            prev = te;
            te = te->next;
        }
        return processed;
    }
    

    processTimeEvents会对定时事件进行时间判断,如果到了设置的触发时间则会调用注册的定时回调函数
    serverCron。

    这里需要注意te->timeProc,即serverCron的返回值,从之前serverCron分析来看,其返回值为1000/server.hz。server.hz是Redis server执行后台任务的频率,默认为10,此值越大表示redis对定时任务的执行次数越频繁,如定期清理过期key。aeAddMillisecondsToNow会根据serverCron的返回值来计算下次定时任务触发的unix时间。

    定时器的后果 —— 阻塞

    至此已经很清楚,Redis中的定时业务的处理是放在主线程之中,在主线程处理完一次请求之后,接着计算是否到了业务的定时周期,如果到了则处理定时业务。

    但是这会加大主线程处理请求的延时,如果在定时回调中塞入过多的处理逻辑或者某一次处理耗时严重,如由于磁盘压力导致aof写文件耗时增加,那么就会阻塞整个主线程的处理。

    Redis在主线程中塞入定时处理的业务逻辑,避免再引入一个单独的定时线程,简化了代码,但是也带来阻塞主线程业务处理的风险,因此在定时回调中处理相关定时业务逻辑时需要十分小心,密切注意处理耗时和对cpu的使用。

    本文由金沙游乐场85155发布于大数据库,转载请注明出处:浅析Redis分布式锁

    关键词: