Hadoop 002 — HDFS常用命令及SpringBoot整合操作

发布于:2025-06-14 ⋅ 阅读:(16) ⋅ 点赞:(0)

Hadoop-HDFS操作


客户端默认地址:http://localhost:9870/explorer.html#/

在这里插入图片描述

1.HDFS命令行操作

1.1 基本命令

1.1.1 查看目录内容

列出指定路径下的文件和目录信息。

hdfs dfs -ls /test

在这里插入图片描述

1.1.2 创建目录

``创建目录,-p参数用于创建多级目录。

hdfs dfs -mkdir -p /test/user
1.1.3 删除文件或目录

删除文件或目录,-r参数用于递归删除目录(可选)。

hdfs dfs -rm  -r  /test
1.1.4 上传文件

将本地文件上传到HDFS。

hdfs dfs -put C:\Users\29699\Desktop\DeepSeek从入门到精通-清华.pdf  /test/aaa.pdf

在这里插入图片描述

1.1.5 下载文件
  • 命令:
  • 说明: 将HDFS文件下载到本地。
 hdfs dfs -get  /test/aaa.pdf   C:\Users\29699\Desktop\测试.pdf

linux

hdfs dfs -get /user/hadoop/data/file.txt /local/file.txt
1.1.6 查看文件内容

显示HDFS文件内容

hdfs dfs -cat /test/demo.txt 
1.1.7 追加内容到文件
  • 命令: hdfs dfs -appendToFile <localsrc> <dst>
  • 说明: 将本地文件内容追加到HDFS文件末尾。
hdfs dfs -appendToFile  C:\Users\29699\Desktop\append.txt   /test/demo.txt 

如果报错

appendToFile: Failed to APPEND_FILE /test/demo.txt for DFSClient_NONMAPREDUCE_1383585836_1 on 127.0.0.1 because this file lease is currently owned by DFSClient_NONMAPREDUCE_-1120973147_1 on 127.0.0.1

问题原因

  1. 租约占用
    HDFS 文件在写入时会获取一个租约(lease),确保同一时间只有一个客户端可以写入。若文件当前被其他客户端(如 DFSClient_NONMAPREDUCE_-1120973147_1)持有租约,新写入操作会失败 。
  2. 租约未释放
    • 旧客户端可能未正确关闭文件流,导致租约未释放。
    • 或者新客户端尝试追加时,旧租约仍在有效期内(租约硬限制为1小时)。
  3. 数据节点异常
    若数据节点(Datanode)不可用,HDFS 会尝试替换失败节点,可能导致租约冲突 。
1.1.7.1手动释放租约
hdfs debug recoverLease -path /test/demo.txt -retries 10

在这里插入图片描述

错误2:

java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:
9866,DS-8452ef95-afeb-4799-993b-564b8c1e18ca,DISK]], original=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-8452ef95-afeb-4799-993b-564b8c1e18ca,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.

错误提示中提到“no more good datanodes being available to try”,表明当前集群中可用的DataNode数量不足以满足副本数(replication factor)要求。例如,若副本数设置为3,但集群中仅有2个DataNode,写入时会因无法找到可用节点而失败。

因为本地只有一个副本,但是上传时候副本设置的三,导致报错,下面新创建的就正常

在这里插入图片描述

1.1.7.2修改副本数量
  • -R:递归修改目录及其子目录的所有文件。
hdfs dfs -setrep -R 1 /test

修改默认副本数

  • 修改配置文件

    • 打开 hdfs-site.xml,修改 dfs.replication 参数值:

      <property>
        <name>dfs.replication</name>
        <value>2</value> <!-- 新副本数 -->
      </property>
      
    • 作用:新写入的数据将使用此副本数,但已存在的文件副本数不变。

  • 重启服务

    • 重启NameNode和Datanode以生效配置:

      service hadoop-hdfs-namenode restart
      service hadoop-hdfs-datanode restart
      
1.1.8 移动文件
  • 命令: hdfs dfs -mv <src> <dst>
  • 说明: 移动HDFS中的文件或目录。
hdfs dfs -mv /test/demo.txt  /test/user/demo.txt 
1.1.9 复制文件
  • 命令: hdfs dfs -cp <src> <dst>

  • 说明: 复制HDFS中的文件或目录。

    hdfs dfs -cp /test/user/demo.txt   /test/user/newDemo.txt
    
1.1.10 统计文件大小
  • 命令: hdfs dfs -du [-s] <path>
  • 说明: 显示指定路径下每个文件的大小,-s参数用于统计总大小。
hdfs dfs -du -s  /test

在这里插入图片描述

1.1.11 统计文件数量和大小
  • 命令: hdfs dfs -count <path>
  • 说明: 统计指定路径下的目录个数、文件个数和文件总计大小。
hdfs dfs -count  /test

在这里插入图片描述

1.2 管理命令

1.2.1 查看HDFS状态报告
  • 说明: 显示HDFS的总容量、剩余容量、DataNode的相关信息。
 hdfs dfsadmin -report

在这里插入图片描述

1.2.2 安全模式操作
  • 命令: hdfs dfsadmin -safemode <enter | leave | get | wait>
  • 说明: 进入、离开、获取或等待安全模式状态。

在安全模式下,NameNode 会限制对文件系统的操作,仅允许读取,不允许写入、删除或修改文件。

hdfs dfsadmin -safemode enter

在这里插入图片描述

2.SpringBoot操作HDFS

1.xml依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath />
    </parent>

    <groupId>org.example</groupId>
    <artifactId>springboot-hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.4</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.25</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.16.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>


        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.1.3</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2.抽取yml配置

@Data
@Component
@ConfigurationProperties(prefix = "hdfs")
public class HDFSProperties {

    /**
     * host
     */
    private String host;

    /**
     * 上传基础路径
     */
    private String uploadPath;


    /**
     * 操作的用户名
     */
    private String username;


}
hdfs:
  host: hdfs://hadoop001:9000
  upload-path: /user/hadoop
  username: moshangshang

3.Hdfs操作工具类

@Service
public class HDFSService {


    @Autowired
    private HDFSProperties hdfsProperties;


    /**
     * 获取HDFS配置信息
     */
    private Configuration getHDFSConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");
        configuration.set("fs.defaultFS", hdfsProperties.getHost());
        return configuration;
    }


    /**
     * 获取FileSystem对象
     * 客户端去操作hdfs时,是有一个用户身份的,
     * 默认情况下,hdfs客户端api会从jvm中获取一个参数来作为自己的用户身份:-DHADOOP_USER_NAME=hadoop
     * 也可以在构造客户端fs对象时,通过参数传递进去
     */
    private FileSystem getFileSystem() throws Exception {
        return FileSystem.get(new URI(hdfsProperties.getHost())
                , getHDFSConfiguration(), hdfsProperties.getUsername());
    }


    /**
     * 递归创建目录
     */
    public boolean mkdir(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (existFile(path)) {
            return true;
        }
        FileSystem fileSystem = getFileSystem();
        Path srcPath = new Path(hdfsProperties.getUploadPath() + path);
        boolean isOk = fileSystem.mkdirs(srcPath);
        fileSystem.close();
        return isOk;
    }


    /**
     * 判断HDFS文件是否存在
     */
    public boolean existFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        FileSystem fileSystem = getFileSystem();
        Path srcPath = new Path(path);
        return fileSystem.exists(srcPath);
    }

    /**
     * 删除HDFS文件
     */
    public boolean deleteFile(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return false;
        }
        if (!existFile(path)) {
            return false;
        }
        FileSystem fs = getFileSystem();
        Path srcPath = new Path(path);
        boolean isOk = fs.deleteOnExit(srcPath);
        fs.close();
        return isOk;
    }


    /**
     * 上传HDFS文件
     */
    public void uploadFile(String path, String uploadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // 上传路径
        Path clientPath = new Path(path);
        // 目标路径
        Path serverPath = new Path(uploadPath);
        // 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
        fs.copyFromLocalFile(false, clientPath, serverPath);
        fs.close();
    }


    /**
     * 上传文件
     */
    public void uploadFile(MultipartFile multipartFile, String path) throws Exception {
        FileSystem fs = getFileSystem();
        InputStream in = multipartFile.getInputStream();
        // 输出流
        OutputStream out = fs.create(new Path(hdfsProperties.getUploadPath() + path));
        // 连接两个流,形成通道,使输入流向输出流传输数据
        IOUtils.copyBytes(in, out, 1024, true);
        out.close();
        in.close();
        fs.close();
    }


    /**
     * 读取HDFS文件内容
     */
    public void readFile(String filePath) throws Exception {
        FileSystem fs = getFileSystem();
        Path path = new Path(hdfsProperties.getUploadPath() + filePath);
        InputStream in = null;
        try {
            in = fs.open(path);
            //复制到标准输出流
            IOUtils.copyBytes(in, System.out, 4096, false);
            System.out.println("\n读取文件成功!");
        } catch (Exception e) {
            System.out.println("\n读取文件失败!");
        } finally {
            IOUtils.closeStream(in);
        }
    }


    /**
     * 下载HDFS文件
     */
    public void downloadFile(String path, String downloadPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        // 上传路径
        Path clientPath = new Path(path);
        // 目标路径
        Path serverPath = new Path(downloadPath);

        // 调用文件系统的文件复制方法,第一个参数是否删除原文件true为删除,默认为false
        fs.copyToLocalFile(false, clientPath, serverPath);
        fs.close();
    }


    /**
     * 追加内容到文件
     */
    public void appendFile(String path, String appendPath) throws Exception {
        if (StringUtils.isEmpty(path) || StringUtils.isEmpty(appendPath)) {
            return;
        }
        FileSystem fs = getFileSystem();
        Path filePath = new Path(hdfsProperties.getUploadPath() + path);
        FileInputStream fis = new FileInputStream(appendPath);
        fs.append(filePath).writeBytes(fis.toString());
        IOUtils.closeStream(fis);
        fs.close();
    }


    /**
     * 在HDFS创建文件,并向文件填充内容
     */
    public void createFile(String filePath, byte[] files) {
        try {
            FileSystem fs = getFileSystem();
            //目标路径
            Path path = new Path(hdfsProperties.getUploadPath() + filePath);
            //打开一个输出流
            FSDataOutputStream outputStream = fs.create(path);
            outputStream.write(files);
            outputStream.close();
            fs.close();
            System.out.println("创建文件成功!");
        } catch (Exception e) {
            System.out.println("创建文件失败!");
        }
    }


    /**
     * 下载文件
     */
    public void downloadFile(String downPath, String fileName, HttpServletResponse response) throws Exception {
        FSDataInputStream fileinput = null;
        OutputStream os = null;
        FileSystem fs = getFileSystem();
        try {
            response.setContentType("multipart/form-data");
            //设置编码格式
            response.setCharacterEncoding("UTF-8");
            //设置可以识别Html文件
            response.setContentType("text/html");
            response.setHeader("Content-Disposition", "attachment;filename=" + fileName);
            fileinput = fs.open(new Path(hdfsProperties.getUploadPath() + downPath));
            os = response.getOutputStream();
            int b;
            byte[] buffer = new byte[1024];
            while ((b = fileinput.read(buffer)) != -1) {
                // 4.写到输出流(out)中
                os.write(buffer, 0, b);
            }
            os.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(fileinput);
            IOUtils.closeStream(os);
            IOUtils.closeStream(fs);
        }
    }


    /**
     * 读取HDFS文件列表
     */
    public List<Map<String, Object>> listFile(String filePath) throws Exception {
        filePath = hdfsProperties.getUploadPath() + filePath;
        FileSystem fs = getFileSystem();
        List<Map<String, Object>> list = new ArrayList<>();
        //递归找到所有的文件
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(filePath), true);
        while (listFiles.hasNext()) {
            Map<String, Object> map = new HashMap<>();
            LocatedFileStatus next = listFiles.next();
            String name = next.getPath().getName();
            Path path = next.getPath();
            map.put("fileName", name);
            map.put("filePath", path.toUri());
            list.add(map);
        }
        return list;
    }


    /**
     * 文件重命名
     */
    public boolean renameFile(String oldName, String newName) throws Exception {
        FileSystem fs = getFileSystem();
        Path oldPath = new Path(hdfsProperties.getUploadPath() + oldName);
        Path newPath = new Path(hdfsProperties.getUploadPath() + newName);
        boolean isOk = fs.rename(oldPath, newPath);
        fs.close();
        return isOk;
    }

    /**
     * 读取HDFS文件内容
     */
    public InputStream readFileInput(String filePath) throws Exception {
        FileSystem fs = getFileSystem();
        Path path = new Path(hdfsProperties.getUploadPath() + filePath);
        return fs.open(path);
    }


    /**
     * 获取某个文件在HDFS的集群位置
     */
    public BlockLocation[] getFileBlockLocations(String path) throws Exception {
        if (StringUtils.isEmpty(path)) {
            return null;
        }
        if (!existFile(path)) {
            return null;
        }
        FileSystem fs = getFileSystem();
        // 目标路径
        Path srcPath = new Path(hdfsProperties.getUploadPath() + path);
        FileStatus fileStatus = fs.getFileStatus(srcPath);
        return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
    }


    /**
     * 读取HDFS目录详细信息
     */
    public List<Map<String, Object>> pathInfo(String filePath) throws Exception {
        FileSystem fs = getFileSystem();
        FileStatus[] listStatus = fs.listStatus(new Path(hdfsProperties.getUploadPath() + filePath));
        List<Map<String, Object>> list = new ArrayList<>();
        SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (FileStatus fileStatus : listStatus) {
            Map<String, Object> map = new HashMap<>();
            Date date = new Date(fileStatus.getModificationTime());
            map.put("name", fileStatus.getPath().toUri().getPath().replace(filePath, ""));
            map.put("directory", fileStatus.isDirectory());
            map.put("time", sd.format(date));
            list.add(map);
        }
        list.sort((o1, o2) -> {
            Boolean directory1 = Boolean.parseBoolean(o1.get("directory").toString());
            Boolean directory2 = Boolean.parseBoolean(o2.get("directory").toString());
            return directory2.compareTo(directory1);
        });
        return list;
    }
}