0%

统计学习方法|隐马尔可夫模型原理详解与实现

隐马尔可夫模型(HMM),是一个可用来解决标注问题的生成模型,在自然语言处理、语音识别等领域有着广泛的应用。本篇博客将详细地介绍HMM模型的三个问题与解决算法,包括:前向-后向算法、Baum-Welch算法以及大名鼎鼎的Viterbi算法。最后,并采用python以及hmmlearn库这两种方式,来对HMM模型进行实现。

隐马尔可夫模型介绍

隐马尔可夫模型(HMM),属于概率图模型。它的原理,总结起来就是三句话:一个模型,两个假设,三个问题(算法)。下面就讲一介绍。

一个模型

HMM模型描述的是一个隐藏的马尔可夫链生成不可观测的状态序列,再由状态序列生成观测序列的过程。在这里,我们需要定义一些符号。我们将状态序列记为:$I=\{i_1,i_2,…,i_T\}$,其中$T$表示序列的长度,所有的状态变量的取值集合记做:$Q=\{q_1,q_2,…,q_N\}$,其中$N$表示状态取值的总数。我们将观测序列记为:$O=\{o_1,o_2,…,o_T\}$,其中$T$表示序列的长度,所有的观测变量的取值集合记做:$V=\{v_1,v_2,…,v_M\}$。记HMM模型的参数为:$\lambda=(\pi,A,B)$。其中,$\pi$表示初始概率分布,$A$表示状态转移矩阵;$B$表示观测概率分布。具体它的概率图如下:

两个假设

HMM模型中有两个假设:齐次马尔可夫假设观测独立性假设

齐次马尔可夫假设。在给定$i_t$的情况下,$i_{t+1}$与$i_{t-1}$以及之前的状态变量无关。即:

观测独立性假设。在给定$i_t$的情况下,$o_t$与之前的状态与观测变量都无关。即:

三个问题(算法)

HMM模型主要要解决三个问题:Evaluation、Learning、Decoding

Evaluation。即:在给定模型参数$\lambda$的情况,生成观测序列$O$的概率$P(O|\lambda)$。

Learning。即:求$\lambda$,即:$\lambda=\mathop{argmax}\limits_{\lambda}P(O|\lambda)$。

Decoding。即:给定模型参数与观测序列的情况下,求概率最大的状态序列。即:$I=\mathop {argmax} \limits_{I}P(I|O)$。

关于Evaluation问题,我们使用前向算法或者后向算法进行求解;关于Learning问题,我们使用Baum-Welch算法来求解;关于Decoding问题,我们使用Viterbi算法来求解。下面将一一介绍这些算法。

Evaluation问题的求解

关于Evaluation问题的求解,有两种算法:前向算法与后向算法。其实,最后结果是一样,只是求解过程不太一样。在这里,我将直接给出推导过程。

首先需要化简$P(O|\lambda)$。如下:

所以我们可以看到,公式:$P(O|\lambda)=\mathop{\sum}\limits_{I}\prod_{t=1}^{T}b_{i_t}(o_t) \pi (a_{i_1})\prod_{t=2}^{T}a_{i_{t-1},i_{t-2}}$。其计算复杂度为:$O(TN^T)$。当T很大的时候,计算复杂度是以指数形式增长的,所以近乎是不可解的。所以,我们必须找新的算法来解决这个问题。

前向算法

使用前向算法,得出递推公式,如下:

所以,我们可以得到递推公式:$\alpha_{t+1}(i)=\mathop{\sum}\limits_{j=1}^{N}\alpha_t(j)a_{ji}b_i(o_{t+1})$。又$\alpha_1(i)=\pi_ib_i(o_1)$,$P(O|\lambda)=\mathop{\sum}\limits_{i=1}^{N}\alpha_T(i)$。那么我们就可以求出$p(O|\lambda)$的值了。

后向算法

除了使用前向算法,后向算法也是求解$P(O|\lambda)$的一种方法。递推公式的推导如下:

所以,我们得到了递推公式:$\beta_t(i)=\mathop{\sum}\limits_{j=1}^{N}b_j(o_{t+1})\beta_{t+1}(j)$。又$\beta_T(i)=1$,$P(O|\lambda)=\mathop{\sum}\limits_{i=1}^{N}\pi_ib_i(o_1)\beta_1(i)$。那么我们就可以求出$P(O|\lambda)$的值了。

Learning问题的求解

所谓的Learning问题,其实就是求$\lambda=\mathop{argmax}\limits_{\lambda}P(O|\lambda)$。那么,直接求的话,我们会发现这个式子是没有解析解的。所以,在HMM模型中,采用Baum-Welch算法,也就是EM算法来解决这个问题。

Baum-Welch算法(EM算法)

Baum-Welch算法推导如下:

Decoding问题的求解

所谓的decoding问题,就是:给定模型参数$\lambda$与观测序列$O$,求最大概率的状态变量$I$。即:$I=\mathop{argmax}\limits_{I}P(I|O)$。在HMM中,我们使用Viterbi算法来求解,实际上是一种动态规划的思想,非常简单。

Viterbi算法

我们定义$\delta_t(i)$为第t时刻状态为$q_i$的所有单个路径中的概率最大值。公式如下:

那么,很容易就能得到递推公式:

其中,$i=1,2,…,N$。根据这个递推公式,我们很快就能得到所有路径中概率的最大值。那么怎么求得概率最大的路径呢?

我们定义$\psi_t(i)$表示t时刻状态为你$q_i$的所有单个路径中概率最大的路径的第$t-1$个节点。公式如下:

整理如下:

至此,HMM模型,理论部分就讲完了🎉~

HMM模型的实现

把模型实现一遍才算是真正的吃透了这个模型呀。在这里,我采取了python以及hmmlearn库这两种方式来实现HMM模型,我的github里面可以下在到所有的代码,欢迎访问我的github,也欢迎大家star和fork。附上GitHub地址: 《统计学习方法》及常规机器学习模型实现。具体代码如下

使用hmmlearn库实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# coding:utf-8
# Author:codewithzichao
# E-mail:lzichao@pku.edu.cn

'''
hmmlearn中有两个模型:高斯HMM与多项式HMM,分别对应于:变量是连续的与离散的。

'''
import numpy as np
from hmmlearn import hmm

# 3 个盒子状态
states = ['box1', 'box2', 'box3']
# 2 个球观察状态
observations = ['red', 'white']
# 初始化概率
start_probability = np.array([0.2, 0.4, 0.4])
# 转移概率
transition_probability = np.array([
[0.5, 0.2, 0.3],
[0.3, 0.5, 0.2],
[0.2, 0.3, 0.5]
])

# 发射状态概率
emission_probability = np.array([
[0.5, 0.5],
[0.4, 0.6],
[0.7, 0.3]
])

if __name__ == "__main__":
# 建立模型,设置参数
model = hmm.MultinomialHMM(n_components=len(states))
model.startprob_ = start_probability
model.transmat_ = transition_probability
model.emissionprob_ = emission_probability

# 预测问题,执行Viterbi算法
seen = np.array([0, 1, 0])
logprob, box = model.decode(seen.reshape(-1, 1), algorithm='viterbi')
print('The ball picked:', ','.join(map(lambda x: observations[x], seen)))
print('The hidden box:', ','.join(map(lambda x: states[x], box)))

box_pre = model.predict(seen.reshape(-1, 1))
print('The ball picked:', ','.join(map(lambda x: observations[x], seen)))
print('The hidden box predict:', ','.join(map(lambda x: states[x], box_pre)))
# 观测序列的概率计算问题
# score函数返回的是以自然对数为底的对数概率值
# ln0.13022≈−2.0385
print(model.score(seen.reshape(-1, 1)))

print("-------------------------------")

# 学习问题
states = ["box1", "box2", "box3"]
n_states = len(states)

observations = ["red", "white"]
n_observations = len(observations)

model = hmm.MultinomialHMM(n_components=n_states)
# 三个观测序列,用来训练
X = np.array([[0, 1, 0, 1], [0, 0, 0, 1], [1, 0, 1, 1]])
model.fit(X) # 训练模型
print(model.startprob_) # 得到初始概率矩阵
print(model.transmat_) # 得到状态转移矩阵
print(model.emissionprob_) # 得到观测概率分布
# 概率计算
print(model.score(X))


Python实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# coding=utf8
# Author:codeiwithzichao
# E-mail:lizichao@pku.edu.cn

import numpy as np


class HMM(object):
def __init__(self, N, M):
self.A = np.zeros((N, N)) # 状态转移概率矩阵
self.B = np.zeros((N, M)) # 观测概率矩阵
self.Pi = np.array([1.0 / N] * N) # 初始状态概率矩阵

self.N = N # 可能的状态数
self.M = M # 可能的观测数

def cal_probality(self, O):
self.T = len(O)
self.O = O

self.forward()
return sum(self.alpha[self.T - 1])

def forward(self):
"""
前向算法
"""
self.alpha = np.zeros((self.T, self.N))

# 公式 10.15
for i in range(self.N):
self.alpha[0][i] = self.Pi[i] * self.B[i][self.O[0]]

# 公式10.16
for t in range(1, self.T):
for i in range(self.N):
sum = 0
for j in range(self.N):
sum += self.alpha[t - 1][j] * self.A[j][i]
self.alpha[t][i] = sum * self.B[i][self.O[t]]

def backward(self):
"""
后向算法
"""
self.beta = np.zeros((self.T, self.N))

# 公式10.19
for i in range(self.N):
self.beta[self.T - 1][i] = 1

# 公式10.20
for t in range(self.T - 2, -1, -1):
for i in range(self.N):
for j in range(self.N):
self.beta[t][i] += self.A[i][j] * self.B[j][self.O[t + 1]] * self.beta[t + 1][j]

def cal_gamma(self, i, t):
"""
公式 10.24
"""
numerator = self.alpha[t][i] * self.beta[t][i]
denominator = 0

for j in range(self.N):
denominator += self.alpha[t][j] * self.beta[t][j]

return numerator / denominator

def cal_ksi(self, i, j, t):
"""
公式 10.26
"""

numerator = self.alpha[t][i] * self.A[i][j] * self.B[j][self.O[t + 1]] * self.beta[t + 1][j]
denominator = 0

for i in range(self.N):
for j in range(self.N):
denominator += self.alpha[t][i] * self.A[i][j] * self.B[j][self.O[t + 1]] * self.beta[t + 1][j]

return numerator / denominator

def init(self):
"""
随机生成 A,B,Pi
并保证每行相加等于 1
"""
import random
for i in range(self.N):
randomlist = [random.randint(0, 100) for t in range(self.N)]
Sum = sum(randomlist)
for j in range(self.N):
self.A[i][j] = randomlist[j] / Sum

for i in range(self.N):
randomlist = [random.randint(0, 100) for t in range(self.M)]
Sum = sum(randomlist)
for j in range(self.M):
self.B[i][j] = randomlist[j] / Sum

def train(self, O, MaxSteps=100):
self.T = len(O)
self.O = O

# 初始化
self.init()

step = 0
# 递推
while step < MaxSteps:
step += 1
print(step)
tmp_A = np.zeros((self.N, self.N))
tmp_B = np.zeros((self.N, self.M))
tmp_pi = np.array([0.0] * self.N)

self.forward()
self.backward()

# a_{ij}
for i in range(self.N):
for j in range(self.N):
numerator = 0.0
denominator = 0.0
for t in range(self.T - 1):
numerator += self.cal_ksi(i, j, t)
denominator += self.cal_gamma(i, t)
tmp_A[i][j] = numerator / denominator

# b_{jk}
for j in range(self.N):
for k in range(self.M):
numerator = 0.0
denominator = 0.0
for t in range(self.T):
if k == self.O[t]:
numerator += self.cal_gamma(j, t)
denominator += self.cal_gamma(j, t)
tmp_B[j][k] = numerator / denominator

# pi_i
for i in range(self.N):
tmp_pi[i] = self.cal_gamma(i, 0)

self.A = tmp_A
self.B = tmp_B
self.Pi = tmp_pi

def generate(self, length):
import random
I = []

# start
ran = random.randint(0, 1000) / 1000.0
i = 0
while self.Pi[i] < ran or self.Pi[i] < 0.0001:
ran -= self.Pi[i]
i += 1
I.append(i)

# 生成状态序列
for i in range(1, length):
last = I[-1]
ran = random.randint(0, 1000) / 1000.0
i = 0
while self.A[last][i] < ran or self.A[last][i] < 0.0001:
ran -= self.A[last][i]
i += 1
I.append(i)

# 生成观测序列
Y = []
for i in range(length):
k = 0
ran = random.randint(0, 1000) / 1000.0
while self.B[I[i]][k] < ran or self.B[I[i]][k] < 0.0001:
ran -= self.B[I[i]][k]
k += 1
Y.append(k)

return Y


def triangle(length):
'''
三角波
'''
X = [i for i in range(length)]
Y = []

for x in X:
x = x % 6
if x <= 3:
Y.append(x)
else:
Y.append(6 - x)
return X, Y


def sin(length):
'''
三角波
'''
import math
X = [i for i in range(length)]
Y = []

for x in X:
x = x % 20
Y.append(int(math.sin((x * math.pi) / 10) * 50) + 50)
return X, Y


def show_data(x, y):
import matplotlib.pyplot as plt
plt.plot(x, y, 'g')
plt.show()

return y


if __name__ == '__main__':

hmm = HMM(15,101)
sin_x, sin_y = sin(40)
show_data(sin_x, sin_y)
hmm.train(sin_y)
y = hmm.generate(100)
x = [i for i in range(100)]
show_data(x,y)

Would you like to buy me a cup of coffee☕️~