import os
import sys
import sys
import subprocess
import time
import re
import random
import numpy as np
settings_file_path = os.path.realpath(__file__)
settings_dir_path = os.path.dirname(settings_file_path)
os.sys.path.insert(0, settings_dir_path)
os.sys.path.insert(0, settings_dir_path + '/../../')
os.sys.path.insert(0, settings_dir_path + '/../../sims/DRAM/binary/DRAMSys_Proxy_Model')
from DRAMSys_Proxy_Model import DRAMSysProxyModel
from configs.sims import DRAMSys_config
from configs.algos import rl_config
import gym
from gym.utils import seeding
from envHelpers import helpers
from loggers import write_csv
[docs]
class DRAMEnv(gym.Env):
def __init__(self,
reward_formulation = "power",
cost_model = "simulator"):
# Todo: Change the values if we normalize the observation space
self.observation_space = gym.spaces.Box(low=0, high=1e10, shape=(1,3))
self.action_space = gym.spaces.Box(low=0, high=8, shape=(10,))
self.binary_name = DRAMSys_config.binary_name
self.exe_path = DRAMSys_config.exe_path
self.sim_config = DRAMSys_config.sim_config
self.experiment_name = DRAMSys_config.experiment_name
self.logdir = DRAMSys_config.logdir
self.cost_model = cost_model
self.reward_formulation = reward_formulation
self.max_steps = 100
self.steps = 0
self.max_episode_len = 10
self.episode = 0
self.reward_cap = sys.float_info.epsilon
self.helpers = helpers()
self.reset()
[docs]
def get_observation(self,outstream):
'''
converts the std out from DRAMSys to observation of energy, power, latency
[Energy (PJ), Power (mW), Latency (ns)]
'''
obs = []
keywords = ["Total Energy", "Average Power", "Total Time"]
energy = re.findall(keywords[0],outstream)
all_lines = outstream.splitlines()
for each_idx in range(len(all_lines)):
if keywords[0] in all_lines[each_idx]:
obs.append(float(all_lines[each_idx].split(":")[1].split()[0])/1e9)
if keywords[1] in all_lines[each_idx]:
obs.append(float(all_lines[each_idx].split(":")[1].split()[0])/1e3)
if keywords[2] in all_lines[each_idx]:
obs.append(float(all_lines[each_idx].split(":")[1].split()[0])/1e9)
obs = np.asarray(obs)
print('[Environment] Observation:', obs)
if(len(obs)==0):
print(outstream)
return obs
[docs]
def obs_to_dict(self, obs):
obs_dict = {}
obs_dict["Energy"] = obs[0]
obs_dict["Power"] = obs[1]
obs_dict["Latency"] = obs[2]
return obs_dict
[docs]
def calculate_reward(self, power, latency):
target_power = DRAMSys_config.target_power
target_latency = DRAMSys_config.target_latency
print("Power:", power, "Latency:", latency, "Target Power:", target_power, "Target Latency:", target_latency)
#power_norm = max((power - target_power)/target_power, self.reward_cap)
#latency_norm = max((latency-target_latency)/target_latency, self.reward_cap)
if self.reward_formulation == "power":
power_norm = target_power/abs(power-target_power)
reward = power_norm
elif self.reward_formulation == "latency":
latency_norm = target_latency/abs((latency-target_latency))
reward = latency_norm
elif self.reward_formulation == "both":
power_norm = target_power/abs(power-target_power)
latency_norm = target_latency/abs((latency-target_latency))
reward = power_norm*latency_norm
# For RL agent, we want to maximize the reward
if(rl_config.rl_agent):
reward = 1/reward
return reward
[docs]
def runDRAMEnv(self):
'''
Method to launch the DRAM executables given an action
'''
exe_path = self.exe_path
exe_name = self.binary_name
config_name = self.sim_config
exe_final = os.path.join(exe_path,exe_name)
process = subprocess.Popen([exe_final, config_name],stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = process.communicate()
if err.decode() == "":
outstream = out.decode()
else:
print(err.decode())
sys.exit()
obs = self.get_observation(outstream)
obs = obs.reshape(1,3)
return obs
[docs]
def step(self, action_dict):
'''
Step method takes action as input and outputs observation
rewards
'''
print("Action Dict",action_dict)
self.steps += 1
done = False
if self.cost_model == "simulator":
status = self.actionToConfigs(action_dict)
if(status):
obs = self.runDRAMEnv()
else:
print("Error in writing configs")
elif self.cost_model == "proxy_model":
proxy_model = DRAMSysProxyModel()
obs = proxy_model.run_proxy_model(action_dict)
reward = self.calculate_reward(obs[0][1], obs[0][2])
if(self.steps == 100):
done = True
print("Maximum steps per episodes reached!")
self.reset()
self.episode +=1
print("Episode:", self.episode, " Rewards:", reward)
return obs, reward, done, {}
[docs]
def reset(self):
#print("Reseting Environment!")
self.steps = 0
return self.observation_space.sample()
[docs]
def actionToConfigs(self,action):
'''
Converts actions output from the agent to update the configuration files.
'''
write_ok = False
if(type(action) == dict):
write_ok = self.helpers.read_modify_write_dramsys(action)
else:
action_decoded = self.helpers.action_decoder_rl(action)
write_ok = self.helpers.read_modify_write_dramsys(action_decoded)
return write_ok
# For testing
if __name__ == "__main__":
print("Hey")
dramObj = DRAMEnv(cost_model="proxy_model")
helpers = helpers()
logs = []
obs = dramObj.runDRAMEnv()