DataX MysqlWriter插件实战指南

概述

MysqlWriterDataX 框架中用于将数据写入 MySQL 数据库的核心插件。它通过 JDBC 连接远程 MySQL 数据库,使用 INSERT INTOREPLACE INTO 语句将数据批量写入目标表,支持高性能的数据导入操作。

核心特性

  • 多种写入模式:支持 insertreplaceupdate 三种写入模式
  • 批量写入优化:采用 PreparedStatement + Batch 技术,大幅提升写入性能
  • 灵活配置:支持前置/后置 SQL 执行、会话参数设置等功能
  • 类型自动转换:自动处理 DataX 内部类型与 MySQL 数据类型的映射

配置详解

基础配置示例

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column": [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 19880808,
                                "type": "long"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "test"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

核心参数说明

必需参数

  • jdbcUrl:目标数据库的 JDBC 连接字符串,DataX 会自动追加优化参数
  • username:数据库用户名
  • password:数据库密码
  • table:目标表名,支持多表写入(需保证表结构一致)
  • column:目标表字段列表,必须明确指定(不支持使用 *

重要可选参数

  • writeMode:写入模式(insert/replace/update
  • batchSize:批量提交记录数,默认 1024
  • session:连接会话参数设置
  • preSql:写入前执行的 SQL 语句
  • postSql:写入后执行的 SQL 语句

写入模式对比

模式 行为 适用场景
insert 插入新数据,主键冲突时报错 确保数据唯一性
replace 主键冲突时替换整行数据 数据覆盖更新
update 主键冲突时更新指定字段 部分字段更新

类型转换映射

MysqlWriter 支持大部分 MySQL 数据类型:

DataX 内部类型 MySQL 数据类型
Long int, tinyint, smallint, mediumint, int, bigint, year
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

注意事项:

  • bit 类型目前是未定义类型转换
  • 建议明确指定字段列表,避免使用 *

性能优化指南

批量提交优化

根据性能测试数据,合理的 batchSize 和通道数配置能显著提升性能:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 16
            }
        },
        "content": [
            {
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "batchSize": 4096,
                        // 其他配置...
                    }
                }
            }
        ]
    }
}

性能测试数据

单表写入性能

通道数 批量提交行数 速度(Rec/s) 流量(MB/s)
16 4096 246,153 11.74
8 2048 133,333 6.36
4 1024 50,000 2.38

分表写入性能(16张分表)

通道数 批量提交行数 速度(Rec/s) 流量(MB/s)
16 2048 289,382 13.8
16 1024 177,777 8.48
16 512 155,084 7.40

性能优化建议

  1. 批量大小:建议设置为 1024-4096 之间
  2. 通道数量:通常不超过 32 个通道
  3. 硬件资源:确保有足够的内存和网络带宽
  4. 数据库配置:使用 InnoDB 引擎,合理配置索引

最佳实践

数据清洗导入

{
    "writer": {
        "name": "mysqlwriter",
        "parameter": {
            "preSql": ["DELETE FROM target_table WHERE date = '${biz_date}'"],
            "postSql": ["UPDATE stats SET count = count + 1"],
            "writeMode": "insert",
            "batchSize": 2048
        }
    }
}

安全数据更新

{
    "writer": {
        "name": "mysqlwriter",
        "parameter": {
            "table": ["temp_table"],
            "preSql": ["DROP TABLE IF EXISTS temp_table", "CREATE TABLE temp_table LIKE real_table"],
            "postSql": ["RENAME TABLE real_table TO old_table, temp_table TO real_table", "DROP TABLE old_table"]
        }
    }
}

会话参数设置

{
    "writer": {
        "parameter": {
            "session": [
                "SET SESSION sql_mode='ANSI'",
                "SET SESSION bulk_insert_buffer_size=268435456"
            ]
        }
    }
}

约束与限制

事务一致性

由于 preSql、导入操作、postSql 不在同一事务中执行,可能出现部分执行成功的情况。

解决方案

  1. 预清理方案:每次导入前清理当天数据
  2. 临时表方案:先导入临时表,完成后重命名为正式表

常见问题

Q: 数据导入失败后如何处理脏数据?

A: 建议使用临时表方案或配置 preSql 清理语句。

Q: 如何提升大批量数据导入性能?

A: 调整 batchSizechannel 参数,参考性能测试数据选择最优配置。

Q: writeMode 三种模式有什么区别?

A: insert 遇到主键冲突会报错,replace 会替换整行,update 会更新冲突行的指定字段。