首页 > 其他分享 >【libevent】bufferevent的并发访问问题

【libevent】bufferevent的并发访问问题

时间:2024-07-10 14:30:22浏览次数:15  
标签:struct bufferevent 并发 session cb libevent bev event

一、问题

在使用libevent实现websocket服务器时,发生了并发访问的问题。

服务器程序功能主要包括实时响应Websocket客户端的控制请求,同时发送温度到客户端。

现象:

不加上温度发送功能时,程序正常运行

加上温度发送功能后,就会出现段错误,而且检查后发现bufferevent并不为空

二、原因

在我的代码中,temp_cb()用于发送温度,read_cb()用于读取客户端发送来的数据(包括连接请求和led控制等),在没有加上温度发送功能前,程序中bufferevent只用处理错误事件和接收数据事件,加上后还要处理发送温度的事件。如果在发送温度事件时,同时可能有其他事件或回调函数正在修改或访问相同的 bev 变量,可能会导致竞态条件或意外的状态更改,从而引发段错误,导致并发问题。

代码如下:

static void temp_cb(evutil_socket_t fd, short events, void *arg)
{
	 struct bufferevent		*bev = (struct bufferevent *)arg;

	 if (!bev) {
        log_error("Received NULL buffer event in temp_cb\n");
        return;
    }

    send_temperature(bev);
}



static void read_cb (struct bufferevent *bev, void *ctx)
{
    wss_session_t              *session = bev->cbarg;

    if( !session->handshaked )
    {
        do_wss_handshake(session);
        return ;
    }

    do_parser_frames(session);

    return ;
}

static void event_cb (struct bufferevent *bev, short events, void *ctx)
{
    wss_session_t              *session = bev->cbarg;

    if( events&(BEV_EVENT_EOF|BEV_EVENT_ERROR) )
    {
        if( session )
            log_warn("remote client %s closed\n", session->client);

        bufferevent_free(bev);
    }

    return ;
}


static void accept_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *addr, int len, void *arg)
{
	struct event_base               *ebase = arg;
	struct bufferevent 				*recv_bev, *send_bev;
	struct event            		*temp_event = NULL;
	struct timeval       			tv={10, 0};
    struct sockaddr_in              *sock = (struct sockaddr_in *)addr;
	wss_session_t                   *session;

	if( !(session = malloc(sizeof(*session))) )
    {
        log_error("malloc for session failure:%s\n", strerror(errno));
        close(fd);
        return ;
    }

	memset(session, 0, sizeof(*session));
    
	snprintf(session->client, sizeof(session->client), "[%d->%s:%d]", fd, inet_ntoa(sock->sin_addr), ntohs(sock->sin_port));
    log_info("accpet new socket client %s\n", session->client);


	bev_accpt = bufferevent_socket_new(ebase, fd, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_DEFER_CALLBACKS);
	if( !bev_accpt )
    {
        log_error("create bufferevent for client for %s failed\n", session->client);
        return;
    }
	session->bev = bev_accpt;

	bufferevent_setcb(bev_accpt, read_cb, NULL, event_cb, session);
	bufferevent_enable(bev_accpt, EV_READ|EV_WRITE);


	temp_event = event_new(ebase, -1, EV_PERSIST, temp_cb, bev_accpt);
	if (!temp_event)
    {
        log_error("failed to create temp event\n");
        return;
    }
	event_add(temp_event, &tv);


    return;
}

三、分析

libevent是通过I/O多路复用来实现高效的事件处理,事件循环(event loop)会不断调用底层的 I/O 多路复用函数( selectpollepoll),等待事件的发生。

libevent本身确实是一个单线程事件驱动模型。但是也可能出现并发问题:

  1. 多线程环境:尽管 libevent 的核心是单线程的,但如果程序是多线程的,并且多个线程尝试访问和操作同一个 bufferevent,就会出现并发问题。
  2. 事件回调重入:如果事件回调函数执行的时间较长,而在此期间另一个事件被触发并尝试访问同一个资源,可能会导致重入问题。
  3. 非线程安全代码:即使在单线程环境中,某些操作可能会触发不安全的并发访问,例如在回调中操作全局变量或共享资源。

四、解决方法

解决并发访问的问题最常用的方法是加锁,libevent 提供了一些机制来确保 bufferevent 在多线程环境下的安全性。例如,可以使用 bufferevent_lockbufferevent_unlock 函数来显式地对 bufferevent 进行加锁和解锁操作。

我选择的方式是把接收的bufferevent和发送的bufferevent分开,将接收和发送操作独立进行,避免一个操作阻塞另一个操作,提高整体响应速度。

标签:struct,bufferevent,并发,session,cb,libevent,bev,event
From: https://www.cnblogs.com/LiBlog--/p/18293998

相关文章

  • go并发模式 o-channel
    packagemainimport("fmt""time")funcmain(){varorfunc(channels...<-chaninterface{})<-chaninterface{}or=func(channels...<-chaninterface{})<-chaninterface{}{switchlen(channels)......
  • go并发模式 or-do-channel + bridge
    packagemainimport("context""fmt")//orDonefuncorDone(ctxcontext.Context,value<-chanint)<-chanint{ordoneStream:=make(chanint)gofunc(){deferclose(ordoneStream)for{......
  • go并发模式 tee-channel
    packagemainimport("context""fmt""time")functeeChannel(ctxcontext.Context,value<-chanint)(<-chanint,<-chanint){ch1:=make(chanint)ch2:=make(chanint)gofunc(){......
  • go并发模式 pipeline
    packagemainimport("fmt""math/rand")funcmain(){pFn:=func(done<-chaninterface{},fnfunc()int)<-chanint{valueStream:=make(chanint)gofunc(){deferclose(valueStream)......
  • go并发模式 错误处理
    packagemainimport("fmt""net/http")typeResultsstruct{ErrorerrorResponse*http.Response}funcmain(){checkStatus:=func(done<-chaninterface{},urls...string)<-chanResults{re......
  • go并发模式 扇入扇出
    扇入扇出寻找素数:packagemainimport("fmt""math/rand""runtime""sync""time")varrepeatFn=func(done<-chaninterface{},fnfunc()interface{})<-chaninterface{}{valueSt......
  • 【转】-Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
    Java并发编程:CountDownLatch、CyclicBarrier和Semaphore该博客转载自​Matrix海子​的​Java并发编程:CountDownLatch、CyclicBarrier和Semaphore在java1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下......
  • 【转】-并发下的集合
    高并发下的Java数据结构(List、Set、Map、Queue)本文转载至​薛勤的博客​的​高并发下的Java数据结构(List、Set、Map、Queue)由于并行程序与串行程序的不同特点,适用于串行程序的一些数据结构可能无法直接在并发环境下正常工作,这是因为这些数据结构不是线程安全的。本节将着重......
  • 【转】-Java并发编程:CountDownLatch、CyclicBarrier和Semaphore
    Java并发编程:CountDownLatch、CyclicBarrier和Semaphore该博客转载自​Matrix海子​的​Java并发编程:CountDownLatch、CyclicBarrier和Semaphore在java1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下......
  • 【转】-Java并发之CyclicBarrier
    Java并发之CyclicBarrier​ 该博客转载自​巴蜀码哥​**的​Java并发之CyclicBarrier**barrier(屏障)与互斥量、读写锁、自旋锁不同,它不是用来保护临界区的。相反,它跟条件变量一样,是用来协同多线程一起工作的。条件变量是多线程间传递状态的改变来达到协同工作的效果。屏障是......