关于产线机构及调度怎么合理?

我们工厂有两台设备暂定为机器1和机器2.可以同时在生产不同的订单批次。每个订单随意拆分不同的批次且大小不一致。两个设备物料出来后经过同一条肯定交替经过输送线进入下一个工序里面。该工序有3个筐连在一起的,装了料进入下一个工序。现在问题是用那种方式进入筐中,并以那种方式取出来能够保证批次的顺序一样,如果3个筐实现不了,那么4个筐是否可行?
示意图如下:

img

根据您的描述,您有两台设备(机器1和机器2),它们可以同时生产不同的订单批次。每个订单可以随意拆分为不同的批次,并且批次的大小可能不一致。这些批次经过同一条传送带后,进入下一个工序,该工序有3个连在一起的筐。您希望确定一种方式来确保批次的顺序保持一致。

针对您的问题,我将提供两种解决方案供参考:

方案1: 使用队列(先进先出)

您可以使用队列(FIFO)来保持批次的顺序一致。每个设备在完成一批订单后,将批次放入共享的队列中。下一个工序从队列中取出批次并进行处理。这样可以确保批次按照先后顺序被处理。

示例代码:

from queue import Queue

# 创建队列
batch_queue = Queue()

# 机器1生产的批次
machine1_batches = ['batch1', 'batch2', 'batch3']

# 机器2生产的批次
machine2_batches = ['batch4', 'batch5']

# 将机器1的批次放入队列
for batch in machine1_batches:
    batch_queue.put(batch)

# 将机器2的批次放入队列
for batch in machine2_batches:
    batch_queue.put(batch)

# 下一个工序处理批次
while not batch_queue.empty():
    batch = batch_queue.get()
    process_batch(batch)

这种方式确保了批次的顺序,因为队列按照先进先出的原则处理批次。

方案2: 使用优先级队列

如果您需要根据订单或批次的某些属性进行排序或优先级控制,可以考虑使用优先级队列。您可以为每个批次分配一个优先级,并根据优先级从队列中取出批次进行处理。

示例代码:

import heapq

# 创建优先级队列
batch_queue = []

# 机器1生产的批次(带有优先级)
machine1_batches = [('batch1', 2), ('batch2', 1), ('batch3', 3)]

# 机器2生产的批次(带有优先级)
machine2_batches = [('batch4', 1), ('batch5', 2)]

# 将机器1的批次放入优先级队列
for batch in machine1_batches:
    heapq.heappush(batch_queue, batch)

# 将机器2的批次放入优先级队列
for batch in machine2_batches:
    heapq.heappush(batch_queue, batch)

# 下一个工序处理批次
while batch_queue:
    batch = heapq.heappop(batch_queue)[0]
    process_batch(batch)

在上述示例中,每个批次都与一个优先级关联。通过使用heapq模块的`heapp

以上,仅供参考

对于这个问题,你可以考虑以下的算法:

1.将订单按照批次拆分,每个批次有一个唯一的ID。

2.将每个批次与其所需生产的数量存储在一个字典中。

3.为每个批次生成一个计划开始时间和结束时间,以便确定它们应该何时进入生产线。

4.创建两个队列,一个用于机器1,另一个用于机器2,将批次按计划开始时间排队。

5.当机器1或机器2变得可用时,从相应队列中取出下一个批次,并生成一个标识符,以便在后续的过程中跟踪它。

6.将批次送入传送带,并记录它们的位置和标识符。

7.当批次到达筐时,检查它们的标识符,并根据需要将它们放在正确的位置上。如果有多个筐,可以使用队列或堆栈来跟踪它们。

8.当所有批次完成后,记录它们的完成时间,并将它们从队列和字典中删除。

9.如果有新的批次需要生产,重复步骤4-8。

10.最后,您可以通过将批次的开始和结束时间与它们实际完成的时间进行比较,来评估您的生产调度算法的性能。

以下是一个Python实现示例:

python

from collections import defaultdict

class ProductionLine:
    def __init__(self, num_baskets=3):
        self.machine1 = []
        self.machine2 = []
        self.baskets = [[] for _ in range(num_baskets)]
        self.basket_size = len(self.baskets[0])
        self.basket_capacity = self.basket_size * num_baskets
        self.num_baskets = num_baskets
        self.order_dict = defaultdict(dict)
        self.start_time_dict = {}
        self.end_time_dict = {}
        self.current_order_id = 0
        self.completed_orders = []

    def add_order(self, order_id, batch_id, batch_size):
        if batch_id not in self.order_dict[order_id]:
            self.order_dict[order_id][batch_id] = batch_size

    def get_next_batch(self, machine_num):
        if machine_num == 1:
            if not self.machine1:
                return None
            return self.machine1.pop(0)
        else:
            if not self.machine2:
                return None
            return self.machine2.pop(0)

    def put_in_basket(self, basket_idx, item):
        if len(self.baskets[basket_idx]) >= self.basket_size:
            return False
        self.baskets[basket_idx].append(item)
        return True

    def get_from_basket(self, basket_idx):
        if not self.baskets[basket_idx]:
            return None
        return self.baskets[basket_idx].pop(0)

    def is_basket_full(self, basket_idx):
        return len(self.baskets[basket_idx]) == self.basket_size

    def is_order_complete(self, order_id):
        if order_id not in self.order_dict:
            return False
        for batch_id, batch_size in self.order_dict[order_id].items():
            if batch_size > 0:
                return False
        return True

    def remove_completed_order(self, order_id):
        del self.order_dict[order_id]
        del self.start_time_dict[order_id]
        self.completed_orders.append(order_id)

    def schedule_batch(self, batch_id, machine_num, start_time):
        if machine_num == 1:
            self.machine1.append((batch_id, start_time))
        else:
            self.machine2.append((batch_id, start_time))

    def process_orders(self):
        while self.order_dict:
            # get next available batch from each machine
            batch1 = self.get_next_batch(1)
            batch2 = self.get_next_batch(2)

            # put batches into baskets
            basket_idx = 0
            while batch1 or batch2:
                if batch1 and batch2:
                    # both machines have a batch - choose the one with earlier start time
                    if batch1[1] < batch2[1]:
                        item = batch1
               python
                    else:
                        item = batch2
                        batch2 = self.get_next_batch(2)
                elif batch1:
                    item = batch1
                    batch1 = self.get_next_batch(1)
                else:
                    item = batch2
                    batch2 = self.get_next_batch(2)

                # put item in basket
                if not self.put_in_basket(basket_idx, item):
                    # basket is full - move to next basket
                    basket_idx += 1
                    if basket_idx == self.num_baskets:
                        # all baskets are full - wait for next cycle
                        break

            # process batches in each basket
            for basket in self.baskets:
                for i in range(len(basket)):
                    batch_id, start_time = basket[i]
                    order_id = batch_id.split("_")[0]

                    # update batch size and start time
                    batch_size = self.order_dict[order_id][batch_id]
                    start_time = max(start_time, self.start_time_dict.get(order_id, 0))
                    self.start_time_dict[order_id] = start_time
                    end_time = start_time + batch_size

                    # schedule batch for processing
                    self.schedule_batch(batch_id, i % 2 + 1, start_time)

                    # update batch size and order status
                    self.order_dict[order_id][batch_id] = 0
                    if self.is_order_complete(order_id):
                        self.end_time_dict[order_id] = end_time
                        self.remove_completed_order(order_id)

            # clear baskets
            self.baskets = [[] for _ in range(self.num_baskets)]

        # print results
        print("Completed orders:")
        for order_id in self.completed_orders:
            start_time = self.start_time_dict[order_id]
            end_time = self.end_time_dict[order_id]
            print(f"Order {order_id} completed in {end_time - start_time} units of time")



在这个实现中,我们使用了一个ProductionLine类来封装整个生产线的逻辑。该类包含了将订单按批次拆分、将批次放入队列、将批次放入篮子、从篮子中取出批次、判断篮子是否已满等方法。

在process_orders方法中,我们首先获取每台设备上的下一个可用批次,并将它们放入相应的篮子中。然后,我们遍历每个篮子中的批次,并为每个批次生成计划开始时间和结束时间,以便确定它们何时可以进入下一阶段的生产过程。最后,我们将完成的订单输出到控制台上。

注意:以上是一种基于假设的算法,实际情况可能会有所不同,具体实现需要结合实际情况进行优化和调整。

前两个框放机器1的,后两个框放机器2的。从从下到上放,也从下到上取

img


举个例吧 比如来料是不同的批次顺序,需要现进入清洗蓝,再进入吊蓝。最终结果需要在吊篮这将批次顺序保证一致。


m , n = map(int , input().split())
jobs = list(map(int , input().split()))
 
if n <= m:
    print(max(jobs))
else:
    jobs.sort()
    sum_time = 0
    current_jobs = jobs[0:m]
    completed = 0
    while completed < n - m:
        min_value = current_jobs[0]
        sum_time += min_value
        current_jobs = [x - min_value for x in current_jobs]
        del current_jobs[0]
        current_jobs.append(jobs[m + completed])
        completed += 1
    sum_time += current_jobs[-1]
 
    print(sum_time)
不知道你这个问题是否已经解决, 如果还没有解决的话:

如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^

最好四个筐,每个机器放2个