随着Elasticsearch的发展,从5.x版本升级到7.x版本,数据写入方式发生了变化。在Elasticsearch5中,需要指定type字段,而在Elasticsearch7中移除了该字段。以下是针对Elasticsearch7的配置和代码示例:在7.x版本中,使用的是flink-connector-elasticsearch7_2.11。示例代码如下:public class Es7SinkDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 示例数据Row row1 = Row.of(\"张三\", \"001\", getTimestamp(\"2024-07-16 12:00:00\")); DataStream dataStream = env.fromElements(row1); ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>(getEsSinkConfig()); dataStream.addSink(esSinkBuilder.build()); env.execute(\"Flink Write to Elasticsearch7 Example\"); } private static Timestamp getTimestamp(String dateStr) { SimpleDateFormat format = new SimpleDateFormat(\"yyyy-MM-dd HH:mm:ss\"); try { return new Timestamp(format.parse(dateStr).getTime()); } catch (ParseException e) { e.printStackTrace(); } return null; } private static ElasticsearchSinkConfig getEsSinkConfig() { Map config = new HashMap<>(); config.put(\"cluster.name\", \"elasticsearch\"); config.put(\"bulk.flush.max.actions\", \"1\"); return new ElasticsearchSinkConfig<>(config, (Row element, RuntimeContext ctx, RequestIndexer indexer) -> { indexer.add(createIndexRequest(element)); }); } private static IndexRequest createIndexRequest(Row element) { Map json = new HashMap<>(); json.put(\"name\", element.getField(0)); json.put(\"id\", element.getField(1)); json.put(\"timestamp\", element.getField(2)); return Requests.indexRequest().index(\"your-index\").source(json); } }