Package nbxmpp :: Module transports_nb
[hide private]
[frames] | no frames]

Source Code for Module nbxmpp.transports_nb

  1  ##   transports_nb.py 
  2  ##       based on transports.py 
  3  ## 
  4  ##   Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov 
  5  ##       modified by Dimitur Kirov <dkirov@gmail.com> 
  6  ##       modified by Tomas Karasek <tom.to.the.k@gmail.com> 
  7  ## 
  8  ##   This program is free software; you can redistribute it and/or modify 
  9  ##   it under the terms of the GNU General Public License as published by 
 10  ##   the Free Software Foundation; either version 2, or (at your option) 
 11  ##   any later version. 
 12  ## 
 13  ##   This program is distributed in the hope that it will be useful, 
 14  ##   but WITHOUT ANY WARRANTY; without even the implied warranty of 
 15  ##   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 16  ##   GNU General Public License for more details. 
 17   
 18  """ 
 19  Transports are objects responsible for connecting to XMPP server and putting 
  20  data to wrapped sockets in desired form (SSL, TLS, TCP, for HTTP proxy, 
 21  for SOCKS5 proxy...) 
 22   
 23  Transports are not aware of XMPP stanzas and only responsible for low-level 
 24  connection handling. 
 25  """ 
 26   
 27  from simplexml import ustr 
 28  from plugin import PlugIn 
 29  from idlequeue import IdleObject 
 30  import proxy_connectors 
 31  import tls_nb 
 32   
 33  import socket 
 34  import errno 
 35  import time 
 36  import traceback 
 37  import base64 
 38  import urlparse 
 39   
 40  import logging 
 41  log = logging.getLogger('nbxmpp.transports_nb') 
 42   
def urisplit(uri):
    """
    Split a URI string into a (protocol, host, port, path) tuple

    e.g. urisplit('http://httpcm.jabber.org:123/webclient') returns
    ('http', 'httpcm.jabber.org', 123, '/webclient').

    When no usable port is present in the URI, 443 is returned for the
    'https' protocol and 80 for everything else.
    """
    parts = urlparse.urlsplit(uri)
    try:
        port = parts.port
    except ValueError:
        log.warn('port cannot be extracted from BOSH URL %s, using default port'
            % uri)
        port = ''
    if not port:
        # fall back to the well-known port of the protocol
        port = 443 if parts.scheme == 'https' else 80
    return parts.scheme, parts.hostname, port, parts.path
64
def get_proxy_data_from_dict(proxy):
    """
    Pick the TCP endpoint and the credentials out of a proxy dictionary

    :param proxy: dictionary with proxy data; reads the keys 'type',
            'bosh_useproxy' and 'bosh_uri' (for type 'bosh'), 'host', 'port'
            and optionally 'useauth', 'user', 'pass'
    :return: tuple (tcp_host, tcp_port, proxy_user, proxy_pass); user and
            pass are None unless proxy['useauth'] is set
    """
    if proxy['type'] == 'bosh' and not proxy['bosh_useproxy']:
        # BOSH without a proxy: the machine to connect to is named in the
        # BOSH URI itself
        tcp_host, tcp_port = urisplit(proxy['bosh_uri'])[1:3]
    else:
        # any other proxy type, or BOSH over an HTTP proxy: we are connecting
        # to the proxy machine
        tcp_host, tcp_port = proxy['host'], proxy['port']

    proxy_user = proxy_pass = None
    if proxy.get('useauth', False):
        proxy_user, proxy_pass = proxy['user'], proxy['pass']
    return tcp_host, tcp_port, proxy_user, proxy_pass

#: timeout to connect to the server socket, it doesn't include auth
CONNECT_TIMEOUT_SECONDS = 30

#: how long to wait for a disconnect to complete
DISCONNECT_TIMEOUT_SECONDS = 5

#: size of the buffer which reads data from server
# If lower, more stanzas will be fragmented and processed twice.
# 2x maximum size of ssl packet, should be plenty. It's inefficient but should
# work. Problem is that connect machine makes wrong assumptions and that we
# only check for pending data in sockets but not in SSL buffer...
RECV_BUFSIZE = 2 * 16384

# event names handed to the raise_event callback
DATA_RECEIVED = 'DATA RECEIVED'
DATA_SENT = 'DATA SENT'
DATA_ERROR = 'DATA ERROR'

# transport states, see NonBlockingTransport.set_state()
DISCONNECTED = 'DISCONNECTED'
DISCONNECTING = 'DISCONNECTING'
CONNECTING = 'CONNECTING'
PROXY_CONNECTING = 'PROXY_CONNECTING'
CONNECTED = 'CONNECTED'
STATES = (
    DISCONNECTED,
    CONNECTING,
    PROXY_CONNECTING,
    CONNECTED,
    DISCONNECTING,
)

103 -class NonBlockingTransport(PlugIn):
104 """ 105 Abstract class representing a transport 106 107 Subclasses CAN have different constructor signature but connect method SHOULD 108 be the same. 109 """ 110
111 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, 112 certs):
113 """ 114 Each trasport class can have different constructor but it has to have at 115 least all the arguments of NonBlockingTransport constructor 116 117 :param raise_event: callback for monitoring of sent and received data 118 :param on_disconnect: callback called on disconnection during runtime 119 :param idlequeue: processing idlequeue 120 :param estabilish_tls: boolean whether to estabilish TLS connection after 121 TCP connection is done 122 :param certs: tuple of (cacerts, mycerts) see constructor of 123 tls_nb.NonBlockingTLS for more details 124 """ 125 PlugIn.__init__(self) 126 self.raise_event = raise_event 127 self.on_disconnect = on_disconnect 128 self.on_connect = None 129 self.on_connect_failure = None 130 self.idlequeue = idlequeue 131 self.on_receive = None 132 self.server = None 133 self.port = None 134 self.conn_5tuple = None 135 self.set_state(DISCONNECTED) 136 self.estabilish_tls = estabilish_tls 137 self.certs = certs 138 # type of used ssl lib (if any) will be assigned to this member var 139 self.ssl_lib = None 140 self._exported_methods=[self.onreceive, self.set_send_timeout, 141 self.set_send_timeout2, self.set_timeout, self.remove_timeout, 142 self.start_disconnect] 143 144 # time to wait for SOME stanza to come and then send keepalive 145 self.sendtimeout = 0 146 147 # in case we want to something different than sending keepalives 148 self.on_timeout = None 149 self.on_timeout2 = None
150
151 - def plugin(self, owner):
152 owner.Connection = self
153
154 - def plugout(self):
155 self._owner.Connection = None 156 self._owner = None 157 self.disconnect(do_callback=False)
158
159 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
160 """ 161 Creates and connects transport to server and port defined in conn_5tuple 162 which should be item from list returned from getaddrinfo 163 164 :param conn_5tuple: 5-tuple returned from getaddrinfo 165 :param on_connect: callback called on successful connect to the server 166 :param on_connect_failure: callback called on failure when connecting 167 """ 168 self.on_connect = on_connect 169 self.on_connect_failure = on_connect_failure 170 self.server, self.port = conn_5tuple[4][:2] 171 self.conn_5tuple = conn_5tuple
172
173 - def set_state(self, newstate):
174 assert(newstate in STATES) 175 self.state = newstate
176
177 - def get_state(self):
178 return self.state
179
180 - def _on_connect(self):
181 """ 182 Preceeds call of on_connect callback 183 """ 184 # data is reference to socket wrapper instance. We don't need it in client 185 # because 186 self.set_state(CONNECTED) 187 self.on_connect()
188
189 - def _on_connect_failure(self, err_message):
190 """ 191 Preceeds call of on_connect_failure callback 192 """ 193 # In case of error while connecting we need to disconnect transport 194 # but we don't want to call DisconnectHandlers from client, 195 # thus the do_callback=False 196 self.disconnect(do_callback=False) 197 self.on_connect_failure(err_message=err_message)
198
199 - def send(self, raw_data, now=False):
200 if self.get_state() == DISCONNECTED: 201 log.error('Unable to send %s \n because state is %s.' % 202 (raw_data, self.get_state()))
203
204 - def disconnect(self, do_callback=True):
205 self.set_state(DISCONNECTED) 206 if do_callback: 207 # invoke callback given in __init__ 208 self.on_disconnect()
209
210 - def onreceive(self, recv_handler):
211 """ 212 Set the on_receive callback. 213 214 onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is 215 the default one that will decide what to do with received stanza based on 216 its tag name and namespace. 217 218 Do not confuse it with on_receive() method, which is the callback 219 itself. 220 """ 221 if not recv_handler: 222 if hasattr(self, '_owner') and hasattr(self._owner, 'Dispatcher'): 223 self.on_receive = self._owner.Dispatcher.ProcessNonBlocking 224 else: 225 log.warn('No Dispatcher plugged. Received data will not be processed') 226 self.on_receive = None 227 return 228 self.on_receive = recv_handler
229
230 - def _tcp_connecting_started(self):
232
233 - def read_timeout(self):
234 """ 235 Called when there's no response from server in defined timeout 236 """ 237 if self.on_timeout: 238 self.on_timeout() 239 self.renew_send_timeout()
240
241 - def read_timeout2(self):
242 """ 243 called when there's no response from server in defined timeout 244 """ 245 if self.on_timeout2: 246 self.on_timeout2() 247 self.renew_send_timeout2()
248
249 - def renew_send_timeout(self):
250 if self.on_timeout and self.sendtimeout > 0: 251 self.set_timeout(self.sendtimeout)
252
253 - def renew_send_timeout2(self):
254 if self.on_timeout2 and self.sendtimeout2 > 0: 255 self.set_timeout2(self.sendtimeout2)
256
257 - def set_timeout(self, timeout):
258 self.idlequeue.set_read_timeout(self.fd, timeout)
259
260 - def set_timeout2(self, timeout2):
261 self.idlequeue.set_read_timeout(self.fd, timeout2, self.read_timeout2)
262
263 - def get_fd(self):
264 pass
265
266 - def remove_timeout(self):
267 self.idlequeue.remove_timeout(self.fd)
268
269 - def set_send_timeout(self, timeout, on_timeout):
270 self.sendtimeout = timeout 271 if self.sendtimeout > 0: 272 self.on_timeout = on_timeout 273 else: 274 self.on_timeout = None
275
276 - def set_send_timeout2(self, timeout2, on_timeout2):
277 self.sendtimeout2 = timeout2 278 if self.sendtimeout2 > 0: 279 self.on_timeout2 = on_timeout2 280 else: 281 self.on_timeout2 = None
282 283 # FIXME: where and why does this need to be called
284 - def start_disconnect(self):
286 287
288 -class NonBlockingTCP(NonBlockingTransport, IdleObject):
289 """ 290 Non-blocking TCP socket wrapper 291 292 It is used for simple XMPP connection. Can be connected via proxy and can 293 estabilish TLS connection. 294 """
295 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, 296 certs, proxy_dict=None):
297 """ 298 :param proxy_dict: dictionary with proxy data as loaded from config file 299 """ 300 NonBlockingTransport.__init__(self, raise_event, on_disconnect, idlequeue, 301 estabilish_tls, certs) 302 IdleObject.__init__(self) 303 304 # queue with messages to be send 305 self.sendqueue = [] 306 307 # bytes remained from the last send message 308 self.sendbuff = '' 309 310 self.proxy_dict = proxy_dict 311 self.on_remote_disconnect = self.disconnect
312 313 # FIXME: transport should not be aware xmpp
314 - def start_disconnect(self):
315 NonBlockingTransport.start_disconnect(self) 316 self.send('</stream:stream>', now=True) 317 self.disconnect()
318
319 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
320 NonBlockingTransport.connect(self, conn_5tuple, on_connect, 321 on_connect_failure) 322 log.info('NonBlockingTCP Connect :: About to connect to %s:%s' % 323 (self.server, self.port)) 324 325 try: 326 self._sock = socket.socket(*conn_5tuple[:3]) 327 except socket.error, (errnum, errstr): 328 self._on_connect_failure('NonBlockingTCP Connect: Error while creating\ 329 socket: %s %s' % (errnum, errstr)) 330 return 331 332 self._send = self._sock.send 333 self._recv = self._sock.recv 334 self.fd = self._sock.fileno() 335 336 # we want to be notified when send is possible to connected socket because 337 # it means the TCP connection is estabilished 338 self._plug_idle(writable=True, readable=False) 339 self.peerhost = None 340 341 # variable for errno symbol that will be found from exception raised 342 # from connect() 343 errnum = 0 344 errstr = str() 345 346 # set timeout for TCP connecting - if nonblocking connect() fails, pollend 347 # is called. If if succeeds pollout is called. 348 self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS) 349 350 try: 351 self._sock.setblocking(False) 352 self._sock.connect((self.server, self.port)) 353 except Exception, exc: 354 errnum, errstr = exc.args 355 356 if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK): 357 # connecting in progress 358 log.info('After NB connect() of %s. "%s" raised => CONNECTING' % 359 (id(self), errstr)) 360 self._tcp_connecting_started() 361 return 362 363 # if there was some other exception, call failure callback and unplug 364 # transport which will also remove read_timeouts for descriptor 365 self._on_connect_failure('Exception while connecting to %s:%s - %s %s' % 366 (self.server, self.port, errnum, errstr))
367
368 - def _connect_to_proxy(self):
369 self.set_state(PROXY_CONNECTING) 370 if self.proxy_dict['type'] == 'socks5': 371 proxyclass = proxy_connectors.SOCKS5Connector 372 elif self.proxy_dict['type'] == 'http' : 373 proxyclass = proxy_connectors.HTTPCONNECTConnector 374 proxyclass.get_instance( 375 send_method=self.send, 376 onreceive=self.onreceive, 377 old_on_receive=self.on_receive, 378 on_success=self._on_connect, 379 on_failure=self._on_connect_failure, 380 xmpp_server=self.proxy_dict['xmpp_server'], 381 proxy_creds=self.proxy_dict['credentials'])
382
383 - def _on_connect(self):
384 """ 385 Preceed invoking of on_connect callback. TCP connection is already 386 estabilished by this time 387 """ 388 if self.estabilish_tls: 389 self.tls_init( 390 on_succ = lambda: NonBlockingTransport._on_connect(self), 391 on_fail = lambda: self._on_connect_failure( 392 'error while estabilishing TLS')) 393 else: 394 NonBlockingTransport._on_connect(self)
395
396 - def tls_init(self, on_succ, on_fail):
397 """ 398 Estabilishes TLS/SSL using this TCP connection by plugging a 399 NonBlockingTLS module 400 """ 401 cacerts, mycerts = self.certs 402 result = tls_nb.NonBlockingTLS.get_instance(cacerts, mycerts).PlugIn(self) 403 if result: 404 on_succ() 405 else: 406 on_fail()
407
408 - def pollin(self):
409 """ 410 Called by idlequeu when receive on plugged socket is possible 411 """ 412 log.info('pollin called, state == %s' % self.get_state()) 413 self._do_receive()
414
415 - def pollout(self):
416 """ 417 Called by idlequeu when send to plugged socket is possible 418 """ 419 log.info('pollout called, state == %s' % self.get_state()) 420 421 if self.get_state() == CONNECTING: 422 log.info('%s socket wrapper connected' % id(self)) 423 self.idlequeue.remove_timeout(self.fd) 424 self._plug_idle(writable=False, readable=False) 425 self.peerhost = self._sock.getsockname() 426 if self.proxy_dict: 427 self._connect_to_proxy() 428 else: 429 self._on_connect() 430 else: 431 self._do_send()
432
433 - def pollend(self):
434 """ 435 Called by idlequeue on TCP connection errors 436 """ 437 log.info('pollend called, state == %s' % self.get_state()) 438 439 if self.get_state() == CONNECTING: 440 self._on_connect_failure('Error during connect to %s:%s' % 441 (self.server, self.port)) 442 else: 443 self.disconnect()
444
445 - def disconnect(self, do_callback=True):
446 if self.get_state() == DISCONNECTED: 447 return 448 self.set_state(DISCONNECTED) 449 self.idlequeue.unplug_idle(self.fd) 450 if 'NonBlockingTLS' in self.__dict__: 451 self.NonBlockingTLS.PlugOut() 452 try: 453 self._sock.shutdown(socket.SHUT_RDWR) 454 self._sock.close() 455 except socket.error, (errnum, errstr): 456 log.info('Error while disconnecting socket: %s' % errstr) 457 self.fd = -1 458 NonBlockingTransport.disconnect(self, do_callback)
459
460 - def read_timeout(self):
461 log.info('read_timeout called, state == %s' % self.get_state()) 462 if self.get_state() == CONNECTING: 463 # if read_timeout is called during connecting, connect() didn't end yet 464 # thus we have to call the tcp failure callback 465 self._on_connect_failure('Error during connect to %s:%s' % 466 (self.server, self.port)) 467 else: 468 NonBlockingTransport.read_timeout(self)
469
470 - def set_timeout(self, timeout):
471 if self.get_state() != DISCONNECTED and self.fd != -1: 472 NonBlockingTransport.set_timeout(self, timeout) 473 else: 474 log.warn('set_timeout: TIMEOUT NOT SET: state is %s, fd is %s' % 475 (self.get_state(), self.fd))
476
477 - def remove_timeout(self):
478 if self.fd: 479 NonBlockingTransport.remove_timeout(self) 480 else: 481 log.warn('remove_timeout: no self.fd state is %s' % self.get_state())
482
483 - def send(self, raw_data, now=False):
484 """ 485 Append raw_data to the queue of messages to be send. If supplied data is 486 unicode string, encode it to utf-8. 487 """ 488 NonBlockingTransport.send(self, raw_data, now) 489 490 r = self.encode_stanza(raw_data) 491 492 if now: 493 self.sendqueue.insert(0, r) 494 self._do_send() 495 else: 496 self.sendqueue.append(r) 497 498 self._plug_idle(writable=True, readable=True)
499
500 - def encode_stanza(self, stanza):
501 """ 502 Encode str or unicode to utf-8 503 """ 504 if isinstance(stanza, unicode): 505 stanza = stanza.encode('utf-8') 506 elif not isinstance(stanza, str): 507 stanza = ustr(stanza).encode('utf-8') 508 return stanza
509
510 - def _plug_idle(self, writable, readable):
511 """ 512 Plug file descriptor of socket to Idlequeue 513 514 Plugged socket will be watched for "send possible" or/and "recv possible" 515 events. pollin() callback is invoked on "recv possible", pollout() on 516 "send_possible". 517 518 Plugged socket will always be watched for "error" event - in that case, 519 pollend() is called. 520 """ 521 log.info('Plugging fd %d, W:%s, R:%s' % (self.fd, writable, readable)) 522 self.idlequeue.plug_idle(self, writable, readable)
523
524 - def _do_send(self):
525 """ 526 Called when send() to connected socket is possible. First message from 527 sendqueue will be sent 528 """ 529 if not self.sendbuff: 530 if not self.sendqueue: 531 log.warn('calling send on empty buffer and queue') 532 self._plug_idle(writable=False, readable=True) 533 return None 534 self.sendbuff = self.sendqueue.pop(0) 535 try: 536 send_count = self._send(self.sendbuff) 537 if send_count: 538 sent_data = self.sendbuff[:send_count] 539 self.sendbuff = self.sendbuff[send_count:] 540 self._plug_idle( 541 writable=((self.sendqueue!=[]) or (self.sendbuff!='')), 542 readable=True) 543 self.raise_event(DATA_SENT, sent_data) 544 545 except Exception: 546 log.error('_do_send:', exc_info=True) 547 traceback.print_exc() 548 self.disconnect()
549
550 - def _do_receive(self):
551 """ 552 Reads all pending incoming data. Will call owner's disconnected() method 553 if appropriate 554 """ 555 received = None 556 errnum = 0 557 errstr = 'No Error Set' 558 559 try: 560 # get as many bites, as possible, but not more than RECV_BUFSIZE 561 received = self._recv(RECV_BUFSIZE) 562 except socket.error, (errnum, errstr): 563 log.info("_do_receive: got %s:" % received, exc_info=True) 564 except tls_nb.SSLWrapper.Error, e: 565 log.info("_do_receive, caught SSL error, got %s:" % received, 566 exc_info=True) 567 errnum, errstr = e.errno, e.strerror 568 569 if received == '': 570 errstr = 'zero bytes on recv' 571 572 if (self.ssl_lib is None and received == '') or \ 573 (self.ssl_lib == tls_nb.PYSTDLIB and errnum == 8 ) or \ 574 (self.ssl_lib == tls_nb.PYOPENSSL and errnum == -1 ): 575 # 8 in stdlib: errstr == EOF occured in violation of protocol 576 # -1 in pyopenssl: errstr == Unexpected EOF 577 log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr)) 578 self.on_remote_disconnect() 579 return 580 581 if errnum: 582 log.info("Connection to %s:%s lost: %s %s" % (self.server, self.port, 583 errnum, errstr), exc_info=True) 584 self.disconnect() 585 return 586 587 # this branch is for case of non-fatal SSL errors - None is returned from 588 # recv() but no errnum is set 589 if received is None: 590 return 591 592 # we have received some bytes, stop the timeout! 593 self.remove_timeout() 594 self.renew_send_timeout() 595 self.renew_send_timeout2() 596 # pass received data to owner 597 if self.on_receive: 598 self.raise_event(DATA_RECEIVED, received) 599 self._on_receive(received) 600 else: 601 # This should never happen, so we need the debug. 602 # (If there is no handler on receive specified, data is passed to 603 # Dispatcher.ProcessNonBlocking) 604 log.error('SOCKET %s Unhandled data received: %s' % (id(self), 605 received)) 606 self.disconnect()
607
608 - def _on_receive(self, data):
609 """ 610 Preceeds on_receive callback. It peels off and checks HTTP headers in 611 HTTP classes, in here it just calls the callback 612 """ 613 self.on_receive(data)
614 615
616 -class NonBlockingHTTP(NonBlockingTCP):
617 """ 618 Socket wrapper that creates HTTP message out of sent data and peels-off HTTP 619 headers from incoming messages 620 """ 621
622 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls, 623 certs, on_http_request_possible, on_persistent_fallback, http_dict, 624 proxy_dict=None):
625 """ 626 :param on_http_request_possible: method to call when HTTP request to 627 socket owned by transport is possible. 628 :param on_persistent_fallback: callback called when server ends TCP 629 connection. It doesn't have to be fatal for HTTP session. 630 :param http_dict: dictionary with data for HTTP request and headers 631 """ 632 NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue, 633 estabilish_tls, certs, proxy_dict) 634 635 self.http_protocol, self.http_host, self.http_port, self.http_path = \ 636 urisplit(http_dict['http_uri']) 637 self.http_protocol = self.http_protocol or 'http' 638 self.http_path = self.http_path or '/' 639 self.http_version = http_dict['http_version'] 640 self.http_persistent = http_dict['http_persistent'] 641 self.add_proxy_headers = http_dict['add_proxy_headers'] 642 643 if 'proxy_user' in http_dict and 'proxy_pass' in http_dict: 644 self.proxy_user, self.proxy_pass = http_dict['proxy_user'], http_dict[ 645 'proxy_pass'] 646 else: 647 self.proxy_user, self.proxy_pass = None, None 648 649 # buffer for partial responses 650 self.recvbuff = '' 651 self.expected_length = 0 652 self.pending_requests = 0 653 self.on_http_request_possible = on_http_request_possible 654 self.last_recv_time = 0 655 self.close_current_connection = False 656 self.on_remote_disconnect = lambda: on_persistent_fallback(self)
657
658 - def http_send(self, raw_data, now=False):
659 self.send(self.build_http_message(raw_data), now)
660
661 - def _on_receive(self, data):
662 """ 663 Preceeds passing received data to owner class. Gets rid of HTTP headers 664 and checks them. 665 """ 666 if self.get_state() == PROXY_CONNECTING: 667 NonBlockingTCP._on_receive(self, data) 668 return 669 670 # append currently received data to HTTP msg in buffer 671 self.recvbuff = '%s%s' % (self.recvbuff or '', data) 672 statusline, headers, httpbody, buffer_rest = self.parse_http_message( 673 self.recvbuff) 674 675 if not (statusline and headers and httpbody): 676 log.debug('Received incomplete HTTP response') 677 return 678 679 if statusline[1] != '200': 680 log.error('HTTP Error: %s %s' % (statusline[1], statusline[2])) 681 self.disconnect() 682 return 683 self.expected_length = int(headers['Content-Length']) 684 if 'Connection' in headers and headers['Connection'].strip()=='close': 685 self.close_current_connection = True 686 687 if self.expected_length > len(httpbody): 688 # If we haven't received the whole HTTP mess yet, let's end the thread. 689 # It will be finnished from one of following recvs on plugged socket. 690 log.info('not enough bytes in HTTP response - %d expected, got %d' % 691 (self.expected_length, len(httpbody))) 692 else: 693 # First part of buffer has been extraced and is going to be handled, 694 # remove it from buffer 695 self.recvbuff = buffer_rest 696 697 # everything was received 698 self.expected_length = 0 699 700 if not self.http_persistent or self.close_current_connection: 701 # not-persistent connections disconnect after response 702 self.disconnect(do_callback=False) 703 self.close_current_connection = False 704 self.last_recv_time = time.time() 705 self.on_receive(data=httpbody, socket=self) 706 self.on_http_request_possible()
707
708 - def build_http_message(self, httpbody, method='POST'):
709 """ 710 Builds http message with given body. Values for headers and status line 711 fields are taken from class variables 712 """ 713 absolute_uri = '%s://%s:%s%s' % (self.http_protocol, self.http_host, 714 self.http_port, self.http_path) 715 headers = ['%s %s %s' % (method, absolute_uri, self.http_version), 716 'Host: %s:%s' % (self.http_host, self.http_port), 717 'User-Agent: Gajim', 718 'Content-Type: text/xml; charset=utf-8', 719 'Content-Length: %s' % len(str(httpbody))] 720 if self.add_proxy_headers: 721 headers.append('Proxy-Connection: keep-alive') 722 headers.append('Pragma: no-cache') 723 if self.proxy_user and self.proxy_pass: 724 credentials = '%s:%s' % (self.proxy_user, self.proxy_pass) 725 credentials = base64.encodestring(credentials).strip() 726 headers.append('Proxy-Authorization: Basic %s' % credentials) 727 else: 728 headers.append('Connection: Keep-Alive') 729 headers.append('\r\n') 730 headers = '\r\n'.join(headers) 731 return('%s%s' % (headers, httpbody))
732
733 - def parse_http_message(self, message):
734 """ 735 Split http message into a tuple: 736 - (statusline - list of e.g. ['HTTP/1.1', '200', 'OK'], 737 - headers - dictionary of headers e.g. {'Content-Length': '604', 738 'Content-Type': 'text/xml; charset=utf-8'}, 739 - httpbody - string with http body) 740 - http_rest - what is left in the message after a full HTTP header + body 741 """ 742 splitted = message.split('\r\n\r\n') 743 if len(splitted) < 2: 744 # no complete http message. Keep filling the buffer until we find one 745 buffer_rest = message 746 return ('', '', '', buffer_rest) 747 else: 748 (header, httpbody) = splitted[:2] 749 header = header.replace('\r', '') 750 header = header.lstrip('\n') 751 header = header.split('\n') 752 statusline = header[0].split(' ', 2) 753 header = header[1:] 754 headers = {} 755 for dummy in header: 756 row = dummy.split(' ', 1) 757 headers[row[0][:-1]] = row[1] 758 body_size = headers['Content-Length'] 759 rest_splitted = splitted[2:] 760 while (len(httpbody) < body_size) and rest_splitted: 761 # Complete httpbody until it has the announced size 762 httpbody = '\n\n'.join([httpbody, rest_splitted.pop(0)]) 763 buffer_rest = "\n\n".join(rest_splitted) 764 return (statusline, headers, httpbody, buffer_rest)
765 766
767 -class NonBlockingHTTPBOSH(NonBlockingHTTP):
768 """ 769 Class for BOSH HTTP connections. Slightly redefines HTTP transport by 770 calling bosh bodytag generating callback before putting data on wire 771 """ 772
773 - def set_stanza_build_cb(self, build_cb):
774 self.build_cb = build_cb
775
776 - def _do_send(self):
777 if self.state == PROXY_CONNECTING: 778 NonBlockingTCP._do_send(self) 779 return 780 if not self.sendbuff: 781 stanza = self.build_cb(socket=self) 782 stanza = self.encode_stanza(stanza) 783 stanza = self.build_http_message(httpbody=stanza) 784 self.sendbuff = stanza 785 NonBlockingTCP._do_send(self)
786