diff --git a/src/main/kotlin/ModelService.kt b/src/main/kotlin/ModelService.kt index 049577f..1ef8216 100644 --- a/src/main/kotlin/ModelService.kt +++ b/src/main/kotlin/ModelService.kt @@ -62,37 +62,42 @@ class ModelService( // 因此必须把 post() 连同首个 data 块的读取一起包进 withTimeout。 // 否则首 token 超时永远不会触发(post() 还没返回,根本进不到读取循环), // 只能落到 Ktor 的兜底超时(很久)后再重试,表现为「等很久才报异常」。 - val (channel, firstDataLine) = withTimeout(firstChunkTimeout) { - val response = httpClient.post("chat/completions") { - setBody(body) - contentType(ContentType.Application.Json) - accept(ContentType.Text.EventStream) - headers { - append(HttpHeaders.CacheControl, "no-cache") - append(HttpHeaders.Connection, "keep-alive") - } - } - val ch: ByteReadChannel = response.body() - var found: String? = null - while (currentCoroutineContext().isActive && !ch.isClosedForRead) { - val line = ch.readUTF8Line() ?: continue - if (line.startsWith("data: ")) { - found = line - break - } - // 心跳/空行/注释行,不计为首块,继续等 - } - ch to found - } - + // channel 在 withTimeout 外层持有:哪怕首块读取在 withTimeout 内超时, + // 只要 response.body() 已拿到通道,finally 也能释放它,避免慢速 API 重试时连接泄漏。 + var channel: ByteReadChannel? = null try { + val firstDataLine = withTimeout(firstChunkTimeout) { + val response = httpClient.post("chat/completions") { + setBody(body) + contentType(ContentType.Application.Json) + accept(ContentType.Text.EventStream) + headers { + append(HttpHeaders.CacheControl, "no-cache") + append(HttpHeaders.Connection, "keep-alive") + } + } + val ch: ByteReadChannel = response.body() + channel = ch + var found: String? = null + while (currentCoroutineContext().isActive && !ch.isClosedForRead) { + val line = ch.readUTF8Line() ?: continue + if (line.startsWith("data: ")) { + found = line + break + } + // 心跳/空行/注释行,不计为首块,继续等 + } + found + } + if (firstDataLine != null && !firstDataLine.startsWith("data: [DONE]")) { emit(json.decodeFromString(firstDataLine.removePrefix("data: "))) - while (currentCoroutineContext().isActive && !channel.isClosedForRead) { + val ch = channel!! + while (currentCoroutineContext().isActive && !ch.isClosedForRead) { // 流式期间同样对每次读取设「token 间隔」超时,避免中途卡死后干等兜底超时, // 从而能快速失败并交给上层重试。正常流式 token 间隔远小于 firstChunkTimeout。 - val line = withTimeout(firstChunkTimeout) { channel.readUTF8Line() } ?: continue + val line = withTimeout(firstChunkTimeout) { ch.readUTF8Line() } ?: continue when { line.startsWith("data: [DONE]") -> break line.startsWith("data: ") -> { @@ -103,7 +108,7 @@ class ModelService( } } } finally { - channel.cancel() + channel?.cancel() } } }