业务背景

写任何工具都不能脱离实际业务的背景。开始这个项目的时候是因为现有的项目中数据分布太零碎,零零散散的分布在好几个数据库中,没有统一的数据库来收集这些数据。这种情况下想做一个大而全的会员中心系统比较困难。(这边是一个以互联网保险为中心的项目,保单,会员等数据很零散的储存在好几个项目之中,并且项目之间的数据基本上是隔离的)。

现有的项目数据库是在腾讯云中储存,虽然腾讯提供了数据同步功能,但是这样必须要表结构相同才行,并不符合我们的需求。所以需要自行开发。

项目在这里:https://github.com/hjx601496320/miner。

更新:

这个项目我删掉了,git上现在找不到,新的项目在这里:

https://github.com/hjx601496320/plumber

去掉了对canal的依赖, 使用起来更加的方便~~

需求

1:需要能灵活配置。

2:实时数据10分钟内希望可以完成同步。

3:来源数据与目标数据可能结构,字段名称不同。

4:增删改都可以同步。

技术选择

这个任务交给了我和另外一个同事来做。

同事的

同事希望可以通过ETL工具Kettle来做,这个东西我没有研究过,是同事自己在研究。具体过程不是很清楚,但是最后是通过在mysql中设置更新,修改,删除的触发器,然后在Kettle中做了一个定时任务,实现了数据同步的功能,初步测试符合需求。但是必须要在数据库中设置触发器,并且会有一个临时表,这一点我个人不是很喜欢。

我的

我是本着能自己写就自己写的原则,准备自己写一个。刚开始使用的是定时任务比较两个库的数据差别,然后再同步数据。但是经过一定的数据测试后,发现在数据量大的时候,定时任务中的上一个任务没有执行完毕,下一个任务就又开始了。这样造成了两边数据不一致。最终这个方案废弃了。

后来通过研究,发现mysql的数据操作会记录在binlog中,这时就有了新的方案。可以通过逐行获取binlog信息,经过解析数据后,同步在目标库中。

既然有了方案,那么就开始做吧。

开始尝试:1

首先要打开数据库的binlog功能,这一步比较简单,修改mysql的配置文件:/etc/mysql/mysql.conf.d/mysqld.cnf,添加:

server-id        = 1
log_bin = /var/log/mysql/mysql-bin.log
expire_logs_days = 10
max_binlog_size = 100M
binlog_format = ROW

  

然后重启mysql 就好了,具体每个参数的意思,搜索一下就好了。这时候随意的对某一个数据库中的表做一下增删改,对应的日志就会记录在/var/log/mysql/这个文件夹下了。我们看一下这个文件夹里的东西:

这里的文件是没有办法正常查看的,需要使用mysql提供的命令来查看,命令是这个样子的:

1:查看
mysqlbinlog mysql-bin.000002
2:指定位置查看
mysqlbinlog --start-position="120" --stop-position="332" mysql-bin.000002

  

因为我们现在的binlog_format指定的格式是ROW(就在上面写的,还记得吗?),所谓binlog文件的内容没有办法正常查看,因为他是这个样子的:

这时,我们需要:

对输出进行解码
mysqlbinlog --base64-output=decode-rows -v mysql-bin.000001

  

这时候,显示的结果就变成了:

虽然还不是正常的sql,但是好赖是有一定的格式了。

but自己来做解析的话还是很麻烦,so~放弃这种操作。

继续尝试:2

经过再次研究后,发现数据库中执行sql也是可以查看binlog的。主要有如下几条命令:

重置binlog
reset master; 查看binlog的配置
show variables like '%binlog%'; 查看所有的binlog
show binary logs; 查看正在写入的binlog
show master status; 查看指定binlog文件
show binlog events in 'mysql-bin.000001'; 查看指定binlog文件,并指定位置
show binlog events in 'mysql-bin.000001' from [pos] limit [显示多少条];

  

按照上面的命令执行结果为:

发现sql还是不能正常显示。这里的原因应该是binlog_format配置的原因。将其修改为 binlog_format=Mixed后,完美解决。经过数据库中一通增删改后,显示的sql类似这样:

use `pay`; /* ApplicationName=DataGrip 2018.2.5 */ UPDATE `pay`.`p_pay_log` t SET t.`mark_0` = 'sdfsdf' WHERE t.`id` LIKE '342' ESCAPE '#'

  

现在似乎已经可以开始写数据同步了,只要在启动的时候获取当正在使用的是哪一个日志文件,记录binlog的位置,然后一点一点向下执行,解析sql就好了。但是在这个过程中,我发现阿里巴巴有一款开源的软件可以用。就是标题上说道的:canal。看了一下网站上的介绍,简直美滋滋。

它的文档和代码地址在这里:https://github.com/alibaba/canal,大家可以看一下。现在就准备用这个来完成我所需要的功能。

正式开始写

首先看一下介绍,canal是需要单独运行一个服务的,这个服务具体的配置还是比较简单的。它的作用我自己理解就是监控binlog,然后根据自己的需要获取binlog中一定量的数据。这个数据是经过处理的,可以比较方便的知道里面的具体信息。比如那些数据发生了变动,每列数据的列名是什么,变动前和变动后的值是啥之类的。那么开始。

1:我的想法

1):项目启动的时候,开启canal的链接,以及初始化一些配置。

@Bean
public CanalConnector canalConnector() {
CanalConnector connector = CanalConnectors.newSingleConnector(
//对应canal服务的链接
new InetSocketAddress(canalConf.getIp(), canalConf.getPort()),
//链接的目标,这里对应canal服务中的配置,需要查阅文档
canalConf.getDestination(),
//不知道是什么用户,使用“”
canalConf.getUser(),
//不知道是什么密码,使用“”
canalConf.getPassword()
);
return connector;
}

  

2):先开启一个线程,里面写一个死循环,用于从canal的服务中获取binlog中的消息。这个消息类是:com.alibaba.otter.canal.protocol.Message。

Message message = connector.getWithoutAck(100);

connector:canal链接的实例化对象。
connector.getWithoutAck(100):从连接中获取100条binlog中的数据。

  

3):取出Message中的事件集合,就是binlog中的每一条数据。将类型为增删改的数据取出,之后每一条数据放在一个线程中,用线程池去执行它。

List<Entry> entries = message.getEntries();

message.getEntries():从链接中获取的数据集合,每一条代表1条binlog数据

  

4):在每一个线程中,取出Entry中的数据,根据其类型拼接各种sql,并执行。

Header header = entry.getHeader();
获取发生变化的表名称,可能会没有
String tableName = header.getTableName(); 获取发生变化的数据库名称,可能会没有
String schemaName = header.getSchemaName(); //获取事件类型
EventType eventType = rowChange.getEventType();
这里我们只是用其中的三种类型:
EventType.DELETE 删除
EventType.INSERT 插入
EventType.UPDATE 更新 //获取发生变化的数据
RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); //遍历其中的数据
int rowDatasCount = rowChange.getRowDatasCount();
for (int i = 0; i < rowDatasCount; i++) {
//每一行中的数据
RowData rowData = rowChange.getRowDatas(i);
} //获取修改前的数据
List<Column> before = rowData.getBeforeColumnsList(); //获取修改后的数据
List<Column> after = rowData.getAfterColumnsList(); Column中有一系列方法,比如是否发生修改,时候为key,是否是null等,就不在细说了。

  

2:万事具备,可以开始写了

1):这里先写一个线程,用于不停的从canal服务中获取消息,然后创建新的线程并让其处理其中的数据。代码如下:

@Override
public void run() {
while (true) {
//主要用于在链接失败后用于再次尝试重新链接
try {
if (!run) { //打开链接,并设置 run=true
startCanal();
}
} catch (Exception e) { System.err.println("连接失败,尝试重新链接。。。");
threadSleep(3 * 1000);
}
System.err.println("链接成功。。。");
//不停的从CanalConnector中获取消息
try {
while (run) { //获取一定数量的消息,这里为线程池数量×3
Message message = connector.getWithoutAck(batchSize * 3);
long id = message.getId(); //处理获取到的消息
process(message);
connector.ack(id);
}
} catch (Exception e) {
System.err.println(e.getMessage());
} finally {
//如果发生异常,最终关闭连接,并设置run=false
stopCanal();
}
} }

  

void process(Message message) {
List<Entry> entries = message.getEntries();
if (entries.size() <= 0) {
return;
}
log.info("process message.entries.size:{}", entries.size());
for (Entry entry : entries) {
Header header = entry.getHeader();
String tableName = header.getTableName();
String schemaName = header.getSchemaName(); //这里判断是否可以取出数据库名称和表名称,如果不行,跳过循环
if (StringUtils.isAllBlank(tableName, schemaName)) {
continue;
} //创建新的线程,并执行
jobList.stream()
.filter(job -> job.isMatches(tableName, schemaName))
.forEach(job -> executorService.execute(job.newTask(entry)));
}
}

  

这里的jobList是我自己定义List<Job>,代码如下:

package com.hebaibai.miner.job;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate; import static com.alibaba.otter.canal.protocol.CanalEntry.Entry; @Slf4j
@Data
public abstract class Job { /**
* 数据库链接
*/
protected JdbcTemplate jdbcTemplate; /**
* 额外配置
*/
protected JSONObject prop; /**
* 校验目标是否为合适的数据库和表
*
* @param table
* @param database
* @return
*/
abstract public boolean isMatches(String table, String database); /**
* 实例化一个Runnable
*
* @param entry
* @return
*/
abstract public Runnable newTask(final Entry entry); /**
* 获取RowChange
*
* @param entry
* @return
*/
protected CanalEntry.RowChange getRowChange(Entry entry) {
try {
return CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
return null;
} }

  

jobList里面放的是Job的实现类。

3:写一个Job的实现类,并用于同步表,并转换字段名称。

因为需求中要求两个同步的数据中可能字段名称不一致,所以我写了一个josn用来配置两个表的字段对应关系:

别的配置
。。。
"prop": {
//来源数据库
"database": "pay",
//来源表
"table": "p_pay_msg",
//目标表(目标库在其他地方配置)
"target": "member",
//字段对应关系
//key :来源表的字段名
//value:目标表的字段名
"mapping": {
"id": "id",
"mch_code": "mCode",
"send_type": "mName",
"order_id": "phone",
"created_time": "create_time",
"creator": "remark"
}
}
。。。
别的配置

  

下面是全部的代码,主要做的就是取出变动的数据,按照对应的字段名重新拼装sql,然后执行就好了,不多解释。

package com.hebaibai.miner.job;

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors; import static com.alibaba.otter.canal.protocol.CanalEntry.*; /**
* 单表同步,表的字段名称可以不同,类型需要一致
* 表中需要有id字段
*/
@SuppressWarnings("ALL")
@Slf4j
public class TableSyncJob extends Job { /**
* 用于校验是否适用于当前的配置
*
* @param table
* @param database
* @return
*/
@Override
public boolean isMatches(String table, String database) {
return prop.getString("database").equals(database) &&
prop.getString("table").equals(table);
} /**
* 返回一个新的Runnable
*
* @param entry
* @return
*/
@Override
public Runnable newTask(final Entry entry) {
return () -> {
RowChange rowChange = super.getRowChange(entry);
if (rowChange == null) {
return;
}
EventType eventType = rowChange.getEventType();
int rowDatasCount = rowChange.getRowDatasCount();
for (int i = 0; i < rowDatasCount; i++) {
RowData rowData = rowChange.getRowDatas(i);
if (eventType == EventType.DELETE) {
delete(rowData.getBeforeColumnsList());
}
if (eventType == EventType.INSERT) {
insert(rowData.getAfterColumnsList());
}
if (eventType == EventType.UPDATE) {
update(rowData.getBeforeColumnsList(), rowData.getAfterColumnsList());
}
}
};
} /**
* 修改后的数据
*
* @param after
*/
private void insert(List<Column> after) {
//找到改动的数据
List<Column> collect = after.stream().filter(column -> column.getUpdated() || column.getIsKey()).collect(Collectors.toList());
//根据表映射关系拼装更新sql
JSONObject mapping = prop.getJSONObject("mapping");
String target = prop.getString("target");
List<String> columnNames = new ArrayList<>();
List<String> columnValues = new ArrayList<>();
for (int i = 0; i < collect.size(); i++) {
Column column = collect.get(i);
if (!mapping.containsKey(column.getName())) {
continue;
}
String name = mapping.getString(column.getName());
columnNames.add(name);
if (column.getIsNull()) {
columnValues.add("null");
} else {
columnValues.add("'" + column.getValue() + "'");
}
}
StringBuilder sql = new StringBuilder();
sql.append("REPLACE INTO ").append(target).append("( ")
.append(StringUtils.join(columnNames, ", "))
.append(") VALUES ( ")
.append(StringUtils.join(columnValues, ", "))
.append(");");
String sqlStr = sql.toString();
log.debug(sqlStr);
jdbcTemplate.execute(sqlStr);
} /**
* 更新数据
*
* @param before 原始数据
* @param after 更新后的数据
*/
private void update(List<Column> before, List<Column> after) {
//找到改动的数据
List<Column> updataCols = after.stream().filter(column -> column.getUpdated()).collect(Collectors.toList());
//找到之前的数据中的keys
List<Column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());
//没有key,执行更新替换
if (keyCols.size() == 0) {
return;
}
//根据表映射关系拼装更新sql
JSONObject mapping = prop.getJSONObject("mapping");
String target = prop.getString("target");
//待更新数据
List<String> updatas = new ArrayList<>();
for (int i = 0; i < updataCols.size(); i++) {
Column updataCol = updataCols.get(i);
if (!mapping.containsKey(updataCol.getName())) {
continue;
}
String name = mapping.getString(updataCol.getName());
if (updataCol.getIsNull()) {
updatas.add("`" + name + "` = null");
} else {
updatas.add("`" + name + "` = '" + updataCol.getValue() + "'");
}
}
//如果没有要修改的数据,返回
if (updatas.size() == 0) {
return;
}
//keys
List<String> keys = new ArrayList<>();
for (Column keyCol : keyCols) {
String name = mapping.getString(keyCol.getName());
keys.add("`" + name + "` = '" + keyCol.getValue() + "'");
}
StringBuilder sql = new StringBuilder();
sql.append("UPDATE ").append(target).append(" SET ");
sql.append(StringUtils.join(updatas, ", "));
sql.append(" WHERE ");
sql.append(StringUtils.join(keys, "AND "));
String sqlStr = sql.toString();
log.debug(sqlStr);
jdbcTemplate.execute(sqlStr);
} /**
* 删除数据
*
* @param before
*/
private void delete(List<Column> before) {
//找到改动的数据
List<Column> keyCols = before.stream().filter(column -> column.getIsKey()).collect(Collectors.toList());
if (keyCols.size() == 0) {
return;
}
//根据表映射关系拼装更新sql
JSONObject mapping = prop.getJSONObject("mapping");
String target = prop.getString("target");
StringBuilder sql = new StringBuilder();
sql.append("DELETE FROM `").append(target).append("` WHERE ");
List<String> where = new ArrayList<>();
for (Column column : keyCols) {
String name = mapping.getString(column.getName());
where.add(name + " = '" + column.getValue() + "' ");
}
sql.append(StringUtils.join(where, "and "));
String sqlStr = sql.toString();
log.debug(sqlStr);
jdbcTemplate.execute(sqlStr);
}
}

项目在这里:https://github.com/hjx601496320/miner

更新:

这个项目我删掉了,git上现在找不到,新的项目在这里:

https://github.com/hjx601496320/plumber

去掉了对canal的依赖, 使用起来更加的方便~~

原文链接:https://www.cnblogs.com/hebaibai/p/10911899.html

用canal监控binlog并实现mysql定制同步数据的功能的更多相关文章

  1. Clickhouse单机部署以及从mysql增量同步数据

    背景: 随着数据量的上升,OLAP一直是被讨论的话题,虽然druid,kylin能够解决OLAP问题,但是druid,kylin也是需要和hadoop全家桶一起用的,异常的笨重,再说我也搞不定,那只能 ...

  2. mysql 同步数据到 ElasticSearch 的方案

    MySQL Binlog 要通过 MySQL binlog 将 MySQL 的数据同步给 ES, 我们只能使用 row 模式的 binlog.如果使用 statement 或者 mixed forma ...

  3. 这次一定要教会你搭建Redis集群和MySQL主从同步(非Docker)

    前言 一直都想自己动手搭建一个Redis集群和MySQL的主从同步,当然不是依靠Docker的一键部署(虽然现在企业开发用的最多的是这种方式),所以本文就算是一个教程类文章吧,但在动手搭建之前,会先聊 ...

  4. 使用canal分析binlog(二) canal源码分析

    在能够跑通example后有几个疑问 1. canal的server端对于已经读取的binlog,client已经ack的position,是否持久化,保存在哪里 2. 即使不启动zookeeper, ...

  5. mysql主从同步(4)-Slave延迟状态监控

    mysql主从同步(4)-Slave延迟状态监控  转自:http://www.cnblogs.com/kevingrace/p/5685511.html 之前部署了mysql主从同步环境(Mysql ...

  6. mysql主从同步(3)-percona-toolkit工具(数据一致性监测、延迟监控)使用梳理

    转自:http://www.cnblogs.com/kevingrace/p/6261091.html 在mysql工作中接触最多的就是mysql replication mysql在复制方面还是会有 ...

  7. Canal订阅binlog变更并结合kafka实现消息缓冲

    阿里Canal项目请先了解:canal 考虑可能binlog大批量变更,如果直接通过Canal订阅binlog变动,会造成CanalClient会瞬间爆掉.为了解决这个问题,我们可以引入kafka做一 ...

  8. LR通过snmp监控linux下的mysql

    LR通过snmp监控linux下的mysql 在linux底下安装配置snmp: 1.使用系统盘安装rpm包(这种方式最好) 2.在www.net-snmp.org处下载net-snmp安装(安装后有 ...

  9. zabbix3.0.4监控mysql主从同步

    zabbix3.0.4监控mysql主从同步 1.监控mysql主从同步原理: 执行一个命令 mysql -u zabbix -pzabbix -e 'show slave status\G' 我们在 ...

随机推荐

  1. NopCommerce 框架系列(一)

    今天,终于抽出时间来写写博文,也希望自己能养成写博文的好习惯,大神勿喷. 我从NopCommerce官网上下载了源码,以便自己学习研究,如有需要下载源码的朋友,请点击链接: http://www.no ...

  2. ASP.NET常用页面指令

    1.<%@Page "属性"="值"%>指令 AutoEventWireup:指定页的事件是否自动绑定,默认值true为启用自动绑定. CodeBe ...

  3. codeforces Hill Number 数位dp

    http://www.codeforces.com/gym/100827/attachments Hill Number Time Limits:  5000 MS   Memory Limits: ...

  4. php练习4——排序,查找

    排序(从小到大) 查找 注:二分法查找的数组默认为已经排序的数组

  5. [每日一题] 11gOCP 1z0-052 :2013-09-2 ADDM(Automatic Database Diagnostic Monitor)...................A28

    转载请注明出处:http://blog.csdn.net/guoyjoe/article/details/10951207 正确答案:BC AWR简称Automatic Workload Reposi ...

  6. php_ThinkPHP的RBAC(基于角色权限控制)详解

    一.什么是RBAC 基于角色的访问控制(Role-Based Access Control)作为传统访问控制(自主访问,强制访问)的有前景的代替受到广泛的关注. 在RBAC中,权限与角色相关联,用户通 ...

  7. Asp.Net下,基于Jquery的Ajax二级联动

    最近做一个项目,要求实现二级联动效果.背景为:通过学院的选择,联动出专业选项.起初想直接用微软的控件实现Ajax效果,但是DropDownList控件会自动触发PostBack,在后台根本就不好控制, ...

  8. jmeter学习记录--05--Beanshell2

    学习beanshell时有不少的例子.遇到不少问题.在此记录下. 测试实例列表 A1:使用Beanshell请求作为测试请求 一个打包的Jar包,直接对其内的方法进行测试. 第一步:将接口jar包要放 ...

  9. Repeater取不到服务端控件

    <td>      <asp:Button ID="Button1" runat="server" Text="查看" O ...

  10. QT 12 QTimage转换Mat

    QImage cvMat2QImage(const cv::Mat& mat) { // 8-bits unsigned, NO. OF CHANNELS = 1 if(mat.type() ...