Elasticsearch 7 Plugin Development: Index Creation and Index Data Initialization

虾米姐 2021-03-31 12:44:25

Elasticsearch (ES) plugin development

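1. Plugin categories

  • API Extension Plugins
    Add new APIs or features to Elasticsearch, usually related to search or mapping.
    Notable example:
    SQL language Plugin: lets Elasticsearch answer SQL queries (by NLPchina)

  • Alerting Plugins
    Monitor Elasticsearch indices and trigger an alert when a threshold is exceeded.
    Notable example:
    X-Pack

  • Analysis Plugins
    Extend the analysis rules applied to indexed text, for example with additional tokenizers.
    Notable examples:
    IK Analysis Plugin: Chinese word segmentation
    Japanese (Kuromoji) Analysis Plugin: Japanese word segmentation
    Pinyin Analysis Plugin: pinyin tokenization

  • Discovery Plugins
    Extend Elasticsearch with new discovery mechanisms that can be used in place of Zen Discovery.

  • Ingest Plugins
    Add ingest node capabilities.
    Notable example:
    Ingest Attachment Processor Plugin: lets ingest nodes process attachment files

  • Management Plugins
    Manage the ES cluster.
    Notable example: X-Pack

  • Mapper Plugins
    Mainly add new ES field data types.

  • Scripting Plugins
    Mainly extend ES scripting so that scripts can be written in other languages.
    Notable examples (both were removed in Elasticsearch 6.0, so they apply only to older releases):
    JavaScript Language
    Python Language

  • Security Plugins
    Extend the ES security policy, for example by controlling access to APIs.
    Notable example: X-Pack

  • Snapshot/Restore Repository Plugins
    Extend the ES snapshot and restore features.

  • Store Plugins
    Extend how ES stores data; by default ES stores data through Lucene.
    Notable example:
    Store SMB (for Windows SMB shares)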

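2. Plugin development

I. Create the plugin descriptor file
The ES plugin descriptor file is named plugin-descriptor.properties and is placed in the src/main/resources/ directory. It supports the following properties:

Property               Description
name                   Plugin name
version                Plugin version
description            Description of what the plugin does
classname              Fully qualified name of the plugin entry class
java.version           JDK version
elasticsearch.version  Elasticsearch version

The file content is shown below. Every value is a placeholder that refers to a property of the Maven project and is filled in when the file is filtered during packaging: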

description=${project.description} 
version=${project.version} 
name=${elasticsearch.plugin.name} 
classname=${elasticsearch.plugin.classname} 
java.version=${maven.compiler.target} 
elasticsearch.version=${elasticsearch.version}
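
Because every value in the descriptor is a placeholder, a property that Maven cannot resolve may be left in the packaged file as a literal ${...} string. The following is a minimal sketch of a sanity check for the filtered file, using only the JDK. The class name DescriptorCheck and its method are hypothetical helpers written for this article; they are not part of Elasticsearch or Maven.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper (not an Elasticsearch API): sanity-checks a filtered plugin descriptor. */
final class DescriptorCheck {
    // The six properties listed in the table above.
    static final List<String> REQUIRED = Arrays.asList(
            "name", "version", "description", "classname", "java.version", "elasticsearch.version");

    /** Returns the keys that are missing, blank, or still contain an unresolved ${...} placeholder. */
    static List<String> problems(String descriptorText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptorText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> bad = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty() || value.contains("${")) {
                bad.add(key);
            }
        }
        return bad;
    }
}
```

Running DescriptorCheck.problems on the text of the descriptor taken from the built zip returns an empty list when all six values were filled in.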

II. pom.xml configuration, including the packaging setup

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
	<modelVersion>4.0.0</modelVersion> 
	<groupId>org.elasticsearch</groupId> 
	<artifactId>elasticsearch-plug-index</artifactId> 
	<version>0.0.1-SNAPSHOT</version> 
 
	<properties> 
		<elasticsearch.version>7.10.2</elasticsearch.version> 
		<maven.compiler.target>1.8</maven.compiler.target> 
		<elasticsearch.assembly.descriptor>${project.basedir}/src/main/assemblies/plugin.xml</elasticsearch.assembly.descriptor> 
		<elasticsearch.plugin.name>index-plug</elasticsearch.plugin.name> 
		<elasticsearch.plugin.classname>org.elasticsearch.plugin.IndexPlugin</elasticsearch.plugin.classname> 
		<elasticsearch.plugin.jvm>true</elasticsearch.plugin.jvm> 
	</properties> 
 
	<dependencies> 
		<dependency> 
			<groupId>org.elasticsearch.client</groupId> 
			<artifactId>transport</artifactId> 
			<version>7.10.2</version> 
			<exclusions> 
				<exclusion> 
					<groupId>org.elasticsearch</groupId> 
					<artifactId>elasticsearch</artifactId> 
				</exclusion> 
			</exclusions> 
		</dependency> 
		<dependency> 
			<groupId>org.elasticsearch.plugin</groupId> 
			<artifactId>transport-netty4-client</artifactId> 
			<version>7.10.2</version> 
			<exclusions> 
				<exclusion> 
					<groupId>org.elasticsearch</groupId> 
					<artifactId>elasticsearch</artifactId> 
				</exclusion> 
			</exclusions> 
		</dependency> 
		<dependency> 
			<groupId>org.elasticsearch</groupId> 
			<artifactId>elasticsearch</artifactId> 
			<version>${elasticsearch.version}</version> 
			<scope>compile</scope> 
		</dependency> 
 
 
		<dependency> 
			<groupId>org.apache.httpcomponents</groupId> 
			<artifactId>httpclient</artifactId> 
			<version>4.5.2</version> 
		</dependency> 
 
		<dependency> 
			<groupId>org.hamcrest</groupId> 
			<artifactId>hamcrest-core</artifactId> 
			<version>1.3</version> 
			<scope>test</scope> 
		</dependency> 
 
		<dependency> 
			<groupId>org.hamcrest</groupId> 
			<artifactId>hamcrest-library</artifactId> 
			<version>1.3</version> 
			<scope>test</scope> 
		</dependency> 
 
		<dependency> 
			<groupId>org.slf4j</groupId> 
			<artifactId>slf4j-api</artifactId> 
			<version>1.5.8</version> 
		</dependency> 
		 
		<dependency> 
			<groupId>commons-io</groupId> 
			<artifactId>commons-io</artifactId> 
			<version>2.6</version> 
		</dependency> 
		 
		<!-- MySQL JDBC driver -->
		<dependency> 
			<groupId>mysql</groupId> 
			<artifactId>mysql-connector-java</artifactId> 
			<version>8.0.13</version> 
		</dependency> 
	</dependencies> 
 
	<build> 
		<resources> 
			<resource> 
				<directory>src/main/resources</directory> 
				<filtering>false</filtering> 
				<excludes> 
					<exclude>*.properties</exclude> 
				</excludes> 
			</resource> 
		</resources> 
		<plugins> 
			<plugin> 
				<groupId>org.apache.maven.plugins</groupId> 
				<artifactId>maven-assembly-plugin</artifactId> 
				<version>2.6</version> 
				<configuration> 
					<appendAssemblyId>false</appendAssemblyId> 
					<outputDirectory>${project.build.directory}/releases/</outputDirectory> 
					<descriptors> 
						<descriptor>${elasticsearch.assembly.descriptor}</descriptor>
					</descriptors> 
				</configuration> 
				<executions> 
					<execution> 
						<phase>package</phase> 
						<goals> 
							<goal>single</goal> 
						</goals> 
					</execution> 
				</executions> 
			</plugin> 
			<plugin> 
				<groupId>org.apache.maven.plugins</groupId> 
				<artifactId>maven-compiler-plugin</artifactId> 
				<version>3.5.1</version> 
				<configuration> 
					<source>${maven.compiler.target}</source> 
					<target>${maven.compiler.target}</target> 
				</configuration> 
			</plugin> 
		</plugins> 
	</build> 
</project>

plugin.xml defines how the zip package is assembled:

<?xml version="1.0"?> 
<assembly> 
    <id>index-plugin</id> 
    <formats> 
        <format>zip</format> 
    </formats> 
    <includeBaseDirectory>false</includeBaseDirectory> 
    <fileSets> 
        <fileSet> 
            <directory>${project.basedir}/config</directory> 
            <outputDirectory>config</outputDirectory> 
        </fileSet> 
    </fileSets> 
 
    <files> 
        <file> 
            <source>${project.basedir}/src/main/resources/plugin-descriptor.properties</source> 
            <outputDirectory/> 
            <filtered>true</filtered> 
        </file> 
         <file> 
            <source>${project.basedir}/src/main/resources/application.properties</source> 
            <outputDirectory>/config</outputDirectory> 
            <filtered>true</filtered> 
        </file> 
         <file> 
            <source>${project.basedir}/src/main/resources/archinfo.json</source> 
             <outputDirectory>/config</outputDirectory> 
            <filtered>true</filtered> 
        </file> 
         <file> 
            <source>${project.basedir}/src/main/resources/engbaseinfo.json</source> 
             <outputDirectory>/config</outputDirectory> 
            <filtered>true</filtered> 
        </file> 
         <file> 
            <source>${project.basedir}/src/main/resources/fileinfo.json</source> 
            <outputDirectory>/config</outputDirectory> 
            <filtered>true</filtered> 
        </file> 
         <file> 
            <source>${project.basedir}/src/main/resources/giscertificate.json</source> 
            <outputDirectory>/config</outputDirectory> 
            <filtered>true</filtered> 
        </file> 
        <file> 
            <source>${project.basedir}/src/main/resources/unitinfo.json</source> 
            <outputDirectory>/config</outputDirectory> 
            <filtered>true</filtered> 
        </file> 
        <file> 
            <source>${project.basedir}/src/main/resources/unitinfo.sql</source> 
            <outputDirectory>/config</outputDirectory> 
            <filtered>true</filtered> 
        </file> 
         <file> 
            <source>${project.basedir}/src/main/resources/giscertificate.sql</source> 
            <outputDirectory>/config</outputDirectory> 
            <filtered>true</filtered> 
        </file> 
         <file> 
            <source>${project.basedir}/src/main/resources/engbaseinfo.sql</source> 
            <outputDirectory>/config</outputDirectory> 
            <filtered>true</filtered> 
        </file> 
         <file> 
            <source>${project.basedir}/src/main/resources/archinfo.sql</source> 
            <outputDirectory>/config</outputDirectory> 
            <filtered>true</filtered> 
        </file> 
    </files> 
    <dependencySets> 
        <dependencySet> 
            <outputDirectory/> 
            <useProjectArtifact>true</useProjectArtifact> 
            <useTransitiveFiltering>true</useTransitiveFiltering> 
            <excludes> 
                <exclude>org.elasticsearch:elasticsearch</exclude> 
            </excludes> 
        </dependencySet> 
    </dependencySets> 
</assembly> 
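
With this assembly, the descriptor and the jars end up at the root of the zip, and the JSON and SQL files end up under config/. Below is a minimal sketch, using only the JDK, of how the built zip could be checked for that layout. PluginZipCheck and its methods are hypothetical names invented for this example, and the rule it applies (descriptor plus at least one jar at the zip root) is only a rough approximation of what Elasticsearch 7 expects from a plugin package.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

/** Hypothetical helper (not an Elasticsearch API): inspects the layout of a plugin zip. */
final class PluginZipCheck {
    /** Lists the entry names of a zip read from the given stream. */
    static List<String> entryNames(InputStream zipStream) {
        List<String> names = new ArrayList<>();
        try (ZipInputStream zip = new ZipInputStream(zipStream)) {
            for (ZipEntry entry = zip.getNextEntry(); entry != null; entry = zip.getNextEntry()) {
                names.add(entry.getName());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }

    /** True when the descriptor and at least one jar sit at the root of the zip. */
    static boolean hasExpectedLayout(List<String> names) {
        boolean descriptorAtRoot = names.contains("plugin-descriptor.properties");
        boolean jarAtRoot = names.stream().anyMatch(n -> n.endsWith(".jar") && !n.contains("/"));
        return descriptorAtRoot && jarAtRoot;
    }
}
```

A typical call would pass a FileInputStream opened on the generated zip to entryNames and feed the result to hasExpectedLayout.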
 

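III. Plugin entry class
The entry class must extend the Plugin class and implement the interface that matches the plugin type. Here we implement the ActionPlugin interface to provide an API plugin, which means overriding the getRestHandlers method and registering the handler classes that contain the business logic.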

package org.elasticsearch.plugin; 
 
import java.util.ArrayList; 
import java.util.List; 
import java.util.function.Supplier; 
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; 
import org.elasticsearch.cluster.node.DiscoveryNodes; 
import org.elasticsearch.common.settings.ClusterSettings; 
import org.elasticsearch.common.settings.IndexScopedSettings; 
import org.elasticsearch.common.settings.Settings; 
import org.elasticsearch.common.settings.SettingsFilter; 
import org.elasticsearch.plugin.handler.ArchInfoDataPluginHandler; 
import org.elasticsearch.plugin.handler.EngBaseInfoDataPluginHandler; 
import org.elasticsearch.plugin.handler.FileInfoDataPluginHandler; 
import org.elasticsearch.plugin.handler.GisCertificateDataPluginHandler; 
import org.elasticsearch.plugin.handler.IndexPluginHandler; 
import org.elasticsearch.plugin.handler.UnitInfoDataPluginHandler; 
import org.elasticsearch.plugins.ActionPlugin; 
import org.elasticsearch.plugins.Plugin; 
import org.elasticsearch.rest.RestController; 
import org.elasticsearch.rest.RestHandler; 
 
public class IndexPlugin extends Plugin implements ActionPlugin {
	private static String actionProfix = "index";

	public IndexPlugin() {
		super();
		// Printed once at node startup, which makes it easy to confirm that the plugin was loaded
		System.out.println(actionProfix + " plugin instantiated......");
	}

	@Override
	public List<RestHandler> getRestHandlers(Settings settings, RestController restController,
			ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
			IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster) {
		List<RestHandler> list = new ArrayList<RestHandler>();
		// Data import for specific indices; the data source is MySQL 8
		list.add(new ArchInfoDataPluginHandler());
		list.add(new UnitInfoDataPluginHandler());
		list.add(new GisCertificateDataPluginHandler());
		list.add(new EngBaseInfoDataPluginHandler());
		list.add(new FileInfoDataPluginHandler());
		// Index initialization
		list.add(new IndexPluginHandler());
		return list;
	}
}

IV. Plugin handlers: the actual business logic

IndexPluginHandler: creates the indices.

package org.elasticsearch.plugin.handler; 
 
import static java.util.Arrays.asList; 
import static java.util.Collections.unmodifiableList; 
import static org.elasticsearch.rest.RestRequest.Method.GET; 
 
import java.io.File; 
import java.io.FileInputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.util.Arrays; 
import java.util.List; 
 
import org.apache.commons.io.Charsets; 
import org.apache.commons.io.FileUtils; 
import org.apache.commons.io.IOUtils; 
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; 
import org.elasticsearch.client.node.NodeClient; 
import org.elasticsearch.common.xcontent.XContentBuilder; 
import org.elasticsearch.common.xcontent.XContentType; 
import org.elasticsearch.rest.BaseRestHandler; 
import org.elasticsearch.rest.BytesRestResponse; 
import org.elasticsearch.rest.RestRequest; 
import org.elasticsearch.rest.RestStatus; 
import org.elasticsearch.util.ApplicationPropertiesHolder; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
 
import io.netty.util.internal.StringUtil; 
 
public class IndexPluginHandler extends BaseRestHandler {
	// Logger
	private static final Logger logger = LoggerFactory.getLogger(IndexPluginHandler.class);

	@Override
	public String getName() {
		// Human-readable handler name; Elasticsearch reports it in node usage statistics, so it should not be empty
		return "index_plugin";
	}

	@Override
	public List<Route> routes() {
		return unmodifiableList(asList(new Route(GET, "/_index_plugin")));
	}

	/** Reads <plugin dir>/config/<name>.json and returns its content, or null if the file cannot be read. */
	public String readLocalFile(String name) {
		// The plugin jar sits in the plugin directory, so the parent of the jar is the plugin directory
		String dir = new File(IndexPluginHandler.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParent();
		File file = new File(dir + File.separator + "config" + File.separator + name.concat(".json"));
		// try-with-resources closes the stream even when reading fails
		try (InputStream jsonInputStream = new FileInputStream(file)) {
			return IOUtils.toString(jsonInputStream, Charsets.toCharset("utf-8"));
		} catch (IOException e) {
			logger.error("failed to read " + file, e);
		}

		return null;
	}

	@Override
	protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
		// Read the comma-separated list of index names from the plugin configuration
		ApplicationPropertiesHolder properties = new ApplicationPropertiesHolder();
		String indexs = properties.getProperty("indexs");

		if (!StringUtil.isNullOrEmpty(indexs)) {
			Arrays.asList(indexs.split(",")).stream().forEach(item -> {
				String mapping = this.readLocalFile(item);
				logger.debug("mapping for {}: {}", item, mapping);
				if (!StringUtil.isNullOrEmpty(mapping)) {
					// Create-index request for the index named by item
					CreateIndexRequest index = new CreateIndexRequest(item);

					// The JSON file supplies the index definition
					index.source(mapping, XContentType.JSON);

					// Send the request; actionGet() blocks the calling thread until the index has been created
					client.admin().indices().create(index).actionGet();
				}
			});
		}

		// Response body: a status message only
		return channel -> {
			XContentBuilder builder = channel.newBuilder();
			builder.startObject();
			builder.field("message", "Index initialization succeeded");
			builder.endObject();
			channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
		};
	}

}
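
The handler locates its files relative to the plugin jar: the jar's parent directory is the plugin directory, and the files live in its config subdirectory. The following is a minimal JDK-only sketch of that lookup. PluginConfigFiles is a hypothetical helper, not code from the article's project. It converts the code-source location through toURI(), on the assumption that this avoids the URL-encoded characters (for example %20 for a space) that URL.getPath() can return.

```java
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

/** Hypothetical helper: resolves and reads files from <plugin dir>/config. */
final class PluginConfigFiles {
    /** Directory that contains the jar (or class directory) the given class was loaded from. */
    static Path pluginDirOf(Class<?> anchor) {
        try {
            // Assumes the class has a code source, which holds for classes loaded from a jar
            return Paths.get(anchor.getProtectionDomain().getCodeSource().getLocation().toURI()).getParent();
        } catch (URISyntaxException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Builds <pluginDir>/config/<name><extension>, for example config/archinfo.json. */
    static Path resolve(Path pluginDir, String name, String extension) {
        return pluginDir.resolve("config").resolve(name + extension);
    }

    /** Reads the file as UTF-8; returns an empty Optional when it is missing or unreadable. */
    static Optional<String> read(Path pluginDir, String name, String extension) {
        Path file = resolve(pluginDir, name, extension);
        if (!Files.isRegularFile(file)) {
            return Optional.empty();
        }
        try {
            return Optional.of(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        } catch (IOException e) {
            return Optional.empty();
        }
    }
}
```

Returning an Optional instead of null makes the "file not found" case explicit at the call site, which is the case the handler guards against before it builds a CreateIndexRequest.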

ArchInfoDataPluginHandler: imports data from MySQL 8 into a specific index.

package org.elasticsearch.plugin.handler; 
 
import static java.util.Arrays.asList; 
import static java.util.Collections.unmodifiableList; 
import static org.elasticsearch.rest.RestRequest.Method.GET; 
 
import java.io.File; 
import java.io.FileInputStream; 
import java.io.IOException; 
import java.io.InputStream; 
import java.sql.Connection; 
import java.sql.PreparedStatement; 
import java.sql.ResultSet; 
import java.sql.ResultSetMetaData; 
import java.text.DateFormat; 
import java.text.SimpleDateFormat; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.concurrent.TimeUnit; 
 
import org.apache.commons.io.Charsets; 
import org.apache.commons.io.IOUtils; 
import org.apache.commons.logging.Log; 
import org.apache.commons.logging.LogFactory; 
import org.elasticsearch.action.bulk.BackoffPolicy; 
import org.elasticsearch.action.bulk.BulkProcessor; 
import org.elasticsearch.action.bulk.BulkRequest; 
import org.elasticsearch.action.bulk.BulkResponse; 
import org.elasticsearch.action.index.IndexRequest; 
import org.elasticsearch.client.node.NodeClient; 
import org.elasticsearch.common.unit.ByteSizeUnit; 
import org.elasticsearch.common.unit.ByteSizeValue; 
import org.elasticsearch.common.unit.TimeValue; 
import org.elasticsearch.common.xcontent.XContentBuilder; 
import org.elasticsearch.rest.BaseRestHandler; 
import org.elasticsearch.rest.BytesRestResponse; 
import org.elasticsearch.rest.RestRequest; 
import org.elasticsearch.rest.RestStatus; 
import org.elasticsearch.util.ApplicationPropertiesHolder; 
import org.elasticsearch.util.DBUtil; 
 
import com.mysql.cj.util.StringUtils; 
 
public class ArchInfoDataPluginHandler extends BaseRestHandler { 
	/** The logger. */
	private static final Log logger = LogFactory.getLog(ArchInfoDataPluginHandler.class);

	@Override
	public String getName() {
		return "archinfo";
	}

	@Override
	public List<Route> routes() {
		return unmodifiableList(asList(new Route(GET, "/archinfo_data_plugin")));
	}

	/** Reads <plugin dir>/config/<name>.sql and returns its content, or null if the file cannot be read. */
	public String readLocalSQL(String name) {
		// The plugin jar sits in the plugin directory, so the parent of the jar is the plugin directory
		String dir = new File(ArchInfoDataPluginHandler.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParent();
		File file = new File(dir + File.separator + "config" + File.separator + name.concat(".sql"));
		// try-with-resources closes the stream even when reading fails
		try (InputStream sqlInputStream = new FileInputStream(file)) {
			return IOUtils.toString(sqlInputStream, Charsets.toCharset("utf-8"));
		} catch (IOException e) {
			logger.error("failed to read " + file, e);
		}

		return null;
	}
 
	@Override
	protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
		BulkProcessor bulkProcessor = getBulkProcessor(client);
		// Connect to the database, run the query, and index the rows into Elasticsearch
		Connection conn = null;
		PreparedStatement ps = null;
		ResultSet rs = null;
		boolean succeeded = false;

		try {
			conn = DBUtil.getConn(
					ApplicationPropertiesHolder.getProperty("elasticsearch.datasource.driver-class-name"),
					ApplicationPropertiesHolder.getProperty("elasticsearch.datasource.url"),
					ApplicationPropertiesHolder.getProperty("elasticsearch.datasource.username"),
					ApplicationPropertiesHolder.getProperty("elasticsearch.datasource.password"));

			String sql = this.readLocalSQL(ApplicationPropertiesHolder.getProperty("index.archinfo"));

			// An empty or unreadable SQL file leaves succeeded == false and yields the failure response below
			if (!StringUtils.isNullOrEmpty(sql)) {
				ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
				// With MySQL Connector/J, Integer.MIN_VALUE streams rows one by one instead of buffering the whole result set
				ps.setFetchSize(Integer.MIN_VALUE);
				rs = ps.executeQuery();

				ResultSetMetaData colData = rs.getMetaData();
				ArrayList<HashMap<String, Object>> dataList = new ArrayList<HashMap<String, Object>>();
				// SimpleDateFormat is not thread-safe, so it is kept local to this request
				DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
				int count = 0;
				while (rs.next()) {
					count++;
					HashMap<String, Object> map = new HashMap<String, Object>(100);
					for (int i = 1; i <= colData.getColumnCount(); i++) {
						// Use the column label so that SQL aliases become the field names
						String c = colData.getColumnLabel(i);
						Object v = rs.getObject(c);
						if (v instanceof java.sql.Timestamp) {
							// Convert java.sql.Timestamp to a yyyy-MM-dd date string for ES
							map.put(c, dateFormat.format(v));
						} else {
							map.put(c, v);
						}
					}
					dataList.add(map);
					// Hand rows over in batches of 200,000; the remainder is submitted after the loop
					if (count % 200000 == 0) {
						logger.info("Mysql handle data number : " + count);
						for (HashMap<String, Object> row : dataList) {
							bulkProcessor.add(new IndexRequest("archinfo").source(row));
						}
						// Clear the list after every hand-over
						dataList.clear();
					}
				}

				// Submit the rows left over from the last incomplete batch
				for (HashMap<String, Object> row : dataList) {
					bulkProcessor.add(new IndexRequest("archinfo").source(row));
				}

				logger.info("-------------------------- Finally insert number total : " + count);
				// Send whatever is still buffered; awaitClose in the finally block waits for in-flight bulk requests
				bulkProcessor.flush();
				succeeded = true;
			}
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		} finally {
			// Close each resource on its own so that a null or a failing close cannot leak the others
			closeQuietly(rs);
			closeQuietly(ps);
			closeQuietly(conn);
			try {
				boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
				logger.info("bulk processor terminated: " + terminatedFlag);
			} catch (Exception e) {
				logger.error(e.getMessage(), e);
			}
			// The NodeClient belongs to the node, so the plugin does not close it
		}

		// Response body: a status message only
		final boolean ok = succeeded;
		return channel -> {
			XContentBuilder builder = channel.newBuilder();
			builder.startObject();
			builder.field("message", ok ? "data initialization succeeded" : "data initialization failed");
			builder.endObject();
			channel.sendResponse(new BytesRestResponse(ok ? RestStatus.OK : RestStatus.INTERNAL_SERVER_ERROR, builder));
		};
	}

	/** Closes a JDBC resource, ignoring null and logging (not rethrowing) any failure. */
	private static void closeQuietly(AutoCloseable resource) {
		if (resource == null) {
			return;
		}
		try {
			resource.close();
		} catch (Exception e) {
			logger.error(e.getMessage(), e);
		}
	}
	 
	private static BulkProcessor getBulkProcessor(NodeClient client) {
		BulkProcessor bulkProcessor = null;
		try {

			BulkProcessor.Listener listener = new BulkProcessor.Listener() {
				@Override
				public void beforeBulk(long executionId, BulkRequest request) {
					logger.info("Try to insert data number : " + request.numberOfActions());
				}

				@Override
				public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
					// A bulk call can complete while individual items fail, so check before reporting success
					if (response.hasFailures()) {
						logger.error("Bulk has item failures, id: " + executionId + " : " + response.buildFailureMessage());
					} else {
						logger.info("************** Success insert data number : " + request.numberOfActions() + " , id: "
								+ executionId);
					}
				}

				@Override
				public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
					logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
				}

			};


			BulkProcessor.Builder builder = BulkProcessor.builder(client, listener);
			builder.setBulkActions(10000);
			builder.setBulkSize(new ByteSizeValue(300L, ByteSizeUnit.MB));
			builder.setConcurrentRequests(10);
			builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
			builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
			// Pitfall: build() has to be called after the setters. The official sample I followed did not show this step
			// and I overlooked it at first; only while debugging did I notice that the settings above had not taken effect
			bulkProcessor = builder.build();

		} catch (Exception e) {
			// bulkProcessor is still null here, so there is nothing to close; log the failure and return null
			logger.error(e.getMessage(), e);
		}
		return bulkProcessor;
	}
 
} 
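
The handler turns every java.sql.Timestamp column into a yyyy-MM-dd string before indexing. A minimal sketch of the same conversion with java.time follows, offered as an alternative to SimpleDateFormat and not as the code the plugin uses. RowValues is a hypothetical class name. The sketch relies on DateTimeFormatter being immutable and thread-safe, so a single shared instance can serve every row.

```java
import java.sql.Timestamp;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: normalizes JDBC column values before they are indexed. */
final class RowValues {
    // ISO_LOCAL_DATE formats as yyyy-MM-dd, the same pattern the handler uses
    private static final DateTimeFormatter DAY = DateTimeFormatter.ISO_LOCAL_DATE;

    /** Converts Timestamp values to a yyyy-MM-dd string; every other value (including null) passes through. */
    static Object normalize(Object value) {
        if (value instanceof Timestamp) {
            return ((Timestamp) value).toLocalDateTime().toLocalDate().format(DAY);
        }
        return value;
    }
}
```

Inside the row loop this would replace the instanceof branch with a single map.put(c, RowValues.normalize(v)) call. Like the original code, it drops the time of day, so a mapping that needs the full timestamp would require a different pattern.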

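Once development is finished, run the Maven package phase. The zip file is generated under target/releases/, the outputDirectory configured for the assembly plugin above. This zip is the plugin package that the installation steps below use.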

1. Unzip the package into the plugins directory of Elasticsearch 7 (each plugin lives in its own subdirectory there):

2. Start the Elasticsearch 7 service

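Note: the output highlighted in red in the screenshot is produced by the following code in the IndexPlugin constructor:

	public IndexPlugin() { 
		super(); 
		System.out.println(actionProfix + " plugin instantiated......"); 
	}

Seeing this output confirms that Elasticsearch 7 has loaded the plugin.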

Use Postman to send the index-initialization and data-import requests to the Elasticsearch 7 service.
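
The same requests can also be sent from code instead of Postman. The following is a minimal sketch using the java.net.http client that ships with JDK 11 and later. The base URL http://localhost:9200 is an assumption (the article does not state the node address), and PluginEndpoints is a hypothetical class name. The two routes are the ones registered by IndexPluginHandler and ArchInfoDataPluginHandler.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical client-side helper for calling the plugin's REST routes. */
final class PluginEndpoints {
    /** Builds a GET request for a plugin route, e.g. "/_index_plugin" or "/archinfo_data_plugin". */
    static HttpRequest get(String baseUrl, String route) {
        return HttpRequest.newBuilder(URI.create(baseUrl + route)).GET().build();
    }

    /** Sends the request and returns "<status code> <body>". */
    static String call(HttpClient http, String baseUrl, String route) throws IOException, InterruptedException {
        HttpResponse<String> response = http.send(get(baseUrl, route), HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }
}
```

For example, PluginEndpoints.call(HttpClient.newHttpClient(), "http://localhost:9200", "/_index_plugin") would trigger index initialization, and the same call with "/archinfo_data_plugin" would start the archinfo data import. Index initialization should be called first so that the indices exist before data is loaded.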

 

The request paths above come from the routes configured in the custom handlers:

Route defined by IndexPluginHandler:

	@Override 
	public List<Route> routes() { 
		// TODO Auto-generated method stub 
		return unmodifiableList(asList(new Route(GET, "/_index_plugin"))); 
	}

Route defined by ArchInfoDataPluginHandler:

	@Override 
	public List<Route> routes() { 
		// TODO Auto-generated method stub 
		return unmodifiableList(asList(new Route(GET, "/archinfo_data_plugin"))); 
	}

 
