使用多台机器时的批量插入速度差距

I have a Go utility that reads documents from flat files and bulk-loads them into Couchbase. The application can insert data at up to 21K writes per second when 2 writer threads run on a single machine (the destination is a remote Couchbase cluster with one server on the same network).

But when the 2 writer threads run on 2 different machines (1 thread each), the insertion speed drops to half (10K writes/sec). Since both machines use their own RAM and CPU for insertion, and the utility has shown speeds of up to 25K writes per second, the network doesn’t seem to be the issue (I also checked the network utilization, and it stays below 50 percent when multiple machines are used).

Note: All machines that are used have i7 3.40GHz quad-core processor and 8GB RAM. The total amount of data being inserted is up to 500MB. Bucket Configuration: 5.00GB RAM, Bucket disk I/O priority: High

I need to know what’s causing this speed gap. Please help…

Here is the Code:

package main

import (
    "bufio"
    "encoding/csv"
    "fmt"
    "io"
    "log"
    "strconv"
    "os"
    "sync"
    "runtime"
    "gopkg.in/couchbase/gocb.v1"
)

// bucket is the Couchbase bucket handle shared by the whole program: main
// assigns it after opening the bucket and BulkInsert sends operations through it.
var bucket *gocb.Bucket

// CB_Host is the Couchbase server address that main appends to the
// "couchbase://" scheme when building the connection string.
var CB_Host string

func main() {
    var wg sync.WaitGroup
    CB_Host = <IP Address of Couchbase Server>
    runtime.GOMAXPROCS(runtime.NumCPU())
    cluster, err := gocb.Connect("couchbase://" + CB_Host)  //..........Establish Couchbase Connection
    if err != nil {
        fmt.Println("ERROR CONNECTING COUCHBASE:", err)
    }
    bucket, err = cluster.OpenBucket("BUCKET", "*******")
    if err != nil {
        fmt.Println("ERROR OPENING BUCKET:", err)
    }
    Path := "E:\\Data\\File_"               \\Path of the text file that contains data

    for i := 1; i <= 2; i++{
        wg.Add(1)                                            
        go InsertDataFromFile(Path+strconv.Itoa(i)+".txt", i, &wg)        
    }

    wg.Wait()

        err = bucket.Close()                            //.............. Close Couchbase Connection
        if err != nil {
           fmt.Println("ERROR CLOSING COUCHBASE CONNECTION:", err)
       }
   }

/*-- Main function Ends Here --*/


// InsertDataFromFile reads the '$'-delimited file at Path and upserts its
// records into Couchbase through BulkInsert, 500 documents per bulk call.
// The first field of each record is used as the document key and the second
// field as the document value.
//
// i is the file number shown in the progress messages. wg is marked done when
// the function returns. An unreadable file or a malformed record terminates
// the program via log.Fatal.
func InsertDataFromFile(Path string, i int, wg *sync.WaitGroup) {
	defer wg.Done()

	// Number of documents sent per bulk call.
	const batchSize = 500

	// Open the flat file containing the data.
	csvFile, err := os.Open(Path)
	if err != nil {
		log.Fatalf("opening %q: %v", Path, err)
	}
	defer csvFile.Close()

	reader := csv.NewReader(bufio.NewReader(csvFile))
	reader.Comma = '$'
	reader.LazyQuotes = true

	items := make([]gocb.BulkOp, 0, batchSize)
	fmt.Println("Starting Insertion of File " + strconv.Itoa(i) + "...")
	for {
		line, err := reader.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		// Both the key and the document are required; fail with a clear
		// message instead of an index-out-of-range panic.
		if len(line) < 2 {
			log.Fatalf("%s: record has %d field(s), want at least 2 (key and document)", Path, len(line))
		}

		items = append(items, &gocb.UpsertOp{Key: line[0], Value: line[1]})
		if len(items) == batchSize {
			BulkInsert(&items)
			// Start a fresh slice rather than reusing the one just handed
			// to BulkInsert.
			items = make([]gocb.BulkOp, 0, batchSize)
		}
	}

	// Insert the remaining documents, if any.
	if len(items) > 0 {
		BulkInsert(&items)
	}
	fmt.Println("Insertion of File " + strconv.Itoa(i) + " Completed...")
}

// BulkInsert submits every operation in *item to Couchbase with a single bulk
// call on the package-level bucket and prints any failure to standard output.
// A nil or empty batch is a no-op.
func BulkInsert(item *[]gocb.BulkOp) {
	if item == nil || len(*item) == 0 {
		return
	}

	if err := bucket.Do(*item); err != nil {
		fmt.Println("ERROR PERFORMING BULK INSERT:", err)
		return
	}

	// NOTE(review): in gocb v1 the error returned by Do covers the batch as a
	// whole, while each operation reports its own outcome in its Err field —
	// confirm against the library version in use. Without this check, failed
	// upserts would be counted as successful writes.
	failed := 0
	var firstErr error
	for _, op := range *item {
		upsert, ok := op.(*gocb.UpsertOp)
		if !ok || upsert.Err == nil {
			continue
		}
		if firstErr == nil {
			firstErr = upsert.Err
		}
		failed++
	}
	if failed > 0 {
		fmt.Println("ERROR PERFORMING BULK INSERT:", failed, "of", len(*item), "operations failed; first error:", firstErr)
	}
}