Spring Boot 2.x + Hadoop 3.0.3: HDFS File System Management

虾米姐 · 2021-03-31

Task: set up a Spring Boot 2.x project integrated with Hadoop 3.0.3, and wrap file-system management operations for HDFS, one of Hadoop's core components.

Core pom.xml:

<parent> 
		<groupId>org.springframework.boot</groupId> 
		<artifactId>spring-boot-starter-parent</artifactId> 
		<version>2.1.1.RELEASE</version> 
	</parent> 
 
	<properties> 
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
		<java.version>1.8</java.version> 
		<mybatis-spring-boot-starter.version>1.3.2</mybatis-spring-boot-starter.version> 
		<mysql-connector-java.version>8.0.11</mysql-connector-java.version> 
		<com.alibaba.druid.version>1.1.9</com.alibaba.druid.version> 
		<commons-lang.version>2.6</commons-lang.version> 
		<commons-codec.version>1.10</commons-codec.version> 
		<commons-lang3.version>3.8.1</commons-lang3.version> 
		<commons-net.version>3.6</commons-net.version> 
		<commons-io.version>2.6</commons-io.version> 
		<commons-collections.version>3.2.1</commons-collections.version> 
		<common-fileupload.version>1.3.1</common-fileupload.version> 
		<fastjson.version>1.2.48</fastjson.version> 
		<jasperreports.version>6.10.0</jasperreports.version> 
	</properties> 
 
 
	<dependencies> 
		<!-- Spring Web module --> 
		<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter-web</artifactId> 
			<!-- exclude Spring Boot's default logging framework (logback) --> 
			 <exclusions> 
                <exclusion> 
                    <groupId>org.springframework.boot</groupId> 
                    <artifactId>spring-boot-starter-logging</artifactId> 
                </exclusion> 
            </exclusions> 
		</dependency> 
 
		<!-- Spring Boot test support --> 
		<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter-test</artifactId> 
			<scope>test</scope> 
		</dependency> 
 
 
		<!-- Lombok --> 
		<dependency> 
			<groupId>org.projectlombok</groupId> 
			<artifactId>lombok</artifactId> 
			<version>${lombok.version}</version> 
			<scope>provided</scope> 
		</dependency> 
 
 
		<!-- MyBatis and MySQL connector --> 
		<dependency> 
			<groupId>org.mybatis.spring.boot</groupId> 
			<artifactId>mybatis-spring-boot-starter</artifactId> 
			<version>${mybatis-spring-boot-starter.version}</version> 
		</dependency> 
		<dependency> 
			<groupId>mysql</groupId> 
			<artifactId>mysql-connector-java</artifactId> 
			<version>${mysql-connector-java.version}</version> 
			<scope>runtime</scope> 
		</dependency> 
		<dependency> 
			<groupId>com.alibaba</groupId> 
			<artifactId>druid-spring-boot-starter</artifactId> 
			<version>${com.alibaba.druid.version}</version> 
		</dependency> 
		<!-- pagination plugin --> 
		<dependency> 
			<groupId>com.github.pagehelper</groupId> 
			<artifactId>pagehelper</artifactId> 
			<version>4.1.6</version> 
		</dependency> 
 
		<!-- commons-lang utilities --> 
		<dependency> 
			<groupId>commons-lang</groupId> 
			<artifactId>commons-lang</artifactId> 
			<version>${commons-lang.version}</version> 
		</dependency> 
		<!-- commons-lang3 utilities --> 
		<dependency> 
			<groupId>org.apache.commons</groupId> 
			<artifactId>commons-lang3</artifactId> 
			<version>${commons-lang3.version}</version> 
		</dependency> 
 
		<!-- commons-codec encoding/digest utilities --> 
		<dependency> 
			<groupId>commons-codec</groupId> 
			<artifactId>commons-codec</artifactId> 
			<version>${commons-codec.version}</version> 
		</dependency> 
		<!-- commons-net networking utilities --> 
		<dependency> 
			<groupId>commons-net</groupId> 
			<artifactId>commons-net</artifactId> 
			<version>${commons-net.version}</version> 
		</dependency> 
		<!-- commons-io utilities --> 
		<dependency> 
			<groupId>commons-io</groupId> 
			<artifactId>commons-io</artifactId> 
			<version>${commons-io.version}</version> 
		</dependency> 
		<!-- commons-collections utilities --> 
		<dependency> 
			<groupId>commons-collections</groupId> 
			<artifactId>commons-collections</artifactId> 
			<version>${commons-collections.version}</version> 
		</dependency> 
		<!-- commons-fileupload --> 
		<dependency> 
			<groupId>commons-fileupload</groupId> 
			<artifactId>commons-fileupload</artifactId> 
			<version>${common-fileupload.version}</version> 
		</dependency> 
 
		<!-- Swagger2 --> 
		<dependency> 
			<groupId>io.springfox</groupId> 
			<artifactId>springfox-swagger2</artifactId> 
			<version>2.7.0</version> 
		</dependency> 
		<dependency> 
			<groupId>io.springfox</groupId> 
			<artifactId>springfox-swagger-ui</artifactId> 
			<version>2.7.0</version> 
		</dependency> 
 
		<!-- fastjson --> 
		<dependency> 
			<groupId>com.alibaba</groupId> 
			<artifactId>fastjson</artifactId> 
			<version>${fastjson.version}</version> 
		</dependency> 
 
        <!-- resolves: Missing artifact jdk.tools:jdk.tools:jar:1.8 --> 
		<dependency> 
			<groupId>jdk.tools</groupId> 
			<artifactId>jdk.tools</artifactId> 
			<version>1.8</version> 
			<scope>system</scope> 
			<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath> 
		</dependency> 
 
		<!-- spring-test --> 
		<dependency> 
			<groupId>org.springframework</groupId> 
			<artifactId>spring-test</artifactId> 
			<version>5.1.3.RELEASE</version> 
		</dependency> 
 
		<!-- Hadoop 3.0.3 --> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-common</artifactId> 
			<version>3.0.3</version> 
			<exclusions> 
				<exclusion> 
					<groupId>com.google.guava</groupId> 
					<artifactId>guava</artifactId> 
				</exclusion> 
			</exclusions> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-streaming</artifactId> 
			<version>3.0.3</version> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-yarn-common</artifactId> 
			<version>3.0.3</version> 
			<exclusions> 
				<exclusion> 
					<groupId>com.google.guava</groupId> 
					<artifactId>guava</artifactId> 
				</exclusion> 
			</exclusions> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-distcp</artifactId> 
			<version>3.0.3</version> 
			<scope>provided</scope> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-mapreduce-client-core</artifactId> 
			<version>3.0.3</version> 
			<exclusions> 
				<exclusion> 
					<groupId>com.google.guava</groupId> 
					<artifactId>guava</artifactId> 
				</exclusion> 
			</exclusions> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-hdfs</artifactId> 
			<version>3.0.3</version> 
			<exclusions> 
				<exclusion> 
					<groupId>com.google.guava</groupId> 
					<artifactId>guava</artifactId> 
				</exclusion> 
			</exclusions> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-mapreduce-client-jobclient</artifactId> 
			<version>3.0.3</version> 
			<scope>provided</scope> 
		</dependency>

Notes on the core dependency (jar) conflicts:

1. The core Hadoop 3.0.3 artifacts log through log4j, while the spring-boot-starter-web stack logs through logback. I chose to keep log4j for logging, so the logback support (spring-boot-starter-logging) is excluded from the spring-boot-starter-web dependency:

<dependency> 
			<groupId>org.springframework.boot</groupId> 
			<artifactId>spring-boot-starter-web</artifactId> 
			<!-- exclude Spring Boot's default logging framework (logback) --> 
			 <exclusions> 
                <exclusion> 
                    <groupId>org.springframework.boot</groupId> 
                    <artifactId>spring-boot-starter-logging</artifactId> 
                </exclusion> 
            </exclusions> 
		</dependency>
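
With spring-boot-starter-logging excluded, the application relies on the log4j binding that the Hadoop artifacts bring in transitively. If you want to confirm which SLF4J binding is actually active at runtime, a small throwaway check can be used (purely illustrative; the class name and its use are assumptions, not part of the original project):

package com.zzg.hadoop; 

import org.slf4j.LoggerFactory; 

/** 
 * Illustrative check: print the SLF4J logger factory selected at runtime. 
 * With the log4j binding active this prints org.slf4j.impl.Log4jLoggerFactory. 
 */ 
public class LoggingBindingCheck { 
	public static void main(String[] args) { 
		System.out.println(LoggerFactory.getILoggerFactory().getClass().getName()); 
	} 
} 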

2. The guava version that the Hadoop 3.0.3 core artifacts depend on does not match the guava version required by the Swagger dependencies.


Solution: since the guava version pulled in by Hadoop is the older one, the approach taken here is to exclude guava from every Hadoop dependency:

<!-- Hadoop 3.0.3 --> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-common</artifactId> 
			<version>3.0.3</version> 
			<exclusions> 
				<exclusion> 
					<groupId>com.google.guava</groupId> 
					<artifactId>guava</artifactId> 
				</exclusion> 
			</exclusions> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-streaming</artifactId> 
			<version>3.0.3</version> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-yarn-common</artifactId> 
			<version>3.0.3</version> 
			<exclusions> 
				<exclusion> 
					<groupId>com.google.guava</groupId> 
					<artifactId>guava</artifactId> 
				</exclusion> 
			</exclusions> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-distcp</artifactId> 
			<version>3.0.3</version> 
			<scope>provided</scope> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-mapreduce-client-core</artifactId> 
			<version>3.0.3</version> 
			<exclusions> 
				<exclusion> 
					<groupId>com.google.guava</groupId> 
					<artifactId>guava</artifactId> 
				</exclusion> 
			</exclusions> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-hdfs</artifactId> 
			<version>3.0.3</version> 
			<exclusions> 
				<exclusion> 
					<groupId>com.google.guava</groupId> 
					<artifactId>guava</artifactId> 
				</exclusion> 
			</exclusions> 
		</dependency> 
		<dependency> 
			<groupId>org.apache.hadoop</groupId> 
			<artifactId>hadoop-mapreduce-client-jobclient</artifactId> 
			<version>3.0.3</version> 
			<scope>provided</scope> 
		</dependency>

Configuration classes:

package com.zzg.hadoop.config; 
 
import java.util.HashMap; 
import java.util.Map; 
 
import org.springframework.boot.web.servlet.FilterRegistrationBean; 
import org.springframework.boot.web.servlet.ServletRegistrationBean; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
 
import com.alibaba.druid.support.http.StatViewServlet; 
import com.alibaba.druid.support.http.WebStatFilter; 
/** 
 * Druid monitoring configuration 
 * @author zzg 
 * 
 */ 
@Configuration 
public class DruidConfig { 
	 	@Bean 
	    public ServletRegistrationBean druidServletRegistrationBean() { 
	        ServletRegistrationBean servletRegistrationBean = new ServletRegistrationBean(); 
	        servletRegistrationBean.setServlet(new StatViewServlet()); 
	        servletRegistrationBean.addUrlMappings("/druid/*"); 
	        servletRegistrationBean.addInitParameter("allow", ""); 
	        servletRegistrationBean.addInitParameter("deny", ""); 
	        servletRegistrationBean.addInitParameter("loginUsername", "admin"); 
	        servletRegistrationBean.addInitParameter("loginPassword", "admin"); 
	        return servletRegistrationBean; 
	    } 
 
	    /** 
	     * Register the Druid WebStatFilter 
	     * 
	     * @return 
	     */ 
	    @Bean 
	    public FilterRegistrationBean duridFilterRegistrationBean() { 
	        FilterRegistrationBean filterRegistrationBean = new FilterRegistrationBean(); 
	        filterRegistrationBean.setFilter(new WebStatFilter()); 
	        Map<String, String> initParams = new HashMap<String, String>(); 
	        // requests excluded from monitoring statistics 
	        initParams.put("exclusions", "*.js,*.gif,*.jpg,*.bmp,*.png,*.css,*.ico,/druid/*"); 
	        filterRegistrationBean.setInitParameters(initParams); 
	        filterRegistrationBean.addUrlPatterns("/*"); 
	        return filterRegistrationBean; 
	    } 
} 
package com.zzg.hadoop.config; 
 
import java.io.IOException; 
import java.net.URI; 
import java.net.URISyntaxException; 
 
import org.apache.hadoop.fs.FileSystem; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.annotation.Value; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
/** 
 * Hadoop HDFS configuration 
 * @author zzg 
 * 
 */ 
@Configuration 
public class HadoopHDFSConfig { 
	 
	// logger 
	private static final Logger logger = LoggerFactory.getLogger(HadoopHDFSConfig.class);	 
 
	@Value("${hdfs.hdfsPath}") 
	private String hdfsPath; 
	@Value("${hdfs.hdfsName}") 
	private String hdfsName; 
	 
	/** 
	 * Hadoop HDFS Configuration bean 
	 * @return 
	 */ 
	@Bean 
	public org.apache.hadoop.conf.Configuration  getConfiguration(){ 
		org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); 
		configuration.set("fs.defaultFS", hdfsPath); 
		return configuration; 
	} 
	/** 
	 * Hadoop FileSystem bean 
	 * @return 
	 */ 
	@Bean 
	public FileSystem getFileSystem(){ 
		FileSystem fileSystem = null; 
		try { 
			fileSystem = FileSystem.get(new URI(hdfsPath), getConfiguration(), hdfsName); 
		} catch (IOException | InterruptedException | URISyntaxException e) { 
			logger.error(e.getMessage()); 
		} 
		return fileSystem; 
	} 
	 
	 
 
} 
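
The FileSystem bean above can then be injected anywhere in the application. A quick connectivity check on startup might look like the sketch below (illustrative only; the class name is an assumption and it simply lists the HDFS root):

package com.zzg.hadoop.config; 

import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.CommandLineRunner; 
import org.springframework.stereotype.Component; 

/** 
 * Illustrative sketch: list the HDFS root directory on startup to verify the connection. 
 */ 
@Component 
public class HdfsConnectivityCheck implements CommandLineRunner { 

	@Autowired 
	private FileSystem fileSystem; 

	@Override 
	public void run(String... args) throws Exception { 
		// print every entry directly under the HDFS root 
		for (FileStatus status : fileSystem.listStatus(new Path("/"))) { 
			System.out.println(status.getPath()); 
		} 
	} 
} 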
package com.zzg.hadoop.config; 
 
import java.util.Properties; 
 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
 
import com.github.pagehelper.PageHelper; 
 
/** 
 * MyBatis configuration 
 * @author zzg 
 * 
 */ 
@Configuration 
public class MyBatisConfig { 
	/** 
	 * Instantiate the PageHelper pagination plugin 
	 * @return 
	 */ 
	@Bean 
	public PageHelper pageHelper() { 
		PageHelper pageHelper = new PageHelper(); 
		Properties p = new Properties(); 
		p.setProperty("offsetAsPageNum", "true"); 
		p.setProperty("rowBoundsWithCount", "true"); 
		p.setProperty("reasonable", "true"); 
		p.setProperty("dialect", "mysql"); 
		pageHelper.setProperties(p); 
		return pageHelper; 
	} 
}
package com.zzg.hadoop.config; 
 
import java.util.ArrayList; 
import java.util.List; 
 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
 
import io.swagger.annotations.ApiOperation; 
import springfox.documentation.builders.ApiInfoBuilder; 
import springfox.documentation.builders.ParameterBuilder; 
import springfox.documentation.builders.PathSelectors; 
import springfox.documentation.builders.RequestHandlerSelectors; 
import springfox.documentation.schema.ModelRef; 
import springfox.documentation.service.ApiInfo; 
import springfox.documentation.service.Contact; 
import springfox.documentation.service.Parameter; 
import springfox.documentation.spi.DocumentationType; 
import springfox.documentation.spring.web.plugins.Docket; 
import springfox.documentation.swagger2.annotations.EnableSwagger2; 
 
@Configuration 
@EnableSwagger2 
public class SwaggerConfig { 
	@Bean 
	public Docket buildDocket() { 
 
		ParameterBuilder tokenPar = new ParameterBuilder(); 
		List<Parameter> pars = new ArrayList<Parameter>(); 
		tokenPar.name("X-CSRF-TOKEN").description("令牌").modelRef(new ModelRef("string")).parameterType("header") 
				.required(false).build(); 
		pars.add(tokenPar.build()); 
 
		return new Docket(DocumentationType.SWAGGER_2).select() 
				.apis(RequestHandlerSelectors.withMethodAnnotation(ApiOperation.class)).paths(PathSelectors.any()) 
				.build().globalOperationParameters(pars).apiInfo(buildApiInf()); 
	} 
 
	private ApiInfo buildApiInf() { 
		return new ApiInfoBuilder().title("****").termsOfServiceUrl("http://www.baidu.cn/") 
				.description("API接口") 
				.contact(new Contact("baidu", "http://www.baidu.cn/", "zhouzhiwengang@163.com")) 
				.version("2.0").build(); 
 
	} 
} 

HDFS file-operation service interface and implementation for Hadoop 3.0.3:

package com.zzg.hadoop.service; 
 
import java.util.List; 
import java.util.Map; 
 
import org.apache.hadoop.fs.BlockLocation; 
import org.springframework.web.multipart.MultipartFile; 
 
/** 
 * Generic HDFS operation interface 
 * @author zzg 
 * 
 */ 
public interface HDFSService { 
	/** 
	 * Create a folder on HDFS 
	 * @param path 
	 * @return 
	 */ 
	public boolean mkdirFolder(String path); 
	 
	/** 
	 * Check whether a file or directory exists on HDFS 
	 * @param path 
	 * @return 
	 */ 
	public boolean existFile(String path); 
	 
	/** 
	 * Read directory information 
	 * @param path 
	 * @return 
	 */ 
	public List<Map<String, Object>> readCatalog(String path); 
	 
	/** 
	 * Create a file on HDFS from an uploaded MultipartFile 
	 * @param path 
	 * @param file 
	 */ 
	public void createFile(String path, MultipartFile file); 
	 
	/** 
	 * Read the content of an HDFS file 
	 * @param path 
	 * @return 
	 */ 
	public String readFileContent(String path); 
	 
	/** 
	 * List files under a path (recursively) 
	 * @param path 
	 * @return 
	 */ 
	public List<Map<String, String>> listFile(String path); 
	 
	/** 
	 * Rename an HDFS file 
	 * @param oldName 
	 * @param newName 
	 * @return 
	 */ 
	public boolean renameFile(String oldName, String newName); 
	 
	/** 
	 * Delete an HDFS file 
	 * @param path 
	 * @return 
	 */ 
	public boolean deleteFile(String path); 
	 
	/** 
	 * Upload a local file to HDFS 
	 * @param path 
	 * @param uploadPath 
	 */ 
	public void uploadFile(String path, String uploadPath); 
	 
	/** 
	 * Download an HDFS file to the local file system 
	 * @param path 
	 * @param downloadPath 
	 */ 
	public void downloadFile(String path, String downloadPath); 
	 
	/** 
	 * Copy a file within HDFS 
	 * @param sourcePath 
	 * @param targetPath 
	 */ 
	public void copyFile(String sourcePath, String targetPath); 
	 
	 
	/** 
	 * Read an HDFS file and return its content as a byte array 
	 * @param path 
	 * @return 
	 */ 
	public byte[] openFileToBytes(String path);  
	 
	/** 
	 * Get the BlockLocation information of an HDFS file 
	 * @param path 
	 * @return 
	 */ 
	public BlockLocation[] getFileBlockLocations(String path); 
} 
package com.zzg.hadoop.service.impl; 
 
import java.io.BufferedReader; 
import java.io.FileNotFoundException; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.util.ArrayList; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import org.apache.commons.lang.StringUtils; 
import org.apache.hadoop.fs.BlockLocation; 
import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.FSDataOutputStream; 
import org.apache.hadoop.fs.FileStatus; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.LocatedFileStatus; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.fs.RemoteIterator; 
import org.apache.hadoop.io.IOUtils; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.stereotype.Service; 
import org.springframework.web.multipart.MultipartFile; 
import com.zzg.hadoop.service.HDFSService; 
 
@Service 
public class HDFSServiceImpl implements HDFSService { 
	// stream-copy buffer size: 64 MB 
	private static final int bufferSize = 1024 * 1024 * 64; 
 
	 
	// logger 
	private static final Logger logger = LoggerFactory.getLogger(HDFSServiceImpl.class); 
 
	@Autowired 
	private FileSystem fileSystem; 
 
	@Override 
	public boolean mkdirFolder(String path) { 
		boolean target = false; 
		if (StringUtils.isEmpty(path)) { 
			return false; 
		} 
		if (existFile(path)) { 
			return true; 
		} 
		Path src = new Path(path); 
		try { 
			target = fileSystem.mkdirs(src); 
		} catch (IOException e) { 
			logger.error(e.getMessage()); 
		} 
		return target; 
	} 
 
	@Override 
	public boolean existFile(String path) { 
		boolean target = false; 
 
		if (StringUtils.isEmpty(path)) { 
			return target; 
		} 
		Path src = new Path(path); 
		try { 
			target = fileSystem.exists(src); 
		} catch (IOException e) { 
			logger.error(e.getMessage()); 
		} 
		return target; 
	} 
 
	@Override 
	public List<Map<String, Object>> readCatalog(String path) { 
		if (StringUtils.isEmpty(path)) { 
			return null; 
		} 
		if (!existFile(path)) { 
			return null; 
		} 
 
		// target path 
		Path newPath = new Path(path); 
		FileStatus[] statusList = null; 
		try { 
			statusList = fileSystem.listStatus(newPath); 
		} catch (FileNotFoundException e) { 
			logger.error(e.getMessage()); 
		} catch (IOException e) { 
			logger.error(e.getMessage()); 
		} 
		List<Map<String, Object>> list = new ArrayList<>(); 
		if (null != statusList && statusList.length > 0) { 
			for (FileStatus fileStatus : statusList) { 
				Map<String, Object> map = new HashMap<>(); 
				map.put("filePath", fileStatus.getPath()); 
				map.put("fileStatus", fileStatus.toString()); 
				list.add(map); 
			} 
			return list; 
		} else { 
			return null; 
		} 
 
	} 
 
	@Override 
	public void createFile(String path, MultipartFile file) { 
		if (StringUtils.isEmpty(path)) { 
			return; 
		} 
		// note: getName() returns the multipart field name; for real browser uploads getOriginalFilename() is usually what you want 
		String fileName = file.getName(); 
		Path newPath = new Path(path + "/" + fileName); 
		// open an output stream on HDFS 
		FSDataOutputStream outputStream; 
		try { 
			outputStream = fileSystem.create(newPath); 
			outputStream.write(file.getBytes()); 
			outputStream.close(); 
		} catch (IOException e) { 
			logger.error(e.getMessage()); 
		} 
	} 
 
	@Override 
	public String readFileContent(String path) { 
		StringBuffer sb = new StringBuffer(); 
		if (StringUtils.isEmpty(path)) { 
			return null; 
		} 
		if (!existFile(path)) { 
			return null; 
		} 
		// target path 
		Path srcPath = new Path(path); 
		FSDataInputStream inputStream = null; 
		try { 
			inputStream = fileSystem.open(srcPath); 
			// wrap in a reader to avoid garbled multi-byte (e.g. Chinese) characters 
			BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); 
			String lineTxt = ""; 
			while ((lineTxt = reader.readLine()) != null) { 
				sb.append(lineTxt); 
			} 
		}catch(Exception e){ 
			logger.error(e.getMessage()); 
		} finally { 
			try { 
				if (inputStream != null) { 
					inputStream.close(); 
				} 
			} catch (IOException e) { 
				logger.error(e.getMessage()); 
			} 
		} 
		return sb.toString(); 
	} 
 
	@Override 
	public List<Map<String, String>> listFile(String path) { 
		List<Map<String, String>> returnList = new ArrayList<>(); 
		if (StringUtils.isEmpty(path)) { 
			return null; 
		} 
		if (!existFile(path)) { 
			return null; 
		} 
		// target path 
		Path srcPath = new Path(path); 
		// recursively list all files under the path 
		try{ 
			RemoteIterator<LocatedFileStatus> filesList = fileSystem.listFiles(srcPath, true); 
			while (filesList.hasNext()) { 
				LocatedFileStatus next = filesList.next(); 
				String fileName = next.getPath().getName(); 
				Path filePath = next.getPath(); 
				Map<String, String> map = new HashMap<>(); 
				map.put("fileName", fileName); 
				map.put("filePath", filePath.toString()); 
				returnList.add(map); 
			} 
		}catch(Exception e){ 
			logger.error(e.getMessage()); 
		} 
		return returnList; 
	} 
 
	@Override 
	public boolean renameFile(String oldName, String newName) { 
		boolean target = false; 
		if (StringUtils.isEmpty(oldName) || StringUtils.isEmpty(newName)) { 
			return false; 
		} 
		// original path 
		Path oldPath = new Path(oldName); 
		// new path 
		Path newPath = new Path(newName); 
		try{ 
			target = fileSystem.rename(oldPath, newPath);			 
		}catch(Exception e){ 
			logger.error(e.getMessage()); 
		} 
		 
		return target; 
	} 
 
	@Override 
	public boolean deleteFile(String path) { 
		boolean target = false; 
		if (StringUtils.isEmpty(path)) { 
			return false; 
		} 
		if (!existFile(path)) { 
			return false; 
		} 
		Path srcPath = new Path(path); 
		try{ 
			// deleteOnExit only removes the path when the FileSystem is closed; 
			// fileSystem.delete(srcPath, true) would delete it immediately 
			target = fileSystem.deleteOnExit(srcPath);			 
		}catch(Exception e){ 
			logger.error(e.getMessage()); 
		} 
 
		return target; 
 
	} 
 
	@Override 
	public void uploadFile(String path, String uploadPath) { 
		if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) { 
			return; 
		} 
		// local source path 
		Path clientPath = new Path(path); 
		// HDFS target path 
		Path serverPath = new Path(uploadPath); 
  
		// copy from the local file system to HDFS; the first argument controls whether the source file is deleted (true = delete, default false) 
		try { 
			fileSystem.copyFromLocalFile(false, clientPath, serverPath); 
		} catch (IOException e) { 
			logger.error(e.getMessage()); 
		} 
 
	} 
 
	@Override 
	public void downloadFile(String path, String downloadPath) { 
		if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) { 
			return; 
		} 
		// HDFS source path 
		Path clientPath = new Path(path); 
		// local target path 
		Path serverPath = new Path(downloadPath); 
  
		// copy from HDFS to the local file system; the first argument controls whether the source file is deleted (true = delete, default false) 
		try { 
			fileSystem.copyToLocalFile(false, clientPath, serverPath); 
		} catch (IOException e) { 
			logger.error(e.getMessage()); 
		} 
 
	} 
 
	@Override 
	public void copyFile(String sourcePath, String targetPath) { 
		if (StringUtils.isEmpty(sourcePath) || StringUtils.isEmpty(targetPath)) { 
			return; 
		} 
		// source path 
		Path oldPath = new Path(sourcePath); 
		// target path 
		Path newPath = new Path(targetPath); 
  
		FSDataInputStream inputStream = null; 
		FSDataOutputStream outputStream = null; 
		try { 
			try{ 
				inputStream = fileSystem.open(oldPath); 
				outputStream = fileSystem.create(newPath); 
				 
				IOUtils.copyBytes(inputStream, outputStream, bufferSize, false);				 
			}catch(Exception e){ 
				logger.error(e.getMessage()); 
			} 
		} finally { 
			try{ 
				if (inputStream != null) { 
					inputStream.close(); 
				} 
				if (outputStream != null) { 
					outputStream.close(); 
				} 
			}catch(Exception e){ 
				logger.error(e.getMessage()); 
			} 
 
		} 
 
	} 
 
	@Override 
	public byte[] openFileToBytes(String path) { 
		 byte[] bytes= null; 
		if (StringUtils.isEmpty(path)) { 
			return null; 
		} 
		if (!existFile(path)) { 
			return null; 
		} 
		// target path 
		Path srcPath = new Path(path); 
		try { 
			FSDataInputStream inputStream = fileSystem.open(srcPath); 
			bytes = IOUtils.readFullyToByteArray(inputStream); 
			inputStream.close(); 
		}catch(Exception e){ 
			logger.error(e.getMessage()); 
		} 
		return bytes; 
 
	} 
 
	@Override 
	public BlockLocation[] getFileBlockLocations(String path) { 
		BlockLocation[] blocks = null; 
		if (StringUtils.isEmpty(path)) { 
			return null; 
		} 
		if (!existFile(path)) { 
			return null; 
		} 
		// target path 
		Path srcPath = new Path(path); 
		try{ 
			FileStatus fileStatus = fileSystem.getFileStatus(srcPath); 
			blocks = fileSystem.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); 
		}catch(Exception e){ 
			logger.error(e.getMessage()); 
		} 
		return blocks; 
 
	} 
 
} 
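
Since spring-boot-starter-test and spring-test are already on the classpath, the service can be exercised with an integration test along these lines (a minimal sketch; it assumes the HDFS address from application.properties is reachable, and the class name and test path are illustrative):

package com.zzg.hadoop.service; 

import static org.junit.Assert.assertTrue; 

import org.junit.Test; 
import org.junit.runner.RunWith; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.test.context.SpringBootTest; 
import org.springframework.test.context.junit4.SpringRunner; 

@RunWith(SpringRunner.class) 
@SpringBootTest 
public class HDFSServiceTest { 

	@Autowired 
	private HDFSService service; 

	@Test 
	public void mkdirAndCheck() { 
		// create a test folder, then verify it is visible through existFile 
		assertTrue(service.mkdirFolder("/tmp/hdfs-demo")); 
		assertTrue(service.existFile("/tmp/hdfs-demo")); 
	} 
} 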

A test controller for the HDFS file operations against Hadoop 3.0.3:

package com.zzg.hadoop.controller; 
 
import java.io.FileInputStream; 
import java.io.FileNotFoundException; 
import java.io.IOException; 
import java.util.List; 
import java.util.Map; 
 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.mock.web.MockMultipartFile; 
import org.springframework.stereotype.Controller; 
import org.springframework.web.bind.annotation.RequestBody; 
import org.springframework.web.bind.annotation.RequestMapping; 
import org.springframework.web.bind.annotation.RequestMethod; 
import org.springframework.web.bind.annotation.ResponseBody; 
import org.springframework.web.multipart.MultipartFile; 
 
import com.alibaba.fastjson.JSONObject; 
import com.zzg.hadoop.config.HadoopHDFSConfig; 
import com.zzg.hadoop.service.HDFSService; 
import com.zzg.jreport.response.JreportResponse; 
import io.swagger.annotations.Api; 
import io.swagger.annotations.ApiOperation; 
import io.swagger.annotations.ApiParam; 
 
@Controller 
@RequestMapping("/api/hadoop/hdfs") 
@Api(value = "HDFS Controller", tags = "HDFS 操作服务") 
public class HDFSController { 
	// logger 
	private static final Logger logger = LoggerFactory.getLogger(HDFSController.class);	 
	@Autowired 
	private HDFSService service; 
	 
	/** 
	 * Note: the created folder may not have sufficient permissions; HDFS permissions still need to be configured. 
	 * @param entity 
	 * @return 
	 */ 
	 
	@ApiOperation(httpMethod = "POST", value = "创建文件夹") 
	@RequestMapping(value = "/mkdirFolder", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8") 
	@ResponseBody 
	public JreportResponse mkdirFolder( 
			@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) { 
		boolean target = service.mkdirFolder(entity.getString("path")); 
		return JreportResponse.ok(target); 
		 
	} 
	 
	@ApiOperation(httpMethod = "POST", value = "判断文件夹是否存在") 
	@RequestMapping(value = "/existFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8") 
	@ResponseBody 
	public JreportResponse existFile( 
			@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) { 
		boolean target = service.existFile(entity.getString("path")); 
		return JreportResponse.ok(target); 
	} 
	 
	@ApiOperation(httpMethod = "POST", value = "读取目录") 
	@RequestMapping(value = "/readCatalog", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8") 
	@ResponseBody 
	public JreportResponse readCatalog( 
			@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) { 
		 List<Map<String, Object>> list = service.readCatalog(entity.getString("path")); 
		return JreportResponse.ok(list); 
	} 
	 
	@ApiOperation(httpMethod = "POST", value = "新建文件") 
	@RequestMapping(value = "/createFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8") 
	@ResponseBody 
	public JreportResponse createFile( 
			@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) { 
		// builds a MultipartFile from a hard-coded local test file 
		FileInputStream inputStream = null; 
		MultipartFile file = null; 
		try { 
			inputStream = new FileInputStream("C:\\data\\words.txt"); 
			file = new MockMultipartFile("test.txt", inputStream); 
		} catch (Exception e) { 
			logger.error(e.getMessage()); 
		} finally { 
			try { 
				if (inputStream != null) { 
					inputStream.close(); 
				} 
			} catch (IOException e) { 
				logger.error(e.getMessage()); 
			} 
		} 
		service.createFile(entity.getString("path"),file); 
		return JreportResponse.ok(); 
	} 
	 
	@ApiOperation(httpMethod = "POST", value = "读取文件内容") 
	@RequestMapping(value = "/readFileContent", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8") 
	@ResponseBody 
	public JreportResponse readFileContent( 
			@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) { 
		String content = service.readFileContent(entity.getString("path")); 
		return JreportResponse.ok(content); 
	} 
	 
	@ApiOperation(httpMethod = "POST", value = "文件列表") 
	@RequestMapping(value = "/listFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8") 
	@ResponseBody 
	public JreportResponse listFile( 
			@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) { 
		List<Map<String, String>> list = service.listFile(entity.getString("path")); 
		return JreportResponse.ok(list); 
	} 
	 
	@ApiOperation(httpMethod = "POST", value = "文件重命名") 
	@RequestMapping(value = "/renameFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8") 
	@ResponseBody 
	public JreportResponse renameFile( 
			@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) { 
		boolean target = service.renameFile(entity.getString("oldName"),entity.getString("newName")); 
		return JreportResponse.ok(target); 
	} 
	 
	/** 
	 * Known issue: the specified file is not actually deleted; the cause still needs investigation 
	 * (likely because the service uses deleteOnExit, which only removes the path when the FileSystem is closed). 
	 * @param entity 
	 * @return 
	 */ 
	@ApiOperation(httpMethod = "POST", value = "文件删除") 
	@RequestMapping(value = "/deleteFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8") 
	@ResponseBody 
	public JreportResponse deleteFile( 
			@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) { 
		boolean target = service.deleteFile(entity.getString("path")); 
		return JreportResponse.ok(target); 
	} 
	 
	 
	@ApiOperation(httpMethod = "POST", value = "文件上传") 
	@RequestMapping(value = "/uploadFile", method = { RequestMethod.POST }, produces = "application/json;charset=UTF-8") 
	@ResponseBody 
	public JreportResponse uploadFile( 
			@RequestBody @ApiParam(name = "JSON对象", value = "json格式对象", required = true) JSONObject entity) { 
		service.uploadFile(entity.getString("path"), entity.getString("uploadPath")); 
		return JreportResponse.ok(); 
	} 
	 
	 
	 
	 
 
} 
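
The REST layer can be smoke-tested the same way with MockMvc; the sketch below (illustrative; the class name and path are assumptions) posts a JSON body with a path field, which is the request format every endpoint above expects:

package com.zzg.hadoop.controller; 

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; 
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; 

import org.junit.Test; 
import org.junit.runner.RunWith; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; 
import org.springframework.boot.test.context.SpringBootTest; 
import org.springframework.http.MediaType; 
import org.springframework.test.context.junit4.SpringRunner; 
import org.springframework.test.web.servlet.MockMvc; 

@RunWith(SpringRunner.class) 
@SpringBootTest 
@AutoConfigureMockMvc 
public class HDFSControllerTest { 

	@Autowired 
	private MockMvc mockMvc; 

	@Test 
	public void existFileReturnsOk() throws Exception { 
		// every endpoint takes a JSON object; here only the "path" field is needed 
		mockMvc.perform(post("/api/hadoop/hdfs/existFile") 
				.contentType(MediaType.APPLICATION_JSON) 
				.content("{\"path\": \"/tmp\"}")) 
				.andExpect(status().isOk()); 
	} 
} 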

The other business-level domain, mapper, service, serviceImpl and controller classes are omitted.

Contents of application.properties and log4j.properties:

# server port 
server.port=7090 
# service name (context path) 
# server.context-path=/jreport 
# MyBatis mapper XML locations 
mybatis.mapper-locations=classpath*:mapper/hadoop/*Mapper.xml 
mybatis.type-aliases-package=com.zzg.hadoop.domain 
# MyBatis + MySQL 8 datasource configuration 
spring.datasource.url=jdbc:mysql://192.168.**.**:3306/boot-security?serverTimezone=UTC&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true&nullCatalogMeansCurrent=true 
spring.datasource.username=root 
spring.datasource.password=****** 
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver 
# Druid connection pool configuration 
# number of physical connections created at initialization 
spring.datasource.druid.initial-size=5 
# maximum pool size 
spring.datasource.druid.max-active=30 
# minimum pool size 
spring.datasource.druid.min-idle=5 
# maximum wait time when acquiring a connection, in milliseconds 
spring.datasource.druid.max-wait=60000 
# interval between eviction runs that close idle connections, in milliseconds 
spring.datasource.druid.time-between-eviction-runs-millis=60000 
# minimum time a connection may stay idle before it becomes eligible for eviction 
spring.datasource.druid.min-evictable-idle-time-millis=300000 
# SQL used to validate connections; must be a query statement 
spring.datasource.druid.validation-query=SELECT 1 FROM DUAL 
# recommended true; does not hurt performance and keeps connections safe: when a connection is borrowed and has been idle longer than timeBetweenEvictionRunsMillis, validationQuery is run to check it 
spring.datasource.druid.test-while-idle=true 
# run validationQuery when borrowing a connection; enabling this reduces performance 
spring.datasource.druid.test-on-borrow=false 
# run validationQuery when returning a connection; enabling this reduces performance 
spring.datasource.druid.test-on-return=false 
# whether to cache PreparedStatements (PSCache); a big win for cursor-based databases such as Oracle, usually disabled for MySQL 
spring.datasource.druid.pool-prepared-statements=true 
# to enable PSCache this must be greater than 0; when it is, poolPreparedStatements is automatically switched to true 
spring.datasource.druid.max-pool-prepared-statement-per-connection-size=50 
# filters for monitoring statistics; without them the monitoring console cannot collect SQL statistics 
#spring.datasource.druid.filters=stat,wall 
# enable mergeSql and slow-SQL logging via connectProperties 
spring.datasource.druid.connection-properties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=500 
# merge monitoring data from multiple DruidDataSources 
spring.datasource.druid.use-global-data-source-stat=true 
# enabled Druid filters 
spring.datasource.druid.filters=stat 
# logging configuration file 
#logging.config=classpath:logback.xml 
 
# Hadoop HDFS connection parameters 
hdfs.hdfsPath=hdfs://192.168.60.204:9000 
hdfs.hdfsName=root

# ---- log4j.properties ---- 
#log4j.rootLogger=CONSOLE,info,error,DEBUG 
log4j.rootLogger=info,error,CONSOLE,DEBUG 
 
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender      
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout      
log4j.appender.CONSOLE.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n      
       
log4j.logger.info=info 
log4j.appender.info=org.apache.log4j.DailyRollingFileAppender 
log4j.appender.info.layout=org.apache.log4j.PatternLayout      
log4j.appender.info.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n   
log4j.appender.info.datePattern='.'yyyy-MM-dd 
log4j.appender.info.Threshold = info    
log4j.appender.info.append=true    
#log4j.appender.info.File=/home/admin/pms-api-services/logs/info/api_services_info 
log4j.appender.info.File=C:/logs/hadoop-hdfs/logs/info/api_services_info 
 
log4j.logger.error=error   
log4j.appender.error=org.apache.log4j.DailyRollingFileAppender 
log4j.appender.error.layout=org.apache.log4j.PatternLayout      
log4j.appender.error.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n   
log4j.appender.error.datePattern='.'yyyy-MM-dd 
log4j.appender.error.Threshold = error    
log4j.appender.error.append=true    
#log4j.appender.error.File=/home/admin/pms-api-services/logs/error/api_services_error 
log4j.appender.error.File=/C:/logs/hadoop-hdfs/logs/error/api_services_error 
 
log4j.logger.DEBUG=DEBUG 
log4j.appender.DEBUG=org.apache.log4j.DailyRollingFileAppender 
log4j.appender.DEBUG.layout=org.apache.log4j.PatternLayout      
log4j.appender.DEBUG.layout.ConversionPattern=%d{yyyy-MM-dd-HH-mm} [%t] [%c] [%p] - %m%n   
log4j.appender.DEBUG.datePattern='.'yyyy-MM-dd 
log4j.appender.DEBUG.Threshold = DEBUG    
log4j.appender.DEBUG.append=true    
#log4j.appender.DEBUG.File=/home/admin/pms-api-services/logs/debug/api_services_debug 
log4j.appender.DEBUG.File=C:/logs/hadoop-hdfs/logs/debug/api_services_debug 
 
### Debug 
log4j.logger.com.ibatis=DEBUG 
log4j.logger.com.ibatis.common.jdbc.SimpleDataSource=DEBUG 
log4j.logger.com.ibatis.common.jdbc.ScriptRunner=DEBUG 
log4j.logger.com.ibatis.sqlmap.engine.impl.SqlMapClientDelegate=DEBUG 
log4j.logger.java.sql.Connection=DEBUG 
log4j.logger.java.sql.Statement=DEBUG 
log4j.logger.java.sql.PreparedStatement=DEBUG 
 

Application entry point:

package com.zzg.hadoop; 
 
 
import org.mybatis.spring.annotation.MapperScan; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.boot.builder.SpringApplicationBuilder; 
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; 
import org.springframework.transaction.annotation.EnableTransactionManagement; 
 
@SpringBootApplication  
@EnableTransactionManagement 
@MapperScan("com.zzg.hadoop.mapper") 
public class Application extends SpringBootServletInitializer { 
 
	public static void main(String[] args) { 
		SpringApplication.run(Application.class, args); 
		System.out.println("============= SpringBoot hadoop hdfs Service Start Success ============="); 
	} 
	 
	@Override 
	protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { 
		// point to the same Application class that is launched by the main method 
		return builder.sources(Application.class); 
	} 
 
} 

(Project structure screenshot omitted.)

Tags: Spring Boot