Source code for HMB.AgentsHelper

import numpy as np


[docs] class RandomAgent(object): r''' Random policy baseline that always samples from a provided sampler. Useful as a non-learning baseline or to perform pure exploration. '''
[docs] def __init__(self, ActionSpaceSampleFunc): r''' Initialize RandomAgent. Parameters: ActionSpaceSampleFunc (callable): Callable returning a random action index when called. ''' # Store the action sampler for later use. self.ActionSpaceSampleFunc = ActionSpaceSampleFunc
[docs] def ChooseAction(self, state=None): r''' Return a randomly sampled action. Parameters: state: Ignored by RandomAgent but kept for API compatibility. Returns: int: Random action index returned by ActionSpaceSampleFunc(). ''' # Delegate to the provided sampler and return the result. return self.ActionSpaceSampleFunc()
[docs] class QAgent(object): r''' Base tabular Q-agent with epsilon-greedy action selection. This base class stores a Q-table and implements a simple epsilon-greedy action selection policy. Subclasses should implement UpdateParameters to perform learning updates (Q-learning, SARSA, Expected SARSA, ...). Parameters: ActionSpaceSampleFunc (callable): Callable returning a sampled action index from the environment's action space; used for exploration. alpha (float): Learning rate (0 < alpha <= 1). gamma (float): Discount factor for future rewards (0 <= gamma <= 1). noOfStates (int): Number of discrete states (size of first Q dimension). noOfActions (int): Number of discrete actions (size of second Q dimension). epsilon (float, optional): Exploration probability for epsilon-greedy policy (default 0.1). Attributes: qTable (numpy.ndarray): Array of shape (noOfStates, noOfActions) storing Q-values. alpha, gamma, epsilon (float): Learning and policy hyperparameters. ActionSpaceSampleFunc (callable): Stored reference to the sampler. '''
[docs] def __init__(self, ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=0.1): r''' Initialize the QAgent and allocate the Q-table. Parameters: ActionSpaceSampleFunc (callable): Function to sample random actions. alpha (float): Learning rate. gamma (float): Discount factor. noOfStates (int): Number of discrete states. noOfActions (int): Number of discrete actions. epsilon (float, optional): Exploration probability for epsilon-greedy policy. ''' # Store the action sampler function for exploration. self.ActionSpaceSampleFunc = ActionSpaceSampleFunc # Store the learning rate. self.alpha = alpha # Store the discount factor. self.gamma = gamma # Store the exploration probability. self.epsilon = epsilon # Store the number of available actions. self.noOfActions = noOfActions # Store the number of possible states. self.noOfStates = noOfStates # Allocate the Q-table initialized to zeros. self.qTable = np.zeros([noOfStates, noOfActions])
[docs] def ChooseAction(self, state): r''' Select an action using an epsilon-greedy strategy. Parameters: state (int): Current discrete state index. Returns: int: Chosen action index. If exploring, result of ActionSpaceSampleFunc; otherwise the greedy action (argmax over Q-values). ''' # Draw a uniform random number to decide exploration vs. exploitation. if (np.random.uniform(0, 1) < self.epsilon): # When exploring, sample a random action from the provided sampler. action = self.ActionSpaceSampleFunc() else: # When exploiting, choose the greedy action from the Q-table. action = self.GetAction(state) # Return the selected action. return action
[docs] def GetAction(self, state): r''' Return the greedy action for the given state using the Q-table. Parameters: state (int): State index to query. Returns: int: Index of the action with maximal Q-value for the state. Ties are resolved by numpy.argmax (first occurrence). ''' # Clip out-of-range state index to last valid. if ((state < 0) or (state >= self.qTable.shape[0])): state = min(max(state, 0), self.qTable.shape[0] - 1) return np.argmax(self.qTable[state, :])
[docs] class QLearningAgent(QAgent): r''' Tabular Q-learning agent implementing the standard off-policy update. Implements the classical Q-learning update which bootstraps using the maximum action-value in the next state. '''
[docs] def UpdateParameters(self, state, nextState, reward, action, nextAction): r''' Perform a Q-learning update for a single transition. Parameters: state (int): Previous state index. nextState (int): Next state index after taking the action. reward (float): Observed reward for the transition. action (int): Action index taken in ``state``. nextAction (int or None): Present for API parity with on-policy agents but unused by Q-learning. ''' # Read the old Q-value for the state-action pair. oldValue = self.qTable[state, action] # Compute the target using the reward and the max Q in the next state. target = reward + self.gamma * np.max(self.qTable[nextState, :]) # Apply the learning update rule. newValue = oldValue + self.alpha * (target - oldValue) # Write the updated Q-value back into the table. self.qTable[state, action] = newValue
[docs] class SARSAAgent(QAgent): r''' On-policy SARSA agent that updates using the Q-value of the next taken action. Uses the (state, action, reward, nextState, nextAction) tuple to form the TD target for on-policy updates. '''
[docs] def UpdateParameters(self, prevState, nextState, reward, prevAction, nextAction): r''' Perform a SARSA update for the observed transition. Parameters: prevState (int): The previous state index. nextState (int): The subsequent state index. reward (float): Observed reward for the transition. prevAction (int): Action taken in ``prevState``. nextAction (int): Action taken in ``nextState`` (on-policy). ''' # Read the old Q-value for the previous state-action pair. oldValue = self.qTable[prevState, prevAction] # Compute the SARSA target using the Q-value of the next (state, action) pair. target = reward + self.gamma * self.qTable[nextState, nextAction] # Apply the update step. newValue = oldValue + self.alpha * (target - oldValue) # Store the updated Q-value. self.qTable[prevState, prevAction] = newValue
[docs] class ExpectedSARSAAgent(QAgent): r''' Expected SARSA agent that uses the expectation under the epsilon-greedy policy as the bootstrap target. The expected value is computed exactly by accounting for the epsilon mass that is shared between greedy and non-greedy actions and handling ties. '''
[docs] def UpdateParameters(self, prevState, nextState, reward, prevAction, nextAction): r''' Perform an Expected SARSA update using the expectation over the policy in the next state. Parameters: prevState (int): Previous state index. nextState (int): Next state index. reward (float): Observed reward. prevAction (int): Action taken in previous state. nextAction (int or None): Present for API parity but unused here. ''' # Read the current Q-value for the previous state and action. oldValue = self.qTable[prevState, prevAction] # Initialize accumulator for expected Q-value in nextState. expectedQ = 0 # Find the maximum Q-value in the next state. qMax = np.max(self.qTable[nextState, :]) # Count how many actions achieve the maximal Q-value (tie handling). greedyActions = 0 for i in range(self.noOfActions): # Count greedy actions when their Q equals the maximum. if (self.qTable[nextState, i] == qMax): greedyActions += 1 # Probability assigned to any non-greedy action under epsilon-greedy policy. nonGreedyActionProbability = self.epsilon / float(self.noOfActions) # Probability for each greedy action is the remaining mass divided by number of greedy actions, # plus the non-greedy probability that every action keeps. greedyActionProbability = ((1.0 - self.epsilon) / greedyActions) + nonGreedyActionProbability # Accumulate the expected value under the epsilon-greedy policy. for i in range(self.noOfActions): if (self.qTable[nextState, i] == qMax): expectedQ += self.qTable[nextState, i] * greedyActionProbability else: expectedQ += self.qTable[nextState, i] * nonGreedyActionProbability # Compute the expected SARSA target using expected Q of next state. target = reward + self.gamma * expectedQ # Apply the learning update. newValue = oldValue + self.alpha * (target - oldValue) # Write the updated value back to the Q-table. self.qTable[prevState, prevAction] = newValue
[docs] class GreedyAgent(QAgent): r''' Deterministic greedy policy that always picks the current best action. This convenience subclass of QAgent enforces epsilon=0 for deterministic action selection. '''
[docs] def __init__(self, ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions): r''' Initialize GreedyAgent by calling QAgent with epsilon forced to 0. Parameters: See QAgent.__init__ for parameter descriptions; epsilon is forced to 0. ''' # Initialize parent QAgent with epsilon disabled for pure exploitation. super().__init__(ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=0.0)
[docs] def ChooseAction(self, state): r''' Choose the greedy action deterministically. Parameters: state (int): Current state index. Returns: int: Index of the greedy action according to the stored Q-table. ''' # Use QAgent's GetAction to return the greedy choice. return self.GetAction(state)
[docs] class SoftmaxPolicyAgent(QAgent): r''' Softmax (Boltzmann) policy over Q-values for stochastic action selection. The temperature parameter controls exploration: lower values concentrate probability mass on higher-valued actions, higher values approach a uniform distribution. An optional small epsilon mixes the softmax probabilities with a uniform distribution. '''
[docs] def __init__(self, ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, temperature=1.0, epsilon=0.0): r''' Initialize the SoftmaxPolicyAgent. Parameters: temperature (float): Softmax temperature (must be > 0). Lower favors greedy actions. epsilon (float, optional): Small probability to mix the softmax distribution with a uniform distribution (default 0.0). ''' # Initialize base QAgent, use epsilon as an optional uniform-mix parameter. super().__init__(ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=epsilon) # Store the softmax temperature. self.temperature = max(1e-8, float(temperature))
def _softmax(self, logits): r''' Numerically-stable softmax that returns a probability vector. Parameters: logits (array-like): Values to convert to probabilities. Returns: numpy.ndarray: Probability vector summing to 1. ''' # Shift logits by their max for numerical stability. shifted = logits - np.max(logits) # Exponentiate scaled logits. exp = np.exp(shifted / self.temperature) # Normalize to get probabilities. probs = exp / np.sum(exp) return probs
[docs] def ChooseAction(self, state): r''' Sample an action according to the softmax policy (optionally mixed with epsilon uniform). Parameters: state (int): Current state index. Returns: int: Sampled action index. ''' # Read Q-values for the state. qvals = self.qTable[state, :] # Compute softmax probabilities. probs = self._softmax(qvals) # If small epsilon mixing is enabled, mix with uniform distribution. if (self.epsilon > 0.0): # Create uniform probabilities. uniform = np.ones_like(probs) / float(len(probs)) # Mix the distributions. probs = (1.0 - self.epsilon) * probs + self.epsilon * uniform # Sample from the probability vector using numpy choice. action = np.random.choice(len(probs), p=probs) # Return the sampled action. return int(action)
[docs] class DoubleQLearningAgent(QAgent): r''' Double Q-learning agent implementing two Q-tables to reduce overestimation. On each update a random coin flip decides which table is updated; the other table is used to evaluate the chosen action (the Double Q-learning scheme described by Van Hasselt et al.). '''
[docs] def __init__(self, ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=0.1): r''' Initialize double-Q tables and parameters. Parameters: See QAgent.__init__ for parameter descriptions; an additional second Q-table is allocated internally. ''' # Initialize the base single QAgent with epsilon for action selection. super().__init__(ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=epsilon) # Allocate the second Q-table initialized to zeros. self.qTable2 = np.zeros_like(self.qTable)
[docs] def ChooseAction(self, state): r''' Choose action using the sum of both Q-tables for improved selection. Parameters: state (int): Current state index. Returns: int: Action index chosen either randomly (with probability epsilon) or as the argmax of qTable + qTable2. ''' # Use the sum of both tables as an estimate for action values. combined = self.qTable[state, :] + self.qTable2[state, :] # With probability epsilon choose a random action. if (np.random.uniform(0, 1) < self.epsilon): # Exploration: use provided sampler. return self.ActionSpaceSampleFunc() # Exploitation: choose argmax of combined Q-values. return int(np.argmax(combined))
[docs] def UpdateParameters(self, state, nextState, reward, action, nextAction=None): r''' Perform Double Q-learning update by randomly choosing which table to update. If the chosen table is A then the argmax is computed on A and evaluated using B (and vice-versa). This reduces the maximization bias present in standard Q-learning. Parameters: state (int): Current state index. nextState (int): Next state index. reward (float): Observed reward. action (int): Action taken in ``state``. nextAction (int or None): Present for API parity; unused here. ''' # Randomly choose which table to update this step. if (np.random.rand() < 0.5): # Update primary table qTable using qTable2 for evaluation. old = self.qTable[state, action] # Select greedy action according to qTable. aStar = int(np.argmax(self.qTable[nextState, :])) # Evaluate selected action using qTable2. target = reward + self.gamma * self.qTable2[nextState, aStar] # Apply learning rule. self.qTable[state, action] = old + self.alpha * (target - old) else: # Update qTable2 using qTable for evaluation. old = self.qTable2[state, action] aStar = int(np.argmax(self.qTable2[nextState, :])) target = reward + self.gamma * self.qTable[nextState, aStar] self.qTable2[state, action] = old + self.alpha * (target - old)
[docs] class QLambdaAgent(QAgent): r''' Q(\u03BB) agent with accumulating eligibility traces (tabular). Implements off-policy Q(\u03BB) with accumulating traces. The agent maintains an eligibility matrix of the same shape as the Q-table and updates all Q-values proportionally to their eligibility on each step. '''
[docs] def __init__(self, ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, lambd=0.9, epsilon=0.1): r''' Initialize QLambdaAgent with eligibility traces. Parameters: lambd (float): Eligibility trace decay parameter (\u03BB), typically in [0, 1]. ''' # Initialize base QAgent. super().__init__(ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=epsilon) # Store lambda parameter. self.lambd = float(lambd) # Allocate eligibility traces initialized to zeros. self.eligibility = np.zeros_like(self.qTable)
[docs] def ResetTraces(self): r''' Reset eligibility traces to zero. Call this at the start of each episode if episodic resets are used. ''' # Zero the eligibility trace matrix. self.eligibility.fill(0.0)
[docs] def UpdateParameters(self, state, nextState, reward, action, nextAction): r''' Perform Q(\u03BB) update with accumulating traces for a single transition. This implementation uses the off-policy max over next-state actions when forming the TD target (i.e., Q-based bootstrap). Parameters: state (int): Current state index. nextState (int): Next state index after taking action. reward (float): Observed reward. action (int): Action taken in ``state``. nextAction (int or None): Present for API parity but unused for the off-policy Q(\u03BB) update. ''' # Read current Q-value for the (state, action) pair. qSa = self.qTable[state, action] # Compute TD target using max over next state's actions (off-policy Q(\u03BB)). tdTarget = reward + self.gamma * np.max(self.qTable[nextState, :]) # Compute TD error. delta = tdTarget - qSa # Increment eligibility for the active state-action pair. self.eligibility[state, action] += 1.0 # Update all Q-values proportionally to their eligibility. self.qTable += self.alpha * delta * self.eligibility # Decay eligibility traces by gamma * lambda. self.eligibility *= (self.gamma * self.lambd)
[docs] class SARSALambdaAgent(QAgent): r''' On-policy SARSA(\u03BB) agent with eligibility traces. This agent implements accumulating or replacing eligibility traces for SARSA-style on-policy learning. Call ResetTraces() at episode start. Parameters: ActionSpaceSampleFunc (callable): Action sampler used for exploration. alpha (float): Learning rate. gamma (float): Discount factor. noOfStates (int): Number of discrete states. noOfActions (int): Number of discrete actions. lambd (float, optional): Trace-decay parameter (default 0.9). epsilon (float, optional): Epsilon for epsilon-greedy policy (default 0.1). trace_type (str, optional): "accumulating" or "replacing" (default "accumulating"). '''
[docs] def __init__( self, ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, lambd=0.9, epsilon=0.1, traceType="accumulating" ): r''' Initialize the SARSALambdaAgent. Parameters: ActionSpaceSampleFunc (callable): Function to sample random actions. alpha (float): Learning rate. gamma (float): Discount factor. noOfStates (int): Number of discrete states. noOfActions (int): Number of discrete actions. lambd (float, optional): Trace-decay parameter for eligibility traces. epsilon (float, optional): Exploration probability for epsilon-greedy policy. traceType (str, optional): Type of eligibility traces ("accumulating" or "replacing"). ''' # Initialize parent QAgent. super().__init__(ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=epsilon) self.lambd = float(lambd) self.traceType = traceType self.eligibility = np.zeros_like(self.qTable)
[docs] def ResetTraces(self): r''' Zero the eligibility traces (call at episode start). ''' # Zero the eligibility trace matrix. self.eligibility.fill(0.0)
[docs] def UpdateParameters(self, prevState, nextState, reward, prevAction, nextAction): r''' Perform SARSA(\u03BB) update using the on-policy TD error. Parameters follow the SARSA convention; nextAction is used for the on-policy bootstrap. ''' # TD error using Q(prevState, prevAction) and Q(nextState, nextAction) old = self.qTable[prevState, prevAction] target = reward + self.gamma * self.qTable[nextState, nextAction] delta = target - old # Update eligibility traces if (self.traceType == "replacing"): self.eligibility[prevState, prevAction] = 1.0 else: self.eligibility[prevState, prevAction] += 1.0 # Update Q-values proportionally to eligibility self.qTable += self.alpha * delta * self.eligibility # Decay traces self.eligibility *= (self.gamma * self.lambd)
[docs] class MonteCarloAgent(object): r''' First-visit Monte Carlo control (episodic) with incremental averaging. Stores an episode buffer and updates Q at episode end using returns. Parameters: ActionSpaceSampleFunc (callable): Action sampler for exploration. gamma (float): Discount factor. noOfStates (int): Number of discrete states. noOfActions (int): Number of discrete actions. epsilon (float, optional): Epsilon for epsilon-greedy policy (default 0.1). useFirstVisit (bool, optional): If True use first-visit MC, otherwise every-visit. '''
[docs] def __init__(self, ActionSpaceSampleFunc, gamma, noOfStates, noOfActions, epsilon=0.1, useFirstVisit=True): r''' Initialize the MonteCarloAgent. Parameters: ActionSpaceSampleFunc (callable): Function to sample random actions. gamma (float): Discount factor. noOfStates (int): Number of discrete states. noOfActions (int): Number of discrete actions. epsilon (float, optional): Exploration probability for epsilon-greedy policy. useFirstVisit (bool, optional): If True use first-visit MC, otherwise every-visit. ''' # Store the action sampler function for exploration. self.ActionSpaceSampleFunc = ActionSpaceSampleFunc # Store the discount factor. self.gamma = gamma # Store the number of available actions. self.noOfActions = noOfActions # Store the number of possible states. self.noOfStates = noOfStates # Store the exploration probability. self.epsilon = epsilon # Store the first-visit flag. self.useFirstVisit = useFirstVisit # Q-table and counts for incremental averaging self.qTable = np.zeros([noOfStates, noOfActions]) self.counts = np.zeros([noOfStates, noOfActions], dtype=np.int64) # Episode memory self.episode = [] # list of (state, action, reward)
[docs] def ChooseAction(self, state): r''' Epsilon-greedy action selection based on current Q-table. Parameters: state (int): Current discrete state index. Returns: int: Chosen action index. If exploring, result of ActionSpaceSampleFunc; otherwise the greedy action (argmax over Q-values). ''' # Draw a uniform random number to decide exploration vs. exploitation. if (np.random.uniform() < self.epsilon): # When exploring, sample a random action from the provided sampler. action = self.ActionSpaceSampleFunc() else: # When exploiting, choose the greedy action from the Q-table. action = int(np.argmax(self.qTable[state, :])) # Return the selected action. return action
[docs] def StoreTransition(self, state, action, reward): r''' Append a transition to the current episode buffer. Parameters: state (int): State index. action (int): Action index. reward (float): Reward value. ''' # Append the (state, action, reward) tuple to the episode list. self.episode.append((state, action, reward))
[docs] def EndEpisodeAndUpdate(self): r''' Process stored episode and perform first-visit (or every-visit) Monte Carlo updates, then clear the episode buffer. ''' G = 0.0 visited = set() # iterate backwards. for t in reversed(range(len(self.episode))): s, a, r = self.episode[t] G = self.gamma * G + r if (self.useFirstVisit): if ((s, a) in visited): continue visited.add((s, a)) # incremental average. self.counts[s, a] += 1 n = self.counts[s, a] self.qTable[s, a] += (G - self.qTable[s, a]) / float(n) # clear episode. self.episode = []
[docs] class DynaQAgent(QAgent): r''' Dyna-Q agent: model-based planning with a simple tabular model. Learns a one-step model (last observed next state and reward per (s,a)) and performs planning updates by sampling previously observed pairs. Parameters: ActionSpaceSampleFunc (callable): Action sampler for exploration. alpha (float): Learning rate. gamma (float): Discount factor. noOfStates (int): Number of discrete states. noOfActions (int): Number of discrete actions. epsilon (float, optional): Epsilon for epsilon-greedy policy (default 0.1). planningSteps (int, optional): Number of model-based planning updates per real step. '''
[docs] def __init__(self, ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=0.1, planningSteps=5): r''' Initialize the DynaQAgent. Parameters: ActionSpaceSampleFunc (callable): Function to sample random actions. alpha (float): Learning rate. gamma (float): Discount factor. noOfStates (int): Number of discrete states. noOfActions (int): Number of discrete actions. epsilon (float, optional): Exploration probability for epsilon-greedy policy. planningSteps (int, optional): Number of planning steps to perform. ''' # Initialize the base QAgent. super().__init__(ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=epsilon) # Store the number of planning steps. self.planningSteps = int(planningSteps) # Simple model: store last seen reward and nextState for each (s,a) self.modelReward = np.zeros_like(self.qTable) self.modelNext = np.zeros_like(self.qTable, dtype=int) self.observed = np.zeros_like(self.qTable, dtype=bool)
[docs] def UpdateParameters(self, state, nextState, reward, action, nextAction=None): r''' Perform a real experience Q-learning update, update the model, and run planningSteps simulated updates sampled from observed (s,a) pairs. Parameters: state (int): Current state index. nextState (int): Next state index. reward (float): Observed reward. action (int): Action taken in ``state``. nextAction (int or None): Present for API parity; unused here. ''' # Real experience Q-learning update old = self.qTable[state, action] target = reward + self.gamma * np.max(self.qTable[nextState, :]) self.qTable[state, action] = old + self.alpha * (target - old) # Update model (simple last-observed model) self.modelReward[state, action] = reward self.modelNext[state, action] = int(nextState) self.observed[state, action] = True # Planning: sample previously observed (s,a) uniformly seenIndices = np.argwhere(self.observed) if (seenIndices.size == 0): return for _ in range(self.planningSteps): idx = np.random.randint(len(seenIndices)) sP, aP = seenIndices[idx] rP = float(self.modelReward[sP, aP]) sPNext = int(self.modelNext[sP, aP]) oldP = self.qTable[sP, aP] targetP = rP + self.gamma * np.max(self.qTable[sPNext, :]) self.qTable[sP, aP] = oldP + self.alpha * (targetP - oldP)
[docs] class UCB1Agent(object): r''' UCB1 multi-armed bandit agent (no states). Uses the UCB1 formula to select actions in pure bandit problems. '''
[docs] def __init__(self, ActionSpaceSampleFunc, noOfActions, c=1.0): r''' Initialize the UCB1Agent. Parameters: ActionSpaceSampleFunc (callable): Function to sample random actions. noOfActions (int): Number of available actions (arms). c (float, optional): Exploration parameter for UCB1 (default 1.0). ''' # Store the action sampler function for exploration. self.ActionSpaceSampleFunc = ActionSpaceSampleFunc # Store the number of available actions. self.noOfActions = noOfActions # Store the exploration parameter. self.c = float(c) # Initialize counts and values for each action. self.counts = np.zeros(noOfActions, dtype=np.int64) self.values = np.zeros(noOfActions, dtype=float)
[docs] def ChooseAction(self, state=None): r''' Choose an action using the UCB1 rule. Parameters: state: Ignored by UCB1Agent but kept for API compatibility. Returns: int: Action index selected by the UCB1 algorithm. ''' total = np.sum(self.counts) # choose any untried action first for a in range(self.noOfActions): if (self.counts[a] == 0): return a # compute ucb values ucb = self.values + self.c * np.sqrt(np.log(total) / (self.counts + 1e-12)) return int(np.argmax(ucb))
[docs] def UpdateParameters(self, action, reward): r''' Update running average for the chosen arm. Parameters: action (int): Chosen action index. reward (float): Observed reward. ''' # Increment the count for the selected action. self.counts[action] += 1 n = self.counts[action] # Update the value estimate for the selected action using incremental average. self.values[action] += (reward - self.values[action]) / float(n)
[docs] class CountBonusQLAgent(QAgent): r''' Q-learning augmented with a count-based intrinsic bonus to encourage exploration. The bonus can be computed per-state or per state-action pair. '''
[docs] def __init__( self, ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=0.1, bonus_coef=1.0, countType="state" ): r''' Initialize the CountBonusQLAgent. Parameters: ActionSpaceSampleFunc (callable): Function to sample random actions. alpha (float): Learning rate. gamma (float): Discount factor. noOfStates (int): Number of discrete states. noOfActions (int): Number of discrete actions. epsilon (float, optional): Exploration probability for epsilon-greedy policy. bonus_coef (float, optional): Coefficient for the exploration bonus. countType (str, optional): Type of count-based bonus ("state" or "state_action"). ''' # Initialize the base QAgent. super().__init__(ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=epsilon) # Store the bonus coefficient and count type. self.bonus_coef = float(bonus_coef) self.countType = countType # Initialize visitation counts. if (countType == "state"): self.counts = np.zeros(noOfStates, dtype=np.int64) else: self.counts = np.zeros([noOfStates, noOfActions], dtype=np.int64)
[docs] def UpdateParameters(self, state, nextState, reward, action, nextAction=None): r''' Perform Q-learning update with an added exploration bonus computed from visitation counts. Parameters: state (int): Current state index. nextState (int): Next state index. reward (float): Observed reward. action (int): Action taken in ``state``. nextAction (int or None): Present for API parity; unused here. ''' # increment counts for the nextState or (state,action) if (self.countType == "state"): self.counts[nextState] += 1 cnt = self.counts[nextState] else: self.counts[state, action] += 1 cnt = self.counts[state, action] # Compute the exploration bonus based on visitation count. bonus = self.bonus_coef / np.sqrt(float(cnt)) # Augment the reward with the exploration bonus. augReward = reward + bonus # Perform the standard Q-learning update with the augmented reward. old = self.qTable[state, action] target = augReward + self.gamma * np.max(self.qTable[nextState, :]) self.qTable[state, action] = old + self.alpha * (target - old)
[docs] class NStepTDAgent(object): r''' n-step on-policy TD agent (n-step SARSA style). Maintains a buffer of recent transitions and performs n-step updates. '''
[docs] def __init__(self, ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, n=3, epsilon=0.1): r''' Initialize the NStepTDAgent. Parameters: ActionSpaceSampleFunc (callable): Function to sample random actions. alpha (float): Learning rate. gamma (float): Discount factor. noOfStates (int): Number of discrete states. noOfActions (int): Number of discrete actions. n (int, optional): Number of steps for n-step TD updates (default 3). epsilon (float, optional): Exploration probability for epsilon-greedy policy. ''' # Store the action sampler function for exploration. self.ActionSpaceSampleFunc = ActionSpaceSampleFunc # Store the learning rate. self.alpha = alpha # Store the discount factor. self.gamma = gamma # Store the number of available actions. self.noOfActions = noOfActions # Store the number of possible states. self.noOfStates = noOfStates # Store the n-step value. self.n = int(max(1, n)) # Store the exploration probability. self.epsilon = epsilon # Allocate the Q-table initialized to zeros. self.qTable = np.zeros([noOfStates, noOfActions]) # buffers self.states = [] self.actions = [] self.rewards = []
[docs] def ChooseAction(self, state): r''' Select an action using an epsilon-greedy strategy. Parameters: state (int): Current discrete state index. Returns: int: Chosen action index. If exploring, result of ActionSpaceSampleFunc; otherwise the greedy action (argmax over Q-values). ''' # Draw a uniform random number to decide exploration vs. exploitation. if (np.random.uniform() < self.epsilon): # When exploring, sample a random action from the provided sampler. action = self.ActionSpaceSampleFunc() else: # When exploiting, choose the greedy action from the Q-table. action = int(np.argmax(self.qTable[state, :])) # Return the selected action. return action
[docs] def UpdateParameters(self, state, nextState, reward, action, nextAction, done=False): r''' Step the agent with a (state, action, reward) sample and perform any ready n-step updates. If done==True, flush remaining updates. Parameters: state (int): Current state index. nextState (int): Next state index. reward (float): Observed reward. action (int): Action taken in ``state``. nextAction (int): Action taken in ``nextState`` (on-policy). done (bool, optional): Flag indicating episode termination (default False). ''' # Append transition. self.states.append(state) self.actions.append(action) self.rewards.append(reward) # Perform updates while we have at least n rewards or if episode ended. while len(self.rewards) >= self.n or (done and len(self.rewards) > 0): # Compute n-step return for the oldest stored transition. G = 0.0 for i in range(self.n): if (i < len(self.rewards)): G += (self.gamma ** i) * self.rewards[i] else: break # Bootstrap term. if (len(self.rewards) >= self.n and not (done and len(self.rewards) == self.n and nextState is None)): # if we have a nextState/action to bootstrap from. G += (self.gamma ** self.n) * self.qTable[nextState, nextAction] # Pop the oldest transition from the buffers. s0 = self.states.pop(0) a0 = self.actions.pop(0) self.rewards.pop(0) # Perform the Q-value update for the oldest transition. old = self.qTable[s0, a0] self.qTable[s0, a0] = old + self.alpha * (G - old) # If episode ended, clear buffers. if (done): self.states = [] self.actions = [] self.rewards = []
[docs] class PrioritizedSweepingAgent(QAgent): r''' Prioritized Sweeping agent (model-based planning with prioritized updates). This agent maintains a one-step model (reward and next-state for each observed (s,a)) and a predecessor map for states. When a real transition is observed it computes a priority for the state and uses a priority queue to selectively perform planning updates on predecessor state-action pairs with largest expected TD error first. This is useful for sample-efficient planning in small tabular MDPs. Parameters: ActionSpaceSampleFunc (callable): Action sampler used for exploration. alpha (float): Learning rate for Q-value updates. gamma (float): Discount factor. noOfStates (int): Number of discrete states. noOfActions (int): Number of discrete actions. epsilon (float, optional): Epsilon for epsilon-greedy action selection (default 0.1). planningSteps (int, optional): Number of prioritized-planning iterations to perform per real update (default 5). theta (float, optional): Priority threshold; only priorities greater than theta are pushed (default 1e-4). '''
[docs] def __init__(self, ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=0.1, planningSteps=5, theta=1e-4): r''' Initialize PrioritizedSweepingAgent and allocate model/priorities. ''' super().__init__(ActionSpaceSampleFunc, alpha, gamma, noOfStates, noOfActions, epsilon=epsilon) import heapq self._heapq = heapq self.planningSteps = int(planningSteps) self.theta = float(theta) # Model tables. self.modelReward = np.zeros_like(self.qTable) self.modelNext = np.zeros_like(self.qTable, dtype=int) self.observed = np.zeros_like(self.qTable, dtype=bool) # Predecessors: mapping state -> set of (s_prev, a_prev). self.predecessors = {s: set() for s in range(self.noOfStates)} # Priority queue storing tuples (-priority, state) to simulate max-heap. self.pq = []
def _push_state(self, state, priority): # Only push if priority exceeds threshold. if (priority > self.theta): # Store negative priority for max-heap behavior. self._heapq.heappush(self.pq, (-float(priority), int(state))) def _pop_state(self): if (not self.pq): return None pr, s = self._heapq.heappop(self.pq) return int(s), -float(pr)
[docs] def ResetModel(self): r''' Clear the learned model and predecessor information. ''' self.modelReward.fill(0.0) self.modelNext.fill(0) self.observed.fill(False) self.predecessors = {s: set() for s in range(self.noOfStates)} self.pq = []
[docs] def ChooseAction(self, state): r''' Choose an action using epsilon-greedy based on current Q-table. ''' return super().ChooseAction(state)
[docs] def UpdateParameters(self, state, nextState, reward, action, nextAction=None): r''' Process a real experience (state, action, reward, nextState): - Update Q with a standard Q-learning step (real experience). - Update internal one-step model and predecessors. - Compute priority for the experienced state and push it if large. - Run up to planningSteps prioritized planning updates. Parameters: state (int): Current state index. nextState (int): Next state index observed. reward (float): Observed reward. action (int): Action taken in ``state``. ''' # Real Q-learning update. old = self.qTable[state, action] target = reward + self.gamma * np.max(self.qTable[nextState, :]) self.qTable[state, action] = old + self.alpha * (target - old) # Update model: store last seen reward/next for this (s,a). self.modelReward[state, action] = reward self.modelNext[state, action] = int(nextState) just_new = not self.observed[state, action] self.observed[state, action] = True # Update predecessors map: predecessor of nextState includes (state, action). self.predecessors[int(nextState)].add((int(state), int(action))) # Compute priority for the state-action pair and push state. priority = abs(target - old) self._push_state(state, priority) # Planning loop: pop highest-priority state and update predecessors. for _ in range(self.planningSteps): popped = self._pop_state() if (popped is None): break sPopped, pVal = popped # For each predecessor (sPrev, aPrev) of s_popped, perform update. for (sPrev, aPrev) in list(self.predecessors.get(sPopped, [])): rP = float(self.modelReward[sPrev, aPrev]) sPNext = int(self.modelNext[sPrev, aPrev]) oldPrev = self.qTable[sPrev, aPrev] targetPrev = rP + self.gamma * np.max(self.qTable[sPNext, :]) # Q update for predecessor self.qTable[sPrev, aPrev] = oldPrev + self.alpha * (targetPrev - oldPrev) # Compute priority for predecessor and push if large. pr = abs(targetPrev - oldPrev) self._push_state(sPrev, pr)
# Self-checking test harness if __name__ == "__main__": ''' Run deterministic tests for all agents and compare to expected values. The tests use a tiny deterministic scenario (3 states x 2 actions) and perform the same sequence of calls used previously. Expected results are computed analytically and compared with a small numeric tolerance. The harness prints observed vs expected values and an overall PASS/FAIL summary. ''' import math np.random.seed(0) # Make sampling deterministic for reproducibility. print("Running deterministic agent tests...") # Environment and sampler. noS = 3 noA = 2 def sampler(): # Deterministic pseudo-random sampler (depends on np.random seed). return int(np.random.randint(0, noA)) results = {} # 1) SarsaLambdaAgent: single SARSA(lambda) update. sarsaL = SARSALambdaAgent(sampler, 0.5, 0.9, noS, noA, lambd=0.8, epsilon=0.2) results["SARSALambda_Q_sum"] = float(np.sum(sarsaL.qTable)) sarsaL.UpdateParameters(prevState=0, nextState=1, reward=1.0, prevAction=0, nextAction=1) results["SARSALambda_Q_sum"] = float(np.sum(sarsaL.qTable)) # 2) MonteCarloAgent: two-step episode (0,0,1.0), (1,1,2.0). mc = MonteCarloAgent(sampler, 0.9, noS, noA, epsilon=0.2, useFirstVisit=True) results["MonteCarlo_Q_00"] = float(mc.qTable[0, 0]) mc.StoreTransition(0, 0, 1.0) mc.StoreTransition(1, 1, 2.0) mc.EndEpisodeAndUpdate() results["MonteCarlo_Q_00"] = float(mc.qTable[0, 0]) # 3) DynaQAgent: one real update then 3 planning steps. dyna = DynaQAgent(sampler, 0.5, 0.9, noS, noA, epsilon=0.2, planningSteps=3) results["DynaQ_Q_sum"] = float(np.sum(dyna.qTable)) dyna.UpdateParameters(state=0, nextState=1, reward=1.0, action=0) results["DynaQ_Q_sum"] = float(np.sum(dyna.qTable)) # 4) UCB1Agent: choose untried arm then update its count. ucb = UCB1Agent(sampler, noA, c=1.0) results["UCB1_counts"] = np.array(ucb.counts, copy=True) chosen = ucb.ChooseAction() ucb.UpdateParameters(chosen, 1.0) results["UCB1_counts"] = np.array(ucb.counts, copy=True) cb = CountBonusQLAgent(sampler, 0.5, 0.9, noS, noA, epsilon=0.1, bonus_coef=1.0, countType="state") # 5) CountBonusQLAgent: state-count bonus update. results["CountBonusQL_Q_sum"] = float(np.sum(cb.qTable)) cb.UpdateParameters(state=0, nextState=1, reward=0.0, action=1) results["CountBonusQL_Q_sum"] = float(np.sum(cb.qTable)) # 6) NStepTDAgent: two-step episode flushed at end. ns = NStepTDAgent(sampler, 0.5, 0.9, noS, noA, n=2, epsilon=0.1) ns.UpdateParameters(state=0, nextState=1, reward=1.0, action=0, nextAction=1, done=False) ns.UpdateParameters(state=1, nextState=2, reward=2.0, action=1, nextAction=0, done=True) results["NStep_Q_sum"] = float(np.sum(ns.qTable)) # 7) PrioritizedSweepingAgent complex test: two sequential updates to build predecessors # First update: (1,0) -> nextState=2 with reward=1.0 # Second update: (0,1) -> nextState=1 with reward=2.0 # We expect real updates only (no additional predecessor-triggered changes) for this small scenario psa = PrioritizedSweepingAgent(sampler, 0.5, 0.9, noS, noA, epsilon=0.1, planningSteps=2, theta=0.0) psa.UpdateParameters(state=1, nextState=2, reward=1.0, action=0) psa.UpdateParameters(state=0, nextState=1, reward=2.0, action=1) # Analytical expectations: # Q[1,0] = 0.5 (first real update with alpha=0.5) # Q[0,1] = 0.5 * (2.0 + 0.9 * 0.5) = 0.5 * 2.45 = 1.225 # Sum = 0.5 + 1.225 = 1.725 results["PrioritizedSweeping_Q_sum"] = float(np.sum(psa.qTable)) # Analytically computed expected values for the above deterministic sequence. expected = { "SARSALambda_Q_sum" : 0.5, # 0.5 after single SARSA(lambda) accumulating update. "MonteCarlo_Q_00" : 2.8, # G for state 0: 1 + 0.9*2 = 2.8. "DynaQ_Q_sum" : 0.9375, # 0.5 -> 0.75 -> 0.875 -> 0.9375 after 3 planning steps. "UCB1_counts" : np.array([1, 0]), # first untried arm chosen then incremented. "CountBonusQL_Q_sum" : 0.5, # Bonus=1->aug_reward=1->Q update yields 0.5. "NStep_Q_sum" : 2.4, # 2-step returns lead to Q[0,0]=1.4 and Q[1,1]=1.0 -> sum=2.4. "PrioritizedSweeping_Q_sum": 1.725 # See analytical expectations above. } print("\nTest results:") tol = 1e-8 allPass = True for k in expected.keys(): expVal = expected[k] obsVal = results[k] if (isinstance(expVal, np.ndarray)): testPass = np.allclose(obsVal, expVal, atol=tol) else: testPass = math.isclose(obsVal, expVal, abs_tol=tol) allPass = allPass and testPass status = "PASS" if testPass else "FAIL" print(f" {k:25s}: observed={obsVal} expected={expVal} [{status}]") print("\nOverall test result: " + ("PASS" if allPass else "FAIL")) # Another advanced test. print("\nRunning advanced test...") # Advanced deterministic scenario: two sequential Q-updates with gamma=0 simplify bootstrap. np.random.seed(123) # Reseed for reproducibility in advanced test. advResults = {} # QLearningAgent: two updates on (state=0, action=0) with gamma=0, alpha=0.5. ql = QLearningAgent(sampler, 0.5, 0.0, noS, noA, epsilon=0.0) ql.UpdateParameters(state=0, nextState=0, reward=1.0, action=0, nextAction=None) ql.UpdateParameters(state=0, nextState=0, reward=3.0, action=0, nextAction=None) advResults["QLearning_Q00"] = float(ql.qTable[0, 0]) # DynaQAgent with planningSteps=0 should behave like Q-learning here. dyna0 = DynaQAgent(sampler, 0.5, 0.0, noS, noA, epsilon=0.0, planningSteps=0) dyna0.UpdateParameters(state=0, nextState=0, reward=1.0, action=0) dyna0.UpdateParameters(state=0, nextState=0, reward=3.0, action=0) advResults["DynaQ_Q00"] = float(dyna0.qTable[0, 0]) # PrioritizedSweepingAgent with planningSteps=0 should also reduce to a Q-like update. psa0 = PrioritizedSweepingAgent(sampler, 0.5, 0.0, noS, noA, epsilon=0.0, planningSteps=0, theta=0.0) psa0.UpdateParameters(state=0, nextState=0, reward=1.0, action=0) psa0.UpdateParameters(state=0, nextState=0, reward=3.0, action=0) advResults["PSA_Q00"] = float(psa0.qTable[0, 0]) # Analytical expected Q after two sequential updates with alpha=0.5, gamma=0: # Q1 = 0 + 0.5*(1 - 0) = 0.5 # Q2 = 0.5 + 0.5*(3 - 0.5) = 1.75 advExpected = {"QLearning_Q00": 1.75, "DynaQ_Q00": 1.75, "PSA_Q00": 1.75} print("\nAdvanced test results:") advAllPass = True for k, obs in advResults.items(): exp = advExpected[k] ok = bool(np.isclose(obs, exp, rtol=1e-6, atol=1e-8)) advAllPass = advAllPass and ok print(f" {k:15s}: observed={obs:.6f} expected={exp:.6f} [{'PASS' if ok else 'FAIL'}]") print("\nAdvanced overall: " + ("PASS" if advAllPass else "FAIL")) # End of advanced test. print("\nAll deterministic tests completed.")