上一篇我们讲道理CAT埋点的层层逻辑,最终将消息交给消息发送器(TcpSocketSender)完成。本篇将从TcpSocketSender分析CAT消息如何发送。
CAT作为实时监控框架,对消息的及时传输有较高要求,这就要求使用TCP协议,所以CAT使用了网络编程框架netty。下面来看看TcpSocketSender的结构。
TcpSocketSender的主要方法有三个:
initialize 初始化TcpSocketSender
send 发送消息
run 消费消息
一、初始化TcpSocketSender
public void initialize() { int len = getQueueSize(); m_queue = new DefaultMessageQueue(len); m_atomicTrees = new DefaultMessageQueue(len); m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory); Threads.forGroup("cat").start(this); Threads.forGroup("cat").start(m_manager); Threads.forGroup("cat").start(new MergeAtomicTask()); }
复制
可以看到,TcpSocketSender初始化启动了三个线程:
消息发送线程
channel管理线程
合并原子任务的线程
二、发送消息
public void send(MessageTree tree) { if (isAtomicMessage(tree)) { boolean result = m_atomicTrees.offer(tree, m_manager.getSample()); if (!result) { logQueueFullInfo(tree); } } else { boolean result = m_queue.offer(tree, m_manager.getSample()); if (!result) { logQueueFullInfo(tree); } } }
复制
send方法判断消息树是否是原子消息,判断方法:
private boolean isAtomicMessage(MessageTree tree) { Message message = tree.getMessage(); if (message instanceof Transaction) { String type = message.getType(); if (type.startsWith("Cache.") || "SQL".equals(type)) { return true; } else { return false; } } else { return true; } }
复制
根据结果分别将消息树推送到原子队列和普通队列。
三、消费消息
public void run() { m_active = true; try { while (m_active) { ChannelFuture channel = m_manager.channel(); if (channel != null && checkWritable(channel)) { try { MessageTree tree = m_queue.poll(); if (tree != null) { sendInternal(tree); tree.setMessage(null); } } catch (Throwable t) { m_logger.error("Error when sending message over TCP socket!", t); } } else { long current = System.currentTimeMillis(); long oldTimestamp = current - HOUR; while (true) { try { MessageTree tree = m_queue.peek(); if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) { MessageTree discradTree = m_queue.poll(); if (discradTree != null) { m_statistics.onOverflowed(discradTree); } } else { break; } } catch (Exception e) { m_logger.error(e.getMessage(), e); break; } } TimeUnit.MILLISECONDS.sleep(5); } } } catch (InterruptedException e) { // ignore it m_active = false; } }
复制
轮询检查channel的状态,当channel可写时,则从消息队列里面poll出一个消息树进行处理;当channel不可写时,循环peek出一个消息树,检查消息树是否已经过期,过期则丢弃并对丢弃的树做统计。实际向服务端发送消息方法是sendInternal:
private void sendInternal(MessageTree tree) { ChannelFuture future = m_manager.channel(); ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K m_codec.encode(tree, buf); int size = buf.readableBytes(); Channel channel = future.channel(); channel.writeAndFlush(buf); if (m_statistics != null) { m_statistics.onBytes(size); } }
复制
将消息树编码后发送出去,再统计一下数据。消息编码分为两部分:消息头和消息体。
消息头:按顺序依次是version(默认是PT1)、domain、hostName、ipAddress、threadGroupName、threadId、threadName、messageId、parentMessageId、rootMessageId、sessionToken。以TAB符(\t)连接,以LF(\n)结尾。
消息体:Transaction若是原子的(没有子消息),type为A;普通的Transaction开始时type为t,结束时为T。Event的type为E,Trace的type为L,Metric的type为M,Heartbeat的type为H。