暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

java1.8项目纤程实战和性能压测

IT学习道场 2023-04-11
286

新建一个maven依赖项目

pom.xml中引入纤程jar的坐标依赖

    <dependency>
    <groupId>co.paralleluniverse</groupId>
    <artifactId>quasar-core</artifactId>
    <version>0.7.4</version>
    <classifier>jdk8</classifier>
    </dependency>
    复制

    纤程使用示例

      package com.example.demo.fiber;


      import co.paralleluniverse.fibers.Fiber;
      import co.paralleluniverse.fibers.futures.AsyncCompletionStage;
      import co.paralleluniverse.strands.Strand;
      import co.paralleluniverse.strands.SuspendableRunnable;
      import com.example.demo.fiber.tool.WorkTools;


      import java.util.concurrent.CountDownLatch;


      /**
      * 描述:纤程demo </br>
      * 作者:王林冲 </br>
      * 时间:2023/4/7 15:44
      */
      public class Test {
      public static void main(String[] args) throws InterruptedException {
      fiberTest();
      }


      public static void fiberTest() throws InterruptedException {
      new Fiber(() -> {
      Strand.sleep(5000);
      System.out.println("纤程开始执行了");
      }).start();
      System.out.println("主线程执行完毕");
      }


      }


      复制

      为了实现类似于线程池的功能,想在一个批量处理的过程中,开多个纤程处理,在统一获取结果,然后继续主线程执行,场景相当多

      自己实现个纤程池

        package com.example.demo.fiber.tool;


        import co.paralleluniverse.fibers.Fiber;
        import lombok.extern.slf4j.Slf4j;
        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.CountDownLatch;


        /**
        * 描述:协程工作程池 </br>
        * 作者:王林冲 </br>
        * 时间:2023/4/7 17:33
        */
        @Slf4j
        public class FiberWorkPool {
        /**
        * 工作协程数组
        */
        private List<Fiber> workThreads;
        /**
        * 协程任务倒计数门栓
        */
        private CountDownLatch countDownLatch = new CountDownLatch(0);




        /**
        * 建立协程池,taskCount 为协程池中工做协程的个数
        * @param taskCount
        */
        public FiberWorkPool(int taskCount) {
        workThreads = new ArrayList<>(taskCount);
        countDownLatch = new CountDownLatch(taskCount);
        }


        /**
        * 任务加入任务队列
        * @param task
        */
        public void execute(Fiber task) {
        try {
        workThreads.add(task); //阻塞接口的Fiber work插入
        } catch (Exception e) {
        log.error("========> Fiber work add failed ..., msg : {}", e.getMessage());
        }
        }


        //销毁协程池,该方法保证全部任务都完成的状况下才销毁全部协程,不然等待任务完成再销毁
        public void shutdown() throws InterruptedException {
        start();
        countDownLatch.await();
        workThreads.clear(); //清空等待队列
        log.debug("========> successfully closed FiberWorkPool ...");
        }


        /**
        * 启动协程池里所有的协程
        */
        public void start() {
        if (workThreads.size() != 0) {
        for (Fiber fiber : workThreads) {
        fiber.start();
        }
        }
        }


        /**
        * 获取倒计数门栓
        *
        * @return
        */
        public CountDownLatch getCountDownLatch() {
        return this.countDownLatch;
        }




        }


        复制

        纤程池使用demo

          package com.example.demo.fiber.tool;


          import co.paralleluniverse.fibers.Fiber;
          import com.google.common.collect.Lists;
          import org.springframework.stereotype.Component;


          import java.util.List;
          import java.util.concurrent.*;


          /**
          * 描述:协程池应用demo </br>
          * 作者:王林冲 </br>
          * 时间:2023/4/10 17:17
          */
          @Component
          public class FiberWorkPoolAppDemo {


          public void fiber() throws InterruptedException {
          //开启5个协程,50个任务列队。
          FiberWorkPool fiberWorkPool = new FiberWorkPool(50);
          for (int i = 0; i < 50; i++) {
          fiberWorkPool.execute(new Fiber(() -> {
          Fiber.sleep(50);
          //System.out.println("========= " + Fiber.currentFiber().getName() + " ============");
          fiberWorkPool.getCountDownLatch().countDown();
          }));
          }
          //等待协程任务完毕后再结束主线程
          fiberWorkPool.shutdown();
          }


          public void thread() throws ExecutionException, InterruptedException {
          List<Future<Void>> futures = Lists.newArrayList();
          ExecutorService executorService = Executors.newFixedThreadPool(50);
          for (int i = 0; i < 50; i++) {
          futures.add(executorService.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
          Thread.sleep(50);
          return null;
          }
          }));
          }
          for (Future<Void> future : futures) {
          future.get();
          }
          executorService.shutdownNow();
          }




          }


          复制

          测试controller

            package com.example.demo.fiber.tool;


            import org.springframework.beans.factory.annotation.Autowired;
            import org.springframework.web.bind.annotation.GetMapping;
            import org.springframework.web.bind.annotation.RequestMapping;
            import org.springframework.web.bind.annotation.RestController;


            import java.util.concurrent.ExecutionException;


            /**
            * 描述:协程controller </br>
            * 作者:王林冲 </br>
            * 时间:2023/4/10 17:20
            */
            @RestController
            @RequestMapping("/fiber")
            public class FiberTestController {


            @Autowired
            private FiberWorkPoolAppDemo fiberWorkPoolAppDemo;


            /**
            * 协程测试
            * @throws InterruptedException
            * @throws ExecutionException
            */
            @GetMapping("/fiberTest")
            public void fiberTest () throws InterruptedException, ExecutionException {
            Long start = System.currentTimeMillis();
            fiberWorkPoolAppDemo.fiber();
            System.out.println("=======> "+ (System.currentTimeMillis() - start) + "=======毫秒");
            }


            /**
            * 线程测试
            * @throws InterruptedException
            * @throws ExecutionException
            */
            @GetMapping("/threadTest")
            public void threadTest () throws InterruptedException, ExecutionException {
            Long start = System.currentTimeMillis();
            fiberWorkPoolAppDemo.thread();
            System.out.println("=======> "+ (System.currentTimeMillis() - start) + "=======毫秒");
            }


            }


            复制

            jmeter压测参数

            线程池压测

            纤程池压测

            差距一目了然,当你的线程池,异步出现性能问题时,请考虑纤程,让你的代码性能数量级的提升,线程池之所以慢,是因为大量的线程频繁的上下文切换,和线程此中任务争夺线程while循环,耗cpu那是相当多,纤程就避免了这个问题。所以性能刚杠杠的


            文章转载自IT学习道场,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

            评论