돈의 정석: 정박사의 재테크 연구소

로또 프로그램 제작(2) 본문

재테크

로또 프로그램 제작(2)

재테크 정박사 2025. 2. 12. 00:16

 

안녕하세요..정박사입니다.

 

요새 계속 생각을 좀 해보고 있거든요...

어떤 알고리즘을 보강할지..

 

제 원래의 알고리즘은 다음과 같아요,..

우선 데이터를 로딩하고, one hot인코딩을 해서 학습에 필요하게 데이터를 만듭니다.

이후 학습데이터, test데이터, 검증 데이터로 분류를 하고 batch size를 정의합니다.

 

그다음은 딥런닝 모델을 작성하는건데, 저는 로또 번호가 시간에 대해 단서가 있다고 가정을 하고

LSTM모델을 사용했습니다.

그 다음 상금액을 가중치로 추가해서 상금액에 따라 많이 나오는 번호를 뽑았습니다.

 

하지만, 성능이 그닥 좋지가 않아서 지금 생각하는건..

전체 데이터 학습 후 실제 로또 번호로 테스트하고 재학습하는 방식입니다.

 

이거의 장점은 

  1. 더 많은 학습 데이터를 활용할 수 있어 패턴 인식 능력이 향상될 수 있고, 
  2. 최신 당첨 번호에 대한 즉각적인 피드백과 학습이 가능하고, 
  3. 시계열 데이터의 특성을 더 잘 반영할 수 있을거 같아요..

우려되는 점은

  1. 과적합(Overfitting)의 위험이 있어서 모든 데이터를 학습에 사용하면 모델이 너무 특정 패턴에 맞춰질 수 있고....
  2. 로또는 기본적으로 무작위성이 매우 높은 시스템이라, 패턴 학습이 실제 성능 향상으로 이어지지 않을 수 있습니다.
  3. 최신 당첨 번호에 대한 즉각적인 재학습이 오히려 모델의 일반화 능력을 해칠 수 도 있구요..

해서 최종 생각은

  1. 전체 데이터로 학습하되, 드롭아웃(Dropout) 비율을 높여 과적합을 방지하고,
  2. 재학습 시에는 학습률(Learning Rate)을 매우 작게 설정하여 급격한 가중치 변화를 방지하고,
  3. 앙상블 방식을 도입하여 여러 모델의 예측을 종합해 볼까 합니다.
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt

# 데이터 로드
rows = np.loadtxt("./lotto.csv", delimiter=",")
row_count = len(rows)
print(f"총 로또 회차 수: {row_count}")

# 당첨번호를 원핫인코딩벡터(ohbin)으로 변환
def numbers2ohbin(numbers):
    ohbin = np.zeros(45)
    for i in range(6):
        ohbin[int(numbers[i])-1] = 1
    return ohbin

# 원핫인코딩벡터(ohbin)를 번호로 변환
def ohbin2numbers(ohbin):
    numbers = []
    for i in range(len(ohbin)):
        if ohbin[i] == 1.0:
            numbers.append(i+1)
    return numbers

numbers = rows[:, 1:7]
ohbins = list(map(numbers2ohbin, numbers))

x_samples = ohbins[:-1]
y_samples = ohbins[1:]

# 데이터셋 분할 인덱스
train_idx = (0, 800)
val_idx = (801, 1007)
test_idx = (1008, len(x_samples))

print(f"train: {train_idx}, val: {val_idx}, test: {test_idx}")

# NumPy 배열로 변환 및 리쉐이프
x_samples = np.array(x_samples).reshape(-1, 1, 45)
y_samples = np.array(y_samples).reshape(-1, 45)

# 데이터셋 생성
batch_size = 1

train_x = x_samples[train_idx[0]:train_idx[1]]
train_y = y_samples[train_idx[0]:train_idx[1]]

val_x = x_samples[val_idx[0]:val_idx[1]]
val_y = y_samples[val_idx[0]:val_idx[1]]

test_x = x_samples[test_idx[0]:test_idx[1]]
test_y = y_samples[test_idx[0]:test_idx[1]]

train_dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y)).batch(batch_size)
val_dataset = tf.data.Dataset.from_tensor_slices((val_x, val_y)).batch(batch_size)
test_dataset = tf.data.Dataset.from_tensor_slices((test_x, test_y)).batch(batch_size)

# 모델 정의
inputs = tf.keras.Input(shape=(1, 45), batch_size=batch_size)
lstm_layer = layers.LSTM(
    256,
    stateful=True,
    return_sequences=False
)
x = lstm_layer(inputs)
x = layers.Dropout(0.2)(x)  # 드롭아웃 추가
outputs = layers.Dense(45, activation='sigmoid')(x)
model = models.Model(inputs=inputs, outputs=outputs)

# 모델 컴파일
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# 상태 초기화를 위한 콜백
class ResetStatesCallback(tf.keras.callbacks.Callback):
    def on_epoch_begin(self, epoch, logs=None):
        self.model.reset_states()

# 모델 학습
epochs = 100
'''history = model.fit(
    train_dataset,
    epochs=epochs,
    validation_data=val_dataset,
    shuffle=False,
    callbacks=[ResetStatesCallback()]
)'''

history = model.fit(
    train_dataset.shuffle(buffer_size=len(train_x)),
    epochs=epochs,
    validation_data=val_dataset,
    callbacks=[ResetStatesCallback()]
)


# 학습 결과 시각화
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Train Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Accuracy')
plt.legend()
plt.show()

# 88회부터 지금까지 1등부터 5등까지 상금의 평균낸다.
'''mean_prize = [
    np.mean(rows[87:, 8]),
    np.mean(rows[87:, 9]),
    np.mean(rows[87:, 10]),
    np.mean(rows[87:, 11]),
    np.mean(rows[87:, 12])
]
print(f"평균 상금: {mean_prize}")'''

mean_prize = [
    np.mean(rows[:, 8]),
    np.mean(rows[:, 9]),
    np.mean(rows[:, 10]),
    np.mean(rows[:, 11]),
    np.mean(rows[:, 12])
]

# 등수와 상금을 반환함
def calc_reward(true_numbers, true_bonus, pred_numbers):
    count = len(set(pred_numbers) & set(true_numbers))
    if count == 6:
        return 0, mean_prize[0]
    elif count == 5 and true_bonus in pred_numbers:
        return 1, mean_prize[1]
    elif count == 5:
        return 2, mean_prize[2]
    elif count == 4:
        return 3, mean_prize[3]
    elif count == 3:
        return 4, mean_prize[4]
    else:
        return 5, 0

# 번호 생성 함수
'''def gen_numbers_from_probability(nums_prob):
    ball_box = []
    for n in range(45):
        ball_count = int(nums_prob[n] * 100 + 1)
        ball = np.full((ball_count), n+1)
        ball_box += list(ball)
    selected_balls = []
    while len(selected_balls) < 6:
        ball = np.random.choice(ball_box)
        if ball not in selected_balls:
            selected_balls.append(ball)
    return selected_balls'''

def gen_numbers_from_probability(nums_prob):
    # normalize probability
    nums_prob = nums_prob / np.sum(nums_prob)
    # 45개 중 확률에 따라 6개 번호 무작위 추출(중복 없이)
    selected_nums = np.random.choice(range(1,46), size=6, replace=False, p=nums_prob)
    return selected_nums.tolist()

# 결과 저장 변수 초기화
train_total_reward = []
train_total_grade = np.zeros(6, dtype=int)

val_total_reward = []
val_total_grade = np.zeros(6, dtype=int)

test_total_reward = []
test_total_grade = np.zeros(6, dtype=int)

model.reset_states()

print('[No. ] 1st 2nd 3rd 4th 5th 6th Rewards')

for i in range(len(x_samples)):
    xs = x_samples[i].reshape(1, 1, 45)
    ys_pred = model.predict_on_batch(xs)

    sum_reward = 0
    sum_grade = np.zeros(6, dtype=int)

    for n in range(10):
        numbers = gen_numbers_from_probability(ys_pred[0])

        if i+1 < len(rows):
            grade, reward = calc_reward(rows[i+1,1:7], rows[i+1,7], numbers)
        else:
            break

        sum_reward += reward
        sum_grade[grade] += 1

        if i >= train_idx[0] and i < train_idx[1]:
            train_total_grade[grade] += 1
        elif i >= val_idx[0] and i < val_idx[1]:
            val_total_grade[grade] += 1
        elif i >= test_idx[0] and i < test_idx[1]:
            test_total_grade[grade] += 1

    if i >= train_idx[0] and i < train_idx[1]:
        train_total_reward.append(sum_reward)
    elif i >= val_idx[0] and i < val_idx[1]:
        val_total_reward.append(sum_reward)
    elif i >= test_idx[0] and i < test_idx[1]:
        test_total_reward.append(sum_reward)

    print('[{0:4d}] {1:3d} {2:3d} {3:3d} {4:3d} {5:3d} {6:3d} {7:15,d}'.format(
        i+1,
        sum_grade[0], sum_grade[1], sum_grade[2], sum_grade[3], sum_grade[4], sum_grade[5],
        int(sum_reward)
    ))

print('Total')
print('==========')
print('Train {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(
    train_total_grade[0], train_total_grade[1], train_total_grade[2], train_total_grade[3],
    train_total_grade[4], train_total_grade[5], int(sum(train_total_reward))
))
print('Val   {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(
    val_total_grade[0], val_total_grade[1], val_total_grade[2], val_total_grade[3],
    val_total_grade[4], val_total_grade[5], int(sum(val_total_reward))
))
print('Test  {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(
    test_total_grade[0], test_total_grade[1], test_total_grade[2], test_total_grade[3],
    test_total_grade[4], test_total_grade[5], int(sum(test_total_reward))
))
print('==========')

# 추가 학습 (원하는 경우)
# 다시 학습을 수행하려면 아래 코드를 사용하세요.
for epoch in range(100):
    model.reset_states()
    history = model.fit(
        train_dataset,
        epochs=1,
        shuffle=False,
        verbose=0  # 출력 억제
    )
    print('epoch {0:4d} train acc {1:0.3f} loss {2:0.3f}'.format(
        epoch,
        history.history['accuracy'][0],
        history.history['loss'][0]
    ))

# 마지막 회차까지 학습한 모델로 다음 회차 추론
print('receive numbers')

xs = x_samples[-1].reshape(1, 1, 45)
ys_pred = model.predict_on_batch(xs)

list_numbers = []

for n in range(10):
    numbers = gen_numbers_from_probability(ys_pred[0])
    numbers.sort()
    print('{0} : {1}'.format(n, numbers))
    list_numbers.append(numbers)