What is ClickHouse?

 ClickHouse is a column-oriented database management system (DBMS) for online analytical processing of queries (OLAP). For more information, please refer to the documentation: https://clickhouse.yandex/docs/en/

Data Replication

Replication is only supported for tables in the MergeTree family. MergeTree is the main table engine in ClickHouse: it lets you store and query data on a single server and enjoy all the advantages of ClickHouse without any special configuration.
However, on a single server the data is not fault-tolerant; nothing improves its reliability or availability. A server can run into problems (downtime, network issues, etc.), which is why the system needs one or more replicas: other servers holding a copy of the data that can substitute for the original server at any moment.
ClickHouse provides the ReplicatedMergeTree engine to keep such an extra copy (replica) of the data. Its prerequisite is a running ZooKeeper.
   For tests, a single standalone ZooKeeper instance is enough.
   For production use, you should have a ZooKeeper ensemble of at least 3 servers. A deployment tip: do not run ZooKeeper on the same servers as ClickHouse, because ZooKeeper is very sensitive to latency and ClickHouse may utilize all available system resources.
After you create a table that uses ReplicatedMergeTree and insert data, the data is copied automatically to all the replicas. However, every SELECT is executed only on the server the client is connected to. For example, if a table has 3 replicas but the client always connects to the same one, ClickHouse will not balance or share that traffic among the replicas: one server serves everything while the rest generally do nothing. To load-balance across multiple replicas, ClickHouse provides the Distributed engine as an internal 'load balancer': it spreads requests over the replicas according to specific rules or configured preferences.
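
As a quick illustration (a small sketch using the built-in hostName() function, which reports the server that executed a query, and the Distributed table dtable_name that is created later in this article): a plain SELECT is answered only by the node you are connected to, while a SELECT through a Distributed table may be served by any replica.

-- Run on any node: a plain SELECT is executed only on the server
-- the client is connected to.
SELECT hostName();

-- Through a Distributed table (created later in this article), the query
-- is forwarded to one of the replicas, so the result may change per run.
SELECT hostName() FROM dtable_name LIMIT 1;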

Let’s look at an example of replication: the system has 3 nodes and 1 table (a single shard). The goal is to set up a cluster where the data is replicated three times, and to distribute queries over it. Here is the information for the 3 nodes:

dwh01.uiza.lab 10.148.0.11
dwh02.uiza.lab 10.148.0.12
dwh03.uiza.lab 10.148.0.13

To use replication, set the addresses of the ZooKeeper cluster in the config file (/etc/metrika.xml) on all 3 nodes. Use ZooKeeper version 3.4.5 or later:

<zookeeper-servers>
 <node index="1">
   <host>dwh01.uiza.lab</host>
   <port>2181</port>
 </node>
 <node index="2">
   <host>dwh02.uiza.lab</host>
   <port>2181</port>
 </node>
 <node index="3">
   <host>dwh03.uiza.lab</host>
   <port>2181</port>
 </node>
</zookeeper-servers>
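
Once the server picks up this configuration (a restart of clickhouse-server may be needed), connectivity can be checked from any node through the built-in system.zookeeper table. A minimal sketch; the query fails if no ZooKeeper section is configured or the ensemble is unreachable:

-- Lists the root znodes as seen by this ClickHouse server.
SELECT name, value
FROM system.zookeeper
WHERE path = '/';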

Configure macros with substitution values that are used to build the path to the table in ZooKeeper; the path must be unique for each replicated table.

<!--dwh01.uiza.lab-->
<macros replace="replace">
 <cluster>OLAPLab</cluster>
 <shard>01</shard>
 <replica>01</replica> <!--value is 02 on dwh02.uiza.lab, 03 on dwh03.uiza.lab-->
</macros>

Note that the {replica} value is different on each of the 3 nodes, so that every replica of a replicated table gets a unique path in ZooKeeper. This value is important when creating a table that is meant to be replicated. In this example 3 macros are defined:
{cluster} - the name of the ClickHouse cluster, used to distinguish data between different clusters
{shard} - the shard number or a symbolic reference
{replica} - the name of the replica; its value is different on each server.
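
To double-check that each node picked up its own substitutions, the system.macros table (available in newer ClickHouse versions) lists the configured macros; {cluster} and {shard} should match on all nodes while {replica} should differ:

-- Run on each node; the 'replica' row is expected to be 01, 02 and 03 respectively.
SELECT macro, substitution
FROM system.macros;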

Creating Replicated Tables

In this setup we store multiple copies of the data on the other nodes with the ReplicatedMergeTree engine. Note the 2 parameters:
 zookeeper_path - the path to the table in ZooKeeper
 replica_name - the replica name in ZooKeeper

Engine description:
ENGINE = ReplicatedMergeTree('<zookeeper_path>', '<replica_name>', <date_partition_column>, (sort columns), 8192)

Statement to create the table, run on all 3 nodes:

CREATE TABLE IF NOT EXISTS table_name(
    EventDate Date,
    CounterID UInt32,
    UserID UInt32)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/{cluster}/tables/{shard}/hits',
    '{replica}',
    EventDate,
    (CounterID, EventDate, intHash32(UserID)),
    8192);
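
A quick way to see the replication at work (a sketch using the table just created): insert a row on one node and read it back on another; the row should show up on every replica within moments.

-- On dwh01.uiza.lab:
INSERT INTO table_name (EventDate, CounterID, UserID)
VALUES (today(), 1, 42);

-- Shortly afterwards, on dwh02.uiza.lab or dwh03.uiza.lab:
SELECT count() FROM table_name;   -- the inserted row is already here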

Now let’s check the replicated tables on the 3 nodes with this statement:

SELECT
    database,
    table,
    engine,
    is_leader,
    zookeeper_path,
    replica_name,
    replica_path,
    total_replicas
FROM system.replicas
FORMAT Vertical

Looking at the output, the table_name table created with the ReplicatedMergeTree engine has total_replicas equal to 3, replica_name differs per node according to the 3 values of {replica}, and the remaining columns describe the state of the replicas.
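
The replication metadata can also be inspected directly through ZooKeeper. With the macros above, the zookeeper_path expands to /clickhouse/OLAPLab/tables/01/hits, and its replicas child node should list the three replica names (a sketch; adjust the path if your macros differ):

-- Lists the replicas registered in ZooKeeper for this table: 01, 02, 03.
SELECT name
FROM system.zookeeper
WHERE path = '/clickhouse/OLAPLab/tables/01/hits/replicas';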

Distributed

The Distributed engine does not store data itself, but allows distributed query processing on multiple servers. Reading is automatically parallelized. To use the Distributed engine you need to configure the cluster settings in your ClickHouse server config file (/etc/metrika.xml).
More precisely, the main configuration in '/etc/clickhouse-server/config.xml' contains the element <remote_servers incl="clickhouse_remote_servers">, which pulls the cluster definitions in from /etc/metrika.xml. Clusters are set up like this (the same config on all 3 nodes):

<clickhouse_remote_servers>
  <!-- Test database default 1 shard and 3 replica -->
  <default_1shard_3replicas>
      <shard>
          <internal_replication>true</internal_replication>
          <replica>
              <default_database>default</default_database>
              <host>dwh01.uiza.lab</host>
              <port>9000</port>
          </replica>
          <replica>
              <default_database>default</default_database>
              <host>dwh02.uiza.lab</host>
              <port>9000</port>
          </replica>
          <replica>
              <default_database>default</default_database>
              <host>dwh03.uiza.lab</host>
              <port>9000</port>
          </replica>
      </shard>
  </default_1shard_3replicas>
</clickhouse_remote_servers>

 default_1shard_3replicas: the cluster name in the server's config file.
The cluster holds 3 replicas of the table_name table with the ReplicatedMergeTree engine. You can create a table with the Distributed engine that 'points' to all 3 of those servers, and when you select from that Distributed table the query is forwarded to and executed on one of the replicas. In that scenario each replica gets roughly 1/3 of the requests (but each request is still fully executed on the single chosen replica).
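
Before creating the Distributed table, it is worth confirming that the cluster definition was loaded; ClickHouse exposes it through the system.clusters table (a minimal check, one row per replica of the single shard):

-- Should return three rows, one per replica of shard 1.
SELECT cluster, shard_num, replica_num, host_name, port, is_local
FROM system.clusters
WHERE cluster = 'default_1shard_3replicas';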

Engine description:
ENGINE = Distributed(<cluster_name>, <database>, <table> [, sharding_key])

Statement to create the table, run on all 3 nodes:

CREATE TABLE IF NOT EXISTS dtable_name(
   EventDate Date,
   CounterID UInt32,
   UserID UInt32)
ENGINE = Distributed(default_1shard_3replicas, default, table_name, rand());

After that, data will be read from all servers in the default_1shard_3replicas cluster, from the default.table_name table located on every server in the cluster. Data is not only read, but is partially processed on the remote servers (to the extent that this is possible). For example, for a query with GROUP BY, data will be aggregated on the remote servers, the intermediate states of the aggregate functions will be sent to the requestor server, and the data will be further aggregated there.
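
As a small illustration of that behaviour (a sketch using the tables created above): a GROUP BY sent to the Distributed table is aggregated on the chosen replica, and only the intermediate aggregation states travel back; the replica-selection preference can be tuned with the load_balancing setting (values such as 'random', 'in_order' and 'nearest_hostname' exist):

-- The aggregation runs on the remote replica; only intermediate states
-- are sent back to the server that received the query.
SELECT CounterID, count() AS hits
FROM dtable_name
GROUP BY CounterID;

-- Optionally change how the replica is chosen for this session.
SET load_balancing = 'in_order';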

Next topic

All of that is great and works well as long as one copy of the data fits on a single server and can be processed by the resources of that server. But how should data be stored and processed when there is too much of it for one server?
To close the article, a small hint: ClickHouse already has an idea for solving this, sharding the data across servers, and that will be the next topic.