Started adding Partial stream handling
- Partial stream implements ostream - Added state management for starting or stopping partial streams - Stream can be paused to write full messages without ending "stream session" - Stream supports buffering to prevent sending small packets Abstracted associatedData away to what is essentially a map
This commit is contained in:
parent
f3c5bdebbb
commit
43cea7b40b
169
CPPTools/Net.cpp
169
CPPTools/Net.cpp
@ -299,6 +299,59 @@ namespace IO {
|
|||||||
|
|
||||||
void NetClient::setOnDestroy(std::function<void()> call) { onDestroy = call; }
|
void NetClient::setOnDestroy(std::function<void()> call) { onDestroy = call; }
|
||||||
|
|
||||||
|
|
||||||
|
std::pair<ulong_64b, char*> NetClient::getValue(const char* name, bool copy) {
|
||||||
|
for(std::pair<char*, std::pair<ulong_64b, char*>*>* p : associatedData)
|
||||||
|
if (!strcmp(p->first, name)) {
|
||||||
|
char* c = copy ? new char[p->second->first] : p->second->second;
|
||||||
|
if (copy) memcpy(c, p->second->second, p->second->first);
|
||||||
|
return std::pair<ulong_64b, char*>(p->second->first, c);
|
||||||
|
}
|
||||||
|
return std::pair<ulong_64b, char*>(0, nullptr);
|
||||||
|
}
|
||||||
|
char* NetClient::getStrValue(const char* name, bool copy) {
|
||||||
|
return getValue(name, copy).second;
|
||||||
|
}
|
||||||
|
void NetClient::setValue(const char* name, std::pair<ulong_64b, char*> value, bool copy, bool del) {
|
||||||
|
for (std::pair<char*, std::pair<ulong_64b, char*>*>* p : associatedData)
|
||||||
|
if (!strcmp(p->first, name)) {
|
||||||
|
p->second->first = value.first;
|
||||||
|
if (del) delete[] p->second->second;
|
||||||
|
char* c = copy ? new char[value.first] : value.second;
|
||||||
|
if (copy) memcpy(c, value.second, value.first);
|
||||||
|
p->second->second = c;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
std::pair<char*, std::pair<ulong_64b, char*>*>* p = new std::pair<char*, std::pair<ulong_64b, char*>*>();
|
||||||
|
p->first = (char*)name;
|
||||||
|
p->second = new std::pair<ulong_64b, char*>();
|
||||||
|
p->second->first = value.first;
|
||||||
|
if (del) delete[] p->second->second;
|
||||||
|
char* c = copy ? new char[value.first] : value.second;
|
||||||
|
if (copy) memcpy(c, value.second, value.first);
|
||||||
|
p->second->second = c;
|
||||||
|
|
||||||
|
associatedData.push_back(p);
|
||||||
|
}
|
||||||
|
void NetClient::setValue(const char* name, char* value, bool copy, bool del) {
|
||||||
|
setValue(name, std::pair<ulong_64b, char*>(strlen(value), value), copy, del);
|
||||||
|
}
|
||||||
|
bool NetClient::removeValue(const char* name, bool del) {
|
||||||
|
for (size_t t = associatedData.size(); t>0; --t)
|
||||||
|
if (!strcmp(associatedData.at(t-1)->first, name)) {
|
||||||
|
if (del) delete[] associatedData.at(t-1)->second->second;
|
||||||
|
associatedData.erase(associatedData.begin()+t-1, associatedData.begin()+t);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
bool NetClient::containsKey(const char* name) {
|
||||||
|
for (size_t t = associatedData.size(); t>0; --t)
|
||||||
|
if (!strcmp(associatedData.at(t - 1)->first, name))
|
||||||
|
return true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
ulong_64b NetClient::available() { return packets->size(); }
|
ulong_64b NetClient::available() { return packets->size(); }
|
||||||
|
|
||||||
|
|
||||||
@ -429,4 +482,120 @@ namespace IO {
|
|||||||
bool NetServer::isOpen() { return _open; }
|
bool NetServer::isOpen() { return _open; }
|
||||||
|
|
||||||
void NetServer::setOnDestroy(std::function<void()> call) { onDestroy = call; }
|
void NetServer::setOnDestroy(std::function<void()> call) { onDestroy = call; }
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void writeState(NetClient& cli, const char* stateName, char state) {
|
||||||
|
char* c = cli.getStrValue(stateName, false);
|
||||||
|
if (c == nullptr) c = new char[0];
|
||||||
|
c[0] = state;
|
||||||
|
cli.setValue(stateName, c, false, false); // Write/overwrite
|
||||||
|
}
|
||||||
|
|
||||||
|
char readState(NetClient& cli, const char* stateName) {
|
||||||
|
char* c = cli.getStrValue(stateName, false);
|
||||||
|
if (c == nullptr) return 0;
|
||||||
|
else return *c;
|
||||||
|
}
|
||||||
|
|
||||||
|
PartialNetworkStream::PartialNetworkStream(NetClient& client, bool noBuffer) : std::ostream(std::_Uninitialized::_Noinit), client(client), buffer(noBuffer?nullptr:new std::vector<char>()){ /* NOP */}
|
||||||
|
PartialNetworkStream::~PartialNetworkStream() {
|
||||||
|
if (client.isOpen()) {
|
||||||
|
client.write((char*)STREAM_DELIMIT, 8);
|
||||||
|
}
|
||||||
|
client.removeValue(STREAM_ATTRIB); // Cleanup
|
||||||
|
}
|
||||||
|
void PartialNetworkStream::write(char* message, std::streamsize size, bool autoFlush) {
|
||||||
|
bool isPartial = stateIs(client, PartialCommState::COMM_PARTIAL);
|
||||||
|
if (!isPartial || autoFlush || (size > STREAM_BUFMIN)) {
|
||||||
|
if(isPartial) flush();
|
||||||
|
client.write(message, size);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
for (size_t t = 0; t < size; ++t) buffer->push_back(message[t]);
|
||||||
|
if (buffer->size() > STREAM_BUFMIN) flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
void PartialNetworkStream::writeNonPartial(char* message, std::streamsize size) {
|
||||||
|
bool b = stateIs(client, PartialCommState::COMM_PARTIAL);
|
||||||
|
if (b) sendState(PartialCommState::COMM_PAUSE);
|
||||||
|
client.write(message, size);
|
||||||
|
if (b) sendState(PartialCommState::COMM_PARTIAL);
|
||||||
|
}
|
||||||
|
void PartialNetworkStream::flush() {
|
||||||
|
check(PartialCommState::COMM_FULL);
|
||||||
|
if (buffer->size() == 0) return;
|
||||||
|
bool b = stateIs(client, PartialCommState::COMM_PARTIAL);
|
||||||
|
if (b) sendState(PartialCommState::COMM_PARTIAL); // Temporarily set the remote read state to PARTIAL
|
||||||
|
client.write(&buffer->at(0), buffer->size());
|
||||||
|
if (b) sendState(PartialCommState::COMM_PAUSE); // Set the remote read state back to PAUSE
|
||||||
|
buffer->clear();
|
||||||
|
}
|
||||||
|
void PartialNetworkStream::check(PartialCommState state) { if(readState(client, STREAM_ATTRIB)==state) throw new std::exception("Stream is not open!"); }
|
||||||
|
void PartialNetworkStream::sendState(PartialCommState state) {
|
||||||
|
switch (getCommState()) {
|
||||||
|
case PartialCommState::COMM_PAUSE:
|
||||||
|
if (state == PartialCommState::COMM_FULL) {
|
||||||
|
client.write((char*)&STREAM_PAUSE, sizeof(STREAM_PAUSE));
|
||||||
|
client.write((char*)&STREAM_DELIMIT, sizeof(STREAM_DELIMIT));
|
||||||
|
}
|
||||||
|
else if (state == PartialCommState::COMM_PARTIAL) client.write((char*)&STREAM_PAUSE, sizeof(STREAM_PAUSE));
|
||||||
|
break;
|
||||||
|
case PartialCommState::COMM_PARTIAL:
|
||||||
|
if (state == PartialCommState::COMM_FULL) client.write((char*)&STREAM_DELIMIT, sizeof(STREAM_DELIMIT));
|
||||||
|
else if (state == PartialCommState::COMM_PAUSE) client.write((char*)&STREAM_PAUSE, sizeof(STREAM_PAUSE));
|
||||||
|
break;
|
||||||
|
case PartialCommState::COMM_FULL:
|
||||||
|
if (state == PartialCommState::COMM_PARTIAL) client.write((char*)&STREAM_DELIMIT, sizeof(STREAM_DELIMIT));
|
||||||
|
else if (state == PartialCommState::COMM_PAUSE) {
|
||||||
|
client.write((char*)&STREAM_DELIMIT, sizeof(STREAM_PAUSE));
|
||||||
|
client.write((char*)&STREAM_PAUSE, sizeof(STREAM_PAUSE));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void PartialNetworkStream::endPartial() {
|
||||||
|
sendState(PartialCommState::COMM_FULL);
|
||||||
|
}
|
||||||
|
void PartialNetworkStream::startPartial() {
|
||||||
|
sendState(PartialCommState::COMM_PARTIAL);
|
||||||
|
}
|
||||||
|
PartialCommState PartialNetworkStream::getCommState() {
|
||||||
|
return static_cast<PartialCommState>(readState(client, STREAM_ATTRIB));
|
||||||
|
}
|
||||||
|
bool PartialNetworkStream::stateIs(NetClient& cli, PartialCommState state) { return readState(cli, STREAM_ATTRIB) == state; }
|
||||||
|
PartialDataState PartialNetworkStream::accept(NetClient& cli, Packet& pkt) {
|
||||||
|
bool toggle_partial = pkt.size == sizeof(ulong_64b) && ((*(ulong_64b*)pkt.message) == STREAM_DELIMIT);
|
||||||
|
bool toggle_pause = !toggle_partial && (pkt.size == sizeof(ulong_64b) && ((*(ulong_64b*)pkt.message) == STREAM_PAUSE));
|
||||||
|
if (!toggle_partial && !toggle_pause) return PartialDataState::DATA;
|
||||||
|
else if (toggle_partial) {
|
||||||
|
if (stateIs(cli, PartialCommState::COMM_FULL)) {
|
||||||
|
writeState(cli, STREAM_ATTRIB, PartialCommState::COMM_PARTIAL);
|
||||||
|
return PartialDataState::START;
|
||||||
|
}
|
||||||
|
else if (stateIs(cli, PartialCommState::COMM_PAUSE)) {
|
||||||
|
writeState(cli, STREAM_ATTRIB, PartialCommState::COMM_PARTIAL);
|
||||||
|
return PartialDataState::RESUME;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
writeState(cli, STREAM_ATTRIB, PartialCommState::COMM_FULL);
|
||||||
|
return PartialDataState::END;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else /* if(toggle_pause) */{
|
||||||
|
if (stateIs(cli, PartialCommState::COMM_FULL)) {
|
||||||
|
writeState(cli, STREAM_ATTRIB, PartialCommState::COMM_PAUSE);
|
||||||
|
return PartialDataState::PAUSE;
|
||||||
|
}
|
||||||
|
else if (stateIs(cli, PartialCommState::COMM_PAUSE)) {
|
||||||
|
writeState(cli, STREAM_ATTRIB, PartialCommState::COMM_PARTIAL);
|
||||||
|
return PartialDataState::RESUME;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
writeState(cli, STREAM_ATTRIB, PartialCommState::COMM_PAUSE);
|
||||||
|
return PartialDataState::PAUSE;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
@ -16,6 +16,8 @@
|
|||||||
|
|
||||||
// Ping flag tells the recieving host to drop the current ulong_64b, as it is sent to check if the connection is still alive
|
// Ping flag tells the recieving host to drop the current ulong_64b, as it is sent to check if the connection is still alive
|
||||||
#define FLAG_PING (ulong_64b)-1
|
#define FLAG_PING (ulong_64b)-1
|
||||||
|
#define FLAG_PART (ulong_64b)-2
|
||||||
|
#define FLAG_NPRT (ulong_64b)-3
|
||||||
|
|
||||||
#include "Crypto.h"
|
#include "Crypto.h"
|
||||||
#include "ArchAbstract.h"
|
#include "ArchAbstract.h"
|
||||||
@ -73,6 +75,7 @@ namespace IO {
|
|||||||
void update(); // Read incoming data and store in buffers
|
void update(); // Read incoming data and store in buffers
|
||||||
bool ping(); // Check if connection is alive by pinging remote host
|
bool ping(); // Check if connection is alive by pinging remote host
|
||||||
protected:
|
protected:
|
||||||
|
std::vector<std::pair<char*, std::pair<ulong_64b, char*>*>*> associatedData;
|
||||||
std::thread listener; // Incoming data listener (optional)
|
std::thread listener; // Incoming data listener (optional)
|
||||||
SOCKET _socket; // Underlying socket used for communication
|
SOCKET _socket; // Underlying socket used for communication
|
||||||
std::vector<Packet>* packets; // Basically a set containing a backlog of unprocessed data. Will oly be used if event handler doesn't exist
|
std::vector<Packet>* packets; // Basically a set containing a backlog of unprocessed data. Will oly be used if event handler doesn't exist
|
||||||
@ -80,7 +83,6 @@ namespace IO {
|
|||||||
std::function<void()> onDestroy; // Event handler called when NetClient object is destroyed
|
std::function<void()> onDestroy; // Event handler called when NetClient object is destroyed
|
||||||
public:
|
public:
|
||||||
time_t commTime; // Latest time a transaction occurred
|
time_t commTime; // Latest time a transaction occurred
|
||||||
std::vector<char*> associatedData;
|
|
||||||
NetClient(char* ipAddr, char* port, CryptoLevel = CryptoLevel::None);// Standard constructor for creating connection
|
NetClient(char* ipAddr, char* port, CryptoLevel = CryptoLevel::None);// Standard constructor for creating connection
|
||||||
~NetClient();
|
~NetClient();
|
||||||
bool close();
|
bool close();
|
||||||
@ -92,6 +94,12 @@ namespace IO {
|
|||||||
Packet read();
|
Packet read();
|
||||||
void setEventHandler(std::function<void(NetClient*, Packet)>); // Register a callback that is guaranteed to be called when the socket has at least one unprocessed packet
|
void setEventHandler(std::function<void(NetClient*, Packet)>); // Register a callback that is guaranteed to be called when the socket has at least one unprocessed packet
|
||||||
void setOnDestroy(std::function<void()>);
|
void setOnDestroy(std::function<void()>);
|
||||||
|
std::pair<ulong_64b, char*> getValue(const char* name, bool copy = true);
|
||||||
|
char* getStrValue(const char* name, bool copy = true);
|
||||||
|
void setValue(const char* name, std::pair<ulong_64b, char*> value, bool copy = true, bool del = true);
|
||||||
|
void setValue(const char* name, char* data, bool copy = true, bool del = true);
|
||||||
|
bool removeValue(const char* name, bool del = true);
|
||||||
|
bool containsKey(const char* name);
|
||||||
bool isOpen();
|
bool isOpen();
|
||||||
ulong_64b available();
|
ulong_64b available();
|
||||||
};
|
};
|
||||||
@ -121,9 +129,55 @@ namespace IO {
|
|||||||
bool close();
|
bool close();
|
||||||
};
|
};
|
||||||
|
|
||||||
class PartialNetworkStream{
|
|
||||||
|
|
||||||
|
// Partial data stream management
|
||||||
|
static const auto STREAM_DELIMIT = FLAG_PART;
|
||||||
|
static const auto STREAM_PAUSE = FLAG_NPRT;
|
||||||
|
static const auto STREAM_ATTRIB = (const char*) "$PartialNetworkStream$ACTIVE";
|
||||||
|
static const auto STREAM_BUFMIN = 32;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
START represents the beginning of a partial message
|
||||||
|
PAUSE represents a pause in the partial stream in which a full (unrelated) message is being sent
|
||||||
|
RESUME tells dev that the partial stream is being resumed (from a full-write state)
|
||||||
|
END represebts the end of a partial message
|
||||||
|
DATA represents the the supplied data isn't metadata
|
||||||
|
*/
|
||||||
|
enum PartialDataState { START, PAUSE, RESUME, END, DATA };
|
||||||
|
|
||||||
|
/*
|
||||||
|
PARTIAL tells you that the stream is currently accepting partial data packets
|
||||||
|
PAUSE means that the client is set to accept a partial stream, but has been specifically paused to accept a full message
|
||||||
|
FULL means that the client is interpreting messages as full message blocks
|
||||||
|
*/
|
||||||
|
enum PartialCommState { COMM_FULL, COMM_PARTIAL, COMM_PAUSE };
|
||||||
|
|
||||||
|
|
||||||
|
class PartialNetworkStream : public std::ostream{
|
||||||
|
protected:
|
||||||
|
bool open;
|
||||||
|
std::vector<char>* buffer;
|
||||||
|
NetClient& client;
|
||||||
|
|
||||||
|
void check(PartialCommState state);
|
||||||
|
void sendState(PartialCommState state);
|
||||||
|
|
||||||
|
static bool stateIs(NetClient& cli, PartialCommState state);
|
||||||
public:
|
public:
|
||||||
PartialNetworkStream(NetClient& client);
|
PartialNetworkStream(NetClient&, bool = false);
|
||||||
|
~PartialNetworkStream();
|
||||||
|
|
||||||
|
void endPartial();
|
||||||
|
void startPartial();
|
||||||
|
PartialCommState getCommState();
|
||||||
|
void write(char*, std::streamsize, bool = false);
|
||||||
|
void writeNonPartial(char*, std::streamsize);
|
||||||
|
void flush();
|
||||||
|
|
||||||
|
static PartialDataState accept(NetClient& cli, Packet& pkt);
|
||||||
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user