KV storage
The KV (key-value) storage service provides a simple yet powerful way to store and retrieve data in your Flows applications. It offers persistent storage with flexible data structures, allowing you to maintain state across executions without the complexity of a full database.
KV storage is commonly used with the lifecycle service to maintain state across sync operations.
Overview
The KV storage service provides two storage scopes:
- App-level storage: Shared across all blocks within an app installation
- Block-level storage: Scoped to a specific block instance
This service is ideal for:
- Storing configuration data
- Caching external API responses
- Maintaining counters and aggregated metrics
- Persisting state between executions
- Implementing rate limiting
- Building simple queues or work lists
Storage levels
App-level storage
App-level storage is shared across all blocks within an app installation. This makes it ideal for:
- Shared configuration values
- Cross-block coordination
- Global counters or aggregators
- App-wide caching

    // Store an API key that all blocks can access
    await kv.app.set({
      key: "api-credentials",
      value: {
        key: "api-xyz-123",
        secret: "shhhh-secret",
      },
    });

    // Retrieve the API key from any block
    const credentials = await kv.app.get("api-credentials");

Block-level storage
Block-level storage is scoped to a specific block instance. This is perfect for:
- Block-specific state
- Block configuration
- Local caching
- Block-specific counters or metrics

    // Store block-specific state
    await kv.block.set({
      key: "last-sync-timestamp",
      value: Date.now(),
    });

    // Retrieve block-specific state
    const lastSync = await kv.block.get("last-sync-timestamp");

Core API
The KV storage service provides a consistent API for both app-level and block-level storage.
Data structure
Each key-value pair follows this structure:

    interface KVPair {
      key: string; // The unique identifier
      value?: any; // Any serializable value
      updatedAt?: number; // Unix timestamp when the KV pair was last updated
      ttl?: number; // Optional time-to-live in seconds
      // Optional lock information
      lock?: {
        id: string;
        timeout?: number;
      };
    }

Getting values
Get a single value

    // App level
    const value = await kv.app.get("my-key");

    // Block level
    const value = await kv.block.get("my-key");

Get multiple values

    // App level
    const values = await kv.app.getMany(["key1", "key2", "key3"]);

    // Block level
    const values = await kv.block.getMany(["key1", "key2", "key3"]);

Setting values
Set a single value

    // App level
    await kv.app.set({
      key: "user-preferences",
      value: { theme: "dark", notifications: true },
    });

    // Block level with TTL (expires after 3600 seconds / 1 hour)
    await kv.block.set({
      key: "auth-token",
      value: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
      ttl: 3600,
    });

Set multiple values

    // App level
    await kv.app.setMany([
      { key: "config:timeout", value: 30000 },
      { key: "config:retries", value: 3 },
    ]);

    // Block level
    await kv.block.setMany([
      { key: "counter:visits", value: 42 },
      { key: "counter:conversions", value: 7 },
    ]);

Listing values
List operations allow you to retrieve multiple key-value pairs that share a common prefix.

    // App level - list all configuration keys
    const configList = await kv.app.list({ keyPrefix: "config:" });

    // Block level - list with pagination
    let allMetrics = [];
    let startingKey = undefined;

    do {
      const result = await kv.block.list({
        keyPrefix: "metrics:",
        startingKey,
      });

      allMetrics = [...allMetrics, ...result.pairs];
      startingKey = result.nextStartingKey;
    } while (startingKey);

Deleting values

    // App level - delete keys
    await kv.app.delete(["global-temp-data"]);

    // Block level - delete keys
    await kv.block.delete(["temp-data"]);

    // Delete multiple keys at either level
    await kv.app.delete(["cache:global-123", "cache:global-456"]);
    await kv.block.delete(["cache:user-123", "cache:user-456"]);

Key features
Time to live (TTL)
You can set an expiration time on values using the ttl parameter:

    // Cache an API response for 5 minutes (300 seconds)
    await kv.block.set({
      key: "cache:weather-data",
      value: weatherApiResponse,
      ttl: 300,
    });

This is particularly useful for:
- Caching external API responses
- Temporary tokens or credentials
- Rate limiting implementations
- Session-like data
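To make the expiry behaviour concrete, here is a minimal in-memory sketch of TTL semantics. It is not the service's implementation: the `TtlStore` class, its injectable `now` clock, and the choice to treat an expired key exactly like a missing key on read are all assumptions made for illustration.

```typescript
// Hypothetical in-memory model of TTL expiry. NOT the KV service's implementation.
type TtlEntry = { value: unknown; expiresAt: number | undefined };

class TtlStore {
  private entries = new Map<string, TtlEntry>();
  private now: () => number;

  // `now` returns the current time in milliseconds. It is injectable so that
  // expiry can be exercised without waiting for real time to pass.
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // `ttl` is in seconds, matching the `ttl` field used in this document.
  set(key: string, value: unknown, ttl?: number): void {
    const expiresAt = ttl === undefined ? undefined : this.now() + ttl * 1000;
    this.entries.set(key, { value, expiresAt });
  }

  // Assumption: an expired key behaves like a key that was never set.
  get(key: string): unknown {
    const entry = this.entries.get(key);
    if (entry === undefined) return undefined;
    if (entry.expiresAt !== undefined && this.now() >= entry.expiresAt) {
      this.entries.delete(key);
      return undefined;
    }
    return entry.value;
  }
}
```

With a fake clock, a value stored with `ttl: 300` is still readable at 299 seconds and reads as missing from 300 seconds onward, which is the behaviour a cache built on `ttl` would rely on.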
Value serialization
The KV storage service automatically handles serialization and deserialization of values. You can store:
- Primitive types (string, number, boolean)
- Objects
- Arrays
- null
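The types listed above are the ones JSON can represent. Assuming the service serializes values in a JSON-compatible way (this page does not name the format), a local round trip gives a rough preview of what would come back for values outside that list. `roundTrip` is a hypothetical helper for this illustration, not part of the KV API.

```typescript
// Assumption: stored values behave as if passed through JSON.
// The actual storage format is not specified in this document.
function roundTrip(value: unknown): unknown {
  const text: string | undefined = JSON.stringify(value);
  // JSON.stringify yields undefined for values it cannot represent at the top level.
  return text === undefined ? undefined : JSON.parse(text);
}

const stored = roundTrip({
  when: new Date(0), // comes back as an ISO 8601 string, not a Date
  tags: new Map([["a", 1]]), // comes back as an empty object
  missing: undefined, // the property is dropped entirely
  ratio: NaN, // comes back as null
  plain: [1, "two", true, null], // comes back unchanged
});
```

If the assumption holds, values such as `Date`, `Map`, or `undefined` are best converted to plain strings, arrays, or objects before calling `set`.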

    // Store a complex object
    await kv.app.set({
      key: "complex-data",
      value: {
        users: [
          { id: 1, name: "Alice", active: true },
          { id: 2, name: "Bob", active: false },
        ],
        metadata: {
          lastUpdated: "2023-05-15T14:30:00Z",
          version: 2.1,
        },
      },
    });

    // Retrieve it later, fully deserialized
    const data = await kv.app.get("complex-data");
    console.log(data.value.users[0].name); // "Alice"

Race conditions and concurrency
The KV storage service includes a locking mechanism for safe concurrent access to shared keys. Basic operations like get and set are individual network requests, so a read followed by a write is not atomic. The service provides optimistic locking to coordinate access when multiple processes need to modify the same key simultaneously.
Understanding race conditions
Without locking, race conditions can occur when multiple processes access the same key:

    // PROBLEMATIC: Race condition can occur here
    async function incrementCounter(counterName, increment = 1) {
      // Get current value
      const currentCounter = await kv.app.get(`counter:${counterName}`);
      const currentValue = currentCounter?.value || 0;

      // Set new value
      const newValue = currentValue + increment;
      await kv.app.set({
        key: `counter:${counterName}`,
        value: newValue,
      });

      return newValue;
    }

If two functions call this simultaneously with the same counter:
- Both read the same initial value (e.g., 10)
- Both calculate a new value (11)
- Both write 11 back
- The counter is incremented once instead of twice
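The steps above can be reproduced locally. The sketch below uses `FakeKv`, a hypothetical in-memory stand-in whose `get` and `set` each yield once to the event loop, roughly the way a network request would. It is not the real `kv` object, and its method signatures are simplified for the demonstration.

```typescript
// Hypothetical in-memory stand-in for a remote store. NOT the real kv API.
class FakeKv {
  private data = new Map<string, number>();

  async get(key: string): Promise<number | undefined> {
    await Promise.resolve(); // yield, as a network round trip would
    return this.data.get(key);
  }

  async set(key: string, value: number): Promise<void> {
    await Promise.resolve(); // yield, as a network round trip would
    this.data.set(key, value);
  }
}

// Same read, compute, write pattern as the problematic function above.
async function unsafeIncrement(store: FakeKv, key: string): Promise<number> {
  const current = (await store.get(key)) ?? 0;
  const next = current + 1;
  await store.set(key, next);
  return next;
}

async function demoLostUpdate(): Promise<number | undefined> {
  const store = new FakeKv();
  await store.set("counter:visits", 10);

  // Both increments start before either one has written its result,
  // so both read 10 and both write 11.
  await Promise.all([
    unsafeIncrement(store, "counter:visits"),
    unsafeIncrement(store, "counter:visits"),
  ]);

  return store.get("counter:visits");
}
```

Calling `demoLostUpdate()` resolves to 11 rather than 12: one of the two increments is lost because neither caller sees the other's write.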
Using locks for safe concurrent access
The KV service provides a locking mechanism to prevent race conditions:

    async function safeIncrementCounter(counterName, increment = 1) {
      const lockId = crypto.randomUUID();

      // Try to acquire lock
      const lockAcquired = await kv.app.set({
        key: `counter:${counterName}`,
        value: 0, // Initial value if key doesn't exist
        lock: {
          id: lockId,
          timeout: 30, // Lock expires after 30 seconds
        },
      });

      if (!lockAcquired) {
        // Another process has the lock
        throw new Error("Could not acquire lock, try again later");
      }

      try {
        // Read current value
        const counter = await kv.app.get(`counter:${counterName}`);
        const currentValue = counter?.value || 0;

        // Calculate new value
        const newValue = currentValue + increment;

        // Update with the same lock ID
        const updateSuccess = await kv.app.set({
          key: `counter:${counterName}`,
          value: newValue,
          lock: {
            id: lockId, // Must use the same lock ID
          },
        });

        if (!updateSuccess) {
          throw new Error("Lost lock during operation");
        }

        return newValue;
      } finally {
        // Always release the lock
        await kv.app.releaseLock({
          key: `counter:${counterName}`,
          lockId,
        });
      }
    }

How locking works
The locking mechanism provides several guarantees:
- Unique lock IDs: Each lock is identified by a unique ID (typically a UUID)
- Lock acquisition: Setting a key with a lock succeeds only if the key is unlocked or the lock has expired
- Lock ownership: Only the exact lock ID can modify or release the lock
- Automatic expiration: Locks with timeouts automatically expire after the specified duration
- Graceful failures: Lock conflicts return false instead of throwing errors
- Database-level atomicity: Uses PostgreSQL row-level locks for thread-safe operations
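The rules in this list can be expressed as a small in-memory model. This is only a sketch for reasoning about the behaviour: `LockingStore` is hypothetical, it uses positional arguments rather than the service's object parameters, and it does not model the PostgreSQL row-level locking mentioned above. Two details are assumptions rather than documented facts: a write without a lock is rejected while another lock is active, and a write by the lock owner that omits `timeout` keeps the existing expiry.

```typescript
// Hypothetical in-memory model of the lock rules. NOT the service's implementation.
type HeldLock = { id: string; expiresAt: number | undefined };
type Row = { value: unknown; lock: HeldLock | undefined };

class LockingStore {
  private rows = new Map<string, Row>();
  private now: () => number;

  // `now` returns milliseconds and is injectable so expiry can be tested.
  constructor(now: () => number = () => Date.now()) {
    this.now = now;
  }

  // Automatic expiration: a lock past its deadline counts as not held.
  private activeLock(key: string): HeldLock | undefined {
    const lock = this.rows.get(key)?.lock;
    if (lock === undefined) return undefined;
    const expired = lock.expiresAt !== undefined && this.now() >= lock.expiresAt;
    return expired ? undefined : lock;
  }

  get(key: string): unknown {
    return this.rows.get(key)?.value;
  }

  // Graceful failure: returns false on a lock conflict instead of throwing.
  // `timeout` is in seconds.
  set(key: string, value: unknown, lock?: { id: string; timeout?: number }): boolean {
    const held = this.activeLock(key);
    // Lock ownership: only the holder's exact ID may modify a locked key.
    if (held !== undefined && held.id !== lock?.id) return false;

    let next: HeldLock | undefined = held;
    if (lock !== undefined && (held === undefined || lock.timeout !== undefined)) {
      const expiresAt =
        lock.timeout === undefined ? undefined : this.now() + lock.timeout * 1000;
      next = { id: lock.id, expiresAt };
    }
    this.rows.set(key, { value, lock: next });
    return true;
  }

  // Only the holder's exact ID may release the lock.
  releaseLock(key: string, lockId: string): boolean {
    const row = this.rows.get(key);
    const held = this.activeLock(key);
    if (row === undefined || held === undefined || held.id !== lockId) return false;
    this.rows.set(key, { value: row.value, lock: undefined });
    return true;
  }
}
```

In this model, a second caller with a different lock ID gets `false` until the first caller releases the lock or the timeout elapses, which matches the acquisition and expiration rules described in the list.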
Lock renewal for long operations
For operations that take longer than the initial timeout, you can renew the lock:

    const lockId = crypto.randomUUID();

    // Acquire lock with 30 second timeout
    const lockAcquired = await kv.app.set({
      key: "long-process",
      value: "starting",
      lock: {
        id: lockId,
        timeout: 30,
      },
    });

    if (lockAcquired) {
      try {
        // Start long-running work
        await doSomeWork();

        // Extend lock before it expires
        const renewed = await kv.app.renewLock({
          key: "long-process",
          lock: {
            id: lockId,
            timeout: 60, // Extend for 60 more seconds
          },
        });

        if (renewed) {
          // Continue with more work
          await doMoreWork();
        } else {
          throw new Error("Failed to renew lock");
        }
      } finally {
        await kv.app.releaseLock({ key: "long-process", lockId });
      }
    }

Lock best practices
When using locks, follow these guidelines:
- Always release locks: Use try-finally blocks to ensure locks are released even if errors occur
- Set appropriate timeouts: Balance between preventing stale locks and allowing operations to complete
- Handle lock failures gracefully: Lock acquisition can fail legitimately when another process has the lock
- Use unique lock IDs: Generate UUIDs for each lock to prevent conflicts
- Keep critical sections short: Minimize the time a lock is held to improve concurrency
- Consider retries: Implement retry logic with exponential backoff for transient lock conflicts
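The retry guideline can be sketched as a small helper. `retryWithBackoff` and its options are hypothetical names introduced here. The helper assumes the operation signals a lock conflict by resolving to `false`, as the `set` calls with a `lock` do in the examples on this page. The `sleep` option exists only so the delay can be replaced in tests.

```typescript
// Hypothetical helper: retries an operation that resolves to false on a lock conflict.
type RetryOptions = {
  attempts: number; // maximum number of tries, including the first
  baseDelayMs: number; // delay before the second try
  sleep?: (ms: number) => Promise<void>; // injectable for tests
};

const defaultSleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });

async function retryWithBackoff(
  tryOnce: () => Promise<boolean>,
  options: RetryOptions,
): Promise<boolean> {
  const sleep = options.sleep ?? defaultSleep;

  for (let attempt = 0; attempt < options.attempts; attempt++) {
    if (await tryOnce()) return true;

    const isLastAttempt = attempt === options.attempts - 1;
    if (!isLastAttempt) {
      // The delay doubles after every failed try: base, 2x base, 4x base, ...
      await sleep(options.baseDelayMs * 2 ** attempt);
    }
  }

  // Every attempt hit a conflict. The caller decides whether that is an error.
  return false;
}
```

A caller could pass a function that attempts the lock-acquiring `set` and returns its result. Adding random jitter to each delay is a common refinement when many callers contend for the same key, though it is left out here to keep the sketch short.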
Real-world examples
Example 1: Caching external API data

    async function fetchWithCache(url, cacheKey, ttlSeconds = 300) {
      // Try to get from cache first
      const cached = await kv.block.get(cacheKey);

      if (cached && cached.value) {
        console.log(`Cache hit for ${cacheKey}`);
        return cached.value;
      }

      // Cache miss, fetch from API
      console.log(`Cache miss for ${cacheKey}, fetching from API`);
      const response = await fetch(url);
      const data = await response.json();

      // Store in cache with TTL
      await kv.block.set({
        key: cacheKey,
        value: data,
        ttl: ttlSeconds,
      });

      return data;
    }

    // Usage
    const weatherData = await fetchWithCache(
      "https://api.weather.com/forecast?city=seattle",
      "cache:weather:seattle",
      1800, // Cache for 30 minutes
    );

Example 2: Implementing a rate limiter
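The rate limiter in this example uses fixed windows: time is cut into consecutive buckets of `windowSeconds`, the bucket index becomes part of the key, and counters for old buckets are left to expire through `ttl`. The arithmetic can be checked in isolation. `windowInfo` is a hypothetical helper written for this illustration. It mirrors the key and reset calculations used in the example and is not part of the KV API.

```typescript
// Hypothetical helper that isolates the fixed-window arithmetic.
type WindowInfo = { key: string; resetInSeconds: number };

function windowInfo(userId: string, nowMs: number, windowSeconds: number): WindowInfo {
  const windowMs = windowSeconds * 1000;
  // Index of the window that contains `nowMs`, counted from the Unix epoch.
  const windowIndex = Math.floor(nowMs / windowMs);
  // The current window ends where the next one begins.
  const windowEndMs = (windowIndex + 1) * windowMs;

  return {
    key: `ratelimit:${userId}:${windowIndex}`,
    resetInSeconds: (windowEndMs - nowMs) / 1000,
  };
}

// 90 seconds after the epoch with a 60 second window:
// window index 1, with 30 seconds left before the next window starts.
const midWindow = windowInfo("user-1", 90000, 60);

// Exactly on a boundary, a fresh window (index 2) begins with the full 60 seconds left.
const onBoundary = windowInfo("user-1", 120000, 60);
```

Because the key changes at every window boundary, a user's count resets abruptly there. Also note that the get-then-set increment in the example is subject to the same race condition described earlier, so concurrent requests may be undercounted.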

    async function checkRateLimit(userId, limit = 10, windowSeconds = 60) {
      const key = `ratelimit:${userId}:${Math.floor(Date.now() / (windowSeconds * 1000))}`;

      // Get current count
      const current = await kv.app.get(key);
      const count = current?.value || 0;

      if (count >= limit) {
        return {
          allowed: false,
          current: count,
          limit,
          reset: windowSeconds - ((Date.now() / 1000) % windowSeconds),
        };
      }

      // Increment count
      await kv.app.set({
        key,
        value: count + 1,
        ttl: windowSeconds,
      });

      return {
        allowed: true,
        current: count + 1,
        limit,
        reset: windowSeconds - ((Date.now() / 1000) % windowSeconds),
      };
    }

    // Usage
    async function handleRequest(userId, requestData) {
      const rateLimit = await checkRateLimit(userId, 5, 60);

      if (!rateLimit.allowed) {
        throw new Error(
          `Rate limit exceeded. Try again in ${Math.ceil(rateLimit.reset)} seconds.`,
        );
      }

      // Process the request...
    }

Example 3: Maintaining persistent counters

    async function incrementCounter(counterName, increment = 1) {
      // Get current value
      const currentCounter = await kv.app.get(`counter:${counterName}`);
      const currentValue = currentCounter?.value || 0;

      // Set new value
      const newValue = currentValue + increment;
      await kv.app.set({
        key: `counter:${counterName}`,
        value: newValue,
      });

      return newValue;
    }

    // Note: This simple implementation is vulnerable to race conditions as described earlier.
    // For production use, consider using the safeIncrementCounter function with retries.

    // Track API usage
    async function trackAPIUsage(endpoint, userId) {
      await incrementCounter(`api:total:${endpoint}`);
      await incrementCounter(`api:user:${userId}:${endpoint}`);

      // Also update daily stats with TTL
      const today = new Date().toISOString().split("T")[0];
      const dailyName = `api:daily:${today}:${endpoint}`;
      await incrementCounter(dailyName);

      // Keep daily stats for 30 days.
      // incrementCounter stores values under the "counter:" prefix, so read back the same key.
      const dailyKey = `counter:${dailyName}`;
      const current = await kv.app.get(dailyKey);
      if (current) {
        await kv.app.set({
          key: dailyKey,
          value: current.value,
          ttl: 30 * 24 * 60 * 60, // 30 days in seconds
        });
      }
    }