Hadoop HDFS Operations
Default NameNode web UI (file browser): http://localhost:9870/explorer.html#/
1. HDFS Command-Line Operations
1.1 Basic Commands
1.1.1 List directory contents
Lists the files and directories under the specified path.
hdfs dfs -ls /test
1.1.2 Create a directory
Creates a directory; the -p flag creates parent directories as needed (multi-level paths).
hdfs dfs -mkdir -p /test/user
1.1.3 Delete a file or directory
Deletes a file or directory; the -r flag deletes directories recursively (optional).
hdfs dfs -rm -r /test
1.1.4 Upload a file
Uploads a local file to HDFS.
hdfs dfs -put C:\Users\29699\Desktop\DeepSeek从入门到精通-清华.pdf /test/aaa.pdf
1.1.5 Download a file
- Description: Downloads an HDFS file to the local filesystem.
Windows:
hdfs dfs -get /test/aaa.pdf C:\Users\29699\Desktop\测试.pdf
Linux:
hdfs dfs -get /user/hadoop/data/file.txt /local/file.txt
1.1.6 View file contents
Displays the contents of an HDFS file.
hdfs dfs -cat /test/demo.txt
1.1.7 Append content to a file
- Command:
hdfs dfs -appendToFile <localsrc> <dst>
- Description: Appends the contents of a local file to the end of an HDFS file.
hdfs dfs -appendToFile C:\Users\29699\Desktop\append.txt /test/demo.txt
If the command fails with an error like:
appendToFile: Failed to APPEND_FILE /test/demo.txt for DFSClient_NONMAPREDUCE_1383585836_1 on 127.0.0.1 because this file lease is currently owned by DFSClient_NONMAPREDUCE_-1120973147_1 on 127.0.0.1
Cause
- Lease held by another client: when a file is being written, HDFS grants it a lease so that only one client can write at a time. If another client (here DFSClient_NONMAPREDUCE_-1120973147_1) still holds the lease, the new write fails.
- Lease not released:
- The previous client may not have closed its output stream correctly, so the lease was never released.
- Or the old lease is still within its validity period when the new client tries to append (the hard lease limit is one hour).
- DataNode failure: if a DataNode is unavailable, HDFS tries to replace the failed node, which can also cause lease conflicts.
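One common way to end up in this state is client code that appends without closing its stream. Below is a minimal sketch of a well-behaved append using the Hadoop FileSystem API; the NameNode address, user, and path are placeholder values, not taken from this article.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppendExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder address, user, and file; adjust for your cluster.
        try (FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "hadoop");
             FSDataOutputStream out = fs.append(new Path("/test/demo.txt"))) {
            out.write("appended line\n".getBytes("UTF-8"));
            out.hflush(); // push the data to the DataNode pipeline
        } // closing the stream (and the FileSystem) releases the lease
    }
}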
1.1.7.1 Manually recover the lease
hdfs debug recoverLease -path /test/demo.txt -retries 10
Error 2:
java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-8452ef95-afeb-4799-993b-564b8c1e18ca,DISK]], original=[DatanodeInfoWithStorage[127.0.0.1:9866,DS-8452ef95-afeb-4799-993b-564b8c1e18ca,DISK]]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
The message "no more good datanodes being available to try" means that the cluster does not have enough live DataNodes to satisfy the replication factor. For example, with a replication factor of 3 but only 2 DataNodes, the write fails because no additional node can be found.
Here the local cluster has only one DataNode, but the file had been uploaded with a replication factor of 3, which caused the error; files created after applying the fix below behave normally. A client-side workaround is sketched next.
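For a single-node setup, the client can also be told not to look for a replacement DataNode. A minimal sketch of such a client Configuration, assuming the property values below (the policy key comes straight from the error message above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SingleNodeClientConfig {
    // Client configuration suited to a cluster with a single DataNode.
    public static FileSystem openFileSystem() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000"); // placeholder address
        // Do not try to replace a failed DataNode -- there is no spare node available.
        conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
        // Request a single replica for data written by this client.
        conf.set("dfs.replication", "1");
        return FileSystem.get(conf);
    }
}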
1.1.7.2 Change the replication factor
Use hdfs dfs -setrep to change the replication factor of existing files or directories.
-R: recursively applies the change to the directory and all files under it.
hdfs dfs -setrep -R 1 /test
Change the default replication factor
Edit the configuration file:
Open hdfs-site.xml and change the dfs.replication value:
<property>
    <name>dfs.replication</name>
    <value>2</value> <!-- new replication factor -->
</property>
Effect: newly written data will use this replication factor; the replication of existing files is unchanged.
Restart the services:
Restart the NameNode and DataNode for the change to take effect:
service hadoop-hdfs-namenode restart
service hadoop-hdfs-datanode restart
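The replication factor can also be changed from client code instead of editing hdfs-site.xml. A minimal sketch using the public FileSystem API; the address, user, and path below are placeholder values:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReplicationExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("dfs.replication", "1"); // replication for files created by this client
        try (FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "hadoop")) {
            // Change the replication factor of a file that already exists.
            boolean ok = fs.setReplication(new Path("/test/aaa.pdf"), (short) 1);
            System.out.println("setReplication returned " + ok);
        }
    }
}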
1.1.8 Move a file
- Command:
hdfs dfs -mv <src> <dst>
- Description: Moves a file or directory within HDFS.
hdfs dfs -mv /test/demo.txt /test/user/demo.txt
1.1.9 Copy a file
Command:
hdfs dfs -cp <src> <dst>
Description: Copies a file or directory within HDFS.
hdfs dfs -cp /test/user/demo.txt /test/user/newDemo.txt
1.1.10 Show file sizes
- Command:
hdfs dfs -du [-s] <path>
- Description: Shows the size of each file under the specified path; the -s flag reports an aggregated total.
hdfs dfs -du -s /test
1.1.11 Count files and total size
- Command:
hdfs dfs -count <path>
- Description: Counts the number of directories, the number of files, and the total size under the specified path.
hdfs dfs -count /test
1.2 Administration Commands
1.2.1 View an HDFS status report
- Description: Shows the total capacity, remaining capacity, and information about each DataNode.
hdfs dfsadmin -report
1.2.2 Safe mode operations
- Command:
hdfs dfsadmin -safemode <enter | leave | get | wait>
- Description: Enters, leaves, queries, or waits for safe mode.
In safe mode the NameNode restricts filesystem operations: reads are allowed, but writes, deletions, and modifications are not.
hdfs dfsadmin -safemode enter
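Safe mode can also be queried or toggled programmatically through DistributedFileSystem. A minimal sketch, assuming a Hadoop 3.1.x client and the placeholder address and user below:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

public class SafeModeExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "hadoop")) {
            DistributedFileSystem dfs = (DistributedFileSystem) fs;
            // SAFEMODE_GET only queries the state; SAFEMODE_LEAVE would exit safe mode.
            boolean inSafeMode = dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
            System.out.println("NameNode in safe mode: " + inSafeMode);
        }
    }
}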
2. Operating HDFS from Spring Boot
1. Maven dependencies (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.0</version>
<relativePath />
</parent>
<groupId>org.example</groupId>
<artifactId>springboot-hadoop</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.16.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2. Externalized YAML configuration
@Data
@Component
@ConfigurationProperties(prefix = "hdfs")
public class HDFSProperties {
/**
* host
*/
private String host;
/**
* Base path for uploads
*/
private String uploadPath;
/**
* Username used for HDFS operations
*/
private String username;
}
hdfs:
host: hdfs://hadoop001:9000
upload-path: /user/hadoop
username: moshangshang
3. HDFS operations utility class
@Service
public class HDFSService {
@Autowired
private HDFSProperties hdfsProperties;
/**
* Get the HDFS client configuration
*/
private Configuration getHDFSConfiguration() {
Configuration configuration = new Configuration();
configuration.set("dfs.client.use.datanode.hostname", "true");
configuration.set("fs.defaultFS", hdfsProperties.getHost());
return configuration;
}
/**
* Get a FileSystem instance.
* The HDFS client operates under a user identity; by default the client API
* reads it from the JVM parameter -DHADOOP_USER_NAME=hadoop.
* It can also be passed explicitly when constructing the FileSystem, as done here.
*/
private FileSystem getFileSystem() throws Exception {
return FileSystem.get(new URI(hdfsProperties.getHost())
, getHDFSConfiguration(), hdfsProperties.getUsername());
}
/**
* Recursively create a directory
*/
public boolean mkdir(String path) throws Exception {
if (StringUtils.isEmpty(path)) {
return false;
}
if (existFile(path)) {
return true;
}
FileSystem fileSystem = getFileSystem();
Path srcPath = new Path(hdfsProperties.getUploadPath() + path);
boolean isOk = fileSystem.mkdirs(srcPath);
fileSystem.close();
return isOk;
}
/**
* Check whether a path exists in HDFS
*/
public boolean existFile(String path) throws Exception {
if (StringUtils.isEmpty(path)) {
return false;
}
FileSystem fileSystem = getFileSystem();
Path srcPath = new Path(path);
return fileSystem.exists(srcPath);
}
/**
* Delete an HDFS file
*/
public boolean deleteFile(String path) throws Exception {
if (StringUtils.isEmpty(path)) {
return false;
}
if (!existFile(path)) {
return false;
}
FileSystem fs = getFileSystem();
Path srcPath = new Path(path);
boolean isOk = fs.delete(srcPath, true); // delete immediately, recursing into directories
fs.close();
return isOk;
}
/**
* Upload a local file to HDFS (by path)
*/
public void uploadFile(String path, String uploadPath) throws Exception {
if (StringUtils.isEmpty(path) || StringUtils.isEmpty(uploadPath)) {
return;
}
FileSystem fs = getFileSystem();
// local source path
Path clientPath = new Path(path);
// destination path in HDFS
Path serverPath = new Path(uploadPath);
// copy the file; the first argument controls whether the local source is deleted (true = delete, default false)
fs.copyFromLocalFile(false, clientPath, serverPath);
fs.close();
}
/**
* Upload a file from a MultipartFile
*/
public void uploadFile(MultipartFile multipartFile, String path) throws Exception {
FileSystem fs = getFileSystem();
InputStream in = multipartFile.getInputStream();
// output stream to the target HDFS path
OutputStream out = fs.create(new Path(hdfsProperties.getUploadPath() + path));
// pipe the input stream into the output stream
IOUtils.copyBytes(in, out, 1024, true);
out.close();
in.close();
fs.close();
}
/**
* Read the contents of an HDFS file and print it to standard output
*/
public void readFile(String filePath) throws Exception {
FileSystem fs = getFileSystem();
Path path = new Path(hdfsProperties.getUploadPath() + filePath);
InputStream in = null;
try {
in = fs.open(path);
//copy to standard output
IOUtils.copyBytes(in, System.out, 4096, false);
System.out.println("\nFile read successfully!");
} catch (Exception e) {
System.out.println("\nFailed to read file!");
} finally {
IOUtils.closeStream(in);
}
}
/**
* Download an HDFS file to the local filesystem
*/
public void downloadFile(String path, String downloadPath) throws Exception {
if (StringUtils.isEmpty(path) || StringUtils.isEmpty(downloadPath)) {
return;
}
FileSystem fs = getFileSystem();
// source path in HDFS
Path clientPath = new Path(path);
// local destination path
Path serverPath = new Path(downloadPath);
// copy the file; the first argument controls whether the HDFS source is deleted (true = delete, default false)
fs.copyToLocalFile(false, clientPath, serverPath);
fs.close();
}
/**
* Append the contents of a local file to an HDFS file
*/
public void appendFile(String path, String appendPath) throws Exception {
if (StringUtils.isEmpty(path) || StringUtils.isEmpty(appendPath)) {
return;
}
FileSystem fs = getFileSystem();
Path filePath = new Path(hdfsProperties.getUploadPath() + path);
FileInputStream fis = new FileInputStream(appendPath);
// copy the local file's bytes into an append stream on the HDFS file
FSDataOutputStream out = fs.append(filePath);
IOUtils.copyBytes(fis, out, 1024, true); // closes both streams when finished
fs.close();
}
/**
* Create a file in HDFS and write content into it
*/
public void createFile(String filePath, byte[] files) {
try {
FileSystem fs = getFileSystem();
//target path
Path path = new Path(hdfsProperties.getUploadPath() + filePath);
//open an output stream
FSDataOutputStream outputStream = fs.create(path);
outputStream.write(files);
outputStream.close();
fs.close();
System.out.println("创建文件成功!");
} catch (Exception e) {
System.out.println("创建文件失败!");
}
}
/**
* Stream an HDFS file to an HTTP response (browser download)
*/
public void downloadFile(String downPath, String fileName, HttpServletResponse response) throws Exception {
FSDataInputStream fileinput = null;
OutputStream os = null;
FileSystem fs = getFileSystem();
try {
response.setContentType("multipart/form-data");
//设置编码格式
response.setCharacterEncoding("UTF-8");
//设置可以识别Html文件
response.setContentType("text/html");
response.setHeader("Content-Disposition", "attachment;filename=" + fileName);
fileinput = fs.open(new Path(hdfsProperties.getUploadPath() + downPath));
os = response.getOutputStream();
int b;
byte[] buffer = new byte[1024];
while ((b = fileinput.read(buffer)) != -1) {
// write to the response output stream
os.write(buffer, 0, b);
}
os.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(fileinput);
IOUtils.closeStream(os);
IOUtils.closeStream(fs);
}
}
/**
* List files under an HDFS path
*/
public List<Map<String, Object>> listFile(String filePath) throws Exception {
filePath = hdfsProperties.getUploadPath() + filePath;
FileSystem fs = getFileSystem();
List<Map<String, Object>> list = new ArrayList<>();
//recursively find all files
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path(filePath), true);
while (listFiles.hasNext()) {
Map<String, Object> map = new HashMap<>();
LocatedFileStatus next = listFiles.next();
String name = next.getPath().getName();
Path path = next.getPath();
map.put("fileName", name);
map.put("filePath", path.toUri());
list.add(map);
}
return list;
}
/**
* Rename a file
*/
public boolean renameFile(String oldName, String newName) throws Exception {
FileSystem fs = getFileSystem();
Path oldPath = new Path(hdfsProperties.getUploadPath() + oldName);
Path newPath = new Path(hdfsProperties.getUploadPath() + newName);
boolean isOk = fs.rename(oldPath, newPath);
fs.close();
return isOk;
}
/**
* Open an HDFS file and return its InputStream
*/
public InputStream readFileInput(String filePath) throws Exception {
FileSystem fs = getFileSystem();
Path path = new Path(hdfsProperties.getUploadPath() + filePath);
return fs.open(path);
}
/**
* Get the block locations of a file in the HDFS cluster
*/
public BlockLocation[] getFileBlockLocations(String path) throws Exception {
if (StringUtils.isEmpty(path)) {
return null;
}
if (!existFile(path)) {
return null;
}
FileSystem fs = getFileSystem();
// target path
Path srcPath = new Path(hdfsProperties.getUploadPath() + path);
FileStatus fileStatus = fs.getFileStatus(srcPath);
return fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
}
/**
* List detailed information for an HDFS directory
*/
public List<Map<String, Object>> pathInfo(String filePath) throws Exception {
FileSystem fs = getFileSystem();
FileStatus[] listStatus = fs.listStatus(new Path(hdfsProperties.getUploadPath() + filePath));
List<Map<String, Object>> list = new ArrayList<>();
SimpleDateFormat sd = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
for (FileStatus fileStatus : listStatus) {
Map<String, Object> map = new HashMap<>();
Date date = new Date(fileStatus.getModificationTime());
map.put("name", fileStatus.getPath().toUri().getPath().replace(filePath, ""));
map.put("directory", fileStatus.isDirectory());
map.put("time", sd.format(date));
list.add(map);
}
list.sort((o1, o2) -> {
Boolean directory1 = Boolean.parseBoolean(o1.get("directory").toString());
Boolean directory2 = Boolean.parseBoolean(o2.get("directory").toString());
return directory2.compareTo(directory1);
});
return list;
}
}
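For completeness, here is a hypothetical REST controller showing how the service above could be exposed; the request mappings and parameter names are illustrative and not part of the original code.

import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.multipart.MultipartFile;

@RestController
@RequestMapping("/hdfs")
public class HDFSController {
    @Autowired
    private HDFSService hdfsService;

    // Upload a MultipartFile under the configured upload path
    @PostMapping("/upload")
    public String upload(@RequestParam("file") MultipartFile file,
                         @RequestParam("path") String path) throws Exception {
        hdfsService.uploadFile(file, path);
        return "ok";
    }

    // List the files under a directory
    @GetMapping("/list")
    public List<Map<String, Object>> list(@RequestParam("path") String path) throws Exception {
        return hdfsService.listFile(path);
    }

    // Stream a file back through the HTTP response
    @GetMapping("/download")
    public void download(@RequestParam("path") String path,
                         @RequestParam("name") String name,
                         HttpServletResponse response) throws Exception {
        hdfsService.downloadFile(path, name, response);
    }
}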