Generic Task Operations

Generic task operations.

async shared.database.operations.generic_task.create_task(task_type, data, status='pending', result=None)[source]

Create a generic task.

Parameters:
  • task_type (str) – Task type

  • data (dict[str, Any]) – Task data

  • status (str) – Task status

  • result (Optional[str]) – Task result

Return type:

Task

Returns:

Task instance

async shared.database.operations.generic_task.get_task(task_id)[source]

Get task by ID.

Parameters:

task_id (int) – Task ID

Return type:

Optional[Task]

Returns:

Task instance or None

async shared.database.operations.generic_task.list_completed_tasks_since(last_id)[source]

List completed tasks since last ID.

Parameters:

last_id (int) – Last task ID

Return type:

List[Task]

Returns:

List of Task instances

async shared.database.operations.generic_task.list_pending_tasks()[source]

List pending tasks.

Return type:

List[Task]

Returns:

List of Task instances

async shared.database.operations.generic_task.mark_task_completed(task_id, result_text)[source]

Mark task as completed.

Parameters:
  • task_id (int) – Task ID

  • result_text (Optional[str]) – Result text

Return type:

None

async shared.database.operations.generic_task.mark_task_failed(task_id, error_text)[source]

Mark task as failed.

Parameters:
  • task_id (int) – Task ID

  • error_text (str) – Error text

Return type:

None

async shared.database.operations.generic_task.mark_task_sent(task_id)[source]

Mark task as sent.

Parameters:

task_id (int) – Task ID

Return type:

None