现在遇到应用场景如下
我需要监控200个目录,每个目录可能会有新的文本文件拷贝进来,当目录中有新的文本文件进来的时候,读取文本并开始解析处理,生成新的文本,将老的文本删除.
每个目录对文本的处理方式都不一样,往往文本都比较大, 100MB 到1GB, 每个文本读取并解析处理的完成的时间 大概 需要10秒到 5分钟
问题:
针对上面场景
处理方式1:
是创建200个线程,每个线程都在不停的扫描目录, 发现文件 就开始处理,处理完成后,进行下一次目录扫描,没有发现新文件就线程 sleep, 线程不停的扫描目录
处理方式2:
我的电脑是CPU,16核心的,创建 一个有16个线程的线程池, 当发现目录有文件时,从线程池中拉一个线程处理处理,处理完成后,放回线程池.
各位,有没有好的建议?谢谢
援引GPT回答:
针对这个场景,可以考虑使用处理方式2,即创建一个有16个线程的线程池。
使用线程池的好处是可以限制同时运行的线程数量,避免创建过多的线程导致系统资源的浪费。同时,线程池可以复用线程,减少线程创建和销毁的开销。
在线程池中,可以使用一个任务队列来存放需要处理的目录。当有新的目录需要处理时,直接将任务放入任务队列中。线程池中的线程会不断从任务队列中取出任务进行处理,处理完成后再将线程放回线程池中等待下一个任务。
这样可以有效地利用系统资源,同时也能保证每个目录的处理方式都是并发进行的,提高处理效率。
需要注意的是,在使用线程池时,需要合理设置线程池的大小。根据你的电脑是16核心的,可以设置线程池的大小为16。这样可以最大限度地发挥系统的并行处理能力。
此外,还可以考虑使用一些同步机制来确保线程安全,比如互斥锁或读写锁,以防止多个线程同时访问同一个目录或文件导致的竞态条件。
总之,使用线程池可以更好地管理和调度线程,提高程序的并发处理能力和效率。
那么多线程其实没什么意义,因为你的瓶颈不在cpu而是在磁盘io上
对于你的应用场景,我有几点建议可以供你参考:
方案评估:首先,评估你的应用场景对于性能和资源利用的要求。考虑到你的目录数目较多且每个目录的处理方式都不一样,可能会对系统资源造成较大的负担。此外,你也需要注意系统的稳定性和可维护性。
异步处理:使用异步处理的方式可以提高整体系统的效率。可以考虑使用异步IO库(比如asyncio
)来实现异步文件扫描和处理。这样可以避免创建过多的线程和线程之间的上下文切换开销。
线程池:你可以考虑使用线程池来管理并发处理任务。线程池可以重用线程资源,避免频繁地创建和销毁线程。这样可以减少系统资源开销,并可以根据实际情况合理配置线程池的大小。
文件监控:你可以使用文件系统的监控功能来监听目录中的新文件。不需要频繁地扫描目录,而是在文件创建事件触发时进行处理。可使用watchdog
等类似库实现文件监控功能。
分布式处理:如果你的应用场景需要更高的并发处理能力,可以考虑使用分布式处理框架(如Apache Spark
、Hadoop
等)来进行任务的分发和处理。这样可以利用多台机器的计算资源并行处理文件。
总体而言,选择适合你需求的方案需要综合考虑你的系统性能、过程复杂性、可维护性等因素。可以先进行一些性能评估和压力测试,再选择合适的方案进行实施。
【以下回答由 GPT 生成】
建议采用处理方式2: 使用一个有16个线程的线程池来处理这个问题。
以下是实现方式的步骤:
首先,引入C++的多线程库<thread>
和线程池库<ThreadPool.h>
,可以使用已有的线程池库,如ThreadPool
(一个简单易用的开源库,可在GitHub上找到)。
初始化一个具有16个线程的线程池。
#include <ThreadPool.h>
...
int numThreads = 16;
ThreadPool pool(numThreads);
void processDirectory(const std::string& directory) {
// 扫描目录中的文件
std::vector<std::string> files = scanDirectory(directory);
for(const auto& file : files) {
// 处理文件
processFile(file);
// 删除原始文件
removeFile(file);
}
}
void monitorDirectories(const std::vector<std::string>& directories, ThreadPool& pool) {
while(true) {
for(const auto& directory : directories) {
// 扫描目录中的文件
std::vector<std::string> files = scanDirectory(directory);
for(const auto& file : files) {
// 提交任务给线程池处理
pool.enqueue([directory, file](){
processDirectory(directory);
});
}
}
// 休眠片刻再次扫描目录
std::this_thread::sleep_for(std::chrono::seconds(10));
}
}
monitorDirectories
函数开始监控目录并处理新文件。std::vector<std::string> directories = {"directory1", "directory2", ...};
monitorDirectories(directories, pool);
通过使用以上步骤,你将能够创建一个具有16个线程的线程池来监控200个目录,并在有新文件时进行处理。每个新文件将会被提交给线程池中的一个空闲线程进行处理,从而实现并行处理的效果。
这样,你就能够根据处理时间的长短和CPU核心数来动态调整线程池的大小,提高处理效率,并且解决了处理方式1中创建200个线程的资源浪费问题。
【相关推荐】
仅供参考:
#pragma warning(disable:4996 4477)
#pragma comment(lib,"user32")
#define WINVER 0x0501
#define _WIN32_WINNT 0x0501
//#include <SDKDDKVer.h>
#include <share.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <io.h>
#include <tchar.h>
#include <Windows.h>
// this is the all purpose structure that contains the interesting directory information and provides the input buffer that is filled with file change data {
#define MAX_DIRS 10
#define MAX_FILES 1000
#define MAX_BUFFER 4096
typedef struct _DIRECTORY_INFO {
HANDLE hDir;
TCHAR lpszDirName[MAX_PATH];
CHAR lpBuffer[MAX_BUFFER];
DWORD dwBufLength;
OVERLAPPED Overlapped;
} DIRECTORY_INFO, *LPDIRECTORY_INFO;
DIRECTORY_INFO DirInfo[MAX_DIRS]; // Buffer for all of the directories
DWORD numDirs;
TCHAR DirList[MAX_DIRS*MAX_PATH]; // Buffer for all of the directories
TCHAR *p;
//}
//Log{
#define MAXLOGSIZE 20000000
#define MAXLINSIZE 16000
#include <time.h>
#include <sys/timeb.h>
#include <stdarg.h>
char logfilename1[]="dirmon1.log";
char logfilename2[]="dirmon2.log";
static char logstr[MAXLINSIZE+1];
char datestr[16];
char timestr[16];
char mss[4];
CRITICAL_SECTION cs_log;
FILE *flog;
char pathfilename[_MAX_FNAME];
char path[_MAX_FNAME];
char filename[_MAX_FNAME];
char tt[2];
void Lock(CRITICAL_SECTION *l) {
EnterCriticalSection(l);
}
void Unlock(CRITICAL_SECTION *l) {
LeaveCriticalSection(l);
}
void LogV(const char *pszFmt,va_list argp) {
struct tm *now;
struct timeb tb;
if (NULL==pszFmt||0==pszFmt[0]) return;
_vsnprintf(logstr,MAXLINSIZE,pszFmt,argp);
ftime(&tb);
now=localtime(&tb.time);
sprintf(datestr,"%04d-%02d-%02d",now->tm_year+1900,now->tm_mon+1,now->tm_mday);
sprintf(timestr,"%02d:%02d:%02d",now->tm_hour ,now->tm_min ,now->tm_sec );
sprintf(mss,"%03d",tb.millitm);
printf("%s %s.%s %s",datestr,timestr,mss,logstr);
flog=fopen(logfilename1,"a");
if (NULL!=flog) {
fprintf(flog,"%s %s.%s %s",datestr,timestr,mss,logstr);
if (ftell(flog)>MAXLOGSIZE) {
fclose(flog);
if (rename(logfilename1,logfilename2)) {
remove(logfilename2);
rename(logfilename1,logfilename2);
}
} else {
fclose(flog);
}
}
}
void Log(const char *pszFmt,...) {
va_list argp;
Lock(&cs_log);
va_start(argp,pszFmt);
LogV(pszFmt,argp);
va_end(argp);
Unlock(&cs_log);
}
//Log}
void WINAPI CheckFile( LPDIRECTORY_INFO lpdi, PFILE_NOTIFY_INFORMATION lpfni) {
TCHAR szFullPathName[MAX_PATH];
TCHAR FileName[MAX_PATH*2];
int Len;
lstrcpy( szFullPathName, lpdi->lpszDirName );
lstrcat( szFullPathName, "\\" );
Len=WideCharToMultiByte(CP_ACP,WC_COMPOSITECHECK,lpfni->FileName,lpfni->FileNameLength/2,FileName,MAX_PATH*2,0,0);
FileName[Len]=0;
lstrcat( szFullPathName, FileName );
//#define FILE_ACTION_ADDED 0x00000001
//#define FILE_ACTION_REMOVED 0x00000002
//#define FILE_ACTION_MODIFIED 0x00000003
//#define FILE_ACTION_RENAMED_OLD_NAME 0x00000004
//#define FILE_ACTION_RENAMED_NEW_NAME 0x00000005
const char *acts[6]={"NIL","ADD","DEL","MOD","OLD","NEW"};
Log(" %s [%s]\n",acts[lpfni->Action],szFullPathName);
}
void WINAPI HandleDirectoryChange( HANDLE hdlCompletionPort ) {
DWORD numBytes;
DWORD cbOffset;
LPDIRECTORY_INFO di;
LPOVERLAPPED lpOverlapped;
PFILE_NOTIFY_INFORMATION fni;
do {
// Retrieve the directory info for this directory through the completion key
GetQueuedCompletionStatus(
hdlCompletionPort,
&numBytes,
(unsigned long *)&di,
&lpOverlapped,
INFINITE
);
if (di) {
fni = (PFILE_NOTIFY_INFORMATION)di->lpBuffer;
do {
CheckFile( di, fni );
cbOffset = fni->NextEntryOffset;
fni = (PFILE_NOTIFY_INFORMATION)((LPBYTE) fni + cbOffset);
} while( cbOffset );
BOOL r=ReadDirectoryChangesW(
di->hDir,
di->lpBuffer,
MAX_BUFFER,
TRUE,
0x17F,
//#define FILE_NOTIFY_CHANGE_FILE_NAME 0x00000001
//#define FILE_NOTIFY_CHANGE_DIR_NAME 0x00000002
//#define FILE_NOTIFY_CHANGE_ATTRIBUTES 0x00000004
//#define FILE_NOTIFY_CHANGE_SIZE 0x00000008
//#define FILE_NOTIFY_CHANGE_LAST_WRITE 0x00000010
//#define FILE_NOTIFY_CHANGE_LAST_ACCESS 0x00000020
//#define FILE_NOTIFY_CHANGE_CREATION 0x00000040
//#define FILE_NOTIFY_CHANGE_SECURITY 0x00000100
&di->dwBufLength,
&di->Overlapped,
NULL
);
if (!r) Log("ERROR: ReadDirectoryChangesW %d==GetLastError()!\n",GetLastError());
}
} while( di );
}
void splitpathfilename(char *pn) {
char *a=strrchr(pn,'\\');
DWORD len=_MAX_FNAME;
if (a==NULL) {
len=GetCurrentDirectory(len,path);
path[len]='\\';
path[len+1]=0;
sprintf(pathfilename,"%s%s",path,pn);
} else {
strncpy(path,pn,a-pn+1);path[a-pn+1]=0;
strcpy(pathfilename,pn);
}
// Log(" splitpathfilename(%s) path,pathfilename=%s,%s\n",pn,path,pathfilename);
}
void WINAPI WatchDirectories( HANDLE hCompPort ) {
DWORD i;
DWORD tid;
HANDLE hThread;
int cnt;
for (i=0;i<numDirs;i++) {
BOOL r=ReadDirectoryChangesW(
DirInfo[i].hDir,
DirInfo[i].lpBuffer,
MAX_BUFFER,
TRUE,
0x17F,
//#define FILE_NOTIFY_CHANGE_FILE_NAME 0x00000001
//#define FILE_NOTIFY_CHANGE_DIR_NAME 0x00000002
//#define FILE_NOTIFY_CHANGE_ATTRIBUTES 0x00000004
//#define FILE_NOTIFY_CHANGE_SIZE 0x00000008
//#define FILE_NOTIFY_CHANGE_LAST_WRITE 0x00000010
//#define FILE_NOTIFY_CHANGE_LAST_ACCESS 0x00000020
//#define FILE_NOTIFY_CHANGE_CREATION 0x00000040
//#define FILE_NOTIFY_CHANGE_SECURITY 0x00000100
&DirInfo[i].dwBufLength,&DirInfo[i].Overlapped,
NULL
);
if (!r) Log("ERROR: ReadDirectoryChangesW %d==GetLastError()!\n",GetLastError());
}
hThread = CreateThread(
NULL,
0,
(LPTHREAD_START_ROUTINE) HandleDirectoryChange,
hCompPort,
0,
&tid
);
while (1) {
if (_access("noloop.txt",0) == 0) break;//
cnt=0;
Log(" Monitor dir [%s] ...\n",DirList);
while (1) {
cnt++;
if (cnt%100==0) {//100*100==10000ms==10s
if (_access("noloop.txt",0) == 0) break;//
}
if (cnt%600==0) {//600*100==60000ms==60s
cnt=0;
Log(" Monitor dir [%s] ...\n",DirList);
}
Sleep(100);//100ms
}
}
PostQueuedCompletionStatus( hCompPort, 0, 0, NULL );
WaitForSingleObject( hThread, INFINITE );
CloseHandle( hThread );
}
int main(int argc,char **argv) {
if (argc<2) {
printf("Usage: %s dirpath\nto Monitor dir\n",argv[0]);
return 4;
}
//防止该进程重入
HANDLE hMutex;
hMutex = CreateMutex(NULL, FALSE, "dirmon.exe" );
if ( WaitForSingleObject(hMutex, 1000) == WAIT_TIMEOUT ) {
printf("dirmon.exe already running\n");
return 1;
}
if ( GetLastError() == ERROR_ALREADY_EXISTS ) {
ReleaseMutex(hMutex);
printf("dirmon.exe already running\n");
return 2;
}
InitializeCriticalSection(&cs_log);
Log(" Begin...\n");
HANDLE hCompPort;
char dir[MAX_PATH];
strcpy(dir,argv[1]);
sprintf(DirList,"%s",dir);
DirList[lstrlen(DirList)+1]=0;
hCompPort=NULL; // Handle To a Completion Port
numDirs=0;
// First, walk through the raw list and count items, creating an array of handles for each directory
for (TCHAR *p=DirList;*p!='\0';numDirs++,p+=(lstrlen(p)+1)) {
// if (CreateDirectory(p, NULL))
// Log(" Directory [%s] created\n",p);
// else
// Log(" Directory [%s] exists\n",p);
DirInfo[numDirs].hDir = CreateFile(
p,
FILE_LIST_DIRECTORY,
FILE_SHARE_READ |
FILE_SHARE_WRITE |
FILE_SHARE_DELETE,
NULL,
OPEN_EXISTING,
FILE_FLAG_BACKUP_SEMANTICS |
FILE_FLAG_OVERLAPPED,
NULL
);
if (DirInfo[numDirs].hDir == INVALID_HANDLE_VALUE) {
Log(" Unable to open directory [%s]. GetLastError():%d. Terminating...\n",p,GetLastError());
DeleteCriticalSection(&cs_log);
return 3;
}
lstrcpy( DirInfo[numDirs].lpszDirName, p );
// Set up a key(directory info) for each directory
hCompPort=CreateIoCompletionPort(
DirInfo[numDirs].hDir,
hCompPort,
(unsigned long)&DirInfo[numDirs],
0
);
}
remove("noloop.txt");
Log(" create a file noloop.txt or kill process %s to exit\n",argv[0]);
WatchDirectories( hCompPort );
for (DWORD i=0;i<numDirs;i++) CloseHandle( DirInfo[i].hDir );
CloseHandle( hCompPort );
Log(" ...End\n");
DeleteCriticalSection(&cs_log);
return 0;
}