Redis

 

Table of Contents

Redis

1. version

2. Struct

2.1. Client

2.2. Redisdb

2.3. Dict

2.4. dictEntry

2.5. dictType

2.6. redisObject

2.7. sds

3. Data

3.1. String

3.1.1.  set

3.1.2. get

3.1.3. del

3.2. Hash

3.2.1. create

3.2.2. set

3.2.3. get

3.2.4. del

3.3. List

3.3.1. create

3.3.2. Set

3.3.3. Get

3.3.4. Del

3.4. Set

3.4.1. create

3.4.2. Add

3.4.3. Rem

3.4.4. Member

3.5. zset

3.5.1. Create

3.5.2. Add

3.5.3. Get

3.6. Stream

3.6.1. stream

3.6.2. Create

3.6.3. Add

3.6.4. XREADGROUP

3.6.5. ack

4. Algorithm

4.1. list

4.1.1. quicklist

4.2. hash

4.2.1. listpack

4.2.2. insert

4.3. zset

4.3.1. zskiplist

4.3.2. Zset

5. Pipeline

6. keyspace notifications

7. Pub/sub

7.1. list

7.2. pub

7.3. sub

8. db

8.1. replication

8.1.1. sync

8.1.2. replication

8.2. Rdb

8.2.1. rio

8.2.2. Savedb

8.2.3. file

8.3. Aof

8.3.1. Append

8.3.2. file

9. thread

9.1. Event

9.2. Io thread

10. HA

10.1. Sentinel

10.1.1. Ping

10.1.2. Select

10.1.3. Slaveof

10.2. Cluster

10.2.1. Cluster node

10.2.2. Cluster link

10.2.3. Cluster state

10.2.4. Gossip

10.2.5. Msg

10.2.6. Slot

10.2.7. Cron

10.2.8. Query node&move

11. References

 

1.  version

redis-7.0.8

2.  Struct

2.1.   Client

typedef struct client {

    uint64_t id;            /* Client incremental unique ID. */

    uint64_t flags;         /* Client flags: CLIENT_* macros. */

    connection *conn;

    int resp;               /* RESP protocol version. Can be 2 or 3. */

    redisDb *db;            /* Pointer to currently SELECTed DB. */

    robj *name;             /* As set by CLIENT SETNAME. */

    sds querybuf;           /* Buffer we use to accumulate client queries. */

    size_t qb_pos;          /* The position we have read in querybuf. */

    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */

    int argc;               /* Num of arguments of current command. */

    robj **argv;            /* Arguments of current command. */

    int argv_len;           /* Size of argv array (may be more than argc) */

    int original_argc;      /* Num of arguments of original command if arguments were rewritten. */

    robj **original_argv;   /* Arguments of original command if arguments were rewritten. */

    size_t argv_len_sum;    /* Sum of lengths of objects in argv list. */

    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */

    struct redisCommand *realcmd; /* The original command that was executed by the client,

                                     Used to update error stats in case the c->cmd was modified

                                     during the command invocation (like on GEOADD for example). */

    user *user;             /* User associated with this connection. If the

                               user is set to NULL the connection can do

                               anything (admin). */

    int reqtype;            /* Request protocol type: PROTO_REQ_* */

    int multibulklen;       /* Number of multi bulk arguments left to read. */

    long bulklen;           /* Length of bulk argument in multi bulk request. */

    list *reply;            /* List of reply objects to send to the client. */

    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */

    list *deferred_reply_errors;    /* Used for module thread safe contexts. */

    size_t sentlen;         /* Amount of bytes already sent in the current

                               buffer or object being sent. */

    time_t ctime;           /* Client creation time. */

    long duration;          /* Current command duration. Used for measuring latency of blocking/non-blocking cmds */

    int slot;               /* The slot the client is executing against. Set to -1 if no slot is being used */

    dictEntry *cur_script;  /* Cached pointer to the dictEntry of the script being executed. */

    time_t lastinteraction; /* Time of the last interaction, used for timeout */

    time_t obuf_soft_limit_reached_time;

    int authenticated;      /* Needed when the default user requires auth. */

    int replstate;          /* Replication state if this is a slave. */

    int repl_start_cmd_stream_on_ack; /* Install slave write handler on first ACK. */

    int repldbfd;           /* Replication DB file descriptor. */

    off_t repldboff;        /* Replication DB file offset. */

    off_t repldbsize;       /* Replication DB file size. */

    sds replpreamble;       /* Replication DB preamble. */

    long long read_reploff; /* Read replication offset if this is a master. */

    long long reploff;      /* Applied replication offset if this is a master. */

    long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */

    long long repl_ack_off; /* Replication ack offset, if this is a slave. */

    long long repl_ack_time;/* Replication ack time, if this is a slave. */

    long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica  */

    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves

                                       copying this slave output buffer

                                       should use. */

    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */

    int slave_listening_port; /* As configured with: REPLCONF listening-port */

    char *slave_addr;       /* Optionally given by REPLCONF ip-address */

    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */

    int slave_req;          /* Slave requirements: SLAVE_REQ_* */

    multiState mstate;      /* MULTI/EXEC state */

    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */

    blockingState bpop;     /* blocking state */

    long long woff;         /* Last write global replication offset. */

    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */

    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */

    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */

    dict *pubsubshard_channels;  /* shard level channels a client is interested in (SSUBSCRIBE) */

    sds peerid;             /* Cached peer ID. */

    sds sockname;           /* Cached connection target address. */

    listNode *client_list_node; /* list node in client list */

    listNode *postponed_list_node; /* list node within the postponed list */

    listNode *pending_read_list_node; /* list node in clients pending read list */

    RedisModuleUserChangedFunc auth_callback; /* Module callback to execute

                                               * when the authenticated user

                                               * changes. */

    void *auth_callback_privdata; /* Private data that is passed when the auth

                                   * changed callback is executed. Opaque for

                                   * Redis Core. */

    void *auth_module;      /* The module that owns the callback, which is used

                             * to disconnect the client if the module is

                             * unloaded for cleanup. Opaque for Redis Core.*/

 

    /* If this client is in tracking mode and this field is non zero,

     * invalidation messages for keys fetched by this client will be send to

     * the specified client ID. */

    uint64_t client_tracking_redirection;

    rax *client_tracking_prefixes; /* A dictionary of prefixes we are already

                                      subscribed to in BCAST mode, in the

                                      context of client side caching. */

    /* In updateClientMemoryUsage() we track the memory usage of

     * each client and add it to the sum of all the clients of a given type,

     * however we need to remember what was the old contribution of each

     * client, and in which category the client was, in order to remove it

     * before adding it the new value. */

    size_t last_memory_usage;

    int last_memory_type;

 

    listNode *mem_usage_bucket_node;

    clientMemUsageBucket *mem_usage_bucket;

 

    listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,

                                  * see the definition of replBufBlock. */

    size_t ref_block_pos;        /* Access position of referenced buffer block,

                                  * i.e. the next offset to send. */

 

    /* Response buffer */

    size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */

    mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */

    int bufpos;

    size_t buf_usable_size; /* Usable size of buffer. */

    char *buf;

} client;

 

2.2.   Redisdb

typedef struct redisDb {

    dict *dict;                 /* The keyspace for this DB */

    dict *expires;              /* Timeout of keys with a timeout set */

    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP)*/

    dict *ready_keys;           /* Blocked keys that received a PUSH */

    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */

    int id;                     /* Database ID */

    long long avg_ttl;          /* Average TTL, just for stats */

    unsigned long expires_cursor; /* Cursor of the active expire cycle. */

    list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. */

    clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */

} redisDb;

 

2.3.   Dict

struct dict {
    dictType *type;

    dictEntry **ht_table[2];
    unsigned long ht_used[2];

    long rehashidx; /* rehashing not in progress if rehashidx == -1 */

    /* Keep small vars at end for optimal (minimal) struct padding */
    int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
    signed char ht_size_exp[2]; /* exponent of size. (size = 1<<exp) */
};

 

2.4.   dictEntry

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;     /* Next entry in the same hash bucket. */
    void *metadata[];           /* An arbitrary number of bytes (starting at a
                                 * pointer-aligned address) of size as returned
                                 * by dictType's dictEntryMetadataBytes(). */
} dictEntry;

2.5.   dictType

typedef struct dictType {
    uint64_t (*hashFunction)(const void *key);
    void *(*keyDup)(dict *d, const void *key);
    void *(*valDup)(dict *d, const void *obj);
    int (*keyCompare)(dict *d, const void *key1, const void *key2);
    void (*keyDestructor)(dict *d, void *key);
    void (*valDestructor)(dict *d, void *obj);
    int (*expandAllowed)(size_t moreMem, double usedRatio);
    /* Allow a dictEntry to carry extra caller-defined metadata. The
     * extra memory is initialized to 0 when a dictEntry is allocated. */
    size_t (*dictEntryMetadataBytes)(dict *d);
} dictType;

 

2.6.   redisObject

typedef struct redisObject {

    unsigned type:4;

    unsigned encoding:4;

    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or

                            * LFU data (least significant 8 bits frequency

                            * and most significant 16 bits access time). */

    int refcount;

    void *ptr;

} robj;

 

2.7.   sds

typedef char *sds;
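An sds value is not a bare C string: the pointer handed to callers is the buf[] field of a small, length-prefixed header stored immediately before it. A reference sketch of the smallest header variant from sds.h (sdshdr8, used for short strings):

/* Reference sketch: the sds pointer points at buf[], so sdslen() can read
 * len by stepping a few bytes back from the pointer. */
struct __attribute__ ((__packed__)) sdshdr8 {
    uint8_t len;         /* used length of buf */
    uint8_t alloc;       /* allocated length, excluding header and null terminator */
    unsigned char flags; /* 3 lsb encode the header type */
    char buf[];
};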

3.  Data

3.1.   String

3.1.1.     set

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {

    long long milliseconds = 0; /* initialized to avoid any harmness warning */

    int found = 0;

    int setkey_flags = 0;

 

    if (expire && getExpireMillisecondsOrReply(c, expire, flags, unit, &milliseconds) != C_OK) {

        return;

    }

 

    if (flags & OBJ_SET_GET) {

        if (getGenericCommand(c) == C_ERR) return;

    }

 

    found = (lookupKeyWrite(c->db,key) != NULL);

 

    if ((flags & OBJ_SET_NX && found) ||

        (flags & OBJ_SET_XX && !found))

    {

        if (!(flags & OBJ_SET_GET)) {

            addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);

        }

        return;

    }

 

    setkey_flags |= (flags & OBJ_KEEPTTL) ? SETKEY_KEEPTTL : 0;

    setkey_flags |= found ? SETKEY_ALREADY_EXIST : SETKEY_DOESNT_EXIST;

 

    setKey(c,c->db,key,val,setkey_flags);

    server.dirty++;

    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);

 

    if (expire) {

        setExpire(c,c->db,key,milliseconds);

        /* Propagate as SET Key Value PXAT millisecond-timestamp if there is

         * EX/PX/EXAT/PXAT flag. */

        robj *milliseconds_obj = createStringObjectFromLongLong(milliseconds);

        rewriteClientCommandVector(c, 5, shared.set, key, val, shared.pxat, milliseconds_obj);

        decrRefCount(milliseconds_obj);

        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);

    }

 

    if (!(flags & OBJ_SET_GET)) {

        addReply(c, ok_reply ? ok_reply : shared.ok);

    }

 

    /* Propagate without the GET argument (Isn't needed if we had expire since in that case we completely re-written the command argv) */

    if ((flags & OBJ_SET_GET) && !expire) {

        int argc = 0;

        int j;

        robj **argv = zmalloc((c->argc-1)*sizeof(robj*));

        for (j=0; j < c->argc; j++) {

            char *a = c->argv[j]->ptr;

            /* Skip GET which may be repeated multiple times. */

            if (j >= 3 &&

                (a[0] == 'g' || a[0] == 'G') &&

                (a[1] == 'e' || a[1] == 'E') &&

                (a[2] == 't' || a[2] == 'T') && a[3] == '\0')

                continue;

            argv[argc++] = c->argv[j];

            incrRefCount(c->argv[j]);

        }

        replaceClientCommandVector(c, argc, argv);

    }
}

3.1.2.    get

int getGenericCommand(client *c) {

    robj *o;

 

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL)

        return C_OK;

 

    if (checkType(c,o,OBJ_STRING)) {

        return C_ERR;

    }

 

    addReplyBulk(c,o);

    return C_OK;

}
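For reference, the GET entry point in t_string.c is just a thin wrapper around this helper:

void getCommand(client *c) {
    getGenericCommand(c);
}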

 

3.1.3.    del

 

void getdelCommand(client *c) {

    if (getGenericCommand(c) == C_ERR) return;

    int deleted = server.lazyfree_lazy_user_del ? dbAsyncDelete(c->db, c->argv[1]) :

                  dbSyncDelete(c->db, c->argv[1]);

    if (deleted) {

        /* Propagate as DEL/UNLINK command */

        robj *aux = server.lazyfree_lazy_user_del ? shared.unlink : shared.del;

        rewriteClientCommandVector(c,2,aux,c->argv[1]);

        signalModifiedKey(c, c->db, c->argv[1]);

        notifyKeyspaceEvent(NOTIFY_GENERIC, "del", c->argv[1], c->db->id);

        server.dirty++;

    }

}

3.2.   Hash

3.2.1.    create

robj *createHashObject(void) {

    unsigned char *zl = lpNew(0);

    robj *o = createObject(OBJ_HASH, zl);

    o->encoding = OBJ_ENCODING_LISTPACK;

    return o;

}

3.2.2.    set

 

void hsetCommand(client *c) {

    int i, created = 0;

    robj *o;

 

    if ((c->argc % 2) == 1) {

        addReplyErrorArity(c);

        return;

    }

 

    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;

    hashTypeTryConversion(o,c->argv,2,c->argc-1);

 

    for (i = 2; i < c->argc; i += 2)

        created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);

 

    /* HMSET (deprecated) and HSET return value is different. */

    char *cmdname = c->argv[0]->ptr;

    if (cmdname[1] == 's' || cmdname[1] == 'S') {

        /* HSET */

        addReplyLongLong(c, created);

    } else {

        /* HMSET */

        addReply(c, shared.ok);

    }

    signalModifiedKey(c,c->db,c->argv[1]);

    notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);

    server.dirty += (c->argc - 2)/2;

}

 

3.2.3.    get

void hgetCommand(client *c) {

    robj *o;

 

    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.null[c->resp])) == NULL ||

        checkType(c,o,OBJ_HASH)) return;

 

    addHashFieldToReply(c, o, c->argv[2]->ptr);

}

3.2.4.    del

 

void hdelCommand(client *c) {

    robj *o;

    int j, deleted = 0, keyremoved = 0;

 

    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||

        checkType(c,o,OBJ_HASH)) return;

 

    for (j = 2; j < c->argc; j++) {

        if (hashTypeDelete(o,c->argv[j]->ptr)) {

            deleted++;

            if (hashTypeLength(o) == 0) {

                dbDelete(c->db,c->argv[1]);

                keyremoved = 1;

                break;

            }

        }

    }

    if (deleted) {

        signalModifiedKey(c,c->db,c->argv[1]);

        notifyKeyspaceEvent(NOTIFY_HASH,"hdel",c->argv[1],c->db->id);

        if (keyremoved)

            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],

                                c->db->id);

        server.dirty += deleted;

    }

    addReplyLongLong(c,deleted);

}

 

 

3.3.   List

3.3.1.    create

robj *createQuicklistObject(void) {

    quicklist *l = quicklistCreate();

    robj *o = createObject(OBJ_LIST,l);

    o->encoding = OBJ_ENCODING_QUICKLIST;

    return o;

}

3.3.2.    Set

void pushGenericCommand(client *c, int where, int xx) {

    int j;

 

    robj *lobj = lookupKeyWrite(c->db, c->argv[1]);

    if (checkType(c,lobj,OBJ_LIST)) return;

    if (!lobj) {

        if (xx) {

            addReply(c, shared.czero);

            return;

        }

 

        lobj = createQuicklistObject();

        quicklistSetOptions(lobj->ptr, server.list_max_listpack_size,

                            server.list_compress_depth);

        dbAdd(c->db,c->argv[1],lobj);

    }

 

    for (j = 2; j < c->argc; j++) {

        listTypePush(lobj,c->argv[j],where);

        server.dirty++;

    }

 

    addReplyLongLong(c, listTypeLength(lobj));

 

    char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

    signalModifiedKey(c,c->db,c->argv[1]);

    notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);

}

3.3.3.    Get

void popGenericCommand(client *c, int where) {

    int hascount = (c->argc == 3);

    long count = 0;

    robj *value;

 

    if (c->argc > 3) {

        addReplyErrorArity(c);

        return;

    } else if (hascount) {

        /* Parse the optional count argument. */

        if (getPositiveLongFromObjectOrReply(c,c->argv[2],&count,NULL) != C_OK)

            return;

    }

 

    robj *o = lookupKeyWriteOrReply(c, c->argv[1], hascount ? shared.nullarray[c->resp]: shared.null[c->resp]);

    if (o == NULL || checkType(c, o, OBJ_LIST))

        return;

 

    if (hascount && !count) {

        /* Fast exit path. */

        addReply(c,shared.emptyarray);

        return;

    }

 

    if (!count) {

        /* Pop a single element. This is POP's original behavior that replies

         * with a bulk string. */

        value = listTypePop(o,where);

        serverAssert(value != NULL);

        addReplyBulk(c,value);

        decrRefCount(value);

        listElementsRemoved(c,c->argv[1],where,o,1,NULL);

    } else {

        /* Pop a range of elements. An addition to the original POP command,

         *  which replies with a multi-bulk. */

        long llen = listTypeLength(o);

        long rangelen = (count > llen) ? llen : count;

        long rangestart = (where == LIST_HEAD) ? 0 : -rangelen;

        long rangeend = (where == LIST_HEAD) ? rangelen - 1 : -1;

        int reverse = (where == LIST_HEAD) ? 0 : 1;

 

        addListRangeReply(c,o,rangestart,rangeend,reverse);

        listTypeDelRange(o,rangestart,rangelen);

        listElementsRemoved(c,c->argv[1],where,o,rangelen,NULL);

    }

}

3.3.4.    Del

void lmoveHandlePush(client *c, robj *dstkey, robj *dstobj, robj *value,

                     int where) {

    /* Create the list if the key does not exist */

    if (!dstobj) {

        dstobj = createQuicklistObject();

        quicklistSetOptions(dstobj->ptr, server.list_max_listpack_size,

                            server.list_compress_depth);

        dbAdd(c->db,dstkey,dstobj);

    }

    signalModifiedKey(c,c->db,dstkey);

    listTypePush(dstobj,value,where);

    notifyKeyspaceEvent(NOTIFY_LIST,

                        where == LIST_HEAD ? "lpush" : "rpush",

                        dstkey,

                        c->db->id);

    /* Always send the pushed value to the client. */

    addReplyBulk(c,value);

}

 

3.4.   Set

3.4.1.    create

robj *createSetObject(void) {

    dict *d = dictCreate(&setDictType);

    robj *o = createObject(OBJ_SET,d);

    o->encoding = OBJ_ENCODING_HT;

    return o;

}
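Sets whose first member looks like an integer start out with the intset encoding instead; for comparison, the sibling constructor in object.c:

robj *createIntsetObject(void) {
    intset *is = intsetNew();
    robj *o = createObject(OBJ_SET,is);
    o->encoding = OBJ_ENCODING_INTSET;
    return o;
}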

3.4.2.    Add

void saddCommand(client *c) {

    robj *set;

    int j, added = 0;

 

    set = lookupKeyWrite(c->db,c->argv[1]);

    if (checkType(c,set,OBJ_SET)) return;

   

    if (set == NULL) {

        set = setTypeCreate(c->argv[2]->ptr);

        dbAdd(c->db,c->argv[1],set);

    }

 

    for (j = 2; j < c->argc; j++) {

        if (setTypeAdd(set,c->argv[j]->ptr)) added++;

    }

    if (added) {

        signalModifiedKey(c,c->db,c->argv[1]);

        notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);

    }

    server.dirty += added;

    addReplyLongLong(c,added);

}

3.4.3.    Rem

void sremCommand(client *c) {

    robj *set;

    int j, deleted = 0, keyremoved = 0;

 

    if ((set = lookupKeyWriteOrReply(c,c->argv[1],shared.czero)) == NULL ||

        checkType(c,set,OBJ_SET)) return;

 

    for (j = 2; j < c->argc; j++) {

        if (setTypeRemove(set,c->argv[j]->ptr)) {

            deleted++;

            if (setTypeSize(set) == 0) {

                dbDelete(c->db,c->argv[1]);

                keyremoved = 1;

                break;

            }

        }

    }

    if (deleted) {

        signalModifiedKey(c,c->db,c->argv[1]);

        notifyKeyspaceEvent(NOTIFY_SET,"srem",c->argv[1],c->db->id);

        if (keyremoved)

            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",c->argv[1],

                                c->db->id);

        server.dirty += deleted;

    }

    addReplyLongLong(c,deleted);

}

3.4.4.    Member

void sismemberCommand(client *c) {

    robj *set;

 

    if ((set = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||

        checkType(c,set,OBJ_SET)) return;

 

    if (setTypeIsMember(set,c->argv[2]->ptr))

        addReply(c,shared.cone);

    else

        addReply(c,shared.czero);

}

3.5.   zset

 

3.5.1.    Create

robj *createZsetObject(void) {

    zset *zs = zmalloc(sizeof(*zs));

    robj *o;

 

    zs->dict = dictCreate(&zsetDictType);

    zs->zsl = zslCreate();

    o = createObject(OBJ_ZSET,zs);

    o->encoding = OBJ_ENCODING_SKIPLIST;

    return o;

}

 

robj *createZsetListpackObject(void) {

    unsigned char *lp = lpNew(0);

    robj *o = createObject(OBJ_ZSET,lp);

    o->encoding = OBJ_ENCODING_LISTPACK;

    return o;

}

3.5.2.    Add

void zaddGenericCommand(client *c, int flags) {

    static char *nanerr = "resulting score is not a number (NaN)";

    robj *key = c->argv[1];

    robj *zobj;

    sds ele;

    double score = 0, *scores = NULL;

    int j, elements, ch = 0;

    int scoreidx = 0;

    /* The following vars are used in order to track what the command actually

     * did during the execution, to reply to the client and to trigger the

     * notification of keyspace change. */

    int added = 0;      /* Number of new elements added. */

    int updated = 0;    /* Number of elements with updated score. */

    int processed = 0;  /* Number of elements processed, may remain zero with

                           options like XX. */

 

    /* Parse options. At the end 'scoreidx' is set to the argument position

     * of the score of the first score-element pair. */

    scoreidx = 2;

    while(scoreidx < c->argc) {

        char *opt = c->argv[scoreidx]->ptr;

        if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX;

        else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX;

        else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */

        else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR;

        else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT;

        else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT;

        else break;

        scoreidx++;

    }

 

    /* Turn options into simple to check vars. */

    int incr = (flags & ZADD_IN_INCR) != 0;

    int nx = (flags & ZADD_IN_NX) != 0;

    int xx = (flags & ZADD_IN_XX) != 0;

    int gt = (flags & ZADD_IN_GT) != 0;

    int lt = (flags & ZADD_IN_LT) != 0;

 

    /* After the options, we expect to have an even number of args, since

     * we expect any number of score-element pairs. */

    elements = c->argc-scoreidx;

    if (elements % 2 || !elements) {

        addReplyErrorObject(c,shared.syntaxerr);

        return;

    }

    elements /= 2; /* Now this holds the number of score-element pairs. */

 

    /* Check for incompatible options. */

    if (nx && xx) {

        addReplyError(c,

            "XX and NX options at the same time are not compatible");

        return;

    }

   

    if ((gt && nx) || (lt && nx) || (gt && lt)) {

        addReplyError(c,

            "GT, LT, and/or NX options at the same time are not compatible");

        return;

    }

    /* Note that XX is compatible with either GT or LT */

 

    if (incr && elements > 1) {

        addReplyError(c,

            "INCR option supports a single increment-element pair");

        return;

    }

 

    /* Start parsing all the scores, we need to emit any syntax error

     * before executing additions to the sorted set, as the command should

     * either execute fully or nothing at all. */

    scores = zmalloc(sizeof(double)*elements);

    for (j = 0; j < elements; j++) {

        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)

            != C_OK) goto cleanup;

    }

 

    /* Lookup the key and create the sorted set if does not exist. */

    zobj = lookupKeyWrite(c->db,key);

    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;

    if (zobj == NULL) {

        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */

        if (server.zset_max_listpack_entries == 0 ||

            server.zset_max_listpack_value < sdslen(c->argv[scoreidx+1]->ptr))

        {

            zobj = createZsetObject();

        } else {

            zobj = createZsetListpackObject();

        }

        dbAdd(c->db,key,zobj);

    }

 

    for (j = 0; j < elements; j++) {

        double newscore;

        score = scores[j];

        int retflags = 0;

 

        ele = c->argv[scoreidx+1+j*2]->ptr;

        int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);

        if (retval == 0) {

            addReplyError(c,nanerr);

            goto cleanup;

        }

        if (retflags & ZADD_OUT_ADDED) added++;

        if (retflags & ZADD_OUT_UPDATED) updated++;

        if (!(retflags & ZADD_OUT_NOP)) processed++;

        score = newscore;

    }

    server.dirty += (added+updated);

 

reply_to_client:

    if (incr) { /* ZINCRBY or INCR option. */

        if (processed)

            addReplyDouble(c,score);

        else

            addReplyNull(c);

    } else { /* ZADD. */

        addReplyLongLong(c,ch ? added+updated : added);

    }

 

cleanup:

    zfree(scores);

    if (added || updated) {

        signalModifiedKey(c,c->db,key);

        notifyKeyspaceEvent(NOTIFY_ZSET,

            incr ? "zincr" : "zadd", key, c->db->id);

    }

}

 

3.5.3.    Get
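A sketch of the simplest sorted-set read path, zscoreCommand from t_zset.c: it resolves the key, checks the type, and replies with the member's score via zsetScore().

void zscoreCommand(client *c) {
    robj *key = c->argv[1];
    robj *zobj;
    double score;

    if ((zobj = lookupKeyReadOrReply(c,key,shared.null[c->resp])) == NULL ||
        checkType(c,zobj,OBJ_ZSET)) return;

    if (zsetScore(zobj,c->argv[2]->ptr,&score) == C_ERR) {
        addReplyNull(c);
    } else {
        addReplyDouble(c,score);
    }
}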


 

 

3.6.   Stream

3.6.1.    stream

typedef struct raxNode {

    uint32_t iskey:1;     /* Does this node contain a key? */

    uint32_t isnull:1;    /* Associated value is NULL (don't store it). */

    uint32_t iscompr:1;   /* Node is compressed. */

    uint32_t size:29;     /* Number of children, or compressed string len. */

    /* Data layout is as follows:

     *

     * If node is not compressed we have 'size' bytes, one for each children

     * character, and 'size' raxNode pointers, point to each child node.

     * Note how the character is not stored in the children but in the

     * edge of the parents:

     *

     * [header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)

     *

     * if node is compressed (iscompr bit is 1) the node has 1 children.

     * In that case the 'size' bytes of the string stored immediately at

     * the start of the data section, represent a sequence of successive

     * nodes linked one after the other, for which only the last one in

     * the sequence is actually represented as a node, and pointed to by

     * the current compressed node.

     *

     * [header iscompr=1][xyz][z-ptr](value-ptr?)

     *

     * Both compressed and not compressed nodes can represent a key

     * with associated data in the radix tree at any level (not just terminal

     * nodes).

     *

     * If the node has an associated key (iskey=1) and is not NULL

     * (isnull=0), then after the raxNode pointers pointing to the

     * children, an additional value pointer is present (as you can see

     * in the representation above as "value-ptr" field).

     */

    unsigned char data[];

} raxNode;

typedef struct rax {

    raxNode *head;

    uint64_t numele;

    uint64_t numnodes;

} rax;

typedef struct streamID {

    uint64_t ms;        /* Unix time in milliseconds. */

    uint64_t seq;       /* Sequence number. */

} streamID;

 

typedef struct stream {

    rax *rax;               /* The radix tree holding the stream. */

    uint64_t length;        /* Current number of elements inside this stream. */

    streamID last_id;       /* Zero if there are yet no items. */

    streamID first_id;      /* The first non-tombstone entry, zero if empty. */

    streamID max_deleted_entry_id;  /* The maximal ID that was deleted. */

    uint64_t entries_added; /* All time count of elements added. */

    rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */

} stream;
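The rax keys are stream IDs encoded as 128-bit big-endian values, so lexicographic order inside the radix tree equals ID order. A sketch of the encoder from t_stream.c (assuming the htonu64 helper from endianconv.h):

/* Convert a streamID into a 128 bit big endian buffer so that IDs sort
 * lexicographically inside the rax. */
void streamEncodeID(void *buf, streamID *id) {
    uint64_t e[2];
    e[0] = htonu64(id->ms);
    e[1] = htonu64(id->seq);
    memcpy(buf,e,sizeof(e));
}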

3.6.2.    Create

robj *createStreamObject(void) {

    stream *s = streamNew();

    robj *o = createObject(OBJ_STREAM,s);

    o->encoding = OBJ_ENCODING_STREAM;

    return o;

}

 

/* Create a new stream data structure. */

stream *streamNew(void) {

    stream *s = zmalloc(sizeof(*s));

    s->rax = raxNew();

    s->length = 0;

    s->first_id.ms = 0;

    s->first_id.seq = 0;

    s->last_id.ms = 0;

    s->last_id.seq = 0;

    s->max_deleted_entry_id.seq = 0;

    s->max_deleted_entry_id.ms = 0;

    s->entries_added = 0;

    s->cgroups = NULL; /* Created on demand to save memory when not used. */

    return s;

}

 

3.6.3.    Add

void xaddCommand(client *c) {

    /* Parse options. */

    streamAddTrimArgs parsed_args;

    int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1);

    if (idpos < 0)

        return; /* streamParseAddOrTrimArgsOrReply already replied. */

    int field_pos = idpos+1; /* The ID is always one argument before the first field */

 

    /* Check arity. */

    if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {

        addReplyErrorArity(c);

        return;

    }

 

    /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating

     * a new stream and have streamAppendItem fail, leaving an empty key in the

     * database. */

    if (parsed_args.id_given && parsed_args.seq_given &&

        parsed_args.id.ms == 0 && parsed_args.id.seq == 0)

    {

        addReplyError(c,"The ID specified in XADD must be greater than 0-0");

        return;

    }

 

    /* Lookup the stream at key. */

    robj *o;

    stream *s;

    if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return;

    s = o->ptr;

 

    /* Return ASAP if the stream has reached the last possible ID */

    if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {

        addReplyError(c,"The stream has exhausted the last possible ID, "

                        "unable to add more items");

        return;

    }

 

    /* Append using the low level function and return the ID. */

    streamID id;

    if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,

        &id,parsed_args.id_given ? &parsed_args.id : NULL,parsed_args.seq_given) == C_ERR)

    {

        if (errno == EDOM)

            addReplyError(c,"The ID specified in XADD is equal or smaller than "

                            "the target stream top item");

        else

            addReplyError(c,"Elements are too large to be stored");

        return;

    }

    sds replyid = createStreamIDString(&id);

    addReplyBulkCBuffer(c, replyid, sdslen(replyid));

 

    signalModifiedKey(c,c->db,c->argv[1]);

    notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);

    server.dirty++;

 

    /* Trim if needed. */

    if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE) {

        if (streamTrim(s, &parsed_args)) {

            notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);

        }

        if (parsed_args.approx_trim) {

            /* In case our trimming was limited (by LIMIT or by ~) we must

             * re-write the relevant trim argument to make sure there will be

             * no inconsistencies in AOF loading or in the replica.

             * It's enough to check only args->approx because there is no

             * way LIMIT is given without the ~ option. */

            streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);

            streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);

        }

    }

 

    /* Let's rewrite the ID argument with the one actually generated for

     * AOF/replication propagation. */

    if (!parsed_args.id_given || !parsed_args.seq_given) {

        robj *idarg = createObject(OBJ_STRING, replyid);

        rewriteClientCommandArgument(c, idpos, idarg);

        decrRefCount(idarg);

    } else {

        sdsfree(replyid);

    }

 

    /* We need to signal to blocked clients that there is new data on this

     * stream. */

    signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM);

}

 

3.6.4.    XREADGROUP

void xreadCommand(client *c) {

    long long timeout = -1; /* -1 means, no BLOCK argument given. */

    long long count = 0;

    int streams_count = 0;

    int streams_arg = 0;

    int noack = 0;          /* True if NOACK option was specified. */

    streamID static_ids[STREAMID_STATIC_VECTOR_LEN];

    streamID *ids = static_ids;

    streamCG **groups = NULL;

    int xreadgroup = sdslen(c->argv[0]->ptr) == 10; /* XREAD or XREADGROUP? */

    robj *groupname = NULL;

    robj *consumername = NULL;

 

    /* Parse arguments. */

    for (int i = 1; i < c->argc; i++) {

        int moreargs = c->argc-i-1;

        char *o = c->argv[i]->ptr;

        if (!strcasecmp(o,"BLOCK") && moreargs) {

            if (c->flags & CLIENT_SCRIPT) {

                /*

                 * Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client

                 * on Lua/MULTI/RM_Call we want special treatment for Lua to keep backward compatibility.

                 * There is no sense to use BLOCK option within Lua. */

                addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr);

                return;

            }

            i++;

            if (getTimeoutFromObjectOrReply(c,c->argv[i],&timeout,

                UNIT_MILLISECONDS) != C_OK) return;

        } else if (!strcasecmp(o,"COUNT") && moreargs) {

            i++;

            if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)

                return;

            if (count < 0) count = 0;

        } else if (!strcasecmp(o,"STREAMS") && moreargs) {

            streams_arg = i+1;

            streams_count = (c->argc-streams_arg);

            if ((streams_count % 2) != 0) {

                addReplyError(c,"Unbalanced XREAD list of streams: "

                                "for each stream key an ID or '$' must be "

                                "specified.");

                return;

            }

            streams_count /= 2; /* We have two arguments for each stream. */

            break;

        } else if (!strcasecmp(o,"GROUP") && moreargs >= 2) {

            if (!xreadgroup) {

                addReplyError(c,"The GROUP option is only supported by "

                                "XREADGROUP. You called XREAD instead.");

                return;

            }

            groupname = c->argv[i+1];

            consumername = c->argv[i+2];

            i += 2;

        } else if (!strcasecmp(o,"NOACK")) {

            if (!xreadgroup) {

                addReplyError(c,"The NOACK option is only supported by "

                                "XREADGROUP. You called XREAD instead.");

                return;

            }

            noack = 1;

        } else {

            addReplyErrorObject(c,shared.syntaxerr);

            return;

        }

    }

 

    /* STREAMS option is mandatory. */

    if (streams_arg == 0) {

        addReplyErrorObject(c,shared.syntaxerr);

        return;

    }

 

    /* If the user specified XREADGROUP then it must also

     * provide the GROUP option. */

    if (xreadgroup && groupname == NULL) {

        addReplyError(c,"Missing GROUP option for XREADGROUP");

        return;

    }

 

    /* Parse the IDs and resolve the group name. */

    if (streams_count > STREAMID_STATIC_VECTOR_LEN)

        ids = zmalloc(sizeof(streamID)*streams_count);

    if (groupname) groups = zmalloc(sizeof(streamCG*)*streams_count);

 

    for (int i = streams_arg + streams_count; i < c->argc; i++) {

        /* Specifying "$" as last-known-id means that the client wants to be

         * served with just the messages that will arrive into the stream

         * starting from now. */

        int id_idx = i - streams_arg - streams_count;

        robj *key = c->argv[i-streams_count];

        robj *o = lookupKeyRead(c->db,key);

        if (checkType(c,o,OBJ_STREAM)) goto cleanup;

        streamCG *group = NULL;

 

        /* If a group was specified, than we need to be sure that the

         * key and group actually exist. */

        if (groupname) {

            if (o == NULL ||

                (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)

            {

                addReplyErrorFormat(c, "-NOGROUP No such key '%s' or consumer "

                                       "group '%s' in XREADGROUP with GROUP "

                                       "option",

                                    (char*)key->ptr,(char*)groupname->ptr);

                goto cleanup;

            }

            groups[id_idx] = group;

        }

 

        if (strcmp(c->argv[i]->ptr,"$") == 0) {

            if (xreadgroup) {

                addReplyError(c,"The $ ID is meaningless in the context of "

                                "XREADGROUP: you want to read the history of "

                                "this consumer by specifying a proper ID, or "

                                "use the > ID to get new messages. The $ ID would "

                                "just return an empty result set.");

                goto cleanup;

            }

            if (o) {

                stream *s = o->ptr;

                ids[id_idx] = s->last_id;

            } else {

                ids[id_idx].ms = 0;

                ids[id_idx].seq = 0;

            }

            continue;

        } else if (strcmp(c->argv[i]->ptr,">") == 0) {

            if (!xreadgroup) {

                addReplyError(c,"The > ID can be specified only when calling "

                                "XREADGROUP using the GROUP <group> "

                                "<consumer> option.");

                goto cleanup;

            }

            /* We use just the maximum ID to signal this is a ">" ID, anyway

             * the code handling the blocking clients will have to update the

             * ID later in order to match the changing consumer group last ID. */

            ids[id_idx].ms = UINT64_MAX;

            ids[id_idx].seq = UINT64_MAX;

            continue;

        }

        if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK)

            goto cleanup;

    }

 

    /* Try to serve the client synchronously. */

    size_t arraylen = 0;

    void *arraylen_ptr = NULL;

    for (int i = 0; i < streams_count; i++) {

        robj *o = lookupKeyRead(c->db,c->argv[streams_arg+i]);

        if (o == NULL) continue;

        stream *s = o->ptr;

        streamID *gt = ids+i; /* ID must be greater than this. */

        int serve_synchronously = 0;

        int serve_history = 0; /* True for XREADGROUP with ID != ">". */

 

        /* Check if there are the conditions to serve the client

         * synchronously. */

        if (groups) {

            /* If the consumer is blocked on a group, we always serve it

             * synchronously (serving its local history) if the ID specified

             * was not the special ">" ID. */

            if (gt->ms != UINT64_MAX ||

                gt->seq != UINT64_MAX)

            {

                serve_synchronously = 1;

                serve_history = 1;

            } else if (s->length) {

                /* We also want to serve a consumer in a consumer group

                 * synchronously in case the group top item delivered is smaller

                 * than what the stream has inside. */

                streamID maxid, *last = &groups[i]->last_id;

                streamLastValidID(s, &maxid);

                if (streamCompareID(&maxid, last) > 0) {

                    serve_synchronously = 1;

                    *gt = *last;

                }

            }

        } else if (s->length) {

            /* For consumers without a group, we serve synchronously if we can

             * actually provide at least one item from the stream. */

            streamID maxid;

            streamLastValidID(s, &maxid);

            if (streamCompareID(&maxid, gt) > 0) {

                serve_synchronously = 1;

            }

        }

 

        if (serve_synchronously) {

            arraylen++;

            if (arraylen == 1) arraylen_ptr = addReplyDeferredLen(c);

            /* streamReplyWithRange() handles the 'start' ID as inclusive,

             * so start from the next ID, since we want only messages with

             * IDs greater than start. */

            streamID start = *gt;

            streamIncrID(&start);

 

            /* Emit the two elements sub-array consisting of the name

             * of the stream and the data we extracted from it. */

            if (c->resp == 2) addReplyArrayLen(c,2);

            addReplyBulk(c,c->argv[streams_arg+i]);

            streamConsumer *consumer = NULL;

            streamPropInfo spi = {c->argv[i+streams_arg],groupname};

            if (groups) {

                consumer = streamLookupConsumer(groups[i],consumername->ptr,SLC_DEFAULT);

                if (consumer == NULL) {

                    consumer = streamCreateConsumer(groups[i],consumername->ptr,

                                                    c->argv[streams_arg+i],

                                                    c->db->id,SCC_DEFAULT);

                    if (noack)

                        streamPropagateConsumerCreation(c,spi.keyname,

                                                        spi.groupname,

                                                        consumer->name);

                }

            }

            int flags = 0;

            if (noack) flags |= STREAM_RWR_NOACK;

            if (serve_history) flags |= STREAM_RWR_HISTORY;

            streamReplyWithRange(c,s,&start,NULL,count,0,

                                 groups ? groups[i] : NULL,

                                 consumer, flags, &spi);

            if (groups) server.dirty++;

        }

    }

 

     /* We replied synchronously! Set the top array len and return to caller. */

    if (arraylen) {

        if (c->resp == 2)

            setDeferredArrayLen(c,arraylen_ptr,arraylen);

        else

            setDeferredMapLen(c,arraylen_ptr,arraylen);

        goto cleanup;

    }

 

    /* Block if needed. */

    if (timeout != -1) {

        /* If we are not allowed to block the client, the only thing

         * we can do is treating it as a timeout (even with timeout 0). */

        if (c->flags & CLIENT_DENY_BLOCKING) {

            addReplyNullArray(c);

            goto cleanup;

        }

        blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,

                     -1, timeout, NULL, NULL, ids);

        /* If no COUNT is given and we block, set a relatively small count:

         * in case the ID provided is too low, we do not want the server to

         * block just to serve this client a huge stream of messages. */

        c->bpop.xread_count = count ? count : XREAD_BLOCKED_DEFAULT_COUNT;

 

        /* If this is a XREADGROUP + GROUP we need to remember for which

         * group and consumer name we are blocking, so later when one of the

         * keys receive more data, we can call streamReplyWithRange() passing

         * the right arguments. */

        if (groupname) {

            incrRefCount(groupname);

            incrRefCount(consumername);

            c->bpop.xread_group = groupname;

            c->bpop.xread_consumer = consumername;

            c->bpop.xread_group_noack = noack;

        } else {

            c->bpop.xread_group = NULL;

            c->bpop.xread_consumer = NULL;

        }

        goto cleanup;

    }

 

    /* No BLOCK option, nor any stream we can serve. Reply as with a

     * timeout happened. */

    addReplyNullArray(c);

    /* Continue to cleanup... */

 

cleanup: /* Cleanup. */

 

    /* The command is propagated (in the READGROUP form) as a side effect

     * of calling lower level APIs. So stop any implicit propagation. */

    preventCommandPropagation(c);

    if (ids != static_ids) zfree(ids);

    zfree(groups);

}

3.6.5.    ack

void xackCommand(client *c) {

    streamCG *group = NULL;

    robj *o = lookupKeyRead(c->db,c->argv[1]);

    if (o) {

        if (checkType(c,o,OBJ_STREAM)) return; /* Type error. */

        group = streamLookupCG(o->ptr,c->argv[2]->ptr);

    }

 

    /* No key or group? Nothing to ack. */

    if (o == NULL || group == NULL) {

        addReply(c,shared.czero);

        return;

    }

 

    /* Start parsing the IDs, so that we abort ASAP if there is a syntax

     * error: the return value of this command cannot be an error in case

     * the client successfully acknowledged some messages, so it should be

     * executed in a "all or nothing" fashion. */

    streamID static_ids[STREAMID_STATIC_VECTOR_LEN];

    streamID *ids = static_ids;

    int id_count = c->argc-3;

    if (id_count > STREAMID_STATIC_VECTOR_LEN)

        ids = zmalloc(sizeof(streamID)*id_count);

    for (int j = 3; j < c->argc; j++) {

        if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup;

    }

 

    int acknowledged = 0;

    for (int j = 3; j < c->argc; j++) {

        unsigned char buf[sizeof(streamID)];

        streamEncodeID(buf,&ids[j-3]);

 

        /* Lookup the ID in the group PEL: it will have a reference to the

         * NACK structure that will have a reference to the consumer, so that

         * we are able to remove the entry from both PELs. */

        streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));

        if (nack != raxNotFound) {

            raxRemove(group->pel,buf,sizeof(buf),NULL);

            raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);

            streamFreeNACK(nack);

            acknowledged++;

            server.dirty++;

        }

    }

    addReplyLongLong(c,acknowledged);

cleanup:

    if (ids != static_ids) zfree(ids);

}

4.  Algorithm

4.1.   list

4.1.1.    quicklist

/* Create a new quicklist.

 * Free with quicklistRelease(). */

quicklist *quicklistCreate(void) {

    struct quicklist *quicklist;

 

    quicklist = zmalloc(sizeof(*quicklist));

    quicklist->head = quicklist->tail = NULL;

    quicklist->len = 0;

    quicklist->count = 0;

    quicklist->compress = 0;

    quicklist->fill = -2;

    quicklist->bookmark_count = 0;

    return quicklist;

}
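For context, a simplified sketch of the container being initialized above (struct quicklist in quicklist.h; in the real header fill/compress/bookmark_count are bit-fields and a flexible bookmarks[] array follows):

typedef struct quicklist {
    quicklistNode *head;
    quicklistNode *tail;
    unsigned long count;   /* total entries across all node listpacks */
    unsigned long len;     /* number of quicklistNodes */
    signed int fill;       /* per-node fill factor (list-max-listpack-size) */
    unsigned int compress; /* depth of head/tail nodes left uncompressed */
    unsigned int bookmark_count;
} quicklist;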

 

 

4.2.   hash

4.2.1.    listpack

typedef struct {

    /* When string is used, it is provided with the length (slen). */

    unsigned char *sval;

    uint32_t slen;

    /* When integer is used, 'sval' is NULL, and lval holds the value. */

    long long lval;

} listpackEntry;

 

4.2.2.    insert

 

unsigned char *lpInsert(unsigned char *lp, unsigned char *elestr, unsigned char *eleint,

                        uint32_t size, unsigned char *p, int where, unsigned char **newp)

{
 
    unsigned char intenc[LP_MAX_INT_ENCODING_LEN];

    unsigned char backlen[LP_MAX_BACKLEN_SIZE];

 

    uint64_t enclen; /* The length of the encoded element. */

    int delete = (elestr == NULL && eleint == NULL);

 

    /* when deletion, it is conceptually replacing the element with a

     * zero-length element. So whatever we get passed as 'where', set

     * it to LP_REPLACE. */

    if (delete) where = LP_REPLACE;

 

    /* If we need to insert after the current element, we just jump to the

     * next element (that could be the EOF one) and handle the case of

     * inserting before. So the function will actually deal with just two

     * cases: LP_BEFORE and LP_REPLACE. */

    if (where == LP_AFTER) {

        p = lpSkip(p);

        where = LP_BEFORE;

        ASSERT_INTEGRITY(lp, p);

    }

 

    /* Store the offset of the element 'p', so that we can obtain its

     * address again after a reallocation. */

    unsigned long poff = p-lp;

 

    int enctype;

    if (elestr) {

        /* Calling lpEncodeGetType() results into the encoded version of the

        * element to be stored into 'intenc' in case it is representable as

        * an integer: in that case, the function returns LP_ENCODING_INT.

        * Otherwise if LP_ENCODING_STR is returned, we'll have to call

        * lpEncodeString() to actually write the encoded string on place later.

        *

        * Whatever the returned encoding is, 'enclen' is populated with the

        * length of the encoded element. */

        enctype = lpEncodeGetType(elestr,size,intenc,&enclen);

        if (enctype == LP_ENCODING_INT) eleint = intenc;

    } else if (eleint) {

        enctype = LP_ENCODING_INT;

        enclen = size; /* 'size' is the length of the encoded integer element. */

    } else {

        enctype = -1;

        enclen = 0;

    }

 

    /* We need to also encode the backward-parsable length of the element

     * and append it to the end: this allows to traverse the listpack from

     * the end to the start. */

    unsigned long backlen_size = (!delete) ? lpEncodeBacklen(backlen,enclen) : 0;

    uint64_t old_listpack_bytes = lpGetTotalBytes(lp);

    uint32_t replaced_len  = 0;

    if (where == LP_REPLACE) {

        replaced_len = lpCurrentEncodedSizeUnsafe(p);

        replaced_len += lpEncodeBacklen(NULL,replaced_len);

        ASSERT_INTEGRITY_LEN(lp, p, replaced_len);

    }

 

    uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size

                                  - replaced_len;

    if (new_listpack_bytes > UINT32_MAX) return NULL;

 

    /* We now need to reallocate in order to make space or shrink the

     * allocation (in case 'when' value is LP_REPLACE and the new element is

     * smaller). However we do that before memmoving the memory to

     * make room for the new element if the final allocation will get

     * larger, or we do it after if the final allocation will get smaller. */

 

    unsigned char *dst = lp + poff; /* May be updated after reallocation. */

 

    /* Realloc before: we need more room. */

    if (new_listpack_bytes > old_listpack_bytes &&

        new_listpack_bytes > lp_malloc_size(lp)) {

        if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;

        dst = lp + poff;

    }

 

    /* Setup the listpack relocating the elements to make the exact room

     * we need to store the new one. */

    if (where == LP_BEFORE) {

        memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);

    } else { /* LP_REPLACE. */

        long lendiff = (enclen+backlen_size)-replaced_len;

        memmove(dst+replaced_len+lendiff,

                dst+replaced_len,

                old_listpack_bytes-poff-replaced_len);

    }

 

    /* Realloc after: we need to free space. */

    if (new_listpack_bytes < old_listpack_bytes) {

        if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;

        dst = lp + poff;

    }

 

    /* Store the entry. */

    if (newp) {

        *newp = dst;

        /* In case of deletion, set 'newp' to NULL if the next element is

         * the EOF element. */

        if (delete && dst[0] == LP_EOF) *newp = NULL;

    }

    if (!delete) {

        if (enctype == LP_ENCODING_INT) {

            memcpy(dst,eleint,enclen);

        } else {

            lpEncodeString(dst,elestr,size);

        }

        dst += enclen;

        memcpy(dst,backlen,backlen_size);

        dst += backlen_size;

    }

 

    /* Update header. */

    if (where != LP_REPLACE || delete) {

        uint32_t num_elements = lpGetNumElements(lp);

        if (num_elements != LP_HDR_NUMELE_UNKNOWN) {

            if (!delete)

                lpSetNumElements(lp,num_elements+1);

            else

                lpSetNumElements(lp,num_elements-1);

        }

    }

    lpSetTotalBytes(lp,new_listpack_bytes);

 

#if 0

    /* This code path is normally disabled: what it does is to force listpack

     * to return *always* a new pointer after performing some modification to

     * the listpack, even if the previous allocation was enough. This is useful

     * in order to spot bugs in code using listpacks: by doing so we can find

     * if the caller forgets to set the new pointer where the listpack reference

     * is stored, after an update. */

    unsigned char *oldlp = lp;

    lp = lp_malloc(new_listpack_bytes);

    memcpy(lp,oldlp,new_listpack_bytes);

    if (newp) {

        unsigned long offset = (*newp)-oldlp;

        *newp = lp + offset;

    }

    /* Make sure the old allocation contains garbage. */

    memset(oldlp,'A',new_listpack_bytes);

    lp_free(oldlp);

#endif

 

    return lp;

}

 

unsigned char *lpNew(size_t capacity) {

    unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);

    if (lp == NULL) return NULL;

    lpSetTotalBytes(lp,LP_HDR_SIZE+1);

    lpSetNumElements(lp,0);

    lp[LP_HDR_SIZE] = LP_EOF;

    return lp;

}
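
For context, lpNew allocates only the 6-byte header plus the single LP_EOF terminator, which is why it sets the total bytes to LP_HDR_SIZE+1. The header stores the total byte count and the element count in little-endian form; the accessors below are paraphrased from listpack.c and show the layout that lpInsert keeps up to date through lpSetTotalBytes()/lpSetNumElements():

#define LP_HDR_SIZE 6     /* 32-bit total bytes + 16-bit number of elements. */
#define LP_EOF 0xFF       /* Terminator, always the last byte of a listpack. */

#define lpGetTotalBytes(p)  (((uint32_t)(p)[0]<<0)  | \
                             ((uint32_t)(p)[1]<<8)  | \
                             ((uint32_t)(p)[2]<<16) | \
                             ((uint32_t)(p)[3]<<24))

#define lpGetNumElements(p) (((uint32_t)(p)[4]<<0)  | \
                             ((uint32_t)(p)[5]<<8))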

 

 

4.3.   zset

4.3.1.    zskiplist

 

typedef struct zskiplist {

    struct zskiplistNode *header, *tail;

    unsigned long length;

    int level;

} zskiplist;
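
The node type referenced by the header and tail pointers (zskiplistNode, defined in server.h of the same release) holds the member as an sds, its score, a backward pointer for reverse range scans, and a flexible array of per-level forward pointers with span counters used to compute ranks:

typedef struct zskiplistNode {
    sds ele;                          /* Member string. */
    double score;                     /* Score used for ordering. */
    struct zskiplistNode *backward;   /* Previous node at level 0. */
    struct zskiplistLevel {
        struct zskiplistNode *forward;
        unsigned long span;           /* Nodes skipped by this forward link. */
    } level[];
} zskiplistNode;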

 

 

4.3.2.    Zset

typedef struct zset {

    dict *dict;

    zskiplist *zsl;

} zset;

 

 

zskiplist *zslCreate(void) {

    int j;

    zskiplist *zsl;

 

    zsl = zmalloc(sizeof(*zsl));

    zsl->level = 1;

    zsl->length = 0;

    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);

    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {

        zsl->header->level[j].forward = NULL;

        zsl->header->level[j].span = 0;

    }

    zsl->header->backward = NULL;

    zsl->tail = NULL;

    return zsl;

}
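
When zslInsert creates a node it picks the node height with zslRandomLevel (t_zset.c): each extra level is granted with probability ZSKIPLIST_P (0.25) up to ZSKIPLIST_MAXLEVEL (32), giving the power-law height distribution that keeps the expected search cost at O(log N):

int zslRandomLevel(void) {
    static const int threshold = ZSKIPLIST_P*RAND_MAX;
    int level = 1;
    while (random() < threshold)
        level += 1;
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}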

 

5.  Pipeline

void modulePipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {

    UNUSED(el);

    UNUSED(fd);

    UNUSED(mask);

    UNUSED(privdata);

 

    char buf[128];

    while (read(fd, buf, sizeof(buf)) == sizeof(buf));

 

    /* Handle event loop events if pipe was written from event loop API */

    eventLoopHandleOneShotEvents();

}

static void eventLoopHandleOneShotEvents() {

    pthread_mutex_lock(&moduleEventLoopMutex);

    if (moduleEventLoopOneShots) {

        while (listLength(moduleEventLoopOneShots)) {

            listNode *ln = listFirst(moduleEventLoopOneShots);

            EventLoopOneShot *oneshot = ln->value;

            listDelNode(moduleEventLoopOneShots, ln);

            /* Unlock mutex before the callback. Another oneshot event can be

             * added in the callback, it will need to lock the mutex. */

            pthread_mutex_unlock(&moduleEventLoopMutex);

            oneshot->func(oneshot->user_data);

            zfree(oneshot);

            /* Lock again for the next iteration */

            pthread_mutex_lock(&moduleEventLoopMutex);

        }

    }

    pthread_mutex_unlock(&moduleEventLoopMutex);

}
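
The handler above implements the classic self-pipe wakeup: another thread appends a job to moduleEventLoopOneShots and writes to a pipe whose read end is registered with the event loop, so modulePipeReadable fires, drains the pipe and runs the queued callbacks. A minimal, hypothetical sketch of the producer side of this pattern (not Redis source; pipe_fds is a stand-in for the real pipe descriptors):

#include <errno.h>
#include <unistd.h>

static int pipe_fds[2];  /* [0]: read end watched by the event loop,
                            [1]: write end used by other threads. */

/* After appending a job to the (mutex-protected) one-shot list, wake the
 * event loop thread. One byte is enough: the readable handler drains the
 * pipe and then processes every queued job. */
static void wakeupEventLoop(void) {
    char b = 'x';
    while (write(pipe_fds[1], &b, 1) == -1 && errno == EINTR);
}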

6.  keyspace notifications

void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid) {

    sds chan;

    robj *chanobj, *eventobj;

    int len = -1;

    char buf[24];

 

    /* If any modules are interested in events, notify the module system now.

     * This bypasses the notifications configuration, but the module engine

     * will only call event subscribers if the event type matches the types

     * they are interested in. */

     moduleNotifyKeyspaceEvent(type, event, key, dbid);

 

    /* If notifications for this class of events are off, return ASAP. */

    if (!(server.notify_keyspace_events & type)) return;

 

    eventobj = createStringObject(event,strlen(event));

 

    /* __keyspace@<db>__:<key> <event> notifications. */

    if (server.notify_keyspace_events & NOTIFY_KEYSPACE) {

        chan = sdsnewlen("__keyspace@",11);

        len = ll2string(buf,sizeof(buf),dbid);

        chan = sdscatlen(chan, buf, len);

        chan = sdscatlen(chan, "__:", 3);

        chan = sdscatsds(chan, key->ptr);

        chanobj = createObject(OBJ_STRING, chan);

        pubsubPublishMessage(chanobj, eventobj, 0);

        decrRefCount(chanobj);

    }

 

    /* __keyevent@<db>__:<event> <key> notifications. */

    if (server.notify_keyspace_events & NOTIFY_KEYEVENT) {

        chan = sdsnewlen("__keyevent@",11);

        if (len == -1) len = ll2string(buf,sizeof(buf),dbid);

        chan = sdscatlen(chan, buf, len);

        chan = sdscatlen(chan, "__:", 3);

        chan = sdscatsds(chan, eventobj->ptr);

        chanobj = createObject(OBJ_STRING, chan);

        pubsubPublishMessage(chanobj, key, 0);

        decrRefCount(chanobj);

    }

    decrRefCount(eventobj);

}
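
Worked example: with the relevant notification classes enabled, DEL mykey executed against db 0 publishes the message "del" on channel __keyspace@0__:mykey and the message "mykey" on channel __keyevent@0__:del, so a subscriber can watch either a particular key (key-space view) or a particular event type (key-event view).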

 

void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) {

    /* Don't do anything if there aren't any subscribers */

    if (listLength(moduleKeyspaceSubscribers) == 0) return;

 

    listIter li;

    listNode *ln;

    listRewind(moduleKeyspaceSubscribers,&li);

 

    /* Remove irrelevant flags from the type mask */

    type &= ~(NOTIFY_KEYEVENT | NOTIFY_KEYSPACE);

 

    while((ln = listNext(&li))) {

        RedisModuleKeyspaceSubscriber *sub = ln->value;

        /* Only notify subscribers on events matching the registration,

         * and avoid subscribers triggering themselves */

        if ((sub->event_mask & type) && sub->active == 0) {

            RedisModuleCtx ctx;

            moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_TEMP_CLIENT);

            selectDb(ctx.client, dbid);

 

            /* mark the handler as active to avoid reentrant loops.

             * If the subscriber performs an action triggering itself,

             * it will not be notified about it. */

            sub->active = 1;

            sub->notify_callback(&ctx, type, event, key);

            sub->active = 0;

            moduleFreeContext(&ctx);

        }

    }

}

7.  Pub/sub

7.1.   list

 

/* Node, List, and Iterator are the only data structures used currently. */

 

typedef struct listNode {

    struct listNode *prev;

    struct listNode *next;

    void *value;

} listNode;

 

typedef struct listIter {

    listNode *next;

    int direction;

} listIter;

 

typedef struct list {

    listNode *head;

    listNode *tail;

    void *(*dup)(void *ptr);

    void (*free)(void *ptr);

    int (*match)(void *ptr, void *key);

    unsigned long len;

} list;

 

list *listCreate(void)

{

    struct list *list;

 

    if ((list = zmalloc(sizeof(*list))) == NULL)

        return NULL;

    list->head = list->tail = NULL;

    list->len = 0;

    list->dup = NULL;

    list->free = NULL;

    list->match = NULL;

    return list;

}

7.2.   pub

 

/*

 * Publish a message to all the subscribers.

 */

int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {

    int receivers = 0;

    dictEntry *de;

    dictIterator *di;

    listNode *ln;

    listIter li;

 

    /* Send to clients listening for that channel */

    de = dictFind(*type.serverPubSubChannels, channel);

    if (de) {

        list *list = dictGetVal(de);

        listNode *ln;

        listIter li;

 

        listRewind(list,&li);

        while ((ln = listNext(&li)) != NULL) {

            client *c = ln->value;

            addReplyPubsubMessage(c,channel,message,*type.messageBulk);

            updateClientMemUsageAndBucket(c);

            receivers++;

        }

    }

 

    if (type.shard) {

        /* Shard pubsub ignores patterns. */

        return receivers;

    }

 

    /* Send to clients listening to matching channels */

    di = dictGetIterator(server.pubsub_patterns);

    if (di) {

        channel = getDecodedObject(channel);

        while((de = dictNext(di)) != NULL) {

            robj *pattern = dictGetKey(de);

            list *clients = dictGetVal(de);

            if (!stringmatchlen((char*)pattern->ptr,

                                sdslen(pattern->ptr),

                                (char*)channel->ptr,

                                sdslen(channel->ptr),0)) continue;

 

            listRewind(clients,&li);

            while ((ln = listNext(&li)) != NULL) {

                client *c = listNodeValue(ln);

                addReplyPubsubPatMessage(c,pattern,channel,message);

                updateClientMemUsageAndBucket(c);

                receivers++;

            }

        }

        decrRefCount(channel);

        dictReleaseIterator(di);

    }

    return receivers;

}
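
Worked example: PUBLISH news.tech hello is delivered once to every client subscribed to the exact channel news.tech and once to every client whose pattern (for instance news.*) matches it; the receivers value returned by the command is the sum of both groups. For shard pub/sub (SPUBLISH) the pattern loop is skipped entirely, as the early return above shows.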

 

7.3.   sub

 

int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {

    dictEntry *de;

    list *clients = NULL;

    int retval = 0;

 

    /* Add the channel to the client -> channels hash table */

    if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {

        retval = 1;

        incrRefCount(channel);

        /* Add the client to the channel -> list of clients hash table */

        de = dictFind(*type.serverPubSubChannels, channel);

        if (de == NULL) {

            clients = listCreate();

            dictAdd(*type.serverPubSubChannels, channel, clients);

            incrRefCount(channel);

        } else {

            clients = dictGetVal(de);

        }

        listAddNodeTail(clients,c);

    }

    /* Notify the client */

    addReplyPubsubSubscribed(c,channel,type);

    return retval;

}

 

 

 

8.  db

8.1.   replication

 

 

8.1.1.    sync

 

void syncCommand(client *c) {

    /* ignore SYNC if already slave or in monitor mode */

    if (c->flags & CLIENT_SLAVE) return;

 

    /* Check if this is a failover request to a replica with the same replid and

     * become a master if so. */

    if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") &&

        !strcasecmp(c->argv[3]->ptr,"failover"))

    {

        serverLog(LL_WARNING, "Failover request received for replid %s.",

            (unsigned char *)c->argv[1]->ptr);

        if (!server.masterhost) {

            addReplyError(c, "PSYNC FAILOVER can't be sent to a master.");

            return;

        }

 

        if (!strcasecmp(c->argv[1]->ptr,server.replid)) {

            replicationUnsetMaster();

            sds client = catClientInfoString(sdsempty(),c);

            serverLog(LL_NOTICE,

                "MASTER MODE enabled (failover request from '%s')",client);

            sdsfree(client);

        } else {

            addReplyError(c, "PSYNC FAILOVER replid must match my replid.");

            return;           

        }

    }

 

    /* Don't let replicas sync with us while we're failing over */

    if (server.failover_state != NO_FAILOVER) {

        addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over");

        return;

    }

 

    /* Refuse SYNC requests if we are a slave but the link with our master

     * is not ok... */

    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {

        addReplyError(c,"-NOMASTERLINK Can't SYNC while not connected with my master");

        return;

    }

 

    /* SYNC can't be issued when the server has pending data to send to

     * the client about already issued commands. We need a fresh reply

     * buffer registering the differences between the BGSAVE and the current

     * dataset, so that we can copy to other slaves if needed. */

    if (clientHasPendingReplies(c)) {

        addReplyError(c,"SYNC and PSYNC are invalid with pending output");

        return;

    }

 

    /* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered

     * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing

     * use of a socket is handled, if needed, in `startBgsaveForReplication`. */

    if (c->slave_req & SLAVE_REQ_RDB_MASK && !(c->slave_capa & SLAVE_CAPA_EOF)) {

        addReplyError(c,"Filtered replica requires EOF capability");

        return;

    }

 

    serverLog(LL_NOTICE,"Replica %s asks for synchronization",

        replicationGetSlaveName(c));

 

    /* Try a partial resynchronization if this is a PSYNC command.

     * If it fails, we continue with usual full resynchronization, however

     * when this happens replicationSetupSlaveForFullResync will replied

     * with:

     *

     * +FULLRESYNC <replid> <offset>

     *

     * So the slave knows the new replid and offset to try a PSYNC later

     * if the connection with the master is lost. */

    if (!strcasecmp(c->argv[0]->ptr,"psync")) {

        long long psync_offset;

        if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK) {

            serverLog(LL_WARNING, "Replica %s asks for synchronization but with a wrong offset",

                      replicationGetSlaveName(c));

            return;

        }

 

        if (masterTryPartialResynchronization(c, psync_offset) == C_OK) {

            server.stat_sync_partial_ok++;

            return; /* No full resync needed, return. */

        } else {

            char *master_replid = c->argv[1]->ptr;

 

            /* Increment stats for failed PSYNCs, but only if the

             * replid is not "?", as this is used by slaves to force a full

             * resync on purpose when they are not able to partially

             * resync. */

            if (master_replid[0] != '?') server.stat_sync_partial_err++;

        }

    } else {

        /* If a slave uses SYNC, we are dealing with an old implementation

         * of the replication protocol (like redis-cli --slave). Flag the client

         * so that we don't expect to receive REPLCONF ACK feedbacks. */

        c->flags |= CLIENT_PRE_PSYNC;

    }

 

    /* Full resynchronization. */

    server.stat_sync_full++;

 

    /* Setup the slave as one waiting for BGSAVE to start. The following code

     * paths will change the state if we handle the slave differently. */

    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;

    if (server.repl_disable_tcp_nodelay)

        connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */

    c->repldbfd = -1;

    c->flags |= CLIENT_SLAVE;

    listAddNodeTail(server.slaves,c);

 

    /* Create the replication backlog if needed. */

    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {

        /* When we create the backlog from scratch, we always use a new

         * replication ID and clear the ID2, since there is no valid

         * past history. */

        changeReplicationId();

        clearReplicationId2();

        createReplicationBacklog();

        serverLog(LL_NOTICE,"Replication backlog created, my new "

                            "replication IDs are '%s' and '%s'",

                            server.replid, server.replid2);

    }

 

    /* CASE 1: BGSAVE is in progress, with disk target. */

    if (server.child_type == CHILD_TYPE_RDB &&

        server.rdb_child_type == RDB_CHILD_TYPE_DISK)

    {

        /* Ok a background save is in progress. Let's check if it is a good

         * one for replication, i.e. if there is another slave that is

         * registering differences since the server forked to save. */

        client *slave;

        listNode *ln;

        listIter li;

 

        listRewind(server.slaves,&li);

        while((ln = listNext(&li))) {

            slave = ln->value;

            /* If the client needs a buffer of commands, we can't use

             * a replica without replication buffer. */

            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&

                (!(slave->flags & CLIENT_REPL_RDBONLY) ||

                 (c->flags & CLIENT_REPL_RDBONLY)))

                break;

        }

        /* To attach this slave, we check that it has at least all the

         * capabilities of the slave that triggered the current BGSAVE

         * and its exact requirements. */

        if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) &&

            c->slave_req == slave->slave_req) {

            /* Perfect, the server is already registering differences for

             * another slave. Set the right state, and copy the buffer.

             * We don't copy buffer if clients don't want. */

            if (!(c->flags & CLIENT_REPL_RDBONLY))

                copyReplicaOutputBuffer(c,slave);

            replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);

            serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");

        } else {

            /* No way, we need to wait for the next BGSAVE in order to

             * register differences. */

            serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");

        }

 

    /* CASE 2: BGSAVE is in progress, with socket target. */

    } else if (server.child_type == CHILD_TYPE_RDB &&

               server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)

    {

        /* There is an RDB child process but it is writing directly to

         * children sockets. We need to wait for the next BGSAVE

         * in order to synchronize. */

        serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");

 

    /* CASE 3: There is no BGSAVE is in progress. */

    } else {

        if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) &&

            server.repl_diskless_sync_delay)

        {

            /* Diskless replication RDB child is created inside

             * replicationCron() since we want to delay its start a

             * few seconds to wait for more slaves to arrive. */

            serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");

        } else {

            /* We don't have a BGSAVE in progress, let's start one. Diskless

             * or disk-based mode is determined by replica's capacity. */

            if (!hasActiveChildProcess()) {

                startBgsaveForReplication(c->slave_capa, c->slave_req);

            } else {

                serverLog(LL_NOTICE,

                    "No BGSAVE in progress, but another BG operation is active. "

                    "BGSAVE for replication delayed");

            }

        }

    }

    return;

}
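
In protocol terms the replica opens the handshake with PSYNC <replid> <offset> (or PSYNC ? -1 to force a full resync); if masterTryPartialResynchronization accepts the offset the master answers +CONTINUE and streams the missing backlog, otherwise it answers +FULLRESYNC <replid> <offset> and the flow above schedules or reuses a BGSAVE to produce the RDB payload.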

 

8.1.2.    replication

 

int replicaPutOnline(client *slave) {

    if (slave->flags & CLIENT_REPL_RDBONLY) {

        slave->replstate = SLAVE_STATE_RDB_TRANSMITTED;

        /* The client asked for RDB only so we should close it ASAP */

        serverLog(LL_NOTICE,

                  "RDB transfer completed, rdb only replica (%s) should be disconnected asap",

                  replicationGetSlaveName(slave));

        return 0;

    }

    slave->replstate = SLAVE_STATE_ONLINE;

    slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */

 

    refreshGoodSlavesCount();

    /* Fire the replica change modules event. */

    moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,

                          REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,

                          NULL);

    serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",

        replicationGetSlaveName(slave));

    return 1;

}

 

 

void moduleFireServerEvent(uint64_t eid, int subid, void *data) {

    /* Fast path to return ASAP if there is nothing to do, avoiding to

     * setup the iterator and so forth: we want this call to be extremely

     * cheap if there are no registered modules. */

    if (listLength(RedisModule_EventListeners) == 0) return;

 

    listIter li;

    listNode *ln;

    listRewind(RedisModule_EventListeners,&li);

    while((ln = listNext(&li))) {

        RedisModuleEventListener *el = ln->value;

        if (el->event.id == eid) {

            RedisModuleCtx ctx;

            if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) {

                /* In the case of client changes, we're pushing the real client

                 * so the event handler can mutate it if needed. For example,

                 * to change its authentication state in a way that does not

                 * depend on specific commands executed later.

                 */

                moduleCreateContext(&ctx,el->module,REDISMODULE_CTX_NONE);

                ctx.client = (client *) data;

            } else {

                moduleCreateContext(&ctx,el->module,REDISMODULE_CTX_TEMP_CLIENT);

            }

 

            void *moduledata = NULL;

            RedisModuleClientInfoV1 civ1;

            RedisModuleReplicationInfoV1 riv1;

            RedisModuleModuleChangeV1 mcv1;

 

            /* Event specific context and data pointer setup. */

            if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) {

                serverAssert(modulePopulateClientInfoStructure(&civ1,data, el->event.dataver) == REDISMODULE_OK);

                moduledata = &civ1;

            } else if (eid == REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED) {

                serverAssert(modulePopulateReplicationInfoStructure(&riv1,el->event.dataver) == REDISMODULE_OK);

                moduledata = &riv1;

            } else if (eid == REDISMODULE_EVENT_FLUSHDB) {

                moduledata = data;

                RedisModuleFlushInfoV1 *fi = data;

                if (fi->dbnum != -1)

                    selectDb(ctx.client, fi->dbnum);

            } else if (eid == REDISMODULE_EVENT_MODULE_CHANGE) {

                RedisModule *m = data;

                if (m == el->module) {

                    moduleFreeContext(&ctx);

                    continue;

                }

                mcv1.version = REDISMODULE_MODULE_CHANGE_VERSION;

                mcv1.module_name = m->name;

                mcv1.module_version = m->ver;

                moduledata = &mcv1;

            } else if (eid == REDISMODULE_EVENT_LOADING_PROGRESS) {

                moduledata = data;

            } else if (eid == REDISMODULE_EVENT_CRON_LOOP) {

                moduledata = data;

            } else if (eid == REDISMODULE_EVENT_SWAPDB) {

                moduledata = data;

            } else if (eid == REDISMODULE_EVENT_CONFIG) {

                moduledata = data;

            }

 

            el->module->in_hook++;

            el->callback(&ctx,el->event,subid,moduledata);

            el->module->in_hook--;

 

            moduleFreeContext(&ctx);

        }

    }

}

 

 

 

8.2.   Rdb

8.2.1.    rio

struct _rio {

    /* Backend functions.

     * Since this functions do not tolerate short writes or reads the return

     * value is simplified to: zero on error, non zero on complete success. */

    size_t (*read)(struct _rio *, void *buf, size_t len);

    size_t (*write)(struct _rio *, const void *buf, size_t len);

    off_t (*tell)(struct _rio *);

    int (*flush)(struct _rio *);

    /* The update_cksum method if not NULL is used to compute the checksum of

     * all the data that was read or written so far. The method should be

     * designed so that can be called with the current checksum, and the buf

     * and len fields pointing to the new block of data to add to the checksum

     * computation. */

    void (*update_cksum)(struct _rio *, const void *buf, size_t len);

 

    /* The current checksum and flags (see RIO_FLAG_*) */

    uint64_t cksum, flags;

 

    /* number of bytes read or written */

    size_t processed_bytes;

 

    /* maximum single read or write chunk size */

    size_t max_processing_chunk;

 

    /* Backend-specific vars. */

    union {

        /* In-memory buffer target. */

        struct {

            sds ptr;

            off_t pos;

        } buffer;

        /* Stdio file pointer target. */

        struct {

            FILE *fp;

            off_t buffered; /* Bytes written since last fsync. */

            off_t autosync; /* fsync after 'autosync' bytes written. */

        } file;

        /* Connection object (used to read from socket) */

        struct {

            connection *conn;   /* Connection */

            off_t pos;    /* pos in buf that was returned */

            sds buf;      /* buffered data */

            size_t read_limit;  /* don't allow to buffer/read more than that */

            size_t read_so_far; /* amount of data read from the rio (not buffered) */

        } conn;

        /* FD target (used to write to pipe). */

        struct {

            int fd;       /* File descriptor. */

            off_t pos;

            sds buf;

        } fd;

    } io;

};

 

typedef struct _rio rio;
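
All serialization goes through a thin generic layer on top of these backends. rioWrite (rio.h, lightly paraphrased) shows the pattern: data is cut into max_processing_chunk pieces, the running checksum is updated, and the backend write method is called until everything is written or the error flag is raised:

static inline size_t rioWrite(rio *r, const void *buf, size_t len) {
    if (r->flags & RIO_FLAG_WRITE_ERROR) return 0;
    while (len) {
        size_t bytes_to_write =
            (r->max_processing_chunk && r->max_processing_chunk < len) ?
             r->max_processing_chunk : len;
        if (r->update_cksum) r->update_cksum(r,buf,bytes_to_write);
        if (r->write(r,buf,bytes_to_write) == 0) {
            r->flags |= RIO_FLAG_WRITE_ERROR;
            return 0;
        }
        buf = (char*)buf + bytes_to_write;
        len -= bytes_to_write;
        r->processed_bytes += bytes_to_write;
    }
    return 1;
}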

8.2.2.    Savedb

 

/* Save the DB on disk. Return C_ERR on error, C_OK on success. */

int rdbSave(int req, char *filename, rdbSaveInfo *rsi) {

    char tmpfile[256];

    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */

    FILE *fp = NULL;

    rio rdb;

    int error = 0;

    char *err_op;    /* For a detailed log */

 

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());

    fp = fopen(tmpfile,"w");

    if (!fp) {

        char *str_err = strerror(errno);

        char *cwdp = getcwd(cwd,MAXPATHLEN);

        serverLog(LL_WARNING,

            "Failed opening the temp RDB file %s (in server root dir %s) "

            "for saving: %s",

            tmpfile,

            cwdp ? cwdp : "unknown",

            str_err);

        return C_ERR;

    }

 

    rioInitWithFile(&rdb,fp);

    startSaving(RDBFLAGS_NONE);

 

    if (server.rdb_save_incremental_fsync)

        rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);

 

    if (rdbSaveRio(req,&rdb,&error,RDBFLAGS_NONE,rsi) == C_ERR) {

        errno = error;

        err_op = "rdbSaveRio";

        goto werr;

    }

 

    /* Make sure data will not remain on the OS's output buffers */

    if (fflush(fp)) { err_op = "fflush"; goto werr; }

    if (fsync(fileno(fp))) { err_op = "fsync"; goto werr; }

    if (fclose(fp)) { fp = NULL; err_op = "fclose"; goto werr; }

    fp = NULL;

   

    /* Use RENAME to make sure the DB file is changed atomically only

     * if the generate DB file is ok. */

    if (rename(tmpfile,filename) == -1) {

        char *str_err = strerror(errno);

        char *cwdp = getcwd(cwd,MAXPATHLEN);

        serverLog(LL_WARNING,

            "Error moving temp DB file %s on the final "

            "destination %s (in server root dir %s): %s",

            tmpfile,

            filename,

            cwdp ? cwdp : "unknown",

            str_err);

        unlink(tmpfile);

        stopSaving(0);

        return C_ERR;

    }

    if (fsyncFileDir(filename) == -1) { err_op = "fsyncFileDir"; goto werr; }

 

    serverLog(LL_NOTICE,"DB saved on disk");

    server.dirty = 0;

    server.lastsave = time(NULL);

    server.lastbgsave_status = C_OK;

    stopSaving(1);

    return C_OK;

 

werr:

    serverLog(LL_WARNING,"Write error saving DB on disk(%s): %s", err_op, strerror(errno));

    if (fp) fclose(fp);

    unlink(tmpfile);

    stopSaving(0);

    return C_ERR;

}

 

 

8.2.3.    file

void rioInitWithFile(rio *r, FILE *fp) {

    *r = rioFileIO;

    r->io.file.fp = fp;

    r->io.file.buffered = 0;

    r->io.file.autosync = 0;

}

 

 

8.3.   Aof

8.3.1.    Append

 

 

/* Called when the user switches from "appendonly yes" to "appendonly no"

 * at runtime using the CONFIG command. */

void stopAppendOnly(void) {

    serverAssert(server.aof_state != AOF_OFF);

    flushAppendOnlyFile(1);

    if (redis_fsync(server.aof_fd) == -1) {

        serverLog(LL_WARNING,"Fail to fsync the AOF file: %s",strerror(errno));

    } else {

        server.aof_fsync_offset = server.aof_current_size;

        server.aof_last_fsync = server.unixtime;

    }

    close(server.aof_fd);

 

    server.aof_fd = -1;

    server.aof_selected_db = -1;

    server.aof_state = AOF_OFF;

    server.aof_rewrite_scheduled = 0;

    server.aof_last_incr_size = 0;

    killAppendOnlyChild();

    sdsfree(server.aof_buf);

    server.aof_buf = sdsempty();

}

 

8.3.2.    file

void flushAppendOnlyFile(int force) {

    ssize_t nwritten;

    int sync_in_progress = 0;

    mstime_t latency;

 

    if (sdslen(server.aof_buf) == 0) {

        /* Check if we need to do fsync even the aof buffer is empty,

         * because previously in AOF_FSYNC_EVERYSEC mode, fsync is

         * called only when aof buffer is not empty, so if users

         * stop write commands before fsync called in one second,

         * the data in page cache cannot be flushed in time. */

        if (server.aof_fsync == AOF_FSYNC_EVERYSEC &&

            server.aof_fsync_offset != server.aof_current_size &&

            server.unixtime > server.aof_last_fsync &&

            !(sync_in_progress = aofFsyncInProgress())) {

            goto try_fsync;

        } else {

            return;

        }

    }

 

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)

        sync_in_progress = aofFsyncInProgress();

 

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {

        /* With this append fsync policy we do background fsyncing.

         * If the fsync is still in progress we can try to delay

         * the write for a couple of seconds. */

        if (sync_in_progress) {

            if (server.aof_flush_postponed_start == 0) {

                /* No previous write postponing, remember that we are

                 * postponing the flush and return. */

                server.aof_flush_postponed_start = server.unixtime;

                return;

            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {

                /* We were already waiting for fsync to finish, but for less

                 * than two seconds this is still ok. Postpone again. */

                return;

            }

            /* Otherwise fall through, and go write since we can't wait

             * over two seconds. */

            server.aof_delayed_fsync++;

            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");

        }

    }

    /* We want to perform a single write. This should be guaranteed atomic

     * at least if the filesystem we are writing is a real physical one.

     * While this will save us against the server being killed I don't think

     * there is much to do about the whole server stopping for power problems

     * or alike */

 

    if (server.aof_flush_sleep && sdslen(server.aof_buf)) {

        usleep(server.aof_flush_sleep);

    }

 

    latencyStartMonitor(latency);

    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));

    latencyEndMonitor(latency);

    /* We want to capture different events for delayed writes:

     * when the delay happens with a pending fsync, or with a saving child

     * active, and when the above two conditions are missing.

     * We also use an additional event name to save all samples which is

     * useful for graphing / monitoring purposes. */

    if (sync_in_progress) {

        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);

    } else if (hasActiveChildProcess()) {

        latencyAddSampleIfNeeded("aof-write-active-child",latency);

    } else {

        latencyAddSampleIfNeeded("aof-write-alone",latency);

    }

    latencyAddSampleIfNeeded("aof-write",latency);

 

    /* We performed the write so reset the postponed flush sentinel to zero. */

    server.aof_flush_postponed_start = 0;

 

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) {

        static time_t last_write_error_log = 0;

        int can_log = 0;

 

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */

        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {

            can_log = 1;

            last_write_error_log = server.unixtime;

        }

 

        /* Log the AOF write error and record the error code. */

        if (nwritten == -1) {

            if (can_log) {

                serverLog(LL_WARNING,"Error writing to the AOF file: %s",

                    strerror(errno));

            }

            server.aof_last_write_errno = errno;

        } else {

            if (can_log) {

                serverLog(LL_WARNING,"Short write while writing to "

                                       "the AOF file: (nwritten=%lld, "

                                       "expected=%lld)",

                                       (long long)nwritten,

                                       (long long)sdslen(server.aof_buf));

            }

 

            if (ftruncate(server.aof_fd, server.aof_last_incr_size) == -1) {

                if (can_log) {

                    serverLog(LL_WARNING, "Could not remove short write "

                             "from the append-only file.  Redis may refuse "

                             "to load the AOF the next time it starts.  "

                             "ftruncate: %s", strerror(errno));

                }

            } else {

                /* If the ftruncate() succeeded we can set nwritten to

                 * -1 since there is no longer partial data into the AOF. */

                nwritten = -1;

            }

            server.aof_last_write_errno = ENOSPC;

        }

 

        /* Handle the AOF write error. */

        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {

            /* We can't recover when the fsync policy is ALWAYS since the reply

             * for the client is already in the output buffers (both writes and

             * reads), and the changes to the db can't be rolled back. Since we

             * have a contract with the user that on acknowledged or observed

             * writes are is synced on disk, we must exit. */

            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");

            exit(1);

        } else {

            /* Recover from failed write leaving data into the buffer. However

             * set an error to stop accepting writes as long as the error

             * condition is not cleared. */

            server.aof_last_write_status = C_ERR;

 

            /* Trim the sds buffer if there was a partial write, and there

             * was no way to undo it with ftruncate(2). */

            if (nwritten > 0) {

                server.aof_current_size += nwritten;

                server.aof_last_incr_size += nwritten;

                sdsrange(server.aof_buf,nwritten,-1);

            }

            return; /* We'll try again on the next call... */

        }

    } else {

        /* Successful write(2). If AOF was in error state, restore the

         * OK state and log the event. */

        if (server.aof_last_write_status == C_ERR) {

            serverLog(LL_WARNING,

                "AOF write error looks solved, Redis can write again.");

            server.aof_last_write_status = C_OK;

        }

    }

    server.aof_current_size += nwritten;

    server.aof_last_incr_size += nwritten;

 

    /* Re-use AOF buffer when it is small enough. The maximum comes from the

     * arena size of 4k minus some overhead (but is otherwise arbitrary). */

    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {

        sdsclear(server.aof_buf);

    } else {

        sdsfree(server.aof_buf);

        server.aof_buf = sdsempty();

    }

 

try_fsync:

    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are

     * children doing I/O in the background. */

    if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())

        return;

 

    /* Perform the fsync if needed. */

    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {

        /* redis_fsync is defined as fdatasync() for Linux in order to avoid

         * flushing metadata. */

        latencyStartMonitor(latency);

        /* Let's try to get this data on the disk. To guarantee data safe when

         * the AOF fsync policy is 'always', we should exit if failed to fsync

         * AOF (see comment next to the exit(1) after write error above). */

        if (redis_fsync(server.aof_fd) == -1) {

            serverLog(LL_WARNING,"Can't persist AOF for fsync error when the "

              "AOF fsync policy is 'always': %s. Exiting...", strerror(errno));

            exit(1);

        }

        latencyEndMonitor(latency);

        latencyAddSampleIfNeeded("aof-fsync-always",latency);

        server.aof_fsync_offset = server.aof_current_size;

        server.aof_last_fsync = server.unixtime;

    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&

                server.unixtime > server.aof_last_fsync)) {

        if (!sync_in_progress) {

            aof_background_fsync(server.aof_fd);

            server.aof_fsync_offset = server.aof_current_size;

        }

        server.aof_last_fsync = server.unixtime;

    }

}
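
The force flag and the try_fsync label implement the three appendfsync policies: 'always' (AOF_FSYNC_ALWAYS) fsyncs in the main thread after every write and exits on failure, 'everysec' (AOF_FSYNC_EVERYSEC) hands the fsync to a background job at most once per second and may postpone the write for up to two seconds while a previous fsync is still running, and 'no' (AOF_FSYNC_NO) leaves flushing entirely to the kernel.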

 

9.  thread

9.1.   Event

aeEventLoop *aeCreateEventLoop(int setsize) {

    aeEventLoop *eventLoop;

    int i;

 

    monotonicInit();    /* just in case the calling app didn't initialize */

 

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;

    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);

    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);

    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;

    eventLoop->setsize = setsize;

    eventLoop->timeEventHead = NULL;

    eventLoop->timeEventNextId = 0;

    eventLoop->stop = 0;

    eventLoop->maxfd = -1;

    eventLoop->beforesleep = NULL;

    eventLoop->aftersleep = NULL;

    eventLoop->flags = 0;

    if (aeApiCreate(eventLoop) == -1) goto err;

    /* Events with mask == AE_NONE are not set. So let's initialize the

     * vector with it. */

    for (i = 0; i < setsize; i++)

        eventLoop->events[i].mask = AE_NONE;

    return eventLoop;

 

err:

    if (eventLoop) {

        zfree(eventLoop->events);

        zfree(eventLoop->fired);

        zfree(eventLoop);

    }

    return NULL;

}
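
A hedged usage sketch of the ae API (listen_fd and acceptHandler are placeholders, not Redis code): create the loop, register a readable handler for a descriptor with aeCreateFileEvent, and enter aeMain, which keeps polling and dispatching file and time events until aeStop is called:

void acceptHandler(aeEventLoop *el, int fd, void *clientData, int mask) {
    /* accept(2) the connection and register per-client handlers here. */
}

void runLoop(int listen_fd) {
    aeEventLoop *el = aeCreateEventLoop(1024);
    aeCreateFileEvent(el, listen_fd, AE_READABLE, acceptHandler, NULL);
    aeMain(el);                 /* Blocks until aeStop(el) is called. */
    aeDeleteEventLoop(el);
}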

9.2.   Io thread

void InitServerLast() {

    bioInit();

    initThreadedIO();

    set_jemalloc_bg_thread(server.jemalloc_bg_thread);

    server.initial_memory_usage = zmalloc_used_memory();

}

 

void bioInit(void) {

    pthread_attr_t attr;

    pthread_t thread;

    size_t stacksize;

    int j;

 

    /* Initialization of state vars and objects */

    for (j = 0; j < BIO_NUM_OPS; j++) {

        pthread_mutex_init(&bio_mutex[j],NULL);

        pthread_cond_init(&bio_newjob_cond[j],NULL);

        pthread_cond_init(&bio_step_cond[j],NULL);

        bio_jobs[j] = listCreate();

        bio_pending[j] = 0;

    }

 

    /* Set the stack size as by default it may be small in some system */

    pthread_attr_init(&attr);

    pthread_attr_getstacksize(&attr,&stacksize);

    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */

    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;

    pthread_attr_setstacksize(&attr, stacksize);

 

    /* Ready to spawn our threads. We use the single argument the thread

     * function accepts in order to pass the job ID the thread is

     * responsible of. */

    for (j = 0; j < BIO_NUM_OPS; j++) {

        void *arg = (void*)(unsigned long) j;

        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {

            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");

            exit(1);

        }

        bio_threads[j] = thread;

    }

}

/* Initialize the data structures needed for threaded I/O. */

void initThreadedIO(void) {

    server.io_threads_active = 0; /* We start with threads not active. */

 

    /* Indicate that io-threads are currently idle */

    io_threads_op = IO_THREADS_OP_IDLE;

 

    /* Don't spawn any thread if the user selected a single thread:

     * we'll handle I/O directly from the main thread. */

    if (server.io_threads_num == 1) return;

 

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {

        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "

                             "The maximum number is %d.", IO_THREADS_MAX_NUM);

        exit(1);

    }

 

    /* Spawn and initialize the I/O threads. */

    for (int i = 0; i < server.io_threads_num; i++) {

        /* Things we do for all the threads including the main thread. */

        io_threads_list[i] = listCreate();

        if (i == 0) continue; /* Thread 0 is the main thread. */

 

        /* Things we do only for the additional threads. */

        pthread_t tid;

        pthread_mutex_init(&io_threads_mutex[i],NULL);

        setIOPendingCount(i, 0);

        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */

        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {

            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");

            exit(1);

        }

        io_threads[i] = tid;

    }

}
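
Note that io_threads_num comes from the io-threads directive (default 1, in which case no extra threads are spawned and all I/O stays on the main thread). Even with several threads they are only used for writing replies unless io-threads-do-reads is also enabled; command execution itself remains single-threaded.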

 

 

10.       HA

10.1. Sentinel

10.1.1.  Ping

int sentinelSendPing(sentinelRedisInstance *ri) {

    int retval = redisAsyncCommand(ri->link->cc,

        sentinelPingReplyCallback, ri, "%s",

        sentinelInstanceMapCommand(ri,"PING"));

    if (retval == C_OK) {

        ri->link->pending_commands++;

        ri->link->last_ping_time = mstime();

        /* We update the active ping time only if we received the pong for

         * the previous ping, otherwise we are technically waiting since the

         * first ping that did not receive a reply. */

        if (ri->link->act_ping_time == 0)

            ri->link->act_ping_time = ri->link->last_ping_time;

        return 1;

    } else {

        return 0;

    }

}

10.1.2.  Select

 

sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {

    sentinelRedisInstance **instance =

        zmalloc(sizeof(instance[0])*dictSize(master->slaves));

    sentinelRedisInstance *selected = NULL;

    int instances = 0;

    dictIterator *di;

    dictEntry *de;

    mstime_t max_master_down_time = 0;

 

    if (master->flags & SRI_S_DOWN)

        max_master_down_time += mstime() - master->s_down_since_time;

    max_master_down_time += master->down_after_period * 10;

 

    di = dictGetIterator(master->slaves);

 

    while((de = dictNext(di)) != NULL) {

        sentinelRedisInstance *slave = dictGetVal(de);

        mstime_t info_validity_time;

 

        if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;

        if (slave->link->disconnected) continue;

        if (mstime() - slave->link->last_avail_time > sentinel_ping_period*5) continue;

        if (slave->slave_priority == 0) continue;

 

        /* If the master is in SDOWN state we get INFO for slaves every second.

         * Otherwise we get it with the usual period so we need to account for

         * a larger delay. */

        if (master->flags & SRI_S_DOWN)

            info_validity_time = sentinel_ping_period*5;

        else

            info_validity_time = sentinel_info_period*3;

        if (mstime() - slave->info_refresh > info_validity_time) continue;

        if (slave->master_link_down_time > max_master_down_time) continue;

        instance[instances++] = slave;

    }

    dictReleaseIterator(di);

    if (instances) {

        qsort(instance,instances,sizeof(sentinelRedisInstance*),

            compareSlavesForPromotion);

        selected = instance[0];

    }

    zfree(instance);

    return selected;

}
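
The surviving candidates are ordered by compareSlavesForPromotion: lower slave_priority wins first, then the larger processed replication offset, and finally the lexicographically smaller run ID as a tie breaker, so selected ends up being the most up-to-date replica among those eligible for promotion.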

 

10.1.3.  Slaveof

int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr) {

    char portstr[32];

    const char *host;

    int retval;

 

    /* If host is NULL we send SLAVEOF NO ONE that will turn the instance

    * into a master. */

    if (!addr) {

        host = "NO";

        memcpy(portstr,"ONE",4);

    } else {

        host = announceSentinelAddr(addr);

        ll2string(portstr,sizeof(portstr),addr->port);

    }

 

    /* In order to send SLAVEOF in a safe way, we send a transaction performing

     * the following tasks:

     * 1) Reconfigure the instance according to the specified host/port params.

     * 2) Rewrite the configuration.

     * 3) Disconnect all clients (but this one sending the command) in order

     *    to trigger the ask-master-on-reconnection protocol for connected

     *    clients.

     *

     * Note that we don't check the replies returned by commands, since we

     * will observe instead the effects in the next INFO output. */

    retval = redisAsyncCommand(ri->link->cc,

        sentinelDiscardReplyCallback, ri, "%s",

        sentinelInstanceMapCommand(ri,"MULTI"));

    if (retval == C_ERR) return retval;

    ri->link->pending_commands++;

 

    retval = redisAsyncCommand(ri->link->cc,

        sentinelDiscardReplyCallback, ri, "%s %s %s",

        sentinelInstanceMapCommand(ri,"SLAVEOF"),

        host, portstr);

    if (retval == C_ERR) return retval;

    ri->link->pending_commands++;

 

    retval = redisAsyncCommand(ri->link->cc,

        sentinelDiscardReplyCallback, ri, "%s REWRITE",

        sentinelInstanceMapCommand(ri,"CONFIG"));

    if (retval == C_ERR) return retval;

    ri->link->pending_commands++;

 

    /* CLIENT KILL TYPE <type> is only supported starting from Redis 2.8.12,

     * however sending it to an instance not understanding this command is not

     * an issue because CLIENT is variadic command, so Redis will not

     * recognized as a syntax error, and the transaction will not fail (but

     * only the unsupported command will fail). */

    for (int type = 0; type < 2; type++) {

        retval = redisAsyncCommand(ri->link->cc,

            sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",

            sentinelInstanceMapCommand(ri,"CLIENT"),

            type == 0 ? "normal" : "pubsub");

        if (retval == C_ERR) return retval;

        ri->link->pending_commands++;

    }

 

    retval = redisAsyncCommand(ri->link->cc,

        sentinelDiscardReplyCallback, ri, "%s",

        sentinelInstanceMapCommand(ri,"EXEC"));

    if (retval == C_ERR) return retval;

    ri->link->pending_commands++;

 

    return C_OK;

}

 

10.2. Cluster

10.2.1.  Cluster node

 

typedef struct clusterNode {

    mstime_t ctime; /* Node object creation time. */

    char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */

    int flags;      /* CLUSTER_NODE_... */

    uint64_t configEpoch; /* Last configEpoch observed for this node */

    unsigned char slots[CLUSTER_SLOTS/8]; /* slots handled by this node */

    uint16_t *slot_info_pairs; /* Slots info represented as (start/end) pair (consecutive index). */

    int slot_info_pairs_count; /* Used number of slots in slot_info_pairs */

    int numslots;   /* Number of slots handled by this node */

    int numslaves;  /* Number of slave nodes, if this is a master */

    struct clusterNode **slaves; /* pointers to slave nodes */

    struct clusterNode *slaveof; /* pointer to the master node. Note that it

                                    may be NULL even if the node is a slave

                                    if we don't have the master node in our

                                    tables. */

    unsigned long long last_in_ping_gossip; /* The number of the last carried in the ping gossip section */

    mstime_t ping_sent;      /* Unix time we sent latest ping */

    mstime_t pong_received;  /* Unix time we received the pong */

    mstime_t data_received;  /* Unix time we received any data */

    mstime_t fail_time;      /* Unix time when FAIL flag was set */

    mstime_t voted_time;     /* Last time we voted for a slave of this master */

    mstime_t repl_offset_time;  /* Unix time we received offset for this node */

    mstime_t orphaned_time;     /* Starting time of orphaned master condition */

    long long repl_offset;      /* Last known repl offset for this node. */

    char ip[NET_IP_STR_LEN];    /* Latest known IP address of this node */

    sds hostname;               /* The known hostname for this node */

    int port;                   /* Latest known clients port (TLS or plain). */

    int pport;                  /* Latest known clients plaintext port. Only used

                                   if the main clients port is for TLS. */

    int cport;                  /* Latest known cluster port of this node. */

    clusterLink *link;          /* TCP/IP link established toward this node */

    clusterLink *inbound_link;  /* TCP/IP link accepted from this node */

    list *fail_reports;         /* List of nodes signaling this as failing */

} clusterNode;

 

10.2.2.  Cluster link

 

/* clusterLink encapsulates everything needed to talk with a remote node. */

typedef struct clusterLink {

    mstime_t ctime;             /* Link creation time */

    connection *conn;           /* Connection to remote node */

    sds sndbuf;                 /* Packet send buffer */

    char *rcvbuf;               /* Packet reception buffer */

    size_t rcvbuf_len;          /* Used size of rcvbuf */

    size_t rcvbuf_alloc;        /* Allocated size of rcvbuf */

    struct clusterNode *node;   /* Node related to this link. Initialized to NULL when unknown */

    int inbound;                /* 1 if this link is an inbound link accepted from the related node */

} clusterLink;

 

 

10.2.3.  Cluster state

 

typedef struct clusterState {

    clusterNode *myself;  /* This node */

    uint64_t currentEpoch;

    int state;            /* CLUSTER_OK, CLUSTER_FAIL, ... */

    int size;             /* Num of master nodes with at least one slot */

    dict *nodes;          /* Hash table of name -> clusterNode structures */

    dict *nodes_black_list; /* Nodes we don't re-add for a few seconds. */

    clusterNode *migrating_slots_to[CLUSTER_SLOTS];

    clusterNode *importing_slots_from[CLUSTER_SLOTS];

    clusterNode *slots[CLUSTER_SLOTS];

    rax *slots_to_channels;

    /* The following fields are used to take the slave state on elections. */

    mstime_t failover_auth_time; /* Time of previous or next election. */

    int failover_auth_count;    /* Number of votes received so far. */

    int failover_auth_sent;     /* True if we already asked for votes. */

    int failover_auth_rank;     /* This slave rank for current auth request. */

    uint64_t failover_auth_epoch; /* Epoch of the current election. */

    int cant_failover_reason;   /* Why a slave is currently not able to

                                   failover. See the CANT_FAILOVER_* macros. */

    /* Manual failover state in common. */

    mstime_t mf_end;            /* Manual failover time limit (ms unixtime).

                                   It is zero if there is no MF in progress. */

    /* Manual failover state of master. */

    clusterNode *mf_slave;      /* Slave performing the manual failover. */

    /* Manual failover state of slave. */

    long long mf_master_offset; /* Master offset the slave needs to start MF

                                   or -1 if still not received. */

    int mf_can_start;           /* If non-zero signal that the manual failover

                                   can start requesting masters vote. */

    /* The following fields are used by masters to take state on elections. */

    uint64_t lastVoteEpoch;     /* Epoch of the last vote granted. */

    int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */

    /* Stats */

    /* Messages received and sent by type. */

    long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];

    long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];

    long long stats_pfail_nodes;    /* Number of nodes in PFAIL status,

                                       excluding nodes without address. */

    unsigned long long stat_cluster_links_buffer_limit_exceeded;  /* Total number of cluster links freed due to exceeding buffer limit */

} clusterState;

 

10.2.4.  Gossip

 

typedef struct {

    char nodename[CLUSTER_NAMELEN];

    uint32_t ping_sent;

    uint32_t pong_received;

    char ip[NET_IP_STR_LEN];  /* IP address last time it was seen */

    uint16_t port;              /* base port last time it was seen */

    uint16_t cport;             /* cluster port last time it was seen */

    uint16_t flags;             /* node->flags copy */

    uint16_t pport;             /* plaintext-port, when base port is TLS */

    uint16_t notused1;

} clusterMsgDataGossip;

 

10.2.5.  Msg

union clusterMsgData {

    /* PING, MEET and PONG */

    struct {

        /* Array of N clusterMsgDataGossip structures */

        clusterMsgDataGossip gossip[1];

        /* Extension data that can optionally be sent for ping/meet/pong

         * messages. We can't explicitly define them here though, since

         * the gossip array isn't the real length of the gossip data. */

    } ping;

 

    /* FAIL */

    struct {

        clusterMsgDataFail about;

    } fail;

 

    /* PUBLISH */

    struct {

        clusterMsgDataPublish msg;

    } publish;

 

    /* UPDATE */

    struct {

        clusterMsgDataUpdate nodecfg;

    } update;

 

    /* MODULE */

    struct {

        clusterMsgModule msg;

    } module;

};

 

#define CLUSTER_PROTO_VER 1 /* Cluster bus protocol version. */

 

typedef struct {

    char sig[4];        /* Signature "RCmb" (Redis Cluster message bus). */

    uint32_t totlen;    /* Total length of this message */

    uint16_t ver;       /* Protocol version, currently set to 1. */

    uint16_t port;      /* TCP base port number. */

    uint16_t type;      /* Message type */

    uint16_t count;     /* Only used for some kind of messages. */

    uint64_t currentEpoch;  /* The epoch accordingly to the sending node. */

    uint64_t configEpoch;   /* The config epoch if it's a master, or the last

                               epoch advertised by its master if it is a

                               slave. */

    uint64_t offset;    /* Master replication offset if node is a master or

                           processed replication offset if node is a slave. */

    char sender[CLUSTER_NAMELEN]; /* Name of the sender node */

    unsigned char myslots[CLUSTER_SLOTS/8];

    char slaveof[CLUSTER_NAMELEN];

    char myip[NET_IP_STR_LEN];    /* Sender IP, if not all zeroed. */

    uint16_t extensions; /* Number of extensions sent along with this packet. */

    char notused1[30];   /* 30 bytes reserved for future usage. */

    uint16_t pport;      /* Sender TCP plaintext port, if base port is TLS */

    uint16_t cport;      /* Sender TCP cluster bus port */

    uint16_t flags;      /* Sender node flags */

    unsigned char state; /* Cluster state from the POV of the sender */

    unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */

    union clusterMsgData data;

} clusterMsg;

10.2.6.  Slot

unsigned int keyHashSlot(char *key, int keylen) {

    int s, e; /* start-end indexes of { and } */

 

    for (s = 0; s < keylen; s++)

        if (key[s] == '{') break;

 

    /* No '{' ? Hash the whole key. This is the base case. */

    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

 

    /* '{' found? Check if we have the corresponding '}'. */

    for (e = s+1; e < keylen; e++)

        if (key[e] == '}') break;

 

    /* No '}' or nothing between {} ? Hash the whole key. */

    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

 

    /* If we are here there is both a { and a } on its right. Hash

     * what is in the middle between { and }. */

    return crc16(key+s+1,e-s-1) & 0x3FFF;

}
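
Worked example: only the text between the first '{' and the next '}' is hashed, so keys sharing a hash tag map to the same slot and can be used together in multi-key commands; with no tag (or an empty one) the whole key is hashed and masked to the 16384-slot space (& 0x3FFF):

/* Illustrative only. */
unsigned int s1 = keyHashSlot("{user1000}.following", 20);
unsigned int s2 = keyHashSlot("{user1000}.followers", 20);
/* s1 == s2 == crc16("user1000", 8) & 0x3FFF */
unsigned int s3 = keyHashSlot("foo{}{bar}", 10);  /* empty {}: whole key hashed */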

10.2.7.  Cron

/* This is executed 10 times every second */

void clusterCron(void) {

    dictIterator *di;

    dictEntry *de;

    int update_state = 0;

    int orphaned_masters; /* How many masters there are without ok slaves. */

    int max_slaves; /* Max number of ok slaves for a single master. */

    int this_slaves; /* Number of ok slaves for our master (if we are slave). */

    mstime_t min_pong = 0, now = mstime();

    clusterNode *min_pong_node = NULL;

    static unsigned long long iteration = 0;

    mstime_t handshake_timeout;

 

    iteration++; /* Number of times this function was called so far. */

 

    clusterUpdateMyselfHostname();

 

    /* The handshake timeout is the time after which a handshake node that was

     * not turned into a normal node is removed from the nodes. Usually it is

     * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use

     * the value of 1 second. */

    handshake_timeout = server.cluster_node_timeout;

    if (handshake_timeout < 1000) handshake_timeout = 1000;

 

    /* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */

    server.cluster->stats_pfail_nodes = 0;

    /* Clear so clusterNodeCronUpdateClusterLinksMemUsage can count the current memory usage of all cluster links. */

    server.stat_cluster_links_memory = 0;

    /* Run through some of the operations we want to do on each cluster node. */

    di = dictGetSafeIterator(server.cluster->nodes);

    while((de = dictNext(di)) != NULL) {

        clusterNode *node = dictGetVal(de);

        /* The sequence goes:

         * 1. We try to shrink link buffers if possible.

         * 2. We free the links whose buffers are still oversized after possible shrinking.

         * 3. We update the latest memory usage of cluster links.

         * 4. We immediately attempt reconnecting after freeing links.

         */

        clusterNodeCronResizeBuffers(node);

        clusterNodeCronFreeLinkOnBufferLimitReached(node);

        clusterNodeCronUpdateClusterLinksMemUsage(node);

        /* The protocol is that function(s) below return non-zero if the node was

         * terminated.

         */

        if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;

    }

    dictReleaseIterator(di);

 

    /* Ping some random node 1 time every 10 iterations, so that we usually ping

     * one random node every second. */

    if (!(iteration % 10)) {

        int j;

 

        /* Check a few random nodes and ping the one with the oldest

         * pong_received time. */

        for (j = 0; j < 5; j++) {

            de = dictGetRandomKey(server.cluster->nodes);

            clusterNode *this = dictGetVal(de);

 

            /* Don't ping nodes disconnected or with a ping currently active. */

            if (this->link == NULL || this->ping_sent != 0) continue;

            if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))

                continue;

            if (min_pong_node == NULL || min_pong > this->pong_received) {

                min_pong_node = this;

                min_pong = this->pong_received;

            }

        }

        if (min_pong_node) {

            serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);

            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);

        }

    }

 

    /* Iterate nodes to check if we need to flag something as failing.

     * This loop is also responsible to:

     * 1) Check if there are orphaned masters (masters without non failing

     *    slaves).

     * 2) Count the max number of non failing slaves for a single master.

     * 3) Count the number of slaves for our master, if we are a slave. */

    orphaned_masters = 0;

    max_slaves = 0;

    this_slaves = 0;

    di = dictGetSafeIterator(server.cluster->nodes);

    while((de = dictNext(di)) != NULL) {

        clusterNode *node = dictGetVal(de);

        now = mstime(); /* Use an updated time at every iteration. */

 

        if (node->flags &

            (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))

                continue;

 

        /* Orphaned master check, useful only if the current instance

         * is a slave that may migrate to another master. */

        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {

            int okslaves = clusterCountNonFailingSlaves(node);

 

            /* A master is orphaned if it is serving a non-zero number of

             * slots, has no working slaves, but used to have at least one

             * slave, or failed over a master that used to have slaves. */

            if (okslaves == 0 && node->numslots > 0 &&

                node->flags & CLUSTER_NODE_MIGRATE_TO)

            {

                orphaned_masters++;

            }

            if (okslaves > max_slaves) max_slaves = okslaves;

            if (myself->slaveof == node)

                this_slaves = okslaves;

        }

 

        /* If we are not receiving any data for more than half the cluster

         * timeout, reconnect the link: maybe there is a connection

         * issue even if the node is alive. */

        mstime_t ping_delay = now - node->ping_sent;

        mstime_t data_delay = now - node->data_received;

        if (node->link && /* is connected */

            now - node->link->ctime >

            server.cluster_node_timeout && /* was not already reconnected */

            node->ping_sent && /* we already sent a ping */

            /* and we are waiting for the pong more than timeout/2 */

            ping_delay > server.cluster_node_timeout/2 &&

            /* and in such interval we are not seeing any traffic at all. */

            data_delay > server.cluster_node_timeout/2)

        {

            /* Disconnect the link, it will be reconnected automatically. */

            freeClusterLink(node->link);

        }

 

        /* If we have currently no active ping in this instance, and the

         * received PONG is older than half the cluster timeout, send

         * a new ping now, to ensure all the nodes are pinged without

         * a too big delay. */

        if (node->link &&

            node->ping_sent == 0 &&

            (now - node->pong_received) > server.cluster_node_timeout/2)

        {

            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);

            continue;

        }

 

        /* If we are a master and one of the slaves requested a manual

         * failover, ping it continuously. */

        if (server.cluster->mf_end &&

            nodeIsMaster(myself) &&

            server.cluster->mf_slave == node &&

            node->link)

        {

            clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);

            continue;

        }

 

        /* Check only if we have an active ping for this instance. */

        if (node->ping_sent == 0) continue;

 

        /* Check if this node looks unreachable.

         * Note that if we already received the PONG, then node->ping_sent

         * is zero, so we can't reach this code at all and we don't risk

         * checking for a PONG delay if we didn't send the PING.

         *

         * We also consider every incoming data as proof of liveness, since

         * our cluster bus link is also used for data: under heavy data

         * load pong delays are possible. */

        mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :

                                                          data_delay;

 

        if (node_delay > server.cluster_node_timeout) {

            /* Timeout reached. Set the node as possibly failing if it is

             * not already in this state. */

            if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {

                serverLog(LL_DEBUG,"*** NODE %.40s possibly failing",

                    node->name);

                node->flags |= CLUSTER_NODE_PFAIL;

                update_state = 1;

            }

        }

    }

    dictReleaseIterator(di);

 

    /* If we are a slave node but the replication is still turned off,

     * enable it if we know the address of our master and it appears to

     * be up. */

    if (nodeIsSlave(myself) &&

        server.masterhost == NULL &&

        myself->slaveof &&

        nodeHasAddr(myself->slaveof))

    {

        replicationSetMaster(myself->slaveof->ip, myself->slaveof->port);

    }

 

    /* Abort a manual failover if the timeout is reached. */

    manualFailoverCheckTimeout();

 

    if (nodeIsSlave(myself)) {

        clusterHandleManualFailover();

        if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))

            clusterHandleSlaveFailover();

        /* If there are orphaned slaves, and we are a slave among the masters

         * with the max number of non-failing slaves, consider migrating to

         * the orphaned masters. Note that it does not make sense to try

         * a migration if there is no master with at least *two* working

         * slaves. */

        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&

           server.cluster_allow_replica_migration)

            clusterHandleSlaveMigration(max_slaves);

    }

 

    if (update_state || server.cluster->state == CLUSTER_FAIL)

        clusterUpdateState();

}
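
Since serverCron invokes this function roughly 10 times per second, the failure check above is re-evaluated continuously. The PFAIL decision itself boils down to one comparison; the sketch below restates it as a standalone helper for clarity (this helper does not exist in cluster.c):

/* Sketch only: the PFAIL condition distilled from the loop above.
 * mstime_t is Redis's millisecond timestamp type (a long long). */
static int looks_pfail(mstime_t now, mstime_t ping_sent,
                       mstime_t data_received, mstime_t node_timeout) {
    if (ping_sent == 0) return 0;   /* no ping in flight, nothing to judge */
    mstime_t ping_delay = now - ping_sent;
    mstime_t data_delay = now - data_received;      /* any traffic counts as liveness */
    mstime_t node_delay = (ping_delay < data_delay) ? ping_delay : data_delay;
    return node_delay > node_timeout;               /* flag PFAIL past cluster-node-timeout */
}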

10.2.8.  Query node&move

clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {

    clusterNode *n = NULL;

    robj *firstkey = NULL;

    int multiple_keys = 0;

    multiState *ms, _ms;

    multiCmd mc;

    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,

        existing_keys = 0;

 

    /* Allow any key to be set if a module disabled cluster redirections. */

    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)

        return myself;

 

    /* Set error code optimistically for the base case. */

    if (error_code) *error_code = CLUSTER_REDIR_NONE;

 

    /* Modules can turn off Redis Cluster redirection: this is useful

     * when writing a module that implements a completely different

     * distributed system. */

 

    /* We handle all the cases as if they were EXEC commands, so we have

     * a common code path for everything */

    if (cmd->proc == execCommand) {

        /* If CLIENT_MULTI flag is not set EXEC is just going to return an

         * error. */

        if (!(c->flags & CLIENT_MULTI)) return myself;

        ms = &c->mstate;

    } else {

        /* In order to have a single codepath create a fake Multi State

         * structure if the client is not in MULTI/EXEC state, this way

         * we have a single codepath below. */

        ms = &_ms;

        _ms.commands = &mc;

        _ms.count = 1;

        mc.argv = argv;

        mc.argc = argc;

        mc.cmd = cmd;

    }

 

    int is_pubsubshard = cmd->proc == ssubscribeCommand ||

            cmd->proc == sunsubscribeCommand ||

            cmd->proc == spublishCommand;

 

    /* Check that all the keys are in the same hash slot, and obtain this

     * slot and the node associated. */

    for (i = 0; i < ms->count; i++) {

        struct redisCommand *mcmd;

        robj **margv;

        int margc, numkeys, j;

        keyReference *keyindex;

 

        mcmd = ms->commands[i].cmd;

        margc = ms->commands[i].argc;

        margv = ms->commands[i].argv;

 

        getKeysResult result = GETKEYS_RESULT_INIT;

        numkeys = getKeysFromCommand(mcmd,margv,margc,&result);

        keyindex = result.keys;

 

        for (j = 0; j < numkeys; j++) {

            robj *thiskey = margv[keyindex[j].pos];

            int thisslot = keyHashSlot((char*)thiskey->ptr,

                                       sdslen(thiskey->ptr));

 

            if (firstkey == NULL) {

                /* This is the first key we see. Check what is the slot

                 * and node. */

                firstkey = thiskey;

                slot = thisslot;

                n = server.cluster->slots[slot];

 

                /* Error: If a slot is not served, we are in "cluster down"

                 * state. However the state is yet to be updated, so this was

                 * not trapped earlier in processCommand(). Report the same

                 * error to the client. */

                if (n == NULL) {

                    getKeysFreeResult(&result);

                    if (error_code)

                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;

                    return NULL;

                }

 

                /* If we are migrating or importing this slot, we need to check

                 * if we have all the keys in the request (the only way we

                 * can safely serve the request, otherwise we return a TRYAGAIN

                 * error). To do so we set the importing/migrating state and

                 * increment a counter for every missing key. */

                if (n == myself &&

                    server.cluster->migrating_slots_to[slot] != NULL)

                {

                    migrating_slot = 1;

                } else if (server.cluster->importing_slots_from[slot] != NULL) {

                    importing_slot = 1;

                }

            } else {

                /* If it is not the first key/channel, make sure it is exactly

                 * the same key/channel as the first we saw. */

                if (!equalStringObjects(firstkey,thiskey)) {

                    if (slot != thisslot) {

                        /* Error: multiple keys from different slots. */

                        getKeysFreeResult(&result);

                        if (error_code)

                            *error_code = CLUSTER_REDIR_CROSS_SLOT;

                        return NULL;

                    } else {

                        /* Flag this request as one with multiple different

                         * keys/channels. */

                        multiple_keys = 1;

                    }

                }

            }

 

            /* Migrating / Importing slot? Count keys we don't have.

             * If it is pubsubshard command, it isn't required to check

             * the channel being present or not in the node during the

             * slot migration, the channel will be served from the source

             * node until the migration completes with CLUSTER SETSLOT <slot>

             * NODE <node-id>. */

            int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;

            if ((migrating_slot || importing_slot) && !is_pubsubshard)

            {

                if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;

                else existing_keys++;

            }

        }

        getKeysFreeResult(&result);

    }

 

    /* No key at all in command? then we can serve the request

     * without redirections or errors in all the cases. */

    if (n == NULL) return myself;

 

    uint64_t cmd_flags = getCommandFlags(c);

    /* Cluster is globally down but we got keys? We only serve the request

     * if it is a read command and when allow_reads_when_down is enabled. */

    if (server.cluster->state != CLUSTER_OK) {

        if (is_pubsubshard) {

            if (!server.cluster_allow_pubsubshard_when_down) {

                if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;

                return NULL;

            }

        } else if (!server.cluster_allow_reads_when_down) {

            /* The cluster is configured to block commands when the

             * cluster is down. */

            if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;

            return NULL;

        } else if (cmd_flags & CMD_WRITE) {

            /* The cluster is configured to allow read only commands */

            if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;

            return NULL;

        } else {

            /* Fall through and allow the command to be executed:

             * this happens when server.cluster_allow_reads_when_down is

             * true and the command is not a write command */

        }

    }

 

    /* Return the hashslot by reference. */

    if (hashslot) *hashslot = slot;

 

    /* MIGRATE always works in the context of the local node if the slot

     * is open (migrating or importing state). We need to be able to freely

     * move keys among instances in this case. */

    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)

        return myself;

 

    /* If we don't have all the keys and we are migrating the slot, send

     * an ASK redirection or TRYAGAIN. */

    if (migrating_slot && missing_keys) {

        /* If we have keys but we don't have all keys, we return TRYAGAIN */

        if (existing_keys) {

            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;

            return NULL;

        } else {

            if (error_code) *error_code = CLUSTER_REDIR_ASK;

            return server.cluster->migrating_slots_to[slot];

        }

    }

 

    /* If we are receiving the slot, and the client correctly flagged the

     * request as "ASKING", we can serve the request. However if the request

     * involves multiple keys and we don't have them all, the only option is

     * to send a TRYAGAIN error. */

    if (importing_slot &&

        (c->flags & CLIENT_ASKING || cmd_flags & CMD_ASKING))

    {

        if (multiple_keys && missing_keys) {

            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;

            return NULL;

        } else {

            return myself;

        }

    }

 

    /* Handle the read-only client case reading from a slave: if this

     * node is a slave and the request is about a hash slot our master

     * is serving, we can reply without redirection. */

    int is_write_command = (cmd_flags & CMD_WRITE) ||

                           (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));

    if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&

        !is_write_command &&

        nodeIsSlave(myself) &&

        myself->slaveof == n)

    {

        return myself;

    }

 

    /* Base case: just return the right node. However if this node is not

     * myself, set error_code to MOVED since we need to issue a redirection. */

    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;

    return n;

}
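
getNodeByQuery() only classifies the request; processCommand() then turns a non-myself result, or a NULL result plus error_code, into the reply the client actually sees. A rough mapping, assuming it follows clusterRedirectClient() (reply texts are abbreviated where the exact message is not quoted):

/* Sketch only: how the CLUSTER_REDIR_* codes surface as client replies. */
static const char *redir_reply_hint(int error_code) {
    switch (error_code) {
    case CLUSTER_REDIR_MOVED:         return "-MOVED <slot> <ip>:<port>";  /* slot owned elsewhere, permanent redirect */
    case CLUSTER_REDIR_ASK:           return "-ASK <slot> <ip>:<port>";    /* one-shot redirect to the importing node */
    case CLUSTER_REDIR_CROSS_SLOT:    return "-CROSSSLOT ...";             /* keys hash to different slots */
    case CLUSTER_REDIR_UNSTABLE:      return "-TRYAGAIN ...";              /* multi-key request hit a half-migrated slot */
    case CLUSTER_REDIR_DOWN_UNBOUND:  return "-CLUSTERDOWN Hash slot not served";
    case CLUSTER_REDIR_DOWN_STATE:    return "-CLUSTERDOWN ...";           /* cluster is down */
    case CLUSTER_REDIR_DOWN_RO_STATE: return "-CLUSTERDOWN ...";           /* down; only read commands allowed */
    default:                          return "no redirection (CLUSTER_REDIR_NONE)";
    }
}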

11.       References

 

Redis documentation: https://redis.io/docs/