081. 编写一个函数,实现简单的数据库服务器功能
实现一个简单的数据库服务器功能,可以使用 Python 的 socket
模块来处理网络通信,同时使用 sqlite3
模块来管理数据库。以下是一个简单的示例,展示如何创建一个基于 TCP 的数据库服务器,该服务器可以接收客户端的查询请求并返回结果。
示例代码
以下代码实现了一个简单的数据库服务器,使用 SQLite 作为后端数据库。服务器可以接收客户端的 SQL 查询请求并返回查询结果。
import socket
import sqlite3
import threading
# 数据库服务器类
class SimpleDatabaseServer:
def __init__(self, host='127.0.0.1', port=65432, db_name='example.db'):
self.host = host
self.port = port
self.db_name = db_name
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen()
print(f"数据库服务器启动,监听地址 {self.host}:{self.port}")
def start(self):
try:
while True:
conn, addr = self.server_socket.accept()
print(f"连接来自 {addr}")
client_thread = threading.Thread(target=self.handle_client, args=(conn, addr))
client_thread.start()
except KeyboardInterrupt:
print("服务器关闭")
finally:
self.server_socket.close()
def handle_client(self, conn, addr):
with conn:
while True:
data = conn.recv(1024)
if not data:
break
query = data.decode('utf-8').strip()
print(f"来自 {addr} 的查询:{query}")
response = self.execute_query(query)
conn.sendall(response.encode('utf-8'))
def execute_query(self, query):
try:
with sqlite3.connect(self.db_name) as conn:
cursor = conn.cursor()
cursor.execute(query)
if query.lower().startswith("select"):
result = cursor.fetchall()
return str(result)
else:
conn.commit()
return "操作成功"
except sqlite3.Error as e:
return f"错误:{e}"
# 主函数
def main():
server = SimpleDatabaseServer()
server.start()
# 示例用法
if __name__ == "__main__":
main()
代码说明
数据库服务器初始化:
-
使用
socket.socket
创建一个 TCP 套接字。 -
使用
bind
方法绑定到指定的地址和端口。 -
使用
listen
方法开始监听连接请求。
启动服务器:
-
使用
accept
方法接受客户端的连接。 -
为每个客户端创建一个线程来处理请求。
处理客户端请求:
-
使用
recv
方法接收客户端发送的查询请求。 -
调用
execute_query
方法执行查询。 -
将查询结果发送回客户端。
执行查询:
-
使用
sqlite3.connect
连接到 SQLite 数据库。 -
使用
cursor.execute
执行 SQL 查询。 -
如果是
SELECT
查询,返回查询结果;否则返回操作成功或错误信息。
异常处理:捕获 sqlite3.Error
异常,返回错误信息。
示例输出
假设运行上述代码,服务器将启动并监听指定的地址和端口。当客户端连接并发送 SQL 查询请求时,服务器将执行查询并返回结果。
注意事项
- 多线程处理:使用
threading
模块为每个客户端创建一个线程,以支持多个客户端同时连接。 - 安全性:在实际应用中,需要对客户端的查询进行验证,避免 SQL 注入攻击。
- 错误处理:在实际应用中,建议添加更详细的异常处理机制,以处理网络错误、数据库错误等情况。
- 停止服务器:在实际应用中,可以设置一个条件变量或信号处理机制,以便在需要时停止服务器。
扩展功能
- 支持更多数据库:使用其他数据库(如 MySQL、PostgreSQL)代替 SQLite。
- 协议支持:实现更复杂的通信协议,例如支持 JSON 格式的请求和响应。
- 身份验证和授权:添加用户身份验证和授权机制,确保只有授权用户可以访问数据库。
视频讲解
BiliBili: 视睿网络-哔哩哔哩视频 (bilibili.com)