java数据同步解决方案-java解决高并发方案
canal.instance.filter.regex = .*\\..*
#连接canal的端口
canal.port= 11111
#监听到的数据变更发送的队列
canal.destinations= example
F、 客户端开发,在maven中引入canal的依赖
com.alibaba.otter
canal.client
1.0.21
代码示例:
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CanalClientExample {
public static void main(String[] args) {
while (true) {
//连接canal
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");
connector.connect();
//订阅 监控的 数据库.表
connector.subscribe("demo_db.user_tab");
//一次取10条
Message msg = connector.getWithoutAck(10);
long batchId = msg.getId();
int size = msg.getEntries().size();
if (batchId < 0 || size == 0) {
System.out.println("没有消息,休眠5秒");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//
CanalEntry.RowChange row = null;
for (CanalEntry.Entry entry : msg.getEntries()) {
try {
row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
List rowDatasList = row.getRowDatasList();
for (CanalEntry.RowData rowdata : rowDatasList) {
List afterColumnsList = rowdata.getAfterColumnsList();
Map dataMap = transforListToMap(afterColumnsList);
if (row.getEventType() == CanalEntry.EventType.INSERT) {
//具体业务操作
System.out.println(dataMap);
} else if (row.getEventType() == CanalEntry.EventType.UPDATE) {
//具体业务操作
System.out.println(dataMap);
} else if (row.getEventType() == CanalEntry.EventType.DELETE) {
List beforeColumnsList = rowdata.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
if ("id".equals(column.getName())) {
//具体业务操作
System.out.println("删除的id:" + column.getValue());
}
}
} else {
System.out.println("其他操作类型不做处理");
}
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
//确认消息
connector.ack(batchId);
}
}
}
public static Map transforListToMap(List afterColumnsList) {
Map map = new HashMap();
if (afterColumnsList != null && afterColumnsList.size() > 0) {
for (CanalEntry.Column column : afterColumnsList) {
map.put(column.getName(), column.getValue());
}
}
return map;
}
}
}
2)、基于BulkLoad的数据同步,比如从hive同步数据到hbase
我们有两种方式可以实现,
A、 使用spark任务,通过HQl读取数据,然后再通过hbase的Api插入到hbase中。
但是这种做法,效率很低,而且大批量的数据同时插入Hbase,对Hbase的性能影响很大。
在大数据量的情况下,使用BulkLoad可以快速导入,BulkLoad主要是借用了hbase的存储设计思想,因为hbase本质是存储在hdfs上的一个文件夹,然后底层是以一个个的Hfile存在的。HFile的形式存在。Hfile的路径格式一般是这样的:
/hbase/data/default(默认是这个,如果hbase的表没有指定命名空间的话,如果指定了,这个就是命名空间的名字)////
B、 BulkLoad实现的原理就是按照HFile格式存储数据到HDFS上,生成Hfile可以使用hadoop的MapReduce来实现。如果不是hive中的数据,比如外部的数据,那么我们可以将外部的数据生成文件,然后上传到hdfs中,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。
当然我们也可以不事先生成hfile,可以使用spark任务直接从hive中读取数据转换成RDD,然后使用HbaseContext的自动生成Hfile文件,部分关键代码如下:
…
//将DataFrame转换bulkload需要的RDD格式
val rddnew = datahiveDF.rdd.map(row => {
val rowKey = row.getAs[String](rowKeyField)
fields.map(field => {
val fieldValue = row.getAs[String](field)
(Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
})
}).flatMap(array => {
(array)
})
…
//使用HBaseContext的bulkload生成HFile文件
hbaseContext.bulkLoad[Put](rddnew.map(record => {
val put = new Put(record._1)
record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put
}), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")
val conn = ConnectionFactory.createConnection(hBaseConf)
val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
val realTable = conn.getTable(hbTableName)
HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
// bulk load start
val loader = new LoadIncrementalHFiles(hBaseConf)
val admin = conn.getAdmin()
loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)
sc.stop()
}
…
def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
import scala.collection.JavaConversions._
for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
val family = cells.getKey
for (value <- cells.getValue) {
val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
ret.+=((kfq, CellUtil.cloneValue(value)))
}
}
ret.iterator
}
}
…
C、pg_bulkload的使用
这是一个支持pg库(PostgreSQL)批量导入的插件工具java数据同步解决方案,它的思想也是通过外部文件加载的方式,这个工具笔者没有亲自去用过,详细的介绍可以参考: pg_bulkload项目的地址:
3)、基于sqoop的全量导入
Sqoop 是hadoop生态中的一个工具,专门用于外部数据导入进入到hdfs中,外部数据导出时,支持很多常见的关系型数据库,也是在大数据中常用的一个数据导出导入的交换工具。
Sqoop从外部导入数据的流程图如下:
Sqoop将hdfs中的数据导出的流程如下:
本质都是用了大数据的数据分布式处理来快速的导入和导出数据。
4)、HBase中建表,然后Hive中建一个外部表,这样当Hive中写入数据后,HBase中也会同时更新,但是需要注意
A、hbase中的空cell在hive中会补null
B、hive和hbase中不匹配的字段会补null
我们可以在hbase的shell 交互模式下,创建一张hbse表
create'bokeyuan','zhangyongqing'
使用这个命令,我们可以创建一张叫bokeyuan的表,并且里面有一个列族zhangyongqing,hbase创建表时,可以不用指定字段,但是需要指定表名以及列族
我们可以使用的hbase的put命令插入一些数据
put'bokeyuan','001','zhangyongqing:name','robot'
put'bokeyuan','001','zhangyongqing:age','20'
put'bokeyuan','002','zhangyongqing:name','spring'
put'bokeyuan','002','zhangyongqing:age','18'
可以通过hbase的scan 全表扫描的方式查看我们插入的数据
scan' bokeyuan'
我们继续创建一张hive外部表
createexternaltablebokeyuan (idint,namestring,ageint)
STOREDBY'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITHSERDEPROPERTIES("hbase.columns.mapping"=":key,user:name,user:age")
TBLPROPERTIES("hbase.table.name"=" bokeyuan");
外部表创建好了后,我们可以使用HQL语句来查询hive中的数据了
select*fromclasses;
OK
1robot20
2spring 18
5)、Debezium+bireme:Debezium for PostgreSQL to Kafka Debezium也是一个通过监控数据库的日志变化,通过对行级日志的处理来达到数据同步,而且Debezium 可以通过把数据放入到kafka,这样就可以通过消费kafka的数据来达到数据同步的目的。而且还可以给多个地方进行消费使用。
Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台。你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别(row-level)的更改。只有已提交的更改才是可见的,所以你的应用不用担心事务(transaction)或者更改被回滚(roll back)。Debezium为所有的数据库更改事件提供了一个统一的模型,所以你的应用不用担心每一种数据库管理系统的错综复杂性。另外,由于Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。
该项目的GitHub地址为: 这是一个开源的项目。
本来监控数据库,并且在数据变动的时候获得通知其实一直是一件很复杂的事情。关系型数据库的触发器可以做到java数据同步解决方案,但是只对特定的数据库有效,而且通常只能更新数据库内的状态(无法和外部的进程通信)。一些数据库提供了监控数据变动的API或者框架,但是没有一个标准,每种数据库的实现方式都是不同的,并且需要大量特定的知识和理解特定的代码才能运用。确保以相同的顺序查看和处理所有更改,同时最小化影响数据库仍然非常具有挑战性。
Debezium正好提供了模块为你做这些复杂的工作。一些模块是通用的,并且能够适用多种数据库管理系统,但在功能和性能方面仍有一些限制。另一些模块是为特定的数据库管理系统定制的,所以他们通常可以更多地利用数据库系统本身的特性来提供更多功能,Debezium提供了对MongoDB,mysql,pg,sqlserver的支持。
Debezium是一个捕获数据更改(CDC)平台,并且利用Kafka和Kafka Connect实现了自己的持久性、可靠性和容错性。每一个部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一个上游数据库服务器,捕获所有的数据库更改,然后记录到一个或者多个Kafka topic(通常一个数据库表对应一个kafka topic)。
Kafka确保所有这些数据更改事件都能够多副本并且总体上有序(Kafka只能保证一个topic的单个分区内有序),这样,更多的客户端可以独立消费同样的数据更改事件而对上游数据库系统造成的影响降到很小(如果N个应用都直接去监控数据库更改,对数据库的压力为N,而用debezium汇报数据库更改事件到kafka,所有的应用都去消费kafka中的消息,可以把对数据库的压力降到1)。
另外,客户端可以随时停止消费,然后重启,从上次停止消费的地方接着消费。每个客户端可以自行决定他们是否需要exactly-once或者at-least-once消息交付语义保证,并且所有的数据库或者表的更改事件是按照上游数据库发生的顺序被交付的。
对于不需要或者不想要这种容错级别、性能、可扩展性、可靠性的应用,他们可以使用内嵌的Debezium connector引擎来直接在应用内部运行connector。这种应用仍需要消费数据库更改事件,但更希望connector直接传递给它,而不是持久化到Kafka里。
更详细的介绍可以参考:
bireme 的github 地址:
bireme 的介绍:
另外Maxwell也是可以实现MySQL到Kafka的消息中间件,消息格式采用Json:Download:
Source:
6)、datax
datax 是阿里开源的etl 工具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能,采用java+python进行开发,核心是java语言实现。
github地址:
A、设计架构:
数据交换通过DataX进行中转,任何数据源只要和DataX连接上即可以和已实现的任意数据源同步
B、框架
核心模块介绍:
DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0
DataX调度流程:
举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。DataX的调度决策思路是:
DataXJob根据分库分表切分成了100个Task。
根据20个并发,DataX计算共需要分配4个TaskGroup。
4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。
优势:
从插件视角看框架
总之,Job拆分为Task,分别在框架提供的容器中执行,插件只需要实现Job和Task两部分逻辑。
物理执行有三种运行模式:
总体来说,当JobContainer和TaskGroupContainer运行在同一个进程内的时候就是单机模式,在不同进程执行就是分布式模式。
如果需要开发插件,可以看zhege这个插件开发指南:
数据源支持情况:
类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库
MySQL
√
√
读、写
Oracle
√
√
读、写
SQLServer
√
√
读、写
PostgreSQL
√
√
读、写
DRDS
√
√
读、写
通用RDBMS(支持所有关系型数据库)
√
√
读、写
阿里云数仓数据存储
ODPS
√
√
读、写
ADS
√
写
OSS
√
√
读、写
OCS
√
√
读、写
NoSQL数据存储
OTS
√
√
读、写
Hbase0.94
√
√
读、写
Hbase1.1
√
√
读、写
Phoenix4.x
√
√
读、写
Phoenix5.x
√
√
读、写
MongoDB
√
√
读、写
Hive
√
√
读、写
无结构化数据存储
TxtFile
√
√
读、写
FTP
√
√
读、写
HDFS
√
√
读、写
Elasticsearch
√
写
时间序列数据库
OpenTSDB
√
读
TSDB
√
写
7)、OGG
OGG 一般主要用于Oracle数据库。即Oracle GoldenGate是Oracle的同步工具 ,可以实现两个Oracle数据库之间的数据的同步,也可以实现Oracle数据同步到Kafka,相关的配置操作可以参考如下:
8)、databus
Databus是一个实时的、可靠的、支持事务的、保持一致性的数据变更抓取系统。2011年在LinkedIn正式进入生产系统,2013年开源。
Databus通过挖掘数据库日志的方式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更。
Databus的传输层端到端延迟是微秒级的,每台服务器每秒可以处理数千次数据吞吐变更事件,同时还支持无限回溯能力和丰富的变更订阅功能。
github:
databus架构设计:
检查中继上的新数据变更事件
将变更存储在MySQL数据库中
MySQL数据库供Bootstrap和客户端使用
检查Relay上新的数据变更事件,并执行特定业务逻辑的回调
如果落后Relay太多,向Bootstrap Server发起查询
新Databus客户端会向Bootstrap Server发起bootstrap启动查询,然后切换到向中继发起查询,以完成最新的数据变更事件
单一客户端可以处理整个Databus数据流,或者可以成为消费者集群的一部分,其中每个消费者只处理一部分流数据
从Databus来源读取变更行,并在内存缓存内将其序列化为Databus变更事件
监听来自Databus客户端(包括Bootstrap Producer)的请求,并传输新的Databus数据变更事件
对比项
Databus
canal
结论
支持的数据库
mysql, oracle
mysql(据说内部版本支持oracle)
Databus目前支持的数据源更多
业务开发
业务只需要实现事件处理接口
事件处理外,需要处理ack/rollback,
反序列化异常等
Databus开发接口用户友好度更高
服务模型
relay
relay可以同时服务多个client
一个server instance只能服务一个client
(受限于server端保存拉取位点)
Databus服务模式更灵活
client
client可以拉取多个relay的变更,
访问的relay可以指定拉取某些表某些分片的变更
client只能从一个server拉取变更,
而且只能是拉取全量的变更
可扩展性
client可以线性扩展,处理能力也能线性扩展
(Databus可识别pk,自动做数据分片)
client无法扩展
Databus扩展性更好
可用性
client ha
client支持cluster模式,每个client处理一部分数据,
某个client挂掉,其他client自动接管对应分片数据
主备client模式,主client消费,
如果主client挂掉,备client可自动接管
Databus实时热备方案更成熟
relay/server ha
多个relay可连接到同一个数据库,
client可以配置多个relay,relay故障启动切换
主备relay模式,relay通过zk进行failover
canal主备模式对数据库影响更小
故障对上游
数据库的影响
client故障,bootstrap会继续拉取变更,
client恢复后直接从bootstrap拉取历史变更
client故障会阻塞server拉取变更,
client恢复会导致server瞬时从数据库拉取大量变更
Databus本身的故障对数据库影响几乎为0
系统状态监控