449
450 @Override
451 public synchronized void accept(long n) {
452 if (n < 1) {
453 throw new InternalError("FlowController.accept called with " + n);
454 }
455 if (permits == 0) {
456 permits += n;
457 notifyAll();
458 } else {
459 permits += n;
460 }
461 }
462
463 public synchronized void take() throws InterruptedException {
464 take(1);
465 }
466
467 public synchronized void take(int amount) throws InterruptedException {
468 assert permits >= 0;
469 while (permits < amount) {
470 int n = Math.min(amount, permits);
471 permits -= n;
472 amount -= n;
473 if (amount > 0)
474 wait();
475 }
476 }
477 }
478
479 @Override
480 void sendHeadersOnly() throws IOException, InterruptedException {
481 if (Log.requests() && request != null) {
482 Log.logRequest(request.toString());
483 }
484 requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
485 OutgoingHeaders f = headerFrame(requestContentLen);
486 connection.sendFrame(f);
487 }
488
489 @Override
490 void sendBody() throws IOException, InterruptedException {
491 sendBodyImpl();
492 }
493
494 void registerStream(int id) {
495 this.streamid = id;
496 connection.putStream(this, streamid);
497 }
498
499 DataFrame getDataFrame() throws IOException, InterruptedException {
500 userRequestFlowController.take();
501 int maxpayloadLen = connection.getMaxSendFrameSize() - 9;
502 ByteBuffer buffer = connection.getBuffer();
503 buffer.limit(maxpayloadLen);
504 boolean complete = requestProcessor.onRequestBodyChunk(buffer);
505 buffer.flip();
506 int amount = buffer.remaining();
507 // wait for flow control if necessary. Following method will block
508 // until after headers frame is sent, so correct streamid is set.
509 remoteRequestFlowController.take(amount);
510 connection.obtainSendWindow(amount);
511
512 DataFrame df = new DataFrame();
513 df.streamid(streamid);
514 if (complete) {
515 df.setFlag(DataFrame.END_STREAM);
516 }
517 df.setData(buffer);
518 df.computeLength();
519 return df;
520 }
521
|
449
450 @Override
451 public synchronized void accept(long n) {
452 if (n < 1) {
453 throw new InternalError("FlowController.accept called with " + n);
454 }
455 if (permits == 0) {
456 permits += n;
457 notifyAll();
458 } else {
459 permits += n;
460 }
461 }
462
463 public synchronized void take() throws InterruptedException {
464 take(1);
465 }
466
467 public synchronized void take(int amount) throws InterruptedException {
468 assert permits >= 0;
469 do {
470 int n = Math.min(amount, permits);
471 permits -= n;
472 amount -= n;
473 if (amount > 0)
474 wait();
475 } while (permits < amount);
476 }
477 }
478
479 @Override
480 void sendHeadersOnly() throws IOException, InterruptedException {
481 if (Log.requests() && request != null) {
482 Log.logRequest(request.toString());
483 }
484 requestContentLen = requestProcessor.onRequestStart(request, userRequestFlowController);
485 OutgoingHeaders f = headerFrame(requestContentLen);
486 connection.sendFrame(f);
487 }
488
489 @Override
490 void sendBody() throws IOException, InterruptedException {
491 sendBodyImpl();
492 }
493
494 void registerStream(int id) {
495 this.streamid = id;
496 connection.putStream(this, streamid);
497 }
498
499 DataFrame getDataFrame() throws IOException, InterruptedException {
500 userRequestFlowController.take();
501 int maxpayloadLen = connection.getMaxSendFrameSize();
502 ByteBuffer buffer = connection.getBuffer();
503 buffer.limit(maxpayloadLen);
504 boolean complete = requestProcessor.onRequestBodyChunk(buffer);
505 buffer.flip();
506 int amount = buffer.remaining();
507 // wait for flow control if necessary. Following method will block
508 // until after headers frame is sent, so correct streamid is set.
509 remoteRequestFlowController.take(amount);
510 connection.obtainSendWindow(amount);
511
512 DataFrame df = new DataFrame();
513 df.streamid(streamid);
514 if (complete) {
515 df.setFlag(DataFrame.END_STREAM);
516 }
517 df.setData(buffer);
518 df.computeLength();
519 return df;
520 }
521
|