1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 """
19 Transports are objects responsible for connecting to XMPP server and putting
20 data to wrapped sockets in desired form (SSL, TLS, TCP, for HTTP proxy,
21 for SOCKS5 proxy...)
22
23 Transports are not aware of XMPP stanzas and only responsible for low-level
24 connection handling.
25 """
26
27 from simplexml import ustr
28 from plugin import PlugIn
29 from idlequeue import IdleObject
30 import proxy_connectors
31 import tls_nb
32
33 import socket
34 import errno
35 import time
36 import traceback
37 import base64
38 import urlparse
39
40 import logging
41 log = logging.getLogger('nbxmpp.transports_nb')
42
def urisplit(uri):
    """
    Break a URI string into the tuple (protocol, host, port, path)

    Example: urisplit('http://httpcm.jabber.org:123/webclient') gives
    ('http', 'httpcm.jabber.org', 123, '/webclient').

    When the URI carries no usable port, 443 is substituted for the 'https'
    protocol and 80 for anything else.

    :param uri: URI string to split
    :return: tuple (proto, host, port, path)
    """
    parts = urlparse.urlsplit(uri)
    try:
        port = parts.port
    except ValueError:
        # the port part of the URI is present but is not a number
        log.warn('port cannot be extracted from BOSH URL %s, using default port' \
            % uri)
        port = ''
    if not port:
        port = 443 if parts.scheme == 'https' else 80
    return parts.scheme, parts.hostname, port, parts.path
64
66 tcp_host, tcp_port, proxy_user, proxy_pass = None, None, None, None
67 proxy_type = proxy['type']
68 if proxy_type == 'bosh' and not proxy['bosh_useproxy']:
69
70 proto, tcp_host, tcp_port, path = urisplit(proxy['bosh_uri'])
71 else:
72
73
74 tcp_host, tcp_port = proxy['host'], proxy['port']
75 if proxy.get('useauth', False):
76 proxy_user, proxy_pass = proxy['user'], proxy['pass']
77 return tcp_host, tcp_port, proxy_user, proxy_pass
78
79
80 CONNECT_TIMEOUT_SECONDS = 30
81
82
83 DISCONNECT_TIMEOUT_SECONDS = 5
84
85
86
87 RECV_BUFSIZE = 32768
88
89
90
91
92 DATA_RECEIVED = 'DATA RECEIVED'
93 DATA_SENT = 'DATA SENT'
94 DATA_ERROR = 'DATA ERROR'
95
96 DISCONNECTED = 'DISCONNECTED'
97 DISCONNECTING = 'DISCONNECTING'
98 CONNECTING = 'CONNECTING'
99 PROXY_CONNECTING = 'PROXY_CONNECTING'
100 CONNECTED = 'CONNECTED'
101 STATES = (DISCONNECTED, CONNECTING, PROXY_CONNECTING, CONNECTED, DISCONNECTING)
102
104 """
105 Abstract class representing a transport
106
107 Subclasses CAN have different constructor signature but connect method SHOULD
108 be the same.
109 """
110
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
        certs):
    """
    Each transport class can have a different constructor but it has to
    accept at least all the arguments of the NonBlockingTransport constructor

    :param raise_event: callback for monitoring of sent and received data
    :param on_disconnect: callback called on disconnection during runtime
    :param idlequeue: processing idlequeue
    :param estabilish_tls: boolean whether to establish TLS connection after
        TCP connection is done
    :param certs: tuple of (cacerts, mycerts) see constructor of
        tls_nb.NonBlockingTLS for more details
    """
    PlugIn.__init__(self)
    self.raise_event = raise_event
    self.on_disconnect = on_disconnect
    # connection callbacks and target address are filled in by connect()
    self.on_connect = None
    self.on_connect_failure = None
    self.idlequeue = idlequeue
    # callback for received data, installed through onreceive()
    self.on_receive = None
    self.server = None
    self.port = None
    self.conn_5tuple = None
    self.set_state(DISCONNECTED)
    self.estabilish_tls = estabilish_tls
    self.certs = certs

    # None means plain TCP; the receive path compares this against
    # tls_nb.PYSTDLIB / tls_nb.PYOPENSSL to interpret error numbers
    self.ssl_lib = None
    # NOTE(review): presumably the methods PlugIn exposes on the owner -
    # confirm against plugin.PlugIn
    self._exported_methods=[self.onreceive, self.set_send_timeout,
        self.set_send_timeout2, self.set_timeout, self.remove_timeout,
        self.start_disconnect]

    # send timeouts in seconds; a value that is not > 0 means "not armed".
    # sendtimeout2 is initialised here as well so that both timeouts can be
    # read before set_send_timeout2() has ever been called.
    self.sendtimeout = 0
    self.sendtimeout2 = 0

    # callbacks fired when the respective timeout elapses
    self.on_timeout = None
    self.on_timeout2 = None
150
152 owner.Connection = self
153
155 self._owner.Connection = None
156 self._owner = None
157 self.disconnect(do_callback=False)
158
def connect(self, conn_5tuple, on_connect, on_connect_failure):
    """
    Remember the connection target and the callbacks for this attempt

    The server and port are taken from the socket address stored in
    conn_5tuple, which should be one item of the list returned by
    getaddrinfo.

    :param conn_5tuple: 5-tuple returned from getaddrinfo
    :param on_connect: callback called on successful connect to the server
    :param on_connect_failure: callback called on failure when connecting
    """
    self.on_connect, self.on_connect_failure = on_connect, on_connect_failure
    # element 4 is the sockaddr; only its first two fields (host, port) are
    # kept, whatever else the address family appends
    sockaddr = conn_5tuple[4]
    self.server, self.port = sockaddr[:2]
    self.conn_5tuple = conn_5tuple
172
174 assert(newstate in STATES)
175 self.state = newstate
176
179
181 """
182 Preceeds call of on_connect callback
183 """
184
185
186 self.set_state(CONNECTED)
187 self.on_connect()
188
190 """
191 Preceeds call of on_connect_failure callback
192 """
193
194
195
196 self.disconnect(do_callback=False)
197 self.on_connect_failure(err_message=err_message)
198
199 - def send(self, raw_data, now=False):
203
209
211 """
212 Set the on_receive callback.
213
214 onreceive(None) sets callback to Dispatcher.ProcessNonBlocking which is
215 the default one that will decide what to do with received stanza based on
216 its tag name and namespace.
217
218 Do not confuse it with on_receive() method, which is the callback
219 itself.
220 """
221 if not recv_handler:
222 if hasattr(self, '_owner') and hasattr(self._owner, 'Dispatcher'):
223 self.on_receive = self._owner.Dispatcher.ProcessNonBlocking
224 else:
225 log.warn('No Dispatcher plugged. Received data will not be processed')
226 self.on_receive = None
227 return
228 self.on_receive = recv_handler
229
232
234 """
235 Called when there's no response from server in defined timeout
236 """
237 if self.on_timeout:
238 self.on_timeout()
239 self.renew_send_timeout()
240
242 """
243 called when there's no response from server in defined timeout
244 """
245 if self.on_timeout2:
246 self.on_timeout2()
247 self.renew_send_timeout2()
248
250 if self.on_timeout and self.sendtimeout > 0:
251 self.set_timeout(self.sendtimeout)
252
254 if self.on_timeout2 and self.sendtimeout2 > 0:
255 self.set_timeout2(self.sendtimeout2)
256
259
262
265
268
270 self.sendtimeout = timeout
271 if self.sendtimeout > 0:
272 self.on_timeout = on_timeout
273 else:
274 self.on_timeout = None
275
277 self.sendtimeout2 = timeout2
278 if self.sendtimeout2 > 0:
279 self.on_timeout2 = on_timeout2
280 else:
281 self.on_timeout2 = None
282
283
286
287
289 """
290 Non-blocking TCP socket wrapper
291
292 It is used for simple XMPP connection. Can be connected via proxy and can
293 estabilish TLS connection.
294 """
295 - def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
296 certs, proxy_dict=None):
312
313
318
319 - def connect(self, conn_5tuple, on_connect, on_connect_failure):
320 NonBlockingTransport.connect(self, conn_5tuple, on_connect,
321 on_connect_failure)
322 log.info('NonBlockingTCP Connect :: About to connect to %s:%s' %
323 (self.server, self.port))
324
325 try:
326 self._sock = socket.socket(*conn_5tuple[:3])
327 except socket.error, (errnum, errstr):
328 self._on_connect_failure('NonBlockingTCP Connect: Error while creating\
329 socket: %s %s' % (errnum, errstr))
330 return
331
332 self._send = self._sock.send
333 self._recv = self._sock.recv
334 self.fd = self._sock.fileno()
335
336
337
338 self._plug_idle(writable=True, readable=False)
339 self.peerhost = None
340
341
342
343 errnum = 0
344 errstr = str()
345
346
347
348 self.idlequeue.set_read_timeout(self.fd, CONNECT_TIMEOUT_SECONDS)
349
350 try:
351 self._sock.setblocking(False)
352 self._sock.connect((self.server, self.port))
353 except Exception, exc:
354 errnum, errstr = exc.args
355
356 if errnum in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK):
357
358 log.info('After NB connect() of %s. "%s" raised => CONNECTING' %
359 (id(self), errstr))
360 self._tcp_connecting_started()
361 return
362
363
364
365 self._on_connect_failure('Exception while connecting to %s:%s - %s %s' %
366 (self.server, self.port, errnum, errstr))
367
382
395
397 """
398 Estabilishes TLS/SSL using this TCP connection by plugging a
399 NonBlockingTLS module
400 """
401 cacerts, mycerts = self.certs
402 result = tls_nb.NonBlockingTLS.get_instance(cacerts, mycerts).PlugIn(self)
403 if result:
404 on_succ()
405 else:
406 on_fail()
407
409 """
410 Called by idlequeu when receive on plugged socket is possible
411 """
412 log.info('pollin called, state == %s' % self.get_state())
413 self._do_receive()
414
432
444
459
469
476
482
def send(self, raw_data, now=False):
    """
    Queue raw_data for sending

    The data is passed through encode_stanza() first. With now set, it is
    put at the head of the queue and a send is attempted right away;
    otherwise it is added at the tail. In both cases the socket is then
    plugged for writing and reading.

    :param raw_data: data to put on the wire
    :param now: if true, jump the queue and try to send immediately
    """
    NonBlockingTransport.send(self, raw_data, now)

    encoded = self.encode_stanza(raw_data)

    if not now:
        self.sendqueue.append(encoded)
    else:
        self.sendqueue.insert(0, encoded)
        self._do_send()

    self._plug_idle(writable=True, readable=True)
499
501 """
502 Encode str or unicode to utf-8
503 """
504 if isinstance(stanza, unicode):
505 stanza = stanza.encode('utf-8')
506 elif not isinstance(stanza, str):
507 stanza = ustr(stanza).encode('utf-8')
508 return stanza
509
511 """
512 Plug file descriptor of socket to Idlequeue
513
514 Plugged socket will be watched for "send possible" or/and "recv possible"
515 events. pollin() callback is invoked on "recv possible", pollout() on
516 "send_possible".
517
518 Plugged socket will always be watched for "error" event - in that case,
519 pollend() is called.
520 """
521 log.info('Plugging fd %d, W:%s, R:%s' % (self.fd, writable, readable))
522 self.idlequeue.plug_idle(self, writable, readable)
523
525 """
526 Called when send() to connected socket is possible. First message from
527 sendqueue will be sent
528 """
529 if not self.sendbuff:
530 if not self.sendqueue:
531 log.warn('calling send on empty buffer and queue')
532 self._plug_idle(writable=False, readable=True)
533 return None
534 self.sendbuff = self.sendqueue.pop(0)
535 try:
536 send_count = self._send(self.sendbuff)
537 if send_count:
538 sent_data = self.sendbuff[:send_count]
539 self.sendbuff = self.sendbuff[send_count:]
540 self._plug_idle(
541 writable=((self.sendqueue!=[]) or (self.sendbuff!='')),
542 readable=True)
543 self.raise_event(DATA_SENT, sent_data)
544
545 except Exception:
546 log.error('_do_send:', exc_info=True)
547 traceback.print_exc()
548 self.disconnect()
549
551 """
552 Reads all pending incoming data. Will call owner's disconnected() method
553 if appropriate
554 """
555 received = None
556 errnum = 0
557 errstr = 'No Error Set'
558
559 try:
560
561 received = self._recv(RECV_BUFSIZE)
562 except socket.error, (errnum, errstr):
563 log.info("_do_receive: got %s:" % received, exc_info=True)
564 except tls_nb.SSLWrapper.Error, e:
565 log.info("_do_receive, caught SSL error, got %s:" % received,
566 exc_info=True)
567 errnum, errstr = e.errno, e.strerror
568
569 if received == '':
570 errstr = 'zero bytes on recv'
571
572 if (self.ssl_lib is None and received == '') or \
573 (self.ssl_lib == tls_nb.PYSTDLIB and errnum == 8 ) or \
574 (self.ssl_lib == tls_nb.PYOPENSSL and errnum == -1 ):
575
576
577 log.info("Disconnected by remote server: #%s, %s" % (errnum, errstr))
578 self.on_remote_disconnect()
579 return
580
581 if errnum:
582 log.info("Connection to %s:%s lost: %s %s" % (self.server, self.port,
583 errnum, errstr), exc_info=True)
584 self.disconnect()
585 return
586
587
588
589 if received is None:
590 return
591
592
593 self.remove_timeout()
594 self.renew_send_timeout()
595 self.renew_send_timeout2()
596
597 if self.on_receive:
598 self.raise_event(DATA_RECEIVED, received)
599 self._on_receive(received)
600 else:
601
602
603
604 log.error('SOCKET %s Unhandled data received: %s' % (id(self),
605 received))
606 self.disconnect()
607
609 """
610 Preceeds on_receive callback. It peels off and checks HTTP headers in
611 HTTP classes, in here it just calls the callback
612 """
613 self.on_receive(data)
614
615
617 """
618 Socket wrapper that creates HTTP message out of sent data and peels-off HTTP
619 headers from incoming messages
620 """
621
def __init__(self, raise_event, on_disconnect, idlequeue, estabilish_tls,
        certs, on_http_request_possible, on_persistent_fallback, http_dict,
        proxy_dict=None):
    """
    The first five arguments and proxy_dict are handed to the
    NonBlockingTCP constructor unchanged.

    :param on_http_request_possible: method to call when HTTP request to
        socket owned by transport is possible.
    :param on_persistent_fallback: callback called when server ends TCP
        connection. It doesn't have to be fatal for HTTP session.
    :param http_dict: dictionary with data for HTTP request and headers.
        The keys 'http_uri', 'http_version', 'http_persistent' and
        'add_proxy_headers' are required; 'proxy_user' and 'proxy_pass'
        are used only when both are present.
    """
    NonBlockingTCP.__init__(self, raise_event, on_disconnect, idlequeue,
        estabilish_tls, certs, proxy_dict)

    proto, host, port, path = urisplit(http_dict['http_uri'])
    # fall back to plain http and the root path when the URI omits them
    self.http_protocol = proto or 'http'
    self.http_host = host
    self.http_port = port
    self.http_path = path or '/'

    self.http_version = http_dict['http_version']
    self.http_persistent = http_dict['http_persistent']
    self.add_proxy_headers = http_dict['add_proxy_headers']

    # proxy credentials count only as a complete pair
    self.proxy_user, self.proxy_pass = None, None
    if 'proxy_user' in http_dict and 'proxy_pass' in http_dict:
        self.proxy_user = http_dict['proxy_user']
        self.proxy_pass = http_dict['proxy_pass']

    # state of the HTTP response that is currently being received
    self.recvbuff = ''
    self.expected_length = 0
    self.pending_requests = 0
    self.on_http_request_possible = on_http_request_possible
    self.last_recv_time = 0
    self.close_current_connection = False
    # a remote close is passed on to on_persistent_fallback together with
    # this transport
    self.on_remote_disconnect = lambda: on_persistent_fallback(self)
657
660
662 """
663 Preceeds passing received data to owner class. Gets rid of HTTP headers
664 and checks them.
665 """
666 if self.get_state() == PROXY_CONNECTING:
667 NonBlockingTCP._on_receive(self, data)
668 return
669
670
671 self.recvbuff = '%s%s' % (self.recvbuff or '', data)
672 statusline, headers, httpbody, buffer_rest = self.parse_http_message(
673 self.recvbuff)
674
675 if not (statusline and headers and httpbody):
676 log.debug('Received incomplete HTTP response')
677 return
678
679 if statusline[1] != '200':
680 log.error('HTTP Error: %s %s' % (statusline[1], statusline[2]))
681 self.disconnect()
682 return
683 self.expected_length = int(headers['Content-Length'])
684 if 'Connection' in headers and headers['Connection'].strip()=='close':
685 self.close_current_connection = True
686
687 if self.expected_length > len(httpbody):
688
689
690 log.info('not enough bytes in HTTP response - %d expected, got %d' %
691 (self.expected_length, len(httpbody)))
692 else:
693
694
695 self.recvbuff = buffer_rest
696
697
698 self.expected_length = 0
699
700 if not self.http_persistent or self.close_current_connection:
701
702 self.disconnect(do_callback=False)
703 self.close_current_connection = False
704 self.last_recv_time = time.time()
705 self.on_receive(data=httpbody, socket=self)
706 self.on_http_request_possible()
707
709 """
710 Builds http message with given body. Values for headers and status line
711 fields are taken from class variables
712 """
713 absolute_uri = '%s://%s:%s%s' % (self.http_protocol, self.http_host,
714 self.http_port, self.http_path)
715 headers = ['%s %s %s' % (method, absolute_uri, self.http_version),
716 'Host: %s:%s' % (self.http_host, self.http_port),
717 'User-Agent: Gajim',
718 'Content-Type: text/xml; charset=utf-8',
719 'Content-Length: %s' % len(str(httpbody))]
720 if self.add_proxy_headers:
721 headers.append('Proxy-Connection: keep-alive')
722 headers.append('Pragma: no-cache')
723 if self.proxy_user and self.proxy_pass:
724 credentials = '%s:%s' % (self.proxy_user, self.proxy_pass)
725 credentials = base64.encodestring(credentials).strip()
726 headers.append('Proxy-Authorization: Basic %s' % credentials)
727 else:
728 headers.append('Connection: Keep-Alive')
729 headers.append('\r\n')
730 headers = '\r\n'.join(headers)
731 return('%s%s' % (headers, httpbody))
732
def parse_http_message(self, message):
    """
    Split a buffered HTTP response into its parts

    :param message: string with everything buffered so far
    :return: tuple (statusline, headers, httpbody, buffer_rest):
        - statusline - list of e.g. ['HTTP/1.1', '200', 'OK']
        - headers - dictionary of headers e.g. {'Content-Length': '604',
          'Content-Type': 'text/xml; charset=utf-8'}
        - httpbody - string with http body, at most Content-Length long; it
          is shorter when the body has not arrived completely yet
        - buffer_rest - what is left in the message after a full HTTP
          header + body
        If the blank line that ends the header block is not in the buffer
        yet, ('', '', '', message) is returned.
    :raise KeyError: if the response has no Content-Length header
    :raise ValueError: if the Content-Length value is not an integer
    """
    head, separator, remainder = message.partition('\r\n\r\n')
    if not separator:
        # header block still incomplete, the caller keeps buffering
        return ('', '', '', message)

    header_lines = head.replace('\r', '').lstrip('\n').split('\n')
    statusline = header_lines[0].split(' ', 2)
    headers = {}
    for line in header_lines[1:]:
        name, colon, value = line.partition(':')
        if colon:
            # whitespace around the value is optional, so do not rely on a
            # single space following the colon
            headers[name.strip()] = value.strip()

    # The header value is a string and has to be converted before it can be
    # compared with a length.
    body_size = int(headers['Content-Length'])
    # Cut the body at the announced size, working on the untouched text
    # after the header block. A blank line inside the body is kept intact
    # and anything after the body stays parseable as the next response.
    httpbody = remainder[:body_size]
    buffer_rest = remainder[body_size:]
    return (statusline, headers, httpbody, buffer_rest)
765
766
768 """
769 Class for BOSH HTTP connections. Slightly redefines HTTP transport by
770 calling bosh bodytag generating callback before putting data on wire
771 """
772
774 self.build_cb = build_cb
775
786