linux线程封装和互斥

发布于:2025-08-01 ⋅ 阅读:(22) ⋅ 点赞:(0)

目录

线程控制

线程封装

线程互斥

互斥概念

互斥量mutex

互斥量接口

初始化互斥量

销毁互斥量

互斥量加锁和解锁

互斥量实现原理的探究


线程控制

我们接上。

1.

先前说,线程会有自己的私有数据,比如,线程ID,线程上下文数据,独立栈等等,但其实这个私有说法不大准确,为什么?

我们一般称为独立数据,因为线程是共享进程内的资源,它的LWP(也能说是pcb)指向进程的虚拟地址空间,所以,按理来说,只要能拿到这个虚拟地址空间的任何一个地址,都能访问。而且使用第三方pthread库,库中也会有它的线程管理结构(TCB,线程局部数据,线程栈),而库在加载到内存中,是在mmap区。

2.

一个线程有独立的上下文得益于它在内核要有自己的pcb,在用户层,有自己独立的管理块。

我们可以验证一下线程独立的栈数据能不能被其他线程拿到,其实也就是验证线程没有所谓的“私有”数据,它共享进程的所有:

int *p = nullptr;
void *routine(void *agrs)
{
    int a = 123;
    p = &a;//将自己地址送出去
    while (true)
    {
        sleep(1);
    }
}
int main()
{
    pthread_t tid;
    pthread_create(&tid, nullptr, routine, nullptr);
    while (true)
    {
        sleep(1);
        std::cout << *p << std::endl;
    }
    pthread_join(tid, nullptr);
    return 0;
}

理论上,只要线程能拿到地址,就能访问任何一个资源。

linux上利用系统调用模拟线程:

使用clone函数:

代码:

#define STACK_SIZE (1024 * 1024) // 1MB 的栈空间
// 子进程执行的函数
static int child_func(void *arg)
{
    while (true)
    {
        printf("Child process: PID = %d\n", getpid());
        sleep(1);
    }
    return 0;
}
int main()
{
    //创建栈空间
    char *stack = (char *)malloc(STACK_SIZE); 
    if (stack == NULL)
    {
        perror("malloc");
        exit(EXIT_FAILURE);
    }
    // 使用 clone 创建子进程
    pid_t pid = clone(child_func, stack + STACK_SIZE, CLONE_VM | SIGCHLD, NULL);
    if (pid == -1)
    {
        perror("clone");
        free(stack);
        exit(EXIT_FAILURE);
    }
    printf("Parent process: PID = %d, Child PID = %d\n", getpid(), pid);
    // 等待子进程结束
    if (waitpid(pid, NULL, 0) == -1)
    {
        perror("waitpid");
        free(stack);
        exit(EXIT_FAILURE);
    }
    free(stack);
    return 0;
}

理解线程局部存储:

//加了之后,每个线程的局部存储中,都会有
//一个int a变量!!!
//这样只有一个线程访问自己的a变量
//不存在并发问题
__thread int a = 0;
void *routine(void *agrs)
{
    while (1)
    {
        printf("新线程,a++:%d\n", ++a);
        sleep(1);
    }
}
int main()
{
    pthread_t rid;
    pthread_create(&rid, nullptr, routine, nullptr);
    while(1)
    {
        printf("主线程,a:%d\n",a);
        sleep(1);
    }
    pthread_join(rid, nullptr);
    return 0;
}

解释:

线程封装

封装成面对对象的线程。

我们想直接传入我们写好的函数func传给Thread类对象,创建的新线程调用它,这样就模拟出来了。我们可以传函数指针或者lambda或者其他可调用对象即可。

标志位函数:

创建线程:

我们简单的写会出问题:

如:

所以我们可以将Routine函数设置成静态函数,因为静态成员函数属于类本身而非对象实例,它没有this指针形参。

因为静态成员函数它没有this指针,所以它不能直接调用普通成员函数,如果要调用,必须对象实例,对象指针或者引用才能调用普通成员函数。

所以:

封装代码:

namespace ThreadModule
{
    static uint32_t number = 1;
    class Thread
    {
        using func_t = std::function<void()>;

    private:
        void EnableDetach()
        {
            std::cout << "detach thread" << std::endl;
            _isdetach = true;
        }
        void EnableRunning()
        {
            _isrunning = true;
        }

    public:
        Thread(func_t func)
            : _tid(0), _isdetach(false),
              _isrunning(false), _res(nullptr), _func(func)
        {
            _name = "Thread-" + std::to_string(number++);
        }
        void Detach()
        {
            // 只有不是分离的线程才能分离
            if (_isdetach)
                return;
            if (_isrunning)
                pthread_detach(_tid);
            EnableDetach();
        }
        static void *Routine(void *agrs)
        {
            Thread* self=static_cast<Thread*>(agrs);
            self->EnableRunning();
            if (self->_isdetach)
                self->Detach();
            //设置名字
            pthread_setname_np(self->_tid,self->_name.c_str());
            self->_func();//回调函数
            return nullptr;
        }
        bool Start()
        {
            if (_isrunning)
                return false;
            int n = pthread_create(&_tid, nullptr, Routine, this);
            if (n != 0)
            {
                std::cout << "create pthread fail" << std::endl;
                return false;
            }
            else
            {
                std::cout << "create pthread success" << std::endl;
                return true;
            }
        }
        bool Stop()
        {
            if (_isrunning) // 只要在跑的时候才能停止
            {
                int n = pthread_cancel(_tid);
                if (n != 0)
                {
                    std::cerr << "pthread stop fail" << strerror(n) << std::endl;
                    return false;
                }
                else
                {
                    _isrunning = false;
                    printf("%s stop success\n", _name.c_str());
                    return true;
                }
            }
            return false;
        }
        void Join()
        {
            if (_isdetach)
            {
                std::cout << "线程是分离,不能join" << std::endl;
                return;
            }
            int n = pthread_join(_tid, &_res);
            if (n != 0)
                std::cerr << "pthread join fail" << strerror(n) << std::endl;
            else
                std::cout << "pthread join success" << std::endl;
        }

    private:
        pthread_t _tid;
        std::string _name;
        bool _isdetach;
        bool _isrunning;
        void *_res;
        func_t _func;
    };
}

验证:

代码:

void func()
{
    char name[64];
    while(1)
    {
        pthread_getname_np(pthread_self(),name,sizeof(name));
        std::cout<<"新线程name:"<<name<<std::endl;
        sleep(1);
    }
}
int main()
{
    Thread t(func);
    t.Start();
    // t.Detach();//改位置验证即可
    sleep(5);
    t.Stop(); 
    t.Join();   
    return 0;
}

执行:

多线程验证代码:

int main()
{
    std::vector<Thread> threads;
    for (int i = 0; i < 5; i++)
    {
        //lambda
        threads.emplace_back([]()
                             {
        while(1)
        {
            char name[64];
            pthread_getname_np(pthread_self(),name,sizeof(name));
            std::cout<<"新线程name:"<<name<<std::endl; 
            sleep(1);
        } });
    }
    for(auto& thread:threads)
    {
        thread.Start();
    }
    for(auto& thread:threads)
    {
        thread.Join();
    }
    return 0;
}

我们这样写,func函数只能传无参数的可调用对象,那如果想要func函数传参呢?

我们可以这样写:

当然如果我们需要多个参数呢?我们可以使用可变参数来写,这只是用一个参数来举例子。

全部代码:

namespace ThreadModule
{
    static uint32_t number = 1;
    template<class T>
    class Thread
    {
        using func_t = std::function<void(T)>;

    private:
        void EnableDetach()
        {
            std::cout << "detach thread" << std::endl;
            _isdetach = true;
        }
        void EnableRunning()
        {
            _isrunning = true;
        }
    public:
        Thread(func_t func,T data)
            : _tid(0), _isdetach(false),
              _isrunning(false), _res(nullptr), _func(func)
              ,_data(data)
        {
            _name = "Thread-" + std::to_string(number++);
        }
        void Detach()
        {
            // 只有不是分离的线程才能分离
            if (_isdetach)
                return;
            if (_isrunning)
                pthread_detach(_tid);
            EnableDetach();
        }
        static void *Routine(void *agrs)
        {
            Thread *self = static_cast<Thread *>(agrs);
            self->EnableRunning();
            if (self->_isdetach)
                self->Detach();
            // 设置名字
            pthread_setname_np(self->_tid, self->_name.c_str());
            self->_func(self->_data); // 回调函数
            return nullptr;
        }
        bool Start()
        {
            if (_isrunning)
                return false;
            int n = pthread_create(&_tid, nullptr, Routine, this);
            if (n != 0)
            {
                std::cout << "create pthread fail" << std::endl;
                return false;
            }
            else
            {
                std::cout << "create pthread success" << std::endl;
                return true;
            }
        }
        bool Stop()
        {
            if (_isrunning) // 只要在跑的时候才能停止
            {
                int n = pthread_cancel(_tid);
                if (n != 0)
                {
                    std::cerr << "pthread stop fail" << strerror(n) << std::endl;
                    return false;
                }
                else
                {
                    _isrunning = false;
                    printf("%s stop success\n", _name.c_str());
                    return true;
                }
            }
            return false;
        }
        void Join()
        {
            if (_isdetach)
            {
                std::cout << "线程是分离,不能join" << std::endl;
                return;
            }
            int n = pthread_join(_tid, &_res);
            if (n != 0)
                std::cerr << "pthread join fail" << strerror(n) << std::endl;
            else
                std::cout << "pthread join success" << std::endl;
        }
    private:
        pthread_t _tid;
        std::string _name;
        bool _isdetach;
        bool _isrunning;
        void *_res;
        func_t _func;
        T _data;
    };
}

验证:

我们可以传任何类型的参数,哪怕是一个对象。

class TD
{
    private:
    int _a;
    int _b;
};
void func(TD td)
{
    while(1)
    {
        std::cout<<"hhhhh"<<std::endl;
        sleep(1);
    }
}
int main()
{
    TD td;
    Thread<TD> t(func,td);
    t.Start();
    sleep(4);
    t.Stop();
    t.Join();
    return 0;
}

线程互斥

互斥概念

  • 临界资源:多线程执⾏流共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有⼀个执⾏流进⼊临界区,访问临界资源,通常对临界资源起 保护作⽤
  • 原⼦性:不会被任何调度机制打断的操作,该操作只有两态,要么完成, 要么未完成

互斥量mutex

  • ⼤部分情况,线程使⽤的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量 归属单个线程,其他线程⽆法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完 成线程之间的交互。
  • 多个线程并发的操作共享变量,会带来⼀些问题。

我们看个例子:多线程抢票的例子

代码:

int ticket = 100;
void *routine(void *agrs)
{
    char *id = (char *)agrs;
    while (1)
    {
        if (ticket > 0)
        {
            usleep(1000);
            printf("%s buy ticket:%d\n", id, ticket);
            ticket--;
        }
        else
            break;
    }
    return nullptr;
}
int main()
{
    pthread_t tid1, tid2, tid3, tid4;
    pthread_create(&tid1, nullptr, routine, (void *)"thread-1");
    pthread_create(&tid2, nullptr, routine, (void *)"thread-2");
    pthread_create(&tid3, nullptr, routine, (void *)"thread-3");
    pthread_create(&tid4, nullptr, routine, (void *)"thread-4");
    //回收
    pthread_join(tid1, nullptr);
    pthread_join(tid2, nullptr);
    pthread_join(tid3, nullptr);
    pthread_join(tid4, nullptr);
    return 0;
}

执行:

票竟然出现了负数!这其实就是多线程访问共享资源的并发问题,也叫线程安全问题!

原因:主要原因在ticket判断上,次要原因在ticket--上。

1.

我们现在初步理解一条汇编指令才是原子的,而一条语句不是原子的。

我们先看ticket--:

ticket--在底层其实是被汇编成3条指令(载入,减少,写回),这3条指令每一条都是原子的,只有做了没做的情况,而ticket--不是原子的。

多线程在做ticket--的时候可能会出现并发问题:

当线程A载入ticket到cpu寄存器ebx中,然后减一(ebx中的值为99),刚想将计算后的值写回ticket内存中,此时被OS切换成线程B了,在切换之前,会做保存线程A上下文(也就是说会将pc指针和计算后的值保存下来),线程B做载入,减少,写回3个动作,假设线程B一直没有切换,将ticket减到1了才被切换(此时内存ticket值为1),线程B的上下数据保存下来,切换线程A,线程A上下文数据被恢复至寄存器中,按照pc指针地址继续执行向下执行,也就是说,线程A会写入ticket内存中,此时ticket内存中的值是99。至此,线程A做的减减工作付诸东流了,一切白费!这就导致了数据不一致问题。

2.

再看ticket判断:

ticket判断在底层被汇编成俩个指令,载入和逻辑判断。

当线程A载入和逻辑判断做完,刚想做ticket--的工作,被切换了,线程A上下文数据被保存,线程B和C也是如此,都做完了载入和判断2个动作,假设ticket减到1的过程中,都是正常的,现在ticket为1,线程A被切换回,线程A的上下文数据被恢复,继续向下执行ticket--,此时ticket为0,线程A再次被切换成线程B,线程B上下文数据被恢复,继续向下执行ticket--,此时ticket为-1,线程B被切换成线程C,线程C上下文数据被恢复,继续向下执行,此时ticket为-2,然后线程A被恢复继续向下执行判断(循环执行完一次)跳出循环,线程B和C也是如此跳出循环。至此,ticket最终为-2!!!

3.

理解上下文数据:

cpu在执行程序,会将线程的数据加载到cpu寄存器中,也就是硬件中,cpu硬件只有一套,而数据可能有多套(线程切换时),cpu在执行程序时,硬件中的数据就是当前执行流的硬件上下文数据。

4.

线程什么时候会发生切换?

时间片耗尽,阻塞式IO,sleep等会将线程切走。从内核态返回用户态的时候,会进行检查,线程切回。(先前学习信号,也是这个时候会检查信号)

理解:

时间片耗尽线程切换其实就是时钟中断,OS切换线程,阻塞式IO,比如read的时候写端没有写的时候,读端会阻塞,这个时候就会线程切换!sleep也能这么理解,等待的时候会切换,这样不浪费资源,不可能让其他线程一直等着这个线程阻塞或等待完成吧!

其实时间片耗尽,阻塞式IO,sleep等就会陷入内核(中断,系统调用),而从内核态返回用户态的时候会进行检查,进而线程切回。

解决方法:当某个执行流进⼊临界区执⾏时,不允许其他执行流进⼊该临界区。质上就是需要⼀把锁。Linux上提供的这把锁叫互斥量。

加锁的本质,其实就是执行临界区代码时,由并行转化成串行。在某个线程执行期间,不会被打扰,也就是一种变相的原子性表现。

互斥量接口

初始化互斥量

初始化互斥量有两种⽅法:

⽅法1,静态分配:

//表示mutex已经被初始化了
pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;

方法2,动态分配:

销毁互斥量

销毁互斥量需要注意:

  • 使⽤ PTHREAD_ MUTEX_ INITIALIZER 初始化的互斥量不需要销毁
  • 不要销毁⼀个已经加锁的互斥量
  • 已经销毁的互斥量,要确保后⾯不会有线程再尝试加锁

互斥量加锁和解锁

调⽤ pthread_mutex_lock 时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
  • 发起函数调⽤时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到 互斥量,那么pthread_lock调⽤会陷⼊阻塞(执⾏流被挂起),等待互斥量解锁。

解决上述售票线程安全问题:

代码:

//也可以全局变量
// pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int ticket = 100;
void *routine(void *agrs)
{
    pthread_mutex_t *mutex = (pthread_mutex_t *)agrs;
    while (1)
    {
        pthread_mutex_lock(mutex); // 加锁
        if (ticket > 0)
        {
            usleep(1000);
            printf("buy ticket:%d\n", ticket);
            ticket--;
            pthread_mutex_unlock(mutex); // 解锁
        }
        else
        {
            //注意这里也要解锁
            pthread_mutex_unlock(mutex); // 解锁
            break;
        }
    }
    return nullptr;
}
int main()
{
    //也可以临时定义一个锁
    pthread_mutex_t mutex;
    pthread_mutex_init(&mutex,nullptr);//初始化
    //创建新线程
    pthread_t tid1, tid2, tid3, tid4;
    pthread_create(&tid1, nullptr, routine, &mutex);
    pthread_create(&tid2, nullptr, routine, &mutex);
    pthread_create(&tid3, nullptr, routine, &mutex);
    pthread_create(&tid4, nullptr, routine, &mutex);
    // 回收
    pthread_join(tid1, nullptr);
    pthread_join(tid2, nullptr);
    pthread_join(tid3, nullptr);
    pthread_join(tid4, nullptr);
    return 0;
}

互斥量实现原理的探究

问题:

1.

加锁之后,在临界区内部,允许线程切换吗?切换了会发生类似上述抢票情况吗?

允许切换,切换了不会发生类似上述抢票情况。因为某个线程进入临界区是加锁了的,即使被切换,也是持有锁被切换,而其他线程不会进入临界区,只会被挂起等待,也就是说其他线程必须要等持有锁线程切换回,执行完临界区代码,释放锁之后,才会被调度,继续展开对锁的竞争,从而进入临界区。

也就是说,锁会保证每次访问全局资源只有单个线程!

2.

怎么实现的?

只需要让某个线程一口气将临界区代码执行完,不要发生切换即可!

在硬件上:有很多办法,最直接的就是关闭时钟中断,这样哪怕时间片耗尽也不会切换。但是有风险,出现差池,可能会发生死机,系统不响应的情况。

在软件上:我们来看看底层。

我们可以将锁看成一个整数(我们假设是1),线程A申请锁,会将%al寄存器清0,再将%al寄存器中的数据和锁交换,此时%al寄存器中值是1,内存中mutex值是0,线程A刚想继续向下执行,被切换了,它的硬件上下数据被保存,调度线程B,线程B会将会将%al寄存器清0,将%al寄存器中的数据和锁交换,此时%al寄存器值为0,内存中mutex值也为0,然后判断,会被挂起等待,切换成线程A,继续向下执行,判断,条件成立(值>0),申请成功,进入临界区!

关键就在,只要谁交换了锁,锁就会一直跟着它,锁只有一份,谁交换到了,就是谁的!随便线程切换都没用,其他线程只会被挂起等待。

我们下期见!!!