Files
d3i-szct/models/dcm_task_process_info.py
2026-06-02 17:46:38 +08:00

757 lines
29 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import random
from typing import Union, Optional, Callable
import pandas as pd
from sqlalchemy import select, delete
from tornado_swagger.model import register_swagger_model
from wtforms import StringField, TextAreaField, IntegerField
from wtforms.validators import Length
import models
from models.common_model import CommonModel
from models.db_models import TD3iDcmTaskProcessInfo
from paste.core.logging import echo_log
from paste.util.pagination import Pagination
from paste.web.form import ModelForm
class DcmTaskProcessInfoForm(ModelForm):
"""
办理经过表单验证类(完全映射 TD3iDcmTaskProcessInfo 字段)。
用于验证和处理数字城管-部门待办任务办理经过的记录/更新表单数据。
字段完全映射数据库表 t_d3i_dcm_task_process_info 的字段结构。
"""
# 基础信息
raw_id = IntegerField('原始主键ID')
rec_id = IntegerField('记录ID')
act_id = IntegerField('任务ID')
act_def_id = IntegerField('流程节点定义ID')
act_def_name = StringField('流程节点名称', validators=[Length(max=100, message='流程节点名称长度不能超过100字符')])
act_time_state_id = IntegerField('操作时间状态ID')
act_limit_info = StringField('操作时限信息', validators=[Length(max=255, message='时限信息长度不能超过255字符')])
act_used_time_char = StringField('已用时间(字符串)', validators=[Length(max=50, message='已用时间描述长度不能超过50字符')])
act_remain_time_char = StringField('剩余时间(字符串)', validators=[Length(max=50, message='剩余时间描述长度不能超过50字符')])
act_deadline_time = IntegerField('操作截止时间戳(毫秒)')
act_property_id = IntegerField('操作属性ID')
# 操作信息
action_name = StringField('操作动作名称', validators=[Length(max=100, message='操作动作名称长度不能超过100字符')])
action_time = IntegerField('操作时间戳(毫秒)')
title = StringField('操作标题', validators=[Length(max=100, message='标题长度不能超过100字符')])
detail = TextAreaField('操作详细意见', validators=[Length(max=65535, message='意见长度不能超过65535字符')])
backup_detail = TextAreaField('备用意见', validators=[Length(max=65535, message='备用意见长度不能超过65535字符')])
medias = TextAreaField('附件信息(JSON格式)', validators=[Length(max=65535, message='附件信息长度不能超过65535字符')])
# 单位与人员
unit_name = StringField('当前操作单位', validators=[Length(max=100, message='单位名称长度不能超过100字符')])
unit_contact = StringField('单位联系方式', validators=[Length(max=255, message='联系方式长度不能超过255字符')])
human_id = IntegerField('操作人ID')
human_name = StringField('操作人名称(含单位)', validators=[Length(max=255, message='操作人名称长度不能超过255字符')])
role_name = StringField('当前角色名称', validators=[Length(max=100, message='角色名称长度不能超过100字符')])
# 项目信息
item_id = IntegerField('项目ID')
item_type_id = IntegerField('任务类型ID')
item_content = TextAreaField('任务内容摘要', validators=[Length(max=65535, message='内容摘要长度不能超过65535字符')])
item_process_info_list = TextAreaField('子流程列表(JSON', validators=[Length(max=65535, message='子流程列表长度不能超过65535字符')])
sub_process_info = TextAreaField('子流程信息', validators=[Length(max=65535, message='子流程信息长度不能超过65535字符')])
# 捆绑信息
bundle_time_state_id = IntegerField('组合时间状态ID')
bundle_limit_info = StringField('组合时限信息', validators=[Length(max=255, message='组合时限信息长度不能超过255字符')])
bundle_used_char = StringField('组合已用时间', validators=[Length(max=50, message='组合已用时间长度不能超过50字符')])
bundle_remain_char = StringField('组合剩余时间', validators=[Length(max=50, message='组合剩余时间长度不能超过50字符')])
bundle_deadline_time = IntegerField('组合截止时间戳(毫秒)')
# 下一节点信息
show_unit_contact = IntegerField('是否显示单位联系方式')
pre_unit_name = StringField('上一单位', validators=[Length(max=100, message='上一单位名称长度不能超过100字符')])
pre_action_name = StringField('上一操作名称', validators=[Length(max=100, message='上一操作名称长度不能超过100字符')])
pre_human_name = StringField('上一操作人', validators=[Length(max=255, message='上一操作人名称长度不能超过255字符')])
pre_act_opinion = TextAreaField('上一操作意见', validators=[Length(max=65535, message='上一操作意见长度不能超过65535字符')])
next_act_def_name = StringField('下一节点名称', validators=[Length(max=100, message='下一节点名称长度不能超过100字符')])
next_role_part_name = StringField('下一角色/单位', validators=[Length(max=255, message='下一角色/单位长度不能超过255字符')])
next_role_name = StringField('下一角色名称', validators=[Length(max=100, message='下一角色名称长度不能超过100字符')])
next_act_property_id = IntegerField('下一操作属性ID')
# 最后节点标识
last_act_flag = IntegerField('是否为最后一节点(0否,1是)')
def process(self, formdata=None, obj=None, **kwargs):
if formdata:
for name, values in formdata.items():
if isinstance(values, list) and values:
formdata[name] = [v.strip() if isinstance(v, str) else v for v in values]
elif isinstance(values, str):
formdata[name] = values.strip()
super().process(formdata, obj, **kwargs)
class DcmTaskProcessInfoBase(TD3iDcmTaskProcessInfo, CommonModel):
"""
办理经过基础类(完全映射 TD3iDcmTaskProcessInfo 字段)。
封装所有与任务办理经过相关的通用操作方法。
"""
FieldMapping = {
'raw_id': 'id',
'act_id': 'actID',
'act_def_id': 'actDefID',
'act_def_name': 'actDefName',
'act_time_state_id': 'actTimeStateID',
'act_limit_info': 'actLimitInfo',
'act_used_time_char': 'actUsedTimeChar',
'act_remain_time_char': 'actRemainTimeChar',
'act_deadline_time': 'actDeadlineTime',
'act_property_id': 'actPropertyID',
'action_name': 'actionName',
'action_time': 'actionTime',
'title': 'title',
'detail': 'detail',
'backup_detail': 'backupDetail',
'medias': 'medias',
'unit_name': 'unitName',
'unit_contact': 'unitContact',
'human_id': 'humanID',
'human_name': 'humanName',
'role_name': 'roleName',
'item_id': 'itemID',
'item_type_id': 'itemTypeID',
'item_content': 'itemContent',
'item_process_info_list': 'itemProcessInfoList',
'sub_process_info': 'subProcessInfo',
'bundle_time_state_id': 'bundleTimeStateID',
'bundle_limit_info': 'bundleLimitInfo',
'bundle_used_char': 'bundleUsedChar',
'bundle_remain_char': 'bundleRemainChar',
'bundle_deadline_time': 'bundleDeadlineTime',
'show_unit_contact': 'showUnitContact',
'pre_unit_name': 'preUnitName',
'pre_action_name': 'preActionName',
'pre_human_name': 'preHumanName',
'pre_act_opinion': 'preActOpinion',
'next_act_def_name': 'nextActDefName',
'next_role_part_name': 'nextRolePartName',
'next_role_name': 'nextRoleName',
'next_act_property_id': 'nextActPropertyID',
'last_act_flag': 'lastActFlag',
}
"""
处理流程映射
"""
@classmethod
async def exist_other(cls, id: Union[str, int], rec_id: Union[str, int], act_id: Union[str, int]):
"""
检查是否存在除当前记录外的其他同任务ID或同原始主键ID的记录。
:param act_id: 当前任务ID
:param rec_id: 原始主键ID
:return: 存在返回记录对象,不存在返回None
"""
_query = select(cls).where(cls.id != id, cls.rec_id == rec_id, cls.act_id == act_id)
_record: cls = await cls.query_first(_query)
return _record
@classmethod
async def find_by_ids(cls, ids: list[Union[str, int]]):
"""
根据ID列表批量查找办理经过。
"""
_query = select(cls).where(cls.id.in_(ids))
_record_list: list[cls] = (await cls.orm_execute_scalars(_query)).all()
return _record_list
@classmethod
async def is_exist(cls, rec_id: Union[str, int], act_id: Union[str, int]):
"""
检查办理经过是否已经存在(根据原始主键ID)。
"""
_query = select(cls).where(cls.rec_id == rec_id, cls.act_id == act_id)
_record: cls = await cls.query_first(_query)
return _record
@classmethod
async def search_base(cls, is_paging=True, **kwargs):
"""
按参数搜索办理流程记录的基础方法。
支持字段:
- act_id, unit_name, human_name, role_name, last_act_flag
- 支持模糊匹配:act_def_name, action_name, title, item_content
:param is_paging: 是否分页
:param kwargs: 查询参数
:key int page_number: 页码(缺省随机1~100
:key int page_size: 每页数量(缺省20
:key dict sort_clause: 排序配置,如 {'action_time': 'desc'}
:key int act_id: 精确匹配任务ID
:key str unit_name: 精确匹配单位
:key str human_name: 精确匹配操作人
:key str role_name: 精确匹配角色
:key int last_act_flag: 精确匹配是否为最后一节点
:key str act_def_name: 模糊匹配流程节点名称
:key str action_name: 模糊匹配操作动作
:key str title: 模糊匹配标题
:key str item_content: 模糊匹配内容摘要
:key int action_time_start: 时间范围起始(毫秒)
:key int action_time_end: 时间范围结束(毫秒)
:return: (DataFrame, Pagination)
"""
page_number = kwargs.get('page_number', random.randint(1, 100))
page_size = kwargs.get('page_size', 20)
kwargs.update({'page_number': page_number, 'page_size': page_size})
_name_likes = {
cls.act_def_name.key: '%{}%',
cls.action_name.key: '%{}%',
cls.title.key: '%{}%',
cls.item_content.key: '%{}%',
}
_query = select(cls).where(
*cls.search_wheres(likes=_name_likes, **kwargs)
)
if 'action_time_start' in kwargs and 'action_time_end' in kwargs:
_query = _query.where(
cls.action_time >= kwargs['action_time_start'],
cls.action_time <= kwargs['action_time_end']
)
_paging = None
if is_paging:
_row_count = await cls.query_count(_query)
_paging = Pagination(_row_count).paging(page_number, page_size)
_data_query = _query.limit(page_size).offset(_paging.offset_size)
else:
_data_query = _query.where()
_sort_clause = cls.sort_clauses(kwargs.get('sort_clause', {}))
if _sort_clause:
_data_query = _data_query.order_by(*_sort_clause)
else:
_data_query = _data_query.order_by(cls.action_time.desc())
_process_df = await cls.query_as_df(_data_query)
if not _process_df.empty:
_process_df.replace(models.EmptyInDF + models.EmptyDatetimeInDF, '', inplace=True)
return _process_df, _paging
@classmethod
async def search(cls, **kwargs):
"""
按参数搜索办理流程记录,返回分页格式数据。
"""
_process_df, _paging = await cls.search_base(**kwargs)
return {
'total': _paging.row_count,
'rows': _process_df.to_dict('records'),
'pagination': {
'page_number': _paging.page_number,
'page_count': _paging.page_count,
'page_size': _paging.page_size,
},
}
@classmethod
async def exists_rec_id(cls, data_df: pd.DataFrame):
"""
查找 data_df 中在数据库中已存在和不存在的记录。根据 raw_id 字段判断。
:param data_df: 输入的数据框架,必须包含 raw_id 列
:return: (exists_df: pd.DataFrame, latest_df: pd.DataFrame)
- exists_df: 在数据库中存在的记录
- latest_df: 在数据库中不存在的记录
"""
if data_df.empty:
return pd.DataFrame(), pd.DataFrame()
# 获取待查询的 (rec_id, act_id) 组合
pairs = data_df[[cls.rec_id.key, cls.act_id.key]].drop_duplicates().values.tolist()
if not pairs:
return pd.DataFrame(), data_df.copy()
# 查询数据库中已存在的记录
_query = select(cls.id, cls.rec_id, cls.act_id).where(
(cls.rec_id.in_([p[0] for p in pairs])) &
(cls.act_id.in_([p[1] for p in pairs]))
)
exists_df = await cls.query_as_df(_query)
if exists_df.empty:
return pd.DataFrame(), data_df.copy()
# 构建 (rec_id, act_id) -> id 的映射
key_to_id_map = dict(zip(zip(exists_df[cls.rec_id.key], exists_df[cls.act_id.key]), exists_df[cls.id.key]))
# 根据组合是否在数据库中划分数据
mask_exists = data_df.apply(lambda row: (row[cls.rec_id.key], row[cls.act_id.key]) in key_to_id_map, axis=1)
exists_df = data_df[mask_exists].copy()
exists_df[cls.id.key] = exists_df.apply(lambda row: key_to_id_map[(row[cls.rec_id.key], row[cls.act_id.key])], axis=1)
latest_df = data_df[~mask_exists].copy()
return exists_df, latest_df
@classmethod
async def fill_process_info(cls, data_df: pd.DataFrame, index_field: str = 'id',
column_name: str = 'process_infos',
preprocessing: Optional[Callable] = None):
"""
填充办理过程数据到数据框架。
用于在查询结果中添加关联的办理过程信息。
:param pandas.DataFrame data_df: 待填充的数据框架
:param str index_field: 索引字段,一般是任务ID
:param str column_name: 填充时,新增加的列名称,默认为`process_infos`
:param preprocessing: 预处理,注意预处理必须要返回处理后的结果
:return: 办理过程数据框架(已填充)
:rtype: pandas.DataFrame
"""
if data_df.empty:
return pd.DataFrame()
_task_ids = list(set(data_df[index_field].unique().tolist()))
if not _task_ids:
return pd.DataFrame()
_query = select(cls).where(cls.dcm_task_id.in_(_task_ids))
_info_df: pd.DataFrame = await cls.query_as_df(_query)
if not _info_df.empty:
_info_df.replace(models.EmptyInDF+models.EmptyDatetimeInDF, '', inplace=True)
# 整理输出数据类型
_info_df[cls.id.key] = _info_df[cls.id.key].astype(str)
_info_df[cls.dcm_task_id.key] = _info_df[cls.dcm_task_id.key].astype(str)
# 设置索引
_info_df['index_id'] = _info_df[cls.dcm_task_id.key]
_info_df.set_index(['index_id'], inplace=True)
# 对数据进行预处理
if isinstance(preprocessing, Callable):
_info_df = preprocessing(_info_df)
# 增加数据填充列
data_df[column_name] = data_df[index_field].apply(
lambda x: _info_df.query(f"{cls.dcm_task_id.key}=='{x}'").to_dict('records')
)
else:
data_df[column_name] = [[] for _ in range(len(data_df))]
return _info_df
@register_swagger_model
class DcmTaskProcessInfo(DcmTaskProcessInfoBase):
"""
办理经过业务模型类(主业务类,完全继承 TD3iDcmTaskProcessInfo 字段)。
---
description: 数字城管-部门待办任务办理经过
type: object
properties:
id:
description: 主键ID
type: integer
example: 1001
readOnly: true
raw_id:
description: 原始主键ID
type: integer
example: 2001
act_id:
description: 任务ID
type: integer
example: 3001
act_def_id:
description: 流程节点定义ID
type: integer
example: 101
act_def_name:
description: 流程节点名称
type: string
example: "受理"
maxLength: 100
act_time_state_id:
description: 操作时间状态ID
type: integer
example: 1
act_limit_info:
description: 操作时限信息
type: string
example: "24小时内"
maxLength: 255
act_used_time_char:
description: 已用时间(字符串)
type: string
example: "12小时"
maxLength: 50
act_remain_time_char:
description: 剩余时间(字符串)
type: string
example: "12小时"
maxLength: 50
act_deadline_time:
description: 操作截止时间戳(毫秒)
type: integer
example: 1714578000000
act_property_id:
description: 操作属性ID
type: integer
example: 5
action_name:
description: 操作动作名称(如批转、回退)
type: string
example: "批转"
maxLength: 100
action_time:
description: 操作时间戳(毫秒)
type: integer
example: 1714567890000
title:
description: 操作标题
type: string
example: "案件受理完成"
maxLength: 100
detail:
description: 操作详细意见
type: string
example: "经核查,该案件属实,已转交市政工程处处理。"
maxLength: 65535
backup_detail:
description: 备用意见
type: string
example: "系统自动记录"
maxLength: 65535
medias:
description: 附件信息(JSON格式)
type: string
example: "[{\"media_id\":3001,\"name\":\"photo.jpg\"}]"
maxLength: 65535
unit_name:
description: 当前操作单位
type: string
example: "市政工程处"
maxLength: 100
unit_contact:
description: 单位联系方式
type: string
example: "025-88888888"
maxLength: 255
human_id:
description: 操作人ID-1为系统
type: integer
example: 101
human_name:
description: 操作人名称(含单位)
type: string
example: "张三(市政工程处)"
maxLength: 255
role_name:
description: 当前角色名称
type: string
example: "审批员"
maxLength: 100
item_id:
description: 项目ID
type: integer
example: 4001
item_type_id:
description: 任务类型ID
type: integer
example: 101
item_content:
description: 任务内容摘要
type: string
example: "中山路破损路面修复"
maxLength: 65535
item_process_info_list:
description: 子流程列表(JSON
type: string
example: "[{\"node\":\"受理\",\"time\":1714567890000}]"
maxLength: 65535
sub_process_info:
description: 子流程信息
type: string
example: "{\"sub1\":\"已处理\",\"sub2\":\"待验收\"}"
maxLength: 65535
bundle_time_state_id:
description: 组合时间状态ID
type: integer
example: 2
bundle_limit_info:
description: 组合时限信息
type: string
example: "48小时内"
maxLength: 255
bundle_used_char:
description: 组合已用时间
type: string
example: "36小时"
maxLength: 50
bundle_remain_char:
description: 组合剩余时间
type: string
example: "12小时"
maxLength: 50
bundle_deadline_time:
description: 组合截止时间戳(毫秒)
type: integer
example: 1714580000000
show_unit_contact:
description: 是否显示单位联系方式
type: integer
example: 1
pre_unit_name:
description: 上一单位
type: string
example: "街道办"
maxLength: 100
pre_action_name:
description: 上一操作名称
type: string
example: "初审"
maxLength: 100
pre_human_name:
description: 上一操作人
type: string
example: "李四(街道办)"
maxLength: 255
pre_act_opinion:
description: 上一操作意见
type: string
example: "建议转交专业部门处理"
maxLength: 65535
next_act_def_name:
description: 下一节点名称
type: string
example: "审批"
maxLength: 100
next_role_part_name:
description: 下一角色/单位
type: string
example: "市政工程处-审批组"
maxLength: 255
next_role_name:
description: 下一角色名称
type: string
example: "审批员"
maxLength: 100
next_act_property_id:
description: 下一操作属性ID
type: integer
example: 6
last_act_flag:
description: 是否为最后一节点(0否,1是)
type: integer
example: 0
created_at:
description: 创建时间,ISO格式的日期时间字符串
type: string
format: date-time
example: "2024-01-15 10:30:00"
readOnly: true
created_by:
description: 创建者用户名
type: string
example: "admin"
readOnly: true
updated_at:
description: 修改时间,ISO格式的日期时间字符串
type: string
format: date-time
example: "2024-01-16 14:25:00"
readOnly: true
updated_by:
description: 修改者用户名
type: string
example: "editor"
readOnly: true
"""
@classmethod
async def create(cls, **kwargs):
"""
创建办理经过。
业务流程:
1. 使用 D3iDcmTaskProcessInfoForm 验证表单数据
2. 设置创建者、更新者
3. 保存到数据库
4. 返回创建的流程记录对象
:param kwargs: 办理经过参数字典
:return: 新建流程记录对象
:rtype: DcmTaskProcessInfo
:raises ValidationError: 当表单验证失败时抛出
"""
# 处理字符串字段去除空格
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = DcmTaskProcessInfoForm(formdata=kwargs)
_form.validate_form()
# 检查是否存在同记录ID的任务
_task: cls = await cls.is_exist(_form.rec_id.data, _form.act_id.data)
assert _task is None, "记录ID已存在,不能重复创建。"
_record = cls().copy_from_dict(_form.data, skip_none=True).before_save()
await _record.async_save()
return _record
@classmethod
async def delete(cls, process_id: Union[str, int]):
"""
软删除办理经过(设置 delete_flag=1)。
:param process_id: 要删除的办理经过ID
:return: 删除的流程记录对象
:rtype: DcmTaskProcessInfo
:raises AssertionError: 当记录不存在时抛出
"""
_record: cls = await cls.async_find_by_id(process_id)
assert _record, f"根据 ID {process_id} 未找到办理经过。"
# 执行删除
_del_query = delete(cls).where(cls.id == _record.id)
_del_count = (await cls.raw_execute(_del_query)).rowcount
echo_log(f'已删除任务办理经过(记录ID{_record.rec_id}ID{_record.id}.')
return _record
@classmethod
async def modify(cls, process_id: Union[str, int], **kwargs):
"""
修改办理经过(仅允许修改非核心流程字段,如意见、标题、联系方式等)。
注意:不允许修改 act_id、action_time、act_def_name 等关键流程节点信息。
业务流程:
1. 将 process_id 加入参数
2. 处理字符串字段去除空格
3. 使用表单验证
4. 查询原记录
5. 验证存在性
6. 更新允许字段
7. 设置更新者
8. 保存
:param process_id: 要修改的办理经过ID
:param kwargs: 需要更新的字段
:return: 修改后的流程记录对象
:rtype: DcmTaskProcessInfo
:raises AssertionError: 当记录不存在时抛出
:raises ValidationError: 当表单验证失败时抛出
"""
kwargs[cls.id.key] = process_id
for _k, _v in kwargs.items():
if isinstance(_v, str):
kwargs[_k] = _v.strip()
_form = DcmTaskProcessInfoForm(formdata=kwargs)
_form.validate_form()
_record: cls = await cls.async_find_by_id(process_id)
assert _record, f'查无此办理经过。'
# 仅允许更新非核心流程字段
allowed_fields = {
'title', 'detail', 'backup_detail', 'medias',
'unit_contact', 'human_name', 'role_name',
'item_content', 'item_process_info_list', 'sub_process_info',
'show_unit_contact', 'pre_act_opinion', 'pre_human_name'
}
update_data = {k: v for k, v in _form.data.items() if k in allowed_fields and v is not None}
_record.copy_from_dict(update_data, skip_none=True).before_save()
await _record.async_save()
return _record
@classmethod
async def create_batch(cls, data_df: pd.DataFrame):
"""
批量创建新办理经过(传入数据应为全新记录,无需校验是否存在)。
:param data_df: 包含办理经过数据的 DataFrame,字段需与模型属性匹配(如 raw_id, act_id 等)
:return: 成功创建的记录数量
:rtype: int
"""
if data_df.empty:
return 0
# 一次性转为字典列表(C 层高效)
records = data_df.to_dict('records')
# 用列表推导式构造对象
records = [cls().copy_from_dict(record, skip_none=True).before_save() for record in records]
# 批量插入
session = cls.get_aio_session()
try:
session.add_all(records)
await session.commit()
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量创建成功:创建 {len(records)} 条任务办理经过。")
return len(records)
@classmethod
async def modify_batch(cls, data_df: pd.DataFrame):
"""
批量修改已有办理经过。
:param data_df: 包含办理经过数据的 DataFrame
:return: 成功更新的记录数量
:rtype: int
"""
if data_df.empty:
return 0
# 必须包含 id 列
if 'id' not in data_df.columns:
echo_log(f"错误:modify_batch 要求输入数据必须包含 '{cls.id.key}' 列(主键)")
return 0
# 转换为字典列表
update_data = data_df.to_dict('records')
# 使用 bulk_update_mappings
session = cls.get_aio_session()
try:
await session.run_sync(
lambda sync_session: sync_session.bulk_update_mappings(cls, update_data)
)
await session.commit()
updated_count = len(update_data)
except Exception as e:
await session.rollback()
raise e
finally:
await session.close()
echo_log(f"批量修改成功:更新 {updated_count} 条任务办理经过。")
return updated_count
@classmethod
async def save_batch(cls, data_df: pd.DataFrame):
"""
批量保存数据,自动处理新建和更新。
:param data_df: 要保存的数据框架
:return: 新建和更新的数量
"""
# 筛选数据状态
_exists_df, _latest_df = await DcmTaskProcessInfo.exists_rec_id(data_df)
# 保存到数据库
_created_count = await DcmTaskProcessInfo.create_batch(_latest_df)
_updated_count = await DcmTaskProcessInfo.modify_batch(_exists_df)
return _created_count, _updated_count