# File: environment.py (5.98 KiB)
# Authored by Aleksandra Dimitrova
import gymnasium as gym
import numpy as np
import simpy
import simplesimmodel as model
"""
Gymnasium environment through which the RL agent interacts with the
SimPy-based business process simulation.
"""
class BusinessProcessEnv(gym.Env):
    """Gymnasium environment driving the simulated business process.

    Each action is the index of the activity the agent wants to execute next
    for the case currently selected in the simulation model.  The underlying
    simulation is a ``simpy.Environment`` running ``model.run_process`` on a
    ``model.BusinessProcess`` instance.

    Args:
        space: Two-element sequence ``[ressources, case]``.  ``ressources`` is
            used as the ``nvec`` of the ``'process'`` observation component and
            is handed to ``model.BusinessProcess``; ``case`` is used as the
            ``nvec`` of the ``'case'`` observation component.
        activities: Number of discrete activities; defines both the action
            space and the ``'event'`` observation component.
    """

    # The episode is reported as done once this many cases are finished.
    MAX_DONE_CASES = 5
    # Reward of a valid step is this constant minus the elapsed simulation time.
    REWARD_BASE = 10000

    def __init__(self, space, activities):
        self.ressources = space[0]
        self.case = space[1]
        self.activities = activities

        self.observation_space = gym.spaces.Dict(
            {
                'process': gym.spaces.MultiDiscrete(self.ressources),
                'case': gym.spaces.MultiDiscrete(self.case),
                'event': gym.spaces.Discrete(self.activities),
            }
        )
        self.action_space = gym.spaces.Discrete(self.activities)

        self.current_step = 0
        self.current_state = self._initial_state()
        self._start_simulation()
        self.reward = 0

    def _initial_state(self):
        """Return the observation dict used before any simulation step ran."""
        return {
            'process': np.array(self.ressources),
            'case': np.zeros(len(self.case), dtype=int),
            'event': 0,
        }

    def _start_simulation(self):
        """Create a fresh SimPy environment and register the process in it."""
        self.model_env = simpy.Environment()
        self.process = model.BusinessProcess(self.model_env, self.ressources)
        self.model_env.process(model.run_process(self.model_env, self.process))

    def get_current_state(self, caseid):
        """Query the simulation model for the observation of one case.

        Args:
            caseid: Forwarded unchanged to ``model.get_current_state``.
                NOTE(review): ``step`` passes the case *object* here, not an
                id - confirm which one the model expects.

        Returns:
            Dict with the keys ``'process'``, ``'case'`` and ``'event'``.
        """
        process, case, event = model.get_current_state(self.process, caseid)
        return {'process': process, 'case': case, 'event': event}

    def step(self, action):
        """Apply ``action`` to the currently selected case.

        Args:
            action: Index of the activity to execute next.

        Returns:
            Tuple ``(observation, reward, done, info)`` where ``observation``
            is the integer code from ``flatten_observation_to_int`` and
            ``info`` is an empty dict.
            NOTE(review): ``reward`` is the running total ``self.reward``, not
            the reward of this single step - confirm this is intended.
            NOTE(review): this is a 4-tuple; the current Gymnasium API uses
            ``(obs, reward, terminated, truncated, info)``.
        """
        self.process.next = action

        # Move on to another case when the current one is already finished.
        if self.process.case_id in self.process.done_cases:
            if len(self.process.active_cases) == 0:
                # Nothing left to pick from: np.random.choice would raise on an
                # empty sequence, so report the episode as finished instead.
                observation = self.flatten_observation_to_int(self.current_state)
                return observation, self.reward, True, {}
            # NOTE(review): uses the global NumPy RNG, so the seed passed to
            # reset() does not influence this choice.
            self.process.case_id = np.random.choice(self.process.active_cases)

        case_obj = self.process.case_objects[self.process.case_id]
        self.current_state = self.get_current_state(case_obj)

        start = self.process.env.now
        self.process.flag = True

        if not self.process.is_valid(self.current_state['event'], action, case_obj):
            # Invalid action: the simulation is not advanced and the
            # accumulated reward stays unchanged.
            observation = self.flatten_observation_to_int(self.current_state)
            return observation, self.reward, False, {}

        # Advance the simulation event by event until the flag is cleared
        # (presumably by the model once the activity was handled - confirm in
        # simplesimmodel).
        while self.process.flag:
            self.model_env.step()
        elapsed = self.process.env.now - start

        # The selected case may have changed while the simulation was running.
        case_obj = self.process.case_objects[self.process.case_id]
        self.current_state = self.get_current_state(case_obj)
        observation = self.flatten_observation_to_int(self.current_state)

        self.reward += self.REWARD_BASE - elapsed

        # ">=" rather than "==" so the episode still ends if several cases
        # finish within a single step.
        done = (
            len(self.process.done_cases) >= self.MAX_DONE_CASES
            or len(self.process.active_cases) == 0
        )
        return observation, self.reward, done, {}

    def reset(self, seed=None, options=None):
        """Restart the simulation and return the initial observation.

        Args:
            seed: Forwarded to ``gym.Env.reset``.
            options: Accepted for API compatibility; not used.

        Returns:
            Tuple ``(observation, info)``: the initial observation *dict* and
            an empty info dict.
            NOTE(review): ``step`` returns the integer-encoded observation
            while ``reset`` returns the dict - confirm callers handle both.
        """
        # TODO: implement a function which extracts the current state from an
        # event log / simulation model.
        super().reset(seed=seed)
        self.current_state = self._initial_state()
        self.current_step = 0
        self._start_simulation()
        self.process.done_cases = set()
        self.reward = 0
        return self.current_state, {}

    def render(self, mode='human'):
        """Rendering is not implemented; this is a no-op."""
        pass

    def flatten_observation(self, observation):
        """Return the observation as one flat list.

        The list holds the ``'process'`` entries, then the ``'case'`` entries,
        then the ``'event'`` value.
        """
        return [*observation['process'], *observation['case'], observation['event']]

    def flatten_observation_to_int(self, observation):
        """Encode an observation dict as a single integer.

        The code is the sum of ``event * 2**10``, ``case[1] * 2**2``,
        ``case[2] * 2**2`` and up to three ``'process'`` entries (chosen by the
        event) weighted with ``2**6``, ``2**7`` and ``2**8``.

        NOTE(review): both case entries share the weight ``2**2``, and process
        entries greater than 1 spill into the neighbouring weight, so distinct
        observations can map to the same code - confirm this is acceptable.
        """
        event = observation['event']
        case = observation['case']
        resources = observation['process']

        state = 0
        state += event * 2 ** 10
        state += case[1] * 2 ** 2
        state += case[2] * 2 ** 2

        # Indices into observation['process'] that contribute for this event.
        if event == 0:
            slots = (0,)
        elif event == 1:
            slots = (1,)
        elif 1 < event <= 3:
            slots = (2, 3, 4)
        elif 3 < event <= 6:
            slots = (5, 6)
        elif 6 < event <= 8:
            slots = (7, 8, 9)
        elif 8 < event <= 11:
            slots = (10, 11, 12)
        elif 11 < event <= 14:
            slots = (0,)
        else:
            slots = ()

        # The first selected entry is weighted 2**6, the next 2**7, then 2**8.
        for exponent, index in enumerate(slots, start=6):
            state += resources[index] * 2 ** exponent
        return state
def main():
    """Build a sample environment and print its encoded state and actions.

    Prints the integer code of the initial observation, followed by every
    action index of the environment's action space.
    """
    # Resource counts in the order expected by the 'process' component.
    resource_counts = {
        'num_s': 1,
        'num_ot': 5,
        'num_sh_a': 3,
        'num_sh_b': 3,
        'num_sh_c': 3,
        'num_m_a': 3,
        'num_m_b': 2,
        'num_p_a': 4,
        'num_p_b': 5,
        'num_p_c': 4,
        'num_ds_a': 8,
        'num_ds_b': 8,
        'num_ds_c': 8,
    }
    process = list(resource_counts.values())
    case = [1] * 15

    env = BusinessProcessEnv([process, case], 16)
    print(env.flatten_observation_to_int(env.current_state))

    for value in range(env.action_space.n):
        print(value)


if __name__ == "__main__":
    main()