environment.py

    import gymnasium as gym
    import numpy as np
    import simpy
    import simplesimmodel as model
    
    """
    Environment for the RL agent
    """
    
    class BusinessProcessEnv(gym.Env):
    
        def __init__(self, space, activities):
            self.ressources = space[0]
            self.case = space[1]
            self.activities = activities
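            # Observation: a vector over the resource pools ('process'), a vector over
            # the case slots ('case') and the case's current event ('event').
            # An action selects one of the `activities` possible activities.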
            
            self.observation_space = gym.spaces.Dict(
                {
                    'process': gym.spaces.MultiDiscrete(self.ressources),
                    'case': gym.spaces.MultiDiscrete(self.case),
                    'event': gym.spaces.Discrete(self.activities)
                }
            )
    
            self.action_space = gym.spaces.Discrete(self.activities)
    
            self.current_state = {
                'process': np.array(self.ressources),
                'case': np.zeros(len(self.case), dtype=int),
                'event': 0
            }
    
            self.model_env = simpy.Environment()
            self.process = model.BusinessProcess(self.model_env, self.ressources)
            self.model_env.process(model.run_process(self.model_env, self.process))
            # self.done_cases = set([])
    
            self.reward = 0  # cumulative episode reward; step() returns this running total
    
    
        def get_current_state(self, caseid):
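            # Ask the simulation model for the current (process, case, event) view
            # of the given case and wrap it in the observation dict.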
            process, case, event = model.get_current_state(self.process, caseid)
            state = {
                'process': process,
                'case': case,
                'event': event
            }
            return state
    
        def step(self, action):
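            # One agent step: hand the chosen activity to the simulation via
            # `process.next`, advance the SimPy model until it has carried the
            # activity out (it is expected to clear `process.flag` when done), and
            # reward the agent based on how little simulation time that took.
            # Invalid actions leave the state unchanged and yield no reward.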
            
            self.process.next = action
    
            if self.process.case_id in self.process.done_cases:
                self.process.case_id = np.random.choice(self.process.active_cases)
            
            case_obj = self.process.case_objects[self.process.case_id]
            self.current_state = self.get_current_state(case_obj)
    
            # print(f"{self.current_state['event']}")
    
            start = self.process.env.now
    
            self.process.flag = True
    
            if self.process.is_valid(self.current_state['event'], action, case_obj):
                
                while self.process.flag:
                    self.model_env.step()
    
                stop = self.process.env.now
    
                case_obj = self.process.case_objects[self.process.case_id]
    
                # print(f"Agent did case {self.process.case_id} activity {action}.")
    
                next_state = self.get_current_state(case_obj)
                self.current_state = next_state
                next_state = self.flatten_observation_to_int(next_state)
    
                time = stop - start
                # Shorter activity durations yield a higher reward.
                reward = 10000 - time
                self.reward += reward
                # The episode ends once five cases are finished or no case is still active.
                done = len(self.process.done_cases) == 5 or len(self.process.active_cases) == 0
                # gymnasium expects the info value to be a dict (its full step API also
                # returns a separate `truncated` flag, which is omitted here).
                return next_state, self.reward, done, {}
            
            else:
                # Invalid action for the case's current event: no reward, and the
                # observation is returned unchanged.
                next_state = self.flatten_observation_to_int(self.current_state)
                done = False
                return next_state, self.reward, done, {}
        
             
        def reset(self, seed=None, options=None):
            # Reset the environment to its initial state.
            # TODO: extract the initial state from an event log / the simulation model.
            super().reset(seed=seed)
    
            self.current_state = {
                'process': np.array(self.ressources),
                'case': np.zeros(len(self.case), dtype=int),
                'event': 0
            }
            self.current_step = 0
    
            self.model_env = simpy.Environment()
            self.process = model.BusinessProcess(self.model_env, self.ressources)
            self.model_env.process(model.run_process(self.model_env, self.process))
            self.process.done_cases = set()
    
            self.reward = 0
    
            # gymnasium's reset() returns (observation, info); note that this returns
            # the raw dict observation, while step() returns the integer encoding.
            return self.current_state, {}
            
    
        def render(self, mode='human'):
            # Render the current state of the environment
            pass
    
        
        def flatten_observation(self, observation):
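            # Flatten the dict observation into a plain list; step() uses the
            # integer encoding below instead.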
            flattened = list(observation['process']) + list(observation['case'])
            flattened.append(observation['event'])
            return flattened
    
        def flatten_observation_to_int(self, observation):
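            # Hand-rolled encoding of the dict observation into a single integer:
            # the event index goes into the high bits (weight 2**10), two case flags
            # into the low bits (both weighted 2**2, so they share the same bit
            # position), and, depending on the event, the counts of the resources
            # relevant to that event go into the middle bits (weights 2**6 to 2**8).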
            state = 0
            state += observation['event']*pow(2,10)
            state += observation['case'][1]*pow(2,2)
            state += observation['case'][2]*pow(2,2)
            event = observation['event']
            if event == 0:
                state += observation['process'][0]*pow(2,6)
            elif event == 1:
                state += observation['process'][1]*pow(2,6)
            elif 1 < event <=3:
                state += observation['process'][2]*pow(2,6)+observation['process'][3]*pow(2,7)+observation['process'][4]*pow(2,8)
            elif 3 < event <=6:
                state += observation['process'][5]*pow(2,6)+observation['process'][6]*pow(2,7)
            elif 6 < event <= 8:
                state += observation['process'][7]*pow(2,6)+observation['process'][8]*pow(2,7)+observation['process'][9]*pow(2,8)
            elif 8 < event <= 11:
                state += observation['process'][10]*pow(2,6)+observation['process'][11]*pow(2,7)+observation['process'][12]*pow(2,8)
            elif 11 < event <= 14:
                state += observation['process'][0]*pow(2,6)
            else:
                pass
            
            return state
    
    def main():
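        # Smoke test: build an example configuration, instantiate the environment,
        # and print the encoded initial state and the available action indices.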
        # Resource counts for the simulated process.
        process = [
            1,  # num_s
            5,  # num_ot
            3,  # num_sh_a
            3,  # num_sh_b
            3,  # num_sh_c
            3,  # num_m_a
            2,  # num_m_b
            4,  # num_p_a
            5,  # num_p_b
            4,  # num_p_c
            8,  # num_ds_a
            8,  # num_ds_b
            8,  # num_ds_c
        ]
    
        case = [1] * 15
            
        space = [process, case]
        activities = 16
    
        env = BusinessProcessEnv(space, activities)
        state = env.current_state
        flattened = env.flatten_observation_to_int(state)
        print(flattened)
        for value in range(env.action_space.n):
            print(value)
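

    # Illustrative usage sketch: drive the environment with random actions.  The
    # helper name and arguments are made up for this example; it assumes
    # `simplesimmodel` keeps `process.active_cases` populated while the simulation
    # runs, as the methods above expect.  step() returns the cumulative reward.
    def random_rollout(env, max_steps=50):
        obs, _ = env.reset()
        total = 0
        for _ in range(max_steps):
            action = env.action_space.sample()
            obs, total, done, _ = env.step(action)
            if done:
                break
        return total
    # e.g. random_rollout(BusinessProcessEnv(space, activities)) with the
    # configuration built in main().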
    
    
    if __name__ == "__main__":
        main()