【计算机网络】Stanford CS144 Lab1 : stitching substrings into a byte stream 学习记录
Putting substrings in sequence
实现一个流重组器。可以将带有索引的流碎片按照顺序重组。这些流碎片是可以重复的部分,但是不会有冲突的部分。这些流碎片将通过 Lab0 中 实现的 ByteStream
来将其有序的传输出去。
需要注意的是:
-
eof 不一定是最后一个片段被传入的
-
如果该字节流前面的字节没有被写入
ByteStream
, 该字节流不能被写入ByteStream
, 一旦可以写入,就要立即写入。 -
容量限制,不限制的话,流重组器会越来越大。
-
会存在为空的带
EOF
的字符串
最初,使用 std::map<size_t, std::string>
来实现, key
中存放的是 index + data.size()
, 这样可以非常方便的使用 lower_bound
来确认一个新的片段要与 map
中的哪些段合并。写完测试也通过了(当时运气好),但是第二遍测试就挂掉了,由于代码写的太乱,就用其他方法重写了。
把操作分为三部
-
剪切掉已经写入
_output
的部分。 -
与
map
中的其他片段合并。 -
写入
_output
关于片段合并,我使用一个结构体 SubString
来存储片段, 同时为这个结构体设置一个 merge
方法, 让它可以把其他的结构体合并到自己身上, 返回值为 :
-
如果不能合并, 返回
std::nullopt
-
如果可以合并, 返回这两个片段中有多少字段重复的
通过 lower_bound
来找可以合并的片段即可。
stream_reassembler.hh
struct SubString {
std::string _data{};
size_t _begin{0};
size_t _size{0};
SubString() = default;
SubString(const std::string &data, const size_t begin, const size_t size)
: _data{data}, _begin{begin}, _size{size} {}
bool operator<(const SubString &rhs) const { return _begin < rhs._begin; }
std::optional<size_t> merge(const SubString &other) {
SubString lhs{}, rhs{};
if (_begin > other._begin) {
lhs = other;
rhs = *this;
} else {
lhs = *this;
rhs = other;
}
// can't merge
if (lhs._begin + lhs._size < rhs._begin) return std::nullopt;
// \in
if (lhs._begin + lhs._size >= rhs._begin + rhs._size) {
*this = lhs;
return rhs._size;
}
const size_t res = _begin + _size - rhs._begin;
lhs._data += rhs._data.substr((lhs._begin + lhs._size) - rhs._begin);
lhs._size = lhs._data.size();
*this = lhs;
return res;
}
};
class StreamReassembler {
private:
// Your code here -- add private members as necessary.
std::set<SubString> _buffer{};
size_t _unassembled_bytes{0};
size_t _push_pos{0};
bool _is_eof{false};
ByteStream _output; //!< The reassembled in-order byte stream
size_t _capacity; //!< The maximum number of bytes
SubString cut_substring(const std::string &data, const size_t index);
void merge_substring(SubString &ssd);
// ...
stream_reassembler.cc
StreamReassembler::StreamReassembler(const size_t capacity) : _output(capacity), _capacity(capacity) {}
//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
// DUMMY_CODE(data, index, eof);
if (index >= _push_pos + _capacity) return;
SubString ssd = cut_substring(data, index);
merge_substring(ssd);
while (_unassembled_bytes > 0 && _output.remaining_capacity() > 0) {
auto iter = _buffer.begin();
if (iter->_begin != _push_pos) break;
const size_t push_len = _output.write(iter->_data);
_unassembled_bytes -= push_len;
_push_pos += push_len;
if (iter->_size == push_len) {
_buffer.erase(iter);
} else {
SubString unass{std::string{}.assign(iter->_data.begin() + _push_pos, iter->_data.end())
,_push_pos, iter->_size - push_len};
_buffer.erase(iter);
_buffer.insert(unass);
}
}
if (index + data.length() <= _push_pos + _capacity && eof) {
_is_eof |= eof;
}
if (_unassembled_bytes == 0 && _is_eof) {
_output.end_input();
}
}
SubString StreamReassembler::cut_substring(const std::string &data, const size_t index) {
// \in
SubString res{};
if (index + data.size() <= _push_pos) return res;
//
else if (index < _push_pos) {
const size_t len = _push_pos - index;
res._begin = _push_pos;
res._data.assign(data.begin() + len, data.end());
res._size = res._data.size();
} else {
res._begin = index;
res._data = data;
res._size = data.size();
}
_unassembled_bytes += res._size;
return res;
}
void StreamReassembler::merge_substring(SubString &ssd) {
if (ssd._size == 0) return;
do {
// next
auto iter = _buffer.lower_bound(ssd);
std::optional<size_t> res{};
while (iter != _buffer.end() && (res = ssd.merge(*iter)) != std::nullopt) {
_unassembled_bytes -= res.value();
_buffer.erase(iter);
iter = _buffer.lower_bound(ssd);
}
// pre
if (iter == _buffer.begin()) break;
iter--;
while (iter != _buffer.end() && (res = ssd.merge(*iter)) != std::nullopt) {
_unassembled_bytes -= res.value();
_buffer.erase(iter);
iter = _buffer.lower_bound(ssd);
if (iter == _buffer.begin()) break;
iter--;
}
} while(false);
_buffer.insert(ssd);
}
size_t StreamReassembler::unassembled_bytes() const { return _unassembled_bytes; }
bool StreamReassembler::empty() const { return _unassembled_bytes == 0; }