
Flink writing to Elasticsearch: a summary of pitfalls

大数据启示录 2022-02-15

Streaming writes from Flink to Elasticsearch often run into a variety of errors:

1: Flink fails with "Caused by: java.lang.LinkageError: loader constraint violation"

This is caused by Flink's class loading mechanism.
-- Solution 1
Cause: class loading order. You can add the following to flink-conf.yaml:
classloader.resolve-order: parent-first
Flink's default resolve order is child-first.
However, switching everything to parent-first can itself cause class conflicts. The better fix is to apply parent-first only to the individual packages that need it rather than globally, for example:
classloader.parent-first-patterns.additional: javax.script; jdk;


-- Solution 2
In pom.xml, move the Elasticsearch-related dependencies to the very top of the dependency list; a sketch follows below.
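As an illustration only (the connector artifact and version numbers below are assumptions; match them to your Flink, Scala, and Elasticsearch versions), the idea is simply that the Elasticsearch dependencies come before everything else in pom.xml:

<dependencies>
    <!-- Elasticsearch-related dependencies first -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>7.10.2</version>
    </dependency>
    <!-- remaining Flink / Kafka dependencies follow -->
</dependencies>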

  • classloader.parent-first-patterns.default: not recommended to change; it is fixed to the following values:

java.;
scala.;
org.apache.flink.;
com.esotericsoftware.kryo;
org.apache.hadoop.;
javax.annotation.;
org.slf4j;
org.apache.log4j;
org.apache.logging;
org.apache.commons.logging;
ch.qos.logback;
org.xml;
javax.xml;
org.apache.xerces;
org.w3c
  • classloader.parent-first-patterns.additional: if classes beyond those covered by the previous option also conflict under child-first loading and you want them resolved through the parent-delegation model instead, list them here, separated by semicolons (see the sketch after this list).
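Putting the two options together, a minimal flink-conf.yaml sketch looks like this (the extra package prefixes are only example assumptions; list whichever packages actually conflict in your job):

classloader.resolve-order: child-first
classloader.parent-first-patterns.additional: javax.script;jdk;

This keeps the default child-first behavior for everything else while forcing only the listed packages through the parent classloader.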

2: Flink hits a Basic auth problem when writing to Elasticsearch

Error message:

Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://es1:9200], URI [/xxx/_count?ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true], status line [HTTP/1.1 403 Forbidden]
{"error":{"root_cause":[{"type":"security_exception","reason":"action [indices:data/read/search] is unauthorized for user [anonymous_admin]"}],"type":"security_exception","reason":"action [indices:data/read/search] is unauthorized for user [anonymous_admin]"},"status":403}


Cause:

At first this was baffling: I was authenticating with a username and password, and the very same credentials worked from python and curl, so why not from Java? I suspected I was calling the wrong API and went through the API docs again and again. Eventually I put an HTTP proxy in between and captured the traffic, and it turned out the request really carried no HTTP authentication at all: there was no Basic auth information in the request headers. In the end I gave up on the built-in mechanism, Base64-encoded the username and password myself, and set the result directly in the request headers, which solved the problem.


import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;

import java.io.IOException;

public class ESTest2 {

    public static void main(String[] args) throws IOException {
        String user = "user";
        String pwd = "pwd";
        String indexName = "index_name"; // placeholder, replace with the real index name

        // Build the Basic auth token manually from user:password.
        String auth = Base64.encodeBase64String((user + ":" + pwd).getBytes());
        System.out.println(auth);

        // Set the Authorization header explicitly on the REST client builder.
        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost("es1", 9200));
        clientBuilder.setDefaultHeaders(new BasicHeader[]{new BasicHeader("Authorization", "Basic " + auth)});
        RestHighLevelClient client = new RestHighLevelClient(clientBuilder);

        // Simple smoke test: count the documents in the index.
        CountRequest countRequest = new CountRequest();
        countRequest.indices(indexName);
        CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
        System.out.println(countResponse.getCount());

        client.close();
    }
}
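For reference, the mechanism I had originally expected to handle this is the credentials-provider callback on RestClientBuilder. It did not add the Basic header in my environment, but a minimal sketch (host, port, user and password are placeholders) looks like this:

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

public class ESBasicAuthSketch {

    public static RestHighLevelClient build(String host, int port, String user, String pwd) {
        // Register the username and password with an Apache HttpClient credentials provider.
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, pwd));

        // Attach the provider through the http-client config callback of the REST client builder.
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
                .setHttpClientConfigCallback(httpClientBuilder ->
                        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(builder);
    }
}

If this still does not attach the Authorization header in your setup, the manual Base64 header shown above remains a workable fallback.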


Flink custom Elasticsearch sink demo

import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpHost;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.Map;

/**
 * @description: custom Elasticsearch sink
 * @author: mdz
 * @date: 2021/7/14
 **/
public class MyESSink extends RichSinkFunction<Map<String, String>> {
    private BulkRequest request = null;
    private RestHighLevelClient client = null;
    private CredentialsProvider credentialsProvider = null;

    private String esHost = null;
    private int esPort;
    private String esUsername = null;
    private String esPassword = null;
    private String index = null;
    private String type = null;

    public MyESSink(String esHost, int esPort, String esUsername, String esPassword, String index, String type) {
        this.esHost = esHost;
        this.esPort = esPort;
        this.esUsername = esUsername;
        this.esPassword = esPassword;
        this.index = index;
        this.type = type;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Build the Basic auth token manually and set it as a default header,
        // working around the missing Authorization header described above.
        String auth = Base64.encodeBase64String((esUsername + ":" + esPassword).getBytes());
        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(esHost, esPort));
        clientBuilder.setDefaultHeaders(new BasicHeader[]{new BasicHeader("Authorization", "Basic " + auth)});
        client = new RestHighLevelClient(clientBuilder);
        super.open(parameters);
    }

    @Override
    public void invoke(Map<String, String> value, Context context) throws Exception {
        // One bulk request per record; batch several records here if throughput matters.
        request = new BulkRequest();
        request.add(new IndexRequest(index, type).id(value.get("id")).source(value, XContentType.JSON));
        client.bulk(request, RequestOptions.DEFAULT);
    }

    @Override
    public void close() throws Exception {
        client.close();
        super.close();
    }
}


Flink: read from Kafka and sink to Elasticsearch

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import java.util.*;

/**
 * @description: Kafka to Elasticsearch
 * @author: mdz
 * @date: 2021/5/19
 **/
public class Kafka2Es {

    public static void main(String[] args) throws Exception {
        // Create the environment and set the basic parameters.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);

        // Kafka consumer properties.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");
        properties.setProperty("group.id", "gp_sdbd-etl-test07");
        properties.setProperty("request.timeout.ms", "600000");
        properties.setProperty("enable.auto.commit", "false");

        // Read from Kafka.
        FlinkKafkaConsumer<Object> smartnewsSource = new FlinkKafkaConsumer("my_topic1",
                new JSONKeyValueDeserializationSchema(true),
                properties
        );
        smartnewsSource.setStartFromEarliest(); // consume from the earliest offset
        DataStreamSource<Object> smartnewsDS = env.addSource(smartnewsSource);

        SingleOutputStreamOperator<Map<String, String>> mapDS = smartnewsDS.map(new MapFunction<Object, Map<String, String>>() {
            @Override
            public HashMap<String, String> map(Object value) throws Exception {
                HashMap<String, String> map1 = new HashMap<>();
                map1.put("data", value.toString());
                return map1;
            }
        });

        // Write to Elasticsearch with the custom sink defined above.
        mapDS.addSink(new MyESSink("esIp", 9200, "user", "pass", "index", "type"));
        env.execute();
    }
}


