#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <sys/ioctl.h> #include <string.h> #include <sys/types.h> #include <sys/socket.h> #include <sys/un.h> #include <netinet/in.h> #include <arpa/inet.h> #include <netdb.h> #include <signal.h> #include <errno.h> /* values for WORKER_INFO.status */ /* "worker" -> "manager" message */ #define WORKER_BUSY 0x01 #define WORKER_IDLE 0x02 /* "manager" -> "worker" message */ #define WORKER_CONT 0x03 #define WORKER_STOP 0x04 typedef struct worker_info { int pid; int status; } WORKER_INFO; int worker(int ls, int is) { int dsock; struct sockaddr_in client; unsigned int addrlen = sizeof(struct sockaddr_in); char * hello = "Hello, world! I'm dynamic worker pool server.\n"; char * bye = "Goodbye, world!\n"; int r; struct sockaddr_un from; socklen_t len = sizeof(struct sockaddr_un); WORKER_INFO msg; signal(SIGPIPE, SIG_IGN); msg.pid = getpid(); for (;;) { dsock = accept(ls, (struct sockaddr *)&client, &addrlen); if ( -1 == dsock) { perror("accept()\n"); continue; }; msg.status = WORKER_BUSY; r = sendto(is, &msg, sizeof(WORKER_INFO), 0, NULL, 0); if (-1 == r) { perror("sendto()"); }; // printf("got client\n"); /* serve client */ r = write(dsock, hello, strlen(hello)); if ( (-1 == r) || (!r)) { printf("write(dsock) #1\n"); goto error; }; r = sleep(10); /* kinda working ... */ if (r) { printf("interrupted sleep()\n"); }; r = write(dsock, bye, strlen(bye)); if ( (-1 == r) || (!r)) { printf("write(dsock) #2\n"); }; error: close(dsock); msg.status = WORKER_IDLE; r = sendto(is, &msg, sizeof(WORKER_INFO), 0, NULL, 0); if ( -1 == r ) { perror("sendto(is)"); }; /* check if we are going to be fired or not */ r = recvfrom(is, &msg, sizeof(WORKER_INFO), 0, (struct sockaddr*)&from, &len); if (msg.status == WORKER_STOP) { /* wait until manager kill us */ while(1) sleep(1); } }; } int lsock, isocks[2]; pid_t hire_worker() { pid_t pid; pid = fork(); switch (pid) { case -1: ;;; perror("fork()"); ;;; break; /* child process */ case 0: ;;; worker(lsock, isocks[1]); ;;; break; /* parent */ default: ;;; printf("worker (pid = %d) hired\n", pid); ;;; break; }; return pid; } void fire_worker(pid_t pid) { kill(pid, SIGTERM); printf("worker (pid = %d) fired\n", pid); } #define MIN_WORKERS 5 #define MAX_WORKERS 128 #define TCP_PORT 12345 struct sockaddr_in serv_addr; int total_workers = MIN_WORKERS; int idle_workers = MIN_WORKERS; int busy_workers = 0; int main(int argc, char *argv[]) { int r, p, pid; int yes = 1; /* create listening socket */ lsock = socket(PF_INET, SOCK_STREAM, 0); if ( -1 == lsock ) { perror("socket()"); exit(1); }; r = setsockopt(lsock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); if ( r == -1 ) { perror("setsockopt()"); exit(1); }; memset(&serv_addr, 0, sizeof(serv_addr)); serv_addr.sin_family = AF_INET; serv_addr.sin_addr.s_addr = INADDR_ANY; serv_addr.sin_port = htons(TCP_PORT); r = bind(lsock, (struct sockaddr*)&serv_addr, sizeof(serv_addr)); if ( -1 == r ) { perror("bind()"); exit(1); }; r = listen(lsock, MAX_WORKERS); if ( -1 == r ) { perror("listen()"); exit(1); }; /* create UNIX socket pair for internal use */ r = socketpair(PF_LOCAL, SOCK_DGRAM, 0, isocks); if ( -1 == r ) { perror("socketpair()"); exit(1); }; /* prefork some workers */ for (p = 0; p < MIN_WORKERS; p++) { pid = hire_worker(); }; /* now perform worker pool management */ signal(SIGCHLD, SIG_IGN); for (;;) { struct sockaddr_un from; socklen_t len = sizeof(struct sockaddr_un); WORKER_INFO msg; sleep(1); /* check if we have enough workers */ if ( (total_workers == busy_workers) && (total_workers < MAX_WORKERS) ) { /* no more idle workers, hire one... */ pid = hire_worker(); if (pid > 0) { total_workers++; idle_workers++; }; }; while (1) { memset(&msg, 0, sizeof(WORKER_INFO)); r = recvfrom(isocks[0], &msg, sizeof(WORKER_INFO), MSG_DONTWAIT, (struct sockaddr*)&from, &len); if ( -1 == r ) { if (errno != EAGAIN) perror("recvfrom()"); break; }; // printf("message from worker %d - status is %d\n", msg.pid, msg.status); if (msg.status == WORKER_BUSY) { busy_workers++; idle_workers--; }; if (msg.status == WORKER_IDLE) { idle_workers++; busy_workers--; /* check if we have too much workers */ if ( (total_workers > MIN_WORKERS) && (idle_workers > busy_workers) ) { msg.status = WORKER_STOP; r = sendto(isocks[0], &msg, sizeof(WORKER_INFO), 0, NULL, 0); fire_worker(msg.pid); total_workers--; idle_workers--; } else { msg.status = WORKER_CONT; r = sendto(isocks[0], &msg, sizeof(WORKER_INFO), 0, NULL, 0); }; }; }; printf("workers: (total/busy/idle) = %d/%d/%d\n", total_workers, busy_workers, idle_workers); }; }