Package paramiko :: Module channel
[frames] | no frames]

Source Code for Module paramiko.channel

   1  # Copyright (C) 2003-2007  Robey Pointer <robeypointer@gmail.com> 
   2  # 
   3  # This file is part of paramiko. 
   4  # 
   5  # Paramiko is free software; you can redistribute it and/or modify it under the 
   6  # terms of the GNU Lesser General Public License as published by the Free 
   7  # Software Foundation; either version 2.1 of the License, or (at your option) 
   8  # any later version. 
   9  # 
  10  # Paramiko is distributed in the hope that it will be useful, but WITHOUT ANY 
  11  # WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 
  12  # A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more 
  13  # details. 
  14  # 
  15  # You should have received a copy of the GNU Lesser General Public License 
  16  # along with Paramiko; if not, write to the Free Software Foundation, Inc., 
  17  # 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. 
  18   
  19  """ 
  20  Abstraction for an SSH2 channel. 
  21  """ 
  22   
  23  import binascii 
  24  import sys 
  25  import time 
  26  import threading 
  27  import socket 
  28  import os 
  29   
  30  from paramiko.common import * 
  31  from paramiko import util 
  32  from paramiko.message import Message 
  33  from paramiko.ssh_exception import SSHException 
  34  from paramiko.file import BufferedFile 
  35  from paramiko.buffered_pipe import BufferedPipe, PipeTimeout 
  36  from paramiko import pipe 
  37   
  38   
  39  # lower bound on the max packet size we'll accept from the remote host 
  40  MIN_PACKET_SIZE = 1024 
  41   
  42   
43 -class Channel (object):
44 """ 45 A secure tunnel across an SSH L{Transport}. A Channel is meant to behave 46 like a socket, and has an API that should be indistinguishable from the 47 python socket API. 48 49 Because SSH2 has a windowing kind of flow control, if you stop reading data 50 from a Channel and its buffer fills up, the server will be unable to send 51 you any more data until you read some of it. (This won't affect other 52 channels on the same transport -- all channels on a single transport are 53 flow-controlled independently.) Similarly, if the server isn't reading 54 data you send, calls to L{send} may block, unless you set a timeout. This 55 is exactly like a normal network socket, so it shouldn't be too surprising. 56 """ 57
def __init__(self, chanid):
    """
    Create a new channel.  The channel starts out unattached: it has no
    L{Transport}, is not active, and all of its flow-control windows are
    zero until the Transport attaches it.  Normally you would only call
    this method from the constructor of a subclass of L{Channel}.

    @param chanid: the ID of this channel, as passed by an existing
        L{Transport}.
    @type chanid: int
    """
    # identity and ownership
    self.chanid = chanid
    self.remote_chanid = 0
    self._name = str(chanid)
    self.transport = None
    self.origin_addr = None

    # lifecycle state
    self.active = False
    self.closed = False
    self.eof_received = 0
    self.eof_sent = 0
    self.exit_status = -1

    # inbound data: one buffer per stream, kept separate by default
    self.in_buffer = BufferedPipe()
    self.in_stderr_buffer = BufferedPipe()
    self.combine_stderr = False
    self._pipe = None

    # None means blocking operations never time out
    self.timeout = None

    # synchronisation; the outbound condition shares the channel lock
    self.lock = threading.Lock()
    self.out_buffer_cv = threading.Condition(self.lock)
    self.event = threading.Event()
    self.status_event = threading.Event()

    # flow-control bookkeeping, all zero until windows are set
    self.in_window_size = 0
    self.in_max_packet_size = 0
    self.in_window_threshold = 0
    self.in_window_sofar = 0
    self.out_window_size = 0
    self.out_max_packet_size = 0

    # diagnostics
    self.ultra_debug = False
    self.logger = util.get_logger('paramiko.transport')
96
def __del__(self):
    """
    Finalizer: try to close the channel when the object is collected.
    """
    try:
        self.close()
    except:
        # best effort only: whatever close() raises here is ignored
        pass
102
103 - def __repr__(self):
104 """ 105 Return a string representation of this object, for debugging. 106 107 @rtype: str 108 """ 109 out = '<paramiko.Channel %d' % self.chanid 110 if self.closed: 111 out += ' (closed)' 112 elif self.active: 113 if self.eof_received: 114 out += ' (EOF received)' 115 if self.eof_sent: 116 out += ' (EOF sent)' 117 out += ' (open) window=%d' % (self.out_window_size) 118 if len(self.in_buffer) > 0: 119 out += ' in-buffer=%d' % (len(self.in_buffer),) 120 out += ' -> ' + repr(self.transport) 121 out += '>' 122 return out
123
def get_pty(self, term='vt100', width=80, height=24):
    """
    Ask the server for a pseudo-terminal.  This is usually done right
    after creating a client channel, so that a shell started with
    L{invoke_shell} gets basic terminal semantics.  It isn't necessary
    (or desirable) to call this method if you're going to execute a
    single command with L{exec_command}.

    @param term: the terminal type to emulate (for example, C{'vt100'})
    @type term: str
    @param width: width (in characters) of the terminal screen
    @type width: int
    @param height: height (in characters) of the terminal screen
    @type height: int

    @raise SSHException: if the request was rejected or the channel was
        closed
    """
    usable = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not usable:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('pty-req')
    request.add_boolean(True)
    request.add_string(term)
    request.add_int(width)
    request.add_int(height)
    # pixel dimensions (usually useless), sent as zeros
    request.add_int(0)
    request.add_int(0)
    # empty terminal-modes string
    request.add_string('')
    self.event.clear()
    self.transport._send_user_message(request)
    self._wait_for_event()
158
def invoke_shell(self):
    """
    Request an interactive shell session on this channel.  If the server
    allows it, the channel will then be directly connected to the stdin,
    stdout, and stderr of the shell.

    Normally you would call L{get_pty} before this, in which case the
    shell will operate through the pty, and the channel will be connected
    to the stdin and stdout of the pty.

    When the shell exits, the channel will be closed and can't be reused.
    You must open a new channel if you wish to open another shell.

    @raise SSHException: if the request was rejected or the channel was
        closed
    """
    if self.closed or self.eof_received or self.eof_sent or not self.active:
        raise SSHException('Channel is not open')
    m = Message()
    m.add_byte(chr(MSG_CHANNEL_REQUEST))
    m.add_int(self.remote_chanid)
    m.add_string('shell')
    # want-reply flag: a real boolean, like every other request method here
    m.add_boolean(True)
    # clear before sending so the reply cannot be missed
    self.event.clear()
    self.transport._send_user_message(m)
    self._wait_for_event()
185
def exec_command(self, command):
    """
    Run a single command on the server.  If the server allows it, the
    channel will then be directly connected to the stdin, stdout, and
    stderr of the command being executed.

    When the command finishes executing, the channel will be closed and
    can't be reused.  You must open a new channel if you wish to execute
    another command.

    @param command: a shell command to execute.
    @type command: str

    @raise SSHException: if the request was rejected or the channel was
        closed
    """
    if not self.active or self.closed or self.eof_received or self.eof_sent:
        raise SSHException('Channel is not open')
    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('exec')
    # ask for a reply so the wait below has something to wait for
    request.add_boolean(True)
    request.add_string(command)
    self.event.clear()
    self.transport._send_user_message(request)
    self._wait_for_event()
213
def invoke_subsystem(self, subsystem):
    """
    Ask the server to start a subsystem (for example, C{sftp}) on this
    channel.  If the server allows it, the channel will then be directly
    connected to the requested subsystem.

    When the subsystem finishes, the channel will be closed and can't be
    reused.

    @param subsystem: name of the subsystem being requested.
    @type subsystem: str

    @raise SSHException: if the request was rejected or the channel was
        closed
    """
    finished = self.closed or self.eof_received or self.eof_sent
    if finished or not self.active:
        raise SSHException('Channel is not open')
    msg = Message()
    msg.add_byte(chr(MSG_CHANNEL_REQUEST))
    msg.add_int(self.remote_chanid)
    msg.add_string('subsystem')
    msg.add_boolean(True)
    msg.add_string(subsystem)
    self.event.clear()
    self.transport._send_user_message(msg)
    self._wait_for_event()
240
def resize_pty(self, width=80, height=24):
    """
    Change the size of the pseudo-terminal created by an earlier
    L{get_pty} call.

    @param width: new width (in characters) of the terminal screen
    @type width: int
    @param height: new height (in characters) of the terminal screen
    @type height: int

    @raise SSHException: if the request was rejected or the channel was
        closed
    """
    still_open = self.active and not (self.closed or self.eof_received or self.eof_sent)
    if not still_open:
        raise SSHException('Channel is not open')
    change = Message()
    change.add_byte(chr(MSG_CHANNEL_REQUEST))
    change.add_int(self.remote_chanid)
    change.add_string('window-change')
    change.add_boolean(True)
    change.add_int(width)
    change.add_int(height)
    # pixel dimensions are sent as zeros
    change.add_int(0)
    change.add_int(0)
    self.event.clear()
    self.transport._send_user_message(change)
    self._wait_for_event()
267
def exit_status_ready(self):
    """
    Tell whether L{recv_exit_status} would return without blocking,
    i.e. whether the channel is closed or an exit status has already
    been recorded.  Use this to poll if you don't want to block.  Note
    that the server may not return an exit status in some cases (like
    bad servers).

    @return: True if L{recv_exit_status} will return immediately
    @rtype: bool
    @since: 1.7.3
    """
    closed = self.closed
    if closed:
        return closed
    return self.status_event.isSet()
280
def recv_exit_status(self):
    """
    Return the exit status from the process on the server.  This is
    mostly useful for retrieving the results of an L{exec_command}.
    If the command hasn't finished yet, this method will wait until
    it does, or until the channel is closed.  If no exit status is
    provided by the server, -1 is returned.

    @return: the exit code of the process on the server.
    @rtype: int

    @since: 1.2
    """
    status_ready = self.status_event
    # block (without a timeout) until the status event fires
    status_ready.wait()
    assert status_ready.isSet()
    return self.exit_status
297
def send_exit_status(self, status):
    """
    Report the exit status of an executed command to the client.  (This
    really only makes sense in server mode.)  Many clients expect to
    get some sort of status code back from an executed command after
    it completes.

    @param status: the exit code of the process
    @type status: int

    @since: 1.2
    """
    # No open-channel check on purpose: in many cases the channel will
    # not still be open here, and that's fine.
    notice = Message()
    notice.add_byte(chr(MSG_CHANNEL_REQUEST))
    notice.add_int(self.remote_chanid)
    notice.add_string('exit-status')
    # no reply wanted
    notice.add_boolean(False)
    notice.add_int(status)
    self.transport._send_user_message(notice)
319
def request_x11(self, screen_number=0, auth_protocol=None, auth_cookie=None,
                single_connection=False, handler=None):
    """
    Request an x11 session on this channel.  If the server allows it,
    further x11 requests can be made from the server to the client,
    when an x11 application is run in a shell session.

    From RFC4254::

        It is RECOMMENDED that the 'x11 authentication cookie' that is
        sent be a fake, random cookie, and that the cookie be checked and
        replaced by the real cookie when a connection request is received.

    If you omit the auth_cookie, a new secure random 128-bit value will be
    generated, used, and returned.  You will need to use this value to
    verify incoming x11 requests and replace them with the actual local
    x11 cookie (which requires some knowledge of the x11 protocol).

    If a handler is passed in, the handler is called from another thread
    whenever a new x11 connection arrives.  The default handler queues up
    incoming x11 connections, which may be retrieved using
    L{Transport.accept}.  The handler's calling signature is::

        handler(channel: Channel, (address: str, port: int))

    @param screen_number: the x11 screen number (0, 10, etc)
    @type screen_number: int
    @param auth_protocol: the name of the X11 authentication method used;
        if none is given, C{"MIT-MAGIC-COOKIE-1"} is used
    @type auth_protocol: str
    @param auth_cookie: hexadecimal string containing the x11 auth cookie;
        if none is given, a secure random 128-bit value is generated
    @type auth_cookie: str
    @param single_connection: if True, only a single x11 connection will be
        forwarded (by default, any number of x11 connections can arrive
        over this session)
    @type single_connection: bool
    @param handler: an optional handler to use for incoming X11 connections
    @type handler: function
    @return: the auth_cookie used
    """
    finished = self.closed or self.eof_received or self.eof_sent
    if finished or not self.active:
        raise SSHException('Channel is not open')

    protocol = auth_protocol
    if protocol is None:
        protocol = 'MIT-MAGIC-COOKIE-1'
    cookie = auth_cookie
    if cookie is None:
        # 16 random bytes from the transport's pool, hex-encoded
        random_bytes = self.transport.randpool.get_bytes(16)
        cookie = binascii.hexlify(random_bytes)

    request = Message()
    request.add_byte(chr(MSG_CHANNEL_REQUEST))
    request.add_int(self.remote_chanid)
    request.add_string('x11-req')
    request.add_boolean(True)
    request.add_boolean(single_connection)
    request.add_string(protocol)
    request.add_string(cookie)
    request.add_int(screen_number)
    self.event.clear()
    self.transport._send_user_message(request)
    self._wait_for_event()
    # the handler is only installed once the wait has returned
    self.transport._set_x11_handler(handler)
    return cookie
382
def get_transport(self):
    """
    Fetch the L{Transport} this channel is attached to.

    @return: the L{Transport} that was used to create this channel.
    @rtype: L{Transport}
    """
    owner = self.transport
    return owner
391
def set_name(self, name):
    """
    Give this channel a name.  Currently it's only used to set the name
    of the channel in logfile entries.  The name can be read back with
    L{get_name}.

    @param name: new channel name
    @type name: str
    """
    setattr(self, '_name', name)
402
def get_name(self):
    """
    Return the channel's current name, as last stored by L{set_name}.

    @return: the name of this channel.
    @rtype: str
    """
    current = self._name
    return current
411
def get_id(self):
    """
    Return the ID # for this channel.  The channel ID is unique across
    a L{Transport} and usually a small number.  It's also the number
    passed to L{ServerInterface.check_channel_request} when determining
    whether to accept a channel request in server mode.

    @return: the ID of this channel.
    @rtype: int
    """
    local_id = self.chanid
    return local_id
423
def set_combine_stderr(self, combine):
    """
    Choose whether stderr is merged into stdout on this channel.  The
    default is C{False}, but in some cases it may be convenient to have
    both streams combined.

    If this is C{False}, and L{exec_command} is called (or
    C{invoke_shell} with no pty), output to stderr will not show up
    through the L{recv} and L{recv_ready} calls.  You will have to use
    L{recv_stderr} and L{recv_stderr_ready} to get stderr output.

    If this is C{True}, data will never show up via L{recv_stderr} or
    L{recv_stderr_ready}.

    @param combine: C{True} if stderr output should be combined into
        stdout on this channel.
    @type combine: bool
    @return: previous setting.
    @rtype: bool

    @since: 1.1
    """
    pending = ''
    self.lock.acquire()
    try:
        previous = self.combine_stderr
        self.combine_stderr = combine
        newly_combined = combine and not previous
        if newly_combined:
            # drain whatever stderr data was already buffered
            pending = self.in_stderr_buffer.empty()
    finally:
        self.lock.release()
    # the drained data is moved into the primary buffer outside the lock
    if len(pending) > 0:
        self._feed(pending)
    return previous
459 460 461 ### socket API 462 463
def settimeout(self, timeout):
    """
    Set a timeout on blocking read/write operations.  The C{timeout}
    argument can be a nonnegative float expressing seconds, or C{None}.
    If a float is given, subsequent channel read/write operations will
    raise a timeout exception if the timeout period value has elapsed
    before the operation has completed.  Setting a timeout of C{None}
    disables timeouts on socket operations.

    C{chan.settimeout(0.0)} is equivalent to C{chan.setblocking(0)};
    C{chan.settimeout(None)} is equivalent to C{chan.setblocking(1)}.

    @param timeout: seconds to wait for a pending read/write operation
        before raising C{socket.timeout}, or C{None} for no timeout.
    @type timeout: float
    """
    # stored as given; no validation is done here
    setattr(self, 'timeout', timeout)
481
def gettimeout(self):
    """
    Return the timeout in seconds (as a float) associated with socket
    operations, or C{None} if no timeout is set.  This reflects the
    last call to L{setblocking} or L{settimeout}.

    @return: timeout in seconds, or C{None}.
    @rtype: float
    """
    current = self.timeout
    return current
492
def setblocking(self, blocking):
    """
    Switch the channel between blocking and non-blocking mode: if
    C{blocking} is 0, the channel is set to non-blocking mode;
    otherwise it's set to blocking mode.  Initially all channels are in
    blocking mode.

    In non-blocking mode, if a L{recv} call doesn't find any data, or if
    a L{send} call can't immediately dispose of the data, an error
    exception is raised.  In blocking mode, the calls block until they
    can proceed.  An EOF condition is considered "immediate data" for
    L{recv}, so if the channel is closed in the read direction, it will
    never block.

    C{chan.setblocking(0)} is equivalent to C{chan.settimeout(0)};
    C{chan.setblocking(1)} is equivalent to C{chan.settimeout(None)}.

    @param blocking: 0 to set non-blocking mode; non-0 to set blocking
        mode.
    @type blocking: int
    """
    # non-blocking is expressed as a zero timeout, blocking as no timeout
    new_timeout = 0.0
    if blocking:
        new_timeout = None
    self.settimeout(new_timeout)
516
def getpeername(self):
    """
    Return the address of the remote side of this Channel, if possible.
    This simply delegates to C{getpeername} on the Transport, and exists
    to provide enough of a socket-like interface to allow asyncore to
    work.  (asyncore likes to call C{'getpeername'}.)

    @return: the address of the remote host, if known
    @rtype: tuple(str, int)
    """
    peer = self.transport.getpeername()
    return peer
528
def close(self):
    """
    Close the channel.  All future read/write operations on the channel
    will fail.  The remote end will receive no more data (after queued
    data is flushed).  Channels are automatically closed when their
    L{Transport} is closed or when they are garbage collected.
    """
    self.lock.acquire()
    try:
        # The pipe is only closed when the user explicitly closes the
        # channel; otherwise they will get unpleasant surprises.  This
        # happens before the closed check because the remote host may
        # have already closed the connection.
        notify_pipe = self._pipe
        if notify_pipe is not None:
            notify_pipe.close()
            self._pipe = None

        if self.closed or not self.active:
            return
        pending = self._close_internal()
    finally:
        self.lock.release()
    # the resulting messages are sent after the lock is released
    for msg in pending:
        if msg is None:
            continue
        self.transport._send_user_message(msg)
554
def recv_ready(self):
    """
    Report whether data is buffered and ready to be read from this
    channel.  A C{False} result does not mean that the channel has
    closed; it means you may need to wait before more data arrives.

    @return: C{True} if a L{recv} call on this channel would immediately
        return at least one byte; C{False} otherwise.
    @rtype: boolean
    """
    primary = self.in_buffer
    return primary.read_ready()
566
def recv(self, nbytes):
    """
    Receive data from the channel.  The return value is a string
    representing the data received.  The maximum amount of data to be
    received at once is specified by C{nbytes}.  If a string of length
    zero is returned, the channel stream has closed.

    @param nbytes: maximum number of bytes to read.
    @type nbytes: int
    @return: data.
    @rtype: str

    @raise socket.timeout: if no data is ready before the timeout set by
        L{settimeout}.
    """
    try:
        out = self.in_buffer.read(nbytes, self.timeout)
    except PipeTimeout:
        # surface the buffer's timeout as the socket-style exception
        raise socket.timeout()

    # a positive result is the number of bytes to hand back to the sender
    ack = self._check_add_window(len(out))
    # no need to hold the channel lock when sending this
    if ack > 0:
        m = Message()
        m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
        m.add_int(self.remote_chanid)
        m.add_int(ack)
        self.transport._send_user_message(m)

    return out
597
def recv_stderr_ready(self):
    """
    Report whether data is buffered and ready to be read from this
    channel's stderr stream.  Only channels using L{exec_command} or
    L{invoke_shell} without a pty will ever have data on the stderr
    stream.

    @return: C{True} if a L{recv_stderr} call on this channel would
        immediately return at least one byte; C{False} otherwise.
    @rtype: boolean

    @since: 1.1
    """
    stderr_buffer = self.in_stderr_buffer
    return stderr_buffer.read_ready()
612
def recv_stderr(self, nbytes):
    """
    Receive data from the channel's stderr stream.  Only channels using
    L{exec_command} or L{invoke_shell} without a pty will ever have data
    on the stderr stream.  The return value is a string representing the
    data received.  The maximum amount of data to be received at once is
    specified by C{nbytes}.  If a string of length zero is returned, the
    channel stream has closed.

    @param nbytes: maximum number of bytes to read.
    @type nbytes: int
    @return: data.
    @rtype: str

    @raise socket.timeout: if no data is ready before the timeout set by
        L{settimeout}.

    @since: 1.1
    """
    try:
        out = self.in_stderr_buffer.read(nbytes, self.timeout)
    except PipeTimeout:
        # surface the buffer's timeout as the socket-style exception
        raise socket.timeout()

    # a positive result is the number of bytes to hand back to the sender
    ack = self._check_add_window(len(out))
    # no need to hold the channel lock when sending this
    if ack > 0:
        m = Message()
        m.add_byte(chr(MSG_CHANNEL_WINDOW_ADJUST))
        m.add_int(self.remote_chanid)
        m.add_int(ack)
        self.transport._send_user_message(m)

    return out
647
def send_ready(self):
    """
    Report whether data can be written to this channel without blocking.
    This means the channel is either closed (so any write attempt would
    return immediately) or there is at least one byte of space in the
    outbound buffer.  If there is at least one byte of space in the
    outbound buffer, a L{send} call will succeed immediately and return
    the number of bytes actually written.

    @return: C{True} if a L{send} call on this channel would immediately
        succeed or fail
    @rtype: boolean
    """
    self.lock.acquire()
    try:
        write_side_done = self.closed or self.eof_sent
        if not write_side_done:
            return self.out_window_size > 0
        return True
    finally:
        self.lock.release()
668
def send(self, s):
    """
    Send data to the channel.  Returns the number of bytes sent, or 0 if
    the channel stream is closed.  Applications are responsible for
    checking that all data has been sent: if only some of the data was
    transmitted, the application needs to attempt delivery of the
    remaining data.

    @param s: data to send
    @type s: str
    @return: number of bytes actually sent
    @rtype: int

    @raise socket.timeout: if no data could be sent before the timeout set
        by L{settimeout}.
    """
    wanted = len(s)
    self.lock.acquire()
    try:
        granted = self._wait_for_send_window(wanted)
        if granted == 0:
            # eof or similar
            return 0
        packet = Message()
        packet.add_byte(chr(MSG_CHANNEL_DATA))
        packet.add_int(self.remote_chanid)
        # only the granted prefix of the data goes out in this call
        packet.add_string(s[:granted])
    finally:
        self.lock.release()
    # Note: self.lock is released before calling _send_user_message.
    # Otherwise, we can deadlock during re-keying.
    self.transport._send_user_message(packet)
    return granted
702
def send_stderr(self, s):
    """
    Send data to the channel on the "stderr" stream.  This is normally
    only used by servers to send output from shell commands -- clients
    won't use this.  Returns the number of bytes sent, or 0 if the
    channel stream is closed.  Applications are responsible for checking
    that all data has been sent: if only some of the data was
    transmitted, the application needs to attempt delivery of the
    remaining data.

    @param s: data to send.
    @type s: str
    @return: number of bytes actually sent.
    @rtype: int

    @raise socket.timeout: if no data could be sent before the timeout set
        by L{settimeout}.

    @since: 1.1
    """
    wanted = len(s)
    self.lock.acquire()
    try:
        granted = self._wait_for_send_window(wanted)
        if granted == 0:
            # eof or similar
            return 0
        packet = Message()
        packet.add_byte(chr(MSG_CHANNEL_EXTENDED_DATA))
        packet.add_int(self.remote_chanid)
        # extended data type code 1 (the receiving side here treats
        # code 1 as stderr)
        packet.add_int(1)
        packet.add_string(s[:granted])
    finally:
        self.lock.release()
    # Note: self.lock is released before calling _send_user_message.
    # Otherwise, we can deadlock during re-keying.
    self.transport._send_user_message(packet)
    return granted
740
def sendall(self, s):
    """
    Send data to the channel, without allowing partial results.  Unlike
    L{send}, this method continues to send data from the given string
    until either all data has been sent or an error occurs.  Nothing is
    returned.

    @param s: data to send.
    @type s: str

    @raise socket.timeout: if sending stalled for longer than the timeout
        set by L{settimeout}.
    @raise socket.error: if an error occurred before the entire string was
        sent.

    @note: If the channel is closed while only part of the data has been
        sent, there is no way to determine how much data (if any) was
        sent.  This is irritating, but identically follows python's API.
    """
    while s:
        if self.closed:
            # this doesn't seem useful, but it is the documented behavior of Socket
            raise socket.error('Socket is closed')
        sent = self.send(s)
        if sent == 0:
            # send() reports 0 when the stream can take no more data
            # (e.g. EOF was already sent).  Without this check the loop
            # would spin forever, because `s` never gets shorter.
            raise socket.error('Socket is closed')
        s = s[sent:]
    return None
766
def sendall_stderr(self, s):
    """
    Send data to the channel's "stderr" stream, without allowing partial
    results.  Unlike L{send_stderr}, this method continues to send data
    from the given string until all data has been sent or an error
    occurs.  Nothing is returned.

    @param s: data to send to the client as "stderr" output.
    @type s: str

    @raise socket.timeout: if sending stalled for longer than the timeout
        set by L{settimeout}.
    @raise socket.error: if an error occurred before the entire string was
        sent.

    @since: 1.1
    """
    while s:
        if self.closed:
            raise socket.error('Socket is closed')
        sent = self.send_stderr(s)
        if sent == 0:
            # send_stderr() reports 0 when the stream can take no more
            # data (e.g. EOF was already sent).  Without this check the
            # loop would spin forever, because `s` never gets shorter.
            raise socket.error('Socket is closed')
        s = s[sent:]
    return None
790
def makefile(self, *params):
    """
    Return a file-like object associated with this channel.  The
    optional C{mode} and C{bufsize} arguments are interpreted the same
    way as by the built-in C{file()} function in python.

    @return: object which can be used for python file I/O.
    @rtype: L{ChannelFile}
    """
    # the channel itself is the first constructor argument
    return ChannelFile(self, *params)
801
def makefile_stderr(self, *params):
    """
    Return a file-like object associated with this channel's stderr
    stream.  Only channels using L{exec_command} or L{invoke_shell}
    without a pty will ever have data on the stderr stream.

    The optional C{mode} and C{bufsize} arguments are interpreted the
    same way as by the built-in C{file()} function in python.  For a
    client, it only makes sense to open this file for reading.  For a
    server, it only makes sense to open this file for writing.

    @return: object which can be used for python file I/O.
    @rtype: L{ChannelFile}

    @since: 1.1
    """
    # the channel itself is the first constructor argument
    return ChannelStderrFile(self, *params)
819
def fileno(self):
    """
    Return an OS-level file descriptor which can be used for polling,
    but I{not} for reading or writing.  This is primarily to allow
    python's C{select} module to work.

    The first time C{fileno} is called on a channel, a pipe is created to
    simulate real OS-level file descriptor (FD) behavior.  Because of
    this, two OS-level FDs are created, which will use up FDs faster than
    normal.  (You won't notice this effect unless you have hundreds of
    channels open at the same time.)

    @return: an OS-level file descriptor
    @rtype: int

    @warning: This method causes channel reads to be slightly less
        efficient.
    """
    self.lock.acquire()
    try:
        if self._pipe is None:
            # first call: create the pipe and register one event with
            # each of the two inbound buffers
            self._pipe = pipe.make_pipe()
            stdout_event, stderr_event = pipe.make_or_pipe(self._pipe)
            self.in_buffer.set_event(stdout_event)
            self.in_stderr_buffer.set_event(stderr_event)
        return self._pipe.fileno()
    finally:
        self.lock.release()
850
def shutdown(self, how):
    """
    Shut down one or both halves of the connection.  If C{how} is 0,
    further receives are disallowed.  If C{how} is 1, further sends
    are disallowed.  If C{how} is 2, further sends and receives are
    disallowed.  This closes the stream in one or both directions.

    @param how: 0 (stop receiving), 1 (stop sending), or 2 (stop
        receiving and sending).
    @type how: int
    """
    stop_reading = how in (0, 2)
    stop_writing = how in (1, 2)
    if stop_reading:
        # feign "read" shutdown
        self.eof_received = 1
    if not stop_writing:
        return
    self.lock.acquire()
    try:
        eof_msg = self._send_eof()
    finally:
        self.lock.release()
    # sent after the lock is released; None means nothing to send
    if eof_msg is not None:
        self.transport._send_user_message(eof_msg)
873
def shutdown_read(self):
    """
    Shut down the receiving side of this socket, closing the stream in
    the incoming direction.  After this call, future reads on this
    channel will fail instantly.  This is a convenience method,
    equivalent to C{shutdown(0)}, for people who don't make it a habit
    to memorize unix constants from the 1970s.

    @since: 1.2
    """
    stop_receiving = 0
    self.shutdown(stop_receiving)
885
def shutdown_write(self):
    """
    Shut down the sending side of this socket, closing the stream in
    the outgoing direction.  After this call, future writes on this
    channel will fail instantly.  This is a convenience method,
    equivalent to C{shutdown(1)}, for people who don't make it a habit
    to memorize unix constants from the 1970s.

    @since: 1.2
    """
    stop_sending = 1
    self.shutdown(stop_sending)
897 898 899 ### calls from Transport 900 901
902 - def _set_transport(self, transport):
905
def _set_window(self, window_size, max_packet_size):
    """
    Record the inbound window and maximum packet size, and reset the
    inbound flow-control counters.

    @param window_size: size of the inbound window
    @type window_size: int
    @param max_packet_size: largest inbound packet size
    @type max_packet_size: int
    """
    self.in_max_packet_size = max_packet_size
    self.in_window_size = window_size
    self.in_window_sofar = 0
    # threshold of bytes we receive before we bother to send a window
    # update: a tenth of the window
    self.in_window_threshold = window_size // 10
    self._log(DEBUG, 'Max packet in: %d bytes' % max_packet_size)
913
def _set_remote_channel(self, chanid, window_size, max_packet_size):
    """
    Record the remote side's channel ID, window and maximum packet size,
    and mark this channel active.

    @param chanid: the remote side's ID for this channel
    @type chanid: int
    @param window_size: size of the outbound window
    @type window_size: int
    @param max_packet_size: largest outbound packet size offered by the
        remote side; values below C{MIN_PACKET_SIZE} are raised to it
    @type max_packet_size: int
    """
    self.remote_chanid = chanid
    self.out_window_size = window_size
    # never go below our lower bound, whatever the remote side offered
    self.out_max_packet_size = max(max_packet_size, MIN_PACKET_SIZE)
    # a real boolean, matching the False assigned in __init__
    self.active = True
    # log the value actually in effect, not the raw remote offer
    self._log(DEBUG, 'Max packet out: %d bytes' % self.out_max_packet_size)
920
def _request_success(self, m):
    """
    Handle a success reply to a channel request: log it and set
    C{self.event}.

    @param m: the reply message (not inspected here)
    """
    note = 'Sesch channel %d request ok' % self.chanid
    self._log(DEBUG, note)
    self.event.set()
925
926 - def _request_failed(self, m):
927 self.lock.acquire() 928 try: 929 msgs = self._close_internal() 930 finally: 931 self.lock.release() 932 for m in msgs: 933 if m is not None: 934 self.transport._send_user_message(m)
935
936 - def _feed(self, m):
937 if type(m) is str: 938 # passed from _feed_extended 939 s = m 940 else: 941 s = m.get_string() 942 self.in_buffer.feed(s)
943
944 - def _feed_extended(self, m):
945 code = m.get_int() 946 s = m.get_string() 947 if code != 1: 948 self._log(ERROR, 'unknown extended_data type %d; discarding' % code) 949 return 950 if self.combine_stderr: 951 self._feed(s) 952 else: 953 self.in_stderr_buffer.feed(s)
954
def _window_adjust(self, m):
    """
    Handle a window-adjust message: enlarge the outbound window by the
    number of bytes carried in the message and wake every thread waiting
    on C{out_buffer_cv}.

    @param m: message to read the byte count from
    """
    grant = m.get_int()
    self.lock.acquire()
    try:
        if self.ultra_debug:
            self._log(DEBUG, 'window up %d' % grant)
        self.out_window_size = self.out_window_size + grant
        # senders blocked in _wait_for_send_window wait on this condition
        self.out_buffer_cv.notifyAll()
    finally:
        self.lock.release()
965
def _handle_request(self, m):
    """
    Handle a channel request sent by the remote side.

    The message starts with the request name and a "want reply" flag,
    followed by request-specific fields. C{exit-status} and C{xon-xoff}
    are handled locally; C{pty-req}, C{shell}, C{exec}, C{subsystem},
    C{window-change} and C{x11-req} are passed to the matching
    C{check_channel_*} method of the transport's C{server_object} and are
    refused when there is no server object. Anything else is logged and
    refused. When a reply was asked for, a success or failure message
    addressed to C{remote_chanid} is sent through the transport.

    @param m: message to read the request from
    """
    request = m.get_string()
    reply_wanted = m.get_boolean()
    server = self.transport.server_object
    # every branch below leaves this False unless the request is accepted
    ok = False
    if request == 'exit-status':
        self.exit_status = m.get_int()
        self.status_event.set()
        ok = True
    elif request == 'xon-xoff':
        # ignore
        ok = True
    elif request == 'pty-req':
        # the fields are always consumed, even when nobody can answer
        term = m.get_string()
        width = m.get_int()
        height = m.get_int()
        pixelwidth = m.get_int()
        pixelheight = m.get_int()
        modes = m.get_string()
        if server is not None:
            ok = server.check_channel_pty_request(self, term, width, height,
                                                  pixelwidth, pixelheight, modes)
    elif request == 'shell':
        if server is not None:
            ok = server.check_channel_shell_request(self)
    elif request == 'exec':
        cmd = m.get_string()
        if server is not None:
            ok = server.check_channel_exec_request(self, cmd)
    elif request == 'subsystem':
        name = m.get_string()
        if server is not None:
            ok = server.check_channel_subsystem_request(self, name)
    elif request == 'window-change':
        width = m.get_int()
        height = m.get_int()
        pixelwidth = m.get_int()
        pixelheight = m.get_int()
        if server is not None:
            ok = server.check_channel_window_change_request(self, width, height,
                                                            pixelwidth, pixelheight)
    elif request == 'x11-req':
        single_connection = m.get_boolean()
        auth_proto = m.get_string()
        auth_cookie = m.get_string()
        screen_number = m.get_int()
        if server is not None:
            ok = server.check_channel_x11_request(self, single_connection,
                                                  auth_proto, auth_cookie,
                                                  screen_number)
    else:
        self._log(DEBUG, 'Unhandled channel request "%s"' % request)
    if not reply_wanted:
        return
    reply = Message()
    if ok:
        reply.add_byte(chr(MSG_CHANNEL_SUCCESS))
    else:
        reply.add_byte(chr(MSG_CHANNEL_FAILURE))
    reply.add_int(self.remote_chanid)
    self.transport._send_user_message(reply)
1038
def _handle_eof(self, m):
    """
    Handle an EOF message from the remote side.

    On the first EOF only, and under C{self.lock}: remember that EOF was
    received, close both inbound buffers and, when a pipe is attached, set
    it permanently. The EOF is logged every time, after the lock has been
    released.

    @param m: the EOF message (not inspected)
    """
    self.lock.acquire()
    try:
        if not self.eof_received:
            self.eof_received = True
            self.in_buffer.close()
            self.in_stderr_buffer.close()
            attached_pipe = self._pipe
            if attached_pipe is not None:
                attached_pipe.set_forever()
    finally:
        self.lock.release()
    self._log(DEBUG, 'EOF received (%s)', self._name)
1051
def _handle_close(self, m):
    """
    Handle a close message from the remote side.

    Under C{self.lock} the channel is closed locally and unlinked from the
    transport; the messages returned by C{_close_internal} are sent once
    the lock has been released.

    @param m: the close message (not inspected)
    """
    self.lock.acquire()
    try:
        pending = self._close_internal()
        self.transport._unlink_channel(self.chanid)
    finally:
        self.lock.release()
    # _close_internal may return None placeholders; skip those
    outgoing = [packet for packet in pending if packet is not None]
    for packet in outgoing:
        self.transport._send_user_message(packet)
1062 1063 1064 ### internals... 1065 1066
def _log(self, level, msg, *args):
    """
    Log a message through C{self.logger}, prefixed with this channel's
    name in the form C{[chan NAME] }.

    @param level: logging level passed straight to the logger
    @param msg: message text (may contain C{%}-style placeholders)
    @type msg: str
    @param args: values for the placeholders, forwarded untouched
    """
    tagged = ''.join(["[chan ", self._name, "] ", msg])
    self.logger.log(level, tagged, *args)
1069
def _wait_for_event(self):
    """
    Block until C{self.event} is set, then fail if the channel turned out
    to be closed.

    @raise SSHException: if the channel is closed and the transport has no
        saved exception; when the transport does have one, that exception
        is raised instead
    """
    waiter = self.event
    waiter.wait()
    assert waiter.isSet()
    if not self.closed:
        return
    # prefer the transport's own failure reason over a generic one
    failure = self.transport.get_exception()
    if failure is None:
        failure = SSHException('Channel closed.')
    raise failure
1078
def _set_closed(self):
    """
    Mark the channel closed and release everything that could be waiting
    on it. The caller must already hold the lock.

    Both inbound buffers are closed, threads waiting on C{out_buffer_cv}
    are woken, C{event} and C{status_event} are set, and an attached pipe
    (if any) is set permanently.
    """
    # you are holding the lock.
    self.closed = True
    self.in_buffer.close()
    self.in_stderr_buffer.close()
    self.out_buffer_cv.notifyAll()
    # Notify any waiters that we are closed
    self.event.set()
    self.status_event.set()
    attached_pipe = self._pipe
    if attached_pipe is not None:
        attached_pipe.set_forever()
1090
def _send_eof(self):
    """
    Build the EOF message for this channel, at most once. The caller must
    already hold the lock.

    Despite the name, nothing is transmitted here: the message is only
    constructed and returned.

    @return: an EOF message addressed to C{remote_chanid}, or C{None} if
        EOF was already sent
    """
    # you are holding the lock.
    if not self.eof_sent:
        eof = Message()
        eof.add_byte(chr(MSG_CHANNEL_EOF))
        eof.add_int(self.remote_chanid)
        self.eof_sent = True
        self._log(DEBUG, 'EOF sent (%s)', self._name)
        return eof
    return None
1101
def _close_internal(self):
    """
    Close the channel locally and build the messages that announce it.
    The caller must already hold the lock.

    @return: a pair C{(eof_message, close_message)}; the first element is
        C{None} when EOF had already been sent, and both are C{None} when
        the channel is inactive or already closed
    @rtype: tuple
    """
    # you are holding the lock.
    if self.active and not self.closed:
        eof_msg = self._send_eof()
        close_msg = Message()
        close_msg.add_byte(chr(MSG_CHANNEL_CLOSE))
        close_msg.add_int(self.remote_chanid)
        self._set_closed()
        # can't unlink from the Transport yet -- the remote side may still
        # try to send meta-data (exit-status, etc)
        return eof_msg, close_msg
    return None, None
1114 1125
def _check_add_window(self, n):
    """
    Count C{n} more received bytes and decide whether a window update is
    due.

    Bytes accumulate in C{in_window_sofar}; once the total exceeds
    C{in_window_threshold} the total is returned and the counter resets.
    Nothing is counted when the channel is closed, inactive, or has
    already received EOF. Runs under C{self.lock}.

    @param n: number of bytes to account for
    @type n: int
    @return: the accumulated byte count to report, or 0 if no update is
        due
    @rtype: int
    """
    self.lock.acquire()
    try:
        if self.closed or self.eof_received or not self.active:
            return 0
        if self.ultra_debug:
            self._log(DEBUG, 'addwindow %d' % n)
        accumulated = self.in_window_sofar + n
        self.in_window_sofar = accumulated
        if accumulated <= self.in_window_threshold:
            return 0
        if self.ultra_debug:
            self._log(DEBUG, 'addwindow send %d' % accumulated)
        self.in_window_sofar = 0
        return accumulated
    finally:
        self.lock.release()
1143
def _wait_for_send_window(self, size):
    """
    (You are already holding the lock.)
    Wait for the send window to open up, and allocate up to C{size} bytes
    for transmission.  If no space opens up before the timeout, a timeout
    exception is raised.  Returns the number of bytes available to send
    (may be less than requested).

    @param size: number of bytes the caller would like to send
    @type size: int
    @return: number of bytes reserved from the send window; 0 if the
        channel is closed or EOF has already been sent
    @rtype: int
    @raise socket.timeout: if the window is empty and either the channel
        is non-blocking (C{timeout == 0.0}) or the timeout elapses with
        the window still empty
    """
    # you are already holding the lock
    if self.closed or self.eof_sent:
        return 0
    if self.out_window_size == 0:
        # should we block?
        if self.timeout == 0.0:
            raise socket.timeout()
        # loop here in case we get woken up but a different thread has filled the buffer
        timeout = self.timeout
        while self.out_window_size == 0:
            if self.closed or self.eof_sent:
                return 0
            then = time.time()
            self.out_buffer_cv.wait(timeout)
            if timeout is not None:
                timeout -= time.time() - then
                # only give up if the wait did not open the window; space
                # that arrived right at the deadline is still usable
                if timeout <= 0.0 and self.out_window_size == 0:
                    raise socket.timeout()
    # we have some window to squeeze into
    if self.closed or self.eof_sent:
        return 0
    if self.out_window_size < size:
        size = self.out_window_size
    # stay 64 bytes below the remote side's maximum packet size
    if self.out_max_packet_size - 64 < size:
        size = self.out_max_packet_size - 64
    self.out_window_size -= size
    if self.ultra_debug:
        self._log(DEBUG, 'window down to %d' % self.out_window_size)
    return size
1181 1182
class ChannelFile (BufferedFile):
    """
    File-like adapter over a L{Channel}.  Instances are obtained by calling
    L{Channel.makefile}.

    @bug: To correctly emulate the file object created from a socket's
        C{makefile} method, a L{Channel} and its C{ChannelFile} should be able
        to be closed or garbage-collected independently.  Currently, closing
        the C{ChannelFile} does nothing but flush the buffer.
    """

    def __init__(self, channel, mode = 'r', bufsize = -1):
        """
        Wrap C{channel} in a buffered file object.

        @param channel: the channel all reads and writes are forwarded to
        @param mode: file mode, handed to C{_set_mode}
        @type mode: str
        @param bufsize: buffer size, handed to C{_set_mode}
        @type bufsize: int
        """
        # the channel is attached before the base class is initialised
        self.channel = channel
        BufferedFile.__init__(self)
        self._set_mode(mode, bufsize)

    def __repr__(self):
        """
        Returns a string representation of this object, for debugging.

        @rtype: str
        """
        return '<paramiko.ChannelFile from %r>' % (self.channel,)

    def _read(self, size):
        """
        Read up to C{size} bytes by delegating to the channel's C{recv}.
        """
        chunk = self.channel.recv(size)
        return chunk

    def _write(self, data):
        """
        Send all of C{data} through the channel's C{sendall} and report its
        full length as written.
        """
        self.channel.sendall(data)
        written = len(data)
        return written
1213 1214
class ChannelStderrFile (ChannelFile):
    """
    Variant of L{ChannelFile} that is wired to the channel's stderr calls:
    reads come from C{recv_stderr} and writes go through
    C{sendall_stderr}.
    """

    def __init__(self, channel, mode = 'r', bufsize = -1):
        """
        Same arguments as L{ChannelFile}; simply forwarded to it.
        """
        ChannelFile.__init__(self, channel, mode, bufsize)

    def _read(self, size):
        """
        Read up to C{size} bytes by delegating to the channel's
        C{recv_stderr}.
        """
        chunk = self.channel.recv_stderr(size)
        return chunk

    def _write(self, data):
        """
        Send all of C{data} through the channel's C{sendall_stderr} and
        report its full length as written.
        """
        self.channel.sendall_stderr(data)
        written = len(data)
        return written
1225 1226 1227 # vim: set shiftwidth=4 expandtab : 1228