深入HDFS——数据读取源码

发布于:2025-02-10 ⋅ 阅读:(27) ⋅ 点赞:(0)

引入

通过核心设计篇章的学习,我们知道数据上传设计的过程是很多的,这也是上一篇数据上传源码内容很多的原因。

今天我们就可以来个简单一些的放松放松,看看数据读取源码是如何实现的。

数据读取过程

还是老样子,实现一个简单的数据读取代码如下:

public class ReadDataFromHDFS {
    public static void main(String[] args) throws IOException, InterruptedException {
        Configuration conf = new Configuration();

        //创建FileSystem对象
        FileSystem fs = FileSystem.newInstance(conf);

        //创建HDFS文件路径
        Path path = new Path("/chaos.txt");
        FSDataInputStream in = fs.open(path);

        //读取HDFS中数据
        BufferedReader br = new BufferedReader(new InputStreamReader(in));
        String newLine = "";
        while((newLine = br.readLine()) != null) {
            System.out.println(newLine);
        }

        //关闭流对象
        br.close();
        in.close();
    }
}

整个数据读取源码流程很简单,客户端从HDFS中获取文件数据主要就以下步骤:

  • 首先会向NameNode获取文件相关的block信息;
  • 然后连接各个Datanode;
  • 最后以流方式读取数据就可以了。

下面我们深入源码看看,是如何实现的。

1.连接NameNode获取block信息

HDFS读取数据代码中,创建DFSClient的相关实现源码就不多说了,和上传数据的代码是一样的。

当代码执行到fs.open()时,其中fs肯定就是我们上一篇重点提到的DistributedFileSystem对象,所以open方法最终调用到DistributedFileSystem.open方法,其源码如下:

@Override //客户端读取HDFS中数据
public FSDataInputStream open(Path f, final int bufferSize)
    throws IOException {
    ... ...
    return new FileSystemLinkResolver<FSDataInputStream>() {
    @Override
    public FSDataInputStream doCall(final Path p) throws IOException {
        //执行dfs.open 方法,返回DFSInputStream对象
        final DFSInputStream dfsis =
            dfs.open(getPathName(p), bufferSize, verifyChecksum);
        try {
        return dfs.createWrappedInputStream(dfsis);
        } catch (IOException ex){
        dfsis.close();
        throw ex;
        }
    }
    @Override
    public FSDataInputStream next(final FileSystem fs, final Path p)
        throws IOException {
        return fs.open(p, bufferSize);
    }
    }.resolve(this, absF);
    ... ...
}

以上代码中,dfs.open()方法最终返回读取数据对象DFSInputStream。

dfs.open()源码如下:

public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
    throws IOException {
  checkOpen();
  //    Get block info from namenode
  try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) {
    //从NameNode节点上获取Block信息
    LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
    //连接DataNode节点读取数据
    return openInternal(locatedBlocks, src, verifyChecksum);
  }
}

其中getLocatedBlocks(src, 0)方法会连接NameNode节点上获取读取文件相关Block信息,openInternal()方法中将获取过来的LocatedBlocks对象(该对象中属性List blocks表示读取文件的所有blockInfo信息)封装到DFSInputStream对象中并返回。

getLocatedBlocks源码如下:

public LocatedBlocks getLocatedBlocks(String src, long start)
    throws IOException {
  //从NameNode 获取文件block信息
  return getLocatedBlocks(src, start, dfsClientConf.getPrefetchSize());
}

getLocatedBlocks()源码如下:

@VisibleForTesting
public LocatedBlocks getLocatedBlocks(String src, long start, long length)
    throws IOException {
  try (TraceScope ignored = newPathTraceScope("getBlockLocations", src)) {
    //从NameNode 获取文件block信息
    return callGetBlockLocations(namenode, src, start, length);
  }
}

以上代码中callGetBlockLocations() 传入NameNode Rpc Proxy 对象,后续通过该对象连接NameNode。

其源码如下:

static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
    String src, long start, long length)
    throws IOException {
  try {
    //从NameNode 获取文件block信息
    return namenode.getBlockLocations(src, start, length);
  } catch(RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
        FileNotFoundException.class,
        UnresolvedPathException.class);
  }
}

namenode.getBlockLocations(src, start, length)最终调用到NameNodeRpcServer.getBlockLocations()方法。

回到DFSClient.open()方法中,最终通过执行return openInternal(locatedBlocks, src, verifyChecksum) 返回DFSInputStream对象。

2.客户端通过socket连接DataNode

当从HDFS中通过流读取数据时,会将fs.open()方法返回的DFSInputStream对象经过一层层包装形成BufferedReader,该对象读取数据时最终调用到DFSInputStream.read()方法。

其源码如下:

@Override
public synchronized int read() throws IOException {
  if (oneByteBuf == null) {
    oneByteBuf = new byte[1];
  }
  //读取数据
  int ret = read(oneByteBuf, 0, 1);
  return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
}

其中Read方法实现源码如下:

@Override
public synchronized int read(@Nonnull final byte buf[], int off, int len)
    throws IOException {
  validatePositionedReadArgs(pos, buf, off, len);
  if (len == 0) {
    return 0;
  }
  ReaderStrategy byteArrayReader =
      new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient);
  //传入的是 ByteArrayStrategy
  return readWithStrategy(byteArrayReader);
}

上面代码中,在最后返回的return readWithStrategy(byteArrayReader)代码中传入了ByteArrayStrategy对象,该对象表示客户端以ByteArray方式从HDFS DataNode中读取数据。

readWithStrategy()方法实现源码如下:

protected synchronized int readWithStrategy(ReaderStrategy strategy)
    throws IOException {
    ... ...
    // 如果当前位置小于文件长度
    if (pos < getFileLength()) {
    if (pos > blockEnd || currentNode == null) {
    // 根据LocateBlocks列表生成BlockReader对象,在BlockReader中实现连接DataNode节点并准备接受数据的流对象。
    // 返回的currentNode 为block所在期望的DataNode
    currentNode = blockSeekTo(pos);
    }
    ... ...
    //从socket流中读取数据,使用到blockSeekTo 方法中从DataNode 获取数据的 in 流对象
    int result = readBuffer(strategy, realLen, corruptedBlocks);
    ... ...
    //返回读取的结果
    return result;
}

以上代码中核心代码为 blockSeekTo(),该方法会根据LocateBlocks列表生成BlockReader对象,在BlockReader中实现连接DataNode节点并准备接受数据的流对象,返回的currentNode 为block所在期望的DataNode。

最终在readBuffer(strategy, realLen, corruptedBlocks)代码中会使用到接收DataNode返回数据的流对象,将数据从DataNode节点接受过来。

下面我们先看blockSeekTo()源码实现,看看客户端如何连接上DataNode节点建立读取数据的连接对象。

blockSeekTo()源码如下:

private synchronized DatanodeInfo blockSeekTo(long target)
    throws IOException {
    ... ...
    //选择block所在DataNode的第一个节点信息
    chosenNode = retval.info;
    //选择block所在DataNode的第一个节点 addr
    InetSocketAddress targetAddr = retval.addr;
    ... ...
    //getBlockReader方法中与对应的DataNode节点进行连接并准备好接受从DataNode 返回的数据流的对象
    blockReader = getBlockReader(targetBlock, offsetIntoBlock,
        targetBlock.getBlockSize() - offsetIntoBlock, targetAddr,
        storageType, chosenNode);
    ... ...
    return chosenNode;
}

getBlockReader()的实现源码如下:

protected BlockReader getBlockReader(LocatedBlock targetBlock,
    long offsetInBlock, long length, InetSocketAddress targetAddr,
    StorageType storageType, DatanodeInfo datanode) throws IOException {
  ... ...
  // 最后执行build方法时,与DataNode节点建立连接,并准备接受数据的流对象
  return new BlockReaderFactory(dfsClient.getConf()).
      setInetSocketAddress(targetAddr).
      setRemotePeerFactory(dfsClient).
      setDatanodeInfo(datanode).
      setStorageType(storageType).
      setFileName(src).
      setBlock(blk).
      setBlockToken(accessToken).
      setStartOffset(offsetInBlock).
      setVerifyChecksum(verifyChecksum).
      setClientName(dfsClient.clientName).
      setLength(length).
      setCachingStrategy(curCachingStrategy).
      setAllowShortCircuitLocalReads(!shortCircuitForbidden).
      setClientCacheContext(dfsClient.getClientContext()).
      setUserGroupInformation(dfsClient.ugi).
      setConfiguration(dfsClient.getConfiguration()).
      build();
}

代码最后指定build()方法中会执行getRemoteBlockReaderFromDomain()方法与DataNode节点建立连接,并准备接受数据的流对象。

build()方法源码如下:

public BlockReader build() throws IOException {
    ... ...
    //与DataNode节点建立连接,并准备接受数据的流对象
    reader = getRemoteBlockReaderFromDomain();
    ... ...
}

getRemoteBlockReaderFromDomain()方法源码如下:

private BlockReader getRemoteBlockReaderFromDomain() throws IOException {
    ... ...
    //与DataNode节点建立连接,并准备接受数据的流对象
    blockReader = getRemoteBlockReader(peer);
    ... ...
}

最终getRemoteBlockReader()方法执行到BlockReaderRemote.newBlockReader方法中。

其源码如下:

public static BlockReader newBlockReader(String file,
    ExtendedBlock block,
    Token<BlockTokenIdentifier> blockToken,
    long startOffset, long len,
    boolean verifyChecksum,
    String clientName,
    Peer peer, DatanodeID datanodeID,
    PeerCache peerCache,
    CachingStrategy cachingStrategy,
    int networkDistance, Configuration configuration) throws IOException {
    ... ...
    //准备发送到DataNode的输出流
    final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
        peer.getOutputStream(), bufferSize));
    //从DataNode节点上读取block数据,这里的readBlock方法中会通过send(...)方法将读取DN Block数据发送到DN节点上
    new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
        verifyChecksum, cachingStrategy);
    // peer.getInputStream() 为DataNode 端返回的数据流 ,相当于客户端的输入流,后续redaBuffer中会通过该in 对象进行数据读取
    DataInputStream in = new DataInputStream(peer.getInputStream());
    ... ...
}

上面源码中new Sender(out).readBlock()readBlock()方法中会通过send(out, Op.READ_BLOCK, proto) 方法,将读取Block的信息发送到DataNode节点。

readBlock()实现核心源码如下:

public void readBlock(final ExtendedBlock blk,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final long blockOffset,
    final long length,
    final boolean sendChecksum,
    final CachingStrategy cachingStrategy) throws IOException {
    ... ...
    //send 方法会发送给DataNode中的DataXceiver服务中,DataXceiver服务一直运行,
      send(out, Op.READ_BLOCK, proto);
}

在启动DataNode后,DataNode节点上一直启动DataXceiver服务,该服务会一直接受客户端与DataNode的通信。

最终,通过DataInputStream in = new DataInputStream(peer.getInputStream())来接收DataNode节点返回Block的数据流,后续会通过该in对象从DataNode中读取block数据。

3.DataNode返回block数据流

接着找到DataNode节点的DataXceiver服务的run方法。

其源码如下:

public void run() {
    ... ...
    // 初始化操作对象
    Op op = null;
    ... ...
    // 初始化输入流
    InputStream input = socketIn;
    ... ...
    //读取客户端传入的数据给输入流赋值
    input = new BufferedInputStream(saslStreams.in,
        smallBufferSize);
    ... ...
    //回复到客户端的socket流
    socketOut = saslStreams.out;
    ... ...

    // 初始化DataXceiver的输入流 ,就是将 input 流赋值给了Receiver 中的 in 属性,后续使用
    super.initialize(new DataInputStream(input));
    ... ...
    //读取输入数据
    op = readOp();
    ... ...
    //处理读取过来的数据流
    processOp(op);
    ... ...
}

以上run方法为DataXceiver服务一直接受客户端发送过来的请求,当客户端执行send(out, Op.READ_BLOCK, proto)该服务接受到请求后,会执行processOp(op)方法。

其源码如下:

protected final void processOp(Op op) throws IOException {
    ... ...
    case READ_BLOCK:
    //读取Block数据
    opReadBlock();
    break;
    ... ...
}

所以最终执行到opReadBlock方法。

其源码如下:

private void opReadBlock() throws IOException {
    ... ...
    //读取Block数据
    readBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
    PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
    proto.getHeader().getClientName(),
    proto.getOffset(),
    proto.getLen(),
    proto.getSendChecksums(),
    (proto.hasCachingStrategy() ?
        getCachingStrategy(proto.getCachingStrategy()) :
        CachingStrategy.newDefaultStrategy()));
    ... ...
}

上面代码中readBlock最终调用到DataXceiver.readBlock方法。

其源码如下:

public void readBlock(final ExtendedBlock block,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final long blockOffset,
    final long length,
    final boolean sendChecksum,
    final CachingStrategy cachingStrategy) throws IOException {
    ... ...
    //将block数据以packet方法发送给客户端
    read = blockSender.sendBlock(out, baseStream, null); // send data
    ... ...
}

sendBlock()源码如下:

public void readBlock(final ExtendedBlock block,
    final Token<BlockTokenIdentifier> blockToken,
    final String clientName,
    final long blockOffset,
    final long length,
    final boolean sendChecksum,
    final CachingStrategy cachingStrategy) throws IOException {
    ... ...
    //将block数据以packet方法发送给客户端
    read = blockSender.sendBlock(out, baseStream, null); // send data
    ... ...
}

doSendBlock()源码如下:

private long doSendBlock(DataOutputStream out, OutputStream baseStream,
      DataTransferThrottler throttler) throws IOException {
    ... ...
    while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
    manageOsCache();
    //循环发送packet数据到客户端
    long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
        transferTo, throttler);
    offset += len;
    totalRead += len + (numberOfChunks(len) * checksumSize);
    seqno++;
    }
    ... ...
}

上面代码中sendPacket()方法会找到packet 数据发送到客户端。

sendPacket()主要源码如下:

private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
                       boolean transferTo, DataTransferThrottler throttler) throws IOException {
    ... ...
    //sockOut 对象为客户端socket对象,这里从磁盘读取数据返回给客户端
    fileIoProvider.transferToSocketFully(
            ris.getVolumeRef().getVolume(), sockOut, fileCh, blockInPosition,
            dataLen, waitTime, transferTime);
    ... ...
}

代码中的fileIoProvider.transferToSocketFully()就是从磁盘读取数据通过socket返回到客户端。

回到BlockReaderRemote.newBlockReader()方法中,在该方法中最终通过DataInputStream in = new DataInputStream(peer.getInputStream())代码接收DataNode返回数据的流对象。

继续回到DFSInputStream.readWithStrategy()方法中。

其源码如下:

protected synchronized int readWithStrategy(ReaderStrategy strategy)
    throws IOException {
    ... ...
    // 如果当前位置小于文件长度
    if (pos < getFileLength()) {
    if (pos > blockEnd || currentNode == null) {
    // 根据LocateBlocks列表生成BlockReader对象,在BlockReader中实现连接DataNode节点并准备接受数据的流对象。
    // 返回的currentNode 为block所在期望的DataNode
    currentNode = blockSeekTo(pos);
    }
    ... ...
    //从socket流中读取数据,使用到blockSeekTo 方法中从DataNode 获取数据的 in 流对象
    int result = readBuffer(strategy, realLen, corruptedBlocks);
    ... ...
    //返回读取的结果
    return result;
}

最终通过readBuffer()从DataNode中获取文件数据。

总结

今天深入了下数据读取的源码,可以看到,哪怕是这么简单的操作,底层实现还是挺复杂的。

细心的小伙伴或许会发现本篇与前面的源码有些不一样——关于不同大版本的实现区别,这并不是代码数据读取过程没有2.x与3.x的区别。

我只是本文以3.x版本为主,感兴趣的小伙伴可以深入对比一下与2.x版本的区别。