1 /*
   2  * Copyright (c) 2015, 2018, Oracle and/or its affiliates. All rights reserved.
   3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
   4  *
   5  * This code is free software; you can redistribute it and/or modify it
   6  * under the terms of the GNU General Public License version 2 only, as
   7  * published by the Free Software Foundation.  Oracle designates this
   8  * particular file as subject to the "Classpath" exception as provided
   9  * by Oracle in the LICENSE file that accompanied this code.
  10  *
  11  * This code is distributed in the hope that it will be useful, but WITHOUT
  12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
  14  * version 2 for more details (a copy is included in the LICENSE file that
  15  * accompanied this code).
  16  *
  17  * You should have received a copy of the GNU General Public License version
  18  * 2 along with this work; if not, write to the Free Software Foundation,
  19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
  20  *
  21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
  22  * or visit www.oracle.com if you need additional information or have any
  23  * questions.
  24  */
  25 
  26 package jdk.internal.net.http;
  27 
  28 import java.io.Closeable;
  29 import java.io.IOException;
  30 import java.lang.System.Logger.Level;
  31 import java.net.InetSocketAddress;
  32 import java.nio.ByteBuffer;
  33 import java.nio.channels.SocketChannel;
  34 import java.util.Arrays;
  35 import java.util.IdentityHashMap;
  36 import java.util.List;
  37 import java.util.Map;
  38 import java.util.TreeMap;
  39 import java.util.concurrent.CompletableFuture;
  40 import java.util.concurrent.CompletionStage;
  41 import java.util.concurrent.ConcurrentLinkedDeque;
  42 import java.util.concurrent.Flow;
  43 import java.util.function.BiPredicate;
  44 import java.util.function.Predicate;
  45 import java.net.http.HttpClient;
  46 import java.net.http.HttpClient.Version;
  47 import java.net.http.HttpHeaders;
  48 import jdk.internal.net.http.common.Demand;
  49 import jdk.internal.net.http.common.FlowTube;
  50 import jdk.internal.net.http.common.SequentialScheduler;
  51 import jdk.internal.net.http.common.SequentialScheduler.DeferredCompleter;
  52 import jdk.internal.net.http.common.Log;
  53 import jdk.internal.net.http.common.Utils;
  54 import static java.net.http.HttpClient.Version.HTTP_2;
  55 
  56 /**
  57  * Wraps socket channel layer and takes care of SSL also.
  58  *
  59  * Subtypes are:
  60  *      PlainHttpConnection: regular direct TCP connection to server
  61  *      PlainProxyConnection: plain text proxy connection
  62  *      PlainTunnelingConnection: opens plain text (CONNECT) tunnel to server
  63  *      AsyncSSLConnection: TLS channel direct to server
  64  *      AsyncSSLTunnelConnection: TLS channel via (CONNECT) proxy tunnel
  65  */
  66 abstract class HttpConnection implements Closeable {
  67 
  68     static final boolean DEBUG = Utils.DEBUG; // Revisit: temporary dev flag.
  69     final System.Logger  debug = Utils.getDebugLogger(this::dbgString, DEBUG);
  70     final static System.Logger DEBUG_LOGGER = Utils.getDebugLogger(
  71             () -> "HttpConnection(SocketTube(?))", DEBUG);
  72 
  73     /** The address this connection is connected to. Could be a server or a proxy. */
  74     final InetSocketAddress address;
  75     private final HttpClientImpl client;
  76     private final TrailingOperations trailingOperations;
  77 
  78     HttpConnection(InetSocketAddress address, HttpClientImpl client) {
  79         this.address = address;
  80         this.client = client;
  81         trailingOperations = new TrailingOperations();
  82     }
  83 
  84     private static final class TrailingOperations {
  85         private final Map<CompletionStage<?>, Boolean> operations =
  86                 new IdentityHashMap<>();
  87         void add(CompletionStage<?> cf) {
  88             synchronized(operations) {
  89                 cf.whenComplete((r,t)-> remove(cf));
  90                 operations.put(cf, Boolean.TRUE);
  91             }
  92         }
  93         boolean remove(CompletionStage<?> cf) {
  94             synchronized(operations) {
  95                 return operations.remove(cf);
  96             }
  97         }
  98     }
  99 
 100     final void addTrailingOperation(CompletionStage<?> cf) {
 101         trailingOperations.add(cf);
 102     }
 103 
 104 //    final void removeTrailingOperation(CompletableFuture<?> cf) {
 105 //        trailingOperations.remove(cf);
 106 //    }
 107 
 108     final HttpClientImpl client() {
 109         return client;
 110     }
 111 
 112     //public abstract void connect() throws IOException, InterruptedException;
 113 
 114     public abstract CompletableFuture<Void> connectAsync();
 115 
 116     /** Tells whether, or not, this connection is connected to its destination. */
 117     abstract boolean connected();
 118 
 119     /** Tells whether, or not, this connection is secure ( over SSL ) */
 120     abstract boolean isSecure();
 121 
 122     /** Tells whether, or not, this connection is proxied. */
 123     abstract boolean isProxied();
 124 
 125     /** Tells whether, or not, this connection is open. */
 126     final boolean isOpen() {
 127         return channel().isOpen() &&
 128                 (connected() ? !getConnectionFlow().isFinished() : true);
 129     }
 130 
 131     interface HttpPublisher extends FlowTube.TubePublisher {
 132         void enqueue(List<ByteBuffer> buffers) throws IOException;
 133         void enqueueUnordered(List<ByteBuffer> buffers) throws IOException;
 134         void signalEnqueued() throws IOException;
 135     }
 136 
 137     /**
 138      * Returns the HTTP publisher associated with this connection.  May be null
 139      * if invoked before connecting.
 140      */
 141     abstract HttpPublisher publisher();
 142 
 143     // HTTP/2 MUST use TLS version 1.2 or higher for HTTP/2 over TLS
 144     private static final Predicate<String> testRequiredHTTP2TLSVersion = proto ->
 145             proto.equals("TLSv1.2") || proto.equals("TLSv1.3");
 146 
 147    /**
 148     * Returns true if the given client's SSL parameter protocols contains at
 149     * least one TLS version that HTTP/2 requires.
 150     */
 151    private static final boolean hasRequiredHTTP2TLSVersion(HttpClient client) {
 152        String[] protos = client.sslParameters().getProtocols();
 153        if (protos != null) {
 154            return Arrays.stream(protos).filter(testRequiredHTTP2TLSVersion).findAny().isPresent();
 155        } else {
 156            return false;
 157        }
 158    }
 159 
 160     /**
 161      * Factory for retrieving HttpConnections. A connection can be retrieved
 162      * from the connection pool, or a new one created if none available.
 163      *
 164      * The given {@code addr} is the ultimate destination. Any proxies,
 165      * etc, are determined from the request. Returns a concrete instance which
 166      * is one of the following:
 167      *      {@link PlainHttpConnection}
 168      *      {@link PlainTunnelingConnection}
 169      *
 170      * The returned connection, if not from the connection pool, must have its,
 171      * connect() or connectAsync() method invoked, which ( when it completes
 172      * successfully ) renders the connection usable for requests.
 173      */
 174     public static HttpConnection getConnection(InetSocketAddress addr,
 175                                                HttpClientImpl client,
 176                                                HttpRequestImpl request,
 177                                                Version version) {
 178         HttpConnection c = null;
 179         InetSocketAddress proxy = request.proxy();
 180         if (proxy != null && proxy.isUnresolved()) {
 181             // The default proxy selector may select a proxy whose  address is
 182             // unresolved. We must resolve the address before connecting to it.
 183             proxy = new InetSocketAddress(proxy.getHostString(), proxy.getPort());
 184         }
 185         boolean secure = request.secure();
 186         ConnectionPool pool = client.connectionPool();
 187 
 188         if (!secure) {
 189             c = pool.getConnection(false, addr, proxy);
 190             if (c != null && c.isOpen() /* may have been eof/closed when in the pool */) {
 191                 final HttpConnection conn = c;
 192                 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
 193                             + ": plain connection retrieved from HTTP/1.1 pool");
 194                 return c;
 195             } else {
 196                 return getPlainConnection(addr, proxy, request, client);
 197             }
 198         } else {  // secure
 199             if (version != HTTP_2) { // only HTTP/1.1 connections are in the pool
 200                 c = pool.getConnection(true, addr, proxy);
 201             }
 202             if (c != null && c.isOpen()) {
 203                 final HttpConnection conn = c;
 204                 DEBUG_LOGGER.log(Level.DEBUG, () -> conn.getConnectionFlow()
 205                             + ": SSL connection retrieved from HTTP/1.1 pool");
 206                 return c;
 207             } else {
 208                 String[] alpn = null;
 209                 if (version == HTTP_2 && hasRequiredHTTP2TLSVersion(client)) {
 210                     alpn = new String[] { "h2", "http/1.1" };
 211                 }
 212                 return getSSLConnection(addr, proxy, alpn, request, client);
 213             }
 214         }
 215     }
 216 
 217     private static HttpConnection getSSLConnection(InetSocketAddress addr,
 218                                                    InetSocketAddress proxy,
 219                                                    String[] alpn,
 220                                                    HttpRequestImpl request,
 221                                                    HttpClientImpl client) {
 222         if (proxy != null)
 223             return new AsyncSSLTunnelConnection(addr, client, alpn, proxy,
 224                                                 proxyTunnelHeaders(request));
 225         else
 226             return new AsyncSSLConnection(addr, client, alpn);
 227     }
 228 
 229     /**
 230      * This method is used to build a filter that will accept or
 231      * veto (header-name, value) tuple for transmission on the
 232      * wire.
 233      * The filter is applied to the headers when sending the headers
 234      * to the remote party.
 235      * Which tuple is accepted/vetoed depends on:
 236      * <pre>
 237      *    - whether the connection is a tunnel connection
 238      *      [talking to a server through a proxy tunnel]
 239      *    - whether the method is CONNECT
 240      *      [establishing a CONNECT tunnel through a proxy]
 241      *    - whether the request is using a proxy
 242      *      (and the connection is not a tunnel)
 243      *      [talking to a server through a proxy]
 244      *    - whether the request is a direct connection to
 245      *      a server (no tunnel, no proxy).
 246      * </pre>
 247      * @param request
 248      * @return
 249      */
 250     BiPredicate<String,List<String>> headerFilter(HttpRequestImpl request) {
 251         if (isTunnel()) {
 252             // talking to a server through a proxy tunnel
 253             // don't send proxy-* headers to a plain server
 254             assert !request.isConnect();
 255             return Utils.NO_PROXY_HEADERS_FILTER;
 256         } else if (request.isConnect()) {
 257             // establishing a proxy tunnel
 258             // check for proxy tunnel disabled schemes
 259             // assert !this.isTunnel();
 260             assert request.proxy() == null;
 261             return Utils.PROXY_TUNNEL_FILTER;
 262         } else if (request.proxy() != null) {
 263             // talking to a server through a proxy (no tunnel)
 264             // check for proxy disabled schemes
 265             // assert !isTunnel() && !request.isConnect();
 266             return Utils.PROXY_FILTER;
 267         } else {
 268             // talking to a server directly (no tunnel, no proxy)
 269             // don't send proxy-* headers to a plain server
 270             // assert request.proxy() == null && !request.isConnect();
 271             return Utils.NO_PROXY_HEADERS_FILTER;
 272         }
 273     }
 274 
 275     // Composes a new immutable HttpHeaders that combines the
 276     // user and system header but only keeps those headers that
 277     // start with "proxy-"
 278     private static HttpHeaders proxyTunnelHeaders(HttpRequestImpl request) {
 279         Map<String, List<String>> combined = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
 280         combined.putAll(request.getSystemHeaders().map());
 281         combined.putAll(request.headers().map()); // let user override system
 282 
 283         // keep only proxy-* - and also strip authorization headers
 284         // for disabled schemes
 285         return ImmutableHeaders.of(combined, Utils.PROXY_TUNNEL_FILTER);
 286     }
 287 
 288     /* Returns either a plain HTTP connection or a plain tunnelling connection
 289      * for proxied WebSocket */
 290     private static HttpConnection getPlainConnection(InetSocketAddress addr,
 291                                                      InetSocketAddress proxy,
 292                                                      HttpRequestImpl request,
 293                                                      HttpClientImpl client) {
 294         if (request.isWebSocket() && proxy != null)
 295             return new PlainTunnelingConnection(addr, proxy, client,
 296                                                 proxyTunnelHeaders(request));
 297 
 298         if (proxy == null)
 299             return new PlainHttpConnection(addr, client);
 300         else
 301             return new PlainProxyConnection(proxy, client);
 302     }
 303 
 304     void closeOrReturnToCache(HttpHeaders hdrs) {
 305         if (hdrs == null) {
 306             // the connection was closed by server, eof
 307             close();
 308             return;
 309         }
 310         if (!isOpen()) {
 311             return;
 312         }
 313         HttpClientImpl client = client();
 314         if (client == null) {
 315             close();
 316             return;
 317         }
 318         ConnectionPool pool = client.connectionPool();
 319         boolean keepAlive = hdrs.firstValue("Connection")
 320                 .map((s) -> !s.equalsIgnoreCase("close"))
 321                 .orElse(true);
 322 
 323         if (keepAlive) {
 324             Log.logTrace("Returning connection to the pool: {0}", this);
 325             pool.returnToPool(this);
 326         } else {
 327             close();
 328         }
 329     }
 330 
 331     /* Tells whether or not this connection is a tunnel through a proxy */
 332     boolean isTunnel() { return false; }
 333 
 334     abstract SocketChannel channel();
 335 
 336     final InetSocketAddress address() {
 337         return address;
 338     }
 339 
 340     abstract ConnectionPool.CacheKey cacheKey();
 341 
 342     /**
 343      * Closes this connection, by returning the socket to its connection pool.
 344      */
 345     @Override
 346     public abstract void close();
 347 
 348     abstract void shutdownInput() throws IOException;
 349 
 350     abstract void shutdownOutput() throws IOException;
 351 
 352     // Support for WebSocket/RawChannelImpl which unfortunately
 353     // still depends on synchronous read/writes.
 354     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
 355     abstract static class DetachedConnectionChannel implements Closeable {
 356         DetachedConnectionChannel() {}
 357         abstract SocketChannel channel();
 358         abstract long write(ByteBuffer[] buffers, int start, int number)
 359                 throws IOException;
 360         abstract void shutdownInput() throws IOException;
 361         abstract void shutdownOutput() throws IOException;
 362         abstract ByteBuffer read() throws IOException;
 363         @Override
 364         public abstract void close();
 365         @Override
 366         public String toString() {
 367             return this.getClass().getSimpleName() + ": " + channel().toString();
 368         }
 369     }
 370 
 371     // Support for WebSocket/RawChannelImpl which unfortunately
 372     // still depends on synchronous read/writes.
 373     // It should be removed when RawChannelImpl moves to using asynchronous APIs.
 374     abstract DetachedConnectionChannel detachChannel();
 375 
 376     abstract FlowTube getConnectionFlow();
 377 
 378     /**
 379      * A publisher that makes it possible to publish (write)
 380      * ordered (normal priority) and unordered (high priority)
 381      * buffers downstream.
 382      */
 383     final class PlainHttpPublisher implements HttpPublisher {
 384         final Object reading;
 385         PlainHttpPublisher() {
 386             this(new Object());
 387         }
 388         PlainHttpPublisher(Object readingLock) {
 389             this.reading = readingLock;
 390         }
 391         final ConcurrentLinkedDeque<List<ByteBuffer>> queue = new ConcurrentLinkedDeque<>();
 392         volatile Flow.Subscriber<? super List<ByteBuffer>> subscriber;
 393         volatile HttpWriteSubscription subscription;
 394         final SequentialScheduler writeScheduler =
 395                     new SequentialScheduler(this::flushTask);
 396         @Override
 397         public void subscribe(Flow.Subscriber<? super List<ByteBuffer>> subscriber) {
 398             synchronized (reading) {
 399                 //assert this.subscription == null;
 400                 //assert this.subscriber == null;
 401                 if (subscription == null) {
 402                     subscription = new HttpWriteSubscription();
 403                 }
 404                 this.subscriber = subscriber;
 405             }
 406             // TODO: should we do this in the flow?
 407             subscriber.onSubscribe(subscription);
 408             signal();
 409         }
 410 
 411         void flushTask(DeferredCompleter completer) {
 412             try {
 413                 HttpWriteSubscription sub = subscription;
 414                 if (sub != null) sub.flush();
 415             } finally {
 416                 completer.complete();
 417             }
 418         }
 419 
 420         void signal() {
 421             writeScheduler.runOrSchedule();
 422         }
 423 
 424         final class HttpWriteSubscription implements Flow.Subscription {
 425             final Demand demand = new Demand();
 426 
 427             @Override
 428             public void request(long n) {
 429                 if (n <= 0) throw new IllegalArgumentException("non-positive request");
 430                 demand.increase(n);
 431                 debug.log(Level.DEBUG, () -> "HttpPublisher: got request of "
 432                             + n + " from "
 433                             + getConnectionFlow());
 434                 writeScheduler.runOrSchedule();
 435             }
 436 
 437             @Override
 438             public void cancel() {
 439                 debug.log(Level.DEBUG, () -> "HttpPublisher: cancelled by "
 440                           + getConnectionFlow());
 441             }
 442 
 443             void flush() {
 444                 while (!queue.isEmpty() && demand.tryDecrement()) {
 445                     List<ByteBuffer> elem = queue.poll();
 446                     debug.log(Level.DEBUG, () -> "HttpPublisher: sending "
 447                                 + Utils.remaining(elem) + " bytes ("
 448                                 + elem.size() + " buffers) to "
 449                                 + getConnectionFlow());
 450                     subscriber.onNext(elem);
 451                 }
 452             }
 453         }
 454 
 455         @Override
 456         public void enqueue(List<ByteBuffer> buffers) throws IOException {
 457             queue.add(buffers);
 458             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 459             debug.log(Level.DEBUG, "added %d bytes to the write queue", bytes);
 460         }
 461 
 462         @Override
 463         public void enqueueUnordered(List<ByteBuffer> buffers) throws IOException {
 464             // Unordered frames are sent before existing frames.
 465             int bytes = buffers.stream().mapToInt(ByteBuffer::remaining).sum();
 466             queue.addFirst(buffers);
 467             debug.log(Level.DEBUG, "inserted %d bytes in the write queue", bytes);
 468         }
 469 
 470         @Override
 471         public void signalEnqueued() throws IOException {
 472             debug.log(Level.DEBUG, "signalling the publisher of the write queue");
 473             signal();
 474         }
 475     }
 476 
 477     String dbgTag = null;
 478     final String dbgString() {
 479         FlowTube flow = getConnectionFlow();
 480         String tag = dbgTag;
 481         if (tag == null && flow != null) {
 482             dbgTag = tag = this.getClass().getSimpleName() + "(" + flow + ")";
 483         } else if (tag == null) {
 484             tag = this.getClass().getSimpleName() + "(?)";
 485         }
 486         return tag;
 487     }
 488 
 489     @Override
 490     public String toString() {
 491         return "HttpConnection: " + channel().toString();
 492     }
 493 }