1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
| class TTornadoStreamTransport(TTransport.TTransportBase): """a framed, buffered transport over a Tornado stream""" def __init__(self, host, port, stream=None): self.host = host self.port = port self.is_queuing_reads = False self.read_queue = [] self.__wbuf = StringIO()
self.stream = stream if self.stream is not None: self._set_close_callback()
def open(self, callback): logging.debug('socket connecting') sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) self.stream = iostream.IOStream(sock)
def on_close_in_connect(*_): message = 'could not connect to %s:%s' % (self.host, self.port) raise TTransportException( type=TTransportException.NOT_OPEN, message=message) self.stream.set_close_callback(on_close_in_connect)
def finish(*_): self._set_close_callback() callback()
self.stream.connect((self.host, self.port), callback=finish)
def _set_close_callback(self): def on_close(): raise TTransportException( type=TTransportException.END_OF_FILE, message='socket closed') self.stream.set_close_callback(self.close)
def close(self): self.stream.set_close_callback(None) self.stream.close()
def read(self, _): assert "you're doing it wrong" is True
@gen.engine def readFrame(self, callback): self.read_queue.append(callback) logging.debug('read queue: %s', self.read_queue)
if self.is_queuing_reads: return
self.is_queuing_reads = True while self.read_queue: next_callback = self.read_queue.pop() result = yield gen.Task(self._readFrameFromStream) next_callback(result) self.is_queuing_reads = False
@gen.engine def _readFrameFromStream(self, callback): logging.debug('_readFrameFromStream') frame_header = yield gen.Task(self.stream.read_bytes, 4) frame_length, = struct.unpack('!i', frame_header) logging.debug('received frame header, frame length = %i', frame_length) frame = yield gen.Task(self.stream.read_bytes, frame_length) logging.debug('received frame payload') callback(frame)
def write(self, buf): self.__wbuf.write(buf)
def flush(self, callback=None): wout = self.__wbuf.getvalue() wsz = len(wout)
self.__wbuf = StringIO() buf = struct.pack("!i", wsz) + wout
logging.debug('writing frame length = %i', wsz) self.stream.write(buf, callback)
|