Like Share Discussion Bookmark Smile

J.J. Huang   2023-10-19   Clickhouse RabbitMQ   瀏覽次數:次   DMCA.com Protection Status

ClickHouse 對 RabbitMQ:問題篇

前言

前兩篇示範了如何使用ClickHouse來消費/推送RabbitMQ,這是一個強大的組合,可用於處理大量的實時資料和執行複雜的資料分析。然而,所提供的示範僅是一個簡單的入門,而這個領域有很多更深入和高級的主題。

這個組合的應用範圍非常廣泛,從資料分析到實時監控,因此它可能對不同領域的專業人士都有價值。繼續學習和探索,並享受使用ClickHouseRabbitMQ來處理資料的過程。

如果您在實施類似的解決方案時遇到疑問或問題,不要害怕尋求幫助。ClickHouseRabbitMQ都有廣泛的文檔和社區支持,您可以在網路上找到豐富的資源,幫助您解決任何挑戰。此外,許多問題都有可能在技術論壇、博客文章和教程中找到答案。

舉一反三

以下為我列出來在應用上思考到的問題或是「坑」,並實際操作找出問題的解答或是方案。

問題:RabbitMQ Engine 消費/推送兩個物化視圖,使用同一個來源表與目標表會發生什麼事?

解答:可以看到當到第「7」步驟後,會再次觸發上方的第「2」步驟,這將導致一個無限循環,因為訊息的循環傳遞會一直觸發。需要注意避免無限循環。

流程示意圖

問題:RabbitMQ Engine 消費/推送,物化視圖來源與目的是否可以跨資料庫?

解答:可以!以下是一個簡單範例。

流程示意圖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
-- 創建 common 資料庫,如果不存在
CREATE DATABASE IF NOT EXISTS common;;


-- 選擇使用 common 資料庫
USE common;;


-- 創建名為 'rabbitmq_entry' 的表格
CREATE TABLE IF NOT EXISTS common.demo_rabbitmq_entry
(
`id` UInt32, -- 事件 ID
`body` String, -- 事件內容
`update_at` DateTime -- 更新時間
) ENGINE = RabbitMQ -- 使用 RabbitMQ 引擎
SETTINGS
rabbitmq_host_port = 'rabbitmq:5672', -- RabbitMQ 伺服器的主機和端口
rabbitmq_exchange_name = 'clickhouse-exchange', -- RabbitMQ 交換機名稱
rabbitmq_routing_key_list = 'my_routing_key', -- RabbitMQ 路由鍵
rabbitmq_format = 'JSONEachRow', -- RabbitMQ 資料的格式為JSONEachRow
rabbitmq_exchange_type = 'fanout', -- RabbitMQ 交換機類型為fanout
rabbitmq_num_consumers = 1, -- RabbitMQ 消費者數量為1
rabbitmq_persistent = 1, -- RabbitMQ 消息持久性為1
rabbitmq_queue_base = 'demo', -- RabbitMQ 佇列基礎名稱
rabbitmq_username = 'guest', -- RabbitMQ 用戶名
rabbitmq_password = 'guest'; -- RabbitMQ 密碼


-----------------------------------------

-- 創建 source 資料庫,如果不存在
CREATE DATABASE IF NOT EXISTS source;;


-- 選擇使用 source 資料庫
USE source;;


-- 創建名為 'event' 的表格
CREATE TABLE IF NOT EXISTS source.event (
`id` UInt32, -- 事件 ID
`body` String, -- 事件內容
`update_at` DateTime -- 更新時間
) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/source/event', '{replica}', update_at)
ORDER BY update_at;;


-- 創建名為 'v_event_to_rabbitmq' 的物化視圖,將 'source.event' 表的資料映射到 'common.demo_rabbitmq_entry' 表
CREATE MATERIALIZED VIEW IF NOT EXISTS source.v_event_to_rabbitmq TO common.demo_rabbitmq_entry
AS
SELECT
"id", -- 事件 ID
"body", -- 事件內容
"update_at" -- 更新時間
FROM source.event;

-----------------------------------------

-- 創建 sink 資料庫,如果不存在
CREATE DATABASE IF NOT EXISTS sink;;


-- 選擇使用 sink 資料庫
USE sink;;


-- 創建名為 'event' 的表格
CREATE TABLE IF NOT EXISTS sink.event (
`id` UInt32, -- 事件 ID
`body` String, -- 事件內容
`update_at` DateTime -- 更新時間
) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/sink/event', '{replica}', update_at)
ORDER BY update_at;;


-- 創建名為 'v_rabbitmq_to_event' 的物化視圖,將 RabbitMQ 資料映射到 'sink.event' 表
CREATE MATERIALIZED VIEW IF NOT EXISTS sink.v_rabbitmq_to_event TO sink.event
AS
SELECT
id AS id, -- 事件 ID
body AS body, -- 事件內容
now() AS update_at -- 現在的時間作為更新時間
FROM common.demo_rabbitmq_entry;

-----------------------------------------

-- 針對 source 資料庫的 event 表做資料寫入。
INSERT INTO source.event
(id, body, update_at)
VALUES(1, 'foo-bar', now());

註:以上參考了
Docker
ClickHouse
ClickHouse - RabbitMQ Engine
博客園 - 渐逝的星光 - clickhouse使用rabbitmq进行实时订阅消费
知乎 - 张琼芳 - 理解 RabbitMQ Exchange
Family with 220 icons by inipagi