架构师_程序员

 找回密码
 注册[Register]

QQ登录

只需一步,快速开始

查看: 188|回复: 0

[资料] Hadoop- HDFS的API操作

[复制链接]
跳转到指定楼层
楼主
发表于 2019-7-5 16:33:58
1、引入依赖

  1. <dependency>
  2.     <groupId>org.apache.hadoop</groupId>
  3.     <artifactId>hadoop-client</artifactId>
  4.     <version>2.6.1</version>
  5. </dependency>
复制代码
注:如需手动引入jar包,hdfs的jar包----hadoop的安装目录的share下

2、window下开发的说明

建议在linux下进行hadoop应用的开发,不会存在兼容性问题。如在window上做客户端应用开发,需要设置以下环境:

A、在windows的某个目录下解压一个hadoop的安装包

B、将安装包下的lib和bin目录用对应windows版本平台编译的本地库替换

C、在window系统中配置HADOOP_HOME指向你解压的安装包

D、在windows系统的path变量中加入hadoop的bin目录


在java中操作hdfs,首先要获得一个客户端实例


  1. Configuration conf = new Configuration()

  2. FileSystem fs = FileSystem.get(conf)
复制代码


而我们的操作目标是HDFS,所以获取到的fs对象应该是DistributedFileSystem的实例;

get方法是从何处判断具体实例化那种客户端类呢?

——从conf中的一个参数 fs.defaultFS的配置值判断;

如果我们的代码中没有指定fs.defaultFS,并且工程classpath下也没有给定相应的配置,conf中的默认值就来自于hadoop的jar包中的core-default.xml,默认值为: file:///,则获取的将不是一个DistributedFileSystem的实例,而是一个本地文件系统的客户端对象;


文件的增删改查

  1. public class HdfsClient {

  2.     FileSystem fs = null;

  3.     @Before
  4.     public void init() throws Exception {

  5.         // 构造一个配置参数对象,设置一个参数:我们要访问的hdfs的URI
  6.         // 从而FileSystem.get()方法就知道应该是去构造一个访问hdfs文件系统的客户端,以及hdfs的访问地址
  7.         // new Configuration();的时候,它就会去加载jar包中的hdfs-default.xml
  8.         // 然后再加载classpath下的hdfs-site.xml
  9.         Configuration conf = new Configuration();
  10.         conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
  11.         /**
  12.          * 参数优先级: 1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、然后是服务器的默认配置
  13.          */
  14.         conf.set("dfs.replication", "3");

  15.         // 获取一个hdfs的访问客户端,根据参数,这个实例应该是DistributedFileSystem的实例
  16.         // fs = FileSystem.get(conf);

  17.         // 如果这样去获取,那conf里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份标识已经是hadoop用户
  18.         fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");

  19.     }

  20.     /**
  21.      * 往hdfs上传文件
  22.      *
  23.      * @throws Exception
  24.      */
  25.     @Test
  26.     public void testAddFileToHdfs() throws Exception {

  27.         // 要上传的文件所在的本地路径
  28.         Path src = new Path("g:/redis-recommend.zip");
  29.         // 要上传到hdfs的目标路径
  30.         Path dst = new Path("/aaa");
  31.         fs.copyFromLocalFile(src, dst);
  32.         fs.close();
  33.     }

  34.     /**
  35.      * 从hdfs中复制文件到本地文件系统
  36.      *
  37.      * @throws IOException
  38.      * @throws IllegalArgumentException
  39.      */
  40.     @Test
  41.     public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
  42.         fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
  43.         fs.close();
  44.     }

  45.     @Test
  46.     public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {

  47.         // 创建目录
  48.         fs.mkdirs(new Path("/a1/b1/c1"));

  49.         // 删除文件夹 ,如果是非空文件夹,参数2必须给值true
  50.         fs.delete(new Path("/aaa"), true);

  51.         // 重命名文件或文件夹
  52.         fs.rename(new Path("/a1"), new Path("/a2"));

  53.     }

  54.     /**
  55.      * 查看目录信息,只显示文件
  56.      *
  57.      * @throws IOException
  58.      * @throws IllegalArgumentException
  59.      * @throws FileNotFoundException
  60.      */
  61.     @Test
  62.     public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {

  63.         // 思考:为什么返回迭代器,而不是List之类的容器
  64.         RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

  65.         while (listFiles.hasNext()) {
  66.             LocatedFileStatus fileStatus = listFiles.next();
  67.             System.out.println(fileStatus.getPath().getName());
  68.             System.out.println(fileStatus.getBlockSize());
  69.             System.out.println(fileStatus.getPermission());
  70.             System.out.println(fileStatus.getLen());
  71.             BlockLocation[] blockLocations = fileStatus.getBlockLocations();
  72.             for (BlockLocation bl : blockLocations) {
  73.                 System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
  74.                 String[] hosts = bl.getHosts();
  75.                 for (String host : hosts) {
  76.                     System.out.println(host);
  77.                 }
  78.             }
  79.             System.out.println("--------------为angelababy打印的分割线--------------");
  80.         }
  81.     }

  82.     /**
  83.      * 查看文件及文件夹信息
  84.      *
  85.      * @throws IOException
  86.      * @throws IllegalArgumentException
  87.      * @throws FileNotFoundException
  88.      */
  89.     @Test
  90.     public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {

  91.         FileStatus[] listStatus = fs.listStatus(new Path("/"));

  92.         String flag = "d--             ";
  93.         for (FileStatus fstatus : listStatus) {
  94.             if (fstatus.isFile())  flag = "f--         ";
  95.             System.out.println(flag + fstatus.getPath().getName());
  96.         }
  97.     }
  98. }
复制代码
通过流的方式访问hdfs

  1. /**
  2. * 相对那些封装好的方法而言的更底层一些的操作方式
  3. * 上层那些mapreduce   spark等运算框架,去hdfs中获取数据的时候,就是调的这种底层的api
  4. * @author
  5. *
  6. */
  7. public class StreamAccess {
  8.    
  9.     FileSystem fs = null;

  10.     @Before
  11.     public void init() throws Exception {

  12.         Configuration conf = new Configuration();
  13.         fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");

  14.     }
  15.    
  16.    
  17.    
  18.     @Test
  19.     public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
  20.         
  21.         //先获取一个文件的输入流----针对hdfs上的
  22.         FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
  23.         
  24.         //再构造一个文件的输出流----针对本地的
  25.         FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
  26.         
  27.         //再将输入流中数据传输到输出流
  28.         IOUtils.copyBytes(in, out, 4096);
  29.         
  30.         
  31.     }
  32.    
  33.    
  34.     /**
  35.      * hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度
  36.      * 用于上层分布式运算框架并发处理数据
  37.      * @throws IllegalArgumentException
  38.      * @throws IOException
  39.      */
  40.     @Test
  41.     public void testRandomAccess() throws IllegalArgumentException, IOException{
  42.         //先获取一个文件的输入流----针对hdfs上的
  43.         FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
  44.         
  45.         
  46.         //可以将流的起始偏移量进行自定义
  47.         in.seek(22);
  48.         
  49.         //再构造一个文件的输出流----针对本地的
  50.         FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
  51.         
  52.         IOUtils.copyBytes(in,out,19L,true);
  53.         
  54.     }
  55.    
  56.    
  57.    
  58.     /**
  59.      * 显示hdfs上文件的内容
  60.      * @throws IOException
  61.      * @throws IllegalArgumentException
  62.      */
  63.     @Test
  64.     public void testCat() throws IllegalArgumentException, IOException{
  65.         
  66.         FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
  67.         
  68.         IOUtils.copyBytes(in, System.out, 1024);
  69.     }
  70. }


  71. 7.4.3 场景编程
  72. 在mapreduce 、spark等运算框架中,有一个核心思想就是将运算移往数据,或者说,就是要在并发计算中尽可能让运算本地化,这就需要获取数据所在位置的信息并进行相应范围读取
  73. 以下模拟实现:获取一个文件的所有block位置信息,然后读取指定block中的内容
  74.     @Test
  75.     public void testCat() throws IllegalArgumentException, IOException{
  76.         
  77.         FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10"));
  78.         //拿到文件信息
  79.         FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10"));
  80.         //获取这个文件的所有block的信息
  81.         BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen());
  82.         //第一个block的长度
  83.         long length = fileBlockLocations[0].getLength();
  84.         //第一个block的起始偏移量
  85.         long offset = fileBlockLocations[0].getOffset();
  86.         
  87.         System.out.println(length);
  88.         System.out.println(offset);
  89.         
  90.         //获取第一个block写入输出流
  91. //        IOUtils.copyBytes(in, System.out, (int)length);
  92.         byte[] b = new byte[4096];
  93.         
  94.         FileOutputStream os = new FileOutputStream(new File("d:/block0"));
  95.         while(in.read(offset, b, 0, 4096)!=-1){
  96.             os.write(b);
  97.             offset += 4096;
  98.             if(offset>=length) return;
  99.         };
  100.         os.flush();
  101.         os.close();
  102.         in.close();
  103.     }
复制代码






上一篇:Hadoop HDFS Shell 命令汇总
下一篇:C# Http请求 上传表单文件(可添加其他参数)
帖子永久地址: 

架构师_程序员 - 论坛版权1、本主题所有言论和图片纯属会员个人意见,与本论坛立场无关
2、本站所有主题由该帖子作者发表,该帖子作者与架构师_程序员享有帖子相关版权
3、其他单位或个人使用、转载或引用本文时必须同时征得该帖子作者和架构师_程序员的同意
4、帖子作者须承担一切因本文发表而直接或间接导致的民事或刑事法律责任
5、本帖部分内容转载自其它媒体,但并不代表本站赞同其观点和对其真实性负责
6、如本帖侵犯到任何版权问题,请立即告知本站,本站将及时予与删除并致以最深的歉意
7、架构师_程序员管理员和版主有权不事先通知发贴者而删除本文

码农网,只发表在实践过程中,遇到的技术难题,不误导他人。
您需要登录后才可以回帖 登录 | 注册[Register]

本版积分规则

免责声明:
码农网所发布的一切软件、编程资料或者文章仅限用于学习和研究目的;不得将上述内容用于商业或者非法用途,否则,一切后果请用户自负。本站信息来自网络,版权争议与本站无关。您必须在下载后的24个小时之内,从您的电脑中彻底删除上述内容。如果您喜欢该程序,请支持正版软件,购买注册,得到更好的正版服务。

Mail To:help@itsvse.com

QQ|Archiver|手机版|小黑屋|架构师 ( 鲁ICP备14021824号-2 )|网站地图

GMT+8, 2019-10-21 13:20

Powered by Discuz! X3.4

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表