ManualResetEvent.set()不生效的问题

问题描述:项目中有一个数据同步的函数,每隔2分钟被调用。
为了减少性能消耗,始终有两个线程去执行,线程之间通过ManualResetEvent
去传达信号. 先执行生产者线程,如果满足条件,通过ManualResetEvent.set
向消费者发出信号,消费者开始执行,执行完成后调用ManualResetEvent.WaitOne()
进行阻塞。
但问题是:第二次调用同步函数,却无法通过ManualResetEvent.set去唤醒线程。
详细的代码如下:

private static ManualResetEvent _mrePullData = new ManualResetEvent(false);//false初始化状态为无信号,将使WaitOne阻塞
private static Thread _producterThread = null; //生产者线程
private static Thread _costerThread = null;    //消费者线程

public ApiResults PullDataFromCenterAMS()  //这个函数每隔两分钟被调用
{
    ApiResults result = new ApiResults();
    result.message = "success";
    _logger.LogInformation("调用PullDataFromCenterAMS");
    try
    {
        if (_producterThread == null) //第一次调用,创建线程
        {
            _producterThread = new Thread(new ThreadStart(Product));
            _producterThread.Name = "Product";
            _producterThread.Start();
            _costerThread = new Thread(new ThreadStart(Cost));
            _costerThread.Name = "Cost";
            _costerThread.Start();
        }
        else //第二次执行时,不再创建线程
        {
            _mrePullData.Set();  //但是这里却没有办法唤醒生产者线程,也就是说无法调用Product()
        }
        return result;
    }
    catch (Exception ex)
    {
        result.message = "faile:" + ex.Message;
        return result;
    }
}
/// <summary>
/// 生产线
/// </summary>
private void Product()
{
    /*判断libraryCode是否有效,需要同步的数据是否存在,符合条件才打开信号*/
    string libraryCode = Appsettings.app(new string[] { "App", "LibraryCode" });
    _libraryModel = DAOControls.DAOControlsInit().GetLibraryByCode(libraryCode);
    if (_libraryModel == null)
    {
        _logger.LogInformation("图书馆代码无效,找不到对应的图书馆");
        _mrePullData.WaitOne(); //没有同步对象,即无信号,则会阻塞
    }
    else
    {
        _libraryName = _libraryModel.LibraryCName;
        _mrePullData.Set();//表示有信号了,通知WaitOne不再阻塞
        Thread.Sleep(100);
    }
}
/// <summary>
/// 消费线
/// </summary>
private async void Cost()
{
    /*
    通过http请求去同步数据,并将同步结果进行日志输出,如果遇到同步错误或者同步完成,将调用_mrePullData.WaitOne()进行阻塞
    */
    SyncResp syncResp = await _syncDataRequestServices.PullSysncDataOrder(_httpClientFactory, _syncDataURL, _libraryName, _lastSyncTime);
    if (syncResp.SyncResult.StartsWith("Fail")) //拉取数据过程中报错,此时输出日志,中断同步
    {
        _logger.LogInformation(syncResp.SyncResult);
        _mrePullData.WaitOne();
        return;
    }
    else
    {
        if (syncResp.SyncResult.StartsWith("TotalSyncFinished")) //所有的表都已同步完成,此时输出日志,中断同步
        {
            _logger.LogInformation(syncResp.SyncResult);
            _mrePullData.WaitOne();
            return;
        }
        else
        {
            if (syncResp.SyncResult.StartsWith("TableSyncFinished")) //某个表同步完成,此时输出日志,继续同步其它表
            {
                _logger.LogInformation(syncResp.SyncResult);
            }
            else
            {
                List<Audit> audits = syncResp.SyncResult.FromJsonString<List<Audit>>();
                string msg = await UpdateDataToDB(audits);
                if (msg == "success")
                {
                    _logger.LogInformation("此页数据成功同步到DB,继续同步");
                }
                else
                {
                    _logger.LogInformation("数据同步至DB时报错:" + msg + "请修复问题,同步中断");
                    _mrePullData.WaitOne();
                    return;
                }
            }
            ExecuteSyncOrder(); //继续同步
        }
    }
}

提出问题:ManualResetEvent.set()失效的原因是什么,有什么解决办法呢, 或者是否有其它实现方案
恳请大家指点,不胜感激,期待,谢谢!

你的生产者线程和消费者线程都是异步的,可能存在竞争条件,导致ManualResetEvent的状态不一致,还有你的生产者线程在调用mre->Set()后没有调用mre->WaitOne(),你的消费者线程在调用mre->WaitOne()后没有调用mre->Reset(),而是继续执行

采用chatgpt:
根据您提供的代码,有几个可能导致ManualResetEvent.Set()方法失效的原因:

1、调用ManualResetEvent.Set()之前,_mrePullData对象可能已经处于信号状态。在第一次调用PullDataFromCenterAMS()时,通过new ManualResetEvent(false)创建了_mrePullData对象,并将其初始化为无信号状态。但在第二次调用时,由于没有重新创建对象,因此_mrePullData可能已经处于有信号状态。这将导致Set()方法不起作用。确保在每次调用PullDataFromCenterAMS()之前,将_mrePullData对象重置为无信号状态,可以通过调用_mrePullData.Reset()来实现。

2、另一个可能的原因是,第一次调用PullDataFromCenterAMS()时,生产者线程(Product()方法)还没有执行完成,因此第二次调用时信号被忽略。您可以在第二次调用之前,通过添加适当的同步机制来确保生产者线程已经完成。

以下是修正后的代码示例,其中添加了重置_mrePullData对象的语句,并使用Thread.Join()方法等待生产者线程完成:

private static ManualResetEvent _mrePullData = new ManualResetEvent(false); // false初始化状态为无信号,将使WaitOne阻塞
private static Thread _producterThread = null; // 生产者线程
private static Thread _costerThread = null; // 消费者线程

public ApiResults PullDataFromCenterAMS()
{
    ApiResults result = new ApiResults();
    result.message = "success";
    _logger.LogInformation("调用PullDataFromCenterAMS");
    try
    {
        if (_producterThread == null) // 第一次调用,创建线程
        {
            _producterThread = new Thread(new ThreadStart(Product));
            _producterThread.Name = "Product";
            _producterThread.Start();
            _costerThread = new Thread(new ThreadStart(Cost));
            _costerThread.Name = "Cost";
            _costerThread.Start();
        }
        else // 第二次执行时,不再创建线程
        {
            _mrePullData.Reset(); // 重置为无信号状态
            _mrePullData.Set();
            _producterThread.Join(); // 等待生产者线程完成
        }
        return result;
    }
    catch (Exception ex)
    {
        result.message = "faile:" + ex.Message;
        return result;
    }
}

这只是修复了ManualResetEvent.Set()可能失效的一些常见原因。

TechWhizKid参考GPT回答:

  • 原因:你的代码逻辑中,ManualResetEvent对象_mrePullData的Set和Reset都是在生产者线程中调用的,而消费者线程中仅调用了WaitOne方法。这会导致_mrePullData.Set()在PullDataFromCenterAMS()方法中被调用时,无法达到预期的唤醒效果,因为这时的ManualResetEvent对象可能处于已经Set的状态。

  • 方案:在消费者线程执行完成后,调用_mrePullData.Reset()方法,以重置ManualResetEvent对象的状态。这样,在PullDataFromCenterAMS()方法中调用_mrePullData.Set()时,就可以唤醒在_mrePullData.WaitOne()处阻塞的生产者线程。

代码,修改消费者线程的代码:

private async void Cost()
{
    /*
    通过http请求去同步数据,并将同步结果进行日志输出,如果遇到同步错误或者同步完成,将调用_mrePullData.WaitOne()进行阻塞
    */
    SyncResp syncResp = await _syncDataRequestServices.PullSysncDataOrder(_httpClientFactory, _syncDataURL, _libraryName, _lastSyncTime);
    if (syncResp.SyncResult.StartsWith("Fail")) //拉取数据过程中报错,此时输出日志,中断同步
    {
        _logger.LogInformation(syncResp.SyncResult);
        _mrePullData.WaitOne();
        return;
    }
    else
    {
        if (syncResp.SyncResult.StartsWith("TotalSyncFinished")) //所有的表都已同步完成,此时输出日志,中断同步
        {
            _logger.LogInformation(syncResp.SyncResult);
            _mrePullData.WaitOne();
            return;
        }
        else
        {
            // ...其他代码...

            //在消费者线程结束后,重置ManualResetEvent对象的状态
            _mrePullData.Reset();
        }
    }
}


生产者线程一旦成功,就会结束线程,再也不会启动了,应该增加while循环保持线程运行。就可以解决。

ManualResetEvent的set()方法失效的原因可能是由于在多线程环境下,两个线程的执行顺序导致的。在你的代码中,生产者线程(Product)和消费者线程(Cost)是同时启动的,但并不能保证它们的执行顺序。因此,可能出现以下情况:

  1. 生产者线程在消费者线程之前调用set()方法:在第二次调用PullDataFromCenterAMS()时,如果生产者线程先执行,它会调用_mrePullData的set()方法。然而,此时消费者线程可能还没有执行到_mrePullData的WaitOne()方法,因此生产者线程的set()方法会失效。

  2. 消费者线程在生产者线程之前调用WaitOne()方法:如果在第二次调用PullDataFromCenterAMS()时,消费者线程先执行并调用_mrePullData的WaitOne()方法,此时_mrePullData会进入阻塞状态。而生产者线程的Product()方法中的set()方法会无效,因为_mrePullData已经处于阻塞状态。

为了解决这个问题,你可以考虑以下两个解决办法:

  1. 确保消费者线程先执行:在第二次调用PullDataFromCenterAMS()时,可以先启动消费者线程,然后再启动生产者线程。这样可以确保消费者线程在生产者线程之前调用WaitOne()方法,从而避免set()方法失效的问题。

  2. 使用AutoResetEvent:AutoResetEvent也是一种线程同步机制,与ManualResetEvent类似,但在每次调用WaitOne()后会自动将信号状态重置为无信号状态。这样,无论生产者线程还是消费者线程,只要调用一次WaitOne()方法,它们都可以等待下一次信号的到来。

对于你的场景,使用AutoResetEvent可能更适合,因为每次只需要消费者线程等待一次信号即可。你可以尝试将_mrePullData替换为AutoResetEvent,并使用其相应的方法(如Set()和WaitOne())来实现你的逻辑。

注意:在使用多线程时,还需要考虑线程安全性和同步问题,以确保数据的一致性和正确性。

参考GPT回答: 代码中没有看到_manualResetEvent对象的初始化代码。确保在使用_manualResetEvent之前进行初始化,例如:

private static ManualResetEvent _mrePullData = new ManualResetEvent(false);

另外,您可以尝试以下解决方法:

  1. 确保_manualResetEvent对象只初始化一次,不要在每次调用PullDataFromCenterAMS函数时重新创建线程。
  2. 检查是否有其他地方调用了_manualResetEvent.Reset()来清除信号状态。这可能会导致_manualResetEvent不能正常工作。
  3. 请确保Cost函数中的_manualResetEvent.WaitOne()包裹在一个循环中,以便在收到信号后再次检查条件是否满足。例如:
private void Cost()
{
    while (true)
    {
        _mrePullData.WaitOne(); //等待信号

        // ... 执行同步操作

        // 执行完毕后重新阻塞
        _mrePullData.Reset();
    }
}

这样,每次收到信号后,Cost函数会执行同步操作,并在执行完毕后重新阻塞,等待下一次信号的到来。

  1. 可以考虑使用更高级的并发控制机制,例如Semaphore或CountdownEvent,它们提供了更灵活的信号控制功能。

根据您提供的代码,ManualResetEvent.Set()失效的可能原因有:

  1. _mrePullData对象被重置:检查代码中是否有重置 _mrePullData 对象的操作。如果在第一次调用 PullDataFromCenterAMS() 函数之后重置了 _mrePullData 对象,那么第二次调用时会导致信号失效。确保 _mrePullData 对象只在第一次调用时进行初始化,之后不进行重置操作。

  2. _mrePullData 对象在第一次调用 PullDataFromCenterAMS() 之后被释放:如果 _mrePullData 对象在第一次调用结束后被释放,那么第二次调用时重新创建的 _mrePullData 对象与原对象不同,导致信号无法传递。确保 _mrePullData 对象的生命周期足够长,以保持信号传递的有效性。

  3. 线程同步问题:确保线程之间的同步正确。在生产者线程中,确保在调用 WaitOne() 方法之前,消费者线程已经处于等待状态。可以使用锁或其他线程同步机制来确保线程按预期顺序执行。

另外,您还可以考虑使用更高级的线程同步机制,如 SemaphoreMonitor,以实现更灵活和可靠的线程通信。

以下是使用 Semaphore 的示例代码:

private static Semaphore _semaphore = new Semaphore(0, 1); // 初始计数为 0,最大计数为 1
private static Thread _producerThread = null; // 生产者线程
private static Thread _consumerThread = null; // 消费者线程

public ApiResults PullDataFromCenterAMS()
{
    ApiResults result = new ApiResults();
    result.message = "success";
    _logger.LogInformation("调用PullDataFromCenterAMS");
    try
    {
        if (_producerThread == null)
        {
            _producerThread = new Thread(new ThreadStart(Producer));
            _producerThread.Name = "Producer";
            _producerThread.Start();
            
            _consumerThread = new Thread(new ThreadStart(Consumer));
            _consumerThread.Name = "Consumer";
            _consumerThread.Start();
        }
        else
        {
            _semaphore.Release(); // 发出信号,唤醒消费者线程
        }
        return result;
    }
    catch (Exception ex)
    {
        result.message = "faile:" + ex.Message;
        return result;
    }
}

private void Producer()
{
    // 判断是否满足条件,打开信号
    if (_libraryModel == null)
    {
        _logger.LogInformation("图书馆代码无效,找不到对应的图书馆");
        _semaphore.WaitOne(); // 没有同步对象,即无信号,则会阻塞
    }
    else
    {
        _libraryName = _libraryModel.LibraryCName;
        _semaphore.Release(); // 表示有信号了,通知 Consumer 不再阻塞
        Thread.Sleep(100);
    }
}

private async void Consumer()
{
    // 执行同步操作
    // ...

    // 同步完成后等待信号
    _semaphore.WaitOne();
    // 继续执行其他操作
    // ...
}

使用 Semaphore 可以更方便地控制线程之间的同步,使得信号的传递更加可靠。

有可能是因为用到了异步才导致第二次ManualResetEvent.set()不起作用,我已经改成只有一个线程去跑了,
当然依然是使用ManualResetEvent去控制信号,目前是正常的
更改后的代码如下

        /// <summary>
        /// 生产线
        /// </summary>
        public void Product()
        {
            while (true)
            {
                /*判断libraryCode是否有效,需要同步的数据是否存在,符合条件才打开信号*/
                string libraryCode = Appsettings.app(new string[] { "App", "LibraryCode" });
                _libraryModel = DAOControls.DAOControlsInit().GetLibraryByCode(libraryCode);
                if (_libraryModel == null)
                {
                    _logger.LogInformation("图书馆代码无效,找不到对应的图书馆");
                    _mrePullData.WaitOne(); //没有同步对象,即无信号,则会阻塞
                }
                else
                {
                    _libraryName = _libraryModel.LibraryCName;
                    Cost();
                    _mrePullData.WaitOne();
                    _mrePullData.Reset();
                }
            }
        }
        /// <summary>
        /// 消费线
        /// </summary>
        public async void Cost()
        {
            while (true)
            {
                SyncResp syncResp = await _syncDataRequestServices.PullSysncDataOrder(_httpClientFactory, _syncDataURL, _libraryName, _lastSyncTime);
                if (syncResp.SyncResult.StartsWith("Fail")) //拉取数据过程中报错,此时输出日志,中断同步
                {
                    _logger.LogInformation(syncResp.SyncResult);
                    return;
                }
                else
                {
                    if (syncResp.SyncResult.StartsWith("TotalSyncFinished")) //所有的表都已同步完成,此时输出日志,中断同步
                    {
                        _logger.LogInformation(syncResp.SyncResult);
                        _configurationRoot["App:LastSyncTime"] = DateTime.Now.ToString();
                        return;
                    }
                    else
                    {
                        if (syncResp.SyncResult.StartsWith("TableSyncFinished")) //某个表同步完成,此时输出日志,继续同步其它表
                        {
                            _logger.LogInformation(syncResp.SyncResult);
                        }
                        else
                        {
                            List<Audit> audits = syncResp.SyncResult.FromJsonString<List<Audit>>();
                            string msg = await UpdateDataToDB(audits);
                            if (msg == "success")
                            {
                                _logger.LogInformation("此页数据成功同步到DB,继续同步");
                                //_logger.LogInformation(syncResp.Message);
                            }
                            else
                            {
                                _logger.LogInformation("数据同步至DB时报错:" + msg + "请修复问题,同步中断");
                                return;
                            }
                        }
                    }
                }
            }
        }