1 /*
2 * Copyright (c) 2015, 2016, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 */
24
25 package java.net.http;
26
27 import sun.net.httpclient.hpack.DecodingCallback;
28
29 import java.io.IOException;
30 import java.net.URI;
31 import java.nio.ByteBuffer;
32 import java.util.LinkedList;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.concurrent.CompletableFuture;
36 import java.util.concurrent.CompletionException;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.TimeUnit;
39 import java.util.concurrent.TimeoutException;
40 import java.util.function.BiFunction;
41 import java.util.function.LongConsumer;
42
43 /**
44 * Http/2 Stream handling.
45 *
46 * REQUESTS
47 *
48 * sendHeadersOnly() -- assembles HEADERS frame and puts on connection outbound Q
49 *
50 * sendRequest() -- sendHeadersOnly() + sendBody()
51 *
52 * sendBody() -- in calling thread: obeys all flow control (so may block)
53 * obtains data from request body processor and places on connection
54 * outbound Q.
55 *
56 * sendBodyAsync() -- calls sendBody() in an executor thread.
57 *
58 * sendHeadersAsync() -- calls sendHeadersOnly() which does not block
59 *
60 * sendRequestAsync() -- calls sendRequest() in an executor thread
61 *
62 * RESPONSES
63 *
 * Multiple responses can be received per request. Responses are queued up on
 * a List of CF<HttpResponseImpl> and the first one on the list is completed
 * with the next response
67 *
68 * getResponseAsync() -- queries list of response CFs and returns first one
69 * if one exists. Otherwise, creates one and adds it to list
70 * and returns it. Completion is achieved through the
71 * incoming() upcall from connection reader thread.
72 *
73 * getResponse() -- calls getResponseAsync() and waits for CF to complete
74 *
75 * responseBody() -- in calling thread: blocks for incoming DATA frames on
76 * stream inputQ. Obeys remote and local flow control so may block.
77 * Calls user response body processor with data buffers.
78 *
79 * responseBodyAsync() -- calls responseBody() in an executor thread.
80 *
81 * incoming() -- entry point called from connection reader thread. Frames are
82 * either handled immediately without blocking or for data frames
83 * placed on the stream's inputQ which is consumed by the stream's
84 * reader thread.
85 *
86 * PushedStream sub class
87 * ======================
88 * Sending side methods are not used because the request comes from a PUSH_PROMISE
89 * frame sent by the server. When a PUSH_PROMISE is received the PushedStream
 * is created. PushedStream does not use the responseCF list as there can be only
 * one response. The CF is created when the object is created, and when the response
 * HEADERS frame is received the CF is completed.
93 */
94 class Stream extends ExchangeImpl {
95
96 final Queue<Http2Frame> inputQ;
97
98 volatile int streamid;
99
100 long responseContentLen = -1;
101 long responseBytesProcessed = 0;
102 long requestContentLen;
103
104 Http2Connection connection;
105 HttpClientImpl client;
106 final HttpRequestImpl request;
107 final DecodingCallback rspHeadersConsumer;
108 HttpHeadersImpl responseHeaders;
109 final HttpHeadersImpl requestHeaders;
110 final HttpHeadersImpl requestPseudoHeaders;
111 HttpResponse.BodyProcessor<?> responseProcessor;
112 final HttpRequest.BodyProcessor requestProcessor;
113 HttpResponse response;
114
115 // state flags
116 boolean requestSent, responseReceived;
117
118 final FlowController userRequestFlowController =
119 new FlowController();
120 final FlowController remoteRequestFlowController =
121 new FlowController();
122 final FlowController responseFlowController =
123 new FlowController();
124
125 final ExecutorWrapper executor;
126
127 @Override
128 @SuppressWarnings("unchecked")
129 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
130 this.responseProcessor = processor;
131 CompletableFuture<T> cf;
132 try {
133 T body = processor.onResponseBodyStart(
134 responseContentLen, responseHeaders,
135 responseFlowController); // TODO: filter headers
136 if (body != null) {
137 cf = CompletableFuture.completedFuture(body);
138 receiveDataAsync(processor);
139 } else
140 cf = receiveDataAsync(processor);
141 } catch (IOException e) {
142 cf = CompletableFuture.failedFuture(e);
143 }
144 PushGroup<?> pg = request.pushGroup();
145 if (pg != null) {
146 // if an error occurs make sure it is recorded in the PushGroup
147 cf = cf.whenComplete((t,e) -> pg.pushError(e));
148 }
149 return cf;
150 }
151
152 @Override
153 public String toString() {
154 StringBuilder sb = new StringBuilder();
155 sb.append("streamid: ")
156 .append(streamid);
157 return sb.toString();
158 }
159
160 // pushes entire response body into response processor
161 // blocking when required by local or remote flow control
162 void receiveData() throws IOException {
163 Http2Frame frame;
164 DataFrame df = null;
165 try {
166 do {
167 frame = inputQ.take();
168 if (!(frame instanceof DataFrame)) {
169 assert false;
170 continue;
171 }
172 df = (DataFrame) frame;
173 int len = df.getDataLength();
174 ByteBuffer[] buffers = df.getData();
175 for (ByteBuffer b : buffers) {
176 responseFlowController.take();
177 responseProcessor.onResponseBodyChunk(b);
178 }
179 sendWindowUpdate(len);
180 } while (!df.getFlag(DataFrame.END_STREAM));
181 } catch (InterruptedException e) {
182 throw new IOException(e);
183 }
184 }
185
186 private <T> CompletableFuture<T> receiveDataAsync(HttpResponse.BodyProcessor<T> processor) {
187 CompletableFuture<T> cf = new CompletableFuture<>();
188 executor.execute(() -> {
189 try {
190 receiveData();
191 T body = processor.onResponseComplete();
192 cf.complete(body);
193 responseReceived();
194 } catch (Throwable t) {
195 cf.completeExceptionally(t);
196 }
197 }, null);
198 return cf;
199 }
200
201 private void sendWindowUpdate(int increment)
202 throws IOException, InterruptedException {
203 if (increment == 0)
204 return;
205 LinkedList<Http2Frame> list = new LinkedList<>();
206 WindowUpdateFrame frame = new WindowUpdateFrame();
207 frame.streamid(streamid);
208 frame.setUpdate(increment);
209 list.add(frame);
210 frame = new WindowUpdateFrame();
211 frame.streamid(0);
212 frame.setUpdate(increment);
213 list.add(frame);
214 connection.sendFrames(list);
215 }
216
217 @Override
218 CompletableFuture<Void> sendBodyAsync() {
219 final CompletableFuture<Void> cf = new CompletableFuture<>();
220 executor.execute(() -> {
221 try {
222 sendBodyImpl();
223 cf.complete(null);
224 } catch (IOException | InterruptedException e) {
225 cf.completeExceptionally(e);
226 }
227 }, null);
228 return cf;
229 }
230
/**
 * Creates a stream for the request carried by the given Exchange. The
 * request body processor is obtained from that request.
 */
@SuppressWarnings("unchecked")
Stream(HttpClientImpl client, Http2Connection connection, Exchange e) {
    super(e);
    this.client = client;
    this.connection = connection;
    this.executor = client.executorWrapper();
    this.inputQ = new Queue<>();

    this.request = e.request();
    this.requestProcessor = request.requestProcessor();
    this.requestHeaders = new HttpHeadersImpl();
    this.requestPseudoHeaders = new HttpHeadersImpl();

    this.responseHeaders = new HttpHeadersImpl();
    // decoded response header fields are accumulated in responseHeaders
    this.rspHeadersConsumer = (name, value) ->
            responseHeaders.addHeader(name.toString(), value.toString());
}
249
/**
 * Creates a stream directly from a request, without an Exchange and
 * without a request body processor (used by PushedStream).
 */
@SuppressWarnings("unchecked")
Stream(HttpClientImpl client, Http2Connection connection, HttpRequestImpl req) {
    super(null);
    this.client = client;
    this.connection = connection;
    this.executor = client.executorWrapper();
    this.inputQ = new Queue<>();

    this.request = req;
    this.requestProcessor = null;
    this.requestHeaders = new HttpHeadersImpl();
    this.requestPseudoHeaders = new HttpHeadersImpl();

    this.responseHeaders = new HttpHeadersImpl();
    // decoded response header fields are accumulated in responseHeaders
    this.rspHeadersConsumer = (name, value) ->
            responseHeaders.addHeader(name.toString(), value.toString());
}
268
269 /**
270 * Entry point from Http2Connection reader thread.
271 *
272 * Data frames will be removed by response body thread.
273 *
274 * @param frame
275 * @throws IOException
276 */
277 void incoming(Http2Frame frame) throws IOException, InterruptedException {
278 if ((frame instanceof HeaderFrame) && ((HeaderFrame)frame).endHeaders()) {
279 // Complete headers accumulated. handle response.
280 // It's okay if there are multiple HeaderFrames.
281 handleResponse();
282 } else if (frame instanceof DataFrame) {
283 inputQ.put(frame);
284 } else {
285 otherFrame(frame);
286 }
287 }
288
289 void otherFrame(Http2Frame frame) throws IOException {
290 switch (frame.type()) {
291 case WindowUpdateFrame.TYPE:
292 incoming_windowUpdate((WindowUpdateFrame) frame);
293 break;
294 case ResetFrame.TYPE:
295 incoming_reset((ResetFrame) frame);
296 break;
297 case PriorityFrame.TYPE:
298 incoming_priority((PriorityFrame) frame);
299 break;
300 default:
301 String msg = "Unexpected frame: " + frame.toString();
302 throw new IOException(msg);
303 }
304 }
305
306 // The Hpack decoder decodes into one of these consumers of name,value pairs
307
308 DecodingCallback rspHeadersConsumer() {
309 return rspHeadersConsumer;
310 }
311
312 // create and return the HttpResponseImpl
313 protected void handleResponse() throws IOException {
314 HttpConnection c = connection.connection; // TODO: improve
315 long statusCode = responseHeaders
316 .firstValueAsLong(":status")
317 .orElseThrow(() -> new IOException("no statuscode in response"));
318
319 this.response = new HttpResponseImpl((int)statusCode, exchange, responseHeaders, null,
320 c.sslParameters(), HttpClient.Version.HTTP_2, c);
321 this.responseContentLen = responseHeaders
322 .firstValueAsLong("content-length")
323 .orElse(-1L);
324 // different implementations for normal streams and pushed streams
325 completeResponse(response);
326 }
327
328 void incoming_reset(ResetFrame frame) {
329 // TODO: implement reset
330 int error = frame.getErrorCode();
331 IOException e = new IOException(ErrorFrame.stringForCode(error));
332 completeResponseExceptionally(e);
333 throw new UnsupportedOperationException("Not implemented");
334 }
335
336 void incoming_priority(PriorityFrame frame) {
337 // TODO: implement priority
338 throw new UnsupportedOperationException("Not implemented");
339 }
340
341 void incoming_windowUpdate(WindowUpdateFrame frame) {
342 int amount = frame.getUpdate();
343 if (amount > 0)
344 remoteRequestFlowController.accept(amount);
345 }
346
347 void incoming_pushPromise(HttpRequestImpl pushReq, PushedStream pushStream) throws IOException {
348 if (Log.requests()) {
349 Log.logRequest("PUSH_PROMISE: " + pushReq.toString());
350 }
351 PushGroup<?> pushGroup = request.pushGroup();
352 if (pushGroup == null) {
353 cancelImpl(new IllegalStateException("unexpected push promise"));
354 }
355 // get the handler and call it.
356 BiFunction<HttpRequest,CompletableFuture<HttpResponse>,Boolean> ph =
357 pushGroup.pushHandler();
358
359 CompletableFuture<HttpResponse> pushCF = pushStream
360 .getResponseAsync(null)
361 .thenApply(r -> (HttpResponse)r);
362 boolean accept = ph.apply(pushReq, pushCF);
363 if (!accept) {
364 IOException ex = new IOException("Stream cancelled by user");
365 cancelImpl(ex);
366 pushCF.completeExceptionally(ex);
367 } else {
368 pushStream.requestSent();
369 pushGroup.addPush();
370 }
371 }
372
373 private OutgoingHeaders headerFrame(long contentLength) {
374 HttpHeadersImpl h = request.getSystemHeaders();
375 if (contentLength > 0) {
376 h.setHeader("content-length", Long.toString(contentLength));
377 }
378 setPseudoHeaderFields();
379 OutgoingHeaders f = new OutgoingHeaders(h, request.getUserHeaders(), this);
380 if (contentLength == 0) {
381 f.setFlag(HeadersFrame.END_STREAM);
382 }
383 return f;
384 }
385
386 private void setPseudoHeaderFields() {
387 HttpHeadersImpl hdrs = requestPseudoHeaders;
388 String method = request.method();
389 hdrs.setHeader(":method", method);
390 URI uri = request.uri();
391 hdrs.setHeader(":scheme", uri.getScheme());
392 // TODO: userinfo deprecated. Needs to be removed
393 hdrs.setHeader(":authority", uri.getAuthority());
394 // TODO: ensure header names beginning with : not in user headers
395 String query = uri.getQuery();
396 String path = uri.getPath();
397 if (path == null) {
398 if (method.equalsIgnoreCase("OPTIONS")) {
399 path = "*";
400 } else {
401 path = "/";
402 }
403 }
404 if (query != null) {
405 path += "?" + query;
406 }
407 hdrs.setHeader(":path", path);
408 }
409
410 HttpHeadersImpl getRequestPseudoHeaders() {
411 return requestPseudoHeaders;
412 }
413
414 @Override
415 HttpResponseImpl getResponse() throws IOException {
416 try {
417 if (request.timeval() > 0) {
418 return getResponseAsync(null).get(
419 request.timeval(), TimeUnit.MILLISECONDS);
420 } else {
421 return getResponseAsync(null).join();
422 }
423 } catch (TimeoutException e) {
424 throw new HttpTimeoutException("Response timed out");
425 } catch (InterruptedException | ExecutionException | CompletionException e) {
426 Throwable t = e.getCause();
427 if (t instanceof IOException) {
428 throw (IOException)t;
429 }
430 throw new IOException(e);
431 }
432 }
433
434 @Override
435 void sendRequest() throws IOException, InterruptedException {
436 sendHeadersOnly();
437 sendBody();
438 }
439
/**
 * A simple general purpose blocking flow controller.
 *
 * Permits are added with accept() and consumed with take(). A take() that
 * asks for more permits than are available consumes what is there and
 * waits for further accept() calls until its whole request is satisfied.
 * All methods synchronize on this object.
 */
class FlowController implements LongConsumer {
    int permits;

    FlowController() {
        this.permits = 0;
    }

    /**
     * Adds n permits, waking waiting takers if none were available.
     *
     * @param n number of permits to add; must be at least 1
     * @throws InternalError if n is less than 1
     */
    @Override
    public synchronized void accept(long n) {
        if (n < 1) {
            throw new InternalError("FlowController.accept called with " + n);
        }
        // takers only wait after draining the pool, so a wake-up is
        // needed only on the transition away from zero
        boolean wasEmpty = (permits == 0);
        permits += n;
        if (wasEmpty) {
            notifyAll();
        }
    }

    /** Consumes a single permit, waiting until one is available. */
    public synchronized void take() throws InterruptedException {
        take(1);
    }

    /**
     * Consumes the given number of permits, waiting as long as necessary.
     *
     * @param amount number of permits to consume; values below 1 consume
     *        nothing
     * @throws InterruptedException if interrupted while waiting
     */
    public synchronized void take(int amount) throws InterruptedException {
        assert permits >= 0;
        // Loop on the outstanding amount, not on the shortfall: permits
        // must be deducted even when enough are available immediately.
        while (amount > 0) {
            int n = Math.min(amount, permits);
            permits -= n;
            amount -= n;
            if (amount > 0)
                wait();
        }
    }
}
478
479 @Override
480 void sendHeadersOnly() throws IOException, InterruptedException {
481 if (Log.requests() && request != null) {
482 Log.logRequest(request.toString());
483 }
484 requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
485 OutgoingHeaders f = headerFrame(requestContentLen);
486 connection.sendFrame(f);
487 }
488
489 @Override
490 void sendBody() throws IOException, InterruptedException {
491 sendBodyImpl();
492 }
493
494 void registerStream(int id) {
495 this.streamid = id;
496 connection.putStream(this, streamid);
497 }
498
499 DataFrame getDataFrame() throws IOException, InterruptedException {
500 userRequestFlowController.take();
501 int maxpayloadLen = connection.getMaxSendFrameSize() - 9;
502 ByteBuffer buffer = connection.getBuffer();
503 buffer.limit(maxpayloadLen);
504 boolean complete = requestProcessor.onRequestBodyChunk(buffer);
505 buffer.flip();
506 int amount = buffer.remaining();
507 // wait for flow control if necessary. Following method will block
508 // until after headers frame is sent, so correct streamid is set.
509 remoteRequestFlowController.take(amount);
510 connection.obtainSendWindow(amount);
511
512 DataFrame df = new DataFrame();
513 df.streamid(streamid);
514 if (complete) {
515 df.setFlag(DataFrame.END_STREAM);
516 }
517 df.setData(buffer);
518 df.computeLength();
519 return df;
520 }
521
522
523 @Override
524 CompletableFuture<Void> sendHeadersAsync() {
525 try {
526 sendHeadersOnly();
527 return CompletableFuture.completedFuture(null);
528 } catch (IOException | InterruptedException ex) {
529 return CompletableFuture.failedFuture(ex);
530 }
531 }
532
533 /**
534 * A List of responses relating to this stream. Normally there is only
535 * one response, but intermediate responses like 100 are allowed
536 * and must be passed up to higher level before continuing. Deals with races
537 * such as if responses are returned before the CFs get created by
538 * getResponseAsync()
539 */
540
541 final List<CompletableFuture<HttpResponseImpl>> response_cfs = new ArrayList<>(5);
542
543 @Override
544 CompletableFuture<HttpResponseImpl> getResponseAsync(Void v) {
545 CompletableFuture<HttpResponseImpl> cf;
546 synchronized (response_cfs) {
547 if (!response_cfs.isEmpty()) {
548 cf = response_cfs.remove(0);
549 } else {
550 cf = new CompletableFuture<>();
551 response_cfs.add(cf);
552 }
553 }
554 PushGroup<?> pg = request.pushGroup();
555 if (pg != null) {
556 // if an error occurs make sure it is recorded in the PushGroup
557 cf = cf.whenComplete((t,e) -> pg.pushError(e));
558 }
559 return cf;
560 }
561
562 /**
563 * Completes the first uncompleted CF on list, and removes it. If there is no
564 * uncompleted CF then creates one (completes it) and adds to list
565 */
566 void completeResponse(HttpResponse r) {
567 HttpResponseImpl resp = (HttpResponseImpl)r;
568 synchronized (response_cfs) {
569 int cfs_len = response_cfs.size();
570 for (int i=0; i<cfs_len; i++) {
571 CompletableFuture<HttpResponseImpl> cf = response_cfs.get(i);
572 if (!cf.isDone()) {
573 cf.complete(resp);
574 response_cfs.remove(cf);
575 return;
576 }
577 }
578 response_cfs.add(CompletableFuture.completedFuture(resp));
579 }
580 }
581
582 // methods to update state and remove stream when finished
583
584 synchronized void requestSent() {
585 requestSent = true;
586 if (responseReceived)
587 connection.deleteStream(this);
588 }
589
590 synchronized void responseReceived() {
591 responseReceived = true;
592 if (requestSent)
593 connection.deleteStream(this);
594 PushGroup<?> pg = request.pushGroup();
595 if (pg != null)
596 pg.noMorePushes();
597 }
598
599 /**
600 * same as above but for errors
601 *
602 * @param t
603 */
604 void completeResponseExceptionally(Throwable t) {
605 synchronized (response_cfs) {
606 for (CompletableFuture<HttpResponseImpl> cf : response_cfs) {
607 if (!cf.isDone()) {
608 cf.completeExceptionally(t);
609 response_cfs.remove(cf);
610 return;
611 }
612 }
613 response_cfs.add(CompletableFuture.failedFuture(t));
614 }
615 }
616
617 void sendBodyImpl() throws IOException, InterruptedException {
618 if (requestContentLen == 0) {
619 // no body
620 requestSent();
621 return;
622 }
623 DataFrame df;
624 do {
625 df = getDataFrame();
626 // TODO: check accumulated content length (if not checked below)
627 connection.sendFrame(df);
628 } while (!df.getFlag(DataFrame.END_STREAM));
629 requestSent();
630 }
631
632 @Override
633 void cancel() {
634 cancelImpl(new Exception("Cancelled"));
635 }
636
637
638 void cancelImpl(Throwable e) {
639 Log.logTrace("cancelling stream: {0}\n", e.toString());
640 inputQ.close();
641 completeResponseExceptionally(e);
642 try {
643 connection.resetStream(streamid, ResetFrame.CANCEL);
644 } catch (IOException | InterruptedException ex) {
645 Log.logError(ex);
646 }
647 }
648
649 @Override
650 CompletableFuture<Void> sendRequestAsync() {
651 CompletableFuture<Void> cf = new CompletableFuture<>();
652 executor.execute(() -> {
653 try {
654 sendRequest();
655 cf.complete(null);
656 } catch (IOException |InterruptedException e) {
657 cf.completeExceptionally(e);
658 }
659 }, null);
660 return cf;
661 }
662
663 @Override
664 <T> T responseBody(HttpResponse.BodyProcessor<T> processor) throws IOException {
665 this.responseProcessor = processor;
666 T body = processor.onResponseBodyStart(
667 responseContentLen, responseHeaders,
668 responseFlowController); // TODO: filter headers
669 if (body == null) {
670 receiveData();
671 body = processor.onResponseComplete();
672 } else
673 receiveDataAsync(processor);
674 responseReceived();
675 return body;
676 }
677
678 // called from Http2Connection reader thread
679 synchronized void updateOutgoingWindow(int update) {
680 remoteRequestFlowController.accept(update);
681 }
682
683 void close(String msg) {
684 cancel();
685 }
686
687 static class PushedStream extends Stream {
688 final PushGroup<?> pushGroup;
689 final private Stream parent; // used by server push streams
690 // push streams need the response CF allocated up front as it is
691 // given directly to user via the multi handler callback function.
692 final CompletableFuture<HttpResponseImpl> pushCF;
693 final HttpRequestImpl pushReq;
694
695 PushedStream(PushGroup<?> pushGroup, HttpClientImpl client,
696 Http2Connection connection, Stream parent,
697 HttpRequestImpl pushReq) {
698 super(client, connection, pushReq);
699 this.pushGroup = pushGroup;
700 this.pushReq = pushReq;
701 this.pushCF = new CompletableFuture<>();
702 this.parent = parent;
703 }
704
705 // Following methods call the super class but in case of
706 // error record it in the PushGroup. The error method is called
707 // with a null value when no error occurred (is a no-op)
708 @Override
709 CompletableFuture<Void> sendBodyAsync() {
710 return super.sendBodyAsync()
711 .whenComplete((v, t) -> pushGroup.pushError(t));
712 }
713
714 @Override
715 CompletableFuture<Void> sendHeadersAsync() {
716 return super.sendHeadersAsync()
717 .whenComplete((v, t) -> pushGroup.pushError(t));
718 }
719
720 @Override
721 CompletableFuture<Void> sendRequestAsync() {
722 return super.sendRequestAsync()
723 .whenComplete((v, t) -> pushGroup.pushError(t));
724 }
725
726 @Override
727 CompletableFuture<HttpResponseImpl> getResponseAsync(Void vo) {
728 return pushCF.whenComplete((v, t) -> pushGroup.pushError(t));
729 }
730
731 @Override
732 <T> CompletableFuture<T> responseBodyAsync(HttpResponse.BodyProcessor<T> processor) {
733 return super.responseBodyAsync(processor)
734 .whenComplete((v, t) -> pushGroup.pushError(t));
735 }
736
737 @Override
738 void completeResponse(HttpResponse r) {
739 HttpResponseImpl resp = (HttpResponseImpl)r;
740 Utils.logResponse(resp);
741 pushCF.complete(resp);
742 }
743
744 @Override
745 void completeResponseExceptionally(Throwable t) {
746 pushCF.completeExceptionally(t);
747 }
748
749 @Override
750 synchronized void responseReceived() {
751 super.responseReceived();
752 pushGroup.pushCompleted();
753 }
754
755 // create and return the PushResponseImpl
756 @Override
757 protected void handleResponse() {
758 HttpConnection c = connection.connection; // TODO: improve
759 long statusCode = responseHeaders
760 .firstValueAsLong(":status")
761 .orElse(-1L);
762
763 if (statusCode == -1L)
764 completeResponseExceptionally(new IOException("No status code"));
765 ImmutableHeaders h = new ImmutableHeaders(responseHeaders, Utils.ALL_HEADERS);
766 this.response = new HttpResponseImpl((int)statusCode, pushReq, h, this,
767 c.sslParameters());
768 this.responseContentLen = responseHeaders
769 .firstValueAsLong("content-length")
770 .orElse(-1L);
771 // different implementations for normal streams and pushed streams
772 completeResponse(response);
773 }
774 }
775
776 /**
777 * One PushGroup object is associated with the parent Stream of
778 * the pushed Streams. This keeps track of all common state associated
779 * with the pushes.
780 */
781 static class PushGroup<T> {
782 // the overall completion object, completed when all pushes are done.
783 final CompletableFuture<T> resultCF;
784 Throwable error; // any exception that occured during pushes
785
786 // CF for main response
787 final CompletableFuture<HttpResponse> mainResponse;
788
789 // user's processor object
790 final HttpResponse.MultiProcessor<T> multiProcessor;
791
792 // per push handler function provided by processor
793 final private BiFunction<HttpRequest,
794 CompletableFuture<HttpResponse>,
795 Boolean> pushHandler;
796 int numberOfPushes;
797 int remainingPushes;
798 boolean noMorePushes = false;
799
800 PushGroup(HttpResponse.MultiProcessor<T> multiProcessor, HttpRequestImpl req) {
801 this.resultCF = new CompletableFuture<>();
802 this.mainResponse = new CompletableFuture<>();
803 this.multiProcessor = multiProcessor;
804 this.pushHandler = multiProcessor.onStart(req, mainResponse);
805 }
806
807 CompletableFuture<T> groupResult() {
808 return resultCF;
809 }
810
811 CompletableFuture<HttpResponse> mainResponse() {
812 return mainResponse;
813 }
814
815 private BiFunction<HttpRequest,
816 CompletableFuture<HttpResponse>, Boolean> pushHandler()
817 {
818 return pushHandler;
819 }
820
821 synchronized void addPush() {
822 numberOfPushes++;
823 remainingPushes++;
824 }
825
826 synchronized int numberOfPushes() {
827 return numberOfPushes;
828 }
829 // This is called when the main body response completes because it means
830 // no more PUSH_PROMISEs are possible
831 synchronized void noMorePushes() {
832 noMorePushes = true;
833 checkIfCompleted();
834 }
835
836 synchronized void pushCompleted() {
837 remainingPushes--;
838 checkIfCompleted();
839 }
840
841 synchronized void checkIfCompleted() {
842 if (remainingPushes == 0 && error == null && noMorePushes) {
843 T overallResult = multiProcessor.onComplete();
844 resultCF.complete(overallResult);
845 }
846 }
847
848 synchronized void pushError(Throwable t) {
849 if (t == null)
850 return;
851 this.error = t;
852 resultCF.completeExceptionally(t);
853 }
854 }
855 }