mirror of
https://github.com/jie65535/JChatGPT.git
synced 2026-06-23 00:49:31 +08:00
Fix streaming first-token timeout never firing on slow API
The firstChunkTimeout only wrapped the response-body read, but when the upstream (e.g. DeepSeek under load) stalls before sending response headers, httpClient.post() itself blocks and the withTimeout block is never reached. Every slow request fell through to Ktor's requestTimeoutMillis (120s) and was retried up to retryMax times, causing multi-minute waits before any reply. - Move post() inside withTimeout(firstChunkTimeout) so the entire request-to-first-data-chunk window is bounded and fails fast. - Apply withTimeout(firstChunkTimeout) to each streaming read so a mid-stream stall is also caught quickly instead of waiting on the socket/request backstop. - Drop requestTimeoutMillis so legitimately long streams are no longer killed at 120s; TTFT and inter-token gaps are now governed at the application layer. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -28,8 +28,9 @@ class ModelService(
|
||||
val httpClient: HttpClient by lazy {
|
||||
HttpClient(OkHttp) {
|
||||
install(HttpTimeout) {
|
||||
// 总请求/socket 超时保持长值,允许慢速流式输出;连接握手则用短超时。
|
||||
requestTimeoutMillis = timeout.inWholeMilliseconds
|
||||
// 流式响应的「首 token」与「token 间隔」超时统一由应用层 withTimeout 管控(见 chatCompletions)。
|
||||
// 这里特意不设 requestTimeoutMillis:否则正常但耗时较长的流式输出会被 Ktor 在中途整体掐断。
|
||||
// socket 超时作为字节级兜底,连接超时只覆盖 TCP 握手。
|
||||
socketTimeoutMillis = timeout.inWholeMilliseconds
|
||||
connectTimeoutMillis = firstChunkTimeout.inWholeMilliseconds
|
||||
}
|
||||
@@ -57,51 +58,52 @@ class ModelService(
|
||||
val body = JsonObject(requestJson).toString()
|
||||
|
||||
return flow {
|
||||
httpClient.post("chat/completions") {
|
||||
setBody(body)
|
||||
contentType(ContentType.Application.Json)
|
||||
accept(ContentType.Text.EventStream)
|
||||
headers {
|
||||
append(HttpHeaders.CacheControl, "no-cache")
|
||||
append(HttpHeaders.Connection, "keep-alive")
|
||||
}
|
||||
}.let { response ->
|
||||
val channel: ByteReadChannel = response.body()
|
||||
try {
|
||||
// 首块 data: 必须在 firstChunkTimeout 内到达,否则抛 TimeoutCancellationException
|
||||
// 走 JChatGPT 的重试流程;之后的流式读取不再有应用层超时,由 socketTimeoutMillis 兜底。
|
||||
val firstDataLine: String? = withTimeout(firstChunkTimeout) {
|
||||
var found: String? = null
|
||||
while (currentCoroutineContext().isActive && !channel.isClosedForRead) {
|
||||
val line = channel.readUTF8Line() ?: continue
|
||||
if (line.startsWith("data: ")) {
|
||||
found = line
|
||||
break
|
||||
}
|
||||
// 心跳/空行/注释行,不计为首块,继续等
|
||||
}
|
||||
found
|
||||
// 关键:服务器繁忙时会拖住「响应头」,使 httpClient.post() 自身阻塞在等待响应的阶段,
|
||||
// 因此必须把 post() 连同首个 data 块的读取一起包进 withTimeout。
|
||||
// 否则首 token 超时永远不会触发(post() 还没返回,根本进不到读取循环),
|
||||
// 只能落到 Ktor 的兜底超时(很久)后再重试,表现为「等很久才报异常」。
|
||||
val (channel, firstDataLine) = withTimeout(firstChunkTimeout) {
|
||||
val response = httpClient.post("chat/completions") {
|
||||
setBody(body)
|
||||
contentType(ContentType.Application.Json)
|
||||
accept(ContentType.Text.EventStream)
|
||||
headers {
|
||||
append(HttpHeaders.CacheControl, "no-cache")
|
||||
append(HttpHeaders.Connection, "keep-alive")
|
||||
}
|
||||
}
|
||||
val ch: ByteReadChannel = response.body()
|
||||
var found: String? = null
|
||||
while (currentCoroutineContext().isActive && !ch.isClosedForRead) {
|
||||
val line = ch.readUTF8Line() ?: continue
|
||||
if (line.startsWith("data: ")) {
|
||||
found = line
|
||||
break
|
||||
}
|
||||
// 心跳/空行/注释行,不计为首块,继续等
|
||||
}
|
||||
ch to found
|
||||
}
|
||||
|
||||
if (firstDataLine != null) {
|
||||
if (!firstDataLine.startsWith("data: [DONE]")) {
|
||||
emit(json.decodeFromString(firstDataLine.removePrefix("data: ")))
|
||||
try {
|
||||
if (firstDataLine != null && !firstDataLine.startsWith("data: [DONE]")) {
|
||||
emit(json.decodeFromString(firstDataLine.removePrefix("data: ")))
|
||||
|
||||
while (currentCoroutineContext().isActive && !channel.isClosedForRead) {
|
||||
val line = channel.readUTF8Line() ?: continue
|
||||
when {
|
||||
line.startsWith("data: [DONE]") -> break
|
||||
line.startsWith("data: ") -> {
|
||||
emit(json.decodeFromString(line.removePrefix("data: ")))
|
||||
}
|
||||
else -> continue
|
||||
}
|
||||
while (currentCoroutineContext().isActive && !channel.isClosedForRead) {
|
||||
// 流式期间同样对每次读取设「token 间隔」超时,避免中途卡死后干等兜底超时,
|
||||
// 从而能快速失败并交给上层重试。正常流式 token 间隔远小于 firstChunkTimeout。
|
||||
val line = withTimeout(firstChunkTimeout) { channel.readUTF8Line() } ?: continue
|
||||
when {
|
||||
line.startsWith("data: [DONE]") -> break
|
||||
line.startsWith("data: ") -> {
|
||||
emit(json.decodeFromString(line.removePrefix("data: ")))
|
||||
}
|
||||
else -> continue
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
channel.cancel()
|
||||
}
|
||||
} finally {
|
||||
channel.cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user