Batching
Back to Problems
Zalando's Joint Order Selection, Allocation, Batching and Picking Problem (https://github.com/zalandoresearch/batching-benchmarks)
Instances
Algorithms
Problem Code
import json
import math
import os
from typing import List
from typing import Set, Dict, List
from collections import defaultdict
from pydantic import BaseModel
from algobench import algorithm
from dotenv import load_dotenv
load_dotenv()
import logging
# Root logging setup: INFO and above, one "<LEVEL> <HH:MM:SS> <message>" line
# per record.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(levelname)s %(asctime)s %(message)s",
    level=logging.INFO,
)
# Module-level logger used by the checker, the scorer, the solver and the
# command-line driver below.
logger = logging.getLogger(__name__)
# An article that orders request (Order.positions) and that warehouse items
# hold (WarehouseItem.article).
class Article(BaseModel):
    # Identifier of the article; the only field that feeds the hash.
    id: str
    # Volume of one unit; summed per picklist and compared against
    # Parameters.max_container_volume by the feasibility check.
    volume: float

    def __hash__(self):
        """Hash on ``id`` so articles can be used as dict keys / set members."""
        return hash(self.id)
# One pickable unit stored at a (row, aisle) position inside a zone.
class WarehouseItem(BaseModel):
    # Identifier of the item; drives both hashing and ordering.
    id: str
    # Position used by Instance.row_distance / Instance.aisle_distance.
    row: int
    aisle: int
    # Article held by the item; None is used for the synthetic "conveyor"
    # marker that the distance code places at row 0, aisle 0.
    article: Article | None
    # Items in different zones are infinitely far apart (Instance.distance).
    zone: str

    def __hash__(self):
        """Hash on ``id`` so items can be stored in sets and used as keys."""
        return hash(self.id)

    def __lt__(self, other):
        """Order items by ``id``; lets tuples that contain items be compared."""
        return self.id < other.id
# A customer order: an identifier plus the articles it asks for.
class Order(BaseModel):
    # Identifier of the order; the only field that feeds the hash.
    id: str
    # Requested articles, one entry per requested unit (the feasibility check
    # compares this list, as a multiset, with the articles actually picked).
    positions: List[Article]

    def __hash__(self):
        """Hash on ``id`` so orders can be kept in a set."""
        return hash(self.id)
# Numeric limits and layout bounds of one problem instance.
class Parameters(BaseModel):
    # Minimum total number of items a feasible solution must pick.
    min_number_requested_items: int
    # Maximum number of orders a single batch may contain.
    max_orders_per_batch: int
    # Maximum summed article volume of a single picklist.
    max_container_volume: int
    # Row bounds; Instance.row_distance uses abs(first_row) and last_row as
    # the far ends when two rows lie on the same side of row 0.
    first_row: int
    last_row: int
    # Aisle bounds; not read by any code visible in this file.
    first_aisle: int
    last_aisle: int
# One picker tour: warehouse item ids in the order they are visited
# (Instance.picklist_cost prices consecutive pairs in list order).
class Picklist(BaseModel):
    item_ids: List[str]
# A group of orders served together, plus the picklists that collect the
# items for those orders.
class Batch(BaseModel):
    picklists: List[Picklist]
    order_ids: List[str]
# A complete answer to an instance: the list of batches that were formed.
class Solution(BaseModel):
    batches: List[Batch]
# A full problem instance (articles, orders, stock, limits) together with the
# distance model used to price picklists.
class Instance(BaseModel):
    id: str
    articles: List[Article]
    orders: List[Order]
    warehouse_items: List[WarehouseItem]
    zones: List[str] = []
    parameters: Parameters

    @staticmethod
    def aisle_distance(u: int, v: int):
        """Return the absolute difference between two aisle indices."""
        return abs(u - v)

    def row_distance(self, u: int, v: int):
        """Return the distance between rows ``u`` and ``v``.

        When one row is negative and the other positive the result is
        ``abs(u) + abs(v)``. Otherwise it is the smaller of that sum and
        ``2 * limit - (abs(u) + abs(v))``, where ``limit`` is
        ``abs(first_row)`` if ``u`` is negative and ``last_row`` if not.
        """
        via_middle = abs(u) + abs(v)
        if min(u, v) < 0 < max(u, v):
            return via_middle
        # Note that the limit is selected from the sign of ``u`` alone.
        if u < 0:
            limit = abs(self.parameters.first_row)
        else:
            limit = self.parameters.last_row
        return min(via_middle, 2 * limit - via_middle)

    def distance(self, u: WarehouseItem, v: WarehouseItem):
        """Return row distance plus aisle distance, or ``inf`` across zones."""
        if u.zone == v.zone:
            rows = self.row_distance(u.row, v.row)
            aisles = self.aisle_distance(u.aisle, v.aisle)
            return rows + aisles
        return math.inf

    def picklist_cost(self, picklist: List[WarehouseItem]) -> int:
        """Return the length of the tour conveyor -> items in order -> conveyor.

        The conveyor is modelled as an article-less item at row 0, aisle 0 in
        the zone of the first picklist entry. An empty picklist costs 0.
        """
        if not picklist:
            return 0
        conveyor_belt = WarehouseItem(
            id="conveyor", row=0, aisle=0, article=None, zone=picklist[0].zone
        )
        # Bracket the stops with the conveyor so every leg is a plain
        # consecutive pair.
        stops = [conveyor_belt, *picklist, conveyor_belt]
        return sum(
            self.distance(start, end) for start, end in zip(stops, stops[1:])
        )
def check_feasibility(instance: Instance, solution: Solution) -> bool:
    """Return True when ``solution`` satisfies every constraint of ``instance``.

    A warning naming the first violated rule is logged before returning
    False. The rules, in the order they are checked:

    * no warehouse item is picked more than once;
    * every picked item id exists in the instance;
    * at least ``min_number_requested_items`` items are picked;
    * every batched order id exists in the instance and is batched only once;
    * no batch holds more than ``max_orders_per_batch`` orders;
    * per batch, the picked articles equal (as a multiset) the articles
      requested by the batch's orders;
    * every picklist stays inside one zone and within
      ``max_container_volume``.
    """
    items_by_id = {item.id: item for item in instance.warehouse_items}
    orders_by_id = {order.id: order for order in instance.orders}
    parameters = instance.parameters

    # no duplicate items
    picked_item_ids = [
        item_id
        for batch in solution.batches
        for picklist in batch.picklists
        for item_id in picklist.item_ids
    ]
    if len(picked_item_ids) != len(set(picked_item_ids)):
        logger.warning("Duplicate items in solution")
        return False

    # all items in instance
    for item_id in picked_item_ids:
        if item_id not in items_by_id:
            logger.warning(f"Item {item_id} not in instance")
            return False

    if len(picked_item_ids) < parameters.min_number_requested_items:
        logger.warning("Fewer items than requested")
        return False

    # All orders must be known (an unknown id would otherwise surface as a
    # KeyError in the per-batch lookup below) and each may be served once.
    batched_order_ids = [
        order_id for batch in solution.batches for order_id in batch.order_ids
    ]
    for order_id in batched_order_ids:
        if order_id not in orders_by_id:
            logger.warning(f"Order {order_id} not in instance")
            return False
    if len(batched_order_ids) != len(set(batched_order_ids)):
        logger.warning("Duplicate orders in solution")
        return False

    for batch in solution.batches:
        if len(batch.order_ids) > parameters.max_orders_per_batch:
            logger.warning("Batch exceeds max commissions limit!")
            return False

        requested_articles = sorted(
            article.id
            for order_id in batch.order_ids
            for article in orders_by_id[order_id].positions
        )
        picked_articles = sorted(
            items_by_id[item_id].article.id
            for picklist in batch.picklists
            for item_id in picklist.item_ids
        )
        # Sorted id lists are equal exactly when the multisets are equal.
        if requested_articles != picked_articles:
            logger.warning(
                "requested and assigned articles for some orders in this batch do not match!"
            )
            return False

        for picklist in batch.picklists:
            picklist_items = [items_by_id[item_id] for item_id in picklist.item_ids]
            if len({item.zone for item in picklist_items}) > 1:
                logger.warning("picklist contains items of multiple zones!")
                return False
            total_volume = sum(item.article.volume for item in picklist_items)
            if total_volume > parameters.max_container_volume:
                logger.warning("Container volume exceeds limit")
                return False
    return True
def evaluate(instance: Instance, solution: Solution) -> float:
    """Return the summed picklist cost of ``solution`` (lower is better).

    Also logs the number of picked items, the number of picklists and the
    resulting total distance.
    """
    items_by_id = {item.id: item for item in instance.warehouse_items}
    all_picklists = [
        picklist for batch in solution.batches for picklist in batch.picklists
    ]

    nbr_picklist_items = sum(len(picklist.item_ids) for picklist in all_picklists)
    logger.info(f"number of picklist items: {nbr_picklist_items}")
    nbr_picklists = len(all_picklists)
    logger.info(f"number of picklists: {nbr_picklists}")

    objective_value = 0
    for picklist in all_picklists:
        # Resolve ids to items, keeping the picklist's visiting order.
        route = [items_by_id[item_id] for item_id in picklist.item_ids]
        objective_value += instance.picklist_cost(route)
    logger.info(f"total distance: {objective_value}")
    return objective_value
def compute_picklists(
    items: List[WarehouseItem], max_container_volume: int
) -> List[Picklist]:
    """Split ``items`` into single-zone picklists by first-fit filling.

    Items are grouped by zone, sorted by ``(aisle, row)`` within the zone and
    appended to the current picklist until the next item would push the
    summed article volume above ``max_container_volume``; a new picklist is
    started at that point.

    An item whose own volume exceeds the limit ends up alone in a picklist
    (which the feasibility check will still reject), but no empty picklist is
    ever emitted.
    """
    items_by_zone = defaultdict(list)
    for item in items:
        items_by_zone[item.zone].append(item)

    picklists: List[Picklist] = []
    for zone_items in items_by_zone.values():
        picklist = Picklist(item_ids=[])
        cur_volume = 0
        for item in sorted(zone_items, key=lambda i: (i.aisle, i.row)):
            volume = item.article.volume
            # Only flush a picklist that actually holds something; flushing
            # an empty one would put a useless empty picklist in the result.
            if picklist.item_ids and cur_volume + volume > max_container_volume:
                picklists.append(picklist)
                picklist = Picklist(item_ids=[])
                cur_volume = 0
            picklist.item_ids.append(item.id)
            cur_volume += volume
        if picklist.item_ids:
            picklists.append(picklist)
    return picklists
def find_best_order(
    remaining_orders: Set[Order],
    selected_items: List[WarehouseItem],
    warehouse_items: Dict[Article, Set[WarehouseItem]],
    instance: Instance,
) -> tuple[float, List[WarehouseItem], Order]:
    """Pick the remaining order that is cheapest to add to the current batch.

    For every order, each requested article is matched greedily to the still
    unused stocked item that lies closest to an "anchor" in its own zone. The
    anchors of a zone are its conveyor (row 0, aisle 0), the items already in
    ``selected_items`` and the items chosen so far for this same order. The
    order's cost is the sum of those distances divided by the number of items
    chosen.

    Orders that cannot be served completely from ``warehouse_items`` and
    orders without positions are skipped.

    Returns:
        ``(cost, items, order)`` for the order with the smallest cost. Ties
        are broken by the chosen item list and then by order id.

    Raises:
        ValueError: if no order in ``remaining_orders`` can be served.
    """
    # Anchors are created lazily so that a zone missing from ``instance.zones``
    # still gets its conveyor instead of raising KeyError.
    anchors_by_zone: Dict[str, List[WarehouseItem]] = {}

    def anchors(zone: str) -> List[WarehouseItem]:
        if zone not in anchors_by_zone:
            anchors_by_zone[zone] = [
                WarehouseItem(id="conveyor", row=0, aisle=0, article=None, zone=zone)
            ]
        return anchors_by_zone[zone]

    for chosen in selected_items:
        anchors(chosen.zone).append(chosen)

    def distance_per_item(order: Order) -> tuple[float, List[WarehouseItem]] | None:
        """Return (average distance, chosen items), or None if unservable."""
        total_distance = 0
        add_items = set()
        add_items_by_zone = defaultdict(list)
        for article in order.positions:
            # ``.get`` keeps a defaultdict stock map free of empty entries for
            # articles that are not stocked at all.
            candidates = [
                (
                    min(
                        instance.distance(candidate, anchor)
                        for anchor in anchors(candidate.zone)
                        + add_items_by_zone[candidate.zone]
                    ),
                    candidate,
                )
                for candidate in warehouse_items.get(article, ())
                if candidate not in add_items
            ]
            if not candidates:
                # Not enough stock left for this article.
                return None
            distance, item = min(candidates)
            total_distance += distance
            add_items_by_zone[item.zone].append(item)
            add_items.add(item)
        if not add_items:
            # Order without positions: nothing to pick, and no average exists.
            return None
        return total_distance / len(add_items), list(add_items)

    scored = []
    for order in remaining_orders:
        result = distance_per_item(order)
        if result is not None:
            scored.append((*result, order))
    if not scored:
        raise ValueError("no remaining order can be fulfilled from the available items")
    # Orders define no ordering of their own, so compare their ids when cost
    # and item list are both equal.
    return min(scored, key=lambda entry: (entry[0], entry[1], entry[2].id))
@algorithm(name="Batching",
           feasibility_function=check_feasibility,
           scoring_function=evaluate,
           api_key=os.getenv("ALGOBENCH_API_KEY"), is_minimization=True)
def greedy_solver(instance: Instance) -> Solution:
    """Build batches greedily until enough items have been picked.

    Batches are created one after another. Each batch is filled with up to
    ``max_orders_per_batch`` orders chosen by ``find_best_order``; the items
    chosen for an order are removed from the stock so they cannot be reused.
    The batch's items are then split into picklists by ``compute_picklists``.
    The loop ends when ``min_number_requested_items`` items have been picked,
    when the order pool is empty, or when a batch could not take any order.
    """
    item_goal = instance.parameters.min_number_requested_items
    max_orders = instance.parameters.max_orders_per_batch
    remaining_orders = set(instance.orders)

    # Stock still available, grouped by article.
    warehouse_article_items: Dict[Article, Set[WarehouseItem]] = defaultdict(set)
    for item in instance.warehouse_items:
        warehouse_article_items[item.article].add(item)

    solution = Solution(batches=[])
    # repeat while item goal is not reached and there are remaining orders in the pool
    while item_goal > 0 and remaining_orders:
        logger.info(f"Remaining items to batch: {item_goal}")
        logger.info(
            f"Creating a batch, total remaining orders in the pool: {len(remaining_orders)}"
        )
        batch_order_ids: List[str] = []
        batch_selected_items: List[WarehouseItem] = []
        while remaining_orders and len(batch_order_ids) < max_orders:
            _, selected_items, selected_order = find_best_order(
                remaining_orders, batch_selected_items, warehouse_article_items, instance
            )
            if selected_order is None:
                break
            batch_order_ids.append(selected_order.id)
            batch_selected_items.extend(selected_items)
            for item in selected_items:
                warehouse_article_items[item.article].remove(item)
            remaining_orders.remove(selected_order)

        if not batch_order_ids:
            # Nothing was added, so neither the goal nor the pool changed;
            # looping again would repeat this state forever.
            logger.warning("No order could be added to a new batch; stopping early")
            break

        batch = Batch(
            order_ids=batch_order_ids,
            picklists=compute_picklists(
                batch_selected_items, instance.parameters.max_container_volume
            ),
        )
        solution.batches.append(batch)
        logger.info(f"Batch created with a total of {len(batch_selected_items)} items")
        item_goal -= len(batch_selected_items)
    return solution
def solve(path) -> None:
    """Load the instance stored under ``path``, solve it and print the result.

    Reads ``articles.json``, ``orders.json``, ``warehouse_items.json`` and
    ``parameters.json`` from ``path``, writes the assembled instance to
    ``instance.json`` in the same directory, runs ``greedy_solver`` and
    prints the feasibility verdict followed by the objective value.
    """

    def load_json(name: str):
        # A context manager closes the file even when parsing fails.
        with open(f"{path}/{name}", encoding="utf-8") as f:
            return json.load(f)

    articles_json = load_json("articles.json")
    orders_json = load_json("orders.json")
    warehouse_items_json = load_json("warehouse_items.json")
    parameters_json = load_json("parameters.json")

    # Sorted so the zone list (and therefore instance.json) is reproducible
    # from run to run.
    zones = sorted({item["zone"] for item in warehouse_items_json})
    articles = {article["id"]: Article(**article) for article in articles_json}
    warehouse_items = [
        WarehouseItem(
            id=item["id"],
            row=item["row"],
            aisle=item["aisle"],
            article=articles[item["article"]],
            zone=item["zone"],
        )
        for item in warehouse_items_json
    ]
    orders = [
        Order(
            id=order["id"],
            positions=[articles[article_id] for article_id in order["positions"]],
        )
        for order in orders_json
    ]
    parameters = Parameters(**parameters_json)

    # Create the Instance with proper models
    instance = Instance(
        id=path,
        articles=list(articles.values()),
        orders=orders,
        warehouse_items=warehouse_items,
        zones=zones,
        parameters=parameters,
    )
    with open(f"{path}/instance.json", "w") as f:
        json.dump(instance.model_dump(), f)

    solution = greedy_solver(instance)
    print(check_feasibility(instance, solution))
    print(evaluate(instance, solution))
if __name__ == "__main__":
    # Only the "small" family is enabled; "medium" and "large" are switched off.
    sizes = ["small"]  # , "medium", "large"]
    for size in sizes:
        # Instances of a family are numbered 0 through 4.
        for i in range(5):
            logger.info(f"Solving instance {size}-{i}")
            solve(f"instances/{size}-{i}")