暂无图片
暂无图片
4
暂无图片
暂无图片
暂无图片

即时通讯 | 记一次从0-1搭建IM在线实时聊天进阶思考过程

原创 每天译点晓知识 2022-12-09
1040


开篇:想必大家每当提到消息通讯,就会想到消息队列。当然,对于消息可靠性有极高要求的业务场景-使用专业的消息队列MQ(AQMP高级消息队列协议)。今天,我们来try不一样的idea。
现在ChatGPT风靡网络-OpenAI愈火,这里仅以机器人问答为例-RoBot-FAQ:


引入:

1、当使用IM通讯技术时,还在束缚于第三方SDK?

2、当还在处于付费享用IM聊天,提供的服务,不妨自主来搭建聊天室,点对点聊天,端到端通信,群组聊天,实时推送等服务?

方案:客户端 VS 服务端 =>即时通讯


简介:基于socket.io,

服务端使用 netty-socket.io

客户端使用 socket.io-client-java(或Socket.io.js)

首先,在pom.xml中引入依赖,

    <!-- netty-socketio -->
    <dependency>
        <groupId>com.corundumstudio.socketio</groupId>
        <artifactId>netty-socketio</artifactId>
        <version>${netty-socketio.version}</version>
    </dependency>
    <!-- socket.io-client 也可通过github源码编译更高版本jar -->
    <dependency>
        <groupId>io.socket</groupId>
        <artifactId>socket.io-client</artifactId>
        <version>${socket.io-client.version}</version>
    </dependency>
    <!-- netty-transport-native-epoll,说明:若是主流的maven仓库没有ARM 64架构版本的Jar包,将ARM 64 架构版本的netty-transport-native-epoll-4.1.43.Final-linux-aarch_64.jar包放入maven仓库,当然也可自行移植。-->        
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-transport-native-epoll</artifactId>
        <version>4.1.45.Final</version>
        <classifier>linux-aarch_64</classifier>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>commons-logging</artifactId>
                <groupId>commons-logging</groupId>
            </exclusion>
        </exclusions>
    </dependency>
    <!--  redisson  -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>${redisson.version}</version>
    </dependency>
    <!-- jedis  -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>${jedis.version}</version>
    </dependency>
复制

其中,

netty-transport-native-epoll移植可参考->

https://support.huaweicloud.com/prtg-sc-kunpengwebs/kunpengspringcloudhoxton_02_0042.html
复制

可将其上传至私服-nexus,

具体坐标系数创建可参考小编之前博文->

https://blog.csdn.net/yxd179/article/details/115665824
复制

接着,这里socketio的相关配置信息,可自行根据实际场景去调整,说明->


这里将IM服务内嵌在主服务中,内部端口指定的是五位数字16688。

然后,我们可以通过SocketConfigurer配置类,来读取相关配置信息,

注入Bean-socketIOServer,加入configuration.(说明:configuration类可替换xml配置文件,即把该类作为spring的xml配置文件中的-配置spring容器(应用上下文),当被注解的类内部包含有一个或多个被@Bean注解的方法,则这些方法将会被扫描并用于构建bean定义-初始化Spring容器,至于SpringBoot自动装配可参考达梦数据库适配博文。)

@Configuration
public class SocketConfigurer {
private static final Logger logger = LoggerFactory.getLogger(SocketConfigurer.class);

@Value("{socketio.port}") private Integer port = 16688;  @Value("{socketio.workCount}")
private int workCount = 100;

@Value("{socketio.allowCustomRequests}") private boolean allowCustomRequests = true;  @Value("{socketio.upgradeTimeout}")
private int upgradeTimeout = 10000;

@Value("{socketio.pingTimeout}") private int pingTimeout = 60000;  @Value("{socketio.pingInterval}")
private int pingInterval = 25000;

@Value("{socketio.maxFramePayloadLength}") private int maxFramePayloadLength = 1048576;  @Value("{socketio.maxHttpContentLength}")
private int maxHttpContentLength = 1048576;

@Autowired
private RedissonClient redissonClient;

@Bean(“socketIOServer”)
public SocketIOServer socketIOServer() {

com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration();
// 配置端口
config.setPort(port);
// 开启Socket端口复用
com.corundumstudio.socketio.SocketConfig socketConfig = new com.corundumstudio.socketio.SocketConfig();
socketConfig.setReuseAddress(true);
config.setSocketConfig(socketConfig);
// 连接数大小
config.setWorkerThreads(workCount);
// 允许客户请求
config.setAllowCustomRequests(allowCustomRequests);
// 协议升级超时时间(毫秒),默认10秒,HTTP握手升级为ws协议超时时间
config.setUpgradeTimeout(upgradeTimeout);
// 心跳机制-客户端:定时清除那些因为某种原因在一定时间段内没有做指定操作的客户端连接。
// 心跳机制-服务端:检测是否断开连接,然后尝试重连等,也可监控延时。
// 最关键通过nettyIdleStateHandler超时机制来实现心跳和重连,再者就是编码传输org.msgpack编码器来实现跨平台数据传输,或者protobuf全称Google Protocol Buffers,在通信协议和数据存储等领域中使用较多,若是实现RPC也就是网络层+代理+传输层。
// Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件
config.setPingTimeout(pingTimeout);
// Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔
config.setPingInterval(pingInterval);
// 设置HTTP交互最大内容长度
config.setMaxHttpContentLength(maxHttpContentLength);
// 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器
config.setMaxFramePayloadLength(maxFramePayloadLength);
// 支持分布式-多台机器能够同时消费消息-后续这里消息消费去重-redis分布式锁
config.setStoreFactory(new RedissonStoreFactory(redissonClient));
// 认证
/config.setAuthorizationListener(new AuthorizationListener() {
@Override
public boolean isAuthorized(HandshakeData data) {
// 这里逻辑是直接通过,可自行设计verifyUser()
return true;
}
});/
return new SocketIOServer(config);
}

/**
* 开启SocketIOServer注解支持-在AuthorizationListener调用service作用户认证-通过注解方式可以注入service-执行相应的连接授权
*/
@Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) {
return new SpringAnnotationScanner(socketServer);
}
}
构建消息结构类-客户端和服务器端直接传递的消息体,

@Data
public class MessageDto implements Serializable {
/**
* 源客户端用户名
/
private String sourceUserName;

/*
* 目标客户端用户名
/
private String targetUserName;

/*
* 消息类型
/
private String isOnline;

/*
* 消息类型
/
private String msgType;

/*
* 消息内容
/
private String content;

/*
* 订单信息ID
/
private Integer orderInfoId;

/*
* 发送设备编号
/
private String senderNo;

/*
* 发送类型
/
private Integer sendType;

/*
* 发送方
/
private String from;

/*
* 接收方
/
private String to;

private Date createTime;

private String messageId;

private String orderNo;

/*
* 空构造方法
/
public MessageDto() {
}

/*
* 构造方法
*
* @param sourceUserName
* @param targetUserName
* @param msgType
* @param content
*/
public MessageDto(String sourceUserName, String targetUserName, String isOnline, String content, String msgType, Integer orderInfoId, String senderNo, Integer sendType, String from, String to, Date createTime, String messageId, String orderNo) {
this.sourceUserName = sourceUserName;
this.targetUserName = targetUserName;
this.isOnline = isOnline;
this.content = content;
this.msgType = msgType;
this.orderInfoId = orderInfoId;
this.senderNo = senderNo;
this.sendType = sendType;
this.from = from;
this.to = to;
this.createTime = createTime;
this.messageId = messageId;
this.orderNo = orderNo;
}
}
声明AbstractSockerServerHandler类,核心-Message Handler,

public abstract class AbstractSockerServerHandler implements ApplicationListener {

/**
* 记录客户端用户
/
private static final String USER_CLIENT_KEY = “IM_Yd_User_Client”;

protected final static Logger logger = LoggerFactory.getLogger(AbstractSockerServer.class);

protected SocketIOServer socketIOServer;

protected RedissonClient redissonClient;

protected RedisClient redisClient;


public AbstractSockerServerHandler(SocketIOServer server, RedissonClient redissonClient, RedisClient redisClient) {
this.socketIOServer = server;
this.redisClient = redisClient;
this.redissonClient = redissonClient;
}

@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
logger.info("---------- NettySocket Starting… ----------");
// 事件监听
joinEventListener();
// 主题
planTopic();
// 启动
this.socketIOServer.start();
logger.info("---------- NettySocket Start completed. ----------");
}

/*
* 用户授权-验证
*
* @param userKey
* @return
*/
public abstract boolean verifyUser(String userKey);

public abstract SlefMessageDto sendSlef(String messageId, Date date, SocketIOClient toclinet);

/**
* 是否转发消息,保存消息
*
* @param socketIOClient
* @param ackRequest
* @param messageDto
*/
public abstract boolean sendMsg(SocketIOClient socketIOClient, AckRequest ackRequest, MessageDto messageDto);


//当用户连接进来,通过hset存储用户-哈希表,Redis除了string, 还有hash,list,set,zset,这里通过hash里的hset()+hget()
protected void saveClient(SocketIOClient client, String sourceUserName) {
redisClient.hset(USER_CLIENT_KEY, sourceUserName, client.getSessionId().toString());
}

//当用户退出,移除
protected void remove(String sourceUserName) {
redisClient.hdel(USER_CLIENT_KEY, sourceUserName);
}

//获取所有client
protected SocketIOClient getClient(String targetUserName) {
String uuid = redisClient.hget(USER_CLIENT_KEY, targetUserName);
if (StringUtils.isNotBlank(uuid)) {
return socketIOServer.getClient(UUID.fromString(uuid));
} else {
return null;
}
}

protected void close() {
this.socketIOServer.stop();
}

private void joinEventListener() {
// 连接
this.socketIOServer.addConnectListener(client -> {
String userNo = client.getHandshakeData().getSingleUrlParam(“userId”);
String orderNo = client.getHandshakeData().getSingleUrlParam(“orderId”);
String userKey= client.getHandshakeData().getSingleUrlParam(“userKey”);
if (verifyUser(userKey)) {
logger.info(“user {} enable long connection notification, sessionId: {}, remoteAddress: {}”,
userNo, client.getSessionId().toString(), client.getRemoteAddress().toString());
// 存储
saveClient(client, orderNo + “" + userNo);
// 上线通知
this.sendMsg(null, null,
new MessageDto(orderNo + "” + userNo, null, MsgTypeEnum.ONLINE.getValue(), null, null, null, null, null, null, null, null, null, null));
} else {
// 断开
client.disconnect();
logger.info(“user {} ilegal”, userNo);
}
});

// 消息主题发布-redisson
this.socketIOServer.addEventListener(EventLisenter.NEWS, MessageDto.class,
(client, data, ackSender) -> redissonClient.getTopic(Topic.NEWS).publish(data));
// 断开
this.socketIOServer.addDisconnectListener(client -> {
String userNo = client.getHandshakeData().getSingleUrlParam(“userId”);
String orderNo = client.getHandshakeData().getSingleUrlParam(“orderId”);
if (StringUtils.isNotBlank(userNo)) {
logger.info(“user {} disconnect long connection notification, sessionId: {}, remoteAddress: {}”,
userNo, client.getSessionId().toString(), client.getRemoteAddress().toString());
redissonClient.getMap(client.getSessionId().toString()).delete();
client.disconnect();
// 移除
remove(orderNo + “" + userNo);
// 发送下线通知
/this.sendMsg(null, null,
new MessageDto(userName, null, MsgTypeEnum.OFFLINE.getValue(), null));/
}
});
}


private void planTopic() {
// 消息主题订阅-redisson
RTopic newsTopic = redissonClient.getTopic(Topic.NEWS);
newsTopic.addListener((s, message) -> {
SocketIOClient slefClient = getClient(message.getOrderNo() + "” + message.getFrom());
SocketIOClient client = getClient(message.getOrderNo() + “_” + message.getTo());
// 转发
if (sendMsg(client, null, message) && client != null && client.isChannelOpen()) {
logger.info(“user {} to {} sendMsg”, message.getFrom(), message.getTo());
client.sendEvent(Event.RECEIVEMSG, message);
}

// sendSelf
if (slefClient != null && slefClient.isChannelOpen()) {
SlefMessageDto slefMessageDto = sendSlef(message.getMessageId(), message.getCreateTime(), client);
slefClient.sendEvent(Event.SLEFMSG, slefMessageDto);
}
});

// 广播
RTopic broadcastTopic = redissonClient.getTopic(Topic.BROADCAST);
broadcastTopic.addListener((channel, data) -> {
socketIOServer.getBroadcastOperations().sendEvent(Event.BROADCAST, data);
});
}

}
@Component
@Order(1)
public class SocketServerHandler extends AbstractSockerServerHandler {
@Autowired
private RedisClient redisClient;

@Autowired
public SocketServerHandler(SocketIOServer server, RedissonClient redissonClient, RedisClient redisClient) {
super(server, redissonClient, redisClient);
}

@Override
public boolean verifyUser(String skey) {
//验证用户-逻辑可自行定义
boolean verifyFalg = true;
return verifyFalg;
}

@Override
public SlefMessageDto sendSlef(String messageId, Date date, SocketIOClient toclinet) {
SlefMessageDto slefMessageDto = new SlefMessageDto();
slefMessageDto.setCreateTime(date);
slefMessageDto.setMessageId(messageId);
slefMessageDto.setStatus(0);
return slefMessageDto;
}

@Override
public boolean sendMsg(SocketIOClient socketIOClient, AckRequest ackRequest, MessageDto messageDto) {
//消息结构体转化
if (messageDto != null) {
logger.info(“messageDto:{}”, JSON.toJSON(messageDto));
/String sign = redisClient.setNX(Md5Util.encrypByMD5(JSON.toJSONString(messageDto)), “MSG_UNIQUE”, 3);/
//转发返回时间戳
//messageDto.setCreateTime(new Date());
//logger.info(“msgObject:{}”, JSON.toJSON(msgObject));
//入库-逻辑可自行定义
//logger.info(“sign:{}”, sign);
//消息重复消费-redis分布式锁防止重复消费:主要是加锁-解锁
//加锁>>>使用setnx命令保证互斥性,需要设置锁的过期时间,避免死锁setnx和设置过期时间需要保持原子性,避免在设置setnx成功之后在设置过期时间客户端崩溃导致死锁,加锁的Value 值为一个唯一标示。可以采用UUID作为唯一标示。加锁成功后需要把唯一标示返回给客户端来用来客户端进行解锁操作。
//解锁>>>a. 需要拿加锁成功的唯一标示要进行解锁,从而保证加锁和解锁的是同一个客户端。b. 解锁操作需要比较唯一标示是否相等,相等再执行删除操作。这2个操作可以采用Lua脚本方式使2个命令的原子性。
//if (判断sign标识是否通过) {
//插入
//}
//if (ackRequest.isAckRequested()) {
// send ack response with data to client
// ackRequest.sendAckData(“服务器回答Socket.EVENT_MESSAGE”, “okkk”);
//}
if (socketIOClient != null) {
return true;
}
}
return false;
}

@PreDestroy
public void relase() {
this.close();
}

}
复制

最后,开始服务启动,上效果图:


这里基于一个单号作为多用户间即时聊天的媒介-组聊-群聊可扩展…

我们在redis查看下用户数,建议千万别在生产环境随便Keys *哈~~阻塞!!!!!!


在redis中删除Big Key,可参->

public void delBigHash1() throws IOException {
String bigHashKey = “bigHashKey”;
ScanOptions scanOptions = ScanOptions.scanOptions().count(100).build();
Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash().scan(bigHashKey, scanOptions);
cursor.forEachRemaining(member -> {
redisTemplate.opsForHash().delete(bigHashKey, member.getKey());
});
cursor.close();
}

public void delBigHash2() throws IOException {
String bigHashKey = “bigHashKey”;
ScanOptions scanOptions = ScanOptions.scanOptions().count(100).build();
Cursor<Map.Entry<Object, Object>> cursor = redisTemplate.opsForHash().scan(bigHashKey, scanOptions);
while (cursor.hasNext()){
Map.Entry<Object, Object> next = cursor.next();
redisTemplate.opsForHash().delete(bigHashKey, next.getKey());
}
cursor.close();
}
复制

相关阐述-详细总结:

消息队列的本质,其实还是归根于生产者和消费者供需不平衡问题:

比如,MQ大量消息堆积=>

若是线上故障的话需修复 consumer,恢复速度等其消费完这个对实时性不作太多要求的可以等,但消息太多时长也会过久,则临时可紧急扩容:

1、先修复 consumer 的问题,确保其恢复消费速度,然后将现有 consumer 都停掉。

2、新建一个 topic,partition 是原来的 N 倍,临时建立好原先 N 倍的 queue 数量。

3、然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据(注册新的服务实例不能影响正常服务运行),消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 N 倍数量的 queue。

4、接着临时征用 N 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 N 倍,以正常的 N 倍速度来消费数据。

5、等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。

若是 RocketMQ,官方针对消息积压问题,也有相当成熟的解决方案,也无非就是:

1、提高消费并行度.

绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度:

同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax…

2、批量方式消费.

3、不重要的跳过.

IM即时通讯,这里通过netty去作通知,可省自己写网络层,通过redis发布订阅(publish/subscribe)模型,其publish/subscribe的缺陷在于客户端必须一直在线才能接收到消息,断线可能会导致客户端丢失消息,旧版的redis可能会由于订阅者消费不够快而变的不稳定导致崩溃,甚至被管理员kill,若一个客户端订阅了某个或者某些频道,但它读取消息的速度不够快,那么不断的积压的消息就会使得redis输出缓冲区的体积越来越大,这可能会导致redis的速度变慢,甚至直接崩溃。也可能导致redis被操作系统强制杀死,甚至导致操作系统本身不可用,当然,redis设计者也考虑到这些问题->自动断开不符合client-output-buffer-limit pubsub配置选项要求的订阅客户端。基于数据传输的可靠性:任何网络系统在执行操作时都可能会遇到断网的情况。而断线产生的连接错误通常会使得网络连接两端中的一端进行重新连接。如果客户端在执行订阅操作的过程中断线,那么客户端将会丢失在断线期间的消息,这在很多业务场景下是不可忍受的。

服务-思考延伸:

Redis服务

开源的内存中数据结构存储系统kv,可用作nosql数据库、缓存和消息发布订阅(FIFO-list对象从头取数据,从尾部塞数据-list对象blpop brpop接口以及Pub/Sub(发布/订阅)的某些接口-阻塞),当然还有一些高速读写的场景-秒杀-红包等。

数据结构:字符串(String),散列(Hash),列表(List),集合(Set),有序集合(Sorted Set或者是ZSet)与范围查询,Bitmaps,Hyperloglogs 和地理空间(Geospatial)索引半径查询,常见的数据结构类型:String、List、Set、Hash、ZSet。

优势:QPS吞吐量-基于内存,非常快,数据存在内存中类似于HashMap,查找和操作的时间复杂度都是O(1),多路I/O复用模型-多个网络连接-复用同一个线程,非阻塞IO。

其中,select、poll、epoll 可以同时监察多个流的 I/O 事件的能力,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有 I/O 事件时,就从阻塞态中唤醒,于是程序就会轮询一遍所有的流(epoll 是只轮询那些真正发出了事件的流),并且只依次顺序的处理就绪的流,这种做法就避免了大量的无用操作。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络 IO 的时间消耗)。

Netty服务

线程模型:通过 Reactor 模型基于多路复用器接收并处理用户请求,内部实现了两个线程池, boss 线程池和 work 线程池,其中 boss 线程池的线程负责处理请求的 accept 事件,当接收到 accept 事件的请求时,把对应的 socket 封装到一个 NioSocketChannel 中,并交给 work 线程池,其中 work 线程池负责请求的 read 和 write 事件,由对应的 Handler 处理。

i:传统阻塞 I/O 服务模型

ii:Reactor 模型

• 单 Reactor 单线程

所有 I/O 操作都由一个线程完成,即多路复用、事件分发和处理都是在一个 Reactor 线程上完成的。既要接收客户端的连接请求,向服务端发起连接,又要发送/读取请 求或应答/响应消息。一个 NIO 线程同时处理成百上千的链路,性能上无法支撑,速度 慢,若线程进入死循环,整个程序不可用,对于高负载、大并发的应用场景不合适。

• 单 Reactor 多线程

有一个 NIO 线程(Acceptor) 只负责监听服务端,接收客户端的 TCP 连接 请求;NIO 线程池负责网络 IO 的操作,即消息的读取、解码、编码和发送;1 个 NIO 线 程可以同时处理 N 条链路,但是 1 个链路只对应 1 个 NIO 线程,这是为了防止发生并发 操作问题。但在并发百万客户端连接或需要安全认证时,一个 Acceptor 线程可能会存在性能不足问题。

• 主从 Reactor 多线程

Acceptor 线程用于绑定监听端口,接收客户端连接,将 SocketChannel 从主线程池的 Reactor 线程的多路复用器上移除,重新注册到 Sub 线程池的线程上,用于处理 I/O 的读写等操作,从而保证 mainReactor 只负责接入认证、握手等操作;

多路 I/O 复用:

select具有O(n)的无差别轮询复杂度,poll本质上和select没有区别,它将用户传入的数组拷贝到内核空间,然后查询每个fd(linux中每一个进程在内核中都对应有一个“打开文件”数组,存放指向文件对象的指针,而fd 是这个数组的下标。我们对文件进行操作时,系统调用,将fd传入内核,内核通过fd找到文件,对文件进行操作。)对应的设备状态, 但是它没有最大连接数的限制-基于链表存储。

epoll时间复杂度O(1)event poll,当哪个流发生了怎样的I/O事件进行通知,实际上是事件驱动(每个事件关联上fd,复杂度降低到了O(1))。

select,poll,epoll都是IO多路复用的机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),通知程序进行相应读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的。

当然还有BIO同步阻塞IO:一个线程管理一个连接-单向传递数据,面向流,操作字节按字节存取。

NIO同步非阻塞IO:一个线程管理多个连接-双向传递数据,面向通道,操作缓冲区按快存取,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间,在现在的Linux内核里有都能够支持,epoll-Linux所特有,而select则应该是POSIX所规定,一般操作系统均有实现。

Selector非阻塞模式下,一个选择器可以循环检测多个selectablechennal获得准备好的通道,一个线程就可以处理多个channel(类似于BIO中的socket),极大的减少线程数。Channel 通道源节点和目标节点的连接,在NIO中负责缓存区数据的传输,本身不存储数据(Channel管道运输者Buffer缓冲区存储的数据来实现对数据处理)-从文件中读写数据>FileChannel;能通过TCP读写网络中的数据>SocketChannel;能通过UDP读写网络中的数据>ServerSocketChannel;可以监听新进来的TCP连接,类似Web服务器对每一个新进来的连接都会创建>DatagramChannel。Buffer缓冲区-数据的临时存放区数组,通过allocate()分配缓冲区大小来获得缓冲区。

本地NIO:非直接缓冲区-需要copy和直接缓冲区-零拷贝避免在用户态和内核态之间进行数据拷贝,由native方法实现,直接从内核(内核态)缓冲区copy到目标路径;减少上下文对内核态和用户态切换,减少了copy的次数,比如将硬盘上的文件展示给用户看时,需要:OS将文件A从硬盘拷贝到内核,A从内核buffercopy到用户buffer,A从用户buffercopy到内核buffer,A从内核buffer中copy到硬件设备中传输。

NIO的零拷贝过程:文件A从硬盘拷贝到内核buffer中,向socket buffer中追加文件A在内核buffer中的位置和偏移量,根据相应位置和偏移量直接将文件A从内核buffercopy到硬件设备中。

OS将内存空间划分:内核空间和用户空间,为保证用户进程不能直接操作内核,保证内核安全。内核空间存放内核代码和程序,直接与硬件交互,用户空间存储用户的进程,与内核空间交互。

比如IO在OS中的运行过程,read读取:应用程序需要等待,等待内核空间有数据reda读取,IO多路复用->linux中所有外接设备都当成文件对待,OS使用文件描述符来操作,调用select/poll等函数传入多个文件描述符,如果有一个文件描述符就绪(可理解为对应内核空间有了数据)则立刻返回,否则阻塞至超时-两个系统 调用,一个是select另一个是recvfrom。其通信过程当用户进程调用了select,那么整个进程会被block,同时,kernel会“监视”所有select负责的socket,当任一个socket中的数据准备好,select就会返回,这个时候用户进程再调用read操作(read时的进程跟select不是同一个,select只负责监控),将数据从kernel拷贝到用户进程(空间)。


「 往期文章 」


数据库在线实训平台-MySQL篇

数仓进阶 | 记一次OLAP分析引擎演进思考过程

鲲鹏认证 | 多数据库切换之Oracle迁移至MySQL篇

开源数据库 | 记一次基于鲲鹏欧拉操作系统openGauss实践过程

MySQL优化案例 | 查看SQL语句执行计划

达梦 | 记一次国产数据库适配的思考过程

Elasticsearch读写数据工作原理 | MySQL的重复数据插入处理

Elasticsearch进阶篇 | 记kibana执行dsl脚本实战过程

分表分库 | 水平分割VS垂直分割

序列化 | Google的Gson与Alibaba的FastJson机制

最后修改时间:2022-12-09 21:01:25
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论