Compare commits
20 Commits
feat/iot-2
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 14c239054f | |||
| 8c5c5ef44a | |||
| acd7a35e1d | |||
| d6f625151c | |||
| 3cfd342318 | |||
| 6c4153fe23 | |||
| 8c664a479d | |||
| c78759fd52 | |||
| ba6f94a279 | |||
| 9f3ca9c6f2 | |||
| 323ddf27fb | |||
| a5f916c62a | |||
| 3e248fee8c | |||
| b534d79434 | |||
| c24b1eb641 | |||
| 4d85659277 | |||
| 6bbd49355d | |||
| 7707455a24 | |||
| ea374d131a | |||
| a32a4375bc |
60
.github/workflows/maven.yml
vendored
60
.github/workflows/maven.yml
vendored
@@ -1,30 +1,30 @@
|
||||
# This workflow will build a Java project with Maven, and cache/restore any dependencies to improve the workflow execution time
|
||||
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
|
||||
|
||||
name: Java CI with Maven
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ master ]
|
||||
# pull_request:
|
||||
# branches: [ master ]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
java: [ '8', '11', '17' ]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Set up JDK ${{ matrix.Java }}
|
||||
uses: actions/setup-java@v2
|
||||
with:
|
||||
java-version: ${{ matrix.java }}
|
||||
distribution: 'temurin'
|
||||
cache: maven
|
||||
- name: Build with Maven
|
||||
run: mvn -B package --file pom.xml -Dmaven.test.skip=true
|
||||
# This workflow will build a Java project with Maven, and cache/restore any dependencies to improve the workflow execution time
|
||||
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
|
||||
|
||||
name: Java CI with Maven
|
||||
|
||||
on:
|
||||
push:
|
||||
branches: [ master, release/next ]
|
||||
# pull_request:
|
||||
# branches: [ master ]
|
||||
|
||||
jobs:
|
||||
build:
|
||||
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
java: [ '8', '11', '17' ]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v2
|
||||
- name: Set up JDK ${{ matrix.Java }}
|
||||
uses: actions/setup-java@v2
|
||||
with:
|
||||
java-version: ${{ matrix.java }}
|
||||
distribution: 'temurin'
|
||||
cache: maven
|
||||
- name: Build with Maven
|
||||
run: mvn -B package --file pom.xml -Dmaven.test.skip=true
|
||||
|
||||
168
Jenkinsfile
vendored
168
Jenkinsfile
vendored
@@ -22,16 +22,29 @@ pipeline {
|
||||
|
||||
// 镜像仓库配置(Infra 服务器内网地址,Prod 服务器可通过内网拉取)
|
||||
REGISTRY = '172.17.16.7:5000'
|
||||
REGISTRY_HOST = '172.17.16.7'
|
||||
REGISTRY_CONTAINER = 'registry'
|
||||
DEPS_IMAGE = "${REGISTRY}/aiot-deps:latest"
|
||||
|
||||
// 服务配置
|
||||
CORE_SERVICES = 'viewsh-gateway,viewsh-module-system-server,viewsh-module-infra-server,viewsh-module-iot-server,viewsh-module-iot-gateway,viewsh-module-ops-server'
|
||||
|
||||
// 部署配置(Prod 服务器内网地址)
|
||||
// 部署配置(默认 Prod,release/next 分支会在 Initialize 阶段覆盖为 Staging)
|
||||
DEPLOY_HOST = '172.17.16.14'
|
||||
DEPLOY_PATH = '/opt/aiot-platform-cloud'
|
||||
SSH_KEY = '/var/jenkins_home/.ssh/id_rsa'
|
||||
|
||||
// Staging 服务器配置
|
||||
STAGING_DEPLOY_HOST = '172.17.16.7'
|
||||
STAGING_DEPLOY_PATH = '/opt/aiot-platform-cloud'
|
||||
|
||||
// 磁盘守护阈值(%):低于 MIN 直接 fail;低于 WARN 仅告警
|
||||
DISK_FREE_MIN_PCT = '5'
|
||||
DISK_FREE_WARN_PCT = '10'
|
||||
|
||||
// 镜像保留份数(每服务)
|
||||
IMAGE_KEEP_COUNT = '3'
|
||||
|
||||
// 性能配置 - 将动态调整
|
||||
BUILD_TIMEOUT = 45
|
||||
DEPLOY_TIMEOUT = 10
|
||||
@@ -63,6 +76,15 @@ pipeline {
|
||||
echo "Start Time: ${new Date()}"
|
||||
echo "=========================================="
|
||||
|
||||
// 根据分支选择部署目标
|
||||
if (env.BRANCH_NAME == 'release/next') {
|
||||
env.DEPLOY_HOST = env.STAGING_DEPLOY_HOST
|
||||
env.DEPLOY_PATH = env.STAGING_DEPLOY_PATH
|
||||
echo "📦 Deploy target: STAGING (${env.DEPLOY_HOST})"
|
||||
} else {
|
||||
echo "📦 Deploy target: PRODUCTION (${env.DEPLOY_HOST})"
|
||||
}
|
||||
|
||||
// 【优化2】动态检测系统资源
|
||||
detectSystemResources()
|
||||
}
|
||||
@@ -257,17 +279,44 @@ pipeline {
|
||||
}
|
||||
}
|
||||
|
||||
stage('Deploy') {
|
||||
stage('Pre-deploy Check') {
|
||||
when {
|
||||
allOf {
|
||||
expression { env.SERVICES_TO_BUILD != '' }
|
||||
branch 'master'
|
||||
anyOf {
|
||||
branch 'master'
|
||||
branch 'release/next'
|
||||
}
|
||||
}
|
||||
}
|
||||
steps {
|
||||
script {
|
||||
def stageStartTime = System.currentTimeMillis()
|
||||
|
||||
echo "🛡️ Pre-deploy health check: remote disk & SSH reachability"
|
||||
|
||||
// 检查 Prod 与 Registry 两台主机的磁盘,低于阈值 fail fast,避免 sshd 在磁盘满时被拖垮
|
||||
checkRemoteDiskOrFail(env.DEPLOY_HOST, 'Deploy')
|
||||
checkRemoteDiskOrFail(env.REGISTRY_HOST, 'Registry')
|
||||
|
||||
recordStageMetrics('Pre-deploy Check', stageStartTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stage('Deploy') {
|
||||
when {
|
||||
allOf {
|
||||
expression { env.SERVICES_TO_BUILD != '' }
|
||||
anyOf {
|
||||
branch 'master'
|
||||
branch 'release/next'
|
||||
}
|
||||
}
|
||||
}
|
||||
steps {
|
||||
script {
|
||||
def stageStartTime = System.currentTimeMillis()
|
||||
|
||||
def servicesToDeploy = env.SERVICES_TO_BUILD.split(',')
|
||||
def sortedServices = sortServicesByDependency(servicesToDeploy)
|
||||
|
||||
@@ -312,7 +361,10 @@ pipeline {
|
||||
when {
|
||||
allOf {
|
||||
expression { env.SERVICES_TO_BUILD != '' }
|
||||
branch 'master'
|
||||
anyOf {
|
||||
branch 'master'
|
||||
branch 'release/next'
|
||||
}
|
||||
}
|
||||
}
|
||||
steps {
|
||||
@@ -356,6 +408,32 @@ pipeline {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
stage('Cleanup Old Images') {
|
||||
when {
|
||||
allOf {
|
||||
expression { env.SERVICES_TO_BUILD != '' }
|
||||
anyOf {
|
||||
branch 'master'
|
||||
branch 'release/next'
|
||||
}
|
||||
}
|
||||
}
|
||||
steps {
|
||||
script {
|
||||
def stageStartTime = System.currentTimeMillis()
|
||||
echo "🧹 Cleaning up old images (keep=${env.IMAGE_KEEP_COUNT})"
|
||||
|
||||
// Prod/Staging 本地:清旧镜像 + dangling + builder cache
|
||||
cleanupDeployHost(env.DEPLOY_HOST, env.IMAGE_KEEP_COUNT)
|
||||
|
||||
// Registry:按保留策略删 manifest + 触发 GC 释放磁盘
|
||||
cleanupRegistry(env.REGISTRY_HOST, env.REGISTRY_CONTAINER, env.IMAGE_KEEP_COUNT)
|
||||
|
||||
recordStageMetrics('Cleanup Old Images', stageStartTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
post {
|
||||
@@ -790,12 +868,16 @@ def buildService(String service) {
|
||||
--build-arg SKIP_TESTS=true \\
|
||||
--build-arg MAVEN_OPTS="${env.MAVEN_OPTS}" \\
|
||||
-t ${env.REGISTRY}/${service}:${env.IMAGE_TAG} \\
|
||||
-t ${env.REGISTRY}/${service}:latest \\
|
||||
.
|
||||
|
||||
# 推送镜像
|
||||
# 推送带版本号的镜像
|
||||
docker push ${env.REGISTRY}/${service}:${env.IMAGE_TAG}
|
||||
docker push ${env.REGISTRY}/${service}:latest
|
||||
|
||||
# 仅 master 分支推送 latest 标签
|
||||
if [ "${env.BRANCH_NAME}" = "master" ]; then
|
||||
docker tag ${env.REGISTRY}/${service}:${env.IMAGE_TAG} ${env.REGISTRY}/${service}:latest
|
||||
docker push ${env.REGISTRY}/${service}:latest
|
||||
fi
|
||||
|
||||
set +x
|
||||
"""
|
||||
@@ -1050,3 +1132,73 @@ def getModulePathForService(String service) {
|
||||
]
|
||||
return map.get(service, service)
|
||||
}
|
||||
|
||||
// ============================================
|
||||
// 磁盘/清理相关 helper(Prod + Registry)
|
||||
// ============================================
|
||||
|
||||
// 检查远端磁盘剩余百分比;低于 MIN 阈值直接 fail;低于 WARN 仅告警
|
||||
def checkRemoteDiskOrFail(String host, String role) {
|
||||
def sshOpts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -i ${env.SSH_KEY}"
|
||||
def minPct = Integer.parseInt(env.DISK_FREE_MIN_PCT)
|
||||
def warnPct = Integer.parseInt(env.DISK_FREE_WARN_PCT)
|
||||
|
||||
def freePct
|
||||
try {
|
||||
// awk 的 $5+0 会把 "42%" 强转为 42,避免多层 gsub 转义
|
||||
freePct = sh(
|
||||
script: "ssh ${sshOpts} root@${host} \"df -P / | awk 'NR==2 { print 100 - \\\$5+0 }'\"",
|
||||
returnStdout: true
|
||||
).trim() as int
|
||||
} catch (Exception e) {
|
||||
error("❌ [${role}] 无法通过 SSH 到 ${host} 读取磁盘(可能 sshd 已被磁盘满拖垮):${e.message}")
|
||||
}
|
||||
|
||||
echo " ${role}@${host}: 根分区空闲 ${freePct}%"
|
||||
if (freePct < minPct) {
|
||||
error("❌ [${role}] ${host} 根分区空闲仅 ${freePct}% < ${minPct}%,终止部署避免二次失败。请先手动清理或跑 scripts/cleanup.sh")
|
||||
} else if (freePct < warnPct) {
|
||||
echo "⚠️ [${role}] ${host} 空闲 ${freePct}% < ${warnPct}%,本次部署后会触发自动清理"
|
||||
}
|
||||
}
|
||||
|
||||
// Prod/Staging 本地清理:调用仓库内的 cleanup.sh
|
||||
def cleanupDeployHost(String host, String keep) {
|
||||
def sshOpts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -i ${env.SSH_KEY}"
|
||||
try {
|
||||
echo "📤 Syncing scripts/cleanup.sh to ${host}..."
|
||||
sh "scp ${sshOpts} scripts/cleanup.sh root@${host}:${env.DEPLOY_PATH}/cleanup.sh"
|
||||
sh """
|
||||
ssh ${sshOpts} root@${host} '
|
||||
cd ${env.DEPLOY_PATH}
|
||||
chmod +x cleanup.sh
|
||||
./cleanup.sh --keep=${keep} --registry=${env.REGISTRY}
|
||||
'
|
||||
"""
|
||||
echo "✅ Deploy host 清理完成"
|
||||
} catch (Exception e) {
|
||||
// 清理失败不影响发布结果,仅告警
|
||||
echo "⚠️ Deploy host 清理失败(不致命):${e.message}"
|
||||
}
|
||||
}
|
||||
|
||||
// Registry 清理:同步 python 脚本 → 按保留策略删 manifest → GC
|
||||
def cleanupRegistry(String host, String registryContainer, String keep) {
|
||||
def sshOpts = "-o StrictHostKeyChecking=no -o ConnectTimeout=10 -i ${env.SSH_KEY}"
|
||||
try {
|
||||
echo "📤 Syncing scripts/registry-cleanup.py to ${host}..."
|
||||
sh "scp ${sshOpts} scripts/registry-cleanup.py root@${host}:/tmp/registry-cleanup.py"
|
||||
sh """
|
||||
ssh ${sshOpts} root@${host} '
|
||||
python3 /tmp/registry-cleanup.py \
|
||||
--registry http://localhost:5000 \
|
||||
--keep ${keep} \
|
||||
--repos ${env.CORE_SERVICES} \
|
||||
--gc-container ${registryContainer}
|
||||
'
|
||||
"""
|
||||
echo "✅ Registry 清理 + GC 完成"
|
||||
} catch (Exception e) {
|
||||
echo "⚠️ Registry 清理失败(不致命):${e.message}"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,14 +3,7 @@
|
||||
# 预下载所有依赖,供服务构建时复用
|
||||
# ============================================
|
||||
|
||||
FROM eclipse-temurin:17-jdk-alpine
|
||||
|
||||
# 安装 Maven(从阿里云镜像下载,避免 apk maven 包拉入冗余 JDK)
|
||||
ARG MAVEN_VERSION=3.9.14
|
||||
RUN wget -q https://mirrors.aliyun.com/apache/maven/maven-3/${MAVEN_VERSION}/binaries/apache-maven-${MAVEN_VERSION}-bin.tar.gz -O /tmp/maven.tar.gz \
|
||||
&& tar xzf /tmp/maven.tar.gz -C /opt \
|
||||
&& ln -s /opt/apache-maven-${MAVEN_VERSION}/bin/mvn /usr/bin/mvn \
|
||||
&& rm /tmp/maven.tar.gz
|
||||
FROM maven:3.9-eclipse-temurin-17
|
||||
|
||||
WORKDIR /build
|
||||
|
||||
|
||||
@@ -6,14 +6,7 @@
|
||||
# ============================================
|
||||
|
||||
# ============ 构建阶段 ============
|
||||
FROM eclipse-temurin:17-jdk-alpine AS builder
|
||||
|
||||
# 安装 Maven(从阿里云镜像下载,避免 apk maven 包拉入冗余 JDK)
|
||||
ARG MAVEN_VERSION=3.9.14
|
||||
RUN wget -q https://mirrors.aliyun.com/apache/maven/maven-3/${MAVEN_VERSION}/binaries/apache-maven-${MAVEN_VERSION}-bin.tar.gz -O /tmp/maven.tar.gz \
|
||||
&& tar xzf /tmp/maven.tar.gz -C /opt \
|
||||
&& ln -s /opt/apache-maven-${MAVEN_VERSION}/bin/mvn /usr/bin/mvn \
|
||||
&& rm /tmp/maven.tar.gz
|
||||
FROM maven:3.9-eclipse-temurin-17 AS builder
|
||||
|
||||
WORKDIR /build
|
||||
|
||||
|
||||
@@ -2,80 +2,122 @@
|
||||
|
||||
# ============================================
|
||||
# AIOT Platform - 清理脚本
|
||||
# 清理旧镜像和容器,释放存储空间
|
||||
# 清理部署主机上的旧镜像 / 停止容器 / 构建缓存,释放存储空间
|
||||
# ============================================
|
||||
|
||||
set -e
|
||||
|
||||
# ---- 默认参数 ----
|
||||
KEEP=3
|
||||
PRUNE_VOLUMES=false
|
||||
REGISTRY_HOST="localhost:5000"
|
||||
|
||||
usage() {
|
||||
cat <<EOF
|
||||
用法: $0 [options]
|
||||
|
||||
Options:
|
||||
--keep=N 每个服务保留最近 N 个本地镜像(默认 3)
|
||||
--prune-volumes 额外清理未使用的 Docker volume(默认不清,避免误删数据)
|
||||
--registry=HOST 本地 docker images 仓库前缀(默认 localhost:5000)
|
||||
-h, --help 帮助
|
||||
|
||||
示例:
|
||||
$0 --keep=3
|
||||
$0 --keep=2 --prune-volumes
|
||||
EOF
|
||||
}
|
||||
|
||||
for arg in "$@"; do
|
||||
case "$arg" in
|
||||
--keep=*) KEEP="${arg#*=}" ;;
|
||||
--prune-volumes) PRUNE_VOLUMES=true ;;
|
||||
--registry=*) REGISTRY_HOST="${arg#*=}" ;;
|
||||
-h|--help) usage; exit 0 ;;
|
||||
*) echo "未知参数: $arg"; usage; exit 1 ;;
|
||||
esac
|
||||
done
|
||||
|
||||
if ! [[ "$KEEP" =~ ^[0-9]+$ ]] || [ "$KEEP" -lt 1 ]; then
|
||||
echo "❌ --keep 必须为正整数"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# 颜色输出
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[1;33m'
|
||||
NC='\033[0m'
|
||||
|
||||
log_info() {
|
||||
echo -e "${GREEN}[INFO]${NC} $1"
|
||||
}
|
||||
|
||||
log_warn() {
|
||||
echo -e "${YELLOW}[WARN]${NC} $1"
|
||||
}
|
||||
log_info() { echo -e "${GREEN}[INFO]${NC} $1"; }
|
||||
log_warn() { echo -e "${YELLOW}[WARN]${NC} $1"; }
|
||||
|
||||
log_info "========================================="
|
||||
log_info "AIOT Platform 清理开始"
|
||||
log_info "AIOT Platform 清理开始 (keep=${KEEP})"
|
||||
log_info "========================================="
|
||||
|
||||
# 显示当前磁盘使用情况
|
||||
log_info "当前磁盘使用情况:"
|
||||
df -h | grep -E "Filesystem|/$"
|
||||
df -h | grep -E "Filesystem|/$" || true
|
||||
echo ""
|
||||
log_info "当前 Docker 磁盘使用:"
|
||||
docker system df
|
||||
echo ""
|
||||
|
||||
# 清理停止的容器
|
||||
log_info "清理停止的容器..."
|
||||
docker container prune -f
|
||||
|
||||
# 清理悬空镜像
|
||||
log_info "清理悬空镜像..."
|
||||
docker image prune -f
|
||||
|
||||
# 清理旧版本镜像(保留最近 3 个版本)
|
||||
log_info "清理旧版本镜像(保留最近 3 个版本)..."
|
||||
log_info "清理旧版本镜像(每个服务保留最近 ${KEEP} 个)..."
|
||||
|
||||
SERVICES="viewsh-gateway viewsh-module-system-server viewsh-module-infra-server viewsh-module-iot-server viewsh-module-iot-gateway viewsh-module-ops-server"
|
||||
SERVICES=(
|
||||
"viewsh-gateway"
|
||||
"viewsh-module-system-server"
|
||||
"viewsh-module-infra-server"
|
||||
"viewsh-module-iot-server"
|
||||
"viewsh-module-iot-gateway"
|
||||
"viewsh-module-ops-server"
|
||||
)
|
||||
|
||||
for service in $SERVICES; do
|
||||
for service in "${SERVICES[@]}"; do
|
||||
log_info "处理服务: ${service}"
|
||||
|
||||
# 获取所有镜像,按时间排序,删除除了最新 3 个之外的所有镜像
|
||||
docker images "localhost:5000/${service}" --format "{{.ID}} {{.Tag}}" | \
|
||||
grep -v "latest" | \
|
||||
tail -n +4 | \
|
||||
awk '{print $1}' | \
|
||||
xargs -r docker rmi -f 2>/dev/null || true
|
||||
|
||||
# 按创建时间降序取 ID 列表,跳过 latest tag,保留前 KEEP 个
|
||||
mapfile -t ids_to_delete < <(
|
||||
docker images "${REGISTRY_HOST}/${service}" \
|
||||
--format '{{.CreatedAt}}|{{.ID}}|{{.Tag}}' \
|
||||
| grep -v '|latest$' \
|
||||
| sort -r \
|
||||
| awk -F'|' -v k="$KEEP" 'NR > k {print $2}'
|
||||
)
|
||||
|
||||
if [ "${#ids_to_delete[@]}" -eq 0 ]; then
|
||||
log_info " └─ 无可清理镜像"
|
||||
continue
|
||||
fi
|
||||
|
||||
log_info " └─ 删除 ${#ids_to_delete[@]} 个旧镜像"
|
||||
# 去重后批量删
|
||||
printf '%s\n' "${ids_to_delete[@]}" | sort -u | xargs -r docker rmi -f 2>/dev/null || true
|
||||
done
|
||||
|
||||
# 清理未使用的卷(谨慎使用)
|
||||
log_warn "是否清理未使用的卷? (y/N)"
|
||||
read -r response
|
||||
if [ "$response" = "y" ] || [ "$response" = "Y" ]; then
|
||||
log_info "清理未使用的卷..."
|
||||
if [ "$PRUNE_VOLUMES" = true ]; then
|
||||
log_warn "清理未使用的 volume(--prune-volumes 已启用)"
|
||||
docker volume prune -f
|
||||
else
|
||||
log_info "跳过 volume 清理(如需清理请加 --prune-volumes)"
|
||||
fi
|
||||
|
||||
# 清理构建缓存
|
||||
log_info "清理 Docker 构建缓存..."
|
||||
docker builder prune -f --filter "until=24h"
|
||||
log_info "清理 Docker 构建缓存(24h 前)..."
|
||||
docker builder prune -f --filter "until=24h" || true
|
||||
|
||||
# 显示清理后的磁盘使用情况
|
||||
echo ""
|
||||
log_info "========================================="
|
||||
log_info "清理完成"
|
||||
log_info "========================================="
|
||||
echo ""
|
||||
log_info "清理后磁盘使用情况:"
|
||||
df -h | grep -E "Filesystem|/$"
|
||||
df -h | grep -E "Filesystem|/$" || true
|
||||
echo ""
|
||||
log_info "清理后 Docker 磁盘使用:"
|
||||
docker system df
|
||||
|
||||
261
scripts/registry-cleanup.py
Normal file
261
scripts/registry-cleanup.py
Normal file
@@ -0,0 +1,261 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Docker Registry 镜像清理工具
|
||||
|
||||
按 tag 语义/时间排序,每个仓库保留最近 N 个版本(默认 3),其余逻辑删除。
|
||||
支持可选触发容器内 `registry garbage-collect` 真正回收磁盘空间。
|
||||
|
||||
典型用法(在 Registry 宿主机上执行):
|
||||
# 保留最近 3 个
|
||||
python3 registry-cleanup.py
|
||||
|
||||
# 指定仓库列表 + GC
|
||||
python3 registry-cleanup.py \\
|
||||
--registry http://localhost:5000 \\
|
||||
--keep 3 \\
|
||||
--repos viewsh-gateway,viewsh-module-iot-server \\
|
||||
--gc-container registry
|
||||
|
||||
# 空跑查看计划(不执行删除)
|
||||
python3 registry-cleanup.py --dry-run
|
||||
|
||||
退出码:0=成功 / 1=参数错误 / 2=Registry 不可达 / 3=部分仓库处理失败
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
DEFAULT_REGISTRY = "http://localhost:5000"
|
||||
DEFAULT_KEEP = 3
|
||||
DEFAULT_REPOS = [
|
||||
"viewsh-gateway",
|
||||
"viewsh-module-infra-server",
|
||||
"viewsh-module-iot-gateway",
|
||||
"viewsh-module-iot-server",
|
||||
"viewsh-module-ops-server",
|
||||
"viewsh-module-system-server",
|
||||
]
|
||||
|
||||
# 覆盖常见的 manifest 媒体类型,避免 BuildKit/OCI 推的 tag 取不到 digest
|
||||
MANIFEST_ACCEPT = ", ".join([
|
||||
"application/vnd.docker.distribution.manifest.v2+json",
|
||||
"application/vnd.docker.distribution.manifest.list.v2+json",
|
||||
"application/vnd.oci.image.manifest.v1+json",
|
||||
"application/vnd.oci.image.index.v1+json",
|
||||
])
|
||||
|
||||
|
||||
def http_request(method: str, url: str, headers: Optional[dict] = None, timeout: int = 10):
|
||||
req = urllib.request.Request(url, method=method, headers=headers or {})
|
||||
return urllib.request.urlopen(req, timeout=timeout)
|
||||
|
||||
|
||||
def list_tags(registry: str, repo: str) -> List[str]:
|
||||
url = f"{registry}/v2/{repo}/tags/list"
|
||||
try:
|
||||
with http_request("GET", url) as resp:
|
||||
body = resp.read().decode("utf-8")
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.code == 404:
|
||||
return []
|
||||
raise
|
||||
data = json.loads(body)
|
||||
return [t for t in (data.get("tags") or []) if t != "latest"]
|
||||
|
||||
|
||||
def get_manifest_digest(registry: str, repo: str, tag: str) -> Optional[str]:
|
||||
url = f"{registry}/v2/{repo}/manifests/{tag}"
|
||||
try:
|
||||
with http_request("HEAD", url, headers={"Accept": MANIFEST_ACCEPT}) as resp:
|
||||
return resp.headers.get("Docker-Content-Digest")
|
||||
except urllib.error.HTTPError:
|
||||
return None
|
||||
|
||||
|
||||
def delete_manifest(registry: str, repo: str, digest: str) -> bool:
|
||||
url = f"{registry}/v2/{repo}/manifests/{digest}"
|
||||
try:
|
||||
with http_request("DELETE", url) as resp:
|
||||
return 200 <= resp.status < 300
|
||||
except urllib.error.HTTPError as e:
|
||||
# 202 Accepted 会抛 HTTPError,特殊处理
|
||||
return 200 <= e.code < 300
|
||||
|
||||
|
||||
# tag 排序:优先按"数字/build 号"降序,其次按字典序降序
|
||||
_BUILD_NUM_RE = re.compile(r"(\d+)")
|
||||
|
||||
|
||||
def tag_sort_key(tag: str) -> Tuple[int, str]:
|
||||
"""
|
||||
返回 (数字, 原始tag) 供降序排序使用。
|
||||
- 若 tag 包含数字(如 build-123、1.2.3、20260424103000),取最后一个数字段作主排序键
|
||||
- 否则数字位 = -1,只用字符串兜底
|
||||
"""
|
||||
nums = _BUILD_NUM_RE.findall(tag)
|
||||
primary = int(nums[-1]) if nums else -1
|
||||
return (primary, tag)
|
||||
|
||||
|
||||
def cleanup_repo(
|
||||
registry: str,
|
||||
repo: str,
|
||||
keep: int,
|
||||
dry_run: bool,
|
||||
) -> Tuple[int, int, int]:
|
||||
"""
|
||||
返回 (total, deleted, skipped)
|
||||
"""
|
||||
tags = list_tags(registry, repo)
|
||||
total = len(tags)
|
||||
if total == 0:
|
||||
print(f" └─ 无 tag,跳过")
|
||||
return 0, 0, 0
|
||||
|
||||
# 降序:越新的越靠前
|
||||
tags.sort(key=tag_sort_key, reverse=True)
|
||||
keep_list = tags[:keep]
|
||||
delete_list = tags[keep:]
|
||||
|
||||
print(f" ├─ 共 {total} 个 tag,保留最新 {len(keep_list)} 个:{keep_list}")
|
||||
if not delete_list:
|
||||
print(f" └─ 无需删除")
|
||||
return total, 0, 0
|
||||
|
||||
deleted = 0
|
||||
skipped = 0
|
||||
# 去重:多个 tag 可能指向同一 digest,只需要 DELETE 一次
|
||||
seen_digests = set()
|
||||
for tag in delete_list:
|
||||
digest = get_manifest_digest(registry, repo, tag)
|
||||
if not digest:
|
||||
print(f" │ [SKIP] {tag:30s} (digest 取不到)")
|
||||
skipped += 1
|
||||
continue
|
||||
if digest in seen_digests:
|
||||
print(f" │ [DEDUP] {tag:30s} {digest[:19]}...")
|
||||
continue
|
||||
seen_digests.add(digest)
|
||||
if dry_run:
|
||||
print(f" │ [DRY] {tag:30s} {digest[:19]}...")
|
||||
deleted += 1
|
||||
continue
|
||||
ok = delete_manifest(registry, repo, digest)
|
||||
if ok:
|
||||
print(f" │ [DELETE] {tag:30s} {digest[:19]}...")
|
||||
deleted += 1
|
||||
else:
|
||||
print(f" │ [FAIL] {tag:30s} {digest[:19]}...")
|
||||
skipped += 1
|
||||
|
||||
print(f" └─ 已删除 {deleted},跳过 {skipped}")
|
||||
return total, deleted, skipped
|
||||
|
||||
|
||||
def run_gc(container: str, dry_run: bool) -> bool:
|
||||
cmd = [
|
||||
"docker", "exec", container,
|
||||
"registry", "garbage-collect",
|
||||
"--delete-untagged=true",
|
||||
"/etc/docker/registry/config.yml",
|
||||
]
|
||||
print(f"\n🧹 触发 Registry GC:{' '.join(cmd)}")
|
||||
if dry_run:
|
||||
print(" (--dry-run 已跳过实际执行)")
|
||||
return True
|
||||
try:
|
||||
result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
|
||||
except subprocess.TimeoutExpired:
|
||||
print(" ❌ GC 超时(>5min)")
|
||||
return False
|
||||
except FileNotFoundError:
|
||||
print(" ❌ 找不到 docker 命令,确认脚本跑在 Registry 宿主机")
|
||||
return False
|
||||
if result.returncode == 0:
|
||||
# GC 输出可能很长,只打 tail
|
||||
tail = "\n".join(result.stdout.splitlines()[-10:])
|
||||
print(f" ✅ GC 成功(输出末 10 行):\n{tail}")
|
||||
return True
|
||||
print(f" ❌ GC 失败 (rc={result.returncode})")
|
||||
if result.stderr:
|
||||
print(f" stderr: {result.stderr.strip()[:500]}")
|
||||
return False
|
||||
|
||||
|
||||
def parse_args():
|
||||
p = argparse.ArgumentParser(description="Docker Registry 镜像清理")
|
||||
p.add_argument("--registry", default=DEFAULT_REGISTRY, help=f"Registry URL(默认 {DEFAULT_REGISTRY})")
|
||||
p.add_argument("--keep", type=int, default=DEFAULT_KEEP, help=f"每仓库保留版本数(默认 {DEFAULT_KEEP})")
|
||||
p.add_argument("--repos", default=",".join(DEFAULT_REPOS),
|
||||
help="逗号分隔的仓库名列表(默认:内置服务清单)")
|
||||
p.add_argument("--gc-container", default=None,
|
||||
help="Registry 容器名;指定则删除完后触发 garbage-collect")
|
||||
p.add_argument("--dry-run", action="store_true", help="只打印计划,不实际删除")
|
||||
return p.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
if args.keep < 1:
|
||||
print("❌ --keep 必须 >= 1", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
repos = [r.strip() for r in args.repos.split(",") if r.strip()]
|
||||
if not repos:
|
||||
print("❌ 仓库列表为空", file=sys.stderr)
|
||||
return 1
|
||||
|
||||
# 连通性检查
|
||||
try:
|
||||
with http_request("GET", f"{args.registry}/v2/", timeout=5):
|
||||
pass
|
||||
except (urllib.error.URLError, urllib.error.HTTPError) as e:
|
||||
# /v2/ 返回 401 也算通(部分 Registry 开启认证)
|
||||
if not (isinstance(e, urllib.error.HTTPError) and e.code == 401):
|
||||
print(f"❌ Registry 不可达:{args.registry} — {e}", file=sys.stderr)
|
||||
return 2
|
||||
|
||||
print(f"🎯 Registry={args.registry} keep={args.keep} dry_run={args.dry_run}")
|
||||
print(f"📦 仓库:{repos}\n")
|
||||
|
||||
overall_total = overall_deleted = overall_skipped = 0
|
||||
failed_repos = []
|
||||
for repo in repos:
|
||||
print(f"=== {repo} ===")
|
||||
try:
|
||||
t, d, s = cleanup_repo(args.registry, repo, args.keep, args.dry_run)
|
||||
overall_total += t
|
||||
overall_deleted += d
|
||||
overall_skipped += s
|
||||
except Exception as e:
|
||||
print(f" ❌ 处理异常:{e}")
|
||||
failed_repos.append(repo)
|
||||
|
||||
print(f"\n📊 总计:扫描 {overall_total} / 删除 {overall_deleted} / 跳过 {overall_skipped}")
|
||||
if failed_repos:
|
||||
print(f"⚠️ 失败仓库:{failed_repos}")
|
||||
|
||||
# GC
|
||||
if args.gc_container and overall_deleted > 0:
|
||||
ok = run_gc(args.gc_container, args.dry_run)
|
||||
if not ok:
|
||||
return 3
|
||||
elif args.gc_container:
|
||||
print("\n🟡 无 manifest 被删除,跳过 GC")
|
||||
else:
|
||||
print("\n💡 未指定 --gc-container,逻辑删除完成但磁盘尚未释放;")
|
||||
print(" 请在 Registry 宿主机手动执行:")
|
||||
print(f" docker exec <registry-container> registry garbage-collect \\")
|
||||
print(f" --delete-untagged=true /etc/docker/registry/config.yml")
|
||||
|
||||
return 3 if failed_repos else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -10,6 +10,7 @@ import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@@ -60,6 +61,7 @@ public class CleanRuleProcessorManager {
|
||||
}
|
||||
|
||||
Long deviceId = message.getDeviceId();
|
||||
LocalDateTime reportTime = message.getReportTime();
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> data = (Map<String, Object>) message.getParams();
|
||||
|
||||
@@ -73,7 +75,7 @@ public class CleanRuleProcessorManager {
|
||||
} else {
|
||||
// 属性上报:直接遍历 key-value
|
||||
data.forEach((identifier, value) ->
|
||||
processDataSafely(deviceId, identifier, value));
|
||||
processDataSafely(deviceId, identifier, value, reportTime));
|
||||
|
||||
// 4. 蓝牙信号缺失补偿:当设备上报了属性但不含 bluetoothDevices 时,
|
||||
// 主动注入一次 null 调用,使 BeaconDetectionRuleProcessor 能写入 -999(信号缺失),
|
||||
@@ -81,7 +83,7 @@ public class CleanRuleProcessorManager {
|
||||
if (!data.containsKey("bluetoothDevices")) {
|
||||
beaconDetectionRuleProcessor.processPropertyChange(deviceId, "bluetoothDevices", null);
|
||||
// 轨迹检测同样需要信号丢失补偿,注入 null 使窗口写入 -999
|
||||
trajectoryDetectionProcessor.processPropertyChange(deviceId, "bluetoothDevices", null);
|
||||
trajectoryDetectionProcessor.processPropertyChange(deviceId, "bluetoothDevices", null, reportTime);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -127,7 +129,7 @@ public class CleanRuleProcessorManager {
|
||||
* @param identifier 标识符
|
||||
* @param value 数据值
|
||||
*/
|
||||
private void processDataSafely(Long deviceId, String identifier, Object value) {
|
||||
private void processDataSafely(Long deviceId, String identifier, Object value, LocalDateTime reportTime) {
|
||||
try {
|
||||
switch (identifier) {
|
||||
case "people_in", "people_out" ->
|
||||
@@ -135,7 +137,7 @@ public class CleanRuleProcessorManager {
|
||||
case "bluetoothDevices" -> {
|
||||
beaconDetectionRuleProcessor.processPropertyChange(deviceId, identifier, value);
|
||||
// 轨迹检测:独立于保洁到岗检测,匹配所有已知 Beacon
|
||||
trajectoryDetectionProcessor.processPropertyChange(deviceId, identifier, value);
|
||||
trajectoryDetectionProcessor.processPropertyChange(deviceId, identifier, value, reportTime);
|
||||
}
|
||||
default -> {
|
||||
// 其他属性/事件忽略
|
||||
|
||||
@@ -24,6 +24,7 @@ import org.springframework.stereotype.Component;
|
||||
|
||||
import com.viewsh.framework.tenant.core.context.TenantContextHolder;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@@ -57,6 +58,17 @@ public class TrajectoryDetectionProcessor {
|
||||
private static final String DEVICE_ENABLED_KEY_PATTERN = "iot:trajectory:device:enabled:%s";
|
||||
private static final int DEVICE_ENABLED_TTL_SECONDS = 3600; // 1小时
|
||||
|
||||
/**
|
||||
* 最小停留时长(毫秒):进入区域后至少停留这么久,才允许发布 LEAVE/AREA_SWITCH,
|
||||
* 用于过滤 RSSI 抖动和批量消息回放导致的瞬态切换
|
||||
*/
|
||||
private static final long MIN_STAY_MILLIS = 5_000L;
|
||||
|
||||
/**
|
||||
* 区域切换滞回阈值(dB):候选区域 RSSI 必须比当前区域 RSSI 高出此值,才允许 AREA_SWITCH
|
||||
*/
|
||||
private static final int RSSI_HYSTERESIS_DB = 5;
|
||||
|
||||
@Resource
|
||||
private BeaconRegistryService beaconRegistryService;
|
||||
|
||||
@@ -84,11 +96,20 @@ public class TrajectoryDetectionProcessor {
|
||||
* @param deviceId 设备ID(工牌)
|
||||
* @param identifier 属性标识符
|
||||
* @param propertyValue 蓝牙设备列表
|
||||
* @param reportTime 设备上报时间(可为 null,为空则用当前时间兜底)
|
||||
*/
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue) {
|
||||
public void processPropertyChange(Long deviceId, String identifier, Object propertyValue,
|
||||
LocalDateTime reportTime) {
|
||||
if (!"bluetoothDevices".equals(identifier)) {
|
||||
return;
|
||||
}
|
||||
LocalDateTime eventTime;
|
||||
if (reportTime != null) {
|
||||
eventTime = reportTime;
|
||||
} else {
|
||||
eventTime = LocalDateTime.now();
|
||||
log.warn("[Trajectory] reportTime 为空,使用当前时间兜底:deviceId={}(若频繁出现请排查上游调用链)", deviceId);
|
||||
}
|
||||
|
||||
// 1. 检查设备是否开启轨迹功能
|
||||
if (!isTrajectoryEnabled(deviceId)) {
|
||||
@@ -125,9 +146,9 @@ public class TrajectoryDetectionProcessor {
|
||||
|
||||
// 8. 处理区域状态变化
|
||||
if (currentArea != null) {
|
||||
processWithCurrentArea(deviceId, currentArea, matchedBeacons, areaConfigIndex);
|
||||
processWithCurrentArea(deviceId, currentArea, matchedBeacons, areaConfigIndex, eventTime);
|
||||
} else {
|
||||
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex);
|
||||
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex, eventTime);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,8 +160,11 @@ public class TrajectoryDetectionProcessor {
|
||||
private void processWithCurrentArea(Long deviceId,
|
||||
TrajectoryStateRedisDAO.CurrentAreaInfo currentArea,
|
||||
Map<Long, MatchedBeacon> matchedBeacons,
|
||||
Map<Long, BeaconPresenceConfig> areaConfigIndex) {
|
||||
Map<Long, BeaconPresenceConfig> areaConfigIndex,
|
||||
LocalDateTime eventTime) {
|
||||
Long currentAreaId = currentArea.getAreaId();
|
||||
long stayMillis = System.currentTimeMillis() - (currentArea.getEnterTime() != null ? currentArea.getEnterTime() : 0L);
|
||||
boolean minStayReached = stayMillis >= MIN_STAY_MILLIS;
|
||||
|
||||
// 6a. 检查当前区域的退出条件
|
||||
BeaconPresenceConfig currentConfig = areaConfigIndex.get(currentAreaId);
|
||||
@@ -153,16 +177,22 @@ public class TrajectoryDetectionProcessor {
|
||||
AreaState.IN_AREA);
|
||||
|
||||
if (exitResult == DetectionResult.LEAVE_CONFIRMED) {
|
||||
if (!minStayReached) {
|
||||
// 未达到最小停留时长,视为瞬态抖动,忽略本次离开
|
||||
log.debug("[Trajectory] 未达最小停留,忽略 LEAVE:deviceId={}, areaId={}, stayMs={}",
|
||||
deviceId, currentAreaId, stayMillis);
|
||||
return;
|
||||
}
|
||||
// 确认离开当前区域
|
||||
publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(),
|
||||
"SIGNAL_LOSS", currentArea.getEnterTime());
|
||||
"SIGNAL_LOSS", currentArea.getEnterTime(), eventTime);
|
||||
stateRedisDAO.clearCurrentArea(deviceId);
|
||||
windowRedisDAO.clearWindow(deviceId, currentAreaId);
|
||||
|
||||
log.info("[Trajectory] 离开区域:deviceId={}, areaId={}, reason=SIGNAL_LOSS", deviceId, currentAreaId);
|
||||
|
||||
// 离开后,尝试进入新区域
|
||||
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex);
|
||||
processWithoutCurrentArea(deviceId, matchedBeacons, areaConfigIndex, eventTime);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -170,15 +200,29 @@ public class TrajectoryDetectionProcessor {
|
||||
// 6b. 当前区域未退出,检查是否有更强区域触发切换
|
||||
MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, currentAreaId);
|
||||
if (bestCandidate != null && !bestCandidate.areaId.equals(currentAreaId)) {
|
||||
if (!minStayReached) {
|
||||
log.debug("[Trajectory] 未达最小停留,忽略 AREA_SWITCH:deviceId={}, from={}, to={}, stayMs={}",
|
||||
deviceId, currentAreaId, bestCandidate.areaId, stayMillis);
|
||||
return;
|
||||
}
|
||||
// 切换滞回:候选 RSSI 必须显著强于当前区域 RSSI
|
||||
// 优先取本次匹配值;若当前未匹配到,回退到窗口里最近一次非缺失(-999)样本,
|
||||
// 避免当前信标短暂漏扫时滞回被 -999 哨兵破坏
|
||||
int currentRssi = resolveCurrentAreaRssi(deviceId, currentAreaId, matchedBeacons);
|
||||
if (bestCandidate.rssi - currentRssi < RSSI_HYSTERESIS_DB) {
|
||||
log.debug("[Trajectory] 未达滞回阈值,忽略 AREA_SWITCH:deviceId={}, from={}({}dBm), to={}({}dBm)",
|
||||
deviceId, currentAreaId, currentRssi, bestCandidate.areaId, bestCandidate.rssi);
|
||||
return;
|
||||
}
|
||||
// 区域切换:先离开当前区域,再进入新区域
|
||||
publishLeaveEvent(deviceId, currentAreaId, currentArea.getBeaconMac(),
|
||||
"AREA_SWITCH", currentArea.getEnterTime());
|
||||
"AREA_SWITCH", currentArea.getEnterTime(), eventTime);
|
||||
windowRedisDAO.clearWindow(deviceId, currentAreaId);
|
||||
|
||||
long now = System.currentTimeMillis();
|
||||
stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac);
|
||||
windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId);
|
||||
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi);
|
||||
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi, eventTime);
|
||||
|
||||
log.info("[Trajectory] 区域切换:deviceId={}, from={}, to={}", deviceId, currentAreaId, bestCandidate.areaId);
|
||||
}
|
||||
@@ -189,13 +233,14 @@ public class TrajectoryDetectionProcessor {
|
||||
*/
|
||||
private void processWithoutCurrentArea(Long deviceId,
|
||||
Map<Long, MatchedBeacon> matchedBeacons,
|
||||
Map<Long, BeaconPresenceConfig> areaConfigIndex) {
|
||||
Map<Long, BeaconPresenceConfig> areaConfigIndex,
|
||||
LocalDateTime eventTime) {
|
||||
MatchedBeacon bestCandidate = findBestEnterCandidate(deviceId, matchedBeacons, null);
|
||||
if (bestCandidate != null) {
|
||||
long now = System.currentTimeMillis();
|
||||
stateRedisDAO.setCurrentArea(deviceId, bestCandidate.areaId, now, bestCandidate.beaconMac);
|
||||
windowRedisDAO.clearWindow(deviceId, bestCandidate.areaId);
|
||||
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi);
|
||||
publishEnterEvent(deviceId, bestCandidate.areaId, bestCandidate.beaconMac, bestCandidate.rssi, eventTime);
|
||||
|
||||
log.info("[Trajectory] 进入区域:deviceId={}, areaId={}, rssi={}", deviceId, bestCandidate.areaId, bestCandidate.rssi);
|
||||
}
|
||||
@@ -282,6 +327,31 @@ public class TrajectoryDetectionProcessor {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析当前区域用于滞回判断的参考 RSSI:
|
||||
* 1) 本次上报匹配到了当前区域 → 直接用本次 RSSI
|
||||
* 2) 未匹配到 → 取窗口里最近一次非 -999(缺失哨兵)样本
|
||||
* 3) 仍取不到 → 返回 -999(此时滞回天然放行,等价于允许切换,
|
||||
* 因为当前区域已彻底失去信号)
|
||||
*/
|
||||
private int resolveCurrentAreaRssi(Long deviceId, Long currentAreaId,
|
||||
Map<Long, MatchedBeacon> matchedBeacons) {
|
||||
MatchedBeacon matched = matchedBeacons.get(currentAreaId);
|
||||
if (matched != null) {
|
||||
return matched.rssi;
|
||||
}
|
||||
List<Integer> window = windowRedisDAO.getWindow(deviceId, currentAreaId);
|
||||
if (window != null) {
|
||||
for (int i = window.size() - 1; i >= 0; i--) {
|
||||
Integer sample = window.get(i);
|
||||
if (sample != null && sample != -999) {
|
||||
return sample;
|
||||
}
|
||||
}
|
||||
}
|
||||
return -999;
|
||||
}
|
||||
|
||||
/**
|
||||
* 找到信号最强且满足进入条件的候选区域
|
||||
*
|
||||
@@ -366,7 +436,8 @@ public class TrajectoryDetectionProcessor {
|
||||
|
||||
// ==================== 事件发布 ====================
|
||||
|
||||
private void publishEnterEvent(Long deviceId, Long areaId, String beaconMac, Integer enterRssi) {
|
||||
private void publishEnterEvent(Long deviceId, Long areaId, String beaconMac, Integer enterRssi,
|
||||
LocalDateTime eventTime) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
TrajectoryEnterEvent event = TrajectoryEnterEvent.builder()
|
||||
@@ -376,6 +447,7 @@ public class TrajectoryDetectionProcessor {
|
||||
.areaId(areaId)
|
||||
.beaconMac(beaconMac)
|
||||
.enterRssi(enterRssi)
|
||||
.eventTime((eventTime != null ? eventTime : LocalDateTime.now()).toString())
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.build();
|
||||
|
||||
@@ -390,7 +462,7 @@ public class TrajectoryDetectionProcessor {
|
||||
}
|
||||
|
||||
private void publishLeaveEvent(Long deviceId, Long areaId, String beaconMac,
|
||||
String leaveReason, Long enterTimestamp) {
|
||||
String leaveReason, Long enterTimestamp, LocalDateTime eventTime) {
|
||||
try {
|
||||
IotDeviceDO device = deviceService.getDeviceFromCache(deviceId);
|
||||
TrajectoryLeaveEvent event = TrajectoryLeaveEvent.builder()
|
||||
@@ -401,6 +473,7 @@ public class TrajectoryDetectionProcessor {
|
||||
.beaconMac(beaconMac)
|
||||
.leaveReason(leaveReason)
|
||||
.enterTimestamp(enterTimestamp)
|
||||
.eventTime((eventTime != null ? eventTime : LocalDateTime.now()).toString())
|
||||
.tenantId(TenantContextHolder.getTenantId())
|
||||
.build();
|
||||
|
||||
|
||||
@@ -0,0 +1,111 @@
|
||||
package com.viewsh.module.ops.environment.integration.listener;
|
||||
|
||||
import com.viewsh.framework.common.pojo.CommonResult;
|
||||
import com.viewsh.module.iot.api.device.IotDeviceQueryApi;
|
||||
import com.viewsh.module.iot.api.device.dto.IotDeviceSimpleRespDTO;
|
||||
import com.viewsh.module.ops.enums.BadgeDeviceStatusEnum;
|
||||
import com.viewsh.module.ops.environment.integration.dto.IotDeviceStatusChangedEventDTO;
|
||||
import com.viewsh.module.ops.environment.service.badge.BadgeDeviceStatusService;
|
||||
import com.viewsh.module.ops.service.area.event.AreaDeviceBoundEvent;
|
||||
import com.viewsh.module.ops.service.area.event.AreaDeviceUnboundEvent;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.event.TransactionPhase;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
|
||||
/**
|
||||
* 区域-工牌设备绑定/解绑事件监听器
|
||||
* <p>
|
||||
* 绑定({@link AreaDeviceBoundEvent}):
|
||||
* BADGE 关系建立前,IoT 实时上线事件会被 {@code BadgeDeviceStatusEventHandler.isBadgeDevice()}
|
||||
* 拒掉;建立关系后没有任何机制回填 Redis,导致设备直到下次定时对账(5/30 分钟)才会出现在
|
||||
* "可分配工牌"列表,期间收到的工单也无法派给该设备。监听器在绑定事务提交后定向查询一次
|
||||
* IoT 设备信息(含状态、昵称),回写 Ops 工牌缓存。
|
||||
* <p>
|
||||
* 解绑({@link AreaDeviceUnboundEvent}):
|
||||
* 解绑后 SyncJob 因关系记录消失不会再扫到该设备,Redis 工牌缓存得等 24h TTL 自然过期,
|
||||
* 期间该设备仍可能出现在"可分配/活跃工牌"列表里。监听器在解绑事务提交后立即清理 Redis 状态,
|
||||
* 与绑定路径形成闭环。
|
||||
* <p>
|
||||
* 二者均使用 AFTER_COMMIT + @Async:事务提交后才在独立线程执行,不阻塞绑定/解绑接口响应。
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class BadgeAreaBoundEventListener {
|
||||
|
||||
private static final String TYPE_BADGE = "BADGE";
|
||||
|
||||
@Resource
|
||||
private IotDeviceQueryApi iotDeviceQueryApi;
|
||||
|
||||
@Resource
|
||||
private BadgeDeviceStatusService badgeDeviceStatusService;
|
||||
|
||||
@Async("ops-task-executor")
|
||||
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
|
||||
public void onAreaDeviceBound(AreaDeviceBoundEvent event) {
|
||||
if (event == null || !TYPE_BADGE.equals(event.getRelationType())) {
|
||||
return;
|
||||
}
|
||||
Long deviceId = event.getDeviceId();
|
||||
Long areaId = event.getAreaId();
|
||||
if (deviceId == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// 单次 RPC 取齐 state + nickname + deviceName(IotDeviceSimpleRespDTO 已含 state 字段)
|
||||
CommonResult<IotDeviceSimpleRespDTO> result = iotDeviceQueryApi.getDevice(deviceId);
|
||||
if (result == null || !result.isSuccess() || result.getData() == null) {
|
||||
log.warn("[BadgeAreaBoundEventListener] 查询 IoT 设备失败,跳过回填: deviceId={}, msg={}",
|
||||
deviceId, result != null ? result.getMsg() : "null");
|
||||
return;
|
||||
}
|
||||
IotDeviceSimpleRespDTO device = result.getData();
|
||||
|
||||
// IotDeviceSimpleRespDTO.state 与 IotDeviceStatusChangedEventDTO 的 status 编码一致
|
||||
// (0=未激活,1=在线,2=离线),未激活/离线统一回写 OFFLINE
|
||||
BadgeDeviceStatusEnum target = IotDeviceStatusChangedEventDTO.STATUS_ONLINE.equals(device.getState())
|
||||
? BadgeDeviceStatusEnum.IDLE
|
||||
: BadgeDeviceStatusEnum.OFFLINE;
|
||||
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
deviceId,
|
||||
device.getDeviceName(),
|
||||
device.getNickname(),
|
||||
target == BadgeDeviceStatusEnum.IDLE ? areaId : null,
|
||||
target,
|
||||
"BADGE 绑定后回填");
|
||||
|
||||
log.info("[BadgeAreaBoundEventListener] 工牌设备状态回填完成: deviceId={}, areaId={}, target={}",
|
||||
deviceId, areaId, target);
|
||||
} catch (Exception e) {
|
||||
log.error("[BadgeAreaBoundEventListener] 工牌设备状态回填失败: deviceId={}, areaId={}",
|
||||
deviceId, areaId, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Async("ops-task-executor")
|
||||
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
|
||||
public void onAreaDeviceUnbound(AreaDeviceUnboundEvent event) {
|
||||
if (event == null || !TYPE_BADGE.equals(event.getRelationType())) {
|
||||
return;
|
||||
}
|
||||
Long deviceId = event.getDeviceId();
|
||||
if (deviceId == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
badgeDeviceStatusService.deleteBadgeStatus(deviceId);
|
||||
log.info("[BadgeAreaBoundEventListener] 工牌设备解绑后 Redis 状态已清理: deviceId={}, areaId={}",
|
||||
deviceId, event.getAreaId());
|
||||
} catch (Exception e) {
|
||||
log.error("[BadgeAreaBoundEventListener] 工牌设备解绑后清理失败: deviceId={}", deviceId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -15,9 +15,6 @@ import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
/**
|
||||
* 工牌设备状态事件监听器
|
||||
@@ -87,9 +84,6 @@ public class BadgeDeviceStatusEventListener {
|
||||
@Resource
|
||||
private OrderLifecycleManager orderLifecycleManager;
|
||||
|
||||
@Resource
|
||||
private PlatformTransactionManager transactionManager;
|
||||
|
||||
/**
|
||||
* 监听工单状态变更事件,同步更新设备工单关联
|
||||
* <p>
|
||||
@@ -180,40 +174,27 @@ public class BadgeDeviceStatusEventListener {
|
||||
|
||||
/**
|
||||
* 处理工单推送状态(首次设置工单关联)
|
||||
* <p>
|
||||
* 若 Redis 里检测到旧 orderId(正常业务不应出现),仅打 ERROR 告警并清理 Redis 关联。
|
||||
* 此前版本会在此处"自动取消旧工单",但那是对"数据已错乱"场景的暴力兜底:
|
||||
* <ul>
|
||||
* <li>取消使用 REQUIRES_NEW 独立事务且吞异常,失败时新单照常落地,旧单残留,形成越清越多</li>
|
||||
* <li>真正的防线应在 DispatchEngine.autoDispatchNext 入口做设备空闲校验</li>
|
||||
* </ul>
|
||||
* 现改为被动告警,暴露问题等待定位,避免误杀保洁员正在执行的任务。
|
||||
*/
|
||||
private void handleDispatched(Long deviceId, Long orderId, OpsOrderDO order) {
|
||||
// 检查并清理旧工单(防止工单切换时状态残留)
|
||||
BadgeDeviceStatusDTO deviceStatus = badgeDeviceStatusService.getBadgeStatus(deviceId);
|
||||
if (deviceStatus != null && deviceStatus.getCurrentOpsOrderId() != null) {
|
||||
Long oldOrderId = deviceStatus.getCurrentOpsOrderId();
|
||||
if (!oldOrderId.equals(orderId)) {
|
||||
log.warn("[BadgeDeviceStatusEventListener] 派发新工单时检测到旧工单残留: " +
|
||||
"deviceId={}, oldOrderId={}, newOrderId={}", deviceId, oldOrderId, orderId);
|
||||
|
||||
// 检查旧工单是否仍在进行中,如果是则先取消
|
||||
OpsOrderDO oldOrder = opsOrderMapper.selectById(oldOrderId);
|
||||
if (oldOrder != null) {
|
||||
WorkOrderStatusEnum oldStatus = WorkOrderStatusEnum.fromStatus(oldOrder.getStatus());
|
||||
if (oldStatus == WorkOrderStatusEnum.DISPATCHED
|
||||
|| oldStatus == WorkOrderStatusEnum.CONFIRMED
|
||||
|| oldStatus == WorkOrderStatusEnum.ARRIVED) {
|
||||
// 旧工单仍在进行,先取消
|
||||
// 使用 REQUIRES_NEW 独立事务,避免内层异常标记外层事务 rollback-only
|
||||
log.warn("[BadgeDeviceStatusEventListener] 取消残留的旧工单: oldOrderId={}", oldOrderId);
|
||||
try {
|
||||
TransactionTemplate txTemplate = new TransactionTemplate(transactionManager);
|
||||
txTemplate.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
|
||||
txTemplate.executeWithoutResult(status -> {
|
||||
orderLifecycleManager.cancelOrder(oldOrderId, deviceId,
|
||||
OperatorTypeEnum.SYSTEM, "新工单派发,自动取消旧工单");
|
||||
});
|
||||
} catch (Exception e) {
|
||||
log.error("[BadgeDeviceStatusEventListener] 取消旧工单失败: oldOrderId={}", oldOrderId, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
String oldStatus = oldOrder != null ? oldOrder.getStatus() : "NOT_FOUND";
|
||||
log.error("[BadgeDeviceStatusEventListener] 派发新工单时检测到旧工单残留(数据可能已错乱,需人工核查): " +
|
||||
"deviceId={}, oldOrderId={}, oldStatus={}, newOrderId={}",
|
||||
deviceId, oldOrderId, oldStatus, orderId);
|
||||
|
||||
// 确保设备状态清理(无论旧工单是否取消成功)
|
||||
// 清理 Redis 中对旧工单的关联(纯 Redis 操作,不触达状态机)
|
||||
badgeDeviceStatusService.clearCurrentOrder(deviceId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,9 @@ import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.viewsh.framework.common.pojo.CommonResult;
|
||||
import com.viewsh.framework.tenant.core.job.TenantJob;
|
||||
import com.viewsh.module.iot.api.device.IotDeviceQueryApi;
|
||||
import com.viewsh.module.iot.api.device.IotDeviceStatusQueryApi;
|
||||
import com.viewsh.module.iot.api.device.dto.IotDeviceSimpleRespDTO;
|
||||
import com.viewsh.module.iot.api.device.dto.status.DeviceStatusRespDTO;
|
||||
import com.viewsh.module.ops.api.badge.BadgeDeviceStatusDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
|
||||
@@ -18,6 +20,8 @@ import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
@@ -47,6 +51,9 @@ public class BadgeDeviceStatusSyncJob {
|
||||
@Resource
|
||||
private IotDeviceStatusQueryApi iotDeviceStatusQueryApi;
|
||||
|
||||
@Resource
|
||||
private IotDeviceQueryApi iotDeviceQueryApi;
|
||||
|
||||
@Resource
|
||||
private OpsAreaDeviceRelationMapper areaDeviceRelationMapper;
|
||||
|
||||
@@ -120,6 +127,9 @@ public class BadgeDeviceStatusSyncJob {
|
||||
OpsAreaDeviceRelationDO::getAreaId,
|
||||
(existing, replacement) -> existing));
|
||||
|
||||
// 3b. 批量查询设备 nickname(IoT 是唯一可信源),防止 Redis key 丢失后降级到 deviceCode
|
||||
Map<Long, String> deviceNicknameMap = loadDeviceNicknameMap(deviceIds);
|
||||
|
||||
// 4. 逐一对账并修正
|
||||
for (DeviceStatusRespDTO iotStatus : iotResult.getData()) {
|
||||
// 4a. 工单一致性检查(修复残留的已终态工单关联)
|
||||
@@ -135,7 +145,10 @@ public class BadgeDeviceStatusSyncJob {
|
||||
}
|
||||
|
||||
// 4b. IoT 在线/离线状态对账
|
||||
boolean corrected = syncSingleDevice(iotStatus, deviceAreaMap.get(iotStatus.getDeviceId()));
|
||||
boolean corrected = syncSingleDevice(
|
||||
iotStatus,
|
||||
deviceAreaMap.get(iotStatus.getDeviceId()),
|
||||
deviceNicknameMap.get(iotStatus.getDeviceId()));
|
||||
syncCount++;
|
||||
if (corrected) {
|
||||
correctedCount++;
|
||||
@@ -154,9 +167,10 @@ public class BadgeDeviceStatusSyncJob {
|
||||
*
|
||||
* @param iotStatus IoT 设备状态
|
||||
* @param areaId 设备所属区域ID
|
||||
* @param nickname 设备昵称(从 IoT 查到的权威值,允许 null)
|
||||
* @return 是否进行了修正
|
||||
*/
|
||||
private boolean syncSingleDevice(DeviceStatusRespDTO iotStatus, Long areaId) {
|
||||
private boolean syncSingleDevice(DeviceStatusRespDTO iotStatus, Long areaId, String nickname) {
|
||||
Long deviceId = iotStatus.getDeviceId();
|
||||
|
||||
try {
|
||||
@@ -168,8 +182,20 @@ public class BadgeDeviceStatusSyncJob {
|
||||
boolean opsOnline = opsStatus != null && opsStatus.getStatus() != null
|
||||
&& opsStatus.getStatus().isActive();
|
||||
|
||||
// 如果状态一致,无需修正
|
||||
// 如果状态一致,但 Redis 缺 nickname 而 IoT 有值,则补写一次防止派单时降级显示 deviceCode
|
||||
if (iotOnline == opsOnline) {
|
||||
if (iotOnline && nickname != null
|
||||
&& (opsStatus == null || opsStatus.getNickname() == null)) {
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
deviceId,
|
||||
iotStatus.getDeviceCode(),
|
||||
nickname,
|
||||
areaId,
|
||||
BadgeDeviceStatusEnum.IDLE,
|
||||
"定时对账补写-昵称");
|
||||
log.info("[SyncJob] 补写设备昵称:deviceId={}, nickname={}", deviceId, nickname);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -178,17 +204,17 @@ public class BadgeDeviceStatusSyncJob {
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
deviceId,
|
||||
iotStatus.getDeviceCode(),
|
||||
null, // nickname: 对账场景不更新昵称,保留Redis中已有值
|
||||
nickname,
|
||||
areaId,
|
||||
BadgeDeviceStatusEnum.IDLE,
|
||||
"定时对账修正-上线");
|
||||
log.info("[SyncJob] 修正设备状态:deviceId={}, IoT=ONLINE, Ops={}",
|
||||
deviceId, opsOnline ? "ACTIVE" : "OFFLINE/NULL");
|
||||
log.info("[SyncJob] 修正设备状态:deviceId={}, IoT=ONLINE, Ops={}, nickname={}",
|
||||
deviceId, opsOnline ? "ACTIVE" : "OFFLINE/NULL", nickname);
|
||||
} else {
|
||||
badgeDeviceStatusService.updateBadgeOnlineStatus(
|
||||
deviceId,
|
||||
iotStatus.getDeviceCode(),
|
||||
null, // nickname: 对账场景不更新昵称,保留Redis中已有值
|
||||
nickname,
|
||||
null,
|
||||
BadgeDeviceStatusEnum.OFFLINE,
|
||||
"定时对账修正-离线");
|
||||
@@ -204,6 +230,35 @@ public class BadgeDeviceStatusSyncJob {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量从 IoT 查询设备昵称
|
||||
* <p>
|
||||
* Redis 中 ops:badge:device:{deviceId} 的 nickname 字段可能因 TTL/重启/缓存清理而缺失,
|
||||
* 每次对账时以 IoT 为唯一可信源做回填,避免派单时降级为 deviceCode(如 "43607737587")。
|
||||
*/
|
||||
private Map<Long, String> loadDeviceNicknameMap(List<Long> deviceIds) {
|
||||
if (CollUtil.isEmpty(deviceIds)) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
try {
|
||||
CommonResult<List<IotDeviceSimpleRespDTO>> result = iotDeviceQueryApi.batchGetDevices(deviceIds);
|
||||
if (!result.isSuccess() || CollUtil.isEmpty(result.getData())) {
|
||||
log.warn("[SyncJob] 查询设备昵称失败或为空: {}", result.getMsg());
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
Map<Long, String> map = new HashMap<>(result.getData().size());
|
||||
for (IotDeviceSimpleRespDTO dto : result.getData()) {
|
||||
if (dto.getId() != null && dto.getNickname() != null) {
|
||||
map.put(dto.getId(), dto.getNickname());
|
||||
}
|
||||
}
|
||||
return map;
|
||||
} catch (Exception e) {
|
||||
log.warn("[SyncJob] 批量查询设备昵称异常,本次对账跳过昵称回填", e);
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步结果
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
package com.viewsh.module.ops.environment.job;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX;
|
||||
import com.viewsh.framework.tenant.core.job.TenantJob;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 保洁工单超时自动取消 Job
|
||||
* <p>
|
||||
* 职责:
|
||||
* 扫描所有保洁类(order_type=CLEAN)非终态工单,
|
||||
* 若最近一次进展(update_time)距今超过阈值(默认 12 小时),
|
||||
* 以 SYSTEM 身份走正常取消流程将其关闭。
|
||||
* <p>
|
||||
* 设计要点:
|
||||
* 1. 时间基准使用 update_time 而非 create_time——任何状态转换/字段更新都会刷新 update_time,
|
||||
* 这样"按最新进展计算超时"才准确:刚被重派的 DISPATCHED 单不会因 create_time 老而被误杀。
|
||||
* 2. 状态白名单 = PENDING / QUEUED / DISPATCHED / CONFIRMED / ARRIVED(不含 PAUSED)。
|
||||
* PAUSED 是 P0 打断的产物,应由 resumeInterruptedOrder 经状态机走 PAUSED → DISPATCHED
|
||||
* 恢复。若此 Job 把 PAUSED 单直接 CANCELLED,P0 完成后的 resume 会在状态机检查
|
||||
* "PAUSED → DISPATCHED" 时因源状态已变为 CANCELLED 而抛 IllegalStateException,
|
||||
* 进而破坏 P0 恢复链路。PAUSED 若真的卡死(P0 也卡),交由人工审核,不自动化。
|
||||
* 3. 取消调用 {@link OrderLifecycleManager#cancelOrder} 走完整责任链:
|
||||
* StateTransitionHandler → QueueSyncHandler → EventPublishHandler
|
||||
* → CleanOrderEventListener.onOrderStateChanged(CANCELLED) 会统一处理
|
||||
* TTS 停播、设备工单关联回收、审计日志。
|
||||
* 4. 单单独立事务 + try/catch 隔离,单条失败不影响批次其余工单。
|
||||
* 5. 单次扫描限 batchSize 条,防止异常堆积时一次性取消过多触发事件风暴;
|
||||
* 未处理完的工单留给下一轮 cron。
|
||||
* 6. cancel 前再做一次乐观校验:重查 update_time 是否仍 <= threshold。
|
||||
* 候选装内存到实际 cancel 之间如果有用户触达(确认/到岗),update_time 会被刷新;
|
||||
* 此时放弃 cancel,避免误杀用户刚触达的工单。
|
||||
* <p>
|
||||
* XXL-Job 配置建议:
|
||||
* - JobHandler: cleanOrderAutoCancelJob
|
||||
* - Cron: 0 17 * * * ? (每小时 :17 触发,避开整点尖峰)
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class CleanOrderAutoCancelJob {
|
||||
|
||||
private static final String BUSINESS_TYPE_CLEAN = "CLEAN";
|
||||
private static final String CANCEL_REASON = "超过12小时未处理,系统自动完结";
|
||||
|
||||
@Resource
|
||||
private OpsOrderMapper opsOrderMapper;
|
||||
|
||||
@Resource
|
||||
private OrderLifecycleManager orderLifecycleManager;
|
||||
|
||||
/** 超时时长(小时),update_time 距今超过此值视为卡死 */
|
||||
@Value("${viewsh.ops.clean.auto-cancel.timeout-hours:12}")
|
||||
private int timeoutHours;
|
||||
|
||||
/** 单次最大扫描/取消工单数,防止事件风暴 */
|
||||
@Value("${viewsh.ops.clean.auto-cancel.batch-size:200}")
|
||||
private int batchSize;
|
||||
|
||||
@XxlJob("cleanOrderAutoCancelJob")
|
||||
@TenantJob
|
||||
public String execute() {
|
||||
try {
|
||||
CancelResult result = scanAndCancel();
|
||||
return StrUtil.format(
|
||||
"保洁工单超时自动取消完成: 扫描 {} 单, 成功 {}, 失败 {}, 跳过 {}, 耗时 {} ms",
|
||||
result.scanned, result.succeeded, result.failed, result.skippedStale, result.durationMs);
|
||||
} catch (Exception e) {
|
||||
log.error("[CleanOrderAutoCancelJob] 执行失败", e);
|
||||
return StrUtil.format("保洁工单超时自动取消失败: {}", e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public CancelResult scanAndCancel() {
|
||||
long startTime = System.currentTimeMillis();
|
||||
LocalDateTime threshold = LocalDateTime.now().minusHours(timeoutHours);
|
||||
|
||||
log.info("[CleanOrderAutoCancelJob] 开始扫描: timeoutHours={}, threshold={}, batchSize={}",
|
||||
timeoutHours, threshold, batchSize);
|
||||
|
||||
List<OpsOrderDO> candidates = opsOrderMapper.selectList(new LambdaQueryWrapperX<OpsOrderDO>()
|
||||
.eq(OpsOrderDO::getOrderType, BUSINESS_TYPE_CLEAN)
|
||||
.notIn(OpsOrderDO::getStatus,
|
||||
WorkOrderStatusEnum.COMPLETED.getStatus(),
|
||||
WorkOrderStatusEnum.CANCELLED.getStatus(),
|
||||
// PAUSED 交由 resumeInterruptedOrder 经状态机恢复,不在此 Job 自动化处理
|
||||
WorkOrderStatusEnum.PAUSED.getStatus())
|
||||
.le(OpsOrderDO::getUpdateTime, threshold)
|
||||
.orderByAsc(OpsOrderDO::getUpdateTime)
|
||||
.last("LIMIT " + batchSize));
|
||||
|
||||
if (CollUtil.isEmpty(candidates)) {
|
||||
log.info("[CleanOrderAutoCancelJob] 无超时工单");
|
||||
return new CancelResult(0, 0, 0, 0, System.currentTimeMillis() - startTime);
|
||||
}
|
||||
|
||||
int succeeded = 0;
|
||||
int failed = 0;
|
||||
int skippedStale = 0;
|
||||
|
||||
for (OpsOrderDO order : candidates) {
|
||||
Long orderId = order.getId();
|
||||
try {
|
||||
// 乐观校验:候选装内存→实际 cancel 之间,用户可能已触达工单刷新 update_time。
|
||||
// 重查一次确认仍超时,避免把用户刚点过的工单一并 cancel 掉。
|
||||
OpsOrderDO fresh = opsOrderMapper.selectById(orderId);
|
||||
if (fresh == null
|
||||
|| WorkOrderStatusEnum.COMPLETED.getStatus().equals(fresh.getStatus())
|
||||
|| WorkOrderStatusEnum.CANCELLED.getStatus().equals(fresh.getStatus())
|
||||
|| WorkOrderStatusEnum.PAUSED.getStatus().equals(fresh.getStatus())
|
||||
|| fresh.getUpdateTime() == null
|
||||
|| fresh.getUpdateTime().isAfter(threshold)) {
|
||||
skippedStale++;
|
||||
log.info("[CleanOrderAutoCancelJob] 并发触达/状态已变,跳过: orderId={}, snapshotStatus={}, latestStatus={}, latestUpdateTime={}",
|
||||
orderId, order.getStatus(),
|
||||
fresh != null ? fresh.getStatus() : "NOT_FOUND",
|
||||
fresh != null ? fresh.getUpdateTime() : null);
|
||||
continue;
|
||||
}
|
||||
|
||||
orderLifecycleManager.cancelOrder(
|
||||
orderId,
|
||||
null,
|
||||
OperatorTypeEnum.SYSTEM,
|
||||
CANCEL_REASON);
|
||||
succeeded++;
|
||||
log.info("[CleanOrderAutoCancelJob] 自动取消成功: orderId={}, orderCode={}, status={}, updateTime={}",
|
||||
orderId, order.getOrderCode(), order.getStatus(), order.getUpdateTime());
|
||||
} catch (Exception e) {
|
||||
failed++;
|
||||
log.warn("[CleanOrderAutoCancelJob] 自动取消失败: orderId={}, orderCode={}, status={}, error={}",
|
||||
orderId, order.getOrderCode(), order.getStatus(), e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
log.info("[CleanOrderAutoCancelJob] 扫描完成: 扫描 {} 单, 成功 {}, 失败 {}, 跳过 {}, 耗时 {} ms",
|
||||
candidates.size(), succeeded, failed, skippedStale, duration);
|
||||
|
||||
return new CancelResult(candidates.size(), succeeded, failed, skippedStale, duration);
|
||||
}
|
||||
|
||||
public record CancelResult(int scanned, int succeeded, int failed, int skippedStale, long durationMs) {
|
||||
}
|
||||
}
|
||||
@@ -275,6 +275,7 @@ public class CleanBadgeServiceImpl implements CleanBadgeService {
|
||||
return BadgeStatusRespDTO.builder()
|
||||
.deviceId(status.getDeviceId())
|
||||
.deviceKey(status.getDeviceCode())
|
||||
.nickname(status.getNickname())
|
||||
.status(status.getStatusCode())
|
||||
.batteryLevel(status.getBatteryLevel())
|
||||
.lastHeartbeatTime(formatTimestamp(status.getLastHeartbeatTime()))
|
||||
|
||||
@@ -347,6 +347,7 @@ public class CleanWorkOrderServiceImpl implements CleanWorkOrderService {
|
||||
.orderId(req.getOrderId())
|
||||
.operator(OperatorContext.ofAdmin(req.getOperatorId(), resolveUserName(req.getOperatorId())))
|
||||
.assigneeId(req.getAssigneeId())
|
||||
.assigneeName(req.getAssigneeName())
|
||||
.reason(req.getRemark())
|
||||
.build());
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.viewsh.module.ops.environment.service.cleanorder.dto;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import jakarta.validation.constraints.Size;
|
||||
import lombok.Data;
|
||||
|
||||
/**
|
||||
@@ -21,6 +22,10 @@ public class CleanManualDispatchReqDTO {
|
||||
@NotNull(message = "目标设备ID不能为空")
|
||||
private Long assigneeId;
|
||||
|
||||
@Schema(description = "目标设备名称(昵称或设备编码)", example = "男卫-01")
|
||||
@Size(max = 100, message = "设备名称不能超过100字符")
|
||||
private String assigneeName;
|
||||
|
||||
@Schema(description = "派单备注", example = "紧急情况,指定该设备处理")
|
||||
private String remark;
|
||||
|
||||
|
||||
@@ -89,8 +89,7 @@ public class BadgeDeviceAreaAssignStrategy implements AssignStrategy {
|
||||
|
||||
if (selectedDevice != null) {
|
||||
String reason = buildRecommendationReason(selectedDevice, context);
|
||||
String assigneeName = selectedDevice.getNickname() != null
|
||||
? selectedDevice.getNickname() : selectedDevice.getDeviceCode();
|
||||
String assigneeName = resolveAssigneeName(selectedDevice);
|
||||
return AssigneeRecommendation.of(
|
||||
selectedDevice.getDeviceId(),
|
||||
assigneeName,
|
||||
@@ -118,8 +117,7 @@ public class BadgeDeviceAreaAssignStrategy implements AssignStrategy {
|
||||
.map(device -> {
|
||||
int score = calculateScore(device);
|
||||
String reason = buildRecommendationReason(device, context);
|
||||
String assigneeName = device.getNickname() != null
|
||||
? device.getNickname() : device.getDeviceCode();
|
||||
String assigneeName = resolveAssigneeName(device);
|
||||
return AssigneeRecommendation.of(
|
||||
device.getDeviceId(),
|
||||
assigneeName,
|
||||
@@ -133,6 +131,25 @@ public class BadgeDeviceAreaAssignStrategy implements AssignStrategy {
|
||||
|
||||
// ==================== 私有方法 ====================
|
||||
|
||||
/**
|
||||
* 解析执行人展示名称。
|
||||
* <p>
|
||||
* 优先用 nickname;nickname 缺失时(例如 Redis 状态缓存被清理、IoT 侧未维护昵称),
|
||||
* 返回 "工牌-尾号" 这样的可读降级文案,避免把 deviceCode/IMEI 这类长数字串直接当作人员名字暴露给调用方。
|
||||
*/
|
||||
private String resolveAssigneeName(BadgeDeviceStatusDTO device) {
|
||||
String nickname = device.getNickname();
|
||||
if (nickname != null && !nickname.isBlank()) {
|
||||
return nickname;
|
||||
}
|
||||
String code = device.getDeviceCode();
|
||||
if (code != null && !code.isBlank()) {
|
||||
int len = code.length();
|
||||
return "工牌-" + (len > 4 ? code.substring(len - 4) : code);
|
||||
}
|
||||
return device.getDeviceId() != null ? "工牌-" + device.getDeviceId() : "未知工牌";
|
||||
}
|
||||
|
||||
/**
|
||||
* 选择最佳设备
|
||||
*/
|
||||
|
||||
@@ -55,15 +55,9 @@ public class CleanOrderBusinessStrategy implements OrderBusinessStrategy {
|
||||
if (!badge.isOnline()) {
|
||||
throw new IllegalStateException("目标保洁设备当前离线,不能手动派单");
|
||||
}
|
||||
if (!badge.canAcceptNewOrder()) {
|
||||
throw new IllegalStateException("目标保洁设备当前不可接单");
|
||||
}
|
||||
if (order.getAreaId() != null && badge.getCurrentAreaId() == null) {
|
||||
throw new IllegalStateException("目标保洁设备当前未绑定区域,不能手动派单");
|
||||
}
|
||||
if (order.getAreaId() != null && !order.getAreaId().equals(badge.getCurrentAreaId())) {
|
||||
throw new IllegalStateException("目标保洁设备不在当前工单所属区域");
|
||||
}
|
||||
// 注意:以下校验已按产品需求移除,由调度员人工判断合理性:
|
||||
// 1. canAcceptNewOrder() — 允许向 BUSY/PAUSED 工牌手动派单,工单进入 QUEUED 排队
|
||||
// 2. 区域一致性校验 — 允许跨区域分配,支持灵活调度场景
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -87,4 +81,11 @@ public class CleanOrderBusinessStrategy implements OrderBusinessStrategy {
|
||||
log.info("[CleanStrategy] 升级优先级后置完成: orderId={}, newPriority={}, queueId={}",
|
||||
cmd.getOrderId(), newPriority, queueDTO.getId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterDispatch(DispatchOrderCommand cmd, OpsOrderDO order) {
|
||||
// TODO: 转派场景下(order.getAssigneeId() != null && !order.getAssigneeId().equals(cmd.getAssigneeId())),
|
||||
// 应向旧工牌发送震动/语音通知告知任务已转移,避免旧工牌持有者继续前往已无效的区域。
|
||||
// 实现参考:cleanOrderNotificationService.sendReassignNotification(oldBadgeId, order.getOrderCode())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,198 @@
|
||||
package com.viewsh.module.ops.environment.job;
|
||||
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyLong;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* 验证 CleanOrderAutoCancelJob 的五条不变量:
|
||||
* <ol>
|
||||
* <li>无候选 → 返回零结果,不触发取消</li>
|
||||
* <li>正常批次 → 依次 cancel,成功计数正确</li>
|
||||
* <li>单条失败不中断其余 → try/catch 隔离</li>
|
||||
* <li>候选到 cancel 间被用户触达 → 乐观锁跳过(避免误杀)</li>
|
||||
* <li>候选到 cancel 间状态变为终态/PAUSED → 跳过</li>
|
||||
* </ol>
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class CleanOrderAutoCancelJobTest {
|
||||
|
||||
@Mock
|
||||
private OpsOrderMapper opsOrderMapper;
|
||||
@Mock
|
||||
private OrderLifecycleManager orderLifecycleManager;
|
||||
|
||||
@InjectMocks
|
||||
private CleanOrderAutoCancelJob job;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
ReflectionTestUtils.setField(job, "timeoutHours", 12);
|
||||
ReflectionTestUtils.setField(job, "batchSize", 200);
|
||||
}
|
||||
|
||||
@Test
|
||||
void scanAndCancel_whenNoCandidates_shouldReturnZeroCounts() {
|
||||
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
|
||||
.thenReturn(Collections.emptyList());
|
||||
|
||||
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
|
||||
|
||||
assertEquals(0, result.scanned());
|
||||
assertEquals(0, result.succeeded());
|
||||
assertEquals(0, result.failed());
|
||||
assertEquals(0, result.skippedStale());
|
||||
verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void scanAndCancel_whenAllCandidatesStillStale_shouldCancelAll() {
|
||||
LocalDateTime staleTime = LocalDateTime.now().minusHours(13);
|
||||
OpsOrderDO a = stale(101L, "WO-101", WorkOrderStatusEnum.DISPATCHED, staleTime);
|
||||
OpsOrderDO b = stale(102L, "WO-102", WorkOrderStatusEnum.CONFIRMED, staleTime);
|
||||
OpsOrderDO c = stale(103L, "WO-103", WorkOrderStatusEnum.ARRIVED, staleTime);
|
||||
|
||||
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
|
||||
.thenReturn(List.of(a, b, c));
|
||||
// Fresh fetch confirms all three are still stale
|
||||
when(opsOrderMapper.selectById(101L)).thenReturn(a);
|
||||
when(opsOrderMapper.selectById(102L)).thenReturn(b);
|
||||
when(opsOrderMapper.selectById(103L)).thenReturn(c);
|
||||
|
||||
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
|
||||
|
||||
assertEquals(3, result.scanned());
|
||||
assertEquals(3, result.succeeded());
|
||||
assertEquals(0, result.failed());
|
||||
assertEquals(0, result.skippedStale());
|
||||
verify(orderLifecycleManager, times(3))
|
||||
.cancelOrder(anyLong(), eq(null), eq(OperatorTypeEnum.SYSTEM), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void scanAndCancel_whenOneCancelThrows_shouldNotAbortBatch() {
|
||||
LocalDateTime staleTime = LocalDateTime.now().minusHours(13);
|
||||
OpsOrderDO a = stale(201L, "WO-201", WorkOrderStatusEnum.DISPATCHED, staleTime);
|
||||
OpsOrderDO b = stale(202L, "WO-202", WorkOrderStatusEnum.CONFIRMED, staleTime);
|
||||
OpsOrderDO c = stale(203L, "WO-203", WorkOrderStatusEnum.ARRIVED, staleTime);
|
||||
|
||||
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
|
||||
.thenReturn(List.of(a, b, c));
|
||||
when(opsOrderMapper.selectById(201L)).thenReturn(a);
|
||||
when(opsOrderMapper.selectById(202L)).thenReturn(b);
|
||||
when(opsOrderMapper.selectById(203L)).thenReturn(c);
|
||||
// 第二条取消抛异常,不应影响第一、第三条。
|
||||
// 不能用 doThrow(...).when(mock).cancelOrder(eq(202L), ...)——strict stubs 会把"201L 调用和 202L 存根不匹配"判成错配。
|
||||
// 改用 doAnswer 按 orderId 路由,覆盖所有 cancel 调用。
|
||||
doAnswer(invocation -> {
|
||||
Long orderId = invocation.getArgument(0);
|
||||
if (orderId != null && orderId == 202L) {
|
||||
throw new IllegalStateException("状态机非法转换");
|
||||
}
|
||||
return null;
|
||||
}).when(orderLifecycleManager).cancelOrder(anyLong(), any(), any(), any());
|
||||
|
||||
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
|
||||
|
||||
assertEquals(3, result.scanned());
|
||||
assertEquals(2, result.succeeded());
|
||||
assertEquals(1, result.failed());
|
||||
assertEquals(0, result.skippedStale());
|
||||
verify(orderLifecycleManager).cancelOrder(eq(201L), any(), any(), any());
|
||||
verify(orderLifecycleManager).cancelOrder(eq(202L), any(), any(), any());
|
||||
verify(orderLifecycleManager).cancelOrder(eq(203L), any(), any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void scanAndCancel_whenOrderTouchedBeforeCancel_shouldSkipAsStale() {
|
||||
// 候选装内存时 update_time=13h ago,实际 cancel 前用户刚刚点确认,update_time 刷为"1 分钟前"。
|
||||
// 乐观校验应跳过,避免误杀已被触达的工单。
|
||||
LocalDateTime snapshotUpdate = LocalDateTime.now().minusHours(13);
|
||||
LocalDateTime freshUpdate = LocalDateTime.now().minusMinutes(1);
|
||||
|
||||
OpsOrderDO snapshot = stale(301L, "WO-301", WorkOrderStatusEnum.DISPATCHED, snapshotUpdate);
|
||||
OpsOrderDO fresh = stale(301L, "WO-301", WorkOrderStatusEnum.CONFIRMED, freshUpdate);
|
||||
|
||||
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
|
||||
.thenReturn(List.of(snapshot));
|
||||
when(opsOrderMapper.selectById(301L)).thenReturn(fresh);
|
||||
|
||||
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
|
||||
|
||||
assertEquals(1, result.scanned());
|
||||
assertEquals(0, result.succeeded());
|
||||
assertEquals(1, result.skippedStale());
|
||||
verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void scanAndCancel_whenOrderBecameTerminal_shouldSkip() {
|
||||
// 候选装内存时还是 ARRIVED,实际 cancel 前已被其他路径 forceComplete 为 COMPLETED
|
||||
LocalDateTime staleTime = LocalDateTime.now().minusHours(13);
|
||||
OpsOrderDO snapshot = stale(401L, "WO-401", WorkOrderStatusEnum.ARRIVED, staleTime);
|
||||
OpsOrderDO fresh = stale(401L, "WO-401", WorkOrderStatusEnum.COMPLETED, staleTime);
|
||||
|
||||
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
|
||||
.thenReturn(List.of(snapshot));
|
||||
when(opsOrderMapper.selectById(401L)).thenReturn(fresh);
|
||||
|
||||
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
|
||||
|
||||
assertEquals(1, result.skippedStale());
|
||||
verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void scanAndCancel_whenOrderBecamePaused_shouldSkip() {
|
||||
// 快照是 DISPATCHED,刚被 P0 打断成 PAUSED——此 Job 应放行给 resumeInterruptedOrder
|
||||
LocalDateTime staleTime = LocalDateTime.now().minusHours(13);
|
||||
OpsOrderDO snapshot = stale(501L, "WO-501", WorkOrderStatusEnum.DISPATCHED, staleTime);
|
||||
OpsOrderDO fresh = stale(501L, "WO-501", WorkOrderStatusEnum.PAUSED,
|
||||
LocalDateTime.now().minusHours(14)); // update_time 刚刷新,但仍<=threshold;状态变 PAUSED 就该跳过
|
||||
|
||||
when(opsOrderMapper.selectList(any(com.viewsh.framework.mybatis.core.query.LambdaQueryWrapperX.class)))
|
||||
.thenReturn(List.of(snapshot));
|
||||
when(opsOrderMapper.selectById(501L)).thenReturn(fresh);
|
||||
|
||||
CleanOrderAutoCancelJob.CancelResult result = job.scanAndCancel();
|
||||
|
||||
assertEquals(1, result.skippedStale());
|
||||
verify(orderLifecycleManager, never()).cancelOrder(anyLong(), any(), any(), any());
|
||||
}
|
||||
|
||||
// ==================== Helpers ====================
|
||||
|
||||
private OpsOrderDO stale(Long id, String code, WorkOrderStatusEnum status, LocalDateTime updateTime) {
|
||||
OpsOrderDO order = OpsOrderDO.builder()
|
||||
.id(id)
|
||||
.orderCode(code)
|
||||
.status(status.getStatus())
|
||||
.orderType("CLEAN")
|
||||
.build();
|
||||
order.setUpdateTime(updateTime);
|
||||
return order;
|
||||
}
|
||||
}
|
||||
@@ -26,6 +26,9 @@ public class BadgeStatusRespDTO {
|
||||
@Schema(description = "设备编码", example = "badge_001")
|
||||
private String deviceKey;
|
||||
|
||||
@Schema(description = "设备昵称(用户可读的显示名称)", example = "张三的工牌")
|
||||
private String nickname;
|
||||
|
||||
@Schema(description = "状态(IDLE/BUSY/OFFLINE/PAUSED)", example = "IDLE")
|
||||
private String status;
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
@@ -178,6 +179,22 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
public DispatchResult autoDispatchNext(Long completedOrderId, Long assigneeId) {
|
||||
log.info("任务完成后自动派发下一单: completedOrderId={}, assigneeId={}", completedOrderId, assigneeId);
|
||||
|
||||
if (assigneeId == null) {
|
||||
log.warn("autoDispatchNext 缺少执行人,跳过派发: completedOrderId={}", completedOrderId);
|
||||
return DispatchResult.success("缺少执行人,跳过派发", null);
|
||||
}
|
||||
|
||||
// 空闲校验:若执行人仍挂着其他活跃工单(DISPATCHED/CONFIRMED/ARRIVED/PAUSED),
|
||||
// 说明设备尚未真正空闲,不应再派发新任务——否则会触发"同一设备并行多单"的状态错乱,
|
||||
// 典型场景是管理员手动取消一个僵尸 DISPATCHED 单时,handleCancelled 会调到这里。
|
||||
List<OpsOrderDO> activeOrders = orderMapper.selectActiveByAssignee(assigneeId, completedOrderId);
|
||||
if (!activeOrders.isEmpty()) {
|
||||
OpsOrderDO head = activeOrders.get(0);
|
||||
log.info("执行人仍有活跃工单,跳过自动派发: assigneeId={}, completedOrderId={}, activeCount={}, sampleOrderId={}, sampleStatus={}",
|
||||
assigneeId, completedOrderId, activeOrders.size(), head.getId(), head.getStatus());
|
||||
return DispatchResult.success("执行人非空闲,跳过派发", assigneeId);
|
||||
}
|
||||
|
||||
Long fallbackAreaId = null;
|
||||
OpsOrderDO completedOrder = orderMapper.selectById(completedOrderId);
|
||||
if (completedOrder != null) {
|
||||
@@ -229,7 +246,9 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
.reason("等待队列动态重排后自动派发")
|
||||
.build();
|
||||
|
||||
OrderTransitionResult result = orderLifecycleManager.transition(request);
|
||||
// 走 dispatch() 而不是 transition():dispatch 内部会先做 FOR UPDATE 不变量检查
|
||||
// (Bug #2 防线),避免 autoDispatchNext 在"从队列派发"这一类入口绕过串行化。
|
||||
OrderTransitionResult result = orderLifecycleManager.dispatch(request);
|
||||
|
||||
if (result.isSuccess()) {
|
||||
return DispatchResult.success("已按队列总分派发下一单", assigneeId);
|
||||
@@ -346,6 +365,23 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
Long orderId = context.getOrderId();
|
||||
Long assigneeId = context.getRecommendedAssigneeId();
|
||||
|
||||
// 兜底校验:调度策略基于 Redis 的设备状态判空闲,可能与 MySQL 的 ops_order 实际活跃态不一致
|
||||
// (例如设备 Redis 状态被某次 COMPLETED 清回 IDLE 但历史 CONFIRMED/DISPATCHED 单仍残留)。
|
||||
// 若分配路径会真正推送工单给设备(DIRECT_DISPATCH / PUSH_AND_ENQUEUE),
|
||||
// 此处再查一次 MySQL,非空闲时强制降级到 ENQUEUE_ONLY,避免同一设备并行多单的状态错乱。
|
||||
if (assigneeId != null
|
||||
&& (decision.getPath() == DispatchPath.DIRECT_DISPATCH
|
||||
|| decision.getPath() == DispatchPath.PUSH_AND_ENQUEUE)) {
|
||||
List<OpsOrderDO> activeOrders = orderMapper.selectActiveByAssignee(assigneeId, orderId);
|
||||
if (!activeOrders.isEmpty()) {
|
||||
OpsOrderDO head = activeOrders.get(0);
|
||||
log.warn("调度决策为 {} 但执行人仍挂活跃工单,降级为仅入队: orderId={}, assigneeId={}, activeCount={}, sampleOrderId={}, sampleStatus={}",
|
||||
decision.getPath(), orderId, assigneeId,
|
||||
activeOrders.size(), head.getId(), head.getStatus());
|
||||
return executeEnqueueOnly(context, assigneeId);
|
||||
}
|
||||
}
|
||||
|
||||
switch (decision.getPath()) {
|
||||
case DIRECT_DISPATCH:
|
||||
return executeDirectDispatch(context, assigneeId);
|
||||
@@ -402,9 +438,25 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
DispatchPath.DIRECT_DISPATCH,
|
||||
result.getQueueId()
|
||||
);
|
||||
} else {
|
||||
return DispatchResult.fail("直接派单失败: " + result.getMessage());
|
||||
}
|
||||
|
||||
// 并发冲突兜底:dispatch 入口的 FOR UPDATE 判定执行人已有活跃工单,
|
||||
// 此时工单仍在原状态(通常是 PENDING)。如果仍是 PENDING,直接降级为入队,
|
||||
// 避免工单悬空;若已是 QUEUED(例如从队列派发被抢先),则让它继续留在队列等下一轮。
|
||||
if (result.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) {
|
||||
OpsOrderDO order = orderMapper.selectById(context.getOrderId());
|
||||
String currentStatus = order != null ? order.getStatus() : null;
|
||||
if (WorkOrderStatusEnum.QUEUED.getStatus().equals(currentStatus)) {
|
||||
log.warn("直接派单被 FOR UPDATE 拒绝且工单已在队列中,保持 QUEUED 等待下一轮: orderId={}, assigneeId={}",
|
||||
context.getOrderId(), assigneeId);
|
||||
return DispatchResult.fail("并发冲突,已留在队列等待: " + result.getMessage());
|
||||
}
|
||||
log.warn("直接派单被 FOR UPDATE 拒绝,降级为入队: orderId={}, assigneeId={}, reason={}",
|
||||
context.getOrderId(), assigneeId, result.getMessage());
|
||||
return executeEnqueueOnly(context, assigneeId);
|
||||
}
|
||||
|
||||
return DispatchResult.fail("直接派单失败: " + result.getMessage());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -427,8 +479,15 @@ public class DispatchEngineImpl implements DispatchEngine {
|
||||
.reason("自动推送等待任务")
|
||||
.build();
|
||||
|
||||
orderLifecycleManager.dispatch(dispatchRequest);
|
||||
log.info("已推送等待任务: taskId={}", firstWaiting.getId());
|
||||
OrderTransitionResult pushResult = orderLifecycleManager.dispatch(dispatchRequest);
|
||||
if (pushResult.isSuccess()) {
|
||||
log.info("已推送等待任务: taskId={}", firstWaiting.getId());
|
||||
} else {
|
||||
// 可能被 dispatch() 里的 FOR UPDATE 拒绝:此处不中断新任务入队流程,
|
||||
// 但要把"推送失败"清晰落在日志里,避免 "已推送" 说谎误导运维排查。
|
||||
log.warn("推送等待任务失败,继续执行新任务入队: taskId={}, orderId={}, error={}",
|
||||
firstWaiting.getId(), firstWaiting.getOpsOrderId(), pushResult.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
// 新任务入队
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
package com.viewsh.module.ops.core.event;
|
||||
|
||||
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
/**
|
||||
* 工单状态转换"尝试"领域事件
|
||||
* <p>
|
||||
* 与 {@link OrderStateChangedEvent} 的区别:
|
||||
* <ul>
|
||||
* <li>{@code OrderStateChangedEvent} 仅在状态转换 <b>成功</b> 后发布(EventPublishHandler),
|
||||
* 订阅方是业务层监听器(TTS 播报、设备状态同步等)。</li>
|
||||
* <li>{@code OrderTransitionAttemptedEvent} 在每一次 transition 尝试时都发布——成功、失败、
|
||||
* FOR UPDATE 被拒 都发。订阅方是审计日志,用于打穿事务回滚造成的审计断链
|
||||
* (rollback 场景下 ops_order_event 无记录,bus_log 需独立事务补齐)。</li>
|
||||
* </ul>
|
||||
* 事务边界:
|
||||
* <ul>
|
||||
* <li>发布方在主事务内 {@code publishEvent},事件会被 Spring 挂在当前事务的 synchronization 上。</li>
|
||||
* <li>订阅方用 {@code @TransactionalEventListener(AFTER_COMMIT)} 或 {@code AFTER_ROLLBACK}
|
||||
* 分别处理 commit 与 rollback 场景,保证两种结果都留痕。</li>
|
||||
* </ul>
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class OrderTransitionAttemptedEvent {
|
||||
|
||||
/** 工单ID */
|
||||
private Long orderId;
|
||||
|
||||
/** 工单类型(CLEAN / SECURITY / REPAIR / SERVICE) */
|
||||
private String orderType;
|
||||
|
||||
/** 工单编号(冗余,便于日志检索) */
|
||||
private String orderCode;
|
||||
|
||||
/** 原状态(查询时的当前状态) */
|
||||
private WorkOrderStatusEnum fromStatus;
|
||||
|
||||
/** 目标状态 */
|
||||
private WorkOrderStatusEnum targetStatus;
|
||||
|
||||
/** 执行人ID */
|
||||
private Long assigneeId;
|
||||
|
||||
/** 操作人类型 */
|
||||
private OperatorTypeEnum operatorType;
|
||||
|
||||
/** 操作人ID */
|
||||
private Long operatorId;
|
||||
|
||||
/** 原因/备注 */
|
||||
private String reason;
|
||||
|
||||
/**
|
||||
* 发布时的"声明结果"。
|
||||
* <p>
|
||||
* 注意:这是发布瞬间的判断;如果后续 handler 抛异常导致整个事务 rollback,
|
||||
* 监听器在 {@code AFTER_ROLLBACK} 阶段应强制将其视为失败。
|
||||
*/
|
||||
private boolean success;
|
||||
|
||||
/** 失败错误码(success=false 时有值) */
|
||||
private TransitionErrorCode errorCode;
|
||||
|
||||
/** 失败原因(简要消息,success=false 时有值) */
|
||||
private String errorMessage;
|
||||
|
||||
/** 异常摘要(success=false 且存在异常时有值,只保留 class + message,不带堆栈) */
|
||||
private String causeSummary;
|
||||
|
||||
/** 事件时间 */
|
||||
private LocalDateTime attemptedAt;
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.viewsh.module.ops.core.lifecycle;
|
||||
|
||||
import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent;
|
||||
import com.viewsh.module.ops.core.lifecycle.handler.EventPublishHandler;
|
||||
import com.viewsh.module.ops.core.lifecycle.handler.QueueSyncHandler;
|
||||
import com.viewsh.module.ops.core.lifecycle.handler.StateTransitionHandler;
|
||||
@@ -7,6 +8,10 @@ import com.viewsh.module.ops.core.lifecycle.handler.TransitionHandler;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.TransitionContext;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
@@ -62,6 +67,9 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
|
||||
@Resource
|
||||
private EventLogRecorder eventLogRecorder;
|
||||
|
||||
@Resource
|
||||
private ApplicationEventPublisher applicationEventPublisher;
|
||||
|
||||
/**
|
||||
* 责任链处理器
|
||||
*/
|
||||
@@ -101,10 +109,15 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
|
||||
// 4. 检查结果
|
||||
if (context.hasError()) {
|
||||
log.error("状态转换失败: orderId={}, error={}", order.getId(), context.getErrorMessage());
|
||||
publishAttempt(order, oldStatus, request, false,
|
||||
TransitionErrorCode.INVALID_TRANSITION,
|
||||
context.getErrorMessage(),
|
||||
summarizeThrowable(context.getCause()));
|
||||
return OrderTransitionResult.fail(order.getId(), context.getErrorMessage());
|
||||
}
|
||||
|
||||
log.info("状态转换成功: orderId={}, {} -> {}", order.getId(), oldStatus, request.getTargetStatus());
|
||||
publishAttempt(order, oldStatus, request, true, null, null, null);
|
||||
return OrderTransitionResult.builder()
|
||||
.success(true)
|
||||
.orderId(order.getId())
|
||||
@@ -142,6 +155,35 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
|
||||
// 设置目标状态
|
||||
request.setTargetStatus(WorkOrderStatusEnum.DISPATCHED);
|
||||
|
||||
// 业务不变量:同一执行人在任一时刻最多只能有 1 条活跃工单
|
||||
// (DISPATCHED/CONFIRMED/ARRIVED)。PAUSED 不纳入——P0 打断恢复走的就是
|
||||
// PAUSED→DISPATCHED,此处放行。对命中行加 FOR UPDATE,配合 @Transactional
|
||||
// 串行化并发派发;命中则本次派发被拒,由调用方决定降级策略
|
||||
// (DispatchEngineImpl.executeDirectDispatch 会降级为入队)。
|
||||
if (request.getAssigneeId() != null) {
|
||||
java.util.List<OpsOrderDO> activeOrders = opsOrderMapper.selectActiveByAssigneeForUpdate(
|
||||
request.getAssigneeId(), request.getOrderId());
|
||||
if (!activeOrders.isEmpty()) {
|
||||
OpsOrderDO head = activeOrders.get(0);
|
||||
String msg = "执行人已有活跃工单: orderId=" + head.getId() + ", status=" + head.getStatus();
|
||||
log.warn("派发被拒:执行人已有活跃工单: assigneeId={}, requestOrderId={}, activeCount={}, sampleOrderId={}, sampleStatus={}",
|
||||
request.getAssigneeId(), request.getOrderId(),
|
||||
activeOrders.size(), head.getId(), head.getStatus());
|
||||
|
||||
// 审计:记录"派发被拒"尝试,AFTER_COMMIT 监听器会写 bus_log
|
||||
OpsOrderDO subject = opsOrderMapper.selectById(request.getOrderId());
|
||||
WorkOrderStatusEnum fromStatus = subject != null
|
||||
? WorkOrderStatusEnum.valueOf(subject.getStatus()) : null;
|
||||
publishAttempt(subject != null ? subject : head, fromStatus, request, false,
|
||||
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER, msg, null);
|
||||
|
||||
return OrderTransitionResult.fail(
|
||||
request.getOrderId(),
|
||||
msg,
|
||||
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER);
|
||||
}
|
||||
}
|
||||
|
||||
// 派单时更新工单的 assigneeId(从 PENDING -> DISPATCHED)
|
||||
if (request.getAssigneeId() != null) {
|
||||
OpsOrderDO order = opsOrderMapper.selectById(request.getOrderId());
|
||||
@@ -188,17 +230,22 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
|
||||
public void resumeOrder(Long orderId, Long operatorId) {
|
||||
log.info("开始恢复工单: orderId={}, operatorId={}", orderId, operatorId);
|
||||
|
||||
// 构建请求
|
||||
// 取出工单自身的 assigneeId 透传给 dispatch,使其 FOR UPDATE 不变量检查生效——
|
||||
// 否则 P0 恢复与并发派发竞争时可能再出现"同一 assignee 两条 DISPATCHED"。
|
||||
// assigneeId == null 的异常态(工单已卸人)下 dispatch 会跳过该检查,行为退化为原 transition。
|
||||
OpsOrderDO order = opsOrderMapper.selectById(orderId);
|
||||
Long assigneeId = order != null ? order.getAssigneeId() : null;
|
||||
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(orderId)
|
||||
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
|
||||
.assigneeId(assigneeId)
|
||||
.operatorType(OperatorTypeEnum.CLEANER)
|
||||
.operatorId(operatorId)
|
||||
.reason("恢复工单")
|
||||
.build();
|
||||
|
||||
// 执行状态转换
|
||||
OrderTransitionResult result = transition(request);
|
||||
OrderTransitionResult result = dispatch(request);
|
||||
|
||||
if (!result.isSuccess()) {
|
||||
throw new IllegalStateException("恢复工单失败: " + result.getMessage());
|
||||
@@ -409,4 +456,49 @@ public class OrderLifecycleManagerImpl implements OrderLifecycleManager {
|
||||
|| WorkOrderStatusEnum.ARRIVED == status;
|
||||
}
|
||||
|
||||
/**
|
||||
* 发布状态转换尝试事件,覆盖成功、普通失败、并发冲突三种情况。
|
||||
* 订阅方 {@code OrderTransitionAuditListener} 在 AFTER_COMMIT/AFTER_ROLLBACK
|
||||
* 阶段落 bus_log,保证事务回滚不断链。
|
||||
*/
|
||||
private void publishAttempt(OpsOrderDO order, WorkOrderStatusEnum fromStatus,
|
||||
OrderTransitionRequest request, boolean success,
|
||||
TransitionErrorCode errorCode, String errorMessage,
|
||||
String causeSummary) {
|
||||
try {
|
||||
OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder()
|
||||
.orderId(order != null ? order.getId() : request.getOrderId())
|
||||
.orderType(order != null ? order.getOrderType() : null)
|
||||
.orderCode(order != null ? order.getOrderCode() : null)
|
||||
.fromStatus(fromStatus)
|
||||
.targetStatus(request.getTargetStatus())
|
||||
.assigneeId(request.getAssigneeId())
|
||||
.operatorType(request.getOperatorType())
|
||||
.operatorId(request.getOperatorId())
|
||||
.reason(request.getReason())
|
||||
.success(success)
|
||||
.errorCode(errorCode)
|
||||
.errorMessage(errorMessage)
|
||||
.causeSummary(causeSummary)
|
||||
.attemptedAt(LocalDateTime.now())
|
||||
.build();
|
||||
applicationEventPublisher.publishEvent(event);
|
||||
} catch (Exception e) {
|
||||
// 审计事件发布失败不应影响主流程
|
||||
log.error("发布转换尝试事件失败: orderId={}, targetStatus={}",
|
||||
request.getOrderId(), request.getTargetStatus(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 摘要异常:只保留类名 + message,不带堆栈,防止 bus_log 爆炸。
|
||||
*/
|
||||
private String summarizeThrowable(Throwable t) {
|
||||
if (t == null) {
|
||||
return null;
|
||||
}
|
||||
String msg = t.getMessage();
|
||||
return t.getClass().getSimpleName() + (msg != null ? ": " + msg : "");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,181 @@
|
||||
package com.viewsh.module.ops.core.lifecycle.audit;
|
||||
|
||||
import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.event.TransactionPhase;
|
||||
import org.springframework.transaction.event.TransactionalEventListener;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 工单状态转换尝试审计监听器。
|
||||
* <p>
|
||||
* 闭环设计:
|
||||
* <ul>
|
||||
* <li><b>AFTER_COMMIT</b>:主事务成功提交,按事件本身的 success 标志写 bus_log。</li>
|
||||
* <li><b>AFTER_ROLLBACK</b>:主事务已回滚——事件里的数据(ops_order_event 等)全部消失。
|
||||
* 此时必须新开一个独立事务写 bus_log,否则审计链断裂。</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* 字段归位:
|
||||
* <ul>
|
||||
* <li>{@code eventLevel}:成功=INFO;失败=WARN(冲突被拒)或 ERROR(状态机异常)</li>
|
||||
* <li>{@code eventDomain}:统一用 DISPATCH(派发域),便于运维按域聚合</li>
|
||||
* <li>{@code eventType}:成功→业务 LogType(如 ORDER_DISPATCHED);失败→TRANSITION_FAILED
|
||||
* 或 DISPATCH_REJECTED</li>
|
||||
* <li>{@code eventPayload}:errorCode / fromStatus / targetStatus / operatorType / reason / cause</li>
|
||||
* </ul>
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class OrderTransitionAuditListener {
|
||||
|
||||
@Resource
|
||||
private EventLogRecorder eventLogRecorder;
|
||||
|
||||
/**
|
||||
* 主事务已提交:仅在转换异常路径下写 bus_log。
|
||||
* <p>
|
||||
* 取舍:成功路径不再写"状态转换成功"镜像记录——业务详情已由各条线 EventListener
|
||||
* (如 CleanOrderEventListener 的 ORDER_DISPATCHED 等)和 ops_order_event 表覆盖,
|
||||
* 此处镜像在 bus_log 形成噪声且与业务日志重复。仅保留派发被拒(DISPATCH_REJECTED)
|
||||
* 等运维需追溯的异常类型,便于审计真正的"为什么失败"。
|
||||
* <p>
|
||||
* fallbackExecution=true:在无事务上下文时也执行(如测试、跨线程补写场景)。
|
||||
*/
|
||||
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
|
||||
public void onAfterCommit(OrderTransitionAttemptedEvent event) {
|
||||
if (event.isSuccess()) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/false));
|
||||
} catch (Exception e) {
|
||||
log.error("[TransitionAudit] AFTER_COMMIT 写 bus_log 失败: orderId={}, success={}, errorCode={}",
|
||||
event.getOrderId(), event.isSuccess(), event.getErrorCode(), e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 主事务已回滚:无论事件里声称 success 与否,这次"尝试"都**实际未落库**。
|
||||
* 必须开独立事务写 bus_log,否则日志也会因同事务回滚而丢失。
|
||||
*/
|
||||
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
|
||||
public void onAfterRollback(OrderTransitionAttemptedEvent event) {
|
||||
writeRollbackAudit(event);
|
||||
}
|
||||
|
||||
/**
|
||||
* 写入"事务已回滚"的审计记录。
|
||||
* <p>
|
||||
* 不加 @Transactional:AFTER_ROLLBACK 阶段主事务已彻底结束,当前线程无活跃事务;
|
||||
* 且本方法由 onAfterRollback 自调用,Spring 代理不会拦截,加注解也是死注解。
|
||||
* 实际行为:eventLogRecorder.recordSync 的 insert 在 auto-commit 模式下单条提交,
|
||||
* 失败只丢这一行审计、不影响主业务(主业务早已回滚并报错给调用方)。
|
||||
*/
|
||||
public void writeRollbackAudit(OrderTransitionAttemptedEvent event) {
|
||||
try {
|
||||
eventLogRecorder.recordSync(toRecord(event, /*rolledBack=*/true));
|
||||
} catch (Exception e) {
|
||||
log.error("[TransitionAudit] AFTER_ROLLBACK 写 bus_log 失败: orderId={}, targetStatus={}",
|
||||
event.getOrderId(), event.getTargetStatus(), e);
|
||||
}
|
||||
}
|
||||
|
||||
// ==================== 私有映射方法 ====================
|
||||
|
||||
private EventLogRecord toRecord(OrderTransitionAttemptedEvent event, boolean rolledBack) {
|
||||
// rolledBack=true 时强制视为失败:即便发布时声明 success=true,
|
||||
// 事务 rollback 说明写入未真正生效。
|
||||
boolean success = event.isSuccess() && !rolledBack;
|
||||
|
||||
EventLevel level = success ? EventLevel.INFO
|
||||
: (event.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER
|
||||
? EventLevel.WARN : EventLevel.ERROR);
|
||||
|
||||
String eventTypeCode = resolveEventTypeCode(event, success, rolledBack);
|
||||
|
||||
Map<String, Object> payload = new HashMap<>();
|
||||
payload.put("fromStatus", event.getFromStatus() != null ? event.getFromStatus().getStatus() : null);
|
||||
payload.put("targetStatus", event.getTargetStatus() != null ? event.getTargetStatus().getStatus() : null);
|
||||
payload.put("operatorType", event.getOperatorType() != null ? event.getOperatorType().getType() : null);
|
||||
payload.put("reason", event.getReason());
|
||||
payload.put("success", success);
|
||||
payload.put("rolledBack", rolledBack);
|
||||
if (event.getErrorCode() != null) {
|
||||
payload.put("errorCode", event.getErrorCode().name());
|
||||
}
|
||||
if (event.getErrorMessage() != null) {
|
||||
payload.put("errorMessage", event.getErrorMessage());
|
||||
}
|
||||
if (event.getCauseSummary() != null) {
|
||||
payload.put("cause", event.getCauseSummary());
|
||||
}
|
||||
if (event.getOrderCode() != null) {
|
||||
payload.put("orderCode", event.getOrderCode());
|
||||
}
|
||||
|
||||
String message = buildMessage(event, success, rolledBack);
|
||||
|
||||
return EventLogRecord.builder()
|
||||
.module(LogModule.fromOrderType(event.getOrderType()))
|
||||
.domain(EventDomain.DISPATCH)
|
||||
.eventType(eventTypeCode)
|
||||
.level(level)
|
||||
.message(message)
|
||||
.targetId(event.getOrderId())
|
||||
.targetType("order")
|
||||
.deviceId(event.getAssigneeId())
|
||||
.personId(event.getOperatorId())
|
||||
.payload(payload)
|
||||
.eventTime(event.getAttemptedAt())
|
||||
.build();
|
||||
}
|
||||
|
||||
private String resolveEventTypeCode(OrderTransitionAttemptedEvent event, boolean success, boolean rolledBack) {
|
||||
if (!success && event.getErrorCode() == TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER) {
|
||||
return LogType.DISPATCH_REJECTED.getCode();
|
||||
}
|
||||
if (!success) {
|
||||
return LogType.TRANSITION_FAILED.getCode();
|
||||
}
|
||||
// 成功场景:按目标状态映射到业务 LogType;ops_order_event 已有时间轴,
|
||||
// 这里 bus_log 仅作宽表镜像,便于运维按 domain/module 聚合查询。
|
||||
if (event.getTargetStatus() == null) {
|
||||
return LogType.SYSTEM_EVENT.getCode();
|
||||
}
|
||||
return switch (event.getTargetStatus()) {
|
||||
case QUEUED -> LogType.ORDER_QUEUED.getCode();
|
||||
case DISPATCHED -> LogType.ORDER_DISPATCHED.getCode();
|
||||
case CONFIRMED -> LogType.ORDER_CONFIRM.getCode();
|
||||
case ARRIVED -> LogType.ORDER_ARRIVED.getCode();
|
||||
case PAUSED -> LogType.ORDER_PAUSED.getCode();
|
||||
case COMPLETED -> LogType.ORDER_COMPLETED.getCode();
|
||||
case CANCELLED -> LogType.ORDER_CANCELLED.getCode();
|
||||
default -> LogType.SYSTEM_EVENT.getCode();
|
||||
};
|
||||
}
|
||||
|
||||
private String buildMessage(OrderTransitionAttemptedEvent event, boolean success, boolean rolledBack) {
|
||||
String from = event.getFromStatus() != null ? event.getFromStatus().getStatus() : "?";
|
||||
String to = event.getTargetStatus() != null ? event.getTargetStatus().getStatus() : "?";
|
||||
if (success) {
|
||||
return String.format("状态转换成功: %s -> %s", from, to);
|
||||
}
|
||||
String prefix = rolledBack ? "状态转换回滚" : "状态转换失败";
|
||||
String detail = event.getErrorMessage() != null ? event.getErrorMessage() : "";
|
||||
return String.format("%s: %s -> %s %s", prefix, from, to, detail).trim();
|
||||
}
|
||||
}
|
||||
@@ -47,6 +47,14 @@ public class OrderTransitionResult {
|
||||
*/
|
||||
private Long queueId;
|
||||
|
||||
/**
|
||||
* 失败错误码(仅 success=false 时有值)
|
||||
* <p>
|
||||
* 调用方可据此区分需降级的失败(如 ASSIGNEE_HAS_ACTIVE_ORDER)与硬失败,
|
||||
* 未显式设置时默认为 {@link TransitionErrorCode#OTHER}。
|
||||
*/
|
||||
private TransitionErrorCode errorCode;
|
||||
|
||||
/**
|
||||
* 成功结果
|
||||
*/
|
||||
@@ -81,6 +89,7 @@ public class OrderTransitionResult {
|
||||
return OrderTransitionResult.builder()
|
||||
.success(false)
|
||||
.message(message)
|
||||
.errorCode(TransitionErrorCode.OTHER)
|
||||
.build();
|
||||
}
|
||||
|
||||
@@ -92,6 +101,19 @@ public class OrderTransitionResult {
|
||||
.success(false)
|
||||
.orderId(orderId)
|
||||
.message(message)
|
||||
.errorCode(TransitionErrorCode.OTHER)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 失败结果(带工单ID 和错误码)
|
||||
*/
|
||||
public static OrderTransitionResult fail(Long orderId, String message, TransitionErrorCode errorCode) {
|
||||
return OrderTransitionResult.builder()
|
||||
.success(false)
|
||||
.orderId(orderId)
|
||||
.message(message)
|
||||
.errorCode(errorCode)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.viewsh.module.ops.core.lifecycle.model;
|
||||
|
||||
/**
|
||||
* 状态转换失败的错误码
|
||||
* <p>
|
||||
* 用于调用方区分可恢复/需降级的失败场景(如并发冲突)与真正的硬失败(状态机非法转换等),
|
||||
* 避免把"可降级"的结果误当成硬错误直接向用户暴露。
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
public enum TransitionErrorCode {
|
||||
|
||||
/**
|
||||
* 执行人已有活跃工单(DISPATCHED/CONFIRMED/ARRIVED),不应再派发。
|
||||
* <p>
|
||||
* 发生在 OrderLifecycleManager.dispatch 入口的 FOR UPDATE 兜底检查命中时。
|
||||
* 调用方应将工单降级到 QUEUED(入队等待下一轮动态派发),避免 PENDING 状态悬空。
|
||||
*/
|
||||
ASSIGNEE_HAS_ACTIVE_ORDER,
|
||||
|
||||
/**
|
||||
* 状态机不允许此转换(非法的状态流转)
|
||||
*/
|
||||
INVALID_TRANSITION,
|
||||
|
||||
/**
|
||||
* 工单不存在
|
||||
*/
|
||||
ORDER_NOT_FOUND,
|
||||
|
||||
/**
|
||||
* 其他失败(无特定归类)
|
||||
*/
|
||||
OTHER;
|
||||
}
|
||||
@@ -72,7 +72,19 @@ public class ManualOrderActionFacade {
|
||||
boolean idle = strategy.isAssigneeIdle(cmd, order);
|
||||
WorkOrderStatusEnum targetStatus = idle ? WorkOrderStatusEnum.DISPATCHED : WorkOrderStatusEnum.QUEUED;
|
||||
|
||||
// 4. 状态变更
|
||||
// 4. 提前写入执行人字段
|
||||
// 注:必须在 transition() 之前完成。transition() 在事务内同步发布 OrderStateChangedEvent,
|
||||
// BadgeDeviceStatusEventListener 会再次 selectById 拿 assigneeId 决定是否写 Redis 工单关联;
|
||||
// 若此处后置则事件触发时 assigneeId 仍为 null,工牌按键查询 (assigneeDeviceId) 永远查不到工单。
|
||||
// 同时写 assigneeDeviceId,与 OrderLifecycleManagerImpl.dispatch() 自动派单路径对齐。
|
||||
OpsOrderDO assigneeUpdate = new OpsOrderDO();
|
||||
assigneeUpdate.setId(cmd.getOrderId());
|
||||
assigneeUpdate.setAssigneeId(cmd.getAssigneeId());
|
||||
assigneeUpdate.setAssigneeName(cmd.getAssigneeName());
|
||||
assigneeUpdate.setAssigneeDeviceId(cmd.getAssigneeId());
|
||||
opsOrderMapper.updateById(assigneeUpdate);
|
||||
|
||||
// 5. 状态变更
|
||||
OrderTransitionRequest request = OrderTransitionRequest.builder()
|
||||
.orderId(cmd.getOrderId())
|
||||
.targetStatus(targetStatus)
|
||||
@@ -89,13 +101,6 @@ public class ManualOrderActionFacade {
|
||||
throw new IllegalStateException("手动派单失败: " + result.getMessage());
|
||||
}
|
||||
|
||||
// 5. 更新主表执行人(只更新 assignee 字段,避免覆盖状态机已写入的 status)
|
||||
OpsOrderDO assigneeUpdate = new OpsOrderDO();
|
||||
assigneeUpdate.setId(cmd.getOrderId());
|
||||
assigneeUpdate.setAssigneeId(cmd.getAssigneeId());
|
||||
assigneeUpdate.setAssigneeName(cmd.getAssigneeName());
|
||||
opsOrderMapper.updateById(assigneeUpdate);
|
||||
|
||||
// 6. 条线后置
|
||||
// 注:业务日志由生命周期事件 → 条线 EventListener 统一记录,此处不重复写
|
||||
strategy.afterDispatch(cmd, order);
|
||||
|
||||
@@ -54,7 +54,7 @@ public interface OpsOrderQueueMapper extends BaseMapperX<OpsOrderQueueDO> {
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据用户ID查询队列列表
|
||||
* 根据用户ID查询队列列表(含历史 REMOVED 记录,通常用于审计/统计)
|
||||
*/
|
||||
default List<OpsOrderQueueDO> selectListByUserId(Long userId) {
|
||||
return selectList(new LambdaQueryWrapperX<OpsOrderQueueDO>()
|
||||
@@ -62,6 +62,19 @@ public interface OpsOrderQueueMapper extends BaseMapperX<OpsOrderQueueDO> {
|
||||
.orderByDesc(OpsOrderQueueDO::getEnqueueTime));
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据用户ID查询活跃队列列表(仅 WAITING/PROCESSING/PAUSED,排除 REMOVED/已终态)
|
||||
* <p>
|
||||
* 同步到 Redis、计算队列长度、查询当前任务等场景应走此方法,避免
|
||||
* 将历史 REMOVED 记录同步到 Redis 造成 ZSet / Hash 膨胀。
|
||||
*/
|
||||
default List<OpsOrderQueueDO> selectActiveListByUserId(Long userId) {
|
||||
return selectList(new LambdaQueryWrapperX<OpsOrderQueueDO>()
|
||||
.eq(OpsOrderQueueDO::getUserId, userId)
|
||||
.in(OpsOrderQueueDO::getQueueStatus, "WAITING", "PROCESSING", "PAUSED")
|
||||
.orderByDesc(OpsOrderQueueDO::getEnqueueTime));
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据用户ID和状态查询队列列表
|
||||
* 用于强制从 MySQL 读取最新数据
|
||||
|
||||
@@ -92,6 +92,80 @@ public interface OpsOrderMapper extends BaseMapperX<OpsOrderDO> {
|
||||
.last("LIMIT 1"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询执行人名下尚未结束的工单(DISPATCHED/CONFIRMED/ARRIVED/PAUSED)
|
||||
* <p>
|
||||
* 用于 autoDispatchNext 等调度入口的空闲校验:若该执行人仍挂着活跃工单,
|
||||
* 则不应再派发新任务,避免"越清越多"的级联派发。
|
||||
*
|
||||
* @param assigneeId 执行人ID(工牌设备ID)
|
||||
* @param excludeOrderId 需要排除的工单ID(通常是刚完成/取消触发本次调度的工单),可传 null
|
||||
* @return 活跃工单列表,按创建时间升序
|
||||
*/
|
||||
default List<OpsOrderDO> selectActiveByAssignee(Long assigneeId, Long excludeOrderId) {
|
||||
return selectList(new LambdaQueryWrapperX<OpsOrderDO>()
|
||||
.eq(OpsOrderDO::getAssigneeId, assigneeId)
|
||||
.in(OpsOrderDO::getStatus,
|
||||
WorkOrderStatusEnum.DISPATCHED.getStatus(),
|
||||
WorkOrderStatusEnum.CONFIRMED.getStatus(),
|
||||
WorkOrderStatusEnum.ARRIVED.getStatus(),
|
||||
WorkOrderStatusEnum.PAUSED.getStatus())
|
||||
.ne(excludeOrderId != null, OpsOrderDO::getId, excludeOrderId)
|
||||
.orderByAsc(OpsOrderDO::getCreateTime));
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询执行人名下"正在执行"的工单,并对命中行加行锁(SELECT ... FOR UPDATE)
|
||||
* <p>
|
||||
* 与 {@link #selectActiveByAssignee} 的区别:
|
||||
* <ul>
|
||||
* <li><b>不含 PAUSED</b>——PAUSED 代表 P0 打断后挂起的旧任务,不占用"当前时间片",
|
||||
* 派发时(如 P0 结束后恢复)不应被它阻塞</li>
|
||||
* <li>结果行加 FOR UPDATE 排他锁,用于 dispatch 入口做业务不变量校验:
|
||||
* "同一执行人在任一时刻最多只能有 1 条活跃工单"。</li>
|
||||
* </ul>
|
||||
* 必须在事务中调用,否则锁无意义。
|
||||
*
|
||||
* @param assigneeId 执行人ID
|
||||
* @param excludeOrderId 排除的工单ID(通常是本次正在派发的工单本身)
|
||||
* @return 命中的活跃工单列表(通常空列表表示可派发)
|
||||
*/
|
||||
default List<OpsOrderDO> selectActiveByAssigneeForUpdate(Long assigneeId, Long excludeOrderId) {
|
||||
return selectList(new LambdaQueryWrapperX<OpsOrderDO>()
|
||||
.eq(OpsOrderDO::getAssigneeId, assigneeId)
|
||||
.in(OpsOrderDO::getStatus,
|
||||
WorkOrderStatusEnum.DISPATCHED.getStatus(),
|
||||
WorkOrderStatusEnum.CONFIRMED.getStatus(),
|
||||
WorkOrderStatusEnum.ARRIVED.getStatus())
|
||||
.ne(excludeOrderId != null, OpsOrderDO::getId, excludeOrderId)
|
||||
.orderByAsc(OpsOrderDO::getCreateTime)
|
||||
.last("FOR UPDATE"));
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询执行人最近一条已完成工单的区域(用于楼层基准兜底)
|
||||
* <p>
|
||||
* 用途:{@code OrderQueueServiceEnhanced.resolveBaselineAreaId} 的二级兜底。
|
||||
* 当执行人当前没有 PROCESSING 工单时(短暂空闲),用最近完成的那一单的
|
||||
* 区域作为"物理位置推断",保证楼层差评分在空闲期仍然生效。
|
||||
* <p>
|
||||
* 时间窗:通过 {@code since} 过滤,超过窗口仍空闲则认为轨迹失效,
|
||||
* 返回 null 让调用方降级到更外层的兜底(fallbackAreaId 或无楼层模式)。
|
||||
*
|
||||
* @param assigneeId 执行人ID
|
||||
* @param since 只考虑 updateTime 晚于此时间的工单(如 now - 24h)
|
||||
* @return 最近一条 COMPLETED 工单的 areaId;无匹配返回 null
|
||||
*/
|
||||
default Long selectLatestCompletedAreaIdByAssignee(Long assigneeId, LocalDateTime since) {
|
||||
OpsOrderDO order = selectOne(new LambdaQueryWrapperX<OpsOrderDO>()
|
||||
.eq(OpsOrderDO::getAssigneeId, assigneeId)
|
||||
.eq(OpsOrderDO::getStatus, WorkOrderStatusEnum.COMPLETED.getStatus())
|
||||
.ge(since != null, OpsOrderDO::getUpdateTime, since)
|
||||
.orderByDesc(OpsOrderDO::getUpdateTime)
|
||||
.last("LIMIT 1"));
|
||||
return order != null ? order.getAreaId() : null;
|
||||
}
|
||||
|
||||
// ==================== 统计聚合查询 ====================
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,93 +1,160 @@
|
||||
package com.viewsh.module.ops.infrastructure.code;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 工单编号生成器
|
||||
* <p>
|
||||
* 格式:{业务前缀}-{日期}-{序号}
|
||||
* 例如:CLEAN-20250119-0001, SECURITY-20250119-0001
|
||||
* <p>
|
||||
* 特性:
|
||||
* - 使用 Redis 保证序号唯一性
|
||||
* - 序号每日自动重置(按日期分 key)
|
||||
* - 不同业务类型独立计数
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class OrderCodeGenerator {
|
||||
|
||||
private static final String KEY_PREFIX = "ops:order:code:";
|
||||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
private static final long DEFAULT_EXPIRE_DAYS = 7; // key 默认保留7天
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
/**
|
||||
* 生成工单编号
|
||||
*
|
||||
* @param businessType 业务类型(如:CLEAN、SECURITY、FACILITIES)
|
||||
* @return 工单编号,格式:{业务类型}-{日期}-{4位序号}
|
||||
*/
|
||||
public String generate(String businessType) {
|
||||
if (businessType == null || businessType.isEmpty()) {
|
||||
throw new IllegalArgumentException("Business type cannot be null or empty");
|
||||
}
|
||||
|
||||
String dateStr = LocalDate.now().format(DATE_FORMATTER);
|
||||
String key = KEY_PREFIX + businessType + ":" + dateStr;
|
||||
|
||||
// Redis 自增并获取新值
|
||||
Long seq = stringRedisTemplate.opsForValue().increment(key);
|
||||
|
||||
// 首次创建时设置过期时间
|
||||
if (seq != null && seq == 1) {
|
||||
stringRedisTemplate.expire(key, DEFAULT_EXPIRE_DAYS, TimeUnit.DAYS);
|
||||
}
|
||||
|
||||
if (seq == null) {
|
||||
throw new RuntimeException("Failed to generate order code for business type: " + businessType);
|
||||
}
|
||||
|
||||
String orderCode = String.format("%s-%s-%04d", businessType, dateStr, seq);
|
||||
|
||||
log.debug("生成工单编号: businessType={}, orderCode={}", businessType, orderCode);
|
||||
|
||||
return orderCode;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定业务类型当天的当前序号
|
||||
*
|
||||
* @param businessType 业务类型
|
||||
* @return 当前序号,如果不存在返回0
|
||||
*/
|
||||
public long getCurrentSeq(String businessType) {
|
||||
String dateStr = LocalDate.now().format(DATE_FORMATTER);
|
||||
String key = KEY_PREFIX + businessType + ":" + dateStr;
|
||||
String value = stringRedisTemplate.opsForValue().get(key);
|
||||
return value == null ? 0 : Long.parseLong(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* 重置指定业务类型当天的序号(仅供测试或特殊场景使用)
|
||||
*
|
||||
* @param businessType 业务类型
|
||||
*/
|
||||
public void reset(String businessType) {
|
||||
String dateStr = LocalDate.now().format(DATE_FORMATTER);
|
||||
String key = KEY_PREFIX + businessType + ":" + dateStr;
|
||||
stringRedisTemplate.delete(key);
|
||||
log.warn("重置工单编号序号: businessType={}, date={}", businessType, dateStr);
|
||||
}
|
||||
}
|
||||
package com.viewsh.module.ops.infrastructure.code;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 工单编号生成器
|
||||
* <p>
|
||||
* 格式:{业务前缀}-{日期}-{序号}
|
||||
* 例如:CLEAN-20250119-0001, SECURITY-20250119-0001
|
||||
* <p>
|
||||
* 特性:
|
||||
* - 使用 Redis 保证序号唯一性
|
||||
* - 序号每日自动重置(按日期分 key)
|
||||
* - 不同业务类型独立计数
|
||||
* - 应用启动后首次使用时自动从数据库校准,之后纯 Redis 自增
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class OrderCodeGenerator {
|
||||
|
||||
private static final String KEY_PREFIX = "ops:order:code:";
|
||||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
private static final long DEFAULT_EXPIRE_DAYS = 7;
|
||||
|
||||
/**
|
||||
* 记录本次应用生命周期内已校准过的 key,避免重复查库。
|
||||
* <p>
|
||||
* ConcurrentHashMap: key = "CLEAN:20260413", value = dateStr(用于跨天清理)
|
||||
*/
|
||||
private final ConcurrentHashMap<String, String> calibratedKeys = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* Lua 脚本:原子性地确保 Redis 值 >= 数据库最大序号,然后自增
|
||||
*/
|
||||
private static final String INCR_WITH_FLOOR_SCRIPT =
|
||||
"local floor = tonumber(ARGV[1]) " +
|
||||
"local ttl = tonumber(ARGV[2]) " +
|
||||
"local current = tonumber(redis.call('GET', KEYS[1]) or '0') " +
|
||||
"if current < floor then " +
|
||||
" redis.call('SET', KEYS[1], tostring(floor)) " +
|
||||
"end " +
|
||||
"local seq = redis.call('INCR', KEYS[1]) " +
|
||||
"redis.call('EXPIRE', KEYS[1], ttl) " +
|
||||
"return seq";
|
||||
|
||||
private static final DefaultRedisScript<Long> REDIS_SCRIPT = new DefaultRedisScript<>(INCR_WITH_FLOOR_SCRIPT, Long.class);
|
||||
|
||||
@Resource
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Resource
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
/**
|
||||
* 生成工单编号
|
||||
*
|
||||
* @param businessType 业务类型(如:CLEAN、SECURITY、FACILITIES)
|
||||
* @return 工单编号,格式:{业务类型}-{日期}-{4位序号}
|
||||
*/
|
||||
public String generate(String businessType) {
|
||||
if (businessType == null || businessType.isEmpty()) {
|
||||
throw new IllegalArgumentException("Business type cannot be null or empty");
|
||||
}
|
||||
|
||||
String dateStr = LocalDate.now().format(DATE_FORMATTER);
|
||||
String key = KEY_PREFIX + businessType + ":" + dateStr;
|
||||
String calibrationKey = businessType + ":" + dateStr;
|
||||
|
||||
Long seq;
|
||||
if (!dateStr.equals(calibratedKeys.get(calibrationKey))) {
|
||||
// 首次调用或跨天:查库校准 + 自增(Lua 原子操作)
|
||||
long dbMaxSeq = getMaxSeqFromDatabase(businessType, dateStr);
|
||||
long ttlSeconds = TimeUnit.DAYS.toSeconds(DEFAULT_EXPIRE_DAYS);
|
||||
seq = stringRedisTemplate.execute(REDIS_SCRIPT,
|
||||
Collections.singletonList(key),
|
||||
String.valueOf(dbMaxSeq),
|
||||
String.valueOf(ttlSeconds));
|
||||
// 校准成功后记录,同时清理旧日期的条目
|
||||
evictStaleEntries(dateStr);
|
||||
calibratedKeys.put(calibrationKey, dateStr);
|
||||
log.info("工单序号校准完成: key={}, dbMaxSeq={}, newSeq={}", key, dbMaxSeq, seq);
|
||||
} else {
|
||||
// 后续调用:纯 Redis 自增,无数据库开销
|
||||
seq = stringRedisTemplate.opsForValue().increment(key);
|
||||
}
|
||||
|
||||
if (seq == null) {
|
||||
throw new RuntimeException("Failed to generate order code for business type: " + businessType);
|
||||
}
|
||||
|
||||
String orderCode = String.format("%s-%s-%04d", businessType, dateStr, seq);
|
||||
log.debug("生成工单编号: businessType={}, orderCode={}", businessType, orderCode);
|
||||
return orderCode;
|
||||
}
|
||||
|
||||
/**
|
||||
* 从数据库查询当天该业务类型的最大序号
|
||||
*
|
||||
* @throws RuntimeException 首次校准失败时向上抛出,避免静默产生重复编号
|
||||
*/
|
||||
long getMaxSeqFromDatabase(String businessType, String dateStr) {
|
||||
// 转义 LIKE 通配符,防止 businessType 包含 % 或 _
|
||||
String safeType = businessType.replace("%", "\\%").replace("_", "\\_");
|
||||
String prefix = safeType + "-" + dateStr + "-";
|
||||
Integer maxSeq = jdbcTemplate.queryForObject(
|
||||
"SELECT MAX(CAST(SUBSTRING(order_code, ?) AS UNSIGNED)) FROM ops_order " +
|
||||
"WHERE order_code LIKE ? AND deleted = b'0'",
|
||||
Integer.class,
|
||||
prefix.length() + 1,
|
||||
prefix + "%"
|
||||
);
|
||||
return maxSeq != null ? maxSeq : 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* 清理非当天的校准记录,防止长期运行内存泄漏
|
||||
*/
|
||||
private void evictStaleEntries(String currentDateStr) {
|
||||
calibratedKeys.entrySet().removeIf(entry -> !currentDateStr.equals(entry.getValue()));
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取指定业务类型当天的当前序号
|
||||
*
|
||||
* @param businessType 业务类型
|
||||
* @return 当前序号,如果不存在返回0
|
||||
*/
|
||||
public long getCurrentSeq(String businessType) {
|
||||
String dateStr = LocalDate.now().format(DATE_FORMATTER);
|
||||
String key = KEY_PREFIX + businessType + ":" + dateStr;
|
||||
String value = stringRedisTemplate.opsForValue().get(key);
|
||||
return value == null ? 0 : Long.parseLong(value);
|
||||
}
|
||||
|
||||
/**
|
||||
* 重置指定业务类型当天的序号(仅供测试或特殊场景使用)
|
||||
*
|
||||
* @param businessType 业务类型
|
||||
*/
|
||||
public void reset(String businessType) {
|
||||
String dateStr = LocalDate.now().format(DATE_FORMATTER);
|
||||
String key = KEY_PREFIX + businessType + ":" + dateStr;
|
||||
stringRedisTemplate.delete(key);
|
||||
calibratedKeys.remove(businessType + ":" + dateStr);
|
||||
log.warn("重置工单编号序号: businessType={}, date={}", businessType, dateStr);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -47,7 +47,14 @@ public enum LogType {
|
||||
COMPLETE_SUPPRESSED_INVALID("COMPLETE_SUPPRESSED_INVALID", "作业时长不足抑制"),
|
||||
BEACON_COMPLETE_REQUESTED("BEACON_COMPLETE_REQUESTED", "信号丢失自动完成请求"),
|
||||
TTS_REQUEST("TTS_REQUEST", "语音播报请求"),
|
||||
ARRIVE_REJECTED("ARRIVE_REJECTED", "到岗请求被拒绝");
|
||||
ARRIVE_REJECTED("ARRIVE_REJECTED", "到岗请求被拒绝"),
|
||||
|
||||
// ========== 状态机转换闭环审计 ==========
|
||||
|
||||
/** 状态转换尝试失败(状态机异常、handler 抛错等) */
|
||||
TRANSITION_FAILED("TRANSITION_FAILED", "状态转换失败"),
|
||||
/** 派发被 FOR UPDATE 拒绝(同 assignee 已有活跃工单) */
|
||||
DISPATCH_REJECTED("DISPATCH_REJECTED", "派发被拒绝");
|
||||
|
||||
private static final Map<String, LogType> CODE_MAP = new HashMap<>();
|
||||
|
||||
|
||||
@@ -25,6 +25,17 @@ public interface EventLogRecorder {
|
||||
*/
|
||||
void recordAsync(EventLogRecord record);
|
||||
|
||||
/**
|
||||
* 记录事件日志(同步)
|
||||
* <p>
|
||||
* 需要确保日志真正落库的场景使用(如 AFTER_COMMIT 审计、事务回滚场景补写)。
|
||||
* 调用方负责事务边界:本方法内部不开启事务,MyBatis 的 insert 会按当前线程的
|
||||
* 事务上下文执行;若无活跃事务则自动单条提交。
|
||||
*
|
||||
* @param record 日志记录
|
||||
*/
|
||||
void recordSync(EventLogRecord record);
|
||||
|
||||
// ==================== 便捷方法:按级别记录 ====================
|
||||
|
||||
/**
|
||||
|
||||
@@ -58,6 +58,7 @@ public class EventLogRecorderImpl implements EventLogRecorder {
|
||||
* <p>
|
||||
* 用于需要确认日志写入成功的场景(如测试、关键业务)
|
||||
*/
|
||||
@Override
|
||||
public void recordSync(EventLogRecord record) {
|
||||
doRecord(record);
|
||||
}
|
||||
|
||||
@@ -12,8 +12,11 @@ import com.viewsh.module.ops.dal.dataobject.vo.area.AreaDeviceBindReqVO;
|
||||
import com.viewsh.module.ops.dal.dataobject.vo.area.AreaDeviceRelationRespVO;
|
||||
import com.viewsh.module.ops.dal.dataobject.vo.area.AreaDeviceUpdateReqVO;
|
||||
import com.viewsh.module.ops.enums.ErrorCodeConstants;
|
||||
import com.viewsh.module.ops.service.area.event.AreaDeviceBoundEvent;
|
||||
import com.viewsh.module.ops.service.area.event.AreaDeviceUnboundEvent;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
@@ -46,6 +49,9 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
|
||||
@Resource
|
||||
private AreaDeviceService areaDeviceService;
|
||||
|
||||
@Resource
|
||||
private ApplicationEventPublisher eventPublisher;
|
||||
|
||||
private static final String TYPE_TRAFFIC_COUNTER = "TRAFFIC_COUNTER";
|
||||
private static final String TYPE_BEACON = "BEACON";
|
||||
private static final String TYPE_BADGE = "BADGE";
|
||||
@@ -116,6 +122,16 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
|
||||
// 清除可能存在的 NULL_CACHE 标记
|
||||
areaDeviceService.evictConfigCache(relation.getAreaId(), relation.getRelationType());
|
||||
|
||||
// 发布绑定事件
|
||||
// 用途:BADGE 绑定前的实时上线事件会被丢弃(无 BADGE 关系),
|
||||
// 条线监听器订阅此事件后可立即从 IoT 拉取当前状态,回填 Redis 工牌缓存,
|
||||
// 避免新绑定的设备直到下次 5/30 分钟对账才能被派单或显示在"可分配工牌"列表。
|
||||
eventPublisher.publishEvent(AreaDeviceBoundEvent.builder()
|
||||
.areaId(relation.getAreaId())
|
||||
.deviceId(relation.getDeviceId())
|
||||
.relationType(relation.getRelationType())
|
||||
.build());
|
||||
|
||||
return relation.getId();
|
||||
}
|
||||
|
||||
@@ -158,6 +174,15 @@ public class AreaDeviceRelationServiceImpl implements AreaDeviceRelationService
|
||||
if (deleted) {
|
||||
// 同步 Redis 缓存
|
||||
areaDeviceService.evictConfigCache(existing.getAreaId(), existing.getRelationType());
|
||||
|
||||
// 发布解绑事件,与绑定路径形成闭环
|
||||
// 用途:解绑后 SyncJob 不再扫到该设备,BADGE 类型 Redis 缓存得等 24h TTL 才过期,
|
||||
// 期间设备仍可能出现在"可分配工牌"列表里。条线监听器收到事件立即清理 Redis。
|
||||
eventPublisher.publishEvent(AreaDeviceUnboundEvent.builder()
|
||||
.areaId(existing.getAreaId())
|
||||
.deviceId(existing.getDeviceId())
|
||||
.relationType(existing.getRelationType())
|
||||
.build());
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
package com.viewsh.module.ops.service.area.event;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 区域-设备绑定完成事件
|
||||
* <p>
|
||||
* 在 {@code AreaDeviceRelationService.bindDevice()} 成功插入关系记录后发布。
|
||||
* <p>
|
||||
* 业务背景:BADGE 关系建立前,IoT 实时上线事件会因 {@code BadgeDeviceStatusEventHandler.isBadgeDevice()}
|
||||
* 返回 false 而被丢弃;建立关系后没有任何机制回填 Redis,需等定时对账 Job 才能恢复,
|
||||
* 表现为 "可分配工牌列表" 不出现新绑定的设备、新工单也不会派给它。
|
||||
* 监听方(条线模块)通过订阅本事件完成一次定向状态同步。
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class AreaDeviceBoundEvent {
|
||||
|
||||
/** 区域ID */
|
||||
private Long areaId;
|
||||
|
||||
/** 设备ID */
|
||||
private Long deviceId;
|
||||
|
||||
/** 关联类型:TRAFFIC_COUNTER / BEACON / BADGE */
|
||||
private String relationType;
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package com.viewsh.module.ops.service.area.event;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 区域-设备解绑完成事件
|
||||
* <p>
|
||||
* 在 {@code AreaDeviceRelationService.unbindDevice()} 成功删除关系记录后发布。
|
||||
* <p>
|
||||
* 业务背景:BADGE 解绑后 SyncJob 不再扫到该设备,Redis 工牌缓存等 24h TTL 才过期,
|
||||
* 期间该设备仍可能出现在"可分配/活跃工牌"列表里。条线监听器订阅本事件后立即清理 Redis,
|
||||
* 与 {@link AreaDeviceBoundEvent} 的回填路径形成闭环。
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class AreaDeviceUnboundEvent {
|
||||
|
||||
/** 区域ID */
|
||||
private Long areaId;
|
||||
|
||||
/** 设备ID */
|
||||
private Long deviceId;
|
||||
|
||||
/** 关联类型:TRAFFIC_COUNTER / BEACON / BADGE */
|
||||
private String relationType;
|
||||
}
|
||||
@@ -115,6 +115,9 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
// TODO: 触发紧急派单流程(在派单引擎中实现)
|
||||
}
|
||||
|
||||
// 5. 事务提交后按全局楼层重排一次:新入队工单立即按楼层差参与排序,不等下一次 rebuild
|
||||
triggerQueueRebuildAfterCommit(userId, null);
|
||||
|
||||
return queueDO.getId();
|
||||
}
|
||||
|
||||
@@ -467,7 +470,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
}
|
||||
|
||||
// 2. Redis 未命中,从 MySQL 获取并同步到 Redis
|
||||
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectListByUserId(userId);
|
||||
// 只同步活跃态(WAITING/PROCESSING/PAUSED),排除 REMOVED 历史记录,避免 Redis 膨胀
|
||||
List<OpsOrderQueueDO> mysqlList = orderQueueMapper.selectActiveListByUserId(userId);
|
||||
if (mysqlList != null && !mysqlList.isEmpty()) {
|
||||
// 同步到 Redis
|
||||
List<OrderQueueDTO> dtoList = convertToDTO(mysqlList);
|
||||
@@ -511,10 +515,31 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
Integer baseFloorNo = resolveFloorNo(baselineAreaId);
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
|
||||
// 批量装载 orders + areas,消除 N+1:100 条 WAITING 从 200 次 SELECT 降为 2 次。
|
||||
List<Long> orderIds = waitingQueues.stream()
|
||||
.map(OpsOrderQueueDO::getOpsOrderId)
|
||||
.filter(Objects::nonNull)
|
||||
.distinct()
|
||||
.collect(Collectors.toList());
|
||||
Map<Long, Long> orderIdToAreaId = orderIds.isEmpty()
|
||||
? Collections.emptyMap()
|
||||
: orderMapper.selectBatchIds(orderIds).stream()
|
||||
.filter(o -> o.getAreaId() != null)
|
||||
.collect(Collectors.toMap(OpsOrderDO::getId, OpsOrderDO::getAreaId,
|
||||
(a, b) -> a));
|
||||
List<Long> areaIds = orderIdToAreaId.values().stream().distinct().collect(Collectors.toList());
|
||||
Map<Long, Integer> areaIdToFloorNo = areaIds.isEmpty()
|
||||
? Collections.emptyMap()
|
||||
: areaMapper.selectBatchIds(areaIds).stream()
|
||||
.filter(a -> a.getFloorNo() != null)
|
||||
.collect(Collectors.toMap(OpsBusAreaDO::getId, OpsBusAreaDO::getFloorNo,
|
||||
(a, b) -> a));
|
||||
|
||||
List<OrderQueueDTO> rebuiltTasks = new ArrayList<>(waitingQueues.size());
|
||||
for (OpsOrderQueueDO queueDO : waitingQueues) {
|
||||
OrderQueueDTO dto = convertToDTO(queueDO);
|
||||
Integer targetFloorNo = resolveFloorNo(resolveOrderAreaId(queueDO.getOpsOrderId()));
|
||||
Long targetAreaId = orderIdToAreaId.get(queueDO.getOpsOrderId());
|
||||
Integer targetFloorNo = targetAreaId != null ? areaIdToFloorNo.get(targetAreaId) : null;
|
||||
QueueScoreResult result = queueScoreCalculator.calculate(QueueScoreContext.builder()
|
||||
.priority(queueDO.getPriority())
|
||||
.baseFloorNo(baseFloorNo)
|
||||
@@ -739,7 +764,17 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析楼层基准区域(三级兜底)
|
||||
* <ol>
|
||||
* <li>当前 PROCESSING 工单的区域——表示“正在做的楼层”</li>
|
||||
* <li>最近 24 小时内已完成工单的区域——投射保洁员最近的物理位置</li>
|
||||
* <li>调用方显式传入的 {@code fallbackAreaId}(如 autoDispatchNext 传的 completedOrder.areaId)</li>
|
||||
* </ol>
|
||||
* 都未命中则返回 null,本次排序降级为无楼层模式。
|
||||
*/
|
||||
private Long resolveBaselineAreaId(Long userId, Long fallbackAreaId) {
|
||||
// 一级:当前正在执行的工单
|
||||
OpsOrderQueueDO processingQueue = orderQueueMapper.selectCurrentExecutingByUserId(userId);
|
||||
if (processingQueue != null) {
|
||||
Long processingAreaId = resolveOrderAreaId(processingQueue.getOpsOrderId());
|
||||
@@ -747,6 +782,13 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
return processingAreaId;
|
||||
}
|
||||
}
|
||||
// 二级:最近 24 小时内的已完成工单,推断保洁员当前物理位置
|
||||
Long recentAreaId = orderMapper.selectLatestCompletedAreaIdByAssignee(
|
||||
userId, LocalDateTime.now().minusHours(24));
|
||||
if (recentAreaId != null) {
|
||||
return recentAreaId;
|
||||
}
|
||||
// 三级:调用方提示的区域(可为 null)
|
||||
return fallbackAreaId;
|
||||
}
|
||||
|
||||
@@ -764,7 +806,8 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
}
|
||||
|
||||
private void syncUserQueueToRedis(Long userId, List<OrderQueueDTO> rebuiltWaitingTasks) {
|
||||
List<OpsOrderQueueDO> queues = orderQueueMapper.selectListByUserId(userId);
|
||||
// 只同步活跃态(WAITING/PROCESSING/PAUSED),避免把历史 REMOVED 记录回写 Redis ZSet/Hash
|
||||
List<OpsOrderQueueDO> queues = orderQueueMapper.selectActiveListByUserId(userId);
|
||||
if (queues == null || queues.isEmpty()) {
|
||||
redisQueueService.clearQueue(userId);
|
||||
return;
|
||||
@@ -794,6 +837,21 @@ public class OrderQueueServiceEnhanced implements OrderQueueService {
|
||||
redisQueueService.batchEnqueue(queueDTOs);
|
||||
}
|
||||
|
||||
/**
|
||||
* 在当前事务提交后触发一次等待队列重算。
|
||||
* <p>
|
||||
* <b>事务边界说明</b>:本方法在 afterCommit 阶段(即外层事务已提交)自调用
|
||||
* {@link #rebuildWaitingTasksByUserId(Long, Long)},此时:
|
||||
* <ul>
|
||||
* <li>当前线程不在任何事务中(主事务刚提交完)</li>
|
||||
* <li>自调用绕过 Spring 代理,rebuild 方法上的 @Transactional 不生效</li>
|
||||
* <li>实际运行在 auto-commit 模式:每个 updateById 独立提交</li>
|
||||
* </ul>
|
||||
* <b>后果</b>:rebuild 中途抛异常时 MySQL 可能半更新、Redis 可能部分写入,
|
||||
* 不强一致但最终一致——下一次 enqueue 会再触发一次完整 rebuild 自愈。
|
||||
* 对“队列排序”这类可重放数据可以接受;若未来改为影响 MySQL 外表的写入,
|
||||
* 需要把 rebuild 抽到独立 bean,用代理调用走新事务。
|
||||
*/
|
||||
private void triggerQueueRebuildAfterCommit(Long userId, Long fallbackAreaId) {
|
||||
Runnable rebuildAction = () -> {
|
||||
try {
|
||||
|
||||
@@ -9,7 +9,11 @@ import java.time.LocalDateTime;
|
||||
public class QueueScoreCalculator {
|
||||
|
||||
static final int PRIORITY_WEIGHT = 1500;
|
||||
static final int FLOOR_WEIGHT = 60;
|
||||
/**
|
||||
* 楼层差权重。10 层封顶 × 100 = 1000,大于 aging 上限 720,实现"强楼层优先":
|
||||
* 等满 4 小时(aging 上限)的任务也不会反超更近楼层的同优先级任务。
|
||||
*/
|
||||
static final int FLOOR_WEIGHT = 100;
|
||||
static final int AGING_WEIGHT = 3;
|
||||
static final int MAX_FLOOR_DIFF = 10;
|
||||
static final int MAX_AGING_MINUTES = 240;
|
||||
@@ -22,11 +26,11 @@ public class QueueScoreCalculator {
|
||||
Integer targetFloorNo = context.getTargetFloorNo();
|
||||
Integer floorDiff = null;
|
||||
int floorDiffScore = 0;
|
||||
// 语义对称:只要 baseFloor 或 targetFloor 任一缺失,就视为"信息不足",不参与楼层排序(score=0)。
|
||||
// 旧逻辑会在"有 base 无 target"时打 +600 罚分,导致同一工单在保洁员忙碌/空闲时排序不单调。
|
||||
if (baseFloorNo != null && targetFloorNo != null) {
|
||||
floorDiff = Math.abs(targetFloorNo - baseFloorNo);
|
||||
floorDiffScore = Math.min(floorDiff, MAX_FLOOR_DIFF) * FLOOR_WEIGHT;
|
||||
} else if (baseFloorNo != null) {
|
||||
floorDiffScore = MAX_FLOOR_DIFF * FLOOR_WEIGHT;
|
||||
}
|
||||
|
||||
long waitMinutes = 0;
|
||||
|
||||
@@ -0,0 +1,170 @@
|
||||
package com.viewsh.module.ops.core.dispatch;
|
||||
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||
import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchDecision;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchPath;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
|
||||
import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
|
||||
import com.viewsh.module.ops.core.dispatch.strategy.AssignStrategy;
|
||||
import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* 验证 Bug #2:FOR UPDATE 并发冲突降级路径。
|
||||
* <p>
|
||||
* 背景:OrderLifecycleManager.dispatch 入口加了 selectActiveByAssigneeForUpdate 行锁,
|
||||
* 命中时返 {@link TransitionErrorCode#ASSIGNEE_HAS_ACTIVE_ORDER}。
|
||||
* DispatchEngine 需按工单当前状态分支处理:
|
||||
* <ul>
|
||||
* <li>PENDING(从未入队)→ 直接降级为入队,避免悬空</li>
|
||||
* <li>QUEUED(已在队列)→ 保留排队,不做重复入队</li>
|
||||
* <li>其他错误码(如 INVALID_TRANSITION)→ 硬失败,不走降级</li>
|
||||
* </ul>
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class DispatchEngineConflictFallbackTest {
|
||||
|
||||
@Mock
|
||||
private OrderLifecycleManager orderLifecycleManager;
|
||||
@Mock
|
||||
private OrderQueueService orderQueueService;
|
||||
@Mock
|
||||
private OpsOrderMapper orderMapper;
|
||||
@Mock
|
||||
private AssignStrategy assignStrategy;
|
||||
@Mock
|
||||
private ScheduleStrategy scheduleStrategy;
|
||||
|
||||
@InjectMocks
|
||||
private DispatchEngineImpl dispatchEngine;
|
||||
|
||||
private static final String CLEAN = "CLEAN";
|
||||
private static final Long ASSIGNEE_ID = 31L;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
dispatchEngine.registerAssignStrategy(CLEAN, assignStrategy);
|
||||
dispatchEngine.registerScheduleStrategy(CLEAN, scheduleStrategy);
|
||||
}
|
||||
|
||||
@Test
|
||||
void directDispatch_onConflict_whenOrderIsPending_shouldDowngradeToEnqueue() {
|
||||
// PENDING 工单派发被拒 → 降级 executeEnqueueOnly,避免工单悬空
|
||||
Long orderId = 400L;
|
||||
OrderDispatchContext context = baseContext(orderId);
|
||||
|
||||
stubHappyPathUntilDispatch(orderId);
|
||||
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
|
||||
.thenReturn(OrderTransitionResult.fail(orderId,
|
||||
"同执行人已有活跃工单 999",
|
||||
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER));
|
||||
when(orderMapper.selectById(orderId)).thenReturn(OpsOrderDO.builder()
|
||||
.id(orderId)
|
||||
.status(WorkOrderStatusEnum.PENDING.getStatus())
|
||||
.build());
|
||||
when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class)))
|
||||
.thenReturn(OrderTransitionResult.success(orderId,
|
||||
WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.QUEUED, 6000L));
|
||||
|
||||
DispatchResult result = dispatchEngine.dispatch(context);
|
||||
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath());
|
||||
assertEquals(6000L, result.getQueueId());
|
||||
verify(orderLifecycleManager).enqueue(any(OrderTransitionRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void directDispatch_onConflict_whenOrderAlreadyQueued_shouldKeepInQueue() {
|
||||
// QUEUED 工单(从队列中被拉出派发)再次被拒 → 不重复入队,继续留在队列等下一轮
|
||||
Long orderId = 401L;
|
||||
OrderDispatchContext context = baseContext(orderId);
|
||||
|
||||
stubHappyPathUntilDispatch(orderId);
|
||||
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
|
||||
.thenReturn(OrderTransitionResult.fail(orderId,
|
||||
"同执行人已有活跃工单 998",
|
||||
TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER));
|
||||
when(orderMapper.selectById(orderId)).thenReturn(OpsOrderDO.builder()
|
||||
.id(orderId)
|
||||
.status(WorkOrderStatusEnum.QUEUED.getStatus())
|
||||
.build());
|
||||
|
||||
DispatchResult result = dispatchEngine.dispatch(context);
|
||||
|
||||
assertFalse(result.isSuccess());
|
||||
assertTrue(result.getMessage().contains("已留在队列等待"),
|
||||
"冲突信息应说明工单已留在队列,实际: " + result.getMessage());
|
||||
// 关键断言:不能再调一次 enqueue,否则队列里会出现两条记录
|
||||
verify(orderLifecycleManager, never()).enqueue(any(OrderTransitionRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void directDispatch_onGeneralFailure_shouldNotDowngrade() {
|
||||
// 非并发冲突的失败(例如非法状态转换)不走降级路径,且不查 selectById
|
||||
Long orderId = 402L;
|
||||
OrderDispatchContext context = baseContext(orderId);
|
||||
|
||||
stubHappyPathUntilDispatch(orderId);
|
||||
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
|
||||
.thenReturn(OrderTransitionResult.fail(orderId,
|
||||
"非法状态转换",
|
||||
TransitionErrorCode.INVALID_TRANSITION));
|
||||
|
||||
DispatchResult result = dispatchEngine.dispatch(context);
|
||||
|
||||
assertFalse(result.isSuccess());
|
||||
assertTrue(result.getMessage().contains("直接派单失败"),
|
||||
"一般失败应归类为直接派单失败,实际: " + result.getMessage());
|
||||
verify(orderLifecycleManager, never()).enqueue(any(OrderTransitionRequest.class));
|
||||
// 非冲突错误不应触发工单状态复核
|
||||
verify(orderMapper, never()).selectById(orderId);
|
||||
}
|
||||
|
||||
// ==================== Helpers ====================
|
||||
|
||||
private OrderDispatchContext baseContext(Long orderId) {
|
||||
return OrderDispatchContext.builder()
|
||||
.orderId(orderId)
|
||||
.orderCode("WO-TEST-" + orderId)
|
||||
.businessType(CLEAN)
|
||||
.areaId(501L)
|
||||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* 装配 dispatch 路径上到 orderLifecycleManager.dispatch() 之前的全部 stub:
|
||||
* 策略推荐成功 + 决策为 DIRECT_DISPATCH + 兜底查询 MySQL 为空闲。
|
||||
* 留给测试自己控制 orderLifecycleManager.dispatch 的返回。
|
||||
*/
|
||||
private void stubHappyPathUntilDispatch(Long orderId) {
|
||||
when(assignStrategy.recommend(any())).thenReturn(
|
||||
AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近"));
|
||||
when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.directDispatch());
|
||||
when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of());
|
||||
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId)).thenReturn(List.of());
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,264 @@
|
||||
package com.viewsh.module.ops.core.dispatch;
|
||||
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueDTO;
|
||||
import com.viewsh.module.ops.api.queue.OrderQueueService;
|
||||
import com.viewsh.module.ops.core.dispatch.model.AssigneeRecommendation;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchDecision;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchPath;
|
||||
import com.viewsh.module.ops.core.dispatch.model.DispatchResult;
|
||||
import com.viewsh.module.ops.core.dispatch.model.OrderDispatchContext;
|
||||
import com.viewsh.module.ops.core.dispatch.strategy.AssignStrategy;
|
||||
import com.viewsh.module.ops.core.dispatch.strategy.ScheduleStrategy;
|
||||
import com.viewsh.module.ops.core.lifecycle.OrderLifecycleManager;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionRequest;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.OrderTransitionResult;
|
||||
import com.viewsh.module.ops.dal.dataobject.workorder.OpsOrderDO;
|
||||
import com.viewsh.module.ops.dal.mysql.workorder.OpsOrderMapper;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* 验证 Bug #1(autoDispatchNext 空闲兜底)+ Bug #4(executeDispatch 前置检查)。
|
||||
* <p>
|
||||
* 产线事故:管理员 cancel 一个僵尸 DISPATCHED 单 → handleCancelled → autoDispatchNext,
|
||||
* 若不校验活跃态,就会在同一设备上派发新单、旧单不死,最终 0002=CONFIRMED + 0003=DISPATCHED 并存。
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class DispatchEngineIdleCheckTest {
|
||||
|
||||
@Mock
|
||||
private OrderLifecycleManager orderLifecycleManager;
|
||||
@Mock
|
||||
private OrderQueueService orderQueueService;
|
||||
@Mock
|
||||
private OpsOrderMapper orderMapper;
|
||||
@Mock
|
||||
private AssignStrategy assignStrategy;
|
||||
@Mock
|
||||
private ScheduleStrategy scheduleStrategy;
|
||||
|
||||
@InjectMocks
|
||||
private DispatchEngineImpl dispatchEngine;
|
||||
|
||||
private static final String CLEAN = "CLEAN";
|
||||
private static final Long ASSIGNEE_ID = 31L;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
dispatchEngine.registerAssignStrategy(CLEAN, assignStrategy);
|
||||
dispatchEngine.registerScheduleStrategy(CLEAN, scheduleStrategy);
|
||||
}
|
||||
|
||||
@Test
|
||||
void autoDispatchNext_shouldSkip_whenAssigneeStillHasActiveOrder() {
|
||||
// 场景:completedOrderId=100 刚被 cancel,但设备 31 名下还挂着 200L CONFIRMED 单
|
||||
Long completedOrderId = 100L;
|
||||
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId))
|
||||
.thenReturn(List.of(OpsOrderDO.builder()
|
||||
.id(200L)
|
||||
.status(WorkOrderStatusEnum.CONFIRMED.getStatus())
|
||||
.build()));
|
||||
|
||||
DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID);
|
||||
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals("执行人非空闲,跳过派发", result.getMessage());
|
||||
assertEquals(ASSIGNEE_ID, result.getAssigneeId());
|
||||
// 不应触发后续队列重排和派发
|
||||
verifyNoInteractions(orderQueueService);
|
||||
verifyNoInteractions(orderLifecycleManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
void autoDispatchNext_shouldSkip_whenAssigneeHasPausedOrder() {
|
||||
// PAUSED 也视为"仍有任务",不能派发新单(否则 PAUSED 恢复回来就和新单冲突)
|
||||
Long completedOrderId = 101L;
|
||||
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId))
|
||||
.thenReturn(List.of(OpsOrderDO.builder()
|
||||
.id(201L)
|
||||
.status(WorkOrderStatusEnum.PAUSED.getStatus())
|
||||
.build()));
|
||||
|
||||
DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID);
|
||||
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals("执行人非空闲,跳过派发", result.getMessage());
|
||||
verifyNoInteractions(orderLifecycleManager);
|
||||
}
|
||||
|
||||
@Test
|
||||
void autoDispatchNext_shouldReturnEarly_whenAssigneeIdIsNull() {
|
||||
// 入参校验:assigneeId 空直接返回,不查活跃态
|
||||
DispatchResult result = dispatchEngine.autoDispatchNext(100L, null);
|
||||
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals("缺少执行人,跳过派发", result.getMessage());
|
||||
verifyNoInteractions(orderMapper);
|
||||
verifyNoInteractions(orderQueueService);
|
||||
}
|
||||
|
||||
@Test
|
||||
void executeDispatch_shouldDowngradeDirectDispatchToEnqueue_whenMysqlShowsActive() {
|
||||
// Bug #4:Redis 说设备空闲,但 MySQL 仍有活跃态 → 兜底把 DIRECT_DISPATCH 降级为 ENQUEUE_ONLY
|
||||
Long orderId = 300L;
|
||||
OrderDispatchContext context = baseContext(orderId);
|
||||
|
||||
when(assignStrategy.recommend(any())).thenReturn(
|
||||
AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近"));
|
||||
when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.directDispatch());
|
||||
when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of());
|
||||
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId))
|
||||
.thenReturn(List.of(OpsOrderDO.builder()
|
||||
.id(999L)
|
||||
.status(WorkOrderStatusEnum.ARRIVED.getStatus())
|
||||
.build()));
|
||||
when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class)))
|
||||
.thenReturn(successEnqueue(orderId, 5000L));
|
||||
|
||||
DispatchResult result = dispatchEngine.dispatch(context);
|
||||
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath());
|
||||
assertEquals(5000L, result.getQueueId());
|
||||
verify(orderLifecycleManager, never()).dispatch(any(OrderTransitionRequest.class));
|
||||
verify(orderLifecycleManager).enqueue(any(OrderTransitionRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void executeDispatch_shouldDowngradePushAndEnqueue_whenMysqlShowsActive() {
|
||||
// PUSH_AND_ENQUEUE 路径同样要兜底:本应"推送旧队首 + 新单入队",
|
||||
// 但旧队首已活跃,推送会触发 FOR UPDATE 冲突,所以直接降级为 ENQUEUE_ONLY
|
||||
Long orderId = 301L;
|
||||
OrderDispatchContext context = baseContext(orderId);
|
||||
|
||||
when(assignStrategy.recommend(any())).thenReturn(
|
||||
AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近"));
|
||||
when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.pushAndEnqueue());
|
||||
when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of());
|
||||
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId))
|
||||
.thenReturn(List.of(OpsOrderDO.builder()
|
||||
.id(998L)
|
||||
.status(WorkOrderStatusEnum.DISPATCHED.getStatus())
|
||||
.build()));
|
||||
when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class)))
|
||||
.thenReturn(successEnqueue(orderId, 5001L));
|
||||
|
||||
DispatchResult result = dispatchEngine.dispatch(context);
|
||||
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath());
|
||||
verify(orderLifecycleManager, never()).dispatch(any(OrderTransitionRequest.class));
|
||||
verify(orderLifecycleManager).enqueue(any(OrderTransitionRequest.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
void executeDispatch_shouldNotQueryMysql_whenPathIsEnqueueOnly() {
|
||||
// ENQUEUE_ONLY 本来就不推送,无需兜底查询——避免给每一次入队都叠加一次 SQL 开销
|
||||
Long orderId = 302L;
|
||||
OrderDispatchContext context = baseContext(orderId);
|
||||
|
||||
when(assignStrategy.recommend(any())).thenReturn(
|
||||
AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近"));
|
||||
when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.enqueueOnly());
|
||||
when(orderLifecycleManager.enqueue(any(OrderTransitionRequest.class)))
|
||||
.thenReturn(successEnqueue(orderId, 5002L));
|
||||
|
||||
DispatchResult result = dispatchEngine.dispatch(context);
|
||||
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals(DispatchPath.ENQUEUE_ONLY, result.getPath());
|
||||
// 关键:ENQUEUE_ONLY 不应触发兜底查询
|
||||
verify(orderMapper, never()).selectActiveByAssignee(any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void executeDispatch_shouldProceedDirectDispatch_whenMysqlConfirmsIdle() {
|
||||
// 反向用例:MySQL 也确认空闲 → 正常走 DIRECT_DISPATCH,不降级
|
||||
Long orderId = 303L;
|
||||
OrderDispatchContext context = baseContext(orderId);
|
||||
|
||||
when(assignStrategy.recommend(any())).thenReturn(
|
||||
AssigneeRecommendation.of(ASSIGNEE_ID, "工牌31", 80, "区域最近"));
|
||||
when(scheduleStrategy.decide(any())).thenReturn(DispatchDecision.directDispatch());
|
||||
when(orderQueueService.getTasksByUserId(ASSIGNEE_ID)).thenReturn(List.of());
|
||||
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, orderId)).thenReturn(List.of());
|
||||
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
|
||||
.thenReturn(OrderTransitionResult.success(orderId,
|
||||
WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.DISPATCHED));
|
||||
|
||||
DispatchResult result = dispatchEngine.dispatch(context);
|
||||
|
||||
assertFalse(!result.isSuccess()); // 确认成功
|
||||
assertEquals(DispatchPath.DIRECT_DISPATCH, result.getPath());
|
||||
verify(orderLifecycleManager).dispatch(any(OrderTransitionRequest.class));
|
||||
verify(orderLifecycleManager, never()).enqueue(any(OrderTransitionRequest.class));
|
||||
}
|
||||
|
||||
// ==================== Helpers ====================
|
||||
|
||||
private OrderDispatchContext baseContext(Long orderId) {
|
||||
return OrderDispatchContext.builder()
|
||||
.orderId(orderId)
|
||||
.orderCode("WO-TEST-" + orderId)
|
||||
.businessType(CLEAN)
|
||||
.areaId(501L)
|
||||
.build();
|
||||
}
|
||||
|
||||
private OrderTransitionResult successEnqueue(Long orderId, Long queueId) {
|
||||
return OrderTransitionResult.success(orderId,
|
||||
WorkOrderStatusEnum.PENDING, WorkOrderStatusEnum.QUEUED, queueId);
|
||||
}
|
||||
|
||||
@Test
|
||||
void autoDispatchNext_whenDispatchingFromQueue_shouldGoThroughDispatchNotTransition() {
|
||||
// 锁死 P1 修复:从队列派发必须走 dispatch(),以继承 Bug #2 的 FOR UPDATE 串行化防线。
|
||||
// 如果未来有人改回 transition(),本测试会红:autoDispatchNext 绕过 FOR UPDATE 的漏洞就回来了。
|
||||
Long completedOrderId = 700L;
|
||||
Long waitingOrderId = 701L;
|
||||
Long queueId = 800L;
|
||||
|
||||
when(orderMapper.selectActiveByAssignee(ASSIGNEE_ID, completedOrderId)).thenReturn(List.of());
|
||||
when(orderMapper.selectById(completedOrderId)).thenReturn(OpsOrderDO.builder()
|
||||
.id(completedOrderId).areaId(501L).build());
|
||||
OrderQueueDTO waitingDTO = new OrderQueueDTO();
|
||||
waitingDTO.setId(queueId);
|
||||
waitingDTO.setOpsOrderId(waitingOrderId);
|
||||
waitingDTO.setQueueScore(1000.0);
|
||||
waitingDTO.setFloorDiff(1);
|
||||
waitingDTO.setWaitMinutes(2L);
|
||||
when(orderQueueService.rebuildWaitingTasksByUserId(ASSIGNEE_ID, 501L))
|
||||
.thenReturn(List.of(waitingDTO));
|
||||
when(orderMapper.selectById(waitingOrderId)).thenReturn(OpsOrderDO.builder()
|
||||
.id(waitingOrderId)
|
||||
.status(WorkOrderStatusEnum.QUEUED.getStatus())
|
||||
.build());
|
||||
when(orderLifecycleManager.dispatch(any(OrderTransitionRequest.class)))
|
||||
.thenReturn(OrderTransitionResult.success(waitingOrderId,
|
||||
WorkOrderStatusEnum.QUEUED, WorkOrderStatusEnum.DISPATCHED));
|
||||
|
||||
DispatchResult result = dispatchEngine.autoDispatchNext(completedOrderId, ASSIGNEE_ID);
|
||||
|
||||
assertTrue(result.isSuccess());
|
||||
assertEquals("已按队列总分派发下一单", result.getMessage());
|
||||
// 关键断言:必须调 dispatch()(带 FOR UPDATE)而不是 transition()(裸责任链)
|
||||
verify(orderLifecycleManager).dispatch(any(OrderTransitionRequest.class));
|
||||
verify(orderLifecycleManager, never()).transition(any(OrderTransitionRequest.class));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,211 @@
|
||||
package com.viewsh.module.ops.core.lifecycle.audit;
|
||||
|
||||
import com.viewsh.module.ops.core.event.OrderTransitionAttemptedEvent;
|
||||
import com.viewsh.module.ops.core.lifecycle.model.TransitionErrorCode;
|
||||
import com.viewsh.module.ops.enums.OperatorTypeEnum;
|
||||
import com.viewsh.module.ops.enums.WorkOrderStatusEnum;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.EventDomain;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.EventLevel;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogModule;
|
||||
import com.viewsh.module.ops.infrastructure.log.enumeration.LogType;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecord;
|
||||
import com.viewsh.module.ops.infrastructure.log.recorder.EventLogRecorder;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
||||
/**
|
||||
* 验证 Bug #7:状态转换审计闭环。
|
||||
* <p>
|
||||
* 三条路径:
|
||||
* <ol>
|
||||
* <li>AFTER_COMMIT + 成功 → INFO + 业务 LogType(如 ORDER_DISPATCHED)</li>
|
||||
* <li>AFTER_COMMIT + 并发冲突失败 → WARN + DISPATCH_REJECTED</li>
|
||||
* <li>AFTER_COMMIT + 一般失败 → ERROR + TRANSITION_FAILED</li>
|
||||
* <li>AFTER_ROLLBACK → 无论事件声明 success 与否都视为失败(事务已回滚),独立事务补写</li>
|
||||
* </ol>
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class OrderTransitionAuditListenerTest {
|
||||
|
||||
@Mock
|
||||
private EventLogRecorder eventLogRecorder;
|
||||
|
||||
@InjectMocks
|
||||
private OrderTransitionAuditListener listener;
|
||||
|
||||
@Test
|
||||
void onAfterCommit_success_shouldRecordInfoWithBusinessEventType() {
|
||||
OrderTransitionAttemptedEvent event = successEvent(100L, WorkOrderStatusEnum.QUEUED,
|
||||
WorkOrderStatusEnum.DISPATCHED);
|
||||
|
||||
listener.onAfterCommit(event);
|
||||
|
||||
EventLogRecord rec = captureRecord();
|
||||
assertEquals(EventLevel.INFO, rec.getLevel());
|
||||
assertEquals(EventDomain.DISPATCH, rec.getDomain());
|
||||
assertEquals(LogType.ORDER_DISPATCHED.getCode(), rec.getEventType());
|
||||
assertEquals(LogModule.CLEAN, rec.getModule());
|
||||
assertEquals(100L, rec.getTargetId());
|
||||
assertEquals("order", rec.getTargetType());
|
||||
assertEquals(Boolean.TRUE, rec.getPayload().get("success"));
|
||||
assertEquals(Boolean.FALSE, rec.getPayload().get("rolledBack"));
|
||||
assertEquals(WorkOrderStatusEnum.QUEUED.getStatus(), rec.getPayload().get("fromStatus"));
|
||||
assertEquals(WorkOrderStatusEnum.DISPATCHED.getStatus(), rec.getPayload().get("targetStatus"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void onAfterCommit_success_shouldMapAllBusinessStatusesToLogType() {
|
||||
// 验证关键目标状态都能映射到对应 LogType(避免回归导致全映射到 SYSTEM_EVENT)
|
||||
assertLogTypeForTarget(WorkOrderStatusEnum.QUEUED, LogType.ORDER_QUEUED);
|
||||
assertLogTypeForTarget(WorkOrderStatusEnum.DISPATCHED, LogType.ORDER_DISPATCHED);
|
||||
assertLogTypeForTarget(WorkOrderStatusEnum.CONFIRMED, LogType.ORDER_CONFIRM);
|
||||
assertLogTypeForTarget(WorkOrderStatusEnum.ARRIVED, LogType.ORDER_ARRIVED);
|
||||
assertLogTypeForTarget(WorkOrderStatusEnum.PAUSED, LogType.ORDER_PAUSED);
|
||||
assertLogTypeForTarget(WorkOrderStatusEnum.COMPLETED, LogType.ORDER_COMPLETED);
|
||||
assertLogTypeForTarget(WorkOrderStatusEnum.CANCELLED, LogType.ORDER_CANCELLED);
|
||||
}
|
||||
|
||||
@Test
|
||||
void onAfterCommit_forUpdateRejected_shouldRecordWarnWithDispatchRejected() {
|
||||
OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder()
|
||||
.orderId(200L)
|
||||
.orderType("CLEAN")
|
||||
.fromStatus(WorkOrderStatusEnum.PENDING)
|
||||
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
|
||||
.assigneeId(31L)
|
||||
.operatorType(OperatorTypeEnum.SYSTEM)
|
||||
.success(false)
|
||||
.errorCode(TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER)
|
||||
.errorMessage("同执行人已有活跃工单 999")
|
||||
.attemptedAt(LocalDateTime.now())
|
||||
.build();
|
||||
|
||||
listener.onAfterCommit(event);
|
||||
|
||||
EventLogRecord rec = captureRecord();
|
||||
// 并发冲突只是业务层拒绝,不是系统异常,所以是 WARN 而不是 ERROR
|
||||
assertEquals(EventLevel.WARN, rec.getLevel());
|
||||
assertEquals(LogType.DISPATCH_REJECTED.getCode(), rec.getEventType());
|
||||
assertEquals("ASSIGNEE_HAS_ACTIVE_ORDER", rec.getPayload().get("errorCode"));
|
||||
assertEquals("同执行人已有活跃工单 999", rec.getPayload().get("errorMessage"));
|
||||
assertTrue(rec.getMessage().contains("状态转换失败"),
|
||||
"消息应标明转换失败,实际: " + rec.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
void onAfterCommit_generalFailure_shouldRecordErrorWithTransitionFailed() {
|
||||
OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder()
|
||||
.orderId(300L)
|
||||
.orderType("SECURITY")
|
||||
.fromStatus(WorkOrderStatusEnum.PENDING)
|
||||
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
|
||||
.success(false)
|
||||
.errorCode(TransitionErrorCode.INVALID_TRANSITION)
|
||||
.errorMessage("PENDING → ARRIVED 非法")
|
||||
.causeSummary("IllegalStateException: PENDING → ARRIVED 非法")
|
||||
.attemptedAt(LocalDateTime.now())
|
||||
.build();
|
||||
|
||||
listener.onAfterCommit(event);
|
||||
|
||||
EventLogRecord rec = captureRecord();
|
||||
assertEquals(EventLevel.ERROR, rec.getLevel());
|
||||
assertEquals(LogType.TRANSITION_FAILED.getCode(), rec.getEventType());
|
||||
assertEquals(LogModule.SECURITY, rec.getModule());
|
||||
assertEquals("INVALID_TRANSITION", rec.getPayload().get("errorCode"));
|
||||
assertEquals("IllegalStateException: PENDING → ARRIVED 非法",
|
||||
rec.getPayload().get("cause"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void writeRollbackAudit_evenIfEventClaimsSuccess_shouldForceFailure() {
|
||||
// 即便发布时声明 success=true,事务 rollback 就是没真正落库,必须按失败记录
|
||||
OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder()
|
||||
.orderId(400L)
|
||||
.orderType("CLEAN")
|
||||
.fromStatus(WorkOrderStatusEnum.QUEUED)
|
||||
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
|
||||
.success(true) // 发布时乐观声明
|
||||
.attemptedAt(LocalDateTime.now())
|
||||
.build();
|
||||
|
||||
listener.writeRollbackAudit(event);
|
||||
|
||||
EventLogRecord rec = captureRecord();
|
||||
assertEquals(EventLevel.ERROR, rec.getLevel());
|
||||
assertEquals(LogType.TRANSITION_FAILED.getCode(), rec.getEventType());
|
||||
assertEquals(Boolean.TRUE, rec.getPayload().get("rolledBack"));
|
||||
assertEquals(Boolean.FALSE, rec.getPayload().get("success"));
|
||||
assertTrue(rec.getMessage().contains("状态转换回滚"),
|
||||
"回滚消息应明确标注,实际: " + rec.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
void writeRollbackAudit_withForUpdateRejected_stillMapsToDispatchRejected() {
|
||||
// 回滚 + 冲突错误码 → 依然归类为 DISPATCH_REJECTED(而不是 TRANSITION_FAILED)
|
||||
OrderTransitionAttemptedEvent event = OrderTransitionAttemptedEvent.builder()
|
||||
.orderId(500L)
|
||||
.orderType("CLEAN")
|
||||
.fromStatus(WorkOrderStatusEnum.PENDING)
|
||||
.targetStatus(WorkOrderStatusEnum.DISPATCHED)
|
||||
.success(false)
|
||||
.errorCode(TransitionErrorCode.ASSIGNEE_HAS_ACTIVE_ORDER)
|
||||
.attemptedAt(LocalDateTime.now())
|
||||
.build();
|
||||
|
||||
listener.writeRollbackAudit(event);
|
||||
|
||||
EventLogRecord rec = captureRecord();
|
||||
// 冲突型错误即便回滚也应是 WARN + DISPATCH_REJECTED,方便运维过滤
|
||||
assertEquals(EventLevel.WARN, rec.getLevel());
|
||||
assertEquals(LogType.DISPATCH_REJECTED.getCode(), rec.getEventType());
|
||||
assertFalse((Boolean) rec.getPayload().get("success"));
|
||||
}
|
||||
|
||||
// ==================== Helpers ====================
|
||||
|
||||
private OrderTransitionAttemptedEvent successEvent(Long orderId,
|
||||
WorkOrderStatusEnum from,
|
||||
WorkOrderStatusEnum to) {
|
||||
return OrderTransitionAttemptedEvent.builder()
|
||||
.orderId(orderId)
|
||||
.orderType("CLEAN")
|
||||
.orderCode("WO-" + orderId)
|
||||
.fromStatus(from)
|
||||
.targetStatus(to)
|
||||
.assigneeId(31L)
|
||||
.operatorType(OperatorTypeEnum.SYSTEM)
|
||||
.operatorId(31L)
|
||||
.reason("test")
|
||||
.success(true)
|
||||
.attemptedAt(LocalDateTime.now())
|
||||
.build();
|
||||
}
|
||||
|
||||
private void assertLogTypeForTarget(WorkOrderStatusEnum target, LogType expected) {
|
||||
org.mockito.Mockito.reset(eventLogRecorder);
|
||||
listener.onAfterCommit(successEvent(1000L + target.ordinal(),
|
||||
WorkOrderStatusEnum.PENDING, target));
|
||||
|
||||
EventLogRecord rec = captureRecord();
|
||||
assertEquals(expected.getCode(), rec.getEventType(),
|
||||
"target=" + target + " 应映射到 " + expected);
|
||||
}
|
||||
|
||||
private EventLogRecord captureRecord() {
|
||||
ArgumentCaptor<EventLogRecord> captor = ArgumentCaptor.forClass(EventLogRecord.class);
|
||||
verify(eventLogRecorder).recordSync(captor.capture());
|
||||
return captor.getValue();
|
||||
}
|
||||
}
|
||||
@@ -1,182 +1,228 @@
|
||||
package com.viewsh.module.ops.infrastructure.code;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.core.ValueOperations;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.mockito.ArgumentMatchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* 工单编号生成器测试
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class OrderCodeGeneratorTest {
|
||||
|
||||
@Mock
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Mock
|
||||
private ValueOperations<String, String> valueOperations;
|
||||
|
||||
private OrderCodeGenerator orderCodeGenerator;
|
||||
|
||||
private static final String DATE_FORMAT = "yyyyMMdd";
|
||||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern(DATE_FORMAT);
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
orderCodeGenerator = new OrderCodeGenerator();
|
||||
ReflectionTestUtils.setField(orderCodeGenerator, "stringRedisTemplate", stringRedisTemplate);
|
||||
|
||||
// Mock stringRedisTemplate.opsForValue()
|
||||
lenient().when(stringRedisTemplate.opsForValue()).thenReturn(valueOperations);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_FirstOrderOfToday() {
|
||||
// Given
|
||||
when(valueOperations.increment(anyString())).thenReturn(1L);
|
||||
|
||||
// When
|
||||
String orderCode = orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
// Then
|
||||
String expectedDate = LocalDate.now().format(DATE_FORMATTER);
|
||||
String expected = String.format("CLEAN-%s-0001", expectedDate);
|
||||
assertEquals(expected, orderCode);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_SecondOrderOfToday() {
|
||||
// Given
|
||||
when(valueOperations.increment(anyString())).thenReturn(2L);
|
||||
|
||||
// When
|
||||
String orderCode = orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
// Then
|
||||
String expectedDate = LocalDate.now().format(DATE_FORMATTER);
|
||||
String expected = String.format("CLEAN-%s-0002", expectedDate);
|
||||
assertEquals(expected, orderCode);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_DifferentBusinessTypes() {
|
||||
// Given
|
||||
when(valueOperations.increment(anyString())).thenReturn(1L);
|
||||
|
||||
// When
|
||||
String cleanCode = orderCodeGenerator.generate("CLEAN");
|
||||
String securityCode = orderCodeGenerator.generate("SECURITY");
|
||||
|
||||
// Then
|
||||
assertTrue(cleanCode.startsWith("CLEAN-"));
|
||||
assertTrue(securityCode.startsWith("SECURITY-"));
|
||||
assertNotEquals(cleanCode, securityCode);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_LargeSequenceNumber() {
|
||||
// Given
|
||||
when(valueOperations.increment(anyString())).thenReturn(9999L);
|
||||
|
||||
// When
|
||||
String orderCode = orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
// Then
|
||||
String expectedDate = LocalDate.now().format(DATE_FORMATTER);
|
||||
String expected = String.format("CLEAN-%s-9999", expectedDate);
|
||||
assertEquals(expected, orderCode);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_NullBusinessType_ThrowsException() {
|
||||
// When & Then
|
||||
assertThrows(IllegalArgumentException.class, () -> orderCodeGenerator.generate(null));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_EmptyBusinessType_ThrowsException() {
|
||||
// When & Then
|
||||
assertThrows(IllegalArgumentException.class, () -> orderCodeGenerator.generate(""));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_SetsExpirationOnFirstUse() {
|
||||
// Given
|
||||
when(valueOperations.increment(anyString())).thenReturn(1L);
|
||||
|
||||
// When
|
||||
orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
// Then
|
||||
verify(stringRedisTemplate).expire(anyString(), eq(7L), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetCurrentSeq_NoExistingRecords_ReturnsZero() {
|
||||
// Given
|
||||
when(valueOperations.get(anyString())).thenReturn(null);
|
||||
|
||||
// When
|
||||
long seq = orderCodeGenerator.getCurrentSeq("CLEAN");
|
||||
|
||||
// Then
|
||||
assertEquals(0, seq);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetCurrentSeq_ExistingRecords_ReturnsValue() {
|
||||
// Given
|
||||
when(valueOperations.get(anyString())).thenReturn("5");
|
||||
|
||||
// When
|
||||
long seq = orderCodeGenerator.getCurrentSeq("CLEAN");
|
||||
|
||||
// Then
|
||||
assertEquals(5, seq);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReset_DeletesKey() {
|
||||
// When
|
||||
orderCodeGenerator.reset("CLEAN");
|
||||
|
||||
// Then
|
||||
verify(stringRedisTemplate).delete(contains("CLEAN"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_FormatConsistency() {
|
||||
// Given
|
||||
when(valueOperations.increment(anyString())).thenReturn(1L);
|
||||
|
||||
// When
|
||||
String orderCode = orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
// Then: 验证格式为 {TYPE}-{DATE}-{SEQUENCE}
|
||||
String[] parts = orderCode.split("-");
|
||||
assertEquals(3, parts.length);
|
||||
assertEquals("CLEAN", parts[0]);
|
||||
|
||||
// 验证日期部分是8位数字
|
||||
assertEquals(8, parts[1].length());
|
||||
assertTrue(parts[1].matches("\\d{8}"));
|
||||
|
||||
// 验证序号部分是4位数字
|
||||
assertEquals(4, parts[2].length());
|
||||
assertTrue(parts[2].matches("\\d{4}"));
|
||||
}
|
||||
}
|
||||
package com.viewsh.module.ops.infrastructure.code;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.mockito.junit.jupiter.MockitoSettings;
|
||||
import org.mockito.quality.Strictness;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.core.ValueOperations;
|
||||
import org.springframework.data.redis.core.script.RedisScript;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.test.util.ReflectionTestUtils;
|
||||
|
||||
import java.time.LocalDate;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
import static org.mockito.ArgumentMatchers.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/**
|
||||
* 工单编号生成器测试
|
||||
*
|
||||
* @author lzh
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
@MockitoSettings(strictness = Strictness.LENIENT)
|
||||
class OrderCodeGeneratorTest {
|
||||
|
||||
@Mock
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Mock
|
||||
private ValueOperations<String, String> valueOperations;
|
||||
|
||||
@Mock
|
||||
private JdbcTemplate jdbcTemplate;
|
||||
|
||||
private OrderCodeGenerator orderCodeGenerator;
|
||||
|
||||
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
orderCodeGenerator = new OrderCodeGenerator();
|
||||
ReflectionTestUtils.setField(orderCodeGenerator, "stringRedisTemplate", stringRedisTemplate);
|
||||
ReflectionTestUtils.setField(orderCodeGenerator, "jdbcTemplate", jdbcTemplate);
|
||||
|
||||
when(stringRedisTemplate.opsForValue()).thenReturn(valueOperations);
|
||||
}
|
||||
|
||||
/**
|
||||
* 统一 mock Lua 脚本调用
|
||||
*/
|
||||
private void mockLuaScript(Long returnValue) {
|
||||
when(stringRedisTemplate.execute(any(RedisScript.class), anyList(), anyString(), anyString()))
|
||||
.thenReturn(returnValue);
|
||||
}
|
||||
|
||||
private void mockDbMaxSeq(Integer maxSeq) {
|
||||
when(jdbcTemplate.queryForObject(anyString(), eq(Integer.class), anyInt(), anyString()))
|
||||
.thenReturn(maxSeq);
|
||||
}
|
||||
|
||||
// ==================== 首次调用(校准)测试 ====================
|
||||
|
||||
@Test
|
||||
void testGenerate_FirstCall_CalibratesFromDatabase() {
|
||||
// Given: 数据库中已有 3 条记录
|
||||
mockDbMaxSeq(3);
|
||||
mockLuaScript(4L);
|
||||
|
||||
// When
|
||||
String orderCode = orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
// Then
|
||||
String expectedDate = LocalDate.now().format(DATE_FORMATTER);
|
||||
assertEquals(String.format("CLEAN-%s-0004", expectedDate), orderCode);
|
||||
verify(stringRedisTemplate).execute(any(RedisScript.class), anyList(), eq("3"), anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_FirstCall_NoDatabaseRecords() {
|
||||
mockDbMaxSeq(null);
|
||||
mockLuaScript(1L);
|
||||
|
||||
String orderCode = orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
String expectedDate = LocalDate.now().format(DATE_FORMATTER);
|
||||
assertEquals(String.format("CLEAN-%s-0001", expectedDate), orderCode);
|
||||
verify(stringRedisTemplate).execute(any(RedisScript.class), anyList(), eq("0"), anyString());
|
||||
}
|
||||
|
||||
// ==================== 后续调用(纯 Redis)测试 ====================
|
||||
|
||||
@Test
|
||||
void testGenerate_SubsequentCall_PureRedisIncrement() {
|
||||
// 首次调用:校准
|
||||
mockDbMaxSeq(0);
|
||||
mockLuaScript(1L);
|
||||
orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
// 第二次调用:纯 Redis
|
||||
when(valueOperations.increment(anyString())).thenReturn(2L);
|
||||
|
||||
String orderCode = orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
String expectedDate = LocalDate.now().format(DATE_FORMATTER);
|
||||
assertEquals(String.format("CLEAN-%s-0002", expectedDate), orderCode);
|
||||
// Lua 脚本只调用一次
|
||||
verify(stringRedisTemplate, times(1)).execute(any(RedisScript.class), anyList(), anyString(), anyString());
|
||||
verify(valueOperations, times(1)).increment(anyString());
|
||||
}
|
||||
|
||||
// ==================== 不同业务类型独立计数 ====================
|
||||
|
||||
@Test
|
||||
void testGenerate_DifferentBusinessTypes_IndependentCalibration() {
|
||||
mockDbMaxSeq(0);
|
||||
mockLuaScript(1L);
|
||||
|
||||
String cleanCode = orderCodeGenerator.generate("CLEAN");
|
||||
String securityCode = orderCodeGenerator.generate("SECURITY");
|
||||
|
||||
assertTrue(cleanCode.startsWith("CLEAN-"));
|
||||
assertTrue(securityCode.startsWith("SECURITY-"));
|
||||
// 各业务类型各校准一次
|
||||
verify(stringRedisTemplate, times(2)).execute(any(RedisScript.class), anyList(), anyString(), anyString());
|
||||
}
|
||||
|
||||
// ==================== 异常处理测试 ====================
|
||||
|
||||
@Test
|
||||
void testGenerate_NullBusinessType_ThrowsException() {
|
||||
assertThrows(IllegalArgumentException.class, () -> orderCodeGenerator.generate(null));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_EmptyBusinessType_ThrowsException() {
|
||||
assertThrows(IllegalArgumentException.class, () -> orderCodeGenerator.generate(""));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_DatabaseError_ThrowsException() {
|
||||
when(jdbcTemplate.queryForObject(anyString(), eq(Integer.class), anyInt(), anyString()))
|
||||
.thenThrow(new RuntimeException("DB connection failed"));
|
||||
|
||||
assertThrows(RuntimeException.class, () -> orderCodeGenerator.generate("CLEAN"));
|
||||
}
|
||||
|
||||
// ==================== 格式验证 ====================
|
||||
|
||||
@Test
|
||||
void testGenerate_FormatConsistency() {
|
||||
mockDbMaxSeq(0);
|
||||
mockLuaScript(1L);
|
||||
|
||||
String orderCode = orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
String[] parts = orderCode.split("-");
|
||||
assertEquals(3, parts.length);
|
||||
assertEquals("CLEAN", parts[0]);
|
||||
assertEquals(8, parts[1].length());
|
||||
assertTrue(parts[1].matches("\\d{8}"));
|
||||
assertEquals(4, parts[2].length());
|
||||
assertTrue(parts[2].matches("\\d{4}"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGenerate_LargeSequenceNumber() {
|
||||
mockDbMaxSeq(9998);
|
||||
mockLuaScript(9999L);
|
||||
|
||||
String orderCode = orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
String expectedDate = LocalDate.now().format(DATE_FORMATTER);
|
||||
assertEquals(String.format("CLEAN-%s-9999", expectedDate), orderCode);
|
||||
}
|
||||
|
||||
// ==================== 辅助方法测试 ====================
|
||||
|
||||
@Test
|
||||
void testGetCurrentSeq_NoExistingRecords_ReturnsZero() {
|
||||
when(valueOperations.get(anyString())).thenReturn(null);
|
||||
assertEquals(0, orderCodeGenerator.getCurrentSeq("CLEAN"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testGetCurrentSeq_ExistingRecords_ReturnsValue() {
|
||||
when(valueOperations.get(anyString())).thenReturn("5");
|
||||
assertEquals(5, orderCodeGenerator.getCurrentSeq("CLEAN"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testReset_DeletesKeyAndClearsCalibration() {
|
||||
// 先校准
|
||||
mockDbMaxSeq(0);
|
||||
mockLuaScript(1L);
|
||||
orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
// 重置
|
||||
orderCodeGenerator.reset("CLEAN");
|
||||
|
||||
// 再次 generate 应重新校准
|
||||
orderCodeGenerator.generate("CLEAN");
|
||||
|
||||
// Lua 脚本被调用 2 次
|
||||
verify(stringRedisTemplate, times(2)).execute(any(RedisScript.class), anyList(), anyString(), anyString());
|
||||
}
|
||||
|
||||
// ==================== SQL 安全测试 ====================
|
||||
|
||||
@Test
|
||||
void testGetMaxSeqFromDatabase_EscapesLikeWildcards() {
|
||||
mockDbMaxSeq(null);
|
||||
|
||||
orderCodeGenerator.getMaxSeqFromDatabase("CLEAN%TEST", "20260413");
|
||||
|
||||
verify(jdbcTemplate).queryForObject(
|
||||
anyString(),
|
||||
eq(Integer.class),
|
||||
anyInt(),
|
||||
eq("CLEAN\\%TEST-20260413-%")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
package com.viewsh.module.ops.service.area;
|
||||
|
||||
import com.viewsh.framework.common.pojo.CommonResult;
|
||||
import com.viewsh.module.iot.api.device.IotDeviceQueryApi;
|
||||
import com.viewsh.module.iot.api.device.dto.IotDeviceSimpleRespDTO;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsAreaDeviceRelationDO;
|
||||
import com.viewsh.module.ops.dal.dataobject.area.OpsBusAreaDO;
|
||||
import com.viewsh.module.ops.dal.mysql.area.OpsAreaDeviceRelationMapper;
|
||||
@@ -15,6 +18,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.InjectMocks;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
@@ -43,14 +47,20 @@ class AreaDeviceRelationServiceTest {
|
||||
@Mock
|
||||
private OpsAreaDeviceRelationMapper opsAreaDeviceRelationMapper;
|
||||
|
||||
@Mock
|
||||
private OpsBusAreaMapper opsBusAreaMapper;
|
||||
|
||||
@Mock
|
||||
private AreaDeviceService areaDeviceService;
|
||||
|
||||
@InjectMocks
|
||||
private AreaDeviceRelationServiceImpl areaDeviceRelationService;
|
||||
@Mock
|
||||
private OpsBusAreaMapper opsBusAreaMapper;
|
||||
|
||||
@Mock
|
||||
private AreaDeviceService areaDeviceService;
|
||||
|
||||
@Mock
|
||||
private ApplicationEventPublisher eventPublisher;
|
||||
|
||||
@Mock
|
||||
private IotDeviceQueryApi iotDeviceQueryApi;
|
||||
|
||||
@InjectMocks
|
||||
private AreaDeviceRelationServiceImpl areaDeviceRelationService;
|
||||
|
||||
private OpsBusAreaDO testArea;
|
||||
private OpsAreaDeviceRelationDO testRelation;
|
||||
@@ -121,12 +131,22 @@ class AreaDeviceRelationServiceTest {
|
||||
return 1;
|
||||
});
|
||||
|
||||
// bindDevice 内部会调 IoT 接口阻断式校验设备存在性
|
||||
IotDeviceSimpleRespDTO iotDevice = new IotDeviceSimpleRespDTO();
|
||||
iotDevice.setId(50001L);
|
||||
iotDevice.setDeviceName("TRAFFIC_COUNTER_001");
|
||||
iotDevice.setProductId(10L);
|
||||
iotDevice.setProductKey("traffic_counter_v1");
|
||||
when(iotDeviceQueryApi.getDevice(50001L)).thenReturn(CommonResult.success(iotDevice));
|
||||
|
||||
// When
|
||||
Long relationId = areaDeviceRelationService.bindDevice(bindReq);
|
||||
|
||||
// Then
|
||||
assertNotNull(relationId);
|
||||
verify(opsAreaDeviceRelationMapper, times(1)).insert(any(OpsAreaDeviceRelationDO.class));
|
||||
// 验证绑定成功后发布事件,供条线监听器回填 Redis
|
||||
verify(eventPublisher, times(1)).publishEvent(any(com.viewsh.module.ops.service.area.event.AreaDeviceBoundEvent.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -225,6 +245,8 @@ class AreaDeviceRelationServiceTest {
|
||||
// Then
|
||||
assertTrue(result);
|
||||
verify(opsAreaDeviceRelationMapper, times(1)).deleteById(1L);
|
||||
// 验证解绑后发布事件,供条线监听器清理 Redis
|
||||
verify(eventPublisher, times(1)).publishEvent(any(com.viewsh.module.ops.service.area.event.AreaDeviceUnboundEvent.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -238,6 +260,8 @@ class AreaDeviceRelationServiceTest {
|
||||
// Then
|
||||
assertFalse(result); // 第一次就返回false
|
||||
verify(opsAreaDeviceRelationMapper, never()).deleteById(anyLong());
|
||||
// 不存在的关联不应触发事件
|
||||
verify(eventPublisher, never()).publishEvent(any());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -77,13 +77,21 @@ class OrderQueueServiceEnhancedTest {
|
||||
when(orderQueueMapper.selectListByUserIdAndStatus(userId, OrderQueueStatusEnum.WAITING.getStatus()))
|
||||
.thenReturn(List.of(olderFarTask, newerNearTask));
|
||||
when(orderQueueMapper.selectCurrentExecutingByUserId(userId)).thenReturn(currentTask);
|
||||
when(orderQueueMapper.selectListByUserId(userId)).thenReturn(List.of(olderFarTask, newerNearTask, currentTask));
|
||||
// syncUserQueueToRedis 走 selectActiveListByUserId(Bug#6),只返回活跃态
|
||||
when(orderQueueMapper.selectActiveListByUserId(userId))
|
||||
.thenReturn(List.of(olderFarTask, newerNearTask, currentTask));
|
||||
// resolveOrderAreaId 仍单条 selectById(PROCESSING 工单的 area)
|
||||
when(orderMapper.selectById(900L)).thenReturn(OpsOrderDO.builder().id(900L).areaId(501L).build());
|
||||
when(orderMapper.selectById(101L)).thenReturn(OpsOrderDO.builder().id(101L).areaId(503L).build());
|
||||
when(orderMapper.selectById(102L)).thenReturn(OpsOrderDO.builder().id(102L).areaId(502L).build());
|
||||
// WAITING 工单批量加载
|
||||
when(orderMapper.selectBatchIds(org.mockito.ArgumentMatchers.anyCollection()))
|
||||
.thenReturn(List.of(
|
||||
OpsOrderDO.builder().id(101L).areaId(503L).build(),
|
||||
OpsOrderDO.builder().id(102L).areaId(502L).build()));
|
||||
when(areaMapper.selectById(501L)).thenReturn(OpsBusAreaDO.builder().id(501L).floorNo(5).build());
|
||||
when(areaMapper.selectById(502L)).thenReturn(OpsBusAreaDO.builder().id(502L).floorNo(6).build());
|
||||
when(areaMapper.selectById(503L)).thenReturn(OpsBusAreaDO.builder().id(503L).floorNo(8).build());
|
||||
when(areaMapper.selectBatchIds(org.mockito.ArgumentMatchers.anyCollection()))
|
||||
.thenReturn(List.of(
|
||||
OpsBusAreaDO.builder().id(503L).floorNo(8).build(),
|
||||
OpsBusAreaDO.builder().id(502L).floorNo(6).build()));
|
||||
when(orderQueueMapper.updateById(any(OpsOrderQueueDO.class))).thenReturn(1);
|
||||
|
||||
List<OrderQueueDTO> rebuiltTasks = orderQueueService.rebuildWaitingTasksByUserId(userId, null);
|
||||
|
||||
@@ -0,0 +1,131 @@
|
||||
package com.viewsh.module.ops.service.queue;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
/**
|
||||
* 楼层权重修复的补充测试(commit a5f916c)。
|
||||
* <p>
|
||||
* 与 {@link QueueScoreCalculatorTest}(基础行为)互补,覆盖这次改动的三个关键不变量:
|
||||
* <ol>
|
||||
* <li><b>G 强楼层优先</b>:FLOOR_WEIGHT=100,10 层封顶 1000 > aging 封顶 720,
|
||||
* 保证等满 4 小时的任务也不会反超近楼层任务</li>
|
||||
* <li><b>B 语义对称</b>:base 或 target 任一缺失 → floorScore=0,不再有"有 base 无 target → +600"罚分</li>
|
||||
* <li><b>floor 封顶</b>:楼层差超过 MAX_FLOOR_DIFF=10 时按 10 计算</li>
|
||||
* </ol>
|
||||
*/
|
||||
class QueueScoreCalculatorEnhancedTest {
|
||||
|
||||
private final QueueScoreCalculator calculator = new QueueScoreCalculator();
|
||||
|
||||
@Test
|
||||
void strongFloorPriority_farLongWaitedTaskShouldNotOvertakeNearJustInTask() {
|
||||
// G: 同 P1 优先级下,"远楼层+等满 4 小时" 不应反超 "近楼层+刚入队"。
|
||||
// 近刚入队: priority=1500, floor=0, aging=0 → 1500
|
||||
// 远等满: priority=1500, floor=10*100=1000, aging=720 → 1780
|
||||
// near.score (1500) < far.score (1780) → near 先派发,符合"强楼层优先"
|
||||
LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0);
|
||||
|
||||
QueueScoreResult nearJustIn = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(3).targetFloorNo(3)
|
||||
.enqueueTime(now).now(now)
|
||||
.build());
|
||||
|
||||
QueueScoreResult farLongWaited = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(3).targetFloorNo(13) // diff=10(封顶)
|
||||
.enqueueTime(now.minusHours(4)) // aging 封顶 240 min
|
||||
.now(now)
|
||||
.build());
|
||||
|
||||
assertTrue(nearJustIn.getTotalScore() < farLongWaited.getTotalScore(),
|
||||
"near=" + nearJustIn.getTotalScore() + " far=" + farLongWaited.getTotalScore()
|
||||
+ ":远楼层即便等满也不应反超近楼层");
|
||||
}
|
||||
|
||||
@Test
|
||||
void symmetricNullHandling_baseFloorMissing_shouldGiveZeroFloorScore() {
|
||||
// B: baseFloor=null(执行人位置未知),不应被动扣分
|
||||
LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0);
|
||||
QueueScoreResult result = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(null).targetFloorNo(5)
|
||||
.enqueueTime(now).now(now)
|
||||
.build());
|
||||
|
||||
// priorityScore=1500, floorScore=0, aging=0 → 1500
|
||||
assertEquals(1500.0, result.getTotalScore(), 0.001);
|
||||
assertNull(result.getFloorDiff(), "floorDiff 应为 null,表示楼层信息不完整");
|
||||
}
|
||||
|
||||
@Test
|
||||
void symmetricNullHandling_targetFloorMissing_shouldGiveZeroFloorScore() {
|
||||
// B: targetFloor=null(工单区域未登记楼层)同样应得 floorScore=0——与 baseFloor 缺失等价
|
||||
LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0);
|
||||
QueueScoreResult result = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(5).targetFloorNo(null)
|
||||
.enqueueTime(now).now(now)
|
||||
.build());
|
||||
|
||||
assertEquals(1500.0, result.getTotalScore(), 0.001);
|
||||
assertNull(result.getFloorDiff());
|
||||
}
|
||||
|
||||
@Test
|
||||
void symmetricNullHandling_bothTasksWithPartialFloor_shouldSortByAgingOnly() {
|
||||
// 关键回归:旧逻辑会给"有 base 无 target"+600 罚分,导致同一工单在不同 base 场景排序不单调。
|
||||
// 现在两个任务楼层信息同等"不完整"应仅靠 aging 排序,"等得久"的排前。
|
||||
LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0);
|
||||
|
||||
QueueScoreResult newerNoTarget = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(5).targetFloorNo(null)
|
||||
.enqueueTime(now.minusMinutes(5))
|
||||
.now(now)
|
||||
.build());
|
||||
|
||||
QueueScoreResult olderNoTarget = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(5).targetFloorNo(null)
|
||||
.enqueueTime(now.minusMinutes(80))
|
||||
.now(now)
|
||||
.build());
|
||||
|
||||
// 两者 floorScore 都 =0,aging 越大分越低 → older 先派
|
||||
assertTrue(olderNoTarget.getTotalScore() < newerNoTarget.getTotalScore());
|
||||
}
|
||||
|
||||
@Test
|
||||
void floorCappedAtMaxFloorDiff_evenWhenActualDiffMuchLarger() {
|
||||
// 楼层差 25 应按 10 封顶:floorScore = 10 × 100 = 1000
|
||||
LocalDateTime now = LocalDateTime.of(2026, 4, 20, 12, 0);
|
||||
QueueScoreResult result = calculator.calculate(QueueScoreContext.builder()
|
||||
.priority(1)
|
||||
.baseFloorNo(0).targetFloorNo(25)
|
||||
.enqueueTime(now).now(now)
|
||||
.build());
|
||||
|
||||
assertEquals(25, result.getFloorDiff(), "floorDiff 透传原始差值,便于诊断");
|
||||
// priorityScore=1500 + floorScore(capped)=1000 - aging=0 = 2500
|
||||
assertEquals(2500.0, result.getTotalScore(), 0.001,
|
||||
"超过 10 层差应按 10 层封顶计算分数");
|
||||
}
|
||||
|
||||
@Test
|
||||
void floorWeightShouldDominateAgingCap() {
|
||||
// 锁死这次改动的核心不变量:FLOOR_WEIGHT * MAX_FLOOR_DIFF > AGING_WEIGHT * MAX_AGING_MINUTES
|
||||
// 即 100 * 10 = 1000 > 3 * 240 = 720
|
||||
int floorMax = QueueScoreCalculator.FLOOR_WEIGHT * QueueScoreCalculator.MAX_FLOOR_DIFF;
|
||||
int agingMax = QueueScoreCalculator.AGING_WEIGHT * QueueScoreCalculator.MAX_AGING_MINUTES;
|
||||
assertTrue(floorMax > agingMax,
|
||||
"权重失衡:floor 封顶 " + floorMax + " 不再压倒 aging 封顶 " + agingMax
|
||||
+ ",会导致等得久的远楼层任务反超近楼层");
|
||||
}
|
||||
}
|
||||
@@ -146,6 +146,12 @@ viewsh:
|
||||
connect-timeout: 5000
|
||||
read-timeout: 10000
|
||||
max-retry: 2
|
||||
clean:
|
||||
auto-cancel:
|
||||
# 保洁工单 update_time 距今超过此小时数视为卡死,由 CleanOrderAutoCancelJob 自动取消
|
||||
timeout-hours: 12
|
||||
# 单次扫描/取消上限,防止事件风暴;超出的工单留给下一轮 cron
|
||||
batch-size: 200
|
||||
# API 签名配置:外部系统调用开放接口时使用(如安保工单的告警系统)
|
||||
signature:
|
||||
apps:
|
||||
|
||||
Reference in New Issue
Block a user