Streaming writes from Flink to Elasticsearch frequently run into a variety of errors:
1: Flink throws Caused by: java.lang.LinkageError: loader constraint violation

This is caused by Flink's class-loading mechanism. The root cause is class-loading order: Flink resolves classes child-first by default, and the conflicting classes end up loaded by the wrong loader.

Solution one: add classloader.resolve-order: parent-first to flink-conf.yaml. However, switching everything to parent-first can itself introduce class conflicts, so instead of applying it globally, restrict parent-first loading to the individual packages that need it:

classloader.parent-first-patterns.additional: javax.script;jdk;

Solution two: in pom.xml, move the Elasticsearch-related dependencies to the very top of the dependency list.
classloader.parent-first-patterns.default: not meant to be modified; it is fixed to the following values:

java.;scala.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback;org.xml;javax.xml;org.apache.xerces;org.w3c

classloader.parent-first-patterns.additional: for user classes beyond those covered by the previous parameter that conflict under child-first loading and should instead be loaded through the parent-delegation model; list them here, separated by semicolons.
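Putting the two options together, a minimal flink-conf.yaml fragment might look like the following (the package patterns are the examples from above; adjust them to whichever classes actually conflict in your job):

```yaml
# Option A: switch the whole job to parent-first resolution.
# This can cause other dependency conflicts, so prefer option B.
# classloader.resolve-order: parent-first

# Option B: keep the default child-first order, but delegate only
# the conflicting packages to the parent classloader.
classloader.parent-first-patterns.additional: javax.script;jdk;
```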
2: Flink writing to Elasticsearch fails Basic authentication

Error message:
Suppressed: org.elasticsearch.client.ResponseException: method [POST], host [http://es1:9200], URI [/xxx/_count?ignore_unavailable=false&expand_wildcards=open&allow_no_indices=true], status line [HTTP/1.1 403 Forbidden]{"error":{"root_cause":[{"type":"security_exception","reason":"action [indices:data/read/search] is unauthorized for user [anonymous_admin]"}],"type":"security_exception","reason":"action [indices:data/read/search] is unauthorized for user [anonymous_admin]"},"status":403}
Cause:

This error was puzzling at first: username/password authentication was configured, and the same credentials worked fine from python and curl, so why not from Java? Suspecting the wrong API was being called, I re-checked the API docs again and again. Eventually I captured the traffic through an HTTP proxy and found that the requests carried no Basic Auth information at all in their headers. The workaround was to Base64-encode the username and password manually and set the Authorization header directly, which resolved the problem.
import org.apache.commons.codec.binary.Base64;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;

import java.io.IOException;

public class ESTest2 {
    public static void main(String[] args) throws IOException {
        String user = "user";
        String pwd = "pwd";
        String indexName = "xxx"; // placeholder: the index to count

        // Base64-encode "user:pwd" for the Basic auth header
        String auth = Base64.encodeBase64String((user + ":" + pwd).getBytes());
        System.out.println(auth);

        // Set the Authorization header manually on every request
        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost("es1", 9200));
        clientBuilder.setDefaultHeaders(new BasicHeader[]{
                new BasicHeader("Authorization", "Basic " + auth)});
        RestHighLevelClient client = new RestHighLevelClient(clientBuilder);

        CountRequest countRequest = new CountRequest();
        countRequest.indices(indexName);
        CountResponse countResponse = client.count(countRequest, RequestOptions.DEFAULT);
        System.out.println(countResponse.getCount());

        client.close();
    }
}
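The header value itself can be reproduced with only the JDK, which is handy for verifying credentials outside the Flink job. A minimal sketch using java.util.Base64 instead of commons-codec (the user/pwd values are placeholders):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class BasicAuthHeader {

    // Build the value of an HTTP "Authorization" header for Basic auth:
    // "Basic " + base64(user + ":" + password)
    static String basicAuth(String user, String pwd) {
        String token = Base64.getEncoder()
                .encodeToString((user + ":" + pwd).getBytes(StandardCharsets.UTF_8));
        return "Basic " + token;
    }

    public static void main(String[] args) {
        // For user/pwd this prints: Basic dXNlcjpwd2Q=
        System.out.println(basicAuth("user", "pwd"));
    }
}
```

The same string can be cross-checked against what curl sends (curl -u user:pwd emits an identical header), which quickly confirms whether the Java client is the odd one out.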
Flink custom ES sink demo
import org.apache.commons.codec.binary.Base64;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

import java.util.Map;

/**
 * @description: custom ES sink
 * @author: mdz
 * @date: 2021/7/14
 **/
public class MyESSink extends RichSinkFunction<Map<String, String>> {
    private RestHighLevelClient client = null;
    private String esHost;
    private int esPort;
    private String esUsername;
    private String esPassword;
    private String index;
    private String type;

    public MyESSink(String esHost, int esPort, String esUsername, String esPassword, String index, String type) {
        this.esHost = esHost;
        this.esPort = esPort;
        this.esUsername = esUsername;
        this.esPassword = esPassword;
        this.index = index;
        this.type = type;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Build the Basic auth header manually, as in the standalone test above
        String auth = Base64.encodeBase64String((esUsername + ":" + esPassword).getBytes());
        RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(esHost, esPort));
        clientBuilder.setDefaultHeaders(new BasicHeader[]{
                new BasicHeader("Authorization", "Basic " + auth)});
        client = new RestHighLevelClient(clientBuilder);
        super.open(parameters);
    }

    @Override
    public void invoke(Map<String, String> value, Context context) throws Exception {
        BulkRequest request = new BulkRequest();
        request.add(new IndexRequest(index, type)
                .id(value.get("id"))
                .source(value, XContentType.JSON));
        client.bulk(request, RequestOptions.DEFAULT);
    }

    @Override
    public void close() throws Exception {
        client.close();
        super.close();
    }
}
Flink reading from Kafka and sinking to ES
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

import java.util.*;

/**
 * @description: Kafka to ES
 * @author: mdz
 * @date: 2021/5/19
 **/
public class Kafka2Es {
    public static void main(String[] args) throws Exception {
        // Create the environment and set its parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.enableCheckpointing(60 * 1000 * 5, CheckpointingMode.EXACTLY_ONCE);

        // Kafka consumer settings
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");
        properties.setProperty("group.id", "gp_sdbd-etl-test07");
        properties.setProperty("request.timeout.ms", "600000");
        properties.setProperty("enable.auto.commit", "false");

        // Read from Kafka
        FlinkKafkaConsumer<Object> smartnewsSource =
                new FlinkKafkaConsumer("my_topic1", new JSONKeyValueDeserializationSchema(true), properties);
        smartnewsSource.setStartFromEarliest(); // consume from the earliest offset
        DataStreamSource<Object> smartnewsDS = env.addSource(smartnewsSource);

        SingleOutputStreamOperator<Map<String, String>> mapDS =
                smartnewsDS.map(new MapFunction<Object, Map<String, String>>() {
                    @Override
                    public HashMap<String, String> map(Object value) throws Exception {
                        HashMap<String, String> map1 = new HashMap<>();
                        map1.put("data", value.toString());
                        return map1;
                    }
                });

        // Write to ES
        mapDS.addSink(new MyESSink("esIp", 9200, "user", "pass", "index", "type"));

        env.execute();
    }
}
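The consumer configuration above is worth isolating so the same settings can be reused and sanity-checked across jobs. A small sketch using only java.util.Properties (broker list and group id are the placeholders from the example):

```java
import java.util.Properties;

public class KafkaConsumerConfig {

    // Collect the Kafka consumer settings used by the Kafka2Es job.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        // Generous request timeout for slow brokers during bulk reads
        properties.setProperty("request.timeout.ms", "600000");
        // Offsets are managed by Flink checkpoints, not the Kafka client
        properties.setProperty("enable.auto.commit", "false");
        return properties;
    }

    public static void main(String[] args) {
        Properties p = consumerProps("xxx:9092,xxx:9092,xxx:9092", "gp_sdbd-etl-test07");
        System.out.println(p.getProperty("enable.auto.commit")); // false
    }
}
```

Disabling auto-commit matters here: with checkpointing enabled, the Flink Kafka connector commits offsets as part of the checkpoint, which is what makes the EXACTLY_ONCE mode in the job meaningful.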
This article is reposted from 大数据启示录.