htc: implement remainder of Mux/Tasks

This commit is contained in:
Michael Scire 2021-02-09 19:51:52 -08:00 committed by SciresM
parent 42cf3f50d7
commit b925344c3b
13 changed files with 291 additions and 24 deletions

View file

@ -120,9 +120,12 @@ namespace ams::htc::server::driver {
}
Result HtclowDriver::ReceiveInternal(size_t *out, void *dst, size_t dst_size, htclow::ChannelId channel, htclow::ReceiveOption option) {
/* Determine whether we're blocking. */
const bool blocking = option != htclow::ReceiveOption_NonBlocking;
/* Begin receiving. */
u32 task_id;
R_TRY(m_manager->ReceiveBegin(std::addressof(task_id), GetHtclowChannel(channel, m_module_id), option != htclow::ReceiveOption_NonBlocking));
R_TRY(m_manager->ReceiveBegin(std::addressof(task_id), GetHtclowChannel(channel, m_module_id), blocking ? 1 : 0));
/* Wait for the task to complete. */
this->WaitTask(task_id);

View file

@ -157,8 +157,8 @@ namespace ams::htclow {
return m_ctrl_service.NotifyAwake();
}
Result HtclowManagerImpl::ReceiveBegin(u32 *out_task_id, impl::ChannelInternalType channel, bool blocking) {
return m_mux.ReceiveBegin(out_task_id, channel, blocking);
Result HtclowManagerImpl::ReceiveBegin(u32 *out_task_id, impl::ChannelInternalType channel, size_t size) {
return m_mux.ReceiveBegin(out_task_id, channel, size);
}
Result HtclowManagerImpl::ReceiveEnd(size_t *out, void *dst, size_t dst_size, impl::ChannelInternalType channel, u32 task_id) {

View file

@ -68,7 +68,7 @@ namespace ams::htclow {
void NotifyAsleep();
void NotifyAwake();
Result ReceiveBegin(u32 *out_task_id, impl::ChannelInternalType channel, bool blocking);
Result ReceiveBegin(u32 *out_task_id, impl::ChannelInternalType channel, size_t size);
Result ReceiveEnd(size_t *out, void *dst, size_t dst_size, impl::ChannelInternalType channel, u32 task_id);
Result SendBegin(u32 *out_task_id, size_t *out, const void *src, size_t src_size, impl::ChannelInternalType channel);

View file

@ -228,11 +228,44 @@ namespace ams::htclow::mux {
return m_channel_impl_map[it->second].DoConnectEnd();
}
ChannelState Mux::GetChannelState(impl::ChannelInternalType channel);
os::EventType *Mux::GetChannelStateEvent(impl::ChannelInternalType channel);
ChannelState Mux::GetChannelState(impl::ChannelInternalType channel) {
/* Lock ourselves. */
std::scoped_lock lk(m_mutex);
Result Mux::FlushBegin(u32 *out_task_id, impl::ChannelInternalType channel);
Result Mux::FlushEnd(u32 task_id);
return m_channel_impl_map.GetChannelImpl(channel).GetChannelState();
}
os::EventType *Mux::GetChannelStateEvent(impl::ChannelInternalType channel) {
/* Lock ourselves. */
std::scoped_lock lk(m_mutex);
return m_channel_impl_map.GetChannelImpl(channel).GetChannelStateEvent();
}
Result Mux::FlushBegin(u32 *out_task_id, impl::ChannelInternalType channel) {
/* Lock ourselves. */
std::scoped_lock lk(m_mutex);
/* Find the channel. */
auto it = m_channel_impl_map.GetMap().find(channel);
R_UNLESS(it != m_channel_impl_map.GetMap().end(), htclow::ResultChannelNotExist());
/* Perform the connection. */
return m_channel_impl_map[it->second].DoFlush(out_task_id);
}
Result Mux::FlushEnd(u32 task_id) {
/* Get the trigger for the task. */
const auto trigger = m_task_manager.GetTrigger(task_id);
/* Free the task. */
m_task_manager.FreeTask(task_id);
/* Check that we didn't hit a disconnect. */
R_UNLESS(trigger != EventTrigger_Disconnect, htclow::ResultInvalidChannelStateDisconnected());
return ResultSuccess();
}
os::EventType *Mux::GetTaskEvent(u32 task_id) {
/* Lock ourselves. */
@ -241,11 +274,60 @@ namespace ams::htclow::mux {
return m_task_manager.GetTaskEvent(task_id);
}
Result Mux::ReceiveBegin(u32 *out_task_id, impl::ChannelInternalType channel, bool blocking);
Result Mux::ReceiveEnd(size_t *out, void *dst, size_t dst_size, impl::ChannelInternalType channel, u32 task_id);
Result Mux::ReceiveBegin(u32 *out_task_id, impl::ChannelInternalType channel, size_t size) {
/* Lock ourselves. */
std::scoped_lock lk(m_mutex);
Result Mux::SendBegin(u32 *out_task_id, size_t *out, const void *src, size_t src_size, impl::ChannelInternalType channel);
Result Mux::SendEnd(u32 task_id);
/* Find the channel. */
auto it = m_channel_impl_map.GetMap().find(channel);
R_UNLESS(it != m_channel_impl_map.GetMap().end(), htclow::ResultChannelNotExist());
/* Perform the connection. */
return m_channel_impl_map[it->second].DoReceiveBegin(out_task_id, size);
}
Result Mux::ReceiveEnd(size_t *out, void *dst, size_t dst_size, impl::ChannelInternalType channel, u32 task_id) {
/* Free the task. */
m_task_manager.FreeTask(task_id);
/* If we have data, perform the receive. */
if (dst_size > 0) {
/* Find the channel. */
auto it = m_channel_impl_map.GetMap().find(channel);
R_UNLESS(it != m_channel_impl_map.GetMap().end(), htclow::ResultChannelNotExist());
/* Perform the RECEIVE. */
return m_channel_impl_map[it->second].DoReceiveEnd(out, dst, dst_size);
} else {
*out = 0;
return ResultSuccess();
}
}
Result Mux::SendBegin(u32 *out_task_id, size_t *out, const void *src, size_t src_size, impl::ChannelInternalType channel) {
/* Lock ourselves. */
std::scoped_lock lk(m_mutex);
/* Find the channel. */
auto it = m_channel_impl_map.GetMap().find(channel);
R_UNLESS(it != m_channel_impl_map.GetMap().end(), htclow::ResultChannelNotExist());
/* Perform the connection. */
return m_channel_impl_map[it->second].DoSend(out_task_id, out, src, src_size);
}
Result Mux::SendEnd(u32 task_id) {
/* Get the trigger for the task. */
const auto trigger = m_task_manager.GetTrigger(task_id);
/* Free the task. */
m_task_manager.FreeTask(task_id);
/* Check that we didn't hit a disconnect. */
R_UNLESS(trigger != EventTrigger_Disconnect, htclow::ResultInvalidChannelStateDisconnected());
return ResultSuccess();
}
void Mux::SetSendBuffer(impl::ChannelInternalType channel, void *buf, size_t buf_size, size_t max_packet_size) {
/* Lock ourselves. */
@ -283,6 +365,16 @@ namespace ams::htclow::mux {
m_channel_impl_map[it->second].SetReceiveBuffer(buf, buf_size);
}
Result Mux::Shutdown(impl::ChannelInternalType channel);
Result Mux::Shutdown(impl::ChannelInternalType channel) {
/* Lock ourselves. */
std::scoped_lock lk(m_mutex);
/* Find the channel. */
auto it = m_channel_impl_map.GetMap().find(channel);
R_UNLESS(it != m_channel_impl_map.GetMap().end(), htclow::ResultChannelNotExist());
/* Perform the shutdown. */
return m_channel_impl_map[it->second].DoShutdown();
}
}

View file

@ -68,7 +68,7 @@ namespace ams::htclow::mux {
os::EventType *GetTaskEvent(u32 task_id);
Result ReceiveBegin(u32 *out_task_id, impl::ChannelInternalType channel, bool blocking);
Result ReceiveBegin(u32 *out_task_id, impl::ChannelInternalType channel, size_t size);
Result ReceiveEnd(size_t *out, void *dst, size_t dst_size, impl::ChannelInternalType channel, u32 task_id);
Result SendBegin(u32 *out_task_id, size_t *out, const void *src, size_t src_size, impl::ChannelInternalType channel);

View file

@ -24,7 +24,7 @@ namespace ams::htclow::mux {
ChannelImpl::ChannelImpl(impl::ChannelInternalType channel, PacketFactory *pf, ctrl::HtcctrlStateMachine *sm, TaskManager *tm, os::Event *ev)
: m_channel(channel), m_packet_factory(pf), m_state_machine(sm), m_task_manager(tm), m_event(ev),
m_send_buffer(m_channel, pf), m_receive_buffer(), m_version(ProtocolVersion), m_config(DefaultChannelConfig),
m_offset(0), m_total_send_size(0), m_next_max_data(0), m_cur_max_data(0), m_share(),
m_offset(0), m_total_send_size(0), m_cur_max_data(0), m_prev_max_data(0), m_share(),
m_state_change_event(os::EventClearMode_ManualClear), m_state(ChannelState_Unconnectable)
{
@ -146,10 +146,10 @@ namespace ams::htclow::mux {
bool ChannelImpl::QuerySendPacket(PacketHeader *header, PacketBody *body, int *out_body_size) {
/* Check our send buffer. */
if (m_send_buffer.QueryNextPacket(header, body, out_body_size, m_next_max_data, m_total_send_size, m_share.has_value(), m_share.value_or(0))) {
if (m_send_buffer.QueryNextPacket(header, body, out_body_size, m_cur_max_data, m_total_send_size, m_share.has_value(), m_share.value_or(0))) {
/* Update tracking variables. */
if (header->packet_type == PacketType_Data) {
m_cur_max_data = m_next_max_data;
m_prev_max_data = m_cur_max_data;
}
return true;
@ -259,11 +259,11 @@ namespace ams::htclow::mux {
/* Perform handshake, if we should. */
if (m_config.handshake_enabled) {
/* Set our next max data. */
m_next_max_data = m_receive_buffer.GetBufferSize();
/* Set our current max data. */
m_cur_max_data = m_receive_buffer.GetBufferSize();
/* Make a max data packet. */
auto packet = m_packet_factory->MakeMaxDataPacket(m_channel, m_version, m_next_max_data);
auto packet = m_packet_factory->MakeMaxDataPacket(m_channel, m_version, m_cur_max_data);
R_UNLESS(packet, htclow::ResultOutOfMemory());
/* Send the packet. */
@ -272,8 +272,8 @@ namespace ams::htclow::mux {
/* Signal that we have an packet to send. */
this->SignalSendPacketEvent();
/* Set our current max data. */
m_cur_max_data = m_next_max_data;
/* Set our prev max data. */
m_prev_max_data = m_cur_max_data;
} else {
/* Set our share. */
m_share = m_config.initial_counter_max_data;
@ -290,6 +290,141 @@ namespace ams::htclow::mux {
return ResultSuccess();
}
Result ChannelImpl::DoFlush(u32 *out_task_id) {
/* Check our state. */
R_TRY(this->CheckState({ChannelState_Connected}));
/* Allocate a task. */
u32 task_id;
R_TRY(m_task_manager->AllocateTask(std::addressof(task_id), m_channel));
/* Configure the task. */
m_task_manager->ConfigureFlushTask(task_id);
/* If we're already flushed, complete the task immediately. */
if (m_send_buffer.Empty()) {
m_task_manager->CompleteTask(task_id, EventTrigger_SendBufferEmpty);
}
/* Set the output task id. */
*out_task_id = task_id;
return ResultSuccess();
}
Result ChannelImpl::DoReceiveBegin(u32 *out_task_id, size_t size) {
/* Check our state. */
R_TRY(this->CheckState({ChannelState_Connected, ChannelState_Disconnected}));
/* Allocate a task. */
u32 task_id;
R_TRY(m_task_manager->AllocateTask(std::addressof(task_id), m_channel));
/* Configure the task. */
m_task_manager->ConfigureReceiveTask(task_id, size);
/* Check if the task is already complete. */
if (m_receive_buffer.GetDataSize() >= size) {
m_task_manager->CompleteTask(task_id, EventTrigger_ReceiveData);
} else if (m_state == ChannelState_Disconnected) {
m_task_manager->CompleteTask(task_id, EventTrigger_Disconnect);
}
/* Set the output task id. */
*out_task_id = task_id;
return ResultSuccess();
}
Result ChannelImpl::DoReceiveEnd(size_t *out, void *dst, size_t dst_size) {
/* Check our state. */
R_TRY(this->CheckState({ChannelState_Connected, ChannelState_Disconnected}));
/* If we have nowhere to receive, we're done. */
if (dst_size == 0) {
*out = 0;
return ResultSuccess();
}
/* Get the amount of receivable data. */
const size_t receivable = m_receive_buffer.GetDataSize();
const size_t received = std::min(dst_size, receivable);
/* Read the data. */
R_ABORT_UNLESS(m_receive_buffer.Read(dst, received));
/* Handle flow control, if we should. */
if (m_config.flow_control_enabled) {
/* Read our fields. */
const auto prev_max_data = m_prev_max_data;
const auto next_max_data = m_cur_max_data + received;
const auto max_packet_size = m_config.max_packet_size;
const auto offset = m_offset;
/* Update our current max data. */
m_cur_max_data = next_max_data;
/* If we can, send a max data packet. */
if (prev_max_data - offset < max_packet_size + sizeof(PacketHeader)) {
/* Make a max data packet. */
auto packet = m_packet_factory->MakeMaxDataPacket(m_channel, m_version, next_max_data);
R_UNLESS(packet, htclow::ResultOutOfMemory());
/* Send the packet. */
m_send_buffer.AddPacket(std::move(packet));
/* Signal that we have an packet to send. */
this->SignalSendPacketEvent();
/* Set our prev max data. */
m_prev_max_data = m_cur_max_data;
}
}
/* Set the output size. */
*out = received;
return ResultSuccess();
}
Result ChannelImpl::DoSend(u32 *out_task_id, size_t *out, const void *src, size_t src_size) {
/* Check our state. */
R_TRY(this->CheckState({ChannelState_Connected}));
/* Allocate a task. */
u32 task_id;
R_TRY(m_task_manager->AllocateTask(std::addressof(task_id), m_channel));
/* Send the data. */
const size_t sent = m_send_buffer.AddData(src, src_size);
/* Add the size to our total. */
m_total_send_size += sent;
/* Signal our event. */
this->SignalSendPacketEvent();
/* Configure the task. */
m_task_manager->ConfigureSendTask(task_id);
/* If we sent all the data, we're done. */
if (sent == src_size) {
m_task_manager->CompleteTask(task_id, EventTrigger_SendComplete);
}
/* Set the output. */
*out_task_id = task_id;
*out = sent;
return ResultSuccess();
}
Result ChannelImpl::DoShutdown() {
/* Check our state. */
R_TRY(this->CheckState({ChannelState_Connected}));
/* Set our state. */
this->SetState(ChannelState_Disconnected);
return ResultSuccess();
}
void ChannelImpl::SetSendBuffer(void *buf, size_t buf_size, size_t max_packet_size) {
/* Set buffer. */
m_send_buffer.SetBuffer(buf, buf_size);

View file

@ -45,8 +45,8 @@ namespace ams::htclow::mux {
ChannelConfig m_config;
u64 m_offset;
u64 m_total_send_size;
u64 m_next_max_data;
u64 m_cur_max_data;
u64 m_prev_max_data;
std::optional<u64> m_share;
os::Event m_state_change_event;
ChannelState m_state;
@ -55,6 +55,9 @@ namespace ams::htclow::mux {
void SetVersion(s16 version);
ChannelState GetChannelState() { return m_state; }
os::EventType *GetChannelStateEvent() { return m_state_change_event.GetBase(); }
Result ProcessReceivePacket(const PacketHeader &header, const void *body, size_t body_size);
bool QuerySendPacket(PacketHeader *header, PacketBody *body, int *out_body_size);
@ -68,6 +71,15 @@ namespace ams::htclow::mux {
Result DoConnectBegin(u32 *out_task_id);
Result DoConnectEnd();
Result DoFlush(u32 *out_task_id);
Result DoReceiveBegin(u32 *out_task_id, size_t size);
Result DoReceiveEnd(size_t *out, void *dst, size_t dst_size);
Result DoSend(u32 *out_task_id, size_t *out, const void *src, size_t src_size);
Result DoShutdown();
void SetSendBuffer(void *buf, size_t buf_size, size_t max_packet_size);
void SetReceiveBuffer(void *buf, size_t buf_size);
void SetSendBufferWithData(const void *buf, size_t buf_size, size_t max_packet_size);

View file

@ -47,6 +47,16 @@ namespace ams::htclow::mux {
m_can_discard = false;
}
Result RingBuffer::Read(void *dst, size_t size) {
/* Copy the data. */
R_TRY(this->Copy(dst, size));
/* Discard. */
R_TRY(this->Discard(size));
return ResultSuccess();
}
Result RingBuffer::Write(const void *data, size_t size) {
/* Validate pre-conditions. */
AMS_ASSERT(!m_is_read_only);

View file

@ -38,6 +38,7 @@ namespace ams::htclow::mux {
size_t GetBufferSize() { return m_buffer_size; }
size_t GetDataSize() { return m_data_size; }
Result Read(void *dst, size_t size);
Result Write(const void *data, size_t size);
Result Copy(void *dst, size_t size);

View file

@ -131,6 +131,17 @@ namespace ams::htclow::mux {
}
}
size_t SendBuffer::AddData(const void *data, size_t size) {
/* Determine how much to actually add. */
size = std::min(size, m_ring_buffer.GetBufferSize() - m_ring_buffer.GetDataSize());
/* Write the data. */
R_ABORT_UNLESS(m_ring_buffer.Write(data, size));
/* Return the size we wrote. */
return size;
}
void SendBuffer::SetBuffer(void *buffer, size_t buffer_size) {
m_ring_buffer.Initialize(buffer, buffer_size);
}

View file

@ -54,6 +54,8 @@ namespace ams::htclow::mux {
void AddPacket(std::unique_ptr<Packet, PacketDeleter> ptr);
void RemovePacket(const PacketHeader &header);
size_t AddData(const void *data, size_t size);
void SetBuffer(void *buffer, size_t buffer_size);
void SetReadOnlyBuffer(const void *buffer, size_t buffer_size);
void SetMaxPacketSize(size_t max_packet_size);

View file

@ -44,7 +44,7 @@ namespace ams::htclow::mux {
}
/* Verify the task is free. */
R_UNLESS(!m_valid[task_id], htclow::ResultOutOfTask());
R_UNLESS(task_id < util::size(m_tasks), htclow::ResultOutOfTask());
/* Mark the task as allocated. */
m_valid[task_id] = true;

View file

@ -23,6 +23,7 @@ namespace ams::htclow::mux {
enum EventTrigger : u8 {
EventTrigger_Disconnect = 1,
EventTrigger_ReceiveData = 2,
EventTrigger_SendComplete = 4,
EventTrigger_SendReady = 5,
EventTrigger_SendBufferEmpty = 10,
EventTrigger_ConnectReady = 11,