1 module hunt.imf.core.task;
2 
3 
4 
5 import hunt.imf.protocol.packet;
6 import hunt.imf.core.routing;
7 import hunt.imf.io.context;
8 
9 import std.stdio;
10 import std.conv;
11 import std.stdint;
12 import std.container : DList;
13 
14 import core.thread;
15 import core.sync.condition;
16 import core.sync.mutex;
17 
18 import hunt.net;
19 
20 class Task : Thread
21 {
22     this()
23     {
24         _flag = true;
25         _condition = new Condition(new Mutex());
26         super(&run);
27     }
28 
29     void push( Packet packet)
30     { 
31         synchronized(this){
32             _queue.insertBack(packet);
33         }
34         _condition.notify();
35     }
36 
37     void stop()
38     {
39         _flag = false;
40         _condition.notify();
41     }
42 
43 private:
44     Packet pop()
45     {
46         synchronized(this){
47             if(_queue.empty())
48                 return null;
49             auto packet = _queue.front();
50             _queue.removeFront();
51             return packet;
52         }
53     }
54 
55     void execute(Packet packet)
56     {   
57         auto context = cast(Context)packet.getAttachment();   
58         auto data = Router.findRouter(context.ns , packet.message_id);
59         if( data is null)
60         {
61             writeln("can't found router " ~ to!string(packet.message_id));
62             return ;
63         }
64 
65         auto obj = Object.factory(data.className);
66         if( obj is null)
67         {
68             writeln("can't create " , data.className);
69             return;
70         }
71 
72         VoidProcessDele dele;
73         dele.ptr = cast(void*)obj;
74         dele.funcptr = data.func;
75         
76         setContext(context);
77         dele(packet.message_data);
78     }
79 
80     void run()
81     {
82         while(_flag)
83         {
84             _condition.mutex().lock();
85             _condition.wait();
86             _condition.mutex().unlock();
87             Packet packet = null;
88             while((packet = pop()) !is null)
89             {
90                 execute(packet);
91             }
92         }
93     }
94 
95     bool                  _flag;
96     Condition             _condition;
97     DList!Packet          _queue;
98 }