登錄 |創建賬號 |找回密碼
查看: 91|回復: 2

一個簡易的綫程池

[複製鏈接]

23

主題

6

回帖

130

積分

管理員

Rank: 9Rank: 9Rank: 9

積分
130
發表於 2024-3-23 20:26:53 | 顯示全部樓層 |閱讀模式
ThreadPool.h:

  1. // ThreadPool.h
  2. #ifndef THREADPOOL_H
  3. #define THREADPOOL_H

  4. #include <vector>
  5. #include <queue>
  6. #include <memory>
  7. #include <thread>
  8. #include <condition_variable>
  9. #include <mutex>
  10. #include <future>
  11. #include <functional>
  12. #include <stdexcept>

  13. class ThreadPool {
  14. public:
  15.     ThreadPool(size_t);
  16.     ~ThreadPool();

  17.     // 声明模板成员函数
  18.     template<class F, class... Args>
  19.     auto enqueue(F&& f, Args&&... args)
  20.         -> std::future<typename std::result_of<F(Args...)>::type>;

  21. private:
  22.     std::vector<std::thread> workers;
  23.     std::queue<std::function<void()>> tasks;
  24.    
  25.     std::mutex queue_mutex;
  26.     std::condition_variable condition;
  27.     bool stop;
  28. };

  29. #include "ThreadPool.tpp" // 包含模板实现

  30. #endif // THREADPOOL_H
複製代碼
ThreadPool.tpp
  1. // ThreadPool.tpp
  2. #ifndef THREADPOOL_TPP
  3. #define THREADPOOL_TPP

  4. #include "ThreadPool.h"

  5. template<class F, class... Args>
  6. auto ThreadPool::enqueue(F&& f, Args&&... args)
  7.     -> std::future<typename std::result_of<F(Args...)>::type>
  8. {
  9.     using return_type = typename std::result_of<F(Args...)>::type;

  10.     auto task = std::make_shared< std::packaged_task<return_type()> >(
  11.             std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  12.         );
  13.    
  14.     std::future<return_type> res = task->get_future();
  15.     {
  16.         std::unique_lock<std::mutex> lock(queue_mutex);

  17.         // don't allow enqueueing after stopping the pool
  18.         if(stop)
  19.             throw std::runtime_error("enqueue on stopped ThreadPool");

  20.         tasks.emplace([task](){ (*task)(); });
  21.     }
  22.     condition.notify_one();
  23.     return res;
  24. }

  25. #endif // THREADPOOL_TPP
複製代碼
ThreadPool.cpp
  1. // pool.cpp
  2. #include "ThreadPool.h"

  3. ThreadPool::ThreadPool(size_t threads)
  4.     : stop(false)
  5. {
  6.     for(size_t i = 0; i<threads; ++i)
  7.         workers.emplace_back(
  8.             [this]
  9.             {
  10.                 for(;;)
  11.                 {
  12.                     std::function<void()> task;

  13.                     {
  14.                         std::unique_lock<std::mutex> lock(this->queue_mutex);
  15.                         this->condition.wait(lock,
  16.                             [this]{ return this->stop || !this->tasks.empty(); });
  17.                         if(this->stop && this->tasks.empty())
  18.                             return;
  19.                         task = std::move(this->tasks.front());
  20.                         this->tasks.pop();
  21.                     }

  22.                     task();
  23.                 }
  24.             }
  25.         );
  26. }

  27. ThreadPool::~ThreadPool()
  28. {
  29.     {
  30.         std::unique_lock<std::mutex> lock(queue_mutex);
  31.         stop = true;
  32.     }
  33.     condition.notify_all();
  34.     for(auto &worker: workers)
  35.         worker.join();
  36. }
複製代碼
main.cpp
  1. #include <iostream>
  2. #include <string>
  3. #include <random>
  4. #include <chrono>
  5. #include <unordered_map>
  6. #include <mutex>
  7. #include "ThreadPool.h"

  8. std::string generateRandomString(size_t length) {
  9.     const std::string chars = "abcdefghijklmnopqrstuvwxyz";
  10.     std::random_device random_device;
  11.     std::mt19937 generator(random_device());
  12.     std::uniform_int_distribution<> distribution(0, chars.size() - 1);

  13.     std::string random_string;
  14.     for (size_t i = 0; i < length; ++i) {
  15.         random_string += chars[distribution(generator)];
  16.     }

  17.     return random_string;
  18. }

  19. void calculateCharacterFrequencies(const std::string& input, int threadNum) {
  20.     auto start = std::chrono::high_resolution_clock::now();
  21.     std::unordered_map<char, int> frequencies;
  22.     for (char c : input) {
  23.         frequencies[c]++;
  24.     }
  25.     auto end = std::chrono::high_resolution_clock::now();
  26.     std::chrono::duration<double, std::milli> duration = end - start;

  27.     // Thread-safe output
  28.     static std::mutex cout_mutex;
  29.     {
  30.         std::lock_guard<std::mutex> lock(cout_mutex);
  31.         std::cout << input << ", Thread" << threadNum << ", ";
  32.         for (const auto& pair : frequencies) {
  33.             std::cout << pair.first << ":" << pair.second << ",";
  34.         }
  35.         std::cout << "time:" << duration.count() << "ms\n";
  36.     }
  37. }

  38. int main() {
  39.     ThreadPool pool(5);
  40.     std::vector<std::future<void>> futures;

  41.     auto totalStart = std::chrono::high_resolution_clock::now();
  42.     for (int i = 0; i < 100; ++i) {
  43.         size_t length = 30 + rand() % 71; // Random length between 30 and 100
  44.         std::string randomString = generateRandomString(length);
  45.         futures.push_back(pool.enqueue(calculateCharacterFrequencies, randomString, i % 5 + 1));
  46.     }

  47.     // Wait for all tasks to complete
  48.     for (auto& fut : futures) {
  49.         fut.get();
  50.     }

  51.     auto totalEnd = std::chrono::high_resolution_clock::now();
  52.     std::chrono::duration<double, std::milli> totalDuration = totalEnd - totalStart;
  53.     std::cout << "Total execution time: " << totalDuration.count() << "ms\n";

  54.     return 0;
  55. }
複製代碼
編譯命令:
  1. g++ -std=c++11 -pthread ThreadPool.cpp main.cpp -o main
複製代碼


回復

使用道具 舉報

23

主題

6

回帖

130

積分

管理員

Rank: 9Rank: 9Rank: 9

積分
130
 樓主| 發表於 2024-3-23 20:28:10 | 顯示全部樓層
这段代码实现了以下功能:

generateRandomString(size_t length)函数生成一个指定长度的随机字符串,仅包含小写字母。
calculateCharacterFrequencies(const std::string& input, int threadNum)函数计算一个字符串中每个字符出现的次数,并打印出来,包括处理该任务的线程号和任务执行所需的时间。这里使用了一个static std::mutex来确保输出时的线程安全。
在main函数中,我们首先生成了100个随机字符串,并将它们的处理任务提交到了线程池中。我们通过enqueue方法的返回值(一个std::future对象)来追踪任务的完成情况。完成所有任务后,计算并输出总体执行时间。
回復

使用道具 舉報

23

主題

6

回帖

130

積分

管理員

Rank: 9Rank: 9Rank: 9

積分
130
 樓主| 發表於 2024-3-23 20:32:59 | 顯示全部樓層
一個簡單的調用示例
  1. #include <iostream>
  2. #include <string>
  3. #include <chrono>
  4. #include "ThreadPool.h"

  5. // 一个简单的数学计算函数
  6. int multiply(int a, int b) {
  7.     return a * b;
  8. }

  9. // 一个字符串操作函数
  10. std::string concat(const std::string& a, const std::string& b) {
  11.     return a + b;
  12. }

  13. // 一个模拟耗时操作的函数
  14. std::string simulateLongComputationTask(const std::string& input) {
  15.     std::this_thread::sleep_for(std::chrono::seconds(3)); // 模拟耗时任务
  16.     return "Processed: " + input;
  17. }

  18. int main() {
  19.     ThreadPool pool(4); // 创建一个拥有4个工作线程的线程池

  20.     // 任务1: 数学计算
  21.     auto task1 = pool.enqueue(multiply, 5, 6);
  22.    
  23.     // 任务2: 字符串操作
  24.     auto task2 = pool.enqueue(concat, "Hello, ", "World!");

  25.     // 任务3: 耗时任务
  26.     auto task3 = pool.enqueue(simulateLongComputationTask, "Task 3");

  27.     // 获取并输出结果
  28.     std::cout << "Task 1 result: " << task1.get() << std::endl;
  29.     std::cout << "Task 2 result: " << task2.get() << std::endl;
  30.     std::cout << "Task 3 result: " << task3.get() << std::endl;

  31.     return 0;
  32. }
複製代碼
回復

使用道具 舉報

您需要登錄後才可以回帖 登錄 | 創建賬號

本版積分規則

Archiver|手機版|小黑屋|九派社區 ( 苏ICP备07501547号-12 )

GMT+8, 2024-5-21 12:36 , Processed in 0.064289 second(s), 27 queries .

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回復 返回頂部 返回列表