I am trying to write a simple program in Go that behaves like find | grep. The program works, and it is built from goroutines arranged in the following pattern:
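- one goroutine walks the directory tree (filech <- each file as it is found)
- one goroutine stores the matching files in an extension-based category (file <- grepch)
- one goroutine per file received from filech greps that file (grepch <- the file name, if the file contains the string)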
This all works as expected, but when presented with a large number of files, the memory just keeps growing and growing. I have looked into some of the profiling tools offered by Go, but I couldn't figure out how to find my memory leak. I can say that the memory is being used up mostly by bytes.makeSlice.
Can anyone look at the code below and see what I am doing wrong? Besides knowing what is wrong with this particular code, I would also like to learn how to debug this kind of problem on my own in the future, so detailed profiling instructions for a problem such as this would be greatly appreciated.
package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"regexp"
"runtime/pprof"
"strings"
"sync"
)
// Package-level state shared by main and the helper functions below.
var (
// topDir is the directory tree to search; main binds it to the -d flag.
topDir string
// cProf and mProf record whether the -c (CPU profile) and -m (heap profile)
// flags were given.
cProf bool
mProf bool
// cProfFile and mProfFile are the profile output files, assigned by
// cProfFunc and mProfFunc respectively.
cProfFile *os.File
mProfFile *os.File
// fileNames collects every path main receives from the directory walk.
fileNames []string
// fileTypes lists the category names in the order printSummary reports them.
fileTypes []string
// fileLists maps a category name to the files of that category whose
// contents matched regMatch.
fileLists map[string][]string
// cMatch matches paths ending in .c, .h, .cc, .cpp, .c++ or .hpp (any case).
cMatch = regexp.MustCompile(`(?i)^.*\.(?:c|h|cc|cpp|c\+\+|hpp)$`)
// javaMatch matches paths ending in .java or .js (any case).
javaMatch = regexp.MustCompile(`(?i)^.*\.(?:java|js)$`)
// goMatch matches paths ending in .go (any case).
goMatch = regexp.MustCompile(`(?i)^.*\.(?:go)$`)
// buildMatch matches paths ending in .gradle, .mk or .mka (any case).
buildMatch = regexp.MustCompile(`(?i)^.*\.(?:gradle|mk|mka)$`)
// buildMatch2 matches paths whose last "/"-separated component starts with
// "Makefile" (case-sensitive).
buildMatch2 = regexp.MustCompile(`^.*/(?:Makefile[^/\\]*)$`)
// regMatch is the text searched for inside each file: "test" or "debug",
// in any case.
regMatch = regexp.MustCompile(`(?i)(?:test|debug)`)
)
// init allocates the category map before main runs, so that entries can be
// assigned to it (assigning into a nil map would panic).
func init() {
	fileLists = map[string][]string{}
}
func main() {
flag.StringVar(&topDir, "d", ".", "The top level directory to process (default is current directory)")
flag.BoolVar(&cProf, "c", false, "Include if you want to save the CPU profile")
flag.BoolVar(&mProf, "m", false, "Include if you want to save the MEM profile")
flag.Parse()
cProfFunc()
getFilesChan := make(chan string, 1000)
grepFilesChan := make(chan string, 100)
go getFileNamesOverChan(topDir, getFilesChan)
var fileResult string
var grepWg sync.WaitGroup
var categorizeWg sync.WaitGroup
fileTypes = append(fileTypes, "C", "Java", "Go", "Build", "Uncategorized")
categorizeWg.Add(1)
go func(chan string) {
var grepResult string
for grepResult = range grepFilesChan {
if grepResult != "" {
fmt.Printf("Found file %s with text
", grepResult)
var fileType = getFileCategory(grepResult)
fileLists[fileType] = append(fileLists[fileType], grepResult)
}
}
categorizeWg.Done()
}(grepFilesChan)
for fileResult = range getFilesChan {
if fileResult != "" {
fileNames = append(fileNames, fileResult)
grepWg.Add(1)
go func(file string, ch chan string) {
fmt.Printf("Grepping file %s
", file)
grepOverChan(file, ch)
grepWg.Done()
}(fileResult, grepFilesChan)
}
}
grepWg.Wait()
close(grepFilesChan)
categorizeWg.Wait()
printSummary()
mProfFunc()
defer pprof.StopCPUProfile()
defer cProfFile.Close()
}
// cProfFunc starts CPU profiling into cpu_profile.pprof when the -c flag was
// given. On success the open output file is stored in cProfFile; the caller
// must stop the profile and then close that file. On failure a message is
// written to stderr and cProfFile is left unset.
func cProfFunc() {
	if !cProf {
		return
	}
	f, err := os.Create("cpu_profile.pprof")
	if err != nil {
		fmt.Fprintf(os.Stderr, "could not create CPU profile: %v\n", err)
		return
	}
	if err := pprof.StartCPUProfile(f); err != nil {
		fmt.Fprintf(os.Stderr, "could not start CPU profile: %v\n", err)
		f.Close() // nothing was profiled; do not leave an open handle behind
		return
	}
	cProfFile = f
}
// mProfFunc writes a heap profile to mem_profile.pprof when the -m flag was
// given. The file is recorded in mProfFile and closed before returning.
// Failures to create, write or close the file are reported on stderr.
func mProfFunc() {
	if !mProf {
		return
	}
	f, err := os.Create("mem_profile.pprof")
	if err != nil {
		fmt.Fprintf(os.Stderr, "could not create memory profile: %v\n", err)
		return
	}
	mProfFile = f
	if err := pprof.WriteHeapProfile(f); err != nil {
		fmt.Fprintf(os.Stderr, "could not write memory profile: %v\n", err)
	}
	// A failed Close can mean the profile never reached the disk, so it is
	// checked rather than deferred and ignored.
	if err := f.Close(); err != nil {
		fmt.Fprintf(os.Stderr, "could not close memory profile: %v\n", err)
	}
}
func printSummary() {
fmt.Printf("
Processed %d Files
", len(fileNames))
fmt.Println("")
fmt.Println("Found text in the following files:")
for _, fType := range fileTypes {
fmt.Printf("Found text in %d %s Files
", len(fileLists[fType]), fType)
}
/*
for _, fType := range fileTypes {
if len(fileLists[fType]) > 0 {
fmt.Println("")
fmt.Printf("\t%s Files:
", fType)
}
for _, fileName := range fileLists[fType] {
fmt.Printf("\t\t%s
", fileName)
}
}
*/
}
func getFileNamesOverChan(directory string, ch chan string) {
fmt.Printf("Finding files in directory %s
", directory)
var err error
var dirInfo os.FileInfo
dirInfo, err = os.Lstat(directory)
if err != nil {
close(ch)
return
}
if !dirInfo.IsDir() {
close(ch)
return
}
recursiveGetFilesOverChan(directory, ch)
close(ch)
}
func recursiveGetFilesOverChan(dir string, ch chan string) {
dirFile, _ := os.Open(dir)
//handle err
defer dirFile.Close()
dirFileInfo, _ := dirFile.Readdir(0)
//handle err
for _, file := range dirFileInfo {
filePath := fmt.Sprintf("%s%c%s", dir, os.PathSeparator, file.Name())
switch mode := file.Mode(); {
case mode.IsDir():
//is a directory ... recurse
recursiveGetFilesOverChan(filePath, ch)
case mode.IsRegular():
//is a regular file ... send it if it is not a CVS or GIT file
if !strings.Contains(filePath, "/CVS/") && !strings.Contains(filePath, "/.git/") {
fmt.Printf("Found File %s
", filePath)
ch <- filePath
}
case mode&os.ModeSymlink != 0:
//is a symbolic link ... skip it
continue
case mode&os.ModeNamedPipe != 0:
//is a Named Pipe ... skip it
continue
}
}
}
// getFileCategory returns the category name for file: the first of "C",
// "Java", "Go" or "Build" whose pattern matches the path, tested in that
// order, or "Uncategorized" when none matches.
func getFileCategory(file string) string {
	switch {
	case cMatch.MatchString(file):
		return "C"
	case javaMatch.MatchString(file):
		return "Java"
	case goMatch.MatchString(file):
		return "Go"
	case buildMatch.MatchString(file), buildMatch2.MatchString(file):
		// Build files are recognised by extension or by a Makefile* name.
		return "Build"
	}
	return "Uncategorized"
}
// grepOverChan reads file f completely and sends f on ch when its contents
// match regMatch. Nothing is sent for a file without a match. A file that
// cannot be read is reported on stderr and skipped.
func grepOverChan(f string, ch chan string) {
	fileBytes, err := ioutil.ReadFile(f)
	if err != nil {
		fmt.Fprintf(os.Stderr, "could not read %s: %v\n", f, err)
		return
	}
	if regMatch.Match(fileBytes) {
		ch <- f
	}
}
Based on the comments by @JimB on my question, I was able to figure out that this was not a memory leak per se, but a problem of unlimited concurrency. My original code kicked off a grep for every file as soon as it was encountered, without any limit.
I was able to resolve this problem by limiting the number of files that are open for grepping at any one time, using the example provided by http://jmoiron.net/blog/limiting-concurrency-in-go/. That post creates a semaphore channel whose buffer only holds the limit number of messages. A value is written to this channel before a file is opened, and a value is read back from it once the file has been searched. At the end, the program waits until it can fill the semaphore channel to capacity again, which shows that every grep has finished.
Here is the working code that corresponds to my broken original code (see grepConcurrencyLimit and semaphoreChan for the relevant portions):
package main
import (
"flag"
"fmt"
"io/ioutil"
"os"
"regexp"
"runtime/pprof"
"strings"
"sync"
)
// Package-level state shared by main and the helper functions below.
var (
// topDir is the directory tree to search; main binds it to the -d flag.
topDir string
// cProf and mProf record whether the -c (CPU profile) and -m (heap profile)
// flags were given.
cProf bool
mProf bool
// cProfFile and mProfFile are the profile output files, assigned by
// cProfFunc and mProfFunc respectively.
cProfFile *os.File
mProfFile *os.File
// fileNames collects every path main receives from the directory walk.
fileNames []string
// fileTypes lists the category names in the order printSummary reports them.
fileTypes []string
// fileLists maps a category name to the files of that category whose
// contents matched regMatch.
fileLists map[string][]string
// grepConcurrencyLimit is the maximum number of files grepped at the same
// time; main binds it to the -l flag (default 50).
grepConcurrencyLimit int
// cMatch matches paths ending in .c, .h, .cc, .cpp, .c++ or .hpp (any case).
cMatch = regexp.MustCompile(`(?i)^.*\.(?:c|h|cc|cpp|c\+\+|hpp)$`)
// javaMatch matches paths ending in .java or .js (any case).
javaMatch = regexp.MustCompile(`(?i)^.*\.(?:java|js)$`)
// goMatch matches paths ending in .go (any case).
goMatch = regexp.MustCompile(`(?i)^.*\.(?:go)$`)
// buildMatch matches paths ending in .gradle, .mk or .mka (any case).
buildMatch = regexp.MustCompile(`(?i)^.*\.(?:gradle|mk|mka)$`)
// buildMatch2 matches paths whose last "/"-separated component starts with
// "Makefile" (case-sensitive).
buildMatch2 = regexp.MustCompile(`^.*/(?:Makefile[^/\\]*)$`)
// regMatch is the text searched for inside each file: "test" or "debug",
// in any case.
regMatch = regexp.MustCompile(`(?i)(?:test|debug)`)
)
// init allocates the category map before main runs, so that entries can be
// assigned to it (assigning into a nil map would panic).
func init() {
	fileLists = map[string][]string{}
}
func main() {
flag.StringVar(&topDir, "d", ".", "The top level directory to process (default is current directory)")
flag.IntVar(&grepConcurrencyLimit, "l", 50, "The limit of number of files to grep at any one time")
flag.BoolVar(&cProf, "c", false, "Include if you want to save the CPU profile")
flag.BoolVar(&mProf, "m", false, "Include if you want to save the MEM profile")
flag.Parse()
cProfFunc()
getFilesChan := make(chan string, 1000)
grepFilesChan := make(chan string, 100)
// This channel is to ensure that only grepConcurrencyLimit files are ever grepped at any one time
semaphoreChan := make(chan bool, grepConcurrencyLimit)
go getFileNamesOverChan(topDir, getFilesChan)
var fileResult string
var grepWg sync.WaitGroup
var categorizeWg sync.WaitGroup
fileTypes = append(fileTypes, "C", "Java", "Go", "Build", "Uncategorized")
categorizeWg.Add(1)
go func(chan string) {
var grepResult string
for grepResult = range grepFilesChan {
if grepResult != "" {
fmt.Printf("Found file %s with text
", grepResult)
var fileType = getFileCategory(grepResult)
fileLists[fileType] = append(fileLists[fileType], grepResult)
}
}
categorizeWg.Done()
}(grepFilesChan)
for fileResult = range getFilesChan {
if fileResult != "" {
fileNames = append(fileNames, fileResult)
grepWg.Add(1)
// write a boolean to semaphoreChan to take up one of the concurrency limit spots
semaphoreChan <- true
go func(file string, ch chan string) {
fmt.Printf("Grepping file %s
", file)
//run the function to read a boolean from semaphoreChan to release one of the concurrency limit spots
defer func() { <-semaphoreChan }()
grepOverChan(file, ch)
grepWg.Done()
}(fileResult, grepFilesChan)
}
}
// refill semaphoreChan to capacity to wait until all of the final go routines have completed.
for i := 0; i < cap(semaphoreChan); i++ {
semaphoreChan <- true
}
grepWg.Wait()
close(grepFilesChan)
categorizeWg.Wait()
printSummary()
mProfFunc()
defer pprof.StopCPUProfile()
defer cProfFile.Close()
}
// cProfFunc starts CPU profiling into cpu_profile.pprof when the -c flag was
// given. On success the open output file is stored in cProfFile; the caller
// must stop the profile and then close that file. On failure a message is
// written to stderr and cProfFile is left unset.
func cProfFunc() {
	if !cProf {
		return
	}
	f, err := os.Create("cpu_profile.pprof")
	if err != nil {
		fmt.Fprintf(os.Stderr, "could not create CPU profile: %v\n", err)
		return
	}
	if err := pprof.StartCPUProfile(f); err != nil {
		fmt.Fprintf(os.Stderr, "could not start CPU profile: %v\n", err)
		f.Close() // nothing was profiled; do not leave an open handle behind
		return
	}
	cProfFile = f
}
// mProfFunc writes a heap profile to mem_profile.pprof when the -m flag was
// given. The file is recorded in mProfFile and closed before returning.
// Failures to create, write or close the file are reported on stderr.
func mProfFunc() {
	if !mProf {
		return
	}
	f, err := os.Create("mem_profile.pprof")
	if err != nil {
		fmt.Fprintf(os.Stderr, "could not create memory profile: %v\n", err)
		return
	}
	mProfFile = f
	if err := pprof.WriteHeapProfile(f); err != nil {
		fmt.Fprintf(os.Stderr, "could not write memory profile: %v\n", err)
	}
	// A failed Close can mean the profile never reached the disk, so it is
	// checked rather than deferred and ignored.
	if err := f.Close(); err != nil {
		fmt.Fprintf(os.Stderr, "could not close memory profile: %v\n", err)
	}
}
func printSummary() {
fmt.Printf("
Processed %d Files
", len(fileNames))
fmt.Println("")
fmt.Println("Found text in the following files:")
for _, fType := range fileTypes {
fmt.Printf("Found text in %d %s Files
", len(fileLists[fType]), fType)
}
/*
for _, fType := range fileTypes {
if len(fileLists[fType]) > 0 {
fmt.Println("")
fmt.Printf("\t%s Files:
", fType)
}
for _, fileName := range fileLists[fType] {
fmt.Printf("\t\t%s
", fileName)
}
}
*/
}
func getFileNamesOverChan(directory string, ch chan string) {
fmt.Printf("Finding files in directory %s
", directory)
var err error
var dirInfo os.FileInfo
dirInfo, err = os.Lstat(directory)
if err != nil {
close(ch)
return
}
if !dirInfo.IsDir() {
close(ch)
return
}
recursiveGetFilesOverChan(directory, ch)
close(ch)
}
func recursiveGetFilesOverChan(dir string, ch chan string) {
dirFile, _ := os.Open(dir)
//handle err
defer dirFile.Close()
dirFileInfo, _ := dirFile.Readdir(0)
//handle err
for _, file := range dirFileInfo {
filePath := fmt.Sprintf("%s%c%s", dir, os.PathSeparator, file.Name())
switch mode := file.Mode(); {
case mode.IsDir():
//is a directory ... recurse
recursiveGetFilesOverChan(filePath, ch)
case mode.IsRegular():
//is a regular file ... send it if it is not a CVS or GIT file
if !strings.Contains(filePath, "/CVS/") && !strings.Contains(filePath, "/.git/") {
fmt.Printf("Found File %s
", filePath)
ch <- filePath
}
case mode&os.ModeSymlink != 0:
//is a symbolic link ... skip it
continue
case mode&os.ModeNamedPipe != 0:
//is a Named Pipe ... skip it
continue
}
}
}
// getFileCategory returns the category name for file: the first of "C",
// "Java", "Go" or "Build" whose pattern matches the path, tested in that
// order, or "Uncategorized" when none matches.
func getFileCategory(file string) string {
	switch {
	case cMatch.MatchString(file):
		return "C"
	case javaMatch.MatchString(file):
		return "Java"
	case goMatch.MatchString(file):
		return "Go"
	case buildMatch.MatchString(file), buildMatch2.MatchString(file):
		// Build files are recognised by extension or by a Makefile* name.
		return "Build"
	}
	return "Uncategorized"
}
// grepOverChan reads file f completely and sends f on ch when its contents
// match regMatch. Nothing is sent for a file without a match. A file that
// cannot be read is reported on stderr and skipped.
func grepOverChan(f string, ch chan string) {
	fileBytes, err := ioutil.ReadFile(f)
	if err != nil {
		fmt.Fprintf(os.Stderr, "could not read %s: %v\n", f, err)
		return
	}
	if regMatch.Match(fileBytes) {
		ch <- f
	}
}