081. 编写一个函数,实现简单的数据库服务器功能

实现一个简单的数据库服务器功能,可以使用 Python 的 socket 模块来处理网络通信,同时使用 sqlite3 模块来管理数据库。以下是一个简单的示例,展示如何创建一个基于 TCP 的数据库服务器,该服务器可以接收客户端的查询请求并返回结果。

示例代码

以下代码实现了一个简单的数据库服务器,使用 SQLite 作为后端数据库。服务器可以接收客户端的 SQL 查询请求并返回查询结果。

import socket
import sqlite3
import threading

# 数据库服务器类
class SimpleDatabaseServer:
    def __init__(self, host='127.0.0.1', port=65432, db_name='example.db'):
        self.host = host
        self.port = port
        self.db_name = db_name
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen()
        print(f"数据库服务器启动,监听地址 {self.host}:{self.port}")

    def start(self):
        try:
            while True:
                conn, addr = self.server_socket.accept()
                print(f"连接来自 {addr}")
                client_thread = threading.Thread(target=self.handle_client, args=(conn, addr))
                client_thread.start()
        except KeyboardInterrupt:
            print("服务器关闭")
        finally:
            self.server_socket.close()

    def handle_client(self, conn, addr):
        with conn:
            while True:
                data = conn.recv(1024)
                if not data:
                    break
                query = data.decode('utf-8').strip()
                print(f"来自 {addr} 的查询:{query}")
                response = self.execute_query(query)
                conn.sendall(response.encode('utf-8'))

    def execute_query(self, query):
        try:
            with sqlite3.connect(self.db_name) as conn:
                cursor = conn.cursor()
                cursor.execute(query)
                if query.lower().startswith("select"):
                    result = cursor.fetchall()
                    return str(result)
                else:
                    conn.commit()
                    return "操作成功"
        except sqlite3.Error as e:
            return f"错误:{e}"

# 主函数
def main():
    server = SimpleDatabaseServer()
    server.start()

# 示例用法
if __name__ == "__main__":
    main()

代码说明

数据库服务器初始化

  • 使用 socket.socket 创建一个 TCP 套接字。

  • 使用 bind 方法绑定到指定的地址和端口。

  • 使用 listen 方法开始监听连接请求。

启动服务器

  • 使用 accept 方法接受客户端的连接。

  • 为每个客户端创建一个线程来处理请求。

处理客户端请求

  • 使用 recv 方法接收客户端发送的查询请求。

  • 调用 execute_query 方法执行查询。

  • 将查询结果发送回客户端。

执行查询

  • 使用 sqlite3.connect 连接到 SQLite 数据库。

  • 使用 cursor.execute 执行 SQL 查询。

  • 如果是 SELECT 查询,返回查询结果;否则返回操作成功或错误信息。

异常处理:捕获 sqlite3.Error 异常,返回错误信息。

示例输出

假设运行上述代码,服务器将启动并监听指定的地址和端口。当客户端连接并发送 SQL 查询请求时,服务器将执行查询并返回结果。

注意事项

  1. 多线程处理:使用 threading 模块为每个客户端创建一个线程,以支持多个客户端同时连接。
  2. 安全性:在实际应用中,需要对客户端的查询进行验证,避免 SQL 注入攻击。
  3. 错误处理:在实际应用中,建议添加更详细的异常处理机制,以处理网络错误、数据库错误等情况。
  4. 停止服务器:在实际应用中,可以设置一个条件变量或信号处理机制,以便在需要时停止服务器。

扩展功能

  1. 支持更多数据库:使用其他数据库(如 MySQL、PostgreSQL)代替 SQLite。
  2. 协议支持:实现更复杂的通信协议,例如支持 JSON 格式的请求和响应。
  3. 身份验证和授权:添加用户身份验证和授权机制,确保只有授权用户可以访问数据库。

视频讲解

BiliBili: 视睿网络-哔哩哔哩视频 (bilibili.com)