暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

MySQL组复制(二)为何不推荐执行大事务

johopig 2021-07-19
3353


  • 场景复现

  • 术语及函数介绍

    • alive_task

    • send_msg

    • sender_task

    • _send_msg

  • 结论

  • 解决方法

  • 写在最后


最近在测试不同数据量下备份还原的功能,因此需要迁移大量数据到MGR集群中,但是迁移数据量大一点的情况经常报错中断,在网上搜索了很多都只是提了一下建议不要执行大事务就没了,官方文档则好一点提示可能因为内存分配及网络带宽引发问题。但由于要交差并且可以水一下日报周报我们不能只知其一不知其二对不对,还是得找出阻碍执行大事务的真正原因!

场景复现

这个错误几乎是必现的,往MGR集群迁移个几百兆的数据就可以复现,集群Master的错误日志如下:

2021-07-15T05:38:44.345845Z 0 [ERROR] [MY-011495] [Repl] Plugin group_replication reported: 'This server is not able to reach a majority of members in the group. This server will now block all updates. The server will remain blocked until contact with the majority is restored. It is possible to use group_replication_force_members to force a new group membership.'
......
2021-07-15T05:38:44.690220Z 0 [Warning] [MY-011498] [Repl] Plugin group_replication reported: 'The member has resumed contact with a majority of the members in the group. Regular operation is restored and transactions are unblocked.'
2021-07-15T05:39:03.249057Z 0 [Warning] [MY-011630] [Repl] Plugin group_replication reported: 'Due to a plugin error, some transactions were unable to be certified and will now rollback.'
2021-07-15T05:39:03.233267Z 0 [ERROR] [MY-011505] [Repl] Plugin group_replication reported: 'Member was expelled from the group due to network failures, changing member status to ERROR.'

复制

client端这边更简洁,就提示了一个:

error on observer while running replication hook 'before_commit'

这个错误在我们上一篇的文章中提到,mgr是以MySQL插件的形式存在的,MySQL在二阶段提交prepare后写binlog
之前执行这些回调函数,因此这个报错是比较笼统的,只能知道在执行mgr插件函数的时候发生了问题。根据这个问题你去谷歌、百度搜索的话千篇一律都是让你修改replication_transication_size_limit
参数的大小,但是经过调整我们发现错误依然会发生。

通过查看MySQL的官方文档,我们可以找到下面一句话:

replication_transication_size_limit

这里提醒了我们可以从组成员之间的心跳机制去思考这个问题

术语及函数介绍

我们到GitHub里找到组复制插件的源码 https://github.com/mysql/mysql-server/blob/3e90d07c3578e4da39dc1bce73559bbdf655c28c/plugin/group_replication/libmysqlgcs/src/bindings/xcom/xcom/xcom_base.cc,可以看到这些术语以及对一些函数的介绍。

/*
    A node is an instance of the xcom thread. There is only one instance
    of the xcom thread in the agent.
    A client is the application which is using xcom to send messages.
    A thread is a real OS thread.
    A task is a logical process. It is implemented by coroutines and
    an explicit stack.
*/


复制
/*
 static int tcp_server(task_arg);
 The tcp_server listens on the xcom port and starts an
 acceptor_learner_task whenever a new connection is detected.
*/


复制
/*  
 static int sender_task(task_arg);
    The sender_task waits for tcp messages on its input queue and
    sends it on the tcp socket. If the socket is closed for any
    reason, the sender_task will reconnect the socket. There is one
    sender_task for each socket. The sender task exists mainly to
    simplify the logic in the other tasks, but it could have been
    replaced with a coroutine which handles the connection logic after
    having reserved the socket for its client task.
    
    static int acceptor_learner_task(task_arg);
    This is the server part of the xcom thread. There is one
    acceptor_learner_task for each node in the system. The acceptor
    learner_task reads messages from the socket, finds the correct
    Paxos state machine, and dispatches to the correct message handler
    with the state machine and message as arguments.

    static int alive_task(task_arg);
    Sends i-am-alive to other nodes if there has been no normal traffic
    for a while. It also pings nodes which seem to be inactive.
    static int detector_task(task_arg);
    
    The detector_task periodically scans the set of connections from
    other nodes and sees if there has been any activity. If there has
    been no activity for some time, it will assume that the node is
    dead, and send a view message to the client.
*/


复制

alive_task

找到和心跳发送有关的代码。可以看到这里在循环发送心跳。源码地址L408-L462

int alive_task(task_arg arg MY_ATTRIBUTE((unused))) {
  DECL_ENV
  pax_msg *i_p;
  pax_msg *you_p;
  END_ENV;
  TASK_BEGIN

  ep->i_p = ep->you_p = NULL;

  while (!xcom_shutdown) {
    {
      double sec = task_now();
      synode_no alive_synode = get_current_message();
      site_def const *site = find_site_def(alive_synode);

      /*
        如果有一些配置上的变更,马上应用
      */

      validate_update_configuration(site, alive_synode);

      if (site && get_nodeno(site) != VOID_NODE_NO) {
        /* 
        如果该node已经一段时间没发送心跳了,
        立即发送心跳
        */

        if (server_active(site, get_nodeno(site)) < sec - 0.5) {
          replace_pax_msg(&ep->i_p, pax_msg_new(alive_synode, site));
          ep->i_p->op = i_am_alive_op;
          /* 通过该方法调用send_msg将心跳task写入queue */
          send_to_all_site(site, ep->i_p, "alive_task");
        }

        /* 
        如果某个node没有发心跳过来,
        那就ping一下它看它是死是活
        */

        {
          node_no i;
          for (i = 0; i < get_maxnodes(site); i++) {
            if (i != get_nodeno(site) && may_be_dead(site->detected, i, sec)) {
              replace_pax_msg(&ep->you_p, pax_msg_new(alive_synode, site));
              ep->you_p->op = are_you_alive_op;

              ep->you_p->a = new_app_data();
              ep->you_p->a->app_key.group_id = ep->you_p->a->group_id =
                  get_group_id(site);
              ep->you_p->a->body.c_t = xcom_boot_type;
              init_node_list(1, &site->nodes.node_list_val[i],
                             &ep->you_p->a->body.app_u_u.nodes);

              IFDBG(D_DETECT, FN; COPY_AND_FREE_GOUT(
                        dbg_list(&ep->you_p->a->body.app_u_u.nodes)););

              send_server_msg(site, i, ep->you_p);
            }
          }
        }
      }
    }
    TASK_DELAY(1.0);

复制

send_msg

send_msg
xcom
通用的方法,即发送心跳task、ping其他成员或者发送binlog event等等最后都是通过该方法塞入相应的队列里。源码地址L846-L864

/* 将消息塞入队列 */
int send_msg(server *s, node_no from, node_no to, uint32_t group_id,
             pax_msg *p)
 
{
  assert(p);
  assert(s);
  {
    /* msg_link队列 */
    msg_link *link = msg_link_new(p, to);
    IFDBG(D_NONE, FN; PTREXP(&s->outgoing);
          COPY_AND_FREE_GOUT(dbg_msg_link(link)););
    p->from = from;
    p->group_id = group_id;
    p->max_synode = get_max_synode();
    p->delivered_msg = get_delivered_msg();
    IFDBG(D_NONE, FN; PTREXP(p); STREXP(s->srv); NDBG(p->from, d);
          NDBG(p->to, d); NDBG(p->group_id, u));
    channel_put(&s->outgoing, &link->l);
  }
  return 0;
}

复制


/* 将消息塞入队列 */
int send_msg(server *s, node_no from, node_no to, uint32_t group_id,
             pax_msg *p)
 
{
  assert(p);
  assert(s);
  {
    /* msg_link队列 */
    msg_link *link = msg_link_new(p, to);
    IFDBG(D_NONE, FN; PTREXP(&s->outgoing);
          COPY_AND_FREE_GOUT(dbg_msg_link(link)););
    p->from = from;
    p->group_id = group_id;
    p->max_synode = get_max_synode();
    p->delivered_msg = get_delivered_msg();
    IFDBG(D_NONE, FN; PTREXP(p); STREXP(s->srv); NDBG(p->from, d);
          NDBG(p->to, d); NDBG(p->group_id, u));
    channel_put(&s->outgoing, &link->l);
  }
  return 0;
}

复制


sender_task

sender_task
从队列里获取task后通过 _send_mgs
发送到各个TCP连接。源码地址L1426-L1570

/* Fetch messages from queue and send to other server.  Having a
   separate queue and task for doing this simplifies the logic since we
   never need to wait to send. */

int sender_task(task_arg arg) {
  ......
  TASK_BEGIN
  ......
  ep->s = (server *)get_void_arg(arg);
  ep->link = NULL;
  ep->tag = TAG_START;
  srv_ref(ep->s);

  while (!xcom_shutdown) {
    /* Loop until connected */
    G_DEBUG("Connecting to %s:%d", ep->s->srv, ep->s->port);
    for (;;) {
      ......
    }

    G_DEBUG("Connected to %s:%d on fd=%d", ep->s->srv, ep->s->port,
            ep->s->con.fd);

    /* We are ready to start sending messages.
       Insert a message in the input queue to negotiate the protocol.
    */

    start_protocol_negotiation(&ep->s->outgoing);
    while (is_connected(&ep->s->con)) {
      int64_t ret;
      assert(!ep->link);
      ......
      if (link_empty(&ep->s->outgoing.data)) {
        TASK_CALL(flush_srv_buf(ep->s, &ret));
      }
      CHANNEL_GET(&ep->s->outgoing, &ep->link, msg_link);
      {
        int64_t ret_code;
        if (ep->link->p) {
          ......
          /* 调用_send_msg将消息发送到其他server */
          TASK_CALL(_send_msg(ep->s, ep->link->p, ep->link->to, &ret_code));
        }
      }
    next:
      msg_link_delete(&ep->link);
      /* TASK_YIELD; */
    }
  }
}

复制

_send_msg

最后在下面这一步write 或者 flush buffer 源码地址L256-L322

/* Send a message to server s */
static int _send_msg(server *s, pax_msg *p, node_no to, int64_t *ret) {
    ......
    TASK_CALL(flush_srv_buf(s, ret));
    or
    TASK_CALL(task_write(&s->con, ep->buf, ep->buflen, &sent))
;
    ......
    alive(s); /* Note activity */
    X_FREE(ep->buf);
    /* UNLOCK_FD(s->con.fd, 'w'); */
    if (sent <= 0) {
        shutdown_connection(&s->con);
    }
    ......
}

复制

结论

到这里就可以解释为什么大事务会导致mgr集群出问题,「原因是每个socket连接只有一个sender_task
,相当于只有一个worker从队列中获取task并处理,而在执行大事务的情况下无论是内存分配或者说binlog event发送都占用了较长的时间,导致后面的心跳包阻塞发不出去,因此该master就被其他成员误认为死了并驱逐出去」
,驱逐之后该master被自动设置为read_only
状态,那么这边客户端继续导入sql显然就会引起错误,最后就报了error befor_commit
的错误。

解决方法

解决方法很简单:

  1. replication_transication_size_limit
    调大
  2. group_replication_member_expel_timeout
    调大

在迁移完成后记得调回原本的大小

写在最后

虽然最后只是简单地调了两个参数,但是过程却是十分艰辛的。

从环境搭建、场景复现、缩小问题的范围、搜索资料、源码的查阅,都耗费了很长的时间和精力。还有一个心得就是英语得补补课才行,官方文档和源码的注解全部是英文,很多单词都不认识,读起来磕磕绊绊的,而一键翻译又是机翻,经常遇到翻出来的和原本表达的意思天差地别的情况。


文章转载自johopig,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论