Implementing multithreaded epoll

Posted on December 13, 2015
Tags:
by Sanchayan Maity

Finally I have some time to relax a bit and write about a few things now that my fall semester course ended. I had taken a Distributed Systems class where we were taught the basics of distributed systems, had some assignments, with the final assignment being evaluation of some of the distributed key-value stores. The most interesting thing I learned on my own was how to use epoll in a multithreaded design. Was actually trying to use the libevent library during my second assignment, however, due to my lack of understanding I could not get it to work the way I wanted. During my third assignment I tried using epoll directly, but, I wanted to do a multithreaded design. Searching on the internet did not get me any examples, only notes on how one might implement it. So I tried and finally got it working. This will probably be the first write up on the net to be talking about this. However I am no expert in such things, so if anyone has any suggestions or improvements to what I did, please do share in the comments or mail me directly.

One can clone the concerned repository with

$ git clone https://gitlab.com/SanchayanMaity/Multithreaded_Epoll.git

The first requirement is a workqueue implementation. During my work with libevent and looking for usable examples, I came across one which was being used with libevent by the code’s author Ron Cemer [1]. The second requirement was to have a good usable epoll example I could use which I found here [2].

[1]. http://roncemer.com/software-development/multi-threaded-libevent-server-example/
[2]. https://banu.com/blog/2/how-to-use-epoll-a-complete-example-in-c/

With the above I had a workqueue implementation and a usable epoll example, on which I could base my work upon. So let us jump to the thread which uses epoll and acts as the main event loop queueing up work for the workqueues to process.

efd = epoll_create1(0);
if (efd == -1)
	err_sys("Error with epoll_create1");

event.data.fd = listenfd;
event.events = EPOLLIN;
ret = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &event);
if (ret == -1)
	err_sys("Error with epoll_ctl");

/* Buffer where events are returned */
events = calloc(MAXEVENTS, sizeof(event));

/* The event loop */
for ( ; ; )	{

	client_length = addr_length;

	n = epoll_wait(efd, events, MAXEVENTS, -1);
	for (i = 0; i < n; i++) {
	......
	}
}

The listen descriptor is created when the main loop starts. The listen descriptor is added to the list of descriptors epoll should wait on. After that we start our infinite loop, in which we block with epoll waiting for events on this listen descriptor. Once any event occurs on the listen descriptor, which in this case are the incoming connection requests, we iterate over the event list. There are three possibilites from here on. First is of course some error occured, so we check for errors, we also check if connections were closed with the EPOLLHUP and if it is not the event we registered for.

if ((events[i].events & EPOLLERR) ||
	(events[i].events & EPOLLHUP) ||
	(!(events[i].events & EPOLLIN))) {
	/*
	* An error has occured on this fd or the socket is not ready
	* for reading. Why were we notified then?
	*/
	printf("epoll error\n");
	close(events[i].data.fd);
	for (int j = 0; j < MAX_NO_OF_SERVERS; j++)
		if (events[i].data.fd == sconn[i].sockfd) {
			sconn[i].sockfd = -1;
			sconn[i].server_connected = false;
		}
	continue;
}

Second is an incoming connection request. In this case we accept the incoming connection, and then add the descriptor returned by accept to the descriptor list of events epoll should let us know about. This descriptor would be the one on which incoming and outgoing messages will be processed and send respectively.

if (listenfd == events[i].data.fd) {
	/*
	* We have a notification on the listening socket, which means
	* one or more incoming connections.
	*/
	while (1) {
		infd = accept(listenfd, client_address, &client_length);
		if (infd == -1) {
			if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
				/* We processed all incoming connections */
				break;
			}
			else {
				perror("Error with accept");
				break;
			}
		} else
			printf("Connection accepted\n");

		/*
		* Make the incoming socket non blocking and add it to
		* the lists of fds to monitor. In the future when the
		* read/write calls are made non blocking this will be
		* required.
		*/
		/*
		if (!make_socket_nonblocking(infd)) {
			perror("Could not make socket nonblocking");
			abort();
		}
		*/

		event.data.fd = infd;
		event.events = EPOLLIN | EPOLLERR | EPOLLHUP;
		ret = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);
		if (ret == -1) {
			perror("Error with epoll_ctl");
			abort();
		}
	}
	continue;
}

Third case is an incoming request on one of the connections that was accepted in second step. In this case, we dynamically allocate a job and add it to the workqueue along with the respective data required. However before adding the job to the workqueue, we make sure to remove the descriptor which we will process, from the list of event descriptors epoll will monitor. See the comment in code.

else {

	if ((job = malloc(sizeof(* job))) == NULL) {
		perror("Failed to allocate memory for job object");
		continue;
	}

	if ((job_wq_data = malloc(sizeof(* job_wq_data))) == NULL) {
		perror("Failed to allocate memory for wq data");
		free(job);
		continue;
	}

	job_wq_data->sockfd = events[i].data.fd;
	job->job_function = process_peer_request;
	job->user_data = job_wq_data;

	/*
	* In a multi threaded environment epoll is not suppose to monitor
	* descriptors on which other threads are working. Ideally use of
	* the EPOLLONESHOT flag should have disabled it for the next epoll_wait
	* till the worker thread reenables it, however for some reason it seems
	* not to work. So manually delete the fd from being monitored by epoll
	* and add it back in the process_peer_request function after it finishes
	* working with the said descriptor. In future use EPOLLONESHOT after
	* investigation and finding the fix.
	*/
	ret = epoll_ctl(efd, EPOLL_CTL_DEL, events[i].data.fd, &events[i]);
	if (ret == -1)
		perror("Error with epoll_ctl DEL");

	/* Add the job for processing by workqueue */
	workqueue_add_job(&workqueue, job);
}

The above code snippets are from the dht.c file which is the primary file. The workqueue implementation can be seen and understood by going through the workqueue.c and workqueue.h files. Note that the function called by the workqueue will basically do the job of processing the incoming client request as per one’s protocol requirements. Workqueue initialisation is simple enough as below

/* Initialize work queue */
if (workqueue_init(&workqueue, NUMBER_OF_WQS)) {
	workqueue_shutdown(&workqueue);
	perror("Failed to create workqueue");
	goto free_tokens;
}

At the end of the request processing function, the descriptor which was passed to the function for processing needs to be added back to the list of descriptors epoll should monitor.

event.data.fd = connfd;
event.events = EPOLLIN;
ret = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event);
if (ret == -1)
	perror("Error with epoll_ctl ADD");
free(job_wq_data);
free(job);

The server thread with the epoll event loop will only be one and depending on the number of cores one would initialise the number of workqueues. So since I had a four core system, I had three workqueues and one thread with epoll event loop. This kind of a system can scale well instead of naive approaches like one thread or one process per client connection. Further work to be done are to get the EPOLLONESHOT flag to work as I am sure I was definitely doing something wrong while trying to use it and second would be using non blocking sockets instead of blocking one, however that will be quite a bit of work. Non blocking sockets are not so easy to handle.