Your API call takes 300 milliseconds. You call it 50 times a minute, and the data changes only every few hours. That is 50 round-trips a minute, nearly all of them redundant, each with its own network overhead and its own chance of surfacing a transient failure to your users. Caching the result for 10 minutes replaces 500 calls with one, a reduction of more than 99%, while keeping the result fresh enough for data that changes this slowly. Python’s standard library offers functools.lru_cache, but it has no expiry time, no eviction policy other than LRU, no way to invalidate a single entry, and no hook for customizing how cache keys are built. cachetools fills these gaps.

cachetools is a Python library that provides several ready-made cache classes: LRU (Least Recently Used), TTL (Time to Live), LFU (Least Frequently Used), and more. Each cache is a dictionary-like object with a configurable maximum size and an eviction policy that removes entries when the cache is full. Install it with pip install cachetools. There are no mandatory dependencies — cachetools is pure Python and works in any environment.

This article compares the most useful cache types (LRU, TTL, LFU, RR), walks through LRUCache and TTLCache in detail, and covers the @cached and @cachedmethod decorators for automatic function memoization, thread safety in concurrent applications, manual cache invalidation, and a real-world example of caching API responses in a Flask application. By the end you will be able to add caching to most Python functions in under five lines of code.

cachetools Quick Example

The fastest way to cache a function’s results is the @cached decorator with an LRU cache:

# quick_cache.py
import time
from cachetools import cached, LRUCache

@cached(cache=LRUCache(maxsize=128))
def get_user_data(user_id: int) -> dict:
    """Simulate a slow database or API call."""
    time.sleep(0.5)  # Simulate 500ms latency
    return {"id": user_id, "name": f"User {user_id}", "score": user_id * 10}

# First call: slow (cache miss)
start = time.perf_counter()
result = get_user_data(42)
print(f"First call:  {(time.perf_counter()-start)*1000:.0f}ms -- {result}")

# Second call: instant (cache hit)
start = time.perf_counter()
result = get_user_data(42)
print(f"Second call: {(time.perf_counter()-start)*1000:.0f}ms -- {result}")

# Different argument: slow again (cache miss)
start = time.perf_counter()
result = get_user_data(99)
print(f"Third call:  {(time.perf_counter()-start)*1000:.0f}ms -- {result}")

Output:

First call:  502ms -- {'id': 42, 'name': 'User 42', 'score': 420}
Second call:   0ms -- {'id': 42, 'name': 'User 42', 'score': 420}
Third call:  501ms -- {'id': 99, 'name': 'User 99', 'score': 990}

The second call returns in under 1 millisecond because the result is already stored in the LRU cache. The third call is slow again because user_id=99 is a different cache key from user_id=42. By default the cache key is built from the function’s positional and keyword arguments; you can customize it with the key parameter. maxsize=128 means the cache holds up to 128 distinct argument combinations; once the limit is reached, the least recently used entry is evicted to make room.
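As a minimal sketch of the key parameter, the following caches on user_id alone and ignores a second argument that should not affect the result. The function load_profile and its request_id argument are hypothetical names chosen for illustration; hashkey is the helper cachetools uses to build its default keys.

```python
from cachetools import LRUCache, cached
from cachetools.keys import hashkey

calls = []  # records every real (uncached) invocation


# Build the cache key from user_id only; request_id is deliberately ignored
@cached(cache=LRUCache(maxsize=128),
        key=lambda user_id, request_id="": hashkey(user_id))
def load_profile(user_id: int, request_id: str = "") -> dict:
    """Hypothetical lookup whose result depends on user_id alone."""
    calls.append(user_id)
    return {"id": user_id}


load_profile(42, request_id="req-1")  # miss: runs the function
load_profile(42, request_id="req-2")  # hit: same user_id, different request_id
load_profile(7, request_id="req-3")   # miss: new user_id

print(calls)  # [42, 7]
```

The key function receives exactly the arguments the caller passed, so its signature has to accept the same positional and keyword forms as the decorated function.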

What Is cachetools and Which Cache Type Should You Use?

cachetools provides several cache implementations, each with a different eviction policy. The eviction policy determines which entry gets removed when the cache is full and a new item needs to be stored. Choosing the right eviction policy depends on your access patterns.

| Cache | Eviction policy | Best for | Has TTL? |
|---|---|---|---|
| LRUCache | Least Recently Used | General purpose, most common pattern | No |
| TTLCache | Time To Live + LRU | API responses, config data that expires | Yes |
| LFUCache | Least Frequently Used | Non-uniform access, popular items differ | No |
| RRCache | Random Replacement | Uniform access, simple eviction | No |
| MRUCache | Most Recently Used | Sequential scans, most recent is least useful | No |

LRUCache is the right default for most applications — it keeps the items you accessed most recently and evicts items that have not been used for a while, which aligns with real access patterns (hot items get accessed repeatedly, cold items do not). TTLCache adds an expiry time, which is essential when caching data that changes over time, such as API responses or database records. Use LFUCache when a small subset of items is accessed far more often than others — it prioritizes keeping the most popular items regardless of recency.
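To make the difference between recency and frequency concrete, here is a small sketch that applies the same access pattern to an LRUCache and an LFUCache of size 2. The helper name survivors is made up for this illustration; the caches are used directly as dictionaries, without any decorator.

```python
from cachetools import LFUCache, LRUCache


def survivors(cache):
    """Apply one fixed access pattern and return the keys left in the cache."""
    cache["a"] = 1
    cache["b"] = 2
    cache["a"]      # read "a" twice: it becomes the most frequently used key
    cache["a"]
    cache["b"]      # read "b" once: it becomes the most recently used key
    cache["c"] = 3  # the cache is full (maxsize=2), so one entry is evicted
    return sorted(cache)


print(survivors(LRUCache(maxsize=2)))  # ['b', 'c']  ("a" was least recently used)
print(survivors(LFUCache(maxsize=2)))  # ['a', 'c']  ("b" was least frequently used)
```

The same reads and writes leave different entries behind, which is why the choice of policy should follow how your keys are actually accessed.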

LRUCache — Least Recently Used

LRUCache keeps the N most recently accessed entries. When the cache is full and a new entry arrives, the least recently used entry is evicted. This is the right choice for function results that are expensive to compute and accessed repeatedly with the same arguments.

# lru_cache_example.py
from cachetools import LRUCache, cached

# Create a cache that holds at most 3 entries
cache = LRUCache(maxsize=3)

# info=True (cachetools 5.3+) enables the cache_info() call used below
@cached(cache=cache, info=True)
def compute_fibonacci(n: int) -> int:
    """Compute Fibonacci number -- expensive for large n."""
    if n < 2:
        return n
    return compute_fibonacci(n - 1) + compute_fibonacci(n - 2)

# Fill the cache: fib(10), fib(11), fib(12) are cached
print(compute_fibonacci(10))  # 55
print(compute_fibonacci(11))  # 89
print(compute_fibonacci(12))  # 144

print(f"Cache size: {len(cache)}")   # 3
print(f"Cache keys: {list(cache)}")  # [(10,), (11,), (12,)]

# Access a new value -- fib(13) evicts the LRU entry (fib(10))
print(compute_fibonacci(13))
print(f"Cache keys after: {list(cache)}")  # [(11,), (12,), (13,)]

# Inspect cache statistics
print(f"Cache info: {compute_fibonacci.cache_info()}")

Output:

55
89
144
Cache size: 3
Cache keys: [(10,), (11,), (12,)]
233
Cache keys after: [(11,), (12,), (13,)]
Cache info: CacheInfo(hits=X, misses=X, maxsize=3, currsize=3)

The cache is a regular dictionary-like object you can inspect, clear, and manipulate directly. cache.clear() invalidates all entries at once, and del cache[(10,)] invalidates a specific entry (the default key for a positional call is a tuple of its arguments). functools.lru_cache offers only cache_clear(), which drops everything; it has no way to remove a single entry.
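The following sketch shows per-entry and whole-cache invalidation side by side. The function square and the calls list are illustrative only; hashkey(3) is the key that @cached builds by default for the call square(3).

```python
from cachetools import LRUCache, cached
from cachetools.keys import hashkey

cache = LRUCache(maxsize=32)
calls = []  # records every real (uncached) invocation


@cached(cache=cache)
def square(n: int) -> int:
    calls.append(n)
    return n * n


square(3)
square(4)

# Drop one entry; pop() with a default does not raise if the key is absent
cache.pop(hashkey(3), None)
square(3)  # recomputed
square(4)  # still served from the cache

# Drop everything
cache.clear()
square(4)  # recomputed

print(calls)       # [3, 4, 3, 4]
print(len(cache))  # 1
```

Note that the default key distinguishes positional from keyword calls, so square(3) and square(n=3) are stored under different keys and must be invalidated separately.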

TTLCache -- Time To Live

TTLCache combines an LRU eviction policy with an expiry time. Every entry is considered stale ttl seconds after it was inserted. An expired entry is no longer returned by lookups, and its memory is reclaimed on the next mutating operation (such as inserting a new item) or when you call the cache’s expire() method. This makes TTLCache ideal for caching external data that changes over time.

# ttl_cache_example.py
import time
from cachetools import TTLCache, cached

# Cache holds up to 100 entries, each expires after 10 seconds
ttl_cache = TTLCache(maxsize=100, ttl=10)

@cached(cache=ttl_cache)
def fetch_exchange_rate(currency: str) -> float:
    """Simulate fetching a live exchange rate."""
    print(f"  [API call] Fetching rate for {currency}...")
    # In production, this would call a real API
    rates = {"USD": 1.0, "EUR": 0.92, "GBP": 0.79, "JPY": 149.5}
    return rates.get(currency, 1.0)

print("First fetch (cache miss):")
rate = fetch_exchange_rate("EUR")
print(f"EUR rate: {rate}")

print("\nSecond fetch (cache hit -- no API call):")
rate = fetch_exchange_rate("EUR")
print(f"EUR rate: {rate}")

# Simulate cache expiry by using a very short TTL
short_cache = TTLCache(maxsize=10, ttl=1)

@cached(cache=short_cache)
def get_timestamp(key: str) -> float:
    return time.time()

t1 = get_timestamp("a")
time.sleep(0.5)
t2 = get_timestamp("a")  # Hit -- same cached value
time.sleep(0.6)
t3 = get_timestamp("a")  # Miss -- TTL expired, new value

print(f"\nTimestamp test:")
print(f"t1={t1:.3f}, t2={t2:.3f} (same -- cache hit)")
print(f"t3={t3:.3f} (different -- TTL expired, new call)")
print(f"t3 > t1: {t3 > t1}")

Output:

First fetch (cache miss):
  [API call] Fetching rate for EUR...
EUR rate: 0.92

Second fetch (cache hit -- no API call):
EUR rate: 0.92

Timestamp test:
t1=1717300000.123, t2=1717300000.123 (same -- cache hit)
t3=1717300001.234 (different -- TTL expired, new call)
t3 > t1: True

The TTL is measured from when the entry is inserted (or last re-assigned), not from when it is last read. An entry inserted at t=0 with ttl=60 expires at t=60 regardless of how many times it was read in between. If you need sliding expiry (where an access resets the timer), you must manage that yourself, for example by re-assigning the value to its key on each read, which restarts its TTL.
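Here is a rough sketch of both behaviours: fixed expiry measured from insertion, and a hand-rolled sliding expiry. FakeClock and get_sliding are hypothetical helpers written for this example; the only cachetools feature relied on is the timer argument of TTLCache, which lets the demo advance time without sleeping.

```python
from cachetools import TTLCache


class FakeClock:
    """Hypothetical manual clock so expiry can be shown without sleeping."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


def get_sliding(cache, key):
    """Read a value and re-insert it, which restarts its TTL."""
    value = cache[key]  # raises KeyError if the key is missing or expired
    cache[key] = value
    return value


clock = FakeClock()
cache = TTLCache(maxsize=10, ttl=60, timer=clock)

cache["fixed"] = "A"    # inserted at t=0, expires at t=60
cache["sliding"] = "B"  # inserted at t=0, expires at t=60

clock.now = 50
cache["fixed"]                 # a plain read does not extend the TTL
get_sliding(cache, "sliding")  # re-inserted at t=50, now expires at t=110

clock.now = 70
print("fixed" in cache)    # False
print("sliding" in cache)  # True

clock.now = 110
print("sliding" in cache)  # False
```

In multi-threaded code the read and the re-insert in get_sliding would need to happen under the same lock that guards the cache.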

Thread Safety with cachetools

cachetools cache objects are NOT thread-safe by default. If multiple threads read from and write to the same cache concurrently without synchronization, you can hit race conditions and corrupt the cache’s internal state. The @cached decorator accepts a lock parameter that serializes access to the cache:

# thread_safe_cache.py
import threading
import time
from cachetools import TTLCache, cached

# Thread-safe cache using a threading lock
cache = TTLCache(maxsize=200, ttl=60)
lock = threading.RLock()

@cached(cache=cache, lock=lock)
def get_config(config_key: str) -> str:
    """Simulate a slow config read."""
    time.sleep(0.1)
    configs = {
        "db_host": "postgres.internal:5432",
        "redis_url": "redis://cache.internal:6379",
        "feature_flags": "new_ui=true,dark_mode=false",
    }
    return configs.get(config_key, "")

# Simulate 10 concurrent threads all requesting the same config key
results = []
errors = []

def worker(key: str):
    try:
        value = get_config(key)
        results.append(value)
    except Exception as e:
        errors.append(str(e))

threads = [threading.Thread(target=worker, args=("db_host",)) for _ in range(10)]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

print(f"10 threads completed in {elapsed:.2f}s")
print(f"All results identical: {len(set(results)) == 1}")
print(f"Result: {results[0]}")
print(f"Errors: {errors}")
print(f"Cache size: {len(cache)}")

Output:

10 threads completed in 0.10s
All results identical: True
Result: postgres.internal:5432
Errors: []
Cache size: 1

The lock passed to @cached guards only reads and writes of the cache object itself; the wrapped function runs outside the lock. In this example all ten threads start before the cache is populated, so each one can miss and call get_config concurrently. The run still takes about 100ms rather than 1000ms because the ten sleeps overlap, not because nine calls were avoided. What the lock guarantees is that the cache’s internal state stays consistent; calls made after the first result has been stored are served from the cache. Always pass a lock when your cached function will be called from multiple threads; this includes web frameworks like Flask and Django when they run request handlers concurrently.
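If the underlying call is expensive enough that duplicate concurrent calls matter, one option is to serialize the computation per key. The decorator below is a hypothetical sketch written for this article, not part of cachetools: cached_single_flight, guard, and key_locks are made-up names, and the only cachetools pieces used are TTLCache and hashkey.

```python
import threading
import time
from cachetools import TTLCache
from cachetools.keys import hashkey


def cached_single_flight(cache):
    """Hypothetical decorator: at most one thread computes a given missing key."""
    guard = threading.Lock()  # protects `cache` and `key_locks`
    key_locks = {}            # one lock per key currently being computed

    def decorator(func):
        def wrapper(*args, **kwargs):
            k = hashkey(*args, **kwargs)
            with guard:
                try:
                    return cache[k]
                except KeyError:
                    key_lock = key_locks.setdefault(k, threading.Lock())
            with key_lock:
                with guard:
                    try:
                        return cache[k]  # filled by another thread while we waited
                    except KeyError:
                        pass
                value = func(*args, **kwargs)  # slow call, made outside `guard`
                with guard:
                    cache[k] = value
                    key_locks.pop(k, None)
                return value
        return wrapper
    return decorator


call_count = 0


@cached_single_flight(TTLCache(maxsize=200, ttl=60))
def get_config(config_key: str) -> str:
    global call_count
    call_count += 1   # for a given key, this runs under that key's lock
    time.sleep(0.05)  # simulate a slow read
    return f"value-for-{config_key}"


threads = [threading.Thread(target=get_config, args=("db_host",)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(call_count)  # 1
```

Calls for different keys still run in parallel, because only the short cache lookups share the global guard; the trade-off is extra bookkeeping compared with the plain lock parameter.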

Caching Instance Methods with @cachedmethod

The @cached decorator creates a single shared cache for all calls to a function. For instance methods, you often want each instance to have its own cache. Use @cachedmethod, which takes a function that returns the cache for a given instance:

# cachedmethod_example.py
import threading
import time
from cachetools import TTLCache, cachedmethod

class WeatherService:
    """Fetches weather data with per-instance TTL caching."""

    def __init__(self, api_key: str, cache_ttl: int = 300):
        self.api_key = api_key
        self._cache = TTLCache(maxsize=50, ttl=cache_ttl)
        self._lock = threading.RLock()

    @cachedmethod(cache=lambda self: self._cache, lock=lambda self: self._lock)
    def get_weather(self, city: str) -> dict:
        """Simulate API call -- 200ms delay."""
        time.sleep(0.2)
        # In production: return requests.get(f"https://api.weather.com/{city}").json()
        return {
            "city": city,
            "temp_c": len(city) * 3,   # predictable fake data
            "humidity": 65,
            "description": "Partly cloudy",
        }

    def clear_cache(self):
        self._cache.clear()

    @property
    def cache_size(self):
        return len(self._cache)

# Each instance gets its own cache
svc1 = WeatherService(api_key="key-abc", cache_ttl=300)
svc2 = WeatherService(api_key="key-xyz", cache_ttl=60)

import time
start = time.perf_counter()
print(svc1.get_weather("Sydney"))
print(f"First call: {(time.perf_counter()-start)*1000:.0f}ms")

start = time.perf_counter()
print(svc1.get_weather("Sydney"))  # cache hit in svc1
print(f"Second call (svc1): {(time.perf_counter()-start)*1000:.0f}ms")

start = time.perf_counter()
print(svc2.get_weather("Sydney"))  # cache miss in svc2 (separate instance)
print(f"First call (svc2): {(time.perf_counter()-start)*1000:.0f}ms")

print(f"\nsvc1 cache size: {svc1.cache_size}")
print(f"svc2 cache size: {svc2.cache_size}")

Output:

{'city': 'Sydney', 'temp_c': 18, 'humidity': 65, 'description': 'Partly cloudy'}
First call: 202ms
{'city': 'Sydney', 'temp_c': 18, 'humidity': 65, 'description': 'Partly cloudy'}
Second call (svc1): 0ms
{'city': 'Sydney', 'temp_c': 18, 'humidity': 65, 'description': 'Partly cloudy'}
First call (svc2): 201ms

svc1 cache size: 1
svc2 cache size: 1

The lambda self: self._cache accessor tells @cachedmethod which cache object to use for each instance. Because each instance stores its own self._cache, the two WeatherService objects have independent caches with independent TTLs. This pattern is especially useful when different instances connect to different backends or have different freshness requirements.

Real-Life Example: Caching API Responses in a Flask App

The following Flask application caches responses from a public REST API, demonstrating TTL caching, cache inspection, and manual invalidation:

# flask_cached_api.py
import time
import threading
from flask import Flask, jsonify
from cachetools import TTLCache, cached

app = Flask(__name__)

# Cache up to 200 responses, each valid for 5 minutes
response_cache = TTLCache(maxsize=200, ttl=300)
cache_lock = threading.RLock()

import urllib.request
import json as _json

def _fetch_post(post_id: int) -> dict:
    """Fetch a post from JSONPlaceholder (real public API)."""
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return _json.loads(resp.read())

@cached(cache=response_cache, lock=cache_lock)
def get_post_cached(post_id: int) -> dict:
    """Return post data, hitting cache if available."""
    return _fetch_post(post_id)

@app.route("/posts/<int:post_id>")
def post_detail(post_id: int):
    start = time.perf_counter()
    data = get_post_cached(post_id)
    elapsed_ms = (time.perf_counter() - start) * 1000
    return jsonify({
        "data": data,
        "cache_size": len(response_cache),
        "elapsed_ms": round(elapsed_ms, 1),
    })

@app.route("/posts/<int:post_id>/invalidate", methods=["DELETE"])
def invalidate_post(post_id: int):
    key = (post_id,)
    with cache_lock:
        if key in response_cache:
            del response_cache[key]
            return jsonify({"invalidated": True, "post_id": post_id})
    return jsonify({"invalidated": False, "reason": "not in cache"})

@app.route("/cache/stats")
def cache_stats():
    return jsonify({
        "size": len(response_cache),
        "maxsize": response_cache.maxsize,
        "ttl_seconds": response_cache.ttl,
    })

if __name__ == "__main__":
    app.run(debug=True, port=5000)

Testing the API:

# Terminal -- run the Flask app first, then:

# First request (cache miss -- ~120ms API round-trip)
curl http://localhost:5000/posts/1
# {"data":{"userId":1,"id":1,"title":"sunt aut facere..."},"cache_size":1,"elapsed_ms":124.3}

# Second request (cache hit -- under 1ms)
curl http://localhost:5000/posts/1
# {"data":{"userId":1,"id":1,"title":"sunt aut facere..."},"cache_size":1,"elapsed_ms":0.1}

# Check cache stats
curl http://localhost:5000/cache/stats
# {"maxsize":200,"size":1,"ttl_seconds":300}

# Invalidate post 1 manually
curl -X DELETE http://localhost:5000/posts/1/invalidate
# {"invalidated":true,"post_id":1}

# Next request fetches fresh data
curl http://localhost:5000/posts/1
# {"data":{...},"cache_size":1,"elapsed_ms":118.7}

This pattern (a TTLCache, a threading lock, and a manual invalidation endpoint) covers most everyday API caching scenarios. The TTL handles the common case (data goes stale and we eventually want fresh data), the lock keeps the shared cache consistent when requests are handled concurrently (it does not stop two simultaneous misses for the same post ID from both calling the API), and the DELETE endpoint handles the uncommon case (we know the data changed and want to force a refresh immediately).

Frequently Asked Questions

What is the difference between cachetools and functools.lru_cache?

The main differences are expiry, choice of eviction policy, and control over the cache itself. functools.lru_cache is built into Python, never expires its entries, supports only LRU eviction, and can be cleared only as a whole with cache_clear(). cachetools adds TTL-based expiry, several eviction policies beyond LRU, custom key functions, and a cache object you can inspect and manipulate independently of the cached function. If you need expiry or you need to invalidate specific cache entries, use cachetools. For simple memoization with no expiry, functools.lru_cache is simpler.

How do I cache functions with unhashable arguments?

cachetools cache keys must be hashable. If your function takes a list, dict, or other unhashable type, pass a custom key function that converts the arguments to a hashable representation. For example, for a function that takes a single list: @cached(cache=LRUCache(128), key=lambda lst: tuple(lst)). Use tuple(sorted(lst)) instead only when the result does not depend on element order, because sorting makes [1, 2] and [2, 1] share one cache entry. cachetools also provides cachetools.keys.hashkey and cachetools.keys.typedkey for common scenarios.
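A short sketch of both cases, one where argument order matters and one where it does not. The functions running_total and describe are invented for illustration.

```python
from cachetools import LRUCache, cached
from cachetools.keys import hashkey

calls = []  # records every real (uncached) invocation


# Order matters for this function, so the key preserves the list order
@cached(cache=LRUCache(maxsize=128), key=lambda values: hashkey(tuple(values)))
def running_total(values: list) -> list:
    calls.append("running_total")
    total, out = 0, []
    for v in values:
        total += v
        out.append(total)
    return out


# Dict ordering is irrelevant here, so sorting the items gives a stable key
@cached(cache=LRUCache(maxsize=128),
        key=lambda params: hashkey(tuple(sorted(params.items()))))
def describe(params: dict) -> str:
    calls.append("describe")
    return "&".join(f"{k}={v}" for k, v in sorted(params.items()))


print(running_total([1, 2, 3]))     # [1, 3, 6]
print(running_total([1, 2, 3]))     # [1, 3, 6]  (cache hit)
print(running_total([3, 2, 1]))     # [3, 5, 6]  (different order, different key)
print(describe({"b": 2, "a": 1}))   # a=1&b=2
print(describe({"a": 1, "b": 2}))   # a=1&b=2   (cache hit)
print(calls)  # ['running_total', 'running_total', 'describe']
```

One caveat with this approach: a cache hit returns the same object that was stored, so a caller that mutates the returned list also changes what later hits receive. Returning an immutable value such as a tuple avoids that.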

Does cachetools work with async functions?

Applying the standard @cached decorator to an async def function does not do what you want: it caches the coroutine object rather than its result, and a coroutine can be awaited only once. For async code, either store the awaited result in the cache yourself inside the coroutine, or use an async-aware library such as asyncache, which provides decorators for coroutine functions that work with cachetools cache classes. The cachetools cache objects themselves can be used from async code as long as access is guarded appropriately, for example with asyncio.Lock when a lookup and the insert that follows it must not interleave with other tasks.
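As a rough illustration of caching the awaited result by hand, the sketch below wraps one coroutine function with a TTLCache and an asyncio.Lock. AsyncTTLMemo and fetch_user are hypothetical names for this example and are not part of cachetools or asyncache.

```python
import asyncio
from cachetools import TTLCache


class AsyncTTLMemo:
    """Hypothetical helper: caches the results of one coroutine function."""

    def __init__(self, func, maxsize=128, ttl=60):
        self._func = func
        self._cache = TTLCache(maxsize=maxsize, ttl=ttl)
        self._lock = asyncio.Lock()
        self.calls = 0  # number of real (uncached) invocations

    async def __call__(self, key):
        # Holding the lock across the await serializes all misses: simple but coarse
        async with self._lock:
            try:
                return self._cache[key]
            except KeyError:
                pass
            self.calls += 1
            value = await self._func(key)
            self._cache[key] = value
            return value


async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.05)  # stand-in for a real network call
    return {"id": user_id}


async def main():
    get_user = AsyncTTLMemo(fetch_user)  # created inside the running event loop
    results = await asyncio.gather(*(get_user(1) for _ in range(5)))
    return results, get_user.calls


results, calls = asyncio.run(main())
print(calls)       # 1
print(results[0])  # {'id': 1}
```

Because a single lock covers every key, a slow miss for one key delays lookups for all others; a per-key lock would reduce that contention at the cost of more bookkeeping.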

How do I handle cache stampede (thundering herd)?

Cache stampede happens when many concurrent requests arrive for an expired cache entry simultaneously: they all miss the cache, all call the underlying function at the same time, and flood your backend with identical work. The lock parameter shown in this article does not prevent this on its own, because it protects the cache object while the underlying function still runs outside the lock, so concurrent misses each make their own call. To suppress duplicate work, serialize the computation per key (for example with an in-progress flag or a per-key lock), and check the documentation for your cachetools version, since recent releases add a condition parameter to @cached aimed at this problem. For applications with very high concurrency, also consider probabilistic early expiry (recompute with some probability once, say, 80% of the TTL has elapsed).

When should I use Redis instead of cachetools?

Use cachetools when the cached data only needs to survive within a single process. Use Redis when you need to share the cache across multiple processes or machines (such as multiple web server workers), when the cache must survive process restarts, or when the cached data is large enough that storing it in-process would consume too much RAM. A common production architecture uses both: cachetools for a fast per-process L1 cache with a short TTL, and Redis for a shared L2 cache with a longer TTL.
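The two-tier lookup can be sketched as follows. SharedStore is a hypothetical stand-in backed by a plain dict so the example runs anywhere; in a real deployment it would wrap a Redis client and set its own, longer TTL on each key. The names l1, l2, get, and load_from_origin are likewise made up for illustration.

```python
from cachetools import TTLCache


class SharedStore:
    """Hypothetical stand-in for a shared cache such as Redis (a dict here)."""

    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value


l1 = TTLCache(maxsize=1000, ttl=30)  # per-process, short TTL
l2 = SharedStore()                   # shared across processes in a real setup
origin_calls = []                    # records every trip to the real source


def load_from_origin(key: str) -> str:
    origin_calls.append(key)
    return f"value:{key}"


def get(key: str) -> str:
    try:
        return l1[key]                 # fastest path: in-process hit
    except KeyError:
        pass
    value = l2.get(key)                # second chance: shared cache
    if value is None:
        value = load_from_origin(key)  # slowest path: the real source
        l2.set(key, value)
    l1[key] = value
    return value


get("a")     # origin call, fills both tiers
get("a")     # L1 hit
l1.clear()   # simulate L1 expiry, or a second worker with an empty L1
get("a")     # L2 hit, no origin call
print(origin_calls)  # ['a']
```

This sketch treats None as "not found" in the shared tier, so it cannot cache a legitimate None value without a sentinel.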

Conclusion

cachetools gives you LRU, TTL, LFU, and random-replacement caches as plain Python objects, plus the @cached and @cachedmethod decorators for zero-friction function memoization. You have seen how to apply LRUCache for general memoization, TTLCache for time-expiring API responses, thread-safe caching with lock=threading.RLock(), per-instance caching with @cachedmethod, and a real Flask application with manual cache invalidation.

The best next step is to profile your application and find the three slowest function calls that are called repeatedly with the same arguments. Wrap each one with a TTLCache and an appropriate TTL -- 60 seconds for data that changes often, 300 seconds for data that changes rarely. Measure the before-and-after response times and cache hit rates. For most applications, this change alone produces a measurable improvement in throughput and latency.

For the full cachetools API reference, including RRCache, custom key functions, and the current status of MRUCache (deprecated in cachetools 5.4 and removed in 6.0), see the official cachetools documentation.