035——从GUI->Client->Server->driver实现SPI控制DAC芯片

发布于:2024-04-25 ⋅ 阅读:(23) ⋅ 点赞:(0)

目录

1、修改GUI

2、修改client

3、server编写

4、driver_handle

5、test

6、 项目管理


1、修改GUI

我想让DAC控制是个滑动条

import PySimpleGUI as sg

def slider_callback(slider_value):
    """Print the slider's current value to standard output.

    Args:
        slider_value: Value reported by the slider element; it is rendered
            with its default string formatting.
    """
    report = 'Current value: {}'.format(slider_value)
    print(report)

# Demo layout: a caption, a horizontal slider that raises an event on every
# move (enable_events=True), and an OK button that ends the demo.
layout = [
    [sg.Text('Select a value:')],
    [sg.Slider(range=(1, 100), orientation='h', size=(20, 15), default_value=50, key='-SLIDER-', enable_events=True)],
    [sg.Button('OK')]
]

window = sg.Window('Slider Example', layout)

# Event loop: report every slider move until the user is done.
while True:
    event, values = window.read()

    # Closing the window makes read() return sg.WIN_CLOSED (None) with no
    # values; without this check the loop would keep reading a dead window.
    if event in (sg.WIN_CLOSED, 'Close', 'OK'):
        break
    if event == '-SLIDER-':
        slider_callback(values['-SLIDER-'])

window.close()

 

现在滑动条的 event 可以获取了,但是有个问题:滑动条和后面的 "DAC" 文本对不齐,太丑啦。

import PySimpleGUI as sg  
  
# One label per row: (text, justification, pad). The second and third labels
# use a top padding of 5 so the text sits slightly lower in its row.
_label_specs = (
    ('居中对齐', 'c', (10, 5)),              # centred, horizontal and vertical padding
    ('左对齐偏下', 'l', ((10, 0), (5, 0))),   # left aligned, pushed down by top padding
    ('右对齐偏下', 'r', ((0, 10), (5, 0))),   # right aligned, pushed down by top padding
)
layout = [
    [sg.Text(caption, size=(10, 1), justification=side, pad=padding)]
    for caption, side, padding in _label_specs
]

window = sg.Window('文本对齐示例', layout)

# Nothing in this layout needs handling, so simply keep reading events until
# the window is closed.
event, values = window.read()
while event != sg.WIN_CLOSED:
    event, values = window.read()

window.close()

分开测试都好使,放在一起就不行,里普瓦

好像可以,之前给的 pad 值太小了导致看不出效果;把 "DAC" 文本的上边距改成 20 之后,看着还算协调。

'''
function : 客户端界面
author  : 辛天宇
date    : 2024-4-12
-------------------------------
author date  modify

'''
import PySimpleGUI as sg
import tool
import global_var

# 调用显示框架
# Build and return the client's main window
def show_window(theme):
    """Create the finalized main window of the client GUI.

    The window consists of a title line, a left column (network buttons, the
    free-text send box, the DAC slider and the AT24C02 write box), a right
    column (one read-out field plus button per device), a large output area
    and the Clean/Quit buttons.

    Args:
        theme: Name of the PySimpleGUI theme to apply before the elements
            are created.

    Returns:
        The finalized ``sg.Window``; the caller runs the event loop on it.
    """
    # Apply the theme first so every element below picks it up
    sg.theme(theme)
    # Left column
    layout_l = [
        [tool.name('NetWork'), sg.Button('Connect', key='Connect')],
        [tool.name('NetWork'), sg.Button('Disconnect', key='Disconnect')],
        [tool.name('NetWork'), sg.Output(size=(32, 1), key='IP')],
        # The check box starts unchecked; the event loop reads it to decide
        # whether the text input below is enabled
        [sg.Checkbox('Input', default=False, enable_events=True, key='input')],
        [sg.InputText(disabled=True, key='txbuff'), sg.Button('SEND', key='send')],
        # The top padding of 20 on the label lines it up with the slider
        [sg.Slider(range=(0, 100), orientation='h', size=(29, 20), default_value=0, key='DAC', enable_events=True), sg.Text('DAC', size=(5,2), justification='c',pad=(0,(20,0)), font='Courier 14')],
        [sg.InputText(key='AT24C02_I', default_text=''), sg.Button('AT24C02 WRITE', key='AT24C02_W')],
    ]
    # Right column: one read-out field and one trigger button per device
    layout_r = [
        [sg.Output(size=(16, 1), key='LED_O'), sg.Button('LED', key='LED')],
        [sg.Output(size=(16, 1), key='SR501_O'), sg.Button('SR501', key='SR501')],
        [sg.Output(size=(16, 1), key='SR04_O'), sg.Button('SR04', key='SR04')],
        [sg.Output(size=(16, 1), key='IRDA_O'), sg.Button('IRDA', key='IRDA')],
        [sg.Output(size=(16, 1), key='DHT11_O'), sg.Button('DHT11', key='dht11')],
        [sg.Output(size=(16, 1), key='DS18B20_O'), sg.Button('DS18B20', key='ds18b20')],
        [sg.Output(size=(16, 1), key='AT24C02_O'), sg.Button('AT24C02 READ', key='AT24C02_R')],
        #[tool.name('Text'), sg.Button('IIC', key='IIC')],
    ]
    # Decorate the title text
    topic = tool.add_stars_to_string(global_var.TOPIC, 5)
    # Title font: size 24, no special style (font given as a (name, size) tuple)
    topic_font = ('_', 24)
    # Overall layout
    layout = [
        [sg.T(topic, text_color='blue', justification='c', font=topic_font)],
        [sg.Col(layout_l), sg.Col(layout_r)],
        [sg.Text('Output:', size=(7,1), justification='r',pad=(0,0), font='Courier 10'),],
        [sg.Output(size=(96, 8), key='Output')],
        [sg.Button('Clean', key='Clean'), sg.Button('Quit', key='Quit')]
    ]

    window = sg.Window('The PySimpleGUI Element List', layout, finalize=True, keep_on_top=True)
    return window

# 处理事件
# Stand-alone event loop used to try out the GUI without a server
def event_handle():
    """Show the main window and react to its events until it is closed.

    The DHT11/DS18B20 buttons display the values currently stored in
    ``global_var`` in their own read-out fields; the other handled buttons
    only print a trace line.

    Returns:
        Always 0, both after a normal exit and after an error.
    """
    window = show_window('DefaultNoMoreNagging')
    # Event loop
    while True:
        try:
            event, values = window.read()
            if event == 'Exit':
                break
            elif event is None:
                # The window was closed via its title bar
                print(f"xxxxxxxxxxxxxxxxxxxx")
                break
            elif event == 'Quit':
                print(f"See you.............")
                break
            elif event == 'dht11':
                message = f"{global_var.TEM}°C   {global_var.HUM}%"
                # The layout has no 'Getvalue' element; the DHT11 read-out
                # field is keyed 'DHT11_O'
                window['DHT11_O'].update(message)
            elif event == 'ds18b20':
                message = f"{global_var.TEM}°C"
                # Likewise the DS18B20 read-out field is keyed 'DS18B20_O'
                window['DS18B20_O'].update(message)
            elif event == 'input':
                print(f"INPUT-----------------")
            elif event == 'LED':
                print(f"LED-----------------")
            # handle other events here...
        except Exception as e:
            window.close()
            print(f"An error occurred: {e}")
            return 0
    window.close()
    return 0

def main():
    """Entry point of the stand-alone GUI: run the event loop."""
    # For the available themes see /client/README
    event_handle()

if __name__ == '__main__':
    main()

2、修改client

'''
function : 客户端程序
author  : 辛天宇
date    : 2024-4-13
-------------------------------
author     date      modify
辛天宇   2024-4-15   结合GUI和网络通信

'''
import show
import tcp
import tool
import socket
import global_var


def send_handle(window, client_socket, values):
    """Send the text typed into the 'txbuff' input and store the reply.

    The text is copied to ``global_var.TX_BUF``, the input field is cleared,
    and the message is sent through ``send_cmd()``, which leaves the decoded
    reply in ``global_var.RX_BUF``.

    Args:
        window: GUI window that owns the 'txbuff' input element.
        client_socket: Connected socket used to talk to the server.
        values: Values dictionary returned by ``window.read()``.
    """
    global_var.TX_BUF = values['txbuff']
    print(f"txbuff={global_var.TX_BUF}")
    # Clear the input field
    window['txbuff'].update(value='')
    if not global_var.TX_BUF:
        # Sending an empty message transmits nothing, so no reply would ever
        # arrive and waiting for one would freeze the GUI.
        return
    # Send the message and wait for the reply (shared with the other handlers)
    send_cmd(client_socket)

def quit_handel(client_socket):
    """Send the one-character command 'Q' and then disconnect.

    Args:
        client_socket: Connected socket; it is handed to
            ``tcp.disconnect_to_server()`` after the command is sent.
    """
    client_socket.sendall('Q'.encode('utf-8'))
    tcp.disconnect_to_server(client_socket)

def motor_handel(window, client_socket, values):
    """Send a motor command built from the 'MOTOR_I' input.

    The sign of the entered integer selects the direction flag ('0' for
    zero or positive, '1' for negative) and its absolute value modulo 360
    is the data part. The message is 's' + direction + data, sent as a
    'motor' frame.

    Args:
        window: GUI window that owns the 'MOTOR_I' input element.
        client_socket: Connected socket used to talk to the server.
        values: Values dictionary returned by ``window.read()``.
    """
    try:
        angle = int(values['MOTOR_I'])
    except (TypeError, ValueError):
        # Reject non-numeric input instead of letting the exception end
        # the caller's event loop.
        print("MOTOR: input ERROR")
        window['MOTOR_I'].update(value='0')
        return
    global_var.MOTOR_DATA = str(abs(angle) % 360)
    global_var.MOTOR_DIRECTION = '0' if angle >= 0 else '1'
    message = 's' + global_var.MOTOR_DIRECTION + global_var.MOTOR_DATA
    # Reset the input field
    window['MOTOR_I'].update(value='0')
    set_tx_buf('motor', message)
    # send_cmd() already receives the reply and stores it in RX_BUF. A second
    # recv() here would wait for a message the server never sends.
    send_cmd(client_socket)

# 进行一次发送
def send_cmd(client_socket):
    """Perform one request/reply exchange with the server.

    Sends the contents of ``global_var.TX_BUF`` and stores the reply
    (at most 512 bytes, decoded as UTF-8) in ``global_var.RX_BUF``.

    Args:
        client_socket: Connected socket used to talk to the server.
    """
    outgoing = global_var.TX_BUF.encode()
    client_socket.sendall(outgoing)
    # Wait for the server's reply and turn the bytes back into text
    reply = client_socket.recv(512)
    global_var.RX_BUF = reply.decode('utf-8')

# 设置发送消息
def set_tx_buf(device, message):
    """Store '<device prefix><message>' in ``global_var.TX_BUF``.

    Args:
        device: Device name; one of 'led', 'sr501', 'sr04', 'irda', 'motor',
            'dht11', 'ds18b20', 'AT24C02' or 'dac'. Any other value leaves
            the buffer untouched.
        message: Text appended to the device's four-character prefix.
    """
    # Device name -> frame prefix
    frame_prefixes = (
        ('sr04', '@002'),
        ('led', '@000'),
        ('sr501', '@001'),
        ('irda', '@003'),
        ('motor', '@004'),
        ('dht11', '@005'),
        ('ds18b20', '@006'),
        ('AT24C02', '@007'),
        ('dac', '@008'),
    )
    for name, prefix in frame_prefixes:
        if device == name:
            global_var.TX_BUF = prefix + message
            break

# 解析IRDA传入的数据
def irda_data_analysis(cmd):
    """Translate the key code of an IRDA reply into a key name.

    The key code is the character at index 5 of ``cmd``; the resulting name
    is stored in ``global_var.IRDA_DATA``. An unknown key code leaves the
    previous value in place.

    Args:
        cmd: Reply string received from the server.
    """
    if len(cmd) < 6:
        # Frame too short to carry a key code: report it instead of
        # raising IndexError.
        print("IRDA: message ERROR")
        return
    key_names = {
        'r': "RED",
        'm': "MENU",
        't': "TEST",
        '+': "+",
        'b': "BACK",
        'l': "LEFT",
        'p': "START",
        # NOTE(review): the original also had a later branch mapping 'r' to
        # "RIGHT", which could never be reached because 'r' already means
        # "RED". The key code the server uses for RIGHT must be confirmed
        # before it can be added here.
        '-': "-",
        'c': "c",
    }
    # Digit keys are reported as the digit itself
    key_names.update((digit, digit) for digit in '0123456789')

    code = cmd[5]
    if code in key_names:
        global_var.IRDA_DATA = key_names[code]

# 处理数据
def cmd_handle(window):
    """Decode the latest server reply and reflect it in the GUI.

    The reply is taken from ``global_var.RX_BUF``, which is then reset to
    "#000". Handled replies have the form '@0NN' + command character +
    data, where NN selects the device: 00 LED, 01 SR501, 02 SR04, 03 IRDA,
    04 motor, 05 DHT11, 06 DS18B20, 07 AT24C02, 08 DAC. Anything else is
    ignored.

    Args:
        window: GUI window whose read-out elements are updated.

    Returns:
        -1 when the reply is shorter than four characters, otherwise None.
    """
    cmd = global_var.RX_BUF
    # Reset the buffer so the same reply is not processed twice
    global_var.RX_BUF = "#000"
    if len(cmd) < 4:
        print("cmd ERROR")
        return -1
    # Only '@0NN...' frames are understood (device numbers have two digits)
    if cmd[0] != '@' or cmd[1] != '0':
        return None

    device = cmd[2:4]
    # Slices give '' when the reply is too short, so a truncated frame ends
    # up in the error branches below instead of raising IndexError.
    op = cmd[4:5]
    flag = cmd[5:6]

    # LED: @000 + 1 command character + 1 data character
    if device == '00':
        if flag == '1':
            print("LED Status change success")
        elif flag == '0':
            print("LED Status change failure")
        else:
            print("message ERROR")
    # SR501: the character at index 5 tells whether somebody was detected
    elif device == '01':
        if flag == '1':
            print("有人")
            window['SR501_O'].update('有人')
        elif flag == '0':
            print("无人")
            window['SR501_O'].update('无人')
        else:
            print("message ERROR")
    # SR04: up to three data characters, displayed with a "cm" suffix
    elif device == '02':
        if op == 'g':
            global_var.SR04_DATA = cmd[5:8]
            window['SR04_O'].update(f"{global_var.SR04_DATA}cm")
        else:
            print("SR04: message ERROR")
    # IRDA: one key-code character
    elif device == '03':
        if op == 'g':
            if len(cmd) > 5:
                irda_data_analysis(cmd)
                window['IRDA_O'].update(global_var.IRDA_DATA)
            else:
                print("IRDA: message ERROR")
    # Motor
    elif device == '04':
        if op == 's':
            print("MOTOR: message SCUESS")
        else:
            print("MOTOR: message ERROR")
    # DHT11: two characters of temperature followed by two of humidity
    elif device == '05':
        if op == 'g' and len(cmd) >= 9:
            global_var.TEM = cmd[5] + cmd[6]
            global_var.HUM = cmd[7] + cmd[8]
            window['DHT11_O'].update(f"{global_var.TEM}°C   {global_var.HUM}%")
        else:
            window['DHT11_O'].update("ERROR")
            print("DHT11: message ERROR")
    # DS18B20: two characters of temperature
    elif device == '06':
        if op == 'g' and len(cmd) >= 7:
            global_var.TEM = cmd[5] + cmd[6]
            window['DS18B20_O'].update(f"{global_var.TEM}°C")
        else:
            window['DS18B20_O'].update("ERROR")
            print("DS18B20: message ERROR")
    # AT24C02: 'g' carries the data read back, 'p' acknowledges a write
    elif device == '07':
        if op == 'g':
            window['AT24C02_O'].update(cmd[5:])
        elif op == 'p':
            window['AT24C02_I'].update(value='')
            print("AT24C02: write SUCCESS")
        else:
            window['AT24C02_O'].update("ERROR")
            print("AT24C02: message ERROR")
    # DAC over SPI: 'p' acknowledges that the value was set
    elif device == '08':
        if op == 'p':
            print("DAC: SUCCESS")
        else:
            print("DAC: message ERROR")
    return None

# 处理事件
# Client event loop
def event_handle(window, client_socket):
    """Run the GUI event loop and forward user actions to the server.

    Each iteration first lets ``cmd_handle()`` process the reply left in
    ``global_var.RX_BUF`` by the previous command, then waits for the next
    GUI event and sends the matching command.

    Args:
        window: Main GUI window created by ``show.show_window()``.
        client_socket: Connected socket used to talk to the server.

    Returns:
        Always 0, both after a normal exit and after an error.
    """
    # Next LED state to request; a boolean toggle cannot get out of step the
    # way a wrapping counter does (the counter sent 'p1' twice at the wrap).
    led_on = False
    # Event loop
    while True:
        try:
            cmd_handle(window)
            event, values = window.read()
            if event == 'input':
                # The check box enables or disables the free-text input
                window['txbuff'].update(disabled=not values['input'])
            elif event == 'send':
                send_handle(window, client_socket, values)
            elif event == 'Clean':
                window['Output'].update(value='')
                window['AT24C02_O'].update(value='')
            elif event == 'dht11':
                window['DHT11_O'].update(value=' ')
                set_tx_buf('dht11', 'g')
                send_cmd(client_socket)
            elif event == 'ds18b20':
                window['DS18B20_O'].update(value=' ')
                set_tx_buf('ds18b20', 'g')
                send_cmd(client_socket)
            elif event == 'Quit':
                quit_handel(client_socket)
                print(f"See you.............")
                break
            elif event is None:
                # The window was closed via its title bar
                print(f"xxxxxxxxxxxxxxxxxxxx")
                break
            elif event == 'LED':
                led_on = not led_on
                set_tx_buf('led', 'p1' if led_on else 'p0')
                send_cmd(client_socket)
            elif event == 'SR501':
                set_tx_buf('sr501','g')
                send_cmd(client_socket)
            elif event == 'SR04':
                set_tx_buf('sr04','g')
                send_cmd(client_socket)
            elif event == 'IRDA':
                set_tx_buf('irda','g')
                send_cmd(client_socket)
            elif event == 'MOTOR':
                motor_handel(window, client_socket, values)
            elif event == 'AT24C02_R':
                window['AT24C02_O'].update(value='')
                set_tx_buf('AT24C02','g')
                send_cmd(client_socket)
            elif event == 'AT24C02_W':
                global_var.AT24C02_DATA = values['AT24C02_I']
                set_tx_buf('AT24C02','p'+global_var.AT24C02_DATA)
                print(global_var.TX_BUF)
                window['AT24C02_I'].update(value='')
                send_cmd(client_socket)
            elif event == 'DAC':
                # The slider reports a number, not a string; send it as a
                # whole number so it can be appended to the command text.
                level = int(float(values['DAC']))
                set_tx_buf('dac', 'p' + str(level))
                send_cmd(client_socket)
            # handle other events here...
        except Exception as e:
            window.close()
            print(f"An error occurred: {e}")
            return 0
    window.close()
    return 0

def main():
    """Create the GUI, connect to the server and run the event loop.

    The event loop only starts when ``tcp.connect_to_server()`` returns a
    socket; otherwise the window is closed again and the function returns.
    """
    # Create the GUI window
    window = show.show_window('DefaultNoMoreNagging')
    # Try to connect to the server
    client_socket = tcp.connect_to_server()
    if client_socket is None:
        # No connection: release the window that was already created
        window.close()
        return
    event_handle(window, client_socket)

if __name__ == '__main__':
    main()

3、server编写

            /* Device 8 (DAC): only the 'p' command at cmd[4] is accepted. */
            case 8:
                printf("DAC!!!\n");
                if ('p' == cmd[4])
                {
                    direction = direction_put;
                    /* The value is taken from the characters starting at
                     * index 5 up to the end of cmd.
                     * NOTE(review): extract_digit_number() is defined
                     * elsewhere; presumably it converts those digits to an
                     * int - confirm. */
                    get_data = extract_digit_number(cmd, 5, (strlen(&cmd[5])));
                    /* Reply "@008p" when the driver call succeeds and
                     * "@008e" otherwise. */
                    if (NOERROR == dac_handle(&get_data))
                    {
                        tx_buffer = "@008p";
                    }
                    else
                    {
                        tx_buffer = "@008e";
                    }

                }
                else
                {
                    /* Any other command character is answered with an error. */
                    tx_buffer = "@008e";
                }
                
                /* Send the reply chosen above back on the accepted socket. */
                if (send(acceptfd, tx_buffer, strlen(tx_buffer), 0) < 0)
                {
                    perror("send failed");  
                }
                break;
            default:
                printf("Unknown equipment!!!\n");

4、driver_handle

/*
*author   : xintianyu
*function : Handle dac Settings
*           Converts a percentage (0~100) into a DAC code (at most 1023)
*           and writes it to the DAC device node.
*param    : data - pointer to the percentage; values outside 0~100 are
*           clamped. Must not be NULL.
*return   : NOERROR on success, ERROR when data is NULL, the device cannot
*           be opened or the value cannot be written.
*date     : 2024-4-24
-----------------------
author date  modify

*/
int dac_handle(int *data)
{
    /* TODO: the parameter is meant to become a void pointer for generic handling */
    /* TODO: turn the device path into a macro */
    const char *device = "/dev/cebss_dac";
    int ret = NOERROR;
    int percent;
    int code;
    /* The device is written with a 2-byte value, so keep the code in a
     * 16-bit variable instead of writing the first two bytes of an int. */
    unsigned short dac_val;
    int fd;

    if (data == NULL)
    {
        printf("dac_handle: data is NULL\n");
        return ERROR;
    }

    /* Open the device */
    fd = open(device, O_RDWR);
    if (fd == -1)
    {
        printf("can not open file %s\n", device);
        return ERROR;
    }

    /* Clamp the percentage so the code below stays in range */
    percent = *data;
    if (percent < 0)
    {
        percent = 0;
    }
    else if (percent > 100)
    {
        percent = 100;
    }

    /* Scale to the DAC range. The +1 offset is kept from the original
     * formula (NOTE(review): confirm it is intended); the result is capped
     * at 1023 because 100% would otherwise give 1024. */
    code = percent * 1023 / 100 + 1;
    if (code > 1023)
    {
        code = 1023;
    }
    dac_val = (unsigned short)code;
    printf("dac_val == %d\n", dac_val);

    /* Report a failed or short write instead of silently returning success */
    if (write(fd, &dac_val, sizeof(dac_val)) != (ssize_t)sizeof(dac_val))
    {
        printf("can not write file %s\n", device);
        ret = ERROR;
    }

    close(fd);
    return ret;
}

5、test

非常的丝滑。

6、 项目管理