DataX MysqlWriter插件实战指南

概述
MysqlWriter 是 DataX 框架中用于将数据写入 MySQL 数据库的核心插件。它通过 JDBC 连接远程 MySQL 数据库,使用 INSERT INTO 或 REPLACE INTO 语句将数据批量写入目标表,支持高性能的数据导入操作。
核心特性
- 多种写入模式:支持
insert、replace、update三种写入模式 - 批量写入优化:采用
PreparedStatement + Batch技术,大幅提升写入性能 - 灵活配置:支持前置/后置 SQL 执行、会话参数设置等功能
- 类型自动转换:自动处理 DataX 内部类型与 MySQL 数据类型的映射
配置详解
基础配置示例
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"value": "DataX",
"type": "string"
},
{
"value": 19880808,
"type": "long"
}
],
"sliceRecordCount": 1000
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"session": [
"set session sql_mode='ANSI'"
],
"preSql": [
"delete from test"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
"table": [
"test"
]
}
]
}
}
}
]
}
}
核心参数说明
必需参数
jdbcUrl:目标数据库的 JDBC 连接字符串,DataX 会自动追加优化参数username:数据库用户名password:数据库密码table:目标表名,支持多表写入(需保证表结构一致)column:目标表字段列表,必须明确指定(不支持使用*)
重要可选参数
writeMode:写入模式(insert/replace/update)batchSize:批量提交记录数,默认 1024session:连接会话参数设置preSql:写入前执行的 SQL 语句postSql:写入后执行的 SQL 语句
写入模式对比
| 模式 | 行为 | 适用场景 |
|---|---|---|
insert |
插入新数据,主键冲突时报错 | 确保数据唯一性 |
replace |
主键冲突时替换整行数据 | 数据覆盖更新 |
update |
主键冲突时更新指定字段 | 部分字段更新 |
类型转换映射
MysqlWriter 支持大部分 MySQL 数据类型:
| DataX 内部类型 | MySQL 数据类型 |
|---|---|
Long |
int, tinyint, smallint, mediumint, int, bigint, year |
Double |
float, double, decimal |
String |
varchar, char, tinytext, text, mediumtext, longtext |
Date |
date, datetime, timestamp, time |
Boolean |
bit, bool |
Bytes |
tinyblob, mediumblob, blob, longblob, varbinary |
注意事项:
bit类型目前是未定义类型转换- 建议明确指定字段列表,避免使用
*
性能优化指南
批量提交优化
根据性能测试数据,合理的 batchSize 和通道数配置能显著提升性能:
{
"job": {
"setting": {
"speed": {
"channel": 16
}
},
"content": [
{
"writer": {
"name": "mysqlwriter",
"parameter": {
"batchSize": 4096,
// 其他配置...
}
}
}
]
}
}
性能测试数据
单表写入性能
| 通道数 | 批量提交行数 | 速度(Rec/s) | 流量(MB/s) |
|---|---|---|---|
| 16 | 4096 | 246,153 | 11.74 |
| 8 | 2048 | 133,333 | 6.36 |
| 4 | 1024 | 50,000 | 2.38 |
分表写入性能(16张分表)
| 通道数 | 批量提交行数 | 速度(Rec/s) | 流量(MB/s) |
|---|---|---|---|
| 16 | 2048 | 289,382 | 13.8 |
| 16 | 1024 | 177,777 | 8.48 |
| 16 | 512 | 155,084 | 7.40 |
性能优化建议
- 批量大小:建议设置为 1024-4096 之间
- 通道数量:通常不超过 32 个通道
- 硬件资源:确保有足够的内存和网络带宽
- 数据库配置:使用 InnoDB 引擎,合理配置索引
最佳实践
数据清洗导入
{
"writer": {
"name": "mysqlwriter",
"parameter": {
"preSql": ["DELETE FROM target_table WHERE date = '${biz_date}'"],
"postSql": ["UPDATE stats SET count = count + 1"],
"writeMode": "insert",
"batchSize": 2048
}
}
}
安全数据更新
{
"writer": {
"name": "mysqlwriter",
"parameter": {
"table": ["temp_table"],
"preSql": ["DROP TABLE IF EXISTS temp_table", "CREATE TABLE temp_table LIKE real_table"],
"postSql": ["RENAME TABLE real_table TO old_table, temp_table TO real_table", "DROP TABLE old_table"]
}
}
}
会话参数设置
{
"writer": {
"parameter": {
"session": [
"SET SESSION sql_mode='ANSI'",
"SET SESSION bulk_insert_buffer_size=268435456"
]
}
}
}
约束与限制
事务一致性
由于 preSql、导入操作、postSql 不在同一事务中执行,可能出现部分执行成功的情况。
解决方案
- 预清理方案:每次导入前清理当天数据
- 临时表方案:先导入临时表,完成后重命名为正式表
常见问题
Q: 数据导入失败后如何处理脏数据?
A: 建议使用临时表方案或配置 preSql 清理语句。
Q: 如何提升大批量数据导入性能?
A: 调整 batchSize 和 channel 参数,参考性能测试数据选择最优配置。
Q: writeMode 三种模式有什么区别?
A: insert 遇到主键冲突会报错,replace 会替换整行,update 会更新冲突行的指定字段。
评论