본문 바로가기
카테고리 없음

공유 메모리를 이용한 프로세스간 데이터 공유 | pid_t, key_t

by 상레알 2010. 12. 27.
출처 : http://www.linuxquestions.org/questions/programming-9/pid_t-key_t-265740/
출처 : http://www.joinc.co.kr/modules/moniwiki/wiki.php/Site/system_programing/IPC/message_driven

/usr/include/bin/types.h

typedef int __pid_t; /* Type of process identifications. */

typedef int __key_t;

공유메모리 큐 구성을 통한 효과적인 데이터 처리 프로세스 구현

동일한 데이터를 여러번 복사해서 사용해야 하는 여러개의 단위 모듈로 이루어진 미들웨어 소프트웨어에서 가장 효율적인
데이터 교환에 대한 방법으로 공유메모리의 큐 구성에 대해 알아보도로한다.

1. 공유메모리는 (큐)버퍼의 역할이 가능해야 하므로 배열로 구성된다.
2. 데이터를 동기화 시킬 수 있어야 한다. 
3. 데이터 동기화를 위해서 각 단위 모듈(소비자)은 데이터 입력 부분과 처리부분이 분리되어야 한다.
  분리를 위해서 버퍼가 사용되어야 한다.

공유메모리르 통한 queue 버퍼의 구성이라고 생각하면 될것 같다. 다음과 같은(소프트웨어)시스템
구성을 가질 것이다.

 
이때 소비자는 각각의 버퍼를 유지 할 수 있어야 한다.



공유 메모리를 이용한 프로세스간 데이터 공유

미들웨어와 같은 규모가 있는 소프트웨어 만들어야 할경우 여러개의 다위 프로세스로 이루어지는 경우
가 많다. 기능별로 모듈화 하는 방식인대, 예를 들어 통신 모듈, 데이터 분석 모듈, 데이터 저장 모듈
, 시스템 감시 모듈.. 등으로 나누어 지는 경우다.

이렇게 하는 이유는 각 모듈별로 개발자를 할당할 수 있으며, 디버깅 및 유지보수가 수월하기 때문이다.
필요한 일만을 하기 때문에 문제가 발생할 여지도 적고, 문제가 생기더라도 전체 시스템이 멈추는 최악
의 상활을 피할 수 있기 때문이다.


데이터 공유시 발생 가능한 문제

이렇게 단위 모듈별로 소프트웨어 시스템이 구성될 경우 대략 다음과 같은 구성을 가지게 될것이다.


 
각 모듈간의 데이터 교환을 위해서 결국 IPC(interprocess communication, 프로세스간 통신)를 사용하게 될건데
선택하는 IPC에 따라서 입출력 관련 부하는 그렇다 치더라도, 동일한 데이터를 몇번이고 복사하는
문제가 발생한다.

문제해결을 위한 다양한 제안

공유 메모리 + 세마포어

가장 심플한 제안 같지만 몇가지 문제가 보인다. 단지 하나의 생산자와 하나의 소비자로 이루어진
경우에는 문제가 없지만 소비자가 여럿일 경우 세마포어 값을 어떻게 제어할 것이냐가 중요한 문제가 된다.

공유 메모리 + 세마포어의 핵심이라면 생산자가 데이터를 쌓을 때마다 세마포어 값을 1씩 증가 시키고,
소비자는 세마포어 값을 1씩 감소시키고 만약 0일 경우에는 생산자가 쓴 데이터가 없다는 가정하에 해당 영역
에서 기다리게 되는것이다. 그런데 소비자가 여럿일 ㅕㅇ우 과연 어느 시점에서 세마포어 값을 감소시켜야 하는지가
애매모호해 진다.

물론 세마포어설비를 소비자 갯수만큼 만들어서 제어하도록하는 등의 몇가지 방법이 있지만 복잡한 관계로 생각하지
않기로 했다. 가능하면 효율적으로 심플하게 작동하는 방법을 찾길 원하기 떄문이다.

공유 메모리 + 생산자 통지

공유메모리의 가장 앞부분에 현재 생산자가 어디에 쓰고 있는지를 남기는 방법이다. 그럼 소비자가 자기가 읽고 있는
데이터가 생산자의 쓰고 있는 부분과 일치하는 지를 알 수 있고 만약 일치한다면 기다리도록 하는 것이다.

심플한 방법이긴 한데, busy wait 상태에 놓이지 않도록 하는게 문제해결의 핵심일것 같다. 소비자가 생산자의
데이터 쓰는 시점을 기다리기 위해서 바쁘게 index값이 변하는지 검사하는것은 아무래도 좋은
방법이 아니기 때문이다. 이를 위해 다음의 해법을 생각해 보았다.

    - 소비자가 생산자으이 index와 일치하게 되어서 기다려야 할 경우 생산자로의 통지를 기다리도록 한다.
      생산자는 데이터를 쓰게 될경우 이를 통지해야 한다. 이럴 경우 통지를 위한 방법이 중요해진다.
      간단히 생각할 수 있는 방법은 signal을 이용하는 방법으로 소비자는 sigsuspend로 기다리고 생산자는
      kill등을 이용해서 시그널을 보내도록 하는 방법이다.

공유 메모리 + 파일 레코드 잠금

공유메모리가 담을 수 있는 원소의 크기만큼의 크기의 잠금 전용 임시 파일을 만든다. 생산자는 데이터의 인텍스
위치에 대응하는 파일의 레코드를 잠근다. 소비자는 데이터를 읽기전에 레코드의 잠금을 먼저 얻도록 코딩
한다. 그러면 소비자는 바쁜 대기상태에 놓이지 않으면서도 데이터의 존재 유무를 판단해서 데이터를 읽을 수 있다.

이 방법을 응용하는데 있어서 주의해야 할 점은 생산자가 다음 레코드 잠금을 얻기전에 소비자가 데이터를 읽어 버리는 경우다. 이 문제를 해결하기 위해서 생산자는 반드시 다음 레코드 잠금을 얻은 후 이전 레코드 잠금을 풀도록코딩해야한다. 

공유 메모리 + 파일 레코드 잠금 구현

이번에는 공유 메모리와 파일 레코드 잠금을 통해서 구현 하기로 했다.

아이디어는 간단하다. 생산자는 배열을 가지는 공유메모리와 공유메모리의 배열의 크기와 동일한 
전용 파일을 생성한다.생산자는 (공유메모리)배열에 데이터를 적으면 해당 배열의 첩자를 기준으로 
해서 파일의 레코드를 잠그게 된다. 10번째 배열에 썻다면 파일의 10번째 레코드를 잠그는 식이다.

잠근 레코드를 여는 시점은 생산자가 다음 레코드의 잠금을 얻었을 때가 된다. 이런식으로 어렵지 않게 
생산자와 소비자간 동기화를 이룰 수 있게 된다.
소비자는 제일 처음 시작되면 잠금 파일의 몇번째 레코드가 잠겨 있는지 확인할 수 있으므로 잠긴
레코드 영역을 검사하면 가장 최근 생산자가 데이터를 쓴 공유메모리의 위치를 얻을 수 있을 것이다.
소비자는 이 위치부터 데이터를 읽어 나가면 된다.

 

테스트 코드

아래의 코드는 아이디어의 적응 가능 여부를 확인하기 위한 테스트용 코드다. 공유메모리 관련함수들 shmget(2), shmat(2), shmdt(2)와 레코드 잠금과 관련해서 fcntl(2)이 사용된 외에 특별한 다른 시스템콜은 사용하지 않았다.

생산자


#include <sys/ipc.h>  
#include <sys/shm.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#define QUEUE_SIZE 10
// 생산자와 소비자간 공유할 데이터
struct data { char name[80]; };
struct flock lock, unlock;



int
lock_open(int fd, int index)
{
lock.l_start = index;
lock.l_type = F_WRLCK;
lock.l_len = 1;
lock.l_whence = SEEK_SET;
return
fcntl(fd, F_SETLKW, &lock);
}



int
lock_close(int fd, int index)
{
unlock.l_start = index;
unlock.l_type = F_UNLCK;
unlock.l_len = 1;
unlock.l_whence = SEEK_SET;
return
fcntl(fd, F_SETLK, &unlock);
}
void
lock_init()
{
lock.l_start = 0;
lock.l_type = F_WRLCK;
lock.l_len = 1;
lock.l_whence = SEEK_SET;
}
void
unlock_init()
{
unlock.l_start = 0;
unlock.l_type = F_UNLCK;
unlock.l_len = 1;
unlock.l_whence = SEEK_SET;
}
int
main()
{
int shmid;
int i = 0;
int offset = 0;
struct data *cal_num;
void *shared_memory;
struct data ldata;
int fd;
lock_init();
unlock_init(); // 잠금 파일을 생성한다.
if ((fd = open("shm_lock", O_CREAT|O_RDWR)) < 0)
{
perror("file open error ");
exit(0);
} // 파일을 공유메모리 큐의 크기만큼 만든다.
write(fd, (void *)'\0',
sizeof(char)*QUEUE_SIZE);// 공유메모리를 생성한다.
// 공유메모리의 크기는 QUEUE_SIZE * 원소의 크기가 된다.
shmid =
shmget((key_t)1234, sizeof(ldata)*QUEUE_SIZE, 0666|IPC_CREAT);
if (shmid == -1)
{
perror("shmget failed : ");
exit(0);
} // 공유할 메모리의 크기를 할당하고 이를 공유 메모리영역에 붙인다.
shared_memory = (void *)malloc(sizeof(ldata)*QUEUE_SIZE);
shared_memory = shmat(shmid, (void *)0, 0);
if (shared_memory == (void *)-1)
{
perror("shmat failed : ");

exit(0);
}
while(1)
{ // 공유할 데이터
sprintf(ldata.name, "write Data : %d\n",i); // 이건 디버깅용 출력물
printf("%d %s",(i==0)? QUEUE_SIZE - 1:i-1, ldata.name); // 레코드를 잠근다.
if(lock_open(fd, i)< 0)
{
perror("lock error");
} // 레코드 잠금을 얻었다면 // 이전 레코드의 잠금을 푼다.
if(lock_close(fd, (i==0)? QUEUE_SIZE - 1: i-1) < 0)
{
perror("flock error");
} // 공유메모리에 데이터를 쓴다.
memcpy((void *)shared_memory+offset, (void *)&ldata, sizeof(ldata));
sleep(1);
offset +=
sizeof(ldata);
i++; // 이건 순환 큐이다. 만약 큐의 크기를 모두 채웠다면
// offset과 인덱스 번호 i를 초기화 한다.
if (i == QUEUE_SIZE) {
i = 0; offset = 0;
}
}
}



소비자
 
#include <sys/ipc.h>
#include <sys/shm.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#define QUEUE_SIZE 10





struct data { char name[80]; };
struct flock lock, unlock;
int
lock_open(int fd, int index)
{
lock.l_start = index;
lock.l_type = F_WRLCK;
lock.l_len = 1;
lock.l_whence = SEEK_SET;
return
fcntl(fd, F_SETLKW, &lock);
}
int lock_isopen(int fd, int index)
{
lock.l_start = index;
lock.l_type = F_WRLCK;
lock.l_len = 1;
lock.l_whence = SEEK_SET;
return
fcntl(fd, F_SETLK, &lock);
}
int
lock_close(int fd, int index)
{
unlock.l_start = index;
unlock.l_type = F_UNLCK;
unlock.l_len = 1;
unlock.l_whence = SEEK_SET;
return
fcntl(fd, F_SETLK, &unlock);
}
void
lock_init()
{
lock.l_start = 0;
lock.l_type = F_WRLCK;
lock.l_len = 1;
lock.l_whence = SEEK_SET;
}
void
unlock_init()
{
unlock.l_start = 0;
unlock.l_type = F_UNLCK;
unlock.l_len = 1;
unlock.l_whence = SEEK_SET;
}
int
main()
{
int shmid;
int i = 0;
int offset = 0;
int fd;
void *shared_memory;
struct data *ldata;
lock_init();
unlock_init();
if ((fd =
open("shm_lock", O_RDWR)) < 0)
{
perror("file open error ");
exit(0);
}
shmid =
shmget((key_t)1234, sizeof(struct data)*QUEUE_SIZE, 0666);
if (shmid == -1)
{
perror("shmget failed : ");
exit(0);
}
shared_memory = (void *)malloc(sizeof(ldata)*QUEUE_SIZE);
shared_memory = shmat(shmid, (void *)0, 0);
if (shared_memory == (void *)-1)
{
perror("shmat failed : ");
exit(0);
}
// 이 부분은 생산자가 가장 최근에 쓴 데이터의 인덱스를
// 찾아내기 위한 코드다.
// 잠금 파일의 레코드를 차례대로 검사하면서 잠금이 있는 부분을 검사한다.
// 잠금이 검사되면, 리턴한다.
while(1)
{
if(lock_isopen(fd, i)< 0)
{
if (errno == EAGAIN)
{
printf("Read index is %d %d %d\n", i, EAGAIN, errno);
fcntl(fd, F_GETLK, &lock); // 코드 1
offset =
sizeof(struct data)*i;
break;
}
else
{
printf("Init Error\n");
exit(0);
}
}
lock_close(fd, i);
i++;
if (i == QUEUE_SIZE)
{
printf("Server Error\n");
} } // 공유 메모리로 부터 데이터를 읽는다.
while(1) {
if (
lock_open(fd, i) < 0) { perror("flock error"); }
ldata = (struct data *)(shared_memory+offset); printf("%s",ldata->name);
lock_close(fd, i);
offset +=
sizeof(struct data);
i++; if (i == QUEUE_SIZE) { i = 0;offset = 0; } } }


문제점 및 해결 방안
여러가지 소비자 참자
여러개의 소비자가 참가할 경우 각 소비자 마다 파일의 잠금을 얻게 되므로 늦게 참가하는 소비자는 앞의
소비자의 잠금 때문에 최신의 데이터를 읽지 못하는 문제가 발생할 수 있다 .이 문제의 해결은 소비자
예제 소스 코드1을 통해서 해결한다. GETLK는 파일 잠금에 대한 어떠한 작동도 하지 않으면서 현재
얻은 파일 잠금을 해체한 프로세스의 PID를 가져올 수 있다.

우리는 해당 PID가 생산자의 PID인지 판다해서 적절한 행동을 취하면 된다. 만약 다른 소비자의 잠금이라면
fcntl을 SETLK로 호출하고, 생상자의 잠금이라면 SETLKW로 호출하면 될것이다.

생산자의 비정상 종료
생산자가 비정상 종료하는 일이 없도록 만들어 줘야 하겠지만, 프로그래머의 입장에서는 만약에 있을 지도
모르는 비정상 종료를 염두에 두어야 한다. 생산자가 비저앙 종료를 하게 되면, 해당 레코드에 대한 잠금이
풀려버리고, 소비자들은 잠금이 없는 상태에서 폭주(무한 순환)하게 될것이다. 이 문제의 가장간단한 해결 방법은 배열의 우너소 구조체의 가장 앞에 flag를 둬서 이 값을 체크하도록 하는 것이다. 예를 들어 생산자 99번째 배열에 데이터를 써다면 flag를 1로하고, 그 다음 데이터의 flag를 0
으로 하는 것이다. 그렇다면 만약의 경우 생산자가 죽어서 잠금이 풀려버리더라도 소비자는 flag값을 통해서 생산자가 지정상적으로 종료 했음을 인지할 수 있게 된다. flag는 int형으로 하면 될 것이다.

공유 메모리큐 라이브러리 제작
- 지금까지의 내용을 기본으로 일반적으로 사용가능한 공유 메모리 큐 라이브러리를 작성한다.
요구사항

- 일반적으로 사용가능
- 공유 메모리큐이 관리를 위한 구조체 정의(버전, 생산자 정보, 메모리 큐의 크기등을 명시)
- 모듈에서 사용가능한 큐 버퍼 사용
- 데이터의 읽는 부분과 처리 부분을 분리한다. 이를 위해서 버퍼를 구성할 수 있어야 한다.
- 효율적인 작동
- 가능하면 데이터 복사는 최소한으로 이루어지도로 제한해야 한다. 예를 들어서 모듈 내부에서도 데이터를 읽는 부분과 처리부분의 분리를 위해서 버퍼를 사용한다고 했는데, 가능하면 포인터
복사만이 이루어 지게 하는 등 데이터 복사를 최고화 시킬 필요하가 있다.

일반적인 구조


다음은 여기에서 만들 공유 메모리 큐 라이브러리와 각 모듈간의 구조를 나타낸 그림이다.


소비자(단위 모듈)은 공유 메모리 큐에서 데이터를 읽어 들이는 부분과 이를 처리하는 부분을 분리 시키도록 하며 분리를 위해서 중간에 버퍼를 둔다. 버퍼를 두어서 분리시키는 이유는 특별히 짧은 시간에 다량의 처리해야할 데이터가 들어오더라도 이를 버퍼에 담아 둠으로써 안정적으로 데이터를 처리할 수
있도록 하기 위함이다.

모듈 내부 버퍼 관리 클래스 : TQueue
일반구조를 보면 모듈 내부에서 입력부와 처리부를 분리하기 위해서 버퍼를 사용하고 있음을 알 수 있다.
이는 멀티 쓰레드 혹은 멀티 프로세스 구조로 모듈이 작성되어야 함을 의미한다. 멀터 프로세스 구조로 갈
경우 고려해야할 복잡한 문제가 있기 때문에 - IPC를 사용할 경우 데이터를 모두 복사해서 보내야하나느데, 이는 최초 데이터 복사를 제거하기 위해서 공유메모리를 사용한 의미를 무색케만든다ㅏ. 자식과 부 모간 공유메모리를 구성하는 방법도 있겠으나 너무 복잡하다. - 멀티 쓰레드 환경으로 구성하기로 했다.

버퍼관리 클래스는 쓰레드 전역적인 위치에 존재하며, Input/Output을 ㅟ한 두개의 메서드를 제공한다.
큐는 템플릿을 이용해서 구현하기로 한다. Input/Output 동기화를 위해서 내부적으로 mutex와 조건 변수 를 사용하게 된다.

template <typename T>
class TQueue { private : T *container;
unsigned int size;
public: }



공유메모리 관리 클래스 : ShmQue

라이브러리는 C++을 이용해서 작성하기로 했다. 아무래도 메서드와 데이터를 함께 관리할 수 있는게 큰 장점이고, 모듈 내부에서 관리되는 버퍼를 위해서도 STL의 컨테이너를 이용할수 있는등의 장점을 가지기 때문이다.