本文将介绍 redis-cli 的集群删除节点命令(del-node)是如何实现的。
语法
redis-cli --cluster del-node <host:port> <node_id>
示例
redis-cli --cluster del-node 127.0.0.1:7000 ee8ce7d1ee50d84eba72482087350bfa065d1670
/* Handler for `redis-cli --cluster del-node`: removes the node whose ID is
 * given in argv[1] from the cluster reached through the reference node whose
 * address getClusterHostFromCmdArgs() extracts from the arguments.
 *
 * Steps, in order:
 *   1. Load the cluster view from the reference node.
 *   2. Refuse if the node ID is unknown or the node still has slots
 *      (slots_count != 0).
 *   3. For every other known node: if it replicates the node being removed,
 *      first re-point it with CLUSTER REPLICATE to the node returned by
 *      clusterManagerNodeWithLeastReplicas(); then send it CLUSTER FORGET.
 *   4. Send SHUTDOWN to the removed node.
 *
 * Returns 0 on invalid arguments or as soon as any step fails; otherwise
 * returns the result of checking the SHUTDOWN reply. argc is not inspected
 * here. */
static int clusterManagerCommandDeleteNode(int argc, char **argv) {
UNUSED(argc);
int success = 1;
int port = 0;
char *ip = NULL;
if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) goto invalid_args;
// argv[1] is used as the ID of the node to remove.
char *node_id = argv[1];
clusterManagerLogInfo(">>> Removing node %s from cluster %s:%d\n",
node_id, ip, port);
clusterManagerNode *ref_node = clusterManagerNewNode(ip, port);
clusterManagerNode *node = NULL;
// Load cluster information
if (!clusterManagerLoadInfoFromNode(ref_node, 0)) return 0;
// Check if the node exists and is not empty
node = clusterManagerNodeByName(node_id);
if (node == NULL) {
clusterManagerLogErr("[ERR] No such node ID %s\n", node_id);
return 0;
}
// A node that still has slots must be resharded away before removal.
if (node->slots_count != 0) {
clusterManagerLogErr("[ERR] Node %s:%d is not empty! Reshard data "
"away and try again.\n", node->ip, node->port);
return 0;
}
// Send CLUSTER FORGET to all the nodes but the node to remove
clusterManagerLogInfo(">>> Sending CLUSTER FORGET messages to the "
"cluster...\n");
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
// The node being removed receives neither REPLICATE nor FORGET.
if (n == node) continue;
// n->replicate matches node_id (case-insensitive): n is a replica of the
// node being removed and must be given a new master before the FORGET.
if (n->replicate && !strcasecmp(n->replicate, node_id)) {
// Reconfigure the slave to replicate with some other node
clusterManagerNode *master = clusterManagerNodeWithLeastReplicas();
// Aborts the program if no candidate master was returned.
assert(master != NULL);
clusterManagerLogInfo(">>> %s:%d as replica of %s:%d\n",
n->ip, n->port, master->ip, master->port);
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER REPLICATE %s",
master->name);
success = clusterManagerCheckRedisReply(n, r, NULL);
// r may be NULL, hence the guard before freeing it.
if (r) freeReplyObject(r);
// Stop at the first failure; the remaining nodes are not contacted.
if (!success) return 0;
}
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER FORGET %s",
node_id);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) freeReplyObject(r);
// Stop at the first failure; the remaining nodes are not contacted.
if (!success) return 0;
}
// Finally shutdown the node
clusterManagerLogInfo(">>> SHUTDOWN the node.\n");
// Sent with redisCommand() on the removed node's own context rather than
// through the CLUSTER_MANAGER_COMMAND macro used in the loop above.
redisReply *r = redisCommand(node->context, "SHUTDOWN");
success = clusterManagerCheckRedisReply(node, r, NULL);
if (r) freeReplyObject(r);
return success;
// Reached only when getClusterHostFromCmdArgs() fails.
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
return 0;
}
删除节点从 clusterManagerCommandDeleteNode() 函数开始执行。
下面,我们具体分析删除节点都有哪些操作。
// Check if the node exists and is not empty
// (excerpt: only the lookup by node ID is shown; a NULL result means the
// ID is unknown)
node = clusterManagerNodeByName(node_id);
首先,根据 node_id 查找要删除的节点是否存在;在完整的函数中,随后还会检查该节点是否已不再持有任何槽(slots_count 为 0),否则报错并提示先把数据迁移走。
// Walk every node known to the cluster manager.
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
// The node being removed receives neither REPLICATE nor FORGET.
if (n == node) continue;
// n->replicate matches node_id (case-insensitive): n is a replica of the
// node being removed and must be given a new master before the FORGET.
if (n->replicate && !strcasecmp(n->replicate, node_id)) {
// Reconfigure the slave to replicate with some other node
clusterManagerNode *master = clusterManagerNodeWithLeastReplicas();
// Aborts the program if no candidate master was returned.
assert(master != NULL);
clusterManagerLogInfo(">>> %s:%d as replica of %s:%d\n",
n->ip, n->port, master->ip, master->port);
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER REPLICATE %s",
master->name);
success = clusterManagerCheckRedisReply(n, r, NULL);
// r may be NULL, hence the guard before freeing it.
if (r) freeReplyObject(r);
// Stop at the first failure; the remaining nodes are not contacted.
if (!success) return 0;
}
// Sent to every node other than the one being removed, replica or not.
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER FORGET %s",
node_id);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) freeReplyObject(r);
// Stop at the first failure; the remaining nodes are not contacted.
if (!success) return 0;
}
遍历 cluster_manager.nodes 列表(跳过待删除的节点本身):如果某个节点的主节点是 node_id,则先向它发送 CLUSTER REPLICATE 命令,使其改为复制拥有最少从节点的主节点;随后,无论该节点是否为从节点,都向它发送 CLUSTER FORGET 命令,让它忘记待删除的节点。
// Finally shutdown the node
clusterManagerLogInfo(">>> SHUTDOWN the node.\n");
// Sent with redisCommand() on the removed node's own context; the reply is
// then checked like the replies of the earlier commands.
redisReply *r = redisCommand(node->context, "SHUTDOWN");
success = clusterManagerCheckRedisReply(node, r, NULL);
最后,向被删除的节点发送 SHUTDOWN 命令,将其关闭。
由上面的内容可知:删除节点时,客户端向服务端发送了 CLUSTER REPLICATE 命令和 CLUSTER FORGET 命令。
下面,我们介绍这两个命令的具体实现:
REPLICATE 命令实现
else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
/* CLUSTER REPLICATE <NODE ID>
 * Make this node (myself) a replica of the node named by argv[2].
 * Each failed precondition below replies with an error and returns
 * without changing anything. */
/* Lookup the specified node in our table. */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
}
/* I can't replicate myself. */
if (n == myself) {
addReplyError(c,"Can't replicate myself");
return;
}
/* Can't replicate a slave. */
if (nodeIsSlave(n)) {
addReplyError(c,"I can only replicate a master, not a replica.");
return;
}
/* If the instance is currently a master, it should have no assigned
 * slots nor keys to accept to replicate some other node.
 * Slaves can switch to another master without issues.
 * Only the dict of db[0] is inspected for keys here. */
if (nodeIsMaster(myself) &&
(myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
addReplyError(c,
"To set a master the node must be empty and "
"without assigned slots.");
return;
}
/* Set the master. */
clusterSetMaster(n);
/* NOTE(review): clusterDoBeforeSleep() is not shown here; judging by the
 * flag names it schedules a cluster state update and a config save —
 * confirm. */
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
}
先校验目标节点 n:它必须存在、不能是当前节点自身、也不能是从节点;如果当前节点是主节点,还要求它没有分配任何槽且 0 号数据库为空。校验通过后,将当前节点的主节点设置为节点 n,然后保存集群配置。
/* Make node 'n' the master of this node (myself).
 *
 * A node that is currently a master is demoted to a slave first; a node
 * that is already a slave is detached from its previous master, if any.
 * The caller must guarantee that 'n' is not myself and that myself owns
 * no slots: both conditions are asserted. */
void clusterSetMaster(clusterNode *n) {
    serverAssert(n != myself);
    serverAssert(myself->numslots == 0);

    if (!nodeIsMaster(myself)) {
        /* Already a slave: unregister from the old master, when set. */
        if (myself->slaveof != NULL) {
            clusterNodeRemoveSlave(myself->slaveof,myself);
        }
    } else {
        /* Master -> slave: drop the MASTER and MIGRATE_TO flags, raise
         * the SLAVE flag, then call clusterCloseAllSlots(). */
        myself->flags = (myself->flags &
                         ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO)) |
                        CLUSTER_NODE_SLAVE;
        clusterCloseAllSlots();
    }

    /* Link both directions: we point at n, and n lists us as a slave. */
    myself->slaveof = n;
    clusterNodeAddSlave(n,myself);

    /* Point replication at the new master's address and reset any manual
     * failover state. */
    replicationSetMaster(n->ip, n->port);
    resetManualFailover();
}
将当前节点的主节点设置为节点 n,同时将当前节点添加到节点 n 的从节点列表中。
FORGET 命令实现
else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
/* CLUSTER FORGET <NODE ID>
 * Remove the node named by argv[2] from this node's view of the cluster.
 * Each failed precondition below replies with an error and returns. */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr);
if (!n) {
/* The ID does not match any node found by clusterLookupNode(). */
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
return;
} else if (n == myself) {
/* A node cannot forget itself. */
addReplyError(c,"I tried hard but I can't forget myself...");
return;
} else if (nodeIsSlave(myself) && myself->slaveof == n) {
/* A slave refuses to forget the master it is currently attached to. */
addReplyError(c,"Can't forget my master!");
return;
}
/* NOTE(review): clusterBlacklistAddNode() is not shown here; presumably it
 * keeps the forgotten node from being re-learned right away — confirm. */
clusterBlacklistAddNode(n);
clusterDelNode(n);
/* NOTE(review): clusterDoBeforeSleep() is not shown here; judging by the
 * flag names it schedules a cluster state update and a config save —
 * confirm. */
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
}
先确认指定的节点存在、不是当前节点自身、也不是当前节点(作为从节点时)的主节点;然后调用 clusterBlacklistAddNode() 将其加入黑名单,并调用 clusterDelNode() 将其从集群中删除,最后保存集群配置。
/* Remove a node from the cluster. This function performs the high level
 * cleanup and delegates the low level cleanup to freeClusterNode().
 *
 * The work is done in three steps:
 *
 * 1) Every slot handled by the node is marked as unassigned, and any
 *    importing/migrating entry that points at the node is cleared.
 * 2) All the failure reports sent by this node and referenced by other
 *    nodes are removed.
 * 3) The node is freed with freeClusterNode(), which in turn removes it
 *    from the hash table and, if it is a slave node, from the list of
 *    slaves of its master.
 */
void clusterDelNode(clusterNode *delnode) {
    int slot;
    dictIterator *iter;
    dictEntry *entry;

    /* 1) Drop every reference to the node from the slot tables. */
    for (slot = 0; slot < CLUSTER_SLOTS; slot++) {
        if (server.cluster->importing_slots_from[slot] == delnode) {
            server.cluster->importing_slots_from[slot] = NULL;
        }
        if (server.cluster->migrating_slots_to[slot] == delnode) {
            server.cluster->migrating_slots_to[slot] = NULL;
        }
        if (server.cluster->slots[slot] == delnode) {
            clusterDelSlot(slot);
        }
    }

    /* 2) Remove the node's failure reports from every other node. */
    iter = dictGetSafeIterator(server.cluster->nodes);
    for (entry = dictNext(iter); entry != NULL; entry = dictNext(iter)) {
        clusterNode *other = dictGetVal(entry);
        if (other != delnode) {
            clusterNodeDelFailureReport(other,delnode);
        }
    }
    dictReleaseIterator(iter);

    /* 3) Free the node, unlinking it from the cluster. */
    freeClusterNode(delnode);
}
将节点所管理的槽标记为未分配,并清除指向该节点的导入/迁移槽记录;接着从其他节点上删除由该节点发出的故障报告;最后释放该节点,将其从集群中移除。
总结
首先,查找要删除的节点是否存在,并确认它不再持有任何槽;然后遍历集群节点列表(跳过要删除的节点本身),如果某个节点的主节点是要删除的节点,则通过 CLUSTER REPLICATE 命令让它改为复制拥有最少从节点的主节点;接着向每个节点发送 CLUSTER FORGET 命令,将要删除的节点从集群中删除;最后,执行 SHUTDOWN 命令关闭要删除的节点。
文章转载自谷竹,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。