SpringBoot與Pinot整合,實現廣告實時競價決策系統
作者:Java知識日歷
Apache Pinot是一個開源的實時分布式OLAP(Online Analytical Processing,聯機分析處理)數據存儲系統,專為低延遲、高并發的分析查詢而設計,使得基于海量實時數據的快速分析和決策(如程序化廣告競價)成為可能。
Apache Pinot是一個開源的實時分布式OLAP(Online Analytical Processing,聯機分析處理)數據存儲系統,專為低延遲、高并發的分析查詢而設計,使得基于海量實時數據的快速分析和決策(如程序化廣告競價)成為可能。
我們為什么選擇Apache Pinot?
- 確保競價決策的實時性: 為了做出最優決策,系統必須能迅速查詢到最新的歷史性能數據,如特定用戶群體、地理位置、設備類型下的平均CPC(每次點擊成本)、CTR(點擊率)等。Pinot正是為此類低延遲(毫秒級)分析查詢而設計。它能確保在競價決策的關鍵路徑上,數據查詢不會成為瓶頸。相比之下,傳統的Hadoop、Spark SQL或OLTP數據庫可能無法滿足如此嚴苛的延遲要求。
- 保障系統的高可用與高并發: 大型DSP每天需要處理數億甚至數十億次的競價請求,每個競價請求都可能觸發一次或多次對歷史數據的查詢,這帶來了巨大的查詢并發壓力。Pinot的分布式架構和優化設計使其能夠高效地處理高并發的查詢請求,保證系統在流量高峰時依然穩定可靠。
- 利用最新的數據: 廣告效果是動態變化的,今天的策略可能明天就不再有效。系統需要基于最新的數據(分鐘級甚至秒級更新)進行決策。Pinot能直接從Kafka等消息隊列中消費實時事件流(如廣告展示、點擊、轉化數據),近乎實時地將這些數據索引并可供查詢,確保決策依據的數據是“熱乎的”。
- 高效執行復雜的分析聚合查詢: Pinot采用列式存儲和多種索引技術,對這類分析查詢進行了深度優化,查詢性能遠超傳統的行式數據庫。Pinot的Schema設計和預聚合功能也進一步加速特定的查詢模式。
代碼實操
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-java-client</artifactId>
<version>${pinot.version}</version>
</dependency>
application.yml
server:
port: 8080
# Apache Pinot相關配置
pinot:
controller:
host: localhost # Pinot Controller的主機地址
port: 9000 # Pinot Controller的端口
broker:
# Comma-separated broker addresses (host:port), used for load balancing / high availability.
# A scalar is used instead of a YAML sequence because @Value("${pinot.broker.hosts}")
# resolves a single property and cannot see sequence entries (hosts[0], hosts[1], ...).
# NOTE(review): Pinot brokers usually listen on 8099 - confirm that 8009 is intended.
hosts: localhost:8009
connection:
# 查詢超時時間(毫秒),防止長時間查詢阻塞
query-timeout-ms: 30000
# 建立連接的超時時間(毫秒)
connection-timeout-ms: 5000
# Socket讀取超時時間(毫秒)
socket-timeout-ms: 10000
# 默認數據庫和表(如果查詢中未指定)
default:
database: defaultDatabase
table: defaultTable
# 日志級別配置
logging:
level:
# 為本項目包設置DEBUG級別,便于查看詳細日志
com.example.pinotdemo: DEBUG
# 為Pinot客戶端設置INFO級別,查看連接和查詢日志
org.apache.pinot.client: INFO
Pinot連接配置類
package com.example.pinotdemo.config;
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.PinotClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.net.MalformedURLException;
import java.util.List;
/**
* Pinot連接配置類
* 用于創建和管理Pinot連接的Spring Bean
*/
@Configuration
public class PinotConfig {
private static final Logger logger = LoggerFactory.getLogger(PinotConfig.class);
// 從application.yml中注入Broker主機列表
@Value("${pinot.broker.hosts}")
private List<String> brokerHosts;
// 從application.yml中注入Controller主機和端口(雖然連接Broker,但有時需要Controller信息)
@Value("${pinot.controller.host}")
private String controllerHost;
@Value("${pinot.controller.port}")
private int controllerPort;
// 從application.yml中注入各種超時配置
@Value("${pinot.connection.query-timeout-ms:30000}")
private int queryTimeoutMs;
@Value("${pinot.connection.connection-timeout-ms:5000}")
private int connectionTimeoutMs;
@Value("${pinot.connection.socket-timeout-ms:10000}")
private int socketTimeoutMs;
/**
 * Creates the shared Pinot {@link Connection} bean from the configured broker list.
 *
 * <p>The Pinot Java client talks to brokers directly and is built through
 * {@code ConnectionFactory.fromHostList}. The {@code jdbc:pinot://} URL scheme belongs to the
 * separate JDBC driver, and {@code Connection} offers no {@code fromUrl} factory.
 *
 * <p>The timeout settings are only logged here; they are not applied to the client transport.
 *
 * @return a Pinot connection bound to the configured brokers
 * @throws MalformedURLException kept for signature compatibility; not thrown by this implementation
 * @throws PinotClientException if the client cannot be created
 * @throws IllegalStateException if no broker host is configured
 */
@Bean
public Connection pinotConnection() throws MalformedURLException, PinotClientException {
    if (brokerHosts == null || brokerHosts.isEmpty()) {
        throw new IllegalStateException("pinot.broker.hosts must list at least one broker (host:port)");
    }
    logger.info("正在初始化Pinot連接到Brokers: {}", String.join(",", brokerHosts));
    // Fully qualified so that this method does not depend on an extra import line.
    Connection connection =
            org.apache.pinot.client.ConnectionFactory.fromHostList(brokerHosts.toArray(new String[0]));
    // Record the configured timeouts for troubleshooting.
    logger.debug("查詢超時: {}ms", queryTimeoutMs);
    logger.debug("連接超時: {}ms", connectionTimeoutMs);
    logger.debug("Socket超時: {}ms", socketTimeoutMs);
    logger.info("Pinot連接初始化成功。");
    return connection;
}
}
Pinot Service
package com.example.pinotdemo.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.client.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Pinot服務類
* 封裝了與Pinot數據庫交互的具體邏輯
*/
@Service
public class PinotService {
private static final Logger logger = LoggerFactory.getLogger(PinotService.class);
@Autowired
private Connection pinotConnection;
@Autowired
private ObjectMapper objectMapper;
/**
 * Runs a raw Pinot SQL query and returns every row as a column-name-to-value map.
 *
 * <p>The Pinot client {@code ResultSet} is index based (row index, column index) rather than a
 * cursor, so rows are read with {@code getRowCount()} and {@code getString(row, column)}.
 * Values of numeric column types are converted to {@link Long} or {@link Double}; everything
 * else is returned as the string the client reports.
 *
 * @param sql the Pinot SQL statement to execute
 * @return one map per result row, across all result sets; empty when the query matches nothing
 * @throws RuntimeException wrapping any failure raised while executing or reading the query
 */
public List<Map<String, Object>> executeQuery(String sql) {
    logger.debug("正在執行Pinot查詢: {}", sql);
    List<Map<String, Object>> results = new ArrayList<>();
    try {
        // The first argument selects the query dialect; "sql" is the standard SQL endpoint.
        Request request = new Request("sql", sql);
        ResultSetGroup resultSetGroup = pinotConnection.execute(request);
        // A group may hold several result sets, although a single one is the usual case.
        for (int i = 0; i < resultSetGroup.getResultSetCount(); i++) {
            var resultSet = resultSetGroup.getResultSet(i);
            int columnCount = resultSet.getColumnCount();
            int rowCount = resultSet.getRowCount();
            String[] columnNames = new String[columnCount];
            for (int c = 0; c < columnCount; c++) {
                columnNames[c] = resultSet.getColumnName(c);
            }
            for (int r = 0; r < rowCount; r++) {
                Map<String, Object> row = new HashMap<>();
                for (int c = 0; c < columnCount; c++) {
                    String raw = resultSet.getString(r, c);
                    row.put(columnNames[c], convertCell(resultSet.getColumnDataType(c), raw));
                }
                results.add(row);
            }
        }
        logger.info("查詢返回了 {} 行數據。", results.size());
    } catch (Exception e) {
        logger.error("執行Pinot查詢時出錯: {}", sql, e);
        throw new RuntimeException("執行Pinot查詢失敗: " + e.getMessage(), e);
    }
    return results;
}

/** Converts one textual cell to a Java value according to the column data type reported by Pinot. */
private static Object convertCell(String dataType, String raw) {
    if (raw == null || dataType == null) {
        return raw;
    }
    try {
        if ("INT".equalsIgnoreCase(dataType) || "LONG".equalsIgnoreCase(dataType)) {
            return Long.valueOf(raw);
        }
        if ("FLOAT".equalsIgnoreCase(dataType) || "DOUBLE".equalsIgnoreCase(dataType)) {
            return Double.valueOf(raw);
        }
    } catch (NumberFormatException notNumeric) {
        // A numeric column without a parsable value (for example a null cell) maps to null.
        return null;
    }
    return raw;
}
/**
 * Queries aggregated ad-performance metrics for one country / device type / ad slot over the
 * last 24 hours.
 *
 * <p>The filter values come from request parameters, so each one is emitted as an escaped SQL
 * string literal instead of being pasted into the statement verbatim. The look-back window is
 * written as an ISO-8601 duration ({@code PT24H}), which is the format Pinot's {@code ago()}
 * expects.
 *
 * @param country    country filter
 * @param deviceType device type filter
 * @param adSlot     ad slot filter
 * @return rows with the keys {@code total_impressions}, {@code total_clicks},
 *         {@code total_spend} and {@code avg_cpc}
 */
public List<Map<String, Object>> getAdPerformanceData(String country, String deviceType, String adSlot) {
    String sql = "SELECT sum(impressions) AS total_impressions, "
            + " sum(clicks) AS total_clicks, "
            + " sum(spend) AS total_spend, "
            + " AVG(cpc) AS avg_cpc "
            + "FROM ad_performance_table "
            + "WHERE country = " + quoteSqlLiteral(country)
            + " AND device_type = " + quoteSqlLiteral(deviceType)
            + " AND ad_slot = " + quoteSqlLiteral(adSlot) + " "
            + "AND timeColumn > ago('PT24H')";
    logger.info("正在獲取廣告性能數據 - 國家: {}, 設備: {}, 廣告位: {}", country, deviceType, adSlot);
    return executeQuery(sql);
}

/**
 * Renders a value as a single-quoted SQL string literal, doubling embedded single quotes.
 * A {@code null} value is rendered as the text {@code null}, matching what
 * {@code String.format("%s", null)} produced before.
 */
private static String quoteSqlLiteral(String value) {
    return "'" + String.valueOf(value).replace("'", "''") + "'";
}
/**
 * Counts events per event type over the last five minutes, e.g. for a live dashboard or alerting.
 *
 * <p>The look-back window is written as an ISO-8601 duration ({@code PT5M}), which is the
 * format Pinot's {@code ago()} expects.
 *
 * @return rows with the keys {@code event_type} and {@code event_count}
 */
public List<Map<String, Object>> getRealTimeMetrics() {
    String sql = "SELECT event_type, count(*) AS event_count FROM streaming_events_table "
            + "WHERE timeColumn > ago('PT5M') GROUP BY event_type";
    logger.info("正在獲取最近5分鐘的實時指標。");
    return executeQuery(sql);
}
}
Pinot Controller
package com.example.pinotdemo.controller;
import com.example.pinotdemo.service.PinotService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/api/pinot")
public class PinotController {
private static final Logger logger = LoggerFactory.getLogger(PinotController.class);
@Autowired
private PinotService pinotService;
/**
 * Executes the SQL statement passed in the {@code sql} URL parameter and returns its rows.
 *
 * <p>NOTE(review): this endpoint forwards caller-supplied SQL to Pinot unchanged; it should
 * only be reachable by trusted clients.
 *
 * @param sql the SQL statement to run
 * @return 200 with the result rows, or an empty 400 response when the query fails
 */
@GetMapping("/query")
public ResponseEntity<List<Map<String, Object>>> runQuery(@RequestParam String sql) {
    logger.info("收到查詢請求: {}", sql);
    List<Map<String, Object>> rows;
    try {
        rows = pinotService.executeQuery(sql);
    } catch (Exception queryFailure) {
        logger.error("處理查詢請求時出錯", queryFailure);
        return ResponseEntity.badRequest().build();
    }
    return ResponseEntity.ok(rows);
}
/**
 * Returns aggregated ad-performance metrics for the given country, device type and ad slot.
 *
 * <p>All three parameters are mandatory; Spring rejects a request that omits one before this
 * method runs. A failure while querying Pinot is therefore a server-side problem and is
 * reported as 500 rather than 400.
 *
 * @param country    country filter, from the URL parameter
 * @param deviceType device type filter, from the URL parameter
 * @param adSlot     ad slot filter, from the URL parameter
 * @return 200 with the metric rows, or an empty 500 response when the query fails
 */
@GetMapping("/ad-performance")
public ResponseEntity<List<Map<String, Object>>> getAdPerformance(
        @RequestParam String country,
        @RequestParam String deviceType,
        @RequestParam String adSlot) {
    logger.info("收到廣告性能請求 - 國家: {}, 設備: {}, 廣告位: {}", country, deviceType, adSlot);
    try {
        List<Map<String, Object>> results = pinotService.getAdPerformanceData(country, deviceType, adSlot);
        return ResponseEntity.ok(results);
    } catch (Exception e) {
        logger.error("處理廣告性能請求時出錯", e);
        return ResponseEntity.status(500).build();
    }
}
/**
 * Returns the event counts for the most recent time window.
 *
 * <p>The request carries no input, so a failure can only originate on the server side and is
 * reported as 500 rather than 400.
 *
 * @return 200 with the metric rows, or an empty 500 response when the query fails
 */
@GetMapping("/real-time-metrics")
public ResponseEntity<List<Map<String, Object>>> getRealTimeMetrics() {
    logger.info("收到實時指標請求");
    try {
        List<Map<String, Object>> results = pinotService.getRealTimeMetrics();
        return ResponseEntity.ok(results);
    } catch (Exception e) {
        logger.error("處理實時指標請求時出錯", e);
        return ResponseEntity.status(500).build();
    }
}
/**
 * Health-check endpoint.
 *
 * @return 200 with a fixed status payload; Pinot itself is not contacted
 */
@GetMapping("/health")
public ResponseEntity<Map<String, String>> health() {
    return ResponseEntity.ok(Map.of("status", "UP", "component", "Pinot Integration Demo"));
}
}
BidRequest
package com.example.pinotdemo.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* 競價請求數據傳輸對象 (Data Transfer Object)
* 定義了從廣告交易平臺(ADX)接收到的競價請求的數據結構
* 使用@JsonProperty注解映射JSON字段名到Java屬性
*/
public class BidRequest {
@JsonProperty("request_id")
private String requestId; // 競價請求的唯一標識符
@JsonProperty("user_agent")
private String userAgent; // 用戶瀏覽器的User-Agent字符串
@JsonProperty("ip")
private String ip; // 用戶的IP地址
@JsonProperty("device_type")
private String deviceType; // 設備類型 (e.g., mobile, desktop, tablet)
@JsonProperty("os")
private String os; // 操作系統 (e.g., iOS, Android, Windows)
@JsonProperty("browser")
private String browser; // 瀏覽器名稱 (e.g., Chrome, Safari)
@JsonProperty("country")
private String country; // 用戶所在國家
@JsonProperty("city")
private String city; // 用戶所在城市
@JsonProperty("ad_slot")
private String adSlot; // 廣告位標識 (e.g., banner_top, video_pre)
@JsonProperty("floor_price")
private double floorPrice; // 廣告主設定的最低出價(底價)
public BidRequest() {}
/**
 * Creates a fully populated bid request.
 *
 * @param requestId  unique identifier of the bid request
 * @param userAgent  User-Agent string of the user's browser
 * @param ip         IP address of the user
 * @param deviceType device type, e.g. mobile, desktop, tablet
 * @param os         operating system, e.g. iOS, Android, Windows
 * @param browser    browser name, e.g. Chrome, Safari
 * @param country    country of the user
 * @param city       city of the user
 * @param adSlot     ad slot identifier, e.g. banner_top, video_pre
 * @param floorPrice minimum acceptable bid (floor price)
 */
public BidRequest(String requestId, String userAgent, String ip, String deviceType, String os, String browser, String country, String city, String adSlot, double floorPrice) {
    // Auction parameters.
    this.requestId = requestId;
    this.adSlot = adSlot;
    this.floorPrice = floorPrice;
    // Client / device description.
    this.userAgent = userAgent;
    this.deviceType = deviceType;
    this.os = os;
    this.browser = browser;
    // Network and geography.
    this.ip = ip;
    this.country = country;
    this.city = city;
}
public String getRequestId() { return requestId; }
public void setRequestId(String requestId) { this.requestId = requestId; }
public String getUserAgent() { return userAgent; }
public void setUserAgent(String userAgent) { this.userAgent = userAgent; }
public String getIp() { return ip; }
public void setIp(String ip) { this.ip = ip; }
public String getDeviceType() { return deviceType; }
public void setDeviceType(String deviceType) { this.deviceType = deviceType; }
public String getOs() { return os; }
public void setOs(String os) { this.os = os; }
public String getBrowser() { return browser; }
public void setBrowser(String browser) { this.browser = browser; }
public String getCountry() { return country; }
public void setCountry(String country) { this.country = country; }
public String getCity() { return city; }
public void setCity(String city) { this.city = city; }
public String getAdSlot() { return adSlot; }
public void setAdSlot(String adSlot) { this.adSlot = adSlot; }
public double getFloorPrice() { return floorPrice; }
public void setFloorPrice(double floorPrice) { this.floorPrice = floorPrice; }
}
BidResponse
package com.example.pinotdemo.dto;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* 競價響應數據傳輸對象
* 定義了發送給廣告交易平臺(ADX)的競價響應的數據結構
* 使用@JsonProperty注解映射JSON字段名到Java屬性
*/
public class BidResponse {
@JsonProperty("request_id")
private String requestId; // 對應的競價請求ID
@JsonProperty("bid")
private boolean bid; // 是否出價 (true: 出價, false: 不出價)
@JsonProperty("bid_price")
private double bidPrice; // 出價金額
@JsonProperty("creative_id")
private String creativeId; // 如果出價成功,關聯的廣告創意ID
@JsonProperty("ad_id")
private String adId; // 如果出價成功,關聯的廣告ID
public BidResponse() {}
/**
 * Creates a fully populated bid response.
 *
 * @param requestId  identifier of the bid request being answered
 * @param bid        {@code true} to place a bid, {@code false} to pass
 * @param bidPrice   bid amount
 * @param creativeId creative attached to the bid; the no-bid callers in this project pass {@code null}
 * @param adId       ad attached to the bid; the no-bid callers in this project pass {@code null}
 */
public BidResponse(String requestId, boolean bid, double bidPrice, String creativeId, String adId) {
    // Decision.
    this.bid = bid;
    this.bidPrice = bidPrice;
    // Identifiers.
    this.requestId = requestId;
    this.creativeId = creativeId;
    this.adId = adId;
}
public String getRequestId() { return requestId; }
public boolean isBid() { return bid; }
public double getBidPrice() { return bidPrice; }
public String getCreativeId() { return creativeId; }
public String getAdId() { return adId; }
public void setRequestId(String requestId) { this.requestId = requestId; }
public void setBid(boolean bid) { this.bid = bid; }
public void setBidPrice(double bidPrice) { this.bidPrice = bidPrice; }
public void setCreativeId(String creativeId) { this.creativeId = creativeId; }
public void setAdId(String adId) { this.adId = adId; }
}
Bidding Controller
package com.example.pinotdemo.controller;
import com.example.pinotdemo.dto.BidRequest;
import com.example.pinotdemo.dto.BidResponse;
import com.example.pinotdemo.service.BiddingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
/**
* 競價控制器
* 處理來自廣告交易平臺(ADX)的實時競價(RTB)請求
*/
@RestController
@RequestMapping("/api/bidding")
public class BiddingController {
private static final Logger logger = LoggerFactory.getLogger(BiddingController.class);
@Autowired
private BiddingService biddingService;
/**
 * Handles one bid request posted as a JSON body.
 *
 * <p>This endpoint always answers 200: when processing fails, the failure is logged and a
 * no-bid response is returned instead of an error status.
 *
 * @param bidRequest the bid request parsed from the JSON body
 * @return the bidding decision as a JSON response
 */
@PostMapping("/bid")
public ResponseEntity<BidResponse> handleBidRequest(@RequestBody BidRequest bidRequest) {
    final String requestId = bidRequest.getRequestId();
    logger.info("收到競價請求: {}", requestId);
    BidResponse outcome;
    try {
        // Delegate the decision to the service layer.
        outcome = biddingService.processBidRequest(bidRequest);
        logger.info("處理完競價請求: {}. 出價: {}, 價格: {}", requestId, outcome.isBid(), outcome.getBidPrice());
    } catch (Exception failure) {
        logger.error("處理競價請求時出錯: {}", requestId, failure);
        outcome = new BidResponse(requestId, false, 0.0, null, null);
    }
    return ResponseEntity.ok(outcome);
}
/**
 * Placeholder endpoint for bidding decision logs.
 *
 * <p>Nothing is queried yet; the endpoint always returns an empty list. A real implementation
 * would read the decision records from wherever they are stored in Pinot, for example a query
 * such as {@code SELECT * FROM bidding_decisions_log WHERE timestamp > ...}.
 *
 * @return 200 with an empty list
 */
@GetMapping("/decision-logs")
public ResponseEntity<List<Map<String, Object>>> getDecisionLogs() {
    logger.info("正在從Pinot獲取競價決策日志...");
    List<Map<String, Object>> noLogsYet = List.of();
    return ResponseEntity.ok(noLogsYet);
}
}
Bidding Service
package com.example.pinotdemo.service;
import com.example.pinotdemo.dto.BidRequest;
import com.example.pinotdemo.dto.BidResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
/**
* 競價服務類
* 核心業務邏輯層,負責根據競價請求和Pinot中的實時數據做出競價決策
*/
@Service
public class BiddingService {
private static final Logger logger = LoggerFactory.getLogger(BiddingService.class);
@Autowired
private PinotService pinotService;
private final Random random = new Random();
/**
 * Decides whether to bid on a request, and at what price, using recent performance data.
 *
 * <p>Steps: query the aggregated metrics for the request's country / device / ad slot, read
 * the average CPC, derive a bid price from it, and bid only when that price is strictly above
 * the floor price. When no usable average CPC is available the default strategy is applied.
 *
 * @param request the incoming bid request
 * @return the bidding decision
 */
public BidResponse processBidRequest(BidRequest request) {
    String requestId = request.getRequestId();
    String country = request.getCountry();
    String deviceType = request.getDeviceType();
    String adSlot = request.getAdSlot();
    double floorPrice = request.getFloorPrice();
    logger.info("正在處理競價請求: ID={}, 國家={}, 設備={}, 廣告位={}, 底價={}", requestId, country, deviceType, adSlot, floorPrice);
    // 1. Fetch the recent performance metrics.
    List<Map<String, Object>> performanceData = pinotService.getAdPerformanceData(country, deviceType, adSlot);
    // 2. Extract the average CPC; the aggregate query is expected to yield a single row.
    Optional<Double> avgCpcOpt = extractAvgCpc(performanceData);
    if (!avgCpcOpt.isPresent()) {
        logger.warn("未找到請求 {} 的歷史 avg_cpc 數據,使用默認策略。", requestId);
        return createDefaultBidResponse(requestId, floorPrice);
    }
    double avgCpc = avgCpcOpt.get();
    logger.debug("為請求 {} 計算出的 avg_cpc: {}", requestId, avgCpc);
    double calculatedBidPrice = calculateBidPrice(avgCpc, floorPrice);
    // 3. Bid only when the computed price clears the floor.
    if (calculatedBidPrice > floorPrice) {
        String creativeId = selectCreativeId(request);
        String adId = deriveAdId(creativeId);
        logger.info("請求 {} 贏得競價,價格: {}", requestId, calculatedBidPrice);
        return new BidResponse(requestId, true, calculatedBidPrice, creativeId, adId);
    } else {
        logger.info("請求 {} 未贏得競價,計算價格: {} <= 底價: {}", requestId, calculatedBidPrice, floorPrice);
        return new BidResponse(requestId, false, 0.0, null, null);
    }
}

/**
 * Reads {@code avg_cpc} from the first result row without assuming its runtime type.
 * Any {@link Number} or numeric string is accepted; a missing, unparsable, NaN or infinite
 * value counts as "no data" (an average over zero rows may not be a finite number).
 */
private static Optional<Double> extractAvgCpc(List<Map<String, Object>> rows) {
    if (rows == null || rows.isEmpty() || rows.get(0) == null) {
        return Optional.empty();
    }
    Object raw = rows.get(0).get("avg_cpc");
    Double value = null;
    if (raw instanceof Number) {
        value = ((Number) raw).doubleValue();
    } else if (raw instanceof String) {
        try {
            value = Double.valueOf(((String) raw).trim());
        } catch (NumberFormatException notNumeric) {
            value = null;
        }
    }
    if (value == null || value.isNaN() || value.isInfinite()) {
        return Optional.empty();
    }
    return Optional.of(value);
}
/**
 * Fallback strategy used when no historical data is available.
 *
 * <p>Draws one random number: values up to 0.7 yield a no-bid response, larger values yield a
 * bid at 105% of the floor price with one of ten default creatives picked at random.
 *
 * @param requestId  the bid request identifier
 * @param floorPrice the floor price of the request
 * @return the default bidding decision
 */
private BidResponse createDefaultBidResponse(String requestId, double floorPrice) {
    if (random.nextDouble() <= 0.7) {
        // Pass on this request.
        return new BidResponse(requestId, false, 0.0, null, null);
    }
    double bidAmount = floorPrice * 1.05;
    String chosenCreative = "default_creative_" + random.nextInt(10);
    return new BidResponse(requestId, true, bidAmount, chosenCreative, deriveAdId(chosenCreative));
}
/**
 * Derives the bid price from the historical average CPC and the floor price.
 *
 * <p>The target is 110% of the average CPC, capped at 150% of the floor price and then raised
 * to the floor price if it falls below it.
 *
 * @param avgCpc     historical average cost per click
 * @param floorPrice floor price of the request
 * @return the resulting bid price
 */
private double calculateBidPrice(double avgCpc, double floorPrice) {
    final double desired = avgCpc * 1.10;
    final double ceiling = floorPrice * 1.5;
    final double capped = Math.min(desired, ceiling);
    final double bid = Math.max(floorPrice, capped);
    logger.debug("計算出價: 目標={}, 上限={}, 最終={}", desired, ceiling, bid);
    return bid;
}
/**
 * Picks a creative ID for the request (simplified example).
 *
 * <p>The ID is built from the request's ad slot and the current time in milliseconds modulo 100.
 *
 * @param request the bid request
 * @return the creative ID, in the form {@code creative_<adSlot>_<0..99>}
 */
private String selectCreativeId(BidRequest request) {
    StringBuilder id = new StringBuilder("creative_");
    id.append(request.getAdSlot());
    id.append("_");
    id.append(System.currentTimeMillis() % 100);
    return id.toString();
}
/**
 * Derives an ad ID from a creative ID (simplified example).
 *
 * <p>Every occurrence of {@code creative_} in the input is replaced with {@code ad_}, not only
 * a leading one.
 *
 * @param creativeId the creative ID
 * @return the derived ad ID
 */
private String deriveAdId(String creativeId) {
    final String creativeMarker = "creative_";
    final String adMarker = "ad_";
    return creativeId.replace(creativeMarker, adMarker);
}
}
Application
package com.example.pinotdemo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class PinotIntegrationDemoApplication {
/**
 * Application entry point: hands control to Spring Boot with this class as the primary source.
 *
 * @param args command-line arguments, passed through to {@code SpringApplication.run}
 */
public static void main(String[] args) {
SpringApplication.run(PinotIntegrationDemoApplication.class, args);
}
}
責任編輯:武曉燕
來源:
Java知識日歷



































