暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

redis 集群源码解析(8)删除节点

谷竹 2020-06-25
216

这篇文章,我们将介绍删除节点是如何实现的。




语法

    redis-cli --cluster del-node <host:port> <node_id>


    示例

      redis-cli --cluster del-node 127.0.0.1:7000 ee8ce7d1ee50d84eba72482087350bfa065d1670


        /* Handler for "redis-cli --cluster del-node": removes the node whose ID
         * is argv[1] from the cluster reachable through the host:port that
         * getClusterHostFromCmdArgs() extracts from argv.
         *
         * Steps: load the cluster view from the reference node, refuse unknown
         * IDs and nodes that still have slots, send CLUSTER FORGET to every other
         * node (first repointing nodes that replicate the target with CLUSTER
         * REPLICATE), then send SHUTDOWN to the target.
         *
         * Returns 0 as soon as any step fails; otherwise returns the result of
         * checking the SHUTDOWN reply. */
        static int clusterManagerCommandDeleteNode(int argc, char **argv) {
            UNUSED(argc);

            int port = 0;
            char *ip = NULL;
            if (!getClusterHostFromCmdArgs(1, argv, &ip, &port)) {
                fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);
                return 0;
            }

            char *node_id = argv[1];
            clusterManagerLogInfo(">>> Removing node %s from cluster %s:%d\n",
                                  node_id, ip, port);

            /* Load cluster information through the reference node. */
            clusterManagerNode *ref_node = clusterManagerNewNode(ip, port);
            if (!clusterManagerLoadInfoFromNode(ref_node, 0)) return 0;

            /* The target must be known and must not own any slot. */
            clusterManagerNode *node = clusterManagerNodeByName(node_id);
            if (node == NULL) {
                clusterManagerLogErr("[ERR] No such node ID %s\n", node_id);
                return 0;
            }
            if (node->slots_count != 0) {
                clusterManagerLogErr("[ERR] Node %s:%d is not empty! Reshard data "
                                     "away and try again.\n", node->ip, node->port);
                return 0;
            }

            /* Send CLUSTER FORGET to all the nodes but the node to remove. */
            clusterManagerLogInfo(">>> Sending CLUSTER FORGET messages to the "
                                  "cluster...\n");
            listIter iter;
            listNode *entry;
            listRewind(cluster_manager.nodes, &iter);
            for (entry = listNext(&iter); entry != NULL; entry = listNext(&iter)) {
                clusterManagerNode *peer = entry->value;
                if (peer == node) continue;

                if (peer->replicate && !strcasecmp(peer->replicate, node_id)) {
                    /* This peer replicates the target: attach it to the master
                     * picked by clusterManagerNodeWithLeastReplicas() before
                     * asking it to forget the target. */
                    clusterManagerNode *new_master =
                        clusterManagerNodeWithLeastReplicas();
                    assert(new_master != NULL);
                    clusterManagerLogInfo(">>> %s:%d as replica of %s:%d\n",
                                          peer->ip, peer->port,
                                          new_master->ip, new_master->port);
                    redisReply *reply = CLUSTER_MANAGER_COMMAND(peer,
                                                                "CLUSTER REPLICATE %s",
                                                                new_master->name);
                    int ok = clusterManagerCheckRedisReply(peer, reply, NULL);
                    if (reply) freeReplyObject(reply);
                    if (!ok) return 0;
                }

                redisReply *reply = CLUSTER_MANAGER_COMMAND(peer, "CLUSTER FORGET %s",
                                                            node_id);
                int ok = clusterManagerCheckRedisReply(peer, reply, NULL);
                if (reply) freeReplyObject(reply);
                if (!ok) return 0;
            }

            /* Finally shutdown the node. */
            clusterManagerLogInfo(">>> SHUTDOWN the node.\n");
            redisReply *shutdown_reply = redisCommand(node->context, "SHUTDOWN");
            int success = clusterManagerCheckRedisReply(node, shutdown_reply, NULL);
            if (shutdown_reply) freeReplyObject(shutdown_reply);
            return success;
        }

        删除节点从 clusterManagerCommandDeleteNode() 函数开始执行。


        下面,我们具体分析删除节点都有哪些操作?


          // Check if the node exists and is not empty
          node = clusterManagerNodeByName(node_id);

          首先,根据 node_id 查找要删除的节点是否存在,不存在则报错返回。此外,如果该节点仍然负责槽(slots_count 不为 0),也会拒绝删除,需要先把槽和数据迁移到其他节点。


            listRewind(cluster_manager.nodes, &li);
            while ((ln = listNext(&li)) != NULL) {
                clusterManagerNode *n = ln->value;
                if (n == node) continue;
                if (n->replicate && !strcasecmp(n->replicate, node_id)) {
                    // Reconfigure the slave to replicate with some other node
                    clusterManagerNode *master = clusterManagerNodeWithLeastReplicas();
                    assert(master != NULL);
                    clusterManagerLogInfo(">>> %s:%d as replica of %s:%d\n",
                                          n->ip, n->port, master->ip, master->port);
                    redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER REPLICATE %s",
                                                            master->name);
                    success = clusterManagerCheckRedisReply(n, r, NULL);
                    if (r) freeReplyObject(r);
                    if (!success) return 0;
                }
                redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER FORGET %s",
            node_id);
                success = clusterManagerCheckRedisReply(n, r, NULL);
                if (r) freeReplyObject(r);
                if (!success) return 0;
            }

            遍历 cluster_manager.nodes 列表(跳过要删除的节点本身)。如果某个节点的主节点是 node_id,则先向它发送 CLUSTER REPLICATE 命令,将它的主节点替换为拥有最少从节点的主节点(从节点不能 FORGET 自己的主节点,所以必须先换主)。随后,无论该节点是不是从节点,都向它发送 CLUSTER FORGET 命令,让它忘记要删除的节点。


              // Finally shutdown the node
              clusterManagerLogInfo(">>> SHUTDOWN the node.\n");
              redisReply *r = redisCommand(node->context, "SHUTDOWN");
              success = clusterManagerCheckRedisReply(node, r, NULL);

              最后,向要删除的节点发送 SHUTDOWN 命令,将其关闭。


              由上面的内容可知:删除节点时,客户端向服务端发送了 CLUSTER REPLICATE 命令和 CLUSTER FORGET 命令。


              下面,我们介绍这两个命令的具体实现:


              REPLICATE 命令实现

                else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
                    /* CLUSTER REPLICATE <NODE ID> */
                    clusterNode *n = clusterLookupNode(c->argv[2]->ptr);


                    /* Lookup the specified node in our table. */
                    if (!n) {
                        addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
                        return;
                    }


                    /* I can't replicate myself. */
                    if (n == myself) {
                        addReplyError(c,"Can't replicate myself");
                        return;
                    }


                    /* Can't replicate a slave. */
                    if (nodeIsSlave(n)) {
                        addReplyError(c,"I can only replicate a master, not a replica.");
                        return;
                    }


                    /* If the instance is currently a master, it should have no assigned
                     * slots nor keys to accept to replicate some other node.
                     * Slaves can switch to another master without issues. */
                    if (nodeIsMaster(myself) &&
                        (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
                        addReplyError(c,
                            "To set a master the node must be empty and "
                            "without assigned slots.");
                        return;
                    }


                    /* Set the master. */
                    clusterSetMaster(n);
                    clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
                    addReply(c,shared.ok);
                }

                先做校验:目标节点 n 必须存在、不能是当前节点自己、且必须是主节点;如果当前节点是主节点,还要求它没有分配槽、数据库为空。校验通过后,将当前节点的主节点设置为节点 n,然后更新集群状态并保存集群配置。

                  /* Make node 'n' the master of this node (myself).
                   *
                   * 'n' must not be myself and myself must own no slots; both are
                   * enforced with serverAssert(). A node that is currently a master is
                   * flagged as a slave first; a node that is already a slave is removed
                   * from the slave list of its previous master. */
                  void clusterSetMaster(clusterNode *n) {
                      serverAssert(n != myself);
                      serverAssert(myself->numslots == 0);

                      if (!nodeIsMaster(myself)) {
                          /* Already a slave: unlink from the previous master, if any. */
                          if (myself->slaveof) {
                              clusterNodeRemoveSlave(myself->slaveof,myself);
                          }
                      } else {
                          /* Currently a master: swap the role flags and close all slots. */
                          myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
                          myself->flags |= CLUSTER_NODE_SLAVE;
                          clusterCloseAllSlots();
                      }

                      /* Link both directions, then point replication at the new master. */
                      myself->slaveof = n;
                      clusterNodeAddSlave(n,myself);
                      replicationSetMaster(n->ip, n->port);
                      resetManualFailover();
                  }

                  将当前节点的主节点设置为节点 n,同时将当前节点添加到节点 n 的从节点列表中。


                  FORGET 命令实现

                    else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
                        /* CLUSTER FORGET <NODE ID> */
                        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);


                        if (!n) {
                            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
                            return;
                        } else if (n == myself) {
                            addReplyError(c,"I tried hard but I can't forget myself...");
                            return;
                        } else if (nodeIsSlave(myself) && myself->slaveof == n) {
                            addReplyError(c,"Can't forget my master!");
                            return;
                        }
                        clusterBlacklistAddNode(n);
                        clusterDelNode(n);
                        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
                                             CLUSTER_TODO_SAVE_CONFIG);
                        addReply(c,shared.ok);
                    }

                    节点不能 FORGET 自己,从节点也不能 FORGET 自己的主节点。校验通过后,先把指定的节点加入黑名单(clusterBlacklistAddNode),再将它从集群中删除,然后更新集群状态并保存集群配置。

                      /* Remove 'delnode' from the cluster. This function performs the high
                       * level cleanup and delegates the low level cleanup to
                       * freeClusterNode(). In order:
                       *
                       * 1) Every slot that references the node (as owner, import source or
                       *    migration target) is cleared.
                       * 2) The failure reports sent by this node and referenced by the
                       *    other nodes are removed.
                       * 3) The node is freed with freeClusterNode(), which in turn removes
                       *    it from the hash table and, if it is a slave node, from the list
                       *    of slaves of its master.
                       */
                      void clusterDelNode(clusterNode *delnode) {
                          /* 1) Mark slots as unassigned. */
                          for (int slot = 0; slot < CLUSTER_SLOTS; slot++) {
                              if (server.cluster->importing_slots_from[slot] == delnode) {
                                  server.cluster->importing_slots_from[slot] = NULL;
                              }
                              if (server.cluster->migrating_slots_to[slot] == delnode) {
                                  server.cluster->migrating_slots_to[slot] = NULL;
                              }
                              if (server.cluster->slots[slot] == delnode) {
                                  clusterDelSlot(slot);
                              }
                          }

                          /* 2) Remove failure reports held by every other node. */
                          dictIterator *iter = dictGetSafeIterator(server.cluster->nodes);
                          dictEntry *entry;
                          while ((entry = dictNext(iter)) != NULL) {
                              clusterNode *other = dictGetVal(entry);

                              if (other != delnode) {
                                  clusterNodeDelFailureReport(other,delnode);
                              }
                          }
                          dictReleaseIterator(iter);

                          /* 3) Free the node, unlinking it from the cluster. */
                          freeClusterNode(delnode);
                      }

                      将节点所管理的槽标记为未分配(同时清除与该节点相关的导入/迁移状态),删除其他节点上记录的、由该节点发出的故障报告,最后释放该节点。


                      总结

                      首先,查找要删除的节点是否存在,并确认它已不再负责任何槽;然后遍历集群节点列表,如果某个节点的主节点是要删除的节点,则通过 CLUSTER REPLICATE 命令将其主节点替换为拥有最少从节点的主节点;接着向除被删除节点以外的所有节点发送 CLUSTER FORGET 命令,将要删除的节点从集群中移除;最后,向要删除的节点发送 SHUTDOWN 命令将其关闭。

                      文章转载自谷竹,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                      评论