-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmessage_processor.py
More file actions
138 lines (108 loc) · 5.42 KB
/
message_processor.py
File metadata and controls
138 lines (108 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# ==============================================================================
# 1. message_processor.py - Core message processing logic
# ==============================================================================
from typing import List, Dict, Set
from collections import defaultdict
import time
from concurrent.futures import ThreadPoolExecutor
from utils import enrich_message
class MessageProcessor:
    """Coordinate analysis, persistence and alerting for chat messages.

    Each message is enriched, analysed, embedded, saved to the vector store and
    the database, and turned into an alert when the analysis reports that
    moderation action is required.
    """

    # (toxicity threshold, spam threshold, label), checked from most to least
    # severe; the first row where either score reaches its threshold wins.
    _SEVERITY_LEVELS = (
        (0.8, 0.9, 'CRITICAL'),
        (0.6, 0.7, 'HIGH'),
        (0.4, 0.5, 'MEDIUM'),
    )

    def __init__(self, config, db, vector_store, analyzer, session,
                 streamer_username: str = "niaghtmares"):
        """Store collaborators.

        Args:
            config: Settings object; ``BATCH_SIZE`` is read from it.
            db: Database exposing ``save_analysis(message, analysis, point_id)``
                (PostgreSQL, judging by the inconsistency warning).
            vector_store: Exposes ``add_message(message, analysis, embedding)``
                returning a point ID (Qdrant, judging by the same warning).
            analyzer: Exposes ``analyze_message(message)`` and
                ``get_embedding(text)``.
            session: Stored as-is; not used by the methods of this class.
            streamer_username: Messages from this user never produce alerts.
                Compared case-insensitively. Defaults to the value that was
                previously hard-coded.
        """
        self.config = config
        self.db = db
        self.vector_store = vector_store
        self.analyzer = analyzer
        self.session = session
        # Lower-cased once so the comparison against lower-cased message
        # usernames is case-insensitive on both sides.
        self.streamer_username = streamer_username.lower()

    def process_messages_batch(self, messages: List[Dict]) -> List[Dict]:
        """Process ``messages`` in chunks of ``config.BATCH_SIZE`` and collect alerts.

        Args:
            messages: Raw message dicts.

        Returns:
            All alerts produced, in message order. A given message ID produces
            at most one alert across the whole call.

        Raises:
            ValueError: If ``config.BATCH_SIZE`` is not positive.
        """
        batch_size = self.config.BATCH_SIZE
        if batch_size <= 0:
            # range() would either raise an opaque error (0) or silently do
            # nothing (negative); fail loudly instead.
            raise ValueError(f"BATCH_SIZE must be positive, got {batch_size}")

        total = len(messages)
        # Ceiling division; evaluates to 0 for an empty list (loop never runs).
        total_batches = (total - 1) // batch_size + 1
        alerts: List[Dict] = []
        # Shared across batches so duplicates are suppressed call-wide.
        alert_ids: Set[str] = set()
        processed_count = 0

        print(f"\n🔄 Procesando en lotes de {batch_size} mensajes...")
        for i in range(0, total, batch_size):
            batch = messages[i:i + batch_size]
            batch_num = i // batch_size + 1
            print(f"📦 Procesando lote {batch_num}/{total_batches} ({len(batch)} mensajes)")

            batch_start = time.time()
            batch_alerts = self._process_single_batch(batch, alert_ids)
            batch_time = time.time() - batch_start

            alerts.extend(batch_alerts)
            processed_count += len(batch)

            # Report every 5th batch and always after the last one.
            if batch_num % 5 == 0 or batch_num == total_batches:
                rate = len(batch) / batch_time if batch_time > 0 else 0
                print(f" ⚡ Lote {batch_num} completado en {batch_time:.2f}s ({rate:.1f} msg/s)")
                print(
                    f" 📊 Progreso total: {processed_count}/{total} ({processed_count / total * 100:.1f}%)")
        return alerts

    def _process_single_batch(self, batch: List[Dict], alert_ids: Set[str]) -> List[Dict]:
        """Analyse, persist and alert on one batch of messages.

        Args:
            batch: Raw message dicts.
            alert_ids: Message IDs already alerted on; updated in place.

        Returns:
            Alerts created for this batch.

        Any exception raised while handling a single message is printed and
        that message is skipped, so one bad message does not abort the batch.
        """
        batch_alerts = []
        for message in batch:
            try:
                message = enrich_message(message)
                message_id = message['message_id']
                if not message.get("username"):
                    print(f"⚠️ Mensaje sin username detectado: {message}")
                    continue

                analysis = self.analyzer.analyze_message(message)
                analysis["message_id"] = message_id

                embedding = self.analyzer.get_embedding(message['text'])
                if embedding:
                    # Best-effort persistence; the helper prints its own failures.
                    self._save_with_verification(message, analysis, embedding)

                # Alerting does not depend on the embedding or the save succeeding.
                if (analysis['requires_action']
                        and message_id not in alert_ids
                        and message['username'].lower() != self.streamer_username):
                    batch_alerts.append(self._create_alert(message, analysis))
                    alert_ids.add(message_id)
            except Exception as e:
                # Guard the handler itself: a non-dict message must not raise
                # a second error here and abort the whole batch.
                username = (message.get('username', 'unknown')
                            if isinstance(message, dict) else 'unknown')
                print(f"❌ Error procesando mensaje de {username}: {e}")
        return batch_alerts

    def _save_with_verification(self, message: Dict, analysis: Dict, embedding: List[float]) -> str:
        """Write to the vector store, then to the database, reporting mismatches.

        Returns:
            The vector-store point ID. Returns ``""`` if the vector store
            returned no ID or if either write raised. A database write that
            returns a falsy result is printed as an inconsistency warning, but
            the point ID is still returned.
        """
        try:
            point_id = self.vector_store.add_message(message, analysis, embedding)
            if not point_id:
                return ""
            if not self.db.save_analysis(message, analysis, point_id):
                print("⚠️ Inconsistencia: Guardado en Qdrant pero falló PostgreSQL")
            return point_id
        except Exception as e:
            print(f"❌ Error en guardado verificado: {e}")
            return ""

    def _create_alert(self, message: Dict, analysis: Dict) -> Dict:
        """Build the alert dict for a message that requires action."""
        return {
            'message_id': analysis['message_id'],
            'username': message['username'],
            'text': message['text'],
            'timestamp': message['timestamp_str'],
            'toxicity': analysis['toxicity_score'],
            'spam_probability': analysis['spam_probability'],
            'action': analysis['action_type'],
            'reason': analysis['reasoning'],
            'categories': analysis.get('categories', []),
            'severity': self._calculate_severity(analysis),
        }

    def _calculate_severity(self, analysis: Dict) -> str:
        """Map toxicity and spam scores to 'CRITICAL', 'HIGH', 'MEDIUM' or 'LOW'.

        Missing or ``None`` scores count as 0.
        """
        toxicity = analysis.get('toxicity_score') or 0
        spam = analysis.get('spam_probability') or 0
        for tox_threshold, spam_threshold, label in self._SEVERITY_LEVELS:
            if toxicity >= tox_threshold or spam >= spam_threshold:
                return label
        return 'LOW'