해당 제목에 대한 내용은 기존에 올린 “Julia에서 시계열 데이터 처리 예제”들을 참조하면 됩니다.
여기서는 다중 프로세스로 Genetic Programming(GP)을 병렬처리 하는 방법을 보여 줍니다.
다중 프로세스가 train, test할 데이터는 SharedArray로 공유 합니다.
아래 모듈은 “/home/shpark/julia_test/exproptimizatoin/examples” 폴더에 있다고 가정합니다.
WS.jl : 연습용 데이터를 만드는 모듈 / 이 게시글에서는 WindowSlider 구조체만 사용함
GP2.jl: Genetic Programming 적용 모듈
아래에 소스를 게재합니다.
그리고 GP2 활용은 jupyter notebook으로 되어 있습니다
구현소스(Julia Notebook – html 버전)
이 예제에서는 GP를 통해 f(x1,x2) = 0.9 x1^2 + 0.7 x1 x2 + 0.3 x2 의 근사식을 찾고, 학습되지 않은 데이터에 대한
prediction을 합니다.
GP2.jl 소스
module GP2
using Distributed
using Random
using ExprOptimization
using SharedArrays
"""
    DGrammar()

Line buffer for the source text of an `@grammar begin ... end` block.

The `grammar` field holds one source line per element. A fresh instance contains only
the opening line `"@grammar begin"` and the closing line `"end"`; production rules are
spliced in between by calling the object with a rule string.
"""
mutable struct DGrammar
    grammar::Vector{String}
    DGrammar() = new(String["@grammar begin", "end"])
end

# Register one production rule, given as source text.
#
# Example:
#     grammar = DGrammar()
#     grammar("R = R + R")
#     grammar("R = R - R")
#
# The rule is placed directly after the opening "@grammar begin" line, so the most
# recently added rule ends up first. Returns the updated line buffer.
function (g::DGrammar)(r::String)
    return insert!(g.grammar, 2, r)
end

# Build the grammar object: join the buffered lines with newlines, parse the text and
# evaluate the resulting expression in the enclosing module.
#
# Example:
#     grammar()
function (g::DGrammar)()
    source = join(g.grammar, "\n")
    return eval(Meta.parse(source))
end
"""
    Vars()

Ordered, initially empty collection of variable names, stored as strings in `v`.
"""
mutable struct Vars
    v::Vector{String}
    Vars() = new(String[])
end

# Append the variable name `s`; returns the updated name vector.
(vs::Vars)(s::String) = push!(vs.v, s)

# Render the collected names as the text "([n1,n2,...])": comma-separated, no spaces.
(vs::Vars)() = string("([", join(vs.v, ","), "])")
"""
    train(gp, grammar, train_x, train_y, nJob=1)

Run the genetic-programming search on the training data and return its result.

With `nJob <= 1` the search runs in the current process via `_train`. With `nJob > 1`,
`nJob` worker processes are added, the training data is copied into `SharedArray`s
mapped on those workers, every worker runs `GP2._train_parallel` on the shared data,
and the result with the smallest `loss` field is returned. The workers are removed
again before this function exits, including when one of the runs throws.

# Arguments
- `gp::GeneticProgram`: search configuration forwarded to the optimizer.
- `grammar::Grammar`: grammar the candidate expressions are drawn from.
- `train_x::Array{Float64,2}`: one sample per row, one input variable per column.
- `train_y::Array{Float64,1}`: target value for each row of `train_x`.
- `nJob::Int64=1`: number of worker processes; values above 1 enable the parallel path.
"""
function train(gp::GeneticProgram, grammar::Grammar,
               train_x::Array{Float64,2}, train_y::Array{Float64,1}, nJob::Int64=1)
    # Single-process case: nothing to set up or tear down.
    nJob > 1 || return _train(gp, grammar, train_x, train_y)

    pids = addprocs(nJob)
    try
        module_dir = "/home/shpark/julia_test/exproptimizatoin/examples"
        # A local value only reaches the other processes through `@everywhere` when it
        # is interpolated into the expression.
        @everywhere push!(LOAD_PATH, $module_dir)
        eval(macroexpand(Distributed, quote @everywhere using GP2 end))

        nr, nc = size(train_x)
        shared_train_x = SharedArray{Float64}((nr, nc), pids=pids)
        shared_train_y = SharedArray{Float64}((nr, 1), pids=pids)
        shared_train_x[:, :] = train_x
        shared_train_y[:, :] = train_y

        futures = map(pids) do pid
            @spawnat pid GP2._train_parallel(gp, grammar, shared_train_x, shared_train_y)
        end
        results = map(fetch, futures)

        # Keep the run with the smallest loss. To inspect a result object use
        # `typeof(m)` and `fieldnames(typeof(m))`.
        return results[argmin([m.loss for m in results])]
    finally
        # Always release the workers; without this a failing run would leave the
        # added processes alive.
        rmprocs(pids)
    end
end
"""
    _train(gp, grammar, train_x, train_y)

Single-process training run: optimise an expression derived from `grammar`, starting
at the `:R` symbol, against the samples in `train_x` / `train_y`.

A candidate tree is scored by evaluating its expression once per row of `train_x`, with
column `c` of that row bound to the symbol `x<c>` (`x1`, `x2`, ...). The score is
`sqrt(sum of squared residuals) / number of rows`. Returns the value produced by
`optimize` (called with `verbose=true`).
"""
function _train(gp::GeneticProgram, grammar::Grammar,
                train_x::Array{Float64,2}, train_y::Array{Float64,1})
    symbols = SymbolTable(grammar)

    # Fitness of one candidate tree; lower is better.
    function loss(tree::RuleNode, grammar::Grammar)
        candidate = get_executable(tree, grammar)
        n_rows, n_cols = size(train_x)
        sq_err = 0.0
        for row in 1:n_rows
            # Bind x1, x2, ... to the inputs of the current sample.
            for col in 1:n_cols
                symbols[Symbol("x", col)] = train_x[row, col]
            end
            fitted = Core.eval(symbols, candidate)
            residual = train_y[row] - fitted
            sq_err += residual^2
        end
        return sqrt(sq_err) / n_rows
    end

    return optimize(gp, grammar, :R, loss, verbose=true)
end
"""
    _train_parallel(gp, grammar, train_x, train_y)

Training run that reads its samples from `SharedArray`s; `train` spawns it on each
worker process.

The scoring rule matches `_train`: every row of `train_x` is bound to the symbols `x1`,
`x2`, ... (one per column), the candidate expression is evaluated, and the score is
`sqrt(sum of squared residuals) / number of rows`. `train_y` is indexed linearly by row.
Returns the value produced by `optimize` (called with `verbose=true`).
"""
function _train_parallel(gp::GeneticProgram, grammar::Grammar,
                         train_x::SharedArray, train_y::SharedArray)
    symbols = SymbolTable(grammar)

    # Fitness of one candidate tree; lower is better.
    function loss(tree::RuleNode, grammar::Grammar)
        candidate = get_executable(tree, grammar)
        n_rows, n_cols = size(train_x)
        sq_err = 0.0
        for row in 1:n_rows
            # Bind x1, x2, ... to the inputs of the current sample.
            for col in 1:n_cols
                symbols[Symbol("x", col)] = train_x[row, col]
            end
            fitted = Core.eval(symbols, candidate)
            residual = train_y[row] - fitted
            sq_err += residual^2
        end
        return sqrt(sq_err) / n_rows
    end

    return optimize(gp, grammar, :R, loss, verbose=true)
end
"""
    predict(results_gp, grammar, test_x)

Evaluate the expression stored in `results_gp.expr` on every row of `test_x`.

For each row, column `c` is bound to the symbol `x<c>` (`x1`, `x2`, ...) in a symbol
table built from `grammar` before the expression is evaluated. Returns a
`Vector{Float64}` with one prediction per row, in row order.
"""
function predict(results_gp::ExprOptResult, grammar::Grammar, test_x::Array{Float64,2})
    symbols = SymbolTable(grammar)
    best_expr = results_gp.expr
    n_rows, n_cols = size(test_x)

    # Bind the inputs of one sample, then evaluate the expression for it.
    function evaluate_row(row)
        for col in 1:n_cols
            symbols[Symbol("x", col)] = test_x[row, col]
        end
        return Core.eval(symbols, best_expr)
    end

    return Float64[evaluate_row(row) for row in 1:n_rows]
end
end # module GP2
WS.jl 소스
module WS
using Random
using DataFrames
using CSV
"""
    WindowSlider(; window_size=5)

Mutable bookkeeping record for sliding-window construction.

A new instance has `w = window_size`, `o = 0`, `r = 1`, `l = 0`, `p = 0` and an empty
`names` vector.
"""
mutable struct WindowSlider
    w::Int                 # window_size - number of time steps to look back
    o::Int                 # offset between last reading and temperature
    r::Int                 # response_size - number of time steps to predict
    l::Int                 # maximum length to slide - (#observation - w)
    p::Int                 # final predictors - (#predictors * w)
    names::Vector{Symbol}  # column names collected for the windowed table

    WindowSlider(; window_size::Int=5) = new(window_size, 0, 1, 0, 0, Symbol[])
end
"""
    re_init(arr)

Return the running sum of `arr`, shifted so that its first element is zero.

The result is a new vector; `arr` itself is left untouched. (The previous
implementation accumulated in place and therefore overwrote the caller's data.)
An empty input yields an empty copy.

# Examples
`re_init([1.0, 2.0, 3.0])` returns `[0.0, 2.0, 5.0]`.
"""
function re_init(arr::AbstractVector{<:Real})
    isempty(arr) && return copy(arr)
    running = cumsum(arr)
    return running .- running[1]
end
"""
    collect_windows(ws, X; window_size=5, offset=0, previous_y=false)

Turn the table `X` into a table of sliding windows and return it as a new `DataFrame`.

The last column of `X` is treated as the response. Every other column (and the response
too when `previous_y` is `true`) is a predictor. Output row `i` contains, in order:

1. for each predictor column, its `window_size` values from rows `i` to
   `i + window_size - 1`, named `<column>_1` ... `<column>_<window_size>`; the window of
   the first column is passed through `re_init`, so it becomes a cumulative sum that
   starts at zero;
2. the last element of `re_init` applied to rows `i` to `i + window_size + ws.r - 1` of
   the first column, named `Δt_<window_size + 1>`;
3. the response at row `i + window_size + ws.r - 1`, named `Y`.

`ws` is updated in place: `o`, `w`, `l`, `p` and `names` are overwritten. The name list
is rebuilt on every call, so a `WindowSlider` may be reused.

# Keywords
- `window_size::Int=5`: number of rows per window (stored in `ws.w`).
- `offset::Int=0`: stored in `ws.o`; not used otherwise.
- `previous_y=false`: include lagged values of the response among the predictors.

# Throws
- `ArgumentError` if `X` has too few rows for the requested window.
"""
function collect_windows(ws::WindowSlider, X::DataFrame;
                         window_size::Int=5, offset::Int=0, previous_y=false)
    n_rows, n_cols = size(X)
    # The response column is a predictor only when previous_y is set.
    pred_names = previous_y ? names(X) : names(X)[1:end-1]
    n_pred = length(pred_names)

    ws.o = offset
    ws.w = window_size
    ws.l = n_rows - (ws.w + ws.r) + 1
    ws.p = n_pred * ws.w
    ws.l >= 0 || throw(ArgumentError(
        "X has $n_rows rows, too few for window_size=$(ws.w) and response size $(ws.r)"))

    # Column names of the windowed table. Start from a clean list: appending to names
    # left over from an earlier call would no longer match the column count.
    # Parentheses are not allowed in these names, hence `Δt_1` rather than `Δt(1)`.
    empty!(ws.names)
    for col in pred_names, i in 1:ws.w
        # t_1, ..., t_w, x1_1, ..., x1_w, ...
        push!(ws.names, Symbol(col, "_", i))
    end
    # Time stamps of the steps to predict: w + 1, ..., w + r.
    for k in 1:ws.r
        push!(ws.names, Symbol("Δt", "_", ws.w + k))
    end
    push!(ws.names, :Y)

    df = DataFrame(zeros(ws.l, ws.p + ws.r + 1), ws.names)

    # Fill the new table row by row. The row layout below provides a single time stamp
    # and a single response, which matches `ws.r == 1` as set by the constructor.
    for i in 1:ws.l
        # Flat buffer holding all values of output row i.
        slices = Float64[]
        for p in 1:n_pred
            # Julia ranges include both ends: i:(w + i - 1) selects w rows.
            window = X[i:ws.w + i - 1, p]
            # The first column is re-initialised for every window.
            if p == 1
                window = re_init(window)
            end
            append!(slices, window)
        end
        # Elapsed value of the first column at the row that is predicted.
        horizon = re_init(X[i:ws.w + ws.r + i - 1, 1])[end]
        y = X[ws.w + ws.r + i - 1, end]
        push!(slices, horizon, y)
        df[i, :] .= slices
    end
    return df
end
"""
    _data_set(; N=600, window_size=5, seed=123)

Generate a synthetic table with `N` rows and the columns `t`, `Δt`, `x1`, `x2`, `x3`, `y`.

The global random number generator is seeded with `seed` first. `t` is `0, 1, ..., N-1`
with random jitter, `Δt` holds the differences between consecutive `t` values with a
leading `0`, and `x1`, `x2`, `x3` are uniform draws scaled by 5. `y` combines `x1` with
circularly shifted copies of `x1`, `x2` (shift 1) and `x3` (shift 3) plus a noise term.
All generated values are rounded to two digits. `window_size` is accepted but not used.
"""
function _data_set(; N::Int=600, window_size::Int=5, seed::Int=123)
    Random.seed!(seed)

    # Jittered time axis. The two draws happen in this order: forward, then backward.
    base_t = collect(Float64, 0:N-1)
    jitter_fwd = rand(N) / 4
    jitter_back = rand(N) / 7
    t = round.((base_t .+ jitter_fwd) .- jitter_back; digits=2)

    # Uniform draws scaled to [0, scale), rounded to two digits.
    draw(scale) = round.(rand(N) * scale; digits=2)
    x1 = draw(5)
    x2 = draw(5)
    x3 = draw(5)
    n = draw(2)

    x1_lag1 = circshift(x1, 1)  # t-1
    x2_lag1 = circshift(x2, 1)  # t-1
    x3_lag3 = circshift(x3, 3)  # t-3

    signal = @. log(abs(2 + x1)) - x2_lag1^2 + 0.02 * x3_lag3 * exp(x1_lag1)
    y = @. round(signal + n, digits=2)

    Δt = diff(t)
    pushfirst!(Δt, 0)

    return DataFrame(hcat(t, Δt, x1, x2, x3, y), [:t, :Δt, :x1, :x2, :x3, :y])
end
"""
    Dataset

Immutable bundle returned by `data_set` and `data_kospi`.

# Fields
- `t`: time axis values.
- `n_train`, `n_test`: row ranges used for the training and the test split.
- `train`, `test`: the two splits without the leading `t` column.
- `train_windows`, `test_windows`: windowed splits built by `collect_windows` with
  `previous_y=false`.
- `train_windows_y_inc`, `test_windows_y_inc`: windowed splits built with
  `previous_y=true`, i.e. including lagged response values.
"""
struct Dataset
    t::Vector{Float64}
    n_train::UnitRange{Int64}
    n_test::UnitRange{Int64}
    train::DataFrame
    test::DataFrame
    train_windows::DataFrame
    test_windows::DataFrame
    train_windows_y_inc::DataFrame
    test_windows_y_inc::DataFrame
end
"""
    data_set(; N=600, window_size=5, seed=2018, n_train=1:160, n_test=161:200)

Build a `Dataset` from the synthetic table produced by `_data_set`.

The rows `n_train` and `n_test` of the generated table, without its first column `t`,
form the training and test splits. Each split is windowed twice with
`collect_windows`: once without and once with lagged response values.

# Keywords
- `N`, `seed`: forwarded to `_data_set`.
- `window_size`: forwarded to `_data_set` and to every `collect_windows` call.
  (It used to be dropped, so the windows always had the default size.)
- `n_train`, `n_test`: row ranges of the two splits.
"""
function data_set(; N::Int=600, window_size::Int=5, seed::Int=2018,
                  n_train=1:160, n_test=161:200)
    df = _data_set(N=N, window_size=window_size, seed=seed)
    t = df.t
    # Column 1 is the absolute time `t`; the splits start at `Δt`.
    train = df[n_train, 2:end]
    test = df[n_test, 2:end]

    # Window one split with a fresh slider, honouring the requested window size.
    windows(data, with_y) = collect_windows(WindowSlider(), data;
                                            window_size=window_size, previous_y=with_y)

    train_windows = windows(train, false)
    test_windows = windows(test, false)
    train_windows_y_inc = windows(train, true)
    test_windows_y_inc = windows(test, true)

    return Dataset(t, n_train, n_test,
                   train, test,
                   train_windows, test_windows,
                   train_windows_y_inc, test_windows_y_inc)
end
"""
    data_kospi(; N=600, window_size=5, seed=2018, n_train=1:160, n_test=161:200)

Build a `Dataset` from the file `kospi.csv` in the current working directory.

Rows `first(n_train)` to `last(n_test)` of the file are used. The columns `close` and
`date` are dropped, every remaining column is divided by its maximum over the training
rows, and the columns `t` (absolute row number) and `Δt` (all ones) are put in front.
The splits exclude `t` and are windowed with `collect_windows`, once without and once
with lagged values of the last column.

`n_train` and `n_test` are row numbers of the file; they are translated to positions in
the extracted slice, so `n_train` does not have to start at 1. `N` and `seed` are
accepted but not used.
"""
function data_kospi(; N::Int=600, window_size::Int=5, seed::Int=2018,
                    n_train=1:160, n_test=161:200)
    # Materialise through CSV.File: a bare `CSV.read(path)` without a sink argument is
    # rejected by newer CSV.jl releases.
    df = DataFrame(CSV.File("kospi.csv"))

    first_row = first(n_train)
    last_row = last(n_test)
    df2 = df[first_row:last_row, :]
    select!(df2, Not([:close, :date]))

    # df2 starts at row `first_row` of the file, so shift the requested row numbers to
    # positions inside df2.
    shift = first_row - 1
    train_rows = n_train .- shift
    test_rows = n_test .- shift

    # Normalise every column. Both splits are scaled by the maximum over the training
    # rows, i.e. the scaling fitted on the training data is reused for the test data.
    for i in 1:size(df2, 2)
        c_max = maximum(df2[train_rows, i])
        df2[!, i] = df2[!, i] / c_max
    end

    t = collect(Float64, first_row:last_row)
    Δt = ones(length(t))
    insertcols!(df2, 1, :t => t)
    insertcols!(df2, 2, :Δt => Δt)

    train = df2[train_rows, 2:end]
    test = df2[test_rows, 2:end]

    # Window one split with a fresh slider.
    windows(data, with_y) = collect_windows(WindowSlider(), data;
                                            previous_y=with_y, window_size=window_size)

    train_windows = windows(train, false)
    test_windows = windows(test, false)
    train_windows_y_inc = windows(train, true)
    test_windows_y_inc = windows(test, true)

    return Dataset(t, n_train, n_test,
                   train, test,
                   train_windows, test_windows,
                   train_windows_y_inc, test_windows_y_inc)
end
end # module WS