https://kimdubi.github.io/postgresql/pg_logical_replication_1/
This method copies all of the Source DB's data when the subscription is enabled on the target DB, so the load of that operation can affect the production environment.
The approach described below skips the initial data sync and replicates only the data newly generated on the Source DB.

Creating the source publication and replication slot

testdb=> CREATE PUBLICATION dbatest_repl01 FOR ALL TABLES;
CREATE PUBLICATION

testdb=> SELECT pg_create_logical_replication_slot('dbatest_repl01', 'pgoutput');
pg_create_logical_replication_slot
------------------------------------
(dbatest_repl01,98/517BB930)
(1 row)
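Before taking the snapshot, it can be worth confirming on the source that both objects exist. A minimal sanity check using the standard catalog views (not part of the original walkthrough):

```sql
-- The slot should exist, use the pgoutput plugin, and stay inactive
-- until a subscriber connects.
SELECT slot_name, plugin, slot_type, active, restart_lsn
FROM pg_replication_slots
WHERE slot_name = 'dbatest_repl01';

SELECT pubname, puballtables
FROM pg_publication
WHERE pubname = 'dbatest_repl01';
```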

Creating a snapshot of the source DB and restoring it

  • Create a new cluster via snapshot restore / cluster clone
    • The snapshot must have been taken after the replication slot was created on the source
  • Check the WAL LSN in the error log
2022-09-04 13:13:59 UTC::@:[15996]:LOG: starting PostgreSQL 13.7 on aarch64-unknown-linux-gnu, compiled by aarch64-unknown-linux-gnu-gcc (GCC) 7.4.0, 64-bit
2022-09-04 13:13:59 UTC::@:[15996]:LOG: listening on IPv4 address "0.0.0.0", port 6432
2022-09-04 13:13:59 UTC::@:[15996]:LOG: listening on IPv6 address "::", port 6432
2022-09-04 13:13:59 UTC::@:[15996]:LOG: listening on Unix socket "/tmp/.s.PGSQL.6432"
2022-09-04 13:13:59 UTC::@:[15996]:LOG: Waiting for runtime initialization complete...
2022-09-04 13:14:02 UTC:[local]:rdsadmin@rdsadmin:[16228]:FATAL: the database system is starting up
2022-09-04 13:14:02 UTC::@:[16227]:LOG: database system was interrupted; last known up at 2022-09-04 13:12:56 UTC
2022-09-04 13:14:02 UTC::@:[16227]:LOG: invalid record length at 98/517F9318: wanted 24, got 0
2022-09-04 13:14:02 UTC:[local]:rdsadmin@rdsadmin:[16239]:FATAL: the database system is starting up
2022-09-04 13:14:02 UTC::@:[15996]:LOG: database system is ready to accept connections

=>  invalid record length at 98/517F9318: wanted 24, got 0
=> The new cluster was restored up to and including 98/517F9318, so replication must start from this point. (It looks like an error, but it is a normal log line.)
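The relationship between the two LSNs can be checked with pg_wal_lsn_diff, which returns the byte distance between two WAL positions. A non-negative result confirms the restored cluster is at or past the slot's creation point (values taken from the outputs above):

```sql
-- 98/517F9318: end-of-WAL LSN from the restored cluster's log
-- 98/517BB930: LSN returned when the replication slot was created
SELECT pg_wal_lsn_diff('98/517F9318', '98/517BB930') AS bytes_ahead;
-- positive here, so the snapshot already contains the slot creation point
```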

Creating the target subscription

  • Create the subscription; the options in the WITH clause are the key point
testdb=> CREATE SUBSCRIPTION dbatest_sub01 CONNECTION 'host=dbatest-pg-33.cluster-ckwx7ipq1exp.ap-northeast-2.rds.amazonaws.com port=6432 dbname=testdb user=master password=!dlatl00' PUBLICATION dbatest_repl01
WITH (
copy_data = false,
create_slot = false,
enabled = false,
connect = true,
slot_name = 'dbatest_repl01'
);
CREATE SUBSCRIPTION
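The subscription's stored options can be verified from pg_subscription before going further: subenabled should still be false, and subslotname should point at the pre-created slot. A minimal check using standard catalog columns (not part of the original walkthrough):

```sql
SELECT subname, subenabled, subslotname
FROM pg_subscription
WHERE subname = 'dbatest_sub01';
-- expect subenabled = f and subslotname = dbatest_repl01
```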

Source-side log when copy_data = false

2022-09-04 13:20:49 UTC:10.81.21.109(13478):master@testdb:[28477]:DETAIL: Streaming transactions committing after 98/517BB930, reading WAL from 98/517BB8D8.
2022-09-04 13:20:49 UTC:10.81.21.109(13478):master@testdb:[28477]:STATEMENT: START_REPLICATION SLOT "dbatest_repl01" LOGICAL 98/517F4518 (proto_version '1', publication_names '"dbatest_repl01"')
2022-09-04 13:20:49 UTC:10.81.21.109(13478):master@testdb:[28477]:LOG: logical decoding found consistent point at 98/517BB8F8
2022-09-04 13:20:49 UTC:10.81.21.109(13478):master@testdb:[28477]:DETAIL: There are no running transactions.
2022-09-04 13:20:49 UTC:10.81.21.109(13478):master@testdb:[28477]:STATEMENT: START_REPLICATION SLOT "dbatest_repl01" LOGICAL 98/517F4518 (proto_version '1', publication_names '"dbatest_repl01"')

=> There is no COPY step; the only log left is that the replication slot was activated for transferring data from source to target
=> If the subscription is created with default settings (no WITH options), it cannot reuse the replication slot created on the source at the start and will perform an initial data sync instead, so be sure to double-check these options

  • Enable the subscription
testdb=> SELECT 'pg_'||oid::text AS "external_id"
FROM pg_subscription
WHERE subname = 'dbatest_sub01';
external_id
-------------
pg_3489802
(1 row)


testdb=> SELECT pg_replication_origin_advance('pg_3489802', '98/517F4518') ;
pg_replication_origin_advance
-------------------------------

(1 row)

testdb=> ALTER SUBSCRIPTION dbatest_sub01 ENABLE;
ALTER SUBSCRIPTION
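After enabling, the origin position and apply worker can be checked on the target; remote_lsn should be at or past the LSN passed to pg_replication_origin_advance. Illustrative queries against the standard views (not part of the original walkthrough):

```sql
SELECT external_id, remote_lsn, local_lsn
FROM pg_replication_origin_status;

-- per-subscription worker status (received/flushed LSNs on this subscriber)
SELECT subname, received_lsn, latest_end_lsn
FROM pg_stat_subscription;
```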

Checking the data

* Before enabling the subscription (only the data present in the snapshot is visible)

testdb=> select * from tb_test;
a
--------
1
9999
11
99999
999999
(5 rows)


* After enabling the subscription (data changed after the snapshot was taken comes in)

testdb=> select * from tb_test;
a
--------
1
9999
11
99999
999999
2
3
4
66
(9 rows)

Wrap-up

### Checking lag
database_name=> SELECT slot_name, confirmed_flush_lsn as flushed, pg_current_wal_lsn(), (pg_current_wal_lsn() - confirmed_flush_lsn) AS lsn_distance FROM pg_catalog.pg_replication_slots WHERE slot_type = 'logical';
slot_name        |    flushed    | pg_current_wal_lsn | lsn_distance
-----------------+---------------+--------------------+------------
dbatest_repl01   | 47D97/CF32980 | 47D97/CF3BAC8      | 37192
(1 row)
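The same distance can be computed with pg_wal_lsn_diff, which subtracts two pg_lsn values and returns bytes (values taken from the output above):

```sql
SELECT pg_wal_lsn_diff('47D97/CF3BAC8', '47D97/CF32980') AS lsn_distance;
-- 0xCF3BAC8 - 0xCF32980 = 37192 bytes, matching lsn_distance above
```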


### When lsn_distance reaches 0, pause writes on the primary

### source
DROP PUBLICATION dbatest_repl01;
SELECT pg_drop_replication_slot('dbatest_repl01');

### target
-- DROP SUBSCRIPTION also tries to drop its slot on the publisher;
-- detach the slot first if it was already dropped on the source
ALTER SUBSCRIPTION dbatest_sub01 DISABLE;
ALTER SUBSCRIPTION dbatest_sub01 SET (slot_name = NONE);
DROP SUBSCRIPTION dbatest_sub01;

### Setting sequences on the target
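Logical replication does not carry sequence state, so sequences on the target still hold their values from the snapshot and must be bumped manually before cutover. A minimal sketch, assuming tb_test.a is backed by a serial sequence (adjust per table):

```sql
-- set the sequence to the current max so new inserts don't collide
SELECT setval(pg_get_serial_sequence('tb_test', 'a'),
              (SELECT COALESCE(max(a), 1) FROM tb_test));
```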

### Switching connections

What about using this with Debezium?

  • Pre-create the publication and replication slot on the source DB as described above, then create the source connector below; it will pick up only the changed data, with no initial data sync

{
  "name": "",
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "plugin.name": "pgoutput",  
  "slot.name": "dbatest_repl01",
  "publication.name": "dbatest_pub01" ,
  "database.hostname": "",
  "database.port": "5432",
  "database.user": "master",
  "database.password": "",
  "database.server.name": "kimdubi_test_0831",  
  "database.dbname" : "testdb", 
  "table.include.list": "public.tb_test",
  "database.history.kafka.bootstrap.servers": "",
  "database.history.kafka.topic": "kimdubi_test",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.log.enable": true,
  "errors.log.include.messages": true,
  "topic.creation.enable": true,
  "topic.creation.default.replication.factor": 3,
  "topic.creation.default.partitions": 10,
  "snapshot.mode": "never",
  "time.precision.mode": "connect",
  "heartbeat.interval.ms": 5
}
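One detail worth noting: the config above references publication dbatest_pub01, while the walkthrough earlier created dbatest_repl01 as a FOR ALL TABLES publication. Since the connector only captures public.tb_test, a narrower publication matching the config's name could be pre-created instead (an assumption made to reconcile the two names, not something stated in the original):

```sql
-- scope the publication to the one table the connector includes
CREATE PUBLICATION dbatest_pub01 FOR TABLE public.tb_test;
```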
  • Differences from the MySQL source connector

  • connector.class: "io.debezium.connector.postgresql.PostgresConnector"

  • plugin.name

    • The plugin used for logical decoding; Aurora PostgreSQL uses pgoutput
  • slot.name

    • The replication slot created on the source
  • publication.name

    • The name of the publication created on the source
  • database.dbname

    • The PostgreSQL source connector has no database.include.list option; the database to capture must be specified with database.dbname
  • table.include.list

    • Must be written in schema.table format
  • snapshot.mode : never

    • With never, the connector starts from the transaction log held in the source's replication slot
    • Comparable to MySQL's schema_only / schema_only_recovery modes
  • heartbeat.interval.ms

    • Interval at which the connector emits heartbeat events; on low-traffic databases this keeps the slot's confirmed LSN advancing so WAL does not pile up on the source