-- ClickHouse demo pipeline: source.event -> common.demo_rabbitmq_entry (RabbitMQ engine) -> sink.event
-- Database holding the RabbitMQ-engine table referenced by both materialized
-- views in this script.
CREATE DATABASE IF NOT EXISTS common;;

-- Make `common` the session default. Later statements in this script use
-- fully qualified names, so this only affects unqualified references.
USE common;;
-- RabbitMQ-engine table bound to the 'clickhouse-exchange' exchange on
-- rabbitmq:5672. Messages use the JSONEachRow format with the three columns
-- below. source.v_event_to_rabbitmq writes into this table and
-- sink.v_rabbitmq_to_event reads from it.
--
-- NOTE(review): the exchange type is 'fanout'. Per the ClickHouse RabbitMQ
-- engine documentation a fanout exchange routes regardless of keys, so
-- rabbitmq_routing_key_list presumably has no effect here -- confirm.
-- NOTE(review): broker credentials are hard-coded as guest/guest; acceptable
-- for a local demo only -- confirm this script is not used elsewhere.
CREATE TABLE IF NOT EXISTS common.demo_rabbitmq_entry
(
    `id`        UInt32,
    `body`      String,
    `update_at` DateTime
)
ENGINE = RabbitMQ
SETTINGS
    rabbitmq_host_port        = 'rabbitmq:5672',
    rabbitmq_exchange_name    = 'clickhouse-exchange',
    rabbitmq_routing_key_list = 'my_routing_key',
    rabbitmq_format           = 'JSONEachRow',
    rabbitmq_exchange_type    = 'fanout',
    rabbitmq_num_consumers    = 1,
    rabbitmq_persistent       = 1,
    rabbitmq_queue_base       = 'demo',
    rabbitmq_username         = 'guest',
    rabbitmq_password         = 'guest';
-- Database for the producing side of the pipeline: it holds the `event`
-- table and the materialized view that forwards its rows to RabbitMQ.
CREATE DATABASE IF NOT EXISTS source;;

-- Switch the session default database to `source`.
USE source;;
-- Producer-side event table: a replicated ReplacingMergeTree whose version
-- column is `update_at`.
--
-- The sorting key is `id`, so background merges keep, for each id, the row
-- with the greatest `update_at`. The previous key (`ORDER BY update_at`) was
-- the same column as the version, which made the version a no-op and let
-- merges collapse rows with different ids that shared a timestamp.
--
-- NOTE(review): IF NOT EXISTS will not change a table that was already
-- created with the old sorting key; such a table has to be recreated.
CREATE TABLE IF NOT EXISTS source.event
(
    `id`        UInt32,
    `body`      String,
    `update_at` DateTime
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/source/event', '{replica}', update_at)
ORDER BY id;;
-- Forwards rows inserted into source.event to the RabbitMQ-engine table
-- common.demo_rabbitmq_entry. All three columns are passed through unchanged.
CREATE MATERIALIZED VIEW IF NOT EXISTS source.v_event_to_rabbitmq
TO common.demo_rabbitmq_entry
AS
SELECT
    "id",
    "body",
    "update_at"
FROM source.event;
-- Database for the consuming side of the pipeline: it holds the `event`
-- table and the materialized view that fills it from RabbitMQ.
CREATE DATABASE IF NOT EXISTS sink;;

-- Switch the session default database to `sink`.
USE sink;;
-- Consumer-side event table: a replicated ReplacingMergeTree whose version
-- column is `update_at`. It is filled by sink.v_rabbitmq_to_event.
--
-- The sorting key is `id`, so background merges keep, for each id, the row
-- with the greatest `update_at`. The previous key (`ORDER BY update_at`) was
-- the same column as the version. Because the feeding view stamps rows with
-- now(), rows with different ids that received the same timestamp could be
-- collapsed into a single row on merge.
--
-- NOTE(review): IF NOT EXISTS will not change a table that was already
-- created with the old sorting key; such a table has to be recreated.
CREATE TABLE IF NOT EXISTS sink.event
(
    `id`        UInt32,
    `body`      String,
    `update_at` DateTime
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/sink/event', '{replica}', update_at)
ORDER BY id;;
-- Copies rows read from the RabbitMQ-engine table common.demo_rabbitmq_entry
-- into sink.event. `id` and `body` are passed through; the message's own
-- `update_at` is not selected and is replaced by now().
CREATE MATERIALIZED VIEW IF NOT EXISTS sink.v_rabbitmq_to_event
TO sink.event
AS
SELECT
    id,
    body,
    now() AS update_at
FROM common.demo_rabbitmq_entry;
-- Seed a single row into source.event, stamped with the current time.
-- source.v_event_to_rabbitmq is defined on this table, so the insert also
-- feeds common.demo_rabbitmq_entry.
INSERT INTO source.event (id, body, update_at)
VALUES (1, 'foo-bar', now());