@RedisHash(value = "userAlert", timeToLive = ExpireTime.REFRESH_TOKEN_EXPIRE_TIME_FOR_REDIS)
public class UserAlert {
@Id
private String id;
private String title;
private String message;
private String createdDate;
@Indexed
private Long userId;
}
예전 프로젝트에서 유저 게시물에 댓글을 달면 알림이 가는 기능을 만들었다.
Entity에 Redis TTL을 넣어서 사용한 적이 있는데
그 만료된 값을 어떤식으로 삭제할지 동작방식이 궁금해서 찾아봤다.
매번 찾는건지... 찾는다면 어떤식으로 찾을지 랜덤하게 찾을지..
만료된 키를 찾는 방법
Passive Expiration (수동 만료)
1. Passive Expiration (수동 만료)
키에 접근 할때 마다 키의 TTL을 확인, TTL 만료시 키를 제거 한다.
- 예시: GET mykey 명령이 실행될 때, mykey의 TTL을 확인하고 만료되었으면 mykey를 삭제
Active Expiration (능동 만료)
2. Active Expiration (능동 만료)
Redis 에서 주기적으로 랜덤하게 DB를 스캔, 만료된 키를 제거. ( 주기적은 사용자가 설정할수있을것같다..)
하지만!! Redis 6.0 부터는 만료된 키를 랜덤하게 찾기 보단 조금더 효율적이고 정교적인 알고리즘을 통해 선택한다고한다. 기수 트리에서 만료시간을 기준으로 정렬되어 쉽게 찾을수있다고 한다!!.
Lazy Expiration (지연 만료)
만료 시간을 설정한 키에 접근할 때만 만료 여부를 확인하고, 만료되었다면 그 시점에서 키를 삭제하는 방식입니다.
- 예시: 클라이언트가 GET mykey 명령을 실행했을 때, mykey가 만료된 상태라면 그 즉시 삭제
Periodic Scans (주기적 스캔)
Redis는 주기적으로 백그라운드 스레드에서 만료된 키를 찾아 삭제합니다. 이는 시스템 자원을 효율적으로 사용하면서도 만료된 키를 빠르게 제거하는 방법입니다.
- Redis는 각 데이터베이스에서 일정 시간마다 일정 수의 키를 랜덤하게 선택하여 만료 여부를 확인하고 삭제
Eviction Policies
Redis는 메모리 사용량이 설정된 한계에 도달하면, eviction 정책을 사용하여 키들을 제거한다.
이러한 정책은 만료된 키뿐만 아니라 다른 기준에 따라 키를 제거하여 메모리를 확보
요약하자면 용량이 꽉차면 찾아서 제거
접근할때 만료되었으면 제거
기수트리에서 찾아서 만료된거 제거
주기적으로 랜덤하게 서치후 제거
정도가 있겠다.
https://github.com/redis/redis/blob/unstable/src/expire.c
레디스에 등록되어있는 Expire C 코드이다.
GPT와 함께 한번 분석해보겠다.
ActiveExpireCycleTryExpire 함수
/* Helper function for the activeExpireCycle() function.
* This function will try to expire the key that is stored in the hash table
* entry 'de' of the 'expires' hash table of a Redis database.
*
* If the key is found to be expired, it is removed from the database and
* 1 is returned. Otherwise no operation is performed and 0 is returned.
*
* When a key is expired, server.stat_expiredkeys is incremented.
*
* The parameter 'now' is the current time in milliseconds as is passed
* to the function to avoid too many gettimeofday() syscalls. */
위에는 영어로 함수에 대한 설명이 나와있고 해석하면
해시 테이블에 있는 키를 만료시키려고 시도하는 함수라고 한다.
키가 만료된 것으로 확인되면 데이터베이스에서 제거하고 1을 반환
그렇지 않으면 작업이 수행되지 않고 0이 반환
키가 만료되면 server.stat_expired 키가 증가
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
// dictEntry의 값(t)을 가져옵니다. 이 값은 키의 만료 시간입니다.
long long t = dictGetSignedIntegerVal(de);
// 현재 시간(now)이 키의 만료 시간(t)보다 크면 (즉, 키가 만료되었으면)
if (now > t) {
// 실행 유닛에 진입합니다. (트랜잭션과 관련된 함수일 수 있습니다.)
enterExecutionUnit(1, 0);
// dictEntry에서 키를 가져옵니다.
sds key = dictGetKey(de);
// 키를 문자열 객체로 만듭니다.
robj *keyobj = createStringObject(key, sdslen(key));
// 만료된 키를 삭제하고 관련된 작업을 전파합니다.
deleteExpiredKeyAndPropagate(db, keyobj);
// 키 객체의 참조 카운트를 감소시킵니다. (메모리 관리를 위해)
decrRefCount(keyobj);
// 실행 유닛을 종료합니다.
exitExecutionUnit();
// 키가 만료되어 삭제되었음을 나타내는 1을 반환합니다.
return 1;
} else {
// 키가 아직 만료되지 않았음을 나타내는 0을 반환합니다.
return 0;
}
}
가장 위에 나와있는 만료시간을 체크하여 제거하는 코드
역시 레디스 코드가 매우 간단히 되어있다고 하던데 나같은 사람도 이해하기 쉽게 짜여져 있다.
그 다음줄엔 주석이 적혀있다.
아래는 코드에 적혀있는 주석에 대한 해석이다.
- 적응형 알고리즘:
- 적응형 알고리즘을 사용하여 만료된 키가 적을 때는 CPU 사이클을 적게 사용하고, 만료된 키가 많을 때는 메모리 낭비를 방지하기 위해 더 공격적으로 동작합니다.
- 데이터베이스 순환 검사:
- 각 만료 주기마다 여러 데이터베이스를 테스트합니다. 다음 호출은 다음 데이터베이스에서 시작합니다. 각 반복마다 CRON_DBS_PER_CALL 데이터베이스보다 많지 않게 테스트합니다.
- 주기 유형에 따른 작업량 조절:
- "type" 인수에 따라 더 많거나 적은 작업을 수행할 수 있습니다. "빠른 주기(fast cycle)" 또는 "느린 주기(slow cycle)"를 실행할 수 있습니다.
- 느린 주기는 주로 만료된 키를 수집하는 방법으로, "server.hz" 주파수(일반적으로 10 헤르츠)로 실행됩니다.
- 느린 주기는 시간 초과로 종료될 수 있습니다. 따라서 각 이벤트 루프 주기마다 beforeSleep() 함수에서 빠른 주기를 수행하여 더 자주, 적은 작업을 수행합니다.
- 주기 유형과 중지 조건:
- ACTIVE_EXPIRE_CYCLE_FAST: "빠른" 만료 주기를 실행하며, 이 주기는 ACTIVE_EXPIRE_CYCLE_FAST_DURATION 마이크로초를 넘지 않으며, 동일한 시간이 지나기 전에는 반복되지 않습니다. 최신 느린 주기가 시간 제한 조건으로 종료되지 않은 경우에는 이 주기를 실행하지 않습니다.
- ACTIVE_EXPIRE_CYCLE_SLOW: 일반 만료 주기를 실행하며, 시간 제한은 REDIS_HZ 주기의 일정 비율(ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC)에 따라 정해집니다. 빠른 주기에서는 데이터베이스의 만료된 키 수가 일정 비율 이하로 추정되면 검사를 중단하여 많은 작업을 하지 않고도 메모리를 적게 확보할 수 있습니다.
- 만료 "노력" 설정:
- 구성된 만료 "노력"은 기본 매개변수를 수정하여 빠르고 느린 만료 주기에서 더 많은 작업을 수행하게 합니다.
CPU 사용량과 메모리 효율성을 동시에 고려하는 적응형 알고리즘에 대해 설명하고 있다.
구조체 정의
typedef struct {
redisDb *db;
long long now;
unsigned long sampled; /* 체크한 키의 수 */
unsigned long expired; /* 만료된 키의 수 */
long long ttl_sum; /* 아직 만료되지 않은 키의 TTL 합계 */
int ttl_samples; /* 아직 만료되지 않은 키의 수 */
} expireScanData;
구조체는 이런식으로 구성되어있다.
만료 스캔 콜백 함수
void expireScanCallback(void *privdata, const dictEntry *const_de) {
dictEntry *de = (dictEntry *)const_de;
expireScanData *data = privdata;
long long ttl = dictGetSignedIntegerVal(de) - data->now;
if (activeExpireCycleTryExpire(data->db, de, data->now)) {
data->expired++;
/* DEL 명령을 전파 */
postExecutionUnitOperations();
}
if (ttl > 0) {
/* 아직 만료되지 않은 키의 평균 TTL을 계산하기 위해 */
data->ttl_sum += ttl;
data->ttl_samples++;
}
data->sampled++;
}
이건 만료된 키를 확인하고 제거하는 함수이다
ActiveExpireCycle 함수
void activeExpireCycle(int type) {
/* Adjust the running parameters according to the configured expire
* effort. The default effort is 1, and the maximum configurable effort
* is 10. */
unsigned long
effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */
config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP +
ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort,
config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION +
ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort,
config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC +
2*effort,
config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE-
effort;
/* This function has some global state in order to continue the work
* incrementally across calls. */
static unsigned int current_db = 0; /* Next DB to test. */
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */
int j, iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int dbs_performed = 0;
long long start = ustime(), timelimit, elapsed;
/* If 'expire' action is paused, for whatever reason, then don't expire any key.
* Typically, at the end of the pause we will properly expire the key OR we
* will have failed over and the new primary will send us the expire. */
if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
/* Don't start a fast cycle if the previous cycle did not exit
* for time limit, unless the percentage of estimated stale keys is
* too high. Also never repeat a fast cycle for the same period
* as the fast cycle total duration itself. */
if (!timelimit_exit &&
server.stat_expired_stale_perc < config_cycle_acceptable_stale)
return;
if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
return;
last_fast_cycle = start;
}
/* We usually should test CRON_DBS_PER_CALL per iteration, with
* two exceptions:
*
* 1) Don't test more DBs than we have.
* 2) If last time we hit the time limit, we want to scan all DBs
* in this iteration, as there is work to do in some DB and we don't want
* expired keys to use memory for too much time. */
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
/* We can use at max 'config_cycle_slow_time_perc' percentage of CPU
* time per iteration. Since this function gets called with a frequency of
* server.hz times per second, the following is the max amount of
* microseconds we can spend in this function. */
timelimit = config_cycle_slow_time_perc*1000000/server.hz/100;
timelimit_exit = 0;
if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST)
timelimit = config_cycle_fast_duration; /* in microseconds. */
/* Accumulate some global stats as we expire keys, to have some idea
* about the number of keys that are already logically expired, but still
* existing inside the database. */
long total_sampled = 0;
long total_expired = 0;
/* Try to smoke-out bugs (server.also_propagate should be empty here) */
serverAssert(server.also_propagate.numops == 0);
/* Stop iteration when one of the following conditions is met:
*
* 1) We have checked a sufficient number of databases with expiration time.
* 2) The time limit has been exceeded.
* 3) All databases have been traversed. */
for (j = 0; dbs_performed < dbs_per_call && timelimit_exit == 0 && j < server.dbnum; j++) {
/* Scan callback data including expired and checked count per iteration. */
expireScanData data;
data.ttl_sum = 0;
data.ttl_samples = 0;
redisDb *db = server.db+(current_db % server.dbnum);
data.db = db;
int db_done = 0; /* The scan of the current DB is done? */
int update_avg_ttl_times = 0, repeat = 0;
/* Increment the DB now so we are sure if we run out of time
* in the current DB we'll restart from the next. This allows to
* distribute the time evenly across DBs. */
current_db++;
if (kvstoreSize(db->expires))
dbs_performed++;
/* Continue to expire if at the end of the cycle there are still
* a big percentage of keys to expire, compared to the number of keys
* we scanned. The percentage, stored in config_cycle_acceptable_stale
* is not fixed, but depends on the Redis configured "expire effort". */
do {
unsigned long num;
iteration++;
/* If there is nothing to expire try next DB ASAP. */
if ((num = kvstoreSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
data.now = mstime();
/* The main collection cycle. Scan through keys among keys
* with an expire set, checking for expired ones. */
data.sampled = 0;
data.expired = 0;
if (num > config_keys_per_loop)
num = config_keys_per_loop;
/* Here we access the low level representation of the hash table
* for speed concerns: this makes this code coupled with dict.c,
* but it hardly changed in ten years.
*
* Note that certain places of the hash table may be empty,
* so we want also a stop condition about the number of
* buckets that we scanned. However scanning for free buckets
* is very fast: we are in the cache line scanning a sequential
* array of NULL pointers, so we can scan a lot more buckets
* than keys in the same time. */
long max_buckets = num*20;
long checked_buckets = 0;
int origin_ttl_samples = data.ttl_samples;
while (data.sampled < num && checked_buckets < max_buckets) {
db->expires_cursor = kvstoreScan(db->expires, db->expires_cursor, -1, expireScanCallback, isExpiryDictValidForSamplingCb, &data);
if (db->expires_cursor == 0) {
db_done = 1;
break;
}
checked_buckets++;
}
total_expired += data.expired;
total_sampled += data.sampled;
/* If find keys with ttl not yet expired, we need to update the average TTL stats once. */
if (data.ttl_samples - origin_ttl_samples > 0) update_avg_ttl_times++;
/* We don't repeat the cycle for the current database if the db is done
* for scanning or an acceptable number of stale keys (logically expired
* but yet not reclaimed). */
repeat = db_done ? 0 : (data.sampled == 0 || (data.expired * 100 / data.sampled) > config_cycle_acceptable_stale);
/* We can't block forever here even if there are many keys to
* expire. So after a given amount of microseconds return to the
* caller waiting for the other active expire cycle. */
if ((iteration & 0xf) == 0 || !repeat) { /* Update the average TTL stats every 16 iterations or about to exit. */
/* Update the average TTL stats for this database,
* because this may reach the time limit. */
if (data.ttl_samples) {
long long avg_ttl = data.ttl_sum / data.ttl_samples;
/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) {
db->avg_ttl = avg_ttl;
} else {
/* The origin code is as follow.
* for (int i = 0; i < update_avg_ttl_times; i++) {
* db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
* }
* We can convert the loop into a sum of a geometric progression.
* db->avg_ttl = db->avg_ttl * pow(0.98, update_avg_ttl_times) +
* avg_ttl / 50 * (pow(0.98, update_avg_ttl_times - 1) + ... + 1)
* = db->avg_ttl * pow(0.98, update_avg_ttl_times) +
* avg_ttl * (1 - pow(0.98, update_avg_ttl_times))
* = avg_ttl + (db->avg_ttl - avg_ttl) * pow(0.98, update_avg_ttl_times)
* Notice that update_avg_ttl_times is between 1 and 16, we use a constant table
* to accelerate the calculation of pow(0.98, update_avg_ttl_times).*/
db->avg_ttl = avg_ttl + (db->avg_ttl - avg_ttl) * avg_ttl_factor[update_avg_ttl_times - 1] ;
}
update_avg_ttl_times = 0;
data.ttl_sum = 0;
data.ttl_samples = 0;
}
if ((iteration & 0xf) == 0) { /* check time limit every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
}
} while (repeat);
}
elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
/* Update our estimate of keys existing but yet to be expired.
* Running average with this sample accounting for 5%. */
double current_perc;
if (total_sampled) {
current_perc = (double)total_expired/total_sampled;
} else
current_perc = 0;
server.stat_expired_stale_perc = (current_perc*0.05)+
(server.stat_expired_stale_perc*0.95);
}
static unsigned int current_db = 0;
static int timelimit_exit = 0;
static long long last_fast_cycle = 0;
- current_db: 다음에 검사할 데이터베이스를 추적
- timelimit_exit: 이전 호출에서 시간 제한이 초과되었는지 여부를 나타낸다.
- last_fast_cycle: 마지막 빠른 주기가 실행된 시간을 기록
빠른 주기 조건 검사
if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
if (!timelimit_exit && server.stat_expired_stale_perc < config_cycle_acceptable_stale)
return;
if (start < last_fast_cycle + (long long)config_cycle_fast_duration*2)
return;
last_fast_cycle = start;
}
- 빠른 주기는 이전 주기가 시간 제한으로 종료되지 않았고, 예상 만료 키 비율이 낮으면 실행되지 않는다.
- 또한, 마지막 빠른 주기 후 일정 시간이 지나지 않으면 실행되지 않는다.
for (j = 0; dbs_performed < dbs_per_call && timelimit_exit == 0 && j < server.dbnum; j++) {
expireScanData data;
data.ttl_sum = 0;
data.ttl_samples = 0;
redisDb *db = server.db + (current_db % server.dbnum);
data.db = db;
current_db++;
if (kvstoreSize(db->expires))
dbs_performed++;
do {
unsigned long num;
iteration++;
if ((num = kvstoreSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
data.now = mstime();
data.sampled = 0;
data.expired = 0;
if (num > config_keys_per_loop)
num = config_keys_per_loop;
long max_buckets = num * 20;
long checked_buckets = 0;
int origin_ttl_samples = data.ttl_samples;
while (data.sampled < num && checked_buckets < max_buckets) {
db->expires_cursor = kvstoreScan(db->expires, db->expires_cursor, -1, expireScanCallback, isExpiryDictValidForSamplingCb, &data);
if (db->expires_cursor == 0) {
db_done = 1;
break;
}
checked_buckets++;
}
total_expired += data.expired;
total_sampled += data.sampled;
repeat = db_done ? 0 : (data.sampled == 0 || (data.expired * 100 / data.sampled) > config_cycle_acceptable_stale);
if ((iteration & 0xf) == 0 || !repeat) {
if (data.ttl_samples) {
long long avg_ttl = data.ttl_sum / data.ttl_samples;
if (db->avg_ttl == 0) {
db->avg_ttl = avg_ttl;
} else {
db->avg_ttl = avg_ttl + (db->avg_ttl - avg_ttl) * avg_ttl_factor[update_avg_ttl_times - 1];
}
update_avg_ttl_times = 0;
data.ttl_sum = 0;
data.ttl_samples = 0;
}
if ((iteration & 0xf) == 0) {
elapsed = ustime() - start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
}
} while (repeat);
}
위에 코드가 DB를 검사하여 만료키를 찾아 제거하는 함수
여기서 kvstoreScan 이 스캔하는 함수인것 같다
설명은 이는 기수 트리를 통해 키를 효율적으로 탐색합니다. 라고 GPT 에서 말했다.
unsigned long kvstoreScan(dict *d, unsigned long v, int count, void (*callback)(void *, const dictEntry *), int (*safeToPerform)(dict *), void *privdata) {
unsigned long empty_visits = 0;
dictEntry *de;
unsigned long idx = v & 0x3FFFFFFF;
unsigned long step = 0;
if (safeToPerform && !safeToPerform(d)) return 0;
while (step < count && idx < DICT_HT_INITIAL_SIZE) {
if ((de = dictGetEntry(d, idx)) != NULL) {
callback(privdata, de);
step++;
empty_visits = 0;
} else {
empty_visits++;
if (empty_visits > 16) break; /* Avoid too many empty visits. */
}
idx++;
}
if (idx >= DICT_HT_INITIAL_SIZE) {
idx = 0;
}
return idx | (empty_visits << 30);
}
이게 kvstorescan 함수 라고 한다.
콜백함수를 통해 제거해야할 idx를 리턴 시킨다. 빈항목을 여러번 방문하면 브레이크를 먹인다.
expireSlaveKeys 함수
void expireSlaveKeys(void) {
if (slaveKeysWithExpire == NULL ||
dictSize(slaveKeysWithExpire) == 0) return;
int cycles = 0, noexpire = 0;
mstime_t start = mstime();
while(1) {
dictEntry *de = dictGetRandomKey(slaveKeysWithExpire);
sds keyname = dictGetKey(de);
uint64_t dbids = dictGetUnsignedIntegerVal(de);
uint64_t new_dbids = 0;
/* Check the key against every database corresponding to the
* bits set in the value bitmap. */
int dbid = 0;
while(dbids && dbid < server.dbnum) {
if ((dbids & 1) != 0) {
redisDb *db = server.db+dbid;
dictEntry *expire = dbFindExpires(db, keyname);
int expired = 0;
if (expire &&
activeExpireCycleTryExpire(server.db+dbid,expire,start))
{
expired = 1;
/* Propagate the DEL (writable replicas do not propagate anything to other replicas,
* but they might propagate to AOF) and trigger module hooks. */
postExecutionUnitOperations();
}
/* If the key was not expired in this DB, we need to set the
* corresponding bit in the new bitmap we set as value.
* At the end of the loop if the bitmap is zero, it means we
* no longer need to keep track of this key. */
if (expire && !expired) {
noexpire++;
new_dbids |= (uint64_t)1 << dbid;
}
}
dbid++;
dbids >>= 1;
}
/* Set the new bitmap as value of the key, in the dictionary
* of keys with an expire set directly in the writable slave. Otherwise
* if the bitmap is zero, we no longer need to keep track of it. */
if (new_dbids)
dictSetUnsignedIntegerVal(de,new_dbids);
else
dictDelete(slaveKeysWithExpire,keyname);
/* Stop conditions: found 3 keys we can't expire in a row or
* time limit was reached. */
cycles++;
if (noexpire > 3) break;
if ((cycles % 64) == 0 && mstime()-start > 1) break;
if (dictSize(slaveKeysWithExpire) == 0) break;
}
}
이 함수는 Redis 복제본 서버에서 만료된 키를 처리하는 기능을 수행한다고 한다.
그외에도 함수가 있는데 어떤식으로 찾는지가 궁금했기 때문에.. 기수트리를 활용하여 스캔 한다는것을 알았다.
다음에도 궁금한게 있으면 찾아봐야겠다.