python,事务,多线程

问题:多线程和事务

       @action(methods=['post'], detail=False)
    def uploadTest(self, request):
        # 目标:1.假如每次随机数都为3 ,则能完整插入所有数据,msgList返回空数组
        #      2.随机数出现有一次2/5的倍数,则希望统计出所有报错的随机数,并且数据库保持没有数据插入
        # 当前能实现1,也能实现2的返回所有错误情况,但2时,主线程保持了原子性,子线程没有
        data = {'code': 0, 'msg': '', 'data': []}
        msgList = []
        try:
            with transaction.atomic():
                # 主线程会有一次数据库操作
                connection.cursor().execute("INSERT INTO upload_main (id, name,age) VALUES (%s, %s, %s)", (666, 'name', 666))
                for i in range(0, 5):
                    rand = random.randint(1, 9)
                    print(rand)
                    tempList = pool.submit(uploadSun, rand , msgList)
                msgList = tempList.result()
                if len(msgList)>0:
                    raise Exception('error')
        except Exception as e:
            data["data"] = msgList
            return JsonResponse(data)
        data['msg']='success'
        return JsonResponse(data)

def uploadSun(random,msgList):
    # 子线程先判断,完成后进行第一次数据库操作
    if random % 2 == 0:
        # 模拟如果该子线程报错,则返回报错信息统计返回主线程
        msgList.append(random)
    else:
        connection.cursor().execute("INSERT INTO upload_not2 (id, name,age) VALUES (%s, %s, %s)", (random, 'name2', random))
    # 子线程完成后进行第二次数据库操作
    if random % 5 == 0:
        # 模拟如果该子线程报错,则返回报错信息统计返回主线程
        msgList.append(random)
    else:
        # 实际业务第二次是第一次的关联表,需要加上第一次插入后的id
        connection.cursor().execute("INSERT INTO upload_not5 (id, name,age) VALUES (%s, %s, %s)", (random, 'name2', random))
    return msgList

目前:

img

img

img

img

img

这段代码中涉及了多线程和事务,主要的问题在于子线程中的数据库操作无法参与到主线程的事务中,因此当子线程执行失败后,主线程中已经成功的数据库操作也无法回滚。

为了解决这个问题,可以将主线程和子线程中的数据库操作都放在同一个事务中,即在with transaction.atomic()块中处理主线程和子线程的数据库操作,这样无论哪个线程中的操作失败了都能进行回滚,代码如下:

from django.db import transaction

@action(methods=['post'], detail=False)
def uploadTest(self, request):
    data = {'code': 0, 'msg': '', 'data': []}
    msgList = []
    try:
        with transaction.atomic():
            # 主线程会有一次数据库操作
            connection.cursor().execute("INSERT INTO upload_main (id, name,age) VALUES (%s, %s, %s)", (666, 'name', 666))
            # 创建一个新的连接,用于子线程的数据库操作
            with connection.cursor() as cursor:
                for i in range(0, 5):
                    rand = random.randint(1, 9)
                    print(rand)
                    tempList = pool.submit(uploadSun, rand , msgList, cursor)
                msgList = tempList.result()
            if len(msgList) > 0:
                raise Exception('error')
    except Exception as e:
        data["data"] = msgList
        return JsonResponse(data)
    data['msg'] = 'success'
    return JsonResponse(data)
 
def uploadSun(random, msgList, cursor):
    # 子线程先判断,完成后进行第一次数据库操作
    if random % 2 == 0:
        # 模拟如果该子线程报错,则返回报错信息统计返回主线程
        msgList.append(random)
    else:
        cursor.execute("INSERT INTO upload_not2 (id, name,age) VALUES (%s, %s, %s)", (random, 'name2', random))
    # 子线程完成后进行第二次数据库操作
    if random % 5 == 0:
        # 模拟如果该子线程报错,则返回报错信息统计返回主线程
        msgList.append(random)
    else:
        # 实际业务第二次是第一次的关联表,需要加上第一次插入后的id
        cursor.execute("INSERT INTO upload_not5 (id, name,age) VALUES (%s, %s, %s)", (random, 'name2', random))
    return msgList

在这个新的代码中,我们首先打开了一个新的连接cursor,用于子线程的数据库操作,然后将这个连接作为参数传递给uploadSun函数。在主线程中,我们将整个处理流程放在with transaction.atomic()块中,包括主线程和子线程的数据库操作。这样,当任何一个线程中的数据库操作失败时,整个事务都会回滚,从而保证了数据的一致性。

您提供的代码似乎是一个 Django 视图函数,用于处理将数据上传到数据库的 POST 请求。它还创建多个线程来执行其他数据库操作。

为了确保数据的一致性和完整性,在使用数据库时需要使用事务。事务允许您将一组数据库操作分组到单个工作单元中,该工作单元作为一个整体要么成功,要么失败。在 Django 的上下文中,您可以使用 'transaction.atomic上下文管理器,以确保其中的所有数据库操作都被视为单个事务。

在你的代码中,你正在使用'transaction.atomic将数据库操作包装在主线程中,这是一个良好的开端。但是,您不使用它来包装子线程中的数据库操作。这意味着,如果子线程中发生错误,主线程仍将提交它对数据库所做的更改,这可能导致数据不一致。

要解决此问题,您应该将uploadSun功能在单独的with transaction.atomic():块,以确保将其执行的所有数据库操作视为单个事务。此外,应捕获子线程中发生的任何异常并将其传播到主线程,以便主线程可以中止事务并返回相应的错误消息。

下面是代码的更新版本,应解决这些问题:

from concurrent.futures import ThreadPoolExecutor
import random
from django.db import connection, transaction
from django.http import JsonResponse

pool = ThreadPoolExecutor(5)

@transaction.atomic()
def uploadSun(random, msgList):
    try:
        with transaction.atomic():
            if random % 2 == 0:
                msgList.append(random)
            else:
                connection.cursor().execute("INSERT INTO upload_not2 (id, name,age) VALUES (%s, %s, %s)", (random, 'name2', random))
            if random % 5 == 0:
                msgList.append(random)
            else:
                connection.cursor().execute("INSERT INTO upload_not5 (id, name,age) VALUES (%s, %s, %s)", (random, 'name2', random))
    except Exception as e:
        # Catch any exceptions and propagate them to the main thread
        msgList.append(random)
        msgList.append(str(e))

    return msgList

def uploadTest(request):
    data = {'code': 0, 'msg': '', 'data': []}
    msgList = []
    try:
        with transaction.atomic():
            connection.cursor().execute("INSERT INTO upload_main (id, name,age) VALUES (%s, %s, %s)", (666, 'name', 666))
            futures = [pool.submit(uploadSun, random.randint(1, 9), msgList) for _ in range(5)]
            for future in futures:
                msgList = future.result()
                if isinstance(msgList[-1], str):
                    # If an exception was raised in a child thread, propagate it to the main thread
                    raise Exception(msgList[-1])
            if len(msgList) > 0:
                raise Exception('error')
    except Exception as e:
        data["data"] = msgList
        return JsonResponse(data)
    data['msg'] = 'success'
    return JsonResponse(data)

在此更新版本中,uploadSun函数包装在单独的with transaction.atomic():块,以确保将其执行的所有数据库操作视为单个事务。子线程中发生的任何异常都将被捕获并追加到msgList,以及相应的随机数。
在uploadTest函数,futures列表包含“pool.submit ”的结果

根据代码和错误提示,可能存在多个问题。下面提供一些可能的解决方案:

1 数据库连接问题:在多线程环境下,使用数据库连接需要确保每个线程都有独立的连接,可以考虑使用连接池来管理连接。此外,在使用完连接后需要及时关闭连接以避免资源泄漏。可以尝试使用Django自带的连接池工具django-db-connection-pool。

2 线程池中的函数调用问题:在函数uploadSun中,如果random % 2 == 0和random % 5 == 0同时成立,会导致msgList被添加两次,从而可能导致线程安全问题。可以尝试将两次数据库操作分别放在不同的函数中,分别提交到线程池中执行,从而避免竞争条件。

3 异常处理问题:当前的代码在子线程中抛出异常时,不会将异常传递回主线程,因此可能导致程序崩溃。可以考虑在uploadSun函数中捕获异常并将异常信息添加到msgList中返回给主线程,以便主线程能够得到完整的错误信息。同时,需要在主线程中对异常进行捕获并处理。

4 主线程中对线程池结果的处理问题:当前的代码中,主线程对线程池的结果处理存在问题。tempList在循环中被覆盖,最终只保留了最后一次提交的任务的结果。可以将tempList定义为列表,每次提交任务后将任务的Future对象添加到列表中,然后在循环结束后依次调用result方法获取各个任务的执行结果。

下面是一个可能的修改方案,用于参考:

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

@action(methods=['post'], detail=False)
def uploadTest(self, request):
    data = {'code': 0, 'msg': '', 'data': []}
    msgList = []
    futures = []
    try:
        with transaction.atomic():
            connection.cursor().execute("INSERT INTO upload_main (id, name,age) VALUES (%s, %s, %s)", (666, 'name', 666))
            for i in range(0, 5):
                rand = random.randint(1, 9)
                print(rand)
                futures.append(pool.submit(uploadSun, rand))
            wait(futures, return_when=FIRST_COMPLETED)
            for future in futures:
                msgList.extend(future.result())
            if len(msgList) > 0:
                raise Exception('error')
    except Exception as e:
        data["data"] = msgList
        return JsonResponse(data)
    data['msg'] = 'success'
    return JsonResponse(data)

def uploadSun(random):
    msgList = []
    try:
        if random % 2 == 0:
            msgList.append(random)
        else:
            connection.cursor().execute("INSERT INTO upload_not2 (id, name,age) VALUES (%s, %s, %s)", (random, 'name

如果对您有帮助,请给与采纳,谢谢。

这段代码涉及到多线程和事务的问题。可以看到,主线程使用了事务,但是子线程没有。这样可能导致主线程和子线程操作的数据不一致。

具体来说,如果子线程报错,会将报错的随机数添加到 msgList 中,但是在主线程中进行判断时,msgList 并没有加上子线程的结果。因此,即使子线程出错,主线程依然会提交事务,导致数据不一致。

解决这个问题的方法是,让子线程也参与到事务中。具体来说,可以使用 Django 的 atomic 装饰器,将 uploadSun 方法也包裹在事务中。同时,在主线程中获取 tempList.result() 时,需要在循环中不断地将结果加到 msgList 中。

下面是修改后的代码:

@action(methods=['post'], detail=False)
def uploadTest(self, request):
    data = {'code': 0, 'msg': '', 'data': []}
    msgList = []
    try:
        with transaction.atomic():
            connection.cursor().execute("INSERT INTO upload_main (id, name,age) VALUES (%s, %s, %s)", (666, 'name', 666))
            futures = []
            for i in range(0, 5):
                rand = random.randint(1, 9)
                print(rand)
                future = pool.submit(uploadSun, rand)
                futures.append(future)

            for future in futures:
                result = future.result()
                msgList.extend(result)

            if len(msgList) > 0:
                raise Exception('error')
    except Exception as e:
        data["data"] = msgList
        return JsonResponse(data)
    data['msg'] = 'success'
    return JsonResponse(data)

@transaction.atomic
def uploadSun(random):
    msgList = []
    if random % 2 == 0:
        msgList.append(random)
    else:
        connection.cursor().execute("INSERT INTO upload_not2 (id, name,age) VALUES (%s, %s, %s)", (random, 'name2', random))

    if random % 5 == 0:
        msgList.append(random)
    else:
        connection.cursor().execute("INSERT INTO upload_not5 (id, name,age) VALUES (%s, %s, %s)", (random, 'name2', random))

    return msgList


不知道你解决了没有?如果没有解决,我们可以聊聊。

前面的回答给的都是其中的一个代码片段,或许你需要调试。

根据代码和注释,你的目标是在多线程情况下,对数据库进行插入操作,并且在出现特定随机数的情况下能够回滚所有数据库操作。同时,你想要通过主线程将出现错误的随机数收集起来返回给前端。

多线程是一种并发编程的技术,它可以让多个线程同时执行不同的任务,从而提高程序的性能和效率。在这个代码中,您使用了线程池(pool)来创建多个子线程,每个子线程都会执行uploadSun函数。

然而,多线程也带来了一些问题,特别是在共享资源的情况下。在这个代码中,子线程和主线程都会访问同一个数据库连接,这可能会导致竞争条件和数据不一致的问题。

为了解决这个问题,你使用了事务。事务是一组数据库操作,这些操作被当做一个单独的工作单元来处理,要么全部执行成功,要么全部失败回滚。在这个代码中,你使用了transaction.atomic()来包装整个数据库操作,这样就能保证如果其中任何一个操作失败,所有操作都会被回滚。

但是,在当前的代码中,子线程并没有被包含在事务中,因此在出现错误的情况下,子线程插入的数据不会被回滚。为了解决这个问题,你需要将子线程的操作也包含在事务中,或者使用分布式事务来保证所有的操作都能被回滚。

总之,多线程和事务是编写高效、可靠数据库程序的两个关键概念。在使用它们时,需要小心处理并发问题,同时也要考虑如何保证数据的一致性和可靠性。

https://www.baidu.com/link?url=VIXUeyIqYVKxmprQnGP_hkbj-2r0l6HK-Nyk1t4KYQydJ_KDuaMpeRsIJ85lxquJ2x2mFCsOUItAGQaOFOVWIGeG2G0n1tPNA6e3EduAVC7&wd=&eqid=a4a1890a0008182d0000000263eee03a

这是修改后的代码:

import random
import threading
from django.http import JsonResponse
from django.db import connection, transaction

def uploadSun(random, msgList):
    # 子线程先判断,完成后进行第一次数据库操作
    if random % 2 == 0:
        # 模拟如果该子线程报错,则返回报错信息统计返回主线程
        msgList.append(random)
    else:
        with connection.cursor() as cursor:
            cursor.execute("INSERT INTO upload_not2 (id, name, age) VALUES (%s, %s, %s)", (random, 'name2', random))
    # 子线程完成后进行第二次数据库操作
    if random % 5 == 0:
        # 模拟如果该子线程报错,则返回报错信息统计返回主线程
        msgList.append(random)
    else:
        with connection.cursor() as cursor:
            cursor.execute("INSERT INTO upload_not5 (id, name, age) VALUES (%s, %s, %s)", (random, 'name2', random))
    return msgList

def handle_upload(request):
    # 目标:1.假如每次随机数都为3 ,则能完整插入所有数据,msgList返回空数组
    #      2.随机数出现有一次2/5的倍数,则希望统计出所有报错的随机数,并且数据库保持没有数据插入
    # 当前能实现1,也能实现2的返回所有错误情况,但2时,主线程保持了原子性,子线程没有
    data = {'code': 0, 'msg': '', 'data': []}
    msgList = []
    try:
        with transaction.atomic():
            # 主线程会有一次数据库操作
            with connection.cursor() as cursor:
                cursor.execute("INSERT INTO upload_main (id, name, age) VALUES (%s, %s, %s)", (666, 'name', 666))
            thread_list = []
            for i in range(0, 5):
                rand = random.randint(1, 9)
                print(rand)
                t = threading.Thread(target=uploadSun, args=(rand, msgList))
                thread_list.append(t)
                t.start()
            for t in thread_list:
                t.join()
            if len(msgList)>0:
                raise Exception('error')
    except Exception as e:
        data["data"] = msgList
        return JsonResponse(data)
    data['msg']='success'
    return JsonResponse(data)

这段代码使用了多线程和事务,在主线程和子线程中分别进行了数据库操作,但是在子线程的数据库操作和主线程中的事务没有保持一致性,因此可能会出现一些问题。具体来说,如果在子线程中出现了错误,虽然在子线程中对 msgList 的修改会被保留下来,但是在主线程中无法捕捉到这些错误,因为主线程中的事务已经提交了,因此子线程的操作不会被回滚。

为了解决这个问题,可以考虑将子线程的数据库操作也放在事务中进行,这样就能保证子线程和主线程的操作都在同一个事务中,出现异常时整个事务都会回滚。具体来说,可以将 uploadSun 函数中的两个数据库操作封装在一个 with transaction.atomic(): 块中,这样就能保证子线程的操作和主线程中的操作在同一个事务中。例如:

from django.db import transaction

def uploadSun(random, msgList):
    if random % 2 == 0:
        msgList.append(random)
    else:
        with transaction.atomic():
            connection.cursor().execute("INSERT INTO upload_not2 (id, name,age) VALUES (%s, %s, %s)", (random, 'name2', random))
            if random % 5 == 0:
                msgList.append(random)
            else:
                # 实际业务第二次是第一次的关联表,需要加上第一次插入后的id
                connection.cursor().execute("INSERT INTO upload_not5 (id, name,age) VALUES (%s, %s, %s)", (random, 'name2', random))
    return msgList

这样就能保证子线程中的操作和主线程中的操作在同一个事务中了。需要注意的是,在子线程中进行的数据库操作和主线程中的数据库操作应该是互相独立的,不应该有任何关联。否则可能会出现死锁等问题。