接触DataX是基于公司离线数据同步需求,从而开始接触到DataX的使用。前异构数据之间开源同步工具,主要有
同步工具调研
Sqoop
Sqoop是一款开源的工具,主要用于Hadoop与传统RDBMS之间的数据同步,可以将RDBMS中的数据同步到HDFS中,也可以进行逆向操作。主要是基于MR任务的进行同步,具有支持并发、增量更新、支持海量数据同步等优点。
Sqoop Wiki
Sqoop官网OGG
Oracle Golden Gate,缩写为OGG,可见其是Oralce“冠名”的同步工具。主要是RDBMS之间的数据同步,常见的关系型数据,例如Oracle/DB2/Sybase/MySQL/MsSQL等均支持。
OGG介绍Kettle
Kettle是一款开源的ETL工具,具有集群模式。Kettle主要用来做ETL数据处理,具有方便快捷、可视化等优点,适合于一般中小业务的使用。DataX
DataX是由Alibaba开源的一款异构数据同步工具,可以在常见的各种数据源之间进行同步,并仅依赖Java环境,具有轻量、插件式、方便等优点,可以快速完成同步任务。一般公司的数据同步任务,基本可以满足。canal+otter
canal+otter数据同步方案,同样是Alibaba开源的同步工具。该方案适用于大规模、扩机房、跨区域等重量级数据同步任务,并具有监控、近实时同步等优点。SymmetricDS
同样是一款基于Java开发的分布式开源同步软件,基于复制原理,源于Java的特性,SymmetricDS具有跨平台、多线程、可监控等优点。
博主公司具有实时同步数据与离线同步数据的需求,因此需要调研数据同步工具。其中实时同步主要是从MySQL同步数据到Hive,这里我们选择的是MySQL Binlog方式进行实时同步;离线同步,主要是从各种数据源–如MySQL、ODPS、HIVE等–之间的数据同步。
经过调研发现,DataX具有以下优点
- 社区活跃,虽然Alibaba没有开源集群模式,仍有许多用户群体;
- Alibaba出品,有大公司维护;
- 使用方便,只需要配置JSON即可使用;无需安装,直接解压即可使用;
- 完善的文档,很快可以上手,学习成本近乎于零。
DataX介绍
DataX是Alibaba推出的异构数据通过方案,通过插件式组合,可以完成不同数据源之间的数据同步任务。DataX可以通过的数据包括目前所见的绝大数,包括关系型数据库[MySQL、Oracle、SQLServer、PostgreSQL等]、阿里云数据仓数据存储[ODPS、ADS、OSS、OCS等]、NoSQL数据库[OTS、Hbase、MongoDB、Hive等]、无结构化数据[TextFile、FTP、HDFS、ES等]。另外DataX提供了方便的扩展实现,可以方便的个性化定制自己的数据源,方便扩展。
目前开源出来的DataX已经发布了3.0版本,增加了许多新的特性,包括
- 可靠地数据质量监控
- 丰富的数据转换功能
- 精准的速度控制
- 强劲的同步性能
- 健壮的容错机制
- 极简的使用体验
更加详细的介绍,可以参考DataX的官方说明:
User Guide
DataX-Introduction
插件开发宝典
目前使用场景
目前我们使用到的场景如下所示
同步的路径为:
- MySQL->Hive
- Hive->MySQL
- ODPS->Hive
- ODPS->MySQL
- RDS->Hive
选择DataX还有一个重要的特性就是,DataX是Alibaba提供的数据同步方案,天然的支持阿里云的数据源,不需要我们重新从API开始开发,节省开发成本。
使用体会
在使用DataX的过程中,总体而言遇到的问题较少。目前DataX的主要缺点在于开源出来的DataX,缺少分布式支持,是单机版本,无法充分发挥集群的里面。因此,会存在单机节点存在的各种问题,内存、CPU、网络等问题。
使用一段时间,有一些思考
- DataX集群模式的实现方式,如何实现?如果集群之后,如何监控?
- DataX单节点状态时,任务运行性能参数的收集。DataX提供了Hook,可以回调打印一些参数,另外可以加入最后打印的数据,例如这样我们收集到更多的运行参数,方便后期对任务的运行作分析,便于改进优化任务的运行参数。
Map
preLogStatics = communication.getCounter(); preLogStatics.put("任务启动时刻", startTimeStamp); preLogStatics.put("任务结束时刻", endTimeStamp); preLogStatics.put("任务总计耗时", totalCosts); preLogStatics.put("任务平均流量", byteSpeedPerSecond); preLogStatics.put("记录写入速度", recordSpeedPerSecond); preLogStatics.put("读出记录总数", CommunicationTool.getTotalReadRecords(communication)); preLogStatics.put("读写失败总数", CommunicationTool.getTotalErrorRecords(communication)); HookInvoker invoker = new HookInvoker(CoreConstant.DATAX_HOME + "/hook", configuration, preLogStatics); - DataX.py本质上运行的是Java程序,如果直接调用Engine.entry,如何统计任务运行日志?是否存在一种方式,可以按照线程打印单独一个文件的日志?
参考文章
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!