
本篇中所有代码分析,都是基于Jewel 10.2.9版本。本篇都是个人理解,其中有些理解或者解释有不合理的,还请指正。
在Ceph的使用中,运行时调整参数值是个高频的操作,使用起来简单方便,最重要的是不用重启服务即可生效。
如何使用
Ceph动态调整参数有两种方式:
第一种:
1. ceph daemon <mon/osd/mds>.<id> config set <参数名> <参数值> 比如,设置OSD 1的heartbeat超时时间: ceph daemon osd.1 config set osd_heartbeat_grace 60
第二种:
2. ceph tell <mon/osd/mds>.<id> injectargs '--<参数名> <参数值>' 设置OSD 1的heartbeat超时时间: ceph tell osd.1 injectargs '--osd_heartbeat_grace 60'
第二种还有两个比较好用的地方:
1. 单条命令可以改变所有的实例的某个参数值:
ceph tell
设置所有OSD的heartbeat超时时间:
ceph tell osd.* injectargs ‘—osd_heartbeat_grace 60’
2. 单条命令可以改变多个参数:
ceph tell
设置OSD 1的heartbeat超时时间,及发起heartbeat的时间间隔
ceph tell osd.1 injectargs ‘—osd_heartbeat_grace 60 —osd_heartbeat_interval 10’
3. 当然,上面两个可以结合使用:
ceph tell
设置所有OSD的heartbeat超时时间,及发起heartbeat的时间间隔
ceph tell osd.* injectargs ‘—osd_heartbeat_grace 60 —osd_heartbeat_interval 10’
源码分析
那么Ceph内部是如何实现上面提到的动态更新呢?我们来深入代码中一探究竟。
tell方式的实现
我们先看tell…injectargs方式的实现:
ceph命令行的输入,源码入口都是ceph.in文件,是python文件,也就是/usr/bin/下的可执行文件ceph。
ceph.in中有关tell的代码:
def main(): ... // 如果命令行中有'injectargs'字串,就进行切分成两个:'injectargs'之前的部分是childargs,之后的部分是injectargs。 if 'injectargs' in childargs: position = childargs.index('injectargs') injectargs = childargs[position:] childargs = childargs[:position] if verbose: print('Separate childargs {0} from injectargs {1}'.format(childargs, injectargs), file=sys.stderr) else: injectargs = None ... if injectargs and '--' in injectargs injectargs.remove('--') ... // 如果childargs有'tell'命令,或者前面已经确定有injectargs,就将childargs重新赋值,也就是说childargs是真正要去发送给server端的命令。 // 在这里,和tell就没关系了,tell关键字并不会发到后端。 is_tell = False if len(childargs) and childargs[0] == 'tell': childargs = childargs[2:] is_tell = True if is_tell: if injectargs: childargs = injectargs if not len(childargs): print('"{0} tell" requires additional arguments.'.format(sys.argv[0]), 'Try "{0} tell <name> <command> [options...]" instead.'.format(sys.argv[0]), file=sys.stderr) return errno.EINVAL ... // 每个命令执行,都有目标端,如果命令行指明了目标server,就向具体的server发;如果是'*',会先把所有的instance id拿到,然后在后面的for循环中,逐个去发送命令。 if target[1] == '*': if target[0] == 'osd': targets = [(target[0], o) for o in osdids()] elif target[0] == 'mon': targets = [(target[0], m) for m in monids()] ... for target in targets: ...
从上面的代码,可知,tell命令,并没有走daemon的admin socket来进行通信,而是走正常的client->server的通信,而且,’*’也没有那么高效,就是拿到所有的id,然后逐个发送消息。
然后再经过librados、osdc等模块,将消息发送给具体的Monitor、OSD、MDS。
我们接下来直接看OSD端收到injectargs命令后,如何处理。
在OSD.cc中,do_command函数就是专门处理各种命令的,我们直接看injectargs分支:
void OSD::do_command(Connection *con, ceph_tid_t tid, vector<string>& cmd, bufferlist& data) { ... else if (prefix == "injectargs") { vector<string> argsvec; cmd_getval(cct, cmdmap, "injected_args", argsvec); if (argsvec.empty()) { r = -EINVAL; ss << "ignoring empty injectargs"; goto out; } string args = argsvec.front(); // 获取所有的要进行更新的参数 for (vector<string>::iterator a = ++argsvec.begin(); a != argsvec.end(); ++a) args += " " + *a; osd_lock.Unlock(); cct->_conf->injectargs(args, &ss); osd_lock.Lock(); } ... }
拿到client想要更新的所有参数,然后调用了injectargs,每个OSD都有一个CephContext类变量cct,md_config_t类型的_conf变量也是CephContext类的成员。
int md_config_t::injectargs(const std::string& s, std::ostream *oss) { ... // 解析参数 ret = parse_injectargs(nargs, oss); ... // 生效更新的参数 _apply_changes(oss); } int md_config_t::parse_injectargs(std::vector<const char*>& args, std::ostream *oss) { ... for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) { // 逐个解析参数,并更新参数。 int r = parse_option(args, i, oss); if (r < 0) ret = r; } ... } int md_config_t::parse_option(std::vector<const char*>& args, std::vector<const char*>::iterator& i, ostream *oss) { ... // 检查更新的参数中,是否有子系统(subsystem)的log级别参数,也就是以debug_开头的日志级别参数。 // 但是下面的这种遍历,似乎很低效啊,如果根本就没有debug_类参数,还是会检查一遍。 // 再看了一下L版的代码,已经修改了,在parse_option中没有这种遍历式检查subsystem,而是直接在md_config_t类的构造函数中把所有参数vector抓换为map,这样在运行时,能更高效的找到某个参数。 // subsystems? for (o = 0; o < subsys.get_num(); o++) { std::string as_option("--"); as_option += "debug_"; as_option += subsys.get_name(o); if (ceph_argparse_witharg(args, i, &val, as_option.c_str(), (char*)NULL)) { int log, gather; int r = sscanf(val.c_str(), "%d/%d", &log, &gather); ... // ceph的日志级别参数调整,直接就通过下面的两个函数调整了,和其他参数的处理是有区别的。 subsys.set_log_level(o, log); subsys.set_gather_level(o, gather); ... } } // 对于非log level参数,区分了Bool型和其他类型,然后调用的set_val_impl进行设置 for (o = 0; o < NUM_CONFIG_OPTIONS; ++o) { ostringstream err; const config_option *opt = config_optionsp + o; std::string as_option("--"); as_option += opt->name; if (opt->type == OPT_BOOL) { if (ceph_argparse_binary_flag(args, i, &res, oss, as_option.c_str(), (char*)NULL)) { if (res == 0) set_val_impl("false", opt); else if (res == 1) set_val_impl("true", opt); else ret = res; ... } else if (ceph_argparse_witharg(args, i, &val, err, as_option.c_str(), (char*)NULL)) { int res = set_val_impl(val.c_str(), opt); } ... } int md_config_t::set_val_impl(const char *val, const config_option *opt) { assert(lock.is_locked()); // 先设置参数值,具体看下面的函数原型 int ret = set_val_raw(val, opt); if (ret) return ret; // 参数通过上面的调用实现了,但是参数并不一定生效,对于大部分参数,已经在进程启动的时候加载了,所以现在更重要的是将这些参数生效。 // changed是一个set类型变量,主要记录的是从上一次apply_changes后改变的参数。 // 在injectargs函数中,parse_injectargs解析之后,调用了_apply_changes函数,函数的参数就是changed变量: changed.insert(opt->name); return 0; } // 设置参数值,根据参数类型来设置,做一些强制类型转换。 int md_config_t::set_val_raw(const char *val, const config_option *opt) { switch (opt->type) { case OPT_INT: { *(int*)opt->conf_ptr(this) = f; } case OPT_STR: *(std::string*)opt->conf_ptr(this) = val ? val : ""; ... } }
上面代码完成了参数的更新,主要有两点:
debug_类的参数(日志级别参数)单独走的一套,和其他参数不是同一种处理。
其他参数,先更新内存中的参数值;然后更重要的是:需要使其生效,即能够让系统真实的使用这些参数了。
后续的apply_changes是如何实现的?我们先暂放一下,先来看看daemon方式修改参数,因为这两种最终都会通过_apply_changes来使参数生效。
daemon方式的实现
我们继续看看daemon…config set方式的实现:
ceph.in中有关daemon的代码:
def main(): ... // 如果命令中有'daemon'字符串,先尝试看有没有socket path,如果没有就从配置文件中去找具体的instance的admin_socket参数值,也就是socket文件路径 sockpath = None if parsed_args.admin_socket: sockpath = parsed_args.admin_socket elif len(childargs) > 0 and childargs[0] in ["daemon", "daemonperf"]: ... if childargs[1].find('/') >= 0: sockpath = childargs[1] else: # try resolve daemon name try: sockpath = ceph_conf(parsed_args, 'admin_socket', childargs[1]) ... # for both: childargs = childargs[2:] ... if sockpath and daemon_perf: ... elif sockpath: try: // 尝试连接admin_socket,并发送命令 print(admin_socket(sockpath, childargs, format)) ... } def admin_socket(asok_path, cmd, format=''): def do_sockio(path, cmd_bytes): """ helper: do all the actual low-level stream I/O """ // 创建socket,并连接,然后发送相关命令 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(path) try: sock.sendall(cmd_bytes + '\0') len_str = sock.recv(4) if len(len_str) < 4: raise RuntimeError("no data returned from admin socket") l, = struct.unpack(">I", len_str) sock_ret = '' got = 0 while got < l: bit = sock.recv(l - got) sock_ret += bit got += len(bit) except Exception as sock_e: raise RuntimeError('exception: ' + str(sock_e)) return sock_ret ... try: ret = do_sockio(asok_path, json.dumps(valid_dict)) except Exception as e: raise RuntimeError('exception: ' + str(e)) return ret
Admin socket是什么呢?其实就是UNIX Domain Socket,是在socket架构上发展出的一种IPC机制,虽然网络socket也可用于同一台主机的进程间通讯(通过loopback地址127.0.0.1),但是UNIX Domain Socket用于IPC更有效率:不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程。这是因为,IPC机制本质上是可靠的通讯,而网络协议是为不可靠的通讯设计的。UNIX Domain Socket也提供面向流和面向数据包两种API接口,类似于TCP和UDP,但是面向消息的UNIX Domain Socket也是可靠的,消息既不会丢失也不会顺序错乱。
UNIX Domain Socket与网络socket编程最明显的不同在于地址格式不同,用结构体sockaddr_un表示,网络编程的socket地址是IP地址加端口号,而UNIX Domain Socket的地址是一个socket类型的文件在文件系统中的路径,这个socket文件由bind()调用创建,如果调用bind()时该文件已存在,则bind()错误返回。
从上面的代码可以看到,我们现在是一个client,尝试进行socket连接、通信,那server端在哪里?
每个OSD实例在启动的时候,都有一个CephContext类变量cct变量,这个变量中会new AdminSocket对象,在ceph_osd.cc的main函数中会有cct的初始化操作:common_init_finish(g_ceph_context)。
我们先来看看common_init_finish函数:
common/common_init.cc: void common_init_finish(CephContext *cct) { cct->init_crypto(); // 启动一些线程,包括log线程、admin socket的初始化 int flags = cct->get_init_flags(); if (!(flags & CINIT_FLAG_NO_DAEMON_ACTIONS)) cct->start_service_thread(); // 修改admin socket path的权限 if ((flags & CINIT_FLAG_DEFER_DROP_PRIVILEGES) && (cct->get_set_uid() || cct->get_set_gid())) { cct->get_admin_socket()->chown(cct->get_set_uid(), cct->get_set_gid()); } } common/ceph_context.cc: void CephContext::start_service_thread() { ... if (_conf->admin_socket.length()) _admin_socket->init(_conf->admin_socket); } common/admin_socket.cc: bool AdminSocket::init(const std::string &path) { ... // 绑定socket文件,并开始监听sock fd err = bind_and_listen(path, &sock_fd); ... }
可以看到:UNIX Domain Socket server已经准备就绪,开始监听网络
在CephContext类构造函数中,创建了一个AdminSocket类变量_admin_socket,注册了一些_admin_socket可以处理的command:
CephContext::CephContext(uint32_t module_type_, int init_flags_) { ... _admin_socket = new AdminSocket(this); _admin_hook = new CephContextHook(this); _admin_socket->register_command("config show", "config show", _admin_hook, "dump current config settings"); _admin_socket->register_command("config set", "config set name=var,type=CephString name=val,type=CephString,n=N", _admin_hook, "config set <field> <val> [<val> ...]: set a config variable"); _admin_socket->register_command("config get", "config get name=var,type=CephString", _admin_hook, "config get <field>: get the config value"); ... }
从上面的代码可以看到,配置参数相关的这几个都已经注册在admin_socket中,而且这些命令相关的hook是CephContextHook类。
同时,在OSD启动时,admin_socket初始化成功后,调用了final_init,也注册了一些admin_socket可以识别的command。
我们再看看OSD的inal_init函数:
void OSD::final_init() { int r; // 这个admin_socket变量是从CephContext中获取的,并没new对象。这样CephContext和OSD中的admin_socket就是同一个, AdminSocket *admin_socket = cct->get_admin_socket(); asok_hook = new OSDSocketHook(this); r = admin_socket->register_command("status", "status", asok_hook, "high-level status of OSD"); assert(r == 0); r = admin_socket->register_command("flush_journal", "flush_journal", asok_hook, "flush the journal to permanent store"); ... }
在这里继续注册了一些可以通过admin socket来执行的命令,这里的hook是OSDSocketHook类。
AdminSocket类继承的Thread类,在线程入口函数entry中,会通过poll方式等待event,然后有connection的时候,会进行do_accept,然后进行正常的网络stream读写。
当client通过admin socket向server端发送了命令后,admin socket server会接收消息,在do_accept函数中,会判断这个command是否注册,如果注册了,调用相应的hook->call处理,最后将结果回复给client。
在CephContext类中,hook是CephContextHook类变量:
common/ceph_context.cc: class CephContextHook : public AdminSocketHook { CephContext *m_cct; public: explicit CephContextHook(CephContext *cct) : m_cct(cct) {} bool call(std::string command, cmdmap_t& cmdmap, std::string format, bufferlist& out) { m_cct->do_command(command, cmdmap, format, &out); return true; } }
在OSD类中,hook是一个OSDSocketHook类变量:
osd/OSD.cc: class OSDSocketHook : public AdminSocketHook { OSD *osd; public: explicit OSDSocketHook(OSD *o) : osd(o) {} bool call(std::string command, cmdmap_t& cmdmap, std::string format, bufferlist& out) { stringstream ss; bool r = osd->asok_command(command, cmdmap, format, ss); out.append(ss); return r; } }
在这里,我们先只看config set相关的处理,在CephContext的do_command函数中:
common/ceph_context.cc: void CephContext::do_command(std::string command, cmdmap_t& cmdmap, std::string format, bufferlist *out) { ... else if (command == "config set") { std::string var; std::vector<std::string> val; if (!(cmd_getval(this, cmdmap, "var", var)) || !(cmd_getval(this, cmdmap, "val", val))) { f->dump_string("error", "syntax error: 'config set <var> <value>'"); } else { // val may be multiple words string valstr = str_join(val, " "); int r = _conf->set_val(var.c_str(), valstr.c_str()); if (r < 0) { f->dump_stream("error") << "error setting '" << var << "' to '" << valstr << "': " << cpp_strerror(r); } else { ostringstream ss; // 调用apply_changes, 生效参数 _conf->apply_changes(&ss); f->dump_string("success", ss.str()); } } } ... } common/config.cc: int md_config_t::set_val(const char *key, const char *val, bool meta, bool safe) { ... // 下面的代码结构是不是很熟悉,对,和前面的tell方式中的parse_option很像,先设置了debug级别,然后再设置其他的参数。 if (strncmp(k.c_str(), "debug_", 6) == 0) { ... subsys.set_log_level(o, log); subsys.set_gather_level(o, gather); ... } ... for (int i = 0; i < NUM_CONFIG_OPTIONS; ++i) { ... return set_val_impl(v.c_str(), opt); ... } ... }
可以看到,和tell方式一样,也是先改变参数值,然后再调用_apply_changes来生效之。
参数如何应用到运行中的系统
现在,我们来看看_apply_changes的实现:
void md_config_t::_apply_changes(std::ostream *oss) { // 从变量命名来看,是reverse obs_map_t,也就是obs_map_t结构体的反转结构,obs_map_t是这样的结构:typedef std::multimap <std::string, md_config_obs_t*> obs_map_t; // rev_obs_map_t除了反转外,map value type成了set。 // obs_map_t是个multimap,存的是配置参数和config observer的映射关系。 // 现在的rev_obs_map_t是config observer和配置参数的映射关系,可以直接通过observer来查看都有哪些参数。 typedef std::map < md_config_obs_t*, std::set <std::string> > rev_obs_map_t; expand_all_meta(); rev_obs_map_t robs; std::set <std::string> empty_set; char buf[128]; char *bufptr = (char*)buf; // 这个循环就是要完成obs_map_t结构的反转,以及参数option组成的string set。 for (changed_set_t::const_iterator c = changed.begin(); c != changed.end(); ++c) { const std::string &key(*c); // multimap::equal_range是从map中查找某个key,返回第一个匹配的key对应的pair,和最后一个匹配key的下一个pair // 所以,这里就找到了某个参数的所有observers pair < obs_map_t::iterator, obs_map_t::iterator > range(observers.equal_range(key)); if ((oss) && (!_get_val(key.c_str(), &bufptr, sizeof(buf))) && !_internal_field(key)) { (*oss) << key << " = '" << buf << "' "; // 如果observers这个map中没有这个key,range的first和second都是指向同一个pair,或者都是NULL。 // 所以,这里就说明:没有任何observer关注这个参数。 // 这里的unchangeable是不是在设置参数的经常看到,其实就标明没有observer,也就更不能谈生效了。但是我们在检查参数的时候,确实看到了参数变了,就是因为之前看到的先设置了参数。 // 在L版的代码中,这个"unchangeable"输出已经换成了"not observed, change may require restart",也更清楚的说明了你修改的参数没人关注,没生效。 if (range.first == range.second) { (*oss) << "(unchangeable) "; } } // 生成rev_obs_map_t类型的robs for (obs_map_t::iterator r = range.first; r != range.second; ++r) { rev_obs_map_t::value_type robs_val(r->second, empty_set); pair < rev_obs_map_t::iterator, bool > robs_ret(robs.insert(robs_val)); std::set <std::string> &keys(robs_ret.first->second); keys.insert(key); } } // 遍历所有的observers,然后依次调用它的handle_conf_change,来应用参数。 // Make any pending observer callbacks for (rev_obs_map_t::const_iterator r = robs.begin(); r != robs.end(); ++r) { md_config_obs_t *obs = r->first; obs->handle_conf_change(this, r->second); } changed.clear(); }
上面代码中注释挺详细的了,我们在这里说一下observers是什么?就是具体的MON、OSD、MDS实例,这里用的是观察者模式,观察者关注自己关心的参数,这些参数一旦有更新,观察者会通过handle_conf_change使其生效。
那么,这些observer是什么时候开始watch的?我们这里只说OSD,在OSD::pre_init中,调用了cct->_conf->add_observer(this); pre_init也是在OSD启动时调用的。
// 具体的MON、OSD、MDS实例在启动时,调用本函数,将自己将入到config的observers中 void md_config_t::add_observer(md_config_obs_t* observer_) { Mutex::Locker l(lock); // 每个observer都watch哪些config option呢?就是通过自己的get_tracked_conf_keys来定义的 const char **keys = observer_->get_tracked_conf_keys(); for (const char ** k = keys; *k; ++k) { obs_map_t::value_type val(*k, observer_); observers.insert(val); } }
我们先看看OSD都关注了哪些config options:(其实很少的,除了一些log的,就10几个)
const char** OSD::get_tracked_conf_keys() const { static const char* KEYS[] = { "osd_max_backfills", "osd_min_recovery_priority", "osd_op_complaint_time", "osd_op_log_threshold", "osd_op_history_size", "osd_op_history_duration", "osd_enable_op_tracker", "osd_map_cache_size", "osd_map_max_advance", "osd_pg_epoch_persisted_max_stale", "osd_disk_thread_ioprio_class" "osd_disk_thread_ioprio_priority", // clog & admin clog "clog_to_monitors", "clog_to_syslog", "clog_to_syslog_facility", "clog_to_syslog_level", "osd_objectstore_fuse", "clog_to_graylog", "clog_to_graylog_host", "clog_to_graylog_port", "host", "fsid", "osd_client_message_size_cap", "osd_client_message_cap", NULL }; return KEYS; }
看了上面的参数列表,是不是很惊讶,就这些?难道我经常设置的参数,貌似生效了(参数值确实改变了),其实根本没有真正的生效。
我们再来看OSD的handle_conf_changes:
void OSD::handle_conf_change(const struct md_config_t *conf, const std::set <std::string> &changed) { if (changed.count("osd_max_backfills")) { service.local_reserver.set_max(cct->_conf->osd_max_backfills); service.remote_reserver.set_max(cct->_conf->osd_max_backfills); } ... }
这个函数就会根据不同的参数,调用具体的函数,将新的参数值应用到运行的系统中。
到这里,我们也基本搞清了动态更新参数的实现过程,当然,还有很多细节并没有研究。我们还是先搞清整个流程、原理,然后遇到问题,再深入细节分析。
总结
通过daemon admin socket修改参数走的是UNIX Domain Socket模式。所以,每次只能设置一个实例的参数;并且,只能在本地设置本地实例的参数。
通过tell修改参数走的是正常的命令发送流程。可以通过’*’的方式设置所有的实例的参数。
非log级别debug_类参数的动态更新,都经历了两步:设置进程内存中的参数值;调用相应的observer生效之。
log级别的debug_类参数,是直接设置参数并生效的(当然,依赖于ceph中log的实现)。
Ceph使用过程中,很多动态更新的参数,其实并没有真正的生效,修改参数时,返回的结果中有”unchangeable”,或“not observed, change may require restart”这些字串的都没生效。
关于更多文章可以阅读原文查看作者博客。
关于作者
姚国涛,有多年的OpenStack、Ceph运维、开发经验。目前就职于北京多来点科技有限公司,主要负责私有云平台中Ceph相关开发、运维工作。关注Ceph、Glusterfs等分布式存储开源技术。
Ceph中国社区
是国内唯一官方正式授权的社区,
为广大Ceph爱好者提供交流平台!
开源-创新-自强
官方网站:www.ceph.org.cn
合作邮箱:devin@ceph.org.cn
投稿地址:tougao@ceph.org.cn
长期招募热爱翻译人员,
参与社区翻译外文资料工作。






