4 changed files with 480 additions and 46 deletions
@ -1,83 +1,317 @@ |
|||
package org.nl.acs.opc; |
|||
|
|||
import cn.hutool.core.util.ObjectUtil; |
|||
import cn.hutool.core.util.StrUtil; |
|||
import com.alibaba.fastjson.JSON; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.apache.lucene.util.NamedThreadFactory; |
|||
import org.nl.acs.AcsConfig; |
|||
import org.nl.acs.auto.run.AbstractAutoRunnable; |
|||
import org.nl.acs.opc.service.dto.OpcServerManageDto; |
|||
import org.nl.acs.udw.UnifiedDataAccessor; |
|||
import org.nl.acs.udw.UnifiedDataAccessorFactory; |
|||
import org.nl.acs.udw.UnifiedDataAppService; |
|||
import org.nl.common.enums.LogTypeEnum; |
|||
import org.nl.config.SpringContextHolder; |
|||
import org.nl.system.service.param.ISysParamService; |
|||
import org.openscada.opc.lib.da.Group; |
|||
import org.openscada.opc.lib.da.Item; |
|||
import org.openscada.opc.lib.da.ItemState; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.Iterator; |
|||
import java.util.List; |
|||
import java.util.Map; |
|||
import java.util.Set; |
|||
import java.util.concurrent.ExecutorService; |
|||
import java.util.concurrent.Executors; |
|||
import java.util.*; |
|||
import java.util.Map.Entry; |
|||
import java.util.concurrent.*; |
|||
import java.util.regex.Pattern; |
|||
|
|||
/** |
|||
* OPC设备同步启动 |
|||
*/ |
|||
@Component |
|||
@Slf4j |
|||
public class DeviceOpcSynchronizeAutoRun extends AbstractAutoRunnable { |
|||
|
|||
public static boolean isRun = false; |
|||
ExecutorService executorService = Executors.newCachedThreadPool(); |
|||
static boolean isRun = true; |
|||
@Autowired |
|||
private DeviceAppService deviceAppService; |
|||
@Autowired |
|||
private OpcServerManageService opcServerManageService; |
|||
// @Autowired
|
|||
// LuceneExecuteLogService lucene;
|
|||
|
|||
static ExecutorService executorService; |
|||
public static Map<String, OpcServerManageDto> opcServersConfig; |
|||
public static Map<String, OpcItemDto> itemCodeOpcItemDtoMapping = new ConcurrentHashMap(); |
|||
|
|||
static boolean canRefreshOpcEntity = true; |
|||
private long lastRefreshOpcEntityTime; |
|||
static UnifiedDataAccessor udw; |
|||
private static Map<String, Boolean> canReadOpcValues; |
|||
private static volatile Map<String, OpcEntity> opcCodeOpcEntityMapping; |
|||
|
|||
public DeviceOpcSynchronizeAutoRun() { |
|||
this.lastRefreshOpcEntityTime = 0L; |
|||
} |
|||
|
|||
@Override |
|||
public String getCode() { |
|||
return DeviceOpcSynchronizeAutoRun.class.getSimpleName(); |
|||
} |
|||
|
|||
@Override |
|||
public String getName() { |
|||
return "opc设备同步器"; |
|||
} |
|||
|
|||
@Override |
|||
public void autoRun() throws Exception { |
|||
{ |
|||
isRun = true; |
|||
|
|||
Map<String, OpcServerManageDto> servers = this.opcServerManageService.queryAllServerMap(); |
|||
Map<String, List<List<OpcItemDto>>> pros; |
|||
do{ |
|||
Thread.sleep(1000L); |
|||
pros = this.deviceAppService.findAllFormatProtocolFromDriver(); |
|||
}while (ObjectUtil.isEmpty(pros)); |
|||
Set<String> keys = pros.keySet(); |
|||
Iterator var4 = keys.iterator(); |
|||
//代码执行一次
|
|||
while (var4.hasNext()) { |
|||
String key = (String) var4.next(); |
|||
List<List<OpcItemDto>> list = (List) pros.get(key); |
|||
OpcServerManageDto opcServer = (OpcServerManageDto) servers.get(key); |
|||
Iterator var8 = list.iterator(); |
|||
while (var8.hasNext()) { |
|||
List<OpcItemDto> groupProtols = (List) var8.next(); |
|||
DeviceOpcProtocolRunable runable = new DeviceOpcProtocolRunable(); |
|||
runable.setProtocols(groupProtols); |
|||
runable.setOpcServer(opcServer); |
|||
this.executorService.submit(runable); |
|||
static Group getGroup(String opcCode) throws Exception { |
|||
OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); |
|||
return opcServerService.getServer(opcCode); |
|||
} |
|||
|
|||
static void submitTimeLimitTask(Runnable runnable, String opcCode) { |
|||
CompletableFuture<Void> future = CompletableFuture.runAsync(runnable, executorService); |
|||
|
|||
// try {
|
|||
// future.get(10L, TimeUnit.SECONDS);
|
|||
// } catch (InterruptedException var9) {
|
|||
// Thread.currentThread().interrupt();
|
|||
// } catch (ExecutionException var10) {
|
|||
// var10.printStackTrace();
|
|||
// } catch (TimeoutException var11) {
|
|||
// itemCodeOpcItemDtoMapping.keySet().forEach((key) -> {
|
|||
// udw.setValue(key, (Object) null);
|
|||
// });
|
|||
// canReadOpcValues = new ConcurrentHashMap<>();
|
|||
// System.out.println("opc设备同步器 任务执行超时,取消任务...");
|
|||
// future.cancel(true);
|
|||
// } finally {
|
|||
// canRefreshOpcEntity = true;
|
|||
// if (opcCode != null) {
|
|||
// canReadOpcValues.put(opcCode, true);
|
|||
// }
|
|||
//
|
|||
// }
|
|||
} |
|||
|
|||
// 同步无光电设备信号
|
|||
//Map<String, List<List<OpcItemDto>>> pros1 = this.deviceAppService.findAllFormatProtocolFromDriver();
|
|||
//List<DeviceDriver> opcDrivers = this.deviceAppService.findDeviceDriver(DeviceDriver.class);
|
|||
private ExecutorService createThreadPool() { |
|||
ThreadPoolExecutor executor = new ThreadPoolExecutor(32, 32, 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory("opc-sync")); |
|||
executor.allowCoreThreadTimeOut(true); |
|||
return executor; |
|||
} |
|||
|
|||
public void autoRun() { |
|||
OpcStartTag.is_run = true; |
|||
opcServersConfig = this.opcServerManageService.queryAllServerMap(); |
|||
executorService = this.createThreadPool(); |
|||
opcCodeOpcEntityMapping = new ConcurrentHashMap(); |
|||
itemCodeOpcItemDtoMapping.keySet().forEach((key) -> { |
|||
udw.setValue(key, (Object) null); |
|||
}); |
|||
canRefreshOpcEntity = true; |
|||
canReadOpcValues.clear(); |
|||
|
|||
while (true) { |
|||
Thread.sleep(3000L); |
|||
this.refreshOpcEntity(); |
|||
Iterator var1 = opcServersConfig.keySet().iterator(); |
|||
|
|||
while (var1.hasNext()) { |
|||
String opcCode = (String) var1.next(); |
|||
submitTimeLimitTask(() -> { |
|||
boolean in = false; |
|||
try { |
|||
if (canReadOpcValues.computeIfAbsent(opcCode, (key) -> true)) { |
|||
in = true; |
|||
canReadOpcValues.put(opcCode, false); |
|||
this.readOpcValues(opcCode); |
|||
} |
|||
} catch (Exception var3) { |
|||
var3.printStackTrace(); |
|||
} finally { |
|||
canRefreshOpcEntity = true; |
|||
if (opcCode != null && in) { |
|||
canReadOpcValues.put(opcCode, true); |
|||
} |
|||
} |
|||
}, opcCode); |
|||
} |
|||
|
|||
ThreadUtl.sleep((long) OpcConfig.synchronized_millisecond); |
|||
} |
|||
} |
|||
|
|||
private void readOpcValues(String opcCode) throws Exception { |
|||
synchronized (opcCode.intern()) { |
|||
OpcEntity opcEntity = (OpcEntity) opcCodeOpcEntityMapping.get(opcCode); |
|||
if (opcEntity != null) { |
|||
if (opcEntity.getItems().size() != 0) { |
|||
long begin = System.currentTimeMillis(); |
|||
if (log.isTraceEnabled()) { |
|||
log.trace("opc {} 开始计时{}", opcCode, begin); |
|||
} |
|||
|
|||
new HashMap(); |
|||
|
|||
Map<Item, ItemState> itemStatus; |
|||
try { |
|||
itemStatus = opcEntity.readAll(); |
|||
} catch (Exception var15) { |
|||
itemStatus = opcEntity.readDividually(); |
|||
} |
|||
|
|||
long end = System.currentTimeMillis(); |
|||
long duration = end - begin; |
|||
if (log.isTraceEnabled()) { |
|||
log.trace("opc {} 读取耗时:{}", opcCode, duration); |
|||
} |
|||
|
|||
if (duration > 1000L) { |
|||
log.warn("opc {} 读取超时 : {}", opcCode, duration); |
|||
} |
|||
|
|||
// boolean allNull = itemStatus.entrySet().stream().map((map) -> {
|
|||
// return OpcUtl.getValue((Item)map.getKey(), (ItemState)map.getValue());
|
|||
// }).allMatch(Objects::isNull);
|
|||
// if (allNull) {
|
|||
// opcEntity.getItems().clear();
|
|||
// }
|
|||
|
|||
UnifiedDataAccessor udw = opcEntity.getUdw(); |
|||
|
|||
|
|||
Set<Item> items = itemStatus.keySet(); |
|||
Iterator var18 = items.iterator(); |
|||
|
|||
while (var18.hasNext()) { |
|||
Item item = (Item) var18.next(); |
|||
ItemState itemState = (ItemState) itemStatus.get(item); |
|||
Object nowValue = OpcUtl.getValue(item, itemState); |
|||
String itemId = item.getId(); |
|||
Object historyValue = udw.getValue(itemId); |
|||
if (!ObjectUtl.isEquals(itemState.getQuality(), QualityTypeValue.OPC_QUALITY_GOOD) && historyValue != null) { |
|||
log.warn("opc 值不健康 item: {}, 状态: {}", itemId, itemState.getQuality()); |
|||
} |
|||
if (!UnifiedDataAppService.isEquals(nowValue, historyValue)) { |
|||
OpcItemDto itemDto = (OpcItemDto) itemCodeOpcItemDtoMapping.get(itemId); |
|||
if (true) { |
|||
this.logItemChanged(itemId, udw, nowValue, itemDto); |
|||
} |
|||
udw.setValue(itemId, nowValue); |
|||
} |
|||
|
|||
} |
|||
|
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void refreshOpcEntity() { |
|||
if (canRefreshOpcEntity) { |
|||
canRefreshOpcEntity = false; |
|||
long now = System.currentTimeMillis(); |
|||
if (now - this.lastRefreshOpcEntityTime >= 20000L) { |
|||
this.lastRefreshOpcEntityTime = now; |
|||
submitTimeLimitTask(() -> { |
|||
try { |
|||
Map<String, List<List<OpcItemDto>>> protocol = this.deviceAppService.findAllFormatProtocolFromDriver(); |
|||
Iterator var2 = protocol.entrySet().iterator(); |
|||
|
|||
while (var2.hasNext()) { |
|||
Entry<String, List<List<OpcItemDto>>> stringListEntry = (Entry) var2.next(); |
|||
String opcCode = (String) stringListEntry.getKey(); |
|||
List<List<OpcItemDto>> opcItemDtos = (List) stringListEntry.getValue(); |
|||
((OpcEntity) opcCodeOpcEntityMapping.computeIfAbsent(opcCode, OpcEntity::new)).reload(opcItemDtos); |
|||
} |
|||
} catch (Exception var6) { |
|||
var6.printStackTrace(); |
|||
} finally { |
|||
canRefreshOpcEntity = true; |
|||
} |
|||
|
|||
}, (String) null); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private void logMessage(String errorMessage) { |
|||
try { |
|||
// issueLogger.setResource(OpcConfig.resource_code, OpcConfig.resource_name).setError(StringUtl.getString(100), "设备同步通信异常").log(errorMessage, new Object[0]);
|
|||
// businessLogger.setResource(OpcConfig.resource_code, OpcConfig.resource_name).setError(StringUtl.getString(100), "设备同步通信异常").log(errorMessage, new Object[0]);
|
|||
} catch (Exception var5) { |
|||
var5.printStackTrace(); |
|||
} |
|||
|
|||
} |
|||
|
|||
@Override |
|||
public void after() { |
|||
isRun = false; |
|||
this.executorService.shutdownNow(); |
|||
this.executorService = Executors.newCachedThreadPool(); |
|||
OpcStartTag.is_run = false; |
|||
opcCodeOpcEntityMapping.values().forEach((opcEntity) -> { |
|||
opcEntity.cleanUdwCache(); |
|||
OpcServerService opcServerService = SpringContextHolder.getBean(OpcServerService.class); |
|||
opcServerService.cleanGroups(opcEntity.getOpcCode()); |
|||
}); |
|||
opcCodeOpcEntityMapping = new ConcurrentHashMap(); |
|||
itemCodeOpcItemDtoMapping = new ConcurrentHashMap(); |
|||
executorService.shutdownNow(); |
|||
} |
|||
|
|||
private void logItemChanged(String itemId, UnifiedDataAccessor accessor_value, Object value, OpcItemDto itemDto) { |
|||
ISysParamService paramService = SpringContextHolder.getBean(ISysParamService.class); |
|||
Object his = accessor_value.getValue(itemId); |
|||
List<String> relate_items = itemDto.getRelate_items(); |
|||
if (relate_items != null && !relate_items.isEmpty()) { |
|||
StringBuilder sb = new StringBuilder(); |
|||
Iterator var8 = relate_items.iterator(); |
|||
|
|||
while (var8.hasNext()) { |
|||
String relate = (String) var8.next(); |
|||
Object obj = accessor_value.getValue(relate); |
|||
sb.append("key:" + relate + "value: " + obj + ";"); |
|||
} |
|||
if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) { |
|||
// 存在上次点位值为null情况 则不记录日志
|
|||
if(!(his instanceof Float) && !(value instanceof Float)){ |
|||
log.info(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), String.valueOf(his), String.valueOf(value)); |
|||
// LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1),
|
|||
// String.valueOf(his), String.valueOf(value));
|
|||
// luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc());
|
|||
// String logLevel = paramService.findByCode(AcsConfig.LOGLEVEL).getValue();
|
|||
// if(StrUtil.isNotEmpty(logLevel) && isNumeric(logLevel) && (luceneLogDto.getLog_level() >= Integer.parseInt(logLevel))){
|
|||
// log.info("{}", JSON.toJSONString(luceneLogDto));
|
|||
// }
|
|||
} |
|||
} |
|||
} else { |
|||
|
|||
// if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) {
|
|||
// if(!(his instanceof Float) && !(value instanceof Float)){
|
|||
// LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(), itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1),
|
|||
// String.valueOf(his), String.valueOf(value));
|
|||
// luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc());
|
|||
// log.info("{}", JSON.toJSONString(luceneLogDto));
|
|||
// }
|
|||
// }
|
|||
|
|||
if (!itemDto.getItem_code().endsWith("heartbeat") && !itemDto.getItem_code().endsWith("time") && !itemDto.getItem_code().endsWith("consumption")) { |
|||
log.info(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1), String.valueOf(his), String.valueOf(value)); |
|||
// if(!(his instanceof Float) && !(value instanceof Float)){
|
|||
// LuceneLogDto luceneLogDto = new LuceneLogDto(itemDto.getOpc_server_code(), itemDto.getOpc_plc_code(),4, itemDto.getDevice_code(), itemDto.getItem_code().substring(itemDto.getItem_code().lastIndexOf(".") + 1),
|
|||
// String.valueOf(his), String.valueOf(value));
|
|||
// luceneLogDto.setLogType(LogTypeEnum.DEVICE_LOG.getDesc());
|
|||
// String logLevel = paramService.findByCode(AcsConfig.LOGLEVEL).getValue();
|
|||
// if(StrUtil.isNotEmpty(logLevel) && isNumeric(logLevel) && (luceneLogDto.getLog_level() >= Integer.parseInt(logLevel))){
|
|||
// log.info("{}", JSON.toJSONString(luceneLogDto));
|
|||
// }
|
|||
// }
|
|||
} |
|||
|
|||
} |
|||
} |
|||
|
|||
static { |
|||
udw = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); |
|||
canReadOpcValues = new ConcurrentHashMap(); |
|||
opcCodeOpcEntityMapping = new ConcurrentHashMap(); |
|||
} |
|||
|
|||
public static boolean isNumeric(String str) { |
|||
return Pattern.compile("^[0-9]+$").matcher(str).matches(); |
|||
} |
|||
} |
|||
|
@ -0,0 +1,180 @@ |
|||
package org.nl.acs.opc; |
|||
|
|||
import org.nl.acs.opc.DeviceOpcSynchronizeAutoRun; |
|||
import org.nl.acs.opc.OpcConfig; |
|||
import org.nl.acs.opc.OpcItemDto; |
|||
import org.nl.acs.opc.OpcServerService; |
|||
import org.nl.acs.udw.UnifiedDataAccessor; |
|||
import org.nl.acs.udw.UnifiedDataAccessorFactory; |
|||
import org.openscada.opc.lib.da.Group; |
|||
import org.openscada.opc.lib.da.Item; |
|||
import org.openscada.opc.lib.da.ItemState; |
|||
import org.springframework.beans.factory.annotation.Autowired; |
|||
|
|||
import java.util.*; |
|||
import java.util.concurrent.CompletableFuture; |
|||
import java.util.concurrent.ConcurrentHashMap; |
|||
import java.util.stream.Collectors; |
|||
|
|||
public class OpcEntity { |
|||
@Autowired |
|||
OpcServerService opcServerService; |
|||
private final UnifiedDataAccessor udw; |
|||
private Map<String, List<Item>> items; |
|||
private Map<String, List<String>> someFailDevices; |
|||
private String opcCode; |
|||
|
|||
public OpcEntity(String opcCode) { |
|||
this.udw = UnifiedDataAccessorFactory.getAccessor(OpcConfig.udw_opc_value_key); |
|||
this.items = new ConcurrentHashMap(); |
|||
this.someFailDevices = new ConcurrentHashMap(); |
|||
this.opcCode = opcCode; |
|||
} |
|||
|
|||
public void reload(List<List<OpcItemDto>> opcItemDtos) { |
|||
Map<String, List<String>> itemCodes = new ConcurrentHashMap(); |
|||
(opcItemDtos.stream().flatMap(Collection::stream).collect(Collectors.groupingBy(OpcItemDto::getDevice_code))).forEach((deviceCodes, opcItemDtoList) -> { |
|||
itemCodes.put(deviceCodes, opcItemDtoList.stream().map(OpcItemDto::getItem_code).collect(Collectors.toList())); |
|||
}); |
|||
DeviceOpcSynchronizeAutoRun.itemCodeOpcItemDtoMapping.putAll((Map)opcItemDtos.stream().flatMap(Collection::stream).collect(Collectors.toMap(OpcItemDto::getItem_code, (obj) -> { |
|||
return obj; |
|||
},(k, v) -> k))); |
|||
if (this.items.size() == 0) { |
|||
itemCodes.values().stream().flatMap(Collection::stream).forEach((key) -> { |
|||
this.udw.setValue(key, (Object)null); |
|||
}); |
|||
this.addItemsIntoGroup(itemCodes); |
|||
} else { |
|||
if (this.someFailDevices.size() > 0) { |
|||
this.reAddDevices(); |
|||
} |
|||
|
|||
} |
|||
} |
|||
|
|||
private void reAddDevices() { |
|||
Map<String, List<Item>> addItems = new ConcurrentHashMap(); |
|||
StringBuilder err_message = new StringBuilder(); |
|||
this.someFailDevices.forEach((deviceCode, itemCodesList) -> { |
|||
itemCodesList.forEach((itemCode) -> { |
|||
try { |
|||
Group group = DeviceOpcSynchronizeAutoRun.getGroup(this.opcCode); |
|||
((List)addItems.computeIfAbsent(deviceCode, (key) -> { |
|||
return new ArrayList(); |
|||
})).add(group.addItem(itemCode)); |
|||
} catch (Exception var6) { |
|||
err_message.append(itemCode).append(" 添加失败; "); |
|||
} |
|||
|
|||
}); |
|||
List<Item> deviceItems = (List)addItems.get(deviceCode); |
|||
if (deviceItems != null && deviceItems.size() == itemCodesList.size()) { |
|||
this.someFailDevices.remove(deviceCode); |
|||
} else if (itemCodesList.size() == 0) { |
|||
addItems.remove(deviceCode); |
|||
} else { |
|||
assert deviceItems != null; |
|||
|
|||
((List)this.someFailDevices.get(deviceCode)).removeAll(deviceItems); |
|||
} |
|||
|
|||
synchronized(this.opcCode.intern()) { |
|||
this.items.putAll(addItems); |
|||
} |
|||
|
|||
if (err_message.length() > 0) { |
|||
String errMsg = err_message.toString(); |
|||
//this.log.warn("{}:{}", com.wxzd.wcs.opc.OpcConfig.resource_code, errMsg);
|
|||
} |
|||
|
|||
}); |
|||
} |
|||
|
|||
private void addItemsIntoGroup(Map<String, List<String>> itemCodes) { |
|||
try { |
|||
Group group = DeviceOpcSynchronizeAutoRun.getGroup(this.opcCode); |
|||
StringBuilder err_message = new StringBuilder(); |
|||
Map<String, List<Item>> items = new ConcurrentHashMap(); |
|||
itemCodes.forEach((deviceCode, itemCodesList) -> { |
|||
itemCodesList.forEach((itemCode) -> { |
|||
try { |
|||
((List)items.computeIfAbsent(deviceCode, (key) -> { |
|||
return new ArrayList(); |
|||
})).add(group.addItem(itemCode)); |
|||
} catch (Exception var7) { |
|||
((List)this.someFailDevices.computeIfAbsent(deviceCode, (key) -> { |
|||
return new ArrayList(); |
|||
})).add(itemCode); |
|||
this.udw.setValue(itemCode, (Object)null); |
|||
err_message.append(itemCode).append(" 添加失败; "); |
|||
} |
|||
|
|||
}); |
|||
List<Item> deviceItems = (List)items.get(deviceCode); |
|||
if (deviceItems != null && deviceItems.size() != itemCodesList.size()) { |
|||
items.remove(deviceCode); |
|||
this.someFailDevices.put(deviceCode, itemCodesList); |
|||
} |
|||
|
|||
}); |
|||
synchronized(this.opcCode.intern()) { |
|||
this.items = items; |
|||
} |
|||
|
|||
if (err_message.length() > 0) { |
|||
String errMsg = err_message.toString(); |
|||
// this.log.warn("{}:{}", OpcConfig.resource_code, errMsg);
|
|||
} |
|||
} catch (Exception var8) { |
|||
var8.printStackTrace(); |
|||
} |
|||
|
|||
} |
|||
|
|||
public void cleanUdwCache() { |
|||
this.items.values().stream().flatMap(Collection::stream).map(Item::getId).forEach((key) -> { |
|||
this.udw.setValue(key, (Object)null); |
|||
}); |
|||
} |
|||
|
|||
public Map<Item, ItemState> readAll() throws Exception { |
|||
return opcServerService.getServer(this.opcCode).read(true, (Item[])this.items.values().stream().flatMap(Collection::stream).toArray((x$0) -> { |
|||
return new Item[x$0]; |
|||
})); |
|||
} |
|||
|
|||
public Map<Item, ItemState> readDividually() { |
|||
Map<Item, ItemState> result = new HashMap(); |
|||
CompletableFuture[] futures = (CompletableFuture[])this.items.entrySet().stream().map((entry) -> { |
|||
return CompletableFuture.runAsync(() -> { |
|||
try { |
|||
Group group = DeviceOpcSynchronizeAutoRun.getGroup(this.opcCode); |
|||
result.putAll(group.read(true, (Item[])((List)entry.getValue()).toArray(new Item[0]))); |
|||
} catch (Exception var5) { |
|||
String deviceCode = (String)entry.getKey(); |
|||
// to do
|
|||
// this.someFailDevices.put(deviceCode, ((List)entry.getValue()).stream().map(Item::getId).collect(Collectors.toList()));
|
|||
this.items.remove(deviceCode); |
|||
} |
|||
|
|||
}, DeviceOpcSynchronizeAutoRun.executorService); |
|||
}).toArray((x$0) -> { |
|||
return new CompletableFuture[x$0]; |
|||
}); |
|||
CompletableFuture.allOf(futures).join(); |
|||
return result; |
|||
} |
|||
|
|||
|
|||
public UnifiedDataAccessor getUdw() { |
|||
return this.udw; |
|||
} |
|||
|
|||
public Map<String, List<Item>> getItems() { |
|||
return this.items; |
|||
} |
|||
|
|||
public String getOpcCode() { |
|||
return this.opcCode; |
|||
} |
|||
} |
Loading…
Reference in new issue