大家好,我是jobleap.cn的小九。
psycopg2-binary 是 Python 连接 PostgreSQL 数据库的核心库(psycopg2的预编译二进制版本,无需编译依赖,开箱即用),本文将从环境准备、核心 API 讲解到实战案例,全面串联其常用用法,帮助你掌握 PostgreSQL 数据库的 Python 操作全流程。
一、环境准备
1. 安装 psycopg2-binary
使用 pip 快速安装(推荐 Python 3.6+ 版本):
# 安装最新版pip install psycopg2-binary# 安装指定版本(如适配特定 PostgreSQL 版本)pip install psycopg2-binary==2.9.92. 前置条件
- 已安装并启动 PostgreSQL 服务(本地/远程);
- 拥有可访问的 PostgreSQL 数据库、用户名和密码;
- 确保目标数据库端口(默认 5432)未被防火墙拦截。
二、核心概念与基础 API
psycopg2 的核心操作围绕连接(Connection)和游标(Cursor)展开:
Connection:负责与 PostgreSQL 数据库建立网络连接,管理事务;Cursor:基于连接创建的操作句柄,用于执行 SQL 语句、获取查询结果。
1. 数据库连接(connect())
psycopg2.connect()是创建数据库连接的核心函数,支持通过参数或 DSN 字符串传参,常用参数如下:
| 参数 | 说明 | 默认值 |
|---|---|---|
| host | 数据库服务器地址 | localhost |
| port | 数据库端口 | 5432 |
| dbname/database | 目标数据库名 | - |
| user | 数据库用户名 | 当前系统用户 |
| password | 数据库密码 | - |
| sslmode | SSL 连接模式(如 require) | disable |
基础连接示例:
importpsycopg2frompsycopg2importOperationalErrordefcreate_connection(db_name,db_user,db_password,db_host,db_port):"""创建数据库连接并返回 Connection 对象"""connection=Nonetry:connection=psycopg2.connect(database=db_name,user=db_user,password=db_password,host=db_host,port=db_port,)print("PostgreSQL 连接成功 ✅")exceptOperationalErrorase:print(f"连接失败 ❌:{e}")returnconnection# 替换为你的数据库信息conn=create_connection(db_name="test_db",db_user="postgres",db_password="123456",db_host="localhost",db_port="5432")2. 游标创建与 SQL 执行(cursor()/execute())
创建连接后,需通过conn.cursor()创建游标,再用游标执行 SQL 语句:
cursor.execute(sql, params):执行单条 SQL 语句(支持参数化查询);cursor.executemany(sql, params_list):批量执行相同结构的 SQL 语句;psycopg2.extras.execute_batch(cursor, sql, params_list, page_size=100):高性能批量执行(推荐替代executemany)。
(1)创建数据表示例
defcreate_table(connection):"""创建用户表(users)"""create_table_query=""" CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, name VARCHAR(50) NOT NULL, age INT, email VARCHAR(100) UNIQUE, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); """try:# 创建游标cursor=connection.cursor()# 执行 SQLcursor.execute(create_table_query)# 提交事务(psycopg2 默认关闭自动提交,必须手动提交)connection.commit()print("数据表创建成功 ✅")exceptExceptionase:print(f"创建表失败 ❌:{e}")# 异常时回滚事务connection.rollback()finally:# 关闭游标(避免资源泄漏)cursor.close()# 调用创建表函数ifconn:create_table(conn)(2)参数化查询(防 SQL 注入)
关键:psycopg2 使用%s作为占位符(而非 Python 的{}或%),参数需以元组/列表传入。
definsert_single_user(connection,name,age,email):"""插入单条用户数据(参数化查询)"""insert_query=""" INSERT INTO users (name, age, email) VALUES (%s, %s, %s) ON CONFLICT (email) DO NOTHING; # 避免重复插入 """try:cursor=connection.cursor()# 传入参数(元组形式)cursor.execute(insert_query,(name,age,email))connection.commit()print(f"插入用户{name}成功 ✅")exceptExceptionase:print(f"插入失败 ❌:{e}")connection.rollback()finally:cursor.close()# 插入单条数据ifconn:insert_single_user(conn,"张三",25,"zhangsan@example.com")(3)批量插入数据
frompsycopg2importextrasdefbatch_insert_users(connection,users_list):"""批量插入用户数据(高性能版)"""insert_query=""" INSERT INTO users (name, age, email) VALUES (%s, %s, %s) ON CONFLICT (email) DO NOTHING; """try:cursor=connection.cursor()# 高性能批量执行(page_size 控制每次批量提交的条数)extras.execute_batch(cursor,insert_query,users_list,page_size=100)connection.commit()print(f"批量插入{len(users_list)}条数据成功 ✅")exceptExceptionase:print(f"批量插入失败 ❌:{e}")connection.rollback()finally:cursor.close()# 批量插入示例数据ifconn:users_data=[("李四",28,"lisi@example.com"),("王五",30,"wangwu@example.com"),("赵六",22,"zhaoliu@example.com")]batch_insert_users(conn,users_data)3. 数据查询(fetchone()/fetchmany()/fetchall())
执行查询类 SQL(SELECT)后,需通过游标获取结果:
cursor.fetchone():获取下一条结果(返回元组,无数据时返回 None);cursor.fetchmany(size):获取指定条数的结果(返回列表,元素为元组);cursor.fetchall():获取所有剩余结果(返回列表,元素为元组);cursor.rowcount:返回受上一条 SQL 影响的行数(查询时为匹配的行数)。
查询示例:
defquery_users(connection,age_min=0):"""查询年龄大于等于 age_min 的用户"""query=""" SELECT id, name, age, email, create_time FROM users WHERE age >= %s; """try:cursor=connection.cursor()cursor.execute(query,(age_min,))# 方式1:获取单条数据# single_user = cursor.fetchone()# if single_user:# print("单条结果:", single_user)# 方式2:获取指定条数(如2条)# partial_users = cursor.fetchmany(2)# print("部分结果:", partial_users)# 方式3:获取所有结果all_users=cursor.fetchall()print(f"\n查询到{cursor.rowcount}条符合条件的用户:")foruserinall_users:# 解析元组(id, name, age, email, create_time)print(f"ID:{user[0]}, 姓名:{user[1]}, 年龄:{user[2]}, 邮箱:{user[3]}, 创建时间:{user[4]}")exceptExceptionase:print(f"查询失败 ❌:{e}")finally:cursor.close()# 查询年龄≥25的用户ifconn:query_users(conn,age_min=25)4. 数据更新与删除
更新/删除操作与插入逻辑一致,需注意事务提交和参数化:
defupdate_user_age(connection,email,new_age):"""根据邮箱更新用户年龄"""update_query=""" UPDATE users SET age = %s WHERE email = %s; """try:cursor=connection.cursor()cursor.execute(update_query,(new_age,email))connection.commit()ifcursor.rowcount>0:print(f"更新{email}的年龄为{new_age}成功 ✅")else:print(f"未找到邮箱为{email}的用户 ❌")exceptExceptionase:print(f"更新失败 ❌:{e}")connection.rollback()finally:cursor.close()defdelete_user(connection,user_id):"""根据ID删除用户"""delete_query="DELETE FROM users WHERE id = %s;"try:cursor=connection.cursor()cursor.execute(delete_query,(user_id,))connection.commit()ifcursor.rowcount>0:print(f"删除ID为{user_id}的用户成功 ✅")else:print(f"未找到ID为{user_id}的用户 ❌")exceptExceptionase:print(f"删除失败 ❌:{e}")connection.rollback()finally:cursor.close()# 执行更新和删除ifconn:update_user_age(conn,"zhangsan@example.com",26)delete_user(conn,3)# 删除ID为3的用户query_users(conn)# 重新查询验证结果5. 事务管理(commit()/rollback())
psycopg2 默认关闭「自动提交」模式,所有修改类操作(INSERT/UPDATE/DELETE/CREATE)都需要手动调用conn.commit()确认;若执行过程中出现异常,需调用conn.rollback()回滚事务,避免数据不一致。
事务回滚示例:
deftest_transaction(connection):"""测试事务回滚"""try:cursor=connection.cursor()# 第一步:插入数据cursor.execute("INSERT INTO users (name, age, email) VALUES (%s, %s, %s)",("测试用户",99,"test@example.com"))# 第二步:故意触发错误(比如插入重复邮箱)cursor.execute("INSERT INTO users (name, age, email) VALUES (%s, %s, %s)",("重复用户",88,"zhangsan@example.com"))# 无异常则提交connection.commit()exceptExceptionase:print(f"事务执行失败,触发回滚 ❌:{e}")connection.rollback()# 回滚所有未提交的操作finally:cursor.close()# 测试事务回滚(最终 "测试用户" 不会被插入)ifconn:test_transaction(conn)query_users(conn)6. 类型转换(PostgreSQL ↔ Python)
psycopg2 会自动完成 PostgreSQL 类型与 Python 类型的转换,常用映射关系如下:
| PostgreSQL 类型 | Python 类型 |
|---|---|
| INT/SERIAL | int |
| VARCHAR/TEXT | str |
| TIMESTAMP/DATE | datetime.datetime/date |
| BOOLEAN | bool |
| ARRAY | list |
| JSON/JSONB | dict/list(需导入 extras) |
JSON 类型操作示例:
frompsycopg2.extrasimportJsondeftest_json_type(connection):"""测试 JSON 类型字段操作"""# 1. 先添加 JSON 字段alter_query="ALTER TABLE users ADD COLUMN IF NOT EXISTS info JSONB;"# 2. 更新 JSON 数据update_query="UPDATE users SET info = %s WHERE email = %s;"try:cursor=connection.cursor()cursor.execute(alter_query)# 传入 Python 字典(通过 Json 封装)user_info={"hobby":["篮球","编程"],"address":"北京市"}cursor.execute(update_query,(Json(user_info),"zhangsan@example.com"))connection.commit()# 3. 查询 JSON 字段cursor.execute("SELECT name, info FROM users WHERE email = %s;",("zhangsan@example.com",))result=cursor.fetchone()print(f"\nJSON 字段查询结果:")print(f"姓名:{result[0]}, 信息:{result[1]}")print(f"提取 hobby:{result[1]['hobby']}")# 直接按字典访问exceptExceptionase:print(f"JSON 操作失败 ❌:{e}")connection.rollback()finally:cursor.close()ifconn:test_json_type(conn)7. 连接池(生产环境必备)
频繁创建/关闭连接会消耗大量资源,生产环境建议使用连接池(psycopg2.pool)复用连接:
frompsycopg2importpool# 创建连接池(最小1个,最大5个连接)connection_pool=pool.SimpleConnectionPool(minconn=1,maxconn=5,database="test_db",user="postgres",password="123456",host="localhost",port="5432")defuse_pooled_connection():"""使用连接池获取连接"""# 从池获取连接conn=connection_pool.getconn()ifconn:print("\n从连接池获取连接成功 ✅")query_users(conn)# 归还连接到池(不是关闭)connection_pool.putconn(conn)# 测试连接池use_pooled_connection()# 关闭连接池(程序退出时)connection_pool.closeall()8. 资源释放
使用完连接和游标后,必须关闭以释放资源,推荐通过finally块确保执行:
# 最终关闭连接(非连接池场景)ifconn:try:conn.close()print("\n数据库连接已关闭 ✅")exceptExceptionase:print(f"关闭连接失败 ❌:{e}")三、完整实战脚本(串联所有 API)
以下脚本整合了上述所有常用操作,可直接运行(需替换数据库信息):
importpsycopg2frompsycopg2importOperationalError,extrasfrompsycopg2.extrasimportJsonfrompsycopg2importpool# 1. 创建数据库连接(或连接池)defcreate_connection(db_name,db_user,db_password,db_host,db_port):connection=Nonetry:connection=psycopg2.connect(database=db_name,user=db_user,password=db_password,host=db_host,port=db_port,)print("PostgreSQL 连接成功 ✅")exceptOperationalErrorase:print(f"连接失败 ❌:{e}")returnconnection# 2. 创建数据表defcreate_table(connection):create_table_query=""" CREATE TABLE IF NOT EXISTS users ( id SERIAL PRIMARY KEY, name VARCHAR(50) NOT NULL, age INT, email VARCHAR(100) UNIQUE, create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, info JSONB ); """try:cursor=connection.cursor()cursor.execute(create_table_query)connection.commit()print("数据表创建成功 ✅")exceptExceptionase:print(f"创建表失败 ❌:{e}")connection.rollback()finally:cursor.close()# 3. 插入/更新/删除/查询definsert_user(connection,user_data):insert_query="INSERT INTO users (name, age, email) VALUES (%s, %s, %s) ON CONFLICT (email) DO NOTHING;"try:cursor=connection.cursor()extras.execute_batch(cursor,insert_query,user_data,page_size=100)connection.commit()print(f"批量插入{len(user_data)}条数据成功 ✅")exceptExceptionase:print(f"插入失败 ❌:{e}")connection.rollback()finally:cursor.close()defupdate_user_info(connection,email,info):update_query="UPDATE users SET info = %s WHERE email = %s;"try:cursor=connection.cursor()cursor.execute(update_query,(Json(info),email))connection.commit()print(f"更新{email}的扩展信息成功 ✅")exceptExceptionase:print(f"更新失败 ❌:{e}")connection.rollback()finally:cursor.close()defquery_users(connection,age_min=0):query="SELECT id, name, age, email, info FROM users WHERE age >= %s;"try:cursor=connection.cursor()cursor.execute(query,(age_min,))all_users=cursor.fetchall()print(f"\n查询到{cursor.rowcount}条用户数据:")foruserinall_users:print(f"ID:{user[0]}, 姓名:{user[1]}, 年龄:{user[2]}, 邮箱:{user[3]}, 扩展信息:{user[4]}")exceptExceptionase:print(f"查询失败 ❌:{e}")finally:cursor.close()defdelete_user(connection,user_id):delete_query="DELETE FROM users WHERE id = %s;"try:cursor=connection.cursor()cursor.execute(delete_query,(user_id,))connection.commit()print(f"删除ID为{user_id}的用户{'成功'ifcursor.rowcount>0else'失败'}✅")exceptExceptionase:print(f"删除失败 ❌:{e}")connection.rollback()finally:cursor.close()# 主流程if__name__=="__main__":# 替换为你的数据库信息DB_CONFIG={"db_name":"test_db","db_user":"postgres","db_password":"123456","db_host":"localhost","db_port":"5432"}# 创建连接conn=create_connection(**DB_CONFIG)ifnotconn:exit(1)# 执行核心操作create_table(conn)insert_user(conn,[("张三",25,"zhangsan@example.com"),("李四",28,"lisi@example.com"),("王五",30,"wangwu@example.com")])update_user_info(conn,"zhangsan@example.com",{"hobby":["篮球","编程"],"address":"北京市"})query_users(conn,age_min=25)delete_user(conn,3)query_users(conn,age_min=25)# 关闭连接ifconn:conn.close()print("\n数据库连接已关闭 ✅")四、常见问题与注意事项
- SQL 注入风险:严禁拼接 SQL 字符串,必须使用
%s占位符传参; - 编码问题:PostgreSQL 默认编码为 UTF8,Python 脚本需确保编码一致;
- 连接超时:远程连接时需设置
connect_timeout参数(如connect(..., connect_timeout=10)); - 大结果集处理:避免使用
fetchall(),改用fetchone()或fetchmany()分批读取,防止内存溢出; - 版本兼容:psycopg2-binary 版本需与 PostgreSQL 服务版本适配(如 2.9.x 适配 PostgreSQL 12+)。
五、总结
psycopg2-binary 的核心流程可总结为:创建连接 → 创建游标 → 执行 SQL → 处理结果 → 提交/回滚事务 → 释放资源
掌握connect()、cursor()、execute()、commit()、fetch*()等核心 API,结合参数化查询、事务管理和连接池,即可安全、高效地实现 PostgreSQL 数据库的增删改查。生产环境中还需注意异常捕获、资源释放和性能优化(如批量操作、索引设计),确保系统稳定运行。