Release ByteReadChannel when first-chunk timeout fires

The streaming refactor moved post()+body() inside withTimeout, so a
first-chunk timeout threw before `channel` was bound and the finally
guard never ran, leaking the connection on every slow-API retry. Hold
the channel in an outer nullable var and wrap the whole flow in
try/finally so an acquired channel is always cancelled.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-20 23:56:11 +08:00
parent 4307019ee8
commit a29cf17361

View File

@@ -62,37 +62,42 @@ class ModelService(
// 因此必须把 post() 连同首个 data 块的读取一起包进 withTimeout。
// 否则首 token 超时永远不会触发post() 还没返回,根本进不到读取循环),
// 只能落到 Ktor 的兜底超时(很久)后再重试,表现为「等很久才报异常」。
val (channel, firstDataLine) = withTimeout(firstChunkTimeout) {
val response = httpClient.post("chat/completions") {
setBody(body)
contentType(ContentType.Application.Json)
accept(ContentType.Text.EventStream)
headers {
append(HttpHeaders.CacheControl, "no-cache")
append(HttpHeaders.Connection, "keep-alive")
}
}
val ch: ByteReadChannel = response.body()
var found: String? = null
while (currentCoroutineContext().isActive && !ch.isClosedForRead) {
val line = ch.readUTF8Line() ?: continue
if (line.startsWith("data: ")) {
found = line
break
}
// 心跳/空行/注释行,不计为首块,继续等
}
ch to found
}
// channel 在 withTimeout 外层持有:哪怕首块读取在 withTimeout 内超时,
// 只要 response.body() 已拿到通道finally 也能释放它,避免慢速 API 重试时连接泄漏。
var channel: ByteReadChannel? = null
try {
val firstDataLine = withTimeout(firstChunkTimeout) {
val response = httpClient.post("chat/completions") {
setBody(body)
contentType(ContentType.Application.Json)
accept(ContentType.Text.EventStream)
headers {
append(HttpHeaders.CacheControl, "no-cache")
append(HttpHeaders.Connection, "keep-alive")
}
}
val ch: ByteReadChannel = response.body()
channel = ch
var found: String? = null
while (currentCoroutineContext().isActive && !ch.isClosedForRead) {
val line = ch.readUTF8Line() ?: continue
if (line.startsWith("data: ")) {
found = line
break
}
// 心跳/空行/注释行,不计为首块,继续等
}
found
}
if (firstDataLine != null && !firstDataLine.startsWith("data: [DONE]")) {
emit(json.decodeFromString(firstDataLine.removePrefix("data: ")))
while (currentCoroutineContext().isActive && !channel.isClosedForRead) {
val ch = channel!!
while (currentCoroutineContext().isActive && !ch.isClosedForRead) {
// 流式期间同样对每次读取设「token 间隔」超时,避免中途卡死后干等兜底超时,
// 从而能快速失败并交给上层重试。正常流式 token 间隔远小于 firstChunkTimeout。
val line = withTimeout(firstChunkTimeout) { channel.readUTF8Line() } ?: continue
val line = withTimeout(firstChunkTimeout) { ch.readUTF8Line() } ?: continue
when {
line.startsWith("data: [DONE]") -> break
line.startsWith("data: ") -> {
@@ -103,7 +108,7 @@ class ModelService(
}
}
} finally {
channel.cancel()
channel?.cancel()
}
}
}