CREATE TABLE {teble} ON CLUSTER {cluster}
AS {local_table}
ENGINE= Distributed({cluster}, {database}, {local_table},{policy})
Distributed表 Insert 原理
本文主要是对Distributed表如何写入及如何分发做一下分析,略过SQL的词法解析、语法解析等步骤,从写入流开始,其构造方法如下:
DistributedBlockOutputStream(const Context & context_, StorageDistributed &
storage_, const ASTPtr & query_ast_, const ClusterPtr & cluster_, bool
insert_sync_, UInt64 insert_timeout_);
可以发现每个shard对应的目录名是{darabse}@{hostname}:{tcpPort}的格式,如果多个副本会用,分隔。并且每个shard目录中还有个tmp目录,这个目录的设计在writeToShard()方法中做了解释,是为了避免数据文件在没写完就被发送到远端。
数据文件在本地写入的过程中会先写入tmp路径中,写完后通过硬链接link到shard目录,保证只要在shard目录中出现的数据文件都是完整写入的数据文件。
数据文件的命名是通过全局递增的数字加.bin命名,是为了在后续分发到远端节点保持顺序性。
5、数据如何分发到各个节点
细心的你可能已经发现在writeToShard()方法中有个requireDirectoryMonitor(),这个方法就是将shard目录注册监听,并通过专用类StorageDistributedDirectoryMonitor来实现数据文件的分发,根据不同配置可以实现逐一分发或批量分发。并且包含对坏文件的容错处理。
sharding_key_expr = buildShardingKeyExpression(sharding_key_, global_context,
getColumns().getAllPhysical(), false);
const ExpressionActionsPtr & getShardingKeyExpr() const { return
sharding_key_expr; }
所以说sharding_key_expr最终主要就是由sharding_key决定的。
一般情况下getShardingKeyExpr()方法都为true,如果再满足shard数量大于1,就会对block进行拆分,由splitBlock()方法主要逻辑就是创建selector并使用selector进行切割,大致逻辑如下:
Distributed表在写入时会在本地节点生成临时数据,会产生写放大,所以会对CPU及内存造成一些额外消耗,建议尽量少使用Distributed表进行写操作; Distributed表写的临时block会把原始block根据sharding_key和weight进行再次拆分,会产生更多的block分发到远端节点,也增加了merge的负担; Distributed表如果是基于表函数创建的,一般是同步写,需要注意。
Distributed表 Select 流程
void SelectStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res)
{
// 构造一个本地流方法
auto emplace_local_stream = [&]()
{
res.emplace_back(createLocalStream(query_ast, context, processed_stage));
};
// 构造一个远程流方法
auto emplace_remote_stream = [&]()
{
auto stream = std::make_shared<RemoteBlockInputStream>(shard_info.pool, query, header, context, nullptr, throttler, external_tables, processed_stage);
stream->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
stream->setMainTable(main_table);
res.emplace_back(std::move(stream));
};
// 获取settings配置
const auto & settings = context.getSettingsRef();
// prefer_localhost_replica默认为true,如果shard_info还是本地分片,进入以下逻辑
if (settings.prefer_localhost_replica && shard_info.isLocal())
{
StoragePtr main_table_storage;
// 根据是不是表函数方式使用不同逻辑获取main_table_storage,即一个IStorage
// 其中表函数是指那些虚的表引擎,例如file、merge、remote、url、mysql这一类的
// 因为这一类是不在本地存放数据的,需要另一种验证库表存在的方式
if (table_func_ptr)
{
const auto * table_function = table_func_ptr->as<ASTFunction>();
TableFunctionPtr table_function_ptr = TableFunctionFactory::instance().get(table_function->name, context);
main_table_storage = table_function_ptr->execute(table_func_ptr, context, table_function_ptr->getName());
}
else
main_table_storage = context.tryGetTable(main_table.database, main_table.table);
// 如果main_table_storage不存在,就尝试去其他server获取
if (!main_table_storage)
{
ProfileEvents::increment(ProfileEvents::DistributedConnectionMissingTable);
if (shard_info.hasRemoteConnections())
{
LOG_WARNING(
&Logger::get("ClusterProxy::SelectStreamFactory"),
"There is no table " << main_table.database << "." << main_table.table
<< " on local replica of shard " << shard_info.shard_num << ", will try remote replicas.");
// 这里就是应用上面声明的匿名方法,使用远程流
emplace_remote_stream();
}
else
// 同理,使用本地流
// 当然这里已经没有获取到本地StoragePtr,所以也一定会失败
emplace_local_stream();
return;
}
// 尝试将main_table_storage指针转为replicated*类型指针
const auto * replicated_storage = dynamic_cast<const StorageReplicatedMergeTree *>(main_table_storage.get());
// 如果不是ReplicatedMergeTree引擎表,使用本地server,如果是就要考虑各个副本的
// 延迟情况,如果延迟不满足会在去寻找其他副本
if (!replicated_storage)
{
emplace_local_stream();
return;
}
UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;
// 如果没设置最大延迟,依旧选择本地副本查询
if (!max_allowed_delay)
{
emplace_local_stream();
return;
}
UInt32 local_delay = replicated_storage->getAbsoluteDelay();
// 如果设置了最大延迟且本地延迟小于最大延迟,本地副本依然有效,选择本地副本
// 这里的本地延迟时间是每次查询时不断更新的
// 包含min_unprocessed_insert_time和max_processed_insert_time
// 这两个值在zk的tables/{num}/replicas/{hostname}/路径下记录的
// 这里的用意也是为了间接判断本机的负载情况
if (local_delay < max_allowed_delay)
{
emplace_local_stream();
return;
}
// 如果以上逻辑都没有进入,说明本地延迟已经大于设置的最大延迟参数了,会执行以下代码
ProfileEvents::increment(ProfileEvents::DistributedConnectionStaleReplica);
LOG_WARNING(
&Logger::get("ClusterProxy::SelectStreamFactory"),
"Local replica of shard " << shard_info.shard_num << " is stale (delay: " << local_delay << "s.)");
// 如果没有这是fallback,就不能使用本地副本,去尝试获取远程副本
if (!settings.fallback_to_stale_replicas_for_distributed_queries)
{
if (shard_info.hasRemoteConnections())
{
emplace_remote_stream();
return;
}
else
throw Exception(
"Local replica of shard " + toString(shard_info.shard_num)
+ " is stale (delay: " + toString(local_delay) + "s.), but no other replica configured",
ErrorCodes::ALL_REPLICAS_ARE_STALE);
}
// 如果没有远程副本可选,而且设置了fallback,则才会选择本地副本
if (!shard_info.hasRemoteConnections())
{
emplace_local_stream();
return;
}
// 构造lazily_create_stream方法,避免在主线程中进行连接
auto lazily_create_stream = [
pool = shard_info.pool, shard_num = shard_info.shard_num, query, header = header, query_ast, context, throttler,
main_table = main_table, table_func_ptr = table_func_ptr, external_tables = external_tables, stage = processed_stage,
local_delay]()
-> BlockInputStreamPtr
{
auto current_settings = context.getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(
current_settings).getSaturated(
current_settings.max_execution_time);
std::vector<ConnectionPoolWithFailover::TryResult> try_results;
try
{
// 这里会去连接池中获取远端的entry,entry中包含着请求其他server的必要信息
// getManyForTableFunction和getManyChecked方法最后都会调用getManyImpl
// 只不过传入的TryGetEntry不同,在tryGetEntry中会去检查远端server的表的状态
// 并且检查远端server中分布式表所使用到的表的延迟情况,
// 以is_up_to_date(bool)表示
if (table_func_ptr)
try_results = pool->getManyForTableFunction(timeouts, ¤t_settings, PoolMode::GET_MANY);
else
try_results = pool->getManyChecked(timeouts, ¤t_settings, PoolMode::GET_MANY, main_table);
}
catch (const Exception & ex)
{
if (ex.code() == ErrorCodes::ALL_CONNECTION_TRIES_FAILED)
LOG_WARNING(
&Logger::get("ClusterProxy::SelectStreamFactory"),
"Connections to remote replicas of local shard " << shard_num << " failed, will use stale local replica");
else
throw;
}
double max_remote_delay = 0.0;
for (const auto & try_result : try_results)
{
if (!try_result.is_up_to_date)
max_remote_delay = std::max(try_result.staleness, max_remote_delay);
}
// 下面是将得到的result进行聚合
if (try_results.empty() || local_delay < max_remote_delay)
return createLocalStream(query_ast, context, stage);
else
{
std::vector<IConnectionPool::Entry> connections;
connections.reserve(try_results.size());
for (auto & try_result : try_results)
connections.emplace_back(std::move(try_result.entry));
return std::make_shared<RemoteBlockInputStream>(
std::move(connections), query, header, context, nullptr, throttler, external_tables, stage);
}
};
res.emplace_back(std::make_shared<LazyBlockInputStream>("LazyShardWithLocalReplica", header, lazily_create_stream));
}
else
emplace_remote_stream();
}
PoolWithFailoverBase<TNestedPool>::getMany(
size_t min_entries, size_t max_entries, size_t max_tries,
const TryGetEntryFunc & try_get_entry,
const GetPriorityFunc & get_priority,
bool fallback_to_stale_replicas)
{
......
std::string fail_messages;
bool finished = false;
while (!finished)
{
for (size_t i = 0; i < shuffled_pools.size(); ++i)
{
if (up_to_date_count >= max_entries
|| entries_count + failed_pools_count >= nested_pools.size())
{
finished = true;
break;
}
ShuffledPool & shuffled_pool = shuffled_pools[i];
TryResult & result = try_results[i];
if (shuffled_pool.error_count >= max_tries || !result.entry.isNull())
continue;
std::string fail_message;
// 这里就是调用了上面提到的TryGetEntryFunc方法来真正的获取entry
result = try_get_entry(*shuffled_pool.pool, fail_message);
if (!fail_message.empty())
fail_messages += fail_message + '\n';
if (!result.entry.isNull())
{
++entries_count;
if (result.is_usable)
{
++usable_count;
if (result.is_up_to_date)
++up_to_date_count;
}
}
else
{
LOG_WARNING(log, "Connection failed at try №"
<< (shuffled_pool.error_count + 1) << ", reason: " << fail_message);
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailTry);
shuffled_pool.error_count = std::min(max_error_cap, shuffled_pool.error_count + 1);
if (shuffled_pool.error_count >= max_tries)
{
++failed_pools_count;
ProfileEvents::increment(ProfileEvents::DistributedConnectionFailAtAll);
}
}
}
}
if (usable_count < min_entries)
throw DB::NetException(
"All connection tries failed. Log: \n\n" + fail_messages + "\n",
DB::ErrorCodes::ALL_CONNECTION_TRIES_FAILED);
try_results.erase(
std::remove_if(
try_results.begin(), try_results.end(),
[](const TryResult & r) { return r.entry.isNull() || !r.is_usable; }),
try_results.end());
// 以下代码主要是对结果进行排序
std::stable_sort(
try_results.begin(), try_results.end(),
[](const TryResult & left, const TryResult & right)
{
return std::forward_as_tuple(!left.is_up_to_date, left.staleness)
< std::forward_as_tuple(!right.is_up_to_date, right.staleness);
});
......
return try_results;
}
近期文章推荐:
更多精彩内容欢迎关注公众号
文章转载自 ClickHouse周边,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。