流复制协议走读

笔者使用 Stolon Cluster 实现 PostgreSQL10 数据库的高可用。本文将按照笔者的理解,梳理 Stolon 以及 Postgres 高可用实现逻辑。


Postgres 流复制协议

Stolon 利用 Postgres 9.0+ 以上版本提供的流复制特性,实现了高可用的功能。在 Postgres10 中,Stolon 通过管理 postgresql.confrecovery.conf这些文件,影响 postgres 角色、行为。Stolon 在备用服务器,根据集群配置生成 recovery.conf 文件,备用服务器启动后探测到该配置文件,会启动 walreceiver 连接到主服务器并请求XLOG streaming replication, 主服务器接收到该请求,由 postmaster 启动 walsender 进程。walreceiver 以及 walsender 根据流复制协议进行交互。下文将介绍相关接口。

请求服务器标识它自己

IDENTIFY_SYSTEM

请求服务器标识它自己。服务器以一个行构成的结果集作为答复,其中包含四个域:

属性名 类型 说明
systemid text 标识该集簇的唯一的系统标识符。这可以被用来检查用于初始化后备机的基础备份是否来自于同一个集簇。
timeline int4 当前的时间线 ID。也对于检查后备机是否与主控机一致有用。
xlogpos text 当前的 WAL 刷写位置。用于得到一个在预写日志中的已知位置作为流的开始位置。
dbname 要连接到的数据库或者空。

请求服务器发送运行时参数的当前设置

SHOW name

请求服务器发送运行时参数的当前设置。这与SQL命令SHOW类似。

name: 运行时参数的名称。在第 19 章中记录了可用的参数。

获取时间线历史文件

TIMELINE_HISTORY tli

请求服务器将时间线tli的历史文件发送过来。服务器将以一行组成的结果集作为答复,其中包含两个域:

属性名 类型 说明
filename text 时间线历史文件的文件名,例如00000002.history。
content bytea 时间线历史文件的内容。

创建复制槽

CREATE_REPLICATION_SLOT slot_name [ TEMPORARY ]

创建一个物理的或者逻辑的复制槽。更多关于复制槽的内容请见 第 26.2.6 节

请求参数说明:

参数 说明
slot_name 要创建的槽的名称。必须是一个合法的复制槽名称(见 第 26.2.6.1 节)。
output_plugin 被用于逻辑解码的输出插件的名称(见 第 48.6 节)。
TEMPORARY 声明该复制槽是临时的。出错或会话结束时,不保存临时槽并自动将其删除。
RESERVE_WAL 指定这个物理复制槽立即保留WAL。否则,只有来自流复制客户端的连接上才会保留WAL。
EXPORT_SNAPSHOT
NOEXPORT_SNAPSHOT
USE_SNAPSHOT
决定如何处理逻辑插槽初始化期间创建的快照。默认情况下, EXPORT_SNAPSHOT将导出快照以用于其他会话。该选项不能在事务中使用。 USE_SNAPSHOT将使用执行命令的当前事务的快照。 该选项必须在事务中使用,并且CREATE_REPLICATION_SLOT 必须是在该事务中运行的第一个命令。最后,NOEXPORT_SNAPSHOT 将正常使用快照进行逻辑解码,但不会对其执行任何操作。

为了响应此命令,服务器将发送包含以下字段的单行结果集:

属性名 类型 说明
slot_name text 新创建的复制槽的名称。
consistent_point 插槽一致的WAL位置。这是从此复制插槽开始流式传输的最早位置。
snapshot_name 该命令导出的快照标识符。在此连接上执行新命令或复制连接关闭之前, 快照有效。如果创建的插槽是物理的,则为空。
output_plugin 新创建的复制插槽使用的输出插件的名称。如果创建的插槽是物理的,则为空。

开始流复制

START_REPLICATION [ SLOT slot_name ] [ PHYSICAL ] XXX/XXX [ TIMELINE tli ]

指示服务器开始启动流 WAL,从 WAL 位置 XXX/XXX 开始。如果TIMELINE选项被指定,流传送会在时间线 tli 上开始,否则会选择服务器的当前时间线。服务器可以回复一个错误,例如如果被请求的WAL节已经被回收了。如果成功,服务器将会响应一个 CopyBothResponse 消息,并且然后开始以流的方式把 WAL 传送给前端。

如果通过 slot_name 提供了一个槽的名称,它将被更新复制进度,这样该服务器知道哪些 WAL 段以及哪些事务(如果 hot_standby_feedback 为打开)仍然被后备机所需要。

如果客户端请求一个并非最新的时间线,但是属于服务器历史的一部分,服务器将会把该时间线上从请求点开始的所有 WAL 以流式传送,一直到服务器切换到另外一个时间线的点。如果客户端请求在一个老的时间线末尾进行流传送,服务器将在不进入 COPY 模式的情况下立即响应 CommandComplete。

在流传送完一个非最新时间线上所有的 WAL 之后,服务器将会通过退出 COPY 模式来结束流。当客户端认识到这一点并也退出 COPY 模式时,服务器会发送一个包含一行两列的结果集,以指示在该服务器历史中的下一个时间线。第一列是下一个时间线的 ID(类型 int8 ),而第二列是发生切换的 WAL 位置(类型 text )。通常,切换位置是被流传送的 WAL 的末尾,但是在很少的情况下服务器会从旧的时间线中发送一些 WAL,而该时间线是服务器本身在提示之前还没有重放的。最后,服务器发送 CommandComplete 消息,并且做好准备接受一个新的命令。

WAL 数据以一系列 CopyData 消息的形式被发送(这允许其他信息穿插其中,特别是服务器可以在开始流传送后遇到失败时发送一个 ErrorResponse 消息)。

从服务器到客户端

每个从服务器到客户端的 CopyData 消息承载了一个下列格式之一的消息:

XLogData (B)
属性 说明
Byte1(‘w’) 标识该消息是WAL数据。
Int64 在消息中 WAL 数据的起始点。
服务器上 WAL 的当前终点。
在传送时服务器的系统时钟,以从 2000-01-01 午夜开始的微秒计。
Byte n WAL 数据流的一节。

一个 WAL 记录绝不会被分割到两个 XLogData 消息。如果一个 WAL 记录跨越了一个 WAL 页面的边界,并且因此已经被使用连续的记录分割,它可以在页面边界被分割。换句话说,第一个主要WAL记录和它的后续记录可以在不同的 XLogData 消息中被发送。

主要存活消息 (B)
属性 说明
Byte1(‘k’) 标识该消息是一个发送者存活消息。
Int64 服务器上 WAL 的当前终点。
在传送时服务器的系统时钟,以从 2000-01-01 午夜开始的微秒计。
Byte1 1表示客户端应该尽快回复该消息,以避免连接超时。否则为0。

从客户端到服务器

接收进程可以在任何时候给发送者发送回复,回复可以使用下列消息格式之一(也在 CopData 消息中使用):

后备机状态更新 (F)
属性 说明
Byte1(‘r’) 标识该消息是一个接收者状态更新。
Int64 接收到并且写入到后备机磁盘的最后一个 WAL 比特的位置+1。
被刷入到后备机磁盘的最后一个 WAL 比特的位置+1。
被应用在后备机上的最后一个 WAL 比特的位置+1。
在传送时客户端的系统时钟,以从 2000-01-01 午夜开始的微秒计。
Byte1 如果为1,客户端要求服务器马上回复这个消息。这可以被用来 ping 服务器以测试连接是否仍然完好。
热备机反馈消息 (F)
属性 说明
Byte1(‘h’) 标识该消息是一个热备机反馈消息。
Int64 在传送时客户端的系统时钟,以从 2000-01-01 午夜开始的微秒计。
Int32 备用数据库的当前全局 xmin,不包括来自任何复制插槽的 catalog_xmin。 如果此值和以下 catalog_xmin 均为0,则将此视为热备机反馈将不再在此连接上发送的通知。 稍后的非零消息可以重新启动反馈机制。
备用服务器上全局 xmin xid 的时代。
备用数据库中任何复制插槽的最低 catalog_xmin。 如果备用数据库上不存在 catalog_xmin 或者热备份反馈被禁用,则设置为0。
备用服务器上 catalog_xmin xid 的时代。

开始逻辑复制

START_REPLICATION SLOT slot_name LOGICAL XXX/XXX [ ( option_name [ option_value ] [, …] ) ]

指示服务器为逻辑复制开始流式传送 WAL,从 WAL 位置XXX/XXX开始。服务器可以回复一个错误,例如如果请求的 WAL 小节已经回环。如果成功,服务器会响应一个 CopyBothResponse 消息,并且接着开始流失传送 WAL 给前端。

消息内部的消息与 START_REPLICATION … PHYSICAL 中记录的格式相同。

与选中槽关联的输出插件被用来处理流的输出。

属性 说明
SLOT slot_name 要从哪个槽流式传送改变。这个参数是必须的,并且必须对应于一个现有的用 LOGICAL 模式的CREATE_REPLICATION_SLOT 创建的逻辑复制槽。
XXX/XXX 要开始流传送的 WAL 位置。
option_name 一个传递给该槽的逻辑解码插件的选项的名称。
option_value 字符串常量形式的选项值,与前面指定的选项关联。

删除一个复制槽

DROP_REPLICATION_SLOT slot_name [ WAIT ]

删除一个复制槽,释放任何保留的服务器端资源。 如果插槽是在 walsender 连接到的数据库以外的数据库中创建的逻辑插槽,则此命令将失败。

属性 说明
slot_name 要删除的槽的名称。
WAIT 此选项会导致命令等待处于活动状态的插槽,直到它变为非活动状态, 而不是默认的引发错误行为。

传送一个基础备份

BASE_BACKUP [ LABEL ‘label’ ] [ PROGRESS ] [ FAST ] [ WAL ] [ NOWAIT ] [ MAX_RATE rate ] [ TABLESPACE_MAP ]

指示服务器开始流传送一个基础备份。在备份开始之前系统将自动被置于备份模式,而在备份结束时会自动被退出备份模式。可以接受下列选项:

属性 说明
LABEL ‘label’ 设置备份的标签。如果没有指定,将会使用 base backup 作为标签。标签的引号规则和 standard_conforming_strings 开启时标准SQL字符串的一样。
PROGRESS 请求用以生成一个进度报告的信息。这将送回位于每个表空间头部的一个近似大小,它可以被用于计算流还有多久才能被完成。它通过在传输开始之前枚举所有文件大小来计算,并且可能会对性能产生一种负面影响 – 特别情况下它可能会在流传送第一个数据之前就耗费很长时间。因为数据库文件可能在备份期间改变,这个大小只是近似的并且可能在近似计算和发送真正的文件之间增长或者收缩。
FAST 请求一个快速检查点。
WAL 在备份中包含必需的 WAL 段。这将把开始和停止备份之间的所有文件包括在 base 目录 tar 文件中的 pg_wal 目录中。
NOWAIT 默认情况下,备份会等待直到最后一个要求的 WAL 段被归档,或者当日至归档被禁用时发出一个警告。指定 NOWAIT 会禁用等待和警告,而让客户端负责确保所要求的日志是可用的。
MAX_RATE rate 单位时间内从服务器传输到客户端的最大数据量限制。期望的单位是千字节每秒。如果指定了这个选项,值必须等于零或者位于 32 kB到 1 GB(包括)范围之间。如果 0 被传入或者没有指定该选项,对于传输将没有限制。
TABLESPACE_MAP 在名为 tablespace_map 的文件中包括有关 pg_tblspc 目录中存在的符号链接的信息。这个表空间映射文件包括了在目录 pg_tblspc 中存在的每一个符号链接的名字以及它的完整路径。

当备份被启动,服务器将首先发送两个普通结果集,后面会跟着一个或多个 CopyResponse 结果。

第一个普通结果集在一行两列中包含了备份的起始位置。第一列包含使用 XLogRecPtr 格式给出的开始位置,第二列包含相应的时间线 ID。

第二个普通结果集中为每一个表空间都有一行。行中的域有:

属性名 类型 说明
spcoid oid 表空间的 OID,如果是 base 目录则为空。
spclocation text 表空间目录的完整路径,如果是 base 目录则为空。
size int8 如果进度报告被请求,这里是表空间的近似大小,否则为空。

在第二个普通结果集之后,一个或多个 CopyResponse 结果将被发送,一个用于主数据目录而对每一个除 pg_default 和 pg_global 之外的额外表空间也会有一个。CopyResponse 结果中的数据将会使一个 tar 格式(遵循POSIX 1003.1-2008标准中指定的“ustar交换格式”)的表空间内容转储,不过标准中定义的两个拖尾全0块将被忽略。在 tar 数据完成后,一个最终普通结果集将被发送,包含了备份的 WAL 结束位置,格式与起始位置相同。

用于数据目录和每个表空间的 tar 归档将包含目录中的所有文件,不管它们是否为 PostgreSQL 文件或者是被加入的其他文件。唯一被排除的文件是:

  • postmaster.pid
  • postmaster.opts

在 PostgreSQL 服务器运行期间创建的各种临时文件和目录, 例如任何以 pgsql_tmp 开头的文件或目录。

pg_wal 及其子目录。如果备份运行时要求包括 WAL 文件,一个 pg_wal 的合成版本将被包括进来,但是只会包含那些备份工作必需的文件,而不是包含剩下的内容。

pg_dynshmem、pg_notify、 pg_replslot、pg_serial、 pg_snapshots、pg_stat_tmp 和 pg_subtrans 被复制为一个空目录(即使它们是符号链接)。

除常规文件和目录之外的其他文件,例如符号链接(除了上面列出的目录之外) 和特殊设备文件,会被跳过(pg_tblspc中的符号链接会被保留)。

如果服务器上的底层文件系统支持,所有者、组合文件模式都会被设置。

结合 postgres 源码剖析流复制的流程

流复制同步机制,我们有必要结合下图的流程以及源码,看一下这个过程中程序都做了哪些事情。

Standby 节点

  1. PostmasterMain 触发 walreceiver 任务:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    /*
    * MaybeStartWalReceiver
    * Start the WAL receiver process, if not running and our state allows.
    *
    * Note: if WalReceiverPID is already nonzero, it might seem that we should
    * clear WalReceiverRequested. However, there's a race condition if the
    * walreceiver terminates and the startup process immediately requests a new
    * one: it's quite possible to get the signal for the request before reaping
    * the dead walreceiver process. Better to risk launching an extra
    * walreceiver than to miss launching one we need. (The walreceiver code
    * has logic to recognize that it should go away if not needed.)
    */
    static void
    MaybeStartWalReceiver(void)
    {
    if (WalReceiverPID == 0 &&
    (pmState == PM_STARTUP || pmState == PM_RECOVERY ||
    pmState == PM_HOT_STANDBY) &&
    Shutdown <= SmartShutdown)
    {
    WalReceiverPID = StartWalReceiver();
    if (WalReceiverPID != 0)
    WalReceiverRequested = false;
    /* else leave the flag set, so we'll try again later */
    }
    }
  2. 通过 AuxiliaryProcessMain 创建 walreciver 子进程:

  3. 从 recovery.conf 读取参数,与 master 节点建立连接:

    1
    2
    3
    4
    5
    /* Establish the connection to the primary for XLOG streaming */
    wrconn = walrcv_connect(conninfo, false, "walreceiver", &err);
    if (!wrconn)
    ereport(ERROR,
    (errmsg("could not connect to the primary server: %s", err)));
  4. 进入循坏,直到 postgres 被重启、关闭

  5. 检查当前 master 节点是否合法

    1
    2
    3
    4
    5
    6
    /*
    * Check that we're connected to a valid server using the
    * IDENTIFY_SYSTEM replication command.
    */
    primary_sysid = walrcv_identify_system(wrconn, &primaryTLI,
    &server_version);
  6. 检查当前 standby 节点的时间线是否与 master 的节点一致

    1
    2
    3
    4
    5
    6
    7
    8
    /*
    * Confirm that the current timeline of the primary is the same or
    * ahead of ours.
    */
    if (primaryTLI < startpointTLI)
    ereport(ERROR,
    (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
    primaryTLI, startpointTLI)));
  7. 从 master 节点拉取历史 wal 日志文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /*
    * Get any missing history files. We do this always, even when we're
    * not interested in that timeline, so that if we're promoted to
    * become the master later on, we don't select the same timeline that
    * was already used in the current master. This isn't bullet-proof -
    * you'll need some external software to manage your cluster if you
    * need to ensure that a unique timeline id is chosen in every case,
    * but let's avoid the confusion of timeline id collisions where we
    * can.
    */
    WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
  8. 开始接收流复制数据

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    /*
    * Start streaming.
    *
    * We'll try to start at the requested starting point and timeline,
    * even if it's different from the server's latest timeline. In case
    * we've already reached the end of the old timeline, the server will
    * finish the streaming immediately, and we will go back to await
    * orders from the startup process. If recovery_target_timeline is
    * 'latest', the startup process will scan pg_wal and find the new
    * history file, bump recovery target timeline, and ask us to restart
    * on the new timeline.
    */
    options.logical = false;
    options.startpoint = startpoint;
    options.slotname = slotname[0] != '\0' ? slotname : NULL;
    options.proto.physical.startpointTLI = startpointTLI;
    ThisTimeLineID = startpointTLI;
    if (walrcv_startstreaming(wrconn, &options))
    ......
  9. 流复制将一直持续,直到出现异常或接收到退出指令

master 节点

  1. postmaster 随时监控当前 postgresql 状态,当符合 postgres 的设置规则时,WalSender 将会被初始化

  2. 初始化 WalSender

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    /* Initialize walsender process before entering the main command loop */
    void
    InitWalSender(void)
    {
    am_cascading_walsender = RecoveryInProgress();

    /* Create a per-walsender data structure in shared memory */
    InitWalSenderSlot();

    /* Set up resource owner */
    CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");

    /*
    * Let postmaster know that we're a WAL sender. Once we've declared us as
    * a WAL sender process, postmaster will let us outlive the bgwriter and
    * kill us last in the shutdown sequence, so we get a chance to stream all
    * remaining WAL at shutdown, including the shutdown checkpoint. Note that
    * there's no going back, and we mustn't write any WAL records after this.
    */
    MarkPostmasterChildWalSender();
    SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);

    /* Initialize empty timestamp buffer for lag tracking. */
    memset(&LagTracker, 0, sizeof(LagTracker));
    }
  3. 根据《流复制协议》处理相关请求:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    /*
    * Send out the WAL in its normal physical/stored form.
    *
    * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
    * but not yet sent to the client, and buffer it in the libpq output
    * buffer.
    *
    * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
    * otherwise WalSndCaughtUp is set to false.
    */
    static void
    XLogSendPhysical(void)
    {
    XLogRecPtr SendRqstPtr;
    XLogRecPtr startptr;
    XLogRecPtr endptr;
    Size nbytes;

Reference