场景:前端用户使用epoll事件,接受用户请求。后端使用zeromq请求后端服务器。
思路:将epoll加入zeromq的poll里面,以监听epoll是否有事件。如果是epoll的事件,则调用epoll_wait 获取事件的fd列表并处理。否则按照zeromq的方式处理。
zeromq 的poll用法参考:http://api.zeromq.org/4-0:zmq-poll 以及例子里面的:mspoller.cpp 这里重点不在epoll中,所以实现有些粗糙,自行加入非阻塞等等。
#include "zmq.hpp"
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#define MAX_EPOLL 50
int mybind(int port)
{
int sockfd;
struct sockaddr_in my_addr;
struct sockaddr_in their_addr;
int sin_size;
if((sockfd = socket(AF_INET,SOCK_STREAM,0))==-1) {
perror("create socket");
exit(1);
}
//初始化结构体,并绑定端口
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(port);
my_addr.sin_addr.s_addr = INADDR_ANY;
bzero(&(my_addr.sin_zero),8);
setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,&my_addr,sizeof(my_addr));
if(bind(sockfd,(struct sockaddr *)&my_addr,sizeof(struct sockaddr))==-1){
perror("bind port");
exit(1);
}
if(listen(sockfd,10)==-1){
perror("listen port");
exit(1);
}
return sockfd;
}
void process_request(int epfd,int client,zmq::socket_t & request)
{
char buf[255]={0};
int len=recv(client,buf,sizeof(buf),0);
printf("recv data %d\n",client);
if(len<=0){
struct epoll_event ev;
printf("close %d\n",client);
ev.events=EPOLLIN;
ev.data.fd=client;
ev.data.ptr=NULL;
epoll_ctl(epfd,EPOLL_CTL_DEL,client,&ev);
close(client);
return;
}
printf("recv %s\n",buf);
char sfd[32];
sprintf(sfd,"%d",client);
zmq::message_t msg_fd(strlen(sfd));
memcpy(msg_fd.data(),sfd,strlen(sfd));
zmq::message_t msg_empty;
zmq::message_t msg_data(len);
memcpy(msg_data.data(),buf,len);
request.send(msg_fd,ZMQ_SNDMORE);
request.send(msg_empty,ZMQ_SNDMORE);
request.send(msg_data);
}
void process_listen(int epfd,int listen_fd)
{
struct sockaddr_in their_addr;
socklen_t sin_size;
sin_size = sizeof(struct sockaddr_in);
int client=accept(listen_fd,(struct sockaddr *)&their_addr,&sin_size);
printf("accept %d\n",client);
if(client<0){
perror("accept socket");
exit(1);
}
struct epoll_event ev;
ev.events=EPOLLIN;
ev.data.fd=client;
epoll_ctl(epfd,EPOLL_CTL_ADD,client,&ev);
}
void process_epoll_in(int epfd,int listen_fd,zmq::socket_t & request)
{
struct epoll_event evs[MAX_EPOLL]={0};
int nfds=epoll_wait(epfd,&evs[0],MAX_EPOLL,-1);
printf("epoll wait %d\n",nfds);
for(int i=0; i<nfds; i++){
printf("epoll ev %d %d\n",i,evs[i].data.fd);
if(listen_fd==evs[i].data.fd){
process_listen(epfd,listen_fd);
}else{
process_request(epfd,evs[i].data.fd,request);
}
}
}
int main (int argc, char *argv[])
{
zmq::context_t context(1);
zmq::socket_t responder(context, ZMQ_REP);
responder.bind("tcp://*:5560");
zmq::socket_t request(context, ZMQ_DEALER );
request.connect("tcp://localhost:5560");
//Initialize epoll
//
struct epoll_event ev;
int epfd=epoll_create(MAX_EPOLL);
int listen_fd=mybind(5561);
printf("bind %d\n",listen_fd);
ev.events=EPOLLIN;
ev.data.fd=listen_fd;
epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&ev);
// Initialize poll set
zmq::pollitem_t items [] = {
{ responder, 0, ZMQ_POLLIN, 0 },
{ request, 0, ZMQ_POLLIN, 0 },
{ 0, epfd, ZMQ_POLLIN, 0 }
};
while(1)
{
/* Poll for events indefinitely */
int rc = zmq_poll (items, 3, -1);
if (items [0].revents & ZMQ_POLLIN) {
//worker process
printf("responder recv\n");
zmq::message_t message;
responder.recv(&message);
responder.send(message);
}
if (items[1].revents & ZMQ_POLLIN) {
//request recv msg;
printf("request recv\n");
zmq::message_t message[3];
int index;
int more=1;
for(index=0;more && index<3; index++){
request.recv(&message[index]);
more=message[index].more();
}
if(index>=3){
int fd=atoi((char*)message[0].data());
send(fd,message[2].data(),message[2].size(),0);
}
}
if(items[2].revents & ZMQ_POLLIN){
process_epoll_in(epfd,listen_fd,request);
}
}
}
```cpp