李林超博客
首页
归档
留言
友链
动态
关于
归档
留言
友链
动态
关于
首页
大数据
正文
【转载】06.ClickHouse表引擎之外部集成表引擎
Leefs
2022-04-29 AM
2270℃
0条
[TOC] ### 前言 本篇文章转载于大佬文章:[大数据技术与数仓](https://www.modb.pro/db/79081) ### 一、概述 ClickHouse提供了许多与外部系统集成的方法,包括一些表引擎。这些表引擎与其他类型的表引擎类似,可以用于将外部数据导入到ClickHouse中,或者在ClickHouse中直接操作外部数据源。 例如直接读取HDFS的文件或者MySQL数据库的表。这些表引擎只负责元数据管理和数据查询,而它们自身通常并不负责数据的写入,数据文件直接由外部系统提供。目前ClickHouse提供了下面的外部集成表引擎: - **ODBC**:通过指定odbc连接读取数据源 - **JDBC**:通过指定jdbc连接读取数据源; - **MySQL**:将MySQL作为数据存储,直接查询其数据 - **HDFS**:直接读取HDFS上的特定格式的数据文件; - **Kafka**:将Kafka数据导入ClickHouse - **RabbitMQ**:与Kafka类似 ### 二、HDFS #### 使用方式 ```basic ENGINE = HDFS(URI, format) ``` - URI:HDFS文件路径 - format:文件格式,比如CSV、JSON、TSV等 #### 使用示例 ```sql -- 建表 CREATE TABLE hdfs_engine_table( emp_id UInt16 COMMENT '员工id', name String COMMENT '员工姓名', work_place String COMMENT '工作地点', age UInt8 COMMENT '员工年龄', depart String COMMENT '部门', salary Decimal32(2) COMMENT '工资' ) ENGINE=HDFS('hdfs://cdh03:8020/user/hive/hdfs_engine_table', 'CSV'); -- 写入数据 INSERT INTO hdfs_engine_table VALUES (1,'tom','上海',25,'技术部',20000),(2,'jack','上海',26,'人事部',10000); -- 查询数据 cdh04 :) select * from hdfs_engine_table; SELECT * FROM hdfs_engine_table ┌─emp_id─┬─name─┬─work_place─┬─age─┬─depart─┬───salary─┐ │ 1 │ tom │ 上海 │ 25 │ 技术部 │ 20000.00 │ │ 2 │ jack │ 上海 │ 26 │ 人事部 │ 10000.00 │ └────────┴──────┴────────────┴─────┴────────┴──────────┘ --再在HDFS上其对应的文件,添加几条数据,再次查看 cdh04 :) select * from hdfs_engine_table; SELECT * FROM hdfs_engine_table ┌─emp_id─┬─name───┬─work_place─┬─age─┬─depart─┬───salary─┐ │ 1 │ tom │ 上海 │ 25 │ 技术部 │ 20000.00 │ │ 2 │ jack │ 上海 │ 26 │ 人事部 │ 10000.00 │ │ 3 │ lili │ 北京 │ 28 │ 技术部 │ 20000.00 │ │ 4 │ jasper │ 杭州 │ 27 │ 人事部 │ 8000.00 │ └────────┴────────┴────────────┴─────┴────────┴──────────┘ ``` 可以看出,这种方式与使用Hive类似,我们直接可以将HDFS对应的文件映射成ClickHouse中的一张表,这样就可以使用SQL操作HDFS上的文件了。 值得注意的是:ClickHouse并不能够删除HDFS上的数据,当我们在ClickHouse客户端中删除了对应的表,只是删除了表结构,HDFS上的文件并没有被删除,这一点跟Hive的外部表十分相似。 ### 三、MySQL ClickHouse可以创建一个MySQL数据引擎,这样就可以在ClickHouse中操作其对应的数据库中的数据。其实,ClickHouse同样支持MySQL表引擎,即映射一张MySQL中的表到ClickHouse中。 #### 使用方式 ```sql CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2], ... ) ENGINE = MySQL('host:port', 'database', 'table', 'user', 'password'[, replace_query, 'on_duplicate_clause']); ``` #### 使用示例 ```sql -- 连接MySQL中clickhouse数据库的test表 CREATE TABLE mysql_engine_table( id Int32, name String ) ENGINE = MySQL( '192.168.200.241:3306', 'clickhouse', 'test', 'root', '123qwe'); -- 查询数据 cdh04 :) SELECT * FROM mysql_engine_table; SELECT * FROM mysql_engine_table ┌─id─┬─name──┐ │ 1 │ tom │ │ 2 │ jack │ │ 3 │ lihua │ └────┴───────┘ -- 插入数据,会将数据插入MySQL对应的表中 -- 所以当查询MySQL数据时,会发现新增了一条数据 INSERT INTO mysql_engine_table VALUES(4,'robin'); -- 再次查询 cdh04 :) select * from mysql_engine_table; SELECT * FROM mysql_engine_table ┌─id─┬─name──┐ │ 1 │ tom │ │ 2 │ jack │ │ 3 │ lihua │ │ 4 │ robin │ └────┴───────┘ ``` **注意**:对于MySQL表引擎,不支持UPDATE和DELETE操作,比如执行下面命令时,会报错: ```sql -- 执行更新 ALTER TABLE mysql_engine_table UPDATE name = 'hanmeimei' WHERE id = 1; -- 执行删除 ALTER TABLE mysql_engine_table DELETE WHERE id = 1; -- 报错 DB::Exception: Mutations are not supported by storage MySQL. ``` ### 四、JDBC #### 使用方式 JDBC表引擎不仅可以对接MySQL数据库,还能够与PostgreSQL等数据库。为了实现JDBC连接,ClickHouse使用了**clickhouse-jdbc-bridge**的查询代理服务。 首先我们需要下载clickhouse-jdbc-bridge,然后按照ClickHouse的github中的步骤进行编译,编译完成之后会有一个**clickhouse-jdbc-bridge-1.0.jar**的jar文件,除了需要该文件之外,还需要JDBC的驱动文件,本文使用的是MySQL,所以还需要下载MySQL驱动包。将MySQL的驱动包和**clickhouse-jdbc-bridge-1.0.jar**文件放在了/opt/softwares路径下,执行如下命令: ```shell [root@cdh04 softwares]# java -jar clickhouse-jdbc-bridge-1.0.jar --driver-path . --listen-host cdh04 ``` 其中`--driver-path`是MySQL驱动的jar所在的路径,`listen-host`是代理服务绑定的主机。默认情况下,绑定的端口是:**9019**。 然后我们再配置`/etc/clickhouse-server/config.xml`,在文件中添加如下配置,然后重启服务。 ```xml
cdh04
9019
``` #### 使用示例 - 直接查询MySQL中对应的表 ```sql SELECT * FROM jdbc( 'jdbc:mysql://192.168.200.241:3306/?user=root&password=123qwe', 'clickhouse', 'test'); ``` - 创建一张映射表 ```sql -- 语法 CREATE TABLE [IF NOT EXISTS] [db.]table_name ( columns list... ) ENGINE = JDBC(dbms_uri, external_database, external_table) -- MySQL建表 CREATE TABLE jdbc_table_mysql ( order_id INT NOT NULL AUTO_INCREMENT, amount FLOAT NOT NULL, PRIMARY KEY (order_id)); INSERT INTO jdbc_table_mysql VALUES (1,200); -- 在ClickHouse中建表 CREATE TABLE jdbc_table ( order_id Int32, amount Float32 ) ENGINE JDBC( 'jdbc:mysql://192.168.200.241:3306/?user=root&password=123qwe', 'clickhouse', 'jdbc_table_mysql'); -- 查询数据 cdh04 :) select * from jdbc_table; SELECT * FROM jdbc_table ┌─order_id─┬─amount─┐ │ 1 │ 200 │ └──────────┴────────┘ ``` ### 五、Kafka #### 使用方式 ```sql CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'host:port', kafka_topic_list = 'topic1,topic2,...', kafka_group_name = 'group_name', kafka_format = 'data_format'[,] [kafka_row_delimiter = 'delimiter_symbol',] [kafka_schema = '',] [kafka_num_consumers = N,] [kafka_max_block_size = 0,] [kafka_skip_broken_messages = N,] [kafka_commit_every_batch = 0,] [kafka_thread_per_consumer = 0] ``` - `kafka_broker_list`:逗号分隔的brokers地址 (localhost:9092). - `kafka_topic_list`:Kafka 主题列表,多个主题用逗号分隔. - `kafka_group_name`:消费者组. - `kafka_format`– Message format. 比如`JSONEachRow`、JSON、CSV等等 #### 使用示例 在kafka中创建ck_topic主题,并向该主题写入数据 ```sql CREATE TABLE kafka_table ( id UInt64, name String ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'cdh04:9092', kafka_topic_list = 'ck_topic', kafka_group_name = 'group1', kafka_format = 'JSONEachRow' ; -- 查询 cdh04 :) select * from kafka_table ; SELECT * FROM kafka_table ┌─id─┬─name─┐ │ 1 │ tom │ └────┴──────┘ ┌─id─┬─name─┐ │ 2 │ jack │ └────┴──────┘ ``` #### 注意点 当我们一旦查询完毕之后,ClickHouse会删除表内的数据,其实Kafka表引擎只是一个数据管道,我们可以通过物化视图的方式访问Kafka中的数据。 - 首先创建一张Kafka表引擎的表,用于从Kafka中读取数据 - 然后再创建一张普通表引擎的表,比如MergeTree,面向终端用户使用 - 最后创建物化视图,用于将Kafka引擎表实时同步到终端用户所使用的表中 ```sql -- 创建Kafka引擎表 CREATE TABLE kafka_table_consumer ( id UInt64, name String ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'cdh04:9092', kafka_topic_list = 'ck_topic', kafka_group_name = 'group1', kafka_format = 'JSONEachRow' ; -- 创建一张终端用户使用的表 CREATE TABLE kafka_table_mergetree ( id UInt64 , name String )ENGINE=MergeTree() ORDER BY id ; -- 创建物化视图,同步数据 CREATE MATERIALIZED VIEW consumer TO kafka_table_mergetree AS SELECT id,name FROM kafka_table_consumer ; -- 查询,多次查询,已经被查询的数据依然会被输出 cdh04 :) select * from kafka_table_mergetree; SELECT * FROM kafka_table_mergetree ┌─id─┬─name─┐ │ 2 │ jack │ └────┴──────┘ ┌─id─┬─name─┐ │ 1 │ tom │ └────┴──────┘ ``` ### 六、其他特殊的表引擎 #### Memory表引擎 Memory表引擎直接将数据保存在内存中,数据既不会被压缩也不会被格式转换。当ClickHouse服务重启的时候,Memory表内的数据会全部丢失。一般在测试时使用。 ```sql CREATE TABLE table_memory ( id UInt64, name String ) ENGINE = Memory(); ``` #### Distributed表引擎 ##### 使用方式 Distributed表引擎是分布式表的代名词,它自身不存储任何数据,数据都分散存储在某一个分片上,能够自动路由数据至集群中的各个节点,所以Distributed表引擎需要和其他数据表引擎一起协同工作。 所以,一张分布式表底层会对应多个本地分片数据表,由具体的分片表存储数据,分布式表与分片表是**一对多的关系** **Distributed表引擎的定义形式如下所示** ```sql Distributed(cluster_name, database_name, table_name[, sharding_key]) ``` 各个参数的含义分别如下: - **cluster_name**:集群名称,与集群配置中的自定义名称相对应。 - **database_name**:数据库名称 - **table_name**:表名称 - **sharding_key**:可选的,用于分片的key值,在数据写入的过程中,分布式表会依据分片key的规则,将数据分布到各个节点的本地表。 > 提示: > > 创建分布式表是**读时检查的机制**,也就是说对**创建分布式表和本地表的顺序并没有强制要求**。 > > 同样值得注意的是,在上面的语句中使用了ON CLUSTER分布式DDL,这意味着在集群的每个分片节点上,都会创建一张Distributed表,这样便可以从其中任意一端发起对所有分片的读、写请求。 ##### 使用示例 ```sql -- 创建一张分布式表 CREATE TABLE IF NOT EXISTS user_cluster ON CLUSTER cluster_3shards_1replicas ( id Int32, name String )ENGINE = Distributed(cluster_3shards_1replicas, default, user_local,id); ``` 创建完成上面的分布式表时,在每台机器上查看表,发现每台机器上都存在一张刚刚创建好的表。 接下来就需要创建本地表了,在每台机器上分别创建一张本地表: ```sql CREATE TABLE IF NOT EXISTS user_local ( id Int32, name String )ENGINE = MergeTree() ORDER BY id PARTITION BY id PRIMARY KEY id; ``` 我们先在一台机器上,对user_local表进行插入数据,然后再查询user_cluster表 ```sql -- 插入数据 cdh04 :) INSERT INTO user_local VALUES(1,'tom'),(2,'jack'); -- 查询user_cluster表,可见通过user_cluster表可以操作所有的user_local表 cdh04 :) select * from user_cluster; ┌─id─┬─name─┐ │ 2 │ jack │ └────┴──────┘ ┌─id─┬─name─┐ │ 1 │ tom │ └────┴──────┘ ``` 接下来,我们再向user_cluster中插入一些数据,观察user_local表数据变化,可以发现数据被分散存储到了其他节点上了。 ```sql -- 向user_cluster插入数据 cdh04 :) INSERT INTO user_cluster VALUES(3,'lilei'),(4,'lihua'); -- 查看user_cluster数据 cdh04 :) select * from user_cluster; ┌─id─┬─name─┐ │ 2 │ jack │ └────┴──────┘ ┌─id─┬─name──┐ │ 3 │ lilei │ └────┴───────┘ ┌─id─┬─name─┐ │ 1 │ tom │ └────┴──────┘ ┌─id─┬─name──┐ │ 4 │ lihua │ └────┴───────┘ -- 在cdh04上查看user_local cdh04 :) select * from user_local; ┌─id─┬─name─┐ │ 2 │ jack │ └────┴──────┘ ┌─id─┬─name──┐ │ 3 │ lilei │ └────┴───────┘ ┌─id─┬─name─┐ │ 1 │ tom │ └────┴──────┘ -- 在cdh05上查看user_local cdh05 :) select * from user_local; ┌─id─┬─name──┐ │ 4 │ lihua │ └────┴───────┘ ``` ### 总结 ClickHouse提供了非常多的表引擎,每一种表引擎都有各自的适用场景。通过特定的表引擎支撑特定的场景,十分灵活。
标签:
ClickHouse
非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:
https://lilinchao.com/archives/2036.html
上一篇
【转载】05.ClickHouse表引擎之MergeTree系列引擎
下一篇
07.ClickHouse之SQL操作
评论已关闭
栏目分类
随笔
2
Java
326
大数据
229
工具
31
其它
25
GO
47
NLP
4
标签云
Java阻塞队列
Spark Core
数学
数据结构和算法
数据结构
Thymeleaf
SQL练习题
Kibana
Flink
锁
ajax
机器学习
并发编程
Redis
pytorch
Spark
Http
GET和POST
Stream流
Docker
Elasticsearch
国产数据库改造
MyBatisX
容器深入研究
Ubuntu
Beego
HDFS
ClickHouse
Eclipse
Azkaban
友情链接
申请
范明明
庄严博客
Mx
陶小桃Blog
虫洞
评论已关闭