Batch Operations
Execute multiple API calls concurrently for improved performance
Overview
Batch operations allow you to execute multiple API calls concurrently using separate authenticated HTTP sessions. This dramatically improves performance when you need to perform many operations, achieving significant speedups through parallel execution.
from easy_acumatica import AcumaticaClient, BatchCall
client = AcumaticaClient()
# Create batch of customer lookups
# Use .batch(args) when passing arguments
batch = BatchCall(
client.customers.get_by_id.batch("CUST001"),
client.customers.get_by_id.batch("CUST002"),
client.customers.get_by_id.batch("CUST003"),
max_concurrent=5
)
# Execute all calls concurrently
cust1, cust2, cust3 = batch.execute()
# 3x faster than sequential calls!
Basic Usage
Create batch calls using the .batch
property on any service method:
# Use .batch() at the END of the method call
# With arguments: method.batch(args)
# Without arguments: method.batch() with empty parens
# Batch calls with arguments
get_cust1 = client.customers.get_by_id.batch("CUST001")
get_cust2 = client.customers.get_by_id.batch("CUST002")
get_inv1 = client.invoices.get_by_id.batch("INV001")
# Batch calls without arguments
get_all_customers = client.customers.get_list.batch()
get_all_orders = client.sales_orders.get_list.batch()
# These calls haven't executed yet - they're just wrapped for batching
Executing Batches
Execute batches with the BatchCall
class and unpack results:
from easy_acumatica import BatchCall
# Create and execute batch with arguments
batch = BatchCall(
client.customers.get_by_id.batch("CUST001"),
client.customers.get_by_id.batch("CUST002"),
client.customers.get_by_id.batch("CUST003")
)
# Execute and unpack results
customer1, customer2, customer3 = batch.execute()
# Batch without arguments - use empty parens
batch = BatchCall(
client.customers.get_list.batch(),
client.sales_orders.get_list.batch(),
client.stock_items.get_list.batch(),
max_concurrent=5
)
# Execute and get individual results
batch.execute()
for result in batch.get_successful_results():
print(f"Retrieved {len(result)} records")
Configuration Options
Control batch execution behavior with various options:
from easy_acumatica import BatchCall
# Progress callback
def on_progress(completed, total):
print(f"Progress: {completed}/{total} calls completed")
# Configured batch
batch = BatchCall(
client.customers.get_by_id.batch("CUST001"),
client.customers.get_by_id.batch("CUST002"),
client.customers.get_by_id.batch("CUST003"),
max_concurrent=5, # Max 5 concurrent calls
timeout=30.0, # 30 second total timeout
fail_fast=False, # Continue on errors
return_exceptions=True, # Return errors, don't raise
progress_callback=on_progress
)
results = batch.execute()
max_concurrent
: Maximum number of concurrent API calls (default: 10)timeout
: Total timeout for batch execution in secondsfail_fast
: Stop execution on first failure (default: False)return_exceptions
: Return exceptions instead of raising (default: True)progress_callback
: Function called with progress updates
Helper Functions
Use helper functions for common batching patterns:
Batch Fetch by IDs
from easy_acumatica.batch import create_batch_from_ids
# Fetch many customers by ID
customer_ids = ["CUST001", "CUST002", "CUST003", "CUST004", "CUST005"]
batch = create_batch_from_ids(
client.customers,
customer_ids,
method_name='get_by_id'
)
customers = batch.execute()
Batch Queries with Filters
from easy_acumatica.odata import QueryOptions, F
from easy_acumatica.batch import create_batch_from_filters
# Create multiple filter queries
filters = [
QueryOptions(filter=F.Status == "Active"),
QueryOptions(filter=F.Status == "OnHold"),
QueryOptions(filter=F.Status == "Inactive")
]
batch = create_batch_from_filters(
client.customers,
filters,
method_name='get_list'
)
active, on_hold, inactive = batch.execute()
print(f"Active: {len(active)}, On Hold: {len(on_hold)}, Inactive: {len(inactive)}")
Error Handling
Handle errors in batch operations:
from easy_acumatica import BatchCall
batch = BatchCall(
client.customers.get_by_id.batch("CUST001"),
client.customers.get_by_id.batch("NOTEXIST"), # Will fail
client.customers.get_by_id.batch("CUST003"),
return_exceptions=True # Return errors instead of raising
)
results = batch.execute()
# Check each result
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Call {i} failed: {result}")
else:
print(f"Call {i} succeeded: {result['CustomerID']}")
# Or get only successful results
successful = batch.get_successful_results()
print(f"Got {len(successful)} successful results")
# Get failed calls
failed = batch.get_failed_calls()
for index, call, error in failed:
print(f"Call {index} failed with: {error}")
Progress Tracking
Monitor batch execution progress:
from easy_acumatica import BatchCall
def progress_handler(completed, total):
percent = (completed / total) * 100
print(f"\rProgress: {completed}/{total} ({percent:.1f}%)", end="")
batch = BatchCall(
*[client.customers.get_by_id.batch(f"CUST{i:03d}") for i in range(1, 101)],
max_concurrent=10,
progress_callback=progress_handler
)
results = batch.execute()
print("\nBatch complete!")
Batch Statistics
Access execution statistics after batch completion:
batch = BatchCall(
client.customers.get_by_id.batch("CUST001"),
client.customers.get_by_id.batch("CUST002"),
client.customers.get_by_id.batch("CUST003")
)
batch.execute()
# Print summary
batch.print_summary()
# Access stats directly
stats = batch.stats
print(f"Total time: {stats.total_time:.2f}s")
print(f"Average call time: {stats.average_call_time:.3f}s")
print(f"Success rate: {stats.successful_calls / stats.total_calls * 100:.1f}%")
print(f"Concurrency level: {stats.concurrency_level}")
Retrying Failed Calls
Retry only the failed calls from a batch:
# Initial batch with some failures
batch = BatchCall(
client.customers.get_by_id.batch("CUST001"),
client.customers.get_by_id.batch("NOTEXIST"), # Fails
client.customers.get_by_id.batch("CUST003"),
client.customers.get_by_id.batch("ALSOBAD"), # Fails
return_exceptions=True
)
batch.execute()
# Retry only failed calls
retry_batch = batch.retry_failed_calls(max_concurrent=3)
retry_results = retry_batch.execute()
# Combine successful results from both attempts
all_successful = batch.get_successful_results() + retry_batch.get_successful_results()
print(f"Total successful: {len(all_successful)}")
Advanced Usage
Combine batching with other features for complex workflows:
from easy_acumatica import AcumaticaClient, BatchCall
from easy_acumatica.odata import QueryOptions, F
client = AcumaticaClient()
# Fetch multiple customers with details
customer_ids = ["CUST001", "CUST002", "CUST003"]
options = QueryOptions(expand=["Contacts", "Orders"])
# Create batch with query options
batch = BatchCall(
*[client.customers.get_by_id.batch(cid, options=options) for cid in customer_ids],
max_concurrent=5
)
customers = batch.execute()
# Process results and create new batch for updates
update_batch = BatchCall(
*[
client.customers.put_entity.batch({
"CustomerID": {"value": cust["CustomerID"]["value"]},
"CreditLimit": {"value": cust["CreditLimit"]["value"] * 1.1}
})
for cust in customers if not isinstance(cust, Exception)
],
max_concurrent=3 # Lower concurrency for writes
)
updated = update_batch.execute()
print(f"Updated {len(updated)} customers")
Performance Tips
- Adjust
max_concurrent
based on your server's capacity (typically 5-20) - Use batch operations for independent calls only - dependent calls should run sequentially
- Each batch call uses a separate authenticated session to avoid serialization bottlenecks
- Sessions are automatically created, authenticated, and cleaned up
- Batching works best with read operations; use cautiously with writes
- Monitor your Acumatica server's load when using high concurrency