add temperature from multiplex group
[leaf-can-utils.git] / nissan-leaf-mitm.py
1 #! /usr/bin/env python3
2
3 import argparse
4 import can4python as can
5 import sys
6
7 def print_signals(received_signals, sent_signals):
8   sys.stdout.write("\033[H")
9   print("                                                     Hexadecimal    Decimal")
10   print("                                                       In   Out     In    Out")
11   for (key, received_value) in sorted(received_signals.items()):
12     received_value = int(received_value)
13     sent_value = int(sent_signals[key])
14     sys.stdout.write("\033[K")
15     # TODO look at bit width of signal
16     if key.startswith("_"):
17       print("{:>50}: {: 5x} {: 5x} {:6} {:6}".format(key, received_value, sent_value, received_value, sent_value))
18     else:
19       print("{:>50}: {: 05x} {: 05x} {:6} {:6}".format(key, received_value, sent_value, received_value, sent_value))
20
21 def calculate_checksum(frame_data):
22   data = []
23   for b in frame_data:
24     data.append(b)
25   data[7] = 0
26
27   result = 0
28   for b in data:
29     for i in range(7, -1, -1):
30       bit = 0 if (b & (1 << i)) == 0 else 1
31       if result > 127:
32         result = (((result << 1) + bit) ^ 0x85) & 0xff
33       else:
34         result = ((result << 1) + bit) & 0xff
35   return result
36
37 def verify_checksum(frame_id, frame_data):
38   checksum = calculate_checksum(frame_data)
39   if frame_data[7] != checksum:
40     raise ValueError("Checksum does not match in frame " + hex(frame_id))
41
42 def update_checksum(frame, signals):
43   checksum = calculate_checksum(frame.frame_data)
44   checksum_signal = checksum_signals[frame.frame_id]
45   frame.set_signalvalue(checksum_signal, checksum);
46   signals[checksum_signal.signalname] = checksum
47
48 def decode_mux(signals):
49   if "1dc Mux" in signals:
50     mux_id = signals.pop("1dc Mux")
51     value_5 = signals.pop("_1dc Mux Value 5")
52     value_6 = signals.pop("_1dc Mux Value 6")
53     signals["_1dc Mux_5_{:02x}".format(int(mux_id))] = value_5
54     signals["_1dc Mux_6_{:02x}".format(int(mux_id))] = value_6
55   if "5bc Mux 1" in signals:
56     mux_id = signals.pop("5bc Mux 1")
57     value = signals.pop("5bc Mux 1 Value")
58     if mux_id == 0x3:
59       signals["Capacity Bars"] = value
60     else:
61       signals["_5bc Mux_1_" + '{:02x}'.format(int(mux_id))] = value
62   if "5bc Mux 2" in signals:
63     mux_id = signals.pop("5bc Mux 2")
64     value = signals.pop("5bc Mux 2 Value")
65     signals["_5bc Mux_2_" + '{:02x}'.format(int(mux_id))] = value
66   if "5c0 Mux" in signals:
67     mux_id = signals.pop("5c0 Mux")
68     value_1 = signals.pop("5c0 Mux 1 Value")
69     value_2 = signals.pop("5c0 Mux 2 Value")
70     value_5 = signals.pop("5c0 Mux 5 Value")
71     if mux_id == 0x40:
72       signals["_5c0 Mux_1_{:02x}".format(int(mux_id))] = value_1
73       signals["Temperature"] = value_2
74       signals["_5c0 Mux_5_{:02x}".format(int(mux_id))] = value_5
75     else:
76       signals["_5c0 Mux_1_{:02x}".format(int(mux_id))] = value_1
77       signals["_5c0 Mux_2_{:02x}".format(int(mux_id))] = value_2
78       signals["_5c0 Mux_5_{:02x}".format(int(mux_id))] = value_5
79
80 def encode_mux_value(signals, mux_signal, value_signal, decoded_value_signal_prefix, mux_value):
81   decoded_value_signal = (decoded_value_signal_prefix + "{:02x}").format(mux_value)
82   if decoded_value_signal in signals:
83     signals[mux_signal] = mux_value
84     signals[value_signal] = signals.pop(decoded_value_signal)
85
86 def encode_mux_values(signals, mux_signal, value_signal, decoded_value_signal_prefix, mux_values):
87   for mux_value in mux_values:
88     encode_mux_value(signals, mux_signal, value_signal, decoded_value_signal_prefix, mux_value)
89
90 def encode_mux(signals):
91   _1dc_mux_values = [
92       0x01,
93       0x04,
94       0x05,
95       0x08,
96       0x0c,
97       0x1f,
98   ]
99   encode_mux_values(signals, "1dc Mux", "_1dc Mux Value 5", "_1dc Mux_5_", _1dc_mux_values)
100   encode_mux_values(signals, "1dc Mux", "_1dc Mux Value 6", "_1dc Mux_6_", _1dc_mux_values)
101   if "Capacity Bars" in signals:
102     signals["5bc Mux 1"] = 0x03
103     signals["5bc Mux 1 Value"] = signals.pop("Capacity Bars")
104   encode_mux_values(signals, "5bc Mux 1", "5bc Mux 1 Value", "_5bc Mux_1_", [
105       0x02,
106       0x04,
107       0x05,
108       0x0e,
109       0x0e,
110       0x0f,
111   ])
112   encode_mux_values(signals, "5bc Mux 2", "5bc Mux 2 Value", "_5bc Mux_2_", [
113       0x1f04,
114       0x2005,
115       0x2006,
116       0x2106,
117       0x4005,
118       0x4006,
119   ])
120   _5c0_mux_values = [
121       0x80,
122       0xc0,
123   ]
124   encode_mux_values(signals, "5c0 Mux", "5c0 Mux 1 Value", "_5c0 Mux_1_", _5c0_mux_values)
125   encode_mux_values(signals, "5c0 Mux", "5c0 Mux 2 Value", "_5c0 Mux_2_", _5c0_mux_values)
126   encode_mux_values(signals, "5c0 Mux", "5c0 Mux 5 Value", "_5c0 Mux_5_", _5c0_mux_values)
127   if "Temperature" in signals:
128     signals["5c0 Mux"] = 0x40
129     signals["5c0 Mux 1 Value"] = signals.pop("_5c0 Mux_1_40")
130     signals["5c0 Mux 2 Value"] = signals.pop("Temperature")
131     signals["5c0 Mux 5 Value"] = signals.pop("_5c0 Mux_5_40")
132
133 def replace(received_frame_signals, signal, value):
134   if signal in received_frame_signals:
135     received_frame_signals[signal] = value
136
137 parser = argparse.ArgumentParser(description='CAN Bus MitM')
138 parser.add_argument("-d", "--dump-kcd", action="store_true")
139 args = parser.parse_args()
140
141 bus_in = can.CanBus.from_kcd_file('nissan-leaf.kcd', 'vcan0', ego_node_ids=["1"])
142 bus_out = can.CanBus.from_kcd_file('nissan-leaf.kcd', 'vcan1', ego_node_ids=["1", "2", "3"])
143
144 if args.dump_kcd:
145   print(bus_in.get_descriptive_ascii_art())
146   exit()
147
148 signal_definitions = {}
149 for (frame_id, frame_definition) in bus_in.config.framedefinitions.items():
150   for signal_definition in frame_definition.signaldefinitions:
151     signal_definitions[signal_definition.signalname] = signal_definition
152
153 checksum_signals = {}
154 checksum_signals[0x1db] = signal_definitions["1db Checksum"];
155 checksum_signals[0x1dc] = signal_definitions["1dc Checksum"];
156 checksum_signals[0x55b] = signal_definitions["55b Checksum"];
157
158 received_signals = {}
159 sent_signals = {}
160
161 sys.stdout.write("\033[2J")
162 sys.stdout.flush()
163
164 while True:
165   received_frame = bus_in.recv_next_frame()
166   received_frame_signals = received_frame.unpack(bus_in.config.framedefinitions)
167 #  if received_frame.frame_id in [0x1db, 0x1dc, 0x55b, 0x5bc, 0x5c0]:
168 #    continue
169   #print(received_frame_signals)
170   decode_mux(received_frame_signals)
171   #print(received_frame_signals)
172
173   if received_frame.frame_id in checksum_signals:
174     verify_checksum(received_frame.frame_id, received_frame.frame_data)
175
176   for (key, value) in received_frame_signals.items():
177     received_signals[key] = value
178
179 #  replace(received_frame_signals, '_1dc 1', 0x04)
180 #  replace(received_frame_signals, '_1dc 2', 0xdf)
181 #  replace(received_frame_signals, '_1dc Mux_5_01', 0x09)
182 #  replace(received_frame_signals, '_1dc Mux_5_05', 0xd8)
183 #  replace(received_frame_signals, '_1dc Mux_5_0c', 0xcc)
184 #  replace(received_frame_signals, '_1dc Mux_6_01', 0x0d)
185 #  replace(received_frame_signals, '_1dc Mux_6_08', 0xc3)
186 #  replace(received_frame_signals, '_55b 2', 0x55)
187 #  replace(received_frame_signals, '_55c 3', 0xef)
188 #  replace(received_frame_signals, '_5c0 Mux_5_40', 0x6c)
189
190   # we have to copy the signals to the sent list before we encode the mux and
191   # then again after we updat the checksums
192   for (key, value) in received_frame_signals.items():
193     sent_signals[key] = value
194
195   encode_mux(received_frame_signals)
196   for (key, value) in received_frame_signals.items():
197     signal_definition = signal_definitions[key]
198     received_frame.set_signalvalue(signal_definition, value)
199
200   if received_frame.frame_id in checksum_signals:
201     update_checksum(received_frame, received_frame_signals)
202     #verify_checksum(received_frame.frame_data)
203
204   # Copy values to the Sent Signals again so we update the checksums
205   for (key, value) in received_frame_signals.items():
206     sent_signals[key] = value
207
208   print_signals(received_signals, sent_signals)
209
210   bus_out.send_signals(received_frame_signals)
211