
I've spent the last few days building an elasticsearch cluster for log analysis, which finally gave me a chance to try skyline for real. The inconvenient part is that skyline is a fairly complete system rather than a plugin: it expects you to ship your data over msgpack and store it in redis. That never made much sense to me; back when I first saw the project last year, I already wrote on this blog that the right goal would be to replace redis with elasticsearch. After waiting all this time with nobody doing it, I went ahead and implemented it myself. With this change, the rest of skyline's programs can be thrown away entirely; keeping just this one script running on a schedule is enough:

import pandas
import numpy as np
import scipy.stats  # a bare "import scipy" does not reliably expose scipy.stats
import statsmodels.api as sm
import traceback
import logging
from time import time
from elasticsearch import Elasticsearch


MAX_RESOLUTION = 1000
MAX_TOLERABLE_BOREDOM = 100  # how many trailing datapoints to inspect for "boring" series
MIN_TOLERABLE_LENGTH = 1     # minimum number of datapoints required
BOREDOM_SET_SIZE = 1         # a series is boring if its trailing values collapse to this many distinct values
FULL_DURATION = 86400        # seconds of history the algorithms assume
CONSENSUS = 6                # how many algorithms must agree before flagging an anomaly
STALE_PERIOD = 500           # seconds after which a series is considered stale
ALGORITHMS = [
    'first_hour_average',
    'mean_subtraction_cumulation',
    'stddev_from_average',
    'stddev_from_moving_average',
    'least_squares',
    'grubbs',
    'histogram_bins',
    'median_absolute_deviation',
    'ks_test',
]

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("AnalyzerLog")

"""
This is no man's land. Do anything you want in here,
as long as you return a boolean that determines whether the input
timeseries is anomalous or not.
To add an algorithm, define it here, and add its name to settings.ALGORITHMS.
"""


def tail_avg(timeseries):
    """
    This is a utility function used to calculate the average of the last three
    datapoints in the series as a measure, instead of just the last datapoint.
    It reduces noise, but it also reduces sensitivity and increases the delay
    to detection.
    """
    try:
        t = (timeseries[-1][1] + timeseries[-2][1] + timeseries[-3][1]) / 3
        return t
    except IndexError:
        return timeseries[-1][1]


def median_absolute_deviation(timeseries):
    """
    A timeseries is anomalous if the deviation of its latest datapoint with
    respect to the median is X times larger than the median of deviations.
    """

    series = pandas.Series([x[1] for x in timeseries])
    median = series.median()
    demedianed = np.abs(series - median)
    median_deviation = demedianed.median()

    # The test statistic is infinite when the median is zero,
    # so it becomes super sensitive. We play it safe and skip when this happens.
    if median_deviation == 0:
        return False

    test_statistic = demedianed.iloc[-1] / median_deviation

    # Completely arbitrary... triggers if the latest deviation is
    # 6 times bigger than the median deviation
    return test_statistic > 6


def grubbs(timeseries):
    """
    A timeseries is anomalous if the Z score is greater than the Grubbs score.
    """

    series = np.array([x[1] for x in timeseries])
    stdDev = np.std(series)
    mean = np.mean(series)
    tail_average = tail_avg(timeseries)
    z_score = (tail_average - mean) / stdDev
    len_series = len(series)
    threshold = scipy.stats.t.isf(.05 / (2 * len_series), len_series - 2)
    threshold_squared = threshold * threshold
    grubbs_score = ((len_series - 1) / np.sqrt(len_series)) * np.sqrt(threshold_squared / (len_series - 2 + threshold_squared))

    return z_score > grubbs_score


def first_hour_average(timeseries):
    """
    Calculate the simple average over one hour, FULL_DURATION seconds ago.
    A timeseries is anomalous if the average of the last three datapoints
    is outside of three standard deviations of this value.
    """
    last_hour_threshold = time() - (FULL_DURATION - 3600)
    series = pandas.Series([x[1] for x in timeseries if x[0] < last_hour_threshold])
    mean = series.mean()
    stdDev = series.std()
    t = tail_avg(timeseries)

    return abs(t - mean) > 3 * stdDev


def stddev_from_average(timeseries):
    """
    A timeseries is anomalous if the absolute value of the average of the latest
    three datapoints minus the mean is greater than three standard deviations
    of the series. Unlike the moving-average variant below, this uses the
    overall mean and so is better for detecting anomalies with respect to the
    entire series.
    """
    series = pandas.Series([x[1] for x in timeseries])
    mean = series.mean()
    stdDev = series.std()
    t = tail_avg(timeseries)

    return abs(t - mean) > 3 * stdDev


def stddev_from_moving_average(timeseries):
    """
    A timeseries is anomalous if the absolute value of the latest datapoint
    minus the exponentially weighted moving average is greater than three
    exponentially weighted moving standard deviations. This is better for
    finding anomalies with respect to the short term trends.
    """
    series = pandas.Series([x[1] for x in timeseries])
    # exponentially weighted moving statistics via ewm()
    expAverage = series.ewm(com=50).mean()
    stdDev = series.ewm(com=50).std()

    return abs(series.iloc[-1] - expAverage.iloc[-1]) > 3 * stdDev.iloc[-1]


def mean_subtraction_cumulation(timeseries):
    """
    A timeseries is anomalous if the value of the next datapoint in the
    series is farther than three standard deviations out in cumulative terms
    after subtracting the mean from each data point.
    """

    series = pandas.Series([x[1] if x[1] else 0 for x in timeseries])
    series = series - series[0:len(series) - 1].mean()
    stdDev = series[0:len(series) - 1].std()

    return abs(series.iloc[-1]) > 3 * stdDev


def least_squares(timeseries):
    """
    A timeseries is anomalous if the average of the last three datapoints
    on a projected least squares model is greater than three sigma.
    """

    x = np.array([t[0] for t in timeseries])
    y = np.array([t[1] for t in timeseries])
    A = np.vstack([x, np.ones(len(x))]).T
    m, c = np.linalg.lstsq(A, y, rcond=None)[0]
    errors = []
    for i, value in enumerate(y):
        projected = m * x[i] + c
        error = value - projected
        errors.append(error)

    if len(errors) < 3:
        return False

    std_dev = np.std(errors)
    t = (errors[-1] + errors[-2] + errors[-3]) / 3

    return abs(t) > std_dev * 3 and round(std_dev) != 0 and round(t) != 0


def histogram_bins(timeseries):
    """
    A timeseries is anomalous if the average of the last three datapoints falls
    into a histogram bin with less than 20 other datapoints (you'll need to
    tweak that number depending on your data).
    Returns True when the bin containing the tail_avg is sparsely populated.
    """

    series = np.array([x[1] for x in timeseries])
    t = tail_avg(timeseries)
    h = np.histogram(series, bins=15)
    bins = h[1]
    for index, bin_size in enumerate(h[0]):
        if bin_size <= 20:
            # Is it in the first bin?
            if index == 0:
                if t <= bins[0]:
                    return True
            # Is it in the current bin?
            elif t >= bins[index] and t < bins[index + 1]:
                return True

    return False


def ks_test(timeseries):
    """
    A timeseries is anomalous if the two-sample Kolmogorov-Smirnov test
    indicates that the distribution of the last 10 minutes differs from the
    distribution of the last hour. It produces false positives on
    non-stationary series, so an Augmented Dickey-Fuller test is applied to
    check for stationarity.
    """

    hour_ago = time() - 3600
    ten_minutes_ago = time() - 600
    reference = np.array([x[1] for x in timeseries if x[0] >= hour_ago and x[0] < ten_minutes_ago])
    probe = np.array([x[1] for x in timeseries if x[0] >= ten_minutes_ago])

    if reference.size < 20 or probe.size < 20:
        return False

    ks_d, ks_p_value = scipy.stats.ks_2samp(reference, probe)

    if ks_p_value < 0.05 and ks_d > 0.5:
        adf = sm.tsa.stattools.adfuller(reference, 10)
        if adf[1] < 0.05:
            return True

    return False


def run_selected_algorithm(timeseries):
    """
    Filter timeseries and run selected algorithms.
    """
    # Get rid of short series
    if len(timeseries) < MIN_TOLERABLE_LENGTH:
        return False, [], timeseries[-1][1] if timeseries else None

    # Get rid of stale series
    if time() - timeseries[-1][0] > STALE_PERIOD:
        return False, [], timeseries[-1][1]

    # Get rid of boring series (the recent datapoints are all identical)
    if len(set(item[1] for item in timeseries[-MAX_TOLERABLE_BOREDOM:])) == BOREDOM_SET_SIZE:
        return False, [], timeseries[-1][1]

    try:
        ensemble = [globals()[algorithm](timeseries) for algorithm in ALGORITHMS]
        # Anomalous only when at least CONSENSUS algorithms agree
        if ensemble.count(True) >= CONSENSUS:
            return True, ensemble, timeseries[-1][1]
        return False, ensemble, timeseries[-1][1]
    except Exception:
        logger.error("Algorithm error: " + traceback.format_exc())
        return False, [], 1


def get_es_data(index, field):
    """
    Get values of the field from the elasticsearch index.
    """
    es = Elasticsearch(['10.4.16.68', '10.4.27.162'])
    res = es.search(index=index, body={
        "query": {
            "filtered": {
                "query": {"match_all": {}},
                "filter": {
                    "range": {"@timestamp": {"from": "now-1h", "to": "now"}}
                }
            }
        },
        "size": 0,
        "facets": {
            field: {"date_histogram": {"key_field": "@timestamp", "value_field": field, "interval": "5m"}}
        }
    })
    print(res['hits']['total'])
    timeseries = []
    for entry in res['facets'][field]['entries']:
        # facet timestamps are epoch milliseconds; convert to seconds so they
        # are comparable with time() in the algorithms above
        timeseries.append([entry['time'] / 1000, entry['total']])
    return timeseries

if __name__ == "__main__":

timeseries = get_es_data('logstash-ngxaccounting-2014.05.30', 'code.504')

anomalous, ensemble, datapoint = run_selected_algorithm(timeseries)
if anomalous:
print("Your anomalous datapoint (" + str(datapoint) + ") don't pass following test:")
for index, value in enumerate(ensemble):
if value:
print("\t" + ALGORITHMS[index])
else:
print("Your latest datapoint is OK")
print(ensemble, datapoint)

The main piece here is the elasticsearch date_histogram facet request, which fetches the last hour of data as a time series of 5-minute totals and hands it to the detection algorithms above.
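One caveat: facets were removed in Elasticsearch 2.0. On a newer cluster, the same time series can be built with a date_histogram aggregation plus a sum sub-aggregation. Below is a minimal sketch, assuming ES 7+ syntax (fixed_interval) and the same es client, index, and field variables as in get_es_data above:

# Sketch: the same query with aggregations instead of facets (ES 7+).
# The sum sub-aggregation plays the role of the facet's value_field total.
res = es.search(index=index, body={
    "size": 0,
    "query": {
        "bool": {
            "filter": {
                "range": {"@timestamp": {"gte": "now-1h", "lte": "now"}}
            }
        }
    },
    "aggs": {
        field: {
            "date_histogram": {"field": "@timestamp", "fixed_interval": "5m"},
            "aggs": {"total": {"sum": {"field": field}}}
        }
    }
})
# bucket keys are epoch milliseconds, just like the old facet entries
timeseries = [[b["key"] / 1000, b["total"]["value"]]
              for b in res["aggregations"][field]["buckets"]]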

To put this to wider use, just pull the hard-coded code.504 out into a configurable option, as in the sketch below.
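For example, a minimal sketch of the main block using argparse (the defaults are just the values from above; everything else stays unchanged):

# Sketch: make index and field command-line options instead of hard-coding them
import argparse

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Run skyline algorithms on one ES field")
    parser.add_argument("--index", default="logstash-ngxaccounting-2014.05.30")
    parser.add_argument("--field", default="code.504")
    args = parser.parse_args()

    timeseries = get_es_data(args.index, args.field)
    anomalous, ensemble, datapoint = run_selected_algorithm(timeseries)
    print(anomalous, ensemble, datapoint)

Cron can then drive it per field, e.g. */5 * * * * python skyline_es.py --field code.504 (the script name here is hypothetical).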

This article is quoted from 三斗室.
