# HG changeset patch # User Sergey Kuksenko # Date 1467845006 25200 # Wed Jul 06 15:43:26 2016 -0700 # Node ID 267e1929354d6b9b616f6a98d8f51cce90572170 # Parent a82d07d9c195b955a606fa907b4454c400a20ad3 cumulative window update diff --git a/src/java.httpclient/share/classes/java/net/http/Http2ClientImpl.java b/src/java.httpclient/share/classes/java/net/http/Http2ClientImpl.java --- a/src/java.httpclient/share/classes/java/net/http/Http2ClientImpl.java +++ b/src/java.httpclient/share/classes/java/net/http/Http2ClientImpl.java @@ -146,7 +146,7 @@ frame.setParameter(MAX_CONCURRENT_STREAMS, Utils.getIntegerNetProperty( "java.net.httpclient.maxstreams", 16)); frame.setParameter(INITIAL_WINDOW_SIZE, Utils.getIntegerNetProperty( - "java.net.httpclient.windowsize", 32 * K)); + "java.net.httpclient.windowsize", Http2Connection.INITIAL_WINDOW_SIZE)); frame.setParameter(MAX_FRAME_SIZE, Utils.getIntegerNetProperty( "java.net.httpclient.maxframesize", 16 * K)); frame.computeLength(); diff --git a/src/java.httpclient/share/classes/java/net/http/Http2Connection.java b/src/java.httpclient/share/classes/java/net/http/Http2Connection.java --- a/src/java.httpclient/share/classes/java/net/http/Http2Connection.java +++ b/src/java.httpclient/share/classes/java/net/http/Http2Connection.java @@ -29,7 +29,7 @@ import java.net.http.HttpConnection.Mode; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; -import java.util.Collection; +import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -102,6 +102,7 @@ final ExecutorWrapper executor; + WindowUpdateSender windowUpdater; /** * This is established by the protocol spec and the peer will update it with * WINDOW_UPDATEs, which affects the sendWindow. @@ -506,6 +507,13 @@ serverSettings = SettingsFrame.getDefaultSettings(); hpackOut = new Encoder(serverSettings.getParameter(HEADER_TABLE_SIZE)); hpackIn = new Decoder(clientSettings.getParameter(HEADER_TABLE_SIZE)); + + windowUpdater = new WindowUpdateSender(this, client2.client().getReceiveBufferSize()) { + @Override + int getStreamId() { + return 0; + } + }; } /** @@ -550,16 +558,11 @@ SettingsFrame sf = client2.getClientSettings(); Log.logFrames(sf, "OUT"); sf.writeOutgoing(bg); - WindowUpdateFrame wup = new WindowUpdateFrame(); - wup.streamid(0); + ba = bg.getBufferArray(); + connection.write(ba, 0, ba.length); // send a Window update for the receive buffer we are using // minus the initial 64 K specified in protocol - wup.setUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1)); - wup.computeLength(); - wup.writeOutgoing(bg); - Log.logFrames(wup, "OUT"); - ba = bg.getBufferArray(); - connection.write(ba, 0, ba.length); + windowUpdater.sendWindowUpdate(client2.client().getReceiveBufferSize() - (64 * 1024 - 1)); } /** @@ -731,13 +734,22 @@ // body to proceed. stream.updateOutgoingWindow(getInitialSendWindowSize()); LinkedList frames = encodeHeaders(oh); - for (Http2Frame f : frames) { - sendOneFrame(f); + if (frames.size() == 1) { + sendOneFrame(frames.getFirst()); + } else { + // provide protection from inserting unordered frames between Headers and Continuation + // that is a temporal solution until asycn connection queue will have + // atomic implementation of addAll method. + List bufs = new ArrayList<>(); + for (Http2Frame f : frames) { + bufs.addAll(encodeFrame(f)); + } + ByteBuffer[] currentBufs = bufs.toArray(new ByteBuffer[0]); + connection.write(currentBufs, 0, currentBufs.length); } } else { sendOneFrame(frame); } - } catch (IOException e) { if (!closed) { Log.logError(e); @@ -749,7 +761,6 @@ /** * Send a frame. - * * @param frame * @throws IOException */ @@ -762,6 +773,34 @@ connection.write(currentBufs, 0, currentBufs.length); } + /* + * Direct call of the method bypasses synchronization on "sendlock" and + * allowed only of control frames: WindowUpdateFrame, PingFrame and etc. + * prohibited for such frames as DataFrame, HeadersFrame, ContinuationFrame. + */ + void sendUnorderedFrame(Http2Frame frame){ + try { + ByteBufferGenerator bbg = new ByteBufferGenerator(this); + frame.computeLength(); + Log.logFrames(frame, "OUT"); + frame.writeOutgoing(bbg); + ByteBuffer[] currentBufs = bbg.getBufferArray(); + connection.write(currentBufs, 0, currentBufs.length); + } catch (IOException e) { + if (!closed) { + Log.logError(e); + shutdown(e); + } + } + } + + private List encodeFrame(Http2Frame frame) throws IOException { + ByteBufferGenerator bbg = new ByteBufferGenerator(this); + frame.computeLength(); + Log.logFrames(frame, "OUT"); + frame.writeOutgoing(bbg); + return bbg.getBufferList(); + } private SettingsFrame getAckFrame(int streamid) { SettingsFrame frame = new SettingsFrame(); diff --git a/src/java.httpclient/share/classes/java/net/http/SettingsFrame.java b/src/java.httpclient/share/classes/java/net/http/SettingsFrame.java --- a/src/java.httpclient/share/classes/java/net/http/SettingsFrame.java +++ b/src/java.httpclient/share/classes/java/net/http/SettingsFrame.java @@ -158,7 +158,7 @@ f.setParameter(ENABLE_PUSH, 1); f.setParameter(HEADER_TABLE_SIZE, 4 * K); f.setParameter(MAX_CONCURRENT_STREAMS, 35); - f.setParameter(INITIAL_WINDOW_SIZE, 16 * K); + f.setParameter(INITIAL_WINDOW_SIZE, Http2Connection.INITIAL_WINDOW_SIZE); f.setParameter(MAX_FRAME_SIZE, 16 * K); return f; } diff --git a/src/java.httpclient/share/classes/java/net/http/Stream.java b/src/java.httpclient/share/classes/java/net/http/Stream.java --- a/src/java.httpclient/share/classes/java/net/http/Stream.java +++ b/src/java.httpclient/share/classes/java/net/http/Stream.java @@ -111,7 +111,7 @@ HttpResponse.BodyProcessor responseProcessor; final HttpRequest.BodyProcessor requestProcessor; HttpResponse response; - + final WindowUpdateSender windowUpdater; // state flags boolean requestSent, responseReceived; @@ -163,8 +163,10 @@ Http2Frame frame; DataFrame df = null; try { + boolean endOfStream; do { frame = inputQ.take(); + endOfStream = frame.getFlag(DataFrame.END_STREAM); if (!(frame instanceof DataFrame)) { assert false; continue; @@ -176,8 +178,12 @@ responseFlowController.take(); responseProcessor.onResponseBodyChunk(b); } - sendWindowUpdate(len); - } while (!df.getFlag(DataFrame.END_STREAM)); + connection.windowUpdater.update(len); + if(!endOfStream) { + // if we got the last frame in the stream we shouldn't send WindowUpdate + windowUpdater.update(len); + } + } while (!endOfStream); } catch (InterruptedException e) { throw new IOException(e); } @@ -198,22 +204,6 @@ return cf; } - private void sendWindowUpdate(int increment) - throws IOException, InterruptedException { - if (increment == 0) - return; - LinkedList list = new LinkedList<>(); - WindowUpdateFrame frame = new WindowUpdateFrame(); - frame.streamid(streamid); - frame.setUpdate(increment); - list.add(frame); - frame = new WindowUpdateFrame(); - frame.streamid(0); - frame.setUpdate(increment); - list.add(frame); - connection.sendFrames(list); - } - @Override CompletableFuture sendBodyAsync() { final CompletableFuture cf = new CompletableFuture<>(); @@ -245,6 +235,7 @@ this.requestPseudoHeaders = new HttpHeadersImpl(); // NEW this.inputQ = new Queue<>(); + this.windowUpdater = new StreamWindowUpdateSender(connection); } @SuppressWarnings("unchecked") @@ -264,6 +255,7 @@ this.requestPseudoHeaders = new HttpHeadersImpl(); // NEW this.inputQ = new Queue<>(); + this.windowUpdater = new StreamWindowUpdateSender(connection); } /** @@ -852,4 +844,16 @@ resultCF.completeExceptionally(t); } } + + class StreamWindowUpdateSender extends WindowUpdateSender { + + StreamWindowUpdateSender(Http2Connection connection) { + super(connection); + } + + @Override + int getStreamId() { + return streamid; + } + } } diff --git a/src/java.httpclient/share/classes/java/net/http/WindowUpdateSender.java b/src/java.httpclient/share/classes/java/net/http/WindowUpdateSender.java new file mode 100644 --- /dev/null +++ b/src/java.httpclient/share/classes/java/net/http/WindowUpdateSender.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This code is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 only, as + * published by the Free Software Foundation. Oracle designates this + * particular file as subject to the "Classpath" exception as provided + * by Oracle in the LICENSE file that accompanied this code. + * + * This code is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * version 2 for more details (a copy is included in the LICENSE file that + * accompanied this code). + * + * You should have received a copy of the GNU General Public License version + * 2 along with this work; if not, write to the Free Software Foundation, + * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. + * + * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA + * or visit www.oracle.com if you need additional information or have any + */ +package java.net.http; + +import java.util.concurrent.atomic.AtomicInteger; + +abstract class WindowUpdateSender { + + + final int limit; + final Http2Connection connection; + final AtomicInteger received = new AtomicInteger(0); + + WindowUpdateSender(Http2Connection connection) { + this(connection, connection.clientSettings.getParameter(SettingsFrame.INITIAL_WINDOW_SIZE)); + } + + WindowUpdateSender(Http2Connection connection, int initWindowSize) { + this(connection, connection.getMaxReceiveFrameSize(), initWindowSize); + } + + WindowUpdateSender(Http2Connection connection, int maxFrameSize, int initWindowSize) { + this.connection = connection; + int v0 = Math.max(0, initWindowSize - maxFrameSize); + int v1 = (initWindowSize + (maxFrameSize - 1)) / maxFrameSize; + v1 = v1 * maxFrameSize / 2; + // send WindowUpdate heuristic: + // - we got data near half of window size + // or + // - remaining window size reached max frame size. + limit = Math.min(v0, v1); + } + + abstract int getStreamId(); + + void update(int delta) { + if (received.addAndGet(delta) > limit) { + synchronized (this) { + int tosend = received.get(); + if( tosend > limit) { + received.getAndAdd(-tosend); + sendWindowUpdate(tosend); + } + } + } + } + + void sendWindowUpdate(int delta) { + WindowUpdateFrame frame = new WindowUpdateFrame(); + frame.streamid(getStreamId()); + frame.setUpdate(delta); + connection.sendUnorderedFrame(frame); + } + + +} # HG changeset patch # User Sergey Kuksenko # Date 1468013159 25200 # Fri Jul 08 14:25:59 2016 -0700 # Node ID 0bc445e98bf3711119de195935cd6c36c6296e0d # Parent a82d07d9c195b955a606fa907b4454c400a20ad3 fix flow controller diff --git a/src/java.httpclient/share/classes/java/net/http/Stream.java b/src/java.httpclient/share/classes/java/net/http/Stream.java --- a/src/java.httpclient/share/classes/java/net/http/Stream.java +++ b/src/java.httpclient/share/classes/java/net/http/Stream.java @@ -466,13 +466,13 @@ public synchronized void take(int amount) throws InterruptedException { assert permits >= 0; - while (permits < amount) { + do { int n = Math.min(amount, permits); permits -= n; amount -= n; if (amount > 0) wait(); - } + } while (permits < amount); } } @@ -498,7 +498,7 @@ DataFrame getDataFrame() throws IOException, InterruptedException { userRequestFlowController.take(); - int maxpayloadLen = connection.getMaxSendFrameSize() - 9; + int maxpayloadLen = connection.getMaxSendFrameSize(); ByteBuffer buffer = connection.getBuffer(); buffer.limit(maxpayloadLen); boolean complete = requestProcessor.onRequestBodyChunk(buffer);