Parolin 0.7.9 6796
Console (soon DLLs) to do a tar like job
Loading...
Searching...
No Matches
WorkQueue.h
Go to the documentation of this file.
1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 * All rights reserved.
4 *
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
8 */
9#pragma once
10
11#include "utils/Buffer.h"
12
13#include <atomic>
14#include <cassert>
15#include <cstddef>
16#include <condition_variable>
17#include <cstddef>
18#include <functional>
19#include <mutex>
20#include <queue>
21
22namespace pzstd {
23
25template <typename T>
26class WorkQueue {
27 // Protects all member variable access
28 std::mutex mutex_;
29 std::condition_variable readerCv_;
30 std::condition_variable writerCv_;
31 std::condition_variable finishCv_;
32
33 std::queue<T> queue_;
34 bool done_;
35 std::size_t maxSize_;
36
37 // Must have lock to call this function
38 bool full() const {
39 if (maxSize_ == 0) {
40 return false;
41 }
42 return queue_.size() >= maxSize_;
43 }
44
45 public:
52 WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
53
63 bool push(T&& item) {
64 {
65 std::unique_lock<std::mutex> lock(mutex_);
66 while (full() && !done_) {
67 writerCv_.wait(lock);
68 }
69 if (done_) {
70 return false;
71 }
72 queue_.push(std::move(item));
73 }
74 readerCv_.notify_one();
75 return true;
76 }
77
87 bool pop(T& item) {
88 {
89 std::unique_lock<std::mutex> lock(mutex_);
90 while (queue_.empty() && !done_) {
91 readerCv_.wait(lock);
92 }
93 if (queue_.empty()) {
94 assert(done_);
95 return false;
96 }
97 item = std::move(queue_.front());
98 queue_.pop();
99 }
100 writerCv_.notify_one();
101 return true;
102 }
103
109 void setMaxSize(std::size_t maxSize) {
110 {
111 std::lock_guard<std::mutex> lock(mutex_);
112 maxSize_ = maxSize;
113 }
114 writerCv_.notify_all();
115 }
116
121 void finish() {
122 {
123 std::lock_guard<std::mutex> lock(mutex_);
124 assert(!done_);
125 done_ = true;
126 }
127 readerCv_.notify_all();
128 writerCv_.notify_all();
129 finishCv_.notify_all();
130 }
131
134 std::unique_lock<std::mutex> lock(mutex_);
135 while (!done_) {
136 finishCv_.wait(lock);
137 }
138 }
139};
140
142class BufferWorkQueue {
143 WorkQueue<Buffer> queue_;
144 std::atomic<std::size_t> size_;
145
146 public:
147 BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {}
148
149 void push(Buffer buffer) {
150 size_.fetch_add(buffer.size());
151 queue_.push(std::move(buffer));
152 }
153
154 bool pop(Buffer& buffer) {
155 bool result = queue_.pop(buffer);
156 if (result) {
157 size_.fetch_sub(buffer.size());
158 }
159 return result;
160 }
161
162 void setMaxSize(std::size_t maxSize) {
163 queue_.setMaxSize(maxSize);
164 }
165
166 void finish() {
167 queue_.finish();
168 }
169
176 std::size_t size() {
177 queue_.waitUntilFinished();
178 return size_.load();
179 }
180};
181}
Definition Buffer.h:27
std::size_t size() const
Definition Buffer.h:91
void push(Buffer buffer)
Definition WorkQueue.h:149
void finish()
Definition WorkQueue.h:166
BufferWorkQueue(std::size_t maxSize=0)
Definition WorkQueue.h:147
void setMaxSize(std::size_t maxSize)
Definition WorkQueue.h:162
std::size_t size()
Definition WorkQueue.h:176
bool pop(Buffer &buffer)
Definition WorkQueue.h:154
bool pop(T &item)
Definition WorkQueue.h:87
void finish()
Definition WorkQueue.h:121
bool push(T &&item)
Definition WorkQueue.h:63
void waitUntilFinished()
Blocks until finish() has been called (but the queue may not be empty).
Definition WorkQueue.h:133
WorkQueue(std::size_t maxSize=0)
Definition WorkQueue.h:52
void setMaxSize(std::size_t maxSize)
Definition WorkQueue.h:109
#define assert(condition)
Definition lz4.c:273
Definition ErrorHolder.h:16
#define false
Definition longfile.c:42