工作中一直在用Oracle 的中间件Oracle GondenGate 是如何保证消息的有序和不丢失呢?

Oracle GoldenGate逻辑架构

首先,先看一下Oracle GoldenGate 的逻辑架构:

图中涉及到两个阶段:

  1. 初始化阶段: extract 进程直接抽取源表信息经网络传输到target 端的 replicat进程,replicat 进程获取到初始化加载数据将其同步到目标数据源。
  2. 增量数据抓取阶段:extract 进程从源表redo log 或其他增量日志中解析并获取增量,然后落地成数据文件;然后pump进程将数据经网络推送到目标端的collector进程。collector是由manager进程维护的,有新的pump数据过来,它会启动一个新的collector,这个collector绑定到特定的端口上,通过TCP/IP连接,负责接收特定pump进程推送过来的数据并落地到指定的目录下生成trail文件。replicat 进程实时读取 trail 文件并将数据推送给kafka。

官方关于 trail文件的说明如下:

To support the continuous extraction and replication of database changes, Oracle GoldenGate stores records of the captured changes temporarily on disk in a series of files called a trail. A trail can exist on the source system, an intermediary system, the target system, or any combination of those systems, depending on how you configure Oracle GoldenGate. On the local system it is known as an extract trail (or local trail). On a remote system it is known as a remote trail.

By using a trail for storage, Oracle GoldenGate supports data accuracy and fault tolerance (see Section 1.2.6, "Overview of Checkpoints"). The use of a trail also allows extraction and replication activities to occur independently of each other. With these processes separated, you have more choices for how data is processed and delivered. For example, instead of extracting and replicating changes continuously, you could extract changes continuously but store them in the trail for replication to the target later, whenever the target application needs them.

即trail 中保存的是数据库中的变化数据。Oracle GoldenGate用trail 做存储,确保数据的准确性和容错性。它也允许extract进程和replicat进程可以独立存在,类似于消息中间件的作用。

checkpoint保证数据不丢失和有序性

下面看一下官方给出的checkpoint 的案例(本来想用项目的真实checkpoint信息,为避免不必要的麻烦,作罢):

注意这个是Oracle RAC模式下checkpoint信息。

查看extract进程checkpoint信息命令:INFO EXTRACT JC108XT,SHOWCH

extract 进程checkpoint信息如下:

EXTRACT    JC108XT Last Started 2011-01-01 14:15   Status ABENDED
Checkpoint Lag       00:00:00 (updated 00:00:01 ago)
Log Read Checkpoint  File /orarac/oradata/racq/redo01.log
            2011-01-01 14:16:45  Thread 1, Seqno 47, RBA 68748800
Log Read Checkpoint  File /orarac/oradata/racq/redo04.log
            2011-01-01 14:16:19  Thread 2, Seqno 24, RBA 65657408

Current Checkpoint Detail:

Read Checkpoint #1

    Oracle RAC Redo Log
    Startup Checkpoint (starting position in data source):
        Thread #: 1
        Sequence #: 47
        RBA: 68548112
        Timestamp: 2011-01-01 13:37:51.000000
        SCN: 0.8439720
        Redo File: /orarac/oradata/racq/redo01.log

Recovery Checkpoint (position of oldest unprocessed transaction in data source):
        Thread #: 1
        Sequence #: 47
        RBA: 68748304
        Timestamp: 2011-01-01 14:16:45.000000
        SCN: 0.8440969
        Redo File: /orarac/oradata/racq/redo01.log

    Current Checkpoint (position of last record read in the data source):
        Thread #: 1
        Sequence #: 47
        RBA: 68748800
        Timestamp: 2011-01-01 14:16:45.000000
        SCN: 0.8440969
        Redo File: /orarac/oradata/racq/redo01.log

Read Checkpoint #2

    Oracle RAC Redo Log

    Startup Checkpoint(starting position in data source):
        Sequence #: 24
        RBA: 60607504
        Timestamp: 2011-01-01 13:37:50.000000
        SCN: 0.8439719
        Redo File: /orarac/oradata/racq/redo04.log

Recovery Checkpoint (position of oldest unprocessed transaction in data source):
        Thread #: 2
        Sequence #: 24
        RBA: 65657408
        Timestamp: 2011-01-01 14:16:19.000000
        SCN: 0.8440613
        Redo File: /orarac/oradata/racq/redo04.log

    Current Checkpoint (position of last record read in the data source):
        Thread #: 2
        Sequence #: 24
        RBA: 65657408
        Timestamp: 2011-01-01 14:16:19.000000
        SCN: 0.8440613
        Redo File: /orarac/oradata/racq/redo04.log

Write Checkpoint #1

    GGS Log Trail

    Current Checkpoint (current write position):

        Sequence #: 2
        RBA: 2142224
        Timestamp: 2011-01-01 14:16:50.567638
        Extract Trail: ./dirdat/eh

    Header:
        Version = 2
        Record Source = A
        Type = 6
        # Input Checkpoints = 2
        # Output Checkpoints = 1

    File Information:
        Block Size = 2048
        Max Blocks = 100
        Record Length = 2048
        Current Offset = 0

    Configuration:
        Data Source = 3
        Transaction Integrity = 1
        Task Type = 0

    Status:
        Start Time = 2011-01-01 14:15:14
        Last Update Time = 2011-01-01 14:16:50
        Stop Status = A
        Last Result = 400

关于Extract的read几种checkpoint解释:

1. extract将read checkpoints放置在数据源中。如果数据源是Oracle,则检查点是放在Oracle的日志中。

2. Startup checkpoint:启动检查点是进程启动时在数据源中创建的第一个检查点。

  • Thread #: 创建检查点的线程数,只有Oracle的RAC模式才会有

  • Sequence #: 创建检查点的事务日志的序列号

  • RBA: RBA是relative byte address的简写,表示创建检查点的记录的相对字节地址

  • Timestamp: 表示创建检查点的记录的时间戳

  • SCN: SCN是system change number的简写,表示系统更改检查点所在记录的编号

  • Redo File: 包含创建检查点的记录的事务日志的路径名

3. Recovery checkpoint:恢复检查点表示extract未处理的最早的事务日志的位置信息。

4. Current checkpoint:表示extract在数据源中读的最近的(注意:此时还没有写成功)记录的位置信息。它应该和 Log Read Checkpoint 信息一致。

关于extract的写的checkpoint解释:

extract进程将 current checkpoint 放在trail 文件中。current checkpoint 是指extract 正在写的trail的位置。

  • Sequence #: 写入检查点的trail文件的序列号

  • RBA:trail文件中创建检查点的记录的相对字节地址

  • Timestamp: 创建检查点的记录的时间戳

  • Extract trail: trail文件的相对路径名称

  • Trail Type: 其中在类似于NFS服务上的被认为是local

查看 replicat 进程 checkpoint 信息命令:INFO REPLICAT JC108RP, SHOWCH

replicat 进程checkpoint 信息如下:

REPLICAT   JC108RP   Last Started 2011-01-12 13:10   Status RUNNING
Checkpoint Lag       00:00:00 (updated 111:46:54 ago)
Log Read Checkpoint  File ./dirdat/eh000000
                     First Record  RBA 3702915
Current Checkpoint Detail:
    Read Checkpoint #1
    GGS Log Trail
    Startup Checkpoint(starting position in data source):
    Sequence #: 0
    RBA: 3702915
    Timestamp: Not Available
    Extract Trail: ./dirdat/eh
    Current Checkpoint (position of last record read in the data source):
    Sequence #: 0
    RBA: 3702915
    Timestamp: Not Available
    Extract Trail: ./dirdat/eh
    Header:
    Version = 2
    Record Source = A
    Type = 1
    # Input Checkpoints = 1
    # Output Checkpoints = 0
    File Information:
    Block Size = 2048
    Max Blocks = 100
    Record Length = 2048
    Current Offset = 0
    Configuration:
    Data Source = 0
    Transaction Integrity = -1
    Task Type = 0
    Status:
    Start Time = 2011-01-12 13:10:13
    Last Update Time = 2011-01-12 21:23:31
    Stop Status = A
    Last Result = 400

1. Startup Checkpoint

当进程启动时在trail文件中创建的第一个checkpoint

  • Sequence #: 写入检查点的trail文件的序列号

  • RBA: trail文件中创建检查点的记录的相对字节地址

  • Timestamp: 表示创建检查点的记录的时间戳

  • Extract Trail: trail 文件的相对地址

2. Current Checkpoint:current checkpoint 是指replicat 进程读取trail文件的最近的记录的位置。

抽取的最终的日志格式使得实现幂等性操作成为可能

Oracle GoldenGate的日志格式是snapshot格式的,试想一下,假设我一条记录的某个字段 做累加操作,Oracle GoldenGate给我们的数据是增量数据,在at-least-once语义之上,进行多次传输,那么数据最终会出问题。而snapshot数据,只需要根据主键不断覆盖即可。这种数据是支持幂等性操作的。

 

总结

1. 关于数据容错性。
  1. 其一,日志格式是snapshot的,使得实现幂等性操作成为可能。
  2. 其二Oracle GoldenGate的checkpoint机制和消费者保存消费的offset的机制是一样的。都支持at-least-once的语义。因为很有可能出现已经写数据成功,但更新checkpoint数据失败,即kafka中的数据可能会出现重复的现象。所以在处理Oracle GoldenGate的消息时,要确保最终落地的操作是幂等性操作,这样数据至少不会丢。一般幂等性都需要唯一id作为标识,一般选用数据的主键做唯一id。如果没有主键可以使用其他方案,切记生成的id要唯一且可重复生成,即同一条记录根据id生成规则,永远是相同的id且在一定时间范围内不冲突。
2. 关于有序性。
  1. 其一数据在Oracle GoldenGate传输过程中,同一个extract进程落到一个文件中,经网络传输会被响应的collector接收并把数据放到对应的rmtrail中,其中 local trail 和 remote trail 是一一对应的。相当于数据从一个分区传输到另外一个固定对应的分区,只要数据有序传输(它通过读写checkpoint来保证),那么最终remote trail和local trail的顺序肯定是一致的(可能会有重复)。
  2. 其二,replicat落到kafka的数据是单分区的,保证了放到kafka的数据的有序性。
 
 

参考:

Oracle GoldenGate文档库:https://docs.oracle.com/goldengate/1212/gg-winux/GWUAD/wu_about_gg.htm#GWUAD117

Oracle官方对 Checkpoint 的术语的解释:https://docs.oracle.com/goldengate/1212/gg-winux/GWUAD/wu_ogg_checkpts.htm#GWUAD965

 

Oracle Goldengate是如何保证数据有序和确保数据不丢失的?的更多相关文章

  1. Oracle GoldenGate理论

    1Oracle GoldenGate处理方法和支持的数据库Oracle GoldenGate在多样和异构的基础IT平台中,可以在事务级别上进行数据交换和数据操作.在保证交易完整性和最小的开销的条件下, ...

  2. Oracle GoldenGate 12c实时捕获SQL Server数据

    在Oracle GoldenGate 12c中,对一些最新的数据库提供了支持,比如SQL Server 2012/2014,当然12c也支持sql server 2008.主要新增特性有: 捕获进程可 ...

  3. Oracle GoldenGate (以下简称ogg)在异种移植os同一种db之间的数据同步。

    Oracle GoldenGate (以下简称ogg)在异种移植os同一种db之间的数据同步. ogg要实现的功能: 同步可以细化到单个表,满足特定的where条件rows同步,称号column同步. ...

  4. Oracle GoldenGate 四、数据过滤和数据项匹配

    写在开始前 从两周前我花了大量的业余时间阅读GoldenGate官方文档,并根据文档实践和进一步学习了解GoldenGate,以下便是根据官方文档理解总结的GoldenGate学习内容: Oracle ...

  5. Oracle GoldenGate 支持 从SAP HANA database抽取或者复制数据到SAP HANA database 吗?

    Oracle GoldenGate 支持 从SAP  HANA database抽取或者复制数据到SAP HANA database 吗? 来源于: Does Oracle GoldenGate Su ...

  6. Oracle Goldengate和Oracle Data Integrator的初步认识

    免责声明:     本文中使用的部分图片来自于网络,如有侵权,请联系博主进行删除 1. Oracle Glodengate是什么? GoldenGate是oracle的一种基于数据库日志的数据同步软件 ...

  7. Oracle GoldenGate学习之Goldengate介绍

    Oracle GoldenGate学习之Goldengate介绍 (2012-10-02 17:07:27) 标签: 检查点 数据传输 队列 进程 分类: Goldengate Goldengate介 ...

  8. Oracle GoldenGate实现数据库同步

    前言:最近刚好在弄数据库同步,网上查了些资料再加上自己整理了一些,做个分享! 一.GoldenGate的安装 1.安装包准备 数据库版本:Oracle Database 11g Release 2(1 ...

  9. 为Oracle GoldenGate准备数据库

    了解如何为Oracle GoldenGate准备数据库,包括如何配置连接和日志记录,如何在数据库中启用Oracle GoldenGate,如何设置闪回查询以及如何管理服务器资源. 为集成进程配置连接 ...

随机推荐

  1. 批量文本读取URL获取正常访问且保留对应IP

    #coding=utf-8 import sys import requests for i in range(3000,4999,1): url = 'http://192.168.88.139:8 ...

  2. Python中的抽象超类

    # -*- coding:utf-8 -*- class Super(object): def test(self): self.action() class Sub(Super): def acti ...

  3. Cookie 添加,读取,删除

    Name,value – 声明时 new Cookie(key,value); Path        - 默认值,即为当前保存cookie的这个serlvet所在的路径. 如果Cookie在这样的路 ...

  4. delegate,notifucation,KVO三种模式实现通信的优缺点

             在开发ios应用的时候,我们会经常遇到一个常见的问题:在不过分耦合的前提下,controllers间怎么进行通信.在IOS应用不断的出现三种模式来实现这种通信: 1.委托delega ...

  5. NSRangeFromString(<#NSString * _Nonnull aString#>) 和rangeOfString

    NSRangeFromString NSString *str1 = @"abcdef"; NSString *str2 = @"1-105"; NSStrin ...

  6. Java:终结器

    目录 背景Java版:终结器防卫者C#版:“终结器防卫者”备注 背景返回目录 多数情况我们不需要重写 finalize 方法,只有当我们需要持有未托管资源的时候才需要,而此时重写 finalize 方 ...

  7. Industry Engineer

    IE:有两种,一种是系统方面的,对全厂的物流设计,依据产量做人力估算,机台,设备,手工具的估算等,一种是配合产能提升做制造的改善,以及SOP(Standard Operation Procedure: ...

  8. Java---Ajax在Struts2框架的应用实例

    Ajax 是一种在无需重新加载整个网页的情况下,能够更新部分网页的技术. 很久没有动过ajax了,趁此机会复习一下,写一个简单的例子 一.项目结构: 二.需要的jar包 三.具体代码: 1.web.x ...

  9. umask的作用[转]

    umask的作用 umask 命令允许你设定文件创建时的缺省模式,对应每一类用户(文件属主.同组用户.其他用户)存在一个相应的umask值中的数字.对于文件来说,这一数字的最 大值分别是6.系统不允许 ...

  10. CentOS配置SSH免密登录

    假如我们有两台CentOS机器,192.168.199.101,192.168.199.102,要想在101上远程连接102可以通过ssh命令来实现 ssh 192.168.199.102 如果没有配 ...