# Smart socket
import sqlite3
class SmartSocket:
    """Eight-way smart socket whose switch states are persisted in SQLite.

    Every switch is one row of the ``socket_switches`` table, addressed by
    ``(socket_name, switch_index)``. An instance owns one open connection
    and an in-memory copy (``switches``) of one socket's switches.

    Attributes:
        db: Open ``sqlite3`` connection to ``DB_FILE``.
        name: Name of the socket loaded through
            ``_load_or_create_switches``; ``None`` until one is loaded.
        description: Description of the loaded socket ("" until loaded).
        switches: List of ``{"state": bool, "remark": str}`` dicts, one
            per switch.
    """

    DB_FILE = "smart_socket.db"
    SWITCH_COUNT = 8

    _CREATE_TABLE_SQL = '''
        CREATE TABLE IF NOT EXISTS socket_switches (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            socket_name TEXT,
            socket_description TEXT,
            switch_index INTEGER,
            state INTEGER,
            switch_remark TEXT
        )
    '''
    _INSERT_SWITCH_SQL = '''
        INSERT INTO socket_switches
            (socket_name, socket_description, switch_index, state, switch_remark)
        VALUES (?, ?, ?, ?, ?)
    '''

    def __init__(self):
        """Open the database, create the table if needed, use default switches."""
        self.db = sqlite3.connect(SmartSocket.DB_FILE)
        # Always defined so get_status()/set_remark() never hit AttributeError.
        self.name = None
        self.description = ""
        self.switches = self._default_switches()
        self._create_table()

    def __enter__(self):
        """Allow ``with SmartSocket() as s:``; the connection closes on exit."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the connection; exceptions are not suppressed."""
        self.close()

    @staticmethod
    def _default_switches():
        """Return a fresh list of switches: all off, remarks 开关1..开关N."""
        return [{"state": False, "remark": f"开关{i+1}"}
                for i in range(SmartSocket.SWITCH_COUNT)]

    @staticmethod
    def _open():
        """Open a new connection to ``DB_FILE`` with the table guaranteed to exist."""
        db = sqlite3.connect(SmartSocket.DB_FILE)
        try:
            db.execute(SmartSocket._CREATE_TABLE_SQL)
            db.commit()
        except Exception:
            # Do not leak the handle when the schema cannot be created.
            db.close()
            raise
        return db

    def _create_table(self) -> None:
        """Create the ``socket_switches`` table on ``self.db`` if it is missing."""
        self.db.execute(self._CREATE_TABLE_SQL)
        self.db.commit()

    def _mirrors(self, socket_name) -> bool:
        """Return True when the in-memory switches stand for *socket_name*.

        Before any socket is loaded (``name is None``) the in-memory list
        follows whatever socket is being operated on.
        """
        return self.name is None or self.name == socket_name

    def _load_or_create_switches(self, name, description) -> None:
        """Load socket *name* from the database, or create it if absent.

        When rows exist, ``switches`` and ``description`` are replaced by the
        stored values (the *description* argument is ignored). Otherwise the
        current in-memory switches are inserted under *name* with
        *description*. In both cases ``self.name`` becomes *name*.
        """
        rows = self.db.execute('''
            SELECT switch_index, state, switch_remark, socket_description
            FROM socket_switches
            WHERE socket_name = ?
            ORDER BY switch_index
        ''', (name,)).fetchall()
        if rows:
            self.switches = [{"state": bool(state), "remark": remark}
                             for _, state, remark, _ in rows]
            self.description = rows[0][3]  # description stored in the database
        else:
            # Do not seed a new socket with another socket's states/remarks.
            if self.name is not None and self.name != name:
                self.switches = self._default_switches()
            # One transaction: either all switches are inserted or none.
            with self.db:
                self.db.executemany(
                    self._INSERT_SWITCH_SQL,
                    [(name, description, i, int(sw["state"]), sw["remark"])
                     for i, sw in enumerate(self.switches)],
                )
            self.description = description
        self.name = name

    def _update_switch(self, socket_name, index, state, remark) -> None:
        """Overwrite both state and remark of one switch row in the database."""
        with self.db:
            self.db.execute('''
                UPDATE socket_switches
                SET state = ?, switch_remark = ?
                WHERE socket_name = ? AND switch_index = ?
            ''', (int(state), remark, socket_name, index))

    def _update_desciption_remark(self, socket_name, socket_description, index, remark) -> None:
        """Set the description and remark stored on one switch row.

        Only the row ``(socket_name, index)`` is updated. When *socket_name*
        is the loaded socket, the in-memory description/remark follow.
        """
        with self.db:
            self.db.execute('''
                UPDATE socket_switches
                SET socket_description = ?, switch_remark = ?
                WHERE socket_name = ? AND switch_index = ?
            ''', (socket_description, remark, socket_name, index))
        if self.name is not None and self.name == socket_name:
            self.description = socket_description
            # index is not validated here, so guard the in-memory write.
            if isinstance(index, int) and 0 <= index < len(self.switches):
                self.switches[index]["remark"] = remark

    def _write_states(self, socket_name, indices, state) -> None:
        """Persist *state* for the given switch indices in a single commit.

        Only the ``state`` column is written, so remarks stored in the
        database are never replaced by in-memory defaults.
        """
        indices = list(indices)
        with self.db:
            self.db.executemany(
                'UPDATE socket_switches SET state = ? '
                'WHERE socket_name = ? AND switch_index = ?',
                [(int(state), socket_name, i) for i in indices],
            )
        # Update memory only after the database write succeeded.
        if self._mirrors(socket_name):
            for i in indices:
                self.switches[i]["state"] = bool(state)

    def turn_on(self, socket_name, index) -> None:
        """Switch on switch *index* of *socket_name*.

        Raises:
            ValueError: if *index* is outside ``0..SWITCH_COUNT-1``.
        """
        self._validate_index(index)
        self._write_states(socket_name, [index], True)

    def turn_on_socket(self, socket_name) -> None:
        """Switch on every switch of *socket_name*."""
        self._write_states(socket_name, range(self.SWITCH_COUNT), True)

    def turn_off(self, socket_name, index) -> None:
        """Switch off switch *index* of *socket_name*.

        Raises:
            ValueError: if *index* is outside ``0..SWITCH_COUNT-1``.
        """
        self._validate_index(index)
        self._write_states(socket_name, [index], False)

    def turn_off_socket(self, socket_name) -> None:
        """Switch off every switch of *socket_name*."""
        self._write_states(socket_name, range(self.SWITCH_COUNT), False)

    def toggle(self, socket_name, index) -> None:
        """Invert switch *index* of *socket_name*.

        The current state is read from the database so that a fresh instance
        toggles the persisted value rather than the in-memory default. If
        the row does not exist, the in-memory state is used instead.

        Raises:
            ValueError: if *index* is outside ``0..SWITCH_COUNT-1``.
        """
        self._validate_index(index)
        row = self.db.execute(
            'SELECT state FROM socket_switches '
            'WHERE socket_name = ? AND switch_index = ?',
            (socket_name, index),
        ).fetchone()
        current = bool(row[0]) if row is not None else self.switches[index]["state"]
        self._write_states(socket_name, [index], not current)

    def set_remark(self, index, remark, socket_name=None) -> None:
        """Set the remark of switch *index*.

        Args:
            index: Zero-based switch index.
            remark: New remark text.
            socket_name: Socket to update; defaults to the loaded socket.

        Raises:
            ValueError: if *index* is out of range, or if no socket name is
                given and no socket has been loaded.
        """
        self._validate_index(index)
        target = self.name if socket_name is None else socket_name
        if target is None:
            raise ValueError("尚未加载插座,请先加载插座或指定 socket_name")
        # Touch only the remark so the persisted state is left alone.
        with self.db:
            self.db.execute(
                'UPDATE socket_switches SET switch_remark = ? '
                'WHERE socket_name = ? AND switch_index = ?',
                (remark, target, index),
            )
        if self._mirrors(target):
            self.switches[index]["remark"] = remark

    def get_status(self) -> str:
        """Return a multi-line report of the in-memory socket and its switches."""
        lines = [f"{i+1}: {'开' if sw['state'] else '关'} - 备注: {sw['remark']}"
                 for i, sw in enumerate(self.switches)]
        return f"插座:{self.name}\n备注:{self.description}\n开关状态:\n" + "\n".join(lines)

    def _validate_index(self, index) -> None:
        """Raise ``ValueError`` unless ``0 <= index < SWITCH_COUNT``."""
        if not 0 <= index < self.SWITCH_COUNT:
            raise ValueError(f"开关索引必须在 0 到 {self.SWITCH_COUNT - 1} 之间")

    def close(self) -> None:
        """Close the instance's database connection."""
        self.db.close()

    # Static method: list all sockets
    @staticmethod
    def list_all_sockets():
        """Return the distinct socket names stored in the database."""
        db = SmartSocket._open()
        try:
            rows = db.execute(
                'SELECT DISTINCT socket_name FROM socket_switches'
            ).fetchall()
        finally:
            db.close()
        return [row[0] for row in rows]

    @staticmethod
    def list_all_sockets_with_status() -> None:
        """Print every stored switch row, ordered by socket name and index.

        Prints a notice instead when the table is empty. Returns ``None``.
        """
        db = SmartSocket._open()
        try:
            rows = db.execute('''
                SELECT socket_name, socket_description, switch_index, state, switch_remark
                FROM socket_switches
                ORDER BY socket_name, switch_index
            ''').fetchall()
        finally:
            db.close()
        if not rows:
            print("数据库中没有插座记录。")
            return
        for name, desc, idx, state, remark in rows:
            print(name, desc, idx, state, remark)

    # Static method: delete one socket
    @staticmethod
    def delete_socket(socket_name: str) -> None:
        """Delete every switch row belonging to *socket_name*."""
        db = SmartSocket._open()
        try:
            with db:
                db.execute(
                    'DELETE FROM socket_switches WHERE socket_name = ?',
                    (socket_name,),
                )
        finally:
            db.close()

    @staticmethod
    def add_socket(name: str, description: str = "") -> None:
        """Insert a socket with default (off) switches unless *name* exists.

        Prints whether the socket was added or was already present.
        """
        db = SmartSocket._open()
        try:
            # Skip the insert when the socket is already present.
            exists = db.execute(
                'SELECT 1 FROM socket_switches WHERE socket_name = ?', (name,)
            ).fetchone()
            if exists:
                print(f"插座“{name}”已存在,未重复添加。")
                return
            # Insert the default switches atomically.
            with db:
                db.executemany(
                    SmartSocket._INSERT_SWITCH_SQL,
                    [(name, description, i, 0, f"开关{i+1}")
                     for i in range(SmartSocket.SWITCH_COUNT)],
                )
        finally:
            db.close()
        print(f"插座“{name}”添加成功。")
# Example usage
if __name__ == "__main__":
    # Open the database connection (the table is created if it is missing).
    socket1 = SmartSocket()
    try:
        # Create or load a socket:
        # socket1._load_or_create_switches("餐厅插座", "供电视机使用")
        # socket1.turn_off_socket("客厅插座")
        # socket1.turn_on("客厅插座", 1)
        # print(socket1.list_all_sockets())
        # socket1._update_desciption_remark("客厅插座", "Huawei-100", 2, "QiLin")

        # Dump every stored switch row to stdout.
        socket1.list_all_sockets_with_status()

        # NOTE: the original author marked toggle() as not working.
        # socket1.toggle("客厅插座", 3)
        # socket1.set_remark(0, "电视电源")
        # socket1.turn_on("客厅插座", 0)
        # print(socket1.get_status())

        # List all sockets:
        # print("\n当前所有插座名称:")
        # for name in SmartSocket.list_all_sockets():
        #     print("-", name)

        # Delete a socket (uncomment to run):
        # SmartSocket.delete_socket("客厅插座")
    finally:
        # Always release the connection, even if the demo code raises.
        socket1.close()