1 module hunt.imf.clients.GatewayWebSocketClient;
2 
3 import hunt.http.client.ClientHttpHandler;
4 import hunt.http.client.HttpClient;
5 import hunt.http.client.HttpClientConnection;
6 import hunt.http.client.HttpClientRequest;
7 import hunt.http.HttpOptions;
8 import hunt.http.HttpConnection;
9 import hunt.http.codec.http.stream.HttpOutputStream;
10 import hunt.http.codec.websocket.frame;
11 import hunt.http.codec.websocket.model.IncomingFrames;
12 import hunt.http.codec.websocket.stream.WebSocketConnection;
13 import hunt.http.codec.websocket.stream.WebSocketPolicy;
14 import hunt.concurrency.Promise;
15 import hunt.concurrency.Future;
16 import hunt.concurrency.FuturePromise;
17 import hunt.concurrency.CompletableFuture;
18 import hunt.http.client.HttpClientOptions;
19 import hunt.imf.clients.GatewayClient;
20 import hunt.imf.protocol.Protocol;
21 import google.protobuf;
22 import std.array;
23 import hunt.imf.ConnectBase;
24 import hunt.imf.protocol.websocket.WsConnection;
25 import hunt.net;
26 import hunt.logging;
27 import hunt.imf.MessageBuffer;
28 
29 class ClientHttpHandlerEx : AbstractClientHttpHandler {
30     import hunt.http.codec.http.model;
31 
32     override public bool messageComplete(HttpRequest request,
33     HttpResponse response, HttpOutputStream output, HttpConnection connection) {
34         tracef("upgrade websocket success: " ~ response.toString());
35         return true;
36     }
37 }
38 
39 class IncomingFramesEx : IncomingFrames
40 {
41     private
42     {
43         ConnectBase _conn;
44     }
45 
46     void setWsConnection(ConnectBase connection)
47     {
48         _conn = connection;
49     }
50 
51     override public void incomingError(Exception t) {
52     }
53     override public void incomingFrame(Frame frame) {
54         FrameType type = frame.getType();
55         switch (type) {
56             case FrameType.TEXT:
57             {
58                 break ;
59             }
60             case FrameType.BINARY:
61             {
62                 BinaryFrame binFrame = cast(BinaryFrame) frame;
63                 ConnectBase.dispatchMessage( _conn,MessageBuffer.decode( cast(ubyte[])binFrame.getPayload().getRemaining()));
64                 break ;
65             }
66             default:
67             break ;
68         }
69     }
70 }
71 
72 class GatewayWebSocketClient : GatewayClient
73 {
74     private
75     {
76         HttpClientConnection _connection;
77         Protocol _protocol;
78         HttpClientRequest _request;
79         FuturePromise!WebSocketConnection _promise;
80         IncomingFramesEx _incomingFramesEx;
81         ClientHttpHandlerEx _handlerEx;
82         ConnectBase _conn = null;
83     }
84 
85     this(Protocol protocol)
86     {
87         _request = new HttpClientRequest("GET", "/index");
88         _promise = new FuturePromise!WebSocketConnection();
89         _incomingFramesEx = new IncomingFramesEx();
90         _handlerEx = new ClientHttpHandlerEx();
91         _protocol = protocol;
92     }
93 
94 
95     void connect()
96     {
97         HttpClient client = new HttpClient(new HttpClientOptions());
98         Future!(HttpClientConnection) conn = client.connect(_protocol.getHost(), _protocol.getPort());
99         _connection = conn.get();
100         _connection.upgradeWebSocket(_request, WebSocketPolicy.newClientPolicy(),
101         _promise, _handlerEx, _incomingFramesEx);
102          WebSocketConnection connection = _promise.get();
103         _conn = new WsConnection(connection);
104         _incomingFramesEx.setWsConnection(_conn);
105     }
106 
107     void sendMsg(T)(int tid,T t)
108     {
109         if (_conn !is null)
110         {
111             MessageBuffer ask = new MessageBuffer(tid,t.toProtobuf.array);
112             _conn.sendMsg(ask);
113         }
114     }
115 
116     void onConnection (ConnectBase connection)
117     {
118 
119     }
120 
121     void onClosed (ConnectBase connection)
122     {
123 
124     }
125 
126 }
127