ElasticSearch6.x 基于SpringBoot 实现ElasticSearch的文档批处理管理

虾米姐 阅读:697 2021-03-31 17:03:04 评论:0

SpringBoot 功能封装涉及ElasticSearch的文档批处理管理方法约定如下:

public void createBlukProcessDocument(String index, String type, List<Map<String, Object>> context):基于BulkProcessor 实现批处理

public BulkResponse createBulkDocument(String index, String type, List<Map<String, Object>> context):实现批处理

在上一篇文中说到:ElasticSearch6.x 基于SpringBoot 实现ElasticSearch连接功能封装,将约定的方法填充到ElasticSearchIndexUtil.java 工具类中。

 
/** 
 * ela index 工具类 
 *  
 * @author zzg 
 * 
 */ 
@Component 
public class ElasticSearchIndexUtil { 
	// 引入 Ela 连接实列化对象 
	@Autowired 
	private TransportClient client; 
 
 
   /** 
	 * 功能描述:批量导入文档 
	 *  
	 * @param index 
	 *            索引名 
	 * @param type 
	 *            索引类型 
	 * @param context 
	 *            文档内容 
	 */ 
	public void createBlukProcessDocument(String index, String type, List<Map<String, Object>> context) { 
		if (context != null && context.size() > 0) { 
			BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { 
 
				@Override 
				public void beforeBulk(long executionId, BulkRequest request) { 
					// TODO Auto-generated method stub 
 
				} 
 
				@Override 
				public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 
					// TODO Auto-generated method stub 
 
				} 
 
				@Override 
				public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 
					// TODO Auto-generated method stub 
 
				} 
			}).setBulkActions(10) // 每次10000个请求 
					.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // 拆成5MB一块 
					.setFlushInterval(TimeValue.timeValueSeconds(5)) // 无论请求数量多少,每5秒钟请求一次 
					.setConcurrentRequests(1)// 设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求 
					.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
					// 设置自定义重复请求机制,最开始等待100毫秒,之后成倍增加,重试3次,当一次或者多次重复请求失败后因为计算资源不够抛出EsRejectedExecutionException 
					// 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制 
					.build(); 
 
			// 线程批处理 
			context.stream().forEach(item -> { 
				String id = (String) item.get("id"); 
				IndexRequest indexRequest = new IndexRequest(index, type, id); 
				try { 
					XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); 
					item.forEach((k, v) -> { 
						if (!k.equalsIgnoreCase("id")) { 
							try { 
								builder.field(k, v); 
							} catch (Exception e) { 
								// TODO Auto-generated catch block 
								e.printStackTrace(); 
							} 
						} 
					}); 
					builder.endObject(); 
					indexRequest.source(builder); 
					bulkProcessor.add(indexRequest); 
				} catch (Exception e) { 
					e.printStackTrace(); 
				} 
			}); 
			bulkProcessor.flush(); 
			// 关闭bulkProcessor 
			bulkProcessor.close(); 
		} 
	} 
 
/** 
	 * 功能描述:批量导入文档 
	 *  
	 * @param index 
	 *            索引名 
	 * @param type 
	 *            索引类型 
	 * @param context 
	 *            文档内容 
	 */ 
	public BulkResponse createBulkDocument(String index, String type, List<Map<String, Object>> context) { 
		BulkRequestBuilder requestBuilder = client.prepareBulk(); 
 
		context.stream().forEach(item -> { 
			String id = (String) item.get("id"); 
			IndexRequestBuilder indexResponse = client.prepareIndex(index, type, id); 
 
			try { 
				XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); 
 
				item.forEach((k, v) -> { 
					if (!k.equalsIgnoreCase("id")) { 
						try { 
							builder.field(k, v); 
 
						} catch (Exception e) { 
							// TODO Auto-generated catch block 
							e.printStackTrace(); 
						} 
 
					} 
				}); 
				builder.endObject(); 
				indexResponse.setSource(builder); 
			} catch (Exception e) { 
				e.printStackTrace(); 
			} 
			requestBuilder.add(indexResponse); 
		}); 
		return requestBuilder.get(); 
	} 
}

编写测试工具类Test.java ,测试相关封装的功能代码:

@RunWith(SpringRunner.class) 
@SpringBootTest 
// 由于是Web项目,Junit需要模拟ServletContext,因此我们需要给我们的测试类加上@WebAppConfiguration。 
@WebAppConfiguration 
public class Test { 
	@Autowired 
	private ElasticSearchIndexUtil util; 
 
@org.junit.Test 
	public void createBulkDocument() { 
		List<Map<String, Object>> container = new ArrayList<Map<String, Object>>(); 
 
		Map<String, Object> obj1 = new HashMap<String, Object>(); 
		obj1.put("id", "5"); 
		obj1.put("postDate", "2019-07-18"); 
		obj1.put("message", "java 从入门到精通"); 
		obj1.put("user", "王刚"); 
 
		Map<String, Object> obj2 = new HashMap<String, Object>(); 
		obj2.put("id", "6"); 
		obj2.put("postDate", "2019-07-18"); 
		obj2.put("message", "python 从入门到精通"); 
		obj2.put("user", "光明科技"); 
 
		container.add(obj1); 
		container.add(obj2); 
 
		BulkResponse response = util.createBulkDocument("book", "library", container); 
		if (response.hasFailures()) { 
			System.out.println("error"); 
		} 
	} 
 
	@org.junit.Test 
	public void createBlukProcessDocument() { 
		List<Map<String, Object>> container = new ArrayList<Map<String, Object>>(); 
 
		Map<String, Object> obj1 = new HashMap<String, Object>(); 
		obj1.put("id", "9"); 
		obj1.put("postDate", "2019-07-18"); 
		obj1.put("message", "java 从入门到精通"); 
		obj1.put("user", "王刚"); 
 
		Map<String, Object> obj2 = new HashMap<String, Object>(); 
		obj2.put("id", "10"); 
		obj2.put("postDate", "2019-07-18"); 
		obj2.put("message", "python 从入门到精通"); 
		obj2.put("user", "光明科技"); 
 
		container.add(obj1); 
		container.add(obj2); 
 
		util.createBlukProcessDocument("book", "library", container); 
	} 
 
 
}

 

标签:Spring Boot
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

关注我们

一个IT知识分享的公众号