【Python】多腕バンディット問題①―ε-greedyアルゴリズムのPythonコードの詳細メモ

 ディスプレイ広告を出稿するにあたり、広告のレイアウト案1と案2をどちらにすべきかであったり、ウェブページのレイアウトをA案とB案のどちらにするかなどの意思決定の場面に遭遇する機会が仕事でありました。
 前者では決裁関与者による手直しにより案1に絞られ、後者では会議の出席者の多数決でA案になりました。前者の場合は、広告をより多くの人に見てもらうことが主な目的であり、一定数以上のインプレッション数を契約に盛り込めばいいという考えでした。一方、後者ではウェブページのレイアウトが煩雑でありもっと見やすくしたいという上の意見によりウェブページのレイアウトを修正することになりました。
 このとき、1つの案に多数決で絞り込むのではなく、現状のレイアウトも含めて3パターンのウェブページのレイアウトをすべて利用して、直帰率などの定量的な指標を用いて検証すればいいのになーと思うとともに、そういえばA/Bテスト以外にもバンディットアルゴリズムとかあるけど具体的にどんな手法なんだっけという状態でした。なので、これは多腕バンディット問題について勉強する機会が訪れたんだということにして、今回はε-greedyアルゴリズムについての学習メモを残しておきます。

はじめに

 多腕バンディット問題(バンディット問題, multi-armed bandit problem)とは、複数の選択肢(たとえば、作成した複数のウェブページのレイアウト案を指し、これをバンディット問題ではアームと呼び、強化学習ではアクションまたはコントロールと呼ぶ)から最も良いものを逐次的に探す問題のことをいいます。ここでいう「最も良い」とは、施策を実行する者(エージェントまたは予測者)が選択したアームからもたらされる報酬(たとえば、ウェブページであればコンバージョン率やクリック率のことを指します)が最も高いということを意味しており、バンディット問題において予測者はアームの選択を有限回数(理論的には無限回数も想定される)繰り返し反復して得られる総報酬を最大化したいと考えています。
 このとき問題になるのは、どのようなアームを選択すべきかということです。もちろん報酬が高いアームを選択したいのは当然なんですが、予測者は事前にそのことが分からない(事前情報はあったとしても完全に把握はしていない)のが普通です。そのため、まずは情報を収集するためにアームを選択しようとする行動のことを「探索(exploration)」といい、情報を収集した結果、最も報酬が高いと思われる(現時点での経験期待報酬が高い)アームを選択し利用する行動のことを「活用(exploitation)」といいます。これらの行動を反復的に行うことにより報酬を最大化するアームを選択するのが予測者の目的となります。
 ここで着目すべき点は、これらの行動はトレードオフの関係にあるということです。情報収集のためにアームを引き続ければ、どのアームの報酬が高いのかが分かってきますが、あまり探索ばかり続けてしまうと、現時点で期待報酬が高いアームを活用し続けていた場合の総報酬と比べて報酬が低くなる可能性があります。そのため、探索ばかり行うとこのような機会費用が発生する一方で、活用ばかり続けると現時点の期待報酬よりも高い報酬を得ることのできるアームの存在を見落としてしまう可能性が生じます。このような行動間の関係を探索と活用のトレードオフ(exploration-exploitation tradeoff)といいます。
 報酬を最大化するには、探索と活用のバランスをとる必要があり、そのためのアルゴリズムのひとつがε-greedyアルゴリズムです。このアルゴリズムを理解するためにはまず、greedyアルゴリズム(貪欲法ともいう)について簡単に触れておきます。

greedyアルゴリズム(貪欲法)

 バンディット問題におけるgreedyアルゴリズムとは、すべてのアームをN回探索したうえで、報酬(報酬の期待値)が最大のアームを選択するというものです。最初に探索を行い、その時点での報酬が最も高いアームを活用し続けるため、もしそのアームが最適なものでない場合は、誤った最適解に基づいてアームを引き続けることになるので、中々悲惨な状況だと思います。これがgreedyアルゴリズムのリスクであると同時に「活用」のみを続けることのリスクでもあるといえるでしょう。

ε-greedyアルゴリズム

 「活用」のみを続けることに対するリスクを緩和する方法としてε-greedyアルゴリズムがあります。これは、確率 ε 0 \leq ε \leq 1)でランダムなアームを選択すること以外は、基本的にgreedyアルゴリズムと同じといえます。そのため、 ε=0の場合だと当然greedyアルゴリズムと等しくなります。確率 εで探索を行うため、確率 1-εでは活用(これまでの報酬の平均値が最大のアームを選択する)を行います。
 このように探索と活用を逐次的に行い総報酬の最大化を目的にアームを選択しますが、選択する各アームごとに確率分布を仮定(それぞれの分布はi.i.d)しており、選んだアームの確率分布から報酬が引かれるという考え方をします。
 たとえば、アームを広告クリエィティブとし、報酬をクリック数とするならば、このバンディト問題は総クリック数を最大化する広告を選択することが目的となります。このとき、各アームの確率分布にベルヌーイ分布を仮定すると、確率 \thetaで報酬1となり、確率 1-\thetaで報酬0となります。これは広告がクリックされたならば報酬1が与えられ、クリックされなければ報酬が0となることを意味しており、確率 \thetaはアームから報酬が得られる成功確率みたいなものといえます。確率 \thetaが高いアームほど、クリックしてもらえる確率が高くなるため、このアームを活用し続ければ、総報酬も高くなることが予測されます。しかしながら、予測者はこの確率 \thetaのことを知らないので、探索を行い情報収集を行います。よって、成功確率 \thetaが高いということは、そのアームの期待報酬も高いといえそうです。

Pythonによる実装

 Pythonによるコードは『Bandit Algorithms for Website Optimization』(2012年, John White著)の本文中のコード(githubで公開)を利用します。他のウェブページにあるPythonのコードは大体このコードの写経かもしくはベースにしているものがほとんどであり、本記事もこれに準拠します。ただし、White(2012)のコードだとシミュレーション結果をプロットするのにRを使用していたり、numpyなどのライブラリを使用していないためややコードが読みづらくなっています。また、map関数の戻り値をそのままリストとして利用するコードが書かれているが、Python 3.x系ではmap関数の戻り値はMapオブジェクトとなるため、戻り値をそのまま利用するとエラーが生じてしまうため、そこらへんを修正しています(本記事はversion3.6を使用)。なお、コードを写経するだけでは他のブログと変わりないため、ここではコードについて詳細にみていくことで他の記事と差別化を図ります(コードを書く際にWhite(2012)以外で参考にしたサイトは次のものです:Epsilon-Greedy法 – Momentum←コードがシンプルでとても読みやすかったです!)。

コード全体

 まずは、コードの全体像をはじめに示します。次に個々のパーツごとに詳細な説明を加えていきます。

#------------------------------------------------------------------------------
# DEFINITION
#------------------------------------------------------------------------------
# BernoulliArm()クラスの定義
class BernoulliArm():
    def __init__(self, p):
        self.p = p

    def draw(self):
        if self.p > random.random() :
            return 1.0
        else:
            return 0.0

# EpsilonGreedy()クラスの定義
class EpsilonGreedy():
    def __init__(self, epsilon, counts, values):
        self.epsilon = epsilon 
        self.counts = counts 
        self.values = values 
        return
    
    def initialize(self, n_arms): 
        self.counts = np.zeros(n_arms)
        self.values = np.zeros(n_arms)
        return
    
    def select_arm(self):
        if self.epsilon > random.random(): 
            return np.random.randint(0, len(self.values))
        else:
            return np.argmax(self.values)
        
    def update(self, chosen_arm, reward):
        self.counts[chosen_arm] = self.counts[chosen_arm] + 1 
        n = self.counts[chosen_arm]
        value = self.values[chosen_arm] 
        new_value = ((n-1) / float(n)) * value + (1 / float(n)) * reward 
        self.values[chosen_arm] = new_value
        return

# test_algorithm()メソッドの定義
def test_algorithm(algo, arms, num_sims, horizon):
    chosen_arms = np.zeros(num_sims * horizon)
    rewards = np.zeros(num_sims * horizon)
    cumulative_rewards = np.zeros(num_sims * horizon)
    sim_nums = np.zeros(num_sims * horizon)
    times = np.zeros(num_sims * horizon)

    for sim in range(num_sims):
        sim = sim + 1 
        algo.initialize(len(arms)) 
        
        for t in range(horizon):
            t = t + 1 
            index = (sim - 1) * horizon + t - 1 
            sim_nums[index] = sim
            times[index] = t
            chosen_arm = algo.select_arm()
            chosen_arms[index] = chosen_arm
            
            reward = arms[chosen_arm].draw()
            rewards[index] = reward
            
            if t == 1:
                cumulative_rewards[index] = reward
            else:
                cumulative_rewards[index] = cumulative_rewards[index - 1] + reward
            
            algo.update(chosen_arm, reward)
            
    return [sim_nums, times, chosen_arms, rewards, cumulative_rewards]

#------------------------------------------------------------------------------
# MAIN 
#------------------------------------------------------------------------------
theta = np.array([0.1, 0.1, 0.1, 0.1, 0.9])
n_arms = len(theta) 
random.shuffle(theta)
    
arms = map(lambda x: BernoulliArm(x), theta)  
arms = list(arms)
    
for epsilon in [0.1, 0.2, 0.3, 0.4, 0.5]:
    algo = EpsilonGreedy(epsilon, [], [])
    algo.initialize(n_arms)
    results = test_algorithm(algo, arms, 5000, 250) 
    
    df = pd.DataFrame({"times": results[1], "rewards": results[3]})
    grouped = df["rewards"].groupby(df["times"]) 

    plt.plot(grouped.mean(), label="epsilon="+str(epsilon)) 

plt.legend(loc="best")
plt.show()


シミュレーションのセットアップについて

  • アーム数は5本
  • 各アームはベルヌーイ分布に従う
  • アームから得られる報酬は未知の成功確率 \thetaのベルヌーイ過程となる
  • そのため、報酬は確率 \thetaで1の値をとり、確率 1-\thetaで0の値をとる
  • 各アームから報酬が得られる成功確率 \thetaは各シミュレーションにおいてアームを引く回数(ラウンドまたはhoraizonという)によらず一定となる(もちろん予測者はこの値を知らない)
  • 確率 \thetaは{0.1, 0.1, 0.1, 0.1, 0.9}のいづれかとなるように設定する
  • シミュレーション回数は5000回とし、ラウンド数を250とする
  • 探索する確率εは[0.1, 0.5]の区間を0.1刻みで使用する

 

 コードについては、大きく分けてDEFINITIONパートとMAINパートに分けることができます。前者では、ベルヌーイ分布に従うアームのクラスとアルゴリズムのクラスを定義しており、それぞれのクラスのメソッドも定義しています。後者のMAINパートではセットアップで明示した値を使用して、シミュレーションの実行及び実行結果の図示についてのコードを記述しています。このような大枠のもと、それぞれのパートにおけるコードの内容について以下で言及します。

使用するライブラリについて

# 使用するライブラリ
> import pandas as pd
> import numpy as np
> import matplotlib.pyplot as plt
> import random


【DEFINISION】ベルヌーイ分布に従うアームの設定

> class BernoulliArm():
>     def __init__(self, p):
>         self.p = p
> 
>     def draw(self):
>         if self.p > random.random() :
>             return 1.0
>         else:
>             return 0.0

 BernoulliArmクラスでは、コンストラクタ内のインスタンス変数pを用いて、アームを引いて報酬を与える動作を定義しています。draw()メソッドでは確率pのとき1の値を返し、確率1-pのときに0の値を返すメソッドとなっています。実際に簡単なコードで挙動を確認すると以下のようになります。

> theta = [0.1,0.5,0.9] #アームから報酬が得られる成功確率(10%, 50%, 90%の3種類) 
> test = []
> for x in theta:
>     test.append(BernoulliArm(x).draw())
> print(test)
[0.0, 0.0, 1.0]


【DEFINISION】ε-greedyアルゴリズムクラスの定義及びそのメソッドについて

EpsilonGreedyクラスではコンストラクタを含めて5つのメソッドが定義されています。ここでは1つ1つを詳細にみていきます。

1. コンストラクタメソッド
> class EpsilonGreedy():
>     def __init__(self, epsilon, counts, values):
>         self.epsilon = epsilon # 探索する確率
>         self.counts = counts # アームを引く回数
>         self.values = values # 引いたアームから得られる報酬の平均値
>         return
2.初期化メソッド
>  def initialize(self, n_arms): # countsとvaluesを初期化
>         self.counts = np.zeros(n_arms)
>         self.values = np.zeros(n_arms)
>         return

ここではNumpyの配列を初期化するメソッドnp.zeros()を使用します。このメソッドは成分がすべてゼロの配列を生成します。これにより、成分がゼロの配列を生成しておき、特定の成分の値を後から更新することが可能となります。

3.アームを選択するselect_arm()メソッド
> def select_arm(self):
>         if self.epsilon > random.random(): # 確率εで探索を行う
>             return np.random.randint(0, len(self.values))
>         else: # 確率1-εで活用を行う
>             return np.argmax(self.values)

 ここでは確率εで探索を行い、1-εで活用を行うという動作を定義しています。このメソッドでは確率εで行動を起こすことを「if self.epsilon > random.random(): 」というif文で表現しており、return文で探索の動作を定義しています。
 探索とはランダムにアームを選ぶことなので、区間内のランダムな整数を返すnumpy.random.randint()を使用しますが、ここでrandom.randint()ではなくnumpy.random.randint()を使用しているのは区間の上限の値を含まない半開区間からランダムに整数を生成したいからです。つまり、探索を行ったときにランダムにゼロから始まるアームのインデックスの値のいづれかを返したいため、区間[0, アームの総数)から整数値を返すnumpy.random.randint()を使用することはまさにその目的に合致しているということになります。
 次に活用とは、これまでの報酬の平均値が最大のアームを選択することでしたので、 np.argmax()を用いてvaluesの要素のうち最も値が大きな要素のインデックスを返します。以上より、select_arm()メソッドは探索や活用した結果の値ではなくインデックスを返すメソッドとなります。

4.update()メソッド
>     def update(self, chosen_arm, reward):
>         self.counts[chosen_arm] = self.counts[chosen_arm] + 1 #アームを選んだ回数を更新
>         n = self.counts[chosen_arm]
>  
>         value = self.values[chosen_arm] 
>         new_value = ((n-1) / float(n)) * value + (1 / float(n)) * reward 
>         self.values[chosen_arm] = new_value
>         return

 選択したアームの情報を反映させるためにupdate()メソッドを定義します。update()メソッド内で定義したローカル変数nとvalueを使用してアームの平均報酬額を更新します。nは今回のアームの選択した回数を反映した値であり、valueは選択したアームの更新前の平均報酬額となります。更新後の平均報酬額new_valueの計算はvalueとrewardにそれぞれウエイト(n-1) / float(n)と1 / float(n)を乗じた加重平均として表現されます。こうして更新されたnew_valueをself.values[chosen_arm]に戻すことで、アームの平均報酬額を更新したことになります。


【DEFINISION】シミュレーションのテストを実行するtest_algorithm()メソッドについて

> def test_algorithm(algo, arms, num_sims, horizon):
>     # 変数の初期化
>     chosen_arms = np.zeros(num_sims * horizon)
>     rewards = np.zeros(num_sims * horizon)
>     cumulative_rewards = np.zeros(num_sims * horizon)
>     sim_nums = np.zeros(num_sims * horizon)
>     times = np.zeros(num_sims * horizon)
>
>     # 2重forループ
>     for sim in range(num_sims):
>         sim = sim + 1 # シミュレーション回数をカウント
>         algo.initialize(len(arms)) # アルゴリズム設定を初期化
>  
>         for t in range(horizon):
>             t = t + 1 # ラウンドの回数をカウント
>             index = (sim - 1) * horizon + t - 1 # 現時点の回数(シミュレーション回数×ラウンド回数)をindexに代入
>             sim_nums[index] = sim # simをsim_numsのindex番目の要素へ代入
>             times[index] = t  # tをtimesのindex番目の要素へ代入
>  
>             chosen_arm = algo.select_arm() # select_arm()メソッドにより選択したアームをchosen_armへ代入
>             chosen_arms[index] = chosen_arm # chosen_armをchosen_armsのindex番目の要素に代入
>  
>             reward = arms[chosen_arm].draw()
>             rewards[index] = reward # rewardをrewardsのindex番目の要素へ代入
>  
>             if t == 1: # はじめてのラウンドならば
>                 cumulative_rewards[index] = reward # rewardをcumulative_rewardsのindex番目の要素へ代入
>             else:
>                 cumulative_rewards[index] = cumulative_rewards[index - 1] + reward 
>  
>             algo.update(chosen_arm, reward)
>  
>     return [sim_nums, times, chosen_arms, rewards, cumulative_rewards]

 test_algorithm()メソッドは変数の初期化パートと2重forループのパートに分けることができます。変数の初期化パートではシミュレーション回数5000とラウンド数250を掛けた1,250,000のゼロ要素を持つ配列を用意します。そして、用意した配列に値を入れるために2重forループ文を実行していきます。
 外側のループから見ていきます。外側のforループはシミュレーションの回数分だけ実行されます。最初にシミュレーションの回数を記録する変数simをインクリメントします。ここでのrange関数はゼロの値から始まるため、シミュレーション回数がゼロ回なるのを防ぐため、初めにインクリメントしています。次にinitialize()メソッドを使用し、これまでに引いたアームに関する情報(counts, values)を消去します。これはアルゴリズムの初期化を行うことを意味します。
 次に内側のループについて見ていきます。内側のforループはラウンドの回数分ループを実行します。まず、ラウンドの回数を記録するために変数tをインクリメントします。次に変数indexはシミュレーション回数×ラウンド回数を総数としたときの現在のループ時点の回数を代入した変数です。このindexは初期化した各変数に各ラウンドごとの値を格納するために利用することを想定しており、simとtから1を引くことにより変数indexを配列の要素に対応させています。こうして作成したindexを初期化した変数であるsim_numsとtimesのindex番目の要素として用いてインクリメントしたsimとtを代入します。
 次にchosen_armには、select_arm()メソッドにより選択したアームを代入し、その値をchosen_armsのindex番目の要素に代入します。これにより各ラウンドにおいて選択したアームを chosen_armsに入れることが可能となります。アームを選択した後は、報酬についてです。rewardには初期化した変数armsのchosen_arm番目の要素についてdraw()メソッドにより探索(活用)した結果を代入します。そして、初期化した変数rewardsのindex番目の要素にrewardの値を代入することにより、各ラウンドごとの報酬を変数rewardsに格納していくことが可能となります。次にif文により、初回のラウンドとそれ以外のラウンドについて、初期化した変数cumulative_rewardsに値を格納します。そして、最後にupdate()メソッドにより選択したアームの情報を反映させるまでが内側のループの処理内容となります。

【MAIN】シミュレーションの実行と実行結果の表示

> theta = np.array([0.1, 0.1, 0.1, 0.1, 0.9]) # アームから報酬が得られる成功確率
> n_arms = len(theta) 
> random.shuffle(theta)
> 
> arms = map(lambda x: BernoulliArm(x), theta)  
> arms = list(arms) # armsをリスト化する
> 
> for epsilon in [0.1, 0.2, 0.3, 0.4, 0.5]:
>     algo = EpsilonGreedy(epsilon, [], [])
>     algo.initialize(n_arms)
>     results = test_algorithm(algo, arms, 5000, 250) # シミュレーションの実行結果をresultsへ格納
>     
>  # プロット 
>     df = pd.DataFrame({"times": results[1], "rewards": results[3]})
>     grouped = df["rewards"].groupby(df["times"]) 
> 
>     plt.plot(grouped.mean(), label="epsilon="+str(epsilon)) # 各ラウンドごとの報酬の平均値をεの値ごとにプロットする
> 
> plt.legend(loc="best")
> plt.show()

 まず、変数thetaはアームから報酬が得られる確率の配列を格納しています。今回の設定では5本のアームのうち4本の確率が10%であり、残りの1本の確率が90%に設定しています。次に変数armsは、thetaのすべての要素を BernoulliArm()に入れる処理を行います。ここではmap関数とラムダ式を組み合わせることですっきりしたコードになっていますが、ここであえてmap関数やラムダ式を使用せずに記述すると以下のようになります。

> theta = [0.1, 0.1, 0.1, 0.1, 0.9]
> test = []
> for i in theta:
>     test.append(BernoulliArm(i))
> print(test)
[<__main__.BernoulliArm object at 0x0000023A97D65A90>, <__main__.BernoulliArm object at 0x0000023A97D65748>, <__main__.BernoulliArm object at 0x0000023A97D69908>, <__main__.BernoulliArm object at 0x0000023A97D69D30>, <__main__.BernoulliArm object at 0x0000023A97D69898>]

上記のコードではtestをリスト化していません。これはmap関数を使用したときの戻り値がmap型オブジェクトになってしまうためであり、上記のコードではきちんとclass型のオブジェクトのリストとなっています。ちなみに、map関数を適用した後では以下のように表示されます。

> arms = map(lambda x: Bernoulli(x), means)
> print(arms)
<map object at 0x0000023A97D69AC8>

この時点ではarmsにはクラス型のオブジェクトが格納されていますが、この後でdraw()メソッドを使用して[0.0, 1.0]の値を返すようになります。簡単な例としては以下のようなイメージになります。

> arms = map(lambda x: Bernoulli(x), means)
> arms = list(arms)
> reward = arms[4].draw()
> print(reward)
1.0

 このようにしてarmsなどforループ文の前に変数を用意したあとで、εの値ごとにシミュレーションを実行し、その結果をローカル変数resultsに格納します。そして、resultsを利用してシミュレーション結果をプロットします。

シミュレーション結果の解釈

 横軸にラウンド数、縦軸には各ラウンドごとの報酬の平均値を示し、イプシロンの値ごとにプロットしたのが以下の図となります。
f:id:joure:20171119233241p:plain
 
 ラウンド数が50以下では、すべてのepsilon(探索する確率)において勾配が急ですが、50を過ぎてから勾配が緩やかになってきます。100以降ではepsilonが0.1の場合を除いて得られる平均報酬額が一定の水準に収束していますが、epsilonが0.1の場合ではラウンド数が100以降も緩やかに得られる平均報酬額が上昇しており、150以降のラウンドでは平均報酬額がトップになっております。このことは、探索する確率を低く設定すると最初は得られる平均報酬額は低いが、ラウンド数を増やすことで、最も高い平均報酬額が得られることを意味しています。


おわりに

 今回は多腕バンディット問題に利用するアルゴリズムの1つであるε-greedyアルゴリズムについて確認してみました。