Kettle Plugin Development: Elasticsearch

以数据之名 2021-06-08

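1. Background

Know your enemy and know yourself, and you will never be defeated. Since we are going to build a bulk-write plugin for Elasticsearch, let's first get to know Elasticsearch itself.

  • Elasticsearch is a real-time distributed storage, search, and analytics engine. It makes it possible to work with large volumes of data at high speed. It is used for full-text search, structured search, analytics, and any combination of the three.

  • Elasticsearch is written in Java and uses Lucene at its core for all indexing and search. Its goal is to hide Lucene's complexity behind a simple RESTful API so that full-text search becomes easy.

  • Elasticsearch is fast. Its design uses finite state transducers to implement the inverted indices behind full-text search, BKD trees to store numeric and geo data, and a column store for analytics.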

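  • Elasticsearch compared with a traditional relational database (RDBMS):

| No. | Elasticsearch | RDBMS |
|-----|---------------|-------|
| 1 | index | database |
| 2 | mapping | schema (table structure) |
| 3 | type (deprecated after 6.x) | table |
| 4 | document | row |
| 5 | field | column |
| 6 | inverted index | index |
| 7 | DSL | SQL |
| 8 | GET http://…… | SELECT * FROM tablename |
| 9 | PUT http://…… | UPDATE tablename SET …… |
| 10 | DELETE http://…… | DELETE FROM tablename |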

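2. Creating the Index

On the Kettle platform we build the app-pentaho-es6 or app-pentaho-es7 plugin. It takes the fields of the incoming data stream together with a dynamically chosen index name and writes them in bulk through ElasticSearch Bulk Insert. So the first thing to understand is how to use an index template (kettle-es) so that date-partitioned dynamic indices (kettle-es_*) are created on write, and how to search across them through the alias kettle-es-query. For example: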

    PUT _template/kettle-es
    {
      "index_patterns": [
        "kettle-es_*"
      ],
      "settings": {
        "index": {
          "max_result_window": "100000",
          "number_of_shards": "3",
          "number_of_replicas": "1"
        }
      },
      "aliases": {
        "kettle-es-query": {}
      },
      "mappings": {
        "properties": {
          "agent_ip": {
            "type": "ip"
          },
          "record_id": {
            "type": "integer"
          },
          "start_time": {
            "format": "yyyy-MM-dd HH:mm:ss",
            "type": "date"
          },
          "fire_time": {
            "format": "yyyy-MM-dd HH:mm:ss",
            "type": "date"
          },
          "end_time": {
            "format": "yyyy-MM-dd HH:mm:ss",
            "type": "date"
          },
          "pid": {
            "type": "keyword"
          },
          "message": {
            "index": false,
            "type": "text"
          }
        }
      }
    }
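To make the naming scheme concrete, here is a minimal sketch of how a daily index name could be derived so that it falls under the `kettle-es_*` pattern. The article does not show the date format that the plugin's Index field actually receives, so the `yyyy.MM.dd` suffix, the class name `DailyIndexName`, and both helper methods are assumptions for illustration only.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DailyIndexName {
    // Assumed suffix format; any format works as long as the name matches "kettle-es_*".
    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /** Builds "<prefix>_<date>", e.g. the index that receives one day's rows. */
    public static String forDate(String prefix, LocalDate day) {
        return prefix + "_" + day.format(SUFFIX);
    }

    /** True if the name would be picked up by the template's "kettle-es_*" pattern. */
    public static boolean matchesTemplate(String indexName) {
        return indexName.startsWith("kettle-es_");
    }

    public static void main(String[] args) {
        String index = forDate("kettle-es", LocalDate.of(2021, 6, 8));
        System.out.println(index + " matches template: " + matchesTemplate(index));
    }
}
```

Writes go to the dated index, while queries go to the alias `kettle-es-query`, which the template attaches to every matching index.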

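3. ElasticSearch Bulk Insert Source Code

3.1 Source Directory Structure

app-pentaho-es6 and app-pentaho-es7 share the same layout: the step class ElasticSearchBulk, the data class ElasticSearchBulkData, the metadata class ElasticSearchBulkMeta, the dialog class ElasticSearchBulkDialog, and the messages resource files used for log and UI messages. See the source code for details.

3.1.1 Directory Structure Comparison

3.1.2 Maven Configuration Comparison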

app-pentaho-es6 (Java 1.7, Elasticsearch 6.4.2, transport client):

    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <maven.compiler.source>1.7</maven.compiler.source>
      <maven.compiler.target>1.7</maven.compiler.target>
      <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
      <build.revision>${project.version}</build.revision>
      <timestamp>${maven.build.timestamp}</timestamp>
      <build.description>${project.description}</build.description>
      <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
      <elasticsearch.version>6.4.2</elasticsearch.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
        <scope>compile</scope>
      </dependency>
      <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>${elasticsearch.version}</version>
        <scope>compile</scope>
      </dependency>
    </dependencies>

app-pentaho-es7 (Java 1.8, Elasticsearch 7.2.0, high-level REST client):

    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <maven.compiler.source>1.8</maven.compiler.source>
      <maven.compiler.target>1.8</maven.compiler.target>
      <pdi.version>6.1.0.1-SNAPSHOT</pdi.version>
      <build.revision>${project.version}</build.revision>
      <timestamp>${maven.build.timestamp}</timestamp>
      <build.description>${project.description}</build.description>
      <maven.build.timestamp.format>yyyy/MM/dd hh:mm</maven.build.timestamp.format>
      <elasticsearch.version>7.2.0</elasticsearch.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
      </dependency>
      <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
        <scope>compile</scope>
      </dependency>
    </dependencies>

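3.2 The app-pentaho-es6 Plugin

3.2.1 About the TransportClient API

The app-pentaho-es6 plugin is built on TransportClient, which connects to the cluster remotely through the transport module. Note that Elastic deprecated TransportClient in Elasticsearch 7.0 and removed it in 8.0.

The most common way to obtain an Elasticsearch client is to create a TransportClient connected to the cluster. It does not join the cluster; it only takes one or more initial transport addresses and talks to them in round-robin fashion on each operation.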
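The round-robin idea can be sketched in a few lines of plain Java. This only illustrates the selection pattern described above; it is not TransportClient's internal code, and the class name `RoundRobinAddresses` and the node addresses are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinAddresses {
    private final List<String> addresses;
    private final AtomicInteger counter = new AtomicInteger();

    public RoundRobinAddresses(List<String> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        this.addresses = new ArrayList<>(addresses);
    }

    /** Returns the next address, wrapping around at the end of the list. */
    public String next() {
        // floorMod keeps the index non-negative even if the counter overflows.
        int i = Math.floorMod(counter.getAndIncrement(), addresses.size());
        return addresses.get(i);
    }

    public static void main(String[] args) {
        // Hypothetical node addresses, for illustration only.
        RoundRobinAddresses rr = new RoundRobinAddresses(
                Arrays.asList("es-node-1:9300", "es-node-2:9300"));
        for (int op = 0; op < 4; op++) {
            System.out.println("operation " + op + " -> " + rr.next());
        }
    }
}
```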

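3.2.2 Client Initialization

(1) Determine the cluster name, addresses, ports, and protocol that PreBuiltTransportClient will connect to, and register each server as a TransportAddress.

(2) Test whether the specified index can be reached, which yields the Client.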

    private void initClient() throws UnknownHostException {
        // Start from empty settings, then copy every entry from the step's
        // Settings tab, resolving Kettle variables along the way.
        Settings.Builder settingsBuilder = Settings.builder();
        settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
        for (Entry<String, String> entry : meta.getSettingsMap().entrySet()) {
            settingsBuilder.put(entry.getKey(), environmentSubstitute(entry.getValue()));
        }

        // Register every configured server as a transport address.
        PreBuiltTransportClient tClient = new PreBuiltTransportClient(settingsBuilder.build());
        for (Server server : meta.getServers()) {
            tClient.addTransportAddress(new TransportAddress(
                    InetAddress.getByName(environmentSubstitute(server.getAddress())),
                    server.getPort()));
        }
        client = tClient;

        /*
         * With the upgrade to elasticsearch 6.3.0, removed the NodeBuilder,
         * which was removed from the elasticsearch 5.0 API, see:
         * https://www.elastic.co/guide/en/elasticsearch/reference/5.0/breaking_50_java_api_changes.html#_nodebuilder_removed
         */
    }

3.2.3 Row Processing

    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
            throws KettleException {
        Object[] rowData = getRow();
        if (rowData == null) {
            // No more input: flush the last batch if it was only partly filled.
            if (currentRequest != null && currentRequest.numberOfActions() > 0) {
                processBatch(false);
            }
            setOutputDone();
            return false;
        }

        if (first) {
            // One-time setup when the first row arrives.
            first = false;
            setupData();
            currentRequest = client.prepareBulk();
            requestsBuffer = new ArrayList<IndexRequestBuilder>(this.batchSize);
            initFieldIndexes();
        }

        try {
            // Keep the raw row so it can be passed on or rejected once the batch returns.
            data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
            return indexRow(data.inputRowMeta, rowData) || !stopOnError;
        } catch (KettleStepException e) {
            throw e;
        } catch (Exception e) {
            rejectAllRows(e.getLocalizedMessage());
            String msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
            logError(msg);
            throw new KettleStepException(msg, e);
        }
    }

3.2.4 Batch Processing

    private boolean processBatch(boolean makeNew) throws KettleStepException {
        // Send the accumulated bulk request, then wait for the result
        // (with the configured timeout, if any).
        ActionFuture<BulkResponse> actionFuture = currentRequest.execute();
        boolean responseOk = false;
        BulkResponse response = null;
        try {
            if (timeout != null && timeoutUnit != null) {
                response = actionFuture.actionGet(timeout, timeoutUnit);
            } else {
                response = actionFuture.actionGet();
            }
        } catch (ElasticsearchException e) {
            String msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Error.BatchExecuteFail", e.getLocalizedMessage());
            if (e instanceof ElasticsearchTimeoutException) {
                msg = BaseMessages.getString(PKG, "ElasticSearchBulk.Error.Timeout");
            }
            logError(msg);
            rejectAllRows(msg);
        }

        if (response != null) {
            responseOk = handleResponse(response);
            requestsBuffer.clear();
        } else {
            // No response at all: have to assume every action in the batch failed.
            numberOfErrors += currentRequest.numberOfActions();
            setErrors(numberOfErrors);
        }

        if (makeNew) {
            // More rows are coming: start a fresh request and row buffer.
            currentRequest = client.prepareBulk();
            data.nextBufferRowIdx = 0;
            data.inputRowBuffer = new Object[batchSize][];
        } else {
            // Final flush: release the request and the buffer.
            currentRequest = null;
            data.inputRowBuffer = null;
        }
        return responseOk;
    }
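Stripped of the Kettle and Elasticsearch types, the two methods above follow a common pattern: buffer rows, send when the batch is full, and send whatever is left when the input ends. The sketch below shows only that pattern. The article does not show indexRow, so the assumption here is that a full batch is flushed from the row-adding path; `BatchBuffer` and its `sender` callback are hypothetical names standing in for the step and the bulk call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sender; // stands in for the bulk request
    private final List<T> buffer = new ArrayList<>();

    public BatchBuffer(int batchSize, Consumer<List<T>> sender) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.sender = sender;
    }

    /** Buffers one row and sends the batch as soon as it is full. */
    public void add(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Call once when the input is exhausted to send a partly filled batch. */
    public void finish() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        sender.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
        buffer.clear();
    }

    public static void main(String[] args) {
        BatchBuffer<String> batches =
                new BatchBuffer<>(2, batch -> System.out.println("sending " + batch));
        for (String row : new String[] {"r1", "r2", "r3", "r4", "r5"}) {
            batches.add(row);
        }
        batches.finish();
    }
}
```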

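3.3 The app-pentaho-es7 Plugin

3.3.1 About the RestHighLevelClient API

The app-pentaho-es7 plugin is built on RestHighLevelClient, the Java High Level REST Client provided by Elasticsearch. It sends HTTP requests rather than serialized Java requests. The Java client is mainly used to:

(1) Run standard index, get, delete, and search operations on an existing cluster

(2) Run administrative tasks on a running cluster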
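Because this client speaks HTTP, a bulk write ends up as a request to the `_bulk` REST endpoint whose body is newline-delimited JSON: one action line followed by one source line per document, with a newline after every line. As a rough sketch of that wire format (not the client's own serialization code), the snippet below builds such a body by hand. The class name `BulkBody`, the sample index name, and the sample documents are assumptions, and the `quote` helper only escapes backslashes and double quotes.

```java
public class BulkBody {
    /** Minimal JSON string quoting: handles backslash and double quote only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /**
     * One "index" action: an action line, then the document source,
     * each terminated by a newline as the _bulk endpoint requires.
     */
    public static String indexAction(String index, String id, String sourceJson) {
        return "{\"index\":{\"_index\":" + quote(index) + ",\"_id\":" + quote(id) + "}}\n"
                + sourceJson + "\n";
    }

    public static void main(String[] args) {
        StringBuilder body = new StringBuilder();
        // Hypothetical documents, already serialized to single-line JSON.
        body.append(indexAction("kettle-es_2021.06.08", "1", "{\"pid\":\"1001\"}"));
        body.append(indexAction("kettle-es_2021.06.08", "2", "{\"pid\":\"1002\"}"));
        System.out.print(body);
    }
}
```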

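3.3.2 Client Initialization

(1) Initialize Elasticsearch authentication with a CredentialsProvider.

(2) Determine the cluster name, addresses, ports, and protocol that RestHighLevelClient will connect to, and register the setHttpClientConfigCallback callback.

(3) Test whether the specified index can be reached, which yields the Client.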

    private void initClient() throws UnknownHostException {
        // Copy the step's Settings tab into a Settings.Builder, resolving Kettle variables.
        Settings.Builder settingsBuilder = Settings.builder();
        settingsBuilder.put(Settings.Builder.EMPTY_SETTINGS);
        meta.getSettingsMap().forEach(
                (key, value) -> settingsBuilder.put(key, environmentSubstitute(value)));

        // Basic authentication, read from the custom "es.user" / "es.password" settings.
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(settingsBuilder.get("es.user"),
                        settingsBuilder.get("es.password")));

        // Build ONE client over all configured hosts. Creating a client per server
        // inside the loop would keep only the last one and leak the others.
        List<HttpHost> hosts = new ArrayList<HttpHost>();
        for (Server server : meta.getServers()) {
            hosts.add(new HttpHost(server.getAddress(),
                    Integer.valueOf(server.getPort()), "http"));
        }
        client = new RestHighLevelClient(
                RestClient.builder(hosts.toArray(new HttpHost[0]))
                        .setHttpClientConfigCallback(httpClientBuilder ->
                                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)));

        /*
         * With the upgrade to elasticsearch 6.3.0, removed the NodeBuilder,
         * which was removed from the elasticsearch 5.0 API, see:
         * https://www.elastic.co/guide/en/elasticsearch/reference/5.0/breaking_50_java_api_changes.html#_nodebuilder_removed
         */
    }

3.3.3 Row Processing

    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
            throws KettleException {
        Object[] rowData = getRow();
        if (rowData == null) {
            // No more input: flush the last batch if it was only partly filled.
            if (currentRequest != null && currentRequest.numberOfActions() > 0) {
                processBatch(false);
            }
            setOutputDone();
            return false;
        }

        if (first) {
            // One-time setup when the first row arrives.
            first = false;
            setupData();
            // REST-client counterpart of the ES6 client.prepareBulk() call;
            // harmless if the field is already initialized elsewhere.
            currentRequest = new BulkRequest();
            requestsBuffer = new ArrayList<IndexRequest>(this.batchSize);
            initFieldIndexes();
        }

        try {
            // Keep the raw row so it can be passed on or rejected once the batch returns.
            data.inputRowBuffer[data.nextBufferRowIdx++] = rowData;
            return indexRow(data.inputRowMeta, rowData) || !stopOnError;
        } catch (KettleStepException e) {
            throw e;
        } catch (Exception e) {
            rejectAllRows(e.getLocalizedMessage());
            String msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Log.Exception", e.getLocalizedMessage());
            logError(msg);
            throw new KettleStepException(msg, e);
        }
    }

3.3.4 Batch Processing

    private boolean processBatch(boolean makeNew) throws KettleStepException {
        BulkResponse response = null;
        try {
            // Synchronous bulk call over HTTP; replaces the ES6 currentRequest.execute().
            response = client.bulk(currentRequest, RequestOptions.DEFAULT);
        } catch (IOException e1) {
            rejectAllRows(e1.getLocalizedMessage());
            String msg = BaseMessages.getString(PKG,
                    "ElasticSearchBulk.Log.Exception", e1.getLocalizedMessage());
            logError(msg);
            throw new KettleStepException(msg, e1);
        }

        boolean responseOk = false;
        if (response != null) {
            responseOk = handleResponse(response);
            requestsBuffer.clear();
        } else {
            // No response at all: have to assume every action in the batch failed.
            numberOfErrors += currentRequest.numberOfActions();
            setErrors(numberOfErrors);
        }

        if (makeNew) {
            // Start a fresh, empty request for the next batch (the REST-client
            // counterpart of the ES6 client.prepareBulk()). Calling client.bulk()
            // on currentRequest again here would index the same batch twice.
            currentRequest = new BulkRequest();
            data.nextBufferRowIdx = 0;
            data.inputRowBuffer = new Object[batchSize][];
        } else {
            // Final flush: release the request and the buffer.
            currentRequest = null;
            data.inputRowBuffer = null;
        }
        return responseOk;
    }

Whichever Elasticsearch version the server cluster runs, the client must have the same major version as the nodes in the cluster (for example 6.x or 7.x).
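A quick way to picture this rule is to compare only the first component of the two version strings. This is a minimal sketch, assuming plain dotted version strings such as "7.2.0"; the class name `VersionCheck` and its methods are hypothetical and not part of the plugin.

```java
public class VersionCheck {
    /** Extracts the major version, e.g. 7 from "7.2.0". */
    public static int major(String version) {
        int dot = version.indexOf('.');
        return Integer.parseInt(dot < 0 ? version : version.substring(0, dot));
    }

    /** True when client and server share the same major version. */
    public static boolean sameMajor(String clientVersion, String serverVersion) {
        return major(clientVersion) == major(serverVersion);
    }

    public static void main(String[] args) {
        // The es7 plugin is built against 7.2.0; the server versions are examples.
        System.out.println("7.2.0 client vs 7.10.1 server: " + sameMajor("7.2.0", "7.10.1"));
        System.out.println("6.4.2 client vs 7.2.0 server: " + sameMajor("6.4.2", "7.2.0"));
    }
}
```

In practice this means choosing app-pentaho-es6 for a 6.x cluster and app-pentaho-es7 for a 7.x cluster.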

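4. Using ElasticSearch Bulk Insert

4.1 General Parameters

  ① Index: the dynamic index field (index prefix + dynamic date)
  ② Type: defaults to _doc
  ③ Test Index: checks online whether the index exists
  ④ Batch Size: number of rows per batch
  ⑤ Stop on error: whether to stop when an error occurs
  ⑥ Batch Timeout: timeout for a batch write, in seconds
  ⑦ Id Field: the document ID (doc_id)
  ⑧ Overwrite if exists: whether to overwrite a document that already exists
  ⑨ Output Rows: output rows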

4.2 Servers Parameters

  ① Address: list of Elasticsearch cluster addresses
  ② Port: the matching port number

4.3 Fields (Output Fields)

  ① Name: the field in the data stream
  ② Target Name: the target mapping field in the corresponding Elasticsearch index
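The Name to Target Name mapping amounts to renaming stream fields as a row is turned into a document. Here is a minimal sketch of that idea in plain Java. It is not the plugin's own code: `FieldMapping` and `toDocument` are hypothetical names, the sample field names are made up, and skipping unmapped fields is an assumption rather than confirmed plugin behavior.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldMapping {
    /**
     * Builds a document from one row: each stream field that has a target
     * name is written under that name. Unmapped fields are skipped (assumption).
     */
    public static Map<String, Object> toDocument(String[] streamFields, Object[] row,
                                                 Map<String, String> targetNames) {
        Map<String, Object> doc = new LinkedHashMap<>();
        for (int i = 0; i < streamFields.length; i++) {
            String target = targetNames.get(streamFields[i]);
            if (target != null) {
                doc.put(target, row[i]);
            }
        }
        return doc;
    }

    public static void main(String[] args) {
        // Hypothetical stream layout and mapping onto the template's fields.
        String[] streamFields = {"AGENT_IP", "PID", "TMP_FLAG"};
        Object[] row = {"10.0.0.1", "1001", "x"};
        Map<String, String> targets = new LinkedHashMap<>();
        targets.put("AGENT_IP", "agent_ip");
        targets.put("PID", "pid");
        System.out.println(toDocument(streamFields, row, targets));
    }
}
```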

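4.4 Settings Parameters

  cluster.name: the cluster name
  es.user: user name for Elasticsearch authentication (custom parameter name)
  es.password: password for Elasticsearch authentication (custom parameter name)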
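The es7 plugin hands es.user and es.password to a credentials provider for HTTP Basic authentication. What Basic authentication ultimately puts on the wire is an `Authorization` header containing the Base64 encoding of `user:password`. The sketch below computes that header with the JDK alone, which can be handy when checking the same credentials with another HTTP tool. The class name `BasicAuthHeader` is hypothetical, and the credentials are placeholders.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthHeader {
    /** Returns the value of the Authorization header for HTTP Basic authentication. */
    public static String of(String user, String password) {
        String pair = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Placeholder credentials; use the values configured as es.user / es.password.
        System.out.println("Authorization: " + of("elastic", "changeme"));
    }
}
```

Basic authentication only encodes the credentials; it does not encrypt them, so over plain http they travel in a recoverable form.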

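5. Summary

If you need the source code, want to learn more about custom plugins and ways to integrate them, or have any questions or suggestions about development or usage, follow the author's WeChat official account "游走在数据之间": reply 2 for the source code, or reply 3 for an introductory video. You can also join the Kettle discussion group "话说Kettle".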

