需要加速的代码,是两个嵌套的for循环,单次循环之间可以并行处理,没有数据依赖,应该是可以用GPU进行加速的。
import msgpack
import torch
from torch.nn.utils.rnn import pad_sequence
from pathlib import Path
import csv
import queue
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from numba import cuda, jit
from numba.typed import List
import numpy as np
# Load the raw trip records and the aggregated trip records from disk.
dct1 = msgpack.unpackb(Path('/home/suned/data/giscup_2021_1/msgpack/20200801.msgpack').read_bytes())
dct2 = msgpack.unpackb(Path('/home/suned/data/giscup_2021_1/totalmsgpack/1_2.msgpack').read_bytes())
def batch2tensor(batch, name, log_trans=False, long_tensor=False):
    """Extract field ``name`` from every record in ``batch`` and pack it into a tensor.

    Args:
        batch: iterable of dict-like records, each containing the key ``name``.
        name: field to extract from each record.
        log_trans: if True, apply a natural-log transform to the result
            (used to normalise skewed travel-time distributions).
        long_tensor: if True, cast values to int and return a LongTensor;
            otherwise return a FloatTensor.

    Returns:
        A 1-D torch tensor with one entry per record.
    """
    values = [item[name] for item in batch]
    if long_tensor:  # idiomatic truthiness instead of '== True'
        x = torch.LongTensor([int(v) for v in values])
    else:
        x = torch.FloatTensor(values)
    if log_trans:
        x = torch.log(x)
    return x
# Normalisation statistics (log-space) precomputed over the training data.
# 'eta' here is really the observed travel time (ATA), i.e. the training label.
eta_min, eta_max, eta_mean, eta_std = (2.3978952727983707, 9.371353167823885, 6.553886963677842, 0.5905307292899195)
# simple_eta is the sum of the average per-link travel times at departure time.
simple_eat_min, simple_eat_max, simple_eat_mean, simple_eat_std = (
    0.6931471805599453, 9.320180837655714, 6.453206241137908, 0.5758803681400783)

# Z-score the log travel times (zero mean, unit std) and convert straight to
# numpy, since everything downstream is numba/numpy based.
eta1 = ((batch2tensor(dct1, 'eta', log_trans=True) - eta_mean) / eta_std).numpy()
eta2 = ((batch2tensor(dct2, 'eta', log_trans=True) - eta_mean) / eta_std).numpy()
simple_eta1 = ((batch2tensor(dct1, 'simple_eta', log_trans=True) - simple_eat_mean) / simple_eat_std).numpy()

# First link of each trip in dct1, and the first 10 links of each trip in
# dct2, padded to equal length.
link_start = pad_sequence([torch.LongTensor([item['link_id'][0]]) for item in dct1],
                          batch_first=True).numpy()
link_start5 = pad_sequence([torch.LongTensor(item['link_id'][0:10]) for item in dct2],
                           batch_first=True).numpy()
# Same treatment for the trailing links (last link / last 10 links).
link_end = pad_sequence([torch.LongTensor([item['link_id'][-1]]) for item in dct1],
                        batch_first=True).numpy()
link_end5 = pad_sequence([torch.LongTensor(item['link_id'][-10:]) for item in dct2],
                         batch_first=True).numpy()
@jit(nopython=True)
def traj_judge(link_start, link_start5, link_end, link_end5, eta):
    """For each candidate trip j, keep it when its start link matches somewhere in
    `link_start` AND its end link matches somewhere in `link_end`; accumulate the
    boolean match masks and the label eta[j].  Returns the mean of the kept
    labels plus the concatenated origin/destination masks.

    NOTE(review): np.append inside the loop reallocates the whole array every
    iteration (quadratic growth); collecting rows and stacking once would be
    linear.  NOTE(review): when no trip matches, etaOD stays empty and
    etaOD.mean() is NaN -- presumably the caller substitutes simple_eta in that
    case; confirm.
    """
    # Accumulators start as 0-row arrays so np.append can grow them row by row.
    trajO = np.empty(shape=(0, 10))
    trajD = np.empty(shape=(0, 10))
    trajOD = np.empty(shape=(0, 20))
    etaOD = np.empty(shape=(0, 1))
    etaODavg = np.empty(shape=(0, 1))
    for j in range(0, len(link_start5)):
        # A trip matches when at least one element of its padded start-link row
        # equals link_start, and likewise for the end link.
        if ((link_start == link_start5[j]).sum()) >= 1 and ((link_end == link_end5[j]).sum()) >= 1:
            trajO = np.append(trajO, [link_start == link_start5[j]], axis=0)
            trajD = np.append(trajD, [link_end == link_end5[j]], axis=0)
            etaOD = np.append(etaOD, [[eta[j]]], axis=0)
        # Commented-out alternative branch kept from the original author:
        '''elif((link_start==link_start5[j]).long().sum())==0 or ((link_end==link_end5[j]).long().sum())==0:
trajO1=[torch.tensor([False, False, False, False, False, False, False, False, False, False])]
trajD1=[torch.tensor([False, False, False, False, False, False, False, False, False, False])]
etaOD1=[torch.tensor([0])]'''
    # All-False fallback rows keep the concatenate shapes valid when nothing matched.
    if len(trajO) == 0:
        trajO = np.array([[False, False, False, False, False, False, False, False, False, False]])
    if len(trajD) == 0:
        trajD = np.array([[False, False, False, False, False, False, False, False, False, False]])
    trajOD = np.concatenate((trajO, trajD), axis=1)
    # Mean of the matched labels; NaN when etaOD is empty (see docstring note).
    etaODavg = etaOD.mean()
    return etaODavg,trajOD
'''@jit('float64()',nopython=True)
def var1():
trajlen1 = np.empty(shape=(0, 1))
return trajlen1
@jit('float64()',nopython=True)
def var2():
etaODAVG1 = np.empty(shape=(0, 1))
return etaODAVG1'''
@cuda.jit
def gpurun(N, etaODAVG, trajlen, trajOD, etaODavg,trajO,trajD,etaOD,etaODavg1,link_start, link_start5, link_end, link_end5, eta):
    """Grid-stride port of traj_judge intended to process all N trips on the GPU.

    NOTE(review): this kernel cannot compile under @cuda.jit as written.
    np.append / np.array / np.concatenate allocate new arrays, and NumPy
    array-creation functions are not supported inside CUDA kernels.  The
    kernel also reads the host globals simple_eta1 / eta1 (NumPy arrays on the
    host), which device code cannot reach, and rebinding the output parameters
    (trajO = ...) only changes a local name -- results never leave the kernel.
    The standard fix is to preallocate fixed-size device output arrays on the
    host and have each thread i write into its own row.
    """
    # Flat index of this thread and total thread count: grid-stride loop so a
    # fixed launch (12 x 256) covers arbitrarily large N.
    idxWithinGrid = cuda.threadIdx.x + cuda.blockIdx.x * cuda.blockDim.x
    gridStride = cuda.gridDim.x * cuda.blockDim.x
    for i in range(idxWithinGrid, N, gridStride):
        for j in range(0, len(link_start5)):
            # Same O/D matching rule as traj_judge, now per trip i.
            if ((link_start[i] == link_start5[j]).sum()) >= 1 and ((link_end[i] == link_end5[j]).sum()) >= 1:
                trajO = np.append(trajO, [[link_start[i] == link_start5[j]]], axis=0)
                trajD = np.append(trajD, [[link_end[i] == link_end5[j]]], axis=0)
                etaOD = np.append(etaOD, [[eta[j]]], axis=0)
            # Commented-out alternative branch kept from the original author:
            '''elif((link_start==link_start5[j]).long().sum())==0 or ((link_end==link_end5[j]).long().sum())==0:
trajO1=[torch.tensor([False, False, False, False, False, False, False, False, False, False])]
trajD1=[torch.tensor([False, False, False, False, False, False, False, False, False, False])]
etaOD1=[torch.tensor([0])]'''
        # All-False fallback rows when trip i matched nothing.
        if len(trajO) == 0:
            trajO = np.array([[False, False, False, False, False, False, False, False, False, False]])
        if len(trajD) == 0:
            trajD = np.array([[False, False, False, False, False, False, False, False, False, False]])
        trajOD = np.concatenate((trajO, trajD), axis=1)
        etaODavg = etaOD.mean()
        # Fall back to the simple (per-link average) ETA when only the
        # fallback row is present, i.e. no real match was found.
        if len(trajOD) == 1:
            etaODavg = simple_eta1[i]
        else:
            etaODavg = etaODavg
        etaODAVG = np.append(etaODAVG, etaODavg, axis=0)
        trajlen = np.append(trajlen, [[len(trajOD)]], axis=0)
        # NOTE(review): multi-argument print of Python objects is also not
        # supported inside CUDA kernels.
        print('write:', i, 'trajlen:', len(trajOD), 'etaODAVG:', etaOD.mean(), 'simpale_eta:', simple_eta1[i], 'eta1:',
              eta1[i])
def main():
    """Allocate device buffers and launch the gpurun kernel.

    NOTE(review): all buffers below are allocated with zero rows; a CUDA
    kernel cannot grow device arrays (there is no np.append on the device),
    so outputs should instead be preallocated with one row per trip and
    written by index.
    """
    etaODAVG = cuda.device_array(shape=(0, 1))
    trajlen = cuda.device_array(shape=(0, 1))
    trajOD = cuda.device_array(shape=(0, 20))
    etaODavg = cuda.device_array(shape=(0, 1))
    trajO = cuda.device_array(shape=(0, 10))
    trajD = cuda.device_array(shape=(0, 10))
    etaOD = cuda.device_array(shape=(0, 1))
    etaODavg1 = cuda.device_array(shape=(0, 1))
    # Launch 12 blocks x 256 threads.  NOTE(review): N = len(link_start) + 1
    # lets the last iteration index link_start[len(link_start)], one past the
    # end -- looks like an off-by-one; confirm the intended range.
    gpurun[12, 256](len(link_start) + 1, etaODAVG, trajlen, trajOD, etaODavg,trajO,trajD,etaOD,etaODavg1,link_start, link_start5, link_end, link_end5, eta2)
if __name__ == '__main__':
    main()
# NOTE(review): the code below is OUTSIDE the __main__ guard, so it also runs
# on import; and etaODAVG / trajlen are locals of main(), so they are not
# defined at module scope here -- this raises NameError.  main() would need to
# return them (and msgpack cannot serialise numpy/device arrays directly;
# convert with .tolist() first).
msg_path1 = Path('/home/suned/data/giscup_2021_1/totalmsgpack/20200801avg.msgpack')
msg_path1.parent.mkdir(parents=True, exist_ok=True)  # parents: create missing parent dirs; exist_ok: no error if the dir already exists.
msg_path2 = Path('/home/suned/data/giscup_2021_1/totalmsgpack/20200801trajlen.msgpack')
msg_path2.parent.mkdir(parents=True, exist_ok=True)
with open(msg_path1, 'wb') as f1:
    packed = msgpack.packb(etaODAVG)  # msgpack: compact binary serialisation, JSON-like but smaller.
    f1.write(packed)
with open(msg_path2, 'wb') as f2:
    packed = msgpack.packb(trajlen)  # msgpack: compact binary serialisation, JSON-like but smaller.
    f2.write(packed)
在没有用cuda.jit装饰之前,代码是可以正常运行的,只是比较慢,而且目前用到的只是测试数据,后期可能会使用更大的数据。
目前调试遇到很多问题,貌似主要是对数据类型的不支持,不知道是否有擅长这方面的朋友可以帮忙解决一下。
数据在此:链接: https://pan.baidu.com/s/1J7ii31wCm3nhp9mNuZ4Akw 提取码: mi63
不是所有运算都可以加速
from numba import cuda
import numpy as np
@cuda.jit
def multiply_kernel(x, out):
    """Element-wise doubling on the GPU: out[i] = 2 * x[i], one thread per element."""
    idx = cuda.grid(1)  # flat index of this thread within the 1-D launch grid
    # Bounds guard: without it, any launch whose grid is larger than the
    # array makes surplus threads write out of bounds.
    if idx < x.size:
        out[idx] = x[idx] * 2
n = 4096
x = np.arange(n, dtype=np.int32)

# Stage the input on the GPU and allocate a matching output buffer there.
d_x = cuda.to_device(x)
d_out = cuda.device_array_like(d_x)

# 32 blocks x 128 threads = 4096 threads, exactly one per element.
blocks_per_grid = 32
threads_per_block = 128
multiply_kernel[blocks_per_grid, threads_per_block](d_x, d_out)

cuda.synchronize()  # block until the kernel has finished
print(d_out.copy_to_host())  # copy the result back to the host and show it
参考:
https://www.cnblogs.com/Tom-Ren/p/10024278.html
不是所有运算都可以加速
CUDA 核函数里不支持的运算(例如会动态分配数组的 NumPy 函数)是无法直接用 @cuda.jit 加速的,只能改写算法或留在 CPU 上执行。
我之前也用numba加速,发现好多情况都用不了,然后就再也不用他了T_T
建议拆开
先执行一下这个看看你是不是能正常加速
import time
from numba import jit
import numpy as np
def my_add(a,b):
    """Plain-Python baseline addition used for the timing comparison."""
    total = a + b
    return total
# CPU acceleration via numba's jit compiler.
@jit
def my_numba_add(x, y):
    """JIT-compiled addition; numerically identical to my_add."""
    result = x + y
    return result
def test(n):
    """Time plain-Python addition against the @jit-compiled version.

    Args:
        n: number of elements in the random test vectors.

    Prints the wall-clock time of each variant and the speed-up factor.
    """
    # BUG FIX: the parameter n was previously ignored and the size was
    # hard-coded to 1000; use n so the benchmark actually scales.
    a = np.random.random(n)
    b = np.random.random(n)

    tic1 = time.time()
    my_add(a, b)
    t1 = time.time() - tic1
    print('python time:', t1)

    # NOTE: the first call to a @jit function includes compilation time;
    # warm it up beforehand for a fair steady-state comparison.
    tic2 = time.time()
    my_numba_add(a, b)
    t2 = time.time() - tic2
    print('Numba time:', t2)
    # Guard against t2 == 0 from timer resolution (ZeroDivisionError before).
    if t2 > 0:
        print('Numba accelerated {} times'.format(t1 / t2))
加上这个试一试
from numba import cuda
import numpy as np
import math
from time import time
@cuda.jit
def gpu_add(a, b, result, n):
    """Element-wise vector add on the GPU: result[i] = a[i] + b[i] for i < n."""
    idx = cuda.grid(1)  # equivalent to threadIdx.x + blockDim.x * blockIdx.x
    # Surplus threads (grid larger than n) do nothing.
    if idx < n:
        result[idx] = a[idx] + b[idx]
def main():
    """Benchmark a single-stream GPU vector add against a 5-stream version."""
    n = 20000000
    x = np.arange(n).astype(np.int32)
    y = 2 * x

    # ---- single default stream -------------------------------------------
    start = time()
    x_device = cuda.to_device(x)
    y_device = cuda.to_device(y)
    out_device = cuda.device_array(n)
    threads_per_block = 1024
    blocks_per_grid = math.ceil(n / threads_per_block)
    gpu_add[blocks_per_grid, threads_per_block](x_device, y_device, out_device, n)
    gpu_result = out_device.copy_to_host()
    cuda.synchronize()
    print("gpu vector add time " + str(time() - start))

    # ---- five concurrent streams -----------------------------------------
    start = time()
    number_of_streams = 5
    # Each stream handles 1/5 of the data ('//' is integer division).
    segment_size = n // number_of_streams
    stream_list = list()
    for i in range(0, number_of_streams):
        stream = cuda.stream()
        stream_list.append(stream)
    threads_per_block = 1024
    blocks_per_grid = math.ceil(segment_size / threads_per_block)
    streams_gpu_result = np.empty(n)

    # BUG FIX: the original allocated ONE output buffer shared by all five
    # streams, so concurrent kernels raced on it, and it read each
    # asynchronous copy_to_host result before the stream had finished.
    # Give every stream its own output buffer and synchronize before
    # gathering the results.
    out_device_list = []
    for i in range(0, number_of_streams):
        # Different slices go to different streams so the copies and kernels overlap.
        x_i_device = cuda.to_device(x[i * segment_size : (i + 1) * segment_size], stream=stream_list[i])
        y_i_device = cuda.to_device(y[i * segment_size : (i + 1) * segment_size], stream=stream_list[i])
        out_i_device = cuda.device_array(segment_size, stream=stream_list[i])
        gpu_add[blocks_per_grid, threads_per_block, stream_list[i]](
            x_i_device,
            y_i_device,
            out_i_device,
            segment_size)
        out_device_list.append(out_i_device)
    cuda.synchronize()  # wait for every stream's kernels and copies to finish
    for i in range(0, number_of_streams):
        streams_gpu_result[i * segment_size : (i + 1) * segment_size] = out_device_list[i].copy_to_host()
    print("gpu streams vector add time " + str(time() - start))
if __name__ == "__main__":
    # Run the benchmark only when executed as a script, not on import.
    main()