Integrating Spring Boot 2.x with Hadoop 3.0.3 for HDFS File System Management
Goal: set up a Spring Boot 2.x project integrated with Hadoop 3.0.3 and wrap the management of HDFS, Hadoop's core storage component, behind a service layer.
Core pom.xml:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.1.RELEASE</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<mybatis-spring-boot-starter.version>1.3.2</mybatis-spring-boot-starter.version>
<mysql-connector-java.version>8.0.11</mysql-connector-java.version>
<com.alibaba.druid.version>1.1.9</com.alibaba.druid.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-codec.version>1.10</commons-codec.version>
<commons-lang3.version>3.8.1</commons-lang3.version>
<commons-net.version>3.6</commons-net.version>
<commons-io.version>2.6</commons-io.version>
<commons-collections.version>3.2.1</commons-collections.version>
<common-fileupload.version>1.3.1</common-fileupload.version>
<fastjson.version>1.2.48</fastjson.version>
<jasperreports.version>6.10.0</jasperreports.version>
</properties>
<dependencies>
<!-- Spring Web module -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- exclude Spring Boot's default logging framework (logback) -->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spring Boot test framework -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<!-- version is managed by spring-boot-starter-parent; the ${lombok.version} property was never defined in <properties> -->
<scope>provided</scope>
</dependency>
<!-- MyBatis and MySQL -->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis-spring-boot-starter.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql-connector-java.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${com.alibaba.druid.version}</version>
</dependency>
<!-- pagination helper -->
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper</artifactId>
<version>4.1.6</version>
</dependency>
<!-- commons-lang utilities -->
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</dependency>
<!-- commons-lang3 utilities -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<!-- commons-codec encoding/digest utilities -->
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>${commons-codec.version}</version>
</dependency>
<!-- commons-net networking utilities -->
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>${commons-net.version}</version>
</dependency>
<!-- commons-io utilities -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<!-- commons-collections utilities -->
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>${commons-collections.version}</version>
</dependency>
<!-- commons-fileupload -->
<dependency>
<groupId>commons-fileupload</groupId>
<artifactId>commons-fileupload</artifactId>
<version>${common-fileupload.version}</version>
</dependency>
<!-- Swagger2 -->
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.7.0</version>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- fixes: Missing artifact jdk.tools:jdk.tools:jar:1.8 -->
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<!-- spring-test -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.3.RELEASE</version>
</dependency>
<!-- Hadoop 3.0.3 integration -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.3</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-streaming</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>3.0.3</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
<version>3.0.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.0.3</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.0.3</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>3.0.3</version>
<scope>provided</scope>
</dependency>
Dependency conflicts and how they are resolved:
1. The Hadoop 3.0.3 core jars log through log4j, while spring-boot-starter-web pulls in logback. This project standardizes on log4j (Hadoop already brings in the log4j and slf4j-log4j12 jars transitively), so the logback starter (spring-boot-starter-logging) is excluded from spring-boot-starter-web:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<!-- exclude Spring Boot's default logging framework (logback) -->
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
2. The guava version pulled in by the Hadoop 3.0.3 core jars does not match the guava version required by the Swagger (springfox) dependencies. You can check which versions end up on the classpath with mvn dependency:tree -Dincludes=com.google.guava.
Solution: since the guava that Hadoop depends on is the older one, guava is excluded from every Hadoop dependency (hadoop-common, hadoop-yarn-common, hadoop-mapreduce-client-core and hadoop-hdfs), as already shown in the pom above. For example, hadoop-common:
<!-- Hadoop 3.0.3 with guava excluded -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.0.3</version>
<exclusions>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
</dependency>
Configuration classes:
package com.zzg.hadoop.config;
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.boot.web.servlet.ServletRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.alibaba.druid.support.http.StatViewServlet;
import com.alibaba.druid.support.http.WebStatFilter;
/**
* Druid monitoring configuration
* @author zzg
*
*/
@Configuration
public class DruidConfig {
@Bean
public ServletRegistrationBean druidServletRegistrationBean() {
ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean();
servletRegistrationBean.setServlet(new StatViewServlet());
servletRegistrationBean.addUrlMappings("/druid/*");
servletRegistrationBean.addInitParameter("allow", "");
servletRegistrationBean.addInitParameter("deny", "");
servletRegistrationBean.addInitParameter("loginUsername", "admin");
servletRegistrationBean.addInitParameter("loginPassword", "admin");
return servletRegistrationBean;
}
/**
* Register the Druid WebStatFilter
*
* @return
*/
@Bean
public FilterRegistrationBean duridFilterRegistrationBean() {
FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean();
filterRegistrationBean.setFilter(new WebStatFilter());
Map<String, String> initParams = new HashMap<String, String>();
// requests excluded from statistics
initParams.put("exclusions", "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*");
filterRegistrationBean.setInitParameters(initParams);
filterRegistrationBean.addUrlPatterns("/*");
return filterRegistrationBean;
}
}
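With this configuration, once the application starts (server.port=7090 in the properties further down) the Druid monitoring console should be reachable at http://localhost:7090/druid/ using the loginUsername/loginPassword set above (admin/admin).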
package com.zzg.hadoop.config;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Hadoop HDFS parameter configuration
* @author zzg
*
*/
@Configuration
public class HadoopHDFSConfig {
// logger
private static final Logger logger = LoggerFactory.getLogger(HadoopHDFSConfig.class);
@Value("${hdfs.hdfsPath}")
private String hdfsPath;
@Value("${hdfs.hdfsName}")
private String hdfsName;
/**
* Hadoop HDFS Configuration object
* @return
*/
@Bean
public org.apache.hadoop.conf.Configuration getConfiguration(){
org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration();
configuration.set("fs.defaultFS", hdfsPath);
return configuration;
}
/**
* Hadoop FileSystem client
* @return
*/
@Bean
public FileSystem getFileSystem(){
FileSystem fileSystem = null;
try {
fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName);
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
} catch (URISyntaxException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
return fileSystem;
}
}
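A quick way to verify that the FileSystem bean above can actually reach the NameNode is a small Spring Boot test. The sketch below is not part of the original project (the class name is illustrative) and assumes the HDFS configured under hdfs.hdfsPath is reachable:
package com.zzg.hadoop.config;
import static org.junit.Assert.assertTrue;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class HadoopHDFSConfigTest {
// the FileSystem bean produced by HadoopHDFSConfig
@Autowired
private FileSystem fileSystem;
@Test
public void rootDirectoryShouldExist() throws Exception {
// "/" always exists on a healthy HDFS, so this only checks connectivity
assertTrue(fileSystem.exists(new Path("/")));
}
}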
package com.zzg.hadoop.config;
import java.util.Properties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.github.pagehelper.PageHelper;
/**
* MyBatis configuration
* @author zzg
*
*/
@Configuration
public class MyBatisConfig {
/**
* PageHelper (pagination) bean
* @return
*/
@Bean
public PageHelper pageHelper() {
PageHelper pageHelper = new PageHelper();
Properties p = new Properties();
p.setProperty("offsetAsPageNum", "true");
p.setProperty("rowBoundsWithCount", "true");
p.setProperty("reasonable", "true");
p.setProperty("dialect", "mysql");
pageHelper.setProperties(p);
return pageHelper;
}
}
package com.zzg.hadoop.config;
import java.util.ArrayList;
import java.util.List;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import io.swagger.annotations.ApiOperation;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.ParameterBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.schema.ModelRef;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.service.Parameter;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket buildDocket() {
ParameterBuilder tokenPar = new ParameterBuilder();
List<Parameter> pars = new ArrayList<Parameter>();
tokenPar.name("X-CSRF-TOKEN").description("令牌").modelRef(new ModelRef("string")).parameterType("header")
.required(false).build();
pars.add(tokenPar.build());
return new Docket(DocumentationType.SWAGGER_2).select()
.apis(RequestHandlerSelectors.withMethodAnnotation(ApiOperation.class)).paths(PathSelectors.any())
.build().globalOperationParameters(pars).apiInfo(buildApiInf());
}
private ApiInfo buildApiInf() {
return new ApiInfoBuilder().title("****").termsOfServiceUrl("http://www.baidu.cn/")
.description("API接口")
.contact(new Contact("baidu", "http://www.baidu.cn/", "zhouzhiwengang@163.com"))
.version("2.0").build();
}
}
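Once the application is running, the API documentation generated by this configuration is served by springfox at http://localhost:7090/swagger-ui.html, which is a convenient way to exercise the HDFS endpoints shown below.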
Service interface and implementation for HDFS file operations on Hadoop 3.0.3:
package com.zzg.hadoop.service;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.BlockLocation;
import org.springframework.web.multipart.MultipartFile;
/**
* Hadoop HDFS common service interface
* @author zzg
*
*/
public interface HDFSService {
/**
* Create a folder on HDFS
* @param path
* @return
*/
public boolean mkdirFolder(String path);
/**
* Check whether a file exists on HDFS
* @param path
* @return
*/
public boolean existFile(String path);
/**
* Read directory information from HDFS
* @param path
* @return
*/
public List<Map<String, Object>> readCatalog(String path);
/**
* Create a file on HDFS
* @param path
* @param file
*/
public void createFile(String path, MultipartFile file);
/**
* Read the content of an HDFS file
* @param path
* @return
*/
public String readFileContent(String path);
/**
* List files under an HDFS path
* @param path
* @return
*/
public List<Map<String, String>> listFile(String path);
/**
* Rename an HDFS file
* @param oldName
* @param newName
* @return
*/
public boolean renameFile(String oldName, String newName);
/**
* Delete an HDFS file
* @param path
* @return
*/
public boolean deleteFile(String path);
/**
* Upload a file to HDFS
* @param path
* @param uploadPath
*/
public void uploadFile(String path, String uploadPath);
/**
* Download a file from HDFS
* @param path
* @param downloadPath
*/
public void downloadFile(String path, String downloadPath);
/**
* Copy a file within HDFS
* @param sourcePath
* @param targetPath
*/
public void copyFile(String sourcePath, String targetPath);
/**
* Read the specified HDFS file and return its bytes
* @param path
* @return
*/
public byte[] openFileToBytes(String path);
/**
* Get the BlockLocation information of the specified HDFS file
* @param path
* @return
* @throws Exception
*/
public BlockLocation[] getFileBlockLocations(String path);
}
package com.zzg.hadoop.service.impl;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import com.zzg.hadoop.service.HDFSService;
@Service
public class HDFSServiceImpl implements HDFSService {
// buffer size (64 MB) used when copying streams
private static final int bufferSize = 1024 * 1024 * 64;
// logger
private static final Logger logger = LoggerFactory.getLogger(HDFSServiceImpl.class);
@Autowired
private FileSystem fileSystem;
@Override
public boolean mkdirFolder(String path) {
// TODO Auto-generated method stub
boolean target = false;
if (StringUtils.isEmpty(path)) {
return false;
}
if (existFile(path)) {
return true;
}
Path src = new Path(path);
try {
target = fileSystem.mkdirs(src);
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
return target;
}
@Override
public boolean existFile(String path) {
// TODO Auto-generated method stub
boolean target = false;
if (StringUtils.isEmpty(path)) {
return target;
}
Path src = new Path(path);
try {
target = fileSystem.exists(src);
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
return target;
}
@Override
public List<Map<String, Object>> readCatalog(String path) {
// TODO Auto-generated method stub
if (StringUtils.isEmpty(path)) {
return null;
}
if (!existFile(path)) {
return null;
}
// target path
Path newPath = new Path(path);
FileStatus[] statusList = null;
try {
statusList = fileSystem.listStatus(newPath);
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
List<Map<String, Object>> list = new ArrayList<>();
if (null != statusList && statusList.length > 0) {
for (FileStatus fileStatus : statusList) {
Map<String, Object> map = new HashMap<>();
map.put("filePath", fileStatus.getPath());
map.put("fileStatus", fileStatus.toString());
list.add(map);
}
return list;
} else {
return null;
}
}
@Override
public void createFile(String path, MultipartFile file) {
// TODO Auto-generated method stub
if (StringUtils.isEmpty(path)) {
return;
}
// getOriginalFilename() returns the uploaded file's own name; getName() would only return the form field name
String fileName = file.getOriginalFilename();
Path newPath = new Path(path + "/" + fileName);
// open an output stream on HDFS
FSDataOutputStream outputStream;
try {
outputStream = fileSystem.create(newPath);
outputStream.write(file.getBytes());
outputStream.close();
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
}
@Override
public String readFileContent(String path) {
// TODO Auto-generated method stub
StringBuffer sb = new StringBuffer();
if (StringUtils.isEmpty(path)) {
return null;
}
if (!existFile(path)) {
return null;
}
// target path
Path srcPath = new Path(path);
FSDataInputStream inputStream = null;
try {
inputStream = fileSystem.open(srcPath);
// read as text to avoid garbled characters
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String lineTxt = "";
while ((lineTxt = reader.readLine()) != null) {
sb.append(lineTxt);
}
}catch(Exception e){
logger.error(e.getMessage());
} finally {
try {
// guard against a failed open() leaving inputStream null
if (inputStream != null) {
inputStream.close();
}
} catch (IOException e) {
logger.error(e.getMessage());
}
}
return sb.toString();
}
@Override
public List<Map<String, String>> listFile(String path) {
// TODO Auto-generated method stub
List<Map<String, String>> returnList = new ArrayList<>();
if (StringUtils.isEmpty(path)) {
return null;
}
if (!existFile(path)) {
return null;
}
// target path
Path srcPath = new Path(path);
// recursively list all files under the path
try{
RemoteIterator<LocatedFileStatus> filesList = fileSystem.listFiles(srcPath, true);
while (filesList.hasNext()) {
LocatedFileStatus next = filesList.next();
String fileName = next.getPath().getName();
Path filePath = next.getPath();
Map<String, String> map = new HashMap<>();
map.put("fileName", fileName);
map.put("filePath", filePath.toString());
returnList.add(map);
}
}catch(Exception e){
logger.error(e.getMessage());
}
return returnList;
}
@Override
public boolean renameFile(String oldName, String newName) {
// TODO Auto-generated method stub
boolean target = false;
if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) {
return false;
}
// original file path
Path oldPath = new Path(oldName);
// new (renamed) path
Path newPath = new Path(newName);
try{
target = fileSystem.rename(oldPath, newPath);
}catch(Exception e){
logger.error(e.getMessage());
}
return target;
}
@Override
public boolean deleteFile(String path) {
// TODO Auto-generated method stub
boolean target = false;
if (StringUtils.isEmpty(path)) {
return false;
}
if (!existFile(path)) {
return false;
}
Path srcPath = new Path(path);
try{
// delete(path, recursive) removes the file immediately; deleteOnExit() only deletes when the
// FileSystem is closed, which is why files previously appeared not to be deleted
target = fileSystem.delete(srcPath, true);
}catch(Exception e){
logger.error(e.getMessage());
}
return target;
}
@Override
public void uploadFile(String path, String uploadPath) {
// TODO Auto-generated method stub
if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
return;
}
// local source path
Path clientPath = new Path(path);
// HDFS target path
Path serverPath = new Path(uploadPath);
// copy from the local file system to HDFS; the first argument controls whether the source file is deleted (true deletes it, false keeps it)
try {
fileSystem.copyFromLocalFile(false, clientPath, serverPath);
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
}
@Override
public void downloadFile(String path, String downloadPath) {
// TODO Auto-generated method stub
if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
return;
}
// HDFS source path
Path clientPath = new Path(path);
// local target path
Path serverPath = new Path(downloadPath);
// copy from HDFS to the local file system; the first argument controls whether the source file is deleted (true deletes it, false keeps it)
try {
fileSystem.copyToLocalFile(false, clientPath, serverPath);
} catch (IOException e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
}
@Override
public void copyFile(String sourcePath, String targetPath) {
// TODO Auto-generated method stub
if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) {
return;
}
// source file path
Path oldPath = new Path(sourcePath);
// target path
Path newPath = new Path(targetPath);
FSDataInputStream inputStream = null;
FSDataOutputStream outputStream = null;
try {
try{
inputStream = fileSystem.open(oldPath);
outputStream = fileSystem.create(newPath);
IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);
}catch(Exception e){
logger.error(e.getMessage());
}
} finally {
try {
// either stream may be null if open()/create() failed
if (inputStream != null) {
inputStream.close();
}
if (outputStream != null) {
outputStream.close();
}
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}
@Override
public byte[] openFileToBytes(String path) {
// TODO Auto-generated method stub
byte[] bytes= null;
if (StringUtils.isEmpty(path)) {
return null;
}
if (!existFile(path)) {
return null;
}
// target path
Path srcPath = new Path(path);
try {
FSDataInputStream inputStream = fileSystem.open(srcPath);
bytes = IOUtils.readFullyToByteArray(inputStream);
}catch(Exception e){
logger.error(e.getMessage());
}
return bytes;
}
@Override
public BlockLocation[] getFileBlockLocations(String path) {
// TODO Auto-generated method stub
BlockLocation[] blocks = null;
if (StringUtils.isEmpty(path)) {
return null;
}
if (!existFile(path)) {
return null;
}
// target path
Path srcPath = new Path(path);
try{
FileStatus fileStatus = fileSystem.getFileStatus(srcPath);
blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
}catch(Exception e){
logger.error(e.getMessage());
}
return blocks;
}
}
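For experimenting with the same FileSystem API outside of Spring, a minimal standalone sketch is shown below. The class and package names are illustrative only; the URI and user mirror the hdfs.hdfsPath/hdfs.hdfsName values from the configuration later in this post and should be adjusted to your cluster:
package com.zzg.hadoop.demo;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class HdfsQuickStart {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.60.204:9000");
// the third argument is the user the HDFS operations run as
try (FileSystem fs = FileSystem.get(new URI("hdfs://192.168.60.204:9000"), conf, "root")) {
Path dir = new Path("/tmp/springboot-hdfs-demo");
System.out.println("mkdirs: " + fs.mkdirs(dir));
System.out.println("exists: " + fs.exists(dir));
System.out.println("delete: " + fs.delete(dir, true));
}
}
}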
Test controller for the HDFS file operations on Hadoop 3.0.3:
package com.zzg.hadoop.controller;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.mock.web.MockMultipartFile;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.multipart.MultipartFile;
import com.alibaba.fastjson.JSONObject;
import com.zzg.hadoop.config.HadoopHDFSConfig;
import com.zzg.hadoop.service.HDFSService;
import com.zzg.jreport.response.JreportResponse;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@Controller
@RequestMapping("/api/hadoop/hdfs")
@Api(value = "HDFS Controller", tags = "HDFS 操作服务")
public class HDFSController {
// logger
private static final Logger logger = LoggerFactory.getLogger(HDFSController.class);
@Autowired
private HDFSService service;
/**
* Note: the created folder may not have sufficient permissions; setting HDFS permissions still needs to be handled
* @param entity
* @return
*/
@ApiOperation(httpMethod = "POST", value = "创建文件夹")
@RequestMapping(value = "/mkdirFolder", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
@ResponseBody
public JreportResponse mkdirFolder(
@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) {
boolean target = service.mkdirFolder(entity.getString("path"));
return JreportResponse.ok(target);
}
@ApiOperation(httpMethod = "POST", value = "判断文件夹是否存在")
@RequestMapping(value = "/existFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
@ResponseBody
public JreportResponse existFile(
@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) {
boolean target = service.existFile(entity.getString("path"));
return JreportResponse.ok(target);
}
@ApiOperation(httpMethod = "POST", value = "读取目录")
@RequestMapping(value = "/readCatalog", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
@ResponseBody
public JreportResponse readCatalog(
@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) {
List<Map<String, Object>> list = service.readCatalog(entity.getString("path"));
return JreportResponse.ok(list);
}
@ApiOperation(httpMethod = "POST", value = "新建文件")
@RequestMapping(value = "/createFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
@ResponseBody
public JreportResponse createFile(
@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) {
FileInputStream inputStream = null;
MultipartFile file = null;
try {
// a local file is wrapped into a MultipartFile purely for testing purposes
inputStream = new FileInputStream("C:\\data\\words.txt");
// pass the original file name explicitly, since the service stores the file under getOriginalFilename()
file = new MockMultipartFile("file", "test.txt", null, inputStream);
} catch (Exception e) {
logger.error(e.getMessage());
} finally {
try {
if (inputStream != null) {
inputStream.close();
}
} catch (IOException e) {
logger.error(e.getMessage());
}
}
service.createFile(entity.getString("path"),file);
return JreportResponse.ok();
}
@ApiOperation(httpMethod = "POST", value = "读取文件内容")
@RequestMapping(value = "/readFileContent", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
@ResponseBody
public JreportResponse readFileContent(
@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) {
String content = service.readFileContent(entity.getString("path"));
return JreportResponse.ok(content);
}
@ApiOperation(httpMethod = "POST", value = "文件列表")
@RequestMapping(value = "/listFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
@ResponseBody
public JreportResponse listFile(
@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) {
List<Map<String, String>> list = service.listFile(entity.getString("path"));
return JreportResponse.ok(list);
}
@ApiOperation(httpMethod = "POST", value = "文件重命名")
@RequestMapping(value = "/renameFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
@ResponseBody
public JreportResponse renameFile(
@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) {
boolean target = service.renameFile(entity.getString("oldName"),entity.getString("newName"));
return JreportResponse.ok(target);
}
/**
* Note: originally the specified file was not actually deleted; the cause is that deleteOnExit()
* only removes the path when the FileSystem is closed, so the service implementation now calls delete() directly
* @param entity
* @return
*/
@ApiOperation(httpMethod = "POST", value = "文件删除")
@RequestMapping(value = "/deleteFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
@ResponseBody
public JreportResponse deleteFile(
@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) {
boolean target = service.deleteFile(entity.getString("path"));
return JreportResponse.ok(target);
}
@ApiOperation(httpMethod = "POST", value = "文件上传")
@RequestMapping(value = "/uploadFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8")
@ResponseBody
public JreportResponse uploadFile(
@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) {
service.uploadFile(entity.getString("path"), entity.getString("uploadPath"));
return JreportResponse.ok();
}
}
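The controller can also be exercised end-to-end with MockMvc. The sketch below is not part of the original post (the test class name is illustrative) and assumes the configured HDFS is reachable when the application context starts:
package com.zzg.hadoop.controller;
import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.servlet.MockMvc;
@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureMockMvc
public class HDFSControllerTest {
@Autowired
private MockMvc mockMvc;
@Test
public void existFileShouldReturnOk() throws Exception {
// ask whether the HDFS root directory exists
mockMvc.perform(post("/api/hadoop/hdfs/existFile")
.contentType(MediaType.APPLICATION_JSON)
.content("{\"path\": \"/\"}"))
.andExpect(status().isOk());
}
}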
The remaining business classes (domain, mapper, service, serviceImpl and controller) are omitted.
Contents of application.properties and log4j.properties:
# server port
server.port=7090
# context path (optional)
# server.servlet.context-path=/jreport
# MyBatis mapper XML locations
mybatis.mapper-locations=classpath*:mapper/hadoop/*Mapper.xml
mybatis.type-aliases-package=com.zzg.hadoop.domain
# MySQL 8 datasource configuration
spring.datasource.url=jdbc:mysql://192.168.**.**:3306/boot-security?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&nullCatalogMeansCurrent=true
spring.datasource.username=root
spring.datasource.password=******
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
# Druid connection pool configuration
# number of physical connections created at initialization
spring.datasource.druid.initial-size=5
# maximum pool size
spring.datasource.druid.max-active=30
# minimum number of idle connections
spring.datasource.druid.min-idle=5
# maximum wait time when borrowing a connection, in milliseconds
spring.datasource.druid.max-wait=60000
# interval between eviction runs that close idle connections, in milliseconds
spring.datasource.druid.time-between-eviction-runs-millis=60000
# minimum time a connection may stay idle before it becomes eligible for eviction
spring.datasource.druid.min-evictable-idle-time-millis=300000
# SQL used to validate connections; must be a query statement
spring.datasource.druid.validation-query=SELECT 1 FROM DUAL
# recommended true: has no performance impact and keeps connections safe; when a borrowed connection has been idle longer than timeBetweenEvictionRunsMillis, validationQuery is run to check it
spring.datasource.druid.test-while-idle=true
# run validationQuery when borrowing a connection; enabling this lowers performance
spring.datasource.druid.test-on-borrow=false
# run validationQuery when returning a connection; enabling this lowers performance
spring.datasource.druid.test-on-return=false
# whether to cache PreparedStatements (PSCache); a big win for cursor-based databases such as Oracle, usually disabled for MySQL
spring.datasource.druid.pool-prepared-statements=true
# to enable PSCache this must be greater than 0; when it is, poolPreparedStatements is switched to true automatically
spring.datasource.druid.max-pool-prepared-statement-per-connection-size=50
# filters for monitoring statistics; without them the monitoring page cannot collect SQL statistics
#spring.datasource.druid.filters=stat,wall
# enable the mergeSql feature and slow-SQL logging via connectionProperties
spring.datasource.druid.connection-properties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500
# merge monitoring data from multiple DruidDataSources
spring.datasource.druid.use-global-data-source-stat=true
# enabled Druid filters
spring.datasource.druid.filters=stat
# logging configuration (logback disabled, log4j.properties is used instead)
#logging.config=classpath:logback.xml
# Hadoop HDFS connection parameters
hdfs.hdfsPath=hdfs://192.168.60.204:9000
hdfs.hdfsName=root
#log4j.rootLogger=CONSOLE,info,error,DEBUG
log4j.rootLogger=info,error,CONSOLE,DEBUG
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n
log4j.logger.info=info
log4j.appender.info=org.apache.log4j.DailyRollingFileAppender
log4j.appender.info.layout=org.apache.log4j.PatternLayout
log4j.appender.info.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n
log4j.appender.info.datePattern='.'yyyy-MM-dd
log4j.appender.info.Threshold = info
log4j.appender.info.append=true
#log4j.appender.info.File=/home/admin/pms-api-services/logs/info/api_services_info
log4j.appender.info.File=C:/logs/hadoop-hdfs/logs/info/api_services_info
log4j.logger.error=error
log4j.appender.error=org.apache.log4j.DailyRollingFileAppender
log4j.appender.error.layout=org.apache.log4j.PatternLayout
log4j.appender.error.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n
log4j.appender.error.datePattern='.'yyyy-MM-dd
log4j.appender.error.Threshold = error
log4j.appender.error.append=true
#log4j.appender.error.File=/home/admin/pms-api-services/logs/error/api_services_error
log4j.appender.error.File=C:/logs/hadoop-hdfs/logs/error/api_services_error
log4j.logger.DEBUG=DEBUG
log4j.appender.DEBUG=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DEBUG.layout=org.apache.log4j.PatternLayout
log4j.appender.DEBUG.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n
log4j.appender.DEBUG.datePattern='.'yyyy-MM-dd
log4j.appender.DEBUG.Threshold = DEBUG
log4j.appender.DEBUG.append=true
#log4j.appender.DEBUG.File=/home/admin/pms-api-services/logs/debug/api_services_debug
log4j.appender.DEBUG.File=C:/logs/hadoop-hdfs/logs/debug/api_services_debug
### Debug
log4j.logger.com.ibatis=DEBUG
log4j.logger.com.ibatis.common.jdbc.SimpleDataSource=DEBUG
log4j.logger.com.ibatis.common.jdbc.ScriptRunner=DEBUG
log4j.logger.com.ibatis.sqlmap.engine.impl.SqlMapClientDelegate=DEBUG
log4j.logger.java.sql.Connection=DEBUG
log4j.logger.java.sql.Statement=DEBUG
log4j.logger.java.sql.PreparedStatement=DEBUG
Application entry point:
package com.zzg.hadoop;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.transaction.annotation.EnableTransactionManagement;
@SpringBootApplication
@EnableTransactionManagement
@MapperScan("com.zzg.hadoop.mapper")
public class Application extends SpringBootServletInitializer {
public static void main(String[] args) {
// TODO Auto-generated method stub
SpringApplication.run(Application.class, args);
System.out.println("============= SpringBoot hadoop hdfs Service Start Success =============");
}
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
// must point to the Application class that is otherwise started via the main method
return builder.sources(Application.class);
}
}
Project structure screenshot: omitted.