Flink 生成ParquetFile
前言
这周主要是学习使用Flink, 其中有一部分学习的内容就是生成parquet。 Flink自身提供的文档写了个大概,但是真要自己动手去生成pqrquet文件,发现还是有些小坑,本文就是记录这些坑。
开始
官方文档总是最好的开始的地方, 下面是官方文档上面的内容
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/#file-sink
从官方文档上面看,似乎很简单, 使用FileSink, 然后设置下格式使用AvroParquetWriters就可以了。
但是按照这个设置后,连FileSink这个类都找不到。
FilkSink需要这个dependency,
AvroParquetWriters需要的是这个dependency
使用AVRO
官方文档中使用了AvroParquetWriters, 那我们就先定义一个AVRO的schema文件MarketPrice.avsc,然后生成对应的类,
{
"namespace": "com.ken.parquet",
"type": "record",
"name": "MarketPrice",
"fields": [
{"name":"performance_id", "type":"string"},
{"name":"price_as_of_date", "type":"int", "logicalType": "date"},
{"name":"open_price", "type": ["null", "double"], "default": null},
{"name":"high_price", "type": ["null", "double"], "default": null},
{"name":"low_price", "type": ["null", "double"], "default": null},
{"name":"close_price", "type": ["null", "double"], "default": null}
]
}
然后加上Maven插件, 通过这个文件来生成Java类
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>${avro.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/avro/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
添加好后,我们使用maven, compile的时候会生成对应的Java类。
编写代码
我们这里不从外部读取了,直接用env.fromCollection, 然后输出到本地文件系统中
@Component
public class ParquetRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<MarketPrice> marketPriceList = new ArrayList<>();
MarketPrice marketPrice = new MarketPrice();
marketPrice.setPerformanceId("123456789");
marketPrice.setPriceAsOfDate(100);
marketPrice.setOpenPrice(100d);
marketPrice.setHighPrice(120d);
marketPrice.setLowPrice(99d);
marketPrice.setClosePrice(101.1d);
marketPriceList.add(marketPrice);
DataStream<MarketPrice> marketPriceDataStream = env.fromCollection(marketPriceList);
String localPath = "C:\\temp\\flink\\";
File outputParquetFile = new File(localPath);
String localURI = outputParquetFile.toURI().toString();
Path outputPath = new Path(localURI);
final FileSink<MarketPrice> sink = FileSink
.forBulkFormat(outputPath, AvroParquetWriters.forSpecificRecord(MarketPrice.class))
.build();
marketPriceDataStream.sinkTo(sink);
marketPriceDataStream.print();
env.execute();
}
}
代码很简单,就是初始化DataStream, 然后Sink到本地。
运行程序报错
“Caused by: java.lang.RuntimeException: Could not load the AvroTypeInfo class. You may be missing the 'flink-avro' dependency”
添加dependency
继续运行,继续报错
“Caused by: java.lang.NoClassDefFoundError: org/apache/parquet/avro/AvroParquetWriter”
查找了一番,添加这个dependency
继续运行, 继续报错
“Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration”
看起来还需要hadoop的东西,这里可以添加
正好我们后面需要生成到S3,我找到了这个
这样可以不用上面hadoop-core了,
继续运行,继续报错
“Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/lib/output/FileOutputFormat”
加上这个dependency
运行,成功,生成了parquet file, 如下图
如果要生成到AWS的S3上面去,只需要把Path换下, 很简单。 当然你需要有AWS的权限,我这里直接通过IDEA启动Environment variables里面加上AWS_ACCESS_KEY_ID,AWS_SECRET_ACCESS_KEY,AWS_SESSION_TOKEN。
String s3Path = "s3a://yourbucket/yourkey/";
Path outputPath = new Path(s3Path);
final FileSink<MarketPrice> sink = FileSink
.forBulkFormat(outputPath, AvroParquetWriters.forSpecificRecord(MarketPrice.class))
.build();
总结
这些dependency的依赖,你要是缺少了,运行起来就会缺东少西,然后花时间去找,还蛮废时间的。官方文档往往又没有那么细,所以算是一些小小的坑,好在都解决了,顺利的用Flink生成了Parquet file, 比较完成的POM文件列在这里
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.12.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-s3-fs-hadoop</artifactId>
<version>${flink.version}</version>
</dependency>