The approach in the previous post (https://kimdubi.github.io/postgresql/pg_logical_replication_1/) copies all of the source DB's data when the subscription is enabled on the target DB, so the load of that initial copy can affect the production environment.
The method described below avoids the initial data sync and starts replication from the data newly created on the source DB.
## Create the source publication and replication slot
```
testdb=> CREATE PUBLICATION dbatest_repl01 FOR ALL TABLES;
CREATE PUBLICATION

testdb=> SELECT pg_create_logical_replication_slot('dbatest_repl01', 'pgoutput');
 pg_create_logical_replication_slot
------------------------------------
 (dbatest_repl01,98/517BB930)
(1 row)
```
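The returned LSN (98/517BB930) is where the slot starts retaining WAL. A quick way to inspect it afterwards (a small check I'm adding, using the standard catalog view):

```
-- restart_lsn / confirmed_flush_lsn show how much WAL the source
-- is now retaining on behalf of this slot
SELECT slot_name, plugin, restart_lsn, confirmed_flush_lsn
FROM pg_replication_slots
WHERE slot_name = 'dbatest_repl01';
```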
## Create and restore a snapshot of the source DB
- Create a new cluster via snapshot restore / cluster clone
- The snapshot must be taken after the replication slot was created on the source
- Check the WAL LSN in the restored cluster's log
```
2022-09-04 13:13:59 UTC::@:[15996]:LOG: starting PostgreSQL 13.7 on aarch64-unknown-linux-gnu, compiled by aarch64-unknown-linux-gnu-gcc (GCC) 7.4.0, 64-bit
2022-09-04 13:13:59 UTC::@:[15996]:LOG: listening on IPv4 address "0.0.0.0", port 6432
2022-09-04 13:13:59 UTC::@:[15996]:LOG: listening on IPv6 address "::", port 6432
2022-09-04 13:13:59 UTC::@:[15996]:LOG: listening on Unix socket "/tmp/.s.PGSQL.6432"
2022-09-04 13:13:59 UTC::@:[15996]:LOG: Waiting for runtime initialization complete...
2022-09-04 13:14:02 UTC:[local]:rdsadmin@rdsadmin:[16228]:FATAL: the database system is starting up
2022-09-04 13:14:02 UTC::@:[16227]:LOG: database system was interrupted; last known up at 2022-09-04 13:12:56 UTC
2022-09-04 13:14:02 UTC::@:[16227]:LOG: invalid record length at 98/517F9318: wanted 24, got 0
2022-09-04 13:14:02 UTC:[local]:rdsadmin@rdsadmin:[16239]:FATAL: the database system is starting up
2022-09-04 13:14:02 UTC::@:[15996]:LOG: database system is ready to accept connections
```
=> invalid record length at 98/517F9318: wanted 24, got 0
=> The new cluster was restored up to and including 98/517F9318, so replication must start from this point (it looks like an error, but it is a normal recovery log message).
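One way to double-check the two LSNs (my own sanity check, not part of the original run): the restore point must not be behind the slot's creation LSN, which pg_wal_lsn_diff can confirm on the source.

```
-- Run on the source: a non-negative byte difference means the snapshot
-- already contains everything up to the slot's creation LSN
SELECT pg_wal_lsn_diff('98/517F9318', '98/517BB930') AS bytes_ahead;
```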
## Create the subscription on the target
- Create the subscription; the options in the WITH clause are the key point
```
testdb=> CREATE SUBSCRIPTION dbatest_sub01 CONNECTION 'host=dbatest-pg-33.cluster-ckwx7ipq1exp.ap-northeast-2.rds.amazonaws.com port=6432 dbname=testdb user=master password=!dlatl00' PUBLICATION dbatest_repl01
WITH (
    copy_data = false,
    create_slot = false,
    enabled = false,
    connect = true,
    slot_name = 'dbatest_repl01'
);
CREATE SUBSCRIPTION
```
Source log when copy_data = false:
```
2022-09-04 13:20:49 UTC:10.81.21.109(13478):master@testdb:[28477]:DETAIL: Streaming transactions committing after 98/517BB930, reading WAL from 98/517BB8D8.
2022-09-04 13:20:49 UTC:10.81.21.109(13478):master@testdb:[28477]:STATEMENT: START_REPLICATION SLOT "dbatest_repl01" LOGICAL 98/517F4518 (proto_version '1', publication_names '"dbatest_repl01"')
2022-09-04 13:20:49 UTC:10.81.21.109(13478):master@testdb:[28477]:LOG: logical decoding found consistent point at 98/517BB8F8
2022-09-04 13:20:49 UTC:10.81.21.109(13478):master@testdb:[28477]:DETAIL: There are no running transactions.
2022-09-04 13:20:49 UTC:10.81.21.109(13478):master@testdb:[28477]:STATEMENT: START_REPLICATION SLOT "dbatest_repl01" LOGICAL 98/517F4518 (proto_version '1', publication_names '"dbatest_repl01"')
```
=> There is no COPY step; the log only shows that the replication slot was activated to stream data from the source to the target.
=> If the subscription is created with the default options (no WITH clause), it cannot reuse the replication slot created earlier on the source and performs an initial data sync instead, so be sure to set these options.
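Before advancing the replication origin in the next step, the subscription's catalog state can be verified on the target (a small check I'm adding, using the pg_subscription catalog):

```
-- subenabled should be f (disabled) and subslotname should point
-- at the pre-created slot before the origin is advanced
SELECT subname, subenabled, subslotname
FROM pg_subscription
WHERE subname = 'dbatest_sub01';
```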
- Activate the subscription
```
testdb=> SELECT 'pg_'||oid::text AS "external_id"
FROM pg_subscription
WHERE subname = 'dbatest_sub01';
 external_id
-------------
 pg_3489802
(1 row)

testdb=> SELECT pg_replication_origin_advance('pg_3489802', '98/517F4518');
 pg_replication_origin_advance
-------------------------------

(1 row)

testdb=> ALTER SUBSCRIPTION dbatest_sub01 ENABLE;
ALTER SUBSCRIPTION
```
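Once enabled, progress can be watched from the target (again a check I'm adding, using standard views):

```
-- remote_lsn should move past 98/517F4518 once the apply worker connects
SELECT * FROM pg_replication_origin_status;
-- received_lsn / latest_end_lsn advance as changes stream in
SELECT subname, received_lsn, latest_end_lsn FROM pg_stat_subscription;
```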
## Verify the data
* Before enabling the subscription (only the data that was in the snapshot is present)
```
testdb=> select * from tb_test;
   a
--------
      1
   9999
     11
  99999
 999999
(5 rows)
```
* After enabling the subscription (data changed after the snapshot was taken has arrived)
```
testdb=> select * from tb_test;
   a
--------
      1
   9999
     11
  99999
 999999
      2
      3
      4
     66
(9 rows)
```
## Cleanup and cutover
### Check replication lag
```
database_name=> SELECT slot_name, confirmed_flush_lsn AS flushed, pg_current_wal_lsn(),
                       (pg_current_wal_lsn() - confirmed_flush_lsn) AS lsn_distance
                FROM pg_catalog.pg_replication_slots
                WHERE slot_type = 'logical';
   slot_name    |    flushed    | pg_current_wal_lsn | lsn_distance
----------------+---------------+--------------------+--------------
 dbatest_repl01 | 47D97/CF32980 | 47D97/CF3BAC8      |        37192
(1 row)
```
### When lsn_distance reaches 0, pause writes on the primary
### On the source
```
DROP PUBLICATION dbatest_repl01;
SELECT pg_drop_replication_slot('dbatest_repl01');
```
### On the target
```
-- Detach the (already dropped) remote slot first; otherwise
-- DROP SUBSCRIPTION tries to drop it on the source and fails
ALTER SUBSCRIPTION dbatest_sub01 DISABLE;
ALTER SUBSCRIPTION dbatest_sub01 SET (slot_name = NONE);
DROP SUBSCRIPTION dbatest_sub01;
```
### Set the sequences on the target (a sketch follows below)
### Switch the application connections
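Logical replication does not carry sequence state over, so before switching connections each sequence on the target has to be bumped past the source's last value. A minimal sketch, assuming tb_test.a is backed by a sequence named tb_test_a_seq (the sequence name and the safety margin are assumptions):

```
-- Push the target sequence past the largest replicated value,
-- leaving headroom for writes still in flight during cutover
SELECT setval('tb_test_a_seq',
              (SELECT COALESCE(MAX(a), 0) + 1000 FROM tb_test));
```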
## Applying this to Debezium
- Pre-create the publication and replication slot on the source DB as described above, then create the source connector below; it will pick up only the changed data, with no initial data sync.
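The pre-creation step might look like this (a sketch matching the names in the config below; scoping the publication to just the table in table.include.list, rather than FOR ALL TABLES, is my assumption):

```
CREATE PUBLICATION dbatest_pub01 FOR TABLE public.tb_test;
SELECT pg_create_logical_replication_slot('dbatest_repl01', 'pgoutput');
```

The source connector config: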
```
{
    "name": "",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "pgoutput",
    "slot.name": "dbatest_repl01",
    "publication.name": "dbatest_pub01",
    "database.hostname": "",
    "database.port": "5432",
    "database.user": "master",
    "database.password": "",
    "database.server.name": "kimdubi_test_0831",
    "database.dbname" : "testdb",
    "table.include.list": "public.tb_test",
    "database.history.kafka.bootstrap.servers": "",
    "database.history.kafka.topic": "kimdubi_test",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "topic.creation.enable": true,
    "topic.creation.default.replication.factor": 3,
    "topic.creation.default.partitions": 10,
    "snapshot.mode": "never",
    "time.precision.mode": "connect",
    "heartbeat.interval.ms": 5
}
```
### Differences from the MySQL source connector
connector.class: "io.debezium.connector.postgresql.PostgresConnector"
plugin.name
- The plugin used for logical decoding; on Aurora PostgreSQL, pgoutput is used
slot.name
- The replication slot created on the source
publication.name
- The publication created on the source
database.dbname
- The PostgreSQL source connector has no database.include.list option; the database to capture must be specified with database.dbname
table.include.list
- Entries must be written in schema.table format
snapshot.mode: never
- With never, the connector starts reading from the transaction log already held in the source's replication slot
- Comparable to MySQL's schema_only / schema_only_recovery modes
heartbeat.interval.ms
- If the replication slot is not continually advanced, unneeded WAL accumulates and disk usage keeps growing
- https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-wal-disk-space
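On a mostly idle source, the linked docs also suggest pairing the heartbeat interval with the heartbeat.action.query property, so each heartbeat writes to the source and the slot's confirmed LSN can keep advancing. A minimal sketch, where the table name and the exact query are my assumptions following that documented pattern:

```
-- On the source DB: a one-row table the connector updates on each heartbeat
CREATE TABLE IF NOT EXISTS debezium_heartbeat (
    id int PRIMARY KEY DEFAULT 1,
    ts timestamptz
);
-- Then add to the connector config:
-- "heartbeat.action.query":
--   "INSERT INTO debezium_heartbeat (id, ts) VALUES (1, now())
--    ON CONFLICT (id) DO UPDATE SET ts = now()"
```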