问题描述:项目中有一个数据同步的函数,每隔2分钟被调用。
为了减少性能消耗,始终有两个线程去执行,线程之间通过ManualResetEvent
去传达信号. 先执行生产者线程,如果满足条件,通过ManualResetEvent.set
向消费者发出信号,消费者开始执行,执行完成后调用ManualResetEvent.WaitOne()
进行阻塞。
但问题是:第二次调用同步函数,却无法通过ManualResetEvent.set去唤醒线程。
详细的代码如下:
private static ManualResetEvent _mrePullData = new ManualResetEvent(false);//false初始化状态为无信号,将使WaitOne阻塞
private static Thread _producterThread = null; //生产者线程
private static Thread _costerThread = null; //消费者线程
public ApiResults PullDataFromCenterAMS() //这个函数每隔两分钟被调用
{
ApiResults result = new ApiResults();
result.message = "success";
_logger.LogInformation("调用PullDataFromCenterAMS");
try
{
if (_producterThread == null) //第一次调用,创建线程
{
_producterThread = new Thread(new ThreadStart(Product));
_producterThread.Name = "Product";
_producterThread.Start();
_costerThread = new Thread(new ThreadStart(Cost));
_costerThread.Name = "Cost";
_costerThread.Start();
}
else //第二次执行时,不再创建线程
{
_mrePullData.Set(); //但是这里却没有办法唤醒生产者线程,也就是说无法调用Product()
}
return result;
}
catch (Exception ex)
{
result.message = "faile:" + ex.Message;
return result;
}
}
/// <summary>
/// 生产线
/// </summary>
private void Product()
{
/*判断libraryCode是否有效,需要同步的数据是否存在,符合条件才打开信号*/
string libraryCode = Appsettings.app(new string[] { "App", "LibraryCode" });
_libraryModel = DAOControls.DAOControlsInit().GetLibraryByCode(libraryCode);
if (_libraryModel == null)
{
_logger.LogInformation("图书馆代码无效,找不到对应的图书馆");
_mrePullData.WaitOne(); //没有同步对象,即无信号,则会阻塞
}
else
{
_libraryName = _libraryModel.LibraryCName;
_mrePullData.Set();//表示有信号了,通知WaitOne不再阻塞
Thread.Sleep(100);
}
}
/// <summary>
/// 消费线
/// </summary>
private async void Cost()
{
/*
通过http请求去同步数据,并将同步结果进行日志输出,如果遇到同步错误或者同步完成,将调用_mrePullData.WaitOne()进行阻塞
*/
SyncResp syncResp = await _syncDataRequestServices.PullSysncDataOrder(_httpClientFactory, _syncDataURL, _libraryName, _lastSyncTime);
if (syncResp.SyncResult.StartsWith("Fail")) //拉取数据过程中报错,此时输出日志,中断同步
{
_logger.LogInformation(syncResp.SyncResult);
_mrePullData.WaitOne();
return;
}
else
{
if (syncResp.SyncResult.StartsWith("TotalSyncFinished")) //所有的表都已同步完成,此时输出日志,中断同步
{
_logger.LogInformation(syncResp.SyncResult);
_mrePullData.WaitOne();
return;
}
else
{
if (syncResp.SyncResult.StartsWith("TableSyncFinished")) //某个表同步完成,此时输出日志,继续同步其它表
{
_logger.LogInformation(syncResp.SyncResult);
}
else
{
List<Audit> audits = syncResp.SyncResult.FromJsonString<List<Audit>>();
string msg = await UpdateDataToDB(audits);
if (msg == "success")
{
_logger.LogInformation("此页数据成功同步到DB,继续同步");
}
else
{
_logger.LogInformation("数据同步至DB时报错:" + msg + "请修复问题,同步中断");
_mrePullData.WaitOne();
return;
}
}
ExecuteSyncOrder(); //继续同步
}
}
}
提出问题:ManualResetEvent.set()失效的原因是什么,有什么解决办法呢, 或者是否有其它实现方案
恳请大家指点,不胜感激,期待,谢谢!
你的生产者线程和消费者线程都是异步的,可能存在竞争条件,导致ManualResetEvent的状态不一致,还有你的生产者线程在调用mre->Set()后没有调用mre->WaitOne(),你的消费者线程在调用mre->WaitOne()后没有调用mre->Reset(),而是继续执行
采用chatgpt:
根据您提供的代码,有几个可能导致ManualResetEvent.Set()方法失效的原因:
1、调用ManualResetEvent.Set()之前,_mrePullData对象可能已经处于信号状态。在第一次调用PullDataFromCenterAMS()时,通过new ManualResetEvent(false)创建了_mrePullData对象,并将其初始化为无信号状态。但在第二次调用时,由于没有重新创建对象,因此_mrePullData可能已经处于有信号状态。这将导致Set()方法不起作用。确保在每次调用PullDataFromCenterAMS()之前,将_mrePullData对象重置为无信号状态,可以通过调用_mrePullData.Reset()来实现。
2、另一个可能的原因是,第一次调用PullDataFromCenterAMS()时,生产者线程(Product()方法)还没有执行完成,因此第二次调用时信号被忽略。您可以在第二次调用之前,通过添加适当的同步机制来确保生产者线程已经完成。
以下是修正后的代码示例,其中添加了重置_mrePullData对象的语句,并使用Thread.Join()方法等待生产者线程完成:
private static ManualResetEvent _mrePullData = new ManualResetEvent(false); // false初始化状态为无信号,将使WaitOne阻塞
private static Thread _producterThread = null; // 生产者线程
private static Thread _costerThread = null; // 消费者线程
public ApiResults PullDataFromCenterAMS()
{
ApiResults result = new ApiResults();
result.message = "success";
_logger.LogInformation("调用PullDataFromCenterAMS");
try
{
if (_producterThread == null) // 第一次调用,创建线程
{
_producterThread = new Thread(new ThreadStart(Product));
_producterThread.Name = "Product";
_producterThread.Start();
_costerThread = new Thread(new ThreadStart(Cost));
_costerThread.Name = "Cost";
_costerThread.Start();
}
else // 第二次执行时,不再创建线程
{
_mrePullData.Reset(); // 重置为无信号状态
_mrePullData.Set();
_producterThread.Join(); // 等待生产者线程完成
}
return result;
}
catch (Exception ex)
{
result.message = "faile:" + ex.Message;
return result;
}
}
这只是修复了ManualResetEvent.Set()可能失效的一些常见原因。
原因:你的代码逻辑中,ManualResetEvent对象_mrePullData的Set和Reset都是在生产者线程中调用的,而消费者线程中仅调用了WaitOne方法。这会导致_mrePullData.Set()在PullDataFromCenterAMS()方法中被调用时,无法达到预期的唤醒效果,因为这时的ManualResetEvent对象可能处于已经Set的状态。
方案:在消费者线程执行完成后,调用_mrePullData.Reset()方法,以重置ManualResetEvent对象的状态。这样,在PullDataFromCenterAMS()方法中调用_mrePullData.Set()时,就可以唤醒在_mrePullData.WaitOne()处阻塞的生产者线程。
代码,修改消费者线程的代码:
private async void Cost()
{
/*
通过http请求去同步数据,并将同步结果进行日志输出,如果遇到同步错误或者同步完成,将调用_mrePullData.WaitOne()进行阻塞
*/
SyncResp syncResp = await _syncDataRequestServices.PullSysncDataOrder(_httpClientFactory, _syncDataURL, _libraryName, _lastSyncTime);
if (syncResp.SyncResult.StartsWith("Fail")) //拉取数据过程中报错,此时输出日志,中断同步
{
_logger.LogInformation(syncResp.SyncResult);
_mrePullData.WaitOne();
return;
}
else
{
if (syncResp.SyncResult.StartsWith("TotalSyncFinished")) //所有的表都已同步完成,此时输出日志,中断同步
{
_logger.LogInformation(syncResp.SyncResult);
_mrePullData.WaitOne();
return;
}
else
{
// ...其他代码...
//在消费者线程结束后,重置ManualResetEvent对象的状态
_mrePullData.Reset();
}
}
}
生产者线程一旦成功,就会结束线程,再也不会启动了,应该增加while循环保持线程运行。就可以解决。
ManualResetEvent的set()方法失效的原因可能是由于在多线程环境下,两个线程的执行顺序导致的。在你的代码中,生产者线程(Product)和消费者线程(Cost)是同时启动的,但并不能保证它们的执行顺序。因此,可能出现以下情况:
生产者线程在消费者线程之前调用set()方法:在第二次调用PullDataFromCenterAMS()时,如果生产者线程先执行,它会调用_mrePullData的set()方法。然而,此时消费者线程可能还没有执行到_mrePullData的WaitOne()方法,因此生产者线程的set()方法会失效。
消费者线程在生产者线程之前调用WaitOne()方法:如果在第二次调用PullDataFromCenterAMS()时,消费者线程先执行并调用_mrePullData的WaitOne()方法,此时_mrePullData会进入阻塞状态。而生产者线程的Product()方法中的set()方法会无效,因为_mrePullData已经处于阻塞状态。
为了解决这个问题,你可以考虑以下两个解决办法:
确保消费者线程先执行:在第二次调用PullDataFromCenterAMS()时,可以先启动消费者线程,然后再启动生产者线程。这样可以确保消费者线程在生产者线程之前调用WaitOne()方法,从而避免set()方法失效的问题。
使用AutoResetEvent:AutoResetEvent也是一种线程同步机制,与ManualResetEvent类似,但在每次调用WaitOne()后会自动将信号状态重置为无信号状态。这样,无论生产者线程还是消费者线程,只要调用一次WaitOne()方法,它们都可以等待下一次信号的到来。
对于你的场景,使用AutoResetEvent可能更适合,因为每次只需要消费者线程等待一次信号即可。你可以尝试将_mrePullData替换为AutoResetEvent,并使用其相应的方法(如Set()和WaitOne())来实现你的逻辑。
注意:在使用多线程时,还需要考虑线程安全性和同步问题,以确保数据的一致性和正确性。
参考GPT回答: 代码中没有看到_manualResetEvent对象的初始化代码。确保在使用_manualResetEvent之前进行初始化,例如:
private static ManualResetEvent _mrePullData = new ManualResetEvent(false);
另外,您可以尝试以下解决方法:
private void Cost()
{
while (true)
{
_mrePullData.WaitOne(); //等待信号
// ... 执行同步操作
// 执行完毕后重新阻塞
_mrePullData.Reset();
}
}
这样,每次收到信号后,Cost函数会执行同步操作,并在执行完毕后重新阻塞,等待下一次信号的到来。
根据您提供的代码,ManualResetEvent.Set()
失效的可能原因有:
_mrePullData
对象被重置:检查代码中是否有重置 _mrePullData
对象的操作。如果在第一次调用 PullDataFromCenterAMS()
函数之后重置了 _mrePullData
对象,那么第二次调用时会导致信号失效。确保 _mrePullData
对象只在第一次调用时进行初始化,之后不进行重置操作。
_mrePullData
对象在第一次调用 PullDataFromCenterAMS()
之后被释放:如果 _mrePullData
对象在第一次调用结束后被释放,那么第二次调用时重新创建的 _mrePullData
对象与原对象不同,导致信号无法传递。确保 _mrePullData
对象的生命周期足够长,以保持信号传递的有效性。
线程同步问题:确保线程之间的同步正确。在生产者线程中,确保在调用 WaitOne()
方法之前,消费者线程已经处于等待状态。可以使用锁或其他线程同步机制来确保线程按预期顺序执行。
另外,您还可以考虑使用更高级的线程同步机制,如 Semaphore
或 Monitor
,以实现更灵活和可靠的线程通信。
以下是使用 Semaphore
的示例代码:
private static Semaphore _semaphore = new Semaphore(0, 1); // 初始计数为 0,最大计数为 1
private static Thread _producerThread = null; // 生产者线程
private static Thread _consumerThread = null; // 消费者线程
public ApiResults PullDataFromCenterAMS()
{
ApiResults result = new ApiResults();
result.message = "success";
_logger.LogInformation("调用PullDataFromCenterAMS");
try
{
if (_producerThread == null)
{
_producerThread = new Thread(new ThreadStart(Producer));
_producerThread.Name = "Producer";
_producerThread.Start();
_consumerThread = new Thread(new ThreadStart(Consumer));
_consumerThread.Name = "Consumer";
_consumerThread.Start();
}
else
{
_semaphore.Release(); // 发出信号,唤醒消费者线程
}
return result;
}
catch (Exception ex)
{
result.message = "faile:" + ex.Message;
return result;
}
}
private void Producer()
{
// 判断是否满足条件,打开信号
if (_libraryModel == null)
{
_logger.LogInformation("图书馆代码无效,找不到对应的图书馆");
_semaphore.WaitOne(); // 没有同步对象,即无信号,则会阻塞
}
else
{
_libraryName = _libraryModel.LibraryCName;
_semaphore.Release(); // 表示有信号了,通知 Consumer 不再阻塞
Thread.Sleep(100);
}
}
private async void Consumer()
{
// 执行同步操作
// ...
// 同步完成后等待信号
_semaphore.WaitOne();
// 继续执行其他操作
// ...
}
使用 Semaphore
可以更方便地控制线程之间的同步,使得信号的传递更加可靠。
有可能是因为用到了异步才导致第二次ManualResetEvent.set()不起作用,我已经改成只有一个线程去跑了,
当然依然是使用ManualResetEvent去控制信号,目前是正常的
更改后的代码如下
/// <summary>
/// 生产线
/// </summary>
public void Product()
{
while (true)
{
/*判断libraryCode是否有效,需要同步的数据是否存在,符合条件才打开信号*/
string libraryCode = Appsettings.app(new string[] { "App", "LibraryCode" });
_libraryModel = DAOControls.DAOControlsInit().GetLibraryByCode(libraryCode);
if (_libraryModel == null)
{
_logger.LogInformation("图书馆代码无效,找不到对应的图书馆");
_mrePullData.WaitOne(); //没有同步对象,即无信号,则会阻塞
}
else
{
_libraryName = _libraryModel.LibraryCName;
Cost();
_mrePullData.WaitOne();
_mrePullData.Reset();
}
}
}
/// <summary>
/// 消费线
/// </summary>
public async void Cost()
{
while (true)
{
SyncResp syncResp = await _syncDataRequestServices.PullSysncDataOrder(_httpClientFactory, _syncDataURL, _libraryName, _lastSyncTime);
if (syncResp.SyncResult.StartsWith("Fail")) //拉取数据过程中报错,此时输出日志,中断同步
{
_logger.LogInformation(syncResp.SyncResult);
return;
}
else
{
if (syncResp.SyncResult.StartsWith("TotalSyncFinished")) //所有的表都已同步完成,此时输出日志,中断同步
{
_logger.LogInformation(syncResp.SyncResult);
_configurationRoot["App:LastSyncTime"] = DateTime.Now.ToString();
return;
}
else
{
if (syncResp.SyncResult.StartsWith("TableSyncFinished")) //某个表同步完成,此时输出日志,继续同步其它表
{
_logger.LogInformation(syncResp.SyncResult);
}
else
{
List<Audit> audits = syncResp.SyncResult.FromJsonString<List<Audit>>();
string msg = await UpdateDataToDB(audits);
if (msg == "success")
{
_logger.LogInformation("此页数据成功同步到DB,继续同步");
//_logger.LogInformation(syncResp.Message);
}
else
{
_logger.LogInformation("数据同步至DB时报错:" + msg + "请修复问题,同步中断");
return;
}
}
}
}
}
}