#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/ioctl.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <signal.h>
#include <errno.h>
/* values for WORKER_INFO.status */
/* "worker" -> "manager" message */
#define WORKER_BUSY 0x01
#define WORKER_IDLE 0x02
/* "manager" -> "worker" message */
#define WORKER_CONT 0x03
#define WORKER_STOP 0x04
typedef struct worker_info {
int pid;
int status;
} WORKER_INFO;
int worker(int ls, int is)
{
int dsock;
struct sockaddr_in client;
unsigned int addrlen = sizeof(struct sockaddr_in);
char * hello = "Hello, world! I'm dynamic worker pool server.\n";
char * bye = "Goodbye, world!\n";
int r;
struct sockaddr_un from;
socklen_t len = sizeof(struct sockaddr_un);
WORKER_INFO msg;
signal(SIGPIPE, SIG_IGN);
msg.pid = getpid();
for (;;) {
dsock = accept(ls, (struct sockaddr *)&client, &addrlen);
if ( -1 == dsock) {
perror("accept()\n");
continue;
};
msg.status = WORKER_BUSY;
r = sendto(is, &msg, sizeof(WORKER_INFO), 0, NULL, 0);
if (-1 == r) {
perror("sendto()");
};
// printf("got client\n");
/* serve client */
r = write(dsock, hello, strlen(hello));
if ( (-1 == r) || (!r)) {
printf("write(dsock) #1\n");
goto error;
};
r = sleep(10); /* kinda working ... */
if (r) {
printf("interrupted sleep()\n");
};
r = write(dsock, bye, strlen(bye));
if ( (-1 == r) || (!r)) {
printf("write(dsock) #2\n");
};
error:
close(dsock);
msg.status = WORKER_IDLE;
r = sendto(is, &msg, sizeof(WORKER_INFO), 0, NULL, 0);
if ( -1 == r ) {
perror("sendto(is)");
};
/* check if we are going to be fired or not */
r = recvfrom(is, &msg, sizeof(WORKER_INFO), 0, (struct sockaddr*)&from, &len);
if (msg.status == WORKER_STOP) {
/* wait until manager kill us */
while(1)
sleep(1);
}
};
}
int lsock, isocks[2];
pid_t hire_worker()
{
pid_t pid;
pid = fork();
switch (pid) {
case -1:
;;;
perror("fork()");
;;;
break;
/* child process */
case 0:
;;;
worker(lsock, isocks[1]);
;;;
break;
/* parent */
default:
;;;
printf("worker (pid = %d) hired\n", pid);
;;;
break;
};
return pid;
}
void fire_worker(pid_t pid)
{
kill(pid, SIGTERM);
printf("worker (pid = %d) fired\n", pid);
}
#define MIN_WORKERS 5
#define MAX_WORKERS 128
#define TCP_PORT 12345
struct sockaddr_in serv_addr;
int total_workers = MIN_WORKERS;
int idle_workers = MIN_WORKERS;
int busy_workers = 0;
int main(int argc, char *argv[])
{
int r, p, pid;
int yes = 1;
/* create listening socket */
lsock = socket(PF_INET, SOCK_STREAM, 0);
if ( -1 == lsock ) {
perror("socket()");
exit(1);
};
r = setsockopt(lsock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
if ( r == -1 ) {
perror("setsockopt()");
exit(1);
};
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(TCP_PORT);
r = bind(lsock, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
if ( -1 == r ) {
perror("bind()");
exit(1);
};
r = listen(lsock, MAX_WORKERS);
if ( -1 == r ) {
perror("listen()");
exit(1);
};
/* create UNIX socket pair for internal use */
r = socketpair(PF_LOCAL, SOCK_DGRAM, 0, isocks);
if ( -1 == r ) {
perror("socketpair()");
exit(1);
};
/* prefork some workers */
for (p = 0; p < MIN_WORKERS; p++) {
pid = hire_worker();
};
/* now perform worker pool management */
signal(SIGCHLD, SIG_IGN);
for (;;) {
struct sockaddr_un from;
socklen_t len = sizeof(struct sockaddr_un);
WORKER_INFO msg;
sleep(1);
/* check if we have enough workers */
if ( (total_workers == busy_workers) && (total_workers < MAX_WORKERS) ) {
/* no more idle workers, hire one... */
pid = hire_worker();
if (pid > 0) {
total_workers++;
idle_workers++;
};
};
while (1) {
memset(&msg, 0, sizeof(WORKER_INFO));
r = recvfrom(isocks[0], &msg, sizeof(WORKER_INFO), MSG_DONTWAIT, (struct sockaddr*)&from, &len);
if ( -1 == r ) {
if (errno != EAGAIN)
perror("recvfrom()");
break;
};
// printf("message from worker %d - status is %d\n", msg.pid, msg.status);
if (msg.status == WORKER_BUSY) {
busy_workers++;
idle_workers--;
};
if (msg.status == WORKER_IDLE) {
idle_workers++;
busy_workers--;
/* check if we have too much workers */
if ( (total_workers > MIN_WORKERS) && (idle_workers > busy_workers) ) {
msg.status = WORKER_STOP;
r = sendto(isocks[0], &msg, sizeof(WORKER_INFO), 0, NULL, 0);
fire_worker(msg.pid);
total_workers--;
idle_workers--;
} else {
msg.status = WORKER_CONT;
r = sendto(isocks[0], &msg, sizeof(WORKER_INFO), 0, NULL, 0);
};
};
};
printf("workers: (total/busy/idle) = %d/%d/%d\n", total_workers, busy_workers, idle_workers);
};
}