Web3j实战,如何高效监听以太坊新区块与交易事件
在以太坊区块链的世界里,实时获取链上数据,如新区块的诞生、特定交易的执行、智能合约事件的触发等,对于构建去中心化应用(DApp)、数据分析、风险监控等场景至关重要,Web3j,作为Java和Android平台上最受欢迎的以太坊交互库,提供了强大而便捷的功能来实现这些需求,本文将重点介绍如何使用Web3j来监听以太坊的新区块事件以及其他相关事件。
为什么需要监听以太坊事件
在深入技术细节之前,我们先理解一下为什么监听事件如此重要:
- 实时响应:DApp需要根据链上事件(如转账、NFT铸造、投票等)实时更新UI或执行后续逻辑。
- 数据同步:对于需要与以太坊链保持数据同步的应用,监听新区块是最基础的数据获取方式。
- 智能合约交互:智能合约的
event是向链外通知状态变化的重要手段,监听这些事件可以获取合约操作的详细信息。 - 监控与告警:监控系统可以监听特定交易或异常事件,及时发出告警。
Web3j简介
Web3j是一个轻量级、响应式的Java库,用于与以太坊节点进行交互,它支持以太坊的所有核心功能,包括:
- 账户管理(创建、解锁、发送交易)
- 智能合约部署与交互(ABI, 二进制编码)
- 区块链数据查询(区块、交易、余额等)
- 事件监听(Event Subscriptions)
Web3j提供了两种主要的监听方式:
- 同步监听(Blocking):线程会阻塞,直到收到事件或超时。
- 异步监听(Non-blocking/Reactive):基于回调或响应式编程(如RxJava),更适合现代应用架构。
监听新区块事件(New Block Events)
监听新区块是最常见的需求之一,每当矿工挖出一个新区块,节点就会广播这个事件,使用Web3j监听新区块非常简单。
1 异步监听(推荐)
异步监听不会阻塞调用线程,用户体验更好。
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameterName;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.protocol.http.HttpService;
import java.util.concurrent.Flow;
public class NewBlockListener {
public static void main(String[] args) {
// 1. 创建Web3j实例,连接到以太坊节点(可以是Infura、Alchemy或本地节点)
Web3j web3j = Web3j.build(new HttpService("https://mainnet.infura.io/v3/YOUR_PROJECT_ID"));
// 2. 创建过滤器,监听所有新区块
// EthFilter的参数:fromBlock, toBlock, address (这里不需要特定地址)
EthFilter filter = new EthFilter(
DefaultBlockParameterName.EARLIEST, // 从最早区块开始监听(或指定特定区块号)
DefaultBlockParameterName.LATEST, // 监听最新区块
n
ull); // null表示监听所有新区块,不限制特定合约地址
// 3. 订阅新区块事件
web3j.blockFlowable(filter).subscribe(block -> {
// 4. 处理接收到的新区块
EthBlock.Block actualBlock = block.getBlock();
System.out.println("新区块监听触发!区块号: " + actualBlock.getNumber() +
", 哈希: " + actualBlock.getHash() +
", 时间戳: " + actualBlock.getTimestamp() +
", 交易数量: " + actualBlock.getTransactions().size());
}, error -> {
// 错误处理
System.err.println("监听新区块时发生错误: " + error.getMessage());
}, () -> {
// 监听结束(通常不会发生,除非手动取消订阅)
System.out.println("新区块监听结束");
});
System.out.println("新区块监听已启动,等待新区块...");
// 为了保持程序运行以便接收事件,可以添加一个循环或使用CountDownLatch
// 注意:在实际应用中,这通常在后台线程中运行
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2 同步监听
同步监听会阻塞当前线程,直到收到事件或超时,在某些简单脚本或特定场景下可能使用。
// ... (web3j初始化同上)
EthFilter filter = new EthFilter(
DefaultBlockParameterName.LATEST, // 从最新区块开始监听下一个
DefaultBlockParameterName.LATEST,
null);
try {
EthBlock block = web3j.ethNewBlockFilter(filter).send().getFilterChanges()
.stream()
.findFirst()
.orElse(null); // 简化示例,实际可能需要循环等待
if (block != null) {
System.out.println("同步监听收到新区块: " + block.getBlock().getNumber());
}
} catch (Exception e) {
e.printStackTrace();
}
注意:同步监听在实际应用中较少用于持续的事件流,更适合一次性查询或轮询。
监听智能合约事件
除了新区块,监听智能合约的事件是Web3j的另一核心功能,假设我们有一个简单的智能合约,它发出一个Transfer事件。
1 智能合约示例(Solidity)
pragma solidity ^0.8.0;
contract SimpleToken {
event Transfer(address indexed from, address indexed to, uint256 value);
function transfer(address to, uint256 amount) public {
// 模拟转账逻辑
emit Transfer(msg.sender, to, amount);
}
}
2 使用Web3j监听合约事件
监听合约事件需要合约的ABI(Application Binary Interface)和合约地址。
import org.web3j.abi.EventEncoder;
import org.web3j.abi.EventValues;
import org.web3j.abi.TypeReference;
import org.web3j.abi.datatypes.Address;
import org.web3j.abi.datatypes.Event;
import org.web3j.abi.datatypes.Uint;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.tx.gas.ContractGasProvider;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
public class ContractEventListener {
// 合约ABI (简化版,实际项目中应从合约编译文件获取)
private static final String TRANSFER_EVENT_ABI = "event Transfer(address indexed from, address indexed to, uint256 value)";
// 合约地址
private static final String CONTRACT_ADDRESS = "0x..."; // 替换为实际合约地址
public static void main(String[] args) {
Web3j web3j = Web3j.build(new HttpService("https://mainnet.infura.io/v3/YOUR_PROJECT_ID"));
// 1. 定义事件对象
Event transferEvent = new Event("Transfer",
Arrays.asList(
new TypeReference<Address>(true) {}, // indexed
new TypeReference<Address>(true) {}, // indexed
new TypeReference<Uint>() {} // not indexed
),
null);
// 2. 创建过滤器,指定合约地址和事件签名
// 事件签名是事件签名的Keccak-256哈希的前4个字节
// 也可以直接使用EventEncoder.encode(transferEvent)获取完整签名
EthFilter filter = new EthFilter(
DefaultBlockParameterName.EARLIEST,
DefaultBlockParameterName.LATEST,
CONTRACT_ADDRESS)
.addSingleTopic(EventEncoder.encode(transferEvent));
// 3. 订阅事件
web3j.ethLogFlowable(filter).subscribe(log -> {
// 4. 解析事件日志
EventValues eventValues = web3j.abi().decodeEvent(
transferEvent,
log); // log 是RawTransaction的Log对象
Address from = (Address) eventValues.getValues().get(0).getValue();
Address to = (Address) eventValues.getValues().get(1).getValue();
Uint value = (Uint) eventValues.getValues().get(2).getValue();
System.out.println("合约事件监听触发!Transfer事件:");
System.out.println("From: " + from.getValue());
System.out.println("To: " + to.getValue());
System.out.println("Value: " + value.getValue());
}, error -> {
System.err.println("监听合约事件时发生错误: " + error.getMessage());
}, () -> {
System.out.println("合约事件监听结束");
});
System.out.println("合约事件监听已启动,等待事件...");
// 保持程序运行