Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(1202)

Side by Side Diff: webrtc/rtc_base/task_queue_libevent.cc

Issue 3003643002: Allow external TaskQueue implementations on Linux (Closed)
Patch Set: .. Created 3 years, 3 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « webrtc/rtc_base/task_queue.h ('k') | webrtc/webrtc.gni » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 /* 1 /*
2 * Copyright 2016 The WebRTC Project Authors. All rights reserved. 2 * Copyright 2016 The WebRTC Project Authors. All rights reserved.
3 * 3 *
4 * Use of this source code is governed by a BSD-style license 4 * Use of this source code is governed by a BSD-style license
5 * that can be found in the LICENSE file in the root of the source 5 * that can be found in the LICENSE file in the root of the source
6 * tree. An additional intellectual property rights grant can be found 6 * tree. An additional intellectual property rights grant can be found
7 * in the file PATENTS. All contributing project authors may 7 * in the file PATENTS. All contributing project authors may
8 * be found in the AUTHORS file in the root of the source tree. 8 * be found in the AUTHORS file in the root of the source tree.
9 */ 9 */
10 10
11 #include "webrtc/rtc_base/task_queue.h" 11 #include "webrtc/rtc_base/task_queue.h"
12 12
13 #include <fcntl.h> 13 #include <fcntl.h>
14 #include <signal.h> 14 #include <signal.h>
15 #include <string.h> 15 #include <string.h>
16 #include <unistd.h> 16 #include <unistd.h>
17 17
18 #include "base/third_party/libevent/event.h" 18 #include "base/third_party/libevent/event.h"
19 #include "webrtc/rtc_base/checks.h" 19 #include "webrtc/rtc_base/checks.h"
20 #include "webrtc/rtc_base/logging.h" 20 #include "webrtc/rtc_base/logging.h"
21 #include "webrtc/rtc_base/platform_thread.h"
22 #include "webrtc/rtc_base/refcount.h"
23 #include "webrtc/rtc_base/refcountedobject.h"
21 #include "webrtc/rtc_base/safe_conversions.h" 24 #include "webrtc/rtc_base/safe_conversions.h"
25 #include "webrtc/rtc_base/task_queue.h"
22 #include "webrtc/rtc_base/task_queue_posix.h" 26 #include "webrtc/rtc_base/task_queue_posix.h"
23 #include "webrtc/rtc_base/timeutils.h" 27 #include "webrtc/rtc_base/timeutils.h"
24 28
25 namespace rtc { 29 namespace rtc {
26 using internal::GetQueuePtrTls; 30 using internal::GetQueuePtrTls;
27 using internal::AutoSetCurrentQueuePtr; 31 using internal::AutoSetCurrentQueuePtr;
28 32
29 namespace { 33 namespace {
30 static const char kQuit = 1; 34 static const char kQuit = 1;
31 static const char kRunTask = 2; 35 static const char kRunTask = 2;
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
97 case Priority::NORMAL: 101 case Priority::NORMAL:
98 return kNormalPriority; 102 return kNormalPriority;
99 default: 103 default:
100 RTC_NOTREACHED(); 104 RTC_NOTREACHED();
101 break; 105 break;
102 } 106 }
103 return kNormalPriority; 107 return kNormalPriority;
104 } 108 }
105 } // namespace 109 } // namespace
106 110
107 struct TaskQueue::QueueContext { 111 class TaskQueue::Impl : public RefCountInterface {
108 explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} 112 public:
109 TaskQueue* queue; 113 explicit Impl(const char* queue_name,
114 TaskQueue* queue,
115 Priority priority = Priority::NORMAL);
116 ~Impl() override;
117
118 static TaskQueue::Impl* Current();
119 static TaskQueue* CurrentQueue();
120
121 // Used for DCHECKing the current queue.
122 static bool IsCurrent(const char* queue_name);
123 bool IsCurrent() const;
124
125 void PostTask(std::unique_ptr<QueuedTask> task);
126 void PostTaskAndReply(std::unique_ptr<QueuedTask> task,
127 std::unique_ptr<QueuedTask> reply,
128 TaskQueue::Impl* reply_queue);
129
130 void PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds);
131
132 private:
133 static void ThreadMain(void* context);
134 static void OnWakeup(int socket, short flags, void* context); // NOLINT
135 static void RunTask(int fd, short flags, void* context); // NOLINT
136 static void RunTimer(int fd, short flags, void* context); // NOLINT
137
138 class ReplyTaskOwner;
139 class PostAndReplyTask;
140 class SetTimerTask;
141
142 typedef RefCountedObject<ReplyTaskOwner> ReplyTaskOwnerRef;
143
144 void PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task);
145
146 struct QueueContext;
147 TaskQueue* const queue_;
148 int wakeup_pipe_in_ = -1;
149 int wakeup_pipe_out_ = -1;
150 event_base* event_base_;
151 std::unique_ptr<event> wakeup_event_;
152 PlatformThread thread_;
153 rtc::CriticalSection pending_lock_;
154 std::list<std::unique_ptr<QueuedTask>> pending_ GUARDED_BY(pending_lock_);
155 std::list<scoped_refptr<ReplyTaskOwnerRef>> pending_replies_
156 GUARDED_BY(pending_lock_);
157 };
158
159 struct TaskQueue::Impl::QueueContext {
160 explicit QueueContext(TaskQueue::Impl* q) : queue(q), is_active(true) {}
161 TaskQueue::Impl* queue;
110 bool is_active; 162 bool is_active;
111 // Holds a list of events pending timers for cleanup when the loop exits. 163 // Holds a list of events pending timers for cleanup when the loop exits.
112 std::list<TimerEvent*> pending_timers_; 164 std::list<TimerEvent*> pending_timers_;
113 }; 165 };
114 166
115 // Posting a reply task is tricky business. This class owns the reply task 167 // Posting a reply task is tricky business. This class owns the reply task
116 // and a reference to it is held by both the reply queue and the first task. 168 // and a reference to it is held by both the reply queue and the first task.
117 // Here's an outline of what happens when dealing with a reply task. 169 // Here's an outline of what happens when dealing with a reply task.
118 // * The ReplyTaskOwner owns the |reply_| task. 170 // * The ReplyTaskOwner owns the |reply_| task.
119 // * One ref owned by PostAndReplyTask 171 // * One ref owned by PostAndReplyTask
120 // * One ref owned by the reply TaskQueue 172 // * One ref owned by the reply TaskQueue
121 // * ReplyTaskOwner has a flag |run_task_| initially set to false. 173 // * ReplyTaskOwner has a flag |run_task_| initially set to false.
122 // * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject). 174 // * ReplyTaskOwner has a method: HasOneRef() (provided by RefCountedObject).
123 // * After successfully running the original |task_|, PostAndReplyTask() calls 175 // * After successfully running the original |task_|, PostAndReplyTask() calls
124 // set_should_run_task(). This sets |run_task_| to true. 176 // set_should_run_task(). This sets |run_task_| to true.
125 // * In PostAndReplyTask's dtor: 177 // * In PostAndReplyTask's dtor:
126 // * It releases its reference to ReplyTaskOwner (important to do this first). 178 // * It releases its reference to ReplyTaskOwner (important to do this first).
127 // * Sends (write()) a kRunReplyTask message to the reply queue's pipe. 179 // * Sends (write()) a kRunReplyTask message to the reply queue's pipe.
128 // * PostAndReplyTask doesn't care if write() fails, but when it does: 180 // * PostAndReplyTask doesn't care if write() fails, but when it does:
129 // * The reply queue is gone. 181 // * The reply queue is gone.
130 // * ReplyTaskOwner has already been deleted and the reply task too. 182 // * ReplyTaskOwner has already been deleted and the reply task too.
131 // * If write() succeeds: 183 // * If write() succeeds:
132 // * ReplyQueue receives the kRunReplyTask message 184 // * ReplyQueue receives the kRunReplyTask message
133 // * Goes through all pending tasks, finding the first that HasOneRef() 185 // * Goes through all pending tasks, finding the first that HasOneRef()
134 // * Calls ReplyTaskOwner::Run() 186 // * Calls ReplyTaskOwner::Run()
135 // * if set_should_run_task() was called, the reply task will be run 187 // * if set_should_run_task() was called, the reply task will be run
136 // * Release the reference to ReplyTaskOwner 188 // * Release the reference to ReplyTaskOwner
137 // * ReplyTaskOwner and associated |reply_| are deleted. 189 // * ReplyTaskOwner and associated |reply_| are deleted.
138 class TaskQueue::ReplyTaskOwner { 190 class TaskQueue::Impl::ReplyTaskOwner {
139 public: 191 public:
140 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply) 192 ReplyTaskOwner(std::unique_ptr<QueuedTask> reply)
141 : reply_(std::move(reply)) {} 193 : reply_(std::move(reply)) {}
142 194
143 void Run() { 195 void Run() {
144 RTC_DCHECK(reply_); 196 RTC_DCHECK(reply_);
145 if (run_task_) { 197 if (run_task_) {
146 if (!reply_->Run()) 198 if (!reply_->Run())
147 reply_.release(); 199 reply_.release();
148 } 200 }
149 reply_.reset(); 201 reply_.reset();
150 } 202 }
151 203
152 void set_should_run_task() { 204 void set_should_run_task() {
153 RTC_DCHECK(!run_task_); 205 RTC_DCHECK(!run_task_);
154 run_task_ = true; 206 run_task_ = true;
155 } 207 }
156 208
157 private: 209 private:
158 std::unique_ptr<QueuedTask> reply_; 210 std::unique_ptr<QueuedTask> reply_;
159 bool run_task_ = false; 211 bool run_task_ = false;
160 }; 212 };
161 213
162 class TaskQueue::PostAndReplyTask : public QueuedTask { 214 class TaskQueue::Impl::PostAndReplyTask : public QueuedTask {
163 public: 215 public:
164 PostAndReplyTask(std::unique_ptr<QueuedTask> task, 216 PostAndReplyTask(std::unique_ptr<QueuedTask> task,
165 std::unique_ptr<QueuedTask> reply, 217 std::unique_ptr<QueuedTask> reply,
166 TaskQueue* reply_queue, 218 TaskQueue::Impl* reply_queue,
167 int reply_pipe) 219 int reply_pipe)
168 : task_(std::move(task)), 220 : task_(std::move(task)),
169 reply_pipe_(reply_pipe), 221 reply_pipe_(reply_pipe),
170 reply_task_owner_( 222 reply_task_owner_(
171 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) { 223 new RefCountedObject<ReplyTaskOwner>(std::move(reply))) {
172 reply_queue->PrepareReplyTask(reply_task_owner_); 224 reply_queue->PrepareReplyTask(reply_task_owner_);
173 } 225 }
174 226
175 ~PostAndReplyTask() override { 227 ~PostAndReplyTask() override {
176 reply_task_owner_ = nullptr; 228 reply_task_owner_ = nullptr;
(...skipping 12 matching lines...) Expand all
189 task_.release(); 241 task_.release();
190 reply_task_owner_->set_should_run_task(); 242 reply_task_owner_->set_should_run_task();
191 return true; 243 return true;
192 } 244 }
193 245
194 std::unique_ptr<QueuedTask> task_; 246 std::unique_ptr<QueuedTask> task_;
195 int reply_pipe_; 247 int reply_pipe_;
196 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_; 248 scoped_refptr<RefCountedObject<ReplyTaskOwner>> reply_task_owner_;
197 }; 249 };
198 250
199 class TaskQueue::SetTimerTask : public QueuedTask { 251 class TaskQueue::Impl::SetTimerTask : public QueuedTask {
200 public: 252 public:
201 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds) 253 SetTimerTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)
202 : task_(std::move(task)), 254 : task_(std::move(task)),
203 milliseconds_(milliseconds), 255 milliseconds_(milliseconds),
204 posted_(Time32()) {} 256 posted_(Time32()) {}
205 257
206 private: 258 private:
207 bool Run() override { 259 bool Run() override {
208 // Compensate for the time that has passed since construction 260 // Compensate for the time that has passed since construction
209 // and until we got here. 261 // and until we got here.
210 uint32_t post_time = Time32() - posted_; 262 uint32_t post_time = Time32() - posted_;
211 TaskQueue::Current()->PostDelayedTask( 263 TaskQueue::Impl::Current()->PostDelayedTask(
212 std::move(task_), 264 std::move(task_),
213 post_time > milliseconds_ ? 0 : milliseconds_ - post_time); 265 post_time > milliseconds_ ? 0 : milliseconds_ - post_time);
214 return true; 266 return true;
215 } 267 }
216 268
217 std::unique_ptr<QueuedTask> task_; 269 std::unique_ptr<QueuedTask> task_;
218 const uint32_t milliseconds_; 270 const uint32_t milliseconds_;
219 const uint32_t posted_; 271 const uint32_t posted_;
220 }; 272 };
221 273
222 TaskQueue::TaskQueue(const char* queue_name, Priority priority /*= NORMAL*/) 274 TaskQueue::Impl::Impl(const char* queue_name,
223 : event_base_(event_base_new()), 275 TaskQueue* queue,
276 Priority priority /*= NORMAL*/)
277 : queue_(queue),
278 event_base_(event_base_new()),
224 wakeup_event_(new event()), 279 wakeup_event_(new event()),
225 thread_(&TaskQueue::ThreadMain, 280 thread_(&TaskQueue::Impl::ThreadMain,
226 this, 281 this,
227 queue_name, 282 queue_name,
228 TaskQueuePriorityToThreadPriority(priority)) { 283 TaskQueuePriorityToThreadPriority(priority)) {
229 RTC_DCHECK(queue_name); 284 RTC_DCHECK(queue_name);
230 int fds[2]; 285 int fds[2];
231 RTC_CHECK(pipe(fds) == 0); 286 RTC_CHECK(pipe(fds) == 0);
232 SetNonBlocking(fds[0]); 287 SetNonBlocking(fds[0]);
233 SetNonBlocking(fds[1]); 288 SetNonBlocking(fds[1]);
234 wakeup_pipe_out_ = fds[0]; 289 wakeup_pipe_out_ = fds[0];
235 wakeup_pipe_in_ = fds[1]; 290 wakeup_pipe_in_ = fds[1];
236 291
237 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_, 292 EventAssign(wakeup_event_.get(), event_base_, wakeup_pipe_out_,
238 EV_READ | EV_PERSIST, OnWakeup, this); 293 EV_READ | EV_PERSIST, OnWakeup, this);
239 event_add(wakeup_event_.get(), 0); 294 event_add(wakeup_event_.get(), 0);
240 thread_.Start(); 295 thread_.Start();
241 } 296 }
242 297
243 TaskQueue::~TaskQueue() { 298 TaskQueue::Impl::~Impl() {
244 RTC_DCHECK(!IsCurrent()); 299 RTC_DCHECK(!IsCurrent());
245 struct timespec ts; 300 struct timespec ts;
246 char message = kQuit; 301 char message = kQuit;
247 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { 302 while (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
248 // The queue is full, so we have no choice but to wait and retry. 303 // The queue is full, so we have no choice but to wait and retry.
249 RTC_CHECK_EQ(EAGAIN, errno); 304 RTC_CHECK_EQ(EAGAIN, errno);
250 ts.tv_sec = 0; 305 ts.tv_sec = 0;
251 ts.tv_nsec = 1000000; 306 ts.tv_nsec = 1000000;
252 nanosleep(&ts, nullptr); 307 nanosleep(&ts, nullptr);
253 } 308 }
254 309
255 thread_.Stop(); 310 thread_.Stop();
256 311
257 event_del(wakeup_event_.get()); 312 event_del(wakeup_event_.get());
258 313
259 IgnoreSigPipeSignalOnCurrentThread(); 314 IgnoreSigPipeSignalOnCurrentThread();
260 315
261 close(wakeup_pipe_in_); 316 close(wakeup_pipe_in_);
262 close(wakeup_pipe_out_); 317 close(wakeup_pipe_out_);
263 wakeup_pipe_in_ = -1; 318 wakeup_pipe_in_ = -1;
264 wakeup_pipe_out_ = -1; 319 wakeup_pipe_out_ = -1;
265 320
266 event_base_free(event_base_); 321 event_base_free(event_base_);
267 } 322 }
268 323
269 // static 324 // static
270 TaskQueue* TaskQueue::Current() { 325 TaskQueue::Impl* TaskQueue::Impl::Current() {
271 QueueContext* ctx = 326 QueueContext* ctx =
272 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 327 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
273 return ctx ? ctx->queue : nullptr; 328 return ctx ? ctx->queue : nullptr;
274 } 329 }
275 330
276 // static 331 // static
277 bool TaskQueue::IsCurrent(const char* queue_name) { 332 TaskQueue* TaskQueue::Impl::CurrentQueue() {
278 TaskQueue* current = Current(); 333 TaskQueue::Impl* current = Current();
334 if (current) {
335 return current->queue_;
336 }
337 return nullptr;
338 }
339
340 // static
341 bool TaskQueue::Impl::IsCurrent(const char* queue_name) {
342 TaskQueue::Impl* current = Current();
279 return current && current->thread_.name().compare(queue_name) == 0; 343 return current && current->thread_.name().compare(queue_name) == 0;
280 } 344 }
281 345
282 bool TaskQueue::IsCurrent() const { 346 bool TaskQueue::Impl::IsCurrent() const {
283 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef()); 347 return IsThreadRefEqual(thread_.GetThreadRef(), CurrentThreadRef());
284 } 348 }
285 349
286 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) { 350 void TaskQueue::Impl::PostTask(std::unique_ptr<QueuedTask> task) {
287 RTC_DCHECK(task.get()); 351 RTC_DCHECK(task.get());
288 // libevent isn't thread safe. This means that we can't use methods such 352 // libevent isn't thread safe. This means that we can't use methods such
289 // as event_base_once to post tasks to the worker thread from a different 353 // as event_base_once to post tasks to the worker thread from a different
290 // thread. However, we can use it when posting from the worker thread itself. 354 // thread. However, we can use it when posting from the worker thread itself.
291 if (IsCurrent()) { 355 if (IsCurrent()) {
292 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::RunTask, 356 if (event_base_once(event_base_, -1, EV_TIMEOUT, &TaskQueue::Impl::RunTask,
293 task.get(), nullptr) == 0) { 357 task.get(), nullptr) == 0) {
294 task.release(); 358 task.release();
295 } 359 }
296 } else { 360 } else {
297 QueuedTask* task_id = task.get(); // Only used for comparison. 361 QueuedTask* task_id = task.get(); // Only used for comparison.
298 { 362 {
299 CritScope lock(&pending_lock_); 363 CritScope lock(&pending_lock_);
300 pending_.push_back(std::move(task)); 364 pending_.push_back(std::move(task));
301 } 365 }
302 char message = kRunTask; 366 char message = kRunTask;
303 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) { 367 if (write(wakeup_pipe_in_, &message, sizeof(message)) != sizeof(message)) {
304 LOG(WARNING) << "Failed to queue task."; 368 LOG(WARNING) << "Failed to queue task.";
305 CritScope lock(&pending_lock_); 369 CritScope lock(&pending_lock_);
306 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) { 370 pending_.remove_if([task_id](std::unique_ptr<QueuedTask>& t) {
307 return t.get() == task_id; 371 return t.get() == task_id;
308 }); 372 });
309 } 373 }
310 } 374 }
311 } 375 }
312 376
313 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task, 377 void TaskQueue::Impl::PostDelayedTask(std::unique_ptr<QueuedTask> task,
314 uint32_t milliseconds) { 378 uint32_t milliseconds) {
315 if (IsCurrent()) { 379 if (IsCurrent()) {
316 TimerEvent* timer = new TimerEvent(std::move(task)); 380 TimerEvent* timer = new TimerEvent(std::move(task));
317 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::RunTimer, timer); 381 EventAssign(&timer->ev, event_base_, -1, 0, &TaskQueue::Impl::RunTimer,
382 timer);
318 QueueContext* ctx = 383 QueueContext* ctx =
319 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 384 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
320 ctx->pending_timers_.push_back(timer); 385 ctx->pending_timers_.push_back(timer);
321 timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000), 386 timeval tv = {rtc::dchecked_cast<int>(milliseconds / 1000),
322 rtc::dchecked_cast<int>(milliseconds % 1000) * 1000}; 387 rtc::dchecked_cast<int>(milliseconds % 1000) * 1000};
323 event_add(&timer->ev, &tv); 388 event_add(&timer->ev, &tv);
324 } else { 389 } else {
325 PostTask(std::unique_ptr<QueuedTask>( 390 PostTask(std::unique_ptr<QueuedTask>(
326 new SetTimerTask(std::move(task), milliseconds))); 391 new SetTimerTask(std::move(task), milliseconds)));
327 } 392 }
328 } 393 }
329 394
330 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task, 395 void TaskQueue::Impl::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
331 std::unique_ptr<QueuedTask> reply, 396 std::unique_ptr<QueuedTask> reply,
332 TaskQueue* reply_queue) { 397 TaskQueue::Impl* reply_queue) {
333 std::unique_ptr<QueuedTask> wrapper_task( 398 std::unique_ptr<QueuedTask> wrapper_task(
334 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue, 399 new PostAndReplyTask(std::move(task), std::move(reply), reply_queue,
335 reply_queue->wakeup_pipe_in_)); 400 reply_queue->wakeup_pipe_in_));
336 PostTask(std::move(wrapper_task)); 401 PostTask(std::move(wrapper_task));
337 } 402 }
338 403
339 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
340 std::unique_ptr<QueuedTask> reply) {
341 return PostTaskAndReply(std::move(task), std::move(reply), Current());
342 }
343
344 // static 404 // static
345 void TaskQueue::ThreadMain(void* context) { 405 void TaskQueue::Impl::ThreadMain(void* context) {
346 TaskQueue* me = static_cast<TaskQueue*>(context); 406 TaskQueue::Impl* me = static_cast<TaskQueue::Impl*>(context);
347 407
348 QueueContext queue_context(me); 408 QueueContext queue_context(me);
349 pthread_setspecific(GetQueuePtrTls(), &queue_context); 409 pthread_setspecific(GetQueuePtrTls(), &queue_context);
350 410
351 while (queue_context.is_active) 411 while (queue_context.is_active)
352 event_base_loop(me->event_base_, 0); 412 event_base_loop(me->event_base_, 0);
353 413
354 pthread_setspecific(GetQueuePtrTls(), nullptr); 414 pthread_setspecific(GetQueuePtrTls(), nullptr);
355 415
356 for (TimerEvent* timer : queue_context.pending_timers_) 416 for (TimerEvent* timer : queue_context.pending_timers_)
357 delete timer; 417 delete timer;
358 } 418 }
359 419
360 // static 420 // static
361 void TaskQueue::OnWakeup(int socket, short flags, void* context) { // NOLINT 421 void TaskQueue::Impl::OnWakeup(int socket,
422 short flags,
423 void* context) { // NOLINT
362 QueueContext* ctx = 424 QueueContext* ctx =
363 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 425 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
364 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket); 426 RTC_DCHECK(ctx->queue->wakeup_pipe_out_ == socket);
365 char buf; 427 char buf;
366 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf))); 428 RTC_CHECK(sizeof(buf) == read(socket, &buf, sizeof(buf)));
367 switch (buf) { 429 switch (buf) {
368 case kQuit: 430 case kQuit:
369 ctx->is_active = false; 431 ctx->is_active = false;
370 event_base_loopbreak(ctx->queue->event_base_); 432 event_base_loopbreak(ctx->queue->event_base_);
371 break; 433 break;
(...skipping 26 matching lines...) Expand all
398 reply_task->Run(); 460 reply_task->Run();
399 break; 461 break;
400 } 462 }
401 default: 463 default:
402 RTC_NOTREACHED(); 464 RTC_NOTREACHED();
403 break; 465 break;
404 } 466 }
405 } 467 }
406 468
407 // static 469 // static
408 void TaskQueue::RunTask(int fd, short flags, void* context) { // NOLINT 470 void TaskQueue::Impl::RunTask(int fd, short flags, void* context) { // NOLINT
409 auto* task = static_cast<QueuedTask*>(context); 471 auto* task = static_cast<QueuedTask*>(context);
410 if (task->Run()) 472 if (task->Run())
411 delete task; 473 delete task;
412 } 474 }
413 475
414 // static 476 // static
415 void TaskQueue::RunTimer(int fd, short flags, void* context) { // NOLINT 477 void TaskQueue::Impl::RunTimer(int fd, short flags, void* context) { // NOLINT
416 TimerEvent* timer = static_cast<TimerEvent*>(context); 478 TimerEvent* timer = static_cast<TimerEvent*>(context);
417 if (!timer->task->Run()) 479 if (!timer->task->Run())
418 timer->task.release(); 480 timer->task.release();
419 QueueContext* ctx = 481 QueueContext* ctx =
420 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls())); 482 static_cast<QueueContext*>(pthread_getspecific(GetQueuePtrTls()));
421 ctx->pending_timers_.remove(timer); 483 ctx->pending_timers_.remove(timer);
422 delete timer; 484 delete timer;
423 } 485 }
424 486
425 void TaskQueue::PrepareReplyTask(scoped_refptr<ReplyTaskOwnerRef> reply_task) { 487 void TaskQueue::Impl::PrepareReplyTask(
488 scoped_refptr<ReplyTaskOwnerRef> reply_task) {
426 RTC_DCHECK(reply_task); 489 RTC_DCHECK(reply_task);
427 CritScope lock(&pending_lock_); 490 CritScope lock(&pending_lock_);
428 pending_replies_.push_back(std::move(reply_task)); 491 pending_replies_.push_back(std::move(reply_task));
429 } 492 }
430 493
494 TaskQueue::TaskQueue(const char* queue_name, Priority priority)
495 : impl_(new RefCountedObject<TaskQueue::Impl>(queue_name, this, priority)) {
496 }
497
498 TaskQueue::~TaskQueue() {}
499
500 // static
501 TaskQueue* TaskQueue::Current() {
502 return TaskQueue::Impl::CurrentQueue();
503 }
504
505 // Used for DCHECKing the current queue.
506 // static
507 bool TaskQueue::IsCurrent(const char* queue_name) {
508 return TaskQueue::Impl::IsCurrent(queue_name);
509 }
510
511 bool TaskQueue::IsCurrent() const {
512 return impl_->IsCurrent();
513 }
514
515 void TaskQueue::PostTask(std::unique_ptr<QueuedTask> task) {
516 return TaskQueue::impl_->PostTask(std::move(task));
517 }
518
519 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
520 std::unique_ptr<QueuedTask> reply,
521 TaskQueue* reply_queue) {
522 return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
523 reply_queue->impl_.get());
524 }
525
526 void TaskQueue::PostTaskAndReply(std::unique_ptr<QueuedTask> task,
527 std::unique_ptr<QueuedTask> reply) {
528 return TaskQueue::impl_->PostTaskAndReply(std::move(task), std::move(reply),
529 impl_.get());
530 }
531
532 void TaskQueue::PostDelayedTask(std::unique_ptr<QueuedTask> task,
533 uint32_t milliseconds) {
534 return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds);
535 }
536
431 } // namespace rtc 537 } // namespace rtc
OLDNEW
« no previous file with comments | « webrtc/rtc_base/task_queue.h ('k') | webrtc/webrtc.gni » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698