SSブログ

並列化について その5 [C/C++関連]

前回、pthreadでOpenMPの動作のような仕組みを作ればよいと書いた。
そこでやり方を考えてみると、スレッドをはじめに起動したら、そのままにしておいて、
メインのスレッドから子スレッドに指示を出すやり方を行うようにすればいいと思われる。
つまり、

メインスレッド     子スレッド
   ↓
  初期化
   ↓
子スレッド起動     指示待ち(Spinlock) 
   ↓          ↓
子スレッド指示(1)  処理実行
   ↓          ↓
子スレッド終了待ち   処理終了指示
(Spinlock)       ↓
   ↓         指示待ち(Spinlock)
(1)へ以下ループ     
   ↓
子スレッド破棄
   ↓
  終了 

こんな感じかな。かなり端折ってしまったけど、ソース的には下記のよう。スレッドを待つ方法としては、interface変数をmutexでロックして、メインスレッドはinterfaceが0になるまでスピンロックで待つ。子スレッドは各処理が終了したら、interfaceをデクリメントし、interfaceが0になったらメインループはinterfaceを子スレッド数を書き込む。というやり方で同期を取っている。
さて実行結果は

time of elaps:77053.168000[msec]

あれ、メチャクチャ遅くなっている。まあ当たり前といったら当たり前で、全てのスレッドがスピンロックで待っているので、全てのコアが100%フル回転している。Quadコアでは5スレッドが走っている状態。しかも排他処理で競合しているので、こればっかりは仕方がない。各スレッドの同期について、スピンロックで待たない方法か、スピンロックでもCPUの利用率を下げる方法を考えなくてはならない。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

#include "Utils.h"

#define THREAD_NUM (4)
#define MAX_THREAD_NUM (4)

#ifdef LINUX
#include <sched.h>
#include <unistd.h>
pthread_mutex_t common_mutex;
pthread_mutex_t local_mutex[THREAD_NUM];
#endif


volatile int interface = THREAD_NUM;
enum {WAIT, GO, EXIT};
volatile int local_command[THREAD_NUM];

float *FDATA;
#define DATA_SIZE (100000000)

typedef struct _arg_t{
int id;
int thread_num;
} arg_t;

float ***FDATA_A;
float ***FDATA_B;
float ***FDATA_C;

#define NX 100
#define NY 300
#define NZ 300

arg_t thread_arg[THREAD_NUM];

void thread_func(void *arg);

int main(){
unsigned long long i,j,k;
// timer variables
unsigned int st,elaps;
unsigned long long iter;

pthread_t handle[THREAD_NUM];
pthread_mutex_init(&common_mutex, NULL);

FDATA_A = (float ***)malloc(NX * sizeof(float **));
FDATA_B = (float ***)malloc(NX * sizeof(float **));
FDATA_C = (float ***)malloc(NX * sizeof(float **));
FDATA_A[0] = (float **)malloc(NX * NY * sizeof(float *));
FDATA_B[0] = (float **)malloc(NX * NY * sizeof(float *));
FDATA_C[0] = (float **)malloc(NX * NY * sizeof(float *));

for( i = 0; i < NX; i++){
FDATA_A[i] = FDATA_A[0] + i * NY;
FDATA_B[i] = FDATA_B[0] + i * NY;
FDATA_C[i] = FDATA_C[0] + i * NY;
}

FDATA_A[0][0] = (float *)malloc(NX * NY * NZ * sizeof(float));
FDATA_B[0][0] = (float *)malloc(NX * NY * NZ * sizeof(float));
FDATA_C[0][0] = (float *)malloc(NX * NY * NZ * sizeof(float));
memset(FDATA_A[0][0],0,(NX * NY * NZ * sizeof(float)));
memset(FDATA_B[0][0],0,(NX * NY * NZ * sizeof(float)));
memset(FDATA_C[0][0],0,(NX * NY * NZ * sizeof(float)));

for( i = 0; i < NX; i++){
for( j = 0; j < NY; j++){
FDATA_A[i][j] = FDATA_A[0][0] + (i * NY + j ) * NZ;
FDATA_B[i][j] = FDATA_B[0][0] + (i * NY + j ) * NZ;
FDATA_C[i][j] = FDATA_C[0][0] + (i * NY + j ) * NZ;
}
}

for(i=0;i<THREAD_NUM;i++){
pthread_mutex_init(&local_mutex[i], NULL);
local_command[i] = WAIT;
}

for(i=0;i<THREAD_NUM;i++){
thread_arg[i].id = i;
thread_arg[i].thread_num = THREAD_NUM;
pthread_create(&handle[i], NULL, (void *)thread_func, (void *)&thread_arg[i]);
}
start_timer(&st);

// Main routine -- begin
for( iter = 0; iter < 1000; iter++){

for(i=0;i<THREAD_NUM;i++){
pthread_mutex_lock(&local_mutex[i]);
local_command[i] = GO;
pthread_mutex_unlock(&local_mutex[i]);
}
// reset interface 
while(interface != 0){
}
pthread_mutex_lock(&common_mutex);
interface = THREAD_NUM;
pthread_mutex_unlock(&common_mutex);
}
printf("finish main routine\n");
// Main routine -- End

// sent EIXT command to all threads
for(i=0;i<THREAD_NUM;i++){
pthread_mutex_lock(&local_mutex[i]);
local_command[i] = EXIT;
pthread_mutex_unlock(&local_mutex[i]);
}

// wait and finish  for all thread
for(i=0;i<THREAD_NUM;i++)
pthread_join(handle[i], NULL);
elaps = stop_timer(&st);
print_timer(elaps);

pthread_mutex_destroy(&common_mutex);
for(i=0;i<THREAD_NUM;i++){
pthread_mutex_destroy(&local_mutex[i]);
}

float sum=0.0;
for(i = 0; i < NX; i++){
for(j = 0; j < NY; j++){
for(k = 0; k < NZ; k++){
sum += FDATA_C[i][j][k];
}
}
}

printf("Sum:%lf\n",sum);

free(FDATA_A[0][0]);
free(FDATA_B[0][0]);
free(FDATA_C[0][0]);
free(FDATA_A[0]);
free(FDATA_B[0]);
free(FDATA_C[0]);
free(FDATA_A);
free(FDATA_B);
free(FDATA_C);

return 0;
}

void thread_func(void *arg){

int id = ((arg_t *)arg)->id;
int num = ((arg_t *)arg)->thread_num;
unsigned long long i,j,k;
volatile int local;
#ifdef LINUX
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(id, &mask);
if(sched_setaffinity(0, sizeof(mask), &mask) == -1)
printf("WARING: Faild to set CPU affinity ...(cpuid=%d)\n",id);
#endif
printf("Thread %d started on CPU %d.\n",id, sched_getcpu());
loop:
pthread_mutex_lock(&local_mutex[id]);
local = local_command[id];
pthread_mutex_unlock(&local_mutex[id]);
if(local == WAIT){
}
// Running main routine.
if(local == GO){
for(i = id; i < NX; i+=num){
for(j = 0; j < NY; j++){
for(k = 0; k < NZ; k++){
FDATA_C[i][j][k] = FDATA_A[i][j][k] 
+ FDATA_B[i][j][k] - j;
FDATA_A[i][j][k] = FDATA_B[i][j][k] 
+ FDATA_C[i][j][k] - i;
FDATA_B[i][j][k] = FDATA_C[i][j][k] 
- FDATA_A[i][j][k] + k;
}
}
}

pthread_mutex_lock(&local_mutex[id]);
local_command[id] = WAIT;
pthread_mutex_unlock(&local_mutex[id]);
pthread_mutex_lock(&common_mutex);
interface--;
pthread_mutex_unlock(&common_mutex);

goto loop;
} // end of locall == GO

if(local == EXIT) goto end;

goto loop;

end:
pthread_mutex_lock(&common_mutex);
interface--;
printf("Thread %d finished. interface:%d\n",id, interface);
pthread_mutex_unlock(&common_mutex);
}


nice!(0)  コメント(0)  トラックバック(0) 

nice! 0

コメント 0

コメントを書く

お名前:
URL:
コメント:
画像認証:
下の画像に表示されている文字を入力してください。

トラックバック 0

トラックバックの受付は締め切りました

この広告は前回の更新から一定期間経過したブログに表示されています。更新すると自動で解除されます。