暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

CAT源码分析(六)——CAT客户端之消息发送

贰级天災 2018-09-14
699

上一篇我们讲道理CAT埋点的层层逻辑,最终将消息交给消息发送器(TcpSocketSender)完成。本篇将从TcpSocketSender分析CAT消息如何发送。

CAT作为实时监控框架,对消息的及时传输有较高要求,这就要求使用TCP协议,所以CAT使用了网络编程框架netty。下面来看看TcpSocketSender的结构。

TcpSocketSender的主要方法有三个:

  • initialize 初始化TcpSocketSender

  • send 发送消息

  • run 消费消息

一、初始化TcpSocketSender

    public void initialize() {
        int len = getQueueSize();
        m_queue = new DefaultMessageQueue(len);
        m_atomicTrees = new DefaultMessageQueue(len);
        m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);
        Threads.forGroup("cat").start(this);
        Threads.forGroup("cat").start(m_manager);
        Threads.forGroup("cat").start(new MergeAtomicTask());
    }
复制

可以看到,TcpSocketSender初始化启动了三个线程:

  • 消息发送线程

  • channel管理线程

  • 合并原子任务的线程

二、发送消息

    public void send(MessageTree tree) {
        if (isAtomicMessage(tree)) {
            boolean result = m_atomicTrees.offer(tree, m_manager.getSample());
            if (!result) {
                logQueueFullInfo(tree);
            }
        } else {
            boolean result = m_queue.offer(tree, m_manager.getSample());
            if (!result) {
                logQueueFullInfo(tree);
            }
        }
    }
复制

send方法判断消息树是否是原子消息,判断方法:

    private boolean isAtomicMessage(MessageTree tree) {
        Message message = tree.getMessage();
        if (message instanceof Transaction) {
            String type = message.getType();
            if (type.startsWith("Cache.") || "SQL".equals(type)) {
                return true;
            } else {
                return false;
            }
        } else {
            return true;
        }
    }
复制

根据结果分别将消息树推送到原子队列和普通队列。

三、消费消息

    public void run() {
        m_active = true;
        try {
            while (m_active) {
                ChannelFuture channel = m_manager.channel();
                if (channel != null && checkWritable(channel)) {
                    try {
                        MessageTree tree = m_queue.poll();
                        if (tree != null) {
                            sendInternal(tree);
                            tree.setMessage(null);
                        }
                    } catch (Throwable t) {
                        m_logger.error("Error when sending message over TCP socket!", t);
                    }
                } else {
                    long current = System.currentTimeMillis();
                    long oldTimestamp = current - HOUR;
                    while (true) {
                        try {
                            MessageTree tree = m_queue.peek();
                            if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
                                MessageTree discradTree = m_queue.poll();
                                if (discradTree != null) {
                                    m_statistics.onOverflowed(discradTree);
                                }
                            } else {
                                break;
                            }
                        } catch (Exception e) {
                            m_logger.error(e.getMessage(), e);
                            break;
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(5);
                }
            }
        } catch (InterruptedException e) {
            // ignore it
            m_active = false;
        }
    }
复制

轮询检查channel的状态,当channel可写时,则从消息队列里面poll出一个消息树进行处理;当channel不可写时,循环peek出一个消息树,检查消息树是否已经过期,过期则丢弃并对丢弃的树做统计。实际向服务端发送消息方法是sendInternal:

    private void sendInternal(MessageTree tree) {
        ChannelFuture future = m_manager.channel();
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K
        m_codec.encode(tree, buf);
        int size = buf.readableBytes();
        Channel channel = future.channel();
        channel.writeAndFlush(buf);
        if (m_statistics != null) {
            m_statistics.onBytes(size);
        }
    }
复制

将消息树编码后发送出去,再统计一下数据。消息编码分为两部分:消息头和消息体。

  • 消息头:按顺序依次是version(默认是PT1)、domain、hostName、ipAddress、threadGroupName、threadId、threadName、messageId、parentMessageId、rootMessageId、sessionToken。以TAB符(\t)连接,以LF(\n)结尾。

  • 消息体:Transaction若是原子的(没有子消息),type为A;普通的Transaction开始时type为t,结束时为T。Event的type为E,Trace的type为L,Metric的type为M,Heartbeat的type为H。

文章转载自贰级天災,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论