Clickhouse集群搭建以及常用语句

Clickhouse集群搭建

简介

Clickhouse是一个用于联机分析处理(OLAP)的列式数据库管理系统(columnar DBMS)。
传统数据库在数据大小比较小,索引大小适合内存,数据缓存命中率足够高的情形下能正常提供服务。但残酷的是,这种理想情形最终会随着业务的增长走到尽头,查询会变得越来越慢。你可能通过增加更多的内存,订购更快的磁盘等等来解决问题(纵向扩展),但这只是拖延解决本质问题。如果你的需求是解决怎样快速查询出结果,那么ClickHouse也许可以解决你的问题。

ClickHouse作为分析型数据库,有三大特点:一是跑分快,二是功能多,三是文艺范

应用场景:

  1. 绝大多数请求都是用于读访问的
  2. 数据需要以大批次(大于1000行)进行更新,而不是单行更新;或者根本没有更新操作
  3. 数据只是添加到数据库,没有必要修改
  4. 读取数据时,会从数据库中提取出大量的行,但只用到一小部分列
  5. 表很“宽”,即表中包含大量的列
  6. 查询频率相对较低(通常每台服务器每秒查询数百次或更少)
  7. 对于简单查询,允许大约50毫秒的延迟
  8. 列的值是比较小的数值和短字符串(例如,每个URL只有60个字节)
  9. 在处理单个查询时需要高吞吐量(每台服务器每秒高达数十亿行)
  10. 不需要事务
  11. 数据一致性要求较低
  12. 每次查询中只会查询一个大表。除了一个大表,其余都是小表
  13. 查询结果显著小于数据源。即数据有过滤或聚合。返回结果不超过单个服务器内存大小

本身的限制:

  1. 不支持真正的删除/更新支持 不支持事务(期待后续版本支持)
  2. 不支持二级索引
  3. 有限的SQL支持,join实现与众不同
  4. 不支持窗口功能
  5. 元数据管理需要人工干预维护

安装

rpm安装

1
2
3
4
5
grep -q sse4_2 /proc/cpuinfo && echo "SSE 4.2 supported" || echo "SSE 4.2 not supported"
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-19.4.0-2.noarch.rpm
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-server-common-19.4.0-2.noarch.rpm
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-server-19.4.0-2.noarch.rpm
rpm -ivh *

yum安装

1
2
3
4
curl -s https://packagecloud.io/install/repositories/altinity/clickhouse/script.rpm.sh | sudo bash
yum list 'clickhouse*'
yum install -y clickhouse-server clickhouse-client
yum list installed 'clickhouse*'

单机配置

1
2
[root@host201 scripts]# ls /etc/clickhouse-server/
config.xml users.xml

vi config.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 设置log位置
<log>/data/clickhouse/server.log</log>
<errorlog>/data/clickhouse/error.log</errorlog>
# 修改host
<interserver_http_host>host201</interserver_http_host>
# 设置外网访问
<listen_host>0.0.0.0</listen_host>
# 设置数据存在路径
<path>/data/clickhouse/</path>
<!-- Path to temporary data for processing hard queries. -->
<tmp_path>/data/clickhouse/tmp/</tmp_path>
<!-- Directory with user provided files that are accessible by 'file' table function. -->
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
# 设置时区
<timezone>Asia/Shanghai</timezone>
# 在后面添加集群信息
<include_from>/data/metrika.xml</include_from>

分布式集群

CK是如何实现分布式的

  • CK的分布式,完全依赖配置文件,即每个节点,都共享同样的配置文件,这个配置文件里,写了我跟谁是一个cluster的,我自己的名字是啥
  • 如下面的配置文件里,有4个分片,各自用域名来标记,如果需要密码的话,集群也要写上明文密码和用户名

  • 集群怎么用?

    • 答案是指定引擎
    • CK里的引擎有十几个,这里只推荐3个:
      • MergeTree,是CK里最Advanced的引擎,性能超高,单机写入可以达到50w峰值,查询性能非常快,有兴趣看我其他文章
      • ReplicatedMergeTree,基于MergeTree,同时引入ZK,做了复制,下文会说
      • Distributed,分布式引擎,本身不存储数据,可认为就是一张View,如果写入,会把请求丢到集群里的节点(有算法控制),如果查询,会帮你做查询转发再聚合返回
  • 集群文件的配置
    vi metrika.xml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    <yandex>
    <clickhouse_remote_servers>
    <!-- 集群名称 -->
    <ck_cluster>
    <!-- 数据分片1 -->
    <shard>
    <internal_replication>false</internal_replication>
    <replica>
    <default_database>shardt</default_database>
    <host>host201</host>
    <port>9000</port>
    </replica>
    <!--
    <replica>
    <default_database>shardt</default_database>
    <host>host202</host>
    <port>9000</port>
    </replica>
    -->
    </shard>
    <!-- 数据分片2 -->
    <shard>
    <internal_replication>false</internal_replication>
    <replica>
    <default_database>shardt</default_database>
    <host>host202</host>
    <port>9000</port>
    </replica>
    <!--
    <replica>
    <default_database>shardt</default_database>
    <host>host203</host>
    <port>9000</port>
    </replica>
    -->
    </shard>
    <!-- 数据分片2 -->
    <shard>
    <internal_replication>false</internal_replication>
    <replica>
    <default_database>shardt</default_database>
    <host>host203</host>
    <port>9000</port>
    </replica>
    <!--
    <replica>
    <default_database>shardt</default_database>
    <host>host205</host>
    <port>9000</port>
    </replica>
    -->
    </shard>
    <!-- 数据分片2 -->
    <shard>
    <internal_replication>false</internal_replication>
    <replica>
    <default_database>shardt</default_database>
    <host>host205</host>
    <port>9000</port>
    </replica>
    <!--
    <replica>
    <default_database>shardt</default_database>
    <host>host201</host>
    <port>9000</port>
    </replica>
    -->
    </shard>
    </ck_cluster>
    <bip_ck_cluster2>
    <shard>
    <internal_replication>false</internal_replication>
    <replica>
    <default_database>shardt</default_database>
    <host>host202</host>
    <port>9000</port>
    </replica>
    </shard>
    <shard>
    <internal_replication>false</internal_replication>
    <replica>
    <default_database>shardt</default_database>
    <host>host203</host>
    <port>9000</port>
    </replica>
    </shard>
    <shard>
    <internal_replication>false</internal_replication>
    <replica>
    <default_database>shardt</default_database>
    <host>host205</host>
    <port>9000</port>
    </replica>
    </shard>
    </bip_ck_cluster2>
    </clickhouse_remote_servers>
    <!-- 监听网络(貌似重复) -->
    <networks>
    <ip>::/0</ip>
    </networks>
    <!-- ZK -->
    <zookeeper-servers>
    <node index="1">
    <host>host201</host>
    <!-- clickhouse 与 zookeeper的连接,也就要改成了host201 -->
    <port>2181</port>
    </node>
    <node index="2">
    <host>host202</host>
    <!-- clickhouse 与 zookeeper的连接 也就要改成了host202 -->
    <port>2181</port>
    </node>
    <node index="3">
    <host>host203</host>
    <!-- clickhouse 与 zookeeper的连接 也就要改成了host203 -->
    <port>2181</port>
    </node>
    <node index="4">
    <host>host205</host>
    <!-- clickhouse 与 zookeeper的连接 也就要改成了host205 -->
    <port>2181</port>
    </node>
    </zookeeper-servers>
    <clickhouse_compression>
    <case>
    <min_part_size>10000000000</min_part_size>
    <min_part_size_ratio>0.01</min_part_size_ratio>
    <method>lz4</method>
    </case>
    </clickhouse_compression>
    </yandex>

简单分布式方案

  • MergeTree + Distributed

    1
    2
    3
    4
    CREATE TABLE dbname.tablename (date DateTime, ……) ENGINE = MergeTree()
    PARTITION BY toYYYYMMDD(date)

    CREATE TABLE dbname.tablename_all (date DateTime, ……) ENGINE = Distributed(ck_cluster, 'dbname', 'tablename', rand())"

    说明:

    • dbname.tablename为本地表,数据只是在本地
    • dbname.tablename_all为分布式表,查询这个表,引擎自动把整个集群数据计算后返回

分布式+高可用方案

  • ReplicatedMergeTree + Distributed
  • 仅仅是把MergeTree引擎替换为ReplicatedMergeTree引擎
  • ReplicatedMergeTree里,共享同一个ZK路径的表,会相互,注意是,相互同步数据

    1
    2
    3
    CREATE TABLE dbname.tablename (date DateTime, ……) ENGINE = ReplicatedMergeTree('/clickhouse/db/tb/name', 'node_name', date, (date, hour, datetime), 8192)

    CREATE TABLE dbname.tablename_all (date DateTime, ……) ENGINE = Distributed(ck_cluster, 'dbname', 'tablename', rand())"
  • 每个IDC有3个分片,各自占1/3数据

  • 每个节点,依赖ZK,各自有2个副本,这样,就不怕宕机啦~

    CK分布式的问题

  • 数据写入时,该写入哪张表写
    1. 可以写xxx_all,也可以写xxx本地表
    2. 前者由于分布式表的逻辑简单,仅仅是转发请求,所以在转发安全性上,会有风险,并且rand的方式,可能会造成不均衡,建议通过DNS轮训,写本地表,这样最保险和均衡
  • 数据查询时,我们读取哪张表
    1. 毫无疑问,是xxx_all表
  • 集群配置里,我们用了域名,本想着方便切换,但是CK只有在启动的时候,才会做解析
  • 那故障了怎么切换?
    1. CK有一个厉害的地方,节点变动,无需重启,会自动加载
    2. 利用上述特性,我们先去掉一个节点的配置,再加上这个节点的配置(DNS变更后),即可不重启就完成fail over

CK的启动

1
2
3
4
sudo /etc/init.d/clickhouse-server strart/stop/restart

# 如果修改了域名 请以域名登录
clickhouse-client -h host201

常用SQL总结

  1. 创建表

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    DROP TABLE IF EXISTS shardt.nc2csv; 
    CREATE TABLE shardt.nc2csv (
    id Int64,
    province_name String,
    city_name String,
    product_time Datetime,
    lon Float64,
    lat Float64,
    u10 Float64,
    v10 Float64,
    t2 Float64,
    rainc Float64,
    q2 Float64,
    psfc Float64,
    cfract Float64,
    ws Float64,
    wd Float64
    )ENGINE = MergeTree()
    PARTITION BY toYYYYMMDD(product_time)
    PRIMARY Key (id, product_time, province_name, city_name)
    ORDER BY (id, product_time, province_name, city_name)
    SETTINGS index_granularity = 8192;
  2. 导入CSV数据

    1
    2
    3
    time cat test.csv | clickhouse-client -h host201 --port 9000 --format_csv_delimiter="," --input_format_allow_errors_num=1 --input_format_allow_errors_ratio=0.1 --query="INSERT INTO shardt.nc2csv FORMAT CSVWithNames"
    # 或者
    time clickhouse-client -h host201 --port 9000 --format_csv_delimiter="," --input_format_allow_errors_num=1 --input_format_allow_errors_ratio=0.1 --query="INSERT INTO shardt.nc2csv FORMAT CSVWithNames" < test.csv

    说明:

    • 指定分隔符: –format_csv_delimiter=”,”
    • 导入数据时忽略错误:
      • --input_format_allow_errors_num : 是允许的错误数
      • --input_format_allow_errors_ratio : 是允许的错误率, 范围是 [0-1]
    • CSV文件有列名: FORMAT CSVWithNames
  3. 导出 CSV 数据

    1
    clickhouse-client -h host201 --query="select * from shardt.nc2csv_2  WHERE product_time = (toDateTime('2018-03-04 04:00:00')) limit 10 format CSV" > test.csv
  4. 重命名表

    1
    rename table tbl1 to btl2;
  5. 删除表

    1
    drop table tbl;

    注意: 默认情况下, Clickhouse不允许删除分区或表的大小大于 50GB的分区或表. 可以通过修改server的配置文件来永久配置. 也可以临时设置一下来删除而不用重启服务.

    • 永久配置

      1
      2
      3
      4
      5
      sudo vim /etc/clickhouse-server/config.xml
      然后注释掉下面两行
      <!-- <max_table_size_to_drop>0</max_table_size_to_drop> -->
      <!-- <max_partition_size_to_drop>0</max_partition_size_to_drop> -->
      0表示不限制. 或者你可以设置为你想限制的最大的大小.
    • 临时设置

      1
      sudo touch '/home/username/clickhouse/flags/force_drop_table' && sudo chmod 666 '/home/username/clickhouse/flags/force_drop_table'
  6. 添加列

    1
    alter table dsp_statis add column cost UInt32 default 0;
  7. 查看表结构

    1
    desc tbl;
  8. 查看集群信息

    1
    select * from system.clusters;
  9. 执行SQL文件

    1
    clickhouse-client -h host205 -mn < create_cluster.sql
  10. 查看分区信息

    1
    2
    3
    4
    5
    SELECT partition, name, count() AS number_of_parts, formatReadableSize(sum(bytes)) AS sum_size
    FROM system.parts
    WHERE active AND (database = 'xxx') AND (table = 'xxx')
    GROUP BY partition, name
    ORDER BY partition ASC;
  11. 查看表大小

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    SELECT
    database,
    table,
    formatReadableSize(size) AS size,
    formatReadableSize(bytes_on_disk) AS bytes_on_disk,
    formatReadableSize(data_uncompressed_bytes) AS data_uncompressed_bytes,
    formatReadableSize(data_compressed_bytes) AS data_compressed_bytes,
    compress_rate,
    rows,
    days,
    formatReadableSize(avgDaySize) AS avgDaySize
    FROM
    (
    SELECT
    database,
    table,
    sum(bytes) AS size,
    sum(rows) AS rows,
    min(min_date) AS min_date,
    max(max_date) AS max_date,
    sum(bytes_on_disk) AS bytes_on_disk,
    sum(data_uncompressed_bytes) AS data_uncompressed_bytes,
    sum(data_compressed_bytes) AS data_compressed_bytes,
    (data_compressed_bytes / data_uncompressed_bytes) * 100 AS compress_rate,
    max_date - min_date AS days,
    size / (max_date - min_date) AS avgDaySize
    FROM system.parts
    WHERE active
    GROUP BY
    database,
    table
    ORDER BY
    database ASC,
    size DESC
    )