一个场景关于C++ 多线程运用问题

现在遇到应用场景如下
我需要监控200个目录,每个目录可能会有新的文本文件拷贝进来,当目录中有新的文本文件进来的时候,读取文本并开始解析处理,生成新的文本,将老的文本删除.

每个目录对文本的处理方式都不一样,往往文本都比较大, 100MB 到1GB, 每个文本读取并解析处理的完成的时间 大概 需要10秒到 5分钟

问题:
针对上面场景
处理方式1:
是创建200个线程,每个线程都在不停的扫描目录, 发现文件 就开始处理,处理完成后,进行下一次目录扫描,没有发现新文件就线程 sleep, 线程不停的扫描目录

处理方式2:
我的电脑是CPU,16核心的,创建 一个有16个线程的线程池, 当发现目录有文件时,从线程池中拉一个线程处理处理,处理完成后,放回线程池.

各位,有没有好的建议?谢谢

援引GPT回答:
针对这个场景,可以考虑使用处理方式2,即创建一个有16个线程的线程池。

使用线程池的好处是可以限制同时运行的线程数量,避免创建过多的线程导致系统资源的浪费。同时,线程池可以复用线程,减少线程创建和销毁的开销。

在线程池中,可以使用一个任务队列来存放需要处理的目录。当有新的目录需要处理时,直接将任务放入任务队列中。线程池中的线程会不断从任务队列中取出任务进行处理,处理完成后再将线程放回线程池中等待下一个任务。

这样可以有效地利用系统资源,同时也能保证每个目录的处理方式都是并发进行的,提高处理效率。

需要注意的是,在使用线程池时,需要合理设置线程池的大小。根据你的电脑是16核心的,可以设置线程池的大小为16。这样可以最大限度地发挥系统的并行处理能力。

此外,还可以考虑使用一些同步机制来确保线程安全,比如互斥锁或读写锁,以防止多个线程同时访问同一个目录或文件导致的竞态条件。

总之,使用线程池可以更好地管理和调度线程,提高程序的并发处理能力和效率。

那么多线程其实没什么意义,因为你的瓶颈不在cpu而是在磁盘io上

对于你的应用场景,我有几点建议可以供你参考:

  1. 方案评估:首先,评估你的应用场景对于性能和资源利用的要求。考虑到你的目录数目较多且每个目录的处理方式都不一样,可能会对系统资源造成较大的负担。此外,你也需要注意系统的稳定性和可维护性。

  2. 异步处理:使用异步处理的方式可以提高整体系统的效率。可以考虑使用异步IO库(比如asyncio)来实现异步文件扫描和处理。这样可以避免创建过多的线程和线程之间的上下文切换开销。

  3. 线程池:你可以考虑使用线程池来管理并发处理任务。线程池可以重用线程资源,避免频繁地创建和销毁线程。这样可以减少系统资源开销,并可以根据实际情况合理配置线程池的大小。

  4. 文件监控:你可以使用文件系统的监控功能来监听目录中的新文件。不需要频繁地扫描目录,而是在文件创建事件触发时进行处理。可使用watchdog等类似库实现文件监控功能。

  5. 分布式处理:如果你的应用场景需要更高的并发处理能力,可以考虑使用分布式处理框架(如Apache SparkHadoop等)来进行任务的分发和处理。这样可以利用多台机器的计算资源并行处理文件。

总体而言,选择适合你需求的方案需要综合考虑你的系统性能、过程复杂性、可维护性等因素。可以先进行一些性能评估和压力测试,再选择合适的方案进行实施。

【以下回答由 GPT 生成】

建议采用处理方式2: 使用一个有16个线程的线程池来处理这个问题。

以下是实现方式的步骤:

  1. 首先,引入C++的多线程库<thread>和线程池库<ThreadPool.h>,可以使用已有的线程池库,如ThreadPool(一个简单易用的开源库,可在GitHub上找到)。

  2. 初始化一个具有16个线程的线程池。

#include <ThreadPool.h>
...
int numThreads = 16;
ThreadPool pool(numThreads);
  1. 创建一个函数,该函数将用于处理单个目录中的文件。
void processDirectory(const std::string& directory) {
    // 扫描目录中的文件
    std::vector<std::string> files = scanDirectory(directory);

    for(const auto& file : files) {
        // 处理文件
        processFile(file);

        // 删除原始文件
        removeFile(file);
    }
}
  1. 创建一个函数,该函数将用于扫描所有目录。在扫描到新文件时,将其加入线程池进行处理。
void monitorDirectories(const std::vector<std::string>& directories, ThreadPool& pool) {
    while(true) {
        for(const auto& directory : directories) {
            // 扫描目录中的文件
            std::vector<std::string> files = scanDirectory(directory);

            for(const auto& file : files) {
                // 提交任务给线程池处理
                pool.enqueue([directory, file](){
                    processDirectory(directory);
                });
            }
        }

        // 休眠片刻再次扫描目录
        std::this_thread::sleep_for(std::chrono::seconds(10));
    }
}
  1. 调用monitorDirectories函数开始监控目录并处理新文件。
std::vector<std::string> directories = {"directory1", "directory2", ...};
monitorDirectories(directories, pool);

通过使用以上步骤,你将能够创建一个具有16个线程的线程池来监控200个目录,并在有新文件时进行处理。每个新文件将会被提交给线程池中的一个空闲线程进行处理,从而实现并行处理的效果。

这样,你就能够根据处理时间的长短和CPU核心数来动态调整线程池的大小,提高处理效率,并且解决了处理方式1中创建200个线程的资源浪费问题。



【相关推荐】



如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^

仅供参考:

#pragma warning(disable:4996 4477)
#pragma comment(lib,"user32")
#define WINVER 0x0501
#define _WIN32_WINNT 0x0501
//#include <SDKDDKVer.h>
#include <share.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <io.h>
#include <tchar.h>
#include <Windows.h>
// this is the all purpose structure that contains the interesting directory information and provides the input buffer that is filled with file change data {
#define MAX_DIRS    10
#define MAX_FILES   1000
#define MAX_BUFFER  4096
typedef struct _DIRECTORY_INFO {
    HANDLE      hDir;
    TCHAR       lpszDirName[MAX_PATH];
    CHAR        lpBuffer[MAX_BUFFER];
    DWORD       dwBufLength;
    OVERLAPPED  Overlapped;
} DIRECTORY_INFO, *LPDIRECTORY_INFO;
DIRECTORY_INFO  DirInfo[MAX_DIRS];        // Buffer for all of the directories
DWORD           numDirs;
TCHAR           DirList[MAX_DIRS*MAX_PATH];     // Buffer for all of the directories
TCHAR           *p;
//}
//Log{
#define MAXLOGSIZE 20000000
#define MAXLINSIZE 16000
#include <time.h>
#include <sys/timeb.h>
#include <stdarg.h>
char logfilename1[]="dirmon1.log";
char logfilename2[]="dirmon2.log";
static char logstr[MAXLINSIZE+1];
char datestr[16];
char timestr[16];
char mss[4];
CRITICAL_SECTION cs_log;
FILE *flog;
char pathfilename[_MAX_FNAME];
char path[_MAX_FNAME];
char filename[_MAX_FNAME];
char tt[2];
void Lock(CRITICAL_SECTION *l) {
    EnterCriticalSection(l);
}
void Unlock(CRITICAL_SECTION *l) {
    LeaveCriticalSection(l);
}
void LogV(const char *pszFmt,va_list argp) {
    struct tm *now;
    struct timeb tb;

    if (NULL==pszFmt||0==pszFmt[0]) return;
    _vsnprintf(logstr,MAXLINSIZE,pszFmt,argp);
    ftime(&tb);
    now=localtime(&tb.time);
    sprintf(datestr,"%04d-%02d-%02d",now->tm_year+1900,now->tm_mon+1,now->tm_mday);
    sprintf(timestr,"%02d:%02d:%02d",now->tm_hour     ,now->tm_min  ,now->tm_sec );
    sprintf(mss,"%03d",tb.millitm);
    printf("%s %s.%s %s",datestr,timestr,mss,logstr);
    flog=fopen(logfilename1,"a");
    if (NULL!=flog) {
        fprintf(flog,"%s %s.%s %s",datestr,timestr,mss,logstr);
        if (ftell(flog)>MAXLOGSIZE) {
            fclose(flog);
            if (rename(logfilename1,logfilename2)) {
                remove(logfilename2);
                rename(logfilename1,logfilename2);
            }
        } else {
            fclose(flog);
        }
    }
}
void Log(const char *pszFmt,...) {
    va_list argp;

    Lock(&cs_log);
    va_start(argp,pszFmt);
    LogV(pszFmt,argp);
    va_end(argp);
    Unlock(&cs_log);
}
//Log}
void WINAPI CheckFile( LPDIRECTORY_INFO lpdi, PFILE_NOTIFY_INFORMATION lpfni) {
    TCHAR szFullPathName[MAX_PATH];
    TCHAR FileName[MAX_PATH*2];
    int Len;

    lstrcpy( szFullPathName, lpdi->lpszDirName );
    lstrcat( szFullPathName, "\\" );
    Len=WideCharToMultiByte(CP_ACP,WC_COMPOSITECHECK,lpfni->FileName,lpfni->FileNameLength/2,FileName,MAX_PATH*2,0,0);
    FileName[Len]=0;
    lstrcat( szFullPathName, FileName );
//#define FILE_ACTION_ADDED                   0x00000001
//#define FILE_ACTION_REMOVED                 0x00000002
//#define FILE_ACTION_MODIFIED                0x00000003
//#define FILE_ACTION_RENAMED_OLD_NAME        0x00000004
//#define FILE_ACTION_RENAMED_NEW_NAME        0x00000005
    const char *acts[6]={"NIL","ADD","DEL","MOD","OLD","NEW"};
    Log(" %s [%s]\n",acts[lpfni->Action],szFullPathName);
}
void WINAPI HandleDirectoryChange( HANDLE hdlCompletionPort ) {
    DWORD numBytes;
    DWORD cbOffset;
    LPDIRECTORY_INFO di;
    LPOVERLAPPED lpOverlapped;
    PFILE_NOTIFY_INFORMATION fni;

    do {
        // Retrieve the directory info for this directory through the completion key
        GetQueuedCompletionStatus(
            hdlCompletionPort,
            &numBytes,
            (unsigned long *)&di,
            &lpOverlapped,
            INFINITE
        );
        if (di) {
            fni = (PFILE_NOTIFY_INFORMATION)di->lpBuffer;
            do {
                CheckFile( di, fni );
                cbOffset = fni->NextEntryOffset;
                fni = (PFILE_NOTIFY_INFORMATION)((LPBYTE) fni + cbOffset);
            } while( cbOffset );
            BOOL r=ReadDirectoryChangesW(
                di->hDir,
                di->lpBuffer,
                MAX_BUFFER,
                TRUE,
                0x17F,
//#define FILE_NOTIFY_CHANGE_FILE_NAME    0x00000001
//#define FILE_NOTIFY_CHANGE_DIR_NAME     0x00000002
//#define FILE_NOTIFY_CHANGE_ATTRIBUTES   0x00000004
//#define FILE_NOTIFY_CHANGE_SIZE         0x00000008
//#define FILE_NOTIFY_CHANGE_LAST_WRITE   0x00000010
//#define FILE_NOTIFY_CHANGE_LAST_ACCESS  0x00000020
//#define FILE_NOTIFY_CHANGE_CREATION     0x00000040
//#define FILE_NOTIFY_CHANGE_SECURITY     0x00000100
                &di->dwBufLength,
                &di->Overlapped,
                NULL
            );
            if (!r) Log("ERROR: ReadDirectoryChangesW %d==GetLastError()!\n",GetLastError());
        }
    } while( di );
}
void splitpathfilename(char *pn) {
    char *a=strrchr(pn,'\\');
    DWORD len=_MAX_FNAME;
    if (a==NULL) {
        len=GetCurrentDirectory(len,path);
        path[len]='\\';
        path[len+1]=0;
        sprintf(pathfilename,"%s%s",path,pn);
    } else {
        strncpy(path,pn,a-pn+1);path[a-pn+1]=0;
        strcpy(pathfilename,pn);
    }
//  Log(" splitpathfilename(%s) path,pathfilename=%s,%s\n",pn,path,pathfilename);
}
void WINAPI WatchDirectories( HANDLE hCompPort ) {
    DWORD   i;
    DWORD   tid;
    HANDLE  hThread;
    int     cnt;

    for (i=0;i<numDirs;i++) {
        BOOL r=ReadDirectoryChangesW(
            DirInfo[i].hDir,
            DirInfo[i].lpBuffer,
            MAX_BUFFER,
            TRUE,
            0x17F,
//#define FILE_NOTIFY_CHANGE_FILE_NAME    0x00000001
//#define FILE_NOTIFY_CHANGE_DIR_NAME     0x00000002
//#define FILE_NOTIFY_CHANGE_ATTRIBUTES   0x00000004
//#define FILE_NOTIFY_CHANGE_SIZE         0x00000008
//#define FILE_NOTIFY_CHANGE_LAST_WRITE   0x00000010
//#define FILE_NOTIFY_CHANGE_LAST_ACCESS  0x00000020
//#define FILE_NOTIFY_CHANGE_CREATION     0x00000040
//#define FILE_NOTIFY_CHANGE_SECURITY     0x00000100
            &DirInfo[i].dwBufLength,&DirInfo[i].Overlapped,
            NULL
        );
        if (!r) Log("ERROR: ReadDirectoryChangesW %d==GetLastError()!\n",GetLastError());
    }
    hThread = CreateThread(
        NULL,
        0,
        (LPTHREAD_START_ROUTINE) HandleDirectoryChange,
        hCompPort,
        0,
        &tid
    );
    while (1) {
        if (_access("noloop.txt",0) == 0) break;//
        cnt=0;
        Log(" Monitor dir [%s] ...\n",DirList);
        while (1) {
            cnt++;
            if (cnt%100==0) {//100*100==10000ms==10s
                if (_access("noloop.txt",0) == 0) break;//
            }
            if (cnt%600==0) {//600*100==60000ms==60s
                cnt=0;
                Log(" Monitor dir [%s] ...\n",DirList);
            }
            Sleep(100);//100ms
        }
    }
    PostQueuedCompletionStatus( hCompPort, 0, 0, NULL );
    WaitForSingleObject( hThread, INFINITE );
    CloseHandle( hThread );
}
int main(int argc,char **argv) {
    if (argc<2) {
        printf("Usage: %s dirpath\nto Monitor dir\n",argv[0]);
        return 4;
    }
    //防止该进程重入
    HANDLE hMutex;
    hMutex = CreateMutex(NULL, FALSE, "dirmon.exe" );
    if ( WaitForSingleObject(hMutex, 1000) == WAIT_TIMEOUT ) {
        printf("dirmon.exe already running\n");
        return 1;
    }
    if ( GetLastError() == ERROR_ALREADY_EXISTS ) {
        ReleaseMutex(hMutex);
        printf("dirmon.exe already running\n");
        return 2;
    }
    InitializeCriticalSection(&cs_log);
    Log(" Begin...\n");

    HANDLE hCompPort;
    char dir[MAX_PATH];
    strcpy(dir,argv[1]);

    sprintf(DirList,"%s",dir);
    DirList[lstrlen(DirList)+1]=0;

    hCompPort=NULL;                 // Handle To a Completion Port
    numDirs=0;
    // First, walk through the raw list and count items, creating an array of handles for each directory
    for (TCHAR *p=DirList;*p!='\0';numDirs++,p+=(lstrlen(p)+1)) {
//      if (CreateDirectory(p, NULL))
//          Log(" Directory [%s] created\n",p);
//      else
//          Log(" Directory [%s] exists\n",p);

        DirInfo[numDirs].hDir = CreateFile(
            p,
            FILE_LIST_DIRECTORY,
            FILE_SHARE_READ |
            FILE_SHARE_WRITE |
            FILE_SHARE_DELETE,
            NULL,
            OPEN_EXISTING,
            FILE_FLAG_BACKUP_SEMANTICS |
            FILE_FLAG_OVERLAPPED,
            NULL
        );

        if (DirInfo[numDirs].hDir == INVALID_HANDLE_VALUE) {
            Log(" Unable to open directory [%s]. GetLastError():%d. Terminating...\n",p,GetLastError());
            DeleteCriticalSection(&cs_log);
            return 3;
        }
        lstrcpy( DirInfo[numDirs].lpszDirName, p );

        // Set up a key(directory info) for each directory
        hCompPort=CreateIoCompletionPort(
            DirInfo[numDirs].hDir,
            hCompPort,
            (unsigned long)&DirInfo[numDirs],
            0
        );
    }

    remove("noloop.txt");
    Log(" create a file noloop.txt or kill process %s to exit\n",argv[0]);

    WatchDirectories( hCompPort );

    for (DWORD i=0;i<numDirs;i++) CloseHandle( DirInfo[i].hDir );
    CloseHandle( hCompPort );

    Log(" ...End\n");
    DeleteCriticalSection(&cs_log);

    return 0;
}