module hunt.imf.clients.GatewayWebSocketClient;

import hunt.http.client.ClientHttpHandler;
import hunt.http.client.HttpClient;
import hunt.http.client.HttpClientConnection;
import hunt.http.client.HttpClientRequest;
import hunt.http.HttpOptions;
import hunt.http.HttpConnection;
import hunt.http.codec.http.stream.HttpOutputStream;
import hunt.http.codec.websocket.frame;
import hunt.http.codec.websocket.model.IncomingFrames;
import hunt.http.codec.websocket.stream.WebSocketConnection;
import hunt.http.codec.websocket.stream.WebSocketPolicy;
import hunt.concurrency.Promise;
import hunt.concurrency.Future;
import hunt.concurrency.FuturePromise;
import hunt.concurrency.CompletableFuture;
import hunt.http.client.HttpClientOptions;
import hunt.imf.clients.GatewayClient;
import hunt.imf.protocol.Protocol;
import google.protobuf;
import std.array;
import hunt.imf.ConnectBase;
import hunt.imf.protocol.websocket.WsConnection;
import hunt.net;
import hunt.logging;
import hunt.imf.MessageBuffer;

/**
 * HTTP handler passed to the WebSocket upgrade request.
 *
 * Its only job is to trace the HTTP response once the message is complete.
 */
class ClientHttpHandlerEx : AbstractClientHttpHandler {
    import hunt.http.codec.http.model;

    /**
     * Logs the response and reports the message as handled.
     *
     * Returns: always `true`.
     */
    override public bool messageComplete(HttpRequest request,
            HttpResponse response, HttpOutputStream output, HttpConnection connection) {
        // The response text is passed as an argument, never as the format
        // string itself: a '%' inside it would otherwise be parsed as a
        // format specifier.
        tracef("upgrade websocket success: %s", response.toString());
        return true;
    }
}

/**
 * Receives incoming WebSocket frames and forwards binary payloads to
 * `ConnectBase.dispatchMessage` as decoded `MessageBuffer`s.
 */
class IncomingFramesEx : IncomingFrames
{
    private
    {
        ConnectBase _conn;
    }

    /**
     * Sets the connection handed to `ConnectBase.dispatchMessage` for every
     * binary frame received afterwards.
     */
    void setWsConnection(ConnectBase connection)
    {
        _conn = connection;
    }

    /**
     * Reports an error raised on the incoming side.
     *
     * The error is logged rather than silently discarded; it is not rethrown.
     */
    override public void incomingError(Exception t) {
        warningf("websocket incoming error: %s", t is null ? "unknown" : t.msg);
    }

    /**
     * Handles one incoming frame.
     *
     * Binary frames are decoded with `MessageBuffer.decode` and dispatched;
     * text frames and every other frame type are ignored.
     */
    override public void incomingFrame(Frame frame) {
        FrameType type = frame.getType();
        switch (type) {
            case FrameType.TEXT:
            {
                // Text frames are ignored.
                break;
            }
            case FrameType.BINARY:
            {
                BinaryFrame binFrame = cast(BinaryFrame) frame;
                if (binFrame is null)
                {
                    // A failed class cast yields null; dereferencing it would crash.
                    warningf("dropping BINARY frame that is not a BinaryFrame");
                    break;
                }
                ubyte[] payload = cast(ubyte[]) binFrame.getPayload().getRemaining();
                ConnectBase.dispatchMessage(_conn, MessageBuffer.decode(payload));
                break;
            }
            default:
                break;
        }
    }
}

/**
 * Gateway client that talks to the host/port described by a `Protocol`
 * over a WebSocket connection.
 */
class GatewayWebSocketClient : GatewayClient
{
    private
    {
        HttpClientConnection _connection;
        Protocol _protocol;
        HttpClientRequest _request;
        FuturePromise!WebSocketConnection _promise;
        IncomingFramesEx _incomingFramesEx;
        ClientHttpHandlerEx _handlerEx;
        ConnectBase _conn = null;
    }

    /**
     * Prepares the upgrade request (`GET /index`), the promise and the
     * handlers. No network activity happens until `connect()` is called.
     *
     * Params:
     *   protocol = supplies the host and port used by `connect()`.
     */
    this(Protocol protocol)
    {
        _request = new HttpClientRequest("GET", "/index");
        _promise = new FuturePromise!WebSocketConnection();
        _incomingFramesEx = new IncomingFramesEx();
        _handlerEx = new ClientHttpHandlerEx();
        _protocol = protocol;
    }

    /**
     * Connects to the configured host/port, upgrades the connection to
     * WebSocket, and wires the resulting connection into the frame handler.
     *
     * Both `get()` calls wait for their result, so this method returns only
     * after the upgrade promise has been resolved.
     */
    void connect()
    {
        // NOTE(review): the HttpClient is local and never stored or closed
        // here - confirm whether it needs an explicit shutdown.
        HttpClient client = new HttpClient(new HttpClientOptions());
        Future!(HttpClientConnection) pending =
            client.connect(_protocol.getHost(), _protocol.getPort());
        _connection = pending.get();

        _connection.upgradeWebSocket(_request, WebSocketPolicy.newClientPolicy(),
                _promise, _handlerEx, _incomingFramesEx);

        WebSocketConnection wsConnection = _promise.get();
        _conn = new WsConnection(wsConnection);
        // NOTE(review): until this call the frame handler's connection is
        // null - confirm no binary frame can be delivered before this point.
        _incomingFramesEx.setWsConnection(_conn);
    }

    /**
     * Serializes `t` via `toProtobuf` and sends it wrapped in a
     * `MessageBuffer` tagged with `tid`.
     *
     * Does nothing when `connect()` has not yet established a connection.
     *
     * Params:
     *   tid = message id stored in the `MessageBuffer`.
     *   t   = message object providing `toProtobuf`.
     */
    void sendMsg(T)(int tid, T t)
    {
        if (_conn is null)
        {
            return;
        }

        MessageBuffer ask = new MessageBuffer(tid, t.toProtobuf.array);
        _conn.sendMsg(ask);
    }

    /// Connection hook; intentionally a no-op in this client.
    void onConnection(ConnectBase connection)
    {
    }

    /// Close hook; intentionally a no-op in this client.
    void onClosed(ConnectBase connection)
    {
    }
}