1、Libevent
1.1 简介
Libevent 是一个用C语言编写的、轻量级的开源高性能事件驱动网络库。
基本的socket编程是阻塞/同步的,每个操作除非已经完成或者出错才会返回,这样对于每一个请求,要使用一个线程或者单独的进程去处理,系统资源没法支撑大量的请求,于是各系统分别提出了基于异步/callback的系统调用,例如Linux的epoll,BSD的kqueue,Windows的IOCP。由于在内核层面做了支持,所以可以用O(1)的效率查找到active的fd。
libevent就是对这些高效IO的封装,提供统一的API,简化开发。
1.2 特点
libevent是一个用于开发可扩展性网络服务器的基于事件驱动(event-driven)模型的网络库。Libevent有几个显著的特点:
(1)事件驱动(event-driven),高性能;
(2)轻量级,专注于网络,不如 ACE 那么臃肿庞大;
(3)跨平台,支持 Windows、Linux、*BSD和 Mac Os;
(4)支持多种I/O多路复用技术,epoll、poll、dev/poll、select和kqueue等;
(5)支持I/O,定时器和信号等事件;
(6)采用Reactor模式;
libevent实现高并发的关键在于其基于事件驱动的异步模型。主要实现方式包括:
- 事件驱动: libevent使用事件循环机制,通过监听和响应事件来实现非阻塞IO操作。当有事件发生时,libevent会调用相应的回调函数进行处理,从而避免了传统同步阻塞IO模型中的线程阻塞和资源浪费。
- 多路复用: libevent利用操作系统提供的多路复用机制(如epoll、kqueue、select等),将多个IO操作事件集中管理,实现在单线程或少量线程上处理大量并发连接的能力。这样可以避免每个连接都需要一个线程的资源消耗,从而提高了系统的并发性能。
- 缓冲区管理: libevent的bufferevent模块提供了高效的缓冲区管理,可以在网络IO中缓冲数据,以减少系统调用次数和数据拷贝,从而提升性能。
- 非阻塞IO: libevent使用非阻塞IO操作,通过设置套接字为非阻塞模式,使得IO操作不会阻塞整个进程或线程,提高了处理并发连接的效率。
1.3 下载安装
这里下载的是libevent-2.1.11-stable.tar.gz
tar -zxvf libevent-2.1.11-stable.tar.gz
cd libevent-2.1.11-stable/
./configure --prefix=`pwd`/install #检查安装环境,指定安装路径,生成makefile
make
make install
2、事件驱动
2.1 简介
事件驱动(Event-Driven)是一种编程范式,系统中的各种操作是通过事件的发生来驱动的。事件驱动编程模型通过事件监听器(Listener)和事件处理器(Handler)来响应和处理各种事件。
事件驱动模型的核心是通过事件(Event)来触发操作。事件可以是用户的操作(如鼠标点击、键盘输入),也可以是系统内部的状态变化(如定时器事件、网络数据到达)。事件驱动编程通过事件循环(Event Loop)不断地等待和处理事件,从而实现系统的响应和交互。
2.2 组成部分
- 事件:事件是系统中的一种信号,表示某种操作或状态的发生。事件可以携带额外的信息,如鼠标位置、键盘按键值、网络数据等。
- 事件源(Event Source):事件源是事件的产生者,可以是用户输入设备、网络连接、定时器等。
- 事件监听器(Event Listener):事件监听器是一个回调函数或方法,用于监听和捕获特定类型的事件。当事件发生时,事件监听器会被触发并调用相应的事件处理器。
- 事件处理器(Event Handler):事件处理器是实际处理事件的逻辑,包含具体的业务操作。
- 事件循环(Event Loop):事件循环是事件驱动模型的核心,负责不断地等待和分发事件。事件循环通常是一个无限循环,通过检查事件队列或事件多路复用器来获取事件,并调用相应的事件处理器。
2.3 工作流程
- 事件注册:应用程序将事件和对应的事件监听器注册到事件源。
- 事件产生:当事件源产生事件时,事件被放入事件队列中等待处理
- 事件分发:事件循环不断地从事件队列中取出事件,将事件分发给相应的事件监听器。
- 事件处理:事件监听器调用事件处理器,执行相应的业务逻辑。
- 事件循环继续:事件处理完成后,事件循环继续等待和处理下一个事件。
3、Reactor模式
3.1 简介
Reactor模式是一种高效的事件驱动设计模式,广泛应用于高性能服务器和网络应用中。它通过将事件的检测和分发与实际的事件处理逻辑分离,提供了一种高效的多任务并发处理方法。
3.2 事件处理机制
普通函数调用的机制:
- 函数调用:调用一个函数,传递必要的参数。
- 执行函数:程序执行进入函数体,按顺序执行函数内的代码。
- 返回结果:函数执行完毕后返回结果,程序继续执行调用处之后的代码。
- 阻塞执行:在函数执行期间,调用线程会被阻塞,直到函数返回结果。
而事件驱动机制不同,程序不是主动的调用某个API完成处理,恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的事件发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”
3.3 组成部分
事件源(Event Source): Linux上是文件描述符,Windows上是Socket或者Handle,统一称为“句柄集”,程序在指定的句柄上注册关心的事件。
事件多路复用器(Event Demultiplexer):使用操作系统提供的多路复用机制(如select、poll、epoll、kqueue)来监听多个事件源。程序首先将其关心的句柄(事件源)及其事件注册到event demultiplexer上; 当有事件到达时,event demultiplexer会通知程序处理事件。
反应器 Reactor:事件管理的接口,内部使用event demultiplexer注册、注销事件,并运行事件循环,当有事件进入“就绪”状态时,调用注册事件的回调函数处理事件。(libevent中的event_base结构体)
事件处理器(Event Handler):提供了一组接口(回调函数),每个接口对应了一种类型的事件,供Reactor在相应的事件发生时调用,执行相应的事件处理。
3.4 工作流程
4、基本使用
4.1 事件处理流程
- 应用程序准备并初始化event,设置事件类型和回调函数
- 向libevent添加事件event。对于定时事件,libevent使用一个小根堆管理,key为超时时间;对于Signal和I/O事件,libevent将其放入到等待链表(wait list)中,这是一个双向链表
- 程序调用event_base_dispatch()系列函数进入无限循环,等待事件
5、源码分析
5.1 event_base
libevent默认情况下是单线程的(可以配置成多线程),每个线程有且只有一个event_base,对应一个struct event_base结构体(以及附于其上的事件管理器),用来托管给它的一系列event,可以和操作系统的进程管理类比,当一个事件发生后,event_base会在合适的时间(不一定是立即)去调用绑定在这个事件上的函数,直到这个函数执行完,再返回其他事件。
event_base结构体就是Reactor框架中的反应器Reactor。
结构体声明位于event-internal.h文件中。
//创建一个event_base
struct event_base *base = event_base_new();
assert(base != NULL);
event_base内部有一个循环,循环阻塞在epoll/kqueue等系统调用上,直到有事件发生,然后去处理这些事件。
启动event_base的循环,这样才能开始处理发生的事件。循环的启动使用event_base_dispatch,循环将一直持续,直到不再有需要关注的事件,或者是遇到event_loopbreak()/event_loopexit()函数。
//启动事件循环
event_base_dispatch(base);
5.2 事件event
event是整个libevent的核心。event就是Reactor框架中的事件处理程序组件。它提供了函数接口,供Reactor在事件发生时调用,以执行相应的事件处理。
5.2.1. 事件类型
libevent 支持以下几种主要的事件类型:
- I/O 事件:用于监视文件描述符(如套接字)的可读、可写状态。
- 定时事件:在指定的时间间隔之后触发。
- 信号事件:用于处理 UNIX 信号。
5.2.2 创建event
每个事件对应一个struct event,struct event使用event_new来创建和绑定,使用event_add来启用:
//创建并绑定一个event
struct event *event;
//参数:event_base, 监听的fd,事件类型及属性,绑定的回调函数,给回调函数的参数
event = event_new(base, listener, EV_READ|EV_PERSIST, callback_func, (void*)base);
//参数:event,超时时间(struct timeval *类型的,NULL表示无超时设置)
event_add(event, NULL);
libevent支持的事件及属性包括(使用bitfield实现,所以要用 | 来让它们合体)
(1) EV_TIMEOUT: 超时
(2) EV_READ: 只要网络缓冲中还有数据,回调函数就会被触发
(3) EV_WRITE: 只要塞给网络缓冲的数据被写完,回调函数就会被触发
(4) EV_SIGNAL: POSIX信号量
(5) EV_PERSIST: 不指定这个属性的话,回调函数被触发后事件会被删除
(6) EV_ET: Edge-Trigger边缘触发,参考EPOLL_ET
5.2.3 管理event
每次当有事件event转变为就绪状态时,libevent就会把它移入到active event list[priority]中,其中priority是event的优先级。
libevent会根据自己的调度策略选择就绪事件,调用其callback()函数执行事件处理;并根据就绪的句柄和事件类型填充callback函数的参数
5.3 事件主循环
在libevent中,事件主循环的作用就是执行一个循环,在循环中监听事件以及超时的事件并且将这些激活的事件进行处理。libevent提供了对用户开放了两种执行事件主循环的函数:
int event_base_dispatch(struct event_base *);
int event_base_loop(struct event_base *, int);
在event_base_dispatch函数中,实际上调用的是event_base_loop(event_base, 0)。
event_base_loop()函数的定义:
int event_base_loop(struct event_base *base, int flags){
const struct eventop *evsel = base->evsel;
void *evbase = base->evbase;
struct timeval tv;
struct timeval *tv_p;
int res, done;
// 清空时间缓存
base->tv_cache.tv_sec = 0;
// evsignal_base是全局变量,在处理signal时,用于指名signal所属的event_base实例
if (base->sig.ev_signal_added)
evsignal_base = base;
done = 0;
while (!done) { // 事件主循环
// 查看是否需要跳出循环,程序可以调用event_loopexit_cb()设置event_gotterm标记
// 调用event_base_loopbreak()设置event_break标记
if (base->event_gotterm) {
base->event_gotterm = 0;
break;
}
if (base->event_break) {
base->event_break = 0;
break;
}
// 校正系统时间,如果系统使用的是非MONOTONIC时间,用户可能会向后调整了系统时间
// 在timeout_correct函数里,比较last wait time和当前时间,如果当前时间< last wait time
// 表明时间有问题,这是需要更新timer_heap中所有定时事件的超时时间。
timeout_correct(base, &tv);
// 根据timer heap中事件的最小超时时间,计算系统I/O demultiplexer的最大等待时间
tv_p = &tv;
if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base, &tv_p);
} else {
// 依然有未处理的就绪时间,就让I/O demultiplexer立即返回,不必等待
// 下面会提到,在libevent中,低优先级的就绪事件可能不能立即被处理
evutil_timerclear(&tv);
}
// 如果当前没有注册事件,就退出
if (!event_haveevents(base)) {
event_debug(("%s: no events registered.", __func__));
return (1);
}
// 更新last wait time,并清空time cache
gettime(base, &base->event_tv);
base->tv_cache.tv_sec = 0;
// 调用系统I/O demultiplexer等待就绪I/O events,可能是epoll_wait,或者select等;
// 在evsel->dispatch()中,会把就绪signal event、I/O event插入到激活链表中
res = evsel->dispatch(base, evbase, tv_p);
if (res == -1)
return (-1);
// 将time cache赋值为当前系统时间
gettime(base, &base->tv_cache);
// 检查heap中的timer events,将就绪的timer event从heap上删除,并插入到激活链表中
timeout_process(base);
// 调用event_process_active()处理激活链表中的就绪event,调用其回调函数执行事件处理
// 该函数会寻找最高优先级(priority值越小优先级越高)的激活事件链表,
// 然后处理链表中的所有就绪事件;
// 因此低优先级的就绪事件可能得不到及时处理;
if (base->event_count_active) {
event_process_active(base);
if (!base->event_count_active && (flags & EVLOOP_ONCE))
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
// 循环结束,清空时间缓存
base->tv_cache.tv_sec = 0;
event_debug(("%s: asked to terminate loop.", __func__));
return (0);
}
5.4 bufferevent
5.4.1 简介
bufferevent专门为封装成带有缓冲区的socket套接字。一个bufferevent包含了一个底层传输的fd(通常为socket),一个输入buffer和一个输出buffer。每个 bufferevent 有两个与数据相关的回调:读取回调函数和写入回调函数。 bufferevent中回调函数触发的条件与普通的event回调函数触发的条件不同( 普通的事件在底层传输端口已经就绪,可读或可写时就会执行回调函数,而 bufferevent 是在读取或写入了一定量的数据后才会调用回调函数)。
struct bufferevent结构体定义:
//bufferevent_struct.h文件
struct bufferevent {
struct event_base *ev_base; //执行ev_base,最后需要将socket fd添加进ev_base。
//操作结构体,成员有一些函数指针。类似struct eventop结构体
const struct bufferevent_ops *be_ops;
struct event ev_read;//读事件event
struct event ev_write;//写事件event
struct evbuffer *input;//读缓冲区
struct evbuffer *output; //写缓冲区
struct event_watermark wm_read;//读水位
struct event_watermark wm_write;//写水位
bufferevent_data_cb readcb;//可读时的回调函数指针
bufferevent_data_cb writecb;//可写时的回调函数指针
bufferevent_event_cb errorcb;//错误发生时的回调函数指针
void *cbarg;//回调函数的参数
struct timeval timeout_read;//读事件event的超时值
struct timeval timeout_write;//写事件event的超时值
/** Events that are currently enabled: currently EV_READ and EV_WRITE
are supported. */
short enabled;
};
5.4.2 水位线
我们刚刚说bufferevent两个数据相关的回调:一个读取回调和一个写入回调,bufferevent 是在读取或写入了一定量的数据后才会调用回调函数。通过调整bufferevent的读取和写入“水位(watermarks)”可以覆盖这些函数的默认行为。
每个bufferevent有四个水位:
读取低水位:读取操作使得输入缓冲区的数据量在此级别或者更高时,读取回调将被调用。默认值为0,所以每个读取操作都会导致读取回调被调用。
读取高水位:输入缓冲区中的数据量达到此级别后,bufferevent将停止读取,直到输入缓冲区中足够量的数据被抽取,使得数据量低于此级别。默认值是无限,所以永远不会因为输入缓冲区的大小而停止读取。
写入低水位:写入操作使得输出缓冲区的数据量达到或者低于此级别时,写入回调将被调用。默认值是0,所以只有输出缓冲区空的时候才会调用写入回调。
写入高水位:bufferevent没有直接使用这个水位。它在bufferevent用作另外一个bufferevent的底层传输端口时有特殊意义。
5.4.3 evbuffer
bufferevent采用evbuffer作为输入输出缓存。evbuffer像是一个字节队列,在队列的末尾写入数据,在队列的头部读取数据。evbuffer具体实现则是一个链表,链表中的每个节点都是一块连续的内存块,往evbuffer写数据时(调用evbuffer_add/evbuffer_add_printf等函数),evbuffer内部动态创建链表节点,并紧凑的写入数据(一个节点写满后,再写另外一个节点);从evbuffer中删除数据时(调用evbuffer_remove/evbuffer_drain),从链表头部节点开始读取,当一个节点的数据被全部读取后删除该节点,如果未读取完,则用标示记录数据已读取(删除)的位置。对于这种头部有数据被标示为读取(删除)的节点,再次写入数据时,可能会进行调整,即将数据部分整体往前拷贝移动,然后再继续写入数据。
关于evbuffer和bufferevent的详细介绍可以参考:http://t.csdnimg.cn/rit2P
6、 示例代码
6.1 客户端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <errno.h>
#include <string.h>
#include <getopt.h>
#include <fcntl.h>
#include <signal.h>
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/util.h>
#include <arpa/inet.h>
#define IP "127.0.0.1"
#define PORT 8888
void event_cb(struct bufferevent *bev, short events, void *arg)
{
if( events & BEV_EVENT_EOF )
{
printf("Connection closed\n");
}
else if( events & BEV_EVENT_ERROR )
{
printf("Connection error:%s\n", strerror(errno));
}
}
void send_data(evutil_socket_t fd, short events, void *arg)
{
struct bufferevent *bev = (struct bufferevent *)arg;
char buf[32] = "Hello, world!";
printf("%s\n", buf);
bufferevent_write(bev, buf, strlen(buf));
}
int main (int argc, char **argv)
{
struct sockaddr_in addr;
int len = sizeof(addr);
struct event_base *base = NULL;
struct bufferevent *bev = NULL;
struct timeval tv;
struct event *ev = NULL;
base = event_base_new();
if( !base )
{
printf("event_base_new() failure\n");
return -1;
}
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
inet_aton(IP, &addr.sin_addr);
bev = bufferevent_socket_new(base, -1, BEV_OPT_CLOSE_ON_FREE);
if( !bev )
{
printf("Could not create bufferevent\n");
event_base_free(base);
return -2;
}
bufferevent_setcb(bev, NULL, NULL, event_callback, NULL);
bufferevent_enable(bev, EV_READ | EV_WRITE);
bufferevent_socket_connect(bev, (struct sockaddr *)&addr, len);
tv.tv_sec = 3;
tv.tv_usec = 0;
ev = event_new(base, -1, EV_PERSIST, send_data, bev);
event_add(ev, &tv);
event_base_dispatch(base);
event_free(ev);
bufferevent_free(bev);
event_base_free(base);
return 0;
}
6.2服务器端
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <errno.h>
#include <string.h>
#include <getopt.h>
#include <fcntl.h>
#include <signal.h>
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <event2/util.h>
#include <event2/listener.h>
#include <arpa/inet.h>
#define PORT 8888
void read_cb(struct bufferevent *bev, void *arg)
{
char buf[32];
int len;
while( (len = bufferevent_read(bev, buf, sizeof(buf))) > 0 )
{
buf[len] = '\0';
printf("%s\n", buf);
}
}
void event_cb(struct bufferevent *bev, short events, void *arg)
{
if( events & BEV_EVENT_EOF )
{
printf("Connection closed\n");
}
else if( events & BEV_EVENT_ERROR )
{
printf("Connection error:%s\n", strerror(errno));
}
}
void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *addr, int len, void *arg)
{
struct event_base *base = (struct event_base *)arg;
struct bufferevent *bev = NULL;
bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
if( !bev )
{
printf("bufferevent_socket_new() failure:%s\n", strerror(errno));
close(fd);
return ;
}
bufferevent_setcb(bev, read_cb, NULL, event_callback, NULL);
bufferevent_enable(bev, EV_READ | EV_WRITE);
}
int main (int argc, char **argv)
{
struct sockaddr_in addr;
struct event_base *base = NULL;
struct evconnlistener *listener = NULL;
base = event_base_new();
if( !base )
{
printf("event_base_new() failure\n");
return -1;
}
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(PORT);
addr.sin_addr.s_addr = htonl(INADDR_ANY);
listener = evconnlistener_new_bind(base, listener_cb, base, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr *)&addr, sizeof(addr));
if( !listener )
{
printf("Can't create a listener\n");
return -2;
}
event_base_dispatch(base);
evconnlistener_free(listener);
event_base_free(base);
return 0;
}
标签:struct,简介,bufferevent,base,事件,libevent,event
From: https://www.cnblogs.com/LiBlog--/p/18327543