文章

seatunel学习

将Elasticsearch数据迁移到doirs

一、下载SeaTunnel

1. 下载Seatunnel

下载地址

https://seatunnel.apache.org/download/

进入之后选择 [bin\] apache-seatunnel-2.3.10-bin.tar.gz

sudo vim config/plugin.config

解压到相应目录

sudo tar -zxvf apache-seatunnel-2.3.10-bin.tar.gz

2. 下载连接器插件

修改连接器下载配置

sudo vim config/plugin.config

将下面部分修改

--Connectors-v2--
Connector-doris
Connector-elasticsearch

我们只需要 doriselasticsearch, 不嫌麻烦也可以所有都安装

然后运行安装插件的命令

./bin/install-plugin.sh

3. 安装mysql-connector的jar包

下载方式

wget https://downloads.mysql.com/archives/get/p/3/file/mysql-connector-j_8.0.33-1ubuntu20.04_all.deb

后面的 url可在 mysql官方

https://downloads.mysql.com/archives/c-j/

选择对应的版本和系统, 然后鼠标移动至下载, 浏览器的左下角会出现一个待跳转的url,复制下来即可

解压 deb

sudo dpkg-deb -x mysql-connector-j_8.0.33-1ubuntu20.04_all.deb

解压后是一个名为 usr的文件夹

将里面的 jar包复制到 seatunnellib目录下

sudo cp usr/share/java/mysql-connector-j-8.0.33.jar .
sudo cp usr/share/java/mysql-connector-java-8.0.33.jar .

上面的命令是在 lib目录下操作的,所以移动到 .本目录即可

二、配置 es_to_doris.conf文件

进入config目录下创建 es_to_dori.conf的文件

sudo vim config/es_to_dori.conf

编写 conf文件

可以参考 Apache seaTunnel的官方文档

https://seatunnel.apache.org/zh-CN/docs/2.3.10/about

env {
  parallelism = 1
  job.mode = "BATCH"
  plugin.load.mode = "CLASSLOADER"
  job.name = "es_to_doris"
}

source {
  Elasticsearch {
    hosts = ["ElasticsearchHost:端口号"]
    username="es用户名"
    password="es密码"
    index = "索引名"
    plugin_output = "es_output"
    scroll_time = "1m"
    scroll_size = 1000
    source = [
      # es中的字段,或者说需要迁移的数据名称
    ],
    query = "{\"range\": {\"updateTime\": {\"gte\": \"2025-01-01T00:00:00\", \"lte\": \"2025-04-20T00:00:00\"}}}"

  }
}

transform {
  JsonPath {
    plugin_input = "es_output"
    plugin_output = "output_temp"
    # 按照列名的方式取数据
    columns = [
      {
        src_field = "" # 源数据的字段名
        path = "$[1]" # 只取第二个数据
        dest_field = "" # 取出来后的字段名
        dest_type = "STRING" # 取出来后的数据类型
      }
    ]
  }

  Sql {
    plugin_input = "output_temp"    # 根据上一步处理好的数据,再统一同步到doirs
    plugin_output = "final_output"
    query = """
    SELECT
      *
      FROM input_table
    """
  }
}

sink {
  Doris {
    plugin_input = "final_output"
    fenodes = "dorisHost:端口号" # 端口号我用的是部署到web上的
    username = "用户名"
    password = "密码"
    database = "doris中的数据库名"
    table = "doris中的数据库表名"
    sink.label-prefix = "es_to_doris_sync"
    sink.enable.2pc = "false"
    doris.config = {
      format="json"
      read_json_by_line="true"
    }
  }
}

之后运行seaTunnel

./bin/seatunnel.sh --config ./config/es_to_doris.conf -m local

这个时候基本就算完成了

三、设置定时增量更新

需要将之前配置文件里query修改

query = "{\"range\": {\"updateTime\": {\"gte\": \"${start_time}\", \"lte\": \"${end_time}\"}}}"

创建 .sh文件, 并编辑

sudo vim es_to_doris_byHour.sh
#!/bin/bash

# ========= 配置 =========

# Seatunnel主目录
SEATUNNEL_HOME=

# 日志目录
LOG_DIR=${SEATUNNEL_HOME}/es_to_doris_logs

# 配置文件路径
CONFIG_FILE=${SEATUNNEL_HOME}/config/es_to_doris.conf

# 创建日志目录(如果不存在)
mkdir -p ${LOG_DIR}

# 当前时间
NOW=$(date "+%Y%m%d_%H%M%S")
LOG_FILE=${LOG_DIR}/es_to_doris_sync_${NOW}.log

# ========= 计算时间范围 =========

start_time=$(date -u -d "-1 hour" +"%Y-%m-%dT%H:00:00")
end_time=$(date -u +"%Y-%m-%dT%H:00:00")

# ========= 执行 Seatunnel 同步 =========

echo "[$(date '+%F %T')] 开始同步,本次时间段:$start_time ~ $end_time" | tee -a ${LOG_FILE}

${SEATUNNEL_HOME}/bin/seatunnel.sh \
  --config ${CONFIG_FILE} \
  -m local \
  --variable start_time="$start_time" \
  --variable end_time="$end_time" \
  >> ${LOG_FILE} 2>&1

echo "[$(date '+%F %T')] 本次同步结束 ✅" | tee -a ${LOG_FILE}

启动 linuxcrontab

crontab -e

第一次进入需要选择编辑器, 我这里选择的是 vim.basic

0 * * * * 之前创建的sh的绝对路径

查看是否生效

crontab -l

更多crontab配置介绍

https://www.runoob.com/linux/linux-comm-crontab.html

四、常见问题

如果要迁移,还是在服务器上操作比较好,使用自己的电脑我总是有这有哪的 bug,但是一使用服务器, 就顺畅多了

如果出现的有报错日志

例如

http://xxx.xx.x.x:xxxx/api/_load_error_log?file=__shard_14/error_log_insert_stmt_e945c5596d8ba9e3-a98442ef76e69cbc_e945c5596d8ba9e3_a98442ef76e69cbc

这种, 一般是 doris的数据类型不一致

使用 curl查看报错信息

curl 
http://xxx.xx.x.x:xxxx/api/_load_error_log?file=__shard_14/error_log_insert_stmt_e945c5596d8ba9e3-a98442ef76e69cbc_e945c5596d8ba9e3_a98442ef76e69cbc

然后根据报错信息的内容修改 doris里数据的类型或长度

许可协议:  CC BY 4.0