diff options
Diffstat (limited to 'ext_lib/src/arsd')
-rw-r--r-- | ext_lib/src/arsd/cgi.d | 372 |
1 files changed, 321 insertions, 51 deletions
diff --git a/ext_lib/src/arsd/cgi.d b/ext_lib/src/arsd/cgi.d index 681fae4..caee996 100644 --- a/ext_lib/src/arsd/cgi.d +++ b/ext_lib/src/arsd/cgi.d @@ -4664,6 +4664,37 @@ version(cgi_with_websocket) { // returns true if data available, false if it timed out bool recvAvailable(Duration timeout = dur!"msecs"(0)) { + if(!waitForNextMessageWouldBlock()) + return true; + if(isDataPending(timeout)) + return true; // this is kinda a lie. + + return false; + } + + public bool lowLevelReceive() { + auto bfr = cgi.idlol; + top: + auto got = bfr.front; + if(got.length) { + if(receiveBuffer.length < receiveBufferUsedLength + got.length) + receiveBuffer.length += receiveBufferUsedLength + got.length; + + receiveBuffer[receiveBufferUsedLength .. receiveBufferUsedLength + got.length] = got[]; + receiveBufferUsedLength += got.length; + bfr.consume(got.length); + + return true; + } + + bfr.popFront(0); + if(bfr.sourceClosed) + return false; + goto top; + } + + + bool isDataPending(Duration timeout = 0.seconds) { Socket socket = cgi.idlol.source; auto check = new SocketSet(); @@ -4676,47 +4707,297 @@ version(cgi_with_websocket) { } // note: this blocks - WebSocketMessage recv() { - // FIXME: should we automatically handle pings and pongs? - if(cgi.idlol.empty()) - throw new Exception("remote side disconnected"); - cgi.idlol.popFront(0); + WebSocketFrame recv() { + return waitForNextMessage(); + } - WebSocketMessage message; - message = WebSocketMessage.read(cgi.idlol); - return message; + + private void llclose() { + cgi.close(); } - void send(in char[] text) { - // I cast away const here because I know this msg is private and it doesn't write - // to that buffer unless masking is set... which it isn't, so we're ok. - auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.text, cast(void[]) text); - msg.send(cgi); + private void llsend(ubyte[] data) { + cgi.write(data); + cgi.flush(); } - void send(in ubyte[] binary) { - // I cast away const here because I know this msg is private and it doesn't write - // to that buffer unless masking is set... which it isn't, so we're ok. - auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.binary, cast(void[]) binary); - msg.send(cgi); + void unregisterActiveSocket(WebSocket) {} + + /* copy/paste section { */ + + private int readyState_; + private ubyte[] receiveBuffer; + private size_t receiveBufferUsedLength; + + private Config config; + + enum CONNECTING = 0; /// Socket has been created. The connection is not yet open. + enum OPEN = 1; /// The connection is open and ready to communicate. + enum CLOSING = 2; /// The connection is in the process of closing. + enum CLOSED = 3; /// The connection is closed or couldn't be opened. + + /++ + + +/ + /// Group: foundational + static struct Config { + /++ + These control the size of the receive buffer. + + It starts at the initial size, will temporarily + balloon up to the maximum size, and will reuse + a buffer up to the likely size. + + Anything larger than the maximum size will cause + the connection to be aborted and an exception thrown. + This is to protect you against a peer trying to + exhaust your memory, while keeping the user-level + processing simple. + +/ + size_t initialReceiveBufferSize = 4096; + size_t likelyReceiveBufferSize = 4096; /// ditto + size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto + + /++ + Maximum combined size of a message. + +/ + size_t maximumMessageSize = 10 * 1024 * 1024; + + string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value; + string origin; /// Origin URL to send with the handshake, if desired. + string protocol; /// the protocol header, if desired. + + int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping } - void close() { - auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.close, null); - msg.send(cgi); + /++ + Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED]. + +/ + int readyState() { + return readyState_; } + /++ + Closes the connection, sending a graceful teardown message to the other side. + +/ + /// Group: foundational + void close(int code = 0, string reason = null) + //in (reason.length < 123) + in { assert(reason.length < 123); } do + { + if(readyState_ != OPEN) + return; // it cool, we done + WebSocketFrame wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.close; + wss.data = cast(ubyte[]) reason; + wss.send(&llsend); + + readyState_ = CLOSING; + + llclose(); + } + + /++ + Sends a ping message to the server. This is done automatically by the library if you set a non-zero [Config.pingFrequency], but you can also send extra pings explicitly as well with this function. + +/ + /// Group: foundational void ping() { - auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.ping, null); - msg.send(cgi); + WebSocketFrame wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.ping; + wss.send(&llsend); } + // automatically handled.... void pong() { - auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.pong, null); - msg.send(cgi); + WebSocketFrame wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.pong; + wss.send(&llsend); } + + /++ + Sends a text message through the websocket. + +/ + /// Group: foundational + void send(in char[] textData) { + WebSocketFrame wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.text; + wss.data = cast(ubyte[]) textData; + wss.send(&llsend); + } + + /++ + Sends a binary message through the websocket. + +/ + /// Group: foundational + void send(in ubyte[] binaryData) { + WebSocketFrame wss; + wss.fin = true; + wss.opcode = WebSocketOpcode.binary; + wss.data = cast(ubyte[]) binaryData; + wss.send(&llsend); + } + + /++ + Waits for and returns the next complete message on the socket. + + Note that the onmessage function is still called, right before + this returns. + +/ + /// Group: blocking_api + public WebSocketFrame waitForNextMessage() { + do { + auto m = processOnce(); + if(m.populated) + return m; + } while(lowLevelReceive()); + + return WebSocketFrame.init; // FIXME? maybe. + } + + /++ + Tells if [waitForNextMessage] would block. + +/ + /// Group: blocking_api + public bool waitForNextMessageWouldBlock() { + checkAgain: + if(isMessageBuffered()) + return false; + if(!isDataPending()) + return true; + while(isDataPending()) + lowLevelReceive(); + goto checkAgain; + } + + /++ + Is there a message in the buffer already? + If `true`, [waitForNextMessage] is guaranteed to return immediately. + If `false`, check [isDataPending] as the next step. + +/ + /// Group: blocking_api + public bool isMessageBuffered() { + ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; + auto s = d; + if(d.length) { + auto orig = d; + auto m = WebSocketFrame.read(d); + // that's how it indicates that it needs more data + if(d !is orig) + return true; + } + + return false; + } + + private ubyte continuingType; + private ubyte[] continuingData; + //private size_t continuingDataLength; + + private WebSocketFrame processOnce() { + ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength]; + auto s = d; + // FIXME: handle continuation frames more efficiently. it should really just reuse the receive buffer. + WebSocketFrame m; + if(d.length) { + auto orig = d; + m = WebSocketFrame.read(d); + // that's how it indicates that it needs more data + if(d is orig) + return WebSocketFrame.init; + switch(m.opcode) { + case WebSocketOpcode.continuation: + if(continuingData.length + m.data.length > config.maximumMessageSize) + throw new Exception("message size exceeded"); + + continuingData ~= m.data; + if(m.fin) { + if(ontextmessage) + ontextmessage(cast(char[]) continuingData); + if(onbinarymessage) + onbinarymessage(continuingData); + + continuingData = null; + } + break; + case WebSocketOpcode.text: + if(m.fin) { + if(ontextmessage) + ontextmessage(m.textData); + } else { + continuingType = m.opcode; + //continuingDataLength = 0; + continuingData = null; + continuingData ~= m.data; + } + break; + case WebSocketOpcode.binary: + if(m.fin) { + if(onbinarymessage) + onbinarymessage(m.data); + } else { + continuingType = m.opcode; + //continuingDataLength = 0; + continuingData = null; + continuingData ~= m.data; + } + break; + case WebSocketOpcode.close: + readyState_ = CLOSED; + if(onclose) + onclose(); + + unregisterActiveSocket(this); + break; + case WebSocketOpcode.ping: + pong(); + break; + case WebSocketOpcode.pong: + // just really references it is still alive, nbd. + break; + default: // ignore though i could and perhaps should throw too + } + } + receiveBufferUsedLength -= s.length - d.length; + + return m; + } + + private void autoprocess() { + // FIXME + do { + processOnce(); + } while(lowLevelReceive()); + } + + + void delegate() onclose; /// + void delegate() onerror; /// + void delegate(in char[]) ontextmessage; /// + void delegate(in ubyte[]) onbinarymessage; /// + void delegate() onopen; /// + + /++ + + +/ + /// Group: browser_api + void onmessage(void delegate(in char[]) dg) { + ontextmessage = dg; + } + + /// ditto + void onmessage(void delegate(in ubyte[]) dg) { + onbinarymessage = dg; + } + + /* } end copy/paste */ + + } bool websocketRequested(Cgi cgi) { @@ -4755,10 +5036,11 @@ version(cgi_with_websocket) { return new WebSocket(cgi); } - // FIXME: implement websocket extension frames - // get websocket to work on other modes, not just embedded_httpd + // FIXME get websocket to work on other modes, not just embedded_httpd + /* copy/paste in http2.d { */ enum WebSocketOpcode : ubyte { + continuation = 0, text = 1, binary = 2, // 3, 4, 5, 6, 7 RESERVED @@ -4768,7 +5050,7 @@ version(cgi_with_websocket) { // 11,12,13,14,15 RESERVED } - struct WebSocketMessage { + public struct WebSocketFrame { private bool populated; bool fin; bool rsv1; @@ -4781,8 +5063,8 @@ version(cgi_with_websocket) { ubyte[4] maskingKey; // don't set this when sending ubyte[] data; - static WebSocketMessage simpleMessage(WebSocketOpcode opcode, void[] data) { - WebSocketMessage msg; + static WebSocketFrame simpleMessage(WebSocketOpcode opcode, void[] data) { + WebSocketFrame msg; msg.fin = true; msg.opcode = opcode; msg.data = cast(ubyte[]) data; @@ -4790,7 +5072,7 @@ version(cgi_with_websocket) { return msg; } - private void send(Cgi cgi) { + private void send(scope void delegate(ubyte[]) llsend) { ubyte[64] headerScratch; int headerScratchPos = 0; @@ -4846,7 +5128,7 @@ version(cgi_with_websocket) { headerScratch[1] = b2; } - assert(!masked, "masking key not properly implemented"); + //assert(!masked, "masking key not properly implemented"); if(masked) { // FIXME: randomize this headerScratch[headerScratchPos .. headerScratchPos + 4] = maskingKey[]; @@ -4864,19 +5146,18 @@ version(cgi_with_websocket) { } //writeln("SENDING ", headerScratch[0 .. headerScratchPos], data); - cgi.write(headerScratch[0 .. headerScratchPos]); - cgi.write(data); - cgi.flush(); + llsend(headerScratch[0 .. headerScratchPos]); + llsend(data); } - static WebSocketMessage read(ref ubyte[] d) { - WebSocketMessage msg; + static WebSocketFrame read(ref ubyte[] d) { + WebSocketFrame msg; auto orig = d; - WebSocketMessage needsMoreData() { + WebSocketFrame needsMoreData() { d = orig; - return WebSocketMessage.init; + return WebSocketFrame.init; } if(d.length < 2) @@ -4957,22 +5238,11 @@ version(cgi_with_websocket) { return msg; } - static WebSocketMessage read(BufferedInputRange ir) { - readmore: - auto d = ir.front(); - auto m = read(d); - if(m is WebSocketMessage.init) { - ir.popFront(); - goto readmore; - } - return m; - } - char[] textData() { return cast(char[]) data; } } - + /* } */ } |