Importing dependencies

from concurrent import futures
import grpc
import portpicker
import sys
import os


from absl import flags
from absl import app
from absl import logging

os.sys.path.insert(0, os.path.abspath('../../'))
# from configs import arch_gym_configs
# from arch_gym.envs.envHelpers import helpers

import envlogger
import numpy as np
import pandas as pd

Import customenv_wrapper - it converts custom environment to deepmind envlogger environment, vizier algorithm designer

from vizier._src.algorithms.designers.random import RandomDesigner
from arch_gym.envs import customenv_wrapper
from vizier.service import clients
from vizier.service import pyvizier as vz
from vizier.service import vizier_server
from vizier.service import vizier_service_pb2_grpc

Define Flags - for taking in command line inputs

flags.DEFINE_string('workload_rs', 'stream.stl', 'Which DRAMSys workload to run?')
flags.DEFINE_integer('num_steps_rs', 100, 'Number of training steps.')
flags.DEFINE_integer('num_episodes_rs', 2, 'Number of training episodes.')
flags.DEFINE_string('traject_dir_rs', 
                    'random_search_trajectories', 
            'Directory to save the dataset.')
flags.DEFINE_bool('use_envlogger_rs', False, 'Use envlogger to log the data.')  
flags.DEFINE_string('summary_dir_rs', '.', 'Directory to save the summary.')
flags.DEFINE_string('reward_formulation_rs', 'power', 'Which reward formulation to use?')
flags.DEFINE_integer('seed', 110, 'random_search_hyperparameter')
FLAGS = flags.FLAGS

This function logs fitness history to csv file

def log_fitness_to_csv(filename, fitness_dict):
    """Logs fitness history to csv file

    Args:
        filename (str): path to the csv file
        fitness_dict (dict): dictionary containing the fitness history
    """
    df = pd.DataFrame([fitness_dict['reward']])
    csvfile = os.path.join(filename, "fitness.csv")
    df.to_csv(csvfile, index=False, header=False, mode='a')

    # append to csv
    df = pd.DataFrame([fitness_dict])
    csvfile = os.path.join(filename, "trajectory.csv")
    df.to_csv(csvfile, index=False, header=False, mode='a')

This function wraps the environment in envlogger

def wrap_in_envlogger(env, envlogger_dir):
    """Wraps the environment in envlogger

    Args:
        env (gym.Env): gym environment
        envlogger_dir (str): path to the directory where the data will be logged
    """
    metadata = {
        'agent_type': 'RandomSearch',
        'num_steps': FLAGS.num_steps_rs,
        'env_type': type(env).__name__,
    }
    if FLAGS.use_envlogger_rs:
        logging.info('Wrapping environment with EnvironmentLogger...')
        env = envlogger.EnvLogger(env,
                                  data_directory=envlogger_dir,
                                  max_episodes_per_file=1000,
                                  metadata=metadata)
        logging.info('Done wrapping environment with EnvironmentLogger.')
        return env
    else:
        return env


Main function trains the custom environment using random actions for a given number of steps and episodes

We inititalise env by calling the custom environment wrapper. And then we setup the problem statement, which contains information about the search space and the metrics to optimize.

def main(_):
    """Trains the custom environment using random actions for a given number of steps and episodes 
    """

    env = customenv_wrapper.make_custom_env(max_steps=FLAGS.num_steps_rs)
    fitness_hist = {}
    problem = vz.ProblemStatement()
    problem.search_space.select_root().add_int_param(name='num_cores', min_value = 1, max_value = 12)
    problem.search_space.select_root().add_float_param(name='freq', min_value = 0.5, max_value = 3)
    problem.search_space.select_root().add_categorical_param(name='mem_type', feasible_values =['DRAM', 'SRAM', 'Hybrid'])
    problem.search_space.select_root().add_discrete_param(name='mem_size', feasible_values=[0, 32, 64, 128, 256, 512])

    problem.metric_information.append(
        vz.MetricInformation(
            name='Reward', goal=vz.ObjectiveMetricGoal.MAXIMIZE))

The study configuration contains additional information, such as the algorithm to use and level of noise that we think the objective will have. To sweep through the hyperparameters of the algorithm, we access the algorithm through its designer.

NOTE - This part of the code is different for different algorithms.

study_config = vz.StudyConfig.from_problem(problem)
random_designer = RandomDesigner(problem.search_space, seed = FLAGS.seed)

Setting up the client

Starts a study_client, which can be either in local mode (default) or distributed mode.

Local Mode: The client has no endpoint set, and will implicitly create a local Vizier Service which will be shared across other clients in the same Python process. Studies will then be stored locally in a SQL database file located at service.VIZIER_DB_PATH.

Distributed mode: The service may be explicitly created, wrapped as a server in a separate process to accept requests from all other client processses. Details such as the database_url, port, policy_factory, etc. can be configured in the server’s initializer.

All client processes (on a single machine or over multiple machines) will connect to this server via a globally specified endpoint.

Client Parallelization

Regardless of whether the setup is local or distributed, we may simultaneously create multiple clients to work on the same study, useful for parallelizing evaluation workload.

port = portpicker.pick_unused_port()
address = f'localhost:{port}'

# Setup server.
server = grpc.server(futures.ThreadPoolExecutor(max_workers=100))

# Setup Vizier Service.
servicer = vizier_server.VizierService()
vizier_service_pb2_grpc.add_VizierServiceServicer_to_server(servicer, server)
server.add_secure_port(address, grpc.local_server_credentials())

# Start the server.
server.start()

clients.environment_variables.service_endpoint = address  # Server address.
study = clients.Study.from_study_config(
    study_config, owner='owner', study_id='example_study_id')

Define the directories for the logs to be saved.

exp_name = "_num_steps_" + str(FLAGS.num_steps_rs) + "_num_episodes_" + str(FLAGS.num_episodes_rs)

    # append logs to base path
log_path = os.path.join(FLAGS.summary_dir_rs, 'random_search_logs', FLAGS.reward_formulation_rs, exp_name)

# get the current working directory and append the exp name
traject_dir = os.path.join(FLAGS.summary_dir_rs, FLAGS.traject_dir_rs, FLAGS.reward_formulation_rs, exp_name)

# check if log_path exists else create it
if not os.path.exists(log_path):
    os.makedirs(log_path)

if FLAGS.use_envlogger_rs:
    if not os.path.exists(traject_dir):
        os.makedirs(traject_dir)
env = wrap_in_envlogger(env, traject_dir)

Obtaining suggestions

Start requesting suggestions from the server, for evaluating objectives. Suggestions can be made sequentially (count=1) or in batches (count>1).

env.reset()
    
count = 0
suggestions = random_designer.suggest(count=FLAGS.num_steps_rs)

for suggestion in suggestions:
    count += 1
    num_cores = str(suggestion.parameters['num_cores'])
    freq = str(suggestion.parameters['freq'])
    mem_type_dict = {'DRAM':0, 'SRAM':1, 'Hybrid':2}
    mem_type = str(mem_type_dict[str(suggestion.parameters['mem_type'])])
    mem_size = str(suggestion.parameters['mem_size'])
    
    action = {"num_cores":float(num_cores), "freq": float(freq), "mem_type":float(mem_type), "mem_size": float(mem_size)}
    
    print("Suggested Parameters for num_cores, freq, mem_type, mem_size are :", num_cores, freq, mem_type, mem_size)
    done, reward, info, obs = (env.step(action))
    fitness_hist['reward'] = reward
    fitness_hist['action'] = action
    fitness_hist['obs'] = obs
    if count == FLAGS.num_steps_rs:
        done = True

    log_fitness_to_csv(log_path, fitness_hist)
    print("Observation: ",obs)
    final_measurement = vz.Measurement({'Reward': reward})
    suggestion = suggestion.to_trial()
    suggestion.complete(final_measurement)
        

Calling the main function

if __name__ == '__main__':
   app.run(main)