1 /*
2 * Copyright (c) 2008, 2011, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.nio.channels.*;
29 import java.nio.ByteBuffer;
30 import java.nio.BufferOverflowException;
31 import java.net.*;
32 import java.util.concurrent.*;
33 import java.io.IOException;
34 import sun.misc.Unsafe;
35
36 /**
37 * Windows implementation of AsynchronousSocketChannel using overlapped I/O.
38 */
39
40 class WindowsAsynchronousSocketChannelImpl
41 extends AsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
42 {
43 private static final Unsafe unsafe = Unsafe.getUnsafe();
44 private static int addressSize = unsafe.addressSize();
45
46 private static int dependsArch(int value32, int value64) {
47 return (addressSize == 4) ? value32 : value64;
48 }
49
50 /*
51 * typedef struct _WSABUF {
52 * u_long len;
53 * char FAR * buf;
54 * } WSABUF;
55 */
56 private static final int SIZEOF_WSABUF = dependsArch(8, 16);
57 private static final int OFFSETOF_LEN = 0;
58 private static final int OFFSETOF_BUF = dependsArch(4, 8);
59
60 // maximum vector size for scatter/gather I/O
61 private static final int MAX_WSABUF = 16;
62
63 private static final int SIZEOF_WSABUFARRAY = MAX_WSABUF * SIZEOF_WSABUF;
64
65
66 // socket handle. Use begin()/end() around each usage of this handle.
67 final long handle;
68
69 // I/O completion port that the socket is associated with
70 private final Iocp iocp;
71
72 // completion key to identify channel when I/O completes
73 private final int completionKey;
74
75 // Pending I/O operations are tied to an OVERLAPPED structure that can only
76 // be released when the I/O completion event is posted to the completion
77 // port. Where I/O operations complete immediately then it is possible
78 // there may be more than two OVERLAPPED structures in use.
79 private final PendingIoCache ioCache;
80
81 // per-channel arrays of WSABUF structures
82 private final long readBufferArray;
83 private final long writeBufferArray;
84
85
86 WindowsAsynchronousSocketChannelImpl(Iocp iocp, boolean failIfGroupShutdown)
87 throws IOException
88 {
89 super(iocp);
90
91 // associate socket with default completion port
92 long h = IOUtil.fdVal(fd);
93 int key = 0;
94 try {
95 key = iocp.associate(this, h);
96 } catch (ShutdownChannelGroupException x) {
97 if (failIfGroupShutdown) {
98 closesocket0(h);
99 throw x;
100 }
101 } catch (IOException x) {
102 closesocket0(h);
103 throw x;
104 }
105
106 this.handle = h;
107 this.iocp = iocp;
108 this.completionKey = key;
109 this.ioCache = new PendingIoCache();
110
111 // allocate WSABUF arrays
112 this.readBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
113 this.writeBufferArray = unsafe.allocateMemory(SIZEOF_WSABUFARRAY);
114 }
115
116 WindowsAsynchronousSocketChannelImpl(Iocp iocp) throws IOException {
117 this(iocp, true);
118 }
119
120 @Override
121 public AsynchronousChannelGroupImpl group() {
122 return iocp;
123 }
124
125 /**
126 * Invoked by Iocp when an I/O operation competes.
127 */
128 @Override
129 public <V,A> PendingFuture<V,A> getByOverlapped(long overlapped) {
130 return ioCache.remove(overlapped);
131 }
132
133 // invoked by WindowsAsynchronousServerSocketChannelImpl
134 long handle() {
135 return handle;
136 }
137
138 // invoked by WindowsAsynchronousServerSocketChannelImpl when new connection
139 // accept
140 void setConnected(SocketAddress localAddress, SocketAddress remoteAddress) {
141 synchronized (stateLock) {
142 state = ST_CONNECTED;
143 this.localAddress = localAddress;
144 this.remoteAddress = remoteAddress;
145 }
146 }
147
148 @Override
149 void implClose() throws IOException {
150 // close socket (may cause outstanding async I/O operations to fail).
151 closesocket0(handle);
152
153 // waits until all I/O operations have completed
154 ioCache.close();
155
156 // release arrays of WSABUF structures
157 unsafe.freeMemory(readBufferArray);
158 unsafe.freeMemory(writeBufferArray);
159
160 // finally disassociate from the completion port (key can be 0 if
161 // channel created when group is shutdown)
162 if (completionKey != 0)
163 iocp.disassociate(completionKey);
164 }
165
166 @Override
167 public void onCancel(PendingFuture<?,?> task) {
168 if (task.getContext() instanceof ConnectTask)
169 killConnect();
170 if (task.getContext() instanceof ReadTask)
171 killReading();
172 if (task.getContext() instanceof WriteTask)
173 killWriting();
174 }
175
176 /**
177 * Implements the task to initiate a connection and the handler to
178 * consume the result when the connection is established (or fails).
179 */
180 private class ConnectTask<A> implements Runnable, Iocp.ResultHandler {
181 private final InetSocketAddress remote;
182 private final PendingFuture<Void,A> result;
183
184 ConnectTask(InetSocketAddress remote, PendingFuture<Void,A> result) {
185 this.remote = remote;
186 this.result = result;
187 }
188
189 private void closeChannel() {
190 try {
191 close();
192 } catch (IOException ignore) { }
193 }
194
195 private IOException toIOException(Throwable x) {
196 if (x instanceof IOException) {
197 if (x instanceof ClosedChannelException)
198 x = new AsynchronousCloseException();
199 return (IOException)x;
200 }
201 return new IOException(x);
202 }
203
204 /**
205 * Invoke after a connection is successfully established.
206 */
207 private void afterConnect() throws IOException {
208 updateConnectContext(handle);
209 synchronized (stateLock) {
210 state = ST_CONNECTED;
211 remoteAddress = remote;
212 }
213 }
214
215 /**
216 * Task to initiate a connection.
217 */
218 @Override
219 public void run() {
220 long overlapped = 0L;
221 Throwable exc = null;
222 try {
223 begin();
224
225 // synchronize on result to allow this thread handle the case
226 // where the connection is established immediately.
227 synchronized (result) {
228 overlapped = ioCache.add(result);
229 // initiate the connection
230 int n = connect0(handle, Net.isIPv6Available(), remote.getAddress(),
231 remote.getPort(), overlapped);
232 if (n == IOStatus.UNAVAILABLE) {
233 // connection is pending
234 return;
235 }
236
237 // connection established immediately
238 afterConnect();
239 result.setResult(null);
240 }
241 } catch (Throwable x) {
242 if (overlapped != 0L)
243 ioCache.remove(overlapped);
244 exc = x;
245 } finally {
246 end();
247 }
248
249 if (exc != null) {
250 closeChannel();
251 result.setFailure(toIOException(exc));
252 }
253 Invoker.invoke(result);
254 }
255
256 /**
257 * Invoked by handler thread when connection established.
258 */
259 @Override
260 public void completed(int bytesTransferred, boolean canInvokeDirect) {
261 Throwable exc = null;
262 try {
263 begin();
264 afterConnect();
265 result.setResult(null);
266 } catch (Throwable x) {
267 // channel is closed or unable to finish connect
268 exc = x;
269 } finally {
270 end();
271 }
272
273 // can't close channel while in begin/end block
274 if (exc != null) {
275 closeChannel();
276 result.setFailure(toIOException(exc));
277 }
278
279 if (canInvokeDirect) {
280 Invoker.invokeUnchecked(result);
281 } else {
282 Invoker.invoke(result);
283 }
284 }
285
286 /**
287 * Invoked by handler thread when failed to establish connection.
288 */
289 @Override
290 public void failed(int error, IOException x) {
291 if (isOpen()) {
292 closeChannel();
293 result.setFailure(x);
294 } else {
295 result.setFailure(new AsynchronousCloseException());
296 }
297 Invoker.invoke(result);
298 }
299 }
300
301 @Override
302 <A> Future<Void> implConnect(SocketAddress remote,
303 A attachment,
304 CompletionHandler<Void,? super A> handler)
305 {
306 if (!isOpen()) {
307 Throwable exc = new ClosedChannelException();
308 if (handler == null)
309 return CompletedFuture.withFailure(exc);
310 Invoker.invoke(this, handler, attachment, null, exc);
311 return null;
312 }
313
314 InetSocketAddress isa = Net.checkAddress(remote);
315
316 // permission check
317 SecurityManager sm = System.getSecurityManager();
318 if (sm != null)
319 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
320
321 // check and update state
322 // ConnectEx requires the socket to be bound to a local address
323 IOException bindException = null;
324 synchronized (stateLock) {
325 if (state == ST_CONNECTED)
326 throw new AlreadyConnectedException();
327 if (state == ST_PENDING)
328 throw new ConnectionPendingException();
329 if (localAddress == null) {
330 try {
331 bind(new InetSocketAddress(0));
332 } catch (IOException x) {
333 bindException = x;
334 }
335 }
336 if (bindException == null)
337 state = ST_PENDING;
338 }
339
340 // handle bind failure
341 if (bindException != null) {
342 try {
343 close();
344 } catch (IOException ignore) { }
345 if (handler == null)
346 return CompletedFuture.withFailure(bindException);
347 Invoker.invoke(this, handler, attachment, null, bindException);
348 return null;
349 }
350
351 // setup task
352 PendingFuture<Void,A> result =
353 new PendingFuture<Void,A>(this, handler, attachment);
354 ConnectTask task = new ConnectTask<A>(isa, result);
355 result.setContext(task);
356
357 // initiate I/O
358 if (Iocp.supportsThreadAgnosticIo()) {
359 task.run();
360 } else {
361 Invoker.invokeOnThreadInThreadPool(this, task);
362 }
363 return result;
364 }
365
366 /**
367 * Implements the task to initiate a read and the handler to consume the
368 * result when the read completes.
369 */
370 private class ReadTask<V,A> implements Runnable, Iocp.ResultHandler {
371 private final ByteBuffer[] bufs;
372 private final int numBufs;
373 private final boolean scatteringRead;
374 private final PendingFuture<V,A> result;
375
376 // set by run method
377 private ByteBuffer[] shadow;
378
379 ReadTask(ByteBuffer[] bufs,
380 boolean scatteringRead,
381 PendingFuture<V,A> result)
382 {
383 this.bufs = bufs;
384 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
385 this.scatteringRead = scatteringRead;
386 this.result = result;
387 }
388
389 /**
390 * Invoked prior to read to prepare the WSABUF array. Where necessary,
391 * it substitutes non-direct buffers with direct buffers.
392 */
393 void prepareBuffers() {
394 shadow = new ByteBuffer[numBufs];
395 long address = readBufferArray;
396 for (int i=0; i<numBufs; i++) {
397 ByteBuffer dst = bufs[i];
398 int pos = dst.position();
399 int lim = dst.limit();
400 assert (pos <= lim);
401 int rem = (pos <= lim ? lim - pos : 0);
402 long a;
403 if (!(dst instanceof DirectBuffer)) {
404 // substitute with direct buffer
405 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
406 shadow[i] = bb;
407 a = ((DirectBuffer)bb).address();
408 } else {
409 shadow[i] = dst;
410 a = ((DirectBuffer)dst).address() + pos;
411 }
412 unsafe.putAddress(address + OFFSETOF_BUF, a);
413 unsafe.putInt(address + OFFSETOF_LEN, rem);
414 address += SIZEOF_WSABUF;
415 }
416 }
417
418 /**
419 * Invoked after a read has completed to update the buffer positions
420 * and release any substituted buffers.
421 */
422 void updateBuffers(int bytesRead) {
423 for (int i=0; i<numBufs; i++) {
424 ByteBuffer nextBuffer = shadow[i];
425 int pos = nextBuffer.position();
426 int len = nextBuffer.remaining();
427 if (bytesRead >= len) {
428 bytesRead -= len;
429 int newPosition = pos + len;
430 try {
431 nextBuffer.position(newPosition);
432 } catch (IllegalArgumentException x) {
433 // position changed by another
434 }
435 } else { // Buffers not completely filled
436 if (bytesRead > 0) {
437 assert(pos + bytesRead < (long)Integer.MAX_VALUE);
438 int newPosition = pos + bytesRead;
439 try {
440 nextBuffer.position(newPosition);
441 } catch (IllegalArgumentException x) {
442 // position changed by another
443 }
444 }
445 break;
446 }
447 }
448
449 // Put results from shadow into the slow buffers
450 for (int i=0; i<numBufs; i++) {
451 if (!(bufs[i] instanceof DirectBuffer)) {
452 shadow[i].flip();
453 try {
454 bufs[i].put(shadow[i]);
455 } catch (BufferOverflowException x) {
456 // position changed by another
457 }
458 }
459 }
460 }
461
462 void releaseBuffers() {
463 for (int i=0; i<numBufs; i++) {
464 if (!(bufs[i] instanceof DirectBuffer)) {
465 Util.releaseTemporaryDirectBuffer(shadow[i]);
466 }
467 }
468 }
469
470 @Override
471 @SuppressWarnings("unchecked")
472 public void run() {
473 long overlapped = 0L;
474 boolean prepared = false;
475 boolean pending = false;
476
477 try {
478 begin();
479
480 // substitute non-direct buffers
481 prepareBuffers();
482 prepared = true;
483
484 // get an OVERLAPPED structure (from the cache or allocate)
485 overlapped = ioCache.add(result);
486
487 // initiate read
488 int n = read0(handle, numBufs, readBufferArray, overlapped);
489 if (n == IOStatus.UNAVAILABLE) {
490 // I/O is pending
491 pending = true;
492 return;
493 }
494 if (n == IOStatus.EOF) {
495 // input shutdown
496 enableReading();
497 if (scatteringRead) {
498 result.setResult((V)Long.valueOf(-1L));
499 } else {
500 result.setResult((V)Integer.valueOf(-1));
501 }
502 } else {
503 throw new InternalError("Read completed immediately");
504 }
505 } catch (Throwable x) {
506 // failed to initiate read
507 // reset read flag before releasing waiters
508 enableReading();
509 if (x instanceof ClosedChannelException)
510 x = new AsynchronousCloseException();
511 if (!(x instanceof IOException))
512 x = new IOException(x);
513 result.setFailure(x);
514 } finally {
515 // release resources if I/O not pending
516 if (!pending) {
517 if (overlapped != 0L)
518 ioCache.remove(overlapped);
519 if (prepared)
520 releaseBuffers();
521 }
522 end();
523 }
524
525 // invoke completion handler
526 Invoker.invoke(result);
527 }
528
529 /**
530 * Executed when the I/O has completed
531 */
532 @Override
533 @SuppressWarnings("unchecked")
534 public void completed(int bytesTransferred, boolean canInvokeDirect) {
535 if (bytesTransferred == 0) {
536 bytesTransferred = -1; // EOF
537 } else {
538 updateBuffers(bytesTransferred);
539 }
540
541 // return direct buffer to cache if substituted
542 releaseBuffers();
543
544 // release waiters if not already released by timeout
545 synchronized (result) {
546 if (result.isDone())
547 return;
548 enableReading();
549 if (scatteringRead) {
550 result.setResult((V)Long.valueOf(bytesTransferred));
551 } else {
552 result.setResult((V)Integer.valueOf(bytesTransferred));
553 }
554 }
555 if (canInvokeDirect) {
556 Invoker.invokeUnchecked(result);
557 } else {
558 Invoker.invoke(result);
559 }
560 }
561
562 @Override
563 public void failed(int error, IOException x) {
564 // return direct buffer to cache if substituted
565 releaseBuffers();
566
567 // release waiters if not already released by timeout
568 if (!isOpen())
569 x = new AsynchronousCloseException();
570
571 synchronized (result) {
572 if (result.isDone())
573 return;
574 enableReading();
575 result.setFailure(x);
576 }
577 Invoker.invoke(result);
578 }
579
580 /**
581 * Invoked if timeout expires before it is cancelled
582 */
583 void timeout() {
584 // synchronize on result as the I/O could complete/fail
585 synchronized (result) {
586 if (result.isDone())
587 return;
588
589 // kill further reading before releasing waiters
590 enableReading(true);
591 result.setFailure(new InterruptedByTimeoutException());
592 }
593
594 // invoke handler without any locks
595 Invoker.invoke(result);
596 }
597 }
598
599 @Override
600 <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
601 ByteBuffer dst,
602 ByteBuffer[] dsts,
603 long timeout,
604 TimeUnit unit,
605 A attachment,
606 CompletionHandler<V,? super A> handler)
607 {
608 // setup task
609 PendingFuture<V,A> result =
610 new PendingFuture<V,A>(this, handler, attachment);
611 ByteBuffer[] bufs;
612 if (isScatteringRead) {
613 bufs = dsts;
614 } else {
615 bufs = new ByteBuffer[1];
616 bufs[0] = dst;
617 }
618 final ReadTask readTask = new ReadTask<V,A>(bufs, isScatteringRead, result);
619 result.setContext(readTask);
620
621 // schedule timeout
622 if (timeout > 0L) {
623 Future<?> timeoutTask = iocp.schedule(new Runnable() {
624 public void run() {
625 readTask.timeout();
626 }
627 }, timeout, unit);
628 result.setTimeoutTask(timeoutTask);
629 }
630
631 // initiate I/O
632 if (Iocp.supportsThreadAgnosticIo()) {
633 readTask.run();
634 } else {
635 Invoker.invokeOnThreadInThreadPool(this, readTask);
636 }
637 return result;
638 }
639
640 /**
641 * Implements the task to initiate a write and the handler to consume the
642 * result when the write completes.
643 */
644 private class WriteTask<V,A> implements Runnable, Iocp.ResultHandler {
645 private final ByteBuffer[] bufs;
646 private final int numBufs;
647 private final boolean gatheringWrite;
648 private final PendingFuture<V,A> result;
649
650 // set by run method
651 private ByteBuffer[] shadow;
652
653 WriteTask(ByteBuffer[] bufs,
654 boolean gatheringWrite,
655 PendingFuture<V,A> result)
656 {
657 this.bufs = bufs;
658 this.numBufs = (bufs.length > MAX_WSABUF) ? MAX_WSABUF : bufs.length;
659 this.gatheringWrite = gatheringWrite;
660 this.result = result;
661 }
662
663 /**
664 * Invoked prior to write to prepare the WSABUF array. Where necessary,
665 * it substitutes non-direct buffers with direct buffers.
666 */
667 void prepareBuffers() {
668 shadow = new ByteBuffer[numBufs];
669 long address = writeBufferArray;
670 for (int i=0; i<numBufs; i++) {
671 ByteBuffer src = bufs[i];
672 int pos = src.position();
673 int lim = src.limit();
674 assert (pos <= lim);
675 int rem = (pos <= lim ? lim - pos : 0);
676 long a;
677 if (!(src instanceof DirectBuffer)) {
678 // substitute with direct buffer
679 ByteBuffer bb = Util.getTemporaryDirectBuffer(rem);
680 bb.put(src);
681 bb.flip();
682 src.position(pos); // leave heap buffer untouched for now
683 shadow[i] = bb;
684 a = ((DirectBuffer)bb).address();
685 } else {
686 shadow[i] = src;
687 a = ((DirectBuffer)src).address() + pos;
688 }
689 unsafe.putAddress(address + OFFSETOF_BUF, a);
690 unsafe.putInt(address + OFFSETOF_LEN, rem);
691 address += SIZEOF_WSABUF;
692 }
693 }
694
695 /**
696 * Invoked after a write has completed to update the buffer positions
697 * and release any substituted buffers.
698 */
699 void updateBuffers(int bytesWritten) {
700 // Notify the buffers how many bytes were taken
701 for (int i=0; i<numBufs; i++) {
702 ByteBuffer nextBuffer = bufs[i];
703 int pos = nextBuffer.position();
704 int lim = nextBuffer.limit();
705 int len = (pos <= lim ? lim - pos : lim);
706 if (bytesWritten >= len) {
707 bytesWritten -= len;
708 int newPosition = pos + len;
709 try {
710 nextBuffer.position(newPosition);
711 } catch (IllegalArgumentException x) {
712 // position changed by someone else
713 }
714 } else { // Buffers not completely filled
715 if (bytesWritten > 0) {
716 assert(pos + bytesWritten < (long)Integer.MAX_VALUE);
717 int newPosition = pos + bytesWritten;
718 try {
719 nextBuffer.position(newPosition);
720 } catch (IllegalArgumentException x) {
721 // position changed by someone else
722 }
723 }
724 break;
725 }
726 }
727 }
728
729 void releaseBuffers() {
730 for (int i=0; i<numBufs; i++) {
731 if (!(bufs[i] instanceof DirectBuffer)) {
732 Util.releaseTemporaryDirectBuffer(shadow[i]);
733 }
734 }
735 }
736
737 @Override
738 //@SuppressWarnings("unchecked")
739 public void run() {
740 long overlapped = 0L;
741 boolean prepared = false;
742 boolean pending = false;
743 boolean shutdown = false;
744
745 try {
746 begin();
747
748 // substitute non-direct buffers
749 prepareBuffers();
750 prepared = true;
751
752 // get an OVERLAPPED structure (from the cache or allocate)
753 overlapped = ioCache.add(result);
754 int n = write0(handle, numBufs, writeBufferArray, overlapped);
755 if (n == IOStatus.UNAVAILABLE) {
756 // I/O is pending
757 pending = true;
758 return;
759 }
760 if (n == IOStatus.EOF) {
761 // special case for shutdown output
762 shutdown = true;
763 throw new ClosedChannelException();
764 }
765 // write completed immediately
766 throw new InternalError("Write completed immediately");
767 } catch (Throwable x) {
768 // write failed. Enable writing before releasing waiters.
769 enableWriting();
770 if (!shutdown && (x instanceof ClosedChannelException))
771 x = new AsynchronousCloseException();
772 if (!(x instanceof IOException))
773 x = new IOException(x);
774 result.setFailure(x);
775 } finally {
776 // release resources if I/O not pending
777 if (!pending) {
778 if (overlapped != 0L)
779 ioCache.remove(overlapped);
780 if (prepared)
781 releaseBuffers();
782 }
783 end();
784 }
785
786 // invoke completion handler
787 Invoker.invoke(result);
788 }
789
790 /**
791 * Executed when the I/O has completed
792 */
793 @Override
794 @SuppressWarnings("unchecked")
795 public void completed(int bytesTransferred, boolean canInvokeDirect) {
796 updateBuffers(bytesTransferred);
797
798 // return direct buffer to cache if substituted
799 releaseBuffers();
800
801 // release waiters if not already released by timeout
802 synchronized (result) {
803 if (result.isDone())
804 return;
805 enableWriting();
806 if (gatheringWrite) {
807 result.setResult((V)Long.valueOf(bytesTransferred));
808 } else {
809 result.setResult((V)Integer.valueOf(bytesTransferred));
810 }
811 }
812 if (canInvokeDirect) {
813 Invoker.invokeUnchecked(result);
814 } else {
815 Invoker.invoke(result);
816 }
817 }
818
819 @Override
820 public void failed(int error, IOException x) {
821 // return direct buffer to cache if substituted
822 releaseBuffers();
823
824 // release waiters if not already released by timeout
825 if (!isOpen())
826 x = new AsynchronousCloseException();
827
828 synchronized (result) {
829 if (result.isDone())
830 return;
831 enableWriting();
832 result.setFailure(x);
833 }
834 Invoker.invoke(result);
835 }
836
837 /**
838 * Invoked if timeout expires before it is cancelled
839 */
840 void timeout() {
841 // synchronize on result as the I/O could complete/fail
842 synchronized (result) {
843 if (result.isDone())
844 return;
845
846 // kill further writing before releasing waiters
847 enableWriting(true);
848 result.setFailure(new InterruptedByTimeoutException());
849 }
850
851 // invoke handler without any locks
852 Invoker.invoke(result);
853 }
854 }
855
856 @Override
857 <V extends Number,A> Future<V> implWrite(boolean gatheringWrite,
858 ByteBuffer src,
859 ByteBuffer[] srcs,
860 long timeout,
861 TimeUnit unit,
862 A attachment,
863 CompletionHandler<V,? super A> handler)
864 {
865 // setup task
866 PendingFuture<V,A> result =
867 new PendingFuture<V,A>(this, handler, attachment);
868 ByteBuffer[] bufs;
869 if (gatheringWrite) {
870 bufs = srcs;
871 } else {
872 bufs = new ByteBuffer[1];
873 bufs[0] = src;
874 }
875 final WriteTask writeTask = new WriteTask<V,A>(bufs, gatheringWrite, result);
876 result.setContext(writeTask);
877
878 // schedule timeout
879 if (timeout > 0L) {
880 Future<?> timeoutTask = iocp.schedule(new Runnable() {
881 public void run() {
882 writeTask.timeout();
883 }
884 }, timeout, unit);
885 result.setTimeoutTask(timeoutTask);
886 }
887
888 // initiate I/O (can only be done from thread in thread pool)
889 // initiate I/O
890 if (Iocp.supportsThreadAgnosticIo()) {
891 writeTask.run();
892 } else {
893 Invoker.invokeOnThreadInThreadPool(this, writeTask);
894 }
895 return result;
896 }
897
898 // -- Native methods --
899
900 private static native void initIDs();
901
902 private static native int connect0(long socket, boolean preferIPv6,
903 InetAddress remote, int remotePort, long overlapped) throws IOException;
904
905 private static native void updateConnectContext(long socket) throws IOException;
906
907 private static native int read0(long socket, int count, long addres, long overlapped)
908 throws IOException;
909
910 private static native int write0(long socket, int count, long address,
911 long overlapped) throws IOException;
912
913 private static native void shutdown0(long socket, int how) throws IOException;
914
915 private static native void closesocket0(long socket) throws IOException;
916
917 static {
918 Util.load();
919 initIDs();
920 }
921 }