Computer

VHTS와 병렬 컴퓨팅: 3. job list 파일 공유를 이용한 병렬처리

Novelism 2022. 3. 28. 08:07

멀티 코어가 보급되기 이전에 등장한 병렬 컴퓨팅은 기본적으로 여러 컴퓨터를 사용하는 방식이었습니다.

그중 가장 대표적인 것이 MPI (massage passing interface)입니다.

여러 프로세스 사이에서, massage를 주고받는 인터페이스입니다. 이는 서로 다른 컴퓨터에서 실행 중인 프로세스에서도 성립합니다. massage라고 표현했지만, 거대 데이터가 될 수도 있습니다. 

mpi를 실행할 때는 mpirun 같은 명령어를 사용하고, 실행 시에 MPI common world로 노드들이 묶여서 프로세서가 실행됩니다. 실행 이후에 common world를 변화시킬 수 없습니다. 특정 node에서 에러가 발생 시, 전부 종료하고 다시 시작해야 합니다. 

 mpi 같은 메모리 상의 massage를 공유하는 방법 대신, 보조기억장치(HDD, SSD)의 파일을 공유해서 통신을 하는 방법이 있습니다. 이 방법은 한 컴퓨터에 속한 프로세스들 사이에서도 사용할 수 있고, 여러 컴퓨터에서도 공유되는 보조기억장치가 있다면 (NFS, 네트워크 파일 시스템 등) 사용할 수 있습니다. 또한 실행 시 common world를 만들 필요도 없고, 작업 중에 노드를 제거할 수도, 더 추가할 수도 있습니다. 에러가 난 노드만 정지시키고 다시 시작할 수도 있습니다. 

 

원리는 다음과 같습니다.

사용할 컴퓨터들에 공유 스토리지 (NFS)를 준비합니다. (lustre가 더 좋습니다만...)

참여할 프로세스들은 master와 workers 이 있습니다. 

 master가 하는 일은, 잡을 관리하는 일이고, workers는 잡을 수행하는 일입니다. 

 이때, 작업 리스트 파일을 공유하는 방식으로 서로 소통합니다. 

 먼저 master는 전체 잡 목록과, 할 일 목록 (todo list)을 작성합니다. (처음에 두 파일은 같습니다.)

 예를 들어, 동일한 연산을 파라미터를 바꿔가며 실행한다고 가정한다면, 할 일 목록 파일에는 파라미터들이 기입됩니다. 

 그리고 마스터는 잠이 듭니다. (주기적으로 깨어나도록)

  그리고 worker는 할 일 목록 파일에 접근해서, 할 일 목록에 내용이 있다면, 그중 가장 위에 있는 것을 하나 가져옵니다. 할 일 목록에서 진행 중 목록 (current list)으로 job을 이동시킵니다. (current list에 기록하고, todo list에서 삭제합니다.) 가능하면 진행자 이름도 기록하는 것이 좋습니다.

 그리고 worker는 그 job을 수행합니다. job 이 종료되면 완료 목록 (done list)에 기록합니다. (결괏값이나, 결과 값이 파일 위치도 함께 기록합니다.)

 그리고 todo list로 가서 위의 작업을 반복합니다. todo list가 더 이상 없으면 종료됩니다.

 master는 주기적으로 깨어나서, 완료 목록과 전체 잡 목록을 비교해서, 전체 잡이 다 끝나는지를 확인합니다.

 다 끝났다면, 잡을 취합한 후 종료됩니다. 

 

이 과정에서, 여러 프로세스가 동시에 하나의 파일에 접근해선 안됩니다. 그래서 fild lock을 사용합니다. 

코드는 다음과 같습니다. (더미코드, 실행 불가)

master.py

#!/usr/bin/env python
import sys
import os
from filelock import FileLock
import pandas as pd


def get_job_list(list_dir):
    job_idx_list = list()
    list_file = list_dir + '/list.txt'
    if not os.path.exists(list_file):
        return job_idx_list
    freeze_lock = FileLock('{}.lock'.format(list_file))
    with freeze_lock.acquire(timeout=30):
        with open(list_file, 'r') as fp:
            lines = fp.readlines()
        for line in lines:
            job_idx_list += [int(line.strip())]
    job_idx_list = sorted(job_idx_list)
    
    return job_idx_list


def set_job_list(job_idx_list, list_dir):
    list_file = list_dir + '/list.txt'
    freeze_lock = FileLock('{}.lock'.format(list_file))
    with freeze_lock.acquire(timeout=30):
        if os.path.exists(list_file):
            with open(list_file, 'r') as fp:
                lines = fp.readlines()
        else:
            lines = list()
        with open(list_file, 'w') as fp:
            for line in lines:
                fp.write(line)
            for job_idx in job_idx_list:
                line = job_idx + '\n'
                fp.write(line)
    return


def data_sampling(master_dir, todo_dir, file_size):
	field_separator = ','
    sep = field_separator
	remain_file = master_dir + '/' + 'remain.txt' 
    df = pd.read_csv(remain_file, sep=field_separator, header=0)
	num_data = df.shape[0]

    todo_list = list()
    num_file = int(np.ceil(num_data/file_size))
    for idx in range(0, num_file):
        job_idx = '%d' % (idx)
        ini = idx * file_size
        fin = (idx+1) * file_size
        df_todo = df[ini:fin]
        job_todo_file = todo_dir + '/' + job_idx + '.txt'

        df_todo.to_csv(job_todo_file, sep=sep, float_format='%.3f',
                           header=True, index=False)
        todo_list += [job_idx]

    set_job_list(todo_list, master_dir)
    set_job_list(todo_list, todo_dir)

    return 


def check_done(master_dir, done_dir):

    check = False

    master_job_idx_list = get_job_list(master_dir)
    done_job_idx_list = get_job_list(done_dir)
    master_job = list()
    done_job = list()

    for job_idx in master_job_idx_list:
        master_job += [job_idx]
    for job_idx in done_job_idx_list:
        done_job += [job_idx]

    master_job = set(master_job)
    done_job = set(done_job)
    running_job = master_job - done_job
    if len(running_job) == 0:
        check = True
    return check


def gather_result(master_dir, done_dir):

    result_file = master_dir + '/' + 'result.txt'

    all_data = list()
	field_separator = ','
    df_result = pd.read_csv(result_file,sep=field_separator, header=0)
        all_data.append(df_result)
    done_job_idx_list = get_job_list(done_dir)
    for job_idx in done_job_idx_list:
        job_done_file = done_dir + '/' + job_idx + '.txt'
        df_done = pd.read_csv(job_done_file, sep=field_separator, header=0)
        all_data.append(df_done)
    df = pd.concat(all_data, axis=0, ignore_index=True)
    sep = field_separator
    df.to_csv(result_file, sep=sep, float_format='%.3f', index=False)
    return


def master(master_dir, todo_dir, done_dir, sleep_cycle=60):

    check = False
        
    data_sampling(master_dir, todo_dir, file_size)
    while not check:
        time.sleep(sleep_cycle)
        check = check_done()
        
    gather_result(done_dir)
    line_out = 'end ' 
    print(line_out, flush=True)
    break


    return

def main():
    master(master_dir, todo_dir, done_dir)


if __name__ == "__main__":
    main()

 

worker.py

#!/usr/bin/env python
import sys
import os
from filelock import FileLock

def get_job_from_list(list_dir):
    list_file = list_dir + '/list.txt'
    if not os.path.exists(list_file):
        job_idx = None
        return job_idx
    freeze_lock = FileLock('{}.lock'.format(list_file))
    with freeze_lock.acquire(timeout=30):
        with open(list_file, 'r') as fp:
            lines = fp.readlines()
        if len(lines) == 0:
            job_idx = None
        else:
            job_idx = lines[0].strip()
            with open(list_file, 'w') as fp:
                for line in lines[1:]:
                    fp.write(line)
    return job_idx


def set_job_from_list(job_idx, list_dir):
    list_file = list_dir + '/list.txt'
    freeze_lock = FileLock('{}.lock'.format(list_file))
    with freeze_lock.acquire(timeout=30):
        if os.path.exists(list_file):
            with open(list_file, 'r') as fp:
                lines = fp.readlines()
        else:
            lines = list()
        with open(list_file, 'w') as fp:
            for line in lines:
                fp.write(line)
            line = job_idx + '\n'
            fp.write(line)
    return


def remove_job_from_list(job_idx, list_dir):
    list_file = list_dir + '/list.txt'
    freeze_lock = FileLock('{}.lock'.format(list_file))
    with freeze_lock.acquire(timeout=30):
        if os.path.exists(list_file):
            with open(list_file, 'r') as fp:
                lines = fp.readlines()
        with open(list_file, 'w') as fp:
            for line in lines:
                if line.strip() != job_idx:
                    fp.write(line)
    return


def get_and_set_job(todo_dir, current_dir):

    job_idx = get_job_from_list(todo_dir)
    if job_idx is None:
        return job_idx
    set_job_from_list(job_idx, current_dir)
    
    job_todo_file = todo_dir + '/' + job_idx + '.txt'
    job_current_file = current_dir + '/' + job_idx + '.txt' 

    os.replace(job_todo_file, job_current_file)
    return job_idx


def move_done(job_idx, current_dir, done_dir):

    remove_job_from_list(job_idx, current_dir)
    set_job_from_list(job_idx, done_dir)
    job_current_file = current_dir + '/' + job_idx + '.txt'
    os.remove(job_current_file)
    return job_idx

def working(todo_dir, current_dir, done_dir):
    pid = os.getpid()
    line_out = 'Start sub_dock pid: %d' % (pid)
    print(line_out, flush=True)

    while True:
        job_idx = get_and_set_job(todo_dir, current_dir)
        line_out = 'get a job: %s' % job_idx
        print(line_out, flush=True)
        if job_idx is None:
            line_out = 'End sub_dock pid %d' % pid
            print(line_out, flush=True)
            break

#		main job 
        run_work(job_idx)
        
        move_done(job_idx, current_dir, done_dir)
        line_out = 'done job: %s' % job_idx
        print(line_out, flush=True)

    return
    
def main():
    working(todo_dir, current_dir, done_dir)

if __name__ == "__main__":
    main()