1. Install DataX

Download datax.tar.gz from https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz and extract it into a directory of your choice, for example /opt/conf/datax-20230301. Extraction creates a datax directory there; its layout is shown below:
(screenshot: contents of the datax installation directory)
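For reference, the download and extraction can be done on the command line roughly as follows (the target directory /opt/conf/datax-20230301 is the one used in this article; adjust it to your own environment):

mkdir -p /opt/conf/datax-20230301
cd /opt/conf/datax-20230301
wget https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz
tar -zxvf datax.tar.gz    # creates the datax directory in the current directory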
Then, from the DataX installation directory, run the following command to grant execute permissions.

sudo chmod -R 755 ./* 
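Note that datax.py is a Python script, so a Python interpreter must be available on the PATH; depending on the DataX release, the bundled script may only run under Python 2. A quick check:

python -V    # confirm an interpreter is installed and note its version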

2. Verify that DataX is installed correctly

/opt/conf/datax-20230301/datax/bin/datax.py /opt/conf/datax-20230301/datax/job/job.json

Run the command above and check whether DataX starts correctly; when the built-in self-check job finishes, the result looks like this:
(screenshot: output of the DataX self-check job)
If the job does not run correctly, the likely cause is that /opt/conf/datax-20230301/datax/bin/datax.py is not UTF-8 encoded and needs to be re-encoded. Replace it with the copy below and it will work normally.
datax.py
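Alternatively, if you prefer to fix the original file rather than replace it, you can inspect its encoding and, assuming it turns out to be GBK, convert it to UTF-8 roughly like this:

cd /opt/conf/datax-20230301/datax/bin
file -i datax.py                                   # shows the detected character set
iconv -f GBK -t UTF-8 datax.py -o datax.py.utf8    # assumes the source encoding is GBK
mv datax.py.utf8 datax.py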

3. Write the job configuration file

In the job folder under the DataX installation directory, create a new configuration file with the following command:

vim job_air_data_source_mysql_hdfs.json

Then copy the JSON below into the file you just opened and save it. Adjust the username, password, jdbcUrl, defaultFS, table, and path values to match your own environment.

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": ["*"],
            "splitPk": "id",
            "connection": [
              {
                "table": [
                  "air_data_source"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://master:3306/air_data"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://master:9820",
            "fileType": "TEXT",
            "path": "/user/hive/warehouse/air_data.db/air_data_source",
            "fileName": "air_data_source_202302",
            "column": [
              {"name": "	id	","type": "STRING"},	
              {"name": "	airlinelogo	","type": "STRING"},	
              {"name": "	airlineshortcompany	","type": "STRING"},	
              {"name": "	arractcross	","type": "STRING"},	
              {"name": "	arracttime	","type": "STRING"},	
              {"name": "	arrairport	","type": "STRING"},	
              {"name": "	arrcode	","type": "STRING"},	
              {"name": "	arrontimerate	","type": "STRING"},	
              {"name": "	arrplancross	","type": "STRING"},	
              {"name": "	arrplantime	","type": "STRING"},	
              {"name": "	arrterminal	","type": "STRING"},	
              {"name": "	checkintable	","type": "STRING"},	
              {"name": "	checkintablewidth	","type": "STRING"},	
              {"name": "	depactcross	","type": "STRING"},	
              {"name": "	depacttime	","type": "STRING"},	
              {"name": "	depairport	","type": "STRING"},	
              {"name": "	depcode	","type": "STRING"},	
              {"name": "	depplancross	","type": "STRING"},	
              {"name": "	depplantime	","type": "STRING"},	
              {"name": "	depterminal	","type": "STRING"},	
              {"name": "	flightno	","type": "STRING"},	
              {"name": "	flightstate	","type": "STRING"},	
              {"name": "	localdate	","type": "STRING"},	
              {"name": "	mainflightno	","type": "STRING"},	
              {"name": "	shareflag	","type": "STRING"},	
              {"name": "	statecolor	","type": "STRING"}	
            ],
              "writeMode": "truncate",
              "fieldDelimiter": "\u0001",
              "compress":"GZIP"
              }
              }
              }
            ]
              }
              }
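Before running the job, it can be worth verifying that the file is well-formed JSON, since DataX will fail to start on a parse error; Python's built-in json.tool is a convenient way to do this:

python -m json.tool /opt/conf/datax-20230301/datax/job/job_air_data_source_mysql_hdfs.json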

4. Create the Hive database and table

create database air_data;
use air_data;
CREATE TABLE `air_data_source`(
  `id` int COMMENT 'primary key',
  `airlinelogo` string COMMENT 'airline logo',
  `airlineshortcompany` string COMMENT 'airline short name',
  `arractcross` string,
  `arracttime` string COMMENT 'actual arrival time',
  `arrairport` string,
  `arrcode` string,
  `arrontimerate` string COMMENT 'arrival on-time rate',
  `arrplancross` string,
  `arrplantime` string COMMENT 'planned arrival time',
  `arrterminal` string,
  `checkintable` string,
  `checkintablewidth` string,
  `depactcross` string,
  `depacttime` string COMMENT 'actual departure time',
  `depairport` string COMMENT 'departure airport name',
  `depcode` string COMMENT 'departure airport code',
  `depplancross` string,
  `depplantime` string COMMENT 'planned departure time',
  `depterminal` string,
  `flightno` string COMMENT 'flight number',
  `flightstate` string COMMENT 'flight status',
  `localdate` string,
  `mainflightno` string,
  `shareflag` string,
  `statecolor` string)
COMMENT 'raw flight data table'
ROW FORMAT SERDE
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
STORED AS INPUTFORMAT
  'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
;

After running the statements above, the data extraction can begin. Note that the table keeps Hive's default field delimiter (\u0001), which matches the fieldDelimiter configured for hdfswriter above.
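hdfswriter generally requires the target path to already exist; the CREATE TABLE above creates it under the Hive warehouse, and you can confirm it is there before launching the job:

hadoop fs -ls /user/hive/warehouse/air_data.db/air_data_source    # should succeed and list an (initially empty) directory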

5. Run the job

Execute the following command from the current directory:

/opt/conf/datax-20230301/datax/bin/datax.py /opt/conf/datax-20230301/datax/job/job_air_data_source_mysql_hdfs.json 

This starts the data synchronization job; when it finishes, the result looks like this:
(screenshot: DataX job run summary)
To check whether the data files are now on HDFS, run the following command and inspect its output.

hadoop fs -ls hdfs://master:9820/user/hive/warehouse/air_data.db/air_data_source
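Beyond listing the files, a quick way to confirm the data is readable through Hive is to count the rows in the target table (this assumes the hive CLI is available on the current machine):

hive -e "select count(*) from air_data.air_data_source;"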

(screenshot: hadoop fs -ls output listing the exported data files)
At this point, the task of synchronizing MySQL data to HDFS with DataX is fully configured.