目录
redis-7.0.8
/* Per-client state: the connection, the query buffer and the parsed command,
 * the pending reply, plus replication, MULTI/EXEC, blocking, Pub/Sub,
 * client-side tracking and memory-accounting fields. */
typedef struct client {
uint64_t id; /* Client incremental unique ID. */
uint64_t flags; /* Client flags: CLIENT_* macros. */
connection *conn; /* Connection handle of this client (type declared elsewhere). */
int resp; /* RESP protocol version. Can be 2 or 3. */
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
int argv_len; /* Size of argv array (may be more than argc) */
int original_argc; /* Num of arguments of original command if arguments were rewritten. */
robj **original_argv; /* Arguments of original command if arguments were rewritten. */
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
struct redisCommand *cmd, *lastcmd; /* Current command and last command executed. */
struct redisCommand *realcmd; /* The original command that was executed by the client,
Used to update error stats in case the c->cmd was modified
during the command invocation (like on GEOADD for example). */
user *user; /* User associated with this connection. If the
user is set to NULL the connection can do
anything (admin). */
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
list *deferred_reply_errors; /* Used for module thread safe contexts. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
long duration; /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */
int slot; /* The slot the client is executing against. Set to -1 if no slot is being used */
dictEntry *cur_script; /* Cached pointer to the dictEntry of the script being executed. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time; /* NOTE(review): presumably the time the output buffer soft limit was first reached -- confirm. */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
char *slave_addr; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
int slave_req; /* Slave requirements: SLAVE_REQ_* */
multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
blockingState bpop; /* blocking state */
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (PSUBSCRIBE) */
dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */
sds peerid; /* Cached peer ID. */
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
listNode *postponed_list_node; /* list node within the postponed list */
listNode *pending_read_list_node; /* list node in clients pending read list */
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
* changes. */
void *auth_callback_privdata; /* Private data that is passed when the auth
* changed callback is executed. Opaque for
* Redis Core. */
void *auth_module; /* The module that owns the callback, which is used
* to disconnect the client if the module is
* unloaded for cleanup. Opaque for Redis Core.*/
/* If this client is in tracking mode and this field is non zero,
* invalidation messages for keys fetched by this client will be sent to
* the specified client ID. */
uint64_t client_tracking_redirection;
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
/* In updateClientMemoryUsage() we track the memory usage of
* each client and add it to the sum of all the clients of a given type,
* however we need to remember what was the old contribution of each
* client, and in which category the client was, in order to remove it
* before adding the new value. */
size_t last_memory_usage; /* Old memory contribution of this client (see above). */
int last_memory_type; /* Category the old contribution was accounted in (see above). */
listNode *mem_usage_bucket_node; /* NOTE(review): presumably this client's node inside its memory usage bucket -- confirm. */
clientMemUsageBucket *mem_usage_bucket; /* NOTE(review): presumably the memory usage bucket this client belongs to -- confirm. */
listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
* see the definition of replBufBlock. */
size_t ref_block_pos; /* Access position of referenced buffer block,
* i.e. the next offset to send. */
/* Response buffer */
size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */
mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */
int bufpos; /* NOTE(review): presumably the number of bytes currently used in 'buf' -- confirm. */
size_t buf_usable_size; /* Usable size of buffer. */
char *buf; /* The response buffer itself. */
} client;
/* One logical database: the keyspace dictionary plus the side tables
 * (expires, blocking/ready/watched keys) and per-DB bookkeeping. */
typedef struct redisDb {
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
} redisDb;
/* Hash table: an array of dictEntry pointers, the dictType callback table
 * used for its keys and values, and size bookkeeping. */
typedef struct dict {
dictEntry **table; /* Array of pointers to entries. */
dictType *type; /* Per-type callbacks (hashing, dup, compare, destructors). */
unsigned long size; /* NOTE(review): presumably the number of slots in 'table' -- confirm. */
unsigned long sizemask; /* NOTE(review): presumably size-1, used to turn a hash into a slot index -- confirm. */
unsigned long used; /* NOTE(review): presumably the number of entries currently stored -- confirm. */
void *privdata; /* Opaque pointer; the dictType callbacks take a 'privdata' argument. */
} dict;
/* A single key/value pair. Entries link to one another through 'next',
 * forming a singly linked list. */
typedef struct dictEntry {
void *key; /* Pointer to the key. */
void *val; /* Pointer to the associated value. */
struct dictEntry *next; /* Next entry in the list, if any. */
} dictEntry;
/* Table of callbacks that customise how a dict treats its keys and values.
 * Every callback except hashFunction receives a 'privdata' pointer first. */
typedef struct dictType {
unsigned int (*hashFunction)(const void *key); /* Hash of 'key'. */
void *(*keyDup)(void *privdata, const void *key); /* Returns a pointer derived from 'key' (a duplicate, per the name). */
void *(*valDup)(void *privdata, const void *obj); /* Same as keyDup, for values. */
int (*keyCompare)(void *privdata, const void *key1, const void *key2); /* Compares two keys. */
void (*keyDestructor)(void *privdata, void *key); /* Releases a key. */
void (*valDestructor)(void *privdata, void *obj); /* Releases a value. */
} dictType;
/* Generic reference-counted value wrapper (aliased as 'robj'): a 4 bit type,
 * a 4 bit encoding, LRU/LFU bookkeeping bits, a reference count and a
 * pointer to the payload. */
typedef struct redisObject {
unsigned type:4; /* Object type; compared against OBJ_* constants by the code in this file. */
unsigned encoding:4; /* Object encoding; set to OBJ_ENCODING_* by the object constructors. */
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount; /* Reference count. */
void *ptr; /* Pointer to the payload; its meaning depends on type/encoding. */
} robj;
/* String handle used throughout this file; at the type level it is a plain
 * char pointer. Code here measures it with sdslen() and releases it with
 * sdsfree(). */
typedef char *sds;
/* Generic SET implementation.
 *
 * Stores 'val' at 'key' in the client's current DB, honouring the
 * OBJ_SET_NX, OBJ_SET_XX, OBJ_SET_GET and OBJ_KEEPTTL bits of 'flags'.
 * When 'expire' is non-NULL it is converted to milliseconds (according to
 * 'unit') and installed as the key expire.
 *
 * 'ok_reply' and 'abort_reply', when non-NULL, replace the default replies
 * (shared.ok and the protocol null). With OBJ_SET_GET neither is sent from
 * here: the only reply is the one produced by getGenericCommand(). */
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long expire_ms = 0; /* Zero-initialized only to silence compiler warnings. */
    const int want_get = (flags & OBJ_SET_GET) != 0;

    if (expire != NULL &&
        getExpireMillisecondsOrReply(c, expire, flags, unit, &expire_ms) != C_OK)
        return;

    if (want_get && getGenericCommand(c) == C_ERR) return;

    const int exists = (lookupKeyWrite(c->db, key) != NULL);
    const int nx_blocked = (flags & OBJ_SET_NX) && exists;
    const int xx_blocked = (flags & OBJ_SET_XX) && !exists;
    if (nx_blocked || xx_blocked) {
        /* Condition not met: nothing is written. */
        if (!want_get)
            addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
        return;
    }

    int setkey_flags = exists ? SETKEY_ALREADY_EXIST : SETKEY_DOESNT_EXIST;
    if (flags & OBJ_KEEPTTL) setkey_flags |= SETKEY_KEEPTTL;

    setKey(c, c->db, key, val, setkey_flags);
    server.dirty++;
    notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id);

    if (expire != NULL) {
        setExpire(c, c->db, key, expire_ms);
        /* Whatever expire option was used (EX/PX/EXAT/PXAT), propagate the
         * command as SET key value PXAT <millisecond-timestamp>. */
        robj *ms_obj = createStringObjectFromLongLong(expire_ms);
        rewriteClientCommandVector(c, 5, shared.set, key, val, shared.pxat, ms_obj);
        decrRefCount(ms_obj);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
    }

    if (!want_get)
        addReply(c, ok_reply ? ok_reply : shared.ok);

    /* Propagate without the GET argument. Not needed when an expire was
     * given, since in that case the whole argv was already rewritten. */
    if (want_get && expire == NULL) {
        robj **newargv = zmalloc((c->argc-1)*sizeof(robj*));
        int newargc = 0;

        for (int i = 0; i < c->argc; i++) {
            const char *arg = c->argv[i]->ptr;
            /* GET may be repeated multiple times: drop every occurrence
             * found from the fourth argument on. */
            int is_get = i >= 3 &&
                         (arg[0] == 'g' || arg[0] == 'G') &&
                         (arg[1] == 'e' || arg[1] == 'E') &&
                         (arg[2] == 't' || arg[2] == 'T') &&
                         arg[3] == '\0';
            if (is_get) continue;
            newargv[newargc++] = c->argv[i];
            incrRefCount(c->argv[i]);
        }
        replaceClientCommandVector(c, newargc, newargv);
    }
}
/* Replies with the string stored at argv[1].
 *
 * Returns C_ERR only when the key holds a value that is not a string;
 * a missing key returns C_OK (the lookup helper is handed the protocol
 * null as the reply to use in that case). */
int getGenericCommand(client *c) {
    robj *value = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]);

    if (value == NULL) return C_OK;
    if (checkType(c, value, OBJ_STRING)) return C_ERR;

    addReplyBulk(c, value);
    return C_OK;
}
/* GETDEL: reply with the value at argv[1] via getGenericCommand(), then
 * delete the key, asynchronously when server.lazyfree_lazy_user_del is
 * set and synchronously otherwise. */
void getdelCommand(client *c) {
    if (getGenericCommand(c) == C_ERR) return;

    const int lazy = server.lazyfree_lazy_user_del;
    int deleted;
    if (lazy)
        deleted = dbAsyncDelete(c->db, c->argv[1]);
    else
        deleted = dbSyncDelete(c->db, c->argv[1]);

    if (!deleted) return;

    /* Propagate as UNLINK or DEL, matching the kind of deletion performed. */
    robj *propagated = lazy ? shared.unlink : shared.del;
    rewriteClientCommandVector(c, 2, propagated, c->argv[1]);
    signalModifiedKey(c, c->db, c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);
    server.dirty++;
}
/* Returns a new OBJ_HASH object backed by a fresh listpack (lpNew(0)) and
 * marked with the listpack encoding. */
robj *createHashObject(void) {
    robj *hash = createObject(OBJ_HASH, lpNew(0));

    hash->encoding = OBJ_ENCODING_LISTPACK;
    return hash;
}
/* HSET / HMSET: set every field/value pair given after the key at argv[1].
 * HSET replies with the number of fields counted as newly created, HMSET
 * with OK; the two are told apart by the second letter of the command name. */
void hsetCommand(client *c) {
    /* Command name + key + pairs: an odd argc means a dangling field. */
    if ((c->argc % 2) == 1) {
        addReplyErrorArity(c);
        return;
    }

    robj *hash = hashTypeLookupWriteOrCreate(c, c->argv[1]);
    if (hash == NULL) return;

    hashTypeTryConversion(hash, c->argv, 2, c->argc-1);

    int created = 0;
    for (int f = 2; f < c->argc; f += 2) {
        int rv = hashTypeSet(hash, c->argv[f]->ptr, c->argv[f+1]->ptr,
                             HASH_SET_COPY);
        /* A zero return is counted as a created field. */
        if (!rv) created++;
    }

    /* HMSET (deprecated) and HSET return value is different. */
    const char *name = c->argv[0]->ptr;
    int is_hset = (name[1] == 's' || name[1] == 'S');
    if (is_hset)
        addReplyLongLong(c, created);
    else
        addReply(c, shared.ok);

    signalModifiedKey(c, c->db, c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_HASH, "hset", c->argv[1], c->db->id);
    server.dirty += (c->argc - 2)/2;
}
/* HGET: reply with the field argv[2] of the hash stored at argv[1]. */
void hgetCommand(client *c) {
    robj *hash = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]);

    if (hash == NULL) return;
    if (checkType(c, hash, OBJ_HASH)) return;

    addHashFieldToReply(c, hash, c->argv[2]->ptr);
}
/* HDEL: remove the fields argv[2..] from the hash at argv[1] and reply with
 * how many were removed. When the hash becomes empty the key is deleted
 * and the remaining arguments are not processed. */
void hdelCommand(client *c) {
    robj *key = c->argv[1];
    robj *hash = lookupKeyWriteOrReply(c, key, shared.czero);

    if (hash == NULL || checkType(c, hash, OBJ_HASH)) return;

    int removed = 0;
    int key_gone = 0;
    for (int i = 2; i < c->argc && !key_gone; i++) {
        if (!hashTypeDelete(hash, c->argv[i]->ptr)) continue;

        removed++;
        if (hashTypeLength(hash) == 0) {
            dbDelete(c->db, key);
            key_gone = 1;
        }
    }

    if (removed) {
        signalModifiedKey(c, c->db, key);
        notifyKeyspaceEvent(NOTIFY_HASH, "hdel", key, c->db->id);
        if (key_gone)
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
        server.dirty += removed;
    }
    addReplyLongLong(c, removed);
}
/* Returns a new OBJ_LIST object wrapping a freshly created quicklist and
 * marked with the quicklist encoding. */
robj *createQuicklistObject(void) {
    robj *list = createObject(OBJ_LIST, quicklistCreate());

    list->encoding = OBJ_ENCODING_QUICKLIST;
    return list;
}
/* Shared implementation of the list push commands.
 *
 * Pushes argv[2..] onto the list at argv[1] on the side selected by 'where'
 * and replies with the resulting list length. A missing list is created,
 * unless 'xx' is non-zero, in which case the reply is zero and nothing is
 * pushed. */
void pushGenericCommand(client *c, int where, int xx) {
    robj *key = c->argv[1];
    robj *list = lookupKeyWrite(c->db, key);

    if (checkType(c, list, OBJ_LIST)) return;

    if (list == NULL) {
        if (xx) {
            addReply(c, shared.czero);
            return;
        }
        list = createQuicklistObject();
        quicklistSetOptions(list->ptr, server.list_max_listpack_size,
                            server.list_compress_depth);
        dbAdd(c->db, key, list);
    }

    int i = 2;
    while (i < c->argc) {
        listTypePush(list, c->argv[i], where);
        server.dirty++;
        i++;
    }

    addReplyLongLong(c, listTypeLength(list));

    signalModifiedKey(c, c->db, key);
    notifyKeyspaceEvent(NOTIFY_LIST,
                        (where == LIST_HEAD) ? "lpush" : "rpush",
                        key, c->db->id);
}
/* Shared implementation of the list pop commands.
 *
 * Without a count argument a single element is popped from the 'where' side
 * and sent as a bulk string. With a count (argv[2]) up to that many elements
 * are sent as an array and then removed; a count of zero on an existing
 * list replies with an empty array. */
void popGenericCommand(client *c, int where) {
    if (c->argc > 3) {
        addReplyErrorArity(c);
        return;
    }

    const int hascount = (c->argc == 3);
    long count = 0;

    /* Parse the optional count argument. */
    if (hascount &&
        getPositiveLongFromObjectOrReply(c, c->argv[2], &count, NULL) != C_OK)
        return;

    robj *list = lookupKeyWriteOrReply(c, c->argv[1],
        hascount ? shared.nullarray[c->resp] : shared.null[c->resp]);
    if (list == NULL || checkType(c, list, OBJ_LIST)) return;

    if (count == 0) {
        if (hascount) {
            /* Fast exit path: explicit count of zero. */
            addReply(c, shared.emptyarray);
            return;
        }

        /* Original POP behaviour: one element, replied as a bulk string. */
        robj *popped = listTypePop(list, where);
        serverAssert(popped != NULL);
        addReplyBulk(c, popped);
        decrRefCount(popped);
        listElementsRemoved(c, c->argv[1], where, list, 1, NULL);
        return;
    }

    /* Pop a range of elements, replied as a multi-bulk. */
    const int from_head = (where == LIST_HEAD);
    long llen = listTypeLength(list);
    long rangelen = (count > llen) ? llen : count;
    long rangestart = from_head ? 0 : -rangelen;
    long rangeend = from_head ? rangelen - 1 : -1;
    int reverse = from_head ? 0 : 1;

    addListRangeReply(c, list, rangestart, rangeend, reverse);
    listTypeDelRange(list, rangestart, rangelen);
    listElementsRemoved(c, c->argv[1], where, list, rangelen, NULL);
}
/* Pushes 'value' onto the destination list on the 'where' side and replies
 * to the client with that value.
 *
 * 'dstobj' may be NULL, in which case a new list is created and added to
 * the client's DB under 'dstkey' first. */
void lmoveHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value,
                     int where) {
    if (dstobj == NULL) {
        /* Destination key does not exist yet: create the list. */
        dstobj = createQuicklistObject();
        quicklistSetOptions(dstobj->ptr, server.list_max_listpack_size,
                            server.list_compress_depth);
        dbAdd(c->db, dstkey, dstobj);
    }

    signalModifiedKey(c, c->db, dstkey);
    listTypePush(dstobj, value, where);

    char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
    notifyKeyspaceEvent(NOTIFY_LIST, event, dstkey, c->db->id);

    /* The pushed value is always what the client receives. */
    addReplyBulk(c, value);
}
/* Returns a new OBJ_SET object backed by a dict created with setDictType
 * and marked with the hash table encoding. */
robj *createSetObject(void) {
    robj *set = createObject(OBJ_SET, dictCreate(&setDictType));

    set->encoding = OBJ_ENCODING_HT;
    return set;
}
/* SADD: add the members argv[2..] to the set at argv[1], creating the set
 * when the key is missing, and reply with the number of members added. */
void saddCommand(client *c) {
    robj *key = c->argv[1];
    robj *set = lookupKeyWrite(c->db, key);

    if (checkType(c, set, OBJ_SET)) return;

    if (!set) {
        /* The first member is passed to setTypeCreate() when creating the set. */
        set = setTypeCreate(c->argv[2]->ptr);
        dbAdd(c->db, key, set);
    }

    int added = 0;
    for (int i = 2; i < c->argc; i++) {
        if (setTypeAdd(set, c->argv[i]->ptr)) added++;
    }

    if (added != 0) {
        signalModifiedKey(c, c->db, key);
        notifyKeyspaceEvent(NOTIFY_SET, "sadd", key, c->db->id);
    }
    server.dirty += added;
    addReplyLongLong(c, added);
}
/* SREM: remove the members argv[2..] from the set at argv[1] and reply with
 * how many were removed. When the set becomes empty the key is deleted and
 * the remaining arguments are not processed. */
void sremCommand(client *c) {
    robj *key = c->argv[1];
    robj *set = lookupKeyWriteOrReply(c, key, shared.czero);

    if (set == NULL || checkType(c, set, OBJ_SET)) return;

    int removed = 0;
    int key_gone = 0;
    for (int i = 2; i < c->argc && !key_gone; i++) {
        if (!setTypeRemove(set, c->argv[i]->ptr)) continue;

        removed++;
        if (setTypeSize(set) == 0) {
            dbDelete(c->db, key);
            key_gone = 1;
        }
    }

    if (removed) {
        signalModifiedKey(c, c->db, key);
        notifyKeyspaceEvent(NOTIFY_SET, "srem", key, c->db->id);
        if (key_gone)
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
        server.dirty += removed;
    }
    addReplyLongLong(c, removed);
}
/* SISMEMBER: reply with shared.cone when argv[2] is a member of the set at
 * argv[1], shared.czero otherwise. */
void sismemberCommand(client *c) {
    robj *set = lookupKeyReadOrReply(c, c->argv[1], shared.czero);

    if (set == NULL) return;
    if (checkType(c, set, OBJ_SET)) return;

    int member = setTypeIsMember(set, c->argv[2]->ptr);
    addReply(c, member ? shared.cone : shared.czero);
}
/* Returns a new OBJ_ZSET object in skiplist encoding: a zset made of a
 * dict (zsetDictType) and a skiplist from zslCreate(). */
robj *createZsetObject(void) {
    zset *z = zmalloc(sizeof(*z));

    z->dict = dictCreate(&zsetDictType);
    z->zsl = zslCreate();

    robj *obj = createObject(OBJ_ZSET, z);
    obj->encoding = OBJ_ENCODING_SKIPLIST;
    return obj;
}
/* Returns a new OBJ_ZSET object backed by a fresh listpack (lpNew(0)) and
 * marked with the listpack encoding. */
robj *createZsetListpackObject(void) {
    robj *obj = createObject(OBJ_ZSET, lpNew(0));

    obj->encoding = OBJ_ENCODING_LISTPACK;
    return obj;
}
/* Shared implementation of sorted set additions.
 *
 * Parses the NX/XX/CH/INCR/GT/LT options that follow the key (argv[1]),
 * validates the score-element pairs, then applies them to the sorted set
 * stored at the key, creating it when missing (unless XX was given).
 *
 * 'flags' carries ZADD_IN_* bits preset by the caller; the parsed options
 * are OR-ed into it.
 *
 * Reply: with INCR, the resulting score (or null when nothing was
 * processed); otherwise the number of added elements, or added+updated
 * when CH was given. */
void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *zobj;
sds ele;
double score = 0, *scores = NULL;
int j, elements, ch = 0;
int scoreidx = 0;
/* The following vars are used in order to track what the command actually
* did during the execution, to reply to the client and to trigger the
* notification of keyspace change. */
int added = 0; /* Number of new elements added. */
int updated = 0; /* Number of elements with updated score. */
int processed = 0; /* Number of elements processed, may remain zero with
options like XX. */
/* Parse options. At the end 'scoreidx' is set to the argument position
* of the score of the first score-element pair. */
scoreidx = 2;
while(scoreidx < c->argc) {
char *opt = c->argv[scoreidx]->ptr;
if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX;
else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX;
else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */
else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR;
else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT;
else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT;
else break; /* First non-option argument: start of the score-element pairs. */
scoreidx++;
}
/* Turn options into simple to check vars. */
int incr = (flags & ZADD_IN_INCR) != 0;
int nx = (flags & ZADD_IN_NX) != 0;
int xx = (flags & ZADD_IN_XX) != 0;
int gt = (flags & ZADD_IN_GT) != 0;
int lt = (flags & ZADD_IN_LT) != 0;
/* After the options, we expect to have an even number of args, since
* we expect any number of score-element pairs. At least one pair is
* required. */
elements = c->argc-scoreidx;
if (elements % 2 || !elements) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
elements /= 2; /* Now this holds the number of score-element pairs. */
/* Check for incompatible options. */
if (nx && xx) {
addReplyError(c,
"XX and NX options at the same time are not compatible");
return;
}
if ((gt && nx) || (lt && nx) || (gt && lt)) {
addReplyError(c,
"GT, LT, and/or NX options at the same time are not compatible");
return;
}
/* Note that XX is compatible with either GT or LT */
if (incr && elements > 1) {
addReplyError(c,
"INCR option supports a single increment-element pair");
return;
}
/* Start parsing all the scores, we need to emit any syntax error
* before executing additions to the sorted set, as the command should
* either execute fully or nothing at all. From this point on every exit
* goes through the 'cleanup' label so that 'scores' is released. */
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
!= C_OK) goto cleanup;
}
/* Lookup the key and create the sorted set if it does not exist. */
zobj = lookupKeyWrite(c->db,key);
if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
/* Pick the encoding of the new sorted set: skiplist when listpacks are
* disabled (max entries == 0) or the first element is longer than the
* configured max listpack value, listpack otherwise. */
if (server.zset_max_listpack_entries == 0 ||
server.zset_max_listpack_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetListpackObject();
}
dbAdd(c->db,key,zobj);
}
for (j = 0; j < elements; j++) {
double newscore;
score = scores[j];
int retflags = 0;
ele = c->argv[scoreidx+1+j*2]->ptr;
int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
if (retval == 0) {
/* A zero return from zsetAdd() is reported as a NaN score error. */
addReplyError(c,nanerr);
goto cleanup;
}
if (retflags & ZADD_OUT_ADDED) added++;
if (retflags & ZADD_OUT_UPDATED) updated++;
if (!(retflags & ZADD_OUT_NOP)) processed++;
score = newscore; /* Kept for the INCR reply below. */
}
server.dirty += (added+updated);
reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
if (processed)
addReplyDouble(c,score);
else
addReplyNull(c);
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}
/* Common exit: release the parsed scores and, if the sorted set was
* actually changed, signal the key and emit the keyspace event. */
cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}
/* NOTE(review): this is a second, line-for-line identical definition of
 * zaddGenericCommand; the same function is already defined earlier in this
 * file. A C translation unit cannot define a function twice, so one of the
 * two copies should be dropped if this file is ever compiled.
 *
 * Shared implementation of sorted set additions.
 *
 * Parses the NX/XX/CH/INCR/GT/LT options that follow the key (argv[1]),
 * validates the score-element pairs, then applies them to the sorted set
 * stored at the key, creating it when missing (unless XX was given).
 *
 * 'flags' carries ZADD_IN_* bits preset by the caller; the parsed options
 * are OR-ed into it.
 *
 * Reply: with INCR, the resulting score (or null when nothing was
 * processed); otherwise the number of added elements, or added+updated
 * when CH was given. */
void zaddGenericCommand(client *c, int flags) {
static char *nanerr = "resulting score is not a number (NaN)";
robj *key = c->argv[1];
robj *zobj;
sds ele;
double score = 0, *scores = NULL;
int j, elements, ch = 0;
int scoreidx = 0;
/* The following vars are used in order to track what the command actually
* did during the execution, to reply to the client and to trigger the
* notification of keyspace change. */
int added = 0; /* Number of new elements added. */
int updated = 0; /* Number of elements with updated score. */
int processed = 0; /* Number of elements processed, may remain zero with
options like XX. */
/* Parse options. At the end 'scoreidx' is set to the argument position
* of the score of the first score-element pair. */
scoreidx = 2;
while(scoreidx < c->argc) {
char *opt = c->argv[scoreidx]->ptr;
if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX;
else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX;
else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */
else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR;
else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT;
else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT;
else break; /* First non-option argument: start of the score-element pairs. */
scoreidx++;
}
/* Turn options into simple to check vars. */
int incr = (flags & ZADD_IN_INCR) != 0;
int nx = (flags & ZADD_IN_NX) != 0;
int xx = (flags & ZADD_IN_XX) != 0;
int gt = (flags & ZADD_IN_GT) != 0;
int lt = (flags & ZADD_IN_LT) != 0;
/* After the options, we expect to have an even number of args, since
* we expect any number of score-element pairs. At least one pair is
* required. */
elements = c->argc-scoreidx;
if (elements % 2 || !elements) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
elements /= 2; /* Now this holds the number of score-element pairs. */
/* Check for incompatible options. */
if (nx && xx) {
addReplyError(c,
"XX and NX options at the same time are not compatible");
return;
}
if ((gt && nx) || (lt && nx) || (gt && lt)) {
addReplyError(c,
"GT, LT, and/or NX options at the same time are not compatible");
return;
}
/* Note that XX is compatible with either GT or LT */
if (incr && elements > 1) {
addReplyError(c,
"INCR option supports a single increment-element pair");
return;
}
/* Start parsing all the scores, we need to emit any syntax error
* before executing additions to the sorted set, as the command should
* either execute fully or nothing at all. From this point on every exit
* goes through the 'cleanup' label so that 'scores' is released. */
scores = zmalloc(sizeof(double)*elements);
for (j = 0; j < elements; j++) {
if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
!= C_OK) goto cleanup;
}
/* Lookup the key and create the sorted set if it does not exist. */
zobj = lookupKeyWrite(c->db,key);
if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
/* Pick the encoding of the new sorted set: skiplist when listpacks are
* disabled (max entries == 0) or the first element is longer than the
* configured max listpack value, listpack otherwise. */
if (server.zset_max_listpack_entries == 0 ||
server.zset_max_listpack_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetListpackObject();
}
dbAdd(c->db,key,zobj);
}
for (j = 0; j < elements; j++) {
double newscore;
score = scores[j];
int retflags = 0;
ele = c->argv[scoreidx+1+j*2]->ptr;
int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
if (retval == 0) {
/* A zero return from zsetAdd() is reported as a NaN score error. */
addReplyError(c,nanerr);
goto cleanup;
}
if (retflags & ZADD_OUT_ADDED) added++;
if (retflags & ZADD_OUT_UPDATED) updated++;
if (!(retflags & ZADD_OUT_NOP)) processed++;
score = newscore; /* Kept for the INCR reply below. */
}
server.dirty += (added+updated);
reply_to_client:
if (incr) { /* ZINCRBY or INCR option. */
if (processed)
addReplyDouble(c,score);
else
addReplyNull(c);
} else { /* ZADD. */
addReplyLongLong(c,ch ? added+updated : added);
}
/* Common exit: release the parsed scores and, if the sorted set was
* actually changed, signal the key and emit the keyspace event. */
cleanup:
zfree(scores);
if (added || updated) {
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_ZSET,
incr ? "zincr" : "zadd", key, c->db->id);
}
}
/* A radix tree node: a 32 bit header of flags plus size, followed by a
 * variable-length data section whose layout is described below. */
typedef struct raxNode {
uint32_t iskey:1; /* Does this node contain a key? */
uint32_t isnull:1; /* Associated value is NULL (don't store it). */
uint32_t iscompr:1; /* Node is compressed. */
uint32_t size:29; /* Number of children, or compressed string len. */
/* Data layout is as follows:
*
* If the node is not compressed we have 'size' bytes, one for each child
* character, and 'size' raxNode pointers, each pointing to a child node.
* Note how the character is not stored in the children but in the
* edge of the parents:
*
* [header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)
*
* If the node is compressed (iscompr bit is 1) the node has 1 child.
* In that case the 'size' bytes of the string stored immediately at
* the start of the data section represent a sequence of successive
* nodes linked one after the other, for which only the last one in
* the sequence is actually represented as a node, and pointed to by
* the current compressed node.
*
* [header iscompr=1][xyz][z-ptr](value-ptr?)
*
* Both compressed and not compressed nodes can represent a key
* with associated data in the radix tree at any level (not just terminal
* nodes).
*
* If the node has an associated key (iskey=1) and is not NULL
* (isnull=0), then after the raxNode pointers pointing to the
* children, an additional value pointer is present (as you can see
* in the representation above as "value-ptr" field).
*/
unsigned char data[];
} raxNode;
/* Radix tree handle: the head node plus two counters. */
typedef struct rax {
raxNode *head; /* Head node of the tree. */
uint64_t numele; /* NOTE(review): presumably the number of elements stored -- confirm. */
uint64_t numnodes; /* NOTE(review): presumably the number of nodes in the tree -- confirm. */
} rax;
/* Stream entry ID: a millisecond time part and a sequence number part,
 * both 64 bit unsigned. */
typedef struct streamID {
uint64_t ms; /* Unix time in milliseconds. */
uint64_t seq; /* Sequence number. */
} streamID;
/* Stream value: the radix tree holding the entries, counters and notable
 * IDs, and the consumer groups (NULL until first needed, see streamNew()). */
typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Current number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
streamID first_id; /* The first non-tombstone entry, zero if empty. */
streamID max_deleted_entry_id; /* The maximal ID that was deleted. */
uint64_t entries_added; /* All time count of elements added. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
/* Returns a new OBJ_STREAM object wrapping an empty stream from
 * streamNew() and marked with the stream encoding. */
robj *createStreamObject(void) {
    robj *obj = createObject(OBJ_STREAM, streamNew());

    obj->encoding = OBJ_ENCODING_STREAM;
    return obj;
}
/* Create a new stream data structure. */
stream *streamNew(void) {
stream *s = zmalloc(sizeof(*s));
s->rax = raxNew();
s->length = 0;
s->first_id.ms = 0;
s->first_id.seq = 0;
s->last_id.ms = 0;
s->last_id.seq = 0;
s->max_deleted_entry_id.seq = 0;
s->max_deleted_entry_id.ms = 0;
s->entries_added = 0;
s->cgroups = NULL; /* Created on demand to save memory when not used. */
return s;
}
/* XADD: append one entry, made of the field/value pairs that follow the ID
 * argument, to the stream stored at argv[1], replying with the ID of the
 * new entry. Optionally trims the stream afterwards, and rewrites the
 * command arguments where needed so that what gets propagated is exact. */
void xaddCommand(client *c) {
    /* Parse options; on failure the parsing helper has already replied. */
    streamAddTrimArgs opts;
    const int idpos = streamParseAddOrTrimArgsOrReply(c, &opts, 1);
    if (idpos < 0) return;

    /* The ID is always one argument before the first field. */
    const int first_field = idpos + 1;
    const int nfieldargs = c->argc - first_field;

    /* Arity: at least one pair, and no dangling field. */
    if (nfieldargs < 2 || (nfieldargs % 2) == 1) {
        addReplyErrorArity(c);
        return;
    }

    /* Reject the minimal ID (0-0) right away, so we avoid possibly creating
     * a new stream only to have streamAppendItem fail, leaving an empty key
     * in the database. */
    int explicit_zero_id = opts.id_given && opts.seq_given &&
                           opts.id.ms == 0 && opts.id.seq == 0;
    if (explicit_zero_id) {
        addReplyError(c,"The ID specified in XADD must be greater than 0-0");
        return;
    }

    /* Lookup the stream at key. */
    robj *o = streamTypeLookupWriteOrCreate(c, c->argv[1], opts.no_mkstream);
    if (o == NULL) return;
    stream *s = o->ptr;

    /* Nothing can be appended once the last possible ID has been used. */
    if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
        addReplyError(c,"The stream has exhausted the last possible ID, "
                        "unable to add more items");
        return;
    }

    /* Append using the low level function and return the ID. */
    streamID newid;
    streamID *requested_id = opts.id_given ? &opts.id : NULL;
    int rc = streamAppendItem(s, c->argv+first_field, nfieldargs/2,
                              &newid, requested_id, opts.seq_given);
    if (rc == C_ERR) {
        if (errno == EDOM) {
            addReplyError(c,"The ID specified in XADD is equal or smaller than "
                            "the target stream top item");
        } else {
            addReplyError(c,"Elements are too large to be stored");
        }
        return;
    }

    sds idstr = createStreamIDString(&newid);
    addReplyBulkCBuffer(c, idstr, sdslen(idstr));

    signalModifiedKey(c, c->db, c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_STREAM, "xadd", c->argv[1], c->db->id);
    server.dirty++;

    /* Trim if needed. */
    if (opts.trim_strategy != TRIM_STRATEGY_NONE) {
        if (streamTrim(s, &opts))
            notifyKeyspaceEvent(NOTIFY_STREAM, "xtrim", c->argv[1], c->db->id);

        if (opts.approx_trim) {
            /* In case our trimming was limited (by LIMIT or by ~) we must
             * re-write the relevant trim argument to make sure there will be
             * no inconsistencies in AOF loading or in the replica.
             * Checking only the approx flag is enough because there is no
             * way LIMIT is given without the ~ option. */
            streamRewriteApproxSpecifier(c, opts.trim_strategy_arg_idx-1);
            streamRewriteTrimArgument(c, s, opts.trim_strategy,
                                      opts.trim_strategy_arg_idx);
        }
    }

    /* When the ID was fully specified by the caller the string is no longer
     * needed. Otherwise rewrite the ID argument with the one actually
     * generated, for AOF/replication propagation; the string is handed to
     * the new object. */
    if (opts.id_given && opts.seq_given) {
        sdsfree(idstr);
    } else {
        robj *idarg = createObject(OBJ_STRING, idstr);
        rewriteClientCommandArgument(c, idpos, idarg);
        decrRefCount(idarg);
    }

    /* We need to signal to blocked clients that there is new data on this
     * stream. */
    signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM);
}
void xreadCommand(client *c) {
long long timeout = -1; /* -1 means, no BLOCK argument given. */
long long count = 0;
int streams_count = 0;
int streams_arg = 0;
int noack = 0; /* True if NOACK option was specified. */
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids;
streamCG **groups = NULL;
int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */
robj *groupname = NULL;
robj *consumername = NULL;
/* Parse arguments. */
for (int i = 1; i < c->argc; i++) {
int moreargs = c->argc-i-1;
char *o = c->argv[i]->ptr;
if (!strcasecmp(o,"BLOCK") && moreargs) {
if (c->flags & CLIENT_SCRIPT) {
/*
* Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client
* on Lua/MULTI/RM_Call we want special treatment for Lua to keep backward compatibility.
* There is no sense to use BLOCK option within Lua. */
addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr);
return;
}
i++;
if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,
UNIT_MILLISECONDS) != C_OK) return;
} else if (!strcasecmp(o,"COUNT") && moreargs) {
i++;
if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
return;
if (count < 0) count = 0;
} else if (!strcasecmp(o,"STREAMS") && moreargs) {
streams_arg = i+1;
streams_count = (c->argc-streams_arg);
if ((streams_count % 2) != 0) {
addReplyError(c,"Unbalanced XREAD list of streams: "
"for each stream key an ID or '$' must be "
"specified.");
return;
}
streams_count /= 2; /* We have two arguments for each stream. */
break;
} else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {
if (!xreadgroup) {
addReplyError(c,"The GROUP option is only supported by "
"XREADGROUP. You called XREAD instead.");
return;
}
groupname = c->argv[i+1];
consumername = c->argv[i+2];
i += 2;
} else if (!strcasecmp(o,"NOACK")) {
if (!xreadgroup) {
addReplyError(c,"The NOACK option is only supported by "
"XREADGROUP. You called XREAD instead.");
return;
}
noack = 1;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
/* STREAMS option is mandatory. */
if (streams_arg == 0) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
/* If the user specified XREADGROUP then it must also
* provide the GROUP option. */
if (xreadgroup && groupname == NULL) {
addReplyError(c,"Missing GROUP option for XREADGROUP");
return;
}
/* Parse the IDs and resolve the group name. */
if (streams_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*streams_count);
if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);
for (int i = streams_arg + streams_count; i < c->argc; i++) {
/* Specifying "$" as last-known-id means that the client wants to be
* served with just the messages that will arrive into the stream
* starting from now. */
int id_idx = i - streams_arg - streams_count;
robj *key = c->argv[i-streams_count];
robj *o = lookupKeyRead(c->db,key);
if (checkType(c,o,OBJ_STREAM)) goto cleanup;
streamCG *group = NULL;
/* If a group was specified, than we need to be sure that the
* key and group actually exist. */
if (groupname) {
if (o == NULL ||
(group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
{
addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "
"group '%s' in XREADGROUP with GROUP "
"option",
(char*)key->ptr,(char*)groupname->ptr);
goto cleanup;
}
groups[id_idx] = group;
}
if (strcmp(c->argv[i]->ptr,"$") == 0) {
if (xreadgroup) {
addReplyError(c,"The $ ID is meaningless in the context of "
"XREADGROUP: you want to read the history of "
"this consumer by specifying a proper ID, or "
"use the > ID to get new messages. The $ ID would "
"just return an empty result set.");
goto cleanup;
}
if (o) {
stream *s = o->ptr;
ids[id_idx] = s->last_id;
} else {
ids[id_idx].ms = 0;
ids[id_idx].seq = 0;
}
continue;
} else if (strcmp(c->argv[i]->ptr,">") == 0) {
if (!xreadgroup) {
addReplyError(c,"The > ID can be specified only when calling "
"XREADGROUP using the GROUP <group> "
"<consumer> option.");
goto cleanup;
}
/* We use just the maximum ID to signal this is a ">" ID, anyway
* the code handling the blocking clients will have to update the
* ID later in order to match the changing consumer group last ID. */
ids[id_idx].ms = UINT64_MAX;
ids[id_idx].seq = UINT64_MAX;
continue;
}
if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK)
goto cleanup;
}
/* Try to serve the client synchronously. */
size_t arraylen = 0;
void *arraylen_ptr = NULL;
for (int i = 0; i < streams_count; i++) {
robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);
if (o == NULL) continue;
stream *s = o->ptr;
streamID *gt = ids+i; /* ID must be greater than this. */
int serve_synchronously = 0;
int serve_history = 0; /* True for XREADGROUP with ID != ">". */
/* Check if there are the conditions to serve the client
* synchronously. */
if (groups) {
/* If the consumer is blocked on a group, we always serve it
* synchronously (serving its local history) if the ID specified
* was not the special ">" ID. */
if (gt->ms != UINT64_MAX ||
gt->seq != UINT64_MAX)
{
serve_synchronously = 1;
serve_history = 1;
} else if (s->length) {
/* We also want to serve a consumer in a consumer group
* synchronously in case the group top item delivered is smaller
* than what the stream has inside. */
streamID maxid, *last = &groups[i]->last_id;
streamLastValidID(s, &maxid);
if (streamCompareID(&maxid, last) > 0) {
serve_synchronously = 1;
*gt = *last;
}
}
} else if (s->length) {
/* For consumers without a group, we serve synchronously if we can
* actually provide at least one item from the stream. */
streamID maxid;
streamLastValidID(s, &maxid);
if (streamCompareID(&maxid, gt) > 0) {
serve_synchronously = 1;
}
}
if (serve_synchronously) {
arraylen++;
if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);
/* streamReplyWithRange() handles the 'start' ID as inclusive,
* so start from the next ID, since we want only messages with
* IDs greater than start. */
streamID start = *gt;
streamIncrID(&start);
/* Emit the two elements sub-array consisting of the name
* of the stream and the data we extracted from it. */
if (c->resp == 2) addReplyArrayLen(c,2);
addReplyBulk(c,c->argv[streams_arg+i]);
streamConsumer *consumer = NULL;
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
if (groups) {
consumer = streamLookupConsumer(groups[i],consumername->ptr,SLC_DEFAULT);
if (consumer == NULL) {
consumer = streamCreateConsumer(groups[i],consumername->ptr,
c->argv[streams_arg+i],
c->db->id,SCC_DEFAULT);
if (noack)
streamPropagateConsumerCreation(c,spi.keyname,
spi.groupname,
consumer->name);
}
}
int flags = 0;
if (noack) flags |= STREAM_RWR_NOACK;
if (serve_history) flags |= STREAM_RWR_HISTORY;
streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL,
consumer, flags, &spi);
if (groups) server.dirty++;
}
}
/* We replied synchronously! Set the top array len and return to caller. */
if (arraylen) {
if (c->resp == 2)
setDeferredArrayLen(c,arraylen_ptr,arraylen);
else
setDeferredMapLen(c,arraylen_ptr,arraylen);
goto cleanup;
}
/* Block if needed. */
if (timeout != -1) {
/* If we are not allowed to block the client, the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_DENY_BLOCKING) {
addReplyNullArray(c);
goto cleanup;
}
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
-1, timeout, NULL, NULL, ids);
/* If no COUNT is given and we block, set a relatively small count:
* in case the ID provided is too low, we do not want the server to
* block just to serve this client a huge stream of messages. */
c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;
/* If this is a XREADGROUP + GROUP we need to remember for which
* group and consumer name we are blocking, so later when one of the
* keys receive more data, we can call streamReplyWithRange() passing
* the right arguments. */
if (groupname) {
incrRefCount(groupname);
incrRefCount(consumername);
c->bpop.xread_group = groupname;
c->bpop.xread_consumer = consumername;
c->bpop.xread_group_noack = noack;
} else {
c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
}
goto cleanup;
}
/* No BLOCK option, nor any stream we can serve. Reply as with a
* timeout happened. */
addReplyNullArray(c);
/* Continue to cleanup... */
cleanup: /* Cleanup. */
/* The command is propagated (in the READGROUP form) as a side effect
* of calling lower level APIs. So stop any implicit propagation. */
preventCommandPropagation(c);
if (ids != static_ids) zfree(ids);
zfree(groups);
}
/* XACK command implementation.
 *
 * argv[1] is the stream key, argv[2] the consumer group name and every
 * following argument is a stream ID to acknowledge. The reply is the number
 * of IDs that were found in the group pending entries list and removed.
 * All the IDs are parsed before anything is modified, so a malformed ID
 * produces an error reply without acknowledging any message. */
void xackCommand(client *c) {
    robj *keyobj = lookupKeyRead(c->db,c->argv[1]);
    streamCG *cg = NULL;
    int acked = 0;

    if (keyobj != NULL) {
        /* Wrong type: nothing else to do. */
        if (checkType(c,keyobj,OBJ_STREAM)) return;
        cg = streamLookupCG(keyobj->ptr,c->argv[2]->ptr);
    }

    /* Missing key or missing group means there is nothing to acknowledge. */
    if (keyobj == NULL || cg == NULL) {
        addReply(c,shared.czero);
        return;
    }

    /* Use the on-stack vector when it is large enough, otherwise fall back
     * to a heap allocation released at the 'cleanup' label. */
    streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
    streamID *ids = static_ids;
    int id_count = c->argc-3;
    if (id_count > STREAMID_STATIC_VECTOR_LEN)
        ids = zmalloc(sizeof(streamID)*id_count);

    /* First pass: parse every ID so that a syntax error aborts the command
     * before any entry is touched ("all or nothing"). */
    for (int k = 0; k < id_count; k++) {
        if (streamParseStrictIDOrReply(c,c->argv[k+3],&ids[k],0,NULL) != C_OK)
            goto cleanup;
    }

    /* Second pass: drop each ID from the group PEL and from the PEL of the
     * consumer referenced by the NACK, then release the NACK itself. */
    for (int k = 0; k < id_count; k++) {
        unsigned char buf[sizeof(streamID)];
        streamEncodeID(buf,&ids[k]);

        streamNACK *nack = raxFind(cg->pel,buf,sizeof(buf));
        if (nack == raxNotFound) continue;

        raxRemove(cg->pel,buf,sizeof(buf),NULL);
        raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
        streamFreeNACK(nack);
        acked++;
        server.dirty++;
    }
    addReplyLongLong(c,acked);

cleanup:
    if (ids != static_ids) zfree(ids);
}
/* Allocate an empty quicklist: no nodes, zero counters, compression
 * disabled (compress = 0) and fill set to -2.
 * The returned object must be released with quicklistRelease(). */
quicklist *quicklistCreate(void) {
    struct quicklist *ql = zmalloc(sizeof(*ql));

    ql->head = NULL;
    ql->tail = NULL;
    ql->count = 0;
    ql->len = 0;
    ql->bookmark_count = 0;
    ql->fill = -2;
    ql->compress = 0;
    return ql;
}
/* Value of a single listpack element, which is either a string or an
 * integer. The two representations are told apart by 'sval'. */
typedef struct {
/* String case: 'sval' points to the bytes and 'slen' is their length. */
unsigned char *sval;
uint32_t slen;
/* Integer case: 'sval' is NULL and 'lval' holds the value. */
long long lval;
} listpackEntry;
/* Insert, replace or delete an element of the listpack 'lp' at position 'p'.
 *
 * The new element is given either as the string 'elestr' of 'size' bytes, or
 * as the already encoded integer 'eleint' of 'size' bytes. When both are
 * NULL the element at 'p' is deleted.
 *
 * 'where' is LP_BEFORE, LP_AFTER or LP_REPLACE. If 'newp' is not NULL it
 * receives the address of the stored element; on deletion it receives the
 * address of the element that followed the deleted one, or NULL when that
 * is the EOF marker.
 *
 * Returns the listpack pointer (which may differ from 'lp' after a
 * reallocation), or NULL when the resulting size would exceed UINT32_MAX or
 * when lp_realloc() fails. */
unsigned char *lpInsert(unsigned char *lp, unsigned char *elestr, unsigned char *eleint,
uint32_t size, unsigned char *p, int where, unsigned char **newp)
{
unsigned char intenc[LP_MAX_INT_ENCODING_LEN];
unsigned char backlen[LP_MAX_BACKLEN_SIZE];
uint64_t enclen; /* The length of the encoded element. */
int delete = (elestr == NULL && eleint == NULL);
/* A deletion is conceptually the replacement of the element with a
 * zero-length element. So whatever we get passed as 'where', set
 * it to LP_REPLACE. */
if (delete) where = LP_REPLACE;
/* If we need to insert after the current element, we just jump to the
 * next element (that could be the EOF one) and handle the case of
 * inserting before. So the function will actually deal with just two
 * cases: LP_BEFORE and LP_REPLACE. */
if (where == LP_AFTER) {
p = lpSkip(p);
where = LP_BEFORE;
ASSERT_INTEGRITY(lp, p);
}
/* Store the offset of the element 'p', so that we can obtain its
 * address again after a reallocation. */
unsigned long poff = p-lp;
int enctype;
if (elestr) {
/* Calling lpEncodeGetType() results into the encoded version of the
 * element to be stored into 'intenc' in case it is representable as
 * an integer: in that case, the function returns LP_ENCODING_INT.
 * Otherwise if LP_ENCODING_STR is returned, we'll have to call
 * lpEncodeString() to actually write the encoded string in place later.
 *
 * Whatever the returned encoding is, 'enclen' is populated with the
 * length of the encoded element. */
enctype = lpEncodeGetType(elestr,size,intenc,&enclen);
if (enctype == LP_ENCODING_INT) eleint = intenc;
} else if (eleint) {
enctype = LP_ENCODING_INT;
enclen = size; /* 'size' is the length of the encoded integer element. */
} else {
/* Deletion: there is no element to encode. */
enctype = -1;
enclen = 0;
}
/* We need to also encode the backward-parsable length of the element
 * and append it to the end: this allows to traverse the listpack from
 * the end to the start. */
unsigned long backlen_size = (!delete) ? lpEncodeBacklen(backlen,enclen) : 0;
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
/* For LP_REPLACE, 'replaced_len' is the full size (encoded entry plus its
 * backlen) of the element currently stored at 'p'. */
uint32_t replaced_len = 0;
if (where == LP_REPLACE) {
replaced_len = lpCurrentEncodedSizeUnsafe(p);
replaced_len += lpEncodeBacklen(NULL,replaced_len);
ASSERT_INTEGRITY_LEN(lp, p, replaced_len);
}
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
- replaced_len;
/* Refuse to grow the listpack past UINT32_MAX total bytes. */
if (new_listpack_bytes > UINT32_MAX) return NULL;
/* We now need to reallocate in order to make space or shrink the
 * allocation (in case the 'where' value is LP_REPLACE and the new element
 * is smaller). However we do that before memmoving the memory to
 * make room for the new element if the final allocation will get
 * larger, or we do it after if the final allocation will get smaller. */
unsigned char *dst = lp + poff; /* May be updated after reallocation. */
/* Realloc before: we need more room. */
if (new_listpack_bytes > old_listpack_bytes &&
new_listpack_bytes > lp_malloc_size(lp)) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
/* Setup the listpack relocating the elements to make the exact room
 * we need to store the new one. */
if (where == LP_BEFORE) {
/* Shift everything from 'p' up to the end of the listpack forward. */
memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
} else { /* LP_REPLACE. */
/* Move the bytes following the replaced element by the size difference
 * between the new and the old element. */
long lendiff = (enclen+backlen_size)-replaced_len;
memmove(dst+replaced_len+lendiff,
dst+replaced_len,
old_listpack_bytes-poff-replaced_len);
}
/* Realloc after: we need to free space. */
if (new_listpack_bytes < old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}
/* Store the entry. */
if (newp) {
*newp = dst;
/* In case of deletion, set 'newp' to NULL if the next element is
 * the EOF element. */
if (delete && dst[0] == LP_EOF) *newp = NULL;
}
if (!delete) {
if (enctype == LP_ENCODING_INT) {
memcpy(dst,eleint,enclen);
} else {
lpEncodeString(dst,elestr,size);
}
dst += enclen;
memcpy(dst,backlen,backlen_size);
dst += backlen_size;
}
/* Update header. The element count only changes on insertion or deletion,
 * and is left alone when it is flagged as unknown. */
if (where != LP_REPLACE || delete) {
uint32_t num_elements = lpGetNumElements(lp);
if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
if (!delete)
lpSetNumElements(lp,num_elements+1);
else
lpSetNumElements(lp,num_elements-1);
}
}
lpSetTotalBytes(lp,new_listpack_bytes);
#if 0
/* This code path is normally disabled: what it does is to force listpack
 * to return *always* a new pointer after performing some modification to
 * the listpack, even if the previous allocation was enough. This is useful
 * in order to spot bugs in code using listpacks: by doing so we can find
 * if the caller forgets to set the new pointer where the listpack reference
 * is stored, after an update. */
unsigned char *oldlp = lp;
lp = lp_malloc(new_listpack_bytes);
memcpy(lp,oldlp,new_listpack_bytes);
if (newp) {
unsigned long offset = (*newp)-oldlp;
*newp = lp + offset;
}
/* Make sure the old allocation contains garbage. */
memset(oldlp,'A',new_listpack_bytes);
lp_free(oldlp);
#endif
return lp;
}
/* Create an empty listpack.
 *
 * 'capacity' is the number of bytes to allocate up front; anything not
 * larger than the bare minimum (header plus the EOF byte) is rounded up to
 * that minimum. Returns NULL if the allocation fails. */
unsigned char *lpNew(size_t capacity) {
    size_t alloc_bytes = capacity;
    if (alloc_bytes <= LP_HDR_SIZE+1) alloc_bytes = LP_HDR_SIZE+1;

    unsigned char *lp = lp_malloc(alloc_bytes);
    if (lp == NULL) return NULL;

    /* An empty listpack is just the header followed by the EOF marker. */
    lp[LP_HDR_SIZE] = LP_EOF;
    lpSetNumElements(lp,0);
    lpSetTotalBytes(lp,LP_HDR_SIZE+1);
    return lp;
}
/* Skip list descriptor. */
typedef struct zskiplist {
/* 'header' is the node allocated by zslCreate() with ZSKIPLIST_MAXLEVEL
 * levels; 'tail' starts as NULL (presumably the last node once the list
 * is populated -- confirm against the insertion code). */
struct zskiplistNode *header, *tail;
/* Initialized to 0 by zslCreate(); presumably the number of elements. */
unsigned long length;
/* Initialized to 1 by zslCreate(); presumably the highest level in use. */
int level;
} zskiplist;
/* Sorted set object: a dict paired with a skip list.
 * NOTE(review): presumably both structures index the same elements (dict for
 * lookups by member, skip list for ordered access) -- confirm with callers. */
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
/* Create an empty skip list: level 1, length 0, no tail, and a header node
 * with ZSKIPLIST_MAXLEVEL levels whose forward pointers are all NULL and
 * whose spans are all zero. */
zskiplist *zslCreate(void) {
    zskiplist *sl = zmalloc(sizeof(*sl));

    sl->length = 0;
    sl->level = 1;
    sl->tail = NULL;

    sl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    sl->header->backward = NULL;
    for (int lvl = 0; lvl < ZSKIPLIST_MAXLEVEL; lvl++) {
        sl->header->level[lvl].span = 0;
        sl->header->level[lvl].forward = NULL;
    }
    return sl;
}
/* Event loop handler invoked when the module pipe 'fd' becomes readable.
 * It drains the pipe and then runs the pending one-shot events. The 'el',
 * 'privdata' and 'mask' arguments are not used. */
void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char drain[128];

    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* Keep reading for as long as each read() fills the whole buffer; a
     * short read, EOF or an error ends the loop. */
    for (;;) {
        if (read(fd, drain, sizeof(drain)) != sizeof(drain)) break;
    }

    /* Handle event loop events if pipe was written from event loop API */
    eventLoopHandleOneShotEvents();
}
/* Run and release every queued one-shot event.
 *
 * The queue is protected by moduleEventLoopMutex. The mutex is dropped
 * around each callback because the callback may queue another one-shot
 * event, which needs to take the same mutex. */
static void eventLoopHandleOneShotEvents() {
    pthread_mutex_lock(&moduleEventLoopMutex);

    /* No queue was ever created: nothing to run. */
    if (moduleEventLoopOneShots == NULL) {
        pthread_mutex_unlock(&moduleEventLoopMutex);
        return;
    }

    while (listLength(moduleEventLoopOneShots) != 0) {
        /* Detach the first entry while still holding the lock. */
        listNode *head = listFirst(moduleEventLoopOneShots);
        EventLoopOneShot *job = head->value;
        listDelNode(moduleEventLoopOneShots, head);

        pthread_mutex_unlock(&moduleEventLoopMutex);
        job->func(job->user_data);
        zfree(job);
        /* Re-acquire before looking at the queue again. */
        pthread_mutex_lock(&moduleEventLoopMutex);
    }

    pthread_mutex_unlock(&moduleEventLoopMutex);
}
/* Publish a keyspace notification for 'event' on 'key' in database 'dbid'.
 *
 * 'type' is the notification class of the event. Modules are always
 * notified first, regardless of the server configuration. After that,
 * depending on server.notify_keyspace_events, up to two Pub/Sub messages
 * are published:
 *
 *   __keyspace@<db>__:<key>    with the event name as payload
 *   __keyevent@<db>__:<event>  with the key name as payload */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {
    char dbstr[24];
    int dbstr_len = -1; /* -1 until 'dbid' has been rendered into 'dbstr'. */
    robj *event_obj, *chan_obj;
    sds chan_name;

    /* The module engine filters by event type on its own, so this call
     * bypasses the notifications configuration. */
    moduleNotifyKeyspaceEvent(type, event, key, dbid);

    /* This class of events is disabled: nothing to publish. */
    if ((server.notify_keyspace_events & type) == 0) return;

    event_obj = createStringObject(event,strlen(event));

    /* __keyspace@<db>__:<key> <event> notifications. */
    if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {
        chan_name = sdsnewlen("__keyspace@",11);
        dbstr_len = ll2string(dbstr,sizeof(dbstr),dbid);
        chan_name = sdscatlen(chan_name, dbstr, dbstr_len);
        chan_name = sdscatlen(chan_name, "__:", 3);
        chan_name = sdscatsds(chan_name, key->ptr);
        chan_obj = createObject(OBJ_STRING, chan_name);
        pubsubPublishMessage(chan_obj, event_obj, 0);
        decrRefCount(chan_obj);
    }

    /* __keyevent@<db>__:<event> <key> notifications. */
    if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {
        chan_name = sdsnewlen("__keyevent@",11);
        /* Reuse the database number rendered above when available. */
        if (dbstr_len == -1)
            dbstr_len = ll2string(dbstr,sizeof(dbstr),dbid);
        chan_name = sdscatlen(chan_name, dbstr, dbstr_len);
        chan_name = sdscatlen(chan_name, "__:", 3);
        chan_name = sdscatsds(chan_name, event_obj->ptr);
        chan_obj = createObject(OBJ_STRING, chan_name);
        pubsubPublishMessage(chan_obj, key, 0);
        decrRefCount(chan_obj);
    }

    decrRefCount(event_obj);
}
/* Deliver a keyspace event to the module subscribers registered for it.
 *
 * Each matching subscriber is called with a temporary-client context that
 * has 'dbid' selected. A subscriber whose callback is already running is
 * skipped, so a callback that triggers its own event is not re-entered. */
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {
    listIter iter;
    listNode *node;

    /* Don't do anything if there aren't any subscribers */
    if (listLength(moduleKeyspaceSubscribers) == 0) return;

    listRewind(moduleKeyspaceSubscribers,&iter);

    /* Remove irrelevant flags from the type mask */
    type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE);

    while ((node = listNext(&iter))) {
        RedisModuleKeyspaceSubscriber *sub = node->value;

        /* Not registered for this kind of event. */
        if (!(sub->event_mask & type)) continue;
        /* Already inside this subscriber's callback. */
        if (sub->active != 0) continue;

        RedisModuleCtx ctx;
        moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_TEMP_CLIENT);
        selectDb(ctx.client, dbid);

        /* Flag the subscriber for the duration of the callback so that
         * events it generates itself are not delivered back to it. */
        sub->active = 1;
        sub->notify_callback(&ctx, type, event, key);
        sub->active = 0;

        moduleFreeContext(&ctx);
    }
}
/* Node, List, and Iterator are the only data structures used currently. */

/* A list node: links to the previous and next node plus an opaque value. */
typedef struct listNode {
struct listNode *prev;
struct listNode *next;
void *value;
} listNode;
/* List iterator state.
 * NOTE(review): 'next' is presumably the node the following listNext() call
 * will return, and 'direction' the traversal direction -- confirm against
 * the iterator implementation. */
typedef struct listIter {
listNode *next;
int direction;
} listIter;
/* List descriptor: head and tail nodes, three optional callbacks and the
 * length. listCreate() leaves all three callbacks set to NULL. */
typedef struct list {
listNode *head;
listNode *tail;
/* NOTE(review): judging by the signatures, 'dup' copies a value, 'free'
 * releases one and 'match' compares a value against a key -- confirm with
 * the functions that invoke them. */
void *(*dup)(void *ptr);
void (*free)(void *ptr);
int (*match)(void *ptr, void *key);
unsigned long len;
} list;
/* Create an empty list: no nodes, length zero and no dup/free/match
 * callbacks installed. Returns NULL if the allocation fails. */
list *listCreate(void)
{
    struct list *l = zmalloc(sizeof(*l));

    if (l == NULL) return NULL;

    l->len = 0;
    l->head = NULL;
    l->tail = NULL;
    l->match = NULL;
    l->free = NULL;
    l->dup = NULL;
    return l;
}
/*
 * Deliver 'message' to every client subscribed to 'channel' and, unless
 * 'type' describes shard Pub/Sub, to every client whose subscribed pattern
 * matches the channel name. Returns the number of deliveries performed.
 */
int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
    int delivered = 0;
    dictEntry *entry;
    dictIterator *pat_iter;
    listNode *node;
    listIter iter;

    /* Send to clients listening for that channel */
    entry = dictFind(*type.serverPubSubChannels, channel);
    if (entry != NULL) {
        list *subscribers = dictGetVal(entry);

        listRewind(subscribers,&iter);
        while ((node = listNext(&iter)) != NULL) {
            client *c = node->value;

            addReplyPubsubMessage(c,channel,message,*type.messageBulk);
            updateClientMemUsageAndBucket(c);
            delivered++;
        }
    }

    /* Shard pubsub ignores patterns. */
    if (type.shard) return delivered;

    /* Send to clients listening to matching channels */
    pat_iter = dictGetIterator(server.pubsub_patterns);
    if (pat_iter == NULL) return delivered;

    /* Work on a decoded copy of the channel name for the pattern match; the
     * reference taken here is dropped once the scan is over. */
    channel = getDecodedObject(channel);
    while ((entry = dictNext(pat_iter)) != NULL) {
        robj *pattern = dictGetKey(entry);
        list *clients = dictGetVal(entry);
        int matched = stringmatchlen((char*)pattern->ptr,
                                     sdslen(pattern->ptr),
                                     (char*)channel->ptr,
                                     sdslen(channel->ptr),0);

        if (!matched) continue;

        listRewind(clients,&iter);
        while ((node = listNext(&iter)) != NULL) {
            client *c = listNodeValue(node);

            addReplyPubsubPatMessage(c,pattern,channel,message);
            updateClientMemUsageAndBucket(c);
            delivered++;
        }
    }
    decrRefCount(channel);
    dictReleaseIterator(pat_iter);
    return delivered;
}
/* Subscribe client 'c' to 'channel' for the Pub/Sub flavour described by
 * 'type'. Returns 1 if the subscription was added, 0 if the client was
 * already subscribed. The client is sent the subscription reply in both
 * cases. */
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
    /* Add the channel to the client -> channels hash table */
    int newly_subscribed =
        (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK);

    if (newly_subscribed) {
        dictEntry *entry;
        list *subscribers = NULL;

        /* The client's table now references the channel object. */
        incrRefCount(channel);

        /* Add the client to the channel -> list of clients hash table */
        entry = dictFind(*type.serverPubSubChannels, channel);
        if (entry != NULL) {
            subscribers = dictGetVal(entry);
        } else {
            /* First subscriber: create the list and register it. The
             * server's table takes its own reference to the channel. */
            subscribers = listCreate();
            dictAdd(*type.serverPubSubChannels, channel, subscribers);
            incrRefCount(channel);
        }
        listAddNodeTail(subscribers,c);
    }

    /* Notify the client */
    addReplyPubsubSubscribed(c,channel,type);
    return newly_subscribed;
}
/* SYNC and PSYNC command implementation.
 *
 * The request is first validated (PSYNC FAILOVER handshake, failover state,
 * link with our own master, pending output, filtered RDB capability). For
 * PSYNC a partial resynchronization is attempted. When that is not possible
 * the client is registered as a replica waiting for a BGSAVE and, depending
 * on the child process currently running, it is attached to the BGSAVE in
 * progress, left waiting for the next one, or a new BGSAVE is started. */
void syncCommand(client *c) {
/* ignore SYNC if already slave or in monitor mode */
if (c->flags & CLIENT_SLAVE) return;
/* Check if this is a failover request to a replica with the same replid and
 * become a master if so. */
if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") &&
!strcasecmp(c->argv[3]->ptr,"failover"))
{
serverLog(LL_WARNING, "Failover request received for replid %s.",
(unsigned char *)c->argv[1]->ptr);
if (!server.masterhost) {
addReplyError(c, "PSYNC FAILOVER can't be sent to a master.");
return;
}
if (!strcasecmp(c->argv[1]->ptr,server.replid)) {
replicationUnsetMaster();
sds client = catClientInfoString(sdsempty(),c);
serverLog(LL_NOTICE,
"MASTER MODE enabled (failover request from '%s')",client);
sdsfree(client);
} else {
addReplyError(c, "PSYNC FAILOVER replid must match my replid.");
return;
}
}
/* Don't let replicas sync with us while we're failing over */
if (server.failover_state != NO_FAILOVER) {
addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over");
return;
}
/* Refuse SYNC requests if we are a slave but the link with our master
 * is not ok... */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
addReplyError(c,"-NOMASTERLINK Can't SYNC while not connected with my master");
return;
}
/* SYNC can't be issued when the server has pending data to send to
 * the client about already issued commands. We need a fresh reply
 * buffer registering the differences between the BGSAVE and the current
 * dataset, so that we can copy to other slaves if needed. */
if (clientHasPendingReplies(c)) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}
/* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered
 * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing
 * use of a socket is handled, if needed, in `startBgsaveForReplication`. */
if (c->slave_req & SLAVE_REQ_RDB_MASK && !(c->slave_capa & SLAVE_CAPA_EOF)) {
addReplyError(c,"Filtered replica requires EOF capability");
return;
}
serverLog(LL_NOTICE,"Replica %s asks for synchronization",
replicationGetSlaveName(c));
/* Try a partial resynchronization if this is a PSYNC command.
 * If it fails, we continue with usual full resynchronization, however
 * when this happens replicationSetupSlaveForFullResync will have replied
 * with:
 *
 * +FULLRESYNC <replid> <offset>
 *
 * So the slave knows the new replid and offset to try a PSYNC later
 * if the connection with the master is lost. */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
long long psync_offset;
if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK) {
serverLog(LL_WARNING, "Replica %s asks for synchronization but with a wrong offset",
replicationGetSlaveName(c));
return;
}
if (masterTryPartialResynchronization(c, psync_offset) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
 * replid is not "?", as this is used by slaves to force a full
 * resync on purpose when they are not able to partially
 * resync. */
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
 * of the replication protocol (like redis-cli --slave). Flag the client
 * so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC;
}
/* Full resynchronization. */
server.stat_sync_full++;
/* Setup the slave as one waiting for BGSAVE to start. The following code
 * paths will change the state if we handle the slave differently. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c);
/* Create the replication backlog if needed. */
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
 * replication ID and clear the ID2, since there is no valid
 * past history. */
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
serverLog(LL_NOTICE,"Replication backlog created, my new "
"replication IDs are '%s' and '%s'",
server.replid, server.replid2);
}
/* CASE 1: BGSAVE is in progress, with disk target. */
if (server.child_type == CHILD_TYPE_RDB &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
/* Ok a background save is in progress. Let's check if it is a good
 * one for replication, i.e. if there is another slave that is
 * registering differences since the server forked to save. */
client *slave;
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
/* When the loop ends without a break 'ln' is NULL, and the check below
 * tests 'ln' before it looks at 'slave'. */
while((ln = listNext(&li))) {
slave = ln->value;
/* If the client needs a buffer of commands, we can't use
 * a replica without replication buffer. */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
(!(slave->flags & CLIENT_REPL_RDBONLY) ||
(c->flags & CLIENT_REPL_RDBONLY)))
break;
}
/* To attach this slave, we check that it has at least all the
 * capabilities of the slave that triggered the current BGSAVE
 * and its exact requirements. */
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) &&
c->slave_req == slave->slave_req) {
/* Perfect, the server is already registering differences for
 * another slave. Set the right state, and copy the buffer.
 * We don't copy the buffer if the client doesn't want it. */
if (!(c->flags & CLIENT_REPL_RDBONLY))
copyReplicaOutputBuffer(c,slave);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
 * register differences. */
serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
}
/* CASE 2: BGSAVE is in progress, with socket target. */
} else if (server.child_type == CHILD_TYPE_RDB &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
/* There is an RDB child process but it is writing directly to
 * children sockets. We need to wait for the next BGSAVE
 * in order to synchronize. */
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
/* CASE 3: There is no BGSAVE in progress. */
} else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) &&
server.repl_diskless_sync_delay)
{
/* Diskless replication RDB child is created inside
 * replicationCron() since we want to delay its start a
 * few seconds to wait for more slaves to arrive. */
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
/* We don't have a BGSAVE in progress, let's start one. Diskless
 * or disk-based mode is determined by replica's capacity. */
if (!hasActiveChildProcess()) {
startBgsaveForReplication(c->slave_capa, c->slave_req);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but another BG operation is active. "
"BGSAVE for replication delayed");
}
}
}
return;
}
/* Move a replica to its post-transfer state.
 *
 * A replica that asked only for the RDB is marked as RDB transmitted and 0
 * is returned. Any other replica is marked online, its ACK time is
 * refreshed, the good replicas count is recomputed, the module
 * "replica online" event is fired, and 1 is returned. */
int replicaPutOnline(client *slave) {
    int rdb_only = (slave->flags & CLIENT_REPL_RDBONLY) != 0;

    if (!rdb_only) {
        slave->replstate = SLAVE_STATE_ONLINE;
        slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
        refreshGoodSlavesCount();

        /* Fire the replica change modules event. */
        moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
                              REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
                              NULL);
        serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
            replicationGetSlaveName(slave));
        return 1;
    }

    /* The client asked for RDB only so we should close it ASAP */
    slave->replstate = SLAVE_STATE_RDB_TRANSMITTED;
    serverLog(LL_NOTICE,
        "RDB transfer completed, rdb only replica (%s) should be disconnected asap",
        replicationGetSlaveName(slave));
    return 0;
}
/* Invoke the callback of every listener in RedisModule_EventListeners whose
 * event id equals 'eid'.
 *
 * 'subid' is passed to the callback unchanged. 'data' is interpreted
 * according to 'eid': for some events it is forwarded as is, for others it
 * is used to fill a versioned info structure living on the stack for the
 * duration of the callback. For event ids not handled below the callback
 * receives a NULL data pointer. */
void moduleFireServerEvent(uint64_t eid, int subid, void *data) {
/* Fast path to return ASAP if there is nothing to do, avoiding to
 * setup the iterator and so forth: we want this call to be extremely
 * cheap if there are no registered modules. */
if (listLength(RedisModule_EventListeners) == 0) return;
listIter li;
listNode *ln;
listRewind(RedisModule_EventListeners,&li);
while((ln = listNext(&li))) {
RedisModuleEventListener *el = ln->value;
if (el->event.id == eid) {
RedisModuleCtx ctx;
if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) {
/* In the case of client changes, we're pushing the real client
 * so the event handler can mutate it if needed. For example,
 * to change its authentication state in a way that does not
 * depend on specific commands executed later.
 */
moduleCreateContext(&ctx,el->module,REDISMODULE_CTX_NONE);
ctx.client = (client *) data;
} else {
moduleCreateContext(&ctx,el->module,REDISMODULE_CTX_TEMP_CLIENT);
}
/* 'moduledata' is what the callback receives; the V1 structures below
 * provide its storage for the events that need a populated struct. */
void *moduledata = NULL;
RedisModuleClientInfoV1 civ1;
RedisModuleReplicationInfoV1 riv1;
RedisModuleModuleChangeV1 mcv1;
/* Event specific context and data pointer setup. */
if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) {
serverAssert(modulePopulateClientInfoStructure(&civ1,data, el->event.dataver) == REDISMODULE_OK);
moduledata = &civ1;
} else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) {
serverAssert(modulePopulateReplicationInfoStructure(&riv1,el->event.dataver) == REDISMODULE_OK);
moduledata = &riv1;
} else if (eid == REDISMODULE_EVENT_FLUSHDB) {
moduledata = data;
RedisModuleFlushInfoV1 *fi = data;
/* Select the flushed database on the context client, unless the
 * database number is -1. */
if (fi->dbnum != -1)
selectDb(ctx.client, fi->dbnum);
} else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) {
RedisModule *m = data;
/* The module the event is about is not notified about itself: release
 * the context created above and move on to the next listener. */
if (m == el->module) {
moduleFreeContext(&ctx);
continue;
}
mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION;
mcv1.module_name = m->name;
mcv1.module_version = m->ver;
moduledata = &mcv1;
} else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) {
moduledata = data;
} else if (eid == REDISMODULE_EVENT_CRON_LOOP) {
moduledata = data;
} else if (eid == REDISMODULE_EVENT_SWAPDB) {
moduledata = data;
} else if (eid == REDISMODULE_EVENT_CONFIG) {
moduledata = data;
}
/* 'in_hook' is incremented only for the duration of the callback. */
el->module->in_hook++;
el->callback(&ctx,el->event,subid,moduledata);
el->module->in_hook--;
moduleFreeContext(&ctx);
}
}
}
/* Stream I/O abstraction: a table of backend functions, checksum and
 * accounting state, and a union holding the state of the specific backend
 * in use (memory buffer, stdio file, connection or file descriptor). */
struct _rio {
/* Backend functions.
 * Since these functions do not tolerate short writes or reads the return
 * value is simplified to: zero on error, non zero on complete success. */
size_t (*read)(struct _rio *, void *buf, size_t len);
size_t (*write)(struct _rio *, const void *buf, size_t len);
off_t (*tell)(struct _rio *);
int (*flush)(struct _rio *);
/* The update_cksum method if not NULL is used to compute the checksum of
 * all the data that was read or written so far. The method should be
 * designed so that it can be called with the current checksum, and the buf
 * and len fields pointing to the new block of data to add to the checksum
 * computation. */
void (*update_cksum)(struct _rio *, const void *buf, size_t len);
/* The current checksum and flags (see RIO_FLAG_*) */
uint64_t cksum, flags;
/* number of bytes read or written */
size_t processed_bytes;
/* maximum single read or write chunk size */
size_t max_processing_chunk;
/* Backend-specific vars. */
union {
/* In-memory buffer target. */
struct {
sds ptr;
off_t pos;
} buffer;
/* Stdio file pointer target. */
struct {
FILE *fp;
off_t buffered; /* Bytes written since last fsync. */
off_t autosync; /* fsync after 'autosync' bytes written. */
} file;
/* Connection object (used to read from socket) */
struct {
connection *conn; /* Connection */
off_t pos; /* pos in buf that was returned */
sds buf; /* buffered data */
size_t read_limit; /* don't allow to buffer/read more than that */
size_t read_so_far; /* amount of data read from the rio (not buffered) */
} conn;
/* FD target (used to write to pipe). */
struct {
int fd; /* File descriptor. */
off_t pos;
sds buf;
} fd;
} io;
};
typedef struct _rio rio;
/* Save the DB on disk. Return C_ERR on error, C_OK on success.
 *
 * The dump is written to a temporary file named "temp-<pid>.rdb", flushed
 * and fsynced, and then renamed to 'filename', so the destination is only
 * replaced by a completely written file. 'req' and 'rsi' are passed through
 * to rdbSaveRio(). On success server.dirty, server.lastsave and
 * server.lastbgsave_status are updated. On failure the temporary file is
 * removed. */
int rdbSave(int req, char *filename, rdbSaveInfo *rsi) {
char tmpfile[256];
char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
FILE *fp = NULL;
rio rdb;
int error = 0;
char *err_op; /* For a detailed log */
snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
/* Take the error string before getcwd() has a chance to change errno. */
char *str_err = strerror(errno);
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Failed opening the temp RDB file %s (in server root dir %s) "
"for saving: %s",
tmpfile,
cwdp ? cwdp : "unknown",
str_err);
return C_ERR;
}
rioInitWithFile(&rdb,fp);
startSaving(RDBFLAGS_NONE);
if (server.rdb_save_incremental_fsync)
rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
if (rdbSaveRio(req,&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {
/* rdbSaveRio() reports its error code through 'error': copy it to errno
 * so that the 'werr' path logs it. */
errno = error;
err_op = "rdbSaveRio";
goto werr;
}
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp)) { err_op = "fflush"; goto werr; }
if (fsync(fileno(fp))) { err_op = "fsync"; goto werr; }
/* After fclose() the stream must not be closed again, even on failure:
 * clear 'fp' so that the 'werr' path skips its own fclose(). */
if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; }
fp = NULL;
/* Use RENAME to make sure the DB file is changed atomically only
 * if the generated DB file is ok. */
if (rename(tmpfile,filename) == -1) {
char *str_err = strerror(errno);
char *cwdp = getcwd(cwd,MAXPATHLEN);
serverLog(LL_WARNING,
"Error moving temp DB file %s on the final "
"destination %s (in server root dir %s): %s",
tmpfile,
filename,
cwdp ? cwdp : "unknown",
str_err);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
if (fsyncFileDir(filename) == -1) { err_op = "fsyncFileDir"; goto werr; }
serverLog(LL_NOTICE,"DB saved on disk");
server.dirty = 0;
server.lastsave = time(NULL);
server.lastbgsave_status = C_OK;
stopSaving(1);
return C_OK;
/* Common failure path: log the failed operation, close the stream if it is
 * still open, remove the temporary file and signal the failed save. */
werr:
serverLog(LL_WARNING,"Write error saving DB on disk(%s): %s", err_op, strerror(errno));
if (fp) fclose(fp);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
/* Prepare 'r' as a rio object that targets the stdio stream 'fp'.
 *
 * The object is first overwritten with the rioFileIO template, then the
 * file-specific state is reset: no bytes buffered since the last fsync and
 * automatic fsync disabled (autosync == 0). */
void rioInitWithFile(rio *r, FILE *fp) {
    *r = rioFileIO;

    r->io.file.autosync = 0;
    r->io.file.buffered = 0;
    r->io.file.fp = fp;
}
/* Turn the AOF off at runtime. This is what runs when the user moves from
 * "appendonly yes" to "appendonly no" via the CONFIG command.
 *
 * The pending buffer is flushed (forced) and fsync'ed, the AOF file
 * descriptor is closed, the AOF related state is reset, any AOF child is
 * killed and the AOF buffer is replaced by a fresh empty one. */
void stopAppendOnly(void) {
    serverAssert(server.aof_state != AOF_OFF);

    /* Push what is still buffered to the file and try to persist it. */
    flushAppendOnlyFile(1);
    if (redis_fsync(server.aof_fd) != -1) {
        server.aof_last_fsync = server.unixtime;
        server.aof_fsync_offset = server.aof_current_size;
    } else {
        serverLog(LL_WARNING,"Fail to fsync the AOF file: %s",strerror(errno));
    }

    /* Release the descriptor and mark the AOF as disabled. */
    close(server.aof_fd);
    server.aof_fd = -1;
    server.aof_state = AOF_OFF;
    server.aof_selected_db = -1;
    server.aof_last_incr_size = 0;
    server.aof_rewrite_scheduled = 0;

    killAppendOnlyChild();

    /* Start over with an empty AOF buffer. */
    sdsfree(server.aof_buf);
    server.aof_buf = sdsempty();
}
/* Write the content of server.aof_buf to server.aof_fd and fsync the file
* according to the server.aof_fsync policy.
*
* 'force': when zero and the policy is AOF_FSYNC_EVERYSEC, the write may be
* postponed (for less than two seconds) while a background fsync is still in
* progress. When non-zero the write is never postponed.
*
* On a failed or short write with a policy other than AOF_FSYNC_ALWAYS the
* function sets server.aof_last_write_status to C_ERR, keeps the unwritten
* data in the buffer and returns, so the write is retried on the next call.
* With AOF_FSYNC_ALWAYS a write or fsync error terminates the process. */
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
if (sdslen(server.aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time. */
if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.aof_fsync_offset != server.aof_current_size &&
server.unixtime > server.aof_last_fsync &&
!(sync_in_progress = aofFsyncInProgress())) {
goto try_fsync;
} else {
return;
}
}
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = aofFsyncInProgress();
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponing, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall through, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */
if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
usleep(server.aof_flush_sleep);
}
latencyStartMonitor(latency);
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
/* We want to capture different events for delayed writes:
* when the delay happens with a pending fsync, or with a saving child
* active, and when the above two conditions are missing.
* We also use an additional event name to save all samples which is
* useful for graphing / monitoring purposes. */
if (sync_in_progress) {
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
} else if (hasActiveChildProcess()) {
latencyAddSampleIfNeeded("aof-write-active-child",latency);
} else {
latencyAddSampleIfNeeded("aof-write-alone",latency);
}
latencyAddSampleIfNeeded("aof-write",latency);
/* We performed the write so reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
/* Static: the timestamp of the last logged error must survive across
* calls in order to rate limit the log lines. */
static time_t last_write_error_log = 0;
int can_log = 0;
/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}
/* Log the AOF write error and record the error code. */
if (nwritten == -1) {
if (can_log) {
serverLog(LL_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
}
server.aof_last_write_errno = errno;
} else {
if (can_log) {
serverLog(LL_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}
/* Try to undo the partial write by cutting the file back to
* server.aof_last_incr_size. */
if (ftruncate(server.aof_fd, server.aof_last_incr_size) == -1) {
if (can_log) {
serverLog(LL_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftruncate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
/* A short write is always recorded as ENOSPC. */
server.aof_last_write_errno = ENOSPC;
}
/* Handle the AOF write error. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the reply
* for the client is already in the output buffers (both writes and
* reads), and the changes to the db can't be rolled back. Since we
* have a contract with the user that acknowledged or observed
* writes are synced on disk, we must exit. */
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = C_ERR;
/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
server.aof_last_incr_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
/* Successful write(2). If AOF was in error state, restore the
* OK state and log the event. */
if (server.aof_last_write_status == C_ERR) {
serverLog(LL_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = C_OK;
}
}
/* Only reached after a complete write: account for the written bytes. */
server.aof_current_size += nwritten;
server.aof_last_incr_size += nwritten;
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
/* Reached after a complete write, or directly from the empty buffer case
* at the top of the function. */
try_fsync:
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
return;
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* redis_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
latencyStartMonitor(latency);
/* Let's try to get this data on the disk. To guarantee data safe when
* the AOF fsync policy is 'always', we should exit if failed to fsync
* AOF (see comment next to the exit(1) after write error above). */
if (redis_fsync(server.aof_fd) == -1) {
serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "
"AOF fsync policy is 'always': %s. Exiting...", strerror(errno));
exit(1);
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
/* A new background fsync is scheduled only if none is in progress;
* aof_last_fsync is refreshed in both cases. */
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd);
server.aof_fsync_offset = server.aof_current_size;
}
server.aof_last_fsync = server.unixtime;
}
}
/* Create an event loop able to track 'setsize' file events.
 *
 * Returns the newly allocated loop, or NULL if any allocation or the
 * backend initialization (aeApiCreate) fails. On failure everything that
 * was allocated so far is released. */
aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *el;

    monotonicInit(); /* in case the calling app did not do it already */

    el = zmalloc(sizeof(*el));
    if (el == NULL) return NULL;

    el->events = zmalloc(sizeof(aeFileEvent)*setsize);
    el->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (el->events != NULL && el->fired != NULL) {
        el->setsize = setsize;
        el->maxfd = -1;
        el->stop = 0;
        el->flags = 0;
        el->timeEventHead = NULL;
        el->timeEventNextId = 0;
        el->beforesleep = NULL;
        el->aftersleep = NULL;

        if (aeApiCreate(el) != -1) {
            /* A mask equal to AE_NONE means the event is not set, so the
             * whole vector starts with that value. */
            for (int slot = 0; slot < setsize; slot++)
                el->events[slot].mask = AE_NONE;
            return el;
        }
    }

    /* Something failed after the loop itself was allocated. */
    zfree(el->events);
    zfree(el->fired);
    zfree(el);
    return NULL;
}
/* Run a group of initialization steps in sequence: set up the background
* jobs (bioInit), set up threaded I/O (initThreadedIO), pass
* server.jemalloc_bg_thread to set_jemalloc_bg_thread(), and finally record
* the value of zmalloc_used_memory() as server.initial_memory_usage. */
void InitServerLast() {
bioInit();
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
/* Sampled after the calls above have run. */
server.initial_memory_usage = zmalloc_used_memory();
}
/* Initialize the background jobs system.
 *
 * For each of the BIO_NUM_OPS job types this sets up the mutex, the two
 * condition variables, an empty job list and a zero pending counter, then
 * spawns one thread per job type running bioProcessBackgroundJobs().
 *
 * If a thread cannot be created the error is logged and the process
 * exits with status 1. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system:
     * start from the default and double it until it is at least
     * REDIS_THREAD_STACK_SIZE. */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }

    /* The attributes object is only needed while creating the threads:
     * release it now. Destroying it has no effect on the threads that were
     * created with it. */
    pthread_attr_destroy(&attr);
}
/* Set up the state needed by threaded I/O and start the additional I/O
 * threads.
 *
 * When server.io_threads_num is 1 nothing is spawned. When it is greater
 * than IO_THREADS_MAX_NUM, or when a thread cannot be created, the error is
 * logged and the process exits with status 1. */
void initThreadedIO(void) {
    /* Threads start as not active and idle. */
    io_threads_op = IO_THREADS_OP_IDLE;
    server.io_threads_active = 0;

    /* With a single thread configured all the I/O is handled directly by
     * the main thread: there is nothing to spawn. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    for (int id = 0; id < server.io_threads_num; id++) {
        /* Every thread, the main one included, gets its own list. */
        io_threads_list[id] = listCreate();

        /* Index 0 is the main thread: the rest of the setup is only for
         * the additional threads. */
        if (id > 0) {
            pthread_t handle;

            pthread_mutex_init(&io_threads_mutex[id],NULL);
            setIOPendingCount(id, 0);
            /* The mutex is taken before the thread exists: the thread
             * will be stopped. */
            pthread_mutex_lock(&io_threads_mutex[id]);
            if (pthread_create(&handle,NULL,IOThreadMain,(void*)(long)id) != 0) {
                serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
                exit(1);
            }
            io_threads[id] = handle;
        }
    }
}
/* Send a PING to the instance 'ri' over its command link.
 *
 * Returns 1 if the command was queued, in which case the pending commands
 * counter and the ping timestamps of the link are updated. Returns 0 if
 * the command could not be queued. */
int sentinelSendPing(sentinelRedisInstance *ri) {
    int rc = redisAsyncCommand(ri->link->cc,
                               sentinelPingReplyCallback, ri, "%s",
                               sentinelInstanceMapCommand(ri,"PING"));

    if (rc != C_OK) return 0;

    ri->link->pending_commands++;
    ri->link->last_ping_time = mstime();

    /* The active ping time is set only when no ping is already waiting
     * for its pong: otherwise we are technically still waiting since the
     * first ping that got no reply. */
    if (ri->link->act_ping_time == 0)
        ri->link->act_ping_time = ri->link->last_ping_time;

    return 1;
}
/* Pick, among the slaves of 'master', the one to promote.
 *
 * A slave is discarded when it is flagged as S_DOWN or O_DOWN, its link is
 * disconnected, it was not available for more than five ping periods, its
 * priority is zero, its INFO data is too old, or its link with the master
 * has been down for too long. The remaining slaves are sorted with
 * compareSlavesForPromotion() and the first one is returned.
 *
 * Returns NULL when no slave passes the checks. */
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
    sentinelRedisInstance **candidates =
        zmalloc(sizeof(candidates[0])*dictSize(master->slaves));
    sentinelRedisInstance *best = NULL;
    int count = 0;
    dictIterator *iter;
    dictEntry *entry;

    /* Max time the slave's link with the master may have been down: ten
     * times the down-after period, plus the time elapsed since the master
     * was seen as subjectively down, if it is. */
    mstime_t max_master_down_time = master->down_after_period * 10;
    if (master->flags & SRI_S_DOWN)
        max_master_down_time += mstime() - master->s_down_since_time;

    iter = dictGetIterator(master->slaves);
    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        sentinelRedisInstance *slave = dictGetVal(entry);
        mstime_t info_validity_time;

        if ((slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) ||
            slave->link->disconnected ||
            mstime() - slave->link->last_avail_time > sentinel_ping_period*5 ||
            slave->slave_priority == 0)
        {
            continue;
        }

        /* If the master is in SDOWN state we get INFO for slaves every
         * second. Otherwise we get it with the usual period so we need to
         * account for a larger delay. */
        info_validity_time = (master->flags & SRI_S_DOWN) ?
                             sentinel_ping_period*5 : sentinel_info_period*3;
        if (mstime() - slave->info_refresh > info_validity_time) continue;
        if (slave->master_link_down_time > max_master_down_time) continue;

        candidates[count++] = slave;
    }
    dictReleaseIterator(iter);

    if (count > 0) {
        qsort(candidates,count,sizeof(sentinelRedisInstance*),
              compareSlavesForPromotion);
        best = candidates[0];
    }
    zfree(candidates);
    return best;
}
/* Send to the instance 'ri' a transaction that makes it replicate from
 * 'addr', or that turns it into a master when 'addr' is NULL (SLAVEOF NO
 * ONE).
 *
 * The transaction performs the following tasks:
 * 1) Reconfigure the instance according to the specified host/port.
 * 2) Rewrite the configuration.
 * 3) Disconnect all clients (but this one sending the command) in order
 *    to trigger the ask-master-on-reconnection protocol for connected
 *    clients.
 *
 * The replies are discarded: the effects are observed in the next INFO
 * output instead.
 *
 * Returns C_OK if every command was queued. As soon as one command cannot
 * be queued the function returns the error, leaving the commands queued so
 * far accounted in the pending commands counter of the link. */
int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr) {
    static const char *kill_types[] = {"normal", "pubsub"};
    char portstr[32];
    const char *host;
    int rc;

    if (addr != NULL) {
        host = announceSentinelAddr(addr);
        ll2string(portstr,sizeof(portstr),addr->port);
    } else {
        /* No address: the arguments become "NO" "ONE". */
        host = "NO";
        memcpy(portstr,"ONE",4);
    }

    rc = redisAsyncCommand(ri->link->cc,
                           sentinelDiscardReplyCallback, ri, "%s",
                           sentinelInstanceMapCommand(ri,"MULTI"));
    if (rc == C_ERR) return rc;
    ri->link->pending_commands++;

    rc = redisAsyncCommand(ri->link->cc,
                           sentinelDiscardReplyCallback, ri, "%s %s %s",
                           sentinelInstanceMapCommand(ri,"SLAVEOF"),
                           host, portstr);
    if (rc == C_ERR) return rc;
    ri->link->pending_commands++;

    rc = redisAsyncCommand(ri->link->cc,
                           sentinelDiscardReplyCallback, ri, "%s REWRITE",
                           sentinelInstanceMapCommand(ri,"CONFIG"));
    if (rc == C_ERR) return rc;
    ri->link->pending_commands++;

    /* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12,
     * however sending it to an instance not understanding this command is
     * not an issue because CLIENT is a variadic command, so it will not be
     * seen as a syntax error and the transaction will not fail (only the
     * unsupported command will). */
    for (size_t k = 0; k < sizeof(kill_types)/sizeof(kill_types[0]); k++) {
        rc = redisAsyncCommand(ri->link->cc,
                               sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
                               sentinelInstanceMapCommand(ri,"CLIENT"),
                               kill_types[k]);
        if (rc == C_ERR) return rc;
        ri->link->pending_commands++;
    }

    rc = redisAsyncCommand(ri->link->cc,
                           sentinelDiscardReplyCallback, ri, "%s",
                           sentinelInstanceMapCommand(ri,"EXEC"));
    if (rc == C_ERR) return rc;
    ri->link->pending_commands++;

    return C_OK;
}
/* State kept about a single node of the cluster: identity and address,
* the slots it handles, its master/slave relations, the timestamps of the
* ping/pong exchange and the links established with it. */
typedef struct clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
int flags; /* CLUSTER_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */
uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */
int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */
int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */
struct clusterNode **slaves; /* pointers to slave nodes */
struct clusterNode *slaveof; /* pointer to the master node. Note that it
may be NULL even if the node is a slave
if we don't have the master node in our
tables. */
unsigned long long last_in_ping_gossip; /* The number of the last carried in the ping gossip section */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
sds hostname; /* The known hostname for this node */
int port; /* Latest known clients port (TLS or plain). */
int pport; /* Latest known clients plaintext port. Only used
if the main clients port is for TLS. */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
/* clusterLink encapsulates everything needed to talk with a remote node:
* the connection, the send and receive buffers, and the node the link is
* associated with (if known). */
typedef struct clusterLink {
mstime_t ctime; /* Link creation time */
connection *conn; /* Connection to remote node */
sds sndbuf; /* Packet send buffer */
char *rcvbuf; /* Packet reception buffer */
size_t rcvbuf_len; /* Used size of rcvbuf */
size_t rcvbuf_alloc; /* Allocated size of rcvbuf */
struct clusterNode *node; /* Node related to this link. Initialized to NULL when unknown */
int inbound; /* 1 if this link is an inbound link accepted from the related node */
} clusterLink;
/* Cluster-wide state as seen by this instance: the table of known nodes,
* the slots assignment, the state of automatic and manual failovers, and
* the cluster bus statistics. */
typedef struct clusterState {
clusterNode *myself; /* This node */
uint64_t currentEpoch; /* NOTE(review): undocumented here; presumably the
local counterpart of clusterMsg.currentEpoch
- confirm. */
int state; /* CLUSTER_OK, CLUSTER_FAIL, ... */
int size; /* Num of master nodes with at least one slot */
dict *nodes; /* Hash table of name -> clusterNode structures */
dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */
/* The three arrays below have one entry per hash slot. 'slots' is indexed
* by slot number and holds the node serving the slot. NOTE(review): the
* two migrating/importing arrays are presumably indexed the same way and
* hold the other node involved in the slot migration - confirm. */
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
rax *slots_to_channels; /* NOTE(review): presumably maps slots to the
channels hashing to them - confirm. */
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
int failover_auth_sent; /* True if we already asked for votes. */
int failover_auth_rank; /* This slave rank for current auth request. */
uint64_t failover_auth_epoch; /* Epoch of the current election. */
int cant_failover_reason; /* Why a slave is currently not able to
failover. See the CANT_FAILOVER_* macros. */
/* Manual failover state in common. */
mstime_t mf_end; /* Manual failover time limit (ms unixtime).
It is zero if there is no MF in progress. */
/* Manual failover state of master. */
clusterNode *mf_slave; /* Slave performing the manual failover. */
/* Manual failover state of slave. */
long long mf_master_offset; /* Master offset the slave needs to start MF
or -1 if still not received. */
int mf_can_start; /* If non-zero signal that the manual failover
can start requesting masters vote. */
/* The following fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
/* Stats */
/* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
excluding nodes without address. */
unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */
} clusterState;
/* A single gossip entry: what the sender knows about another node. An
* array of these entries is carried by PING, MEET and PONG messages (see
* the 'ping' member of union clusterMsgData). */
typedef struct {
char nodename[CLUSTER_NAMELEN];
uint32_t ping_sent;
uint32_t pong_received;
char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
uint16_t port; /* base port last time it was seen */
uint16_t cport; /* cluster port last time it was seen */
uint16_t flags; /* node->flags copy */
uint16_t pport; /* plaintext-port, when base port is TLS */
uint16_t notused1;
} clusterMsgDataGossip;
/* Payload of a cluster bus message. Which member is used depends on the
* message type, as noted on each member. */
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures. It is declared with a
* single element, but the real number of entries is not fixed by
* this declaration. */
clusterMsgDataGossip gossip[1];
/* Extension data that can optionally be sent for ping/meet/pong
* messages. We can't explicitly define them here though, since
* the gossip array isn't the real length of the gossip data. */
} ping;
/* FAIL */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
/* MODULE */
struct {
clusterMsgModule msg;
} module;
};
#define CLUSTER_PROTO_VER 1 /* Cluster bus protocol version. */
/* A cluster bus message: a fixed header describing the sender (epochs,
* replication offset, name, slots bitmap, master, address, ports, flags)
* followed by the type-specific payload in 'data'. */
typedef struct {
char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to 1. */
uint16_t port; /* TCP base port number. */
uint16_t type; /* Message type */
uint16_t count; /* Only used for some kind of messages. */
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
uint64_t configEpoch; /* The config epoch if it's a master, or the last
epoch advertised by its master if it is a
slave. */
uint64_t offset; /* Master replication offset if node is a master or
processed replication offset if node is a slave. */
char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
unsigned char myslots[CLUSTER_SLOTS/8]; /* One bit per hash slot.
NOTE(review): presumably the slots
served by the sender - confirm. */
char slaveof[CLUSTER_NAMELEN]; /* NOTE(review): presumably the name of the
sender's master - confirm. */
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
uint16_t extensions; /* Number of extensions sent along with this packet. */
char notused1[30]; /* 30 bytes reserved for future usage. */
uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */
uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */
unsigned char state; /* Cluster state from the POV of the sender */
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data; /* Type-specific payload. */
} clusterMsg;
/* Return the hash slot of the key 'key' of length 'keylen'.
 *
 * The slot is the CRC16 of the key masked with 0x3FFF. If the key contains
 * a '{' followed, later on, by a '}' with at least one character between
 * them, only the characters between the first '{' and the first '}' after
 * it are hashed. In every other case the whole key is hashed. */
unsigned int keyHashSlot(char *key, int keylen) {
    int lbrace = 0; /* index of the first '{' */
    int rbrace;     /* index of the first '}' after it */

    while (lbrace < keylen && key[lbrace] != '{') lbrace++;

    /* No '{' at all: this is the base case, hash the whole key. */
    if (lbrace == keylen) return crc16(key,keylen) & 0x3FFF;

    rbrace = lbrace+1;
    while (rbrace < keylen && key[rbrace] != '}') rbrace++;

    /* No matching '}', or nothing between the braces: hash the whole key. */
    if (rbrace == keylen || rbrace == lbrace+1)
        return crc16(key,keylen) & 0x3FFF;

    /* Both braces found with something in between: hash only that part. */
    return crc16(key+lbrace+1,rbrace-lbrace-1) & 0x3FFF;
}
/* This is executed 10 times every second.
*
* Periodic cluster housekeeping. In order, it:
* 1) Runs the per-node link maintenance (buffers resize, oversized links
*    release, memory usage accounting, reconnection).
* 2) Once every 10 calls pings the node with the oldest pong among a few
*    random ones.
* 3) Scans all the nodes to reconnect silent links, send pings that are due
*    and flag as PFAIL the nodes that look unreachable, collecting the
*    counters used for the slave migration at the same time.
* 4) If this node is a slave, enables the replication when needed and
*    handles manual failover, failover and slave migration.
* 5) Updates the cluster state if something changed or the cluster is in
*    the CLUSTER_FAIL state. */
void clusterCron(void) {
dictIterator *di;
dictEntry *de;
int update_state = 0;
int orphaned_masters; /* How many masters there are without ok slaves. */
int max_slaves; /* Max number of ok slaves for a single master. */
int this_slaves; /* Number of ok slaves for our master (if we are slave). */
mstime_t min_pong = 0, now = mstime();
clusterNode *min_pong_node = NULL;
/* Static: the counter must survive across calls. */
static unsigned long long iteration = 0;
mstime_t handshake_timeout;
iteration++; /* Number of times this function was called so far. */
clusterUpdateMyselfHostname();
/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
* the value of 1 second. */
handshake_timeout = server.cluster_node_timeout;
if (handshake_timeout < 1000) handshake_timeout = 1000;
/* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
server.cluster->stats_pfail_nodes = 0;
/* Clear so clusterNodeCronUpdateClusterLinksMemUsage can count the current memory usage of all cluster links. */
server.stat_cluster_links_memory = 0;
/* Run through some of the operations we want to do on each cluster node. */
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
/* The sequence goes:
* 1. We try to shrink link buffers if possible.
* 2. We free the links whose buffers are still oversized after possible shrinking.
* 3. We update the latest memory usage of cluster links.
* 4. We immediately attempt reconnecting after freeing links.
*/
clusterNodeCronResizeBuffers(node);
clusterNodeCronFreeLinkOnBufferLimitReached(node);
clusterNodeCronUpdateClusterLinksMemUsage(node);
/* The protocol is that function(s) below return non-zero if the node was
* terminated.
*/
if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
}
dictReleaseIterator(di);
/* Ping some random node 1 time every 10 iterations, so that we usually ping
* one random node every second. */
if (!(iteration % 10)) {
int j;
/* Check a few random nodes and ping the one with the oldest
* pong_received time. */
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
/* Iterate nodes to check if we need to flag something as failing.
* This loop is also responsible to:
* 1) Check if there are orphaned masters (masters without non failing
* slaves).
* 2) Count the max number of non failing slaves for a single master.
* 3) Count the number of slaves for our master, if we are a slave. */
orphaned_masters = 0;
max_slaves = 0;
this_slaves = 0;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
continue;
/* Orphaned master check, useful only if the current instance
* is a slave that may migrate to another master. */
if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
int okslaves = clusterCountNonFailingSlaves(node);
/* A master is orphaned if it is serving a non-zero number of
* slots, have no working slaves, but used to have at least one
* slave, or failed over a master that used to have slaves. */
if (okslaves == 0 && node->numslots > 0 &&
node->flags & CLUSTER_NODE_MIGRATE_TO)
{
orphaned_masters++;
}
if (okslaves > max_slaves) max_slaves = okslaves;
if (myself->slaveof == node)
this_slaves = okslaves;
}
/* If we are not receiving any data for more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
mstime_t ping_delay = now - node->ping_sent;
mstime_t data_delay = now - node->data_received;
if (node->link && /* is connected */
now - node->link->ctime >
server.cluster_node_timeout && /* was not already reconnected */
node->ping_sent && /* we already sent a ping */
/* and we are waiting for the pong more than timeout/2 */
ping_delay > server.cluster_node_timeout/2 &&
/* and in such interval we are not seeing any traffic at all. */
data_delay > server.cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
freeClusterLink(node->link);
}
/* If we have currently no active ping in this instance, and the
* received PONG is older than half the cluster timeout, send
* a new ping now, to ensure all the nodes are pinged without
* a too big delay. */
if (node->link &&
node->ping_sent == 0 &&
(now - node->pong_received) > server.cluster_node_timeout/2)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
/* If we are a master and one of the slaves requested a manual
* failover, ping it continuously. */
if (server.cluster->mf_end &&
nodeIsMaster(myself) &&
server.cluster->mf_slave == node &&
node->link)
{
clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
continue;
}
/* Check only if we have an active ping for this instance. */
if (node->ping_sent == 0) continue;
/* Check if this node looks unreachable.
* Note that if we already received the PONG, then node->ping_sent
* is zero, so can't reach this code at all, so we don't risk of
* checking for a PONG delay if we didn't sent the PING.
*
* We also consider every incoming data as proof of liveness, since
* our cluster bus link is also used for data: under heavy data
* load pong delays are possible. */
mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
data_delay;
if (node_delay > server.cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",
node->name);
node->flags |= CLUSTER_NODE_PFAIL;
update_state = 1;
}
}
}
dictReleaseIterator(di);
/* If we are a slave node but the replication is still turned off,
* enable it if we know the address of our master and it appears to
* be up. */
if (nodeIsSlave(myself) &&
server.masterhost == NULL &&
myself->slaveof &&
nodeHasAddr(myself->slaveof))
{
replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);
}
/* Abort a manual failover if the timeout is reached. */
manualFailoverCheckTimeout();
if (nodeIsSlave(myself)) {
clusterHandleManualFailover();
if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
clusterHandleSlaveFailover();
/* If there are orphaned masters, and we are a slave among the masters
* with the max number of non-failing slaves, consider migrating to
* the orphaned masters. Note that it does not make sense to try
* a migration if there is no master with at least *two* working
* slaves. */
if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&
server.cluster_allow_replica_migration)
clusterHandleSlaveMigration(max_slaves);
}
/* The state is recomputed when a node was flagged as PFAIL above, and at
* every call while the cluster is in the CLUSTER_FAIL state. */
if (update_state || server.cluster->state == CLUSTER_FAIL)
clusterUpdateState();
}
/* Return the cluster node that should serve the command 'cmd' (with
 * arguments argv/argc) issued by client 'c', or NULL if the request cannot
 * be served at all.
 *
 * Every key of the command (or, for EXEC, every key of every command queued
 * in c->mstate) is hashed with keyHashSlot(); all of them must map to one
 * slot, whose owner is read from server.cluster->slots[].
 *
 * Return value / *error_code combinations produced by this function:
 *
 * - 'myself': module disabled redirections, EXEC without CLIENT_MULTI,
 *   command without keys, MIGRATE on a migrating/importing slot, importing
 *   slot with ASKING, read-only request on a replica of the slot owner, or
 *   the slot owner is this node.
 * - another node with CLUSTER_REDIR_MOVED (slot owner) or CLUSTER_REDIR_ASK
 *   (target of an in-progress migration).
 * - NULL with CLUSTER_REDIR_CROSS_SLOT, CLUSTER_REDIR_UNSTABLE,
 *   CLUSTER_REDIR_DOWN_UNBOUND, CLUSTER_REDIR_DOWN_STATE or
 *   CLUSTER_REDIR_DOWN_RO_STATE.
 *
 * 'hashslot' and 'error_code' are optional out parameters (may be NULL).
 * *hashslot is only written after the key scan and the cluster-state check
 * have passed; the earlier return paths leave it untouched. *error_code is
 * reset to CLUSTER_REDIR_NONE before any key is examined, except on the
 * module NO_REDIRECTION early return, which happens before that reset. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
clusterNode *n = NULL;
robj *firstkey = NULL;
int multiple_keys = 0;
multiState *ms, _ms;
multiCmd mc;
int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
existing_keys = 0;
/* Modules can turn off Redis Cluster redirection: this is useful
 * when writing a module that implements a completely different
 * distributed system. In that case allow any key to be set locally.
 *
 * NOTE(review): this return happens before *error_code is initialized
 * below, so a caller must not read *error_code when 'myself' is
 * returned -- confirm all callers respect that. */
if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
return myself;
/* Set error code optimistically for the base case. */
if (error_code) *error_code = CLUSTER_REDIR_NONE;
/* We handle all the cases as if they were EXEC commands, so we have
 * a common code path for everything */
if (cmd->proc == execCommand) {
/* If CLIENT_MULTI flag is not set EXEC is just going to return an
 * error. */
if (!(c->flags & CLIENT_MULTI)) return myself;
ms = &c->mstate;
} else {
/* In order to have a single codepath create a fake Multi State
 * structure if the client is not in MULTI/EXEC state, this way
 * we have a single codepath below. The fake state lives on the
 * stack and holds exactly one command: the one passed in. */
ms = &_ms;
_ms.commands = &mc;
_ms.count = 1;
mc.argv = argv;
mc.argc = argc;
mc.cmd = cmd;
}
/* Sharded pub/sub commands are routed by channel, and are exempt from the
 * key-presence check performed below for migrating/importing slots. */
int is_pubsubshard = cmd->proc == ssubscribeCommand ||
cmd->proc == sunsubscribeCommand ||
cmd->proc == spublishCommand;
/* Check that all the keys are in the same hash slot, and obtain this
 * slot and the node associated. */
for (i = 0; i < ms->count; i++) {
struct redisCommand *mcmd;
robj **margv;
int margc, numkeys, j;
keyReference *keyindex;
mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
/* Extract the key positions of this command. 'result' is released with
 * getKeysFreeResult() on every path that leaves this iteration,
 * including the early returns inside the inner loop. */
getKeysResult result = GETKEYS_RESULT_INIT;
numkeys = getKeysFromCommand(mcmd,margv,margc,&result);
keyindex = result.keys;
for (j = 0; j < numkeys; j++) {
robj *thiskey = margv[keyindex[j].pos];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
if (firstkey == NULL) {
/* This is the first key we see. Check what is the slot
 * and node. */
firstkey = thiskey;
slot = thisslot;
n = server.cluster->slots[slot];
/* Error: If a slot is not served, we are in "cluster down"
 * state. However the state is yet to be updated, so this was
 * not trapped earlier in processCommand(). Report the same
 * error to the client. */
if (n == NULL) {
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}
/* If we are migrating or importing this slot, we need to check
 * if we have all the keys in the request (the only way we
 * can safely serve the request, otherwise we return a TRYAGAIN
 * error). To do so we set the importing/migrating state and
 * increment a counter for every missing key. The two states are
 * mutually exclusive here because of the else-if. */
if (n == myself &&
server.cluster->migrating_slots_to[slot] != NULL)
{
migrating_slot = 1;
} else if (server.cluster->importing_slots_from[slot] != NULL) {
importing_slot = 1;
}
} else {
/* Not the first key/channel: it must hash to the same slot as
 * the first one. An identical key trivially does, so the slot
 * comparison is only needed when the key differs. */
if (!equalStringObjects(firstkey,thiskey)) {
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
getKeysFreeResult(&result);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
} else {
/* Flag this request as one with multiple different
 * keys/channels. */
multiple_keys = 1;
}
}
}
/* Migrating / Importing slot? Count keys we don't have.
 * If it is pubsubshard command, it isn't required to check
 * the channel being present or not in the node during the
 * slot migration, the channel will be served from the source
 * node until the migration completes with CLUSTER SETSLOT <slot>
 * NODE <node-id>.
 *
 * The lookup is always done against server.db[0]. Judging by their
 * names, the LOOKUP_* flags are meant to keep this existence probe
 * free of side effects (access time, stats, notifications, expiry)
 * -- confirm against lookupKeyReadWithFlags(). */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
if ((migrating_slot || importing_slot) && !is_pubsubshard)
{
if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
else existing_keys++;
}
}
getKeysFreeResult(&result);
}
/* No key at all in command? then we can serve the request
 * without redirections or errors in all the cases. */
if (n == NULL) return myself;
/* Note: the flags are obtained from the client 'c' via getCommandFlags(),
 * not read from the 'cmd' parameter. */
uint64_t cmd_flags = getCommandFlags(c);
/* Cluster is globally down but we got keys? Sharded pub/sub commands are
 * served only if cluster_allow_pubsubshard_when_down is set; any other
 * command is served only if it is not a write command and
 * cluster_allow_reads_when_down is enabled. */
if (server.cluster->state != CLUSTER_OK) {
if (is_pubsubshard) {
if (!server.cluster_allow_pubsubshard_when_down) {
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
}
} else if (!server.cluster_allow_reads_when_down) {
/* The cluster is configured to block commands when the
 * cluster is down. */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
} else if (cmd_flags & CMD_WRITE) {
/* The cluster is configured to allow read only commands */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
return NULL;
} else {
/* Fall through and allow the command to be executed:
 * this happens when server.cluster_allow_reads_when_down is
 * true and the command is not a write command */
}
}
/* Return the hashslot by reference. */
if (hashslot) *hashslot = slot;
/* MIGRATE always works in the context of the local node if the slot
 * is open (migrating or importing state). We need to be able to freely
 * move keys among instances in this case. */
if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
return myself;
/* If we don't have all the keys and we are migrating the slot, send
 * an ASK redirection or TRYAGAIN. */
if (migrating_slot && missing_keys) {
/* If we have keys but we don't have all keys, we return TRYAGAIN */
if (existing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
/* None of the keys is stored here: point the client at the node
 * the slot is being migrated to. */
if (error_code) *error_code = CLUSTER_REDIR_ASK;
return server.cluster->migrating_slots_to[slot];
}
}
/* If we are receiving the slot, and the client correctly flagged the
 * request as "ASKING", we can serve the request. However if the request
 * involves multiple keys and we don't have them all, the only option is
 * to send a TRYAGAIN error. Without ASKING, control falls through to the
 * checks below. */
if (importing_slot &&
(c->flags & CLIENT_ASKING || cmd_flags & CMD_ASKING))
{
if (multiple_keys && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
return myself;
}
}
/* Handle the read-only client case reading from a slave: if this
 * node is a slave and the request is about a hash slot our master
 * is serving, we can reply without redirection. Sharded pub/sub commands
 * get the same treatment even without CLIENT_READONLY.
 *
 * NOTE(review): the EXEC test below inspects c->cmd rather than the 'cmd'
 * parameter used at the top of the function; presumably both refer to the
 * same command when EXEC is being processed -- confirm against callers. */
int is_write_command = (cmd_flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
!is_write_command &&
nodeIsSlave(myself) &&
myself->slaveof == n)
{
return myself;
}
/* Base case: just return the right node. However if this node is not
 * myself, set error_code to MOVED since we need to issue a redirection. */
if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
return n;
}
Redis doc
https://redis.io/docs/