seatunel学习
将Elasticsearch数据迁移到doirs
一、下载SeaTunnel
1. 下载Seatunnel
下载地址
进入之后选择 [bin\] apache-seatunnel-2.3.10-bin.tar.gz
sudo vim config/plugin.config
解压到相应目录
sudo tar -zxvf apache-seatunnel-2.3.10-bin.tar.gz
2. 下载连接器插件
修改连接器下载配置
sudo vim config/plugin.config
将下面部分修改
--Connectors-v2--
Connector-doris
Connector-elasticsearch
我们只需要 doris
和 elasticsearch
, 不嫌麻烦也可以所有都安装
然后运行安装插件的命令
./bin/install-plugin.sh
3. 安装mysql-connector的jar包
下载方式
wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j_8.0.33-1ubuntu20.04_all.deb
后面的 url
可在 mysql
官方
选择对应的版本和系统, 然后鼠标移动至下载, 浏览器的左下角会出现一个待跳转的url,复制下来即可
解压 deb
包
sudo dpkg-deb -x mysql-connector-j_8.0.33-1ubuntu20.04_all.deb
解压后是一个名为 usr
的文件夹
将里面的 jar
包复制到 seatunnel
的 lib
目录下
sudo cp usr/share/java/mysql-connector-j-8.0.33.jar .
sudo cp usr/share/java/mysql-connector-java-8.0.33.jar .
上面的命令是在 lib
目录下操作的,所以移动到 .
本目录即可
二、配置 es_to_doris.conf
文件
进入config目录下创建 es_to_dori.conf
的文件
sudo vim config/es_to_dori.conf
编写 conf
文件
可以参考 Apache seaTunnel
的官方文档
env {
parallelism = 1
job.mode = "BATCH"
plugin.load.mode = "CLASSLOADER"
job.name = "es_to_doris"
}
source {
Elasticsearch {
hosts = ["ElasticsearchHost:端口号"]
username="es用户名"
password="es密码"
index = "索引名"
plugin_output = "es_output"
scroll_time = "1m"
scroll_size = 1000
source = [
# es中的字段,或者说需要迁移的数据名称
],
query = "{\"range\": {\"updateTime\": {\"gte\": \"2025-01-01T00:00:00\", \"lte\": \"2025-04-20T00:00:00\"}}}"
}
}
transform {
JsonPath {
plugin_input = "es_output"
plugin_output = "output_temp"
# 按照列名的方式取数据
columns = [
{
src_field = "" # 源数据的字段名
path = "$[1]" # 只取第二个数据
dest_field = "" # 取出来后的字段名
dest_type = "STRING" # 取出来后的数据类型
}
]
}
Sql {
plugin_input = "output_temp" # 根据上一步处理好的数据,再统一同步到doirs
plugin_output = "final_output"
query = """
SELECT
*
FROM input_table
"""
}
}
sink {
Doris {
plugin_input = "final_output"
fenodes = "dorisHost:端口号" # 端口号我用的是部署到web上的
username = "用户名"
password = "密码"
database = "doris中的数据库名"
table = "doris中的数据库表名"
sink.label-prefix = "es_to_doris_sync"
sink.enable.2pc = "false"
doris.config = {
format="json"
read_json_by_line="true"
}
}
}
之后运行seaTunnel
./bin/seatunnel.sh --config ./config/es_to_doris.conf -m local
这个时候基本就算完成了
三、设置定时增量更新
需要将之前配置文件里query修改
query = "{\"range\": {\"updateTime\": {\"gte\": \"${start_time}\", \"lte\": \"${end_time}\"}}}"
创建 .sh
文件, 并编辑
sudo vim es_to_doris_byHour.sh
#!/bin/bash
# ========= 配置 =========
# Seatunnel主目录
SEATUNNEL_HOME=
# 日志目录
LOG_DIR=${SEATUNNEL_HOME}/es_to_doris_logs
# 配置文件路径
CONFIG_FILE=${SEATUNNEL_HOME}/config/es_to_doris.conf
# 创建日志目录(如果不存在)
mkdir -p ${LOG_DIR}
# 当前时间
NOW=$(date "+%Y%m%d_%H%M%S")
LOG_FILE=${LOG_DIR}/es_to_doris_sync_${NOW}.log
# ========= 计算时间范围 =========
start_time=$(date -u -d "-1 hour" +"%Y-%m-%dT%H:00:00")
end_time=$(date -u +"%Y-%m-%dT%H:00:00")
# ========= 执行 Seatunnel 同步 =========
echo "[$(date '+%F %T')] 开始同步,本次时间段:$start_time ~ $end_time" | tee -a ${LOG_FILE}
${SEATUNNEL_HOME}/bin/seatunnel.sh \
--config ${CONFIG_FILE} \
-m local \
--variable start_time="$start_time" \
--variable end_time="$end_time" \
>> ${LOG_FILE} 2>&1
echo "[$(date '+%F %T')] 本次同步结束 ✅" | tee -a ${LOG_FILE}
启动 linux
的 crontab
crontab -e
第一次进入需要选择编辑器, 我这里选择的是 vim.basic
0 * * * * 之前创建的sh的绝对路径
查看是否生效
crontab -l
更多crontab配置介绍
四、常见问题
如果要迁移,还是在服务器上操作比较好,使用自己的电脑我总是有这有哪的 bug
,但是一使用服务器, 就顺畅多了
如果出现的有报错日志
例如
http://xxx.xx.x.x:xxxx/api/_load_error_log?file=__shard_14/error_log_insert_stmt_e945c5596d8ba9e3-a98442ef76e69cbc_e945c5596d8ba9e3_a98442ef76e69cbc
这种, 一般是 doris
的数据类型不一致
使用 curl
查看报错信息
curl
http://xxx.xx.x.x:xxxx/api/_load_error_log?file=__shard_14/error_log_insert_stmt_e945c5596d8ba9e3-a98442ef76e69cbc_e945c5596d8ba9e3_a98442ef76e69cbc
然后根据报错信息的内容修改 doris
里数据的类型或长度
许可协议:
CC BY 4.0