Skip to content

Commit 10e4d47

Browse files
committed
Flush all queued requests and responses in websocket pumper
1 parent 019fd8d commit 10e4d47

File tree

1 file changed

+15
-2
lines changed

1 file changed

+15
-2
lines changed

internal/ws.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,14 @@ func (proxy *WebSocketProxy) pumpProvider(providerClient *Client) {
147147
case req := <-proxy.Requests:
148148
metrics.RecordRequest(proxy.endpoint.Name, proxy.provider.Name, "ws", req.Method, 0)
149149
providerClient.Write(rpc.SerializeRequest(req))
150+
151+
// Flush any remaining requests
152+
for i := 0; i < len(proxy.Requests); i++ {
153+
req := <-proxy.Requests
154+
metrics.RecordRequest(proxy.endpoint.Name, proxy.provider.Name, "ws", req.Method, 0)
155+
providerClient.Write(rpc.SerializeRequest(req))
156+
}
157+
150158
case message := <-providerClient.Read():
151159
rpcResponse, err := rpc.DecodeResponse(message)
152160
if err != nil {
@@ -260,8 +268,13 @@ func (proxy *WebSocketProxy) pumpClient(client *Client) {
260268
proxy.log.Debug("proxy.Responses closed")
261269
continue
262270
}
263-
ss := rpc.SerializeResponse(rpcResponse)
264-
proxy.ClientConn.Write(ss)
271+
272+
proxy.ClientConn.Write(rpc.SerializeResponse(rpcResponse))
273+
274+
// Flush any remaining responses
275+
for i := 0; i < len(proxy.Responses); i++ {
276+
proxy.ClientConn.Write(rpc.SerializeResponse(<-proxy.Responses))
277+
}
265278
}
266279
}
267280
}

0 commit comments

Comments
 (0)