架构师_程序员

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 166|回复: 0

[资料] 掌握HDFS的Java API接口访问

[复制链接]
跳转到指定楼层
楼主
发表于 2019-7-5 13:35:05
HDFS设计的主要目的是对海量数据进行存储,也就是说在其上能够存储很大量文件(可以存储TB级的文件)。HDFS将这些文件分割之后,存储在不同的DataNode上, HDFS 提供了两种访问接口:Shell接口和Java API 接口,对HDFS里面的文件进行操作,具体每个Block放在哪台DataNode上面,对于开发者来说是透明的。

1、获取文件系统

  1. /**
  2. * 获取文件系统
  3. *
  4. * @return FileSystem
  5. */
  6. public static FileSystem getFileSystem() {
  7.     //读取配置文件
  8.     Configuration conf = new Configuration();
  9.     // 文件系统
  10.     FileSystem fs = null;
  11.    
  12.     String hdfsUri = HDFSUri;
  13.     if(StringUtils.isBlank(hdfsUri)){
  14.         // 返回默认文件系统  如果在 Hadoop集群下运行,使用此种方法可直接获取默认文件系统
  15.         try {
  16.             fs = FileSystem.get(conf);
  17.         } catch (IOException e) {
  18.             logger.error("", e);
  19.         }
  20.     }else{
  21.         // 返回指定的文件系统,如果在本地测试,需要使用此种方法获取文件系统
  22.         try {
  23.             URI uri = new URI(hdfsUri.trim());
  24.             fs = FileSystem.get(uri,conf);
  25.         } catch (URISyntaxException | IOException e) {
  26.             logger.error("", e);
  27.         }
  28.     }
  29.         
  30.     return fs;
  31. }
复制代码
2、创建文件目录

  1. /**
  2. * 创建文件目录
  3. *
  4. * @param path
  5. */
  6. public static void mkdir(String path) {
  7.     try {
  8.         // 获取文件系统
  9.         FileSystem fs = getFileSystem();
  10.         
  11.         String hdfsUri = HDFSUri;
  12.         if(StringUtils.isNotBlank(hdfsUri)){
  13.             path = hdfsUri + path;
  14.         }
  15.         
  16.         // 创建目录
  17.         fs.mkdirs(new Path(path));
  18.         
  19.         //释放资源
  20.         fs.close();
  21.     } catch (IllegalArgumentException | IOException e) {
  22.         logger.error("", e);
  23.     }
  24. }
复制代码

3、删除文件或者文件目录

  1. /**
  2. * 删除文件或者文件目录
  3. *
  4. * @param path
  5. */
  6. public static void rmdir(String path) {
  7.     try {
  8.         // 返回FileSystem对象
  9.         FileSystem fs = getFileSystem();
  10.         
  11.         String hdfsUri = HDFSUri;
  12.         if(StringUtils.isNotBlank(hdfsUri)){
  13.             path = hdfsUri + path;
  14.         }
  15.         
  16.         // 删除文件或者文件目录  delete(Path f) 此方法已经弃用
  17.         fs.delete(new Path(path),true);
  18.         
  19.         // 释放资源
  20.         fs.close();
  21.     } catch (IllegalArgumentException | IOException e) {
  22.         logger.error("", e);
  23.     }
  24. }
复制代码

4、根据filter获取目录下的文件

  1. /**
  2. * 根据filter获取目录下的文件
  3. *
  4. * @param path
  5. * @param pathFilter
  6. * @return String[]
  7. */
  8. public static String[] ListFile(String path,PathFilter pathFilter) {
  9.     String[] files = new String[0];
  10.    
  11.     try {
  12.         // 返回FileSystem对象
  13.         FileSystem fs = getFileSystem();
  14.         
  15.         String hdfsUri = HDFSUri;
  16.         if(StringUtils.isNotBlank(hdfsUri)){
  17.             path = hdfsUri + path;
  18.         }
  19.         
  20.         FileStatus[] status;
  21.         if(pathFilter != null){
  22.             // 根据filter列出目录内容
  23.             status = fs.listStatus(new Path(path),pathFilter);
  24.         }else{
  25.             // 列出目录内容
  26.             status = fs.listStatus(new Path(path));
  27.         }
  28.         
  29.         // 获取目录下的所有文件路径
  30.         Path[] listedPaths = FileUtil.stat2Paths(status);
  31.         // 转换String[]
  32.         if (listedPaths != null && listedPaths.length > 0){
  33.             files = new String[listedPaths.length];
  34.             for (int i = 0; i < files.length; i++){
  35.                 files[i] = listedPaths[i].toString();
  36.             }
  37.         }
  38.         // 释放资源
  39.         fs.close();
  40.     } catch (IllegalArgumentException | IOException e) {
  41.         logger.error("", e);
  42.     }
  43.    
  44.     return files;
  45. }
复制代码

5、文件上传至 HDFS

  1. /**
  2. * 文件上传至 HDFS
  3. *
  4. * @param delSrc
  5. * @param overwrite
  6. * @param srcFile
  7. * @param destPath
  8. */
  9. public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {
  10.     // 源文件路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/weibo.txt
  11.     Path srcPath = new Path(srcFile);
  12.    
  13.     // 目的路径
  14.     String hdfsUri = HDFSUri;
  15.     if(StringUtils.isNotBlank(hdfsUri)){
  16.         destPath = hdfsUri + destPath;
  17.     }
  18.     Path dstPath = new Path(destPath);
  19.    
  20.     // 实现文件上传
  21.     try {
  22.         // 获取FileSystem对象
  23.         FileSystem fs = getFileSystem();
  24.         fs.copyFromLocalFile(srcPath, dstPath);
  25.         fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);
  26.         //释放资源
  27.         fs.close();
  28.     } catch (IOException e) {
  29.         logger.error("", e);
  30.     }
  31. }
复制代码
6、从 HDFS 下载文件

  1. /**
  2. * 从 HDFS 下载文件
  3. *
  4. * @param srcFile
  5. * @param destPath
  6. */
  7. public static void getFile(String srcFile,String destPath) {
  8.     // 源文件路径
  9.     String hdfsUri = HDFSUri;
  10.     if(StringUtils.isNotBlank(hdfsUri)){
  11.         srcFile = hdfsUri + srcFile;
  12.     }
  13.     Path srcPath = new Path(srcFile);
  14.    
  15.     // 目的路径是Linux下的路径,如果在 windows 下测试,需要改写为Windows下的路径,比如D://hadoop/djt/
  16.     Path dstPath = new Path(destPath);
  17.    
  18.     try {
  19.         // 获取FileSystem对象
  20.         FileSystem fs = getFileSystem();
  21.         // 下载hdfs上的文件
  22.         fs.copyToLocalFile(srcPath, dstPath);
  23.         // 释放资源
  24.         fs.close();
  25.     } catch (IOException e) {
  26.         logger.error("", e);
  27.     }
  28. }
复制代码

7、获取 HDFS 集群节点信息

  1. /**
  2. * 获取 HDFS 集群节点信息
  3. *
  4. * @return DatanodeInfo[]
  5. */
  6. public static DatanodeInfo[] getHDFSNodes() {
  7.     // 获取所有节点
  8.     DatanodeInfo[] dataNodeStats = new DatanodeInfo[0];
  9.    
  10.     try {
  11.         // 返回FileSystem对象
  12.         FileSystem fs = getFileSystem();
  13.         
  14.         // 获取分布式文件系统
  15.         DistributedFileSystem hdfs = (DistributedFileSystem)fs;
  16.         
  17.         dataNodeStats = hdfs.getDataNodeStats();
  18.     } catch (IOException e) {
  19.         logger.error("", e);
  20.     }
  21.     return dataNodeStats;
  22. }
复制代码

8、查找某个文件在 HDFS集群的位置

  1. /**
  2. * 查找某个文件在 HDFS集群的位置
  3. *
  4. * @param filePath
  5. * @return BlockLocation[]
  6. */
  7. public static BlockLocation[] getFileBlockLocations(String filePath) {
  8.     // 文件路径
  9.     String hdfsUri = HDFSUri;
  10.     if(StringUtils.isNotBlank(hdfsUri)){
  11.         filePath = hdfsUri + filePath;
  12.     }
  13.     Path path = new Path(filePath);
  14.    
  15.     // 文件块位置列表
  16.     BlockLocation[] blkLocations = new BlockLocation[0];
  17.     try {
  18.         // 返回FileSystem对象
  19.         FileSystem fs = getFileSystem();
  20.         // 获取文件目录
  21.         FileStatus filestatus = fs.getFileStatus(path);
  22.         //获取文件块位置列表
  23.         blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
  24.     } catch (IOException e) {
  25.         logger.error("", e);
  26.     }
  27.     return blkLocations;
  28. }
复制代码

9、文件重命名

  1. /**
  2. * 文件重命名
  3. *
  4. * @param srcPath
  5. * @param dstPath
  6. */
  7. public boolean rename(String srcPath, String dstPath){
  8.     boolean flag = false;
  9.     try    {
  10.         // 返回FileSystem对象
  11.         FileSystem fs = getFileSystem();
  12.         
  13.         String hdfsUri = HDFSUri;
  14.         if(StringUtils.isNotBlank(hdfsUri)){
  15.             srcPath = hdfsUri + srcPath;
  16.             dstPath = hdfsUri + dstPath;
  17.         }
  18.         
  19.         flag = fs.rename(new Path(srcPath), new Path(dstPath));
  20.     } catch (IOException e) {
  21.         logger.error("{} rename to {} error.", srcPath, dstPath);
  22.     }
  23.    
  24.     return flag;
  25. }
复制代码

10、判断目录是否存在

  1. /**
  2. * 判断目录是否存在
  3. *
  4. * @param srcPath
  5. * @param dstPath
  6. */
  7. public boolean existDir(String filePath, boolean create){
  8.     boolean flag = false;
  9.    
  10.     if (StringUtils.isEmpty(filePath)){
  11.         return flag;
  12.     }
  13.    
  14.     try{
  15.         Path path = new Path(filePath);
  16.         // FileSystem对象
  17.         FileSystem fs = getFileSystem();
  18.         
  19.         if (create){
  20.             if (!fs.exists(path)){
  21.                 fs.mkdirs(path);
  22.             }
  23.         }
  24.         
  25.         if (fs.isDirectory(path)){
  26.             flag = true;
  27.         }
  28.     }catch (Exception e){
  29.         logger.error("", e);
  30.     }
  31.    
  32.     return flag;
  33. }
  34. 查看HDFS文件的最后修改时间  

  35. public void testgetModifyTime() throws Exception {  
  36.         Configuration conf = new Configuration();  
  37.         FileSystem hdfs = FileSystem.get(conf);  
  38.         Path dst = new Path(hdfsPath);  
  39.         FileStatus files[] = hdfs.listStatus(dst);  
  40. for (FileStatus file : files) {  
  41.             System.out.println(file.getPath() + "\t"  
  42.                     + file.getModificationTime());  
  43.             System.out.println(file.getPath() + "\t"  
  44.                     + new Date(file.getModificationTime()));  
  45.         }

  46.    

  47.    // 查看HDFS文件是否存在  
  48.   
  49.     public void testExists() throws Exception {  
  50.   
  51.         Configuration conf = new Configuration();  
  52.          
  53.         FileSystem hdfs = FileSystem.get(conf);  
  54.         Path dst = new Path(hdfsPath + "file01.txt");  
  55.   
  56.         boolean ok = hdfs.exists(dst);  
  57.         System.out.println(ok ? "文件存在" : "文件不存在");  
  58.     }  

  59.    

  60.     // 获取HDFS集群上所有节点名称  
  61.     public void testGetHostName() throws Exception {  
  62.   
  63.         Configuration conf = new Configuration();  
  64.          
  65.         DistributedFileSystem hdfs = (DistributedFileSystem) FileSystem  
  66.                 .get(conf);  
  67.         DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();  
  68.   
  69.         for (DatanodeInfo dataNode : dataNodeStats) {  
  70.             System.out.println(dataNode.getHostName() + "\t"  
  71.                     + dataNode.getName());  
  72.         }  
  73.     }
复制代码

(完)





上一篇:一文彻底明白linux中的selinux到底是什么
下一篇:Hadoop HDFS Shell 命令汇总
帖子永久地址: 

架构师_程序员 - 论坛版权1、本主题所有言论和图片纯属会员个人意见,与本论坛立场无关
2、本站所有主题由该帖子作者发表,该帖子作者与架构师_程序员享有帖子相关版权
3、其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和架构师_程序员的同意
4、帖子作者须承担一切因本文发表而直接或间接导致的民事或刑事法律责任
5、本帖部分内容转载自其它媒体,但并不代表本站赞同其观点和对其真实性负责
6、如本帖侵犯到任何版权问题,请立即告知本站,本站将及时予与删除并致以最深的歉意
7、架构师_程序员管理员和版主有权不事先通知发贴者而删除本文

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

免责声明:
码农网所发布的一切软件、编程资料或者文章仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。

Mail To:help@itsvse.com

QQ|Archiver|手机版|小黑屋|架构师 ( 鲁ICP备14021824号-2 )|网站地图

GMT+8, 2019-10-21 12:41

Powered by Discuz! X3.4

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表