Task Scheduler
Schedule and automate recurring API operations
Overview
The Task Scheduler allows you to run API operations on a schedule - every few seconds, minutes, hours, or on specific days and times. It's built into the AcumaticaClient and handles task execution, monitoring, and persistence automatically.
Important: The scheduler runs in a background thread. You must keep your main script running (e.g., with a while True
loop or as part of a web server) for scheduled tasks to execute.
from easy_acumatica import AcumaticaClient
from easy_acumatica.scheduler import IntervalSchedule
import time
client = AcumaticaClient()
# Define a task function
def sync_customers():
customers = client.customers.get_list()
print(f"Synced {len(customers)} customers")
# Schedule to run every hour
client.scheduler.add_task(
name="customer_sync",
callable_obj=sync_customers,
schedule=IntervalSchedule(hours=1)
)
# Start the scheduler
client.scheduler.start()
# Keep the script running (scheduler runs in background thread)
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping scheduler...")
client.scheduler.stop()
client.logout()
Basic Usage
Access the scheduler via client.scheduler
:
from easy_acumatica import AcumaticaClient
from easy_acumatica.scheduler import IntervalSchedule
import time
client = AcumaticaClient()
def my_task():
print("Task executed!")
# Add task
task = client.scheduler.add_task(
name="my_task",
callable_obj=my_task,
schedule=IntervalSchedule(seconds=30)
)
# Start scheduler
client.scheduler.start()
# Keep running - scheduler executes in background
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
client.scheduler.stop()
Schedule Types
Create schedules using the builder pattern:
Interval Schedules
from easy_acumatica.scheduler import IntervalSchedule
# Every 30 seconds
IntervalSchedule(seconds=30)
# Every 5 minutes
IntervalSchedule(minutes=5)
# Every 2 hours
IntervalSchedule(hours=2)
# Every day
IntervalSchedule(days=1)
# Combined intervals
IntervalSchedule(hours=1, minutes=30) # Every 1.5 hours
Daily Schedules
from easy_acumatica.scheduler import DailySchedule
# Every day at 9:00 AM
DailySchedule(time="09:00")
# Every day at 5:30 PM
DailySchedule(time="17:30:00")
# Note: For multiple times per day, create separate tasks
Weekly Schedules
from easy_acumatica.scheduler import WeeklySchedule
# Every Monday at 9:00 AM
WeeklySchedule(day_of_week=0, time="09:00") # 0 = Monday
# Every Friday at 5:00 PM
WeeklySchedule(day_of_week=4, time="17:00") # 4 = Friday
# Day of week: 0=Monday, 1=Tuesday, 2=Wednesday, 3=Thursday, 4=Friday, 5=Saturday, 6=Sunday
Cron-like Schedules
from easy_acumatica.scheduler import CronSchedule
# Cron syntax: minute hour day month weekday
# Every weekday at 8:30 AM
CronSchedule("30 8 * * 1-5")
# First day of month at midnight
CronSchedule("0 0 1 * *")
# Every 15 minutes
CronSchedule("*/15 * * * *")
Task Management
Manage scheduled tasks:
# List all tasks
tasks = client.scheduler.list_tasks()
for task in tasks:
print(f"{task.name}: {task.status}")
# Get specific task
task = client.scheduler.get_task_by_name("customer_sync")
# Pause a task
client.scheduler.pause_task(task.id)
# Resume a task
client.scheduler.resume_task(task.id)
# Remove a task
client.scheduler.remove_task(task.id)
# Get task by ID
task = client.scheduler.get_task(task_id)
Task Options
Configure task behavior with RetryPolicy
and callbacks:
from easy_acumatica.scheduler import IntervalSchedule, RetryPolicy
def my_task():
# Task logic
pass
# Create retry policy
retry_policy = RetryPolicy(
max_retries=3,
retry_delay=120,
exponential_backoff=True,
backoff_factor=2.0
)
client.scheduler.add_task(
name="my_task",
callable_obj=my_task,
schedule=IntervalSchedule(minutes=5),
retry_policy=retry_policy,
enabled=True,
max_runs=None,
priority=0
)
retry_policy
:RetryPolicy
object for automatic retry on failureon_success
: Callback function executed after successful completionon_failure
: Callback function executed after task failureenabled
: Whether the task is enabled (default: True)max_runs
: Maximum number of executions (default: unlimited)priority
: Task priority for execution order (default: 0)
Callbacks and Error Handling
Handle task events with callbacks:
from easy_acumatica.scheduler import IntervalSchedule
def my_task():
# Your task logic
result = client.customers.get_list()
return len(result)
def on_success(task_result):
print(f"Task completed: {task_result.result} customers")
def on_failure(task_result):
print(f"Task failed: {task_result.error}")
client.scheduler.add_task(
name="customer_sync",
callable_obj=my_task,
schedule=IntervalSchedule(minutes=10),
on_success=on_success,
on_failure=on_failure
)
Scheduler Control
Start, stop, and monitor the scheduler:
# Start scheduler
client.scheduler.start()
# Check if running
is_running = client.scheduler._running
# Get statistics
print(f"Total executions: {client.scheduler.total_executions}")
print(f"Successful: {client.scheduler.successful_executions}")
print(f"Failed: {client.scheduler.failed_executions}")
# Stop scheduler (waits for running tasks)
client.scheduler.stop(wait=True, timeout=30)
# Force stop (don't wait)
client.scheduler.stop(wait=False)
Task Persistence
Persist tasks across restarts:
from pathlib import Path
from easy_acumatica.scheduler import IntervalSchedule
# Enable persistence
client = AcumaticaClient()
client.scheduler.persist_tasks = True
client.scheduler.persist_file = Path("my_tasks.json")
# Add tasks - they'll be saved automatically
client.scheduler.add_task(
name="sync_task",
callable_obj=sync_function,
schedule=IntervalSchedule(hours=1)
)
# On next startup, tasks are automatically loaded
# (Note: callable functions must be importable)
Complex Example
A practical example combining multiple features:
from easy_acumatica import AcumaticaClient
from easy_acumatica.odata import QueryOptions, F
from easy_acumatica.scheduler import IntervalSchedule, RetryPolicy
from datetime import datetime
import logging
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
client = AcumaticaClient()
def process_pending_orders():
"""Process orders that need attention."""
try:
# Find pending orders
options = QueryOptions(
filter=(F.Status == "Pending") & (F.OrderDate < datetime.now()),
top=50
)
orders = client.sales_orders.get_list(options=options)
processed = 0
for order in orders:
try:
# Process order (example: release it)
if hasattr(client.sales_orders, 'invoke_action_release'):
client.sales_orders.invoke_action_release(order)
processed += 1
except Exception as e:
logger.error(f"Failed to process order {order['OrderNbr']}: {e}")
return processed
except Exception as e:
logger.error(f"Order processing failed: {e}")
raise
def on_order_success(result):
logger.info(f"Successfully processed {result.result} orders")
def on_order_failure(result):
logger.error(f"Order processing failed: {result.error}")
# Create retry policy
retry_policy = RetryPolicy(
max_retries=2,
retry_delay=60,
exponential_backoff=True
)
# Schedule order processing every 15 minutes
client.scheduler.add_task(
name="process_orders",
callable_obj=process_pending_orders,
schedule=IntervalSchedule(minutes=15),
retry_policy=retry_policy,
on_success=on_order_success,
on_failure=on_order_failure
)
# Start scheduler
client.scheduler.start()
logger.info("Scheduler started - processing orders every 15 minutes")
# Keep application running
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down...")
client.scheduler.stop()
client.logout()
Decorator Pattern
Use decorators for cleaner task definitions:
from easy_acumatica import AcumaticaClient
from easy_acumatica.scheduler import every
client = AcumaticaClient()
# Using the 'every' decorator (automatically creates IntervalSchedule)
@every(hours=1, scheduler=client.scheduler)
def sync_customers():
customers = client.customers.get_list()
print(f"Synced {len(customers)} customers")
return len(customers)
# Task is automatically added to scheduler
client.scheduler.start()
Best Practices
- Keep the main script running: Use a
while True
loop or run as part of a web server/daemon - Keep task functions lightweight and fast
- Use error callbacks to handle and log failures
- Set appropriate timeouts to prevent hanging tasks
- Be mindful of API rate limits when scheduling frequent tasks
- Use
max_instances=1
to prevent overlapping executions - Enable persistence for long-running applications
- Test schedules with
run_immediately=True
before deploying - The scheduler automatically stops when the client is closed