1 module hunt.imf.core.task; 2 3 4 5 import hunt.imf.protocol.packet; 6 import hunt.imf.core.routing; 7 import hunt.imf.io.context; 8 9 import std.stdio; 10 import std.conv; 11 import std.stdint; 12 import std.container : DList; 13 14 import core.thread; 15 import core.sync.condition; 16 import core.sync.mutex; 17 18 import hunt.net; 19 20 class Task : Thread 21 { 22 this() 23 { 24 _flag = true; 25 _condition = new Condition(new Mutex()); 26 super(&run); 27 } 28 29 void push( Packet packet) 30 { 31 synchronized(this){ 32 _queue.insertBack(packet); 33 } 34 _condition.notify(); 35 } 36 37 void stop() 38 { 39 _flag = false; 40 _condition.notify(); 41 } 42 43 private: 44 Packet pop() 45 { 46 synchronized(this){ 47 if(_queue.empty()) 48 return null; 49 auto packet = _queue.front(); 50 _queue.removeFront(); 51 return packet; 52 } 53 } 54 55 void execute(Packet packet) 56 { 57 auto context = cast(Context)packet.getAttachment(); 58 auto data = Router.findRouter(context.ns , packet.message_id); 59 if( data is null) 60 { 61 writeln("can't found router " ~ to!string(packet.message_id)); 62 return ; 63 } 64 65 auto obj = Object.factory(data.className); 66 if( obj is null) 67 { 68 writeln("can't create " , data.className); 69 return; 70 } 71 72 VoidProcessDele dele; 73 dele.ptr = cast(void*)obj; 74 dele.funcptr = data.func; 75 76 setContext(context); 77 dele(packet.message_data); 78 } 79 80 void run() 81 { 82 while(_flag) 83 { 84 _condition.mutex().lock(); 85 _condition.wait(); 86 _condition.mutex().unlock(); 87 Packet packet = null; 88 while((packet = pop()) !is null) 89 { 90 execute(packet); 91 } 92 } 93 } 94 95 bool _flag; 96 Condition _condition; 97 DList!Packet _queue; 98 }