python | redis 常用操作类
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
'''
@文件 : redis_util.py
@时间 : 2021/11/02 14:19:30
@作者 : ycfarmer
@版本 : 0.1
@说明 : redis python 操作类
'''
# 导入模块
from redis import Redis,StrictRedis,ConnectionPool
import time
# 主类
class REDIS(object):
def __init__(self,kwargs):
self.host = kwargs.get('host')
self.port= kwargs.get('port')
self.db = kwargs.get('db')
self.username = kwargs.get('username')
self.password = kwargs.get('password')
# print(self.port)
pool = ConnectionPool(host=self.host,port=self.port,db=self.db,password=self.password)
self.r = StrictRedis(connection_pool=pool)
# 向集合添加元素
def sadd(self, key, val):
return self.r.sadd(key,val)
def srem(self,key,val):
return self.r.srem(key,val)
# 判断是否存在
def sismember(self, key,val):
return self.r.sismember(key,val)
# 删除sadd集合
def sadd_del(self,key):
return self.r.delete(key)
# 有序集合
def zadd(self,key,val):
return self.r.zadd(key,val)
# 字符串加1
def incr(self,key,val):
return self.r.incr(key,val)
def zcard(self,key):
return self.r.zcard(key)
def keys(self,key):
return self.r.keys(key)
def get_key_count(self,key):
scan_cmd_exec_times = 0
# count = 0
cur = 0
key_count = 0
while True:
if scan_cmd_exec_times > 0 and scan_cmd_exec_times % 100 == 0:
# 每执行100条scan,sleep 0.1秒,防止ops大幅抬升
time.sleep(0.1)
cur, user_match_sets = self.r.scan(cursor=cur, match=f'{key}', count=1000)
scan_cmd_exec_times += 1
if cur == 0:
break
else:
key_count += len(user_match_sets)
# print(f"[{key}]扫出:{key_count} 个,{cur}")
return key_count
复制
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。
评论
目录