C++11线程
用函数方式起一个线程
#include <thread>
void ThreadFunc() {
std::cout << "函数线程开始" << std::endl;
std::cout << "函数线程结束" << std::endl;
}
int main() {
std::thread t(ThreadFunc); // 创建一个线程并开始执行
t.join(); //主线程等待当前线程执行完成再退出
//t.detach(); //主线程和当前线程一起执行
std::cout << "main 函数" << std::endl;
return 0;
}
用类的方式
class Thread {
public:
void operator()() {
std::cout << "类线程开始" << std::endl;
std::cout << "类线程结束" << std::endl;
}
};
int main() {
Thread myThread;
std::thread t(myThread);
t.join();
std::cout << "main 函数" << std::endl;
return 0;
}
用类成员函数的方式
class Thread {
public:
void ThreadFunc(int i) {
std::cout << "类成员函数" << i << std::endl;
}
};
int main() {
Thread myThread;
std::thread t(&Thread::ThreadFunc, myThread, 1);
t.join();
std::cout << "main 函数" << std::endl;
return 0;
}
用lambda表达式
int main() {
auto lamThread = [] {
std::cout << "lambda线程开始" << std::endl;
std::cout << "lambda线程结束" << std::endl;
};
std::thread t(lamThread);
t.join();
std::cout << "main 函数" << std::endl;
return 0;
}
std::thread { [=]
{
while (true) {
cout << "print" << endl;
Sleep(1000);
}
} }.detach();
使用detach时,不可以将指针作为函数的参数。
线程sleep20ms
std::chrono::milliseconds s(20);
std::this_thread::sleep_for(s);
std::async是一个函数模版
原子操作
#include <vector>
#include <mutex>
#include <iostream>
#include <atomic>
using namespace std;
std::atomic<int> g_nCount = 0; // 不是所有运算符都支持原子操作,支持++、--、+=、-=等运算符
//int g_nCount = 0; // 这样定义会错误
void ThreadFunc() {
for (int i = 0; i < 100000; i++) {
g_nCount++;
// g_nCount = g_nCount + 1; // 这种运算符不支持
}
}
int main() {
std::thread t1(ThreadFunc);
std::thread t2(ThreadFunc);
t1.join();
t2.join();
cout << "g_nCount=" << g_nCount << endl;
return 0;
}
C++互斥量
#include <vector>
#include <mutex>
class Thread {
public:
void Push(int i) {
std::chrono::milliseconds s(20);
std::this_thread::sleep_for(s);
m_Mutex.lock();
m_vec.push_back(i);
m_Mutex.unlock();
}
void Pop() {
// 构造函数中加锁,析构函数中释放锁
std::lock_guard<std::mutex> lockGuard(m_Mutex);
m_vec.pop_back();
/*
m_Mutex.lock();
std::unique_lock<std::mutex> uniqueLock(m_Mutex, std::adopt_lock);
//std::adopt_lock 表示互斥量已经被lock了
*/
/*
std::unique_lock<std::mutex> uniqueLock(m_Mutex, std::try_to_lock);
if (uniqueLock.owns_lock()) {
// 拿到了锁
} else {
// 没有拿到锁
}
*/
/*
// 创建一个没有加锁的mutex,析构函数中可以自己unlock
std::unique_lock<std::mutex> uniqueLock(m_Mutex, std::defer_lock);
uniqueLock.lock();
// 处理共享数据
uniqueLock.unlock();
// 其它处理
uniqueLock.lock(); // 这里可以不用自己unlock
// 处理其它共享数据
*/
/*
std::unique_lock<std::mutex> uniqueLock(m_Mutex, std::defer_lock);
if (uniqueLock.try_lock()) {
// 拿到了锁
} else {
// 没有拿到锁
}
*/
}
private:
std::vector<int> m_vec;
std::mutex m_Mutex;
};
C++17线程安全锁
C++17 线程安全锁
class ThreadSafeCounter {
public:
ThreadSafeCounter() = default;
// 多个线程/读者能同时读计数器的值。
unsigned int get() const {
std::shared_lock<std::shared_mutex> lock(mutex_);
return value_;
}
// 只有一个线程/写者能增加/写线程的值。
void increment() {
std::unique_lock<std::shared_mutex> lock(mutex_);
value_++;
}
// 只有一个线程/写者能重置/写线程的值。
void reset() {
std::unique_lock<std::shared_mutex> lock(mutex_);
value_ = 0;
}
private:
mutable std::shared_mutex mutex_;
unsigned int value_ = 0;
};
C++11信号量
#include <iostream>
#include <utility>
#include <thread>
#include <chrono>
#include <functional>
#include <atomic>
#include <condition_variable>
#include <mutex>
using namespace std;
class Semaphore {
public:
Semaphore(unsigned long ulCount = 0) : m_ulCount(ulCount) {}
Semaphore(const Semaphore&) = delete;
Semaphore& operator=(const Semaphore&) = delete;
void Signal() {
std::unique_lock<std::mutex> lock(m_Mutex);
++m_ulCount;
m_CondVar.notify_one();
}
void Wait() {
std::unique_lock<std::mutex> lock(m_Mutex);
while (m_ulCount == 0) {
m_CondVar.wait(lock);
}
--m_ulCount;
}
private:
std::mutex m_Mutex;
std::condition_variable m_CondVar;
unsigned long m_ulCount;
};
Semaphore sem1(0);//相当于Linux下sem_init(&sem1,0,0);
void Send() {
while (1) {
printf("Send Req Start\n");
sem1.Wait();//阻塞,等待sem1.signal();执行后才会返回
printf("Send Req, Recv Data\n");
this_thread::sleep_for(chrono::seconds(1));
}
}
void Recv() {
while (1) {
this_thread::sleep_for(chrono::seconds(3));
printf("Recv Data\n");
sem1.Signal();
}
}
int main() {
std::thread t1(Send);
std::thread t2(Recv);
t1.join();
t2.join();
return 0;
}
获取毫秒级时间
获取当前毫秒时间
long long GetCurMs() {
std::chrono::steady_clock::duration s = std::chrono::steady_clock::now().time_since_epoch();
std::chrono::milliseconds mil = std::chrono::duration_cast<std::chrono::milliseconds>(s);
return mil.count();
}
获取毫秒时间差
#include <chrono>
#include <thread>
long long start = GetCurMs();
long long end = GetCurMs();
std::cout << end - start << "毫秒" << std::endl;
线程安全队列
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>
template<typename T>
class threadsafe_queue {
private:
mutable std::mutex mut; // 1 互斥量必须是可变的
std::queue<T> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue()
{}
threadsafe_queue(threadsafe_queue const& other) {
std::lock_guard<std::mutex> lk(other.mut);
data_queue=other.data_queue;
}
void push(T new_value) {
std::lock_guard<std::mutex> lk(mut);
data_queue.push(new_value);
data_cond.notify_one();
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
value=data_queue.front();
data_queue.pop();
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk,[this]{return !data_queue.empty();});
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return false;
value=data_queue.front();
data_queue.pop();
return true;
}
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lk(mut);
if(data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
data_queue.pop();
return res;
}
bool empty() const {
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
empty()是一个const成员函数,并且传入拷贝构造函数的other形参是一个const引用;因为其他线程可能有这个类型的非const引用对象,并调用变种成员函数,所以这里有必要对互斥量上锁。又因为锁住互斥量是个可变操作,所以互斥量成员必须(修饰)为mutable①才能在empty()和拷贝构造函数中锁住。
std::future使用
#include <iostream>
#include <future>
#include <chrono>
#include <thread>
int f1() {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return 1;
}
int f2() {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return 2;
}
long long GetCurMs() {
std::chrono::steady_clock::duration s = std::chrono::steady_clock::now().time_since_epoch();
std::chrono::milliseconds mil = std::chrono::duration_cast<std::chrono::milliseconds>(s);
return mil.count();
}
int main() {
long long start = GetCurMs();
int val = f1() + f2();
long long end = GetCurMs();
std::cout << "cost time=" << end - start << ",val=" << val << std::endl;
start = GetCurMs();
std::future<int> an1 = std::async(f1);
std::future<int> an2 = std::async(f2);
val = an1.get() + an2.get();
end = GetCurMs();
std::cout << "cost time=" << end - start << ",val=" << val << std::endl;
return 0;
}
C++11条件变量
#include <queue>
#include <chrono>
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
int main() {
std::queue<int> produced_nums;
std::mutex mtx;
std::condition_variable cv;
bool notified = false; // 通知信号
// 生产者
auto producer = [&]() {
for (int i = 0; ; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(900));
std::unique_lock<std::mutex> lock(mtx);
std::cout << "producing " << i << std::endl;
produced_nums.push(i);
notified = true;
cv.notify_all(); // 此处也可以使用 notify_one
}
};
// 消费者
auto consumer = [&]() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
while (!notified) { // 避免虚假唤醒
cv.wait(lock);
}
// 短暂取消锁,使得生产者有机会在消费者消费空前继续生产
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // 消费者慢于生产者
lock.lock();
while (!produced_nums.empty()) {
std::cout << "consuming " << produced_nums.front() << std::endl;
produced_nums.pop();
}
notified = false;
}
};
// 分别在不同的线程中运行
std::thread p(producer);
std::thread cs[2];
for (int i = 0; i < 2; ++i) {
cs[i] = std::thread(consumer);
}
p.join();
for (int i = 0; i < 2; ++i) {
cs[i].join();
}
return 0;
}
下一篇OpenMP