Like Share Discussion Bookmark Smile

J.J. Huang   2023-10-18   Clickhouse RabbitMQ   瀏覽次數:次   DMCA.com Protection Status

ClickHouse 對 RabbitMQ:推送篇

原由

我們的需求主要源於對Change Data Capture(CDC)功能的需求,這種功能允許我們實時捕獲和處理資料更改。CDC在現代資料處理中變得越來越重要,因為它可以用於許多關鍵業務案例,如實時監控、資料同步和資料分析。

為了實現CDC,我們尋找了合適的工具和技術,而ClickHouse提供的PublishRabbitMQEngine正是我們所需要的功能。這個Engine允許我們將資料更改以實時方式發布到RabbitMQ消息隊列,然後可以對這些消息進行進一步處理、分析或同步到其他資料存儲系統。這使我們能夠快速且可靠地響應資料更改,並滿足我們的業務需求。

總而言之,選擇了ClickHouseCDC功能,因為它能夠有效地實現我們的實時資料捕獲需求,並為我們提供了一個強大的工具,用於處理資料更改,並支持多種關鍵業務用例。


Docker Compose

  • docker-compose.yml
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    version: "3.0"
    services:
    clickhouse01:
    container_name: clickhouse-01
    image: clickhouse/clickhouse-server:23.5.3.24
    ulimits:
    nproc: 65535
    nofile:
    soft: 262144
    hard: 262144
    depends_on:
    - zookeeper
    volumes:
    - "./clickhouse-01:/var/lib/clickhouse"
    - "./clickhouse-server-01:/etc/clickhouse-server"
    - "./clickhouse-log-01:/var/log/clickhouse-server"
    ports:
    - "8123:8123"
    - "9011:9011"
    - "9004:9004" # mysql
    - "9005:9005" # postgres
    networks:
    - clickhouse-cluster

    clickhouse02:
    container_name: clickhouse-02
    image: clickhouse/clickhouse-server:23.5.3.24
    ulimits:
    nproc: 65535
    nofile:
    soft: 262144
    hard: 262144
    depends_on:
    - zookeeper
    volumes:
    - "./clickhouse-02:/var/lib/clickhouse"
    - "./clickhouse-server-02:/etc/clickhouse-server"
    - "./clickhouse-log-02:/var/log/clickhouse-server"
    ports:
    - "8223:8223"
    - "9211:9211"
    - "9204:9204" # mysql
    - "9205:9205" # postgres
    networks:
    - clickhouse-cluster

    zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zkserver
    deploy:
    resources:
    limits:
    memory: 512M
    ports:
    - "2181:2181"
    environment:
    ZOOKEEPER_SERVER_ID: 1
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 5000
    ZOOKEEPER_INIT_LIMIT: 5
    ZOOKEEPER_SYNC_LIMIT: 2
    volumes:
    - ./zookeeper_data:/var/lib/zookeeper/data
    - ./zookeeper_log:/var/lib/zookeeper/log
    - ./zookeeper_secrets:/etc/zookeeper/secrets
    networks:
    - clickhouse-cluster

    tabix-web-client:
    container_name: tabix-web-client
    image: spoonest/clickhouse-tabix-web-client
    depends_on:
    - clickhouse01
    - clickhouse02
    ports:
    - "8080:80"
    networks:
    - clickhouse-cluster

    rabbitmq:
    image: rabbitmq:3.12-management
    hostname: rabbitmq
    container_name: rabbitmq
    environment:
    - RABBITMQ_DEFAULT_USER=guest
    - RABBITMQ_DEFAULT_PASS=guest
    volumes:
    - ./rabbitmq/data:/var/lib/rabbitmq
    - ./rabbitmq/log:/var/log/rabbitmq
    ports:
    - "5672:5672"
    - "15672:15672"
    depends_on:
    - clickhouse01
    - clickhouse02
    networks:
    - clickhouse-cluster

    networks:
    clickhouse-cluster:
    關於Docker compose的詳細說明與相關的設定檔案配置,請參閱Docker Compose - ClickHouse (cluster - CK)Docker Compose - RabbitMQ

流程示意圖

流程示意圖

Run & Test

  • 此處是使用瀏覽器開啟Clickhouse-Web-Client + RabbitMQ-WebUI進行測試。

    1
    2
    3
    4
    5
    # Clickhouse-Web-Client
    http://127.0.0.1:8080

    # RabbitMQ-WebUI
    http://127.0.0.1:15672
  • 首先於clickhouse-01clickhouse-02建立資料庫、分布式表、RabbitMQ EngineMATERIALIZED View

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    -- 創建 test 資料庫,如果不存在
    CREATE DATABASE IF NOT EXISTS test;;

    -- 選擇使用 test 資料庫
    USE test;;

    -- 創建名為 'event' 的表格
    CREATE TABLE IF NOT EXISTS test.event (
    `id` UInt32, -- 事件 ID
    `body` String, -- 事件內容
    `update_at` DateTime -- 更新時間
    ) ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/test/event', '{replica}', update_at)
    ORDER BY update_at;;

    -- 創建名為 'rabbitmq_entry' 的表格
    CREATE TABLE IF NOT EXISTS test.rabbitmq_entry
    (
    `id` UInt32, -- 事件 ID
    `body` String, -- 事件內容
    `update_at` DateTime -- 更新時間
    ) ENGINE = RabbitMQ -- 使用 RabbitMQ 引擎
    SETTINGS
    rabbitmq_host_port = 'rabbitmq:5672', -- RabbitMQ 伺服器的主機和端口
    rabbitmq_exchange_name = 'clickhouse-exchange', -- RabbitMQ 交換機名稱
    rabbitmq_routing_key_list = 'my_routing_key', -- RabbitMQ 路由鍵
    rabbitmq_format = 'JSONEachRow', -- RabbitMQ 資料的格式為JSONEachRow
    rabbitmq_exchange_type = 'fanout', -- RabbitMQ 交換機類型為fanout
    rabbitmq_num_consumers = 1, -- RabbitMQ 消費者數量為1
    rabbitmq_persistent = 1, -- RabbitMQ 消息持久性為1
    rabbitmq_queue_base = 'hello', -- RabbitMQ 佇列基礎名稱
    rabbitmq_username = 'guest', -- RabbitMQ 用戶名
    rabbitmq_password = 'guest'; -- RabbitMQ 密碼


    -- 創建名為 'v_event_to_rabbitmq' 的物化視圖,將 'event' 表的據映射到 'rabbitmq_entry' 表
    CREATE MATERIALIZED VIEW IF NOT EXISTS test.v_event_to_rabbitmq TO test.rabbitmq_entry
    AS
    SELECT
    "id", -- 事件 ID
    "body", -- 事件內容
    "update_at" -- 更新時間
    FROM test.event;

    註:關於 RabbitMQ ENGINE 的 SETTINGS 可以參考RabbitMQ Engine - Optional parameters

  • 使用Insert into語法,寫入資料至event表內

    1
    2
    3
    INSERT INTO test.event
    (id, body, update_at)
    VALUES(1, 'foo-bar', now());;

  • 後續使用RabbitMQ-WebUI進行取得訊息,確定訊息是有被推送。


註:以上參考了
Docker
ClickHouse
ClickHouse - RabbitMQ Engine
博客園 - 渐逝的星光 - clickhouse使用rabbitmq进行实时订阅消费
知乎 - 张琼芳 - 理解 RabbitMQ Exchange