//
// detail/impl/task_io_service.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2015 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP
#define BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#if !defined(BOOST_ASIO_HAS_IOCP)

#include <boost/asio/detail/event.hpp>
#include <boost/asio/detail/limits.hpp>
#include <boost/asio/detail/reactor.hpp>
#include <boost/asio/detail/task_io_service.hpp>
#include <boost/asio/detail/task_io_service_thread_info.hpp>

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {

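// Cleanup helper installed around a run of the reactor task. On destruction
// it folds any privately accumulated work count back into the service-wide
// count, then reacquires the lock, marks the task as interrupted, and pushes
// the privately collected operations and the task marker back onto the main
// operation queue.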
struct task_io_service::task_cleanup
{
  ~task_cleanup()
  {
    if (this_thread_->private_outstanding_work > 0)
    {
      boost::asio::detail::increment(
          task_io_service_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue.
    lock_->lock();
    task_io_service_->task_interrupted_ = true;
    task_io_service_->op_queue_.push(this_thread_->private_op_queue);
    task_io_service_->op_queue_.push(&task_io_service_->task_operation_);
  }

  task_io_service* task_io_service_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};

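// Cleanup helper installed around the execution of a single handler. On
// destruction it reconciles the thread-private outstanding work count with
// the service-wide count, net of the handler that has just run, and calls
// work_finished() when no replacement work was produced. On threaded builds
// it also flushes privately queued operations back to the main queue.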
struct task_io_service::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      boost::asio::detail::increment(
          task_io_service_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      task_io_service_->work_finished();
    }
    this_thread_->private_outstanding_work = 0;

#if defined(BOOST_ASIO_HAS_THREADS)
    if (!this_thread_->private_op_queue.empty())
    {
      lock_->lock();
      task_io_service_->op_queue_.push(this_thread_->private_op_queue);
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)
  }

  task_io_service* task_io_service_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};

task_io_service::task_io_service(
    boost::asio::io_service& io_service, std::size_t concurrency_hint)
  : boost::asio::detail::service_base<task_io_service>(io_service),
    one_thread_(concurrency_hint == 1),
    mutex_(),
    task_(0),
    task_interrupted_(true),
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false)
{
  BOOST_ASIO_HANDLER_TRACKING_INIT;
}

void task_io_service::shutdown_service()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  // Destroy handler objects.
  while (!op_queue_.empty())
  {
    operation* o = op_queue_.front();
    op_queue_.pop();
    if (o != &task_operation_)
      o->destroy();
  }

  // Reset to initial state.
  task_ = 0;
}

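// Lazily obtain the reactor and insert the task marker into the operation
// queue, waking one thread so that it starts running the reactor task.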
void task_io_service::init_task()
{
  mutex::scoped_lock lock(mutex_);
  if (!shutdown_ && !task_)
  {
    task_ = &use_service<reactor>(this->get_io_service());
    op_queue_.push(&task_operation_);
    wake_one_thread_and_unlock(lock);
  }
}

std::size_t task_io_service::run(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}

std::size_t task_io_service::run_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  return do_run_one(lock, this_thread, ec);
}

std::size_t task_io_service::poll(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_thread_info = ctx.next_by_key())
      op_queue_.push(outer_thread_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  std::size_t n = 0;
  for (; do_poll_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}

std::size_t task_io_service::poll_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_thread_info = ctx.next_by_key())
      op_queue_.push(outer_thread_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  return do_poll_one(lock, this_thread, ec);
}

void task_io_service::stop()
{
  mutex::scoped_lock lock(mutex_);
  stop_all_threads(lock);
}

bool task_io_service::stopped() const
{
  mutex::scoped_lock lock(mutex_);
  return stopped_;
}

void task_io_service::reset()
{
  mutex::scoped_lock lock(mutex_);
  stopped_ = false;
}

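// Post an operation that represents new work. If the service is effectively
// single-threaded or the operation is a continuation of the current handler,
// and the calling thread is already running this io_service, the operation
// is kept on the thread-private queue without taking the mutex; otherwise it
// is counted via work_started() and pushed onto the shared queue.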
void task_io_service::post_immediate_completion(
    task_io_service::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info* this_thread = thread_call_stack::contains(this))
    {
      ++this_thread->private_outstanding_work;
      this_thread->private_op_queue.push(op);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

void task_io_service::post_deferred_completion(task_io_service::operation* op)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_)
  {
    if (thread_info* this_thread = thread_call_stack::contains(this))
    {
      this_thread->private_op_queue.push(op);
      return;
    }
  }
#endif // defined(BOOST_ASIO_HAS_THREADS)

  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

void task_io_service::post_deferred_completions(
    op_queue<task_io_service::operation>& ops)
{
  if (!ops.empty())
  {
#if defined(BOOST_ASIO_HAS_THREADS)
    if (one_thread_)
    {
      if (thread_info* this_thread = thread_call_stack::contains(this))
      {
        this_thread->private_op_queue.push(ops);
        return;
      }
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)

    mutex::scoped_lock lock(mutex_);
    op_queue_.push(ops);
    wake_one_thread_and_unlock(lock);
  }
}

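// Post an operation that counts as new work, always going through the shared
// queue and waking one thread to run it.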
void task_io_service::do_dispatch(
    task_io_service::operation* op)
{
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}

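// Take ownership of operations that will never be executed. Moving them into
// a local queue lets that queue's destructor destroy them without invoking
// their handlers.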
void task_io_service::abandon_operations(
    op_queue<task_io_service::operation>& ops)
{
  op_queue<task_io_service::operation> ops2;
  ops2.push(ops);
}

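// Dequeue and run a single operation, blocking on the wakeup event while the
// queue is empty and the service has not been stopped. Dequeuing the task
// marker runs the reactor; any other operation is completed and counted as
// one executed handler.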
std::size_t task_io_service::do_run_one(mutex::scoped_lock& lock,
    task_io_service::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(!more_handlers, this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(*this, ec, task_result);

        return 1;
      }
    }
    else
    {
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}

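// Dequeue and run at most one ready operation without blocking. If the task
// marker is at the front of the queue, the reactor is polled first to
// harvest any ready completions.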
std::size_t task_io_service::do_poll_one(mutex::scoped_lock& lock,
    task_io_service::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    op_queue_.pop();
    lock.unlock();

    {
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(false, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(*this, ec, task_result);

  return 1;
}

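// Mark the service as stopped, wake every waiting thread, and interrupt the
// reactor task if it may currently be blocked.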
void task_io_service::stop_all_threads(
    mutex::scoped_lock& lock)
{
  stopped_ = true;
  wakeup_event_.signal_all(lock);

  if (!task_interrupted_ && task_)
  {
    task_interrupted_ = true;
    task_->interrupt();
  }
}

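// Signal one idle thread if any is waiting on the wakeup event; otherwise
// interrupt the reactor task so that its thread notices the newly queued
// work. The lock is released in either case.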
void task_io_service::wake_one_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
  {
    if (!task_interrupted_ && task_)
    {
      task_interrupted_ = true;
      task_->interrupt();
    }
    lock.unlock();
  }
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // !defined(BOOST_ASIO_HAS_IOCP)

#endif // BOOST_ASIO_DETAIL_IMPL_TASK_IO_SERVICE_IPP