Julia에서 시계열 데이터 처리 예제 – 병렬처리 (with GP)

해당 제목에 대한 내용은 기존에 올린 “Julia에서 시계열 데이터 처리 예제”들을 참조 하면됩니다.

여기서는 다중 프로세스로 Genetic Programming(GP)을 병렬처리 하는 방법을 보여 줍니다.

다중 프로세스가 train, test할 데이터는 SharedArray로 공유 합니다.

아래 모듈은 “/home/shpark/julia_test/exproptimizatoin/examples” 폴더 있다고 가정
WS.jl : 연습용 데이터를 만드는 모듈 / 이 게시글에서는 WindowSlider 구조체만 사용함
GP2.jl: Genetic Programming 적용 모듈
아래에 소스를 게제 합니다.
그리고 GP2 활용은 jupyter notebook으로 되어 있습니다

구현소스(Julia Notebook – html 버전)

이 예제에서는 GP를 통해 f(x1,x2) = 0.9 x1^2 + 0.7 x1 x2 + 0.3 x2 의 근사식을 찾고 학습 되지 않은 데이터 대한
prediction을 합니다.

GP2.jl 소스

module GP2
using Distributed 
using Random
using ExprOptimization
using SharedArrays

mutable struct DGrammar
  grammar::Array{String,1}
  function DGrammar()
    grammar = Array{String,1}(undef,0)
    push!(grammar,"@grammar begin")
    push!(grammar,"end")
    new(grammar)
  end
end
#=
examples:
grmmar = DGrammar()
grmmar("R = R + R")
grmmar("R = R - R")
=#
function (g::DGrammar)(r::String)
   insert!(g.grammar,2,r)
end
#=
Return Grammar
grmmar()
=#
function (g::DGrammar)()
   eval(Meta.parse(join(g.grammar,"\n")))
end

mutable struct Vars
  v::Vector{String}
function Vars()
    v = Vector{String}(undef,0)
    new(v)
  end
end
function (vs::Vars)(s::String)
  push!(vs.v,s)
end
function (vs::Vars)()
  vars = "(["*join(vs.v,",")*"])"
end

function train(gp::GeneticProgram, grammar::Grammar, 
  train_x::Array{Float64,2},train_y::Array{Float64,1},nJob::Int64=1)
  #=
  nJob이 1 보다 크면 병렬처리
  =#
  if nJob > 1
    pids = addprocs(nJob)
    module_dir = "/home/shpark/julia_test/exproptimizatoin/examples"
    # everywhere 사용시 각 process에 값을 넘겨 주려면 interpolation으로 넘겨 줘야 한다.
    @everywhere push!(LOAD_PATH, $module_dir) 
    eval(macroexpand(Distributed,quote @everywhere using GP2 end))
    nr,nc = size(train_x)  
    shared_train_x = SharedArray{Float64}((nr,nc),pids=pids)
    shared_train_y = SharedArray{Float64}((nr,1),pids=pids)
    shared_train_x[:,:] = train_x
    shared_train_y[:,:] = train_y
      
    rps = [@spawnat i GP2._train_parallel(gp,grammar,
          shared_train_x,shared_train_y)  
        for i in pids]
    ms = map(x->fetch(rps[x]),1:length(rps))
    # nJob개의 프로세스에서 가져온 결과중 loss가 가장 작은
    # 개체를 골라 낸다.
    # m의 타입 및 내부 fields를 알려면
    # type확인 : typeof(m)
    # fields 확인 : fieldnames(typeof(m))
    r = ms[argmin([m.loss for m in ms ])]
    rmprocs(pids)    
  else
    r = _train(gp,grammar,train_x,train_y)
  end
  return r
end


function _train(gp::GeneticProgram, grammar::Grammar, 
  train_x::Array{Float64,2},train_y::Array{Float64,1})
    
  S = SymbolTable(grammar)
  
  function loss(tree::RuleNode, grammar::Grammar)
    ex = get_executable(tree,grammar)
    nr,nc = size(train_x)
    ΣΔy = 0 
    for r in 1:nr
      for c in 1:nc
        x = train_x[r,c]      
        S[Symbol("x",c)] = x        
      end
      y_fit = Core.eval(S,ex)      
      y = train_y[r]
      ΣΔy += (y - y_fit)^2
    end  
    los = sqrt(ΣΔy)/nr
  end    
  results_gp = optimize(gp, grammar, :R, loss,verbose=true)
  return results_gp  
end
  
function _train_parallel(gp::GeneticProgram, grammar::Grammar, 
  train_x::SharedArray,train_y::SharedArray)
  
  S = SymbolTable(grammar)
  
  function loss(tree::RuleNode, grammar::Grammar)
    ex = get_executable(tree,grammar)
    nr,nc = size(train_x)
    ΣΔy = 0 
    for r in 1:nr
      for c in 1:nc
        x = train_x[r,c]      
        S[Symbol("x",c)] = x        
      end
      y_fit = Core.eval(S,ex)      
      y = train_y[r]
      ΣΔy += (y - y_fit)^2
    end  
    los = sqrt(ΣΔy)/nr
  end    
  results_gp = optimize(gp, grammar, :R, loss,verbose=true)
  return results_gp
end

function predict(results_gp::ExprOptResult,grammar::Grammar,test_x::Array{Float64,2})
  #=
  Prediction 
  =#
  S = SymbolTable(grammar)
  ex = results_gp.expr
  nr,nc = size(test_x)
  y = Array{Float64,1}(undef,0)
  for r in 1:nr
    for c in 1:nc
      x = test_x[r,c]
      S[Symbol("x",c)] = x
    end
    push!(y, Core.eval(S,ex))
  end
  y
end    

end # module GP2

WS.jl 소스

module WS
using Random
using DataFrames
using CSV

mutable struct WindowSlider
  w::Int # window_size - number of time steps to look back
  o::Int # offset between last reading and temperature
  r::Int # response_size - number of time steps to predict
  l::Int # maximum length to slide - (#observation - w)
  p::Int # final predictors - (#predictors * w)
  names::Array{Symbol,1}
  function WindowSlider(;window_size::Int=5)
    w = window_size
    o = 0
    r = 1
    l = 0
    p = 0
    names = Array{Symbol,1}(undef,0)
    new(w,o,r,l,p,names)
  end
end

function re_init(arr::Array{Float64,1})
  cumsum!(arr,arr;dims=1)
  return arr .- arr[1]
end

function collect_windows(ws::WindowSlider,X::DataFrame;window_size::Int=5, offset::Int=0,previous_y = false)
  N, cols = size(X)
  cols -= 1
  ws.o = offset
  ws.w = window_size
  ws.l = N - (ws.w + ws.r) + 1
  ws.p = previous_y ? (cols + 1) * ws.w : cols * ws.w
  
  # create the names of the variables in the window
  # Check first if we need to create that for the response itself
  x = previous_y ? deepcopy(X) : select(X,Not(names(X)[end]))
  
  for col in names(x)
    for i in 1:ws.w
      #= python에서는 Δt(1)을 컬럼 이름으로 사용하고 있으나
      줄리아에서는 괄호등 특수 문자는 Symbol로 허용되지 않는다
      줄리아의 DataFrame의 컬럼은 symbol이 사용되고 참조시에도 
      symbol이 사용된다.
      예를 들어 df.Δt_1 은 하용되나 df.Δt(1)은 허용되지 않는다.
      =#
      # t(1),..,t(w1),x1(1),...,x1(w)
      name = Symbol(col,"_",i)
      push!(ws.names,name)
    end
  end  
  
  # Incorporate the timestamps where we want to predict
  # t(w),..,t(w+r-1)
  for k in 1:ws.r
    name = Symbol("Δt","_",ws.w + k)
    push!(ws.names, name)
  end
  push!(ws.names, :Y)
  
  df = DataFrame(zeros(ws.l,ws.p + ws.r + 1), ws.names) 
  
  # Populate by rows in the new dataframe
  for i in 1:ws.l    
    # 2차원 배열 0 크기의 cols개의 컬럼을 가진 배열로 초기화
    slices = Array{Float64,1}(undef,0)
    # Flatten the lags of predictions
    _, cols = size(x)
    # Julia에서 a[1:3] 이면 1 부터 3 까지 index의 3개의 값을 가져온다.
    # Python에서는 1 부터 2까지 2개의 값을 가져온다.
    for p in 1:cols
      line = X[i:ws.w + i - 1,p]
      # Reinitialization at every window for Δt
      if p == 1 
        line = re_init(line) 
      end
      
      # Concatenate the lines in one slice
      push!(slices, line...)
    end
    
    # Incorporate the timestamps where we want to predict
    # predict 라인의 즉 윈도우에서 w+r 번째의 
    line = re_init(X[i:ws.w + ws.r + i - 1, 1])[end]
    y = X[ws.w + ws.r + i - 1, end]
    push!(slices,line..., y...)
    
    # Incorporate the slice to the cake(df)
    df[i,:] .= slices
  end
  return df
end


function _data_set(;N::Int=600,window_size::Int=5, seed::Int=123)
  Random.seed!(seed)
  N = N
  #t = Float64[0:N-1...]
  t = Float64[0:N-1;]
  t = t .+ rand(N) / 4
  t = t .- rand(N) / 7
  t = round.(t; digits=2)
  x1 = round.(rand(N)*5;digits=2)
  x2 = round.(rand(N)*5;digits=2)
  x3 = round.(rand(N)*5;digits=2)
  n  = round.(rand(N)*2;digits=2)

  x1_1 = circshift(x1,1) # t-1
  x2_1 = circshift(x2,1) # t-1
  x3_3 = circshift(x3,3) # t-3  

  y = @. log(abs(2 + x1))-x2_1^2 + 0.02*x3_3*exp(x1_1)
  y = @. round(y+n,digits=2) 
  Δt = t[2:end] .- t[1:end-1]
  insert!(Δt,1,0)
  df = DataFrame([t Δt x1 x2 x3 y],[:t,:Δt,:x1,:x2,:x3,:y])
  return df
end

struct Dataset
  t::Array{Float64,1}
  n_train::UnitRange{Int64}
  n_test::UnitRange{Int64}
  train::DataFrame
  test::DataFrame
  train_windows::DataFrame
  test_windows::DataFrame
  train_windows_y_inc::DataFrame
  test_windows_y_inc::DataFrame
end

function data_set(;N::Int=600,window_size::Int=5, seed::Int=2018,
                n_train=1:160,n_test=161:200)
  
  df = _data_set(N=N,window_size=window_size,seed=seed)
  t = df.t
  train = df[n_train,2:end]
  test = df[n_test,2:end]  
  train_windows = collect_windows(WindowSlider(),train,previous_y=false)    
  test_windows = collect_windows(WindowSlider(),test,previous_y=false)
  train_windows_y_inc = collect_windows(WindowSlider(),train,previous_y=true)    
  test_windows_y_inc = collect_windows(WindowSlider(),test,previous_y=true)
  return Dataset(t,n_train, n_test,
    train,test,
    train_windows,test_windows,
    train_windows_y_inc,test_windows_y_inc)
end

function data_kospi(;N::Int=600,window_size::Int=5, seed::Int=2018,
                n_train=1:160,n_test=161:200)
  df = CSV.read("kospi.csv")  
  df2 = df[n_train[1]:n_test[end],:]
  select!(df2, Not([:close,:date]))
  # 다음날 종가
#   next_day_close = df[n_train[2]:n_test[end]+1,end]
#   _,nc = size(df2)
#   insertcols!(df2,nc+1,:next_day_close=>next_day_close)
  #=
  각 컬럼 데이터를 정규화 한다.
  단 train data의 max값으로 train 및 test 데이터를 정규화 한다.
  즉 train시 적용되었던 조건을 test data에 적용한다.
  =#  
  for i in 1:size(df2)[2]
    c_max = max(df2[n_train,i]...)
    df2[!,i] = df2[!,i]/c_max  
  end 
  t = Float64[(n_train[1]:n_test[end])...]
  Δt=ones(n_test[end])
  insertcols!(df2,1,:t=>t)
  insertcols!(df2,2,:Δt=>Δt)

  train= df2[n_train,2:end]
  test= df2[n_test,2:end]  
  train_windows= collect_windows(WindowSlider() ,train,previous_y=false,window_size=window_size)    
  test_windows= collect_windows(WindowSlider(),test,previous_y=false,window_size=window_size)
  train_windows_y_inc= collect_windows(WindowSlider(),train,previous_y=true,window_size=window_size)    
  test_windows_y_inc= collect_windows(WindowSlider(),test,previous_y=true,window_size=window_size)
  return Dataset(t,n_train, n_test,
    train,test,
    train_windows,test_windows,
    train_windows_y_inc,test_windows_y_inc)
end

end # module WS

댓글 달기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다