aboutsummaryrefslogtreecommitdiffhomepage
path: root/ext_lib
diff options
context:
space:
mode:
Diffstat (limited to 'ext_lib')
-rw-r--r--ext_lib/src/arsd/cgi.d372
1 files changed, 321 insertions, 51 deletions
diff --git a/ext_lib/src/arsd/cgi.d b/ext_lib/src/arsd/cgi.d
index 681fae4..caee996 100644
--- a/ext_lib/src/arsd/cgi.d
+++ b/ext_lib/src/arsd/cgi.d
@@ -4664,6 +4664,37 @@ version(cgi_with_websocket) {
// returns true if data available, false if it timed out
bool recvAvailable(Duration timeout = dur!"msecs"(0)) {
+ if(!waitForNextMessageWouldBlock())
+ return true;
+ if(isDataPending(timeout))
+ return true; // this is kinda a lie.
+
+ return false;
+ }
+
+ public bool lowLevelReceive() {
+ auto bfr = cgi.idlol;
+ top:
+ auto got = bfr.front;
+ if(got.length) {
+ if(receiveBuffer.length < receiveBufferUsedLength + got.length)
+ receiveBuffer.length += receiveBufferUsedLength + got.length;
+
+ receiveBuffer[receiveBufferUsedLength .. receiveBufferUsedLength + got.length] = got[];
+ receiveBufferUsedLength += got.length;
+ bfr.consume(got.length);
+
+ return true;
+ }
+
+ bfr.popFront(0);
+ if(bfr.sourceClosed)
+ return false;
+ goto top;
+ }
+
+
+ bool isDataPending(Duration timeout = 0.seconds) {
Socket socket = cgi.idlol.source;
auto check = new SocketSet();
@@ -4676,47 +4707,297 @@ version(cgi_with_websocket) {
}
// note: this blocks
- WebSocketMessage recv() {
- // FIXME: should we automatically handle pings and pongs?
- if(cgi.idlol.empty())
- throw new Exception("remote side disconnected");
- cgi.idlol.popFront(0);
+ WebSocketFrame recv() {
+ return waitForNextMessage();
+ }
- WebSocketMessage message;
- message = WebSocketMessage.read(cgi.idlol);
- return message;
+
+ private void llclose() {
+ cgi.close();
}
- void send(in char[] text) {
- // I cast away const here because I know this msg is private and it doesn't write
- // to that buffer unless masking is set... which it isn't, so we're ok.
- auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.text, cast(void[]) text);
- msg.send(cgi);
+ private void llsend(ubyte[] data) {
+ cgi.write(data);
+ cgi.flush();
}
- void send(in ubyte[] binary) {
- // I cast away const here because I know this msg is private and it doesn't write
- // to that buffer unless masking is set... which it isn't, so we're ok.
- auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.binary, cast(void[]) binary);
- msg.send(cgi);
+ void unregisterActiveSocket(WebSocket) {}
+
+ /* copy/paste section { */
+
+ private int readyState_;
+ private ubyte[] receiveBuffer;
+ private size_t receiveBufferUsedLength;
+
+ private Config config;
+
+ enum CONNECTING = 0; /// Socket has been created. The connection is not yet open.
+ enum OPEN = 1; /// The connection is open and ready to communicate.
+ enum CLOSING = 2; /// The connection is in the process of closing.
+ enum CLOSED = 3; /// The connection is closed or couldn't be opened.
+
+ /++
+
+ +/
+ /// Group: foundational
+ static struct Config {
+ /++
+ These control the size of the receive buffer.
+
+ It starts at the initial size, will temporarily
+ balloon up to the maximum size, and will reuse
+ a buffer up to the likely size.
+
+ Anything larger than the maximum size will cause
+ the connection to be aborted and an exception thrown.
+ This is to protect you against a peer trying to
+ exhaust your memory, while keeping the user-level
+ processing simple.
+ +/
+ size_t initialReceiveBufferSize = 4096;
+ size_t likelyReceiveBufferSize = 4096; /// ditto
+ size_t maximumReceiveBufferSize = 10 * 1024 * 1024; /// ditto
+
+ /++
+ Maximum combined size of a message.
+ +/
+ size_t maximumMessageSize = 10 * 1024 * 1024;
+
+ string[string] cookies; /// Cookies to send with the initial request. cookies[name] = value;
+ string origin; /// Origin URL to send with the handshake, if desired.
+ string protocol; /// the protocol header, if desired.
+
+ int pingFrequency = 5000; /// Amount of time (in msecs) of idleness after which to send an automatic ping
}
- void close() {
- auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.close, null);
- msg.send(cgi);
+ /++
+ Returns one of [CONNECTING], [OPEN], [CLOSING], or [CLOSED].
+ +/
+ int readyState() {
+ return readyState_;
}
+ /++
+ Closes the connection, sending a graceful teardown message to the other side.
+ +/
+ /// Group: foundational
+ void close(int code = 0, string reason = null)
+ //in (reason.length < 123)
+ in { assert(reason.length < 123); } do
+ {
+ if(readyState_ != OPEN)
+ return; // it cool, we done
+ WebSocketFrame wss;
+ wss.fin = true;
+ wss.opcode = WebSocketOpcode.close;
+ wss.data = cast(ubyte[]) reason;
+ wss.send(&llsend);
+
+ readyState_ = CLOSING;
+
+ llclose();
+ }
+
+ /++
+ Sends a ping message to the server. This is done automatically by the library if you set a non-zero [Config.pingFrequency], but you can also send extra pings explicitly as well with this function.
+ +/
+ /// Group: foundational
void ping() {
- auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.ping, null);
- msg.send(cgi);
+ WebSocketFrame wss;
+ wss.fin = true;
+ wss.opcode = WebSocketOpcode.ping;
+ wss.send(&llsend);
}
+ // automatically handled....
void pong() {
- auto msg = WebSocketMessage.simpleMessage(WebSocketOpcode.pong, null);
- msg.send(cgi);
+ WebSocketFrame wss;
+ wss.fin = true;
+ wss.opcode = WebSocketOpcode.pong;
+ wss.send(&llsend);
}
+
+ /++
+ Sends a text message through the websocket.
+ +/
+ /// Group: foundational
+ void send(in char[] textData) {
+ WebSocketFrame wss;
+ wss.fin = true;
+ wss.opcode = WebSocketOpcode.text;
+ wss.data = cast(ubyte[]) textData;
+ wss.send(&llsend);
+ }
+
+ /++
+ Sends a binary message through the websocket.
+ +/
+ /// Group: foundational
+ void send(in ubyte[] binaryData) {
+ WebSocketFrame wss;
+ wss.fin = true;
+ wss.opcode = WebSocketOpcode.binary;
+ wss.data = cast(ubyte[]) binaryData;
+ wss.send(&llsend);
+ }
+
+ /++
+ Waits for and returns the next complete message on the socket.
+
+ Note that the onmessage function is still called, right before
+ this returns.
+ +/
+ /// Group: blocking_api
+ public WebSocketFrame waitForNextMessage() {
+ do {
+ auto m = processOnce();
+ if(m.populated)
+ return m;
+ } while(lowLevelReceive());
+
+ return WebSocketFrame.init; // FIXME? maybe.
+ }
+
+ /++
+ Tells if [waitForNextMessage] would block.
+ +/
+ /// Group: blocking_api
+ public bool waitForNextMessageWouldBlock() {
+ checkAgain:
+ if(isMessageBuffered())
+ return false;
+ if(!isDataPending())
+ return true;
+ while(isDataPending())
+ lowLevelReceive();
+ goto checkAgain;
+ }
+
+ /++
+ Is there a message in the buffer already?
+ If `true`, [waitForNextMessage] is guaranteed to return immediately.
+ If `false`, check [isDataPending] as the next step.
+ +/
+ /// Group: blocking_api
+ public bool isMessageBuffered() {
+ ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength];
+ auto s = d;
+ if(d.length) {
+ auto orig = d;
+ auto m = WebSocketFrame.read(d);
+ // that's how it indicates that it needs more data
+ if(d !is orig)
+ return true;
+ }
+
+ return false;
+ }
+
+ private ubyte continuingType;
+ private ubyte[] continuingData;
+ //private size_t continuingDataLength;
+
+ private WebSocketFrame processOnce() {
+ ubyte[] d = receiveBuffer[0 .. receiveBufferUsedLength];
+ auto s = d;
+ // FIXME: handle continuation frames more efficiently. it should really just reuse the receive buffer.
+ WebSocketFrame m;
+ if(d.length) {
+ auto orig = d;
+ m = WebSocketFrame.read(d);
+ // that's how it indicates that it needs more data
+ if(d is orig)
+ return WebSocketFrame.init;
+ switch(m.opcode) {
+ case WebSocketOpcode.continuation:
+ if(continuingData.length + m.data.length > config.maximumMessageSize)
+ throw new Exception("message size exceeded");
+
+ continuingData ~= m.data;
+ if(m.fin) {
+ if(ontextmessage)
+ ontextmessage(cast(char[]) continuingData);
+ if(onbinarymessage)
+ onbinarymessage(continuingData);
+
+ continuingData = null;
+ }
+ break;
+ case WebSocketOpcode.text:
+ if(m.fin) {
+ if(ontextmessage)
+ ontextmessage(m.textData);
+ } else {
+ continuingType = m.opcode;
+ //continuingDataLength = 0;
+ continuingData = null;
+ continuingData ~= m.data;
+ }
+ break;
+ case WebSocketOpcode.binary:
+ if(m.fin) {
+ if(onbinarymessage)
+ onbinarymessage(m.data);
+ } else {
+ continuingType = m.opcode;
+ //continuingDataLength = 0;
+ continuingData = null;
+ continuingData ~= m.data;
+ }
+ break;
+ case WebSocketOpcode.close:
+ readyState_ = CLOSED;
+ if(onclose)
+ onclose();
+
+ unregisterActiveSocket(this);
+ break;
+ case WebSocketOpcode.ping:
+ pong();
+ break;
+ case WebSocketOpcode.pong:
+ // just really references it is still alive, nbd.
+ break;
+ default: // ignore though i could and perhaps should throw too
+ }
+ }
+ receiveBufferUsedLength -= s.length - d.length;
+
+ return m;
+ }
+
+ private void autoprocess() {
+ // FIXME
+ do {
+ processOnce();
+ } while(lowLevelReceive());
+ }
+
+
+ void delegate() onclose; ///
+ void delegate() onerror; ///
+ void delegate(in char[]) ontextmessage; ///
+ void delegate(in ubyte[]) onbinarymessage; ///
+ void delegate() onopen; ///
+
+ /++
+
+ +/
+ /// Group: browser_api
+ void onmessage(void delegate(in char[]) dg) {
+ ontextmessage = dg;
+ }
+
+ /// ditto
+ void onmessage(void delegate(in ubyte[]) dg) {
+ onbinarymessage = dg;
+ }
+
+ /* } end copy/paste */
+
+
}
bool websocketRequested(Cgi cgi) {
@@ -4755,10 +5036,11 @@ version(cgi_with_websocket) {
return new WebSocket(cgi);
}
- // FIXME: implement websocket extension frames
- // get websocket to work on other modes, not just embedded_httpd
+ // FIXME get websocket to work on other modes, not just embedded_httpd
+ /* copy/paste in http2.d { */
enum WebSocketOpcode : ubyte {
+ continuation = 0,
text = 1,
binary = 2,
// 3, 4, 5, 6, 7 RESERVED
@@ -4768,7 +5050,7 @@ version(cgi_with_websocket) {
// 11,12,13,14,15 RESERVED
}
- struct WebSocketMessage {
+ public struct WebSocketFrame {
private bool populated;
bool fin;
bool rsv1;
@@ -4781,8 +5063,8 @@ version(cgi_with_websocket) {
ubyte[4] maskingKey; // don't set this when sending
ubyte[] data;
- static WebSocketMessage simpleMessage(WebSocketOpcode opcode, void[] data) {
- WebSocketMessage msg;
+ static WebSocketFrame simpleMessage(WebSocketOpcode opcode, void[] data) {
+ WebSocketFrame msg;
msg.fin = true;
msg.opcode = opcode;
msg.data = cast(ubyte[]) data;
@@ -4790,7 +5072,7 @@ version(cgi_with_websocket) {
return msg;
}
- private void send(Cgi cgi) {
+ private void send(scope void delegate(ubyte[]) llsend) {
ubyte[64] headerScratch;
int headerScratchPos = 0;
@@ -4846,7 +5128,7 @@ version(cgi_with_websocket) {
headerScratch[1] = b2;
}
- assert(!masked, "masking key not properly implemented");
+ //assert(!masked, "masking key not properly implemented");
if(masked) {
// FIXME: randomize this
headerScratch[headerScratchPos .. headerScratchPos + 4] = maskingKey[];
@@ -4864,19 +5146,18 @@ version(cgi_with_websocket) {
}
//writeln("SENDING ", headerScratch[0 .. headerScratchPos], data);
- cgi.write(headerScratch[0 .. headerScratchPos]);
- cgi.write(data);
- cgi.flush();
+ llsend(headerScratch[0 .. headerScratchPos]);
+ llsend(data);
}
- static WebSocketMessage read(ref ubyte[] d) {
- WebSocketMessage msg;
+ static WebSocketFrame read(ref ubyte[] d) {
+ WebSocketFrame msg;
auto orig = d;
- WebSocketMessage needsMoreData() {
+ WebSocketFrame needsMoreData() {
d = orig;
- return WebSocketMessage.init;
+ return WebSocketFrame.init;
}
if(d.length < 2)
@@ -4957,22 +5238,11 @@ version(cgi_with_websocket) {
return msg;
}
- static WebSocketMessage read(BufferedInputRange ir) {
- readmore:
- auto d = ir.front();
- auto m = read(d);
- if(m is WebSocketMessage.init) {
- ir.popFront();
- goto readmore;
- }
- return m;
- }
-
char[] textData() {
return cast(char[]) data;
}
}
-
+ /* } */
}