使用Apache SeaTunnel高效集成和管理SftpFile数据源
本文为Apache SeaTunnel已经支持的SftpFile Source Connector使用文档,旨在帮助读者理解如何高效地使用SFTP文件源连接器,以便轻松地使用Apache SeaTunnel集成和管理您的SftpFil数据源。
SftpFile 是指通过 SFTP(Secure File Transfer Protocol)协议进行文件操作的对象或组件。在网络编程和数据集成中,SFTPFile 通常用来表示和操作存储在远程 SFTP 服务器上的文件。SFTP 是一种安全的文件传输协议,基于 SSH(Secure Shell)协议,提供了加密的文件传输和远程文件操作功能。
支持的引擎
Spark
Flink
SeaTunnel Zeta
主要特性
描述
从 SFTP 文件服务器读取数据。
支持的数据源信息
使用 SftpFile 连接器,需要以下依赖项。可以通过 install-plugin.sh
下载,也可以从 Maven 中央仓库获取。
数据源 | 支持的版本 | 依赖项 |
---|---|---|
SftpFile | 通用 | 下载 |
- 提示
如果你使用的是 Spark/Flink,请确保 Spark/Flink 集群已经集成了 Hadoop。Hadoop 2.x 版本已通过测试。
如果使用 SeaTunnel 引擎,安装 SeaTunnel 引擎时会自动集成 Hadoop JAR 包。可以在 ${SEATUNNEL_HOME}/lib
目录下检查这个 JAR 包是否存在。
为了支持更多的文件类型,我们做了一些妥协,所以在内部访问 Sftp 时我们使用了 HDFS 协议,这个连接器需要一些 Hadoop 依赖项,且仅支持 Hadoop 版2.9.X+ 版本。
数据类型映射
文件没有特定的类型列表,我们可以通过在配置中指定模式来指示要将哪个 SeaTunnel 数据类型转换为相应的数据。
SeaTunnel 数据类型 |
---|
STRING |
SHORT |
INT |
BIGINT |
BOOLEAN |
DOUBLE |
DECIMAL |
FLOAT |
DATE |
TIME |
TIMESTAMP |
BYTES |
ARRAY |
MAP |
Source选项
名称 | 类型 | 必填 | 默认值 | 描述 |
---|---|---|---|---|
host | 字符串 | 是 | - | 目标 SFTP 主机地址 |
port | 整数 | 是 | - | 目标 SFTP 端口号 |
user | 字符串 | 是 | - | 目标 SFTP 用户名 |
password | 字符串 | 是 | - | 目标 SFTP 密码 |
path | 字符串 | 是 | - | 源文件路径 |
file_format_type | 字符串 | 是 | - | 请查看下文的 #file_format_type |
file_filter_pattern | 字符串 | 否 | - | 用于文件过滤的过滤器模式。 |
delimiter | 字符串 | 否 | \001 | 字段分隔符,用于告诉连接器如何在读取文本文件时切割字段。默认 \001 ,与 Hive 的默认分隔符相同。 |
parse_partition_from_path | 布尔型 | 否 | true | 控制是否从文件路径中解析分区键和值。 例如,如果从路径中读取文件 oss://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26 那么文件中的每条记录将添加这两个字段: name age tyrantlucifer 26 提示:不要在模式选项中定义分区字段。 |
date_format | 字符串 | 否 | yyyy-MM-dd | 日期类型的格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd ,默认 yyyy-MM-dd 。 |
datetime_format | 字符串 | 否 | yyyy-MM-dd HH:mm:ss | 日期时间类型的格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss ,默认 yyyy-MM-dd HH:mm:ss 。 |
time_format | 字符串 | 否 | HH:mm:ss | 时间类型的格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:HH:mm:ss HH:mm:ss.SSS ,默认 HH:mm:ss 。 |
skip_header_row_number | 长整型 | 否 | 0 | 跳过前几行,仅对 txt 和 csv 文件有效。 例如,设置如下: skip_header_row_number = 2 那么 SeaTunnel 将跳过源文件的前两行。 |
sheet_name | 字符串 | 否 | - | 读取工作簿的工作表名称,仅在文件格式为 Excel 时使用。 |
schema | 配置项 | 否 | - | 请查看下文的 #schema |
通用选项 | 否 | - | Source 插件通用参数,请参考 Source通用选项 获取详细信息。 |
file_format_type [字符串]
支持以下文件类型:
text
csv
parquet
orc
json
excel
如果将文件类型指定为 json
,需要配置 Schema 模式选项,向连接器说明如何解析数据为你需要所需的 Row。
例如:上游数据如下:
{"code": 200, "data": "get success", "success": true}
也可以将多个数据保存在一个文件中,并通过换行符进行分隔:
{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}
需要按照以下方式配置 Schema:
schema {
fields {
code = int
data = string
success = boolean
}
}
连接器将生成以下数据:
code | data | success |
---|---|---|
200 | 获取成功 | true |
如果将文件类型指定为 parquet 或 orc ,则无需指定模式选项,连接器可以自动查找上游数据的模式。 |
||
如果将文件类型指定为 text 或 csv ,则可以选择是否指定模式信息或不指定。 |
例如,上游数据如下:
tyrantlucifer#26#male
如果不配置 Schema,Connector 将这样处理上游数据:
如果分配数据模式,则除了 CSV 文件类型外,还应分配选项 delimiter。
内容 |
---|
tyrantlucifer#26#male |
如果配置了数据 Schema,除了CSV文件类型,还需要配置选项分隔符。 |
需要配置 Schema 和分隔符如下:
delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}
连接器将生成以下数据:
姓名 | 年龄 | 性别 |
---|---|---|
tyrantlucifer | 26 | 男 |
Schema [配置项]
fields [配置项]
上游数据的 Schema。
如何创建 Sftp 数据同步任务
以下示例演示了如何创建一个数据同步任务,从 Sftp 读取数据并在本地客户端上打印出来:
# 设置要执行的任务的基本配置
env {
execution.parallelism = 1
job.mode = "BATCH"
}
# 创建连接到 Sftp 的源
source {
SftpFile {
host = "sftp"
port = 22
user = seatunnel
password = pass
path = "tmp/seatunnel/read/json"
file_format_type = "json"
result_table_name = "sftp"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
C_MAP = "map<string, string>"
C_ARRAY = "array<int>"
C_STRING = string
C_BOOLEAN = boolean
C_TINYINT = tinyint
C_SMALLINT = smallint
C_INT = int
C_BIGINT = bigint
C_FLOAT = float
C_DOUBLE = double
C_BYTES = bytes
C_DATE = date
C_DECIMAL = "decimal(38, 18)"
C_TIMESTAMP = timestamp
}
}
}
}
}
# 控制台打印读取的 Sftp 数据
sink {
Console {
parallelism = 1
}
}
本文由 白鲸开源 提供发布支持!