From a29cf173611f4e5cff9a566525a0125e9526947f Mon Sep 17 00:00:00 2001 From: jie65535 Date: Sat, 20 Jun 2026 23:56:11 +0800 Subject: [PATCH] Release ByteReadChannel when first-chunk timeout fires The streaming refactor moved post()+body() inside withTimeout, so a first-chunk timeout threw before `channel` was bound and the finally guard never ran, leaking the connection on every slow-API retry. Hold the channel in an outer nullable var and wrap the whole flow in try/finally so an acquired channel is always cancelled. Co-Authored-By: Claude Opus 4.8 --- src/main/kotlin/ModelService.kt | 57 ++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/src/main/kotlin/ModelService.kt b/src/main/kotlin/ModelService.kt index 049577f..1ef8216 100644 --- a/src/main/kotlin/ModelService.kt +++ b/src/main/kotlin/ModelService.kt @@ -62,37 +62,42 @@ class ModelService( // 因此必须把 post() 连同首个 data 块的读取一起包进 withTimeout。 // 否则首 token 超时永远不会触发(post() 还没返回,根本进不到读取循环), // 只能落到 Ktor 的兜底超时(很久)后再重试,表现为「等很久才报异常」。 - val (channel, firstDataLine) = withTimeout(firstChunkTimeout) { - val response = httpClient.post("chat/completions") { - setBody(body) - contentType(ContentType.Application.Json) - accept(ContentType.Text.EventStream) - headers { - append(HttpHeaders.CacheControl, "no-cache") - append(HttpHeaders.Connection, "keep-alive") - } - } - val ch: ByteReadChannel = response.body() - var found: String? = null - while (currentCoroutineContext().isActive && !ch.isClosedForRead) { - val line = ch.readUTF8Line() ?: continue - if (line.startsWith("data: ")) { - found = line - break - } - // 心跳/空行/注释行,不计为首块,继续等 - } - ch to found - } - + // channel 在 withTimeout 外层持有:哪怕首块读取在 withTimeout 内超时, + // 只要 response.body() 已拿到通道,finally 也能释放它,避免慢速 API 重试时连接泄漏。 + var channel: ByteReadChannel? = null try { + val firstDataLine = withTimeout(firstChunkTimeout) { + val response = httpClient.post("chat/completions") { + setBody(body) + contentType(ContentType.Application.Json) + accept(ContentType.Text.EventStream) + headers { + append(HttpHeaders.CacheControl, "no-cache") + append(HttpHeaders.Connection, "keep-alive") + } + } + val ch: ByteReadChannel = response.body() + channel = ch + var found: String? = null + while (currentCoroutineContext().isActive && !ch.isClosedForRead) { + val line = ch.readUTF8Line() ?: continue + if (line.startsWith("data: ")) { + found = line + break + } + // 心跳/空行/注释行,不计为首块,继续等 + } + found + } + if (firstDataLine != null && !firstDataLine.startsWith("data: [DONE]")) { emit(json.decodeFromString(firstDataLine.removePrefix("data: "))) - while (currentCoroutineContext().isActive && !channel.isClosedForRead) { + val ch = channel!! + while (currentCoroutineContext().isActive && !ch.isClosedForRead) { // 流式期间同样对每次读取设「token 间隔」超时,避免中途卡死后干等兜底超时, // 从而能快速失败并交给上层重试。正常流式 token 间隔远小于 firstChunkTimeout。 - val line = withTimeout(firstChunkTimeout) { channel.readUTF8Line() } ?: continue + val line = withTimeout(firstChunkTimeout) { ch.readUTF8Line() } ?: continue when { line.startsWith("data: [DONE]") -> break line.startsWith("data: ") -> { @@ -103,7 +108,7 @@ class ModelService( } } } finally { - channel.cancel() + channel?.cancel() } } }