1 replicationUnsetMaster - 断开主从关系
/* Turn this instance back into a master: forget the configured master,
 * rotate the replication ID and tear down every replication link.
 *
 * The current replication ID is moved into replid2 (valid up to
 * master_repl_offset+1) and a fresh primary ID is generated via
 * shiftReplicationId(). Does nothing if no master is configured. */
void replicationUnsetMaster(void) {
    if (server.masterhost == NULL) return;

    sdsfree(server.masterhost);
    server.masterhost = NULL;

    /* Keep the old replication ID as the secondary one, start a new one. */
    shiftReplicationId();

    if (server.master) freeClient(server.master);
    replicationDiscardCachedMaster();
    cancelReplicationHandshake();

    /* Drop all of our own replicas (NOTE(review): presumably so they
     * reconnect and observe the replication ID change — confirm). */
    disconnectSlaves();

    server.repl_state = REPL_STATE_NONE;
    /* NOTE(review): -1 presumably forces a SELECT before the next command
     * propagated to replicas — confirm. */
    server.slaveseldb = -1;
    server.repl_no_slaves_since = server.unixtime;
}
1.1 shiftReplicationId - 将当前 replid 和 offset 移到第二组（replid2 / second_replid_offset），并生成新的 replid
/* Demote the current replication ID to the secondary slot (replid2), valid
 * up to master_repl_offset+1, then generate a brand-new primary ID. */
void shiftReplicationId(void) {
    server.second_replid_offset = server.master_repl_offset + 1;
    memcpy(server.replid2, server.replid, sizeof(server.replid));
    changeReplicationId();
}
1.2 changeReplicationId - 重新生成一个 replid
/* Overwrite the primary replication ID with CONFIG_RUN_ID_SIZE random hex
 * characters and NUL-terminate it. */
void changeReplicationId(void) {
    char *id = server.replid;

    getRandomHexChars(id, CONFIG_RUN_ID_SIZE);
    id[CONFIG_RUN_ID_SIZE] = '\0';
}
1.3 replicationDiscardCachedMaster - 清空缓存的 cached_master
void replicationDiscardCachedMaster(void) {
if (server.cached_master == NULL)
return;
server.cached_master->flags &= ~CLIENT_MASTER;
freeClient(server.cached_master);
server.cached_master = NULL;
}
1.4 cancelReplicationHandshake - 取消主从复制的握手
/* Abort whatever link to the master is in progress (RDB transfer, TCP
 * connect, or handshake) and fall back to REPL_STATE_CONNECT.
 * Returns 1 if something was cancelled, 0 if there was nothing to cancel. */
int cancelReplicationHandshake(void) {
    if (server.repl_state == REPL_STATE_TRANSFER) {
        replicationAbortSyncTransfer();
    } else if (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) {
        undoConnectWithMaster();
    } else {
        return 0;
    }

    server.repl_state = REPL_STATE_CONNECT;
    return 1;
}
1.5 replicationAbortSyncTransfer - 停止文件同步传输
/* Abort an in-progress RDB transfer from the master (repl_state must be
 * REPL_STATE_TRANSFER).
 *
 * Closes the master socket, then closes and deletes the temporary RDB file.
 * The descriptor and file-name fields are reset afterwards (-1 / NULL), so
 * no later cleanup can close the same fd or free the same name twice. */
void replicationAbortSyncTransfer(void) {
    serverAssert(server.repl_state == REPL_STATE_TRANSFER);
    undoConnectWithMaster();

    if (server.repl_transfer_fd != -1) {
        close(server.repl_transfer_fd);
        server.repl_transfer_fd = -1;
    }
    if (server.repl_transfer_tmpfile) {
        unlink(server.repl_transfer_tmpfile);
        zfree(server.repl_transfer_tmpfile);
        /* Previously left dangling after zfree(). */
        server.repl_transfer_tmpfile = NULL;
    }
}
1.6 undoConnectWithMaster - 断开和主节点的连接
/* Tear down the socket used to reach the master (repl_transfer_s): first
 * unregister its readable/writable handlers, then close it and mark the
 * slot as unused (-1). */
void undoConnectWithMaster(void) {
    int sock = server.repl_transfer_s;

    aeDeleteFileEvent(server.el, sock, AE_READABLE | AE_WRITABLE);
    close(sock);
    server.repl_transfer_s = -1;
}
1.7 slaveIsInHandshakeState - 判断当前从节点是否处于握手阶段
/* Return 1 when repl_state lies in the inclusive range
 * [REPL_STATE_RECEIVE_PONG, REPL_STATE_RECEIVE_PSYNC], otherwise 0.
 * NOTE(review): relies on the handshake states being declared contiguously
 * — confirm against the state definitions. */
int slaveIsInHandshakeState(void) {
    if (server.repl_state < REPL_STATE_RECEIVE_PONG) return 0;
    return server.repl_state <= REPL_STATE_RECEIVE_PSYNC;
}
1.8 disconnectSlaves - 关闭所有的从节点
void disconnectSlaves(void) {
while (listLength(server.slaves)) {
listNode *ln = listFirst(server.slaves);
freeClient((client*)ln->value);
}
}
2 replicationSetMaster - 保存主节点信息并进入待连接状态
void replicationSetMaster(char *ip, int port) {
/ was_master = 当前从节点的原本的主节点为 null
int was_master = server.masterhost == NULL;
sdsfree(server.masterhost);
server.masterhost = sdsnew(ip);
server.masterport = port;
if (server.master) {
freeClient(server.master);
}
disconnectAllBlockedClients();
disconnectSlaves();
cancelReplicationHandshake();
if (was_master) {
replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself();
}
server.repl_state = REPL_STATE_CONNECT;
}
2.1 disconnectAllBlockedClients - 解除所有阻塞客户端的阻塞，回复 -UNBLOCKED 错误后关闭其连接
void disconnectAllBlockedClients(void) {
listNode *ln;
listIter li;
listRewind(server.clients,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED) {
addReplySds(c,sdsnew("-UNBLOCKED force unblock from blocking operation, instance state changed (master -> replica?)\r\n"));
unblockClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
}
2.2 replicationCacheMasterUsingMyself - 为 server.cached_master 赋值一个默认合成的客户端
void replicationCacheMasterUsingMyself(void) {
server.master_initial_offset = server.master_repl_offset;
replicationCreateMasterClient(-1,-1);
memcpy(server.master->replid, server.replid, sizeof(server.replid));
unlinkClient(server.master);
server.cached_master = server.master;
server.master = NULL;
}
2.3 replicationCreateMasterClient - 创建主节点客户端
void replicationCreateMasterClient(int fd, int dbid) {
server.master = createClient(fd);
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;
memcpy(server.master->replid, server.master_replid, sizeof(server.master_replid));
if (server.master->reploff == -1)
server.master->flags |= CLIENT_PRE_PSYNC;
if (dbid != -1)
selectDb(server.master, dbid);
}
3 connectWithMaster - 尝试和主节点建立 TCP 连接
/* Start a non-blocking TCP connect to masterhost:masterport and register
 * syncWithMaster for readable and writable events on the new socket.
 * On success, stores the socket in repl_transfer_s, refreshes
 * repl_transfer_lastio, sets repl_state to REPL_STATE_CONNECTING and returns
 * C_OK. Returns C_ERR if the connect or the handler registration fails. */
int connectWithMaster(void) {
    int fd = anetTcpNonBlockBestEffortBindConnect(NULL, server.masterhost, server.masterport, NET_FIRST_BIND_ADDR);
    if (fd == -1) return C_ERR;

    int registered = aeCreateFileEvent(server.el, fd, AE_READABLE | AE_WRITABLE, syncWithMaster, NULL) != AE_ERR;
    if (!registered) {
        close(fd);
        return C_ERR;
    }

    server.repl_transfer_lastio = server.unixtime;
    server.repl_transfer_s = fd;
    server.repl_state = REPL_STATE_CONNECTING;
    return C_OK;
}
4 sendSynchronousCommand - 发送命令到主节点
/* Synchronously send a command to the master and/or read one reply line.
 *
 * flags: SYNC_CMD_WRITE encodes the NULL-terminated list of C-string
 *        arguments following `fd` as a RESP multi-bulk command and writes
 *        it. SYNC_CMD_READ reads a single line. Both may be combined.
 * Returns a newly allocated sds the caller must free:
 *   - "-Writing to master: <strerror>" if the write fails;
 *   - with SYNC_CMD_READ, the line read, or
 *     "-Reading from master: <strerror>" if the read fails;
 *   - NULL when only SYNC_CMD_WRITE was requested and it succeeded. */
char *sendSynchronousCommand(int flags, int fd, ...) {
    if (flags & SYNC_CMD_WRITE) {
        char *arg;
        va_list ap;
        sds cmd = sdsempty();
        sds cmdargs = sdsempty();
        size_t argslen = 0;

        va_start(ap, fd);
        while ((arg = va_arg(ap, char *)) != NULL) {
            cmdargs = sdscatprintf(cmdargs, "$%zu\r\n%s\r\n", strlen(arg), arg);
            argslen++;
        }
        va_end(ap);

        cmd = sdscatprintf(cmd, "*%zu\r\n", argslen);
        cmd = sdscatsds(cmd, cmdargs);
        sdsfree(cmdargs);

        if (syncWrite(fd, cmd, sdslen(cmd), server.repl_syncio_timeout*1000) == -1) {
            /* Capture errno now: sdsfree() and sdsempty() below may
             * overwrite it, and argument evaluation order is unspecified. */
            int saved_errno = errno;
            sdsfree(cmd);
            return sdscatprintf(sdsempty(), "-Writing to master: %s", strerror(saved_errno));
        }
        sdsfree(cmd);
    }

    if (flags & SYNC_CMD_READ) {
        char buf[256];

        if (syncReadLine(fd, buf, sizeof(buf), server.repl_syncio_timeout*1000) == -1) {
            /* Same reason as above: sdsempty() may run before strerror(). */
            int saved_errno = errno;
            return sdscatprintf(sdsempty(), "-Reading from master: %s", strerror(saved_errno));
        }
        server.repl_transfer_lastio = server.unixtime;
        return sdsnew(buf);
    }
    return NULL;
}
5 slaveTryPartialResynchronization - 根据条件判断是发送部分同步复制还是全量同步复制, 同时支持读取主节点发送信息
/* Replica side of PSYNC, called in two phases on the master socket fd.
 *
 * read_reply == 0 (writing half): send "PSYNC <replid> <offset>".
 *   - With a cached master, use its replid and reploff+1.
 *   - Otherwise send "PSYNC ? -1".
 *   Returns PSYNC_WAIT_REPLY, or PSYNC_WRITE_ERROR if the write failed.
 *
 * read_reply != 0 (reading half): read one reply line and return
 *   PSYNC_WAIT_REPLY     empty line, keep waiting;
 *   PSYNC_FULLRESYNC     "+FULLRESYNC <replid> <offset>" (cached master dropped);
 *   PSYNC_CONTINUE       "+CONTINUE [<replid>]" (cached master resurrected);
 *   PSYNC_TRY_LATER      "-NOMASTERLINK" or "-LOADING";
 *   PSYNC_NOT_SUPPORTED  anything else, including a +CONTINUE with no
 *                        cached master to resume (cached master dropped). */
int slaveTryPartialResynchronization(int fd, int read_reply) {
    char *psync_replid;
    char psync_offset[32];
    sds reply;

    /* ---- Writing half ---- */
    if (!read_reply) {
        /* Only the +FULLRESYNC branch below sets this again. */
        server.master_initial_offset = -1;

        if (server.cached_master) {
            psync_replid = server.cached_master->replid;
            snprintf(psync_offset, sizeof(psync_offset), "%lld", server.cached_master->reploff+1);
        } else {
            psync_replid = "?";
            memcpy(psync_offset, "-1", 3);
        }

        /* With only SYNC_CMD_WRITE, a non-NULL result means the write failed. */
        reply = sendSynchronousCommand(SYNC_CMD_WRITE, fd, "PSYNC", psync_replid, psync_offset, NULL);
        if (reply != NULL) {
            sdsfree(reply);
            aeDeleteFileEvent(server.el, fd, AE_READABLE);
            return PSYNC_WRITE_ERROR;
        }
        return PSYNC_WAIT_REPLY;
    }

    /* ---- Reading half ---- */
    reply = sendSynchronousCommand(SYNC_CMD_READ, fd, NULL);
    if (sdslen(reply) == 0) {
        /* NOTE(review): presumably a keep-alive newline from the master. */
        sdsfree(reply);
        return PSYNC_WAIT_REPLY;
    }

    if (!strncmp(reply, "+FULLRESYNC", 11)) {
        char *replid = NULL, *offset = NULL;

        /* Expected form: "+FULLRESYNC <replid> <offset>". */
        replid = strchr(reply, ' ');
        if (replid) {
            replid++;
            offset = strchr(replid, ' ');
            if (offset) offset++;
        }
        if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
            /* Malformed reply: clear master_replid; master_initial_offset
             * is not updated. */
            memset(server.master_replid, 0, CONFIG_RUN_ID_SIZE+1);
        } else {
            memcpy(server.master_replid, replid, offset-replid-1);
            server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
            server.master_initial_offset = strtoll(offset, NULL, 10);
        }
        /* No partial resync will happen: the cached master is not needed. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    }

    if (!strncmp(reply, "+CONTINUE", 9)) {
        if (server.cached_master == NULL) {
            /* Nothing to resume from. Previously this dereferenced NULL both
             * below and in replicationResurrectCachedMaster(); treat the
             * unexpected reply like any other unsupported answer instead. */
            sdsfree(reply);
            return PSYNC_NOT_SUPPORTED;
        }

        /* Optional new replid after "+CONTINUE ", ended by CR, LF or NUL. */
        char *start = reply+10;
        char *end = reply+9;
        while (end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
        if (end - start == CONFIG_RUN_ID_SIZE) {
            char new_replid[CONFIG_RUN_ID_SIZE+1];

            memcpy(new_replid, start, CONFIG_RUN_ID_SIZE);
            new_replid[CONFIG_RUN_ID_SIZE] = '\0';
            if (strcmp(new_replid, server.cached_master->replid)) {
                /* Master's replid changed: keep the old one as replid2
                 * (valid up to master_repl_offset+1) and adopt the new one. */
                memcpy(server.replid2, server.cached_master->replid, sizeof(server.replid2));
                server.second_replid_offset = server.master_repl_offset+1;
                memcpy(server.replid, new_replid, sizeof(server.replid));
                memcpy(server.cached_master->replid, new_replid, sizeof(server.replid));
                /* NOTE(review): presumably so our replicas see the new ID. */
                disconnectSlaves();
            }
        }

        sdsfree(reply);
        replicationResurrectCachedMaster(fd);
        if (server.repl_backlog == NULL) createReplicationBacklog();
        return PSYNC_CONTINUE;
    }

    if (!strncmp(reply, "-NOMASTERLINK", 13) || !strncmp(reply, "-LOADING", 8)) {
        sdsfree(reply);
        return PSYNC_TRY_LATER;
    }

    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;
}
5.1 replicationResurrectCachedMaster - 重新将 cached_master 作为主节点使用
void replicationResurrectCachedMaster(int newfd) {
server.master = server.cached_master;
server.cached_master = NULL;
server.master->fd = newfd;
server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
server.master->authenticated = 1;
server.master->lastinteraction = server.unixtime;
server.repl_state = REPL_STATE_CONNECTED;
server.repl_down_since = 0;
linkClient(server.master);
if (aeCreateFileEvent(server.el, newfd, AE_READABLE, readQueryFromClient, server.master)) {
freeClientAsync(server.master);
}
if (clientHasPendingReplies(server.master)) {
if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, sendReplyToClient, server.master)) {
freeClientAsync(server.master);
}
}
}
5.2 clientHasPendingReplies - 判断当前的入参的客户端是否有数据需要发送
/* Return 1 if c still has output queued, either bytes in its fixed buffer
 * (bufpos != 0) or nodes in its reply list; otherwise 0. */
int clientHasPendingReplies(client *c) {
    if (c->bufpos) return 1;
    return listLength(c->reply) != 0;
}
5.3 createReplicationBacklog - 创建复制积压缓冲区
/* Allocate an empty replication backlog of repl_backlog_size bytes whose
 * first stored byte will correspond to offset master_repl_offset+1.
 * Asserts that no backlog exists yet. */
void createReplicationBacklog(void) {
    serverAssert(server.repl_backlog == NULL);

    server.repl_backlog_off = server.master_repl_offset + 1;
    server.repl_backlog_idx = 0;
    server.repl_backlog_histlen = 0;
    server.repl_backlog = zmalloc(server.repl_backlog_size);
}
6 clearReplicationId2 - 清除 replid2 和 second_replid_offset
/* Invalidate the secondary replication ID: fill replid2 with '0'
 * characters, NUL-terminate it, and set second_replid_offset to -1. */
void clearReplicationId2(void) {
    /* Size by the array actually written, not by server.replid. */
    memset(server.replid2, '0', sizeof(server.replid2));
    server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
    server.second_replid_offset = -1;
}
7 masterTryPartialResynchronization - 尝试和从节点进行部分同步复制
/* Master side of PSYNC: decide whether replica c, which sent
 * "PSYNC <replid> <offset>" (argv[1], argv[2]), can be served a partial resync.
 *
 * A partial resync requires two conditions:
 *   - the replid matches our replid, or it matches replid2 with
 *     offset <= second_replid_offset; and
 *   - the offset lies in [repl_backlog_off, repl_backlog_off + repl_backlog_histlen].
 *
 * On acceptance, c is added to server.slaves as SLAVE_STATE_ONLINE and
 * "+CONTINUE" is written (with our replid if c has SLAVE_CAPA_PSYNC2). The
 * backlog from the offset onward is then queued. Returns C_OK in that case,
 * including when the +CONTINUE write fails (c is then freed asynchronously).
 * Returns C_ERR when a full resync is needed instead. */
int masterTryPartialResynchronization(client *c) {
    long long psync_offset;
    char *master_replid = c->argv[1]->ptr;
    char buf[128];
    int buflen;

    if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK)
        return C_ERR;

    /* Unknown history, or history shared only up to second_replid_offset. */
    if (strcasecmp(master_replid, server.replid) &&
        (strcasecmp(master_replid, server.replid2) || psync_offset > server.second_replid_offset))
        return C_ERR;

    /* Requested data no longer (or not yet) in the backlog. */
    if (!server.repl_backlog ||
        psync_offset < server.repl_backlog_off ||
        psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
        return C_ERR;

    c->flags |= CLIENT_SLAVE;
    c->replstate = SLAVE_STATE_ONLINE;
    c->repl_ack_time = server.unixtime;
    c->repl_put_online_on_ack = 0;
    listAddNodeTail(server.slaves, c);

    if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
        buflen = snprintf(buf, sizeof(buf), "+CONTINUE %s\r\n", server.replid);
    } else {
        buflen = snprintf(buf, sizeof(buf), "+CONTINUE\r\n");
    }
    if (write(c->fd, buf, buflen) != buflen) {
        freeClientAsync(c);
        return C_OK;
    }

    /* The byte count returned was previously stored in an unused local. */
    addReplyReplicationBacklog(c, psync_offset);
    refreshGoodSlavesCount();
    return C_OK;
}
7.1 addReplyReplicationBacklog - 进行部分同步复制
/* Queue to c every backlog byte from replication offset `offset` through the
 * newest stored byte, wrapping around the circular buffer as needed, and
 * return the number of bytes queued.
 * NOTE(review): assumes repl_backlog_off <= offset <=
 * repl_backlog_off + repl_backlog_histlen (masterTryPartialResynchronization
 * checks this) — confirm for any other caller. */
long long addReplyReplicationBacklog(client *c, long long offset) {
    long long skip, pos, remaining;

    if (server.repl_backlog_histlen == 0) return 0;

    skip = offset - server.repl_backlog_off;

    /* Index of the oldest stored byte, then advanced by `skip`. */
    pos = (server.repl_backlog_idx + (server.repl_backlog_size - server.repl_backlog_histlen)) % server.repl_backlog_size;
    pos = (pos + skip) % server.repl_backlog_size;

    remaining = server.repl_backlog_histlen - skip;
    while (remaining != 0) {
        long long room = server.repl_backlog_size - pos;
        long long chunk = room < remaining ? room : remaining;

        addReplySds(c, sdsnewlen(server.repl_backlog + pos, chunk));
        remaining -= chunk;
        pos = 0; /* any further data starts at the buffer's beginning */
    }
    return server.repl_backlog_histlen - skip;
}
8 replicationSetupSlaveForFullResync - 将最新的 repl_id 和 repl_offset 发送给从节点
/* Prepare replica `slave` for a full resync starting at replication offset
 * `offset`.
 *
 * Records the offset, moves the replica to SLAVE_STATE_WAIT_BGSAVE_END and
 * resets server.slaveseldb to -1. Unless the replica is flagged
 * CLIENT_PRE_PSYNC, it also writes "+FULLRESYNC <replid> <offset>" directly.
 * Returns C_ERR (and schedules the client for async free) if that write is
 * short or fails; otherwise returns C_OK. */
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
    /* Declared once: the previous version redeclared buf/buflen in the same
     * scope, which does not compile. */
    char buf[128];
    int buflen;

    slave->psync_initial_offset = offset;
    slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
    /* NOTE(review): presumably forces a SELECT into the stream sent to
     * replicas — confirm. */
    server.slaveseldb = -1;

    if (!(slave->flags & CLIENT_PRE_PSYNC)) {
        buflen = snprintf(buf, sizeof(buf), "+FULLRESYNC %s %lld\r\n", server.replid, offset);
        if (write(slave->fd, buf, buflen) != buflen) {
            freeClientAsync(slave);
            return C_ERR;
        }
    }
    return C_OK;
}
9 startBgsaveForReplication - 通过执行 Bgsave 进行全量同步
/* Start the BGSAVE that feeds a full resync to replicas waiting in
 * SLAVE_STATE_WAIT_BGSAVE_START.
 *
 * mincapa: capabilities common to the waiting replicas. The RDB goes straight
 * to their sockets only when repl_diskless_sync is set and mincapa includes
 * SLAVE_CAPA_EOF; otherwise it is written to rdb_filename.
 *
 * On failure, including when rdbPopulateSaveInfo() returns NULL, each
 * waiting replica is removed from server.slaves, sent an error, and marked
 * to close after the reply; C_ERR is returned. On success with a disk
 * target, the waiting replicas are set up for a full resync here
 * (NOTE(review): for the socket target this is presumably done inside
 * rdbSaveToSlavesSockets() — confirm). The replication script cache is then
 * flushed. */
int startBgsaveForReplication(int mincapa) {
    int retval;
    int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
    listIter li;
    listNode *ln;
    rdbSaveInfo rsi, *rsiptr;

    rsiptr = rdbPopulateSaveInfo(&rsi);
    if (rsiptr) {
        if (socket_target)
            retval = rdbSaveToSlavesSockets(rsiptr);
        else
            retval = rdbSaveBackground(server.rdb_filename, rsiptr);
    } else {
        retval = C_ERR;
    }

    if (retval == C_ERR) {
        listRewind(server.slaves, &li);
        while ((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                /* NOTE(review): REPL_STATE_NONE is a replica-side state
                 * constant; kept as-is, confirm this is intended. */
                slave->replstate = REPL_STATE_NONE;
                slave->flags &= ~CLIENT_SLAVE;
                /* NOTE(review): deleting the current node presumably is safe
                 * because listNext() already advanced the iterator. */
                listDelNode(server.slaves, ln);
                addReplyError(slave, "BGSAVE failed, replication can't continue");
                slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
            }
        }
        return retval;
    }

    if (!socket_target) {
        listRewind(server.slaves, &li);
        while ((ln = listNext(&li))) {
            client *slave = ln->value;

            /* (A stray full-width '、' after this brace broke compilation.) */
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
            }
        }
    }

    if (retval == C_OK) replicationScriptCacheFlush();
    return retval;
}
9.1 replicationScriptCacheFlush - 清除 Lua 脚本的缓存
void replicationScriptCacheFlush(void) {
dictEmpty(server.repl_scriptcache_dict,NULL);
listRelease(server.repl_scriptcache_fifo);
server.repl_scriptcache_fifo = listCreate();
}
10 freeReplicationBacklog - 清空复制积压缓冲区
/* Release the replication backlog buffer and clear the pointer.
 * Asserts that no replicas are attached. */
void freeReplicationBacklog(void)
{
    serverAssert(listLength(server.slaves) == 0);

    zfree(server.repl_backlog);
    server.repl_backlog = NULL;
}
11 putSlaveOnline - 将从节点置为在线状态（SLAVE_STATE_ONLINE）并注册写事件
/* Mark `slave` as SLAVE_STATE_ONLINE, reset its ACK bookkeeping, and register
 * sendReplyToClient for writable events on its socket. If that registration
 * fails, the client is freed; otherwise the good-replica count is refreshed. */
void putSlaveOnline(client *slave) {
    slave->replstate = SLAVE_STATE_ONLINE;
    slave->repl_put_online_on_ack = 0;
    slave->repl_ack_time = server.unixtime;

    int rc = aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendReplyToClient, slave);
    if (rc != AE_ERR) {
        refreshGoodSlavesCount();
    } else {
        freeClient(slave);
    }
}
11.1 refreshGoodSlavesCount - 刷新在线且 ACK 延迟不超过 repl_min_slaves_max_lag 的从节点数量
/* Recount replicas that are SLAVE_STATE_ONLINE and whose last ACK lag
 * (unixtime - repl_ack_time) is <= repl_min_slaves_max_lag, and store the
 * result in repl_good_slaves_count. Does nothing unless both
 * repl_min_slaves_to_write and repl_min_slaves_max_lag are non-zero. */
void refreshGoodSlavesCount(void) {
    if (server.repl_min_slaves_to_write == 0 || server.repl_min_slaves_max_lag == 0)
        return;

    int good = 0;
    listIter it;
    listNode *node;

    listRewind(server.slaves, &it);
    while ((node = listNext(&it)) != NULL) {
        client *replica = listNodeValue(node);
        if (replica->replstate != SLAVE_STATE_ONLINE) continue;

        time_t lag = server.unixtime - replica->repl_ack_time;
        if (lag <= server.repl_min_slaves_max_lag) good++;
    }
    server.repl_good_slaves_count = good;
}
12 cancelReplicationHandshake - 从节点取消和主节点的握手（代码与 1.4 节相同）
/* Abort whatever link to the master is in progress and return to
 * REPL_STATE_CONNECT. Returns 1 if a transfer, connection or handshake was
 * cancelled, 0 if repl_state was in none of those phases.
 * NOTE(review): this repeats the definition shown in section 1.4; only one
 * definition can exist in a real translation unit. */
int cancelReplicationHandshake(void) {
/* An RDB transfer is running: abort it (socket and temp file). */
if (server.repl_state == REPL_STATE_TRANSFER) {
replicationAbortSyncTransfer();
server.repl_state = REPL_STATE_CONNECT;
/* Still connecting or mid-handshake: just drop the master socket. */
} else if (server.repl_state == REPL_STATE_CONNECTING || slaveIsInHandshakeState()) {
undoConnectWithMaster();
server.repl_state = REPL_STATE_CONNECT;
} else {
/* Nothing in progress. */
return 0;
}
return 1;
}
13 feedReplicationBacklogWithObject - 向复制积压缓冲区 backlog 写数据
/* Append the string form of `o` to the replication backlog. An
 * OBJ_ENCODING_INT object is rendered as decimal text with ll2string();
 * any other object's ptr is appended as an sds of sdslen() bytes. */
void feedReplicationBacklogWithObject(robj *o) {
    char llstr[LONG_STR_SIZE];

    if (o->encoding != OBJ_ENCODING_INT) {
        feedReplicationBacklog(o->ptr, sdslen(o->ptr));
        return;
    }

    size_t n = ll2string(llstr, sizeof(llstr), (long)o->ptr);
    feedReplicationBacklog(llstr, n);
}
13.1 feedReplicationBacklog - 更新复制积压缓冲区 backlog
/* Append `len` bytes from `ptr` to the circular replication backlog and
 * advance master_repl_offset by `len`.
 *
 * Writes wrap to index 0 at repl_backlog_size. histlen is capped at the
 * buffer size, and repl_backlog_off is recomputed as the offset of the
 * oldest byte still stored. */
void feedReplicationBacklog(void *ptr, size_t len) {
    const unsigned char *src = ptr;
    size_t left = len;

    server.master_repl_offset += len;

    while (left > 0) {
        size_t room = server.repl_backlog_size - server.repl_backlog_idx;
        size_t chunk = room < left ? room : left;

        memcpy(server.repl_backlog + server.repl_backlog_idx, src, chunk);
        server.repl_backlog_idx += chunk;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        server.repl_backlog_histlen += chunk;
        src += chunk;
        left -= chunk;
    }

    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;
    server.repl_backlog_off = server.master_repl_offset - server.repl_backlog_histlen + 1;
}
14 replicationSendAck - 从节点发送心跳 ack 给主节点
/* Replica side: if connected to a master, queue "REPLCONF ACK <reploff>" to
 * it. CLIENT_MASTER_FORCE_REPLY is set only around the reply calls
 * (NOTE(review): presumably because replies to the master are otherwise
 * suppressed — confirm). */
void replicationSendAck(void) {
    client *m = server.master;
    if (m == NULL) return;

    m->flags |= CLIENT_MASTER_FORCE_REPLY;
    addReplyMultiBulkLen(m, 3);
    addReplyBulkCString(m, "REPLCONF");
    addReplyBulkCString(m, "ACK");
    addReplyBulkLongLong(m, m->reploff);
    m->flags &= ~CLIENT_MASTER_FORCE_REPLY;
}