main
Raw Download raw file
  1// Copyright 2011 The Go Authors. All rights reserved.
  2// Use of this source code is governed by a BSD-style
  3// license that can be found in the LICENSE file.
  4
  5package ssh
  6
  7import (
  8	"encoding/binary"
  9	"errors"
 10	"fmt"
 11	"io"
 12	"log"
 13	"sync"
 14)
 15
 16const (
 17	minPacketLength = 9
 18	// channelMaxPacket contains the maximum number of bytes that will be
 19	// sent in a single packet. As per RFC 4253, section 6.1, 32k is also
 20	// the minimum.
 21	channelMaxPacket = 1 << 15
 22	// We follow OpenSSH here.
 23	channelWindowSize = 64 * channelMaxPacket
 24)
 25
 26// NewChannel represents an incoming request to a channel. It must either be
 27// accepted for use by calling Accept, or rejected by calling Reject.
 28type NewChannel interface {
 29	// Accept accepts the channel creation request. It returns the Channel
 30	// and a Go channel containing SSH requests. The Go channel must be
 31	// serviced otherwise the Channel will hang.
 32	Accept() (Channel, <-chan *Request, error)
 33
 34	// Reject rejects the channel creation request. After calling
 35	// this, no other methods on the Channel may be called.
 36	Reject(reason RejectionReason, message string) error
 37
 38	// ChannelType returns the type of the channel, as supplied by the
 39	// client.
 40	ChannelType() string
 41
 42	// ExtraData returns the arbitrary payload for this channel, as supplied
 43	// by the client. This data is specific to the channel type.
 44	ExtraData() []byte
 45}
 46
 47// A Channel is an ordered, reliable, flow-controlled, duplex stream
 48// that is multiplexed over an SSH connection.
 49type Channel interface {
 50	// Read reads up to len(data) bytes from the channel.
 51	Read(data []byte) (int, error)
 52
 53	// Write writes len(data) bytes to the channel.
 54	Write(data []byte) (int, error)
 55
 56	// Close signals end of channel use. No data may be sent after this
 57	// call.
 58	Close() error
 59
 60	// CloseWrite signals the end of sending in-band
 61	// data. Requests may still be sent, and the other side may
 62	// still send data
 63	CloseWrite() error
 64
 65	// SendRequest sends a channel request.  If wantReply is true,
 66	// it will wait for a reply and return the result as a
 67	// boolean, otherwise the return value will be false. Channel
 68	// requests are out-of-band messages so they may be sent even
 69	// if the data stream is closed or blocked by flow control.
 70	// If the channel is closed before a reply is returned, io.EOF
 71	// is returned.
 72	SendRequest(name string, wantReply bool, payload []byte) (bool, error)
 73
 74	// Stderr returns an io.ReadWriter that writes to this channel
 75	// with the extended data type set to stderr. Stderr may
 76	// safely be read and written from a different goroutine than
 77	// Read and Write respectively.
 78	Stderr() io.ReadWriter
 79}
 80
 81// Request is a request sent outside of the normal stream of
 82// data. Requests can either be specific to an SSH channel, or they
 83// can be global.
 84type Request struct {
 85	Type      string
 86	WantReply bool
 87	Payload   []byte
 88
 89	ch  *channel
 90	mux *mux
 91}
 92
 93// Reply sends a response to a request. It must be called for all requests
 94// where WantReply is true and is a no-op otherwise. The payload argument is
 95// ignored for replies to channel-specific requests.
 96func (r *Request) Reply(ok bool, payload []byte) error {
 97	if !r.WantReply {
 98		return nil
 99	}
100
101	if r.ch == nil {
102		return r.mux.ackRequest(ok, payload)
103	}
104
105	return r.ch.ackRequest(ok)
106}
107
108// RejectionReason is an enumeration used when rejecting channel creation
109// requests. See RFC 4254, section 5.1.
110type RejectionReason uint32
111
112const (
113	Prohibited RejectionReason = iota + 1
114	ConnectionFailed
115	UnknownChannelType
116	ResourceShortage
117)
118
119// String converts the rejection reason to human readable form.
120func (r RejectionReason) String() string {
121	switch r {
122	case Prohibited:
123		return "administratively prohibited"
124	case ConnectionFailed:
125		return "connect failed"
126	case UnknownChannelType:
127		return "unknown channel type"
128	case ResourceShortage:
129		return "resource shortage"
130	}
131	return fmt.Sprintf("unknown reason %d", int(r))
132}
133
134func min(a uint32, b int) uint32 {
135	if a < uint32(b) {
136		return a
137	}
138	return uint32(b)
139}
140
141type channelDirection uint8
142
143const (
144	channelInbound channelDirection = iota
145	channelOutbound
146)
147
148// channel is an implementation of the Channel interface that works
149// with the mux class.
150type channel struct {
151	// R/O after creation
152	chanType          string
153	extraData         []byte
154	localId, remoteId uint32
155
156	// maxIncomingPayload and maxRemotePayload are the maximum
157	// payload sizes of normal and extended data packets for
158	// receiving and sending, respectively. The wire packet will
159	// be 9 or 13 bytes larger (excluding encryption overhead).
160	maxIncomingPayload uint32
161	maxRemotePayload   uint32
162
163	mux *mux
164
165	// decided is set to true if an accept or reject message has been sent
166	// (for outbound channels) or received (for inbound channels).
167	decided bool
168
169	// direction contains either channelOutbound, for channels created
170	// locally, or channelInbound, for channels created by the peer.
171	direction channelDirection
172
173	// Pending internal channel messages.
174	msg chan interface{}
175
176	// Since requests have no ID, there can be only one request
177	// with WantReply=true outstanding.  This lock is held by a
178	// goroutine that has such an outgoing request pending.
179	sentRequestMu sync.Mutex
180
181	incomingRequests chan *Request
182
183	sentEOF bool
184
185	// thread-safe data
186	remoteWin  window
187	pending    *buffer
188	extPending *buffer
189
190	// windowMu protects myWindow, the flow-control window, and myConsumed,
191	// the number of bytes consumed since we last increased myWindow
192	windowMu   sync.Mutex
193	myWindow   uint32
194	myConsumed uint32
195
196	// writeMu serializes calls to mux.conn.writePacket() and
197	// protects sentClose and packetPool. This mutex must be
198	// different from windowMu, as writePacket can block if there
199	// is a key exchange pending.
200	writeMu   sync.Mutex
201	sentClose bool
202
203	// packetPool has a buffer for each extended channel ID to
204	// save allocations during writes.
205	packetPool map[uint32][]byte
206}
207
208// writePacket sends a packet. If the packet is a channel close, it updates
209// sentClose. This method takes the lock c.writeMu.
210func (ch *channel) writePacket(packet []byte) error {
211	ch.writeMu.Lock()
212	if ch.sentClose {
213		ch.writeMu.Unlock()
214		return io.EOF
215	}
216	ch.sentClose = (packet[0] == msgChannelClose)
217	err := ch.mux.conn.writePacket(packet)
218	ch.writeMu.Unlock()
219	return err
220}
221
222func (ch *channel) sendMessage(msg interface{}) error {
223	if debugMux {
224		log.Printf("send(%d): %#v", ch.mux.chanList.offset, msg)
225	}
226
227	p := Marshal(msg)
228	binary.BigEndian.PutUint32(p[1:], ch.remoteId)
229	return ch.writePacket(p)
230}
231
232// WriteExtended writes data to a specific extended stream. These streams are
233// used, for example, for stderr.
234func (ch *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err error) {
235	if ch.sentEOF {
236		return 0, io.EOF
237	}
238	// 1 byte message type, 4 bytes remoteId, 4 bytes data length
239	opCode := byte(msgChannelData)
240	headerLength := uint32(9)
241	if extendedCode > 0 {
242		headerLength += 4
243		opCode = msgChannelExtendedData
244	}
245
246	ch.writeMu.Lock()
247	packet := ch.packetPool[extendedCode]
248	// We don't remove the buffer from packetPool, so
249	// WriteExtended calls from different goroutines will be
250	// flagged as errors by the race detector.
251	ch.writeMu.Unlock()
252
253	for len(data) > 0 {
254		space := min(ch.maxRemotePayload, len(data))
255		if space, err = ch.remoteWin.reserve(space); err != nil {
256			return n, err
257		}
258		if want := headerLength + space; uint32(cap(packet)) < want {
259			packet = make([]byte, want)
260		} else {
261			packet = packet[:want]
262		}
263
264		todo := data[:space]
265
266		packet[0] = opCode
267		binary.BigEndian.PutUint32(packet[1:], ch.remoteId)
268		if extendedCode > 0 {
269			binary.BigEndian.PutUint32(packet[5:], uint32(extendedCode))
270		}
271		binary.BigEndian.PutUint32(packet[headerLength-4:], uint32(len(todo)))
272		copy(packet[headerLength:], todo)
273		if err = ch.writePacket(packet); err != nil {
274			return n, err
275		}
276
277		n += len(todo)
278		data = data[len(todo):]
279	}
280
281	ch.writeMu.Lock()
282	ch.packetPool[extendedCode] = packet
283	ch.writeMu.Unlock()
284
285	return n, err
286}
287
288func (ch *channel) handleData(packet []byte) error {
289	headerLen := 9
290	isExtendedData := packet[0] == msgChannelExtendedData
291	if isExtendedData {
292		headerLen = 13
293	}
294	if len(packet) < headerLen {
295		// malformed data packet
296		return parseError(packet[0])
297	}
298
299	var extended uint32
300	if isExtendedData {
301		extended = binary.BigEndian.Uint32(packet[5:])
302	}
303
304	length := binary.BigEndian.Uint32(packet[headerLen-4 : headerLen])
305	if length == 0 {
306		return nil
307	}
308	if length > ch.maxIncomingPayload {
309		// TODO(hanwen): should send Disconnect?
310		return errors.New("ssh: incoming packet exceeds maximum payload size")
311	}
312
313	data := packet[headerLen:]
314	if length != uint32(len(data)) {
315		return errors.New("ssh: wrong packet length")
316	}
317
318	ch.windowMu.Lock()
319	if ch.myWindow < length {
320		ch.windowMu.Unlock()
321		// TODO(hanwen): should send Disconnect with reason?
322		return errors.New("ssh: remote side wrote too much")
323	}
324	ch.myWindow -= length
325	ch.windowMu.Unlock()
326
327	if extended == 1 {
328		ch.extPending.write(data)
329	} else if extended > 0 {
330		// discard other extended data.
331	} else {
332		ch.pending.write(data)
333	}
334	return nil
335}
336
337func (c *channel) adjustWindow(adj uint32) error {
338	c.windowMu.Lock()
339	// Since myConsumed and myWindow are managed on our side, and can never
340	// exceed the initial window setting, we don't worry about overflow.
341	c.myConsumed += adj
342	var sendAdj uint32
343	if (channelWindowSize-c.myWindow > 3*c.maxIncomingPayload) ||
344		(c.myWindow < channelWindowSize/2) {
345		sendAdj = c.myConsumed
346		c.myConsumed = 0
347		c.myWindow += sendAdj
348	}
349	c.windowMu.Unlock()
350	if sendAdj == 0 {
351		return nil
352	}
353	return c.sendMessage(windowAdjustMsg{
354		AdditionalBytes: sendAdj,
355	})
356}
357
358func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error) {
359	switch extended {
360	case 1:
361		n, err = c.extPending.Read(data)
362	case 0:
363		n, err = c.pending.Read(data)
364	default:
365		return 0, fmt.Errorf("ssh: extended code %d unimplemented", extended)
366	}
367
368	if n > 0 {
369		err = c.adjustWindow(uint32(n))
370		// sendWindowAdjust can return io.EOF if the remote
371		// peer has closed the connection, however we want to
372		// defer forwarding io.EOF to the caller of Read until
373		// the buffer has been drained.
374		if n > 0 && err == io.EOF {
375			err = nil
376		}
377	}
378
379	return n, err
380}
381
382func (c *channel) close() {
383	c.pending.eof()
384	c.extPending.eof()
385	close(c.msg)
386	close(c.incomingRequests)
387	c.writeMu.Lock()
388	// This is not necessary for a normal channel teardown, but if
389	// there was another error, it is.
390	c.sentClose = true
391	c.writeMu.Unlock()
392	// Unblock writers.
393	c.remoteWin.close()
394}
395
396// responseMessageReceived is called when a success or failure message is
397// received on a channel to check that such a message is reasonable for the
398// given channel.
399func (ch *channel) responseMessageReceived() error {
400	if ch.direction == channelInbound {
401		return errors.New("ssh: channel response message received on inbound channel")
402	}
403	if ch.decided {
404		return errors.New("ssh: duplicate response received for channel")
405	}
406	ch.decided = true
407	return nil
408}
409
410func (ch *channel) handlePacket(packet []byte) error {
411	switch packet[0] {
412	case msgChannelData, msgChannelExtendedData:
413		return ch.handleData(packet)
414	case msgChannelClose:
415		ch.sendMessage(channelCloseMsg{PeersID: ch.remoteId})
416		ch.mux.chanList.remove(ch.localId)
417		ch.close()
418		return nil
419	case msgChannelEOF:
420		// RFC 4254 is mute on how EOF affects dataExt messages but
421		// it is logical to signal EOF at the same time.
422		ch.extPending.eof()
423		ch.pending.eof()
424		return nil
425	}
426
427	decoded, err := decode(packet)
428	if err != nil {
429		return err
430	}
431
432	switch msg := decoded.(type) {
433	case *channelOpenFailureMsg:
434		if err := ch.responseMessageReceived(); err != nil {
435			return err
436		}
437		ch.mux.chanList.remove(msg.PeersID)
438		ch.msg <- msg
439	case *channelOpenConfirmMsg:
440		if err := ch.responseMessageReceived(); err != nil {
441			return err
442		}
443		if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1<<31 {
444			return fmt.Errorf("ssh: invalid MaxPacketSize %d from peer", msg.MaxPacketSize)
445		}
446		ch.remoteId = msg.MyID
447		ch.maxRemotePayload = msg.MaxPacketSize
448		ch.remoteWin.add(msg.MyWindow)
449		ch.msg <- msg
450	case *windowAdjustMsg:
451		if !ch.remoteWin.add(msg.AdditionalBytes) {
452			return fmt.Errorf("ssh: invalid window update for %d bytes", msg.AdditionalBytes)
453		}
454	case *channelRequestMsg:
455		req := Request{
456			Type:      msg.Request,
457			WantReply: msg.WantReply,
458			Payload:   msg.RequestSpecificData,
459			ch:        ch,
460		}
461
462		ch.incomingRequests <- &req
463	default:
464		ch.msg <- msg
465	}
466	return nil
467}
468
469func (m *mux) newChannel(chanType string, direction channelDirection, extraData []byte) *channel {
470	ch := &channel{
471		remoteWin:        window{Cond: newCond()},
472		myWindow:         channelWindowSize,
473		pending:          newBuffer(),
474		extPending:       newBuffer(),
475		direction:        direction,
476		incomingRequests: make(chan *Request, chanSize),
477		msg:              make(chan interface{}, chanSize),
478		chanType:         chanType,
479		extraData:        extraData,
480		mux:              m,
481		packetPool:       make(map[uint32][]byte),
482	}
483	ch.localId = m.chanList.add(ch)
484	return ch
485}
486
487var errUndecided = errors.New("ssh: must Accept or Reject channel")
488var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once")
489
490type extChannel struct {
491	code uint32
492	ch   *channel
493}
494
495func (e *extChannel) Write(data []byte) (n int, err error) {
496	return e.ch.WriteExtended(data, e.code)
497}
498
499func (e *extChannel) Read(data []byte) (n int, err error) {
500	return e.ch.ReadExtended(data, e.code)
501}
502
503func (ch *channel) Accept() (Channel, <-chan *Request, error) {
504	if ch.decided {
505		return nil, nil, errDecidedAlready
506	}
507	ch.maxIncomingPayload = channelMaxPacket
508	confirm := channelOpenConfirmMsg{
509		PeersID:       ch.remoteId,
510		MyID:          ch.localId,
511		MyWindow:      ch.myWindow,
512		MaxPacketSize: ch.maxIncomingPayload,
513	}
514	ch.decided = true
515	if err := ch.sendMessage(confirm); err != nil {
516		return nil, nil, err
517	}
518
519	return ch, ch.incomingRequests, nil
520}
521
522func (ch *channel) Reject(reason RejectionReason, message string) error {
523	if ch.decided {
524		return errDecidedAlready
525	}
526	reject := channelOpenFailureMsg{
527		PeersID:  ch.remoteId,
528		Reason:   reason,
529		Message:  message,
530		Language: "en",
531	}
532	ch.decided = true
533	return ch.sendMessage(reject)
534}
535
536func (ch *channel) Read(data []byte) (int, error) {
537	if !ch.decided {
538		return 0, errUndecided
539	}
540	return ch.ReadExtended(data, 0)
541}
542
543func (ch *channel) Write(data []byte) (int, error) {
544	if !ch.decided {
545		return 0, errUndecided
546	}
547	return ch.WriteExtended(data, 0)
548}
549
550func (ch *channel) CloseWrite() error {
551	if !ch.decided {
552		return errUndecided
553	}
554	ch.sentEOF = true
555	return ch.sendMessage(channelEOFMsg{
556		PeersID: ch.remoteId})
557}
558
559func (ch *channel) Close() error {
560	if !ch.decided {
561		return errUndecided
562	}
563
564	return ch.sendMessage(channelCloseMsg{
565		PeersID: ch.remoteId})
566}
567
568// Extended returns an io.ReadWriter that sends and receives data on the given,
569// SSH extended stream. Such streams are used, for example, for stderr.
570func (ch *channel) Extended(code uint32) io.ReadWriter {
571	if !ch.decided {
572		return nil
573	}
574	return &extChannel{code, ch}
575}
576
577func (ch *channel) Stderr() io.ReadWriter {
578	return ch.Extended(1)
579}
580
581func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (bool, error) {
582	if !ch.decided {
583		return false, errUndecided
584	}
585
586	if wantReply {
587		ch.sentRequestMu.Lock()
588		defer ch.sentRequestMu.Unlock()
589	}
590
591	msg := channelRequestMsg{
592		PeersID:             ch.remoteId,
593		Request:             name,
594		WantReply:           wantReply,
595		RequestSpecificData: payload,
596	}
597
598	if err := ch.sendMessage(msg); err != nil {
599		return false, err
600	}
601
602	if wantReply {
603		m, ok := (<-ch.msg)
604		if !ok {
605			return false, io.EOF
606		}
607		switch m.(type) {
608		case *channelRequestFailureMsg:
609			return false, nil
610		case *channelRequestSuccessMsg:
611			return true, nil
612		default:
613			return false, fmt.Errorf("ssh: unexpected response to channel request: %#v", m)
614		}
615	}
616
617	return false, nil
618}
619
620// ackRequest either sends an ack or nack to the channel request.
621func (ch *channel) ackRequest(ok bool) error {
622	if !ch.decided {
623		return errUndecided
624	}
625
626	var msg interface{}
627	if !ok {
628		msg = channelRequestFailureMsg{
629			PeersID: ch.remoteId,
630		}
631	} else {
632		msg = channelRequestSuccessMsg{
633			PeersID: ch.remoteId,
634		}
635	}
636	return ch.sendMessage(msg)
637}
638
639func (ch *channel) ChannelType() string {
640	return ch.chanType
641}
642
643func (ch *channel) ExtraData() []byte {
644	return ch.extraData
645}