17 changed files with 292 additions and 380 deletions
@ -1,316 +1,109 @@ |
|||||
package org.nl.acs.opc; |
package org.nl.acs.opc; |
||||
|
|
||||
import cn.hutool.core.util.StrUtil; |
import cn.hutool.core.util.ObjectUtil; |
||||
import com.alibaba.fastjson.JSON; |
import org.dromara.dynamictp.core.support.ThreadPoolBuilder; |
||||
import lombok.extern.slf4j.Slf4j; |
|
||||
import org.apache.lucene.util.NamedThreadFactory; |
|
||||
import org.nl.acs.AcsConfig; |
|
||||
import org.nl.acs.auto.run.AbstractAutoRunnable; |
import org.nl.acs.auto.run.AbstractAutoRunnable; |
||||
import org.nl.acs.opc.service.dto.OpcServerManageDto; |
import org.nl.acs.opc.service.dto.OpcServerManageDto; |
||||
import org.nl.acs.udw.UnifiedDataAccessor; |
import org.nl.config.thread.TheadFactoryName; |
||||
import org.nl.acs.udw.UnifiedDataAccessorFactory; |
|
||||
import org.nl.acs.udw.UnifiedDataAppService; |
|
||||
import org.nl.common.enums.LogTypeEnum; |
|
||||
import org.nl.config.SpringContextHolder; |
|
||||
import org.nl.system.service.lucene.dto.LuceneLogDto; |
|
||||
import org.nl.system.service.param.ISysParamService; |
|
||||
import org.openscada.opc.lib.da.Group; |
|
||||
import org.openscada.opc.lib.da.Item; |
|
||||
import org.openscada.opc.lib.da.ItemState; |
|
||||
import org.springframework.beans.factory.annotation.Autowired; |
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
import org.springframework.stereotype.Component; |
||||
|
|
||||
import java.util.*; |
import javax.annotation.Resource; |
||||
import java.util.Map.Entry; |
import java.util.Iterator; |
||||
import java.util.concurrent.*; |
import java.util.List; |
||||
import java.util.regex.Pattern; |
import java.util.Map; |
||||
|
import java.util.Set; |
||||
|
import java.util.concurrent.ExecutorService; |
||||
|
import java.util.concurrent.Executors; |
||||
|
import java.util.concurrent.ThreadPoolExecutor; |
||||
|
import java.util.concurrent.TimeUnit; |
||||
|
|
||||
|
import static org.dromara.dynamictp.common.em.QueueTypeEnum.MEMORY_SAFE_LINKED_BLOCKING_QUEUE; |
||||
|
|
||||
/** |
/** |
||||
* OPC设备同步启动 |
* OPC设备同步启动 |
||||
|
* @author 20220102CG\noblelift |
||||
*/ |
*/ |
||||
@Component |
@Component |
||||
@Slf4j |
|
||||
public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable { |
public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable { |
||||
static boolean isRun = true; |
|
||||
|
public static boolean isRun = false; |
||||
|
ExecutorService executorService = ThreadPoolBuilder.newBuilder() |
||||
|
.threadPoolName("deviceOpc_thread") |
||||
|
.threadFactory("deviceOpc_thread") |
||||
|
.corePoolSize(80) |
||||
|
.maximumPoolSize(100) |
||||
|
.keepAliveTime(40) |
||||
|
.timeUnit(TimeUnit.SECONDS) |
||||
|
.workQueue(MEMORY_SAFE_LINKED_BLOCKING_QUEUE.getName(), 2000) |
||||
|
.buildDynamic(); |
||||
|
|
||||
@Autowired |
@Autowired |
||||
private DeviceAppService deviceAppService; |
private DeviceAppService deviceAppService; |
||||
@Autowired |
@Autowired |
||||
private OpcServerManageService opcServerManageService; |
private OpcServerManageService opcServerManageService; |
||||
// @Autowired
|
|
||||
// LuceneExecuteLogService lucene;
|
|
||||
|
|
||||
static ExecutorService executorService; |
|
||||
public static Map<String, OpcServerManageDto> opcServersConfig; |
|
||||
public static Map<String, OpcItemDto> itemCodeOpcItemDtoMapping = new ConcurrentHashMap(); |
|
||||
|
|
||||
static boolean canRefreshOpcEntity = true; |
|
||||
private long lastRefreshOpcEntityTime; |
|
||||
static UnifiedDataAccessor udw; |
|
||||
private static Map<String, Boolean> canReadOpcValues; |
|
||||
private static volatile Map<String, OpcEntity> opcCodeOpcEntityMapping; |
|
||||
|
|
||||
public DeviceOpcSynchronizeAutoRun() { |
|
||||
this.lastRefreshOpcEntityTime = 0L; |
|
||||
} |
|
||||
|
|
||||
|
@Override |
||||
public String getCode() { |
public String getCode() { |
||||
return DeviceOpcSynchronizeAutoRun.class.getSimpleName(); |
return DeviceOpcSynchronizeAutoRun.class.getSimpleName(); |
||||
} |
} |
||||
|
|
||||
|
@Override |
||||
public String getName() { |
public String getName() { |
||||
return "opc设备同步器"; |
return "opc设备同步器"; |
||||
} |
} |
||||
|
|
||||
static Group getGroup(String opcCode) throws Exception { |
@Override |
||||
OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); |
public void autoRun() throws Exception { |
||||
return opcServerService.getServer(opcCode); |
{ |
||||
} |
//Thread.sleep(10000L);
|
||||
|
isRun = true; |
||||
static void submitTimeLimitTask(Runnable runnable, String opcCode) { |
|
||||
CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executorService); |
Map<String, OpcServerManageDto> servers = this.opcServerManageService.queryAllServerMap(); |
||||
|
Map<String, List<List<OpcItemDto>>> pros; |
||||
// try {
|
do { |
||||
// future.get(10L, TimeUnit.SECONDS);
|
Thread.sleep(1000L); |
||||
// } catch (InterruptedException var9) {
|
pros = this.deviceAppService.findAllFormatProtocolFromDriver(); |
||||
// Thread.currentThread().interrupt();
|
} while (ObjectUtil.isEmpty(pros)); |
||||
// } catch (ExecutionException var10) {
|
Set<String> keys = pros.keySet(); |
||||
// var10.printStackTrace();
|
Iterator var4 = keys.iterator(); |
||||
// } catch (TimeoutException var11) {
|
//代码执行一次
|
||||
// itemCodeOpcItemDtoMapping.keySet().forEach((key) -> {
|
while (var4.hasNext()) { |
||||
// udw.setValue(key, (Object) null);
|
String key = (String) var4.next(); |
||||
// });
|
List<List<OpcItemDto>> list = (List) pros.get(key); |
||||
// canReadOpcValues = new ConcurrentHashMap<>();
|
OpcServerManageDto opcServer = (OpcServerManageDto) servers.get(key); |
||||
// System.out.println("opc设备同步器 任务执行超时,取消任务...");
|
Iterator var8 = list.iterator(); |
||||
// future.cancel(true);
|
while (var8.hasNext()) { |
||||
// } finally {
|
List<OpcItemDto> groupProtols = (List) var8.next(); |
||||
// canRefreshOpcEntity = true;
|
DeviceOpcProtocolRunable runable = new DeviceOpcProtocolRunable(); |
||||
// if (opcCode != null) {
|
runable.setProtocols(groupProtols); |
||||
// canReadOpcValues.put(opcCode, true);
|
runable.setOpcServer(opcServer); |
||||
// }
|
this.executorService.submit(runable); |
||||
//
|
|
||||
// }
|
|
||||
} |
|
||||
|
|
||||
private ExecutorService createThreadPool() { |
|
||||
ThreadPoolExecutor executor = new ThreadPoolExecutor(32, 32, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("opc-sync")); |
|
||||
executor.allowCoreThreadTimeOut(true); |
|
||||
return executor; |
|
||||
} |
|
||||
|
|
||||
public void autoRun() { |
|
||||
OpcStartTag.is_run = true; |
|
||||
opcServersConfig = this.opcServerManageService.queryAllServerMap(); |
|
||||
executorService = this.createThreadPool(); |
|
||||
opcCodeOpcEntityMapping = new ConcurrentHashMap(); |
|
||||
itemCodeOpcItemDtoMapping.keySet().forEach((key) -> { |
|
||||
udw.setValue(key, (Object) null); |
|
||||
}); |
|
||||
canRefreshOpcEntity = true; |
|
||||
canReadOpcValues.clear(); |
|
||||
|
|
||||
while (true) { |
|
||||
this.refreshOpcEntity(); |
|
||||
Iterator var1 = opcServersConfig.keySet().iterator(); |
|
||||
|
|
||||
while (var1.hasNext()) { |
|
||||
String opcCode = (String) var1.next(); |
|
||||
submitTimeLimitTask(() -> { |
|
||||
boolean in = false; |
|
||||
try { |
|
||||
if (canReadOpcValues.computeIfAbsent(opcCode, (key) -> true)) { |
|
||||
in = true; |
|
||||
canReadOpcValues.put(opcCode, false); |
|
||||
this.readOpcValues(opcCode); |
|
||||
} |
|
||||
} catch (Exception var3) { |
|
||||
var3.printStackTrace(); |
|
||||
} finally { |
|
||||
canRefreshOpcEntity = true; |
|
||||
if (opcCode != null && in) { |
|
||||
canReadOpcValues.put(opcCode, true); |
|
||||
} |
|
||||
} |
|
||||
}, opcCode); |
|
||||
} |
|
||||
|
|
||||
ThreadUtl.sleep((long) OpcConfig.synchronized_millisecond); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
private void readOpcValues(String opcCode) throws Exception { |
|
||||
synchronized (opcCode.intern()) { |
|
||||
OpcEntity opcEntity = (OpcEntity) opcCodeOpcEntityMapping.get(opcCode); |
|
||||
if (opcEntity != null) { |
|
||||
if (opcEntity.getItems().size() != 0) { |
|
||||
long begin = System.currentTimeMillis(); |
|
||||
if (log.isTraceEnabled()) { |
|
||||
log.trace("opc {} 开始计时{}", opcCode, begin); |
|
||||
} |
|
||||
|
|
||||
new HashMap(); |
|
||||
|
|
||||
Map<Item, ItemState> itemStatus; |
|
||||
try { |
|
||||
itemStatus = opcEntity.readAll(); |
|
||||
} catch (Exception var15) { |
|
||||
itemStatus = opcEntity.readDividually(); |
|
||||
} |
|
||||
|
|
||||
long end = System.currentTimeMillis(); |
|
||||
long duration = end - begin; |
|
||||
if (log.isTraceEnabled()) { |
|
||||
log.trace("opc {} 读取耗时:{}", opcCode, duration); |
|
||||
} |
|
||||
|
|
||||
if (duration > 1000L) { |
|
||||
log.warn("opc {} 读取超时 : {}", opcCode, duration); |
|
||||
} |
|
||||
|
|
||||
// boolean allNull = itemStatus.entrySet().stream().map((map) -> {
|
|
||||
// return OpcUtl.getValue((Item)map.getKey(), (ItemState)map.getValue());
|
|
||||
// }).allMatch(Objects::isNull);
|
|
||||
// if (allNull) {
|
|
||||
// opcEntity.getItems().clear();
|
|
||||
// }
|
|
||||
|
|
||||
UnifiedDataAccessor udw = opcEntity.getUdw(); |
|
||||
|
|
||||
|
|
||||
Set<Item> items = itemStatus.keySet(); |
|
||||
Iterator var18 = items.iterator(); |
|
||||
|
|
||||
while (var18.hasNext()) { |
|
||||
Item item = (Item) var18.next(); |
|
||||
ItemState itemState = (ItemState) itemStatus.get(item); |
|
||||
Object nowValue = OpcUtl.getValue(item, itemState); |
|
||||
String itemId = item.getId(); |
|
||||
Object historyValue = udw.getValue(itemId); |
|
||||
if (!ObjectUtl.isEquals(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && historyValue != null) { |
|
||||
log.warn("opc 值不健康 item: {}, 状态: {}", itemId, itemState.getQuality()); |
|
||||
} |
|
||||
if (!UnifiedDataAppService.isEquals(nowValue, historyValue)) { |
|
||||
OpcItemDto itemDto = (OpcItemDto) itemCodeOpcItemDtoMapping.get(itemId); |
|
||||
if (true) { |
|
||||
this.logItemChanged(itemId, udw, nowValue, itemDto); |
|
||||
} |
|
||||
udw.setValue(itemId, nowValue); |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
|
|
||||
} |
} |
||||
} |
} |
||||
} |
|
||||
} |
|
||||
|
|
||||
private void refreshOpcEntity() { |
|
||||
if (canRefreshOpcEntity) { |
|
||||
canRefreshOpcEntity = false; |
|
||||
long now = System.currentTimeMillis(); |
|
||||
if (now - this.lastRefreshOpcEntityTime >= 20000L) { |
|
||||
this.lastRefreshOpcEntityTime = now; |
|
||||
submitTimeLimitTask(() -> { |
|
||||
try { |
|
||||
Map<String, List<List<OpcItemDto>>> protocol = this.deviceAppService.findAllFormatProtocolFromDriver(); |
|
||||
Iterator var2 = protocol.entrySet().iterator(); |
|
||||
|
|
||||
while (var2.hasNext()) { |
// 同步无光电设备信号
|
||||
Entry<String, List<List<OpcItemDto>>> stringListEntry = (Entry) var2.next(); |
//Map<String, List<List<OpcItemDto>>> pros1 = this.deviceAppService.findAllFormatProtocolFromDriver();
|
||||
String opcCode = (String) stringListEntry.getKey(); |
//List<DeviceDriver> opcDrivers = this.deviceAppService.findDeviceDriver(DeviceDriver.class);
|
||||
List<List<OpcItemDto>> opcItemDtos = (List) stringListEntry.getValue(); |
|
||||
((OpcEntity) opcCodeOpcEntityMapping.computeIfAbsent(opcCode, OpcEntity::new)).reload(opcItemDtos); |
|
||||
} |
|
||||
} catch (Exception var6) { |
|
||||
var6.printStackTrace(); |
|
||||
} finally { |
|
||||
canRefreshOpcEntity = true; |
|
||||
} |
|
||||
|
|
||||
}, (String) null); |
while (true) { |
||||
|
Thread.sleep(3000L); |
||||
} |
} |
||||
} |
} |
||||
} |
} |
||||
|
|
||||
private void logMessage(String errorMessage) { |
@Override |
||||
try { |
|
||||
// issueLogger.setResource(OpcConfig.resource_code, OpcConfig.resource_name).setError(StringUtl.getString(100), "设备同步通信异常").log(errorMessage, new Object[0]);
|
|
||||
// businessLogger.setResource(OpcConfig.resource_code, OpcConfig.resource_name).setError(StringUtl.getString(100), "设备同步通信异常").log(errorMessage, new Object[0]);
|
|
||||
} catch (Exception var5) { |
|
||||
var5.printStackTrace(); |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
|
|
||||
public void after() { |
public void after() { |
||||
OpcStartTag.is_run = false; |
isRun = false; |
||||
opcCodeOpcEntityMapping.values().forEach((opcEntity) -> { |
this.executorService.shutdownNow(); |
||||
opcEntity.cleanUdwCache(); |
this.executorService = ThreadPoolBuilder.newBuilder() |
||||
OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); |
.threadPoolName("deviceOpc_thread") |
||||
opcServerService.cleanGroups(opcEntity.getOpcCode()); |
.threadFactory("deviceOpc_thread") |
||||
}); |
.corePoolSize(80) |
||||
opcCodeOpcEntityMapping = new ConcurrentHashMap(); |
.maximumPoolSize(100) |
||||
itemCodeOpcItemDtoMapping = new ConcurrentHashMap(); |
.keepAliveTime(40) |
||||
executorService.shutdownNow(); |
.timeUnit(TimeUnit.SECONDS) |
||||
} |
.workQueue(MEMORY_SAFE_LINKED_BLOCKING_QUEUE.getName(), 2000) |
||||
|
.buildDynamic(); |
||||
private void logItemChanged(String itemId, UnifiedDataAccessor accessor_value, Object value, OpcItemDto itemDto) { |
|
||||
ISysParamService paramService = SpringContextHolder.getBean(ISysParamService.class); |
|
||||
Object his = accessor_value.getValue(itemId); |
|
||||
List<String> relate_items = itemDto.getRelate_items(); |
|
||||
if (relate_items != null && !relate_items.isEmpty()) { |
|
||||
StringBuilder sb = new StringBuilder(); |
|
||||
Iterator var8 = relate_items.iterator(); |
|
||||
|
|
||||
while (var8.hasNext()) { |
|
||||
String relate = (String) var8.next(); |
|
||||
Object obj = accessor_value.getValue(relate); |
|
||||
sb.append("key:" + relate + "value: " + obj + ";"); |
|
||||
} |
|
||||
if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) { |
|
||||
// 存在上次点位值为null情况 则不记录日志
|
|
||||
if(!(his instanceof Float) && !(value instanceof Float)){ |
|
||||
LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), |
|
||||
String.valueOf(his), String.valueOf(value)); |
|
||||
luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc()); |
|
||||
String logLevel = paramService.findByCode(AcsConfig.LOGLEVEL).getValue(); |
|
||||
if(StrUtil.isNotEmpty(logLevel) && isNumeric(logLevel) && (luceneLogDto.getLog_level() >= Integer.parseInt(logLevel))){ |
|
||||
log.info("{}", JSON.toJSONString(luceneLogDto)); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
} else { |
|
||||
|
|
||||
// if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) {
|
|
||||
// if(!(his instanceof Float) && !(value instanceof Float)){
|
|
||||
// LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(), itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1),
|
|
||||
// String.valueOf(his), String.valueOf(value));
|
|
||||
// luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc());
|
|
||||
// log.info("{}", JSON.toJSONString(luceneLogDto));
|
|
||||
// }
|
|
||||
// }
|
|
||||
|
|
||||
if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) { |
|
||||
if(!(his instanceof Float) && !(value instanceof Float)){ |
|
||||
LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), |
|
||||
String.valueOf(his), String.valueOf(value)); |
|
||||
luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc()); |
|
||||
String logLevel = paramService.findByCode(AcsConfig.LOGLEVEL).getValue(); |
|
||||
if(StrUtil.isNotEmpty(logLevel) && isNumeric(logLevel) && (luceneLogDto.getLog_level() >= Integer.parseInt(logLevel))){ |
|
||||
log.info("{}", JSON.toJSONString(luceneLogDto)); |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
} |
|
||||
|
|
||||
static { |
|
||||
udw = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); |
|
||||
canReadOpcValues = new ConcurrentHashMap(); |
|
||||
opcCodeOpcEntityMapping = new ConcurrentHashMap(); |
|
||||
} |
|
||||
|
|
||||
public static boolean isNumeric(String str) { |
|
||||
return Pattern.compile("^[0-9]+$").matcher(str).matches(); |
|
||||
} |
} |
||||
} |
} |
||||
|
@ -0,0 +1,34 @@ |
|||||
|
<?xml version="1.0" encoding="UTF-8"?> |
||||
|
<included> |
||||
|
<springProperty scope="context" name="logPath" source="logging.file.path" defaultValue="logs"/> |
||||
|
<property name="LOG_HOME" value="${logPath}"/> |
||||
|
<!--<define name="DEVICECODE" class="org.nl.common.logging.DeviceCodeDir"/>--> |
||||
|
<!-- 按照每天生成日志文件 --> |
||||
|
<appender name="FILE_11" class="ch.qos.logback.core.rolling.RollingFileAppender"> |
||||
|
<rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> |
||||
|
<!--日志文件输出的文件名--> |
||||
|
<FileNamePattern>${LOG_HOME}/connectorOrSorting/%d{yyyy-MM-dd}.%i.log</FileNamePattern> |
||||
|
<!--日志文件保留天数--> |
||||
|
<maxHistory>15</maxHistory> |
||||
|
<!--单个日志最大容量 至少10MB才能看得出来--> |
||||
|
<maxFileSize>200MB</maxFileSize> |
||||
|
<!--所有日志最多占多大容量--> |
||||
|
<totalSizeCap>2GB</totalSizeCap> |
||||
|
</rollingPolicy> |
||||
|
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> |
||||
|
<!--格式化输出:%d表示日期,%thread表示线程名,%-5level:级别从左显示5个字符宽度%msg:日志消息,%n是换行符--> |
||||
|
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> |
||||
|
<charset>${log.charset}</charset> |
||||
|
</encoder> |
||||
|
|
||||
|
</appender> |
||||
|
|
||||
|
<!-- <logger name="org.nl.start.Init" level="info" additivity="false"> |
||||
|
<appender-ref ref="FILE3"/> |
||||
|
</logger>--> |
||||
|
|
||||
|
<!-- 打印sql --> |
||||
|
<logger name="org.nl.wms.sch.task_manage.task.ConnectorUtil" level="info" additivity="false"> |
||||
|
<appender-ref ref="FILE_11"/> |
||||
|
</logger> |
||||
|
</included> |
Loading…
Reference in new issue