Kaze
Kaze
Published on 2023-10-20 / 16 Visits
0
0

Hadoop-HDFS

概念

HDFS(Hadoop Distributed File System),Hadoop分布式文件系统

HDFS的使用场景

适合一次写入,多次读出的场景。

HDFS的优点

  • 高容错性

  • 适合处理大数据

  • 可构建在廉价机器上,通过多副本机制,提高可靠性

HDFS的缺点

  • 不适合低延时数据访问
  • 无法高效的对大量小文件进行存储
  • 不支持并发写入、文件随机修改,只能追加写入

HDFS组成

image-20231012144340274 image-20231012144544084

HDFS文件块大小

image-20231012144711320 image-20231012144947666

HDFS的Shell操作

基本语法:

hadoop fs 具体命令 

OR

hdfs dfs 具体命令

两个是完全相同的

上传

-moveFromLocal:从本地剪切粘贴到HDFS

hadoop fs -moveFromLocal ./shuguo.txt /sanguo

-copyFromLocal:从本地文件系统中拷贝文件到HDFS路径去

hadoop fs -copyFromLocal weiguo.txt /sanguo

-put:等同于copyFromLocal,生产环境更习惯用put

hadoop fs -put ./wuguo.txt /sanguo

-appendToFile:追加一个文件到已经存在的文件末尾

hadoop fs -appendToFile liubei.txt /sanguo/shuguo.txt

下载

-copyToLocal:从HDFS拷贝到本地

hadoop fs -copyToLocal /sanguo/shuguo.txt ./

-get:等同于copyToLocal,生产环境更习惯用get

hadoop fs -get /sanguo/shuguo.txt ./shuguo2.txt

直接操作

-ls: 显示目录信息

hadoop fs -ls /sanguo

-cat:显示文件内容

hadoop fs -cat /sanguo/shuguo.txt

-chgrp、-chmod、-chown:Linux文件系统中的用法一样,修改文件所属权限

hadoop fs -chmod 666 /sanguo/shuguo.txt
hadoop fs -chown kaze:kaze  /sanguo/shuguo.txt

-mkdir:创建路径

hadoop fs -mkdir /jinguo

-cp:从HDFS的一个路径拷贝到HDFS的另一个路径

hadoop fs -cp /sanguo/shuguo.txt /jinguo

-mv:在HDFS目录中移动文件

hadoop fs -mv /sanguo/wuguo.txt /jinguo

-tail:显示一个文件的末尾1kb的数据

hadoop fs -tail /jinguo/shuguo.txt

-rm:删除文件或文件夹

hadoop fs -rm /sanguo/shuguo.txt

-rm -r:递归删除目录及目录里面内容

hadoop fs -rm -r /sanguo

-du:统计文件夹的大小信息

hadoop fs -du -s -h /jinguo // 统计整个文件夹的大小信息
hadoop fs -du -h /jinguo // 统计文件夹中各个文件的大小信息

-setrep:设置HDFS中文件的副本数量

hadoop fs -setrep 10 /jinguo/shuguo.txt

这里设置的副本数只是记录在NameNode的元数据中,是否真的会有这么多副本,还得看DataNode的数量。如果只有3台设备,最多也就3个副本,只有节点数增加到10台时,副本数才能达到10。

HDFS的API操作

Windows环境下需要先安装hadoop的winutils,配置完环境变量后,双击winutils.exe即可

创建Maven项目并引入hadoop依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
</dependency>

在项目的src/main/resources目录下,新建一个文件,命名为log4j.properties,在文件中填入

log4j.rootLogger=INFO, stdout  
log4j.appender.stdout=org.apache.log4j.ConsoleAppender  
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout  
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n  
log4j.appender.logfile=org.apache.log4j.FileAppender  
log4j.appender.logfile.File=target/spring.log  
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout  
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

示例程序(SpringBoot)

/**
 * @author kaze
 * @date 2023/10/11 17:22
 */
@Component
public class HdfsClient {
    private FileSystem fs;
    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsClient.class);


    /**
     * 初始化
     */
    @PostConstruct
    public void init() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        this.fs = FileSystem.get(new URI("hdfs://hadoop102:8020"), configuration,"kaze");
        LOGGER.info("获取文件系统成功");
    }

    /**
     * 关闭资源
     */
    @PreDestroy
    public void close() throws IOException {
        fs.close();
        LOGGER.info("关闭资源成功");
    }

    /**
     * 创建目录
     * @param path 文件夹路径
     */
    public void mkdir(String path) throws IOException {
        fs.mkdirs(new Path(path));
    }

    /**
     * 上传文件
     * @param src 源路径
     * @param dst 目标路径
     */
    public void put(String src, String dst) throws IOException {
        fs.copyFromLocalFile(new Path(src), new Path(dst));
    }

    /**
     * 下载文件
     * @param src 源路径
     * @param dst 目标路径
     */
    public void get(String src, String dst) throws IOException {
        fs.copyToLocalFile(new Path(src), new Path(dst));
    }

    /**
     * 文件重命名、移动
     * @param oldName 原名称
     * @param newName 新名称
     */
    public void rename(String oldName, String newName) throws IOException {
        fs.rename(new Path(oldName), new Path(newName));
    }

    /**
     * 删除文件或文件夹
     * @param path 文件或文件夹路径
     */
    public void delete(String path) throws IOException {
        // 第二个参数表示是否递归删除
        fs.delete(new Path(path), true);
    }

    /**
     * 获取文件详情信息
     * @param path 文件路径
     */
    public void fileDetail(String path) throws IOException {
        // 第二个参数表示是否递归,若为true则遍历这个目录及子目录下所有文件,若为false则只遍历当前目录下的文件
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(path), false);
        while (listFiles.hasNext()) {
            LocatedFileStatus fileStatus = listFiles.next();

            System.out.println("========" + fileStatus.getPath() + "=========");
            System.out.println(fileStatus.getPermission());
            System.out.println(fileStatus.getOwner());
            System.out.println(fileStatus.getGroup());
            System.out.println(fileStatus.getLen());
            System.out.println(fileStatus.getModificationTime());
            System.out.println(fileStatus.getReplication());
            System.out.println(fileStatus.getBlockSize());
            System.out.println(fileStatus.getPath().getName());
            // 获取块信息
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            System.out.println(Arrays.toString(blockLocations));
        }
    }

    /**
     * 判断是文件还是文件夹
     * @param path 路径
     */
    public void listStatus(String path) throws IOException {
        // 只能遍历当前目录的文件和文件夹
        FileStatus[] listStatus = fs.listStatus(new Path(path));
        for (FileStatus status : listStatus) {
            if (status.isFile()) {
                System.out.println("文件:" + status.getPath().getName());
            } else {
                System.out.println("文件夹:" + status.getPath().getName());
            }
        }
    }
}

HDFS的配置优先级

客户端代码中设置的值 > ClassPath下的用户自定义配置文件 > 然后是服务器的自定义配置(xxx-site.xml) > 服务器的默认配置(xxx-default.xml)

HDFS读写流程

HDFS写数据流程

image-20231012152856911
流程剖析
  1. 客户端通过Distributed File System模块NameNode请求上传文件,NameNode检查是否有写入权限,目标文件是否已存在,父目录是否存在。

  2. NameNode返回是否可以上传。

  3. 客户端请求第一个 Block上传到哪几个DataNode服务器上。

  4. NameNode返回3个DataNode节点,分别为dn1、dn2、dn3。

    返回哪些节点跟设置的副本策略有关系,设置几个,就返回几个DN,并且会根据机架感知,返回距离待上传数据最近的DataNode接收数据

  5. 客户端通过FSDataOutputStream模块请求dn1上传数据,dn1收到请求会继续调用dn2,然后dn2调用dn3,将这个 pipeline 通信管道建立完成。

  6. dn3应答->dn2应答->dn1应答(逐级应答,dn1会包含dn2和dn3的应答结果)。

  7. 客户端开始往dn1上传第一个Block(先从磁盘读取数据放到一个本地内存缓存),以Packet为单位(64kb)(由多个chunk 512byte和checksum 4byte 组成),dn1收到一个Packet就会传给dn2,dn2传给dn3;

    dn1每传一个packet会放入一个应答队列等待应答。

    ack队列data队列,每传输一个packet都会从data队列将该packet移到ack队列中,等待客户端收到ack响应,就会把ack队列的packet移除,此时packet传输成功,若传输失败,会将packet从ack队列移到data队列,然后重新发送。

    如果传输的过程中某个节点宕机了(dn2),那么会将dn2移除pipeline通信管道,只传输dn1和dn3,就相当于只有2个同步副本,当数据传输完毕后,dn1和dn3会主动向NameNode上报自己的block信息,当NameNode发现该文件只有2个block副本的时候,NameNode会再找一个节点,叫dn1或dn3把block复制一份到该节点。若3个节点都宕机,pipeline通道不可能建立,那么此时写操作将失败。

  8. 当一个Block传输完成之后,客户端再次请求NameNode上传第二个Block的服务器。(重复执行3-7步)。

节点距离计算

节点距离:两个节点到达最近的共同祖先的距离总和。

机架感知
image-20231012161323282

HDFS读数据流程

image-20231012161554845
流程剖析
  1. 客户端通过Distributed File System向NameNode请求下载文件,NameNode通过查询元数据,找到文件块所在的DataNode地址。

  2. 挑选一台DataNode(就近原则,然后随机)服务器,请求读取数据。

  3. DataNode开始传输数据给客户端(从磁盘里面读取数据输入流,以Packet为单位来做校验)。

  4. 客户端以Packet为单位接收,先在本地缓存,然后写入目标文件。

注意:

  1. 同一个文件的几个block可以在不同的DataNode上。

  2. 读文件只能按block的顺序读,并且只能串行读block,不能并行。

NameNode和SecondaryNameNode

image-20231012162736150 image-20231012163202022

DataNode

image-20231012163740048

Comment