前言

这周主要是学习使用Flink, 其中有一部分学习的内容就是生成parquet。 Flink自身提供的文档写了个大概,但是真要自己动手去生成pqrquet文件,发现还是有些小坑,本文就是记录这些坑。

开始

官方文档总是最好的开始的地方, 下面是官方文档上面的内容
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/#file-sink
Flink 生成ParquetFile-小白菜博客
从官方文档上面看,似乎很简单, 使用FileSink, 然后设置下格式使用AvroParquetWriters就可以了。
但是按照这个设置后,连FileSink这个类都找不到。
FilkSink需要这个dependency,

org.apache.flink
flink-connector-files
${flink.version}

AvroParquetWriters需要的是这个dependency

org.apache.flink
flink-parquet
${flink.version}
provided

使用AVRO

官方文档中使用了AvroParquetWriters, 那我们就先定义一个AVRO的schema文件MarketPrice.avsc,然后生成对应的类,

{
  "namespace": "com.ken.parquet",
  "type": "record",
  "name": "MarketPrice",
  "fields": [
    {"name":"performance_id", "type":"string"},
    {"name":"price_as_of_date", "type":"int", "logicalType": "date"},
    {"name":"open_price", "type": ["null", "double"], "default": null},
    {"name":"high_price", "type": ["null", "double"], "default": null},
    {"name":"low_price", "type": ["null", "double"], "default": null},
    {"name":"close_price", "type": ["null", "double"], "default": null}
  ]
}

然后加上Maven插件, 通过这个文件来生成Java类

            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

添加好后,我们使用maven, compile的时候会生成对应的Java类。

编写代码

我们这里不从外部读取了,直接用env.fromCollection, 然后输出到本地文件系统中


@Component
public class ParquetRunner implements ApplicationRunner {
    @Override
    public void run(ApplicationArguments args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        List<MarketPrice> marketPriceList = new ArrayList<>();
        MarketPrice marketPrice = new MarketPrice();
        marketPrice.setPerformanceId("123456789");
        marketPrice.setPriceAsOfDate(100);
        marketPrice.setOpenPrice(100d);
        marketPrice.setHighPrice(120d);
        marketPrice.setLowPrice(99d);
        marketPrice.setClosePrice(101.1d);
        marketPriceList.add(marketPrice);

        DataStream<MarketPrice>  marketPriceDataStream = env.fromCollection(marketPriceList);

        String localPath = "C:\\temp\\flink\\";

        File outputParquetFile = new File(localPath);

        String localURI = outputParquetFile.toURI().toString();
        Path outputPath = new Path(localURI);

        final FileSink<MarketPrice> sink = FileSink
                .forBulkFormat(outputPath, AvroParquetWriters.forSpecificRecord(MarketPrice.class))
                .build();

        marketPriceDataStream.sinkTo(sink);

        marketPriceDataStream.print();

        env.execute();

    }
}

代码很简单,就是初始化DataStream, 然后Sink到本地。
运行程序报错
“Caused by: java.lang.RuntimeException: Could not load the AvroTypeInfo class. You may be missing the 'flink-avro' dependency”
添加dependency

org.apache.flink
flink-avro
${flink.version}
provided

继续运行,继续报错
“Caused by: java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter”
查找了一番,添加这个dependency

org.apache.parquet
parquet-avro
1.12.3

继续运行, 继续报错
“Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration”
看起来还需要hadoop的东西,这里可以添加

org.apache.hadoop
hadoop-core

正好我们后面需要生成到S3,我找到了这个

org.apache.flink
flink-s3-fs-hadoop
${flink.version}

这样可以不用上面hadoop-core了,
继续运行,继续报错
“Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/lib/output/FileOutputFormat”
加上这个dependency

org.apache.hadoop
hadoop-mapreduce-client-core
3.3.5

运行,成功,生成了parquet file, 如下图
image

如果要生成到AWS的S3上面去,只需要把Path换下, 很简单。 当然你需要有AWS的权限,我这里直接通过IDEA启动Environment variables里面加上AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,AWS_SESSION_TOKEN。

        String s3Path = "s3a://yourbucket/yourkey/";
        Path outputPath = new Path(s3Path);
		final FileSink<MarketPrice> sink = FileSink
                .forBulkFormat(outputPath, AvroParquetWriters.forSpecificRecord(MarketPrice.class))
                .build();

总结

这些dependency的依赖,你要是缺少了,运行起来就会缺东少西,然后花时间去找,还蛮废时间的。官方文档往往又没有那么细,所以算是一些小小的坑,好在都解决了,顺利的用Flink生成了Parquet file, 比较完成的POM文件列在这里

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-parquet</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${avro.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.parquet</groupId>
            <artifactId>parquet-avro</artifactId>
            <version>1.12.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.3.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-hadoop</artifactId>
            <version>${flink.version}</version>
        </dependency>