< prev index next >

src/java.net.http/share/classes/jdk/internal/net/http/Stream.java

Print this page

        

@@ -21,31 +21,35 @@
  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  * or visit www.oracle.com if you need additional information or have any
  * questions.
  */
 
-package jdk.incubator.http;
+package jdk.internal.net.http;
 
 import java.io.IOException;
 import java.lang.System.Logger.Level;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Flow;
 import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.atomic.AtomicReference;
-import jdk.incubator.http.HttpResponse.BodySubscriber;
-import jdk.incubator.http.internal.common.*;
-import jdk.incubator.http.internal.frame.*;
-import jdk.incubator.http.internal.hpack.DecodingCallback;
+import java.util.function.BiPredicate;
+import java.net.http.HttpClient;
+import java.net.http.HttpHeaders;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.net.http.HttpResponse.BodySubscriber;
+import jdk.internal.net.http.common.*;
+import jdk.internal.net.http.frame.*;
+import jdk.internal.net.http.hpack.DecodingCallback;
 
 /**
  * Http/2 Stream handling.
  *
  * REQUESTS

@@ -148,10 +152,11 @@
     private void schedule() {
         if (responseSubscriber == null)
             // can't process anything yet
             return;
 
+        try {
         while (!inputQ.isEmpty()) {
             Http2Frame frame  = inputQ.peek();
             if (frame instanceof ResetFrame) {
                 inputQ.remove();
                 handleReset((ResetFrame)frame);

@@ -186,10 +191,14 @@
                 }
             } else {
                 return;
             }
         }
+        } catch (Throwable throwable) {
+            failed = throwable;
+        }
+
         Throwable t = failed;
         if (t != null) {
             sched.stop();
             responseSubscriber.onError(t);
             close();

@@ -220,13 +229,13 @@
                                        boolean returnConnectionToPool,
                                        Executor executor)
     {
         Log.logTrace("Reading body on stream {0}", streamid);
         BodySubscriber<T> bodySubscriber = handler.apply(responseCode, responseHeaders);
-        CompletableFuture<T> cf = receiveData(bodySubscriber);
+        CompletableFuture<T> cf = receiveData(bodySubscriber, executor);
 
-        PushGroup<?,?> pg = exchange.getPushGroup();
+        PushGroup<?> pg = exchange.getPushGroup();
         if (pg != null) {
             // if an error occurs make sure it is recorded in the PushGroup
             cf = cf.whenComplete((t,e) -> pg.pushError(e));
         }
         return cf;

@@ -251,12 +260,22 @@
         sched.runOrSchedule();
     }
 
     // pushes entire response body into response subscriber
     // blocking when required by local or remote flow control
-    CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber) {
-        responseBodyCF = MinimalFuture.of(bodySubscriber.getBody());
+    CompletableFuture<T> receiveData(BodySubscriber<T> bodySubscriber, Executor executor) {
+        responseBodyCF = new MinimalFuture<>();
+        // We want to allow the subscriber's getBody() method to block so it
+        // can work with InputStreams. So, we offload execution.
+        executor.execute(() -> {
+            bodySubscriber.getBody().whenComplete((T body, Throwable t) -> {
+                if (t == null)
+                    responseBodyCF.complete(body);
+                else
+                    responseBodyCF.completeExceptionally(t);
+            });
+        });
 
         if (isCanceled()) {
             Throwable t = getCancelCause();
             responseBodyCF.completeExceptionally(t);
         } else {

@@ -418,61 +437,59 @@
                 connection.resetStream(streamid, ResetFrame.FLOW_CONTROL_ERROR);
             }
         }
     }
 
-    void incoming_pushPromise(HttpRequestImpl pushReq,
-                              PushedStream<?,T> pushStream)
+    void incoming_pushPromise(HttpRequestImpl pushRequest,
+                              PushedStream<T> pushStream)
         throws IOException
     {
         if (Log.requests()) {
-            Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
+            Log.logRequest("PUSH_PROMISE: " + pushRequest.toString());
         }
-        PushGroup<?,T> pushGroup = exchange.getPushGroup();
+        PushGroup<T> pushGroup = exchange.getPushGroup();
         if (pushGroup == null) { // no push group on this exchange: refuse the pushed stream and close it
             Log.logTrace("Rejecting push promise stream " + streamid);
             connection.resetStream(pushStream.streamid, ResetFrame.REFUSED_STREAM);
             pushStream.close();
             return;
         }
 
-        HttpResponse.MultiSubscriber<?,T> proc = pushGroup.subscriber();
-
-        CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
-
-        Optional<HttpResponse.BodyHandler<T>> bpOpt =
-                pushGroup.handlerForPushRequest(pushReq);
+        PushGroup.Acceptor<T> acceptor = pushGroup.acceptPushRequest(pushRequest);
 
-        if (!bpOpt.isPresent()) {
-            IOException ex = new IOException("Stream "
-                 + streamid + " cancelled by user");
+        if (!acceptor.accepted()) {
+            // the acceptor did not accept this push request: cancel / reject the pushed stream
+            IOException ex = new IOException("Stream " + streamid + " cancelled by users handler");
             if (Log.trace()) {
-                Log.logTrace("No body subscriber for {0}: {1}", pushReq,
+                Log.logTrace("No body subscriber for {0}: {1}", pushRequest,
                              ex.getMessage());
             }
             pushStream.cancelImpl(ex);
-            cf.completeExceptionally(ex);
             return;
         }
 
-        pushGroup.addPush();
+        CompletableFuture<HttpResponse<T>> pushResponseCF = acceptor.cf(); // completed from the whenComplete callback below
+        HttpResponse.BodyHandler<T> pushHandler = acceptor.bodyHandler();
+        assert pushHandler != null;
+
         pushStream.requestSent();
-        pushStream.setPushHandler(bpOpt.get());
+        pushStream.setPushHandler(pushHandler);  // TODO: could wrap the handler to throw on acceptPushPromise ?
         // set up housekeeping for when the push is received
         // TODO: deal with ignoring of CF anti-pattern
+        CompletableFuture<HttpResponse<T>> cf = pushStream.responseCF();
         cf.whenComplete((HttpResponse<T> resp, Throwable t) -> {
             t = Utils.getCompletionCause(t);
             if (Log.trace()) {
                 Log.logTrace("Push completed on stream {0} for {1}{2}",
                              pushStream.streamid, resp,
                              ((t==null) ? "": " with exception " + t));
             }
             if (t != null) {
                 pushGroup.pushError(t);
-                proc.onError(pushReq, t);
+                pushResponseCF.completeExceptionally(t);
             } else {
-                proc.onResponse(resp);
+                pushResponseCF.complete(resp);
             }
             pushGroup.pushCompleted();
         });
 
     }

@@ -481,18 +498,63 @@
         HttpHeadersImpl h = request.getSystemHeaders();
         if (contentLength > 0) {
             h.setHeader("content-length", Long.toString(contentLength));
         }
         setPseudoHeaderFields();
-        OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(h, request.getUserHeaders(), this);
+        HttpHeaders sysh = filter(h);
+        HttpHeaders userh = filter(request.getUserHeaders());
+        OutgoingHeaders<Stream<T>> f = new OutgoingHeaders<>(sysh, userh, this);
         if (contentLength == 0) {
             f.setFlag(HeadersFrame.END_STREAM);
             endStreamSent = true;
         }
         return f;
     }
 
+    private boolean hasProxyAuthorization(HttpHeaders headers) { // true iff headers carry a "proxy-authorization" value
+        return headers.firstValue("proxy-authorization")
+                      .isPresent();
+    }
+
+    // Determines whether we need to build a new HttpHeaders object.
+    //
+    // Ideally we should pass the filter to OutgoingHeaders and refactor
+    // the code that creates the HeaderFrame to honor the filter.
+    // We're not there yet - so depending on the filter we need to
+    // apply and the content of the headers we will try to determine
+    // whether anything might need to be filtered.
+    // If nothing needs filtering then we can just use the
+    // original headers.
+    private boolean needsFiltering(HttpHeaders headers,
+                                   BiPredicate<String, List<String>> filter) {
+        if (filter == Utils.PROXY_TUNNEL_FILTER || filter == Utils.PROXY_FILTER) {
+            // We're either connecting or proxying.
+            // Slight optimization: we only need to filter out
+            // disabled schemes, so if there are none just
+            // pass through.
+            return Utils.proxyHasDisabledSchemes(filter == Utils.PROXY_TUNNEL_FILTER)
+                    && hasProxyAuthorization(headers);
+        } else {
+            // We're talking to a server, either directly or through
+            // a tunnel.
+            // Slight optimization: we only need to filter out
+            // proxy authorization headers, so if there are none just
+            // pass through.
+            return hasProxyAuthorization(headers);
+        }
+    }
+
+    private HttpHeaders filter(HttpHeaders headers) { // returns the given headers as-is unless filtering is needed
+        HttpConnection conn = connection();
+        BiPredicate<String, List<String>> filter =
+                conn.headerFilter(request); // the filter the connection supplies for this request
+        if (needsFiltering(headers, filter)) {
+            return ImmutableHeaders.of(headers.map(), filter); // builds a new HttpHeaders from the header map and the filter
+        }
+        return headers; // nothing to filter out: reuse the original object
+    }
+
     private void setPseudoHeaderFields() {
         HttpHeadersImpl hdrs = requestPseudoHeaders;
         String method = request.method();
         hdrs.setHeader(":method", method);
         URI uri = request.uri();

@@ -765,11 +827,11 @@
     DataFrame getDataFrame(ByteBuffer buffer) {
         int requestAmount = Math.min(connection.getMaxSendFrameSize(), buffer.remaining()); // at most one max-size frame
         // blocks waiting for stream send window, if exhausted (NOTE(review): tryAcquire + the null return below suggest non-blocking - confirm)
         int actualAmount = windowController.tryAcquire(requestAmount, streamid, this);
         if (actualAmount <= 0) return null; // nothing acquired: no frame is produced
-        ByteBuffer outBuf = Utils.slice(buffer,  actualAmount);
+        ByteBuffer outBuf = Utils.sliceWithLimitedCapacity(buffer,  actualAmount);
         DataFrame df = new DataFrame(streamid, 0 , outBuf);
         return df;
     }
 
     private DataFrame getEmptyEndStreamDataFrame()  {

@@ -810,11 +872,11 @@
         if (executor != null && !cf.isDone()) {
             // protect from executing later chain of CompletableFuture operations from SelectorManager thread
             cf = cf.thenApplyAsync(r -> r, executor);
         }
         Log.logTrace("Response future (stream={0}) is: {1}", streamid, cf);
-        PushGroup<?,?> pg = exchange.getPushGroup();
+        PushGroup<?> pg = exchange.getPushGroup();
         if (pg != null) {
             // if an error occurs make sure it is recorded in the PushGroup
             cf = cf.whenComplete((t,e) -> pg.pushError(Utils.getCompletionCause(e)));
         }
         return cf;

@@ -949,28 +1011,29 @@
         Log.logTrace("Closing stream {0}", streamid);
         connection.closeStream(streamid);
         Log.logTrace("Stream {0} closed", streamid);
     }
 
-    static class PushedStream<U,T> extends Stream<T> {
-        final PushGroup<U,T> pushGroup;
+    static class PushedStream<T> extends Stream<T> {
+        final PushGroup<T> pushGroup;
         // push streams need the response CF allocated up front as it is
         // given directly to user via the multi handler callback function.
         final CompletableFuture<Response> pushCF;
-        final CompletableFuture<HttpResponse<T>> responseCF;
+        CompletableFuture<HttpResponse<T>> responseCF;
         final HttpRequestImpl pushReq;
         HttpResponse.BodyHandler<T> pushHandler;
 
-        PushedStream(PushGroup<U,T> pushGroup,
+        PushedStream(PushGroup<T> pushGroup,
                      Http2Connection connection,
                      Exchange<T> pushReq) {
             // ## no request body possible, null window controller
             super(connection, pushReq, null);
             this.pushGroup = pushGroup;
             this.pushReq = pushReq.request();
             this.pushCF = new MinimalFuture<>();
             this.responseCF = new MinimalFuture<>();
+
         }
 
         CompletableFuture<HttpResponse<T>> responseCF() {
             return responseCF;
         }
< prev index next >