Easy-Acumatica
Documentation

Task Scheduler

Schedule and automate recurring API operations

Overview

The Task Scheduler lets you run API operations on a schedule: every few seconds, minutes, or hours, or on specific days and times. It's built into the AcumaticaClient and handles task execution and monitoring automatically, with optional persistence of tasks across restarts.

Important: The scheduler runs in a background thread. You must keep your main script running (e.g., with a while True loop or as part of a web server) for scheduled tasks to execute.

Python
import time

from easy_acumatica import AcumaticaClient
from easy_acumatica.scheduler import IntervalSchedule

client = AcumaticaClient()


def sync_customers():
    """Fetch the customer list and print how many records were returned."""
    customers = client.customers.get_list()
    print(f"Synced {len(customers)} customers")


# Register the task on an hourly interval
hourly = IntervalSchedule(hours=1)
client.scheduler.add_task(
    name="customer_sync",
    callable_obj=sync_customers,
    schedule=hourly,
)

# Launch the scheduler (it runs in a background thread)
client.scheduler.start()

# Block the main thread so the background scheduler keeps executing tasks
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl+C: stop the scheduler first, then end the API session
    print("Stopping scheduler...")
    client.scheduler.stop()
    client.logout()

Basic Usage

Access the scheduler via client.scheduler:

Python
from easy_acumatica import AcumaticaClient
from easy_acumatica.scheduler import IntervalSchedule
import time

client = AcumaticaClient()

def my_task():
    """Minimal task body: just report that the scheduler invoked it."""
    print("Task executed!")

# Add task (add_task returns the created task object)
task = client.scheduler.add_task(
    name="my_task",
    callable_obj=my_task,
    schedule=IntervalSchedule(seconds=30)
)

# Start scheduler
client.scheduler.start()

# Keep running - scheduler executes in background
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Stop the scheduler, then end the API session, matching the
    # shutdown sequence used by the other complete examples
    client.scheduler.stop()
    client.logout()

Schedule Types

Create a schedule by instantiating one of the schedule classes:

Interval Schedules

Python
from easy_acumatica.scheduler import IntervalSchedule

# Every 30 seconds
IntervalSchedule(seconds=30)

# Every 5 minutes
IntervalSchedule(minutes=5)

# Every 2 hours
IntervalSchedule(hours=2)

# Every day
IntervalSchedule(days=1)

# Units can be combined in a single schedule
IntervalSchedule(hours=1, minutes=30)  # Every 1.5 hours

Daily Schedules

Python
from easy_acumatica.scheduler import DailySchedule

# Times are written as 24-hour strings; the examples below use both
# "HH:MM" and "HH:MM:SS".

# Every day at 9:00 AM
DailySchedule(time="09:00")

# Every day at 5:30 PM (seconds given explicitly)
DailySchedule(time="17:30:00")

# Note: To run at multiple times per day, create a separate task for each time

Weekly Schedules

Python
from easy_acumatica.scheduler import WeeklySchedule

# day_of_week is an integer from 0 (Monday) to 6 (Sunday); see the table below.

# Every Monday at 9:00 AM
WeeklySchedule(day_of_week=0, time="09:00")  # 0 = Monday

# Every Friday at 5:00 PM
WeeklySchedule(day_of_week=4, time="17:00")  # 4 = Friday

# Day of week: 0=Monday, 1=Tuesday, 2=Wednesday, 3=Thursday, 4=Friday, 5=Saturday, 6=Sunday

Cron-like Schedules

Python
from easy_acumatica.scheduler import CronSchedule

# Cron syntax: five space-separated fields, in this order:
#   minute hour day month weekday
# NOTE(review): "1-5" is described as "every weekday" below, which matches the
# usual cron numbering (1=Monday ... 5=Friday) rather than WeeklySchedule's
# 0=Monday numbering. Confirm which convention CronSchedule uses.

# Every weekday at 8:30 AM
CronSchedule("30 8 * * 1-5")

# First day of month at midnight
CronSchedule("0 0 1 * *")

# Every 15 minutes
CronSchedule("*/15 * * * *")

Task Management

Manage scheduled tasks:

Python
# List all tasks
tasks = client.scheduler.list_tasks()
for task in tasks:
    print(f"{task.name}: {task.status}")

# Get specific task by name
task = client.scheduler.get_task_by_name("customer_sync")

# Get task by ID (done before the removal below, while the task still exists)
task_id = task.id
task = client.scheduler.get_task(task_id)

# Pause a task
client.scheduler.pause_task(task_id)

# Resume a task
client.scheduler.resume_task(task_id)

# Remove a task
client.scheduler.remove_task(task_id)

Task Options

Configure task behavior with a RetryPolicy, callbacks, and execution options:

Python
from easy_acumatica.scheduler import IntervalSchedule, RetryPolicy

def my_task():
    """Placeholder task; put the work to be scheduled here."""
    # Task logic
    pass

# Create retry policy
# NOTE(review): retry_delay is presumably in seconds, and backoff_factor
# presumably multiplies the delay on each retry when exponential_backoff
# is enabled. Confirm against the RetryPolicy reference.
retry_policy = RetryPolicy(
    max_retries=3,
    retry_delay=120,
    exponential_backoff=True,
    backoff_factor=2.0
)

# The last three arguments are spelled out for illustration; per the option
# list that follows, enabled defaults to True, max_runs to unlimited, and
# priority to 0.
client.scheduler.add_task(
    name="my_task",
    callable_obj=my_task,
    schedule=IntervalSchedule(minutes=5),
    retry_policy=retry_policy,
    enabled=True,
    max_runs=None,
    priority=0
)
  • retry_policy: RetryPolicy object for automatic retry on failure
  • on_success: Callback function executed after successful completion
  • on_failure: Callback function executed after task failure
  • enabled: Whether the task is enabled (default: True)
  • max_runs: Maximum number of executions (default: unlimited)
  • priority: Task priority for execution order (default: 0)

Callbacks and Error Handling

Handle task events with callbacks:

Python
from easy_acumatica.scheduler import IntervalSchedule


def my_task():
    """Return the number of customers fetched by this run."""
    return len(client.customers.get_list())


def on_success(task_result):
    """Print the count the task returned, read from task_result.result."""
    print(f"Task completed: {task_result.result} customers")


def on_failure(task_result):
    """Print the error recorded for the failed run, read from task_result.error."""
    print(f"Task failed: {task_result.error}")


# Run every ten minutes and route the outcome to the matching callback
ten_minutes = IntervalSchedule(minutes=10)
client.scheduler.add_task(
    name="customer_sync",
    callable_obj=my_task,
    schedule=ten_minutes,
    on_success=on_success,
    on_failure=on_failure,
)

Scheduler Control

Start, stop, and monitor the scheduler:

Python
# Start scheduler
client.scheduler.start()

# Check if running
# NOTE(review): _running is an underscore-prefixed (private) attribute and may
# change between releases. Check whether the scheduler exposes a public
# equivalent before relying on it.
is_running = client.scheduler._running

# Get statistics (execution counters kept by the scheduler)
print(f"Total executions: {client.scheduler.total_executions}")
print(f"Successful: {client.scheduler.successful_executions}")
print(f"Failed: {client.scheduler.failed_executions}")

# Stop scheduler (waits for running tasks)
# NOTE(review): timeout is presumably in seconds; confirm.
client.scheduler.stop(wait=True, timeout=30)

# Force stop (don't wait)
client.scheduler.stop(wait=False)

Task Persistence

Persist tasks across restarts:

Python
from pathlib import Path

from easy_acumatica import AcumaticaClient
from easy_acumatica.scheduler import IntervalSchedule

# Enable persistence
client = AcumaticaClient()
client.scheduler.persist_tasks = True
client.scheduler.persist_file = Path("my_tasks.json")


# Defined at module level so the function is importable, which is required
# for a persisted task to be reloaded on the next startup
def sync_function():
    """Placeholder task; replace the body with your own sync logic."""
    pass


# Add tasks - they'll be saved automatically
client.scheduler.add_task(
    name="sync_task",
    callable_obj=sync_function,
    schedule=IntervalSchedule(hours=1)
)

# On next startup, tasks are automatically loaded
# (Note: callable functions must be importable)

Complex Example

A practical example combining multiple features:

Python
from easy_acumatica import AcumaticaClient
from easy_acumatica.odata import QueryOptions, F
from easy_acumatica.scheduler import IntervalSchedule, RetryPolicy
from datetime import datetime
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

client = AcumaticaClient()

def process_pending_orders():
    """Release pending sales orders and return how many were released.

    Queries at most 50 orders whose Status is "Pending" and whose OrderDate
    is earlier than the current time, then invokes the release action on
    each one. A failure on a single order is logged and skipped so the rest
    of the batch still runs. A failure of the query itself is logged and
    re-raised so the scheduler sees the run as failed.

    Returns:
        The number of orders released, or 0 when the sales-order service
        does not expose ``invoke_action_release``.
    """
    try:
        # Find pending orders
        options = QueryOptions(
            filter=(F.Status == "Pending") & (F.OrderDate < datetime.now()),
            top=50
        )
        orders = client.sales_orders.get_list(options=options)

        # Resolve the release action once; it does not change between orders
        release = getattr(client.sales_orders, 'invoke_action_release', None)
        if release is None:
            return 0

        processed = 0
        for order in orders:
            try:
                # Process order (example: release it)
                release(order)
                processed += 1
            except Exception as e:
                # Log and continue so one bad order does not abort the batch
                logger.error(f"Failed to process order {order['OrderNbr']}: {e}")

        return processed
    except Exception as e:
        logger.error(f"Order processing failed: {e}")
        raise

def on_order_success(result):
    """Log the number of orders released by a successful run."""
    logger.info(f"Successfully processed {result.result} orders")

def on_order_failure(result):
    """Log the error recorded for a failed run."""
    logger.error(f"Order processing failed: {result.error}")

# Create retry policy
retry_policy = RetryPolicy(
    max_retries=2,
    retry_delay=60,
    exponential_backoff=True
)

# Schedule order processing every 15 minutes
client.scheduler.add_task(
    name="process_orders",
    callable_obj=process_pending_orders,
    schedule=IntervalSchedule(minutes=15),
    retry_policy=retry_policy,
    on_success=on_order_success,
    on_failure=on_order_failure
)

# Start scheduler
client.scheduler.start()
logger.info("Scheduler started - processing orders every 15 minutes")

# Keep application running
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    logger.info("Shutting down...")
    client.scheduler.stop()
    client.logout()

Decorator Pattern

Use decorators for cleaner task definitions:

Python
from easy_acumatica import AcumaticaClient
from easy_acumatica.scheduler import every
import time

client = AcumaticaClient()

# Using the 'every' decorator (automatically creates IntervalSchedule)
@every(hours=1, scheduler=client.scheduler)
def sync_customers():
    """Fetch the customer list, print the count, and return it."""
    customers = client.customers.get_list()
    print(f"Synced {len(customers)} customers")
    return len(customers)

# Task is automatically added to scheduler
client.scheduler.start()

# Keep the script running: the scheduler executes in a background thread,
# so tasks only fire while the main thread is alive
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    client.scheduler.stop()
    client.logout()

Best Practices

  • Keep the main script running: Use a while True loop or run as part of a web server/daemon
  • Keep task functions lightweight and fast
  • Use error callbacks to handle and log failures
  • Set appropriate timeouts to prevent hanging tasks
  • Be mindful of API rate limits when scheduling frequent tasks
  • Use max_instances=1 to prevent overlapping executions
  • Enable persistence for long-running applications
  • Test schedules with run_immediately=True before deploying
  • The scheduler automatically stops when the client is closed