暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

【Netty】Springboot整合Netty

爱编码 2021-07-20
1991

小知识:21天效应

在行为心理学中,人们把一个人的新习惯或新理念的形成并得以巩固至少需要21天的现象,称之为21天效应。也就是说,一个人的动作或想法,如果重复21天就会变成一个习惯性的动作或想法。

图片来源:https://www.foodiesfeed.com

步骤

1 先写好基本的Netty客户端和Netty服务的代码。参考文章【netty初识】

2.搭建好基本的Springboot项目。

3.将Netty服务端代码的启动代码和关闭代码分离,服务端加上@Component注解,交由Spring管理实例。

4.Springboot启动时,将Netty服务给启动;同时Springboot停止时,将Netty服务销毁。

实现

Netty服务端

主要工作:将Netty服务端代码的启动代码和关闭代码分离,服务端加上@Component注解,交由Spring管理实例

  1. /**

  2. * 服务端

  3. * 1.创建一个ServerBootstrap的实例引导和绑定服务器。

  4. * 2.创建并分配一个NioEventLoopGroup实例以进行事件的处理,比如接受连接以及读写数据。

  5. * 3.指定服务器绑定的本地的InetSocketAddress。

  6. * 4.使用一个EchoServerHandler的实例初始化每一个新的Channel。

  7. * 5.调用ServerBootstrap.bind()方法以绑定服务器。

  8. */

  9. @Slf4j

  10. @Component

  11. public class EchoServer {


  12. /**

  13. * NioEventLoop并不是一个纯粹的I/O线程,它除了负责I/O的读写之外

  14. * 创建了两个NioEventLoopGroup,

  15. * 它们实际是两个独立的Reactor线程池。

  16. * 一个用于接收客户端的TCP连接,

  17. * 另一个用于处理I/O相关的读写操作,或者执行系统Task、定时任务Task等。

  18. */

  19. private final EventLoopGroup bossGroup = new NioEventLoopGroup();

  20. private final EventLoopGroup workerGroup = new NioEventLoopGroup();

  21. private Channel channel;

  22. /**

  23. * 启动服务

  24. * @param hostname

  25. * @param port

  26. * @return

  27. * @throws Exception

  28. */

  29. public ChannelFuture start(String hostname,int port) throws Exception {


  30. final EchoServerHandler serverHandler = new EchoServerHandler();

  31. ChannelFuture f = null;

  32. try {

  33. //ServerBootstrap负责初始化netty服务器,并且开始监听端口的socket请求

  34. ServerBootstrap b = new ServerBootstrap();

  35. b.group(bossGroup, workerGroup)

  36. .channel(NioServerSocketChannel.class)

  37. .localAddress(new InetSocketAddress(hostname,port))

  38. .childHandler(new ChannelInitializer<SocketChannel>() {

  39. @Override

  40. protected void initChannel(SocketChannel socketChannel) throws Exception {

  41. // 为监听客户端read/write事件的Channel添加用户自定义的ChannelHandler

  42. socketChannel.pipeline().addLast(serverHandler);

  43. }

  44. });


  45. f = b.bind().sync();

  46. channel = f.channel();

  47. log.info("======EchoServer启动成功!!!=========");

  48. } catch (Exception e) {

  49. e.printStackTrace();

  50. } finally {

  51. if (f != null && f.isSuccess()) {

  52. log.info("Netty server listening " + hostname + " on port " + port + " and ready for connections...");

  53. } else {

  54. log.error("Netty server start up Error!");

  55. }

  56. }

  57. return f;

  58. }


  59. /**

  60. * 停止服务

  61. */

  62. public void destroy() {

  63. log.info("Shutdown Netty Server...");

  64. if(channel != null) { channel.close();}

  65. workerGroup.shutdownGracefully();

  66. bossGroup.shutdownGracefully();

  67. log.info("Shutdown Netty Server Success!");

  68. }

  69. }

服务端业务处理handler

服务端的生命周期以及接收客户端的信息收发和处理。这里不建议使用阻塞的操作,容易影响netty的性能。

  1. /***

  2. * 服务端自定义业务处理handler

  3. */

  4. public class EchoServerHandler extends ChannelInboundHandlerAdapter {


  5. /**

  6. * 对每一个传入的消息都要调用;

  7. * @param ctx

  8. * @param msg

  9. * @throws Exception

  10. */

  11. @Override

  12. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {


  13. ByteBuf in = (ByteBuf) msg;

  14. System.out.println("server received: "+in.toString(CharsetUtil.UTF_8));


  15. ctx.write(in);

  16. }



  17. /**

  18. * 通知ChannelInboundHandler最后一次对channelRead()的调用时当前批量读取中的的最后一条消息。

  19. * @param ctx

  20. * @throws Exception

  21. */

  22. @Override

  23. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

  24. ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);

  25. }


  26. /**

  27. * 在读取操作期间,有异常抛出时会调用。

  28. * @param ctx

  29. * @param cause

  30. * @throws Exception

  31. */

  32. @Override

  33. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

  34. cause.printStackTrace();

  35. ctx.close();

  36. }

  37. }

Springboot启动服务端代码

CommandLineRunner #run()

这里主要是通过CommandLineRunner 接口的run方法,实现在项目启动后执行的功能,SpringBoot提供的一种简单的实现方案就是添加一个model并实现CommandLineRunner接口,实现功能的代码放在实现的run方法中。

addShutdownHook()

Runtime.getRuntime().addShutdownHook(shutdownHook); 这个方法的意思就是在jvm中增加一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的所有通过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭。所以这些钩子可以在jvm关闭的时候进行内存清理、对象销毁等操作。

详细代码如下:

  1. @SpringBootApplication

  2. public class SpringNettyApplication implements CommandLineRunner {


  3. @Value("${netty.port}")

  4. private int port;


  5. @Value("${netty.url}")

  6. private String url;


  7. @Autowired

  8. private EchoServer echoServer;


  9. public static void main(String[] args) {

  10. SpringApplication.run(SpringNettyApplication.class, args);

  11. }


  12. @Override

  13. public void run(String... args) throws Exception {

  14. ChannelFuture future = echoServer.start(url,port);

  15. Runtime.getRuntime().addShutdownHook(new Thread(){

  16. @Override

  17. public void run() {

  18. echoServer.destroy();

  19. }

  20. });

  21. //服务端管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程

  22. future.channel().closeFuture().syncUninterruptibly();

  23. }

  24. }

Netty客户端

它在本文中的作用主要是为了测试服务端是否正常运行,通过客户端连接Netty的服务端,看到消息是否正常通信。

  1. /**

  2. * 客户端

  3. * 1.为初始化客户端,创建一个Bootstrap实例

  4. * 2.为进行事件处理分配了一个NioEventLoopGroup实例,其中事件处理包括创建新的连接以及处理入站和出站数据;

  5. * 3.当连接被建立时,一个EchoClientHandler实例会被安装到(该Channel的一个ChannelPipeline中;

  6. * 4.在一切都设置完成后,调用Bootstrap.connect()方法连接到远程节点。

  7. */

  8. public class EchoClient {


  9. private final String host;

  10. private final int port;



  11. public EchoClient(String host, int port) {

  12. this.host = host;

  13. this.port = port;

  14. }



  15. /**

  16. * 运行流程:

  17. * @param args

  18. * @throws Exception

  19. */

  20. public static void main(String[] args) throws Exception {

  21. new EchoClient("127.0.0.1",10010).start();

  22. }


  23. private void start() throws Exception {


  24. /**

  25. * Netty用于接收客户端请求的线程池职责如下。

  26. * (1)接收客户端TCP连接,初始化Channel参数;

  27. * (2)将链路状态变更事件通知给ChannelPipeline

  28. */

  29. EventLoopGroup group = new NioEventLoopGroup();

  30. try {

  31. Bootstrap b = new Bootstrap();

  32. b.group(group)

  33. .channel(NioSocketChannel.class)

  34. .remoteAddress(new InetSocketAddress(host,port))

  35. .handler(new ChannelInitializer<SocketChannel>() {

  36. @Override

  37. protected void initChannel(SocketChannel socketChannel) throws Exception {

  38. socketChannel.pipeline().addLast(new EchoClientHandler());

  39. }

  40. });

  41. //绑定端口

  42. ChannelFuture f = b.connect().sync();


  43. f.channel().closeFuture().sync();

  44. } catch (Exception e) {

  45. group.shutdownGracefully().sync();

  46. }

  47. }

  48. }

Netty客户端业务处理类

主要是监控Netty客户端的生命周期以及接收服务端的消息,往服务端发送消息等。

  1. /**

  2. * 客户端处理类

  3. */

  4. public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {


  5. /**

  6. * 在到服务器的连接已经建立之后将被调用

  7. * @param ctx

  8. * @throws Exception

  9. */

  10. @Override

  11. public void channelActive(ChannelHandlerContext ctx) throws Exception {

  12. ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks !", CharsetUtil.UTF_8));

  13. }


  14. /**

  15. * 当从服务器接收到一个消息时被调用

  16. * @param channelHandlerContext

  17. * @param byteBuf

  18. * @throws Exception

  19. */

  20. @Override

  21. protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {

  22. System.out.println("Client received: "+ byteBuf.toString(CharsetUtil.UTF_8));

  23. }


  24. /**

  25. * 在处理过程中引发异常时被调用

  26. * @param ctx

  27. * @param cause

  28. * @throws Exception

  29. */

  30. @Override

  31. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

  32. cause.printStackTrace();

  33. ctx.close();

  34. }

  35. }

总结

从整体来看,只不过是将Netty服务端从main函数启动方式改为交给Spring来管理启动和销毁的工作。

也就说以后你有个什么代码要交给Spring管理的也是可以这样子处理。

最后

如果对 Java、大数据感兴趣请长按二维码关注一波,我会努力带给你们价值。觉得对你哪怕有一丁点帮助的请帮忙点个赞或者转发哦。关注公众号【爱编码】,小编会一直更新文章的哦。


文章转载自爱编码,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论