Elasticsearch 7 plugin development: creating indices and initializing index data
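1. Plugin categories
- API Extension Plugins
  Add new APIs or features to Elasticsearch, usually related to search or mapping.
  Notable example:
  SQL language Plugin: lets Elasticsearch be queried with SQL statements (by NLPchina)
- Alerting Plugins
  Monitor Elasticsearch indices and trigger an alert when a threshold is exceeded.
  Notable example:
  X-Pack
- Analysis Plugins
  Extend the analysis rules used for indexing, for example with tokenizer (word segmentation) plugins.
  Notable examples:
  IK Analysis Plugin: Chinese tokenization
  Japanese (Kuromoji) Analysis plugin: Japanese tokenization
  Pinyin Analysis Plugin: Pinyin tokenization
- Discovery Plugins
  Extend Elasticsearch with new discovery mechanisms that can be used instead of Zen Discovery. In Elasticsearch 7, Zen Discovery was replaced by a new cluster coordination layer, and discovery plugins supply seed hosts providers to it.
- Ingest Plugins
  Add capabilities to ingest nodes.
  Notable example:
  Ingest Attachment Processor Plugin: lets ingest nodes extract content from attachment files (PDF, Word and similar formats, via Apache Tika)
- Management Plugins
  Manage the Elasticsearch cluster.
  Notable example: X-Pack
- Mapper Plugins
  Mainly used to add new Elasticsearch field data types.
- Scripting Plugins
  Extend Elasticsearch scripting so that other script languages can be used.
  Notable examples (both were removed in Elasticsearch 6.0, so they are not available for Elasticsearch 7):
  JavaScript Language
  Python Language
- Security Plugins
  Extend Elasticsearch security, for example by controlling access to APIs.
  Notable example: X-Pack
- Snapshot/Restore Repository Plugins
  Extend Elasticsearch snapshot and restore.
- Store Plugins
  Extend how Elasticsearch stores data; by default Elasticsearch stores data with Lucene.
  Notable example:
  Store SMB: Windows SMB support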
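2. Plugin development
I. Create the plugin descriptor file
The Elasticsearch plugin descriptor file is named plugin-descriptor.properties and is placed in the src/main/resources/ directory. It contains the following properties: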
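Property | Description |
---|---|
name | Plugin name |
version | Plugin version |
description | Description of what the plugin does |
classname | Fully qualified name of the plugin entry class |
java.version | JDK version the plugin is built for |
elasticsearch.version | Elasticsearch version the plugin is built for; it must match the version of the node it is installed on |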
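The content is below. Every value references a Maven project property: the resources section of the pom keeps *.properties out of the jar, and the assembly (plugin.xml) copies this file to the root of the zip with filtering enabled, which substitutes the values.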
description=${project.description}
version=${project.version}
name=${elasticsearch.plugin.name}
classname=${elasticsearch.plugin.classname}
java.version=${maven.compiler.target}
elasticsearch.version=${elasticsearch.version}
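Because these values are only substituted during packaging, an unfiltered or incomplete descriptor is an easy mistake to miss. The following is a minimal sketch of a check that could be run against the packaged file; `DescriptorCheck` is a hypothetical helper, not an Elasticsearch API. It reports any of the six properties from the table that are missing and any value that still contains an unresolved `${...}` placeholder.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of Elasticsearch): sanity-checks a filtered plugin-descriptor.properties.
final class DescriptorCheck {

    // The six properties listed in the descriptor table.
    static final List<String> REQUIRED = Arrays.asList(
            "name", "version", "description", "classname",
            "java.version", "elasticsearch.version");

    // Returns one message per missing key or unresolved placeholder; an empty list means it looks complete.
    static List<String> problems(String descriptorText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptorText));
        } catch (IOException e) {
            throw new UncheckedIOException(e); // not expected for an in-memory StringReader
        }
        List<String> problems = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                problems.add("missing: " + key);
            } else if (value.contains("${")) {
                // Maven filtering did not replace this placeholder
                problems.add("unresolved: " + key + "=" + value);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String filtered = "name=index-plug\nversion=0.0.1-SNAPSHOT\ndescription=index plugin\n"
                + "classname=org.elasticsearch.plugin.IndexPlugin\njava.version=1.8\n"
                + "elasticsearch.version=7.10.2\n";
        System.out.println(problems(filtered)); // []
        // An unfiltered copy still contains the Maven placeholders
        System.out.println(problems("name=${elasticsearch.plugin.name}"));
    }
}
```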
II. pom.xml configuration, including the packaging configuration
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-plug-index</artifactId>
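<version>0.0.1-SNAPSHOT</version>
<!-- referenced as ${project.description} in plugin-descriptor.properties -->
<description>Creates indices and imports index data from MySQL</description>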
<properties>
<elasticsearch.version>7.10.2</elasticsearch.version>
<maven.compiler.target>1.8</maven.compiler.target>
<elasticsearch.assembly.descriptor>${project.basedir}/src/main/assemblies/plugin.xml</elasticsearch.assembly.descriptor>
<elasticsearch.plugin.name>index-plug</elasticsearch.plugin.name>
<elasticsearch.plugin.classname>org.elasticsearch.plugin.IndexPlugin</elasticsearch.plugin.classname>
<elasticsearch.plugin.jvm>true</elasticsearch.plugin.jvm>
</properties>
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>7.10.2</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
<version>7.10.2</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.2</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.5.8</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<!-- MySQL driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.13</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
<excludes>
<exclude>*.properties</exclude>
</excludes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.6</version>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<outputDirectory>${project.build.directory}/releases/</outputDirectory>
<descriptors>
<descriptor>${elasticsearch.assembly.descriptor}</descriptor>
</descriptors>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>${maven.compiler.target}</source>
<target>${maven.compiler.target}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
plugin.xml (src/main/assemblies/plugin.xml)
This file holds the zip packaging configuration:
<?xml version="1.0"?>
<assembly>
<id>index-plugin</id>
<formats>
<format>zip</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>${project.basedir}/config</directory>
<outputDirectory>config</outputDirectory>
</fileSet>
</fileSets>
<files>
<file>
<source>${project.basedir}/src/main/resources/plugin-descriptor.properties</source>
<outputDirectory/>
<filtered>true</filtered>
</file>
<file>
<source>${project.basedir}/src/main/resources/application.properties</source>
<outputDirectory>/config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${project.basedir}/src/main/resources/archinfo.json</source>
<outputDirectory>/config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${project.basedir}/src/main/resources/engbaseinfo.json</source>
<outputDirectory>/config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${project.basedir}/src/main/resources/fileinfo.json</source>
<outputDirectory>/config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${project.basedir}/src/main/resources/giscertificate.json</source>
<outputDirectory>/config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${project.basedir}/src/main/resources/unitinfo.json</source>
<outputDirectory>/config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${project.basedir}/src/main/resources/unitinfo.sql</source>
<outputDirectory>/config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${project.basedir}/src/main/resources/giscertificate.sql</source>
<outputDirectory>/config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${project.basedir}/src/main/resources/engbaseinfo.sql</source>
<outputDirectory>/config</outputDirectory>
<filtered>true</filtered>
</file>
<file>
<source>${project.basedir}/src/main/resources/archinfo.sql</source>
<outputDirectory>/config</outputDirectory>
<filtered>true</filtered>
</file>
</files>
<dependencySets>
<dependencySet>
<outputDirectory/>
<useProjectArtifact>true</useProjectArtifact>
<useTransitiveFiltering>true</useTransitiveFiltering>
<excludes>
<exclude>org.elasticsearch:elasticsearch</exclude>
</excludes>
</dependencySet>
</dependencySets>
</assembly>
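Elasticsearch expects each plugin in its own directory under `plugins/`, with `plugin-descriptor.properties` at the top of that directory. Since `includeBaseDirectory` is false, the descriptor should therefore sit at the root of the zip and the JSON/SQL files under `config/`. A minimal sketch for verifying that layout with `java.util.zip`, assuming the zip path is passed as the first argument (`PluginZipCheck` is a hypothetical name):

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical helper: inspects the plugin zip produced by `mvn package`.
final class PluginZipCheck {

    // Lists every entry name in the zip.
    static List<String> entries(File zip) throws IOException {
        List<String> names = new ArrayList<>();
        try (ZipFile zf = new ZipFile(zip)) {
            for (ZipEntry e : Collections.list(zf.entries())) {
                names.add(e.getName());
            }
        }
        return names;
    }

    // True if the descriptor is at the zip root rather than nested in a folder.
    static boolean descriptorAtRoot(List<String> names) {
        return names.contains("plugin-descriptor.properties");
    }

    // Files (not directory entries) that the assembly placed under config/.
    static List<String> configFiles(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String n : names) {
            if (n.startsWith("config/") && !n.endsWith("/")) {
                result.add(n);
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<String> names = entries(new File(args[0]));
        System.out.println("descriptor at root: " + descriptorAtRoot(names));
        System.out.println("config files: " + configFiles(names));
    }
}
```

With the settings above, the file should be `target/releases/elasticsearch-plug-index-0.0.1-SNAPSHOT.zip`, because `appendAssemblyId` is false.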
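III. Plugin entry point
The plugin entry class extends Plugin and implements the interface for the relevant plugin type. Here we implement ActionPlugin to provide REST APIs: override getRestHandlers and register the handler classes that contain our business logic.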
package org.elasticsearch.plugin;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.IndexScopedSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.plugin.handler.ArchInfoDataPluginHandler;
import org.elasticsearch.plugin.handler.EngBaseInfoDataPluginHandler;
import org.elasticsearch.plugin.handler.FileInfoDataPluginHandler;
import org.elasticsearch.plugin.handler.GisCertificateDataPluginHandler;
import org.elasticsearch.plugin.handler.IndexPluginHandler;
import org.elasticsearch.plugin.handler.UnitInfoDataPluginHandler;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestHandler;
public class IndexPlugin extends Plugin implements ActionPlugin {
private static String actionProfix = "index";
public IndexPlugin() {
super();
System.out.println(actionProfix + " plugin instantiated......");
}
@Override
public List<RestHandler> getRestHandlers(Settings settings, RestController restController,
ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster) {
List<RestHandler> list = new ArrayList<RestHandler>();
// Handlers that import data into specific indices; data source: MySQL 8
list.add(new ArchInfoDataPluginHandler());
list.add(new UnitInfoDataPluginHandler());
list.add(new GisCertificateDataPluginHandler());
list.add(new EngBaseInfoDataPluginHandler());
list.add(new FileInfoDataPluginHandler());
// Index initialization (creates the indices)
list.add(new IndexPluginHandler());
return list;
}
}
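Each handler registered here declares its own method and path through `routes()`, and Elasticsearch's `RestController` dispatches an incoming request to the handler whose route matches. The plain-Java model below only illustrates that lookup; `RouteTable` and its `Handler` interface are hypothetical stand-ins, not Elasticsearch types, and the real controller also handles path parameters, error responses and more. Rejecting duplicate routes is a choice made in this model.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of looking up REST handlers by method and path.
final class RouteTable {

    // Stand-in for a REST handler: a name plus the routes it serves.
    interface Handler {
        String name();
        List<String> routes(); // entries such as "GET /_index_plugin"
    }

    private final Map<String, Handler> byRoute = new HashMap<>();

    // Mirrors getRestHandlers(): every handler in the list has its routes registered.
    void registerAll(List<Handler> handlers) {
        for (Handler h : handlers) {
            for (String route : h.routes()) {
                if (byRoute.putIfAbsent(route, h) != null) {
                    throw new IllegalArgumentException("duplicate route: " + route);
                }
            }
        }
    }

    // Returns the name of the handler for "METHOD path", or null when no route matches.
    String dispatch(String method, String path) {
        Handler h = byRoute.get(method + " " + path);
        return h == null ? null : h.name();
    }

    static Handler handler(final String name, final String... routes) {
        return new Handler() {
            public String name() { return name; }
            public List<String> routes() { return Arrays.asList(routes); }
        };
    }

    public static void main(String[] args) {
        RouteTable table = new RouteTable();
        table.registerAll(Arrays.asList(
                handler("index_plugin_handler", "GET /_index_plugin"),
                handler("archinfo", "GET /archinfo_data_plugin")));
        System.out.println(table.dispatch("GET", "/archinfo_data_plugin")); // archinfo
        System.out.println(table.dispatch("POST", "/_index_plugin"));       // null
    }
}
```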
IV. Plugin handlers, which contain the business logic
IndexPluginHandler: creates the indices.
package org.elasticsearch.plugin.handler;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.util.ApplicationPropertiesHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexPluginHandler extends BaseRestHandler {
    // Logger
    private static final Logger logger = LoggerFactory.getLogger(IndexPluginHandler.class);
    @Override
    public String getName() {
        // Handler names should be non-empty and unique
        return "index_plugin_handler";
    }
    @Override
    public List<Route> routes() {
        return unmodifiableList(asList(new Route(GET, "/_index_plugin")));
    }
    // Reads <plugin dir>/config/<name>.json, where the assembly puts the mapping files
    public String readLocalFile(String name) {
        String dir = new File(IndexPluginHandler.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParent();
        File file = new File(dir + File.separator + "config" + File.separator + name.concat(".json"));
        // try-with-resources closes the stream even if reading fails
        try (InputStream jsonInputStream = new FileInputStream(file)) {
            return IOUtils.toString(jsonInputStream, StandardCharsets.UTF_8);
        } catch (IOException e) {
            logger.error("failed to read mapping file " + file, e);
        }
        return null;
    }
    @Override
    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
        // Read the comma-separated index names from the configuration file
        String indexs = ApplicationPropertiesHolder.getProperty("indexs");
        // Result per index, reported back in the response
        Map<String, String> results = new LinkedHashMap<>();
        if (indexs != null && !indexs.trim().isEmpty()) {
            for (String item : indexs.split(",")) {
                String name = item.trim();
                if (name.isEmpty()) {
                    continue;
                }
                String mapping = this.readLocalFile(name);
                if (mapping == null || mapping.isEmpty()) {
                    results.put(name, "mapping file not found");
                    continue;
                }
                try {
                    // Create-index request for the index named "name"
                    CreateIndexRequest index = new CreateIndexRequest(name);
                    // Settings and mappings come from <name>.json
                    index.source(mapping, XContentType.JSON);
                    // Execute and wait for the result; fails if the index already exists
                    client.admin().indices().create(index).actionGet();
                    results.put(name, "created");
                } catch (Exception e) {
                    logger.error("failed to create index " + name, e);
                    results.put(name, "failed: " + e.getMessage());
                }
            }
        }
        // Response: a message plus the result for each index
        return channel -> {
            XContentBuilder builder = channel.newBuilder();
            builder.startObject();
            builder.field("message", "Index initialization finished");
            builder.startObject("indices");
            for (Map.Entry<String, String> entry : results.entrySet()) {
                builder.field(entry.getKey(), entry.getValue());
            }
            builder.endObject();
            builder.endObject();
            channel.sendResponse(new BytesRestResponse(RestStatus.OK, builder));
        };
    }
}
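`IndexPluginHandler` reads the comma-separated `indexs` property and, for each name, loads `<name>.json` from the `config` directory next to the plugin jar; that JSON is passed to `CreateIndexRequest.source(...)`, so it holds the index settings and mappings. The name-to-file step can be tested on its own. A minimal sketch, where `MappingFiles` is a hypothetical helper and the property value `archinfo, unitinfo,` is an assumed example (the article does not show application.properties):

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns the "indexs" property into index names and mapping file paths.
final class MappingFiles {

    // Splits e.g. "archinfo, unitinfo," into [archinfo, unitinfo], ignoring blank entries.
    static List<String> indexNames(String indexs) {
        List<String> names = new ArrayList<>();
        if (indexs == null) {
            return names;
        }
        for (String part : indexs.split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // <pluginDir>/config/<index>.json, where the assembly copies the mapping files.
    static File mappingFile(File pluginDir, String index) {
        return new File(new File(pluginDir, "config"), index + ".json");
    }

    public static void main(String[] args) {
        File pluginDir = new File("plugins/index-plug"); // assumed install directory
        for (String index : indexNames("archinfo, unitinfo,")) {
            System.out.println(index + " -> " + mappingFile(pluginDir, index).getPath());
        }
    }
}
```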
ArchInfoDataPluginHandler: imports data into a specific index (archinfo) from MySQL 8.
package org.elasticsearch.plugin.handler;
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.rest.RestRequest.Method.GET;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.util.ApplicationPropertiesHolder;
import org.elasticsearch.util.DBUtil;
public class ArchInfoDataPluginHandler extends BaseRestHandler {
    /** The logger. */
    private static final Log logger = LogFactory.getLog(ArchInfoDataPluginHandler.class);
    @Override
    public String getName() {
        return "archinfo";
    }
    @Override
    public List<Route> routes() {
        return unmodifiableList(asList(new Route(GET, "/archinfo_data_plugin")));
    }
    // Reads <plugin dir>/config/<name>.sql, the query whose rows become documents
    public String readLocalSQL(String name) {
        String dir = new File(ArchInfoDataPluginHandler.class.getProtectionDomain().getCodeSource().getLocation().getPath()).getParent();
        File file = new File(dir + File.separator + "config" + File.separator + name.concat(".sql"));
        // try-with-resources closes the stream even if reading fails
        try (InputStream sqlInputStream = new FileInputStream(file)) {
            return IOUtils.toString(sqlInputStream, StandardCharsets.UTF_8);
        } catch (IOException e) {
            logger.error("failed to read SQL file " + file, e);
        }
        return null;
    }
    @Override
    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
        String sql = this.readLocalSQL(ApplicationPropertiesHolder.getProperty("index.archinfo"));
        if (sql == null || sql.trim().isEmpty()) {
            // No query to run: report the failure before opening any connection
            return channel -> {
                XContentBuilder builder = channel.newBuilder();
                builder.startObject();
                builder.field("message", "data initialization failed");
                builder.endObject();
                channel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, builder));
            };
        }
        BulkProcessor bulkProcessor = getBulkProcessor(client);
        // Timestamps are indexed as yyyy-MM-dd; one formatter per request (SimpleDateFormat is not thread-safe)
        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        boolean succeeded = false;
        int count = 0;
        // Connect to the database, query the data and index every row; resources are closed automatically
        try (Connection conn = DBUtil.getConn(
                ApplicationPropertiesHolder.getProperty("elasticsearch.datasource.driver-class-name"),
                ApplicationPropertiesHolder.getProperty("elasticsearch.datasource.url"),
                ApplicationPropertiesHolder.getProperty("elasticsearch.datasource.username"),
                ApplicationPropertiesHolder.getProperty("elasticsearch.datasource.password"));
             PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
            // With MySQL Connector/J, Integer.MIN_VALUE streams rows one by one instead of loading the whole result
            ps.setFetchSize(Integer.MIN_VALUE);
            try (ResultSet rs = ps.executeQuery()) {
                ResultSetMetaData colData = rs.getMetaData();
                int columnCount = colData.getColumnCount();
                while (rs.next()) {
                    Map<String, Object> row = new HashMap<>(columnCount * 2);
                    for (int i = 1; i <= columnCount; i++) {
                        Object v = rs.getObject(i);
                        // Convert java.sql.Timestamp into the date format used by the ES mapping
                        if (v instanceof java.sql.Timestamp) {
                            v = dateFormat.format(v);
                        }
                        row.put(colData.getColumnLabel(i), v);
                    }
                    // BulkProcessor does the batching (see getBulkProcessor), so each row is added directly
                    bulkProcessor.add(new IndexRequest("archinfo").source(row));
                    if (++count % 200000 == 0) {
                        logger.info("Mysql handle data number : " + count);
                    }
                }
            }
            logger.info("Finally insert number total : " + count);
            // Send buffered requests now; responses arrive asynchronously in the listener
            bulkProcessor.flush();
            succeeded = true;
        } catch (Exception e) {
            logger.error("data initialization failed", e);
        } finally {
            try {
                // Wait up to 150 s for in-flight bulk requests; this blocks the calling thread
                boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
                logger.info("BulkProcessor closed: " + terminatedFlag);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // The NodeClient belongs to the node, so it is not closed here
        }
        final boolean ok = succeeded;
        return channel -> {
            XContentBuilder builder = channel.newBuilder();
            builder.startObject();
            builder.field("message", ok ? "data initialization succeeded" : "data initialization failed");
            builder.endObject();
            channel.sendResponse(new BytesRestResponse(ok ? RestStatus.OK : RestStatus.INTERNAL_SERVER_ERROR, builder));
        };
    }
    private static BulkProcessor getBulkProcessor(NodeClient client) {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                logger.info("Try to insert data number : " + request.numberOfActions());
            }
            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // A bulk call can succeed as a whole while individual documents fail
                if (response.hasFailures()) {
                    logger.error("Bulk " + executionId + " has failures: " + response.buildFailureMessage());
                } else {
                    logger.info("Success insert data number : " + request.numberOfActions() + " , id: " + executionId);
                }
            }
            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                logger.error("Bulk is unsuccessful, executionId: " + executionId, failure);
            }
        };
        BulkProcessor.Builder builder = BulkProcessor.builder(client, listener);
        // Flush after 10,000 requests, or 300 MB of data, or every 100 s, whichever comes first
        builder.setBulkActions(10000);
        builder.setBulkSize(new ByteSizeValue(300L, ByteSizeUnit.MB));
        // Allow up to 10 bulk requests in flight at once
        builder.setConcurrentRequests(10);
        builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
        // Retry rejected bulks up to 3 times, 1 s apart
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        // Pitfall: the official sample did not make this step obvious and I overlooked it at first; only while
        // debugging did I notice that the settings applied to builder above had not taken effect
        return builder.build();
    }
}
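The heart of the import loop turns one JDBC row into a document map, rendering `java.sql.Timestamp` values as `yyyy-MM-dd` strings so they fit a date field with that format in the `archinfo` mapping (the mapping file itself is not shown). That conversion can be exercised without MySQL or Elasticsearch. A minimal sketch, where `RowConverter` is a hypothetical helper that takes column labels and values as lists instead of a `ResultSet`, and the columns `id`, `name`, `create_time` are invented for the example:

```java
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: the per-row conversion from the import loop, without JDBC.
final class RowConverter {

    // Builds the document source for one row; labels.get(i) pairs with values.get(i).
    static Map<String, Object> toDocument(List<String> labels, List<Object> values) {
        if (labels.size() != values.size()) {
            throw new IllegalArgumentException("labels and values differ in length");
        }
        // SimpleDateFormat is not thread-safe, so a fresh one is created per call
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        Map<String, Object> doc = new LinkedHashMap<>();
        for (int i = 0; i < labels.size(); i++) {
            Object value = values.get(i);
            if (value instanceof Timestamp) {
                // Keep only the date part, as the import handler does
                value = dateFormat.format((Timestamp) value);
            }
            doc.put(labels.get(i), value);
        }
        return doc;
    }

    public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2021-03-05 14:30:00");
        Map<String, Object> doc = toDocument(
                Arrays.asList("id", "name", "create_time"),
                Arrays.<Object>asList(42, "demo", ts));
        System.out.println(doc); // {id=42, name=demo, create_time=2021-03-05}
    }
}
```

Passing plain lists keeps the logic testable; inside the handler, the labels come from `ResultSetMetaData.getColumnLabel` and the values from `ResultSet.getObject`.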
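Once development is done, run the Maven package goal (mvn package); the zip file is generated under target/releases, the outputDirectory configured for maven-assembly-plugin. This zip is the plugin package used in the installation steps below.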
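1. Unzip the file into its own subdirectory of the Elasticsearch 7 plugins directory (for example plugins/index-plug). The zip has no base directory, so unzipping it directly into plugins/ would leave the files loose there.
2. Start the Elasticsearch 7 service.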
Note: in the startup console output, look for the line printed by this code in the IndexPlugin constructor:
public IndexPlugin() {
super();
System.out.println(actionProfix + " plugin instantiated......");
}
Seeing that line confirms that Elasticsearch 7 has loaded the plugin.
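Use Postman to send the index initialization request and the index data import request to the Elasticsearch 7 service.
The request paths come from the routes declared in the custom handlers:
IndexPluginHandler route: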
@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(GET, "/_index_plugin")));
}
ArchInfoDataPluginHandler route:
@Override
public List<Route> routes() {
return unmodifiableList(asList(new Route(GET, "/archinfo_data_plugin")));
}
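Postman is not required; any HTTP client can send these GET requests. A minimal sketch using only the JDK's `HttpURLConnection` (Java 8 compatible, matching the project's compiler target), assuming Elasticsearch listens on `http://localhost:9200` without authentication; `PluginEndpoints` is a hypothetical class name:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical client for the two routes registered by the plugin handlers.
final class PluginEndpoints {

    // Joins base URL and path without doubling or dropping the slash.
    static String url(String base, String path) {
        String b = base.endsWith("/") ? base.substring(0, base.length() - 1) : base;
        String p = path.startsWith("/") ? path : "/" + path;
        return b + p;
    }

    // Sends a GET request and returns "<status> <body>".
    static String get(String url) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("GET");
        conn.setReadTimeout(300_000); // the data import can take minutes
        int status = conn.getResponseCode();
        InputStream in = status >= 400 ? conn.getErrorStream() : conn.getInputStream();
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        if (in != null) {
            try (InputStream s = in) {
                byte[] buf = new byte[8192];
                int n;
                while ((n = s.read(buf)) != -1) {
                    body.write(buf, 0, n);
                }
            }
        }
        conn.disconnect();
        return status + " " + new String(body.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        String base = "http://localhost:9200"; // assumed address
        System.out.println(get(url(base, "/_index_plugin")));
        System.out.println(get(url(base, "/archinfo_data_plugin")));
    }
}
```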