#yyds干货盘点#Redis源码分析专题从本质分析你写入Redis中的数据为什么不见了()

人生必须的知识就是引人向光明方面的明灯。这篇文章主要讲述#yyds干货盘点#Redis源码分析专题从本质分析你写入Redis中的数据为什么不见了?相关的知识,希望能为你提供帮助。
Redis数据库介绍
Redis作为一个成熟的数据存储中间件,它提供了完善的数据管理功能,比如之前我们提到过的数据过期和今天我们要讲的数据淘汰(evict)策略。
数据的局部性原理贯穿计算机学科的原理局部性原理,这里可以明确告诉你,局部性原理在缓存场景有这样两种现象,

  1. 最新的数据下次被访问的概率越高。
  2. 被访问次数越多的数据下次被访问的概率越高。 这里我们可以简单认为被访问的概率越高价值越大。
基于上述两种现象,我们可以指定出两种策略
  1. 淘汰掉最早未被访问的数据,LRU(Least Recently Used)。
  2. 淘汰掉访被访问频次最低的数据,LFU(Least Frequently Used)。
除了LRU和LFU之外,还可以随机淘汰。这就是将数据一视同仁,随机选取一部分淘汰。实际上Redis实现了以上3中策略,你使用时可以根据具体的数据配置某个淘汰策略。
除了上述三种策略外,Redis还为由过期时间的数据提供了按TTL淘汰的策略,其实就是淘汰剩余TTL中最小的数据。另外需要注意的是Redis的淘汰策略可以配置在全局或者是有过期时间的数据上。
Redis的问题背景
  • 知道Redis主要是基于内存来进行高性能、高并发的读写操作的。
  • 然而内存是有限的,比如 redis 就只能用 10G,你要是往里面写了 20G 的数据,会咋办?当然会干掉 10G 的数据,然后就保留 10G 的数据了。那干掉哪些数据?保留哪些数据?这就要根据设置的redis的淘汰机制来选择了。数据明明过期了,竟然还占用这内存,这些都是由 redis 的过期策略来决定。
Redis的过期策略
Redis的过期策略包括两种,分别是定期删除和惰性删除:
  • 定期删除:指的是Redis默认是每隔100ms就随机抽取一些设置了过期时间的key,检查其是否过期,如果过期就删除。(不能完全删除)
  • 惰性删除:直接查询数据的时候,redis会先查看一些这个数据是否已经过期,如果过期,就进行删除。(不能完全删除数据)
Redis的删除内存策略定期删除和惰性删除都存在着一些问题,如果定期删除漏掉了很多过期key,然后你也没及时去查,也就没走惰性删除,此时有可能导致大量过期 key 堆积在内存里,导致redis 内存块耗尽了
Redis内存淘汰机制Redis中数据淘汰实际上是指的在内存空间不足时,清理掉某些数据以节省内存空间。 虽然Redis已经有了过期的策略,它可以清理掉有效期外的数据。
如果过期的数据被清理之后存储空间还是不够怎么办?是不是还可以再删除掉一部分数据? 在缓存这种场景下 这个问题的答案是可以,因为这个数据即便在Redis中找不到,也可以从被缓存的数据源中找到。
所以在某些特定的业务场景下,我们是可以丢弃掉Redis中部分旧数据来给新数据腾出空间。
内存淘汰机制包括以下几种方式:众所周知,redis是一个内存数据库,所有的键值对都是存储在内存中。当数据变多之后,由于内存有限就要淘汰一些键值对,使得内存有足够的空间来保存新的键值对。在redis中,通过设置server.maxmemory来限定内存的使用(server.maxmemory为0,不限制内存),到达server.maxmemory就会触发淘汰机制。
  • noeviction: 当内存不足以容纳新写入数据时,新写入操作会报错,一般不用。
  • Allkeys-lru: 当内存不足以容纳新写入数据时,在键空间中,移除最近最少使用的 key(最常用)。
  • allkeys-random:当内存不足以容纳新写入数据时,在键空间中,随机移除某个 key(很少用)。
  • volatile-lru:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,移除最近最少使用的 key(很少用)。
  • volatile-random:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,随机移除某个key。
  • volatile-ttl:当内存不足以容纳新写入数据时,在设置了过期时间的键空间中,有更早过期时间的key优先移除。
设置内存淘汰机制的方式:在redis.conf中:
  • maxmemory 100mb 最大内存设置,如果0代表无限 ;
  • maxmemory-policy: Allkeys-lru
Redis在每次执行客户端的命令的时候都会检查使用内存是否超过server.maxmemory,如果超过就进行淘汰数据。
int processCommand(client *c) { ……//server.maxmemory为0,表示对内存没有限制 if (server.maxmemory) { //判断内存,进行内存淘汰 int retval = freeMemoryIfNeeded(); …… } …… }

evict何时执行在Redis每次处理命令的时候,都会检查内存空间,并尝试去执行evict,因为有些情况下不需要执行evict,这个可以从isSafeToPerformEvictions中可以看出端倪。
static int isSafeToPerformEvictions(void) { /* 没有lua脚本执行超时,也没有在做数据超时 */ if (server.lua_timedout || server.loading) return 0; /* 只有master才需要做evict */ if (server.masterhost & & server.repl_slave_ignore_maxmemory) return 0; /* 当客户端暂停时,不需要evict,因为数据是不会变化的 */ if (checkClientPauseTimeoutAndReturnIfPaused()) return 0; return 1; }

执行回收驱逐操作
int performEvictions(void) { if (!isSafeToPerformEvictions()) return EVICT_OK; int keys_freed = 0; size_t mem_reported, mem_tofree; long long mem_freed; /* May be negative */ mstime_t latency, eviction_latency; long long delta; int slaves = listLength(server.slaves); int result = EVICT_FAIL; if (getMaxmemoryState(& mem_reported,NULL,& mem_tofree,NULL) == C_OK) return EVICT_OK; if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) return EVICT_FAIL; /* We need to free memory, but policy forbids. */unsigned long eviction_time_limit_us = evictionTimeLimitUs(); mem_freed = 0; latencyStartMonitor(latency); monotime evictionTimer; elapsedStart(& evictionTimer); while (mem_freed < (long long)mem_tofree) { int j, k, i; static unsigned int next_db = 0; sds bestkey = NULL; int bestdbid; redisDb *db; dict *dict; dictEntry *de; if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { struct evictionPoolEntry *pool = EvictionPoolLRU; while(bestkey == NULL) { unsigned long total_keys = 0, keys; /* We dont want to make local-db choices when expiring keys, * so to start populate the eviction pool sampling keys from * every DB. * 先从dict中采样key并放到pool中 */ for (i = 0; i < server.dbnum; i++) { db = server.db+i; dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ? db-> dict : db-> expires; if ((keys = dictSize(dict)) != 0) { evictionPoolPopulate(i, dict, db-> dict, pool); total_keys += keys; } } if (!total_keys) break; /* No keys to evict. *//* 从pool中选择最适合淘汰的key. */ for (k = EVPOOL_SIZE-1; k > = 0; k--) { if (pool[k].key == NULL) continue; bestdbid = pool[k].dbid; if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { de = dictFind(server.db[pool[k].dbid].dict, pool[k].key); } else { de = dictFind(server.db[pool[k].dbid].expires, pool[k].key); }/* 从淘汰池中移除. */ if (pool[k].key != pool[k].cached) sdsfree(pool[k].key); pool[k].key = NULL; pool[k].idle = 0; /* If the key exists, is our pick. Otherwise it is * a ghost and we need to try the next element. */ if (de) { bestkey = dictGetKey(de); break; } else { /* Ghost... Iterate again. */ } } } }/* volatile-random and allkeys-random 策略 */ else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) { /* 当随机淘汰时,我们用静态变量next_db来存储当前执行到哪个db了*/ for (i = 0; i < server.dbnum; i++) { j = (++next_db) % server.dbnum; db = server.db+j; dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ? db-> dict : db-> expires; if (dictSize(dict) != 0) { de = dictGetRandomKey(dict); bestkey = dictGetKey(de); bestdbid = j; break; } } }/* 从dict中移除被选中的key. */ if (bestkey) { db = server.db+bestdbid; robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); propagateExpire(db,keyobj,server.lazyfree_lazy_eviction); /*我们单独计算db*Delete()释放的内存量。实际上,在AOF和副本传播所需的内存可能大于我们正在释放的内存(删除key) ,如果我们考虑这点的话会很绕。由signalModifiedKey生成的CSC失效消息也是这样。 因为AOF和输出缓冲区内存最终会被释放,所以我们只需要关心key空间使用的内存即可。*/ delta = (long long) zmalloc_used_memory(); latencyStartMonitor(eviction_latency); if (server.lazyfree_lazy_eviction) dbAsyncDelete(db,keyobj); else dbSyncDelete(db,keyobj); latencyEndMonitor(eviction_latency); latencyAddSampleIfNeeded("eviction-del",eviction_latency); delta -= (long long) zmalloc_used_memory(); mem_freed += delta; server.stat_evictedkeys++; signalModifiedKey(NULL,db,keyobj); notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", keyobj, db-> id); decrRefCount(keyobj); keys_freed++; if (keys_freed % 16 == 0) { /*当要释放的内存开始足够大时,我们可能会在这里花费太多时间,不可能足够快地将数据传送到副本,因此我们会在循环中强制传输。*/ if (slaves) flushSlavesOutputBuffers(); /*通常我们的停止条件是释放一个固定的,预先计算的内存量。但是,当我们*在另一个线程中删除对象时, 最好不时*检查是否已经达到目标*内存,因为“mem\\u freed”量只在dbAsyncDelete()调用中*计算, 而线程可以*一直释放内存。*/ if (server.lazyfree_lazy_eviction) { if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) { break; } }/*一段时间后,尽早退出循环-即使尚未达到内存限制*。如果我们突然需要释放大量的内存,不要在这里花太多时间。*/ if (elapsedUs(evictionTimer) > eviction_time_limit_us) { // We still need to free memory - start eviction timer proc if (!isEvictionProcRunning) { isEvictionProcRunning = 1; aeCreateTimeEvent(server.el, 0, evictionTimeProc, NULL, NULL); } break; } } } else { goto cant_free; /* nothing to free... */ } } /* at this point, the memory is OK, or we have reached the time limit */ result = (isEvictionProcRunning) ? EVICT_RUNNING : EVICT_OK; cant_free: if (result == EVICT_FAIL) { /* At this point, we have run out of evictable items.Its possible * that some items are being freed in the lazyfree thread.Perform a * short wait here if such jobs exist, but dont wait long.*/ if (bioPendingJobsOfType(BIO_LAZY_FREE)) { usleep(eviction_time_limit_us); if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) { result = EVICT_OK; } } }latencyEndMonitor(latency); latencyAddSampleIfNeeded("eviction-cycle",latency); return result; }

释放资源如果在需要时候
int freeMemoryIfNeeded(void) { //获取redis内存使用 mem_reported = zmalloc_used_memory(); if (mem_reported < = server.maxmemory) return C_OK; mem_used = mem_reported; if (slaves) { listRewind(server.slaves,& li); //减去slaves的output缓冲区 while((ln = listNext(& li))) { …… } } //aof的缓冲区的内存使用 if (server.aof_state != AOF_OFF) { mem_used -= sdslen(server.aof_buf); mem_used -= aofRewriteBufferSize(); } /* Check if we are still over the memory limit. */ if (mem_used < = server.maxmemory) return C_OK; /* Compute how much memory we need to free. */ mem_tofree = mem_used - server.maxmemory; mem_freed = 0; if (server.maxmemory_policy == MAXMEMORY_NO_EVICTION) goto cant_free; /* 禁止驱逐数据 */ //进行数据驱逐 while (mem_freed < mem_tofree) { …… sds bestkey = NULL; if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {//进行ttl或者lru淘汰机制 struct evictionPoolEntry *pool = EvictionPoolLRU; while(bestkey == NULL) { unsigned long total_keys = 0, keys; for (i = 0; i < server.dbnum; i++) { db = server.db+i; dict = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ? db-> dict : db-> expires; if ((keys = dictSize(dict)) != 0) { evictionPoolPopulate(i, dict, db-> dict, pool); //pool根据机制构建的evictionPool } }/*在evictionPool中从后往前选择一个还在存在数据库中的键值进行驱逐*/ for (k = EVPOOL_SIZE-1; k > = 0; k--) { if (pool[k].key == NULL) continue; bestdbid = pool[k].dbid; if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { de = dictFind(server.db[pool[k].dbid].dict, pool[k].key); } else { de = dictFind(server.db[pool[k].dbid].expires, pool[k].key); } …… if (de) { bestkey = dictGetKey(de); break; } else { /* Ghost... Iterate again. */ } } } } else if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || server.maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM) {/* 从db-> dict或者db-> expires随机选择一个键值对进行淘汰*/ for (i = 0; i < server.dbnum; i++) { j = (++next_db) % server.dbnum; db = server.db+j; dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ? db-> dict : db-> expires; if (dictSize(dict) != 0) { de = dictGetRandomKey(dict); bestkey = dictGetKey(de); bestdbid = j; break; } } }//驱逐选中的键值对 if (bestkey) { db = server.db+bestdbid; robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); propagateExpire(db,keyobj,server.lazyfree_lazy_eviction); delta = (long long) zmalloc_used_memory(); if (server.lazyfree_lazy_eviction) dbAsyncDelete(db,keyobj); else dbSyncDelete(db,keyobj); delta -= (long long) zmalloc_used_memory(); mem_freed += delta; server.stat_evictedkeys++; decrRefCount(keyobj); keys_freed++; if (slaves) flushSlavesOutputBuffers(); }} return C_OK; cant_free://进行内存空间的惰性释放 while(bioPendingJobsOfType(BIO_LAZY_FREE)) { if (((mem_reported - zmalloc_used_memory()) + mem_freed) > = mem_tofree) break; usleep(1000); } return C_ERR; }

根据淘汰机制从随机选取的键值对中选取键值对构建evictionPool
  • 1)LRU数据淘汰机制:在数据集中随机选取几个键值对,选择lru最大的一部分键值对构建evictionPool。
LRU的本质是淘汰最久没被访问的数据,有种实现方式是用链表的方式实现,如果数据被访问了就把它移到链表头部,那么链尾一定是最久未访问的数据,但是单链表的查询时间复杂度是O(n),所以一般会用hash表来加快查询数据,比如java中LinkedHashMap就是这么实现的。但Redis并没有采用这种策略,Redis就是单纯记录了每个Key最近一次的访问时间戳,通过时间戳排序的方式来选找出最早的数据,当然如果把所有的数据都排序一遍,未免也太慢了,所以Redis是每次选一批数据,然后从这批数据执行淘汰策略。这样的好处就是性能高,坏处就是不一定是全局最优,只是达到局部最优。
在redisObject中有个24位的lru字段,这24位保存了数据访问的时间戳(秒),当然24位无法保存完整的unix时间戳,不到200天就会有一个轮回,当然这已经足够了。
robj *lookupKey(redisDb *db, robj *key, int flags) { dictEntry *de = dictFind(db-> dict,key-> ptr); if (de) { robj *val = dictGetVal(de); if (!hasActiveChildProcess() & & !(flags & LOOKUP_NOTOUCH)){ if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { updateLFU(val); } else { val-> lru = LRU_CLOCK(); // 这里更新LRU时间戳 } } return val; } else { return NULL; } }

  • 2)LFU数据淘汰机制:在数据集中随机选取几个键值对,选择lfu最小的一部分键值对构建evictionPool。
lru这个字段也会被lfu用到,所以你在上面lookupkey中可以看到在使用lfu策略是也会更新lru。Redis中lfu的出现稍晚一些,是在Redis 4.0才被引入的,所以这里复用了lru字段。 lru的实现思路只有一种,就是记录下key被访问的次数。但实现lru有个问题需要考虑到,虽然LFU是按访问频次来淘汰数据,但在Redis中随时可能有新数据就来,本身老数据可能有更多次的访问,新数据当前被访问次数少,并不意味着未来被访问的次数会少,如果不考虑到这点,新数据可能一就来就会被淘汰掉,这显然是不合理的。
Redis为了解决上述问题,将24位被分成了两部分,高16位的时间戳(分钟级),低8位的计数器。每个新数据计数器初始有一定值,这样才能保证它能走出新手村,然后计数值会随着时间推移衰减,这样可以保证老的但当前不常用的数据才有机会被淘汰掉,我们来看下具体实现代码。
计数器只有8个二进制位,充其量数到255,怎么会够? 当然Redis使用的不是精确计数,而是近似计数。具体实现就是counter概率性增长,counter的值越大增长速度越慢,具体增长逻辑如下:
/* 更新lfu的counter,counter并不是一个准确的数值,而是概率增长,counter的数值越大其增长速度越慢 * 只能反映出某个时间窗口的热度,无法反映出具体访问次数 */ uint8_t LFULogIncr(uint8_t counter) { if (counter == 255) return 255; double r = (double)rand()/RAND_MAX; double baseval = counter - LFU_INIT_VAL; // LFU_INIT_VAL为5 if (baseval < 0) baseval = 0; double p = 1.0/(baseval*server.lfu_log_factor+1); // server.lfu_log_factor可配置,默认是10 if (r < p) counter++; return counter; }

【#yyds干货盘点#Redis源码分析专题从本质分析你写入Redis中的数据为什么不见了()】LFU计数器衰减:如果说counter一直增长,即便增长速度很慢也有一天会增长到最大值255,最终导致无法做数据的筛选,所以要给它加一个衰减策略,思路就是counter随时间增长衰减,具体代码如下:
/* lfu counter衰减逻辑, lfu_decay_time是指多久counter衰减1,比如lfu_decay_time == 10 * 表示每10分钟counter衰减一,但lfu_decay_time为0时counter不衰减 */ unsigned long LFUDecrAndReturn(robj *o) { unsigned long ldt = o-> lru > > 8; unsigned long counter = o-> lru & 255; unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0; if (num_periods) counter = (num_periods > counter) ? 0 : counter - num_periods; return counter; }

  • 3)TTL数据淘汰机制:从设置过期时间的数据集中随机选取几个键值对,选择TTL最大的一部分键值对构建evictionPool。
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) { int j, k, count; dictEntry *samples[server.maxmemory_samples]; //从数据集sampledict随机选取键值对 count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples); for (j = 0; j < count; j++) { de = samples[j]; key = dictGetKey(de); if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) { if (sampledict != keydict) de = dictFind(keydict, key); o = dictGetVal(de); } if (server.maxmemory_policy & MAXMEMORY_FLAG_LRU) { idle = estimateObjectIdleTime(o); //LRU机制,计算lru值 } else if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { idle = 255-LFUDecrAndReturn(o); //LFU机制,计算lfu值 } else if (server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { idle = ULLONG_MAX - (long)dictGetVal(de); //TTL机制,计算ttl值 } k = 0; //根据idle从小到大将键值对插入到pool(插入排序的机制),但只保留idle最大的EVPOOL_SIZE个 while (k < EVPOOL_SIZE & & pool[k].key & & pool[k].idle < idle) k++; if (k == 0 & & pool[EVPOOL_SIZE-1].key != NULL) { continue; } else if (k < EVPOOL_SIZE & & pool[k].key == NULL) { /* Inserting into empty position. No setup needed before insert. */ } else { if (pool[EVPOOL_SIZE-1].key == NULL) { sds cached = pool[EVPOOL_SIZE-1].cached; memmove(pool+k+1,pool+k,sizeof(pool[0])*(EVPOOL_SIZE-k-1)); pool[k].cached = cached; } else { k--; sds cached = pool[0].cached; /* Save SDS before overwriting. */ if (pool[0].key != pool[0].cached) sdsfree(pool[0].key); memmove(pool,pool+1,sizeof(pool[0])*k); pool[k].cached = cached; } } int klen = sdslen(key); if (klen > EVPOOL_CACHED_SDS_SIZE) { pool[k].key = sdsdup(key); } else { memcpy(pool[k].cached,key,klen+1); sdssetlen(pool[k].cached,klen); pool[k].key = pool[k].cached; } pool[k].idle = idle; pool[k].dbid = dbid; } }

参考学习
  • https://www.cnblogs.com/xindoo/p/14460546.html

    推荐阅读