Kotlin实现文件下载断点续传(RandomAccessFile全解析)

发布于:2025-06-18 ⋅ 阅读:(16) ⋅ 点赞:(0)

本文将深入探讨如何使用Kotlin和RandomAccessFile实现高效的断点续传功能,涵盖原理分析、完整代码实现、性能优化及工程实践要点。

一、断点续传核心原理

1.1 HTTP断点续传协议
Client Server GET /file (Range: bytes=500-) 206 Partial Content Content-Range: bytes 500-999/1500 200 OK (完整文件) alt [支持断点续传] [不支持] Client Server
1.2 RandomAccessFile核心优势
特性 传统FileInputStream RandomAccessFile
随机访问能力
大文件处理效率 ⭐⭐ ⭐⭐⭐⭐
内存占用
断点续传实现复杂度
文件修改能力

二、服务端完整实现(Kotlin + Spring Boot)

2.1 依赖配置
// build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.0")
}
2.2 控制器实现
@RestController
class DownloadController {

    @GetMapping("/download/{filename}")
    suspend fun downloadFile(
        @PathVariable filename: String,
        request: HttpServletRequest,
        response: HttpServletResponse
    ) {
        val file = File("/data/files/$filename").takeIf { it.exists() } 
            ?: throw FileNotFoundException("File not found")

        // 解析Range请求头
        val (start, end) = parseRangeHeader(request, file.length())

        // 设置HTTP响应头
        response.configureHeaders(file, start, end)

        // 使用RandomAccessFile进行文件传输
        transferFileContent(file, response, start, end)
    }

    private fun parseRangeHeader(
        request: HttpServletRequest, 
        fileLength: Long
    ): Pair<Long, Long> {
        val rangeHeader = request.getHeader("Range")?.takeIf { it.startsWith("bytes=") }
            ?: return 0L to fileLength - 1
        
        val range = rangeHeader.substring(6).split("-")
        val start = range[0].toLongOrNull() ?: 0L
        val end = range.getOrNull(1)?.toLongOrNull() ?: fileLength - 1
        
        return start to min(end, fileLength - 1)
    }

    private fun HttpServletResponse.configureHeaders(
        file: File, 
        start: Long, 
        end: Long
    ) {
        val fileLength = file.length()
        val contentLength = end - start + 1
        
        status = if (start > 0) HttpStatus.PARTIAL_CONTENT.value() else HttpStatus.OK.value()
        contentType = "application/octet-stream"
        setHeader("Accept-Ranges", "bytes")
        setHeader("Content-Disposition", "attachment; filename=\"${file.name}\"")
        setHeader("Content-Length", contentLength.toString())
        
        if (status == HttpStatus.PARTIAL_CONTENT.value()) {
            setHeader("Content-Range", "bytes $start-$end/$fileLength")
        }
    }

    private suspend fun transferFileContent(
        file: File, 
        response: HttpServletResponse,
        start: Long, 
        end: Long
    ) = withContext(Dispatchers.IO) {
        RandomAccessFile(file, "r").use { raf ->
            raf.seek(start)
            val output = response.outputStream
            val buffer = ByteArray(8192)
            var bytesRemaining = end - start + 1
            
            while (bytesRemaining > 0) {
                val readSize = min(bytesRemaining, buffer.size.toLong()).toInt()
                val bytesRead = raf.read(buffer, 0, readSize)
                if (bytesRead == -1) break
                
                output.write(buffer, 0, bytesRead)
                output.flush()
                bytesRemaining -= bytesRead
            }
        }
    }
}
2.3 关键代码解析

1. 文件指针定位

raf.seek(start) // 将文件指针移动到断点位置

2. 分块传输逻辑

while (bytesRemaining > 0) {
    val readSize = min(bytesRemaining, buffer.size.toLong()).toInt()
    val bytesRead = raf.read(buffer, 0, readSize)
    // ... 写入输出流
}

3. HTTP头处理

// 部分内容响应
setHeader("Content-Range", "bytes $start-$end/$fileLength")
status = HttpStatus.PARTIAL_CONTENT.value()

三、客户端完整实现(Kotlin)

3.1 文件下载器类
class ResumableDownloader(
    private val url: String,
    private val savePath: String,
    private val chunkSize: Int = 8192
) {
    private var downloadedBytes: Long = 0
    private val progressListeners = mutableListOf<(Long, Long) -> Unit>()

    fun addProgressListener(listener: (Long, Long) -> Unit) {
        progressListeners.add(listener)
    }

    suspend fun startDownload() = withContext(Dispatchers.IO) {
        val file = File(savePath)
        downloadedBytes = if (file.exists()) file.length() else 0L

        while (true) {
            try {
                val connection = URL(url).openConnection() as HttpURLConnection
                connection.setRequestProperty("Range", "bytes=$downloadedBytes-")
                
                if (connection.responseCode !in 200..299) {
                    if (connection.responseCode == 416) { // 范围请求错误
                        file.delete() // 删除无效文件
                        downloadedBytes = 0
                        continue
                    }
                    throw IOException("HTTP error: ${connection.responseCode}")
                }

                // 获取文件总大小
                val contentRange = connection.getHeaderField("Content-Range")
                val totalSize = contentRange?.split("/")?.last()?.toLongOrNull() 
                    ?: connection.contentLengthLong.takeIf { it > 0 } 
                    ?: -1

                // 执行下载
                downloadChunks(connection, file, totalSize)
                break
            } catch (e: SocketTimeoutException) {
                println("Timeout, retrying...")
            } catch (e: IOException) {
                if (e.message?.contains("reset") == true) {
                    println("Connection reset, retrying...")
                } else {
                    throw e
                }
            }
        }
    }

    private suspend fun downloadChunks(
        connection: HttpURLConnection,
        file: File,
        totalSize: Long
    ) {
        RandomAccessFile(file, "rw").use { raf ->
            raf.seek(downloadedBytes)
            val input = connection.inputStream
            val buffer = ByteArray(chunkSize)

            while (true) {
                val bytesRead = input.read(buffer)
                if (bytesRead == -1) break

                raf.write(buffer, 0, bytesRead)
                downloadedBytes += bytesRead

                // 更新进度
                if (totalSize > 0) {
                    progressListeners.forEach { it(downloadedBytes, totalSize) }
                }
            }
        }
    }
}
3.2 使用示例
fun main() = runBlocking {
    val downloader = ResumableDownloader(
        url = "https://example.com/large-file.zip",
        savePath = "downloads/large-file.zip"
    )
    
    downloader.addProgressListener { current, total ->
        val percent = (current.toDouble() / total * 100).toInt()
        println("Downloaded: $current/$total ($percent%)")
    }
    
    try {
        downloader.startDownload()
        println("Download completed successfully!")
    } catch (e: Exception) {
        println("Download failed: ${e.message}")
        println("Resume position: ${File("downloads/large-file.zip").length()} bytes")
    }
}

四、性能优化策略

4.1 内存映射文件加速
private fun transferWithMemoryMap(file: File, start: Long, end: Long, output: OutputStream) {
    RandomAccessFile(file, "r").use { raf ->
        val channel = raf.channel
        val buffer = channel.map(
            FileChannel.MapMode.READ_ONLY, 
            start, 
            end - start + 1
        )
        
        output.write(buffer.array(), buffer.arrayOffset(), buffer.remaining())
    }
}
4.2 零拷贝技术(Linux系统)
private fun transferZeroCopy(file: File, response: HttpServletResponse, start: Long, end: Long) {
    FileInputStream(file).use { fis ->
        val channel = fis.channel
        val outputChannel = Channels.newChannel(response.outputStream)
        
        var position = start
        val totalBytes = end - start + 1
        var remaining = totalBytes
        
        while (remaining > 0) {
            val transferred = channel.transferTo(position, remaining, outputChannel)
            position += transferred
            remaining -= transferred
        }
    }
}

五、工程实践要点

5.1 断点存储设计
// 断点信息数据类
data class DownloadState(
    val url: String,
    val filePath: String,
    val downloaded: Long,
    val totalSize: Long,
    val timestamp: Long = System.currentTimeMillis()
)

// 持久化存储
class DownloadStateRepository {
    private val states = ConcurrentHashMap<String, DownloadState>()
    
    fun saveState(key: String, state: DownloadState) {
        states[key] = state
        // 实际项目应持久化到数据库或文件
    }
    
    fun loadState(key: String): DownloadState? {
        return states[key]
    }
}
5.2 多线程下载实现
class MultiThreadDownloader(
    private val url: String,
    private val savePath: String,
    private val threadCount: Int = 4
) {
    suspend fun download() = coroutineScope {
        val totalSize = getFileSize()
        val chunkSize = totalSize / threadCount
        
        // 创建临时文件
        RandomAccessFile(savePath, "rw").use {
            it.setLength(totalSize) // 预分配空间
        }
        
        // 启动多个下载协程
        (0 until threadCount).map { threadId ->
            async(Dispatchers.IO) {
                val start = threadId * chunkSize
                val end = if (threadId == threadCount - 1) {
                    totalSize - 1
                } else {
                    (threadId + 1) * chunkSize - 1
                }
                
                downloadChunk(start, end)
            }
        }.awaitAll()
    }
    
    private suspend fun downloadChunk(start: Long, end: Long) {
        val connection = URL(url).openConnection() as HttpURLConnection
        connection.setRequestProperty("Range", "bytes=$start-$end")
        
        RandomAccessFile(savePath, "rw").use { raf ->
            raf.seek(start)
            connection.inputStream.use { input ->
                input.copyTo(raf.channel)
            }
        }
    }
}

六、完整解决方案对比

方案 实现复杂度 大文件支持 内存效率 适用场景
RandomAccessFile ⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ 通用文件传输
内存映射 ⭐⭐⭐ ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ 超大文件读取
NIO零拷贝 ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ 高性能服务器
多线程分块下载 ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐ 高速下载环境

七、总结与最佳实践

核心要点总结

  1. HTTP协议:正确处理Range请求头和Content-Range响应头
  2. 文件定位:使用RandomAccessFile.seek()实现精确跳转
  3. 分块传输:采用8-16KB缓冲区平衡内存与IO效率
  4. 错误恢复
    • 捕获ClientAbortException处理客户端中断
    • 实现自动重试机制(3次重试策略)
  5. 进度监控:实时回调下载进度用于UI更新

生产环境建议

// 1. 添加超时控制
connection.connectTimeout = 30_000
connection.readTimeout = 120_000

// 2. 限流保护
val maxSpeed = 1024 * 1024 // 1MB/s
val startTime = System.currentTimeMillis()
var bytesTransferred = 0L

while (/*...*/) {
    // ... 传输逻辑
    bytesTransferred += bytesRead
    
    // 限速控制
    val elapsed = System.currentTimeMillis() - startTime
    val expectedTime = bytesTransferred * 1000 / maxSpeed
    if (elapsed < expectedTime) {
        delay(expectedTime - elapsed)
    }
}

// 3. 文件校验
fun verifyFile(file: File, expectedHash: String): Boolean {
    val digest = MessageDigest.getInstance("SHA-256")
    file.forEachBlock { buffer, bytesRead ->
        digest.update(buffer, 0, bytesRead)
    }
    return digest.digest().joinToString("") { "%02x".format(it) } == expectedHash
}

网站公告

今日签到

点亮在社区的每一天
去签到