diff --git a/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java b/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java index fba634d..005603d 100644 --- a/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java +++ b/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/DeviceOpcSynchronizeAutoRun.java @@ -1,83 +1,317 @@ package org.nl.acs.opc; -import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSON; +import lombok.extern.slf4j.Slf4j; +import org.apache.lucene.util.NamedThreadFactory; +import org.nl.acs.AcsConfig; import org.nl.acs.auto.run.AbstractAutoRunnable; import org.nl.acs.opc.service.dto.OpcServerManageDto; +import org.nl.acs.udw.UnifiedDataAccessor; +import org.nl.acs.udw.UnifiedDataAccessorFactory; +import org.nl.acs.udw.UnifiedDataAppService; +import org.nl.common.enums.LogTypeEnum; +import org.nl.config.SpringContextHolder; +import org.nl.system.service.param.ISysParamService; +import org.openscada.opc.lib.da.Group; +import org.openscada.opc.lib.da.Item; +import org.openscada.opc.lib.da.ItemState; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.*; +import java.util.regex.Pattern; /** * OPC设备同步启动 */ @Component +@Slf4j public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable { - - public static boolean isRun = false; - ExecutorService executorService = Executors.newCachedThreadPool(); + static boolean isRun = true; @Autowired private DeviceAppService deviceAppService; @Autowired private OpcServerManageService opcServerManageService; +// @Autowired +// LuceneExecuteLogService lucene; + + static ExecutorService executorService; + public static Map opcServersConfig; + public static Map itemCodeOpcItemDtoMapping = new ConcurrentHashMap(); + + static boolean canRefreshOpcEntity = true; + private long lastRefreshOpcEntityTime; + static UnifiedDataAccessor udw; + private static Map canReadOpcValues; + private static volatile Map opcCodeOpcEntityMapping; + + public DeviceOpcSynchronizeAutoRun() { + this.lastRefreshOpcEntityTime = 0L; + } - @Override public String getCode() { return DeviceOpcSynchronizeAutoRun.class.getSimpleName(); } - @Override public String getName() { return "opc设备同步器"; } - @Override - public void autoRun() throws Exception { - { - isRun = true; - - Map servers = this.opcServerManageService.queryAllServerMap(); - Map>> pros; - do{ - Thread.sleep(1000L); - pros = this.deviceAppService.findAllFormatProtocolFromDriver(); - }while (ObjectUtil.isEmpty(pros)); - Set keys = pros.keySet(); - Iterator var4 = keys.iterator(); - //代码执行一次 - while (var4.hasNext()) { - String key = (String) var4.next(); - List> list = (List) pros.get(key); - OpcServerManageDto opcServer = (OpcServerManageDto) servers.get(key); - Iterator var8 = list.iterator(); - while (var8.hasNext()) { - List groupProtols = (List) var8.next(); - DeviceOpcProtocolRunable runable = new DeviceOpcProtocolRunable(); - runable.setProtocols(groupProtols); - runable.setOpcServer(opcServer); - this.executorService.submit(runable); + static Group getGroup(String opcCode) throws Exception { + OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); + return opcServerService.getServer(opcCode); + } + + static void submitTimeLimitTask(Runnable runnable, String opcCode) { + CompletableFuture future = CompletableFuture.runAsync(runnable, executorService); + +// try { +// future.get(10L, TimeUnit.SECONDS); +// } catch (InterruptedException var9) { +// Thread.currentThread().interrupt(); +// } catch (ExecutionException var10) { +// var10.printStackTrace(); +// } catch (TimeoutException var11) { +// itemCodeOpcItemDtoMapping.keySet().forEach((key) -> { +// udw.setValue(key, (Object) null); +// }); +// canReadOpcValues = new ConcurrentHashMap<>(); +// System.out.println("opc设备同步器 任务执行超时,取消任务..."); +// future.cancel(true); +// } finally { +// canRefreshOpcEntity = true; +// if (opcCode != null) { +// canReadOpcValues.put(opcCode, true); +// } +// +// } + } + + private ExecutorService createThreadPool() { + ThreadPoolExecutor executor = new ThreadPoolExecutor(32, 32, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("opc-sync")); + executor.allowCoreThreadTimeOut(true); + return executor; + } + + public void autoRun() { + OpcStartTag.is_run = true; + opcServersConfig = this.opcServerManageService.queryAllServerMap(); + executorService = this.createThreadPool(); + opcCodeOpcEntityMapping = new ConcurrentHashMap(); + itemCodeOpcItemDtoMapping.keySet().forEach((key) -> { + udw.setValue(key, (Object) null); + }); + canRefreshOpcEntity = true; + canReadOpcValues.clear(); + + while (true) { + this.refreshOpcEntity(); + Iterator var1 = opcServersConfig.keySet().iterator(); + + while (var1.hasNext()) { + String opcCode = (String) var1.next(); + submitTimeLimitTask(() -> { + boolean in = false; + try { + if (canReadOpcValues.computeIfAbsent(opcCode, (key) -> true)) { + in = true; + canReadOpcValues.put(opcCode, false); + this.readOpcValues(opcCode); + } + } catch (Exception var3) { + var3.printStackTrace(); + } finally { + canRefreshOpcEntity = true; + if (opcCode != null && in) { + canReadOpcValues.put(opcCode, true); + } + } + }, opcCode); + } + + ThreadUtl.sleep((long) OpcConfig.synchronized_millisecond); + } + } + + private void readOpcValues(String opcCode) throws Exception { + synchronized (opcCode.intern()) { + OpcEntity opcEntity = (OpcEntity) opcCodeOpcEntityMapping.get(opcCode); + if (opcEntity != null) { + if (opcEntity.getItems().size() != 0) { + long begin = System.currentTimeMillis(); + if (log.isTraceEnabled()) { + log.trace("opc {} 开始计时{}", opcCode, begin); + } + + new HashMap(); + + Map itemStatus; + try { + itemStatus = opcEntity.readAll(); + } catch (Exception var15) { + itemStatus = opcEntity.readDividually(); + } + + long end = System.currentTimeMillis(); + long duration = end - begin; + if (log.isTraceEnabled()) { + log.trace("opc {} 读取耗时:{}", opcCode, duration); + } + + if (duration > 1000L) { + log.warn("opc {} 读取超时 : {}", opcCode, duration); + } + +// boolean allNull = itemStatus.entrySet().stream().map((map) -> { +// return OpcUtl.getValue((Item)map.getKey(), (ItemState)map.getValue()); +// }).allMatch(Objects::isNull); +// if (allNull) { +// opcEntity.getItems().clear(); +// } + + UnifiedDataAccessor udw = opcEntity.getUdw(); + + + Set items = itemStatus.keySet(); + Iterator var18 = items.iterator(); + + while (var18.hasNext()) { + Item item = (Item) var18.next(); + ItemState itemState = (ItemState) itemStatus.get(item); + Object nowValue = OpcUtl.getValue(item, itemState); + String itemId = item.getId(); + Object historyValue = udw.getValue(itemId); + if (!ObjectUtl.isEquals(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && historyValue != null) { + log.warn("opc 值不健康 item: {}, 状态: {}", itemId, itemState.getQuality()); + } + if (!UnifiedDataAppService.isEquals(nowValue, historyValue)) { + OpcItemDto itemDto = (OpcItemDto) itemCodeOpcItemDtoMapping.get(itemId); + if (true) { + this.logItemChanged(itemId, udw, nowValue, itemDto); + } + udw.setValue(itemId, nowValue); + } + + } + } } + } + } + + private void refreshOpcEntity() { + if (canRefreshOpcEntity) { + canRefreshOpcEntity = false; + long now = System.currentTimeMillis(); + if (now - this.lastRefreshOpcEntityTime >= 20000L) { + this.lastRefreshOpcEntityTime = now; + submitTimeLimitTask(() -> { + try { + Map>> protocol = this.deviceAppService.findAllFormatProtocolFromDriver(); + Iterator var2 = protocol.entrySet().iterator(); - // 同步无光电设备信号 - //Map>> pros1 = this.deviceAppService.findAllFormatProtocolFromDriver(); - //List opcDrivers = this.deviceAppService.findDeviceDriver(DeviceDriver.class); + while (var2.hasNext()) { + Entry>> stringListEntry = (Entry) var2.next(); + String opcCode = (String) stringListEntry.getKey(); + List> opcItemDtos = (List) stringListEntry.getValue(); + ((OpcEntity) opcCodeOpcEntityMapping.computeIfAbsent(opcCode, OpcEntity::new)).reload(opcItemDtos); + } + } catch (Exception var6) { + var6.printStackTrace(); + } finally { + canRefreshOpcEntity = true; + } - while (true) { - Thread.sleep(3000L); + }, (String) null); } } } - @Override + private void logMessage(String errorMessage) { + try { +// issueLogger.setResource(OpcConfig.resource_code, OpcConfig.resource_name).setError(StringUtl.getString(100), "设备同步通信异常").log(errorMessage, new Object[0]); +// businessLogger.setResource(OpcConfig.resource_code, OpcConfig.resource_name).setError(StringUtl.getString(100), "设备同步通信异常").log(errorMessage, new Object[0]); + } catch (Exception var5) { + var5.printStackTrace(); + } + + } + public void after() { - isRun = false; - this.executorService.shutdownNow(); - this.executorService = Executors.newCachedThreadPool(); + OpcStartTag.is_run = false; + opcCodeOpcEntityMapping.values().forEach((opcEntity) -> { + opcEntity.cleanUdwCache(); + OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); + opcServerService.cleanGroups(opcEntity.getOpcCode()); + }); + opcCodeOpcEntityMapping = new ConcurrentHashMap(); + itemCodeOpcItemDtoMapping = new ConcurrentHashMap(); + executorService.shutdownNow(); + } + + private void logItemChanged(String itemId, UnifiedDataAccessor accessor_value, Object value, OpcItemDto itemDto) { + ISysParamService paramService = SpringContextHolder.getBean(ISysParamService.class); + Object his = accessor_value.getValue(itemId); + List relate_items = itemDto.getRelate_items(); + if (relate_items != null && !relate_items.isEmpty()) { + StringBuilder sb = new StringBuilder(); + Iterator var8 = relate_items.iterator(); + + while (var8.hasNext()) { + String relate = (String) var8.next(); + Object obj = accessor_value.getValue(relate); + sb.append("key:" + relate + "value: " + obj + ";"); + } + if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) { + // 存在上次点位值为null情况 则不记录日志 + if(!(his instanceof Float) && !(value instanceof Float)){ + log.info(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), String.valueOf(his), String.valueOf(value)); +// LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), +// String.valueOf(his), String.valueOf(value)); +// luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc()); +// String logLevel = paramService.findByCode(AcsConfig.LOGLEVEL).getValue(); +// if(StrUtil.isNotEmpty(logLevel) && isNumeric(logLevel) && (luceneLogDto.getLog_level() >= Integer.parseInt(logLevel))){ +// log.info("{}", JSON.toJSONString(luceneLogDto)); +// } + } + } + } else { + +// if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) { +// if(!(his instanceof Float) && !(value instanceof Float)){ +// LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(), itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), +// String.valueOf(his), String.valueOf(value)); +// luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc()); +// log.info("{}", JSON.toJSONString(luceneLogDto)); +// } +// } + + if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) { + log.info(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), String.valueOf(his), String.valueOf(value)); +// if(!(his instanceof Float) && !(value instanceof Float)){ +// LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), +// String.valueOf(his), String.valueOf(value)); +// luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc()); +// String logLevel = paramService.findByCode(AcsConfig.LOGLEVEL).getValue(); +// if(StrUtil.isNotEmpty(logLevel) && isNumeric(logLevel) && (luceneLogDto.getLog_level() >= Integer.parseInt(logLevel))){ +// log.info("{}", JSON.toJSONString(luceneLogDto)); +// } +// } + } + + } + } + + static { + udw = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); + canReadOpcValues = new ConcurrentHashMap(); + opcCodeOpcEntityMapping = new ConcurrentHashMap(); + } + + public static boolean isNumeric(String str) { + return Pattern.compile("^[0-9]+$").matcher(str).matches(); } } diff --git a/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcEntity.java b/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcEntity.java new file mode 100644 index 0000000..b291a56 --- /dev/null +++ b/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcEntity.java @@ -0,0 +1,180 @@ +package org.nl.acs.opc; + +import org.nl.acs.opc.DeviceOpcSynchronizeAutoRun; +import org.nl.acs.opc.OpcConfig; +import org.nl.acs.opc.OpcItemDto; +import org.nl.acs.opc.OpcServerService; +import org.nl.acs.udw.UnifiedDataAccessor; +import org.nl.acs.udw.UnifiedDataAccessorFactory; +import org.openscada.opc.lib.da.Group; +import org.openscada.opc.lib.da.Item; +import org.openscada.opc.lib.da.ItemState; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +public class OpcEntity { + @Autowired + OpcServerService opcServerService; + private final UnifiedDataAccessor udw; + private Map> items; + private Map> someFailDevices; + private String opcCode; + + public OpcEntity(String opcCode) { + this.udw = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); + this.items = new ConcurrentHashMap(); + this.someFailDevices = new ConcurrentHashMap(); + this.opcCode = opcCode; + } + + public void reload(List> opcItemDtos) { + Map> itemCodes = new ConcurrentHashMap(); + (opcItemDtos.stream().flatMap(Collection::stream).collect(Collectors.groupingBy(OpcItemDto::getDevice_code))).forEach((deviceCodes, opcItemDtoList) -> { + itemCodes.put(deviceCodes, opcItemDtoList.stream().map(OpcItemDto::getItem_code).collect(Collectors.toList())); + }); + DeviceOpcSynchronizeAutoRun.itemCodeOpcItemDtoMapping.putAll((Map)opcItemDtos.stream().flatMap(Collection::stream).collect(Collectors.toMap(OpcItemDto::getItem_code, (obj) -> { + return obj; + },(k, v) -> k))); + if (this.items.size() == 0) { + itemCodes.values().stream().flatMap(Collection::stream).forEach((key) -> { + this.udw.setValue(key, (Object)null); + }); + this.addItemsIntoGroup(itemCodes); + } else { + if (this.someFailDevices.size() > 0) { + this.reAddDevices(); + } + + } + } + + private void reAddDevices() { + Map> addItems = new ConcurrentHashMap(); + StringBuilder err_message = new StringBuilder(); + this.someFailDevices.forEach((deviceCode, itemCodesList) -> { + itemCodesList.forEach((itemCode) -> { + try { + Group group = DeviceOpcSynchronizeAutoRun.getGroup(this.opcCode); + ((List)addItems.computeIfAbsent(deviceCode, (key) -> { + return new ArrayList(); + })).add(group.addItem(itemCode)); + } catch (Exception var6) { + err_message.append(itemCode).append(" 添加失败; "); + } + + }); + List deviceItems = (List)addItems.get(deviceCode); + if (deviceItems != null && deviceItems.size() == itemCodesList.size()) { + this.someFailDevices.remove(deviceCode); + } else if (itemCodesList.size() == 0) { + addItems.remove(deviceCode); + } else { + assert deviceItems != null; + + ((List)this.someFailDevices.get(deviceCode)).removeAll(deviceItems); + } + + synchronized(this.opcCode.intern()) { + this.items.putAll(addItems); + } + + if (err_message.length() > 0) { + String errMsg = err_message.toString(); + //this.log.warn("{}:{}", com.wxzd.wcs.opc.OpcConfig.resource_code, errMsg); + } + + }); + } + + private void addItemsIntoGroup(Map> itemCodes) { + try { + Group group = DeviceOpcSynchronizeAutoRun.getGroup(this.opcCode); + StringBuilder err_message = new StringBuilder(); + Map> items = new ConcurrentHashMap(); + itemCodes.forEach((deviceCode, itemCodesList) -> { + itemCodesList.forEach((itemCode) -> { + try { + ((List)items.computeIfAbsent(deviceCode, (key) -> { + return new ArrayList(); + })).add(group.addItem(itemCode)); + } catch (Exception var7) { + ((List)this.someFailDevices.computeIfAbsent(deviceCode, (key) -> { + return new ArrayList(); + })).add(itemCode); + this.udw.setValue(itemCode, (Object)null); + err_message.append(itemCode).append(" 添加失败; "); + } + + }); + List deviceItems = (List)items.get(deviceCode); + if (deviceItems != null && deviceItems.size() != itemCodesList.size()) { + items.remove(deviceCode); + this.someFailDevices.put(deviceCode, itemCodesList); + } + + }); + synchronized(this.opcCode.intern()) { + this.items = items; + } + + if (err_message.length() > 0) { + String errMsg = err_message.toString(); +// this.log.warn("{}:{}", OpcConfig.resource_code, errMsg); + } + } catch (Exception var8) { + var8.printStackTrace(); + } + + } + + public void cleanUdwCache() { + this.items.values().stream().flatMap(Collection::stream).map(Item::getId).forEach((key) -> { + this.udw.setValue(key, (Object)null); + }); + } + + public Map readAll() throws Exception { + return opcServerService.getServer(this.opcCode).read(true, (Item[])this.items.values().stream().flatMap(Collection::stream).toArray((x$0) -> { + return new Item[x$0]; + })); + } + + public Map readDividually() { + Map result = new HashMap(); + CompletableFuture[] futures = (CompletableFuture[])this.items.entrySet().stream().map((entry) -> { + return CompletableFuture.runAsync(() -> { + try { + Group group = DeviceOpcSynchronizeAutoRun.getGroup(this.opcCode); + result.putAll(group.read(true, (Item[])((List)entry.getValue()).toArray(new Item[0]))); + } catch (Exception var5) { + String deviceCode = (String)entry.getKey(); + // to do +// this.someFailDevices.put(deviceCode, ((List)entry.getValue()).stream().map(Item::getId).collect(Collectors.toList())); + this.items.remove(deviceCode); + } + + }, DeviceOpcSynchronizeAutoRun.executorService); + }).toArray((x$0) -> { + return new CompletableFuture[x$0]; + }); + CompletableFuture.allOf(futures).join(); + return result; + } + + + public UnifiedDataAccessor getUdw() { + return this.udw; + } + + public Map> getItems() { + return this.items; + } + + public String getOpcCode() { + return this.opcCode; + } +} diff --git a/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerService.java b/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerService.java index 3e1f7a3..bd0bd37 100644 --- a/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerService.java +++ b/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerService.java @@ -18,4 +18,6 @@ public interface OpcServerService { void writeInteger(String var1, ItemValue... var2); void clearServer(String var1); + + void cleanGroups(String var1); } diff --git a/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerServiceImpl.java b/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerServiceImpl.java index 8695fbe..c518f97 100644 --- a/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerServiceImpl.java +++ b/acs/nladmin-system/nlsso-server/src/main/java/org/nl/acs/opc/OpcServerServiceImpl.java @@ -146,6 +146,24 @@ public class OpcServerServiceImpl implements OpcServerService, ApplicationAutoIn this.groups.remove(code); } + public void cleanGroups(String opcCode) { + Group group = (Group)this.groups.get(opcCode); + if (group != null) { + Server server = group.getServer(); + + try { + group.remove(); + } catch (JIException var5) { + var5.printStackTrace(); + } + + this.groups.remove(opcCode); + server.disconnect(); + this.servers.remove(opcCode); + } + + } + public void writeInteger(String code, ItemValue... values) { try { Group group = this.getServer(code);