Statistics
| Revision:

root / tmp / org.txm.statsengine.r.core.win32 / res / win32 / library / BH / include / boost / asio / detail / impl / win_iocp_io_service.ipp @ 2486

History | View | Annotate | Download (14.8 kB)

1
//
2
// detail/impl/win_iocp_io_service.ipp
3
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
4
//
5
// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6
//
7
// Distributed under the Boost Software License, Version 1.0. (See accompanying
8
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9
//
10

    
11
#ifndef BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_SERVICE_IPP
12
#define BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_SERVICE_IPP
13

    
14
#if defined(_MSC_VER) && (_MSC_VER >= 1200)
15
# pragma once
16
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
17

    
18
#include <boost/asio/detail/config.hpp>
19

    
20
#if defined(BOOST_ASIO_HAS_IOCP)
21

    
22
#include <boost/asio/error.hpp>
23
#include <boost/asio/io_service.hpp>
24
#include <boost/asio/detail/cstdint.hpp>
25
#include <boost/asio/detail/handler_alloc_helpers.hpp>
26
#include <boost/asio/detail/handler_invoke_helpers.hpp>
27
#include <boost/asio/detail/limits.hpp>
28
#include <boost/asio/detail/throw_error.hpp>
29
#include <boost/asio/detail/win_iocp_io_service.hpp>
30

    
31
#include <boost/asio/detail/push_options.hpp>
32

    
33
namespace boost {
34
namespace asio {
35
namespace detail {
36

    
37
struct win_iocp_io_service::work_finished_on_block_exit
38
{
39
  ~work_finished_on_block_exit()
40
  {
41
    io_service_->work_finished();
42
  }
43

    
44
  win_iocp_io_service* io_service_;
45
};
46

    
47
struct win_iocp_io_service::timer_thread_function
48
{
49
  void operator()()
50
  {
51
    while (::InterlockedExchangeAdd(&io_service_->shutdown_, 0) == 0)
52
    {
53
      if (::WaitForSingleObject(io_service_->waitable_timer_.handle,
54
            INFINITE) == WAIT_OBJECT_0)
55
      {
56
        ::InterlockedExchange(&io_service_->dispatch_required_, 1);
57
        ::PostQueuedCompletionStatus(io_service_->iocp_.handle,
58
            0, wake_for_dispatch, 0);
59
      }
60
    }
61
  }
62

    
63
  win_iocp_io_service* io_service_;
64
};
65

    
66
win_iocp_io_service::win_iocp_io_service(
67
    boost::asio::io_service& io_service, size_t concurrency_hint)
68
  : boost::asio::detail::service_base<win_iocp_io_service>(io_service),
69
    iocp_(),
70
    outstanding_work_(0),
71
    stopped_(0),
72
    stop_event_posted_(0),
73
    shutdown_(0),
74
    gqcs_timeout_(get_gqcs_timeout()),
75
    dispatch_required_(0)
76
{
77
  BOOST_ASIO_HANDLER_TRACKING_INIT;
78

    
79
  iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
80
      static_cast<DWORD>(concurrency_hint < DWORD(~0)
81
        ? concurrency_hint : DWORD(~0)));
82
  if (!iocp_.handle)
83
  {
84
    DWORD last_error = ::GetLastError();
85
    boost::system::error_code ec(last_error,
86
        boost::asio::error::get_system_category());
87
    boost::asio::detail::throw_error(ec, "iocp");
88
  }
89
}
90

    
91
void win_iocp_io_service::shutdown_service()
92
{
93
  ::InterlockedExchange(&shutdown_, 1);
94

    
95
  if (timer_thread_.get())
96
  {
97
    LARGE_INTEGER timeout;
98
    timeout.QuadPart = 1;
99
    ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE);
100
  }
101

    
102
  while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
103
  {
104
    op_queue<win_iocp_operation> ops;
105
    timer_queues_.get_all_timers(ops);
106
    ops.push(completed_ops_);
107
    if (!ops.empty())
108
    {
109
      while (win_iocp_operation* op = ops.front())
110
      {
111
        ops.pop();
112
        ::InterlockedDecrement(&outstanding_work_);
113
        op->destroy();
114
      }
115
    }
116
    else
117
    {
118
      DWORD bytes_transferred = 0;
119
      dword_ptr_t completion_key = 0;
120
      LPOVERLAPPED overlapped = 0;
121
      ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
122
          &completion_key, &overlapped, gqcs_timeout_);
123
      if (overlapped)
124
      {
125
        ::InterlockedDecrement(&outstanding_work_);
126
        static_cast<win_iocp_operation*>(overlapped)->destroy();
127
      }
128
    }
129
  }
130

    
131
  if (timer_thread_.get())
132
    timer_thread_->join();
133
}
134

    
135
boost::system::error_code win_iocp_io_service::register_handle(
136
    HANDLE handle, boost::system::error_code& ec)
137
{
138
  if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
139
  {
140
    DWORD last_error = ::GetLastError();
141
    ec = boost::system::error_code(last_error,
142
        boost::asio::error::get_system_category());
143
  }
144
  else
145
  {
146
    ec = boost::system::error_code();
147
  }
148
  return ec;
149
}
150

    
151
size_t win_iocp_io_service::run(boost::system::error_code& ec)
152
{
153
  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
154
  {
155
    stop();
156
    ec = boost::system::error_code();
157
    return 0;
158
  }
159

    
160
  win_iocp_thread_info this_thread;
161
  thread_call_stack::context ctx(this, this_thread);
162

    
163
  size_t n = 0;
164
  while (do_one(true, ec))
165
    if (n != (std::numeric_limits<size_t>::max)())
166
      ++n;
167
  return n;
168
}
169

    
170
size_t win_iocp_io_service::run_one(boost::system::error_code& ec)
171
{
172
  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
173
  {
174
    stop();
175
    ec = boost::system::error_code();
176
    return 0;
177
  }
178

    
179
  win_iocp_thread_info this_thread;
180
  thread_call_stack::context ctx(this, this_thread);
181

    
182
  return do_one(true, ec);
183
}
184

    
185
size_t win_iocp_io_service::poll(boost::system::error_code& ec)
186
{
187
  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
188
  {
189
    stop();
190
    ec = boost::system::error_code();
191
    return 0;
192
  }
193

    
194
  win_iocp_thread_info this_thread;
195
  thread_call_stack::context ctx(this, this_thread);
196

    
197
  size_t n = 0;
198
  while (do_one(false, ec))
199
    if (n != (std::numeric_limits<size_t>::max)())
200
      ++n;
201
  return n;
202
}
203

    
204
size_t win_iocp_io_service::poll_one(boost::system::error_code& ec)
205
{
206
  if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
207
  {
208
    stop();
209
    ec = boost::system::error_code();
210
    return 0;
211
  }
212

    
213
  win_iocp_thread_info this_thread;
214
  thread_call_stack::context ctx(this, this_thread);
215

    
216
  return do_one(false, ec);
217
}
218

    
219
void win_iocp_io_service::stop()
220
{
221
  if (::InterlockedExchange(&stopped_, 1) == 0)
222
  {
223
    if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
224
    {
225
      if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
226
      {
227
        DWORD last_error = ::GetLastError();
228
        boost::system::error_code ec(last_error,
229
            boost::asio::error::get_system_category());
230
        boost::asio::detail::throw_error(ec, "pqcs");
231
      }
232
    }
233
  }
234
}
235

    
236
void win_iocp_io_service::post_deferred_completion(win_iocp_operation* op)
237
{
238
  // Flag the operation as ready.
239
  op->ready_ = 1;
240

    
241
  // Enqueue the operation on the I/O completion port.
242
  if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
243
  {
244
    // Out of resources. Put on completed queue instead.
245
    mutex::scoped_lock lock(dispatch_mutex_);
246
    completed_ops_.push(op);
247
    ::InterlockedExchange(&dispatch_required_, 1);
248
  }
249
}
250

    
251
void win_iocp_io_service::post_deferred_completions(
252
    op_queue<win_iocp_operation>& ops)
253
{
254
  while (win_iocp_operation* op = ops.front())
255
  {
256
    ops.pop();
257

    
258
    // Flag the operation as ready.
259
    op->ready_ = 1;
260

    
261
    // Enqueue the operation on the I/O completion port.
262
    if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
263
    {
264
      // Out of resources. Put on completed queue instead.
265
      mutex::scoped_lock lock(dispatch_mutex_);
266
      completed_ops_.push(op);
267
      completed_ops_.push(ops);
268
      ::InterlockedExchange(&dispatch_required_, 1);
269
    }
270
  }
271
}
272

    
273
void win_iocp_io_service::abandon_operations(
274
    op_queue<win_iocp_operation>& ops)
275
{
276
  while (win_iocp_operation* op = ops.front())
277
  {
278
    ops.pop();
279
    ::InterlockedDecrement(&outstanding_work_);
280
    op->destroy();
281
  }
282
}
283

    
284
void win_iocp_io_service::on_pending(win_iocp_operation* op)
285
{
286
  if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
287
  {
288
    // Enqueue the operation on the I/O completion port.
289
    if (!::PostQueuedCompletionStatus(iocp_.handle,
290
          0, overlapped_contains_result, op))
291
    {
292
      // Out of resources. Put on completed queue instead.
293
      mutex::scoped_lock lock(dispatch_mutex_);
294
      completed_ops_.push(op);
295
      ::InterlockedExchange(&dispatch_required_, 1);
296
    }
297
  }
298
}
299

    
300
void win_iocp_io_service::on_completion(win_iocp_operation* op,
301
    DWORD last_error, DWORD bytes_transferred)
302
{
303
  // Flag that the operation is ready for invocation.
304
  op->ready_ = 1;
305

    
306
  // Store results in the OVERLAPPED structure.
307
  op->Internal = reinterpret_cast<ulong_ptr_t>(
308
      &boost::asio::error::get_system_category());
309
  op->Offset = last_error;
310
  op->OffsetHigh = bytes_transferred;
311

    
312
  // Enqueue the operation on the I/O completion port.
313
  if (!::PostQueuedCompletionStatus(iocp_.handle,
314
        0, overlapped_contains_result, op))
315
  {
316
    // Out of resources. Put on completed queue instead.
317
    mutex::scoped_lock lock(dispatch_mutex_);
318
    completed_ops_.push(op);
319
    ::InterlockedExchange(&dispatch_required_, 1);
320
  }
321
}
322

    
323
void win_iocp_io_service::on_completion(win_iocp_operation* op,
324
    const boost::system::error_code& ec, DWORD bytes_transferred)
325
{
326
  // Flag that the operation is ready for invocation.
327
  op->ready_ = 1;
328

    
329
  // Store results in the OVERLAPPED structure.
330
  op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
331
  op->Offset = ec.value();
332
  op->OffsetHigh = bytes_transferred;
333

    
334
  // Enqueue the operation on the I/O completion port.
335
  if (!::PostQueuedCompletionStatus(iocp_.handle,
336
        0, overlapped_contains_result, op))
337
  {
338
    // Out of resources. Put on completed queue instead.
339
    mutex::scoped_lock lock(dispatch_mutex_);
340
    completed_ops_.push(op);
341
    ::InterlockedExchange(&dispatch_required_, 1);
342
  }
343
}
344

    
345
size_t win_iocp_io_service::do_one(bool block, boost::system::error_code& ec)
346
{
347
  for (;;)
348
  {
349
    // Try to acquire responsibility for dispatching timers and completed ops.
350
    if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
351
    {
352
      mutex::scoped_lock lock(dispatch_mutex_);
353

    
354
      // Dispatch pending timers and operations.
355
      op_queue<win_iocp_operation> ops;
356
      ops.push(completed_ops_);
357
      timer_queues_.get_ready_timers(ops);
358
      post_deferred_completions(ops);
359
      update_timeout();
360
    }
361

    
362
    // Get the next operation from the queue.
363
    DWORD bytes_transferred = 0;
364
    dword_ptr_t completion_key = 0;
365
    LPOVERLAPPED overlapped = 0;
366
    ::SetLastError(0);
367
    BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
368
        &completion_key, &overlapped, block ? gqcs_timeout_ : 0);
369
    DWORD last_error = ::GetLastError();
370

    
371
    if (overlapped)
372
    {
373
      win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
374
      boost::system::error_code result_ec(last_error,
375
          boost::asio::error::get_system_category());
376

    
377
      // We may have been passed the last_error and bytes_transferred in the
378
      // OVERLAPPED structure itself.
379
      if (completion_key == overlapped_contains_result)
380
      {
381
        result_ec = boost::system::error_code(static_cast<int>(op->Offset),
382
            *reinterpret_cast<boost::system::error_category*>(op->Internal));
383
        bytes_transferred = op->OffsetHigh;
384
      }
385

    
386
      // Otherwise ensure any result has been saved into the OVERLAPPED
387
      // structure.
388
      else
389
      {
390
        op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
391
        op->Offset = result_ec.value();
392
        op->OffsetHigh = bytes_transferred;
393
      }
394

    
395
      // Dispatch the operation only if ready. The operation may not be ready
396
      // if the initiating function (e.g. a call to WSARecv) has not yet
397
      // returned. This is because the initiating function still wants access
398
      // to the operation's OVERLAPPED structure.
399
      if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
400
      {
401
        // Ensure the count of outstanding work is decremented on block exit.
402
        work_finished_on_block_exit on_exit = { this };
403
        (void)on_exit;
404

    
405
        op->complete(*this, result_ec, bytes_transferred);
406
        ec = boost::system::error_code();
407
        return 1;
408
      }
409
    }
410
    else if (!ok)
411
    {
412
      if (last_error != WAIT_TIMEOUT)
413
      {
414
        ec = boost::system::error_code(last_error,
415
            boost::asio::error::get_system_category());
416
        return 0;
417
      }
418

    
419
      // If we're not polling we need to keep going until we get a real handler.
420
      if (block)
421
        continue;
422

    
423
      ec = boost::system::error_code();
424
      return 0;
425
    }
426
    else if (completion_key == wake_for_dispatch)
427
    {
428
      // We have been woken up to try to acquire responsibility for dispatching
429
      // timers and completed operations.
430
    }
431
    else
432
    {
433
      // Indicate that there is no longer an in-flight stop event.
434
      ::InterlockedExchange(&stop_event_posted_, 0);
435

    
436
      // The stopped_ flag is always checked to ensure that any leftover
437
      // stop events from a previous run invocation are ignored.
438
      if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
439
      {
440
        // Wake up next thread that is blocked on GetQueuedCompletionStatus.
441
        if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
442
        {
443
          if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
444
          {
445
            last_error = ::GetLastError();
446
            ec = boost::system::error_code(last_error,
447
                boost::asio::error::get_system_category());
448
            return 0;
449
          }
450
        }
451

    
452
        ec = boost::system::error_code();
453
        return 0;
454
      }
455
    }
456
  }
457
}
458

    
459
DWORD win_iocp_io_service::get_gqcs_timeout()
460
{
461
  OSVERSIONINFOEX osvi;
462
  ZeroMemory(&osvi, sizeof(osvi));
463
  osvi.dwOSVersionInfoSize = sizeof(osvi);
464
  osvi.dwMajorVersion = 6ul;
465

    
466
  const uint64_t condition_mask = ::VerSetConditionMask(
467
      0, VER_MAJORVERSION, VER_GREATER_EQUAL);
468

    
469
  if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask))
470
    return INFINITE;
471

    
472
  return default_gqcs_timeout;
473
}
474

    
475
void win_iocp_io_service::do_add_timer_queue(timer_queue_base& queue)
476
{
477
  mutex::scoped_lock lock(dispatch_mutex_);
478

    
479
  timer_queues_.insert(&queue);
480

    
481
  if (!waitable_timer_.handle)
482
  {
483
    waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0);
484
    if (waitable_timer_.handle == 0)
485
    {
486
      DWORD last_error = ::GetLastError();
487
      boost::system::error_code ec(last_error,
488
          boost::asio::error::get_system_category());
489
      boost::asio::detail::throw_error(ec, "timer");
490
    }
491

    
492
    LARGE_INTEGER timeout;
493
    timeout.QuadPart = -max_timeout_usec;
494
    timeout.QuadPart *= 10;
495
    ::SetWaitableTimer(waitable_timer_.handle,
496
        &timeout, max_timeout_msec, 0, 0, FALSE);
497
  }
498

    
499
  if (!timer_thread_.get())
500
  {
501
    timer_thread_function thread_function = { this };
502
    timer_thread_.reset(new thread(thread_function, 65536));
503
  }
504
}
505

    
506
void win_iocp_io_service::do_remove_timer_queue(timer_queue_base& queue)
507
{
508
  mutex::scoped_lock lock(dispatch_mutex_);
509

    
510
  timer_queues_.erase(&queue);
511
}
512

    
513
void win_iocp_io_service::update_timeout()
514
{
515
  if (timer_thread_.get())
516
  {
517
    // There's no point updating the waitable timer if the new timeout period
518
    // exceeds the maximum timeout. In that case, we might as well wait for the
519
    // existing period of the timer to expire.
520
    long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec);
521
    if (timeout_usec < max_timeout_usec)
522
    {
523
      LARGE_INTEGER timeout;
524
      timeout.QuadPart = -timeout_usec;
525
      timeout.QuadPart *= 10;
526
      ::SetWaitableTimer(waitable_timer_.handle,
527
          &timeout, max_timeout_msec, 0, 0, FALSE);
528
    }
529
  }
530
}
531

    
532
} // namespace detail
533
} // namespace asio
534
} // namespace boost
535

    
536
#include <boost/asio/detail/pop_options.hpp>
537

    
538
#endif // defined(BOOST_ASIO_HAS_IOCP)
539

    
540
#endif // BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_SERVICE_IPP