i'm really new to Go, su just bear with me here. I'm trying to write a code for loading mysql data to redis cluster using following :redis-go-cluster, load2redis
This is the code. Its a little bit long, just bear with me here.
package main
import (
"bytes"
"database/sql"
"flag"
// "github.com/garyburd/redigo/redis"
_ "github.com/go-sql-driver/mysql"
//"gopkg.in/redis.v4"
"github.com/chasex/redis-go-cluster"
"log"
"runtime"
// "strings"
"sync"
"time"
)
var client *redis.Cluster
type Task interface {
Execute()
}
type Pool struct {
mu sync.Mutex
size int
tasks chan Task
kill chan struct{}
wg sync.WaitGroup
}
func NewPool(size int) *Pool {
pool := &Pool{
tasks: make(chan Task, 128),
kill: make(chan struct{}),
}
pool.Resize(size)
return pool
}
func (p *Pool) worker() {
defer p.wg.Done()
for {
select {
case task, ok := <-p.tasks:
if !ok {
return
}
task.Execute()
case <-p.kill:
return
}
}
}
func (p *Pool) Resize(n int) {
p.mu.Lock()
defer p.mu.Unlock()
for p.size < n {
p.size++
p.wg.Add(1)
go p.worker()
}
for p.size > n {
p.size--
p.kill <- struct{}{}
}
}
func (p *Pool) Close() {
close(p.tasks)
}
func (p *Pool) Wait() {
p.wg.Wait()
}
func (p *Pool) Exec(task Task) {
p.tasks <- task
}
type RedisTask struct {
Index int
Command string
Key string
Value string
MapData map[string]string
}
func (e RedisTask) Execute() {
log.Println("executing:", e.Key, ",", e.Index)
if e.Command == "SET" {
_,err := redis.String(client.Do("SET", e.Key, e.Value))
checkErr(err, "set error:")
} else if e.Command == "SADD" {
_,err := redis.Strings(client.Do("SADD", e.Key, e.Value))
checkErr(err, "sadd error:")
} else if e.Command == "HMSET" {
_,err := redis.StringMap(client.Do("HMSET", e.Key, e.MapData))
checkErr(err, "hmset error:")
}
// TODO: clean data
}
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
startTime := time.Now().UnixNano() / int64(time.Millisecond)
host := flag.String("s", "localhost:3306", "mysql server host and port ,eg localhost:3306")
username := flag.String("u", "test", "username to login mysql")
password := flag.String("p", "test", "password for mysql")
database := flag.String("d", "test", "database you want to execute query")
query := flag.String("q", "select 1;", "your query sql")
ds := flag.String("ds", "key", "redis structure")
PK := flag.String("pk", "Rkey", "the redis Key in the fields of mysql query result")
//redisHost := flag.String("rs", "localhost:6379", "redis host and port ,eg localhost:6379")
//redisPassword := flag.String("rp", "test", "redis password")
poolSize := flag.Int("size", 10000, "redis pool size")
flag.Parse()
var buf bytes.Buffer = bytes.Buffer{}
buf.WriteString(*username)
buf.WriteString(":")
buf.WriteString(*password)
buf.WriteString("@tcp(")
buf.WriteString(*host)
buf.WriteString(")/")
buf.WriteString(*database)
db, err := sql.Open("mysql", buf.String())
checkErr(err, "connect to mysql error !")
defer db.Close()
poolWorker := NewPool(*poolSize)
// Execute the query
rows, err := db.Query(*query)
checkErr(err, "execute sql error!")
// pool = newPool(*redisHost, *redisPassword, *poolSize)
//client = redis.NewClient(&redis.Options{
// Addr: *redisHost,
// Password: *redisPassword, // no password set
// DB: 0, // use default DB
//})
client,_ = redis.NewCluster(&redis.Options{
StartNodes: []string{"10.x.x.x:6000", "10.x.x.x:6001", "10.x.x.x:6002"},
ConnTimeout: 50 * time.Millisecond,
ReadTimeout: 50 * time.Millisecond,
WriteTimeout: 50 * time.Millisecond,
KeepAlive: 16,
AliveTime: 60 * time.Second,
})
//checkErr(err, "client error:")
//pong, err := client.Ping().Result()
//checkErr(err, "redis client error:")
//log.Println(pong)
columns, err := rows.Columns()
checkErr(err, "get columns error!")
length := len(columns)
values := make([]sql.RawBytes, length)
scanArgs := make([]interface{}, len(values))
for i := range values {
scanArgs[i] = &values[i]
}
count := 0
for rows.Next() {
count += 1
err = rows.Scan(scanArgs...)
checkErr(err, "scan error")
var value string
var key string
var task RedisTask
if *ds == "key" {
key = getStringData(values[0])
value = getStringData(values[1])
if value != "" {
task = RedisTask{
Index: count,
Command: "SET",
Key: key,
Value: value,
}
}
} else if *ds == "set" {
key = getStringData(values[0])
value = getStringData(values[1])
if value != "" {
task = RedisTask{
Index: count,
Command: "SADD",
Key: key,
Value: value,
}
}
} else if *ds == "hash" {
key = getStringData(values[0])
// args := redis.Args{}.Add(key)
m := make(map[string]string)
for i, col := range values {
if col != nil && columns[i] != *PK {
value = getStringData(col)
m[columns[i]] = value
}
}
task = RedisTask{
Index: count,
Command: "HMSET",
Key: key,
MapData: m,
}
}
poolWorker.Exec(task)
}
if err = rows.Err(); err != nil {
panic(err.Error()) // proper error handling instead of panic in your app
}
poolWorker.Close()
poolWorker.Wait()
EndTime := time.Now().UnixNano() / int64(time.Millisecond)
log.Println("======================================== executing time:", EndTime-startTime, " ms, total:", count)
}
func getStringData(data sql.RawBytes) string {
if data == nil {
return ""
}
value := string(data)
return clearBad(value)
}
func clearBad(str string) string {
// str = strings.Trim(str, "`")
// str = strings.Trim(str, "ï½€")
// str = strings.Trim(str, "-")
// str = strings.Trim(str, ".")
// str = strings.Trim(str, " ")
// str = strings.Trim(str, ";")
// str = strings.Trim(str, ",")
// str = strings.Trim(str, ":")
// str = strings.Trim(str, ";")
// str = strings.Trim(str, "'")
// str = strings.Trim(str, "!")
return str
}
func checkErr(err error, msg string) {
if err != nil {
log.Fatalln(msg, err)
}
}
When i'm executing it, i'm getting following exception:
./rak -u user -p user -s 10.X.X.X:8080 -d test -q "SELECT CONCAT( 'student:', c.sid ) Rkey, c.sname SNAME, c.saddress SADDRESS, c.sage SAGE FROM STUDENT c WHERE c.sid > 0;" -ds hash -size 1200
2017/07/21 10:29:09 rak.go:93: executing: student:2 , 2
2017/07/21 10:29:09 rak.go:93: executing: student:1 , 1
2017/07/21 10:29:09 rak.go:93: executing: student:3 , 3
2017/07/21 10:29:09 rak.go:268: hmset error: Do: unknown type map[string]string
$
Can somebody explain to me what i'm doing wrong here? I'd be very grateful.
As pointed out, Do
does not work with maps. This is one way you could fix it.
} else if e.Command == "HMSET" {
// Build up a string slice to hold the key value pairs
args := make([]string, 0, len(e.MapData) * 2)
for k, v := range e.MapData {
args = append(args, k, v)
}
_,err := redis.StringMap(client.Do("HMSET", e.Key, args...))
checkErr(err, "hmset error:")
}
The Do method maps to the Redis command set and arguments are expected in the same way. For example.
127.0.0.1:6379> HMSET myKey foo bar baz boff
OK
127.0.0.1:6379> HGETALL myKey
1) "foo"
2) "bar"
3) "baz"
4) "boff"
127.0.0.1:6379>
The same map-set operation using the redis client in your code would be
client.Do("HMSET", "myKey", "foo", "bar", "baz", "boff")
When the arguments for keys and values of the map are dynamic, the most straight forward way is
client.Do("HMSET", "myKey", []string{"foo", "bar", "baz", "boff"}...)
which is exactly what the first code block above does.
Accepted values for Do function arguements are int64
, float64
, string
or []byte
.Since you passed a map it failed with error unknown type %T
Here is a reference to the respective code that does it