Coming back to the previous post: the system works well as long as a single copy of the data fits on one server and can be processed by the resources of one server. But how should data be stored and processed when it grows too large for a single server?

To solve this problem, the data must be split into smaller parts, which is called sharding. Sharding is another mechanism provided by the Distributed engine in ClickHouse. The system divides the data into parts (shards) according to a sharding key. Each shard holds and processes a part of the data; at query time, the result is obtained by querying multiple shards and combining the data coming from the different shards.
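
As a rough illustration (not part of the original setup), the sharding key is just an expression evaluated per row; with three equally weighted shards, the routing effectively reduces to that expression modulo the number of shards. The sketch below uses a hypothetical UserID key:

-- Illustration only: map a hypothetical UserID key onto 3 equally weighted shards.
SELECT number AS UserID, intHash32(number) % 3 AS target_shard
FROM system.numbers
LIMIT 5;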

Let's review a case study: the system has an amount of data that cannot be stored and processed on a single server. The task is to build a distributed cluster with replicated data so that queries keep running even when a node fails. This article discusses how to set up circular replication in ClickHouse.

Overview

The concept is 3 nodes and 1 table. The goal is to have the data distributed across 3 shards and replicated twice, which requires 2 different shards to be located on every node.

Circular replication

Cluster Configuration

Let's start with a straightforward cluster configuration that defines 3 shards and 2 replicas. Since the system has only 3 nodes to work with, we will set up the replica hosts in a "circle" manner: the first and the second node host the first shard, the second and the third node host the second shard, and the third and the first node host the third shard. Just like so:

Insight Circle

One more important note about using the "circle" topology with ClickHouse is that the 'internal_replication' option should be set to true for each shard. Each shard can have the 'internal_replication' parameter defined in the config file.
If this parameter is set to 'true', the write operation selects the first healthy replica and writes data to it. Use this alternative if the Distributed table "looks at" replicated tables, in other words, if the table the data will be written to replicates it itself.
If it is set to 'false' (the default), data is written to all replicas. In essence, this means that the Distributed table replicates the data itself. This is worse than using replicated tables, because the consistency of the replicas is not checked, and over time they will contain slightly different data.

Let's follow the configuration of the case study. The configuration section (in /etc/metrika.xml) may look like this:

<clickhouse_remote_servers>
  <dwh_3shards_2replicas>
    <shard>
      <internal_replication>true</internal_replication>
      <replica>
        <default_database>dwh01</default_database>
        <host>dwh01.uiza.lab</host>
        <port>9000</port>
      </replica>
      <replica>
        <default_database>dwh01</default_database>
        <host>dwh02.uiza.lab</host>
        <port>9000</port>
      </replica>
    </shard>
    <shard>
      <internal_replication>true</internal_replication>
      <replica>
        <default_database>dwh02</default_database>
        <host>dwh02.uiza.lab</host>
        <port>9000</port>
      </replica>
      <replica>
        <default_database>dwh02</default_database>
        <host>dwh03.uiza.lab</host>
        <port>9000</port>
      </replica>
    </shard>
    <shard>
      <internal_replication>true</internal_replication>
      <replica>
        <default_database>dwh03</default_database>
        <host>dwh01.uiza.lab</host>
        <port>9000</port>
      </replica>
      <replica>
        <default_database>dwh03</default_database>
        <host>dwh03.uiza.lab</host>
        <port>9000</port>
      </replica>
    </shard>
  </dwh_3shards_2replicas>
</clickhouse_remote_servers>
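
Once this file is in place on every node, a quick sanity check (a suggestion beyond the original steps) is to query system.clusters and confirm that ClickHouse sees 3 shards with 2 replicas each:

-- Run on any node; expect 6 rows: 3 shards x 2 replicas.
SELECT cluster, shard_num, replica_num, host_name, port
FROM system.clusters
WHERE cluster = 'dwh_3shards_2replicas';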

The replication and sharding configuration is very important because the path to each replicated table in ZooKeeper must be unique, and tables on different shards must have different paths. Pay attention to the values of the dwhshard0* and dwhreplica0* parameters: they are the identifiers used in the paths set in ZooKeeper. Configure the macros like this:

<!--Config on dwh01.uiza.lab-->
<macros replace="replace">
  <cluster>OLAPLab</cluster>
  <dwhshard00>01</dwhshard00>
  <dwhshard01>03</dwhshard01>
  <dwhreplica00>01</dwhreplica00>
  <dwhreplica01>02</dwhreplica01>
</macros>

<!--Config on dwh02.uiza.lab-->
<macros replace="replace">
  <cluster>OLAPLab</cluster>
  <dwhshard00>02</dwhshard00>
  <dwhshard01>01</dwhshard01>
  <dwhreplica00>01</dwhreplica00>
  <dwhreplica01>02</dwhreplica01>
</macros>

<!--Config on dwh03.uiza.lab-->
<macros replace="replace">
  <cluster>OLAPLab</cluster>
  <dwhshard00>03</dwhshard00>
  <dwhshard01>02</dwhshard01>
  <dwhreplica00>01</dwhreplica00>
  <dwhreplica01>02</dwhreplica01>
</macros>
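
As an optional check (assuming clickhouse-client access on each node), the configured macros can be read back from the system.macros table to confirm that every host received its own shard and replica identifiers:

-- Run on each node; the substitutions should match the per-host config above.
SELECT macro, substitution FROM system.macros;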

Table Schema

Now let's set up the replicated tables for the shards. A ReplicatedMergeTree table definition requires two important parameters:
The shard path of the table in ZooKeeper
The replica value
Note: the ZooKeeper path should be unique for every shard, and the replica value should be unique within each particular shard.

Run the following statement on all 3 nodes to create the schema table:

CREATE TABLE IF NOT EXISTS default.table_name_schema(
   EventDate Date,
   CounterID UInt32,
   UserID UInt32) ENGINE = MergeTree(
   EventDate,
   (CounterID, EventDate, intHash32(UserID)),
   8192);
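
As an aside, this statement uses the legacy MergeTree syntax, where the date column, primary key tuple, and index granularity are passed as engine arguments. On recent ClickHouse versions the same table would more commonly be declared with PARTITION BY and ORDER BY; a rough equivalent, not part of the original setup:

CREATE TABLE IF NOT EXISTS default.table_name_schema(
   EventDate Date,
   CounterID UInt32,
   UserID UInt32)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(EventDate)
ORDER BY (CounterID, EventDate, intHash32(UserID))
SETTINGS index_granularity = 8192;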

The purpose of creating table_name_schema with the MergeTree engine is to have a template table whose column definitions can be reused (via AS default.table_name_schema) when creating the replicated and distributed tables below.
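
The per-node statements below also assume that the per-shard databases already exist on each host; a minimal sketch of that prerequisite, which the original steps do not show:

-- Assumed prerequisite: create the local databases that hold each node's two shards.
-- On dwh01.uiza.lab:
CREATE DATABASE IF NOT EXISTS dwh01;
CREATE DATABASE IF NOT EXISTS dwh03;
-- On dwh02.uiza.lab:
CREATE DATABASE IF NOT EXISTS dwh01;
CREATE DATABASE IF NOT EXISTS dwh02;
-- On dwh03.uiza.lab:
CREATE DATABASE IF NOT EXISTS dwh02;
CREATE DATABASE IF NOT EXISTS dwh03;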

-- On dwh01.uiza.lab
CREATE TABLE IF NOT EXISTS dwh01.table_name AS default.table_name_schema
ENGINE = ReplicatedMergeTree(
   '/clickhouse/{cluster}/tables/{dwhshard00}/table_name',
   '{dwhreplica00}',
   EventDate,
   (CounterID, EventDate, intHash32(UserID)),
   8192);

CREATE TABLE IF NOT EXISTS dwh03.table_name AS default.table_name_schema
ENGINE = ReplicatedMergeTree(
   '/clickhouse/{cluster}/tables/{dwhshard01}/table_name',
   '{dwhreplica01}',
   EventDate,
   (CounterID, EventDate, intHash32(UserID)),
   8192);

-- On dwh02.uiza.lab
CREATE TABLE IF NOT EXISTS dwh02.table_name AS default.table_name_schema
ENGINE = ReplicatedMergeTree(
   '/clickhouse/{cluster}/tables/{dwhshard00}/table_name',
   '{dwhreplica00}',
   EventDate,
   (CounterID, EventDate, intHash32(UserID)),
   8192);

CREATE TABLE IF NOT EXISTS dwh01.table_name AS default.table_name_schema
ENGINE = ReplicatedMergeTree(
   '/clickhouse/{cluster}/tables/{dwhshard01}/table_name',
   '{dwhreplica01}',
   EventDate,
   (CounterID, EventDate, intHash32(UserID)),
   8192);

-- On dwh03.uiza.lab
CREATE TABLE IF NOT EXISTS dwh03.table_name AS default.table_name_schema
ENGINE = ReplicatedMergeTree(
   '/clickhouse/{cluster}/tables/{dwhshard00}/table_name',
   '{dwhreplica00}',
   EventDate,
   (CounterID, EventDate, intHash32(UserID)),
   8192);

CREATE TABLE IF NOT EXISTS dwh02.table_name AS default.table_name_schema
ENGINE = ReplicatedMergeTree(
   '/clickhouse/{cluster}/tables/{dwhshard01}/table_name',
   '{dwhreplica01}',
   EventDate,
   (CounterID, EventDate, intHash32(UserID)),
   8192);
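
At this point, an optional verification (a suggestion beyond the original steps) is to query system.replicas on each node: every node should report its two replicated tables with the expected ZooKeeper paths, and the two replicas of a shard should share the same path while differing in replica name:

-- Run on each node; expect two rows, one per local shard table.
SELECT database, table, zookeeper_path, replica_name
FROM system.replicas;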

Create the Distributed Table on 3 Nodes

-- On 3 nodes
CREATE TABLE IF NOT EXISTS default.dtable_name AS default.table_name_schema
ENGINE = Distributed(
   'dwh_3shards_2replicas',
   'default',
   table_name,
   rand());

After creating table_name and dtable_name on all 3 nodes, the cluster has 3 shards with 2 replicas for each shard. Note that although the Distributed table points at the 'default' database, the default_database value defined for each replica in the cluster configuration routes lookups to the correct per-shard database on every host. When a client selects from the distributed table, ClickHouse reads data from one replica per shard and merges the results.
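
A small smoke test (with made-up values) can confirm that writes through the Distributed table are spread across the shards and replicated:

-- Write a few rows through the Distributed table on any node.
-- Distributed inserts are asynchronous by default, so allow a moment for the rows to land.
INSERT INTO default.dtable_name VALUES
   ('2019-01-01', 1, 1),
   ('2019-01-02', 2, 2),
   ('2019-01-03', 3, 3);

-- Read back through the Distributed table; all rows should be visible.
SELECT count() FROM default.dtable_name;

-- On dwh01.uiza.lab, each local shard table should hold only its portion of the rows.
SELECT count() FROM dwh01.table_name;
SELECT count() FROM dwh03.table_name;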

Conclusion

Hopefully this article helps you better understand the sharding and distribution mechanisms in ClickHouse, and offers a solution that survives node failures while guaranteeing that all replicas hold exactly the same data.

Circular replication sounds good as a solution on paper. However, the performance and configuration concerns still need to be considered and benchmarked before applying it to a real system.