在现代多线程和异步编程中,CancellationToken是一个常见的工具,用于及时、安全地取消长时间运行的任务或操作。然而,在处理此类对象时,如果不慎操作,也可能导致悬空指针这一严重问题,进而引发程序崩溃或者难以预测的行为。本文将通过分析一个Rust环境下的实际案例,探讨由使用 tokio_util::sync::CancellationToken 所引起的悬空指针问题。
CancellationToken:程序员必读的悬空指针灾难案例
案情重现与分析
在多线程异步任务程序开发过程中,我们遭遇了一场由于 CancellationToken 的使用而导致的悬空指针问题。该问题的核心在于,当一个线程持有指向某个正在执行任务对象的指针,并通过CancellationToken请求取消该任务时,如果取消动作触发了对象的提前析构,而其他线程仍试图访问这个已被析构的对象,则会产生悬空指针异常。
use std::io::{Error, ErrorKind, Result}; use std::os::fd::AsRawFd; use std::path::PathBuf; use std::sync::Arc; use tokio_util::sync::CancellationToken; mod memory; pub(crate) async fn asyncify<F, T>(f: F) -> Result<T> where F: FnOnce() -> Result<T> + Send + 'static, T: Send + 'static, { match tokio::task::spawn_blocking(f).await { Ok(res) => res, Err(e) => Err(Error::new( ErrorKind::Other, format!("background task failed: {:?}", e), )), } } fn check_err_size(e: libc::ssize_t) -> Result<usize> { if e == -1_isize { Err(Error::last_os_error()) } else { Ok(e as usize) } } pub fn libc_pread(raw_fd: usize, pos: u64, len: usize, ptr: u64) -> Result<usize> { check_err_size(unsafe { libc::pread( raw_fd as std::os::fd::RawFd, ptr as *mut _, len as _, pos as libc::off_t, ) }) } async fn read_file(file: Arc<std::fs::File>, pos: u64, data: &mut [u8]) -> Result<usize> { let len = data.len(); let ptr = data.as_ptr() as u64; let fd = file.as_ref().as_raw_fd() as usize; let len = asyncify(move || libc_pread(fd, pos, len, ptr)).await?; Ok(len) // libc_pread(fd, pos, len, ptr) } async fn read_data_from_file(file: Arc<std::fs::File>) -> usize { let mut buf: Vec<u8> = vec![0_u8; 2*1024*1024*1024]; println!("----- * before read"); let len = read_file(file, 0, &mut buf).await.unwrap(); println!("----- * after read"); len } async fn test_read() { let cancel: CancellationToken = CancellationToken::new(); let file_path = PathBuf::from("./a_big_big_file"); let file = Arc::new(std::fs::File::open(file_path).unwrap()); println!("----- begin test ------------"); let can_tok = cancel.clone(); tokio::spawn(async move { loop { tokio::select! { _ = can_tok.cancelled() => { println!("----- cancelled break loop"); break; } res = read_data_from_file(file.clone()) => { println!("----- read data len: {}",res); } } } }); tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; cancel.cancel(); println!(" ----- cancel.cancel()"); tokio::time::sleep(tokio::time::Duration::from_millis(5 * 1000)).await; println!("----- test over"); } // #[cfg(unix)] // #[global_allocator] // static A: memory::DebugMemoryAlloc = memory::DebugMemoryAlloc; fn main() { let rt = tokio::runtime::Builder::new_multi_thread() .enable_all() .worker_threads(4) .thread_stack_size(4 * 1024 * 1024) .build() .unwrap(); rt.block_on(async move { test_read().await; }); std::thread::sleep(std::time::Duration::from_secs(10000)); }
复制
在上述例子程序中启动一个异步任务读取文件,然后在主线程中等待100ms取消任务。请注意读取文件的方式采用:tokio::task::spawn_blocking。同时准备一个超过2G大小的文件,确保100ms内读取不会完成。
- 运行程序,在函数read_data_from_file()中会发生如下现象
async fn read_data_from_file(file: Arc<std::fs::File>) -> usize { let mut buf: Vec<u8> = vec![0_u8; 2*1024*1024*1024]; println!("----- * before read"); // 会执行输出 let len = read_file(file, 0, &mut buf).await.unwrap(); println!("----- * after read"); // 不会执行输出 len }
复制
这说明异步任务被取消后,并没有等待read_file()执行完成后再取消,而是直接打断了read_data_from_file函数的执行。聪明的你有没有发现这将导致函数中局部变量buf的提前释放,但后面的异步读取文件还会继续使用这块内存,造成悬空指针问题。通过用文章后面附录提供的内存分配、释放跟踪可以证明这一点:异步任务被取消后局部内存立马释放。
- 我们可以尝试用阻塞式读取文件,而不是用tokio::task::spawn_blocking;也就是把读取相关代码改成如下方式,观察会发生什么现象。
async fn read_file(file: Arc<std::fs::File>, pos: u64, data: &mut [u8]) -> Result<usize> { let len = data.len(); let ptr = data.as_ptr() as u64; let fd = file.as_ref().as_raw_fd() as usize; // let len = asyncify(move || libc_pread(fd, pos, len, ptr)).await?; // Ok(len) libc_pread(fd, pos, len, ptr) }
复制
采用此种方式异步任务被取消后,会等待read_file()执行完成,不会造成悬空指针问题。因为在一个阻塞方法中,只有执行完成后异步任务才会有机会被取消。
重返现场
在CnosDB长期稳定性测试环境中我们会经常利用coredump文件排查故障。分析coredump文件发现每次挂掉的位置都不同,而且还经常挂掉在不同的第三方库,更甚至是一些不可能发生问题的代码处。我们也是怀疑内存写坏导致的,但苦于找不到具体原因,我们同时通过以下方式进行了排查:
- 分析近期的代码变更、并测试可疑的提交
- 排查可疑的第三方库
- 不同的编译、打包环境尝试
- 使用Valgrind进行测试排查
- 针对不同程序模块进行排查测试
- 添加调试、测试代码等
- 通过asan进行测试排查
最终,我们通过asan发现如下现象,说明存在内存释放后继续使用的问题。
然后,再通过addr2line找到相应调用栈定位到错误使用内存之处,也就是下面函数中的data参数被释放了但是继续使用。
虽然,通过工具发现了内存释放后继续使用的问题;但是,一时并没明白这块内存到底是怎么被释放的;最终,把怀疑的目光放到了CancellationToken这一块,然后通过测试确认了这一点。下面是具体使用之处,series_iter_closer是一个CancellationToken类型的变量,iter.next()会最终调用上面的pread()方法。
至此,由于悬空指针导致的惨案已经分析完毕。
解决方案
我们此处短期的解决方案是修改pread函数为直接调用os::pread(fd, pos, len, ptr)变为一个阻塞调用;长期会修改pread函数改为内部分配buffer的方式,不再通过外部传入参数来避免悬空指针问题。
附录
自定义内存分配释放器
extern crate core; use core::alloc::{GlobalAlloc, Layout}; use libc::{c_int, c_void}; use tikv_jemalloc_sys as ffi; #[cfg(all(any( target_arch = "arm", target_arch = "mips", target_arch = "mipsel", target_arch = "powerpc" )))] const ALIGNOF_MAX_ALIGN_T: usize = 8; #[cfg(all(any( target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64", target_arch = "powerpc64le", target_arch = "mips64", target_arch = "riscv64", target_arch = "s390x", target_arch = "sparc64" )))] const ALIGNOF_MAX_ALIGN_T: usize = 16; fn layout_to_flags(align: usize, size: usize) -> c_int { if align <= ALIGNOF_MAX_ALIGN_T && align <= size { 0 } else { ffi::MALLOCX_ALIGN(align) } } // Assumes a condition that always must hold. macro_rules! assume { ($e:expr) => { debug_assert!($e); if !($e) { core::hint::unreachable_unchecked(); } }; } #[derive(Copy, Clone, Default, Debug)] pub struct DebugMemoryAlloc; unsafe impl GlobalAlloc for DebugMemoryAlloc { #[inline] unsafe fn alloc(&self, layout: Layout) -> *mut u8 { assume!(layout.size() != 0); let flags = layout_to_flags(layout.align(), layout.size()); let ptr = if flags == 0 { ffi::malloc(layout.size()) } else { ffi::mallocx(layout.size(), flags) }; ptr as *mut u8 } #[inline] unsafe fn alloc_zeroed(&self, layout: Layout) -> *mut u8 { assume!(layout.size() != 0); let flags = layout_to_flags(layout.align(), layout.size()); let ptr = if flags == 0 { ffi::calloc(1, layout.size()) } else { ffi::mallocx(layout.size(), flags | ffi::MALLOCX_ZERO) }; ptr as *mut u8 } #[inline] unsafe fn realloc(&self, ptr: *mut u8, layout: Layout, new_size: usize) -> *mut u8 { let new_layout = unsafe { Layout::from_size_align_unchecked(new_size, layout.align()) }; let new_ptr = unsafe { self.alloc(new_layout) }; unsafe { let size = std::cmp::min(layout.size(), new_size); std::ptr::copy_nonoverlapping(ptr, new_ptr, size); self.dealloc(ptr, layout); } new_ptr } #[inline] unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) { assume!(!ptr.is_null()); assume!(layout.size() != 0); let flags = layout_to_flags(layout.align(), layout.size()); ffi::sdallocx(ptr as *mut c_void, layout.size(), flags); if layout.size() >= 2*1024*1024*1024 { panic!("-------- free big memory: {}", layout.size()); } } }
复制
作者简介:
“Hi,我是允哥,一个内向且不善言谈的大厂老鸟,喜欢默默地在代码丛林中捉虫(Bug),目前是CnosDB的一名工程师。”