暂无图片
暂无图片
1
暂无图片
暂无图片
暂无图片

父子线程数据安全访问的解决方案和原理

原创 soul0202 2025-01-24
314

一、支持多线程并发保证数据安全访问途径

1.1、ThreadLocal

简称TL

在Java的多线程并发执行过程中,为保证多个线程对变量的安全访问,可以将变量放到ThreadLocal类型的对象中,它为每个线程提供了它自己的变量副本,使得线程间无法相互访问对方的变量,从而避免了线程间的竞争和数据泄露问题。适用于需要在线程内部存储和获取数据,且不希望与其他线程共享数据的场景。ThreadLocal也就不存在继承性

1.2、InheritableThreadLocal

简称ITL

它是ThreadLocal的一个扩展,支持父子线程之间的变量传递。InheritableThreadLocal通过在创建新线程时复制父线程中的变量值,使得子线程可以访问到父线程中通过InheritableThreadLocal存储的数据。这种机制使得InheritableThreadLocal在需要跨线程(特别是父子线程)传递数据时非常有用。然而,它有一个限制,即在使用预先创建的线程池时,InheritableThreadLocal可能无法正确获取父线程中的值,因为线程池中的线程可能并不是直接从父线程继承得到的‌‌。

1.3、TransmittableThreadLocal

简称TTL

‌ 是阿里巴巴团队提供的一个解决方案,专门解决InheritableThreadLocal在线程池环境中无法获取父级线程中变量的问题。TransmittableThreadLocal通过引入额外的机制,确保在线程池环境中也能正确地传递和获取变量,从而弥补了InheritableThreadLocal的不足。它通过引入依赖和特定的实现方式,使得即使在复杂的线程池环境中,也能保证线程局部变量的正确传递‌。

InheritableThreadLocal和TransmittableThreadLocal都是为了解决多线程环境下变量传递的问题,TransmittableThreadLocal特别是在使用线程池时,能够确保变量在线程间的正确传递。

二、 多线程测试情况

InheritableThreadLocal

@Async("asyncExecutor") 注解加value属性

Spring会使用指定名称的asyncExecutor bean来执行这个异步方法,这样可以更加精确地控制线程池的配置,线程数量、队列大小等。

但是在核心多线全部被使用过后,后需要的线程在使用了核心线程,就会复用之前的副本变量,导致数据错乱异常。

//一、多线程
@Bean("asyncExecutor")
public ThreadPoolTaskExecutor asyncExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);
    executor.setMaxPoolSize(10);
    executor.setQueueCapacity(500);
    executor.setThreadNamePrefix("MyAsyncExecutor-"); 
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.initialize();
    return executor;
}
@Async("asyncExecutor")
@Override
public void threadTest() {
    SessionInfo sessionInfo = XiaoniuThreadLocal.get();
    System.out.println(sessionInfo.getName());
}

//二、多线程
ExecutorService executorService = Executors.newFixedThreadPool(5);
@Resource(name = "asyncExecutor")
ThreadPoolTaskExecutor executorService;


@Async注解不加 value属性

Spring会使用默认的异步执行机制。意味着Spring将查找一个名为applicationTaskExecutor的bean作为TaskExecutor来执行异步任务。如果没有找到这样的bean,它将会自动创建一个默认的SimpleAsyncTaskExecutor实例来执行异步方法。默认的SimpleAsyncTaskExecutor通常会为每个任务创建一个新的线程,因此线程的数量理论上是无限制的,但实际上受到JVM的最大线程数限制。

默认线程池的大小 默认情况下,在使用SimpleAsyncTaskExecutor,它不会预先创建线程,而是在需要时为每个任务创建新的线程。因此,在这种情况下,线程池大小是无上限的。 如果使用的是ThreadPoolTaskExecutor,它可以配置核心池大小、最大池大小以及其他线程池参数。如果没有特别配置,核心线程数通常是Runtime.getRuntime().availableProcessors()的结果,当前系统可用处理器的数量,而最大线程数通常是无限的。

三、现状

现在的tms-service多线程上线文传递实现方式

public class XiaoniuTreadLocal {
    private static InheritableThreadLocal<String> LOCAL = new InheritableThreadLocal();

    private XiaoniuTreadLocal() {
    }

    public static void set(String name) {
        LOCAL.set(name);
    }
    public static String get() {
        return LOCAL.get();
    }

    public static void remove() {
        LOCAL.remove();
    }
}

InheritableThreadLocal类是ThreadLocal类的子类,重写了chidValue、getMap、createMap三个方法,

其中createMap方法在被调用的时候创建的是inheritableThreadLocals变量值(ThreadLocal类中创建的是threadLocals变量的值),getMap方法在被get或set方法调用的时候返回的也是线程的inheritableThreadLocals变量。

如果是否初始化线程的inheritThreadLocals变量为true并且父线程的inheritThreadLocals变量不为null

设置子线程的inheritThreadLocals变量为父线程的inheritThreadLocals变量

InheritableThreadLocal类通过重写ThreadLocal类中的get、set方法。调用方法时会对线程的inheritableThreadLocals变量进行初始化,在对子线程进行初始化的时候,会将父线程的inheritableThreadLocals变量值赋值到子线程的inheritableThreadLocals变量中,这样就实现了子线程继承父线程问题。

四、解决方案

4.1、每次都创建一个新的线程

// 使用注解创建一个异步线程

@Async

//2 创建一个新的线程

new Thread(() -> {

// .......

}).start();

缺点: 高资源消耗,频繁创建线程,内存维护 上下文切换 并发控制困难,可能导致资源不足 可维护性差

4.2、实现多线程上下文准确传递

方案一:使用多线程装饰器,在装饰器中复制上下文,保证线程中的副本准确

@Configuration
public class AsyncConfiguration {

    @Bean("asyncExecutor")
    public ThreadPoolTaskExecutor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("MyAsyncExecutor-");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        executor.setTaskDecorator(new MyDecorator());
        return executor;
    }
    /**
     * 线程装饰器
     */
    static class MyDecorator implements TaskDecorator {
        @NotNull
        @Override
        public Runnable decorate(@NotNull Runnable runnable) {
            // 获取主线程请求信息
            RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
            SessionInfo sessionInfo = Optional.ofNullable(XiaoniuThreadLocal.get()).orElse(new SessionInfo());
            return () -> {
                try {
                    XiaoniuThreadLocal.set(sessionInfo);
                    runnable.run();
                } finally {
                    XiaoniuThreadLocal.remove();
                }
            };
        }
    }
}

@Resource(name = "asyncExecutor")
ThreadPoolTaskExecutor executorService;

方案二:使用 TransmittableThreadLocal

public class XtmThreadLocal {
    private static TransmittableThreadLocal<String> LOCAL = new TransmittableThreadLocal();

    private XtmThreadLocal() {
    }

    public static void set(String name) {
        LOCAL.set(name);
    }
    public static String get() {
        return LOCAL.get();
    }

    public static void remove() {
        LOCAL.remove();
    }
}

// 线程池1
ExecutorService executorService = TtlExecutors.getTtlExecutorService(Executors.newFixedThreadPool(10));

// 线程池2
@Bean("asyncExecutor")
public ExecutorService asyncExecutor() {
    ThreadFactory threadFactory = new ThreadFactory() {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "MyAsyncExecutor-" + threadNumber.getAndIncrement());
            return t;
        }
    };
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            20,
            20,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500),
            threadFactory,
            new ThreadPoolExecutor.CallerRunsPolicy()
    );
    // 使用TTL包装线程池
    return TtlExecutors.getTtlExecutorService(executor);
}
//线程池3
@Bean("asyncExecutor")
public Executor asyncExecutor() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(5);
    threadPoolTaskExecutor.setMaxPoolSize(5);
    threadPoolTaskExecutor.setKeepAliveSeconds(60);
    threadPoolTaskExecutor.setQueueCapacity(200);
    threadPoolTaskExecutor.setThreadNamePrefix("MyAsyncExecutor-0");
    threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
    threadPoolTaskExecutor.initialize();
    return TtlExecutors.getTtlExecutor(threadPoolTaskExecutor);
}

4.3、TransmittableThreadLocal 组件介绍

4.3.1、介绍

ThreadLocal和InheritableThreadLocal 能够完成变量的线程本地化和父子线程中的value传递。

TTL组件功能:在使用线程池时会池化复用线程的执行组件情况下,提供ThreadLocal值的传递功能,解决异步执行时上下文传递的问题。

业务期望:上下文生命周期的操作从业务逻辑中分离出来。业务逻辑不涉及生命周期,就不会有业务代码如疏忽清理而引发的问题了。

4.3.2、实现原理

  • TransmittableThreadLocal 继承于 InheritableThreadLocal,并拥有了 InheritableThreadLocal 对子线程传递上下文的特性;
  • TransmittableThreadLocal中重要属性holder;主要用于存储当前线程的数据以及传递给子线程的数据。holder 的作用:
    • 存储当前线程的数据:每个线程都有一个对应的 holder 对象,用来存储该线程的本地数据。
    • 传递给子线程的数据:当创建新线程时,holder 负责将当前线程的数据传递给子线程,确保子线程可以继承父线程的数据。
  • initialValue方法是InheritableThreadLocal 创建时调用,默认创建一个 WeakHashMap。(WeakHashMap是弱引用,也就是GC会主动的帮你把内存回收掉,但是当弱引用指定的对象有强引用指向时GC则不会回收对象)
    • initialValue 创建了WeakHashMap这个对象用于存储了线程与它们对应的值之间的映射关系。默认initialValue是 null,但可以被重写以提供具体的初始值逻辑。
  • childValue 方法用于当一个新线程作为当前线程的子线程被创建时,决定子线程继承哪些线程局部变量的值。
    • 在创建子线程时,Thread 类的 init 方法会调用 ThreadLocal.createInheritedMap 方法来创建一个 ThreadLocalMap,其中包含了所有可继承的 InheritableThreadLocal 的实例及其对应的值。
    • createInheritedMap 方法内部会遍历父线程的 inheritableThreadLocals 集合,并且对每个 InheritableThreadLocal 调用其 childValue 方法来获取子线程应该继承的具体值。
  • 从父线程获取可继承的ThreadLocal变量

  • TransmittableThreadLocal重载了get和set方法,新增了addThisToHolder和removeThisFromHolder的逻辑;会把当前这个threadlocal缓存到holder。

  • TtlRunnable
    • Transmitter.capture():用于捕获父线程的ttl,会将当父线程上下文保存起来;
    • capturedRef.get():TtlRunnable 会从 AtomicReference 中获取出调用线程中所有的上下文。
    • Transmitter.replay(captured):用来将父线程的TTL循环复制进当前子线程,返回的backup是此子线程原来就有的本地变量值, backup用于恢复数据(如果任务执行完毕,意味着该子线程会归还线程池,那么需要将其原生本地变量属性恢复)
    • restore方法用来恢复原有值的


「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论