ClickHouse 對 RabbitMQ:推送篇
原由
我們的需求主要源於對Change Data Capture(CDC)
功能的需求,這種功能允許我們實時捕獲和處理資料更改。CDC
在現代資料處理中變得越來越重要,因為它可以用於許多關鍵業務案例,如實時監控、資料同步和資料分析。
為了實現CDC
,我們尋找了合適的工具和技術,而ClickHouse
提供的Publish
到RabbitMQ
的Engine
正是我們所需要的功能。這個Engine
允許我們將資料更改以實時方式發布到RabbitMQ
消息隊列,然後可以對這些消息進行進一步處理、分析或同步到其他資料存儲系統。這使我們能夠快速且可靠地響應資料更改,並滿足我們的業務需求。
總而言之,選擇了ClickHouse
的CDC
功能,因為它能夠有效地實現我們的實時資料捕獲需求,並為我們提供了一個強大的工具,用於處理資料更改,並支持多種關鍵業務用例。
Docker Compose
- docker-compose.yml關於
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100version: "3.0"
services:
clickhouse01:
container_name: clickhouse-01
image: clickhouse/clickhouse-server:23.5.3.24
ulimits:
nproc: 65535
nofile:
soft: 262144
hard: 262144
depends_on:
- zookeeper
volumes:
- "./clickhouse-01:/var/lib/clickhouse"
- "./clickhouse-server-01:/etc/clickhouse-server"
- "./clickhouse-log-01:/var/log/clickhouse-server"
ports:
- "8123:8123"
- "9011:9011"
- "9004:9004" # mysql
- "9005:9005" # postgres
networks:
- clickhouse-cluster
clickhouse02:
container_name: clickhouse-02
image: clickhouse/clickhouse-server:23.5.3.24
ulimits:
nproc: 65535
nofile:
soft: 262144
hard: 262144
depends_on:
- zookeeper
volumes:
- "./clickhouse-02:/var/lib/clickhouse"
- "./clickhouse-server-02:/etc/clickhouse-server"
- "./clickhouse-log-02:/var/log/clickhouse-server"
ports:
- "8223:8223"
- "9211:9211"
- "9204:9204" # mysql
- "9205:9205" # postgres
networks:
- clickhouse-cluster
zookeeper:
image: confluentinc/cp-zookeeper:7.4.0
container_name: zkserver
deploy:
resources:
limits:
memory: 512M
ports:
- "2181:2181"
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 5000
ZOOKEEPER_INIT_LIMIT: 5
ZOOKEEPER_SYNC_LIMIT: 2
volumes:
- ./zookeeper_data:/var/lib/zookeeper/data
- ./zookeeper_log:/var/lib/zookeeper/log
- ./zookeeper_secrets:/etc/zookeeper/secrets
networks:
- clickhouse-cluster
tabix-web-client:
container_name: tabix-web-client
image: spoonest/clickhouse-tabix-web-client
depends_on:
- clickhouse01
- clickhouse02
ports:
- "8080:80"
networks:
- clickhouse-cluster
rabbitmq:
image: rabbitmq:3.12-management
hostname: rabbitmq
container_name: rabbitmq
environment:
- RABBITMQ_DEFAULT_USER=guest
- RABBITMQ_DEFAULT_PASS=guest
volumes:
- ./rabbitmq/data:/var/lib/rabbitmq
- ./rabbitmq/log:/var/log/rabbitmq
ports:
- "5672:5672"
- "15672:15672"
depends_on:
- clickhouse01
- clickhouse02
networks:
- clickhouse-cluster
networks:
clickhouse-cluster:Docker compose
的詳細說明與相關的設定檔案配置,請參閱Docker Compose - ClickHouse (cluster - CK)與Docker Compose - RabbitMQ。
流程示意圖
Run & Test
此處是使用瀏覽器開啟
Clickhouse-Web-Client
+RabbitMQ-WebUI
進行測試。1
2
3
4
5# Clickhouse-Web-Client
http://127.0.0.1:8080
# RabbitMQ-WebUI
http://127.0.0.1:15672首先於
clickhouse-01
和clickhouse-02
建立資料庫、分布式表、RabbitMQ Engine
和MATERIALIZED View
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42-- 創建 test 資料庫,如果不存在
CREATE DATABASE IF NOT EXISTS test;;
-- 選擇使用 test 資料庫
USE test;;
-- 創建名為 'event' 的表格
CREATE TABLE IF NOT EXISTS test.event (
`id` UInt32, -- 事件 ID
`body` String, -- 事件內容
`update_at` DateTime -- 更新時間
) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/test/event', '{replica}', update_at)
ORDER BY update_at;;
-- 創建名為 'rabbitmq_entry' 的表格
CREATE TABLE IF NOT EXISTS test.rabbitmq_entry
(
`id` UInt32, -- 事件 ID
`body` String, -- 事件內容
`update_at` DateTime -- 更新時間
) ENGINE = RabbitMQ -- 使用 RabbitMQ 引擎
SETTINGS
rabbitmq_host_port = 'rabbitmq:5672', -- RabbitMQ 伺服器的主機和端口
rabbitmq_exchange_name = 'clickhouse-exchange', -- RabbitMQ 交換機名稱
rabbitmq_routing_key_list = 'my_routing_key', -- RabbitMQ 路由鍵
rabbitmq_format = 'JSONEachRow', -- RabbitMQ 資料的格式為JSONEachRow
rabbitmq_exchange_type = 'fanout', -- RabbitMQ 交換機類型為fanout
rabbitmq_num_consumers = 1, -- RabbitMQ 消費者數量為1
rabbitmq_persistent = 1, -- RabbitMQ 消息持久性為1
rabbitmq_queue_base = 'hello', -- RabbitMQ 佇列基礎名稱
rabbitmq_username = 'guest', -- RabbitMQ 用戶名
rabbitmq_password = 'guest'; -- RabbitMQ 密碼
-- 創建名為 'v_event_to_rabbitmq' 的物化視圖,將 'event' 表的據映射到 'rabbitmq_entry' 表
CREATE MATERIALIZED VIEW IF NOT EXISTS test.v_event_to_rabbitmq TO test.rabbitmq_entry
AS
SELECT
"id", -- 事件 ID
"body", -- 事件內容
"update_at" -- 更新時間
FROM test.event;註:關於 RabbitMQ ENGINE 的 SETTINGS 可以參考RabbitMQ Engine - Optional parameters
使用
Insert into
語法,寫入資料至event
表內1
2
3INSERT INTO test.event
(id, body, update_at)
VALUES(1, 'foo-bar', now());;後續使用
RabbitMQ-WebUI
進行取得訊息,確定訊息是有被推送。
註:以上參考了
Docker
ClickHouse
ClickHouse - RabbitMQ Engine
博客園 - 渐逝的星光 - clickhouse使用rabbitmq进行实时订阅消费
知乎 - 张琼芳 - 理解 RabbitMQ Exchange