Statistics
| Revision:

root / tmp / org.txm.statsengine.r.core.win32 / res / win32 / library / BH / include / boost / asio / detail / impl / dev_poll_reactor.ipp @ 2486

History | View | Annotate | Download (12.5 kB)

1
//
2
// detail/impl/dev_poll_reactor.ipp
3
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
//
5
// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6
//
7
// Distributed under the Boost Software License, Version 1.0. (See accompanying
8
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9
//
10

    
11
#ifndef BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP
12
#define BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP
13

    
14
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
15
# pragma once
16
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17

    
18
#include <boost/asio/detail/config.hpp>
19

    
20
#if defined(BOOST_ASIO_HAS_DEV_POLL)
21

    
22
#include <boost/asio/detail/dev_poll_reactor.hpp>
23
#include <boost/asio/detail/assert.hpp>
24
#include <boost/asio/detail/throw_error.hpp>
25
#include <boost/asio/error.hpp>
26

    
27
#include <boost/asio/detail/push_options.hpp>
28

    
29
namespace boost {
30
namespace asio {
31
namespace detail {
32

    
33
dev_poll_reactor::dev_poll_reactor(boost::asio::io_service& io_service)
34
  : boost::asio::detail::service_base<dev_poll_reactor>(io_service),
35
    io_service_(use_service<io_service_impl>(io_service)),
36
    mutex_(),
37
    dev_poll_fd_(do_dev_poll_create()),
38
    interrupter_(),
39
    shutdown_(false)
40
{
41
  // Add the interrupter's descriptor to /dev/poll.
42
  ::pollfd ev = { 0, 0, 0 };
43
  ev.fd = interrupter_.read_descriptor();
44
  ev.events = POLLIN | POLLERR;
45
  ev.revents = 0;
46
  ::write(dev_poll_fd_, &ev, sizeof(ev));
47
}
48

    
49
dev_poll_reactor::~dev_poll_reactor()
50
{
51
  shutdown_service();
52
  ::close(dev_poll_fd_);
53
}
54

    
55
void dev_poll_reactor::shutdown_service()
56
{
57
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
58
  shutdown_ = true;
59
  lock.unlock();
60

    
61
  op_queue<operation> ops;
62

    
63
  for (int i = 0; i < max_ops; ++i)
64
    op_queue_[i].get_all_operations(ops);
65

    
66
  timer_queues_.get_all_timers(ops);
67

    
68
  io_service_.abandon_operations(ops);
69
} 
70

    
71
void dev_poll_reactor::fork_service(boost::asio::io_service::fork_event fork_ev)
72
{
73
  if (fork_ev == boost::asio::io_service::fork_child)
74
  {
75
    detail::mutex::scoped_lock lock(mutex_);
76

    
77
    if (dev_poll_fd_ != -1)
78
      ::close(dev_poll_fd_);
79
    dev_poll_fd_ = -1;
80
    dev_poll_fd_ = do_dev_poll_create();
81

    
82
    interrupter_.recreate();
83

    
84
    // Add the interrupter's descriptor to /dev/poll.
85
    ::pollfd ev = { 0, 0, 0 };
86
    ev.fd = interrupter_.read_descriptor();
87
    ev.events = POLLIN | POLLERR;
88
    ev.revents = 0;
89
    ::write(dev_poll_fd_, &ev, sizeof(ev));
90

    
91
    // Re-register all descriptors with /dev/poll. The changes will be written
92
    // to the /dev/poll descriptor the next time the reactor is run.
93
    for (int i = 0; i < max_ops; ++i)
94
    {
95
      reactor_op_queue<socket_type>::iterator iter = op_queue_[i].begin();
96
      reactor_op_queue<socket_type>::iterator end = op_queue_[i].end();
97
      for (; iter != end; ++iter)
98
      {
99
        ::pollfd& pending_ev = add_pending_event_change(iter->first);
100
        pending_ev.events |= POLLERR | POLLHUP;
101
        switch (i)
102
        {
103
        case read_op: pending_ev.events |= POLLIN; break;
104
        case write_op: pending_ev.events |= POLLOUT; break;
105
        case except_op: pending_ev.events |= POLLPRI; break;
106
        default: break;
107
        }
108
      }
109
    }
110
    interrupter_.interrupt();
111
  }
112
}
113

    
114
void dev_poll_reactor::init_task()
115
{
116
  io_service_.init_task();
117
}
118

    
119
int dev_poll_reactor::register_descriptor(socket_type, per_descriptor_data&)
120
{
121
  return 0;
122
}
123

    
124
int dev_poll_reactor::register_internal_descriptor(int op_type,
125
    socket_type descriptor, per_descriptor_data&, reactor_op* op)
126
{
127
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
128

    
129
  op_queue_[op_type].enqueue_operation(descriptor, op);
130
  ::pollfd& ev = add_pending_event_change(descriptor);
131
  ev.events = POLLERR | POLLHUP;
132
  switch (op_type)
133
  {
134
  case read_op: ev.events |= POLLIN; break;
135
  case write_op: ev.events |= POLLOUT; break;
136
  case except_op: ev.events |= POLLPRI; break;
137
  default: break;
138
  }
139
  interrupter_.interrupt();
140

    
141
  return 0;
142
}
143

    
144
void dev_poll_reactor::move_descriptor(socket_type,
145
    dev_poll_reactor::per_descriptor_data&,
146
    dev_poll_reactor::per_descriptor_data&)
147
{
148
}
149

    
150
void dev_poll_reactor::start_op(int op_type, socket_type descriptor,
151
    dev_poll_reactor::per_descriptor_data&, reactor_op* op,
152
    bool is_continuation, bool allow_speculative)
153
{
154
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
155

    
156
  if (shutdown_)
157
  {
158
    post_immediate_completion(op, is_continuation);
159
    return;
160
  }
161

    
162
  if (allow_speculative)
163
  {
164
    if (op_type != read_op || !op_queue_[except_op].has_operation(descriptor))
165
    {
166
      if (!op_queue_[op_type].has_operation(descriptor))
167
      {
168
        if (op->perform())
169
        {
170
          lock.unlock();
171
          io_service_.post_immediate_completion(op, is_continuation);
172
          return;
173
        }
174
      }
175
    }
176
  }
177

    
178
  bool first = op_queue_[op_type].enqueue_operation(descriptor, op);
179
  io_service_.work_started();
180
  if (first)
181
  {
182
    ::pollfd& ev = add_pending_event_change(descriptor);
183
    ev.events = POLLERR | POLLHUP;
184
    if (op_type == read_op
185
        || op_queue_[read_op].has_operation(descriptor))
186
      ev.events |= POLLIN;
187
    if (op_type == write_op
188
        || op_queue_[write_op].has_operation(descriptor))
189
      ev.events |= POLLOUT;
190
    if (op_type == except_op
191
        || op_queue_[except_op].has_operation(descriptor))
192
      ev.events |= POLLPRI;
193
    interrupter_.interrupt();
194
  }
195
}
196

    
197
void dev_poll_reactor::cancel_ops(socket_type descriptor,
198
    dev_poll_reactor::per_descriptor_data&)
199
{
200
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
201
  cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
202
}
203

    
204
void dev_poll_reactor::deregister_descriptor(socket_type descriptor,
205
    dev_poll_reactor::per_descriptor_data&, bool)
206
{
207
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
208

    
209
  // Remove the descriptor from /dev/poll.
210
  ::pollfd& ev = add_pending_event_change(descriptor);
211
  ev.events = POLLREMOVE;
212
  interrupter_.interrupt();
213

    
214
  // Cancel any outstanding operations associated with the descriptor.
215
  cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
216
}
217

    
218
void dev_poll_reactor::deregister_internal_descriptor(
219
    socket_type descriptor, dev_poll_reactor::per_descriptor_data&)
220
{
221
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
222

    
223
  // Remove the descriptor from /dev/poll. Since this function is only called
224
  // during a fork, we can apply the change immediately.
225
  ::pollfd ev = { 0, 0, 0 };
226
  ev.fd = descriptor;
227
  ev.events = POLLREMOVE;
228
  ev.revents = 0;
229
  ::write(dev_poll_fd_, &ev, sizeof(ev));
230

    
231
  // Destroy all operations associated with the descriptor.
232
  op_queue<operation> ops;
233
  boost::system::error_code ec;
234
  for (int i = 0; i < max_ops; ++i)
235
    op_queue_[i].cancel_operations(descriptor, ops, ec);
236
}
237

    
238
void dev_poll_reactor::run(bool block, op_queue<operation>& ops)
239
{
240
  boost::asio::detail::mutex::scoped_lock lock(mutex_);
241

    
242
  // We can return immediately if there's no work to do and the reactor is
243
  // not supposed to block.
244
  if (!block && op_queue_[read_op].empty() && op_queue_[write_op].empty()
245
      && op_queue_[except_op].empty() && timer_queues_.all_empty())
246
    return;
247

    
248
  // Write the pending event registration changes to the /dev/poll descriptor.
249
  std::size_t events_size = sizeof(::pollfd) * pending_event_changes_.size();
250
  if (events_size > 0)
251
  {
252
    errno = 0;
253
    int result = ::write(dev_poll_fd_,
254
        &pending_event_changes_[0], events_size);
255
    if (result != static_cast<int>(events_size))
256
    {
257
      boost::system::error_code ec = boost::system::error_code(
258
          errno, boost::asio::error::get_system_category());
259
      for (std::size_t i = 0; i < pending_event_changes_.size(); ++i)
260
      {
261
        int descriptor = pending_event_changes_[i].fd;
262
        for (int j = 0; j < max_ops; ++j)
263
          op_queue_[j].cancel_operations(descriptor, ops, ec);
264
      }
265
    }
266
    pending_event_changes_.clear();
267
    pending_event_change_index_.clear();
268
  }
269

    
270
  int timeout = block ? get_timeout() : 0;
271
  lock.unlock();
272

    
273
  // Block on the /dev/poll descriptor.
274
  ::pollfd events[128] = { { 0, 0, 0 } };
275
  ::dvpoll dp = { 0, 0, 0 };
276
  dp.dp_fds = events;
277
  dp.dp_nfds = 128;
278
  dp.dp_timeout = timeout;
279
  int num_events = ::ioctl(dev_poll_fd_, DP_POLL, &dp);
280

    
281
  lock.lock();
282

    
283
  // Dispatch the waiting events.
284
  for (int i = 0; i < num_events; ++i)
285
  {
286
    int descriptor = events[i].fd;
287
    if (descriptor == interrupter_.read_descriptor())
288
    {
289
      interrupter_.reset();
290
    }
291
    else
292
    {
293
      bool more_reads = false;
294
      bool more_writes = false;
295
      bool more_except = false;
296

    
297
      // Exception operations must be processed first to ensure that any
298
      // out-of-band data is read before normal data.
299
      if (events[i].events & (POLLPRI | POLLERR | POLLHUP))
300
        more_except =
301
          op_queue_[except_op].perform_operations(descriptor, ops);
302
      else
303
        more_except = op_queue_[except_op].has_operation(descriptor);
304

    
305
      if (events[i].events & (POLLIN | POLLERR | POLLHUP))
306
        more_reads = op_queue_[read_op].perform_operations(descriptor, ops);
307
      else
308
        more_reads = op_queue_[read_op].has_operation(descriptor);
309

    
310
      if (events[i].events & (POLLOUT | POLLERR | POLLHUP))
311
        more_writes = op_queue_[write_op].perform_operations(descriptor, ops);
312
      else
313
        more_writes = op_queue_[write_op].has_operation(descriptor);
314

    
315
      if ((events[i].events & (POLLERR | POLLHUP)) != 0
316
            && !more_except && !more_reads && !more_writes)
317
      {
318
        // If we have an event and no operations associated with the
319
        // descriptor then we need to delete the descriptor from /dev/poll.
320
        // The poll operation can produce POLLHUP or POLLERR events when there
321
        // is no operation pending, so if we do not remove the descriptor we
322
        // can end up in a tight polling loop.
323
        ::pollfd ev = { 0, 0, 0 };
324
        ev.fd = descriptor;
325
        ev.events = POLLREMOVE;
326
        ev.revents = 0;
327
        ::write(dev_poll_fd_, &ev, sizeof(ev));
328
      }
329
      else
330
      {
331
        ::pollfd ev = { 0, 0, 0 };
332
        ev.fd = descriptor;
333
        ev.events = POLLERR | POLLHUP;
334
        if (more_reads)
335
          ev.events |= POLLIN;
336
        if (more_writes)
337
          ev.events |= POLLOUT;
338
        if (more_except)
339
          ev.events |= POLLPRI;
340
        ev.revents = 0;
341
        int result = ::write(dev_poll_fd_, &ev, sizeof(ev));
342
        if (result != sizeof(ev))
343
        {
344
          boost::system::error_code ec(errno,
345
              boost::asio::error::get_system_category());
346
          for (int j = 0; j < max_ops; ++j)
347
            op_queue_[j].cancel_operations(descriptor, ops, ec);
348
        }
349
      }
350
    }
351
  }
352
  timer_queues_.get_ready_timers(ops);
353
}
354

    
355
void dev_poll_reactor::interrupt()
356
{
357
  interrupter_.interrupt();
358
}
359

    
360
int dev_poll_reactor::do_dev_poll_create()
361
{
362
  int fd = ::open("/dev/poll", O_RDWR);
363
  if (fd == -1)
364
  {
365
    boost::system::error_code ec(errno,
366
        boost::asio::error::get_system_category());
367
    boost::asio::detail::throw_error(ec, "/dev/poll");
368
  }
369
  return fd;
370
}
371

    
372
void dev_poll_reactor::do_add_timer_queue(timer_queue_base& queue)
373
{
374
  mutex::scoped_lock lock(mutex_);
375
  timer_queues_.insert(&queue);
376
}
377

    
378
void dev_poll_reactor::do_remove_timer_queue(timer_queue_base& queue)
379
{
380
  mutex::scoped_lock lock(mutex_);
381
  timer_queues_.erase(&queue);
382
}
383

    
384
int dev_poll_reactor::get_timeout()
385
{
386
  // By default we will wait no longer than 5 minutes. This will ensure that
387
  // any changes to the system clock are detected after no longer than this.
388
  return timer_queues_.wait_duration_msec(5 * 60 * 1000);
389
}
390

    
391
void dev_poll_reactor::cancel_ops_unlocked(socket_type descriptor,
392
    const boost::system::error_code& ec)
393
{
394
  bool need_interrupt = false;
395
  op_queue<operation> ops;
396
  for (int i = 0; i < max_ops; ++i)
397
    need_interrupt = op_queue_[i].cancel_operations(
398
        descriptor, ops, ec) || need_interrupt;
399
  io_service_.post_deferred_completions(ops);
400
  if (need_interrupt)
401
    interrupter_.interrupt();
402
}
403

    
404
::pollfd& dev_poll_reactor::add_pending_event_change(int descriptor)
405
{
406
  hash_map<int, std::size_t>::iterator iter
407
    = pending_event_change_index_.find(descriptor);
408
  if (iter == pending_event_change_index_.end())
409
  {
410
    std::size_t index = pending_event_changes_.size();
411
    pending_event_changes_.reserve(pending_event_changes_.size() + 1);
412
    pending_event_change_index_.insert(std::make_pair(descriptor, index));
413
    pending_event_changes_.push_back(::pollfd());
414
    pending_event_changes_[index].fd = descriptor;
415
    pending_event_changes_[index].revents = 0;
416
    return pending_event_changes_[index];
417
  }
418
  else
419
  {
420
    return pending_event_changes_[iter->second];
421
  }
422
}
423

    
424
} // namespace detail
425
} // namespace asio
426
} // namespace boost
427

    
428
#include <boost/asio/detail/pop_options.hpp>
429

    
430
#endif // defined(BOOST_ASIO_HAS_DEV_POLL)
431

    
432
#endif // BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP