boost asio async_write: how to not interleave async_write calls?

Problem description:

Here is my implementation:

  • Client A sends a message intended for client B to the server.
  • The server processes the message by async_read-ing the right amount of data and then waits for new data from client A (so as not to block client A).
  • Afterwards the server processes the message (possibly performing a MySQL query) and then sends it to client B with async_write.

The problem is that if client A sends messages very quickly, the async_writes interleave before the previous async_write's completion handler has been invoked.

Is there a simple way to avoid this problem?

EDIT 1: If a client C sends a message to client B just after client A has, the same problem appears...

EDIT 2: Would this work? It seems to block somewhere, but I can't tell where...

namespace structure {
    class User {
    public:
        User(boost::asio::io_service& io_service, boost::asio::ssl::context& context) :
            m_socket(io_service, context), m_strand(io_service), is_writing(false) {}

        ssl_socket& getSocket() {
            return m_socket;
        }

        boost::asio::strand getStrand() {
            return m_strand;
        }

        void push(std::string str) {
            m_strand.post(boost::bind(&structure::User::strand_push, this, str));
        }

        void strand_push(std::string str) {
            std::cout << "pushing: " << boost::this_thread::get_id() << std::endl;
            m_queue.push(str);
            if (!is_writing) {
                write();
                std::cout << "going to write" << std::endl;
            }
            std::cout << "Already writing" << std::endl;
        }

        void write() {
            std::cout << "writing" << std::endl;
            is_writing = true;
            std::string str = m_queue.front();
            boost::asio::async_write(m_socket,
                                     boost::asio::buffer(str.c_str(), str.size()),
                                     boost::bind(&structure::User::sent, this));
        }

        void sent() {
            std::cout << "sent" << std::endl;
            m_queue.pop();
            if (!m_queue.empty()) {
                write();
                return;
            }
            else
                is_writing = false;
            std::cout << "done sent" << std::endl;
        }

    private:
        ssl_socket              m_socket;
        boost::asio::strand     m_strand;
        std::queue<std::string> m_queue;
        bool                    is_writing;
    };
}

#endif

Note that asynchronous writes are far less valuable than asynchronous reads. Since the operating system buffers data locally, most write operations complete almost instantly. Reads, on the other hand, may block waiting for the remote end, and there is nothing you can do about that locally. Hence, synchronous writes are a viable way to achieve ordering. This also solves the data-ownership problem: the code above is incorrect because 'str' is destroyed when write() returns, possibly before boost::asio::async_write() accesses the buffer. – MSalters
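A minimal sketch of one way to fix that ownership problem in the User class above, assuming everything else stays the same: let the queue keep owning the string and pass async_write a buffer referring to m_queue.front(), which is popped only in the completion handler (the m_strand.wrap is an extra safety assumption, not part of the original code):

void write() {
    is_writing = true;
    // Reference the element owned by the queue instead of a local copy;
    // m_queue.front() stays alive until sent() pops it.
    const std::string& str = m_queue.front();
    boost::asio::async_write(m_socket,
                             boost::asio::buffer(str),
                             m_strand.wrap(boost::bind(&structure::User::sent, this)));
}

void sent() {
    m_queue.pop();          // the buffer may be released only now
    if (!m_queue.empty())
        write();
    else
        is_writing = false;
}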

Is there a simple way to avoid this problem?

Yes, maintain an outgoing queue for each client. Inspect the queue size in the async_write completion handler; if it is non-zero, start another async_write operation. Here is a simple sample:

#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <deque>
#include <iostream>
#include <string>

class Connection
{
public:
    Connection(
        boost::asio::io_service& io_service
        ) :
        _io_service(io_service),
        _strand(_io_service),
        _socket(_io_service),
        _outbox()
    {
    }

    void write(
        const std::string& message
        )
    {
        _strand.post(
            boost::bind(
                &Connection::writeImpl,
                this,
                message
                )
            );
    }

private:
    void writeImpl(
        const std::string& message
        )
    {
        _outbox.push_back(message);
        if (_outbox.size() > 1) {
            // outstanding async_write
            return;
        }

        this->write();
    }

    void write()
    {
        const std::string& message = _outbox[0];
        boost::asio::async_write(
            _socket,
            boost::asio::buffer(message.c_str(), message.size()),
            _strand.wrap(
                boost::bind(
                    &Connection::writeHandler,
                    this,
                    boost::asio::placeholders::error,
                    boost::asio::placeholders::bytes_transferred
                    )
                )
            );
    }

    void writeHandler(
        const boost::system::error_code& error,
        const size_t bytesTransferred
        )
    {
        _outbox.pop_front();

        if (error) {
            std::cerr << "could not write: " << boost::system::system_error(error).what() << std::endl;
            return;
        }

        if (!_outbox.empty()) {
            // more messages to send
            this->write();
        }
    }

private:
    typedef std::deque<std::string> Outbox;

private:
    boost::asio::io_service& _io_service;
    boost::asio::io_service::strand _strand;
    boost::asio::ip::tcp::socket _socket;
    Outbox _outbox;
};

int
main()
{
    boost::asio::io_service io_service;
    Connection foo(io_service);
}

Some key points:

  • The boost::asio::io_service::strand protects access to Connection::_outbox.
  • A handler is dispatched from Connection::write() since it is public.

It was not obvious to me whether you were using similar practices in the example in your question, since all of its methods are public.
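As a hypothetical usage sketch (the main() above constructs a Connection but never runs the io_service): one io_service serviced by a small thread pool, with write() called from arbitrary threads. The thread count, the io_service::work guard, and the messages are illustrative assumptions, and the connect step is omitted entirely:

#include <boost/thread.hpp>

int main()
{
    boost::asio::io_service io_service;
    boost::asio::io_service::work work(io_service); // keeps run() from returning early
    Connection foo(io_service);

    // A pool of threads servicing the io_service; the strand inside
    // Connection serializes writeImpl()/writeHandler() among them.
    boost::thread_group pool;
    for (int i = 0; i < 4; ++i)
        pool.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));

    // write() is safe to call from any thread; messages are queued in
    // _outbox and sent one async_write at a time.
    foo.write("first message\n");
    foo.write("second message\n");

    // In real code you would wait for the writes to finish before stopping.
    io_service.stop();
    pool.join_all();
}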


I tried this solution. The thing is, I have a single io_service with multiple threads running run(); even when using strand.post to push data onto the queue, it seems to segfault because it is called from 2 different threads... any idea why? – TheSquad


@TheSquad That sounds like a separate problem to me. Most likely you have implemented your logic incorrectly; strands and multiple threads can be handled just fine. For your original question, using a queue is an appropriate solution. –


What would you use to know when data should be popped off the queue? – TheSquad

Just to improve on Sam's great answer. The points of improvement are:

  • async_write strives to send every single byte from the buffer(s) before completing, which means you should supply all of the input data that you have to the write operation; otherwise the framing overhead may increase because TCP packets end up smaller than they could have been.

  • asio::streambuf, while very convenient to use, is not zero-copy. The example below demonstrates a zero-copy approach: keep the input data chunks where they are and use a scatter/gather overload of async_write that accepts a sequence of input buffers (which are just pointers into the actual input data); a minimal sketch of that overload follows, ahead of the full source.
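Before the full program, here is a minimal standalone sketch of just the scatter/gather overload. GatherWrite, sock, and chunks are hypothetical names for illustration; it assumes a connected tcp::socket (with using boost::asio::ip::tcp as below) and that the strings outlive the async operation:

void GatherWrite(tcp::socket& sock, const std::vector<std::string>& chunks)
{
    // Build the buffer sequence: each const_buffer is just a pointer/length
    // pair into an existing string -- no data is copied here.
    std::vector<boost::asio::const_buffer> bufs;
    for (const auto& c : chunks)
        bufs.push_back(boost::asio::buffer(c));

    // One composed async_write sends every chunk; asio copies the sequence
    // of pointers internally, but the strings themselves must stay alive
    // until the completion handler runs.
    boost::asio::async_write(sock, bufs,
        [](const boost::system::error_code& ec, std::size_t n) {
            if (!ec)
                std::cout << "Wrote " << n << " bytes in one gather operation\n";
        });
}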

Full source code:

#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>

using namespace std::chrono_literals;
using boost::asio::ip::tcp;

class Server
{
    class Connection : public std::enable_shared_from_this<Connection>
    {
        friend class Server;

        void ProcessCommand(const std::string& cmd) {
            if (cmd == "stop") {
                server_.Stop();
                return;
            }
            if (cmd == "") {
                Close();
                return;
            }
            std::thread t([this, self = shared_from_this(), cmd] {
                for (int i = 0; i < 30; ++i) {
                    Write("Hello, " + cmd + " " + std::to_string(i) + "\r\n");
                }
                server_.io_service_.post([this, self] {
                    DoReadCmd();
                });
            });
            t.detach();
        }

        void DoReadCmd() {
            read_timer_.expires_from_now(server_.read_timeout_);
            read_timer_.async_wait([this](boost::system::error_code ec) {
                if (!ec) {
                    std::cout << "Read timeout\n";
                    Shutdown();
                }
            });
            boost::asio::async_read_until(socket_, buf_in_, '\n',
                [this, self = shared_from_this()](boost::system::error_code ec, std::size_t bytes_read) {
                    read_timer_.cancel();
                    if (!ec) {
                        const char* p = boost::asio::buffer_cast<const char*>(buf_in_.data());
                        std::string cmd(p, bytes_read - (bytes_read > 1 && p[bytes_read - 2] == '\r' ? 2 : 1));
                        buf_in_.consume(bytes_read);
                        ProcessCommand(cmd);
                    }
                    else {
                        Close();
                    }
                });
        }

        void DoWrite() {
            active_buffer_ ^= 1; // switch buffers
            for (const auto& data : buffers_[active_buffer_]) {
                buffer_seq_.push_back(boost::asio::buffer(data));
            }
            write_timer_.expires_from_now(server_.write_timeout_);
            write_timer_.async_wait([this](boost::system::error_code ec) {
                if (!ec) {
                    std::cout << "Write timeout\n";
                    Shutdown();
                }
            });
            boost::asio::async_write(socket_, buffer_seq_,
                [this, self = shared_from_this()](const boost::system::error_code& ec, size_t bytes_transferred) {
                    write_timer_.cancel();
                    std::lock_guard<std::mutex> lock(buffers_mtx_);
                    buffers_[active_buffer_].clear();
                    buffer_seq_.clear();
                    if (!ec) {
                        std::cout << "Wrote " << bytes_transferred << " bytes\n";
                        if (!buffers_[active_buffer_ ^ 1].empty()) // have more work
                            DoWrite();
                    }
                    else {
                        Close();
                    }
                });
        }

        bool Writing() const { return !buffer_seq_.empty(); }

        Server& server_;
        boost::asio::streambuf buf_in_;
        std::mutex buffers_mtx_;
        std::vector<std::string> buffers_[2]; // a double buffer
        std::vector<boost::asio::const_buffer> buffer_seq_;
        int active_buffer_ = 0;
        bool closing_ = false;
        bool closed_ = false;
        boost::asio::deadline_timer read_timer_, write_timer_;
        tcp::socket socket_;

    public:
        Connection(Server& server) : server_(server), read_timer_(server.io_service_),
            write_timer_(server.io_service_), socket_(server.io_service_) {
        }

        void Start() {
            socket_.set_option(tcp::no_delay(true));
            DoReadCmd();
        }

        void Close() {
            closing_ = true;
            if (!Writing())
                Shutdown();
        }

        void Shutdown() {
            if (!closed_) {
                closing_ = closed_ = true;
                boost::system::error_code ec;
                socket_.shutdown(tcp::socket::shutdown_both, ec);
                socket_.close();
                server_.active_connections_.erase(shared_from_this());
            }
        }

        void Write(std::string&& data) {
            std::lock_guard<std::mutex> lock(buffers_mtx_);
            buffers_[active_buffer_ ^ 1].push_back(std::move(data)); // move input data to the inactive buffer
            if (!Writing())
                DoWrite();
        }
    };

    void DoAccept() {
        if (acceptor_.is_open()) {
            auto session = std::make_shared<Connection>(*this);
            acceptor_.async_accept(session->socket_, [this, session](boost::system::error_code ec) {
                if (!ec) {
                    active_connections_.insert(session);
                    session->Start();
                }
                DoAccept();
            });
        }
    }

    boost::asio::io_service io_service_;
    tcp::acceptor acceptor_;
    std::unordered_set<std::shared_ptr<Connection>> active_connections_;
    const boost::posix_time::time_duration read_timeout_ = boost::posix_time::seconds(30);
    const boost::posix_time::time_duration write_timeout_ = boost::posix_time::seconds(30);

public:
    Server(int port) : acceptor_(io_service_, tcp::endpoint(tcp::v6(), port), false) { }

    void Run() {
        std::cout << "Listening on " << acceptor_.local_endpoint() << "\n";
        DoAccept();
        io_service_.run();
    }

    void Stop() {
        acceptor_.close();
        {
            std::vector<std::shared_ptr<Connection>> sessionsToClose;
            copy(active_connections_.begin(), active_connections_.end(), back_inserter(sessionsToClose));
            for (auto& s : sessionsToClose)
                s->Shutdown();
        }
        active_connections_.clear();
        io_service_.stop();
    }
};

int main() {
    try {
        Server srv(8888);
        srv.Run();
    }
    catch (const std::exception& e) {
        std::cerr << "Error: " << e.what() << "\n";
    }
}
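To exercise the server: run it and connect with, e.g., telnet localhost 8888. Each non-empty line you type makes a detached worker thread queue 30 "Hello" lines through Write(), so new data lands in the inactive buffer while a gather-write on the other buffer is still in flight; an empty line closes the connection, and the line "stop" shuts the whole server down.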