ElasticSearch 6.x: Bulk Document Processing with Spring Boot
虾米姐
2021-03-31 17:03:04
The Spring Boot wrapper exposes the following methods for bulk document management in ElasticSearch:
public void createBlukProcessDocument(String index, String type, List<Map<String, Object>> context): bulk indexing through a BulkProcessor
public BulkResponse createBulkDocument(String index, String type, List<Map<String, Object>> context): bulk indexing through a single bulk request
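To make the difference between the two methods concrete: a BulkProcessor buffers requests and sends them in batches once a threshold is reached, whereas a plain bulk request sends everything that was added in one call. The following is a minimal plain-Java sketch of the buffering idea only. `BatchingSketch`, `batchSizes`, and `maxActions` are hypothetical names used for illustration and are not part of the Elasticsearch API; the real BulkProcessor can also flush by size and by time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only: this is NOT Elasticsearch's BulkProcessor.
// It mimics the "flush once N actions are buffered" idea behind setBulkActions(N).
final class BatchingSketch<T> {
    private final int maxActions;
    private final Consumer<List<T>> sender;
    private final List<T> buffer = new ArrayList<>();

    BatchingSketch(int maxActions, Consumer<List<T>> sender) {
        if (maxActions < 1) {
            throw new IllegalArgumentException("maxActions must be >= 1");
        }
        this.maxActions = maxActions;
        this.sender = sender;
    }

    // Buffer one item and send a batch as soon as the threshold is reached.
    void add(T item) {
        buffer.add(item);
        if (buffer.size() >= maxActions) {
            flush();
        }
    }

    // Send whatever is buffered, if anything, and start a new batch.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sender.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    // Demo helper: sizes of the batches produced when `total` items are added.
    static List<Integer> batchSizes(int total, int maxActions) {
        List<Integer> sizes = new ArrayList<>();
        BatchingSketch<Integer> batcher =
                new BatchingSketch<>(maxActions, batch -> sizes.add(batch.size()));
        for (int i = 0; i < total; i++) {
            batcher.add(i);
        }
        batcher.flush(); // send the remainder, like the final flush in the utility class
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(batchSizes(25, 10)); // prints [10, 10, 5]
    }
}
```

With a threshold of 10, adding 25 items yields two full batches and one final batch of 5 that is only sent by the explicit flush.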
Building on the previous article, "ElasticSearch 6.x: Wrapping the ElasticSearch Connection with Spring Boot", add the methods listed above to the ElasticSearchIndexUtil.java utility class.
/**
* Elasticsearch index utility class
*
* @author zzg
*
*/
@Component
public class ElasticSearchIndexUtil {
// Inject the Elasticsearch client instance
@Autowired
private TransportClient client;
/**
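* Bulk-index documents through a BulkProcessor.
*
* @param index
* index name
* @param type
* index type
* @param context
* documents to index; the "id" entry of each map is used as the document id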
*/
public void createBlukProcessDocument(String index, String type, List<Map<String, Object>> context) {
if (context != null && context.size() > 0) {
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
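@Override
public void beforeBulk(long executionId, BulkRequest request) {
// Called just before a bulk request is sent; nothing to prepare here
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// Called when the whole bulk request failed, for example on a connection error
failure.printStackTrace();
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// Called after a bulk request completed; individual items may still have failed
if (response.hasFailures()) {
System.err.println(response.buildFailureMessage());
}
}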
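}).setBulkActions(10) // flush once 10 requests have been added
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush once the buffered requests reach 5 MB
.setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds regardless of how many requests are pending
.setConcurrentRequests(1) // number of concurrent requests: 0 executes each bulk request synchronously, 1 lets one bulk request run while new requests accumulate
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
// Custom retry policy: wait 100 ms before the first retry, increase the delay exponentially, and retry up to 3 times.
// A retry is attempted when bulk items fail with EsRejectedExecutionException, which indicates that too few
// compute resources were available; BackoffPolicy.noBackoff() disables retries.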
.build();
// Add one index request per document to the bulk processor
context.stream().forEach(item -> {
String id = (String) item.get("id");
IndexRequest indexRequest = new IndexRequest(index, type, id);
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
item.forEach((k, v) -> {
if (!k.equalsIgnoreCase("id")) {
try {
builder.field(k, v);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
builder.endObject();
indexRequest.source(builder);
bulkProcessor.add(indexRequest);
} catch (Exception e) {
e.printStackTrace();
}
});
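try {
// awaitClose flushes any remaining requests and waits for in-flight bulk requests to finish;
// close() alone returns without waiting. The 10-second timeout is an arbitrary choice.
bulkProcessor.awaitClose(10, java.util.concurrent.TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}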
}
}
/**
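* Bulk-index documents with a single bulk request.
*
* @param index
* index name
* @param type
* index type
* @param context
* documents to index; the "id" entry of each map is used as the document id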
*/
public BulkResponse createBulkDocument(String index, String type, List<Map<String, Object>> context) {
BulkRequestBuilder requestBuilder = client.prepareBulk();
context.stream().forEach(item -> {
String id = (String) item.get("id");
IndexRequestBuilder indexRequest = client.prepareIndex(index, type, id);
try {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
item.forEach((k, v) -> {
if (!k.equalsIgnoreCase("id")) {
try {
builder.field(k, v);
} catch (Exception e) {
// A field that cannot be serialized is skipped
e.printStackTrace();
}
}
});
builder.endObject();
indexRequest.setSource(builder);
// Add the request only after its source was built, so a failed build does not add a request without a body
requestBuilder.add(indexRequest);
} catch (Exception e) {
e.printStackTrace();
}
});
return requestBuilder.get();
}
}
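The retry settings passed to setBackoffPolicy in the class above (an initial wait of 100 ms and up to 3 retries) can be easier to picture with numbers. The following is a minimal plain-Java sketch of a retry schedule that simply doubles the delay each time. `BackoffSketch` and `delaysMillis` are hypothetical names, and doubling is an assumption chosen for simplicity: the delays that BackoffPolicy.exponentialBackoff actually produces are computed by the library and may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of a doubling retry schedule.
// This is NOT the formula used by Elasticsearch's BackoffPolicy.
final class BackoffSketch {

    // Returns the wait time before each retry, starting at initialMillis and doubling.
    static List<Long> delaysMillis(long initialMillis, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialMillis;
        for (int i = 0; i < maxRetries; i++) {
            delays.add(delay);
            delay *= 2;
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(delaysMillis(100, 3)); // prints [100, 200, 400]
    }
}
```

Whatever the exact schedule, the point of the setting is the same: a rejected bulk request is retried a bounded number of times with growing pauses rather than immediately.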
Write a test class, Test.java, to exercise the wrapped methods:
@RunWith(SpringRunner.class)
@SpringBootTest
// This is a web project, so JUnit has to mock the ServletContext; annotate the test class with @WebAppConfiguration.
@WebAppConfiguration
public class Test {
@Autowired
private ElasticSearchIndexUtil util;
@org.junit.Test
public void createBulkDocument() {
List<Map<String, Object>> container = new ArrayList<Map<String, Object>>();
Map<String, Object> obj1 = new HashMap<String, Object>();
obj1.put("id", "5");
obj1.put("postDate", "2019-07-18");
obj1.put("message", "java 从入门到精通");
obj1.put("user", "王刚");
Map<String, Object> obj2 = new HashMap<String, Object>();
obj2.put("id", "6");
obj2.put("postDate", "2019-07-18");
obj2.put("message", "python 从入门到精通");
obj2.put("user", "光明科技");
container.add(obj1);
container.add(obj2);
BulkResponse response = util.createBulkDocument("book", "library", container);
if (response.hasFailures()) {
// Print the reason for each failed item instead of a bare "error"
System.out.println(response.buildFailureMessage());
}
}
@org.junit.Test
public void createBlukProcessDocument() {
List<Map<String, Object>> container = new ArrayList<Map<String, Object>>();
Map<String, Object> obj1 = new HashMap<String, Object>();
obj1.put("id", "9");
obj1.put("postDate", "2019-07-18");
obj1.put("message", "java 从入门到精通");
obj1.put("user", "王刚");
Map<String, Object> obj2 = new HashMap<String, Object>();
obj2.put("id", "10");
obj2.put("postDate", "2019-07-18");
obj2.put("message", "python 从入门到精通");
obj2.put("user", "光明科技");
container.add(obj1);
container.add(obj2);
util.createBlukProcessDocument("book", "library", container);
}
}
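Both tests pass the id as a string. The utility methods cast item.get("id") to String, so a numeric id such as 5 would throw a ClassCastException. The following is a minimal sketch of one way to separate the id from the remaining fields without that cast. `DocumentFields`, `idOf`, and `sourceOf` are hypothetical helpers for illustration and are not part of the utility class shown earlier.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the article's ElasticSearchIndexUtil.
final class DocumentFields {

    // Returns the document id as text, or null when the map has no "id" entry.
    static String idOf(Map<String, Object> item) {
        Object id = item.get("id");
        return id == null ? null : String.valueOf(id);
    }

    // Copies every entry except "id" (compared case-insensitively, as in the utility class).
    static Map<String, Object> sourceOf(Map<String, Object> item) {
        Map<String, Object> source = new LinkedHashMap<>();
        item.forEach((k, v) -> {
            if (!k.equalsIgnoreCase("id")) {
                source.put(k, v);
            }
        });
        return source;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("id", 5); // numeric id on purpose
        doc.put("postDate", "2019-07-18");
        System.out.println(idOf(doc));     // prints 5
        System.out.println(sourceOf(doc)); // prints {postDate=2019-07-18}
    }
}
```

Keeping this logic in one place would also remove the duplicated field-filtering loop from the two bulk methods.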