24 Ocak 2018 Çarşamba

asio io_service Sınıfı

io_service Sınıfı
Açıklaması şöyle
the io_service is your central I/O notification mechanism. It abstracts the low-level OS-specific notification methods such as dev/epoll, I/O-Completion Ports and kqueue.
Bu sınıf boost 1.66'dan itibaren isim değiştirip io_context ismini alıyor. Açıklaması şöyle.
Boost.Asio now provides the interfaces and functionality specified by the "C++ Extensions for Networking" Technical Specification. In addition to access via the usual Boost.Asio header files, this functionality may be accessed through special headers that correspond to the header files defined in the TS.
Constructor - default
Şöyle yaparız.
boost::asio::io_service ios;
io_service kulanılan platforma göre task_io_service veya win_iocp_io_service isminde impl sınıfları kullanıyor.

windows
win_iocp_io_service windows'ta kullanılıyor.

Posix
task_io_service Posix işletim sistemlerinde kullanılıyor. task_io_service kendi içinde farklı reactorler kullanabilir. Bunlar
  • select_reactor reactor
  • epoll_reactor reactor
  • kqueue_reactor reactor
  • dev_poll_reactor reactor 
olabilir.

Constructor - concurrency_hint
Açıklaması şöyle
io_service( std::size_t concurrency_hint);
Construct with a hint about the required level of concurrency.
Parameters
concurrency_hint A suggestion to the implementation on how many threads it should allow to run simultaneously.
Belirtilen sayı örneğin Windows'ta CreateIoCompletionPort çağırısında işe yarar. Açıklaması şöyle
This value limits the number of runnable threads associated with the completion port. When the total number of runnable threads associated with the completion port reaches the concurrency value, the system blocks the execution of any subsequent threads associated with that completion port until the number of runnable threads drops below the concurrency value.
Eğer 1 verilirse bazı iyileştirmeler yapar. Açıklaması şöyle
Using thread-local operation queues in single-threaded use cases (i.e. when concurrency_hint is 1) to eliminate a lock/unlock pair.
notify_fork metodu
Uygulama içinde fork(), system() gibi çağrılar ile başka bir uygulama tetikleniyorsa, kullanılan socket'ler yeni uygulamaya da geçer.  fork() kullanırken şöyle yaparız.
// Inform the io_service that we are about to fork. The io_service cleans
// up any internal resources, such as threads, that may interfere with
// forking.
io_service_.notify_fork(boost::asio::io_service::fork_prepare);

if (0 == fork())
{
  // Inform the io_service that the fork is finished and that this is the
  // child process. The io_service uses this opportunity to create any
  // internal file descriptors that must be private to the new process.
  io_service_.notify_fork(boost::asio::io_service::fork_child);

  // The child won't be accepting new connections, so we can close the
  // acceptor. It remains open in the parent.
  acceptor_.close();
  system(...);
  ...
}
else
{
  // Inform the io_service that the fork is finished (or failed) and that
  // this is the parent process. The io_service uses this opportunity to
  // recreate any internal resources that were cleaned up during
  // preparation for the fork.
  io_service_.notify_fork(boost::asio::io_service::fork_parent);
  ...
}
poll metodu
Şöyle yaparız.
for(;;)
{
  io_service.poll();
  ...
}
Ready durumundaki handler'ları çağırır. Örneğin bir timer kurmuş olalım. Eğer timer henüz expire etmediyse poll hiçbir işlem yapmadan çıkar, çünkü timer handler henüz ready durumda değildir.  run ise timer expire edinceye kadar bekler.

Sınıfın poll() metodu çağrılınca impl sınıfının do_poll_one metodu çağrılır. Metodun imzası şöyle.
std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
    task_io_service::thread_info& this_thread,
    const boost::system::error_code& ec)
post metodu
Şöyle yaparız.
io_service.post(boost::bind(...));
İstersek lambda da kullabiliriz.
io_service.post([](){std::cout<<"...";});
Bu metoda parametre olarak verilen CompletionHandler CopyConstructible olmalı. Eğer bu özelliği sağlamıyorsa pointer haline getirilebilir.

reset metodu
Şöyle yaparız. io_service nesnesini yeniden başlatmak için kullanılır. Tek thread'li uygulamalarda timeout işlemleri için kullanılabilir.
io_service.reset();
run metodu
Açıklaması şöyle
Asynchronous completion handlers will only be called from threads that are currently calling io_service::run().
Sınıfın run metodu'nun çağrılması gerek. Böylece socketler ve timerlar çalışır.
io_service.run();  
Eğer istenirse run metodu bir thread, veya thread_group tarafından çağrılır. Sınıfın run metodu çağrılınca impl sınıfının do_run_one metodu çağrılır.

poll() ve run() dahili kuyruktan bir iş çekmeye çalışırlar. Aşağıda görülebilir.
          poll thread                  |          main thread
---------------------------------------+---------------------------------------
  lock()                               | 
  do_poll_one()                        |                          
  |-- pop task_operation_ from         |
  |   queue_op_                        |
  |-- unlock()                         |  lock()
  |-- create task_cleanup              |  do_run_one()
  |-- service reactor (non-block)      |  `-- queue_op_ is empty
  |-- ~task_cleanup()                  |      |-- set thread as idle
  |   |-- lock()                       |      `-- unlock()
  |   `-- queue_op_.push(              |
  |       task_operation_)             |
  `-- task_operation_ is               | 
      queue_op_.front()                |
      `-- return 0                     |  // still waiting on wakeup_event
  unlock()                             |
run metodunda çalıştırılan işlerden fırlatılan exception'ları yakalamak gerekir. Açıklaması şöyle
It is documented that exceptions thrown from completion handlers are propagated. So you need to handle them as appropriate for your application.
Şöyle yaparız.
static void m_asio_event_loop(boost::asio::io_service& svc, std::string name) {
  
  for (;;) {
    try {
      svc.run();
      break; // exited normally
    } catch (std::exception const &e) {
      ...
    } catch (...) {
      ...
    }
  }
}
run_one metodu
Açıklaması şöyle
The documentation clearly specifies that reset() must be called before run() can be called again.
Örnek
Tek thread'li uygulamalarda şöyle yaparız.
while (!exit) {
  io_service.run_one();
}
Örnek
Elimizde bir socket read işlemi bir de timer olsun. Hangisi önce gelirse gelsin sadece bir tanesini yapmak istersek şöyle yaparız.
boost::asio::async_read_until(s,..., ...,...);
high_resolution_timer t(...,...);
tm.async_wait(...);
ios.run_one();
Örnek
Elimizdeki şu işler post edilmiş olsun.
void foo() { /*.... */ }
void bar() { /*.... */ }

io_service ios;
ios.post(foo);
ios.post(bar);
Şu çağrıyı yapalım. Bu çağrı foo() çağrısı bitinceye kadar bloke olur. İkinci kere run_one() çağrısı yaparsak bar() çağrısı bitinceye kadar bloke olur. Üçüncü kere run_one() çağrısı yaparsak run_one() bloke olmaz ve 0 değeri ile döner.
svc.run_one();
run_for metodu
Belirtilen süre kadar çalışır. Şu satırı dahil ederiz.
#define BOOST_ASIO_HAS_CHRONO 1
#include <boost/asio.hpp>
Şöyle yaparız.
class Server {
  io_service ios;
  ...

public:
  void start(unsigned short port, std::chrono::high_resolution_clock::duration time) {
    ...
    ios.run_for(time);
  }
  ...

};

using namespace std::chrono_literals;
int main() {
  Server s;

  s.start(6767, 3s); // For demo, run 3 seconds
}
stop metodu
Açıklaması şöyle
All invocations of its run() or run_one() member functions should return as soon as possible
stop metodunun hazırladıki işleri çalıştırıp çalıştırmayacağı belli değil. Açıklaması şöyle
io_service::stop() just changes the state of the io_service's event loop, it causes the io_service::run() call to return as soon as possible. It's not documented whether stop() cancel already queued handlers, it seems it's OS dependent

Diğer Notlar

1. post metodu
post ile gönderilen iş exception fırlatırsa tüm io_service durur. İş için hem exception'ları yakalayacak hem de hemen durmasını sağlayacak bir template sınıf tanımlarız. Şöyle yaparız.
template <class T>
struct task_wrapped
{
  private:
    T task_unwrapped_;
  public:
    explicit task_wrapped(const T& task_unwrapped)
      : task_unwrapped_(task_unwrapped)
      {}
    void operator()() const
    {
      // resetting interruption
      try
      {
        boost::this_thread::interruption_point();
      }
      catch (const boost::thread_interrupted&){}

      try
      {
        // Executing task
        task_unwrapped_();
      }
      catch (const std::exception& e)
      {
        std::cerr << "Exception: " << e.what() << '\n';
      }
      catch (const boost::thread_interrupted&)
      {
        std::cerr << "Thread interrupted\n";
      }
      catch (...)
      {
        std::cerr << "Unknown exception\n";
      }
    }
};
Bu sınıfı kolayca yaratabilmek için şöyle yaparız.
template <class T>
task_wrapped<T> make_task_wrapped(const T& task_unwrapped)
{
  return task_wrapped<T>(task_unwrapped);
}
io_service nesnesinin postm metodunu çağırmak için şöyle yaparız.
template <class T>
inline void push_task(const T& task_unwrapped) {
  ios.post(detail::make_task_wrapped(task_unwrapped));
}

2. Dahili Kuyruk
io_service içinde bir kuyruk var. Örneğin task_io_service içinde şöyle bir kod var.

op_queue<operation> op_queue_;

Dahili kuyruğu mesaj yapısı ile kullanmak çok kolay.
void send_worker_message(const message_type& message)
{
  // Add work to worker_io_service that will process the message.
  worker_io_service.post(boost::bind(&process_message, message)); 
}

void process_message(message_type& message)
{
  ...
}
3. Harici Kuyruk
io_service nesnesinin kuyruğu private ve erişim imkanı da tanınmamış. Bu yüzden dışarıda kendi  kuyruğumuzu tutup, io_service nesnesine boş post mesajları göndermek bir çözüm olabilir.
Örnek
Şöyle yaparız.
boost::lockfree::queue<message_type> worker_message_queue;

void send_worker_message(const message_type& message)
{
  // Add message to worker message queue.
  worker_message_queue.push(message);

  // Add work to worker_io_service that will process the queue.
  worker_io_service.post(&process_message); 
}

void process_message()
{
  message_type message;

  // If the message was not retrieved, then return early.
  if (!worker_message_queue.pop(message)) return;

  ...
}
Örnek
Kuyruk için şöyle yaparız.
class handler_priority_queue {
public:
  template <typename Handler>
  void add(int priority, Handler&& handler) {
    std::cout << "add(" << priority << ")" << std::endl;
    std::lock_guard<std::mutex> g(mtx_);
    handlers_.emplace(priority, std::forward<Handler>(handler));
  }

  void execute_all() {
    auto top = [&]() -> boost::optional<queued_handler> {
      std::lock_guard<std::mutex> g(mtx_);
      if (handlers_.empty()) return boost::none;
      boost::optional<queued_handler> opt = handlers_.top();
      handlers_.pop();
      return opt;
    };
    while (auto h_opt = top()) {
      h_opt.get().execute();
    }
  }

private:

  std::priority_queue<queued_handler> handlers_;
  std::mutex mtx_;
};
Handler için şöyle yaparız.
class queued_handler {
public:
  template <typename Handler>
  queued_handler(int p, Handler&& handler)
    : priority_(p), function_(std::forward<Handler>(handler))
  {
    std::cout << "queued_handler()" << std::endl;
  }

  void execute() {
    std::cout << "execute(" << priority_ << ")" << std::endl;
    function_();
  }

  friend bool operator<(queued_handler const& lhs,
                        queued_handler const & rhs) {
    return lhs.priority_ < rhs.priority_;
  }

private:
  int priority_;
  std::function<void()> function_;
};
İkinci kademe hadler için şöyle yaparız. Bu handler priority_queue içindedir.
template <typename Handler>
class wrapped_handler {
public:
  template <typename HandlerArg>
  wrapped_handler(handler_priority_queue& q, int p, HandlerArg&& h)
    : queue_(q), priority_(p), handler_(std::forward<HandlerArg>(h))
  {
  }

  template <typename... Args>
  void operator()(Args&&... args) {
    std::cout << "operator() " << std::endl;
    handler_(std::forward<Args>(args)...);
  }

  //private:
  handler_priority_queue& queue_;
  int priority_;
  Handler handler_;
};

template <typename Handler>
wrapped_handler<Handler> wrap(int priority, Handler&& handler) {
  return wrapped_handler<Handler>(*this, priority,
    std::forward<Handler>(handler));
}
Çağırmak için şöyle yaparız.
pq.wrap(i,[=] {
  std::cout << "[called] " << i << std::endl;
});
Eğer bu metodun sonucunu bir strand içine yerleştirmek ve daha sonra kuyrupa yerleştirmek istersek şöyle yaparız. asio_handler_invoke asio tarafından sağlanan özel bir metod. h işini kuyruğa tekrar yerleştiriyor.
// Custom invocation hook for wrapped handlers.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
                         handler_priority_queue::wrapped_handler<Handler>* h) {
  std::cout << "asio_handler_invoke " << std::endl;
  h->queue_.add(h->priority_, h->handler_);
}
Tüm çağrı için şöyle yaparız.
strand.wrap(
  pq.wrap(
    i,
    [=] {
      std::cout << "[called] " << i << std::endl;
    }
  )
);





Hiç yorum yok:

Yorum Gönder