Redis 主从复制 - relication 源码分析


1 replicationUnsetMaster - 断开主从关系

/**
 * 断开主从关系
 */ 
void replicationUnsetMaster(void) {

     // 没有设置主节点的, 直接返回
    if (server.masterhost == NULL) 
        return;

    // 释放主节点的 host 配置
    sdsfree(server.masterhost);f
    server.masterhost = NULL;

    // 把当前的 replid 复制到 replid2
    // 重新生成一个新的 40 位随机 id 并赋值到 replid
    shiftReplicationId();

    // 释放代表主节点的 client 对象
    if (server.master) 
        freeClient(server.master);

    // 将 server.cached_server 设置为空, 同时设置释放对应的内存
    replicationDiscardCachedMaster();

    // 取消主从复制的握手
    cancelReplicationHandshake();

    // 关闭所有的从节点客户端
    disconnectSlaves();

    // 当前节点的主从复制状态设置为 REPL_STATE_NONE
    server.repl_state = REPL_STATE_NONE;

    // 从节点选中的数据库为 - 1
    server.slaveseldb = -1;
    server.repl_no_slaves_since = server.unixtime;      
}

1.1 shiftReplicationId - 切换第一组 replid 和 offset 到第二组

void shiftReplicationId(void) {
    // 将 replid 复制到 replid2
    memcpy(server.replid2, server.replid, sizeof(server.replid));

    // 将原本的复制积压缓冲区的偏移量 master_repl_offset + 1 放到第二组的  second_replid_offset 
    server.second_replid_offset = server.master_repl_offset+1;
    
    // 重新生成一个 replid, 并赋值到 server.replid
    changeReplicationId();
}

1.2 changeReplicationId - 重新生成一个 replid

void changeReplicationId(void) {
    // CONFIG_RUN_ID_SIZE = 40, 重新随机生成一个 40 位的 id 并赋值到 replid
    getRandomHexChars(server.replid, CONFIG_RUN_ID_SIZE);
    server.replid[CONFIG_RUN_ID_SIZE] = '\0';
}

1.3 replicationDiscardCachedMaster - 清空缓存的 cached_master

void replicationDiscardCachedMaster(void) {

    if (server.cached_master == NULL) 
        return;

    // CLIENT_MASTER = 1<<1 = 1
    // 先设置为 cashed_master 的 flags = flag & (~CLIENT_MASTER), 简单理解就是把主节点的标识去掉
    server.cached_master->flags &= ~CLIENT_MASTER;

    // 然后再释放 cached_master
    freeClient(server.cached_master);
    server.cached_master = NULL;
}

1.4 cancelReplicationHandshake - 取消主从复制的握手

int cancelReplicationHandshake(void) {

    if (server.repl_state == REPL_STATE_TRANSFER) {
        // 当前从节点的状态为 REPL_STATE_TRANSFER (正在接收从主节点发送过来的 RDB 文件)

        // 停止同步传输
        // 关闭从节点同步主节点的 socket 和临时文件描述符 repl_transfer_tmpfile
        replicationAbortSyncTransfer();

        // 更新状态为 REPL_STATE_CONNECT (未连接上主节点)
        server.repl_state = REPL_STATE_CONNECT;

    } else if (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) {
        // 当前的状态为 REPL_STATE_CONNECTING (正在建立连接) 或者处于握手阶段

        // 关闭从节点同步主节点信息的 Socket, 对应的 Socket 文件描述符为 repl_transfer_s
        undoConnectWithMaster();

        // 更新状态为 REPL_STATE_CONNECT (未连接上主节点)
        server.repl_state = REPL_STATE_CONNECT;

    } else {
        // 3. 其他的情况 

        // 其他状态返回 0
        return 0;
    }
    return 1;
}

1.5 replicationAbortSyncTransfer - 停止文件同步传输

void replicationAbortSyncTransfer(void) {

    serverAssert(server.repl_state == REPL_STATE_TRANSFER);

    // 关闭从节点同步主节点信息的 Socket repl_transfer_s 和对应的文件描述符 repl_transfer_tmpfile
    undoConnectWithMaster();
    close(server.repl_transfer_fd);
    unlink(server.repl_transfer_tmpfile);
    zfree(server.repl_transfer_tmpfile);
}

1.6 undoConnectWithMaster - 断开和主节点的连接

void undoConnectWithMaster(void) {

    int fd = server.repl_transfer_s;

    // 删除 repl_transfer_s 这个文件描述符的事件
    aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);

    // 关闭这个文件描述符
    close(fd);

    // 重置为 - 1 
    server.repl_transfer_s = -1;
}

1.7 slaveIsInHandshakeState - 判断当前从节点是否处于握手阶段

int slaveIsInHandshakeState(void) {
    // 3 <= server.repl_state <= 13 
    return server.repl_state >= REPL_STATE_RECEIVE_PONG && server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
}

1.8 disconnectSlaves - 关闭所有的从节点

void disconnectSlaves(void) {
    // 遍历自身所有的从节点
    while (listLength(server.slaves)) {
        listNode *ln = listFirst(server.slaves);
        freeClient((client*)ln->value);
    }
}

2 replicationSetMaster - 保存主节点信息并进入待连接状态

void replicationSetMaster(char *ip, int port) {

    / was_master = 当前从节点的原本的主节点为 null
    int was_master = server.masterhost == NULL;

    // 将入参的 IP 和 端口赋值给对应的字段
    sdsfree(server.masterhost);
    server.masterhost = sdsnew(ip);
    server.masterport = port;

    // 旧的主节点客户端存在, 进行释放
    if (server.master) {
        freeClient(server.master);
    }

    // 解除所有阻塞状态的客户端
    disconnectAllBlockedClients(); 

    // 释放所有的从节点信息 
    disconnectSlaves();

    // 取消主从复制的握手
    cancelReplicationHandshake();

    // was_master = true, 也就是当前的从节点是第一次被设置为从节点
    // 创建出一个 cache_server, 用于后续数据复制
    if (was_master) {

        // 将 server.cached_server 设置为空, 同时设置释放对应的内存
        replicationDiscardCachedMaster();

        // 根据当前参数合成出一个 client, 同时将其放到 cached_master
        replicationCacheMasterUsingMyself();
    }

    // 设置当前的主从复制状态为待连接上主节点 (REPL_STATE_CONNECT)
    server.repl_state = REPL_STATE_CONNECT;
}

2.1 disconnectAllBlockedClients - 释放所有阻塞状态的客户端

void disconnectAllBlockedClients(void) {

    listNode *ln;
    listIter li;

    // 将 server.clients 的信息转移到 li 上
    listRewind(server.clients,&li);

    // 遍历 li
    while((ln = listNext(&li))) {

        client *c = listNodeValue(ln);

        // 客户端为阻塞状态
        if (c->flags & CLIENT_BLOCKED) {
            
            addReplySds(c,sdsnew("-UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)\r\n"));

            // 解除所有阻塞状态的客户端
            unblockClient(c);
            c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        }
    }
}

2.2 replicationCacheMasterUsingMyself - 为 server.cached_master 赋值一个默认合成的客户端

void replicationCacheMasterUsingMyself(void) {

    // 设置当前的 master_initial_offset 等于 master_repl_offset
    server.master_initial_offset = server.master_repl_offset;

    // 创建一个新的主节点客户端, 并存放发到 server.master 中
    replicationCreateMasterClient(-1,-1);

    // 将 server.replid 拷贝到创建出来的节点的 replid
    memcpy(server.master->replid, server.replid, sizeof(server.replid));

    // 通过 replicationCreateMasterClient 创建出来的客户端, unlinkClient 里面的逻辑都不符合条件, 直接结束了
    // 可以看成是没有这个方法的逻辑
    unlinkClient(server.master);

    // 设置 cached_master = master, 同时置空 master 
    server.cached_master = server.master;
    server.master = NULL;
}

2.3 replicationCreateMasterClient - 创建主节点客户端

/**
 *  创建主节点客户端
 */
void replicationCreateMasterClient(int fd, int dbid) {

    // 创建新的客户端, 赋值到 master, createClient 创建出来的 flag 默认为 0 
    // 具体的创建逻辑可以查看 networking.c 中的 createClient 函数
    server.master = createClient(fd);

    // 此时 flag = CLIENT_MASTER = 1, 主节点
    server.master->flags |= CLIENT_MASTER;
    server.master->authenticated = 1;
    server.master->reploff = server.master_initial_offset;
    server.master->read_reploff = server.master->reploff;
    memcpy(server.master->replid, server.master_replid, sizeof(server.master_replid));

    // 如果一个主节点的偏移量为 -1, 那么这个主节点是旧的, 并且是无法进行 psync 的, 所以将其设置为 CLIENT_PRE_PSYNC (1 << 16, 65536)
    if (server.master->reploff == -1)
        // | CLIENT_PRE_PSYNC 后, flags 等于 65537
        server.master->flags |= CLIENT_PRE_PSYNC;
    // 指定当前的数据库 (0 - 16 个数据库)
    if (dbid != -1) 
        selectDb(server.master, dbid);
}

3 connectWithMaster - 尝试和主节点建立 TCP 连接

int connectWithMaster(void) {

    int fd;

    // 建立非阻塞的 Tcp 连接
    fd = anetTcpNonBlockBestEffortBindConnect(NULL, server.masterhost, server.masterport, NET_FIRST_BIND_ADDR);

    // 文件描述符为 -1, 建立连接失败
    if (fd == -1) {
        return C_ERR;
    }

    // 为 Tcp 连接的文件描述符注册一个的可读可写的事件, 处理的逻辑函数为 syncWithMaster
    // 注意这里注册的类型为 AE_READABLE|AE_WRITABLE, 在 epoll 中, 底层注册的是 EPOLLIN | EPOLLOUT
    // epoll 有个机制, 同时注册 EPOLLIN | EPOLLOUT 事件, 会立即触发一次 EPOLLOUT 事件, 也就是 AE_WRITABLE 事件
    // 也就是在下次事件轮询中会执行一次 syncWithMaster 函数
    if (aeCreateFileEvent(server.el, fd,AE_READABLE|AE_WRITABLE, syncWithMaster, NULL) == AE_ERR) {
        close(fd);
        return C_ERR;
    }

    // 更新从节点和主节点最近一次进行数据同步的时间, 默认为当前时间
    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    // 当前从节点的主从复制的状态为 REPL_STATE_CONNECTING (正在连接主节点)
    server.repl_state = REPL_STATE_CONNECTING;
    return C_OK;
}

4 sendSynchronousCommand - 发送命令到主节点

char *sendSynchronousCommand(int flags, int fd, ...) {

    // 发送写命令
    if (flags & SYNC_CMD_WRITE) {

        // 发送 ping 命令时, 入参为 (SYNC_CMD_WRITE,fd,"PING",NULL)
        // flags = SYNC_CMD_WRITE
        
        char *arg;

        // 下面的 va_ 开头的函数, 都是用来处理动态入参的
        va_list ap;
        sds cmd = sdsempty();
        sds cmdargs = sdsempty();
        size_t argslen = 0;
        va_start(ap,fd);

        while(1) {
            arg = va_arg(ap, char*);

            if (arg == NULL) 
                break;
            // 拼接 cmd 命令的参数
            cmdargs = sdscatprintf(cmdargs,"$%zu\r\n%s\r\n",strlen(arg),arg);
            argslen++;
        }

        va_end(ap);

        cmd = sdscatprintf(cmd,"*%zu\r\n",argslen);
        cmd = sdscatsds(cmd,cmdargs);
        sdsfree(cmdargs);

        // 发送给对应的文件描述符, 这里的 fd 是 Socket 通道
        if (syncWrite(fd, cmd, sdslen(cmd), server.repl_syncio_timeout*1000) == -1) {
            sdsfree(cmd);
            return sdscatprintf(sdsempty(),"-Writing to master: %s",strerror(errno));
        }

        sdsfree(cmd);
    }

    // 接受读操作
    if (flags & SYNC_CMD_READ) {

        char buf[256];
        // 读取数据到 buf 中
        if (syncReadLine(fd,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1) {
            return sdscatprintf(sdsempty(),"-Reading from master: %s", strerror(errno));
        }
        server.repl_transfer_lastio = server.unixtime;
        return sdsnew(buf);
    }
    return NULL;
}

5 slaveTryPartialResynchronization - 根据条件判断是发送部分同步复制还是全量同步复制, 同时支持读取主节点发送信息

int slaveTryPartialResynchronization(int fd, int read_reply) {
    // 入参的 read_reply 0: 表示向主节点写读取 1: 表示读取主节点的数据

    char *psync_replid;
    char psync_offset[32];
    sds reply;

    // 写操作
    if (!read_reply) {
        // 设置初始的偏移量为 -1
        // -1 表示示主节点的 replid 和全局复制偏移量是无效的
        server.master_initial_offset = -1;

        // 主节点的缓存不为空,可以尝试进行部分重同步
        if (server.cached_master) {

            // psync_replid 为缓存的主节点的 replid
            psync_replid = server.cached_master->replid;

            // 获取部分同步的开始位置 , 赋值给 psync_offset
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
        } else {
            // 没有主节点缓存, 进行全量同步
            psync_replid = "?";
            memcpy(psync_offset,"-1",3);
        }
        
        // 部分复制, 最终发送的命令 psync replid repl_offset
        // 全量复制, 最终发送的命令 psync ? -1
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL);
        
        // 写失败, 删除对应的文件描述符的读监听
        if (reply != NULL) {
            sdsfree(reply);
            aeDeleteFileEvent(server.el,fd,AE_READABLE);
            return PSYNC_WRITE_ERROR;
        }
        return PSYNC_WAIT_REPLY;
    }
    
    // 下面的读操作可以等待下面的 psync 命令应答在回来看, 现在下面不影响整个的流程

    // 读操作
    reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);

    if (sdslen(reply) == 0) {
        // 没有响应数据
        sdsfree(reply);
        return PSYNC_WAIT_REPLY;
    }
    
    // strncmp 比较字符串是否一样, 一样返回 0, 不一样, 返回值为非 0 
    // 比较 reply 前 11 个字符是否为 +FULLRESYNC
    if (!strncmp(reply,"+FULLRESYNC",11)) {
    
        // 响应的数据前面的内容为 +FULLRESYNC

        char *replid = NULL, *offset = NULL;

        // 这个是 psync 命令, 全量复制的响应 

        // 定位到第一个空格的位置
        replid = strchr(reply,' ');

        // 获取响应的 replid
        if (replid) {
            // 定位到 replid 的位置
            replid++;
            offset = strchr(replid,' ');
            if (offset) 
                // 定位到 offset 的位置
                offset++;
        }

        // replid 为  0 || offset 为 0 || offset - replid - 1 不等于 40 (replid 的长度为 40)
        if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {

            // 设置 replid 为 0
            memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);

        } else {
        
            // 将 replid 复制给 server.master_replid 
            memcpy(server.master_replid, replid, offset-replid-1);
            server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';

            // server.master_initial_offset = 获取到的 offset
            server.master_initial_offset = strtoll(offset,NULL,10);
        }
        
        // 全量复制了, 缓存的主节点没有意义, 清空了
        // 将 server.cached_server 设置为空, 同时设置释放对应的内存
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }
    
    // 响应的前面的数据为 +CONTINUE
    if (!strncmp(reply,"+CONTINUE",9)) {
    
        char *start = reply+10;
        char *end = reply+9;
        while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') 
            end++;
            
        // 结束的位置 - 开始的位置 = 40 
        if (end - start == CONFIG_RUN_ID_SIZE) {

            // 新的 replid 
            char new[CONFIG_RUN_ID_SIZE+1];
            memcpy(new,start, CONFIG_RUN_ID_SIZE);
            new[CONFIG_RUN_ID_SIZE] = '\0';

            // replid 和当前的缓存主节点 cached_master 的 replid 不一致
            // 只能进行全量复制
            if (strcmp(new, server.cached_master->replid)) {

                // 缓存的主节点的 replid 和参数的 replid 不一致

                // 将当前的 cached_master 的 replid 赋值给 replid2
                memcpy(server.replid2, server.cached_master->replid, sizeof(server.replid2));
                server.second_replid_offset = server.master_repl_offset+1;

                // 新的 replid 赋值到 server.replid
                memcpy(server.replid, new, sizeof(server.replid));
                // 设置缓存的客户端的 cached_master 的 replid = server.replid
                memcpy(server.cached_master->replid, new, sizeof(server.replid));

                // 释放从节点
                disconnectSlaves();
            }
        } 
        
        sdsfree(reply);

        // 重新将 cached_master 作为主节点使用
        replicationResurrectCachedMaster(fd);  

        if (server.repl_backlog == NULL) 
            // 没有复制积压缓冲区, 进行创建
            createReplicationBacklog();
        return PSYNC_CONTINUE; 
    }
    
    // 主节点无法响应 psync 命令
    // 主节点处于一个特殊的状态, 无法处理对应的请求
    if (!strncmp(reply,"-NOMASTERLINK",13) || !strncmp(reply,"-LOADING",8)){
        sdsfree(reply);
        return PSYNC_TRY_LATER;
    }

    // 响应了异常

    sdsfree(reply);
    // 将 server.cached_server 设置为空, 同时设置释放对应的内存
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED; 
}

5.1 replicationResurrectCachedMaster - 重新将 cached_master 作为主节点使用

void replicationResurrectCachedMaster(int newfd) {

    server.master = server.cached_master;
    server.cached_master = NULL;
    server.master->fd = newfd;
    server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
    server.master->authenticated = 1;
    server.master->lastinteraction = server.unixtime;
    // 修改状态为 REPL_STATE_CONNECTED (已连接上主节点)
    server.repl_state = REPL_STATE_CONNECTED;
    server.repl_down_since = 0;

    linkClient(server.master);

    // 添加事件
    // 为当前的主节点添加一个读事件, 触发逻辑为 readQueryFromClient
    if (aeCreateFileEvent(server.el, newfd, AE_READABLE, readQueryFromClient, server.master)) {
        freeClientAsync(server.master);
    }

    // 当前的主节点有数据需要发送 
    if (clientHasPendingReplies(server.master)) {
        // 添加一个写事件
        if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, sendReplyToClient, server.master)) {
            freeClientAsync(server.master);
        }
    }
}

5.2 clientHasPendingReplies - 判断当前的入参的客户端是否有数据需要发送

int clientHasPendingReplies(client *c) {
    // 固定缓存区有数据 或者 回复列表有数据
    return c->bufpos || listLength(c->reply);
}

5.3 createReplicationBacklog - 创建复制积压缓冲区

void createReplicationBacklog(void) {

    serverAssert(server.repl_backlog == NULL);

    server.repl_backlog = zmalloc(server.repl_backlog_size);
    server.repl_backlog_histlen = 0;
    server.repl_backlog_idx = 0;
    
    // 虽然没有任何数据, 但是第一个字节是数据写入的位置
    server.repl_backlog_off = server.master_repl_offset+1;
}

6 clearReplicationId2 - 清除 replid2 和 second_replid_offset

/**
 * 清除 replid2 和 second_replid_offset
 */
void clearReplicationId2(void) {
    memset(server.replid2,'0',sizeof(server.replid));
    server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
    server.second_replid_offset = -1;
}

7 masterTryPartialResynchronization - 尝试和从节点进行部分同步复制

int masterTryPartialResynchronization(client *c) {

    // client 进到这里面执行的命令为 psync ? -1 或 psync master_repl_id repl_offset

    long long psync_offset, psync_len;
    char *master_replid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    // 读取入参的第 3 个参数到 psync_offset
    // 读取失败, 直接全量同步
    if (getLongLongFromObjectOrReply(c,c->argv[2],&psync_offset,NULL) != C_OK) 
        goto need_full_resync;

    // 在入参的 repl_id 和当前主节点的 replid 不一样的条件下
    // 1. 入参的 repl_id 和当前主节点一样的 replid2 不一样
    // 2. 请求的偏移量 大于 second_replid_offset (也就是 replid2)
    // replid2 可以看出当前节点同步了旧主节点数据的位置, second_replid_offset 可以看出是从节点同步了旧节点数据的位置
    // 如果 second_replid_offset > replid2, 表示从节点同步的数据快于当前节点的, 从节点的数据比主节点多, 不合理, 直接全量同步
    // 满足其中一种情况, 直接进入全量赋值
    if (strcasecmp(master_replid, server.replid) && (strcasecmp(master_replid, server.replid2) || psync_offset > server.second_replid_offset)) {
        // 这 2 种请求都不能进行部分同步, 直接进入全量同步
        goto need_full_resync;
    }

    // psync_offset 可以看做是从复制积压缓冲区的哪个位置开始复制
    // 当前复制积压缓冲区没有数据
    // 请求的偏移量小于 repl_backlog_off (复制积压缓冲区 backlog 的第一个字节的逻辑位置是下次复制的开始位置), 说明 backlog 所备份的数据的已经太新了, 有一些数据被覆盖,则需要进行全量复制
    // 请求的偏移量大于 repl_backlog_off + repl_backlog_histlen (backlog 的实际数据大小) 表示当前 backlog 的数据不够全,则需要进行全量复制
    if (!server.repl_backlog || psync_offset < server.repl_backlog_off || psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) {

        goto need_full_resync;
    }

    // 部分复制

    // 设置为从节点标识 
    c->flags |= CLIENT_SLAVE;
    // 设置状态为 在线状态
    c->replstate = SLAVE_STATE_ONLINE;
    // 设置当前客户端, 也就是从节点的 应答时间为当前时间
    c->repl_ack_time = server.unixtime;
    // 设置当前客户端, 也就是从节点需要向主节点发送 ack 的标志 为 0, 没有 ack
    c->repl_put_online_on_ack = 0;
    // 当前节点的从节点列表添加当前的节点
    listAddNodeTail(server.slaves,c);

    // 从节点的复制能力支持 psync2
    if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
        // 发送内容 +CONTINUE 主节点当前的 repl_id \r\n
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
    } else {
        // 不支持
        // 发送内容 +CONTINUE\r\n
        buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
    }

    // 发送数据给从节点
    if (write(c->fd,buf,buflen) != buflen) {
        freeClientAsync(c);
        return C_OK;
    }

    psync_len = addReplyReplicationBacklog(c,psync_offset);

    // 计算延迟值小于 min-slaves-max-lag 的从节点的个数
    refreshGoodSlavesCount(); 
    return C_OK;              

need_full_resync:
    return C_ERR;
}

7.1 addReplyReplicationBacklog - 进行部分同步复制

long long addReplyReplicationBacklog(client *c, long long offset) {

    long long j, skip, len;

    // 复制积压缓冲区没有数据实际没有数据, 直接返回
    if (server.repl_backlog_histlen == 0) {
        return 0;
    }

    /* Compute the amount of bytes we need to discard. */

    // 跳过初始位置多少个字节
    skip = offset - server.repl_backlog_off;

    // 因为复用着这一个复制积压缓冲区 repl_backlog, 写满了, 新的数据又重开始位置进行写
    // repl_backlog 的容量 repl_backlog_size 和当前容量 repl_backlog_histlen 基本不会大于 repl_backlog 的容量大小
    // 但是几个代表位置的字段 repl_backlog_idx repl_backlog_off 会大于 repl_backlog_size
    // 所以下面位置的计算才那么复杂

    // 计算出我们的复制积压缓冲区中数据实际最旧的位置, 在复制积压缓冲区的位置
    // (下一个字节写入的位置 + 复制积压缓冲区的大小 - 复制积压缓冲区当前实际的容量)% 复制积压缓冲区的大小
    j = (server.repl_backlog_idx + (server.repl_backlog_size - server.repl_backlog_histlen)) % server.repl_backlog_size;

     // 计算当前复制的开始位置
    j = (j + skip) % server.repl_backlog_size;

    // 需要发送发送的数据长度
    len = server.repl_backlog_histlen - skip;

    // 不为空
    while(len) {
        
        // (复制积压缓冲区的容量 - 开始发送的位置) < 需要发送的数据的长度 的话, 表示当前可以发送的数据有部分在复制积压缓冲区的开头
        // 计算这次发送的数据长度
        long long thislen = ((server.repl_backlog_size - j) < len) ? (server.repl_backlog_size - j) : len;

        // 发送数据
        addReplySds(c, sdsnewlen(server.repl_backlog + j, thislen));
        // 重新计算下次需要发送的数据长度
        len -= thislen;
        // 开始的位置重新从 0 开始
        j = 0;
    }

    // 返回写入的长度
    return server.repl_backlog_histlen - skip;
}

8 replicationSetupSlaveForFullResync - 将最新的 repl_id 和 repl_offset 发送给从节点

int replicationSetupSlaveForFullResync(client *slave, long long offset) {

    char buf[128];
    int buflen;

    // 更新入参的客户端的 psync 偏移量为 offset 
    slave->psync_initial_offset = offset;

    char buf[128];
    int buflen;

    // 更新入参的客户端的 psync 偏移量为 offset 
    slave->psync_initial_offset = offset;

    // 更新状态为 SLAVE_STATE_WAIT_BGSAVE_END (等待 bgsave 结束)
    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;

    server.slaveseldb = -1; 

    // 如果从节点的状态是 CLIENT_PRE_PSYNC,则表示是 Redis 是 2.8 之前的版本,则不将这些信息发送给从节点
    // 在 2.8 之前只支持 SYNC 的全量复制同步,而在之后的版本提供了部分的重同步
    if (!(slave->flags & CLIENT_PRE_PSYNC)) {
        
        // 发送全量复制的消息给从节点
        // 发送 +FULLRESYNC replid offset\r\n 给从节点 
        buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", server.replid, offset);
        if (write(slave->fd,buf,buflen) != buflen) {
            freeClientAsync(slave);
            return C_ERR;
        }
    }
    return C_OK;
}

9 startBgsaveForReplication - 通过执行 Bgsave 进行全量同步

int startBgsaveForReplication(int mincapa) {

    int retval;

    // 是否支持 socket 同步, 支持无盘同步同时同步复制能力支持 EOF
    int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
    listIter li;
    listNode *ln;

    rdbSaveInfo rsi, *rsiptr;

    // 创建出一个初始的 rdbSaveInfo
    rsiptr = rdbPopulateSaveInfo(&rsi);

    if (rsiptr) {
        
        // 创建处理的 rdbSaveInfo 不为空
        
        // 支持 socket 
        if (socket_target)
            // 大体的流程和正常的 RDB 相关, 就不张开了, 太长了, 可以查看 rdb.c 这个文件进行了解
            // 1. 在节点客户端列表中找到所有状态为 SLAVE_STATE_WAIT_BGSAVE_START (等待 bgsave 开始, 也就是开始创建 RDB 文件) 并且支持 psync 命令的客户端, 向他们发送 +FULLRESYNC replid offset\r\n, 并修改状态为 SLAVE_STATE_WAIT_BGSAVE_END (等待 RDB 创建文件结束), 同时设置他们的 Socket 为阻塞模式 (可以理解为数据先堆积在 Socket 中, 后序一起发送)
            // 2. 创建出一个子进程, 将当前主节点的数据以 EOF 要求的格式写入到满足上一步的所有从节点的 Socket 中
            // EOF 格式
            // 开头 $EOF: + 长度 40 的 16 位字符串 + \r\n
            // 中间 RDB 内容
            // 结尾 一开始的 长度 40 的 16 位字符串 
            // 3. 全部写完后, 发送给对应的从节点
            retval = rdbSaveToSlavesSockets(rsiptr);
        else
            // 正常的 RDB 持久化, 将数据写入到一个文件中, rdb 类型为 RDB_CHILD_TYPE_DISK
            retval = rdbSaveBackground(server.rdb_filename,rsiptr);
    } else {
        // 为空, 直接返回失败
        retval = C_ERR;
    }

    if (retval == C_ERR) {

        // 执行同步失败
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {

            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {

                // 删除对应的从节点
                slave->replstate = REPL_STATE_NONE;
                slave->flags &= ~CLIENT_SLAVE;
                listDelNode(server.slaves,ln);

                // 响应一个失败的提示
                addReplyError(slave,"BGSAVE failed, replication can't continue");
                slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
            }
        }
        return retval;
    }

    // 磁盘同步类型处理
    if (!socket_target) {

        // 无盘同步, 已经在 rdbSaveToSlavesSockets 中已更新从节点的信息
        // 有盘同步, 在这一步处理

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {

            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {// 更新从节点客户端的偏移量, 状态和发送全量复制的消息给从节点
                // 当前的从节点的状态将会变为 SLAVE_STATE_WAIT_BGSAVE_END
                replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
            }
        }
    }

    // 同步成功
    if (retval == C_OK) 
        // 清除脚本缓存
        replicationScriptCacheFlush();
    return retval;
}

9.1 replicationScriptCacheFlush - 清除 Lua 脚本的缓存

void replicationScriptCacheFlush(void) {
    dictEmpty(server.repl_scriptcache_dict,NULL);
    listRelease(server.repl_scriptcache_fifo);
    server.repl_scriptcache_fifo = listCreate();
}

10 freeReplicationBacklog - 清空复制积压缓冲区

void freeReplicationBacklog(void) {
    serverAssert(listLength(server.slaves) == 0);
    zfree(server.repl_backlog);
    server.repl_backlog = NULL;
}

11 putSlaveOnline - 更新从节点的状态

void putSlaveOnline(client *slave) {

    // 更新从节点的状态为 SLAVE_STATE_ONLINE (在线状态)
    slave->replstate = SLAVE_STATE_ONLINE;
    // 未 ack 应答
    slave->repl_put_online_on_ack = 0;
    // 更新 ack 为未应答
    slave->repl_ack_time = server.unixtime; 
    // 新增写事件, 将当前客户端的输出缓冲区数据发生给从节点
    if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendReplyToClient, slave) == AE_ERR) {
        freeClient(slave);
        return;
    }

    // 更新当前从节点中延迟值小于配置的
    refreshGoodSlavesCount();
}

11.1 refreshGoodSlavesCount - 刷新在线从节点且超时时间小于配置的从节点数量

void refreshGoodSlavesCount(void) {

    listIter li;
    listNode *ln;
    int good = 0;

    // 没有配置这 2 个配置, 直接返回
    if (!server.repl_min_slaves_to_write || !server.repl_min_slaves_max_lag) 
        return;

    listRewind(server.slaves,&li);

    // 遍历当前所有的从节点, 找出状态为 SLAVE_STATE_ONLINE 和 超时时间小于配置的 repl_min_slaves_max_lag 的节点
    while((ln = listNext(&li))) {
        client *slave = ln->value;
        time_t lag = server.unixtime - slave->repl_ack_time;

        if (slave->replstate == SLAVE_STATE_ONLINE && lag <= server.repl_min_slaves_max_lag) 
            good++;
    }

    // 更新个数
    server.repl_good_slaves_count = good;
}

12 cancelReplicationHandshake - 从节点取消和主节点的握手

int cancelReplicationHandshake(void) {
    if (server.repl_state == REPL_STATE_TRANSFER) {
        replicationAbortSyncTransfer();
        server.repl_state = REPL_STATE_CONNECT;
    } else if (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) {
        undoConnectWithMaster();
        server.repl_state = REPL_STATE_CONNECT;
    } else {
        return 0;
    }
    return 1;
}

13 feedReplicationBacklogWithObject - 向复制积压缓冲区 backlog 写数据

void feedReplicationBacklogWithObject(robj *o) {

    char llstr[LONG_STR_SIZE];
    void *p;
    size_t len;

    // 写入的对象为 int 整数, 转为字符串处理
    if (o->encoding == OBJ_ENCODING_INT) {
        len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
        p = llstr;
    } else {
        len = sdslen(o->ptr);
        p = o->ptr;
    }

    feedReplicationBacklog(p,len);
}

13.1 feedReplicationBacklog - 更新复制积压缓冲区 backlog

void feedReplicationBacklog(void *ptr, size_t len) {

    unsigned char *p = ptr;

    // 更新全局复制的偏移量
    server.master_repl_offset += len;

    // 复制积压缓冲区, 随着是一个字符串数组, 但是可以看成是一个环形的, 写的尾部
    // 会重新回到头部继续写, 所以每次写入时, 都需要更新写入位置的索引 repl_backlog_idx

    // 写入的对象的长度不为 0, 就一直写 
    while(len) {

        // 计算环形缓冲区还有多少空间
        size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
        // 如果空间足够,设置 thislen 写的长度为 len
        if (thislen > len) 
            thislen = len;
        // 空间不足够或着刚刚好,那么只写入剩余的空间数,等待下次循环时写入

        // 将数据拷贝到复制积压缓冲区中    
        memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);

        // 更新下次写的下标
        server.repl_backlog_idx += thislen;

        // 如果 repl_backlog_idx 已经到达缓冲区的尾部,那么重置它
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;

        // 更新未写入的数据长度    
        len -= thislen;
        // 更新未写入数据的地址
        p += thislen;
        // 更新实际数据的长度
        server.repl_backlog_histlen += thislen;
    }

    // 实际数据的长度最大只能为复制缓冲区的大小,因为之后环形写入时会覆盖开头位置的数据
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;

    // 设置 backlog 所备份已复制的数据的偏移量,用于处理复制时的断线
    server.repl_backlog_off = server.master_repl_offset - server.repl_backlog_histlen + 1;
}

14 replicationSendAck - 从节点发送心跳 ack 给主节点

void replicationSendAck(void) {
    client *c = server.master;

    if (c != NULL) {
        c->flags |= CLIENT_MASTER_FORCE_REPLY;
        addReplyMultiBulkLen(c,3);

        // 内容: REPLCONF ACK repl_offset
        addReplyBulkCString(c,"REPLCONF");
        addReplyBulkCString(c,"ACK");
        addReplyBulkLongLong(c,c->reploff);
        c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
    }
}

  目录