# Select Git revision
# IntenSelectComponent.cpp
# environment.py 5.98 KiB
import gymnasium as gym
import numpy as np
import simpy
import simplesimmodel as model
"""
Gymnasium environment that exposes the simulated business process to the RL agent.
"""
class BusinessProcessEnv(gym.Env):
    """Gymnasium environment wrapping a SimPy business-process simulation.

    An action is the index of the activity the agent wants to execute next.
    Each ``step`` hands that activity to the simulation model and, if the
    model accepts it, advances the simulation until the model clears its
    ``flag`` attribute.

    Notes for callers (kept for backward compatibility):
      * ``step`` returns a 4-tuple ``(state, reward, done, info)`` rather
        than Gymnasium's 5-tuple.
      * ``step`` returns the integer-encoded state, while ``reset`` returns
        the dictionary state.
      * The reward returned by ``step`` is the cumulative reward since the
        last reset, not the reward of the single step.
    """

    # An episode is reported as done once exactly this many cases have
    # finished (or no case is active any more).
    MAX_DONE_CASES = 5

    def __init__(self, space, activities):
        """Create the spaces and start a fresh simulation.

        Args:
            space: Two-item sequence ``[resources, case]``. ``resources`` is
                used as the ``MultiDiscrete`` sizes of the ``'process'``
                observation and is passed to ``model.BusinessProcess``;
                ``case`` is used as the ``MultiDiscrete`` sizes of the
                ``'case'`` observation.
            activities: Number of discrete activities; used both as the size
                of the ``'event'`` observation and of the action space.
        """
        super().__init__()
        self.ressources = space[0]
        self.case = space[1]
        self.activities = activities

        self.observation_space = gym.spaces.Dict(
            {
                'process': gym.spaces.MultiDiscrete(self.ressources),
                'case': gym.spaces.MultiDiscrete(self.case),
                'event': gym.spaces.Discrete(self.activities),
            }
        )
        self.action_space = gym.spaces.Discrete(self.activities)

        # Defined here as well so the attribute exists before the first reset().
        self.current_step = 0
        self.current_state = self._initial_state()
        self._start_simulation()
        self.reward = 0

    def _initial_state(self):
        """Return the dictionary state used before any activity has run."""
        return {
            'process': np.array(self.ressources),
            'case': np.zeros(len(self.case), dtype=int),
            'event': 0,
        }

    def _start_simulation(self):
        """Create a new SimPy environment and register the process model in it."""
        self.model_env = simpy.Environment()
        self.process = model.BusinessProcess(self.model_env, self.ressources)
        self.model_env.process(model.run_process(self.model_env, self.process))

    def get_current_state(self, caseid):
        """Query the simulation model and return its state as a dictionary.

        Args:
            caseid: Value forwarded to ``model.get_current_state``. ``step``
                passes a case object here, not a plain id.

        Returns:
            Dict with the keys ``'process'``, ``'case'`` and ``'event'``.
        """
        process, case, event = model.get_current_state(self.process, caseid)
        return {
            'process': process,
            'case': case,
            'event': event,
        }

    def step(self, action):
        """Apply ``action`` to the current case and advance the simulation.

        Args:
            action: Index of the activity to execute next.

        Returns:
            Tuple ``(next_state, reward, done, info)`` where ``next_state``
            is the integer encoding from ``flatten_observation_to_int``,
            ``reward`` is the cumulative reward since the last reset,
            ``done`` is a bool and ``info`` is an empty dict. If the model
            rejects the action, the simulation is not advanced, the reward
            is unchanged and ``done`` is ``False``.
        """
        sim = self.process
        sim.next = action

        # If the tracked case is already finished, continue with a randomly
        # chosen active case.
        # NOTE(review): this draws from NumPy's global RNG, so the seed given
        # to reset() does not influence the choice - confirm this is intended.
        if sim.case_id in sim.done_cases:
            sim.case_id = np.random.choice(sim.active_cases)

        case_obj = sim.case_objects[sim.case_id]
        self.current_state = self.get_current_state(case_obj)

        start = sim.env.now
        # Presumably cleared by the simulation model once the activity has
        # been handled - verify in simplesimmodel.
        sim.flag = True

        if not sim.is_valid(self.current_state['event'], action, case_obj):
            # Rejected action: report the unchanged state and reward.
            encoded = self.flatten_observation_to_int(self.current_state)
            return encoded, self.reward, False, {}

        while sim.flag:
            self.model_env.step()
        stop = sim.env.now

        # Re-read the case: the lookup is repeated after the simulation ran.
        case_obj = sim.case_objects[sim.case_id]
        self.current_state = self.get_current_state(case_obj)
        next_state = self.flatten_observation_to_int(self.current_state)

        # Less simulated time spent on the activity yields a larger reward.
        self.reward += 10000 - (stop - start)

        done = (
            len(sim.done_cases) == self.MAX_DONE_CASES
            or len(sim.active_cases) == 0
        )
        return next_state, self.reward, done, {}

    def reset(self, seed=None, options=None):
        """Restart the simulation and return the initial state.

        Args:
            seed: Forwarded to ``gym.Env.reset``.
            options: Accepted for API compatibility; not used.

        Returns:
            Tuple ``(state, info)`` with the initial dictionary state and an
            empty info dict.
        """
        super().reset(seed=seed)
        self.current_state = self._initial_state()
        self.current_step = 0
        self._start_simulation()
        self.process.done_cases = set()
        self.reward = 0
        return self.current_state, {}

    def render(self, mode='human'):
        """Rendering is not implemented; this method does nothing."""
        pass

    def flatten_observation(self, observation):
        """Return the observation as one flat list: process, case, then event."""
        return [
            *observation['process'],
            *observation['case'],
            observation['event'],
        ]

    @staticmethod
    def _resources_for_event(event):
        """Return the ``observation['process']`` indices encoded for ``event``."""
        if event == 0:
            return (0,)
        if event == 1:
            return (1,)
        if 1 < event <= 3:
            return (2, 3, 4)
        if 3 < event <= 6:
            return (5, 6)
        if 6 < event <= 8:
            return (7, 8, 9)
        if 8 < event <= 11:
            return (10, 11, 12)
        if 11 < event <= 14:
            return (0,)
        return ()

    def flatten_observation_to_int(self, observation):
        """Encode a dictionary observation as a single integer.

        The event is weighted with ``2**10``, ``case[1]`` and ``case[2]``
        each with ``2**2``, and the resources selected for the event with
        ``2**6``, ``2**7`` and ``2**8`` in index order.

        Args:
            observation: Dict with the keys ``'process'``, ``'case'`` and
                ``'event'``.

        Returns:
            The integer encoding of ``observation``.
        """
        process = observation['process']

        state = 0
        state += observation['event'] * 2 ** 10
        # NOTE(review): both case entries share the weight 2**2, so they are
        # not distinguishable in the encoding - confirm this is intended.
        state += observation['case'][1] * 2 ** 2
        state += observation['case'][2] * 2 ** 2

        # NOTE(review): resource values above 1 can spill into the next
        # resource's weight; the encoding is kept exactly as before.
        indices = self._resources_for_event(observation['event'])
        for offset, index in enumerate(indices):
            state += process[index] * 2 ** (6 + offset)
        return state
def main():
    """Build a sample environment, print its encoded initial state and each action index."""
    # Resource counts, one entry per resource, labelled with their short names.
    resource_counts = [
        1,  # s
        5,  # ot
        3,  # sh_a
        3,  # sh_b
        3,  # sh_c
        3,  # m_a
        2,  # m_b
        4,  # p_a
        5,  # p_b
        4,  # p_c
        8,  # ds_a
        8,  # ds_b
        8,  # ds_c
    ]
    case_sizes = [1] * 15
    activity_count = 16

    env = BusinessProcessEnv([resource_counts, case_sizes], activity_count)

    encoded = env.flatten_observation_to_int(env.current_state)
    print(encoded)

    for action in range(env.action_space.n):
        print(action)


if __name__ == "__main__":
    main()