sscflp(Single Source Capacitated Facility Location Problem)
1.前言
ssclfp,又称单源有限容量设施选址问题,可以描述如下:
- 可以在m个地点上建立工厂,但都得有一定的费用;
- 有n个用户需要使用这些工厂,他们到不同工厂需要的消费是不一样的;
- 每个工厂都是有固定容量的,而选择到这个工厂消费的用户都得占掉一些空间,用户占掉的空间不能超出工厂的容量;
- 现在求总的最小消费——工厂建立的费用以及用户的消费。
2.分析
单源固定容量设施选址问题中仅m个工厂的启用与关闭的选择就可以有m!种了,更不要说n个用户的工厂选择情况了,很显然,这是一个NP-hard问题,因为每个用户的消费以及占用的空间都是不一样的,所以我们不能选择使用那些穷搜的方法,只能像之前的TSP问题一样,选择使用模拟退火法之类的。而我选择使用的算法有三种——贪心算法、多邻域操作爬山法、模拟退火法。当然,如禁忌搜索、遗传算法或者线性规划之类的也是不错的选择,但时间有限就不一一实现了。
2.1 贪心算法
贪心算法,是一种局部搜索的方法,有某些情况下它能找到最优解,但是大部分情况下它都无法找到最优解。它的核心思想是从对问题求解时,总是做出在当前看来是最好的选择,隐藏这样的算法总能找到局部最优解。而且,即使是局部最优解的优劣和贪心策略的选择也很有关系,选择的贪心策略必须具备无后效性,即某个状态以前的过程不会影响以后的状态,只与当前状态有关。
在选址问题中我选择了一个非常简单的贪心策略——为了不让工厂的状态影响到用户的选择,我先把所有工厂都启动了,然后每个用户根据自己对工厂的消费来选择能使消费费用最小的工厂,当然得兼顾容量,在费用与容量间权衡,如果工厂容量不足了,那么就选择那些容量足够的而且使得消费费用最小的工厂来服务。
2.2 多领域操作爬山法
爬山法,我在之前的一篇博客TSP问题——模拟退火法里面是有提到的,具体的描述以及流程我就不多言了,这里只谈谈我使用到的邻域操作:
- 前提:基于工厂状态的爬山法,即根据工厂的启用以及关闭的状态来对用户进行贪心计算得出最小费用,当然,这样的算法是无法得出最优解的,因为贪心,但是靠近最优解的局部最优还是能找到的。
- 编码:因为是基于工厂状态的,所以,这里的编码很简单,就是01编码,0表示这个工厂关闭了,而1表示这个工厂启用了,然后所有工厂的状态用一个一维数组来表示就行了。
- 邻域操作:我在这里选择了三种邻域操作,因为多个邻域操作的良好配合能使得多邻域算法的解更好。
- 01变换:即用1减去数组中一个代表一个工厂的数。如果一个工厂启动了,那么关闭它,否则启动它;
- 交换数组中的两个数,即交换两个工厂的启动情况;
- 将数组中一个数插入到另一个数前,即改变两个数间所有工厂的启动状态。
这些只是基本的邻域操作而已,而且由于这次不追求解的优良性了,所以其实没这么优化。
2.3 模拟退火法
模拟退火法与多邻域爬山法很相似(我这里实现起来),只是多出了一些温度以及接受差解的可能而已,而且由于时间问题和基于工厂状态这样的局限性,这个算法也是没有可能达到最优解的,其余的与之前的多领域爬山法很相似,也在之前的一篇博客TSP问题——模拟退火法里面是有提到的,里面有具体的描述以及算法流程。
3.程序实现
3.1 编程环境
python3.6、pycharm,其中python需要下载的库只有著名的numpy库。
3.2 数据来源
http://www.di.unipi.it/optimize/Data/MEX.html
或者这个github里面的Instance
数据形式如下:
3.3 代码实现
# coding: UTF-8
# 数据来源:http://www.di.unipi.it/optimize/Data/MEX.html
# sscflp(Single Source Capacitated Facility Location Problem)
import numpy as np
import re
import time
import csv
import os
class Solution:
# tag是标记
# filename是计算的数据来源
# mode可以是
# "greed"(贪心)
# "la"(局部搜索——爬山法)
# "sa"(模拟退火法)
# csv_writer是将结果写入文件的csv.writer,用于记录每个样例的
# 最小消费
# 计算用时
# result_file是用于写入具体结果的文件,会记录每个样例的三行信息:
# 最小消费
# 工厂启动情况
# 用户对工厂的选择情况
def __init__(self, tag, filename, mode="greed", csv_writer=None, result_file=None):
file = open(filename, "r")
lines = file.read().splitlines()
# 首先读取工厂数量以及用户数量
line = re.findall("\d+", lines.pop(0))
factory_num, user_num = int(line[0]), int(line[1])
# factories里包含每个工厂的状态,每个item有两个元素,第一个是工厂容量,第二个是启动工厂的消费
self.factories = np.zeros([factory_num, 2], dtype=np.int)
for i in range(factory_num):
line = re.findall("\d+", lines.pop(0))
self.factories[i] = [int(line[0]), int(line[1])]
# 每个用户需要消耗的工厂容量
user_volume = []
while len(user_volume) < user_num:
user_volume.extend(np.array(re.findall("\d+", lines.pop(0))).astype(np.int).tolist())
# 每个用户使用对应工厂的消费,每个用户使用不同工厂的费用不用
self.users = np.zeros([user_num, factory_num + 1], dtype=np.int)
for i in range(user_num):
user = [user_volume[i]]
while len(user) < factory_num + 1:
user.extend(np.array(re.findall("\d+", lines.pop(0))).astype(np.int).tolist())
self.users[i] = user
file.close()
# 计算的总消费结果、工厂启动的情况、用户选择使用的工厂、是否选择正确的算法
result, factory_state, user_factory, flag = None, None, None, True
start = time.time()
if mode == "greed":
result, factory_state, user_factory = self.greed_answer(tag)
elif mode == "la":
result, factory_state, user_factory = self.la_answer(tag)
elif mode == "sa":
result, factory_state, user_factory = self.sa_answer(tag)
else:
print("something wrong happen!")
flag = False
# 计算耗时
time_cost = time.time() - start
print("用时", time_cost, "\n")
# 将结果写入文件
if csv_writer is not None and flag:
# 记录该样例的最小消费以及计算耗时
csv_writer.writerow([str(tag), str(result), str(time_cost)])
# 将具体结果写入文件
if result_file is not None and flag:
# 记录该样例的最小消费、工厂启用情况、用户选择情况
result_file.write("p{0}".format(tag) + "\n" + str(result) + "\n"
+ ' '.join(np.array(factory_state).astype(np.str).tolist())
+ "\n" + ' '.join(np.array(user_factory).astype(np.str).tolist()) + "\n\n")
# 贪心算法,tag是标记
def greed_answer(self, tag):
# 为了不让启动工厂的消费影响到用户的选择情况,在所有工厂开启的情况下进行贪心。
# 当然,这只是一种效果似乎不错的贪心策略,应该还有其他的更好的贪心策略选择。
factory_states = np.ones(self.factories.shape[0], np.int).tolist()
best_result, user_factory, num = self.calculate(factory_states)
print("greed--result{0}".format(tag), best_result, num)
return best_result, factory_states, user_factory
# 多领域操作爬山法——基于工厂与贪心,tag是标记
def la_answer(self, tag):
# 开始时工厂启用情况
factory_num = self.factories.shape[0]
factory_states = np.ones([factory_num], np.int).tolist()
# 决定爬山法爬山次数的外层循环的参数
current_temperature = 2 * factory_num // 10
final_temperature = 0.5 / (factory_num // 10)
# 初始的 消费、用户选择情况、无法选上工厂的用户的数量
best_result, best_user_factory, best_num = self.calculate(factory_states)
correct_flag = True
while current_temperature > final_temperature:
for _ in range(factory_num):
num1, num2 = np.random.randint(low=0, high=10, size=2).tolist()
while num1 == num2:
num1, num2 = np.random.randint(low=0, high=10, size=2).tolist()
if num1 > num2:
num1, num2 = num2, num1
# 让启用的工厂总能满足用户的需求
if not correct_flag:
zero_indexes = np.where(np.array(factory_states) == 0)[0]
length = zero_indexes.shape[0]
if length > 0:
factory_states[zero_indexes[np.random.randint(low=0, high=length, size=1)[0]]] = 1
correct_flag = True
# 用1减一个数,即如果一个工厂启动了,那么关闭它,否则启动它
temp_states = factory_states.copy()
temp_states[num1] = 1 - temp_states[num1]
temp_result, temp_user_factory, temp_num = self.calculate(temp_states)
if best_result > temp_result > 0 and temp_num == 0:
best_result, best_user_factory, best_num = temp_result, temp_user_factory, temp_num
factory_states = temp_states
elif temp_result == -1:
correct_flag = False
# 交换两个数,即交换两个工厂的启动情况
temp_states = factory_states.copy()
temp_states[num1], temp_states[num2] = temp_states[num2], temp_states[num1]
temp_result, temp_user_factory, temp_num = self.calculate(temp_states)
if best_result > temp_result > 0 and temp_num == 0:
best_result, best_user_factory, best_num = temp_result, temp_user_factory, temp_num
factory_states = temp_states
elif temp_result == -1:
correct_flag = False
# 将num2位置的数插到num1位置,改变num1到num2间所有工厂的启动情况
temp_states = factory_states.copy()
_temp = temp_states[num1]
for i in range(num1, num2):
temp_states[i] = temp_states[i + 1]
temp_states[num2] = _temp
temp_result, temp_user_factory, temp_num = self.calculate(temp_states)
if best_result > temp_result > 0 and temp_num == 0:
best_result, best_user_factory, best_num = temp_result, temp_user_factory, temp_num
factory_states = temp_states
elif temp_result == -1:
correct_flag = False
# 将num2与num1间的数逆转 -- 懒得实现了!!!
# 参数变化
current_temperature = current_temperature * 0.98
print("la--result{0}".format(tag), best_result, best_num)
return best_result, factory_states, best_user_factory
# 模拟退火法——基于工厂启动状态与贪心,tag是标记
def sa_answer(self, tag):
# 开始时工厂启动情况
factory_num = self.factories.shape[0]
factory_states = np.ones([factory_num], np.int).tolist()
# 初温与末温
current_temperature = 2 * factory_num // 10
final_temperature = 0.5 / (factory_num // 10)
# 初始的 消费、用户选择情况、无法选上工厂的用户的数量
now_result, now_user_factory, now_num = self.calculate(factory_states)
# 维护一个全局的最优解
best_result, best_user_factory, best_num = now_result, now_user_factory.copy(), now_num
best_factory_states = factory_states.copy()
# 这个flag是让工厂总能满足用户的需求
correct_flag = True
while current_temperature > final_temperature:
for _ in range(factory_num):
num1, num2 = np.random.randint(low=0, high=10, size=2).tolist()
while num1 == num2:
num2 = np.random.randint(low=0, high=10)
if num1 > num2:
num1, num2 = num2, num1
if not correct_flag:
zero_indexes = np.where(np.array(factory_states) == 0)[0]
length = zero_indexes.shape[0]
if length > 0:
factory_states[zero_indexes[np.random.randint(low=0, high=length, size=1)[0]]] = 1
correct_flag = True
temp_states, temp_result, temp_user_factory, temp_num = None, 10000000, None, None
# 用1减一个数,即如果一个工厂启动了,那么关闭它,否则启动它
temp_states1 = factory_states.copy()
temp_states1[num1] = 1 - temp_states1[num1]
temp_result1, temp_user_factory1, temp_num1 = self.calculate(temp_states1)
if temp_num1 == 0 and temp_result > temp_result1 > 0:
temp_states = temp_states1
temp_result, temp_user_factory, temp_num = temp_result1, temp_user_factory1, temp_num1
# 交换两个数,即交换两个工厂的启动情况
temp_states2 = factory_states.copy()
temp_states2[num1], temp_states2[num2] = temp_states2[num2], temp_states2[num1]
temp_result2, temp_user_factory2, temp_num2 = self.calculate(temp_states2)
if temp_num2 == 0 and temp_result > temp_result2 > 0:
temp_states = temp_states2
temp_result, temp_user_factory, temp_num = temp_result2, temp_user_factory2, temp_num2
# 将num2位置的数插到num1位置,改变num1到num2间所有工厂的启动情况
temp_states3 = factory_states.copy()
_temp = temp_states3[num1]
for i in range(num1, num2):
temp_states3[i] = temp_states3[i + 1]
temp_states3[num2] = _temp
temp_result3, temp_user_factory3, temp_num3 = self.calculate(temp_states3)
if temp_num3 == 0 and temp_result > temp_result3 > 0:
temp_states = temp_states3
temp_result, temp_user_factory, temp_num = temp_result3, temp_user_factory3, temp_num3
diff = best_result - temp_result
if temp_num == 0 and temp_result > 0 and (diff > 0 or np.exp(
-1 * np.abs(diff) / current_temperature) >= np.random.rand()):
now_result, now_user_factory, now_num = temp_result, temp_user_factory, temp_num
factory_states = temp_states
if now_result < best_result:
best_result, best_user_factory, best_num = now_result, now_user_factory.copy(), now_num
best_factory_states = factory_states.copy()
elif temp_result == -1:
correct_flag = False
# 将num2与num1间的数逆转--懒得实现了!!!
# 温度变化
current_temperature = current_temperature * 0.98
print("sa--result{0}".format(tag), best_result, best_num)
return best_result, best_factory_states, best_user_factory
# 根据工厂状态计算消费——基于贪心
def calculate(self, factory_states):
# 先获取启动了的工厂,根据启动了的工厂限制用户选择
factories = []
correct_indexes = []
users = [self.users[:, 0]]
for i in range(len(factory_states)):
if factory_states[i] == 1:
factories.append(self.factories[i])
correct_indexes.append(i)
users.append(self.users[:, i + 1])
# 如果没有启动的工厂,直接返回
if len(factories) == 0:
return 10000000, None, None
factories = np.array(factories)
users = np.vstack(users).T
result = factories[:, 1].sum()
volumes = factories[:, 0].copy()
# 如果启动了的工厂容量不足,直接返回
if volumes.sum() < users[:, 0].sum():
return -1, None, None
# 依据贪心计算消费
user_factory = []
num = 0
for _user in users:
user, max = _user.copy(), _user.max()
user_volume, user_cost = user[0], user[1:]
flag = True
while flag:
min, index = user_cost.min(), np.where(user_cost==np.min(user_cost))[0][0]
if max == min:
num += 1
flag = False
if volumes[index] >= user_volume:
volumes[index] -= user_volume
result += min
user_factory.append(correct_indexes[index])
break
else:
user_cost[index] = max
# 返回消费、用户对工厂的选择、没法选择到工厂的用户数量
return result, user_factory, num
# 模拟退火法——基于用户选择情况
def sa_answer2(self, tag):
pass
# 线性规划
def lp_answer(self, tag):
pass
# 遗传算法
def ga_answer(self, tag):
pass
# 禁忌搜索
def tabu_answer(self, tag):
pass
if __name__ == "__main__":
# 创建结果文件夹
result_path = os.getcwd() + "\\result"
if not os.path.exists(result_path):
os.makedirs(result_path)
# 实例化对应的文件管理对象,用于写入结果
greed_csv_file = open("result/greed_table_result.csv", mode="w+", newline='', encoding="utf-8")
la_csv_file = open("result/la_table_result.csv", mode="w+", newline='', encoding="utf-8")
sa_csv_file = open("result/sa_table_result.csv", mode="w+", newline='', encoding="utf-8")
greed_result_file = open("result/greed_detail_result.txt", mode="w+", encoding="utf-8")
la_result_file = open("result/la_detail_result.txt", mode="w+", encoding="utf-8")
sa_result_file = open("result/sa_detail_result.txt", mode="w+", encoding="utf-8")
greed_writer = csv.writer(greed_csv_file, dialect='excel', delimiter=',')
la_writer = csv.writer(la_csv_file, dialect='excel', delimiter=',')
sa_writer = csv.writer(sa_csv_file, dialect='excel', delimiter=',')
fields = ["index", "result", "time(s)"]
greed_writer.writerow(fields)
la_writer.writerow(fields)
sa_writer.writerow(fields)
# 将各个算法的结果写入文件
base_instances = "./Instances/p{0}"
for i in range(1, 72):
# 贪心
Solution(i, base_instances.format(i), mode="greed", csv_writer=greed_writer, result_file=greed_result_file)
# 局部搜索——爬山法
Solution(i, base_instances.format(i), mode="la", csv_writer=la_writer, result_file=la_result_file)
# 模拟退火法
Solution(i, base_instances.format(i), mode="sa", csv_writer=sa_writer, result_file=sa_result_file)
# 关闭各个文件
greed_csv_file.close()
la_csv_file.close()
sa_csv_file.close()
greed_result_file.close()
la_result_file.close()
sa_result_file.close()
3.4 运行结果
https://github.com/liangyy75/ProgrammingInPython/tree/master/sscflp/result