The official Python client for Redis is redis-py. It provides a complete interface to Redis, including support for all Redis data types, commands, and advanced features like pipelining, pub/sub, and Redis modules.

Installation

Install redis-py using pip:
pip install redis
For production applications, consider installing the hiredis extra (pip install "redis[hiredis]") for improved performance. When hiredis is available, redis-py uses its C parser, which can significantly speed up response parsing.

Quick Start

1

Import and connect

Create a connection to your Redis server:
import redis

# Connect to Redis
client = redis.Redis(
    host='localhost',
    port=6379,
    decode_responses=True  # Automatically decode responses to strings
)

# Test the connection
client.ping()
print("Connected to Redis!")
2

Perform basic operations

Use Redis commands through the client:
# SET and GET operations
client.set('user:1:name', 'Alice')
name = client.get('user:1:name')
print(f"Name: {name}")  # Output: Name: Alice

# Work with numbers
client.set('counter', 0)
client.incr('counter')
client.incrby('counter', 5)
count = client.get('counter')  # Returned as the string '6', not an int
print(f"Counter: {count}")  # Output: Counter: 6

# Expiration
client.setex('session:abc', 3600, 'session_data')  # Expires in 1 hour
3

Handle errors

Implement proper error handling:
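import redis
# Note: these names shadow Python's built-in ConnectionError and TimeoutError
from redis.exceptions import ConnectionError, TimeoutError, RedisError

# Create the client outside the try block so that `client` is always
# defined when the finally clause runs. Creating the client does not
# open a connection; that happens on the first command.
client = redis.Redis(
    host='localhost',
    port=6379,
    socket_connect_timeout=5,
    socket_timeout=5,
    decode_responses=True
)

try:
    # Perform operations
    client.set('key', 'value')
    value = client.get('key')

except ConnectionError as e:
    print(f"Could not connect to Redis: {e}")
except TimeoutError as e:
    print(f"Redis operation timed out: {e}")
except RedisError as e:
    # Base class for all other redis-py errors
    print(f"Redis error: {e}")
finally:
    # Close the connection
    client.close()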
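
Connection and timeout errors are often transient, so a caller may prefer to retry instead of failing on the first error. The following is a minimal sketch of retrying with exponential backoff. The helper name `call_with_retry` and its parameters are hypothetical and not part of redis-py.

```python
import time


def call_with_retry(operation, retry_on, attempts=3, base_delay=0.1, sleep=time.sleep):
    """Run operation(), retrying on the given exception types.

    Hypothetical helper, not part of redis-py. The delay doubles after
    each failed attempt: base_delay, 2 * base_delay, 4 * base_delay, ...
    """
    for attempt in range(attempts):
        try:
            return operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # Out of attempts: let the caller see the error
            sleep(base_delay * 2 ** attempt)


# Example usage (assumes a reachable Redis server):
#   import redis
#   from redis.exceptions import ConnectionError, TimeoutError
#   client = redis.Redis(host='localhost', port=6379, decode_responses=True)
#   value = call_with_retry(lambda: client.get('key'), (ConnectionError, TimeoutError))
```

Only the exception types passed in `retry_on` are retried; anything else propagates immediately, so command errors such as a wrong-type operation are not silently repeated.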

Connection Patterns

Using Connection Pools

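Each redis.Redis instance manages its own connection pool by default. For production applications, create an explicit pool so that you can cap the number of connections and share the pool between clients: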
import redis

# Create a connection pool
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    max_connections=10,
    decode_responses=True
)

# Create clients from the pool
client = redis.Redis(connection_pool=pool)

# All operations use connections from the pool
client.set('key', 'value')
value = client.get('key')

Secure TLS Connections

Connect to Redis with TLS encryption:
import redis

client = redis.Redis(
    host='your-redis-host.com',
    port=6380,
    password='your-password',
    ssl=True,
    ssl_cert_reqs='required',
    ssl_ca_certs='/path/to/ca.crt',
    decode_responses=True
)

Redis Cloud Connection

Connect to Redis Cloud:
import redis

client = redis.Redis(
    host='your-endpoint.redis.cloud',
    port=12345,
    password='your-password',
    ssl=True,
    decode_responses=True
)
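
The same connection settings can also be expressed as a single URL, which is convenient when the configuration comes from an environment variable. The sketch below builds such a URL; the function name `build_redis_url` is hypothetical, while the `redis://` and `rediss://` (TLS) schemes are the ones accepted by `redis.Redis.from_url`.

```python
from urllib.parse import quote


def build_redis_url(host, port=6379, db=0, password=None, tls=False):
    """Build a Redis connection URL (hypothetical helper, not part of redis-py)."""
    scheme = "rediss" if tls else "redis"  # rediss:// enables TLS
    # Percent-encode the password so characters such as '@' or '/' stay unambiguous
    auth = f":{quote(password, safe='')}@" if password else ""
    return f"{scheme}://{auth}{host}:{port}/{db}"


# Example usage (requires redis-py to be installed):
#   import redis
#   url = build_redis_url('your-endpoint.redis.cloud', 12345,
#                         password='your-password', tls=True)
#   client = redis.Redis.from_url(url, decode_responses=True)
```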

Working with Data Structures

Hashes

# Store user data as a hash
client.hset('user:1', mapping={
    'name': 'Alice',
    'email': 'alice@example.com',
    'age': 30
})

# Get specific fields
name = client.hget('user:1', 'name')

# Get all fields
user = client.hgetall('user:1')
print(user)  # {'name': 'Alice', 'email': 'alice@example.com', 'age': '30'}

# Increment numeric field
client.hincrby('user:1', 'age', 1)
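
As the `hgetall` output above shows, hash values come back as strings even when they were stored as numbers. A small sketch of converting them on read follows; `load_user` is a hypothetical helper, and it assumes a client created with `decode_responses=True` and the `user:<id>` key layout used above.

```python
def load_user(client, user_id):
    """Return the hash at user:<id> as a dict, with 'age' converted to int.

    Hypothetical helper. Returns None when the key does not exist,
    because HGETALL yields an empty mapping for a missing key.
    """
    data = client.hgetall(f"user:{user_id}")
    if not data:
        return None
    user = dict(data)
    if "age" in user:
        user["age"] = int(user["age"])  # Stored and returned as a string
    return user
```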

Lists

# Add items to a list
client.rpush('queue', 'task1', 'task2', 'task3')

# Get list length
length = client.llen('queue')

# Pop item from list
task = client.lpop('queue')
print(task)  # task1

# Get range of items
tasks = client.lrange('queue', 0, -1)
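
For a worker that consumes the queue, a blocking pop avoids polling in a tight loop. The sketch below uses `blpop`, which returns a `(key, value)` pair or `None` when the timeout expires. The function name `process_tasks` and the `handler` callback are hypothetical.

```python
def process_tasks(client, queue, handler, timeout=1):
    """Pop tasks from the head of `queue` until it stays empty for `timeout` seconds.

    Hypothetical helper. Returns the number of tasks handled.
    A timeout of 0 would block indefinitely, so a positive value is assumed here.
    """
    processed = 0
    while True:
        item = client.blpop(queue, timeout=timeout)
        if item is None:
            return processed  # Nothing arrived within the timeout
        _key, task = item
        handler(task)
        processed += 1


# Example usage (assumes `client` from the earlier examples):
#   process_tasks(client, 'queue', print)
```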

Sets

# Add members to a set
client.sadd('tags', 'python', 'redis', 'database')

# Check membership
is_member = client.sismember('tags', 'python')  # True

# Get all members
tags = client.smembers('tags')

# Set operations
client.sadd('tags2', 'redis', 'cache', 'nosql')
common = client.sinter('tags', 'tags2')  # {'redis'}

Sorted Sets

# Add scored members
client.zadd('leaderboard', {
    'alice': 100,
    'bob': 150,
    'charlie': 120
})

# Get top scores (descending)
top_players = client.zrevrange('leaderboard', 0, 2, withscores=True)
# [('bob', 150.0), ('charlie', 120.0), ('alice', 100.0)]

# Get rank
rank = client.zrevrank('leaderboard', 'alice')  # 2 (0-indexed)

# Increment score
client.zincrby('leaderboard', 25, 'alice')
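
The leaderboard calls above can be wrapped into a function that returns display-ready rows. This is a sketch only; `top_players` is a hypothetical name, and the 1-based `rank` field is a presentation choice, not something Redis returns.

```python
def top_players(client, key, n=3):
    """Return the n highest-scoring members as dicts with a 1-based rank.

    Hypothetical helper built on zrevrange(..., withscores=True).
    """
    if n <= 0:
        return []  # Guard: an end index of -1 would select the whole set
    rows = client.zrevrange(key, 0, n - 1, withscores=True)
    return [
        {"rank": position, "player": member, "score": score}
        for position, (member, score) in enumerate(rows, start=1)
    ]


# Example usage (assumes `client` and the 'leaderboard' key from above):
#   for row in top_players(client, 'leaderboard', n=2):
#       print(row)
```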

Advanced Features

Pipelining

Batch multiple commands for better performance:
# Create a pipeline
pipe = client.pipeline()

# Queue multiple commands
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.incr('counter')
pipe.get('key1')

# Execute all commands at once
results = pipe.execute()
print(results)  # e.g. [True, True, 7, 'value1'] if 'counter' was 6 before
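
When writing many keys, queued commands are held in memory until `execute()` is called, so it can help to flush the pipeline in chunks. The following sketch assumes that approach; `bulk_set` and `chunk_size` are hypothetical names. It passes `transaction=False` because the batches only need fewer round trips, not MULTI/EXEC atomicity.

```python
def bulk_set(client, items, chunk_size=500):
    """SET every key/value pair in `items`, sending at most chunk_size commands per round trip.

    Hypothetical helper. Returns the number of replies received.
    """
    written = 0
    pipe = client.pipeline(transaction=False)
    for count, (key, value) in enumerate(items.items(), start=1):
        pipe.set(key, value)
        if count % chunk_size == 0:
            written += len(pipe.execute())  # Flush a full chunk
    written += len(pipe.execute())  # Flush the remainder (may be empty)
    return written


# Example usage (assumes `client` from the earlier examples):
#   bulk_set(client, {f'key:{i}': i for i in range(10_000)})
```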

Transactions

Execute commands atomically:
import redis

# Use WATCH for optimistic locking (assumes `client` from the earlier examples)
with client.pipeline() as pipe:
    while True:
        try:
            # Watch a key for changes
            pipe.watch('balance')

            # After watch(), commands run immediately until multi() is called
            balance = int(pipe.get('balance') or 0)
            if balance >= 100:
                # Queue the write and execute it as a transaction
                pipe.multi()
                pipe.decrby('balance', 100)
                pipe.execute()
                break
            else:
                pipe.unwatch()
                raise ValueError("Insufficient balance")
        except redis.WatchError:
            # Another client modified 'balance' before execute(); retry
            continue
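
redis-py also provides a `transaction()` convenience method on the client that wraps this watch-and-retry loop: it watches the given keys, calls your function with the pipeline, executes, and retries on `WatchError`. The sketch below rewrites the withdrawal with it; the `withdraw` function is a hypothetical name.

```python
def withdraw(client, key, amount):
    """Atomically subtract `amount` from the integer stored at `key`.

    Hypothetical helper built on client.transaction(). Returns the new
    balance, or raises ValueError if the balance is too low.
    """
    def attempt(pipe):
        # The key is already watched, so this read runs immediately
        balance = int(pipe.get(key) or 0)
        if balance < amount:
            raise ValueError("Insufficient balance")
        pipe.multi()
        pipe.decrby(key, amount)
        return balance - amount

    # value_from_callable=True returns attempt()'s result instead of the EXEC replies
    return client.transaction(attempt, key, value_from_callable=True)


# Example usage (assumes `client` from the earlier examples):
#   new_balance = withdraw(client, 'balance', 100)
```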

Pub/Sub

Implement publish/subscribe messaging:
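import redis
import threading
import time

# Set once the subscription is confirmed, so the publisher does not send too early
ready = threading.Event()

def message_handler(message):
    print(f"Received: {message['data']}")

# Subscriber
def subscriber():
    client = redis.Redis(host='localhost', port=6379, decode_responses=True)
    pubsub = client.pubsub()
    pubsub.subscribe('notifications')

    for message in pubsub.listen():
        if message['type'] == 'subscribe':
            # Subscription confirmed; it is now safe to publish
            ready.set()
        elif message['type'] == 'message':
            message_handler(message)

# Start subscriber in a daemon thread so it does not block interpreter exit
thread = threading.Thread(target=subscriber, daemon=True)
thread.start()

# Publisher: Pub/Sub does not store messages for late subscribers,
# so wait until the subscriber is ready before publishing
ready.wait(timeout=5)
publisher = redis.Redis(host='localhost', port=6379, decode_responses=True)
publisher.publish('notifications', 'Hello, Redis!')

# Give the daemon thread a moment to print before the script exits
time.sleep(0.5)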
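
Pub/Sub payloads are plain strings, so structured events need to be serialized by the application. One possible approach, sketched below, is to encode them as JSON. The names `publish_event` and `decode_event` are hypothetical, and JSON is an assumption of this example, not something redis-py requires.

```python
import json


def publish_event(client, channel, event):
    """Serialize `event` to JSON and publish it (hypothetical helper).

    Returns whatever publish() returns: the number of subscribers reached.
    """
    return client.publish(channel, json.dumps(event))


def decode_event(message):
    """Decode a message dict from pubsub.listen() back into a Python object.

    Returns None for non-data messages such as subscribe confirmations.
    """
    if message.get("type") != "message":
        return None
    return json.loads(message["data"])


# Example usage (assumes `publisher` and a `pubsub` object as in the example above):
#   publish_event(publisher, 'notifications', {'kind': 'signup', 'user_id': 1})
#   for message in pubsub.listen():
#       event = decode_event(message)
#       if event is not None:
#           print(event['kind'])
```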

Starter Project

Redis Python Starter Project

Clone the official starter project to get up and running quickly with complete examples and best practices.

Additional Resources

redis-py Documentation

Official redis-py documentation

RedisVL for AI

Python library for vector operations and AI workloads

Redis Commands

Complete Redis command reference

Data Types Guide

Learn about Redis data structures