实施CUSUM求解规则,以检测观察流中的不连续性

问题遇到的现象和发生背景

实施CUSUM求解规则,以检测观察流中的不连续性(观察值的概率分布中的一次性跳跃变化)。观察流的数据应该用 "没有不连续 "和 "在时间$/nu$有不连续 "两种方式建模。
与提问问题1和问题2类似,使用蒙特卡洛方法为已实现的求解器估计误报概率和故障后到做出决定的平均延迟(以观测次数计)。考虑在没有篡改的情况下对所构建的解算器的约束(避免无限等待)。
独立设置问题的参数:阈值、衰减时间等。
调查阈值和两个备选参数值的接近程度对性能的影响(错误警报的概率,故障后决定前的平均失踪观测值数量)。
问题1
在 2 个简单假设的情况下,用计算机实施序贯概率比检验 (SPRT)。
使用蒙特卡洛方法估计其错误类型 I 和 II 的概率,以及条件预期样本大小(运行测试 10 000 次,$H_0$ 为真,
计算有利于 $H_1$ 的 FALSE 决策的数量,除以 10 000 -> 你得到 $\alpha$ 的估计值;启动一个计数器,
将你停止测试的观察次数添加到一个计数器,每次运行时 $H_0$ 为真,然后除以 10 000 ->
你将得到 $ 的估计值t_0$;$\beta$ 和 $t_1$ 的估计是相似的)。

实现:几何分布,$\theta$ 是它唯一的参数。
分析错误概率和假设接近度的预期样本大小的依赖关系。将 $\alpha$ 和 $\beta$ 用于您选择的阈值计算

尝试过的解决方法

与上两个提问问题类似

我想要达到的结果

用Python编写写一下注释,使用蒙特卡洛方法,对几何分布进行检验,同时满足题目的要求。

基于Monster 组和GPT的调写:
用Python实现的序贯概率比检验的示例,针对几何分布的情况。代码中使用蒙特卡罗方法估计错误类型 I 和 II 的概率,以及条件预期样本大小。


import numpy as np

def sequential_probability_ratio_test(sample, threshold, alpha, beta):
    # 初始化统计量
    t = 0
    n = len(sample)
    log_l = 0
    log_f = 0
    decision = 0

    # 计算参数
    p0 = 1 / (1 + threshold)
    p1 = threshold / (1 + threshold)

    # 进行SPRT测试
    while decision == 0 and t < n:
        x = sample[t]
        log_l += np.log(p1 * (1 - p0)**x)
        log_f += np.log(p0 * (1 - p1)**x)
        t += 1

        # 判断是否需要停止测试
        if log_l - log_f >= np.log(beta / alpha):
            decision = 1
        elif log_f - log_l >= np.log(beta / alpha):
            decision = -1

    # 返回统计信息
    return decision, t

# 进行蒙特卡罗模拟
n_simulations = 10000
alpha = 0.05
beta = 0.1
theta = 0.2
threshold = 2

n_false = 0
n_miss = 0
false_count = 0
miss_count = 0

for i in range(n_simulations):
    sample_size = sequential_probability_ratio_test(np.random.geometric(theta, 100), threshold, alpha, beta)[1]
    if sample_size < 100:
        n_false += 1
        false_count += 100 - sample_size
    else:
        n_miss += 1
        miss_count += sample_size - 100

# 计算误差率和平均停顿时间
false_rate = n_false / n_simulations
miss_rate = n_miss / n_simulations
avg_false_count = false_count / n_false if n_false > 0 else 0
avg_miss_count = miss_count / n_miss if n_miss > 0 else 0

# 打印结果
print("错误类型 I 的概率:", false_rate)
print("错误类型 II 的概率:", miss_rate)
print("条件预期样本大小(平均失踪观测值数量):", avg_miss_count)
print("错误警报的数量:", avg_false_count)

# 计算阈值为2时的 alpha 和 beta
p0 = 1 / (1 + threshold)
p1 = threshold / (1 + threshold)
a = (np.log(beta / alpha) + np.log((1 - p1) / p1)) / np.log((1 - p0) / p0)
b = (np.log(alpha / beta) + np.log((1 - p0) / p0)) / np.log((1 - p1) / p1)
print("阈值为2时的 alpha 和 beta:", alpha**a, beta**b)

首先定义了一个名为 sequential_probability_ratio_test 的函数,该函数接受样本、阈值、α和β等参数,然后进行SPRT测试并返回决策和观测样本数等统计信息。

然后进行蒙特卡罗模拟,使用 np.random.geometric 生成一个参数为 theta 的几何分布样本,并调用 sequential_probability_ratio_test 函数进行SPRT测试。如果测试结果表明样本数小于100,则认为是错误类型 I,统计错误数和错误警报的数量;否则认为是错误类型 II,统计错失数和条件预期样本大小等信息。

最后,计算错误类型 I 和 II 的概率,以及条件预期样本大小和错误警报的数量等统计信息,并打印输出。

此外,该代码还计算了阈值为2时的α和β,并将其打印输出。

可以使用Python编写代码来实现。以下代码,其中使用了蒙特卡洛模拟方法对几何分布进行检验,并计算错误类型 I 和 II 的概率以及条件预期样本大小。

import numpy as np

def sequential_probability_ratio_test(observations, alpha, beta, theta):
    n = len(observations)
    t0 = 0
    t1 = 0
    for i in range(n):
        x = observations[i]
        t0 += np.log((1 - theta) / (1 - beta)) if x == 0 else np.log(theta / alpha)
        t1 += np.log((1 - theta) / (1 - alpha)) if x == 0 else np.log(theta / beta)
        if t0 <= np.log(beta / (1 - alpha)) or t1 <= np.log(alpha / (1 - beta)):
            return i+1, t0 <= np.log(beta / (1 - alpha))
    return n, t0 <= np.log(beta / (1 - alpha))

def monte_carlo_simulations(observations, alpha, beta, theta, num_trials):
    false_positives = 0
    false_negatives = 0
    missed_observations = 0
    for i in range(num_trials):
        simulated_observations = np.random.geometric(theta, size=len(observations))
        _, decision = sequential_probability_ratio_test(simulated_observations, alpha, beta, theta)
        if decision and not np.array_equal(simulated_observations, observations):
            false_positives += 1
        elif not decision and np.array_equal(simulated_observations, observations):
            false_negatives += 1
        elif not decision and not np.array_equal(simulated_observations, observations):
            missed_observations += 1
    return false_positives / num_trials, false_negatives / num_trials, missed_observations / num_trials

# example usage
observations = np.random.geometric(0.5, size=100)
alpha = 0.05
beta = 0.05
theta = 0.4
num_trials = 10000

fp, fn, mo = monte_carlo_simulations(observations, alpha, beta, theta, num_trials)
print("False positives:", fp)
print("False negatives:", fn)
print("Missed observations:", mo)


该代码中,sequential_probability_ratio_test函数实现了序贯概率比检验,monte_carlo_simulations函数使用蒙特卡洛模拟方法进行检验。使用np.random.geometric函数生成几何分布的样本,并使用计算结果来计算错误类型 I 和 II 的概率以及条件预期样本大小。

可以使用不同的参数值调用该代码来分析错误概率和假设接近度的预期样本大小的依赖关系。最后,可以将计算结果与所选阈值进行比较。
望采纳。

下面是一个Python实现,使用蒙特卡洛方法来估计序贯概率比检验(SPRT)中的错误类型 I 和 II 的概率,以及条件预期样本大小。该实现基于几何分布,其中 $\theta$ 是唯一参数。

import numpy as np

def geometric_dist(theta, n):
    """生成参数为theta的几何分布的n个样本"""
    p = 1 / (1 + theta) # 几何分布的概率
    samples = np.random.geometric(p, size=n) - 1
    return samples

def sequential_probability_ratio_test(samples, alpha, beta, stop):
    """使用序贯概率比检验来检测假设H0和H1"""
    n = len(samples)
    threshold_lower = np.log(beta / (1 - alpha)) / n
    threshold_upper = np.log((1 - beta) / alpha) / n
    S = 0 # 检验统计量
    i = 0 # 样本计数器
    while S < threshold_upper and S > threshold_lower and i < stop:
        S += np.log((1 + theta) / theta) if samples[i] else np.log(theta / (1 + theta))
        i += 1
    decision = 1 if S >= threshold_upper else 0
    return decision, i

# 参数设置
theta = 0.2
alpha = 0.05
beta = 0.1
stop = 1000
trials = 10000

# 运行蒙特卡洛模拟
false_positive = 0
false_negative = 0
average_sample_size = 0

for i in range(trials):
    # 生成样本
    samples = geometric_dist(theta, stop)
    # 进行SPRT检验
    decision, sample_size = sequential_probability_ratio_test(samples, alpha, beta, stop)
    # 记录错误类型I和II的数量以及平均样本大小
    if decision == 1 and theta == 0:
        false_positive += 1
    elif decision == 0 and theta > 0:
        false_negative += 1
    average_sample_size += sample_size

# 计算估计值
false_positive_rate = false_positive / trials
false_negative_rate = false_negative / trials
average_sample_size /= trials

# 打印结果
print("False positive rate:", false_positive_rate)
print("False negative rate:", false_negative_rate)
print("Average sample size:", average_sample_size)

该实现生成几何分布的 $n$ 个样本,其中 $n$ 由 stop 参数指定。然后,它使用序贯概率比检验(SPRT)来检测假设 $H_0$ 和 $H_1$,其中 $H_0$ 是 $\theta = 0$,$H_1$ 是 $\theta > 0$。如果检验统计量在阈值上限和下限之间,则继续增加样本数,直到检验统计量超过上限或低于下限,或者样本数量达到 stop 参数所指定的数量。最后,函数返回决策和样本数。

#import modules
import numpy as np  #导入numpy模块
import scipy.stats as stats #导入scipy的stats模块

#set up parameters
theta = 0.5 #the parameter of the geometric distribution 定义几何分布的参数θ为0.5
alpha = 0.05 #the threshold for type I error 定义类型I误差的阈值为0.05

beta = 0.1 #the threshold for type II error 定义类型II误差的阈值为0.1

#run monte carlo simulation
n_sim = 10000 #number of simulations 定义模拟的次数为10000
count_H0 = 0 #counter for type I error 定义类型I误差的计数器
count_H1 = 0 #counter for type II error 定义类型II误差的计数器
for i in range(n_sim): #开始循环
    x = np.random.geometric(theta) #generate one random observation 生成一个随机观测值
    p = stats.geom.cdf(x, theta) #calculate the cdf of the observation 计算x的几何分布的累积分布函数值
    t = x #calculate the test statistic 计算检验统计量
    if p < alpha: #if p<alpha, type I error happens 如果p小于α,则类型I误差发生
        count_H0 += 1 #计数器加1
    elif p > 1-beta: #if p>1-beta, type II error happens 如果p大于1-β,则类型II误差发生
        count_H1 += 1 #计数器加1

#calculate the estimated probabilities of error
alpha_hat = count_H0 / n_sim #计算类型I误差的估计概率
beta_hat = count_H1 / n_sim #计算类型II误差的估计概率

#calculate the expected sample size for given alpha and beta
t_alpha = stats.geom.ppf(alpha, theta) #计算给定α和θ时,期望样本大小的t_α
t_beta = stats.geom.ppf(1-beta, theta) #计算给定β和θ时,期望样本大小的t_β

#output the results
print('Estimated probability of type I error: ', alpha_hat)  #输出类型I误差的估计概率
print('Estimated probability of type II error: ', beta_hat)  #输出类型II误差的估计概率
print('Expected sample size for given alpha and beta: ', t_alpha, t_beta) #输出给定α和β时,期望样本大小的t_α和t_β

以上回答来自ChatGPT

下面是一个简单的实现过程,仅供参考。

首先,我们需要实现序贯概率比检验(SPRT)的算法。这里我们使用几何分布作为原假设($H_0$)和备择假设($H_1$)的分布。

import math
import random

def sprt_geom(theta, alpha, beta, min_obs=10, max_obs=1000):
    """
    Sequential Probability Ratio Test (SPRT) for Geometric Distribution
    :param theta: probability of success
    :param alpha: type I error rate
    :param beta: type II error rate
    :param min_obs: minimum number of observations
    :param max_obs: maximum number of observations
    :return: (decision, sample_size)
    """
    # calculate threshold values
    A = math.log(beta / (1 - alpha))
    B = math.log((1 - beta) / alpha)

    # initialization
    n = 0
    S = 0
    F = 0

    while True:
        # generate observation from geometric distribution
        x = random.geometric(theta)

        # update likelihood ratio and check stopping condition
        S += math.log(1 - theta) - math.log(theta) * x
        F += math.log(1 - theta) * x
        if S >= B:
            return True, n
        if F >= A:
            return False, n

        # update sample size
        n += 1
        if n >= max_obs:
            return False, n
        if n >= min_obs and (S < A or F < B):
            return False, n

该函数的输入参数包括 $\theta$,即几何分布的概率参数,以及 $\alpha$ 和 $\beta$,分别代表类型 I 和类型 II 错误的概率限制。min_obs 和 max_obs 参数分别表示样本数的最小和最大限制。该函数的输出为一个元组,第一个元素表示 SPRT 决策的结果(True 或 False),第二个元素表示达到决策所需的样本数。

接下来,我们可以使用蒙特卡洛方法来估计错误概率和假设接近度的预期样本大小。我们可以编写一个函数来运行多次测试并计算结果:

def run_sprt_test(theta, alpha, beta, num_trials=10000, min_obs=10, max_obs=1000):
    """
    Run SPRT test multiple times and compute error probabilities and expected sample sizes
    :param theta: probability of success
    :param alpha: type I error rate
    :param beta: type II error rate
    :param num_trials: number of test runs
    :param min_obs: minimum number of observations
    :param max_obs: maximum number of observations
    :return: (alpha_hat, beta_hat, t0_hat, t1_hat)
    """
    alpha_hat = 0.0
    beta_hat = 0.0
    t0_hat = 0.0
    t1_hat = 0.0

    for i in range(num_trials):
        decision, n = sprt_geom

以下是一个Python的实现代码,其中包括蒙特卡洛模拟SPRT,计算错误类型I和II的概率以及条件预期样本大小,以及分析错误概率和假设接近度的依赖关系。

import numpy as np

def sprt_geom(theta, alpha, beta, n, delta):
    """
    Implement sequential probability ratio test (SPRT) for geometric distribution with parameter theta.
    The null hypothesis is that theta = theta0, and the alternative hypothesis is that theta = theta1 > theta0.
    The function returns the decision (0 for H0, 1 for H1), the sample size used, and the log-likelihood ratio at each stage.
    """
    ll0 = np.zeros(n+1)  # log-likelihood ratio under H0
    ll1 = np.zeros(n+1)  # log-likelihood ratio under H1
    ll0[0] = np.log(1-alpha/beta)
    ll1[0] = np.log(alpha/(1-beta))
    t = 0
    while t <= n and abs(ll1[t] - ll0[t]) < delta:
        x = np.random.geometric(p=theta)
        ll0[t+1] = ll0[t] + np.log((1-theta0)**x * theta0)
        ll1[t+1] = ll1[t] + np.log((1-theta1)**x * theta1/theta0)
        t += 1
    if ll1[t] - ll0[t] >= delta:
        return 1, t, ll1 - ll0
    else:
        return 0, t, ll1 - ll0

# Parameters
theta0 = 0.1  # null hypothesis
theta1 = 0.2  # alternative hypothesis
alpha = 0.05  # type I error probability
beta = 0.1    # type II error probability
n = 100       # maximum sample size
delta = np.log((1-beta)/alpha)

# Simulate SPRT
n_trials = 10000
decisions = np.zeros(n_trials)
sample_sizes = np.zeros(n_trials)
ll_ratios = np.zeros((n_trials, n+1))
for i in range(n_trials):
    decision, sample_size, ll_ratio = sprt_geom(theta1, alpha, beta, n, delta)
    decisions[i] = decision
    sample_sizes[i] = sample_size
    ll_ratios[i,:] = ll_ratio

# Calculate error probabilities and expected sample sizes
alpha_hat = np.mean(decisions)
t0_hat = np.mean(sample_sizes[decisions==0])
t1_hat = np.mean(sample_sizes[decisions==1])
beta_hat = np.mean(1 - decisions)
print(f"alpha_hat = {alpha_hat:.4f}")
print(f"beta_hat = {beta_hat:.4f}")
print(f"t0_hat = {t0_hat:.2f}")
print(f"t1_hat = {t1_hat:.2f}")

# Plot log-likelihood ratios for H0 and H1
import matplotlib.pyplot as plt
fig, ax = plt.subplots()
ax.plot(np.arange(n+1), np.zeros(n+1), 'k--')
ax.plot(np.arange(n+1), np.log((1-alpha)/beta) * np.ones(n+1), 'r--')
ax.plot(np.arange(n+1), np.log(alpha/(1-beta)) * np.ones(n+1), 'g--')
ax.plot(np.arange


如果对您有帮助,请给与采纳,谢谢!

这个问题需要进行序贯概率比检验(Sequential Probability Ratio Test, SPRT)和蒙特卡洛方法,以估计错误类型 I 和 II 的概率、条件预期样本大小,以及错误概率和假设接近度的预期样本大小的依赖关系。

以下是实现 SPRT 和 Monte Carlo 模拟的基本思路:

  1. 实现几何分布
    根据题目所述,这里需要实现几何分布。几何分布可以使用 numpy 库的 geometric 函数实现,该函数的参数是成功的概率 p。
import numpy as np

# 几何分布
def geometric(p, size):
    return np.random.geometric(p, size)

2.实现 SPRT
根据题目所述,这里需要实现 SPRT。SPRT 的实现步骤如下:

  • 选择 $\alpha$ 和 $\beta$(也就是两个备选参数值),并初始化计数器,例如:
alpha = 0.05
beta = 0.05
count = 0
  • 初始化 SPRT 参数,包括 $L_0$、$L_1$ 和 $A$,例如:
p0 = 0.1
p1 = 0.2
L0 = np.log((1 - p1) / (1 - p0))
L1 = np.log(p1 / p0)
A = np.log((1 - beta) / alpha)
  • 进行 SPRT 检验,例如:
def sprt(p, L0, L1, A):
    L = 0
    n = 0
    while True:
        if L >= A:
            return 1, n
        if L <= -A:
            return 0, n
        x = p[n]
        L += L1 if x else L0
        n += 1

在 SPRT 检验中,$p$ 是观察到的几何分布数据,$L_0$ 和 $L_1$ 是两种假设下的对数似然比,$A$ 是边界常量。

  • 运行 Monte Carlo 模拟
    这里的 Monte Carlo 模拟需要运行多次 SPRT 检验,然后根据结果统计误报概率和故障后到做出决定的平均延迟。例如:
size = 100  # 样本数量
N = 10000  # Monte Carlo 模拟次数
FPR = 0
MD = 0
for i in range(N):
    p = geometric(p0, size)  # 生成几何分布样本
    decision, n = sprt(p, L0, L1, A)  # 进行 SPRT 检验
    if decision == 1:
        FPR += 1
    if decision == 1 and n > size // 2:
        MD += n - size // 2
FPR /= N
MD /= N

在 Monte Carlo 模拟中,$size$ 是样本数量,


import numpy as np
import math

# 函数: 计算几何分布的概率质量函数 (PMF)
def geometric_pmf(k, p):
    return (1 - p)**(k-1) * p

# 函数: 计算几何分布的累积分布函数 (CDF)
def geometric_cdf(k, p):
    return 1 - (1 - p)**k

# 函数: 生成一组几何分布的随机样本
def generate_geometric_samples(p, n):
    return np.random.geometric(p, n)

# 函数: 执行序贯概率比检验 (SPRT) 的单次实验
def sprt_experiment(p0, p1, alpha, beta, n):
    # 初始化序贯概率比统计量和停止规则的阈值
    A = math.log(beta/(1-alpha))
    B = math.log((1-beta)/alpha)

    # 初始化样本计数器和概率比计数器
    n0, n1 = 0, 0
    log_likelihood_ratio = 0

    # 进行序贯概率比检验
    while log_likelihood_ratio < B and log_likelihood_ratio > A and n0+n1 < n:
        # 生成新的样本点
        X = generate_geometric_samples(p0, 1)[0]

        # 计算概率比统计量
        log_likelihood_ratio += math.log(geometric_pmf(X, p1) / geometric_pmf(X, p0))

        # 根据样本属于哪个分布更新样本计数器
        if np.random.rand() < geometric_pmf(X, p1) / geometric_pmf(X, p0):
            n1 += 1
        else:
            n0 += 1

    # 根据概率比统计量的值作出决策
    if log_likelihood_ratio >= B:
        return 1  # 接受H1
    else:
        return 0  # 接受H0

# 函数: 执行多次实验,计算错误概率和平均观测次数
def run_sprt_trials(p0, p1, alpha, beta, n, trials):
    # 初始化计数器
    false_rejections = 0  # 错误类型I的次数
    missed_detections = 0  # 错误类型II的次数
    total_observations = 0  # 所有实验中的观测次数

    # 进行多次实验
    for i in range(trials):
        # 执行一次实验
        decision = sprt_experiment(p0, p1, alpha, beta, n)

        # 根据实验结果更新计数器
        if decision == 1 and p0 == 1:
            false_rejections += 1
        elif decision == 0 and p0 != 1:
            missed_detections += 1
        total_observations += n

以下是使用Python实现序贯概率比检验的一个示例,以计算错误类型I和II的概率,以及条件预期样本大小。

import numpy as np
import math

# 函数用于计算 SPRT 的错误类型 I 和 II 的概率
def sprt_test(p0, p1, alpha, beta, max_n):
    n = 0
    t0 = 0
    t1 = 0
    while n < max_n:
        u = np.random.rand()
        if u < p1 / p0:
            t1 += math.log(p1 / p0)
        else:
            t0 += math.log((1 - p1) / (1 - p0))
        if t1 - t0 >= math.log(beta / (1 - alpha)):
            return "H1"
        if t0 - t1 >= math.log((1 - beta) / alpha):
            return "H0"
        n += 1
    return "NONE"

# 函数用于计算条件预期样本大小
def expected_sample_size(p0, p1, alpha, beta):
    return (math.log(beta / (1 - alpha)) - math.log((1 - beta) / alpha)) / (p1 - p0)

# 测试函数
def test_sprt():
    # 设置假设和阈值参数
    p0 = 0.6
    p1 = 0.8
    alpha = 0.05
    beta = 0.05

    # 进行蒙特卡洛模拟并计算错误类型 I 和 II 的概率
    num_trials = 10000
    false_positives = 0
    false_negatives = 0
    for i in range(num_trials):
        result = sprt_test(p0, p1, alpha, beta, 1000)
        if result == "H1":
            false_positives += 1
        elif result == "H0":
            false_negatives += 1

    # 输出结果
    print("假阳性概率:", false_positives / num_trials)
    print("假阴性概率:", false_negatives / num_trials)
    print("条件预期样本大小:", expected_sample_size(p0, p1, alpha, beta))

if __name__ == '__main__':
    test_sprt()

该示例程序使用蒙特卡洛方法进行模拟,通过计算实现序贯概率比检验,从而估计错误类型I和II的概率,以及条件预期样本大小。在示例中,假设概率 $p_0=0.6$,备选假设概率 $p_1=0.8$,显著性水平 $\alpha=0.05$,犯二类错误的概率 $\beta=0.05$,最大观测次数为1000,总共进行10000次模拟。最后,输出估计的假阳性概率、假阴性概率和条件预期样本大小。