// SPDX-License-Identifier: GPL-2.0
/*
 * bcachefs journalling code, for btree insertions
 *
 * Copyright 2012 Google, Inc.
 */

#include "bcachefs.h"
#include "alloc_foreground.h"
#include "bkey_methods.h"
#include "btree_gc.h"
#include "btree_update.h"
#include "btree_write_buffer.h"
#include "buckets.h"
#include "error.h"
#include "journal.h"
#include "journal_io.h"
#include "journal_reclaim.h"
#include "journal_sb.h"
#include "journal_seq_blacklist.h"
#include "trace.h"

static const char * const bch2_journal_errors[] = {
#define x(n)	#n,
	JOURNAL_ERRORS()
#undef x
	NULL
};

static inline bool journal_seq_unwritten(struct journal *j, u64 seq)
{
	return seq > j->seq_ondisk;
}

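/*
 * Note: the open/closed state of the current journal entry is packed into
 * journal_res_state - cur_entry_offset is the current end of the entry in
 * u64s, and JOURNAL_ENTRY_CLOSED_VAL/JOURNAL_ENTRY_ERROR_VAL are sentinel
 * values above any valid offset, so "open" is simply an offset below the
 * closed sentinel:
 */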
static bool __journal_entry_is_open(union journal_res_state state)
{
	return state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL;
}

static inline unsigned nr_unwritten_journal_entries(struct journal *j)
{
	return atomic64_read(&j->seq) - j->seq_ondisk;
}

static bool journal_entry_is_open(struct journal *j)
{
	return __journal_entry_is_open(j->reservations);
}

static void bch2_journal_buf_to_text(struct printbuf *out, struct journal *j, u64 seq)
{
	union journal_res_state s = READ_ONCE(j->reservations);
	unsigned i = seq & JOURNAL_BUF_MASK;
	struct journal_buf *buf = j->buf + i;

	prt_str(out, "seq:");
	prt_tab(out);
	prt_printf(out, "%llu", seq);
	prt_newline(out);
	printbuf_indent_add(out, 2);

	prt_str(out, "refcount:");
	prt_tab(out);
	prt_printf(out, "%u", journal_state_count(s, i));
	prt_newline(out);

	prt_str(out, "size:");
	prt_tab(out);
	prt_human_readable_u64(out, vstruct_bytes(buf->data));
	prt_newline(out);

	prt_str(out, "expires:");
	prt_tab(out);
	prt_printf(out, "%li jiffies", buf->expires - jiffies);
	prt_newline(out);

	prt_str(out, "flags:");
	prt_tab(out);
	if (buf->noflush)
		prt_str(out, "noflush ");
	if (buf->must_flush)
		prt_str(out, "must_flush ");
	if (buf->separate_flush)
		prt_str(out, "separate_flush ");
	if (buf->need_flush_to_write_buffer)
		prt_str(out, "need_flush_to_write_buffer ");
	if (buf->write_started)
		prt_str(out, "write_started ");
	if (buf->write_allocated)
		prt_str(out, "write_allocated ");
	if (buf->write_done)
		prt_str(out, "write_done");
	prt_newline(out);

	printbuf_indent_sub(out, 2);
}

static void bch2_journal_bufs_to_text(struct printbuf *out, struct journal *j)
{
	if (!out->nr_tabstops)
		printbuf_tabstop_push(out, 24);

	for (u64 seq = journal_last_unwritten_seq(j);
	     seq <= journal_cur_seq(j);
	     seq++)
		bch2_journal_buf_to_text(out, j, seq);
	prt_printf(out, "last buf %s\n", journal_entry_is_open(j) ? "open" : "closed");
}

static inline struct journal_buf *
journal_seq_to_buf(struct journal *j, u64 seq)
{
	struct journal_buf *buf = NULL;

	EBUG_ON(seq > journal_cur_seq(j));

	if (journal_seq_unwritten(j, seq)) {
		buf = j->buf + (seq & JOURNAL_BUF_MASK);
		EBUG_ON(le64_to_cpu(buf->data->seq) != seq);
	}
	return buf;
}

static void journal_pin_list_init(struct journal_entry_pin_list *p, int count)
{
	unsigned i;

	for (i = 0; i < ARRAY_SIZE(p->list); i++)
		INIT_LIST_HEAD(&p->list[i]);
	INIT_LIST_HEAD(&p->flushed);
	atomic_set(&p->count, count);
	p->devs.nr = 0;
}

/*
 * Detect stuck journal conditions and trigger shutdown. Technically the journal
 * can end up stuck for a variety of reasons, such as a blocked I/O, journal
 * reservation lockup, etc. Since this is a fatal error with potentially
 * unpredictable characteristics, we want to be fairly conservative before we
 * decide to shut things down.
 *
 * Consider the journal stuck when it appears full with no ability to commit
 * btree transactions, to discard journal buckets, or to acquire a priority
 * (reserved watermark) reservation.
 */
static inline bool
journal_error_check_stuck(struct journal *j, int error, unsigned flags)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	bool stuck = false;
	struct printbuf buf = PRINTBUF;

	if (!(error == JOURNAL_ERR_journal_full ||
	      error == JOURNAL_ERR_journal_pin_full) ||
	    nr_unwritten_journal_entries(j) ||
	    (flags & BCH_WATERMARK_MASK) != BCH_WATERMARK_reclaim)
		return stuck;

	spin_lock(&j->lock);

	if (j->can_discard) {
		spin_unlock(&j->lock);
		return stuck;
	}

	stuck = true;

	/*
	 * The journal shutdown path will set ->err_seq, but do it here first to
	 * serialize against concurrent failures and avoid duplicate error
	 * reports.
	 */
	if (j->err_seq) {
		spin_unlock(&j->lock);
		return stuck;
	}
	j->err_seq = journal_cur_seq(j);
	spin_unlock(&j->lock);

	bch_err(c, "Journal stuck! Have a pre-reservation but journal full (error %s)",
		bch2_journal_errors[error]);
	bch2_journal_debug_to_text(&buf, j);
	bch_err(c, "%s", buf.buf);

	printbuf_reset(&buf);
	bch2_journal_pins_to_text(&buf, j);
	bch_err(c, "Journal pins:\n%s", buf.buf);
	printbuf_exit(&buf);

	bch2_fatal_error(c);
	dump_stack();

	return stuck;
}

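/*
 * Kick off writes for any journal entries that are ready: journal entries are
 * written in sequence order, so this stops at the first buf whose write is
 * still being set up, or at the first unstarted buf that still has
 * outstanding reservations:
 */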
void bch2_journal_do_writes(struct journal *j)
{
	for (u64 seq = journal_last_unwritten_seq(j);
	     seq <= journal_cur_seq(j);
	     seq++) {
		unsigned idx = seq & JOURNAL_BUF_MASK;
		struct journal_buf *w = j->buf + idx;

		if (w->write_started && !w->write_allocated)
			break;
		if (w->write_started)
			continue;

		if (!journal_state_count(j->reservations, idx)) {
			w->write_started = true;
			closure_call(&w->io, bch2_journal_write, j->wq, NULL);
		}

		break;
	}
}

/*
 * Final processing when the last reference of a journal buffer has been
 * dropped. Drop the pin list reference acquired at journal entry open and write
 * the buffer, if requested.
 */
void bch2_journal_buf_put_final(struct journal *j, u64 seq)
{
	lockdep_assert_held(&j->lock);

	if (__bch2_journal_pin_put(j, seq))
		bch2_journal_reclaim_fast(j);
	bch2_journal_do_writes(j);
}

/*
 * Closes the current journal entry, if open:
 *
 * A closed journal_buf can't be reopened; the next journal entry is opened in
 * the next buf by journal_entry_open(), which also initializes it:
 */
static void __journal_entry_close(struct journal *j, unsigned closed_val, bool trace)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf = journal_cur_buf(j);
	union journal_res_state old, new;
	u64 v = atomic64_read(&j->reservations.counter);
	unsigned sectors;

	BUG_ON(closed_val != JOURNAL_ENTRY_CLOSED_VAL &&
	       closed_val != JOURNAL_ENTRY_ERROR_VAL);

	lockdep_assert_held(&j->lock);

	do {
		old.v = new.v = v;
		new.cur_entry_offset = closed_val;

		if (old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL ||
		    old.cur_entry_offset == new.cur_entry_offset)
			return;
	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
				       old.v, new.v)) != old.v);

	if (!__journal_entry_is_open(old))
		return;

	/* Close out old buffer: */
	buf->data->u64s = cpu_to_le32(old.cur_entry_offset);

	if (trace_journal_entry_close_enabled() && trace) {
		struct printbuf pbuf = PRINTBUF;
		pbuf.atomic++;

		prt_str(&pbuf, "entry size: ");
		prt_human_readable_u64(&pbuf, vstruct_bytes(buf->data));
		prt_newline(&pbuf);
		bch2_prt_task_backtrace(&pbuf, current, 1, GFP_NOWAIT);
		trace_journal_entry_close(c, pbuf.buf);
		printbuf_exit(&pbuf);
	}

	sectors = vstruct_blocks_plus(buf->data, c->block_bits,
				      buf->u64s_reserved) << c->block_bits;
	BUG_ON(sectors > buf->sectors);
	buf->sectors = sectors;

	/*
	 * We have to set last_seq here, _before_ opening a new journal entry:
	 *
	 * A thread may replace an old pin with a new pin on its current
	 * journal reservation - the expectation being that the journal will
	 * contain either what the old pin protected or what the new pin
	 * protects.
	 *
	 * After the old pin is dropped journal_last_seq() won't include the old
	 * pin, so we can only write the updated last_seq on the entry that
	 * contains whatever the new pin protects.
	 *
	 * Restated, we can _not_ update last_seq for a given entry if there
	 * could be a newer entry open with reservations/pins that have been
	 * taken against it.
	 *
	 * Hence, we want to update/set last_seq on the current journal entry
	 * right before we open a new one:
	 */
	buf->last_seq = journal_last_seq(j);
	buf->data->last_seq = cpu_to_le64(buf->last_seq);
	BUG_ON(buf->last_seq > le64_to_cpu(buf->data->seq));

	cancel_delayed_work(&j->write_work);

	bch2_journal_space_available(j);

	__bch2_journal_buf_put(j, old.idx, le64_to_cpu(buf->data->seq));
}

void bch2_journal_halt(struct journal *j)
{
	spin_lock(&j->lock);
	__journal_entry_close(j, JOURNAL_ENTRY_ERROR_VAL, true);
	if (!j->err_seq)
		j->err_seq = journal_cur_seq(j);
	journal_wake(j);
	spin_unlock(&j->lock);
}

static bool journal_entry_want_write(struct journal *j)
{
	bool ret = !journal_entry_is_open(j) ||
		journal_cur_seq(j) == journal_last_unwritten_seq(j);

	/* Don't close it yet if we already have a write in flight: */
	if (ret)
		__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);
	else if (nr_unwritten_journal_entries(j)) {
		struct journal_buf *buf = journal_cur_buf(j);

		if (!buf->flush_time) {
			buf->flush_time = local_clock() ?: 1;
			buf->expires = jiffies;
		}
	}

	return ret;
}

bool bch2_journal_entry_close(struct journal *j)
{
	bool ret;

	spin_lock(&j->lock);
	ret = journal_entry_want_write(j);
	spin_unlock(&j->lock);

	return ret;
}

/*
 * Should _only_ be called from journal_res_get() - when we actually want a
 * journal reservation - journal entry is open means journal is dirty:
 */
static int journal_entry_open(struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf = j->buf +
		((journal_cur_seq(j) + 1) & JOURNAL_BUF_MASK);
	union journal_res_state old, new;
	int u64s;
	u64 v;

	lockdep_assert_held(&j->lock);
	BUG_ON(journal_entry_is_open(j));
	BUG_ON(BCH_SB_CLEAN(c->disk_sb.sb));

	if (j->blocked)
		return JOURNAL_ERR_blocked;

	if (j->cur_entry_error)
		return j->cur_entry_error;

	if (bch2_journal_error(j))
		return JOURNAL_ERR_insufficient_devices; /* -EROFS */

	if (!fifo_free(&j->pin))
		return JOURNAL_ERR_journal_pin_full;

	if (nr_unwritten_journal_entries(j) == ARRAY_SIZE(j->buf))
		return JOURNAL_ERR_max_in_flight;

	BUG_ON(!j->cur_entry_sectors);

	buf->expires =
		(journal_cur_seq(j) == j->flushed_seq_ondisk
		 ? jiffies
		 : j->last_flush_write) +
		msecs_to_jiffies(c->opts.journal_flush_delay);

	buf->u64s_reserved = j->entry_u64s_reserved;
	buf->disk_sectors = j->cur_entry_sectors;
	buf->sectors = min(buf->disk_sectors, buf->buf_size >> 9);

	u64s = (int) (buf->sectors << 9) / sizeof(u64) -
		journal_entry_overhead(j);
	u64s = clamp_t(int, u64s, 0, JOURNAL_ENTRY_CLOSED_VAL - 1);

	if (u64s <= (ssize_t) j->early_journal_entries.nr)
		return JOURNAL_ERR_journal_full;

	if (fifo_empty(&j->pin) && j->reclaim_thread)
		wake_up_process(j->reclaim_thread);

	/*
	 * The fifo_push() needs to happen at the same time as j->seq is
	 * incremented for journal_last_seq() to be calculated correctly
	 */
	atomic64_inc(&j->seq);
	journal_pin_list_init(fifo_push_ref(&j->pin), 1);

	BUG_ON(j->pin.back - 1 != atomic64_read(&j->seq));

	BUG_ON(j->buf + (journal_cur_seq(j) & JOURNAL_BUF_MASK) != buf);

	bkey_extent_init(&buf->key);
	buf->noflush = false;
	buf->must_flush = false;
	buf->separate_flush = false;
	buf->flush_time = 0;
	buf->need_flush_to_write_buffer = true;
	buf->write_started = false;
	buf->write_allocated = false;
	buf->write_done = false;

	memset(buf->data, 0, sizeof(*buf->data));
	buf->data->seq = cpu_to_le64(journal_cur_seq(j));
	buf->data->u64s = 0;

	if (j->early_journal_entries.nr) {
		memcpy(buf->data->_data, j->early_journal_entries.data,
		       j->early_journal_entries.nr * sizeof(u64));
		le32_add_cpu(&buf->data->u64s, j->early_journal_entries.nr);
	}

	/*
	 * Must be set before marking the journal entry as open:
	 */
	j->cur_entry_u64s = u64s;

	v = atomic64_read(&j->reservations.counter);
	do {
		old.v = new.v = v;

		BUG_ON(old.cur_entry_offset == JOURNAL_ENTRY_ERROR_VAL);

		new.idx++;
		BUG_ON(journal_state_count(new, new.idx));
		BUG_ON(new.idx != (journal_cur_seq(j) & JOURNAL_BUF_MASK));

		journal_state_inc(&new);

		/* Handle any already added entries */
		new.cur_entry_offset = le32_to_cpu(buf->data->u64s);
	} while ((v = atomic64_cmpxchg(&j->reservations.counter,
				       old.v, new.v)) != old.v);

	if (nr_unwritten_journal_entries(j) == 1)
		mod_delayed_work(j->wq,
				 &j->write_work,
				 msecs_to_jiffies(c->opts.journal_flush_delay));
	journal_wake(j);

	if (j->early_journal_entries.nr)
		darray_exit(&j->early_journal_entries);
	return 0;
}

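/*
 * True once every journal entry has been written to disk; if not, close the
 * current entry so the wait in journal_quiesce() can make progress:
 */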
static bool journal_quiesced(struct journal *j)
{
	bool ret = atomic64_read(&j->seq) == j->seq_ondisk;

	if (!ret)
		bch2_journal_entry_close(j);
	return ret;
}

static void journal_quiesce(struct journal *j)
{
	wait_event(j->wait, journal_quiesced(j));
}

static void journal_write_work(struct work_struct *work)
{
	struct journal *j = container_of(work, struct journal, write_work.work);

	spin_lock(&j->lock);
	if (__journal_entry_is_open(j->reservations)) {
		long delta = journal_cur_buf(j)->expires - jiffies;

		if (delta > 0)
			mod_delayed_work(j->wq, &j->write_work, delta);
		else
			__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);
	}
	spin_unlock(&j->lock);
}

static int __journal_res_get(struct journal *j, struct journal_res *res,
			     unsigned flags)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_buf *buf;
	bool can_discard;
	int ret;
retry:
	if (journal_res_get_fast(j, res, flags))
		return 0;

	if (bch2_journal_error(j))
		return -BCH_ERR_erofs_journal_err;

	if (j->blocked)
		return -BCH_ERR_journal_res_get_blocked;

	if ((flags & BCH_WATERMARK_MASK) < j->watermark) {
		ret = JOURNAL_ERR_journal_full;
		can_discard = j->can_discard;
		goto out;
	}

	if (nr_unwritten_journal_entries(j) == ARRAY_SIZE(j->buf) && !journal_entry_is_open(j)) {
		ret = JOURNAL_ERR_max_in_flight;
		goto out;
	}

	spin_lock(&j->lock);

	/*
	 * Recheck after taking the lock, so we don't race with another thread
	 * that just did journal_entry_open() and call bch2_journal_entry_close()
	 * unnecessarily
	 */
	if (journal_res_get_fast(j, res, flags)) {
		ret = 0;
		goto unlock;
	}

	/*
	 * If we couldn't get a reservation because the current buf filled up,
	 * and we had room for a bigger entry on disk, signal that we want to
	 * realloc the journal bufs:
	 */
	buf = journal_cur_buf(j);
	if (journal_entry_is_open(j) &&
	    buf->buf_size >> 9 < buf->disk_sectors &&
	    buf->buf_size < JOURNAL_ENTRY_SIZE_MAX)
		j->buf_size_want = max(j->buf_size_want, buf->buf_size << 1);

	__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, false);
	ret = journal_entry_open(j) ?: JOURNAL_ERR_retry;
unlock:
	can_discard = j->can_discard;
	spin_unlock(&j->lock);
out:
	if (ret == JOURNAL_ERR_retry)
		goto retry;
	if (!ret)
		return 0;

	if (journal_error_check_stuck(j, ret, flags))
		ret = -BCH_ERR_journal_res_get_blocked;

	if (ret == JOURNAL_ERR_max_in_flight &&
	    track_event_change(&c->times[BCH_TIME_blocked_journal_max_in_flight], true)) {
		struct printbuf buf = PRINTBUF;

		prt_printf(&buf, "seq %llu\n", journal_cur_seq(j));
		bch2_journal_bufs_to_text(&buf, j);
		trace_journal_entry_full(c, buf.buf);
		printbuf_exit(&buf);
		count_event(c, journal_entry_full);
	}

	/*
	 * Journal is full - can't rely on reclaim from work item due to
	 * freezing:
	 */
	if ((ret == JOURNAL_ERR_journal_full ||
	     ret == JOURNAL_ERR_journal_pin_full) &&
	    !(flags & JOURNAL_RES_GET_NONBLOCK)) {
		if (can_discard) {
			bch2_journal_do_discards(j);
			goto retry;
		}

		if (mutex_trylock(&j->reclaim_lock)) {
			bch2_journal_reclaim(j);
			mutex_unlock(&j->reclaim_lock);
		}
	}

	return ret == JOURNAL_ERR_insufficient_devices
		? -BCH_ERR_erofs_journal_err
		: -BCH_ERR_journal_res_get_blocked;
}

/*
 * Essentially the entry function to the journalling code. When bcachefs is
 * doing a btree insert, it calls this function to get the current journal
 * write. The journal write is the structure used to set up journal writes. The
 * calling function will then add its keys to the structure, queuing them for
 * the next write.
 *
 * To ensure forward progress, the current task must not be holding any
 * btree node write locks.
 */
int bch2_journal_res_get_slowpath(struct journal *j, struct journal_res *res,
				  unsigned flags)
{
	int ret;

	closure_wait_event(&j->async_wait,
		   (ret = __journal_res_get(j, res, flags)) != -BCH_ERR_journal_res_get_blocked ||
		   (flags & JOURNAL_RES_GET_NONBLOCK));
	return ret;
}
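
/*
 * Typical caller pattern, as a sketch (how keys are actually appended is left
 * elided; see the journal_res helpers in journal.h):
 *
 *	struct journal_res res = {};
 *
 *	int ret = bch2_journal_res_get(j, &res, jset_u64s(u64s), 0);
 *	if (ret)
 *		return ret;
 *
 *	// ... copy up to res.u64s worth of keys into the space reserved at
 *	// res.offset in the journal buffer ...
 *
 *	bch2_journal_res_put(j, &res);
 */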

/* journal_entry_res: */

void bch2_journal_entry_res_resize(struct journal *j,
				   struct journal_entry_res *res,
				   unsigned new_u64s)
{
	union journal_res_state state;
	int d = new_u64s - res->u64s;

	spin_lock(&j->lock);

	j->entry_u64s_reserved += d;
	if (d <= 0)
		goto out;

	j->cur_entry_u64s = max_t(int, 0, j->cur_entry_u64s - d);
	smp_mb();
	state = READ_ONCE(j->reservations);

	if (state.cur_entry_offset < JOURNAL_ENTRY_CLOSED_VAL &&
	    state.cur_entry_offset > j->cur_entry_u64s) {
		j->cur_entry_u64s += d;
		/*
		 * Not enough room in current journal entry, have to flush it:
		 */
		__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);
	} else {
		journal_cur_buf(j)->u64s_reserved += d;
	}
out:
	spin_unlock(&j->lock);
	res->u64s += d;
}

/* journal flushing: */

/**
 * bch2_journal_flush_seq_async - wait for a journal entry to be written
 * @j:		journal object
 * @seq:	seq to flush
 * @parent:	closure object to wait with
 * Returns:	1 if @seq has already been flushed, 0 if @seq is being flushed,
 *		-EIO if @seq will never be flushed
 *
 * Like bch2_journal_wait_on_seq, except that it triggers a write immediately if
 * necessary
 */
int bch2_journal_flush_seq_async(struct journal *j, u64 seq,
				 struct closure *parent)
{
	struct journal_buf *buf;
	int ret = 0;

	if (seq <= j->flushed_seq_ondisk)
		return 1;

	spin_lock(&j->lock);

	if (WARN_ONCE(seq > journal_cur_seq(j),
		      "requested to flush journal seq %llu, but currently at %llu",
		      seq, journal_cur_seq(j)))
		goto out;

	/* Recheck under lock: */
	if (j->err_seq && seq >= j->err_seq) {
		ret = -EIO;
		goto out;
	}

	if (seq <= j->flushed_seq_ondisk) {
		ret = 1;
		goto out;
	}

	/* if seq was written, but not flushed - flush a newer one instead */
	seq = max(seq, journal_last_unwritten_seq(j));

recheck_need_open:
	if (seq > journal_cur_seq(j)) {
		struct journal_res res = { 0 };

		if (journal_entry_is_open(j))
			__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);

		spin_unlock(&j->lock);

		ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
		if (ret)
			return ret;

		seq = res.seq;
		buf = journal_seq_to_buf(j, seq);
		buf->must_flush = true;

		if (!buf->flush_time) {
			buf->flush_time = local_clock() ?: 1;
			buf->expires = jiffies;
		}

		if (parent && !closure_wait(&buf->wait, parent))
			BUG();

		bch2_journal_res_put(j, &res);

		spin_lock(&j->lock);
		goto want_write;
	}

	/*
	 * if write was kicked off without a flush, or if we promised it
	 * wouldn't be a flush, flush the next sequence number instead
	 */
	buf = journal_seq_to_buf(j, seq);
	if (buf->noflush) {
		seq++;
		goto recheck_need_open;
	}

	buf->must_flush = true;

	if (parent && !closure_wait(&buf->wait, parent))
		BUG();
want_write:
	if (seq == journal_cur_seq(j))
		journal_entry_want_write(j);
out:
	spin_unlock(&j->lock);
	return ret;
}

int bch2_journal_flush_seq(struct journal *j, u64 seq)
{
	u64 start_time = local_clock();
	int ret, ret2;

	/*
	 * Don't update time_stats when @seq is already flushed:
	 */
	if (seq <= j->flushed_seq_ondisk)
		return 0;

	ret = wait_event_interruptible(j->wait, (ret2 = bch2_journal_flush_seq_async(j, seq, NULL)));

	if (!ret)
		bch2_time_stats_update(j->flush_seq_time, start_time);

	return ret ?: ret2 < 0 ? ret2 : 0;
}

/*
 * bch2_journal_flush_async - if there is an open journal entry, or a journal
 * still being written, write it and wait for the write to complete
 */
void bch2_journal_flush_async(struct journal *j, struct closure *parent)
{
	bch2_journal_flush_seq_async(j, atomic64_read(&j->seq), parent);
}

int bch2_journal_flush(struct journal *j)
{
	return bch2_journal_flush_seq(j, atomic64_read(&j->seq));
}

/*
 * bch2_journal_noflush_seq - tell the journal not to issue any flushes before
 * @seq
 */
bool bch2_journal_noflush_seq(struct journal *j, u64 seq)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	u64 unwritten_seq;
	bool ret = false;

	if (!(c->sb.features & (1ULL << BCH_FEATURE_journal_no_flush)))
		return false;

	if (seq <= c->journal.flushed_seq_ondisk)
		return false;

	spin_lock(&j->lock);
	if (seq <= c->journal.flushed_seq_ondisk)
		goto out;

	for (unwritten_seq = journal_last_unwritten_seq(j);
	     unwritten_seq < seq;
	     unwritten_seq++) {
		struct journal_buf *buf = journal_seq_to_buf(j, unwritten_seq);

		/* journal flush already in flight, or flush requested */
		if (buf->must_flush)
			goto out;

		buf->noflush = true;
	}

	ret = true;
out:
	spin_unlock(&j->lock);
	return ret;
}

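/*
 * Write and flush an (empty) journal entry: used to ensure that everything
 * previously written to the journal is persisted - gets a reservation so that
 * there's an entry to flush, marks it must_flush, then waits on it:
 */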
int bch2_journal_meta(struct journal *j)
{
	struct journal_buf *buf;
	struct journal_res res;
	int ret;

	memset(&res, 0, sizeof(res));

	ret = bch2_journal_res_get(j, &res, jset_u64s(0), 0);
	if (ret)
		return ret;

	buf = j->buf + (res.seq & JOURNAL_BUF_MASK);
	buf->must_flush = true;

	if (!buf->flush_time) {
		buf->flush_time = local_clock() ?: 1;
		buf->expires = jiffies;
	}

	bch2_journal_res_put(j, &res);

	return bch2_journal_flush_seq(j, res.seq);
}

/* block/unblock the journal: */

void bch2_journal_unblock(struct journal *j)
{
	spin_lock(&j->lock);
	j->blocked--;
	spin_unlock(&j->lock);

	journal_wake(j);
}

void bch2_journal_block(struct journal *j)
{
	spin_lock(&j->lock);
	j->blocked++;
	spin_unlock(&j->lock);

	journal_quiesce(j);
}

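/*
 * Return the first unwritten journal buf that still needs to be flushed to the
 * btree write buffer, up to @max_seq: on success the buf is returned with
 * j->buf_lock held (dropped by the caller); NULL and -EAGAIN are returned with
 * it released:
 */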
static struct journal_buf *__bch2_next_write_buffer_flush_journal_buf(struct journal *j, u64 max_seq)
{
	struct journal_buf *ret = NULL;

	mutex_lock(&j->buf_lock);
	spin_lock(&j->lock);
	max_seq = min(max_seq, journal_cur_seq(j));

	for (u64 seq = journal_last_unwritten_seq(j);
	     seq <= max_seq;
	     seq++) {
		unsigned idx = seq & JOURNAL_BUF_MASK;
		struct journal_buf *buf = j->buf + idx;

		if (buf->need_flush_to_write_buffer) {
			if (seq == journal_cur_seq(j))
				__journal_entry_close(j, JOURNAL_ENTRY_CLOSED_VAL, true);

			union journal_res_state s;
			s.v = atomic64_read_acquire(&j->reservations.counter);

			ret = journal_state_count(s, idx)
				? ERR_PTR(-EAGAIN)
				: buf;
			break;
		}
	}

	spin_unlock(&j->lock);
	if (IS_ERR_OR_NULL(ret))
		mutex_unlock(&j->buf_lock);
	return ret;
}

struct journal_buf *bch2_next_write_buffer_flush_journal_buf(struct journal *j, u64 max_seq)
{
	struct journal_buf *ret;

	wait_event(j->wait, (ret = __bch2_next_write_buffer_flush_journal_buf(j, max_seq)) != ERR_PTR(-EAGAIN));
	return ret;
}

/* allocate journal on a device: */

static int __bch2_set_nr_journal_buckets(struct bch_dev *ca, unsigned nr,
					 bool new_fs, struct closure *cl)
{
	struct bch_fs *c = ca->fs;
	struct journal_device *ja = &ca->journal;
	u64 *new_bucket_seq = NULL, *new_buckets = NULL;
	struct open_bucket **ob = NULL;
	long *bu = NULL;
	unsigned i, pos, nr_got = 0, nr_want = nr - ja->nr;
	int ret = 0;

	BUG_ON(nr <= ja->nr);

	bu = kcalloc(nr_want, sizeof(*bu), GFP_KERNEL);
	ob = kcalloc(nr_want, sizeof(*ob), GFP_KERNEL);
	new_buckets = kcalloc(nr, sizeof(u64), GFP_KERNEL);
	new_bucket_seq = kcalloc(nr, sizeof(u64), GFP_KERNEL);
	if (!bu || !ob || !new_buckets || !new_bucket_seq) {
		ret = -BCH_ERR_ENOMEM_set_nr_journal_buckets;
		goto err_free;
	}

	for (nr_got = 0; nr_got < nr_want; nr_got++) {
		if (new_fs) {
			bu[nr_got] = bch2_bucket_alloc_new_fs(ca);
			if (bu[nr_got] < 0) {
				ret = -BCH_ERR_ENOSPC_bucket_alloc;
				break;
			}
		} else {
			ob[nr_got] = bch2_bucket_alloc(c, ca, BCH_WATERMARK_normal, cl);
			ret = PTR_ERR_OR_ZERO(ob[nr_got]);
			if (ret)
				break;

			ret = bch2_trans_run(c,
				bch2_trans_mark_metadata_bucket(trans, ca,
						ob[nr_got]->bucket, BCH_DATA_journal,
						ca->mi.bucket_size));
			if (ret) {
				bch2_open_bucket_put(c, ob[nr_got]);
				bch_err_msg(c, ret, "marking new journal buckets");
				break;
			}

			bu[nr_got] = ob[nr_got]->bucket;
		}
	}

	if (!nr_got)
		goto err_free;

	/* Don't return an error if we successfully allocated some buckets: */
	ret = 0;

	if (c) {
		bch2_journal_flush_all_pins(&c->journal);
		bch2_journal_block(&c->journal);
		mutex_lock(&c->sb_lock);
	}

	memcpy(new_buckets, ja->buckets, ja->nr * sizeof(u64));
	memcpy(new_bucket_seq, ja->bucket_seq, ja->nr * sizeof(u64));

	BUG_ON(ja->discard_idx > ja->nr);

	pos = ja->discard_idx ?: ja->nr;

	memmove(new_buckets + pos + nr_got,
		new_buckets + pos,
		sizeof(new_buckets[0]) * (ja->nr - pos));
	memmove(new_bucket_seq + pos + nr_got,
		new_bucket_seq + pos,
		sizeof(new_bucket_seq[0]) * (ja->nr - pos));

	for (i = 0; i < nr_got; i++) {
		new_buckets[pos + i] = bu[i];
		new_bucket_seq[pos + i] = 0;
	}

	nr = ja->nr + nr_got;

	ret = bch2_journal_buckets_to_sb(c, ca, new_buckets, nr);
	if (ret)
		goto err_unblock;

	if (!new_fs)
		bch2_write_super(c);

	/* Commit: */
	if (c)
		spin_lock(&c->journal.lock);

	swap(new_buckets, ja->buckets);
	swap(new_bucket_seq, ja->bucket_seq);
	ja->nr = nr;

	if (pos <= ja->discard_idx)
		ja->discard_idx = (ja->discard_idx + nr_got) % ja->nr;
	if (pos <= ja->dirty_idx_ondisk)
		ja->dirty_idx_ondisk = (ja->dirty_idx_ondisk + nr_got) % ja->nr;
	if (pos <= ja->dirty_idx)
		ja->dirty_idx = (ja->dirty_idx + nr_got) % ja->nr;
	if (pos <= ja->cur_idx)
		ja->cur_idx = (ja->cur_idx + nr_got) % ja->nr;

	if (c)
		spin_unlock(&c->journal.lock);
err_unblock:
	if (c) {
		bch2_journal_unblock(&c->journal);
		mutex_unlock(&c->sb_lock);
	}

	if (ret && !new_fs)
		for (i = 0; i < nr_got; i++)
			bch2_trans_run(c,
				bch2_trans_mark_metadata_bucket(trans, ca,
						bu[i], BCH_DATA_free, 0));
err_free:
	if (!new_fs)
		for (i = 0; i < nr_got; i++)
			bch2_open_bucket_put(c, ob[i]);

	kfree(new_bucket_seq);
	kfree(new_buckets);
	kfree(ob);
	kfree(bu);
	return ret;
}

/*
 * Allocate more journal space at runtime - not currently making use of it, but
 * the code works:
 */
int bch2_set_nr_journal_buckets(struct bch_fs *c, struct bch_dev *ca,
				unsigned nr)
{
	struct journal_device *ja = &ca->journal;
	struct closure cl;
	int ret = 0;

	closure_init_stack(&cl);

	down_write(&c->state_lock);

	/* don't handle reducing nr of buckets yet: */
	if (nr < ja->nr)
		goto unlock;

	while (ja->nr < nr) {
		struct disk_reservation disk_res = { 0, 0, 0 };

		/*
		 * note: journal buckets aren't really counted as _sectors_ used yet, so
		 * we don't need the disk reservation to avoid the BUG_ON() in buckets.c
		 * when space used goes up without a reservation - but we do need the
		 * reservation to ensure we'll actually be able to allocate:
		 *
		 * XXX: that's not right, disk reservations only ensure a
		 * filesystem-wide allocation will succeed, this is a device
		 * specific allocation - we can hang here:
		 */

		ret = bch2_disk_reservation_get(c, &disk_res,
						bucket_to_sector(ca, nr - ja->nr), 1, 0);
		if (ret)
			break;

		ret = __bch2_set_nr_journal_buckets(ca, nr, false, &cl);

		bch2_disk_reservation_put(c, &disk_res);

		closure_sync(&cl);

		if (ret && ret != -BCH_ERR_bucket_alloc_blocked)
			break;
	}

	bch_err_fn(c, ret);
unlock:
	up_write(&c->state_lock);
	return ret;
}

int bch2_dev_journal_alloc(struct bch_dev *ca)
{
	unsigned nr;
	int ret;

	if (dynamic_fault("bcachefs:add:journal_alloc")) {
		ret = -BCH_ERR_ENOMEM_set_nr_journal_buckets;
		goto err;
	}

	/* 1/128th of the device by default: */
	nr = ca->mi.nbuckets >> 7;

	/*
	 * clamp journal size to 8192 buckets (1 << 13) or 8GB (1 << 24
	 * 512-byte sectors, converted to buckets), whichever is smaller:
	 */
	nr = clamp_t(unsigned, nr,
		     BCH_JOURNAL_BUCKETS_MIN,
		     min(1 << 13,
			 (1 << 24) / ca->mi.bucket_size));

	ret = __bch2_set_nr_journal_buckets(ca, nr, true, NULL);
err:
	bch_err_fn(ca, ret);
	return ret;
}

int bch2_fs_journal_alloc(struct bch_fs *c)
{
	for_each_online_member(c, ca) {
		if (ca->journal.nr)
			continue;

		int ret = bch2_dev_journal_alloc(ca);
		if (ret) {
			percpu_ref_put(&ca->io_ref);
			return ret;
		}
	}

	return 0;
}

/* startup/shutdown: */

static bool bch2_journal_writing_to_device(struct journal *j, unsigned dev_idx)
{
	bool ret = false;
	u64 seq;

	spin_lock(&j->lock);
	for (seq = journal_last_unwritten_seq(j);
	     seq <= journal_cur_seq(j) && !ret;
	     seq++) {
		struct journal_buf *buf = journal_seq_to_buf(j, seq);

		if (bch2_bkey_has_device_c(bkey_i_to_s_c(&buf->key), dev_idx))
			ret = true;
	}
	spin_unlock(&j->lock);

	return ret;
}

void bch2_dev_journal_stop(struct journal *j, struct bch_dev *ca)
{
	wait_event(j->wait, !bch2_journal_writing_to_device(j, ca->dev_idx));
}

void bch2_fs_journal_stop(struct journal *j)
{
	bch2_journal_reclaim_stop(j);
	bch2_journal_flush_all_pins(j);

	wait_event(j->wait, bch2_journal_entry_close(j));

	/*
	 * Always write a new journal entry, to make sure the clock hands are up
	 * to date (and match the superblock)
	 */
	bch2_journal_meta(j);

	journal_quiesce(j);

	BUG_ON(!bch2_journal_error(j) &&
	       test_bit(JOURNAL_REPLAY_DONE, &j->flags) &&
	       j->last_empty_seq != journal_cur_seq(j));

	cancel_delayed_work_sync(&j->write_work);
}

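/*
 * Set up journal state from what journal read found: @cur_seq is where the
 * first new journal entry will be written; every sequence number from the
 * newest replay entry's last_seq up to @cur_seq gets a pin list, since those
 * entries may still be dirty:
 */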
int bch2_fs_journal_start(struct journal *j, u64 cur_seq)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	struct journal_entry_pin_list *p;
	struct journal_replay *i, **_i;
	struct genradix_iter iter;
	bool had_entries = false;
	u64 last_seq = cur_seq, nr, seq;

	genradix_for_each_reverse(&c->journal_entries, iter, _i) {
		i = *_i;

		if (journal_replay_ignore(i))
			continue;

		last_seq = le64_to_cpu(i->j.last_seq);
		break;
	}

	nr = cur_seq - last_seq;

	if (nr + 1 > j->pin.size) {
		free_fifo(&j->pin);
		init_fifo(&j->pin, roundup_pow_of_two(nr + 1), GFP_KERNEL);
		if (!j->pin.data) {
			bch_err(c, "error reallocating journal fifo (%llu open entries)", nr);
			return -BCH_ERR_ENOMEM_journal_pin_fifo;
		}
	}

	j->replay_journal_seq = last_seq;
	j->replay_journal_seq_end = cur_seq;
	j->last_seq_ondisk = last_seq;
	j->flushed_seq_ondisk = cur_seq - 1;
	j->seq_ondisk = cur_seq - 1;
	j->pin.front = last_seq;
	j->pin.back = cur_seq;
	atomic64_set(&j->seq, cur_seq - 1);

	fifo_for_each_entry_ptr(p, &j->pin, seq)
		journal_pin_list_init(p, 1);

	genradix_for_each(&c->journal_entries, iter, _i) {
		i = *_i;

		if (journal_replay_ignore(i))
			continue;

		seq = le64_to_cpu(i->j.seq);
		BUG_ON(seq >= cur_seq);

		if (seq < last_seq)
			continue;

		if (journal_entry_empty(&i->j))
			j->last_empty_seq = le64_to_cpu(i->j.seq);

		p = journal_seq_pin(j, seq);

		p->devs.nr = 0;
		darray_for_each(i->ptrs, ptr)
			bch2_dev_list_add_dev(&p->devs, ptr->dev);

		had_entries = true;
	}

	if (!had_entries)
		j->last_empty_seq = cur_seq;

	spin_lock(&j->lock);

	set_bit(JOURNAL_STARTED, &j->flags);
	j->last_flush_write = jiffies;

	j->reservations.idx = j->reservations.unwritten_idx = journal_cur_seq(j);
	j->reservations.unwritten_idx++;

	c->last_bucket_seq_cleanup = journal_cur_seq(j);

	bch2_journal_space_available(j);
	spin_unlock(&j->lock);

	return bch2_journal_reclaim_start(j);
}

/* init/exit: */

void bch2_dev_journal_exit(struct bch_dev *ca)
{
	struct journal_device *ja = &ca->journal;

	for (unsigned i = 0; i < ARRAY_SIZE(ja->bio); i++) {
		kfree(ja->bio[i]);
		ja->bio[i] = NULL;
	}

	kfree(ja->buckets);
	kfree(ja->bucket_seq);
	ja->buckets = NULL;
	ja->bucket_seq = NULL;
}

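/*
 * Read the journal bucket list from the superblock: the journal_v2 field
 * stores (start, nr) ranges that get flattened into ja->buckets, while the
 * older journal field is a flat array of bucket indices:
 */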
int bch2_dev_journal_init(struct bch_dev *ca, struct bch_sb *sb)
{
	struct journal_device *ja = &ca->journal;
	struct bch_sb_field_journal *journal_buckets =
		bch2_sb_field_get(sb, journal);
	struct bch_sb_field_journal_v2 *journal_buckets_v2 =
		bch2_sb_field_get(sb, journal_v2);

	ja->nr = 0;

	if (journal_buckets_v2) {
		unsigned nr = bch2_sb_field_journal_v2_nr_entries(journal_buckets_v2);

		for (unsigned i = 0; i < nr; i++)
			ja->nr += le64_to_cpu(journal_buckets_v2->d[i].nr);
	} else if (journal_buckets) {
		ja->nr = bch2_nr_journal_buckets(journal_buckets);
	}

	ja->bucket_seq = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
	if (!ja->bucket_seq)
		return -BCH_ERR_ENOMEM_dev_journal_init;

	unsigned nr_bvecs = DIV_ROUND_UP(JOURNAL_ENTRY_SIZE_MAX, PAGE_SIZE);

	for (unsigned i = 0; i < ARRAY_SIZE(ja->bio); i++) {
		ja->bio[i] = kmalloc(struct_size(ja->bio[i], bio.bi_inline_vecs,
				     nr_bvecs), GFP_KERNEL);
		if (!ja->bio[i])
			return -BCH_ERR_ENOMEM_dev_journal_init;

		ja->bio[i]->ca = ca;
		ja->bio[i]->buf_idx = i;
		bio_init(&ja->bio[i]->bio, NULL, ja->bio[i]->bio.bi_inline_vecs, nr_bvecs, 0);
	}

	ja->buckets = kcalloc(ja->nr, sizeof(u64), GFP_KERNEL);
	if (!ja->buckets)
		return -BCH_ERR_ENOMEM_dev_journal_init;

	if (journal_buckets_v2) {
		unsigned nr = bch2_sb_field_journal_v2_nr_entries(journal_buckets_v2);
		unsigned dst = 0;

		for (unsigned i = 0; i < nr; i++)
			for (unsigned j = 0; j < le64_to_cpu(journal_buckets_v2->d[i].nr); j++)
				ja->buckets[dst++] =
					le64_to_cpu(journal_buckets_v2->d[i].start) + j;
	} else if (journal_buckets) {
		for (unsigned i = 0; i < ja->nr; i++)
			ja->buckets[i] = le64_to_cpu(journal_buckets->buckets[i]);
	}

	return 0;
}

void bch2_fs_journal_exit(struct journal *j)
{
	if (j->wq)
		destroy_workqueue(j->wq);

	darray_exit(&j->early_journal_entries);

	for (unsigned i = 0; i < ARRAY_SIZE(j->buf); i++)
		kvfree(j->buf[i].data);
	free_fifo(&j->pin);
}

int bch2_fs_journal_init(struct journal *j)
{
	static struct lock_class_key res_key;

	mutex_init(&j->buf_lock);
	spin_lock_init(&j->lock);
	spin_lock_init(&j->err_lock);
	init_waitqueue_head(&j->wait);
	INIT_DELAYED_WORK(&j->write_work, journal_write_work);
	init_waitqueue_head(&j->reclaim_wait);
	init_waitqueue_head(&j->pin_flush_wait);
	mutex_init(&j->reclaim_lock);
	mutex_init(&j->discard_lock);

	lockdep_init_map(&j->res_map, "journal res", &res_key, 0);

	atomic64_set(&j->reservations.counter,
		((union journal_res_state)
		 { .cur_entry_offset = JOURNAL_ENTRY_CLOSED_VAL }).v);

	if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)))
		return -BCH_ERR_ENOMEM_journal_pin_fifo;

	for (unsigned i = 0; i < ARRAY_SIZE(j->buf); i++) {
		j->buf[i].buf_size = JOURNAL_ENTRY_SIZE_MIN;
		j->buf[i].data = kvmalloc(j->buf[i].buf_size, GFP_KERNEL);
		if (!j->buf[i].data)
			return -BCH_ERR_ENOMEM_journal_buf;
		j->buf[i].idx = i;
	}

	j->pin.front = j->pin.back = 1;

	j->wq = alloc_workqueue("bcachefs_journal",
				WQ_HIGHPRI|WQ_FREEZABLE|WQ_UNBOUND|WQ_MEM_RECLAIM, 512);
	if (!j->wq)
		return -BCH_ERR_ENOMEM_fs_other_alloc;
	return 0;
}

/* debug: */

void __bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
{
	struct bch_fs *c = container_of(j, struct bch_fs, journal);
	union journal_res_state s;
	unsigned long now = jiffies;
	u64 nr_writes = j->nr_flush_writes + j->nr_noflush_writes;

	if (!out->nr_tabstops)
		printbuf_tabstop_push(out, 24);
	out->atomic++;

	rcu_read_lock();
	s = READ_ONCE(j->reservations);

	prt_printf(out, "dirty journal entries:\t%llu/%llu\n", fifo_used(&j->pin), j->pin.size);
	prt_printf(out, "seq:\t\t\t%llu\n", journal_cur_seq(j));
	prt_printf(out, "seq_ondisk:\t\t%llu\n", j->seq_ondisk);
	prt_printf(out, "last_seq:\t\t%llu\n", journal_last_seq(j));
	prt_printf(out, "last_seq_ondisk:\t%llu\n", j->last_seq_ondisk);
	prt_printf(out, "flushed_seq_ondisk:\t%llu\n", j->flushed_seq_ondisk);
	prt_printf(out, "watermark:\t\t%s\n", bch2_watermarks[j->watermark]);
	prt_printf(out, "each entry reserved:\t%u\n", j->entry_u64s_reserved);
	prt_printf(out, "nr flush writes:\t%llu\n", j->nr_flush_writes);
	prt_printf(out, "nr noflush writes:\t%llu\n", j->nr_noflush_writes);
	prt_printf(out, "average write size:\t");
	prt_human_readable_u64(out, nr_writes ? div64_u64(j->entry_bytes_written, nr_writes) : 0);
	prt_newline(out);
	prt_printf(out, "nr direct reclaim:\t%llu\n", j->nr_direct_reclaim);
	prt_printf(out, "nr background reclaim:\t%llu\n", j->nr_background_reclaim);
	prt_printf(out, "reclaim kicked:\t\t%u\n", j->reclaim_kicked);
	prt_printf(out, "reclaim runs in:\t%u ms\n", time_after(j->next_reclaim, now)
		   ? jiffies_to_msecs(j->next_reclaim - jiffies) : 0);
	prt_printf(out, "blocked:\t\t%u\n", j->blocked);
	prt_printf(out, "current entry sectors:\t%u\n", j->cur_entry_sectors);
	prt_printf(out, "current entry error:\t%s\n", bch2_journal_errors[j->cur_entry_error]);
	prt_printf(out, "current entry:\t\t");

	switch (s.cur_entry_offset) {
	case JOURNAL_ENTRY_ERROR_VAL:
		prt_printf(out, "error");
		break;
	case JOURNAL_ENTRY_CLOSED_VAL:
		prt_printf(out, "closed");
		break;
	default:
		prt_printf(out, "%u/%u", s.cur_entry_offset, j->cur_entry_u64s);
		break;
	}

	prt_newline(out);
	prt_printf(out, "unwritten entries:");
	prt_newline(out);
	bch2_journal_bufs_to_text(out, j);

	prt_printf(out,
		   "replay done:\t\t%i\n",
		   test_bit(JOURNAL_REPLAY_DONE, &j->flags));

	prt_printf(out, "space:\n");
	prt_printf(out, "\tdiscarded\t%u:%u\n",
		   j->space[journal_space_discarded].next_entry,
		   j->space[journal_space_discarded].total);
	prt_printf(out, "\tclean ondisk\t%u:%u\n",
		   j->space[journal_space_clean_ondisk].next_entry,
		   j->space[journal_space_clean_ondisk].total);
	prt_printf(out, "\tclean\t\t%u:%u\n",
		   j->space[journal_space_clean].next_entry,
		   j->space[journal_space_clean].total);
	prt_printf(out, "\ttotal\t\t%u:%u\n",
		   j->space[journal_space_total].next_entry,
		   j->space[journal_space_total].total);

	for_each_member_device_rcu(c, ca, &c->rw_devs[BCH_DATA_journal]) {
		struct journal_device *ja = &ca->journal;

		if (!test_bit(ca->dev_idx, c->rw_devs[BCH_DATA_journal].d))
			continue;

		if (!ja->nr)
			continue;

		prt_printf(out, "dev %u:\n", ca->dev_idx);
		prt_printf(out, "\tnr\t\t%u\n", ja->nr);
		prt_printf(out, "\tbucket size\t%u\n", ca->mi.bucket_size);
		prt_printf(out, "\tavailable\t%u:%u\n", bch2_journal_dev_buckets_available(j, ja, journal_space_discarded), ja->sectors_free);
		prt_printf(out, "\tdiscard_idx\t%u\n", ja->discard_idx);
		prt_printf(out, "\tdirty_ondisk\t%u (seq %llu)\n", ja->dirty_idx_ondisk, ja->bucket_seq[ja->dirty_idx_ondisk]);
		prt_printf(out, "\tdirty_idx\t%u (seq %llu)\n", ja->dirty_idx, ja->bucket_seq[ja->dirty_idx]);
		prt_printf(out, "\tcur_idx\t\t%u (seq %llu)\n", ja->cur_idx, ja->bucket_seq[ja->cur_idx]);
	}

	rcu_read_unlock();

	--out->atomic;
}

void bch2_journal_debug_to_text(struct printbuf *out, struct journal *j)
{
	spin_lock(&j->lock);
	__bch2_journal_debug_to_text(out, j);
	spin_unlock(&j->lock);
}

bool bch2_journal_seq_pins_to_text(struct printbuf *out, struct journal *j, u64 *seq)
{
	struct journal_entry_pin_list *pin_list;
	struct journal_entry_pin *pin;

	spin_lock(&j->lock);
	*seq = max(*seq, j->pin.front);

	if (*seq >= j->pin.back) {
		spin_unlock(&j->lock);
		return true;
	}

	out->atomic++;

	pin_list = journal_seq_pin(j, *seq);

	prt_printf(out, "%llu: count %u", *seq, atomic_read(&pin_list->count));
	prt_newline(out);
	printbuf_indent_add(out, 2);

	for (unsigned i = 0; i < ARRAY_SIZE(pin_list->list); i++)
		list_for_each_entry(pin, &pin_list->list[i], list) {
			prt_printf(out, "\t%px %ps", pin, pin->flush);
			prt_newline(out);
		}

	if (!list_empty(&pin_list->flushed)) {
		prt_printf(out, "flushed:");
		prt_newline(out);
	}

	list_for_each_entry(pin, &pin_list->flushed, list) {
		prt_printf(out, "\t%px %ps", pin, pin->flush);
		prt_newline(out);
	}

	printbuf_indent_sub(out, 2);

	--out->atomic;
	spin_unlock(&j->lock);

	return false;
}

void bch2_journal_pins_to_text(struct printbuf *out, struct journal *j)
{
	u64 seq = 0;

	while (!bch2_journal_seq_pins_to_text(out, j, &seq))
		seq++;
}