main
Raw Download raw file
  1//go:build windows
  2// +build windows
  3
  4package winio
  5
  6import (
  7	"errors"
  8	"io"
  9	"runtime"
 10	"sync"
 11	"sync/atomic"
 12	"syscall"
 13	"time"
 14
 15	"golang.org/x/sys/windows"
 16)
 17
 18//sys cancelIoEx(file windows.Handle, o *windows.Overlapped) (err error) = CancelIoEx
 19//sys createIoCompletionPort(file windows.Handle, port windows.Handle, key uintptr, threadCount uint32) (newport windows.Handle, err error) = CreateIoCompletionPort
 20//sys getQueuedCompletionStatus(port windows.Handle, bytes *uint32, key *uintptr, o **ioOperation, timeout uint32) (err error) = GetQueuedCompletionStatus
 21//sys setFileCompletionNotificationModes(h windows.Handle, flags uint8) (err error) = SetFileCompletionNotificationModes
 22//sys wsaGetOverlappedResult(h windows.Handle, o *windows.Overlapped, bytes *uint32, wait bool, flags *uint32) (err error) = ws2_32.WSAGetOverlappedResult
 23
 24var (
 25	ErrFileClosed = errors.New("file has already been closed")
 26	ErrTimeout    = &timeoutError{}
 27)
 28
 29type timeoutError struct{}
 30
 31func (*timeoutError) Error() string   { return "i/o timeout" }
 32func (*timeoutError) Timeout() bool   { return true }
 33func (*timeoutError) Temporary() bool { return true }
 34
 35type timeoutChan chan struct{}
 36
 37var ioInitOnce sync.Once
 38var ioCompletionPort windows.Handle
 39
 40// ioResult contains the result of an asynchronous IO operation.
 41type ioResult struct {
 42	bytes uint32
 43	err   error
 44}
 45
 46// ioOperation represents an outstanding asynchronous Win32 IO.
 47type ioOperation struct {
 48	o  windows.Overlapped
 49	ch chan ioResult
 50}
 51
 52func initIO() {
 53	h, err := createIoCompletionPort(windows.InvalidHandle, 0, 0, 0xffffffff)
 54	if err != nil {
 55		panic(err)
 56	}
 57	ioCompletionPort = h
 58	go ioCompletionProcessor(h)
 59}
 60
 61// win32File implements Reader, Writer, and Closer on a Win32 handle without blocking in a syscall.
 62// It takes ownership of this handle and will close it if it is garbage collected.
 63type win32File struct {
 64	handle        windows.Handle
 65	wg            sync.WaitGroup
 66	wgLock        sync.RWMutex
 67	closing       atomic.Bool
 68	socket        bool
 69	readDeadline  deadlineHandler
 70	writeDeadline deadlineHandler
 71}
 72
 73type deadlineHandler struct {
 74	setLock     sync.Mutex
 75	channel     timeoutChan
 76	channelLock sync.RWMutex
 77	timer       *time.Timer
 78	timedout    atomic.Bool
 79}
 80
 81// makeWin32File makes a new win32File from an existing file handle.
 82func makeWin32File(h windows.Handle) (*win32File, error) {
 83	f := &win32File{handle: h}
 84	ioInitOnce.Do(initIO)
 85	_, err := createIoCompletionPort(h, ioCompletionPort, 0, 0xffffffff)
 86	if err != nil {
 87		return nil, err
 88	}
 89	err = setFileCompletionNotificationModes(h, windows.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS|windows.FILE_SKIP_SET_EVENT_ON_HANDLE)
 90	if err != nil {
 91		return nil, err
 92	}
 93	f.readDeadline.channel = make(timeoutChan)
 94	f.writeDeadline.channel = make(timeoutChan)
 95	return f, nil
 96}
 97
 98// Deprecated: use NewOpenFile instead.
 99func MakeOpenFile(h syscall.Handle) (io.ReadWriteCloser, error) {
100	return NewOpenFile(windows.Handle(h))
101}
102
103func NewOpenFile(h windows.Handle) (io.ReadWriteCloser, error) {
104	// If we return the result of makeWin32File directly, it can result in an
105	// interface-wrapped nil, rather than a nil interface value.
106	f, err := makeWin32File(h)
107	if err != nil {
108		return nil, err
109	}
110	return f, nil
111}
112
113// closeHandle closes the resources associated with a Win32 handle.
114func (f *win32File) closeHandle() {
115	f.wgLock.Lock()
116	// Atomically set that we are closing, releasing the resources only once.
117	if !f.closing.Swap(true) {
118		f.wgLock.Unlock()
119		// cancel all IO and wait for it to complete
120		_ = cancelIoEx(f.handle, nil)
121		f.wg.Wait()
122		// at this point, no new IO can start
123		windows.Close(f.handle)
124		f.handle = 0
125	} else {
126		f.wgLock.Unlock()
127	}
128}
129
130// Close closes a win32File.
131func (f *win32File) Close() error {
132	f.closeHandle()
133	return nil
134}
135
136// IsClosed checks if the file has been closed.
137func (f *win32File) IsClosed() bool {
138	return f.closing.Load()
139}
140
141// prepareIO prepares for a new IO operation.
142// The caller must call f.wg.Done() when the IO is finished, prior to Close() returning.
143func (f *win32File) prepareIO() (*ioOperation, error) {
144	f.wgLock.RLock()
145	if f.closing.Load() {
146		f.wgLock.RUnlock()
147		return nil, ErrFileClosed
148	}
149	f.wg.Add(1)
150	f.wgLock.RUnlock()
151	c := &ioOperation{}
152	c.ch = make(chan ioResult)
153	return c, nil
154}
155
156// ioCompletionProcessor processes completed async IOs forever.
157func ioCompletionProcessor(h windows.Handle) {
158	for {
159		var bytes uint32
160		var key uintptr
161		var op *ioOperation
162		err := getQueuedCompletionStatus(h, &bytes, &key, &op, windows.INFINITE)
163		if op == nil {
164			panic(err)
165		}
166		op.ch <- ioResult{bytes, err}
167	}
168}
169
170// todo: helsaawy - create an asyncIO version that takes a context
171
172// asyncIO processes the return value from ReadFile or WriteFile, blocking until
173// the operation has actually completed.
174func (f *win32File) asyncIO(c *ioOperation, d *deadlineHandler, bytes uint32, err error) (int, error) {
175	if err != windows.ERROR_IO_PENDING { //nolint:errorlint // err is Errno
176		return int(bytes), err
177	}
178
179	if f.closing.Load() {
180		_ = cancelIoEx(f.handle, &c.o)
181	}
182
183	var timeout timeoutChan
184	if d != nil {
185		d.channelLock.Lock()
186		timeout = d.channel
187		d.channelLock.Unlock()
188	}
189
190	var r ioResult
191	select {
192	case r = <-c.ch:
193		err = r.err
194		if err == windows.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno
195			if f.closing.Load() {
196				err = ErrFileClosed
197			}
198		} else if err != nil && f.socket {
199			// err is from Win32. Query the overlapped structure to get the winsock error.
200			var bytes, flags uint32
201			err = wsaGetOverlappedResult(f.handle, &c.o, &bytes, false, &flags)
202		}
203	case <-timeout:
204		_ = cancelIoEx(f.handle, &c.o)
205		r = <-c.ch
206		err = r.err
207		if err == windows.ERROR_OPERATION_ABORTED { //nolint:errorlint // err is Errno
208			err = ErrTimeout
209		}
210	}
211
212	// runtime.KeepAlive is needed, as c is passed via native
213	// code to ioCompletionProcessor, c must remain alive
214	// until the channel read is complete.
215	// todo: (de)allocate *ioOperation via win32 heap functions, instead of needing to KeepAlive?
216	runtime.KeepAlive(c)
217	return int(r.bytes), err
218}
219
220// Read reads from a file handle.
221func (f *win32File) Read(b []byte) (int, error) {
222	c, err := f.prepareIO()
223	if err != nil {
224		return 0, err
225	}
226	defer f.wg.Done()
227
228	if f.readDeadline.timedout.Load() {
229		return 0, ErrTimeout
230	}
231
232	var bytes uint32
233	err = windows.ReadFile(f.handle, b, &bytes, &c.o)
234	n, err := f.asyncIO(c, &f.readDeadline, bytes, err)
235	runtime.KeepAlive(b)
236
237	// Handle EOF conditions.
238	if err == nil && n == 0 && len(b) != 0 {
239		return 0, io.EOF
240	} else if err == windows.ERROR_BROKEN_PIPE { //nolint:errorlint // err is Errno
241		return 0, io.EOF
242	}
243	return n, err
244}
245
246// Write writes to a file handle.
247func (f *win32File) Write(b []byte) (int, error) {
248	c, err := f.prepareIO()
249	if err != nil {
250		return 0, err
251	}
252	defer f.wg.Done()
253
254	if f.writeDeadline.timedout.Load() {
255		return 0, ErrTimeout
256	}
257
258	var bytes uint32
259	err = windows.WriteFile(f.handle, b, &bytes, &c.o)
260	n, err := f.asyncIO(c, &f.writeDeadline, bytes, err)
261	runtime.KeepAlive(b)
262	return n, err
263}
264
265func (f *win32File) SetReadDeadline(deadline time.Time) error {
266	return f.readDeadline.set(deadline)
267}
268
269func (f *win32File) SetWriteDeadline(deadline time.Time) error {
270	return f.writeDeadline.set(deadline)
271}
272
273func (f *win32File) Flush() error {
274	return windows.FlushFileBuffers(f.handle)
275}
276
277func (f *win32File) Fd() uintptr {
278	return uintptr(f.handle)
279}
280
281func (d *deadlineHandler) set(deadline time.Time) error {
282	d.setLock.Lock()
283	defer d.setLock.Unlock()
284
285	if d.timer != nil {
286		if !d.timer.Stop() {
287			<-d.channel
288		}
289		d.timer = nil
290	}
291	d.timedout.Store(false)
292
293	select {
294	case <-d.channel:
295		d.channelLock.Lock()
296		d.channel = make(chan struct{})
297		d.channelLock.Unlock()
298	default:
299	}
300
301	if deadline.IsZero() {
302		return nil
303	}
304
305	timeoutIO := func() {
306		d.timedout.Store(true)
307		close(d.channel)
308	}
309
310	now := time.Now()
311	duration := deadline.Sub(now)
312	if deadline.After(now) {
313		// Deadline is in the future, set a timer to wait
314		d.timer = time.AfterFunc(duration, timeoutIO)
315	} else {
316		// Deadline is in the past. Cancel all pending IO now.
317		timeoutIO()
318	}
319	return nil
320}